向量数据库¶
⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。
📖 章节导读¶
向量数据库是RAG系统的核心组件,专门用于存储和检索高维向量数据。本章将深入探讨向量数据库的原理、选型、使用和优化。
🎯 学习目标¶
- 理解向量数据库的核心原理
- 掌握向量数据库的选型方法
- 学会使用主流向量数据库
- 了解向量数据库的优化技巧
- 掌握大厂面试中的相关问题
6.1 向量数据库概述¶
6.1.1 什么是向量数据库¶
定义:向量数据库是一种专门用于存储、索引和查询高维向量数据的数据库系统。
核心功能:
- 向量存储:高效存储高维向量
- 向量索引:构建向量索引以加速检索
- 相似度搜索:快速找到最相似的向量
- 元数据管理:管理向量的元数据
- 扩展性:支持大规模数据和高并发
为什么需要向量数据库:
- 高维数据:传统数据库不适合高维向量
- 相似度搜索:需要高效的相似度搜索能力
- 大规模数据:需要支持海量向量数据
- 实时查询:需要快速响应查询请求
6.1.2 向量数据库 vs 传统数据库¶
对比分析:
| 特性 | 向量数据库 | 传统数据库 |
|---|---|---|
| 数据类型 | 向量 | 结构化数据 |
| 查询方式 | 相似度搜索 | 精确匹配 |
| 索引结构 | 向量索引 | B树、哈希等 |
| 查询性能 | 近似搜索 | 精确搜索 |
| 扩展性 | 水平扩展 | 垂直扩展 |
| 适用场景 | AI应用、推荐 | 事务处理 |
选择建议:
- 选择向量数据库:需要相似度搜索、AI应用、推荐系统
- 选择传统数据库:事务处理、精确查询、关系型数据
6.1.3 向量数据库的应用场景¶
主要应用:
- RAG系统:文档检索和问答
- 推荐系统:相似物品推荐
- 图像检索:以图搜图
- 语义搜索:语义相似度搜索
- 异常检测:检测异常向量
具体案例:
- 文档问答:检索相关文档
- 商品推荐:推荐相似商品
- 人脸识别:检索相似人脸
- 代码搜索:搜索相似代码
- 日志分析:检测异常日志
6.2 向量嵌入技术¶
6.2.1 什么是向量嵌入¶
定义:向量嵌入是将文本、图像、音频等数据转换为高维向量表示的技术。
核心思想:
- 语义表示:向量表示数据的语义信息
- 相似度计算:向量距离反映语义相似度
- 降维:将高维数据映射到低维空间
- 连续表示:离散数据转换为连续向量
为什么需要向量嵌入:
- 语义理解:向量表示语义相似性
- 相似度计算:可以计算向量之间的相似度
- 机器学习:便于机器学习模型处理
- 检索效率:向量检索效率高
6.2.2 文本嵌入模型¶
主流模型:
- OpenAI Embeddings:
- text-embedding-3-small
- text-embedding-3-large
-
text-embedding-ada-002
-
Sentence-Transformers:
- all-MiniLM-L6-v2
- all-mpnet-base-v2
-
paraphrase-multilingual-MiniLM-L12-v2
-
Hugging Face Models:
- BERT系列
- RoBERTa系列
- DeBERTa系列
代码实现:
from sentence_transformers import SentenceTransformer
import numpy as np
class TextEmbedder:
"""文本嵌入器"""
def __init__(self, model_name='all-MiniLM-L6-v2'):
"""
初始化文本嵌入器
Args:
model_name: 模型名称
"""
self.model = SentenceTransformer(model_name)
def embed(self, texts: list) -> np.ndarray:
"""
将文本转换为向量
Args:
texts: 文本列表
Returns:
向量数组
"""
embeddings = self.model.encode(texts)
return embeddings
def embed_single(self, text: str) -> np.ndarray:
"""
将单个文本转换为向量
Args:
text: 文本
Returns:
向量
"""
embedding = self.model.encode([text])[0]
return embedding
def compute_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
"""
计算两个向量的余弦相似度
Args:
vec1: 向量1
vec2: 向量2
Returns:
相似度
"""
# 计算余弦相似度
similarity = np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
return float(similarity)
# 使用示例
embedder = TextEmbedder()
# 嵌入文本
texts = ["机器学习是人工智能的一个分支", "深度学习使用神经网络", "今天天气不错"]
embeddings = embedder.embed(texts)
print(f"向量形状: {embeddings.shape}")
print(f"向量维度: {embeddings.shape[1]}")
# 计算相似度
similarity = embedder.compute_similarity(embeddings[0], embeddings[1])
print(f"\n文本1和文本2的相似度: {similarity:.4f}")
6.2.3 向量相似度计算¶
常用方法:
- 余弦相似度:
- 最常用的方法
- 衡量向量方向的相似性
-
范围: [-1, 1]
-
欧氏距离:
- 衡量向量之间的距离
- 范围: [0, +∞]
-
距离越小越相似
-
点积:
- 简单高效
- 需要向量归一化
- 范围: [-1, 1]
代码实现:
class VectorSimilarity:
"""向量相似度计算"""
@staticmethod # @staticmethod无需实例即可调用
def cosine_similarity(vec1: np.ndarray, vec2: np.ndarray) -> float:
"""
余弦相似度
Args:
vec1: 向量1
vec2: 向量2
Returns:
相似度
"""
dot_product = np.dot(vec1, vec2)
norm1 = np.linalg.norm(vec1)
norm2 = np.linalg.norm(vec2)
similarity = dot_product / (norm1 * norm2 + 1e-8)
return float(similarity)
@staticmethod
def euclidean_distance(vec1: np.ndarray, vec2: np.ndarray) -> float:
"""
欧氏距离
Args:
vec1: 向量1
vec2: 向量2
Returns:
距离
"""
distance = np.linalg.norm(vec1 - vec2)
return float(distance)
@staticmethod
def dot_product(vec1: np.ndarray, vec2: np.ndarray) -> float:
"""
点积
Args:
vec1: 向量1
vec2: 向量2
Returns:
点积
"""
return float(np.dot(vec1, vec2))
@staticmethod
def find_most_similar(query_vector: np.ndarray,
vectors: np.ndarray,
top_k: int = 5,
method: str = 'cosine') -> list:
"""
找到最相似的向量
Args:
query_vector: 查询向量
vectors: 向量数组
top_k: 返回的top-k数量
method: 相似度方法(cosine/euclidean/dot)
Returns:
最相似的向量索引和分数
"""
if method == 'cosine':
similarities = np.dot(vectors, query_vector)
norms = np.linalg.norm(vectors, axis=1) * np.linalg.norm(query_vector)
scores = similarities / (norms + 1e-8)
top_indices = np.argsort(scores)[-top_k:][::-1]
return [(int(idx), float(scores[idx])) for idx in top_indices]
elif method == 'euclidean':
distances = np.linalg.norm(vectors - query_vector, axis=1)
top_indices = np.argsort(distances)[:top_k]
return [(int(idx), float(distances[idx])) for idx in top_indices]
else: # dot product
scores = np.dot(vectors, query_vector)
top_indices = np.argsort(scores)[-top_k:][::-1]
return [(int(idx), float(scores[idx])) for idx in top_indices]
# 使用示例
similarity = VectorSimilarity()
# 计算相似度
vec1 = np.array([1, 2, 3])
vec2 = np.array([1, 2, 4])
cos_sim = similarity.cosine_similarity(vec1, vec2)
euc_dist = similarity.euclidean_distance(vec1, vec2)
dot_prod = similarity.dot_product(vec1, vec2)
print(f"余弦相似度: {cos_sim:.4f}")
print(f"欧氏距离: {euc_dist:.4f}")
print(f"点积: {dot_prod:.4f}")
# 找到最相似的向量
vectors = np.array([[1, 2, 3], [1, 2, 4], [2, 3, 4], [1, 1, 1]])
query = np.array([1, 2, 3])
results = similarity.find_most_similar(query, vectors, top_k=3, method='cosine')
print(f"\n最相似的向量:")
for idx, score in results:
print(f"索引: {idx}, 分数: {score:.4f}")
6.3 主流向量数据库¶
6.3.1 Chroma¶
简介:Chroma是一个开源的向量数据库,易于使用和部署。
特点:
- 轻量级,易于安装
- 支持多种嵌入模型
- 支持元数据过滤
- 支持持久化存储
安装:
代码实现:
import chromadb
from chromadb.config import Settings
class ChromaVectorDB:
"""Chroma向量数据库"""
def __init__(self, collection_name: str, persist_directory: str = "./chroma_db"):
"""
初始化Chroma向量数据库
Args:
collection_name: 集合名称
persist_directory: 持久化目录
"""
self.client = chromadb.PersistentClient(path=persist_directory)
self.collection = self.client.get_or_create_collection(
name=collection_name
)
def add_documents(self, documents: list, embeddings: list, metadatas: list = None, ids: list = None):
"""
添加文档
Args:
documents: 文档列表
embeddings: 向量列表
metadatas: 元数据列表
ids: ID列表
"""
self.collection.add(
documents=documents,
embeddings=embeddings,
metadatas=metadatas,
ids=ids
)
def query(self, query_embedding: list, n_results: int = 5, where: dict = None):
"""
查询
Args:
query_embedding: 查询向量
n_results: 返回结果数量
where: 元数据过滤条件
Returns:
查询结果
"""
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=n_results,
where=where
)
return results
def delete(self, ids: list):
"""
删除文档
Args:
ids: ID列表
"""
self.collection.delete(ids=ids)
def get(self, ids: list = None, where: dict = None):
"""
获取文档
Args:
ids: ID列表
where: 元数据过滤条件
Returns:
文档列表
"""
results = self.collection.get(
ids=ids,
where=where
)
return results
# 使用示例
from sentence_transformers import SentenceTransformer
# 初始化
embedder = TextEmbedder()
chroma_db = ChromaVectorDB("documents")
# 准备数据
documents = [
"Python是一种高级编程语言",
"机器学习是人工智能的一个分支",
"深度学习使用神经网络",
"Transformer是一种深度学习模型"
]
# 生成嵌入
embeddings = embedder.embed(documents)
# 添加文档
chroma_db.add_documents(
documents=documents,
embeddings=embeddings.tolist(),
ids=[f"doc_{i}" for i in range(len(documents))]
)
# 查询
query = "什么是机器学习"
query_embedding = embedder.embed_single(query)
results = chroma_db.query(query_embedding.tolist(), n_results=2)
print("查询结果:")
for i, (doc, dist) in enumerate(zip(results['documents'][0], results['distances'][0]), 1): # enumerate同时获取索引和元素
print(f"\n结果{i}:")
print(f"文档: {doc}")
print(f"距离: {dist:.4f}")
6.3.2 Pinecone¶
简介:Pinecone是一个托管的向量数据库服务,提供高性能的向量搜索。
特点:
- 完全托管,无需运维
- 高性能,低延迟
- 支持大规模数据
- 提供API接口
安装:
代码实现:
from pinecone import Pinecone, ServerlessSpec
class PineconeVectorDB:
"""Pinecone向量数据库 (v3.0+ API)"""
def __init__(self, api_key: str, index_name: str, dimension: int = 384):
"""
初始化Pinecone向量数据库
Args:
api_key: API密钥
index_name: 索引名称
dimension: 向量维度
"""
# v3.0+ API: 使用 Pinecone 类而不是 init()
self.pc = Pinecone(api_key=api_key)
# 创建索引
if index_name not in self.pc.list_indexes().names():
self.pc.create_index(
name=index_name,
dimension=dimension,
metric="cosine",
spec=ServerlessSpec(
cloud="aws",
region="us-east-1"
)
)
self.index = self.pc.Index(index_name)
def upsert(self, vectors: list):
"""
插入或更新向量
Args:
vectors: 向量列表 [(id, vector, metadata), ...]
"""
self.index.upsert(vectors=vectors)
def query(self, query_vector: list, top_k: int = 5, filter: dict = None):
"""
查询
Args:
query_vector: 查询向量
top_k: 返回结果数量
filter: 元数据过滤条件
Returns:
查询结果
"""
results = self.index.query(
vector=query_vector,
top_k=top_k,
filter=filter,
include_metadata=True
)
return results
def delete(self, ids: list):
"""
删除向量
Args:
ids: ID列表
"""
self.index.delete(ids=ids)
# 使用示例
# 注意:需要有效的Pinecone API密钥
# api_key = "your-pinecone-api-key"
# pinecone_db = PineconeVectorDB(api_key, "documents", dimension=384)
#
# # 准备数据
# vectors = [
# ("doc_0", embeddings[0].tolist(), {"text": documents[0]}),
# ("doc_1", embeddings[1].tolist(), {"text": documents[1]}),
# ("doc_2", embeddings[2].tolist(), {"text": documents[2]}),
# ]
#
# # 插入向量
# pinecone_db.upsert(vectors)
#
# # 查询
# results = pinecone_db.query(query_embedding.tolist(), top_k=2)
# print("查询结果:")
# for match in results['matches']:
# print(f"ID: {match['id']}")
# print(f"分数: {match['score']:.4f}")
# print(f"元数据: {match['metadata']}")
6.3.3 Milvus¶
简介:Milvus是一个开源的向量数据库,支持大规模向量检索。
特点:
- 开源免费
- 支持多种索引类型
- 支持分布式部署
- 高性能,可扩展
安装:
代码实现:
from pymilvus import connections, utility, Collection, FieldSchema, CollectionSchema, DataType
class MilvusVectorDB:
"""Milvus向量数据库"""
def __init__(self, host: str = "localhost", port: int = 19530, collection_name: str = "documents"):
"""
初始化Milvus向量数据库
Args:
host: 主机地址
port: 端口
collection_name: 集合名称
"""
# 连接Milvus
connections.connect(host=host, port=port)
self.collection_name = collection_name
# 创建集合
self._create_collection()
def _create_collection(self):
"""创建集合"""
# 定义字段
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=384),
FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535)
]
# 创建Schema
schema = CollectionSchema(fields, self.collection_name)
# 创建集合
if not utility.has_collection(self.collection_name):
self.collection = Collection(name=self.collection_name, schema=schema)
else:
self.collection = Collection(name=self.collection_name)
def insert(self, embeddings: list, texts: list):
"""
插入向量
Args:
embeddings: 向量列表
texts: 文本列表
"""
data = [embeddings, texts]
self.collection.insert(data)
self.collection.flush()
def create_index(self):
"""创建索引"""
index_params = {
"metric_type": "COSINE",
"index_type": "IVF_FLAT",
"params": {"nlist": 128}
}
self.collection.create_index(field_name="embedding", index_params=index_params)
def load(self):
"""加载集合"""
self.collection.load()
def query(self, query_embedding: list, top_k: int = 5):
"""
查询
Args:
query_embedding: 查询向量
top_k: 返回结果数量
Returns:
查询结果
"""
results = self.collection.search(
data=[query_embedding],
anns_field="embedding",
param={"metric_type": "COSINE", "params": {"nprobe": 10}},
limit=top_k,
output_fields=["text"]
)
return results
# 使用示例
# 注意:需要先启动Milvus服务
# milvus_db = MilvusVectorDB(host="localhost", port=19530, collection_name="documents")
#
# # 插入向量
# milvus_db.insert(embeddings.tolist(), documents)
#
# # 创建索引
# milvus_db.create_index()
#
# # 加载集合
# milvus_db.load()
#
# # 查询
# results = milvus_db.query(query_embedding.tolist(), top_k=2)
# print("查询结果:")
# for result in results[0]:
# print(f"距离: {result.distance:.4f}")
# print(f"文本: {result.entity.get('text')}")
6.4 向量数据库优化¶
6.4.1 索引优化¶
索引类型:
- FLAT:精确搜索,适合小数据集
- IVF_FLAT:平衡精度和速度
- IVF_PQ:压缩索引,节省内存
- HNSW:高精度,高速度
优化策略:
- 选择合适的索引:根据数据规模选择
- 调整索引参数:优化nlist、nprobe等参数
- 定期重建索引:随着数据变化重建索引
代码实现:
class VectorIndexOptimizer:
"""向量索引优化器"""
@staticmethod
def choose_index_type(num_vectors: int, dimension: int) -> str:
"""
选择索引类型
Args:
num_vectors: 向量数量
dimension: 向量维度
Returns:
索引类型
"""
if num_vectors < 10000:
return "FLAT"
elif num_vectors < 100000:
return "IVF_FLAT"
elif num_vectors < 1000000:
return "IVF_PQ"
else:
return "HNSW"
@staticmethod
def optimize_ivf_params(num_vectors: int) -> dict:
"""
优化IVF参数
Args:
num_vectors: 向量数量
Returns:
优化后的参数
"""
# nlist通常设置为sqrt(num_vectors)
nlist = int(np.sqrt(num_vectors))
# nprobe通常设置为nlist的10-20%
nprobe = max(1, int(nlist * 0.1))
return {
"nlist": nlist,
"nprobe": nprobe
}
# 使用示例
optimizer = VectorIndexOptimizer()
# 选择索引类型
index_type = optimizer.choose_index_type(50000, 384)
print(f"推荐的索引类型: {index_type}")
# 优化IVF参数
params = optimizer.optimize_ivf_params(50000)
print(f"IVF参数: {params}")
6.4.2 查询优化¶
优化策略:
- 批量查询:减少网络往返
- 异步查询:提高并发性能
- 缓存查询结果:减少重复查询
- 使用近似搜索:平衡精度和速度
代码实现:
import asyncio # Python标准异步库
from functools import lru_cache
class OptimizedVectorDB(ChromaVectorDB):
"""优化的向量数据库"""
def __init__(self, collection_name: str, persist_directory: str = "./chroma_db"):
super().__init__(collection_name, persist_directory) # super()调用父类方法
self.query_cache = {}
@lru_cache(maxsize=1000)
def query_with_cache(self, query_embedding: tuple, n_results: int = 5) -> dict:
"""
带缓存的查询
Args:
query_embedding: 查询向量(转换为tuple以支持缓存)
n_results: 返回结果数量
Returns:
查询结果
"""
return self.query(list(query_embedding), n_results)
async def async_query(self, query_embedding: list, n_results: int = 5) -> dict: # async def定义协程函数
"""
异步查询
Args:
query_embedding: 查询向量
n_results: 返回结果数量
Returns:
查询结果
"""
# 这里简化处理,实际可以使用异步库
return self.query(query_embedding, n_results)
async def batch_query(self, query_embeddings: list, n_results: int = 5) -> list:
"""
批量查询
Args:
query_embeddings: 查询向量列表
n_results: 返回结果数量
Returns:
查询结果列表
"""
tasks = [
self.async_query(embedding, n_results)
for embedding in query_embeddings
]
results = await asyncio.gather(*tasks) # await等待异步操作完成
return results
# 使用示例
optimized_db = OptimizedVectorDB("documents")
# 带缓存的查询
query_embedding_tuple = tuple(query_embedding)
result = optimized_db.query_with_cache(query_embedding_tuple, n_results=2)
print("带缓存的查询结果:", result)
6.4.3 性能监控¶
监控指标:
- 查询延迟:查询响应时间
- 吞吐量:每秒查询数(QPS)
- 索引大小:索引占用空间
- 内存使用:内存占用情况
代码实现:
import time
from collections.abc import Callable
class VectorDBMonitor:
"""向量数据库监控器"""
def __init__(self, db):
self.db = db
self.query_times = []
self.query_count = 0
def monitor_query(self, query_func: Callable, *args, **kwargs): # *args收集位置参数;**kwargs收集关键字参数
"""
监控查询性能
Args:
query_func: 查询函数
*args: 位置参数
**kwargs: 关键字参数
Returns:
查询结果
"""
start_time = time.time()
result = query_func(*args, **kwargs)
end_time = time.time()
query_time = end_time - start_time
self.query_times.append(query_time)
self.query_count += 1
return result
def get_stats(self) -> dict:
"""
获取统计信息
Returns:
统计信息
"""
if not self.query_times:
return {}
return {
"query_count": self.query_count,
"avg_query_time": np.mean(self.query_times),
"min_query_time": np.min(self.query_times),
"max_query_time": np.max(self.query_times),
"median_query_time": np.median(self.query_times),
"p95_query_time": np.percentile(self.query_times, 95),
"p99_query_time": np.percentile(self.query_times, 99)
}
# 使用示例
monitor = VectorDBMonitor(chroma_db)
# 监控查询
result = monitor.monitor_query(
chroma_db.query,
query_embedding.tolist(),
n_results=2
)
# 获取统计信息
stats = monitor.get_stats()
print("查询统计:")
for key, value in stats.items():
print(f"{key}: {value:.4f}")
6.5 练习题¶
练习题1:基础向量数据库¶
题目:使用Chroma实现一个简单的向量数据库,支持添加和查询向量。
参考答案:
import chromadb
class SimpleVectorDB:
"""简单向量数据库"""
def __init__(self, collection_name: str):
self.client = chromadb.EphemeralClient() # 内存临时存储,生产环境使用 PersistentClient
self.collection = self.client.get_or_create_collection(collection_name)
def add(self, vectors: list, documents: list, ids: list):
"""添加向量"""
self.collection.add(
embeddings=vectors,
documents=documents,
ids=ids
)
def query(self, query_vector: list, n_results: int = 5):
"""查询"""
results = self.collection.query(
query_embeddings=[query_vector],
n_results=n_results
)
return results
练习题2:相似度计算¶
题目:实现余弦相似度、欧氏距离和点积三种相似度计算方法。
参考答案:
import numpy as np
class SimilarityCalculator:
"""相似度计算器"""
@staticmethod
def cosine(vec1: np.ndarray, vec2: np.ndarray) -> float:
"""余弦相似度"""
return float(np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2)))
@staticmethod
def euclidean(vec1: np.ndarray, vec2: np.ndarray) -> float:
"""欧氏距离"""
return float(np.linalg.norm(vec1 - vec2))
@staticmethod
def dot_product(vec1: np.ndarray, vec2: np.ndarray) -> float:
"""点积"""
return float(np.dot(vec1, vec2))
6.6 面试准备¶
6.6.1 大厂面试题¶
字节跳动面试题:
- 问题:什么是向量数据库?它有什么优势?
参考答案: - 向量数据库是专门存储和检索向量的数据库 - 优势: - 高效的相似度搜索 - 支持高维向量 - 可扩展性强 - 适合AI应用
- 问题:向量数据库和传统数据库有什么区别?
参考答案: - 数据类型:向量 vs 结构化数据 - 查询方式:相似度搜索 vs 精确匹配 - 索引结构:向量索引 vs B树 - 适用场景:AI应用 vs 事务处理
腾讯面试题:
- 问题:如何选择合适的向量数据库?
参考答案: - 数据规模:小数据集用Chroma,大数据集用Pinecone/Milvus - 部署方式:自建用Milvus,托管用Pinecone - 成本考虑:开源免费,托管付费 - 性能要求:高性能用Pinecone,一般用Chroma
- 问题:如何优化向量数据库的性能?
参考答案: - 索引优化:选择合适的索引类型 - 查询优化:批量查询、异步查询 - 缓存优化:缓存查询结果 - 参数调优:调整索引参数
阿里巴巴面试题:
- 问题:向量嵌入有哪些常用模型?
参考答案: - OpenAI Embeddings: text-embedding-3-small - Sentence-Transformers: all-MiniLM-L6-v2 - Hugging Face Models: BERT、RoBERTa - 选择依据:语言、性能、成本
- 问题:在实际项目中如何应用向量数据库?
参考答案: - 需求分析:确定应用场景 - 数据准备:准备文档和向量 - 数据库选型:选择合适的向量数据库 - 系统设计:设计系统架构 - 实现开发:实现向量存储和检索 - 测试优化:测试和优化性能
6.6.2 面试技巧¶
技巧1:理论联系实际
结合实际项目经验,说明如何应用向量数据库解决实际问题。
技巧2:对比分析
对比不同向量数据库的优缺点,说明选择依据。
技巧3:展示思考过程
说明选择向量数据库和优化策略的思考过程。
技巧4:持续优化
说明如何通过监控和调优不断提升性能。
📝 本章小结¶
本章系统介绍了向量数据库的核心内容:
- ✅ 向量数据库概述:定义、与传统数据库对比、应用场景
- ✅ 向量嵌入技术:文本嵌入、相似度计算
- ✅ 主流向量数据库:Chroma、Pinecone、Milvus
- ✅ 向量数据库优化:索引优化、查询优化、性能监控
- ✅ 练习题:基础向量数据库、相似度计算
- ✅ 面试准备:大厂面试题和解答技巧
通过本章学习,你应该能够: - 理解向量数据库的核心原理 - 掌握向量数据库的选型方法 - 学会使用主流向量数据库 - 了解向量数据库的优化技巧 - 准备好应对大厂面试
🔗 下一步¶
下一章我们将深入学习Agent开发基础,掌握如何构建智能Agent系统。
继续学习: 07-Agent开发基础.md
💡 思考题¶
-
向量数据库的核心原理是什么?
将数据通过Embedding模型转化为高维向量存储,利用近似最近邻(ANN)算法实现语义相似度检索。核心索引算法:HNSW(高召回率,内存大)、IVF(平衡型)、PQ(省内存,精度换空间)。距离度量:余弦相似度(最常用)、欧氏距离、内积。
-
向量数据库和传统数据库有什么区别?
传统数据库(MySQL/PostgreSQL):精确匹配、结构化查询(SQL)、B+树索引。向量数据库(Milvus/Chroma):语义相似搜索、ANN索引、适合非结构化数据(文本/图像)。关键差异:传统DB查"完全相等",向量DB查"最相似"。pgvector等扩展可让传统DB支持向量检索。
-
如何选择合适的向量数据库?
原型验证→Chroma(轻量内存型);中小规模(<100万向量)→Qdrant(Rust高性能);大规模生产→Milvus(分布式,十亿级);全托管→Pinecone;已有PG→pgvector。选型维度:数据规模、延迟要求、是否需要混合查询、运维成本、开源vs托管。
-
如何优化向量数据库的性能?
①索引选择(小数据用FLAT,大数据用HNSW/IVF) ②向量维度压缩(PQ量化) ③合理分片和副本 ④批量写入而非逐条 ⑤预热索引到内存 ⑥选择合适的ef/nprobe参数平衡召回率和延迟。
-
在实际项目中如何应用向量数据库?
最典型场景:RAG知识库(文档分块→Embedding→存Milvus→语义检索→喂给LLM)。其他:推荐系统(用户/商品向量相似匹配)、图像搜索(CLIP向量)、去重(相似文档检测)。生产中关注增量更新、Embedding版本管理、监控召回率。
📚 参考资料¶
- Chroma Documentation
- Pinecone Documentation
- Milvus Documentation
- "Efficient Nearest Neighbor Search" - Andoni & Indyk
- Sentence-Transformers Documentation
最后更新日期:2026-02-12 适用版本:LLM应用指南 v2026