Agent评估与可观测性¶
⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。
📌 Agent系统的非确定性、多步骤特性使其评估和监控远比传统软件复杂。掌握LangSmith等可观测性工具、LLM-as-Judge评估方法和成本优化策略,是构建可靠Agent系统的关键。
🎯 学习目标¶
- 理解Agent评估的独特挑战与评估维度
- 掌握LangSmith、Weights & Biases Weave、Phoenix等评估工具
- 熟练使用LLM-as-Judge方法评估Agent输出
- 理解分布式追踪与OpenTelemetry集成
- 掌握Token监控、成本优化与异常检测
- 能够搭建完整的Agent监控与评估体系
- 掌握Agent评估相关面试考点
19.1 Agent评估的挑战¶
19.1.1 为什么Agent评估很难¶
与传统软件测试不同,Agent系统面临以下独特挑战:
| 挑战 | 描述 | 影响 |
|---|---|---|
| 非确定性输出 | 相同输入可能产生不同输出 | 无法用简单的assert测试 |
| 多步骤交互 | 任务涉及多轮工具调用和推理 | 需要评估过程而非仅结果 |
| 工具调用正确性 | 不仅要选对工具,参数也要准确 | 需要细粒度的调用评估 |
| 中间状态依赖 | 后续步骤依赖前序步骤的正确性 | 错误会级联放大 |
| 环境交互 | Agent可能修改外部状态 | 测试需要隔离环境 |
| 评估标准模糊 | "好的回答"难以形式化定义 | 需要多维度评估 |
```
传统软件测试: input → function → output → assert(output == expected)

Agent系统评估: input → [think → tool_call → observe]×N → output → ???
                       ↑ 每一步都需要评估 ↑
```
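上面的对比可以用一个极简的测试草图来说明:对非确定性Agent,用"多次运行 + 通过率阈值"代替单次精确断言。其中 `run_agent` 与 `is_acceptable` 均为本示例假设的占位实现,真实场景中应替换为实际的Agent调用与业务判据:

```python
"""非确定性输出的测试思路示意:通过率阈值代替精确断言(占位实现,仅供说明)"""
import random

def run_agent(task: str) -> str:
    # 占位:真实场景中这里调用Agent,相同输入每次输出可能不同
    return random.choice(["巴黎", "法国的首都是巴黎", "Paris"])

def is_acceptable(output: str) -> bool:
    # 用"包含关键信息"等宽松判据代替 output == expected
    return ("巴黎" in output) or ("Paris" in output)

def eval_pass_rate(task: str, n_trials: int = 20) -> float:
    """多次运行,统计可接受输出的比例"""
    passed = sum(is_acceptable(run_agent(task)) for _ in range(n_trials))
    return passed / n_trials

rate = eval_pass_rate("法国的首都是哪里?")
assert rate >= 0.9  # 以通过率阈值作为测试断言,而非单次精确匹配
```

阈值(如0.9)应根据任务的容错要求设定;这只覆盖"最终结果"维度,过程评估仍需下文的多维度方法。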
19.1.2 评估维度全景¶
"""Agent评估维度定义"""
from pydantic import BaseModel, Field
class AgentEvaluation(BaseModel): # Pydantic BaseModel:自动数据验证和序列化
"""Agent评估结果"""
# 1. 任务完成率
goal_achieved: bool = Field(description="是否完成最终目标")
goal_score: float = Field(description="目标完成程度 0-1")
# 2. 工具调用准确率
tool_calls_total: int = Field(description="总工具调用次数")
tool_calls_correct: int = Field(description="正确工具调用次数")
tool_accuracy: float = Field(description="工具调用准确率")
# 3. 推理质量
reasoning_coherence: float = Field(description="推理连贯性 0-1")
reasoning_relevance: float = Field(description="推理相关性 0-1")
# 4. 效率
total_steps: int = Field(description="总步骤数")
total_tokens: int = Field(description="消耗Token数")
latency_seconds: float = Field(description="总耗时(秒)")
# 5. 安全性
prompt_injection_detected: bool = Field(description="是否检测到注入攻击")
harmful_content_detected: bool = Field(description="是否生成有害内容")
pii_leakage: bool = Field(description="是否泄露个人信息")
19.2 评估框架与工具¶
19.2.1 LangSmith(LangChain生态)¶
LangSmith是LangChain官方的可观测性和评估平台,提供Tracing、Evaluation、Monitoring三大功能。
"""LangSmith集成示例"""
# pip install langsmith langchain-openai
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
os.environ["LANGCHAIN_PROJECT"] = "agent-evaluation"
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langsmith import Client
from langsmith.evaluation import evaluate, LangChainStringEvaluator
# ============ 1. 自动Tracing ============
# 设置环境变量后,所有LangChain调用自动被追踪
llm = ChatOpenAI(model="gpt-4o")
prompt = ChatPromptTemplate.from_template("回答问题:{question}")
chain = prompt | llm
# 这次调用会自动记录到LangSmith
result = chain.invoke({"question": "什么是LangGraph?"})
# ============ 2. 创建评估数据集 ============
client = Client()
dataset_name = "agent-eval-dataset"
dataset = client.create_dataset(dataset_name, description="Agent评估数据集")
# 添加测试用例
test_cases = [
{
"input": {"question": "什么是RAG?"},
"expected_output": "RAG是检索增强生成,通过检索外部知识增强模型回答"
},
{
"input": {"question": "LangChain和LlamaIndex哪个更适合RAG?"},
"expected_output": "LlamaIndex更专注RAG场景,LangChain更通用"
}
]
for case in test_cases:
client.create_example(
inputs=case["input"],
outputs={"answer": case["expected_output"]},
dataset_id=dataset.id
)
# ============ 3. 运行评估 ============
def predict(inputs: dict) -> dict:
"""被评估的Agent/Chain"""
result = chain.invoke(inputs)
return {"answer": result.content}
# 使用内置评估器
eval_results = evaluate(
predict,
data=dataset_name,
evaluators=[
LangChainStringEvaluator("cot_qa"), # CoT问答评估
LangChainStringEvaluator("relevance"), # 相关性
LangChainStringEvaluator("helpfulness"), # 有用性
],
experiment_prefix="v1-gpt4o"
)
print(eval_results)
19.2.2 自定义LangSmith评估器¶
"""自定义LangSmith评估器"""
from langsmith.evaluation import RunEvaluator
from langsmith.schemas import Example, Run
class ToolUsageEvaluator(RunEvaluator):
"""评估Agent工具调用的准确性"""
def evaluate_run(self, run: Run, example: Example | None = None) -> dict:
# 从run中提取工具调用信息
tool_calls = []
for child in run.child_runs or []:
if child.run_type == "tool":
tool_calls.append({
"name": child.name,
"input": child.inputs,
"output": child.outputs,
"error": child.error
})
# 评估逻辑
expected_tools = example.outputs.get("expected_tools", []) if example else []
actual_tools = [tc["name"] for tc in tool_calls]
correct = sum(1 for t in actual_tools if t in expected_tools)
accuracy = correct / len(expected_tools) if expected_tools else 1.0
return {
"key": "tool_accuracy",
"score": accuracy,
"comment": f"调用工具: {actual_tools}, 期望: {expected_tools}"
}
class LatencyEvaluator(RunEvaluator):
"""评估响应延迟"""
def evaluate_run(self, run: Run, example: Example | None = None) -> dict:
latency = (run.end_time - run.start_time).total_seconds() if run.end_time else float('inf')
# 根据延迟评分
if latency < 3:
score = 1.0
elif latency < 10:
score = 0.7
elif latency < 30:
score = 0.4
else:
score = 0.1
return {
"key": "latency_score",
"score": score,
"comment": f"延迟: {latency:.2f}秒"
}
19.2.3 Weights & Biases Weave¶
"""W&B Weave评估示例"""
# pip install weave
import weave
weave.init("agent-eval-project")
# 使用@weave.op()装饰器自动追踪函数
@weave.op()
def my_agent(question: str) -> str:
"""被追踪的Agent函数"""
llm = ChatOpenAI(model="gpt-4o")
response = llm.invoke(question)
return response.content
# 定义评估指标
@weave.op()
def relevance_scorer(question: str, output: str) -> dict:
"""相关性评分"""
llm = ChatOpenAI(model="gpt-4o-mini")
score_response = llm.invoke(
f"评分(0-1):以下回答与问题的相关程度。\n问题:{question}\n回答:{output}\n只输出分数:"
)
try: # try/except捕获异常,防止程序崩溃
score = float(score_response.content.strip()) # 链式调用:strip去除空白
except ValueError:
score = 0.5
return {"relevance": score}
# 创建评估数据集
eval_dataset = [
{"question": "什么是Agent?"},
{"question": "如何构建RAG系统?"},
]
# 运行评估
evaluation = weave.Evaluation(
dataset=eval_dataset,
scorers=[relevance_scorer]
)
# await evaluation.evaluate(my_agent)
19.2.4 Phoenix(Arize AI)¶
"""Phoenix可观测性平台"""
# pip install arize-phoenix openinference-instrumentation-langchain
import phoenix as px
from openinference.instrumentation.langchain import LangChainInstrumentor
from phoenix.otel import register
# 启动Phoenix服务
# px.launch_app() # 本地UI: http://localhost:6006
# 配置OpenTelemetry追踪
tracer_provider = register(
project_name="agent-monitoring",
endpoint="http://localhost:6006/v1/traces"
)
# 自动instrumentation
LangChainInstrumentor().instrument(tracer_provider=tracer_provider)
# 之后所有LangChain调用自动被追踪到Phoenix
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o")
result = llm.invoke("什么是Agent?")
# 在Phoenix UI中查看trace详情
19.2.5 自建评估Pipeline¶
"""自建Agent评估Pipeline"""
import time
import json
from dataclasses import dataclass, field
from langchain_openai import ChatOpenAI
from langchain_core.callbacks import BaseCallbackHandler
@dataclass # @dataclass自动生成__init__等方法
class AgentTrace:
"""Agent执行追踪"""
task: str
steps: list[dict] = field(default_factory=list) # 每个实例创建独立的空列表,避免共享可变默认值
total_tokens: int = 0
start_time: float = 0
end_time: float = 0
@property # 将方法变为只读属性,调用时无需括号:trace.latency
def latency(self) -> float:
return self.end_time - self.start_time
@property # @property将方法变为属性访问
def num_steps(self) -> int:
return len(self.steps)
class TraceCollector(BaseCallbackHandler):
"""收集Agent执行轨迹的回调"""
def __init__(self):
self.traces: list[dict] = []
self.current_tokens = 0
def on_llm_start(self, serialized, prompts, **kwargs): # **kwargs收集关键字参数
self.traces.append({"type": "llm_start", "time": time.time()})
def on_llm_end(self, response, **kwargs):
usage = response.llm_output.get("token_usage", {}) if response.llm_output else {}
self.current_tokens += usage.get("total_tokens", 0)
self.traces.append({
"type": "llm_end",
"time": time.time(),
"tokens": usage.get("total_tokens", 0)
})
def on_tool_start(self, serialized, input_str, **kwargs):
self.traces.append({
"type": "tool_start",
"tool": serialized.get("name", "unknown"),
"input": input_str,
"time": time.time()
})
def on_tool_end(self, output, **kwargs):
self.traces.append({
"type": "tool_end",
"output": str(output)[:500],
"time": time.time()
})
class AgentEvaluator:
"""Agent评估器"""
def __init__(self, judge_llm=None):
self.judge_llm = judge_llm or ChatOpenAI(model="gpt-4o", temperature=0)
def evaluate_goal_achievement(self, task: str, result: str) -> float:
"""评估任务完成度"""
response = self.judge_llm.invoke(
f"""评估以下Agent的任务完成度(0-1分):
任务:{task}
结果:{result}
请只输出分数(0到1之间的小数):"""
)
try:
return float(response.content.strip())
except ValueError:
return 0.5
def evaluate_tool_usage(self, trace: AgentTrace, expected_tools: list[str]) -> dict:
"""评估工具使用情况"""
actual_tools = [
s["tool"] for s in trace.steps
if s.get("type") == "tool_start"
]
correct = sum(1 for t in actual_tools if t in expected_tools)
unnecessary = sum(1 for t in actual_tools if t not in expected_tools)
missed = sum(1 for t in expected_tools if t not in actual_tools)
precision = correct / len(actual_tools) if actual_tools else 0
recall = correct / len(expected_tools) if expected_tools else 1
return {
"precision": precision,
"recall": recall,
"unnecessary_calls": unnecessary,
"missed_tools": missed
}
def evaluate_efficiency(self, trace: AgentTrace) -> dict:
"""评估执行效率"""
return {
"total_steps": trace.num_steps,
"total_tokens": trace.total_tokens,
"latency_seconds": trace.latency,
"tokens_per_step": trace.total_tokens / max(trace.num_steps, 1),
"cost_estimate_usd": trace.total_tokens * 0.00001 # 粗略估算
}
def full_evaluation(self, task: str, result: str, trace: AgentTrace, expected_tools: list[str] = None) -> dict:
"""完整评估"""
return {
"goal_achievement": self.evaluate_goal_achievement(task, result),
"tool_usage": self.evaluate_tool_usage(trace, expected_tools or []),
"efficiency": self.evaluate_efficiency(trace)
}
19.3 LLM-as-Judge¶
19.3.1 核心思想¶
使用强大的LLM(如GPT-4o)作为"裁判"评估其他LLM的输出质量。
"""LLM-as-Judge评估框架"""
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
class JudgeResult(BaseModel):
"""裁判评分结果"""
score: int = Field(description="评分(1-5)")
reasoning: str = Field(description="评分理由")
strengths: list[str] = Field(description="优点")
weaknesses: list[str] = Field(description="不足")
judge_llm = ChatOpenAI(model="gpt-4o", temperature=0)
# --- 单输出评估(Point-wise)---
pointwise_prompt = ChatPromptTemplate.from_messages([
("system", """你是一个专业的AI输出评估专家。请根据以下维度评估AI助手的回答(1-5分):
评分标准:
5分 - 完美:准确、完整、清晰、有帮助
4分 - 优秀:基本准确,有少量遗漏
3分 - 一般:部分正确,但有明显不足
2分 - 较差:多处错误或不相关
1分 - 极差:完全错误或有害
评估维度:
- 准确性:信息是否正确
- 完整性:是否覆盖问题要点
- 清晰度:表述是否清楚
- 有用性:对用户是否有帮助"""),
("human", """用户问题:{question}
AI回答:{answer}
参考答案(如有):{reference}
请给出评分和详细理由:""")
])
pointwise_judge = pointwise_prompt | judge_llm.with_structured_output(JudgeResult)
# --- 对比评估(Pair-wise)---
class PairwiseResult(BaseModel):
winner: str = Field(description="获胜者: A或B或tie")
reasoning: str = Field(description="判断理由")
pairwise_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个公正的裁判,比较两个AI回答的质量。"),
("human", """问题:{question}
回答A:{answer_a}
回答B:{answer_b}
哪个回答更好?请分析原因。""")
])
pairwise_judge = pairwise_prompt | judge_llm.with_structured_output(PairwiseResult)
# --- 使用示例 ---
# 单输出评估
result = pointwise_judge.invoke({
"question": "什么是RAG?",
"answer": "RAG是检索增强生成,通过检索增强模型回答。",
"reference": "RAG(Retrieval-Augmented Generation)通过检索外部知识库来增强大模型回答。"
})
print(f"评分: {result.score}/5")
print(f"理由: {result.reasoning}")
# 对比评估
pair_result = pairwise_judge.invoke({
"question": "什么是RAG?",
"answer_a": "RAG是一种AI技术。",
"answer_b": "RAG是检索增强生成,通过检索外部知识来增强语言模型的回答质量,减少幻觉。"
})
print(f"获胜者: {pair_result.winner}")
19.3.2 评分一致性优化¶
"""提高LLM-as-Judge评分一致性的技巧"""
# 技巧1: 提供详细的评分rubric
rubric_prompt = """评分Rubric:
5 - 完全正确且全面:回答准确、完整、有条理、包含示例
4 - 基本正确:主要观点正确,可能缺少细节或示例
3 - 部分正确:有正确信息但也有错误或重大遗漏
2 - 大部分错误:核心概念理解有误
1 - 完全错误:信息严重错误或与问题无关
"""
# 技巧2: 多次评估取平均(减少随机性)
def robust_judge(question, answer, n_trials=3):
scores = []
for _ in range(n_trials):
result = pointwise_judge.invoke({
"question": question,
"answer": answer,
"reference": ""
})
scores.append(result.score)
return {
"mean_score": sum(scores) / len(scores),
"scores": scores,
"variance": max(scores) - min(scores)
}
# 技巧3: 位置偏差消除(对比评估时交换AB顺序)
def unbiased_pairwise(question, answer_a, answer_b):
# 正向评估
result1 = pairwise_judge.invoke({
"question": question,
"answer_a": answer_a,
"answer_b": answer_b
})
# 反向评估(交换AB)
result2 = pairwise_judge.invoke({
"question": question,
"answer_a": answer_b,
"answer_b": answer_a
})
# 综合判断
if result1.winner == "A" and result2.winner == "B":
return "A wins (consistent)"
elif result1.winner == "B" and result2.winner == "A":
return "B wins (consistent)"
else:
return "Tie / Inconsistent"
19.4 可观测性¶
19.4.1 分布式追踪架构¶
```
Trace(一次完整的Agent执行)
├── Span: 用户输入处理 [10ms]
├── Span: LLM调用 - 规划 [2.3s, 1500 tokens]
│   └── Span: API请求 [2.1s]
├── Span: 工具调用 - 搜索 [500ms]
│   ├── Span: 查询构建 [5ms]
│   └── Span: API请求 [490ms]
├── Span: LLM调用 - 分析 [3.1s, 2000 tokens]
├── Span: 工具调用 - 数据库 [200ms]
└── Span: LLM调用 - 生成答案 [1.8s, 800 tokens]

总计: 7.91s, 4300 tokens, $0.043
```
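"总计"一行的Trace级指标可以由Span树自底向上聚合得到。下面是一个极简示意,其中Span用简化的嵌套字典表示(仅保留本例需要的字段,真实Span结构由追踪SDK定义):

```python
"""由Span树递归聚合Trace级指标的示意(Span结构为本示例假设的简化形式)"""

trace = {
    "name": "agent_run", "duration_s": 7.91, "tokens": 0, "children": [
        {"name": "llm_plan", "duration_s": 2.3, "tokens": 1500, "children": []},
        {"name": "tool_search", "duration_s": 0.5, "tokens": 0, "children": []},
        {"name": "llm_analyze", "duration_s": 3.1, "tokens": 2000, "children": []},
        {"name": "llm_answer", "duration_s": 1.8, "tokens": 800, "children": []},
    ],
}

def total_tokens(span: dict) -> int:
    # 递归累加本Span及所有子Span的token数
    return span["tokens"] + sum(total_tokens(c) for c in span["children"])

print(total_tokens(trace))  # → 4300
```

实际平台(LangSmith、Phoenix等)在UI中自动完成这类聚合;理解其原理有助于自建报表或告警。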
19.4.2 OpenTelemetry集成¶
"""OpenTelemetry集成Agent追踪"""
# pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
import functools
import time
# 配置TracerProvider
resource = Resource.create({"service.name": "agent-service"})
tracer_provider = TracerProvider(resource=resource)
# 导出到OTLP(支持Jaeger、Phoenix等)
otlp_exporter = OTLPSpanExporter(endpoint="http://localhost:4317")
tracer_provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
trace.set_tracer_provider(tracer_provider)
tracer = trace.get_tracer("agent.tracer")
# 追踪装饰器
def trace_agent(func):
"""Agent函数追踪装饰器"""
@functools.wraps(func) # @wraps保留被装饰函数的原始信息
def wrapper(*args, **kwargs): # *args收集位置参数,**kwargs收集关键字参数,使装饰器兼容任意函数签名
with tracer.start_as_current_span(
func.__name__,
attributes={
"agent.function": func.__name__,
"agent.args": str(args)[:500]
}
) as span:
start = time.time()
try:
result = func(*args, **kwargs)
span.set_attribute("agent.success", True)
span.set_attribute("agent.result_length", len(str(result)))
return result
except Exception as e:
span.set_attribute("agent.success", False)
span.set_attribute("agent.error", str(e))
span.record_exception(e)
raise
finally:
span.set_attribute("agent.latency_ms", (time.time() - start) * 1000)
return wrapper
# 使用追踪
@trace_agent
def search_documents(query: str) -> list[str]:
"""检索文档"""
with tracer.start_as_current_span("vector_search") as span:
span.set_attribute("search.query", query)
# 模拟检索
results = [f"文档{i}" for i in range(3)]
span.set_attribute("search.num_results", len(results))
return results
@trace_agent
def generate_answer(query: str, context: list[str]) -> str:
"""生成回答"""
with tracer.start_as_current_span("llm_call") as span:
span.set_attribute("llm.model", "gpt-4o")
span.set_attribute("llm.context_length", sum(len(c) for c in context))
# 模拟LLM调用
answer = f"基于{len(context)}篇文档的回答"
span.set_attribute("llm.output_length", len(answer))
return answer
19.4.3 Token监控与成本优化¶
"""Token使用量监控与成本优化"""
from dataclasses import dataclass, field
from datetime import datetime
import json
@dataclass
class TokenUsage:
"""Token使用记录"""
timestamp: str
model: str
prompt_tokens: int
completion_tokens: int
total_tokens: int
estimated_cost: float
class TokenMonitor:
"""Token使用量监控器"""
# 模型定价(美元/1K tokens,截至2025年)
PRICING = {
"gpt-4o": {"input": 0.0025, "output": 0.01},
"gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
"o4-mini": {"input": 0.0011, "output": 0.0044},
"claude-3-5-sonnet": {"input": 0.003, "output": 0.015},
"deepseek-v3": {"input": 0.00027, "output": 0.0011},
}
def __init__(self):
self.usage_log: list[TokenUsage] = []
self.daily_budget: float = 10.0 # 每日预算(美元)
self.alert_threshold: float = 0.8 # 80%预算告警
def log_usage(self, model: str, prompt_tokens: int, completion_tokens: int):
"""记录Token使用"""
pricing = self.PRICING.get(model, {"input": 0.01, "output": 0.03})
cost = (prompt_tokens / 1000 * pricing["input"] +
completion_tokens / 1000 * pricing["output"])
usage = TokenUsage(
timestamp=datetime.now().isoformat(),
model=model,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=prompt_tokens + completion_tokens,
estimated_cost=cost
)
self.usage_log.append(usage)
# 检查预算
daily_cost = self.get_daily_cost()
if daily_cost >= self.daily_budget * self.alert_threshold:
print(f"⚠️ Token预算告警:已使用 ${daily_cost:.4f} / ${self.daily_budget}")
return usage
def get_daily_cost(self) -> float:
"""获取今日总成本"""
today = datetime.now().date().isoformat()
return sum(
u.estimated_cost for u in self.usage_log
if u.timestamp.startswith(today)
)
def get_summary(self) -> dict:
"""获取使用汇总"""
total_tokens = sum(u.total_tokens for u in self.usage_log)
total_cost = sum(u.estimated_cost for u in self.usage_log)
by_model = {}
for u in self.usage_log:
if u.model not in by_model:
by_model[u.model] = {"tokens": 0, "cost": 0}
by_model[u.model]["tokens"] += u.total_tokens
by_model[u.model]["cost"] += u.estimated_cost
return {
"total_tokens": total_tokens,
"total_cost_usd": round(total_cost, 4),
"num_calls": len(self.usage_log),
"by_model": by_model
}
# 使用示例
monitor = TokenMonitor()
monitor.log_usage("gpt-4o", prompt_tokens=1000, completion_tokens=500)
monitor.log_usage("gpt-4o-mini", prompt_tokens=2000, completion_tokens=800)
print(json.dumps(monitor.get_summary(), indent=2, ensure_ascii=False)) # json.dumps将Python对象→JSON字符串
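在监控之外,常见的成本优化手段是"模型降级":按任务复杂度把请求路由到不同价位的模型,简单问题不必动用最贵的模型。下面是一个极简草图,其中的复杂度判据与模型名均为示意性假设,实际应结合业务数据(如历史评分)自行定义:

```python
"""按任务复杂度路由模型的成本优化示意(判据与模型名均为假设)"""

def estimate_complexity(question: str) -> str:
    # 极简启发式:长问题或含多步骤关键词视为复杂任务
    if len(question) > 100 or any(k in question for k in ["分析", "对比", "规划"]):
        return "complex"
    return "simple"

def route_model(question: str) -> str:
    # 简单任务走便宜的小模型,复杂任务才用大模型
    return "gpt-4o" if estimate_complexity(question) == "complex" else "gpt-4o-mini"

print(route_model("什么是RAG?"))  # → gpt-4o-mini
print(route_model("请分析并对比三种RAG检索策略的成本与召回表现"))  # → gpt-4o
```

上线这类路由前,应先用评估集验证小模型在"简单"桶上的质量没有明显下降,再结合TokenMonitor观察实际节省。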
19.4.4 异常检测与告警¶
"""Agent异常检测"""
from collections import deque
import statistics
class AnomalyDetector:
"""基于滑动窗口的异常检测"""
def __init__(self, window_size: int = 100, z_threshold: float = 3.0):
self.window_size = window_size
self.z_threshold = z_threshold
self.latency_window = deque(maxlen=window_size) # deque(maxlen=N):固定长度队列,满时自动丢弃最旧元素,实现滑动窗口
self.token_window = deque(maxlen=window_size) # deque双端队列,两端操作O(1)
self.error_count = 0
self.total_count = 0
def record(self, latency: float, tokens: int, is_error: bool = False):
"""记录一次请求"""
self.latency_window.append(latency)
self.token_window.append(tokens)
self.total_count += 1
if is_error:
self.error_count += 1
alerts = []
# 延迟异常检测(Z-score)
if len(self.latency_window) >= 10:
mean = statistics.mean(self.latency_window)
std = statistics.stdev(self.latency_window) or 1
z_score = (latency - mean) / std
if z_score > self.z_threshold:
alerts.append(f"🚨 延迟异常: {latency:.2f}s (均值{mean:.2f}s, Z={z_score:.1f})")
# Token用量异常
if len(self.token_window) >= 10:
mean = statistics.mean(self.token_window)
std = statistics.stdev(self.token_window) or 1
z_score = (tokens - mean) / std
if z_score > self.z_threshold:
alerts.append(f"🚨 Token异常: {tokens} (均值{mean:.0f}, Z={z_score:.1f})")
# 错误率检测
error_rate = self.error_count / self.total_count if self.total_count > 0 else 0
if error_rate > 0.1 and self.total_count >= 20:
alerts.append(f"🚨 错误率过高: {error_rate:.1%} ({self.error_count}/{self.total_count})")
return alerts
detector = AnomalyDetector()
# 正常请求
for _ in range(50):
detector.record(latency=2.0, tokens=1000)
# 异常请求
alerts = detector.record(latency=30.0, tokens=10000)
for alert in alerts:
print(alert)
19.5 A/B测试与在线评估¶
"""Agent A/B测试框架"""
import random
import hashlib
from dataclasses import dataclass
@dataclass
class ABTestConfig:
"""A/B测试配置"""
name: str
variant_a: dict # 配置A(对照组)
variant_b: dict # 配置B(实验组)
traffic_split: float = 0.5 # B组流量比例
class AgentABTest:
"""Agent A/B测试"""
def __init__(self, config: ABTestConfig):
self.config = config
self.results = {"A": [], "B": []}
def get_variant(self, user_id: str) -> str:
"""基于用户ID确定性分组"""
hash_val = int(hashlib.md5(
f"{self.config.name}:{user_id}".encode()
).hexdigest(), 16)
return "B" if (hash_val % 100) < (self.config.traffic_split * 100) else "A"
def record_result(self, variant: str, metrics: dict):
"""记录结果"""
self.results[variant].append(metrics)
def analyze(self) -> dict:
"""分析A/B测试结果"""
analysis = {}
for variant in ["A", "B"]:
results = self.results[variant]
if results:
analysis[variant] = {
"n": len(results),
"avg_score": sum(r.get("score", 0) for r in results) / len(results),
"avg_latency": sum(r.get("latency", 0) for r in results) / len(results),
"avg_cost": sum(r.get("cost", 0) for r in results) / len(results),
}
return analysis
# 使用示例
ab_test = AgentABTest(ABTestConfig(
name="model-comparison",
variant_a={"model": "gpt-4o-mini", "temperature": 0},
variant_b={"model": "gpt-4o", "temperature": 0},
traffic_split=0.3 # 30%流量给B组
))
19.6 Prompt版本管理¶
"""Prompt版本管理"""
import json
import hashlib
from datetime import datetime
from pathlib import Path
class PromptRegistry:
"""Prompt版本注册表"""
def __init__(self, storage_dir: str = "./prompts"):
self.storage_dir = Path(storage_dir)
self.storage_dir.mkdir(exist_ok=True)
self.registry: dict[str, list[dict]] = {}
def register(self, name: str, template: str, metadata: dict = None) -> str:
"""注册新版本的Prompt"""
version_hash = hashlib.md5(template.encode()).hexdigest()[:8]
version_info = {
"version": version_hash,
"template": template,
"metadata": metadata or {},
"created_at": datetime.now().isoformat(),
"is_active": True
}
if name not in self.registry:
self.registry[name] = []
# 将之前版本设为非活跃
for v in self.registry[name]:
v["is_active"] = False
self.registry[name].append(version_info)
# 持久化
self._save()
return version_hash
def get_active(self, name: str) -> str | None:
"""获取活跃版本的Prompt"""
if name not in self.registry:
return None
for v in reversed(self.registry[name]):
if v["is_active"]:
return v["template"]
return None
def rollback(self, name: str, version: str):
"""回滚到指定版本"""
if name in self.registry:
for v in self.registry[name]:
v["is_active"] = (v["version"] == version)
self._save()
def _save(self):
with open(self.storage_dir / "registry.json", "w") as f: # with自动管理文件关闭
json.dump(self.registry, f, ensure_ascii=False, indent=2)
# 使用
registry = PromptRegistry()
v1 = registry.register(
"rag_answer",
"基于上下文回答问题。上下文:{context}\n问题:{question}",
{"author": "team", "note": "初始版本"}
)
v2 = registry.register(
"rag_answer",
"你是一个专业助手。严格基于上下文回答,如不确定请说明。\n上下文:{context}\n问题:{question}",
{"author": "team", "note": "增加角色设定和不确定性处理"}
)
print(f"当前活跃Prompt: {registry.get_active('rag_answer')}")
19.7 完整代码:搭建LangSmith监控的Agent系统¶
"""完整示例:带LangSmith监控的Agent系统"""
import os
import time
from typing import TypedDict, Annotated, Literal
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.memory import MemorySaver
# ============ 环境配置 ============
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
os.environ["LANGCHAIN_PROJECT"] = "monitored-agent"
# ============ 工具定义 ============
@tool
def search_knowledge_base(query: str) -> str:
"""搜索内部知识库"""
return f"知识库搜索结果:关于'{query}'的相关文档..."
@tool
def query_database(sql: str) -> str:
"""查询业务数据库"""
return f"数据库查询结果:{sql} 返回5条记录"
@tool
def send_notification(message: str, channel: str = "slack") -> str:
"""发送通知"""
return f"通知已发送到{channel}: {message}"
tools = [search_knowledge_base, query_database, send_notification]
tool_node = ToolNode(tools)
# ============ State定义 ============
class MonitoredAgentState(TypedDict): # TypedDict定义类型化字典
messages: Annotated[list, add_messages] # Annotated附加元数据注解
step_count: int
# ============ Agent节点 ============
llm = ChatOpenAI(model="gpt-4o", temperature=0).bind_tools(tools)
def agent_node(state: MonitoredAgentState) -> dict:
"""Agent推理节点"""
system = SystemMessage(content="""你是一个企业助手Agent。
你可以:搜索知识库、查询数据库、发送通知。
请根据用户需求选择合适的工具完成任务。""")
messages = [system] + state["messages"]
response = llm.invoke(messages)
return {
"messages": [response],
"step_count": state.get("step_count", 0) + 1
}
def should_continue(state: MonitoredAgentState) -> Literal["tools", "end"]:
"""路由决策"""
last_msg = state["messages"][-1]
if hasattr(last_msg, "tool_calls") and last_msg.tool_calls: # hasattr检查对象是否有某属性
if state.get("step_count", 0) < 10: # 防止无限循环
return "tools"
return "end"
# ============ 构建Graph ============
workflow = StateGraph(MonitoredAgentState)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)
workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", should_continue, {"tools": "tools", "end": END})
workflow.add_edge("tools", "agent")
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
# ============ 执行(自动追踪到LangSmith)============
config = {"configurable": {"thread_id": "monitored-001"}}
result = app.invoke(
{
"messages": [HumanMessage(content="帮我查一下上个月的销售数据,然后搜索相关的市场分析报告")],
"step_count": 0
},
config=config
)
# 所有执行轨迹、Token用量、延迟都自动记录在LangSmith中
for msg in result["messages"]:
if hasattr(msg, "content") and msg.content:
print(f"[{msg.type}]: {msg.content[:200]}")
📋 面试要点¶
高频面试题¶
Q1: Agent评估和传统软件测试有什么区别?
答:主要区别:①非确定性——相同输入可能不同输出,无法用简单assert;②多步骤——需评估每步而非仅结果;③工具调用——需评估工具选择和参数准确性;④评估标准模糊——"好的回答"难以形式化。因此需要LLM-as-Judge、多维度评估和统计方法。
Q2: LLM-as-Judge有什么优缺点?如何提高评分一致性?
答:优点:可扩展、无需人工标注、灵活。缺点:自身可能有偏见、成本高、存在位置偏差。提高一致性:①提供详细评分Rubric;②多次评估取平均;③对比评估时交换AB顺序消除位置偏差;④使用强模型(GPT-4o)作为评判。
Q3: 如何监控Agent系统的Token成本?
答:①在每次LLM调用后记录prompt_tokens和completion_tokens;②根据模型定价计算成本;③设置每日/每月预算告警;④使用滑动窗口检测异常token用量;⑤通过模型降级策略优化成本(简单问题用小模型)。
Q4: 什么是分布式追踪?在Agent系统中如何应用?
答:分布式追踪将一次完整执行分解为Trace→Span的层级结构。在Agent中,一次用户请求是一个Trace,每次LLM调用、工具调用是一个Span。通过OpenTelemetry标准接入,可在LangSmith/Phoenix等平台可视化查看每步延迟、Token消耗和错误信息。
Q5: 如何设计Agent的A/B测试?
答:①基于用户ID确定性分组(MD5哈希取模),保证同一用户始终进入同组;②定义核心指标(任务完成率、延迟、成本、用户满意度);③确保样本量足够后进行统计显著性检验;④逐步放量(5%→20%→50%→100%)。
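Q5中提到的统计显著性检验,可以用双比例z检验来比较两组的任务完成率。下面是一个仅用标准库的极简草图(基于正态近似;样本量较小或比例极端时应改用精确检验,示例数据为虚构):

```python
"""A/B完成率差异的双比例z检验示意(正态近似,仅用标准库)"""
import math

def two_proportion_z(success_a: int, n_a: int, success_b: int, n_b: int) -> float:
    """返回z统计量;|z| > 1.96 约对应双侧 p < 0.05"""
    p_a, p_b = success_a / n_a, success_b / n_b
    p_pool = (success_a + success_b) / (n_a + n_b)  # 合并比例
    se = math.sqrt(p_pool * (1 - p_pool) * (1 / n_a + 1 / n_b))  # 标准误
    return (p_b - p_a) / se

# 虚构示例:A组完成率 70/100,B组完成率 85/100
z = two_proportion_z(70, 100, 85, 100)
print(f"z = {z:.2f}")  # → z = 2.54,|z| > 1.96,差异在5%水平显著
```

结合前文AgentABTest的analyze()输出即可判断观察到的差异是真实提升还是抽样噪声。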
✏️ 练习¶
练习1:自建评估Pipeline¶
为你的Agent系统构建一个包含以下指标的评估Pipeline:任务完成率、工具调用准确率、延迟和Token成本。至少准备10个测试用例。
练习2:LLM-as-Judge¶
实现Point-wise和Pair-wise两种LLM-as-Judge评估方法,对比GPT-4o-mini和GPT-4o在RAG问答任务中的表现。分析位置偏差的影响。
练习3:Token监控系统¶
实现一个完整的Token监控系统,包括:用量记录、成本计算、每日预算告警、异常检测。支持多模型的定价计算。
练习4:可观测性集成¶
将LangSmith或Phoenix集成到你的Agent项目中,实现完整的分布式追踪,能在UI中查看每次请求的执行链路和性能指标。
📚 参考资料
- LangSmith官方文档
- Phoenix (Arize AI)
- OpenTelemetry Python
- Weights & Biases Weave
- 论文:"Judging LLM-as-a-Judge with MT-Bench and Chatbot Arena" (2023)
最后更新日期:2026-02-12 适用版本:LLM应用指南 v2026