
Deploying Agents to Production

⚠️ Freshness note: this chapter touches on cutting-edge models, pricing, leaderboards, and similar information that can change quickly between releases; always defer to the original papers, official release pages, and API documentation.

Master the full pipeline for taking an Agent from prototype to production, covering fault-tolerance design, observability, cost control, and security hardening.

Study time: 6 hours | Difficulty: ⭐⭐⭐⭐ Upper-intermediate | Prerequisites: Agent frameworks in practice (Ch02), Evaluation (Ch05)


🎯 Learning Objectives

  • Master the full deployment pipeline from Agent prototype to production
  • Understand Agent fault-tolerance design (retries, fallback, circuit breaking)
  • Learn how to build Agent observability (logging, tracing, metrics)
  • Master Agent cost-control and performance-optimization strategies
  • Understand Agent security hardening and compliance requirements

1. Production Architecture

1.1 Prototype vs. Production

Text Only
Prototype Agent:                 Production Agent:
├── One-off calls                ├── High-concurrency request handling
├── No error handling            ├── Retries + fallback + circuit breaking
├── print() debugging            ├── Structured logging + distributed tracing
├── Unlimited calls              ├── Rate limiting + cost caps
├── Synchronous execution        ├── Async execution + task queues
└── Trusts all input             └── Input validation + security hardening

1.2 Production Architecture Diagram

[Figure: Agent production deployment architecture]

Note: the figure above is a schematic of one typical layered architecture for an Agent service.

A production-grade Agent system needs full architectural support:

Text Only
                   ┌──────────────┐
                   │  API Gateway  │ ← rate limiting, auth
                   └──────┬───────┘
                   ┌──────┴───────┐
                   │ Agent Router  │ ← routes to the right Agent
                   └──────┬───────┘
          ┌───────────────┼───────────────┐
          │               │               │
    ┌─────┴─────┐  ┌─────┴─────┐  ┌─────┴─────┐
    │ Agent  #1  │  │ Agent  #2  │  │ Agent  #3  │
    └─────┬─────┘  └─────┬─────┘  └─────┬─────┘
          │               │               │
    ┌─────┴───────────────┴───────────────┴─────┐
    │           Shared infrastructure            │
    ├── LLM call layer (retry/fallback/cache/rate limiting)
    ├── Tool execution layer (sandbox/timeout/permissions)
    ├── State storage (Redis/PostgreSQL)
    ├── Observability (OpenTelemetry)
    ├── Security layer (input filtering/output moderation)
    └────────────────────────────────────────────┘

2. Fault-Tolerance Design

2.1 Fault Tolerance at the LLM Call Layer

Python
import asyncio
import time
import random
from typing import Optional, List
from dataclasses import dataclass
# Import OpenAI-specific exception classes for fine-grained handling (distinguish rate limits from generic API errors)
from openai import RateLimitError, APIError

@dataclass
class LLMConfig:
    primary_model: str = "gpt-4o"
    fallback_model: str = "gpt-4o-mini"
    max_retries: int = 3
    base_delay: float = 1.0
    max_delay: float = 30.0
    timeout: float = 60.0

class ResilientLLMClient:
    """Production-grade LLM client (retries, fallback, circuit breaking)"""

    def __init__(self, config: LLMConfig):
        self.config = config
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=5,
            recovery_timeout=60
        )

    async def chat(self, messages, tools=None):
        """LLM call with fault tolerance"""

        # Circuit-breaker check
        if not self.circuit_breaker.allow_request():
            # Primary model is tripped; go straight to the fallback model
            return await self._call_with_retry(
                messages, tools, model=self.config.fallback_model
            )

        try:
            result = await self._call_with_retry(
                messages, tools, model=self.config.primary_model
            )
            self.circuit_breaker.record_success()
            return result
        except Exception:
            self.circuit_breaker.record_failure()
            # Fall back to the backup model
            return await self._call_with_retry(
                messages, tools, model=self.config.fallback_model
            )

    async def _call_with_retry(self, messages, tools, model, retries=None):
        """Retry with exponential backoff"""
        # `if retries is not None` rather than `retries or ...`, so an explicit 0 is honored
        max_retries = retries if retries is not None else self.config.max_retries
        last_error = None

        for attempt in range(max_retries):
            try:
                # asyncio.wait_for: bounds the coroutine; raises asyncio.TimeoutError on expiry
                return await asyncio.wait_for(
                    self._raw_call(messages, tools, model),
                    timeout=self.config.timeout
                )
            except asyncio.TimeoutError:  # catch timeouts separately so they get retried
                last_error = TimeoutError(f"LLM call timed out ({self.config.timeout}s)")
            except RateLimitError as e:
                last_error = e
                # Rate limited: back off longer, with jitter
                delay = min(
                    self.config.base_delay * (2 ** attempt) + random.uniform(0, 1),
                    self.config.max_delay
                )
                # asyncio.sleep: waits without blocking the event loop (unlike time.sleep, which blocks the whole thread)
                await asyncio.sleep(delay)
            except (APIError, ConnectionError) as e:
                last_error = e
                delay = self.config.base_delay * (2 ** attempt)
                await asyncio.sleep(delay)

        raise last_error

    async def _raw_call(self, messages, tools, model):
        """Low-level LLM API call"""
        from openai import AsyncOpenAI
        # Note: in production, promote the client to an instance attribute (self._client)
        # so a new connection isn't created per call; initialize it in __init__ or use a singleton.
        client = AsyncOpenAI()
        kwargs = {
            "model": model,
            "messages": messages,
        }
        if tools:
            kwargs["tools"] = tools
        response = await client.chat.completions.create(**kwargs)  # **kwargs unpacking: expands the dict into keyword arguments
        return response.choices[0].message

class CircuitBreaker:
    """Circuit breaker: trips after N consecutive failures, then probes for recovery after a cooldown"""

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.last_failure_time = 0
        self.state = "CLOSED"  # CLOSED (normal) / OPEN (tripped) / HALF_OPEN (probing)

    def allow_request(self):
        if self.state == "CLOSED":
            return True
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "HALF_OPEN"
                return True
            return False
        return True  # HALF_OPEN

    def record_success(self):
        self.failures = 0
        self.state = "CLOSED"

    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        if self.state == "HALF_OPEN":
            # Any failure while HALF_OPEN goes straight back to OPEN and restarts the cooldown
            self.state = "OPEN"
        elif self.failures >= self.failure_threshold:
            self.state = "OPEN"
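
The backoff schedule used by `_call_with_retry` is easy to sanity-check on its own. A minimal sketch (`backoff_delays` is a hypothetical helper written for illustration, not part of the client above):

```python
import random

def backoff_delays(max_retries=3, base_delay=1.0, max_delay=30.0, jitter=False):
    """Delay applied after each failed attempt, mirroring _call_with_retry's schedule."""
    delays = []
    for attempt in range(max_retries):
        d = base_delay * (2 ** attempt)  # 1s, 2s, 4s, 8s, ...
        if jitter:
            d += random.uniform(0, 1)  # jitter de-synchronizes retries across clients
        delays.append(min(d, max_delay))  # never wait longer than max_delay
    return delays

print(backoff_delays(5))  # [1.0, 2.0, 4.0, 8.0, 16.0]
print(backoff_delays(7))  # capped: [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

Doubling with a cap keeps the first retry fast while preventing pathological waits once the provider is down for an extended period.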

2.2 Tool-Execution Fault Tolerance

Python
import asyncio

class ToolExecutor:
    """Safe tool-execution manager"""

    def __init__(self, timeout=30, max_output_size=10000):
        self.timeout = timeout
        self.max_output_size = max_output_size

    async def execute(self, tool_name, tool_func, args):
        """Tool execution with a timeout and an output-size cap"""
        try:
            result = await asyncio.wait_for(
                tool_func(**args),
                timeout=self.timeout
            )

            # Cap the output size (a huge result would blow up the context window)
            result_str = str(result)
            if len(result_str) > self.max_output_size:
                result_str = result_str[:self.max_output_size] + \
                    f"\n... [output truncated, original length {len(result_str)} chars]"

            return {
                "success": True,
                "result": result_str,
                "tool": tool_name
            }

        except asyncio.TimeoutError:
            return {
                "success": False,
                "error": f"Tool {tool_name} timed out ({self.timeout}s)",
                "tool": tool_name
            }
        except Exception as e:
            return {
                "success": False,
                "error": f"Tool {tool_name} raised {type(e).__name__}: {str(e)}",
                "tool": tool_name
            }
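
The timeout path above rests on `asyncio.wait_for`. Its behaviour can be observed with a standalone stub — a minimal sketch where `slow_tool` and `run_with_timeout` are made-up illustrations, not part of the executor:

```python
import asyncio

async def slow_tool(seconds: float) -> str:
    """Stub tool that just sleeps, standing in for a real external call."""
    await asyncio.sleep(seconds)
    return "done"

async def run_with_timeout(coro, timeout: float) -> dict:
    """Same pattern ToolExecutor.execute uses: wait_for + a structured error result."""
    try:
        return {"success": True, "result": await asyncio.wait_for(coro, timeout=timeout)}
    except asyncio.TimeoutError:
        return {"success": False, "error": f"timed out after {timeout}s"}

async def main():
    fast = await run_with_timeout(slow_tool(0.01), timeout=1.0)   # finishes in time
    slow = await run_with_timeout(slow_tool(5.0), timeout=0.05)   # gets cancelled
    return fast, slow

fast, slow = asyncio.run(main())
print(fast)  # {'success': True, 'result': 'done'}
print(slow)  # {'success': False, 'error': 'timed out after 0.05s'}
```

Note that `wait_for` cancels the inner coroutine on expiry; a tool that runs a subprocess or thread must handle that cancellation itself to avoid leaking work.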

3. Observability

3.1 Structured Logging

Python
import structlog
import uuid
from datetime import datetime

# Configure structured logging
structlog.configure(
    processors=[
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ]
)

logger = structlog.get_logger()

class ObservableAgent:
    """Agent with full observability"""

    async def run(self, query: str, user_id: str = None):
        # Generate a request ID (correlates the whole request chain)
        request_id = str(uuid.uuid4())[:8]

        log = logger.bind(
            request_id=request_id,
            user_id=user_id,
            query_preview=query[:100]
        )

        log.info("agent.request.start")
        start_time = datetime.now()

        try:
            result = await self._execute(query, log)

            elapsed = (datetime.now() - start_time).total_seconds()
            log.info("agent.request.success",
                     elapsed_seconds=elapsed,
                     tool_calls=result.tool_calls_count,
                     total_tokens=result.total_tokens,
                     iterations=result.iterations)

            return result

        except Exception as e:
            elapsed = (datetime.now() - start_time).total_seconds()
            log.error("agent.request.failed",
                      error=str(e),
                      error_type=type(e).__name__,
                      elapsed_seconds=elapsed)
            raise

3.2 OpenTelemetry Tracing

Python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Initialize tracing
provider = TracerProvider()
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="http://jaeger:4317"))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("agent-service")

class TracedAgent:
    """Agent with distributed tracing"""

    async def run(self, query):
        with tracer.start_as_current_span("agent.run") as span:
            span.set_attribute("agent.query", query[:200])

            for i in range(self.max_iterations):
                with tracer.start_as_current_span(f"agent.iteration.{i}"):

                    # Span for the LLM call
                    with tracer.start_as_current_span("llm.call") as llm_span:
                        response = await self.llm.chat(messages)
                        llm_span.set_attribute("llm.model", self.model)
                        llm_span.set_attribute("llm.tokens", response.usage.total_tokens)

                    # Spans for tool calls
                    if response.tool_calls:
                        for tc in response.tool_calls:
                            with tracer.start_as_current_span(f"tool.{tc.function.name}") as tool_span:
                                result = await self.execute_tool(tc)
                                tool_span.set_attribute("tool.success", result["success"])

            span.set_attribute("agent.iterations", i + 1)
            span.set_attribute("agent.success", True)

4. Cost Control

4.1 Cost-Optimization Strategies

Python
import datetime

class CostController:
    """Agent cost controller"""

    def __init__(self,
                 max_cost_per_request=0.50,     # max $0.50 per request
                 max_tokens_per_request=50000,  # max 50K tokens per request
                 daily_budget=100.0):           # $100 daily budget
        self.max_cost = max_cost_per_request
        self.max_tokens = max_tokens_per_request
        self.daily_budget = daily_budget

        self.current_request_cost = 0.0
        self.current_request_tokens = 0
        self.daily_cost = 0.0
        self._daily_reset_date = datetime.date.today()  # date the daily budget was last reset

    # Token price table ($/1M tokens, as of February 2026; prices change, check the vendors' pages)
    PRICING = {
        "gpt-4o": {"input": 2.50, "output": 10.00},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "claude-3.5-sonnet": {"input": 3.00, "output": 15.00},
    }

    def reset_request(self):
        """Reset per-request accounting (call at the start of each new request)"""
        self.current_request_cost = 0.0
        self.current_request_tokens = 0

    def finalize_request(self):
        """Fold the finished request's cost into the daily total (call when a request completes;
        without this step daily_cost would never grow and the daily check would be inert)"""
        self.daily_cost += self.current_request_cost
        self.reset_request()

    def _check_daily_reset(self):
        """Reset the daily budget when the date rolls over"""
        today = datetime.date.today()
        if today > self._daily_reset_date:
            self.daily_cost = 0.0
            self._daily_reset_date = today

    def check_budget(self, model, input_tokens, output_tokens):
        """Raise if this call would exceed a budget"""
        self._check_daily_reset()
        cost = self._estimate_cost(model, input_tokens, output_tokens)
        self.current_request_cost += cost
        self.current_request_tokens += input_tokens + output_tokens

        if self.current_request_cost > self.max_cost:
            raise BudgetExceededError(
                f"Request cost has reached ${self.current_request_cost:.2f}, "
                f"over the ${self.max_cost} limit"
            )

        if self.daily_cost + self.current_request_cost > self.daily_budget:
            raise BudgetExceededError("Daily budget exhausted")

    def _estimate_cost(self, model, input_tokens, output_tokens):
        # Unknown models fall back to gpt-4o-mini pricing (may underestimate)
        pricing = self.PRICING.get(model, self.PRICING["gpt-4o-mini"])
        return (input_tokens * pricing["input"] +
                output_tokens * pricing["output"]) / 1_000_000

    def suggest_optimization(self):
        """Suggest optimizations based on usage patterns"""
        suggestions = []

        if self.current_request_tokens > 30000:
            suggestions.append("Message history is long; consider summary compression")

        if self.current_request_cost > 0.20:
            suggestions.append("Consider routing simple decisions to gpt-4o-mini")

        return suggestions

class BudgetExceededError(Exception):
    pass
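
The per-call arithmetic in `_estimate_cost` can be verified by hand. A standalone sketch using the same illustrative price table (real prices change, so treat the numbers as placeholders):

```python
# $/1M tokens, matching the illustrative table above
PRICING = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
}

def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    p = PRICING[model]
    return (input_tokens * p["input"] + output_tokens * p["output"]) / 1_000_000

# A 10K-in / 2K-out call: (10_000*2.50 + 2_000*10.00) / 1e6 = 0.045
print(round(estimate_cost("gpt-4o", 10_000, 2_000), 4))       # 0.045
# The same call on the mini model: (10_000*0.15 + 2_000*0.60) / 1e6 = 0.0027
print(round(estimate_cost("gpt-4o-mini", 10_000, 2_000), 4))  # 0.0027
```

The roughly 16x gap between the two models is why routing simple decisions to the cheaper model (as `suggest_optimization` hints) is usually the single biggest cost lever.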

4.2 Smart Caching

Python
import hashlib
import json
from typing import Optional

class AgentCache:
    """Agent response cache (cuts down on repeated LLM calls)"""

    def __init__(self, redis_client, ttl=3600):
        self.redis = redis_client
        self.ttl = ttl

    def _make_key(self, messages, tools):
        """Derive a cache key"""
        content = json.dumps({
            "messages": messages[-3:],  # key off the last 3 messages only
            "tools": sorted([t["function"]["name"] for t in (tools or [])])
        }, sort_keys=True)
        return f"agent:cache:{hashlib.sha256(content.encode()).hexdigest()[:16]}"

    async def get(self, messages, tools) -> Optional[dict]:
        key = self._make_key(messages, tools)
        cached = await self.redis.get(key)
        if cached:
            return json.loads(cached)
        return None

    async def set(self, messages, tools, response):
        """Cache only final answers (never tool calls, since external state may change)"""
        if response.get("tool_calls"):
            return  # don't cache tool-call decisions

        key = self._make_key(messages, tools)
        await self.redis.setex(key, self.ttl, json.dumps(response))
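
The key derivation can be checked in isolation: identical recent messages and tool sets must map to the same key, and any change must produce a different one. A standalone sketch of the same hashing scheme (no Redis needed):

```python
import hashlib
import json

def make_key(messages, tools=None) -> str:
    """Same scheme as AgentCache._make_key: hash the last 3 messages + sorted tool names."""
    content = json.dumps({
        "messages": messages[-3:],
        "tools": sorted(t["function"]["name"] for t in (tools or []))
    }, sort_keys=True)
    return "agent:cache:" + hashlib.sha256(content.encode()).hexdigest()[:16]

k1 = make_key([{"role": "user", "content": "hi"}])
k2 = make_key([{"role": "user", "content": "hi"}])
k3 = make_key([{"role": "user", "content": "bye"}])
print(k1 == k2)  # True  — deterministic for identical input
print(k1 == k3)  # False — different content, different key
```

`sort_keys=True` and sorting the tool names keep the serialization canonical, so the same logical request always hashes to the same key regardless of dict ordering.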

5. Security Hardening

5.1 Input/Output Security

Python
import re

class AgentSecurityGuard:
    """Agent security guardrails"""

    # Input-filtering rules
    BLOCKED_PATTERNS = [
        r"ignore\s+previous\s+instructions",
        r"ignore\s+all\s+above",
        r"system\s*:\s*you\s+are\s+now",
        r"jailbreak",
        r"DAN\s+mode",
    ]

    def validate_input(self, user_input: str) -> tuple:
        """Check whether user input is safe"""
        # Length check
        if len(user_input) > 10000:
            return False, "Input too long (max 10000 chars)"

        # Prompt-injection detection (IGNORECASE so mixed-case patterns like "DAN mode" still match)
        for pattern in self.BLOCKED_PATTERNS:
            if re.search(pattern, user_input, re.IGNORECASE):
                return False, "Potential prompt-injection attack detected"

        return True, "OK"

    def validate_tool_call(self, tool_name: str, args: dict) -> tuple:
        """Check whether a tool call is safe"""
        # File-access check
        if tool_name in ("read_file", "write_file"):
            path = args.get("path", "")
            if ".." in path or path.startswith("/etc") or path.startswith("/root"):
                return False, f"Access to path not allowed: {path}"

        # Shell-command check
        if tool_name == "shell":
            cmd = args.get("command", "")
            dangerous = ["rm -rf", "sudo", "chmod 777", "> /dev/", "mkfs"]
            if any(d in cmd for d in dangerous):  # any + substring match: does the command contain any dangerous pattern?
                return False, f"Dangerous command not allowed: {cmd}"

        # SQL-injection check
        if tool_name == "database_query":
            sql = args.get("sql", "")
            # any + generator expression: uppercase the SQL, then check for destructive keywords (denylist approach)
            if any(kw in sql.upper() for kw in ["DROP", "DELETE", "TRUNCATE", "ALTER"]):
                return False, f"Destructive SQL not allowed: {sql}"

        return True, "OK"

    def validate_output(self, output: str) -> str:
        """Redact sensitive data from output"""
        # Strip anything that looks like an API key
        output = re.sub(r'sk-[a-zA-Z0-9]{20,}', '[REDACTED_API_KEY]', output)
        # Strip anything that looks like a password
        output = re.sub(r'password["\s:=]+\S+', 'password=[REDACTED]', output, flags=re.I)

        return output
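
The injection filter can be exercised standalone with the same denylist idea. A word of caution: a tiny pattern list like this catches only the crudest attacks and is no substitute for a dedicated moderation layer; the snippet below is a minimal illustration:

```python
import re

# Two illustrative patterns from the denylist above
BLOCKED_PATTERNS = [
    r"ignore\s+previous\s+instructions",
    r"DAN\s+mode",
]

def is_safe(text: str) -> bool:
    """True if no denylisted pattern matches (case-insensitive)."""
    return not any(re.search(p, text, re.IGNORECASE) for p in BLOCKED_PATTERNS)

print(is_safe("What's the weather in Paris?"))                 # True
print(is_safe("Please IGNORE previous instructions and ..."))  # False
print(is_safe("enable dan mode"))                              # False — IGNORECASE matches regardless of case
```

Matching case-insensitively on the raw input (rather than lowercasing the input against mixed-case patterns) is what makes uppercase patterns like `DAN\s+mode` effective.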

6. Deployment

6.1 Docker Deployment

Docker
# Dockerfile
FROM python:3.12-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Run as a non-root user
RUN useradd -m agent
USER agent

EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
Python
# main.py - FastAPI Agent service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI(title="Agent Service")

class AgentRequest(BaseModel):
    query: str
    user_id: str = "anonymous"
    max_iterations: int = 10

class AgentResponse(BaseModel):
    answer: str
    tool_calls: list
    tokens_used: int
    cost_usd: float
    latency_ms: float

@app.post("/agent/run", response_model=AgentResponse)
async def run_agent(request: AgentRequest):
    # Input validation
    guard = AgentSecurityGuard()
    is_safe, reason = guard.validate_input(request.query)
    if not is_safe:
        raise HTTPException(400, detail=reason)

    # get_agent_instance(): application-specific factory, implementation not shown here
    agent = get_agent_instance()
    result = await agent.run(
        query=request.query,
        user_id=request.user_id,
        max_iterations=request.max_iterations
    )

    return AgentResponse(
        answer=guard.validate_output(result.final_answer),
        tool_calls=result.tool_calls_summary,
        tokens_used=result.total_tokens,
        cost_usd=result.total_cost,
        latency_ms=result.latency_ms
    )

@app.get("/health")
async def health():
    return {"status": "healthy"}

7. Exercises

Coding practice

  1. Beginner: implement exponential-backoff retries + model-fallback logic for an Agent
  2. Intermediate: implement a full cost controller (per-request budget + daily budget + token cap)
  3. Advanced: deploy a production-grade Agent service with FastAPI + Docker, with full observability

Interview questions

  1. What goes wrong most often with Agents in production? How do you prevent it?
  2. How would you design rate limiting for an Agent (per request / per user / global)?
  3. How should long-running Agent tasks (>1 minute) be handled?
  4. How do you make sure an Agent never leaks internal company data?
  5. Design a canary-release plan for an Agent

📝 Chapter Summary

This chapter systematically covered the core knowledge for deploying Agents to production:

  1. ✅ Understood the architectural gap between Agent prototypes and production
  2. ✅ Mastered fault-tolerance design (retries, fallback, circuit breaking)
  3. ✅ Learned observability (structured logging, OpenTelemetry tracing)
  4. ✅ Mastered cost control and smart-caching strategies
  5. ✅ Understood Agent security hardening measures
  6. ✅ Mastered Docker-based deployment

✅ Self-Check List

  • Can implement exponential-backoff retries + model-fallback logic
  • Can implement the circuit-breaker pattern to protect LLM calls
  • Can add structured logging and OpenTelemetry tracing to an Agent
  • Can implement a cost controller (per-request + daily budgets)
  • Can implement Agent input/output security checks
  • Can deploy an Agent service with FastAPI + Docker
  • Understand Agent rate limiting and canary-release strategies

🔗 Next Steps

In the next chapter we will work through four enterprise Agent case studies, digging into the design decisions and implementation details of real projects.

Continue with: 07 - Enterprise Agent Case Studies

📚 References

  1. OpenTelemetry Documentation
  2. FastAPI Documentation
  3. Docker Best Practices
  4. "Building Reliable LLM Applications" - various industry practices
  5. structlog Documentation

Happy learning! 🎉


Last updated: 2026-02-12 | Applies to: AI Agent Development in Practice tutorial v2026