Golang中实现顺序非阻塞并发的模式探讨

Golang中实现顺序非阻塞并发的模式探讨 你好,

我正在开发一个项目,需要能够快速连续多次调用一个函数,该函数将执行一个相当耗时的操作,要求是非阻塞的,并且按照相同的调用顺序完成。

例如,假设有一个函数从0计数到to,其中to是一个给定的整数参数,并且对于每个数字,它还会尝试并发地计算斐波那契数列。

func count(to int) {
    for n := 0; n < to; n++ {
        fmt.Printf("Count: %d", n)

        fib(n)
    }
    fmt.Printf("Finished Counting to %d.", to)
}

func fib(n int) {
    // calculate fibonacci sequence to `n`

    fmt.Printf("Fibonacci sequence of %d is %d.", n, answer)
}

我想要实现的输出如下,其中to为5。

Count: 0
Count: 1
Count: 2
Count: 3
Count: 4
Finished Counting to 5.
Fibonacci sequence of 0 is 0.
Fibonacci sequence of 1 is 1.
Fibonacci sequence of 2 is 1.
Fibonacci sequence of 3 is 2.
Fibonacci sequence of 4 is 3.

我对Go的并发知识了解不深,但我最初的方法是使用sync.WaitGroup来计算斐波那契数列。类似这样:

func fib(n int) {
    wg.Add(1)

    go func(n int) {
        // calculate fibonacci sequence to `n`
       wg.Done()
    }(n) 

    wg.Wait()
}

虽然这确实保持了调用顺序,但WaitGroup是多余的,因为wg.Wait()只是阻塞,与同步调用函数相比没有任何优势。

我的下一个方法是使用通道,类似于以下内容。

type Counter struct {
    NumbersIn chan int
}

func (counter *Counter) Start() {
    defer close(counter.NumbersIn)

    for number := range counter.NumbersIn {
        fmt.Printf("Count: %d", n)
     
        wg.Add(1)

        go func(n int) {
            fib(n)
            wg.Done()
        }()

        wg.Wait()
    }
}

func (counter *Counter) Count(to int) {
    for n := 0; n < to; n++ {
        counter.NumbersIn <- n
    }

    fmt.Printf("Finished Counting to %d.", to)
}

虽然这确实保持了调用顺序,但和上面的尝试一样,它在wg.Wait()处阻塞了。

我相信,解决方案是管道模式的某种变体。但我在应用它时遇到了困难,无法避免阻塞。

感谢任何帮助!


更多关于Golang中实现顺序非阻塞并发的模式探讨的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang中实现顺序非阻塞并发的模式探讨的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Go中实现顺序非阻塞并发,可以使用带缓冲的通道配合goroutine来分离任务的提交和执行。以下是实现方案:

package main

import (
	"fmt"
	"sync"
)

type OrderedExecutor struct {
	tasks chan task
	wg    sync.WaitGroup
}

type task struct {
	n int
}

func NewOrderedExecutor(bufferSize int) *OrderedExecutor {
	e := &OrderedExecutor{
		tasks: make(chan task, bufferSize),
	}
	e.start()
	return e
}

func (e *OrderedExecutor) start() {
	e.wg.Add(1)
	go func() {
		defer e.wg.Done()
		for task := range e.tasks {
			fib(task.n)
		}
	}()
}

func (e *OrderedExecutor) Submit(n int) {
	e.tasks <- task{n: n}
}

func (e *OrderedExecutor) Close() {
	close(e.tasks)
	e.wg.Wait()
}

func count(to int, executor *OrderedExecutor) {
	for n := 0; n < to; n++ {
		fmt.Printf("Count: %d\n", n)
		executor.Submit(n)
	}
	fmt.Printf("Finished Counting to %d.\n", to)
}

func fib(n int) {
	answer := fibonacci(n)
	fmt.Printf("Fibonacci sequence of %d is %d.\n", n, answer)
}

func fibonacci(n int) int {
	if n <= 1 {
		return n
	}
	return fibonacci(n-1) + fibonacci(n-2)
}

func main() {
	executor := NewOrderedExecutor(10)
	count(5, executor)
	executor.Close()
}

输出结果:

Count: 0
Count: 1
Count: 2
Count: 3
Count: 4
Finished Counting to 5.
Fibonacci sequence of 0 is 0.
Fibonacci sequence of 1 is 1.
Fibonacci sequence of 2 is 1.
Fibonacci sequence of 3 is 2.
Fibonacci sequence of 4 is 3.

如果需要保持严格的顺序执行但允许并发提交,可以使用带顺序保证的worker池:

package main

import (
	"fmt"
	"sync"
)

type SequentialWorker struct {
	jobs    chan job
	results chan result
	wg      sync.WaitGroup
}

type job struct {
	id  int
	n   int
}

type result struct {
	id  int
	n   int
	ans int
}

func NewSequentialWorker(workerCount int) *SequentialWorker {
	w := &SequentialWorker{
		jobs:    make(chan job, 100),
		results: make(chan result, 100),
	}
	
	for i := 0; i < workerCount; i++ {
		w.wg.Add(1)
		go w.worker(i)
	}
	
	go w.collectResults()
	return w
}

func (w *SequentialWorker) worker(id int) {
	defer w.wg.Done()
	for job := range w.jobs {
		ans := fibonacci(job.n)
		w.results <- result{id: job.id, n: job.n, ans: ans}
	}
}

func (w *SequentialWorker) collectResults() {
	results := make(map[int]result)
	nextID := 0
	
	for res := range w.results {
		results[res.id] = res
		
		for {
			if r, ok := results[nextID]; ok {
				fmt.Printf("Fibonacci sequence of %d is %d.\n", r.n, r.ans)
				delete(results, nextID)
				nextID++
			} else {
				break
			}
		}
	}
}

func (w *SequentialWorker) Submit(id int, n int) {
	w.jobs <- job{id: id, n: n}
}

func (w *SequentialWorker) Close() {
	close(w.jobs)
	w.wg.Wait()
	close(w.results)
}

func main() {
	worker := NewSequentialWorker(3)
	
	for i := 0; i < 5; i++ {
		fmt.Printf("Count: %d\n", i)
		worker.Submit(i, i)
	}
	
	fmt.Printf("Finished Counting to 5.\n")
	worker.Close()
}

这个实现确保了:

  1. count()函数非阻塞执行
  2. 斐波那契计算并发执行
  3. 结果按提交顺序输出
  4. 通过缓冲通道避免阻塞主流程
回到顶部