golang实现TCP协议缓冲区数据流传输插件库buffstreams的使用

Golang实现TCP协议缓冲区数据流传输插件库buffstreams的使用

BuffStreams是一个用于在Golang中通过TCP流式传输Protocol Buffers消息的库。

什么是BuffStreams?

BuffStreams是对TCP连接的一组抽象,用于流式传输包含消息长度+消息本身的数据格式(类似于Protocol Buffers,因此得名)。

如何使用

安装库

go get "github.com/StabbyCutyou/buffstreams"

导入库

import "github.com/StabbyCutyou/buffstreams"

监听连接

核心对象之一是TCPListener,它允许你在本地端口上打开套接字并等待客户端连接。

配置监听器

cfg := TCPListenerConfig {
  EnableLogging: false, // true将日志消息打印到stdout/stderr
  MaxMessageSize: 4096,
  Callback: func(byte[])error{return nil} // 任何符合此签名的函数类型
  Address: FormatAddress("", strconv.Itoa(5031)) // ip:port格式的地址
}

开始监听

btl, err := buffstreams.ListenTCP(cfg)

// 异步非阻塞方式
err := btl.StartListeningAsync()

// 或阻塞方式
err := btl.StartListening()

监听回调

Buffstreams通过回调处理传入消息:

type ListenCallback func([]byte) error

示例回调:

func ListenCallbackExample ([]byte data) error {
  msg := &message.ImportantProtoBuffStreamingMessage{}
  err := proto.Unmarshal(data, msg)
  // 现在可以对msg进行操作
  ...
}

写入消息

配置连接

cfg := TCPConnConfig {
  EnableLogging: false,
  MaxMessageSize: 4096, // 应与服务器端的MaxMessageSize匹配
  Address: FormatAddress("127.0.0.1", strconv.Itoa(5031))
}

建立连接并写入

btc, err := buffstreams.DialTCP(cfg)

// 写入数据
bytesWritten, err := btc.Write(msgBytes, true)

管理器

Manager类提供了对拨号和监听连接的简单抽象管理。

创建管理器

bm := buffstreams.NewManager()

监听端口

err := bm.StartListening(cfg)

拨号连接

err := bm.Dial(cfg)

写入数据

bytesWritten, err := bm.Write("127.0.0.1:5031", dataBytes)

关闭连接

// 关闭监听器
err := bm.CloseListener("127.0.0.1:5031")

// 关闭写入器
err := bm.CloseWriter("127.0.0.1:5031")

完整示例

服务器端

package main

import (
	"github.com/StabbyCutyou/buffstreams"
	"strconv"
)

func main() {
	// 配置监听器
	cfg := buffstreams.TCPListenerConfig{
		EnableLogging:  true,
		MaxMessageSize: 4096,
		Callback:       handleMessage,
		Address:        buffstreams.FormatAddress("", strconv.Itoa(5031)),
	}

	// 创建监听器
	btl, err := buffstreams.ListenTCP(cfg)
	if err != nil {
		panic(err)
	}

	// 开始监听
	err = btl.StartListening()
	if err != nil {
		panic(err)
	}
}

func handleMessage(data []byte) error {
	// 在这里处理接收到的消息
	// msg := &YourProtoMessage{}
	// proto.Unmarshal(data, msg)
	// 处理msg...
	return nil
}

客户端

package main

import (
	"github.com/StabbyCutyou/buffstreams"
	"strconv"
)

func main() {
	// 配置连接
	cfg := buffstreams.TCPConnConfig{
		EnableLogging:  true,
		MaxMessageSize: 4096,
		Address:       buffstreams.FormatAddress("127.0.0.1", strconv.Itoa(5031)),
	}

	// 建立连接
	btc, err := buffstreams.DialTCP(cfg)
	if err != nil {
		panic(err)
	}

	// 创建并序列化Protocol Buffers消息
	// msg := &YourProtoMessage{...}
	// msgBytes, err := proto.Marshal(msg)
	msgBytes := []byte("test message")

	// 写入消息
	_, err = btc.Write(msgBytes, true)
	if err != nil {
		panic(err)
	}
}

性能

Buffstreams经过优化,能够在1gig网卡上达到每秒110万条消息(每条消息110字节)的传输速度。

许可证

Apache v2 - 参见LICENSE文件


更多关于golang实现TCP协议缓冲区数据流传输插件库buffstreams的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang实现TCP协议缓冲区数据流传输插件库buffstreams的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang实现TCP协议缓冲区数据流传输:buffstreams库使用指南

buffstreams简介

buffstreams是一个专门为Golang设计的TCP流处理库,它简化了基于TCP协议的数据流传输,特别适合处理固定大小或可变大小的数据块传输。该库提供了缓冲区管理、数据分帧和流控制等功能。

安装buffstreams

go get github.com/StabbyCutyou/buffstreams

基本使用示例

1. 创建TCP服务器

package main

import (
	"fmt"
	"log"
	"github.com/StabbyCutyou/buffstreams"
)

func main() {
	// 创建buffstreams配置
	cfg := buffstreams.TCPListenerConfig{
		Address:           "localhost:8080", // 监听地址
		MaxMessageSize:    4096,             // 最大消息大小
		EnableLogging:     true,             // 启用日志
		ConnectionTimeout: 0,                // 连接超时(0表示不超时)
	}

	// 创建TCP监听器
	listener, err := buffstreams.ListenTCP(cfg)
	if err != nil {
		log.Fatalf("Failed to listen: %v", err)
	}
	defer listener.Close()

	fmt.Println("Server started, waiting for connections...")

	// 处理连接
	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Printf("Accept error: %v", err)
			continue
		}

		go handleConnection(conn)
	}
}

func handleConnection(conn *buffstreams.TCPConn) {
	defer conn.Close()
	
	buf := make([]byte, 4096)
	for {
		// 读取数据
		n, err := conn.Read(buf)
		if err != nil {
			log.Printf("Read error: %v", err)
			return
		}

		// 处理接收到的数据
		received := string(buf[:n])
		fmt.Printf("Received: %s\n", received)

		// 回显数据
		_, err = conn.Write(buf[:n])
		if err != nil {
			log.Printf("Write error: %v", err)
			return
		}
	}
}

2. 创建TCP客户端

package main

import (
	"fmt"
	"log"
	"time"
	"github.com/StabbyCutyou/buffstreams"
)

func main() {
	// 创建buffstreams配置
	cfg := buffstreams.TCPConnConfig{
		Address:           "localhost:8080", // 服务器地址
		MaxMessageSize:    4096,             // 最大消息大小
		EnableLogging:     true,             // 启用日志
		ConnectionTimeout: 5 * time.Second,  // 连接超时
	}

	// 连接到服务器
	conn, err := buffstreams.DialTCP(cfg)
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()

	// 发送数据
	message := "Hello, buffstreams!"
	_, err = conn.Write([]byte(message))
	if err != nil {
		log.Fatalf("Write error: %v", err)
	}

	// 读取响应
	buf := make([]byte, 4096)
	n, err := conn.Read(buf)
	if err != nil {
		log.Fatalf("Read error: %v", err)
	}

	fmt.Printf("Server response: %s\n", string(buf[:n]))
}

高级功能

1. 使用帧分隔符处理消息

// 服务器端配置
cfg := buffstreams.TCPListenerConfig{
	Address:           "localhost:8080",
	MaxMessageSize:    4096,
	EnableLogging:     true,
	ConnectionTimeout: 0,
	FrameDelimiter:    '\n', // 设置帧分隔符
}

// 客户端发送带分隔符的消息
message := "Frame 1\nFrame 2\n"
_, err = conn.Write([]byte(message))

2. 处理大文件传输

// 服务器端处理大文件
func handleFileTransfer(conn *buffstreams.TCPConn) {
	file, err := os.Create("received_file.dat")
	if err != nil {
		log.Printf("File create error: %v", err)
		return
	}
	defer file.Close()

	buf := make([]byte, 4096)
	for {
		n, err := conn.Read(buf)
		if err != nil {
			if err == io.EOF {
				log.Println("File transfer completed")
				return
			}
			log.Printf("Read error: %v", err)
			return
		}

		_, err = file.Write(buf[:n])
		if err != nil {
			log.Printf("File write error: %v", err)
			return
		}
	}
}

3. 性能调优

// 调整缓冲区大小和并发设置
cfg := buffstreams.TCPListenerConfig{
	Address:           "localhost:8080",
	MaxMessageSize:    8192,       // 增大缓冲区
	EnableLogging:     false,      // 生产环境可关闭日志
	ConnectionTimeout: 10 * time.Second,
	ReadBufferSize:    65536,      // 增大读缓冲区
	WriteBufferSize:   65536,      // 增大写缓冲区
}

错误处理建议

  1. 始终检查ReadWrite操作的返回值
  2. 实现连接重试机制
  3. 使用超时防止阻塞
  4. 记录详细的错误日志

总结

buffstreams为Golang TCP通信提供了简单高效的解决方案,特别适合处理流式数据。通过合理配置缓冲区大小、帧分隔符和超时设置,可以构建稳定可靠的网络应用。

实际项目中,可以根据需求进一步封装buffstreams,添加应用层协议(如自定义包头)或加密功能,以满足更复杂的业务需求。

回到顶部