跳转至

多Agent系统与实战

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

掌握多Agent架构模式、通信协议和工作流编排,构建一个完整的自动化研究团队系统。

📌 定位说明:本章聚焦多Agent系统的编码实战(架构实现、A2A通信、项目开发)。 - 多Agent框架对比(LangGraph/CrewAI/AutoGen)→ LLM应用/17-多Agent框架 - 新一代Agent研究前沿(Manus/Claude Code等)→ LLM学习/04-前沿探索/08-新一代AI Agent

📖 章节导读

当单个Agent无法胜任复杂任务时,我们需要多个Agent协作。多Agent系统(Multi-Agent System, MAS)是AI Agent的高级形态——多个具有不同专长的Agent组成团队,通过通信和协调来完成复杂任务。本章将学习多Agent架构模式、A2A通信协议、工作流编排,以及Agent评估方法,并完成一个完整的自动化研究团队项目。

🎯 学习目标

  • 理解多Agent架构模式(层级/平等/市场)
  • 了解Agent-to-Agent (A2A)通信协议
  • 掌握工作流编排与状态管理
  • 学会复杂任务分解策略
  • 掌握Agent评估方法
  • 完成综合实战项目:自动化研究团队

📖 前置知识

  • Agent框架实战经验(第二章)
  • MCP工具开发(第三章)
  • Python异步编程
  • 分布式系统基本概念

1. 多Agent架构模式

1.1 架构模式总览

多Agent架构模式对比

多Agent系统主要有三种架构模式:

Text Only
┌─────────────────────────────────────────────────────────┐
│              多Agent架构模式                              │
│                                                          │
│ ① 层级模式        ② 平等模式         ③ 市场模式         │
│                                                          │
│   Manager           A ←→ B            A → 任务池 ← B   │
│   ┌─┴─┐           ↕     ↕            C → 任务池 ← D   │
│   A   B   C       C ←→ D                                │
│                                                          │
│ 适用: 流程明确    适用: 自由协作     适用: 动态分配      │
│ 例: 项目管理     例: 头脑风暴       例: 众包平台        │
└─────────────────────────────────────────────────────────┘

1.2 层级模式(Hierarchical)

在层级模式中,有一个Manager Agent负责任务分配和协调。

Python
"""
层级模式:Manager Agent分配任务给Worker Agent
"""

from openai import OpenAI
import json
from typing import Any
from dataclasses import dataclass

client = OpenAI()

@dataclass
class AgentProfile:
    """Agent角色定义"""
    name: str
    role: str
    capabilities: list[str]
    system_prompt: str

class HierarchicalSystem:
    """层级式多Agent系统"""

    def __init__(self):
        self.manager = AgentProfile(
            name="项目经理",
            role="manager",
            capabilities=["任务分解", "任务分配", "结果整合"],
            system_prompt="""你是一个项目经理Agent。你的职责是:
1. 分析用户的复杂任务
2. 将任务分解为子任务
3. 分配给合适的团队成员
4. 整合各成员的结果

你的团队成员:
- 研究员:擅长信息搜索和分析
- 工程师:擅长编写代码和技术方案
- 写作者:擅长报告和文档撰写

请以JSON格式返回任务分配计划:
{"subtasks": [{"assignee": "研究员/工程师/写作者", "task": "具体任务", "priority": 1-3}]}"""
        )

        self.workers = {
            "研究员": AgentProfile(
                name="研究员",
                role="researcher",
                capabilities=["搜索", "分析", "总结"],
                system_prompt="你是一个专业研究员。请深入研究指定主题,提供详细的分析和数据支持。",
            ),
            "工程师": AgentProfile(
                name="工程师",
                role="engineer",
                capabilities=["编码", "架构设计", "技术选型"],
                system_prompt="你是一个高级软件工程师。请提供高质量的技术方案和代码实现。",
            ),
            "写作者": AgentProfile(
                name="写作者",
                role="writer",
                capabilities=["写作", "编辑", "排版"],
                system_prompt="你是一个专业文档写作者。请将信息整理成清晰、专业的文档。",
            ),
        }

    def _call_llm(self, system_prompt: str, user_message: str) -> str:
        """调用LLM"""
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_message},
            ],
            temperature=0,
        )
        return response.choices[0].message.content

    def decompose_task(self, task: str) -> list[dict]:
        """Manager分解任务"""
        print(f"📋 [Manager] 开始分解任务: {task}")

        response = self._call_llm(
            self.manager.system_prompt,
            f"请分解以下任务: {task}"
        )

        # 尝试解析JSON
        try:
            # 提取JSON部分
            json_str = response
            if "```json" in response:
                json_str = response.split("```json")[1].split("```")[0]
            elif "```" in response:
                json_str = response.split("```")[1].split("```")[0]

            plan = json.loads(json_str)
            subtasks = plan.get("subtasks", [])
        except (json.JSONDecodeError, IndexError):
            # 回退方案
            subtasks = [
                {"assignee": "研究员", "task": f"研究关于'{task}'的信息", "priority": 1},
                {"assignee": "工程师", "task": f"为'{task}'提供技术方案", "priority": 2},
                {"assignee": "写作者", "task": f"撰写'{task}'的最终报告", "priority": 3},
            ]

        print(f"  ✅ 分解为 {len(subtasks)} 个子任务")
        for st in subtasks:
            print(f"    → [{st['assignee']}] {st['task']}")

        return subtasks

    def execute_subtask(self, subtask: dict) -> str:
        """Worker执行子任务"""
        assignee = subtask["assignee"]
        task = subtask["task"]

        worker = self.workers.get(assignee)
        if not worker:
            return f"❌ 未找到Agent: {assignee}"

        print(f"\n🔧 [{worker.name}] 执行任务: {task}")
        result = self._call_llm(worker.system_prompt, task)
        print(f"  ✅ [{worker.name}] 完成")

        return result

    def integrate_results(self, task: str, results: list[dict]) -> str:
        """Manager整合结果"""
        print(f"\n📊 [Manager] 整合所有结果...")

        results_text = "\n\n".join([
            f"### {r['assignee']}的工作结果:\n{r['result']}"
            for r in results
        ])

        integration_prompt = f"""作为项目经理,请整合以下团队成员的工作结果,生成一份完整的最终报告。

原始任务: {task}

团队成员的工作结果:
{results_text}

请生成一份结构化的最终报告,包含:摘要、详细内容、结论和建议。"""

        final_report = self._call_llm(self.manager.system_prompt, integration_prompt)
        return final_report

    def run(self, task: str) -> str:
        """运行完整的多Agent工作流"""
        print("=" * 60)
        print(f"🚀 启动多Agent系统")
        print(f"📌 任务: {task}")
        print("=" * 60)

        # 1. 任务分解
        subtasks = self.decompose_task(task)

        # 2. 按优先级排序并执行
        subtasks.sort(key=lambda x: x.get("priority", 99))  # sort原地排序,lambda+dict.get提供默认优先级99(无优先级的排最后)

        results = []
        for subtask in subtasks:
            result = self.execute_subtask(subtask)
            results.append({
                "assignee": subtask["assignee"],
                "task": subtask["task"],
                "result": result,
            })

        # 3. 整合结果
        final_report = self.integrate_results(task, results)

        print("\n" + "=" * 60)
        print("📋 最终报告:")
        print("=" * 60)

        return final_report

# 使用示例
system = HierarchicalSystem()
report = system.run("分析AI Agent技术在金融领域的应用前景,并给出具体的落地建议")
print(report)

1.3 平等模式(Peer-to-Peer)

Agent之间可以直接通信和协作,没有中心化调度。

Python
"""
平等模式:Agent之间直接通信协作
适用于头脑风暴、辩论、多角度分析等场景
"""

from openai import OpenAI
from dataclasses import dataclass, field

client = OpenAI()

@dataclass
class PeerAgent:
    """平等协作Agent"""
    name: str
    perspective: str  # 分析视角
    system_prompt: str
    memory: list[dict] = field(default_factory=list)  # dataclass可变默认值必须用field(default_factory=),避免共享引用

    def respond(self, discussion_context: str) -> str:
        """参与讨论并发表观点"""
        messages = [
            {"role": "system", "content": self.system_prompt},
            {"role": "user", "content": discussion_context},
        ]

        response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            temperature=0.7,  # 稍高温度以增加多样性
            max_tokens=500,
        )

        return response.choices[0].message.content

class PeerDiscussion:
    """平等讨论系统"""

    def __init__(self, topic: str, rounds: int = 3):
        self.topic = topic
        self.rounds = rounds
        self.agents: list[PeerAgent] = []
        self.discussion_log: list[dict] = []

    def add_agent(self, agent: PeerAgent):
        self.agents.append(agent)

    def run(self) -> str:
        """运行多轮讨论"""
        print(f"📢 讨论主题: {self.topic}")
        print(f"👥 参与者: {', '.join(a.name for a in self.agents)}")
        print(f"🔄 讨论轮数: {self.rounds}")
        print("=" * 60)

        for round_num in range(1, self.rounds + 1):
            print(f"\n--- 第 {round_num} 轮 ---")

            for agent in self.agents:
                # 构建讨论上下文
                context = f"讨论主题: {self.topic}\n\n"
                if self.discussion_log:
                    context += "之前的讨论:\n"
                    for entry in self.discussion_log[-6:]:  # 最近6条
                        context += f"[{entry['agent']}]: {entry['content']}\n\n"

                if round_num == 1:
                    context += f"请从你的专业角度({agent.perspective})发表你对该主题的看法。"
                else:
                    context += f"请基于其他人的观点,从你的角度({agent.perspective})进一步补充或反驳。"

                # Agent发言
                response = agent.respond(context)

                self.discussion_log.append({
                    "round": round_num,
                    "agent": agent.name,
                    "content": response,
                })

                print(f"\n🗣️ [{agent.name}]:")
                print(f"  {response[:200]}...")

        # 生成讨论总结
        return self._summarize()

    def _summarize(self) -> str:
        """总结讨论结果"""
        all_discussion = "\n\n".join([
            f"[{entry['agent']} - 第{entry['round']}轮]: {entry['content']}"
            for entry in self.discussion_log
        ])

        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "你是一个公正的讨论总结者。请综合各方观点,生成一份平衡、全面的总结报告。"},
                {"role": "user", "content": f"讨论主题: {self.topic}\n\n讨论内容:\n{all_discussion}\n\n请生成讨论总结,包含:共识点、分歧点、关键洞察和建议。"},
            ],
        )
        return response.choices[0].message.content

# 使用示例: 多角度分析
discussion = PeerDiscussion("AI Agent是否会取代传统软件开发?", rounds=2)

discussion.add_agent(PeerAgent(
    name="技术乐观派",
    perspective="技术进步和创新",
    system_prompt="你是一个技术乐观主义者,相信AI Agent将极大提升软件开发效率和质量。请用具体案例和数据支持你的观点。",
))

discussion.add_agent(PeerAgent(
    name="务实工程师",
    perspective="工程实践",
    system_prompt="你是一位务实的软件工程师,从实际工程经验出发分析AI Agent的能力边界和局限性。",
))

discussion.add_agent(PeerAgent(
    name="产业分析师",
    perspective="商业和就业影响",
    system_prompt="你是一位产业分析师,关注AI Agent对软件行业商业模式和就业市场的影响。",
))

summary = discussion.run()
print(f"\n{'='*60}\n📝 讨论总结:\n{summary}")

1.4 市场模式(Market-based)

Agent通过"竞标"或"任务池"动态分配任务。

Python
"""
市场模式:基于任务池的动态任务分配
Agent根据能力和负载竞争任务
"""

from dataclasses import dataclass, field
import random

@dataclass
class Task:
    """任务定义"""
    id: str
    description: str
    required_skills: list[str]
    complexity: int  # 1-5
    reward: float    # 奖励积分
    status: str = "pending"  # pending/assigned/completed
    assigned_to: str | None = None
    result: str | None = None

@dataclass
class MarketAgent:
    """市场中的Agent"""
    name: str
    skills: list[str]
    capacity: int = 3          # 最大并行任务数
    current_tasks: int = 0
    total_reward: float = 0.0

    def can_handle(self, task: Task) -> bool:
        """判断是否能处理该任务"""
        # 有能力且有空闲
        has_skills = all(s in self.skills for s in task.required_skills)  # all():所有需求技能都在自身技能列表中才返回True
        has_capacity = self.current_tasks < self.capacity
        return has_skills and has_capacity

    def bid(self, task: Task) -> float:
        """对任务出价(置信度)"""
        if not self.can_handle(task):
            return 0.0

        # 技能匹配度
        skill_match = len(set(task.required_skills) & set(self.skills)) / len(task.required_skills)
        # 空闲率
        availability = 1 - (self.current_tasks / self.capacity)

        return skill_match * 0.7 + availability * 0.3

    def execute(self, task: Task) -> str:
        """执行任务(模拟)"""
        self.current_tasks += 1
        result = f"[{self.name}] 已完成任务: {task.description}"
        self.current_tasks -= 1
        self.total_reward += task.reward
        return result

class TaskMarket:
    """任务市场"""

    def __init__(self):
        self.agents: list[MarketAgent] = []
        self.task_queue: list[Task] = []
        self.completed_tasks: list[Task] = []

    def register_agent(self, agent: MarketAgent):
        """注册Agent"""
        self.agents.append(agent)
        print(f"✅ Agent注册: {agent.name} (技能: {', '.join(agent.skills)})")

    def submit_task(self, task: Task):
        """提交任务"""
        self.task_queue.append(task)
        print(f"📌 新任务: [{task.id}] {task.description} (需要: {', '.join(task.required_skills)})")

    def allocate_tasks(self):
        """分配所有待分配的任务"""
        print(f"\n🔄 开始任务分配...")

        for task in self.task_queue:
            if task.status != "pending":
                continue

            # 收集所有出价
            bids = []
            for agent in self.agents:
                bid_value = agent.bid(task)
                if bid_value > 0:
                    bids.append((agent, bid_value))

            if not bids:
                print(f"  ⚠️ 任务 [{task.id}] 无Agent可处理")
                continue

            # 选择出价最高的Agent
            bids.sort(key=lambda x: x[1], reverse=True)  # 按元组第二个元素(出价)降序排序,bids[0]即最高出价
            winner = bids[0][0]

            task.status = "assigned"
            task.assigned_to = winner.name

            print(f"  📋 任务 [{task.id}] → {winner.name} (置信度: {bids[0][1]:.2f})")

    def execute_all(self):
        """执行所有已分配的任务"""
        print(f"\n⚡ 执行任务...")

        for task in self.task_queue:
            if task.status != "assigned":
                continue

            agent = next(a for a in self.agents if a.name == task.assigned_to)  # next+生成器:找第一个匹配的Agent,无匹配时抛StopIteration
            task.result = agent.execute(task)
            task.status = "completed"
            self.completed_tasks.append(task)

            print(f"  ✅ {task.result}")

    def report(self):
        """生成报告"""
        print(f"\n{'='*50}")
        print(f"📊 任务市场报告")
        print(f"  完成任务数: {len(self.completed_tasks)}")
        print(f"  Agent表现:")
        for agent in self.agents:
            print(f"    {agent.name}: 积分={agent.total_reward:.1f}")

# 使用示例
market = TaskMarket()

# 注册Agent
market.register_agent(MarketAgent("搜索专家", ["搜索", "信息提取"]))
market.register_agent(MarketAgent("数据分析师", ["数据分析", "可视化", "统计"]))
market.register_agent(MarketAgent("全栈工程师", ["编码", "架构", "搜索"]))
market.register_agent(MarketAgent("技术写手", ["写作", "编辑", "信息提取"]))

# 提交任务
market.submit_task(Task("T1", "搜索AI Agent最新论文", ["搜索", "信息提取"], 2, 10))
market.submit_task(Task("T2", "分析框架使用趋势数据", ["数据分析", "可视化"], 3, 15))
market.submit_task(Task("T3", "编写Agent系统代码", ["编码", "架构"], 4, 20))
market.submit_task(Task("T4", "撰写技术调研报告", ["写作", "信息提取"], 2, 10))

# 分配和执行
market.allocate_tasks()
market.execute_all()
market.report()

1.5 Mixture-of-Agents (MoA) 架构

Mixture-of-Agents(MoA,Agent混合架构)是 Together AI 在 2024 年提出的多Agent推理架构(ICLR 2025),核心思想是让多个LLM分层协作,逐层精炼输出,在多项基准(如AlpacaEval 2.0)上超越同期单一模型。

Text Only
MoA 分层架构:

           ┌──────────────────────────┐
Layer 3    │    Aggregator Agent      │  ← 最终聚合输出
(聚合层)    │  (综合所有观点,给出最终回答)  │
           └────────────┬─────────────┘
                        │ 收集Layer 2所有输出
           ┌────────────┼─────────────┐
Layer 2    │ Agent-D    │ Agent-E     │  ← 精炼层
(精炼层)    │ (精炼分析)  │ (质量检查)   │
           └────────────┼─────────────┘
                        │ 收集Layer 1所有输出
           ┌────────────┼─────────────┐
Layer 1    │ Agent-A    │ Agent-B    │ Agent-C  │  ← 基础生成层
(生成层)    │ (GPT-4o)  │ (Claude)   │ (Gemini) │
           └────────────┴─────────────┘
                    用户问题

核心原理:不同LLM具有不同的知识偏向和推理风格,通过多层"协作-精炼"可以互相补充、纠错。

Python
"""
Mixture-of-Agents (MoA) 简化实现

核心思想:多个LLM分层协作,逐层精炼
参考论文:Together AI "Mixture-of-Agents Enhances Large Language Model Capabilities" (2024)
"""

from openai import OpenAI
from typing import List, Dict
import asyncio

class MixtureOfAgents:
    """
    Mixture-of-Agents 推理框架

    架构:多层Agent,每层Agent可看到上一层所有输出
    """

    def __init__(self, layers: List[List[Dict]]):
        """
        初始化MoA

        Args:
            layers: 每层Agent的配置列表(本实现仅使用OpenAI客户端)
                    [
                        [{"model": "gpt-4o", "role": "分析师"}, ...],  # Layer 1
                        [{"model": "gpt-4o", "role": "精炼者"}, ...],  # Layer 2
                        [{"model": "gpt-4o", "role": "聚合者"}],  # Final layer
                    ]
        """
        self.layers = layers
        self.client = OpenAI()

    def _call_agent(self, model: str, role: str, prompt: str,
                    prev_responses: List[str] = None) -> str:
        """
        调用单个Agent

        如果有前一层的输出,将其作为参考信息注入提示词
        """
        messages = [{"role": "system", "content": f"你是一个{role}。"}]

        if prev_responses:
            # 将前一层所有Agent的输出作为参考
            ref_text = "\n\n---\n\n".join(
                [f"【参考回答 {i+1}\n{r}" for i, r in enumerate(prev_responses)]
            )
            messages.append({
                "role": "user",
                "content": (
                    f"以下是其他专家对同一问题的回答,请参考并给出你的改进版回答:\n\n"
                    f"{ref_text}\n\n"
                    f"---\n原始问题:{prompt}\n\n"
                    f"请综合以上参考,给出更准确、更全面的回答。"
                )
            })
        else:
            messages.append({"role": "user", "content": prompt})

        response = self.client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=0.7
        )
        return response.choices[0].message.content

    def run(self, query: str) -> str:
        """
        执行MoA多层推理

        Returns:
            最终聚合后的回答
        """
        prev_responses = []

        for layer_idx, layer_agents in enumerate(self.layers):
            current_responses = []
            print(f"\n🔄 Layer {layer_idx + 1} ({len(layer_agents)} agents)...")

            # 注意:同一层内的Agent在此实现中是顺序执行的。
            # 若需真正并行,可改为 asyncio.gather() 并发调用,显著降低层内延迟。
            for agent_cfg in layer_agents:
                response = self._call_agent(
                    model=agent_cfg["model"],
                    role=agent_cfg["role"],
                    prompt=query,
                    prev_responses=prev_responses if prev_responses else None
                )
                current_responses.append(response)
                print(f"  ✅ {agent_cfg['role']} ({agent_cfg['model']}) 完成")

            prev_responses = current_responses

        # 返回最后一层(聚合层)的输出
        return prev_responses[-1]

# === 使用示例 ===
moa = MixtureOfAgents(layers=[
    # Layer 1:多模型并行生成(利用模型多样性)
    [
        {"model": "gpt-4o", "role": "技术分析师"},
        {"model": "gpt-4o-mini", "role": "通俗解释者"},
        {"model": "gpt-4o", "role": "批判性审稿人"},
    ],
    # Layer 2:精炼(综合Layer 1输出)
    [
        {"model": "gpt-4o", "role": "综合精炼专家"},
        {"model": "gpt-4o", "role": "事实验证者"},
    ],
    # Layer 3:最终聚合
    [
        {"model": "gpt-4o", "role": "首席聚合官"},
    ],
])

result = moa.run("比较 Transformer 和 Mamba 架构的优劣势")
print(f"\n📋 MoA最终回答:\n{result}")

MoA vs 传统方法对比

维度 单模型 简单集成(投票) MoA
推理质量 ⭐⭐⭐ ⭐⭐⭐⭐ ⭐⭐⭐⭐⭐
信息利用 单一视角 多视角独立 多视角+精炼
错误纠正 多数投票 逐层纠正
计算成本
适用场景 简单任务 分类/判断 复杂推理/分析

📌 交叉引用:MoA与层级模式的区别——层级模式侧重任务分工(不同Agent做不同子任务),MoA侧重回答精炼(不同Agent对同一问题给出不同回答,再逐层综合)。

1.6 Subagent模式(Agent-as-Tool)

Subagent(子Agent) 是当前Agent领域最热门的架构模式之一。核心思想是:把一个完整的Agent包装成另一个Agent的"工具"来调用,实现能力的模块化组合。

Text Only
Subagent模式 vs 传统工具调用:

传统Tool Calling:
  Agent → Tool(函数调用,确定性执行)
           └── get_weather("北京")  → {"temp": 22}

Subagent模式:
  Orchestrator Agent → Subagent(自身也是Agent,具备推理能力)
                        └── 研究Agent: 接收"调研AI趋势"
                            → 自主搜索 → 阅读 → 分析 → 生成报告
                            → 返回完整的研究报告

关键区别: Tool是"调用函数取结果",Subagent是"委托一个有自主推理能力的Agent去完成任务"

三种Subagent实现方式

Python
"""
Subagent三种模式的完整实现

1. Agent-as-Tool: 将Agent注册为Tool Calling中的工具
2. Dynamic Spawn: 运行时动态创建子Agent
3. Handoff: 任务转交(OpenAI Agents SDK风格)

参考: Anthropic "Building Effective Agents" (2025) — Orchestrator-Workers
"""

from openai import OpenAI
import json
from typing import Callable

client = OpenAI()

# ============================================================
# 模式1: Agent-as-Tool — 将Agent包装为OpenAI Tool Calling的工具
# ============================================================

class AgentAsTool:
    """将一个Agent包装为另一个Agent可调用的Tool"""

    def __init__(self, name: str, description: str, system_prompt: str):
        self.name = name
        self.description = description
        self.system_prompt = system_prompt

    def run(self, task: str) -> str:
        """Agent自主执行任务(内部可能有多轮推理)"""
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": task},
            ],
            temperature=0,
        )
        return response.choices[0].message.content

    def as_tool_schema(self) -> dict:
        """导出为OpenAI Tool Calling格式"""
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": {
                    "type": "object",
                    "properties": {
                        "task": {
                            "type": "string",
                            "description": "要交给该Agent执行的任务描述",
                        }
                    },
                    "required": ["task"],
                },
            },
        }

# 定义Subagent
code_agent = AgentAsTool(
    name="code_expert",
    description="编程专家Agent,擅长代码编写、调试和代码审查",
    system_prompt="你是一个高级Python工程师。收到任务后,独立完成代码编写和测试。输出完整代码和简要说明。",
)

research_agent = AgentAsTool(
    name="research_expert",
    description="研究专家Agent,擅长搜索信息、分析论文、生成调研报告",
    system_prompt="你是一个资深研究员。收到课题后,进行深入分析,输出结构化的研究报告。",
)

# 编排者Agent使用Subagent作为工具
subagents = {"code_expert": code_agent, "research_expert": research_agent}
tools = [agent.as_tool_schema() for agent in subagents.values()]

def orchestrator_with_subagents(user_request: str, max_steps: int = 5) -> str:
    """编排者Agent:通过Tool Calling调用Subagent"""
    messages = [
        {"role": "system", "content": """你是一个项目经理Agent。你手下有多个专家Agent可以调用。
分析用户需求,将子任务委派给合适的专家Agent,最后汇总结果给用户。
每个专家Agent都是独立运行的,你只需要描述清楚任务即可。"""},
        {"role": "user", "content": user_request},
    ]

    for step in range(max_steps):
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            tools=tools,
            temperature=0,
        )
        message = response.choices[0].message

        # 无工具调用 → 编排者已得出最终结论
        if not message.tool_calls:
            return message.content

        messages.append(message)

        # 调用Subagent
        for tool_call in message.tool_calls:
            agent_name = tool_call.function.name
            args = json.loads(tool_call.function.arguments)
            task = args.get("task", "")

            print(f"  🤖 [Orchestrator → {agent_name}] {task[:60]}...")
            result = subagents[agent_name].run(task)
            print(f"  ✅ [{agent_name}] 返回 {len(result)} 字符")

            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result,
            })

    return "达到最大步骤数"

# ============================================================
# 模式2: Dynamic Spawn — 运行时动态创建子Agent
# ============================================================

def dynamic_subagent(task: str, context: str = "") -> str:
    """动态创建一个临时Subagent执行特定任务

    与预定义Agent不同,Dynamic Spawn根据任务需求
    在运行时构造专属的system_prompt和参数配置。
    适用于编排者无法预知子任务类型的场景。
    """

    # 让LLM分析任务需求,动态生成Subagent的配置
    config_resp = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": """分析任务需求,生成最适合的Agent配置。
返回JSON: {"role": "角色名", "system_prompt": "专家提示词", "temperature": 0.0-1.0}"""},
            {"role": "user", "content": f"任务: {task}\n上下文: {context}"},
        ],
        response_format={"type": "json_object"},
        temperature=0,
    )
    config = json.loads(config_resp.choices[0].message.content)

    print(f"  🔧 动态创建Subagent: {config.get('role', 'unknown')}")

    # 用动态生成的配置创建并运行子Agent
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": config.get("system_prompt", "你是一个有用的助手。")},
            {"role": "user", "content": task},
        ],
        temperature=config.get("temperature", 0),
    )
    return response.choices[0].message.content

# ============================================================
# 模式3: Handoff — 任务转交(类似OpenAI Agents SDK的handoffs)
# ============================================================

class HandoffAgent:
    """支持任务转交的Agent

    当Agent判断任务超出自身能力范围时,
    主动将对话转交给更合适的Agent继续处理。
    人类用户无感知,对话上下文无损传递。
    """

    def __init__(self, name: str, system_prompt: str, handoff_targets: dict[str, 'HandoffAgent'] = None):
        self.name = name
        self.system_prompt = system_prompt
        self.handoff_targets = handoff_targets or {}

    def run(self, messages: list[dict], max_handoffs: int = 3) -> str:
        """运行Agent,支持自动转交"""
        current_agent = self
        handoff_count = 0

        while handoff_count < max_handoffs:
            # 构造当前Agent的系统提示(含转交能力声明)
            handoff_info = ""
            if current_agent.handoff_targets:
                targets = ", ".join(
                    f"{name}{a.system_prompt[:30]}...)"
                    for name, a in current_agent.handoff_targets.items()
                )
                handoff_info = f"\n\n你可以将对话转交给其他专家: {targets}\n转交时回复格式: HANDOFF:专家名称"

            full_messages = [
                {"role": "system", "content": current_agent.system_prompt + handoff_info},
            ] + messages

            response = client.chat.completions.create(
                model="gpt-4o",
                messages=full_messages,
                temperature=0,
            )
            reply = response.choices[0].message.content

            # 检查是否请求转交
            if reply.strip().startswith("HANDOFF:"):
                target_name = reply.strip().split("HANDOFF:")[1].strip()
                if target_name in current_agent.handoff_targets:
                    print(f"  🔄 [{current_agent.name}] → 转交给 [{target_name}]")
                    current_agent = current_agent.handoff_targets[target_name]
                    handoff_count += 1
                    continue

            return f"[{current_agent.name}] {reply}"

        return f"[{current_agent.name}] 达到最大转交次数"

# === 使用示例 ===

# 模式1: Agent-as-Tool
print("=== 模式1: Agent-as-Tool ===")
result = orchestrator_with_subagents("帮我调研目前主流的Agent框架,然后写一个LangGraph的Hello World示例")
print(f"结果: {result[:300]}...\n")

# 模式2: Dynamic Spawn
print("=== 模式2: Dynamic Spawn ===")
result = dynamic_subagent("分析Transformer注意力机制的计算复杂度", context="面向大三本科生的教学文档")
print(f"结果: {result[:300]}...\n")

# 模式3: Handoff
print("=== 模式3: Handoff ===")
billing_agent = HandoffAgent("账单专家", "你是账单和支付问题的专家。只回答账单相关问题,其他问题请转交。")
tech_agent = HandoffAgent("技术支持", "你是技术支持专家。解决软件使用和技术故障问题。")
triage_agent = HandoffAgent(
    "分诊Agent",
    "你是客服分诊Agent。判断用户问题类型,转交给对应的专家。不要自己回答具体问题。",
    handoff_targets={"账单专家": billing_agent, "技术支持": tech_agent},
)

result = triage_agent.run([{"role": "user", "content": "我的软件一直崩溃,怎么办?"}])
print(f"结果: {result[:300]}...")

Subagent模式选择指南

模式 适用场景 优点 缺点
Agent-as-Tool 编排者的子Agent确定、可复用 标准化、接口清晰 灵活度有限
Dynamic Spawn 子任务类型不可预知 高度灵活 生成配置有额外开销
Handoff 客服分流、多轮对话中转交 上下文无损传递 链路较长时难调试

📌 交叉引用:Subagent的设计模式基础(Orchestrator-Worker)→ 01-Agent基础与架构 §4.6;OpenAI Agents SDK的Handoff机制 → LLM应用/22-结构化输出与函数调用;Anthropic Skills(将Agent常用工作流保存为可复用技能) → LLM学习/04-前沿探索/08-新一代AI-Agent


2. Agent通信协议(A2A)

2.1 A2A概念

Agent-to-Agent (A2A) 是业界提出的Agent间通信协议概念,目标是实现不同框架、不同开发者构建的Agent之间的互操作。

Python
"""
A2A通信概念实现
定义Agent间的标准化消息格式
"""

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Callable
import json
import uuid

class MessageType(Enum):
    """消息类型"""
    REQUEST = "request"        # 任务请求
    RESPONSE = "response"      # 任务响应
    STATUS = "status"          # 状态更新
    DELEGATE = "delegate"      # 任务委托
    NOTIFY = "notify"          # 通知

@dataclass
class AgentCard:
    """Agent名片 - 描述Agent的能力"""
    agent_id: str
    name: str
    description: str
    capabilities: list[str]
    input_schema: dict      # 接受的输入格式
    output_schema: dict     # 输出格式
    endpoint: str           # 通信端点

@dataclass
class A2AMessage:
    """Agent间通信消息"""
    id: str = field(default_factory=lambda: str(uuid.uuid4()))  # default_factory接收lambda,每次实例化生成新UUID
    type: MessageType = MessageType.REQUEST
    sender: str = ""         # 发送方Agent ID
    receiver: str = ""       # 接收方Agent ID
    content: dict = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
    reply_to: str | None = None  # 回复的消息ID

    def to_dict(self) -> dict:
        return {
            "id": self.id,
            "type": self.type.value,
            "sender": self.sender,
            "receiver": self.receiver,
            "content": self.content,
            "metadata": self.metadata,
            "timestamp": self.timestamp,
            "reply_to": self.reply_to,
        }

    def to_json(self) -> str:
        return json.dumps(self.to_dict(), ensure_ascii=False, indent=2)

class A2ARouter:
    """Agent间消息路由器"""

    def __init__(self):
        self.agents: dict[str, AgentCard] = {}
        self.message_handlers: dict[str, Callable] = {}
        self.message_log: list[A2AMessage] = []

    def register(self, card: AgentCard, handler: Callable):
        """注册Agent"""
        self.agents[card.agent_id] = card
        self.message_handlers[card.agent_id] = handler
        print(f"📝 注册Agent: {card.name} ({card.agent_id})")

    def discover(self, required_capabilities: list[str]) -> list[AgentCard]:
        """发现具有指定能力的Agent"""
        results = []
        for card in self.agents.values():
            # all()判断所有要求的能力是否都在该Agent的能力列表中(子集关系检查)
            if all(cap in card.capabilities for cap in required_capabilities):
                results.append(card)
        return results

    def send(self, message: A2AMessage) -> A2AMessage | None:
        """路由消息"""
        self.message_log.append(message)

        if message.receiver not in self.message_handlers:
            print(f"❌ 目标Agent不存在: {message.receiver}")
            return None

        handler = self.message_handlers[message.receiver]
        response = handler(message)

        if response:
            self.message_log.append(response)

        return response

# === 使用示例 ===

router = A2ARouter()

# 注册Agent
def research_handler(msg: A2AMessage) -> A2AMessage:
    print(f"  🔬 [研究Agent] 收到请求: {msg.content.get('task', '')}")
    return A2AMessage(
        type=MessageType.RESPONSE,
        sender="agent-research",
        receiver=msg.sender,
        content={"result": "找到5篇相关论文", "papers": ["论文A", "论文B"]},
        reply_to=msg.id,
    )

router.register(
    AgentCard(
        agent_id="agent-research",
        name="研究Agent",
        description="学术研究和文献搜索",
        capabilities=["搜索", "文献分析"],
        input_schema={"type": "object", "properties": {"task": {"type": "string"}}},
        output_schema={"type": "object", "properties": {"result": {"type": "string"}}},
        endpoint="local://agent-research",
    ),
    research_handler,
)

# 发送消息
msg = A2AMessage(
    type=MessageType.REQUEST,
    sender="agent-manager",
    receiver="agent-research",
    content={"task": "搜索关于ReAct的最新论文"},
)

response = router.send(msg)
if response:
    print(f"  ✅ 收到响应: {response.content}")

2.2 ANP(Agent Network Protocol)

除了MCP(Agent↔工具)和A2A(Agent↔Agent),还有一个重要的新兴协议——ANP(Agent Network Protocol),它解决的是跨网络的Agent发现与通信问题。

三大Agent协议对比

协议 层级 解决的问题 类比
MCP Agent↔工具 Agent如何调用外部工具和数据源 USB接口
A2A Agent↔Agent 同一组织内Agent如何互相调用 局域网通信
ANP Agent↔互联网 跨组织、跨网络的Agent如何发现和信任彼此 互联网协议(HTTP)

ANP的核心设计

Text Only
┌──────────────────────────────────────────────────────┐
│                    ANP协议栈                          │
├──────────────────────────────────────────────────────┤
│  身份层(Identity Layer)                               │
│    • 基于DID (去中心化标识符) 的Agent身份              │
│    • 可验证凭证 (VC) 实现信任链                        │
│    • 无需中心化注册机构                                │
├──────────────────────────────────────────────────────┤
│  发现层(Discovery Layer)                              │
│    • Agent Description文件 (类似robots.txt)           │
│    • 能力声明与服务端点发布                             │
│    • P2P或注册中心两种发现模式                          │
├──────────────────────────────────────────────────────┤
│  通信层(Communication Layer)                          │
│    • 基于HTTP/WebSocket的消息交换                      │
│    • 端到端加密                                       │
│    • 异步任务追踪与状态回调                             │
└──────────────────────────────────────────────────────┘

ANP vs A2A的关键区别: - A2A 假设Agent在受信任环境内(如同一组织),重点是任务协作 - ANP 面向开放互联网,重点是身份验证、信任建立和跨域发现 - A2A适合企业内部Agent编排,ANP适合构建Agent互联网生态

简化的ANP Agent发现流程

Python
"""ANP Agent发现与通信概念演示"""

from dataclasses import dataclass, field
import json

@dataclass
class AgentDescriptor:
    """Agent描述符 —— 类似ANP的Agent Description文件"""
    agent_id: str           # DID格式: did:anp:agent-xxx
    name: str
    description: str
    capabilities: list[str]
    endpoint: str           # 通信端点
    protocols: list[str] = field(default_factory=lambda: ["anp/1.0"])  # lambda返回新列表实例,避免可变默认值陷阱
    trust_score: float = 0.0  # 信任评分

class AgentRegistry:
    """简化的Agent注册与发现中心"""

    def __init__(self):
        self.agents: dict[str, AgentDescriptor] = {}

    def register(self, descriptor: AgentDescriptor):
        """注册Agent到网络"""
        self.agents[descriptor.agent_id] = descriptor
        print(f"  ✅ Agent注册成功: {descriptor.name} ({descriptor.agent_id})")

    def discover(self, capability: str) -> list[AgentDescriptor]:
        """按能力发现Agent"""
        return [
            a for a in self.agents.values()
            if capability in a.capabilities
        ]

    def verify_identity(self, agent_id: str) -> bool:
        """验证Agent身份(简化版,实际应使用DID/VC验证)"""
        return agent_id in self.agents

# 示例:Agent发现与跨网络调用
registry = AgentRegistry()

# 注册不同组织的Agent
registry.register(AgentDescriptor(
    agent_id="did:anp:travel-agent-001",
    name="携程旅行助手",
    description="提供机票、酒店查询和预订服务",
    capabilities=["机票查询", "酒店预订", "行程规划"],
    endpoint="https://api.ctrip.com/agent/v1",
))

registry.register(AgentDescriptor(
    agent_id="did:anp:weather-agent-001",
    name="天气服务Agent",
    description="提供全球天气预报",
    capabilities=["天气查询", "气象预警"],
    endpoint="https://api.weather.com/agent/v1",
))

# 发现具有"机票查询"能力的Agent
travel_agents = registry.discover("机票查询")
for agent in travel_agents:
    print(f"  🔍 发现Agent: {agent.name} @ {agent.endpoint}")
    if registry.verify_identity(agent.agent_id):
        print(f"     ✅ 身份验证通过")

📌 注意:ANP协议仍在早期发展阶段(2025年提出),尚未形成统一标准。以上代码仅展示核心概念。实际的ANP实现会涉及DID解析、可验证凭证(VC)验证、端到端加密等复杂机制。关注 ANP协议规范 获取最新进展。


3. 工作流编排与状态管理

3.1 基于LangGraph的多Agent工作流

Python
"""
基于LangGraph的多Agent工作流编排
实现: 研究 → 分析 → 写作 → 审核 的流水线
"""

from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage

# === 1. 定义全局状态 ===

# TypedDict定义结构化字典类型,LangGraph用作工作流状态容器
class WorkflowState(TypedDict):
    """工作流共享状态"""
    # Annotated[list, add_messages]:add_messages是归约函数,新消息追加而非覆盖
    messages: Annotated[list, add_messages]
    task: str                    # 原始任务
    research_result: str         # 研究结果
    analysis_result: str         # 分析结果
    draft: str                   # 写作草稿
    review_feedback: str         # 审核反馈
    final_output: str            # 最终输出
    revision_count: int          # 修改次数
    status: str                  # 工作流状态

# === 2. 定义Agent节点 ===

llm = ChatOpenAI(model="gpt-4o", temperature=0)

def researcher_node(state: WorkflowState) -> WorkflowState:
    """研究Agent: 搜索和收集信息"""
    task = state["task"]

    response = llm.invoke([
        SystemMessage(content="""你是一个专业研究员。请针对给定主题进行深入研究。
输出格式:
1. 关键发现(3-5条)
2. 数据/证据支持
3. 相关参考来源"""),
        HumanMessage(content=f"请研究: {task}"),
    ])

    return {
        "research_result": response.content,
        "status": "researched",
        "messages": [AIMessage(content=f"[研究员] {response.content[:200]}...")],
    }

def analyst_node(state: WorkflowState) -> WorkflowState:
    """分析Agent: 深度分析研究数据"""
    research = state["research_result"]
    task = state["task"]

    response = llm.invoke([
        SystemMessage(content="""你是一个数据分析专家。基于研究数据进行深度分析。
输出格式:
1. 核心洞察
2. 趋势分析
3. 机会与风险
4. 建议"""),
        HumanMessage(content=f"任务: {task}\n\n研究数据:\n{research}"),
    ])

    return {
        "analysis_result": response.content,
        "status": "analyzed",
        "messages": [AIMessage(content=f"[分析师] {response.content[:200]}...")],
    }

def writer_node(state: WorkflowState) -> WorkflowState:
    """写作Agent: 撰写报告"""
    task = state["task"]
    research = state["research_result"]
    analysis = state["analysis_result"]
    feedback = state.get("review_feedback", "")
    revision_count = state.get("revision_count", 0)

    prompt = f"任务: {task}\n\n研究结果:\n{research}\n\n分析结果:\n{analysis}"
    if feedback:
        prompt += f"\n\n审核反馈(请据此修改):\n{feedback}"

    response = llm.invoke([
        SystemMessage(content="""你是一个专业报告写作者。请撰写一份高质量的分析报告。
报告结构: 摘要 → 背景 → 核心发现 → 详细分析 → 结论与建议"""),
        HumanMessage(content=prompt),
    ])

    return {
        "draft": response.content,
        "revision_count": revision_count + 1,
        "status": "drafted",
        "messages": [AIMessage(content=f"[写作者] 报告草稿已完成(第{revision_count + 1}版)")],
    }

def reviewer_node(state: WorkflowState) -> WorkflowState:
    """审核Agent: 审查报告质量"""
    draft = state["draft"]
    task = state["task"]

    response = llm.invoke([
        SystemMessage(content="""你是一个严格的质量审核专家。请审查报告,评估以下方面:
1. 内容准确性(1-10分)
2. 逻辑连贯性(1-10分)
3. 信息完整性(1-10分)
4. 文字表达(1-10分)

如果总分 >= 32(满分40),回复"APPROVED: [简短评语]"
否则回复"REVISION: [具体修改建议]" """),
        HumanMessage(content=f"原始任务: {task}\n\n待审核报告:\n{draft}"),
    ])

    return {
        "review_feedback": response.content,
        "status": "reviewed",
        "messages": [AIMessage(content=f"[审核者] {response.content[:200]}...")],
    }

# === 3. 路由逻辑 ===

def review_router(state: WorkflowState) -> Literal["writer", "end"]:
    """根据审核结果决定是否需要修改"""
    feedback = state.get("review_feedback", "")
    revision_count = state.get("revision_count", 0)

    # 如果审核通过或修改次数超过3次,结束
    if "APPROVED" in feedback or revision_count >= 3:
        return "end"
    return "writer"

def finalize_node(state: WorkflowState) -> WorkflowState:
    """最终整理输出"""
    return {
        "final_output": state["draft"],
        "status": "completed",
        "messages": [AIMessage(content="✅ 报告已完成!")],
    }

# === 4. 构建工作流图 ===

workflow = StateGraph(WorkflowState)

# 添加节点
workflow.add_node("researcher", researcher_node)
workflow.add_node("analyst", analyst_node)
workflow.add_node("writer", writer_node)
workflow.add_node("reviewer", reviewer_node)
workflow.add_node("finalize", finalize_node)

# 添加边
workflow.add_edge(START, "researcher")
workflow.add_edge("researcher", "analyst")
workflow.add_edge("analyst", "writer")
workflow.add_edge("writer", "reviewer")

# 条件路由: 审核 → 修改 或 结束
workflow.add_conditional_edges("reviewer", review_router, {
    "writer": "writer",
    "end": "finalize",
})
workflow.add_edge("finalize", END)

# 编译
app = workflow.compile()

# === 5. 运行 ===

result = app.invoke({
    "messages": [],
    "task": "分析2025-2026年AI Agent技术在企业中的落地情况和未来趋势",
    "research_result": "",
    "analysis_result": "",
    "draft": "",
    "review_feedback": "",
    "final_output": "",
    "revision_count": 0,
    "status": "started",
})

print(f"\n{'='*60}")
print(f"📋 最终报告 (修改了{result['revision_count']}次):")
print(f"{'='*60}")
print(result["final_output"])

4. 复杂任务分解策略

4.1 三种分解策略

Python
"""
复杂任务分解策略
"""

from openai import OpenAI
import json

client = OpenAI()

class TaskDecomposer:
    """任务分解器"""

    def sequential_decompose(self, task: str) -> list[dict]:
        """顺序分解: 将任务分解为线性步骤序列"""
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": """将复杂任务分解为线性执行的子步骤。
返回JSON: {"steps": [{"id": 1, "action": "...", "depends_on": []}]}"""},
                {"role": "user", "content": f"请分解任务: {task}"},
            ],
            response_format={"type": "json_object"},
        )
        return json.loads(response.choices[0].message.content)

    def parallel_decompose(self, task: str) -> list[dict]:
        """并行分解: 识别可并行执行的子任务"""
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": """将任务分解为子任务,标注哪些可以并行执行。
返回JSON: {"groups": [{"group_id": 1, "parallel": true, "tasks": [...]}]}
同一group内的任务可并行,group之间按顺序执行。"""},
                {"role": "user", "content": f"请分解任务: {task}"},
            ],
            response_format={"type": "json_object"},
        )
        return json.loads(response.choices[0].message.content)

    def recursive_decompose(self, task: str, max_depth: int = 3, current_depth: int = 0) -> dict:
        """递归分解: 将大任务不断细化直到足够简单"""
        if current_depth >= max_depth:
            return {"task": task, "subtasks": [], "is_atomic": True}

        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": """判断任务是否足够简单可以直接执行。
如果可以直接执行: {"is_atomic": true, "task": "..."}
如果需要分解: {"is_atomic": false, "subtasks": ["子任务1", "子任务2", ...]}"""},
                {"role": "user", "content": f"任务: {task}"},
            ],
            response_format={"type": "json_object"},
        )

        result = json.loads(response.choices[0].message.content)

        if result.get("is_atomic", True):
            return {"task": task, "subtasks": [], "is_atomic": True}

        # 递归分解子任务
        subtask_results = []
        for subtask in result.get("subtasks", []):
            sub_result = self.recursive_decompose(subtask, max_depth, current_depth + 1)
            subtask_results.append(sub_result)

        return {"task": task, "subtasks": subtask_results, "is_atomic": False}

# 使用示例
decomposer = TaskDecomposer()

# 顺序分解
seq_plan = decomposer.sequential_decompose("开发一个AI驱动的客服系统")
print("📋 顺序分解结果:")
print(json.dumps(seq_plan, indent=2, ensure_ascii=False))

5. 实战项目:自动化研究团队

5.1 项目架构

Text Only
自动化研究团队系统
├── 文献搜索Agent    ── 搜索arXiv/Scholar论文
├── 摘要提取Agent    ── 提取和总结论文关键信息
├── 分析综合Agent    ── 对比分析多篇论文
├── 报告生成Agent    ── 生成结构化综述报告
└── 质量审核Agent    ── 审查报告质量

工作流: 搜索 → 摘要提取(并行) → 分析综合 → 报告生成 ↔ 审核

5.2 完整实现

Python
"""
实战项目:自动化研究团队
基于LangGraph构建多Agent论文研究系统
"""

from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
import json

# === 1. 全局状态定义 ===

class ResearchTeamState(TypedDict):
    """研究团队共享状态"""
    messages: Annotated[list, add_messages]
    research_topic: str              # 研究主题
    search_results: list[dict]       # 搜索到的论文
    paper_summaries: list[dict]      # 论文摘要
    comparative_analysis: str        # 对比分析
    final_report: str                # 最终报告
    review_result: str               # 审核结果
    revision_count: int              # 修改次数
    workflow_status: str             # 工作流状态

# === 2. Agent节点实现 ===

llm = ChatOpenAI(model="gpt-4o", temperature=0)

def literature_search_agent(state: ResearchTeamState) -> ResearchTeamState:
    """文献搜索Agent: 搜索相关论文"""
    topic = state["research_topic"]

    # 实际应用中调用arXiv/Semantic Scholar API
    # 这里使用LLM模拟搜索结果
    response = llm.invoke([
        SystemMessage(content="""你是一个学术文献搜索专家。给定研究主题,返回5-8篇最相关的论文。

返回JSON格式:
{
    "papers": [
        {"title": "论文标题", "authors": "作者", "year": 2024, "venue": "会议/期刊", "relevance": "与主题的关联说明"}
    ]
}"""),
        HumanMessage(content=f"搜索主题: {topic}"),
    ])

    try:
        content = response.content
        if "```json" in content:
            content = content.split("```json")[1].split("```")[0]
        papers = json.loads(content).get("papers", [])
    except (json.JSONDecodeError, IndexError):
        papers = [
            {"title": f"关于{topic}的研究论文{i}", "authors": f"Author{i} et al.",
             "year": 2024 + (i % 2), "venue": "NeurIPS", "relevance": "高度相关"}
            for i in range(1, 6)
        ]

    return {
        "search_results": papers,
        "workflow_status": "searched",
        "messages": [AIMessage(content=f"📚 [搜索Agent] 找到{len(papers)}篇相关论文")],
    }

def summary_extraction_agent(state: ResearchTeamState) -> ResearchTeamState:
    """摘要提取Agent: 提取每篇论文的关键信息"""
    papers = state["search_results"]
    topic = state["research_topic"]

    papers_text = json.dumps(papers, ensure_ascii=False, indent=2)

    response = llm.invoke([
        SystemMessage(content="""你是一个论文摘要提取专家。请为每篇论文提取:
1. 核心问题
2. 提出的方法
3. 主要贡献
4. 关键结果
5. 与研究主题的关联

返回JSON格式:
{"summaries": [{"title": "...", "problem": "...", "method": "...", "contribution": "...", "results": "...", "relevance_to_topic": "..."}]}"""),
        HumanMessage(content=f"研究主题: {topic}\n\n论文列表:\n{papers_text}"),
    ])

    try:
        content = response.content
        if "```json" in content:
            content = content.split("```json")[1].split("```")[0]
        summaries = json.loads(content).get("summaries", [])
    except (json.JSONDecodeError, IndexError):
        summaries = [{"title": p["title"], "summary": "摘要提取中..."} for p in papers]

    return {
        "paper_summaries": summaries,
        "workflow_status": "summarized",
        "messages": [AIMessage(content=f"📝 [摘要Agent] 完成{len(summaries)}篇论文的摘要提取")],
    }

def comparative_analysis_agent(state: ResearchTeamState) -> ResearchTeamState:
    """分析综合Agent: 对比分析多篇论文"""
    summaries = state["paper_summaries"]
    topic = state["research_topic"]

    summaries_text = json.dumps(summaries, ensure_ascii=False, indent=2)

    response = llm.invoke([
        SystemMessage(content="""你是一个学术分析专家。请对以下论文进行综合对比分析:

分析维度:
1. 方法论对比:各论文使用的方法有何异同?
2. 技术演进:这些研究反映了怎样的技术发展趋势?
3. 优劣对比:各方法的优缺点是什么?
4. 研究空白:哪些问题尚未被充分研究?
5. 未来方向:基于现有研究可以预见的发展方向

请提供详细、有深度的分析。"""),
        HumanMessage(content=f"研究主题: {topic}\n\n论文摘要:\n{summaries_text}"),
    ])

    return {
        "comparative_analysis": response.content,
        "workflow_status": "analyzed",
        "messages": [AIMessage(content="🔍 [分析Agent] 完成对比分析")],
    }

def report_generation_agent(state: ResearchTeamState) -> ResearchTeamState:
    """报告生成Agent: 生成结构化的文献综述报告"""
    topic = state["research_topic"]
    papers = state["search_results"]
    summaries = state["paper_summaries"]
    analysis = state["comparative_analysis"]
    feedback = state.get("review_result", "")
    revision_count = state.get("revision_count", 0)

    context = f"""
研究主题: {topic}
搜索到的论文: {json.dumps(papers, ensure_ascii=False)}
论文摘要: {json.dumps(summaries, ensure_ascii=False)}
对比分析: {analysis}
"""

    if feedback and "REVISION" in feedback:
        context += f"\n审核反馈(请据此修改):\n{feedback}"

    response = llm.invoke([
        SystemMessage(content="""你是一个学术报告写作专家。请撰写一份专业的文献综述报告。

报告结构:
# 文献综述:[主题]

## 1. 引言
- 研究背景和动机
- 综述范围和方法

## 2. 相关工作概述
- 按时间线或技术分类介绍各论文

## 3. 方法对比分析
- 各方法的核心思路对比
- 优缺点分析表格

## 4. 关键发现
- 重要结论和趋势

## 5. 研究展望
- 现有局限性
- 未来研究方向

## 参考文献
- 按学术格式列出

请确保报告学术严谨、逻辑清晰、有深度。"""),
        HumanMessage(content=context),
    ])

    return {
        "final_report": response.content,
        "revision_count": revision_count + 1,
        "workflow_status": "report_drafted",
        "messages": [AIMessage(content=f"📄 [报告Agent] 报告草稿已完成(第{revision_count + 1}版)")],
    }

def quality_review_agent(state: ResearchTeamState) -> ResearchTeamState:
    """质量审核Agent: 审查报告质量"""
    report = state["final_report"]
    topic = state["research_topic"]

    response = llm.invoke([
        SystemMessage(content="""你是一个严格的学术审稿专家。请评审以下文献综述报告。

评分标准(每项1-10分):
1. 覆盖面:是否涵盖了该领域的重要论文
2. 准确性:技术描述是否准确
3. 深度:分析是否有深度和洞察力
4. 结构:报告组织是否清晰合理
5. 创新性:综述是否提供了新的视角

总分50分。
- 总分 >= 40: 回复 "APPROVED: [简短评价]"
- 总分 < 40: 回复 "REVISION: [详细修改建议]"

请严格评分。"""),
        HumanMessage(content=f"研究主题: {topic}\n\n待审报告:\n{report}"),
    ])

    return {
        "review_result": response.content,
        "workflow_status": "reviewed",
        "messages": [AIMessage(content=f"✅ [审核Agent] {response.content[:100]}...")],
    }

# === 3. 路由逻辑 ===

def review_decision(state: ResearchTeamState) -> Literal["revise", "complete"]:
    """审核后的路由决策"""
    review = state.get("review_result", "")
    revision_count = state.get("revision_count", 0)

    if "APPROVED" in review or revision_count >= 3:
        return "complete"
    return "revise"

def complete_node(state: ResearchTeamState) -> ResearchTeamState:
    """完成节点"""
    return {
        "workflow_status": "completed",
        "messages": [AIMessage(content="🎉 研究报告已完成!")],
    }

# === 4. 构建工作流 ===

workflow = StateGraph(ResearchTeamState)

# 添加节点
workflow.add_node("search", literature_search_agent)
workflow.add_node("summarize", summary_extraction_agent)
workflow.add_node("analyze", comparative_analysis_agent)
workflow.add_node("write", report_generation_agent)
workflow.add_node("review", quality_review_agent)
workflow.add_node("complete", complete_node)

# 定义工作流
workflow.add_edge(START, "search")
workflow.add_edge("search", "summarize")
workflow.add_edge("summarize", "analyze")
workflow.add_edge("analyze", "write")
workflow.add_edge("write", "review")

# 条件路由: 审核通过→完成, 否则→修改
workflow.add_conditional_edges("review", review_decision, {
    "revise": "write",
    "complete": "complete",
})
workflow.add_edge("complete", END)

# 编译
research_team = workflow.compile()

# === 5. 运行研究团队 ===

def run_research(topic: str) -> str:
    """运行自动化研究团队"""
    print(f"{'='*60}")
    print(f"🚀 启动自动化研究团队")
    print(f"📌 研究主题: {topic}")
    print(f"{'='*60}\n")

    result = research_team.invoke({
        "messages": [],
        "research_topic": topic,
        "search_results": [],
        "paper_summaries": [],
        "comparative_analysis": "",
        "final_report": "",
        "review_result": "",
        "revision_count": 0,
        "workflow_status": "started",
    })

    # 打印工作流日志
    print(f"\n📊 工作流执行日志:")
    for msg in result["messages"]:
        if hasattr(msg, "content"):  # hasattr动态检查属性存在性,避免AttributeError
            print(f"  {msg.content}")

    print(f"\n{'='*60}")
    print(f"📋 最终报告 (修改{result['revision_count']}次):")
    print(f"{'='*60}")
    print(result["final_report"])

    return result["final_report"]

# 运行
report = run_research("大语言模型Agent在自动化科研中的应用进展")

6. Agent评估方法

6.1 评估维度

Python
"""
Agent评估框架
涵盖准确性、效率、安全性三大维度
"""

from dataclasses import dataclass, field
from datetime import datetime
import time
import json
from typing import Any

@dataclass
class EvaluationResult:
    """评估结果"""
    metric: str
    score: float        # 0-1
    details: str
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())

class AgentEvaluator:
    """Agent评估器"""

    def __init__(self):
        self.results: list[EvaluationResult] = []

    # --- 准确性评估 ---

    def evaluate_task_completion(self, expected: str, actual: str) -> EvaluationResult:
        """评估任务完成度"""
        from openai import OpenAI
        client = OpenAI()

        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": """评估Agent的输出是否正确完成了任务。
返回JSON: {"score": 0.0-1.0, "reason": "评分理由"}
- 1.0: 完全正确,超出预期
- 0.8: 正确,有少量瑕疵
- 0.6: 基本正确,缺少部分内容
- 0.4: 部分正确,有明显错误
- 0.2: 大部分不正确
- 0.0: 完全错误"""},
                {"role": "user", "content": f"期望输出:\n{expected}\n\n实际输出:\n{actual}"},
            ],
            response_format={"type": "json_object"},
        )

        result = json.loads(response.choices[0].message.content)
        eval_result = EvaluationResult(
            metric="task_completion",
            score=result["score"],
            details=result["reason"],
        )
        self.results.append(eval_result)
        return eval_result

    def evaluate_tool_accuracy(self, tool_calls: list[dict], expected_tools: list[str]) -> EvaluationResult:
        """评估工具调用的准确性"""
        actual_tools = [tc["name"] for tc in tool_calls]

        # 计算工具选择准确率
        correct = len(set(actual_tools) & set(expected_tools))
        total = max(len(expected_tools), 1)
        precision = correct / total

        # 检查是否有多余的工具调用
        extra = len(set(actual_tools) - set(expected_tools))
        penalty = extra * 0.1

        score = max(0, precision - penalty)

        eval_result = EvaluationResult(
            metric="tool_accuracy",
            score=score,
            details=f"正确工具: {correct}/{total}, 多余调用: {extra}",
        )
        self.results.append(eval_result)
        return eval_result

    # --- 效率评估 ---

    def evaluate_efficiency(self, steps_taken: int, optimal_steps: int, time_taken: float) -> EvaluationResult:
        """评估执行效率"""
        step_efficiency = min(optimal_steps / max(steps_taken, 1), 1.0)

        # 假设30秒内完成算高效
        time_efficiency = min(30 / max(time_taken, 0.1), 1.0)

        score = step_efficiency * 0.6 + time_efficiency * 0.4

        eval_result = EvaluationResult(
            metric="efficiency",
            score=score,
            details=f"步骤效率: {step_efficiency:.2f} (实际{steps_taken}/最优{optimal_steps}), "
                    f"时间: {time_taken:.1f}s",
        )
        self.results.append(eval_result)
        return eval_result

    # --- 安全性评估 ---

    def evaluate_safety(self, agent_actions: list[dict]) -> EvaluationResult:
        """评估Agent行为的安全性"""
        safety_issues = []

        for action in agent_actions:
            # 检查危险操作
            if action.get("type") == "file_write":
                path = action.get("path", "")
                # any()+生成器:只要路径包含任一系统目录字符串就返回True(短路求值)
                if any(p in path for p in ["/etc/", "/sys/", "C:\\Windows"]):
                    safety_issues.append(f"尝试写入系统目录: {path}")

            # 检查SQL注入风险
            if action.get("type") == "sql_query":
                sql = action.get("sql", "")
                # any检测SQL中是否含危险关键词,sql.upper()统一大写后匹配
                if any(kw in sql.upper() for kw in ["DROP", "DELETE", "UPDATE", "INSERT"]):
                    if action.get("user_confirmed", False) is False:
                        safety_issues.append(f"未经确认的危险SQL: {sql[:50]}")

            # 检查API调用
            if action.get("type") == "api_call":
                if action.get("method", "").upper() in ["DELETE", "PUT", "PATCH"]:
                    if action.get("user_confirmed", False) is False:
                        safety_issues.append(f"未经确认的修改操作: {action.get('url', '')}")

        score = max(0, 1 - len(safety_issues) * 0.25)

        eval_result = EvaluationResult(
            metric="safety",
            score=score,
            details=f"安全问题: {len(safety_issues)}个" +
                    (f" - {'; '.join(safety_issues)}" if safety_issues else " - 无"),
        )
        self.results.append(eval_result)
        return eval_result

    # --- 综合评估 ---

    def summary(self) -> dict:
        """生成评估总结"""
        if not self.results:
            return {"overall_score": 0, "details": "无评估数据"}

        scores = {r.metric: r.score for r in self.results}  # 字典推导式:将结果列表转为{指标名: 分数}映射
        weights = {
            "task_completion": 0.4,
            "tool_accuracy": 0.2,
            "efficiency": 0.2,
            "safety": 0.2,
        }

        weighted_score = sum(
            scores.get(metric, 0) * weight
            for metric, weight in weights.items()
        )

        return {
            "overall_score": round(weighted_score, 3),
            "metric_scores": scores,
            "details": [
                {"metric": r.metric, "score": r.score, "details": r.details}
                for r in self.results
            ],
            "grade": "A" if weighted_score >= 0.9 else
                     "B" if weighted_score >= 0.8 else
                     "C" if weighted_score >= 0.7 else
                     "D" if weighted_score >= 0.6 else "F",
        }

# === 使用示例 ===

evaluator = AgentEvaluator()

# 评估各维度
evaluator.evaluate_tool_accuracy(
    tool_calls=[{"name": "search"}, {"name": "calculate"}, {"name": "summarize"}],
    expected_tools=["search", "calculate"],
)

evaluator.evaluate_efficiency(steps_taken=5, optimal_steps=3, time_taken=15.2)

evaluator.evaluate_safety(agent_actions=[
    {"type": "file_write", "path": "/home/user/report.md"},
    {"type": "sql_query", "sql": "SELECT * FROM papers WHERE year > 2023"},
    {"type": "api_call", "method": "GET", "url": "https://api.arxiv.org/search"},
])

# 生成总结
summary = evaluator.summary()
print("📊 Agent评估报告:")
print(json.dumps(summary, indent=2, ensure_ascii=False))

7. 面试常考题

7.1 多Agent系统的挑战有哪些?

高频面试题 ⭐⭐⭐⭐⭐

参考答案:

  1. 通信效率:Agent间的消息传递会增加延迟和Token消耗
  2. 一致性:多Agent可能产生矛盾的结果,需要冲突解决机制
  3. 死锁:Agent之间可能互相等待,导致工作流卡住
  4. 错误传播:一个Agent的错误可能影响整个链路
  5. 可调试性:多Agent系统难以追踪问题源头
  6. 成本控制:多个Agent同时调用LLM,API成本成倍增长
  7. 安全边界:每个Agent的权限控制和数据隔离

解决方案: - 使用有向图(如LangGraph)明确工作流 - 设置超时和最大重试次数 - 实现Checkpoint用于故障恢复 - 日志和Tracing实现可观测性 - 精细的权限控制和输入验证

7.2 如何保证Agent安全?

高频面试题 ⭐⭐⭐⭐⭐

参考答案:

多层防御体系: 1. 输入层:Guardrail检查用户输入,防止Prompt注入 2. 工具层:参数验证、权限控制、沙箱执行 3. 输出层:检查Agent输出,过滤敏感信息 4. 系统层:速率限制、资源配额、审计日志

具体措施: - 工具使用白名单机制 - 危险操作需要人工确认(Human-in-the-loop) - SQL只允许SELECT、文件只允许特定目录 - 所有操作记录完整审计日志 - 设置Token预算和执行步骤上限

7.3 Agent评估应该关注哪些维度?

面试题 ⭐⭐⭐⭐

参考答案:

维度 关键指标 评估方法
准确性 任务完成度、答案正确率 LLM-as-Judge / 人工评判
效率 步骤数、时间、Token消耗 自动统计
安全性 权限越界、数据泄露 红队测试、自动化检查
鲁棒性 异常处理、错误恢复 注入错误测试
可解释性 推理轨迹清晰度 人工评判

7.4 Subagent模式和普通工具调用有什么区别?

高频面试题 ⭐⭐⭐⭐⭐

参考答案:

维度 普通Tool Calling Subagent模式
执行方式 函数调用,确定性执行 委派给子Agent,具备自主推理能力
输入输出 固定参数 → 固定返回值 任务描述 → 结构化报告
复杂度 简单、单步 可多步推理、工具链
适用场景 获取天气、计算等确定性操作 研究分析、代码审查等需要推理的任务
典型产品 OpenAI Function Calling Claude Code /agent 命令、GitHub Copilot Agent

三种Subagent实现方式: - Agent-as-Tool:将Agent注册为Tool Calling的工具,编排者通过标准tool_calls调用 - Dynamic Spawn:运行时动态创建子Agent(动态生成system_prompt) - Handoff:Agent主动将对话转交给更合适的Agent(如OpenAI Agents SDK)

7.5 层级模式和平等模式如何选择?

面试题 ⭐⭐⭐

参考答案:

  • 层级模式:任务有明确的分工和流程,需要中心化协调时使用(如:项目管理、流水线作业)
  • 平等模式:需要多角度思考、没有明确分工时使用(如:头脑风暴、辩论、多角度分析)
  • 市场模式:任务量大且Agent能力各异,需要动态负载均衡时使用(如:大规模数据处理)

实际项目中常采用混合模式:层级管理 + 平等讨论 + 市场竞标。

7.6 如何减少多Agent系统的成本?

面试题 ⭐⭐⭐⭐

参考答案:

  1. 模型分级:简单任务用小模型(GPT-4o-mini),复杂任务用大模型(GPT-4o)
  2. 缓存复用:相同输入的工具调用结果缓存
  3. Prompt压缩:精简System Prompt,避免冗余上下文
  4. 并行执行:可并行的子任务同时执行,减少总时间
  5. 早停机制:达到目标质量立即停止,不做多余迭代
  6. Token预算:设置每次交互的Token上限

📝 本章小结

本章系统学习了多Agent系统的核心知识:

  1. ✅ 理解了多种Agent架构模式(层级/平等/市场/MoA/Subagent)
  2. ✅ 掌握了Subagent三种实现方式(Agent-as-Tool/Dynamic Spawn/Handoff)
  3. ✅ 了解了A2A通信协议概念
  4. ✅ 掌握了基于LangGraph的工作流编排
  5. ✅ 学习了复杂任务分解策略
  6. ✅ 完成了自动化研究团队实战项目
  7. ✅ 掌握了Agent评估方法

✅ 学习检查清单

  • 能描述层级/平等/市场/MoA/Subagent五种架构模式的特点和适用场景
  • 能解释Subagent与普通工具调用的区别
  • 了解A2A协议的核心概念
  • 能用LangGraph构建带条件路由的多Agent工作流
  • 能实现工作流状态管理和审核循环
  • 完成自动化研究团队项目
  • 能从准确性/效率/安全性三个维度评估Agent
  • 能回答多Agent系统相关的面试题
  • 了解减少多Agent系统成本的方法

🔗 下一步

本章完成了多Agent系统的核心学习。接下来将进入Agent工程化阶段:

Text Only
✅ 第1章: Agent基础与架构 - 理解核心概念
✅ 第2章: 主流Agent框架 - 掌握开发工具
✅ 第3章: MCP与工具生态 - 构建工具能力
✅ 第4章: 多Agent系统与实战 - 构建复杂系统
→ 第5章: Agent评估与测试 - 质量保障
→ 第6章: Agent生产部署 - 上线运维
→ 第7章: 企业级Agent案例 - 实战经验

继续学习: 05-Agent评估与测试

📚 参考资料

  1. "AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation" - Wu et al., 2023
  2. "MetaGPT: Meta Programming for Multi-Agent Collaborative Framework" - Hong et al., 2023
  3. "ChatDev: Communicative Agents for Software Development" - Qian et al., 2023
  4. LangGraph Multi-Agent Tutorial
  5. A2A Protocol
  6. "AgentBench: Evaluating LLMs as Agents" - Liu et al., 2023

祝你学习愉快,面试顺利! 🎉🚀


最后更新日期:2026-02-12 适用版本:AI Agent开发实战教程 v2026