🔧 LLM工程化实践¶
学习目标:掌握LLM部署特殊挑战、推理优化、量化部署、Prompt管理、评估体系和LLMOps全流程
预计时长:15-18小时
前置知识:LLM基础知识、模型部署基础(第2章)、Docker基础
📋 本章概览¶
大语言模型(LLM)的工程化与传统ML模型部署有本质区别——模型规模大(7B-70B+参数)、推理成本高、延迟要求特殊(流式输出)。本章专注LLM的部署优化、量化压缩、Prompt管理和完整的LLMOps流水线。
LLMOps全流程:
数据准备 → 微调训练 → 评估验证 → 量化压缩 → 部署上线 → 监控优化
│ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼
数据清洗 SFT/RLHF 自动评估 GPTQ/AWQ vLLM/TGI 成本监控
数据标注 LoRA/QLoRA 人工评估 GGUF Triton 质量监控
版本管理 实验跟踪 Red Team INT4/INT8 K8s 漂移检测
上图展示了LLMOps的整体流程:从数据准备(清洗、标注、版本管理),到SFT/RLHF与LoRA/QLoRA微调,再经自动评估、人工评估与红队测试,随后进行GPTQ/AWQ/GGUF量化压缩,通过vLLM/TGI/Triton等引擎部署上线,最后对成本、质量和漂移进行持续监控。在LLM工程化中,还需要统筹模型与Prompt的版本管理、实验跟踪和API集成等关键组件。
一、LLM部署特殊挑战¶
1.1 挑战分析¶
"""
LLM部署核心挑战量化分析
"""
import math
def estimate_llm_resources(
num_params_billion: float,
precision: str = "fp16",
batch_size: int = 1,
seq_len: int = 2048,
kv_cache: bool = True
) -> dict:
"""
估算LLM部署资源需求
Args:
num_params_billion: 模型参数量(十亿)
precision: 精度 fp32/fp16/int8/int4
batch_size: 批大小
seq_len: 序列长度
kv_cache: 是否使用KV Cache
"""
precision_bytes = {
"fp32": 4, "fp16": 2, "bf16": 2, "int8": 1, "int4": 0.5
}
bytes_per_param = precision_bytes[precision]
num_params = num_params_billion * 1e9
# 模型权重显存
model_memory_gb = num_params * bytes_per_param / (1024 ** 3)
    # KV Cache显存估算(简化计算)
    # 每层每token需缓存K和V: 2 * hidden_dim * seq_len * batch_size * bytes
    # 层数与隐藏维度均为粗略估计,实际应以模型config.json为准
    # 利用 num_params ≈ 12 * num_layers * hidden_dim^2 的近似关系反推hidden_dim
    num_layers = min(int(num_params_billion * 4), 80)  # 粗略估计,大模型层数封顶在80左右
    hidden_dim = int(math.sqrt(num_params / (12 * num_layers)))
    # 注:现代模型多用GQA/MQA,实际KV Cache会更小;此处按MHA估算上限
if kv_cache:
kv_memory_gb = (
2 * num_layers * hidden_dim * seq_len * batch_size * 2 # KV Cache用fp16
) / (1024 ** 3)
else:
kv_memory_gb = 0
# 激活值显存(推理时较小)
activation_memory_gb = model_memory_gb * 0.05 * batch_size
total_memory_gb = model_memory_gb + kv_memory_gb + activation_memory_gb
# GPU选型建议
gpu_suggestions = []
if total_memory_gb <= 16:
gpu_suggestions.append("RTX 4080 (16GB)")
if total_memory_gb <= 24:
gpu_suggestions.append("RTX 4090 (24GB)")
if total_memory_gb <= 48:
gpu_suggestions.append("A6000 (48GB)")
    if total_memory_gb <= 80:
        gpu_suggestions.append("A100 80GB / H100 80GB")
    if total_memory_gb > 80:
        num_gpus = math.ceil(total_memory_gb / 80)
        gpu_suggestions.append(f"{num_gpus}x A100/H100 80GB (Tensor Parallel)")
return {
"模型参数量": f"{num_params_billion}B",
"精度": precision,
"批大小": batch_size,
"序列长度": seq_len,
"模型权重显存": f"{model_memory_gb:.1f} GB",
"KV Cache显存": f"{kv_memory_gb:.1f} GB",
"激活值显存": f"{activation_memory_gb:.1f} GB",
"总显存需求": f"{total_memory_gb:.1f} GB",
"GPU建议": gpu_suggestions
}
# 常见模型资源估算
models = [
("Qwen2.5-7B", 7, "fp16"),
("Qwen2.5-7B", 7, "int4"),
("Llama-3.1-70B", 70, "fp16"),
("Llama-3.1-70B", 70, "int4"),
("DeepSeek-V3", 671, "fp16"), # MoE模型,激活参数少
]
for name, params, precision in models:
result = estimate_llm_resources(params, precision, batch_size=1, seq_len=4096)
print(f"\n{'='*50}")
print(f"模型: {name} ({precision})")
for k, v in result.items():
print(f" {k}: {v}")
1.2 推理优化核心技术¶
"""
LLM推理关键优化技术
"""
class LLMInferenceOptimizations:
"""LLM推理优化技术总结"""
@staticmethod # @staticmethod静态方法,不需要实例
def kv_cache():
"""KV Cache - 避免自回归推理的重复计算"""
explanation = """
原理:
- Transformer解码时,每生成一个token需要计算所有历史token的attention
- KV Cache缓存已计算的Key和Value,新token只需计算增量
效果:
- 将 O(n²) 的计算降为 O(n)
- 以空间换时间
挑战:
- 长序列KV Cache显存占用大
- 需要高效的内存管理
"""
return explanation
@staticmethod
def paged_attention():
"""PagedAttention (vLLM核心) - 高效KV Cache管理"""
explanation = """
原理(来自OS虚拟内存管理思想):
- 将KV Cache分成固定大小的Page(块)
- 使用页表(Page Table)管理非连续内存
- 不同请求可以共享相同prefix的KV Cache页
优点:
- 减少内存碎片,KV Cache利用率近100%
- 支持Copy-on-Write,节省shared prefix的内存
- 相比朴素HuggingFace Transformers实现,高并发下吞吐量可提升数倍(官方基准最高约24x)
"""
return explanation
@staticmethod
def continuous_batching():
"""Continuous Batching - 持续批处理"""
explanation = """
传统Static Batching:
- 一批请求必须等所有请求完成才能返回
- 短请求被长请求拖慢
Continuous Batching:
- 请求完成后立即返回,新请求随时加入
- 每个iteration级别调度
- 充分利用GPU计算资源
效果:吞吐量提升2-10x
"""
return explanation
@staticmethod
def speculative_decoding():
"""Speculative Decoding - 投机采样"""
explanation = """
原理:
- 用一个小模型(draft model)快速生成k个候选token
- 用大模型(target model)并行验证这k个token
- 接受正确的token,拒绝错误的
效果:
- 不影响输出质量(数学上等价)
- 可加速2-3x(取决于draft model的accept rate)
适用:大模型推理延迟敏感的场景
"""
return explanation
# 打印所有优化技术
opts = LLMInferenceOptimizations()
for method_name in ["kv_cache", "paged_attention", "continuous_batching", "speculative_decoding"]:
# getattr通过字符串名动态获取方法引用,等价于opts.kv_cache等,实现遍历调用多个方法而无需硬编码
method = getattr(opts, method_name) # hasattr/getattr/setattr动态操作对象属性
print(f"\n{'='*50}")
print(f"📌 {method_name}")
print(method())
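为了更直观地理解KV Cache"以空间换时间"的思想,下面给出一个极简的NumPy草图:单头注意力,只演示增量解码时每步的计算量;所有维度均为随意假设,与任何真实推理引擎的实现无关。
import numpy as np

def attention_step(q_new, K_cache, V_cache, k_new, v_new):
    """增量解码一步:只为新token计算K/V并追加到缓存,历史token无需重算"""
    K_cache = np.concatenate([K_cache, k_new[None, :]], axis=0)  # (t+1, d)
    V_cache = np.concatenate([V_cache, v_new[None, :]], axis=0)
    scores = K_cache @ q_new / np.sqrt(q_new.shape[-1])          # 只算一行注意力分数
    weights = np.exp(scores - scores.max())
    weights /= weights.sum()
    return weights @ V_cache, K_cache, V_cache                   # 输出维度 (d,)

# 模拟自回归生成3步:每步计算量只与"1个新token × 历史长度"相关,而非整段序列的平方
d = 8
K_cache, V_cache = np.zeros((0, d)), np.zeros((0, d))
for step in range(3):
    q, k, v = np.random.randn(d), np.random.randn(d), np.random.randn(d)
    out, K_cache, V_cache = attention_step(q, K_cache, V_cache, k, v)
    print(f"step {step}: 缓存长度={K_cache.shape[0]}, 输出shape={out.shape}")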
二、vLLM / TGI / Ollama 部署实践¶
2.1 vLLM部署¶
"""
vLLM - 高吞吐量LLM推理引擎
核心特性:PagedAttention、Continuous Batching、Tensor Parallel
"""
# ===== 1. 安装 =====
# pip install vllm
# ===== 2. Python API使用 =====
from vllm import LLM, SamplingParams
def vllm_inference():
"""vLLM离线推理"""
# 加载模型
llm = LLM(
model="Qwen/Qwen2.5-7B-Instruct",
tensor_parallel_size=1, # GPU数量
gpu_memory_utilization=0.9, # GPU显存利用率
max_model_len=4096, # 最大序列长度
dtype="auto", # 自动选择精度
# quantization="awq", # 使用AWQ量化模型
)
# 采样参数
sampling_params = SamplingParams(
temperature=0.7,
top_p=0.9,
top_k=50,
max_tokens=512,
repetition_penalty=1.1,
stop=["<|endoftext|>", "<|im_end|>"]
)
# 批量推理
prompts = [
"请解释什么是PagedAttention?",
"如何优化LLM推理延迟?",
"什么是KV Cache?"
]
outputs = llm.generate(prompts, sampling_params)
    for output in outputs:
        print(f"Prompt: {output.prompt[:50]}...")
        print(f"Generated: {output.outputs[0].text[:200]}...")
        # metrics中的时间为绝对时间戳,需用finished_time - arrival_time得到耗时
        if output.metrics and output.metrics.finished_time:
            elapsed = output.metrics.finished_time - output.metrics.arrival_time
            print(f"Tokens/s: {len(output.outputs[0].token_ids) / elapsed:.1f}")
        print()
# vllm_inference() # 取消注释运行
# ===== 3. OpenAI兼容API服务 =====
"""
启动vLLM服务器(命令行):
python -m vllm.entrypoints.openai.api_server \
--model Qwen/Qwen2.5-7B-Instruct \
--served-model-name qwen2.5-7b \
--tensor-parallel-size 1 \
--gpu-memory-utilization 0.9 \
--max-model-len 4096 \
--port 8000 \
--host 0.0.0.0
"""
# ===== 4. 客户端调用 =====
from openai import OpenAI
def call_vllm_api():
"""调用vLLM的OpenAI兼容API"""
client = OpenAI(
base_url="http://localhost:8000/v1",
api_key="dummy" # vLLM不需要真实key
)
# Chat Completion
response = client.chat.completions.create(
model="qwen2.5-7b",
messages=[
{"role": "system", "content": "你是一个AI助手。"},
{"role": "user", "content": "解释MLOps的核心概念。"}
],
temperature=0.7,
max_tokens=512,
stream=True # 流式输出
)
# 流式处理
for chunk in response:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="", flush=True)
print()
# call_vllm_api() # 取消注释运行
2.2 TGI(Text Generation Inference)¶
# TGI Docker部署
# docker run --gpus all --shm-size 1g -p 8080:80 \
# -v /data/models:/data \
# ghcr.io/huggingface/text-generation-inference:latest \
# --model-id Qwen/Qwen2.5-7B-Instruct \
# --quantize awq \
# --max-input-length 4096 \
# --max-total-tokens 8192 \
# --max-batch-prefill-tokens 4096 \
# --max-concurrent-requests 128
"""TGI客户端调用"""
import requests
def call_tgi(prompt: str, max_tokens: int = 256):
"""调用TGI推理服务"""
response = requests.post(
"http://localhost:8080/generate",
json={
"inputs": prompt,
"parameters": {
"max_new_tokens": max_tokens,
"temperature": 0.7,
"top_p": 0.9,
"do_sample": True,
"return_full_text": False
}
},
headers={"Content-Type": "application/json"}
)
result = response.json()
return result[0]["generated_text"]
# 流式调用
def call_tgi_stream(prompt: str):
"""TGI流式推理"""
response = requests.post(
"http://localhost:8080/generate_stream",
json={
"inputs": prompt,
"parameters": {
"max_new_tokens": 256,
"temperature": 0.7
}
},
stream=True
)
    import json
    for line in response.iter_lines():
        if line:
            decoded = line.decode("utf-8")
            if decoded.startswith("data:"):
                data = json.loads(decoded[5:])
                if "token" in data:
                    print(data["token"]["text"], end="", flush=True)
    print()
2.3 Ollama本地部署¶
# Ollama - 最简单的本地LLM部署
# 安装(Linux/Mac)
# curl -fsSL https://ollama.ai/install.sh | sh
# 下载并运行模型
# ollama pull qwen2.5:7b
# ollama run qwen2.5:7b
# 自定义Modelfile
# FROM qwen2.5:7b
# PARAMETER temperature 0.7
# PARAMETER num_ctx 4096
# SYSTEM "你是一个专业的AI工程师助手。"
# ollama create my-assistant -f Modelfile
# ollama run my-assistant
"""Ollama API调用"""
import requests
def ollama_chat(messages: list, model: str = "qwen2.5:7b"):
"""调用Ollama Chat API"""
response = requests.post(
"http://localhost:11434/api/chat",
json={
"model": model,
"messages": messages,
"stream": False,
"options": {
"temperature": 0.7,
"num_ctx": 4096,
"num_predict": 512
}
}
)
result = response.json()
return result["message"]["content"]
def ollama_generate_stream(prompt: str, model: str = "qwen2.5:7b"):
"""Ollama流式生成"""
import json
response = requests.post(
"http://localhost:11434/api/generate",
json={
"model": model,
"prompt": prompt,
"stream": True
},
stream=True
)
full_response = ""
for line in response.iter_lines():
if line:
data = json.loads(line)
if "response" in data:
print(data["response"], end="", flush=True)
full_response += data["response"]
if data.get("done"):
break
print()
return full_response
# 使用
# result = ollama_chat([
# {"role": "user", "content": "什么是vLLM?"}
# ])
# print(result)
2.4 推理引擎对比¶
"""推理引擎选型指南"""
comparison = {
"vLLM": {
"吞吐量": "⭐⭐⭐⭐⭐ (PagedAttention)",
"延迟": "⭐⭐⭐⭐",
"易用性": "⭐⭐⭐⭐ (OpenAI兼容)",
"量化支持": "AWQ/GPTQ/INT4",
"最佳场景": "高并发生产环境",
"多模态": "支持",
"分布式": "Tensor Parallel"
},
"TGI": {
"吞吐量": "⭐⭐⭐⭐",
"延迟": "⭐⭐⭐⭐",
"易用性": "⭐⭐⭐⭐⭐ (Docker一键)",
"量化支持": "AWQ/GPTQ/BitsAndBytes",
"最佳场景": "HuggingFace生态",
"多模态": "支持",
"分布式": "Tensor Parallel"
},
"Ollama": {
"吞吐量": "⭐⭐⭐",
"延迟": "⭐⭐⭐",
"易用性": "⭐⭐⭐⭐⭐ (最简单)",
"量化支持": "GGUF (llama.cpp)",
"最佳场景": "本地开发/测试",
"多模态": "支持",
"分布式": "不支持"
},
"Triton + TensorRT-LLM": {
"吞吐量": "⭐⭐⭐⭐⭐ (最高)",
"延迟": "⭐⭐⭐⭐⭐ (最低)",
"易用性": "⭐⭐ (复杂)",
"量化支持": "FP8/INT8/INT4",
"最佳场景": "对性能要求极致",
"多模态": "支持",
"分布式": "Tensor/Pipeline Parallel"
}
}
for engine, specs in comparison.items():
print(f"\n🔹 {engine}:")
for k, v in specs.items():
print(f" {k}: {v}")
三、量化部署¶
3.1 量化方法概述¶
"""
LLM量化方法对比和实践
"""
quantization_methods = {
"GPTQ": {
"类型": "后训练量化 (PTQ)",
"原理": "逐层量化,使用Hessian信息最小化量化误差",
"精度": "INT4/INT8",
"速度": "推理快,量化过程较慢(需校准数据)",
"质量": "⭐⭐⭐⭐ 接近原始精度",
"工具": "AutoGPTQ, vLLM原生支持",
"适用": "GPU部署"
},
"AWQ": {
"类型": "后训练量化 (PTQ)",
"原理": "保护重要权重通道(Activation-aware),量化损失更小",
"精度": "INT4",
"速度": "推理快,量化更快(无需逐层校准)",
"质量": "⭐⭐⭐⭐⭐ 优于GPTQ",
"工具": "AutoAWQ, vLLM/TGI原生支持",
"适用": "GPU部署(推荐)"
},
"GGUF": {
"类型": "后训练量化 (PTQ)",
"原理": "llama.cpp格式,支持CPU+GPU混合推理",
"精度": "Q2_K到Q8_0多档",
"速度": "CPU推理最佳选择",
"质量": "⭐⭐⭐-⭐⭐⭐⭐ (取决于量化档位)",
"工具": "llama.cpp, Ollama",
"适用": "CPU部署 / 边缘设备 / 本地"
},
"BitsAndBytes": {
"类型": "动态量化",
"原理": "NF4/INT8动态量化,支持QLoRA微调",
"精度": "INT4(NF4)/INT8",
"速度": "有额外的反量化开销",
"质量": "⭐⭐⭐⭐",
"工具": "bitsandbytes + HuggingFace",
"适用": "微调 + 推理(显存受限场景)"
}
}
for method, info in quantization_methods.items():
print(f"\n{'='*40}")
print(f"📌 {method}")
for k, v in info.items():
print(f" {k}: {v}")
3.2 AWQ量化实践¶
"""
AWQ量化示例
pip install autoawq
"""
# from awq import AutoAWQForCausalLM
# from transformers import AutoTokenizer
def quantize_model_awq(
model_path: str = "Qwen/Qwen2.5-7B-Instruct",
output_path: str = "Qwen2.5-7B-Instruct-AWQ",
quant_config: dict = None
):
"""使用AWQ量化模型"""
if quant_config is None:
quant_config = {
"zero_point": True, # 零点量化
"q_group_size": 128, # 量化分组大小
"w_bit": 4, # 权重位数
"version": "GEMM" # GEMM或GEMV内核
}
# 加载模型
# model = AutoAWQForCausalLM.from_pretrained(
# model_path,
# device_map="auto",
# safetensors=True
# )
# tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
# 量化
# model.quantize(tokenizer, quant_config=quant_config)
# 保存
# model.save_quantized(output_path)
# tokenizer.save_pretrained(output_path)
print(f"Model quantized and saved to {output_path}")
print(f"Config: {quant_config}")
# quantize_model_awq()
# 使用vLLM直接加载AWQ模型
"""
python -m vllm.entrypoints.openai.api_server \
--model Qwen2.5-7B-Instruct-AWQ \
--quantization awq \
--dtype half \
--max-model-len 4096 \
--port 8000
"""
3.3 GGUF量化与CPU部署¶
"""
GGUF量化(使用llama.cpp)
适合CPU/混合推理部署
"""
# 量化流程(命令行)
"""
# 1. 安装llama.cpp
git clone https://github.com/ggerganov/llama.cpp
cd llama.cpp && make -j
# 2. 转换HuggingFace模型为GGUF
python convert_hf_to_gguf.py /path/to/model --outtype f16 --outfile model-f16.gguf
# 3. 量化
./llama-quantize model-f16.gguf model-Q4_K_M.gguf Q4_K_M
# 量化档位说明:
# Q2_K - 2bit 最小,质量较低
# Q3_K_M - 3bit 小,质量可接受
# Q4_K_M - 4bit 推荐平衡点 ⭐
# Q5_K_M - 5bit 高质量
# Q6_K - 6bit 接近原始
# Q8_0 - 8bit 几乎无损
"""
# 使用Python调用GGUF模型
"""
pip install llama-cpp-python
"""
# from llama_cpp import Llama
#
# llm = Llama(
# model_path="model-Q4_K_M.gguf",
# n_ctx=4096, # 上下文长度
# n_threads=8, # CPU线程数
# n_gpu_layers=35, # 分配到GPU的层数(0=纯CPU)
# verbose=False
# )
#
# output = llm.create_chat_completion(
# messages=[{"role": "user", "content": "什么是量化?"}],
# temperature=0.7,
# max_tokens=256
# )
# print(output["choices"][0]["message"]["content"])
四、Prompt管理与版本控制¶
4.1 Prompt模板管理系统¶
"""
Prompt版本管理系统
"""
import json
import hashlib
from datetime import datetime
from dataclasses import dataclass, field, asdict
from pathlib import Path
@dataclass
class PromptTemplate:
"""Prompt模板"""
name: str
version: str
template: str
system_prompt: str | None = None
description: str = ""
# default_factory避免可变默认值陷阱:每次实例化创建新list/新时间戳,若直接写=[]则所有实例共享同一列表
tags: list[str] = field(default_factory=list)
variables: list[str] = field(default_factory=list)
model: str = ""
temperature: float = 0.7
max_tokens: int = 512
created_at: str = field(default_factory=lambda: datetime.now().isoformat())
@property # @property将方法变为属性访问
def hash(self) -> str:
"""Prompt内容哈希"""
content = self.template + (self.system_prompt or "")
return hashlib.md5(content.encode()).hexdigest()[:8]
    def render(self, **kwargs) -> str:  # **kwargs接收任意关键字参数,用于填充模板变量
"""渲染Prompt模板"""
rendered = self.template
for key, value in kwargs.items():
rendered = rendered.replace(f"{{{{{key}}}}}", str(value))
return rendered
class PromptRegistry:
"""Prompt注册与版本管理"""
def __init__(self, storage_dir: str = "prompts"):
self.storage_dir = Path(storage_dir)
self.storage_dir.mkdir(parents=True, exist_ok=True)
self.templates: dict[str, dict[str, PromptTemplate]] = {}
self._load_all()
def register(self, template: PromptTemplate):
"""注册新Prompt"""
if template.name not in self.templates:
self.templates[template.name] = {}
self.templates[template.name][template.version] = template
self._save(template)
print(f"Registered: {template.name} v{template.version} (hash={template.hash})")
def get(self, name: str, version: str = "latest") -> PromptTemplate:
"""获取Prompt模板"""
if name not in self.templates:
raise KeyError(f"Prompt '{name}' not found")
versions = self.templates[name]
if version == "latest":
version = sorted(versions.keys())[-1]
if version not in versions:
raise KeyError(f"Version '{version}' not found for prompt '{name}'")
return versions[version]
def list_all(self) -> dict:
"""列出所有Prompt"""
result = {}
for name, versions in self.templates.items():
result[name] = {
"versions": list(versions.keys()),
"latest": sorted(versions.keys())[-1]
}
return result
def compare(self, name: str, v1: str, v2: str) -> dict:
"""比较两个版本的Prompt"""
t1 = self.get(name, v1)
t2 = self.get(name, v2)
return {
"name": name,
"v1": v1,
"v2": v2,
"template_changed": t1.template != t2.template,
"system_changed": t1.system_prompt != t2.system_prompt,
"model_changed": t1.model != t2.model,
"temperature_changed": t1.temperature != t2.temperature
}
def _save(self, template: PromptTemplate):
"""保存Prompt到文件"""
filepath = self.storage_dir / f"{template.name}_{template.version}.json"
with open(filepath, "w", encoding="utf-8") as f: # with自动管理资源,确保文件正确关闭
json.dump(asdict(template), f, ensure_ascii=False, indent=2)
def _load_all(self):
"""加载所有已保存的Prompt"""
for filepath in self.storage_dir.glob("*.json"):
try:
with open(filepath, "r", encoding="utf-8") as f:
data = json.load(f)
template = PromptTemplate(**data)
if template.name not in self.templates:
self.templates[template.name] = {}
self.templates[template.name][template.version] = template
except Exception:
pass
# ===== 使用示例 =====
registry = PromptRegistry("./prompt_store")
# 注册v1
registry.register(PromptTemplate(
name="code_review",
version="1.0",
template="""请审查以下代码,找出潜在问题并给出改进建议。
代码语言: {{language}}
代码内容:
请从以下维度审查:
1. 代码质量与可读性
2. 潜在Bug
3. 性能问题
4. 安全问题""",
system_prompt="你是一位经验丰富的高级软件工程师,擅长代码审查。",
description="代码审查Prompt",
tags=["code-review", "engineering"],
variables=["language", "code"],
model="qwen2.5-72b",
temperature=0.3
))
# 注册v2(改进版)
registry.register(PromptTemplate(
name="code_review",
version="2.0",
template="""请对以下{{language}}代码进行全面审查。
代码:
审查维度(按优先级):
1. 🐛 Bug与逻辑错误
2. 🔒 安全漏洞
3. ⚡ 性能问题
4. 📖 代码可读性
5. 🏗️ 架构设计
对每个问题,请给出:
- 问题描述
- 风险等级(高/中/低)
- 修复建议和代码示例""",
system_prompt="你是一位拥有10年经验的Staff Engineer,精通代码审查和软件架构。回答要具体、可执行。",
description="改进版代码审查Prompt,更结构化",
tags=["code-review", "engineering", "v2"],
variables=["language", "code"],
model="qwen2.5-72b",
temperature=0.2
))
# 获取并渲染
template = registry.get("code_review", "latest")
rendered = template.render(
language="Python",
code="def add(a, b): return a+b"
)
print(f"\nRendered prompt (v{template.version}):")
print(rendered[:200] + "...")
# 比较版本
diff = registry.compare("code_review", "1.0", "2.0")
print(f"\nVersion diff: {diff}")
五、LLM评估体系¶
5.1 LLM-as-Judge评估¶
"""
LLM-as-Judge:使用强LLM评估弱LLM的输出质量
"""
from dataclasses import dataclass
import json
@dataclass
class EvalResult:
"""评估结果"""
question: str
answer: str
score: float
reasoning: str
dimensions: dict
class LLMJudge:
"""LLM评估器"""
JUDGE_PROMPT = """你是一位严格的AI输出质量评估专家。请评估以下回答的质量。
## 评估问题
{question}
## 待评估回答
{answer}
## 参考答案(如有)
{reference}
## 评估维度
请从以下维度打分(每维度1-5分):
1. **准确性** (Correctness): 回答是否事实准确?
2. **完整性** (Completeness): 是否覆盖了问题的所有方面?
3. **相关性** (Relevance): 回答是否紧扣问题?
4. **清晰度** (Clarity): 表达是否清晰、有条理?
5. **深度** (Depth): 分析是否有深度?
请以JSON格式输出:
{{
"correctness": <1-5>,
"completeness": <1-5>,
"relevance": <1-5>,
"clarity": <1-5>,
"depth": <1-5>,
"overall_score": <1-5>,
"reasoning": "<评估理由>"
}}"""
def __init__(self, judge_model_fn):
"""
Args:
judge_model_fn: 评判模型的调用函数 f(prompt) -> str
"""
self.judge_fn = judge_model_fn
def evaluate_single(self, question: str, answer: str, reference: str = "无") -> EvalResult:
"""评估单个回答"""
prompt = self.JUDGE_PROMPT.format(
question=question,
answer=answer,
reference=reference
)
response = self.judge_fn(prompt)
try:
result = json.loads(response) # json.loads将JSON字符串转为Python对象
return EvalResult(
question=question,
answer=answer[:200],
score=result["overall_score"],
reasoning=result["reasoning"],
dimensions={
"correctness": result["correctness"],
"completeness": result["completeness"],
"relevance": result["relevance"],
"clarity": result["clarity"],
"depth": result["depth"]
}
)
except (json.JSONDecodeError, KeyError):
return EvalResult(
question=question, answer=answer[:200],
score=0, reasoning="Parse error", dimensions={}
)
def evaluate_batch(self, eval_data: list[dict]) -> dict:
"""批量评估"""
results = []
for item in eval_data:
result = self.evaluate_single(
question=item["question"],
answer=item["answer"],
reference=item.get("reference", "无")
)
results.append(result)
# 汇总统计
scores = [r.score for r in results if r.score > 0]
        summary = {
            "total_samples": len(eval_data),
            "avg_score": sum(scores) / len(scores) if scores else 0,
            "score_distribution": {
                # 防止scores为空时出现除零
                f"score_{i}": (sum(1 for s in scores if int(s) == i) / len(scores)) if scores else 0.0
                for i in range(1, 6)
            },
            "results": results
        }
return summary
# 模拟使用
def mock_judge(prompt: str) -> str:
"""模拟评判模型(实际使用时替换为真实LLM调用)"""
return json.dumps({ # json.dumps将Python对象转为JSON字符串
"correctness": 4,
"completeness": 3,
"relevance": 5,
"clarity": 4,
"depth": 3,
"overall_score": 4,
"reasoning": "回答基本准确且切题,但在某些方面可以更深入。"
})
judge = LLMJudge(mock_judge)
result = judge.evaluate_single(
question="什么是MLOps?",
answer="MLOps是将机器学习模型从实验环境部署到生产环境的一套工程实践..."
)
print(f"Score: {result.score}/5, Reasoning: {result.reasoning}")
5.2 RAG评估(RAGAS)¶
"""
RAG评估框架
pip install ragas
"""
# ===== RAGAS评估指标 =====
"""
RAGAS核心指标:
1. Faithfulness(忠实度): 回答是否基于检索的上下文?
2. Answer Relevancy(答案相关性): 回答是否切题?
3. Context Precision(上下文精确度): 检索到的内容是否相关?
4. Context Recall(上下文召回率): 检索是否覆盖了答案所需的信息?
"""
# from ragas import evaluate
# from ragas.metrics import (
# faithfulness,
# answer_relevancy,
# context_precision,
# context_recall
# )
# from datasets import Dataset
def rag_evaluation_example():
"""RAG评估示例"""
# 准备评估数据
eval_data = {
"question": [
"什么是PagedAttention?",
"vLLM和TGI哪个吞吐量更高?"
],
"answer": [
"PagedAttention是vLLM提出的一种注意力计算优化技术,"
"借鉴操作系统的虚拟内存管理,将KV Cache分成固定大小的页进行管理。",
"vLLM使用PagedAttention,通常在高并发场景下吞吐量比TGI更高。"
],
"contexts": [
["PagedAttention将KV Cache分成block进行管理,减少内存碎片。",
"vLLM通过Copy-on-Write共享prefix KV Cache。"],
["vLLM benchmark显示在高并发下吞吐量领先。",
"TGI在低并发场景性能与vLLM接近。"]
],
"ground_truth": [
"PagedAttention是受虚拟内存启发的注意力计算优化,核心是非连续KV Cache管理。",
"vLLM在高并发下吞吐量通常高于TGI,但TGI在某些场景也有优势。"
]
}
# dataset = Dataset.from_dict(eval_data)
# result = evaluate(
# dataset,
# metrics=[faithfulness, answer_relevancy, context_precision, context_recall]
# )
# 模拟结果输出
result = {
"faithfulness": 0.89,
"answer_relevancy": 0.92,
"context_precision": 0.85,
"context_recall": 0.78
}
print("RAG Evaluation Results:")
for metric, score in result.items():
status = "✅" if score >= 0.8 else "⚠️" if score >= 0.6 else "❌"
print(f" {status} {metric}: {score:.4f}")
rag_evaluation_example()
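如果暂时不想引入ragas与LLM评判依赖,也可以先用一个非常粗糙的字符重叠代理指标对RAG输出做快速体检。以下仅为示意:按字符重叠估算答案句子是否有上下文依据,不做分词与语义判断,不能替代RAGAS基于LLM的Faithfulness判定。
import re

def naive_faithfulness(answer: str, contexts: list[str], threshold: float = 0.5) -> float:
    """粗糙的忠实度代理:统计答案中每个句子与上下文的字符重叠率,高于阈值视为有依据"""
    ctx_chars = set(re.findall(r"\w", " ".join(contexts).lower()))
    sentences = [s for s in re.split(r"[。!?.!?]", answer) if s.strip()]
    if not sentences:
        return 0.0
    supported = 0
    for sent in sentences:
        chars = set(re.findall(r"\w", sent.lower()))
        overlap = len(chars & ctx_chars) / len(chars) if chars else 0
        supported += overlap >= threshold
    return supported / len(sentences)

score = naive_faithfulness(
    answer="PagedAttention将KV Cache分成block管理。它还能减少内存碎片。",
    contexts=["PagedAttention将KV Cache分成block进行管理,减少内存碎片。"]
)
print(f"naive faithfulness (字符重叠代理): {score:.2f}")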
六、成本优化¶
6.1 模型路由(Model Router)¶
"""
智能模型路由:根据查询复杂度选择合适的模型
简单问题 → 小模型(便宜快速)
复杂问题 → 大模型(昂贵但高质量)
"""
from dataclasses import dataclass
from enum import Enum
class QueryComplexity(Enum):
SIMPLE = "simple"
MEDIUM = "medium"
COMPLEX = "complex"
@dataclass
class ModelConfig:
name: str
cost_per_1k_tokens: float # 元/千token
avg_latency_ms: float
max_quality: float # 0-1
class ModelRouter:
"""
智能模型路由器
根据查询复杂度和预算约束选择模型
"""
def __init__(self):
self.models: dict[str, ModelConfig] = {}
self.routing_rules = {}
self.call_log = []
def register_model(self, config: ModelConfig, complexity: QueryComplexity):
"""注册模型及其处理的复杂度级别"""
self.models[config.name] = config
self.routing_rules[complexity] = config.name
def classify_query(self, query: str) -> QueryComplexity:
"""
简单分类查询复杂度
实际场景可使用小型分类器或规则
"""
query_lower = query.lower()
# 简单问题特征
simple_patterns = ["什么是", "定义", "是什么", "how to", "what is"]
# 复杂问题特征
        complex_patterns = ["比较", "分析", "设计", "优化", "如何解决", "trade-off",
                            "架构", "方案", "优缺点", "详细解释"]
# any()配合生成器:遍历patterns列表,只要任一模式p出现在query_lower中就返回True(短路求值,找到即停止)
if len(query) < 20 or any(p in query_lower for p in simple_patterns):
return QueryComplexity.SIMPLE
elif any(p in query_lower for p in complex_patterns) or len(query) > 200:
return QueryComplexity.COMPLEX
else:
return QueryComplexity.MEDIUM
def route(self, query: str) -> str:
"""路由到合适的模型"""
complexity = self.classify_query(query)
model_name = self.routing_rules.get(complexity, list(self.models.keys())[0])
self.call_log.append({
"query_length": len(query),
"complexity": complexity.value,
"model": model_name,
"cost": self.models[model_name].cost_per_1k_tokens
})
return model_name
    def get_cost_report(self) -> dict:
        """获取成本报告"""
        total_calls = len(self.call_log)
        if total_calls == 0:
            return {"total_calls": 0}
model_counts = {}
total_cost = 0
for log in self.call_log:
model = log["model"]
model_counts[model] = model_counts.get(model, 0) + 1
total_cost += log["cost"]
# 如果全部使用最大模型的成本
max_cost_model = max(self.models.values(), key=lambda m: m.cost_per_1k_tokens)
naive_cost = total_calls * max_cost_model.cost_per_1k_tokens
return {
"total_calls": total_calls,
"model_distribution": model_counts,
"total_cost_estimate": total_cost,
"naive_cost_estimate": naive_cost,
"savings_pct": (1 - total_cost / naive_cost) * 100 if naive_cost > 0 else 0
}
# 使用示例
router = ModelRouter()
router.register_model(
ModelConfig("qwen2.5-7b", cost_per_1k_tokens=0.01, avg_latency_ms=50, max_quality=0.7),
QueryComplexity.SIMPLE
)
router.register_model(
ModelConfig("qwen2.5-32b", cost_per_1k_tokens=0.05, avg_latency_ms=150, max_quality=0.85),
QueryComplexity.MEDIUM
)
router.register_model(
ModelConfig("qwen2.5-72b", cost_per_1k_tokens=0.2, avg_latency_ms=400, max_quality=0.95),
QueryComplexity.COMPLEX
)
# 模拟查询路由
queries = [
"什么是MLOps?",
"如何在Kubernetes上部署推理服务?",
"请详细比较vLLM和TGI的架构差异,分析各自的优缺点,并给出在高并发场景下的选型建议。",
"Python是什么?",
"设计一个支持A/B测试和自动回滚的模型部署系统方案。",
]
for q in queries:
model = router.route(q)
print(f"Query: {q[:40]:.<42s} → {model}") # 切片操作:[start:end:step]提取子序列
report = router.get_cost_report()
print(f"\nCost Report:")
print(f" Total calls: {report['total_calls']}")
print(f" Distribution: {report['model_distribution']}")
print(f" Estimated savings: {report['savings_pct']:.1f}%")
6.2 语义缓存¶
"""
语义缓存:相似查询复用缓存结果
减少LLM调用次数,降低成本和延迟
"""
import hashlib
import numpy as np
from dataclasses import dataclass
import time
@dataclass
class CacheEntry:
query: str
response: str
embedding: np.ndarray
timestamp: float
hit_count: int = 0
class SemanticCache:
"""
基于语义相似度的LLM缓存
"""
def __init__(self, similarity_threshold: float = 0.92, max_size: int = 10000, ttl: float = 3600):
self.threshold = similarity_threshold
self.max_size = max_size
self.ttl = ttl # 缓存过期时间(秒)
self.cache: list[CacheEntry] = []
self.stats = {"hits": 0, "misses": 0}
def _get_embedding(self, text: str) -> np.ndarray:
"""获取文本嵌入向量(实际使用时替换为真实embedding模型)"""
# 模拟embedding,实际使用 sentence-transformers 等
np.random.seed(hash(text) % 2**32)
return np.random.randn(384).astype(np.float32)
def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
"""计算余弦相似度"""
return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
def get(self, query: str) -> str | None:
"""查找缓存"""
query_emb = self._get_embedding(query)
now = time.time()
best_match = None
best_similarity = 0.0
for entry in self.cache:
# TTL检查
if now - entry.timestamp > self.ttl:
continue
similarity = self._cosine_similarity(query_emb, entry.embedding)
if similarity > self.threshold and similarity > best_similarity:
best_similarity = similarity
best_match = entry
if best_match:
best_match.hit_count += 1
self.stats["hits"] += 1
return best_match.response
self.stats["misses"] += 1
return None
def put(self, query: str, response: str):
"""写入缓存"""
embedding = self._get_embedding(query)
entry = CacheEntry(
query=query,
response=response,
embedding=embedding,
timestamp=time.time()
)
        # 缓存已满时驱逐命中次数最少的条目(近似LFU,并非严格LRU)
if len(self.cache) >= self.max_size:
self.cache.sort(key=lambda e: e.hit_count)
self.cache.pop(0)
self.cache.append(entry)
def get_stats(self) -> dict:
"""获取缓存统计"""
total = self.stats["hits"] + self.stats["misses"]
hit_rate = self.stats["hits"] / total if total > 0 else 0
return {
"total_queries": total,
"cache_hits": self.stats["hits"],
"cache_misses": self.stats["misses"],
"hit_rate": f"{hit_rate:.2%}",
"cache_size": len(self.cache)
}
# 使用示例
cache = SemanticCache(similarity_threshold=0.92)
# 模拟第一次调用(cache miss)
query1 = "什么是MLOps?"
result = cache.get(query1)
if result is None:
response = "MLOps是机器学习运维的实践..." # 从LLM获取
cache.put(query1, response)
print(f"Cache MISS: {query1}")
# 相似查询(使用真实embedding模型时预期命中;这里的mock随机向量相似度接近0,通常仍会MISS)
query2 = "请解释MLOps"
result = cache.get(query2)
print(f"Cache {'HIT' if result else 'MISS'}: {query2}")
print(f"\nCache stats: {cache.get_stats()}")
七、LLMOps流水线¶
7.1 完整LLMOps Pipeline¶
"""
LLMOps完整流水线:数据→微调→评估→部署→监控
"""
from dataclasses import dataclass, field
from enum import Enum
import time
class PipelineStage(Enum):
DATA_PREPARATION = "data_preparation"
FINE_TUNING = "fine_tuning"
EVALUATION = "evaluation"
QUANTIZATION = "quantization"
DEPLOYMENT = "deployment"
MONITORING = "monitoring"
@dataclass # 自动生成__init__等方法
class PipelineConfig:
"""LLMOps Pipeline配置"""
# 基础模型
base_model: str = "Qwen/Qwen2.5-7B-Instruct"
# 数据
train_data: str = "data/train.jsonl"
eval_data: str = "data/eval.jsonl"
# 微调
finetune_method: str = "lora" # lora/qlora/full
lora_rank: int = 64
learning_rate: float = 2e-4
num_epochs: int = 3
batch_size: int = 4
# 评估
eval_metrics: list[str] = field(default_factory=lambda: [ # lambda匿名函数:简洁的单行函数
"accuracy", "f1", "bleu", "rouge", "llm_judge"
])
eval_threshold: float = 0.8
# 量化
quantization: str = "awq" # awq/gptq/gguf/none
# 部署
serving_engine: str = "vllm" # vllm/tgi/ollama
max_concurrent: int = 100
gpu_memory_utilization: float = 0.9
# 监控
monitor_interval: int = 300 # 秒
drift_threshold: float = 0.2
class LLMOpsPipeline:
"""LLMOps流水线执行器"""
def __init__(self, config: PipelineConfig):
self.config = config
self.stage = None
self.artifacts = {}
self.metrics = {}
def run(self):
"""运行完整Pipeline"""
stages = [
(PipelineStage.DATA_PREPARATION, self._prepare_data),
(PipelineStage.FINE_TUNING, self._fine_tune),
(PipelineStage.EVALUATION, self._evaluate),
(PipelineStage.QUANTIZATION, self._quantize),
(PipelineStage.DEPLOYMENT, self._deploy),
(PipelineStage.MONITORING, self._setup_monitoring),
]
for stage, func in stages:
self.stage = stage
print(f"\n{'='*50}")
print(f"Stage: {stage.value}")
print(f"{'='*50}")
try: # try/except捕获异常
func()
print(f"✅ {stage.value} completed")
except Exception as e:
print(f"❌ {stage.value} failed: {e}")
return False
print(f"\n{'='*50}")
print("🎉 LLMOps Pipeline completed successfully!")
self._print_summary()
return True
def _prepare_data(self):
"""Stage 1: 数据准备"""
print(" → Loading and validating training data...")
print(f" → Train: {self.config.train_data}")
print(f" → Eval: {self.config.eval_data}")
# 数据质量检查
checks = {
"format_valid": True,
"no_duplicates": True,
"no_pii": True, # 无个人信息
"instruction_quality": True,
"min_samples": True
}
print(f" → Data quality checks: {sum(checks.values())}/{len(checks)} passed")
self.artifacts["data_version"] = "v1.2"
self.artifacts["train_samples"] = 5000
self.artifacts["eval_samples"] = 500
def _fine_tune(self):
"""Stage 2: 微调"""
print(f" → Base model: {self.config.base_model}")
print(f" → Method: {self.config.finetune_method}")
print(f" → LoRA rank: {self.config.lora_rank}")
print(f" → LR: {self.config.learning_rate}, Epochs: {self.config.num_epochs}")
# 模拟训练过程
for epoch in range(self.config.num_epochs):
loss = 2.0 * (0.6 ** epoch)
print(f" → Epoch {epoch+1}/{self.config.num_epochs}: loss={loss:.4f}")
self.artifacts["model_path"] = f"models/{self.config.base_model.split('/')[-1]}-finetuned" # 负索引:从末尾倒数访问元素
self.artifacts["adapter_path"] = "models/lora_adapter"
self.metrics["train_loss"] = 0.43
def _evaluate(self):
"""Stage 3: 评估"""
print(f" → Evaluating on {self.artifacts.get('eval_samples', '?')} samples...")
eval_results = {
"accuracy": 0.88,
"f1_macro": 0.85,
"bleu": 0.72,
"rouge_l": 0.81,
"llm_judge_score": 4.2,
"faithfulness": 0.91,
"toxicity": 0.02
}
self.metrics.update(eval_results)
# 红队测试
red_team = {
"injection_resistance": 0.95,
"bias_score": 0.08,
"hallucination_rate": 0.05
}
self.metrics.update(red_team)
for metric, value in {**eval_results, **red_team}.items():
status = "✅" if value >= 0.8 or (metric in ["toxicity", "bias_score", "hallucination_rate"] and value < 0.1) else "⚠️"
print(f" → {status} {metric}: {value}")
# 评估门禁
if eval_results["accuracy"] < self.config.eval_threshold:
raise ValueError(f"Evaluation below threshold: {eval_results['accuracy']} < {self.config.eval_threshold}")
def _quantize(self):
"""Stage 4: 量化"""
if self.config.quantization == "none":
print(" → Skipping quantization")
return
print(f" → Quantization method: {self.config.quantization}")
print(f" → Original size: ~14GB (FP16)")
size_reduction = {"awq": 0.28, "gptq": 0.28, "gguf": 0.25}
new_size = 14 * size_reduction.get(self.config.quantization, 0.5)
print(f" → Quantized size: ~{new_size:.1f}GB")
self.artifacts["quantized_model"] = f"models/quantized-{self.config.quantization}"
def _deploy(self):
"""Stage 5: 部署"""
print(f" → Engine: {self.config.serving_engine}")
print(f" → Max concurrent: {self.config.max_concurrent}")
# 生成部署配置
deploy_config = {
"engine": self.config.serving_engine,
"model": self.artifacts.get("quantized_model", self.artifacts.get("model_path")),
"gpu_memory_utilization": self.config.gpu_memory_utilization,
"max_concurrent_requests": self.config.max_concurrent,
}
print(f" → Deploy config: {deploy_config}")
# 冒烟测试
print(" → Running smoke test...")
print(" → Smoke test passed ✅")
self.artifacts["endpoint"] = "http://model-service:8000/v1"
def _setup_monitoring(self):
"""Stage 6: 监控设置"""
print(f" → Monitor interval: {self.config.monitor_interval}s")
print(f" → Drift threshold: {self.config.drift_threshold}")
monitors = [
"Latency (P50/P95/P99)",
"Throughput (tokens/s)",
"Error rate",
"Token usage & cost",
"Output quality (LLM-as-Judge sampling)",
"Input/output distribution drift"
]
for m in monitors:
print(f" → 📊 {m}")
def _print_summary(self):
"""打印Pipeline总结"""
print(f"\n{'='*50}")
print("📋 Pipeline Summary")
print(f"{'='*50}")
print(f"Base Model: {self.config.base_model}")
print(f"Fine-tune: {self.config.finetune_method} (rank={self.config.lora_rank})")
print(f"Quantization: {self.config.quantization}")
print(f"Serving: {self.config.serving_engine}")
print(f"\nKey Metrics:")
for k, v in self.metrics.items():
print(f" {k}: {v}")
print(f"\nArtifacts:")
for k, v in self.artifacts.items():
print(f" {k}: {v}")
# 运行Pipeline
config = PipelineConfig(
base_model="Qwen/Qwen2.5-7B-Instruct",
finetune_method="lora",
lora_rank=64,
quantization="awq",
serving_engine="vllm"
)
pipeline = LLMOpsPipeline(config)
pipeline.run()
💼 面试常考题¶
Q1: vLLM的PagedAttention原理是什么?为什么能提高吞吐量?¶
答:PagedAttention借鉴OS虚拟内存管理思想,将KV Cache分成固定大小的Page(通常16个token),使用Page Table映射逻辑块到物理块。优势:①减少内存碎片,KV Cache利用率从20-40%提升到接近100%②支持Copy-on-Write共享prefix③动态分配内存,不需要预分配最大长度。Continuous Batching配合PagedAttention,吞吐量比HF实现高3-24x。
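一个帮助理解"块表(block table)"思想的玩具示例如下:块大小、分配策略都是随意设定的演示实现,与vLLM内部代码无关,仅说明"逻辑块→物理块、物理块无需连续"这一点。
class ToyBlockAllocator:
    """玩具版PagedAttention块表:每个请求维护逻辑块→物理块的映射,物理块可以不连续"""
    def __init__(self, num_physical_blocks: int, block_size: int = 16):
        self.block_size = block_size
        self.free_blocks = list(range(num_physical_blocks))  # 空闲物理块池
        self.block_tables: dict[str, list[int]] = {}         # 请求ID → 物理块列表
        self.seq_lens: dict[str, int] = {}

    def append_token(self, request_id: str) -> list[int]:
        """为请求追加一个token的KV;仅当当前块写满时才分配新物理块"""
        n = self.seq_lens.get(request_id, 0)
        table = self.block_tables.setdefault(request_id, [])
        if n % self.block_size == 0:                          # 首个token或当前块已满,取一个空闲块
            table.append(self.free_blocks.pop(0))
        self.seq_lens[request_id] = n + 1
        return table

alloc = ToyBlockAllocator(num_physical_blocks=8, block_size=4)
for _ in range(6):
    alloc.append_token("req-A")
alloc.append_token("req-B")
print("req-A块表:", alloc.block_tables["req-A"])  # [0, 1]:6个token占2个块
print("req-B块表:", alloc.block_tables["req-B"])  # [2]:与req-A交错分配,无需连续显存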
Q2: GPTQ和AWQ量化有什么区别?¶
答:GPTQ:逐层量化,使用Hessian矩阵的逆来最小化量化误差,需要校准数据集,量化过程较慢但质量好。AWQ:观察到不同权重通道的重要性不同(由激活值决定),保护重要通道(乘以缩放因子后再量化),量化速度快且质量优于GPTQ。AWQ是目前GPU部署INT4量化的首选。
Q3: 如何优化LLM推理成本?¶
答:多层策略:①模型层面:量化(INT4/INT8)、蒸馏更小模型②架构层面:模型路由(简单问题→小模型)、语义缓存(重复查询复用)、级联策略(先小模型尝试,不满意再大模型)③工程层面:Continuous Batching提高GPU利用率、Speculative Decoding加速④业务层面:合理设置max_tokens、prompt精简、结果复用。
Q4: LLM应用如何做评估?离线评估和在线评估有哪些方法?¶
答:离线评估:①自动指标(BLEU/ROUGE/BERTScore)②LLM-as-Judge(GPT-4打分)③红队测试(安全性/偏见/幻觉)④RAG评估(RAGAS:Faithfulness/Relevancy)。在线评估:①用户满意度(👍👎反馈率)②任务完成率③对话轮数④用户留存⑤A/B测试业务指标。推荐组合:离线LLM-Judge快速迭代 + 在线A/B验证业务效果。
Q5: Prompt管理为什么重要?如何做版本控制?¶
答:Prompt是LLM应用的核心"代码",决定输出质量。版本管理的必要性:①可追溯(哪个版本效果好)②可回滚(新版本不好赶紧退回)③协作(团队共享优化Prompt)④A/B测试(不同版本对比效果)。方案:将Prompt视为代码,用Git管理+专门的Prompt Registry(带模板渲染、变量管理、版本对比、效果评估关联)。
Q6: 如何设计一个LLMOps流水线?¶
答:完整链路: 1. 数据管理:训练数据清洗→标注→版本控制→质量检测 2. 微调训练:选择方法(LoRA/QLoRA)→超参搜索→实验跟踪(W&B) 3. 评估验证:自动评估+人工评估+红队测试→评估门禁 4. 量化压缩:AWQ/GPTQ→精度验证 5. 部署上线:vLLM/TGI→金丝雀发布→性能测试 6. 监控运维:延迟/吞吐/成本/质量监控→告警→触发重训练
Q7: KV Cache的显存占用如何计算?如何优化?¶
答:KV Cache大小 = \(2 \times L \times H \times S \times B \times dtype\_size\),其中L=层数,H=隐藏维度,S=序列长度,B=批大小。7B模型4096长度单请求约1-2GB。优化:①PagedAttention减少碎片②GQA/MQA减少KV头数③KV Cache量化(FP8)④限制max_tokens⑤前缀缓存共享。
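按上面的公式,可以用几行代码把MHA与GQA两种情况各算一遍。下面的层数、隐藏维度、头数是7B级模型的典型配置(仅作示例,实际请以模型的config.json为准):
def kv_cache_gb(num_layers, hidden_dim, num_heads, num_kv_heads,
                seq_len, batch, dtype_bytes=2):
    """KV Cache显存 = 2(K和V) × L × S × B × (kv_heads × head_dim) × dtype_size"""
    head_dim = hidden_dim // num_heads
    return 2 * num_layers * seq_len * batch * num_kv_heads * head_dim * dtype_bytes / 1024 ** 3

# 假设的7B级配置:28层、hidden 3584、28个Q头;MHA=28个KV头,GQA=4个KV头
args = dict(num_layers=28, hidden_dim=3584, num_heads=28, seq_len=4096, batch=1)
print(f"MHA KV Cache: {kv_cache_gb(num_kv_heads=28, **args):.2f} GB")   # 约1.5 GB
print(f"GQA KV Cache: {kv_cache_gb(num_kv_heads=4, **args):.2f} GB")    # 约0.22 GB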
Q8: LLM部署时如何做负载测试?关注哪些指标?¶
答:工具:Locust/K6/自定义脚本。关键指标:①TTFT(首Token延迟)②TPS(每秒Token数/吞吐量)③ITL(Token间延迟)④并发上限⑤GPU利用率⑥Error Rate。测试方法:逐渐增加并发用户数,记录各指标随并发的变化曲线,找到系统的饱和点和降级点。建议区分prefill和decode阶段分别分析。
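下面是一个极简的单请求TTFT/ITL测量草图,基于OpenAI兼容的流式接口逐chunk记时;端点与模型名均为假设值,完整的并发压测请用Locust/K6等工具组织。
import time
from openai import OpenAI

def measure_streaming_latency(prompt: str,
                              base_url: str = "http://localhost:8000/v1",
                              model: str = "qwen2.5-7b") -> dict:
    """对单次流式请求测量TTFT(首token延迟)与平均ITL(token间延迟)"""
    client = OpenAI(base_url=base_url, api_key="dummy")
    start = time.perf_counter()
    timestamps = []
    stream = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=128,
        stream=True,
    )
    for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            timestamps.append(time.perf_counter())
    if not timestamps:
        return {}
    itls = [b - a for a, b in zip(timestamps, timestamps[1:])]
    return {
        "ttft_ms": (timestamps[0] - start) * 1000,
        "avg_itl_ms": sum(itls) / len(itls) * 1000 if itls else 0,
        "num_chunks": len(timestamps),
    }

# print(measure_streaming_latency("什么是KV Cache?"))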
Q9: 如何处理LLM的幻觉问题?¶
答:多层防护:①RAG增强:用检索到的事实约束生成②Prompt工程:要求引用来源、加入"如果不确定请说不知道"③后处理:事实核查模块、NLI一致性检验④评估:建立幻觉检测benchmark⑤RLHF/DPO:训练模型减少幻觉⑥Decoding策略:降低temperature、限制生成长度。
✅ 学习检查清单¶
- 能估算LLM部署的显存需求
- 理解PagedAttention和Continuous Batching原理
- 能使用vLLM/TGI/Ollama部署LLM
- 理解GPTQ/AWQ/GGUF量化方法及选型
- 能设计Prompt版本管理系统
- 能使用LLM-as-Judge评估模型质量
- 理解RAGAS的RAG评估指标
- 能设计模型路由和语义缓存降低成本
- 能设计完整的LLMOps Pipeline
- 理解KV Cache计算和优化方法
- 能回答所有面试题
📌 返回目录:README — MLOps与AI工程化导航页
