通过本项目的学习,你将掌握:
用户发消息 → 飞书服务器检测到事件 → POST 到你的 Webhook URL → 你的服务器验证签名 → 处理消息 → 返回 200

首次配置事件订阅时,飞书会发送一个带 `challenge` 字段的验证请求,原样返回即可。(注意:下文的 `webhook_event` 示例并未实现签名 / Verification Token 校验,生产环境应补上。)

| thread_id | |
| MemorySaver | |
| Redis Checkpointer |
```python
# Around line 419 of the full script: thread_id keeps each user's conversation separate
config = {"configurable": {"thread_id": open_id}}

# Around lines 418-424: the agent saves/restores the message history automatically
compiled = build_compiled_graph(session.checkpointer)
# ainvoke, not invoke: this runs inside an async handler, and a blocking
# invoke() would freeze the event loop until the LLM finishes
result = await compiled.ainvoke(
    {"messages": [HumanMessage(content=user_text)]},
    config=config,  # same thread_id -> earlier turns are restored
)
```

```python
# Return 200 immediately instead of waiting for the agent
# (module level: _background_tasks: set[asyncio.Task] = set())
task = asyncio.create_task(process_message(open_id, user_text, msg_id))
_background_tasks.add(task)  # asyncio keeps only a weak reference to running tasks
task.add_done_callback(_background_tasks.discard)
return JSONResponse({"code": 0})  # respond right away
```

| 同步处理 | ||
| 异步处理 |
```python
from collections import deque

_MAX_TRACKED_MSG_IDS = 2000
_processed_msg_ids: set[str] = set()  # global set: O(1) membership checks
_processed_order: deque[str] = deque()  # insertion order, used for FIFO eviction


def is_duplicate(msg_id: str) -> bool:
    """Return True if msg_id was already seen; otherwise remember it and return False.

    Only the oldest id is evicted once the cap is exceeded. Clearing the whole
    set would also forget the most recent ids, so a Feishu retry arriving right
    after the reset would be processed twice.
    """
    if not msg_id:
        return False  # nothing to dedupe on; an empty id must not block later messages
    if msg_id in _processed_msg_ids:
        return True  # already handled, skip
    _processed_msg_ids.add(msg_id)
    _processed_order.append(msg_id)
    if len(_processed_order) > _MAX_TRACKED_MSG_IDS:
        _processed_msg_ids.discard(_processed_order.popleft())  # bound memory
    return False


# Check for duplicates as soon as a message arrives
if is_duplicate(msg_id):
    logger.info("[WEBHOOK] 重复消息 %s,跳过", msg_id)
    return JSONResponse({"code": 0})  # return directly without processing
```

从 `.env` 文件加载飞书应用配置和 Ollama 配置:

```python
FEISHU_APP_ID = os.getenv("FEISHU_APP_ID", "")
FEISHU_APP_SECRET = os.getenv("FEISHU_APP_SECRET", "")
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen3:4b")
# Without credentials the bot runs in demo mode
IS_DEMO_MODE = not (FEISHU_APP_ID and FEISHU_APP_SECRET)

_token: str = ""
_token_expire: float = 0.0


async def get_feishu_token() -> str:
    """Return a cached tenant_access_token, refreshing it 5 minutes before it expires."""
    # Without `global`, the assignments below make _token a local name and the
    # first read raises UnboundLocalError.
    global _token, _token_expire
    if _token and time.time() < _token_expire - 300:
        return _token  # still valid: reuse the cached token

    # Expired (or never fetched): request a new one
    async with httpx.AsyncClient(timeout=15) as client:
        resp = await client.post(
            f"{FEISHU_BASE_URL}/auth/v3/tenant_access_token/internal",
            json={"app_id": FEISHU_APP_ID, "app_secret": FEISHU_APP_SECRET},
        )
    resp.raise_for_status()
    data = resp.json()
    if data.get("code") != 0:
        raise RuntimeError(
            f"获取 tenant_access_token 失败: code={data.get('code')} msg={data.get('msg')}"
        )
    _token = data["tenant_access_token"]
    # Trust the lifetime (seconds) reported by the server; 7200 is only a fallback
    _token_expire = time.time() + data.get("expire", 7200)
    return _token


async def send_text(open_id: str, text: str) -> None:
    """Send a plain-text message to the user identified by open_id."""
    token = await get_feishu_token()
    async with httpx.AsyncClient(timeout=15) as client:
        resp = await client.post(
            f"{FEISHU_BASE_URL}/im/v1/messages?receive_id_type=open_id",
            json={
                "receive_id": open_id,
                "msg_type": "text",
                # content is itself a JSON string; keep Chinese characters readable
                "content": json.dumps({"text": text}, ensure_ascii=False),
            },
            headers={"Authorization": f"Bearer {token}"},
        )
    resp.raise_for_status()  # surface failures instead of silently dropping the reply
```

关键点:
- `receive_id_type=open_id`
- `ensure_ascii=False`

```python
async def send_card(open_id: str, title: str, content: str, session_id: str) -> None:
    """Send the reply as an interactive card with action buttons."""
    card = {
        "config": {"wide_screen_mode": True},
        "header": {
            # Card text objects carry an explicit tag (plain_text for titles/buttons)
            "title": {"tag": "plain_text", "content": title},
            "template": "blue",
        },
        "elements": [
            {"tag": "div", "text": {"tag": "lark_md", "content": content}},
            {"tag": "hr"},
            {
                "tag": "action",
                "actions": [
                    {
                        "tag": "button",
                        "text": {"tag": "plain_text", "content": "清除对话历史"},
                        "type": "danger",
                        # Echoed back to /webhook/card when the button is clicked
                        "value": {"action": "clear_history", "session_id": session_id},
                    },
                    # ... other buttons
                ],
            },
        ],
    }
    # Same request as send_text, but with msg_type "interactive" and the card as content
    token = await get_feishu_token()
    async with httpx.AsyncClient(timeout=15) as client:
        resp = await client.post(
            f"{FEISHU_BASE_URL}/im/v1/messages?receive_id_type=open_id",
            json={
                "receive_id": open_id,
                "msg_type": "interactive",
                "content": json.dumps(card, ensure_ascii=False),
            },
            headers={"Authorization": f"Bearer {token}"},
        )
    resp.raise_for_status()
```

关键点:
- `msg_type: "interactive"`
- `tag: "lark_md"`
- `value`:按钮点击时回传给后端(`/webhook/card` 接口)

```python
@tool("search_knowledge_base")
def search_knowledge_base(query: str) -> str:
    """Search the knowledge base for LangChain / LangGraph concepts by keyword."""
    # @tool turns the docstring into the tool description the LLM uses to decide
    # when to call it (langchain_core also refuses a tool with no description).
    kb = {
        "langchain": "LangChain 是 AI 应用开发框架...",
        "langgraph": "LangGraph 是基于 LangChain 的状态图框架...",
        # ... more entries
    }
    lowered = query.lower()  # hoisted: computed once, not per key
    for key, val in kb.items():
        if key in lowered:
            return f"[知识库] {val}"
    return "[知识库] 未找到精确匹配"
```

作用:模拟知识库检索(生产环境可用真实向量数据库)
```python
@tool("get_current_time")
def get_current_time() -> str:
    """Return the current local date and time; use it for questions about "now" or "today"."""
    from datetime import datetime

    return datetime.now().strftime("当前时间:%Y年%m月%d日 %H:%M:%S")
```

作用:弥补 LLM 无法获取实时信息的短板
```python
import ast
import operator

# Whitelisted AST operator types -> implementations
_ALLOWED_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
    ast.Pow: operator.pow,
    ast.UAdd: operator.pos,
    ast.USub: operator.neg,
}
_MAX_POW_BITS = 10_000  # refuse integer powers whose result would exceed ~10k bits


class _NotArithmetic(Exception):
    """Raised when the expression contains anything other than numbers and operators."""


def _eval_arith(node: ast.AST):
    """Evaluate a parsed expression that may only contain numbers and whitelisted operators."""
    if isinstance(node, ast.Expression):
        return _eval_arith(node.body)
    if isinstance(node, ast.Constant) and type(node.value) in (int, float):
        return node.value
    if isinstance(node, ast.UnaryOp) and type(node.op) in _ALLOWED_OPS:
        return _ALLOWED_OPS[type(node.op)](_eval_arith(node.operand))
    if isinstance(node, ast.BinOp) and type(node.op) in _ALLOWED_OPS:
        left = _eval_arith(node.left)
        right = _eval_arith(node.right)
        # Inputs like 9**9**9 would otherwise pin the CPU / exhaust memory
        if (
            isinstance(node.op, ast.Pow)
            and isinstance(left, int)
            and isinstance(right, int)
            and right > 0
            and abs(left) > 1
            and abs(left).bit_length() * right > _MAX_POW_BITS
        ):
            raise ValueError("指数过大")
        return _ALLOWED_OPS[type(node.op)](left, right)
    raise _NotArithmetic(type(node).__name__)


@tool("calculate")
def calculate(expression: str) -> str:
    """Evaluate an arithmetic expression such as "(3 + 4) * 2" or "2 ** 10".

    Supports numbers, parentheses and the operators + - * / // **.
    """
    # Walk the AST instead of calling eval(): eval() with empty __builtins__
    # still executes arbitrarily expensive arithmetic.
    try:
        result = _eval_arith(ast.parse(expression.strip(), mode="eval"))
        return f"{expression} = {result}"  # inside try: str() of a huge int can raise
    except _NotArithmetic:
        return "[ERROR] 只允许数学运算表达式"
    except Exception as e:
        return f"[ERROR] 计算失败: {e}"
```

作用:
- 安全:只接受数字和运算符,不执行任意代码

`build_compiled_graph(checkpointer)`

```python
from langgraph.graph import END, MessagesState, StateGraph


def build_compiled_graph(checkpointer: MemorySaver):
    """Build the tool-calling agent graph and compile it with the given checkpointer."""
    # 1. Bind the tools to the LLM
    llm = ChatOllama(
        model=OLLAMA_MODEL,
        base_url=OLLAMA_BASE_URL,
        temperature=0.1,
    ).bind_tools(TOOLS)

    # 2. Nodes
    def call_model(state: MessagesState) -> dict:
        response = llm.invoke(state["messages"])
        return {"messages": [response]}  # appended by the add_messages reducer

    def call_tools(state: MessagesState) -> dict:
        last = state["messages"][-1]
        results = []
        for tc in last.tool_calls:
            tool_fn = TOOL_MAP.get(tc["name"])
            if tool_fn is None:
                # Report a hallucinated tool name back to the model instead of crashing
                content = f"[ERROR] 未知工具: {tc['name']}"
            else:
                content = str(tool_fn.invoke(tc["args"]))
            results.append(ToolMessage(content=content, tool_call_id=tc["id"]))
        return {"messages": results}

    def should_continue(state: MessagesState) -> str:
        last = state["messages"][-1]
        return "call_tools" if getattr(last, "tool_calls", None) else END

    # 3. Build the graph. MessagesState's add_messages reducer APPENDS what each
    #    node returns. With StateGraph(dict) every return value replaced the whole
    #    state, dropping tool context and the history stored by the checkpointer.
    graph = StateGraph(MessagesState)
    graph.add_node("call_model", call_model)
    graph.add_node("call_tools", call_tools)
    graph.set_entry_point("call_model")
    graph.add_conditional_edges("call_model", should_continue)
    graph.add_edge("call_tools", "call_model")

    # 4. Compile with the checkpointer so each thread_id keeps its history
    return graph.compile(checkpointer=checkpointer)
```

| call_model | |
| call_tools | |
| should_continue | |
| checkpointer |
```text
用户消息 → call_model → (有工具调用?) → call_tools → call_model → ... → (无工具调用) → END
                               ↓
                          返回工具结果
```

```python
@dataclass
class Session:
    """Per-user conversation state: its own memory (checkpointer) and display settings."""

    session_id: str
    open_id: str
    checkpointer: MemorySaver = field(default_factory=MemorySaver)  # this user's history
    created_at: float = field(default_factory=time.time)
    last_active: float = field(default_factory=time.time)
    use_card_mode: bool = True  # True: reply with cards; False: plain text
```

作用:每个用户独立的会话对象,包含记忆和配置
```python
_sessions: dict[str, Session] = {}


def get_session(open_id: str) -> Session:
    """Return the user's Session, creating it on first contact, and mark it active."""
    if open_id not in _sessions:
        _sessions[open_id] = Session(session_id=open_id, open_id=open_id)
        logger.info("[SESSION] 新建 session: %s", open_id[:20])
    session = _sessions[open_id]
    # NOTE(review): sessions are never evicted; last_active could drive idle cleanup
    session.last_active = time.time()
    return session
```

作用:懒创建,每个用户第一次对话时创建 Session
```python
def clear_session(open_id: str) -> None:
    """Drop the user's Session, together with its in-memory checkpointer (chat history)."""
    if open_id in _sessions:
        del _sessions[open_id]
        logger.info("[SESSION] 清除 session: %s", open_id[:20])
```

作用:用户点击"清除历史"按钮时调用
```python
from collections import deque

_MAX_TRACKED_MSG_IDS = 2000
_processed_msg_ids: set[str] = set()  # O(1) membership checks
_processed_order: deque[str] = deque()  # insertion order, used for FIFO eviction


def is_duplicate(msg_id: str) -> bool:
    """Return True if msg_id was already seen; otherwise remember it and return False."""
    if not msg_id:
        return False  # nothing to dedupe on; an empty id must not block later messages
    if msg_id in _processed_msg_ids:
        return True
    _processed_msg_ids.add(msg_id)
    _processed_order.append(msg_id)
    # Evict only the oldest id. Clearing everything would also forget recent ids
    # and let an immediate Feishu retry through.
    if len(_processed_order) > _MAX_TRACKED_MSG_IDS:
        _processed_msg_ids.discard(_processed_order.popleft())
    return False
```

作用:
- `set` 记录已处理消息 ID

`process_message(open_id, user_text, msg_id)`

```python
async def process_message(open_id: str, user_text: str, msg_id: str) -> None:
    """Run the agent for one user message and deliver the reply (runs as a background task)."""
    # Step 1: get (or lazily create) the user's session
    session = get_session(open_id)

    try:
        # Step 2: "thinking" hint (demo mode has no Feishu credentials)
        if not IS_DEMO_MODE:
            await send_text(open_id, "正在思考中,请稍候...")

        # Step 3: build the agent around this user's checkpointer
        compiled = build_compiled_graph(session.checkpointer)
        config = {"configurable": {"thread_id": open_id}}

        # Step 4: run the agent. ainvoke keeps the event loop free while the LLM
        # works, so other webhooks are still acknowledged within Feishu's timeout.
        result = await compiled.ainvoke(
            {"messages": [HumanMessage(content=user_text)]},
            config=config,
        )

        # Step 5: the reply is the last AI message
        raw_reply = next(
            (m.content for m in reversed(result["messages"]) if isinstance(m, AIMessage)),
            "抱歉,没有生成回复。",
        )

        # Step 6: format the reply
        reply = format_reply(raw_reply)

        # Step 7: deliver (demo mode only logs, it cannot call the Feishu API)
        if IS_DEMO_MODE:
            logger.info("[DEMO] reply to %s: %s", open_id[:20], reply)
        elif session.use_card_mode:
            await send_card(open_id, "AI 助手", reply, session_id=open_id)
        else:
            await send_text(open_id, reply)
    except Exception as e:
        # Nothing awaits this background task, so log the traceback here
        logger.exception("[AGENT] 处理消息 %s 失败", msg_id)
        if IS_DEMO_MODE:
            return
        err_msg = f"抱歉,处理您的消息时出现了问题。\n错误类型:{type(e).__name__}"
        try:
            await send_text(open_id, err_msg)
        except Exception:
            logger.exception("[AGENT] 发送错误提示失败")
```

关键点:
`webhook_event(request: Request)`

```python
# Strong references to in-flight background tasks (asyncio keeps only weak ones)
_background_tasks: set[asyncio.Task] = set()


@app.post("/webhook/event")
async def webhook_event(request: Request):
    """Receive Feishu event callbacks: ack immediately, process messages in the background."""
    # 1. Parse the request body
    body_bytes = await request.body()
    data = json.loads(body_bytes.decode("utf-8"))

    # 2. URL verification sent when the subscription is first configured
    if "challenge" in data:
        return JSONResponse({"challenge": data["challenge"]})

    # NOTE(review): no Verification Token / signature check and no support for
    # encrypted ("encrypt") payloads; add both before exposing this publicly.

    # 3. Only handle "message received" events
    header = data.get("header", {})
    if header.get("event_type", "") != "im.message.receive_v1":
        return JSONResponse({"code": 0})

    # 4. Extract message fields
    event_body = data.get("event", {})
    message = event_body.get("message", {})
    sender = event_body.get("sender", {})
    msg_id = message.get("message_id", "")
    # The receive event names this field "message_type" ("msg_type" is used when sending)
    msg_type = message.get("message_type") or message.get("msg_type", "")
    chat_type = message.get("chat_type", "p2p")
    open_id = sender.get("sender_id", {}).get("open_id", "")

    # 5. Text messages only
    if msg_type != "text":
        return JSONResponse({"code": 0})

    # 6. Drop Feishu retries of the same message
    if is_duplicate(msg_id):
        return JSONResponse({"code": 0})

    # 7. Parse the text content (itself a JSON string)
    content_obj = json.loads(message.get("content", "{}"))
    user_text = content_obj.get("text", "").strip()

    # 8. In group chats only react when someone is @-mentioned; mentions appear in
    #    the text as @_user_N placeholders (this does not verify it is the bot)
    if chat_type == "group":
        if "@_user_1" not in user_text:
            return JSONResponse({"code": 0})
        user_text = re.sub(r"@_user_\d+\s*", "", user_text).strip()

    if not open_id or not user_text:
        return JSONResponse({"code": 0})  # nobody to answer / nothing to answer

    # 9. Process in the background, keeping a reference until the task finishes
    task = asyncio.create_task(process_message(open_id, user_text, msg_id))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

    # 10. Return 200 right away
    return JSONResponse({"code": 0})
```

关键点:
`webhook_card(request: Request)`

```python
@app.post("/webhook/card")
async def webhook_card(request: Request):
    """Handle card button clicks: clear history or toggle card/text mode."""
    body = await request.json()

    # The card request URL is also verified with a challenge when it is configured
    if "challenge" in body:
        return JSONResponse({"challenge": body["challenge"]})

    action_value = body.get("action", {}).get("value", {})
    open_id = body.get("open_id", "")
    if not open_id:
        return JSONResponse({"code": 0})  # no user to act on or reply to

    action_type = action_value.get("action", "")
    if action_type == "clear_history":
        clear_session(open_id)
        await send_text(open_id, "[系统] 对话历史已清除,可以重新开始!")
    elif action_type == "toggle_text_mode":
        session = get_session(open_id)
        session.use_card_mode = not session.use_card_mode
        mode = "卡片模式" if session.use_card_mode else "纯文本模式"
        await send_text(open_id, f"[系统] 已切换为{mode}")

    return JSONResponse({"code": 0})
```

作用:
问题:Webhook 需要快速响应,但 Agent 执行慢
解决方案:
```python
task = asyncio.create_task(process_message(...))  # run in the background
_background_tasks.add(task)                       # keep a reference until it finishes
task.add_done_callback(_background_tasks.discard)
return JSONResponse({"code": 0})                  # respond immediately
```

优点:
缺点:
问题:多用户同时使用,如何避免对话混淆?
解决方案:
```python
# Each user gets an independent checkpointer
session = get_session(open_id)
compiled = build_compiled_graph(session.checkpointer)
config = {"configurable": {"thread_id": open_id}}
```

优点:
问题:飞书可能重复发送同一消息
解决方案:
```python
def is_duplicate(msg_id: str) -> bool:
    """Return True if this message id was already processed, otherwise record it."""
    if msg_id in _processed_msg_ids:
        return True  # already handled, skip
    _processed_msg_ids.add(msg_id)
    return False
```

优点:
缺点:
核心机制:
- 调用 `graph.invoke()` 时,自动保存当前状态到 checkpointer
- 再次调用时,根据 `thread_id` 自动恢复之前的消息历史

代码体现:
```python
# First call: thread_id="user_A"
result1 = compiled.invoke(
    {"messages": [HumanMessage("你好")]},
    config={"configurable": {"thread_id": "user_A"}},
)
# The checkpointer now holds: [HumanMessage("你好"), AIMessage("你好!")]

# Second call with the same thread_id
result2 = compiled.invoke(
    {"messages": [HumanMessage("我叫什么?")]},
    config={"configurable": {"thread_id": "user_A"}},
)
# The history is restored automatically, so the LLM sees the earlier "你好" turn
# Reply: "你是新用户,还没有告诉我你的名字"
```

实现原理:
- `MemorySaver` 在内存中按 `{thread_id: [messages]}` 保存状态
- 调用开始时(简化示意):`messages = checkpointer.get(thread_id) + new_messages`
- 调用结束后(简化示意):`checkpointer.save(thread_id, messages)`

为什么需要异步?
- 使用 `async/await` 避免阻塞事件循环

代码对比:
| `requests.get(url)` | `await client.get(url)`(`client` 为 `httpx.AsyncClient`;`httpx.get` 是同步函数,不能 `await`) |
本项目的异步实践:
```python
# Token fetch (async HTTP)
async def get_feishu_token() -> str:
    async with httpx.AsyncClient() as client:
        resp = await client.post(...)  # does not block the event loop

# Message sending (async HTTP)
async def send_text(open_id: str, text: str):
    async with httpx.AsyncClient() as client:
        await client.post(...)

# Background task (asyncio task)
asyncio.create_task(process_message(...))  # does not block the webhook
```

为什么用卡片?
卡片结构:
```python
{
    "config": {"wide_screen_mode": True},  # wide-screen layout
    "header": {
        "title": {"tag": "plain_text", "content": "AI 助手"},  # title
        "template": "blue",  # colour template
    },
    "elements": [
        {"tag": "div", "text": {"tag": "lark_md", "content": "回复内容"}},  # body (Markdown)
        {"tag": "hr"},  # divider
        {"tag": "action", "actions": [  # buttons
            {"tag": "button", "text": {"tag": "plain_text", "content": "清除历史"}, "value": {...}},
        ]},
    ],
}
```

交互流程:
用户点击按钮 → 飞书 POST /webhook/card → action_value {"action": "clear_history"} → 后端处理 → clear_session() → send_text("已清除")

当前工具:知识库(模拟)、时间、计算
可添加工具:
示例:
```python
import chromadb

# Open one persistent client. chromadb.Client() is in-memory and starts empty,
# so get_collection("documents") would fail on every call.
_chroma = chromadb.PersistentClient(path="./chroma_db")  # example path; adjust


@tool("search_documents")
def search_documents(query: str) -> str:
    """Search the real document store (Chroma) and return the 3 best-matching passages."""
    collection = _chroma.get_collection("documents")
    results = collection.query(query_texts=[query], n_results=3)
    docs = results["documents"][0] if results["documents"] else []
    return "\n".join(docs) if docs else "[知识库] 未找到相关文档"
```

当前:MemorySaver(内存)
问题:
升级方案:持久化 Checkpointer(下例使用 PostgreSQL;若使用 Redis,可选用 `langgraph-checkpoint-redis` 提供的 Checkpointer)
```python
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

DB_URI = "postgresql://user:password@localhost:5432/checkpoints"


async def main():
    # from_conn_string is an async context manager: the connection only stays
    # open inside "async with", so build and run the app there instead of
    # returning the saver from a helper.
    async with AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer:
        await checkpointer.setup()  # create the checkpoint tables (first run)
        compiled = graph.compile(checkpointer=checkpointer)
        # Use the async API with an async checkpointer: await compiled.ainvoke(...)
        ...
```

优点:
当前:飞书
可接入:
接入思路:
```python
# Platform-independent agent entry point
async def process_agent_message(session_id: str, text: str) -> str:
    # Agent logic goes here (same for every platform), e.g. compiled.ainvoke(...)
    raise NotImplementedError


# Feishu adapter
class FeishuAdapter:
    async def on_message(self, open_id: str, text: str):
        reply = await process_agent_message(open_id, text)
        await send_feishu_message(open_id, reply)


# WeCom adapter
class WecomAdapter:
    async def on_message(self, user_id: str, text: str):
        reply = await process_agent_message(user_id, text)
        await send_wecom_message(user_id, reply)
```

A:网络原因导致 Webhook 超时,飞书服务器会重试
解决:使用 is_duplicate(msg_id) 去重
A:使用 ngrok 暴露本地端口到公网
```bash
# Install ngrok
# Windows: download ngrok.exe
# Start the tunnel
ngrok http 8000
# Feishu event subscription URL: https://xxx.ngrok.io/webhook/event
# (use the public URL that ngrok actually prints)
```

查看日志:
```python
# Inside webhook_event the message text variable is user_text
logger.info("[WEBHOOK] 收到消息: open_id=%s text=%s", open_id, user_text)
```

A:可能原因
checkpointer 未绑定:
```python
# Wrong: no memory
compiled = graph.compile()
# Right: with memory
compiled = graph.compile(checkpointer=session.checkpointer)
```

thread_id 不一致:
```python
# Wrong: a different thread_id on every call
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
# Right: one fixed thread_id per user
config = {"configurable": {"thread_id": open_id}}
```

Session 被清除:
```python
# After the user clicks "clear history", the checkpointer data is deleted
clear_session(open_id)
```

A:优化策略
```python
from functools import lru_cache


@lru_cache(maxsize=100)  # the 100 most recent cities; no TTL, so entries can go stale
def _fetch_weather_cached(city: str) -> str:
    return fetch_weather(city)


@tool("get_weather")
def get_weather(city: str) -> str:
    """Get the current weather for a city."""
    # Cache the plain function, not the tool: @lru_cache stacked on @tool wraps
    # the tool object itself, so the agent would no longer get a LangChain tool.
    return _fetch_weather_cached(city)
```

A:推荐方案

关键点:
- `/health` 接口用于 K8s/Docker 健康探测

```bash
# Start Ollama
ollama serve
# Pull the model
ollama pull qwen3:4b
# Start the Feishu agent service
python 03_feishu_agent.py
```

```bash
# 1. Start ngrok
ngrok http 8000
# 2. Copy the ngrok URL (e.g. https://xxx.ngrok.io)
# 3. Configure the event subscription in the Feishu console
#    URL: https://xxx.ngrok.io/webhook/event
# 4. Check the service
curl http://localhost:8000/health
```

```python
# In demo mode the agent logic can be tested directly
import asyncio

# NOTE: a file named 03_feishu_agent.py cannot be imported by that name (it starts
# with a digit); point this import at wherever the module lives in your project.
from day12.feishu_agent import process_message

# Simulate a Feishu user message
asyncio.run(process_message("demo_user", "什么是 LangGraph?", "msg_001"))
```

执行结果

