Python中taskflow使用指南
taskflow 是 openstack 未确保冗长的系列操作能正确执行的工作流框架
主要应用场景有如 Cinder 的 create volume 这般复杂、冗长、容易失败, 却又要求保持数据与环境一致的业务逻辑.
由于适用场景非常多,例如游戏行业的常见操作
停服 备份数据库 升级数据库 升级应用 启动服务器
非常适合用工作流引擎来确保执行
这个由 openstack 编写的 taskflow 国内相关介绍非常少,官方用户指南内容太多,导致学习这个框架花费需要花费非常多的时间
这里将会较为详细的介绍 taskflow 的代码和使用
百度有一篇中文的相关介绍,作者对 taskflow 还是很熟的,但是写得太简要,直接看感觉明白了实际还是一头雾水
要看懂 taskflow,需要先熟悉两个最基本的概念
-
工作流 这玩意最直观的一个应用就是 oa 里审批,上面我们说的游戏业的常见操作也可以作为例子
停服 备份数据库 升级数据库 升级应用 启动服务器
上述操作也是一个工作流, 停服成功才能备份,备份成功才能升级 升级应用和备份可以同时进行,升级数据库和升级应用都成功了才能启动服务器 升级失败还要回滚
-
有限状态机 推举导读. 工作流就是用状态机来实现流程控制和执行任务的
上面两条光看懂了还不行,还要写过一轮深入了解,taskflow 使用了 openstack 的通用状态机项目,所以要能看 taskflow 先要熟悉 openstack 的状态机项目 Automaton
https://pypi.python.org/pypi/automaton/
为了熟悉状态机项目,我用状态机写了一个tailfile项目,作用就是实现 tail -f 的效果,倒读 N 行和判断文件是否变化都用到了状态机,
因为状态和事件都比较少,用状态机写这些功能有点杀鸡用牛刀,主要是为了熟悉状态机才专门用 automaton 来写
熟悉玩 automaton 以后,可以开始来看 taskflow(基于最新版本 2.14)了,
首先,因为 taskflow 作为通用项目而不是 openstack 内部项目,所以 taskflow 没有使用 oslo 中的一些通用工具
而且里面同样为了兼容写了一些复杂的接口还用了 futurist 实现异步,我为了和自己的其他几个项目统一起来,基于 taskflow2.14 生成了一个简化过的项目
https://github.com/lolizeppelin/simpleflow
代码变化了几个如下部分
- 日志用回 oslo_log 的方式
- 删除了 sqlalchemy 以外的存储方式,table 的和所有 sql expression 都改为 orm 方式(只支持 mysql ),后来我熟悉 taskflow 以后发现删除其他 backends 只保留 mysql 不是一个正确做法,有缺陷,后续会说明为什么
- 因为删除了 sqlalchemy 以外的存储方式, 所以 persitence 中 backends 就么有存在的必要了,直接传 sqlalchemy 的 session 即可
- persitence 中的内容和 storage 合并
- jobs 和 conductors 删除,这个部分可以和主要的 taskflow 功能无关
- Executor 不使用 futurist,自己写了一个简化的 futurist,代码兼容原来的写法,只支持协程,删除多线程和多进程相关支持
- 读写锁业复制了过来,用协程实现
- 砍掉了 worker based 的 Engine
因为极大的简化了 futurist 和删掉了 backends 层所以比较好读一些,读代码的时候建议看我简化后的项目
我们先来看 taskflow 的几个主要单位
-
Engine
Engine 是 taskflow 是启动口, 主要工作 创建状态机, 循环状态机, 在指定状态下通过 Executor 执行任务
Engine 分为好几种 work based 的 Engine 比较特殊我们不看,直接看 action engine
几种 action engine 其实没有什么区别,通过 Executor 分为
- 序列化(阻塞,顺序执行)引擎
- 基于线程的并行引擎
- 基于协程的并行引擎
- 基于多进程的并行引擎
并行引擎的优势是,当个任务没有顺序关联的情况下可以同时执行多个任务 当然,引擎不影响任务之间的顺序关系,除非你想强制一个一个任务执行,否则都应该使用并行引擎
Executor 的实现可以参考我简化过的futurist 因为是用 eventlet 实现的,所以需要熟悉 eventlet
-
Atom 明天继续
Python中taskflow使用指南
赞
我无法理解你的问题
好别扭…有点文字问题都不能修正…不想发了…
赞,可以发到自己的博客
class CallJoe(task.Task):
def execute(self, joe_number, *args, **kwargs):
print(“CallJoe args”, args)
print(“CallJoe kwargs”, kwargs)
print(“Calling joe %s.” % joe_number)
这种定义的类, 想实现类似:
t = CallJoe()
t.execute(‘13911111111’, “foo”, “bar”, baz=“value_baz”, qux=“value_qux”)
找了好久也没摸索出怎么把这个 非关键字变长参数传进去. 还请楼主指教.
不行
为什么非要变长呢 打包到 list 或者 dict 里不好嘛


