《LanceDB:从上手到内核》系列 · 第 2 篇
分析基准:LanceDB Python SDK 0.30.2 + Lance crate 6.0.0-beta.7(Cargo.toml workspace 一致)实测环境:macOS 15 · Python 3.13.6 · pyarrow 24.0.0 · numpy 2.4.4 · pandas 3.0.2
一句话速读
- • 建表:三种方式(List[Dict] / Schema / DataFrame);生产环境推荐显式 Schema
- • CRUD:
add / update / delete / count_rows;删除是软删除,物理释放走 optimize() - • Upsert:
merge_insert("key") 一条命令搞定,支持三种组合策略 - • 三种检索:向量(ANN)、全文(Native FTS)、混合(Hybrid + RRF 重排)
- • 异步 API:
connect_async → AsyncConnection → AsyncTable,功能对等
一个下午搭好 RAG 知识库
第 1 篇 30 秒跑通了基础向量检索。真实的 RAG 应用还需要:向量 + 全文 + 元数据过滤 + 增删改查 + 自动向量化。本文把 Python SDK 从头到尾过一遍。
完整 CRUD:增删改查
建表:三种方式
import lancedbimport numpy as npimport pandas as pdimport pyarrow as padb = lancedb.connect("/tmp/lancedb_tutorial")rng = np.random.default_rng(42)# 方式 1:从 List[Dict] 推断 Schema(最简单)data = [ {"id": 1, "text": "hello world", "category": "tech","vector": rng.random(384).tolist()}, {"id": 2, "text": "goodbye world", "category": "news","vector": rng.random(384).tolist()},]table = db.create_table("docs", data=data, mode="overwrite")# 方式 2:显式指定 Schema(生产推荐)schema = pa.schema([ pa.field("id", pa.int64()), pa.field("text", pa.string()), pa.field("category", pa.string()), pa.field("vector", pa.list_(pa.float32(), 384)),])table = db.create_table("docs_v2", schema=schema, mode="overwrite")# 方式 3:从 Pandas DataFramedf = pd.DataFrame({"id": range(100),"text": [f"doc {i}"for i inrange(100)],"vector": [rng.random(384).tolist() for _ inrange(100)],})table = db.create_table("docs_v3", data=df, mode="overwrite")
⚠️ 方式 1 的坑:Schema 是从第一条数据推断的。若首条某字段为 None,类型会被推断为错误类型(例如 null 而非 int64)。生产环境建议用方式 2。
写入:append / overwrite
# 追加(默认 mode="append")table.add([ {"id": 3, "text": "new doc", "category": "tech","vector": rng.random(384).tolist()},])# 覆盖(清空旧数据后写入)new_batch = [{"id": 100, "text": "reset", "category": "tech","vector": rng.random(384).tolist()}]table.add(new_batch, mode="overwrite")
LanceDB 接受多种输入格式:
| |
List[Dict] | |
pd.DataFrame | |
pa.Table | |
pl.DataFrame | Polars(需 pip install polars) |
pa.RecordBatchReader | |
更新和删除
# 更新:SQL 谓词 + 新值table.update(where="id = 1", values={"text": "updated doc"})# 删除:SQL 谓词table.delete("id > 100")# 统计print(table.count_rows()) # 总行数print(table.count_rows("category = 'tech'")) # 带过滤
软删除语义:delete() 只是在新版本的 Manifest 中标记行为已删除,旧 Fragment 仍在磁盘上。要真正释放空间,调用 table.optimize()(内部走 Lance 的 compact_files)。
Upsert:merge_insert
传统做法"先查 → 存在则 update,不存在则 insert"需要两次操作。LanceDB 用一条 merge_insert 搞定:
new_data = pa.table({"id": pa.array([1, 999], type=pa.int64()),"text": ["updated again", "brand new doc"],"category": ["tech", "news"],"vector": pa.array( [rng.random(384).tolist() for _ inrange(2)],type=pa.list_(pa.float32(), 384), ),})( table.merge_insert("id") # 以 id 列为 join key .when_matched_update_all() # 匹配行 → 更新全部列 .when_not_matched_insert_all() # 未匹配 → 插入新行 .execute(new_data))
三种策略可以自由组合(python/python/lancedb/merge.py):
| |
when_matched_update_all(where=?) | join 命中 → 用源行覆写目标行所有列;可加 where 条件 |
when_not_matched_insert_all() | |
when_not_matched_by_source_delete(where=?) | |
三种检索模式
模式 1:向量检索(ANN)
query_vec = rng.random(384).astype("float32")results = ( table.search(query_vec) .limit(10) .to_pandas())# 返回列:id, text, category, vector, _distance
_distance 默认是 L2 平方距离(未开方,见第 1 篇脚注)。越小越相似。
带过滤的向量检索(RAG 刚需):
results = ( table.search(query_vec) .where("category = 'tech' AND id > 10", prefilter=True) .select(["id", "text", "_distance"]) .limit(5) .to_pandas())
关于 prefilter(python/python/lancedb/query.py:881,默认 True):
| | |
prefilter=True | | 最终一定满足过滤条件;若过滤后候选太少,可能返回不足 limit 条 |
prefilter=False | | top-K 中不满足条件的被丢弃,返回行数可能 <limit |
常用 SQL 过滤语法:
"score > 0.8"# 比较"year BETWEEN 2020 AND 2024""category = 'tech' AND score > 0.8"# 逻辑"category IN ('tech', 'news')"# 集合"description IS NOT NULL"# 空值"title LIKE '%python%'"# 模糊(大小写敏感)"title ILIKE '%PYTHON%'"# 模糊(大小写不敏感)
模式 2:全文检索(Native FTS)
向量找"语义相似",全文找"关键词命中"。两者互补。
# 1. 建 FTS 索引# ⚠️ 当前 Native FTS 每次只能对单列建索引,多列需逐个调用# ⚠️ PhraseQuery / 短语匹配需要 with_position=Truetable.create_fts_index("text", with_position=True)# 2. 字符串检索(最简用法)results = ( table.search("machine learning", query_type="fts") .limit(10) .to_pandas())# 返回列含 _score(BM25 分数,越高越相关)
更精细的查询构造(注意 MatchQuery / PhraseQuery 的 column 是必需参数,python/python/lancedb/query.py:180-230):
from lancedb.query import MatchQuery, PhraseQuery# 模糊匹配(fuzziness 控制编辑距离)mq = MatchQuery("machne learning", column="text", fuzziness=1)results = table.search(mq, query_type="fts").limit(10).to_pandas()# 短语精确匹配(需要 with_position=True 的索引)pq = PhraseQuery("machine learning", column="text", slop=0)results = table.search(pq, query_type="fts").limit(10).to_pandas()# 布尔组合(& 为 AND,| 为 OR)boolean = MatchQuery("python", "text") & MatchQuery("database", "text")results = table.search(boolean, query_type="fts").limit(10).to_pandas()
关于 use_tantivy:旧版基于 tantivy-py 的 FTS 已不再推荐,create_fts_index(..., use_tantivy=True) 在新 SDK 中会直接报错提示改用 Native FTS。
模式 3:混合检索(Hybrid)
杀手级功能。用户搜"Python 异步编程",纯向量可能返回 JavaScript 的异步内容(语义相近),纯关键词可能漏掉"asyncio 协程"。混合检索两者兼顾。
用法 A · 显式双路(无 Embedding Function 的表):
from lancedb.rerankers import RRFRerankerquery_vec = rng.random(384).astype("float32")results = ( table.search(query_type="hybrid") .vector(query_vec) # 显式给向量分支的 query .text("异步编程") # 显式给 FTS 分支的 query .rerank(RRFReranker(K=60)) .limit(10) .to_pandas())# 返回列含 _relevance_score
用法 B · 绑定 Embedding Function 的表(字符串 query 自动走两路):
# 前提:表的 schema 用 LanceModel 定义了 SourceField + VectorField(下一节示例)results = ( table.search("Python 异步编程", query_type="hybrid") .rerank(RRFReranker(K=60)) .limit(10) .to_pandas())
混合检索执行流程(rust/lancedb/src/query.rsexecute_hybrid):
- 1. 并行执行向量检索 + 全文检索(Rust 端
try_join!) - 3. RRF 重排(
rerankers/rrf.py,默认 K=60):score_i = Σ_branch 1 / (rank_branch(i) + K)
K=60 的由来:Cormack et al., SIGIR 2009 的推荐值。大多数场景无需调整;若想突出"排名靠前的文档"权重,减小 K。
想看所有中间分数(调试用):
results = ( table.search(query_type="hybrid") .vector(query_vec).text("异步编程") .rerank(RRFReranker(return_score="all")) .limit(10) .to_pandas())# 列含:_distance(向量分支)、_score(FTS 分支)、_relevance_score(RRF 融合)
多种输出格式
q = table.search(query_vec).limit(10)df = q.to_pandas() # Pandas DataFrame(最常用)arrow_table = q.to_arrow() # PyArrow Table(零拷贝,最快)records = q.to_list() # List[Dict](最灵活)# polars_df = q.to_polars() # 需要 pip install polars
下游是 PyTorch / TF Dataset 时推荐 to_arrow()——Arrow → NumPy 零拷贝。
完整 RAG 示例(本地可跑,不依赖云 API)
下面用一个本地注册的 "dummy embedding function" 跑通全流程;把 dummy-example 换成 openai / sentence-transformers / ollama / bedrock-text 等任一已注册实现即可上生产:
import numpy as npimport lancedbfrom lancedb.embeddings.base import TextEmbeddingFunctionfrom lancedb.embeddings.registry import register, get_registryfrom lancedb.pydantic import LanceModel, Vectorfrom lancedb.rerankers import RRFReranker# 1. 注册一个本地 embedding(生产直接用 @register("openai"))@register("dummy-example")classDummyEmbedding(TextEmbeddingFunction):defndims(self) -> int:return8defgenerate_embeddings(self, texts):return [[float((hash((t, i)) % 1000)) / 1000.0for i inrange(8)]for t in texts]emb = get_registry().get("dummy-example").create()# 2. 声明 schema(content 自动算向量,写入时无需手动 encode)classDocument(LanceModel): doc_id: str title: str content: str = emb.SourceField() vector: Vector(emb.ndims()) = emb.VectorField()# 3. 建表 + 写入 + 建 FTSdb = lancedb.connect("/tmp/lancedb_rag_demo")table = db.create_table("knowledge", schema=Document, mode="overwrite")table.add([ {"doc_id": "1", "title": "LanceDB 简介","content": "LanceDB is a serverless vector database on Lance format"}, {"doc_id": "2", "title": "向量索引","content": "LanceDB supports IVF-PQ and IVF-HNSW-SQ vector indices"},])table.create_fts_index("content", with_position=True)# 4. 字符串 query 自动走 Embedding Function → 混合检索results = ( table.search("vector database", query_type="hybrid") .rerank(RRFReranker(K=60)) .limit(3) .to_list())for r in results:print(r["title"], "·", r["content"])# 5. 把 results 拼 context 交给 LLM(此处省略)
SourceField() + VectorField() 的声明式设计把"文本 → 向量"这一步托管给 LanceDB;插入和查询时都自动调用对应的 Embedding 实现。
异步 API
应用是异步的(FastAPI、aiohttp 等)时,用 connect_async:
import asyncioimport lancedbasyncdefmain(): db = await lancedb.connect_async("/tmp/lancedb_async_demo") data = [{"id": i, "vector": [float(i % 10)] * 128} for i inrange(100)] table = await db.create_table("async_table", data=data)# 异步向量检索 r = await table.query().nearest_to([0.0] * 128).limit(5).to_arrow()print(f"async 返回 {r.num_rows} 行")# 版本管理(异步) version = await table.version()await table.checkout(version)await table.checkout_latest()asyncio.run(main())
同步 / 异步 API 功能对等:Table ↔ AsyncTable,Connection ↔ AsyncConnection。
小结
- 1. 建表三种方式:List[Dict](方便)、Schema(生产推荐)、DataFrame
- 2. 软删除 +
optimize():磁盘释放走定期压缩 - 3.
merge_insert 三策略:matched_update_all / not_matched_insert_all / not_matched_by_source_delete - 4. 三种检索:向量、Native FTS(单列建索引、PhraseQuery 需
with_position=True)、Hybrid - 5. 混合检索两种写法:无 Embedding 时
.vector().text() 显式双路;有 Embedding 时字符串 query 自动双路
延伸阅读
- • Table API:
python/python/lancedb/table.py - • 查询构造器:
python/python/lancedb/query.py(MatchQuery / PhraseQuery / BooleanQuery 定义在 180-330 行) - • MergeInsert:
python/python/lancedb/merge.py - • RRF 重排:
python/python/lancedb/rerankers/rrf.py(默认 K=60) - • Rust 内核混合执行:
rust/lancedb/src/query.rs - • 可复现示例:
verified-examples/02/test_python_guide.py - • 下一篇:第 3 篇 —— 多语言 SDK 与生态集成(TypeScript / Rust / Java 现状 + LangChain / LlamaIndex)