跳转至

项目11: 数据库性能优化实战

难度: ⭐⭐⭐⭐ 较难 时间: 15-20小时 涉及知识: 索引优化、查询优化、缓存策略、性能监控


📖 项目概述

项目背景

随着数据量的增长,数据库性能成为系统瓶颈。性能优化是数据库管理的核心任务,直接影响用户体验和系统成本。本项目将带你从零开始构建一个完整的数据库性能优化系统。

项目目标

构建一个完整的数据库性能优化系统,能够: - 分析慢查询 - 优化索引策略 - 实施查询优化 - 配置缓存策略 - 监控数据库性能 - 自动化性能调优

技术栈

  • 数据库: MySQL, PostgreSQL
  • 监控工具: Prometheus, Grafana
  • 分析工具: EXPLAIN, pg_stat_statements
  • 缓存: Redis, Memcached
  • 编程语言: Python, SQL

🏗️ 项目结构

Text Only
database-optimization/
├── config/                  # 配置文件
│   ├── mysql.cnf         # MySQL配置
│   ├── postgresql.conf   # PostgreSQL配置
│   └── redis.conf        # Redis配置
├── scripts/                 # 脚本目录
│   ├── __init__.py
│   ├── analyzer.py        # 慢查询分析
│   ├── optimizer.py        # 查询优化
│   ├── indexer.py         # 索引管理
│   └── monitor.py         # 性能监控
├── utils/                   # 工具函数
│   ├── __init__.py
│   ├── db_connector.py   # 数据库连接
│   ├── query_builder.py  # 查询构建器
│   └── metrics.py        # 指标收集
├── monitoring/              # 监控配置
│   ├── prometheus.yml    # Prometheus配置
│   └── grafana/         # Grafana仪表板
├── tests/                   # 测试目录
│   ├── test_analyzer.py
│   ├── test_optimizer.py
│   └── test_monitor.py
├── main.py                  # 主程序
├── requirements.txt          # 依赖文件
└── README.md                # 项目说明

🎯 核心功能

1. 慢查询分析

  • 慢查询日志: 收集慢查询日志
  • 查询分析: 分析查询执行计划
  • 瓶颈识别: 识别性能瓶颈
  • 优化建议: 提供优化建议

2. 索引优化

  • 索引分析: 分析索引使用情况
  • 索引推荐: 推荐合适的索引
  • 索引创建: 自动创建索引
  • 索引维护: 定期维护索引

3. 查询优化

  • 查询重写: 重写低效查询
  • JOIN优化: 优化JOIN操作
  • 子查询优化: 优化子查询
  • 分页优化: 优化分页查询

4. 缓存策略

  • 查询缓存: 缓存查询结果
  • 对象缓存: 缓存数据库对象
  • 缓存失效: 管理缓存失效
  • 缓存预热: 预热常用数据

5. 性能监控

  • 实时监控: 实时监控数据库指标
  • 告警机制: 性能异常告警
  • 趋势分析: 分析性能趋势
  • 报表生成: 生成性能报表

💻 代码实现

1. 数据库连接器 (utils/db_connector.py)

Python
"""
数据库连接器
"""
import mysql.connector
import psycopg2
from typing import Optional, Dict, Any
import logging

class DatabaseConnector:
    """数据库连接器"""

    def __init__(self, db_type: str, config: Dict[str, Any]):
        """
        初始化连接器

        Args:
            db_type: 数据库类型 (mysql, postgresql)
            config: 连接配置
        """
        self.db_type = db_type
        self.config = config
        self.connection = None
        self.logger = logging.getLogger(__name__)

    def connect(self):
        """建立数据库连接"""
        try:
            if self.db_type == 'mysql':
                self.connection = mysql.connector.connect(**self.config)
            elif self.db_type == 'postgresql':
                self.connection = psycopg2.connect(**self.config)
            else:
                raise ValueError(f"不支持的数据库类型: {self.db_type}")

            self.logger.info(f"成功连接到 {self.db_type} 数据库")
            return self.connection

        except Exception as e:
            self.logger.error(f"数据库连接失败: {e}")
            raise

    def disconnect(self):
        """断开数据库连接"""
        if self.connection:
            self.connection.close()
            self.logger.info("数据库连接已关闭")

    def execute_query(self, query: str, params: Optional[tuple] = None):
        """
        执行查询

        Args:
            query: SQL查询
            params: 查询参数

        Returns:
            查询结果
        """
        cursor = self.connection.cursor(dictionary=True)

        try:
            cursor.execute(query, params or ())

            if query.strip().upper().startswith('SELECT'):
                results = cursor.fetchall()
                return results
            else:
                self.connection.commit()
                return cursor.rowcount

        except Exception as e:
            self.connection.rollback()
            self.logger.error(f"查询执行失败: {e}")
            raise

        finally:
            cursor.close()

    def execute_batch(self, query: str, params_list: list):
        """
        批量执行查询

        Args:
            query: SQL查询
            params_list: 参数列表

        Returns:
            影响的行数
        """
        cursor = self.connection.cursor(dictionary=True)

        try:
            cursor.executemany(query, params_list)
            self.connection.commit()
            return cursor.rowcount

        except Exception as e:
            self.connection.rollback()
            self.logger.error(f"批量查询执行失败: {e}")
            raise

        finally:
            cursor.close()

2. 慢查询分析器 (scripts/analyzer.py)

Python
"""
慢查询分析器
"""
import re
from typing import List, Dict, Any
from datetime import datetime

class SlowQueryAnalyzer:
    """慢查询分析器"""

    def __init__(self, db_connector):
        """
        初始化分析器

        Args:
            db_connector: 数据库连接器
        """
        self.db = db_connector

    def analyze_slow_queries(
        self,
        threshold: float = 1.0,
        limit: int = 100,
    ) -> List[Dict[str, Any]]:
        """
        分析慢查询

        Args:
            threshold: 慢查询阈值(秒)
            limit: 返回结果数量限制

        Returns:
            慢查询列表
        """
        if self.db.db_type == 'mysql':
            return self._analyze_mysql_slow_queries(threshold, limit)
        elif self.db.db_type == 'postgresql':
            return self._analyze_postgresql_slow_queries(threshold, limit)
        else:
            raise ValueError(f"不支持的数据库类型: {self.db.db_type}")

    def _analyze_mysql_slow_queries(
        self,
        threshold: float,
        limit: int,
    ) -> List[Dict[str, Any]]:
        """分析MySQL慢查询"""
        query = """
        SELECT
            query_time,
            lock_time,
            rows_sent,
            rows_examined,
            sql_text
        FROM mysql.slow_log
        WHERE query_time >= %s
        ORDER BY query_time DESC
        LIMIT %s
        """

        results = self.db.execute_query(query, (threshold, limit))

        # 分析每个查询
        analyzed_queries = []
        for result in results:
            analyzed = self._analyze_query(result)
            analyzed_queries.append(analyzed)

        return analyzed_queries

    def _analyze_postgresql_slow_queries(
        self,
        threshold: float,
        limit: int,
    ) -> List[Dict[str, Any]]:
        """分析PostgreSQL慢查询"""
        query = """
        SELECT
            query,
            calls,
            total_time,
            mean_time,
            rows
        FROM pg_stat_statements
        WHERE mean_time >= %s
        ORDER BY mean_time DESC
        LIMIT %s
        """

        results = self.db.execute_query(query, (threshold, limit))

        # 分析每个查询
        analyzed_queries = []
        for result in results:
            analyzed = self._analyze_query(result)
            analyzed_queries.append(analyzed)

        return analyzed_queries

    def _analyze_query(self, query_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        分析单个查询

        Args:
            query_data: 查询数据

        Returns:
            分析结果
        """
        analysis = {
            'query': query_data.get('sql_text') or query_data.get('query'),
            'query_time': query_data.get('query_time') or query_data.get('total_time'),
            'rows_examined': query_data.get('rows_examined') or query_data.get('rows'),
            'issues': [],
            'recommendations': [],
        }

        # 检查全表扫描
        if analysis['rows_examined'] > 10000:
            analysis['issues'].append('全表扫描')
            analysis['recommendations'].append('添加合适的索引')

        # 检查查询时间
        if analysis['query_time'] > 5.0:
            analysis['issues'].append('查询时间过长')
            analysis['recommendations'].append('优化查询或添加索引')

        # 检查SELECT *
        if 'SELECT *' in analysis['query'].upper():
            analysis['issues'].append('使用SELECT *')
            analysis['recommendations'].append('只选择需要的列')

        # 检查LIKE操作
        if re.search(r'LIKE\s+\'%\w+%', analysis['query'], re.IGNORECASE):
            analysis['issues'].append('使用前导通配符的LIKE')
            analysis['recommendations'].append('使用全文索引或避免前导通配符')

        return analysis

    def get_explain_plan(self, query: str) -> Dict[str, Any]:
        """
        获取查询执行计划

        Args:
            query: SQL查询

        Returns:
            执行计划
        """
        explain_query = f"EXPLAIN {query}"
        results = self.db.execute_query(explain_query)

        return {
            'query': query,
            'plan': results,
        }

3. 索引管理器 (scripts/indexer.py)

Python
"""
索引管理器
"""
from typing import List, Dict, Any

class IndexManager:
    """索引管理器"""

    def __init__(self, db_connector):
        """
        初始化索引管理器

        Args:
            db_connector: 数据库连接器
        """
        self.db = db_connector

    def analyze_index_usage(self, table_name: str) -> Dict[str, Any]:
        """
        分析索引使用情况

        Args:
            table_name: 表名

        Returns:
            索引使用分析
        """
        if self.db.db_type == 'mysql':
            return self._analyze_mysql_index_usage(table_name)
        elif self.db.db_type == 'postgresql':
            return self._analyze_postgresql_index_usage(table_name)
        else:
            raise ValueError(f"不支持的数据库类型: {self.db.db_type}")

    def _analyze_mysql_index_usage(self, table_name: str) -> Dict[str, Any]:
        """分析MySQL索引使用"""
        query = """
        SELECT
            INDEX_NAME,
            COLUMN_NAME,
            CARDINALITY,
            NULLABLE
        FROM INFORMATION_SCHEMA.STATISTICS
        WHERE TABLE_NAME = %s
        ORDER BY INDEX_NAME, SEQ_IN_INDEX
        """

        results = self.db.execute_query(query, (table_name,))

        # 组织索引信息
        indexes = {}
        for result in results:
            index_name = result['INDEX_NAME']
            if index_name not in indexes:
                indexes[index_name] = {
                    'columns': [],
                    'cardinality': result['CARDINALITY'],
                    'nullable': result['NULLABLE'],
                }
            indexes[index_name]['columns'].append(result['COLUMN_NAME'])

        return {
            'table': table_name,
            'indexes': indexes,
        }

    def _analyze_postgresql_index_usage(self, table_name: str) -> Dict[str, Any]:
        """分析PostgreSQL索引使用"""
        query = """
        SELECT
            indexname,
            indexdef
        FROM pg_indexes
        WHERE tablename = %s
        """

        results = self.db.execute_query(query, (table_name,))

        indexes = {}
        for result in results:
            indexes[result['indexname']] = {
                'definition': result['indexdef'],
            }

        return {
            'table': table_name,
            'indexes': indexes,
        }

    def recommend_indexes(self, table_name: str) -> List[Dict[str, Any]]:
        """
        推荐索引

        Args:
            table_name: 表名

        Returns:
            推荐的索引列表
        """
        # 获取表的查询模式
        query_patterns = self._get_query_patterns(table_name)

        # 分析查询模式
        recommendations = []
        for pattern in query_patterns:
            if pattern['type'] == 'WHERE':
                recommendations.append({
                    'type': 'index',
                    'columns': pattern['columns'],
                    'reason': 'WHERE子句中频繁使用的列',
                })
            elif pattern['type'] == 'JOIN':
                recommendations.append({
                    'type': 'index',
                    'columns': pattern['columns'],
                    'reason': 'JOIN操作中使用的列',
                })
            elif pattern['type'] == 'ORDER BY':
                recommendations.append({
                    'type': 'index',
                    'columns': pattern['columns'],
                    'reason': 'ORDER BY子句中使用的列',
                })

        return recommendations

    def _get_query_patterns(self, table_name: str) -> List[Dict[str, Any]]:
        """
        获取查询模式

        Args:
            table_name: 表名

        Returns:
            查询模式列表
        """
        # 这里简化实现,实际应该从慢查询日志中分析
        # 返回模拟的查询模式
        return [
            {'type': 'WHERE', 'columns': ['id', 'status']},
            {'type': 'JOIN', 'columns': ['user_id']},
            {'type': 'ORDER BY', 'columns': ['created_at']},
        ]

    def create_index(
        self,
        table_name: str,
        index_name: str,
        columns: List[str],
        unique: bool = False,
    ):
        """
        创建索引

        Args:
            table_name: 表名
            index_name: 索引名
            columns: 列名列表
            unique: 是否唯一索引
        """
        unique_str = "UNIQUE " if unique else ""
        columns_str = ", ".join(columns)

        query = f"""
        CREATE {unique_str} INDEX {index_name}
        ON {table_name} ({columns_str})
        """

        try:
            self.db.execute_query(query)
            print(f"✓ 成功创建索引: {index_name}")
        except Exception as e:
            print(f"✗ 创建索引失败: {e}")

4. 查询优化器 (scripts/optimizer.py)

Python
"""
查询优化器
"""
import re
from typing import Dict, Any

class QueryOptimizer:
    """查询优化器"""

    def __init__(self, db_connector):
        """
        初始化优化器

        Args:
            db_connector: 数据库连接器
        """
        self.db = db_connector

    def optimize_query(self, query: str) -> Dict[str, Any]:
        """
        优化查询

        Args:
            query: SQL查询

        Returns:
            优化结果
        """
        original_query = query
        optimized_query = query
        optimizations = []

        # 优化1: 移除SELECT *
        if 'SELECT *' in optimized_query.upper():
            optimized_query = self._replace_select_star(optimized_query)
            optimizations.append('移除SELECT *,只选择需要的列')

        # 优化2: 添加LIMIT
        if not re.search(r'\bLIMIT\b', optimized_query.upper()):
            optimized_query = self._add_limit(optimized_query)
            optimizations.append('添加LIMIT限制结果数量')

        # 优化3: 优化JOIN顺序
        if 'JOIN' in optimized_query.upper():
            optimized_query = self._optimize_joins(optimized_query)
            optimizations.append('优化JOIN顺序')

        # 优化4: 使用索引提示
        optimized_query = self._add_index_hints(optimized_query)
        optimizations.append('添加索引提示')

        return {
            'original_query': original_query,
            'optimized_query': optimized_query,
            'optimizations': optimizations,
        }

    def _replace_select_star(self, query: str) -> str:
        """
        替换SELECT *

        Args:
            query: SQL查询

        Returns:
            优化后的查询
        """
        # 这里简化实现,实际应该分析表结构
        # 返回模拟的优化查询
        return query.replace('SELECT *', 'SELECT id, name, status')

    def _add_limit(self, query: str) -> str:
        """
        添加LIMIT

        Args:
            query: SQL查询

        Returns:
            优化后的查询
        """
        # 在查询末尾添加LIMIT
        if query.strip().endswith(';'):
            return query.rstrip(';') + ' LIMIT 1000;'
        else:
            return query + ' LIMIT 1000'

    def _optimize_joins(self, query: str) -> str:
        """
        优化JOIN

        Args:
            query: SQL查询

        Returns:
            优化后的查询
        """
        # 这里简化实现,实际应该分析表大小和选择性
        return query

    def _add_index_hints(self, query: str) -> str:
        """
        添加索引提示

        Args:
            query: SQL查询

        Returns:
            优化后的查询
        """
        # 这里简化实现,实际应该分析索引使用情况
        return query

    def compare_queries(
        self,
        query1: str,
        query2: str,
    ) -> Dict[str, Any]:
        """
        比较两个查询的性能

        Args:
            query1: 查询1
            query2: 查询2

        Returns:
            比较结果
        """
        # 获取执行计划
        plan1 = self.db.execute_query(f"EXPLAIN {query1}")
        plan2 = self.db.execute_query(f"EXPLAIN {query2}")

        # 执行查询并计时
        import time

        start_time = time.time()
        self.db.execute_query(query1)
        time1 = time.time() - start_time

        start_time = time.time()
        self.db.execute_query(query2)
        time2 = time.time() - start_time

        return {
            'query1': {
                'query': query1,
                'time': time1,
                'plan': plan1,
            },
            'query2': {
                'query': query2,
                'time': time2,
                'plan': plan2,
            },
            'comparison': {
                'faster': 'query1' if time1 < time2 else 'query2',
                'improvement': abs(time1 - time2) / max(time1, time2) * 100,
            },
        }

5. 性能监控器 (scripts/monitor.py)

Python
"""
性能监控器
"""
import time
from typing import Dict, Any, List
from datetime import datetime
import json

class PerformanceMonitor:
    """性能监控器"""

    def __init__(self, db_connector):
        """
        初始化监控器

        Args:
            db_connector: 数据库连接器
        """
        self.db = db_connector
        self.metrics = []

    def collect_metrics(self) -> Dict[str, Any]:
        """
        收集数据库指标

        Returns:
            指标字典
        """
        if self.db.db_type == 'mysql':
            return self._collect_mysql_metrics()
        elif self.db.db_type == 'postgresql':
            return self._collect_postgresql_metrics()
        else:
            raise ValueError(f"不支持的数据库类型: {self.db.db_type}")

    def _collect_mysql_metrics(self) -> Dict[str, Any]:
        """收集MySQL指标"""
        metrics = {}

        # 连接数
        query = "SHOW STATUS LIKE 'Threads_connected'"
        result = self.db.execute_query(query)
        metrics['connections'] = int(result[0]['Value']) if result else 0

        # 查询数
        query = "SHOW STATUS LIKE 'Questions'"
        result = self.db.execute_query(query)
        metrics['queries'] = int(result[0]['Value']) if result else 0

        # 慢查询数
        query = "SHOW STATUS LIKE 'Slow_queries'"
        result = self.db.execute_query(query)
        metrics['slow_queries'] = int(result[0]['Value']) if result else 0

        # 缓冲池命中率
        query = "SHOW STATUS LIKE 'Innodb_buffer_pool_read%'"
        results = self.db.execute_query(query)
        buffer_pool_reads = 0
        buffer_pool_read_requests = 0

        for result in results:
            if 'read_requests' in result['Variable_name']:
                buffer_pool_read_requests = int(result['Value'])
            elif 'reads' in result['Variable_name']:
                buffer_pool_reads = int(result['Value'])

        if buffer_pool_read_requests > 0:
            metrics['buffer_pool_hit_rate'] = \
                (1 - buffer_pool_reads / buffer_pool_read_requests) * 100
        else:
            metrics['buffer_pool_hit_rate'] = 0

        return metrics

    def _collect_postgresql_metrics(self) -> Dict[str, Any]:
        """收集PostgreSQL指标"""
        metrics = {}

        # 连接数
        query = "SELECT count(*) FROM pg_stat_activity"
        result = self.db.execute_query(query)
        metrics['connections'] = result[0]['count'] if result else 0

        # 活跃连接数
        query = "SELECT count(*) FROM pg_stat_activity WHERE state = 'active'"
        result = self.db.execute_query(query)
        metrics['active_connections'] = result[0]['count'] if result else 0

        # 缓存命中率
        query = """
        SELECT
            sum(heap_blks_hit) / (sum(heap_blks_hit) + sum(heap_blks_read)) * 100 as cache_hit_rate
        FROM pg_statio_user_tables
        """
        result = self.db.execute_query(query)
        metrics['cache_hit_rate'] = result[0]['cache_hit_rate'] if result else 0

        return metrics

    def monitor_query(
        self,
        query: str,
        params: tuple = None,
    ) -> Dict[str, Any]:
        """
        监控查询执行

        Args:
            query: SQL查询
            params: 查询参数

        Returns:
            监控结果
        """
        start_time = time.time()

        try:
            result = self.db.execute_query(query, params)
            execution_time = time.time() - start_time

            # 记录指标
            metric = {
                'timestamp': datetime.now().isoformat(),
                'query': query,
                'execution_time': execution_time,
                'success': True,
                'rows_affected': len(result) if isinstance(result, list) else result,
            }

            self.metrics.append(metric)

            return {
                'result': result,
                'metric': metric,
            }

        except Exception as e:
            execution_time = time.time() - start_time

            metric = {
                'timestamp': datetime.now().isoformat(),
                'query': query,
                'execution_time': execution_time,
                'success': False,
                'error': str(e),
            }

            self.metrics.append(metric)

            return {
                'error': e,
                'metric': metric,
            }

    def get_performance_report(self) -> Dict[str, Any]:
        """
        获取性能报告

        Returns:
            性能报告
        """
        if not self.metrics:
            return {'error': '没有监控数据'}

        total_queries = len(self.metrics)
        successful_queries = sum(1 for m in self.metrics if m['success'])
        failed_queries = total_queries - successful_queries

        avg_execution_time = sum(m['execution_time'] for m in self.metrics) / total_queries
        max_execution_time = max(m['execution_time'] for m in self.metrics)

        slow_queries = [m for m in self.metrics if m['execution_time'] > 1.0]

        return {
            'total_queries': total_queries,
            'successful_queries': successful_queries,
            'failed_queries': failed_queries,
            'success_rate': successful_queries / total_queries * 100,
            'avg_execution_time': avg_execution_time,
            'max_execution_time': max_execution_time,
            'slow_queries': len(slow_queries),
            'slow_query_rate': len(slow_queries) / total_queries * 100,
        }

6. 主程序 (main.py)

Python
"""
主程序
"""
import argparse
import json
from datetime import datetime

from config import mysql_config, postgresql_config
from utils.db_connector import DatabaseConnector
from scripts.analyzer import SlowQueryAnalyzer
from scripts.indexer import IndexManager
from scripts.optimizer import QueryOptimizer
from scripts.monitor import PerformanceMonitor

def main():
    """主函数"""
    parser = argparse.ArgumentParser(description='数据库性能优化工具')
    parser.add_argument('--db-type', choices=['mysql', 'postgresql'], required=True)
    parser.add_argument('--action', choices=['analyze', 'index', 'optimize', 'monitor'], required=True)
    parser.add_argument('--table', help='表名')
    parser.add_argument('--query', help='SQL查询')
    parser.add_argument('--output', help='输出文件')

    args = parser.parse_args()

    # 创建数据库连接
    if args.db_type == 'mysql':
        db = DatabaseConnector('mysql', mysql_config)
    else:
        db = DatabaseConnector('postgresql', postgresql_config)

    db.connect()

    try:
        # 执行操作
        if args.action == 'analyze':
            analyzer = SlowQueryAnalyzer(db)
            results = analyzer.analyze_slow_queries()

            if args.output:
                with open(args.output, 'w') as f:  # with自动管理资源,确保文件正确关闭
                    json.dump(results, f, indent=2)  # json.dumps将Python对象转为JSON字符串
            else:
                print(json.dumps(results, indent=2))

        elif args.action == 'index':
            if not args.table:
                print("错误: 需要指定表名")
                return

            indexer = IndexManager(db)
            usage = indexer.analyze_index_usage(args.table)
            recommendations = indexer.recommend_indexes(args.table)

            result = {
                'table': args.table,
                'index_usage': usage,
                'recommendations': recommendations,
            }

            if args.output:
                with open(args.output, 'w') as f:
                    json.dump(result, f, indent=2)
            else:
                print(json.dumps(result, indent=2))

        elif args.action == 'optimize':
            if not args.query:
                print("错误: 需要指定查询")
                return

            optimizer = QueryOptimizer(db)
            result = optimizer.optimize_query(args.query)

            if args.output:
                with open(args.output, 'w') as f:
                    json.dump(result, f, indent=2)
            else:
                print(json.dumps(result, indent=2))

        elif args.action == 'monitor':
            monitor = PerformanceMonitor(db)
            metrics = monitor.collect_metrics()
            report = monitor.get_performance_report()

            result = {
                'metrics': metrics,
                'report': report,
                'timestamp': datetime.now().isoformat(),
            }

            if args.output:
                with open(args.output, 'w') as f:
                    json.dump(result, f, indent=2)
            else:
                print(json.dumps(result, indent=2))

    finally:
        db.disconnect()

if __name__ == "__main__":
    main()

🧪 测试方法

1. 单元测试

Python
"""
单元测试示例
"""
import pytest
from utils.db_connector import DatabaseConnector
from scripts.analyzer import SlowQueryAnalyzer
from config import mysql_config

def test_slow_query_analyzer():
    """测试慢查询分析器"""
    db = DatabaseConnector('mysql', mysql_config)
    db.connect()

    try:
        analyzer = SlowQueryAnalyzer(db)
        results = analyzer.analyze_slow_queries(threshold=0.5, limit=10)

        assert isinstance(results, list)  # isinstance检查对象类型
        assert len(results) <= 10

        if results:
            assert 'query' in results[0]
            assert 'issues' in results[0]
            assert 'recommendations' in results[0]

        print("✓ 慢查询分析器测试通过")

    finally:
        db.disconnect()

2. 集成测试

Python
"""
集成测试示例
"""
def test_optimization_pipeline():
    """测试优化流程"""
    from scripts.indexer import IndexManager
    from scripts.optimizer import QueryOptimizer

    db = DatabaseConnector('mysql', mysql_config)
    db.connect()

    try:  # try/except捕获异常
        # 分析索引使用
        indexer = IndexManager(db)
        usage = indexer.analyze_index_usage('users')

        # 优化查询
        optimizer = QueryOptimizer(db)
        query = "SELECT * FROM users WHERE status = 'active'"
        result = optimizer.optimize_query(query)

        # 验证结果
        assert 'optimized_query' in result  # assert断言:条件为False时抛出AssertionError
        assert 'optimizations' in result

        print("✓ 优化流程测试通过")

    finally:
        db.disconnect()

📊 扩展建议

1. 功能扩展

  • 自动索引: 自动创建推荐索引
  • 查询重写: 自动重写低效查询
  • 参数调优: 自动调整数据库参数
  • 容量规划: 预测容量需求

2. 性能优化

  • 分区表: 实现表分区
  • 读写分离: 实现读写分离
  • 连接池: 优化连接池配置
  • 批量操作: 优化批量操作

3. 监控增强

  • 实时告警: 实时性能告警
  • 趋势预测: 预测性能趋势
  • 异常检测: 自动检测异常
  • 可视化: 性能数据可视化

📚 学习收获

完成本项目后,你将掌握:

  • ✅ 数据库性能分析方法
  • ✅ 索引优化策略
  • ✅ 查询优化技巧
  • ✅ 缓存策略设计
  • ✅ 性能监控实现
  • ✅ 自动化优化工具
  • ✅ 完整的性能优化系统开发

🔗 参考资源


项目完成时间: 15-20小时 难度等级: ⭐⭐⭐⭐ 较难 推荐指数: ⭐⭐⭐⭐⭐