golang实现TCP协议缓冲区数据流传输插件库buffstreams的使用
Golang实现TCP协议缓冲区数据流传输插件库buffstreams的使用
BuffStreams是一个用于在Golang中通过TCP流式传输Protocol Buffers消息的库。
什么是BuffStreams?
BuffStreams是对TCP连接的一组抽象,用于流式传输包含消息长度+消息本身的数据格式(类似于Protocol Buffers,因此得名)。
如何使用
安装库
go get "github.com/StabbyCutyou/buffstreams"
导入库
import "github.com/StabbyCutyou/buffstreams"
监听连接
核心对象之一是TCPListener,它允许你在本地端口上打开套接字并等待客户端连接。
配置监听器
cfg := TCPListenerConfig {
EnableLogging: false, // true将日志消息打印到stdout/stderr
MaxMessageSize: 4096,
Callback: func(byte[])error{return nil} // 任何符合此签名的函数类型
Address: FormatAddress("", strconv.Itoa(5031)) // ip:port格式的地址
}
开始监听
btl, err := buffstreams.ListenTCP(cfg)
// 异步非阻塞方式
err := btl.StartListeningAsync()
// 或阻塞方式
err := btl.StartListening()
监听回调
Buffstreams通过回调处理传入消息:
type ListenCallback func([]byte) error
示例回调:
func ListenCallbackExample ([]byte data) error {
msg := &message.ImportantProtoBuffStreamingMessage{}
err := proto.Unmarshal(data, msg)
// 现在可以对msg进行操作
...
}
写入消息
配置连接
cfg := TCPConnConfig {
EnableLogging: false,
MaxMessageSize: 4096, // 应与服务器端的MaxMessageSize匹配
Address: FormatAddress("127.0.0.1", strconv.Itoa(5031))
}
建立连接并写入
btc, err := buffstreams.DialTCP(cfg)
// 写入数据
bytesWritten, err := btc.Write(msgBytes, true)
管理器
Manager类提供了对拨号和监听连接的简单抽象管理。
创建管理器
bm := buffstreams.NewManager()
监听端口
err := bm.StartListening(cfg)
拨号连接
err := bm.Dial(cfg)
写入数据
bytesWritten, err := bm.Write("127.0.0.1:5031", dataBytes)
关闭连接
// 关闭监听器
err := bm.CloseListener("127.0.0.1:5031")
// 关闭写入器
err := bm.CloseWriter("127.0.0.1:5031")
完整示例
服务器端
package main
import (
"github.com/StabbyCutyou/buffstreams"
"strconv"
)
func main() {
// 配置监听器
cfg := buffstreams.TCPListenerConfig{
EnableLogging: true,
MaxMessageSize: 4096,
Callback: handleMessage,
Address: buffstreams.FormatAddress("", strconv.Itoa(5031)),
}
// 创建监听器
btl, err := buffstreams.ListenTCP(cfg)
if err != nil {
panic(err)
}
// 开始监听
err = btl.StartListening()
if err != nil {
panic(err)
}
}
func handleMessage(data []byte) error {
// 在这里处理接收到的消息
// msg := &YourProtoMessage{}
// proto.Unmarshal(data, msg)
// 处理msg...
return nil
}
客户端
package main
import (
"github.com/StabbyCutyou/buffstreams"
"strconv"
)
func main() {
// 配置连接
cfg := buffstreams.TCPConnConfig{
EnableLogging: true,
MaxMessageSize: 4096,
Address: buffstreams.FormatAddress("127.0.0.1", strconv.Itoa(5031)),
}
// 建立连接
btc, err := buffstreams.DialTCP(cfg)
if err != nil {
panic(err)
}
// 创建并序列化Protocol Buffers消息
// msg := &YourProtoMessage{...}
// msgBytes, err := proto.Marshal(msg)
msgBytes := []byte("test message")
// 写入消息
_, err = btc.Write(msgBytes, true)
if err != nil {
panic(err)
}
}
性能
Buffstreams经过优化,能够在1gig网卡上达到每秒110万条消息(每条消息110字节)的传输速度。
许可证
Apache v2 - 参见LICENSE文件
更多关于golang实现TCP协议缓冲区数据流传输插件库buffstreams的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang实现TCP协议缓冲区数据流传输插件库buffstreams的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang实现TCP协议缓冲区数据流传输:buffstreams库使用指南
buffstreams简介
buffstreams是一个专门为Golang设计的TCP流处理库,它简化了基于TCP协议的数据流传输,特别适合处理固定大小或可变大小的数据块传输。该库提供了缓冲区管理、数据分帧和流控制等功能。
安装buffstreams
go get github.com/StabbyCutyou/buffstreams
基本使用示例
1. 创建TCP服务器
package main
import (
"fmt"
"log"
"github.com/StabbyCutyou/buffstreams"
)
func main() {
// 创建buffstreams配置
cfg := buffstreams.TCPListenerConfig{
Address: "localhost:8080", // 监听地址
MaxMessageSize: 4096, // 最大消息大小
EnableLogging: true, // 启用日志
ConnectionTimeout: 0, // 连接超时(0表示不超时)
}
// 创建TCP监听器
listener, err := buffstreams.ListenTCP(cfg)
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}
defer listener.Close()
fmt.Println("Server started, waiting for connections...")
// 处理连接
for {
conn, err := listener.Accept()
if err != nil {
log.Printf("Accept error: %v", err)
continue
}
go handleConnection(conn)
}
}
func handleConnection(conn *buffstreams.TCPConn) {
defer conn.Close()
buf := make([]byte, 4096)
for {
// 读取数据
n, err := conn.Read(buf)
if err != nil {
log.Printf("Read error: %v", err)
return
}
// 处理接收到的数据
received := string(buf[:n])
fmt.Printf("Received: %s\n", received)
// 回显数据
_, err = conn.Write(buf[:n])
if err != nil {
log.Printf("Write error: %v", err)
return
}
}
}
2. 创建TCP客户端
package main
import (
"fmt"
"log"
"time"
"github.com/StabbyCutyou/buffstreams"
)
func main() {
// 创建buffstreams配置
cfg := buffstreams.TCPConnConfig{
Address: "localhost:8080", // 服务器地址
MaxMessageSize: 4096, // 最大消息大小
EnableLogging: true, // 启用日志
ConnectionTimeout: 5 * time.Second, // 连接超时
}
// 连接到服务器
conn, err := buffstreams.DialTCP(cfg)
if err != nil {
log.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()
// 发送数据
message := "Hello, buffstreams!"
_, err = conn.Write([]byte(message))
if err != nil {
log.Fatalf("Write error: %v", err)
}
// 读取响应
buf := make([]byte, 4096)
n, err := conn.Read(buf)
if err != nil {
log.Fatalf("Read error: %v", err)
}
fmt.Printf("Server response: %s\n", string(buf[:n]))
}
高级功能
1. 使用帧分隔符处理消息
// 服务器端配置
cfg := buffstreams.TCPListenerConfig{
Address: "localhost:8080",
MaxMessageSize: 4096,
EnableLogging: true,
ConnectionTimeout: 0,
FrameDelimiter: '\n', // 设置帧分隔符
}
// 客户端发送带分隔符的消息
message := "Frame 1\nFrame 2\n"
_, err = conn.Write([]byte(message))
2. 处理大文件传输
// 服务器端处理大文件
func handleFileTransfer(conn *buffstreams.TCPConn) {
file, err := os.Create("received_file.dat")
if err != nil {
log.Printf("File create error: %v", err)
return
}
defer file.Close()
buf := make([]byte, 4096)
for {
n, err := conn.Read(buf)
if err != nil {
if err == io.EOF {
log.Println("File transfer completed")
return
}
log.Printf("Read error: %v", err)
return
}
_, err = file.Write(buf[:n])
if err != nil {
log.Printf("File write error: %v", err)
return
}
}
}
3. 性能调优
// 调整缓冲区大小和并发设置
cfg := buffstreams.TCPListenerConfig{
Address: "localhost:8080",
MaxMessageSize: 8192, // 增大缓冲区
EnableLogging: false, // 生产环境可关闭日志
ConnectionTimeout: 10 * time.Second,
ReadBufferSize: 65536, // 增大读缓冲区
WriteBufferSize: 65536, // 增大写缓冲区
}
错误处理建议
- 始终检查
Read
和Write
操作的返回值 - 实现连接重试机制
- 使用超时防止阻塞
- 记录详细的错误日志
总结
buffstreams为Golang TCP通信提供了简单高效的解决方案,特别适合处理流式数据。通过合理配置缓冲区大小、帧分隔符和超时设置,可以构建稳定可靠的网络应用。
实际项目中,可以根据需求进一步封装buffstreams,添加应用层协议(如自定义包头)或加密功能,以满足更复杂的业务需求。