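As a heavy Claude Code user, I had one fixation: could I get it to talk to my own models?
Part of the motivation was API bill pressure; the other part was wanting to feed it the Qwen3 35B I run locally and see what happens. After a full day of tinkering I got it working. The pitfalls along the way were interesting, and the complete code is included here, ready to use.
The conclusion first: it works, but it is not as simple as you might think. The core problem is not "swap the URL and the key"; it is the gap between two protocols and the hardware limit on the context window.
1. Why Do You Need a Proxy?
Claude Code only speaks Anthropic's own protocol: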
POST /v1/messages
Content-Type: application/json
{
  "model": "claude-sonnet-4-6",
  "system": "You are a coding assistant...",
  "messages": [...],
  "tools": [...],
  "stream": true
}
Meanwhile, almost every third-party model service (local llama.cpp, vLLM, and the various cloud relays included) uses the OpenAI format:
POST /v1/chat/completions
{
  "model": "gpt-4o",
  "messages": [
    {"role": "system", "content": "..."},
    {"role": "user", "content": "..."}
  ],
  "stream": true
}
The two formats differ in several places:
| Difference | Anthropic format | OpenAI format |
| --- | --- | --- |
| System prompt | top-level system field | messages[0].role=system |
| Tool definition | input_schema | parameters |
| Tool call | tool_use content block | tool_calls array |
| Tool result | tool_result content block | role=tool message |
| Streaming format | typed SSE events | plain delta SSE |
| Thinking content | thinking block | reasoning_content field |
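So we need a protocol translation layer: it exposes the Anthropic interface on the outside and calls an OpenAI-compatible interface on the inside.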
2. The Complete Code (roughly 340 lines)
Install the dependencies:
pip install fastapi uvicorn httpx
File header:
#!/usr/bin/env python3
"""Anthropic Messages API -> OpenAI-compatible chat completions proxy"""
import json
import uuid
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, StreamingResponse
import uvicorn
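2.1 Routing configuration: one proxy, multiple backends
This is the most valuable part of the design: routing to different upstreams by model name. When Claude Code sends claude-sonnet-4-6, the proxy forwards it to local Qwen; when it sends claude-opus-4-8, the proxy forwards it to a high-quality cloud model. The client never sees the difference.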
# Default route: any model without its own entry goes here
DEFAULT_ROUTE = {
    "upstream_model": "your-default-model",
    "api_base": "https://your-api.example.com/v1",
    "api_key": "sk-your-key-here",
    "timeout": 120,
}
# Per-model overrides: list only the fields that change; the rest inherit the defaults
MODEL_ROUTING = {
    "claude-opus-4-8": {
        "upstream_model": "gpt-5.4",  # high-quality cloud model
    },
    "claude-opus-4-7": {
        "upstream_model": "claude-opus-4-7",  # official Claude
    },
    "claude-sonnet-4-6": {
        "upstream_model": "qwen",  # local Qwen
        "api_base": "http://localhost:8080/v1",
        "api_key": "",  # the local server needs no key
        "timeout": 600,  # local models are slow, so use a long timeout
    },
    "claude-haiku-4-5": {
        "upstream_model": "gpt-5.5",
        "api_base": "https://another-api.example.com/v1",
        "api_key": "sk-another-key",
    },
}
app = FastAPI()

def get_route(model):
    # Start from the defaults, then apply the per-model overrides
    route = dict(DEFAULT_ROUTE)
    route.update(MODEL_ROUTING.get(model, {}))
    return route
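To see how the override merge behaves, here is a small self-contained sketch. The two tables are trimmed copies of the ones above with the same placeholder values, and claude-unknown is a made-up model name that I use only to show the fallback to the default route.

```python
# Trimmed copies of the routing tables; every value is a placeholder.
DEFAULT_ROUTE = {
    "upstream_model": "your-default-model",
    "api_base": "https://your-api.example.com/v1",
    "api_key": "sk-your-key-here",
    "timeout": 120,
}
MODEL_ROUTING = {
    "claude-sonnet-4-6": {
        "upstream_model": "qwen",
        "api_base": "http://localhost:8080/v1",
        "api_key": "",
        "timeout": 600,
    },
    "claude-opus-4-8": {"upstream_model": "gpt-5.4"},
}


def get_route(model):
    """Start from the defaults, then apply per-model overrides."""
    route = dict(DEFAULT_ROUTE)
    route.update(MODEL_ROUTING.get(model, {}))
    return route


local = get_route("claude-sonnet-4-6")
cloud = get_route("claude-opus-4-8")
fallback = get_route("claude-unknown")  # hypothetical name, not in the table

print(local["api_base"], local["timeout"])         # fully overridden entry
print(cloud["upstream_model"], cloud["api_base"])  # model overridden, base inherited
print(fallback == DEFAULT_ROUTE)                   # unknown models use the default
```

Because dict(DEFAULT_ROUTE) makes a shallow copy, the update never mutates the shared defaults, so one request's overrides cannot leak into the next.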
2.2 Message format conversion: the most complex part
Anthropic's messages format has several special content block types, and each one has to be mapped to the OpenAI format:
def to_oai_messages(body):
    result = []
    # 1. Handle the system field separately and turn it into a role=system message
    system = body.get("system", "")
    if system:
        # system can be a string or an array of content blocks
        text = system if isinstance(system, str) else "\n".join(
            block.get("text", "") for block in system if block.get("type") == "text"
        )
        if text:
            result.append({"role": "system", "content": text})
    for message in body.get("messages", []):
        role = message["role"]
        content = message.get("content", "")
        if role == "assistant":
            # An assistant message may mix text and tool calls, so split them
            text_parts = []
            tool_calls = []
            blocks = content if isinstance(content, list) else [{"type": "text", "text": content}]
            for block in blocks:
                if block.get("type") == "text":
                    text_parts.append(block.get("text", ""))
                elif block.get("type") == "tool_use":
                    # Anthropic tool_use block -> OpenAI tool_calls entry
                    tool_calls.append({
                        "id": block.get("id", "call_" + uuid.uuid4().hex[:8]),
                        "type": "function",
                        "function": {
                            "name": block.get("name", ""),
                            "arguments": json.dumps(block.get("input", {}), ensure_ascii=False),
                        },
                    })
            oai_message = {"role": "assistant", "content": "\n".join(text_parts)}
            if tool_calls:
                oai_message["tool_calls"] = tool_calls
            result.append(oai_message)
        elif role == "user":
            if isinstance(content, list):
                # A user message may mix tool results with ordinary text
                tool_results = [b for b in content if b.get("type") == "tool_result"]
                text_blocks = [b for b in content if b.get("type") == "text"]
                # Each tool result is sent as its own role=tool message
                for tr in tool_results:
                    tc = tr.get("content", "")
                    if isinstance(tc, list):
                        tc = "\n".join(b.get("text", "") for b in tc if b.get("type") == "text")
                    result.append({"role": "tool", "tool_call_id": tr.get("tool_use_id", ""), "content": tc})
                if text_blocks:
                    result.append({"role": "user",
                                   "content": "\n".join(b.get("text", "") for b in text_blocks)})
            else:
                result.append({"role": "user", "content": content or ""})
        else:
            result.append({"role": role, "content": content if isinstance(content, str) else ""})
    return result

def to_oai_tools(tools):
    if not tools:
        return []
    return [{
        "type": "function",
        "function": {
            "name": tool.get("name", ""),
            "description": tool.get("description", ""),
            # Anthropic calls it input_schema, OpenAI calls it parameters; same JSON Schema content
            "parameters": tool.get("input_schema", {"type": "object", "properties": {}}),
        },
    } for tool in tools]
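The tool round trip is the easiest part to get subtly wrong, so here is a stripped-down sketch of just that mapping. It is not the full converter above: the two helper names, the read_file tool and the toolu_01 ID are all invented for illustration.

```python
import json


def tool_use_to_tool_call(block):
    """Anthropic tool_use block -> OpenAI tool_calls entry."""
    return {
        "id": block["id"],
        "type": "function",
        "function": {
            "name": block["name"],
            # OpenAI carries the arguments as a JSON string, not as an object.
            "arguments": json.dumps(block.get("input", {}), ensure_ascii=False),
        },
    }


def tool_result_to_message(block):
    """Anthropic tool_result block -> OpenAI role=tool message."""
    content = block.get("content", "")
    if isinstance(content, list):
        # Keep only the text blocks and join them into one string.
        content = "\n".join(b.get("text", "") for b in content if b.get("type") == "text")
    return {"role": "tool", "tool_call_id": block["tool_use_id"], "content": content}


# Hypothetical exchange: the tool name and the ID are made up.
assistant_block = {"type": "tool_use", "id": "toolu_01", "name": "read_file",
                   "input": {"path": "README.md"}}
user_block = {"type": "tool_result", "tool_use_id": "toolu_01",
              "content": [{"type": "text", "text": "# Demo"}]}

call = tool_use_to_tool_call(assistant_block)
reply = tool_result_to_message(user_block)
print(call["function"]["arguments"])  # {"path": "README.md"}
print(reply["tool_call_id"] == call["id"])  # True
```

The detail to watch is that the ID must survive unchanged in both directions; an OpenAI-compatible server uses tool_call_id to pair the result with the call that produced it.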
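2.3 Streaming response conversion: the most brain-bending part
Anthropic's streaming format is an event stream with semantics: every content block (text, tool call, thinking) has an explicit start and stop boundary. The OpenAI format is just a sequence of deltas. The proxy's job is to repackage the OpenAI delta stream as an Anthropic event stream.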
def sse(event, data):
    return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"

def parse_sse_data(line):
    if line.startswith("data:"):
        return line[5:].lstrip()
    return None
async def stream_gen(api_base, payload, headers, model, msg_id, timeout=120, use_thinking=False):
    # 1. Send message_start first to tell the client the message has begun
    yield sse("message_start", {"type": "message_start", "message": {
        "id": msg_id, "type": "message", "role": "assistant",
        "content": [], "model": model, "stop_reason": None,
        "usage": {"input_tokens": 0, "output_tokens": 1},
    }})
    # State tracking: every content block gets its own index
    text_block_started = False
    thinking_block_started = False
    thinking_block_stopped = False
    tool_call_accumulator = {}  # tool calls arrive in fragments and must be collected first
    next_block_index = [0]  # a list so the nested functions can mutate it

    def start_thinking_block():
        nonlocal thinking_block_started
        if not thinking_block_started:
            thinking_block_started = True
            index = next_block_index[0]; next_block_index[0] += 1
            return index, sse("content_block_start", {
                "type": "content_block_start", "index": index,
                "content_block": {"type": "thinking", "thinking": ""},
            })
        return None, ""

    def start_text_block():
        nonlocal text_block_started
        if not text_block_started:
            text_block_started = True
            index = next_block_index[0]; next_block_index[0] += 1
            return index, sse("content_block_start", {
                "type": "content_block_start", "index": index,
                "content_block": {"type": "text", "text": ""},
            })
        return None, ""

    current_thinking_index = [None]
    current_text_index = [None]
    import sys as _sys
    async with httpx.AsyncClient(timeout=timeout) as client:
        async with client.stream("POST", f"{api_base}/chat/completions",
                                 json=payload, headers=headers) as response:
            if response.status_code >= 400:
                body_bytes = await response.aread()
                print(f"[ERROR] upstream {response.status_code}: {body_bytes[:300]}",
                      file=_sys.stderr, flush=True)
                return
            async for line in response.aiter_lines():
                data = parse_sse_data(line)
                if data is None:
                    continue
                if data.strip() == "[DONE]":
                    break
                try:
                    chunk = json.loads(data)
                    choice = chunk.get("choices", [{}])[0]
                    delta = choice.get("delta", {})
                    finish_reason = choice.get("finish_reason")
                    # Thinking content (the reasoning_content field from Qwen3/DeepSeek)
                    reasoning = delta.get("reasoning_content")
                    if use_thinking and reasoning:
                        if current_thinking_index[0] is None:
                            index, event = start_thinking_block()
                            current_thinking_index[0] = index
                            yield event
                        yield sse("content_block_delta", {
                            "type": "content_block_delta",
                            "index": current_thinking_index[0],
                            "delta": {"type": "thinking_delta", "thinking": reasoning},
                        })
                    # Regular text content
                    text = delta.get("content")
                    if text:
                        # When the text starts, close the thinking block first
                        if use_thinking and current_thinking_index[0] is not None and not thinking_block_stopped:
                            thinking_block_stopped = True
                            yield sse("content_block_stop", {
                                "type": "content_block_stop",
                                "index": current_thinking_index[0],
                            })
                        if current_text_index[0] is None:
                            index, event = start_text_block()
                            current_text_index[0] = index
                            yield event
                            yield sse("ping", {"type": "ping"})
                        yield sse("content_block_delta", {
                            "type": "content_block_delta",
                            "index": current_text_index[0],
                            "delta": {"type": "text_delta", "text": text},
                        })
                    # Tool calls: OpenAI streams tool_calls in fragments, so collect them before emitting.
                    # "or []" also covers upstreams that send "tool_calls": null.
                    for tool_call in delta.get("tool_calls") or []:
                        oi = tool_call.get("index", 0)
                        if oi not in tool_call_accumulator:
                            bi = next_block_index[0]; next_block_index[0] += 1
                            tool_call_accumulator[oi] = {
                                "id": tool_call.get("id", ""), "name": "",
                                "args": "", "block_index": bi,
                            }
                        fn = tool_call.get("function", {})
                        if fn.get("name"): tool_call_accumulator[oi]["name"] += fn["name"]
                        if fn.get("arguments"): tool_call_accumulator[oi]["args"] += fn["arguments"]
                        if tool_call.get("id"): tool_call_accumulator[oi]["id"] = tool_call["id"]
                    # When the tool calls are complete, emit all tool blocks in one batch
                    if finish_reason in ("tool_calls", "stop") and tool_call_accumulator:
                        # Close a thinking block that is still open (reasoning followed directly by a tool call)
                        if current_thinking_index[0] is not None and not thinking_block_stopped:
                            thinking_block_stopped = True
                            yield sse("content_block_stop", {
                                "type": "content_block_stop", "index": current_thinking_index[0],
                            })
                        if current_text_index[0] is not None:
                            yield sse("content_block_stop", {
                                "type": "content_block_stop", "index": current_text_index[0],
                            })
                        for tc in tool_call_accumulator.values():
                            yield sse("content_block_start", {
                                "type": "content_block_start", "index": tc["block_index"],
                                "content_block": {
                                    "type": "tool_use",
                                    "id": tc["id"] or f"toolu_{uuid.uuid4().hex[:16]}",
                                    "name": tc["name"], "input": {},
                                },
                            })
                            yield sse("ping", {"type": "ping"})
                            yield sse("content_block_delta", {
                                "type": "content_block_delta", "index": tc["block_index"],
                                "delta": {"type": "input_json_delta", "partial_json": tc["args"]},
                            })
                            yield sse("content_block_stop", {
                                "type": "content_block_stop", "index": tc["block_index"],
                            })
                        yield sse("message_delta", {
                            "type": "message_delta",
                            "delta": {"stop_reason": "tool_use", "stop_sequence": None},
                            "usage": {"output_tokens": 1},
                        })
                        yield sse("message_stop", {"type": "message_stop"})
                        return
                except Exception:
                    pass  # skip lines that fail to parse
    # Wrap-up: close every block that is still open
    if current_thinking_index[0] is not None and not thinking_block_stopped:
        yield sse("content_block_stop", {
            "type": "content_block_stop", "index": current_thinking_index[0],
        })
    if current_text_index[0] is not None:
        yield sse("content_block_stop", {
            "type": "content_block_stop", "index": current_text_index[0],
        })
    elif not text_block_started:
        # Fallback: if no text was produced at all, still send an empty text block (otherwise Claude Code errors out)
        idx = next_block_index[0]
        yield sse("content_block_start", {
            "type": "content_block_start", "index": idx,
            "content_block": {"type": "text", "text": ""},
        })
        yield sse("content_block_stop", {"type": "content_block_stop", "index": idx})
    yield sse("message_delta", {
        "type": "message_delta",
        "delta": {"stop_reason": "end_turn", "stop_sequence": None},
        "usage": {"output_tokens": 1},
    })
    yield sse("message_stop", {"type": "message_stop"})
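Since most of the bugs in this part are ordering bugs (a block that never gets its stop event, a delta outside any block), a small offline checker is handy while debugging. The sketch below is my own helper, not part of the proxy: parse_events and check_stream are hypothetical names, and the rules it enforces are only the start/stop pairing described above, not the complete Anthropic specification.

```python
import json


def sse(event, data):
    """Format one SSE frame the same way the proxy does."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


def parse_events(raw):
    """Split a captured SSE stream into (event_name, data) pairs."""
    events = []
    for frame in raw.strip().split("\n\n"):
        name, data = None, None
        for line in frame.split("\n"):
            if line.startswith("event:"):
                name = line[len("event:"):].strip()
            elif line.startswith("data:"):
                data = json.loads(line[len("data:"):].strip())
        if name is not None and data is not None:
            events.append((name, data))
    return events


def check_stream(events):
    """Return a list of ordering problems; an empty list means none were found."""
    problems = []
    open_blocks = set()
    names = [name for name, _ in events]
    if not names or names[0] != "message_start":
        problems.append("stream does not begin with message_start")
    if not names or names[-1] != "message_stop":
        problems.append("stream does not end with message_stop")
    for name, data in events:
        index = data.get("index")
        if name == "content_block_start":
            if index in open_blocks:
                problems.append(f"block {index} started twice")
            open_blocks.add(index)
        elif name == "content_block_delta" and index not in open_blocks:
            problems.append(f"delta for block {index} outside start/stop")
        elif name == "content_block_stop":
            if index not in open_blocks:
                problems.append(f"block {index} stopped without start")
            open_blocks.discard(index)
    if open_blocks:
        problems.append(f"blocks never closed: {sorted(open_blocks)}")
    return problems


stop_frame = sse("content_block_stop", {"type": "content_block_stop", "index": 0})
good = "".join([
    sse("message_start", {"type": "message_start"}),
    sse("content_block_start", {"type": "content_block_start", "index": 0}),
    sse("content_block_delta", {"type": "content_block_delta", "index": 0}),
    stop_frame,
    sse("message_delta", {"type": "message_delta"}),
    sse("message_stop", {"type": "message_stop"}),
])
bad = good.replace(stop_frame, "")  # simulate a forgotten content_block_stop

print(check_stream(parse_events(good)))  # []
print(check_stream(parse_events(bad)))   # ['blocks never closed: [0]']
```

To use it against the real proxy, capture a response body with curl and pass the text to parse_events.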
2.4 The main request handler
This is the FastAPI entry point. It detects the thinking beta header and splits into a streaming path and a non-streaming path:
@app.post("/v1/messages")
async def messages(request: Request):
    body = await request.json()
    model = body.get("model", "claude-opus-4-8")
    stream = body.get("stream", False)
    beta_header = request.headers.get("anthropic-beta", "")
    # When Claude Code 2.1+ sends the interleaved-thinking beta, the response needs thinking blocks
    use_thinking = "interleaved-thinking" in beta_header
    route = get_route(model)
    payload = {
        "model": route["upstream_model"],
        "messages": to_oai_messages(body),
        "max_tokens": body.get("max_tokens", 4096),
        "stream": stream,
    }
    if "temperature" in body:
        payload["temperature"] = body["temperature"]
    oai_tools = to_oai_tools(body.get("tools", []))
    if oai_tools:
        payload["tools"] = oai_tools
    headers = {"Content-Type": "application/json"}
    if route["api_key"]:
        headers["Authorization"] = f"Bearer {route['api_key']}"
    if stream:
        msg_id = f"msg_{uuid.uuid4().hex[:24]}"
        return StreamingResponse(
            stream_gen(route["api_base"], payload, headers, model, msg_id,
                       route["timeout"], use_thinking),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
        )
    # Non-streaming: forward the request, then convert the OpenAI response back to Anthropic format
    async with httpx.AsyncClient(timeout=route["timeout"]) as client:
        response = await client.post(
            f"{route['api_base']}/chat/completions", json=payload, headers=headers
        )
    oai = response.json()
    if "error" in oai or "choices" not in oai:
        return JSONResponse({"type": "error", "error": oai.get("error", oai)},
                            status_code=response.status_code)
    choice = oai["choices"][0]
    message = choice.get("message", {})
    usage = oai.get("usage", {})
    content_blocks = []
    if use_thinking:
        content_blocks.append({"type": "thinking",
                               "thinking": message.get("reasoning_content", "")})
    if message.get("content"):
        content_blocks.append({"type": "text", "text": message["content"]})
    for tool_call in message.get("tool_calls", []):
        function = tool_call.get("function", {})
        try:
            args = json.loads(function.get("arguments", "{}"))
        except Exception:
            args = {}  # fall back to empty input if the arguments are not valid JSON
        content_blocks.append({"type": "tool_use", "id": tool_call.get("id", ""),
                               "name": function.get("name", ""), "input": args})
    stop_reason = "tool_use" if message.get("tool_calls") else "end_turn"
    return JSONResponse({
        "id": f"msg_{uuid.uuid4().hex[:24]}",
        "type": "message", "role": "assistant",
        "content": content_blocks, "model": model,
        "stop_reason": stop_reason, "stop_sequence": None,
        "usage": {"input_tokens": usage.get("prompt_tokens", 0),
                  "output_tokens": usage.get("completion_tokens", 0)},
    })

@app.get("/v1/models")
async def list_models():
    return JSONResponse({
        "object": "list",
        "data": [{"id": m, "object": "model", "created": 1700000000, "owned_by": "anthropic"}
                 for m in MODEL_ROUTING.keys()],
    })

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=5000, log_level="warning")
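3. Starting and Stopping the Proxy
Run in the foreground (for testing)
python3 anthropic_proxy_plus.py
# Listens on 0.0.0.0:5000; press Ctrl+C to stop
Run persistently in the background
Note: a plain nohup python3 xxx.py & can still get killed when SSH disconnects in some environments. Using start_new_session=True is more reliable.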
python3 -c "
import subprocess
p = subprocess.Popen(
    ['python3', '/root/anthropic_proxy_plus.py'],
    stdout=open('/root/proxy_plus.log', 'w'),
    stderr=subprocess.STDOUT,
    stdin=open('/dev/null'),
    start_new_session=True  # key: keeps SIGHUP from killing the proxy when SSH disconnects
)
print(p.pid)
"
Restarting the proxy
Do not run pkill -f anthropic_proxy_plus directly over SSH!
The SSH process's command line contains the command string being executed, so the pattern match takes the SSH session down with it.
# Write a restart script and let the script run pkill, instead of running it directly over SSH
cat > /root/restart_proxy.sh << 'EOF'
#!/bin/bash
pkill -f "python3 /root/anthropic_proxy_plus.py" 2>/dev/null || true
sleep 1
python3 -c "
import subprocess
p = subprocess.Popen(
    ['python3', '/root/anthropic_proxy_plus.py'],
    stdout=open('/root/proxy_plus.log', 'w'),
    stderr=subprocess.STDOUT,
    stdin=open('/dev/null'),
    start_new_session=True
)
print(p.pid)
"
EOF
bash /root/restart_proxy.sh
Check that it works
# List the models
curl http://localhost:5000/v1/models
# Simple chat test
curl -X POST http://localhost:5000/v1/messages \
  -H "Content-Type: application/json" \
  -d '{
    "model": "claude-sonnet-4-6",
    "max_tokens": 256,
    "messages": [{"role": "user", "content": "Hello"}]
  }'
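4. Connecting Claude Code
Once the proxy is running, you only need two environment variables:
export ANTHROPIC_BASE_URL="http://127.0.0.1:5000"
export ANTHROPIC_AUTH_TOKEN="any-string-you-like"  # the proxy does not validate it, so any value works
# Start with a specific model
claude --model claude-sonnet-4-6
# Or set the default model first
export ANTHROPIC_MODEL="claude-sonnet-4-6"
claude
To verify which model is actually answering, ask "Which model are you?" inside Claude Code. If your local Qwen answers truthfully with "I am Tongyi Qianwen" (通义千问), routing worked.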
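5. The Complete Setup for Local Qwen
This is where I spent the most time. The core problem: Claude Code's system prompt is about 30,000 tokens, and the default context of most local model setups is too small for it.
How the pitfalls unfolded
- Started with -c 16384 (16k context) → Claude Code requests failed immediately with 400: request (30385 tokens) exceeds context size
- Tried going straight to -c 65536 → the 12 GB of VRAM hit 99% usage and the server crashed with OOM
- Found the key flag -nkvo (keeps the KV cache in system RAM) → VRAM holds only the model weights, and the context size is bounded by system RAM instead of VRAM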
Final llama-server service configuration (systemd)
# ~/.config/systemd/user/llama-server.service
[Unit]
Description=llama-server Qwen3.6-35B-A3B IQ3_M
After=network.target
[Service]
Type=simple
WorkingDirectory=%h/llama-deploy
Environment=LD_LIBRARY_PATH=%h/llama-deploy/llama.cpp/build/bin
ExecStart=%h/llama-deploy/llama.cpp/build/bin/llama-server \
    -m %h/llama-deploy/models/IQ3_M.gguf \
    -ngl 30 \
    -nkvo \
    -ctk q4_0 \
    -ctv q4_0 \
    -c 65536 \
    -n 8192 \
    --host 0.0.0.0 \
    --port 8080 \
    --threads 20 \
    --reasoning off
Restart=on-failure
RestartSec=5
StandardOutput=append:%h/llama-deploy/server.log
StandardError=append:%h/llama-deploy/server.log
[Install]
WantedBy=default.target
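Notes on the key parameters:
| Parameter | Importance | Notes |
| --- | --- | --- |
| -nkvo | ★ Most critical | Moves the KV cache from VRAM to system RAM; VRAM holds only the model weights, so the context size is limited by RAM rather than VRAM |
| -ctk q4_0 -ctv q4_0 | Recommended | 4-bit KV cache quantization; a 64k context takes only about 4 GB of RAM (16 GB without quantization) |
| -c 65536 | Required | Claude Code's system prompt is ~30k tokens, so you need at least 32k of context; 64k is recommended |
| --reasoning off | ★ Biggest speed impact | Turns off Qwen3 thinking mode; each response drops from 25 s to 1.8 s |
| -ngl 30 | Tune to your VRAM | Number of layers on the GPU; about 30 layers for the IQ3_M quant on 12 GB of VRAM |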
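The 16 GB versus roughly 4 GB figures in the table follow from simple arithmetic on the KV cache layout. The sketch below uses the standard formula for a plain attention stack. The layer count, KV head count and head dimension are hypothetical numbers I picked so that the f16 case lands on 16 GiB at 64k; they are not read from the actual Qwen GGUF, so treat the output as an illustration of the ratio, not a measurement.

```python
def kv_cache_bytes(n_ctx, n_layers, n_kv_heads, head_dim, bytes_per_element):
    """Size of the K and V caches for a standard attention stack."""
    # Factor 2: one tensor for keys and one for values in every layer.
    return 2 * n_layers * n_kv_heads * head_dim * n_ctx * bytes_per_element


F16_BYTES = 2.0
# ggml q4_0 packs 32 values into an 18-byte block (16 bytes of 4-bit
# quants plus a 2-byte scale), which works out to 4.5 bits per value.
Q4_0_BYTES = 18 / 32

# Hypothetical dimensions chosen for illustration only.
dims = dict(n_layers=64, n_kv_heads=8, head_dim=128)

GIB = 1024 ** 3
f16 = kv_cache_bytes(65536, bytes_per_element=F16_BYTES, **dims) / GIB
q4 = kv_cache_bytes(65536, bytes_per_element=Q4_0_BYTES, **dims) / GIB
print(f"f16: {f16:.1f} GiB, q4_0: {q4:.1f} GiB")  # f16: 16.0 GiB, q4_0: 4.5 GiB
```

Whatever the real dimensions are, the ratio between q4_0 and f16 stays at 4.5/16, so the quantized cache takes a bit over a quarter of the unquantized size.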
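KV cache optimization: before and after
| Metric | Before | After |
| --- | --- | --- |
| Claude Code usability | ❌ 400 error (prompt over the limit) | ✅ Works normally |
| Response time for a short request | 25 s | 1.8 s |
| GPU memory usage | 11787/12288 MiB (99%) | 11183/12288 MiB (91%) |
| Context window | 16384 tokens | 65536 tokens |
| KV cache location | VRAM (the bottleneck) | System RAM (242 GB available) |
| Thinking overhead | 15-20 s per turn | 0 (disabled) |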
Management commands
# Restart (after changing the configuration)
systemctl --user daemon-reload
systemctl --user restart llama-server
# Check status
systemctl --user status llama-server
# Follow the live log
tail -f ~/llama-deploy/server.log
# Check VRAM
nvidia-smi --query-gpu=memory.used,memory.free --format=csv,noheader
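6. Pitfalls and Takeaways
Pitfall 1: empty thinking block output (Cooked for 0s)
Claude Code 2.1.167+ forces the interleaved-thinking beta on for claude-sonnet-4-6 and expects the response to contain content blocks of type thinking.
My early code simply inserted an empty thinking block ("thinking": ""), and Claude Code showed Cooked for 0s with no output at all.
The real problem: Qwen3 puts its thinking content in the delta.reasoning_content field, and it has to be mapped to Anthropic thinking_delta events in real time while streaming. After the fix, Claude Code correctly shows "the model thought for X seconds".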
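Boiled down to a pure function, the fix is a per-delta mapping like the sketch below. It leaves out the block start/stop bookkeeping that the full generator handles, and delta_to_events is a name I made up for this illustration; the sample chunks are invented too.

```python
def delta_to_events(delta, thinking_index, text_index):
    """Translate one OpenAI streaming delta into Anthropic content_block_delta payloads."""
    events = []
    reasoning = delta.get("reasoning_content")
    if reasoning:
        # Thinking text goes into the thinking block as a thinking_delta.
        events.append({
            "type": "content_block_delta",
            "index": thinking_index,
            "delta": {"type": "thinking_delta", "thinking": reasoning},
        })
    text = delta.get("content")
    if text:
        # Regular output goes into the text block as a text_delta.
        events.append({
            "type": "content_block_delta",
            "index": text_index,
            "delta": {"type": "text_delta", "text": text},
        })
    return events


# Made-up chunks in the shape an OpenAI-compatible server streams them.
chunks = [
    {"reasoning_content": "The user wants a greeting."},
    {"content": "Hello!"},
    {},  # e.g. a role-only delta with nothing to forward
]
events = [e for chunk in chunks for e in delta_to_events(chunk, 0, 1)]
for event in events:
    print(event["index"], event["delta"]["type"])
```

Empty deltas produce no events at all, which is what avoids the empty thinking block that triggered Cooked for 0s.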
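Pitfall 2: pkill -f killed my own session
While debugging I ran pkill -f anthropic_proxy_plus to restart the proxy, and my SSH session dropped on the spot.
The cause: the SSH process's command line contains the command string being executed, so the pkill -f pattern match kills the SSH process as well. The fix: write a restart_proxy.sh script and let the script run pkill, instead of running it directly over SSH.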
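Pitfall 3: a 2-minute cold start
Even with every protocol problem solved, the first message in Claude Code still takes about 2 minutes.
The cause: Claude Code sends its full system prompt of about 30k tokens with every request. The local model has to prefill all 30k tokens, and with mixed GPU/CPU inference that runs at roughly 260 tokens per second, so 30000 / 260 ≈ 115 seconds.
This is the physical limit of the current hardware and cannot be worked around in software. The good news is that llama-server has a prompt cache: later requests with the same system prompt reuse the KV cache, so from the second message on, responses are back to 3-10 seconds.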
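The 115-second estimate is just prompt length divided by prefill throughput, and the same arithmetic shows why the prompt cache helps so much. In this sketch the 500 new tokens for the follow-up message are a made-up figure, and the result covers prefill only; decoding time comes on top of it.

```python
def prefill_seconds(prompt_tokens, tokens_per_second, cached_tokens=0):
    """Time to process the part of the prompt that is not already in the KV cache."""
    uncached = max(prompt_tokens - cached_tokens, 0)
    return uncached / tokens_per_second


# Cold start: nothing is cached, so all 30k tokens are prefilled.
cold = prefill_seconds(30_000, 260)
# Hypothetical follow-up: the 30k prefix is cached and 500 tokens are new.
warm = prefill_seconds(30_500, 260, cached_tokens=30_000)
print(f"cold: {cold:.0f} s, warm: {warm:.1f} s")  # cold: 115 s, warm: 1.9 s
```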
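Closing takeaways
- The protocol gap is bigger than expected: swapping the URL is not enough, because the SSE streaming format, tool calls, and thinking content are all handled differently.
- VRAM is the core bottleneck for local deployment: the KV cache grows linearly with context, and moving it to system RAM with -nkvo was the key breakthrough.
- Routing by model name is very practical: heavy tasks go to a high-quality cloud model, everyday interaction goes to the free local model, and the client never notices.
- Don't turn on thinking mode blindly: for tool-driven applications, thinking mode adds latency, not quality.
The code is already running in production. Claude Code works with local Qwen: put up with the wait on the first message, and it is smooth after that. Questions are welcome in the comments.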