从脚本到生产,只差一个装饰器的距离
当你的数据管道从“每日跑一次”升级到“实时响应,分钟级调度”,当你开始为任务失败后的重试逻辑和状态追踪头疼——是时候认真考虑一个像样的工作流编排工具了。
Prefect 3.0,这个将Python原生体验发挥到极致的工作流编排框架,刚刚迎来了它的全面升级。相比传统编排工具,它最大的特点就是“Pythonic”——你的工作流就是普通的Python函数,加几个装饰器而已。
下面这张图勾勒了Prefect 3.0的核心工作流程:
核心概念:Flow 和 Task
Prefect 3.0 围绕两个核心抽象构建:
Task 是工作流的最小执行单元,代表一个独立的操作步骤。Flow 则是任务的编排容器,定义了任务之间的依赖关系和执行顺序。最妙的是,任何一个Python函数都可以通过装饰器变成Flow。
一个最简单的例子长这样:
from prefect import flow, taskimport httpx@task(log_prints=True)def fetch_stars(repo: str): url = f"https://api.github.com/repos/{repo}" data = httpx.get(url).json() stars = data["stargazers_count"] print(f"{repo} has {stars} stars!") return stars@flow(name="GitHub Stars Tracker")def github_flow(repos: list[str]): for repo in repos: fetch_stars(repo)if __name__ == "__main__": github_flow(["PrefectHQ/prefect", "pandas-dev/pandas"])
运行这段代码,Prefect 会自动记录每次执行的完整信息——输入参数、输出结果、执行时间、状态变化,全部可以在 UI 中查看。这就是 Prefect 的魅力:你的代码几乎没有改动,但突然就有了完整的可观测性。
Prefect 3.0 的四大革新
一、事件驱动:不再问“好了吗”,而是让数据告诉你
这是 3.0 最引人注目的变化。此前仅限于 Prefect Cloud 的事件与自动化系统,现在完全开源开放了。你可以让工作流响应任意事件——S3文件上传、数据库变更、API调用、甚至是来自其他服务的webhook——而不是让调度器每隔5分钟去问一次“数据准备好了吗”。
比如,你想让关键工作流失败后自动暂停,给工程师留出排查时间:
# 通过 Prefect SDK 创建自动化规则automation = await prefect_client.create_automation( name="pause-on-failure", trigger=Trigger( event="prefect.flow-run.failed", match={"deployment.name": "critical-workflow"} ), action=PauseDeploymentAction())
你也可以在代码中主动发送自定义事件:
from prefect.events import emit_eventemit_event("data.pipeline.completed", payload={ "records_processed": 10000, "status": "success"})
二、事务语义:让数据管道真正可靠
每个任务都被包裹在一个事务中。如果任务在相同输入下再次运行,它不会重新执行,而是直接加载缓存的结果。多个任务可以被归入同一个事务——任一失败,全部回滚。这在数据一致性要求高的场景下非常实用:
from prefect import flow, taskfrom prefect.transactions import transaction@taskdef deduct_inventory(item: str, qty: int): print(f"Deducting {qty} of {item}") return {"deducted": True}@task(on_rollback=lambda: print("Rolling back inventory..."))def write_to_warehouse(data): print(f"Writing {data} to warehouse") return {"committed": True}@flowdef order_flow(item: str, qty: int): with transaction(): inventory_result = deduct_inventory(item, qty) write_to_warehouse(inventory_result)
三、新Worker架构:告别Agent,拥抱现代基础设施
3.0 正式淘汰了旧版Agent模型,全面采用Worker架构。Worker是轻量级的轮询服务,从工作池(Work Pool)中领取调度好的任务,负责启动和管理执行环境。部署也变得极其简洁:
# 创建一个部署,按cron调度github_flow.serve( name="github-stats-daily", cron="0 9 * * *", # 每天早上9点运行 parameters={"repos": ["PrefectHQ/prefect", "pandas-dev/pandas"]})# 部署到EC2 Workerfrom prefect.deployments import Deployment, DockerImagedeployment = Deployment.build_from_flow( flow=github_flow, name="ecs-deployment", work_pool_name="aws-ecs-pool", image=DockerImage(name="my-registry/prefect-flows:latest"))deployment.apply()
四、性能飞跃:10倍于前代
内部测试显示,在1000+任务的场景下,调度延迟降低了90%;内存占用减少了约40%。而这一切改进对用户来说几乎是透明的——你不需要重写代码,升级带来的性能提升会直接体现在运行效率上。
完整示例:一个带重试和监控的ETL管道
from prefect import flow, taskfrom prefect.tasks import task_input_hashfrom datetime import timedeltaimport httpximport pandas as pd@task( retries=3, retry_delay_seconds=10, cache_expiration=timedelta(hours=1), cache_key_fn=task_input_hash)def extract_data(api_url: str) -> dict: """从API提取数据,失败自动重试3次,相同输入会命中缓存""" response = httpx.get(api_url, timeout=30.0) response.raise_for_status() return response.json()@task(log_prints=True)def transform_data(raw_data: dict) -> pd.DataFrame: """清洗和转换数据""" df = pd.DataFrame(raw_data.get("items", [])) df = df.drop_duplicates() df["processed_at"] = pd.Timestamp.now() print(f"Transformed {len(df)} records") return df@taskdef load_data(df: pd.DataFrame, target_table: str) -> None: """加载到目标表(示例中打印数据,实际可替换为数据库写入)""" print(f"Loading {len(df)} rows to {target_table}") # 实际生产环境中,这里会是 df.to_sql 或写入S3等操作@flowdef etl_pipeline(api_endpoints: list[str], target: str = "analytics.table"): results = [] for endpoint in api_endpoints: raw = extract_data(endpoint) transformed = transform_data(raw) results.append(transformed) final_df = pd.concat(results, ignore_index=True) load_data(final_df, target) return final_dfif __name__ == "__main__": # 本地运行 etl_pipeline( ["https://api.example.com/users", "https://api.example.com/orders"], target="staging.user_activity" ) # 也可以部署为定时任务 # etl_pipeline.serve(name="hourly-etl", cron="0 * * * *")
为什么选择 Prefect 3.0?
上手零门槛:不用学新的DSL或配置语法,Python写什么它就是什么
本地开发友好:pip install prefect + prefect server start,秒级启动本地UI
部署方式灵活:从单机cron到Kubernetes、ECS,一套代码通吃
比Airflow更Pythonic:没有DAG定义文件的冗长,没有复杂的模板语法,纯Python
Prefect 3.0 不是要重写你的代码,而是让现有的Python脚本获得企业级的编排能力。从今天开始,给你的函数加上 @task 和 @flow,让它们活起来。
编辑:余文彬
审校:余雨馨