
08. Next-Generation AI Agents: From Tool Use to Autonomous Execution

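⚠️ Timeliness note: This chapter covers frontier models, pricing, leaderboards, and similar details that can change quickly between versions. Treat the original papers, official release pages, and API documentation as authoritative.

📌 Scope: This chapter focuses on frontier work in next-generation agents (Manus, Claude Code, Operator, and others). For hands-on agent coding (writing a framework by hand, building multi-agent systems), see AI Agent开发实战/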

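Contents

  1. The Evolution of AI Agents
  2. Manus: A Breakthrough in General-Purpose AI Agents
  3. Claude Code and Claude Cowork
     3.4 Claude Skills: Reusable Agent Skills
  4. OpenAI Operator and Computer Use
  5. AI Agent Architecture Design
  6. Multi-Agent Collaboration Systems
  7. Agent Safety and Controllability
  8. Hands-On Project: Building an Autonomous Research Agent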

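1. The Evolution of AI Agents

1.1 The Paradigm Shift from LLMs to Agents

AI agents mark a shift in artificial intelligence from passively responding to prompts to actively carrying out tasks.

Limitations of a traditional LLM: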

Text Only
┌─────────────────────────────────────────────────────────┐
│               Traditional LLM interaction               │
├─────────────────────────────────────────────────────────┤
│  User input → LLM inference → Text output               │
│                                                         │
│  Limitations:                                           │
│  • Cannot perform real-world actions                    │
│  • No persistent context memory                         │
│  • Cannot access external systems                       │
│  • Each conversation is stateless                       │
└─────────────────────────────────────────────────────────┘

Core capabilities of an AI agent:

Text Only
┌─────────────────────────────────────────────────────────┐
│                  AI Agent architecture                  │
├─────────────────────────────────────────────────────────┤
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐  │
│  │ Perception  │ →  │  Reasoning  │ →  │   Action    │  │
│  │    layer    │    │    layer    │    │    layer    │  │
│  └─────────────┘    └─────────────┘    └─────────────┘  │
│         ↑                                     ↓         │
│         └─────────── Memory layer ────────────┘         │
└─────────────────────────────────────────────────────────┘
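
The perception, reasoning, action, and memory layers can be illustrated with a toy loop. This is a minimal sketch, not a production design: `CounterAgent`, its rule-based `reason` method, and the dictionary environment are all hypothetical stand-ins (a real agent would call an LLM in the reasoning step).

```python
from dataclasses import dataclass, field


@dataclass
class CounterAgent:
    """Toy agent that drives a counter in the environment toward a target value."""
    target: int
    memory: list = field(default_factory=list)  # memory layer: (state, action) records

    def perceive(self, environment: dict) -> int:
        # Perception layer: read the current state from the environment.
        return environment["value"]

    def reason(self, state: int) -> str:
        # Reasoning layer: choose an action (a fixed rule here, an LLM in practice).
        if state < self.target:
            return "increment"
        if state > self.target:
            return "decrement"
        return "stop"

    def act(self, environment: dict, action: str) -> None:
        # Action layer: change the environment.
        if action == "increment":
            environment["value"] += 1
        elif action == "decrement":
            environment["value"] -= 1

    def run(self, environment: dict, max_steps: int = 20) -> list:
        for _ in range(max_steps):
            state = self.perceive(environment)
            action = self.reason(state)
            self.memory.append((state, action))  # record the step in memory
            if action == "stop":
                break
            self.act(environment, action)
        return self.memory


env = {"value": 0}
trace = CounterAgent(target=3).run(env)
print(trace)
```

The `max_steps` cap is a design choice worth keeping in any agent loop: it guarantees termination even when the reasoning step never decides to stop.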

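1.2 Agent Capability Levels

| Level | Name | Capability | Representative systems |
| --- | --- | --- | --- |
| L1 | Simple tool calling | Calls predefined APIs | GPT-3.5 + Function Calling |
| L2 | Multi-step planning | Decomposes tasks and executes them | AutoGPT, LangChain Agents |
| L3 | Environment awareness | Understands and operates digital environments | Claude Computer Use |
| L4 | Autonomous decision-making | Adjusts goals dynamically while executing | Manus, Operator |
| L5 | General-purpose agent | Adapts and learns across domains | Future goal |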

1.3 ReAct and the Core Agent Paradigm

ReAct (Reasoning + Acting) is the basic pattern underlying most agents:

Python
from typing import Dict


class ReActAgent:
    """
    ReAct: Synergizing Reasoning and Acting in Language Models

    Core idea: interleave reasoning steps with actions.

    The helper methods _build_react_prompt, _parse_thought, _parse_action,
    is_complete, and generate_answer are not shown; they depend on the
    prompt format and LLM client in use.
    """
    def __init__(self, llm, tools, max_iterations=10):
        self.llm = llm
        self.tools = {tool.name: tool for tool in tools}
        self.max_iterations = max_iterations
        self.memory = []

    def run(self, query: str) -> str:
        """Run the ReAct loop."""
        self.memory.append(f"Task: {query}")

        for i in range(self.max_iterations):
            # 1. Thought
            thought = self.think()

            # 2. Action
            action = self.act(thought)

            # 3. Observation
            observation = self.execute_action(action)

            # 4. Update memory
            self.memory.extend([
                f"Thought {i+1}: {thought}",
                f"Action {i+1}: {action}",
                f"Observation {i+1}: {observation}"
            ])

            # 5. Check whether the task is done
            if self.is_complete(observation):
                return self.generate_answer()

        return "Max iterations reached"

    def think(self) -> str:
        """Generate the next reasoning step."""
        prompt = self._build_react_prompt()
        response = self.llm.generate(prompt)
        return self._parse_thought(response)

    def act(self, thought: str) -> Dict:
        """Choose an action based on the thought."""
        prompt = f"""Based on the thought: {thought}

Available tools: {list(self.tools.keys())}

Choose the next action in format:
Action: [tool_name]
Action Input: [input]"""

        response = self.llm.generate(prompt)
        return self._parse_action(response)

    def execute_action(self, action: Dict) -> str:
        """Run the selected tool."""
        tool_name = action['tool']
        tool_input = action['input']

        if tool_name not in self.tools:
            return f"Error: Tool {tool_name} not found"

        try:  # Return tool failures as observations instead of crashing the loop
            result = self.tools[tool_name].run(tool_input)
            return str(result)
        except Exception as e:
            return f"Error: {str(e)}"
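
The `act` method above asks the model to answer in an `Action:` / `Action Input:` format and hands the reply to `_parse_action`, which is not shown. One way that parsing step might look is sketched below; the function name `parse_action`, the regular expressions, and the choice to return `None` on malformed output are assumptions for illustration.

```python
import re
from typing import Dict, Optional

# Match each field on its own line; [ \t]* avoids swallowing newlines.
ACTION_RE = re.compile(r"^Action:[ \t]*(?P<tool>.+?)[ \t]*$", re.MULTILINE)
INPUT_RE = re.compile(r"^Action Input:[ \t]*(?P<input>.*?)[ \t]*$", re.MULTILINE)


def parse_action(response: str) -> Optional[Dict[str, str]]:
    """Extract the tool name and tool input from a ReAct-style reply.

    Returns None when either field is missing, so the caller can re-prompt
    instead of failing on a KeyError later.
    """
    tool = ACTION_RE.search(response)
    tool_input = INPUT_RE.search(response)
    if tool is None or tool_input is None:
        return None
    return {
        # Models often echo the brackets from the format hint; drop them.
        "tool": tool.group("tool").strip("[]"),
        "input": tool_input.group("input"),
    }


reply = "Thought: I need current data\nAction: [search]\nAction Input: weather in Paris"
print(parse_action(reply))
```

Returning `None` rather than raising keeps the decision about retries in the agent loop, which is where the iteration budget lives.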

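2. Manus: A Breakthrough in General-Purpose AI Agents

2.1 Manus Overview

Manus (Latin for "hand") was released in March 2025 by the Chinese team behind Monica.im. It was presented as one of the first general-purpose AI agents able to carry out complex tasks autonomously from start to finish.

Core breakthroughs: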

Text Only
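┌────────────────────────────────────────────────────────────┐
│                    Manus core features                     │
├────────────────────────────────────────────────────────────┤
│                                                            │
│  1. End-to-end task execution                              │
│     • Closed loop from requirements to final delivery      │
│     • Handles multi-step, cross-platform tasks             │
│                                                            │
│  2. Multi-agent collaboration architecture                 │
│     • Planning agent + execution agent + validation agent  │
│     • Works like a software engineering team               │
│                                                            │
│  3. Isolated virtual machine environment                   │
│     • Runs operations inside a secure sandbox              │
│     • Code execution, file operations, web browsing        │
│                                                            │
│  4. Adaptive learning                                      │
│     • Learns from execution history                        │
│     • Task quality improves over time                      │
│                                                            │
└────────────────────────────────────────────────────────────┘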

2.2 Manus Architecture Breakdown

Python
import time
from typing import List


class ManusAgent:
    """
    Sketch of a Manus-style general-purpose agent, inferred from public
    information (not Manus's actual implementation).

    Core components:
    1. Task Planner
    2. Execution Engine
    3. Tool Integration layer
    4. Memory System
    5. Validator

    ExecutionEngine, ToolRegistry, OutputValidator, TaskPlan, TaskResult,
    and the per-step helpers are placeholders that are not defined here.
    """

    MAX_REPLANS = 3  # illustrative cap so replanning cannot loop forever

    def __init__(self):
        self.planner = TaskPlanner(llm="claude-3-5-sonnet")
        self.executor = ExecutionEngine()
        self.tool_registry = ToolRegistry()
        self.memory = HierarchicalMemory()
        self.validator = OutputValidator()

    def execute_task(self, user_request: str) -> TaskResult:
        """
        Run a task end to end.

        Args:
            user_request: the user's request in natural language

        Returns:
            TaskResult: the output, intermediate steps, logs, and artifacts
        """
        # Phase 1: understand the task and plan it
        task_plan = self.planner.create_plan(user_request)

        # Phase 2: execution loop. A plain `for` over task_plan.steps would keep
        # iterating the old plan after a replan, so keep an explicit queue.
        execution_log = []
        pending = list(task_plan.steps)
        replans = 0
        while pending:
            step = pending.pop(0)
            result = self._execute_step(step)
            execution_log.append(result)

            # Update memory
            self.memory.add_execution_step(step, result)

            # Replan dynamically when needed; the revised plan is assumed to
            # list only the remaining steps.
            if result.needs_replanning and replans < self.MAX_REPLANS:
                task_plan = self.planner.replan(task_plan, execution_log)
                pending = list(task_plan.steps)
                replans += 1

        # Phase 3: validate and deliver the result
        final_output = self._assemble_output(execution_log)
        validation = self.validator.validate(final_output, user_request)

        return TaskResult(
            output=final_output,
            execution_log=execution_log,
            validation_score=validation.score,
            artifacts=self._collect_artifacts()
        )

    def _execute_step(self, step: ExecutionStep) -> StepResult:
        """Execute a single step by dispatching on its type."""
        if step.type == "web_browsing":
            return self._browse_web(step.params)
        elif step.type == "code_execution":
            return self._execute_code(step.params)
        elif step.type == "file_operation":
            return self._operate_file(step.params)
        elif step.type == "api_call":
            return self._call_api(step.params)
        elif step.type == "human_confirmation":
            return self._request_confirmation(step.params)
        else:
            raise ValueError(f"Unknown step type: {step.type}")

class TaskPlanner:
    """Task planner: breaks a complex request into executable steps."""

    def __init__(self, llm):
        # The agent passes a model name; turning it into a client that exposes
        # generate(prompt, format=...) is left to a placeholder factory.
        self.llm = create_llm_client(llm)

    def create_plan(self, request: str) -> TaskPlan:
        """
        Create an execution plan for the task.

        Uses Chain-of-Thought prompting to decompose the task.
        """
        planning_prompt = f"""You are an expert task planner. Break down the following request into clear, executable steps.

Request: {request}

For each step, specify:
1. Step type (web_browsing, code_execution, file_operation, api_call, human_confirmation)
2. Required parameters
3. Expected output
4. Dependencies on previous steps

Output the plan in structured JSON format."""

        plan_json = self.llm.generate(planning_prompt, format="json")
        return TaskPlan.from_json(plan_json)

    def replan(self, current_plan: TaskPlan, execution_log: List[StepResult]) -> TaskPlan:
        """Adjust the plan based on execution feedback."""
        replan_prompt = f"""The current plan needs adjustment based on execution results.

Current Plan: {current_plan.to_json()}
Execution Log: {execution_log}

Please provide an updated plan that:
1. Addresses the issues encountered
2. Incorporates new information discovered
3. Optimizes remaining steps

Output the revised plan."""

        new_plan_json = self.llm.generate(replan_prompt, format="json")
        return TaskPlan.from_json(new_plan_json)

class HierarchicalMemory:
    """Hierarchical memory system."""

    def __init__(self):
        self.working_memory = []   # context for the current task
        self.episodic_memory = []  # experience from past tasks
        self.semantic_memory = {}  # knowledge base

    def add_execution_step(self, step: ExecutionStep, result: StepResult):
        """Append an executed step to working memory."""
        self.working_memory.append({
            "step": step,
            "result": result,
            "timestamp": time.time()
        })

    def get_relevant_context(self, current_step: ExecutionStep) -> str:
        """Retrieve context relevant to the current step."""
        # Related steps from working memory
        relevant_working = self._retrieve_from_working_memory(current_step)

        # Similar cases from past experience
        similar_episodes = self._retrieve_from_episodic_memory(current_step)

        # Related knowledge
        relevant_knowledge = self._retrieve_from_semantic_memory(current_step)

        return self._format_context(relevant_working, similar_episodes, relevant_knowledge)
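
`HierarchicalMemory` relies on retrieval helpers such as `_retrieve_from_episodic_memory` that are not shown. As a rough illustration of what "find similar past episodes" could mean, the sketch below ranks stored episodes by Jaccard word overlap with the current task. The episode dictionary shape, the function names, and the use of word overlap (instead of the embedding similarity a real system would more likely use) are all assumptions.

```python
def tokenize(text: str) -> set:
    """Lowercase and split on whitespace; a crude stand-in for real tokenization."""
    return set(text.lower().split())


def jaccard(a: set, b: set) -> float:
    """Jaccard similarity: size of the intersection over size of the union."""
    if not a or not b:
        return 0.0
    return len(a & b) / len(a | b)


def retrieve_similar(episodes: list, query: str, top_k: int = 2) -> list:
    """Return up to top_k episodes whose task text overlaps the query, best first."""
    query_tokens = tokenize(query)
    scored = [(jaccard(query_tokens, tokenize(e["task"])), e) for e in episodes]
    scored = [(score, e) for score, e in scored if score > 0]  # drop unrelated episodes
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [e for _, e in scored[:top_k]]


episodes = [
    {"task": "generate stock analysis report", "outcome": "success"},
    {"task": "deploy blog website", "outcome": "success"},
    {"task": "analyze stock price trend", "outcome": "failed: data source timeout"},
]
hits = retrieve_similar(episodes, "stock report for TSLA")
print([h["task"] for h in hits])
```

Keeping failed episodes in the store is deliberate: a past failure on a similar task is often the most useful context for planning.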

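2.3 Manus Application Scenarios

Python
# Examples: how a Manus-style agent might work through complex tasks

# Scenario 1: generating a stock analysis report
def stock_analysis_example():
    """
    User request: "Analyze Tesla's stock performance over the past month and
                  produce an investment report covering technicals and fundamentals"
    """

    manus = ManusAgent()

    result = manus.execute_task("""
    Analyze Tesla (TSLA) stock performance over the past month, including:
    1. Price trend and technical indicator analysis
    2. Impact of recent news and events
    3. Financial data comparison
    4. An investment analysis report in PDF format
    """)

    # Execution flow:
    # 1. Search for Tesla price data
    # 2. Compute technical indicators (MA, RSI, MACD, etc.)
    # 3. Search for related news
    # 4. Look up financial statements
    # 5. Write Python code to generate charts
    # 6. Write the analysis report
    # 7. Convert it to PDF
    # 8. Deliver the final report

    return result.artifacts['report.pdf']

# Scenario 2: building a website
def website_building_example():
    """
    User request: "Create a personal blog site for me"
    """

    manus = ManusAgent()

    result = manus.execute_task("""
    Create a modern personal blog site with:
    1. Responsive design
    2. Support for Markdown posts
    3. An About page and a contact form
    4. Deployment to Vercel
    """)

    # Execution flow:
    # 1. Design the site architecture
    # 2. Create a Next.js project
    # 3. Write the component code
    # 4. Configure styles and theme
    # 5. Implement Markdown rendering
    # 6. Add the contact form
    # 7. Test and debug
    # 8. Deploy to Vercel
    # 9. Return the access link

    return result.artifacts['deployment_url']

# Scenario 3: data processing pipeline
def data_pipeline_example():
    """
    User request: "Combine sales data from several sources and build a dashboard"
    """

    manus = ManusAgent()

    result = manus.execute_task("""
    Build a sales data integration pipeline:
    1. Read offline sales data from CSV files
    2. Fetch online sales data through an API
    3. Clean the data and standardize its format
    4. Compute key metrics (revenue, growth rate, etc.)
    5. Create an interactive dashboard
    """)

    return result.artifacts['dashboard.html']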

2.4 Manus Technical Highlights

Text Only
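┌─────────────────────────────────────────────────────────────┐
│                 Manus technical innovations                 │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. Asynchronous execution and parallel processing          │
│     ┌─────────────────────────────────────────────────┐     │
│     │  Task A ──┐                                     │     │
│     │           ├──→ [parallel engine] → merge results│     │
│     │  Task B ──┘                                     │     │
│     └─────────────────────────────────────────────────┘     │
│                                                             │
│  2. Intelligent error recovery                              │
│     • Detects failed executions automatically               │
│     • Diagnoses the cause and picks a recovery strategy     │
│     • Supports retry, fallback, or human intervention       │
│                                                             │
│  3. Multimodal input handling                               │
│     • Understands text, images, and documents               │
│     • Extracts information from screenshots                 │
│     • Produces visual output                                │
│                                                             │
│  4. Continuous learning                                     │
│     • Records successful execution patterns                 │
│     • Builds a task-to-strategy mapping                     │
│     • Personalizes execution style                          │
│                                                             │
└─────────────────────────────────────────────────────────────┘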
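
The first two items, parallel execution and error recovery with retry and human escalation, can be sketched with `asyncio`. This is an illustrative sketch only: `run_with_retry`, the three sample tasks, and the `needs_human` status value are hypothetical and say nothing about how Manus implements these features.

```python
import asyncio


async def run_with_retry(task_fn, retries: int = 2, delay: float = 0.01):
    """Run an async task; retry on failure, then escalate to a human."""
    last_error = None
    for attempt in range(retries + 1):
        try:
            return await task_fn()
        except Exception as exc:
            last_error = exc
            await asyncio.sleep(delay * (attempt + 1))  # simple linear backoff
    # All attempts failed: hand the problem to a person instead of raising.
    return {"status": "needs_human", "error": str(last_error)}


attempts = {"news": 0}


async def fetch_prices():
    await asyncio.sleep(0.01)
    return {"status": "ok", "data": "prices"}


async def fetch_news():
    # Fails on the first call and succeeds on the second.
    attempts["news"] += 1
    if attempts["news"] < 2:
        raise TimeoutError("news source timed out")
    return {"status": "ok", "data": "news"}


async def always_fails():
    raise RuntimeError("login required")


async def main():
    # Independent tasks run concurrently; results come back in argument order.
    return await asyncio.gather(
        run_with_retry(fetch_prices),
        run_with_retry(fetch_news),
        run_with_retry(always_fails),
    )


results = asyncio.run(main())
print(results)
```

Wrapping each task in its own retry policy keeps one failing branch from cancelling the others, which `asyncio.gather` would otherwise do when an exception propagates.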

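3. Claude Code and Claude Cowork

3.1 Claude Code: An AI Coding Assistant

Claude Code is Anthropic's agentic coding tool, first released as a research preview in February 2025. It is a leading example of what code agents can do.

Core capabilities: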

Python
from typing import List


class ClaudeCode:
    """
    Conceptual sketch of a Claude Code-style coding agent
    (illustrative; not Anthropic's actual implementation).

    Characteristics:
    1. Deep code understanding
    2. A safe code execution environment
    3. Version control integration
    4. Context-aware editing

    ProjectContext, SecureToolExecutor, GitIntegration, and the private
    helpers used below are placeholders that are not defined here.
    """

    def __init__(self, project_path: str, llm, embedder):
        self.llm = llm  # client exposing generate(prompt, format=...)
        self.project = ProjectContext(project_path)
        self.code_index = CodeIndex(project_path, embedder)
        self.tool_executor = SecureToolExecutor()
        self.git_integration = GitIntegration(project_path)

    def process_request(self, user_input: str) -> CodeAction:
        """Handle a programming request from the user."""

        # 1. Understand the intent
        intent = self._understand_intent(user_input)

        # 2. Gather context
        context = self._gather_context(intent)

        # 3. Plan the actions
        plan = self._plan_actions(intent, context)

        # 4. Execute and verify
        results = []
        for action in plan:
            result = self._execute_action(action)
            results.append(result)

            # Show the diff and ask the user to confirm
            if action.requires_confirmation:
                self._show_diff_and_confirm(result)

        return self._summarize_results(results)

    def _gather_context(self, intent: Intent) -> Context:
        """Collect the context needed for the request."""
        context = Context()

        # Retrieve the relevant files
        if intent.target_files:
            for file_path in intent.target_files:
                context.add_file(self.project.read_file(file_path))
        else:
            # Semantic search for related code
            relevant_files = self.code_index.semantic_search(intent.description)
            for file in relevant_files[:5]:  # top 5 matches
                context.add_file(file)

        # Project structure
        context.project_structure = self.project.get_structure()

        # Dependency information
        context.dependencies = self.project.get_dependencies()

        # Git status
        context.git_status = self.git_integration.get_status()

        return context

    def _plan_actions(self, intent: Intent, context: Context) -> List[CodeAction]:
        """Plan the code changes."""

        planning_prompt = f"""You are an expert software engineer.

Task: {intent.description}

Context:
{context.to_prompt()}

Plan the necessary code changes. For each change, specify:
1. File path
2. Change type (create, modify, delete, rename)
3. Detailed description of changes
4. Dependencies on other changes

Output as a structured action plan."""

        plan_response = self.llm.generate(planning_prompt, format="json")
        return self._parse_action_plan(plan_response)

class CodeIndex:
    """Code index that supports semantic search."""

    def __init__(self, project_path: str, embedder):
        self.project_path = project_path
        self.embedder = embedder  # must be set before the index is built
        self.index = self._build_index()

    def _build_index(self) -> VectorStore:
        """Build a vector index over the code."""
        code_chunks = []

        for file_path in self._get_code_files():
            chunks = self._parse_and_chunk(file_path)
            for chunk in chunks:
                embedding = self.embedder.encode(chunk.content)
                code_chunks.append({
                    "embedding": embedding,
                    "content": chunk.content,
                    "metadata": {
                        "file": file_path,
                        "line_start": chunk.line_start,
                        "line_end": chunk.line_end,
                        "type": chunk.type  # function, class, etc.
                    }
                })

        return VectorStore.from_documents(code_chunks)

    def semantic_search(self, query: str, top_k: int = 10) -> List[CodeChunk]:
        """Search for code related to the query."""
        query_embedding = self.embedder.encode(query)
        results = self.index.similarity_search(query_embedding, k=top_k)
        return [CodeChunk.from_result(r) for r in results]
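
`CodeIndex._build_index` depends on a `_parse_and_chunk` step that splits a file into function-level and class-level chunks with line ranges. For Python sources, one plausible way to do this uses the standard `ast` module, as sketched below. The `CodeChunk` dataclass and `chunk_source` function are hypothetical shapes chosen to mirror the metadata fields used above; other languages would need a different parser.

```python
import ast
from dataclasses import dataclass


@dataclass
class CodeChunk:
    content: str
    line_start: int
    line_end: int
    type: str  # "function" or "class"


def chunk_source(source: str) -> list:
    """Split Python source into one chunk per top-level function or class."""
    tree = ast.parse(source)
    chunks = []
    for node in tree.body:  # top-level statements only
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            kind = "function"
        elif isinstance(node, ast.ClassDef):
            kind = "class"
        else:
            continue  # skip imports, assignments, and other statements
        chunks.append(CodeChunk(
            content=ast.get_source_segment(source, node),
            line_start=node.lineno,
            line_end=node.end_lineno,
            type=kind,
        ))
    return chunks


sample = '''import os

def load(path):
    return open(path).read()

class Parser:
    def parse(self, text):
        return text.split()
'''
chunks = chunk_source(sample)
for chunk in chunks:
    print(chunk.type, chunk.line_start, chunk.line_end)
```

Chunking on syntactic boundaries keeps each embedded unit self-contained, which tends to retrieve better than fixed-size windows that cut through the middle of a function.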

3.2 Claude Cowork: An Agent for Non-Technical Users

Claude Cowork is Anthropic's general-purpose agent product, aimed at a broader audience of non-technical users.

Python
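class ClaudeCowork:
    """
    Claude Cowork: a general-purpose AI assistant (conceptual sketch)

    Target users: non-technical people
    Core scenarios: everyday office work, research, content creation
    """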

    def __init__(self):
        self.browser = BrowserController()
        self.file_manager = FileManager()
        self.document_processor = DocumentProcessor()
        self.calendar = CalendarIntegration()
        self.email = EmailIntegration()

    def assist(self, request: UserRequest) -> AssistanceResult:
        """Handle a user's request for assistance."""

        # Classify the request type
        request_type = self._classify_request(request)

        if request_type == "research":
            return self._handle_research(request)
        elif request_type == "document_creation":
            return self._handle_document_creation(request)
        elif request_type == "data_analysis":
            return self._handle_data_analysis(request)
        elif request_type == "scheduling":
            return self._handle_scheduling(request)
        elif request_type == "communication":
            return self._handle_communication(request)
        else:
            return self._handle_general(request)

    def _handle_research(self, request: UserRequest) -> ResearchResult:
        """Handle a research request."""

        # 1. Identify the research topic
        topic = request.extract_topic()

        # 2. Collect information from multiple sources
        sources = []

        # Web search
        search_results = self.browser.search(topic)
        for result in search_results[:10]:
            page_content = self.browser.read_page(result.url)
            sources.append({
                "url": result.url,
                "title": result.title,
                "content": page_content
            })

        # 3. Synthesize and analyze the information
        analysis = self._synthesize_information(sources, topic)

        # 4. Generate the report
        report = self._generate_research_report(analysis)

        return ResearchResult(
            summary=analysis.summary,
            key_findings=analysis.findings,
            sources=sources,
            report=report
        )

    def _handle_document_creation(self, request: UserRequest) -> DocumentResult:
        """Handle a document-writing request."""

        # Work out the document requirements
        doc_spec = self._parse_document_requirements(request)

        # Collect reference material
        references = []
        if request.has_attachments:
            for attachment in request.attachments:
                content = self.document_processor.read(attachment)
                references.append(content)

        # Generate the document outline
        outline = self._generate_outline(doc_spec, references)

        # Write the content section by section
        sections = []
        for section in outline.sections:
            content = self._write_section(section, references, sections)
            sections.append({
                "heading": section.heading,
                "content": content
            })

        # Format and export
        document = self._format_document(sections, doc_spec.format)

        return DocumentResult(
            document=document,
            outline=outline,
            # Count whitespace-separated words; len(content) would count characters
            word_count=sum(len(s["content"].split()) for s in sections)
        )

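3.3 Core Challenges for Code Agents

Python
class CodeAgentChallenges:
    """Key challenges facing code agents, with candidate solutions (pseudocode)."""

    @staticmethod  # callable without creating an instance
    def challenge_1_context_window():
        """
        Challenge 1: context limits on large codebases

        Solutions:
        1. Smart code retrieval
        2. Hierarchical summaries
        3. Incremental indexing
        """

        # Smart retrieval example (the helper functions are placeholders)
        def intelligent_retrieval(project, query):
            # Layer 1: file-level retrieval
            relevant_files = file_level_search(project, query)

            # Layer 2: function/class-level retrieval
            code_chunks = []
            for file in relevant_files:
                chunks = chunk_code(file)
                scored_chunks = rank_by_relevance(chunks, query)
                code_chunks.extend(scored_chunks[:3])

            # Layer 3: expand along code relationships (callers, imports, etc.)
            extended_context = expand_by_relations(code_chunks)

            return extended_context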

    @staticmethod
    def challenge_2_code_execution_safety():
        """
        Challenge 2: safe code execution

        Solutions:
        1. Sandboxed environment
        2. Resource limits
        3. Permission control
        """

        class SecureExecutor:
            def __init__(self):
                self.sandbox = DockerSandbox()
                self.resource_limits = {
                    "cpu": "1 core",
                    "memory": "512MB",
                    "timeout": 30,     # seconds
                    "network": False   # no network access
                }

            def execute(self, code: str) -> ExecutionResult:
                # Static analysis before running anything
                if self._contains_dangerous_operations(code):
                    return ExecutionResult.error("Dangerous operations detected")

                # Run inside the sandbox
                return self.sandbox.run(code, limits=self.resource_limits)

    @staticmethod
    def challenge_3_long_horizon_planning():
        """
        Challenge 3: long-horizon planning and execution

        Solutions:
        1. Hierarchical planning
        2. Milestone checks
        3. Dynamic adjustment
        """

        class HierarchicalPlanner:
            def plan(self, goal: str) -> Plan:
                # High-level plan
                high_level = self._create_high_level_plan(goal)

                # Refine it phase by phase
                detailed_plan = Plan()
                for phase in high_level.phases:
                    sub_tasks = self._decompose(phase)
                    detailed_plan.add_phase(phase, sub_tasks)

                return detailed_plan

            def execute_with_checkpoints(self, plan: Plan):
                for phase in plan.phases:
                    for task in phase.tasks:
                        result = self.execute_task(task)

                        # Checkpoint validation
                        if not self._validate_checkpoint(result):
                            self._handle_deviation(phase, result)
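3.4 Claude Skills: Reusable Agent Skills

Skills are a capability-reuse mechanism for agents that Anthropic introduced in 2025. The idea presented in this section is to capture a workflow the agent has completed successfully (its steps, tool-call sequence, and context template) as a reusable "skill" that can be triggered directly when a similar task arrives, instead of reasoning from scratch. Note that Anthropic's published Agent Skills are authored packages (a folder containing a SKILL.md file plus optional scripts and resources) that Claude loads when they are relevant; saving a skill automatically after a successful run is this chapter's conceptual model, not documented product behavior.

This addresses a core pain point for agents: reasoning from scratch on every similar task wastes tokens and gives inconsistent results.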

Text Only
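A traditional agent handling the same kind of task:

Run 1: user request → agent reasons from scratch → multi-step trial and error → done (slow, unstable)
Run 2: similar request → agent reasons from scratch → multi-step trial and error → done (repeated work)
Run 3: similar request → agent reasons from scratch → ...

With Skills:

Run 1: user request → agent reasons → done → 💾 saved as a Skill
Run 2: similar request → matches an existing Skill → executes directly ✅ (fast, stable)
Run 3: similar request → matches an existing Skill → executes directly ✅

Core structure of a Skill (conceptual implementation):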

Python
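"""
Claude Skills: conceptual implementation

Skill = a reusable agent workflow template
Each Skill contains: trigger conditions, execution steps, required tools, and a context template

⚠️ Note: Skills are a product-level feature from Anthropic. The code below illustrates
the core design idea and is not an official API. For real usage, see Anthropic's official documentation.
"""

from __future__ import annotations  # allows `Skill | None` annotations before Python 3.10

from dataclasses import dataclass, field
from datetime import datetime
import json

@dataclass  # generates __init__ and other methods automatically
class Skill:
    """Definition of an agent skill."""
    name: str                         # skill name
    description: str                  # skill description (used for matching)
    trigger_patterns: list[str]       # trigger conditions (keywords or intent descriptions)
    steps: list[dict]                 # execution step templates
    required_tools: list[str]         # tools the skill needs
    context_template: str = ""        # context template with placeholders (optional)
    created_at: str = field(default_factory=lambda: datetime.now().isoformat())
    success_count: int = 0            # number of successful runs
    avg_tokens: int = 0               # average token consumption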
@dataclass
class SkillExecutionResult:
    """Result of running a skill."""
    skill_name: str
    success: bool
    output: str
    tokens_used: int
    steps_completed: int

class SkillRegistry:
    """Central place to register and manage skills."""

    def __init__(self):
        self.skills: dict[str, Skill] = {}

    def register(self, skill: Skill):
        """Register a skill."""
        self.skills[skill.name] = skill
        print(f"💾 Skill registered: {skill.name}")

    def match(self, user_request: str) -> Skill | None:
        """Find the best-matching skill for a user request.

        Simplified implementation: keyword matching. A real product would
        more likely use embedding similarity.
        """
        best_match = None
        best_score = 0

        for skill in self.skills.values():
            # Score = number of trigger patterns that appear in the request
            score = sum(
                1 for pattern in skill.trigger_patterns
                if pattern.lower() in user_request.lower()
            )
            if score > best_score:
                best_score = score
                best_match = skill

        return best_match if best_score > 0 else None

    def save(self, filepath: str):
        """Persist all skills to a JSON file."""
        data = {}
        for name, skill in self.skills.items():
            data[name] = {
                "name": skill.name,
                "description": skill.description,
                "trigger_patterns": skill.trigger_patterns,
                "steps": skill.steps,
                "required_tools": skill.required_tools,
                "context_template": skill.context_template,
                "success_count": skill.success_count,
            }
        with open(filepath, "w", encoding="utf-8") as f:  # `with` closes the file automatically
            json.dump(data, f, ensure_ascii=False, indent=2)

class SkillableAgent:
    """An agent that supports Skills."""

    def __init__(self, name: str, skill_registry: SkillRegistry):
        self.name = name
        self.skills = skill_registry

    def run(self, user_request: str) -> str:
        """Handle a request: use a matching skill if one exists, otherwise reason from scratch."""

        # 1. Try to match an existing skill
        matched_skill = self.skills.match(user_request)

        if matched_skill:
            print(f"⚡ Matched skill: {matched_skill.name} ({matched_skill.success_count} successful runs so far)")
            return self._execute_skill(matched_skill, user_request)

        # 2. No match: reason from scratch
        print("🧠 No matching skill, reasoning from scratch...")
        result = self._reason_from_scratch(user_request)

        # 3. After success, suggest saving the workflow as a skill (could be automated)
        print("💡 Tip: this workflow can be saved as a skill for reuse")
        return result

    def _execute_skill(self, skill: Skill, user_request: str) -> str:
        """Execute the steps in the skill template."""
        output_parts = []

        for i, step in enumerate(skill.steps):  # enumerate yields index and element together
            action = step["action"]
            # Replace the placeholder in the template with the actual input
            if "{user_input}" in action:
                action = action.replace("{user_input}", user_request)

            print(f"  Step {i+1}: {step.get('description', action)}")
            # Real execution would happen here (simplified: just record the step description)
            output_parts.append(f"[{step.get('description', '')}] done")

        skill.success_count += 1
        return "\n".join(output_parts)

    def _reason_from_scratch(self, user_request: str) -> str:
        """Reason from scratch (the regular agent flow)."""
        return f"[Reasoned from scratch] {user_request}"

    def learn_skill(self, name: str, description: str, trigger_patterns: list[str],
                    steps: list[dict], required_tools: list[str] | None = None):
        """Learn a new skill from a successful run."""
        skill = Skill(
            name=name,
            description=description,
            trigger_patterns=trigger_patterns,
            steps=steps,
            required_tools=required_tools or [],
            context_template="",
        )
        self.skills.register(skill)

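# === Usage example ===

registry = SkillRegistry()

# Predefine a few skills
registry.register(Skill(
    name="Code review",
    description="Review code thoroughly for security, performance, and readability",
    trigger_patterns=["code review", "review", "check code", "code quality"],
    steps=[
        {"description": "Read the target code file", "action": "read_file({user_input})"},
        {"description": "Check for security vulnerabilities", "action": "analyze_security"},
        {"description": "Check for performance problems", "action": "analyze_performance"},
        {"description": "Check code style", "action": "check_style"},
        {"description": "Generate the review report", "action": "generate_report"},
    ],
    required_tools=["read_file", "code_analyzer", "report_generator"],
    context_template="",  # passed explicitly so the call works even if the field has no default
))

registry.register(Skill(
    name="Weekly report",
    description="Generate a weekly report from this week's work records",
    trigger_patterns=["weekly report", "weekly summary", "this week"],
    steps=[
        {"description": "Collect this week's Git commits", "action": "git_log_this_week"},
        {"description": "Collect this week's meeting notes", "action": "get_meeting_notes"},
        {"description": "Collect this week's task status", "action": "get_task_status"},
        {"description": "Summarize and generate the weekly report", "action": "generate_weekly_report"},
    ],
    required_tools=["git", "calendar", "task_tracker", "document_writer"],
    context_template="",
))

# Use the agent
agent = SkillableAgent("Claude", registry)

# A matching skill exists, so it runs directly
print("--- Request 1: matching skill exists ---")
result = agent.run("Please do a code review of this PR")
print(f"Result: {result}\n")

# No matching skill, so the agent reasons from scratch
print("--- Request 2: no matching skill ---")
result = agent.run("Help me analyze the outliers in this dataset")
print(f"Result: {result}\n")

# Learn a new skill from the successful run
agent.learn_skill(
    name="Outlier analysis",
    description="Analyze outliers in a dataset and generate a report",
    trigger_patterns=["outlier", "anomaly", "abnormal data"],
    steps=[
        {"description": "Load the dataset", "action": "load_dataset({user_input})"},
        {"description": "Descriptive statistics", "action": "descriptive_stats"},
        {"description": "Outlier detection (IQR + Z-score)", "action": "detect_outliers"},
        {"description": "Visualize the outlier distribution", "action": "plot_outliers"},
        {"description": "Generate the analysis report", "action": "generate_report"},
    ],
)

# Ask again: this time the newly learned skill matches
print("--- Request 3: matches the newly learned skill ---")
result = agent.run("This batch of data has a lot of outliers, please analyze it")
print(f"Result: {result}")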
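
The `match` method's docstring notes that a real product would use embedding similarity rather than keyword matching. The sketch below shows the shape of similarity-based matching using bag-of-words cosine similarity as a stand-in for real embeddings; the `match_skill` function, the sample descriptions, and the 0.2 threshold are assumptions chosen for illustration.

```python
import math
from collections import Counter


def vectorize(text: str) -> Counter:
    """Bag-of-words vector: word -> count. A stand-in for a learned embedding."""
    return Counter(text.lower().split())


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[term] * b[term] for term in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def match_skill(request: str, descriptions: dict, threshold: float = 0.2):
    """Return the name of the most similar skill, or None if nothing clears the threshold."""
    request_vec = vectorize(request)
    best_name, best_score = None, 0.0
    for name, description in descriptions.items():
        score = cosine(request_vec, vectorize(description))
        if score > best_score:
            best_name, best_score = name, score
    return best_name if best_score >= threshold else None


descriptions = {
    "code_review": "review code for security performance and readability",
    "weekly_report": "generate a weekly report from this week's work records",
}
print(match_skill("please review this code for security issues", descriptions))
print(match_skill("book a flight to Tokyo", descriptions))
```

The threshold matters as much as the similarity function: without it, every request would be forced onto whichever skill happens to share a stray word with it.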

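Skills vs. Traditional Prompts: Key Differences

| Dimension | Traditional prompt | Skills |
| --- | --- | --- |
| Execution | Reasons from scratch every time | Matches a template, then follows its steps |
| Stability | May differ on each run | Highly consistent |
| Speed | Needs multiple rounds of reasoning | Skips reasoning and executes directly |
| Token consumption | High (reasons from scratch each time) | Low (less reasoning overhead) |
| Evolution | Depends on prompt tuning | Learns automatically from successful runs |
| Product surface | Claude chat | Claude Skills panel |

📌 Why Skills matter as a product: Skills let an agent progress from "a beginner every time" to "more practiced with use". This resembles how people acquire skills, moving from deliberate thinking (System 2) to automatic execution (System 1). Claude's Skills panel lets users view, edit, and share skills, turning the agent's capabilities into a manageable asset.

📖 Cross-references: agent design-pattern basics (Prompt Chaining, Routing, and so on) → AI Agent开发实战/01-Agent基础与架构 §4; how an agent memory system persists skills → AI Agent开发实战/12-Agent记忆系统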


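4. OpenAI Operator and Computer Use

4.1 OpenAI Operator

Operator is an AI agent that OpenAI released in January 2025. It uses computer interfaces the way a person does, by looking at the screen and operating the mouse and keyboard.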

Python
import time


class OpenAIOperator:
    """
    Conceptual sketch of an Operator-style agent
    (illustrative; not OpenAI's actual implementation).

    Core capabilities:
    1. Perceive screen content visually
    2. Understand GUI elements
    3. Perform mouse and keyboard actions
    4. Carry out multi-step tasks

    GPT4Vision, ActionPredictionModel, BrowserAutomation, OSInterface,
    and TaskResult are placeholders that are not defined here.
    """

    def __init__(self):
        self.vision_model = GPT4Vision()
        self.action_model = ActionPredictionModel()
        self.browser = BrowserAutomation()
        self.os_interface = OSInterface()

    def execute_task(self, task_description: str) -> TaskResult:
        """
        Carry out a task that requires operating a computer interface.

        Example tasks:
        - "Search Amazon for Bluetooth headphones and sort by rating"
        - "Find last week's meeting invitation in Gmail and add it to the calendar"
        """

        max_steps = 50
        step = 0
        state_history = []

        while step < max_steps:
            # 1. Capture the current screen
            screenshot = self.os_interface.capture_screen()

            # 2. Visual understanding
            ui_elements = self.vision_model.analyze_ui(screenshot)

            # 3. Decide the next action
            action = self.action_model.predict(
                task=task_description,
                current_state=screenshot,
                ui_elements=ui_elements,
                history=state_history
            )

            # 4. Perform the action
            if action.type == "click":
                self.os_interface.click(action.target)
            elif action.type == "type":
                self.os_interface.type(action.text)
            elif action.type == "scroll":
                self.os_interface.scroll(action.direction, action.amount)
            elif action.type == "key":
                self.os_interface.press_key(action.key)
            elif action.type == "complete":
                return TaskResult.success(action.result)

            # 5. Record the state
            state_history.append({
                "step": step,
                "screenshot": screenshot,
                "action": action,
                "ui_elements": ui_elements
            })

            step += 1
            time.sleep(0.5)  # wait for the interface to respond

        # Step budget exhausted without a "complete" action
        return TaskResult.incomplete(state_history)

class ComputerUseAgent:
    """
    General-purpose computer-use agent.

    It can operate:
    - Browsers
    - Desktop applications
    - The file system
    - The command line
    """

    def __init__(self):
        self.tools = {
            "screenshot": ScreenshotTool(),
            "mouse": MouseControlTool(),
            "keyboard": KeyboardTool(),
            "shell": ShellTool(),
            "browser": BrowserTool()
        }
        # _observe() relies on a vision model; VisionModel is a placeholder
        # name, like the tool classes above.
        self.vision_model = VisionModel()

    def run(self, instruction: str) -> ExecutionTrace:
        """Execute an instruction that requires operating the computer."""

        trace = ExecutionTrace()

        while not self._is_complete():
            # Perceive the environment
            observation = self._observe()
            trace.add_observation(observation)

            # Reason about the next step
            thought = self._reason(instruction, trace)
            trace.add_thought(thought)

            # Select an action
            action = self._select_action(thought)
            trace.add_action(action)

            # Execute it
            result = self._execute(action)
            trace.add_result(result)

        return trace

    def _observe(self) -> Observation:
        """Observe the current state of the computer."""
        screenshot = self.tools["screenshot"].capture()

        # Analyze the screenshot with the vision model
        analysis = self.vision_model.describe(
            screenshot,
            # Adjacent string literals are concatenated; a plain quoted
            # string cannot span two lines.
            prompt=(
                "Describe the current computer state, including open applications, "
                "visible UI elements, and any relevant text content."
            )
        )

        return Observation(
            screenshot=screenshot,
            description=analysis.description,
            ui_elements=analysis.elements
        )
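
The observe, decide, act loop above can be exercised without a real desktop. The sketch below is a minimal, hypothetical version: `FakeDesktop`, `Action`, `run_agent`, and `scripted_policy` are illustrative names that do not appear in the chapter, and the "policy" is a fixed script rather than a model. It shows two details the loop depends on: a dispatch table that rejects unknown action types, and a step budget that guarantees termination.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class Action:
    type: str
    args: dict = field(default_factory=dict)


class FakeDesktop:
    """Stand-in for a real OS interface; it only records the calls it receives."""

    def __init__(self) -> None:
        self.log: List[str] = []

    def click(self, target: str) -> None:
        self.log.append(f"click:{target}")

    def type_text(self, text: str) -> None:
        self.log.append(f"type:{text}")


def run_agent(policy: Callable[[int], Action], desktop: FakeDesktop,
              max_steps: int = 10) -> str:
    # Dispatch table: action type -> handler on the desktop
    handlers: Dict[str, Callable[..., None]] = {
        "click": desktop.click,
        "type": desktop.type_text,
    }
    for step in range(max_steps):
        action = policy(step)
        if action.type == "complete":
            return "success"
        handler = handlers.get(action.type)
        if handler is None:
            return f"failed: unknown action {action.type!r}"
        handler(**action.args)
    return "incomplete"  # step budget exhausted


def scripted_policy(step: int) -> Action:
    """A fixed script standing in for the action-prediction model."""
    script = [
        Action("click", {"target": "search box"}),
        Action("type", {"text": "bluetooth earbuds"}),
        Action("complete"),
    ]
    return script[step]


desktop = FakeDesktop()
status = run_agent(scripted_policy, desktop)
print(status, desktop.log)
```

Swapping `scripted_policy` for a model call and `FakeDesktop` for a real input driver keeps the control flow unchanged, which makes the loop easy to unit-test.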

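4.2 Computer Use: Technical Implementation

Python
from typing import Optional

class ComputerUseImplementation:
    """Technical implementation details of a Computer Use agent."""

    @staticmethod
    def visual_grounding():
        """
        Visual grounding: map a natural-language instruction to screen coordinates.
        """

        class VisualGroundingModel:
            """
            UI element localization based on a vision-language model.
            """

            def __init__(self):
                self.model = CLIP()  # or a similar vision-language model
                # locate_element() also needs these two detectors; the class
                # names are placeholders, like CLIP() above.
                self.ocr = OCRDetector()
                self.icon_detector = IconDetector()

            def locate_element(self, screenshot: Image, description: str) -> Optional[BoundingBox]:
                """
                Locate the described element in a screenshot.

                Args:
                    screenshot: The screen capture.
                    description: Element description, e.g. "search button".

                Returns:
                    The element's bounding box (x1, y1, x2, y2), or None if
                    no candidate region was detected.
                """
                # Step 1: detect clickable text with OCR
                text_regions = self.ocr.detect(screenshot)

                # Step 2: detect icons with an icon-detection model
                icon_regions = self.icon_detector.detect(screenshot)

                # Step 3: rank all candidates by vision-language similarity
                all_regions = text_regions + icon_regions

                best_match = None
                best_score = float("-inf")  # similarity scores may be negative

                for region in all_regions:
                    # Crop the candidate region
                    crop = screenshot.crop(region.bbox)

                    # Score the crop against the text description
                    score = self.model.similarity(crop, description)

                    if score > best_score:
                        best_score = score
                        best_match = region

                return best_match.bbox if best_match else None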

    @staticmethod
    def action_space_design():
        """
        Action space design.
        """

        # Actions the agent is allowed to execute
        # (List[str] below requires `from typing import List`)
        ACTION_SPACE = {
            # Mouse actions
            "mouse_move": {
                "description": "Move the mouse to the given coordinates",
                "parameters": {"x": int, "y": int}
            },
            "mouse_click": {
                "description": "Click at the given position",
                "parameters": {"x": int, "y": int, "button": ["left", "right"]}
            },
            "mouse_drag": {
                "description": "Drag from one point to another",
                "parameters": {"start": (int, int), "end": (int, int)}
            },
            "mouse_scroll": {
                "description": "Scroll",
                "parameters": {"direction": ["up", "down"], "amount": int}
            },

            # Keyboard actions
            "key_press": {
                "description": "Press a key",
                "parameters": {"key": str}
            },
            "type_text": {
                "description": "Type text",
                "parameters": {"text": str}
            },
            "hotkey": {
                "description": "Press a key combination",
                "parameters": {"keys": List[str]}
            },

            # System actions
            "screenshot": {
                "description": "Capture the screen",
                "parameters": {}
            },
            "wait": {
                "description": "Wait for the UI to respond",
                "parameters": {"seconds": float}
            },

            # Task control
            "complete": {
                "description": "Mark the task as complete",
                "parameters": {"result": str}
            },
            "fail": {
                "description": "Mark the task as failed",
                "parameters": {"reason": str}
            }
        }

        return ACTION_SPACE

    @staticmethod
    def safety_mechanisms():
        """
        Safety mechanisms for Computer Use.
        """

        class SafetyGuardrails:
            def __init__(self):
                # Actions that are always rejected
                self.restricted_actions = [
                    "delete_system_files",
                    "modify_system_settings",
                    "access_sensitive_data"
                ]

                # Actions that need explicit user confirmation
                self.confirmation_required = [
                    "make_purchase",
                    "send_email",
                    "delete_files",
                    "modify_permissions"
                ]

            def check_action(self, action: Action) -> SafetyResult:
                """Check whether an action is safe to run."""

                # Reject anything on the deny list
                if action.type in self.restricted_actions:
                    return SafetyResult.reject("Action not allowed")

                # Ask for confirmation where required
                if action.type in self.confirmation_required:
                    return SafetyResult.require_confirmation(
                        f"This action will {action.description}. Proceed?"
                    )

                # Look for anomalous patterns
                if self._detect_suspicious_pattern(action):
                    return SafetyResult.require_review("Suspicious pattern detected")

                return SafetyResult.allow()

            def _detect_suspicious_pattern(self, action: Action) -> bool:
                """Detect suspicious action patterns."""
                # Placeholder: real checks go here. Return an explicit bool
                # so the declared return type holds.
                return False
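
`_detect_suspicious_pattern` is left as a stub above. One simple heuristic, offered here only as an assumption about what such a check might look like, is to flag an agent that keeps repeating the same action on the same target, which usually means it is stuck in a loop. `RepetitionDetector`, its `window` and `max_repeats` parameters are hypothetical names for this sketch.

```python
from collections import deque
from typing import Deque, Tuple


class RepetitionDetector:
    """Flags an action that recurs too often within a sliding window."""

    def __init__(self, window: int = 5, max_repeats: int = 3) -> None:
        # Only the most recent `window` actions are remembered
        self.recent: Deque[Tuple[str, str]] = deque(maxlen=window)
        self.max_repeats = max_repeats

    def is_suspicious(self, action_type: str, target: str) -> bool:
        key = (action_type, target)
        self.recent.append(key)
        # Suspicious once the same (type, target) pair fills enough of the window
        return self.recent.count(key) >= self.max_repeats


detector = RepetitionDetector()
flags = [detector.is_suspicious("mouse_click", "(10, 20)") for _ in range(3)]
print(flags)
```

A production guardrail would combine several such signals (rate limits, unusual targets, sensitive paths); repetition is only the easiest one to illustrate.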

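4.3 Browser Automation

Python
from typing import Dict, List

class BrowserAutomation:
    """Browser automation implementation."""

    def __init__(self):
        self.driver = PlaywrightDriver()
        self.page = None

    async def navigate(self, url: str):  # `async def` defines a coroutine function
        """Navigate to the given URL."""
        self.page = await self.driver.new_page()  # `await` suspends until the call completes
        await self.page.goto(url)

    async def search(self, query: str, engine: str = "google"):
        """Run a query on a search engine."""
        if engine != "google":
            # Only Google is wired up; fail early rather than touching a page
            # that was never opened.
            raise ValueError(f"Unsupported search engine: {engine}")

        # These selectors depend on Google's page markup and may change
        await self.navigate("https://www.google.com")
        await self.page.fill('input[name="q"]', query)
        await self.page.press('input[name="q"]', "Enter")

        # Wait for the results to load
        await self.page.wait_for_selector("#search")

    async def fill_form(self, form_data: Dict[str, str]):
        """Fill in a form."""
        for field, value in form_data.items():
            # Try several selector strategies in order
            selectors = [
                f'input[name="{field}"]',
                f'input[id="{field}"]',
                f'input[placeholder*="{field}"]',
                f'textarea[name="{field}"]'
            ]

            for selector in selectors:
                try:
                    # Short timeout: by default fill() waits 30 s for a match
                    await self.page.fill(selector, value, timeout=1000)
                    break
                except Exception:  # a bare `except` would also swallow task cancellation
                    continue

    async def extract_data(self, extraction_rules: List[Rule]) -> List[Dict]:
        """Extract data from the page according to the given rules."""
        results = []

        for rule in extraction_rules:
            elements = await self.page.query_selector_all(rule.selector)

            for element in elements:
                item = {}
                for field, field_rule in rule.fields.items():
                    value = await element.eval_on_selector(
                        field_rule.selector,
                        "el => el.textContent"
                    )
                    item[field] = value.strip() if value else None  # strip surrounding whitespace

                results.append(item)

        return results

    async def monitor_changes(self, selector: str, callback):
        """Watch a page element for DOM changes and report them to `callback`."""
        # Bridge browser -> Python so the page can invoke `callback`.
        # Call this once per page: a name can only be exposed once.
        await self.page.expose_function("__onPageChange", callback)

        # Pass the selector as an argument instead of interpolating it into
        # the script, so quotes in the selector cannot break the JavaScript.
        await self.page.evaluate(
            """(selector) => {
                new MutationObserver((mutations) => {
                    window.__pageChanges = window.__pageChanges || [];
                    window.__pageChanges.push(mutations);
                    window.__onPageChange(mutations.length);
                }).observe(
                    document.querySelector(selector),
                    { childList: true, subtree: true }
                );
            }""",
            selector
        )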

5. AI Agent Architecture Design

5.1 Layered Agent Architecture

Text Only
┌───────────────────────────────────────────────────────────────────────────┐
│                       Layered AI Agent Architecture                       │
├───────────────────────────────────────────────────────────────────────────┤
│                                                                           │
│  ┌────────────────────────────────────────────────────────────────────┐   │
│  │                        Application Layer                           │   │
│  │  • Task definition     • User interaction    • Result display      │   │
│  └────────────────────────────────────────────────────────────────────┘   │
│                                    ↓                                      │
│  ┌────────────────────────────────────────────────────────────────────┐   │
│  │                        Planning Layer                              │   │
│  │  • Goal decomposition  • Strategy selection  • Dynamic replanning  │   │
│  └────────────────────────────────────────────────────────────────────┘   │
│                                    ↓                                      │
│  ┌────────────────────────────────────────────────────────────────────┐   │
│  │                        Reasoning Layer                             │   │
│  │  • Logical reasoning   • Causal analysis     • Hypothesis testing  │   │
│  └────────────────────────────────────────────────────────────────────┘   │
│                                    ↓                                      │
│  ┌────────────────────────────────────────────────────────────────────┐   │
│  │                        Execution Layer                             │   │
│  │  • Tool calls          • API requests        • Code execution      │   │
│  └────────────────────────────────────────────────────────────────────┘   │
│                                    ↓                                      │
│  ┌────────────────────────────────────────────────────────────────────┐   │
│  │                        Perception Layer                            │   │
│  │  • Text understanding  • Image recognition   • Speech processing   │   │
│  └────────────────────────────────────────────────────────────────────┘   │
│                                    ↓                                      │
│  ┌────────────────────────────────────────────────────────────────────┐   │
│  │                        Memory Layer                                │   │
│  │  • Working memory      • Long-term memory    • Knowledge retrieval │   │
│  └────────────────────────────────────────────────────────────────────┘   │
│                                                                           │
└───────────────────────────────────────────────────────────────────────────┘

5.2 Core Component Implementation

Python
from typing import Dict, Union

class AgentCoreArchitecture:
    """Core agent architecture."""

    class PerceptionModule:
        """Perception module."""

        def __init__(self):
            self.text_processor = TextProcessor()
            self.vision_processor = VisionProcessor()
            self.audio_processor = AudioProcessor()

        def process(self, input_data: Union[str, Image, Audio]) -> Perception:
            """Single entry point for all perception inputs."""
            # _process_image and _process_audio are not shown in this listing
            if isinstance(input_data, str):  # dispatch on the input type
                return self._process_text(input_data)
            elif isinstance(input_data, Image):
                return self._process_image(input_data)
            elif isinstance(input_data, Audio):
                return self._process_audio(input_data)
            else:
                raise ValueError(f"Unsupported input type: {type(input_data)}")

        def _process_text(self, text: str) -> TextPerception:
            """Perception pipeline for text."""
            # Intent classification
            intent = self.text_processor.classify_intent(text)

            # Entity extraction
            entities = self.text_processor.extract_entities(text)

            # Sentiment analysis
            sentiment = self.text_processor.analyze_sentiment(text)

            return TextPerception(
                raw_text=text,
                intent=intent,
                entities=entities,
                sentiment=sentiment
            )

    class MemoryModule:
        """Memory module."""

        def __init__(self, vector_store: VectorStore):
            self.working_memory = WorkingMemory(capacity=7)  # Miller's law: roughly 7 ± 2 items
            self.episodic_memory = EpisodicMemory(vector_store)
            self.semantic_memory = SemanticMemory(vector_store)
            self.procedural_memory = ProceduralMemory()

        def store(self, experience: Experience):
            """Store an experience."""
            # Working memory (short-term)
            self.working_memory.add(experience)

            # Episodic memory (long-term), only for significant experiences
            if experience.is_significant:
                self.episodic_memory.store(experience)

            # Distill knowledge into semantic memory
            knowledge = self._extract_knowledge(experience)
            self.semantic_memory.store(knowledge)

            # Update procedural memory (skills)
            if experience.type == "skill_execution":
                self.procedural_memory.update(experience)

        def retrieve(self, query: str, context: Context) -> RetrievedInfo:
            """Retrieve information relevant to a query."""
            # Query every memory store
            working_results = self.working_memory.search(query)
            episodic_results = self.episodic_memory.search(query, top_k=5)
            semantic_results = self.semantic_memory.search(query, top_k=5)
            procedural_results = self.procedural_memory.match(context)

            # Fuse the result lists into one ranking
            fused = self._fusion_rank(
                working_results,
                episodic_results,
                semantic_results,
                procedural_results
            )

            return fused

    class PlanningModule:
        """Planning module."""

        def __init__(self, llm):
            self.llm = llm
            self.plan_library = PlanLibrary()

        def create_plan(self, goal: Goal, context: Context) -> Plan:
            """Create an execution plan."""

            # Try to reuse a proven plan from the plan library first
            similar_plan = self.plan_library.find_similar(goal)
            if similar_plan and similar_plan.success_rate > 0.8:
                return self._adapt_plan(similar_plan, context)

            # Otherwise generate a new plan
            return self._generate_plan(goal, context)

        def _generate_plan(self, goal: Goal, context: Context) -> Plan:
            """Generate a plan with the LLM."""

            prompt = f"""Create a step-by-step plan to achieve the following goal:

Goal: {goal.description}

Context:
{context.to_prompt()}

Available tools: {context.available_tools}

Generate a detailed plan with:
1. Sequential steps
2. Each step's expected outcome
3. Dependencies between steps
4. Potential failure points and alternatives

Output as structured JSON."""

            plan_json = self.llm.generate(prompt, format="json")
            return Plan.from_json(plan_json)

        def replan(self, current_plan: Plan, failure: Failure) -> Plan:
            """Replan after a failure."""

            prompt = f"""The current plan failed. Create a revised plan.

Original Plan: {current_plan.to_json()}
Failure: {failure.description}
Current State: {failure.state}

Provide a new plan that addresses the failure."""

            new_plan_json = self.llm.generate(prompt, format="json")
            return Plan.from_json(new_plan_json)

    class ToolModule:
        """Tool module."""

        def __init__(self, llm):
            self.llm = llm  # _select_tool() needs an LLM to choose a tool
            self.tools: Dict[str, Tool] = {}
            self.tool_descriptions = []

        def register(self, tool: Tool):
            """Register a tool."""
            self.tools[tool.name] = tool
            self.tool_descriptions.append(tool.get_description())

        def select_and_execute(self, intent: Intent, context: Context) -> ToolResult:
            """Select a suitable tool and execute it."""

            # Tool selection
            tool_name = self._select_tool(intent, context)
            tool = self.tools.get(tool_name)
            if tool is None:
                # The LLM may name a tool that was never registered
                return ToolResult.failure(f"Unknown tool: {tool_name}")

            # Parameter extraction
            parameters = self._extract_parameters(intent, tool)

            # Execution
            try:
                result = tool.execute(**parameters)  # unpack the dict into keyword arguments
                return ToolResult.success(result)
            except Exception as e:
                return ToolResult.failure(str(e))

        def _select_tool(self, intent: Intent, context: Context) -> str:
            """Select a tool based on the intent."""

            prompt = f"""Given the user intent and available tools, select the most appropriate tool.

Intent: {intent.description}
Available Tools:
{self._format_tool_descriptions()}

Select the best tool and explain why."""

            selection = self.llm.generate(prompt)
            return self._parse_tool_selection(selection)
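
`MemoryModule.retrieve` calls `_fusion_rank` without defining it. The chapter does not say which fusion method is intended; one common choice for merging ranked lists from different retrievers is reciprocal rank fusion (RRF), sketched below under that assumption. The function name, the conventional constant `k = 60`, and the sample note IDs are illustrative.

```python
from collections import defaultdict
from typing import Dict, List, Sequence


def reciprocal_rank_fusion(rankings: Sequence[Sequence[str]], k: int = 60) -> List[str]:
    """Merge several ranked lists of item IDs into a single ranking.

    Each list contributes 1 / (k + rank) for every item it contains,
    so items ranked highly by several retrievers rise to the top.
    """
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, item in enumerate(ranking, start=1):
            scores[item] += 1.0 / (k + rank)
    # Highest score first; ties are broken by item ID for determinism
    return sorted(scores, key=lambda item: (-scores[item], item))


working = ["note-3"]
episodic = ["note-1", "note-3", "note-7"]
semantic = ["note-3", "note-1"]

fused = reciprocal_rank_fusion([working, episodic, semantic])
print(fused)
```

RRF only needs ranks, not comparable scores, which suits a setup where working, episodic, and semantic memory each score results on a different scale.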

5.3 Agent Communication Protocol

Python
import asyncio
import time
from typing import Dict, List, Optional

class AgentCommunicationProtocol:
    """Communication protocol for a multi-agent system."""

    class Message:
        """Message format exchanged between agents."""

        def __init__(
            self,
            sender: str,
            receiver: str,
            message_type: str,
            content: Dict,
            conversation_id: str,
            timestamp: Optional[float] = None
        ):
            self.sender = sender
            self.receiver = receiver
            self.message_type = message_type  # request, response, inform, delegate
            self.content = content
            self.conversation_id = conversation_id
            # `is None` keeps an explicit timestamp of 0.0
            self.timestamp = time.time() if timestamp is None else timestamp

    class CommunicationBus:
        """Message bus connecting the agents."""

        def __init__(self):
            self.channels: Dict[str, asyncio.Queue] = {}
            self.subscribers: Dict[str, List[str]] = {}

        def register_agent(self, agent_id: str):
            """Register an agent on the bus."""
            self.channels[agent_id] = asyncio.Queue()

        async def send(self, message: "Message"):
            """Send a message to its receiver's queue."""
            if message.receiver not in self.channels:
                raise ValueError(f"Unknown receiver: {message.receiver}")

            await self.channels[message.receiver].put(message)

        async def receive(self, agent_id: str, timeout: Optional[float] = None) -> Optional["Message"]:
            """Receive the next message, or None if the timeout expires."""
            if agent_id not in self.channels:
                raise ValueError(f"Unknown agent: {agent_id}")

            try:
                return await asyncio.wait_for(
                    self.channels[agent_id].get(),
                    timeout=timeout
                )
            except asyncio.TimeoutError:
                return None

        def subscribe(self, agent_id: str, topic: str):
            """Subscribe an agent to a topic."""
            if topic not in self.subscribers:
                self.subscribers[topic] = []
            self.subscribers[topic].append(agent_id)

        async def publish(self, topic: str, message: "Message"):
            """Publish a message to every subscriber of a topic."""
            if topic in self.subscribers:
                for subscriber in self.subscribers[topic]:
                    # Message defines no copy(); build one envelope per subscriber
                    routed = type(message)(
                        sender=message.sender,
                        receiver=subscriber,
                        message_type=message.message_type,
                        content=message.content,
                        conversation_id=message.conversation_id,
                        timestamp=message.timestamp
                    )
                    await self.send(routed)
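
To see the publish/subscribe path run end to end, here is a stripped-down, self-contained sketch. It is not the chapter's `CommunicationBus`: `MiniBus` and the trimmed `Message` dataclass are hypothetical, and `dataclasses.replace` is used as one convenient way to produce a per-subscriber copy with a different `receiver`.

```python
import asyncio
from dataclasses import dataclass, replace
from typing import Dict, List


@dataclass(frozen=True)
class Message:
    sender: str
    receiver: str
    message_type: str
    content: dict


class MiniBus:
    """One asyncio.Queue per agent plus a topic -> subscribers map."""

    def __init__(self) -> None:
        self.channels: Dict[str, asyncio.Queue] = {}
        self.subscribers: Dict[str, List[str]] = {}

    def register_agent(self, agent_id: str) -> None:
        self.channels[agent_id] = asyncio.Queue()

    def subscribe(self, agent_id: str, topic: str) -> None:
        self.subscribers.setdefault(topic, []).append(agent_id)

    async def publish(self, topic: str, message: Message) -> None:
        for subscriber in self.subscribers.get(topic, []):
            # Each subscriber gets its own copy addressed to it
            await self.channels[subscriber].put(replace(message, receiver=subscriber))


async def demo() -> List[Message]:
    bus = MiniBus()
    for agent_id in ("analyst", "writer"):
        bus.register_agent(agent_id)
        bus.subscribe(agent_id, "findings")

    await bus.publish("findings", Message("researcher", "*", "inform", {"papers": 3}))
    return [await bus.channels[agent_id].get() for agent_id in ("analyst", "writer")]


delivered = asyncio.run(demo())
print([m.receiver for m in delivered])
```

The queues are created inside the running event loop, which avoids loop-binding issues on older Python versions.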

6. Multi-Agent Collaboration Systems

6.1 Collaboration Patterns

Text Only
┌─────────────────────────────────────────────────────────────────┐
│               Multi-Agent Collaboration Patterns                │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. Hierarchical collaboration                                  │
│                                                                 │
│         ┌───────────┐                                           │
│         │  Manager  │                                           │
│         └─────┬─────┘                                           │
│       ┌───────┼───────┐                                         │
│       ↓       ↓       ↓                                         │
│    ┌────┐  ┌────┐  ┌────┐                                       │
│    │ A1 │  │ A2 │  │ A3 │                                       │
│    └────┘  └────┘  └────┘                                       │
│                                                                 │
│  2. Peer-to-peer collaboration                                  │
│                                                                 │
│         ┌─────┐                                                 │
│         │  A1 │←────────→┌─────┐                                │
│         └──┬──┘          │  A2 │                                │
│            ↓             └──┬──┘                                │
│         ┌─────┐             ↓                                   │
│         │  A3 │←────────→┌─────┐                                │
│         └─────┘          │  A4 │                                │
│                          └─────┘                                │
│                                                                 │
│  3. Market-based collaboration                                  │
│                                                                 │
│     ┌─────────────────────────────────────┐                    │
│     │         Task auction market          │                    │
│     │  Task X ──→ [Bid: A1:$5, A2:$8] ──→ A1 wins              │
│     │  Task Y ──→ [Bid: A2:$3, A3:$7] ──→ A2 wins              │
│     └─────────────────────────────────────┘                    │
│                                                                 │
│  4. Pipeline collaboration                                      │
│                                                                 │
│    Input → [A1:extract] → [A2:analyze] → [A3:generate] → Output │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
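
Of the four patterns, the pipeline is the simplest to express in code: each agent consumes the previous agent's output. The sketch below is illustrative only. The three stage functions stand in for agents A1 to A3 and do trivial string work; `run_pipeline` and `Stage` are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, Sequence

# A stage is any async function that maps one payload to the next
Stage = Callable[[str], Awaitable[str]]


async def extract(text: str) -> str:
    """A1: normalize the raw input."""
    return text.strip().lower()


async def analyze(text: str) -> str:
    """A2: derive a (toy) finding from the extracted text."""
    return f"{len(text.split())} words"


async def generate(finding: str) -> str:
    """A3: turn the finding into the final output."""
    return f"Report: {finding}"


async def run_pipeline(stages: Sequence[Stage], payload: str) -> str:
    # Stages run strictly in order; each output feeds the next stage
    for stage in stages:
        payload = await stage(payload)
    return payload


report = asyncio.run(run_pipeline([extract, analyze, generate], "  Agents Use Tools  "))
print(report)
```

Because every stage has the same signature, stages can be reordered, inserted, or replaced without touching `run_pipeline`; the trade-off is that one slow stage stalls everything downstream.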

6.2 Multi-Agent System Implementation

Python
import asyncio
from typing import Dict, List

class MultiAgentSystem:
    """Multi-agent collaboration system."""

    def __init__(self):
        self.agents: Dict[str, Agent] = {}
        self.coordinator = Coordinator()
        self.communication_bus = CommunicationBus()

    def register_agent(self, agent: Agent):
        """Register an agent."""
        self.agents[agent.id] = agent
        self.communication_bus.register_agent(agent.id)
        agent.set_communication_bus(self.communication_bus)

    async def execute_collaborative_task(
        self,
        task: ComplexTask,
        collaboration_mode: str = "hierarchical"
    ) -> CollaborativeResult:
        """Execute a collaborative task."""
        # _peer_execution, _execute_assignments and _aggregate_results
        # are not shown in this listing.

        if collaboration_mode == "hierarchical":
            return await self._hierarchical_execution(task)
        elif collaboration_mode == "peer":
            return await self._peer_execution(task)
        elif collaboration_mode == "market":
            return await self._market_execution(task)
        else:
            raise ValueError(f"Unknown collaboration mode: {collaboration_mode}")

    async def _hierarchical_execution(self, task: ComplexTask) -> CollaborativeResult:
        """Hierarchical collaborative execution."""

        # 1. Select the manager agent
        manager = self.coordinator.select_manager(task)

        # 2. The manager decomposes the task
        subtasks = manager.decompose_task(task)

        # 3. Assign subtasks; the result maps subtask.id -> agent_id
        assignments = self.coordinator.assign_subtasks(subtasks, self.agents)

        # 4. Execute in parallel. The keys of `assignments` are IDs, so look
        #    the subtask objects up again. Dependencies are ignored here.
        subtasks_by_id = {subtask.id: subtask for subtask in subtasks}
        results = await asyncio.gather(*[  # run the coroutines concurrently
            self.agents[agent_id].execute(subtasks_by_id[subtask_id])
            for subtask_id, agent_id in assignments.items()
        ])

        # 5. The manager integrates the results
        final_result = manager.integrate_results(results)

        return CollaborativeResult(
            output=final_result,
            execution_trace=results,
            coordination_log=self.coordinator.get_log()
        )

    async def _market_execution(self, task: ComplexTask) -> CollaborativeResult:
        """Market-based collaborative execution (auction)."""

        # 1. Decompose the task
        subtasks = self.coordinator.decompose_task(task)

        # 2. Allocate by auction
        assignments = {}
        all_bids = {}  # subtask.id -> list of (agent_id, cost)
        for subtask in subtasks:
            # Collect bids
            bids = []
            for agent_id, agent in self.agents.items():
                if agent.can_handle(subtask):
                    cost = agent.estimate_cost(subtask)
                    bids.append((agent_id, cost))
            all_bids[subtask.id] = bids

            # The lowest bid wins
            if bids:
                winner = min(bids, key=lambda x: x[1])  # compare bids by cost
                assignments[subtask.id] = winner[0]

        # 3. Execute
        results = await self._execute_assignments(assignments)

        return CollaborativeResult(
            output=self._aggregate_results(results),
            assignments=assignments,
            bids=all_bids
        )

class Coordinator:
    """Coordinator: handles task assignment and conflict resolution."""

    def __init__(self, llm=None):
        self.llm = llm  # required by decompose_task()
        self.task_queue = PriorityQueue()
        self.agent_status = {}

    def decompose_task(self, task: ComplexTask) -> List[SubTask]:
        """Break a complex task into subtasks."""

        prompt = f"""Decompose the following complex task into manageable subtasks:

Task: {task.description}
Requirements: {task.requirements}

Provide a list of subtasks with:
1. Subtask description
2. Dependencies on other subtasks
3. Estimated complexity
4. Required capabilities

Output as JSON."""

        decomposition = self.llm.generate(prompt, format="json")
        return [SubTask.from_dict(s) for s in decomposition["subtasks"]]

    def assign_subtasks(
        self,
        subtasks: List[SubTask],
        agents: Dict[str, Agent]
    ) -> Dict[str, str]:
        """Assign each subtask to a suitable agent (subtask.id -> agent_id)."""

        assignments = {}

        for subtask in subtasks:
            # Score every available agent against this subtask
            scores = {}
            for agent_id, agent in agents.items():
                if agent.is_available():
                    score = self._compute_match_score(agent, subtask)
                    scores[agent_id] = score

            # Pick the best match; subtasks with no available agent stay unassigned
            if scores:
                best_agent = max(scores, key=scores.get)
                assignments[subtask.id] = best_agent

        return assignments

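    def _compute_match_score(self, agent: Agent, subtask: SubTask) -> float:
        """Compute how well an agent matches a subtask (0.0 to 1.0)."""

        score = 0.0

        # Capability match (weight 0.4)
        required = set(subtask.required_capabilities)
        if required:
            capability_match = len(set(agent.capabilities) & required) / len(required)
        else:
            capability_match = 1.0  # nothing required, so every agent qualifies
        score += capability_match * 0.4

        # Past performance (weight 0.3)
        if subtask.type in agent.performance_history:
            score += agent.performance_history[subtask.type] * 0.3

        # Current load (weight 0.2): idle agents score higher
        load_factor = 1.0 - (agent.current_load / agent.max_capacity)
        score += load_factor * 0.2

        # Communication overhead (weight 0.1)
        if subtask.dependencies:
            # Meant to favor agents that already collaborate on the
            # dependencies; as written, the bonus is the same for every agent.
            score += 0.1

        return score

    def resolve_conflict(self, conflict: Conflict) -> Resolution:
        """Resolve a conflict between agents."""

        if conflict.type == "resource_contention":
            # Resource contention: allocate by priority or time slice
            return self._resolve_resource_conflict(conflict)

        elif conflict.type == "goal_conflict":
            # Goal conflict: negotiate or escalate to a supervisor
            return self._resolve_goal_conflict(conflict)

        elif conflict.type == "communication_failure":
            # Communication failure: retry or use an alternative route
            return self._resolve_communication_failure(conflict)

        else:
            raise ValueError(f"Unknown conflict type: {conflict.type}")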
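
The decomposition prompt asks for dependencies between subtasks, so a scheduler has to respect them when running subtasks concurrently. One way to do that, sketched here as an assumption rather than the chapter's method, is to group subtasks into "waves" with the standard library's `graphlib.TopologicalSorter` (Python 3.9+): everything in a wave can run in parallel, and a wave starts only after the previous one finishes. The subtask names are made up for the example.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def execution_waves(dependencies: Dict[str, Set[str]]) -> List[List[str]]:
    """Group subtasks into waves; `dependencies` maps a subtask to its prerequisites."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies

    waves: List[List[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # all subtasks whose prerequisites are done
        waves.append(ready)
        sorter.done(*ready)                 # unlocks their dependents
    return waves


waves = execution_waves({
    "search": set(),
    "analyze": {"search"},
    "draft": {"analyze"},
    "fact_check": {"analyze"},
    "publish": {"draft", "fact_check"},
})
print(waves)
```

Each wave could then be handed to `asyncio.gather`, so independent subtasks still run concurrently while dependent ones wait their turn.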

6.3 Example Agent Team

Python
import asyncio
from typing import List

class ResearchTeam:
    """Research team: an example of multi-agent collaboration."""

    def __init__(self):
        self.system = MultiAgentSystem()

        # Create the specialist agents
        self.planner = PlannerAgent("planner")
        self.researcher = ResearchAgent("researcher")
        self.analyst = AnalystAgent("analyst")
        self.writer = WriterAgent("writer")
        self.reviewer = ReviewerAgent("reviewer")

        # Register them with the system
        for agent in [self.planner, self.researcher, self.analyst, self.writer, self.reviewer]:
            self.system.register_agent(agent)

    async def conduct_research(self, topic: str) -> ResearchReport:
        """Run a collaborative research project."""

        # Phase 1: planning
        plan = await self.planner.create_research_plan(topic)

        # Phase 2: information gathering (in parallel)
        search_tasks = [
            self.researcher.search_academic(topic),
            self.researcher.search_news(topic),
            self.researcher.search_web(topic)
        ]
        search_results = await asyncio.gather(*search_tasks)

        # gather() returns one list per search; flatten them for the analyst
        sources = [item for results in search_results for item in results]

        # Phase 3: analysis
        analysis = await self.analyst.analyze(sources)

        # Phase 4: writing
        draft = await self.writer.write_report(analysis)

        # Phase 5: review and revision
        review = await self.reviewer.review(draft)
        final_report = await self.writer.revise(draft, review)

        return final_report

class PlannerAgent(Agent):
    """Planning agent."""

    async def create_research_plan(self, topic: str) -> ResearchPlan:
        """Create a research plan."""

        # Determine the research scope
        scope = self._determine_scope(topic)

        # Identify the key questions
        key_questions = self._identify_questions(topic, scope)

        # Plan one research step per question
        steps = []
        for question in key_questions:
            steps.append(ResearchStep(
                question=question,
                sources=self._identify_sources(question),
                methods=self._select_methods(question)
            ))

        return ResearchPlan(
            topic=topic,
            scope=scope,
            steps=steps,
            timeline=self._estimate_timeline(steps)
        )

class ResearchAgent(Agent):
    """Research agent."""

    async def search_academic(self, topic: str) -> List[Paper]:
        """Search academic papers."""
        # For example via the Google Scholar API or the arXiv API
        raise NotImplementedError  # stub: a silent `pass` would return None

    async def search_news(self, topic: str) -> List[NewsArticle]:
        """Search news articles."""
        # For example via News API
        raise NotImplementedError

    async def search_web(self, topic: str) -> List[WebPage]:
        """Search web pages."""
        # For example via a search-engine API
        raise NotImplementedError

class AnalystAgent(Agent):
    """Analysis agent."""

    async def analyze(self, sources: List[Source]) -> Analysis:
        """Analyze the collected information."""

        # Integrate the information
        integrated = self._integrate_information(sources)

        # Identify patterns
        patterns = self._identify_patterns(integrated)

        # Analyze trends
        trends = self._analyze_trends(integrated)

        # Identify gaps
        gaps = self._identify_gaps(integrated)

        return Analysis(
            summary=integrated,
            patterns=patterns,
            trends=trends,
            gaps=gaps
        )
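
In the information-gathering phase, a single failing source should not sink the whole research run. The sketch below shows one way to handle that with `asyncio.gather(..., return_exceptions=True)`; the three search coroutines are toy stand-ins (one of them fails on purpose), and `gather_sources` is a hypothetical helper, not part of the chapter's `ResearchTeam`.

```python
import asyncio
from typing import List, Tuple


async def search_academic(topic: str) -> List[str]:
    return [f"paper on {topic}"]


async def search_news(topic: str) -> List[str]:
    # Simulated outage of one source
    raise TimeoutError("news API timed out")


async def search_web(topic: str) -> List[str]:
    return [f"blog about {topic}", f"wiki: {topic}"]


async def gather_sources(topic: str) -> Tuple[List[str], List[str]]:
    # return_exceptions=True returns failures as values instead of raising,
    # so the successful searches are not lost
    outcomes = await asyncio.gather(
        search_academic(topic),
        search_news(topic),
        search_web(topic),
        return_exceptions=True,
    )
    sources: List[str] = []
    errors: List[str] = []
    for outcome in outcomes:
        if isinstance(outcome, Exception):
            errors.append(repr(outcome))
        else:
            sources.extend(outcome)  # flatten the per-search lists
    return sources, errors


sources, errors = asyncio.run(gather_sources("AI agents"))
print(sources, errors)
```

Collecting the errors separately lets a downstream agent note which sources were unavailable instead of silently producing a thinner analysis.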

7. Agent Safety and Controllability

7.1 Safety Architecture

Text Only
┌───────────────────────────────────────────────────────────────────────────┐
│                         Agent Safety Architecture                         │
├───────────────────────────────────────────────────────────────────────────┤
│                                                                           │
│  ┌────────────────────────────────────────────────────────────────────┐   │
│  │                        Application-Layer Safety                    │   │
│  │  • Input validation    • Output filtering    • Sensitive-data scan │   │
│  └────────────────────────────────────────────────────────────────────┘   │
│                                    ↓                                      │
│  ┌────────────────────────────────────────────────────────────────────┐   │
│  │                        Behavior-Layer Safety                       │   │
│  │  • Action review       • Permission control  • Anomaly detection   │   │
│  └────────────────────────────────────────────────────────────────────┘   │
│                                    ↓                                      │
│  ┌────────────────────────────────────────────────────────────────────┐   │
│  │                        Execution-Layer Safety                      │   │
│  │  • Sandbox isolation   • Resource limits     • Audit logging       │   │
│  └────────────────────────────────────────────────────────────────────┘   │
│                                    ↓                                      │
│  ┌────────────────────────────────────────────────────────────────────┐   │
│  │                        Infrastructure Security                     │   │
│  │  • Network security    • Data encryption     • Access control      │   │
│  └────────────────────────────────────────────────────────────────────┘   │
│                                                                           │
└───────────────────────────────────────────────────────────────────────────┘
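
The layered design above amounts to a chain of checks in which the first layer to object stops the request. A minimal sketch of that control flow follows. The three check functions, the request fields, and the 60-second budget are all assumptions made for illustration; real layers would be far more thorough.

```python
from typing import Callable, List, Optional, Tuple

# A check returns a rejection reason, or None to let the request through
Check = Callable[[dict], Optional[str]]


def validate_input(request: dict) -> Optional[str]:
    if "ignore previous instructions" in request["prompt"].lower():
        return "possible prompt injection"
    return None


def check_permission(request: dict) -> Optional[str]:
    if request["action"] not in request["allowed_actions"]:
        return f"action {request['action']!r} is not permitted"
    return None


def check_budget(request: dict) -> Optional[str]:
    if request["estimated_seconds"] > 60:
        return "exceeds the 60 s execution budget"
    return None


# Ordered from the outermost layer inward
LAYERS: List[Tuple[str, Check]] = [
    ("application", validate_input),
    ("behavior", check_permission),
    ("execution", check_budget),
]


def run_safety_layers(request: dict) -> Tuple[bool, str]:
    for name, check in LAYERS:
        reason = check(request)
        if reason is not None:
            return False, f"{name} layer: {reason}"  # stop at the first objection
    return True, "allowed"


safe = {
    "prompt": "Summarize this page",
    "action": "read_file",
    "allowed_actions": {"read_file"},
    "estimated_seconds": 5,
}
blocked = dict(safe, action="delete_file")

print(run_safety_layers(safe))
print(run_safety_layers(blocked))
```

Reporting which layer rejected a request makes audit logs easier to read and helps tune each layer independently.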

7.2 安全机制实现

Python
class AgentSafetyFramework:
    """Agent安全框架"""

    class InputGuardrails:
        """输入安全护栏"""

        def __init__(self):
            self.forbidden_patterns = [
                r"ignore previous instructions",
                r"disregard safety",
                r"bypass security",
                r"execute.*system.*command",
            ]
            self.sensitive_keywords = [
                "password", "secret", "key", "token", "credential"
            ]

        def validate(self, user_input: str) -> ValidationResult:
            """Validate user input."""

            # Check for prompt-injection patterns
            for pattern in self.forbidden_patterns:
                if re.search(pattern, user_input, re.IGNORECASE):  # case-insensitive regex search
                    return ValidationResult.reject(
                        "Potential prompt injection detected"
                    )

            # Check for requests that may expose sensitive information
            if self._is_sensitive_info_request(user_input):
                return ValidationResult.require_confirmation(
                    "This request may involve sensitive information"
                )

            # Content safety check
            safety_score = self._check_content_safety(user_input)
            if safety_score < 0.5:
                return ValidationResult.reject("Content violates safety policy")

            return ValidationResult.allow()

        def _is_sensitive_info_request(self, text: str) -> bool:
            """Return True if the text mentions any sensitive keyword."""
            text_lower = text.lower()
            # Substring match, so "key" also matches words such as "keyboard"
            return any(keyword in text_lower for keyword in self.sensitive_keywords)

    class ActionGuardrails:
        """Action guardrails."""

        def __init__(self):
            self.risk_levels = {
                "read_file": "low",
                "write_file": "medium",
                "delete_file": "high",
                "execute_code": "high",
                "network_request": "medium",
                "database_query": "medium",
                "send_email": "high",
                "make_payment": "critical"
            }

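        def evaluate(self, action: Action) -> SafetyDecision:
            """Evaluate the safety of an action."""

            risk_level = self.risk_levels.get(action.type, "unknown")

            if risk_level == "critical":
                return SafetyDecision.require_human_approval(
                    f"Critical action '{action.type}' requires explicit approval"
                )

            elif risk_level == "high":
                # Check for anomalous patterns
                if self._detect_anomaly(action):
                    return SafetyDecision.require_confirmation()

                # Check whether the action falls outside its normal scope
                if not self._is_within_normal_scope(action):
                    return SafetyDecision.require_confirmation()

            elif risk_level == "medium":
                # Log but allow
                self._log_action(action)

            elif risk_level == "unknown":
                # Fail closed: an action type missing from risk_levels is never auto-allowed
                return SafetyDecision.require_confirmation()

            return SafetyDecision.allow()

        def _detect_anomaly(self, action: Action) -> bool:
            """Detect anomalous action patterns."""
            # Placeholder for anomaly-detection logic,
            # e.g. many delete operations within a short time window
            return False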

    class OutputGuardrails:
        """Output guardrails."""

        def filter(self, output: str, context: Context) -> FilteredOutput:
            """Filter unsafe output."""

            # Redact leaked sensitive data
            if self._contains_sensitive_data(output):
                output = self._redact_sensitive_data(output)

            # Content safety filtering
            if self._contains_harmful_content(output):
                return FilteredOutput.block("Harmful content detected")

            # Factual accuracy check (for critical information)
            if context.requires_fact_checking:
                accuracy = self._check_factual_accuracy(output)
                if accuracy < 0.8:
                    output = self._add_disclaimer(output)

            return FilteredOutput.allow(output)

    class SandboxedExecution:
        """Sandboxed execution environment."""

        def __init__(self):
            self.docker_client = docker.from_env()

        def execute(self, code: str, timeout: int = 30) -> ExecutionResult:
            """Run code inside a sandbox container."""

            # Create a throwaway container
            container = self.docker_client.containers.run(
                "python:3.9-slim",
                # Pass argv as a list so quotes inside `code` cannot break the command line
                command=["python", "-c", code],
                detach=True,
                mem_limit="512m",
                cpu_quota=100000,  # 1 CPU with the default 100 ms cpu_period
                network_mode="none",  # no network access
                read_only=True,  # read-only filesystem
                security_opt=["no-new-privileges"]
            )

            try:
                result = container.wait(timeout=timeout)
                logs = container.logs().decode("utf-8")

                return ExecutionResult(
                    success=result["StatusCode"] == 0,
                    output=logs,
                    exit_code=result["StatusCode"]
                )
            finally:
                container.remove(force=True)

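class HumanInTheLoop:
    """Human-in-the-loop approval mechanism."""

    def __init__(self, approval_modes: Dict[str, str]):
        """
        Args:
            approval_modes: approval mode for each risk level
                - "auto": execute automatically
                - "confirm": require confirmation
                - "approve": require explicit approval
        """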
        self.approval_modes = approval_modes

    async def request_approval(
        self,
        action: Action,
        context: Context
    ) -> ApprovalResult:
        """Request human approval."""

        risk_level = self._assess_risk(action)
        mode = self.approval_modes.get(risk_level, "approve")

        if mode == "auto":
            return ApprovalResult.approved()

        elif mode == "confirm":
            # Send a notification and wait for confirmation
            notification = self._create_notification(action, context)
            response = await self._send_and_wait(notification, timeout=60)

            if response and response.confirmed:
                return ApprovalResult.approved()
            else:
                return ApprovalResult.denied("Not confirmed")

        elif mode == "approve":
            # Explicit approval required
            request = self._create_approval_request(action, context)
            response = await self._send_and_wait(request, timeout=300)

            if response and response.approved:
                return ApprovalResult.approved(response.conditions)
            else:
                return ApprovalResult.denied(response.reason if response else "Timeout")

        # Fail closed on a misconfigured mode instead of implicitly returning None
        return ApprovalResult.denied(f"Unknown approval mode: {mode}")
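
The `_detect_anomaly` placeholder above mentions "many delete operations within a short time window" as an example. One way to approach that is a sliding-window counter per action type. The sketch below is only an illustration: `BurstDetector`, its thresholds, and the injectable `now` parameter (used to make the behavior testable) are assumptions, not part of the framework above.

```python
import time
from collections import deque
from typing import Deque, Dict, Optional


class BurstDetector:
    """Flags an action type that occurs too often within a time window."""

    def __init__(self, max_events: int = 5, window_seconds: float = 10.0):
        self.max_events = max_events
        self.window_seconds = window_seconds
        # One queue of timestamps per action type.
        self._events: Dict[str, Deque[float]] = {}

    def is_anomalous(self, action_type: str, now: Optional[float] = None) -> bool:
        """Record one occurrence and report whether the limit is exceeded."""
        now = time.monotonic() if now is None else now
        events = self._events.setdefault(action_type, deque())
        events.append(now)
        # Drop timestamps that have fallen out of the window.
        while events and now - events[0] > self.window_seconds:
            events.popleft()
        return len(events) > self.max_events


detector = BurstDetector(max_events=3, window_seconds=10.0)
# Four deletes within four seconds: only the fourth exceeds the limit of three.
flags = [detector.is_anomalous("delete_file", now=float(t)) for t in range(4)]
```

A guardrail could call `is_anomalous(action.type)` from `_detect_anomaly` and escalate to confirmation when it returns `True`.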

7.3 Explainability and Auditing

Python
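# Postpone annotation evaluation: AgentInteraction and AuditRecord are assumed
# to be defined elsewhere.
from __future__ import annotations

import time
from datetime import datetime
from typing import Dict, List, Optional, Tuple


class AgentExplainability:
    """Agent explainability framework."""

    class DecisionTracer:
        """Decision tracer."""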

        def __init__(self):
            self.trace = DecisionTrace()

        def log_decision(
            self,
            decision_point: str,
            context: Dict,
            reasoning: str,
            decision: str,
            alternatives: List[str]
        ):
            """Record one step of the decision process."""

            self.trace.add_step(DecisionStep(
                timestamp=time.time(),
                decision_point=decision_point,
                context=context,
                reasoning=reasoning,
                decision=decision,
                alternatives_considered=alternatives,
                confidence=self._calculate_confidence(reasoning)
            ))

        def generate_explanation(self, detail_level: str = "summary") -> str:
            """Generate an explanation of the recorded decisions."""

            if detail_level == "summary":
                return self._generate_summary()
            elif detail_level == "detailed":
                return self._generate_detailed_explanation()
            elif detail_level == "technical":
                return self._generate_technical_explanation()
            else:
                raise ValueError(f"Unknown detail level: {detail_level}")

    class AuditLogger:
        """Audit logging system."""

        def __init__(self, storage_backend):
            self.storage = storage_backend

        def log_interaction(self, interaction: AgentInteraction):
            """Record an agent interaction."""

            audit_record = {
                "timestamp": interaction.timestamp,
                "session_id": interaction.session_id,
                "user_id": interaction.user_id,
                "input": self._sanitize(interaction.input),
                "actions_taken": [
                    {
                        "action": action.type,
                        "parameters": self._sanitize(action.parameters),
                        "result": action.result,
                        "timestamp": action.timestamp
                    }
                    for action in interaction.actions
                ],
                "output": self._sanitize(interaction.output),
                "safety_checks": [
                    {
                        "check_type": check.type,
                        "result": check.result,
                        "timestamp": check.timestamp
                    }
                    for check in interaction.safety_checks
                ]
            }

            self.storage.store(audit_record)

        def query_history(
            self,
            user_id: Optional[str] = None,
            time_range: Optional[Tuple[datetime, datetime]] = None,
            action_type: Optional[str] = None
        ) -> List[AuditRecord]:
            """Query the audit history."""

            filters = {}
            if user_id:
                filters["user_id"] = user_id
            if time_range:
                filters["timestamp"] = {"$gte": time_range[0], "$lte": time_range[1]}
            if action_type:
                filters["actions_taken.action"] = action_type

            return self.storage.query(filters)
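
`AuditLogger` calls `self._sanitize(...)` on inputs, parameters, and outputs but leaves the helper undefined. The following is one possible sketch, assuming sanitizing means masking dictionary values whose key names look sensitive. The `sanitize` function, the `MASK` string, and the key list (borrowed from `sensitive_keywords` in 7.2) are illustrative assumptions. Free-text strings are returned unchanged here, so this does not catch secrets embedded in prose.

```python
from typing import Any

SENSITIVE_KEYS = ("password", "secret", "key", "token", "credential")
MASK = "***REDACTED***"


def _is_sensitive(key: Any) -> bool:
    """Return True if the key name contains a sensitive marker."""
    name = str(key).lower()
    return any(marker in name for marker in SENSITIVE_KEYS)


def sanitize(value: Any) -> Any:
    """Return a copy of value with sensitive dictionary entries masked."""
    if isinstance(value, dict):
        return {
            k: MASK if _is_sensitive(k) else sanitize(v)
            for k, v in value.items()
        }
    if isinstance(value, (list, tuple)):
        # Sequences are rebuilt as lists, which is enough for JSON-style audit records.
        return [sanitize(item) for item in value]
    return value


record = {
    "user": "alice",
    "api_key": "sk-123",
    "nested": [{"Password": "hunter2", "path": "/tmp/a"}],
}
clean = sanitize(record)
```

Because `sanitize` builds new containers, the original record is left untouched and can still be used by the agent after logging.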

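8. Hands-on Project: Building an Autonomous Research Agent

8.1 Project Overview

Build an AI agent that can carry out research tasks autonomously. It should be able to:

  1. Understand the research topic
  2. Search for and collect information
  3. Analyze and synthesize that information
  4. Generate a research report

8.2 Full Implementation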

Python
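# autonomous_research_agent.py

# Postpone annotation evaluation: several classes (e.g. ResearchReport,
# ResearchQuestion) appear in type hints before they are defined below.
from __future__ import annotations

import asyncio  # standard-library async framework
from typing import List, Dict, Optional
from dataclasses import dataclass
from datetime import datetime
import json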

@dataclass
class ResearchConfig:
    """Configuration for the research agent."""
    max_search_results: int = 10
    max_analysis_depth: int = 3
    output_format: str = "markdown"
    include_citations: bool = True
    fact_check: bool = True

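class AutonomousResearchAgent:
    """
    Autonomous research agent.

    Capabilities:
    1. Topic analysis and question generation
    2. Multi-source information retrieval
    3. Source credibility assessment
    4. Synthesis and report generation
    """

    def __init__(self, llm_client, search_client, config: Optional[ResearchConfig] = None):
        self.llm = llm_client
        self.search = search_client
        self.config = config or ResearchConfig()

        # Sub-modules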
        self.topic_analyzer = TopicAnalyzer(llm_client)
        self.information_retriever = InformationRetriever(search_client)
        self.credibility_assessor = CredibilityAssessor()
        self.synthesizer = InformationSynthesizer(llm_client)
        self.report_generator = ReportGenerator(llm_client)

    async def conduct_research(self, topic: str) -> ResearchReport:
        """
        Run the full research pipeline.

        Args:
            topic: the research topic

        Returns:
            ResearchReport: the generated report
        """
        print(f"🔬 Starting research on: {topic}")

        # Phase 1: topic analysis
        print("📊 Phase 1: analyzing the research topic...")
        research_questions = await self.topic_analyzer.analyze(topic)
        print(f"   Generated {len(research_questions)} research questions")

        # Phase 2: information retrieval
        print("🔍 Phase 2: retrieving relevant information...")
        all_sources = []
        for question in research_questions:
            sources = await self.information_retriever.retrieve(
                question,
                max_results=self.config.max_search_results
            )
            all_sources.extend(sources)
        print(f"   Collected {len(all_sources)} sources")

        # Phase 3: credibility assessment
        print("✅ Phase 3: assessing source credibility...")
        assessed_sources = []
        for source in all_sources:
            assessment = self.credibility_assessor.assess(source)
            if assessment.score > 0.6:  # drop low-credibility sources
                assessed_sources.append((source, assessment))
        assessed_sources.sort(key=lambda x: x[1].score, reverse=True)
        print(f"   Kept {len(assessed_sources)} high-credibility sources")

        # Phase 4: synthesis
        print("🧩 Phase 4: synthesizing information...")
        synthesis = await self.synthesizer.synthesize(
            topic,
            assessed_sources,
            depth=self.config.max_analysis_depth
        )
        print("   Synthesis complete")

        # Phase 5: report generation
        print("📝 Phase 5: generating the research report...")
        report = await self.report_generator.generate(
            topic=topic,
            synthesis=synthesis,
            sources=assessed_sources,
            config=self.config
        )
        print("✨ Research complete!")

        return report

class TopicAnalyzer:
    """Topic analyzer."""

    def __init__(self, llm):
        self.llm = llm

    async def analyze(self, topic: str) -> List[ResearchQuestion]:
        """Analyze the topic and generate research questions."""

        prompt = f"""Analyze the following research topic and generate specific research questions.

Topic: {topic}

Generate 5-7 specific research questions that:
1. Cover different aspects of the topic
2. Are answerable through research
3. Build upon each other logically
4. Range from factual to analytical

Output as JSON list with fields: question, type (factual/analytical/comparative), priority (1-5)"""

        response = await self.llm.generate(prompt, format="json")
        questions = json.loads(response)  # parse the JSON string into Python objects

        return [ResearchQuestion(**q) for q in questions]  # unpack each dict into dataclass keyword arguments

class InformationRetriever:
    """Information retriever."""

    def __init__(self, search_client):
        self.search = search_client

    async def retrieve(
        self,
        query: ResearchQuestion,
        max_results: int = 10
    ) -> List[InformationSource]:
        """Retrieve information relevant to a research question."""

        sources = []

        # Web search
        web_results = await self.search.web_search(
            query.question,
            num_results=max_results // 2
        )
        for result in web_results:
            content = await self._fetch_content(result.url)
            sources.append(InformationSource(
                type="web",
                title=result.title,
                url=result.url,
                content=content,
                timestamp=datetime.now()
            ))

        # Academic search
        academic_results = await self.search.academic_search(
            query.question,
            num_results=max_results // 2
        )
        for result in academic_results:
            sources.append(InformationSource(
                type="academic",
                title=result.title,
                url=result.url,
                content=result.abstract,
                authors=result.authors,
                publication_date=result.date,
                timestamp=datetime.now()
            ))

        return sources

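    async def _fetch_content(self, url: str) -> str:
        """Fetch the text content of a web page."""
        # Placeholder: implement page fetching and text extraction here.
        # Return an empty string so later len()/substring checks do not fail on None.
        return ""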

class CredibilityAssessor:
    """Credibility assessor."""

    def assess(self, source: InformationSource) -> CredibilityAssessment:
        """Assess the credibility of an information source."""

        scores = {
            "domain_authority": self._assess_domain(source),
            "content_quality": self._assess_content(source),
            "recency": self._assess_recency(source),
            "citations": self._assess_citations(source)
        }

        # Weighted average (the weights sum to 1.0)
        weights = {
            "domain_authority": 0.3,
            "content_quality": 0.3,
            "recency": 0.2,
            "citations": 0.2
        }

        overall_score = sum(scores[k] * weights[k] for k in scores)

        return CredibilityAssessment(
            score=overall_score,
            breakdown=scores,
            flags=self._identify_red_flags(source)
        )

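    def _assess_domain(self, source: InformationSource) -> float:
        """Score the authority of the source's domain."""
        from urllib.parse import urlparse  # local import keeps this method self-contained

        trusted_domains = {
            ".edu": 0.9,
            ".gov": 0.9,
            "wikipedia.org": 0.7,
            "arxiv.org": 0.85,
            "ieee.org": 0.85
        }

        # Compare against the hostname only, so a trusted name that merely appears
        # in the path or query string does not earn its score.
        host = (urlparse(source.url).hostname or "").lower()
        for domain, score in trusted_domains.items():
            suffix = domain if domain.startswith(".") else "." + domain
            if host == domain.lstrip(".") or host.endswith(suffix):
                return score

        return 0.5  # default score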

    def _assess_content(self, source: InformationSource) -> float:
        """Score content quality."""
        # Heuristic based on content length, structure, and embedded references
        content = source.content

        score = 0.5

        # Length check
        if len(content) > 1000:
            score += 0.1

        # Structure check
        if any(marker in content for marker in ["##", "###", "Introduction", "Conclusion"]):
            score += 0.1

        # Reference check
        if "http" in content or "Source:" in content:
            score += 0.1

        return min(score, 1.0)

    def _assess_recency(self, source: InformationSource) -> float:
        """Score recency."""
        if not source.publication_date:
            return 0.5

        age_days = (datetime.now() - source.publication_date).days

        if age_days < 30:
            return 1.0
        elif age_days < 365:
            return 0.8
        elif age_days < 365 * 3:
            return 0.6
        else:
            return 0.4

    def _assess_citations(self, source: InformationSource) -> float:
        """Score citation evidence."""
        if source.type == "academic":
            return 0.8  # academic sources get a higher default

        # Count citation markers in the content
        content = source.content
        citation_markers = ["[1]", "[2]", "Source:", "According to"]
        citation_count = sum(1 for marker in citation_markers if marker in content)

        return min(0.5 + citation_count * 0.1, 1.0)

class InformationSynthesizer:
    """Information synthesizer."""

    def __init__(self, llm):
        self.llm = llm

    async def synthesize(
        self,
        topic: str,
        sources: List[tuple],
        depth: int = 3
    ) -> Synthesis:
        """Synthesize multiple information sources."""

        # Extract key points
        key_points = await self._extract_key_points(sources)

        # Identify consensus and disagreement
        consensus, disagreements = self._identify_agreements_and_conflicts(key_points)

        # Build the argument structure
        arguments = await self._build_arguments(key_points, depth)

        # Identify knowledge gaps
        gaps = self._identify_knowledge_gaps(topic, key_points)

        return Synthesis(
            topic=topic,
            key_findings=key_points,
            consensus_areas=consensus,
            contested_areas=disagreements,
            argument_structure=arguments,
            knowledge_gaps=gaps
        )

    async def _extract_key_points(
        self,
        sources: List[tuple]
    ) -> List[KeyPoint]:
        """Extract key information points from the sources."""

        all_content = "\n\n".join([
            f"Source {i+1} (credibility: {assessment.score}): {source.content}"
            for i, (source, assessment) in enumerate(sources[:5])  # Top 5 sources
        ])

        prompt = f"""Extract key information points from the following sources.

Sources:
{all_content}

Extract 10-15 key points. For each point:
1. State the fact/claim clearly
2. Note which sources support it
3. Assess confidence level (high/medium/low)

Output as JSON list with fields: statement, supporting_sources (list of source numbers), confidence (high/medium/low)."""

        response = await self.llm.generate(prompt, format="json")
        points = json.loads(response)

        return [KeyPoint(**p) for p in points]

class ReportGenerator:
    """Report generator."""

    def __init__(self, llm):
        self.llm = llm

    async def generate(
        self,
        topic: str,
        synthesis: Synthesis,
        sources: List[tuple],
        config: ResearchConfig
    ) -> ResearchReport:
        """Generate the research report."""

        # Generate the outline
        outline = await self._generate_outline(topic, synthesis)

        # Write the report section by section
        sections = []
        for section in outline.sections:
            content = await self._write_section(section, synthesis, config)
            sections.append(ReportSection(
                title=section.title,
                content=content,
                level=section.level
            ))

        # Add references
        if config.include_citations:
            references = self._format_references(sources)
        else:
            references = []

        # Assemble the report
        report_content = self._assemble_report(sections, references, config)

        return ResearchReport(
            title=f"Research Report: {topic}",
            content=report_content,
            sections=sections,
            sources_used=len(sources),
            generation_date=datetime.now(),
            metadata={
                "topic": topic,
                "config": config,
                "synthesis_summary": synthesis.summary()
            }
        )

    def _assemble_report(
        self,
        sections: List[ReportSection],
        references: List[str],
        config: ResearchConfig
    ) -> str:
        """Assemble the final report."""

        if config.output_format == "markdown":
            return self._to_markdown(sections, references)
        elif config.output_format == "html":
            return self._to_html(sections, references)
        else:
            return self._to_text(sections, references)

    def _to_markdown(
        self,
        sections: List[ReportSection],
        references: List[str]
    ) -> str:
        """Render the report as Markdown."""

        lines = []

        # Title
        lines.append(f"# {sections[0].title if sections else 'Research Report'}")
        lines.append(f"\n*Generated on {datetime.now().strftime('%Y-%m-%d')}*\n")

        # Body
        for section in sections:
            prefix = "#" * section.level
            lines.append(f"\n{prefix} {section.title}\n")
            lines.append(section.content)

        # References
        if references:
            lines.append("\n## References\n")
            for i, ref in enumerate(references, 1):
                lines.append(f"{i}. {ref}")

        return "\n".join(lines)

# Data class definitions
@dataclass
class ResearchQuestion:
    question: str
    type: str
    priority: int

@dataclass
class InformationSource:
    type: str
    title: str
    url: str
    content: str
    timestamp: datetime
    authors: List[str] = None
    publication_date: datetime = None

@dataclass
class CredibilityAssessment:
    score: float
    breakdown: Dict[str, float]
    flags: List[str]

@dataclass
class KeyPoint:
    statement: str
    supporting_sources: List[int]
    confidence: str

@dataclass
class Synthesis:
    topic: str
    key_findings: List[KeyPoint]
    consensus_areas: List[str]
    contested_areas: List[str]
    argument_structure: Dict
    knowledge_gaps: List[str]

    def summary(self) -> str:
        return f"Synthesis of {len(self.key_findings)} key points on '{self.topic}'"

@dataclass
class ReportSection:
    title: str
    content: str
    level: int

@dataclass
class ResearchReport:
    title: str
    content: str
    sections: List[ReportSection]
    sources_used: int
    generation_date: datetime
    metadata: Dict

# Usage example
async def main():
    """Entry point."""

    # Initialize components (real LLM and search clients are required)
    # llm_client = ...
    # search_client = ...

    # Create the agent
    # agent = AutonomousResearchAgent(llm_client, search_client)

    # Run the research
    # report = await agent.conduct_research("Large Language Models in Healthcare")

    # Save the report
    # with open("research_report.md", "w", encoding="utf-8") as f:
    #     f.write(report.content)

    print("Autonomous research agent framework loaded")
    print("Usage:")
    print("  agent = AutonomousResearchAgent(llm_client, search_client)")
    print("  report = await agent.conduct_research('your research topic')")

if __name__ == "__main__":
    asyncio.run(main())  # create an event loop and run the top-level coroutine
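
To see how the weighted average in `CredibilityAssessor.assess` interacts with the `0.6` cut-off in Phase 3, it helps to work through two cases by hand. The sketch below reuses the weights from the code above. The two score dictionaries are illustrative inputs chosen to match the heuristics: a web page from an unlisted domain with no date and no markers, and an arXiv paper less than a year old whose abstract is short.

```python
WEIGHTS = {
    "domain_authority": 0.3,
    "content_quality": 0.3,
    "recency": 0.2,
    "citations": 0.2,
}
THRESHOLD = 0.6  # same cut-off as Phase 3 of conduct_research


def overall(scores: dict) -> float:
    """Weighted average of the four component scores."""
    return sum(scores[k] * WEIGHTS[k] for k in WEIGHTS)


def passes(scores: dict) -> bool:
    """Apply the Phase 3 filter."""
    return overall(scores) > THRESHOLD


# Every heuristic falls back to its default of 0.5.
plain_web = {
    "domain_authority": 0.5,
    "content_quality": 0.5,
    "recency": 0.5,
    "citations": 0.5,
}

# arxiv.org (0.85), short abstract (0.5), under a year old (0.8), academic (0.8).
arxiv_paper = {
    "domain_authority": 0.85,
    "content_quality": 0.5,
    "recency": 0.8,
    "citations": 0.8,
}

print(round(overall(plain_web), 3), passes(plain_web))
print(round(overall(arxiv_paper), 3), passes(arxiv_paper))
```

With these inputs the plain web page lands at 0.5 and is filtered out, while the arXiv paper reaches 0.725 and is kept. A web source from an unlisted domain therefore needs content or citation bonuses to clear the threshold, which is worth keeping in mind when tuning the weights.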

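8.3 Running and Extending

Bash
# Install dependencies
pip install aiohttp beautifulsoup4 python-dotenv

# Configure API keys
export OPENAI_API_KEY="your-key"
export SERPER_API_KEY="your-search-key"

# Run the agent
python autonomous_research_agent.py

Possible extensions:

  1. Add more data sources (databases, APIs, etc.)
  2. Support multiple languages
  3. Add visual report generation
  4. Integrate a fact-checking API
  5. Support collaborative research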


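Summary

The new generation of AI agents is evolving from simple tool calling toward autonomous execution of complex tasks. Key trends include:

  1. End-to-end autonomy: from Manus to Operator, agents can independently complete the whole workflow, from understanding a request to delivering the result
  2. Multimodal perception: integrating vision, audio, and other modalities lets agents operate real-world interfaces
  3. Multi-agent collaboration: complex tasks call for several specialized agents working together (subagent/orchestrator patterns)
  4. Skill reuse and evolution: mechanisms such as Claude Skills move agents from "reasoning from scratch every time" to "getting more proficient with use"
  5. Safety and controllability: as agent capabilities grow, safety guardrails and human-in-the-loop mechanisms become essential

Looking ahead, AI agents are expected to become intelligent partners in human work and life, substantially boosting productivity and creativity while remaining under human oversight and aligned with human values.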


Reference Resources

Papers

  • ReAct: Synergizing Reasoning and Acting in Language Models (Yao et al., 2022)
  • Reflexion: Language Agents with Verbal Reinforcement Learning (Shinn et al., 2023)
  • Voyager: An Open-Ended Embodied Agent with Large Language Models (Wang et al., 2023)


Document version: 1.0 Author: AI Learning Team


Last updated: 2026-02-12 Applies to: LLM Learning Tutorial v2026