06-NoSQL数据库¶
非关系型数据库的世界 目标:掌握MongoDB和Redis,理解何时选择NoSQL
📋 本章概览¶
预计学习时间:3-4小时 前置章节:第01章:数据库基础概念 实践要求:安装MongoDB和Redis,完成CRUD操作
本章内容: 1. NoSQL概述 2. MongoDB文档数据库 3. Redis键值数据库 4. 其他NoSQL数据库简介 5. SQL vs NoSQL选择指南
1. NoSQL概述¶
1.1 什么是NoSQL?¶
NoSQL(Not Only SQL):非关系型数据库,不使用固定的表结构,适合处理大规模、高并发、非结构化数据。
1.2 NoSQL数据库类型¶
| 类型 | 代表产品 | 数据模型 | 适用场景 |
|---|---|---|---|
| 文档型 | MongoDB | JSON文档 | 内容管理、用户资料 |
| 键值型 | Redis | 键值对 | 缓存、会话、实时系统 |
| 列族型 | Cassandra | 列族 | 时序数据、日志 |
| 图数据库 | Neo4j | 节点和关系 | 社交网络、推荐系统 |
| 向量数据库 | Pinecone | 向量嵌入 | AI语义搜索 |
1.3 NoSQL特点¶
优点: - ✅ 灵活的数据模型 - ✅ 水平扩展容易 - ✅ 高性能读写 - ✅ 适合大数据量
缺点: - ❌ 不支持复杂事务(多数) - ❌ 不支持JOIN操作 - ❌ 一致性较弱(最终一致性)
2. MongoDB文档数据库¶
2.1 MongoDB简介¶
MongoDB是最流行的文档型数据库,使用BSON(二进制JSON)格式存储数据。
适用场景: - 数据结构多变 - 需要快速迭代 - 存储嵌套数据 - AI实验元数据
2.2 安装MongoDB¶
Docker安装(推荐):
docker run -d \
--name mongodb \
-p 27017:27017 \
-e MONGO_INITDB_ROOT_USERNAME=admin \
-e MONGO_INITDB_ROOT_PASSWORD=password \
-v mongo_data:/data/db \
mongo:7.0
本地安装:
# macOS
brew tap mongodb/brew
brew install mongodb-community
brew services start mongodb-community
# Ubuntu
sudo apt install mongodb
sudo systemctl start mongodb
2.3 MongoDB基础操作¶
// 连接数据库
mongosh "mongodb://admin:password@localhost:27017"
// 查看数据库
show dbs
// 创建/切换数据库
use mydb
// 查看当前数据库
db
// 创建集合(表)
db.createCollection("users")
// 查看集合
show collections
// 插入文档(单条)
db.users.insertOne({
name: "张三",
age: 25,
email: "zhangsan@example.com",
tags: ["AI", "Python"],
address: {
city: "北京",
zip: "100000"
}
})
// 插入多条
db.users.insertMany([
{ name: "李四", age: 30, email: "lisi@example.com" },
{ name: "王五", age: 28, email: "wangwu@example.com" }
])
// 查询所有
db.users.find()
// 条件查询
db.users.find({ age: { $gt: 25 } })
db.users.find({ name: "张三" })
// 查询数组包含
db.users.find({ tags: "AI" })
// 嵌套查询
db.users.find({ "address.city": "北京" })
// 投影(只返回特定字段)
db.users.find({}, { name: 1, email: 1, _id: 0 })
// 排序
db.users.find().sort({ age: -1 }) // -1降序,1升序
// 分页
db.users.find().skip(10).limit(5)
// 更新单条
db.users.updateOne(
{ name: "张三" },
{ $set: { age: 26 } }
)
// 更新多条
db.users.updateMany(
{ age: { $lt: 30 } },
{ $set: { status: "young" } }
)
// 删除
db.users.deleteOne({ name: "张三" })
db.users.deleteMany({ age: { $lt: 25 } })
2.4 MongoDB查询操作符¶
// 比较操作符
// $eq: 等于, $ne: 不等于
// $gt: 大于, $gte: 大于等于
// $lt: 小于, $lte: 小于等于
// $in: 在数组中, $nin: 不在数组中
db.products.find({ price: { $gte: 100, $lte: 500 } })
db.products.find({ category: { $in: ["手机", "电脑"] } })
// 逻辑操作符
// $and, $or, $not, $nor
db.products.find({
$and: [
{ price: { $gte: 100 } },
{ stock: { $gt: 0 } }
]
})
db.products.find({
$or: [
{ category: "手机" },
{ category: "平板" }
]
})
// 元素操作符
// $exists: 字段是否存在
// $type: 字段类型
db.users.find({ phone: { $exists: true } })
// 数组操作符
// $all: 包含所有元素
// $elemMatch: 匹配数组元素条件
// $size: 数组长度
db.users.find({ tags: { $all: ["AI", "Python"] } })
db.users.find({ tags: { $size: 2 } })
2.5 MongoDB索引¶
// 创建单字段索引
db.users.createIndex({ name: 1 })
// 创建复合索引
db.users.createIndex({ age: 1, name: 1 })
// 创建唯一索引
db.users.createIndex({ email: 1 }, { unique: true })
// 创建文本索引(全文搜索)
db.articles.createIndex({ content: "text" })
// 文本搜索
db.articles.find({ $text: { $search: "MongoDB tutorial" } })
// 查看索引
db.users.getIndexes()
// 删除索引
db.users.dropIndex("name_1")
2.6 Python操作MongoDB¶
from pymongo import MongoClient, ASCENDING, DESCENDING
from datetime import datetime
# 连接数据库
client = MongoClient('mongodb://admin:password@localhost:27017/')
# 选择数据库
db = client['mydb']
# 选择集合
collection = db['users']
# 插入文档
user = {
'name': '张三',
'age': 25,
'email': 'zhangsan@example.com',
'tags': ['AI', 'Python'],
'created_at': datetime.now()
}
result = collection.insert_one(user)
print(f"插入ID: {result.inserted_id}")
# 插入多条
users = [
{'name': '李四', 'age': 30},
{'name': '王五', 'age': 28}
]
result = collection.insert_many(users)
# 查询单条
user = collection.find_one({'name': '张三'})
print(user)
# 查询多条
for user in collection.find({'age': {'$gt': 25}}):
print(user)
# 投影
for user in collection.find({}, {'name': 1, 'email': 1, '_id': 0}):
print(user)
# 排序
for user in collection.find().sort('age', DESCENDING):
print(user)
# 分页
for user in collection.find().skip(10).limit(5):
print(user)
# 更新
collection.update_one(
{'name': '张三'},
{'$set': {'age': 26}}
)
# 更新或插入(upsert)
collection.update_one(
{'name': '赵六'},
{'$set': {'age': 35}},
upsert=True
)
# 删除
collection.delete_one({'name': '张三'})
collection.delete_many({'age': {'$lt': 25}})
# 聚合管道
pipeline = [
{'$match': {'age': {'$gte': 25}}},
{'$group': {'_id': '$age', 'count': {'$sum': 1}}},
{'$sort': {'count': -1}}
]
for result in collection.aggregate(pipeline):
print(result)
# 创建索引
collection.create_index([('email', ASCENDING)], unique=True)
collection.create_index([('name', TEXT)])
# 关闭连接
client.close()
2.7 AI场景:实验元数据管理¶
from pymongo import MongoClient
from datetime import datetime
client = MongoClient('mongodb://localhost:27017/')
db = client['ml_platform']
experiments = db['experiments']
# 记录实验
def log_experiment(exp_name, config, metrics, artifacts=None):
experiment = {
'exp_name': exp_name,
'config': config, # 灵活的超参数配置
'metrics': metrics, # 训练指标
'artifacts': artifacts or [], # 模型文件、日志等
'status': 'running',
'created_at': datetime.now(),
'updated_at': datetime.now()
}
return experiments.insert_one(experiment).inserted_id
# 更新实验指标
def update_metrics(exp_id, step, metrics):
experiments.update_one(
{'_id': exp_id},
{
'$push': {'history': {'step': step, **metrics}},
'$set': {'metrics': metrics, 'updated_at': datetime.now()}
}
)
# 查询最佳实验
def get_best_experiments(model_type, metric='accuracy', top_k=5):
return list(experiments.find(
{'config.model_type': model_type}
).sort(
f'metrics.{metric}', -1
).limit(top_k))
# 使用示例
exp_id = log_experiment(
'resnet50_imagenet',
{
'model_type': 'ResNet50',
'lr': 0.001,
'batch_size': 32,
'optimizer': 'Adam',
'augmentation': {
'random_crop': True,
'horizontal_flip': True
}
},
{'accuracy': 0, 'loss': 0}
)
# 训练过程中更新
for step in range(100):
# ... 训练代码 ...
update_metrics(exp_id, step, {
'accuracy': 0.95,
'loss': 0.1
})
# 查询
best = get_best_experiments('ResNet50', 'accuracy', 3)
print(best)
3. Redis键值数据库¶
3.1 Redis简介¶
Redis是内存中的键值数据库,速度极快,常用作缓存、消息队列、实时系统。
特点: - 内存存储,读写速度10万+ QPS - 支持多种数据结构 - 支持持久化 - 支持主从复制、集群
3.2 安装Redis¶
Docker安装:
本地安装:
# macOS
brew install redis
brew services start redis
# Ubuntu
sudo apt install redis-server
sudo systemctl start redis
3.3 Redis基础命令¶
# 连接Redis
redis-cli
# 字符串操作
SET name "张三"
GET name
SET counter 100
INCR counter # 101
INCRBY counter 5 # 106
DECR counter # 105
# 设置过期时间(秒)
SET temp_key "value" EX 60
TTL temp_key # 查看剩余时间
# 哈希操作
HSET user:1 name "张三" age 25 email "zhangsan@example.com"
HGET user:1 name
HGETALL user:1
HINCRBY user:1 age 1
# 列表操作
LPUSH queue "task1"
LPUSH queue "task2"
RPOP queue # "task1"
LRANGE queue 0 -1 # 查看所有
# 集合操作
SADD tags "AI" "Python" "Deep Learning"
SMEMBERS tags
SISMEMBER tags "AI"
SREM tags "Python"
# 有序集合
ZADD leaderboard 100 "player1"
ZADD leaderboard 200 "player2"
ZADD leaderboard 150 "player3"
ZRANGE leaderboard 0 -1 WITHSCORES # 按分数升序
ZREVRANGE leaderboard 0 -1 WITHSCORES # 按分数降序
ZINCRBY leaderboard 50 "player1"
# 发布订阅
SUBSCRIBE channel1
PUBLISH channel1 "Hello"
# 事务
MULTI
SET key1 "value1"
SET key2 "value2"
EXEC
# 查看所有键
KEYS *
KEYS user:*
# 删除
DEL key1
FLUSHDB # 清空当前数据库
FLUSHALL # 清空所有数据库
3.4 Python操作Redis¶
import redis
import json
from datetime import timedelta
# 连接Redis
r = redis.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True # 自动解码为字符串
)
# 字符串操作
r.set('name', '张三')
print(r.get('name'))
# 设置过期时间
r.setex('temp_key', 60, 'value') # 60秒过期
r.set('key', 'value', ex=3600) # 1小时过期
# 序列化存储
data = {'model': 'ResNet50', 'accuracy': 0.95}
r.set('model:info', json.dumps(data))
info = json.loads(r.get('model:info'))
# 哈希操作
r.hset('user:1', mapping={
'name': '张三',
'age': 25,
'email': 'zhangsan@example.com'
})
print(r.hget('user:1', 'name'))
print(r.hgetall('user:1'))
# 列表操作(队列)
r.lpush('tasks', 'task1', 'task2', 'task3')
task = r.rpop('tasks')
print(f"处理任务: {task}")
# 列表操作(栈)
r.lpush('stack', 'item1', 'item2')
item = r.lpop('stack')
# 集合操作
r.sadd('tags', 'AI', 'Python', 'ML')
print(r.smembers('tags'))
print(r.sismember('tags', 'AI'))
# 有序集合(排行榜)
r.zadd('leaderboard', {'player1': 100, 'player2': 200})
r.zincrby('leaderboard', 50, 'player1')
top_players = r.zrevrange('leaderboard', 0, 2, withscores=True)
print(top_players)
# 管道(批量操作)
pipe = r.pipeline()
for i in range(100):
pipe.set(f'key:{i}', f'value:{i}')
pipe.execute()
# 发布订阅
# 发布者
r.publish('channel', 'Hello Subscribers!')
# 订阅者(在另一个线程/进程)
import signal
pubsub = r.pubsub()
pubsub.subscribe('channel')
def signal_handler(sig, frame):
"""信号处理函数,优雅退出"""
pubsub.unsubscribe()
print("\n取消订阅并退出")
exit(0)
# 注册Ctrl+C信号处理
signal.signal(signal.SIGINT, signal_handler)
try: # try/except捕获异常
for message in pubsub.listen():
if message['type'] == 'message':
print(f"收到消息: {message['data']}")
except Exception as e:
print(f"订阅出错: {e}")
finally:
pubsub.close() # 确保连接被关闭
3.5 AI场景:模型服务缓存¶
import redis
import hashlib
import json
import time
class ModelCache:
def __init__(self, redis_host='localhost'):
self.r = redis.Redis(host=redis_host, port=6379, decode_responses=True)
self.default_ttl = 3600 # 1小时
def _generate_key(self, model_name, input_data):
"""生成缓存键"""
data_str = json.dumps(input_data, sort_keys=True) # json.dumps将Python对象转为JSON字符串
hash_val = hashlib.md5(f"{model_name}:{data_str}".encode()).hexdigest()
return f"pred:{model_name}:{hash_val}"
def get_prediction(self, model_name, input_data):
"""获取缓存的预测结果"""
key = self._generate_key(model_name, input_data)
cached = self.r.get(key)
if cached:
return json.loads(cached) # json.loads将JSON字符串转为Python对象
return None
def set_prediction(self, model_name, input_data, prediction, ttl=None):
"""缓存预测结果"""
key = self._generate_key(model_name, input_data)
value = json.dumps({
'prediction': prediction,
'cached_at': time.time()
})
self.r.setex(key, ttl or self.default_ttl, value)
def predict_with_cache(self, model, model_name, input_data):
"""带缓存的预测"""
# 先查缓存
cached = self.get_prediction(model_name, input_data)
if cached:
print("缓存命中")
return cached['prediction']
# 执行预测
print("执行预测")
prediction = model.predict(input_data)
# 存入缓存
self.set_prediction(model_name, input_data, prediction)
return prediction
# 特征缓存
class FeatureCache:
def __init__(self, redis_host='localhost'):
self.r = redis.Redis(host=redis_host, port=6379, decode_responses=True)
def get_features(self, entity_type, entity_id, feature_names):
"""获取实体特征"""
key = f"features:{entity_type}:{entity_id}"
features = self.r.hmget(key, feature_names)
return {k: float(v) if v else 0 for k, v in zip(feature_names, features)} # zip并行遍历多个可迭代对象
def set_features(self, entity_type, entity_id, features, ttl=300):
"""设置实体特征"""
key = f"features:{entity_type}:{entity_id}"
self.r.hset(key, mapping=features)
self.r.expire(key, ttl)
# 使用示例
cache = ModelCache()
# 模拟模型
class DummyModel:
def predict(self, data):
time.sleep(1) # 模拟耗时
return {'class': 'cat', 'confidence': 0.95}
model = DummyModel()
# 第一次预测(会执行模型)
result1 = cache.predict_with_cache(model, 'classifier', {'image': 'cat.jpg'})
print(result1)
# 第二次预测(命中缓存,瞬间返回)
result2 = cache.predict_with_cache(model, 'classifier', {'image': 'cat.jpg'})
print(result2)
4. 其他NoSQL数据库简介¶
4.1 Cassandra(列族型)¶
-- 适合时序数据、大规模写入
-- 示例:传感器数据存储
CREATE KEYSPACE iot WITH replication = {
'class': 'SimpleStrategy',
'replication_factor': 3
};
USE iot;
CREATE TABLE sensor_data (
sensor_id text,
timestamp timestamp,
temperature double,
humidity double,
PRIMARY KEY (sensor_id, timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC);
-- 写入数据(极高吞吐量)
INSERT INTO sensor_data (sensor_id, timestamp, temperature, humidity)
VALUES ('sensor001', '2024-01-15 10:00:00', 25.5, 60.0);
-- 查询最近数据
SELECT * FROM sensor_data
WHERE sensor_id = 'sensor001'
LIMIT 100;
4.2 Neo4j(图数据库)¶
// 适合关系网络、推荐系统
// 创建节点
CREATE (p1:Person {name: '张三', age: 25})
CREATE (p2:Person {name: '李四', age: 30})
CREATE (p3:Person {name: '王五', age: 28})
// 创建关系
CREATE (p1)-[:FRIEND {since: '2020-01-01'}]->(p2)
CREATE (p2)-[:FRIEND]->(p3)
// 查询朋友的朋友
MATCH (zhangsan:Person {name: '张三'})-[:FRIEND]->()-[:FRIEND]->(fof)
RETURN fof.name
// 最短路径
MATCH p = shortestPath(
(a:Person {name: '张三'})-[:FRIEND*]-(b:Person {name: '王五'})
)
RETURN p
4.3 向量数据库¶
📌 Pinecone API 版本说明
以下示例基于 Pinecone Python SDK v7.0+(2026年最新版本)。 - v7.0+ 版本采用了全新的客户端初始化方式(
Pinecone类) - 支持更多向量索引类型和增强的搜索功能 - 安装:pip install "pinecone-client>=7.0.0"- 如需使用旧版本,请参考 Pinecone 文档
# Pinecone示例 (v7.0+ API)
from pinecone import Pinecone, ServerlessSpec
# 初始化(v3.0+ 新方式)
pc = Pinecone(api_key="your-api-key")
# 创建索引(如果尚未创建)
# v3.0+ 使用 list_indexes() 返回 IndexList 对象
existing_indexes = [idx.name for idx in pc.list_indexes()]
if "semantic-search" not in existing_indexes:
pc.create_index(
name="semantic-search",
dimension=1536, # OpenAI text-embedding-ada-002 维度
metric="cosine",
spec=ServerlessSpec(cloud="aws", region="us-east-1")
)
# 连接索引
index = pc.Index("semantic-search")
# 插入向量(upsert)
index.upsert([
("id1", [0.1, 0.2, ...], {"text": "文档1", "source": "web"}),
("id2", [0.3, 0.4, ...], {"text": "文档2", "source": "pdf"})
])
# 相似度搜索
results = index.query(
vector=[0.1, 0.2, ...],
top_k=5,
include_metadata=True
)
# 输出结果
for match in results.matches:
print(f"ID: {match.id}, Score: {match.score}, Metadata: {match.metadata}")
5. SQL vs NoSQL选择指南¶
5.1 选择决策树¶
需要复杂事务?
├── 是 → SQL(MySQL/PostgreSQL)
└── 否 → 数据结构化程度?
├── 高度结构化 → SQL
└── 半结构化/多变 → NoSQL
├── 需要复杂查询?
│ ├── 是 → MongoDB
│ └── 否 → 主要使用场景?
│ ├── 缓存/会话 → Redis
│ ├── 时序数据 → Cassandra
│ ├── 关系网络 → Neo4j
│ └── 语义搜索 → 向量数据库
└── 需要极高性能?
├── 是 → Redis
└── 否 → MongoDB
5.2 对比总结¶
| 场景 | 推荐 | 理由 |
|---|---|---|
| 电商订单系统 | SQL | 强一致性、复杂事务 |
| 用户资料管理 | MongoDB | 灵活结构、快速迭代 |
| 模型预测缓存 | Redis | 极高性能、过期机制 |
| 实验元数据 | MongoDB | JSON存储、灵活查询 |
| 实时特征存储 | Redis | 低延迟、高并发 |
| 日志/时序数据 | Cassandra | 高吞吐、时间排序 |
| 推荐系统 | Neo4j | 关系查询高效 |
| 语义搜索 | 向量DB | 相似度计算 |
5.3 混合架构示例¶
┌─────────────────────────────────────────┐
│ AI应用架构 │
├─────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ │
│ │ 用户请求 │───→│ API网关 │ │
│ └─────────┘ └────┬────┘ │
│ │ │
│ ┌────────────┼────────────┐ │
│ ↓ ↓ ↓ │
│ ┌────────┐ ┌────────┐ ┌────────┐│
│ │ Redis │ │MongoDB │ │PostgreSQL│
│ │ 缓存 │ │ 元数据 │ │ 核心业务 │
│ └────────┘ └────────┘ └────────┘│
│ │ │ │ │
│ └────────────┴────────────┘ │
│ │ │
│ ↓ │
│ ┌──────────┐ │
│ │ 模型服务 │ │
│ └──────────┘ │
│ │
└─────────────────────────────────────────┘
🎯 本章自测¶
MongoDB练习¶
-
安装MongoDB,创建数据库和集合
-
设计一个博客系统的文档结构:
- 文章(标题、内容、作者、标签、评论)
- 实现文章的CRUD操作
-
实现按标签查询文章
-
使用聚合管道统计:
- 每个作者的文章数量
- 每篇文章的评论数
Redis练习¶
- 实现一个简单的任务队列:
- 生产者添加任务
- 消费者处理任务
-
支持任务优先级
-
实现API限流器:
- 限制每用户每分钟请求次数
- 使用Redis计数
综合练习¶
- 设计一个AI模型服务的缓存架构:
- 使用Redis缓存预测结果
- 使用MongoDB存储请求日志
- 实现缓存失效策略
📚 扩展阅读¶
推荐资源¶
下一步¶
完成本章后,继续学习 第07章:数据库优化与调优,掌握数据库性能优化技巧!
本章完 | 预计学习时间:3-4小时