Golang共享工作池的实现与应用

Golang共享工作池的实现与应用 大家好!

这是我在论坛的第一篇帖子 🙂

我有个问题,在别处都没找到合适的答案。

我知道如何用 Go 协程轻松构建工作池。我的问题与共享工作池有关。

假设我有一个网络服务器,不同用户的请求不断涌入。我的想法是构建一个共享工作池,也就是所有网络会话共享的池。

创建池没有问题,我只是在思考收集结果的最佳方式。我的意思是,如果让所有工作线程都向同一个通道写入结果会很混乱,因为同一个通道会收到来自不同会话的结果。我是否应该为每个会话提供一个"私有"通道,让工作线程向这个通道写入?

另外,在共享池中如何实现等待条件?目前所有示例都考虑使用单一任务通道,当不需要更多任务时关闭通道。但共享池呢?假设我向工作线程通道提交了两个任务,如何让会话等待所有工作线程完成这些请求的处理?

提前感谢任何支持,抱歉问了这么多问题!


更多关于Golang共享工作池的实现与应用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

4 回复

感谢您的反馈 @lutzhorn 🙂

更多关于Golang共享工作池的实现与应用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


请记住,goroutine 的创建成本很低。在许多情况下,管理 goroutine 池并没有意义;相反,在需要时创建一个,并在完成后让其自行结束。如果目标是限制并发工作量,可以使用某种信号量机制。

mklein:

我应该为每个会话提供一个"私有"通道,这样工作器就会向这个通道写入数据吗?

是的,创建一个会话将接收结果的通道。工作器将通过这个通道发送单个结果,然后可以关闭它。

mklein:

目前所有的示例都考虑使用单一的任务通道,当不再需要任务时这些通道会被关闭。但是对于共享池的情况呢?

对于服务器来说,当任务通道为空时终止工作器是没有意义的。这种情况随时可能发生。你需要对系统关闭做出响应。只有在系统关闭时才应该关闭任务通道,这将导致工作器终止。

在共享工作池的实现中,处理结果收集和等待条件确实是关键问题。以下是一个完整的实现方案:

1. 工作池与任务结构定义

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// Task 表示一个工作任务
type Task struct {
    ID       string
    Data     interface{}
    ResultCh chan<- Result
    Session  string // 标识属于哪个会话
}

// Result 表示任务执行结果
type Result struct {
    TaskID string
    Data   interface{}
    Error  error
}

// WorkerPool 工作池
type WorkerPool struct {
    tasks    chan Task
    workers  int
    wg       sync.WaitGroup
    stop     chan struct{}
}

2. 工作池实现

// NewWorkerPool 创建新的工作池
func NewWorkerPool(workers int) *WorkerPool {
    pool := &WorkerPool{
        tasks:   make(chan Task, 100),
        workers: workers,
        stop:    make(chan struct{}),
    }
    
    pool.wg.Add(workers)
    for i := 0; i < workers; i++ {
        go pool.worker(i)
    }
    
    return pool
}

// worker 工作协程
func (p *WorkerPool) worker(id int) {
    defer p.wg.Done()
    
    for {
        select {
        case task := <-p.tasks:
            // 模拟工作处理
            result := p.processTask(task)
            // 将结果发送到该任务的私有通道
            task.ResultCh <- result
            
        case <-p.stop:
            return
        }
    }
}

// processTask 处理具体任务
func (p *WorkerPool) processTask(task Task) Result {
    // 模拟处理时间
    time.Sleep(100 * time.Millisecond)
    
    return Result{
        TaskID: task.ID,
        Data:   fmt.Sprintf("Processed: %v", task.Data),
        Error:  nil,
    }
}

// Submit 提交任务到工作池
func (p *WorkerPool) Submit(task Task) {
    p.tasks <- task
}

// Stop 停止工作池
func (p *WorkerPool) Stop() {
    close(p.stop)
    p.wg.Wait()
}

3. 会话管理

// Session 表示一个网络会话
type Session struct {
    ID       string
    pool     *WorkerPool
    tasks    []string
    results  map[string]Result
    resultCh chan Result
    mu       sync.RWMutex
    wg       sync.WaitGroup
}

// NewSession 创建新会话
func NewSession(id string, pool *WorkerPool) *Session {
    return &Session{
        ID:       id,
        pool:     pool,
        results:  make(map[string]Result),
        resultCh: make(chan Result, 10),
    }
}

// SubmitTask 提交任务并等待结果
func (s *Session) SubmitTask(taskID string, data interface{}) {
    s.mu.Lock()
    s.tasks = append(s.tasks, taskID)
    s.mu.Unlock()
    
    s.wg.Add(1)
    
    // 创建任务并指定结果通道
    task := Task{
        ID:       taskID,
        Data:     data,
        ResultCh: s.resultCh,
        Session:  s.ID,
    }
    
    go s.pool.Submit(task)
}

// WaitForResults 等待所有任务完成并收集结果
func (s *Session) WaitForResults(timeout time.Duration) (map[string]Result, error) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()
    
    go func() {
        s.wg.Wait()
        cancel()
    }()
    
    completed := make(map[string]bool)
    
    for {
        select {
        case result := <-s.resultCh:
            s.mu.Lock()
            s.results[result.TaskID] = result
            completed[result.TaskID] = true
            s.mu.Unlock()
            s.wg.Done()
            
            // 检查是否所有任务都完成了
            if s.allTasksCompleted(completed) {
                return s.results, nil
            }
            
        case <-ctx.Done():
            return s.results, ctx.Err()
        }
    }
}

// allTasksCompleted 检查所有任务是否完成
func (s *Session) allTasksCompleted(completed map[string]bool) bool {
    s.mu.RLock()
    defer s.mu.RUnlock()
    
    for _, taskID := range s.tasks {
        if !completed[taskID] {
            return false
        }
    }
    return true
}

4. 使用示例

func main() {
    // 创建共享工作池
    pool := NewWorkerPool(5)
    defer pool.Stop()
    
    // 模拟多个会话
    sessions := make([]*Session, 3)
    for i := 0; i < 3; i++ {
        sessions[i] = NewSession(fmt.Sprintf("session-%d", i), pool)
    }
    
    var wg sync.WaitGroup
    
    // 每个会话提交多个任务
    for i, session := range sessions {
        wg.Add(1)
        go func(s *Session, idx int) {
            defer wg.Done()
            
            // 提交3个任务
            for j := 0; j < 3; j++ {
                taskID := fmt.Sprintf("task-%d-%d", idx, j)
                s.SubmitTask(taskID, fmt.Sprintf("data-%d-%d", idx, j))
            }
            
            // 等待结果
            results, err := s.WaitForResults(5 * time.Second)
            if err != nil {
                fmt.Printf("Session %s error: %v\n", s.ID, err)
                return
            }
            
            fmt.Printf("Session %s results:\n", s.ID)
            for taskID, result := range results {
                fmt.Printf("  %s: %v\n", taskID, result.Data)
            }
        }(session, i)
    }
    
    wg.Wait()
}

关键设计要点

  1. 私有结果通道:每个会话有自己的 resultCh,避免结果混淆
  2. 等待机制:使用 sync.WaitGroup 和超时控制确保会话能等待所有任务完成
  3. 会话隔离:通过 Session 结构维护任务状态和结果
  4. 共享池效率:多个会话共享同一个工作池,提高资源利用率

这种设计既保持了工作池的共享特性,又确保了会话间的结果隔离和正确的等待语义。

回到顶部