跳转至

07-数据库优化与调优

数据库优化与调优

📋 本章概览

数据库优化是提升应用性能的关键技能。本章将系统学习从SQL语句到系统配置的全面优化方法,特别针对AI/ML场景下的大数据量、高并发查询需求。

学习目标: - 掌握SQL查询优化技巧 - 理解索引原理与优化策略 - 学会使用性能分析工具 - 掌握数据库配置调优方法 - 了解AI场景下的特殊优化需求

预计学习时间: 6-8小时

前置章节: 第06章:NoSQL数据库


1. 性能优化的整体思路

1.1 性能瓶颈的常见位置

Text Only
┌─────────────────────────────────────────────────────────────┐
│                    数据库性能瓶颈分析                         │
├─────────────────────────────────────────────────────────────┤
│  应用层  │  连接池配置、ORM使用、事务管理                      │
├─────────┼───────────────────────────────────────────────────┤
│  SQL层   │  查询语句、索引使用、锁竞争                         │
├─────────┼───────────────────────────────────────────────────┤
│  存储层  │  磁盘I/O、内存缓冲、日志写入                        │
├─────────┼───────────────────────────────────────────────────┤
│  硬件层  │  CPU、内存、磁盘、网络                             │
└─────────────────────────────────────────────────────────────┘

1.2 优化流程

Python
# 数据库优化标准流程
class DatabaseOptimization:
    """
    数据库优化流程
    """
    def optimization_workflow(self):
        steps = [
            "1. 监控与诊断:收集性能指标,定位瓶颈",
            "2. 分析原因:使用EXPLAIN、慢查询日志等工具",
            "3. 制定方案:索引优化、SQL改写、配置调整",
            "4. 实施优化:逐步应用优化措施",
            "5. 验证效果:对比优化前后的性能指标",
            "6. 持续监控:建立长期监控机制"
        ]
        return steps

2. SQL查询优化

2.1 使用EXPLAIN分析查询

SQL
-- 基础EXPLAIN使用
EXPLAIN SELECT * FROM users WHERE age > 25;  -- EXPLAIN查看查询执行计划

-- 详细分析(MySQL 8.0+)
EXPLAIN ANALYZE SELECT * FROM users WHERE age > 25;

-- 格式化输出(PostgreSQL)
EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON)
SELECT * FROM users WHERE age > 25;

EXPLAIN输出关键字段解读:

字段 含义 优化建议
type 访问类型 优先保证range以上,避免ALL(全表扫描)
key 使用的索引 检查是否使用了预期索引
rows 扫描行数 数值越小越好
Extra 额外信息 关注Using filesort、Using temporary

2.2 常见低效SQL及优化

SQL
-- ❌ 低效:SELECT *
SELECT * FROM users WHERE age > 25;

-- ✅ 优化:只查询需要的列
SELECT user_id, username, email FROM users WHERE age > 25;

-- ❌ 低效:在索引列上使用函数
SELECT * FROM users WHERE YEAR(created_at) = 2024;

-- ✅ 优化:避免函数操作
SELECT * FROM users
WHERE created_at >= '2024-01-01'
  AND created_at < '2025-01-01';

-- ❌ 低效:隐式类型转换
SELECT * FROM users WHERE phone = 13800138000;

-- ✅ 优化:保持类型一致
SELECT * FROM users WHERE phone = '13800138000';

-- ❌ 低效:OR条件导致索引失效
SELECT * FROM users
WHERE age = 25 OR email = 'test@example.com';

-- ✅ 优化:使用UNION
SELECT * FROM users WHERE age = 25
UNION ALL
SELECT * FROM users WHERE email = 'test@example.com';

-- ❌ 低效:NOT IN 子查询
SELECT * FROM orders
WHERE user_id NOT IN (SELECT user_id FROM blacklist);  -- 子查询:嵌套在另一个查询中

-- ✅ 优化:使用LEFT JOIN
SELECT o.* FROM orders o
LEFT JOIN blacklist b ON o.user_id = b.user_id
WHERE b.user_id IS NULL;

2.3 分页查询优化

SQL
-- ❌ 低效:深分页
SELECT * FROM users
ORDER BY created_at DESC
LIMIT 1000000, 10;

-- ✅ 优化1:使用覆盖索引
SELECT * FROM users u
JOIN (
    SELECT user_id FROM users
    ORDER BY created_at DESC
    LIMIT 1000000, 10
) tmp ON u.user_id = tmp.user_id;

-- ✅ 优化2:使用游标/书签分页
SELECT * FROM users
WHERE created_at < '2024-01-01 12:00:00'
ORDER BY created_at DESC
LIMIT 10;

-- ✅ 优化3:使用延迟关联(AI场景常用)
SELECT * FROM model_predictions p
JOIN (
    SELECT prediction_id
    FROM model_predictions
    WHERE model_version = 'v2.0'
    ORDER BY confidence DESC
    LIMIT 1000
) tmp ON p.prediction_id = tmp.prediction_id;

3. 索引优化策略

3.1 索引设计原则

SQL
-- 1. 为WHERE、JOIN、ORDER BY、GROUP BY的列创建索引
CREATE INDEX idx_user_age ON users(age);
CREATE INDEX idx_order_user ON orders(user_id);
CREATE INDEX idx_product_category ON products(category_id, created_at);

-- 2. 复合索引的最左前缀原则
CREATE INDEX idx_name_age_city ON users(name, age, city);
-- 有效:WHERE name = '张三'
-- 有效:WHERE name = '张三' AND age = 25
-- 有效:WHERE name = '张三' AND age = 25 AND city = '北京'
-- 无效:WHERE age = 25(缺少最左列)
-- 部分有效:WHERE name = '张三' AND city = '北京'(age被跳过)

-- 3. 选择性高的列放在前面
-- 性别选择性低(只有男女),用户名选择性高
CREATE INDEX idx_user_name_gender ON users(username, gender);

-- 4. 避免冗余索引
CREATE INDEX idx_a ON table(col1, col2);
CREATE INDEX idx_b ON table(col1);  -- 冗余,被idx_a覆盖

3.2 索引维护

SQL
-- MySQL索引维护
-- 查看索引使用情况
SELECT
    TABLE_NAME,
    INDEX_NAME,
    CARDINALITY,
    TABLE_ROWS,
    CARDINALITY / TABLE_ROWS AS selectivity
FROM information_schema.STATISTICS
WHERE TABLE_SCHEMA = 'your_database';

-- 分析表(更新统计信息)
ANALYZE TABLE users;

-- 优化表(回收空间)
OPTIMIZE TABLE users;

-- 删除未使用的索引(需要开启performance_schema)
SELECT
    OBJECT_SCHEMA,
    OBJECT_NAME,
    INDEX_NAME,
    COUNT_FETCH,
    COUNT_INSERT,
    COUNT_UPDATE,
    COUNT_DELETE
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE INDEX_NAME IS NOT NULL
  AND COUNT_FETCH = 0
ORDER BY OBJECT_SCHEMA, OBJECT_NAME;

-- PostgreSQL索引维护
-- 查看索引大小和使用情况
SELECT
    schemaname,
    tablename,
    indexname,
    pg_size_pretty(pg_relation_size(indexrelid)) as index_size,
    idx_scan,
    idx_tup_read,
    idx_tup_fetch
FROM pg_stat_user_indexes
ORDER BY pg_relation_size(indexrelid) DESC;

-- 重建索引
REINDEX INDEX idx_users_email;

-- 清理表和索引
VACUUM ANALYZE users;

3.3 AI场景专用索引策略

SQL
-- 1. 时间序列数据索引(模型训练日志)
CREATE INDEX idx_training_logs_time  -- INDEX索引加速查询
ON training_logs(model_id, created_at DESC);

-- 2. 高维特征向量索引(使用pgvector扩展)
-- 安装:CREATE EXTENSION vector;
CREATE INDEX idx_embeddings_vector
ON embeddings USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);  -- CTE公共表表达式:临时命名结果集

-- 3. JSONB索引(灵活的特征存储)
CREATE INDEX idx_model_config_params
ON model_configs USING GIN (hyperparameters);

-- 4. 部分索引(只索引活跃数据)
CREATE INDEX idx_active_experiments
ON experiments(created_at)
WHERE status = 'running';

-- 5. 函数索引(预计算常用查询)
CREATE INDEX idx_predictions_confidence_rounded
ON predictions(ROUND(confidence::numeric, 2));

4. 数据库配置优化

4.1 MySQL关键配置参数

INI
# my.cnf 关键配置
[mysqld]
# ===== 内存配置 =====
# InnoDB缓冲池大小(建议物理内存的50-75%)
innodb_buffer_pool_size = 4G

# 每个连接的缓冲区
sort_buffer_size = 2M
read_buffer_size = 2M
read_rnd_buffer_size = 4M
join_buffer_size = 4M

# ===== InnoDB配置 =====
# 日志文件大小(影响恢复速度和写入性能)
innodb_log_file_size = 512M
innodb_log_buffer_size = 64M

# 刷新策略(权衡性能和持久性)
# 0: 每秒刷新,性能最好,可能丢1秒数据
# 1: 每次提交刷新,最安全,性能最差
# 2: 每次提交写入OS缓存,每秒刷新,折中
innodb_flush_log_at_trx_commit = 2

# 刷新方式
innodb_flush_method = O_DIRECT

# I/O线程数
innodb_read_io_threads = 8
innodb_write_io_threads = 8

# ===== 连接配置 =====
max_connections = 500
max_user_connections = 450
wait_timeout = 600
interactive_timeout = 600

# ===== 查询缓存(⚠️ MySQL 8.0+ 已完全移除此功能) =====
#
# 🔴 重要警告:以下配置仅适用于 MySQL 5.7 及更早版本!
#
# MySQL 8.0+ 已彻底移除查询缓存功能,原因包括:
# - 在高并发场景下成为性能瓶颈(全局锁竞争)
# - 查询缓存命中率通常很低
# - 每次数据修改都需要失效相关缓存,开销大
#
# 在 MySQL 8.0+ 配置文件中使用以下参数会导致 MySQL 启动失败!
# 如果你正在使用 MySQL 8.0+,请跳过此配置块。
#
# === 以下配置仅适用于 MySQL 5.7 及更早版本 ===
# query_cache_type = 1
# query_cache_size = 256M
# query_cache_limit = 2M
# === MySQL 5.7 配置结束 ===
#
# 📌 MySQL 8.0+ 替代方案:
# 1. 应用层缓存:使用 Redis/Memcached 缓存热点查询结果
# 2. Buffer Pool 优化:增大 innodb_buffer_pool_size(上面已配置)
# 3. 查询优化:通过索引优化、SQL改写提升查询效率
# 4. 只读副本:将读请求分流到只读副本,减轻主库压力

# ===== 日志配置 =====
slow_query_log = ON
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 1  # 记录超过1秒的查询
log_queries_not_using_indexes = ON

4.2 PostgreSQL关键配置参数

INI
# postgresql.conf 关键配置

# ===== 内存配置 =====
# 共享缓冲区(建议物理内存的25-40%)
shared_buffers = 2GB

# 有效缓存大小(操作系统缓存+shared_buffers)
effective_cache_size = 6GB

# 工作内存(排序、哈希操作使用)
work_mem = 64MB

# 维护工作内存(VACUUM、CREATE INDEX等)
maintenance_work_mem = 512MB

# ===== WAL配置 =====
# WAL缓冲区
wal_buffers = 16MB

# 检查点间隔
checkpoint_timeout = 10min
max_wal_size = 2GB
min_wal_size = 512MB

# 检查点完成目标(平滑I/O)
checkpoint_completion_target = 0.9

# ===== 并发配置 =====
max_connections = 200
max_worker_processes = 8
max_parallel_workers_per_gather = 4
max_parallel_workers = 8

# ===== 查询规划器 =====
random_page_cost = 1.1  # SSD设置为1.1,HDD保持4.0
effective_io_concurrency = 200  # SSD可以设置更高

# ===== 日志配置 =====
logging_collector = on
log_directory = 'log'
log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log'
log_min_duration_statement = 1000  # 记录超过1秒的查询
log_line_prefix = '%t [%p]: [%l-1] user=%u,db=%d,app=%a,client=%h '
log_checkpoints = on
log_connections = on
log_disconnections = on
log_lock_waits = on

5. 连接池优化

行业最佳实践:HikariCP是Java生态中性能最好的连接池,SpringBoot 2.0+已将其作为默认连接池。其核心优势包括: - 更快的连接获取速度(使用CAS无锁算法) - 更小的内存占用 - 更准确的连接超时检测 - 更好的并发性能

5.1 连接池原理

Text Only
┌─────────────────────────────────────────────────────────────┐
│                      连接池工作原理                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   应用请求连接 ──→ 连接池 ──→ 空闲连接? ──→ 直接返回        │
│                      ↑    └─→ 无空闲 ──→ 创建新连接         │
│                      │              └─→ 达到最大? ──→ 等待  │
│                      │                                     │
│   应用释放连接 ──────┘                                     │
│                                                             │
└─────────────────────────────────────────────────────────────┘

连接池关键参数说明: - pool_size:常驻连接数,建议设置为 (CPU核心数 × 2) + 有效磁盘数 - max_overflow:最大额外连接,建议为pool_size的50%-100% - pool_timeout:获取连接超时时间,建议10-30秒 - pool_recycle:连接回收时间,防止数据库端超时,建议1小时 - pool_pre_ping:连接前检测,避免使用已断开的连接

5.2 Python连接池配置

Python
# SQLAlchemy连接池配置
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool

# 生产环境推荐配置
engine = create_engine(
    'mysql+pymysql://user:pass@localhost/dbname',
    poolclass=QueuePool,
    pool_size=10,           # 常驻连接数
    max_overflow=20,        # 最大额外连接
    pool_timeout=30,        # 获取连接超时时间
    pool_recycle=3600,      # 连接回收时间(防止超时)
    pool_pre_ping=True,     # 连接前ping检测
    echo=False              # 关闭SQL日志(生产环境)
)

SessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine
)

# PostgreSQL连接池(使用psycopg2)
engine_pg = create_engine(
    'postgresql+psycopg2://user:pass@localhost/dbname',
    pool_size=20,
    max_overflow=30,
    pool_timeout=30,
    pool_recycle=3600,
    pool_pre_ping=True,
    connect_args={
        'connect_timeout': 10,
        'options': '-c statement_timeout=30000'  # 30秒查询超时
    }
)

# 异步连接池(asyncpg)
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

async_engine = create_async_engine(
    'postgresql+asyncpg://user:pass@localhost/dbname',
    pool_size=20,
    max_overflow=30,
    pool_timeout=30,
    pool_recycle=3600,
    echo=False
)

AsyncSessionLocal = sessionmaker(
    async_engine,
    class_=AsyncSession,
    expire_on_commit=False
)

5.3 连接池监控

Python
# 连接池监控工具
class ConnectionPoolMonitor:
    """连接池监控器"""

    def __init__(self, engine):
        self.engine = engine
        self.pool = engine.pool

    def get_pool_stats(self):
        """获取连接池统计信息"""
        return {
            'size': self.pool.size(),           # 当前连接数
            'checked_in': self.pool.checkedin(), # 空闲连接
            'checked_out': self.pool.checkedout(), # 使用中连接
            'overflow': self.pool.overflow(),    # 超出pool_size的连接
        }

    def log_pool_status(self):
        """记录连接池状态"""
        stats = self.get_pool_stats()
        print(f"连接池状态: {stats}")

        # 告警:使用率过高
        usage_rate = stats['checked_out'] / (stats['size'] or 1)
        if usage_rate > 0.8:
            print(f"⚠️ 警告:连接池使用率 {usage_rate:.1%},考虑增加pool_size")

        return stats

# 使用示例
monitor = ConnectionPoolMonitor(engine)
monitor.log_pool_status()

6. 缓存策略

6.1 多级缓存架构

Text Only
┌─────────────────────────────────────────────────────────────┐
│                     多级缓存架构                             │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   应用层                                                     │
│   ┌─────────────┐    ┌─────────────┐    ┌─────────────┐    │
│   │  本地缓存   │    │  分布式缓存  │    │  数据库缓存  │    │
│   │  (LRU)     │    │  (Redis)    │    │  (Query)    │    │
│   │  < 1ms     │    │  ~ 5ms      │    │  ~ 50ms     │    │
│   └─────────────┘    └─────────────┘    └─────────────┘    │
│         ↑                  ↑                  ↑             │
│         └──────────────────┴──────────────────┘             │
│                         回源策略                            │
│                                                             │
└─────────────────────────────────────────────────────────────┘

6.2 缓存实现代码

Python
import functools
import hashlib
import json
from typing import Optional, Any
import redis

class DatabaseCache:
    """数据库查询缓存装饰器"""

    def __init__(self, redis_client: redis.Redis,
                 default_ttl: int = 300,
                 key_prefix: str = "db_cache"):
        self.redis = redis_client
        self.default_ttl = default_ttl
        self.key_prefix = key_prefix

    def _generate_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
        """生成缓存key"""
        cache_key = f"{func_name}:{str(args)}:{str(kwargs)}"
        hash_key = hashlib.md5(cache_key.encode()).hexdigest()
        return f"{self.key_prefix}:{hash_key}"

    def cache_query(self, ttl: Optional[int] = None):
        """查询缓存装饰器"""
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):  # *args接收任意位置参数;**kwargs接收任意关键字参数
                cache_key = self._generate_key(func.__name__, args, kwargs)

                # 尝试从缓存获取
                cached = self.redis.get(cache_key)
                if cached:
                    return json.loads(cached)

                # 执行查询
                result = func(*args, **kwargs)

                # 写入缓存
                self.redis.setex(
                    cache_key,
                    ttl or self.default_ttl,
                    json.dumps(result, default=str)
                )

                return result
            return wrapper
        return decorator

    def invalidate_cache(self, pattern: str = "*"):
        """清除缓存"""
        keys = self.redis.keys(f"{self.key_prefix}:{pattern}")
        if keys:
            self.redis.delete(*keys)

# 使用示例
redis_client = redis.Redis(host='localhost', port=6379, db=0)
cache = DatabaseCache(redis_client, default_ttl=600)

class UserService:
    @cache.cache_query(ttl=300)
    def get_user_stats(self, user_id: int):
        """获取用户统计(缓存5分钟)"""
        # 复杂的数据库查询
        result = db.execute("""
            SELECT u.*, COUNT(o.order_id) as order_count
            FROM users u
            LEFT JOIN orders o ON u.user_id = o.user_id
            WHERE u.user_id = %s
            GROUP BY u.user_id
        """, (user_id,))
        return result.fetchone()

    @cache.cache_query(ttl=60)
    def get_model_predictions(self, model_id: str, limit: int = 100):
        """获取模型预测结果(缓存1分钟)"""
        result = db.execute("""
            SELECT * FROM predictions
            WHERE model_id = %s
            ORDER BY created_at DESC
            LIMIT %s
        """, (model_id, limit))
        return result.fetchall()

6.3 缓存更新策略

Python
class CacheStrategy:
    """缓存策略管理"""

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def cache_aside(self, key: str, db_query_func, ttl: int = 300):
        """旁路缓存(Cache-Aside)策略"""
        # 1. 先查缓存
        value = self.redis.get(key)
        if value:
            return json.loads(value)  # json.loads将JSON字符串转为Python对象

        # 2. 缓存未命中,查数据库
        value = db_query_func()

        # 3. 写入缓存
        if value:
            self.redis.setex(key, ttl, json.dumps(value, default=str))  # json.dumps将Python对象转为JSON字符串

        return value

    def write_through(self, key: str, value: Any,
                      db_write_func, ttl: int = 300):
        """直写(Write-Through)策略"""
        # 1. 先写数据库
        db_write_func(value)

        # 2. 同步写缓存
        self.redis.setex(key, ttl, json.dumps(value, default=str))

    def write_behind(self, key: str, value: Any, ttl: int = 300):
        """回写(Write-Behind)策略"""
        # 1. 先写缓存
        self.redis.setex(key, ttl, json.dumps(value, default=str))

        # 2. 异步写数据库(使用消息队列)
        # message_queue.publish('db_write_queue', {'key': key, 'value': value})
        pass

    def invalidate_on_write(self, key_pattern: str):
        """写入时失效策略"""
        # 数据更新时,删除相关缓存
        keys = self.redis.keys(key_pattern)
        if keys:
            self.redis.delete(*keys)

7. AI场景专项优化

7.1 批量插入优化

Python
# 批量插入优化(模型训练数据)
from sqlalchemy import insert
import time

def batch_insert_optimized(session, table, data_list, batch_size=10000):
    """
    优化的批量插入

    Args:
        session: 数据库会话
        table: 表对象
        data_list: 数据列表
        batch_size: 每批大小
    """
    total = len(data_list)
    inserted = 0
    start_time = time.time()

    try:
        for i in range(0, total, batch_size):
            batch = data_list[i:i + batch_size]

            # 使用批量插入
            session.execute(
                insert(table),
                batch
            )

            # 每10批提交一次
            if (i // batch_size + 1) % 10 == 0:
                session.commit()

            inserted += len(batch)

            if i % (batch_size * 10) == 0:
                elapsed = time.time() - start_time
                rate = inserted / elapsed if elapsed > 0 else 0
                print(f"已插入: {inserted}/{total}, 速率: {rate:.0f} 条/秒")

        # 最后提交
        session.commit()

    except Exception as e:
        session.rollback()
        raise e

    elapsed = time.time() - start_time
    print(f"完成!共插入 {inserted} 条,耗时 {elapsed:.2f} 秒,"
          f"平均 {inserted/elapsed:.0f} 条/秒")

# MySQL专用:LOAD DATA优化
def bulk_load_mysql(connection, table_name, csv_file):
    """使用LOAD DATA进行超大批量导入

    安全提示:table_name 和 csv_file 应来自受信内部配置,
    不可直接接受用户输入,以防止SQL注入。
    DDL语句(ALTER TABLE等)无法使用参数化查询,
    生产环境应通过白名单验证表名。
    """
    cursor = connection.cursor()

    # 临时禁用索引和检查
    cursor.execute(f"ALTER TABLE {table_name} DISABLE KEYS")
    cursor.execute("SET FOREIGN_KEY_CHECKS = 0")
    cursor.execute("SET UNIQUE_CHECKS = 0")
    cursor.execute("SET AUTOCOMMIT = 0")

    # 执行LOAD DATA
    cursor.execute(f"""
        LOAD DATA LOCAL INFILE '{csv_file}'
        INTO TABLE {table_name}
        FIELDS TERMINATED BY ','
        ENCLOSED BY '"'
        LINES TERMINATED BY '\n'
        IGNORE 1 ROWS
    """)

    # 恢复设置
    cursor.execute("COMMIT")
    cursor.execute("SET FOREIGN_KEY_CHECKS = 1")
    cursor.execute("SET UNIQUE_CHECKS = 1")
    cursor.execute(f"ALTER TABLE {table_name} ENABLE KEYS")

    cursor.close()

7.2 特征存储优化

Python
# 特征存储优化方案
class FeatureStore:
    """特征存储优化"""

    def __init__(self, db_engine, redis_client):
        self.db = db_engine
        self.cache = redis_client

    def get_features_online(self, entity_id: str, feature_names: list):
        """
        在线特征获取(低延迟)

        策略:
        1. 优先从Redis获取
        2. 缓存未命中时从数据库获取并缓存
        """
        cache_key = f"features:{entity_id}"

        # 尝试从缓存获取
        cached = self.cache.hmget(cache_key, feature_names)
        if all(cached):
            return dict(zip(feature_names, cached))  # zip并行遍历多个可迭代对象

        # 缓存未命中,查询数据库
        missing_features = [
            name for name, val in zip(feature_names, cached) if val is None
        ]

        query = f"""
            SELECT {', '.join(missing_features)}
            FROM feature_store
            WHERE entity_id = %s
        """
        result = self.db.execute(query, (entity_id,)).fetchone()

        if result:
            # 更新缓存
            feature_dict = dict(result)
            self.cache.hset(cache_key, mapping=feature_dict)
            self.cache.expire(cache_key, 3600)  # 1小时过期

            # 合并结果
            return {**dict(zip(feature_names, cached)), **feature_dict}

        return None

    def get_features_offline(self, entity_ids: list, feature_names: list):
        """
        离线特征获取(批量)

        策略:
        1. 使用批量查询
        2. 分区并行读取
        """
        # 分批查询避免超长IN列表
        batch_size = 1000
        all_results = []

        for i in range(0, len(entity_ids), batch_size):
            batch_ids = entity_ids[i:i + batch_size]

            query = f"""
                SELECT entity_id, {', '.join(feature_names)}
                FROM feature_store
                WHERE entity_id = ANY(%s)
            """

            result = self.db.execute(query, (batch_ids,))
            all_results.extend(result.fetchall())

        return all_results

7.3 模型服务数据库优化

Python
# 模型推理服务的数据库优化
import asyncio
from concurrent.futures import ThreadPoolExecutor
from sqlalchemy import text

class ModelServingDB:
    """模型服务数据库优化"""

    def __init__(self, engine):
        self.engine = engine
        self.executor = ThreadPoolExecutor(max_workers=10)

    async def get_prediction_async(self, model_id: str, input_hash: str):  # async def定义异步函数;用await调用
        """异步获取预测结果"""
        loop = asyncio.get_event_loop()

        def _query():
            with self.engine.connect() as conn:
                result = conn.execute(text("""
                    SELECT prediction, confidence, latency_ms
                    FROM model_predictions
                    WHERE model_id = :model_id AND input_hash = :input_hash
                    AND created_at > NOW() - INTERVAL '1 hour'
                """), {"model_id": model_id, "input_hash": input_hash})
                return result.fetchone()

        # 在线程池中执行同步查询
        return await loop.run_in_executor(self.executor, _query)  # await等待异步操作完成

    def batch_log_predictions(self, predictions: list):
        """批量记录预测日志(异步写入)"""
        # 使用COPY或批量INSERT
        # 这里使用后台线程异步写入
        def _insert():
            with self.engine.connect() as conn:
                for pred in predictions:
                    conn.execute(text("""
                        INSERT INTO prediction_logs
                        (model_id, input_data, output_data, latency_ms, created_at)
                        VALUES (:model_id, :input_data, :output_data, :latency_ms, NOW())
                    """), pred)
                conn.commit()

        # 提交到线程池,不阻塞主线程
        self.executor.submit(_insert)

8. 监控与诊断工具

8.1 慢查询日志分析

Python
# 慢查询日志分析工具
import re
from collections import defaultdict  # defaultdict带默认值的字典,避免KeyError
from datetime import datetime

class SlowQueryAnalyzer:
    """慢查询日志分析器"""

    def __init__(self):
        self.queries = []

    def parse_mysql_slow_log(self, log_file):
        """解析MySQL慢查询日志"""
        pattern = re.compile(
            r'# Time: (?P<time>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z?)\n'
            r'# User@Host: .*\n'
            r'# Query_time: (?P<query_time>[\d.]+) .*\n'
            r'(?P<sql>SELECT|INSERT|UPDATE|DELETE.*?)\n(?=# Time:|$)',
            re.DOTALL | re.IGNORECASE
        )

        with open(log_file, 'r') as f:  # with自动管理资源,确保文件正确关闭
            content = f.read()

        for match in pattern.finditer(content):
            self.queries.append({
                'time': match.group('time'),
                'query_time': float(match.group('query_time')),
                'sql': match.group('sql').strip()
            })

        return self.analyze()

    def analyze(self):
        """分析慢查询"""
        if not self.queries:
            return "没有找到慢查询"

        # 按SQL指纹分组
        fingerprint_stats = defaultdict(lambda: {  # lambda匿名函数:简洁的单行函数
            'count': 0,
            'total_time': 0,
            'max_time': 0,
            'samples': []
        })

        for q in self.queries:
            # 生成SQL指纹(去除具体值)
            fingerprint = self._fingerprint(q['sql'])

            stats = fingerprint_stats[fingerprint]
            stats['count'] += 1
            stats['total_time'] += q['query_time']
            stats['max_time'] = max(stats['max_time'], q['query_time'])
            if len(stats['samples']) < 3:
                stats['samples'].append(q['sql'])

        # 按总耗时排序
        sorted_queries = sorted(
            fingerprint_stats.items(),
            key=lambda x: x[1]['total_time'],
            reverse=True
        )

        # 生成报告
        report = ["=== 慢查询分析报告 ===\n"]
        for i, (fingerprint, stats) in enumerate(sorted_queries[:10], 1):  # enumerate同时获取索引和值
            avg_time = stats['total_time'] / stats['count']
            report.append(
                f"\nTop {i}:"
                f"\n  SQL指纹: {fingerprint[:100]}..."
                f"\n  执行次数: {stats['count']}"
                f"\n  总耗时: {stats['total_time']:.2f}s"
                f"\n  平均耗时: {avg_time:.2f}s"
                f"\n  最大耗时: {stats['max_time']:.2f}s"
                f"\n  示例SQL: {stats['samples'][0][:100]}..."
            )

        return '\n'.join(report)

    def _fingerprint(self, sql: str) -> str:
        """生成SQL指纹"""
        # 移除注释
        sql = re.sub(r'/\*.*?\*/', '', sql)
        sql = re.sub(r'--.*?\n', '', sql)

        # 替换字符串和数字
        sql = re.sub(r"'[^']*'", '?', sql)
        sql = re.sub(r'\b\d+\b', '?', sql)

        # 规范化空白
        sql = ' '.join(sql.split())

        return sql.lower()

# 使用示例
analyzer = SlowQueryAnalyzer()
report = analyzer.parse_mysql_slow_log('/var/log/mysql/slow.log')
print(report)

8.2 实时性能监控

Python
# 实时性能监控
# 需要安装: pip install psutil
import time
import psutil
from dataclasses import dataclass
from typing import List
import threading  # 线程池/多线程:并发执行任务
from sqlalchemy import text

@dataclass  # 自动生成__init__等方法
class DBMetrics:
    """数据库指标"""
    timestamp: float
    connections_active: int
    connections_total: int
    queries_per_second: float
    slow_queries: int
    cache_hit_ratio: float
    disk_io_read: float
    disk_io_write: float

class DBPerformanceMonitor:
    """数据库性能监控器"""

    def __init__(self, engine, interval: int = 5):
        self.engine = engine
        self.interval = interval
        self.metrics_history: List[DBMetrics] = []
        self._running = False
        self._thread = None

    def start(self):
        """启动监控"""
        self._running = True
        self._thread = threading.Thread(target=self._monitor_loop)
        self._thread.daemon = True
        self._thread.start()
        print("性能监控已启动")

    def stop(self):
        """停止监控"""
        self._running = False
        if self._thread:
            self._thread.join()
        print("性能监控已停止")

    def _monitor_loop(self):
        """监控循环"""
        last_queries = 0

        while self._running:
            try:  # try/except捕获异常
                with self.engine.connect() as conn:
                    # 获取MySQL状态
                    result = conn.execute(text("SHOW GLOBAL STATUS"))
                    status = {row[0]: row[1] for row in result}

                    # 计算QPS
                    current_queries = int(status.get('Queries', 0))
                    qps = (current_queries - last_queries) / self.interval
                    last_queries = current_queries

                    # 获取连接数
                    conn_result = conn.execute(text("SHOW PROCESSLIST"))
                    active_connections = conn_result.rowcount

                    # 计算缓冲池命中率(MySQL 8.0+ 使用InnoDB缓冲池指标)
                    buffer_reads = int(status.get('Innodb_buffer_pool_reads', 0))
                    buffer_requests = int(status.get('Innodb_buffer_pool_read_requests', 1))
                    cache_ratio = 1 - (buffer_reads / buffer_requests) if buffer_requests > 0 else 0

                    metrics = DBMetrics(
                        timestamp=time.time(),
                        connections_active=active_connections,
                        connections_total=int(status.get('Threads_connected', 0)),
                        queries_per_second=qps,
                        slow_queries=int(status.get('Slow_queries', 0)),
                        cache_hit_ratio=cache_ratio,
                        disk_io_read=psutil.disk_io_counters().read_bytes,
                        disk_io_write=psutil.disk_io_counters().write_bytes
                    )

                    self.metrics_history.append(metrics)

                    # 保持最近100个数据点
                    if len(self.metrics_history) > 100:
                        self.metrics_history.pop(0)

                    # 告警检查
                    self._check_alerts(metrics)

            except Exception as e:
                print(f"监控异常: {e}")

            time.sleep(self.interval)

    def _check_alerts(self, metrics: DBMetrics):
        """检查告警条件"""
        alerts = []

        if metrics.connections_active > 400:
            alerts.append(f"⚠️ 连接数过高: {metrics.connections_active}")

        if metrics.queries_per_second > 10000:
            alerts.append(f"⚠️ QPS过高: {metrics.queries_per_second:.0f}")

        if metrics.cache_hit_ratio < 0.8:
            alerts.append(f"⚠️ 缓存命中率低: {metrics.cache_hit_ratio:.1%}")

        for alert in alerts:
            print(alert)

    def get_summary(self) -> str:
        """获取监控摘要"""
        if not self.metrics_history:
            return "暂无监控数据"

        recent = self.metrics_history[-10:]
        avg_qps = sum(m.queries_per_second for m in recent) / len(recent)
        max_connections = max(m.connections_active for m in recent)

        return f"""
=== 性能监控摘要 ===
最近 {len(recent)} 次采样:
- 平均QPS: {avg_qps:.1f}
- 最大连接数: {max_connections}
- 当前缓存命中率: {recent[-1].cache_hit_ratio:.1%}
- 慢查询总数: {recent[-1].slow_queries}
        """

9. 本章自测

练习1:EXPLAIN分析

分析以下查询,指出性能问题并优化:

SQL
SELECT * FROM orders o
JOIN users u ON o.user_id = u.user_id  -- JOIN连接多个表
WHERE YEAR(o.created_at) = 2024
  AND u.email LIKE '%@gmail.com'
ORDER BY o.total_amount DESC
LIMIT 100;

练习2:索引设计

为以下AI特征存储表设计最优索引:

SQL
CREATE TABLE feature_store (
    entity_id VARCHAR(64) PRIMARY KEY,
    feature_vector JSON,
    feature_version VARCHAR(20),
    created_at TIMESTAMP,
    updated_at TIMESTAMP
);

查询场景: 1. 根据entity_id精确查询 2. 查询最近7天的特征数据 3. 根据feature_version查询

练习3:连接池配置

根据以下场景配置合适的连接池参数: - 数据库:PostgreSQL - 应用服务器:4台 - 每台服务器并发:100 - 数据库最大连接数:500

练习4:缓存策略

设计一个模型预测结果的缓存方案: - 预测结果需要缓存 - 模型版本更新时缓存失效 - 支持高并发读取


10. 本章小结

核心知识点

  1. SQL优化:使用EXPLAIN分析查询,避免全表扫描,优化分页查询
  2. 索引策略:最左前缀原则,复合索引设计,定期维护
  3. 配置调优:内存参数、连接数、日志配置
  4. 连接池:合理设置大小和超时参数,监控使用率
  5. 缓存策略:多级缓存,合理的失效策略
  6. AI场景:批量插入优化,特征存储设计,异步处理

优化检查清单

Markdown
□ 所有查询都使用EXPLAIN验证执行计划
□ WHERE条件有合适的索引支持
□ 避免SELECT *,只查询需要的列
□ 分页查询使用优化方案
□ 连接池配置合理,监控到位
□ 热点数据有缓存策略
□ 慢查询日志已开启并定期分析
□ 数据库配置参数已根据硬件调整

下一步

完成本章学习后,继续学习 第08章:事务与并发控制,了解数据库的ACID特性、隔离级别和并发控制机制。


参考资源: - MySQL性能优化指南 - PostgreSQL性能调优 - High Performance MySQL (书籍)