Golang中Map、互斥锁与协程的实战应用

Golang中Map、互斥锁与协程的实战应用 我有一个需要由多个goroutine完成的“任务”映射。所有协程必须完成所有任务,但任何任务都不能同时被多个协程执行。

我最初创建了一个互斥锁映射来处理这个问题,但随后遇到了并发问题。因此,我创建了一个带有另一个互斥锁的父容器。思路是,先锁定Mu,然后锁定和解锁任务互斥锁。但这当然也不起作用,因为如果任务互斥锁阻塞,那么容器也会被阻塞。

type Container struct {
Mu sync.mutex
tasks map[string]sync.mutex
}

我该如何编写代码,使得每个goroutine不会被不必要地阻塞,并且代码是线程安全的。

谢谢


更多关于Golang中Map、互斥锁与协程的实战应用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

9 回复

感谢你们两位提供的想法。目前,我实现了那个廉价/简单的方案。因为我眼下就需要一个解决方案。等我时间充裕一些时,会去研究一下 sync/atomic 包。

更多关于Golang中Map、互斥锁与协程的实战应用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


创建一个从第一个 goroutine 到下一个 goroutine 的通道链。第一个 goroutine 执行第一个任务,然后将下一个通道传递给下一个 goroutine,依此类推。后续的 goroutine 在前一个 goroutine 完成任务之前永远不会看到任务,因此不需要显式同步。

@mje

但是,这是否意味着 goroutine 2 只有在 goroutine 1 至少完成一项工作后才能开始?更糟糕的是,goroutine N-1 只有在 goroutine N-2 至少完成一项工作后才能开始,而 goroutine N-2 只有在 goroutine N-3 完成一些工作后才能开始,依此类推…

确保在所有这些映射访问周围包含足够的同步,以在同一同步操作中捕获测试和设置。这正是容易出错并导致竞态条件的地方。

这种底层同步的复杂性正是第一条 Go 谚语(https://go-proverbs.github.io/)的灵感来源。Go 确实提供了必要的原语,但更符合惯用法的是使用通道通信。

最后一次编辑,我保证。如果你以前没有做过这种级别的异步编码,那么值得在中等复杂的情况下多做几次,以便你能真正体会到这条谚语的含义。

是的,但简单地使用互斥锁很可能会导致相同的行为。为了从一开始就获得完全的并发性,您需要更复杂的调度机制,例如标记每个 goroutine 已完成的任务,或者每个 goroutine 维护一个已完成任务的集合,等等。或许您可以编写一个调度器 goroutine,它读取每个任务并将其提交到其他通道上的 goroutines,并跟踪哪些 goroutine 正在处理每个任务以及哪些 goroutine 已经完成了每个任务。这样可以使 goroutines 本身保持简单且无需同步。

我最初提出的简单解决方案是完全足够的,如果您有许多运行时间短的任务,那么初始并发性的损失将是微不足道的。如果您有相对较少但运行时间长的任务,那么调度器方法可能是必要的。

@tranman

你的第一个想法相当不错,从逻辑上看它似乎应该能工作,只是你的实现方式在某个环节失败了。

最简单的修复方法是再试一次,但不要使用互斥锁,而是使用 atomic.Boolatomic package - sync/atomic - Go Packages

保留任务映射,每个 goroutine G 遍历映射的键,尝试查看是否还有任务满足以下两个条件:a) 没有被其他 goroutine 处理,且 b) 尚未被 G 自身处理过。

要解决条件 b),每个 goroutine G 需要记录它已经完成了哪些任务,这很简单,因为任务名称(任务映射的键)是唯一的。

@skillian。感谢您的建议。您说得基本正确。每个消费者(我们姑且这么称呼它)必须按相同顺序执行所有任务。还有一个额外的复杂性是,这些任务的处理时间可能相差很大,从短短几秒到几分钟,甚至超过一小时。

因此,随着消费者数量的增加,当消费者 C 正在处理一个耗时的任务 n 时,消费者 A 和 B 仍然能够自由地处理任务 n+1、n+2 等,这一点将变得非常重要。

目前,我实现了一个简单的解决方案,但我确信随着系统规模扩大,它的性能不会很好。

无论如何,再次感谢,我仍在仔细考虑这个问题……

我在这方面的 Go 语言理解还不够深入。但看了您的代码后,我认为在 for 循环中为每个任务实例化一个 goroutine,并不能保证我所需要的任务执行顺序。

我来回复这个问题,因为我不太确定你什么时候能有更多时间 😊

我有一个需要由多个goroutine完成的“任务”映射。所有goroutine必须完成所有任务,但同一时间不能有超过一个goroutine运行同一个任务。

你的意思是,假设你有3个goroutine,我称之为abc,所有这3个goroutine都必须处理每个任务(例如,任务1需要abc在其上执行,任务2也需要abc在其上执行,等等)?任务的顺序重要吗(即a需要在b之前完成,b需要在c之前完成吗)?

如果是这样,我认为你只需要这样做:

type task struct { /* ... */ }

type taskResult struct {
    key string
    task task
}

results := make(chan taskResult, 8) // 根据你需要的缓冲大小设置
wg := sync.WaitGroup{}

// 从你的“主”goroutine遍历键和值;不需要加锁
for k, v := range tasks {
    wg.Add(1)
    go func(key string, task task) {
        defer wg.Done()
        // 你不需要在a、b和c之间进行任何同步,因为
        // 每个worker按顺序执行它们,但每个作业作为一个整体
        // 仍然是并发完成的
        doA(key, task)
        doB(key, task)
        doC(key, task)
        results <- taskResult{key, task}
    }(k, v)
}
go func() {
    wg.Wait()
    close(results)
}()
for _, res := range results {
    // 对结果进行处理。也许你需要更新映射?
    // 不过,你不需要任何锁,因为映射没有被并发访问。
    tasks[res.key] = res.task
}

这是一个典型的并发任务调度问题,需要确保任务互斥执行的同时避免全局锁的瓶颈。以下是几种解决方案:

方案1:使用sync.Map + sync.Mutex组合

package main

import (
    "fmt"
    "sync"
    "time"
)

type TaskManager struct {
    tasks     sync.Map  // string -> *sync.Mutex
    completed sync.Map  // 记录已完成的任务
}

func (tm *TaskManager) DoTask(taskID string, workerID int) {
    // 获取或创建任务的互斥锁
    mu, _ := tm.tasks.LoadOrStore(taskID, &sync.Mutex{})
    taskMu := mu.(*sync.Mutex)
    
    // 锁定当前任务
    taskMu.Lock()
    defer taskMu.Unlock()
    
    // 检查任务是否已完成
    if _, ok := tm.completed.Load(taskID); ok {
        fmt.Printf("Worker %d: Task %s already completed\n", workerID, taskID)
        return
    }
    
    // 执行任务
    fmt.Printf("Worker %d: Starting task %s\n", workerID, taskID)
    time.Sleep(100 * time.Millisecond) // 模拟任务执行
    fmt.Printf("Worker %d: Completed task %s\n", workerID, taskID)
    
    // 标记任务完成
    tm.completed.Store(taskID, true)
}

func main() {
    tm := &TaskManager{}
    var wg sync.WaitGroup
    
    tasks := []string{"task1", "task2", "task3", "task1", "task2", "task3"}
    
    for i, task := range tasks {
        wg.Add(1)
        go func(workerID int, taskID string) {
            defer wg.Done()
            tm.DoTask(taskID, workerID)
        }(i, task)
    }
    
    wg.Wait()
}

方案2:使用通道进行任务调度

package main

import (
    "fmt"
    "sync"
    "time"
)

type TaskScheduler struct {
    taskQueue chan string
    done      chan struct{}
    active    sync.Map // 正在执行的任务
}

func NewTaskScheduler(workers int) *TaskScheduler {
    ts := &TaskScheduler{
        taskQueue: make(chan string, 100),
        done:      make(chan struct{}),
    }
    
    // 启动worker池
    for i := 0; i < workers; i++ {
        go ts.worker(i)
    }
    
    return ts
}

func (ts *TaskScheduler) worker(id int) {
    for task := range ts.taskQueue {
        // 使用sync.Map确保任务互斥
        if _, loaded := ts.active.LoadOrStore(task, struct{}{}); loaded {
            // 任务已在执行,重新入队
            go func(t string) { ts.taskQueue <- t }(task)
            continue
        }
        
        // 执行任务
        fmt.Printf("Worker %d: Processing %s\n", id, task)
        time.Sleep(100 * time.Millisecond)
        fmt.Printf("Worker %d: Finished %s\n", id, task)
        
        // 移除活跃任务标记
        ts.active.Delete(task)
    }
}

func (ts *TaskScheduler) AddTask(task string) {
    ts.taskQueue <- task
}

func (ts *TaskScheduler) Wait() {
    close(ts.taskQueue)
    // 等待所有任务完成
    for len(ts.taskQueue) > 0 {
        time.Sleep(10 * time.Millisecond)
    }
}

func main() {
    scheduler := NewTaskScheduler(3)
    
    tasks := []string{"A", "B", "C", "A", "B", "C", "D", "E"}
    
    for _, task := range tasks {
        scheduler.AddTask(task)
    }
    
    scheduler.Wait()
}

方案3:使用单飞模式(SingleFlight)

package main

import (
    "fmt"
    "golang.org/x/sync/singleflight"
    "sync"
    "time"
)

type TaskExecutor struct {
    group singleflight.Group
    done  sync.Map
}

func (te *TaskExecutor) Execute(task string, workerID int) (interface{}, error) {
    // 检查是否已完成
    if _, ok := te.done.Load(task); ok {
        fmt.Printf("Worker %d: Task %s already done\n", workerID, task)
        return nil, nil
    }
    
    // 使用SingleFlight确保同一任务只执行一次
    result, err, _ := te.group.Do(task, func() (interface{}, error) {
        fmt.Printf("Worker %d: Executing task %s\n", workerID, task)
        time.Sleep(100 * time.Millisecond)
        
        // 标记完成
        te.done.Store(task, true)
        
        return fmt.Sprintf("Result of %s", task), nil
    })
    
    if err == nil {
        fmt.Printf("Worker %d: Got result for %s: %v\n", workerID, task, result)
    }
    
    return result, err
}

func main() {
    executor := &TaskExecutor{}
    var wg sync.WaitGroup
    
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            task := fmt.Sprintf("task%d", id%3) // 只有3个不同的任务
            executor.Execute(task, id)
        }(i)
    }
    
    wg.Wait()
}

方案4:改进的互斥锁映射方案

package main

import (
    "fmt"
    "sync"
    "time"
)

type SafeTaskMap struct {
    mu    sync.RWMutex
    tasks map[string]*taskEntry
}

type taskEntry struct {
    mu       sync.Mutex
    done     bool
    waiters  []chan struct{}
}

func NewSafeTaskMap() *SafeTaskMap {
    return &SafeTaskMap{
        tasks: make(map[string]*taskEntry),
    }
}

func (stm *SafeTaskMap) Execute(taskID string, workerID int) {
    // 快速路径:只读锁检查
    stm.mu.RLock()
    entry, exists := stm.tasks[taskID]
    stm.mu.RUnlock()
    
    if !exists {
        // 需要创建新条目
        stm.mu.Lock()
        // 双重检查
        if entry, exists = stm.tasks[taskID]; !exists {
            entry = &taskEntry{}
            stm.tasks[taskID] = entry
        }
        stm.mu.Unlock()
    }
    
    // 锁定具体任务
    entry.mu.Lock()
    
    if entry.done {
        entry.mu.Unlock()
        fmt.Printf("Worker %d: Task %s already completed\n", workerID, taskID)
        return
    }
    
    // 执行任务
    fmt.Printf("Worker %d: Starting task %s\n", workerID, taskID)
    time.Sleep(100 * time.Millisecond)
    fmt.Printf("Worker %d: Completed task %s\n", workerID, taskID)
    
    entry.done = true
    entry.mu.Unlock()
}

func main() {
    stm := NewSafeTaskMap()
    var wg sync.WaitGroup
    
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            task := fmt.Sprintf("task%d", id%4)
            stm.Execute(task, id)
        }(i)
    }
    
    wg.Wait()
}

推荐方案1,它结合了sync.Map的并发安全特性和互斥锁的任务级保护,避免了全局锁的瓶颈。每个任务有自己的互斥锁,不同任务可以并行执行,相同任务会串行执行,且已完成的任务会快速返回。

回到顶部