1. 掌握 LLM-as-Judge 评估模式:用 LLM 对检索结果打分
2. 掌握查询重写策略:换关键词 / 加限定词 / 换角度
3. 掌握自适应检索器的封装:检索 → 评估 → 重写 → 重试
4. 理解 Token 预算控制:重试次数限制,防止无限循环
掌握自适应检索器的封装:检索 → 评估 → 重写 → 重试
Agentic RAG 的检索评估器:
检索评估器负责实时判断召回内容的相关性与充分性
Agentic RAG 的自适应检索器:
自适应检索器则根据评估反馈动态调整查询策略或切换知识源,两者协同让 RAG 从单次被动检索进化为具备反思和自主决策能力的智能体
3.1:🧠 检索评估器
根据用户问题和检索到的上下文,评估检索结果的相关度

3.2:🔀 自适应检索器
重写查询后重新检索,再评估,不够再重写……形成循环。但必须有重试上限(`max_retries`),防止无限循环。

3.3:LangGraph Agentic RAG 状态机流程图

3.4:三种查询重写策略


4.2 检索质量评估器

4.3 自适应检索器

4.4 演示 1:检索质量评估器

4.5 演示 2:自适应检索器

注意事项:
前置条件:
- 运行过之前文章代码(chroma_db 目录已存在)
- ollama pull nomic-embed-text
- ollama 模型 qwen3:4b 可用
# -*- coding: utf-8 -*-"""Day9 · 检索质量评估 + 动态重试================================任务编号:Day9-02执行顺序:2/3(Agentic RAG 进阶:检索评估器 + 自适应检索器)学习目标:1. 掌握 LLM-as-Judge 评估模式:用 LLM 对检索结果打分2. 掌握查询重写策略:换关键词 / 加限定词 / 换角度3. 掌握自适应检索器的封装:检索 → 评估 → 重写 → 重试4. 理解 Token 预算控制:重试次数限制,防止无限循环前置条件:- 完成 Day8-01(chroma_db 目录已存在)- ollama pull nomic-embed-text- ollama 模型 qwen3:4b 可用核心模式:RetrievalResult → 标准化检索结果(文档 + 评分 + 充分性)RetrievalEvaluator → LLM 评估 + 查询重写AdaptiveRetriever → 自适应检索(检索 → 评估 → 重试循环)"""from pathlib import Pathimport sys as _sysfrom dataclasses import dataclass, field_SCRIPT_DIR = Path(__file__).parent_DAY08_DIR = _SCRIPT_DIR.parent / "day08"_sys.path.insert(0, str(_SCRIPT_DIR))# ============================================================# 数据类:标准化的检索结果# ============================================================@dataclassclass RetrievalResult:"""封装一次检索的结果 + 评估信息。字段说明:documents:检索到的文档列表(langchain_core.documents.Document)scores:各文档的相关度评分(0-1),由 LLM 评估avg_score:平均评分,用于判断整体检索质量is_sufficient:是否信息充分(avg_score >= threshold)query_used:本次检索使用的查询(可能被重写)attempt:第几次尝试(从 1 开始)为什么需要标准化?- 传统 RAG 只返回文档列表,没有质量信息- Agentic RAG 需要评估结果来决定是否重试- 标准化后可以传递给 LangGraph 的状态(Day9-03)"""documents: list = field(default_factory=list) # list[Document]scores: list = field(default_factory=list) # list[float]avg_score: float = 0.0is_sufficient: bool = Falsequery_used: str = ""attempt: int = 1def summary(self) -> str:"""返回简要摘要,用于日志输出"""status = "充分" if self.is_sufficient else "不足"return f"第{self.attempt}轮 | query=\"{self.query_used[:30]}\" | avg={self.avg_score:.2f} | {status}"# ============================================================# 检索质量评估器# ============================================================class RetrievalEvaluator:"""用 LLM 评估检索结果的质量,并重写查询以提升检索效果。核心方法:evaluate():LLM 对检索结果逐条打分,判断整体是否充分rewrite_query():LLM 重写查询,提升下次检索的召回率为什么用 LLM 评估而不是规则?- 规则评估(如 distance 阈值)只能判断语义相似度- LLM 评估能判断"信息是否充分回答问题"(语义理解能力)- 生产环境常用模式:LLM-as-Judge"""def __init__(self, model: str = "qwen3:4b"):"""初始化评估器。参数:model:Ollama 模型名称"""self.model = modeldef evaluate(self, query: str, documents: list, threshold: float = 0.6) -> RetrievalResult:"""评估检索结果质量:LLM 逐条打分 + 判断整体充分性。流程:1. 将文档内容拼接为上下文2. LLM 评估每个文档的相关度(0-1)3. 计算平均分,判断是否充分参数:query:用户原始查询documents:检索到的文档列表(list[dict],每个含 content/metadata/distance)threshold:充分性阈值(avg_score >= threshold 视为充分)返回:RetrievalResult 实例"""from langchain_ollama import ChatOllamaimport reif not documents:return RetrievalResult(documents=[], scores=[], avg_score=0.0,is_sufficient=False, query_used=query,)# 拼接文档内容docs_text = "\n".join(f"[文档{i+1}] {doc.get('content', '')[:200]}"for i, doc in enumerate(documents))eval_prompt = f"""你是一个检索质量评估器。根据用户问题和检索到的文档,评估每个文档的相关度。【用户问题】{query}【检索到的文档】{docs_text}请对每个文档评分(0到1之间),格式如下:文档1: 0.X文档2: 0.X...评分标准:- 0.8-1.0:文档直接回答了问题- 0.6-0.8:文档部分相关- 0.4-0.6:文档有一定关联但缺少关键信息- 0.0-0.4:文档与问题基本无关只输出评分,每行一个,不要其他内容:"""try:llm = ChatOllama(model=self.model, temperature=0, num_ctx=4096)response = llm.invoke(eval_prompt)score_text = response.content.strip()# 解析评分scores = []for line in score_text.split("\n"):match = re.search(r'(\d+\.?\d*)', line)if match:s = float(match.group(1))scores.append(max(0.0, min(1.0, s)))# 如果评分数量不够,用默认值补齐while len(scores) < len(documents):scores.append(0.5)avg_score = sum(scores) / len(scores) if scores else 0.0except Exception as e:print(f" [评估异常] {e},使用默认评分")scores = [0.5] * len(documents)avg_score = 0.5return RetrievalResult(documents=documents,scores=scores,avg_score=avg_score,is_sufficient=avg_score >= threshold,query_used=query,)def rewrite_query(self, original_query: str, failed_query: str, eval_score: float) -> str:"""LLM 重写查询,提升下次检索的召回率。重写策略(让 LLM 从三个角度思考):1. 添加更具体的关键词("AI" → "Agentic RAG 自适应检索")2. 换一种表述方式("怎么让 AI 查资料" → "Agent 检索决策机制")3. 拆分为更聚焦的子问题参数:original_query:用户原始查询failed_query:上次检索失败的查询(可能已经被重写过)eval_score:上次评估的评分返回:重写后的查询字符串"""from langchain_ollama import ChatOllamarewrite_prompt = f"""你是一个查询优化专家。用户提出了一个问题,但当前检索结果不够相关。【原始问题】{original_query}【当前检索查询】{failed_query}【检索结果评分】{eval_score:.2f}(满分 1.0,低于 0.6 表示信息不充分)请从以下三个角度之一重写检索查询:1. 添加更具体的关键词(如"AI查资料" → "Agentic RAG 自适应检索机制")2. 换一种表述方式(如"怎么让AI自己决定" → "Agent 检索决策机制")3. 拆分为更聚焦的子问题只输出重写后的查询,不要其他内容:"""try:llm = ChatOllama(model=self.model, temperature=0.7, num_ctx=4096)response = llm.invoke(rewrite_prompt)new_query = response.content.strip()# 去除可能的引号包裹new_query = new_query.strip('"\'""''')if len(new_query) < 3:return failed_query # 重写太短,保持原查询return new_queryexcept Exception as e:print(f" [重写异常] {e},保持原查询")return failed_query# ============================================================# 自适应检索器# ============================================================class AdaptiveRetriever:"""自适应检索器:检索 → 评估 → 重写 → 重试的完整循环。这是 Agentic RAG 的核心实现,封装了:- 底层向量检索(Chroma)- 检索质量评估(RetrievalEvaluator)- 查询重写(RetrievalEvaluator.rewrite_query)- 重试控制(max_retries + Token 预算)对比 Day8 的传统 RAG:Day8:检索一次 → 返回结果Day9:检索 → 评估 → 不够则重写 → 再检索 → 循环"""def __init__(self,collection,evaluator: RetrievalEvaluator | None = None,max_retries: int = 3,score_threshold: float = 0.6,top_k: int = 3,):"""初始化自适应检索器。参数:collection:Chroma Collection 对象evaluator:检索评估器(默认使用 RetrievalEvaluator)max_retries:最大重试次数score_threshold:评分阈值(低于此值触发重试)top_k:每次检索的文档数量"""self.collection = collectionself.evaluator = evaluator or RetrievalEvaluator()self.max_retries = max_retriesself.score_threshold = score_thresholdself.top_k = top_kdef retrieve(self, query: str) -> tuple[RetrievalResult, list[RetrievalResult]]:"""核心方法:自适应检索,返回最佳结果和全部历史。流程:1. 执行向量检索2. 评估检索质量3. 质量不足 → 重写查询 → 重新检索(循环)4. 质量充足或达最大重试次数 → 返回结果参数:query:用户查询返回:(best_result, all_results)best_result:评分最高的 RetrievalResultall_results:所有轮次的 RetrievalResult 列表(用于调试)"""current_query = queryall_results = []best_result = Nonefor attempt in range(1, self.max_retries + 1):# 步骤 1:向量检索docs = self._do_vector_search(current_query)# 步骤 2:评估检索质量result = self.evaluator.evaluate(query, docs, self.score_threshold)result.query_used = current_queryresult.attempt = attemptall_results.append(result)print(f" {result.summary()}")# 更新最佳结果if best_result is None or result.avg_score > best_result.avg_score:best_result = result# 步骤 3:判断是否需要重试if result.is_sufficient:print(f" [OK] 检索质量充分,无需重试")breakif attempt >= self.max_retries:print(f" [WARN] 已达最大重试次数 {self.max_retries},使用最佳结果")break# 步骤 4:重写查询print(f" [重写] 评分 {result.avg_score:.2f} < {self.score_threshold},重写查询...")new_query = self.evaluator.rewrite_query(query, current_query, result.avg_score)print(f" [重写] \"{current_query[:40]}\" → \"{new_query[:40]}\"")current_query = new_query# 如果所有结果都不充分,返回评分最高的那个if best_result is None:best_result = RetrievalResult(query_used=query, attempt=0)return best_result, all_resultsdef _do_vector_search(self, query: str) -> list[dict]:"""底层向量检索:Chroma query。封装嵌入和查询逻辑,与评估逻辑解耦。注意:每次检索都重新实例化 OllamaEmbeddings,因为 OllamaEmbeddings 不是 Pydantic 字段(Day8 踩坑经验)。参数:query:查询字符串返回:list[dict],每个 dict 含 content / metadata / distance"""from langchain_ollama import OllamaEmbeddingsembeddings = OllamaEmbeddings(model="nomic-embed-text")query_vector = embeddings.embed_query(query)results = self.collection.query(query_embeddings=[query_vector],n_results=self.top_k,include=["documents", "metadatas", "distances"],)docs = []raw_docs = results.get("documents", [[]])[0]raw_metas = (results.get("metadatas") or [[]])[0]raw_dists = (results.get("distances") or [[]])[0]for i, content in enumerate(raw_docs):meta = raw_metas[i] if i < len(raw_metas) else {}distance = raw_dists[i] if i < len(raw_dists) else Nonedocs.append({"content": content,"metadata": meta,"distance": distance,})return docs# ============================================================# 工具函数:加载 Collection# ============================================================def _load_collection(persist_directory: str = None, collection_name: str = "day08_rag"):"""加载 Day8 的 Chroma Collection(同 01_agentic_rag_basics.py)"""import chromadbif persist_directory is None:persist_directory = str(_DAY08_DIR / "chroma_db")client = chromadb.PersistentClient(path=persist_directory)collection = client.get_collection(name=collection_name)print(f" [OK] Collection '{collection_name}' 加载成功,共 {collection.count()} 条记录")return collection# ============================================================# 演示 1:检索质量评估器# ============================================================def demo_evaluator():"""演示 RetrievalEvaluator 的评估能力。对比两种查询的评估结果:1. 好查询:"LangGraph 的核心概念是什么?"→ 检索结果应高度相关,avg_score 应较高2. 差查询:"今天中午吃什么?"→ 检索结果应不相关,avg_score 应较低"""print("\n" + "=" * 60)print("Day9-02a · 检索质量评估器演示")print("=" * 60)# 加载 Collectionprint("\n[初始化] 加载 Collection...")try:collection = _load_collection()except Exception as e:print(f"[错误] 无法加载 Collection:{e}")returnevaluator = RetrievalEvaluator()# 简单向量检索(不用自适应)from langchain_ollama import OllamaEmbeddingsdef simple_search(query: str) -> list[dict]:embeddings = OllamaEmbeddings(model="nomic-embed-text")qv = embeddings.embed_query(query)results = collection.query(query_embeddings=[qv], n_results=3,include=["documents", "metadatas", "distances"],)docs = []raw_docs = results.get("documents", [[]])[0]raw_metas = (results.get("metadatas") or [[]])[0]raw_dists = (results.get("distances") or [[]])[0]for i, content in enumerate(raw_docs):meta = raw_metas[i] if i < len(raw_metas) else {}docs.append({"content": content, "metadata": meta, "distance": raw_dists[i] if i < len(raw_dists) else None})return docs# 测试test_queries = [("好查询", "LangGraph 的核心概念是什么?"),("差查询", "今天中午吃什么?"),]for label, query in test_queries:print(f"\n[{label}] \"{query}\"")docs = simple_search(query)result = evaluator.evaluate(query, docs)print(f" 评分详情:")for i, (doc, score) in enumerate(zip(result.documents, result.scores)):content = doc.get("content", "")[:50]print(f" 文档{i+1}: score={score:.2f} | {content}...")print(f" 平均分:{result.avg_score:.2f} | 充分性:{'是'if result.is_sufficient else'否'}")print("\n" + "=" * 60)print("评估器演示完成!")print("=" * 60)# ============================================================# 演示 2:自适应检索器# ============================================================def demo_adaptive_retriever():"""演示 AdaptiveRetriever 的自适应检索能力。对比两种查询:1. 精确查询:直接命中,无需重试2. 模糊查询:需要重写查询,多轮检索"""import timeprint("\n" + "=" * 60)print("Day9-02b · 自适应检索器演示")print("=" * 60)# 加载 Collectionprint("\n[初始化] 加载 Collection...")try:collection = _load_collection()except Exception as e:print(f"[错误] 无法加载 Collection:{e}")return# 创建自适应检索器retriever = AdaptiveRetriever(collection=collection,max_retries=3,score_threshold=0.6,top_k=3,)# 测试test_cases = [{"query": "MCP 协议的作用是什么?","desc": "精确查询 → 预期一轮通过",},{"query": "怎么让 AI 自己决定要不要查资料?","desc": "模糊查询 → 预期需要重写",},{"query": "什么是向量数据库的混合检索?","desc": "中等查询 → 可能需要 1-2 轮",},]for i, case in enumerate(test_cases, 1):print(f"\n{'=' * 60}")print(f"测试 {i}/{len(test_cases)}:{case['desc']}")print(f"查询:{case['query']}")print("-" * 40)start = time.time()best, all_results = retriever.retrieve(case["query"])elapsed = time.time() - startprint(f"\n [结果]")print(f" 最佳评分:{best.avg_score:.2f}")print(f" 检索轮次:{len(all_results)}")print(f" 充分性:{'是'if best.is_sufficient else'否'}")print(f" 耗时:{elapsed:.2f}s")# 打印检索历史if len(all_results) > 1:print(f"\n [检索历史]")for r in all_results:print(f" {r.summary()}")print("\n" + "=" * 60)print("自适应检索器演示完成!")print("=" * 60)print("\n关键收获:")print(" 1. RetrievalResult 标准化了检索结果(文档 + 评分 + 充分性)")print(" 2. RetrievalEvaluator 用 LLM-as-Judge 模式评估检索质量")print(" 3. AdaptiveRetriever 封装了检索 → 评估 → 重写的完整循环")print(" 4. Token 预算控制:max_retries 防止无限循环")if __name__ == "__main__":demo_evaluator()demo_adaptive_retriever()
5.1运行结果

