跳转至

向量数据库

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

向量数据库图

📖 章节导读

向量数据库是RAG系统的核心组件,专门用于存储和检索高维向量数据。本章将深入探讨向量数据库的原理、选型、使用和优化。

🎯 学习目标

  • 理解向量数据库的核心原理
  • 掌握向量数据库的选型方法
  • 学会使用主流向量数据库
  • 了解向量数据库的优化技巧
  • 掌握大厂面试中的相关问题

6.1 向量数据库概述

6.1.1 什么是向量数据库

定义:向量数据库是一种专门用于存储、索引和查询高维向量数据的数据库系统。

核心功能:

  1. 向量存储:高效存储高维向量
  2. 向量索引:构建向量索引以加速检索
  3. 相似度搜索:快速找到最相似的向量
  4. 元数据管理:管理向量的元数据
  5. 扩展性:支持大规模数据和高并发

为什么需要向量数据库:

  1. 高维数据:传统数据库不适合高维向量
  2. 相似度搜索:需要高效的相似度搜索能力
  3. 大规模数据:需要支持海量向量数据
  4. 实时查询:需要快速响应查询请求

6.1.2 向量数据库 vs 传统数据库

对比分析:

特性 向量数据库 传统数据库
数据类型 向量 结构化数据
查询方式 相似度搜索 精确匹配
索引结构 向量索引 B树、哈希等
查询性能 近似搜索 精确搜索
扩展性 水平扩展 垂直扩展
适用场景 AI应用、推荐 事务处理

选择建议:

  • 选择向量数据库:需要相似度搜索、AI应用、推荐系统
  • 选择传统数据库:事务处理、精确查询、关系型数据

6.1.3 向量数据库的应用场景

主要应用:

  1. RAG系统:文档检索和问答
  2. 推荐系统:相似物品推荐
  3. 图像检索:以图搜图
  4. 语义搜索:语义相似度搜索
  5. 异常检测:检测异常向量

具体案例:

  • 文档问答:检索相关文档
  • 商品推荐:推荐相似商品
  • 人脸识别:检索相似人脸
  • 代码搜索:搜索相似代码
  • 日志分析:检测异常日志

6.2 向量嵌入技术

6.2.1 什么是向量嵌入

定义:向量嵌入是将文本、图像、音频等数据转换为高维向量表示的技术。

核心思想:

  1. 语义表示:向量表示数据的语义信息
  2. 相似度计算:向量距离反映语义相似度
  3. 降维:将高维数据映射到低维空间
  4. 连续表示:离散数据转换为连续向量

为什么需要向量嵌入:

  1. 语义理解:向量表示语义相似性
  2. 相似度计算:可以计算向量之间的相似度
  3. 机器学习:便于机器学习模型处理
  4. 检索效率:向量检索效率高

6.2.2 文本嵌入模型

主流模型:

  1. OpenAI Embeddings:
  2. text-embedding-3-small
  3. text-embedding-3-large
  4. text-embedding-ada-002

  5. Sentence-Transformers:

  6. all-MiniLM-L6-v2
  7. all-mpnet-base-v2
  8. paraphrase-multilingual-MiniLM-L12-v2

  9. Hugging Face Models:

  10. BERT系列
  11. RoBERTa系列
  12. DeBERTa系列

代码实现:

Python
from sentence_transformers import SentenceTransformer
import numpy as np

class TextEmbedder:
    """文本嵌入器"""

    def __init__(self, model_name='all-MiniLM-L6-v2'):
        """
        初始化文本嵌入器

        Args:
            model_name: 模型名称
        """
        self.model = SentenceTransformer(model_name)

    def embed(self, texts: list) -> np.ndarray:
        """
        将文本转换为向量

        Args:
            texts: 文本列表

        Returns:
            向量数组
        """
        embeddings = self.model.encode(texts)
        return embeddings

    def embed_single(self, text: str) -> np.ndarray:
        """
        将单个文本转换为向量

        Args:
            text: 文本

        Returns:
            向量
        """
        embedding = self.model.encode([text])[0]
        return embedding

    def compute_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
        """
        计算两个向量的余弦相似度

        Args:
            vec1: 向量1
            vec2: 向量2

        Returns:
            相似度
        """
        # 计算余弦相似度
        similarity = np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
        return float(similarity)

# 使用示例
embedder = TextEmbedder()

# 嵌入文本
texts = ["机器学习是人工智能的一个分支", "深度学习使用神经网络", "今天天气不错"]
embeddings = embedder.embed(texts)

print(f"向量形状: {embeddings.shape}")
print(f"向量维度: {embeddings.shape[1]}")

# 计算相似度
similarity = embedder.compute_similarity(embeddings[0], embeddings[1])
print(f"\n文本1和文本2的相似度: {similarity:.4f}")

6.2.3 向量相似度计算

常用方法:

  1. 余弦相似度:
  2. 最常用的方法
  3. 衡量向量方向的相似性
  4. 范围: [-1, 1]

  5. 欧氏距离:

  6. 衡量向量之间的距离
  7. 范围: [0, +∞]
  8. 距离越小越相似

  9. 点积:

  10. 简单高效
  11. 需要向量归一化
  12. 范围: [-1, 1]

代码实现:

Python
class VectorSimilarity:
    """向量相似度计算"""

    @staticmethod  # @staticmethod无需实例即可调用
    def cosine_similarity(vec1: np.ndarray, vec2: np.ndarray) -> float:
        """
        余弦相似度

        Args:
            vec1: 向量1
            vec2: 向量2

        Returns:
            相似度
        """
        dot_product = np.dot(vec1, vec2)
        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)
        similarity = dot_product / (norm1 * norm2 + 1e-8)
        return float(similarity)

    @staticmethod
    def euclidean_distance(vec1: np.ndarray, vec2: np.ndarray) -> float:
        """
        欧氏距离

        Args:
            vec1: 向量1
            vec2: 向量2

        Returns:
            距离
        """
        distance = np.linalg.norm(vec1 - vec2)
        return float(distance)

    @staticmethod
    def dot_product(vec1: np.ndarray, vec2: np.ndarray) -> float:
        """
        点积

        Args:
            vec1: 向量1
            vec2: 向量2

        Returns:
            点积
        """
        return float(np.dot(vec1, vec2))

    @staticmethod
    def find_most_similar(query_vector: np.ndarray,
                        vectors: np.ndarray,
                        top_k: int = 5,
                        method: str = 'cosine') -> list:
        """
        找到最相似的向量

        Args:
            query_vector: 查询向量
            vectors: 向量数组
            top_k: 返回的top-k数量
            method: 相似度方法(cosine/euclidean/dot)

        Returns:
            最相似的向量索引和分数
        """
        if method == 'cosine':
            similarities = np.dot(vectors, query_vector)
            norms = np.linalg.norm(vectors, axis=1) * np.linalg.norm(query_vector)
            scores = similarities / (norms + 1e-8)
            top_indices = np.argsort(scores)[-top_k:][::-1]
            return [(int(idx), float(scores[idx])) for idx in top_indices]

        elif method == 'euclidean':
            distances = np.linalg.norm(vectors - query_vector, axis=1)
            top_indices = np.argsort(distances)[:top_k]
            return [(int(idx), float(distances[idx])) for idx in top_indices]

        else:  # dot product
            scores = np.dot(vectors, query_vector)
            top_indices = np.argsort(scores)[-top_k:][::-1]
            return [(int(idx), float(scores[idx])) for idx in top_indices]

# 使用示例
similarity = VectorSimilarity()

# 计算相似度
vec1 = np.array([1, 2, 3])
vec2 = np.array([1, 2, 4])

cos_sim = similarity.cosine_similarity(vec1, vec2)
euc_dist = similarity.euclidean_distance(vec1, vec2)
dot_prod = similarity.dot_product(vec1, vec2)

print(f"余弦相似度: {cos_sim:.4f}")
print(f"欧氏距离: {euc_dist:.4f}")
print(f"点积: {dot_prod:.4f}")

# 找到最相似的向量
vectors = np.array([[1, 2, 3], [1, 2, 4], [2, 3, 4], [1, 1, 1]])
query = np.array([1, 2, 3])

results = similarity.find_most_similar(query, vectors, top_k=3, method='cosine')
print(f"\n最相似的向量:")
for idx, score in results:
    print(f"索引: {idx}, 分数: {score:.4f}")

6.3 主流向量数据库

6.3.1 Chroma

简介:Chroma是一个开源的向量数据库,易于使用和部署。

特点:

  • 轻量级,易于安装
  • 支持多种嵌入模型
  • 支持元数据过滤
  • 支持持久化存储

安装:

Bash
pip install chromadb

代码实现:

Python
import chromadb
from chromadb.config import Settings

class ChromaVectorDB:
    """Chroma向量数据库"""

    def __init__(self, collection_name: str, persist_directory: str = "./chroma_db"):
        """
        初始化Chroma向量数据库

        Args:
            collection_name: 集合名称
            persist_directory: 持久化目录
        """
        self.client = chromadb.PersistentClient(path=persist_directory)
        self.collection = self.client.get_or_create_collection(
            name=collection_name
        )

    def add_documents(self, documents: list, embeddings: list, metadatas: list = None, ids: list = None):
        """
        添加文档

        Args:
            documents: 文档列表
            embeddings: 向量列表
            metadatas: 元数据列表
            ids: ID列表
        """
        self.collection.add(
            documents=documents,
            embeddings=embeddings,
            metadatas=metadatas,
            ids=ids
        )

    def query(self, query_embedding: list, n_results: int = 5, where: dict = None):
        """
        查询

        Args:
            query_embedding: 查询向量
            n_results: 返回结果数量
            where: 元数据过滤条件

        Returns:
            查询结果
        """
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=n_results,
            where=where
        )
        return results

    def delete(self, ids: list):
        """
        删除文档

        Args:
            ids: ID列表
        """
        self.collection.delete(ids=ids)

    def get(self, ids: list = None, where: dict = None):
        """
        获取文档

        Args:
            ids: ID列表
            where: 元数据过滤条件

        Returns:
            文档列表
        """
        results = self.collection.get(
            ids=ids,
            where=where
        )
        return results

# 使用示例
from sentence_transformers import SentenceTransformer

# 初始化
embedder = TextEmbedder()
chroma_db = ChromaVectorDB("documents")

# 准备数据
documents = [
    "Python是一种高级编程语言",
    "机器学习是人工智能的一个分支",
    "深度学习使用神经网络",
    "Transformer是一种深度学习模型"
]

# 生成嵌入
embeddings = embedder.embed(documents)

# 添加文档
chroma_db.add_documents(
    documents=documents,
    embeddings=embeddings.tolist(),
    ids=[f"doc_{i}" for i in range(len(documents))]
)

# 查询
query = "什么是机器学习"
query_embedding = embedder.embed_single(query)

results = chroma_db.query(query_embedding.tolist(), n_results=2)
print("查询结果:")
for i, (doc, dist) in enumerate(zip(results['documents'][0], results['distances'][0]), 1):  # enumerate同时获取索引和元素
    print(f"\n结果{i}:")
    print(f"文档: {doc}")
    print(f"距离: {dist:.4f}")

6.3.2 Pinecone

简介:Pinecone是一个托管的向量数据库服务,提供高性能的向量搜索。

特点:

  • 完全托管,无需运维
  • 高性能,低延迟
  • 支持大规模数据
  • 提供API接口

安装:

Bash
pip install pinecone

代码实现:

Python
from pinecone import Pinecone, ServerlessSpec

class PineconeVectorDB:
    """Pinecone向量数据库 (v3.0+ API)"""

    def __init__(self, api_key: str, index_name: str, dimension: int = 384):
        """
        初始化Pinecone向量数据库

        Args:
            api_key: API密钥
            index_name: 索引名称
            dimension: 向量维度
        """
        # v3.0+ API: 使用 Pinecone 类而不是 init()
        self.pc = Pinecone(api_key=api_key)

        # 创建索引
        if index_name not in self.pc.list_indexes().names():
            self.pc.create_index(
                name=index_name,
                dimension=dimension,
                metric="cosine",
                spec=ServerlessSpec(
                    cloud="aws",
                    region="us-east-1"
                )
            )

        self.index = self.pc.Index(index_name)

    def upsert(self, vectors: list):
        """
        插入或更新向量

        Args:
            vectors: 向量列表 [(id, vector, metadata), ...]
        """
        self.index.upsert(vectors=vectors)

    def query(self, query_vector: list, top_k: int = 5, filter: dict = None):
        """
        查询

        Args:
            query_vector: 查询向量
            top_k: 返回结果数量
            filter: 元数据过滤条件

        Returns:
            查询结果
        """
        results = self.index.query(
            vector=query_vector,
            top_k=top_k,
            filter=filter,
            include_metadata=True
        )
        return results

    def delete(self, ids: list):
        """
        删除向量

        Args:
            ids: ID列表
        """
        self.index.delete(ids=ids)

# 使用示例
# 注意:需要有效的Pinecone API密钥
# api_key = "your-pinecone-api-key"
# pinecone_db = PineconeVectorDB(api_key, "documents", dimension=384)
#
# # 准备数据
# vectors = [
#     ("doc_0", embeddings[0].tolist(), {"text": documents[0]}),
#     ("doc_1", embeddings[1].tolist(), {"text": documents[1]}),
#     ("doc_2", embeddings[2].tolist(), {"text": documents[2]}),
# ]
#
# # 插入向量
# pinecone_db.upsert(vectors)
#
# # 查询
# results = pinecone_db.query(query_embedding.tolist(), top_k=2)
# print("查询结果:")
# for match in results['matches']:
#     print(f"ID: {match['id']}")
#     print(f"分数: {match['score']:.4f}")
#     print(f"元数据: {match['metadata']}")

6.3.3 Milvus

简介:Milvus是一个开源的向量数据库,支持大规模向量检索。

特点:

  • 开源免费
  • 支持多种索引类型
  • 支持分布式部署
  • 高性能,可扩展

安装:

Bash
pip install pymilvus

代码实现:

Python
from pymilvus import connections, utility, Collection, FieldSchema, CollectionSchema, DataType

class MilvusVectorDB:
    """Milvus向量数据库"""

    def __init__(self, host: str = "localhost", port: int = 19530, collection_name: str = "documents"):
        """
        初始化Milvus向量数据库

        Args:
            host: 主机地址
            port: 端口
            collection_name: 集合名称
        """
        # 连接Milvus
        connections.connect(host=host, port=port)

        self.collection_name = collection_name

        # 创建集合
        self._create_collection()

    def _create_collection(self):
        """创建集合"""
        # 定义字段
        fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
            FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=384),
            FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535)
        ]

        # 创建Schema
        schema = CollectionSchema(fields, self.collection_name)

        # 创建集合
        if not utility.has_collection(self.collection_name):
            self.collection = Collection(name=self.collection_name, schema=schema)
        else:
            self.collection = Collection(name=self.collection_name)

    def insert(self, embeddings: list, texts: list):
        """
        插入向量

        Args:
            embeddings: 向量列表
            texts: 文本列表
        """
        data = [embeddings, texts]
        self.collection.insert(data)
        self.collection.flush()

    def create_index(self):
        """创建索引"""
        index_params = {
            "metric_type": "COSINE",
            "index_type": "IVF_FLAT",
            "params": {"nlist": 128}
        }
        self.collection.create_index(field_name="embedding", index_params=index_params)

    def load(self):
        """加载集合"""
        self.collection.load()

    def query(self, query_embedding: list, top_k: int = 5):
        """
        查询

        Args:
            query_embedding: 查询向量
            top_k: 返回结果数量

        Returns:
            查询结果
        """
        results = self.collection.search(
            data=[query_embedding],
            anns_field="embedding",
            param={"metric_type": "COSINE", "params": {"nprobe": 10}},
            limit=top_k,
            output_fields=["text"]
        )
        return results

# 使用示例
# 注意:需要先启动Milvus服务
# milvus_db = MilvusVectorDB(host="localhost", port=19530, collection_name="documents")
#
# # 插入向量
# milvus_db.insert(embeddings.tolist(), documents)
#
# # 创建索引
# milvus_db.create_index()
#
# # 加载集合
# milvus_db.load()
#
# # 查询
# results = milvus_db.query(query_embedding.tolist(), top_k=2)
# print("查询结果:")
# for result in results[0]:
#     print(f"距离: {result.distance:.4f}")
#     print(f"文本: {result.entity.get('text')}")

6.4 向量数据库优化

6.4.1 索引优化

索引类型:

  1. FLAT:精确搜索,适合小数据集
  2. IVF_FLAT:平衡精度和速度
  3. IVF_PQ:压缩索引,节省内存
  4. HNSW:高精度,高速度

优化策略:

  1. 选择合适的索引:根据数据规模选择
  2. 调整索引参数:优化nlist、nprobe等参数
  3. 定期重建索引:随着数据变化重建索引

代码实现:

Python
class VectorIndexOptimizer:
    """向量索引优化器"""

    @staticmethod
    def choose_index_type(num_vectors: int, dimension: int) -> str:
        """
        选择索引类型

        Args:
            num_vectors: 向量数量
            dimension: 向量维度

        Returns:
            索引类型
        """
        if num_vectors < 10000:
            return "FLAT"
        elif num_vectors < 100000:
            return "IVF_FLAT"
        elif num_vectors < 1000000:
            return "IVF_PQ"
        else:
            return "HNSW"

    @staticmethod
    def optimize_ivf_params(num_vectors: int) -> dict:
        """
        优化IVF参数

        Args:
            num_vectors: 向量数量

        Returns:
            优化后的参数
        """
        # nlist通常设置为sqrt(num_vectors)
        nlist = int(np.sqrt(num_vectors))

        # nprobe通常设置为nlist的10-20%
        nprobe = max(1, int(nlist * 0.1))

        return {
            "nlist": nlist,
            "nprobe": nprobe
        }

# 使用示例
optimizer = VectorIndexOptimizer()

# 选择索引类型
index_type = optimizer.choose_index_type(50000, 384)
print(f"推荐的索引类型: {index_type}")

# 优化IVF参数
params = optimizer.optimize_ivf_params(50000)
print(f"IVF参数: {params}")

6.4.2 查询优化

优化策略:

  1. 批量查询:减少网络往返
  2. 异步查询:提高并发性能
  3. 缓存查询结果:减少重复查询
  4. 使用近似搜索:平衡精度和速度

代码实现:

Python
import asyncio  # Python标准异步库
from functools import lru_cache

class OptimizedVectorDB(ChromaVectorDB):
    """优化的向量数据库"""

    def __init__(self, collection_name: str, persist_directory: str = "./chroma_db"):
        super().__init__(collection_name, persist_directory)  # super()调用父类方法
        self.query_cache = {}

    @lru_cache(maxsize=1000)
    def query_with_cache(self, query_embedding: tuple, n_results: int = 5) -> dict:
        """
        带缓存的查询

        Args:
            query_embedding: 查询向量(转换为tuple以支持缓存)
            n_results: 返回结果数量

        Returns:
            查询结果
        """
        return self.query(list(query_embedding), n_results)

    async def async_query(self, query_embedding: list, n_results: int = 5) -> dict:  # async def定义协程函数
        """
        异步查询

        Args:
            query_embedding: 查询向量
            n_results: 返回结果数量

        Returns:
            查询结果
        """
        # 这里简化处理,实际可以使用异步库
        return self.query(query_embedding, n_results)

    async def batch_query(self, query_embeddings: list, n_results: int = 5) -> list:
        """
        批量查询

        Args:
            query_embeddings: 查询向量列表
            n_results: 返回结果数量

        Returns:
            查询结果列表
        """
        tasks = [
            self.async_query(embedding, n_results)
            for embedding in query_embeddings
        ]
        results = await asyncio.gather(*tasks)  # await等待异步操作完成
        return results

# 使用示例
optimized_db = OptimizedVectorDB("documents")

# 带缓存的查询
query_embedding_tuple = tuple(query_embedding)
result = optimized_db.query_with_cache(query_embedding_tuple, n_results=2)
print("带缓存的查询结果:", result)

6.4.3 性能监控

监控指标:

  1. 查询延迟:查询响应时间
  2. 吞吐量:每秒查询数(QPS)
  3. 索引大小:索引占用空间
  4. 内存使用:内存占用情况

代码实现:

Python
import time
from collections.abc import Callable

class VectorDBMonitor:
    """向量数据库监控器"""

    def __init__(self, db):
        self.db = db
        self.query_times = []
        self.query_count = 0

    def monitor_query(self, query_func: Callable, *args, **kwargs):  # *args收集位置参数;**kwargs收集关键字参数
        """
        监控查询性能

        Args:
            query_func: 查询函数
            *args: 位置参数
            **kwargs: 关键字参数

        Returns:
            查询结果
        """
        start_time = time.time()
        result = query_func(*args, **kwargs)
        end_time = time.time()

        query_time = end_time - start_time
        self.query_times.append(query_time)
        self.query_count += 1

        return result

    def get_stats(self) -> dict:
        """
        获取统计信息

        Returns:
            统计信息
        """
        if not self.query_times:
            return {}

        return {
            "query_count": self.query_count,
            "avg_query_time": np.mean(self.query_times),
            "min_query_time": np.min(self.query_times),
            "max_query_time": np.max(self.query_times),
            "median_query_time": np.median(self.query_times),
            "p95_query_time": np.percentile(self.query_times, 95),
            "p99_query_time": np.percentile(self.query_times, 99)
        }

# 使用示例
monitor = VectorDBMonitor(chroma_db)

# 监控查询
result = monitor.monitor_query(
    chroma_db.query,
    query_embedding.tolist(),
    n_results=2
)

# 获取统计信息
stats = monitor.get_stats()
print("查询统计:")
for key, value in stats.items():
    print(f"{key}: {value:.4f}")

6.5 练习题

练习题1:基础向量数据库

题目:使用Chroma实现一个简单的向量数据库,支持添加和查询向量。

参考答案:

Python
import chromadb

class SimpleVectorDB:
    """简单向量数据库"""

    def __init__(self, collection_name: str):
        self.client = chromadb.EphemeralClient()  # 内存临时存储,生产环境使用 PersistentClient
        self.collection = self.client.get_or_create_collection(collection_name)

    def add(self, vectors: list, documents: list, ids: list):
        """添加向量"""
        self.collection.add(
            embeddings=vectors,
            documents=documents,
            ids=ids
        )

    def query(self, query_vector: list, n_results: int = 5):
        """查询"""
        results = self.collection.query(
            query_embeddings=[query_vector],
            n_results=n_results
        )
        return results

练习题2:相似度计算

题目:实现余弦相似度、欧氏距离和点积三种相似度计算方法。

参考答案:

Python
import numpy as np

class SimilarityCalculator:
    """相似度计算器"""

    @staticmethod
    def cosine(vec1: np.ndarray, vec2: np.ndarray) -> float:
        """余弦相似度"""
        return float(np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2)))

    @staticmethod
    def euclidean(vec1: np.ndarray, vec2: np.ndarray) -> float:
        """欧氏距离"""
        return float(np.linalg.norm(vec1 - vec2))

    @staticmethod
    def dot_product(vec1: np.ndarray, vec2: np.ndarray) -> float:
        """点积"""
        return float(np.dot(vec1, vec2))

6.6 面试准备

6.6.1 大厂面试题

字节跳动面试题:

  1. 问题:什么是向量数据库?它有什么优势?

参考答案: - 向量数据库是专门存储和检索向量的数据库 - 优势: - 高效的相似度搜索 - 支持高维向量 - 可扩展性强 - 适合AI应用

  1. 问题:向量数据库和传统数据库有什么区别?

参考答案: - 数据类型:向量 vs 结构化数据 - 查询方式:相似度搜索 vs 精确匹配 - 索引结构:向量索引 vs B树 - 适用场景:AI应用 vs 事务处理

腾讯面试题:

  1. 问题:如何选择合适的向量数据库?

参考答案: - 数据规模:小数据集用Chroma,大数据集用Pinecone/Milvus - 部署方式:自建用Milvus,托管用Pinecone - 成本考虑:开源免费,托管付费 - 性能要求:高性能用Pinecone,一般用Chroma

  1. 问题:如何优化向量数据库的性能?

参考答案: - 索引优化:选择合适的索引类型 - 查询优化:批量查询、异步查询 - 缓存优化:缓存查询结果 - 参数调优:调整索引参数

阿里巴巴面试题:

  1. 问题:向量嵌入有哪些常用模型?

参考答案: - OpenAI Embeddings: text-embedding-3-small - Sentence-Transformers: all-MiniLM-L6-v2 - Hugging Face Models: BERT、RoBERTa - 选择依据:语言、性能、成本

  1. 问题:在实际项目中如何应用向量数据库?

参考答案: - 需求分析:确定应用场景 - 数据准备:准备文档和向量 - 数据库选型:选择合适的向量数据库 - 系统设计:设计系统架构 - 实现开发:实现向量存储和检索 - 测试优化:测试和优化性能

6.6.2 面试技巧

技巧1:理论联系实际

结合实际项目经验,说明如何应用向量数据库解决实际问题。

技巧2:对比分析

对比不同向量数据库的优缺点,说明选择依据。

技巧3:展示思考过程

说明选择向量数据库和优化策略的思考过程。

技巧4:持续优化

说明如何通过监控和调优不断提升性能。

📝 本章小结

本章系统介绍了向量数据库的核心内容:

  1. ✅ 向量数据库概述:定义、与传统数据库对比、应用场景
  2. ✅ 向量嵌入技术:文本嵌入、相似度计算
  3. ✅ 主流向量数据库:Chroma、Pinecone、Milvus
  4. ✅ 向量数据库优化:索引优化、查询优化、性能监控
  5. ✅ 练习题:基础向量数据库、相似度计算
  6. ✅ 面试准备:大厂面试题和解答技巧

通过本章学习,你应该能够: - 理解向量数据库的核心原理 - 掌握向量数据库的选型方法 - 学会使用主流向量数据库 - 了解向量数据库的优化技巧 - 准备好应对大厂面试

🔗 下一步

下一章我们将深入学习Agent开发基础,掌握如何构建智能Agent系统。

继续学习: 07-Agent开发基础.md

💡 思考题

  1. 向量数据库的核心原理是什么?

    将数据通过Embedding模型转化为高维向量存储,利用近似最近邻(ANN)算法实现语义相似度检索。核心索引算法:HNSW(高召回率,内存大)、IVF(平衡型)、PQ(省内存,精度换空间)。距离度量:余弦相似度(最常用)、欧氏距离、内积。

  2. 向量数据库和传统数据库有什么区别?

    传统数据库(MySQL/PostgreSQL):精确匹配、结构化查询(SQL)、B+树索引。向量数据库(Milvus/Chroma):语义相似搜索、ANN索引、适合非结构化数据(文本/图像)。关键差异:传统DB查"完全相等",向量DB查"最相似"。pgvector等扩展可让传统DB支持向量检索。

  3. 如何选择合适的向量数据库?

    原型验证→Chroma(轻量内存型);中小规模(<100万向量)→Qdrant(Rust高性能);大规模生产→Milvus(分布式,十亿级);全托管→Pinecone;已有PG→pgvector。选型维度:数据规模、延迟要求、是否需要混合查询、运维成本、开源vs托管。

  4. 如何优化向量数据库的性能?

    ①索引选择(小数据用FLAT,大数据用HNSW/IVF) ②向量维度压缩(PQ量化) ③合理分片和副本 ④批量写入而非逐条 ⑤预热索引到内存 ⑥选择合适的ef/nprobe参数平衡召回率和延迟。

  5. 在实际项目中如何应用向量数据库?

    最典型场景:RAG知识库(文档分块→Embedding→存Milvus→语义检索→喂给LLM)。其他:推荐系统(用户/商品向量相似匹配)、图像搜索(CLIP向量)、去重(相似文档检测)。生产中关注增量更新、Embedding版本管理、监控召回率。

📚 参考资料

  1. Chroma Documentation
  2. Pinecone Documentation
  3. Milvus Documentation
  4. "Efficient Nearest Neighbor Search" - Andoni & Indyk
  5. Sentence-Transformers Documentation

最后更新日期:2026-02-12 适用版本:LLM应用指南 v2026