golang基于Reactor模式的轻量级非阻塞TCP网络库插件gev的使用

Golang基于Reactor模式的轻量级非阻塞TCP网络库插件gev的使用

gev是一个基于Reactor模式的轻量级、快速非阻塞TCP网络库/WebSocket服务器,支持自定义协议,可以快速轻松地构建高性能服务器。

主要特性

  • 基于epoll和kqueue的高性能事件循环
  • 支持多核和多线程
  • 通过Ring Buffer实现读写缓冲区的动态扩展
  • 异步读写
  • 支持SO_REUSEPORT端口复用
  • 自动清理空闲连接
  • 支持WebSocket/Protobuf和自定义协议
  • 支持定时任务和延迟任务
  • 高性能WebSocket服务器

网络模型

gev只使用少量goroutine,其中一个监听连接,其他(工作协程)处理已连接客户端的读写事件。工作协程的数量是可配置的,默认为主机CPU的核心数。

安装

go get -u github.com/Allenxuxu/gev

快速开始

Echo服务器示例

package main

import (
	"flag"
	"net/http"
	_ "net/http/pprof"
	"strconv"
	"time"

	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/log"
	"github.com/Allenxuxu/toolkit/sync/atomic"
)

type example struct {
	Count atomic.Int64
}

func (s *example) OnConnect(c *gev.Connection) {
	s.Count.Add(1)
	//log.Println(" OnConnect : ", c.PeerAddr())
}

func (s *example) OnMessage(c *gev.Connection, ctx interface{}, data []byte) (out interface{}) {
	//log.Println("OnMessage")
	out = data
	return
}

func (s *example) OnClose(c *gev.Connection) {
	s.Count.Add(-1)
	//log.Println("OnClose")
}

func main() {
	go func() {
		if err := http.ListenAndServe(":6060", nil); err != nil {
			panic(err)
		}
	}()

	handler := new(example)
	var port int
	var loops int

	flag.IntVar(&port, "port", 1833, "server port")
	flag.IntVar(&loops, "loops", -1, "num loops")
	flag.Parse()

	s, err := gev.NewServer(handler,
		gev.Network("tcp"),
		gev.Address(":"+strconv.Itoa(port)),
		gev.NumLoops(loops),
		gev.MetricsServer("", ":9091"),
	)
	if err != nil {
		panic(err)
	}

	s.RunEvery(time.Second*2, func() {
		log.Info("connections :", handler.Count.Get())
	})

	s.Start()
}

处理器接口

程序必须实现Handler接口:

type CallBack interface {
	OnMessage(c *Connection, ctx interface{}, data []byte) interface{}
	OnClose(c *Connection)
}

type Handler interface {
	CallBack
	OnConnect(c *Connection)
}

当完整的数据帧到达时,OnMessage会被回调。用户可以获取数据、处理业务逻辑,并返回需要发送的数据。

协议处理

gev提供了默认的Protocol实现,会获取接收缓冲区(ringbuffer)中的所有数据。实际使用中通常会有自己的数据帧协议,gev可以通过插件形式设置:在创建Server时通过可变参数设置。

s, err := gev.NewServer(handler,gev.Protocol(&ExampleProtocol{}))

WebSocket支持

WebSocket协议构建在TCP协议之上,因此gev不需要内置支持,而是以插件形式在plugins/websocket目录中提供支持。

type Protocol struct {
	upgrade *ws.Upgrader
}

func New(u *ws.Upgrader) *Protocol {
	return &Protocol{upgrade: u}
}

func (p *Protocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (ctx interface{}, out []byte) {
	upgraded := c.Context()
	if upgraded == nil {
		var err error
		out, _, err = p.upgrade.Upgrade(buffer)
		if err != nil {
			log.Println("Websocket Upgrade :", err)
			return
		}
		c.SetContext(true)
	} else {
		header, err := ws.VirtualReadHeader(buffer)
		if err != nil {
			log.Println(err)
			return
		}
		if buffer.VirtualLength() >= int(header.Length) {
			buffer.VirtualFlush()

			payload := make([]byte, int(header.Length))
			_, _ = buffer.Read(payload)

			if header.Masked {
				ws.Cipher(payload, header.Mask, 0)
			}

			ctx = &header
			out = payload
		} else {
			buffer.VirtualRevert()
		}
	}
	return
}

func (p *Protocol) Packet(c *connection.Connection, data []byte) []byte {
	return data
}

更多示例

  • Echo服务器
  • 自动清理空闲连接
  • 最大连接数限制
  • 服务器定时推送
  • WebSocket
  • Protobuf
  • 等等

更多关于golang基于Reactor模式的轻量级非阻塞TCP网络库插件gev的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang基于Reactor模式的轻量级非阻塞TCP网络库插件gev的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


使用gev构建轻量级非阻塞TCP网络库

gev是一个基于Reactor模式的轻量级高性能网络库,采用Golang实现,支持非阻塞IO和多路复用技术。下面我将详细介绍gev的使用方法和示例代码。

gev核心特性

  1. 基于事件驱动的Reactor模式
  2. 非阻塞IO操作
  3. 支持多路复用(epoll/kqueue)
  4. 高性能的协程池
  5. 简洁易用的API接口

安装gev

go get github.com/Allenxuxu/gev

基本使用示例

1. 创建TCP服务器

package main

import (
	"log"
	
	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/connection"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
	log.Println(" OnConnect : ", c.PeerAddr())
}

func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
	log.Println("OnMessage:", string(data))
	out = data
	return
}

func (s *example) OnClose(c *connection.Connection) {
	log.Println("OnClose")
}

func main() {
	handler := new(example)
	
	// 创建服务器
	s, err := gev.NewServer(handler,
		gev.Network("tcp"),
		gev.Address(":1234"),
		gev.NumLoops(4), // 设置4个事件循环
	)
	if err != nil {
		panic(err)
	}

	log.Println("server start")
	s.Start()
}

2. 创建TCP客户端

package main

import (
	"log"
	"time"
	
	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/connection"
)

type clientHandler struct{}

func (h *clientHandler) OnConnect(c *connection.Connection) {
	log.Println("client connected")
	
	// 定时发送消息
	go func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		
		for range ticker.C {
			c.Send([]byte("hello from client"))
		}
	}()
}

func (h *clientHandler) OnMessage(c *connection.Connection, data []byte) (out []byte) {
	log.Printf("client received: %s", string(data))
	return nil
}

func (h *clientHandler) OnClose(c *connection.Connection) {
	log.Println("client closed")
}

func main() {
	handler := new(clientHandler)
	
	// 创建客户端连接
	conn, err := gev.NewClient(handler, 
		gev.Network("tcp"),
		gev.Address("127.0.0.1:1234"),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	
	// 保持连接
	select {}
}

高级特性

1. 使用协程池处理业务

s, err := gev.NewServer(handler,
	gev.Network("tcp"),
	gev.Address(":1234"),
	gev.NumLoops(4),
	gev.WorkerPoolSize(100), // 设置100个worker的协程池
)

2. 自定义协议解析

gev支持自定义协议解析器:

type customProtocol struct{}

func (p *customProtocol) UnPacket(c *connection.Connection, buffer *connection.Buffer) (interface{}, []byte) {
	// 自定义协议解析逻辑
	if buffer.Length() > 4 {
		data := buffer.Bytes()
		length := binary.BigEndian.Uint32(data[:4])
		if buffer.Length() >= int(length)+4 {
			buffer.Retrieve(4)
			out := data[4 : 4+length]
			buffer.Retrieve(int(length))
			return nil, out
		}
	}
	return nil, nil
}

func (p *customProtocol) Packet(c *connection.Connection, data []byte) []byte {
	// 自定义协议打包逻辑
	length := len(data)
	ret := make([]byte, length+4)
	binary.BigEndian.PutUint32(ret[:4], uint32(length))
	copy(ret[4:], data)
	return ret
}

// 使用自定义协议
s, err := gev.NewServer(handler,
	gev.Network("tcp"),
	gev.Address(":1234"),
	gev.Protocol(&customProtocol{}),
)

3. SSL/TLS支持

// 创建TLS配置
cert, err := tls.LoadX509KeyPair("server.crt", "server.key")
if err != nil {
	log.Fatal(err)
}

tlsConfig := &tls.Config{
	Certificates: []tls.Certificate{cert},
}

// 创建带TLS的服务器
s, err := gev.NewServer(handler,
	gev.Network("tcp"),
	gev.Address(":1234"),
	gev.TLSConfig(tlsConfig),
)

性能优化建议

  1. 根据CPU核心数合理设置NumLoops参数
  2. 对于高并发场景,使用WorkerPool处理业务逻辑
  3. 避免在事件回调中进行耗时操作
  4. 合理设置读写缓冲区大小

总结

gev是一个轻量级但功能强大的Golang网络库,特别适合需要高性能TCP通信的场景。通过Reactor模式和协程池的结合,它能够在保持简洁API的同时提供出色的性能表现。

以上示例展示了gev的基本用法和部分高级特性,更多功能可以参考官方文档和源码。在实际项目中,可以根据需求选择合适的配置参数和扩展方式。

回到顶部