写技术文章最怕什么?就是光讲理论,不给干货。今天聊的 Prefect 这个库,我敢说它是 Python 调度库里把 “简单” 和 “强大” 平衡得最好的那个。
别一听 “工作流编排” 就觉得头大,它用起来比你想的还要顺手。
🚀 一、从普通函数到生产级任务,只需一个装饰器
传统上,如果你想对一个任务设置重试机制,代码会变得非常臃肿。但用 Prefect,逻辑清晰得像伪代码。
下方代码展示了一个可能不稳定的API调用,我们设定它在失败后最多重试3次,每次间隔2秒。这让你的程序在面对网络抖动时更加健壮。
from prefect import task, flow
import random
@task(retries=3, retry_delay_seconds=2)
defcall_risky_api():
if random.choice([True, False]):
raise ValueError("网络波动,请求失败!")
return {"data": "核心业务数据"}
@flow
defmain_flow():
result = call_risky_api()
print(f"最终拿到结果: {result}")
main_flow()
执行结果(模拟失败重试场景):
12:00:01.123 | 调用 call_risky_api 失败,准备重试...
12:00:03.456 | 调用 call_risky_api 失败,准备重试...
12:00:05.789 | 调用 call_risky_api 成功!
最终拿到结果: {'data': '核心业务数据'}
🔍 二、看见你的代码执行轨迹
写脚本最怕黑盒运行,失败了也不知道卡在哪。Prefect 自带的可视化面板能让你实时监控。
通过 serve 方法启动后,你可以在 Web UI 上清晰地看到任务依赖关系。下图代码创建了一个简单的 ETL 流程,展示了任务间的数据流转。
from prefect import flow, task
@task
defextract():
return [1, 2, 3, 4, 5]
@task
deftransform(data):
return [x * 10for x in data]
@task
defload(data):
print(f"写入数据库: {data}")
@flow(name="数据清洗管道")
defetl_pipeline():
raw = extract()
processed = transform(raw)
load(processed)
if __name__ == "__main__":
etl_pipeline.serve(name="首次部署")
执行效果简介:
启动本地服务器...
访问 http://localhost:4200 查看运行状态
写入数据库: [10, 20, 30, 40, 50]
流程执行成功,耗时 0.32秒
📅 三、原生支持定时与并发
别再写 time.sleep 或死循环了。Prefect 原生支持 Cron 表达式,让调度变得极其规范。
同时,通过 concurrency_limit 可以轻松控制并发,防止数据库被打爆。
from prefect import flow
from prefect.tasks import task
@task
defprocess_item(x):
return x ** 2
@flow
defheavy_job():
data = range(10)
results = process_item.map(data)
return results
# 部署并设定每分钟执行,且限制并发数
heavy_job.serve(name="定时任务", cron="* * * * *")
实战对比优势分析
相比 Apache Airflow,Prefect 最大的优势在于减少样板代码和更好的动态流程图支持。
· 优势:调试极其方便,支持局部重跑,对于 Python 开发者极其友好,上手成本极低。
· 劣势:相关中文生态资源相对较少,部分高级功能依赖 Prefect Cloud 云服务。
✍️ 总结与互动
总而言之,Prefect 解决了调度工具的两大痛点:复杂的配置和糟糕的可观测性。
它既是脚本的“升级包”,也是复杂系统的“稳定器”。
希望今天的实战对你有帮助。你在工作中有没有遇到过因为任务调度不稳而导致的“血案”?
欢迎在评论区分享你的踩坑经历,或者聊聊你还想看哪个库的解析,我会一一回复!