Golang实现TCP服务器实时处理不定字节数据流

Golang实现TCP服务器实时处理不定字节数据流 你好!

我对此主题进行了一些研究,但确实找不到答案。来自 Nodejs 领域,TCP 连接会在事件监听器上"发送"数据给我,我从来不必为 TCP 客户端的数据流创建字节数组。

我正在编写一个"接收器"来接收 TCP 连接及其数据,并将其放入 Kafka 主题中。我希望获取实时数据流(非空消息),目前不太关心解析消息。他们设计连接到我的服务器的调制解调器,以保持连接打开并发送磁力计数据的消息。换句话说,我不知道数据包的大小。我认为在我的代码中,handleRequest 函数中的无限 for 循环每次读取时都会填充 buf 变量,这意味着它可能会随着时间的推移而填满。

然而,这种方法也有效,因为每次读取时我可以生成一条 Kafka 消息,但我可能会遇到 buf 被填满的问题。我还研究了使用 ioutil.ReadAll,它会一直读取直到客户端关闭或出现其他错误,但在那之前我无法对其进行操作。

我的直觉是需要使用 ReadAll 并创建一个包含 ReadAll 部分内容的通道,并将其提供给其他代码。但这感觉…有点取巧。我还认为,在没有框架告知每次读取预期内容的情况下保持 TCP 连接存活并不是最佳方式,但 TCP 相关事务对我们团队中的许多人来说都是新的。例如,TCP 客户端在传递数据后是否应该结束连接?无论如何,这有点离题了。

对于如何处理这个问题有什么建议吗?

package main

import (
	"encoding/binary"
	"fmt"
	"net"
	"os"
)

const (
	CONN_HOST = "localhost"
	CONN_PORT = "9000"
	CONN_TYPE = "tcp"
	CONN_URL  = CONN_HOST + ":" + CONN_PORT
)

func main() {
	// 监听传入连接
	l, err := net.Listen(CONN_TYPE, CONN_URL)

	if err != nil {
		fmt.Println("Error listening:", err.Error())
		os.Exit(1)
	}

	// 在应用程序关闭时关闭监听器
	defer l.Close()

	fmt.Println("Listening on " + CONN_URL)
	for {
		// 监听连接
		conn, err := l.Accept()

		if err != nil {
			fmt.Println("Error accepting connection:", err.Error())
			os.Exit(1)
		}

		go handleRequest(conn)
	}
}

func handleRequest(conn net.Conn) {
	// 保存传入信息的缓冲区
	buf := make([]byte, 1024)

	for {
		len, err := conn.Read(buf)

		if err != nil {
			fmt.Println("Error reading:", err.Error())
			break
		}

		s := string(buf[:len])

		fmt.Println("Stuff", s)
		fmt.Println("len", binary.Size(buf))
	}
}

更多关于Golang实现TCP服务器实时处理不定字节数据流的实战教程也可以访问 https://www.itying.com/category-94-b0.html

3 回复

你说得对。经过一些实践后,我意识到缓冲区不会在新空间中被填满。它只是每次读取时替换内容,直到达到指定的大小。

func main() {
    fmt.Println("hello world")
}

更多关于Golang实现TCP服务器实时处理不定字节数据流的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


你的代码看起来没问题。它每次迭代最多读取1024字节到缓冲区,并打印接收到的字节。下一次迭代会重用相同的缓冲区。如果不查看接收数据的结构,你无法知道收到的789字节(或其他数量)是一个消息、半个消息还是五个消息。但如果你只是传递数据,可能不需要关心这个。

在处理TCP流数据时,特别是当数据包大小不固定时,确实需要一种更灵活的方法。你的当前实现使用固定大小的缓冲区,这可能导致数据截断或处理不完整消息的问题。以下是改进方案:

package main

import (
	"bufio"
	"fmt"
	"net"
	"os"
	"time"
)

const (
	CONN_HOST = "localhost"
	CONN_PORT = "9000"
	CONN_TYPE = "tcp"
	CONN_URL  = CONN_HOST + ":" + CONN_PORT
)

func main() {
	l, err := net.Listen(CONN_TYPE, CONN_URL)
	if err != nil {
		fmt.Println("Error listening:", err.Error())
		os.Exit(1)
	}
	defer l.Close()

	fmt.Println("Listening on " + CONN_URL)
	for {
		conn, err := l.Accept()
		if err != nil {
			fmt.Println("Error accepting connection:", err.Error())
			continue
		}
		go handleRequest(conn)
	}
}

func handleRequest(conn net.Conn) {
	defer conn.Close()
	
	// 设置读取超时以避免永久阻塞
	conn.SetReadDeadline(time.Now().Add(30 * time.Second))
	
	reader := bufio.NewReader(conn)
	
	for {
		// 读取直到遇到分隔符(这里使用换行符作为示例)
		data, err := reader.ReadBytes('\n')
		if err != nil {
			if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
				fmt.Println("Read timeout")
			} else {
				fmt.Println("Error reading:", err.Error())
			}
			break
		}
		
		// 重置读取超时
		conn.SetReadDeadline(time.Now().Add(30 * time.Second))
		
		// 处理数据(这里只是打印,实际应该发送到Kafka)
		if len(data) > 0 {
			message := string(data[:len(data)-1]) // 移除分隔符
			fmt.Printf("Received message: %s\n", message)
			fmt.Printf("Message length: %d bytes\n", len(data))
			
			// 这里添加Kafka生产逻辑
			// sendToKafka(message)
		}
	}
}

如果数据没有明确的分隔符,可以使用以下基于长度前缀的方法:

func handleRequestWithLengthPrefix(conn net.Conn) {
	defer conn.Close()
	
	conn.SetReadDeadline(time.Now().Add(30 * time.Second))
	
	for {
		// 先读取4字节的长度前缀
		lengthBuf := make([]byte, 4)
		_, err := conn.Read(lengthBuf)
		if err != nil {
			fmt.Println("Error reading length prefix:", err.Error())
			break
		}
		
		// 解析消息长度(大端序)
		messageLength := binary.BigEndian.Uint32(lengthBuf)
		
		// 读取实际消息数据
		messageBuf := make([]byte, messageLength)
		bytesRead := 0
		
		for bytesRead < int(messageLength) {
			n, err := conn.Read(messageBuf[bytesRead:])
			if err != nil {
				fmt.Println("Error reading message data:", err.Error())
				return
			}
			bytesRead += n
		}
		
		// 重置读取超时
		conn.SetReadDeadline(time.Now().Add(30 * time.Second))
		
		// 处理完整消息
		message := string(messageBuf)
		fmt.Printf("Received message: %s\n", message)
		fmt.Printf("Message length: %d bytes\n", messageLength)
		
		// 这里添加Kafka生产逻辑
		// sendToKafka(message)
	}
}

对于完全流式处理,可以使用缓冲区累积数据:

func handleRequestStreaming(conn net.Conn) {
	defer conn.Close()
	
	conn.SetReadDeadline(time.Now().Add(30 * time.Second))
	
	buffer := make([]byte, 0, 4096) // 初始容量4KB
	tempBuf := make([]byte, 256)    // 每次读取的临时缓冲区
	
	for {
		n, err := conn.Read(tempBuf)
		if err != nil {
			if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
				fmt.Println("Read timeout")
			} else {
				fmt.Println("Error reading:", err.Error())
			}
			break
		}
		
		// 将新数据追加到缓冲区
		buffer = append(buffer, tempBuf[:n]...)
		
		// 处理缓冲区中的完整消息(假设以换行符分隔)
		for {
			newlineIndex := -1
			for i, b := range buffer {
				if b == '\n' {
					newlineIndex = i
					break
				}
			}
			
			if newlineIndex == -1 {
				break // 没有完整消息,继续读取
			}
			
			// 提取完整消息
			message := buffer[:newlineIndex]
			buffer = buffer[newlineIndex+1:]
			
			// 处理消息
			fmt.Printf("Received message: %s\n", string(message))
			fmt.Printf("Message length: %d bytes\n", len(message))
			
			// 这里添加Kafka生产逻辑
			// sendToKafka(string(message))
		}
		
		// 重置读取超时
		conn.SetReadDeadline(time.Now().Add(30 * time.Second))
	}
}

关于TCP连接保持:对于实时数据流,保持连接开放是标准做法,避免了频繁建立和断开连接的开销。你的方法在架构上是正确的,只需要优化数据读取逻辑以适应不定长的数据流。

回到顶部