07 - Database Optimization and Tuning
📋 Chapter Overview
Database optimization is a key skill for improving application performance. This chapter works systematically through optimization methods, from SQL statements down to system configuration, with special attention to the large-data-volume, high-concurrency query demands of AI/ML workloads.
Learning objectives:
- Master SQL query optimization techniques
- Understand indexing principles and optimization strategies
- Learn to use performance analysis tools
- Master database configuration tuning
- Understand the optimization needs specific to AI workloads
Estimated study time: 6-8 hours
Prerequisite chapter: Chapter 06: NoSQL Databases
1. The Big Picture of Performance Optimization
1.1 Where Performance Bottlenecks Typically Live
┌─────────────┬────────────────────────────────────────────────┐
│        Where database performance bottlenecks live           │
├─────────────┼────────────────────────────────────────────────┤
│ Application │ connection pool config, ORM usage,            │
│ layer       │ transaction management                        │
├─────────────┼────────────────────────────────────────────────┤
│ SQL layer   │ query statements, index usage, lock contention│
├─────────────┼────────────────────────────────────────────────┤
│ Storage     │ disk I/O, memory buffering, log writes        │
├─────────────┼────────────────────────────────────────────────┤
│ Hardware    │ CPU, memory, disk, network                    │
└─────────────┴────────────────────────────────────────────────┘
1.2 The Optimization Workflow
# The standard database optimization workflow
class DatabaseOptimization:
    """
    Database optimization workflow
    """
    def optimization_workflow(self):
        steps = [
            "1. Monitor & diagnose: collect performance metrics, locate the bottleneck",
            "2. Analyze the cause: use EXPLAIN, the slow query log, and similar tools",
            "3. Plan the fix: index optimization, SQL rewrites, configuration changes",
            "4. Apply the fix: roll out optimizations incrementally",
            "5. Verify the effect: compare performance metrics before and after",
            "6. Keep monitoring: establish long-term monitoring"
        ]
        return steps
2. SQL Query Optimization
2.1 Analyzing Queries with EXPLAIN
-- Basic EXPLAIN usage
EXPLAIN SELECT * FROM users WHERE age > 25;  -- EXPLAIN shows the query execution plan
-- Detailed analysis (MySQL 8.0+)
EXPLAIN ANALYZE SELECT * FROM users WHERE age > 25;
-- Formatted output (PostgreSQL)
EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON)
SELECT * FROM users WHERE age > 25;
Key fields in EXPLAIN output:
| Field | Meaning | Recommendation |
|---|---|---|
| type | access type | aim for range or better; avoid ALL (full table scan) |
| key | index used | check that the expected index was chosen |
| rows | rows examined | the smaller, the better |
| Extra | extra info | watch for Using filesort and Using temporary |
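If you want plan checks to be part of your routine, you can also run EXPLAIN from application code. Below is a minimal sketch, assuming a reachable MySQL instance via SQLAlchemy; the DSN, table, and column names are illustrative:
# Run EXPLAIN from Python and flag full table scans (illustrative DSN/table).
from sqlalchemy import create_engine, text
engine = create_engine("mysql+pymysql://user:pass@localhost/dbname")
def explain(sql: str, params: dict | None = None):
    """Print the execution plan and warn when MySQL reports a full table scan."""
    with engine.connect() as conn:
        rows = conn.execute(text("EXPLAIN " + sql), params or {}).mappings().all()
    for row in rows:
        print(dict(row))
        if row.get("type") == "ALL":
            print(f"⚠️ full table scan on {row.get('table')} -- consider adding an index")
explain("SELECT user_id, username FROM users WHERE age > :age", {"age": 25})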
2.2 Common Inefficient SQL and Fixes
-- ❌ Inefficient: SELECT *
SELECT * FROM users WHERE age > 25;
-- ✅ Better: select only the columns you need
SELECT user_id, username, email FROM users WHERE age > 25;
-- ❌ Inefficient: applying a function to an indexed column
SELECT * FROM users WHERE YEAR(created_at) = 2024;
-- ✅ Better: avoid the function so the index stays usable
SELECT * FROM users
WHERE created_at >= '2024-01-01'
  AND created_at < '2025-01-01';
-- ❌ Inefficient: implicit type conversion
SELECT * FROM users WHERE phone = 13800138000;
-- ✅ Better: keep the types consistent
SELECT * FROM users WHERE phone = '13800138000';
-- ❌ Inefficient: OR conditions can defeat index usage
SELECT * FROM users
WHERE age = 25 OR email = 'test@example.com';
-- ✅ Better: use UNION
-- (note: UNION ALL keeps duplicates; if a row can match both branches,
--  use UNION for exact OR semantics at the cost of a dedup step)
SELECT * FROM users WHERE age = 25
UNION ALL
SELECT * FROM users WHERE email = 'test@example.com';
-- ❌ Inefficient: NOT IN subquery
-- (NOT IN also returns no rows at all if the subquery yields a NULL)
SELECT * FROM orders
WHERE user_id NOT IN (SELECT user_id FROM blacklist);  -- subquery: a query nested inside another
-- ✅ Better: use LEFT JOIN
SELECT o.* FROM orders o
LEFT JOIN blacklist b ON o.user_id = b.user_id
WHERE b.user_id IS NULL;
2.3 Paginated Query Optimization
-- ❌ Inefficient: deep pagination
SELECT * FROM users
ORDER BY created_at DESC
LIMIT 1000000, 10;
-- ✅ Option 1: deferred join via a covering-index subquery
SELECT * FROM users u
JOIN (
    SELECT user_id FROM users
    ORDER BY created_at DESC
    LIMIT 1000000, 10
) tmp ON u.user_id = tmp.user_id;
-- ✅ Option 2: cursor / keyset (bookmark) pagination
SELECT * FROM users
WHERE created_at < '2024-01-01 12:00:00'
ORDER BY created_at DESC
LIMIT 10;
-- ✅ Option 3: the same deferred-join pattern as option 1, common in AI workloads
SELECT * FROM model_predictions p
JOIN (
    SELECT prediction_id
    FROM model_predictions
    WHERE model_version = 'v2.0'
    ORDER BY confidence DESC
    LIMIT 1000
) tmp ON p.prediction_id = tmp.prediction_id;
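Option 2 is easy to wrap as a reusable helper. A minimal keyset-pagination sketch, assuming SQLAlchemy and a users table with a created_at column (DSN and names are illustrative). For a stable order in production, add a unique tiebreaker column such as user_id to the ORDER BY, or rows with equal timestamps may be skipped:
from sqlalchemy import create_engine, text
engine = create_engine("postgresql+psycopg2://user:pass@localhost/dbname")
def fetch_page(last_created_at=None, page_size=10):
    """Fetch one page; pass the last row's created_at as the cursor for the next page."""
    sql = "SELECT user_id, username, created_at FROM users"
    params = {"n": page_size}
    if last_created_at is not None:
        sql += " WHERE created_at < :cursor"
        params["cursor"] = last_created_at
    sql += " ORDER BY created_at DESC LIMIT :n"
    with engine.connect() as conn:
        return conn.execute(text(sql), params).mappings().all()
page1 = fetch_page()
page2 = fetch_page(last_created_at=page1[-1]["created_at"]) if page1 else []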
3. Index Optimization Strategies
3.1 Index Design Principles
-- 1. Index the columns used in WHERE, JOIN, ORDER BY, and GROUP BY
CREATE INDEX idx_user_age ON users(age);
CREATE INDEX idx_order_user ON orders(user_id);
CREATE INDEX idx_product_category ON products(category_id, created_at);
-- 2. Composite indexes follow the leftmost-prefix rule
CREATE INDEX idx_name_age_city ON users(name, age, city);
-- Works:           WHERE name = 'Alice'
-- Works:           WHERE name = 'Alice' AND age = 25
-- Works:           WHERE name = 'Alice' AND age = 25 AND city = 'Beijing'
-- Does not work:   WHERE age = 25 (the leftmost column is missing)
-- Partially works: WHERE name = 'Alice' AND city = 'Beijing' (age is skipped)
-- 3. Put high-selectivity columns first
-- gender has low selectivity (few distinct values); username has high selectivity
CREATE INDEX idx_user_name_gender ON users(username, gender);
-- 4. Avoid redundant indexes
CREATE INDEX idx_a ON my_table(col1, col2);
CREATE INDEX idx_b ON my_table(col1);  -- redundant: covered by the (col1, col2) prefix of idx_a
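Principle 3 can be applied empirically by measuring selectivity before choosing column order. A small sketch, assuming MySQL via SQLAlchemy; the DSN, table, and column names are illustrative:
from sqlalchemy import create_engine, text
engine = create_engine("mysql+pymysql://user:pass@localhost/dbname")
def selectivity(table: str, column: str) -> float:
    """Return COUNT(DISTINCT col) / COUNT(*); closer to 1.0 means more selective."""
    # Table/column names cannot be bound parameters -- validate them against
    # a whitelist before interpolating in real code.
    sql = f"SELECT COUNT(DISTINCT {column}) / COUNT(*) FROM {table}"
    with engine.connect() as conn:
        return float(conn.execute(text(sql)).scalar() or 0.0)
for col in ("username", "gender", "age"):
    print(col, round(selectivity("users", col), 4))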
3.2 Index Maintenance
-- MySQL index maintenance
-- Inspect index cardinality and selectivity
SELECT
    TABLE_NAME,
    INDEX_NAME,
    CARDINALITY,
    TABLE_ROWS,
    CARDINALITY / TABLE_ROWS AS selectivity
FROM information_schema.STATISTICS
WHERE TABLE_SCHEMA = 'your_database';
-- Analyze a table (refresh its statistics)
ANALYZE TABLE users;
-- Optimize a table (reclaim space)
OPTIMIZE TABLE users;
-- Find unused indexes (requires performance_schema to be enabled)
SELECT
    OBJECT_SCHEMA,
    OBJECT_NAME,
    INDEX_NAME,
    COUNT_FETCH,
    COUNT_INSERT,
    COUNT_UPDATE,
    COUNT_DELETE
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE INDEX_NAME IS NOT NULL
  AND COUNT_FETCH = 0
ORDER BY OBJECT_SCHEMA, OBJECT_NAME;
-- PostgreSQL index maintenance
-- Inspect index sizes and usage
SELECT
    schemaname,
    tablename,
    indexname,
    pg_size_pretty(pg_relation_size(indexrelid)) as index_size,
    idx_scan,
    idx_tup_read,
    idx_tup_fetch
FROM pg_stat_user_indexes
ORDER BY pg_relation_size(indexrelid) DESC;
-- Rebuild an index
REINDEX INDEX idx_users_email;
-- Vacuum and re-analyze a table and its indexes
VACUUM ANALYZE users;
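The unused-index query above is easy to wrap in a review script. A sketch that only prints DROP INDEX candidates for a human to review, assuming MySQL with performance_schema enabled (the DSN is illustrative; never drop automatically):
from sqlalchemy import create_engine, text
engine = create_engine("mysql+pymysql://user:pass@localhost/dbname")
UNUSED_INDEXES_SQL = """
SELECT OBJECT_SCHEMA, OBJECT_NAME, INDEX_NAME
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE INDEX_NAME IS NOT NULL
  AND INDEX_NAME != 'PRIMARY'
  AND COUNT_FETCH = 0
"""
with engine.connect() as conn:
    for schema, table_name, index_name in conn.execute(text(UNUSED_INDEXES_SQL)):
        # Counters reset on server restart: an index may look unused simply
        # because the server restarted recently or the query only runs monthly.
        print(f"-- candidate: DROP INDEX `{index_name}` ON `{schema}`.`{table_name}`;")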
3.3 Index Strategies Specific to AI Workloads
-- 1. Time-series data (model training logs)
CREATE INDEX idx_training_logs_time
ON training_logs(model_id, created_at DESC);
-- 2. High-dimensional embedding vectors (with the pgvector extension)
-- Install with: CREATE EXTENSION vector;
CREATE INDEX idx_embeddings_vector
ON embeddings USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
-- 3. JSONB index (flexible feature storage)
CREATE INDEX idx_model_config_params
ON model_configs USING GIN (hyperparameters);
-- 4. Partial index (index only the active rows)
CREATE INDEX idx_active_experiments
ON experiments(created_at)
WHERE status = 'running';
-- 5. Expression index (precompute a frequently queried expression)
CREATE INDEX idx_predictions_confidence_rounded
ON predictions(ROUND(confidence::numeric, 2));
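For context, here is the kind of query the ivfflat index above accelerates: approximate nearest-neighbor search by cosine distance. A minimal sketch, assuming PostgreSQL with pgvector and the embeddings table from the example; the DSN and column names are illustrative:
from sqlalchemy import create_engine, text
engine = create_engine("postgresql+psycopg2://user:pass@localhost/dbname")
def nearest_embeddings(query_vector: list[float], k: int = 5):
    """Return the k rows whose embedding is closest to query_vector (<=> is cosine distance)."""
    sql = text("""
        SELECT id, embedding <=> CAST(:q AS vector) AS distance
        FROM embeddings
        ORDER BY distance
        LIMIT :k
    """)
    # pgvector accepts the '[x,y,...]' text form for vector input
    q = "[" + ",".join(str(x) for x in query_vector) + "]"
    with engine.connect() as conn:
        return conn.execute(sql, {"q": q, "k": k}).all()
At query time, the session-level ivfflat.probes setting trades recall for speed; the default favors speed.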
4. Database Configuration Tuning
4.1 Key MySQL Configuration Parameters
# my.cnf key settings
[mysqld]
# ===== Memory =====
# InnoDB buffer pool size (recommended: 50-75% of physical RAM)
innodb_buffer_pool_size = 4G
# Per-connection buffers
sort_buffer_size = 2M
read_buffer_size = 2M
read_rnd_buffer_size = 4M
join_buffer_size = 4M
# ===== InnoDB =====
# Redo log size (affects recovery time and write throughput)
innodb_log_file_size = 512M
innodb_log_buffer_size = 64M
# Flush policy (trades durability for performance)
# 0: flush once per second -- fastest, may lose up to 1s of data
# 1: flush on every commit -- safest, slowest
# 2: write to the OS cache on commit, flush once per second -- a compromise
innodb_flush_log_at_trx_commit = 2
# Flush method
innodb_flush_method = O_DIRECT
# I/O threads
innodb_read_io_threads = 8
innodb_write_io_threads = 8
# ===== Connections =====
max_connections = 500
max_user_connections = 450
wait_timeout = 600
interactive_timeout = 600
# ===== Query cache (⚠️ removed entirely in MySQL 8.0+) =====
#
# 🔴 Important: the settings below apply to MySQL 5.7 and earlier ONLY!
#
# MySQL 8.0+ removed the query cache altogether, because:
# - it became a bottleneck under high concurrency (global lock contention)
# - hit rates were typically low
# - every data modification had to invalidate related entries, which is expensive
#
# On MySQL 8.0+, these parameters will prevent the server from starting!
# Skip this block if you run MySQL 8.0+.
#
# === MySQL 5.7-and-earlier settings ===
# query_cache_type = 1
# query_cache_size = 256M
# query_cache_limit = 2M
# === end of MySQL 5.7 settings ===
#
# 📌 Alternatives on MySQL 8.0+:
# 1. Application-level caching: Redis/Memcached for hot query results
# 2. Buffer pool tuning: raise innodb_buffer_pool_size (configured above)
# 3. Query optimization: better indexes and SQL rewrites
# 4. Read replicas: route reads to replicas to offload the primary
# ===== Logging =====
slow_query_log = ON
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 1  # log queries slower than 1 second
log_queries_not_using_indexes = ON
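To turn the 50-75% buffer pool rule of thumb into a concrete value, here is a tiny helper sketch; the percentage is this chapter's guidance, not a MySQL default:
# Requires: pip install psutil
import psutil
def suggest_innodb_buffer_pool(fraction: float = 0.6) -> str:
    """Suggest innodb_buffer_pool_size as a fraction (50-75%) of physical RAM."""
    total_gb = psutil.virtual_memory().total / (1024 ** 3)
    suggested = max(1, int(total_gb * fraction))
    return f"innodb_buffer_pool_size = {suggested}G"
print(suggest_innodb_buffer_pool())  # e.g. 4G on an 8 GB dedicated database host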
4.2 Key PostgreSQL Configuration Parameters
# postgresql.conf key settings
# ===== Memory =====
# Shared buffers (recommended: 25-40% of physical RAM)
shared_buffers = 2GB
# Effective cache size (OS page cache + shared_buffers)
effective_cache_size = 6GB
# Working memory (per sort/hash operation)
work_mem = 64MB
# Maintenance working memory (VACUUM, CREATE INDEX, ...)
maintenance_work_mem = 512MB
# ===== WAL =====
# WAL buffers
wal_buffers = 16MB
# Checkpoint spacing
checkpoint_timeout = 10min
max_wal_size = 2GB
min_wal_size = 512MB
# Checkpoint completion target (smooths out I/O)
checkpoint_completion_target = 0.9
# ===== Concurrency =====
max_connections = 200
max_worker_processes = 8
max_parallel_workers_per_gather = 4
max_parallel_workers = 8
# ===== Query planner =====
random_page_cost = 1.1            # 1.1 for SSDs; keep 4.0 for HDDs
effective_io_concurrency = 200    # SSDs can go higher
# ===== Logging =====
logging_collector = on
log_directory = 'log'
log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log'
log_min_duration_statement = 1000  # log queries slower than 1 second (value in ms)
log_line_prefix = '%t [%p]: [%l-1] user=%u,db=%d,app=%a,client=%h '
log_checkpoints = on
log_connections = on
log_disconnections = on
log_lock_waits = on
5. Connection Pool Tuning
Industry practice: HikariCP is the best-performing connection pool in the Java ecosystem, and Spring Boot 2.0+ ships it as the default. Its core advantages:
- faster connection acquisition (a lock-free, CAS-based design)
- a smaller memory footprint
- more accurate connection timeout detection
- better behavior under concurrency
5.1 How a Connection Pool Works
┌─────────────────────────────────────────────────────────────┐
│                 How a connection pool works                  │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  app requests connection ──→ pool ──→ idle conn? ──→ return  │
│        ↑                       └─→ none idle ──→ open new    │
│        │                             └─→ at max? ──→ wait    │
│        │                                                     │
│  app releases connection ──────┘                             │
│                                                              │
└─────────────────────────────────────────────────────────────┘
Key pool parameters (a sizing sketch follows this list):
- pool_size: number of resident connections; a common starting point is (CPU cores × 2) + effective disk count
- max_overflow: extra connections opened on demand; typically 50-100% of pool_size
- pool_timeout: how long to wait for a connection; typically 10-30 seconds
- pool_recycle: recycle connections before the database-side timeout; 1 hour is common
- pool_pre_ping: test connections before use to avoid handing out dead ones
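A minimal sketch of that sizing rule; the formula is this chapter's rule of thumb, so treat the output purely as a starting point to validate under load:
import os
def suggest_pool_size(effective_disks: int = 1) -> dict:
    """Apply the (cores * 2) + disks rule of thumb quoted above."""
    cpu_cores = os.cpu_count() or 4
    pool_size = cpu_cores * 2 + effective_disks
    return {
        "pool_size": pool_size,
        "max_overflow": pool_size,   # 50-100% of pool_size; this uses 100%
        "pool_timeout": 30,          # seconds
        "pool_recycle": 3600,        # seconds, kept under the DB-side timeout
    }
print(suggest_pool_size())  # e.g. {'pool_size': 17, ...} on an 8-core host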
5.2 Connection Pool Configuration in Python
# SQLAlchemy connection pool configuration
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool
# Recommended production settings
engine = create_engine(
    'mysql+pymysql://user:pass@localhost/dbname',
    poolclass=QueuePool,
    pool_size=10,          # resident connections
    max_overflow=20,       # extra connections on demand
    pool_timeout=30,       # seconds to wait for a connection
    pool_recycle=3600,     # recycle before the server-side timeout
    pool_pre_ping=True,    # ping before handing out a connection
    echo=False             # disable SQL echo in production
)
SessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine
)
# PostgreSQL pool (psycopg2)
engine_pg = create_engine(
    'postgresql+psycopg2://user:pass@localhost/dbname',
    pool_size=20,
    max_overflow=30,
    pool_timeout=30,
    pool_recycle=3600,
    pool_pre_ping=True,
    connect_args={
        'connect_timeout': 10,
        'options': '-c statement_timeout=30000'  # 30-second statement timeout
    }
)
# Async pool (asyncpg)
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
async_engine = create_async_engine(
    'postgresql+asyncpg://user:pass@localhost/dbname',
    pool_size=20,
    max_overflow=30,
    pool_timeout=30,
    pool_recycle=3600,
    echo=False
)
AsyncSessionLocal = sessionmaker(
    async_engine,
    class_=AsyncSession,
    expire_on_commit=False
)
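A short usage sketch for the async engine above (the query is illustrative); the connection returns to the pool when the async with block exits:
import asyncio
from sqlalchemy import text
async def count_users() -> int:
    async with async_engine.connect() as conn:
        result = await conn.execute(text("SELECT COUNT(*) FROM users"))
        return result.scalar_one()
print(asyncio.run(count_users()))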
5.3 Connection Pool Monitoring
# A connection pool monitoring helper
class ConnectionPoolMonitor:
    """Connection pool monitor"""
    def __init__(self, engine):
        self.engine = engine
        self.pool = engine.pool
    def get_pool_stats(self):
        """Collect pool statistics"""
        return {
            'size': self.pool.size(),              # configured pool_size
            'checked_in': self.pool.checkedin(),   # idle connections
            'checked_out': self.pool.checkedout(), # connections in use
            'overflow': self.pool.overflow(),      # connections beyond pool_size (negative until the pool fills)
        }
    def log_pool_status(self):
        """Log pool status and warn on high utilization"""
        stats = self.get_pool_stats()
        print(f"Pool status: {stats}")
        # Alert on high utilization (checked_out can exceed size() while overflow is in use)
        usage_rate = stats['checked_out'] / (stats['size'] or 1)
        if usage_rate > 0.8:
            print(f"⚠️ Warning: pool utilization at {usage_rate:.1%}; consider raising pool_size")
        return stats
# Usage
monitor = ConnectionPoolMonitor(engine)
monitor.log_pool_status()
6. Caching Strategies
6.1 Multi-Level Cache Architecture
┌─────────────────────────────────────────────────────────────┐
│               Multi-level cache architecture                 │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  Application layer                                           │
│  ┌─────────────┐   ┌─────────────┐   ┌─────────────┐         │
│  │ local cache │   │ distributed │   │  database   │         │
│  │    (LRU)    │   │   (Redis)   │   │   (query)   │         │
│  │    < 1ms    │   │    ~ 5ms    │   │   ~ 50ms    │         │
│  └─────────────┘   └─────────────┘   └─────────────┘         │
│         ↑                 ↑                 ↑                │
│         └─────────────────┴─────────────────┘                │
│               fall back to the source on a miss              │
│                                                              │
└─────────────────────────────────────────────────────────────┘
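A minimal two-level read-through sketch matching the diagram, with an in-process LRU in front of Redis. It assumes a local Redis; the loader lambda stands in for the real database query:
import json
from functools import lru_cache
import redis
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
def read_through(key: str, loader, ttl: int = 300) -> str:
    """L2 (Redis): return the cached JSON string, or load, cache, and return it."""
    cached = r.get(key)
    if cached is not None:
        return cached
    value = json.dumps(loader(), default=str)
    r.setex(key, ttl, value)
    return value
@lru_cache(maxsize=1024)  # L1: per-process LRU (< 1ms); note it has no TTL of its own
def get_user_json(user_id: int) -> str:
    # caching the JSON string keeps the cached value immutable
    return read_through(f"user:{user_id}", lambda: {"user_id": user_id, "name": "demo"})
print(json.loads(get_user_json(42)))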
6.2 A Cache Implementation
import functools
import hashlib
import json
from typing import Optional, Any
import redis
class DatabaseCache:
    """Query-cache decorator backed by Redis"""
    def __init__(self, redis_client: redis.Redis,
                 default_ttl: int = 300,
                 key_prefix: str = "db_cache"):
        self.redis = redis_client
        self.default_ttl = default_ttl
        self.key_prefix = key_prefix
    def _generate_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
        """Build a cache key from the function name and arguments.
        Caveat: for instance methods, args[0] is self, whose default repr()
        embeds the object id -- give the class a stable __repr__ (or skip self)
        or keys will never match across instances."""
        cache_key = f"{func_name}:{str(args)}:{str(kwargs)}"
        hash_key = hashlib.md5(cache_key.encode()).hexdigest()
        return f"{self.key_prefix}:{hash_key}"
    def cache_query(self, ttl: Optional[int] = None):
        """Decorator that caches the wrapped query's result"""
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):  # *args: any positional args; **kwargs: any keyword args
                cache_key = self._generate_key(func.__name__, args, kwargs)
                # Try the cache first
                cached = self.redis.get(cache_key)
                if cached:
                    return json.loads(cached)
                # Run the query
                result = func(*args, **kwargs)
                # Populate the cache
                self.redis.setex(
                    cache_key,
                    ttl or self.default_ttl,
                    json.dumps(result, default=str)
                )
                return result
            return wrapper
        return decorator
    def invalidate_cache(self, pattern: str = "*"):
        """Invalidate matching entries (SCAN instead of KEYS, which blocks Redis)"""
        keys = list(self.redis.scan_iter(f"{self.key_prefix}:{pattern}"))
        if keys:
            self.redis.delete(*keys)
# Usage (db below is assumed to be an existing database handle)
redis_client = redis.Redis(host='localhost', port=6379, db=0)
cache = DatabaseCache(redis_client, default_ttl=600)
class UserService:
    @cache.cache_query(ttl=300)
    def get_user_stats(self, user_id: int):
        """User statistics (cached for 5 minutes)"""
        # An expensive query
        result = db.execute("""
            SELECT u.*, COUNT(o.order_id) as order_count
            FROM users u
            LEFT JOIN orders o ON u.user_id = o.user_id
            WHERE u.user_id = %s
            GROUP BY u.user_id
        """, (user_id,))
        return result.fetchone()
    @cache.cache_query(ttl=60)
    def get_model_predictions(self, model_id: str, limit: int = 100):
        """Latest model predictions (cached for 1 minute)"""
        result = db.execute("""
            SELECT * FROM predictions
            WHERE model_id = %s
            ORDER BY created_at DESC
            LIMIT %s
        """, (model_id, limit))
        return result.fetchall()
6.3 Cache Update Strategies
class CacheStrategy:
    """Cache strategy manager"""
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
    def cache_aside(self, key: str, db_query_func, ttl: int = 300):
        """Cache-Aside (lazy loading)"""
        # 1. Check the cache first
        value = self.redis.get(key)
        if value:
            return json.loads(value)  # json.loads: JSON string -> Python object
        # 2. Cache miss: query the database
        value = db_query_func()
        # 3. Populate the cache
        if value:
            self.redis.setex(key, ttl, json.dumps(value, default=str))  # json.dumps: Python object -> JSON string
        return value
    def write_through(self, key: str, value: Any,
                      db_write_func, ttl: int = 300):
        """Write-Through"""
        # 1. Write the database first
        db_write_func(value)
        # 2. Write the cache synchronously
        self.redis.setex(key, ttl, json.dumps(value, default=str))
    def write_behind(self, key: str, value: Any, ttl: int = 300):
        """Write-Behind (write-back)"""
        # 1. Write the cache first
        self.redis.setex(key, ttl, json.dumps(value, default=str))
        # 2. Write the database asynchronously, e.g. via a message queue
        #    (a worker sketch follows this class)
        # message_queue.publish('db_write_queue', {'key': key, 'value': value})
    def invalidate_on_write(self, key_pattern: str):
        """Invalidate-on-write"""
        # When data changes, delete the related cache entries
        keys = list(self.redis.scan_iter(key_pattern))  # SCAN, not KEYS, to avoid blocking Redis
        if keys:
            self.redis.delete(*keys)
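The write-behind strategy defers the database write to a queue. Below is a minimal in-process sketch using a background thread; a real deployment would use a durable queue (Kafka, RabbitMQ, ...) so buffered writes survive a crash:
import queue
import threading
write_queue: "queue.Queue[dict]" = queue.Queue()
def db_write_worker(db_write_func):
    """Drain the queue in the background and flush each item to the database."""
    while True:
        item = write_queue.get()
        try:
            db_write_func(item)  # the actual database write
        finally:
            write_queue.task_done()
threading.Thread(
    target=db_write_worker,
    args=(lambda item: print("flushed to DB:", item),),  # stand-in writer
    daemon=True,
).start()
write_queue.put({"key": "user:1", "value": {"name": "demo"}})
write_queue.join()  # block until buffered writes are flushed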
7. Optimizations Specific to AI Workloads
7.1 Bulk Insert Optimization
# Optimized bulk insert (e.g. model training data)
from sqlalchemy import insert
import time
def batch_insert_optimized(session, table, data_list, batch_size=10000):
    """
    Optimized bulk insert
    Args:
        session: database session
        table: table object
        data_list: list of row dicts
        batch_size: rows per batch
    """
    total = len(data_list)
    inserted = 0
    start_time = time.time()
    try:
        for i in range(0, total, batch_size):
            batch = data_list[i:i + batch_size]
            # executemany-style bulk insert
            session.execute(
                insert(table),
                batch
            )
            # Commit every 10 batches
            if (i // batch_size + 1) % 10 == 0:
                session.commit()
            inserted += len(batch)
            if i % (batch_size * 10) == 0:
                elapsed = time.time() - start_time
                rate = inserted / elapsed if elapsed > 0 else 0
                print(f"Inserted: {inserted}/{total}, rate: {rate:.0f} rows/s")
        # Final commit
        session.commit()
    except Exception as e:
        session.rollback()
        raise e
    elapsed = time.time() - start_time
    print(f"Done: {inserted} rows in {elapsed:.2f}s, "
          f"avg {inserted/elapsed:.0f} rows/s")
# MySQL-specific: LOAD DATA for very large imports
def bulk_load_mysql(connection, table_name, csv_file):
    """Very large imports via LOAD DATA.
    Security note: table_name and csv_file must come from trusted internal
    configuration, never from user input, to prevent SQL injection.
    DDL statements (ALTER TABLE, ...) cannot use bound parameters, so in
    production validate the table name against a whitelist.
    """
    cursor = connection.cursor()
    # Temporarily relax indexes and checks
    # (DISABLE KEYS only affects non-unique indexes on MyISAM; InnoDB ignores it)
    cursor.execute(f"ALTER TABLE {table_name} DISABLE KEYS")
    cursor.execute("SET FOREIGN_KEY_CHECKS = 0")
    cursor.execute("SET UNIQUE_CHECKS = 0")
    cursor.execute("SET AUTOCOMMIT = 0")
    # Run LOAD DATA ('\\n' so the SQL text contains the literal \n escape)
    cursor.execute(f"""
        LOAD DATA LOCAL INFILE '{csv_file}'
        INTO TABLE {table_name}
        FIELDS TERMINATED BY ','
        ENCLOSED BY '"'
        LINES TERMINATED BY '\\n'
        IGNORE 1 ROWS
    """)
    # Restore settings
    cursor.execute("COMMIT")
    cursor.execute("SET FOREIGN_KEY_CHECKS = 1")
    cursor.execute("SET UNIQUE_CHECKS = 1")
    cursor.execute(f"ALTER TABLE {table_name} ENABLE KEYS")
    cursor.close()
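On PostgreSQL, the usual counterpart is COPY, typically the fastest bulk-load path. A sketch assuming psycopg2, with the same trusted-input caveat for table_name and csv_file (the DSN and file path are illustrative):
import psycopg2
def bulk_load_postgres(dsn: str, table_name: str, csv_file: str):
    """Load a CSV (with a header row) into table_name via COPY."""
    with psycopg2.connect(dsn) as conn, conn.cursor() as cursor, \
         open(csv_file, "r", encoding="utf-8") as f:
        cursor.copy_expert(
            f"COPY {table_name} FROM STDIN WITH (FORMAT csv, HEADER true)",
            f,
        )
    # psycopg2's connection context manager commits on success, rolls back on error
bulk_load_postgres("dbname=dbname user=user password=pass host=localhost",
                   "training_samples", "/tmp/training_samples.csv")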
7.2 Feature Store Optimization
# Feature store optimization
class FeatureStore:
    """Feature store with an online cache path"""
    def __init__(self, db_engine, redis_client):
        self.db = db_engine
        self.cache = redis_client  # create with decode_responses=True so hash values come back as str
    def get_features_online(self, entity_id: str, feature_names: list):
        """
        Online feature retrieval (low latency)
        Strategy:
        1. Try Redis first
        2. On a miss, fall back to the database and backfill the cache
        """
        cache_key = f"features:{entity_id}"
        # Try the cache
        cached = self.cache.hmget(cache_key, feature_names)
        if all(cached):
            return dict(zip(feature_names, cached))  # zip iterates several iterables in parallel
        # Cache miss: query the database
        missing_features = [
            name for name, val in zip(feature_names, cached) if val is None
        ]
        # Feature names come from internal code, not user input; whitelist them in production
        query = f"""
            SELECT {', '.join(missing_features)}
            FROM feature_store
            WHERE entity_id = %s
        """
        result = self.db.execute(query, (entity_id,)).fetchone()
        if result:
            # Backfill the cache (Row._mapping gives a dict view in SQLAlchemy 1.4+)
            feature_dict = dict(result._mapping)
            self.cache.hset(cache_key, mapping=feature_dict)
            self.cache.expire(cache_key, 3600)  # expire after 1 hour
            # Merge the cached and freshly loaded values
            return {**dict(zip(feature_names, cached)), **feature_dict}
        return None
    def get_features_offline(self, entity_ids: list, feature_names: list):
        """
        Offline feature retrieval (batch)
        Strategy:
        1. Batched queries
        2. Partitioned parallel reads
        """
        # Batch the lookups to avoid an overly long IN list
        batch_size = 1000
        all_results = []
        for i in range(0, len(entity_ids), batch_size):
            batch_ids = entity_ids[i:i + batch_size]
            query = f"""
                SELECT entity_id, {', '.join(feature_names)}
                FROM feature_store
                WHERE entity_id = ANY(%s)
            """
            result = self.db.execute(query, (batch_ids,))
            all_results.extend(result.fetchall())
        return all_results
7.3 Database Optimization for Model Serving
# Database access patterns for a model inference service
import asyncio
from concurrent.futures import ThreadPoolExecutor
from sqlalchemy import text
class ModelServingDB:
    """Database helpers for model serving"""
    def __init__(self, engine):
        self.engine = engine
        self.executor = ThreadPoolExecutor(max_workers=10)
    async def get_prediction_async(self, model_id: str, input_hash: str):  # async def defines a coroutine; call it with await
        """Fetch a recent cached prediction without blocking the event loop"""
        loop = asyncio.get_event_loop()
        def _query():
            with self.engine.connect() as conn:
                result = conn.execute(text("""
                    SELECT prediction, confidence, latency_ms
                    FROM model_predictions
                    WHERE model_id = :model_id AND input_hash = :input_hash
                      AND created_at > NOW() - INTERVAL '1 hour'
                """), {"model_id": model_id, "input_hash": input_hash})
                return result.fetchone()
        # Run the blocking query in the thread pool
        return await loop.run_in_executor(self.executor, _query)  # await suspends until the result is ready
    def batch_log_predictions(self, predictions: list):
        """Log predictions in bulk without blocking the request path"""
        # Passing a list of parameter dicts makes SQLAlchemy use executemany;
        # for even larger volumes, consider COPY (PostgreSQL)
        def _insert():
            with self.engine.connect() as conn:
                conn.execute(text("""
                    INSERT INTO prediction_logs
                    (model_id, input_data, output_data, latency_ms, created_at)
                    VALUES (:model_id, :input_data, :output_data, :latency_ms, NOW())
                """), predictions)
                conn.commit()
        # Submit to the thread pool; the caller is not blocked
        self.executor.submit(_insert)
8. Monitoring and Diagnostic Tools
8.1 Slow Query Log Analysis
# Slow query log analyzer
import re
from collections import defaultdict  # defaultdict: a dict with a default value, avoiding KeyError
from datetime import datetime
class SlowQueryAnalyzer:
    """Slow query log analyzer"""
    def __init__(self):
        self.queries = []
    def parse_mysql_slow_log(self, log_file):
        """Parse a MySQL slow query log (simplified format).
        Real logs often carry extra lines such as SET timestamp=...;
        for serious analysis consider pt-query-digest."""
        pattern = re.compile(
            r'# Time: (?P<time>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z?)\n'
            r'# User@Host: .*\n'
            r'# Query_time: (?P<query_time>[\d.]+) .*\n'
            r'(?P<sql>(?:SELECT|INSERT|UPDATE|DELETE).*?)\n(?=# Time:|$)',
            re.DOTALL | re.IGNORECASE
        )
        with open(log_file, 'r') as f:  # `with` closes the file automatically
            content = f.read()
        for match in pattern.finditer(content):
            self.queries.append({
                'time': match.group('time'),
                'query_time': float(match.group('query_time')),
                'sql': match.group('sql').strip()
            })
        return self.analyze()
    def analyze(self):
        """Aggregate the slow queries"""
        if not self.queries:
            return "No slow queries found"
        # Group by SQL fingerprint
        fingerprint_stats = defaultdict(lambda: {  # lambda: a small anonymous function
            'count': 0,
            'total_time': 0,
            'max_time': 0,
            'samples': []
        })
        for q in self.queries:
            # Fingerprint the SQL (strip out the literal values)
            fingerprint = self._fingerprint(q['sql'])
            stats = fingerprint_stats[fingerprint]
            stats['count'] += 1
            stats['total_time'] += q['query_time']
            stats['max_time'] = max(stats['max_time'], q['query_time'])
            if len(stats['samples']) < 3:
                stats['samples'].append(q['sql'])
        # Sort by total time
        sorted_queries = sorted(
            fingerprint_stats.items(),
            key=lambda x: x[1]['total_time'],
            reverse=True
        )
        # Build the report
        report = ["=== Slow Query Report ===\n"]
        for i, (fingerprint, stats) in enumerate(sorted_queries[:10], 1):  # enumerate yields index and value
            avg_time = stats['total_time'] / stats['count']
            report.append(
                f"\nTop {i}:"
                f"\n  fingerprint: {fingerprint[:100]}..."
                f"\n  executions: {stats['count']}"
                f"\n  total time: {stats['total_time']:.2f}s"
                f"\n  avg time: {avg_time:.2f}s"
                f"\n  max time: {stats['max_time']:.2f}s"
                f"\n  sample SQL: {stats['samples'][0][:100]}..."
            )
        return '\n'.join(report)
    def _fingerprint(self, sql: str) -> str:
        """Produce a SQL fingerprint"""
        # Strip comments
        sql = re.sub(r'/\*.*?\*/', '', sql)
        sql = re.sub(r'--.*?\n', '', sql)
        # Replace string and numeric literals
        sql = re.sub(r"'[^']*'", '?', sql)
        sql = re.sub(r'\b\d+\b', '?', sql)
        # Normalize whitespace
        sql = ' '.join(sql.split())
        return sql.lower()
# Usage
analyzer = SlowQueryAnalyzer()
report = analyzer.parse_mysql_slow_log('/var/log/mysql/slow.log')
print(report)
8.2 Real-Time Performance Monitoring
# Real-time performance monitoring
# Requires: pip install psutil
import time
import psutil
from dataclasses import dataclass
from typing import List
import threading  # threads: run the monitor loop alongside the application
from sqlalchemy import text
@dataclass  # auto-generates __init__ and friends
class DBMetrics:
    """Database metrics"""
    timestamp: float
    connections_active: int
    connections_total: int
    queries_per_second: float
    slow_queries: int
    cache_hit_ratio: float
    disk_io_read: float
    disk_io_write: float
class DBPerformanceMonitor:
    """Database performance monitor"""
    def __init__(self, engine, interval: int = 5):
        self.engine = engine
        self.interval = interval
        self.metrics_history: List[DBMetrics] = []
        self._running = False
        self._thread = None
    def start(self):
        """Start monitoring"""
        self._running = True
        self._thread = threading.Thread(target=self._monitor_loop)
        self._thread.daemon = True
        self._thread.start()
        print("Performance monitoring started")
    def stop(self):
        """Stop monitoring"""
        self._running = False
        if self._thread:
            self._thread.join()
        print("Performance monitoring stopped")
    def _monitor_loop(self):
        """Monitoring loop"""
        last_queries = 0  # note: the first QPS sample is inflated until a baseline exists
        while self._running:
            try:  # try/except so one failed sample does not kill the loop
                with self.engine.connect() as conn:
                    # MySQL server status
                    result = conn.execute(text("SHOW GLOBAL STATUS"))
                    status = {row[0]: row[1] for row in result}
                    # QPS
                    current_queries = int(status.get('Queries', 0))
                    qps = (current_queries - last_queries) / self.interval
                    last_queries = current_queries
                    # Connections: Threads_running = actively executing; Threads_connected = open
                    active_connections = int(status.get('Threads_running', 0))
                    # Buffer pool hit ratio (InnoDB metrics; works on MySQL 8.0+)
                    buffer_reads = int(status.get('Innodb_buffer_pool_reads', 0))
                    buffer_requests = int(status.get('Innodb_buffer_pool_read_requests', 1))
                    cache_ratio = 1 - (buffer_reads / buffer_requests) if buffer_requests > 0 else 0
                    metrics = DBMetrics(
                        timestamp=time.time(),
                        connections_active=active_connections,
                        connections_total=int(status.get('Threads_connected', 0)),
                        queries_per_second=qps,
                        slow_queries=int(status.get('Slow_queries', 0)),
                        cache_hit_ratio=cache_ratio,
                        disk_io_read=psutil.disk_io_counters().read_bytes,   # cumulative bytes since boot
                        disk_io_write=psutil.disk_io_counters().write_bytes
                    )
                    self.metrics_history.append(metrics)
                    # Keep the most recent 100 samples
                    if len(self.metrics_history) > 100:
                        self.metrics_history.pop(0)
                    # Alerting
                    self._check_alerts(metrics)
            except Exception as e:
                print(f"Monitoring error: {e}")
            time.sleep(self.interval)
    def _check_alerts(self, metrics: DBMetrics):
        """Evaluate alert conditions"""
        alerts = []
        if metrics.connections_active > 400:
            alerts.append(f"⚠️ Too many active connections: {metrics.connections_active}")
        if metrics.queries_per_second > 10000:
            alerts.append(f"⚠️ QPS too high: {metrics.queries_per_second:.0f}")
        if metrics.cache_hit_ratio < 0.8:
            alerts.append(f"⚠️ Low cache hit ratio: {metrics.cache_hit_ratio:.1%}")
        for alert in alerts:
            print(alert)
    def get_summary(self) -> str:
        """Summarize recent samples"""
        if not self.metrics_history:
            return "No monitoring data yet"
        recent = self.metrics_history[-10:]
        avg_qps = sum(m.queries_per_second for m in recent) / len(recent)
        max_connections = max(m.connections_active for m in recent)
        return f"""
=== Performance Summary ===
Last {len(recent)} samples:
- avg QPS: {avg_qps:.1f}
- max active connections: {max_connections}
- current cache hit ratio: {recent[-1].cache_hit_ratio:.1%}
- total slow queries: {recent[-1].slow_queries}
"""
9. Self-Check Exercises
Exercise 1: EXPLAIN Analysis
Analyze the following query, identify its performance problems, and optimize it:
SELECT * FROM orders o
JOIN users u ON o.user_id = u.user_id  -- JOIN combines rows from multiple tables
WHERE YEAR(o.created_at) = 2024
  AND u.email LIKE '%@gmail.com'
ORDER BY o.total_amount DESC
LIMIT 100;
Exercise 2: Index Design
Design the optimal indexes for this AI feature store table:
CREATE TABLE feature_store (
    entity_id VARCHAR(64) PRIMARY KEY,
    feature_vector JSON,
    feature_version VARCHAR(20),
    created_at TIMESTAMP,
    updated_at TIMESTAMP
);
Query patterns:
1. Exact lookup by entity_id
2. Feature data from the last 7 days
3. Lookup by feature_version
Exercise 3: Connection Pool Sizing
Choose suitable connection pool parameters for this scenario:
- Database: PostgreSQL
- Application servers: 4
- Concurrency per server: 100
- Database max connections: 500
Exercise 4: Caching Strategy
Design a caching scheme for model prediction results:
- Prediction results must be cached
- The cache must be invalidated when the model version changes
- High-concurrency reads must be supported
10. Chapter Summary
Key Takeaways
- SQL optimization: analyze queries with EXPLAIN, avoid full table scans, optimize pagination
- Index strategy: leftmost-prefix rule, composite index design, regular maintenance
- Configuration tuning: memory parameters, connection limits, logging
- Connection pools: sensible size and timeout settings, utilization monitored
- Caching: multi-level caches with a sound invalidation strategy
- AI workloads: bulk insert optimization, feature store design, asynchronous writes
Optimization Checklist
□ Every query's execution plan verified with EXPLAIN
□ WHERE conditions backed by suitable indexes
□ No SELECT *; only the needed columns are fetched
□ Paginated queries use an optimized pattern
□ Connection pool sized sensibly and monitored
□ Hot data covered by a caching strategy
□ Slow query log enabled and reviewed regularly
□ Database configuration tuned to the hardware
Next Steps
After finishing this chapter, continue with Chapter 08: Transactions and Concurrency Control, which covers ACID properties, isolation levels, and concurrency control mechanisms.
References:
- MySQL performance optimization guide
- PostgreSQL performance tuning
- High Performance MySQL (book)