Golang Go语言分享一个有趣的任务并发控制器

piaodazhu/gotcc是一个通用、简单而且好玩的并发控制器模块。它支持用户定义若干子任务,并且声明子任务之间存在的依赖关系,控制器将会在遵循依赖关系的前提下,并发执行这些子任务。

Github 求🌟: https://github.com/piaodazhu/gotcc

特性

一些有趣的特性:

  1. 根据用户声明的依赖关系,自动控制子任务的并发执行。
  2. 支持依赖逻辑表达式:与 /或 /非 /异或及其任意组合。比如任务 E 只有在 A 完成且 B 完成,且 C 或 D 不完成时才会启动,那么用这套逻辑表达式是可以直接定义的。
  3. 子任务之间多对多的结果传递。比如 C 依赖 A 和 B 的结果,那么 C 执行时的输入参数将会包括 A 和 B 各自的执行结果;如果 D 也依赖于 A ,那么 A 的结果也会传递给 D 一份。
  4. 支持对每一个任务都定义一个“撤销操作”,在异常时“回滚”。当任意子任务异常,所有执行中的任务将被取消,假设已经执行完成的任务序列为D->A->B,那么随后undo(B)->undo(A)->undo(D)将执行。
  5. 收集控制器执行过程中产生的多个错误:哪些子任务出错了、返回的是什么 error 、哪些 undo 操作出错了、返回的是什么 error ,都会被收集。

个人感觉,和手动控制 goroutine 之间的同步比起来,这个控制器有如下优势:

  1. 让程序更加清晰。用这个模块,可以让用户功能代码和并发控制代码分离开。由于依赖是预先声明的,在分析代码时,自然而然可以根据依赖声明画出子任务的流图,比混杂的代码更容易分析。
  2. 具有一定的动态性。声明依赖逻辑表达式只关注任务的逻辑关系,而一个任务执行还是不执行,是在运行时决定的。任务如果有确定性的先后顺序还好,如果任务是动态的,那么手动控制这些 goroutine 的执行流程可能就得写很多 if 语句。随着子任务的增多,复杂性也会越来越高。
  3. 支持回滚,可以在子任务执行失败时尽可能恢复所有子任务执行前的状态。当然这取决于任务的性质和用户自己对 undo 操作的定义,控制器只是尽力逆序执行 undo 操作。如果手动控制,一旦发现某个 goroutine 出错了,回滚可能是一件比较麻烦的事。

这个控制器目前也有不足,因为还没写几天,只写了几个基本的 Test ,可能存在没发现的 bug 。一些地方用了闭包、channelinterface{},甚至用map[string]interface{}来放参数表(目前只能想到这样的方法了)。虽然还没有做基准测试,但是可想而知会有一些的性能损耗。应该还有很多可以改进的地方,大家有兴趣可以试试来提 PR 。

起因

做这个小项目的起因是另一个在我的任务中,需要在不同节点上执行不同的命令,这些命令有的具有先后顺序有的则可以并行,一旦有一个命令失败我希望能让所有已经执行过的命令回滚(比如 insmod 对应 rmmod ,启动进程对应杀死进程、git commit 对应 git reset )。因为这个任务的背景不是在 K8s 上,想想还挺麻烦的。另一方面,这种具有依赖、支持回滚的并发控制应该不局限于眼前这个场景,于是干脆抽象出来,写了这样一个控制器的项目,之后可以直接拿来用。


Golang Go语言分享一个有趣的任务并发控制器

更多关于Golang Go语言分享一个有趣的任务并发控制器的实战教程也可以访问 https://www.itying.com/category-94-b0.html

11 回复

> Tasks will run concurrently, but taskB will not start until taskA completes, and taskC will not start until both taskA and taskB complete. But if taskC failed (return err!=nil), ExampleUndo(“BindArg-B”) will be executed.

这。。如果我没看错的话,这串行执行就可以了吧:
A()
B()
if err := C(); err != nil {
Undo()
}

这弄成并发任务管理器去做,是不是把本来简单的问题复杂化了。。

更多关于Golang Go语言分享一个有趣的任务并发控制器的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


确实,这例子脱裤放屁。README 是在 AI 生成基础上改的,当时没发现,我这就换个例子。。

企业级系统里,很多模组,很难预测它的执行时间、需要的资源,以及会不会崩溃。

所以直接在编程语言里,通过代码进行流程控制,这种系统做出来后,鲁棒性极低。

企业级的做法是,把任务管理、任务执行、任务监视,这几个功能,做成不同的模组单元来进行分离,确保一个模组崩溃后,不会影响另一个模块。

其次,用户任务的 CURD ,以及用户任务在执行中的状态更新,全用数据库来记录,来实现任何时候都能有据可查,实时可查。

多谢前辈指出,学习了🤝

两位前辈实际上是指出了这个并发控制器模块不适用的两种场景:
1. 任务依赖关系简单的场景。这种场景下应该直接串行化解决,不会增加手动管理的复杂度。此外,任务粒度小的场景也不适用,因为并发控制器内部开销相比于小任务自身的执行代价来说是得不偿失的。因此,对于依赖关系简单、任务粒度小的一些任务,可以合并视作一个大任务来解决。
2. 大型系统层面的任务调度的场景。大型系统要保证一个模块崩溃不影响其他模块,使用程序来做这样的保证是不可靠的。所以在这个层面,这个项目是不适用的。但对于大型系统下的小模组获取存在应用的场景。

个人感觉,这个模块的应用层面其实跟 Goroutine 相似,Goroutine 作为一种很强大的机制也是要在一定场景下才能有勇武之地,gotcc 可以作为 Goroutine 在某些场合下的扩展。

欢迎大家多来交流吐槽,相互学习!


百十来行随便写了个树状的并发流控:
https://gist.github.com/lesismal/6b397b12bb1e328395873a2c35a71af0

这个树状,基本就可以衍生出各种并发依赖的流程的控制了,而且直接结构体树状生命就可以,看上去可能清晰点。
并发度也是先执行最底层依赖、即叶子节点,每个 node 的叶子都执行完后、其中最后执行完的那个叶子协程复用继续执行该 node ,并依次层层向上直到根任务、整个任务树完成。如果想限制协程数量,把直接使用 go 创建协程的部分改成使用协程池就可以了。

只是简单示例,所以没加 context 之类的,而且实际情况中 context 的 timeout cancel 这些也都存在临界问题。
比如 sql ExecContext ,context 可能超时返回 error 了,但其实数据库已经收到了指令并且执行成功、只是返回的比 timeout 时间久了一点点。所以 context 这些并不能保证幂等性。
如果多个并发流的任务其中一个或者多个发生错误并已经触发了 rollback ,go 里也没法去用 context 或者其他方式强制其他协程中止流程,只能是执行到某一步时去判断了 context Done 或者状态才能提前中止,所以请不要在意失败示范中 rollback 后还打印了 4 的日志。

单进程内的事务也好,那些所谓的分布式事务也好,数据要求强一致的场景,基本都是要靠数据层的数据状态来做幂等性判定。
分布式事务的那些流程设计、方案设计,也几乎(或者更自信点,把这几乎这两个字去掉)没有能够百分百确保完全依靠代码实现执行成功或者回滚,往往需要人工流程手动操作。

是不是没做依赖循环检测,简单测了一下卡在那里了。

之前没考虑到,现在已经临时补上了。

学习了 :thumbup 👍

代码 append 了下,代码量更少了,也变强了:
1. 增加了 context
2. 去掉了 rollback 回调,因为类似 sql 那种 defer rollback()就可以了
3. 子任务 err 后就不再向上调用父任务了

虽然想不出实际场景哪里需要这样用,但是蛮有趣的

在Go语言中,实现一个有趣且实用的任务并发控制器,可以使用Go的goroutine和channel来优雅地管理并发任务。下面是一个简单的示例,展示了一个基本的任务并发控制器:

package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(id int, wg *sync.WaitGroup, jobs <-chan int) {
	defer wg.Done()
	for j := range jobs {
		fmt.Printf("Worker %d started job %d\n", id, j)
		time.Sleep(time.Second)
		fmt.Printf("Worker %d finished job %d\n", id, j)
	}
}

func main() {
	const numWorkers = 3
	const numJobs = 10

	jobs := make(chan int, numJobs)
	var wg sync.WaitGroup

	for w := 1; w <= numWorkers; w++ {
		wg.Add(1)
		go worker(w, &wg, jobs)
	}

	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs)

	wg.Wait()
	fmt.Println("All jobs done.")
}

这个程序创建了一个固定数量的工作goroutine,每个goroutine从jobs channel中接收任务并执行。通过调整numWorkersnumJobs的值,你可以控制并发任务的数量和工作线程的数量。sync.WaitGroup用于等待所有工作goroutine完成。

这种并发控制器模式非常适合处理大量任务,同时限制同时运行的任务数量,从而避免资源过载。

回到顶部