Golang中Map、互斥锁与协程的实战应用
Golang中Map、互斥锁与协程的实战应用 我有一个需要由多个goroutine完成的“任务”映射。所有协程必须完成所有任务,但任何任务都不能同时被多个协程执行。
我最初创建了一个互斥锁映射来处理这个问题,但随后遇到了并发问题。因此,我创建了一个带有另一个互斥锁的父容器。思路是,先锁定Mu,然后锁定和解锁任务互斥锁。但这当然也不起作用,因为如果任务互斥锁阻塞,那么容器也会被阻塞。
type Container struct {
Mu sync.mutex
tasks map[string]sync.mutex
}
我该如何编写代码,使得每个goroutine不会被不必要地阻塞,并且代码是线程安全的。
谢谢
更多关于Golang中Map、互斥锁与协程的实战应用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
感谢你们两位提供的想法。目前,我实现了那个廉价/简单的方案。因为我眼下就需要一个解决方案。等我时间充裕一些时,会去研究一下 sync/atomic 包。
更多关于Golang中Map、互斥锁与协程的实战应用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
创建一个从第一个 goroutine 到下一个 goroutine 的通道链。第一个 goroutine 执行第一个任务,然后将下一个通道传递给下一个 goroutine,依此类推。后续的 goroutine 在前一个 goroutine 完成任务之前永远不会看到任务,因此不需要显式同步。
但是,这是否意味着 goroutine 2 只有在 goroutine 1 至少完成一项工作后才能开始?更糟糕的是,goroutine N-1 只有在 goroutine N-2 至少完成一项工作后才能开始,而 goroutine N-2 只有在 goroutine N-3 完成一些工作后才能开始,依此类推…
确保在所有这些映射访问周围包含足够的同步,以在同一同步操作中捕获测试和设置。这正是容易出错并导致竞态条件的地方。
这种底层同步的复杂性正是第一条 Go 谚语(https://go-proverbs.github.io/)的灵感来源。Go 确实提供了必要的原语,但更符合惯用法的是使用通道通信。
最后一次编辑,我保证。如果你以前没有做过这种级别的异步编码,那么值得在中等复杂的情况下多做几次,以便你能真正体会到这条谚语的含义。
你的第一个想法相当不错,从逻辑上看它似乎应该能工作,只是你的实现方式在某个环节失败了。
最简单的修复方法是再试一次,但不要使用互斥锁,而是使用 atomic.Bool:atomic package - sync/atomic - Go Packages
保留任务映射,每个 goroutine G 遍历映射的键,尝试查看是否还有任务满足以下两个条件:a) 没有被其他 goroutine 处理,且 b) 尚未被 G 自身处理过。
要解决条件 b),每个 goroutine G 需要记录它已经完成了哪些任务,这很简单,因为任务名称(任务映射的键)是唯一的。
@skillian。感谢您的建议。您说得基本正确。每个消费者(我们姑且这么称呼它)必须按相同顺序执行所有任务。还有一个额外的复杂性是,这些任务的处理时间可能相差很大,从短短几秒到几分钟,甚至超过一小时。
因此,随着消费者数量的增加,当消费者 C 正在处理一个耗时的任务 n 时,消费者 A 和 B 仍然能够自由地处理任务 n+1、n+2 等,这一点将变得非常重要。
目前,我实现了一个简单的解决方案,但我确信随着系统规模扩大,它的性能不会很好。
无论如何,再次感谢,我仍在仔细考虑这个问题……
我在这方面的 Go 语言理解还不够深入。但看了您的代码后,我认为在 for 循环中为每个任务实例化一个 goroutine,并不能保证我所需要的任务执行顺序。
我来回复这个问题,因为我不太确定你什么时候能有更多时间 😊
我有一个需要由多个goroutine完成的“任务”映射。所有goroutine必须完成所有任务,但同一时间不能有超过一个goroutine运行同一个任务。
你的意思是,假设你有3个goroutine,我称之为a、b和c,所有这3个goroutine都必须处理每个任务(例如,任务1需要a、b和c在其上执行,任务2也需要a、b和c在其上执行,等等)?任务的顺序重要吗(即a需要在b之前完成,b需要在c之前完成吗)?
如果是这样,我认为你只需要这样做:
type task struct { /* ... */ }
type taskResult struct {
key string
task task
}
results := make(chan taskResult, 8) // 根据你需要的缓冲大小设置
wg := sync.WaitGroup{}
// 从你的“主”goroutine遍历键和值;不需要加锁
for k, v := range tasks {
wg.Add(1)
go func(key string, task task) {
defer wg.Done()
// 你不需要在a、b和c之间进行任何同步,因为
// 每个worker按顺序执行它们,但每个作业作为一个整体
// 仍然是并发完成的
doA(key, task)
doB(key, task)
doC(key, task)
results <- taskResult{key, task}
}(k, v)
}
go func() {
wg.Wait()
close(results)
}()
for _, res := range results {
// 对结果进行处理。也许你需要更新映射?
// 不过,你不需要任何锁,因为映射没有被并发访问。
tasks[res.key] = res.task
}
这是一个典型的并发任务调度问题,需要确保任务互斥执行的同时避免全局锁的瓶颈。以下是几种解决方案:
方案1:使用sync.Map + sync.Mutex组合
package main
import (
"fmt"
"sync"
"time"
)
type TaskManager struct {
tasks sync.Map // string -> *sync.Mutex
completed sync.Map // 记录已完成的任务
}
func (tm *TaskManager) DoTask(taskID string, workerID int) {
// 获取或创建任务的互斥锁
mu, _ := tm.tasks.LoadOrStore(taskID, &sync.Mutex{})
taskMu := mu.(*sync.Mutex)
// 锁定当前任务
taskMu.Lock()
defer taskMu.Unlock()
// 检查任务是否已完成
if _, ok := tm.completed.Load(taskID); ok {
fmt.Printf("Worker %d: Task %s already completed\n", workerID, taskID)
return
}
// 执行任务
fmt.Printf("Worker %d: Starting task %s\n", workerID, taskID)
time.Sleep(100 * time.Millisecond) // 模拟任务执行
fmt.Printf("Worker %d: Completed task %s\n", workerID, taskID)
// 标记任务完成
tm.completed.Store(taskID, true)
}
func main() {
tm := &TaskManager{}
var wg sync.WaitGroup
tasks := []string{"task1", "task2", "task3", "task1", "task2", "task3"}
for i, task := range tasks {
wg.Add(1)
go func(workerID int, taskID string) {
defer wg.Done()
tm.DoTask(taskID, workerID)
}(i, task)
}
wg.Wait()
}
方案2:使用通道进行任务调度
package main
import (
"fmt"
"sync"
"time"
)
type TaskScheduler struct {
taskQueue chan string
done chan struct{}
active sync.Map // 正在执行的任务
}
func NewTaskScheduler(workers int) *TaskScheduler {
ts := &TaskScheduler{
taskQueue: make(chan string, 100),
done: make(chan struct{}),
}
// 启动worker池
for i := 0; i < workers; i++ {
go ts.worker(i)
}
return ts
}
func (ts *TaskScheduler) worker(id int) {
for task := range ts.taskQueue {
// 使用sync.Map确保任务互斥
if _, loaded := ts.active.LoadOrStore(task, struct{}{}); loaded {
// 任务已在执行,重新入队
go func(t string) { ts.taskQueue <- t }(task)
continue
}
// 执行任务
fmt.Printf("Worker %d: Processing %s\n", id, task)
time.Sleep(100 * time.Millisecond)
fmt.Printf("Worker %d: Finished %s\n", id, task)
// 移除活跃任务标记
ts.active.Delete(task)
}
}
func (ts *TaskScheduler) AddTask(task string) {
ts.taskQueue <- task
}
func (ts *TaskScheduler) Wait() {
close(ts.taskQueue)
// 等待所有任务完成
for len(ts.taskQueue) > 0 {
time.Sleep(10 * time.Millisecond)
}
}
func main() {
scheduler := NewTaskScheduler(3)
tasks := []string{"A", "B", "C", "A", "B", "C", "D", "E"}
for _, task := range tasks {
scheduler.AddTask(task)
}
scheduler.Wait()
}
方案3:使用单飞模式(SingleFlight)
package main
import (
"fmt"
"golang.org/x/sync/singleflight"
"sync"
"time"
)
type TaskExecutor struct {
group singleflight.Group
done sync.Map
}
func (te *TaskExecutor) Execute(task string, workerID int) (interface{}, error) {
// 检查是否已完成
if _, ok := te.done.Load(task); ok {
fmt.Printf("Worker %d: Task %s already done\n", workerID, task)
return nil, nil
}
// 使用SingleFlight确保同一任务只执行一次
result, err, _ := te.group.Do(task, func() (interface{}, error) {
fmt.Printf("Worker %d: Executing task %s\n", workerID, task)
time.Sleep(100 * time.Millisecond)
// 标记完成
te.done.Store(task, true)
return fmt.Sprintf("Result of %s", task), nil
})
if err == nil {
fmt.Printf("Worker %d: Got result for %s: %v\n", workerID, task, result)
}
return result, err
}
func main() {
executor := &TaskExecutor{}
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
task := fmt.Sprintf("task%d", id%3) // 只有3个不同的任务
executor.Execute(task, id)
}(i)
}
wg.Wait()
}
方案4:改进的互斥锁映射方案
package main
import (
"fmt"
"sync"
"time"
)
type SafeTaskMap struct {
mu sync.RWMutex
tasks map[string]*taskEntry
}
type taskEntry struct {
mu sync.Mutex
done bool
waiters []chan struct{}
}
func NewSafeTaskMap() *SafeTaskMap {
return &SafeTaskMap{
tasks: make(map[string]*taskEntry),
}
}
func (stm *SafeTaskMap) Execute(taskID string, workerID int) {
// 快速路径:只读锁检查
stm.mu.RLock()
entry, exists := stm.tasks[taskID]
stm.mu.RUnlock()
if !exists {
// 需要创建新条目
stm.mu.Lock()
// 双重检查
if entry, exists = stm.tasks[taskID]; !exists {
entry = &taskEntry{}
stm.tasks[taskID] = entry
}
stm.mu.Unlock()
}
// 锁定具体任务
entry.mu.Lock()
if entry.done {
entry.mu.Unlock()
fmt.Printf("Worker %d: Task %s already completed\n", workerID, taskID)
return
}
// 执行任务
fmt.Printf("Worker %d: Starting task %s\n", workerID, taskID)
time.Sleep(100 * time.Millisecond)
fmt.Printf("Worker %d: Completed task %s\n", workerID, taskID)
entry.done = true
entry.mu.Unlock()
}
func main() {
stm := NewSafeTaskMap()
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
task := fmt.Sprintf("task%d", id%4)
stm.Execute(task, id)
}(i)
}
wg.Wait()
}
推荐方案1,它结合了sync.Map的并发安全特性和互斥锁的任务级保护,避免了全局锁的瓶颈。每个任务有自己的互斥锁,不同任务可以并行执行,相同任务会串行执行,且已完成的任务会快速返回。


