Golang中UDP服务器架构设计问题求助

Golang中UDP服务器架构设计问题求助 在过去的几年里,我一直在开发一个简单的UDP数据包服务器,计划用于一个业余爱好者的视频游戏。我最初使用C++和标准UDP套接字编写了这个服务器。(我的最后一个版本可以在这里查看

我的C++架构基本如下:

  • 包含序列化游戏数据的UDP数据包,根据操作码进行解码。
  • 一个阻塞的收集器线程,专门负责直接从套接字拉取数据包,然后将数据包传递给工作线程进行处理。
  • [X] 工作线程检查数据包操作码,将数据包转换为相应的数据包类型,然后根据类型调用处理函数。

在我看来,这似乎是开发服务器架构一个非常标准的起点,而且似乎运行得相当不错。一段时间后,我对代码日益增长的复杂性感到有些厌倦,并想知道是否有更好的语言可以用来开发服务器。这让我接触到了Go,我开始对学习这门语言并尝试用它构建我的服务器产生了兴趣。

我基本上已经让新版本的服务器(在这里查看)达到了与我的C++服务器相同的功能水平,现在我正面临几个关键的设计决策,希望得到一些有经验的建议:

目前我的服务器基本上是我在C++中所做工作的翻版:

  • 我有几乎相同用途的数据包类型。
  • 一个收集器goroutine,它只是等待套接字上的数据包,并将它们放入一个Go通道中。
  • [X] 处理程序goroutine,等待数据包到达通道,并根据其数据包类型进行处理。

这是在Go中正确的做法吗?我仍在学习goroutine的复杂性,甚至还没有为数据库等设置互斥锁/信号量……我希望在这里能得到一些建议,在我深入这个架构之前为我指明正确的方向。

话虽如此,它目前是能工作的,而且我可能已经在遵循良好的设计了!我期待你们的回复。


更多关于Golang中UDP服务器架构设计问题求助的实战教程也可以访问 https://www.itying.com/category-94-b0.html

5 回复

我会首先摒弃使用通道,而是让 Collect 为每个到达的数据包启动一个新的 goroutine 来处理。

func main() {
    fmt.Println("hello world")
}

更多关于Golang中UDP服务器架构设计问题求助的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


@amnon 你认为启动一个新的 goroutine 会有多大开销?

这是一个很好的例子,实际上我只是不知道在惯用的 Go 设计中,期望是怎样的。

开销出人意料地小……为每条消息启动一个新的 Goroutine 是非常地道的做法。 你预期的消息速率是多少?我预计你每秒能够处理数千条消息而没有任何问题。

尝试一下并对性能进行基准测试。

我可以尝试一下。我还需要创建一个更科学的基准测试,用于收集处理给定数据包的平均时间。特别是如果我能混合数据包类型并在它们背后分配更多逻辑,我将能够得出一个平均处理时间。

我还有一个问题是,创建比线程更多的goroutine是否值得。我知道goroutine不依赖于线程,但我假设随着并发数量的增长,任何单个goroutine的执行总延迟会随着调度尝试而变得越来越大。

目前,我仅生成足够多的goroutine来饱和我电脑上的最大并行线程数,结合Go通道,我认为这将限制任何单个数据包执行的延迟。这也将允许我保持某种自然的数据包顺序(而不是显式地为每个数据包编号)。

你的UDP服务器架构在Go中的实现方向基本正确,利用了goroutine和channel的并发特性。以下是一个更符合Go惯用法的示例实现,包含必要的并发控制和错误处理:

package main

import (
    "encoding/binary"
    "errors"
    "fmt"
    "net"
    "sync"
)

const (
    maxPacketSize = 1500
    workerCount   = 4
)

type PacketType uint8

const (
    PacketTypeMove PacketType = iota + 1
    PacketTypeAttack
    PacketTypeChat
)

type Packet struct {
    ClientAddr *net.UDPAddr
    Type       PacketType
    Data       []byte
}

type Server struct {
    conn        *net.UDPConn
    packetChan  chan Packet
    wg          sync.WaitGroup
    quit        chan struct{}
    handlers    map[PacketType]func(Packet)
    handlerLock sync.RWMutex
}

func NewServer(port int) (*Server, error) {
    addr := &net.UDPAddr{Port: port}
    conn, err := net.ListenUDP("udp", addr)
    if err != nil {
        return nil, err
    }

    s := &Server{
        conn:       conn,
        packetChan: make(chan Packet, 1024),
        quit:       make(chan struct{}),
        handlers:   make(map[PacketType]func(Packet)),
    }

    s.registerDefaultHandlers()
    return s, nil
}

func (s *Server) registerDefaultHandlers() {
    s.RegisterHandler(PacketTypeMove, s.handleMove)
    s.RegisterHandler(PacketTypeAttack, s.handleAttack)
    s.RegisterHandler(PacketTypeChat, s.handleChat)
}

func (s *Server) RegisterHandler(packetType PacketType, handler func(Packet)) {
    s.handlerLock.Lock()
    defer s.handlerLock.Unlock()
    s.handlers[packetType] = handler
}

func (s *Server) Start() {
    s.wg.Add(workerCount + 1)
    
    go s.receiver()
    
    for i := 0; i < workerCount; i++ {
        go s.worker(i)
    }
}

func (s *Server) receiver() {
    defer s.wg.Done()
    
    buffer := make([]byte, maxPacketSize)
    
    for {
        select {
        case <-s.quit:
            return
        default:
            n, addr, err := s.conn.ReadFromUDP(buffer)
            if err != nil {
                if !errors.Is(err, net.ErrClosed) {
                    fmt.Printf("Read error: %v\n", err)
                }
                continue
            }
            
            if n < 1 {
                continue
            }
            
            packetType := PacketType(buffer[0])
            data := make([]byte, n-1)
            copy(data, buffer[1:n])
            
            s.packetChan <- Packet{
                ClientAddr: addr,
                Type:       packetType,
                Data:       data,
            }
        }
    }
}

func (s *Server) worker(id int) {
    defer s.wg.Done()
    
    for {
        select {
        case <-s.quit:
            return
        case packet := <-s.packetChan:
            s.handlerLock.RLock()
            handler, exists := s.handlers[packet.Type]
            s.handlerLock.RUnlock()
            
            if exists {
                handler(packet)
            } else {
                fmt.Printf("Worker %d: No handler for packet type %d\n", id, packet.Type)
            }
        }
    }
}

func (s *Server) handleMove(packet Packet) {
    if len(packet.Data) < 8 {
        return
    }
    x := binary.LittleEndian.Uint32(packet.Data[0:4])
    y := binary.LittleEndian.Uint32(packet.Data[4:8])
    fmt.Printf("Client %s moved to (%d, %d)\n", packet.ClientAddr.String(), x, y)
}

func (s *Server) handleAttack(packet Packet) {
    if len(packet.Data) < 4 {
        return
    }
    targetID := binary.LittleEndian.Uint32(packet.Data[0:4])
    fmt.Printf("Client %s attacked target %d\n", packet.ClientAddr.String(), targetID)
}

func (s *Server) handleChat(packet Packet) {
    fmt.Printf("Client %s: %s\n", packet.ClientAddr.String(), string(packet.Data))
}

func (s *Server) Stop() {
    close(s.quit)
    s.conn.Close()
    s.wg.Wait()
}

func main() {
    server, err := NewServer(8080)
    if err != nil {
        panic(err)
    }
    
    server.Start()
    defer server.Stop()
    
    select {}
}

这个实现展示了几个关键点:

  1. 结构化设计:使用Server结构体封装状态和方法
  2. 并发安全:使用sync.RWMutex保护handlers映射的并发访问
  3. 优雅关闭:通过quit通道和WaitGroup实现有序关闭
  4. 工作池模式:固定数量的worker goroutine处理数据包
  5. 处理器注册:支持动态注册和替换数据包处理器

对于数据库访问,可以在Server结构体中添加数据库连接池:

type Server struct {
    // ... 现有字段
    db        *sql.DB
    dbLock    sync.RWMutex
}

func (s *Server) QueryPlayerData(playerID uint32) (PlayerData, error) {
    s.dbLock.RLock()
    defer s.dbLock.RUnlock()
    
    var data PlayerData
    err := s.db.QueryRow("SELECT * FROM players WHERE id = ?", playerID).Scan(&data)
    return data, err
}

这种架构利用了Go的并发原语,避免了显式的线程管理,通过channel进行goroutine间通信,符合Go的并发模式。

回到顶部