高负载下Golang缓冲区损坏问题探究

高负载下Golang缓冲区损坏问题探究 我正在编写一个简单的服务器来连接DLMS电表并从中获取读数,但在高负载下遇到了缓冲区损坏的问题,而在少量连接时一切正常。以下是我所做的全部内容。 (请不要在意硬编码,目的是为了测试)

package internal

import (
	"encoding/hex"
	"fmt"
	"net"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"time"
)

type Meter struct {
	socket      *net.TCPConn
	state       State
	meterSerial string
	mutex       sync.Mutex
}


func NewMeter(socket *net.TCPConn) *Meter {
	return &Meter{
		socket: socket,
		state:  Connected,
	}
}

func (m *Meter) Read() {
	for {
		data := make([]byte, 1024)
		m.mutex.Lock()
		n, err := m.socket.Read(data)
		m.mutex.Unlock()
		if err == net.ErrClosed {
			m.socket.Close()
			return
		}
		if n > 0 {
			hexString := hex.EncodeToString(data[:n])
			fmt.Printf("Read %d bytes\nData: %s\n", n, hexString)
			m.OnData(hexString)
		}
	}
}

func (m *Meter) OnData(hexString string) {
	if hexString == Handshake {
		if m.state == Connected {
			response, err := hex.DecodeString(AARE)
			if err != nil {
				fmt.Println(err)
			}
			go m.socket.Write(response)
			m.state = ReadAARQ
			return
		} else {
			response, err := hex.DecodeString(Handshake)
			if err != nil {
				fmt.Println(err)
			}
			go m.socket.Write(response)
			return
		}
	}

	if m.state == ReadAARQ {
		if hexString == AARQ {
			response, err := hex.DecodeString(GetSerialRequest)
			if err != nil {
				fmt.Println(err)
			}
			go m.socket.Write(response)
			m.state = ReadSerial
			return
		}
	}

	if m.state == ReadSerial {
		m.meterSerial = parseSerial(hexString)
		from := time.Date(2023, 1, 1, 0, 0, 0, 0, time.Local)
		m.getDailyBilling(from, time.Now())
		return
	}

	if m.state == ReadDailyBilling {
		parseDailyBilling(hexString)

		if hexString[18:20] == "02" && hexString[22:24] != "01" {
			response, _ := hex.DecodeString(fmt.Sprintf("0001001100010007c002c1%s", hexString[24:32]))
			go m.socket.Write(response)
			return
		}

		m.state = Idle
	}
}

func parseSerial(hexData string) string {
	if !strings.HasPrefix(hexData, "0001000100110012c401c100090c") {
		return ""
	}

	bytes, err := hex.DecodeString(hexData[28:])
	if err != nil {
		fmt.Println(err)
	}

	return string(bytes)
}

func (m *Meter) getDailyBilling(dates ...time.Time) {
	var from, to time.Time
	if len(dates) > 2 || len(dates) == 0 {
		return
	}
	if len(dates) == 1 {
		from = dates[0]
		to = dates[0].AddDate(0, 0, 1)
	}
	if len(dates) == 2 {
		from = dates[0]
		to = dates[1]
	}

	response, err := hex.DecodeString(fmt.Sprintf("0001001100010040c001c100070000620200ff0201010204020412000809060000010000ff0f02120000090c%s090c%s0100", dateToHex(from), dateToHex(to)))
	if err != nil {
		fmt.Println(err)
	}
	go m.socket.Write(response)
	m.state = ReadDailyBilling
}

func parseDailyBilling(hexData string) []map[string]interface{} {
	data := strings.Split(hexData, "0216090c")
	if len(data) == 0 {
		return nil
	}

	result := make([]map[string]interface{}, 0)

	for _, dailyData := range data[1:] {
		valid := true
		if !strings.HasPrefix(dailyData, "07") {
			valid = false
		}
		date := dlmsHexToDate(dailyData[0:24])
		vals := regexp.MustCompile(".{1,10}").FindAllString(dailyData[24:234], -1)

		reading := make([]int, 0)

		for _, val := range vals {
			if !strings.HasPrefix(val, "06") {
				valid = false
			}
			decVal, _ := hex.DecodeString(val[2:])
			reading = append(reading, int(decVal[0]))
		}

		if !valid {
			fmt.Println("whole buff:", data)
			fmt.Println("date:", dailyData[0:24])
			fmt.Println("vals:", vals)
			panic("Bleeeee")
		}

		if valid {
			result = append(result, map[string]interface{}{
				"date":    date,
				"reading": reading,
			})
		}
	}
	return result
}

func dateToHex(date time.Time) string {
	return fmt.Sprintf("0%02x%02x%02x%02x%02x%02x%02xFFFED400", date.Year(), int(date.Month()), date.Day(), int(date.Weekday()), date.Hour(), date.Minute(), date.Second())
}

func dlmsHexToDate(hexString string) time.Time {
	hexyear, _ := hex.DecodeString(hexString[0:4])
	hexmonth, _ := hex.DecodeString(hexString[4:6])
	hexday, _ := hex.DecodeString(hexString[6:8])
	hexhour, _ := hex.DecodeString(hexString[10:12])
	hexminute, _ := hex.DecodeString(hexString[12:14])
	hexsecond, _ := hex.DecodeString(hexString[14:16])

	year, _ := strconv.ParseInt(string(hexyear), 0, 16)
	month, _ := strconv.ParseInt(string(hexmonth), 0, 16)
	day, _ := strconv.ParseInt(string(hexday), 0, 16)
	hour, _ := strconv.ParseInt(string(hexhour), 0, 16)
	minute, _ := strconv.ParseInt(string(hexminute), 0, 16)
	second, _ := strconv.ParseInt(string(hexsecond), 0, 16)

	return time.Date(int(year), time.Month(month), int(day), int(hour), int(minute), int(second), 0, time.UTC)
}

更多关于高负载下Golang缓冲区损坏问题探究的实战教程也可以访问 https://www.itying.com/category-94-b0.html

4 回复

你介意预约一个会议时段,让我们在线讨论一下你的代码吗?

更多关于高负载下Golang缓冲区损坏问题探究的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


感谢您的回答。仪表使用BER编码,我遇到了两种类型的缓冲区损坏:

  1. 缓冲区大小小于预期。 例如:00 01 00 01 00 11 00 04 c0 01 c4 01 其中 00 04 是 uint16 类型的 PDU 大小,有时我收到的 PDU 小于 4 字节。

  2. 无效字节。 例如:本应收到 00 01 00 01 00 11 00 04 c0 01 c4 01,却收到了 00 01 00 01 00 11 00 04 c0 01 c4 21 之类的数据。

需要更多关于corrupted buffer problem的细节,你期望看到什么,实际又看到了什么?

快速浏览了你的代码,这里有一些建议(在代码块中注释)。顺便说一句,GitHub 可能是进行代码审查更合适的平台。

func (m *Meter) OnData(hexString string) {
	if hexString == Handshake {
		if m.state == Connected {
			response, err := hex.DecodeString(AARE)
			if err != nil {
				fmt.Println(err) // 发生错误时应返回
			}
			go m.socket.Write(response) // 也许使用通道是更好的选择,这样你就不必锁定socket连接
			m.state = ReadAARQ
			return
		} else {
			response, err := hex.DecodeString(Handshake)
			if err != nil {
				fmt.Println(err)
			}
			go m.socket.Write(response)
			return
		}
	}

	if m.state == ReadAARQ {
		if hexString == AARQ {
			response, err := hex.DecodeString(GetSerialRequest)
			if err != nil {
				fmt.Println(err)
			}
			go m.socket.Write(response)
			m.state = ReadSerial
			return
		}
	}

	if m.state == ReadSerial {
		m.meterSerial = parseSerial(hexString)
		from := time.Date(2023, 1, 1, 0, 0, 0, 0, time.Local)
		m.getDailyBilling(from, time.Now())
		return
	}

	if m.state == ReadDailyBilling {
		parseDailyBilling(hexString) // 返回值未被使用

		if hexString[18:20] == "02" && hexString[22:24] != "01" {
			response, _ := hex.DecodeString(fmt.Sprintf("0001001100010007c002c1%s", hexString[24:32]))
			go m.socket.Write(response)
			return
		}

		m.state = Idle
	}
}

在高负载下出现缓冲区损坏问题,主要原因是并发访问共享资源时的竞态条件。以下是关键问题和修复方案:

主要问题

  1. 竞态条件:多个goroutine同时访问m.statem.socket.Write()
  2. 数据竞争Read()OnData()中的并发操作未正确同步
  3. 缓冲区重用hexString在多个goroutine间可能被修改

修复后的代码

package internal

import (
	"encoding/hex"
	"fmt"
	"net"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"time"
)

type Meter struct {
	socket      *net.TCPConn
	state       State
	meterSerial string
	mutex       sync.Mutex
	writeMutex  sync.Mutex
	dataChan    chan []byte
}

func NewMeter(socket *net.TCPConn) *Meter {
	m := &Meter{
		socket:   socket,
		state:    Connected,
		dataChan: make(chan []byte, 100),
	}
	go m.processData()
	return m
}

func (m *Meter) Read() {
	buffer := make([]byte, 1024)
	for {
		n, err := m.socket.Read(buffer)
		if err != nil {
			if err == net.ErrClosed {
				m.socket.Close()
				return
			}
			continue
		}
		
		if n > 0 {
			dataCopy := make([]byte, n)
			copy(dataCopy, buffer[:n])
			m.dataChan <- dataCopy
		}
	}
}

func (m *Meter) processData() {
	for data := range m.dataChan {
		m.mutex.Lock()
		hexString := hex.EncodeToString(data)
		fmt.Printf("Read %d bytes\nData: %s\n", len(data), hexString)
		m.OnData(hexString)
		m.mutex.Unlock()
	}
}

func (m *Meter) OnData(hexString string) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	if hexString == Handshake {
		if m.state == Connected {
			response, err := hex.DecodeString(AARE)
			if err != nil {
				fmt.Println(err)
				return
			}
			m.safeWrite(response)
			m.state = ReadAARQ
			return
		} else {
			response, err := hex.DecodeString(Handshake)
			if err != nil {
				fmt.Println(err)
				return
			}
			m.safeWrite(response)
			return
		}
	}

	if m.state == ReadAARQ {
		if hexString == AARQ {
			response, err := hex.DecodeString(GetSerialRequest)
			if err != nil {
				fmt.Println(err)
				return
			}
			m.safeWrite(response)
			m.state = ReadSerial
			return
		}
	}

	if m.state == ReadSerial {
		m.meterSerial = parseSerial(hexString)
		from := time.Date(2023, 1, 1, 0, 0, 0, 0, time.Local)
		m.getDailyBilling(from, time.Now())
		return
	}

	if m.state == ReadDailyBilling {
		if len(hexString) < 32 {
			return
		}
		
		parseDailyBilling(hexString)

		if hexString[18:20] == "02" && hexString[22:24] != "01" {
			response, _ := hex.DecodeString(fmt.Sprintf("0001001100010007c002c1%s", hexString[24:32]))
			m.safeWrite(response)
			return
		}

		m.state = Idle
	}
}

func (m *Meter) safeWrite(data []byte) {
	m.writeMutex.Lock()
	defer m.writeMutex.Unlock()
	
	_, err := m.socket.Write(data)
	if err != nil {
		fmt.Printf("Write error: %v\n", err)
	}
}

func (m *Meter) getDailyBilling(dates ...time.Time) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	var from, to time.Time
	if len(dates) > 2 || len(dates) == 0 {
		return
	}
	if len(dates) == 1 {
		from = dates[0]
		to = dates[0].AddDate(0, 0, 1)
	}
	if len(dates) == 2 {
		from = dates[0]
		to = dates[1]
	}

	response, err := hex.DecodeString(fmt.Sprintf("0001001100010040c001c100070000620200ff0201010204020412000809060000010000ff0f02120000090c%s090c%s0100", 
		dateToHex(from), dateToHex(to)))
	if err != nil {
		fmt.Println(err)
		return
	}
	
	m.safeWrite(response)
	m.state = ReadDailyBilling
}

func parseDailyBilling(hexData string) []map[string]interface{} {
	if len(hexData) < 234 {
		return nil
	}

	data := strings.Split(hexData, "0216090c")
	if len(data) == 0 {
		return nil
	}

	result := make([]map[string]interface{}, 0)

	for _, dailyData := range data[1:] {
		if len(dailyData) < 234 {
			continue
		}
		
		valid := true
		if !strings.HasPrefix(dailyData, "07") {
			valid = false
		}
		
		if len(dailyData) < 24 {
			continue
		}
		date := dlmsHexToDate(dailyData[0:24])
		
		if len(dailyData) < 258 {
			continue
		}
		vals := regexp.MustCompile(".{1,10}").FindAllString(dailyData[24:234], -1)

		reading := make([]int, 0)

		for _, val := range vals {
			if !strings.HasPrefix(val, "06") {
				valid = false
			}
			if len(val) < 3 {
				continue
			}
			decVal, _ := hex.DecodeString(val[2:])
			if len(decVal) > 0 {
				reading = append(reading, int(decVal[0]))
			}
		}

		if !valid {
			fmt.Println("Invalid data format")
			continue
		}

		if valid {
			result = append(result, map[string]interface{}{
				"date":    date,
				"reading": reading,
			})
		}
	}
	return result
}

关键修复点

  1. 添加数据通道:使用带缓冲的channel处理接收到的数据
  2. 分离读写锁:使用独立的mutex控制状态和写操作
  3. 数据复制:在读取时复制数据,避免缓冲区重用
  4. 边界检查:添加字符串长度检查防止越界访问
  5. 错误处理:完善错误处理和资源清理

这些修改确保了在高并发场景下,每个连接的数据处理是线程安全的,避免了缓冲区损坏和数据竞争问题。

回到顶部