
04 - Cloud Inference Services

⚠️ Timeliness note: this chapter touches on frontier models, pricing, leaderboards, and similar information that can change quickly between versions; always defer to the original papers, official release pages, and API documentation.

Deploy models to the cloud and serve scalable inference

📎 Cross-references: for wrapping an inference service with FastAPI, see LLM学习/03-推理服务部署 §5; for general LLM deployment costs and monitoring, see LLM应用/11-大模型部署 §11.4. This chapter focuses on cloud-platform-specific features (SageMaker / GCP Vertex AI).

📖 Chapter Overview

This chapter takes a close look at deploying and optimizing cloud inference services, covering cloud deployment, API service design, and cost optimization. These techniques help you build high-performance, low-cost inference services in the cloud.

🎯 Learning Objectives

After completing this chapter, you will be able to:

  • Understand the architecture of cloud inference services
  • Deploy models to the cloud
  • Design and implement API services
  • Optimize the cost of cloud inference services

1. Overview of Cloud Inference Services

1.1 Advantages of Cloud Inference

Scalability:
  • Automatically scale out and in with demand
  • Handle high-concurrency traffic
  • Flexibly adjust resource allocation

Cost efficiency:
  • Pay as you go
  • No upfront hardware purchases
  • Lower operational overhead

Performance:
  • Access to the latest GPU hardware
  • Optimized networking and storage
  • Professional operations support
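To make the pay-as-you-go trade-off concrete, a rough break-even estimate helps decide between a per-token API and a dedicated instance. A minimal sketch (all prices are illustrative placeholders, not current quotes):

```python
def break_even_tokens_per_hour(api_cost_per_1k_tokens: float,
                               instance_cost_per_hour: float) -> float:
    """Throughput above which a dedicated instance is cheaper
    than a pay-per-token API (illustrative arithmetic only)."""
    return 1000 * instance_cost_per_hour / api_cost_per_1k_tokens

# e.g. a $0.526/h GPU instance vs an API billed at $0.002 per 1k tokens:
threshold = break_even_tokens_per_hour(0.002, 0.526)
```

In this example, below roughly 263k tokens per hour of sustained load the per-token API is cheaper; above it, the dedicated instance wins.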

1.2 Cloud Inference Architecture

Text Only
Cloud inference service architecture
├── Load-balancing layer
│   ├── API gateway
│   └── Load balancer
├── Inference serving layer
│   ├── Model servers
│   ├── Batch-processing service
│   └── Cache service
├── Model management layer
│   ├── Model repository
│   ├── Model version control
│   └── Model monitoring
└── Monitoring and logging
    ├── Performance monitoring
    ├── Log collection
    └── Alerting system
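As a minimal sketch of the load-balancing layer in this diagram (the backend URLs are made-up placeholders), round-robin dispatch across inference servers can look like:

```python
import itertools

class RoundRobinBalancer:
    """Hand out inference backends in rotation."""

    def __init__(self, backends: list[str]):
        self._cycle = itertools.cycle(backends)

    def next_backend(self) -> str:
        return next(self._cycle)

# Each incoming request would be forwarded to lb.next_backend()
lb = RoundRobinBalancer(["http://infer-1:8000", "http://infer-2:8000"])
```

Real gateways (an ALB, nginx, etc.) also track backend health; this sketch shows only the dispatch order.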

2. Cloud Deployment

2.1 Using Hugging Face Inference Endpoints

Python
from huggingface_hub import InferenceClient

# Initialize the inference client
client = InferenceClient(
    model="meta-llama/Llama-2-7b-hf",
    token="your_api_token"
)

# Run inference
prompt = "Briefly describe the history of artificial intelligence"
output = client.text_generation(
    prompt,
    max_new_tokens=200,
    temperature=0.7,
    top_p=0.95
)

print(output)

2.2 Using AWS SageMaker

Python
import boto3
import sagemaker
from sagemaker.huggingface import HuggingFaceModel

def deploy_to_sagemaker(model_name, instance_type="ml.g4dn.xlarge"):
    """
    Deploy a model to AWS SageMaker.

    Args:
        model_name: Hugging Face Hub model ID
        instance_type: instance type
    """
    # Create a SageMaker session
    session = sagemaker.Session()
    role = sagemaker.get_execution_role()

    # Create a HuggingFace model
    # Note: the version numbers must match a container image supported by SageMaker.
    # For the latest supported versions, see:
    # https://docs.aws.amazon.com/sagemaker/latest/dg-ecr-paths/sagemaker-algo-docker-registry-paths.html
    huggingface_model = HuggingFaceModel(
        env={"HF_MODEL_ID": model_name},  # Hub model ID; use model_data=<S3 URI> for your own artifacts
        role=role,
        transformers_version="4.37",  # a recent stable release
        pytorch_version="2.1",        # PyTorch 2.x
        py_version="py310",
        model_server_workers=1
    )

    # Deploy the model
    predictor = huggingface_model.deploy(
        initial_instance_count=1,
        instance_type=instance_type,
        endpoint_name=f"{model_name.split('/')[-1]}-endpoint"  # '/' is not allowed in endpoint names
    )

    return predictor

# Example usage
# predictor = deploy_to_sagemaker("meta-llama/Llama-2-7b-hf")

# Inference
# response = predictor.predict({
#     "inputs": "Briefly describe the history of artificial intelligence",
#     "parameters": {
#         "max_new_tokens": 200,
#         "temperature": 0.7
#     }
# })

2.3 Using Google Cloud Vertex AI

Python
from google.cloud import aiplatform

def deploy_to_gcp(model_name, project_id, region="us-central1"):
    """
    Deploy a model to Google Cloud Vertex AI.

    Args:
        model_name: model name
        project_id: GCP project ID
        region: region
    """
    # Initialize the Vertex AI SDK
    aiplatform.init(project=project_id, location=region)

    # Upload the model
    model = aiplatform.Model.upload(
        display_name=model_name,
        artifact_uri=f"gs://your-bucket/{model_name}",
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/pytorch-gpu.1-13:latest"
    )

    # Deploy to an endpoint
    endpoint = model.deploy(
        machine_type="n1-standard-4",
        accelerator_type="NVIDIA_TESLA_T4",
        accelerator_count=1,
        min_replica_count=1,
        max_replica_count=3
    )

    return endpoint

# Example usage
# endpoint = deploy_to_gcp("meta-llama/Llama-2-7b-hf", "your-project-id")

3. API Service Design

3.1 Building an Inference Service with FastAPI

Python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

app = FastAPI(title="LLM Inference API")

# Load the model
model_name = "meta-llama/Llama-2-7b-hf"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.float16,
    device_map="auto"
)

class InferenceRequest(BaseModel):  # Pydantic model for request validation
    prompt: str
    max_tokens: int = 200
    temperature: float = 0.7
    top_p: float = 0.95

class InferenceResponse(BaseModel):
    text: str
    tokens_generated: int

@app.post("/generate", response_model=InferenceResponse)
async def generate_text(request: InferenceRequest):
    """
    Generate text for a single prompt.
    """
    try:
        # Encode the input
        inputs = tokenizer(request.prompt, return_tensors="pt").to(model.device)

        # Generate text (note: blocking work like this should go to a
        # threadpool in production so it does not stall the event loop)
        with torch.no_grad():  # disable gradient tracking to save memory
            outputs = model.generate(
                **inputs,
                max_new_tokens=request.max_tokens,
                temperature=request.temperature,
                top_p=request.top_p,
                do_sample=True
            )

        # Decode the output
        generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
        tokens_generated = outputs.shape[1] - inputs["input_ids"].shape[1]

        return InferenceResponse(
            text=generated_text,
            tokens_generated=tokens_generated
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health_check():
    """
    Health check.
    """
    return {"status": "healthy"}

@app.get("/model/info")
async def model_info():
    """
    Model information.
    """
    return {
        "model_name": model_name,
        "device": str(model.device),
        "parameters": model.num_parameters()
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

3.2 Batched Inference Service

Python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

app = FastAPI(title="Batch Inference API")

# Load the model
model_name = "meta-llama/Llama-2-7b-hf"
tokenizer = AutoTokenizer.from_pretrained(model_name)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token  # Llama has no pad token by default
tokenizer.padding_side = "left"  # left-pad for decoder-only batched generation
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.float16,
    device_map="auto"
)

class BatchInferenceRequest(BaseModel):
    prompts: list[str]
    max_tokens: int = 200
    temperature: float = 0.7
    top_p: float = 0.95

class BatchInferenceResponse(BaseModel):
    texts: list[str]
    tokens_generated: list[int]

@app.post("/batch-generate", response_model=BatchInferenceResponse)
async def batch_generate_text(request: BatchInferenceRequest):
    """
    Generate text for a batch of prompts.
    """
    try:
        # Encode the inputs
        inputs = tokenizer(
            request.prompts,
            padding=True,
            truncation=True,
            return_tensors="pt"
        ).to(model.device)

        # Generate text
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=request.max_tokens,
                temperature=request.temperature,
                top_p=request.top_p,
                do_sample=True,
                pad_token_id=tokenizer.eos_token_id
            )

        # Decode the outputs
        generated_texts = []
        tokens_generated = []

        for i, output in enumerate(outputs):
            generated_text = tokenizer.decode(output, skip_special_tokens=True)
            generated_texts.append(generated_text)
            # The padded input length is the same for every sample in the batch
            tokens_generated.append(output.shape[0] - inputs["input_ids"][i].shape[0])

        return BatchInferenceResponse(
            texts=generated_texts,
            tokens_generated=tokens_generated
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

3.3 Streaming Inference Service

Python
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
import json

app = FastAPI(title="Streaming Inference API")

# Load the model
model_name = "meta-llama/Llama-2-7b-hf"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.float16,
    device_map="auto"
)

class StreamingRequest(BaseModel):
    prompt: str
    max_tokens: int = 200
    temperature: float = 0.7
    top_p: float = 0.95

async def stream_generator(request: StreamingRequest):
    """
    Streaming generator. Calling generate() one token at a time is simple
    but slow; transformers' TextIteratorStreamer is the more efficient
    option in practice.
    """
    try:
        # Encode the input
        inputs = tokenizer(request.prompt, return_tensors="pt").to(model.device)

        # Stream token by token
        generated_tokens = []
        text = ""
        with torch.no_grad():
            for _ in range(request.max_tokens):
                outputs = model.generate(
                    **inputs,
                    max_new_tokens=1,
                    temperature=request.temperature,
                    top_p=request.top_p,
                    do_sample=True
                )

                # The newly generated token, shaped [1, 1] for concatenation
                new_token = outputs[:, -1:]
                generated_tokens.append(new_token)

                # Append the token to the input and extend the attention mask to match
                inputs["input_ids"] = torch.cat([inputs["input_ids"], new_token], dim=1)
                inputs["attention_mask"] = torch.cat(
                    [inputs["attention_mask"], torch.ones_like(new_token)], dim=1
                )

                # Decode everything generated so far
                text = tokenizer.decode(
                    torch.cat(generated_tokens, dim=1)[0], skip_special_tokens=True
                )

                # Emit one newline-delimited JSON chunk
                yield json.dumps({"text": text, "done": False}) + "\n"

                # Stop at end-of-sequence
                if new_token.item() == tokenizer.eos_token_id:
                    break

        # Signal completion
        yield json.dumps({"text": text, "done": True}) + "\n"

    except Exception as e:
        yield json.dumps({"error": str(e)}) + "\n"

@app.post("/stream-generate")
async def stream_generate_text(request: StreamingRequest):
    """
    Stream generated text as newline-delimited JSON.
    """
    return StreamingResponse(
        stream_generator(request),
        media_type="text/event-stream"
    )

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
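On the client side, each line emitted by /stream-generate above is an independent JSON object. A minimal parsing sketch (in practice the lines would come from an HTTP client's `iter_lines()`):

```python
import json

def consume_stream(lines):
    """Fold newline-delimited JSON chunks into the final text."""
    final_text = ""
    for line in lines:
        chunk = json.loads(line)
        if "error" in chunk:
            raise RuntimeError(chunk["error"])
        final_text = chunk["text"]  # each chunk carries the full text so far
        if chunk.get("done"):
            break
    return final_text
```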

4. Cost Optimization

4.1 Instance Selection Strategy

Python
def calculate_cost(instance_type, hours, region="us-east-1"):
    """
    Estimate cloud instance cost.

    Args:
        instance_type: instance type
        hours: hours of usage
        region: region
    """
    # AWS SageMaker on-demand pricing in USD/hour (illustrative; check current rates)
    pricing = {
        "ml.g4dn.xlarge": 0.526,  # 1x T4 GPU
        "ml.g4dn.2xlarge": 0.752,  # 1x T4 GPU
        "ml.g5.xlarge": 1.006,  # 1x A10G GPU
        "ml.g5.2xlarge": 1.341,  # 1x A10G GPU
        "ml.p3.2xlarge": 3.06,  # 1x V100 GPU
        "ml.p3.8xlarge": 12.24,  # 4x V100 GPU
    }

    hourly_cost = pricing.get(instance_type, 0)
    total_cost = hourly_cost * hours

    return {
        "instance_type": instance_type,
        "hourly_cost": hourly_cost,
        "hours": hours,
        "total_cost": total_cost
    }

# Example usage
# cost = calculate_cost("ml.g4dn.xlarge", 100)
# print(f"Total cost: ${cost['total_cost']:.2f}")

4.2 Auto-scaling

Python
from fastapi import FastAPI
import boto3

app = FastAPI(title="Auto-scaling Inference Service")

# AWS clients (placeholders; the actual scaling API calls are omitted in this sketch)
autoscaling = boto3.client('autoscaling')
ec2 = boto3.client('ec2')

class ScalingPolicy:
    """
    Auto-scaling policy.
    """
    def __init__(self, min_instances=1, max_instances=5,
                 scale_up_threshold=0.8, scale_down_threshold=0.3):
        self.min_instances = min_instances
        self.max_instances = max_instances
        self.scale_up_threshold = scale_up_threshold
        self.scale_down_threshold = scale_down_threshold

    def should_scale_up(self, current_load, current_instances):
        """
        Decide whether to scale out.
        """
        return (current_load > self.scale_up_threshold and
                current_instances < self.max_instances)

    def should_scale_down(self, current_load, current_instances):
        """
        Decide whether to scale in.
        """
        return (current_load < self.scale_down_threshold and
                current_instances > self.min_instances)

# Global state (a real service would read load from a metrics backend)
scaling_policy = ScalingPolicy()
current_instances = 1
current_load = 0.0

@app.get("/metrics")
async def get_metrics():
    """
    Return current metrics.
    """
    return {
        "current_instances": current_instances,
        "current_load": current_load
    }

@app.post("/scale")
async def scale_instances():
    """
    Scale the instance count in or out.
    """
    global current_instances

    if scaling_policy.should_scale_up(current_load, current_instances):
        # Scale out
        new_instances = min(current_instances + 1, scaling_policy.max_instances)
        current_instances = new_instances
        return {"action": "scale_up", "new_instances": new_instances}

    elif scaling_policy.should_scale_down(current_load, current_instances):
        # Scale in
        new_instances = max(current_instances - 1, scaling_policy.min_instances)
        current_instances = new_instances
        return {"action": "scale_down", "new_instances": new_instances}

    else:
        return {"action": "no_change", "current_instances": current_instances}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

4.3 Caching Strategy

Python
from fastapi import FastAPI
from pydantic import BaseModel
import hashlib
import json
from datetime import datetime, timedelta

app = FastAPI(title="Cached Inference Service")

class CacheEntry:
    """
    A single cache entry.
    """
    def __init__(self, text: str, ttl: int = 3600):
        self.text = text
        self.created_at = datetime.now()
        self.ttl = ttl

    def is_expired(self):
        """
        Check whether this entry has expired.
        """
        return datetime.now() - self.created_at > timedelta(seconds=self.ttl)

class InferenceCache:
    """
    Inference result cache.
    """
    def __init__(self, max_size=1000):
        self.cache = {}
        self.max_size = max_size

    def _generate_key(self, prompt: str, **kwargs) -> str:
        """
        Build a cache key from the prompt and generation parameters.
        """
        data = {"prompt": prompt, **kwargs}
        data_str = json.dumps(data, sort_keys=True)
        return hashlib.md5(data_str.encode()).hexdigest()

    def get(self, prompt: str, **kwargs) -> str | None:
        """
        Look up a cached result.
        """
        key = self._generate_key(prompt, **kwargs)
        entry = self.cache.get(key)

        if entry and not entry.is_expired():
            return entry.text

        return None

    def set(self, prompt: str, text: str, **kwargs):
        """
        Store a result in the cache.
        """
        key = self._generate_key(prompt, **kwargs)

        # If the cache is full, evict the oldest entry
        if len(self.cache) >= self.max_size:
            oldest_key = min(self.cache.keys(),
                           key=lambda k: self.cache[k].created_at)
            del self.cache[oldest_key]

        self.cache[key] = CacheEntry(text)

    def clear(self):
        """
        Clear the cache.
        """
        self.cache.clear()

# Global cache
inference_cache = InferenceCache()

class InferenceRequest(BaseModel):
    prompt: str
    max_tokens: int = 200
    temperature: float = 0.7
    use_cache: bool = True

@app.post("/generate")
async def generate_with_cache(request: InferenceRequest):
    """
    Inference with caching.
    """
    # Check the cache first
    if request.use_cache:
        cached_result = inference_cache.get(
            request.prompt,
            max_tokens=request.max_tokens,
            temperature=request.temperature
        )

        if cached_result:
            return {"text": cached_result, "from_cache": True}

    # Run inference (simplified to a placeholder here)
    generated_text = f"Generated text for: {request.prompt}"

    # Cache the result
    if request.use_cache:
        inference_cache.set(
            request.prompt,
            generated_text,
            max_tokens=request.max_tokens,
            temperature=request.temperature
        )

    return {"text": generated_text, "from_cache": False}

@app.post("/cache/clear")
async def clear_cache():
    """
    Clear the cache.
    """
    inference_cache.clear()
    return {"status": "cache_cleared"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
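The eviction above scans every entry to find the oldest one, which is O(n) per insert. If least-recently-used semantics are acceptable, an `OrderedDict` gives O(1) eviction. A minimal sketch (TTL handling omitted for brevity):

```python
from collections import OrderedDict

class LRUCache:
    """Least-recently-used cache with O(1) get/set/evict."""

    def __init__(self, max_size: int = 1000):
        self.cache = OrderedDict()
        self.max_size = max_size

    def get(self, key):
        if key not in self.cache:
            return None
        self.cache.move_to_end(key)  # mark as most recently used
        return self.cache[key]

    def set(self, key, value):
        if key in self.cache:
            self.cache.move_to_end(key)
        self.cache[key] = value
        if len(self.cache) > self.max_size:
            self.cache.popitem(last=False)  # evict the least recently used entry
```

The trade-off versus the TTL cache above: LRU keeps hot prompts indefinitely, so stale results need a separate expiry mechanism.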

5. Monitoring and Logging

5.1 Performance Monitoring

Python
from fastapi import FastAPI, Request
import time
import psutil
import torch

app = FastAPI(title="Monitored Inference Service")

# Performance metrics
metrics = {
    "total_requests": 0,
    "total_tokens": 0,
    "total_time": 0.0,
    "gpu_memory_used": 0.0,
    "cpu_usage": 0.0
}

@app.middleware("http")
async def monitor_requests(request: Request, call_next):
    """
    Record basic metrics for every request.
    """
    start_time = time.time()

    # Handle the request
    response = await call_next(request)

    # Update metrics
    process_time = time.time() - start_time
    metrics["total_requests"] += 1
    metrics["total_time"] += process_time

    # Expose the processing time as a response header
    response.headers["X-Process-Time"] = str(process_time)

    return response

@app.get("/metrics")
async def get_metrics():
    """
    Return performance metrics.
    """
    # GPU memory in use
    if torch.cuda.is_available():
        gpu_memory_used = torch.cuda.memory_allocated() / 1e9
        metrics["gpu_memory_used"] = gpu_memory_used

    # CPU utilization
    metrics["cpu_usage"] = psutil.cpu_percent()

    # Average response time
    avg_time = metrics["total_time"] / metrics["total_requests"] if metrics["total_requests"] > 0 else 0

    return {
        **metrics,
        "avg_response_time": avg_time
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
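A running average hides tail latency; keeping a sliding window of recent request times also allows percentile reporting. A minimal sketch using only the standard library (the window size is an arbitrary choice here):

```python
import statistics
from collections import deque

class LatencyTracker:
    """Keep the last N request latencies and report average and p95."""

    def __init__(self, window: int = 1000):
        self.samples = deque(maxlen=window)  # old samples fall off automatically

    def record(self, seconds: float):
        self.samples.append(seconds)

    def stats(self):
        if not self.samples:
            return {"avg": 0.0, "p95": 0.0}
        data = sorted(self.samples)
        # Index of the 95th-percentile sample, clamped to the last element
        p95 = data[min(len(data) - 1, int(0.95 * len(data)))]
        return {"avg": statistics.fmean(data), "p95": p95}
```

A tracker like this could be updated from the `monitor_requests` middleware above in place of the running totals.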

5.2 Log Collection

Python
from fastapi import FastAPI, Request
import logging
from datetime import datetime
import json

app = FastAPI(title="Logging Inference Service")

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('inference.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

@app.middleware("http")
async def log_requests(request: Request, call_next):
    """
    Log every request and response.
    """
    start_time = datetime.now()

    # Log request details
    log_data = {
        "timestamp": start_time.isoformat(),
        "method": request.method,
        "url": str(request.url),
        "client": request.client.host if request.client else None
    }

    logger.info(f"Request: {json.dumps(log_data)}")

    # Handle the request
    response = await call_next(request)

    # Log response details
    end_time = datetime.now()
    process_time = (end_time - start_time).total_seconds()

    log_data.update({
        "status_code": response.status_code,
        "process_time": process_time
    })

    logger.info(f"Response: {json.dumps(log_data)}")

    return response

@app.get("/")
async def root():
    """
    Root endpoint.
    """
    logger.info("Root endpoint accessed")
    return {"message": "Inference Service"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

6. Exercises

Basic Exercises

  1. Implement a simple inference API

    Python
    # TODO: implement a simple inference API
    from fastapi import FastAPI
    app = FastAPI()

    @app.post("/generate")
    async def generate(prompt: str):
        # your code here
        pass
    

  2. Implement a caching mechanism

    Python
    # TODO: implement a simple cache class
    class SimpleCache:
        def __init__(self):
            # your code here
            pass

        def get(self, key):
            # your code here
            pass

        def set(self, key, value):
            # your code here
            pass
    

Advanced Exercises

  1. Implement auto-scaling

    Python
    # TODO: implement load-based auto-scaling
    class AutoScaler:
        def __init__(self, min_instances, max_instances):
            # your code here
            pass

        def check_and_scale(self, current_load):
            # your code here
            pass
    

  2. Implement a cost calculator

    Python
    # TODO: implement a cloud inference cost calculator
    class CostCalculator:
        def __init__(self, pricing):
            # your code here
            pass

        def calculate(self, instance_type, hours):
            # your code here
            pass
    

Project Exercises

  1. Build a complete cloud inference service
  2. Support multiple inference modes
  3. Implement auto-scaling
  4. Integrate monitoring and logging

7. Best Practices

✅ Recommended

  1. Choose the right cloud service
     • Pick AWS, GCP, or Azure based on your requirements
     • Weigh cost, performance, and availability
     • Use managed services to simplify deployment

  2. Optimize costs
     • Use reserved instances
     • Implement auto-scaling
     • Use caching to avoid redundant computation

  3. Monitor and alert
     • Track key metrics
     • Set alert thresholds
     • Respond to issues promptly

❌ Avoid

  1. Over-provisioning
     • Don't use oversized instances
     • Size resources to actual demand
     • Re-evaluate and optimize regularly

  2. Ignoring security
     • Use HTTPS encryption
     • Implement authentication and authorization
     • Protect API keys

  3. Lacking monitoring
     • Don't skip performance monitoring
     • Record key metrics
     • Set up alerting

8. Summary

This chapter covered the core aspects of cloud inference services:

  • Cloud deployment: deploying on AWS, GCP, and other platforms
  • API services: building inference services with FastAPI
  • Cost optimization: instance selection, auto-scaling, and caching
  • Monitoring and logging: performance monitoring and log collection

Building a high-quality cloud inference service means balancing performance, cost, and reliability.

9. Next Steps

Continue with 05-边缘部署 to learn how to deploy models to edge devices.