Golang中长轮询与工作池的实现与应用
Golang中长轮询与工作池的实现与应用 你好,我正在编写一个新的服务,并试图找出最佳的解决方案。
它将是一个处理会话的 REST API 端点。用户将发出一个请求,并无限期地等待新信息。 我考虑创建一个工作池,每个工作线程都与一个用户关联,并等待直到有该用户的信息后返回。这带来了两个疑问:
- 如何将新信息注入到工作线程?
- 如何确保那些只是在等待的工作线程不会占用大量资源?
由于可以同时连接的用户数量有限,我尽量不使用 WebSocket。但如果性能相似,我可以改用 WebSocket。
2 回复
rest API 端点,用于处理 sessions。用户将发出请求并 无限期地等待新信息。
听起来有些矛盾。
我不会将工作线程与用户直接关联,尤其是在你预期会有许多并发用户的情况下,除非新信息基本上是持续生成的。应该让工作线程从“新信息”中识别出属于哪个用户,然后从一个 map 中查找该用户的连接。
// 示例代码:从map中查找用户连接
func findUserConnection(userID string, connections map[string]net.Conn) (net.Conn, bool) {
conn, exists := connections[userID]
return conn, exists
}
更多关于Golang中长轮询与工作池的实现与应用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
对于长轮询与工作池的实现,这里提供一个基于通道和goroutine的高效方案。这个方案能有效管理资源并实现信息注入。
package main
import (
"context"
"fmt"
"net/http"
"sync"
"time"
)
// 用户会话管理器
type SessionManager struct {
mu sync.RWMutex
sessions map[string]chan interface{} // 用户ID到消息通道的映射
timeout time.Duration
}
func NewSessionManager(timeout time.Duration) *SessionManager {
return &SessionManager{
sessions: make(map[string]chan interface{}),
timeout: timeout,
}
}
// 等待消息的长轮询端点
func (sm *SessionManager) WaitForUpdate(w http.ResponseWriter, r *http.Request) {
userID := r.URL.Query().Get("user_id")
if userID == "" {
http.Error(w, "user_id required", http.StatusBadRequest)
return
}
// 创建或获取用户的消息通道
sm.mu.Lock()
ch, exists := sm.sessions[userID]
if !exists {
ch = make(chan interface{}, 1) // 缓冲大小为1避免阻塞
sm.sessions[userID] = ch
}
sm.mu.Unlock()
ctx, cancel := context.WithTimeout(r.Context(), sm.timeout)
defer cancel()
select {
case msg := <-ch:
// 收到消息,返回给客户端
fmt.Fprintf(w, "data: %v\n", msg)
case <-ctx.Done():
// 超时或客户端断开
w.WriteHeader(http.StatusNoContent)
}
}
// 向特定用户发送消息
func (sm *SessionManager) SendMessage(userID string, message interface{}) bool {
sm.mu.RLock()
ch, exists := sm.sessions[userID]
sm.mu.RUnlock()
if exists {
select {
case ch <- message:
return true
default:
// 通道已满,用户可能不在等待
return false
}
}
return false
}
// 清理空闲会话
func (sm *SessionManager) CleanupIdleSessions() {
sm.mu.Lock()
defer sm.mu.Unlock()
for userID, ch := range sm.sessions {
select {
case <-ch: // 尝试读取,如果通道为空则跳过
default:
// 通道为空且超过一定时间无活动,可以清理
// 实际实现中需要更精确的空闲检测
delete(sm.sessions, userID)
close(ch)
}
}
}
// 工作池实现
type WorkerPool struct {
maxWorkers int
jobQueue chan func()
wg sync.WaitGroup
}
func NewWorkerPool(maxWorkers int) *WorkerPool {
return &WorkerPool{
maxWorkers: maxWorkers,
jobQueue: make(chan func()),
}
}
func (wp *WorkerPool) Start() {
for i := 0; i < wp.maxWorkers; i++ {
wp.wg.Add(1)
go func(workerID int) {
defer wp.wg.Done()
for job := range wp.jobQueue {
job()
}
}(i)
}
}
func (wp *WorkerPool) Submit(job func()) {
wp.jobQueue <- job
}
func (wp *WorkerPool) Stop() {
close(wp.jobQueue)
wp.wg.Wait()
}
func main() {
sessionMgr := NewSessionManager(30 * time.Second)
workerPool := NewWorkerPool(100) // 100个工作goroutine
workerPool.Start()
http.HandleFunc("/wait", func(w http.ResponseWriter, r *http.Request) {
workerPool.Submit(func() {
sessionMgr.WaitForUpdate(w, r)
})
})
http.HandleFunc("/send", func(w http.ResponseWriter, r *http.Request) {
userID := r.URL.Query().Get("user_id")
message := r.URL.Query().Get("message")
if sessionMgr.SendMessage(userID, message) {
w.Write([]byte("Message sent"))
} else {
w.Write([]byte("User not waiting"))
}
})
// 定期清理空闲会话
go func() {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for range ticker.C {
sessionMgr.CleanupIdleSessions()
}
}()
http.ListenAndServe(":8080", nil)
}
这个实现解决了你的两个问题:
- 信息注入:通过
SendMessage方法向特定用户的通道发送消息,等待的goroutine会立即收到 - 资源管理:使用带缓冲的通道和context超时机制,确保不会永久阻塞。工作池限制并发goroutine数量,定期清理空闲会话释放资源
对于性能比较,这个方案在连接数有限(几千个)时通常比WebSocket更轻量,因为不需要维持完整的双向连接。每个等待请求只是一个阻塞的goroutine,内存开销很小(约2KB每个)。

