Golang实现TCP服务器实时处理不定字节数据流
Golang实现TCP服务器实时处理不定字节数据流 你好!
我对此主题进行了一些研究,但确实找不到答案。来自 Nodejs 领域,TCP 连接会在事件监听器上"发送"数据给我,我从来不必为 TCP 客户端的数据流创建字节数组。
我正在编写一个"接收器"来接收 TCP 连接及其数据,并将其放入 Kafka 主题中。我希望获取实时数据流(非空消息),目前不太关心解析消息。他们设计连接到我的服务器的调制解调器,以保持连接打开并发送磁力计数据的消息。换句话说,我不知道数据包的大小。我认为在我的代码中,handleRequest 函数中的无限 for 循环每次读取时都会填充 buf 变量,这意味着它可能会随着时间的推移而填满。
然而,这种方法也有效,因为每次读取时我可以生成一条 Kafka 消息,但我可能会遇到 buf 被填满的问题。我还研究了使用 ioutil.ReadAll,它会一直读取直到客户端关闭或出现其他错误,但在那之前我无法对其进行操作。
我的直觉是需要使用 ReadAll 并创建一个包含 ReadAll 部分内容的通道,并将其提供给其他代码。但这感觉…有点取巧。我还认为,在没有框架告知每次读取预期内容的情况下保持 TCP 连接存活并不是最佳方式,但 TCP 相关事务对我们团队中的许多人来说都是新的。例如,TCP 客户端在传递数据后是否应该结束连接?无论如何,这有点离题了。
对于如何处理这个问题有什么建议吗?
package main
import (
"encoding/binary"
"fmt"
"net"
"os"
)
const (
CONN_HOST = "localhost"
CONN_PORT = "9000"
CONN_TYPE = "tcp"
CONN_URL = CONN_HOST + ":" + CONN_PORT
)
func main() {
// 监听传入连接
l, err := net.Listen(CONN_TYPE, CONN_URL)
if err != nil {
fmt.Println("Error listening:", err.Error())
os.Exit(1)
}
// 在应用程序关闭时关闭监听器
defer l.Close()
fmt.Println("Listening on " + CONN_URL)
for {
// 监听连接
conn, err := l.Accept()
if err != nil {
fmt.Println("Error accepting connection:", err.Error())
os.Exit(1)
}
go handleRequest(conn)
}
}
func handleRequest(conn net.Conn) {
// 保存传入信息的缓冲区
buf := make([]byte, 1024)
for {
len, err := conn.Read(buf)
if err != nil {
fmt.Println("Error reading:", err.Error())
break
}
s := string(buf[:len])
fmt.Println("Stuff", s)
fmt.Println("len", binary.Size(buf))
}
}
更多关于Golang实现TCP服务器实时处理不定字节数据流的实战教程也可以访问 https://www.itying.com/category-94-b0.html
你说得对。经过一些实践后,我意识到缓冲区不会在新空间中被填满。它只是每次读取时替换内容,直到达到指定的大小。
func main() {
fmt.Println("hello world")
}
更多关于Golang实现TCP服务器实时处理不定字节数据流的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
在处理TCP流数据时,特别是当数据包大小不固定时,确实需要一种更灵活的方法。你的当前实现使用固定大小的缓冲区,这可能导致数据截断或处理不完整消息的问题。以下是改进方案:
package main
import (
"bufio"
"fmt"
"net"
"os"
"time"
)
const (
CONN_HOST = "localhost"
CONN_PORT = "9000"
CONN_TYPE = "tcp"
CONN_URL = CONN_HOST + ":" + CONN_PORT
)
func main() {
l, err := net.Listen(CONN_TYPE, CONN_URL)
if err != nil {
fmt.Println("Error listening:", err.Error())
os.Exit(1)
}
defer l.Close()
fmt.Println("Listening on " + CONN_URL)
for {
conn, err := l.Accept()
if err != nil {
fmt.Println("Error accepting connection:", err.Error())
continue
}
go handleRequest(conn)
}
}
func handleRequest(conn net.Conn) {
defer conn.Close()
// 设置读取超时以避免永久阻塞
conn.SetReadDeadline(time.Now().Add(30 * time.Second))
reader := bufio.NewReader(conn)
for {
// 读取直到遇到分隔符(这里使用换行符作为示例)
data, err := reader.ReadBytes('\n')
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
fmt.Println("Read timeout")
} else {
fmt.Println("Error reading:", err.Error())
}
break
}
// 重置读取超时
conn.SetReadDeadline(time.Now().Add(30 * time.Second))
// 处理数据(这里只是打印,实际应该发送到Kafka)
if len(data) > 0 {
message := string(data[:len(data)-1]) // 移除分隔符
fmt.Printf("Received message: %s\n", message)
fmt.Printf("Message length: %d bytes\n", len(data))
// 这里添加Kafka生产逻辑
// sendToKafka(message)
}
}
}
如果数据没有明确的分隔符,可以使用以下基于长度前缀的方法:
func handleRequestWithLengthPrefix(conn net.Conn) {
defer conn.Close()
conn.SetReadDeadline(time.Now().Add(30 * time.Second))
for {
// 先读取4字节的长度前缀
lengthBuf := make([]byte, 4)
_, err := conn.Read(lengthBuf)
if err != nil {
fmt.Println("Error reading length prefix:", err.Error())
break
}
// 解析消息长度(大端序)
messageLength := binary.BigEndian.Uint32(lengthBuf)
// 读取实际消息数据
messageBuf := make([]byte, messageLength)
bytesRead := 0
for bytesRead < int(messageLength) {
n, err := conn.Read(messageBuf[bytesRead:])
if err != nil {
fmt.Println("Error reading message data:", err.Error())
return
}
bytesRead += n
}
// 重置读取超时
conn.SetReadDeadline(time.Now().Add(30 * time.Second))
// 处理完整消息
message := string(messageBuf)
fmt.Printf("Received message: %s\n", message)
fmt.Printf("Message length: %d bytes\n", messageLength)
// 这里添加Kafka生产逻辑
// sendToKafka(message)
}
}
对于完全流式处理,可以使用缓冲区累积数据:
func handleRequestStreaming(conn net.Conn) {
defer conn.Close()
conn.SetReadDeadline(time.Now().Add(30 * time.Second))
buffer := make([]byte, 0, 4096) // 初始容量4KB
tempBuf := make([]byte, 256) // 每次读取的临时缓冲区
for {
n, err := conn.Read(tempBuf)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
fmt.Println("Read timeout")
} else {
fmt.Println("Error reading:", err.Error())
}
break
}
// 将新数据追加到缓冲区
buffer = append(buffer, tempBuf[:n]...)
// 处理缓冲区中的完整消息(假设以换行符分隔)
for {
newlineIndex := -1
for i, b := range buffer {
if b == '\n' {
newlineIndex = i
break
}
}
if newlineIndex == -1 {
break // 没有完整消息,继续读取
}
// 提取完整消息
message := buffer[:newlineIndex]
buffer = buffer[newlineIndex+1:]
// 处理消息
fmt.Printf("Received message: %s\n", string(message))
fmt.Printf("Message length: %d bytes\n", len(message))
// 这里添加Kafka生产逻辑
// sendToKafka(string(message))
}
// 重置读取超时
conn.SetReadDeadline(time.Now().Add(30 * time.Second))
}
}
关于TCP连接保持:对于实时数据流,保持连接开放是标准做法,避免了频繁建立和断开连接的开销。你的方法在架构上是正确的,只需要优化数据读取逻辑以适应不定长的数据流。


