构建生产级本地 AI 搜索引擎:Python + Ollama + 混合检索 + RAG 的架构深潜与工程落地
如果说 ChatGPT 更像“对话式生成器”,Perplexity 更像“带证据链的答案引擎”,那么企业真正需要的,往往是第三种系统:既能接入公网搜索,也能接入内部知识库;既能流式回答,也能给出可追溯来源;既要低成本本地化部署,也要扛住高并发、可观测、可扩展。
本文不再停留在“搜索 + LLM = AI 搜索”这一层,而是从架构师视角,完整拆解一套可落地、可演进、可生产部署的本地 AI 搜索引擎方案。
一、为什么企业真正需要的是“本地 AI 搜索引擎”
很多团队一开始会把 AI 搜索想简单了:
这个 Demo 在技术分享里没问题,但一旦放到真实业务,就会迅速暴露四类系统性问题。
1.1 数据合规问题
企业知识往往来自:
这些内容通常不能直接上传到公网模型。尤其在医疗、金融、政企、制造场景,数据出域本身就不被允许。
1.2 搜索质量问题
单纯“搜网页 + 拼上下文”会遇到:
1.3 延迟与吞吐问题
一个能真正上线的 AI 搜索系统,必须回答以下问题:
1.4 工程治理问题
当系统开始服务真实用户时,架构重点就从“能不能回答”转向:
- • 可扩展:后续如何接入内部知识库、权限体系、多租户
所以,从企业视角看,本地 AI 搜索引擎的本质不是一个 Prompt 工程 Demo,而是一个“检索增强型答案系统”。
二、先给结论:一套生产级 AI 搜索引擎应该长什么样
2.1 核心能力清单
一套可上线的本地 AI 搜索引擎,至少要包含下面这些能力:
- • 混合召回:公网搜索 + 内部知识库 + 向量检索 + 关键词检索
- • 查询重写:把自然语言问题改写成更适合搜索的形式
- • 上下文治理:Chunk 切分、Token 预算、窗口裁剪、引用编号
- • 流式回答:SSE/WebSocket 输出,降低用户感知等待
- • 结果缓存:搜索结果缓存、抽取正文缓存、答案缓存
- • 可观测性:trace、metrics、日志、回放
- • 多模型演进:推理模型、Embedding 模型、Rerank 模型解耦
2.2 推荐总体架构
┌─────────────────────┐
│ Web / APP │
└──────────┬──────────┘
│
┌──────────▼──────────┐
│ API Gateway / BFF │
│ Auth / Quota / CORS │
└──────────┬──────────┘
│
┌─────────────────▼─────────────────┐
│ Search Orchestrator │
│ Query Rewrite / Fanout / Merge │
└───────┬──────────┬──────────┬─────┘
│ │ │
┌─────────▼───┐ ┌────▼────┐ ┌──▼──────────┐
│ Web Search │ │ Vector │ │ BM25 / FTS │
│ SearXNG │ │ Store │ │ OpenSearch │
└─────────┬───┘ └────┬────┘ └──┬──────────┘
│ │ │
└──────┬───┴──────┬───┘
│ │
┌────────▼──────────▼───────┐
│ Recall Merge + Rerank │
│ Dedup / Freshness / Score │
└────────┬───────────────────┘
│
┌────────▼────────┐
│ Context Builder │
│ Chunk / Budget │
└────────┬────────┘
│
┌──────────────▼──────────────┐
│ LLM Answer Service │
│ Ollama / vLLM / llama.cpp │
└──────────────┬──────────────┘
│
┌────────▼────────┐
│ SSE / Streaming │
└─────────────────┘
旁路支撑能力:
- Redis:缓存、幂等、热点保护
- PostgreSQL:审计、会话、检索记录、离线评测
- Prometheus + Grafana:观测
- Kafka / RabbitMQ:异步入库、索引更新
2.3 一句话理解这套架构
它不是“让 LLM 去搜索”,而是“由检索编排层先构造证据,再让 LLM 基于证据生成答案”。
这个差异决定了系统的可控性和可生产性。
三、核心原理:为什么 AI 搜索不是简单的 RAG
3.1 RAG 的基本流程
RAG 的经典链路是:
这个流程没错,但在生产环境中,真正决定效果的不是“有没有检索”,而是以下四个工程细节。
3.2 Query Rewrite:用户问题通常不适合直接检索
用户会这样问:
- • “为什么 Java 服务在 k8s 里 RSS 很高?”
这些问题直接拿去做搜索,经常命中不准。原因很简单:自然语言问题不等于高质量检索语句。
生产级做法通常是把用户问题拆成三个视角:
例如:
原问题:
为什么 Java 服务在 k8s 里 RSS 很高?
重写后:
Kubernetes Java RSS memory high container memory usage heap off-heap page cache
子问题:
1. Java 进程 RSS 由哪些部分组成
2. 容器内 RSS 和 JVM Heap 的关系
3. k8s 场景下如何排查 off-heap / mmap / page cache
这一步会直接提升召回质量。
3.3 Recall:单路召回在企业场景一定不够
生产系统通常不只一条召回链路,而是多路并行:
- • Web Search:适合查最新信息、公开资料、社区经验
- • Vector Search:适合语义相似问题和口语化表达
- • Metadata Filter:适合按部门、项目、时间、文档类型筛选
为什么要混合召回?
所以最常见的正确姿势是:
关键词召回保精度 + 向量召回保覆盖 + 公网召回补时效
3.4 Rerank:召回出来不代表能直接用
很多 Demo 在召回后直接把 TopK 拼进 Prompt。问题在于:
因此需要至少两级排序:
- • 粗排:关键词得分、来源权重、时间衰减、点击先验
- • 精排:Cross-Encoder Rerank,对“问题-文档片段”做成对打分
Cross-Encoder 的本质是:不是单独编码 query 和 document 再算余弦,而是把两者一起输入模型,让模型直接判断“这段内容到底能不能回答这个问题”。
在复杂问题里,这一步的提升通常比“换更大的生成模型”更明显。
3.5 Grounded Generation:答案必须被证据约束
生产级 AI 搜索和普通聊天机器人最大的区别,不是答案更长,而是答案更“可证伪”。
推荐的生成约束包括:
也就是说,我们需要的是:
answerable 比 fluent 更重要
faithful 比 creative 更重要
四、生产级架构设计:从 Demo 到可上线系统
4.1 分层架构
推荐把系统拆成 6 层。
接入层
职责:
可选技术:
编排层
职责:
这一层是整个系统的大脑。
检索层
职责:
模型层
职责:
建议分离部署,而不是混在一个 Python 进程里。
数据层
职责:
治理层
职责:
4.2 在线链路时序图
4.3 离线入库链路
在线搜索只是一半,企业真正可持续的价值来自“知识资产化”。
离线链路通常包括:
- 1. 文档采集:Wiki、PDF、Markdown、代码仓库、数据库、SOP
- 2. 内容解析:OCR、正文提取、表格抽取、代码块保留
- 5. 元数据注入:租户、部门、项目、版本、时间、权限
线上效果稳定的关键,往往不在 LLM,而在这条离线链路做得是否扎实。
五、工程化升级重点:高并发、可扩展、可治理
5.1 并发模型:不是所有步骤都应该串行
一次查询大致包含这些阶段:
其中最适合并行化的是:
其中最需要限流的是:
推荐的原则:
5.2 缓存设计:热点问题是天然红利
AI 搜索的缓存不是单一缓存,而是多层缓存。
L1:进程内缓存
适合缓存:
特点:
L2:Redis 分布式缓存
适合缓存:
缓存 Key 设计建议
不要只用 query 当 key,建议带上:
否则一旦 Prompt 或召回策略升级,缓存会污染新结果。
5.3 限流与熔断
生产环境中,最常见的两个崩法是:
推荐治理策略:
- • API 层限流:按租户、按用户、按 IP 配额
5.4 多模型解耦
千万不要把“一个模型解决所有问题”当成默认设计。
推荐解耦成三类模型:
这样做的好处是:
5.5 多租户与权限控制
企业知识库接入后,权限是必须做的,而不是“以后再说”。
最少要支持:
检索时的权限过滤必须前置在召回阶段,而不是等答案生成后再做脱敏。
因为一旦召回阶段就把不该看的内容送进模型,上下文已经泄漏了。
六、项目结构设计:一份更接近生产环境的代码组织
local-ai-search/
├── app/
│ ├── main.py
│ ├── config.py
│ ├── bootstrap.py
│ ├── api/
│ │ ├── deps.py
│ │ ├── routes_health.py
│ │ └── routes_search.py
│ ├── domain/
│ │ ├── models.py
│ │ ├── enums.py
│ │ └── contracts.py
│ ├── application/
│ │ ├── query_rewrite_service.py
│ │ ├── retrieval_service.py
│ │ ├── answer_service.py
│ │ ├── context_service.py
│ │ └── orchestrator.py
│ ├── infrastructure/
│ │ ├── cache/
│ │ │ ├── memory_cache.py
│ │ │ └── redis_cache.py
│ │ ├── llm/
│ │ │ └── ollama_client.py
│ │ ├── retrieval/
│ │ │ ├── searxng_client.py
│ │ │ ├── vector_store.py
│ │ │ ├── keyword_store.py
│ │ │ └── rerank_client.py
│ │ ├── parser/
│ │ │ └── content_extractor.py
│ │ └── observability/
│ │ ├── metrics.py
│ │ └── tracing.py
│ └── shared/
│ ├── retry.py
│ ├── hashing.py
│ ├── token_budget.py
│ └── jsonlog.py
├── scripts/
│ ├── ingest_docs.py
│ └── evaluate_rag.py
├── tests/
│ ├── unit/
│ ├── integration/
│ └── e2e/
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
└── README.md
这样分层的意义是:
后续如果从 Ollama 切到 vLLM、从 Chroma 切到 Milvus,改动会局部很多。
七、核心数据模型设计
7.1 Pydantic 模型
# app/domain/models.py
from __future__ import annotations
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field, HttpUrl
class RetrievalSource(str, Enum):
WEB = "web"
INTERNAL = "internal"
VECTOR = "vector"
KEYWORD = "keyword"
class SearchRequest(BaseModel):
query: str = Field(..., min_length=1, max_length=1000)
tenant_id: str = Field(..., min_length=1, max_length=64)
user_id: str = Field(..., min_length=1, max_length=64)
session_id: str | None = Field(default=None, max_length=128)
stream: bool = True
top_k: int = Field(default=8, ge=1, le=20)
enable_web_search: bool = True
enable_internal_search: bool = True
retrieval_profile: str = Field(default="balanced", pattern="^(balanced|fast|deep)$")
class RetrievedChunk(BaseModel):
chunk_id: str
doc_id: str
title: str
content: str
url: HttpUrl | None = None
source: RetrievalSource
score: float = 0.0
metadata: dict[str, Any] = Field(default_factory=dict)
class Citation(BaseModel):
index: int
title: str
url: HttpUrl | None = None
source: RetrievalSource
class SearchAnswer(BaseModel):
answer: str
citations: list[Citation] = Field(default_factory=list)
latency_ms: int = 0
trace_id: str
degraded: bool = False
debug: dict[str, Any] = Field(default_factory=dict)
7.2 为什么要显式建模 RetrievedChunk
因为生产环境的排序、过滤、审计都要围绕 chunk,而不是原始 document。
原因包括:
八、配置设计:让系统能跑在开发、测试、生产三个环境
# app/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
extra="ignore",
)
app_name: str = "local-ai-search"
env: str = "dev"
host: str = "0.0.0.0"
port: int = 8000
ollama_base_url: str = "http://ollama:11434"
ollama_chat_model: str = "qwen2.5:7b-instruct-q4_K_M"
ollama_embed_model: str = "bge-m3:latest"
ollama_timeout_seconds: int = 120
llm_max_concurrency: int = 4
searxng_base_url: str = "http://searxng:8080"
web_search_timeout_ms: int = 2500
redis_url: str = "redis://redis:6379/0"
postgres_dsn: str = "postgresql+psycopg://postgres:postgres@postgres:5432/aisearch"
max_context_tokens: int = 6000
max_chunk_tokens: int = 500
chunk_overlap_tokens: int = 80
retrieval_timeout_ms: int = 3000
rerank_timeout_ms: int = 1500
cache_ttl_search_seconds: int = 300
cache_ttl_answer_seconds: int = 180
trace_enabled: bool = True
metrics_enabled: bool = True
log_level: str = "INFO"
settings = Settings()
这里的设计重点不是字段多,而是:
后期调优会非常频繁,配置项必须充分外置。
九、检索链路设计:生产系统的真正核心
9.1 查询重写服务
# app/application/query_rewrite_service.py
from __future__ import annotations
import re
from dataclasses import dataclass
@dataclass(slots=True)
class RewriteResult:
original_query: str
search_queries: list[str]
intent: str
class QueryRewriteService:
"""
轻量版查询改写器。
生产环境可替换为小模型改写、规则引擎或领域词典增强。
"""
def rewrite(self, query: str) -> RewriteResult:
normalized = re.sub(r"\s+", " ", query.strip())
lowered = normalized.lower()
search_queries = [normalized]
if "k8s" in lowered:
search_queries.append(normalized.replace("k8s", "kubernetes"))
if "rss" in lowered:
search_queries.append(f"{normalized} off-heap mmap page cache")
if "排查" in normalized:
search_queries.append(f"{normalized} runbook troubleshooting")
# 去重保序
deduped = list(dict.fromkeys(search_queries))
intent = "diagnosis" if "排查" in normalized or "为什么" in normalized else "qa"
return RewriteResult(
original_query=normalized,
search_queries=deduped,
intent=intent,
)
这个实现不复杂,但已经体现了一条重要原则:
查询重写服务本质上是一个可演进策略点
你可以先用规则,再换小模型,再叠加领域词典。
9.2 召回聚合服务
# app/application/retrieval_service.py
from __future__ import annotations
import asyncio
from typing import Iterable
from app.domain.models import RetrievedChunk, RetrievalSource
class RetrievalService:
def __init__(self, web_client, vector_store, keyword_store, rerank_client):
self.web_client = web_client
self.vector_store = vector_store
self.keyword_store = keyword_store
self.rerank_client = rerank_client
async def retrieve(
self,
query: str,
search_queries: list[str],
top_k: int,
enable_web_search: bool,
enable_internal_search: bool,
) -> list[RetrievedChunk]:
tasks = []
if enable_web_search:
tasks.append(self._safe_web_search(search_queries))
if enable_internal_search:
tasks.append(self._safe_vector_search(query, top_k * 3))
tasks.append(self._safe_keyword_search(query, top_k * 3))
recall_results = await asyncio.gather(*tasks, return_exceptions=True)
merged = []
for result in recall_results:
if isinstance(result, Exception):
continue
merged.extend(result)
deduped = self._deduplicate(merged)
reranked = await self.rerank_client.rerank(query, deduped, top_k=top_k)
return reranked
async def _safe_web_search(self, queries: list[str]) -> list[RetrievedChunk]:
results = await self.web_client.multi_search(queries)
return [
RetrievedChunk(
chunk_id=item["id"],
doc_id=item["doc_id"],
title=item["title"],
content=item["content"],
url=item.get("url"),
source=RetrievalSource.WEB,
score=float(item.get("score", 0.0)),
metadata=item.get("metadata", {}),
)
for item in results
]
async def _safe_vector_search(self, query: str, limit: int) -> list[RetrievedChunk]:
return await self.vector_store.search(query, limit=limit)
async def _safe_keyword_search(self, query: str, limit: int) -> list[RetrievedChunk]:
return await self.keyword_store.search(query, limit=limit)
def _deduplicate(self, chunks: Iterable[RetrievedChunk]) -> list[RetrievedChunk]:
seen = set()
output: list[RetrievedChunk] = []
for chunk in chunks:
dedup_key = (chunk.doc_id, chunk.content[:120])
if dedup_key in seen:
continue
seen.add(dedup_key)
output.append(chunk)
return output
这里的关键点有三个:
9.3 Rerank 服务
# app/infrastructure/retrieval/rerank_client.py
from __future__ import annotations
from app.domain.models import RetrievedChunk
class RerankClient:
"""
示例实现:先融合规则分,再预留 cross-encoder 精排接口。
生产环境推荐接 BGE Reranker / jina-reranker / bce-reranker。
"""
async def rerank(self, query: str, chunks: list[RetrievedChunk], top_k: int) -> list[RetrievedChunk]:
query_terms = set(query.lower().split())
for chunk in chunks:
title_text = f"{chunk.title} {chunk.content[:500]}".lower()
lexical_score = sum(1 for term in query_terms if term in title_text) / max(len(query_terms), 1)
freshness_boost = 0.0
if chunk.metadata.get("updated_at"):
freshness_boost = 0.05
source_boost = 0.08 if chunk.source.value == "internal" else 0.03
chunk.score = chunk.score * 0.5 + lexical_score * 0.35 + freshness_boost + source_boost
chunks.sort(key=lambda item: item.score, reverse=True)
return chunks[:top_k]
真实线上系统里,一般会采用:
这个比例比较常见,因为:
十、上下文构建:决定生成质量的最后一道关
很多时候,模型答偏并不是模型不行,而是上下文构建做得差。
10.1 上下文构建原则
10.2 生产级 Context Builder
# app/application/context_service.py
from __future__ import annotations
from dataclasses import dataclass
import tiktoken
from app.config import settings
from app.domain.models import RetrievedChunk
@dataclass(slots=True)
class PromptPackage:
system_prompt: str
user_prompt: str
citations: list[dict]
class ContextService:
def __init__(self) -> None:
self.encoding = tiktoken.get_encoding("cl100k_base")
def build_prompt(self, query: str, chunks: list[RetrievedChunk]) -> PromptPackage:
system_prompt = (
"你是企业级 AI 搜索助手。"
"请严格基于参考资料回答,不要编造。"
"如果资料不足,请明确指出不确定。"
"回答结论后标注引用编号,例如[1][2]。"
"最后附上“参考来源”列表。"
)
token_budget = settings.max_context_tokens
consumed = 0
context_blocks = []
citations = []
for idx, chunk in enumerate(chunks, start=1):
block = f"[{idx}] 标题:{chunk.title}\n来源:{chunk.url or chunk.doc_id}\n内容:{chunk.content.strip()}"
block_tokens = len(self.encoding.encode(block))
if consumed + block_tokens > token_budget:
break
consumed += block_tokens
context_blocks.append(block)
citations.append(
{
"index": idx,
"title": chunk.title,
"url": str(chunk.url) if chunk.url else None,
"source": chunk.source.value,
}
)
user_prompt = (
f"用户问题:\n{query}\n\n"
f"参考资料:\n{'\n\n'.join(context_blocks)}\n\n"
"请输出:\n"
"1. 先给结论\n"
"2. 再给关键依据\n"
"3. 如果存在不确定性,单独说明\n"
"4. 文末列出参考来源"
)
return PromptPackage(
system_prompt=system_prompt,
user_prompt=user_prompt,
citations=citations,
)
10.3 为什么要预留回答空间
如果把上下文塞满整个窗口,会有三个副作用:
一般经验:
- • 10% 给 system prompt 和格式约束
这是比“能塞多长塞多长”更稳定的策略。
十一、LLM 推理服务:流式输出、并发控制、超时治理
11.1 Ollama 客户端封装
# app/infrastructure/llm/ollama_client.py
from __future__ import annotations
import asyncio
import json
from collections.abc import AsyncIterator
import httpx
from app.config import settings
class OllamaClient:
def __init__(self) -> None:
self._client = httpx.AsyncClient(
timeout=httpx.Timeout(settings.ollama_timeout_seconds, connect=5.0),
limits=httpx.Limits(max_connections=50, max_keepalive_connections=20),
)
self._semaphore = asyncio.Semaphore(settings.llm_max_concurrency)
async def chat_stream(self, system_prompt: str, user_prompt: str) -> AsyncIterator[str]:
payload = {
"model": settings.ollama_chat_model,
"stream": True,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
"options": {
"temperature": 0.2,
"top_p": 0.9,
"num_predict": 1024,
},
}
async with self._semaphore:
async with self._client.stream("POST", f"{settings.ollama_base_url}/api/chat", json=payload) as resp:
resp.raise_for_status()
async for line in resp.aiter_lines():
if not line.strip():
continue
data = json.loads(line)
if data.get("done"):
break
message = data.get("message", {})
content = message.get("content")
if content:
yield content
async def close(self) -> None:
await self._client.aclose()
11.2 这里最容易犯的几个错
错误 1:用“请求数”当并发控制
LLM 的瓶颈不是 HTTP 请求数,而是模型推理资源。
正确做法是:
错误 2:没有连接池和 keepalive
如果每次都新建连接,TTFT 会被网络和握手拖慢。
错误 3:把搜索和推理绑定死
外部搜索偶发失败时,不应该整个系统不可用。
应该允许“无外部搜索降级回答”或者“仅内部知识库回答”。
十二、编排器:把一切串起来的关键服务
# app/application/orchestrator.py
from __future__ import annotations
import time
from collections.abc import AsyncIterator
from app.domain.models import SearchRequest
class SearchOrchestrator:
def __init__(self, rewrite_service, retrieval_service, context_service, answer_service):
self.rewrite_service = rewrite_service
self.retrieval_service = retrieval_service
self.context_service = context_service
self.answer_service = answer_service
async def stream_answer(self, request: SearchRequest) -> AsyncIterator[str]:
started_at = time.perf_counter()
degraded = False
rewritten = self.rewrite_service.rewrite(request.query)
chunks = await self.retrieval_service.retrieve(
query=rewritten.original_query,
search_queries=rewritten.search_queries,
top_k=request.top_k,
enable_web_search=request.enable_web_search,
enable_internal_search=request.enable_internal_search,
)
if not chunks:
degraded = True
prompt = self.context_service.build_prompt(request.query, chunks)
async for token in self.answer_service.answer_stream(prompt):
yield f"data: {token}\n\n"
latency_ms = int((time.perf_counter() - started_at) * 1000)
tail_payload = {
"type": "final",
"latency_ms": latency_ms,
"degraded": degraded,
"citations": prompt.citations,
}
yield f"data: {tail_payload}\n\n"
yield "data: [DONE]\n\n"
这个 orchestrator 的设计目标很明确:
- • 整个请求链路可以打通 trace_id 和 metrics
十三、API 层实现:支持 SSE 流式输出
# app/api/routes_search.py
from __future__ import annotations
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from app.api.deps import get_orchestrator
from app.domain.models import SearchRequest
router = APIRouter(prefix="/api/v1", tags=["search"])
@router.post("/search")
async def search(request: SearchRequest, orchestrator=Depends(get_orchestrator)):
stream = orchestrator.stream_answer(request)
return StreamingResponse(
stream,
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
为什么生产里更推荐 SSE?
只有当你需要:
才更建议上 WebSocket。
十四、生产级缓存设计
14.1 两级缓存实现思路
# app/infrastructure/cache/memory_cache.py
from __future__ import annotations
import time
class MemoryCache:
def __init__(self, max_size: int = 1024):
self.max_size = max_size
self.store: dict[str, tuple[float, object]] = {}
def get(self, key: str, ttl_seconds: int):
item = self.store.get(key)
if not item:
return None
saved_at, value = item
if time.time() - saved_at > ttl_seconds:
self.store.pop(key, None)
return None
return value
def set(self, key: str, value: object):
if len(self.store) >= self.max_size:
oldest_key = min(self.store.keys(), key=lambda item: self.store[item][0])
self.store.pop(oldest_key, None)
self.store[key] = (time.time(), value)
# app/infrastructure/cache/redis_cache.py
from __future__ import annotations
import json
import redis.asyncio as redis
class RedisCache:
def __init__(self, redis_url: str):
self.client = redis.from_url(redis_url, encoding="utf-8", decode_responses=True)
async def get_json(self, key: str):
value = await self.client.get(key)
return json.loads(value) if value else None
async def set_json(self, key: str, value: dict | list, ttl_seconds: int):
await self.client.setex(key, ttl_seconds, json.dumps(value, ensure_ascii=False))
14.2 什么值得缓存
最值得缓存的是:
不建议长期缓存的是:
14.3 缓存穿透与击穿怎么防
至少做三件事:
十五、可观测性建设:没有观测就没有生产系统
15.1 关键指标
建议采集以下指标。
用户体验指标
检索质量指标
系统资源指标
15.2 Prometheus 指标示例
# app/infrastructure/observability/metrics.py
from prometheus_client import Counter, Histogram, Gauge
request_total = Counter(
"ai_search_request_total",
"Total count of ai search requests",
["tenant_id", "status"],
)
request_latency = Histogram(
"ai_search_request_latency_ms",
"Latency of ai search request in ms",
buckets=(50, 100, 300, 500, 1000, 2000, 3000, 5000, 10000),
)
llm_inflight = Gauge(
"ai_search_llm_inflight",
"Current inflight requests of llm generation",
)
15.3 为什么要记录检索中间态
线上排查时最痛苦的不是报错,而是“答得不准但也没报错”。
所以建议保存:
当然要注意脱敏和审计权限。
有了这些信息,后续做离线评测、效果回放、Prompt 调优会轻松很多。
十六、离线评测体系:没有评测,就无法持续优化
很多团队做 AI 搜索时,只看“感觉好不好”。这在早期可以,但系统想持续迭代,必须有评测集。
16.1 建议评测维度
- • Answer Relevance:答案是否回答了问题
- • Context Precision:召回内容是否真的相关
- • Citation Accuracy:引用是否正确
16.2 实践建议
建立一批高价值问答对:
每次改 Prompt、改模型、改召回策略,都跑一轮离线评测。
这比“我感觉这次更聪明了”可靠得多。
十七、生产级部署方案
17.1 Docker Compose 版本
单机或小团队验证环境,推荐先用 Compose。
version: "3.9"
services:
api:
build: .
restart: always
ports:
- "8000:8000"
environment:
OLLAMA_BASE_URL: http://ollama:11434
REDIS_URL: redis://redis:6379/0
SEARXNG_BASE_URL: http://searxng:8080
depends_on:
- ollama
- redis
- searxng
ollama:
image: ollama/ollama:latest
restart: always
ports:
- "11434:11434"
volumes:
- ollama_data:/root/.ollama
environment:
OLLAMA_KEEP_ALIVE: "24h"
OLLAMA_NUM_PARALLEL: "4"
OLLAMA_MAX_LOADED_MODELS: "2"
redis:
image: redis:7-alpine
restart: always
command: redis-server --appendonly yes --maxmemory 2gb --maxmemory-policy allkeys-lru
ports:
- "6379:6379"
searxng:
image: searxng/searxng:latest
restart: always
ports:
- "8080:8080"
volumes:
ollama_data:
17.2 Kubernetes 版本需要关注什么
到了 K8s,重点不是“能跑起来”,而是这些问题:
推荐拆分为:
这样可以按资源特征独立伸缩。
十八、两个真实业务场景,看看架构如何落地
18.1 场景一:企业内部知识问答
问题示例:
“支付服务出现下游超时,标准排查路径是什么?”
系统执行路径:
- 1. Query Rewrite 把“标准排查路径”改写为“runbook / troubleshooting”
- 2. 内部知识库 BM25 命中《支付超时故障排查手册》
- 4. Rerank 发现 runbook 文档优先级最高
- 5. Context Builder 选取 SOP 中的关键步骤和阈值
这个场景里:
18.2 场景二:本地技术研究助手
问题示例:
“为什么 Java 应用在 Kubernetes 里内存看起来比 Xmx 大很多?”
系统执行路径:
- 1. 外部搜索召回 JVM、容器内存、page cache 等公开资料
- 2. 内部知识库召回本团队的 K8s JVM 调优手册
- 3. 混合重排后,将“原理解释 + 团队标准实践”一起送入上下文
- 4. 最终答案既解释原理,也给出组织内推荐排查路径
这个场景的价值在于:
这正是“本地 AI 搜索引擎”比“纯聊天机器人”更有价值的地方。
十九、常见架构误区
误区 1:把大模型当搜索引擎
模型不具备稳定、可控、可追踪的检索能力。
搜索一定要由检索系统完成,模型负责理解和组织答案。
误区 2:只做向量检索,不做关键词检索
很多企业文档里有:
这些内容向量检索不一定强,关键词检索必须保留。
误区 3:只看回答流畅度,不看忠实度
最危险的答案,往往不是“答不上来”,而是“答得很像对的”。
误区 4:把 Prompt 当成唯一优化手段
在 AI 搜索里,通常优先级更高的是:
误区 5:把所有逻辑塞进一个 Python 文件
前期快,后期一定难维护。
检索、上下文、生成、缓存、观测最好从一开始就解耦。
二十、性能优化路线图
如果你准备把这套系统从 PoC 推向线上,建议按下面顺序优化。
第一阶段:先保证正确性
第二阶段:再优化延迟
第三阶段:再提升吞吐
第四阶段:最后优化演进能力
这条路线比“一上来就上最复杂架构”更现实。
二十一、最小可运行示例的依赖清单
fastapi>=0.115.0
uvicorn[standard]>=0.30.0
httpx>=0.27.0
pydantic>=2.8.0
pydantic-settings>=2.3.0
redis[hiredis]>=5.0.0
tiktoken>=0.7.0
trafilatura>=1.12.0
beautifulsoup4>=4.12.0
prometheus-client>=0.20.0
如果你计划继续往生产推进,建议再引入:
- • Milvus / OpenSearch / pgvector:更稳的检索底座
二十二、一份更完整的回答服务实现
为了让整条链路闭环,这里补一版 AnswerService。
# app/application/answer_service.py
from __future__ import annotations
from collections.abc import AsyncIterator
class AnswerService:
def __init__(self, ollama_client):
self.ollama_client = ollama_client
async def answer_stream(self, prompt_package) -> AsyncIterator[str]:
async for token in self.ollama_client.chat_stream(
system_prompt=prompt_package.system_prompt,
user_prompt=prompt_package.user_prompt,
):
yield token
它看起来简单,但这是好事。
因为复杂度已经被前面的分层吸收掉了。
如果后续要加:
都可以在这个服务里扩展,而不污染其它层。
二十三、如果你要继续做生产化,下一步应该补什么
这篇文章已经给出了一套能支撑中小规模生产落地的主干方案。
如果继续向企业级演进,建议优先补以下能力:
1. 文档权限过滤
让召回阶段就遵守 ACL,而不是回答后再处理。
2. 索引版本管理
支持双索引切换、灰度回滚、增量重建。
3. 对话记忆摘要
多轮问答时,不要把整个历史硬拼进上下文,要做摘要压缩。
4. 结构化答案输出
让模型输出 JSON Schema,方便前端渲染卡片、来源、置信度。
5. RAG 评测平台
把评测变成日常工程动作,而不是临时脚本。
二十四、全文总结
从架构视角看,本地 AI 搜索引擎的关键,不是“把 Ollama 跑起来”,而是把下面四件事真正做好:
- 1. 检索做对:混合召回、查询改写、精排去重、权限过滤
- 2. 生成做稳:上下文预算、引用约束、流式输出、降级策略
- 3. 工程做实:缓存、限流、熔断、观测、评测、回放
- 4. 架构做活:模型解耦、索引解耦、服务拆分、可灰度演进
如果只做 Demo,你会得到一个“偶尔惊艳”的系统。
如果按本文的思路做工程化升级,你更有机会得到一个“长期可维护、可持续优化、能真正服务业务”的生产级 AI 搜索平台。
附:一套推荐的技术选型组合
如果你希望快速落地,同时保留后续扩展空间,我会推荐下面这套组合:
| | |
|---|
| | |
| | |
| | |
| BGE Reranker / BCE Reranker | |
| | |
| | |
| PostgreSQL FTS / OpenSearch | |
| | |
| Prometheus + Grafana + OTel | |
| | |
如果是我来落地这类系统,通常会分三步走:
- • 第一步:FastAPI + Ollama + SearXNG + Redis,先跑通主链路
- • 第二步:补向量检索、Rerank、离线入库和评测
这个演进路径足够务实,也足够贴近真实项目。