Golang中如何实现这段代码的并发/并行处理
Golang中如何实现这段代码的并发/并行处理 我正在用Go语言实现一个我在其他多种语言中已经完成过的算法。 我首先完成了一个单线程的Go版本,以确保它能正常工作。 现在我想做一个多线程版本,就像我在其他语言中做的那样。 我使用的是Go 1.16。
以下是我需要使其多线程化的单线程Go代码。
sums := make([]uint, pairscnt)
lastwins := make([]uint, pairscnt)
for i, r_hi := range restwins { // sieve twinpair restracks
l, c := twins_sieve(r_hi, kmin, kmax, kb, start_num, end_num,
modpg, primes, resinvrs)
lastwins[i] = l; sums[i] = c
fmt.Printf("\r%d of %d twinpairs done", (i + 1), pairscnt)
}
这里我使用了一个goroutine来使其多线程化。
它可以编译,但似乎只执行了一次,并且给出了错误的数值结果。
sums := make([]uint, pairscnt)
lastwins := make([]uint, pairscnt)
for i, r_hi := range restwins {
go func() {
l, c := twins_sieve(r_hi, kmin, kmax, kb, start_num, end_num,
modpg, primes, resinvrs)
lastwins[i] = l; sums[i] = c
fmt.Printf("\r%d of %d twinpairs done", (i + 1), pairscnt)
}()
}
这是Crystal语言中相同(可工作)的代码片段,它具有类似的并发模型。
sums = Array(UInt64).new(pairscnt, 0)
lastwins = Array(UInt64).new(pairscnt, 0)
done = Channel(Nil).new(pairscnt)
restwins.each_with_index do |r_hi, i|
spawn do
lastwins[i], sums[i] = twins_sieve(r_hi, kmin, kmax, kb, start_num,
end_num, modpg, primes, resinvrs)
print "\r#{i + 1} of #{pairscnt} twinpairs done"
done.send(nil)
end end
pairscnt.times { done.receive }
这是Rust的代码片段,它实现了真正的并行执行,并且是所有版本中最快的。
let (lastwins, sums): (Vec<_>, Vec<_>) = {
let counter = RelaxedCounter::new();
restwins.par_iter().map( |r_hi| {
let out = twins_sieve(*r_hi, kmin, kmax, kb, start_num, end_num,
modpg, &primes.to_vec(), &resinvrs);
print!("\r{} of {} twinpairs done", counter.increment(), pairscnt);
out
}).unzip()
};
我希望Go专家能告诉我需要做什么才能使代码正确执行。
更多关于Golang中如何实现这段代码的并发/并行处理的实战教程也可以访问 https://www.itying.com/category-94-b0.html
两种技术都能编译,使用 htop 可以看到线程在工作。
然而,twins_sieve 的输出值并未存储到数组中,
因此我无法获得正确的输出值。
更多关于Golang中如何实现这段代码的并发/并行处理的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
感谢 @petrus,这个方法可行。以下是完整的代码段。
sums := make([]uint, pairscnt)
lastwins := make([]uint, pairscnt)
var wg sync.WaitGroup
for i, r_hi := range restwins {
wg.Add(1)
go func(i, r_hi int) {
defer wg.Done()
l, c := twins_sieve(r_hi, kmin, kmax, kb, start_num, end_num, modpg, primes, resinvrs)
lastwins[i] = l; sums[i] = c
fmt.Printf("\r%d of %d twinpairs done", (i + 1), pairscnt)
}(i, r_hi)
}
wg.Wait()
fmt.Printf("\r%d of %d twinpairs done", pairscnt, pairscnt)
供参考,以下是单线程与多线程版本的一些数据;环境为 Linux,I7-3.5GHz,8 线程。
输入值 | 单线程版本 | 多线程版本
__________________|_________________|_______________
100_000_000_000 | 29.1 秒 | 5.8 秒
500_000_000_000 | 145.6 秒 | 29.9 秒
1_000_000_000_000 | 316.9 秒 | 59.9 秒
这些时间是在“嘈杂”的系统环境下测得的,同时还有浏览器播放视频等其他任务在进行。 但它显示了不同版本之间的相对差异。
不过,对于此算法,能够实现真正并行的 Rust、Nim 等语言要快得多。
问题出在这里:
for i, r_hi := range restwins {
go func() {
// 使用 i 和 r_hi
}()
}
在 Go 语言中,闭包函数对变量的访问是“通过引用”的。for 循环为 restwins 中的每个元素创建一个新的 goroutine,但这些 goroutine 共享实际的 i 和 r_hi 变量,因此你可能会遇到以下情况:
- 你在循环中创建的 goroutine 可能直到循环结束后才真正被调度和启动。如果发生这种情况,那么所有的 goroutine 看到的都是
i和r_hi的最终值。 - 也许 goroutine 是分批执行的,你会看到其中一部分 goroutine 观察到
i的某个值,而另一部分 goroutine 观察到i的另一个值,等等。
解决方案是:
-
将
i和r_hi作为参数传递给你的内部 goroutine 函数:for i, r_hi := range restwins { go func(i, r_hi uint) { // ... }(i, r_hi) }这样做的好处是,如果你不需要闭包捕获任何其他值,你的函数会变成一个“普通”函数,无需闭包可能带来的额外处理开销。
-
或者在函数外部复制变量:
for i, r_hi := range restwins { i, r_hi := i, r_hi go func() { // ... }() }这样做的好处是,你不必重复类型名称,因此,如果你将来需要更改类型,例如从
uint改为uint64,你无需更改函数参数中的类型。
jzakiya: 我没有得到正确的输出值。
《Go编程语言》,Alan A. A. Donovan · Brian W. Kernighan
package main
import (
"fmt"
"sync"
)
func main() {
pairscnt := 7
restwins := make([]uint, pairscnt)
sums := make([]uint, pairscnt)
lastwins := make([]uint, pairscnt)
var wg sync.WaitGroup
for i, r_hi := range restwins {
wg.Add(1)
go func(i int, r_hi uint) {
defer wg.Done()
l, c := uint(i), uint(r_hi+1)
lastwins[i] = l
sums[i] = c
fmt.Printf("\r%d of %d twinpairs done", (i + 1), pairscnt)
}(i, r_hi)
}
wg.Wait()
fmt.Println()
fmt.Println(lastwins, sums)
}
7 of 7 twinpairs done 3 of 7 twinpairs done 2 of 7 twinpairs done 5 of 7 twinpairs done 4 of 7 twinpairs done 6 of 7 twinpairs done 1 of 7 twinpairs done [0 1 2 3 4 5 6] [1 1 1 1 1 1 1]
虽然这段代码可以运行,但它可能不是最优的。
在Go中实现并发处理时,需要解决两个关键问题:goroutine间的数据竞争和等待所有goroutine完成。以下是修正后的代码:
sums := make([]uint, pairscnt)
lastwins := make([]uint, pairscnt)
var wg sync.WaitGroup
for i, r_hi := range restwins {
wg.Add(1)
go func(idx int, r_hi uint) {
defer wg.Done()
l, c := twins_sieve(r_hi, kmin, kmax, kb, start_num, end_num,
modpg, primes, resinvrs)
lastwins[idx] = l
sums[idx] = c
fmt.Printf("\r%d of %d twinpairs done", (idx + 1), pairscnt)
}(i, r_hi)
}
wg.Wait()
如果需要避免数据竞争,可以使用带互斥锁的版本:
sums := make([]uint, pairscnt)
lastwins := make([]uint, pairscnt)
var wg sync.WaitGroup
var mu sync.Mutex
for i, r_hi := range restwins {
wg.Add(1)
go func(idx int, r_hi uint) {
defer wg.Done()
l, c := twins_sieve(r_hi, kmin, kmax, kb, start_num, end_num,
modpg, primes, resinvrs)
mu.Lock()
lastwins[idx] = l
sums[idx] = c
mu.Unlock()
fmt.Printf("\r%d of %d twinpairs done", (idx + 1), pairscnt)
}(i, r_hi)
}
wg.Wait()
对于更高效的并行处理,可以使用工作池模式:
sums := make([]uint, pairscnt)
lastwins := make([]uint, pairscnt)
var wg sync.WaitGroup
jobs := make(chan struct {
idx int
r_hi uint
}, len(restwins))
results := make(chan struct {
idx int
l uint
c uint
}, len(restwins))
// 启动工作goroutine
for w := 0; w < runtime.NumCPU(); w++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range jobs {
l, c := twins_sieve(job.r_hi, kmin, kmax, kb, start_num, end_num,
modpg, primes, resinvrs)
results <- struct {
idx int
l uint
c uint
}{job.idx, l, c}
}
}()
}
// 发送任务
for i, r_hi := range restwins {
jobs <- struct {
idx int
r_hi uint
}{i, r_hi}
}
close(jobs)
// 收集结果
go func() {
wg.Wait()
close(results)
}()
// 处理结果
completed := 0
for result := range results {
lastwins[result.idx] = result.l
sums[result.idx] = result.c
completed++
fmt.Printf("\r%d of %d twinpairs done", completed, pairscnt)
}
这些实现解决了goroutine闭包捕获循环变量的问题,并确保所有goroutine完成后再继续执行。


