T8、TaskFlow
官方文档:https://docs.openstack.org/taskflow/latest/
T8.1、TaskFlow 概念
概念 | 说明 |
---|---|
Task | 最小执行单元,定义 execute() 与 revert() |
Flow | 任务组合体,可为顺序、图、或无序 |
Engine | 执行调度器(控制执行、失败检测、回滚) |
Store | 参数传递的上下文(变量共享区) |
Retry | 重试策略(失败后控制逻辑) |
Persistence | 状态持久化,可恢复未完成 Flow |
T8.2、Flow 类型与依赖关系
Flow 类型 | 模块 | 执行顺序 | 回滚支持 | 特点 |
---|---|---|---|---|
linear_flow.Flow |
taskflow.patterns.linear_flow |
顺序执行 | ✅ | 最常用(线性依赖) |
graph_flow.Flow |
taskflow.patterns.graph_flow |
依赖图 | ✅ | 支持并行任务 |
unordered_flow.Flow |
taskflow.patterns.unordered_flow |
无序随机 | ✅ | 独立任务集合 |
retry |
taskflow.retry |
控制失败重试 | ✅ | 可组合使用 |
T8.2.1、Linear Flow(线性流)
最常见的任务链执行模型:
from taskflow import task
from taskflow.patterns import linear_flow
from taskflow import engines
class StepA(task.Task):
def execute(self):
print("执行 A")
def revert(self, **kwargs):
print("回滚 A")
class StepB(task.Task):
def execute(self):
print("执行 B")
raise Exception("B 失败")
def revert(self, **kwargs):
print("回滚 B")
flow = linear_flow.Flow("linear").add(
StepA(name="step_A"),
StepB(name="step_B")
)
engine = engines.load(flow)
engine.run()
T8.2.2、Graph Flow(依赖图流)
定义任务依赖,实现并行与拓扑调度:
from taskflow import task
from taskflow.patterns import graph_flow
from taskflow import engines
class A(task.Task):
def execute(self):
print("A 执行")
class B(task.Task):
def execute(self):
print("B 执行")
class C(task.Task):
def execute(self):
print("C 执行")
ta, tb, tc = A(name="A"), B(name="B"), C(name="C")
flow = graph_flow.Flow("graph")
flow.add(ta, tb, tc)
flow.link(ta, tb) # A → B
flow.link(tb, tc) # B → C
engine = engines.load(flow)
engine.run()
T8.2.3、Unordered Flow(无序流)
任务无依赖、执行顺序随机,但依旧支持回滚。
from taskflow import task
from taskflow.patterns import unordered_flow
from taskflow import engines
class Hello(task.Task):
def execute(self):
print("Hello")
def revert(self, **kwargs):
print("Hello 回滚")
class World(task.Task):
def execute(self):
print("World")
raise Exception("模拟失败")
def revert(self, **kwargs):
print("World 回滚")
flow = unordered_flow.Flow("unordered").add(
Hello(name="hello"),
World(name="world")
)
engine = engines.load(flow)
engine.run()
T8.2.4、企业级示例(模仿 OpenStack Cinder)
from taskflow import task, engines
from taskflow.patterns import linear_flow as lf
class CreateDBRecord(task.Task):
def execute(self, volume_id):
print(f"创建数据库记录 for {volume_id}")
db_record = {"id": volume_id, "status": "creating"}
return db_record # 返回对象
def revert(self, result, volume_id, **kwargs):
print(f"回滚数据库记录 for {volume_id}")
class CreateBackend(task.Task):
def execute(self, create_db_record):
print(f"在后端创建卷: {create_db_record['id']}")
raise Exception("模拟后端创建失败")
def revert(self, create_db_record, **kwargs):
print(f"回滚后端卷: {create_db_record['id']}")
class UpdateStatus(task.Task):
def execute(self, create_db_record):
print(f"更新卷状态为 available: {create_db_record['id']}")
create_db_record["status"] = "available"
def revert(self, create_db_record, **kwargs):
print(f"回滚状态更新: {create_db_record['id']}")
flow = lf.Flow("create-volume").add(
# 这里显式说:CreateDBRecord 提供名字 create_db_record
CreateDBRecord(name="create_db", provides="create_db_record"),
CreateBackend(name="create_backend"),
UpdateStatus(name="update_status")
)
engine = engines.load(flow, store={"volume_id": "vol-123"})
try:
engine.run()
except Exception as e:
print("捕获异常:", e)
T8.3、Retry(重试机制)
TaskFlow 内置多种 retry 策略:
策略类 | 说明 |
---|---|
retry.Times(n) |
重试 n 次 |
retry.AlwaysRevert() |
总是触发回滚 |
retry.ForEach(iterable) |
针对每个输入值尝试 |
from taskflow import task
from taskflow.patterns import linear_flow
from taskflow.retry import Times
from taskflow import engines
class Unstable(task.Task):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.count = 0
def execute(self):
self.count += 1
print(f"执行第 {self.count} 次")
if self.count < 3:
raise Exception("失败重试")
print("成功!")
flow = linear_flow.Flow("retry-flow", retry=Times(3))
flow.add(Unstable(name="unstable_task"))
engine = engines.load(flow)
engine.run()
T8.4、Store(上下文共享)
store
是 Flow 内任务共享参数的上下文。
from taskflow import task, engines
from taskflow.patterns import linear_flow as lf
class Add(task.Task):
def execute(self, x, y):
print(f"Add 执行: {x} + {y}")
return x + y
class Multiply(task.Task):
def execute(self, add_result):
print("Multiply 执行: 结果 =", add_result * 10)
# 构建线性流程
flow = lf.Flow("calc").add(
Add(name="add", provides="add_result"),
Multiply(name="multiply")
)
# 初始化引擎
engine = engines.load(flow, store={'x': 2, 'y': 3})
# 运行
engine.run()