Golang中长轮询与工作池的实现与应用

Golang中长轮询与工作池的实现与应用 你好,我正在编写一个新的服务,并试图找出最佳的解决方案。

它将是一个处理会话的 REST API 端点。用户将发出一个请求,并无限期地等待新信息。 我考虑创建一个工作池,每个工作线程都与一个用户关联,并等待直到有该用户的信息后返回。这带来了两个疑问:

  1. 如何将新信息注入到工作线程?
  2. 如何确保那些只是在等待的工作线程不会占用大量资源?

由于可以同时连接的用户数量有限,我尽量不使用 WebSocket。但如果性能相似,我可以改用 WebSocket。

2 回复

rest API 端点,用于处理 sessions。用户将发出请求并 无限期地等待新信息

听起来有些矛盾。

我不会将工作线程与用户直接关联,尤其是在你预期会有许多并发用户的情况下,除非新信息基本上是持续生成的。应该让工作线程从“新信息”中识别出属于哪个用户,然后从一个 map 中查找该用户的连接。

// 示例代码:从map中查找用户连接
func findUserConnection(userID string, connections map[string]net.Conn) (net.Conn, bool) {
    conn, exists := connections[userID]
    return conn, exists
}

更多关于Golang中长轮询与工作池的实现与应用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


对于长轮询与工作池的实现,这里提供一个基于通道和goroutine的高效方案。这个方案能有效管理资源并实现信息注入。

package main

import (
    "context"
    "fmt"
    "net/http"
    "sync"
    "time"
)

// 用户会话管理器
type SessionManager struct {
    mu          sync.RWMutex
    sessions    map[string]chan interface{} // 用户ID到消息通道的映射
    timeout     time.Duration
}

func NewSessionManager(timeout time.Duration) *SessionManager {
    return &SessionManager{
        sessions: make(map[string]chan interface{}),
        timeout:  timeout,
    }
}

// 等待消息的长轮询端点
func (sm *SessionManager) WaitForUpdate(w http.ResponseWriter, r *http.Request) {
    userID := r.URL.Query().Get("user_id")
    if userID == "" {
        http.Error(w, "user_id required", http.StatusBadRequest)
        return
    }

    // 创建或获取用户的消息通道
    sm.mu.Lock()
    ch, exists := sm.sessions[userID]
    if !exists {
        ch = make(chan interface{}, 1) // 缓冲大小为1避免阻塞
        sm.sessions[userID] = ch
    }
    sm.mu.Unlock()

    ctx, cancel := context.WithTimeout(r.Context(), sm.timeout)
    defer cancel()

    select {
    case msg := <-ch:
        // 收到消息,返回给客户端
        fmt.Fprintf(w, "data: %v\n", msg)
    case <-ctx.Done():
        // 超时或客户端断开
        w.WriteHeader(http.StatusNoContent)
    }
}

// 向特定用户发送消息
func (sm *SessionManager) SendMessage(userID string, message interface{}) bool {
    sm.mu.RLock()
    ch, exists := sm.sessions[userID]
    sm.mu.RUnlock()

    if exists {
        select {
        case ch <- message:
            return true
        default:
            // 通道已满,用户可能不在等待
            return false
        }
    }
    return false
}

// 清理空闲会话
func (sm *SessionManager) CleanupIdleSessions() {
    sm.mu.Lock()
    defer sm.mu.Unlock()
    
    for userID, ch := range sm.sessions {
        select {
        case <-ch: // 尝试读取,如果通道为空则跳过
        default:
            // 通道为空且超过一定时间无活动,可以清理
            // 实际实现中需要更精确的空闲检测
            delete(sm.sessions, userID)
            close(ch)
        }
    }
}

// 工作池实现
type WorkerPool struct {
    maxWorkers int
    jobQueue   chan func()
    wg         sync.WaitGroup
}

func NewWorkerPool(maxWorkers int) *WorkerPool {
    return &WorkerPool{
        maxWorkers: maxWorkers,
        jobQueue:   make(chan func()),
    }
}

func (wp *WorkerPool) Start() {
    for i := 0; i < wp.maxWorkers; i++ {
        wp.wg.Add(1)
        go func(workerID int) {
            defer wp.wg.Done()
            for job := range wp.jobQueue {
                job()
            }
        }(i)
    }
}

func (wp *WorkerPool) Submit(job func()) {
    wp.jobQueue <- job
}

func (wp *WorkerPool) Stop() {
    close(wp.jobQueue)
    wp.wg.Wait()
}

func main() {
    sessionMgr := NewSessionManager(30 * time.Second)
    workerPool := NewWorkerPool(100) // 100个工作goroutine
    
    workerPool.Start()
    
    http.HandleFunc("/wait", func(w http.ResponseWriter, r *http.Request) {
        workerPool.Submit(func() {
            sessionMgr.WaitForUpdate(w, r)
        })
    })
    
    http.HandleFunc("/send", func(w http.ResponseWriter, r *http.Request) {
        userID := r.URL.Query().Get("user_id")
        message := r.URL.Query().Get("message")
        
        if sessionMgr.SendMessage(userID, message) {
            w.Write([]byte("Message sent"))
        } else {
            w.Write([]byte("User not waiting"))
        }
    })
    
    // 定期清理空闲会话
    go func() {
        ticker := time.NewTicker(5 * time.Minute)
        defer ticker.Stop()
        for range ticker.C {
            sessionMgr.CleanupIdleSessions()
        }
    }()
    
    http.ListenAndServe(":8080", nil)
}

这个实现解决了你的两个问题:

  1. 信息注入:通过SendMessage方法向特定用户的通道发送消息,等待的goroutine会立即收到
  2. 资源管理:使用带缓冲的通道和context超时机制,确保不会永久阻塞。工作池限制并发goroutine数量,定期清理空闲会话释放资源

对于性能比较,这个方案在连接数有限(几千个)时通常比WebSocket更轻量,因为不需要维持完整的双向连接。每个等待请求只是一个阻塞的goroutine,内存开销很小(约2KB每个)。

回到顶部