第二十四章 多模态RAG与向量数据库进阶¶
⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。
从文本到图像、从向量检索到SQL查询——全面掌握进阶RAG技术
学习时间: 8-10小时
难度级别: ⭐⭐⭐⭐ 中高级
前置知识: RAG基础(第5章)、向量数据库(第6章)、高级RAG(第18章)
学习目标: 掌握多模态Embedding、Milvus向量数据库、Text2SQL、RAG评估等进阶技术
📖 章节导读¶
传统RAG只处理文本,但真实文档中充满了图表、公式和表格。本章从多模态Embedding出发,结合Milvus向量数据库和Text2SQL技术,构建能处理任意数据类型的进阶RAG系统,并掌握系统评估方法。
1. 多模态RAG概述¶
1.1 为什么需要多模态RAG¶
传统纯文本RAG的局限:
| 问题 | 场景 |
|---|---|
| 图像信息丢失 | 技术文档中的架构图、流程图无法被检索 |
| 表格数据失真 | PDF表格转文本后结构混乱 |
| 跨模态查询无法处理 | "找到和这张图类似的文档" |
| 公式无法理解 | 数学论文中LaTeX公式被转为乱码 |
1.2 统一检索架构¶
用户查询 (文本/图像)
        ↓
多模态Embedding模型 (CLIP / BGE-M3)
        ↓
    统一向量空间
        ↓
   ┌─────┴─────┐
   ↓           ↓
文本向量索引    图像/表格向量索引
   ↓           ↓
   └─────┬─────┘
        ↓
混合检索 + 重排序
        ↓
多模态上下文 → LLM生成
1.3 核心挑战与方案¶
| 挑战 | 方案 |
|---|---|
| 不同模态如何统一表示 | CLIP等多模态Embedding将文本和图像映射到同一向量空间 |
| 图像如何参与RAG | 方案A:用Vision LLM生成图像描述再检索;方案B:直接用多模态Embedding |
| 表格如何处理 | 转为Markdown/HTML保留结构,或直接Text2SQL查询 |
| 检索质量如何保证 | 多路召回 + Cross-Encoder重排序 |
2. 多模态Embedding¶
2.1 CLIP模型原理¶
CLIP (Contrastive Language-Image Pre-training) 是OpenAI开发的多模态模型,通过对比学习将文本和图像映射到同一向量空间。
核心思想:

- 正样本对(匹配的文本-图像对)向量距离拉近
- 负样本对(不匹配的)向量距离推远
- 训练完成后,文本向量和图像向量可直接计算相似度
"""CLIP模型基础用法"""
from transformers import CLIPProcessor, CLIPModel
from PIL import Image
import torch
import numpy as np
# 加载模型
model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
# --- 图像编码 ---
def encode_image(image_path: str) -> np.ndarray:
image = Image.open(image_path).convert("RGB")
inputs = processor(images=image, return_tensors="pt")
with torch.no_grad(): # 禁用梯度计算,节省内存(推理时使用)
features = model.get_image_features(**inputs)
# L2归一化
features = features / features.norm(dim=-1, keepdim=True)
return features.squeeze().numpy()
# --- 文本编码 ---
def encode_text(text: str) -> np.ndarray:
inputs = processor(text=[text], return_tensors="pt", padding=True)
with torch.no_grad():
features = model.get_text_features(**inputs)
features = features / features.norm(dim=-1, keepdim=True)
return features.squeeze().numpy()
# --- 文本-图像相似度 ---
def text_image_similarity(text: str, image_path: str) -> float:
text_emb = encode_text(text)
image_emb = encode_image(image_path)
return float(np.dot(text_emb, image_emb))
# 示例
# score = text_image_similarity("a cat sitting on a couch", "cat.jpg")
# print(f"相似度: {score:.4f}")
2.2 BGE-M3多功能文本嵌入¶
BGE-M3是BAAI推出的多功能文本Embedding模型(M3=Multi-Lingual, Multi-Functionality, Multi-Granularity),支持多语言、多粒度和混合检索。注意:BGE-M3是纯文本模型,不能直接编码图像,但与CLIP互补——CLIP负责图像检索,BGE-M3负责高质量文本检索:
"""BGE-M3嵌入 —— 支持Dense + Sparse + ColBERT多种检索方式"""
from sentence_transformers import SentenceTransformer
import numpy as np
# 加载BGE-M3模型
model = SentenceTransformer("BAAI/bge-m3")
def get_dense_embeddings(texts: list[str]) -> np.ndarray:
"""获取Dense Embedding(标准向量检索)"""
embeddings = model.encode(
texts,
normalize_embeddings=True, # L2归一化
show_progress_bar=True,
)
return embeddings
# 示例
texts = [
"什么是Transformer架构?",
"注意力机制的计算过程",
"如何训练大语言模型",
]
embeddings = get_dense_embeddings(texts)
print(f"Embedding维度: {embeddings.shape}") # (3, 1024)
# 计算相似度矩阵
similarity_matrix = np.dot(embeddings, embeddings.T)
print("相似度矩阵:")
print(similarity_matrix.round(3))
💡 维度说明:BGE-M3默认输出1024维向量,OpenAI text-embedding-3-small输出1536维。在实际项目中,同一个Collection必须统一维度。后续Milvus章节使用OpenAI Embedding(1536维),如需换用BGE-M3,请将dimension参数改为1024。
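除Dense向量外,BGE-M3还支持Sparse(词级权重)和ColBERT(token级多向量)输出,这需要官方的FlagEmbedding库而非sentence-transformers。下面是一个示意(需pip install FlagEmbedding,返回值键名与参数以官方文档为准):

"""BGE-M3多功能输出示意(基于FlagEmbedding库,接口可能随版本变化)"""
from FlagEmbedding import BGEM3FlagModel

m3 = BGEM3FlagModel("BAAI/bge-m3", use_fp16=True)
output = m3.encode(
    ["什么是Transformer架构?"],
    return_dense=True,         # 稠密向量:标准ANN检索
    return_sparse=True,        # 稀疏词权重:类似BM25的精确词匹配
    return_colbert_vecs=False, # 需要token级交互时设为True
)
print(output["dense_vecs"].shape)    # (1, 1024)
print(output["lexical_weights"][0])  # {token: weight, ...} 稀疏表示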
2.3 图像嵌入实现¶
使用OpenAI的多模态能力生成图像描述,再用文本Embedding统一检索:
"""方案A:Vision LLM生成描述 + 文本Embedding(实用方案)"""
import base64
from openai import OpenAI
from pathlib import Path
client = OpenAI()
def image_to_description(image_path: str) -> str:
"""用GPT-4o-mini为图像生成详细描述"""
with open(image_path, "rb") as f: # with自动管理文件关闭
b64 = base64.b64encode(f.read()).decode("utf-8")
suffix = Path(image_path).suffix.lower()
mime_map = {".png": "image/png", ".jpg": "image/jpeg", ".jpeg": "image/jpeg"}
mime_type = mime_map.get(suffix, "image/png")
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "user",
"content": [
{
"type": "text",
"text": (
"请详细描述这张图片的内容,包括:\n"
"1. 图片类型(图表/照片/架构图/截图等)\n"
"2. 主要内容和关键信息\n"
"3. 如果是图表,描述数据趋势和关键数值\n"
"4. 如果是架构图,描述组件和关系\n"
"用中文回答,尽量详细。"
),
},
{
"type": "image_url",
"image_url": {"url": f"data:{mime_type};base64,{b64}"},
},
],
}
],
max_tokens=500,
)
return response.choices[0].message.content
def get_text_embedding(text: str) -> list[float]:
"""获取文本Embedding"""
resp = client.embeddings.create(model="text-embedding-3-small", input=[text])
return resp.data[0].embedding
# 图像处理流水线
def process_image_for_rag(image_path: str) -> dict:
"""将图像转换为可检索的文档"""
description = image_to_description(image_path)
embedding = get_text_embedding(description)
return {
"type": "image",
"source": image_path,
"description": description,
"embedding": embedding,
}
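作为对照,方案B不生成文字描述,而是直接复用2.1节的CLIP编码函数,把图像向量和文本查询向量放进同一空间比较。以下是一个小示意(image_paths为假设的示例路径):

"""方案B示意:直接用CLIP Embedding做文本→图像检索(依赖2.1节的encode_text/encode_image)"""
import numpy as np

image_paths = ["architecture.png", "sales_chart.png"]  # 假设的示例图像
image_index = np.stack([encode_image(p) for p in image_paths])  # (N, 512)

def search_images_by_text(query: str, top_k: int = 3) -> list[tuple[str, float]]:
    q = encode_text(query)               # CLIP文本向量(已L2归一化)
    scores = image_index @ q             # 归一化向量的点积 = 余弦相似度
    order = np.argsort(-scores)[:top_k]  # 按相似度降序取top-k
    return [(image_paths[i], float(scores[i])) for i in order]

# print(search_images_by_text("系统架构图"))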
2.4 文本-图像联合检索¶
"""统一检索实现:文本和图像在同一向量空间中检索"""
import numpy as np
from dataclasses import dataclass
@dataclass # @dataclass自动生成__init__等方法
class Document:
doc_id: str
doc_type: str # "text" | "image" | "table"
content: str # 原始内容或图像路径
description: str # 文本描述(图像为LLM生成的描述)
embedding: list[float]
    metadata: dict | None = None
class MultimodalIndex:
"""多模态统一索引"""
def __init__(self):
self.documents: list[Document] = []
def add_document(self, doc: Document):
self.documents.append(doc)
def search(self, query_embedding: list[float], top_k: int = 5) -> list[tuple[Document, float]]:
"""向量相似度搜索"""
q = np.array(query_embedding)
results = []
for doc in self.documents:
d = np.array(doc.embedding)
score = float(np.dot(q, d) / (np.linalg.norm(q) * np.linalg.norm(d)))
results.append((doc, score))
results.sort(key=lambda x: x[1], reverse=True) # lambda匿名函数
return results[:top_k]
def hybrid_search(
self,
query_embedding: list[float],
doc_type_filter: str | None = None,
top_k: int = 5,
) -> list[tuple[Document, float]]:
"""混合搜索:向量检索 + 标量过滤"""
q = np.array(query_embedding)
results = []
for doc in self.documents:
if doc_type_filter and doc.doc_type != doc_type_filter:
continue
d = np.array(doc.embedding)
score = float(np.dot(q, d) / (np.linalg.norm(q) * np.linalg.norm(d)))
results.append((doc, score))
results.sort(key=lambda x: x[1], reverse=True)
return results[:top_k]
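一个用玩具向量演示MultimodalIndex用法的小例子(实际项目中embedding应来自前面的Embedding模型):

"""MultimodalIndex用法演示(用3维玩具向量代替真实Embedding)"""
docs = [
    Document("t1", "text", "RAG系统架构介绍", "RAG系统架构介绍", [1.0, 0.0, 0.0]),
    Document("i1", "image", "arch.png", "系统架构图:检索与生成流程", [0.9, 0.1, 0.0]),
    Document("t2", "text", "烹饪食谱", "烹饪食谱", [0.0, 1.0, 0.0]),
]
index = MultimodalIndex()
for d in docs:
    index.add_document(d)

for doc, score in index.search([1.0, 0.05, 0.0], top_k=2):
    print(doc.doc_id, doc.doc_type, round(score, 3))  # t1和i1排在前面

# 只在图像类型中检索
hits = index.hybrid_search([1.0, 0.05, 0.0], doc_type_filter="image", top_k=1)
print(hits[0][0].description)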
3. Milvus向量数据库¶
3.1 向量数据库对比¶
| 特性 | Milvus | Chroma | Pinecone | Qdrant |
|---|---|---|---|---|
| 部署方式 | 自托管/云 | 内嵌/自托管 | 纯云 | 自托管/云 |
| 最大向量数 | 百亿级 | 百万级 | 十亿级 | 十亿级 |
| 混合检索 | ✅ 原生 | ⚠️ 有限 | ✅ | ✅ |
| 多向量字段 | ✅ | ❌ | ❌ | ✅ |
| GPU加速 | ✅ | ❌ | N/A | ❌ |
| Python SDK | pymilvus | chromadb | pinecone | qdrant-client |
| 适用场景 | 大规模生产 | 快速原型 | 全托管 | 中等规模 |
3.2 Milvus Lite安装与使用¶
Milvus Lite是轻量级版本,适合开发和测试,无需启动服务端:
"""Milvus Lite快速上手"""
from pymilvus import MilvusClient
# 创建客户端(使用本地文件存储)
client = MilvusClient("./milvus_demo.db")
# 创建Collection
client.create_collection(
collection_name="docs",
dimension=1536, # text-embedding-3-small的维度
)
# 插入数据
data = [
{"id": 1, "vector": [0.1] * 1536, "text": "什么是RAG?", "source": "doc1.pdf"},
{"id": 2, "vector": [0.2] * 1536, "text": "向量数据库原理", "source": "doc2.pdf"},
{"id": 3, "vector": [0.3] * 1536, "text": "多模态嵌入技术", "source": "doc3.pdf"},
]
client.insert(collection_name="docs", data=data)
# 搜索
results = client.search(
collection_name="docs",
data=[[0.15] * 1536], # 查询向量
limit=2, # 返回top-2
output_fields=["text", "source"], # 返回的字段
)
for result in results[0]:
print(f" ID: {result['id']}, 距离: {result['distance']:.4f}")
print(f" 文本: {result['entity']['text']}")
3.3 Collection创建与索引配置¶
"""高级Collection配置:Schema定义 + 自定义索引"""
from pymilvus import (
MilvusClient,
CollectionSchema,
FieldSchema,
DataType,
)
client = MilvusClient("./milvus_advanced.db")
# --- 方法1:使用Schema精确定义 ---
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="doc_type", dtype=DataType.VARCHAR, max_length=20),
FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=10000),
FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=500),
FieldSchema(name="page_num", dtype=DataType.INT32),
FieldSchema(name="dense_vector", dtype=DataType.FLOAT_VECTOR, dim=1536),
]
schema = CollectionSchema(fields=fields, description="多模态文档集合")
# 创建Collection
client.create_collection(
collection_name="multimodal_docs",
schema=schema,
)
# 创建索引(HNSW索引适合大多数场景)
index_params = client.prepare_index_params()
index_params.add_index(
field_name="dense_vector",
index_type="HNSW",
metric_type="COSINE",
params={
"M": 16, # 每层连接数(越大越精确,越慢)
"efConstruction": 256, # 构建时搜索宽度
},
)
client.create_index(
    collection_name="multimodal_docs",
    index_params=index_params,
)
# 用Schema方式创建的Collection需在建索引后显式加载,才能执行检索
client.load_collection(collection_name="multimodal_docs")
print("✅ Collection和索引创建完成")
3.4 向量插入与检索¶
"""完整的插入与检索流程"""
from pymilvus import MilvusClient
from openai import OpenAI
openai_client = OpenAI()
milvus_client = MilvusClient("./milvus_rag.db")
# 创建collection
milvus_client.create_collection(
collection_name="knowledge_base",
dimension=1536,
)
def embed(texts: list[str]) -> list[list[float]]:
resp = openai_client.embeddings.create(
model="text-embedding-3-small", input=texts
)
return [d.embedding for d in resp.data]
# ---- 批量插入 ----
documents = [
{"text": "RAG通过检索增强生成,提高LLM的准确性和时效性。", "source": "rag_intro.md"},
{"text": "向量数据库存储高维向量,支持近似最近邻搜索。", "source": "vector_db.md"},
{"text": "HNSW是一种基于图的近似最近邻搜索算法。", "source": "hnsw.md"},
{"text": "多模态RAG可以处理文本、图像、表格等多种数据类型。", "source": "multimodal.md"},
{"text": "Text2SQL将自然语言查询转换为SQL语句。", "source": "text2sql.md"},
]
texts = [d["text"] for d in documents]
vectors = embed(texts)
data = [
{
"id": i,
"vector": vectors[i],
"text": documents[i]["text"],
"source": documents[i]["source"],
}
for i in range(len(documents))
]
milvus_client.insert(collection_name="knowledge_base", data=data)
print(f"✅ 插入 {len(data)} 条数据")
# ---- 检索 ----
def search(query: str, top_k: int = 3) -> list[dict]:
query_vector = embed([query])[0]
results = milvus_client.search(
collection_name="knowledge_base",
data=[query_vector],
limit=top_k,
output_fields=["text", "source"],
)
return [
{
"text": r["entity"]["text"],
"source": r["entity"]["source"],
"score": r["distance"],
}
for r in results[0]
]
# 测试检索
results = search("什么是向量搜索?")
for r in results:
print(f" [{r['score']:.4f}] {r['text']} ({r['source']})")
3.5 混合检索(向量 + 标量过滤)¶
"""混合检索:向量相似度 + 标量过滤条件"""
from pymilvus import MilvusClient
client = MilvusClient("./milvus_hybrid.db")
# 创建collection
client.create_collection(
collection_name="articles",
dimension=1536,
)
# 假设已插入数据,每条包含: id, vector, text, category, date
# --- 带过滤条件的检索 ---
def hybrid_search(
query_vector: list[float],
category: str | None = None,
date_after: str | None = None,
top_k: int = 5,
) -> list:
"""混合检索:向量相似度 + 标量过滤"""
# 构建过滤表达式
filters = []
if category:
filters.append(f'category == "{category}"')
if date_after:
filters.append(f'date >= "{date_after}"')
filter_expr = " and ".join(filters) if filters else ""
results = client.search(
collection_name="articles",
data=[query_vector],
limit=top_k,
filter=filter_expr, # 标量过滤
output_fields=["text", "category", "date"],
)
return results[0]
# 使用示例
# results = hybrid_search(
# query_vector=embed(["RAG技术"])[0],
# category="AI",
# date_after="2024-01-01",
# top_k=5,
# )
3.6 性能调优¶
"""Milvus性能调优要点"""
# 1. 索引类型选择
index_configs = {
# 小规模(<100万向量):精确且快速
"small_scale": {
"index_type": "HNSW",
"metric_type": "COSINE",
"params": {"M": 16, "efConstruction": 256},
},
# 中规模(100万-1000万):平衡精度和速度
"medium_scale": {
"index_type": "IVF_FLAT",
"metric_type": "COSINE",
"params": {"nlist": 1024},
},
# 大规模(>1000万):用量化压缩内存
"large_scale": {
"index_type": "IVF_PQ",
"metric_type": "L2",
"params": {"nlist": 2048, "m": 16, "nbits": 8},
},
}
# 2. 搜索参数调优
search_params = {
"HNSW": {"ef": 128}, # ef越大越精确,越慢
"IVF_FLAT": {"nprobe": 32}, # nprobe越大越精确,越慢
"IVF_PQ": {"nprobe": 64},
}
# 3. 批量操作(比逐条插入快10-100倍)
# ✅ 推荐:批量插入
# client.insert(collection_name="docs", data=batch_data)
# ❌ 避免:逐条插入
# for item in data:
# client.insert(collection_name="docs", data=[item])
3.7 完整代码实战¶
"""完整Milvus RAG实战:从文档到检索到生成"""
from pymilvus import MilvusClient
from openai import OpenAI
import json
openai_client = OpenAI()
milvus_client = MilvusClient("./rag_production.db")
COLLECTION_NAME = "production_kb"
EMBEDDING_DIM = 1536
# ---- 初始化 ----
def init_collection():
"""初始化Collection"""
if milvus_client.has_collection(COLLECTION_NAME):
milvus_client.drop_collection(COLLECTION_NAME)
milvus_client.create_collection(
collection_name=COLLECTION_NAME,
dimension=EMBEDDING_DIM,
)
print(f"✅ Collection '{COLLECTION_NAME}' 已创建")
# ---- 数据预处理 ----
def chunk_documents(texts: list[str], chunk_size: int = 500, overlap: int = 50) -> list[dict]:
"""切分文档"""
chunks = []
for doc_idx, text in enumerate(texts): # enumerate同时获取索引和元素
start = 0
chunk_idx = 0
while start < len(text):
end = min(start + chunk_size, len(text))
chunks.append({
"text": text[start:end],
"doc_id": doc_idx,
"chunk_id": chunk_idx,
})
start += chunk_size - overlap
chunk_idx += 1
return chunks
def embed_batch(texts: list[str], batch_size: int = 100) -> list[list[float]]:
"""批量Embedding"""
all_embeddings = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
resp = openai_client.embeddings.create(
model="text-embedding-3-small", input=batch
)
all_embeddings.extend([d.embedding for d in resp.data])
return all_embeddings
# ---- 索引构建 ----
def build_index(documents: list[str]):
"""构建索引"""
init_collection()
# 切分
chunks = chunk_documents(documents)
texts = [c["text"] for c in chunks]
# Embedding
print(f"正在计算 {len(texts)} 个文本块的Embedding...")
embeddings = embed_batch(texts)
# 插入Milvus
data = [
{
"id": i,
"vector": embeddings[i],
"text": chunks[i]["text"],
"doc_id": chunks[i]["doc_id"],
"chunk_id": chunks[i]["chunk_id"],
}
for i in range(len(chunks))
]
milvus_client.insert(collection_name=COLLECTION_NAME, data=data)
print(f"✅ 已索引 {len(data)} 个文本块")
# ---- 检索与生成 ----
def retrieve_and_generate(query: str, top_k: int = 3) -> str:
"""RAG:检索 + 生成"""
# 检索
query_emb = embed_batch([query])[0]
results = milvus_client.search(
collection_name=COLLECTION_NAME,
data=[query_emb],
limit=top_k,
output_fields=["text", "doc_id", "chunk_id"],
)
contexts = []
for r in results[0]:
contexts.append(r["entity"]["text"])
context_text = "\n---\n".join(contexts)
# 生成
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": (
"根据检索到的文档回答问题。引用相关内容。"
f"\n\n【检索结果】\n{context_text}"
),
},
{"role": "user", "content": query},
],
)
return response.choices[0].message.content
# ---- 使用示例 ----
if __name__ == "__main__":
# 构建索引
sample_docs = [
"RAG(Retrieval-Augmented Generation)是一种将检索和生成结合的技术...",
"Milvus是一个开源的向量数据库,专为相似性搜索和AI应用设计...",
]
build_index(sample_docs)
# 查询
answer = retrieve_and_generate("RAG的核心原理是什么?")
print(f"\n回答:{answer}")
4. 多模态RAG实现¶
4.1 文档中的图像处理¶
"""从PDF中提取并处理图像"""
import pymupdf
from pathlib import Path
import base64
from openai import OpenAI
client = OpenAI()
def extract_pdf_content(pdf_path: str) -> list[dict]:
"""从PDF提取文本和图像"""
doc = pymupdf.open(pdf_path)
contents = []
for page_num, page in enumerate(doc):
# 提取文本
text = page.get_text()
if text.strip(): # 链式调用:strip去除空白
contents.append({
"type": "text",
"content": text.strip(),
"page": page_num + 1,
"source": pdf_path,
})
# 提取图像
images = page.get_images(full=True)
for img_idx, img in enumerate(images):
xref = img[0]
pix = pymupdf.Pixmap(doc, xref)
            if pix.n - pix.alpha > 3:  # 色彩通道数>3(如CMYK)时转为RGB;忽略alpha通道
                pix = pymupdf.Pixmap(pymupdf.csRGB, pix)
img_bytes = pix.tobytes("png")
b64 = base64.b64encode(img_bytes).decode("utf-8")
# 用Vision LLM生成图像描述
description = describe_image(b64)
contents.append({
"type": "image",
"content": b64, # base64编码的图像
"description": description,
"page": page_num + 1,
"source": pdf_path,
})
return contents
def describe_image(image_b64: str) -> str:
"""用GPT-4o生成图像描述"""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": "请详细描述这张图片的内容,用中文回答。"},
{
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{image_b64}"},
},
],
}
],
max_tokens=300,
)
return response.choices[0].message.content
4.2 表格数据向量化¶
"""表格数据处理:保留结构信息"""
import pandas as pd
from openai import OpenAI
client = OpenAI()
def table_to_searchable_text(df: pd.DataFrame, table_name: str = "") -> list[str]:
"""将表格转换为可检索的文本块"""
texts = []
# 1. 表格整体描述
overview = f"表格: {table_name}\n"
overview += f"列: {', '.join(df.columns)}\n"
overview += f"行数: {len(df)}\n"
overview += f"数据类型: {dict(df.dtypes)}\n"
texts.append(overview)
# 2. 表格Markdown格式(适合小表格)
if len(df) <= 50:
md = df.to_markdown(index=False)
texts.append(f"表格 {table_name} 的完整数据:\n{md}")
# 3. 按行转换(适合大表格)
for idx, row in df.iterrows():
row_text = f"表格 {table_name} 第{idx+1}行: "
row_text += "; ".join(f"{col}={val}" for col, val in row.items())
texts.append(row_text)
# 4. 统计摘要
numeric_cols = df.select_dtypes(include="number").columns
if len(numeric_cols) > 0:
summary = f"表格 {table_name} 统计摘要:\n"
summary += df[numeric_cols].describe().to_markdown()
texts.append(summary)
return texts
# 示例
# df = pd.read_csv("sales_data.csv")
# chunks = table_to_searchable_text(df, "销售数据")
4.3 统一索引构建¶
"""多模态统一索引:文本 + 图像 + 表格"""
from pymilvus import MilvusClient
from openai import OpenAI
import json
openai_client = OpenAI()
milvus_client = MilvusClient("./multimodal_rag.db")
COLLECTION = "multimodal_kb"
def init_multimodal_collection():
if milvus_client.has_collection(COLLECTION):
milvus_client.drop_collection(COLLECTION)
milvus_client.create_collection(
collection_name=COLLECTION,
dimension=1536,
)
def embed(texts: list[str]) -> list[list[float]]:
resp = openai_client.embeddings.create(
model="text-embedding-3-small", input=texts
)
return [d.embedding for d in resp.data]
def build_multimodal_index(contents: list[dict]):
"""构建多模态统一索引"""
init_multimodal_collection()
data = []
texts_to_embed = []
for item in contents:
if item["type"] == "text":
texts_to_embed.append(item["content"])
elif item["type"] == "image":
texts_to_embed.append(item["description"])
elif item["type"] == "table":
texts_to_embed.append(item["content"])
embeddings = embed(texts_to_embed)
for i, item in enumerate(contents):
data.append({
"id": i,
"vector": embeddings[i],
"doc_type": item["type"],
"content": item.get("description", item["content"])[:5000],
"page": item.get("page", 0),
"source": item.get("source", ""),
})
milvus_client.insert(collection_name=COLLECTION, data=data)
print(f"✅ 已索引 {len(data)} 个多模态文档块")
4.4 多模态检索与重排序¶
"""多模态检索 + Cross-Encoder重排序"""
from sentence_transformers import CrossEncoder
import numpy as np
# 加载重排序模型
reranker = CrossEncoder("BAAI/bge-reranker-v2-m3", max_length=512)
def multimodal_search_with_rerank(
query: str,
top_k: int = 5,
rerank_top_n: int = 20,
) -> list[dict]:
"""多模态检索 + 重排序"""
# Step 1: 向量检索(召回较多候选)
query_emb = embed([query])[0]
candidates = milvus_client.search(
collection_name=COLLECTION,
data=[query_emb],
limit=rerank_top_n,
output_fields=["doc_type", "content", "page", "source"],
)[0]
# Step 2: Cross-Encoder重排序
pairs = [(query, c["entity"]["content"]) for c in candidates]
rerank_scores = reranker.predict(pairs)
# Step 3: 合并分数并排序
results = []
for i, candidate in enumerate(candidates):
results.append({
"content": candidate["entity"]["content"],
"doc_type": candidate["entity"]["doc_type"],
"page": candidate["entity"]["page"],
"source": candidate["entity"]["source"],
"vector_score": candidate["distance"],
"rerank_score": float(rerank_scores[i]),
})
results.sort(key=lambda x: x["rerank_score"], reverse=True)
return results[:top_k]
4.5 生成时的多模态上下文整合¶
"""多模态RAG生成:将检索到的文本、图像描述、表格整合为上下文"""
from openai import OpenAI
client = OpenAI()
def multimodal_generate(query: str, retrieved_docs: list[dict]) -> str:
"""基于多模态检索结果生成回答"""
# 按类型组织上下文
text_contexts = []
image_contexts = []
table_contexts = []
for doc in retrieved_docs:
if doc["doc_type"] == "text":
text_contexts.append(f"[文本, 第{doc['page']}页] {doc['content']}")
elif doc["doc_type"] == "image":
image_contexts.append(f"[图像描述, 第{doc['page']}页] {doc['content']}")
elif doc["doc_type"] == "table":
table_contexts.append(f"[表格, 第{doc['page']}页] {doc['content']}")
context_parts = []
if text_contexts:
context_parts.append("## 文本内容\n" + "\n\n".join(text_contexts))
if image_contexts:
context_parts.append("## 图像内容\n" + "\n\n".join(image_contexts))
if table_contexts:
context_parts.append("## 表格数据\n" + "\n\n".join(table_contexts))
context = "\n\n".join(context_parts)
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": (
"你是一个知识库问答助手。根据检索到的多模态内容回答问题。\n"
"内容可能包含文本、图像描述和表格数据,请综合利用。\n"
"回答要准确,引用来源(页码)。\n\n"
f"【检索内容】\n{context}"
),
},
{"role": "user", "content": query},
],
)
return response.choices[0].message.content
5. Text2SQL技术¶
5.1 为什么需要Text2SQL¶
| 场景 | RAG方案 | Text2SQL方案 |
|---|---|---|
| "2024年Q3销售额多少?" | 需要索引所有数据行 | 直接生成 SELECT SUM(amount) WHERE quarter='2024Q3' |
| "哪个产品利润率最高?" | 难以精确计算 | SELECT product, MAX(profit_rate) ... |
| 聚合统计查询 | 不擅长 | ✅ 天然擅长 |
| 模糊语义查询 | ✅ 擅长 | 不擅长 |
结论:Text2SQL和RAG互补,结构化数据用SQL,非结构化数据用向量检索。
5.2 基于LLM的Text2SQL实现¶
"""Text2SQL:自然语言转SQL查询"""
import sqlite3
from openai import OpenAI
client = OpenAI()
# ---- 创建示例数据库 ----
def create_sample_db(db_path: str = "sample.db"):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS sales (
id INTEGER PRIMARY KEY,
product TEXT NOT NULL,
category TEXT NOT NULL,
amount REAL NOT NULL,
quantity INTEGER NOT NULL,
date TEXT NOT NULL,
region TEXT NOT NULL
)
""")
sample_data = [
("iPhone 15", "手机", 5999, 120, "2024-01-15", "华东"),
("MacBook Pro", "笔记本", 14999, 45, "2024-01-20", "华北"),
("iPad Air", "平板", 4799, 80, "2024-02-10", "华东"),
("AirPods Pro", "耳机", 1899, 200, "2024-02-15", "华南"),
("iPhone 15", "手机", 5999, 150, "2024-03-01", "华北"),
("MacBook Air", "笔记本", 8999, 60, "2024-03-10", "华南"),
("Apple Watch", "手表", 2999, 90, "2024-03-20", "华东"),
]
cursor.executemany(
"INSERT INTO sales (product, category, amount, quantity, date, region) VALUES (?, ?, ?, ?, ?, ?)",
sample_data,
)
conn.commit()
conn.close()
# ---- Schema提取 ----
def get_schema(db_path: str) -> str:
"""提取数据库Schema信息"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = cursor.fetchall()
schema_parts = []
for (table_name,) in tables:
cursor.execute(f"PRAGMA table_info({table_name})")
columns = cursor.fetchall()
col_defs = []
for col in columns:
col_defs.append(f" {col[1]} {col[2]}")
schema_parts.append(f"CREATE TABLE {table_name} (\n" + ",\n".join(col_defs) + "\n)")
# 获取样本数据
cursor.execute(f"SELECT * FROM {table_name} LIMIT 3")
samples = cursor.fetchall()
if samples:
col_names = [c[1] for c in columns]
schema_parts.append(f"-- 样本数据 ({table_name}):")
for sample in samples:
row = dict(zip(col_names, sample))
schema_parts.append(f"-- {row}")
conn.close()
return "\n\n".join(schema_parts)
5.3 Schema感知的Prompt设计¶
"""Schema-aware Text2SQL Prompt"""
TEXT2SQL_PROMPT = """你是一个SQL专家。根据用户的自然语言问题,生成对应的SQLite SQL查询。
### 数据库Schema:
{schema}
### 规则:
1. 只生成SELECT查询(不允许INSERT/UPDATE/DELETE)
2. 使用SQLite语法
3. 表名和列名与Schema完全一致
4. 如果问题无法用SQL回答,返回"CANNOT_ANSWER"
5. 只返回SQL语句,不要解释
### 用户问题: {question}
### SQL查询:"""
def text_to_sql(question: str, db_path: str = "sample.db") -> str:
"""自然语言转SQL"""
schema = get_schema(db_path)
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "user",
"content": TEXT2SQL_PROMPT.format(schema=schema, question=question),
}
],
temperature=0, # SQL生成用低温度确保确定性
)
sql = response.choices[0].message.content.strip()
# 清理可能的markdown代码块
sql = sql.replace("```sql", "").replace("```", "").strip()
return sql
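把5.2和5.3串起来的最小演示(生成的SQL内容取决于模型输出,注释中的SQL仅为示意):

"""Text2SQL端到端小演示"""
create_sample_db()  # 5.2节:建表并插入样本数据
sql = text_to_sql("哪个产品的销售总额最高?")
print(sql)  # 期望类似: SELECT product, SUM(amount * quantity) ... GROUP BY product ORDER BY ... LIMIT 1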
5.4 SQL验证与安全¶
"""SQL安全验证"""
import re
import sqlite3
FORBIDDEN_KEYWORDS = [
"DROP", "DELETE", "INSERT", "UPDATE", "ALTER",
"CREATE", "TRUNCATE", "EXEC", "EXECUTE",
"--", # SQL注释(可能用于注入)
]
def validate_sql(sql: str) -> tuple[bool, str]:
"""验证SQL安全性"""
upper_sql = sql.upper().strip()
# 检查是否为SELECT语句
if not upper_sql.startswith("SELECT"):
return False, "只允许SELECT查询"
# 检查禁止的关键字
for keyword in FORBIDDEN_KEYWORDS:
if keyword in upper_sql:
return False, f"包含禁止的关键字: {keyword}"
    # 检查分号(防止多语句注入):允许末尾一个分号,其余位置一律拒绝
    if sql.strip().rstrip(";").count(";") > 0:
        return False, "不允许多条SQL语句"
return True, "OK"
def safe_execute_sql(sql: str, db_path: str = "sample.db") -> dict:
"""安全执行SQL并返回结果"""
# 验证
is_valid, message = validate_sql(sql)
if not is_valid:
return {"success": False, "error": message, "data": None}
try: # try/except捕获异常,防止程序崩溃
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA query_only = ON") # 只读模式
cursor = conn.cursor()
cursor.execute(sql)
columns = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
conn.close()
return {
"success": True,
"columns": columns,
"data": [dict(zip(columns, row)) for row in rows],
"row_count": len(rows),
}
except Exception as e:
return {"success": False, "error": str(e), "data": None}
5.5 与RAG的结合(混合查询路由)¶
"""混合查询路由:自动判断使用RAG还是Text2SQL"""
from openai import OpenAI
client = OpenAI()
ROUTER_PROMPT = """判断用户的问题应该使用哪种方式回答:
1. "SQL" - 如果问题涉及结构化数据查询、统计、聚合、排序、过滤等
2. "RAG" - 如果问题涉及概念解释、原理说明、非结构化知识
只回答 "SQL" 或 "RAG",不要解释。
用户问题: {question}"""
def route_query(question: str) -> str:
"""路由查询到合适的处理方式"""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "user", "content": ROUTER_PROMPT.format(question=question)},
],
temperature=0,
max_tokens=5,
)
route = response.choices[0].message.content.strip().upper()
return "SQL" if "SQL" in route else "RAG"
def hybrid_answer(question: str, db_path: str = "sample.db") -> str:
"""混合问答:自动选择RAG或Text2SQL"""
route = route_query(question)
if route == "SQL":
# Text2SQL路径
sql = text_to_sql(question, db_path)
result = safe_execute_sql(sql, db_path)
if result["success"]:
# 将SQL结果交给LLM生成自然语言回答
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": "根据SQL查询结果,用自然语言回答用户问题。",
},
{
"role": "user",
"content": (
f"问题: {question}\n"
f"SQL: {sql}\n"
f"结果: {result['data']}"
),
},
],
)
return f"📊 [SQL查询]\n{response.choices[0].message.content}"
else:
return f"❌ SQL执行失败: {result['error']}"
else:
# RAG路径(使用之前实现的retrieve_and_generate)
answer = retrieve_and_generate(question)
return f"📚 [知识检索]\n{answer}"
# 示例
# print(hybrid_answer("2024年1月总销售额是多少?")) → SQL
# print(hybrid_answer("什么是RAG技术?")) → RAG
5.6 完整Text2SQL代码¶
"""完整的Text2SQL问答系统"""
import sqlite3
import gradio as gr
from openai import OpenAI
client = OpenAI()
def text2sql_app(question: str, db_path: str = "sample.db") -> tuple[str, str, str]:
"""Text2SQL完整流程,返回(SQL, 查询结果, 自然语言回答)"""
# 1. 生成SQL
schema = get_schema(db_path)
sql = text_to_sql(question, db_path)
if sql == "CANNOT_ANSWER":
return "无法生成SQL", "", "该问题无法用SQL回答"
# 2. 验证并执行
result = safe_execute_sql(sql, db_path)
if not result["success"]:
return sql, f"执行失败: {result['error']}", ""
# 3. 格式化结果
import pandas as pd
df = pd.DataFrame(result["data"])
result_str = df.to_markdown(index=False)
# 4. 生成自然语言回答
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": "根据SQL查询结果回答用户的问题,用中文回答,简洁准确。",
},
{
"role": "user",
"content": f"问题: {question}\nSQL: {sql}\n结果:\n{result_str}",
},
],
)
answer = response.choices[0].message.content
return sql, result_str, answer
# Gradio界面
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("# 📊 Text2SQL智能问答")
question = gr.Textbox(label="输入问题", placeholder="例如:哪个产品销量最高?")
btn = gr.Button("查询", variant="primary")
sql_output = gr.Code(label="生成的SQL", language="sql")
table_output = gr.Markdown(label="查询结果")
answer_output = gr.Textbox(label="回答", lines=3)
btn.click(fn=text2sql_app, inputs=question, outputs=[sql_output, table_output, answer_output])
gr.Examples(
examples=[
"2024年总销售额是多少?",
"哪个区域的销量最高?",
"按类别统计平均单价",
"列出所有手机类产品",
],
inputs=question,
    )

demo.launch()
6. RAG系统评估¶
6.1 RAGAS评估框架¶
RAGAS (Retrieval Augmented Generation Assessment) 是RAG系统专用评估框架:
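在手写评估器之前,先看RAGAS库本身的典型用法。下面基于其较早的0.1.x风格API,新版本接口可能不同,请以官方文档为准(需pip install ragas datasets并配置OPENAI_API_KEY):

"""RAGAS库用法示意(API随版本变化,此处为常见的0.1.x风格,仅作参考)"""
from datasets import Dataset
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall,
)

eval_data = {
    "question": ["什么是RAG?"],
    "answer": ["RAG通过检索外部文档增强LLM生成的准确性。"],
    "contexts": [["RAG(Retrieval-Augmented Generation)通过检索增强生成..."]],
    "ground_truth": ["RAG是检索增强生成技术。"],  # context_recall需要参考答案
}
dataset = Dataset.from_dict(eval_data)

result = evaluate(
    dataset,
    metrics=[faithfulness, answer_relevancy, context_precision, context_recall],
)
print(result)  # 各指标的平均分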
6.2 评估维度¶
| 维度 | 含义 | 评估什么 |
|---|---|---|
| Faithfulness | 忠实度 | 生成的回答是否忠于检索到的上下文 |
| Answer Relevancy | 回答相关性 | 生成的回答是否回答了用户的问题 |
| Context Precision | 上下文精确度 | 检索到的上下文中相关内容排名是否靠前 |
| Context Recall | 上下文召回率 | 回答所需的信息是否都被检索到了 |
6.3 自动化评估流水线¶
"""RAG评估实现(基于LLM-as-Judge)"""
from openai import OpenAI
import json
client = OpenAI()
# ---- 评估指标 ----
def evaluate_faithfulness(answer: str, contexts: list[str]) -> float:
"""评估回答对上下文的忠实度"""
context_text = "\n---\n".join(contexts)
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": (
"你是一个评估专家。判断以下回答是否完全基于给定的上下文。\n"
"评分标准:\n"
"1.0 = 回答完全基于上下文,没有编造任何信息\n"
"0.5 = 回答部分基于上下文,但有些内容无法从上下文中验证\n"
"0.0 = 回答主要基于编造的信息\n"
"只返回一个数字(0.0/0.5/1.0)。"
),
},
{
"role": "user",
"content": f"上下文:\n{context_text}\n\n回答:\n{answer}",
},
],
temperature=0,
)
try:
return float(response.choices[0].message.content.strip())
except ValueError:
return 0.5
def evaluate_answer_relevancy(question: str, answer: str) -> float:
"""评估回答对问题的相关性"""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": (
"你是一个评估专家。判断以下回答是否充分回答了用户的问题。\n"
"评分:1.0=完全回答, 0.7=大部分回答, 0.4=部分回答, 0.0=完全无关\n"
"只返回一个数字。"
),
},
{
"role": "user",
"content": f"问题:\n{question}\n\n回答:\n{answer}",
},
],
temperature=0,
)
try:
return float(response.choices[0].message.content.strip())
except ValueError:
return 0.5
def evaluate_context_precision(question: str, contexts: list[str]) -> float:
"""评估检索上下文的精确度(相关内容是否排在前面)"""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": (
"你是一个评估专家。判断以下检索结果对回答问题的有用程度。\n"
"对每个检索结果标记 1(相关)或 0(不相关)。\n"
"返回JSON数组,例如: [1, 0, 1]"
),
},
{
"role": "user",
"content": (
f"问题: {question}\n\n"
+ "\n\n".join(f"检索结果{i+1}: {c}" for i, c in enumerate(contexts))
),
},
],
temperature=0,
)
try:
relevance = json.loads(response.choices[0].message.content.strip()) # json.loads将JSON字符串→Python对象
if not relevance:
return 0.0
# Average Precision计算
hits = 0
precision_sum = 0.0
for i, rel in enumerate(relevance):
if rel == 1:
hits += 1
precision_sum += hits / (i + 1)
return precision_sum / max(sum(relevance), 1)
except (json.JSONDecodeError, ValueError):
return 0.5
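# 补充第四个维度 Context Recall(6.2表中列出但上文未实现):
# 需要人工标注的参考答案ground_truth,判断其信息是否都能被检索上下文覆盖。
# 与上面几个指标一样,这是简化的LLM-as-Judge示意。
def evaluate_context_recall(ground_truth: str, contexts: list[str]) -> float:
    """评估参考答案所需的信息是否都出现在检索上下文中"""
    context_text = "\n---\n".join(contexts)
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": (
                    "你是一个评估专家。判断参考答案中的信息有多少能被给定上下文支持。\n"
                    "评分:1.0=全部可支持, 0.5=部分可支持, 0.0=几乎不可支持。只返回一个数字。"
                ),
            },
            {
                "role": "user",
                "content": f"上下文:\n{context_text}\n\n参考答案:\n{ground_truth}",
            },
        ],
        temperature=0,
    )
    try:
        return float(response.choices[0].message.content.strip())
    except ValueError:
        return 0.5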
# ---- 综合评估 ----
def evaluate_rag_sample(
question: str,
answer: str,
contexts: list[str],
) -> dict:
"""综合评估一个RAG样本"""
scores = {
"faithfulness": evaluate_faithfulness(answer, contexts),
"answer_relevancy": evaluate_answer_relevancy(question, answer),
"context_precision": evaluate_context_precision(question, contexts),
}
scores["overall"] = sum(scores.values()) / len(scores)
return scores
def evaluate_rag_dataset(dataset: list[dict]) -> dict:
"""评估整个数据集"""
all_scores = []
for sample in dataset:
scores = evaluate_rag_sample(
question=sample["question"],
answer=sample["answer"],
contexts=sample["contexts"],
)
all_scores.append(scores)
print(f" Q: {sample['question'][:50]}... → 综合: {scores['overall']:.2f}")
# 汇总
avg_scores = {}
for key in all_scores[0]:
avg_scores[key] = sum(s[key] for s in all_scores) / len(all_scores)
return avg_scores
# 使用示例
eval_dataset = [
{
"question": "什么是RAG?",
"answer": "RAG是检索增强生成技术,通过检索相关文档来增强LLM的生成质量。",
"contexts": ["RAG(Retrieval-Augmented Generation)通过检索增强生成..."],
},
# ... 更多评估样本
]
# results = evaluate_rag_dataset(eval_dataset)
# print(f"评估结果: {results}")
7. 索引优化技术¶
7.1 HNSW vs IVF vs Flat索引¶
| 索引类型 | 原理 | 适用规模 | 精度 | 速度 | 内存 |
|---|---|---|---|---|---|
| FLAT | 暴力搜索 | <10万 | 100% | 慢 | 低 |
| IVF_FLAT | 倒排索引 + 暴力 | 10万-1000万 | 高 | 中 | 中 |
| IVF_PQ | 倒排 + 量化 | >1000万 | 中 | 快 | 低 |
| HNSW | 层级导航小世界图 | 1万-1000万 | 高 | 快 | 高 |
7.2 量化技术¶
"""向量量化对比"""
# PQ (Product Quantization) 乘积量化
# 将高维向量分段,每段独立量化
pq_index_config = {
"index_type": "IVF_PQ",
"metric_type": "L2",
"params": {
"nlist": 1024, # 聚类中心数(更多=更精确但更慢)
"m": 16, # 子向量数(维度的因子,如1536/16=96)
"nbits": 8, # 每个子向量的位数(通常8)
},
}
# 压缩比:原始1536*4=6144字节 → 16*1=16字节 ≈ 384x压缩
# SQ (Scalar Quantization) 标量量化
# 将float32量化为int8
sq_index_config = {
"index_type": "IVF_SQ8",
"metric_type": "L2",
"params": {
"nlist": 1024,
},
}
# 压缩比:4x(float32 → int8)
7.3 索引参数调优¶
"""索引参数调优指南"""
def get_optimal_params(num_vectors: int, dimension: int, priority: str = "balanced"):
"""根据数据规模和优先级推荐索引参数"""
if num_vectors < 100_000:
# 小规模:直接用HNSW
return {
"index_type": "HNSW",
"params": {"M": 16, "efConstruction": 256},
"search_params": {"ef": 128},
"notes": "小规模数据,HNSW提供最佳精度-速度平衡",
}
elif num_vectors < 10_000_000:
if priority == "accuracy":
return {
"index_type": "HNSW",
"params": {"M": 32, "efConstruction": 512},
"search_params": {"ef": 256},
"notes": "中规模+高精度需求,内存消耗较大",
}
elif priority == "speed":
return {
"index_type": "IVF_SQ8",
"params": {"nlist": 4096},
"search_params": {"nprobe": 64},
"notes": "中规模+速度优先,4x内存压缩",
}
else:
return {
"index_type": "IVF_FLAT",
"params": {"nlist": 2048},
"search_params": {"nprobe": 128},
"notes": "中规模均衡方案",
}
else:
# 大规模:必须用量化
return {
"index_type": "IVF_PQ",
"params": {"nlist": 8192, "m": dimension // 96, "nbits": 8},
"search_params": {"nprobe": 128},
"notes": "大规模数据,PQ量化显著节省内存",
}
# 示例
params = get_optimal_params(5_000_000, 1536, priority="balanced")
print(f"推荐索引: {params['index_type']}")
print(f"参数: {params['params']}")
print(f"说明: {params['notes']}")
7.4 检索性能基准测试¶
"""检索性能基准测试"""
import time
import numpy as np
from pymilvus import MilvusClient
def benchmark_search(
client: MilvusClient,
collection_name: str,
dimension: int,
num_queries: int = 100,
top_k: int = 10,
) -> dict:
"""基准测试检索性能"""
# 生成随机查询向量
query_vectors = np.random.random((num_queries, dimension)).tolist()
latencies = []
for qv in query_vectors:
start = time.perf_counter()
client.search(
collection_name=collection_name,
data=[qv],
limit=top_k,
)
elapsed = (time.perf_counter() - start) * 1000 # 毫秒
latencies.append(elapsed)
return {
"num_queries": num_queries,
"top_k": top_k,
"avg_latency_ms": np.mean(latencies),
"p50_ms": np.percentile(latencies, 50),
"p95_ms": np.percentile(latencies, 95),
"p99_ms": np.percentile(latencies, 99),
"qps": 1000.0 / np.mean(latencies),
}
# 使用
# results = benchmark_search(milvus_client, "knowledge_base", 1536)
# print(f"平均延迟: {results['avg_latency_ms']:.2f}ms")
# print(f"P95延迟: {results['p95_ms']:.2f}ms")
# print(f"QPS: {results['qps']:.1f}")
8. 实战:多模态知识库¶
8.1 处理含图表的PDF文档¶
"""端到端多模态知识库 —— 处理含图表的PDF"""
import pymupdf
import base64
from pathlib import Path
from dataclasses import dataclass, field
from openai import OpenAI
from pymilvus import MilvusClient
import numpy as np
client = OpenAI()
milvus = MilvusClient("./multimodal_kb.db")
COLLECTION = "mm_knowledge_base"
EMBED_DIM = 1536
@dataclass
class ContentBlock:
block_id: int
block_type: str # "text" | "image" | "table"
content: str # 文本内容或图像的LLM描述
raw_data: str = "" # 原始数据(图像的base64等)
page: int = 0
source: str = ""
embedding: list[float] = field(default_factory=list) # 每个实例获得独立的空列表,避免所有实例共享同一个列表
class MultimodalKnowledgeBase:
"""多模态知识库"""
def __init__(self):
self.blocks: list[ContentBlock] = []
self._init_collection()
def _init_collection(self):
if milvus.has_collection(COLLECTION):
milvus.drop_collection(COLLECTION)
milvus.create_collection(collection_name=COLLECTION, dimension=EMBED_DIM)
def _embed(self, texts: list[str]) -> list[list[float]]:
all_embs = []
for i in range(0, len(texts), 100):
batch = texts[i:i + 100]
resp = client.embeddings.create(model="text-embedding-3-small", input=batch)
all_embs.extend([d.embedding for d in resp.data])
return all_embs
def _describe_image(self, img_b64: str) -> str:
resp = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{
"role": "user",
"content": [
{"type": "text", "text": "请详细描述这张图片的内容,用中文。"},
{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{img_b64}"}},
],
}],
max_tokens=300,
)
return resp.choices[0].message.content
# ---- 文档加载 ----
def load_pdf(self, pdf_path: str):
"""加载PDF文档,提取文本和图像"""
doc = pymupdf.open(pdf_path)
initial_count = len(self.blocks)
block_id = initial_count
for page_num, page in enumerate(doc):
# 文本
text = page.get_text().strip()
if text:
# 按段落切分
paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
# 合并短段落
merged = []
current = ""
for p in paragraphs:
if len(current) + len(p) < 500:
current += "\n" + p if current else p
else:
if current:
merged.append(current)
current = p
if current:
merged.append(current)
for chunk in merged:
self.blocks.append(ContentBlock(
block_id=block_id,
block_type="text",
content=chunk,
page=page_num + 1,
source=pdf_path,
))
block_id += 1
# 图像
for img in page.get_images(full=True):
try:
xref = img[0]
pix = pymupdf.Pixmap(doc, xref)
if pix.width < 50 or pix.height < 50:
continue # 跳过太小的图像
                    if pix.n - pix.alpha > 3:  # 色彩通道数>3(如CMYK)时转为RGB
                        pix = pymupdf.Pixmap(pymupdf.csRGB, pix)
img_b64 = base64.b64encode(pix.tobytes("png")).decode("utf-8")
description = self._describe_image(img_b64)
self.blocks.append(ContentBlock(
block_id=block_id,
block_type="image",
content=description,
raw_data=img_b64,
page=page_num + 1,
source=pdf_path,
))
block_id += 1
except Exception:
continue
print(f"✅ 从 {pdf_path} 提取了 {len(self.blocks) - initial_count} 个内容块")
# ---- 索引构建 ----
def build_index(self):
"""构建向量索引"""
if not self.blocks:
print("⚠️ 没有内容可索引")
return
texts = [b.content for b in self.blocks]
embeddings = self._embed(texts)
data = []
for i, block in enumerate(self.blocks):
block.embedding = embeddings[i]
data.append({
"id": block.block_id,
"vector": embeddings[i],
"block_type": block.block_type,
"content": block.content[:5000],
"page": block.page,
"source": block.source,
})
milvus.insert(collection_name=COLLECTION, data=data)
print(f"✅ 已索引 {len(data)} 个内容块(文本: {sum(1 for b in self.blocks if b.block_type == 'text')}, 图像: {sum(1 for b in self.blocks if b.block_type == 'image')})") # sum(1 for...if)条件计数惯用法
# ---- 统一查询 ----
def query(self, question: str, top_k: int = 5) -> str:
"""统一查询接口"""
q_emb = self._embed([question])[0]
results = milvus.search(
collection_name=COLLECTION,
data=[q_emb],
limit=top_k,
output_fields=["block_type", "content", "page", "source"],
)[0]
# 组织上下文
context_parts = []
for r in results:
entity = r["entity"]
            prefix = {"text": "📝", "image": "🖼️", "table": "📊"}.get(entity["block_type"], "📄")  # 字典+get实现switch-case式映射,未匹配时返回默认值
context_parts.append(
f"{prefix} [{entity['block_type']}] (第{entity['page']}页, 相似度:{r['distance']:.3f})\n{entity['content']}"
)
context = "\n\n---\n\n".join(context_parts)
# 生成回答
resp = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": (
"你是知识库问答助手。根据以下多模态检索内容回答问题。\n"
"内容可能包含文本摘录和图像描述,请综合利用。\n"
"引用来源页码。\n\n"
f"【检索内容】\n{context}"
),
},
{"role": "user", "content": question},
],
)
return resp.choices[0].message.content
# ---- 使用 ----
if __name__ == "__main__":
kb = MultimodalKnowledgeBase()
# 加载文档
# kb.load_pdf("technical_report.pdf")
# kb.build_index()
# 查询
# answer = kb.query("文档中的架构图展示了什么?")
# print(answer)
8.2 端到端测试¶
"""端到端测试脚本"""
def test_multimodal_kb():
"""测试多模态知识库的完整流程"""
# 1. 创建测试数据
kb = MultimodalKnowledgeBase()
# 手动添加测试数据(不依赖PDF)
test_blocks = [
ContentBlock(0, "text", "RAG是检索增强生成技术,通过检索外部知识来增强LLM的准确性。", page=1, source="test.pdf"),
ContentBlock(1, "text", "Milvus是开源向量数据库,支持百亿级向量的高效检索。", page=2, source="test.pdf"),
ContentBlock(2, "image", "架构图:显示了用户查询经过Embedding模型转换为向量,然后在Milvus中检索,最后由LLM生成回答的流程。", page=3, source="test.pdf"),
ContentBlock(3, "text", "HNSW索引通过层级图结构实现高效的近似最近邻搜索。", page=4, source="test.pdf"),
ContentBlock(4, "table", "性能对比表:HNSW召回率99%延迟5ms,IVF_FLAT召回率95%延迟3ms,IVF_PQ召回率90%延迟1ms。", page=5, source="test.pdf"),
]
kb.blocks = test_blocks
kb.build_index()
# 2. 测试查询
test_queries = [
"什么是RAG?",
"架构图展示了什么流程?",
"不同索引类型的性能对比如何?",
]
print("\n" + "=" * 60)
print("端到端测试")
print("=" * 60)
for q in test_queries:
print(f"\n❓ {q}")
answer = kb.query(q, top_k=3)
print(f"💡 {answer}")
print("-" * 40)
# test_multimodal_kb()
9. 总结与最佳实践¶
9.1 技术选型指南¶
| 场景 | 推荐方案 |
|---|---|
| 纯文本文档RAG | OpenAI Embedding + Milvus HNSW |
| 含图表的技术文档 | Vision LLM描述 + 统一文本Embedding |
| 结构化数据查询 | Text2SQL + LLM回答生成 |
| 混合数据场景 | 查询路由(SQL/RAG分流) |
| 高精度需求 | 多路召回 + Cross-Encoder重排序 |
| 大规模(>1000万向量) | Milvus分布式 + IVF_PQ量化 |
9.2 最佳实践清单¶
Embedding选择:

- 中文场景优先:BGE-M3 > text-embedding-3-small
- 需要多模态:CLIP用于图像检索,Vision LLM用于图像理解
- 维度权衡:更高维度=更精确,但索引更大、检索更慢

向量数据库:

- 开发测试:Milvus Lite(零配置本地文件)
- 生产环境:Milvus standalone或K8s部署
- 必须创建索引,否则每次搜索退化为暴力扫描
- 批量写入(batch insert)通常比逐条写入快10-100倍

Text2SQL:

- 始终在只读模式下执行SQL
- Schema信息 + 样本数据放入Prompt
- 使用低temperature(0-0.1)提高确定性
- SQL验证是安全必需环节

RAG评估:

- 每次系统变更后跑评估流水线
- 关注Context Precision(检索质量)和Faithfulness(生成质量)
- 至少准备50+评估样本才有统计意义
9.3 常见陷阱¶
| 陷阱 | 解决方案 |
|---|---|
| 图像直接OCR效果差 | 用Vision LLM生成结构化描述 |
| 表格转文本后丢失结构 | 保持Markdown/HTML格式或用Text2SQL |
| 向量检索top_k太小 | 先召回多(20+),再用reranker筛选 |
| Embedding模型选错 | 中文用BGE系列,英文用OpenAI |
| Milvus不建索引 | 数据多时检索极慢,必须建索引 |
| Text2SQL注入风险 | 只读模式 + 关键字过滤 + SQL验证 |
9.4 延伸学习¶
- GraphRAG:Microsoft开源的基于知识图谱的RAG方案
- ColBERT:Token级向量交互的精细检索模型
- Corrective RAG:检索结果不满意时自动修正重检索
- Agentic RAG:用Agent编排多步检索逻辑
📝 本章小结¶
| 知识点 | 掌握程度 |
|---|---|
| 多模态RAG架构 | ⭐⭐⭐ 必须掌握 |
| CLIP / BGE-M3 Embedding | ⭐⭐⭐ 必须掌握 |
| Milvus基本操作 | ⭐⭐⭐ 必须掌握 |
| 混合检索(向量+过滤) | ⭐⭐⭐ 必须掌握 |
| Text2SQL实现 | ⭐⭐ 重要 |
| RAG评估(RAGAS) | ⭐⭐ 重要 |
| 索引优化与量化 | ⭐⭐ 重要 |
| Cross-Encoder重排序 | ⭐⭐ 重要 |
📝 练习与面试¶
练习1:Milvus向量搜索(⭐)¶
用Milvus Lite创建Collection,插入100条文本的Embedding,实现语义搜索并返回top-5结果。
练习2:Text2SQL问答(⭐⭐)¶
搭建一个简单的Text2SQL系统:给定一个SQLite数据库,用自然语言提问,生成并执行SQL,返回结果。注意安全性。
练习3:多模态知识库(⭐⭐⭐)¶
构建一个完整的多模态RAG系统:加载含图像的PDF,提取文本+图像,统一建立索引,实现检索+生成。
面试题¶
Q1: HNSW和IVF索引的区别是什么?各适合什么场景?
HNSW是基于图的索引,查询快、精度高,但内存占用大,适合中小规模(<1000万)且对延迟敏感的场景。IVF是基于聚类的索引,内存效率更高,适合大规模数据(>1000万),可结合PQ量化进一步压缩。
Q2: BGE-M3和OpenAI Embedding怎么选?
BGE-M3优势:开源免费、支持本地部署、中文效果更佳、支持Dense+Sparse混合检索。OpenAI优势:无需GPU、API调用简单、英文效果优秀。成本敏感+中文场景用BGE-M3,快速原型+英文场景用OpenAI。
Q3: Text2SQL的安全风险和防护措施有哪些?
主要风险:SQL注入(DROP/DELETE)、数据泄露、资源消耗(全表扫描)。防护:① 只读PRAGMA。② 关键字黑名单过滤。③ SQL语法验证。④ 执行超时限制。⑤ 日志审计所有查询。
导航:上一章 23-Gradio构建AI应用 | 返回目录