天上星河转,人间帘幕垂
在 AI Agent 框架百花齐放的今天,如何用最精简的代码实现一个功能完备的个人 AI 助手?本文将带你深入剖析 nanobot 的后端架构设计,看它如何用仅约 4000 行 Python 代码实现了完整的 Agent 能力。
一、项目概览
nanobot 是一个超轻量级的个人 AI 助手框架,灵感来源于 Clawdbot,但代码量仅为后者的 1%。尽管如此精简,nanobot 依然具备以下核心能力:
多通道支持:Telegram、WhatsApp 等即时通讯平台
工具调用:文件操作、Shell 命令、Web 搜索、消息发送等
让我们从架构图开始,逐层深入分析。
二、整体架构设计
┌─────────────────────────────────────────────────────────────────────────┐│ nanobot 架构 │├─────────────────────────────────────────────────────────────────────────┤│ ││ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ││ │ Telegram │ │ WhatsApp │ │ CLI │ ││ │ Channel │ │ Channel │ │ │ ││ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ ││ │ │ │ ││ └──────────────────┼──────────────────┘ ││ ▼ ││ ┌─────────────────────────────────────────────────────────────────┐ ││ │ Message Bus │ ││ │ (InboundQueue / OutboundQueue) │ ││ └─────────────────────────────────────────────────────────────────┘ ││ │ ││ ▼ ││ ┌─────────────────────────────────────────────────────────────────┐ ││ │ Agent Loop │ ││ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ ││ │ │ Context │ │ Tool │ │ Session │ │ ││ │ │ Builder │ │ Registry │ │ Manager │ │ ││ │ └──────────────┘ └──────────────┘ └──────────────┘ │ ││ └─────────────────────────────────────────────────────────────────┘ ││ │ │ │ ││ ▼ ▼ ▼ ││ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ││ │ Memory │ │ Skills │ │ LLM │ ││ │ Store │ │ Loader │ │ Provider │ ││ └──────────────┘ └──────────────┘ └──────────────┘ ││ ││ ┌──────────────────────────────────────────────────────────────────┐ ││ │ Background Services │ ││ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ ││ │ │ Cron │ │ Heartbeat │ │ Subagent │ │ ││ │ │ Service │ │ Service │ │ Manager │ │ ││ │ └──────────────┘ └──────────────┘ └──────────────┘ │ ││ └──────────────────────────────────────────────────────────────────┘ ││ │└─────────────────────────────────────────────────────────────────────────┘
nanobot 采用了经典的事件驱动架构,核心组件包括:
Channels(通道层):负责与外部通讯平台的对接
Message Bus(消息总线):解耦通道与核心处理逻辑
Agent Loop(代理循环):核心处理引擎,实现 LLM ↔ 工具的交互循环
Background Services(后台服务):定时任务、心跳检测、子代理管理
三、核心模块深度解析
3.1 消息总线(Message Bus)
消息总线是 nanobot 架构的"中枢神经",它实现了通道与 Agent 核心的完全解耦。
classMessageBus:"""异步消息总线,解耦聊天通道与 Agent 核心"""def__init__(self): self.inbound: asyncio.Queue[InboundMessage] = asyncio.Queue() self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue() self._outbound_subscribers: dict[str, list[Callable]] = {}
设计亮点:
双队列设计:inbound 队列接收来自通道的消息,outbound 队列发送响应到通道
完全异步:基于 asyncio.Queue 实现非阻塞操作
消息数据结构的定义同样简洁:
@dataclassclassInboundMessage: channel: str # telegram, whatsapp, cli sender_id: str # 用户标识 chat_id: str # 聊天/频道标识 content: str # 消息内容 timestamp: datetime media: list[str] # 媒体文件路径 metadata: dict # 通道特定数据
3.2 代理循环(Agent Loop)
Agent Loop 是整个系统的核心引擎,实现了标准的 ReAct(Reasoning + Acting)模式:
classAgentLoop:""" 代理循环是核心处理引擎。 工作流程: 1. 从消息总线接收消息 2. 构建上下文(历史、记忆、技能) 3. 调用 LLM 4. 执行工具调用 5. 发送响应 """
核心处理流程:
asyncdef_process_message(self, msg: InboundMessage):# 1. 获取或创建会话 session = self.sessions.get_or_create(msg.session_key)# 2. 构建消息上下文 messages = self.context.build_messages( history=session.get_history(), current_message=msg.content, media=msg.media )# 3. Agent 循环(最多 20 次迭代) iteration = 0while iteration < self.max_iterations: iteration += 1# 调用 LLM response = await self.provider.chat( messages=messages, tools=self.tools.get_definitions(), model=self.model )# 处理工具调用if response.has_tool_calls:# 执行工具并添加结果到上下文for tool_call in response.tool_calls: result = await self.tools.execute( tool_call.name, tool_call.arguments ) messages = self.context.add_tool_result( messages, tool_call.id, tool_call.name, result )else:# 无工具调用,返回最终响应break
设计亮点:
迭代次数限制:防止无限循环,默认最多 20 次迭代
3.3 工具系统(Tool System)
nanobot 采用了高度可扩展的工具系统设计,所有工具都继承自统一的基类:
classTool(ABC):"""Agent 工具的抽象基类""" @property @abstractmethoddefname(self) -> str:"""工具名称"""pass @property @abstractmethoddefdescription(self) -> str:"""工具描述"""pass @property @abstractmethoddefparameters(self) -> dict[str, Any]:"""JSON Schema 格式的参数定义"""pass @abstractmethodasyncdefexecute(self, **kwargs) -> str:"""执行工具"""pass
内置工具一览:
| | |
|---|
read_file | | |
write_file | | |
edit_file | | |
list_dir | | |
exec | | |
web_search | | |
web_fetch | | |
message | | |
spawn | | |
工具注册机制:
classToolRegistry:"""工具注册表,支持动态注册和执行"""defregister(self, tool: Tool) -> None: self._tools[tool.name] = tooldefget_definitions(self) -> list[dict]:"""获取所有工具定义(OpenAI 函数调用格式)"""return [tool.to_schema() for tool in self._tools.values()]asyncdefexecute(self, name: str, params: dict) -> str: tool = self._tools.get(name)ifnot tool:returnf"Error: Tool '{name}' not found"# 参数验证 errors = tool.validate_params(params)if errors:returnf"Error: Invalid parameters: {errors}"returnawait tool.execute(**params)
安全机制(以 ExecTool 为例):
classExecTool(Tool):def__init__(self, ...):# 危险命令模式黑名单 self.deny_patterns = [r"\brm\s+-[rf]{1,2}\b", # rm -rfr"\b(format|mkfs|diskpart)\b", # 磁盘操作r"\b(shutdown|reboot|poweroff)\b", # 系统电源r":\(\)\s*\{.*\};\s*:", # Fork 炸弹 ]def_guard_command(self, command: str, cwd: str) -> str | None:"""命令安全检查"""for pattern in self.deny_patterns:if re.search(pattern, command.lower()):return"Error: Command blocked by safety guard"returnNone
3.4 上下文构建器(Context Builder)
Context Builder 负责组装发送给 LLM 的完整提示词,包括系统提示、记忆、技能和对话历史:
classContextBuilder: BOOTSTRAP_FILES = ["AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md", "IDENTITY.md"]defbuild_system_prompt(self, skill_names: list[str] | None = None) -> str: parts = [ ]# 1. 核心身份 parts.append(self._get_identity())# 2. Bootstrap 文件(用户自定义指令) parts.append(self._load_bootstrap_files())# 3. 记忆上下文 memory = self.memory.get_memory_context()if memory: parts.append(f"# Memory\n\n{memory}")# 4. 技能(渐进式加载)# - 始终加载的技能:完整内容# - 可用技能:仅摘要(Agent 按需读取)return"\n\n---\n\n".join(parts)
渐进式技能加载(Progressive Skill Loading):
这是一个非常聪明的设计——并非一次性加载所有技能内容,而是:
始终加载(always=true):直接包含在系统提示中
按需加载:仅提供摘要,Agent 使用 read_file 工具在需要时读取
defbuild_skills_summary(self) -> str:"""构建技能摘要(名称、描述、路径、可用性)""" lines = ["<skills>"]for skill in all_skills: lines.append(f" <skill available=\"{available}\">") lines.append(f" <name>{skill['name']}</name>") lines.append(f" <description>{description}</description>") lines.append(f" <location>{skill['path']}</location>") lines.append(f" </skill>") lines.append("</skills>")return"\n".join(lines)
3.5 会话管理(Session Manager)
会话管理采用 JSONL 格式持久化对话历史:
classSessionManager:"""管理对话会话,存储为 JSONL 文件"""def__init__(self, workspace: Path): self.sessions_dir = Path.home() / ".nanobot" / "sessions" self._cache: dict[str, Session] = {} # 内存缓存defget_or_create(self, key: str) -> Session:# 先查缓存if key in self._cache:return self._cache[key]# 尝试从磁盘加载 session = self._load(key)if session isNone: session = Session(key=key) self._cache[key] = sessionreturn session
会话文件格式(JSONL):
{"_type": "metadata", "created_at": "2024-01-01T00:00:00", ...}{"role": "user", "content": "Hello!", "timestamp": "..."}{"role": "assistant", "content": "Hi there!", "timestamp": "..."}
3.6 记忆系统(Memory Store)
nanobot 实现了简单但有效的双层记忆系统:
classMemoryStore:""" Agent 记忆系统 - 每日笔记:memory/YYYY-MM-DD.md - 长期记忆:memory/MEMORY.md """defget_memory_context(self) -> str: parts = [ ]# 长期记忆 long_term = self.read_long_term()if long_term: parts.append("## Long-term Memory\n" + long_term)# 今日笔记 today = self.read_today()if today: parts.append("## Today's Notes\n" + today)return"\n\n".join(parts)defget_recent_memories(self, days: int = 7) -> str:"""获取最近 N 天的记忆"""# 遍历日期文件,组合内容
3.7 LLM Provider 抽象
nanobot 使用 LiteLLM 作为统一的 LLM 接入层,支持多种模型提供商:
classLiteLLMProvider(LLMProvider):""" 基于 LiteLLM 的多提供商支持 支持:OpenRouter, Anthropic, OpenAI, Gemini, Groq, vLLM 等 """def__init__(self, api_key, api_base, default_model):# 智能检测提供商类型 self.is_openrouter = api_key.startswith("sk-or-") self.is_vllm = bool(api_base) andnot self.is_openrouter# 根据提供商设置环境变量if self.is_openrouter: os.environ["OPENROUTER_API_KEY"] = api_keyelif"anthropic"in default_model: os.environ["ANTHROPIC_API_KEY"] = api_key# ...asyncdefchat(self, messages, tools, model, ...):# 模型名称前缀处理if self.is_openrouter: model = f"openrouter/{model}"elif self.is_vllm: model = f"hosted_vllm/{model}" response = await acompletion( model=model, messages=messages, tools=tools, tool_choice="auto"if tools elseNone, )return self._parse_response(response)
响应解析:
def_parse_response(self, response) -> LLMResponse: choice = response.choices[0] message = choice.message tool_calls = [ ]if message.tool_calls:for tc in message.tool_calls: tool_calls.append(ToolCallRequest( id=tc.id, name=tc.function.name, arguments=json.loads(tc.function.arguments), ))return LLMResponse( content=message.content, tool_calls=tool_calls, finish_reason=choice.finish_reason, usage={...} )
四、通道层实现
4.1 通道基类
所有通道都继承自统一的基类,确保接口一致:
classBaseChannel(ABC):"""聊天通道抽象基类""" name: str = "base" @abstractmethodasyncdefstart(self) -> None:"""启动通道,开始监听消息"""pass @abstractmethodasyncdefstop(self) -> None:"""停止通道"""pass @abstractmethodasyncdefsend(self, msg: OutboundMessage) -> None:"""发送消息"""passdefis_allowed(self, sender_id: str) -> bool:"""白名单检查""" allow_list = getattr(self.config, "allow_from", [ ])ifnot allow_list:returnTrue# 未配置则允许所有人return sender_id in allow_list
4.2 Telegram 通道
Telegram 通道使用 python-telegram-bot 库,采用长轮询模式:
classTelegramChannel(BaseChannel): name = "telegram"asyncdefstart(self) -> None:# 构建应用 self._app = Application.builder().token(self.config.token).build()# 添加消息处理器 self._app.add_handler(MessageHandler( filters.TEXT | filters.PHOTO | filters.VOICE, self._on_message ))# 启动轮询await self._app.initialize()await self._app.start()await self._app.updater.start_polling( drop_pending_updates=True# 启动时忽略旧消息 )asyncdefsend(self, msg: OutboundMessage) -> None:# Markdown 转 Telegram HTML html_content = _markdown_to_telegram_html(msg.content)await self._app.bot.send_message( chat_id=int(msg.chat_id), text=html_content, parse_mode="HTML" )
语音转写支持:
asyncdef_on_message(self, update, context):if message.voice:# 下载语音文件 file = await self._app.bot.get_file(message.voice.file_id)await file.download_to_drive(file_path)# 使用 Groq Whisper 转写 transcriber = GroqTranscriptionProvider(api_key=self.groq_api_key) transcription = await transcriber.transcribe(file_path) content_parts.append(f"[transcription: {transcription}]")
4.3 WhatsApp 通道
WhatsApp 通道采用 Node.js Bridge 架构,Python 与 Node.js 之间通过 WebSocket 通信:
classWhatsAppChannel(BaseChannel): name = "whatsapp"asyncdefstart(self) -> None:import websocketswhile self._running:try:asyncwith websockets.connect(self.config.bridge_url) as ws: self._ws = ws self._connected = Trueasyncfor message in ws:await self._handle_bridge_message(message)except Exception as e:# 自动重连await asyncio.sleep(5)asyncdef_handle_bridge_message(self, raw: str): data = json.loads(raw)if data.get("type") == "message":await self._handle_message( sender_id=data.get("sender"), chat_id=data.get("sender"), # 用于回复 content=data.get("content"), )
Bridge 架构优势:
WhatsApp Web 协议需要 Node.js 生态(@whiskeysockets/baileys)
Python 专注业务逻辑,Node.js 处理协议层
五、后台服务
5.1 子代理系统(Subagent Manager)
子代理允许主代理将复杂任务委托给后台执行:
classSubagentManager:"""管理后台子代理执行"""asyncdefspawn(self, task: str, label: str, origin_channel: str, origin_chat_id: str): task_id = str(uuid.uuid4())[:8]# 创建后台任务 bg_task = asyncio.create_task( self._run_subagent(task_id, task, label, origin) ) self._running_tasks[task_id] = bg_taskreturnf"Subagent [{label}] started (id: {task_id})"asyncdef_run_subagent(self, task_id, task, label, origin):# 子代理拥有独立的工具集(无 message、无 spawn) tools = ToolRegistry() tools.register(ReadFileTool()) tools.register(WriteFileTool()) tools.register(ExecTool(...)) tools.register(WebSearchTool(...))# 专用系统提示 system_prompt = self._build_subagent_prompt(task)# 执行 Agent 循环# ...# 完成后通过消息总线通知主代理await self._announce_result(task_id, label, task, result, origin, "ok")
子代理特点:
受限工具集:无法发送消息或创建新子代理(防止递归)
5.2 定时任务服务(Cron Service)
Cron 服务支持多种调度方式:
classCronService:"""计划任务服务"""defadd_job(self, name, schedule, message, deliver=False, to=None, channel=None): job = CronJob( id=str(uuid.uuid4())[:8], name=name, schedule=schedule, # CronSchedule 对象 payload=CronPayload(message=message, deliver=deliver, ...), state=CronJobState(next_run_at_ms=self._compute_next_run(schedule)), ) self._store.jobs.append(job) self._arm_timer() # 设置下一次唤醒return job
支持的调度类型:
@dataclassclassCronSchedule: kind: str # "at", "every", "cron" at_ms: int | None# 一次性执行时间点 every_ms: int | None# 间隔执行(毫秒) expr: str | None# Cron 表达式(如 "0 9 * * *")
5.3 心跳服务(Heartbeat Service)
心跳服务定期唤醒 Agent 检查待办事项:
classHeartbeatService:"""定期心跳服务 - 唤醒 Agent 检查任务""" HEARTBEAT_PROMPT = """Read HEARTBEAT.md in your workspace (if it exists).Follow any instructions or tasks listed there.If nothing needs attention, reply with just: HEARTBEAT_OK"""asyncdef_tick(self) -> None: content = self._read_heartbeat_file()# 如果 HEARTBEAT.md 为空则跳过if _is_heartbeat_empty(content):returnif self.on_heartbeat: response = await self.on_heartbeat(HEARTBEAT_PROMPT)if"HEARTBEAT_OK"in response: logger.info("Heartbeat: OK (no action needed)")else: logger.info("Heartbeat: completed task")
HEARTBEAT.md 格式示例:
- [ ] Check calendar and remind of upcoming events- [ ] Scan inbox for urgent emails- [ ] Check weather forecast for today
六、配置系统
nanobot 使用 Pydantic 实现类型安全的配置管理:
classConfig(BaseSettings):"""nanobot 根配置""" agents: AgentsConfig channels: ChannelsConfig providers: ProvidersConfig gateway: GatewayConfig tools: ToolsConfigdefget_api_key(self) -> str | None:"""按优先级获取 API Key"""return ( self.providers.openrouter.api_key or self.providers.anthropic.api_key or self.providers.openai.api_key or self.providers.gemini.api_key orNone )classConfig: env_prefix = "NANOBOT_"# 支持环境变量覆盖
配置文件格式 (~/.nanobot/config.json):
{"providers": {"openrouter": {"apiKey": "sk-or-v1-xxx" } },"agents": {"defaults": {"model": "anthropic/claude-opus-4-5" } },"channels": {"telegram": {"enabled": true,"token": "YOUR_BOT_TOKEN","allowFrom": ["123456789"] } }}
七、CLI 设计
nanobot 使用 Typer 构建命令行界面:
app = typer.Typer(name="nanobot", help="🐈 nanobot - Personal AI Assistant")@app.command()defgateway(port: int = 18790, verbose: bool = False):"""启动 nanobot 网关""" config = load_config()# 创建组件 bus = MessageBus() provider = LiteLLMProvider(...) agent = AgentLoop(bus=bus, provider=provider, ...) cron = CronService(...) heartbeat = HeartbeatService(...) channels = ChannelManager(config, bus)asyncdefrun():await asyncio.gather( cron.start(), heartbeat.start(), agent.run(), channels.start_all(), ) asyncio.run(run())@app.command()defagent(message: str = None, session_id: str = "cli:default"):"""与 Agent 交互"""if message:# 单消息模式 response = await agent_loop.process_direct(message, session_id) console.print(response)else:# 交互模式whileTrue: user_input = console.input("[bold blue]You:[/bold blue] ") response = await agent_loop.process_direct(user_input, session_id) console.print(response)
命令概览:
| |
|---|
nanobot onboard | |
nanobot agent -m "..." | |
nanobot agent | |
nanobot gateway | |
nanobot status | |
nanobot cron add/list/remove | |
nanobot channels login | |
八、技术亮点总结
8.1 架构优势
极简代码量:~4000 行代码实现完整 Agent 功能
8.2 关键设计模式
8.3 可扩展性设计
新增通道:继承 BaseChannel,实现 start/stop/send
新增工具:继承 Tool,实现 name/description/parameters/execute
新增技能:在 skills/ 目录创建 SKILL.md
新增 LLM:LiteLLM 已支持 100+ 模型,或自定义 Provider
九、本地部署实践
# 1. 安装pip install nanobot-ai# 2. 初始化nanobot onboard# 3. 配置 API Keyvim ~/.nanobot/config.json# 4. 测试nanobot agent -m "What is 2+2?"# 5. 启动完整服务nanobot gateway
十、总结
nanobot 用极简的代码展示了如何构建一个功能完备的 AI Agent 框架。其核心设计理念:
对于想要学习 AI Agent 架构的开发者,nanobot 是一个绝佳的参考项目。代码清晰易懂,架构设计优雅,非常适合作为学习材料或二次开发的基础。
项目地址:https://github.com/HKUDS/nanobot
技术栈:Python 3.11+ / asyncio / LiteLLM / Pydantic / Typer
如果你觉得这篇文章对你有帮助,欢迎点赞、在看、转发三连!