Golang解析缓冲数组时遇到困难

Golang解析缓冲数组时遇到困难 以下是数据的传入方式

[2 68 52 50 58 49 48 32]
[32 48 32 32 48 32 32 48]
[48 49 48 49 48 32 32 32]
[32 3 120]
[2 68 52 50 58 49 49 32]
[32 48 32 32 48 32 32 48]
[48 49 48 49 48 32 32 32]
[32 3 121]
[2 68 52 50 58 49 50 32]
[32 48 32 32 48 32 32 48]
[48 49 48 49 48 32 32 32]
[32 3 122]
[2 68 52 50 58 49 51 32]
[32 48 32 32 48 32 32 48]
[48 49 48 49 48 32 32 32]
[32 3 123]
[2 68 52 50 58 49 52 32]
[32 48 32 32 48 32 32 48]
[48 49 48 48 48 32 32 32]
[32 3 125]
[2 68 52 50 58 49 52 32]
[32 48 32 32 48 32 32 48]
[48 49 51 48 48 32 32 32]
[32 3 126 2 68 52 50 58 49 52 32 32 48 32 32 48]
[32 32 48 48 49 51 48 48]
[32 32 32 32 3 126]
[2 68 52 50 58 49 52 32]
[32 48 32 32 48 32 32 48]
[48 49 51 48 48 32 32 32]
[32 3 126]
[2 68 52 50 58 49 52 32]
[32 48 32 32 48 32 32 48]
[48 49 48 49 48 32 32 32]
[32 3 124 2 68 52 50 58 49 52 32 32 48 32 32 48]
[32 32 48 48 49 48 49 48]
[32 32 32 32 3 124]
[2 68 52 50 58 49 53 32]
[32 48 32 32 48 32 32 48]
[48 49 48 49 48 32 32 32]
[32 3 125]

这是我演示此过程的方式

conn, err := net.Dial("tcp", "192.168.2.64:9001")

	if err != nil {
		fmt.Println("Connection not established")
	}

	fmt.Println(conn.LocalAddr())

	for {
		buffer := make([]byte, 1024)

		bytesRead, err := conn.Read(buffer)

		if err != nil {
			fmt.Println(err)
		}

		fmt.Println(buffer[:bytesRead])

	}

这是我用来尝试解析它的程序。

package main

import (
	"bufio"
	"fmt"
	"io"
	"net"
)

func calculateXORChecksum(data []byte) byte {
	var checksum byte
	for _, b := range data {
		checksum ^= b
	}
	return checksum
}

func findByte(data []byte) int {

	for index, aByte := range data {
		if aByte == 2 {
			return index
		}
	}
	return -1
}
func readSTXETXWithChecksum(conn net.Conn) ([]byte, byte, error) {
	reader := bufio.NewReader(conn)

	// Read until we get the ETX (ASCII 3) character
	message, err := reader.ReadBytes(3)
	if err != nil {
		return []byte(""), 0, err
	}

	checksumByte, err := reader.ReadByte()
	if err != nil {
		return []byte(""), 0, err

	}

	data := append(message, checksumByte) // All characters including the checksum

	return data, checksumByte, nil
}

func main() {
	conn, err := net.Dial("tcp", "192.168.2.64:9001")

	if err != nil {
		fmt.Println("Connection not established")
	}

	fmt.Println(conn.LocalAddr())

	for {
		fmt.Println()

		data, _, err := readSTXETXWithChecksum(conn)
		if err == io.EOF {
			fmt.Println("Connection closed by the peer")
		} else if err != nil {
			fmt.Println("Error reading message:", err)
		} else {
			fmt.Println("Received data:", []byte(data))

			fmt.Println("Received data:", string([]byte(data)[:len(data)-1]))

			fmt.Println()
		}

	}

}

问题是,当我收到像 [32 3 126 2 68 52 50 58 49 52 32 32 48 32 32 48] 这样的字节数组时,3校验和字节 之后的任何内容都会被丢弃。这是因为在 for 循环的每次迭代中都声明了一个新的扫描器。然而,如果我在 for 循环外部声明一个读取器,缓冲区数组最终会被填满,并且当它满了之后声明新的读取器时,可能会丢失数据。

我为自己无法解决看似简单的问题而感到相当愚蠢。 我尝试了一些手动方法,但无法使其正常工作,并且觉得这不是正确的方法。

buffer := make([]byte, 1024)
	var aLine []byte

	var remaining []byte
	for {

		numberRead, err := conn.Read(buffer)

		if err != nil {
			fmt.Println("the error")
		}

		if len(remaining) != 0 {
			aLine = append(remaining, buffer[:numberRead]...)
		}
		// fmt.Println(aLine)

		foundEnd := findByte(buffer[:numberRead])
		if foundEnd != -1 {

			aLine = append(aLine, buffer[:foundEnd+1]...)
			fmt.Println(aLine)
			aLine = nil
			if numberRead > foundEnd+1 {
				leftover := buffer[foundEnd+1 : numberRead]
				remaining = append(remaining, leftover...)
			}

		} else {
			aLine = append(aLine, buffer[:numberRead]...)
		}

更多关于Golang解析缓冲数组时遇到困难的实战教程也可以访问 https://www.itying.com/category-94-b0.html

6 回复

好的,谢谢,这能解决我每次读取26字节的问题。

那么,当我从TCP连接读取数据时,我担心如果出现以下情况会发生什么:

reader := bufio.NewReaderSize(conn, 1024)

缓冲区数组被填充到1022字节,然后又接收了6个字节。

你对这部分有什么见解吗?

更多关于Golang解析缓冲数组时遇到困难的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


然而,如果我在 for 循环外部声明一个 reader,缓冲区数组最终会被填满,并且当它满了时声明一个新的,可能会导致数据丢失。

显然,在外部定义 Reader 是正确的。 我不太清楚你在担心什么?不知道你会如何丢失数据?你正在使用的 ReadBytes 函数不会导致这个问题。 如果你还有疑问,请写下原因,我无法知道你在担心什么。

什么也不会发生,数据不会凭空消失。

buf:=make([]byte,1022)

//r := bufio.Reader // 大小为1024且已满
n, _ = r.Read(buf)
// n 将为 1022
n, _ = r.Read(buf)
// n 将为 1024 - 1022 = 2

n, _ = r.Read(buf)
// 下一次读取源数据;n <= 1022

请注意,我需要提醒你一点(也许你已经知道),对于TCP连接,它是流式的,io.Reader接口并不保证你每次都能读取到缓冲区大小的数据。

我可能因为之前遇到的一个问题而感到困惑。

	f, err := os.Open("./serial-formated.txt")
	if err != nil {
		fmt.Println("err")
	}

	defer f.Close()
	reader := bufio.NewReaderSize(f, 4082)

	startByte := 2
	endByte := 3
	for {

		n := 26

		// Read n bytes from standard input
		buf := make([]byte, n)
		numRead, err := reader.Read(buf)
		readChar := []byte(buf[:numRead])

我需要读取每26个字节,因此必须确保读取器的缓冲区大小能被26整除(这里是4082)。否则,有时它可能无法每次都恰好读取26个字节。

所以,当我从TCP连接读取数据时,我担心如果出现以下情况会发生什么: reader := bufio.NewReaderSize(conn, 1024) 缓冲区数组已填充了1022个字节,然后又接收了6个字节。

如果你想确保读取足够的字节,为什么不使用 io.ReadFull 呢? io.ReadFull(reader, buf)

io.Reader 接口只会根据你传入的 buf 大小来确定上限,而 bufio.Reader 在内部是有缓冲的。当你调用 Read 时,它会预先读取一部分到自己的缓冲区,然后从缓冲区将数据传送到你的 buf 中。关于这一点,可以阅读源代码,这并不难。

// Read reads data into p.
// It returns the number of bytes read into p.
// The bytes are taken from at most one Read on the underlying [Reader],
// hence n may be less than len(p).
// To read exactly len(p) bytes, use io.ReadFull(b, p).
// If the underlying [Reader] can return a non-zero count with io.EOF,
// then this Read method can do so as well; see the [io.Reader] docs.
func (b *Reader) Read(p []byte) (n int, err error) {
	n = len(p)
	if n == 0 {
		if b.Buffered() > 0 {
			return 0, nil
		}
		return 0, b.readErr()
	}
	if b.r == b.w {
		if b.err != nil {
			return 0, b.readErr()
		}
		if len(p) >= len(b.buf) {
			// Large read, empty buffer.
			// Read directly into p to avoid copy.
			n, b.err = b.rd.Read(p)
			if n < 0 {
				panic(errNegativeRead)
			}
			if n > 0 {
				b.lastByte = int(p[n-1])
				b.lastRuneSize = -1
			}
			return n, b.readErr()
		}
		// One read.
		// Do not use b.fill, which will loop.
		b.r = 0
		b.w = 0
		n, b.err = b.rd.Read(b.buf)
		if n < 0 {
			panic(errNegativeRead)
		}
		if n == 0 {
			return 0, b.readErr()
		}
		b.w += n
	}

	// copy as much as we can
	// Note: if the slice panics here, it is probably because
	// the underlying reader returned a bad count. See issue 49795.
	n = copy(p, b.buf[b.r:b.w])
	b.r += n
	b.lastByte = int(b.buf[b.r-1])
	b.lastRuneSize = -1
	return n, nil
}
	n = copy(p, b.buf[b.r:b.w])
	b.r += n
	b.lastByte = int(b.buf[b.r-1])
	b.lastRuneSize = -1
	return n, nil

可见,在 bufio.Reader 中,会优先消耗缓冲区的内容。在你的例子中,当使用一个非 26 的倍数时,会返回一个非 26 的长度,因为 bufio.Reader 的缓冲区大小不是 26 的倍数。 显然的结论是,不会发生数据丢失。

从你提供的数据来看,这是基于STX(2)/ETX(3)分隔的协议数据。问题在于reader.ReadBytes(3)会读取到第一个ETX字节,但后续数据可能包含多个消息。以下是修复方案:

package main

import (
	"bytes"
	"fmt"
	"net"
)

func parseMessages(data []byte) [][]byte {
	var messages [][]byte
	start := 0
	
	for i := 0; i < len(data); i++ {
		if data[i] == 3 && i+1 < len(data) { // 找到ETX且后面有校验和
			if start < i && data[start] == 2 { // 确保以STX开始
				message := data[start : i+2] // 包含ETX和校验和
				messages = append(messages, message)
				start = i + 2 // 移动到下一个消息开始
				i = start - 1 // 调整循环索引
			}
		} else if data[i] == 2 && i > start { // 发现新的STX,但未找到完整消息
			start = i
		}
	}
	
	// 返回剩余的不完整数据
	if start < len(data) {
		return messages, data[start:]
	}
	return messages, nil
}

func main() {
	conn, err := net.Dial("tcp", "192.168.2.64:9001")
	if err != nil {
		fmt.Println("Connection not established:", err)
		return
	}
	defer conn.Close()

	fmt.Println("Connected:", conn.LocalAddr())

	buffer := make([]byte, 1024)
	var leftover []byte

	for {
		n, err := conn.Read(buffer)
		if err != nil {
			fmt.Println("Read error:", err)
			break
		}

		// 合并剩余数据和新的数据
		var data []byte
		if len(leftover) > 0 {
			data = append(leftover, buffer[:n]...)
			leftover = nil
		} else {
			data = buffer[:n]
		}

		// 解析完整消息
		messages, remaining := parseMessages(data)
		
		// 处理每个完整消息
		for _, msg := range messages {
			if len(msg) >= 3 && msg[0] == 2 && msg[len(msg)-2] == 3 {
				// 提取数据和校验和
				messageData := msg[1 : len(msg)-2]
				receivedChecksum := msg[len(msg)-1]
				
				// 计算校验和
				calculatedChecksum := byte(0)
				for _, b := range msg[:len(msg)-1] {
					calculatedChecksum ^= b
				}
				
				if calculatedChecksum == receivedChecksum {
					fmt.Printf("Valid message: %s\n", string(messageData))
					fmt.Printf("Raw bytes: %v\n", msg)
				} else {
					fmt.Printf("Checksum error: calculated %d, received %d\n", 
						calculatedChecksum, receivedChecksum)
				}
			}
		}
		
		// 保存剩余的不完整数据
		if len(remaining) > 0 {
			leftover = remaining
		}
	}
}

对于你的测试数据,这里是一个解析示例:

func testParse() {
	// 模拟你的数据
	testData := []byte{
		32, 3, 126, 2, 68, 52, 50, 58, 49, 52, 32, 32, 48, 32, 32, 48,
		32, 32, 48, 48, 49, 51, 48, 48, 32, 32, 32, 32, 3, 126,
	}
	
	messages, remaining := parseMessages(testData)
	
	fmt.Printf("Found %d messages\n", len(messages))
	for i, msg := range messages {
		fmt.Printf("Message %d: %v\n", i, msg)
	}
	fmt.Printf("Remaining: %v\n", remaining)
}

关键点:

  1. 使用缓冲区累积数据,而不是每次创建新的读取器
  2. parseMessages函数处理多个消息和跨读取的数据包
  3. 正确处理STX(2)和ETX(3)边界
  4. 保留不完整的消息数据用于下一次读取
  5. 验证每个消息的校验和

这种方法可以处理你提到的[32 3 126 2 68 ...]这种情况,其中包含多个消息或消息片段。

回到顶部