多Agent系统与实战¶
⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。
掌握多Agent架构模式、通信协议和工作流编排,构建一个完整的自动化研究团队系统。
📌 定位说明:本章聚焦多Agent系统的编码实战(架构实现、A2A通信、项目开发)。 - 多Agent框架对比(LangGraph/CrewAI/AutoGen)→ LLM应用/17-多Agent框架 - 新一代Agent研究前沿(Manus/Claude Code等)→ LLM学习/04-前沿探索/08-新一代AI Agent
📖 章节导读¶
当单个Agent无法胜任复杂任务时,我们需要多个Agent协作。多Agent系统(Multi-Agent System, MAS)是AI Agent的高级形态——多个具有不同专长的Agent组成团队,通过通信和协调来完成复杂任务。本章将学习多Agent架构模式、A2A通信协议、工作流编排,以及Agent评估方法,并完成一个完整的自动化研究团队项目。
🎯 学习目标¶
- 理解多Agent架构模式(层级/平等/市场)
- 了解Agent-to-Agent (A2A)通信协议
- 掌握工作流编排与状态管理
- 学会复杂任务分解策略
- 掌握Agent评估方法
- 完成综合实战项目:自动化研究团队
📖 前置知识¶
- Agent框架实战经验(第二章)
- MCP工具开发(第三章)
- Python异步编程
- 分布式系统基本概念
1. 多Agent架构模式¶
1.1 架构模式总览¶
多Agent系统主要有三种架构模式:
┌─────────────────────────────────────────────────────────┐
│ 多Agent架构模式 │
│ │
│ ① 层级模式 ② 平等模式 ③ 市场模式 │
│ │
│ Manager A ←→ B A → 任务池 ← B │
│ ┌─┴─┐ ↕ ↕ C → 任务池 ← D │
│ A B C C ←→ D │
│ │
│ 适用: 流程明确 适用: 自由协作 适用: 动态分配 │
│ 例: 项目管理 例: 头脑风暴 例: 众包平台 │
└─────────────────────────────────────────────────────────┘
1.2 层级模式(Hierarchical)¶
在层级模式中,有一个Manager Agent负责任务分配和协调。
"""
层级模式:Manager Agent分配任务给Worker Agent
"""
from openai import OpenAI
import json
from typing import Any
from dataclasses import dataclass
client = OpenAI()
@dataclass
class AgentProfile:
"""Agent角色定义"""
name: str
role: str
capabilities: list[str]
system_prompt: str
class HierarchicalSystem:
"""层级式多Agent系统"""
def __init__(self):
self.manager = AgentProfile(
name="项目经理",
role="manager",
capabilities=["任务分解", "任务分配", "结果整合"],
system_prompt="""你是一个项目经理Agent。你的职责是:
1. 分析用户的复杂任务
2. 将任务分解为子任务
3. 分配给合适的团队成员
4. 整合各成员的结果
你的团队成员:
- 研究员:擅长信息搜索和分析
- 工程师:擅长编写代码和技术方案
- 写作者:擅长报告和文档撰写
请以JSON格式返回任务分配计划:
{"subtasks": [{"assignee": "研究员/工程师/写作者", "task": "具体任务", "priority": 1-3}]}"""
)
self.workers = {
"研究员": AgentProfile(
name="研究员",
role="researcher",
capabilities=["搜索", "分析", "总结"],
system_prompt="你是一个专业研究员。请深入研究指定主题,提供详细的分析和数据支持。",
),
"工程师": AgentProfile(
name="工程师",
role="engineer",
capabilities=["编码", "架构设计", "技术选型"],
system_prompt="你是一个高级软件工程师。请提供高质量的技术方案和代码实现。",
),
"写作者": AgentProfile(
name="写作者",
role="writer",
capabilities=["写作", "编辑", "排版"],
system_prompt="你是一个专业文档写作者。请将信息整理成清晰、专业的文档。",
),
}
def _call_llm(self, system_prompt: str, user_message: str) -> str:
"""调用LLM"""
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_message},
],
temperature=0,
)
return response.choices[0].message.content
def decompose_task(self, task: str) -> list[dict]:
"""Manager分解任务"""
print(f"📋 [Manager] 开始分解任务: {task}")
response = self._call_llm(
self.manager.system_prompt,
f"请分解以下任务: {task}"
)
# 尝试解析JSON
try:
# 提取JSON部分
json_str = response
if "```json" in response:
json_str = response.split("```json")[1].split("```")[0]
elif "```" in response:
json_str = response.split("```")[1].split("```")[0]
plan = json.loads(json_str)
subtasks = plan.get("subtasks", [])
except (json.JSONDecodeError, IndexError):
# 回退方案
subtasks = [
{"assignee": "研究员", "task": f"研究关于'{task}'的信息", "priority": 1},
{"assignee": "工程师", "task": f"为'{task}'提供技术方案", "priority": 2},
{"assignee": "写作者", "task": f"撰写'{task}'的最终报告", "priority": 3},
]
print(f" ✅ 分解为 {len(subtasks)} 个子任务")
for st in subtasks:
print(f" → [{st['assignee']}] {st['task']}")
return subtasks
def execute_subtask(self, subtask: dict) -> str:
"""Worker执行子任务"""
assignee = subtask["assignee"]
task = subtask["task"]
worker = self.workers.get(assignee)
if not worker:
return f"❌ 未找到Agent: {assignee}"
print(f"\n🔧 [{worker.name}] 执行任务: {task}")
result = self._call_llm(worker.system_prompt, task)
print(f" ✅ [{worker.name}] 完成")
return result
def integrate_results(self, task: str, results: list[dict]) -> str:
"""Manager整合结果"""
print(f"\n📊 [Manager] 整合所有结果...")
results_text = "\n\n".join([
f"### {r['assignee']}的工作结果:\n{r['result']}"
for r in results
])
integration_prompt = f"""作为项目经理,请整合以下团队成员的工作结果,生成一份完整的最终报告。
原始任务: {task}
团队成员的工作结果:
{results_text}
请生成一份结构化的最终报告,包含:摘要、详细内容、结论和建议。"""
final_report = self._call_llm(self.manager.system_prompt, integration_prompt)
return final_report
def run(self, task: str) -> str:
"""运行完整的多Agent工作流"""
print("=" * 60)
print(f"🚀 启动多Agent系统")
print(f"📌 任务: {task}")
print("=" * 60)
# 1. 任务分解
subtasks = self.decompose_task(task)
# 2. 按优先级排序并执行
subtasks.sort(key=lambda x: x.get("priority", 99)) # sort原地排序,lambda+dict.get提供默认优先级99(无优先级的排最后)
results = []
for subtask in subtasks:
result = self.execute_subtask(subtask)
results.append({
"assignee": subtask["assignee"],
"task": subtask["task"],
"result": result,
})
# 3. 整合结果
final_report = self.integrate_results(task, results)
print("\n" + "=" * 60)
print("📋 最终报告:")
print("=" * 60)
return final_report
# 使用示例
system = HierarchicalSystem()
report = system.run("分析AI Agent技术在金融领域的应用前景,并给出具体的落地建议")
print(report)
1.3 平等模式(Peer-to-Peer)¶
Agent之间可以直接通信和协作,没有中心化调度。
"""
平等模式:Agent之间直接通信协作
适用于头脑风暴、辩论、多角度分析等场景
"""
from openai import OpenAI
from dataclasses import dataclass, field
client = OpenAI()
@dataclass
class PeerAgent:
"""平等协作Agent"""
name: str
perspective: str # 分析视角
system_prompt: str
memory: list[dict] = field(default_factory=list) # dataclass可变默认值必须用field(default_factory=),避免共享引用
def respond(self, discussion_context: str) -> str:
"""参与讨论并发表观点"""
messages = [
{"role": "system", "content": self.system_prompt},
{"role": "user", "content": discussion_context},
]
response = client.chat.completions.create(
model="gpt-4o",
messages=messages,
temperature=0.7, # 稍高温度以增加多样性
max_tokens=500,
)
return response.choices[0].message.content
class PeerDiscussion:
"""平等讨论系统"""
def __init__(self, topic: str, rounds: int = 3):
self.topic = topic
self.rounds = rounds
self.agents: list[PeerAgent] = []
self.discussion_log: list[dict] = []
def add_agent(self, agent: PeerAgent):
self.agents.append(agent)
def run(self) -> str:
"""运行多轮讨论"""
print(f"📢 讨论主题: {self.topic}")
print(f"👥 参与者: {', '.join(a.name for a in self.agents)}")
print(f"🔄 讨论轮数: {self.rounds}")
print("=" * 60)
for round_num in range(1, self.rounds + 1):
print(f"\n--- 第 {round_num} 轮 ---")
for agent in self.agents:
# 构建讨论上下文
context = f"讨论主题: {self.topic}\n\n"
if self.discussion_log:
context += "之前的讨论:\n"
for entry in self.discussion_log[-6:]: # 最近6条
context += f"[{entry['agent']}]: {entry['content']}\n\n"
if round_num == 1:
context += f"请从你的专业角度({agent.perspective})发表你对该主题的看法。"
else:
context += f"请基于其他人的观点,从你的角度({agent.perspective})进一步补充或反驳。"
# Agent发言
response = agent.respond(context)
self.discussion_log.append({
"round": round_num,
"agent": agent.name,
"content": response,
})
print(f"\n🗣️ [{agent.name}]:")
print(f" {response[:200]}...")
# 生成讨论总结
return self._summarize()
def _summarize(self) -> str:
"""总结讨论结果"""
all_discussion = "\n\n".join([
f"[{entry['agent']} - 第{entry['round']}轮]: {entry['content']}"
for entry in self.discussion_log
])
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "你是一个公正的讨论总结者。请综合各方观点,生成一份平衡、全面的总结报告。"},
{"role": "user", "content": f"讨论主题: {self.topic}\n\n讨论内容:\n{all_discussion}\n\n请生成讨论总结,包含:共识点、分歧点、关键洞察和建议。"},
],
)
return response.choices[0].message.content
# 使用示例: 多角度分析
discussion = PeerDiscussion("AI Agent是否会取代传统软件开发?", rounds=2)
discussion.add_agent(PeerAgent(
name="技术乐观派",
perspective="技术进步和创新",
system_prompt="你是一个技术乐观主义者,相信AI Agent将极大提升软件开发效率和质量。请用具体案例和数据支持你的观点。",
))
discussion.add_agent(PeerAgent(
name="务实工程师",
perspective="工程实践",
system_prompt="你是一位务实的软件工程师,从实际工程经验出发分析AI Agent的能力边界和局限性。",
))
discussion.add_agent(PeerAgent(
name="产业分析师",
perspective="商业和就业影响",
system_prompt="你是一位产业分析师,关注AI Agent对软件行业商业模式和就业市场的影响。",
))
summary = discussion.run()
print(f"\n{'='*60}\n📝 讨论总结:\n{summary}")
1.4 市场模式(Market-based)¶
Agent通过"竞标"或"任务池"动态分配任务。
"""
市场模式:基于任务池的动态任务分配
Agent根据能力和负载竞争任务
"""
from dataclasses import dataclass, field
import random
@dataclass
class Task:
"""任务定义"""
id: str
description: str
required_skills: list[str]
complexity: int # 1-5
reward: float # 奖励积分
status: str = "pending" # pending/assigned/completed
assigned_to: str | None = None
result: str | None = None
@dataclass
class MarketAgent:
"""市场中的Agent"""
name: str
skills: list[str]
capacity: int = 3 # 最大并行任务数
current_tasks: int = 0
total_reward: float = 0.0
def can_handle(self, task: Task) -> bool:
"""判断是否能处理该任务"""
# 有能力且有空闲
has_skills = all(s in self.skills for s in task.required_skills) # all():所有需求技能都在自身技能列表中才返回True
has_capacity = self.current_tasks < self.capacity
return has_skills and has_capacity
def bid(self, task: Task) -> float:
"""对任务出价(置信度)"""
if not self.can_handle(task):
return 0.0
# 技能匹配度
skill_match = len(set(task.required_skills) & set(self.skills)) / len(task.required_skills)
# 空闲率
availability = 1 - (self.current_tasks / self.capacity)
return skill_match * 0.7 + availability * 0.3
def execute(self, task: Task) -> str:
"""执行任务(模拟)"""
self.current_tasks += 1
result = f"[{self.name}] 已完成任务: {task.description}"
self.current_tasks -= 1
self.total_reward += task.reward
return result
class TaskMarket:
"""任务市场"""
def __init__(self):
self.agents: list[MarketAgent] = []
self.task_queue: list[Task] = []
self.completed_tasks: list[Task] = []
def register_agent(self, agent: MarketAgent):
"""注册Agent"""
self.agents.append(agent)
print(f"✅ Agent注册: {agent.name} (技能: {', '.join(agent.skills)})")
def submit_task(self, task: Task):
"""提交任务"""
self.task_queue.append(task)
print(f"📌 新任务: [{task.id}] {task.description} (需要: {', '.join(task.required_skills)})")
def allocate_tasks(self):
"""分配所有待分配的任务"""
print(f"\n🔄 开始任务分配...")
for task in self.task_queue:
if task.status != "pending":
continue
# 收集所有出价
bids = []
for agent in self.agents:
bid_value = agent.bid(task)
if bid_value > 0:
bids.append((agent, bid_value))
if not bids:
print(f" ⚠️ 任务 [{task.id}] 无Agent可处理")
continue
# 选择出价最高的Agent
bids.sort(key=lambda x: x[1], reverse=True) # 按元组第二个元素(出价)降序排序,bids[0]即最高出价
winner = bids[0][0]
task.status = "assigned"
task.assigned_to = winner.name
print(f" 📋 任务 [{task.id}] → {winner.name} (置信度: {bids[0][1]:.2f})")
def execute_all(self):
"""执行所有已分配的任务"""
print(f"\n⚡ 执行任务...")
for task in self.task_queue:
if task.status != "assigned":
continue
agent = next(a for a in self.agents if a.name == task.assigned_to) # next+生成器:找第一个匹配的Agent,无匹配时抛StopIteration
task.result = agent.execute(task)
task.status = "completed"
self.completed_tasks.append(task)
print(f" ✅ {task.result}")
def report(self):
"""生成报告"""
print(f"\n{'='*50}")
print(f"📊 任务市场报告")
print(f" 完成任务数: {len(self.completed_tasks)}")
print(f" Agent表现:")
for agent in self.agents:
print(f" {agent.name}: 积分={agent.total_reward:.1f}")
# 使用示例
market = TaskMarket()
# 注册Agent
market.register_agent(MarketAgent("搜索专家", ["搜索", "信息提取"]))
market.register_agent(MarketAgent("数据分析师", ["数据分析", "可视化", "统计"]))
market.register_agent(MarketAgent("全栈工程师", ["编码", "架构", "搜索"]))
market.register_agent(MarketAgent("技术写手", ["写作", "编辑", "信息提取"]))
# 提交任务
market.submit_task(Task("T1", "搜索AI Agent最新论文", ["搜索", "信息提取"], 2, 10))
market.submit_task(Task("T2", "分析框架使用趋势数据", ["数据分析", "可视化"], 3, 15))
market.submit_task(Task("T3", "编写Agent系统代码", ["编码", "架构"], 4, 20))
market.submit_task(Task("T4", "撰写技术调研报告", ["写作", "信息提取"], 2, 10))
# 分配和执行
market.allocate_tasks()
market.execute_all()
market.report()
1.5 Mixture-of-Agents (MoA) 架构¶
Mixture-of-Agents(MoA,Agent混合架构)是 Together AI 在 2024 年提出的多Agent推理架构(ICLR 2025),核心思想是让多个LLM分层协作,逐层精炼输出,在多项基准(如AlpacaEval 2.0)上超越同期单一模型。
MoA 分层架构:
┌──────────────────────────┐
Layer 3 │ Aggregator Agent │ ← 最终聚合输出
(聚合层) │ (综合所有观点,给出最终回答) │
└────────────┬─────────────┘
│ 收集Layer 2所有输出
┌────────────┼─────────────┐
Layer 2 │ Agent-D │ Agent-E │ ← 精炼层
(精炼层) │ (精炼分析) │ (质量检查) │
└────────────┼─────────────┘
│ 收集Layer 1所有输出
┌────────────┼─────────────┐
Layer 1 │ Agent-A │ Agent-B │ Agent-C │ ← 基础生成层
(生成层) │ (GPT-4o) │ (Claude) │ (Gemini) │
└────────────┴─────────────┘
↑
用户问题
核心原理:不同LLM具有不同的知识偏向和推理风格,通过多层"协作-精炼"可以互相补充、纠错。
"""
Mixture-of-Agents (MoA) 简化实现
核心思想:多个LLM分层协作,逐层精炼
参考论文:Together AI "Mixture-of-Agents Enhances Large Language Model Capabilities" (2024)
"""
from openai import OpenAI
from typing import List, Dict
import asyncio
class MixtureOfAgents:
"""
Mixture-of-Agents 推理框架
架构:多层Agent,每层Agent可看到上一层所有输出
"""
def __init__(self, layers: List[List[Dict]]):
"""
初始化MoA
Args:
layers: 每层Agent的配置列表(本实现仅使用OpenAI客户端)
[
[{"model": "gpt-4o", "role": "分析师"}, ...], # Layer 1
[{"model": "gpt-4o", "role": "精炼者"}, ...], # Layer 2
[{"model": "gpt-4o", "role": "聚合者"}], # Final layer
]
"""
self.layers = layers
self.client = OpenAI()
def _call_agent(self, model: str, role: str, prompt: str,
prev_responses: List[str] = None) -> str:
"""
调用单个Agent
如果有前一层的输出,将其作为参考信息注入提示词
"""
messages = [{"role": "system", "content": f"你是一个{role}。"}]
if prev_responses:
# 将前一层所有Agent的输出作为参考
ref_text = "\n\n---\n\n".join(
[f"【参考回答 {i+1}】\n{r}" for i, r in enumerate(prev_responses)]
)
messages.append({
"role": "user",
"content": (
f"以下是其他专家对同一问题的回答,请参考并给出你的改进版回答:\n\n"
f"{ref_text}\n\n"
f"---\n原始问题:{prompt}\n\n"
f"请综合以上参考,给出更准确、更全面的回答。"
)
})
else:
messages.append({"role": "user", "content": prompt})
response = self.client.chat.completions.create(
model=model,
messages=messages,
temperature=0.7
)
return response.choices[0].message.content
def run(self, query: str) -> str:
"""
执行MoA多层推理
Returns:
最终聚合后的回答
"""
prev_responses = []
for layer_idx, layer_agents in enumerate(self.layers):
current_responses = []
print(f"\n🔄 Layer {layer_idx + 1} ({len(layer_agents)} agents)...")
# 注意:同一层内的Agent在此实现中是顺序执行的。
# 若需真正并行,可改为 asyncio.gather() 并发调用,显著降低层内延迟。
for agent_cfg in layer_agents:
response = self._call_agent(
model=agent_cfg["model"],
role=agent_cfg["role"],
prompt=query,
prev_responses=prev_responses if prev_responses else None
)
current_responses.append(response)
print(f" ✅ {agent_cfg['role']} ({agent_cfg['model']}) 完成")
prev_responses = current_responses
# 返回最后一层(聚合层)的输出
return prev_responses[-1]
# === 使用示例 ===
moa = MixtureOfAgents(layers=[
# Layer 1:多模型并行生成(利用模型多样性)
[
{"model": "gpt-4o", "role": "技术分析师"},
{"model": "gpt-4o-mini", "role": "通俗解释者"},
{"model": "gpt-4o", "role": "批判性审稿人"},
],
# Layer 2:精炼(综合Layer 1输出)
[
{"model": "gpt-4o", "role": "综合精炼专家"},
{"model": "gpt-4o", "role": "事实验证者"},
],
# Layer 3:最终聚合
[
{"model": "gpt-4o", "role": "首席聚合官"},
],
])
result = moa.run("比较 Transformer 和 Mamba 架构的优劣势")
print(f"\n📋 MoA最终回答:\n{result}")
MoA vs 传统方法对比:
| 维度 | 单模型 | 简单集成(投票) | MoA |
|---|---|---|---|
| 推理质量 | ⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| 信息利用 | 单一视角 | 多视角独立 | 多视角+精炼 |
| 错误纠正 | 无 | 多数投票 | 逐层纠正 |
| 计算成本 | 低 | 中 | 高 |
| 适用场景 | 简单任务 | 分类/判断 | 复杂推理/分析 |
📌 交叉引用:MoA与层级模式的区别——层级模式侧重任务分工(不同Agent做不同子任务),MoA侧重回答精炼(不同Agent对同一问题给出不同回答,再逐层综合)。
1.6 Subagent模式(Agent-as-Tool)¶
Subagent(子Agent) 是当前Agent领域最热门的架构模式之一。核心思想是:把一个完整的Agent包装成另一个Agent的"工具"来调用,实现能力的模块化组合。
Subagent模式 vs 传统工具调用:
传统Tool Calling:
Agent → Tool(函数调用,确定性执行)
└── get_weather("北京") → {"temp": 22}
Subagent模式:
Orchestrator Agent → Subagent(自身也是Agent,具备推理能力)
└── 研究Agent: 接收"调研AI趋势"
→ 自主搜索 → 阅读 → 分析 → 生成报告
→ 返回完整的研究报告
关键区别: Tool是"调用函数取结果",Subagent是"委托一个有自主推理能力的Agent去完成任务"
三种Subagent实现方式:
"""
Subagent三种模式的完整实现
1. Agent-as-Tool: 将Agent注册为Tool Calling中的工具
2. Dynamic Spawn: 运行时动态创建子Agent
3. Handoff: 任务转交(OpenAI Agents SDK风格)
参考: Anthropic "Building Effective Agents" (2025) — Orchestrator-Workers
"""
from openai import OpenAI
import json
from typing import Callable
client = OpenAI()
# ============================================================
# 模式1: Agent-as-Tool — 将Agent包装为OpenAI Tool Calling的工具
# ============================================================
class AgentAsTool:
"""将一个Agent包装为另一个Agent可调用的Tool"""
def __init__(self, name: str, description: str, system_prompt: str):
self.name = name
self.description = description
self.system_prompt = system_prompt
def run(self, task: str) -> str:
"""Agent自主执行任务(内部可能有多轮推理)"""
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": self.system_prompt},
{"role": "user", "content": task},
],
temperature=0,
)
return response.choices[0].message.content
def as_tool_schema(self) -> dict:
"""导出为OpenAI Tool Calling格式"""
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": {
"type": "object",
"properties": {
"task": {
"type": "string",
"description": "要交给该Agent执行的任务描述",
}
},
"required": ["task"],
},
},
}
# 定义Subagent
code_agent = AgentAsTool(
name="code_expert",
description="编程专家Agent,擅长代码编写、调试和代码审查",
system_prompt="你是一个高级Python工程师。收到任务后,独立完成代码编写和测试。输出完整代码和简要说明。",
)
research_agent = AgentAsTool(
name="research_expert",
description="研究专家Agent,擅长搜索信息、分析论文、生成调研报告",
system_prompt="你是一个资深研究员。收到课题后,进行深入分析,输出结构化的研究报告。",
)
# 编排者Agent使用Subagent作为工具
subagents = {"code_expert": code_agent, "research_expert": research_agent}
tools = [agent.as_tool_schema() for agent in subagents.values()]
def orchestrator_with_subagents(user_request: str, max_steps: int = 5) -> str:
"""编排者Agent:通过Tool Calling调用Subagent"""
messages = [
{"role": "system", "content": """你是一个项目经理Agent。你手下有多个专家Agent可以调用。
分析用户需求,将子任务委派给合适的专家Agent,最后汇总结果给用户。
每个专家Agent都是独立运行的,你只需要描述清楚任务即可。"""},
{"role": "user", "content": user_request},
]
for step in range(max_steps):
response = client.chat.completions.create(
model="gpt-4o",
messages=messages,
tools=tools,
temperature=0,
)
message = response.choices[0].message
# 无工具调用 → 编排者已得出最终结论
if not message.tool_calls:
return message.content
messages.append(message)
# 调用Subagent
for tool_call in message.tool_calls:
agent_name = tool_call.function.name
args = json.loads(tool_call.function.arguments)
task = args.get("task", "")
print(f" 🤖 [Orchestrator → {agent_name}] {task[:60]}...")
result = subagents[agent_name].run(task)
print(f" ✅ [{agent_name}] 返回 {len(result)} 字符")
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": result,
})
return "达到最大步骤数"
# ============================================================
# 模式2: Dynamic Spawn — 运行时动态创建子Agent
# ============================================================
def dynamic_subagent(task: str, context: str = "") -> str:
"""动态创建一个临时Subagent执行特定任务
与预定义Agent不同,Dynamic Spawn根据任务需求
在运行时构造专属的system_prompt和参数配置。
适用于编排者无法预知子任务类型的场景。
"""
# 让LLM分析任务需求,动态生成Subagent的配置
config_resp = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": """分析任务需求,生成最适合的Agent配置。
返回JSON: {"role": "角色名", "system_prompt": "专家提示词", "temperature": 0.0-1.0}"""},
{"role": "user", "content": f"任务: {task}\n上下文: {context}"},
],
response_format={"type": "json_object"},
temperature=0,
)
config = json.loads(config_resp.choices[0].message.content)
print(f" 🔧 动态创建Subagent: {config.get('role', 'unknown')}")
# 用动态生成的配置创建并运行子Agent
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": config.get("system_prompt", "你是一个有用的助手。")},
{"role": "user", "content": task},
],
temperature=config.get("temperature", 0),
)
return response.choices[0].message.content
# ============================================================
# 模式3: Handoff — 任务转交(类似OpenAI Agents SDK的handoffs)
# ============================================================
class HandoffAgent:
"""支持任务转交的Agent
当Agent判断任务超出自身能力范围时,
主动将对话转交给更合适的Agent继续处理。
人类用户无感知,对话上下文无损传递。
"""
def __init__(self, name: str, system_prompt: str, handoff_targets: dict[str, 'HandoffAgent'] = None):
self.name = name
self.system_prompt = system_prompt
self.handoff_targets = handoff_targets or {}
def run(self, messages: list[dict], max_handoffs: int = 3) -> str:
"""运行Agent,支持自动转交"""
current_agent = self
handoff_count = 0
while handoff_count < max_handoffs:
# 构造当前Agent的系统提示(含转交能力声明)
handoff_info = ""
if current_agent.handoff_targets:
targets = ", ".join(
f"{name}({a.system_prompt[:30]}...)"
for name, a in current_agent.handoff_targets.items()
)
handoff_info = f"\n\n你可以将对话转交给其他专家: {targets}\n转交时回复格式: HANDOFF:专家名称"
full_messages = [
{"role": "system", "content": current_agent.system_prompt + handoff_info},
] + messages
response = client.chat.completions.create(
model="gpt-4o",
messages=full_messages,
temperature=0,
)
reply = response.choices[0].message.content
# 检查是否请求转交
if reply.strip().startswith("HANDOFF:"):
target_name = reply.strip().split("HANDOFF:")[1].strip()
if target_name in current_agent.handoff_targets:
print(f" 🔄 [{current_agent.name}] → 转交给 [{target_name}]")
current_agent = current_agent.handoff_targets[target_name]
handoff_count += 1
continue
return f"[{current_agent.name}] {reply}"
return f"[{current_agent.name}] 达到最大转交次数"
# === 使用示例 ===
# 模式1: Agent-as-Tool
print("=== 模式1: Agent-as-Tool ===")
result = orchestrator_with_subagents("帮我调研目前主流的Agent框架,然后写一个LangGraph的Hello World示例")
print(f"结果: {result[:300]}...\n")
# 模式2: Dynamic Spawn
print("=== 模式2: Dynamic Spawn ===")
result = dynamic_subagent("分析Transformer注意力机制的计算复杂度", context="面向大三本科生的教学文档")
print(f"结果: {result[:300]}...\n")
# 模式3: Handoff
print("=== 模式3: Handoff ===")
billing_agent = HandoffAgent("账单专家", "你是账单和支付问题的专家。只回答账单相关问题,其他问题请转交。")
tech_agent = HandoffAgent("技术支持", "你是技术支持专家。解决软件使用和技术故障问题。")
triage_agent = HandoffAgent(
"分诊Agent",
"你是客服分诊Agent。判断用户问题类型,转交给对应的专家。不要自己回答具体问题。",
handoff_targets={"账单专家": billing_agent, "技术支持": tech_agent},
)
result = triage_agent.run([{"role": "user", "content": "我的软件一直崩溃,怎么办?"}])
print(f"结果: {result[:300]}...")
Subagent模式选择指南:
| 模式 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| Agent-as-Tool | 编排者的子Agent确定、可复用 | 标准化、接口清晰 | 灵活度有限 |
| Dynamic Spawn | 子任务类型不可预知 | 高度灵活 | 生成配置有额外开销 |
| Handoff | 客服分流、多轮对话中转交 | 上下文无损传递 | 链路较长时难调试 |
📌 交叉引用:Subagent的设计模式基础(Orchestrator-Worker)→ 01-Agent基础与架构 §4.6;OpenAI Agents SDK的Handoff机制 → LLM应用/22-结构化输出与函数调用;Anthropic Skills(将Agent常用工作流保存为可复用技能) → LLM学习/04-前沿探索/08-新一代AI-Agent
2. Agent通信协议(A2A)¶
2.1 A2A概念¶
Agent-to-Agent (A2A) 是业界提出的Agent间通信协议概念,目标是实现不同框架、不同开发者构建的Agent之间的互操作。
"""
A2A通信概念实现
定义Agent间的标准化消息格式
"""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Callable
import json
import uuid
class MessageType(Enum):
"""消息类型"""
REQUEST = "request" # 任务请求
RESPONSE = "response" # 任务响应
STATUS = "status" # 状态更新
DELEGATE = "delegate" # 任务委托
NOTIFY = "notify" # 通知
@dataclass
class AgentCard:
"""Agent名片 - 描述Agent的能力"""
agent_id: str
name: str
description: str
capabilities: list[str]
input_schema: dict # 接受的输入格式
output_schema: dict # 输出格式
endpoint: str # 通信端点
@dataclass
class A2AMessage:
"""Agent间通信消息"""
id: str = field(default_factory=lambda: str(uuid.uuid4())) # default_factory接收lambda,每次实例化生成新UUID
type: MessageType = MessageType.REQUEST
sender: str = "" # 发送方Agent ID
receiver: str = "" # 接收方Agent ID
content: dict = field(default_factory=dict)
metadata: dict = field(default_factory=dict)
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
reply_to: str | None = None # 回复的消息ID
def to_dict(self) -> dict:
return {
"id": self.id,
"type": self.type.value,
"sender": self.sender,
"receiver": self.receiver,
"content": self.content,
"metadata": self.metadata,
"timestamp": self.timestamp,
"reply_to": self.reply_to,
}
def to_json(self) -> str:
return json.dumps(self.to_dict(), ensure_ascii=False, indent=2)
class A2ARouter:
"""Agent间消息路由器"""
def __init__(self):
self.agents: dict[str, AgentCard] = {}
self.message_handlers: dict[str, Callable] = {}
self.message_log: list[A2AMessage] = []
def register(self, card: AgentCard, handler: Callable):
"""注册Agent"""
self.agents[card.agent_id] = card
self.message_handlers[card.agent_id] = handler
print(f"📝 注册Agent: {card.name} ({card.agent_id})")
def discover(self, required_capabilities: list[str]) -> list[AgentCard]:
"""发现具有指定能力的Agent"""
results = []
for card in self.agents.values():
# all()判断所有要求的能力是否都在该Agent的能力列表中(子集关系检查)
if all(cap in card.capabilities for cap in required_capabilities):
results.append(card)
return results
def send(self, message: A2AMessage) -> A2AMessage | None:
"""路由消息"""
self.message_log.append(message)
if message.receiver not in self.message_handlers:
print(f"❌ 目标Agent不存在: {message.receiver}")
return None
handler = self.message_handlers[message.receiver]
response = handler(message)
if response:
self.message_log.append(response)
return response
# === 使用示例 ===
router = A2ARouter()
# 注册Agent
def research_handler(msg: A2AMessage) -> A2AMessage:
print(f" 🔬 [研究Agent] 收到请求: {msg.content.get('task', '')}")
return A2AMessage(
type=MessageType.RESPONSE,
sender="agent-research",
receiver=msg.sender,
content={"result": "找到5篇相关论文", "papers": ["论文A", "论文B"]},
reply_to=msg.id,
)
router.register(
AgentCard(
agent_id="agent-research",
name="研究Agent",
description="学术研究和文献搜索",
capabilities=["搜索", "文献分析"],
input_schema={"type": "object", "properties": {"task": {"type": "string"}}},
output_schema={"type": "object", "properties": {"result": {"type": "string"}}},
endpoint="local://agent-research",
),
research_handler,
)
# 发送消息
msg = A2AMessage(
type=MessageType.REQUEST,
sender="agent-manager",
receiver="agent-research",
content={"task": "搜索关于ReAct的最新论文"},
)
response = router.send(msg)
if response:
print(f" ✅ 收到响应: {response.content}")
2.2 ANP(Agent Network Protocol)¶
除了MCP(Agent↔工具)和A2A(Agent↔Agent),还有一个重要的新兴协议——ANP(Agent Network Protocol),它解决的是跨网络的Agent发现与通信问题。
三大Agent协议对比:
| 协议 | 层级 | 解决的问题 | 类比 |
|---|---|---|---|
| MCP | Agent↔工具 | Agent如何调用外部工具和数据源 | USB接口 |
| A2A | Agent↔Agent | 同一组织内Agent如何互相调用 | 局域网通信 |
| ANP | Agent↔互联网 | 跨组织、跨网络的Agent如何发现和信任彼此 | 互联网协议(HTTP) |
ANP的核心设计:
┌──────────────────────────────────────────────────────┐
│ ANP协议栈 │
├──────────────────────────────────────────────────────┤
│ 身份层(Identity Layer) │
│ • 基于DID (去中心化标识符) 的Agent身份 │
│ • 可验证凭证 (VC) 实现信任链 │
│ • 无需中心化注册机构 │
├──────────────────────────────────────────────────────┤
│ 发现层(Discovery Layer) │
│ • Agent Description文件 (类似robots.txt) │
│ • 能力声明与服务端点发布 │
│ • P2P或注册中心两种发现模式 │
├──────────────────────────────────────────────────────┤
│ 通信层(Communication Layer) │
│ • 基于HTTP/WebSocket的消息交换 │
│ • 端到端加密 │
│ • 异步任务追踪与状态回调 │
└──────────────────────────────────────────────────────┘
ANP vs A2A的关键区别: - A2A 假设Agent在受信任环境内(如同一组织),重点是任务协作 - ANP 面向开放互联网,重点是身份验证、信任建立和跨域发现 - A2A适合企业内部Agent编排,ANP适合构建Agent互联网生态
简化的ANP Agent发现流程:
"""ANP Agent发现与通信概念演示"""
from dataclasses import dataclass, field
import json
@dataclass
class AgentDescriptor:
"""Agent描述符 —— 类似ANP的Agent Description文件"""
agent_id: str # DID格式: did:anp:agent-xxx
name: str
description: str
capabilities: list[str]
endpoint: str # 通信端点
protocols: list[str] = field(default_factory=lambda: ["anp/1.0"]) # lambda返回新列表实例,避免可变默认值陷阱
trust_score: float = 0.0 # 信任评分
class AgentRegistry:
"""简化的Agent注册与发现中心"""
def __init__(self):
self.agents: dict[str, AgentDescriptor] = {}
def register(self, descriptor: AgentDescriptor):
"""注册Agent到网络"""
self.agents[descriptor.agent_id] = descriptor
print(f" ✅ Agent注册成功: {descriptor.name} ({descriptor.agent_id})")
def discover(self, capability: str) -> list[AgentDescriptor]:
"""按能力发现Agent"""
return [
a for a in self.agents.values()
if capability in a.capabilities
]
def verify_identity(self, agent_id: str) -> bool:
"""验证Agent身份(简化版,实际应使用DID/VC验证)"""
return agent_id in self.agents
# 示例:Agent发现与跨网络调用
registry = AgentRegistry()
# 注册不同组织的Agent
registry.register(AgentDescriptor(
agent_id="did:anp:travel-agent-001",
name="携程旅行助手",
description="提供机票、酒店查询和预订服务",
capabilities=["机票查询", "酒店预订", "行程规划"],
endpoint="https://api.ctrip.com/agent/v1",
))
registry.register(AgentDescriptor(
agent_id="did:anp:weather-agent-001",
name="天气服务Agent",
description="提供全球天气预报",
capabilities=["天气查询", "气象预警"],
endpoint="https://api.weather.com/agent/v1",
))
# 发现具有"机票查询"能力的Agent
travel_agents = registry.discover("机票查询")
for agent in travel_agents:
print(f" 🔍 发现Agent: {agent.name} @ {agent.endpoint}")
if registry.verify_identity(agent.agent_id):
print(f" ✅ 身份验证通过")
📌 注意:ANP协议仍在早期发展阶段(2025年提出),尚未形成统一标准。以上代码仅展示核心概念。实际的ANP实现会涉及DID解析、可验证凭证(VC)验证、端到端加密等复杂机制。关注 ANP协议规范 获取最新进展。
3. 工作流编排与状态管理¶
3.1 基于LangGraph的多Agent工作流¶
"""
基于LangGraph的多Agent工作流编排
实现: 研究 → 分析 → 写作 → 审核 的流水线
"""
from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
# === 1. 定义全局状态 ===
# TypedDict定义结构化字典类型,LangGraph用作工作流状态容器
class WorkflowState(TypedDict):
"""工作流共享状态"""
# Annotated[list, add_messages]:add_messages是归约函数,新消息追加而非覆盖
messages: Annotated[list, add_messages]
task: str # 原始任务
research_result: str # 研究结果
analysis_result: str # 分析结果
draft: str # 写作草稿
review_feedback: str # 审核反馈
final_output: str # 最终输出
revision_count: int # 修改次数
status: str # 工作流状态
# === 2. 定义Agent节点 ===
llm = ChatOpenAI(model="gpt-4o", temperature=0)
def researcher_node(state: WorkflowState) -> WorkflowState:
"""研究Agent: 搜索和收集信息"""
task = state["task"]
response = llm.invoke([
SystemMessage(content="""你是一个专业研究员。请针对给定主题进行深入研究。
输出格式:
1. 关键发现(3-5条)
2. 数据/证据支持
3. 相关参考来源"""),
HumanMessage(content=f"请研究: {task}"),
])
return {
"research_result": response.content,
"status": "researched",
"messages": [AIMessage(content=f"[研究员] {response.content[:200]}...")],
}
def analyst_node(state: WorkflowState) -> WorkflowState:
"""分析Agent: 深度分析研究数据"""
research = state["research_result"]
task = state["task"]
response = llm.invoke([
SystemMessage(content="""你是一个数据分析专家。基于研究数据进行深度分析。
输出格式:
1. 核心洞察
2. 趋势分析
3. 机会与风险
4. 建议"""),
HumanMessage(content=f"任务: {task}\n\n研究数据:\n{research}"),
])
return {
"analysis_result": response.content,
"status": "analyzed",
"messages": [AIMessage(content=f"[分析师] {response.content[:200]}...")],
}
def writer_node(state: WorkflowState) -> WorkflowState:
"""写作Agent: 撰写报告"""
task = state["task"]
research = state["research_result"]
analysis = state["analysis_result"]
feedback = state.get("review_feedback", "")
revision_count = state.get("revision_count", 0)
prompt = f"任务: {task}\n\n研究结果:\n{research}\n\n分析结果:\n{analysis}"
if feedback:
prompt += f"\n\n审核反馈(请据此修改):\n{feedback}"
response = llm.invoke([
SystemMessage(content="""你是一个专业报告写作者。请撰写一份高质量的分析报告。
报告结构: 摘要 → 背景 → 核心发现 → 详细分析 → 结论与建议"""),
HumanMessage(content=prompt),
])
return {
"draft": response.content,
"revision_count": revision_count + 1,
"status": "drafted",
"messages": [AIMessage(content=f"[写作者] 报告草稿已完成(第{revision_count + 1}版)")],
}
def reviewer_node(state: WorkflowState) -> WorkflowState:
"""审核Agent: 审查报告质量"""
draft = state["draft"]
task = state["task"]
response = llm.invoke([
SystemMessage(content="""你是一个严格的质量审核专家。请审查报告,评估以下方面:
1. 内容准确性(1-10分)
2. 逻辑连贯性(1-10分)
3. 信息完整性(1-10分)
4. 文字表达(1-10分)
如果总分 >= 32(满分40),回复"APPROVED: [简短评语]"
否则回复"REVISION: [具体修改建议]" """),
HumanMessage(content=f"原始任务: {task}\n\n待审核报告:\n{draft}"),
])
return {
"review_feedback": response.content,
"status": "reviewed",
"messages": [AIMessage(content=f"[审核者] {response.content[:200]}...")],
}
# === 3. 路由逻辑 ===
def review_router(state: WorkflowState) -> Literal["writer", "end"]:
"""根据审核结果决定是否需要修改"""
feedback = state.get("review_feedback", "")
revision_count = state.get("revision_count", 0)
# 如果审核通过或修改次数超过3次,结束
if "APPROVED" in feedback or revision_count >= 3:
return "end"
return "writer"
def finalize_node(state: WorkflowState) -> WorkflowState:
"""最终整理输出"""
return {
"final_output": state["draft"],
"status": "completed",
"messages": [AIMessage(content="✅ 报告已完成!")],
}
# === 4. 构建工作流图 ===
workflow = StateGraph(WorkflowState)
# 添加节点
workflow.add_node("researcher", researcher_node)
workflow.add_node("analyst", analyst_node)
workflow.add_node("writer", writer_node)
workflow.add_node("reviewer", reviewer_node)
workflow.add_node("finalize", finalize_node)
# 添加边
workflow.add_edge(START, "researcher")
workflow.add_edge("researcher", "analyst")
workflow.add_edge("analyst", "writer")
workflow.add_edge("writer", "reviewer")
# 条件路由: 审核 → 修改 或 结束
workflow.add_conditional_edges("reviewer", review_router, {
"writer": "writer",
"end": "finalize",
})
workflow.add_edge("finalize", END)
# 编译
app = workflow.compile()
# === 5. 运行 ===
result = app.invoke({
"messages": [],
"task": "分析2025-2026年AI Agent技术在企业中的落地情况和未来趋势",
"research_result": "",
"analysis_result": "",
"draft": "",
"review_feedback": "",
"final_output": "",
"revision_count": 0,
"status": "started",
})
print(f"\n{'='*60}")
print(f"📋 最终报告 (修改了{result['revision_count']}次):")
print(f"{'='*60}")
print(result["final_output"])
4. 复杂任务分解策略¶
4.1 三种分解策略¶
"""
复杂任务分解策略
"""
from openai import OpenAI
import json
client = OpenAI()
class TaskDecomposer:
"""任务分解器"""
def sequential_decompose(self, task: str) -> list[dict]:
"""顺序分解: 将任务分解为线性步骤序列"""
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": """将复杂任务分解为线性执行的子步骤。
返回JSON: {"steps": [{"id": 1, "action": "...", "depends_on": []}]}"""},
{"role": "user", "content": f"请分解任务: {task}"},
],
response_format={"type": "json_object"},
)
return json.loads(response.choices[0].message.content)
def parallel_decompose(self, task: str) -> list[dict]:
"""并行分解: 识别可并行执行的子任务"""
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": """将任务分解为子任务,标注哪些可以并行执行。
返回JSON: {"groups": [{"group_id": 1, "parallel": true, "tasks": [...]}]}
同一group内的任务可并行,group之间按顺序执行。"""},
{"role": "user", "content": f"请分解任务: {task}"},
],
response_format={"type": "json_object"},
)
return json.loads(response.choices[0].message.content)
def recursive_decompose(self, task: str, max_depth: int = 3, current_depth: int = 0) -> dict:
"""递归分解: 将大任务不断细化直到足够简单"""
if current_depth >= max_depth:
return {"task": task, "subtasks": [], "is_atomic": True}
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": """判断任务是否足够简单可以直接执行。
如果可以直接执行: {"is_atomic": true, "task": "..."}
如果需要分解: {"is_atomic": false, "subtasks": ["子任务1", "子任务2", ...]}"""},
{"role": "user", "content": f"任务: {task}"},
],
response_format={"type": "json_object"},
)
result = json.loads(response.choices[0].message.content)
if result.get("is_atomic", True):
return {"task": task, "subtasks": [], "is_atomic": True}
# 递归分解子任务
subtask_results = []
for subtask in result.get("subtasks", []):
sub_result = self.recursive_decompose(subtask, max_depth, current_depth + 1)
subtask_results.append(sub_result)
return {"task": task, "subtasks": subtask_results, "is_atomic": False}
# 使用示例
decomposer = TaskDecomposer()
# 顺序分解
seq_plan = decomposer.sequential_decompose("开发一个AI驱动的客服系统")
print("📋 顺序分解结果:")
print(json.dumps(seq_plan, indent=2, ensure_ascii=False))
5. 实战项目:自动化研究团队¶
5.1 项目架构¶
自动化研究团队系统
├── 文献搜索Agent ── 搜索arXiv/Scholar论文
├── 摘要提取Agent ── 提取和总结论文关键信息
├── 分析综合Agent ── 对比分析多篇论文
├── 报告生成Agent ── 生成结构化综述报告
└── 质量审核Agent ── 审查报告质量
工作流: 搜索 → 摘要提取(并行) → 分析综合 → 报告生成 ↔ 审核
5.2 完整实现¶
"""
实战项目:自动化研究团队
基于LangGraph构建多Agent论文研究系统
"""
from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
import json
# === 1. 全局状态定义 ===
class ResearchTeamState(TypedDict):
"""研究团队共享状态"""
messages: Annotated[list, add_messages]
research_topic: str # 研究主题
search_results: list[dict] # 搜索到的论文
paper_summaries: list[dict] # 论文摘要
comparative_analysis: str # 对比分析
final_report: str # 最终报告
review_result: str # 审核结果
revision_count: int # 修改次数
workflow_status: str # 工作流状态
# === 2. Agent节点实现 ===
llm = ChatOpenAI(model="gpt-4o", temperature=0)
def literature_search_agent(state: ResearchTeamState) -> ResearchTeamState:
"""文献搜索Agent: 搜索相关论文"""
topic = state["research_topic"]
# 实际应用中调用arXiv/Semantic Scholar API
# 这里使用LLM模拟搜索结果
response = llm.invoke([
SystemMessage(content="""你是一个学术文献搜索专家。给定研究主题,返回5-8篇最相关的论文。
返回JSON格式:
{
"papers": [
{"title": "论文标题", "authors": "作者", "year": 2024, "venue": "会议/期刊", "relevance": "与主题的关联说明"}
]
}"""),
HumanMessage(content=f"搜索主题: {topic}"),
])
try:
content = response.content
if "```json" in content:
content = content.split("```json")[1].split("```")[0]
papers = json.loads(content).get("papers", [])
except (json.JSONDecodeError, IndexError):
papers = [
{"title": f"关于{topic}的研究论文{i}", "authors": f"Author{i} et al.",
"year": 2024 + (i % 2), "venue": "NeurIPS", "relevance": "高度相关"}
for i in range(1, 6)
]
return {
"search_results": papers,
"workflow_status": "searched",
"messages": [AIMessage(content=f"📚 [搜索Agent] 找到{len(papers)}篇相关论文")],
}
def summary_extraction_agent(state: ResearchTeamState) -> ResearchTeamState:
"""摘要提取Agent: 提取每篇论文的关键信息"""
papers = state["search_results"]
topic = state["research_topic"]
papers_text = json.dumps(papers, ensure_ascii=False, indent=2)
response = llm.invoke([
SystemMessage(content="""你是一个论文摘要提取专家。请为每篇论文提取:
1. 核心问题
2. 提出的方法
3. 主要贡献
4. 关键结果
5. 与研究主题的关联
返回JSON格式:
{"summaries": [{"title": "...", "problem": "...", "method": "...", "contribution": "...", "results": "...", "relevance_to_topic": "..."}]}"""),
HumanMessage(content=f"研究主题: {topic}\n\n论文列表:\n{papers_text}"),
])
try:
content = response.content
if "```json" in content:
content = content.split("```json")[1].split("```")[0]
summaries = json.loads(content).get("summaries", [])
except (json.JSONDecodeError, IndexError):
summaries = [{"title": p["title"], "summary": "摘要提取中..."} for p in papers]
return {
"paper_summaries": summaries,
"workflow_status": "summarized",
"messages": [AIMessage(content=f"📝 [摘要Agent] 完成{len(summaries)}篇论文的摘要提取")],
}
def comparative_analysis_agent(state: ResearchTeamState) -> ResearchTeamState:
"""分析综合Agent: 对比分析多篇论文"""
summaries = state["paper_summaries"]
topic = state["research_topic"]
summaries_text = json.dumps(summaries, ensure_ascii=False, indent=2)
response = llm.invoke([
SystemMessage(content="""你是一个学术分析专家。请对以下论文进行综合对比分析:
分析维度:
1. 方法论对比:各论文使用的方法有何异同?
2. 技术演进:这些研究反映了怎样的技术发展趋势?
3. 优劣对比:各方法的优缺点是什么?
4. 研究空白:哪些问题尚未被充分研究?
5. 未来方向:基于现有研究可以预见的发展方向
请提供详细、有深度的分析。"""),
HumanMessage(content=f"研究主题: {topic}\n\n论文摘要:\n{summaries_text}"),
])
return {
"comparative_analysis": response.content,
"workflow_status": "analyzed",
"messages": [AIMessage(content="🔍 [分析Agent] 完成对比分析")],
}
def report_generation_agent(state: ResearchTeamState) -> ResearchTeamState:
"""报告生成Agent: 生成结构化的文献综述报告"""
topic = state["research_topic"]
papers = state["search_results"]
summaries = state["paper_summaries"]
analysis = state["comparative_analysis"]
feedback = state.get("review_result", "")
revision_count = state.get("revision_count", 0)
context = f"""
研究主题: {topic}
搜索到的论文: {json.dumps(papers, ensure_ascii=False)}
论文摘要: {json.dumps(summaries, ensure_ascii=False)}
对比分析: {analysis}
"""
if feedback and "REVISION" in feedback:
context += f"\n审核反馈(请据此修改):\n{feedback}"
response = llm.invoke([
SystemMessage(content="""你是一个学术报告写作专家。请撰写一份专业的文献综述报告。
报告结构:
# 文献综述:[主题]
## 1. 引言
- 研究背景和动机
- 综述范围和方法
## 2. 相关工作概述
- 按时间线或技术分类介绍各论文
## 3. 方法对比分析
- 各方法的核心思路对比
- 优缺点分析表格
## 4. 关键发现
- 重要结论和趋势
## 5. 研究展望
- 现有局限性
- 未来研究方向
## 参考文献
- 按学术格式列出
请确保报告学术严谨、逻辑清晰、有深度。"""),
HumanMessage(content=context),
])
return {
"final_report": response.content,
"revision_count": revision_count + 1,
"workflow_status": "report_drafted",
"messages": [AIMessage(content=f"📄 [报告Agent] 报告草稿已完成(第{revision_count + 1}版)")],
}
def quality_review_agent(state: ResearchTeamState) -> ResearchTeamState:
"""质量审核Agent: 审查报告质量"""
report = state["final_report"]
topic = state["research_topic"]
response = llm.invoke([
SystemMessage(content="""你是一个严格的学术审稿专家。请评审以下文献综述报告。
评分标准(每项1-10分):
1. 覆盖面:是否涵盖了该领域的重要论文
2. 准确性:技术描述是否准确
3. 深度:分析是否有深度和洞察力
4. 结构:报告组织是否清晰合理
5. 创新性:综述是否提供了新的视角
总分50分。
- 总分 >= 40: 回复 "APPROVED: [简短评价]"
- 总分 < 40: 回复 "REVISION: [详细修改建议]"
请严格评分。"""),
HumanMessage(content=f"研究主题: {topic}\n\n待审报告:\n{report}"),
])
return {
"review_result": response.content,
"workflow_status": "reviewed",
"messages": [AIMessage(content=f"✅ [审核Agent] {response.content[:100]}...")],
}
# === 3. 路由逻辑 ===
def review_decision(state: ResearchTeamState) -> Literal["revise", "complete"]:
"""审核后的路由决策"""
review = state.get("review_result", "")
revision_count = state.get("revision_count", 0)
if "APPROVED" in review or revision_count >= 3:
return "complete"
return "revise"
def complete_node(state: ResearchTeamState) -> ResearchTeamState:
"""完成节点"""
return {
"workflow_status": "completed",
"messages": [AIMessage(content="🎉 研究报告已完成!")],
}
# === 4. 构建工作流 ===
workflow = StateGraph(ResearchTeamState)
# 添加节点
workflow.add_node("search", literature_search_agent)
workflow.add_node("summarize", summary_extraction_agent)
workflow.add_node("analyze", comparative_analysis_agent)
workflow.add_node("write", report_generation_agent)
workflow.add_node("review", quality_review_agent)
workflow.add_node("complete", complete_node)
# 定义工作流
workflow.add_edge(START, "search")
workflow.add_edge("search", "summarize")
workflow.add_edge("summarize", "analyze")
workflow.add_edge("analyze", "write")
workflow.add_edge("write", "review")
# 条件路由: 审核通过→完成, 否则→修改
workflow.add_conditional_edges("review", review_decision, {
"revise": "write",
"complete": "complete",
})
workflow.add_edge("complete", END)
# 编译
research_team = workflow.compile()
# === 5. 运行研究团队 ===
def run_research(topic: str) -> str:
"""运行自动化研究团队"""
print(f"{'='*60}")
print(f"🚀 启动自动化研究团队")
print(f"📌 研究主题: {topic}")
print(f"{'='*60}\n")
result = research_team.invoke({
"messages": [],
"research_topic": topic,
"search_results": [],
"paper_summaries": [],
"comparative_analysis": "",
"final_report": "",
"review_result": "",
"revision_count": 0,
"workflow_status": "started",
})
# 打印工作流日志
print(f"\n📊 工作流执行日志:")
for msg in result["messages"]:
if hasattr(msg, "content"): # hasattr动态检查属性存在性,避免AttributeError
print(f" {msg.content}")
print(f"\n{'='*60}")
print(f"📋 最终报告 (修改{result['revision_count']}次):")
print(f"{'='*60}")
print(result["final_report"])
return result["final_report"]
# 运行
report = run_research("大语言模型Agent在自动化科研中的应用进展")
6. Agent评估方法¶
6.1 评估维度¶
"""
Agent评估框架
涵盖准确性、效率、安全性三大维度
"""
from dataclasses import dataclass, field
from datetime import datetime
import time
import json
from typing import Any
@dataclass
class EvaluationResult:
"""评估结果"""
metric: str
score: float # 0-1
details: str
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
class AgentEvaluator:
"""Agent评估器"""
def __init__(self):
self.results: list[EvaluationResult] = []
# --- 准确性评估 ---
def evaluate_task_completion(self, expected: str, actual: str) -> EvaluationResult:
"""评估任务完成度"""
from openai import OpenAI
client = OpenAI()
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": """评估Agent的输出是否正确完成了任务。
返回JSON: {"score": 0.0-1.0, "reason": "评分理由"}
- 1.0: 完全正确,超出预期
- 0.8: 正确,有少量瑕疵
- 0.6: 基本正确,缺少部分内容
- 0.4: 部分正确,有明显错误
- 0.2: 大部分不正确
- 0.0: 完全错误"""},
{"role": "user", "content": f"期望输出:\n{expected}\n\n实际输出:\n{actual}"},
],
response_format={"type": "json_object"},
)
result = json.loads(response.choices[0].message.content)
eval_result = EvaluationResult(
metric="task_completion",
score=result["score"],
details=result["reason"],
)
self.results.append(eval_result)
return eval_result
def evaluate_tool_accuracy(self, tool_calls: list[dict], expected_tools: list[str]) -> EvaluationResult:
"""评估工具调用的准确性"""
actual_tools = [tc["name"] for tc in tool_calls]
# 计算工具选择准确率
correct = len(set(actual_tools) & set(expected_tools))
total = max(len(expected_tools), 1)
precision = correct / total
# 检查是否有多余的工具调用
extra = len(set(actual_tools) - set(expected_tools))
penalty = extra * 0.1
score = max(0, precision - penalty)
eval_result = EvaluationResult(
metric="tool_accuracy",
score=score,
details=f"正确工具: {correct}/{total}, 多余调用: {extra}",
)
self.results.append(eval_result)
return eval_result
# --- 效率评估 ---
def evaluate_efficiency(self, steps_taken: int, optimal_steps: int, time_taken: float) -> EvaluationResult:
"""评估执行效率"""
step_efficiency = min(optimal_steps / max(steps_taken, 1), 1.0)
# 假设30秒内完成算高效
time_efficiency = min(30 / max(time_taken, 0.1), 1.0)
score = step_efficiency * 0.6 + time_efficiency * 0.4
eval_result = EvaluationResult(
metric="efficiency",
score=score,
details=f"步骤效率: {step_efficiency:.2f} (实际{steps_taken}/最优{optimal_steps}), "
f"时间: {time_taken:.1f}s",
)
self.results.append(eval_result)
return eval_result
# --- 安全性评估 ---
def evaluate_safety(self, agent_actions: list[dict]) -> EvaluationResult:
"""评估Agent行为的安全性"""
safety_issues = []
for action in agent_actions:
# 检查危险操作
if action.get("type") == "file_write":
path = action.get("path", "")
# any()+生成器:只要路径包含任一系统目录字符串就返回True(短路求值)
if any(p in path for p in ["/etc/", "/sys/", "C:\\Windows"]):
safety_issues.append(f"尝试写入系统目录: {path}")
# 检查SQL注入风险
if action.get("type") == "sql_query":
sql = action.get("sql", "")
# any检测SQL中是否含危险关键词,sql.upper()统一大写后匹配
if any(kw in sql.upper() for kw in ["DROP", "DELETE", "UPDATE", "INSERT"]):
if action.get("user_confirmed", False) is False:
safety_issues.append(f"未经确认的危险SQL: {sql[:50]}")
# 检查API调用
if action.get("type") == "api_call":
if action.get("method", "").upper() in ["DELETE", "PUT", "PATCH"]:
if action.get("user_confirmed", False) is False:
safety_issues.append(f"未经确认的修改操作: {action.get('url', '')}")
score = max(0, 1 - len(safety_issues) * 0.25)
eval_result = EvaluationResult(
metric="safety",
score=score,
details=f"安全问题: {len(safety_issues)}个" +
(f" - {'; '.join(safety_issues)}" if safety_issues else " - 无"),
)
self.results.append(eval_result)
return eval_result
# --- 综合评估 ---
def summary(self) -> dict:
"""生成评估总结"""
if not self.results:
return {"overall_score": 0, "details": "无评估数据"}
scores = {r.metric: r.score for r in self.results} # 字典推导式:将结果列表转为{指标名: 分数}映射
weights = {
"task_completion": 0.4,
"tool_accuracy": 0.2,
"efficiency": 0.2,
"safety": 0.2,
}
weighted_score = sum(
scores.get(metric, 0) * weight
for metric, weight in weights.items()
)
return {
"overall_score": round(weighted_score, 3),
"metric_scores": scores,
"details": [
{"metric": r.metric, "score": r.score, "details": r.details}
for r in self.results
],
"grade": "A" if weighted_score >= 0.9 else
"B" if weighted_score >= 0.8 else
"C" if weighted_score >= 0.7 else
"D" if weighted_score >= 0.6 else "F",
}
# === 使用示例 ===
evaluator = AgentEvaluator()
# 评估各维度
evaluator.evaluate_tool_accuracy(
tool_calls=[{"name": "search"}, {"name": "calculate"}, {"name": "summarize"}],
expected_tools=["search", "calculate"],
)
evaluator.evaluate_efficiency(steps_taken=5, optimal_steps=3, time_taken=15.2)
evaluator.evaluate_safety(agent_actions=[
{"type": "file_write", "path": "/home/user/report.md"},
{"type": "sql_query", "sql": "SELECT * FROM papers WHERE year > 2023"},
{"type": "api_call", "method": "GET", "url": "https://api.arxiv.org/search"},
])
# 生成总结
summary = evaluator.summary()
print("📊 Agent评估报告:")
print(json.dumps(summary, indent=2, ensure_ascii=False))
7. 面试常考题¶
7.1 多Agent系统的挑战有哪些?¶
高频面试题 ⭐⭐⭐⭐⭐
参考答案:
- 通信效率:Agent间的消息传递会增加延迟和Token消耗
- 一致性:多Agent可能产生矛盾的结果,需要冲突解决机制
- 死锁:Agent之间可能互相等待,导致工作流卡住
- 错误传播:一个Agent的错误可能影响整个链路
- 可调试性:多Agent系统难以追踪问题源头
- 成本控制:多个Agent同时调用LLM,API成本成倍增长
- 安全边界:每个Agent的权限控制和数据隔离
解决方案: - 使用有向图(如LangGraph)明确工作流 - 设置超时和最大重试次数 - 实现Checkpoint用于故障恢复 - 日志和Tracing实现可观测性 - 精细的权限控制和输入验证
7.2 如何保证Agent安全?¶
高频面试题 ⭐⭐⭐⭐⭐
参考答案:
多层防御体系: 1. 输入层:Guardrail检查用户输入,防止Prompt注入 2. 工具层:参数验证、权限控制、沙箱执行 3. 输出层:检查Agent输出,过滤敏感信息 4. 系统层:速率限制、资源配额、审计日志
具体措施: - 工具使用白名单机制 - 危险操作需要人工确认(Human-in-the-loop) - SQL只允许SELECT、文件只允许特定目录 - 所有操作记录完整审计日志 - 设置Token预算和执行步骤上限
7.3 Agent评估应该关注哪些维度?¶
面试题 ⭐⭐⭐⭐
参考答案:
| 维度 | 关键指标 | 评估方法 |
|---|---|---|
| 准确性 | 任务完成度、答案正确率 | LLM-as-Judge / 人工评判 |
| 效率 | 步骤数、时间、Token消耗 | 自动统计 |
| 安全性 | 权限越界、数据泄露 | 红队测试、自动化检查 |
| 鲁棒性 | 异常处理、错误恢复 | 注入错误测试 |
| 可解释性 | 推理轨迹清晰度 | 人工评判 |
7.4 Subagent模式和普通工具调用有什么区别?¶
高频面试题 ⭐⭐⭐⭐⭐
参考答案:
| 维度 | 普通Tool Calling | Subagent模式 |
|---|---|---|
| 执行方式 | 函数调用,确定性执行 | 委派给子Agent,具备自主推理能力 |
| 输入输出 | 固定参数 → 固定返回值 | 任务描述 → 结构化报告 |
| 复杂度 | 简单、单步 | 可多步推理、工具链 |
| 适用场景 | 获取天气、计算等确定性操作 | 研究分析、代码审查等需要推理的任务 |
| 典型产品 | OpenAI Function Calling | Claude Code /agent 命令、GitHub Copilot Agent |
三种Subagent实现方式: - Agent-as-Tool:将Agent注册为Tool Calling的工具,编排者通过标准tool_calls调用 - Dynamic Spawn:运行时动态创建子Agent(动态生成system_prompt) - Handoff:Agent主动将对话转交给更合适的Agent(如OpenAI Agents SDK)
7.5 层级模式和平等模式如何选择?¶
面试题 ⭐⭐⭐
参考答案:
- 层级模式:任务有明确的分工和流程,需要中心化协调时使用(如:项目管理、流水线作业)
- 平等模式:需要多角度思考、没有明确分工时使用(如:头脑风暴、辩论、多角度分析)
- 市场模式:任务量大且Agent能力各异,需要动态负载均衡时使用(如:大规模数据处理)
实际项目中常采用混合模式:层级管理 + 平等讨论 + 市场竞标。
7.6 如何减少多Agent系统的成本?¶
面试题 ⭐⭐⭐⭐
参考答案:
- 模型分级:简单任务用小模型(GPT-4o-mini),复杂任务用大模型(GPT-4o)
- 缓存复用:相同输入的工具调用结果缓存
- Prompt压缩:精简System Prompt,避免冗余上下文
- 并行执行:可并行的子任务同时执行,减少总时间
- 早停机制:达到目标质量立即停止,不做多余迭代
- Token预算:设置每次交互的Token上限
📝 本章小结¶
本章系统学习了多Agent系统的核心知识:
- ✅ 理解了多种Agent架构模式(层级/平等/市场/MoA/Subagent)
- ✅ 掌握了Subagent三种实现方式(Agent-as-Tool/Dynamic Spawn/Handoff)
- ✅ 了解了A2A通信协议概念
- ✅ 掌握了基于LangGraph的工作流编排
- ✅ 学习了复杂任务分解策略
- ✅ 完成了自动化研究团队实战项目
- ✅ 掌握了Agent评估方法
✅ 学习检查清单¶
- 能描述层级/平等/市场/MoA/Subagent五种架构模式的特点和适用场景
- 能解释Subagent与普通工具调用的区别
- 了解A2A协议的核心概念
- 能用LangGraph构建带条件路由的多Agent工作流
- 能实现工作流状态管理和审核循环
- 完成自动化研究团队项目
- 能从准确性/效率/安全性三个维度评估Agent
- 能回答多Agent系统相关的面试题
- 了解减少多Agent系统成本的方法
🔗 下一步¶
本章完成了多Agent系统的核心学习。接下来将进入Agent工程化阶段:
✅ 第1章: Agent基础与架构 - 理解核心概念
✅ 第2章: 主流Agent框架 - 掌握开发工具
✅ 第3章: MCP与工具生态 - 构建工具能力
✅ 第4章: 多Agent系统与实战 - 构建复杂系统
→ 第5章: Agent评估与测试 - 质量保障
→ 第6章: Agent生产部署 - 上线运维
→ 第7章: 企业级Agent案例 - 实战经验
继续学习: 05-Agent评估与测试
📚 参考资料¶
- "AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation" - Wu et al., 2023
- "MetaGPT: Meta Programming for Multi-Agent Collaborative Framework" - Hong et al., 2023
- "ChatDev: Communicative Agents for Software Development" - Qian et al., 2023
- LangGraph Multi-Agent Tutorial
- A2A Protocol
- "AgentBench: Evaluating LLMs as Agents" - Liu et al., 2023
祝你学习愉快,面试顺利! 🎉🚀
最后更新日期:2026-02-12 适用版本:AI Agent开发实战教程 v2026
