题目:
假设公司有一个 AI 产品。
高峰期:QPS = 1000
调用:OpenAI、Claude、Gemini
请设计一个 Python 服务架构,保证:
解题思路
并发:asyncio
限流:Semaphore、Token
缓存:Redis、Semantic Cache
降级:
削峰:Kafka、RabbitMQ
监控:P95、P99、429、Timeout、Success Rate
题目答案
一、整体架构设计
如果高峰期达到:QPS = 1000
并且调用:
我的目标是:
整体架构:
┌─────────────┐ │ Client │ └──────┬──────┘ │ ▼ ┌──────────────────┐ │ API Gateway │ └────────┬─────────┘ │ ▼ ┌──────────────────┐ │ Request Queue │ │ Kafka/RabbitMQ │ └────────┬─────────┘ │ ┌───────────────┼───────────────┐ ▼ ▼ ▼ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ Worker-1 │ │ Worker-2 │ │ Worker-N │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │ │ ▼ ▼ ▼ Redis Cache + Semantic Cache │ ▼ Rate Limiter(Token Bucket) │ ▼ Model Router │ ┌────┼────┐ ▼ ▼ ▼OpenAI Claude Gemini │ ▼Prometheus + Grafana
二、如何避免限流
假设:OpenAI RPM=60000
如果直接:await
可能瞬间打出几千请求。
导致:429 Too Many Requests
所以必须做限流。
Semaphore控制并发
import asyncioSEM = asyncio.Semaphore(100)async def call_model(prompt): async with SEM: return await real_call(prompt)
作用:
避免瞬时流量过高。
Token Bucket控制速率
例如:
伪代码:
class TokenBucket: def __init__(self, rate): self.rate = rate self.tokens = rate async def acquire(self): while self.tokens <= 0: await asyncio.sleep(0.1) self.tokens -= 1
生产环境一般使用:Redis + Lua
实现分布式限流。
三、降低延迟
使用 asyncio
错误方式:
for prompt in prompts: result = call_llm(prompt)
假设:
单次请求 = 3秒
正确方式:
import asyncioasync def process(prompts): tasks = [ call_llm(prompt) for prompt in prompts ] return await asyncio.gather(*tasks)
理论上:
完成。
四、提高成功率
自动重试
for retry in range(3): try: return await call_openai() except Exception: await asyncio.sleep( 2 ** retry )
熔断器
当OpenAI 连续失败:
触发:
后续请求不再访问 OpenAI。
避免:
五、多模型降级
设计模型路由:
async def route(prompt): try: return await openai_chat(prompt) except Exception: try: return await claude_chat(prompt) except Exception: return await gemini_chat(prompt)
流程:
保证:
六、控制成本
Redis Cache
很多请求重复。
例如:
Python是什么?Python是什么?Python是什么?
缓存:
命中:
cached = redis.get(cache_key)if cached: return cached
直接返回。
Semantic Cache
普通缓存:
与:
无法命中。
Semantic Cache:
Embedding↓向量检索↓相似度 > 0.95↓直接返回缓存
可减少:
七、流量削峰
如果突然:
直接调用模型:
概率极高。
使用 Kafka:
Client ↓Kafka ↓Worker ↓OpenAI
作用:
八、监控指标
必须监控:
性能
例如:P95 = 5s
表示:
95%的请求在5秒内完成。
稳定性
Success Rate429 Rate500 RateTimeout Rate
成本
Input TokensOutput TokensCost
例如:
九、核心代码结构
class LLMGateway: async def chat(self, prompt): # 1. Redis缓存 cached = await cache.get(prompt) if cached: return cached # 2. 限流 await token_bucket.acquire() # 3. 多模型路由 result = await router.chat(prompt) # 4. 缓存结果 await cache.set(prompt, result) return result
满分答案会额外提到
很多高级候选人还会补充:
OpenTelemetryTraceIDRequestID
用于调用链追踪。
以及:
例如:
OpenAI线程池Claude线程池Gemini线程池
避免某个供应商故障拖垮整个系统。