Preface
Software versions
Python 3.8.10
poetry 1.1.12
prefect 1.0.0
- poetry GitHub: https://github.com/python-poe...
- prefect GitHub: https://github.com/PrefectHQ/...
Installation
After initializing the project with poetry, add the following dependencies to pyproject.toml, then run poetry update -vvv
# Mirror in mainland China (optional)
[[tool.poetry.source]]
name = "aliyun"
url = "https://mirrors.aliyun.com/pypi/simple/"
default = true

[tool.poetry.dependencies]
python = "^3.8"
prefect = "~1.0.0"
Set the environment variable that enables checkpointing
# Linux
export PREFECT__FLOWS__CHECKPOINTING=true
# Windows PowerShell
$env:PREFECT__FLOWS__CHECKPOINTING="true"
# Windows cmd (make sure there is no trailing space at the end of the line)
set PREFECT__FLOWS__CHECKPOINTING=true
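The variable name above follows the Prefect 1.x convention of a PREFECT__ prefix with double underscores separating nested configuration keys. The sketch below only illustrates that naming convention with the standard library; `env_to_config_path` is a hypothetical helper and not part of Prefect. It also shows setting the variable from Python, which is assumed to work only if it happens before `import prefect`.

```python
import os

PREFIX = "PREFECT__"


def env_to_config_path(name):
    """Map a name like PREFECT__FLOWS__CHECKPOINTING to a dotted config path.

    Hypothetical helper for illustration only; Prefect does this internally.
    """
    if not name.startswith(PREFIX):
        raise ValueError(f"{name!r} does not start with {PREFIX!r}")
    parts = name[len(PREFIX):].split("__")
    return ".".join(part.lower() for part in parts)


# Assumption: the variable must be set before `import prefect`,
# because the configuration is read when the package is imported.
os.environ.setdefault("PREFECT__FLOWS__CHECKPOINTING", "true")

print(env_to_config_path("PREFECT__FLOWS__CHECKPOINTING"))  # flows.checkpointing
```

Setting the variable in the shell, as shown above, remains the simpler option when the script is always launched from the same environment.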
- Test code
test_prefect.py
# encoding: utf-8
# author: qbit
# date: 2022-01-12
# summary: test prefect with add, subtract, multiply and divide

import os
import sys
import shutil

import prefect
from prefect import task, Flow
from prefect.engine.results import LocalResult

logger = prefect.context.get("logger")

cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))
cur_filename = os.path.basename(__file__)
# Use the current .py file name (with a leading dot) as the cache directory name
dirname = f".{os.path.splitext(cur_filename)[0]}"
PrefectLocalResultDir = os.path.join(cur_dir_fullpath, dirname)


def ClearDirectory(dir_path):
    r""" Remove everything inside a directory """
    if not os.path.isdir(dir_path):
        return  # nothing cached yet, e.g. "restart" on the very first run
    for filename in os.listdir(dir_path):
        file = os.path.join(dir_path, filename)
        try:
            if os.path.isfile(file) or os.path.islink(file):
                os.remove(file)
            elif os.path.isdir(file):
                shutil.rmtree(file)
        except Exception as e:
            print(f'Failed to delete {file}. Reason: {e}')


# The result is checkpointed to <PrefectLocalResultDir>/TaskAdd.target
@task(target="{task_name}.target",
      checkpoint=True,
      result=LocalResult(dir=PrefectLocalResultDir))
def TaskAdd(x, y):
    result = x + y
    logger.info(f"{x} + {y} = {result}")
    return result


@task
def TaskSubtract(x):
    r""" Subtract 1 from the input """
    result = x - 1
    logger.info(f"{x} - 1 = {result}")
    return result


@task
def TaskMultiply(x):
    r""" Multiply the input by 2 """
    result = x * 2
    logger.info(f"{x} * 2 = {result}")
    return result


@task(log_stdout=True)
def TaskDivide(x, y):
    r""" Divide the inputs (y / x) """
    result = y / x
    logger.info(f"{y} / {x} = {result}")
    # log_stdout=True routes this print through the Prefect logger
    print(f"result: {result}")
    return result


if __name__ == '__main__':
    if (len(sys.argv) > 1) and (sys.argv[1] == "restart"):
        print(f"****** Clear {PrefectLocalResultDir} ...")
        ClearDirectory(PrefectLocalResultDir)

    # The flow name means "Example: four arithmetic operations"
    with Flow("示例: 四则运算") as flow:
        addResult = TaskAdd(2, 1)
        subResult = TaskSubtract(addResult)
        mulResult = TaskMultiply(addResult)
        TaskDivide(subResult, mulResult)

    flow_state = flow.run()
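The `target="{task_name}.target"` argument is a template that is filled in with values from the run context, and the rendered name is stored under the `LocalResult` directory. A minimal sketch of that rendering using only `str.format`; `render_target` and the `project` directory are hypothetical names for illustration, and Prefect's own implementation may differ in detail.

```python
import os

# Same template string as in the @task decorator of the test script
target_template = "{task_name}.target"

# Hypothetical stand-in for PrefectLocalResultDir
result_dir = os.path.join("project", ".test_prefect")


def render_target(template, directory, **context):
    """Fill the template from context values and join it to the result directory."""
    return os.path.join(directory, template.format(**context))


# The task name defaults to the function name, here "TaskAdd"
path = render_target(target_template, result_dir, task_name="TaskAdd")
print(path)
```

Because the template contains only the task name, every run of `TaskAdd` maps to the same file, whatever its inputs are.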
Run
First run (note that the first task, TaskAdd, finishes with state 'Success')
# Command
poetry run python ./test_prefect.py
# Output
[2022-02-24 17:35:48+0800] INFO - prefect.FlowRunner | Beginning Flow run for '示例: 四则运算'
[2022-02-24 17:35:49+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Starting task run...
[2022-02-24 17:35:49+0800] INFO - prefect | 2 + 1 = 3
[2022-02-24 17:35:49+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Finished task run for task with final state: 'Success'
[2022-02-24 17:35:49+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Starting task run...
[2022-02-24 17:35:49+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Starting task run...
[2022-02-24 17:35:49+0800] INFO - prefect | 3 * 2 = 6
[2022-02-24 17:35:49+0800] INFO - prefect | 3 - 1 = 2
[2022-02-24 17:35:49+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Finished task run for task with final state: 'Success'
[2022-02-24 17:35:49+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Finished task run for task with final state: 'Success'
[2022-02-24 17:35:49+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Starting task run...
[2022-02-24 17:35:49+0800] INFO - prefect | 6 / 2 = 3.0
[2022-02-24 17:35:49+0800] INFO - prefect.TaskRunner | result: 3.0
[2022-02-24 17:35:49+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Finished task run for task with final state: 'Success'
[2022-02-24 17:35:49+0800] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
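The numbers in the log can be cross-checked without Prefect. The sketch below replays the same data flow in plain Python; the variable names are only illustrative.

```python
# Plain-Python replay of the flow's data dependencies
add_result = 2 + 1                    # TaskAdd(2, 1)
sub_result = add_result - 1           # TaskSubtract(addResult)
mul_result = add_result * 2           # TaskMultiply(addResult)
div_result = mul_result / sub_result  # TaskDivide(x=subResult, y=mulResult) computes y / x

print(add_result, sub_result, mul_result, div_result)  # 3 2 6 3.0
```

TaskSubtract and TaskMultiply both depend only on the result of TaskAdd, which is why their log lines may appear in either order.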
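Second run (note that TaskAdd now finishes with state 'Cached'; its body is not executed, so the "2 + 1 = 3" line no longer appears in the log)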
# Command
poetry run python ./test_prefect.py
# Output
[2022-02-24 17:36:47+0800] INFO - prefect.FlowRunner | Beginning Flow run for '示例: 四则运算'
[2022-02-24 17:36:47+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Starting task run...
[2022-02-24 17:36:47+0800] INFO - prefect.TaskRunner | Task 'TaskAdd': Finished task run for task with final state: 'Cached'
[2022-02-24 17:36:47+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Starting task run...
[2022-02-24 17:36:47+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Starting task run...
[2022-02-24 17:36:47+0800] INFO - prefect | 3 * 2 = 6
[2022-02-24 17:36:47+0800] INFO - prefect | 3 - 1 = 2
[2022-02-24 17:36:47+0800] INFO - prefect.TaskRunner | Task 'TaskMultiply': Finished task run for task with final state: 'Success'
[2022-02-24 17:36:47+0800] INFO - prefect.TaskRunner | Task 'TaskSubtract': Finished task run for task with final state: 'Success'
[2022-02-24 17:36:47+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Starting task run...
[2022-02-24 17:36:47+0800] INFO - prefect | 6 / 2 = 3.0
[2022-02-24 17:36:47+0800] INFO - prefect.TaskRunner | result: 3.0
[2022-02-24 17:36:47+0800] INFO - prefect.TaskRunner | Task 'TaskDivide': Finished task run for task with final state: 'Success'
[2022-02-24 17:36:47+0800] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
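The difference between the two runs comes from the target file: if it already exists, the stored value is loaded instead of running the task. The sketch below emulates that idea with `pickle` and a temporary directory. It is not Prefect's implementation; `run_with_target` is a hypothetical helper, and the state names are borrowed from the logs purely for readability.

```python
import os
import pickle
import tempfile


def run_with_target(func, target_path, *args):
    """Return (state, value); reuse the pickled value if target_path exists."""
    if os.path.exists(target_path):
        with open(target_path, "rb") as fh:
            return "Cached", pickle.load(fh)
    value = func(*args)
    with open(target_path, "wb") as fh:
        pickle.dump(value, fh)
    return "Success", value


def add(x, y):
    return x + y


with tempfile.TemporaryDirectory() as tmp_dir:
    target = os.path.join(tmp_dir, "TaskAdd.target")
    first = run_with_target(add, target, 2, 1)   # file missing: computes and writes
    second = run_with_target(add, target, 2, 1)  # file present: loads from disk

print(first)   # ('Success', 3)
print(second)  # ('Cached', 3)
```

In this sketch the cache key is only the file path, so a call with different arguments would still return the stored value. Deleting the file, as the restart argument of the test script does for the whole directory, forces a recomputation.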
Static DAG diagram
- Official docs: https://docs.prefect.io/core/...
- Download Graphviz and add it to the PATH environment variable
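Before generating the diagram, it can help to confirm that the Graphviz executable is actually visible on PATH. A minimal check with the standard library, assuming the executable is named `dot` as in a standard Graphviz installation; `graphviz_on_path` is a hypothetical helper name.

```python
import shutil


def graphviz_on_path(executable="dot"):
    """Return True if the given executable can be found on PATH."""
    return shutil.which(executable) is not None


if graphviz_on_path():
    print("Graphviz found:", shutil.which("dot"))
else:
    print("Graphviz 'dot' executable not found on PATH")
```

If the check fails in a terminal that was open before PATH was changed, reopening the terminal is usually enough for the new value to take effect.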
Edit pyproject.toml to add the viz extra, then run poetry update -vvv
[tool.poetry.dependencies]
python = "^3.8"
prefect = { version = "~1.0.0", extras = ["viz"] }
Modify the main block of test_prefect.py
if __name__ == '__main__':
    if (len(sys.argv) > 1) and (sys.argv[1] == "restart"):
        print(f"****** Clear {PrefectLocalResultDir} ...")
        ClearDirectory(PrefectLocalResultDir)

    # The flow name means "Example: four arithmetic operations"
    with Flow("示例: 四则运算") as flow:
        addResult = TaskAdd(2, 1)
        subResult = TaskSubtract(addResult)
        mulResult = TaskMultiply(addResult)
        TaskDivide(subResult, mulResult)

    # Draw the DAG before the run
    flow.visualize(filename='flow_start', format='png')
    flow_state = flow.run()
    # Draw it again, colored by the final task states
    flow.visualize(flow_state=flow_state, filename='flow_end', format='png')
Running the code produces two images, flow_start.png and flow_end.png
poetry run python ./test_prefect.py restart
- flow_start.png
- flow_end.png
- States represented by the colors: https://docs.prefect.io/api/l...