Spring Boot + Python 的 Agent 平台落地:从架构分层到可运行工程
专题:新一代 Agent 系统架构与工程实践|第 12 篇
前十一篇分别讨论了 Agent 架构从传统组件拼装向 Skills、Harness、Loop 三底座演进,以及 RAG 工具化、ReAct、MCP、Context Engineering、Memory、State、可靠性、安全、恢复、Eval 与可观测性。本文作为该专题的工程收官篇,将这些能力落到一套可实施的平台架构中:
- • Spring Boot 负责业务控制面、治理和持久化;
- • Python 负责模型接入、Agent 推理和执行运行时;
- • RocketMQ 负责异步任务调度与事件传递;
- • PostgreSQL 保存任务、流程、版本和治理元数据;
- • 对象存储保存文档、代码、模型输出和大体积 Artifact;
- • Tool Gateway、Context Builder、State Store、Eval 与 OpenTelemetry 共同组成生产级 Harness。
本文不会只给出概念架构,而会进一步给出模块边界、数据库模型、接口协议、消息设计、Spring Boot 与 FastAPI 代码、部署方案和演进路径。
引言:为什么 Agent 平台适合采用 Spring Boot + Python 双技术栈
企业 Agent 平台通常同时包含两类截然不同的工作。
第一类是传统企业系统擅长的控制与治理:
- • 模型、Prompt、Skill 和 Tool 注册;
第二类是 AI 运行时擅长的模型与算法执行:
如果全部使用 Java 实现,业务治理与工程稳定性较强,但部分模型 SDK、文档处理和 Agent 实验能力接入速度可能较慢。
如果全部使用 Python 实现,模型生态丰富、开发迭代快,但企业权限、复杂事务、长期任务状态、后台管理和大规模业务集成可能需要重新建设。
因此,一个更实用的分工是:
Spring Boot:
负责 Agent 平台“应该做什么、谁能做、做到哪一步”。
Python:
负责模型和 Agent“这一轮如何思考、生成和执行”。
这不是简单的“Java 调 Python”。
真正的平台边界应是:
控制面与执行面分离
+ 同步 API 与异步任务分离
+ 业务状态与模型上下文分离
+ 工具治理与工具执行分离
+ 元数据与大体积 Artifact 分离
一、平台建设目标
一套生产级 Agent 平台至少需要满足以下目标。
1.1 支持多种任务形态
1.2 支持多模型和多供应商
平台不应将业务流程写死到某个具体模型。
需要支持:
Model Provider
→ Model Config
→ Model Capability
→ Routing Policy
→ Runtime Adapter
模型能力至少包括:
1.3 支持能力治理
1.4 支持可靠长任务
1.5 支持可观测和可评测
二、技术选型边界
2.1 Spring Boot 负责什么
推荐由 Spring Boot 承担:
Identity & Tenant
Task API
Task State Machine
Workflow / Approval
Model Registry
Prompt Registry
Skill Registry
Tool Registry
Policy Engine
Tool Gateway
Context Metadata
Memory Governance
Eval Control Plane
Cost & Quota
Audit
Outbox
Admin API
这些模块具有共同特征:
2.2 Python 负责什么
推荐由 Python Worker 承担:
Model Adapter
Agent Runtime
ReAct / Planner
RAG Pipeline
Document Parser
Embedding
Image Generation
Image Transformation
Code Execution Adapter
Context Semantic Ranker
Compaction
LLM Judge
Offline Eval
这些模块具有共同特征:
2.3 不要让 Python Worker 成为第二个控制中心
常见错误是 Python Worker 同时负责:
最终形成两个控制中心:
Spring Boot 有一套状态
Python 又有一套状态
推荐原则:
Spring Boot 是任务状态和治理的唯一权威来源,Python 只提交执行结果和状态变化建议。
Python 可以返回:
{
"suggestedTransition": "WAITING_APPROVAL",
"progressDelta": {},
"artifacts": []
}
但真正的任务状态迁移由 Spring Boot 在事务中完成。
2.4 Spring AI 不是平台落地的强制前提
Spring AI 可以提供:
但平台架构不应将自身完全绑定到某个框架版本。
截至本文写作时:
- • Spring AI 1.x 面向 Spring Boot 3.4.x 和 3.5.x;
- • Spring AI 2.0 面向 Spring Boot 4.0.x 和 4.1.x。
如果现有系统仍运行在 Spring Boot 3.1.8,不建议为了接入 Agent 能力直接在原服务中强行加入当前最新版 Spring AI。
可以选择:
路线 A:保持现有 Spring Boot 3.1.8
Spring Boot 3.1.8 控制面
→ HTTP / RocketMQ
→ Python Agent Worker
模型调用、Agent Loop 和 RAG 放在 Python Worker 中。
优点:
路线 B:独立建设 AI Gateway
原业务服务:Spring Boot 3.1.8
AI Gateway:受支持的 Spring Boot + Spring AI
Python Worker:复杂 Agent 和算法
优点:
- • 可以使用 Spring AI Tool Calling、MCP 和 Observability;
路线 C:整体升级
当业务允许升级到受支持的 Spring Boot 版本后,再评估将部分模型、Tool 和 MCP 能力收敛到 Java 侧。
平台的核心契约——Task、Message、Tool、State 和 Artifact——不应因框架升级而改变。
三、总体架构
四、控制面与执行面的核心边界
五、平台模块划分
5.1 Spring Boot 项目结构
agent-platform-server/
├── agent-api
│ ├── task
│ ├── conversation
│ ├── model
│ ├── prompt
│ ├── skill
│ ├── tool
│ └── eval
├── agent-domain
│ ├── task
│ ├── workflow
│ ├── state
│ ├── registry
│ ├── policy
│ ├── memory
│ └── billing
├── agent-application
│ ├── command
│ ├── query
│ ├── orchestrator
│ └── event
├── agent-infrastructure
│ ├── persistence
│ ├── rocketmq
│ ├── objectstorage
│ ├── toolgateway
│ ├── security
│ └── observability
└── agent-bootstrap
不要在 Controller 中直接:
查询模型配置
→ 拼 Prompt
→ 调用 Python
→ 更新状态
应通过 Application Service 和 Domain Service 显式表达状态迁移。
5.2 Python Worker 项目结构
agent-worker/
├── app/
│ ├── api/
│ │ ├── health.py
│ │ └── internal.py
│ ├── consumer/
│ │ ├── task_consumer.py
│ │ └── event_publisher.py
│ ├── runtime/
│ │ ├── agent_runtime.py
│ │ ├── loop_executor.py
│ │ ├── context_runtime.py
│ │ └── result_builder.py
│ ├── models/
│ │ ├── base.py
│ │ ├── chat.py
│ │ ├── embedding.py
│ │ └── image.py
│ ├── providers/
│ │ ├── registry.py
│ │ └── adapters/
│ ├── tools/
│ │ └── gateway_client.py
│ ├── skills/
│ │ └── loader.py
│ ├── rag/
│ │ ├── query_planner.py
│ │ ├── retriever.py
│ │ └── verifier.py
│ ├── eval/
│ │ ├── graders.py
│ │ └── runner.py
│ ├── observability/
│ │ ├── tracing.py
│ │ └── metrics.py
│ ├── schemas/
│ └── settings.py
├── tests/
├── pyproject.toml
└── Dockerfile
Python Worker 应按任务能力拆分,而不是把所有逻辑写入一个 worker.py。
六、任务模型
6.1 Task 与 Execution 分离
同一个业务 Task 可能经历多次 Execution:
因此建议区分:
Task:
用户想完成的业务目标。
Execution:
某一次具体运行尝试。
6.2 核心状态
CREATED
READY
DISPATCHING
RUNNING
WAITING_TOOL
WAITING_APPROVAL
PAUSED
RETRYING
SUCCEEDED
FAILED
EXHAUSTED
CANCELLED
6.3 Task 表
CREATE TABLE agent_task (
id VARCHAR(64) PRIMARY KEY,
tenant_id VARCHAR(64) NOT NULL,
user_id VARCHAR(64) NOT NULL,
task_type VARCHAR(64) NOT NULL,
goal JSONB NOT NULL,
acceptance_criteria JSONB NOT NULL DEFAULT '[]'::jsonb,
constraints JSONB NOT NULL DEFAULT '{}'::jsonb,
status VARCHAR(32) NOT NULL,
priority INTEGER NOT NULL DEFAULT 50,
state_version BIGINT NOT NULL DEFAULT 0,
current_execution_id VARCHAR(64),
current_step_no INTEGER NOT NULL DEFAULT 0,
current_checkpoint_id VARCHAR(64),
max_steps INTEGER NOT NULL DEFAULT 30,
used_steps INTEGER NOT NULL DEFAULT 0,
max_tokens BIGINT,
used_tokens BIGINT NOT NULL DEFAULT 0,
max_cost NUMERIC(18, 6),
used_cost NUMERIC(18, 6) NOT NULL DEFAULT 0,
next_run_at TIMESTAMPTZ,
deadline_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_agent_task_schedule
ON agent_task(status, next_run_at, priority);
6.4 Execution 表
CREATE TABLE agent_execution (
id VARCHAR(64) PRIMARY KEY,
task_id VARCHAR(64) NOT NULL,
attempt_no INTEGER NOT NULL,
worker_type VARCHAR(64) NOT NULL,
worker_id VARCHAR(128),
model_snapshot JSONB NOT NULL,
capability_snapshot JSONB NOT NULL,
status VARCHAR(32) NOT NULL,
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ,
input_tokens BIGINT NOT NULL DEFAULT 0,
output_tokens BIGINT NOT NULL DEFAULT 0,
cost NUMERIC(18, 6) NOT NULL DEFAULT 0,
error_code VARCHAR(64),
error_snapshot JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(task_id, attempt_no)
);
6.5 Step 表
CREATE TABLE agent_step (
id VARCHAR(64) PRIMARY KEY,
task_id VARCHAR(64) NOT NULL,
execution_id VARCHAR(64) NOT NULL,
step_no INTEGER NOT NULL,
step_type VARCHAR(64) NOT NULL,
status VARCHAR(32) NOT NULL,
decision JSONB,
action JSONB,
observation JSONB,
verification JSONB,
progress_delta JSONB,
trace_id VARCHAR(128),
started_at TIMESTAMPTZ NOT NULL,
finished_at TIMESTAMPTZ,
UNIQUE(task_id, step_no)
);
大体积模型内容、日志和文件不建议直接存入 Step 表,应写入对象存储并保存 Artifact 引用。
七、模型注册中心
7.1 模型配置
CREATE TABLE aigc_model_config (
id VARCHAR(64) PRIMARY KEY,
tenant_id VARCHAR(64),
provider VARCHAR(64) NOT NULL,
model_code VARCHAR(128) NOT NULL,
model_type VARCHAR(32) NOT NULL,
endpoint TEXT,
credential_ref VARCHAR(256),
config JSONB NOT NULL DEFAULT '{}'::jsonb,
context_window INTEGER,
supports_tools BOOLEAN NOT NULL DEFAULT FALSE,
supports_structured BOOLEAN NOT NULL DEFAULT FALSE,
supports_streaming BOOLEAN NOT NULL DEFAULT FALSE,
data_classification VARCHAR(32) NOT NULL,
status VARCHAR(32) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(tenant_id, model_code)
);
credential_ref 引用 Vault、KMS 或 Kubernetes Secret,不保存明文 API Key。
7.2 设计时绑定与运行时路由
DAG 节点可以保存:
{
"modelCode": "reasoning-primary"
}
运行时通过 Registry 解析:
modelCode
→ Provider
→ Model ID
→ Endpoint
→ Credential
→ Capability
→ Routing Policy
如果需要动态路由,可以将节点配置扩展为:
{
"modelPolicy": "reasoning-high-quality",
"fallbackPolicy": "reasoning-read-only-fallback"
}
7.3 Python Provider Adapter
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any
@dataclass(frozen=True)
class ModelConfig:
provider: str
model_code: str
model_name: str
endpoint: str | None
credential_token: str
options: dict[str, Any]
class ChatModelAdapter(ABC):
@abstractmethod
async def generate(
self,
messages: list[dict[str, Any]],
tools: list[dict[str, Any]] | None = None,
response_schema: dict[str, Any] | None = None,
) -> dict[str, Any]:
raise NotImplementedError
7.4 Provider Registry
class ProviderRegistry:
def __init__(self) -> None:
self._factories: dict[str, callable] = {}
def register(self, provider: str, factory: callable) -> None:
if provider in self._factories:
raise ValueError(
f"Provider already registered: {provider}"
)
self._factories[provider] = factory
def create(self, config: ModelConfig) -> ChatModelAdapter:
factory = self._factories.get(config.provider)
if factory is None:
raise ValueError(
f"Unsupported provider: {config.provider}"
)
return factory(config)
业务代码不应直接判断:
if provider == "x":
...
elif provider == "y":
...
八、同步 API 与异步任务边界
8.1 哪些请求适合同步
8.2 哪些任务适合异步
8.3 不要使用 FastAPI BackgroundTasks 承担可靠长任务
FastAPI BackgroundTasks 适合请求返回后执行轻量工作,例如日志或简单通知。
对于需要:
的任务,应使用外部消息队列或任务系统。
本文采用 RocketMQ 作为异步骨干。
九、消息模型
9.1 消息 Envelope
{
"messageId": "msg-20260630-001",
"messageType": "AGENT_TASK_DISPATCHED",
"schemaVersion": "1.0",
"occurredAt": "2026-06-30T08:00:00Z",
"tenantId": "tenant-a",
"taskId": "task-001",
"executionId": "exec-001",
"stepId": "step-001",
"trace": {
"traceparent": "00-...",
"tracestate": ""
},
"payload": {}
}
9.2 Task Dispatch Payload
{
"taskType": "text_generate",
"stateVersion": 0,
"modelSnapshot": {
"modelCode": "reasoning-primary",
"provider": "provider-a",
"modelName": "model-x"
},
"capabilities": {
"skillIds": ["technical-article@1.0.0"],
"allowedToolIds": [],
"contextProfile": "article-generation@1.0.0"
},
"input": {
"artifactIds": [],
"variables": {}
},
"budget": {
"maxSteps": 20,
"maxTokens": 120000,
"maxCost": 3.0
}
}
9.3 Result Payload
{
"status": "STEP_COMPLETED",
"expectedStateVersion": 0,
"suggestedTransition": "RUNNING",
"progressDelta": {},
"usage": {
"inputTokens": 3200,
"outputTokens": 1800,
"cost": 0.12
},
"artifacts": [
{
"artifactId": "ar-001",
"type": "MARKDOWN",
"uri": "oss://agent/task-001/article.md",
"sha256": "..."
}
],
"nextAction":null
}
9.4 消息兼容
消息必须包含 schemaVersion。
兼容原则:
消费者不应直接反序列化到不断变化的内部 Domain Entity。
应使用独立 Message DTO。
十、Outbox:数据库状态与消息一致性
10.1 为什么需要 Outbox
错误流程:
数据库创建 Task 成功
→ RocketMQ 发送失败
→ Task 永远停留在 READY
或者:
RocketMQ 发送成功
→ 数据库事务回滚
→ Worker 收到不存在的 Task
10.2 Outbox 表
CREATE TABLE agent_outbox (
id VARCHAR(64) PRIMARY KEY,
aggregate_type VARCHAR(64) NOT NULL,
aggregate_id VARCHAR(64) NOT NULL,
event_type VARCHAR(64) NOT NULL,
schema_version VARCHAR(16) NOT NULL,
payload JSONB NOT NULL,
status VARCHAR(32) NOT NULL,
retry_count INTEGER NOT NULL DEFAULT 0,
next_retry_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ
);
CREATE INDEX idx_agent_outbox_publish
ON agent_outbox(status, next_retry_at, created_at);
10.3 创建任务
@Service
@RequiredArgsConstructor
public class AgentTaskApplicationService {
private final AgentTaskRepository taskRepository;
private final AgentOutboxRepository outboxRepository;
private final RequestIdempotencyRepository idempotencyRepository;
@Transactional
public CreateTaskResult create(
CreateTaskCommand command
) {
Optional<CreateTaskResult> existing =
idempotencyRepository.findResult(
command.tenantId(),
command.idempotencyKey()
);
if (existing.isPresent()) {
return existing.get();
}
AgentTask task = AgentTask.create(command);
taskRepository.insert(task);
AgentOutboxEvent event =
AgentOutboxEvent.taskCreated(task);
outboxRepository.insert(event);
CreateTaskResult result =
new CreateTaskResult(
task.id(),
task.status(),
task.stateVersion()
);
idempotencyRepository.save(
command.tenantId(),
command.idempotencyKey(),
result
);
return result;
}
}
10.4 Outbox Publisher
@Component
@RequiredArgsConstructor
public class AgentOutboxPublisher {
private final AgentOutboxRepository repository;
private final RocketMQTemplate rocketMQTemplate;
@Scheduled(fixedDelayString = "${agent.outbox.interval-ms:1000}")
public void publishBatch() {
List<AgentOutboxEvent> events =
repository.lockNextBatch(100);
for (AgentOutboxEvent event : events) {
try {
Message<String> message =
MessageBuilder
.withPayload(
event.payload().toString()
)
.setHeader(
"KEYS",
event.id()
)
.setHeader(
"eventType",
event.eventType()
)
.build();
rocketMQTemplate.syncSend(
topicFor(event),
message,
5000
);
repository.markPublished(event.id());
} catch (RuntimeException ex) {
repository.markRetry(
event.id(),
nextRetryTime(event.retryCount())
);
}
}
}
}
实际部署中要考虑多实例抢占,可使用 FOR UPDATE SKIP LOCKED。
十一、消息至少一次与业务幂等
RocketMQ 发送重试或消费重投可能产生重复消息。
因此平台应采用:
消息至少一次
+ 消费幂等
+ State Version
+ Tool Idempotency
11.1 消费记录表
CREATE TABLE agent_message_consume (
consumer_group VARCHAR(128) NOT NULL,
message_id VARCHAR(128) NOT NULL,
task_id VARCHAR(64),
status VARCHAR(32) NOT NULL,
result_snapshot JSONB,
consumed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY(consumer_group, message_id)
);
11.2 Python 消费幂等
async def handle_message(message: AgentMessage) -> None:
existing = await consume_store.get(
consumer_group="agent-worker",
message_id=message.message_id,
)
if existing and existing["status"] == "SUCCEEDED":
return
await consume_store.try_start(
consumer_group="agent-worker",
message_id=message.message_id,
task_id=message.task_id,
)
try:
result = await runtime.execute(message)
await result_publisher.publish(result)
await consume_store.mark_succeeded(
consumer_group="agent-worker",
message_id=message.message_id,
result=result,
)
except RetryableError:
await consume_store.mark_retryable(
consumer_group="agent-worker",
message_id=message.message_id,
)
raise
except Exception as exc:
await consume_store.mark_failed(
consumer_group="agent-worker",
message_id=message.message_id,
error=str(exc),
)
raise
十二、Task API
12.1 创建任务
POST /api/v1/agent/tasks
Idempotency-Key: req-20260630-001
Content-Type: application/json
{
"taskType": "technical_article",
"goal": {
"description": "生成一篇关于 Agent 平台落地的技术文章"
},
"acceptanceCriteria": [
"包含总体架构",
"包含 Spring Boot 和 Python 代码",
"包含可靠性与部署设计"
],
"modelPolicy": "reasoning-high-quality",
"skillId": "technical-article@1.0.0",
"budget": {
"maxSteps": 20,
"maxTokens": 120000,
"maxCost": 3.0
}
}
12.2 查询任务
GET /api/v1/agent/tasks/{taskId}
返回:
{
"taskId": "task-001",
"status": "RUNNING",
"stateVersion": 6,
"progress": {
"currentStep": 6,
"completedMilestones": 2,
"totalMilestones": 4
},
"usage": {
"tokens": 42000,
"cost": 0.86
},
"artifacts": []
}
12.3 取消任务
POST /api/v1/agent/tasks/{taskId}/cancel
取消不是简单更新状态。
控制面需要:
十三、Python FastAPI 服务
FastAPI 主要提供:
可靠异步任务仍由 RocketMQ 驱动。
13.1 应用入口
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.settings import settings
from app.runtime.container import container
@asynccontextmanager
async def lifespan(app: FastAPI):
await container.start()
yield
await container.stop()
app = FastAPI(
title="Agent Worker",
version="1.0.0",
lifespan=lifespan,
)
@app.get("/health/live")
async def liveness() -> dict:
return {"status": "UP"}
@app.get("/health/ready")
async def readiness() -> dict:
checks = await container.readiness_checks()
ready = all(item["ok"] for item in checks.values())
return {
"status": "UP" if ready else "DOWN",
"checks": checks,
}
13.2 不要在模块导入时建立外部连接
错误:
client = SomeModelClient(api_key="...")
consumer = RocketMQConsumer(...)
这会导致:
使用 FastAPI Lifespan 或显式容器管理连接。
十四、Python Agent Runtime
14.1 运行接口
from typing import Protocol
class AgentRuntime(Protocol):
async def execute(
self,
message: dict,
) -> dict:
...
14.2 参考实现
class DefaultAgentRuntime:
def __init__(
self,
control_client,
context_client,
provider_registry,
tool_gateway,
artifact_store,
tracer,
) -> None:
self.control_client = control_client
self.context_client = context_client
self.provider_registry = provider_registry
self.tool_gateway = tool_gateway
self.artifact_store = artifact_store
self.tracer = tracer
async def execute(self, message: dict) -> dict:
task_id = message["taskId"]
execution_id = message["executionId"]
snapshot = await self.control_client.load_snapshot(
task_id=task_id,
execution_id=execution_id,
expected_state_version=message["payload"][
"stateVersion"
],
)
context = await self.context_client.build(
task_id=task_id,
step_id=snapshot["nextStepId"],
local_goal=snapshot["localGoal"],
model_policy=snapshot["modelPolicy"],
)
model_config = snapshot["resolvedModel"]
model = self.provider_registry.create(
ModelConfig(**model_config)
)
response = await model.generate(
messages=context["messages"],
tools=context["tools"],
response_schema=context.get(
"responseSchema"
),
)
if response.get("toolCalls"):
return await self._handle_tool_calls(
snapshot,
response,
)
artifact = await self.artifact_store.write_json(
task_id=task_id,
name=f"{snapshot['nextStepId']}-output.json",
content=response,
)
return {
"status": "STEP_COMPLETED",
"expectedStateVersion": snapshot[
"stateVersion"
],
"suggestedTransition": "RUNNING",
"progressDelta": self._build_progress(
response
),
"artifacts": [artifact],
"usage": response["usage"],
}
async def _handle_tool_calls(
self,
snapshot: dict,
response: dict,
) -> dict:
observations = []
for call in response["toolCalls"]:
observation = await self.tool_gateway.invoke(
task_id=snapshot["taskId"],
step_id=snapshot["nextStepId"],
tool_call=call,
principal_token=snapshot[
"delegatedPrincipalToken"
],
)
observations.append(observation)
return {
"status": "TOOL_OBSERVED",
"expectedStateVersion": snapshot[
"stateVersion"
],
"suggestedTransition": "RUNNING",
"observations": observations,
"usage": response["usage"],
}
def _build_progress(self, response: dict) -> dict:
return response.get("progressDelta", {})
十五、为什么 Tool Gateway 应留在 Java 控制面
Python Worker 可以选择工具,但不应直接持有所有业务凭证并调用生产系统。
正确链路:
Python Agent Runtime
→ Spring Boot Tool Gateway
→ Policy
→ Approval
→ Credential Broker
→ Business API / MCP
优势:
15.1 Python Tool Gateway Client
from pydantic import BaseModel
from typing import Any
class ToolCall(BaseModel):
tool_id: str
version: str | None = None
arguments: dict[str, Any]
class ToolGatewayClient:
def __init__(self, http_client, base_url: str) -> None:
self.http_client = http_client
self.base_url = base_url
async def invoke(
self,
task_id: str,
step_id: str,
tool_call: dict,
principal_token: str,
) -> dict:
parsed = ToolCall.model_validate(tool_call)
response = await self.http_client.post(
f"{self.base_url}/internal/v1/tools/invoke",
headers={
"Authorization":
f"Bearer {principal_token}",
"X-Task-Id": task_id,
"X-Step-Id": step_id,
},
json={
"taskId": task_id,
"stepId": step_id,
"toolId": parsed.tool_id,
"requestedVersion": parsed.version,
"arguments": parsed.arguments,
},
timeout=30.0,
)
response.raise_for_status()
return response.json()
十六、Context Builder
Context 构建可以放在 Spring Boot,也可以分层实现:
Spring Boot 负责
Python 负责
推荐的接口是:
Java 构建可信候选集
→ Python 执行语义选择和模型适配
→ Java 保存 Manifest
不能让 Python 绕过控制面直接读取全部企业数据。
十七、Artifact 设计
17.1 为什么需要 Artifact Store
以下内容不适合直接存 PostgreSQL:
17.2 Artifact 表
CREATE TABLE agent_artifact (
id VARCHAR(64) PRIMARY KEY,
tenant_id VARCHAR(64) NOT NULL,
task_id VARCHAR(64),
execution_id VARCHAR(64),
artifact_type VARCHAR(64) NOT NULL,
file_name VARCHAR(512),
content_type VARCHAR(128),
storage_uri TEXT NOT NULL,
size_bytes BIGINT,
sha256 VARCHAR(128),
classification VARCHAR(32) NOT NULL,
access_policy JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expire_at TIMESTAMPTZ
);
17.3 上传流程
推荐:
Spring Boot 申请预签名地址
→ Python 上传对象存储
→ Python 提交 Hash 和元数据
→ Spring Boot 创建 Artifact 记录
避免大文件经过消息队列。
十八、工作流与 Agent Loop 的组合
固定流程和动态 Agent 不应二选一。
确定性节点交给 Java 流程,开放决策交给 Python Agent。
十九、DAG 设计器与执行模型
平台已有 DAG 设计器时,可以将节点分为:
INPUT
TEXT_GENERATE
IMAGE_GENERATE
IMAGE_TRANSFORM
RAG
AGENT
TOOL
APPROVAL
CONDITION
PARALLEL
MERGE
OUTPUT
19.1 节点配置
{
"nodeId": "node-agent-01",
"nodeType": "AGENT",
"config": {
"modelCode": "reasoning-primary",
"skillId": "technical-article@1.0.0",
"contextProfile": "article-long@1.0.0",
"maxSteps": 20,
"allowedToolTags": [
"research",
"document"
]
}
}
19.2 不要把 DAG JSON 直接当执行状态
DAG Definition 是设计时定义。
Execution State 是运行时状态。
需要区分:
Workflow Definition
Workflow Version
Workflow Instance
Node Instance
Task / Execution / Step
二十、人工审批
审批应由 Spring Boot 负责。
CREATE TABLE agent_approval (
id VARCHAR(64) PRIMARY KEY,
task_id VARCHAR(64) NOT NULL,
step_id VARCHAR(64) NOT NULL,
action_type VARCHAR(64) NOT NULL,
action_snapshot JSONB NOT NULL,
arguments_hash VARCHAR(128) NOT NULL,
status VARCHAR(32) NOT NULL,
requested_by VARCHAR(64) NOT NULL,
approved_by VARCHAR(64),
requested_at TIMESTAMPTZ NOT NULL,
decided_at TIMESTAMPTZ,
expire_at TIMESTAMPTZ,
reason TEXT
);
审批通过后发布:
AGENT_APPROVAL_GRANTED
Worker 不需要一直阻塞进程等待。
二十一、配置管理
21.1 Spring Boot 配置
agent:
task:
default-max-steps: 30
dispatch-batch-size: 100
outbox:
interval-ms: 1000
max-retry: 20
tool:
default-timeout-ms: 30000
max-tools-per-context: 12
security:
require-approval-for:
- HIGH
- CRITICAL
rocketmq:
name-server: ${ROCKETMQ_NAME_SERVER}
producer:
group: agent-platform-producer
send-message-timeout: 5000
21.2 Python 配置
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
app_name: str = "agent-worker"
environment: str = "dev"
control_plane_url: str
tool_gateway_url: str
object_storage_endpoint: str
max_concurrent_tasks: int = 8
model_request_timeout_seconds: int = 120
tool_request_timeout_seconds: int = 30
otel_service_name: str = "agent-worker"
otel_exporter_otlp_endpoint: str | None = None
model_config = {
"env_file": ".env",
"extra": "ignore",
}
settings = Settings()
敏感配置只保存引用或运行时环境变量,不写入仓库。
二十二、本地开发环境
22.1 Docker Compose
services:
postgres:
image: postgres:16
environment:
POSTGRES_DB: agent_platform
POSTGRES_USER: agent
POSTGRES_PASSWORD: agent_dev
ports:
- "5432:5432"
volumes:
- ./data/postgres:/var/lib/postgresql/data
namesrv:
image: apache/rocketmq:5.3.2
command: sh mqnamesrv
ports:
- "9876:9876"
broker:
image: apache/rocketmq:5.3.2
depends_on:
- namesrv
command: sh mqbroker
environment:
NAMESRV_ADDR: namesrv:9876
ports:
- "10909:10909"
- "10911:10911"
agent-server:
build: ./agent-platform-server
depends_on:
- postgres
- broker
environment:
SPRING_DATASOURCE_URL:
jdbc:postgresql://postgres:5432/agent_platform
ROCKETMQ_NAME_SERVER: namesrv:9876
ports:
- "8080:8080"
agent-worker:
build: ./agent-worker
depends_on:
- broker
- agent-server
environment:
CONTROL_PLANE_URL: http://agent-server:8080
TOOL_GATEWAY_URL: http://agent-server:8080
ROCKETMQ_NAME_SERVER: namesrv:9876
ports:
- "8000:8000"
这是开发参考配置,不代表生产集群方案。
22.2 宿主机与容器通信
当 Spring Boot 在 macOS 宿主机运行、RocketMQ 在 Docker 内运行时,需要正确配置 Broker 对外地址。
当容器访问宿主机服务时,可使用:
host.docker.internal
不要在容器中使用 127.0.0.1 指向宿主机。
二十三、Kubernetes 生产部署
23.1 服务拆分
agent-api-deployment
agent-scheduler-deployment
agent-result-consumer-deployment
text-worker-deployment
image-worker-deployment
code-worker-deployment
eval-worker-deployment
tool-gateway-deployment
不同 Worker 的资源和权限不同,应独立部署。
23.2 Python Worker 进程模型
在 Kubernetes 中,通常推荐:
一个容器
→ 一个 Uvicorn 进程
→ 由 Kubernetes 复制 Pod
而不是在单个容器内启动大量 Uvicorn Worker。
这样更便于:
23.3 HPA 指标
HTTP Worker 可按 CPU、内存扩容。
异步 Agent Worker 更适合按:
队列积压
最老消息等待时间
活跃任务数
模型并发
GPU 利用率
扩容。
23.4 优雅停机
Worker 收到终止信号后:
- 6. 关闭模型、HTTP、MQ 和 Trace 连接;
二十四、高并发设计
24.1 不同任务使用不同 Topic
agent-text-task
agent-image-task
agent-code-task
agent-eval-task
agent-result
agent-event
避免一个慢任务类型阻塞全部消费。
24.2 消息 Key
推荐:
taskId
需要同一任务有序时,可以按 taskId 选择队列或使用任务级租约。
不要依赖全局消息顺序。
24.3 并发控制
平台需要同时控制:
租户并发
用户并发
任务类型并发
模型并发
供应商并发
工具并发
单任务并行度
24.4 背压
当模型供应商限流时:
- • 将 Retry-After 写入
next_run_at;
24.5 长任务切片
单次 Worker 不应执行无限 Loop。
推荐:
消费一次
→ 执行一轮或有限步骤
→ 提交 State
→ 再发布下一轮消息
这样支持:
二十五、可靠性与恢复
25.1 状态更新使用 CAS
UPDATE agent_task
SET state_version = :nextVersion,
status = :nextStatus,
current_step_no = :nextStep,
used_tokens = used_tokens + :tokenDelta,
used_cost = used_cost + :costDelta,
updated_at = NOW()
WHERE id = :taskId
AND state_version = :expectedVersion;
更新行数不是 1 时,Worker 必须重新加载状态。
25.2 Checkpoint
在以下位置创建:
25.3 Dead Letter
超过最大消费重试后进入 DLQ。
DLQ 不应只依赖人工查看控制台。
平台需要:
25.4 补偿而不是全局事务
Java、Python、模型供应商、业务 API 和对象存储无法放入一个本地事务。
采用:
本地事务
+ Outbox
+ 幂等
+ 状态机
+ 补偿
+ 人工恢复
不要尝试用分布式事务覆盖模型和外部工具。
二十六、安全
26.1 内部接口认证
Java 与 Python 之间不能只依赖内网可信。
建议:
26.2 Python 不持有全部模型密钥
更安全的方式:
Worker 请求 modelCode
→ Credential Broker 返回短期凭证
→ 凭证只允许指定模型和预算
如果供应商不支持短期 Token,至少按 Worker 类型隔离 Secret。
26.3 Tool 调用统一经过 Gateway
Python Worker 不直接连接:
26.4 对象存储权限
Artifact 使用:
二十七、可观测性
Spring AI 可以记录 Chat Model、Embedding、Vector Store 和 Tool Calling 等 Observation;OpenTelemetry 的 GenAI 语义约定则提供模型、Token、Tool 和 Agent 操作的统一观测方向。
平台不应只记录一次 HTTP 调用。
27.1 Trace 层级
Agent Task
├── Dispatch
├── Worker Consume
├── Context Build
├── Model Call
├── Tool Call
├── Artifact Write
├── State Commit
├── Next Dispatch
└── Final Verification
27.2 跨 RocketMQ 传播 Trace
发送消息时写入:
traceparent
tracestate
消费时恢复 Context,并创建 Consumer Span。
27.3 指标
agent_task_created_total
agent_task_success_total
agent_task_failed_total
agent_task_duration
agent_queue_lag
agent_worker_active_tasks
agent_model_request_total
agent_model_latency
agent_input_tokens
agent_output_tokens
agent_cost
agent_tool_call_total
agent_state_conflict_total
agent_checkpoint_restore_total
27.4 内容采集
默认不要把完整 Prompt、模型输出和 Tool Result 写入普通 Trace。
可以记录:
敏感内容单独受控保存。
二十八、Eval 平台接入
28.1 Eval 不应是离线脚本孤岛
平台需要:
Dataset
→ Case
→ Eval Run
→ Agent Execution
→ Grader
→ Result
→ Release Gate
28.2 Eval 任务复用 Worker
可以使用独立 Topic:
agent-eval-task
Eval Worker 通过相同 Runtime 执行任务,但:
28.3 发布门禁
模型、Prompt、Skill、Tool 或 Context Profile 发布前:
Regression Dataset
+ Safety Dataset
+ Recovery Dataset
+ Cost Threshold
全部通过后才能发布。
二十九、测试策略
29.1 Java 测试
29.2 Python 测试
29.3 Contract Test
Java 与 Python 共享:
JSON Schema
OpenAPI
Message Schema
Error Code
State Enum
建议将契约生成独立制品,并在 CI 中验证兼容性。
29.4 故障测试
三十、发布与版本策略
需要同时版本化:
Server API
Message Schema
Task State Schema
Workflow Definition
Model Config
Prompt
Skill
Tool
Context Profile
Worker Image
Task 创建时保存版本快照。
长任务不能在恢复时静默切换新版本。
三十一、成本治理
成本由多部分组成:
模型 Token
Embedding
Reranker
Web Search
图像生成
代码沙箱
存储
网络
人工审批
任务级预算:
budget:
maxInputTokens: 100000
maxOutputTokens: 30000
maxModelCalls: 30
maxToolCalls: 40
maxCost: 5.0
deadlineSeconds: 1800
控制面负责强制执行,不能只在 Prompt 中提醒模型。
三十二、常见错误实践
32.1 Java 与 Python 都维护任务状态
会产生双写和恢复冲突。
32.2 HTTP 请求一直等待长任务完成
会导致超时、资源占用和重试重复。
32.3 FastAPI BackgroundTasks 承担关键任务
进程退出后缺乏可靠恢复。
32.4 大文件通过 RocketMQ 传输
应传 Artifact ID 和 URI。
32.5 发送 MQ 成功后再提交数据库
会产生不存在的任务消息。
32.6 消费失败只依赖 RocketMQ 重试
业务仍需幂等、错误分类和 DLQ 运营。
32.7 Python Worker 直接使用生产管理员密钥
应通过 Tool Gateway 和凭证代理。
32.8 DAG Definition 与 Runtime State 混用
流程定义和实例状态必须分离。
32.9 一开始拆十几个微服务
平台初期可以模块化单体控制面 + 多类 Worker。
先明确边界,再根据负载拆服务。
32.10 为了使用 Spring AI 强行升级核心业务服务
应先通过独立 AI Gateway 或 Python Worker 隔离,升级需要独立评估。
32.11 每个 Provider 在业务代码中写 if/else
使用 Adapter 和 Registry。
32.12 只做 Demo,不建设 Eval 和 Trace
无法判断平台升级后是变好还是变坏。
三十三、推荐的演进路线
阶段一:最小可运行平台
Spring Boot Task API
+ PostgreSQL
+ RocketMQ
+ Python Text Worker
+ Model Registry
+ Artifact Store
先跑通任务创建、异步执行、结果回写。
阶段二:能力平台化
增加:
阶段三:Durable Agent
增加:
阶段四:企业治理
增加:
阶段五:持续质量闭环
增加:
三十四、关键设计决策总结
| | |
|---|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| Provider Adapter + Registry | |
| | |
| | |
| | |
| 保持控制面,AI 能力放 Python 或独立 Gateway | |
| | |
总结
Spring Boot + Python 的 Agent 平台,不应被理解为:
Java 接口调用一个 Python 模型接口
更完整的工程结构是:
Spring Boot 控制面
+ Python Agent 执行面
+ RocketMQ 异步骨干
+ PostgreSQL 状态与治理元数据
+ Object Storage Artifact
+ Tool Gateway
+ Context / Memory / State
+ Eval 与 OpenTelemetry
Spring Boot 负责:
任务
状态
权限
审批
版本
治理
事务
审计
Python 负责:
模型
推理
RAG
Agent Loop
文档与多模态处理
Eval 执行
二者通过稳定契约协作:
Task API
Message Envelope
State Version
Artifact Reference
Tool Invocation
Execution Result
Trace Context
生产落地时,最重要的不是先引入最多的模型和框架,而是先建立以下基础:
- 6. Model、Prompt、Skill、Tool 版本化;
- 7. Python Provider Adapter;
对于已有 Spring Boot 3.1.8 的系统,最稳妥的起点通常不是立刻重构全部服务,而是:
保留现有 Java 控制面
→ 建立稳定任务和消息协议
→ 接入独立 Python Worker
→ 逐步补充 Tool、State、Context、Eval
→ 最后再评估 Spring AI 和 Boot 升级
这样,平台可以先获得 AI 生态的迭代速度,同时保留企业系统所需的可靠性、治理和长期可维护性。
至此,“新一代 Agent 系统架构与工程实践”十二篇主线完成:
架构演进
→ Skills
→ Harness
→ Loop
→ RAG 工具化
→ ReAct
→ Function Call 与 MCP
→ Context Engineering
→ Memory 与 State
→ 可靠性、安全与恢复
→ Eval 与可观测性
→ Spring Boot + Python 平台落地
这些能力并不是彼此独立的技术名词,而是一套完整的工程体系:
让能力可沉淀,让任务可推进,让运行可控制,让失败可恢复,让质量可度量。
参考资料
- 1. Spring AI Reference, Introduction
https://docs.spring.io/spring-ai/reference/index.html - 2. Spring AI Reference, Getting Started
https://docs.spring.io/spring-ai/reference/getting-started.html - 3. Spring AI 1.0 Reference, Getting Started
https://docs.spring.io/spring-ai/reference/1.0/getting-started.html - 4. Spring AI Reference, Tool Calling
https://docs.spring.io/spring-ai/reference/api/tools.html - 5. Spring AI Reference, Model Context Protocol
https://docs.spring.io/spring-ai/reference/api/mcp/mcp-overview.html - 6. Spring AI Reference, Chat Memory
https://docs.spring.io/spring-ai/reference/api/chat-memory.html - 7. Spring AI Reference, Observability
https://docs.spring.io/spring-ai/reference/observability/index.html - 8. Spring AI Reference, Evaluation Testing
https://docs.spring.io/spring-ai/reference/api/testing.html - 9. FastAPI, Deployment Concepts
https://fastapi.tiangolo.com/deployment/concepts/ - 10. FastAPI, Server Workers
https://fastapi.tiangolo.com/deployment/server-workers/ - 11. FastAPI, Background Tasks
https://fastapi.tiangolo.com/tutorial/background-tasks/ - 12. Apache RocketMQ, Sending Retry and Throttling Policy
https://rocketmq.apache.org/docs/featureBehavior/05sendretrypolicy/ - 13. Apache RocketMQ, Consumer
https://rocketmq.apache.org/docs/domainModel/09consumer/ - 14. Apache RocketMQ, Consumer Progress Management
https://rocketmq.apache.org/docs/featureBehavior/09consumerprogress/ - 15. OpenTelemetry, Generative AI Semantic Conventions
https://opentelemetry.io/docs/specs/semconv/gen-ai/ - 16. OpenTelemetry, Inside the LLM Call: GenAI Observability with OpenTelemetry
https://opentelemetry.io/blog/2026/genai-observability/