跳转至

第14章:性能优化

性能优化

14.1 性能优化概述

性能优化的目标

  1. 降低延迟:减少请求响应时间
  2. 提高吞吐:增加系统处理能力
  3. 降低资源消耗:减少CPU、内存、网络使用
  4. 提高用户体验:提升用户使用体验

性能优化的原则

  1. 先测量后优化:不要过早优化
  2. 找到瓶颈:优化真正的瓶颈
  3. 权衡利弊:考虑优化的代价
  4. 持续优化:性能优化是持续的过程

14.2 性能分析

14.2.1 性能指标

响应时间

Python
import time
import functools

def measure_response_time(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):  # *args接收任意位置参数;**kwargs接收任意关键字参数
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        response_time = end_time - start_time
        print(f"{func.__name__} response time: {response_time:.3f}s")
        return result
    return wrapper

@measure_response_time
def process_data(data):
    # 处理数据
    time.sleep(1)
    return data

process_data([1, 2, 3])

吞吐量

Python
import time
from threading import Thread

def measure_throughput(func, num_requests):
    start_time = time.time()

    threads = []
    for i in range(num_requests):
        t = Thread(target=func)
        t.start()
        threads.append(t)

    for t in threads:
        t.join()

    end_time = time.time()
    throughput = num_requests / (end_time - start_time)
    print(f"Throughput: {throughput:.2f} requests/second")

def handle_request():
    time.sleep(0.1)

measure_throughput(handle_request, 100)

资源使用率

Python
import psutil

def measure_resource_usage():
    # CPU使用率
    cpu_percent = psutil.cpu_percent(interval=1)
    print(f"CPU usage: {cpu_percent}%")

    # 内存使用率
    memory = psutil.virtual_memory()
    print(f"Memory usage: {memory.percent}%")

    # 磁盘使用率
    disk = psutil.disk_usage('/')
    print(f"Disk usage: {disk.percent}%")

    # 网络IO
    network = psutil.net_io_counters()
    print(f"Network sent: {network.bytes_sent}")
    print(f"Network received: {network.bytes_recv}")

measure_resource_usage()

14.2.2 性能分析工具

cProfile

Python
import cProfile

def process_data(data):
    result = []
    for item in data:
        result.append(item * 2)
    return result

# 使用cProfile分析性能
cProfile.run('process_data(range(10000))')

memory_profiler

Python
from memory_profiler import profile

@profile
def process_data(data):
    result = []
    for item in data:
        result.append(item * 2)
    return result

process_data(range(10000))

Py-Spy

Bash
# 安装py-spy
pip install py-spy

# 监控Python进程
py-spy top --pid <PID>

# 生成火焰图
py-spy record -o profile.svg --pid <PID>

14.3 代码优化

14.3.1 算法优化

Python
# 优化前:O(n^2)
def find_duplicates_slow(arr):
    duplicates = []
    for i in range(len(arr)):
        for j in range(i + 1, len(arr)):
            if arr[i] == arr[j] and arr[i] not in duplicates:
                duplicates.append(arr[i])
    return duplicates

# 优化后:O(n)
def find_duplicates_fast(arr):
    seen = set()
    duplicates = set()
    for item in arr:
        if item in seen:
            duplicates.add(item)
        else:
            seen.add(item)
    return list(duplicates)

14.3.2 数据结构优化

Python
# 优化前:使用列表查找
def find_user(users, user_id):
    for user in users:
        if user['id'] == user_id:
            return user
    return None

# 优化后:使用字典查找
def find_user_optimized(users_dict, user_id):
    return users_dict.get(user_id)

14.3.3 缓存优化

Python
from functools import lru_cache

# 使用LRU缓存
@lru_cache(maxsize=128)
def fibonacci(n):
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

# 计算斐波那契数列
print(fibonacci(100))

14.4 数据库优化

14.4.1 查询优化

SQL
-- 优化前:使用SELECT *
SELECT * FROM users WHERE email = 'test@example.com';

-- 优化后:只查询需要的字段
SELECT id, name, email FROM users WHERE email = 'test@example.com';

-- 优化前:使用函数
SELECT * FROM users WHERE YEAR(created_at) = 2024;

-- 优化后:使用范围查询
SELECT * FROM users
WHERE created_at >= '2024-01-01' AND created_at < '2025-01-01';

14.4.2 索引优化

SQL
-- 创建索引
CREATE INDEX idx_user_email ON users(email);  -- INDEX索引加速查询
CREATE INDEX idx_order_user_id ON orders(user_id);
CREATE INDEX idx_order_created_at ON orders(created_at);

-- 使用索引
EXPLAIN SELECT * FROM users WHERE email = 'test@example.com';  -- EXPLAIN查看查询执行计划

14.4.3 连接优化

Python
# 使用连接池
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    'mysql+pymysql://user:password@localhost/db',
    poolclass=QueuePool,
    pool_size=10,
    max_overflow=20,
    pool_timeout=30,
    pool_recycle=3600
)

14.5 缓存优化

14.5.1 多级缓存

Python
import redis

# L1缓存:内存缓存(使用字典)
l1_cache = {}

def get_from_l1(key):
    return l1_cache.get(key)

# L2缓存:Redis
r = redis.Redis(host='localhost', port=6379, db=0)

def get_from_l2(key):
    return r.get(key)

# 多级缓存
def get_data(key):
    # 先查L1缓存
    data = get_from_l1(key)
    if data:
        return data

    # 再查L2缓存
    data = get_from_l2(key)
    if data:
        # 回填L1缓存
        l1_cache[key] = data
        return data

    # 最后查数据库
    data = query_from_database(key)

    # 回填L1和L2缓存
    l1_cache[key] = data
    r.setex(key, 3600, data)

    return data

14.5.2 缓存预热

Python
def warm_up_cache():
    # 预加载热点数据
    hot_keys = get_hot_keys()

    for key in hot_keys:
        data = query_from_database(key)
        r.setex(key, 3600, data)

    print(f"Warmed up {len(hot_keys)} cache entries")

warm_up_cache()

14.6 并发优化

14.6.1 多线程

Python
import threading  # 线程池/多线程:并发执行任务
import time

def process_task(task_id):
    print(f"Processing task {task_id}")
    time.sleep(1)
    print(f"Task {task_id} completed")

# 创建多个线程
threads = []
for i in range(5):
    t = threading.Thread(target=process_task, args=(i,))
    t.start()
    threads.append(t)

# 等待所有线程完成
for t in threads:
    t.join()

14.6.2 多进程

Python
import multiprocessing
import time

def process_task(task_id):
    print(f"Processing task {task_id}")
    time.sleep(1)
    print(f"Task {task_id} completed")

# 创建多个进程
processes = []
for i in range(5):
    p = multiprocessing.Process(target=process_task, args=(i,))
    p.start()
    processes.append(p)

# 等待所有进程完成
for p in processes:
    p.join()

14.6.3 异步IO

Python
import asyncio
import aiohttp

async def fetch_url(url):  # async def定义异步函数;用await调用
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()  # await等待异步操作完成

async def main():
    urls = [
        'http://example.com',
        'http://example.org',
        'http://example.net'
    ]

    # 并发请求
    tasks = [fetch_url(url) for url in urls]
    results = await asyncio.gather(*tasks)

    return results

# 运行异步任务
results = asyncio.run(main())  # asyncio.run()启动异步事件循环

14.7 压力测试

14.7.1 Locust

Python
from locust import HttpUser, task, between

class WebsiteUser(HttpUser):
    wait_time = between(1, 5)

    @task
    def index(self):
        self.client.get("/")

    @task(3)
    def about(self):
        self.client.get("/about")

14.7.2 JMeter

JMeter是一个开源的压力测试工具,支持多种协议。

14.7.3 Apache Bench

Bash
# 安装Apache Bench
sudo apt-get install apache2-utils

# 压力测试
ab -n 1000 -c 100 http://example.com/

14.8 实战练习

练习1:优化一个慢查询

优化一个慢查询: 1. 分析查询计划 2. 添加索引 3. 优化查询语句 4. 验证优化效果

练习2:优化一个API接口

优化一个API接口: 1. 分析性能瓶颈 2. 优化代码 3. 添加缓存 4. 压力测试

练习3:优化一个系统

优化一个系统: 1. 分析系统性能 2. 优化数据库 3. 优化缓存 4. 优化并发

14.9 面试准备

常见面试题

  1. 如何进行性能分析?
  2. 如何优化数据库查询?
  3. 如何优化缓存?
  4. 多线程和多进程的区别?
  5. 如何进行压力测试?

项目经验准备

准备一个性能优化项目: - 优化前的问题 - 优化方案 - 优化效果 - 经验总结

14.10 总结

本章介绍了性能优化,包括性能分析、代码优化、数据库优化、缓存优化、并发优化和压力测试。性能优化是提升系统质量的重要手段。

关键要点

  1. 性能优化需要先测量后优化
  2. 性能分析包括响应时间、吞吐量、资源使用率
  3. 代码优化包括算法优化、数据结构优化、缓存优化
  4. 数据库优化包括查询优化、索引优化、连接优化
  5. 并发优化包括多线程、多进程、异步IO

下一步

下一章将深入学习高可用架构,包括容灾备份、多活架构等内容。