
第二十四章 多模态RAG与向量数据库进阶

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

从文本到图像、从向量检索到SQL查询——全面掌握进阶RAG技术

学习时间: 8-10小时
难度级别: ⭐⭐⭐⭐ 中高级
前置知识: RAG基础(第5章)、向量数据库(第6章)、高级RAG(第18章)
学习目标: 掌握多模态Embedding、Milvus向量数据库、Text2SQL、RAG评估等进阶技术


📖 章节导读

传统RAG只处理文本,但真实文档中充满了图表、公式和表格。本章从多模态Embedding出发,结合Milvus向量数据库和Text2SQL技术,构建能处理任意数据类型的进阶RAG系统,并掌握系统评估方法。


1. 多模态RAG概述

1.1 为什么需要多模态RAG

传统纯文本RAG的局限:

| 问题 | 场景 |
|------|------|
| 图像信息丢失 | 技术文档中的架构图、流程图无法被检索 |
| 表格数据失真 | PDF表格转文本后结构混乱 |
| 跨模态查询无法处理 | "找到和这张图类似的文档" |
| 公式无法理解 | 数学论文中LaTeX公式被转为乱码 |

1.2 统一检索架构

用户查询 (文本/图像)
         ↓
多模态Embedding模型 (CLIP / BGE-M3)
         ↓
     统一向量空间
    ┏━━━━┻━━━━┓
    ↓         ↓
文本向量索引   图像/表格向量索引
    ┗━━━━┳━━━━┛
         ↓
混合检索 + 重排序
         ↓
多模态上下文 → LLM生成

1.3 核心挑战与方案

| 挑战 | 方案 |
|------|------|
| 不同模态如何统一表示 | CLIP等多模态Embedding将文本和图像映射到同一向量空间 |
| 图像如何参与RAG | 方案A:用Vision LLM生成图像描述再检索;方案B:直接用多模态Embedding |
| 表格如何处理 | 转为Markdown/HTML保留结构,或直接Text2SQL查询 |
| 检索质量如何保证 | 多路召回 + Cross-Encoder重排序 |
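表中提到的"多路召回"通常用RRF(Reciprocal Rank Fusion)合并不同检索路的排名。下面是一个极简示意:RRF只依赖各路的排名位置,不依赖分数的量纲,因此适合融合向量检索与关键词检索:

```python
def rrf_fuse(rankings: list[list[str]], k: int = 60) -> list[str]:
    """RRF融合:每一路排名中,第r名的文档贡献 1/(k+r) 分;k=60为常用默认值。"""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # 按融合分数从高到低返回文档ID
    return sorted(scores, key=scores.get, reverse=True)

# 示例:向量检索与关键词检索各自返回的top-3排名
fused = rrf_fuse([["a", "b", "c"], ["b", "c", "a"]])
```

多路都排名靠前的文档("b")在融合后被排到最前,这正是多路召回提升鲁棒性的来源。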

2. 多模态Embedding

2.1 CLIP模型原理

CLIP (Contrastive Language-Image Pre-training) 是OpenAI开发的多模态模型,通过对比学习将文本和图像映射到同一向量空间。

核心思想:
- 正样本对(匹配的文本-图像对)向量距离拉近
- 负样本对(不匹配的)向量距离推远
- 训练完成后,文本向量和图像向量可直接计算相似度
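上述对比学习目标可以用NumPy写成一个简化版的对称InfoNCE损失(仅为原理示意;真实CLIP还包含可学习的温度参数、大批量分布式训练等细节):

```python
import numpy as np

def clip_contrastive_loss(text_emb: np.ndarray, image_emb: np.ndarray,
                          temperature: float = 0.07) -> float:
    """简化版对称InfoNCE:输入为已L2归一化的 (N, d) 向量,第i行互为正样本对。"""
    logits = text_emb @ image_emb.T / temperature  # (N, N) 相似度矩阵
    n = logits.shape[0]

    def cross_entropy(l: np.ndarray) -> float:
        l = l - l.max(axis=1, keepdims=True)  # 数值稳定
        log_prob = l - np.log(np.exp(l).sum(axis=1, keepdims=True))
        # 正样本对位于对角线上,取对角线的负对数概率
        return float(-log_prob[np.arange(n), np.arange(n)].mean())

    # 文本→图像 与 图像→文本 两个方向取平均
    return (cross_entropy(logits) + cross_entropy(logits.T)) / 2
```

当匹配对的相似度高于所有不匹配对时损失趋近于0,训练过程就是在最小化这个量。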

Python
"""CLIP模型基础用法"""

from transformers import CLIPProcessor, CLIPModel
from PIL import Image
import torch
import numpy as np

# 加载模型
model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

# --- 图像编码 ---
def encode_image(image_path: str) -> np.ndarray:
    image = Image.open(image_path).convert("RGB")
    inputs = processor(images=image, return_tensors="pt")
    with torch.no_grad():  # 禁用梯度计算,节省内存(推理时使用)
        features = model.get_image_features(**inputs)
    # L2归一化
    features = features / features.norm(dim=-1, keepdim=True)
    return features.squeeze().numpy()

# --- 文本编码 ---
def encode_text(text: str) -> np.ndarray:
    inputs = processor(text=[text], return_tensors="pt", padding=True)
    with torch.no_grad():
        features = model.get_text_features(**inputs)
    features = features / features.norm(dim=-1, keepdim=True)
    return features.squeeze().numpy()

# --- 文本-图像相似度 ---
def text_image_similarity(text: str, image_path: str) -> float:
    text_emb = encode_text(text)
    image_emb = encode_image(image_path)
    return float(np.dot(text_emb, image_emb))

# 示例
# score = text_image_similarity("a cat sitting on a couch", "cat.jpg")
# print(f"相似度: {score:.4f}")

2.2 BGE-M3多功能文本嵌入

BGE-M3是BAAI推出的多功能文本Embedding模型(M3=Multi-Lingual, Multi-Functionality, Multi-Granularity),支持多语言、多粒度和混合检索。注意:BGE-M3是纯文本模型,不能直接编码图像,但与CLIP互补——CLIP负责图像检索,BGE-M3负责高质量文本检索:

Python
"""BGE-M3嵌入 —— 支持Dense + Sparse + ColBERT多种检索方式"""

from sentence_transformers import SentenceTransformer
import numpy as np

# 加载BGE-M3模型
model = SentenceTransformer("BAAI/bge-m3")

def get_dense_embeddings(texts: list[str]) -> np.ndarray:
    """获取Dense Embedding(标准向量检索)"""
    embeddings = model.encode(
        texts,
        normalize_embeddings=True,  # L2归一化
        show_progress_bar=True,
    )
    return embeddings

# 示例
texts = [
    "什么是Transformer架构?",
    "注意力机制的计算过程",
    "如何训练大语言模型",
]
embeddings = get_dense_embeddings(texts)
print(f"Embedding维度: {embeddings.shape}")  # (3, 1024)

# 计算相似度矩阵
similarity_matrix = np.dot(embeddings, embeddings.T)
print("相似度矩阵:")
print(similarity_matrix.round(3))
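上面的代码只演示了Dense检索。BGE-M3的Sparse检索会为每个token输出一个权重(可通过FlagEmbedding库获取;下面假设权重字典已拿到,数值为示意),打分方式是对查询与文档共同token的权重乘积求和:

```python
def sparse_score(query_weights: dict[str, float],
                 doc_weights: dict[str, float]) -> float:
    """词汇匹配打分:共同token的权重乘积之和(BGE-M3 sparse检索的打分形式)。"""
    return sum(w * doc_weights[tok]
               for tok, w in query_weights.items()
               if tok in doc_weights)

# 示意:假设的token权重(实际由模型输出)
q = {"transformer": 0.8, "架构": 0.5}
d1 = {"transformer": 0.7, "训练": 0.3}   # 与查询共享 "transformer"
d2 = {"注意力": 0.6, "机制": 0.4}        # 与查询无共享token
```

共享关键token的文档得分高,无交集的文档得分为0,这使Sparse检索擅长精确术语匹配,与Dense检索互补。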

💡 维度说明:BGE-M3默认输出1024维向量,OpenAI text-embedding-3-small 输出1536维。在实际项目中,同一个Collection必须统一维度。后续Milvus章节使用OpenAI Embedding(1536维),如需换用BGE-M3,请将 dimension 参数改为1024。

2.3 图像嵌入实现

使用OpenAI的多模态能力生成图像描述,再用文本Embedding统一检索:

Python
"""方案A:Vision LLM生成描述 + 文本Embedding(实用方案)"""

import base64
from openai import OpenAI
from pathlib import Path

client = OpenAI()

def image_to_description(image_path: str) -> str:
    """用GPT-4o-mini为图像生成详细描述"""
    with open(image_path, "rb") as f:  # with自动管理文件关闭
        b64 = base64.b64encode(f.read()).decode("utf-8")

    suffix = Path(image_path).suffix.lower()
    mime_map = {".png": "image/png", ".jpg": "image/jpeg", ".jpeg": "image/jpeg"}
    mime_type = mime_map.get(suffix, "image/png")

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": (
                            "请详细描述这张图片的内容,包括:\n"
                            "1. 图片类型(图表/照片/架构图/截图等)\n"
                            "2. 主要内容和关键信息\n"
                            "3. 如果是图表,描述数据趋势和关键数值\n"
                            "4. 如果是架构图,描述组件和关系\n"
                            "用中文回答,尽量详细。"
                        ),
                    },
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:{mime_type};base64,{b64}"},
                    },
                ],
            }
        ],
        max_tokens=500,
    )
    return response.choices[0].message.content

def get_text_embedding(text: str) -> list[float]:
    """获取文本Embedding"""
    resp = client.embeddings.create(model="text-embedding-3-small", input=[text])
    return resp.data[0].embedding

# 图像处理流水线
def process_image_for_rag(image_path: str) -> dict:
    """将图像转换为可检索的文档"""
    description = image_to_description(image_path)
    embedding = get_text_embedding(description)
    return {
        "type": "image",
        "source": image_path,
        "description": description,
        "embedding": embedding,
    }

2.4 文本-图像联合检索

Python
"""统一检索实现:文本和图像在同一向量空间中检索"""

import numpy as np
from dataclasses import dataclass

@dataclass  # @dataclass自动生成__init__等方法
class Document:
    doc_id: str
    doc_type: str       # "text" | "image" | "table"
    content: str        # 原始内容或图像路径
    description: str    # 文本描述(图像为LLM生成的描述)
    embedding: list[float]
    metadata: dict | None = None

class MultimodalIndex:
    """多模态统一索引"""

    def __init__(self):
        self.documents: list[Document] = []

    def add_document(self, doc: Document):
        self.documents.append(doc)

    def search(self, query_embedding: list[float], top_k: int = 5) -> list[tuple[Document, float]]:
        """向量相似度搜索"""
        q = np.array(query_embedding)
        results = []
        for doc in self.documents:
            d = np.array(doc.embedding)
            score = float(np.dot(q, d) / (np.linalg.norm(q) * np.linalg.norm(d)))
            results.append((doc, score))
        results.sort(key=lambda x: x[1], reverse=True)  # lambda匿名函数
        return results[:top_k]

    def hybrid_search(
        self,
        query_embedding: list[float],
        doc_type_filter: str | None = None,
        top_k: int = 5,
    ) -> list[tuple[Document, float]]:
        """混合搜索:向量检索 + 标量过滤"""
        q = np.array(query_embedding)
        results = []
        for doc in self.documents:
            if doc_type_filter and doc.doc_type != doc_type_filter:
                continue
            d = np.array(doc.embedding)
            score = float(np.dot(q, d) / (np.linalg.norm(q) * np.linalg.norm(d)))
            results.append((doc, score))
        results.sort(key=lambda x: x[1], reverse=True)
        return results[:top_k]

3. Milvus向量数据库

3.1 向量数据库对比

| 特性 | Milvus | Chroma | Pinecone | Qdrant |
|------|--------|--------|----------|--------|
| 部署方式 | 自托管/云 | 内嵌/自托管 | 纯云 | 自托管/云 |
| 最大向量数 | 百亿级 | 百万级 | 十亿级 | 十亿级 |
| 混合检索 | ✅ 原生 | ⚠️ 有限 | ✅ | ✅ |
| 多向量字段 | ✅ | ❌ | ❌ | ✅ |
| GPU加速 | ✅ | ❌ | N/A(全托管) | ❌ |
| Python SDK | pymilvus | chromadb | pinecone | qdrant-client |
| 适用场景 | 大规模生产 | 快速原型 | 全托管 | 中等规模 |

3.2 Milvus Lite安装与使用

Milvus Lite是轻量级版本,适合开发和测试,无需启动服务端:

Bash
# 安装pymilvus(自带Milvus Lite)
pip install "pymilvus>=2.4"

Python
"""Milvus Lite快速上手"""

from pymilvus import MilvusClient

# 创建客户端(使用本地文件存储)
client = MilvusClient("./milvus_demo.db")

# 创建Collection
client.create_collection(
    collection_name="docs",
    dimension=1536,  # text-embedding-3-small的维度
)

# 插入数据
data = [
    {"id": 1, "vector": [0.1] * 1536, "text": "什么是RAG?", "source": "doc1.pdf"},
    {"id": 2, "vector": [0.2] * 1536, "text": "向量数据库原理", "source": "doc2.pdf"},
    {"id": 3, "vector": [0.3] * 1536, "text": "多模态嵌入技术", "source": "doc3.pdf"},
]
client.insert(collection_name="docs", data=data)

# 搜索
results = client.search(
    collection_name="docs",
    data=[[0.15] * 1536],  # 查询向量
    limit=2,               # 返回top-2
    output_fields=["text", "source"],  # 返回的字段
)

for result in results[0]:
    print(f"  ID: {result['id']}, 距离: {result['distance']:.4f}")
    print(f"  文本: {result['entity']['text']}")

3.3 Collection创建与索引配置

Python
"""高级Collection配置:Schema定义 + 自定义索引"""

from pymilvus import (
    MilvusClient,
    CollectionSchema,
    FieldSchema,
    DataType,
)

client = MilvusClient("./milvus_advanced.db")

# --- 方法1:使用Schema精确定义 ---

fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="doc_type", dtype=DataType.VARCHAR, max_length=20),
    FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=10000),
    FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=500),
    FieldSchema(name="page_num", dtype=DataType.INT32),
    FieldSchema(name="dense_vector", dtype=DataType.FLOAT_VECTOR, dim=1536),
]

schema = CollectionSchema(fields=fields, description="多模态文档集合")

# 创建Collection
client.create_collection(
    collection_name="multimodal_docs",
    schema=schema,
)

# 创建索引(HNSW索引适合大多数场景)
index_params = client.prepare_index_params()
index_params.add_index(
    field_name="dense_vector",
    index_type="HNSW",
    metric_type="COSINE",
    params={
        "M": 16,              # 每层连接数(越大越精确,越慢)
        "efConstruction": 256, # 构建时搜索宽度
    },
)

client.create_index(
    collection_name="multimodal_docs",
    index_params=index_params,
)

print("✅ Collection和索引创建完成")

3.4 向量插入与检索

Python
"""完整的插入与检索流程"""

from pymilvus import MilvusClient
from openai import OpenAI

openai_client = OpenAI()
milvus_client = MilvusClient("./milvus_rag.db")

# 创建collection
milvus_client.create_collection(
    collection_name="knowledge_base",
    dimension=1536,
)

def embed(texts: list[str]) -> list[list[float]]:
    resp = openai_client.embeddings.create(
        model="text-embedding-3-small", input=texts
    )
    return [d.embedding for d in resp.data]

# ---- 批量插入 ----

documents = [
    {"text": "RAG通过检索增强生成,提高LLM的准确性和时效性。", "source": "rag_intro.md"},
    {"text": "向量数据库存储高维向量,支持近似最近邻搜索。", "source": "vector_db.md"},
    {"text": "HNSW是一种基于图的近似最近邻搜索算法。", "source": "hnsw.md"},
    {"text": "多模态RAG可以处理文本、图像、表格等多种数据类型。", "source": "multimodal.md"},
    {"text": "Text2SQL将自然语言查询转换为SQL语句。", "source": "text2sql.md"},
]

texts = [d["text"] for d in documents]
vectors = embed(texts)

data = [
    {
        "id": i,
        "vector": vectors[i],
        "text": documents[i]["text"],
        "source": documents[i]["source"],
    }
    for i in range(len(documents))
]

milvus_client.insert(collection_name="knowledge_base", data=data)
print(f"✅ 插入 {len(data)} 条数据")

# ---- 检索 ----

def search(query: str, top_k: int = 3) -> list[dict]:
    query_vector = embed([query])[0]
    results = milvus_client.search(
        collection_name="knowledge_base",
        data=[query_vector],
        limit=top_k,
        output_fields=["text", "source"],
    )
    return [
        {
            "text": r["entity"]["text"],
            "source": r["entity"]["source"],
            "score": r["distance"],
        }
        for r in results[0]
    ]

# 测试检索
results = search("什么是向量搜索?")
for r in results:
    print(f"  [{r['score']:.4f}] {r['text']} ({r['source']})")

3.5 混合检索(向量 + 标量过滤)

Python
"""混合检索:向量相似度 + 标量过滤条件"""

from pymilvus import MilvusClient

client = MilvusClient("./milvus_hybrid.db")

# 创建collection
client.create_collection(
    collection_name="articles",
    dimension=1536,
)

# 假设已插入数据,每条包含: id, vector, text, category, date

# --- 带过滤条件的检索 ---

def hybrid_search(
    query_vector: list[float],
    category: str | None = None,
    date_after: str | None = None,
    top_k: int = 5,
) -> list:
    """混合检索:向量相似度 + 标量过滤"""
    # 构建过滤表达式
    filters = []
    if category:
        filters.append(f'category == "{category}"')
    if date_after:
        filters.append(f'date >= "{date_after}"')

    filter_expr = " and ".join(filters) if filters else ""

    results = client.search(
        collection_name="articles",
        data=[query_vector],
        limit=top_k,
        filter=filter_expr,  # 标量过滤
        output_fields=["text", "category", "date"],
    )
    return results[0]

# 使用示例
# results = hybrid_search(
#     query_vector=embed(["RAG技术"])[0],
#     category="AI",
#     date_after="2024-01-01",
#     top_k=5,
# )

3.6 性能调优

Python
"""Milvus性能调优要点"""

# 1. 索引类型选择
index_configs = {
    # 小规模(<100万向量):精确且快速
    "small_scale": {
        "index_type": "HNSW",
        "metric_type": "COSINE",
        "params": {"M": 16, "efConstruction": 256},
    },
    # 中规模(100万-1000万):平衡精度和速度
    "medium_scale": {
        "index_type": "IVF_FLAT",
        "metric_type": "COSINE",
        "params": {"nlist": 1024},
    },
    # 大规模(>1000万):用量化压缩内存
    "large_scale": {
        "index_type": "IVF_PQ",
        "metric_type": "L2",
        "params": {"nlist": 2048, "m": 16, "nbits": 8},
    },
}

# 2. 搜索参数调优
search_params = {
    "HNSW": {"ef": 128},           # ef越大越精确,越慢
    "IVF_FLAT": {"nprobe": 32},    # nprobe越大越精确,越慢
    "IVF_PQ": {"nprobe": 64},
}

# 3. 批量操作(比逐条插入快10-100倍)
# ✅ 推荐:批量插入
# client.insert(collection_name="docs", data=batch_data)

# ❌ 避免:逐条插入
# for item in data:
#     client.insert(collection_name="docs", data=[item])

3.7 完整代码实战

Python
"""完整Milvus RAG实战:从文档到检索到生成"""

from pymilvus import MilvusClient
from openai import OpenAI
import json

openai_client = OpenAI()
milvus_client = MilvusClient("./rag_production.db")

COLLECTION_NAME = "production_kb"
EMBEDDING_DIM = 1536

# ---- 初始化 ----

def init_collection():
    """初始化Collection"""
    if milvus_client.has_collection(COLLECTION_NAME):
        milvus_client.drop_collection(COLLECTION_NAME)

    milvus_client.create_collection(
        collection_name=COLLECTION_NAME,
        dimension=EMBEDDING_DIM,
    )
    print(f"✅ Collection '{COLLECTION_NAME}' 已创建")

# ---- 数据预处理 ----

def chunk_documents(texts: list[str], chunk_size: int = 500, overlap: int = 50) -> list[dict]:
    """切分文档"""
    chunks = []
    for doc_idx, text in enumerate(texts):  # enumerate同时获取索引和元素
        start = 0
        chunk_idx = 0
        while start < len(text):
            end = min(start + chunk_size, len(text))
            chunks.append({
                "text": text[start:end],
                "doc_id": doc_idx,
                "chunk_id": chunk_idx,
            })
            start += chunk_size - overlap
            chunk_idx += 1
    return chunks

def embed_batch(texts: list[str], batch_size: int = 100) -> list[list[float]]:
    """批量Embedding"""
    all_embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        resp = openai_client.embeddings.create(
            model="text-embedding-3-small", input=batch
        )
        all_embeddings.extend([d.embedding for d in resp.data])
    return all_embeddings

# ---- 索引构建 ----

def build_index(documents: list[str]):
    """构建索引"""
    init_collection()

    # 切分
    chunks = chunk_documents(documents)
    texts = [c["text"] for c in chunks]

    # Embedding
    print(f"正在计算 {len(texts)} 个文本块的Embedding...")
    embeddings = embed_batch(texts)

    # 插入Milvus
    data = [
        {
            "id": i,
            "vector": embeddings[i],
            "text": chunks[i]["text"],
            "doc_id": chunks[i]["doc_id"],
            "chunk_id": chunks[i]["chunk_id"],
        }
        for i in range(len(chunks))
    ]

    milvus_client.insert(collection_name=COLLECTION_NAME, data=data)
    print(f"✅ 已索引 {len(data)} 个文本块")

# ---- 检索与生成 ----

def retrieve_and_generate(query: str, top_k: int = 3) -> str:
    """RAG:检索 + 生成"""
    # 检索
    query_emb = embed_batch([query])[0]
    results = milvus_client.search(
        collection_name=COLLECTION_NAME,
        data=[query_emb],
        limit=top_k,
        output_fields=["text", "doc_id", "chunk_id"],
    )

    contexts = []
    for r in results[0]:
        contexts.append(r["entity"]["text"])

    context_text = "\n---\n".join(contexts)

    # 生成
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": (
                    "根据检索到的文档回答问题。引用相关内容。"
                    f"\n\n【检索结果】\n{context_text}"
                ),
            },
            {"role": "user", "content": query},
        ],
    )
    return response.choices[0].message.content

# ---- 使用示例 ----
if __name__ == "__main__":
    # 构建索引
    sample_docs = [
        "RAG(Retrieval-Augmented Generation)是一种将检索和生成结合的技术...",
        "Milvus是一个开源的向量数据库,专为相似性搜索和AI应用设计...",
    ]
    build_index(sample_docs)

    # 查询
    answer = retrieve_and_generate("RAG的核心原理是什么?")
    print(f"\n回答:{answer}")

4. 多模态RAG实现

4.1 文档中的图像处理

Python
"""从PDF中提取并处理图像"""

import pymupdf
from pathlib import Path
import base64
from openai import OpenAI

client = OpenAI()

def extract_pdf_content(pdf_path: str) -> list[dict]:
    """从PDF提取文本和图像"""
    doc = pymupdf.open(pdf_path)
    contents = []

    for page_num, page in enumerate(doc):
        # 提取文本
        text = page.get_text()
        if text.strip():  # 链式调用:strip去除空白
            contents.append({
                "type": "text",
                "content": text.strip(),
                "page": page_num + 1,
                "source": pdf_path,
            })

        # 提取图像
        images = page.get_images(full=True)
        for img_idx, img in enumerate(images):
            xref = img[0]
            pix = pymupdf.Pixmap(doc, xref)
            if pix.n - pix.alpha > 3:  # 非RGB色彩空间(如CMYK),转为RGB
                pix = pymupdf.Pixmap(pymupdf.csRGB, pix)

            img_bytes = pix.tobytes("png")
            b64 = base64.b64encode(img_bytes).decode("utf-8")

            # 用Vision LLM生成图像描述
            description = describe_image(b64)

            contents.append({
                "type": "image",
                "content": b64,  # base64编码的图像
                "description": description,
                "page": page_num + 1,
                "source": pdf_path,
            })

    return contents

def describe_image(image_b64: str) -> str:
    """用GPT-4o生成图像描述"""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "请详细描述这张图片的内容,用中文回答。"},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/png;base64,{image_b64}"},
                    },
                ],
            }
        ],
        max_tokens=300,
    )
    return response.choices[0].message.content

4.2 表格数据向量化

Python
"""表格数据处理:保留结构信息"""

import pandas as pd
from openai import OpenAI

client = OpenAI()

def table_to_searchable_text(df: pd.DataFrame, table_name: str = "") -> list[str]:
    """将表格转换为可检索的文本块"""
    texts = []

    # 1. 表格整体描述
    overview = f"表格: {table_name}\n"
    overview += f"列: {', '.join(df.columns)}\n"
    overview += f"行数: {len(df)}\n"
    overview += f"数据类型: {dict(df.dtypes)}\n"
    texts.append(overview)

    # 2. 表格Markdown格式(适合小表格)
    if len(df) <= 50:
        md = df.to_markdown(index=False)
        texts.append(f"表格 {table_name} 的完整数据:\n{md}")

    # 3. 按行转换(适合大表格)
    for pos, (_, row) in enumerate(df.iterrows()):  # enumerate保证行号连续(索引未必从0开始)
        row_text = f"表格 {table_name} 第{pos + 1}行: "
        row_text += "; ".join(f"{col}={val}" for col, val in row.items())
        texts.append(row_text)

    # 4. 统计摘要
    numeric_cols = df.select_dtypes(include="number").columns
    if len(numeric_cols) > 0:
        summary = f"表格 {table_name} 统计摘要:\n"
        summary += df[numeric_cols].describe().to_markdown()
        texts.append(summary)

    return texts

# 示例
# df = pd.read_csv("sales_data.csv")
# chunks = table_to_searchable_text(df, "销售数据")

4.3 统一索引构建

Python
"""多模态统一索引:文本 + 图像 + 表格"""

from pymilvus import MilvusClient
from openai import OpenAI
import json

openai_client = OpenAI()
milvus_client = MilvusClient("./multimodal_rag.db")

COLLECTION = "multimodal_kb"

def init_multimodal_collection():
    if milvus_client.has_collection(COLLECTION):
        milvus_client.drop_collection(COLLECTION)
    milvus_client.create_collection(
        collection_name=COLLECTION,
        dimension=1536,
    )

def embed(texts: list[str]) -> list[list[float]]:
    resp = openai_client.embeddings.create(
        model="text-embedding-3-small", input=texts
    )
    return [d.embedding for d in resp.data]

def build_multimodal_index(contents: list[dict]):
    """构建多模态统一索引"""
    init_multimodal_collection()

    data = []
    texts_to_embed = []

    for item in contents:
        if item["type"] == "text":
            texts_to_embed.append(item["content"])
        elif item["type"] == "image":
            texts_to_embed.append(item["description"])
        elif item["type"] == "table":
            texts_to_embed.append(item["content"])

    embeddings = embed(texts_to_embed)

    for i, item in enumerate(contents):
        data.append({
            "id": i,
            "vector": embeddings[i],
            "doc_type": item["type"],
            "content": item.get("description", item["content"])[:5000],
            "page": item.get("page", 0),
            "source": item.get("source", ""),
        })

    milvus_client.insert(collection_name=COLLECTION, data=data)
    print(f"✅ 已索引 {len(data)} 个多模态文档块")

4.4 多模态检索与重排序

Python
"""多模态检索 + Cross-Encoder重排序"""

from sentence_transformers import CrossEncoder
import numpy as np

# 加载重排序模型
reranker = CrossEncoder("BAAI/bge-reranker-v2-m3", max_length=512)

def multimodal_search_with_rerank(
    query: str,
    top_k: int = 5,
    rerank_top_n: int = 20,
) -> list[dict]:
    """多模态检索 + 重排序"""
    # Step 1: 向量检索(召回较多候选)
    query_emb = embed([query])[0]
    candidates = milvus_client.search(
        collection_name=COLLECTION,
        data=[query_emb],
        limit=rerank_top_n,
        output_fields=["doc_type", "content", "page", "source"],
    )[0]

    # Step 2: Cross-Encoder重排序
    pairs = [(query, c["entity"]["content"]) for c in candidates]
    rerank_scores = reranker.predict(pairs)

    # Step 3: 合并分数并排序
    results = []
    for i, candidate in enumerate(candidates):
        results.append({
            "content": candidate["entity"]["content"],
            "doc_type": candidate["entity"]["doc_type"],
            "page": candidate["entity"]["page"],
            "source": candidate["entity"]["source"],
            "vector_score": candidate["distance"],
            "rerank_score": float(rerank_scores[i]),
        })

    results.sort(key=lambda x: x["rerank_score"], reverse=True)
    return results[:top_k]

4.5 生成时的多模态上下文整合

Python
"""多模态RAG生成:将检索到的文本、图像描述、表格整合为上下文"""

from openai import OpenAI

client = OpenAI()

def multimodal_generate(query: str, retrieved_docs: list[dict]) -> str:
    """基于多模态检索结果生成回答"""
    # 按类型组织上下文
    text_contexts = []
    image_contexts = []
    table_contexts = []

    for doc in retrieved_docs:
        if doc["doc_type"] == "text":
            text_contexts.append(f"[文本, 第{doc['page']}页] {doc['content']}")
        elif doc["doc_type"] == "image":
            image_contexts.append(f"[图像描述, 第{doc['page']}页] {doc['content']}")
        elif doc["doc_type"] == "table":
            table_contexts.append(f"[表格, 第{doc['page']}页] {doc['content']}")

    context_parts = []
    if text_contexts:
        context_parts.append("## 文本内容\n" + "\n\n".join(text_contexts))
    if image_contexts:
        context_parts.append("## 图像内容\n" + "\n\n".join(image_contexts))
    if table_contexts:
        context_parts.append("## 表格数据\n" + "\n\n".join(table_contexts))

    context = "\n\n".join(context_parts)

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": (
                    "你是一个知识库问答助手。根据检索到的多模态内容回答问题。\n"
                    "内容可能包含文本、图像描述和表格数据,请综合利用。\n"
                    "回答要准确,引用来源(页码)。\n\n"
                    f"【检索内容】\n{context}"
                ),
            },
            {"role": "user", "content": query},
        ],
    )
    return response.choices[0].message.content

5. Text2SQL技术

5.1 为什么需要Text2SQL

| 场景 | RAG方案 | Text2SQL方案 |
|------|---------|--------------|
| "2024年Q3销售额多少?" | 需要索引所有数据行 | 直接生成 SELECT SUM(amount) WHERE quarter='2024Q3' |
| "哪个产品利润率最高?" | 难以精确计算 | SELECT product, MAX(profit_rate) ... |
| 聚合统计查询 | 不擅长 | ✅ 天然擅长 |
| 模糊语义查询 | ✅ 擅长 | 不擅长 |

结论:Text2SQL和RAG互补,结构化数据用SQL,非结构化数据用向量检索。
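既然两者互补,查询进来时就需要先做路由。最简单的做法是零成本的关键词启发式(词表纯属示意,需按业务词汇调整;更稳健的LLM路由见5.5节):

```python
# 示意性的SQL意图关键词表(假设的词表,生产中需按业务调整)
SQL_HINTS = ["多少", "总额", "平均", "最高", "最低", "统计", "排名", "几个", "占比", "销售额"]

def heuristic_route(question: str) -> str:
    """零成本启发式路由:命中统计/聚合类关键词走SQL,否则走RAG。"""
    return "SQL" if any(hint in question for hint in SQL_HINTS) else "RAG"
```

这种启发式可以作为LLM路由前的粗筛,省去大部分明显查询的一次LLM调用。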

5.2 基于LLM的Text2SQL实现

Python
"""Text2SQL:自然语言转SQL查询"""

import sqlite3
from openai import OpenAI

client = OpenAI()

# ---- 创建示例数据库 ----

def create_sample_db(db_path: str = "sample.db"):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    cursor.execute("""
        CREATE TABLE IF NOT EXISTS sales (
            id INTEGER PRIMARY KEY,
            product TEXT NOT NULL,
            category TEXT NOT NULL,
            amount REAL NOT NULL,
            quantity INTEGER NOT NULL,
            date TEXT NOT NULL,
            region TEXT NOT NULL
        )
    """)

    sample_data = [
        ("iPhone 15", "手机", 5999, 120, "2024-01-15", "华东"),
        ("MacBook Pro", "笔记本", 14999, 45, "2024-01-20", "华北"),
        ("iPad Air", "平板", 4799, 80, "2024-02-10", "华东"),
        ("AirPods Pro", "耳机", 1899, 200, "2024-02-15", "华南"),
        ("iPhone 15", "手机", 5999, 150, "2024-03-01", "华北"),
        ("MacBook Air", "笔记本", 8999, 60, "2024-03-10", "华南"),
        ("Apple Watch", "手表", 2999, 90, "2024-03-20", "华东"),
    ]

    cursor.executemany(
        "INSERT INTO sales (product, category, amount, quantity, date, region) VALUES (?, ?, ?, ?, ?, ?)",
        sample_data,
    )
    conn.commit()
    conn.close()

# ---- Schema提取 ----

def get_schema(db_path: str) -> str:
    """提取数据库Schema信息"""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
    tables = cursor.fetchall()

    schema_parts = []
    for (table_name,) in tables:
        cursor.execute(f"PRAGMA table_info({table_name})")
        columns = cursor.fetchall()

        col_defs = []
        for col in columns:
            col_defs.append(f"  {col[1]} {col[2]}")
        schema_parts.append(f"CREATE TABLE {table_name} (\n" + ",\n".join(col_defs) + "\n)")

        # 获取样本数据
        cursor.execute(f"SELECT * FROM {table_name} LIMIT 3")
        samples = cursor.fetchall()
        if samples:
            col_names = [c[1] for c in columns]
            schema_parts.append(f"-- 样本数据 ({table_name}):")
            for sample in samples:
                row = dict(zip(col_names, sample))
                schema_parts.append(f"--   {row}")

    conn.close()
    return "\n\n".join(schema_parts)

5.3 Schema感知的Prompt设计

Python
"""Schema-aware Text2SQL Prompt"""

TEXT2SQL_PROMPT = """你是一个SQL专家。根据用户的自然语言问题,生成对应的SQLite SQL查询。

### 数据库Schema:
{schema}

### 规则:
1. 只生成SELECT查询(不允许INSERT/UPDATE/DELETE)
2. 使用SQLite语法
3. 表名和列名与Schema完全一致
4. 如果问题无法用SQL回答,返回"CANNOT_ANSWER"
5. 只返回SQL语句,不要解释

### 用户问题: {question}

### SQL查询:"""

def text_to_sql(question: str, db_path: str = "sample.db") -> str:
    """自然语言转SQL"""
    schema = get_schema(db_path)

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "user",
                "content": TEXT2SQL_PROMPT.format(schema=schema, question=question),
            }
        ],
        temperature=0,  # SQL生成用低温度确保确定性
    )
    sql = response.choices[0].message.content.strip()
    # 清理可能的markdown代码块
    sql = sql.replace("```sql", "").replace("```", "").strip()
    return sql

5.4 SQL验证与安全

Python
"""SQL安全验证"""

import re
import sqlite3

FORBIDDEN_KEYWORDS = [
    "DROP", "DELETE", "INSERT", "UPDATE", "ALTER",
    "CREATE", "TRUNCATE", "EXEC", "EXECUTE",
    "--",  # SQL注释(可能用于注入)
]

def validate_sql(sql: str) -> tuple[bool, str]:
    """验证SQL安全性"""
    upper_sql = sql.upper().strip()

    # 检查是否为SELECT语句
    if not upper_sql.startswith("SELECT"):
        return False, "只允许SELECT查询"

    # 检查禁止的关键字
    for keyword in FORBIDDEN_KEYWORDS:
        if keyword in upper_sql:
            return False, f"包含禁止的关键字: {keyword}"

    # 检查分号(防止拼接多条SQL语句;允许末尾的单个分号)
    if ";" in sql.rstrip().rstrip(";"):
        return False, "不允许多条SQL语句"

    return True, "OK"

def safe_execute_sql(sql: str, db_path: str = "sample.db") -> dict:
    """安全执行SQL并返回结果"""
    # 验证
    is_valid, message = validate_sql(sql)
    if not is_valid:
        return {"success": False, "error": message, "data": None}

    try:  # try/except捕获异常,防止程序崩溃
        conn = sqlite3.connect(db_path)
        conn.execute("PRAGMA query_only = ON")  # 只读模式
        cursor = conn.cursor()

        cursor.execute(sql)
        columns = [desc[0] for desc in cursor.description]
        rows = cursor.fetchall()

        conn.close()

        return {
            "success": True,
            "columns": columns,
            "data": [dict(zip(columns, row)) for row in rows],
            "row_count": len(rows),
        }
    except Exception as e:
        return {"success": False, "error": str(e), "data": None}

5.5 与RAG的结合(混合查询路由)

Python
"""混合查询路由:自动判断使用RAG还是Text2SQL"""

from openai import OpenAI

client = OpenAI()

ROUTER_PROMPT = """判断用户的问题应该使用哪种方式回答:

1. "SQL" - 如果问题涉及结构化数据查询、统计、聚合、排序、过滤等
2. "RAG" - 如果问题涉及概念解释、原理说明、非结构化知识

只回答 "SQL" 或 "RAG",不要解释。

用户问题: {question}"""

def route_query(question: str) -> str:
    """路由查询到合适的处理方式"""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "user", "content": ROUTER_PROMPT.format(question=question)},
        ],
        temperature=0,
        max_tokens=5,
    )
    route = response.choices[0].message.content.strip().upper()
    return "SQL" if "SQL" in route else "RAG"

def hybrid_answer(question: str, db_path: str = "sample.db") -> str:
    """混合问答:自动选择RAG或Text2SQL"""
    route = route_query(question)

    if route == "SQL":
        # Text2SQL路径
        sql = text_to_sql(question, db_path)
        result = safe_execute_sql(sql, db_path)

        if result["success"]:
            # 将SQL结果交给LLM生成自然语言回答
            response = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {
                        "role": "system",
                        "content": "根据SQL查询结果,用自然语言回答用户问题。",
                    },
                    {
                        "role": "user",
                        "content": (
                            f"问题: {question}\n"
                            f"SQL: {sql}\n"
                            f"结果: {result['data']}"
                        ),
                    },
                ],
            )
            return f"📊 [SQL查询]\n{response.choices[0].message.content}"
        else:
            return f"❌ SQL执行失败: {result['error']}"
    else:
        # RAG路径(使用之前实现的retrieve_and_generate)
        answer = retrieve_and_generate(question)
        return f"📚 [知识检索]\n{answer}"

# 示例
# print(hybrid_answer("2024年1月总销售额是多少?"))  → SQL
# print(hybrid_answer("什么是RAG技术?"))              → RAG

5.6 完整Text2SQL代码

Python
"""完整的Text2SQL问答系统"""

import sqlite3
import gradio as gr
from openai import OpenAI

client = OpenAI()

def text2sql_app(question: str, db_path: str = "sample.db") -> tuple[str, str, str]:
    """Text2SQL完整流程,返回(SQL, 查询结果, 自然语言回答)"""
    # 1. 生成SQL
    schema = get_schema(db_path)
    sql = text_to_sql(question, db_path)

    if sql == "CANNOT_ANSWER":
        return "无法生成SQL", "", "该问题无法用SQL回答"

    # 2. 验证并执行
    result = safe_execute_sql(sql, db_path)

    if not result["success"]:
        return sql, f"执行失败: {result['error']}", ""

    # 3. 格式化结果
    import pandas as pd
    df = pd.DataFrame(result["data"])
    result_str = df.to_markdown(index=False)

    # 4. 生成自然语言回答
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": "根据SQL查询结果回答用户的问题,用中文回答,简洁准确。",
            },
            {
                "role": "user",
                "content": f"问题: {question}\nSQL: {sql}\n结果:\n{result_str}",
            },
        ],
    )
    answer = response.choices[0].message.content

    return sql, result_str, answer

# Gradio界面
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 📊 Text2SQL智能问答")
    question = gr.Textbox(label="输入问题", placeholder="例如:哪个产品销量最高?")
    btn = gr.Button("查询", variant="primary")
    sql_output = gr.Code(label="生成的SQL", language="sql")
    table_output = gr.Markdown(label="查询结果")
    answer_output = gr.Textbox(label="回答", lines=3)

    btn.click(fn=text2sql_app, inputs=question, outputs=[sql_output, table_output, answer_output])

    gr.Examples(
        examples=[
            "2024年总销售额是多少?",
            "哪个区域的销量最高?",
            "按类别统计平均单价",
            "列出所有手机类产品",
        ],
        inputs=question,
    )

demo.launch()  # 启动Web界面

6. RAG系统评估

6.1 RAGAS评估框架

RAGAS (Retrieval Augmented Generation Assessment) 是RAG系统专用评估框架:

Bash
pip install ragas

6.2 评估维度

| 维度 | 含义 | 评估什么 |
| --- | --- | --- |
| Faithfulness | 忠实度 | 生成的回答是否忠于检索到的上下文 |
| Answer Relevancy | 回答相关性 | 生成的回答是否回答了用户的问题 |
| Context Precision | 上下文精确度 | 检索到的上下文中相关内容排名是否靠前 |
| Context Recall | 上下文召回率 | 回答所需的信息是否都被检索到了 |

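表中的 Context Recall 需要判断"回答所需的每条信息是否都出现在检索上下文中"。下面给出一个不依赖LLM的粗略近似(仅作示意:按句切分标准答案、用子串包含判断命中;实际评估建议使用RAGAS或LLM-as-Judge):

```python
"""Context Recall 的简易近似(示意实现,非RAGAS官方算法)"""

import re

def approx_context_recall(ground_truth: str, contexts: list[str]) -> float:
    """按句切分标准答案,统计能在任一上下文中找到的句子比例"""
    # 按中英文句号/问号/感叹号切分
    sentences = [s.strip() for s in re.split(r"[。!?.!?]", ground_truth) if s.strip()]
    if not sentences:
        return 0.0
    joined = "\n".join(contexts)
    hit = sum(1 for s in sentences if s in joined)
    return hit / len(sentences)

contexts = ["RAG通过检索外部知识增强生成", "Milvus是向量数据库"]
print(approx_context_recall("RAG通过检索外部知识增强生成。Milvus是向量数据库。", contexts))  # → 1.0
print(approx_context_recall("RAG通过检索外部知识增强生成。HNSW是图索引。", contexts))        # → 0.5
```

这种字符串匹配对措辞变化非常敏感,只适合做快速冒烟测试;正式评估应使用LLM判断语义蕴含关系。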
6.3 自动化评估流水线

Python
"""RAG评估实现(基于LLM-as-Judge)"""

from openai import OpenAI
import json

client = OpenAI()

# ---- 评估指标 ----

def evaluate_faithfulness(answer: str, contexts: list[str]) -> float:
    """评估回答对上下文的忠实度"""
    context_text = "\n---\n".join(contexts)
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": (
                    "你是一个评估专家。判断以下回答是否完全基于给定的上下文。\n"
                    "评分标准:\n"
                    "1.0 = 回答完全基于上下文,没有编造任何信息\n"
                    "0.5 = 回答部分基于上下文,但有些内容无法从上下文中验证\n"
                    "0.0 = 回答主要基于编造的信息\n"
                    "只返回一个数字(0.0/0.5/1.0)。"
                ),
            },
            {
                "role": "user",
                "content": f"上下文:\n{context_text}\n\n回答:\n{answer}",
            },
        ],
        temperature=0,
    )
    try:
        return float(response.choices[0].message.content.strip())
    except ValueError:
        return 0.5

def evaluate_answer_relevancy(question: str, answer: str) -> float:
    """评估回答对问题的相关性"""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": (
                    "你是一个评估专家。判断以下回答是否充分回答了用户的问题。\n"
                    "评分:1.0=完全回答, 0.7=大部分回答, 0.4=部分回答, 0.0=完全无关\n"
                    "只返回一个数字。"
                ),
            },
            {
                "role": "user",
                "content": f"问题:\n{question}\n\n回答:\n{answer}",
            },
        ],
        temperature=0,
    )
    try:
        return float(response.choices[0].message.content.strip())
    except ValueError:
        return 0.5

def evaluate_context_precision(question: str, contexts: list[str]) -> float:
    """评估检索上下文的精确度(相关内容是否排在前面)"""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": (
                    "你是一个评估专家。判断以下检索结果对回答问题的有用程度。\n"
                    "对每个检索结果标记 1(相关)或 0(不相关)。\n"
                    "返回JSON数组,例如: [1, 0, 1]"
                ),
            },
            {
                "role": "user",
                "content": (
                    f"问题: {question}\n\n"
                    + "\n\n".join(f"检索结果{i+1}: {c}" for i, c in enumerate(contexts))
                ),
            },
        ],
        temperature=0,
    )
    try:
        relevance = json.loads(response.choices[0].message.content.strip())  # json.loads将JSON字符串→Python对象
        if not relevance:
            return 0.0
        # Average Precision计算
        hits = 0
        precision_sum = 0.0
        for i, rel in enumerate(relevance):
            if rel == 1:
                hits += 1
                precision_sum += hits / (i + 1)
        return precision_sum / max(sum(relevance), 1)
    except (json.JSONDecodeError, ValueError):
        return 0.5

# ---- 综合评估 ----

def evaluate_rag_sample(
    question: str,
    answer: str,
    contexts: list[str],
) -> dict:
    """综合评估一个RAG样本"""
    scores = {
        "faithfulness": evaluate_faithfulness(answer, contexts),
        "answer_relevancy": evaluate_answer_relevancy(question, answer),
        "context_precision": evaluate_context_precision(question, contexts),
    }
    scores["overall"] = sum(scores.values()) / len(scores)
    return scores

def evaluate_rag_dataset(dataset: list[dict]) -> dict:
    """评估整个数据集"""
    all_scores = []
    for sample in dataset:
        scores = evaluate_rag_sample(
            question=sample["question"],
            answer=sample["answer"],
            contexts=sample["contexts"],
        )
        all_scores.append(scores)
        print(f"  Q: {sample['question'][:50]}... → 综合: {scores['overall']:.2f}")

    # 汇总
    avg_scores = {}
    for key in all_scores[0]:
        avg_scores[key] = sum(s[key] for s in all_scores) / len(all_scores)
    return avg_scores

# 使用示例
eval_dataset = [
    {
        "question": "什么是RAG?",
        "answer": "RAG是检索增强生成技术,通过检索相关文档来增强LLM的生成质量。",
        "contexts": ["RAG(Retrieval-Augmented Generation)通过检索增强生成..."],
    },
    # ... 更多评估样本
]
# results = evaluate_rag_dataset(eval_dataset)
# print(f"评估结果: {results}")
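evaluate_context_precision 中的 Average Precision 值得用一个小例子验证:相关性标注为 [1, 0, 1] 时,命中发生在第1、3位,对应精确度 1/1 和 2/3,AP = (1 + 2/3) / 2 ≈ 0.833。把这段计算抽成独立函数便于单测(逻辑与上面代码一致):

```python
"""Average Precision 计算的独立版本(与evaluate_context_precision内部逻辑一致)"""

def average_precision(relevance: list[int]) -> float:
    """relevance: 每个检索结果的0/1相关性标注(按排名顺序)"""
    if not relevance:
        return 0.0
    hits = 0
    precision_sum = 0.0
    for i, rel in enumerate(relevance):
        if rel == 1:
            hits += 1
            precision_sum += hits / (i + 1)  # 第i+1位处的精确度
    return precision_sum / max(sum(relevance), 1)

print(average_precision([1, 0, 1]))  # → 0.8333...
print(average_precision([1, 1, 0]))  # → 1.0
print(average_precision([0, 0, 1]))  # → 0.3333...
```

可以看到:相关结果排名越靠前,AP越高——这正是Context Precision要衡量的"排序质量"。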

7. 索引优化技术

7.1 HNSW vs IVF vs Flat索引

| 索引类型 | 原理 | 适用规模 | 精度 | 速度 | 内存 |
| --- | --- | --- | --- | --- | --- |
| FLAT | 暴力搜索 | <10万 | 100% | 慢 | 中 |
| IVF_FLAT | 倒排索引 + 暴力 | 10万-1000万 | 高 | 中 | 中 |
| IVF_PQ | 倒排 + 量化 | >1000万 | 中 | 快 | 低 |
| HNSW | 层级导航小世界图 | 1万-1000万 | 高 | 快 | 高 |

7.2 量化技术

Python
"""向量量化对比"""

# PQ (Product Quantization) 乘积量化
# 将高维向量分段,每段独立量化
pq_index_config = {
    "index_type": "IVF_PQ",
    "metric_type": "L2",
    "params": {
        "nlist": 1024,      # 聚类中心数(更多=更精确但更慢)
        "m": 16,            # 子向量数(维度的因子,如1536/16=96)
        "nbits": 8,         # 每个子向量的位数(通常8)
    },
}
# 压缩比:原始1536*4=6144字节 → 16*1=16字节 ≈ 384x压缩

# SQ (Scalar Quantization) 标量量化
# 将float32量化为int8
sq_index_config = {
    "index_type": "IVF_SQ8",
    "metric_type": "L2",
    "params": {
        "nlist": 1024,
    },
}
# 压缩比:4x(float32 → int8)
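上面注释中的压缩比可以直接按公式算出来(示意函数,公式即注释所述:原始占用 dim×4 字节,PQ编码占用 m×nbits/8 字节):

```python
"""量化压缩比计算(按上文注释中的公式)"""

def pq_compression_ratio(dim: int, m: int, nbits: int = 8) -> float:
    """PQ:原始 dim*4 字节(float32) → m个子向量编码共 m*nbits/8 字节"""
    original_bytes = dim * 4
    compressed_bytes = m * nbits / 8
    return original_bytes / compressed_bytes

def sq8_compression_ratio() -> float:
    """SQ8:每维 float32(4字节) → int8(1字节)"""
    return 4.0

print(pq_compression_ratio(1536, 16))  # → 384.0(与上文注释一致)
print(sq8_compression_ratio())         # → 4.0
```

注意压缩比只衡量索引内存,不反映召回率损失——PQ压缩越狠,召回率下降越明显,需结合benchmark实测。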

7.3 索引参数调优

Python
"""索引参数调优指南"""

def get_optimal_params(num_vectors: int, dimension: int, priority: str = "balanced"):
    """根据数据规模和优先级推荐索引参数"""

    if num_vectors < 100_000:
        # 小规模:直接用HNSW
        return {
            "index_type": "HNSW",
            "params": {"M": 16, "efConstruction": 256},
            "search_params": {"ef": 128},
            "notes": "小规模数据,HNSW提供最佳精度-速度平衡",
        }

    elif num_vectors < 10_000_000:
        if priority == "accuracy":
            return {
                "index_type": "HNSW",
                "params": {"M": 32, "efConstruction": 512},
                "search_params": {"ef": 256},
                "notes": "中规模+高精度需求,内存消耗较大",
            }
        elif priority == "speed":
            return {
                "index_type": "IVF_SQ8",
                "params": {"nlist": 4096},
                "search_params": {"nprobe": 64},
                "notes": "中规模+速度优先,4x内存压缩",
            }
        else:
            return {
                "index_type": "IVF_FLAT",
                "params": {"nlist": 2048},
                "search_params": {"nprobe": 128},
                "notes": "中规模均衡方案",
            }

    else:
        # 大规模:必须用量化
        return {
            "index_type": "IVF_PQ",
            "params": {"nlist": 8192, "m": dimension // 96, "nbits": 8},
            "search_params": {"nprobe": 128},
            "notes": "大规模数据,PQ量化显著节省内存",
        }

# 示例
params = get_optimal_params(5_000_000, 1536, priority="balanced")
print(f"推荐索引: {params['index_type']}")
print(f"参数: {params['params']}")
print(f"说明: {params['notes']}")

7.4 检索性能基准测试

Python
"""检索性能基准测试"""

import time
import numpy as np
from pymilvus import MilvusClient

def benchmark_search(
    client: MilvusClient,
    collection_name: str,
    dimension: int,
    num_queries: int = 100,
    top_k: int = 10,
) -> dict:
    """基准测试检索性能"""
    # 生成随机查询向量
    query_vectors = np.random.random((num_queries, dimension)).tolist()

    latencies = []
    for qv in query_vectors:
        start = time.perf_counter()
        client.search(
            collection_name=collection_name,
            data=[qv],
            limit=top_k,
        )
        elapsed = (time.perf_counter() - start) * 1000  # 毫秒
        latencies.append(elapsed)

    return {
        "num_queries": num_queries,
        "top_k": top_k,
        "avg_latency_ms": np.mean(latencies),
        "p50_ms": np.percentile(latencies, 50),
        "p95_ms": np.percentile(latencies, 95),
        "p99_ms": np.percentile(latencies, 99),
        "qps": 1000.0 / np.mean(latencies),
    }

# 使用
# results = benchmark_search(milvus_client, "knowledge_base", 1536)
# print(f"平均延迟: {results['avg_latency_ms']:.2f}ms")
# print(f"P95延迟: {results['p95_ms']:.2f}ms")
# print(f"QPS: {results['qps']:.1f}")
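在没有可用Milvus实例时,可以先用固定延迟数据验证统计口径是否正确(示意:假设100次查询延迟均匀分布在1..100ms,则P50=50.5、P95=95.05):

```python
"""用固定延迟数据验证benchmark统计逻辑"""

import numpy as np

latencies = list(range(1, 101))  # 模拟100次查询,延迟1..100ms
stats = {
    "avg_latency_ms": float(np.mean(latencies)),
    "p50_ms": float(np.percentile(latencies, 50)),
    "p95_ms": float(np.percentile(latencies, 95)),
    "qps": 1000.0 / float(np.mean(latencies)),
}
print(stats)  # avg=50.5, p50=50.5, p95=95.05, qps≈19.8
```

这里的QPS是单线程串行口径(1000/平均延迟);生产环境通常还需测并发QPS。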

8. 实战:多模态知识库

8.1 处理含图表的PDF文档

Python
"""端到端多模态知识库 —— 处理含图表的PDF"""

import pymupdf
import base64
from pathlib import Path
from dataclasses import dataclass, field
from openai import OpenAI
from pymilvus import MilvusClient
import numpy as np

client = OpenAI()
milvus = MilvusClient("./multimodal_kb.db")

COLLECTION = "mm_knowledge_base"
EMBED_DIM = 1536

@dataclass
class ContentBlock:
    block_id: int
    block_type: str  # "text" | "image" | "table"
    content: str     # 文本内容或图像的LLM描述
    raw_data: str = ""  # 原始数据(图像的base64等)
    page: int = 0
    source: str = ""
    embedding: list[float] = field(default_factory=list)  # 每个实例获得独立的空列表,避免所有实例共享同一个列表

class MultimodalKnowledgeBase:
    """多模态知识库"""

    def __init__(self):
        self.blocks: list[ContentBlock] = []
        self._init_collection()

    def _init_collection(self):
        if milvus.has_collection(COLLECTION):
            milvus.drop_collection(COLLECTION)
        milvus.create_collection(collection_name=COLLECTION, dimension=EMBED_DIM)

    def _embed(self, texts: list[str]) -> list[list[float]]:
        all_embs = []
        for i in range(0, len(texts), 100):
            batch = texts[i:i + 100]
            resp = client.embeddings.create(model="text-embedding-3-small", input=batch)
            all_embs.extend([d.embedding for d in resp.data])
        return all_embs

    def _describe_image(self, img_b64: str) -> str:
        resp = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{
                "role": "user",
                "content": [
                    {"type": "text", "text": "请详细描述这张图片的内容,用中文。"},
                    {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{img_b64}"}},
                ],
            }],
            max_tokens=300,
        )
        return resp.choices[0].message.content

    # ---- 文档加载 ----

    def load_pdf(self, pdf_path: str):
        """加载PDF文档,提取文本和图像"""
        doc = pymupdf.open(pdf_path)
        initial_count = len(self.blocks)
        block_id = initial_count

        for page_num, page in enumerate(doc):
            # 文本
            text = page.get_text().strip()
            if text:
                # 按段落切分
                paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
                # 合并短段落
                merged = []
                current = ""
                for p in paragraphs:
                    if len(current) + len(p) < 500:
                        current += "\n" + p if current else p
                    else:
                        if current:
                            merged.append(current)
                        current = p
                if current:
                    merged.append(current)

                for chunk in merged:
                    self.blocks.append(ContentBlock(
                        block_id=block_id,
                        block_type="text",
                        content=chunk,
                        page=page_num + 1,
                        source=pdf_path,
                    ))
                    block_id += 1

            # 图像
            for img in page.get_images(full=True):
                try:
                    xref = img[0]
                    pix = pymupdf.Pixmap(doc, xref)
                    if pix.width < 50 or pix.height < 50:
                        continue  # 跳过太小的图像
                    if pix.n > 4:
                        pix = pymupdf.Pixmap(pymupdf.csRGB, pix)
                    img_b64 = base64.b64encode(pix.tobytes("png")).decode("utf-8")
                    description = self._describe_image(img_b64)

                    self.blocks.append(ContentBlock(
                        block_id=block_id,
                        block_type="image",
                        content=description,
                        raw_data=img_b64,
                        page=page_num + 1,
                        source=pdf_path,
                    ))
                    block_id += 1
                except Exception:
                    continue

        print(f"✅ 从 {pdf_path} 提取了 {len(self.blocks) - initial_count} 个内容块")

    # ---- 索引构建 ----

    def build_index(self):
        """构建向量索引"""
        if not self.blocks:
            print("⚠️ 没有内容可索引")
            return

        texts = [b.content for b in self.blocks]
        embeddings = self._embed(texts)

        data = []
        for i, block in enumerate(self.blocks):
            block.embedding = embeddings[i]
            data.append({
                "id": block.block_id,
                "vector": embeddings[i],
                "block_type": block.block_type,
                "content": block.content[:5000],
                "page": block.page,
                "source": block.source,
            })

        milvus.insert(collection_name=COLLECTION, data=data)
        print(f"✅ 已索引 {len(data)} 个内容块(文本: {sum(1 for b in self.blocks if b.block_type == 'text')}, 图像: {sum(1 for b in self.blocks if b.block_type == 'image')})")  # sum(1 for...if)条件计数惯用法

    # ---- 统一查询 ----

    def query(self, question: str, top_k: int = 5) -> str:
        """统一查询接口"""
        q_emb = self._embed([question])[0]

        results = milvus.search(
            collection_name=COLLECTION,
            data=[q_emb],
            limit=top_k,
            output_fields=["block_type", "content", "page", "source"],
        )[0]

        # 组织上下文
        context_parts = []
        for r in results:
            entity = r["entity"]
            prefix = {"text": "📝", "image": "🖼️", "table": "📊"}.get(entity["block_type"], "📄")  # 匿名字典+get:用字典实现switch-case映射,未匹配返回默认值
            context_parts.append(
                f"{prefix} [{entity['block_type']}] (第{entity['page']}页, 相似度:{r['distance']:.3f})\n{entity['content']}"
            )

        context = "\n\n---\n\n".join(context_parts)

        # 生成回答
        resp = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": (
                        "你是知识库问答助手。根据以下多模态检索内容回答问题。\n"
                        "内容可能包含文本摘录和图像描述,请综合利用。\n"
                        "引用来源页码。\n\n"
                        f"【检索内容】\n{context}"
                    ),
                },
                {"role": "user", "content": question},
            ],
        )
        return resp.choices[0].message.content

# ---- 使用 ----

if __name__ == "__main__":
    kb = MultimodalKnowledgeBase()

    # 加载文档
    # kb.load_pdf("technical_report.pdf")
    # kb.build_index()

    # 查询
    # answer = kb.query("文档中的架构图展示了什么?")
    # print(answer)
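load_pdf 中的短段落合并逻辑可以抽成独立函数,便于单独测试和复用(与上面类内代码一致,500为示例阈值):

```python
"""段落合并逻辑的独立版本(与load_pdf内部一致)"""

def merge_paragraphs(paragraphs: list[str], max_len: int = 500) -> list[str]:
    """把相邻短段落合并到max_len以内,避免产生过碎的chunk"""
    merged: list[str] = []
    current = ""
    for p in paragraphs:
        if len(current) + len(p) < max_len:
            current += "\n" + p if current else p
        else:
            if current:
                merged.append(current)
            current = p
    if current:
        merged.append(current)
    return merged

chunks = merge_paragraphs(["a" * 300, "b" * 300, "c" * 100])
print(len(chunks))     # → 2(前300字符单独成块,后两段合并)
print(len(chunks[1]))  # → 401(300 + 换行符 + 100)
```

注意该策略只保证"合并后不超过阈值",不会切分超长单段;超长段落需要额外的递归切分逻辑。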

8.2 端到端测试

Python
"""端到端测试脚本"""

def test_multimodal_kb():
    """测试多模态知识库的完整流程"""

    # 1. 创建测试数据
    kb = MultimodalKnowledgeBase()

    # 手动添加测试数据(不依赖PDF)
    test_blocks = [
        ContentBlock(0, "text", "RAG是检索增强生成技术,通过检索外部知识来增强LLM的准确性。", page=1, source="test.pdf"),
        ContentBlock(1, "text", "Milvus是开源向量数据库,支持百亿级向量的高效检索。", page=2, source="test.pdf"),
        ContentBlock(2, "image", "架构图:显示了用户查询经过Embedding模型转换为向量,然后在Milvus中检索,最后由LLM生成回答的流程。", page=3, source="test.pdf"),
        ContentBlock(3, "text", "HNSW索引通过层级图结构实现高效的近似最近邻搜索。", page=4, source="test.pdf"),
        ContentBlock(4, "table", "性能对比表:HNSW召回率99%延迟5ms,IVF_FLAT召回率95%延迟3ms,IVF_PQ召回率90%延迟1ms。", page=5, source="test.pdf"),
    ]

    kb.blocks = test_blocks
    kb.build_index()

    # 2. 测试查询
    test_queries = [
        "什么是RAG?",
        "架构图展示了什么流程?",
        "不同索引类型的性能对比如何?",
    ]

    print("\n" + "=" * 60)
    print("端到端测试")
    print("=" * 60)

    for q in test_queries:
        print(f"\n{q}")
        answer = kb.query(q, top_k=3)
        print(f"💡 {answer}")
        print("-" * 40)

# test_multimodal_kb()

9. 总结与最佳实践

9.1 技术选型指南

| 场景 | 推荐方案 |
| --- | --- |
| 纯文本文档RAG | OpenAI Embedding + Milvus HNSW |
| 含图表的技术文档 | Vision LLM描述 + 统一文本Embedding |
| 结构化数据查询 | Text2SQL + LLM回答生成 |
| 混合数据场景 | 查询路由(SQL/RAG分流) |
| 高精度需求 | 多路召回 + Cross-Encoder重排序 |
| 大规模(>1000万向量) | Milvus分布式 + IVF_PQ量化 |

9.2 最佳实践清单

Embedding选择:
  • 中文场景优先:BGE-M3 > text-embedding-3-small
  • 需要多模态:CLIP用于图像检索,Vision LLM用于图像理解
  • 维度权衡:更高维度=更精确,但索引更大、检索更慢

向量数据库:
  • 开发测试:Milvus Lite(零配置本地文件)
  • 生产环境:Milvus standalone或K8s部署
  • 必须创建索引,否则每次搜索退化为暴力扫描
  • 批量写入(batch insert)比逐条写入快一到两个数量级

Text2SQL:
  • 始终在只读模式下执行SQL
  • Schema信息 + 样本数据放入Prompt
  • 使用低temperature(0-0.1)提高确定性
  • SQL验证是安全必需环节
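上面的"只读 + 关键字过滤 + 验证"策略可以落成一个极简校验函数(示意实现,黑名单为示例;生产环境应结合SQLite的只读连接 `file:...?mode=ro` 或 `PRAGMA query_only` 以及完整的SQL解析器):

```python
"""极简SQL安全校验(示意:只允许单条SELECT,且不含危险关键字)"""

FORBIDDEN = {"insert", "update", "delete", "drop", "alter", "create", "attach", "pragma"}

def is_safe_sql(sql: str) -> bool:
    """粗粒度校验:拒绝非SELECT、多语句和黑名单关键字"""
    s = sql.strip().rstrip(";").lower()
    if not s.startswith("select"):
        return False
    if ";" in s:  # 禁止拼接多条语句
        return False
    tokens = set(s.replace("(", " ").replace(")", " ").split())
    return not (tokens & FORBIDDEN)

print(is_safe_sql("SELECT * FROM sales WHERE year = 2024"))  # → True
print(is_safe_sql("DROP TABLE sales"))                       # → False
print(is_safe_sql("SELECT 1; DELETE FROM sales"))            # → False
```

黑名单只是第一道防线:关键字可能以注释、字符串拼接等方式绕过,因此数据库层面的只读连接才是兜底保障。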

RAG评估:
  • 每次系统变更后跑评估流水线
  • 关注Context Precision(检索质量)和Faithfulness(生成质量)
  • 至少准备50+评估样本才有统计意义

9.3 常见陷阱

| 陷阱 | 解决方案 |
| --- | --- |
| 图像直接OCR效果差 | 用Vision LLM生成结构化描述 |
| 表格转文本后丢失结构 | 保持Markdown/HTML格式或用Text2SQL |
| 向量检索top_k太小 | 先召回多(20+),再用reranker筛选 |
| Embedding模型选错 | 中文用BGE系列,英文用OpenAI |
| Milvus不建索引 | 数据多时检索极慢,必须建索引 |
| Text2SQL注入风险 | 只读模式 + 关键字过滤 + SQL验证 |

9.4 延伸学习

  • GraphRAG:Microsoft开源的基于知识图谱的RAG方案
  • ColBERT:Token级向量交互的精细检索模型
  • Corrective RAG:检索结果不满意时自动修正重检索
  • Agentic RAG:用Agent编排多步检索逻辑

📝 本章小结

| 知识点 | 掌握程度 |
| --- | --- |
| 多模态RAG架构 | ⭐⭐⭐ 必须掌握 |
| CLIP / BGE-M3 Embedding | ⭐⭐⭐ 必须掌握 |
| Milvus基本操作 | ⭐⭐⭐ 必须掌握 |
| 混合检索(向量+过滤) | ⭐⭐⭐ 必须掌握 |
| Text2SQL实现 | ⭐⭐ 重要 |
| RAG评估(RAGAS) | ⭐⭐ 重要 |
| 索引优化与量化 | ⭐⭐ 重要 |
| Cross-Encoder重排序 | ⭐⭐ 重要 |

📝 练习与面试

练习1:Milvus向量搜索(⭐)

用Milvus Lite创建Collection,插入100条文本的Embedding,实现语义搜索并返回top-5结果。

练习2:Text2SQL问答(⭐⭐)

搭建一个简单的Text2SQL系统:给定一个SQLite数据库,用自然语言提问,生成并执行SQL,返回结果。注意安全性。

练习3:多模态知识库(⭐⭐⭐)

构建一个完整的多模态RAG系统:加载含图表的PDF,提取文本+图像,统一建立向量索引,实现检索+生成。

面试题

Q1: HNSW和IVF索引的区别是什么?各适合什么场景?

HNSW是基于图的索引,查询快、精度高,但内存占用大,适合中小规模(<1000万)且对延迟敏感的场景。IVF是基于聚类的索引,内存效率更高,适合大规模数据(>1000万),可结合PQ量化进一步压缩。

Q2: BGE-M3和OpenAI Embedding怎么选?

BGE-M3优势:开源免费、支持本地部署、中文效果更佳、支持Dense+Sparse混合检索。OpenAI优势:无需GPU、API调用简单、英文效果优秀。成本敏感+中文场景用BGE-M3,快速原型+英文场景用OpenAI。

Q3: Text2SQL的安全风险和防护措施有哪些?

主要风险:SQL注入(DROP/DELETE)、数据泄露、资源消耗(全表扫描)。防护:① 只读PRAGMA。② 关键字黑名单过滤。③ SQL语法验证。④ 执行超时限制。⑤ 日志审计所有查询。


导航:上一章 23-Gradio构建AI应用 | 返回目录