Golang中如何更准确地定义UDP连接会话

Golang中如何更准确地定义UDP连接会话 在监听UDP套接字消息时,我希望能够以某种方式确定数据包的来源,并将其分发到不同的会话中,以便获得更详细的接收数据报告。目前我只是简单地通过查找当前会话并记录到其通道中来实现,我想知道是否有更优雅的方法?

完整代码函数

     for {
        buff := bufPool.Get().([]byte)
        size, addr, err := pc.ReadFromUDP(buff[0:])
        if err != nil {
            done <- err
            return
        }
        switch s, ok := getSession(addr); ok {
        case true:
            s.buffer <- buff[0:size]
            bufPool.Put(buff)
            s.expiration = time.Now().Add(time.Duration(time.Second * 10)).UnixNano()
            atomic.AddInt64(&s.countMessage, 1)
            atomic.AddInt64(&s.len, int64(size))
        case false:
            s := &session{
                id:         rand.Int(),
                conn:       addr,
                expiration: time.Now().UnixNano(),
                buffer:     make(chan []byte, 64),
                run: func(wg *sync.WaitGroup, in chan []byte, ip string) {
                    for b := range in {
                        var m Message
                        err := json.Unmarshal(b, &m)
                        if err != nil {
                            log.Fatal(err)
                            continue
                        }
                        m.Device_ip = ip
                        out <- m
                    }
                },
            }
            wg.Add(1)
            s.buffer <- buff[0:size]
            bufPool.Put(buff)
            atomic.AddInt64(&s.countMessage, 1)
            atomic.AddInt64(&s.len, int64(size))
            go s.run(&wg, s.buffer, s.conn.IP.String())

更多关于Golang中如何更准确地定义UDP连接会话的实战教程也可以访问 https://www.itying.com/category-94-b0.html

4 回复

感谢您的回答,能否请您提供关于您回答中按位运算的资源或评论?

更多关于Golang中如何更准确地定义UDP连接会话的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


德米特里,这个解决方案可能比实际需要的更复杂。我原本试图将比特位打包到特定大小的整数中,以使解决方案更高效地利用缓存,但现在重新审视后,我认为这样做不会带来显著的收益。我建议改为:

type key struct {
    zone string
    addr [16]byte
    port int16
    ipv6 bool
}

func makeKeyFromUDPAddr(u *net.UDPAddr) key {
    k := key{
        zone: u.Zone,
        port: int16(u.Port),
        ipv6: len(u.IP) == 16,
    }
    copy(k.addr[:], u.IP)
    return k
}

func (k key) IP() net.IP {
    if k.ipv6 {
        return net.IP(k.addr[:16])
    }
    return net.IP(k.addr[:4])
}

var sessions = make(map[key]*session)

你好,德米特里,

如果你将 net.UDPAddr 中的数据复制到一个“扁平”的结构体中(我指的是一个内部不包含切片或指针的结构体),你就可以将其用作映射的键:

type key struct {
  bits  uint32
  addr  [16]byte
  zone  string
}

func makeKeyFromUDPAddr(u *net.UDPAddr) (k key) {
  k.bits = int32(u.Port | ((copy(k.addr[:], u.IP) & 0x10) << 12))
  k.zone = u.Zone
}

func (k key) port() int {
  return k.bits & 0xffff
}

func (k key) addrLen() int {
  // 第16位表示IPv6
  shifter := (k.bits & 0x10000) >> 15
  return 4 << shifter
}

func (k key) IP() net.IP {
  return net.IP(k.addr[:k.addrLen()])
}

var sessions = make(map[key]*session)

在UDP连接会话管理中,可以通过更结构化的方式来处理会话标识和生命周期管理。以下是改进方案:

type SessionManager struct {
    sessions sync.Map // key: string(addr), value: *Session
    timeout  time.Duration
}

type Session struct {
    ID         int
    Addr       *net.UDPAddr
    LastActive time.Time
    Buffer     chan []byte
    MessageCnt int64
    DataLen    int64
    Cancel     context.CancelFunc
}

func NewSessionManager(timeout time.Duration) *SessionManager {
    return &SessionManager{
        timeout: timeout,
    }
}

func (sm *SessionManager) GetOrCreate(addr *net.UDPAddr) *Session {
    key := addr.String()
    
    if sess, ok := sm.sessions.Load(key); ok {
        s := sess.(*Session)
        s.LastActive = time.Now()
        return s
    }
    
    ctx, cancel := context.WithCancel(context.Background())
    s := &Session{
        ID:         rand.Int(),
        Addr:       addr,
        LastActive: time.Now(),
        Buffer:     make(chan []byte, 64),
        Cancel:     cancel,
    }
    
    sm.sessions.Store(key, s)
    go s.processMessages(ctx)
    go sm.monitorSession(ctx, key, s)
    
    return s
}

func (sm *SessionManager) monitorSession(ctx context.Context, key string, s *Session) {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if time.Since(s.LastActive) > sm.timeout {
                sm.sessions.Delete(key)
                s.Cancel()
                close(s.Buffer)
                return
            }
        }
    }
}

func (s *Session) processMessages(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        case data, ok := <-s.Buffer:
            if !ok {
                return
            }
            var msg Message
            if err := json.Unmarshal(data, &msg); err == nil {
                msg.DeviceIP = s.Addr.IP.String()
                atomic.AddInt64(&s.MessageCnt, 1)
                atomic.AddInt64(&s.DataLen, int64(len(data)))
                out <- msg
            }
        }
    }
}

// 使用示例
func main() {
    sm := NewSessionManager(10 * time.Second)
    
    pc, err := net.ListenUDP("udp", &net.UDPAddr{Port: 8080})
    if err != nil {
        log.Fatal(err)
    }
    
    bufPool := sync.Pool{
        New: func() interface{} {
            return make([]byte, 1500)
        },
    }
    
    for {
        buf := bufPool.Get().([]byte)
        n, addr, err := pc.ReadFromUDP(buf)
        if err != nil {
            continue
        }
        
        session := sm.GetOrCreate(addr)
        session.Buffer <- buf[:n]
        bufPool.Put(buf)
    }
}

这个实现提供了:

  1. 使用sync.Map进行线程安全的会话存储
  2. 独立的会话监控协程处理超时清理
  3. 上下文管理确保资源正确释放
  4. 结构化的会话状态管理
  5. 明确的会话生命周期控制

每个UDP地址对应一个会话实例,包含独立的处理协程和状态跟踪,能够准确记录每个来源的数据统计。

回到顶部