
Agent Evaluation and Observability

⚠️ Timeliness note: this chapter references frontier models, prices, leaderboards, and other fast-moving information; always defer to the original papers, official release pages, and API documentation.

(Figure: Agent evaluation and observability overview)

📌 The non-deterministic, multi-step nature of Agent systems makes them far harder to evaluate and monitor than traditional software. Mastering observability tools such as LangSmith, LLM-as-Judge evaluation methods, and cost optimization strategies is key to building reliable Agent systems.

🎯 Learning Objectives

  • Understand the unique challenges and dimensions of Agent evaluation
  • Master evaluation tools such as LangSmith, Weights & Biases Weave, and Phoenix
  • Use LLM-as-Judge methods fluently to evaluate Agent outputs
  • Understand distributed tracing and OpenTelemetry integration
  • Master token monitoring, cost optimization, and anomaly detection
  • Be able to build a complete Agent monitoring and evaluation stack
  • Master the interview topics related to Agent evaluation

19.1 Challenges of Agent Evaluation

19.1.1 Why Agent Evaluation Is Hard

Unlike traditional software testing, Agent systems face these unique challenges:

Challenge | Description | Impact
Non-deterministic output | The same input can produce different outputs | Cannot be tested with a simple assert
Multi-step interaction | Tasks involve multiple rounds of tool calls and reasoning | The process must be evaluated, not just the result
Tool-call correctness | Picking the right tool is not enough; arguments must be accurate too | Requires fine-grained call-level evaluation
Intermediate-state dependence | Later steps depend on earlier steps being correct | Errors cascade and amplify
Environment interaction | The Agent may modify external state | Tests need an isolated environment
Fuzzy evaluation criteria | "A good answer" is hard to define formally | Requires multi-dimensional evaluation
Text Only
Traditional software testing:  input → function → output → assert(output == expected)
Agent system evaluation:       input → [think → tool_call → observe]×N → output → ???
                                          ↑ every step needs evaluation ↑
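A common workaround for the non-determinism problem is statistical testing: run the same case many times and assert a minimum pass rate instead of exact equality. A minimal sketch, where `flaky_agent`, the `check` predicate, and the 0.5 threshold are all illustrative assumptions:

```python
"""Statistical pass-rate testing for non-deterministic agents."""
import random

def run_trials(agent, case, n_trials=20):
    """Run one test case repeatedly and report the pass rate."""
    passes = 0
    for _ in range(n_trials):
        output = agent(case["input"])
        if case["check"](output):  # a predicate on the output, not an exact-match assert
            passes += 1
    return passes / n_trials

# Hypothetical flaky agent: answers correctly ~90% of the time
def flaky_agent(question: str) -> str:
    return "RAG retrieves external knowledge" if random.random() < 0.9 else "I don't know"

case = {"input": "What is RAG?", "check": lambda out: "retriev" in out.lower()}
rate = run_trials(flaky_agent, case, n_trials=200)
assert rate > 0.5  # gate on a threshold, not on exact equality
print(f"pass rate: {rate:.0%}")
```

In practice the predicate would be an LLM judge or a keyword/semantic check, and the threshold would come from a baseline run.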

19.1.2 The Evaluation Dimensions at a Glance

Python
"""Agent evaluation dimension definitions"""
from pydantic import BaseModel, Field

class AgentEvaluation(BaseModel):  # Pydantic BaseModel: automatic validation and serialization
    """Agent evaluation result"""

    # 1. Task completion
    goal_achieved: bool = Field(description="Whether the final goal was achieved")
    goal_score: float = Field(description="Degree of goal completion, 0-1")

    # 2. Tool-call accuracy
    tool_calls_total: int = Field(description="Total number of tool calls")
    tool_calls_correct: int = Field(description="Number of correct tool calls")
    tool_accuracy: float = Field(description="Tool-call accuracy")

    # 3. Reasoning quality
    reasoning_coherence: float = Field(description="Reasoning coherence, 0-1")
    reasoning_relevance: float = Field(description="Reasoning relevance, 0-1")

    # 4. Efficiency
    total_steps: int = Field(description="Total number of steps")
    total_tokens: int = Field(description="Tokens consumed")
    latency_seconds: float = Field(description="Total latency in seconds")

    # 5. Safety
    prompt_injection_detected: bool = Field(description="Whether a prompt-injection attack was detected")
    harmful_content_detected: bool = Field(description="Whether harmful content was generated")
    pii_leakage: bool = Field(description="Whether personal information was leaked")

19.2 Evaluation Frameworks and Tools

19.2.1 LangSmith (LangChain Ecosystem)

LangSmith is LangChain's official observability and evaluation platform, offering three core capabilities: Tracing, Evaluation, and Monitoring.

Python
"""LangSmith integration example"""
# pip install langsmith langchain-openai

import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
os.environ["LANGCHAIN_PROJECT"] = "agent-evaluation"

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langsmith import Client
from langsmith.evaluation import evaluate, LangChainStringEvaluator

# ============ 1. Automatic tracing ============
# Once the environment variables are set, every LangChain call is traced
llm = ChatOpenAI(model="gpt-4o")
prompt = ChatPromptTemplate.from_template("Answer the question: {question}")
chain = prompt | llm

# This call is automatically logged to LangSmith
result = chain.invoke({"question": "What is LangGraph?"})

# ============ 2. Create an evaluation dataset ============
client = Client()

dataset_name = "agent-eval-dataset"
dataset = client.create_dataset(dataset_name, description="Agent evaluation dataset")

# Add test cases
test_cases = [
    {
        "input": {"question": "What is RAG?"},
        "expected_output": "RAG is retrieval-augmented generation; it improves answers by retrieving external knowledge"
    },
    {
        "input": {"question": "Is LangChain or LlamaIndex better suited for RAG?"},
        "expected_output": "LlamaIndex focuses on RAG scenarios; LangChain is more general-purpose"
    }
]

for case in test_cases:
    client.create_example(
        inputs=case["input"],
        outputs={"answer": case["expected_output"]},
        dataset_id=dataset.id
    )

# ============ 3. Run the evaluation ============
def predict(inputs: dict) -> dict:
    """The Agent/Chain under evaluation"""
    result = chain.invoke(inputs)
    return {"answer": result.content}

# Use built-in evaluators ("relevance" and "helpfulness" are criteria, not evaluator types)
eval_results = evaluate(
    predict,
    data=dataset_name,
    evaluators=[
        LangChainStringEvaluator("cot_qa"),                                       # chain-of-thought QA correctness
        LangChainStringEvaluator("criteria", config={"criteria": "relevance"}),   # relevance
        LangChainStringEvaluator("criteria", config={"criteria": "helpfulness"}), # helpfulness
    ],
    experiment_prefix="v1-gpt4o"
)

print(eval_results)

19.2.2 Custom LangSmith Evaluators

Python
"""Custom LangSmith evaluators"""
from langsmith.evaluation import RunEvaluator
from langsmith.schemas import Example, Run

class ToolUsageEvaluator(RunEvaluator):
    """Evaluate the accuracy of the Agent's tool calls"""

    def evaluate_run(self, run: Run, example: Example | None = None) -> dict:
        # Extract tool-call information from the run
        tool_calls = []
        for child in run.child_runs or []:
            if child.run_type == "tool":
                tool_calls.append({
                    "name": child.name,
                    "input": child.inputs,
                    "output": child.outputs,
                    "error": child.error
                })

        # Scoring logic
        expected_tools = example.outputs.get("expected_tools", []) if example else []
        actual_tools = [tc["name"] for tc in tool_calls]

        correct = sum(1 for t in actual_tools if t in expected_tools)
        accuracy = correct / len(expected_tools) if expected_tools else 1.0

        return {
            "key": "tool_accuracy",
            "score": accuracy,
            "comment": f"Tools called: {actual_tools}, expected: {expected_tools}"
        }

class LatencyEvaluator(RunEvaluator):
    """Evaluate response latency"""

    def evaluate_run(self, run: Run, example: Example | None = None) -> dict:
        latency = (run.end_time - run.start_time).total_seconds() if run.end_time else float('inf')

        # Score by latency bucket
        if latency < 3:
            score = 1.0
        elif latency < 10:
            score = 0.7
        elif latency < 30:
            score = 0.4
        else:
            score = 0.1

        return {
            "key": "latency_score",
            "score": score,
            "comment": f"Latency: {latency:.2f}s"
        }

19.2.3 Weights & Biases Weave

Python
"""W&B Weave evaluation example"""
# pip install weave

import weave
from langchain_openai import ChatOpenAI

weave.init("agent-eval-project")

# The @weave.op() decorator traces the function automatically
@weave.op()
def my_agent(question: str) -> str:
    """The traced Agent function"""
    llm = ChatOpenAI(model="gpt-4o")
    response = llm.invoke(question)
    return response.content

# Define an evaluation metric
@weave.op()
def relevance_scorer(question: str, output: str) -> dict:
    """Relevance scoring"""
    llm = ChatOpenAI(model="gpt-4o-mini")
    score_response = llm.invoke(
        f"Score from 0 to 1 how relevant the answer is to the question.\nQuestion: {question}\nAnswer: {output}\nOutput the score only:"
    )
    try:  # try/except guards against unparseable judge output
        score = float(score_response.content.strip())  # chained call: strip removes surrounding whitespace
    except ValueError:
        score = 0.5
    return {"relevance": score}

# Build an evaluation dataset
eval_dataset = [
    {"question": "What is an Agent?"},
    {"question": "How do I build a RAG system?"},
]

# Run the evaluation
evaluation = weave.Evaluation(
    dataset=eval_dataset,
    scorers=[relevance_scorer]
)
# import asyncio; asyncio.run(evaluation.evaluate(my_agent))  # Evaluation.evaluate is a coroutine

19.2.4 Phoenix (Arize AI)

Python
"""Phoenix observability platform"""
# pip install arize-phoenix openinference-instrumentation-langchain

import phoenix as px
from openinference.instrumentation.langchain import LangChainInstrumentor
from phoenix.otel import register

# Start the Phoenix server
# px.launch_app()  # local UI: http://localhost:6006

# Configure OpenTelemetry tracing
tracer_provider = register(
    project_name="agent-monitoring",
    endpoint="http://localhost:6006/v1/traces"
)

# Automatic instrumentation
LangChainInstrumentor().instrument(tracer_provider=tracer_provider)

# From here on, every LangChain call is traced to Phoenix
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o")
result = llm.invoke("What is an Agent?")
# Inspect the trace details in the Phoenix UI

19.2.5 A Self-Built Evaluation Pipeline

Python
"""A self-built Agent evaluation pipeline"""
import time
from dataclasses import dataclass, field
from langchain_openai import ChatOpenAI
from langchain_core.callbacks import BaseCallbackHandler

@dataclass  # @dataclass generates __init__ and friends automatically
class AgentTrace:
    """A trace of one Agent execution"""
    task: str
    steps: list[dict] = field(default_factory=list)  # each instance gets its own empty list, avoiding a shared mutable default
    total_tokens: int = 0
    start_time: float = 0
    end_time: float = 0

    @property  # expose the method as a read-only attribute: trace.latency, no parentheses
    def latency(self) -> float:
        return self.end_time - self.start_time

    @property
    def num_steps(self) -> int:
        return len(self.steps)

class TraceCollector(BaseCallbackHandler):
    """Callback that collects the Agent's execution trajectory"""

    def __init__(self):
        self.traces: list[dict] = []
        self.current_tokens = 0

    def on_llm_start(self, serialized, prompts, **kwargs):  # **kwargs absorbs extra keyword arguments
        self.traces.append({"type": "llm_start", "time": time.time()})

    def on_llm_end(self, response, **kwargs):
        usage = response.llm_output.get("token_usage", {}) if response.llm_output else {}
        self.current_tokens += usage.get("total_tokens", 0)
        self.traces.append({
            "type": "llm_end",
            "time": time.time(),
            "tokens": usage.get("total_tokens", 0)
        })

    def on_tool_start(self, serialized, input_str, **kwargs):
        self.traces.append({
            "type": "tool_start",
            "tool": serialized.get("name", "unknown"),
            "input": input_str,
            "time": time.time()
        })

    def on_tool_end(self, output, **kwargs):
        self.traces.append({
            "type": "tool_end",
            "output": str(output)[:500],
            "time": time.time()
        })

class AgentEvaluator:
    """Agent evaluator"""

    def __init__(self, judge_llm=None):
        self.judge_llm = judge_llm or ChatOpenAI(model="gpt-4o", temperature=0)

    def evaluate_goal_achievement(self, task: str, result: str) -> float:
        """Score how well the task was completed"""
        response = self.judge_llm.invoke(
            f"""Rate how well the Agent completed the task (0-1):
Task: {task}
Result: {result}
Output only the score (a decimal between 0 and 1):"""
        )
        try:
            return float(response.content.strip())
        except ValueError:
            return 0.5

    def evaluate_tool_usage(self, trace: AgentTrace, expected_tools: list[str]) -> dict:
        """Evaluate tool usage"""
        actual_tools = [
            s["tool"] for s in trace.steps
            if s.get("type") == "tool_start"
        ]

        correct = sum(1 for t in actual_tools if t in expected_tools)
        unnecessary = sum(1 for t in actual_tools if t not in expected_tools)
        missed = sum(1 for t in expected_tools if t not in actual_tools)

        precision = correct / len(actual_tools) if actual_tools else 0
        recall = correct / len(expected_tools) if expected_tools else 1

        return {
            "precision": precision,
            "recall": recall,
            "unnecessary_calls": unnecessary,
            "missed_tools": missed
        }

    def evaluate_efficiency(self, trace: AgentTrace) -> dict:
        """Evaluate execution efficiency"""
        return {
            "total_steps": trace.num_steps,
            "total_tokens": trace.total_tokens,
            "latency_seconds": trace.latency,
            "tokens_per_step": trace.total_tokens / max(trace.num_steps, 1),
            "cost_estimate_usd": trace.total_tokens * 0.00001  # rough estimate
        }

    def full_evaluation(self, task: str, result: str, trace: AgentTrace, expected_tools: list[str] = None) -> dict:
        """Run the full evaluation"""
        return {
            "goal_achievement": self.evaluate_goal_achievement(task, result),
            "tool_usage": self.evaluate_tool_usage(trace, expected_tools or []),
            "efficiency": self.evaluate_efficiency(trace)
        }

19.3 LLM-as-Judge

19.3.1 The Core Idea

Use a strong LLM (such as GPT-4o) as a "judge" to grade the outputs of other LLMs.

Python
"""LLM-as-Judge evaluation framework"""
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field

class JudgeResult(BaseModel):
    """Judge scoring result"""
    score: int = Field(description="Score (1-5)")
    reasoning: str = Field(description="Reason for the score")
    strengths: list[str] = Field(description="Strengths")
    weaknesses: list[str] = Field(description="Weaknesses")

judge_llm = ChatOpenAI(model="gpt-4o", temperature=0)

# --- Point-wise evaluation (single output) ---
pointwise_prompt = ChatPromptTemplate.from_messages([
    ("system", """You are an expert evaluator of AI outputs. Grade the assistant's answer (1-5) against these criteria:

Scoring scale:
5 - Perfect: accurate, complete, clear, helpful
4 - Excellent: mostly accurate, minor omissions
3 - Fair: partially correct but with notable gaps
2 - Poor: multiple errors or largely irrelevant
1 - Very poor: entirely wrong or harmful

Evaluation dimensions:
- Accuracy: is the information correct?
- Completeness: does it cover the key points of the question?
- Clarity: is it clearly expressed?
- Helpfulness: does it actually help the user?"""),
    ("human", """User question: {question}

AI answer: {answer}

Reference answer (if any): {reference}

Give a score and a detailed justification:""")
])

pointwise_judge = pointwise_prompt | judge_llm.with_structured_output(JudgeResult)

# --- Pair-wise (comparative) evaluation ---
class PairwiseResult(BaseModel):
    winner: str = Field(description="Winner: A, B, or tie")
    reasoning: str = Field(description="Reason for the verdict")

pairwise_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are an impartial judge comparing the quality of two AI answers."),
    ("human", """Question: {question}

Answer A: {answer_a}

Answer B: {answer_b}

Which answer is better? Explain why.""")
])

pairwise_judge = pairwise_prompt | judge_llm.with_structured_output(PairwiseResult)

# --- Usage ---
# Point-wise evaluation
result = pointwise_judge.invoke({
    "question": "What is RAG?",
    "answer": "RAG is retrieval-augmented generation; it improves answers through retrieval.",
    "reference": "RAG (Retrieval-Augmented Generation) augments a large model's answers by retrieving from an external knowledge base."
})
print(f"Score: {result.score}/5")
print(f"Reasoning: {result.reasoning}")

# Pair-wise evaluation
pair_result = pairwise_judge.invoke({
    "question": "What is RAG?",
    "answer_a": "RAG is an AI technique.",
    "answer_b": "RAG is retrieval-augmented generation; it retrieves external knowledge to improve answer quality and reduce hallucinations."
})
print(f"Winner: {pair_result.winner}")

19.3.2 Improving Scoring Consistency

Python
"""Techniques for more consistent LLM-as-Judge scores"""

# Technique 1: provide a detailed scoring rubric
rubric_prompt = """Scoring rubric:
5 - Fully correct and comprehensive: accurate, complete, well organized, includes examples
4 - Mostly correct: main points right, may lack detail or examples
3 - Partially correct: contains correct information but also errors or major omissions
2 - Mostly wrong: core concepts misunderstood
1 - Entirely wrong: severely incorrect or off-topic
"""

# Technique 2: average over multiple runs (reduces randomness)
def robust_judge(question, answer, n_trials=3):
    scores = []
    for _ in range(n_trials):
        result = pointwise_judge.invoke({
            "question": question,
            "answer": answer,
            "reference": ""
        })
        scores.append(result.score)

    return {
        "mean_score": sum(scores) / len(scores),
        "scores": scores,
        "score_range": max(scores) - min(scores)  # spread across trials, a quick consistency check
    }

# Technique 3: cancel position bias (swap A and B in pair-wise evaluation)
def unbiased_pairwise(question, answer_a, answer_b):
    # Forward pass
    result1 = pairwise_judge.invoke({
        "question": question,
        "answer_a": answer_a,
        "answer_b": answer_b
    })
    # Reverse pass (A and B swapped)
    result2 = pairwise_judge.invoke({
        "question": question,
        "answer_a": answer_b,
        "answer_b": answer_a
    })

    # Combine the verdicts
    if result1.winner == "A" and result2.winner == "B":
        return "A wins (consistent)"
    elif result1.winner == "B" and result2.winner == "A":
        return "B wins (consistent)"
    else:
        return "Tie / Inconsistent"

19.4 Observability

19.4.1 Distributed Tracing Architecture

Text Only
Trace (one complete Agent execution)
├── Span: user input handling      [10ms]
├── Span: LLM call - planning      [2.3s, 1500 tokens]
│   └── Span: API request          [2.1s]
├── Span: tool call - search       [500ms]
│   ├── Span: query building       [5ms]
│   └── Span: API request          [490ms]
├── Span: LLM call - analysis      [3.1s, 2000 tokens]
├── Span: tool call - database     [200ms]
└── Span: LLM call - final answer  [1.8s, 800 tokens]
    Total: 7.91s, 4300 tokens, $0.043
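The totals line in the tree is just an aggregation over the spans. A quick sanity check in code (the $0.01 per 1K blended token rate is an assumption back-derived from the figures above):

```python
# (name, seconds, tokens) for the top-level spans in the tree above
spans = [
    ("input handling", 0.010, 0),
    ("LLM: planning",  2.3,   1500),
    ("tool: search",   0.5,   0),
    ("LLM: analysis",  3.1,   2000),
    ("tool: database", 0.2,   0),
    ("LLM: answer",    1.8,   800),
]

total_s = sum(s for _, s, _ in spans)
total_tokens = sum(t for _, _, t in spans)
cost = total_tokens / 1000 * 0.01  # assumed blended rate of $0.01 per 1K tokens
print(f"Total: {total_s:.2f}s, {total_tokens} tokens, ${cost:.3f}")
# → Total: 7.91s, 4300 tokens, $0.043
```

Tracing backends compute exactly this kind of rollup per trace, which is what makes per-request cost and latency dashboards possible.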

19.4.2 OpenTelemetry Integration

Python
"""Agent tracing with OpenTelemetry"""
# pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
import functools
import time

# Configure the TracerProvider
resource = Resource.create({"service.name": "agent-service"})
tracer_provider = TracerProvider(resource=resource)

# Export via OTLP (works with Jaeger, Phoenix, etc.)
otlp_exporter = OTLPSpanExporter(endpoint="http://localhost:4317")
tracer_provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
trace.set_tracer_provider(tracer_provider)

tracer = trace.get_tracer("agent.tracer")

# Tracing decorator
def trace_agent(func):
    """Decorator that wraps an Agent function in a span"""
    @functools.wraps(func)  # @wraps preserves the wrapped function's metadata
    def wrapper(*args, **kwargs):  # *args/**kwargs make the decorator work with any signature
        with tracer.start_as_current_span(
            func.__name__,
            attributes={
                "agent.function": func.__name__,
                "agent.args": str(args)[:500]
            }
        ) as span:
            start = time.time()
            try:
                result = func(*args, **kwargs)
                span.set_attribute("agent.success", True)
                span.set_attribute("agent.result_length", len(str(result)))
                return result
            except Exception as e:
                span.set_attribute("agent.success", False)
                span.set_attribute("agent.error", str(e))
                span.record_exception(e)
                raise
            finally:
                span.set_attribute("agent.latency_ms", (time.time() - start) * 1000)
    return wrapper

# Using the tracer
@trace_agent
def search_documents(query: str) -> list[str]:
    """Retrieve documents"""
    with tracer.start_as_current_span("vector_search") as span:
        span.set_attribute("search.query", query)
        # Simulated retrieval
        results = [f"doc{i}" for i in range(3)]
        span.set_attribute("search.num_results", len(results))
        return results

@trace_agent
def generate_answer(query: str, context: list[str]) -> str:
    """Generate an answer"""
    with tracer.start_as_current_span("llm_call") as span:
        span.set_attribute("llm.model", "gpt-4o")
        span.set_attribute("llm.context_length", sum(len(c) for c in context))
        # Simulated LLM call
        answer = f"Answer based on {len(context)} documents"
        span.set_attribute("llm.output_length", len(answer))
        return answer

19.4.3 Token Monitoring and Cost Optimization

Python
"""Token usage monitoring and cost optimization"""
from dataclasses import dataclass
from datetime import datetime
import json

@dataclass
class TokenUsage:
    """One token-usage record"""
    timestamp: str
    model: str
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
    estimated_cost: float

class TokenMonitor:
    """Token usage monitor"""

    # Model pricing (USD per 1K tokens, as of 2025; check current price pages)
    PRICING = {
        "gpt-4o": {"input": 0.0025, "output": 0.01},
        "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
        "o4-mini": {"input": 0.0011, "output": 0.0044},
        "claude-3-5-sonnet": {"input": 0.003, "output": 0.015},
        "deepseek-v3": {"input": 0.00027, "output": 0.0011},
    }

    def __init__(self):
        self.usage_log: list[TokenUsage] = []
        self.daily_budget: float = 10.0  # daily budget in USD
        self.alert_threshold: float = 0.8  # alert at 80% of budget

    def log_usage(self, model: str, prompt_tokens: int, completion_tokens: int):
        """Record token usage for one call"""
        pricing = self.PRICING.get(model, {"input": 0.01, "output": 0.03})
        cost = (prompt_tokens / 1000 * pricing["input"] +
                completion_tokens / 1000 * pricing["output"])

        usage = TokenUsage(
            timestamp=datetime.now().isoformat(),
            model=model,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=prompt_tokens + completion_tokens,
            estimated_cost=cost
        )
        self.usage_log.append(usage)

        # Check the budget
        daily_cost = self.get_daily_cost()
        if daily_cost >= self.daily_budget * self.alert_threshold:
            print(f"⚠️ Token budget alert: ${daily_cost:.4f} / ${self.daily_budget} used")

        return usage

    def get_daily_cost(self) -> float:
        """Total cost for today"""
        today = datetime.now().date().isoformat()
        return sum(
            u.estimated_cost for u in self.usage_log
            if u.timestamp.startswith(today)
        )

    def get_summary(self) -> dict:
        """Usage summary"""
        total_tokens = sum(u.total_tokens for u in self.usage_log)
        total_cost = sum(u.estimated_cost for u in self.usage_log)
        by_model = {}
        for u in self.usage_log:
            if u.model not in by_model:
                by_model[u.model] = {"tokens": 0, "cost": 0}
            by_model[u.model]["tokens"] += u.total_tokens
            by_model[u.model]["cost"] += u.estimated_cost

        return {
            "total_tokens": total_tokens,
            "total_cost_usd": round(total_cost, 4),
            "num_calls": len(self.usage_log),
            "by_model": by_model
        }

# Usage
monitor = TokenMonitor()
monitor.log_usage("gpt-4o", prompt_tokens=1000, completion_tokens=500)
monitor.log_usage("gpt-4o-mini", prompt_tokens=2000, completion_tokens=800)
print(json.dumps(monitor.get_summary(), indent=2, ensure_ascii=False))  # json.dumps serializes a Python object to a JSON string

19.4.4 Anomaly Detection and Alerting

Python
"""Agent anomaly detection"""
from collections import deque
import statistics

class AnomalyDetector:
    """Sliding-window anomaly detection"""

    def __init__(self, window_size: int = 100, z_threshold: float = 3.0):
        self.window_size = window_size
        self.z_threshold = z_threshold
        self.latency_window = deque(maxlen=window_size)  # deque(maxlen=N): fixed-size queue; the oldest element is dropped when full, giving a sliding window
        self.token_window = deque(maxlen=window_size)  # deque: O(1) operations at both ends
        self.error_count = 0
        self.total_count = 0

    def record(self, latency: float, tokens: int, is_error: bool = False):
        """Record one request"""
        self.latency_window.append(latency)
        self.token_window.append(tokens)
        self.total_count += 1
        if is_error:
            self.error_count += 1

        alerts = []

        # Latency anomaly (Z-score)
        if len(self.latency_window) >= 10:
            mean = statistics.mean(self.latency_window)
            std = statistics.stdev(self.latency_window) or 1
            z_score = (latency - mean) / std
            if z_score > self.z_threshold:
                alerts.append(f"🚨 Latency anomaly: {latency:.2f}s (mean {mean:.2f}s, Z={z_score:.1f})")

        # Token-usage anomaly
        if len(self.token_window) >= 10:
            mean = statistics.mean(self.token_window)
            std = statistics.stdev(self.token_window) or 1
            z_score = (tokens - mean) / std
            if z_score > self.z_threshold:
                alerts.append(f"🚨 Token anomaly: {tokens} (mean {mean:.0f}, Z={z_score:.1f})")

        # Error-rate check
        error_rate = self.error_count / self.total_count if self.total_count > 0 else 0
        if error_rate > 0.1 and self.total_count >= 20:
            alerts.append(f"🚨 Error rate too high: {error_rate:.1%} ({self.error_count}/{self.total_count})")

        return alerts

detector = AnomalyDetector()
# Normal requests
for _ in range(50):
    detector.record(latency=2.0, tokens=1000)
# An anomalous request
alerts = detector.record(latency=30.0, tokens=10000)
for alert in alerts:
    print(alert)

19.5 A/B Testing and Online Evaluation

Python
"""Agent A/B testing framework"""
import hashlib
from dataclasses import dataclass

@dataclass
class ABTestConfig:
    """A/B test configuration"""
    name: str
    variant_a: dict  # variant A (control)
    variant_b: dict  # variant B (treatment)
    traffic_split: float = 0.5  # share of traffic routed to B

class AgentABTest:
    """Agent A/B test"""

    def __init__(self, config: ABTestConfig):
        self.config = config
        self.results = {"A": [], "B": []}

    def get_variant(self, user_id: str) -> str:
        """Deterministic bucketing by user ID"""
        hash_val = int(hashlib.md5(
            f"{self.config.name}:{user_id}".encode()
        ).hexdigest(), 16)
        return "B" if (hash_val % 100) < (self.config.traffic_split * 100) else "A"

    def record_result(self, variant: str, metrics: dict):
        """Record a result"""
        self.results[variant].append(metrics)

    def analyze(self) -> dict:
        """Summarize the A/B test results"""
        analysis = {}
        for variant in ["A", "B"]:
            results = self.results[variant]
            if results:
                analysis[variant] = {
                    "n": len(results),
                    "avg_score": sum(r.get("score", 0) for r in results) / len(results),
                    "avg_latency": sum(r.get("latency", 0) for r in results) / len(results),
                    "avg_cost": sum(r.get("cost", 0) for r in results) / len(results),
                }
        return analysis

# Usage
ab_test = AgentABTest(ABTestConfig(
    name="model-comparison",
    variant_a={"model": "gpt-4o-mini", "temperature": 0},
    variant_b={"model": "gpt-4o", "temperature": 0},
    traffic_split=0.3  # 30% of traffic to variant B
))

19.6 Prompt Version Management

Python
"""Prompt version management"""
import json
import hashlib
from datetime import datetime
from pathlib import Path

class PromptRegistry:
    """Prompt version registry"""

    def __init__(self, storage_dir: str = "./prompts"):
        self.storage_dir = Path(storage_dir)
        self.storage_dir.mkdir(exist_ok=True)
        self.registry: dict[str, list[dict]] = {}
        # Reload previously persisted versions so the registry survives restarts
        registry_file = self.storage_dir / "registry.json"
        if registry_file.exists():
            self.registry = json.loads(registry_file.read_text(encoding="utf-8"))

    def register(self, name: str, template: str, metadata: dict = None) -> str:
        """Register a new prompt version"""
        version_hash = hashlib.md5(template.encode()).hexdigest()[:8]

        version_info = {
            "version": version_hash,
            "template": template,
            "metadata": metadata or {},
            "created_at": datetime.now().isoformat(),
            "is_active": True
        }

        if name not in self.registry:
            self.registry[name] = []

        # Deactivate all previous versions
        for v in self.registry[name]:
            v["is_active"] = False

        self.registry[name].append(version_info)

        # Persist
        self._save()

        return version_hash

    def get_active(self, name: str) -> str | None:
        """Get the active prompt version"""
        if name not in self.registry:
            return None
        for v in reversed(self.registry[name]):
            if v["is_active"]:
                return v["template"]
        return None

    def rollback(self, name: str, version: str):
        """Roll back to a specific version"""
        if name in self.registry:
            for v in self.registry[name]:
                v["is_active"] = (v["version"] == version)
            self._save()

    def _save(self):
        with open(self.storage_dir / "registry.json", "w") as f:  # `with` closes the file automatically
            json.dump(self.registry, f, ensure_ascii=False, indent=2)

# Usage
registry = PromptRegistry()
v1 = registry.register(
    "rag_answer",
    "Answer the question based on the context. Context: {context}\nQuestion: {question}",
    {"author": "team", "note": "initial version"}
)
v2 = registry.register(
    "rag_answer",
    "You are a professional assistant. Answer strictly from the context; say so if unsure.\nContext: {context}\nQuestion: {question}",
    {"author": "team", "note": "added role setup and uncertainty handling"}
)
print(f"Active prompt: {registry.get_active('rag_answer')}")

19.7 Complete Code: An Agent System Monitored with LangSmith

Python
"""Complete example: an Agent system monitored with LangSmith"""
import os
from typing import TypedDict, Annotated, Literal
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.memory import MemorySaver

# ============ Environment setup ============
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
os.environ["LANGCHAIN_PROJECT"] = "monitored-agent"

# ============ Tool definitions ============
@tool
def search_knowledge_base(query: str) -> str:
    """Search the internal knowledge base"""
    return f"Knowledge base results: documents related to '{query}'..."

@tool
def query_database(sql: str) -> str:
    """Query the business database"""
    return f"Database result: {sql} returned 5 rows"

@tool
def send_notification(message: str, channel: str = "slack") -> str:
    """Send a notification"""
    return f"Notification sent to {channel}: {message}"

tools = [search_knowledge_base, query_database, send_notification]
tool_node = ToolNode(tools)

# ============ State definition ============
class MonitoredAgentState(TypedDict):  # TypedDict defines a typed dictionary
    messages: Annotated[list, add_messages]  # Annotated attaches the add_messages reducer as metadata
    step_count: int

# ============ Agent node ============
llm = ChatOpenAI(model="gpt-4o", temperature=0).bind_tools(tools)

def agent_node(state: MonitoredAgentState) -> dict:
    """Agent reasoning node"""
    system = SystemMessage(content="""You are an enterprise assistant Agent.
You can: search the knowledge base, query the database, and send notifications.
Choose the right tools to complete the user's request.""")

    messages = [system] + state["messages"]
    response = llm.invoke(messages)

    return {
        "messages": [response],
        "step_count": state.get("step_count", 0) + 1
    }

def should_continue(state: MonitoredAgentState) -> Literal["tools", "end"]:
    """Routing decision"""
    last_msg = state["messages"][-1]
    if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:  # hasattr checks whether the object has the attribute
        if state.get("step_count", 0) < 10:  # guard against infinite loops
            return "tools"
    return "end"

# ============ Build the graph ============
workflow = StateGraph(MonitoredAgentState)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)

workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", should_continue, {"tools": "tools", "end": END})
workflow.add_edge("tools", "agent")

memory = MemorySaver()
app = workflow.compile(checkpointer=memory)

# ============ Run (automatically traced to LangSmith) ============
config = {"configurable": {"thread_id": "monitored-001"}}
result = app.invoke(
    {
        "messages": [HumanMessage(content="Look up last month's sales data, then search for related market analysis reports")],
        "step_count": 0
    },
    config=config
)

# Every execution trace, token count, and latency is recorded in LangSmith
for msg in result["messages"]:
    if hasattr(msg, "content") and msg.content:
        print(f"[{msg.type}]: {msg.content[:200]}")

📋 Interview Notes

Frequently Asked Questions

Q1: How does Agent evaluation differ from traditional software testing?

A: Key differences: ① non-determinism: the same input can yield different outputs, so a simple assert won't work; ② multi-step execution: every step must be evaluated, not just the final result; ③ tool calls: both tool selection and argument accuracy must be assessed; ④ fuzzy criteria: "a good answer" is hard to define formally. Hence the need for LLM-as-Judge, multi-dimensional metrics, and statistical methods.

Q2: What are the pros and cons of LLM-as-Judge? How do you improve scoring consistency?

A: Pros: scalable, no manual labeling, flexible. Cons: the judge has its own biases, it is costly, and it suffers from position bias. To improve consistency: ① provide a detailed scoring rubric; ② average over multiple runs; ③ swap the A/B order in pair-wise comparisons to cancel position bias; ④ use a strong model (e.g. GPT-4o) as the judge.

Q3: How do you monitor the token cost of an Agent system?

A: ① Record prompt_tokens and completion_tokens after every LLM call; ② compute cost from the model's pricing; ③ set daily/monthly budget alerts; ④ detect anomalous token usage with a sliding window; ⑤ optimize cost with a model-downgrade strategy (route simple questions to a small model).
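The model-downgrade idea in ⑤ can be sketched as a tiny router. The keyword markers, the length cutoff, and the model names below are illustrative assumptions, not a production rule:

```python
def pick_model(question: str) -> str:
    """Route a query to a cheap or a strong model (hypothetical heuristic)."""
    # Assumption: long or analysis-style prompts need the stronger model
    complex_markers = ["step by step", "compare", "analyze", "write code"]
    if len(question) > 200 or any(m in question.lower() for m in complex_markers):
        return "gpt-4o"
    return "gpt-4o-mini"  # everything else goes to the small model

print(pick_model("What is RAG?"))                              # → gpt-4o-mini
print(pick_model("Compare LangChain and LlamaIndex for RAG"))  # → gpt-4o
```

Real routers often use a small classifier or the judge LLM itself; the point is that the routing decision is cheap relative to the cost it saves.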

Q4: What is distributed tracing, and how does it apply to Agent systems?

A: Distributed tracing decomposes one complete execution into a Trace→Span hierarchy. In an Agent, one user request is a Trace, and each LLM call or tool call is a Span. Wired up through the OpenTelemetry standard, platforms such as LangSmith or Phoenix can visualize per-step latency, token consumption, and error details.

Q5: How do you design an A/B test for an Agent?

A: ① Bucket users deterministically by user ID (MD5 hash modulo), so the same user always lands in the same group; ② define core metrics (task completion rate, latency, cost, user satisfaction); ③ run a statistical significance test once the sample size is sufficient; ④ ramp up gradually (5% → 20% → 50% → 100%).
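The significance test in ③ can be a two-proportion z-test on task success rate; the counts below are made-up illustrative numbers:

```python
"""Two-proportion z-test for an Agent A/B test (task success rate)."""
import math

def two_proportion_ztest(success_a, n_a, success_b, n_b):
    p_a, p_b = success_a / n_a, success_b / n_b
    p_pool = (success_a + success_b) / (n_a + n_b)  # pooled success rate
    se = math.sqrt(p_pool * (1 - p_pool) * (1 / n_a + 1 / n_b))
    z = (p_b - p_a) / se
    # two-sided p-value via the normal CDF: Phi(z) = 0.5 * (1 + erf(z / sqrt(2)))
    p_value = 2 * (1 - 0.5 * (1 + math.erf(abs(z) / math.sqrt(2))))
    return z, p_value

# Hypothetical counts: A succeeded 420/500 times, B succeeded 455/500 times
z, p = two_proportion_ztest(success_a=420, n_a=500, success_b=455, n_b=500)
print(f"z={z:.2f}, p={p:.4f}")  # call B significantly better if p < 0.05
```

With a real experiment you would also pre-register the metric and minimum sample size rather than peeking as results arrive.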


✏️ Exercises

Exercise 1: Build Your Own Evaluation Pipeline

Build an evaluation pipeline for your Agent system covering these metrics: task completion rate, tool-call accuracy, latency, and token cost. Prepare at least 10 test cases.

Exercise 2: LLM-as-Judge

Implement both point-wise and pair-wise LLM-as-Judge evaluation, and compare GPT-4o-mini against GPT-4o on a RAG question-answering task. Analyze the effect of position bias.

Exercise 3: A Token Monitoring System

Implement a complete token monitoring system including: usage logging, cost calculation, daily budget alerts, and anomaly detection. Support pricing for multiple models.

Exercise 4: Observability Integration

Integrate LangSmith or Phoenix into your Agent project for full distributed tracing, so that each request's execution chain and performance metrics can be inspected in the UI.


📚 References

  • LangSmith official documentation
  • Phoenix (Arize AI)
  • OpenTelemetry Python
  • Weights & Biases Weave
  • Paper: "Judging LLM-as-a-Judge with MT-Bench and Chatbot Arena" (Zheng et al., 2023)


Last updated: 2026-02-12 | Applies to: LLM Application Guide v2026