04 - 云端推理服务¶
⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。
将模型部署到云端,提供可扩展的推理服务
📎 交叉引用:FastAPI推理服务封装请参考 LLM学习/03-推理服务部署 §5,通用LLM部署成本/监控请参考 LLM应用/11-大模型部署 §11.4。本章侧重云平台特有功能(SageMaker/GCP Vertex AI)。
📖 章节概述¶
本章将深入探讨云端推理服务的部署和优化,包括云端部署、API服务设计和成本优化等内容。这些技术可以帮助你构建高性能、低成本的云端推理服务。
🎯 学习目标¶
完成本章后,你将能够:
- 理解云端推理服务的架构设计
- 掌握模型部署到云端的方法
- 了解API服务的设计和实现
- 能够优化云端推理服务的成本
1. 云端推理服务概述¶
1.1 云端推理的优势¶
可扩展性:
- 根据需求自动扩缩容
- 支持高并发访问
- 灵活调整资源配置
成本效益:
- 按需付费
- 无需购买硬件
- 降低运维成本
高性能:
- 使用最新的GPU硬件
- 优化的网络和存储
- 专业的运维支持
1.2 云端推理架构¶
Text Only
云端推理服务架构
├── 负载均衡层
│   ├── API网关
│   └── 负载均衡器
├── 推理服务层
│   ├── 模型服务器
│   ├── 批处理服务
│   └── 缓存服务
├── 模型管理层
│   ├── 模型仓库
│   ├── 模型版本控制
│   └── 模型监控
└── 监控和日志
    ├── 性能监控
    ├── 日志收集
    └── 告警系统
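为了更直观地理解"负载均衡层"与"推理服务层"的关系,下面给出一个极简的API网关示意:用轮询方式把请求转发到多个后端推理实例。其中后端地址、端口均为示例假设(例如按本章 §3.1 的方式分别启动的两个FastAPI服务)。
Python
# 极简API网关示意:轮询转发到多个推理服务实例(地址仅为示例)
import itertools

import httpx
from fastapi import FastAPI, HTTPException

app = FastAPI(title="Inference Gateway")

# 后端推理服务实例列表(假设已分别在8001/8002端口启动)
BACKENDS = ["http://127.0.0.1:8001", "http://127.0.0.1:8002"]
_round_robin = itertools.cycle(BACKENDS)

@app.post("/generate")
async def proxy_generate(payload: dict):
    """把请求按轮询策略转发到某个后端实例,实现最简单的负载均衡"""
    backend = next(_round_robin)
    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            resp = await client.post(f"{backend}/generate", json=payload)
        resp.raise_for_status()
        return resp.json()
    except httpx.HTTPError as e:
        raise HTTPException(status_code=502, detail=f"backend {backend} error: {e}")
生产环境中这一层通常由云厂商的API网关/负载均衡器(或Nginx)承担,此处仅用代码说明各层的职责划分。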
2. 云端部署¶
2.1 使用Hugging Face Inference Endpoints¶
Python
from huggingface_hub import InferenceClient
# 初始化推理客户端
client = InferenceClient(
model="meta-llama/Llama-2-7b-hf",
token="your_api_token"
)
# 推理
prompt = "请介绍一下人工智能的发展历程"
output = client.text_generation(
prompt,
max_new_tokens=200,
temperature=0.7,
top_p=0.95
)
print(output)
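如果希望像在线对话产品一样逐字返回结果,InferenceClient也支持流式输出:传入stream=True时返回一个逐token的迭代器。下面的写法复用上面的client,参数仅为示例。
Python
# 流式输出:逐token打印生成结果
for token in client.text_generation(
    prompt,
    max_new_tokens=200,
    temperature=0.7,
    stream=True
):
    print(token, end="", flush=True)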
2.2 使用AWS SageMaker¶
Python
import boto3
import sagemaker
from sagemaker.huggingface import HuggingFaceModel
def deploy_to_sagemaker(model_name, instance_type="ml.g4dn.xlarge"):
"""
部署模型到AWS SageMaker
Args:
model_name: 模型名称
instance_type: 实例类型
"""
# 创建SageMaker会话
session = sagemaker.Session()
role = sagemaker.get_execution_role()
# 创建HuggingFace模型
# 注意:版本号需与SageMaker支持的容器镜像匹配
# 最新支持版本请参考:https://docs.aws.amazon.com/sagemaker/latest/dg-ecr-paths/sagemaker-algo-docker-registry-paths.html
    huggingface_model = HuggingFaceModel(
        env={
            # 直接从Hugging Face Hub加载模型;若模型已打包上传到S3,可改用 model_data="s3://..."
            "HF_MODEL_ID": model_name,
            "HF_TASK": "text-generation"
        },
        role=role,
        transformers_version="4.37",  # 需与SageMaker提供的容器版本匹配
        pytorch_version="2.1",
        py_version="py310",
        model_server_workers=1
    )
# 部署模型
predictor = huggingface_model.deploy(
initial_instance_count=1,
instance_type=instance_type,
        endpoint_name=f"{model_name.split('/')[-1]}-endpoint"  # 端点名称不能包含"/"等特殊字符
)
return predictor
# 使用示例
# predictor = deploy_to_sagemaker("meta-llama/Llama-2-7b-hf")
# 推理
# response = predictor.predict({
# "inputs": "请介绍一下人工智能的发展历程",
# "parameters": {
# "max_new_tokens": 200,
# "temperature": 0.7
# }
# })
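部署完成后,业务侧通常不依赖SageMaker SDK,而是用boto3直接调用端点;下面是一个调用示意(端点名称需与部署时一致,此处为示例)。另外,SageMaker端点按运行时长持续计费,实验结束后应及时删除。
Python
import json

import boto3

# 调用已部署的SageMaker端点(端点名称为示例,需与deploy时的endpoint_name一致)
runtime = boto3.client("sagemaker-runtime")
response = runtime.invoke_endpoint(
    EndpointName="Llama-2-7b-hf-endpoint",
    ContentType="application/json",
    Body=json.dumps({
        "inputs": "请介绍一下人工智能的发展历程",
        "parameters": {"max_new_tokens": 200, "temperature": 0.7}
    })
)
result = json.loads(response["Body"].read())
print(result)

# 实验结束后删除端点,停止计费
# predictor.delete_endpoint()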
2.3 使用Google Cloud AI Platform¶
Python
from google.cloud import aiplatform
def deploy_to_gcp(model_name, project_id, region="us-central1"):
"""
部署模型到Google Cloud AI Platform
Args:
model_name: 模型名称
project_id: GCP项目ID
region: 区域
"""
# 初始化AI Platform
aiplatform.init(project=project_id, location=region)
# 上传模型
model = aiplatform.Model.upload(
display_name=model_name,
artifact_uri=f"gs://your-bucket/{model_name}",
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/pytorch-gpu.1-13:latest"  # 预构建推理容器镜像,版本需与模型框架匹配,请以Vertex AI文档为准
)
# 部署端点
endpoint = model.deploy(
machine_type="n1-standard-4",
accelerator_type="NVIDIA_TESLA_T4",
accelerator_count=1,
min_replica_count=1,
max_replica_count=3
)
return endpoint
# 使用示例
# endpoint = deploy_to_gcp("meta-llama/Llama-2-7b-hf", "your-project-id")
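端点部署完成后,可以直接用endpoint对象发起在线预测;注意instances的具体字段取决于serving容器的约定,下面的字段仅为示意。
Python
# 在线预测(请求格式取决于serving容器,此处仅为示意)
prediction = endpoint.predict(
    instances=[{"prompt": "请介绍一下人工智能的发展历程", "max_tokens": 200}]
)
print(prediction.predictions)

# 不再使用时,先取消部署再删除端点,避免持续计费
# endpoint.undeploy_all()
# endpoint.delete()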
3. API服务设计¶
3.1 使用FastAPI构建推理服务¶
Python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
app = FastAPI(title="LLM Inference API")
# 加载模型
model_name = "meta-llama/Llama-2-7b-hf"  # Llama-2为受限访问模型,需先在Hugging Face申请权限并配置访问令牌
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float16,
device_map="auto"
)
class InferenceRequest(BaseModel): # BaseModel Pydantic数据验证模型
prompt: str
max_tokens: int = 200
temperature: float = 0.7
top_p: float = 0.95
class InferenceResponse(BaseModel):
text: str
tokens_generated: int
@app.post("/generate", response_model=InferenceResponse)
async def generate_text(request: InferenceRequest): # async定义异步函数
"""
生成文本
"""
try: # try/except捕获异常
# 编码输入
inputs = tokenizer(request.prompt, return_tensors="pt").to(model.device)
# 生成文本
with torch.no_grad(): # 禁用梯度计算,节省内存
outputs = model.generate(
**inputs,
max_new_tokens=request.max_tokens,
temperature=request.temperature,
top_p=request.top_p,
do_sample=True
)
# 解码输出
generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
tokens_generated = outputs.shape[1] - inputs["input_ids"].shape[1]
return InferenceResponse(
text=generated_text,
tokens_generated=tokens_generated
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health_check():
"""
健康检查
"""
return {"status": "healthy"}
@app.get("/model/info")
async def model_info():
"""
模型信息
"""
return {
"model_name": model_name,
"device": str(model.device),
"parameters": model.num_parameters()
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
3.2 批处理推理服务¶
Python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
import asyncio
app = FastAPI(title="Batch Inference API")
# 加载模型
model_name = "meta-llama/Llama-2-7b-hf"
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Llama系列tokenizer默认没有pad_token,批量padding前需要先指定;decoder-only模型批量生成时建议左侧padding
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "left"
model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float16,
device_map="auto"
)
class BatchInferenceRequest(BaseModel):
prompts: list[str]
max_tokens: int = 200
temperature: float = 0.7
top_p: float = 0.95
class BatchInferenceResponse(BaseModel):
texts: list[str]
tokens_generated: list[int]
@app.post("/batch-generate", response_model=BatchInferenceResponse)
async def batch_generate_text(request: BatchInferenceRequest):
"""
批量生成文本
"""
try:
# 编码输入
inputs = tokenizer(
request.prompts,
padding=True,
truncation=True,
return_tensors="pt"
).to(model.device)
# 生成文本
with torch.no_grad():
outputs = model.generate(
**inputs,
max_new_tokens=request.max_tokens,
temperature=request.temperature,
top_p=request.top_p,
do_sample=True,
pad_token_id=tokenizer.eos_token_id
)
# 解码输出
generated_texts = []
tokens_generated = []
for i, output in enumerate(outputs): # enumerate同时获取索引和元素
generated_text = tokenizer.decode(output, skip_special_tokens=True)
generated_texts.append(generated_text)
tokens_generated.append(output.shape[0] - inputs["input_ids"][i].shape[0])
return BatchInferenceResponse(
texts=generated_texts,
tokens_generated=tokens_generated
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
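上面的接口要求调用方自己把多条prompt攒成一个batch。生产环境更常见的是"动态批处理":服务端把短时间窗口内到达的多个单条请求合并成一个batch再推理,以提升GPU利用率。下面是一个基于asyncio的简化示意(复用本节已加载的app、model、tokenizer,窗口大小等参数均为示例)。
Python
import asyncio

from pydantic import BaseModel

BATCH_WINDOW_MS = 20   # 攒批时间窗口(毫秒,示例值)
MAX_BATCH_SIZE = 8     # 单个batch的最大请求数(示例值)

_queue: asyncio.Queue = asyncio.Queue()

class SingleRequest(BaseModel):
    prompt: str

def _run_batch(prompts: list[str]) -> list[str]:
    """同步批量推理:复用本节已加载的tokenizer/model,逻辑与/batch-generate相同"""
    inputs = tokenizer(prompts, padding=True, truncation=True, return_tensors="pt").to(model.device)
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=200,  # 为简化示例,固定生成长度
            do_sample=True,
            pad_token_id=tokenizer.eos_token_id
        )
    return [tokenizer.decode(o, skip_special_tokens=True) for o in outputs]

async def _batch_worker():
    """后台协程:攒批 -> 推理 -> 把结果回填给每个请求对应的Future"""
    while True:
        prompt, fut = await _queue.get()
        batch = [(prompt, fut)]
        deadline = asyncio.get_running_loop().time() + BATCH_WINDOW_MS / 1000
        while len(batch) < MAX_BATCH_SIZE:
            remaining = deadline - asyncio.get_running_loop().time()
            if remaining <= 0:
                break
            try:
                batch.append(await asyncio.wait_for(_queue.get(), remaining))
            except asyncio.TimeoutError:
                break
        prompts = [p for p, _ in batch]
        # 推理放到线程池执行,避免阻塞事件循环
        texts = await asyncio.to_thread(_run_batch, prompts)
        for (_, f), text in zip(batch, texts):
            f.set_result(text)

@app.on_event("startup")
async def _start_batch_worker():
    asyncio.create_task(_batch_worker())

@app.post("/queued-generate")
async def queued_generate(request: SingleRequest):
    """单条请求入队,由后台worker合并成batch后统一推理"""
    fut = asyncio.get_running_loop().create_future()
    await _queue.put((request.prompt, fut))
    text = await fut
    return {"text": text}
vLLM、TGI(Text Generation Inference)等推理框架内置了更完善的连续批处理(continuous batching),生产环境可优先考虑直接使用这类框架。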
3.3 流式推理服务¶
Python
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
import json
app = FastAPI(title="Streaming Inference API")
# 加载模型
model_name = "meta-llama/Llama-2-7b-hf"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float16,
device_map="auto"
)
class StreamingRequest(BaseModel):
prompt: str
max_tokens: int = 200
temperature: float = 0.7
top_p: float = 0.95
async def stream_generator(request: StreamingRequest):
"""
流式生成器
"""
try:
# 编码输入
inputs = tokenizer(request.prompt, return_tensors="pt").to(model.device)
    # 流式生成:每次只生成1个token,并把它拼接回输入后继续生成
    generated_tokens = []
    with torch.no_grad():
        for _ in range(request.max_tokens):
            outputs = model.generate(
                **inputs,
                max_new_tokens=1,
                temperature=request.temperature,
                top_p=request.top_p,
                do_sample=True
            )
            # 获取新生成的token,保持[1, 1]的形状,便于沿序列维度拼接
            new_token = outputs[:, -1:]
            generated_tokens.append(new_token)
            # 更新输入:把新token拼接到input_ids,并同步扩展attention_mask
            inputs["input_ids"] = torch.cat([inputs["input_ids"], new_token], dim=1)  # torch.cat沿已有维度拼接张量
            inputs["attention_mask"] = torch.cat(
                [inputs["attention_mask"], torch.ones_like(new_token)], dim=1
            )
            # 解码当前已生成的全部文本片段
            text = tokenizer.decode(torch.cat(generated_tokens, dim=1)[0], skip_special_tokens=True)
            # 发送数据
            yield json.dumps({"text": text, "done": False}) + "\n"  # yield使函数成为(异步)生成器 # json.dumps将Python对象序列化为JSON字符串
            # 遇到结束符则提前停止
            if new_token.item() == tokenizer.eos_token_id:
                break
# 发送完成信号
yield json.dumps({"text": text, "done": True}) + "\n"
except Exception as e:
yield json.dumps({"error": str(e)}) + "\n"
@app.post("/stream-generate")
async def stream_generate_text(request: StreamingRequest):
"""
流式生成文本
"""
return StreamingResponse(
stream_generator(request),
media_type="text/event-stream"
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
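上面的实现每生成一个token都重新调用一次model.generate,无法复用KV cache,效率较低,仅用于演示流式返回的思路。transformers自带的TextIteratorStreamer可以在一次generate调用中逐段取出新文本,下面是替换stream_generator的一个示意(复用本节已加载的model/tokenizer)。
Python
from threading import Thread

from transformers import TextIteratorStreamer

async def stream_generator_v2(request: StreamingRequest):
    """基于TextIteratorStreamer的流式生成器:一次generate调用,逐段产出文本"""
    inputs = tokenizer(request.prompt, return_tensors="pt").to(model.device)
    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
    generation_kwargs = dict(
        **inputs,
        max_new_tokens=request.max_tokens,
        temperature=request.temperature,
        top_p=request.top_p,
        do_sample=True,
        streamer=streamer
    )
    # generate在后台线程中执行,主协程从streamer中逐段取出新生成的文本
    Thread(target=model.generate, kwargs=generation_kwargs).start()
    for new_text in streamer:  # 注意:同步迭代会阻塞事件循环,生产环境可放入线程池处理
        yield json.dumps({"text": new_text, "done": False}) + "\n"
    yield json.dumps({"text": "", "done": True}) + "\n"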
4. 成本优化¶
4.1 实例选择策略¶
Python
def calculate_cost(instance_type, hours, region="us-east-1"):
"""
计算云实例成本
Args:
instance_type: 实例类型
hours: 使用小时数
region: 区域
"""
    # AWS SageMaker按需定价(示例值,单位:美元/小时)。价格随区域与时间变化,且本示例未区分region,请以官方定价页为准
pricing = {
"ml.g4dn.xlarge": 0.526, # 1x T4 GPU
"ml.g4dn.2xlarge": 0.752, # 1x T4 GPU
"ml.g5.xlarge": 1.006, # 1x A10G GPU
"ml.g5.2xlarge": 1.341, # 1x A10G GPU
"ml.p3.2xlarge": 3.06, # 1x V100 GPU
"ml.p3.8xlarge": 12.24, # 4x V100 GPU
}
hourly_cost = pricing.get(instance_type, 0)
total_cost = hourly_cost * hours
return {
"instance_type": instance_type,
"hourly_cost": hourly_cost,
"hours": hours,
"total_cost": total_cost
}
# 使用示例
# cost = calculate_cost("ml.g4dn.xlarge", 100)
# print(f"总成本: ${cost['total_cost']:.2f}")
4.2 自动扩缩容¶
Python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import boto3
app = FastAPI(title="Auto-scaling Inference Service")
# 初始化AWS客户端(本节示例仅演示扩缩容的判断逻辑,并未真正调用AWS API;实际配置方式见本节末尾的示例)
autoscaling = boto3.client('autoscaling')
ec2 = boto3.client('ec2')
class ScalingPolicy:
"""
自动扩缩容策略
"""
def __init__(self, min_instances=1, max_instances=5,
scale_up_threshold=0.8, scale_down_threshold=0.3):
self.min_instances = min_instances
self.max_instances = max_instances
self.scale_up_threshold = scale_up_threshold
self.scale_down_threshold = scale_down_threshold
def should_scale_up(self, current_load, current_instances):
"""
判断是否需要扩容
"""
return (current_load > self.scale_up_threshold and
current_instances < self.max_instances)
def should_scale_down(self, current_load, current_instances):
"""
判断是否需要缩容
"""
return (current_load < self.scale_down_threshold and
current_instances > self.min_instances)
# 全局变量
scaling_policy = ScalingPolicy()
current_instances = 1
current_load = 0.0
@app.get("/metrics")
async def get_metrics():
"""
获取指标
"""
return {
"current_instances": current_instances,
"current_load": current_load
}
@app.post("/scale")
async def scale_instances():
"""
扩缩容实例
"""
global current_instances
if scaling_policy.should_scale_up(current_load, current_instances):
# 扩容
new_instances = min(current_instances + 1, scaling_policy.max_instances)
current_instances = new_instances
return {"action": "scale_up", "new_instances": new_instances}
elif scaling_policy.should_scale_down(current_load, current_instances):
# 缩容
new_instances = max(current_instances - 1, scaling_policy.min_instances)
current_instances = new_instances
return {"action": "scale_down", "new_instances": new_instances}
else:
return {"action": "no_change", "current_instances": current_instances}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
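上面的ScalingPolicy只演示了扩缩容的判断逻辑。如果模型部署在SageMaker端点上,更常见的做法是直接使用AWS Application Auto Scaling做目标跟踪扩缩容,下面是一个配置示意(端点名、变体名和阈值均为示例)。
Python
import boto3

def configure_sagemaker_autoscaling(endpoint_name: str, variant_name: str = "AllTraffic",
                                    min_capacity: int = 1, max_capacity: int = 3):
    """为SageMaker端点配置基于"每实例每分钟调用数"的目标跟踪扩缩容"""
    client = boto3.client("application-autoscaling")
    resource_id = f"endpoint/{endpoint_name}/variant/{variant_name}"
    # 1. 注册可扩缩目标(实例数范围)
    client.register_scalable_target(
        ServiceNamespace="sagemaker",
        ResourceId=resource_id,
        ScalableDimension="sagemaker:variant:DesiredInstanceCount",
        MinCapacity=min_capacity,
        MaxCapacity=max_capacity
    )
    # 2. 目标跟踪策略:每实例每分钟调用数超过目标值时扩容,低于时缩容
    client.put_scaling_policy(
        PolicyName=f"{endpoint_name}-target-tracking",
        ServiceNamespace="sagemaker",
        ResourceId=resource_id,
        ScalableDimension="sagemaker:variant:DesiredInstanceCount",
        PolicyType="TargetTrackingScaling",
        TargetTrackingScalingPolicyConfiguration={
            "TargetValue": 100.0,  # 示例阈值:每实例每分钟100次调用
            "PredefinedMetricSpecification": {
                "PredefinedMetricType": "SageMakerVariantInvocationsPerInstance"
            },
            "ScaleInCooldown": 300,  # 缩容冷却时间(秒)
            "ScaleOutCooldown": 60   # 扩容冷却时间(秒)
        }
    )

# 使用示例
# configure_sagemaker_autoscaling("Llama-2-7b-hf-endpoint")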
4.3 缓存策略¶
Python
from fastapi import FastAPI
from pydantic import BaseModel
import hashlib
import json
from datetime import datetime, timedelta
app = FastAPI(title="Cached Inference Service")
class CacheEntry:
"""
缓存条目
"""
def __init__(self, text: str, ttl: int = 3600):
self.text = text
self.created_at = datetime.now()
self.ttl = ttl
def is_expired(self):
"""
检查是否过期
"""
return datetime.now() - self.created_at > timedelta(seconds=self.ttl)
class InferenceCache:
"""
推理缓存
"""
def __init__(self, max_size=1000):
self.cache = {}
self.max_size = max_size
def _generate_key(self, prompt: str, **kwargs) -> str: # *args接收任意位置参数,**kwargs接收任意关键字参数
"""
生成缓存键
"""
data = {"prompt": prompt, **kwargs}
data_str = json.dumps(data, sort_keys=True)
return hashlib.md5(data_str.encode()).hexdigest()
def get(self, prompt: str, **kwargs) -> str | None:
"""
获取缓存
"""
key = self._generate_key(prompt, **kwargs)
entry = self.cache.get(key)
if entry and not entry.is_expired():
return entry.text
return None
def set(self, prompt: str, text: str, **kwargs):
"""
设置缓存
"""
key = self._generate_key(prompt, **kwargs)
# 如果缓存已满,删除最旧的条目
if len(self.cache) >= self.max_size:
oldest_key = min(self.cache.keys(),
key=lambda k: self.cache[k].created_at) # lambda匿名函数
del self.cache[oldest_key]
self.cache[key] = CacheEntry(text)
def clear(self):
"""
清空缓存
"""
self.cache.clear()
# 全局缓存
inference_cache = InferenceCache()
class InferenceRequest(BaseModel):
prompt: str
max_tokens: int = 200
temperature: float = 0.7
use_cache: bool = True
@app.post("/generate")
async def generate_with_cache(request: InferenceRequest):
"""
带缓存的推理
"""
# 检查缓存
if request.use_cache:
cached_result = inference_cache.get(
request.prompt,
max_tokens=request.max_tokens,
temperature=request.temperature
)
if cached_result:
return {"text": cached_result, "from_cache": True}
# 执行推理(这里简化处理)
generated_text = f"生成的文本: {request.prompt}"
# 缓存结果
if request.use_cache:
inference_cache.set(
request.prompt,
generated_text,
max_tokens=request.max_tokens,
temperature=request.temperature
)
return {"text": generated_text, "from_cache": False}
@app.post("/cache/clear")
async def clear_cache():
"""
清空缓存
"""
inference_cache.clear()
return {"status": "cache_cleared"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
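上面的进程内缓存只在单个服务实例内生效;当推理服务水平扩展为多个副本时,通常改用Redis等外部共享缓存,让所有副本都能命中同一份结果。下面是一个等价的Redis版本示意(假设Redis运行在localhost:6379,参数均为示例)。
Python
import hashlib
import json

import redis

class RedisInferenceCache:
    """基于Redis的共享推理缓存:键为请求参数的哈希,值为生成文本,按TTL自动过期"""
    def __init__(self, host="localhost", port=6379, ttl=3600):
        # decode_responses=True 让get直接返回str而不是bytes
        self.client = redis.Redis(host=host, port=port, decode_responses=True)
        self.ttl = ttl

    def _generate_key(self, prompt: str, **kwargs) -> str:
        data_str = json.dumps({"prompt": prompt, **kwargs}, sort_keys=True)
        return "infer:" + hashlib.md5(data_str.encode()).hexdigest()

    def get(self, prompt: str, **kwargs) -> str | None:
        return self.client.get(self._generate_key(prompt, **kwargs))

    def set(self, prompt: str, text: str, **kwargs):
        # setex同时写入值和过期时间,过期后由Redis自动清理,无需手动淘汰
        self.client.setex(self._generate_key(prompt, **kwargs), self.ttl, text)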
5. 监控和日志¶
5.1 性能监控¶
Python
from fastapi import FastAPI, Request
import time
import psutil
import torch
app = FastAPI(title="Monitored Inference Service")
# 性能指标
metrics = {
"total_requests": 0,
"total_tokens": 0,
"total_time": 0.0,
"gpu_memory_used": 0.0,
"cpu_usage": 0.0
}
@app.middleware("http")
async def monitor_requests(request: Request, call_next):
"""
监控请求
"""
start_time = time.time()
# 处理请求
response = await call_next(request) # await等待异步操作完成
# 更新指标
process_time = time.time() - start_time
metrics["total_requests"] += 1
metrics["total_time"] += process_time
# 添加响应头
response.headers["X-Process-Time"] = str(process_time)
return response
@app.get("/metrics")
async def get_metrics():
"""
获取性能指标
"""
# 获取GPU内存使用
if torch.cuda.is_available():
gpu_memory_used = torch.cuda.memory_allocated() / 1e9
metrics["gpu_memory_used"] = gpu_memory_used
# 获取CPU使用率
metrics["cpu_usage"] = psutil.cpu_percent()
# 计算平均响应时间
avg_time = metrics["total_time"] / metrics["total_requests"] if metrics["total_requests"] > 0 else 0
return {
**metrics,
"avg_response_time": avg_time
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
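自定义的/metrics接口便于调试,但生产环境更常见的做法是按Prometheus格式暴露指标,直接接入Prometheus、Grafana和告警系统。下面是基于prometheus_client的一个示意(指标名称均为示例)。
Python
import time

from fastapi import FastAPI, Request
from prometheus_client import Counter, Histogram, make_asgi_app

app = FastAPI(title="Prometheus Monitored Inference Service")

# 请求计数与耗时分布(指标名为示例)
REQUEST_COUNT = Counter("inference_requests_total", "推理请求总数", ["path", "status"])
REQUEST_LATENCY = Histogram("inference_request_seconds", "推理请求耗时(秒)", ["path"])

@app.middleware("http")
async def prometheus_middleware(request: Request, call_next):
    start = time.time()
    response = await call_next(request)
    REQUEST_COUNT.labels(path=request.url.path, status=response.status_code).inc()
    REQUEST_LATENCY.labels(path=request.url.path).observe(time.time() - start)
    return response

# 将Prometheus格式的指标挂载到 /metrics,供Prometheus服务器定期抓取
app.mount("/metrics", make_asgi_app())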
5.2 日志收集¶
Python
from fastapi import FastAPI, Request
import logging
from datetime import datetime
import json
app = FastAPI(title="Logging Inference Service")
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('inference.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
@app.middleware("http")
async def log_requests(request: Request, call_next):
"""
记录请求日志
"""
start_time = datetime.now()
# 记录请求信息
log_data = {
"timestamp": start_time.isoformat(),
"method": request.method,
"url": str(request.url),
"client": request.client.host if request.client else None
}
logger.info(f"Request: {json.dumps(log_data)}")
# 处理请求
response = await call_next(request)
# 记录响应信息
end_time = datetime.now()
process_time = (end_time - start_time).total_seconds()
log_data.update({
"status_code": response.status_code,
"process_time": process_time
})
logger.info(f"Response: {json.dumps(log_data)}")
return response
@app.get("/")
async def root():
"""
根路径
"""
logger.info("Root endpoint accessed")
return {"message": "Inference Service"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
6. 练习题¶
基础练习¶
- 实现简单的推理API
- 实现缓存机制
进阶练习¶
- 实现自动扩缩容
- 实现成本计算器
项目练习¶
- 创建完整的云端推理服务
- 支持多种推理模式
- 实现自动扩缩容
- 集成监控和日志
7. 最佳实践¶
✅ 推荐做法¶
- 选择合适的云服务
  - 根据需求选择AWS、GCP或Azure
  - 考虑成本、性能和可用性
  - 使用托管服务简化部署
- 优化成本
  - 使用预留实例
  - 实现自动扩缩容
  - 利用缓存减少计算
- 监控和告警
  - 监控关键指标
  - 设置告警阈值
  - 及时响应问题
❌ 避免做法¶
- 过度配置
  - 不要使用过大的实例
  - 根据实际需求选择
  - 定期评估和优化
- 忽略安全
  - 使用HTTPS加密
  - 实现认证和授权(参见下方示例)
  - 保护API密钥
- 缺乏监控
  - 不要忽略性能监控
  - 记录关键指标
  - 建立告警机制
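针对上面"忽略安全"一条,下面给出"实现认证和授权"的一个最小示意:用请求头中的API Key做校验。其中密钥来自环境变量仅为示例,生产环境应接入正式的密钥管理与认证服务。
Python
import os

from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import APIKeyHeader
from pydantic import BaseModel

app = FastAPI()
api_key_header = APIKeyHeader(name="X-API-Key")

def verify_api_key(api_key: str = Depends(api_key_header)):
    """校验请求头X-API-Key;密钥从环境变量读取,避免硬编码(仅为示例)"""
    if api_key != os.environ.get("INFERENCE_API_KEY"):
        raise HTTPException(status_code=401, detail="Invalid API key")
    return api_key

class GenerateRequest(BaseModel):
    prompt: str

@app.post("/generate")
async def generate(request: GenerateRequest, _: str = Depends(verify_api_key)):
    """只有携带正确X-API-Key请求头的请求才能访问"""
    return {"text": f"received: {request.prompt}"}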
8. 总结¶
本章介绍了云端推理服务的核心内容:
- 云端部署: AWS、GCP等云平台部署
- API服务: FastAPI构建推理服务
- 成本优化: 实例选择、自动扩缩容、缓存
- 监控日志: 性能监控和日志收集
构建高质量的云端推理服务需要综合考虑性能、成本和可靠性。
9. 下一步¶
继续学习05-边缘部署,了解如何将模型部署到边缘设备。