大家好,我是小皮。
Python 企业级智能体框架,提供完整的技术架构方案,支持 AOP、IOC、沙箱安全、弹性模式、可观测性等企业级特性。
项目地址:https://github.com/LuckyStar2456/W-Agent-FrameWork
📚 目录
✨ 功能特性
| |
|---|
| AOP 面向切面编程 | 支持 AspectJ 切点表达式(execution、within、@annotation、bean、args),提供重试、断路器等切面实现 |
| IOC 依赖注入 | 三级缓存、构造器/字段/setter 注入、@Autowired/@Qualifier 注解,支持组件扫描和自动装配 |
| 沙箱安全 | Wasm/nsjail 沙箱、seccomp 过滤、资源限制,确保技能执行安全 |
| 弹性模式 | 重试(带退避)、断路器(CLOSED/OPEN/HALF_OPEN 状态)、超时控制,提高系统稳定性 |
| 可观测性 | OpenTelemetry 集成、链路追踪、指标监控、健康检查,提供完整的可观测性解决方案 |
| RAG 集成 | 向量检索(Redis/内存)、相似度搜索,支持文档存储和检索增强生成 |
| 配置热更新 | 动态配置管理、配置变更事件,支持运行时配置更新和属性绑定 |
| 事件总线 | |
| 生命周期管理 | 组件生命周期管理、优雅启动和关闭,支持依赖顺序管理 |
| 分布式锁 | |
| 技能系统 | |
🚀 快速开始
安装
# 从 PyPI 安装pip install wagent-framework# 安装可选依赖pip install wagent-framework[fastapi,langchain,redis,opentelemetry]# 从源码安装pip install -e .
创建你的第一个 Agent
import asynciofrom w_agent import BaseAgent, BeanFactory, AgentComponent@AgentComponent(name="hello_agent")classHelloAgent(BaseAgent):asyncdefarun(self, prompt: str) -> str:returnf"Hello, {prompt}!"asyncdefmain():# 创建 Bean 工厂 bean_factory = BeanFactory()# 注册 Agent agent = HelloAgent() bean_factory.register_bean("hello_agent", agent)# 使用 Agent agent = await bean_factory.get_bean("hello_agent") result = await agent.arun("World") print(result) # 输出: Hello, World!asyncio.run(main())
使用配置热更新
import asynciofrom w_agent import DynamicConfigManagerasyncdefmain(): config_manager = DynamicConfigManager()# 设置配置 config_manager.set("api_key", "your-api-key")# 绑定配置到属性classMyService:def__init__(self): self.api_key = None config_manager.bind("api_key", self, "api_key")asyncdefon_config_change(self, key, new_value, old_value): print(f"Config changed: {key} = {new_value}") service = MyService() print(service.api_key) # 输出: your-api-key# 动态更新配置await config_manager.update_batch({"api_key": "new-api-key"}) print(service.api_key) # 输出: new-api-keyasyncio.run(main())
使用 AOP 切面编程
import asynciofrom w_agent import Retry, CircuitBreaker# 使用重试装饰器@Retry(max_attempts=3, delay=0.1, backoff=2.0)asyncdefunreliable_operation(): print("执行不稳定操作...")# 模拟失败raise Exception("临时失败")# 使用断路器@CircuitBreaker(failure_threshold=5, recovery_timeout=30.0)asyncdefprotected_operation(): print("执行受保护操作...")# 模拟失败raise Exception("服务不可用")asyncdefmain():try:await unreliable_operation()except Exception as e: print(f"最终失败: {e}")asyncio.run(main())
使用事件总线
import asynciofrom w_agent import EventBus, Eventevent_bus = EventBus()@event_bus.on("user.created")asyncdefon_user_created(event): print(f"User created: {event.payload}")@event_bus.on("user.updated")asyncdefon_user_updated(event): print(f"User updated: {event.payload}")asyncdefmain():# 发布用户创建事件await event_bus.emit(Event("user.created", {"user_id": 123, "name": "John"}))# 发布用户更新事件await event_bus.emit(Event("user.updated", {"user_id": 123, "name": "John Doe"}))asyncio.run(main())
📁 项目结构
w_agent/├── aop/ # 面向切面编程│ ├── aspects.py # 切面实现(重试、断路器)│ ├── joinpoint.py # 连接点│ ├── pointcut.py # 切点表达式解析│ └── proxy_factory.py # 代理工厂├── config/ # 配置管理│ └── dynamic_config.py # 动态配置├── container/ # 依赖注入容器│ ├── bean_factory.py # Bean 工厂│ └── reflection_cache.py # 反射缓存├── core/ # 核心功能│ ├── agent.py # Agent 基类│ ├── decorators.py # 装饰器│ ├── doctor.py # 系统检查│ └── event_bus.py # 事件总线├── deployment/ # 部署相关│ └── fastapi_depends.py # FastAPI 依赖注入├── distributed/ # 分布式功能│ ├── lock.py # 分布式锁│ └── lock_pool.py # 锁池管理├── exceptions/ # 异常处理│ └── framework_errors.py # 框架错误定义├── lifecycle/ # 生命周期管理│ ├── graceful_shutdown.py # 优雅关闭│ ├── manager.py # 生命周期管理器│ └── order.py # 生命周期顺序├── observability/ # 可观测性│ ├── health.py # 健康检查│ ├── logging.py # 日志管理│ ├── metrics.py # 指标监控│ └── tracing.py # 链路追踪├── resilience/ # 弹性模式│ ├── bulkhead.py # 隔离舱模式│ └── timeout.py # 超时控制├── scanner/ # 组件扫描│ ├── cache.py # 扫描缓存│ └── parallel_scanner.py # 并行扫描├── security/ # 安全相关│ └── mcp_auth.py # MCP 认证├── skills/ # 技能系统│ ├── sandbox/ # 沙箱│ │ ├── nsjail_sandbox.py│ │ └── wasm_sandbox.py│ ├── signature.py # 技能签名│ └── skill.py # 技能基类├── testing/ # 测试工具│ └── mock_utils.py # 模拟工具├── tools/ # 工具类│ └── langchain_adapter.py # LangChain 适配器├── __init__.py # 包初始化├── __main__.py # 主入口└── cli.py # 命令行工具
🔧 核心模块
AOP 切面编程
AOP (Aspect-Oriented Programming) 允许你在不修改原有代码的情况下,为代码添加横切关注点。
from w_agent import AspectJPointcut, BeforeAdvice, AfterAdvice, AroundAdvice, ProxyFactory# 定义目标类classTarget:defdo_something(self, value):returnf"Result: {value}"# 创建通知execution_order = []asyncdefbefore_advice(joinpoint): execution_order.append("before")asyncdefafter_advice(joinpoint, result): execution_order.append("after")asyncdefaround_advice(joinpoint, proceed): execution_order.append("around_before") result = await proceed() execution_order.append("around_after")return result# 创建代理proxy_factory = ProxyFactory()target = Target()advices = [ BeforeAdvice(before_advice), AroundAdvice(around_advice), AfterAdvice(after_advice)]proxy = proxy_factory.create_proxy(target, {"do_something": advices})# 调用方法result = await proxy.do_something("test")print(f"Result: {result}")print(f"Execution order: {execution_order}")
依赖注入
IOC (Inversion of Control) 容器提供了依赖注入功能,简化了组件之间的依赖管理。
from w_agent import BeanFactory, ServiceComponent, Autowired, Qualifier@ServiceComponent(name="user_repository")classUserRepository:defget_user(self, user_id):return {"id": user_id, "name": "John"}@ServiceComponent(name="user_service")classUserService: @Autowired @Qualifier(name="user_repository")defset_repository(self, repository): self.repository = repositorydefget_user(self, user_id):return self.repository.get_user(user_id)asyncdefmain():# 创建 Bean 工厂 bean_factory = BeanFactory()# 注册组件 repository = UserRepository() bean_factory.register_bean("user_repository", repository) service = UserService() bean_factory.register_bean("user_service", service)# 自动注入依赖await bean_factory.autowire_all()# 使用服务 user = service.get_user(123) print(f"User: {user}")asyncio.run(main())
事件总线
事件总线提供了异步事件发布/订阅机制,支持事件驱动架构。
from w_agent import EventBus, Event, ConfigChangedEventevent_bus = EventBus()# 订阅配置变更事件@event_bus.on("config_changed")asyncdefon_config_changed(event): print(f"Config changed: {event.payload}")asyncdefmain():# 发布自定义事件await event_bus.emit(Event("custom_event", {"data": "test"}))# 发布配置变更事件await event_bus.emit(ConfigChangedEvent(key="api_key", old_value="old", new_value="new"))asyncio.run(main())
生命周期管理
生命周期管理器负责组件的初始化和销毁,支持依赖顺序管理。
from w_agent import LifecycleManager, LifecycleOrder, PostConstruct, PreDestroyclassDatabaseService: @PostConstruct(order=1)asyncdefinit_db(self): print("初始化数据库连接") @PreDestroy(order=1)asyncdefclose_db(self): print("关闭数据库连接")classUserService: @PostConstruct(order=2)asyncdefinit_service(self): print("初始化用户服务") @PreDestroy(order=2)asyncdefcleanup_service(self): print("清理用户服务")asyncdefmain():# 创建生命周期管理器 lifecycle_manager = LifecycleManager()# 注册组件 db_service = DatabaseService() lifecycle_manager.register(db_service, LifecycleOrder.INFRASTRUCTURE) user_service = UserService() lifecycle_manager.register(user_service, LifecycleOrder.SERVICE)# 执行初始化await lifecycle_manager.post_construct_all()# 执行销毁await lifecycle_manager.pre_destroy_all()asyncio.run(main())
配置管理
动态配置管理器支持配置热更新和属性绑定。
from w_agent import DynamicConfigManagerasyncdefmain(): config_manager = DynamicConfigManager()# 加载配置文件await config_manager.load_from_file("config.json")# 设置配置 config_manager.set("service.port", 8080) config_manager.set("service.host", "localhost")# 绑定配置到对象classServiceConfig:def__init__(self): self.port = None self.host = None config_manager.bind("service.port", self, "port") config_manager.bind("service.host", self, "host")asyncdefon_config_change(self, key, new_value, old_value): print(f"Config changed: {key} = {new_value}") service_config = ServiceConfig() print(f"Initial config: host={service_config.host}, port={service_config.port}")# 动态更新配置await config_manager.update_batch({"service.port": 8081, "service.host": "0.0.0.0"}) print(f"Updated config: host={service_config.host}, port={service_config.port}")asyncio.run(main())
沙箱安全
沙箱提供了安全的技能执行环境,支持 Wasm 和 nsjail 沙箱。
from w_agent import WasmSkillSandbox, NsJailSkillSandbox, Skillfrom pathlib import Path# 创建技能skill = Skill( name="test_skill", description="Test skill", scripts={"test": Path("test.py")})# 使用 Wasm 沙箱wasm_sandbox = WasmSkillSandbox()result = await wasm_sandbox.execute(skill, "test", {"name": "World"})print(f"Wasm sandbox result: {result}")# 使用 nsjail 沙箱nsjail_sandbox = NsJailSkillSandbox()result = await nsjail_sandbox.execute(skill, "test", {"name": "World"})print(f"NsJail sandbox result: {result}")
⚙️ 配置指南
可配置项一览
| | | |
|---|
logging.level | | | |
logging.format | | | |
container.scan_paths | | | |
container.skip_paths | | | |
sandbox.enabled | | | |
sandbox.type | | | |
resilience.retry.max_attempts | | | |
resilience.retry.delay | | | |
resilience.retry.backoff | | | |
resilience.circuit_breaker.failure_threshold | | | |
resilience.circuit_breaker.recovery_timeout | | | |
distributed.lock.redis.url | | | |
distributed.lock.redis.db | | | |
observability.tracing.enabled | | | |
observability.tracing.exporter | | | |
observability.metrics.enabled | | | |
配置文件
创建 config.json:
{"logging": {"level": "INFO","format": "json" },"container": {"scan_paths": ["src"],"skip_paths": ["__pycache__", ".git"] },"sandbox": {"enabled": true,"type": "wasm" },"resilience": {"retry": {"max_attempts": 3,"delay": 0.1,"backoff": 2.0 },"circuit_breaker": {"failure_threshold": 5,"recovery_timeout": 30.0 } }}
环境变量
环境变量优先级高于配置文件:
export W_AGENT_LOGGING_LEVEL=DEBUGexport W_AGENT_CONTAINER_SCAN_PATHS=src,componentsexport W_AGENT_SANDBOX_ENABLED=trueexport W_AGENT_RESILIENCE_RETRY_MAX_ATTEMPTS=5
📖 示例项目
基本 Agent 示例
查看 examples/basic_agent.py 获取基本 Agent 实现示例,展示了:
聊天 Agent 示例
查看 chat-agent 目录获取完整的聊天智能体示例项目,包含:
👨💻 开发者指南
核心概念
- Agent:智能体基类,所有智能体都继承自
BaseAgent - Component:组件,使用
@AgentComponent、@ServiceComponent 等注解标记 - Lifecycle:生命周期,管理组件的初始化和销毁
- Skill:技能,可被 Agent 调用的功能模块
开发流程
- 创建 Agent:继承
BaseAgent 并实现 arun 方法 - 创建服务:使用
@ServiceComponent 标记服务类 - 配置依赖:使用
@Autowired 和 @Qualifier 注入依赖 - 添加生命周期:使用
@PostConstruct 和 @PreDestroy 标记生命周期方法 - 添加 AOP 切面:使用
@Retry、@CircuitBreaker 等注解添加切面 - 注册 Bean:使用
BeanFactory 注册组件 - 启动应用:执行
post_construct_all 初始化组件
最佳实践
常见问题
依赖注入失败
配置不生效
沙箱执行失败
📄 文档
感谢大家阅读,个人观点仅供参考,欢迎在评论区发表不同观点。
欢迎关注、分享、点赞、收藏、在看,我是微信公众号「PHP驿站」作者小皮。