每个 session_id 持有独立的 checkpointer,实现多用户对话历史的完全隔离。

将接入层与 Agent 层拆分为独立服务,是本方案的核心设计决策。分层带来了更高的复用性与扩展弹性,代价是引入了多服务管理的运维复杂度。
| 接入层 | ||
| Agent 层 | ||
| 复用性 | ||
| 扩展性 | ||
| 部署复杂度 |
ReAct(Reasoning + Acting)是 LangGraph 的核心执行范式。LLM 自主决策是否调用工具,形成「推理 → 行动」的闭环迭代,直到得出最终答案。

每个会话(Session)持有独立的内存存储实例,LangGraph 通过 thread_id 路由到对应用户的对话历史,实现数据完全隔离。

采用最小间隔限流策略:同一会话两次被接受的请求之间至少间隔 rate_limit_seconds,间隔不足的请求直接返回 429,有效防止飞书等平台重试机制导致的 Agent 并发调用问题。

三、技术架构总览


AgentState
关键设计:
messages 字段使用 add_messages 作为 reducer,节点返回的新消息会自动合并进对话历史;State 仅包含 messages 一个字段,职责单一清晰
执行流程:call_model → should_continue 路由 → 若有工具调用则进入 call_tools → 携带工具结果再次进入 call_model → 直到 LLM 不再发出工具调用,流程结束。
SessionManager
/chat
/chat/stream(SSE)
SSE 协议要点:
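- 每条消息以 data: 开头,以 \n\n 结尾
- 流正常结束时发送 data: [DONE] 作为终止标记
- 前端通过 EventSource API 监听并实时渲染 Token(注意:原生 EventSource 仅支持 GET,而 /chat/stream 是 POST 接口,浏览器端需改用 fetch + ReadableStream 读取)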


LangGraph Agent HTTP API 封装的核心价值在于服务化与可复用:
MemorySaver实例 | ||
astream() 异步推送 | ||
下一步:将 MemorySaver 替换为 Redis Checkpointer,实现会话数据的跨进程持久化,支持多实例水平扩展。
qwen3:4b 模型 | |
fastapi uvicorn langgraph langchain-ollama |

# -*- coding: utf-8 -*-
import sys
import io

# When the console encoding is not UTF-8, re-wrap stdout/stderr as UTF-8.
# With errors='replace', characters that cannot be encoded are substituted
# instead of raising.
if sys.stdout.encoding and sys.stdout.encoding.lower() != 'utf-8':
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')

import os
import time
import json
import logging
import asyncio
from typing import Optional, AsyncGenerator, Annotated
from dataclasses import dataclass, field

import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

# LangGraph / LangChain
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_core.tools import tool
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langchain_ollama import ChatOllama

# --------------------------------------------------------------------------
# Logging configuration
# --------------------------------------------------------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%H:%M:%S",
)
# Single module-wide logger; every component below logs through it.
logger = logging.getLogger("agent_api")
# --------------------------------------------------------------------------
# 1. Tool definitions
# --------------------------------------------------------------------------
# NOTE: the docstrings of the @tool functions are runtime text, not just
# documentation -- LangChain exposes them as the tool description (the demo
# prints `t.description`). They are therefore kept verbatim; an English
# summary is given in the comment above each function.


# Look up `query` in a small hard-coded knowledge dictionary and return the
# first entry whose key occurs in the lower-cased query, or a "not found" hint.
@tool("search_knowledge_base")
def search_knowledge_base(query: str) -> str:
    """搜索知识库,返回相关信息。适用于:用户提问相关技术概念、操作步骤、故障排查。"""
    # Simulated knowledge base (the real Chroma-backed version was covered on
    # Day 8; simplified here).
    knowledge = {
        "langchain": "LangChain 是 AI 应用开发框架,提供工具定义、链式调用等能力。",
        "langgraph": "LangGraph 是 LangChain 的状态图编排框架,用于构建有状态的 Agent 工作流。",
        "mcp": "MCP(Model Context Protocol)是 Anthropic 提出的工具调用标准协议,支持 Tools/Resources/Prompts 三种原语。",
        "rag": "RAG(检索增强生成)通过向量检索增强 LLM 的知识库查询能力,减少幻觉。",
        "feishu": "飞书机器人通过事件订阅接收用户消息,用 lark-oapi SDK 发送回复。",
    }
    query_lower = query.lower()
    for key, val in knowledge.items():
        if key in query_lower:
            return f"[知识库] {val}"
    return "[知识库] 未找到相关内容,建议直接回答或引导用户补充信息。"


# Return the current local date and time as a formatted string.
@tool("get_current_time")
def get_current_time() -> str:
    """获取当前日期和时间。当用户询问时间相关问题时使用。"""
    from datetime import datetime

    now = datetime.now()
    return f"当前时间:{now.strftime('%Y年%m月%d日 %H:%M:%S')}"


# Evaluate an arithmetic expression made only of whitelisted characters and
# return "<expression> = <result>", or an "[ERROR] ..." string on failure.
@tool("calculate")
def calculate(expression: str) -> str:
    """安全的数学计算工具。支持:加减乘除、幂运算、括号。示例:expression = "2 ** 10 + 100"
    """
    try:
        # Character whitelist: digits, + - * / ( ) . , and space.
        allowed = set("0123456789+-*/().,** ")
        if not all(c in allowed for c in expression):
            return "[ERROR] 不支持的字符,只允许数学运算表达式"
        # SECURITY(review): eval() runs on model-supplied input. The whitelist
        # plus empty __builtins__ blocks names and calls, but it still lets
        # through unbounded exponent chains (e.g. "9**9**9**9") that can pin
        # a CPU core or exhaust memory, and "," which yields tuples. Consider
        # an ast-based evaluator with operand/exponent limits before exposing
        # this tool to untrusted users.
        result = eval(expression, {"__builtins__": {}})  # noqa: S307
        return f"{expression} = {result}"
    except Exception as e:
        return f"[ERROR] 计算失败: {e}"


# Tools bound to the LLM, and a name -> tool lookup used when executing calls.
TOOLS = [search_knowledge_base, get_current_time, calculate]
TOOL_MAP = {t.name: t for t in TOOLS}

# --------------------------------------------------------------------------
# 2. LangGraph Agent construction
# --------------------------------------------------------------------------
from typing import TypedDict


class AgentState(TypedDict):
    # Conversation history. `add_messages` is the reducer attached to this
    # field, so messages returned by a node are merged into the history.
    messages: Annotated[list, add_messages]


def build_agent_graph(model_name: str = "qwen3:4b", base_url: str = "http://localhost:11434") -> StateGraph:
    """Build the (uncompiled) tool-using ReAct StateGraph.

    Flow::

        call_model -> (tool calls present?) -> call_tools -> call_model -> ...
                      (no tool calls)       -> END

    Args:
        model_name: Ollama model name passed to ChatOllama.
        base_url: Base URL of the Ollama server.

    Returns:
        The StateGraph builder; callers must ``.compile()`` it (optionally
        with a checkpointer) before invoking.
    """
    # Local import, hoisted here so it is not re-executed for every tool call.
    from langchain_core.messages import ToolMessage

    llm = ChatOllama(
        model=model_name,
        base_url=base_url,
        temperature=0.1,
    ).bind_tools(TOOLS)

    system_prompt = (
        "你是一个企业 AI 助手,通过飞书与用户交互。\n"
        "你有以下工具可以使用:\n"
        "- search_knowledge_base:搜索技术知识库\n"
        "- get_current_time:获取当前时间\n"
        "- calculate:数学计算\n\n"
        "回复要求:简洁、友好、专业。中文回复。\n"
        "如果用户提问不清晰,主动引导补充信息。"
    )

    def call_model(state: AgentState) -> dict:
        """Call the LLM, which decides whether to request tool calls."""
        messages = state["messages"]
        # Only the LLM response is returned below, so the system message is
        # never written back to the state; prepend it on every call.
        if not messages or not isinstance(messages[0], SystemMessage):
            messages = [SystemMessage(content=system_prompt)] + list(messages)
        response = llm.invoke(messages)
        return {"messages": [response]}

    def call_tools(state: AgentState) -> dict:
        """Run every tool call requested by the last message.

        A failing or unknown tool produces an "[ERROR] ..." ToolMessage
        instead of raising, so the model can see the failure and react.
        """
        last_message = state["messages"][-1]
        results = []
        for tool_call in last_message.tool_calls:
            tool_name = tool_call["name"]
            tool_args = tool_call["args"]
            logger.info("[TOOL] 调用 %s,参数: %s", tool_name, tool_args)
            if tool_name in TOOL_MAP:
                try:
                    result = TOOL_MAP[tool_name].invoke(tool_args)
                except Exception as e:
                    result = f"[ERROR] 工具调用失败: {e}"
            else:
                result = f"[ERROR] 未知工具: {tool_name}"
            results.append(
                ToolMessage(
                    content=str(result),
                    tool_call_id=tool_call["id"],
                )
            )
        return {"messages": results}

    def should_continue(state: AgentState) -> str:
        """Route to "call_tools" while the last message requests tools, else END."""
        last_message = state["messages"][-1]
        if getattr(last_message, "tool_calls", None):
            return "call_tools"
        return END

    # Assemble the StateGraph.
    graph = StateGraph(AgentState)
    graph.add_node("call_model", call_model)
    graph.add_node("call_tools", call_tools)
    graph.set_entry_point("call_model")
    graph.add_conditional_edges("call_model", should_continue)
    graph.add_edge("call_tools", "call_model")
    return graph
# --------------------------------------------------------------------------
# 3. Session management
# --------------------------------------------------------------------------
@dataclass
class SessionInfo:
    """Bookkeeping for a single chat session."""

    session_id: str
    checkpointer: MemorySaver  # created per session by SessionManager.get_or_create
    created_at: float = field(default_factory=time.time)
    last_active: float = field(default_factory=time.time)  # refreshed on every get_or_create
    message_count: int = 0
    last_request_time: float = 0.0  # time.time() of the last accepted request (rate limiting)


class SessionManager:
    """In-memory multi-user session registry with a per-session rate limit.

    Sessions live only in this process; the original note points to a Redis
    checkpointer (the Day 13 topic) for production use.

    Locking: the manager may be awaited from more than one event loop or
    thread (for example from a coroutine executed via ``asyncio.run()`` in a
    worker thread). An ``asyncio.Lock`` is not thread-safe and must not be
    shared between event loops, and none of the critical sections below
    awaits, so a plain ``threading.Lock`` held for a few dictionary
    operations is used instead.
    """

    def __init__(self, rate_limit_seconds: float = 5.0):
        """Create an empty registry.

        Args:
            rate_limit_seconds: Minimum interval between two accepted
                requests of the same session.
        """
        import threading  # local import, same style as the other function-scope imports in this file

        self._sessions: dict[str, SessionInfo] = {}
        self._lock = threading.Lock()
        self.rate_limit_seconds = rate_limit_seconds

    async def get_or_create(self, session_id: str) -> SessionInfo:
        """Return the session for ``session_id``, creating it on first use.

        Also refreshes the session's ``last_active`` timestamp.
        """
        with self._lock:
            session = self._sessions.get(session_id)
            if session is None:
                logger.info("[SESSION] 创建新 session: %s", session_id)
                session = SessionInfo(
                    session_id=session_id,
                    checkpointer=MemorySaver(),
                )
                self._sessions[session_id] = session
            session.last_active = time.time()
            return session

    async def delete(self, session_id: str) -> bool:
        """Remove a session; return True if it existed, False otherwise."""
        with self._lock:
            removed = self._sessions.pop(session_id, None)
        if removed is None:
            return False
        logger.info("[SESSION] 删除 session: %s", session_id)
        return True

    async def check_rate_limit(self, session_id: str) -> bool:
        """Simple throttle: accept a request only if at least
        ``rate_limit_seconds`` passed since the last accepted one.

        Intended to stop platform retries (e.g. Feishu) from triggering
        concurrent Agent runs for the same session.

        Returns:
            True if the request is accepted (and the timestamp recorded),
            False if it is throttled.
        """
        session = await self.get_or_create(session_id)
        # Check-and-set under the lock so two callers cannot both be accepted.
        with self._lock:
            now = time.time()
            if now - session.last_request_time < self.rate_limit_seconds:
                return False  # throttled
            session.last_request_time = now
            return True

    def list_sessions(self) -> list[dict]:
        """Return a summary dict for every session currently registered."""
        # Snapshot under the lock so a concurrent create/delete cannot change
        # the dictionary while it is being iterated.
        with self._lock:
            sessions = list(self._sessions.values())
        return [
            {
                "session_id": s.session_id,
                "created_at": s.created_at,
                "last_active": s.last_active,
                "message_count": s.message_count,
            }
            for s in sessions
        ]


# Process-wide manager used by the API endpoints (3-second throttle).
SESSION_MANAGER = SessionManager(rate_limit_seconds=3.0)
# --------------------------------------------------------------------------
# 4. Request / response models
# --------------------------------------------------------------------------
class ChatRequest(BaseModel):
    session_id: str
    message: str
    stream: bool = False  # NOTE(review): not read by any endpoint in this file


class ChatResponse(BaseModel):
    session_id: str
    reply: str
    elapsed_ms: int
    tool_calls_made: int = 0  # tool calls executed while answering this request


# --------------------------------------------------------------------------
# 5. Agent execution
# --------------------------------------------------------------------------
# Module-level Agent graph cache (built once, then reused).
_AGENT_GRAPH = None
_GRAPH_COMPILE_FAILED = False


def get_agent_graph():
    """Lazily build and cache a compiled Agent graph *without* a checkpointer.

    The model name and base URL are read from OLLAMA_MODEL / OLLAMA_BASE_URL.
    After the first failure the function keeps returning None without
    retrying.

    Returns:
        The cached compiled graph, or None if building it failed.
    """
    global _AGENT_GRAPH, _GRAPH_COMPILE_FAILED
    if _GRAPH_COMPILE_FAILED:
        return None
    if _AGENT_GRAPH is None:
        try:
            model_name = os.getenv("OLLAMA_MODEL", "qwen3:4b")
            base_url = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
            graph_builder = build_agent_graph(model_name, base_url)
            # Compiled without a checkpointer, so this graph keeps no
            # conversation memory; run_agent compiles its own per-session
            # graph with the session's checkpointer instead.
            _AGENT_GRAPH = graph_builder.compile()
            logger.info("[AGENT] Agent Graph 构建成功,模型:%s", model_name)
        except Exception as e:
            logger.error("[AGENT] Agent Graph 构建失败: %s", e)
            _GRAPH_COMPILE_FAILED = True
    return _AGENT_GRAPH


def count_tool_calls(messages: list) -> int:
    """Return how many ToolMessage instances ``messages`` contains."""
    from langchain_core.messages import ToolMessage

    return sum(1 for m in messages if isinstance(m, ToolMessage))


def _messages_of_current_turn(messages: list) -> list:
    """Return the messages that follow the most recent HumanMessage."""
    for idx in range(len(messages) - 1, -1, -1):
        if isinstance(messages[idx], HumanMessage):
            return messages[idx + 1:]
    return messages


def _invoke_agent(checkpointer, session_id: str, user_message: str) -> tuple[str, int]:
    """Blocking part of run_agent: build, compile and invoke the graph."""
    model_name = os.getenv("OLLAMA_MODEL", "qwen3:4b")
    base_url = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
    try:
        graph_builder = build_agent_graph(model_name, base_url)
        compiled = graph_builder.compile(checkpointer=checkpointer)
    except Exception as e:
        logger.error("[AGENT] 构建 Agent 失败: %s", e)
        return f"[ERROR] Agent 初始化失败: {type(e).__name__}: {e}", 0

    config = {"configurable": {"thread_id": session_id}}
    try:
        result = compiled.invoke(
            {"messages": [HumanMessage(content=user_message)]},
            config=config,
        )
        # The checkpointer makes result["messages"] the whole conversation so
        # far. Restrict reply lookup and tool counting to the current turn;
        # otherwise the tool count would keep growing across requests.
        turn_messages = _messages_of_current_turn(result["messages"])
        reply = next(
            (m.content for m in reversed(turn_messages) if isinstance(m, AIMessage)),
            "抱歉,我没有生成回复。",
        )
        return reply, count_tool_calls(turn_messages)
    except Exception as e:
        logger.error("[AGENT] Agent 执行失败: %s", e)
        return f"抱歉,处理您的消息时出现错误:{type(e).__name__}", 0


async def run_agent(session_id: str, user_message: str) -> tuple[str, int]:
    """Run the Agent for one user message.

    Each session uses its own checkpointer, which provides the multi-turn
    memory. The blocking graph invocation runs in a worker thread so the
    calling event loop stays responsive; session bookkeeping stays on the
    caller's loop.

    Args:
        session_id: Session key; also used as the LangGraph ``thread_id``.
        user_message: Text of the user's message.

    Returns:
        ``(reply, tool_call_count)``. Failures are reported through the reply
        text with a count of 0; no exception is raised for them.
    """
    session = await SESSION_MANAGER.get_or_create(session_id)
    session.message_count += 1
    return await asyncio.to_thread(
        _invoke_agent, session.checkpointer, session_id, user_message
    )


# --------------------------------------------------------------------------
# 6. FastAPI endpoints
# --------------------------------------------------------------------------
# NOTE: endpoint docstrings are kept verbatim because FastAPI publishes them
# as the operation description in the generated OpenAPI schema. An English
# summary is given in the comment above each endpoint.
app = FastAPI(title="LangGraph Agent API", version="1.0.0")


# Health check: reports status, number of sessions and the configured model.
@app.get("/health")
async def health():
    """健康检查"""
    return {
        "status": "ok",
        "active_sessions": len(SESSION_MANAGER._sessions),
        "model": os.getenv("OLLAMA_MODEL", "qwen3:4b"),
    }


# Non-streaming chat. Each session_id keeps its own multi-turn history.
# Responds 400 on empty input and 429 when the session is throttled.
@app.post("/chat", response_model=ChatResponse)
async def chat(req: ChatRequest):
    """同步对话接口。每个 session_id 独立维护多轮对话历史。"""
    if not req.session_id or not req.message.strip():
        raise HTTPException(status_code=400, detail="session_id 和 message 不能为空")

    # Rate-limit check.
    ok = await SESSION_MANAGER.check_rate_limit(req.session_id)
    if not ok:
        raise HTTPException(status_code=429, detail="请求过于频繁,请稍后再试")

    start_ms = int(time.time() * 1000)
    logger.info("[CHAT] session=%s msg=%s", req.session_id, req.message[:50])

    # run_agent offloads the blocking graph call to a worker thread itself.
    # Awaiting it here, instead of wrapping it in asyncio.run() inside a
    # thread, keeps SESSION_MANAGER and its lock on a single event loop.
    reply, tool_count = await run_agent(req.session_id, req.message)

    elapsed = int(time.time() * 1000) - start_ms
    logger.info("[CHAT] session=%s elapsed=%dms tools=%d", req.session_id, elapsed, tool_count)
    return ChatResponse(
        session_id=req.session_id,
        reply=reply,
        elapsed_ms=elapsed,
        tool_calls_made=tool_count,
    )


# Streaming chat over SSE: emits `data: {"text": ...}` events, then
# `data: [DONE]`; on failure emits a single `data: {"error": ...}` event.
@app.post("/chat/stream")
async def chat_stream(req: ChatRequest):
    """流式对话接口(SSE)。飞书不支持流式,此接口供 Web 端使用。"""
    # Same input validation as /chat; done before the response starts so the
    # client gets a proper 400 instead of an error event in the stream.
    if not req.session_id or not req.message.strip():
        raise HTTPException(status_code=400, detail="session_id 和 message 不能为空")

    async def generate() -> AsyncGenerator[str, None]:
        session = await SESSION_MANAGER.get_or_create(req.session_id)
        session.message_count += 1  # keep the counter consistent with /chat
        model_name = os.getenv("OLLAMA_MODEL", "qwen3:4b")
        base_url = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
        try:
            graph_builder = build_agent_graph(model_name, base_url)
            compiled = graph_builder.compile(checkpointer=session.checkpointer)
            config = {"configurable": {"thread_id": req.session_id}}
            async for chunk in compiled.astream(
                {"messages": [HumanMessage(content=req.message)]},
                config=config,
                stream_mode="messages",
            ):
                # In "messages" mode each item is unpacked as (message chunk, metadata).
                msg_chunk, _metadata = chunk
                if hasattr(msg_chunk, "content") and msg_chunk.content:
                    yield f"data: {json.dumps({'text': msg_chunk.content}, ensure_ascii=False)}\n\n"
            yield "data: [DONE]\n\n"
        except Exception as e:
            logger.error("[STREAM] session=%s failed: %s", req.session_id, e)
            yield f"data: {json.dumps({'error': str(e)}, ensure_ascii=False)}\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")


# Delete a session and its history; 404 if the session does not exist.
@app.delete("/session/{session_id}")
async def delete_session(session_id: str):
    """清除会话历史"""
    ok = await SESSION_MANAGER.delete(session_id)
    if ok:
        return {"message": f"session {session_id} 已清除"}
    raise HTTPException(status_code=404, detail="session 不存在")


# List all sessions currently held by the manager (admin endpoint).
@app.get("/sessions")
async def list_sessions():
    """列出所有活跃 session(管理接口)"""
    return {"sessions": SESSION_MANAGER.list_sessions()}
# --------------------------------------------------------------------------
# 7. Main program + demo
# --------------------------------------------------------------------------
def demo_without_ollama():
    """Print an overview of the API and smoke-test the tools (no Ollama needed).

    Prints the registered tools, the endpoint table and the session-management
    notes, then invokes the three tools directly.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("T12-02 演示:Agent API 结构")
    print(banner)

    print("\n[工具清单]")
    for registered in TOOLS:
        print(f" - {registered.name}: {registered.description[:60]}")

    print("\n[API 端点]")
    routes = (
        ("POST", "/chat", "同步对话(飞书使用此接口)"),
        ("POST", "/chat/stream", "流式对话(SSE,供 Web 使用)"),
        ("DELETE", "/session/{id}", "清除会话历史"),
        ("GET", "/sessions", "列出活跃 session"),
        ("GET", "/health", "健康检查"),
    )
    for verb, route, summary in routes:
        print(f" [{verb:6}] {route:25}{summary}")

    print("\n[会话管理设计]")
    design_notes = (
        " - 每个 session_id 独立 MemorySaver(多轮记忆)",
        " - session_id = 飞书 open_id(一用户一 session)",
        " - 限流:同一 session 3 秒内只处理 1 条消息",
        " - 生产环境:MemorySaver → Redis Checkpointer(Day13)",
    )
    for note in design_notes:
        print(note)

    print("\n[工具调用测试]")
    # The tools are invoked directly, so no LLM is involved here.
    calc_out = calculate.invoke({"expression": "2 ** 10"})
    print(f" calculate('2 ** 10') = {calc_out}")
    time_out = get_current_time.invoke({})
    print(f" get_current_time() = {time_out}")
    kb_out = search_knowledge_base.invoke({"query": "什么是 langgraph"})
    print(f" search_knowledge_base('什么是 langgraph') = {kb_out[:60]}")


if __name__ == "__main__":
    # Show the demo output first, then start the HTTP server on port 8002.
    demo_without_ollama()
    print("\n" + "=" * 60)
    print("启动 Agent API Server...")
    print("端口:8002")
    print("接口:POST /chat")
    print("=" * 60)
    print()
    uvicorn.run(app, host="0.0.0.0", port=8002, log_level="info")
