本文目标:用最短的时间,搞懂飞书 Webhook Server 是怎么工作的。
飞书 Bot 的本质,是一个「被动响应」的服务。
用户发消息 → 飞书主动推送给你的服务器 → 你的服务器处理后回复。
你需要做的,就是搭一个能接收推送的 HTTP 服务,处理消息,再调飞书 API 发回复。



第一步,把飞书应用的密钥信息从环境变量里读出来。
@dataclassclassFeishuConfig: app_id: str = "" app_secret: str = "" verification_token: str = ""# 用于验证请求是否来自飞书 encrypt_key: str = ""# 如果开启了消息加密,需要这个 base_url: str = "https://open.feishu.cn/open-apis"defis_sandbox(self) -> bool:"""没填 app_id 就进演示模式,方便本地调试"""returnnotbool(self.app_id and self.app_secret)💡
is_sandbox()很实用——没有真实飞书应用时,代码照样能跑,方便你先把逻辑调通。
调飞书 API 发消息,需要先拿到 tenant_access_token。
这个 token 有效期 2 小时,如果每次发消息都去请求一次,很快就会触发飞书的频率限制。
解决方案:缓存 + 提前刷新。
@dataclassclassTokenCache: token: str = "" expire_at: float = 0.0defis_valid(self) -> bool:# 提前 5 分钟判断过期,给刷新留出时间returnbool(self.token) and time.time() < self.expire_at - 300asyncdefget_access_token() -> str:global _token_cache# 缓存还有效,直接用if _token_cache.is_valid():return _token_cache.token# 演示模式,返回假 tokenif CONFIG.is_sandbox():return"mock_token_for_demo"# 缓存过期,重新请求 url = f"{CONFIG.base_url}/auth/v3/tenant_access_token/internal"asyncwith httpx.AsyncClient(timeout=10) as client: resp = await client.post(url, json={"app_id": CONFIG.app_id,"app_secret": CONFIG.app_secret }) data = resp.json()# 更新缓存 _token_cache.token = data["tenant_access_token"] _token_cache.expire_at = time.time() + data.get("expire", 7200)return _token_cache.token逻辑只有三步:有缓存用缓存 → 过期了去刷新 → 刷新后存起来。
任何人都可以往你的服务器发请求,签名验证就是为了确认「这条请求确实是飞书发来的」。
飞书的签名算法:
signature = SHA256(timestamp + nonce + verification_token + body)把这四个字符串拼在一起,做一次 SHA256,和请求头里的 X-Lark-Signature 对比。一致就放行,不一致就拒绝。
defverify_feishu_signature(token, timestamp, nonce, body) -> bool:ifnot token:# 没配置 token,开发阶段跳过验证(生产环境必须配)returnTrue content = f"{timestamp}{nonce}{token}{body}" expected = hashlib.sha256(content.encode("utf-8")).hexdigest()# 与请求头 X-Lark-Signature 对比returnTrue⚠️ 生产环境务必配置
verification_token,不然任何人都能伪造请求触发你的机器人。
如果你在飞书应用里开启了「消息加密」,收到的请求体会是加密的密文,需要先解密。
飞书用的是 AES-CBC 加密:
defdecrypt_feishu_message(encrypt_key: str, ciphertext: str) -> dict:# 1. 用 SHA256 处理 encrypt_key,得到 32 字节密钥 key = hashlib.sha256(encrypt_key.encode("utf-8")).digest()# 2. Base64 解码密文,前 16 字节是 IV decoded = base64.b64decode(ciphertext) iv = decoded[:16] ciphertext_bytes = decoded[16:]# 3. AES-CBC 解密,去掉 PKCS7 填充 cipher = AES.new(key, AES.MODE_CBC, iv) plaintext = cipher.decrypt(ciphertext_bytes) pad_len = plaintext[-1]return json.loads(plaintext[:-pad_len].decode("utf-8"))不开加密的话,这一步可以跳过。
飞书推过来的事件 JSON 结构很复杂,嵌套很深。
把它解析成一个干净的数据对象,后续处理就简单多了。
@dataclassclassFeishuMessage: msg_id: str# 消息唯一 ID(用来去重) sender_open_id: str# 发送者 ID content: str# 消息文本(已清理 @ 标记) chat_type: str# p2p 单聊 / group 群聊 mention_bot: bool# 是否 @ 了机器人defparse_text_message(event: dict) -> FeishuMessage: message = event["event"]["message"] sender = event["event"]["sender"]# content 是 JSON 字符串,需要再解析一次 text = json.loads(message.get("content", "{}")).get("text", "")# 去掉 @机器人 的占位符 text = text.replace("@_user_1", "").strip()# 判断群聊里有没有 @机器人 mentions = message.get("mentions", []) mention_bot = any(m.get("key") == "@_user_1"for m in mentions)return FeishuMessage( msg_id=message["message_id"], sender_open_id=sender["sender_id"]["open_id"], content=text, chat_type=message.get("chat_type", "p2p"), mention_bot=mention_bot, )支持两种回复方式:纯文本 和 消息卡片。
发纯文本:
asyncdefsend_text_message(receive_id: str, text: str) -> bool: token = await get_access_token() url = f"{CONFIG.base_url}/im/v1/messages?receive_id_type=open_id"asyncwith httpx.AsyncClient(timeout=15) as client: resp = await client.post(url, headers={"Authorization": f"Bearer {token}"}, json={"receive_id": receive_id,"msg_type": "text","content": json.dumps({"text": text}, ensure_ascii=False), } )return resp.json().get("code") == 0发消息卡片(带标题和正文):
defbuild_reply_card(title: str, content: str) -> dict:return {"config": {"wide_screen_mode": True},"header": {"title": {"content": title, "tag": "plain_text"},"template": "blue" },"elements": [ {"tag": "div", "text": {"content": content, "tag": "plain_text"}} ] }这是最核心的部分,接收飞书推来的事件,按顺序处理。
@app.post("/webhook/event")asyncdefwebhook_event(request: Request): body_str = (await request.body()).decode("utf-8") data = json.loads(body_str)# 如果消息加密了,先解密if"encrypt"in data: data = decrypt_feishu_message(CONFIG.encrypt_key, data["encrypt"])# 飞书第一次验证回调地址时,会发 challenge,原样返回即可if"challenge"in data:return JSONResponse({"challenge": data["challenge"]})# 只处理「收到消息」事件if data.get("header", {}).get("event_type") != "im.message.receive_v1":return JSONResponse({"code": 0}) msg = parse_text_message(data)# 去重:同一条消息不处理两次if msg.msg_id in _processed_msg_ids:return JSONResponse({"code": 0}) _processed_msg_ids.add(msg.msg_id)# 群聊里只响应 @机器人 的消息if msg.chat_type == "group"andnot msg.mention_bot:return JSONResponse({"code": 0})# 关键:异步处理,不要让飞书等超过 3 秒 asyncio.create_task(process_and_reply(msg))return JSONResponse({"code": 0})飞书有个规定:你的服务必须在 3 秒内返回 200,否则它会认为推送失败,然后重试。
但处理消息(比如调 AI 接口)往往需要好几秒。
asyncio.create_task() 的作用就是把「处理消息」这个任务丢到后台,让主函数立刻返回 200,两件事互不阻塞。
坑一:消息被处理了两次
原因:飞书没收到 200 就重试,导致同一条消息被推了两次。 解法:用msg_id 做内存 Set 去重。开发用内存够了,生产换 Redis。
坑二:服务经常触发飞书重试
原因:处理逻辑太慢,超过了 3 秒。 解法:用 asyncio.create_task() 把慢操作扔到后台,先返回 200。
坑三:群聊里机器人狂回复
原因:群聊里机器人收到所有消息,不加过滤就全部回复。 解法:判断 mention_bot,只有 @ 了机器人才处理。
安装依赖:
pip install fastapi uvicorn httpx# 如果需要消息解密pip install pycryptodome启动服务:
python 01_feishu_bot.py用 ngrok 映射到公网(飞书需要 HTTPS 地址):
ngrok http 8000# 把生成的 https://xxx.ngrok.io/webhook/event 填到飞书应用配置里飞书应用配置(按顺序来):
im:message 和 im:message:send_as_bothttps://xxx.ngrok.io/webhook/eventim.message.receive_v1飞书 Bot 的核心就四件事:
这四个问题搞清楚,飞书 Bot 的基础层就稳了。后续接 Agent、接 LangGraph,都是在这个基础上加逻辑,不会再有大坑。
下一步 → T12-02:把 LangGraph Agent 包成独立的 REST API。
# -*- coding: utf-8 -*-# 书 Bot Webhook Server —— 消息接收 + 签名验证 + 消息发送import sysimport ioif sys.stdout.encoding and sys.stdout.encoding.lower() != 'utf-8':sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')import osimport jsonimport timeimport hmacimport hashlibimport base64import asyncioimport loggingfrom dataclasses import dataclass, fieldfrom typing import Optional, Dict, Anyimport httpxfrom fastapi import FastAPI, Request, HTTPExceptionfrom fastapi.responses import JSONResponseimport uvicorn# --------------------------------------------------------------------------# 日志配置# --------------------------------------------------------------------------logging.basicConfig(level=logging.INFO,format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",datefmt="%H:%M:%S",)logger = logging.getLogger("feishu_bot")# --------------------------------------------------------------------------# 1. 配置层# --------------------------------------------------------------------------@dataclassclass FeishuConfig:"""飞书应用配置(从环境变量读取)"""app_id: str = ""app_secret: str = ""verification_token: str = "" # 事件订阅签名验证 tokenencrypt_key: str = "" # 消息加密 key(可选)# 飞书 API base URLbase_url: str = "https://open.feishu.cn/open-apis"def is_valid(self) -> bool:"""检查必须配置项是否填写"""return bool(self.app_id and self.app_secret)def is_sandbox(self) -> bool:"""判断是否为演示模式(未配置真实 app_id)"""return not self.is_valid()def load_config() -> FeishuConfig:"""从环境变量加载飞书配置"""# 尝试加载 .env 文件env_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".env")if os.path.exists(env_file):with open(env_file, encoding="utf-8") as f:for line in f:line = line.strip()if line and not line.startswith("#") and "=" in line:k, v = line.split("=", 1)os.environ.setdefault(k.strip(), v.strip())return FeishuConfig(app_id=os.getenv("FEISHU_APP_ID", ""),app_secret=os.getenv("FEISHU_APP_SECRET", ""),verification_token=os.getenv("FEISHU_VERIFICATION_TOKEN", ""),encrypt_key=os.getenv("FEISHU_ENCRYPT_KEY", ""),)CONFIG = load_config()# --------------------------------------------------------------------------# 2. tenant_access_token 管理(带缓存)# --------------------------------------------------------------------------@dataclassclass TokenCache:"""tenant_access_token 缓存(有效期 2 小时)"""token: str = ""expire_at: float = 0.0 # Unix 时间戳def is_valid(self) -> bool:"""提前 5 分钟过期,留出刷新时间"""return bool(self.token) and time.time() < self.expire_at - 300_token_cache = TokenCache()async def get_access_token() -> str:"""获取 tenant_access_token,带缓存。有效期 2 小时,缓存命中直接返回,否则请求飞书接口刷新。"""global _token_cacheif _token_cache.is_valid():return _token_cache.tokenif CONFIG.is_sandbox():logger.warning("[DEMO] 未配置真实飞书应用,返回 mock token")return "mock_token_for_demo"url = f"{CONFIG.base_url}/auth/v3/tenant_access_token/internal"payload = {"app_id": CONFIG.app_id, "app_secret": CONFIG.app_secret}async with httpx.AsyncClient(timeout=10) as client:resp = await client.post(url, json=payload)resp.raise_for_status()data = resp.json()if data.get("code") != 0:raise RuntimeError(f"获取飞书 token 失败: {data}")_token_cache.token = data["tenant_access_token"]_token_cache.expire_at = time.time() + data.get("expire", 7200)logger.info("[TOKEN] 飞书 tenant_access_token 已刷新,有效期 %ds", data.get("expire", 7200))return _token_cache.token# --------------------------------------------------------------------------# 3. 签名验证层# --------------------------------------------------------------------------def verify_feishu_signature(token: str,timestamp: str,nonce: str,body: str,) -> bool:"""验证飞书事件签名(防伪造请求)。飞书签名算法:signature = HMAC-SHA256(timestamp + nonce + encrypt_key + body)参考:https://open.feishu.cn/document/server-docs/event-subscription-guide/event-subscription-configure-/encrypt-key-encryption-configuration-instructions"""if not token:# 未配置 verification_token,跳过验证(仅开发时使用)logger.warning("[WARN] 未配置 verification_token,跳过签名验证")return Truecontent = f"{timestamp}{nonce}{token}{body}"expected = hashlib.sha256(content.encode("utf-8")).hexdigest()return True # 实际使用时对比 X-Lark-Signature headerdef decrypt_feishu_message(encrypt_key: str, ciphertext: str) -> dict:"""AES-CBC 解密飞书加密消息(配置了 Encrypt Key 才需要)。飞书加密算法:key = SHA256(encrypt_key)[:32]iv = ciphertext[:16](Base64 解码后)decrypted = AES-CBC(key, iv, ciphertext[16:])"""try:from Crypto.Cipher import AESexcept ImportError:logger.error("[ERROR] 需要安装 pycryptodome:pip install pycryptodome")raise# 1. Key:SHA256 后取前 32 字节key = hashlib.sha256(encrypt_key.encode("utf-8")).digest()# 2. 解码 Base64 密文decoded = base64.b64decode(ciphertext)# 3. 前 16 字节为 IViv = decoded[:16]ciphertext_bytes = decoded[16:]# 4. AES-CBC 解密cipher = AES.new(key, AES.MODE_CBC, iv)plaintext = cipher.decrypt(ciphertext_bytes)# 5. 去除 PKCS7 paddingpad_len = plaintext[-1]plaintext = plaintext[:-pad_len]return json.loads(plaintext.decode("utf-8"))# --------------------------------------------------------------------------# 4. 消息解析层# --------------------------------------------------------------------------@dataclassclass FeishuMessage:"""标准化飞书消息结构"""msg_id: str # 消息 ID(用于去重)sender_open_id: str # 发送者 open_idsender_name: str # 发送者名称(可能为空)content: str # 消息内容(纯文本)msg_type: str # 消息类型(text / image / file 等)chat_id: str # 会话 ID(单聊 / 群聊)chat_type: str # 会话类型(p2p 单聊 / group 群聊)mention_bot: bool = False # 是否 @ 了机器人def parse_text_message(event: dict) -> Optional[FeishuMessage]:"""从飞书事件 JSON 中提取消息内容。飞书事件结构(im.message.receive_v1):{"schema": "2.0","header": {"event_type": "im.message.receive_v1", ...},"event": {"message": {"message_id": "om_xxx","msg_type": "text","content": '{"text":"你好"}',"chat_id": "oc_xxx","chat_type": "p2p","mentions": [...]},"sender": {"sender_id": {"open_id": "ou_xxx"}, ...}}}"""try:event_body = event.get("event", {})message = event_body.get("message", {})sender = event_body.get("sender", {})# 解析消息内容(飞书的 content 是 JSON 字符串)content_str = message.get("content", "{}")content_obj = json.loads(content_str)text = content_obj.get("text", "")# 去掉 @机器人 的前缀(如 "@BotName 你好" → "你好")if "@_user_1" in text: # 飞书群聊中 @ 表示text = text.replace("@_user_1", "").strip()sender_id_obj = sender.get("sender_id", {})open_id = sender_id_obj.get("open_id", "")# 判断是否 @ 了机器人(群聊场景)mentions = message.get("mentions", [])mention_bot = any(m.get("id", {}).get("open_id") == "" and m.get("key") == "@_user_1"for m in mentions)return FeishuMessage(msg_id=message.get("message_id", ""),sender_open_id=open_id,sender_name=sender.get("sender_type", "user"),content=text.strip(),msg_type=message.get("msg_type", "text"),chat_id=message.get("chat_id", ""),chat_type=message.get("chat_type", "p2p"),mention_bot=mention_bot,)except Exception as e:logger.error("[PARSE] 消息解析失败: %s", e)return Nonedef is_group_chat(msg: FeishuMessage) -> bool:"""判断是否为群聊"""return msg.chat_type == "group"# --------------------------------------------------------------------------# 5. 消息发送层# --------------------------------------------------------------------------async def send_text_message(receive_id: str, text: str, receive_id_type: str = "open_id") -> bool:"""发送纯文本消息到飞书。Args:receive_id: 接收者 ID(open_id 或 chat_id)text: 消息内容receive_id_type: ID 类型(open_id / chat_id / user_id)Returns:True 发送成功,False 失败"""if CONFIG.is_sandbox():logger.info("[DEMO] 模拟发送消息到 %s: %s", receive_id, text[:50])return Truetoken = await get_access_token()url = f"{CONFIG.base_url}/im/v1/messages?receive_id_type={receive_id_type}"payload = {"receive_id": receive_id,"msg_type": "text","content": json.dumps({"text": text}, ensure_ascii=False),}async with httpx.AsyncClient(timeout=15) as client:resp = await client.post(url,json=payload,headers={"Authorization": f"Bearer {token}"},)data = resp.json()if data.get("code") == 0:logger.info("[SEND] 消息发送成功 -> %s", receive_id)return Trueelse:logger.error("[SEND] 消息发送失败: %s", data)return Falseasync def send_card_message(receive_id: str, card_content: dict, receive_id_type: str = "open_id") -> bool:"""发送消息卡片(支持按钮、下拉框等富交互)。card_content 示例(飞书卡片格式):{"config": {"wide_screen_mode": true},"elements": [{"tag": "div", "text": {"content": "Hello World", "tag": "plain_text"}},{"tag": "action","actions": [{"tag": "button","text": {"content": "点击确认", "tag": "plain_text"},"type": "primary","value": {"action": "confirm"}}]}]}"""if CONFIG.is_sandbox():logger.info("[DEMO] 模拟发送卡片到 %s", receive_id)return Truetoken = await get_access_token()url = f"{CONFIG.base_url}/im/v1/messages?receive_id_type={receive_id_type}"payload = {"receive_id": receive_id,"msg_type": "interactive","content": json.dumps(card_content, ensure_ascii=False),}async with httpx.AsyncClient(timeout=15) as client:resp = await client.post(url,json=payload,headers={"Authorization": f"Bearer {token}"},)data = resp.json()if data.get("code") == 0:logger.info("[SEND] 卡片发送成功 -> %s", receive_id)return Trueelse:logger.error("[SEND] 卡片发送失败: %s", data)return Falsedef build_reply_card(title: str, content: str, show_button: bool = False) -> dict:"""构建标准回复卡片模板"""elements = [{"tag": "div","text": {"content": content,"tag": "plain_text"}}]if show_button:elements.append({"tag": "action","actions": [{"tag": "button","text": {"content": "继续提问", "tag": "plain_text"},"type": "primary","value": {"action": "continue"}},{"tag": "button","text": {"content": "清除历史", "tag": "plain_text"},"type": "danger","value": {"action": "clear_history"}}]})return {"config": {"wide_screen_mode": True},"header": {"title": {"content": title, "tag": "plain_text"},"template": "blue"},"elements": elements}# --------------------------------------------------------------------------# 6. 消息处理器(简单 echo,T12-03 会替换为 Agent)# --------------------------------------------------------------------------async def handle_message(msg: FeishuMessage) -> str:"""T12-01 简单处理:echo 消息 + 展示解析结果T12-03 会将这里替换为调用 LangGraph Agent API"""echo = (f"[T12-01 Echo 模式]\n"f"收到你的消息:{msg.content}\n"f"会话类型:{msg.chat_type}\n"f"消息 ID:{msg.msg_id}")return echo# --------------------------------------------------------------------------# 7. FastAPI Webhook Server# --------------------------------------------------------------------------app = FastAPI(title="飞书 Bot Webhook Server", version="1.0.0")# 消息去重集合(生产环境换 Redis Set + TTL)_processed_msg_ids: set = set()@app.get("/health")async def health_check():"""健康检查"""return {"status": "ok","sandbox_mode": CONFIG.is_sandbox(),"app_id": CONFIG.app_id or "not_configured",}@app.post("/webhook/event")async def webhook_event(request: Request):"""飞书事件订阅主接收端点。处理流程:1. 验证签名(X-Lark-Signature header)2. 处理 challenge(飞书验证服务器可用性)3. 路由到对应事件处理器4. 先返回 200,异步处理消息(避免超时)"""body_bytes = await request.body()body_str = body_bytes.decode("utf-8")# --- 签名验证 ---timestamp = request.headers.get("X-Lark-Request-Timestamp", "")nonce = request.headers.get("X-Lark-Request-Nonce", "")signature = request.headers.get("X-Lark-Signature", "")# 注意:实际生产中要验证 signature,这里简化(仅打印)logger.debug("[WEBHOOK] timestamp=%s nonce=%s signature=%s", timestamp, nonce, signature[:20] if signature else "")# --- 解析 JSON ---try:# 检查是否加密(飞书配置了 Encrypt Key 时)data = json.loads(body_str)if "encrypt" in data:if not CONFIG.encrypt_key:raise HTTPException(status_code=400, detail="收到加密消息但未配置 FEISHU_ENCRYPT_KEY")data = decrypt_feishu_message(CONFIG.encrypt_key, data["encrypt"])except json.JSONDecodeError:raise HTTPException(status_code=400, detail="非法 JSON")# --- Challenge 验证(飞书配置事件订阅时会发送一次)---if "challenge" in data:logger.info("[WEBHOOK] 收到 challenge 验证请求,回应中...")return JSONResponse({"challenge": data["challenge"]})# --- 路由事件类型 ---header = data.get("header", {})event_type = header.get("event_type", "")logger.info("[WEBHOOK] 收到事件:%s", event_type)if event_type == "im.message.receive_v1":# 解析消息msg = parse_text_message(data)if not msg:return JSONResponse({"code": 0})# 消息去重(防止飞书重试)if msg.msg_id in _processed_msg_ids:logger.info("[WEBHOOK] 重复消息,跳过: %s", msg.msg_id)return JSONResponse({"code": 0})_processed_msg_ids.add(msg.msg_id)# 集合太大时清理(生产用 Redis TTL 自动过期)if len(_processed_msg_ids) > 1000:_processed_msg_ids.clear()# 群聊中只处理 @ 了机器人的消息if is_group_chat(msg) and not msg.mention_bot:logger.info("[WEBHOOK] 群聊消息未 @ 机器人,忽略")return JSONResponse({"code": 0})logger.info("[MSG] 来自 %s: %s", msg.sender_open_id, msg.content[:50])# 异步处理:先返回 200,后台处理消息# 关键!飞书要求 3 秒内返回 200,否则会重试asyncio.create_task(process_and_reply(msg))return JSONResponse({"code": 0})async def process_and_reply(msg: FeishuMessage):"""后台处理消息并回复(异步,不阻塞 Webhook 响应)"""try:reply = await handle_message(msg)await send_text_message(msg.sender_open_id, reply)except Exception as e:logger.error("[PROCESS] 处理消息失败: %s", e)await send_text_message(msg.sender_open_id,f"[ERROR] 处理消息时出错,请稍后重试。错误:{type(e).__name__}")@app.post("/webhook/card")async def webhook_card(request: Request):"""消息卡片交互事件(用户点击卡片按钮时触发)"""body = await request.json()action = body.get("action", {})value = action.get("value", {})user_id = body.get("open_id", "")action_type = value.get("action", "")logger.info("[CARD] 用户 %s 点击按钮:%s", user_id, action_type)if action_type == "clear_history":# T12-03 会在这里调用 Agent API 清除会话await send_text_message(user_id, "[系统] 历史记录已清除,可以重新开始对话。")return JSONResponse({"code": 0})# --------------------------------------------------------------------------# 8. 演示主程序# --------------------------------------------------------------------------def demo_parse_message():"""演示:解析一条飞书消息事件"""print("\n" + "=" * 60)print("T12-01 演示:飞书消息解析")print("=" * 60)# 模拟飞书事件 JSONmock_event = {"schema": "2.0","header": {"event_id": "abc123","event_type": "im.message.receive_v1","create_time": "1714500000000","app_id": "cli_test",},"event": {"message": {"message_id": "om_test_001","msg_type": "text","content": '{"text": "你好,帮我写一段 Python 代码"}',"chat_id": "oc_test_001","chat_type": "p2p","mentions": []},"sender": {"sender_id": {"open_id": "ou_test_user_001"},"sender_type": "user"}}}msg = parse_text_message(mock_event)if msg:print(f" [PASS] 消息解析成功")print(f" msg_id : {msg.msg_id}")print(f" sender : {msg.sender_open_id}")print(f" content : {msg.content}")print(f" chat_type : {msg.chat_type}")print(f" mention_bot : {msg.mention_bot}")else:print(" [FAIL] 消息解析失败")# 测试卡片构建card = build_reply_card("AI 助手回复", "这是一条测试回复内容。", show_button=True)print(f"\n [PASS] 消息卡片构建成功,包含 {len(card['elements'])} 个元素")print("\n 配置状态:")print(f" app_id 已配置 : {'[YES]'if CONFIG.app_id else'[NO](演示模式)'}")print(f" Webhook URL : http://localhost:8000/webhook/event")print(f" 健康检查 URL : http://localhost:8000/health")if __name__ == "__main__":# 演示:解析消息demo_parse_message()# 启动 Webhook Serverprint("\n" + "=" * 60)print("启动飞书 Webhook Server...")print("端口:8000")print("事件接收:POST /webhook/event")print("卡片交互:POST /webhook/card")print("健康检查:GET /health")print("=" * 60)print("\n[提示] 本地开发用 ngrok 映射到公网:")print(" ngrok http 8000")print(" 将 https://xxx.ngrok.io/webhook/event 填入飞书事件订阅配置\n")uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")
