Golang中实现顺序非阻塞并发的模式探讨
Golang中实现顺序非阻塞并发的模式探讨 你好,
我正在开发一个项目,需要能够快速连续多次调用一个函数,该函数将执行一个相当耗时的操作,要求是非阻塞的,并且按照相同的调用顺序完成。
例如,假设有一个函数从0计数到to,其中to是一个给定的整数参数,并且对于每个数字,它还会尝试并发地计算斐波那契数列。
func count(to int) {
for n := 0; n < to; n++ {
fmt.Printf("Count: %d", n)
fib(n)
}
fmt.Printf("Finished Counting to %d.", to)
}
func fib(n int) {
// calculate fibonacci sequence to `n`
fmt.Printf("Fibonacci sequence of %d is %d.", n, answer)
}
我想要实现的输出如下,其中to为5。
Count: 0
Count: 1
Count: 2
Count: 3
Count: 4
Finished Counting to 5.
Fibonacci sequence of 0 is 0.
Fibonacci sequence of 1 is 1.
Fibonacci sequence of 2 is 1.
Fibonacci sequence of 3 is 2.
Fibonacci sequence of 4 is 3.
我对Go的并发知识了解不深,但我最初的方法是使用sync.WaitGroup来计算斐波那契数列。类似这样:
func fib(n int) {
wg.Add(1)
go func(n int) {
// calculate fibonacci sequence to `n`
wg.Done()
}(n)
wg.Wait()
}
虽然这确实保持了调用顺序,但WaitGroup是多余的,因为wg.Wait()只是阻塞,与同步调用函数相比没有任何优势。
我的下一个方法是使用通道,类似于以下内容。
type Counter struct {
NumbersIn chan int
}
func (counter *Counter) Start() {
defer close(counter.NumbersIn)
for number := range counter.NumbersIn {
fmt.Printf("Count: %d", n)
wg.Add(1)
go func(n int) {
fib(n)
wg.Done()
}()
wg.Wait()
}
}
func (counter *Counter) Count(to int) {
for n := 0; n < to; n++ {
counter.NumbersIn <- n
}
fmt.Printf("Finished Counting to %d.", to)
}
虽然这确实保持了调用顺序,但和上面的尝试一样,它在wg.Wait()处阻塞了。
我相信,解决方案是管道模式的某种变体。但我在应用它时遇到了困难,无法避免阻塞。
感谢任何帮助!
更多关于Golang中实现顺序非阻塞并发的模式探讨的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于Golang中实现顺序非阻塞并发的模式探讨的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
在Go中实现顺序非阻塞并发,可以使用带缓冲的通道配合goroutine来分离任务的提交和执行。以下是实现方案:
package main
import (
"fmt"
"sync"
)
type OrderedExecutor struct {
tasks chan task
wg sync.WaitGroup
}
type task struct {
n int
}
func NewOrderedExecutor(bufferSize int) *OrderedExecutor {
e := &OrderedExecutor{
tasks: make(chan task, bufferSize),
}
e.start()
return e
}
func (e *OrderedExecutor) start() {
e.wg.Add(1)
go func() {
defer e.wg.Done()
for task := range e.tasks {
fib(task.n)
}
}()
}
func (e *OrderedExecutor) Submit(n int) {
e.tasks <- task{n: n}
}
func (e *OrderedExecutor) Close() {
close(e.tasks)
e.wg.Wait()
}
func count(to int, executor *OrderedExecutor) {
for n := 0; n < to; n++ {
fmt.Printf("Count: %d\n", n)
executor.Submit(n)
}
fmt.Printf("Finished Counting to %d.\n", to)
}
func fib(n int) {
answer := fibonacci(n)
fmt.Printf("Fibonacci sequence of %d is %d.\n", n, answer)
}
func fibonacci(n int) int {
if n <= 1 {
return n
}
return fibonacci(n-1) + fibonacci(n-2)
}
func main() {
executor := NewOrderedExecutor(10)
count(5, executor)
executor.Close()
}
输出结果:
Count: 0
Count: 1
Count: 2
Count: 3
Count: 4
Finished Counting to 5.
Fibonacci sequence of 0 is 0.
Fibonacci sequence of 1 is 1.
Fibonacci sequence of 2 is 1.
Fibonacci sequence of 3 is 2.
Fibonacci sequence of 4 is 3.
如果需要保持严格的顺序执行但允许并发提交,可以使用带顺序保证的worker池:
package main
import (
"fmt"
"sync"
)
type SequentialWorker struct {
jobs chan job
results chan result
wg sync.WaitGroup
}
type job struct {
id int
n int
}
type result struct {
id int
n int
ans int
}
func NewSequentialWorker(workerCount int) *SequentialWorker {
w := &SequentialWorker{
jobs: make(chan job, 100),
results: make(chan result, 100),
}
for i := 0; i < workerCount; i++ {
w.wg.Add(1)
go w.worker(i)
}
go w.collectResults()
return w
}
func (w *SequentialWorker) worker(id int) {
defer w.wg.Done()
for job := range w.jobs {
ans := fibonacci(job.n)
w.results <- result{id: job.id, n: job.n, ans: ans}
}
}
func (w *SequentialWorker) collectResults() {
results := make(map[int]result)
nextID := 0
for res := range w.results {
results[res.id] = res
for {
if r, ok := results[nextID]; ok {
fmt.Printf("Fibonacci sequence of %d is %d.\n", r.n, r.ans)
delete(results, nextID)
nextID++
} else {
break
}
}
}
}
func (w *SequentialWorker) Submit(id int, n int) {
w.jobs <- job{id: id, n: n}
}
func (w *SequentialWorker) Close() {
close(w.jobs)
w.wg.Wait()
close(w.results)
}
func main() {
worker := NewSequentialWorker(3)
for i := 0; i < 5; i++ {
fmt.Printf("Count: %d\n", i)
worker.Submit(i, i)
}
fmt.Printf("Finished Counting to 5.\n")
worker.Close()
}
这个实现确保了:
count()函数非阻塞执行- 斐波那契计算并发执行
- 结果按提交顺序输出
- 通过缓冲通道避免阻塞主流程

