跳转至

10 - 性能基准测试

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

性能基准测试

全面评估和对比模型性能

📖 章节概述

本章将深入探讨性能基准测试的方法,包括基准测试、性能对比和优化建议等内容。这些技术可以帮助你全面评估和优化DeepSeek R1的性能。

🎯 学习目标

完成本章后,你将能够:

  • 掌握性能基准测试的方法
  • 了解性能对比的技巧
  • 实现自动化测试框架
  • 能够提出有针对性的优化建议

1. 基准测试概述

1.1 测试维度

性能维度: - 推理速度:tokens/秒 - 延迟:响应时间 - 吞吐量:并发处理能力 - 资源使用:GPU/CPU/内存

质量维度: - 准确性:任务完成度 - 一致性:输出稳定性 - 多样性:输出丰富度 - 相关性:内容相关性

1.2 测试环境

Python
import torch
import psutil
import platform
import subprocess

class TestEnvironment:
    """
    测试环境信息
    """
    def __init__(self):
        self.info = self._collect_info()

    def _collect_info(self):
        """
        收集环境信息
        """
        info = {
            "system": {
                "os": platform.system(),
                "os_version": platform.version(),
                "python_version": platform.python_version(),
                "cpu": platform.processor(),
                "cpu_count": psutil.cpu_count(),
                "memory_total": psutil.virtual_memory().total / 1e9
            },
            "gpu": self._get_gpu_info(),
            "pytorch": {
                "version": torch.__version__,
                "cuda_available": torch.cuda.is_available(),
                "cuda_version": torch.version.cuda if torch.cuda.is_available() else None
            }
        }

        return info

    def _get_gpu_info(self):
        """
        获取GPU信息
        """
        if not torch.cuda.is_available():
            return {"available": False}

        gpu_info = {"available": True, "gpus": []}

        for i in range(torch.cuda.device_count()):
            props = torch.cuda.get_device_properties(i)
            gpu_info["gpus"].append({
                "id": i,
                "name": props.name,
                "memory_total": props.total_memory / 1e9,
                "multi_processor_count": props.multi_processor_count
            })

        return gpu_info

    def print_info(self):
        """
        打印环境信息
        """
        print("=" * 50)
        print("测试环境信息")
        print("=" * 50)

        print("\n系统信息:")
        for key, value in self.info["system"].items():
            print(f"  {key}: {value}")

        print("\nGPU信息:")
        if self.info["gpu"]["available"]:
            for gpu in self.info["gpu"]["gpus"]:
                print(f"  GPU {gpu['id']}: {gpu['name']}")
                print(f"    内存: {gpu['memory_total']:.2f} GB")
                print(f"    多处理器数: {gpu['multi_processor_count']}")
        else:
            print("  不可用")

        print("\nPyTorch信息:")
        for key, value in self.info["pytorch"].items():
            print(f"  {key}: {value}")

        print("=" * 50)

# 使用示例
# env = TestEnvironment()
# env.print_info()

2. 基准测试

2.1 推理速度测试

Python
import time
import numpy as np

class InferenceSpeedBenchmark:
    """
    推理速度基准测试
    """
    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

    def benchmark_inference_speed(self, prompts, params, warmup_runs=3,
                                 test_runs=10):
        """
        基准测试推理速度

        Args:
            prompts: 测试提示词列表
            params: 生成参数
            warmup_runs: 预热运行次数
            test_runs: 测试运行次数
        """
        results = {
            "latencies": [],
            "tokens_per_second": [],
            "total_tokens": []
        }

        # 预热
        print("预热中...")
        if len(prompts) == 0:
            raise ValueError("prompts列表不能为空")
        for i in range(warmup_runs):
            prompt = prompts[i % len(prompts)]  # 使用取模避免索引越界
            input_ids = self.tokenizer.encode(prompt, return_tensors="pt")
            _ = self.model.generate(input_ids, **params)

        # 测试
        print("测试中...")
        for i in range(test_runs):
            prompt = prompts[i % len(prompts)]
            input_ids = self.tokenizer.encode(prompt, return_tensors="pt")
            input_length = input_ids.shape[1]

            # 记录时间
            start_time = time.time()
            outputs = self.model.generate(input_ids, **params)
            end_time = time.time()

            # 计算指标
            latency = end_time - start_time
            output_length = outputs.shape[1]
            generated_tokens = output_length - input_length
            tokens_per_second = generated_tokens / latency

            results["latencies"].append(latency)
            results["tokens_per_second"].append(tokens_per_second)
            results["total_tokens"].append(generated_tokens)

            print(f"Run {i+1}: {latency:.4f}s, {tokens_per_second:.2f} tokens/s")

        # 计算统计信息
        stats = self._calculate_stats(results)

        return stats

    def _calculate_stats(self, results):
        """
        计算统计信息
        """
        stats = {}

        for key, values in results.items():
            stats[key] = {
                "mean": np.mean(values),
                "std": np.std(values),
                "min": np.min(values),
                "max": np.max(values),
                "median": np.median(values),
                "p95": np.percentile(values, 95),
                "p99": np.percentile(values, 99)
            }

        return stats

    def print_stats(self, stats):
        """
        打印统计信息
        """
        print("\n" + "=" * 50)
        print("推理速度统计")
        print("=" * 50)

        for metric, values in stats.items():
            print(f"\n{metric}:")
            print(f"  平均: {values['mean']:.4f}")
            print(f"  标准差: {values['std']:.4f}")
            print(f"  最小: {values['min']:.4f}")
            print(f"  最大: {values['max']:.4f}")
            print(f"  中位数: {values['median']:.4f}")
            print(f"  P95: {values['p95']:.4f}")
            print(f"  P99: {values['p99']:.4f}")

        print("=" * 50)

# 使用示例
# benchmark = InferenceSpeedBenchmark(model, tokenizer)
# prompts = ["请介绍一下人工智能", "什么是机器学习", "深度学习的应用"]
# params = {"max_length": 100, "do_sample": True, "temperature": 0.7}
# stats = benchmark.benchmark_inference_speed(prompts, params)
# benchmark.print_stats(stats)

2.2 吞吐量测试

Python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import numpy as np

class ThroughputBenchmark:
    """
    吞吐量基准测试
    """
    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

    def benchmark_throughput(self, prompts, params, num_requests=100,
                           concurrency=10):
        """
        基准测试吞吐量

        Args:
            prompts: 测试提示词列表
            params: 生成参数
            num_requests: 请求数量
            concurrency: 并发数
        """
        results = {
            "latencies": [],
            "errors": []
        }

        def generate_text(prompt):
            """
            生成文本
            """
            try:  # try/except捕获异常
                start_time = time.time()
                input_ids = self.tokenizer.encode(prompt, return_tensors="pt")
                outputs = self.model.generate(input_ids, **params)
                latency = time.time() - start_time
                return {"latency": latency, "success": True}
            except Exception as e:
                return {"latency": 0, "success": False, "error": str(e)}

        # 并发执行
        start_time = time.time()

        with ThreadPoolExecutor(max_workers=concurrency) as executor:
            futures = []
            for i in range(num_requests):
                prompt = prompts[i % len(prompts)]
                future = executor.submit(generate_text, prompt)
                futures.append(future)

            for future in as_completed(futures):
                result = future.result()
                if result["success"]:
                    results["latencies"].append(result["latency"])
                else:
                    results["errors"].append(result["error"])

        total_time = time.time() - start_time

        # 计算统计信息
        stats = {
            "total_requests": num_requests,
            "successful_requests": len(results["latencies"]),
            "failed_requests": len(results["errors"]),
            "total_time": total_time,
            "throughput": num_requests / total_time,
            "avg_latency": np.mean(results["latencies"]) if results["latencies"] else 0,
            "p50_latency": np.percentile(results["latencies"], 50) if results["latencies"] else 0,
            "p95_latency": np.percentile(results["latencies"], 95) if results["latencies"] else 0,
            "p99_latency": np.percentile(results["latencies"], 99) if results["latencies"] else 0,
            "error_rate": len(results["errors"]) / num_requests
        }

        return stats

    def print_stats(self, stats):
        """
        打印统计信息
        """
        print("\n" + "=" * 50)
        print("吞吐量统计")
        print("=" * 50)

        print(f"\n总请求数: {stats['total_requests']}")
        print(f"成功请求数: {stats['successful_requests']}")
        print(f"失败请求数: {stats['failed_requests']}")
        print(f"总时间: {stats['total_time']:.4f}s")
        print(f"吞吐量: {stats['throughput']:.2f} requests/s")
        print(f"平均延迟: {stats['avg_latency']:.4f}s")
        print(f"P50延迟: {stats['p50_latency']:.4f}s")
        print(f"P95延迟: {stats['p95_latency']:.4f}s")
        print(f"P99延迟: {stats['p99_latency']:.4f}s")
        print(f"错误率: {stats['error_rate']:.2%}")

        print("=" * 50)

# 使用示例
# benchmark = ThroughputBenchmark(model, tokenizer)
# prompts = ["请介绍一下人工智能", "什么是机器学习", "深度学习的应用"]
# params = {"max_length": 100, "do_sample": True, "temperature": 0.7}
# stats = benchmark.benchmark_throughput(prompts, params, num_requests=100, concurrency=10)
# benchmark.print_stats(stats)

2.3 资源使用测试

Python
import torch
import psutil
import time
import numpy as np

class ResourceUsageBenchmark:
    """
    资源使用基准测试
    """
    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

    def benchmark_resource_usage(self, prompts, params,
                                sample_interval=0.1):
        """
        基准测试资源使用

        Args:
            prompts: 测试提示词列表
            params: 生成参数
            sample_interval: 采样间隔(秒)
        """
        results = {
            "gpu_memory": [],
            "cpu_usage": [],
            "timestamps": []
        }

        for i, prompt in enumerate(prompts):  # enumerate同时获取索引和元素
            input_ids = self.tokenizer.encode(prompt, return_tensors="pt")

            # 记录资源使用
            start_time = time.time()

            while True:
                # 记录GPU内存
                if torch.cuda.is_available():
                    gpu_memory = torch.cuda.memory_allocated() / 1e9
                else:
                    gpu_memory = 0

                # 记录CPU使用率
                cpu_usage = psutil.cpu_percent()

                # 记录时间戳
                timestamp = time.time() - start_time

                results["gpu_memory"].append(gpu_memory)
                results["cpu_usage"].append(cpu_usage)
                results["timestamps"].append(timestamp)

                # 生成文本
                outputs = self.model.generate(input_ids, **params)

                # 检查是否完成
                if outputs.shape[1] > input_ids.shape[1]:
                    break

                time.sleep(sample_interval)

        # 计算统计信息
        stats = {
            "gpu_memory": {
                "max": max(results["gpu_memory"]),
                "min": min(results["gpu_memory"]),
                "mean": np.mean(results["gpu_memory"]),
                "std": np.std(results["gpu_memory"])
            },
            "cpu_usage": {
                "max": max(results["cpu_usage"]),
                "min": min(results["cpu_usage"]),
                "mean": np.mean(results["cpu_usage"]),
                "std": np.std(results["cpu_usage"])
            },
            "duration": max(results["timestamps"])
        }

        return stats, results

    def print_stats(self, stats):
        """
        打印统计信息
        """
        print("\n" + "=" * 50)
        print("资源使用统计")
        print("=" * 50)

        print("\nGPU内存使用:")
        print(f"  最大: {stats['gpu_memory']['max']:.2f} GB")
        print(f"  最小: {stats['gpu_memory']['min']:.2f} GB")
        print(f"  平均: {stats['gpu_memory']['mean']:.2f} GB")
        print(f"  标准差: {stats['gpu_memory']['std']:.2f} GB")

        print("\nCPU使用率:")
        print(f"  最大: {stats['cpu_usage']['max']:.2f}%")
        print(f"  最小: {stats['cpu_usage']['min']:.2f}%")
        print(f"  平均: {stats['cpu_usage']['mean']:.2f}%")
        print(f"  标准差: {stats['cpu_usage']['std']:.2f}%")

        print(f"\n总时长: {stats['duration']:.4f}s")

        print("=" * 50)

# 使用示例
# benchmark = ResourceUsageBenchmark(model, tokenizer)
# prompts = ["请介绍一下人工智能", "什么是机器学习", "深度学习的应用"]
# params = {"max_length": 100, "do_sample": True, "temperature": 0.7}
# stats, detailed_results = benchmark.benchmark_resource_usage(prompts, params)
# benchmark.print_stats(stats)

3. 性能对比

3.1 模型对比

Python
import pandas as pd
import matplotlib.pyplot as plt

class ModelComparator:
    """
    模型对比器
    """
    def __init__(self):
        self.results = {}

    def add_model_result(self, model_name, stats):
        """
        添加模型结果

        Args:
            model_name: 模型名称
            stats: 统计信息
        """
        self.results[model_name] = stats

    def compare_models(self):
        """
        对比模型
        """
        # 创建对比表格
        comparison_data = []

        for model_name, stats in self.results.items():
            comparison_data.append({
                "Model": model_name,
                "Avg Latency (s)": stats.get("avg_latency", 0),
                "P95 Latency (s)": stats.get("p95_latency", 0),
                "Throughput (req/s)": stats.get("throughput", 0),
                "Max GPU Memory (GB)": stats.get("gpu_memory", {}).get("max", 0),
                "Error Rate (%)": stats.get("error_rate", 0) * 100
            })

        df = pd.DataFrame(comparison_data)

        return df

    def plot_comparison(self, save_path=None):
        """
        绘制对比图
        """
        df = self.compare_models()

        fig, axes = plt.subplots(2, 2, figsize=(15, 10))

        # 延迟对比
        df.plot(x="Model", y=["Avg Latency (s)", "P95 Latency (s)"],
                kind="bar", ax=axes[0, 0])
        axes[0, 0].set_title("Latency Comparison")
        axes[0, 0].set_ylabel("Latency (s)")

        # 吞吐量对比
        df.plot(x="Model", y="Throughput (req/s)", kind="bar", ax=axes[0, 1])
        axes[0, 1].set_title("Throughput Comparison")
        axes[0, 1].set_ylabel("Throughput (req/s)")

        # GPU内存对比
        df.plot(x="Model", y="Max GPU Memory (GB)", kind="bar", ax=axes[1, 0])
        axes[1, 0].set_title("GPU Memory Comparison")
        axes[1, 0].set_ylabel("GPU Memory (GB)")

        # 错误率对比
        df.plot(x="Model", y="Error Rate (%)", kind="bar", ax=axes[1, 1])
        axes[1, 1].set_title("Error Rate Comparison")
        axes[1, 1].set_ylabel("Error Rate (%)")

        plt.tight_layout()

        if save_path:
            plt.savefig(save_path)

        plt.show()

    def generate_report(self):
        """
        生成对比报告
        """
        df = self.compare_models()

        report = "=" * 50
        report += "\n模型性能对比报告\n"
        report += "=" * 50

        report += "\n\n性能对比表格:\n"
        report += df.to_string(index=False)

        # 找出最佳模型
        best_throughput = df.loc[df["Throughput (req/s)"].idxmax()]
        best_latency = df.loc[df["Avg Latency (s)"].idxmin()]
        best_memory = df.loc[df["Max GPU Memory (GB)"].idxmin()]

        report += "\n\n最佳性能模型:\n"
        report += f"  最高吞吐量: {best_throughput['Model']} ({best_throughput['Throughput (req/s)']:.2f} req/s)\n"
        report += f"  最低延迟: {best_latency['Model']} ({best_latency['Avg Latency (s)']:.4f}s)\n"
        report += f"  最低内存: {best_memory['Model']} ({best_memory['Max GPU Memory (GB)']:.2f} GB)\n"

        report += "\n" + "=" * 50

        return report

# 使用示例
# comparator = ModelComparator()
# comparator.add_model_result("Model A", stats_a)
# comparator.add_model_result("Model B", stats_b)
# df = comparator.compare_models()
# comparator.plot_comparison(save_path="comparison.png")
# report = comparator.generate_report()
# print(report)

3.2 参数对比

Python
class ParameterComparator:
    """
    参数对比器
    """
    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer
        self.results = {}

    def compare_parameters(self, prompts, param_configs):
        """
        对比不同参数配置

        Args:
            prompts: 测试提示词列表
            param_configs: 参数配置列表
        """
        for config_name, params in param_configs.items():
            print(f"\n测试配置: {config_name}")

            # 运行基准测试
            benchmark = InferenceSpeedBenchmark(self.model, self.tokenizer)
            stats = benchmark.benchmark_inference_speed(prompts, params)

            self.results[config_name] = stats

        return self.results

    def plot_parameter_comparison(self, metric="tokens_per_second",
                                save_path=None):
        """
        绘制参数对比图
        """
        config_names = list(self.results.keys())
        values = [self.results[name][metric]["mean"] for name in config_names]
        errors = [self.results[name][metric]["std"] for name in config_names]

        plt.figure(figsize=(10, 6))
        plt.bar(config_names, values, yerr=errors, capsize=5)
        plt.xlabel("Configuration")
        plt.ylabel(metric.replace("_", " ").title())
        plt.title("Parameter Configuration Comparison")
        plt.xticks(rotation=45)
        plt.tight_layout()

        if save_path:
            plt.savefig(save_path)

        plt.show()

# 使用示例
# comparator = ParameterComparator(model, tokenizer)
# param_configs = {
#     "Low Temperature": {"max_length": 100, "temperature": 0.3, "do_sample": True},
#     "Medium Temperature": {"max_length": 100, "temperature": 0.7, "do_sample": True},
#     "High Temperature": {"max_length": 100, "temperature": 0.9, "do_sample": True}
# }
# results = comparator.compare_parameters(prompts, param_configs)
# comparator.plot_parameter_comparison("tokens_per_second")

4. 优化建议

4.1 性能分析

Python
class PerformanceAnalyzer:
    """
    性能分析器
    """
    def __init__(self, stats):
        self.stats = stats

    def analyze_bottlenecks(self):
        """
        分析性能瓶颈
        """
        bottlenecks = []

        # 分析延迟
        if self.stats["avg_latency"] > 2.0:
            bottlenecks.append({
                "type": "high_latency",
                "severity": "high",
                "description": "平均延迟过高",
                "suggestion": "考虑使用量化、剪枝或批处理优化"
            })

        # 分析吞吐量
        if self.stats["throughput"] < 10:
            bottlenecks.append({
                "type": "low_throughput",
                "severity": "high",
                "description": "吞吐量过低",
                "suggestion": "增加并发数或使用分布式推理"
            })

        # 分析GPU内存
        if self.stats["gpu_memory"]["max"] > 20:
            bottlenecks.append({
                "type": "high_memory",
                "severity": "medium",
                "description": "GPU内存使用过高",
                "suggestion": "使用INT4/INT8量化或梯度检查点"
            })

        # 分析错误率
        if self.stats["error_rate"] > 0.05:
            bottlenecks.append({
                "type": "high_error_rate",
                "severity": "high",
                "description": "错误率过高",
                "suggestion": "检查模型配置和输入数据"
            })

        return bottlenecks

    def generate_optimization_suggestions(self):
        """
        生成优化建议
        """
        suggestions = []

        # 量化建议
        if self.stats["gpu_memory"]["max"] > 15:
            suggestions.append({
                "category": "量化",
                "priority": "high",
                "suggestion": "使用INT4或INT8量化减少内存占用",
                "expected_improvement": "内存使用减少50-75%"
            })

        # 批处理建议
        if self.stats["throughput"] < 20:
            suggestions.append({
                "category": "批处理",
                "priority": "high",
                "suggestion": "增加批处理大小以提高吞吐量",
                "expected_improvement": "吞吐量提升2-4倍"
            })

        # 缓存建议
        if self.stats["avg_latency"] > 1.0:
            suggestions.append({
                "category": "缓存",
                "priority": "medium",
                "suggestion": "实现KV缓存或结果缓存",
                "expected_improvement": "延迟降低30-50%"
            })

        # 分布式建议
        if self.stats["throughput"] < 10:
            suggestions.append({
                "category": "分布式",
                "priority": "medium",
                "suggestion": "使用模型并行或数据并行",
                "expected_improvement": "吞吐量提升2-8倍"
            })

        return suggestions

    def generate_report(self):
        """
        生成分析报告
        """
        bottlenecks = self.analyze_bottlenecks()
        suggestions = self.generate_optimization_suggestions()

        report = "=" * 50
        report += "\n性能分析报告\n"
        report += "=" * 50

        report += "\n\n性能瓶颈:\n"
        for i, bottleneck in enumerate(bottlenecks, 1):
            report += f"\n{i}. {bottleneck['description']}\n"
            report += f"   严重程度: {bottleneck['severity']}\n"
            report += f"   建议: {bottleneck['suggestion']}\n"

        report += "\n\n优化建议:\n"
        for i, suggestion in enumerate(suggestions, 1):
            report += f"\n{i}. {suggestion['category']}优化\n"
            report += f"   优先级: {suggestion['priority']}\n"
            report += f"   建议: {suggestion['suggestion']}\n"
            report += f"   预期改进: {suggestion['expected_improvement']}\n"

        report += "\n" + "=" * 50

        return report

# 使用示例
# analyzer = PerformanceAnalyzer(stats)
# bottlenecks = analyzer.analyze_bottlenecks()
# suggestions = analyzer.generate_optimization_suggestions()
# report = analyzer.generate_report()
# print(report)

5. 练习题

基础练习

  1. 实现推理速度测试

    Python
    # TODO: 实现推理速度测试
    class InferenceSpeedTest:
        def __init__(self, model, tokenizer):
            # 你的代码
            pass
    
        def test(self, prompts, params):
            # 你的代码
            pass
    

  2. 实现吞吐量测试

    Python
    # TODO: 实现吞吐量测试
    class ThroughputTest:
        def __init__(self, model, tokenizer):
            # 你的代码
            pass
    
        def test(self, prompts, params, num_requests):
            # 你的代码
            pass
    

进阶练习

  1. 实现性能对比

    Python
    # TODO: 实现性能对比
    class PerformanceComparator:
        def __init__(self):
            # 你的代码
            pass
    
        def compare(self, models, test_cases):
            # 你的代码
            pass
    
        def plot_comparison(self, results):
            # 你的代码
            pass
    

  2. 实现优化建议

    Python
    # TODO: 实现优化建议生成器
    class OptimizationSuggester:
        def __init__(self, stats):
            # 你的代码
            pass
    
        def analyze(self):
            # 你的代码
            pass
    
        def suggest(self):
            # 你的代码
            pass
    

项目练习

  1. 创建完整的基准测试框架
  2. 支持多种测试类型
  3. 自动生成报告
  4. 可视化对比结果

6. 最佳实践

✅ 推荐做法

  1. 全面测试
  2. 测试多个维度
  3. 使用多样化数据
  4. 重复测试确保稳定性

  5. 准确记录

  6. 记录环境信息
  7. 保存测试数据
  8. 文档化测试过程

  9. 持续监控

  10. 定期运行测试
  11. 监控性能变化
  12. 及时发现问题

❌ 避免做法

  1. 单一测试
  2. 不要只测试一个场景
  3. 考虑多种使用情况
  4. 测试边界条件

  5. 忽略环境

  6. 记录测试环境
  7. 确保环境一致
  8. 控制变量

  9. 过度优化

  10. 根据实际需求优化
  11. 考虑成本效益
  12. 平衡性能和质量

7. 总结

本章介绍了性能基准测试的核心技术:

  • 基准测试: 推理速度、吞吐量、资源使用
  • 性能对比: 模型对比、参数对比
  • 优化建议: 瓶颈分析、优化建议

这些技术可以帮助你全面评估和优化DeepSeek R1的性能。

8. 下一步

继续学习11-实际应用案例,了解DeepSeek R1的实际应用和最佳实践。