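通过本案例学习,你将掌握:

| 知识点 | 说明 |
| --- | --- |
| Fan-out 模式 | 用 Send API 把同一任务同时派发给多个 Worker 并行执行 |
| Fan-in 模式 | 所有 Worker 完成后,由 Aggregator 统一汇总结果 |
| 性能对比 | 对比串行总耗时(T1 + T2)与并行总耗时(≈ max(T1, T2)) |
| 状态合并 | 用 `Annotated[list, operator.add]` reducer 合并并发写入,避免结果互相覆盖 |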
为什么需要 Multi-Agent(多智能体并行)和Fan-out(广播任务)+ Fan-in(汇总结果)模式?

核心思想:把无依赖的任务并行执行,最后统一汇总。
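| 概念 | 用法 | 作用 |
| --- | --- | --- |
| Send API | `Send(node_name, state)` | 为目标节点创建一个独立的并行执行分支,并传入该分支的输入状态 |
| operator.add | `Annotated[list, operator.add]` | 作为 reducer,把各 Worker 返回的列表拼接起来,防止并发写入互相覆盖 |
| add_conditional_edges | `graph.add_conditional_edges(...)` | 注册路由函数;路由函数返回 `List[Send]` 时即触发 Fan-out |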

加速比 = 串行耗时 / 并行耗时 = (T1 + T2) / max(T1, T2)(忽略汇总开销)。例如当 T1 = T2 = 2s 时,串行 4s → 并行 2s,即 2 倍加速。


文件位置:day11/02_parallel_agents.py 第 59-89 行



文件位置:day11/02_parallel_agents.py 第 222-253 行
```python
def aggregator_node(state: ParallelState) -> dict:
    """收集所有 Worker 结果后汇总"""
    sub_results = state.get("sub_results", [])  # [result_a, result_b]
    # LLM 整合生成最终报告
    ...  # 省略:调用 LLM 生成 summary
    return {"summary": summary}
```

文件位置:day11/02_parallel_agents.py 第 288-323 行
```python
graph.add_conditional_edges(
    "start_node",
    supervisor_fan_out,       # Fan-out:并行触发
    ["researcher", "coder"],
)
graph.add_edge("researcher", "aggregator")  # Fan-in:汇入汇总
graph.add_edge("coder", "aggregator")
```

```bash
# 查看概念说明(无需 Ollama)
python day11/02_parallel_agents.py --concept

# 完整演示(需要 Ollama)
python day11/02_parallel_agents.py
```

```text
======================================================================
[Day11-02] LangGraph Fan-out/Fan-in 并行 Agent 演示
======================================================================

[START] 接收任务: 用 Python 实现一个基于 FastAPI 的 REST API...

[Supervisor] 任务 Fan-out,并行派发给 researcher + coder
  [Researcher] 并行启动,任务: 用 Python 实现一个基于 FastAPI...
  [Coder] 并行启动,任务: 用 Python 实现一个基于 FastAPI...
  [Researcher] 完成,耗时 0.35s
  [Coder] 完成,耗时 0.32s

[Aggregator] 收到 2 个 Worker 的结果,开始汇总...
[Aggregator] 汇总完成,摘要: 本报告整合了 FastAPI REST API 的开发文档...

[PARALLEL RESULT]
  总耗时: 0.42s
  收集到 2 个 Worker 结果
  汇总摘要: 本报告整合了 FastAPI REST API 的开发文档...
  - researcher: 0.35s
  - coder: 0.32s

[COMPARISON]
  并行执行耗时: 0.42s
  理论串行耗时: 0.35s + 0.32s = 0.67s
  加速比: 1.6x
```

```text
任务:FastAPI REST API 开发

执行链路:
  START → start_node → [researcher || coder] → aggregator → END
                            ↑      ↑    ↑
                            └──────┴────┘
                            并行执行(同时)

时间分析:
  researcher: 0.35s
  coder:      0.32s
  ─────────────────────
  并行总耗时: max(0.35, 0.32) + 汇总开销 ≈ 0.42s
  串行总耗时: 0.35 + 0.32 = 0.67s
  节省: 0.25s (37%)
```

| 概念 | 用法 / 说明 |
| --- | --- |
| Send API | `Send(node, state)` |
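| operator.add | `Annotated[list, operator.add]`:把并发 Worker 返回的列表拼接合并 |
| Fan-out | 路由函数返回 `List[Send]`,经 `add_conditional_edges` 同时触发多个 Worker |
| Fan-in | 多个 Worker 都连边到 aggregator,由它统一汇总结果 |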
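✅ 适合并行模式:
- 子任务之间没有数据依赖,例如"检索文档"和"生成代码框架"可以同时进行
- 子任务以等待 I/O(LLM 调用、工具调用)为主,并行能明显缩短总耗时

❌ 不适合的场景:
- 后一步依赖前一步的输出,例如 reviewer 需要先拿到 coder 生成的代码(这类流程应使用串行的 Supervisor 模式)
- 子任务极短,并行调度与汇总的开销反而超过节省的时间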
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------
# Core pattern recap (tutorial summary snippet, kept as comments so the
# module stays importable):
#
#   # 1. Declare the state field with a reducer
#   sub_results: Annotated[list, operator.add]
#
#   # 2. Fan-out: return a List[Send]
#   def fan_out(state) -> list[Send]:
#       return [Send("worker_a", state), Send("worker_b", state)]
#
#   # 3. Each worker returns a list (merged automatically)
#   return {"sub_results": [result]}
#
#   # 4. Fan-in: the aggregator reads every collected result
#   all_results = state["sub_results"]  # [result_a, result_b]
# ----------------------------------------------------------------------
import io
import sys

# Re-wrap the standard streams as UTF-8 when the console reports another
# encoding, so the Chinese log lines print instead of failing. The name is
# normalized because it may be reported as e.g. "UTF-8" or "utf8", and it
# can be None when the stream is not a real console.
if (sys.stdout.encoding or "").lower().replace("_", "-") not in ("utf-8", "utf8"):
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace")

import operator  # operator.add is the reducer that concatenates worker lists
import os
import time
from typing import Annotated, Any, TypedDict

from dotenv import load_dotenv
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.tools import tool
from langchain_ollama import ChatOllama
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages
from langgraph.types import Send  # Send API: the primitive behind parallel branches

load_dotenv()

# ============================================================
# Configuration
# ============================================================
MODEL_NAME = os.getenv("OLLAMA_MODEL", "qwen3:4b")  # model name from env, defaults to qwen3:4b
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")  # Ollama server URL
llm = ChatOllama(model=MODEL_NAME, base_url=OLLAMA_BASE_URL, temperature=0)  # temperature=0 for stable output


# ============================================================
# 1. State definitions
# ============================================================
class SubTaskResult(TypedDict):
    """Result produced by a single worker."""

    worker: str     # worker identifier
    task: str       # sub-task description the worker received
    result: str     # worker output text
    elapsed: float  # wall-clock time in seconds, used for the performance comparison


class ParallelState(TypedDict):
    """Shared state of the parallel agent graph.

    ``sub_results`` uses ``operator.add`` as its reducer: when several workers
    write to it concurrently, LangGraph appends each worker's list instead of
    overwriting, so every result is collected (the key to Fan-in).
    """

    task: str
    messages: Annotated[list, add_messages]     # message history, merged by add_messages
    sub_results: Annotated[list, operator.add]  # Fan-in results, concatenated across workers
    summary: str                                # final report written by the aggregator


class WorkerInput(TypedDict):
    """Payload shape intended for workers dispatched through the Send API.

    NOTE(review): supervisor_fan_out currently sends ParallelState-shaped
    dicts; this type is not used by the code shown here.
    """

    task: str
    worker_name: str


# ============================================================
# 2. Tools
# ============================================================
# Mock documentation search. LangChain's @tool uses the docstring as the tool
# description sent to the LLM, so that docstring is runtime text and is kept as-is.
@tool("search_docs")
def search_docs(query: str) -> str:
    """搜索技术文档,返回相关片段"""
    time.sleep(0.3)  # simulate I/O latency so the parallel speed-up is visible
    docs = {
        "python": "Python 官方文档:https://docs.python.org, 支持 asyncio / dataclass / typing",
        "fastapi": "FastAPI 文档:https://fastapi.tiangolo.com, 基于 Pydantic 和 Starlette",
        "langgraph": "LangGraph 文档:https://langchain-ai.github.io/langgraph, 支持 StateGraph + Checkpointer",
    }
    # Returned when no keyword matches. It is kept out of `docs` so the word
    # "default" in a query cannot be mistaken for a keyword hit.
    fallback = "未找到精确匹配,建议查阅官方文档。"
    query_lower = query.lower()
    for keyword, snippet in docs.items():
        if keyword in query_lower:
            return f"[文档检索] {snippet}"
    return fallback


# Mock code generator; as above, the docstring is the LLM-facing tool description.
@tool("generate_code")
def generate_code(requirement: str) -> str:
    """根据需求描述,生成 Python 代码框架"""
    time.sleep(0.3)  # simulate generation latency
    return f"""# 根据需求自动生成
# 需求:{requirement}

def solution():
    \"\"\"自动生成的代码框架\"\"\"
    # TODO: 实现核心逻辑
    # 1. 初始化
    # 2. 执行业务逻辑
    # 3. 返回结果
    pass


if __name__ == '__main__':
    result = solution()
    print(f'Result: {{result}}')
"""


# ============================================================
# 3. Fan-out worker nodes
# ============================================================
def _run_tool_calls(response: Any, tool_obj: Any) -> list[str]:
    """Execute the tool calls in ``response`` that target ``tool_obj``.

    Returns the tool outputs in call order, or an empty list when the model
    made no matching tool call (or the response has no ``tool_calls``).
    """
    outputs = []
    for call in getattr(response, "tool_calls", None) or []:
        if call["name"] == tool_obj.name:
            outputs.append(tool_obj.invoke(call["args"]))
    return outputs


def researcher_parallel(state: ParallelState) -> dict:
    """Parallel researcher worker: search the docs and return background knowledge.

    Returns a partial state update. ``sub_results`` is a one-element list that
    the ``operator.add`` reducer appends to the shared list.
    """
    print(f" [Researcher] 并行启动,任务: {state['task'][:40]}...")
    start = time.perf_counter()

    worker_llm = llm.bind_tools([search_docs])  # let the LLM call search_docs
    response = worker_llm.invoke([
        SystemMessage(content="你是一个文档检索专家。使用 search_docs 工具搜索相关文档,整理为简洁摘要。"),
        HumanMessage(content=f"请检索以下任务所需的技术文档:{state['task']}"),
    ])

    # If the model called the tool, the tool output replaces the model's text.
    tool_results = _run_tool_calls(response, search_docs)
    result_text = "\n".join(tool_results) if tool_results else response.content

    elapsed = time.perf_counter() - start
    print(f" [Researcher] 完成,耗时 {elapsed:.2f}s")

    return {
        "messages": [AIMessage(content=f"[Researcher] {result_text}")],
        "sub_results": [SubTaskResult(
            worker="researcher",
            task=state["task"],
            result=result_text,
            elapsed=elapsed,
        )],
    }


def coder_parallel(state: ParallelState) -> dict:
    """Parallel coder worker: generate a code skeleton for the task.

    Returns a partial state update. ``sub_results`` is a one-element list that
    the ``operator.add`` reducer appends to the shared list.
    """
    print(f" [Coder] 并行启动,任务: {state['task'][:40]}...")
    start = time.perf_counter()

    worker_llm = llm.bind_tools([generate_code])
    response = worker_llm.invoke([
        SystemMessage(content="你是一个 Python 开发专家。使用 generate_code 工具生成代码框架,并补充说明。"),
        HumanMessage(content=f"请为以下任务生成代码框架:{state['task']}"),
    ])

    # If the model called the tool, its output is appended after the model's text.
    tool_results = _run_tool_calls(response, generate_code)
    result_text = response.content
    if tool_results:
        result_text = response.content + "\n[生成代码框架]\n" + "\n".join(tool_results)

    elapsed = time.perf_counter() - start
    print(f" [Coder] 完成,耗时 {elapsed:.2f}s")

    return {
        # The chat history only gets a 100-character preview; the full text is in sub_results.
        "messages": [AIMessage(content=f"[Coder] {result_text[:100]}...")],
        "sub_results": [SubTaskResult(
            worker="coder",
            task=state["task"],
            result=result_text,
            elapsed=elapsed,
        )],
    }


# ============================================================
# 4. Fan-in aggregator node
# ============================================================
def aggregator_node(state: ParallelState) -> dict:
    """Fan-in node: merge every worker's output into one report.

    Runs after the parallel workers have finished and their results have
    been collected in ``sub_results``. Both worker nodes have an edge into
    this node, and LangGraph schedules it after them.
    """
    sub_results = state.get("sub_results", [])
    print(f"\n[Aggregator] 收到 {len(sub_results)} 个 Worker 的结果,开始汇总...")

    # One section per worker: a "=== NAME (耗时 Xs) ===" header, then its output.
    combined = "\n\n".join(
        f"=== {r['worker'].upper()} (耗时 {r['elapsed']:.2f}s) ===\n{r['result']}"
        for r in sub_results
    )

    response = llm.invoke([
        SystemMessage(content=(
            "你是一个技术文档整合专家。"
            "请将以下多个 Worker 的输出整合为一份清晰的技术报告,"
            "格式为:概述 + 关键发现 + 代码示例 + 建议。"
        )),
        HumanMessage(content=f"任务:{state['task']}\n\n各 Worker 输出:\n\n{combined}"),
    ])
    summary = response.content
    print(f"[Aggregator] 汇总完成,摘要: {summary[:80]}...")

    return {
        "messages": [AIMessage(content=f"[汇总报告]\n{summary}")],
        "summary": summary,
    }


# ============================================================
# 5. Fan-out router (Supervisor)
# ============================================================
def supervisor_fan_out(state: ParallelState) -> list[Send]:
    """Fan-out router: dispatch the task to researcher and coder in parallel.

    ``Send(node_name, input_state)`` is LangGraph's parallelism primitive.
    Each Send creates an independent branch, the branches run concurrently,
    and their updates are merged into the main state by the reducers
    (``operator.add`` for ``sub_results``).
    """
    print("\n[Supervisor] 任务 Fan-out,并行派发给 researcher + coder")
    # Each branch gets its own input dict that starts with an empty sub_results list.
    return [
        Send(node, {"task": state["task"], "messages": state["messages"], "sub_results": [], "summary": ""})
        for node in ("researcher", "coder")
    ]


def start_node(state: ParallelState) -> dict:
    """Entry node: log the incoming task and reset the output fields before Fan-out."""
    print(f"\n[START] 接收任务: {state['task'][:60]}...")
    # Adding [] through the operator.add reducer is a no-op for sub_results.
    return {"sub_results": [], "summary": ""}


# ============================================================
# 6. Build the parallel graph
# ============================================================
def build_parallel_graph() -> Any:
    """Build and compile the Fan-out / Fan-in parallel agent graph.

    Topology:
        START → start_node
        start_node → [researcher, coder]   (parallel, via supervisor_fan_out + Send)
        researcher → aggregator
        coder → aggregator
        aggregator → END
    """
    graph = StateGraph(ParallelState)

    graph.add_node("start_node", start_node)
    graph.add_node("researcher", researcher_parallel)
    graph.add_node("coder", coder_parallel)
    graph.add_node("aggregator", aggregator_node)

    # Entry: START -> start_node
    graph.add_edge(START, "start_node")

    # Fan-out: supervisor_fan_out returns List[Send]. The third argument lists
    # the nodes it may route to.
    graph.add_conditional_edges(
        "start_node",
        supervisor_fan_out,
        ["researcher", "coder"],
    )

    # Fan-in: both workers feed into the aggregator.
    graph.add_edge("researcher", "aggregator")
    graph.add_edge("coder", "aggregator")

    # Done: aggregator -> END
    graph.add_edge("aggregator", END)

    return graph.compile()


# ============================================================
# 7. Serial baseline (for the performance comparison)
# ============================================================
def serial_execution(task: str) -> tuple[list, float]:
    """Run both workers one after another, outside the graph, as a baseline.

    Returns:
        (results, elapsed): the concatenated ``sub_results`` lists of both
        workers, and the total wall-clock time in seconds.
    """
    print("\n[SERIAL] 串行执行对比...")
    start = time.perf_counter()

    results = []
    for worker in (researcher_parallel, coder_parallel):
        # Each worker gets a fresh state dict, as it would at the start of a graph run.
        update = worker({"task": task, "messages": [HumanMessage(content=task)], "sub_results": [], "summary": ""})
        results += update["sub_results"]

    elapsed = time.perf_counter() - start
    return results, elapsed


# ============================================================
# 8. Demos
# ============================================================
def _print_comparison(parallel_elapsed: float, sub_results: list) -> None:
    """Print the measured parallel time next to the theoretical serial time.

    The serial estimate is the sum of the workers' own elapsed times, i.e.
    what running them one after another would take. The speed-up is
    serial_estimate / parallel_elapsed.
    """
    print("\n[COMPARISON]")
    print(f" 并行执行耗时: {parallel_elapsed:.2f}s")
    if sub_results:
        serial_estimate = sum(r["elapsed"] for r in sub_results)
        terms = " + ".join(f"{r['elapsed']:.2f}s" for r in sub_results)
        print(f" 理论串行耗时: {terms} = {serial_estimate:.2f}s")
        if parallel_elapsed > 0:
            print(f" 加速比: {serial_estimate / parallel_elapsed:.1f}x")
    else:
        # No worker timings were collected, so fall back to the symbolic formula.
        print(" 理论串行耗时: researcher_time + coder_time")
    print(" Fan-out/Fan-in 原理:两个 Worker 同时执行,总耗时 = max(T1, T2) 而非 T1 + T2")


def demo_parallel():
    """Run the Fan-out/Fan-in graph on two sample tasks and report timings.

    The workers and the aggregator call ``llm``, so this needs a reachable
    Ollama server. For each task it prints the collected worker results and
    compares the measured parallel time with the theoretical serial time.
    """
    print("=" * 70)
    print("[Day11-02] LangGraph Fan-out/Fan-in 并行 Agent 演示")
    print("=" * 70)

    graph = build_parallel_graph()

    tasks = [
        "用 Python 实现一个基于 FastAPI 的 REST API,包含 CRUD 操作",
        "用 LangGraph 构建一个支持多轮对话的智能助手",
    ]

    for i, task in enumerate(tasks, 1):
        print(f"\n{'=' * 70}")
        print(f"[Task {i}] {task}")
        print("=" * 70)

        # --- parallel run ---
        print("\n[PARALLEL] 并行执行(researcher + coder 同时工作)...")
        start_parallel = time.perf_counter()
        initial_state: ParallelState = {
            "task": task,
            "messages": [HumanMessage(content=task)],
            "sub_results": [],
            "summary": "",
        }
        try:
            final_state = graph.invoke(initial_state, config={"recursion_limit": 20})
        except Exception as e:
            # Demo boundary: report the failure and move on to the next task,
            # rather than comparing against a made-up elapsed time.
            print(f"[ERROR] 并行执行失败: {e}")
            import traceback
            traceback.print_exc()
            print("\n[COMPARISON] 跳过:并行执行失败,没有可对比的耗时")
            continue
        parallel_elapsed = time.perf_counter() - start_parallel

        sub_results = final_state.get("sub_results", [])
        print("\n[PARALLEL RESULT]")
        print(f" 总耗时: {parallel_elapsed:.2f}s")
        print(f" 收集到 {len(sub_results)} 个 Worker 结果")
        print(f" 汇总摘要: {final_state.get('summary', '')[:150]}...")
        for r in sub_results:  # each worker's own elapsed time
            print(f" - {r['worker']}: {r['elapsed']:.2f}s")

        # --- performance comparison ---
        _print_comparison(parallel_elapsed, sub_results)

    print("\n" + "=" * 70)
    print("[DONE] Day11-02 演示完成")
    print("=" * 70)


def demo_concept():
    """Print the core concepts (serial vs. parallel, Send API, reducer); no Ollama needed."""
    print("=" * 70)
    print("[Day11-02] LangGraph Send API 核心概念")
    print("=" * 70)
    print("""
串行 vs 并行 对比:

  串行(T11-01 Supervisor 模式):
    START → supervisor → researcher → supervisor → coder → supervisor → reviewer → END
    总耗时 = T(researcher) + T(coder) + T(reviewer)

  并行(T11-02 Fan-out/Fan-in):
    START → supervisor ──┬──→ researcher ──┐
                         └──→ coder ───────┴──→ aggregator → END
    总耗时 = max(T(researcher), T(coder))

  当 T(researcher) = T(coder) = 2s 时:
    串行耗时:2 + 2 = 4s
    并行耗时:max(2, 2) = 2s
    加速比:2x

Send API 代码模式:

  def fan_out(state) -> list[Send]:
      return [
          Send("worker_a", {**state}),  # 并行触发 worker_a
          Send("worker_b", {**state}),  # 并行触发 worker_b
      ]

  graph.add_conditional_edges("supervisor", fan_out, ["worker_a", "worker_b"])

reducer 合并:

  sub_results: Annotated[list, operator.add]
  → worker_a 写 [result_a]
  → worker_b 写 [result_b]
  → 最终 sub_results = [result_a, result_b](自动合并,不丢失)
""")


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Day11-02 并行 Agent 演示")
    parser.add_argument("--concept", action="store_true", help="只显示概念说明(不需要 Ollama)")
    args = parser.parse_args()

    if args.concept:
        demo_concept()
    else:
        demo_parallel()
