跳转至

06-NoSQL数据库

NoSQL数据库

非关系型数据库的世界 目标:掌握MongoDB和Redis,理解何时选择NoSQL


📋 本章概览

预计学习时间:3-4小时 前置章节第01章:数据库基础概念 实践要求:安装MongoDB和Redis,完成CRUD操作

本章内容: 1. NoSQL概述 2. MongoDB文档数据库 3. Redis键值数据库 4. 其他NoSQL数据库简介 5. SQL vs NoSQL选择指南


1. NoSQL概述

1.1 什么是NoSQL?

NoSQL(Not Only SQL):非关系型数据库,不使用固定的表结构,适合处理大规模、高并发、非结构化数据。

1.2 NoSQL数据库类型

类型 代表产品 数据模型 适用场景
文档型 MongoDB JSON文档 内容管理、用户资料
键值型 Redis 键值对 缓存、会话、实时系统
列族型 Cassandra 列族 时序数据、日志
图数据库 Neo4j 节点和关系 社交网络、推荐系统
向量数据库 Pinecone 向量嵌入 AI语义搜索

1.3 NoSQL特点

优点: - ✅ 灵活的数据模型 - ✅ 水平扩展容易 - ✅ 高性能读写 - ✅ 适合大数据量

缺点: - ❌ 不支持复杂事务(多数) - ❌ 不支持JOIN操作 - ❌ 一致性较弱(最终一致性)


2. MongoDB文档数据库

2.1 MongoDB简介

MongoDB是最流行的文档型数据库,使用BSON(二进制JSON)格式存储数据。

适用场景: - 数据结构多变 - 需要快速迭代 - 存储嵌套数据 - AI实验元数据

2.2 安装MongoDB

Docker安装(推荐)

Bash
docker run -d \
  --name mongodb \
  -p 27017:27017 \
  -e MONGO_INITDB_ROOT_USERNAME=admin \
  -e MONGO_INITDB_ROOT_PASSWORD=password \
  -v mongo_data:/data/db \
  mongo:7.0

本地安装

Bash
# macOS
brew tap mongodb/brew
brew install mongodb-community
brew services start mongodb-community

# Ubuntu
sudo apt install mongodb
sudo systemctl start mongodb

2.3 MongoDB基础操作

JavaScript
// 连接数据库
mongosh "mongodb://admin:password@localhost:27017"

// 查看数据库
show dbs

// 创建/切换数据库
use mydb

// 查看当前数据库
db

// 创建集合(表)
db.createCollection("users")

// 查看集合
show collections

// 插入文档(单条)
db.users.insertOne({
    name: "张三",
    age: 25,
    email: "zhangsan@example.com",
    tags: ["AI", "Python"],
    address: {
        city: "北京",
        zip: "100000"
    }
})

// 插入多条
db.users.insertMany([
    { name: "李四", age: 30, email: "lisi@example.com" },
    { name: "王五", age: 28, email: "wangwu@example.com" }
])

// 查询所有
db.users.find()

// 条件查询
db.users.find({ age: { $gt: 25 } })
db.users.find({ name: "张三" })

// 查询数组包含
db.users.find({ tags: "AI" })

// 嵌套查询
db.users.find({ "address.city": "北京" })

// 投影(只返回特定字段)
db.users.find({}, { name: 1, email: 1, _id: 0 })

// 排序
db.users.find().sort({ age: -1 })  // -1降序,1升序

// 分页
db.users.find().skip(10).limit(5)

// 更新单条
db.users.updateOne(
    { name: "张三" },
    { $set: { age: 26 } }
)

// 更新多条
db.users.updateMany(
    { age: { $lt: 30 } },
    { $set: { status: "young" } }
)

// 删除
db.users.deleteOne({ name: "张三" })
db.users.deleteMany({ age: { $lt: 25 } })

2.4 MongoDB查询操作符

JavaScript
// 比较操作符
// $eq: 等于, $ne: 不等于
// $gt: 大于, $gte: 大于等于
// $lt: 小于, $lte: 小于等于
// $in: 在数组中, $nin: 不在数组中

db.products.find({ price: { $gte: 100, $lte: 500 } })
db.products.find({ category: { $in: ["手机", "电脑"] } })

// 逻辑操作符
// $and, $or, $not, $nor

db.products.find({
    $and: [
        { price: { $gte: 100 } },
        { stock: { $gt: 0 } }
    ]
})

db.products.find({
    $or: [
        { category: "手机" },
        { category: "平板" }
    ]
})

// 元素操作符
// $exists: 字段是否存在
// $type: 字段类型

db.users.find({ phone: { $exists: true } })

// 数组操作符
// $all: 包含所有元素
// $elemMatch: 匹配数组元素条件
// $size: 数组长度

db.users.find({ tags: { $all: ["AI", "Python"] } })
db.users.find({ tags: { $size: 2 } })

2.5 MongoDB索引

JavaScript
// 创建单字段索引
db.users.createIndex({ name: 1 })

// 创建复合索引
db.users.createIndex({ age: 1, name: 1 })

// 创建唯一索引
db.users.createIndex({ email: 1 }, { unique: true })

// 创建文本索引(全文搜索)
db.articles.createIndex({ content: "text" })

// 文本搜索
db.articles.find({ $text: { $search: "MongoDB tutorial" } })

// 查看索引
db.users.getIndexes()

// 删除索引
db.users.dropIndex("name_1")

2.6 Python操作MongoDB

Python
from pymongo import MongoClient, ASCENDING, DESCENDING
from datetime import datetime

# 连接数据库
client = MongoClient('mongodb://admin:password@localhost:27017/')

# 选择数据库
db = client['mydb']

# 选择集合
collection = db['users']

# 插入文档
user = {
    'name': '张三',
    'age': 25,
    'email': 'zhangsan@example.com',
    'tags': ['AI', 'Python'],
    'created_at': datetime.now()
}
result = collection.insert_one(user)
print(f"插入ID: {result.inserted_id}")

# 插入多条
users = [
    {'name': '李四', 'age': 30},
    {'name': '王五', 'age': 28}
]
result = collection.insert_many(users)

# 查询单条
user = collection.find_one({'name': '张三'})
print(user)

# 查询多条
for user in collection.find({'age': {'$gt': 25}}):
    print(user)

# 投影
for user in collection.find({}, {'name': 1, 'email': 1, '_id': 0}):
    print(user)

# 排序
for user in collection.find().sort('age', DESCENDING):
    print(user)

# 分页
for user in collection.find().skip(10).limit(5):
    print(user)

# 更新
collection.update_one(
    {'name': '张三'},
    {'$set': {'age': 26}}
)

# 更新或插入(upsert)
collection.update_one(
    {'name': '赵六'},
    {'$set': {'age': 35}},
    upsert=True
)

# 删除
collection.delete_one({'name': '张三'})
collection.delete_many({'age': {'$lt': 25}})

# 聚合管道
pipeline = [
    {'$match': {'age': {'$gte': 25}}},
    {'$group': {'_id': '$age', 'count': {'$sum': 1}}},
    {'$sort': {'count': -1}}
]
for result in collection.aggregate(pipeline):
    print(result)

# 创建索引
collection.create_index([('email', ASCENDING)], unique=True)
collection.create_index([('name', TEXT)])

# 关闭连接
client.close()

2.7 AI场景:实验元数据管理

Python
from pymongo import MongoClient
from datetime import datetime

client = MongoClient('mongodb://localhost:27017/')
db = client['ml_platform']
experiments = db['experiments']

# 记录实验
def log_experiment(exp_name, config, metrics, artifacts=None):
    experiment = {
        'exp_name': exp_name,
        'config': config,  # 灵活的超参数配置
        'metrics': metrics,  # 训练指标
        'artifacts': artifacts or [],  # 模型文件、日志等
        'status': 'running',
        'created_at': datetime.now(),
        'updated_at': datetime.now()
    }
    return experiments.insert_one(experiment).inserted_id

# 更新实验指标
def update_metrics(exp_id, step, metrics):
    experiments.update_one(
        {'_id': exp_id},
        {
            '$push': {'history': {'step': step, **metrics}},
            '$set': {'metrics': metrics, 'updated_at': datetime.now()}
        }
    )

# 查询最佳实验
def get_best_experiments(model_type, metric='accuracy', top_k=5):
    return list(experiments.find(
        {'config.model_type': model_type}
    ).sort(
        f'metrics.{metric}', -1
    ).limit(top_k))

# 使用示例
exp_id = log_experiment(
    'resnet50_imagenet',
    {
        'model_type': 'ResNet50',
        'lr': 0.001,
        'batch_size': 32,
        'optimizer': 'Adam',
        'augmentation': {
            'random_crop': True,
            'horizontal_flip': True
        }
    },
    {'accuracy': 0, 'loss': 0}
)

# 训练过程中更新
for step in range(100):
    # ... 训练代码 ...
    update_metrics(exp_id, step, {
        'accuracy': 0.95,
        'loss': 0.1
    })

# 查询
best = get_best_experiments('ResNet50', 'accuracy', 3)
print(best)

3. Redis键值数据库

3.1 Redis简介

Redis是内存中的键值数据库,速度极快,常用作缓存、消息队列、实时系统。

特点: - 内存存储,读写速度10万+ QPS - 支持多种数据结构 - 支持持久化 - 支持主从复制、集群

3.2 安装Redis

Docker安装

Bash
docker run -d \
  --name redis \
  -p 6379:6379 \
  -v redis_data:/data \
  redis:7-alpine

本地安装

Bash
# macOS
brew install redis
brew services start redis

# Ubuntu
sudo apt install redis-server
sudo systemctl start redis

3.3 Redis基础命令

Bash
# 连接Redis
redis-cli

# 字符串操作
SET name "张三"
GET name
SET counter 100
INCR counter        # 101
INCRBY counter 5    # 106
DECR counter        # 105

# 设置过期时间(秒)
SET temp_key "value" EX 60
TTL temp_key        # 查看剩余时间

# 哈希操作
HSET user:1 name "张三" age 25 email "zhangsan@example.com"
HGET user:1 name
HGETALL user:1
HINCRBY user:1 age 1

# 列表操作
LPUSH queue "task1"
LPUSH queue "task2"
RPOP queue          # "task1"
LRANGE queue 0 -1   # 查看所有

# 集合操作
SADD tags "AI" "Python" "Deep Learning"
SMEMBERS tags
SISMEMBER tags "AI"
SREM tags "Python"

# 有序集合
ZADD leaderboard 100 "player1"
ZADD leaderboard 200 "player2"
ZADD leaderboard 150 "player3"
ZRANGE leaderboard 0 -1 WITHSCORES  # 按分数升序
ZREVRANGE leaderboard 0 -1 WITHSCORES  # 按分数降序
ZINCRBY leaderboard 50 "player1"

# 发布订阅
SUBSCRIBE channel1
PUBLISH channel1 "Hello"

# 事务
MULTI
SET key1 "value1"
SET key2 "value2"
EXEC

# 查看所有键
KEYS *
KEYS user:*

# 删除
DEL key1
FLUSHDB             # 清空当前数据库
FLUSHALL            # 清空所有数据库

3.4 Python操作Redis

Python
import redis
import json
from datetime import timedelta

# 连接Redis
r = redis.Redis(
    host='localhost',
    port=6379,
    db=0,
    decode_responses=True  # 自动解码为字符串
)

# 字符串操作
r.set('name', '张三')
print(r.get('name'))

# 设置过期时间
r.setex('temp_key', 60, 'value')  # 60秒过期
r.set('key', 'value', ex=3600)    # 1小时过期

# 序列化存储
data = {'model': 'ResNet50', 'accuracy': 0.95}
r.set('model:info', json.dumps(data))
info = json.loads(r.get('model:info'))

# 哈希操作
r.hset('user:1', mapping={
    'name': '张三',
    'age': 25,
    'email': 'zhangsan@example.com'
})
print(r.hget('user:1', 'name'))
print(r.hgetall('user:1'))

# 列表操作(队列)
r.lpush('tasks', 'task1', 'task2', 'task3')
task = r.rpop('tasks')
print(f"处理任务: {task}")

# 列表操作(栈)
r.lpush('stack', 'item1', 'item2')
item = r.lpop('stack')

# 集合操作
r.sadd('tags', 'AI', 'Python', 'ML')
print(r.smembers('tags'))
print(r.sismember('tags', 'AI'))

# 有序集合(排行榜)
r.zadd('leaderboard', {'player1': 100, 'player2': 200})
r.zincrby('leaderboard', 50, 'player1')
top_players = r.zrevrange('leaderboard', 0, 2, withscores=True)
print(top_players)

# 管道(批量操作)
pipe = r.pipeline()
for i in range(100):
    pipe.set(f'key:{i}', f'value:{i}')
pipe.execute()

# 发布订阅
# 发布者
r.publish('channel', 'Hello Subscribers!')

# 订阅者(在另一个线程/进程)
import signal

pubsub = r.pubsub()
pubsub.subscribe('channel')

def signal_handler(sig, frame):
    """信号处理函数,优雅退出"""
    pubsub.unsubscribe()
    print("\n取消订阅并退出")
    exit(0)

# 注册Ctrl+C信号处理
signal.signal(signal.SIGINT, signal_handler)

try:  # try/except捕获异常
    for message in pubsub.listen():
        if message['type'] == 'message':
            print(f"收到消息: {message['data']}")
except Exception as e:
    print(f"订阅出错: {e}")
finally:
    pubsub.close()  # 确保连接被关闭

3.5 AI场景:模型服务缓存

Python
import redis
import hashlib
import json
import time

class ModelCache:
    def __init__(self, redis_host='localhost'):
        self.r = redis.Redis(host=redis_host, port=6379, decode_responses=True)
        self.default_ttl = 3600  # 1小时

    def _generate_key(self, model_name, input_data):
        """生成缓存键"""
        data_str = json.dumps(input_data, sort_keys=True)  # json.dumps将Python对象转为JSON字符串
        hash_val = hashlib.md5(f"{model_name}:{data_str}".encode()).hexdigest()
        return f"pred:{model_name}:{hash_val}"

    def get_prediction(self, model_name, input_data):
        """获取缓存的预测结果"""
        key = self._generate_key(model_name, input_data)
        cached = self.r.get(key)
        if cached:
            return json.loads(cached)  # json.loads将JSON字符串转为Python对象
        return None

    def set_prediction(self, model_name, input_data, prediction, ttl=None):
        """缓存预测结果"""
        key = self._generate_key(model_name, input_data)
        value = json.dumps({
            'prediction': prediction,
            'cached_at': time.time()
        })
        self.r.setex(key, ttl or self.default_ttl, value)

    def predict_with_cache(self, model, model_name, input_data):
        """带缓存的预测"""
        # 先查缓存
        cached = self.get_prediction(model_name, input_data)
        if cached:
            print("缓存命中")
            return cached['prediction']

        # 执行预测
        print("执行预测")
        prediction = model.predict(input_data)

        # 存入缓存
        self.set_prediction(model_name, input_data, prediction)

        return prediction

# 特征缓存
class FeatureCache:
    def __init__(self, redis_host='localhost'):
        self.r = redis.Redis(host=redis_host, port=6379, decode_responses=True)

    def get_features(self, entity_type, entity_id, feature_names):
        """获取实体特征"""
        key = f"features:{entity_type}:{entity_id}"
        features = self.r.hmget(key, feature_names)
        return {k: float(v) if v else 0 for k, v in zip(feature_names, features)}  # zip并行遍历多个可迭代对象

    def set_features(self, entity_type, entity_id, features, ttl=300):
        """设置实体特征"""
        key = f"features:{entity_type}:{entity_id}"
        self.r.hset(key, mapping=features)
        self.r.expire(key, ttl)

# 使用示例
cache = ModelCache()

# 模拟模型
class DummyModel:
    def predict(self, data):
        time.sleep(1)  # 模拟耗时
        return {'class': 'cat', 'confidence': 0.95}

model = DummyModel()

# 第一次预测(会执行模型)
result1 = cache.predict_with_cache(model, 'classifier', {'image': 'cat.jpg'})
print(result1)

# 第二次预测(命中缓存,瞬间返回)
result2 = cache.predict_with_cache(model, 'classifier', {'image': 'cat.jpg'})
print(result2)

4. 其他NoSQL数据库简介

4.1 Cassandra(列族型)

SQL
-- 适合时序数据、大规模写入
-- 示例:传感器数据存储

CREATE KEYSPACE iot WITH replication = {
    'class': 'SimpleStrategy',
    'replication_factor': 3
};

USE iot;

CREATE TABLE sensor_data (
    sensor_id text,
    timestamp timestamp,
    temperature double,
    humidity double,
    PRIMARY KEY (sensor_id, timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC);

-- 写入数据(极高吞吐量)
INSERT INTO sensor_data (sensor_id, timestamp, temperature, humidity)
VALUES ('sensor001', '2024-01-15 10:00:00', 25.5, 60.0);

-- 查询最近数据
SELECT * FROM sensor_data
WHERE sensor_id = 'sensor001'
LIMIT 100;

4.2 Neo4j(图数据库)

Cypher
// 适合关系网络、推荐系统

// 创建节点
CREATE (p1:Person {name: '张三', age: 25})
CREATE (p2:Person {name: '李四', age: 30})
CREATE (p3:Person {name: '王五', age: 28})

// 创建关系
CREATE (p1)-[:FRIEND {since: '2020-01-01'}]->(p2)
CREATE (p2)-[:FRIEND]->(p3)

// 查询朋友的朋友
MATCH (zhangsan:Person {name: '张三'})-[:FRIEND]->()-[:FRIEND]->(fof)
RETURN fof.name

// 最短路径
MATCH p = shortestPath(
    (a:Person {name: '张三'})-[:FRIEND*]-(b:Person {name: '王五'})
)
RETURN p

4.3 向量数据库

📌 Pinecone API 版本说明

以下示例基于 Pinecone Python SDK v7.0+(2026年最新版本)。 - v7.0+ 版本采用了全新的客户端初始化方式(Pinecone 类) - 支持更多向量索引类型和增强的搜索功能 - 安装:pip install "pinecone-client>=7.0.0" - 如需使用旧版本,请参考 Pinecone 文档

Python
# Pinecone示例 (v7.0+ API)
from pinecone import Pinecone, ServerlessSpec

# 初始化(v3.0+ 新方式)
pc = Pinecone(api_key="your-api-key")

# 创建索引(如果尚未创建)
# v3.0+ 使用 list_indexes() 返回 IndexList 对象
existing_indexes = [idx.name for idx in pc.list_indexes()]
if "semantic-search" not in existing_indexes:
    pc.create_index(
        name="semantic-search",
        dimension=1536,  # OpenAI text-embedding-ada-002 维度
        metric="cosine",
        spec=ServerlessSpec(cloud="aws", region="us-east-1")
    )

# 连接索引
index = pc.Index("semantic-search")

# 插入向量(upsert)
index.upsert([
    ("id1", [0.1, 0.2, ...], {"text": "文档1", "source": "web"}),
    ("id2", [0.3, 0.4, ...], {"text": "文档2", "source": "pdf"})
])

# 相似度搜索
results = index.query(
    vector=[0.1, 0.2, ...],
    top_k=5,
    include_metadata=True
)

# 输出结果
for match in results.matches:
    print(f"ID: {match.id}, Score: {match.score}, Metadata: {match.metadata}")

5. SQL vs NoSQL选择指南

5.1 选择决策树

Text Only
需要复杂事务?
├── 是 → SQL(MySQL/PostgreSQL)
└── 否 → 数据结构化程度?
    ├── 高度结构化 → SQL
    └── 半结构化/多变 → NoSQL
        ├── 需要复杂查询?
        │   ├── 是 → MongoDB
        │   └── 否 → 主要使用场景?
        │       ├── 缓存/会话 → Redis
        │       ├── 时序数据 → Cassandra
        │       ├── 关系网络 → Neo4j
        │       └── 语义搜索 → 向量数据库
        └── 需要极高性能?
            ├── 是 → Redis
            └── 否 → MongoDB

5.2 对比总结

场景 推荐 理由
电商订单系统 SQL 强一致性、复杂事务
用户资料管理 MongoDB 灵活结构、快速迭代
模型预测缓存 Redis 极高性能、过期机制
实验元数据 MongoDB JSON存储、灵活查询
实时特征存储 Redis 低延迟、高并发
日志/时序数据 Cassandra 高吞吐、时间排序
推荐系统 Neo4j 关系查询高效
语义搜索 向量DB 相似度计算

5.3 混合架构示例

Text Only
┌─────────────────────────────────────────┐
│              AI应用架构                  │
├─────────────────────────────────────────┤
│                                         │
│  ┌─────────┐    ┌─────────┐            │
│  │ 用户请求 │───→│ API网关 │            │
│  └─────────┘    └────┬────┘            │
│                      │                  │
│         ┌────────────┼────────────┐    │
│         ↓            ↓            ↓    │
│    ┌────────┐   ┌────────┐   ┌────────┐│
│    │ Redis  │   │MongoDB │   │PostgreSQL│
│    │ 缓存   │   │ 元数据 │   │ 核心业务 │
│    └────────┘   └────────┘   └────────┘│
│         │            │            │    │
│         └────────────┴────────────┘    │
│                      │                  │
│                      ↓                  │
│               ┌──────────┐             │
│               │ 模型服务 │             │
│               └──────────┘             │
│                                         │
└─────────────────────────────────────────┘

🎯 本章自测

MongoDB练习

  1. 安装MongoDB,创建数据库和集合

  2. 设计一个博客系统的文档结构

  3. 文章(标题、内容、作者、标签、评论)
  4. 实现文章的CRUD操作
  5. 实现按标签查询文章

  6. 使用聚合管道统计

  7. 每个作者的文章数量
  8. 每篇文章的评论数

Redis练习

  1. 实现一个简单的任务队列
  2. 生产者添加任务
  3. 消费者处理任务
  4. 支持任务优先级

  5. 实现API限流器

  6. 限制每用户每分钟请求次数
  7. 使用Redis计数

综合练习

  1. 设计一个AI模型服务的缓存架构
  2. 使用Redis缓存预测结果
  3. 使用MongoDB存储请求日志
  4. 实现缓存失效策略

📚 扩展阅读

推荐资源

下一步

完成本章后,继续学习 第07章:数据库优化与调优,掌握数据库性能优化技巧!


本章完 | 预计学习时间:3-4小时