Golang中UDP服务器架构设计问题求助
Golang中UDP服务器架构设计问题求助 在过去的几年里,我一直在开发一个简单的UDP数据包服务器,计划用于一个业余爱好者的视频游戏。我最初使用C++和标准UDP套接字编写了这个服务器。(我的最后一个版本可以在这里查看)
我的C++架构基本如下:
- 包含序列化游戏数据的UDP数据包,根据操作码进行解码。
- 一个阻塞的收集器线程,专门负责直接从套接字拉取数据包,然后将数据包传递给工作线程进行处理。
- [X] 工作线程检查数据包操作码,将数据包转换为相应的数据包类型,然后根据类型调用处理函数。
在我看来,这似乎是开发服务器架构一个非常标准的起点,而且似乎运行得相当不错。一段时间后,我对代码日益增长的复杂性感到有些厌倦,并想知道是否有更好的语言可以用来开发服务器。这让我接触到了Go,我开始对学习这门语言并尝试用它构建我的服务器产生了兴趣。
我基本上已经让新版本的服务器(在这里查看)达到了与我的C++服务器相同的功能水平,现在我正面临几个关键的设计决策,希望得到一些有经验的建议:
目前我的服务器基本上是我在C++中所做工作的翻版:
- 我有几乎相同用途的数据包类型。
- 一个收集器goroutine,它只是等待套接字上的数据包,并将它们放入一个Go通道中。
- [X] 处理程序goroutine,等待数据包到达通道,并根据其数据包类型进行处理。
这是在Go中正确的做法吗?我仍在学习goroutine的复杂性,甚至还没有为数据库等设置互斥锁/信号量……我希望在这里能得到一些建议,在我深入这个架构之前为我指明正确的方向。
话虽如此,它目前是能工作的,而且我可能已经在遵循良好的设计了!我期待你们的回复。
更多关于Golang中UDP服务器架构设计问题求助的实战教程也可以访问 https://www.itying.com/category-94-b0.html
我会首先摒弃使用通道,而是让 Collect 为每个到达的数据包启动一个新的 goroutine 来处理。
func main() {
fmt.Println("hello world")
}
更多关于Golang中UDP服务器架构设计问题求助的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
开销出人意料地小……为每条消息启动一个新的 Goroutine 是非常地道的做法。 你预期的消息速率是多少?我预计你每秒能够处理数千条消息而没有任何问题。
尝试一下并对性能进行基准测试。
我可以尝试一下。我还需要创建一个更科学的基准测试,用于收集处理给定数据包的平均时间。特别是如果我能混合数据包类型并在它们背后分配更多逻辑,我将能够得出一个平均处理时间。
我还有一个问题是,创建比线程更多的goroutine是否值得。我知道goroutine不依赖于线程,但我假设随着并发数量的增长,任何单个goroutine的执行总延迟会随着调度尝试而变得越来越大。
目前,我仅生成足够多的goroutine来饱和我电脑上的最大并行线程数,结合Go通道,我认为这将限制任何单个数据包执行的延迟。这也将允许我保持某种自然的数据包顺序(而不是显式地为每个数据包编号)。
你的UDP服务器架构在Go中的实现方向基本正确,利用了goroutine和channel的并发特性。以下是一个更符合Go惯用法的示例实现,包含必要的并发控制和错误处理:
package main
import (
"encoding/binary"
"errors"
"fmt"
"net"
"sync"
)
const (
maxPacketSize = 1500
workerCount = 4
)
type PacketType uint8
const (
PacketTypeMove PacketType = iota + 1
PacketTypeAttack
PacketTypeChat
)
type Packet struct {
ClientAddr *net.UDPAddr
Type PacketType
Data []byte
}
type Server struct {
conn *net.UDPConn
packetChan chan Packet
wg sync.WaitGroup
quit chan struct{}
handlers map[PacketType]func(Packet)
handlerLock sync.RWMutex
}
func NewServer(port int) (*Server, error) {
addr := &net.UDPAddr{Port: port}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
}
s := &Server{
conn: conn,
packetChan: make(chan Packet, 1024),
quit: make(chan struct{}),
handlers: make(map[PacketType]func(Packet)),
}
s.registerDefaultHandlers()
return s, nil
}
func (s *Server) registerDefaultHandlers() {
s.RegisterHandler(PacketTypeMove, s.handleMove)
s.RegisterHandler(PacketTypeAttack, s.handleAttack)
s.RegisterHandler(PacketTypeChat, s.handleChat)
}
func (s *Server) RegisterHandler(packetType PacketType, handler func(Packet)) {
s.handlerLock.Lock()
defer s.handlerLock.Unlock()
s.handlers[packetType] = handler
}
func (s *Server) Start() {
s.wg.Add(workerCount + 1)
go s.receiver()
for i := 0; i < workerCount; i++ {
go s.worker(i)
}
}
func (s *Server) receiver() {
defer s.wg.Done()
buffer := make([]byte, maxPacketSize)
for {
select {
case <-s.quit:
return
default:
n, addr, err := s.conn.ReadFromUDP(buffer)
if err != nil {
if !errors.Is(err, net.ErrClosed) {
fmt.Printf("Read error: %v\n", err)
}
continue
}
if n < 1 {
continue
}
packetType := PacketType(buffer[0])
data := make([]byte, n-1)
copy(data, buffer[1:n])
s.packetChan <- Packet{
ClientAddr: addr,
Type: packetType,
Data: data,
}
}
}
}
func (s *Server) worker(id int) {
defer s.wg.Done()
for {
select {
case <-s.quit:
return
case packet := <-s.packetChan:
s.handlerLock.RLock()
handler, exists := s.handlers[packet.Type]
s.handlerLock.RUnlock()
if exists {
handler(packet)
} else {
fmt.Printf("Worker %d: No handler for packet type %d\n", id, packet.Type)
}
}
}
}
func (s *Server) handleMove(packet Packet) {
if len(packet.Data) < 8 {
return
}
x := binary.LittleEndian.Uint32(packet.Data[0:4])
y := binary.LittleEndian.Uint32(packet.Data[4:8])
fmt.Printf("Client %s moved to (%d, %d)\n", packet.ClientAddr.String(), x, y)
}
func (s *Server) handleAttack(packet Packet) {
if len(packet.Data) < 4 {
return
}
targetID := binary.LittleEndian.Uint32(packet.Data[0:4])
fmt.Printf("Client %s attacked target %d\n", packet.ClientAddr.String(), targetID)
}
func (s *Server) handleChat(packet Packet) {
fmt.Printf("Client %s: %s\n", packet.ClientAddr.String(), string(packet.Data))
}
func (s *Server) Stop() {
close(s.quit)
s.conn.Close()
s.wg.Wait()
}
func main() {
server, err := NewServer(8080)
if err != nil {
panic(err)
}
server.Start()
defer server.Stop()
select {}
}
这个实现展示了几个关键点:
- 结构化设计:使用
Server结构体封装状态和方法 - 并发安全:使用
sync.RWMutex保护handlers映射的并发访问 - 优雅关闭:通过
quit通道和WaitGroup实现有序关闭 - 工作池模式:固定数量的worker goroutine处理数据包
- 处理器注册:支持动态注册和替换数据包处理器
对于数据库访问,可以在Server结构体中添加数据库连接池:
type Server struct {
// ... 现有字段
db *sql.DB
dbLock sync.RWMutex
}
func (s *Server) QueryPlayerData(playerID uint32) (PlayerData, error) {
s.dbLock.RLock()
defer s.dbLock.RUnlock()
var data PlayerData
err := s.db.QueryRow("SELECT * FROM players WHERE id = ?", playerID).Scan(&data)
return data, err
}
这种架构利用了Go的并发原语,避免了显式的线程管理,通过channel进行goroutine间通信,符合Go的并发模式。


