Golang中如何更准确地定义UDP连接会话
Golang中如何更准确地定义UDP连接会话 在监听UDP套接字消息时,我希望能够以某种方式确定数据包的来源,并将其分发到不同的会话中,以便获得更详细的接收数据报告。目前我只是简单地通过查找当前会话并记录到其通道中来实现,我想知道是否有更优雅的方法?
for {
buff := bufPool.Get().([]byte)
size, addr, err := pc.ReadFromUDP(buff[0:])
if err != nil {
done <- err
return
}
switch s, ok := getSession(addr); ok {
case true:
s.buffer <- buff[0:size]
bufPool.Put(buff)
s.expiration = time.Now().Add(time.Duration(time.Second * 10)).UnixNano()
atomic.AddInt64(&s.countMessage, 1)
atomic.AddInt64(&s.len, int64(size))
case false:
s := &session{
id: rand.Int(),
conn: addr,
expiration: time.Now().UnixNano(),
buffer: make(chan []byte, 64),
run: func(wg *sync.WaitGroup, in chan []byte, ip string) {
for b := range in {
var m Message
err := json.Unmarshal(b, &m)
if err != nil {
log.Fatal(err)
continue
}
m.Device_ip = ip
out <- m
}
},
}
wg.Add(1)
s.buffer <- buff[0:size]
bufPool.Put(buff)
atomic.AddInt64(&s.countMessage, 1)
atomic.AddInt64(&s.len, int64(size))
go s.run(&wg, s.buffer, s.conn.IP.String())
更多关于Golang中如何更准确地定义UDP连接会话的实战教程也可以访问 https://www.itying.com/category-94-b0.html
4 回复
感谢您的回答,能否请您提供关于您回答中按位运算的资源或评论?
更多关于Golang中如何更准确地定义UDP连接会话的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
德米特里,这个解决方案可能比实际需要的更复杂。我原本试图将比特位打包到特定大小的整数中,以使解决方案更高效地利用缓存,但现在重新审视后,我认为这样做不会带来显著的收益。我建议改为:
type key struct {
zone string
addr [16]byte
port int16
ipv6 bool
}
func makeKeyFromUDPAddr(u *net.UDPAddr) key {
k := key{
zone: u.Zone,
port: int16(u.Port),
ipv6: len(u.IP) == 16,
}
copy(k.addr[:], u.IP)
return k
}
func (k key) IP() net.IP {
if k.ipv6 {
return net.IP(k.addr[:16])
}
return net.IP(k.addr[:4])
}
var sessions = make(map[key]*session)
你好,德米特里,
如果你将 net.UDPAddr 中的数据复制到一个“扁平”的结构体中(我指的是一个内部不包含切片或指针的结构体),你就可以将其用作映射的键:
type key struct {
bits uint32
addr [16]byte
zone string
}
func makeKeyFromUDPAddr(u *net.UDPAddr) (k key) {
k.bits = int32(u.Port | ((copy(k.addr[:], u.IP) & 0x10) << 12))
k.zone = u.Zone
}
func (k key) port() int {
return k.bits & 0xffff
}
func (k key) addrLen() int {
// 第16位表示IPv6
shifter := (k.bits & 0x10000) >> 15
return 4 << shifter
}
func (k key) IP() net.IP {
return net.IP(k.addr[:k.addrLen()])
}
var sessions = make(map[key]*session)
在UDP连接会话管理中,可以通过更结构化的方式来处理会话标识和生命周期管理。以下是改进方案:
type SessionManager struct {
sessions sync.Map // key: string(addr), value: *Session
timeout time.Duration
}
type Session struct {
ID int
Addr *net.UDPAddr
LastActive time.Time
Buffer chan []byte
MessageCnt int64
DataLen int64
Cancel context.CancelFunc
}
func NewSessionManager(timeout time.Duration) *SessionManager {
return &SessionManager{
timeout: timeout,
}
}
func (sm *SessionManager) GetOrCreate(addr *net.UDPAddr) *Session {
key := addr.String()
if sess, ok := sm.sessions.Load(key); ok {
s := sess.(*Session)
s.LastActive = time.Now()
return s
}
ctx, cancel := context.WithCancel(context.Background())
s := &Session{
ID: rand.Int(),
Addr: addr,
LastActive: time.Now(),
Buffer: make(chan []byte, 64),
Cancel: cancel,
}
sm.sessions.Store(key, s)
go s.processMessages(ctx)
go sm.monitorSession(ctx, key, s)
return s
}
func (sm *SessionManager) monitorSession(ctx context.Context, key string, s *Session) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if time.Since(s.LastActive) > sm.timeout {
sm.sessions.Delete(key)
s.Cancel()
close(s.Buffer)
return
}
}
}
}
func (s *Session) processMessages(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case data, ok := <-s.Buffer:
if !ok {
return
}
var msg Message
if err := json.Unmarshal(data, &msg); err == nil {
msg.DeviceIP = s.Addr.IP.String()
atomic.AddInt64(&s.MessageCnt, 1)
atomic.AddInt64(&s.DataLen, int64(len(data)))
out <- msg
}
}
}
}
// 使用示例
func main() {
sm := NewSessionManager(10 * time.Second)
pc, err := net.ListenUDP("udp", &net.UDPAddr{Port: 8080})
if err != nil {
log.Fatal(err)
}
bufPool := sync.Pool{
New: func() interface{} {
return make([]byte, 1500)
},
}
for {
buf := bufPool.Get().([]byte)
n, addr, err := pc.ReadFromUDP(buf)
if err != nil {
continue
}
session := sm.GetOrCreate(addr)
session.Buffer <- buf[:n]
bufPool.Put(buf)
}
}
这个实现提供了:
- 使用
sync.Map进行线程安全的会话存储 - 独立的会话监控协程处理超时清理
- 上下文管理确保资源正确释放
- 结构化的会话状态管理
- 明确的会话生命周期控制
每个UDP地址对应一个会话实例,包含独立的处理协程和状态跟踪,能够准确记录每个来源的数据统计。

