1. 学习目标LangGraph 基础与状态管理
- 理解 StateGraph 设计哲学
- 掌握节点(Node)与边(Edge)的概念
- 理解消息在节点间如何传递
2.核心概念速览
StateGraph 工作流程:
- 记录用户输入
- 原样返回(类似回声)
- 展示 LangGraph 最基础的"输入→处理→输出"流程如下图:
3.代码实现
环境:win11+python3.11+vscode编辑器langgraph>=0.2.0
langchain-core>=0.3.0
langchain-community>=0.3.0
- 展示最基础的 StateGraph 工作流程
- 用户输入什么,Agent 就返回什么
from langgraph.graph import StateGraph, ENDfrom typing import TypedDict# ========== 1. 定义 State(状态)==========class GraphState(TypedDict): """定义图中流转的数据结构""" messages: list # 存储对话消息# ========== 2. 定义 Node(节点)==========def node_echo(state: GraphState) -> GraphState: """ Echo 节点:简单返回输入内容 这是 LangGraph 最简单的节点示例 """ user_input = state["messages"] print(f"📥 收到输入: {user_input}") # 原样返回(这里简化处理,实际可以添加更多处理逻辑) return {"messages": user_input}def node_process(state: GraphState) -> GraphState: """ 处理节点:对输入进行简单处理(比如添加标记) """ messages = state["messages"] processed = f"【处理后】{messages}" print(f"🔧 处理后: {processed}") return {"messages": [processed]}# ========== 3. 构建 Graph(图)==========def create_echo_graph() -> StateGraph: """ 创建 Echo Agent 的 StateGraph """ # 初始化图 workflow = StateGraph(GraphState) # 添加节点 workflow.add_node("echo", node_echo) workflow.add_node("process", node_process) # 设置入口点:START -> echo workflow.set_entry_point("echo") # 添加边:echo -> process -> END workflow.add_edge("echo", "process") workflow.add_edge("process", END) return workflow# ========== 4. 运行 Agent==========def run_echo_agent(user_input: str): """ 运行 Echo Agent """ # 编译图 app = create_echo_graph().compile() # 准备输入 initial_state = {"messages": user_input} # 执行 result = app.invoke(initial_state) print(f"📤 最终输出: {result['messages']}") return result# ========== 5. 交互式测试==========if __name__ == "__main__": print("=" * 50) print("🧪 LangGraph Echo Agent 测试") print("=" * 50) print("输入内容,Agent 会原样返回(经过处理节点)") print("输入 'quit' 退出") print("=" * 50) while True: user_input = input("\n👤 你: ").strip() if user_input.lower() in ["quit", "exit", "q"]: print("👋 再见!") break if not user_input: continue print() run_echo_agent(user_input)