T8、TaskFlow

作者: Brinnatt 分类: 小工具 发布时间: 2025-10-11 09:18

官方文档:https://docs.openstack.org/taskflow/latest/

T8.1、TaskFlow 概念

概念 说明
Task 最小执行单元,定义 execute()revert()
Flow 任务组合体,可为顺序、图、或无序
Engine 执行调度器(控制执行、失败检测、回滚)
Store 参数传递的上下文(变量共享区)
Retry 重试策略(失败后控制逻辑)
Persistence 状态持久化,可恢复未完成 Flow

T8.2、Flow 类型与依赖关系

Flow 类型 模块 执行顺序 回滚支持 特点
linear_flow.Flow taskflow.patterns.linear_flow 顺序执行 最常用(线性依赖)
graph_flow.Flow taskflow.patterns.graph_flow 依赖图 支持并行任务
unordered_flow.Flow taskflow.patterns.unordered_flow 无序随机 独立任务集合
retry taskflow.retry 控制失败重试 可组合使用

T8.2.1、Linear Flow(线性流)

最常见的任务链执行模型:

from taskflow import task
from taskflow.patterns import linear_flow
from taskflow import engines

class StepA(task.Task):
    def execute(self):
        print("执行 A")
    def revert(self, **kwargs):
        print("回滚 A")

class StepB(task.Task):
    def execute(self):
        print("执行 B")
        raise Exception("B 失败")
    def revert(self, **kwargs):
        print("回滚 B")

flow = linear_flow.Flow("linear").add(
    StepA(name="step_A"),
    StepB(name="step_B")
)

engine = engines.load(flow)
engine.run()

T8.2.2、Graph Flow(依赖图流)

定义任务依赖,实现并行与拓扑调度:

from taskflow import task
from taskflow.patterns import graph_flow
from taskflow import engines

class A(task.Task):
    def execute(self):
        print("A 执行")

class B(task.Task):
    def execute(self):
        print("B 执行")

class C(task.Task):
    def execute(self):
        print("C 执行")

ta, tb, tc = A(name="A"), B(name="B"), C(name="C")

flow = graph_flow.Flow("graph")
flow.add(ta, tb, tc)
flow.link(ta, tb)  # A → B
flow.link(tb, tc)  # B → C

engine = engines.load(flow)
engine.run()

T8.2.3、Unordered Flow(无序流)

任务无依赖、执行顺序随机,但依旧支持回滚。

from taskflow import task
from taskflow.patterns import unordered_flow
from taskflow import engines

class Hello(task.Task):
    def execute(self):
        print("Hello")
    def revert(self, **kwargs):
        print("Hello 回滚")

class World(task.Task):
    def execute(self):
        print("World")
        raise Exception("模拟失败")
    def revert(self, **kwargs):
        print("World 回滚")

flow = unordered_flow.Flow("unordered").add(
    Hello(name="hello"),
    World(name="world")
)
engine = engines.load(flow)
engine.run()

T8.2.4、企业级示例(模仿 OpenStack Cinder)

from taskflow import task, engines
from taskflow.patterns import linear_flow as lf

class CreateDBRecord(task.Task):
    def execute(self, volume_id):
        print(f"创建数据库记录 for {volume_id}")
        db_record = {"id": volume_id, "status": "creating"}
        return db_record  # 返回对象

    def revert(self, result, volume_id, **kwargs):
        print(f"回滚数据库记录 for {volume_id}")

class CreateBackend(task.Task):
    def execute(self, create_db_record):
        print(f"在后端创建卷: {create_db_record['id']}")
        raise Exception("模拟后端创建失败")

    def revert(self, create_db_record, **kwargs):
        print(f"回滚后端卷: {create_db_record['id']}")

class UpdateStatus(task.Task):
    def execute(self, create_db_record):
        print(f"更新卷状态为 available: {create_db_record['id']}")
        create_db_record["status"] = "available"

    def revert(self, create_db_record, **kwargs):
        print(f"回滚状态更新: {create_db_record['id']}")

flow = lf.Flow("create-volume").add(
    # 这里显式说:CreateDBRecord 提供名字 create_db_record
    CreateDBRecord(name="create_db", provides="create_db_record"),
    CreateBackend(name="create_backend"),
    UpdateStatus(name="update_status")
)

engine = engines.load(flow, store={"volume_id": "vol-123"})
try:
    engine.run()
except Exception as e:
    print("捕获异常:", e)

T8.3、Retry(重试机制)

TaskFlow 内置多种 retry 策略:

策略类 说明
retry.Times(n) 重试 n 次
retry.AlwaysRevert() 总是触发回滚
retry.ForEach(iterable) 针对每个输入值尝试
from taskflow import task
from taskflow.patterns import linear_flow
from taskflow.retry import Times
from taskflow import engines

class Unstable(task.Task):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.count = 0

    def execute(self):
        self.count += 1
        print(f"执行第 {self.count} 次")
        if self.count < 3:
            raise Exception("失败重试")
        print("成功!")

flow = linear_flow.Flow("retry-flow", retry=Times(3))
flow.add(Unstable(name="unstable_task"))
engine = engines.load(flow)
engine.run()

T8.4、Store(上下文共享)

store 是 Flow 内任务共享参数的上下文。

from taskflow import task, engines
from taskflow.patterns import linear_flow as lf

class Add(task.Task):
    def execute(self, x, y):
        print(f"Add 执行: {x} + {y}")
        return x + y

class Multiply(task.Task):
    def execute(self, add_result):
        print("Multiply 执行: 结果 =", add_result * 10)

# 构建线性流程
flow = lf.Flow("calc").add(
    Add(name="add", provides="add_result"),
    Multiply(name="multiply")
)

# 初始化引擎
engine = engines.load(flow, store={'x': 2, 'y': 3})

# 运行
engine.run()
标签云