Golang中如何实现这段代码的并发/并行处理

Golang中如何实现这段代码的并发/并行处理 我正在用Go语言实现一个我在其他多种语言中已经完成过的算法。 我首先完成了一个单线程的Go版本,以确保它能正常工作。 现在我想做一个多线程版本,就像我在其他语言中做的那样。 我使用的是Go 1.16。

以下是我需要使其多线程化的单线程Go代码。

sums := make([]uint, pairscnt)
lastwins := make([]uint, pairscnt)

for i, r_hi := range restwins { // sieve twinpair restracks
   l, c := twins_sieve(r_hi, kmin, kmax, kb, start_num, end_num,
                       modpg, primes, resinvrs)
   lastwins[i] = l; sums[i] = c
   fmt.Printf("\r%d of %d twinpairs done", (i + 1), pairscnt)
}

这里我使用了一个goroutine来使其多线程化。 它可以编译,但似乎只执行了一次,并且给出了错误的数值结果。

sums := make([]uint, pairscnt)
lastwins := make([]uint, pairscnt)

for i, r_hi := range restwins {
   go func() {
      l, c := twins_sieve(r_hi, kmin, kmax, kb, start_num, end_num,
                          modpg, primes, resinvrs)
      lastwins[i] = l; sums[i] = c
      fmt.Printf("\r%d of %d twinpairs done", (i + 1), pairscnt)
   }()
}

这是Crystal语言中相同(可工作)的代码片段,它具有类似的并发模型。

sums = Array(UInt64).new(pairscnt, 0)
lastwins = Array(UInt64).new(pairscnt, 0)
done = Channel(Nil).new(pairscnt)

restwins.each_with_index do |r_hi, i|
  spawn do
    lastwins[i], sums[i] = twins_sieve(r_hi, kmin, kmax, kb, start_num,
                           end_num, modpg, primes, resinvrs)
    print "\r#{i + 1} of #{pairscnt} twinpairs done"
    done.send(nil)
end end
pairscnt.times { done.receive }

这是Rust的代码片段,它实现了真正的并行执行,并且是所有版本中最快的。

let (lastwins, sums): (Vec<_>, Vec<_>) = {
  let counter = RelaxedCounter::new();
  restwins.par_iter().map( |r_hi| {
    let out = twins_sieve(*r_hi, kmin, kmax, kb, start_num, end_num,
                          modpg, &primes.to_vec(), &resinvrs);
    print!("\r{} of {} twinpairs done", counter.increment(), pairscnt);
    out
  }).unzip()
};

我希望Go专家能告诉我需要做什么才能使代码正确执行。


更多关于Golang中如何实现这段代码的并发/并行处理的实战教程也可以访问 https://www.itying.com/category-94-b0.html

6 回复

两种技术都能编译,使用 htop 可以看到线程在工作。 然而,twins_sieve 的输出值并未存储到数组中, 因此我无法获得正确的输出值。

更多关于Golang中如何实现这段代码的并发/并行处理的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


感谢 @petrus,这个方法可行。以下是完整的代码段。

sums := make([]uint, pairscnt)
lastwins := make([]uint, pairscnt)

var wg sync.WaitGroup
for i, r_hi := range restwins {
  wg.Add(1)
  go func(i, r_hi int) {
    defer wg.Done()
    l, c := twins_sieve(r_hi, kmin, kmax, kb, start_num, end_num, modpg, primes, resinvrs)
    lastwins[i] = l; sums[i] = c
    fmt.Printf("\r%d of %d twinpairs done", (i + 1), pairscnt)
  }(i, r_hi)
}
wg.Wait()
fmt.Printf("\r%d of %d twinpairs done", pairscnt, pairscnt)

供参考,以下是单线程与多线程版本的一些数据;环境为 Linux,I7-3.5GHz,8 线程。

     输入值        |   单线程版本    |   多线程版本
__________________|_________________|_______________
  100_000_000_000 |     29.1 秒     |     5.8 秒
  500_000_000_000 |    145.6 秒     |    29.9 秒
1_000_000_000_000 |    316.9 秒     |    59.9 秒

这些时间是在“嘈杂”的系统环境下测得的,同时还有浏览器播放视频等其他任务在进行。 但它显示了不同版本之间的相对差异。

不过,对于此算法,能够实现真正并行的 Rust、Nim 等语言要快得多。

问题出在这里:

for i, r_hi := range restwins {
   go func() {
      // 使用 i 和 r_hi
   }()
}

在 Go 语言中,闭包函数对变量的访问是“通过引用”的。for 循环为 restwins 中的每个元素创建一个新的 goroutine,但这些 goroutine 共享实际的 ir_hi 变量,因此你可能会遇到以下情况:

  • 你在循环中创建的 goroutine 可能直到循环结束后才真正被调度和启动。如果发生这种情况,那么所有的 goroutine 看到的都是 ir_hi 的最终值。
  • 也许 goroutine 是分批执行的,你会看到其中一部分 goroutine 观察到 i 的某个值,而另一部分 goroutine 观察到 i 的另一个值,等等。

解决方案是:

  • ir_hi 作为参数传递给你的内部 goroutine 函数:

    for i, r_hi := range restwins {
       go func(i, r_hi uint) {
          // ...
       }(i, r_hi)
    }
    

    这样做的好处是,如果你不需要闭包捕获任何其他值,你的函数会变成一个“普通”函数,无需闭包可能带来的额外处理开销。

  • 或者在函数外部复制变量:

    for i, r_hi := range restwins {
       i, r_hi := i, r_hi
       go func() {
          // ...
       }()
    }
    

    这样做的好处是,你不必重复类型名称,因此,如果你将来需要更改类型,例如从 uint 改为 uint64,你无需更改函数参数中的类型。

jzakiya: 我没有得到正确的输出值。


Go语言之旅

Go编程语言规范

《Go编程语言》,Alan A. A. Donovan · Brian W. Kernighan

Go博客:并发不是并行


package main

import (
	"fmt"
	"sync"
)

func main() {
	pairscnt := 7
	restwins := make([]uint, pairscnt)
	sums := make([]uint, pairscnt)
	lastwins := make([]uint, pairscnt)

	var wg sync.WaitGroup
	for i, r_hi := range restwins {
		wg.Add(1)
		go func(i int, r_hi uint) {
			defer wg.Done()
			l, c := uint(i), uint(r_hi+1)
			lastwins[i] = l
			sums[i] = c
			fmt.Printf("\r%d of %d twinpairs done", (i + 1), pairscnt)
		}(i, r_hi)
	}
	wg.Wait()
	fmt.Println()
	fmt.Println(lastwins, sums)
}

Go Playground - The Go Programming Language

7 of 7 twinpairs done 3 of 7 twinpairs done 2 of 7 twinpairs done 5 of 7 twinpairs done 4 of 7 twinpairs done 6 of 7 twinpairs done 1 of 7 twinpairs done [0 1 2 3 4 5 6] [1 1 1 1 1 1 1]

虽然这段代码可以运行,但它可能不是最优的。

最快的素数筛法,在Nim中

在Go中实现并发处理时,需要解决两个关键问题:goroutine间的数据竞争和等待所有goroutine完成。以下是修正后的代码:

sums := make([]uint, pairscnt)
lastwins := make([]uint, pairscnt)
var wg sync.WaitGroup

for i, r_hi := range restwins {
    wg.Add(1)
    go func(idx int, r_hi uint) {
        defer wg.Done()
        l, c := twins_sieve(r_hi, kmin, kmax, kb, start_num, end_num,
                          modpg, primes, resinvrs)
        lastwins[idx] = l
        sums[idx] = c
        fmt.Printf("\r%d of %d twinpairs done", (idx + 1), pairscnt)
    }(i, r_hi)
}
wg.Wait()

如果需要避免数据竞争,可以使用带互斥锁的版本:

sums := make([]uint, pairscnt)
lastwins := make([]uint, pairscnt)
var wg sync.WaitGroup
var mu sync.Mutex

for i, r_hi := range restwins {
    wg.Add(1)
    go func(idx int, r_hi uint) {
        defer wg.Done()
        l, c := twins_sieve(r_hi, kmin, kmax, kb, start_num, end_num,
                          modpg, primes, resinvrs)
        mu.Lock()
        lastwins[idx] = l
        sums[idx] = c
        mu.Unlock()
        fmt.Printf("\r%d of %d twinpairs done", (idx + 1), pairscnt)
    }(i, r_hi)
}
wg.Wait()

对于更高效的并行处理,可以使用工作池模式:

sums := make([]uint, pairscnt)
lastwins := make([]uint, pairscnt)
var wg sync.WaitGroup
jobs := make(chan struct {
    idx int
    r_hi uint
}, len(restwins))
results := make(chan struct {
    idx int
    l   uint
    c   uint
}, len(restwins))

// 启动工作goroutine
for w := 0; w < runtime.NumCPU(); w++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        for job := range jobs {
            l, c := twins_sieve(job.r_hi, kmin, kmax, kb, start_num, end_num,
                              modpg, primes, resinvrs)
            results <- struct {
                idx int
                l   uint
                c   uint
            }{job.idx, l, c}
        }
    }()
}

// 发送任务
for i, r_hi := range restwins {
    jobs <- struct {
        idx int
        r_hi uint
    }{i, r_hi}
}
close(jobs)

// 收集结果
go func() {
    wg.Wait()
    close(results)
}()

// 处理结果
completed := 0
for result := range results {
    lastwins[result.idx] = result.l
    sums[result.idx] = result.c
    completed++
    fmt.Printf("\r%d of %d twinpairs done", completed, pairscnt)
}

这些实现解决了goroutine闭包捕获循环变量的问题,并确保所有goroutine完成后再继续执行。

回到顶部