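08. Next-Generation AI Agents: From Tool Use to Autonomous Execution
⚠️ Timeliness note: this chapter covers frontier models, pricing, and leaderboards, all of which can change quickly between releases. Treat the original papers, official release pages, and API documentation as authoritative.
📌 Scope note: this chapter focuses on frontier research into next-generation agents (Manus, Claude Code, Operator, and others). For hands-on agent coding (hand-written frameworks, multi-agent system development), see AI Agent开发实战/.
Contents
- The evolution of AI agents
- Manus: a breakthrough in general-purpose AI agents
- Claude Code and Claude Cowork
- 3.4 Claude Skills: reusable agent skills
- OpenAI Operator and Computer Use
- AI agent architecture design
- Multi-agent collaboration systems
- Agent safety and controllability
- Practice project: building an autonomous research agent
1. The evolution of AI agents
1.1 The paradigm shift from LLM to agent
AI agents mark a fundamental shift in artificial intelligence from passive response to active execution.
Limitations of a traditional LLM: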
┌──────────────────────────────────────────────┐
│         Traditional LLM interaction          │
├──────────────────────────────────────────────┤
│  User input → LLM inference → Text output    │
│                                              │
│  Limitations:                                │
│  • Cannot perform real-world actions         │
│  • No memory of earlier context              │
│  • Cannot access external systems            │
│  • Each exchange is stateless                │
└──────────────────────────────────────────────┘
Core capabilities of an AI agent:
┌─────────────────────────────────────────────────────────┐
│                  AI agent architecture                  │
├─────────────────────────────────────────────────────────┤
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐  │
│  │ Perception  │ →  │  Reasoning  │ →  │   Action    │  │
│  │    layer    │    │    layer    │    │    layer    │  │
│  └─────────────┘    └─────────────┘    └─────────────┘  │
│         ↑                                     ↓         │
│         └─────────── Memory layer ────────────┘         │
└─────────────────────────────────────────────────────────┘
1.2 Agent capability levels
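| Level | Name | Capability | Representative systems |
|---|---|---|---|
| L1 | Simple tool calling | Calls predefined APIs | GPT-3.5 + Function Calling |
| L2 | Multi-step planning | Decomposes tasks and executes them | AutoGPT, LangChain Agents |
| L3 | Environment awareness | Understands and operates digital environments | Claude Computer Use |
| L4 | Autonomous decision-making | Adjusts goals dynamically while executing | Manus, Operator |
| L5 | General-purpose agent | Adaptive learning across domains | Future goal |
1.3 ReAct and the core agent paradigm
ReAct (Reasoning + Acting) is a foundational agent architecture: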
from typing import Dict


class ReActAgent:
    """
    ReAct: Synergizing Reasoning and Acting in Language Models
    Core idea: interleave reasoning and acting.

    The helpers _build_react_prompt, _parse_thought, _parse_action,
    is_complete, and generate_answer are referenced but not shown here.
    """

    def __init__(self, llm, tools, max_iterations=10):
        self.llm = llm
        self.tools = {tool.name: tool for tool in tools}
        self.max_iterations = max_iterations
        self.memory = []

    def run(self, query: str) -> str:
        """Run the ReAct loop."""
        self.memory.append(f"Task: {query}")
        for i in range(self.max_iterations):
            # 1. Thought
            thought = self.think()
            # 2. Action
            action = self.act(thought)
            # 3. Observation
            observation = self.execute_action(action)
            # 4. Update memory
            self.memory.extend([
                f"Thought {i+1}: {thought}",
                f"Action {i+1}: {action}",
                f"Observation {i+1}: {observation}"
            ])
            # 5. Check whether the task is complete
            if self.is_complete(observation):
                return self.generate_answer()
        return "Max iterations reached"

    def think(self) -> str:
        """Generate the next thought."""
        prompt = self._build_react_prompt()
        response = self.llm.generate(prompt)
        return self._parse_thought(response)

    def act(self, thought: str) -> Dict:
        """Choose an action based on the thought."""
        prompt = (
            f"Based on the thought: {thought}\n"
            f"Available tools: {list(self.tools.keys())}\n"
            "Choose the next action in format:\n"
            "Action: [tool_name]\n"
            "Action Input: [input]"
        )
        response = self.llm.generate(prompt)
        return self._parse_action(response)

    def execute_action(self, action: Dict) -> str:
        """Run the selected tool."""
        tool_name = action['tool']
        tool_input = action['input']
        if tool_name not in self.tools:
            return f"Error: Tool {tool_name} not found"
        try:
            # Catch tool errors so that a failing tool does not crash the loop;
            # the error text is returned to the agent as the observation
            result = self.tools[tool_name].run(tool_input)
            return str(result)
        except Exception as e:
            return f"Error: {str(e)}"
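The ReAct class above relies on prompt-building and parsing helpers that are not shown, so here is a minimal, self-contained sketch of the same Thought → Action → Observation loop. Several things in it are assumptions rather than part of the original: the LLM is replaced by a scripted stub (`scripted_llm`), the model is assumed to reply in `Thought:` / `Action:` / `Action Input:` / `Final Answer:` lines, and `multiply` is a hypothetical tool.

```python
from typing import Callable, Dict, List


def scripted_llm(responses: List[str]) -> Callable[[str], str]:
    """Fake LLM that replays canned responses in order (stand-in for a real model)."""
    remaining = iter(responses)
    return lambda prompt: next(remaining)


def parse_step(text: str) -> Dict[str, str]:
    """Turn 'Key: value' lines into a dict with lower-cased keys."""
    fields: Dict[str, str] = {}
    for line in text.splitlines():
        key, sep, value = line.partition(":")
        if sep:
            fields[key.strip().lower()] = value.strip()
    return fields


def multiply(expr: str) -> int:
    """Hypothetical tool: multiply two integers written as 'a * b'."""
    left, right = expr.split("*")
    return int(left) * int(right)


def react(query: str, llm: Callable[[str], str],
          tools: Dict[str, Callable[[str], object]], max_iterations: int = 5) -> str:
    transcript = [f"Task: {query}"]
    for _ in range(max_iterations):
        step = parse_step(llm("\n".join(transcript)))
        if "final answer" in step:            # the model decided it is done
            return step["final answer"]
        name = step.get("action", "")
        if name not in tools:
            observation = f"Error: Tool {name} not found"
        else:
            try:
                observation = str(tools[name](step.get("action input", "")))
            except Exception as exc:          # report tool errors back to the model
                observation = f"Error: {exc}"
        transcript += [
            f"Thought: {step.get('thought', '')}",
            f"Action: {name}",
            f"Observation: {observation}",
        ]
    return "Max iterations reached"


llm = scripted_llm([
    "Thought: I should multiply the two numbers.\nAction: multiply\nAction Input: 6 * 7",
    "Thought: The observation is the product.\nFinal Answer: 42",
])
answer = react("What is 6 times 7?", llm, {"multiply": multiply})
print(answer)
```

Letting the model signal completion with a `Final Answer:` line is one common convention; the class above instead checks completion through a separate `is_complete` call.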
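2. Manus: a breakthrough in general-purpose AI agents
2.1 Manus overview
Manus (Latin for "hand") was released in March 2025 by the Chinese team behind Monica.im. It has been widely described as one of the first general-purpose AI agents able to carry out complex tasks autonomously.
Core breakthroughs:
┌────────────────────────────────────────────────────────┐
│                  Manus core features                   │
├────────────────────────────────────────────────────────┤
│                                                        │
│  1. End-to-end task execution                          │
│     • Closed loop from requirements to delivery        │
│     • Multi-step, cross-platform complex tasks         │
│                                                        │
│  2. Multi-agent collaboration                          │
│     • Planner + executor + validator agents            │
│     • Works like a software engineering team           │
│                                                        │
│  3. Virtual machine isolation                          │
│     • Runs operations in a secure sandbox              │
│     • Code execution, file operations, browsing        │
│                                                        │
│  4. Adaptive learning                                  │
│     • Learns from execution history                    │
│     • Task quality keeps improving                     │
│                                                        │
└────────────────────────────────────────────────────────┘
2.2 Manus architecture walkthrough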
class ManusAgent:
"""
Manus通用AI Agent架构(基于公开信息推断)
核心组件:
1. 任务规划器 (Task Planner)
2. 执行引擎 (Execution Engine)
3. 工具集成层 (Tool Integration)
4. 记忆系统 (Memory System)
5. 验证器 (Validator)
"""
def __init__(self):
self.planner = TaskPlanner(llm="claude-3-5-sonnet")
self.executor = ExecutionEngine()
self.tool_registry = ToolRegistry()
self.memory = HierarchicalMemory()
self.validator = OutputValidator()
def execute_task(self, user_request: str) -> TaskResult:
"""
端到端任务执行流程
Args:
user_request: 用户的自然语言请求
Returns:
TaskResult: 包含执行结果、中间步骤、日志等
"""
# Phase 1: 任务理解与规划
task_plan = self.planner.create_plan(user_request)
# Phase 2: 执行循环
execution_log = []
for step in task_plan.steps:
# 执行步骤
result = self._execute_step(step)
execution_log.append(result)
# 动态重规划(如果需要)
if result.needs_replanning:
task_plan = self.planner.replan(task_plan, execution_log)
# 更新记忆
self.memory.add_execution_step(step, result)
# Phase 3: 结果验证与交付
final_output = self._assemble_output(execution_log)
validation = self.validator.validate(final_output, user_request)
return TaskResult(
output=final_output,
execution_log=execution_log,
validation_score=validation.score,
artifacts=self._collect_artifacts()
)
def _execute_step(self, step: ExecutionStep) -> StepResult:
"""执行单个步骤"""
if step.type == "web_browsing":
return self._browse_web(step.params)
elif step.type == "code_execution":
return self._execute_code(step.params)
elif step.type == "file_operation":
return self._operate_file(step.params)
elif step.type == "api_call":
return self._call_api(step.params)
elif step.type == "human_confirmation":
return self._request_confirmation(step.params)
else:
raise ValueError(f"Unknown step type: {step.type}")
class TaskPlanner:
    """Task planner: decomposes a complex request into executable steps."""

    def __init__(self, llm):
        # LLM handle used by create_plan() and replan(); it is assumed to
        # resolve to a client exposing generate(prompt, format=...)
        self.llm = llm

    def create_plan(self, request: str) -> TaskPlan:
        """
        Create a task execution plan.
        Uses chain-of-thought prompting to decompose the task.
        """
        planning_prompt = f"""You are an expert task planner. Break down the following request into clear, executable steps.
Request: {request}
For each step, specify:
1. Step type (web_browsing, code_execution, file_operation, api_call, human_confirmation)
2. Required parameters
3. Expected output
4. Dependencies on previous steps
Output the plan in structured JSON format."""
        plan_json = self.llm.generate(planning_prompt, format="json")
        return TaskPlan.from_json(plan_json)

    def replan(self, current_plan: TaskPlan, execution_log: List[StepResult]) -> TaskPlan:
        """Adjust the plan dynamically based on execution feedback."""
        replan_prompt = f"""The current plan needs adjustment based on execution results.
Current Plan: {current_plan.to_json()}
Execution Log: {execution_log}
Please provide an updated plan that:
1. Addresses the issues encountered
2. Incorporates new information discovered
3. Optimizes remaining steps
Output the revised plan."""
        new_plan_json = self.llm.generate(replan_prompt, format="json")
        return TaskPlan.from_json(new_plan_json)
class HierarchicalMemory:
"""分层记忆系统"""
def __init__(self):
self.working_memory = [] # 当前任务上下文
self.episodic_memory = [] # 历史任务经验
self.semantic_memory = {} # 知识库
def add_execution_step(self, step: ExecutionStep, result: StepResult):
"""添加执行步骤到工作记忆"""
self.working_memory.append({
"step": step,
"result": result,
"timestamp": time.time()
})
def get_relevant_context(self, current_step: ExecutionStep) -> str:
"""检索与当前步骤相关的上下文"""
# 检索工作记忆中的相关步骤
relevant_working = self._retrieve_from_working_memory(current_step)
# 检索历史经验中的相似案例
similar_episodes = self._retrieve_from_episodic_memory(current_step)
# 检索相关知识
relevant_knowledge = self._retrieve_from_semantic_memory(current_step)
return self._format_context(relevant_working, similar_episodes, relevant_knowledge)
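The plan → execute → replan loop in `execute_task` can be exercised without an LLM. The sketch below is an illustration under stated assumptions, not Manus's actual implementation: `Step`, `StepResult`, `run_plan`, and the three handler functions are hypothetical names, and a simple rule-based `replan` function stands in for the LLM call made by `TaskPlanner.replan`.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class Step:
    type: str
    params: dict = field(default_factory=dict)


@dataclass
class StepResult:
    step: Step
    ok: bool
    output: str


def run_plan(plan: List[Step],
             handlers: Dict[str, Callable[[dict], str]],
             replan: Callable[[List[Step], List[StepResult]], List[Step]],
             max_replans: int = 2) -> List[StepResult]:
    """Execute steps in order; after a failure, ask `replan` for a new remaining plan."""
    log: List[StepResult] = []
    remaining = list(plan)
    replans = 0
    while remaining:
        step = remaining.pop(0)
        try:
            if step.type not in handlers:
                raise ValueError(f"Unknown step type: {step.type}")
            result = StepResult(step, True, handlers[step.type](step.params))
        except Exception as exc:
            result = StepResult(step, False, str(exc))
        log.append(result)
        if not result.ok:
            if replans == max_replans:    # give up rather than replan forever
                break
            replans += 1
            remaining = replan(remaining, log)
    return log


# Hypothetical handlers: browsing fails, the API and code steps succeed.
def browse(params: dict) -> str:
    raise RuntimeError("page timed out")


def call_api(params: dict) -> str:
    return f"prices for {params['symbol']}"


def run_code(params: dict) -> str:
    return "chart.png"


def rule_based_replan(remaining: List[Step], log: List[StepResult]) -> List[Step]:
    """Stand-in for the LLM: retry a failed browsing step through an API call."""
    failed = log[-1].step
    if failed.type == "web_browsing":
        return [Step("api_call", failed.params)] + remaining
    return remaining


handlers = {"web_browsing": browse, "api_call": call_api, "code_execution": run_code}
plan = [Step("web_browsing", {"symbol": "TSLA"}), Step("code_execution")]
log = run_plan(plan, handlers, rule_based_replan)
for entry in log:
    print(entry.step.type, entry.ok, entry.output)
```

Capping the number of replans is a design choice of this sketch; without some bound, a step that keeps failing could keep the loop alive indefinitely.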
2.3 Manus application scenarios
# 示例:Manus执行复杂任务的流程
# 场景1:股票分析报告生成
def stock_analysis_example():
"""
用户请求:"分析特斯拉股票过去一个月的表现,
生成一份包含技术面和基本面的投资报告"
"""
manus = ManusAgent()
result = manus.execute_task("""
分析特斯拉(TSLA)股票过去一个月的表现,包括:
1. 股价走势和技术指标分析
2. 近期新闻和事件影响
3. 财务数据对比
4. 生成PDF格式的投资分析报告
""")
# Manus的执行流程:
# 1. 搜索特斯拉股价数据
# 2. 计算技术指标(MA, RSI, MACD等)
# 3. 搜索相关新闻
# 4. 查询财务报表
# 5. 编写Python代码生成可视化图表
# 6. 撰写分析报告
# 7. 转换为PDF格式
# 8. 交付最终报告
return result.artifacts['report.pdf']
# 场景2:网站搭建
def website_building_example():
"""
用户请求:"为我创建一个个人博客网站"
"""
manus = ManusAgent()
result = manus.execute_task("""
创建一个现代化的个人博客网站,要求:
1. 响应式设计
2. 支持Markdown文章
3. 包含关于页面和联系表单
4. 部署到Vercel
""")
# Manus的执行流程:
# 1. 设计网站架构
# 2. 创建Next.js项目
# 3. 编写组件代码
# 4. 配置样式和主题
# 5. 实现Markdown渲染
# 6. 添加联系表单功能
# 7. 测试和调试
# 8. 部署到Vercel
# 9. 提供访问链接
return result.artifacts['deployment_url']
# 场景3:数据处理流水线
def data_pipeline_example():
"""
用户请求:"从多个数据源整合销售数据并生成仪表板"
"""
manus = ManusAgent()
result = manus.execute_task("""
构建销售数据整合流水线:
1. 从CSV文件读取线下销售数据
2. 通过API获取线上销售数据
3. 清洗和标准化数据格式
4. 计算关键指标(销售额、增长率等)
5. 创建交互式仪表板
""")
return result.artifacts['dashboard.html']
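2.4 Manus technical highlights
┌────────────────────────────────────────────────────────────┐
│                Manus technical innovations                 │
├────────────────────────────────────────────────────────────┤
│                                                            │
│  1. Asynchronous execution and parallel processing         │
│   ┌──────────────────────────────────────────────────┐     │
│   │ Task A ──┐                                       │     │
│   │          ├──→ [parallel engine] → merged result  │     │
│   │ Task B ──┘                                       │     │
│   └──────────────────────────────────────────────────┘     │
│                                                            │
│  2. Intelligent error recovery                             │
│     • Detects execution failures automatically             │
│     • Analyzes the cause and picks a recovery strategy     │
│     • Supports retries, fallbacks, and human handoff       │
│                                                            │
│  3. Multimodal input handling                              │
│     • Understands text, images, and documents              │
│     • Extracts information from screenshots                │
│     • Produces visual output                               │
│                                                            │
│  4. Continual learning                                     │
│     • Records successful execution patterns                │
│     • Builds a task-to-strategy mapping                    │
│     • Personalizes execution style                         │
│                                                            │
└────────────────────────────────────────────────────────────┘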
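The first two highlights, parallel execution and error recovery, follow a general pattern that can be sketched with `asyncio`. Manus's internals are not public, so this is only an illustration of the pattern: `with_retry`, `run_parallel`, and the three `fetch_*` coroutines are hypothetical, and the retry count and backoff delays are arbitrary choices.

```python
import asyncio
from typing import Awaitable, Callable, Dict


async def with_retry(task_factory: Callable[[], Awaitable[str]],
                     retries: int = 2, base_delay: float = 0.01) -> str:
    """Await a fresh task; on failure, retry with exponential backoff."""
    for attempt in range(retries + 1):
        try:
            return await task_factory()
        except Exception:
            if attempt == retries:        # out of retries: surface the error
                raise
            await asyncio.sleep(base_delay * 2 ** attempt)


async def run_parallel(factories: Dict[str, Callable[[], Awaitable[str]]]) -> dict:
    """Run all tasks concurrently and merge results (or final errors) by name."""
    results = await asyncio.gather(
        *(with_retry(factory) for factory in factories.values()),
        return_exceptions=True,           # one failing task must not cancel the rest
    )
    return dict(zip(factories.keys(), results))


attempts = {"news": 0}


async def fetch_prices() -> str:
    return "prices"


async def fetch_news() -> str:
    attempts["news"] += 1
    if attempts["news"] < 2:              # fail once, then succeed
        raise ConnectionError("temporary failure")
    return "news"


async def fetch_filings() -> str:
    raise TimeoutError("source unavailable")


results = asyncio.run(run_parallel({
    "prices": fetch_prices,
    "news": fetch_news,
    "filings": fetch_filings,
}))
for name, value in results.items():
    print(name, repr(value))
```

A task that still fails after its retries is returned as an exception object, which gives the caller a natural place to apply a fallback or hand off to a human.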
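3. Claude Code and Claude Cowork
3.1 Claude Code: an AI coding assistant
Claude Code is an AI coding tool from Anthropic, first released as a research preview in February 2025, and is representative of the current state of coding agents.
Core capabilities: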
class ClaudeCode:
"""
Claude Code核心架构
特点:
1. 深度代码理解
2. 安全的代码执行环境
3. 版本控制集成
4. 上下文感知编辑
"""
    def __init__(self, project_path: str, llm):
        self.project = ProjectContext(project_path)
        self.code_index = CodeIndex(project_path)
        self.tool_executor = SecureToolExecutor()
        self.git_integration = GitIntegration(project_path)
        # LLM client used by _plan_actions(); assumed to expose generate(prompt, format=...)
        self.llm = llm
def process_request(self, user_input: str) -> CodeAction:
"""处理用户编程请求"""
# 1. 理解意图
intent = self._understand_intent(user_input)
# 2. 收集上下文
context = self._gather_context(intent)
# 3. 规划行动
plan = self._plan_actions(intent, context)
# 4. 执行并验证
results = []
for action in plan:
result = self._execute_action(action)
results.append(result)
# 实时反馈
if action.requires_confirmation:
self._show_diff_and_confirm(result)
return self._summarize_results(results)
def _gather_context(self, intent: Intent) -> Context:
"""智能上下文收集"""
context = Context()
# 检索相关文件
if intent.target_files:
for file_path in intent.target_files:
context.add_file(self.project.read_file(file_path))
else:
# 语义搜索相关代码
relevant_files = self.code_index.semantic_search(intent.description)
for file in relevant_files[:5]: # Top-5相关文件
context.add_file(file)
# 获取项目结构
context.project_structure = self.project.get_structure()
# 获取依赖信息
context.dependencies = self.project.get_dependencies()
# 获取Git状态
context.git_status = self.git_integration.get_status()
return context
def _plan_actions(self, intent: Intent, context: Context) -> List[CodeAction]:
"""规划代码修改行动"""
planning_prompt = f"""You are an expert software engineer.
Task: {intent.description}
Context:
{context.to_prompt()}
Plan the necessary code changes. For each change, specify:
1. File path
2. Change type (create, modify, delete, rename)
3. Detailed description of changes
4. Dependencies on other changes
Output as a structured action plan."""
plan_response = self.llm.generate(planning_prompt, format="json")
return self._parse_action_plan(plan_response)
class CodeIndex:
    """Code index supporting semantic search."""

    def __init__(self, project_path: str, embedder=None):
        self.project_path = project_path
        # The embedder must exist before the index is built, because
        # _build_index() encodes every chunk with it.
        # CodeEmbedder is a hypothetical default embedding model.
        self.embedder = embedder if embedder is not None else CodeEmbedder()
        self.index = self._build_index()

    def _build_index(self) -> VectorStore:
        """Build the vector index over code chunks."""
        code_chunks = []
        for file_path in self._get_code_files():
            chunks = self._parse_and_chunk(file_path)
            for chunk in chunks:
                embedding = self.embedder.encode(chunk.content)
                code_chunks.append({
                    "embedding": embedding,
                    "content": chunk.content,
                    "metadata": {
                        "file": file_path,
                        "line_start": chunk.line_start,
                        "line_end": chunk.line_end,
                        "type": chunk.type  # function, class, etc.
                    }
                })
        return VectorStore.from_documents(code_chunks)

    def semantic_search(self, query: str, top_k: int = 10) -> List[CodeChunk]:
        """Semantic search for relevant code."""
        query_embedding = self.embedder.encode(query)
        results = self.index.similarity_search(query_embedding, k=top_k)
        return [CodeChunk.from_result(r) for r in results]
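To make the index-then-search flow of `CodeIndex` concrete, here is a small runnable sketch. It deliberately swaps the neural embedder and vector store for word-count vectors and cosine similarity, so it demonstrates the mechanics only and says nothing about how Claude Code retrieves context. `ToyCodeIndex`, `embed`, and the sample chunks are hypothetical.

```python
import math
import re
from collections import Counter
from dataclasses import dataclass
from typing import List


@dataclass
class CodeChunk:
    file: str
    name: str
    content: str


def embed(text: str) -> Counter:
    """Toy 'embedding': lower-cased word counts (underscores split identifiers)."""
    return Counter(re.findall(r"[a-z]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm = (math.sqrt(sum(v * v for v in a.values()))
            * math.sqrt(sum(v * v for v in b.values())))
    return dot / norm if norm else 0.0


class ToyCodeIndex:
    def __init__(self, chunks: List[CodeChunk]):
        # Embed every chunk once at build time, as _build_index does
        self.entries = [(embed(chunk.content), chunk) for chunk in chunks]

    def semantic_search(self, query: str, top_k: int = 10) -> List[CodeChunk]:
        query_vec = embed(query)
        scored = [(cosine(query_vec, vec), chunk) for vec, chunk in self.entries]
        scored = [item for item in scored if item[0] > 0]       # drop non-matches
        scored.sort(key=lambda item: item[0], reverse=True)     # best match first
        return [chunk for _, chunk in scored[:top_k]]


index = ToyCodeIndex([
    CodeChunk("auth.py", "login",
              "def login(user, password): return check_password(user, password)"),
    CodeChunk("db.py", "connect",
              "def connect(url): return create_engine(url)"),
    CodeChunk("report.py", "render_pdf",
              "def render_pdf(report): return pdf_writer.write(report)"),
])
hits = index.semantic_search("where is the password checked at login")
print([hit.name for hit in hits])
```

Word counts only match exact tokens, so a query such as "authentication" would miss `login`; closing that gap is the reason real systems use learned embeddings.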
3.2 Claude Cowork: an agent for non-technical users
Claude Cowork is Anthropic's general-purpose agent product, aimed at a broader audience of non-technical users.
class ClaudeCowork:
"""
Claude Cowork: 通用AI助手
目标用户:非技术人员
核心场景:日常办公、研究、内容创作
"""
def __init__(self):
self.browser = BrowserController()
self.file_manager = FileManager()
self.document_processor = DocumentProcessor()
self.calendar = CalendarIntegration()
self.email = EmailIntegration()
def assist(self, request: UserRequest) -> AssistanceResult:
"""处理用户协助请求"""
# 分类请求类型
request_type = self._classify_request(request)
if request_type == "research":
return self._handle_research(request)
elif request_type == "document_creation":
return self._handle_document_creation(request)
elif request_type == "data_analysis":
return self._handle_data_analysis(request)
elif request_type == "scheduling":
return self._handle_scheduling(request)
elif request_type == "communication":
return self._handle_communication(request)
else:
return self._handle_general(request)
def _handle_research(self, request: UserRequest) -> ResearchResult:
"""处理研究类请求"""
# 1. 理解研究主题
topic = request.extract_topic()
# 2. 多源信息收集
sources = []
# 网页搜索
search_results = self.browser.search(topic)
for result in search_results[:10]:
page_content = self.browser.read_page(result.url)
sources.append({
"url": result.url,
"title": result.title,
"content": page_content
})
# 3. 信息整合与分析
analysis = self._synthesize_information(sources, topic)
# 4. 生成报告
report = self._generate_research_report(analysis)
return ResearchResult(
summary=analysis.summary,
key_findings=analysis.findings,
sources=sources,
report=report
)
def _handle_document_creation(self, request: UserRequest) -> DocumentResult:
"""处理文档创作请求"""
# 理解文档需求
doc_spec = self._parse_document_requirements(request)
# 收集参考材料
references = []
if request.has_attachments:
for attachment in request.attachments:
content = self.document_processor.read(attachment)
references.append(content)
# 生成文档大纲
outline = self._generate_outline(doc_spec, references)
# 逐段生成内容
sections = []
for section in outline.sections:
content = self._write_section(section, references, sections)
sections.append({
"heading": section.heading,
"content": content
})
# 格式化和导出
document = self._format_document(sections, doc_spec.format)
        return DocumentResult(
            document=document,
            outline=outline,
            # Whitespace-delimited word count; len(content) alone counts characters
            word_count=sum(len(s["content"].split()) for s in sections)
        )
3.3 Core challenges for coding agents
class CodeAgentChallenges:
"""代码Agent面临的关键挑战与解决方案"""
@staticmethod # @staticmethod无需实例即可调用
def challenge_1_context_window():
"""
挑战1: 大型代码库的上下文限制
解决方案:
1. 智能代码检索
2. 分层摘要
3. 增量索引
"""
# 智能检索示例
def intelligent_retrieval(project, query):
# 第一层:文件级检索
relevant_files = file_level_search(project, query)
# 第二层:函数/类级检索
code_chunks = []
for file in relevant_files:
chunks = chunk_code(file)
scored_chunks = rank_by_relevance(chunks, query)
code_chunks.extend(scored_chunks[:3])
# 第三层:关系扩展
extended_context = expand_by_relations(code_chunks)
return extended_context
@staticmethod
def challenge_2_code_execution_safety():
"""
挑战2: 代码执行安全性
解决方案:
1. 沙箱环境
2. 资源限制
3. 权限控制
"""
class SecureExecutor:
def __init__(self):
self.sandbox = DockerSandbox()
self.resource_limits = {
"cpu": "1 core",
"memory": "512MB",
"timeout": 30,
"network": False
}
def execute(self, code: str) -> ExecutionResult:
# 代码静态分析
if self._contains_dangerous_operations(code):
return ExecutionResult.error("Dangerous operations detected")
# 在沙箱中执行
return self.sandbox.run(code, limits=self.resource_limits)
@staticmethod
def challenge_3_long_horizon_planning():
"""
挑战3: 长程规划与执行
解决方案:
1. 分层规划
2. 里程碑检查
3. 动态调整
"""
class HierarchicalPlanner:
def plan(self, goal: str) -> Plan:
# 高层规划
high_level = self._create_high_level_plan(goal)
# 逐层细化
detailed_plan = Plan()
for phase in high_level.phases:
sub_tasks = self._decompose(phase)
detailed_plan.add_phase(phase, sub_tasks)
return detailed_plan
def execute_with_checkpoints(self, plan: Plan):
for phase in plan.phases:
for task in phase.tasks:
result = self.execute_task(task)
# 检查点验证
if not self._validate_checkpoint(result):
self._handle_deviation(phase, result)
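Challenge 2 combines a static check with resource-limited execution. The sketch below shows both steps using only the standard library, under loud assumptions: the blocklists are illustrative and easy to bypass, and a child process with a timeout is not a sandbox. It stands in for the `DockerSandbox` above only to show the control flow; `find_violations` and `run_checked` are hypothetical names.

```python
import ast
import subprocess
import sys
from typing import List

BLOCKED_MODULES = {"os", "subprocess", "shutil", "socket"}
BLOCKED_CALLS = {"eval", "exec", "open", "__import__"}


def find_violations(code: str) -> List[str]:
    """Walk the syntax tree and report blocked imports and calls."""
    violations = []
    for node in ast.walk(ast.parse(code)):
        if isinstance(node, ast.Import):
            names = [alias.name.split(".")[0] for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            names = [(node.module or "").split(".")[0]]
        else:
            names = []
        violations += [f"import of {name}" for name in names if name in BLOCKED_MODULES]
        if (isinstance(node, ast.Call) and isinstance(node.func, ast.Name)
                and node.func.id in BLOCKED_CALLS):
            violations.append(f"call to {node.func.id}")
    return violations


def run_checked(code: str, timeout: float = 5.0) -> dict:
    """Reject code that fails the static check; otherwise run it with a timeout."""
    try:
        violations = find_violations(code)
    except SyntaxError as exc:
        return {"ok": False, "error": f"syntax error: {exc.msg}"}
    if violations:
        return {"ok": False, "error": "; ".join(violations)}
    try:
        # -I runs the child interpreter in isolated mode (no user site, no env vars)
        proc = subprocess.run([sys.executable, "-I", "-c", code],
                              capture_output=True, text=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        return {"ok": False, "error": "timeout"}
    return {"ok": proc.returncode == 0, "stdout": proc.stdout, "stderr": proc.stderr}


print(find_violations("import os\nprint(eval('1'))"))
print(run_checked("print(2 + 3)"))
```

Static checks catch only the obvious cases, which is why the original pairs them with an isolated runtime and explicit CPU, memory, and network limits.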
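3.4 Claude Skills: reusable agent skills
Skills is a mechanism for reusing agent capabilities that Anthropic introduced in 2025. The core idea as modeled in this section: a workflow the agent has completed successfully (its steps, tool-call sequence, and context template) is saved as a reusable skill, and a similar later task triggers it directly instead of being reasoned through from scratch. In Anthropic's released form, a skill is packaged as a folder containing a SKILL.md file of instructions plus optional scripts and resources, which Claude loads when it judges the skill relevant.
This addresses a core pain point of agents: reasoning from scratch on every similar task wastes tokens and gives unstable results.
How a traditional agent handles repeated tasks of the same kind:
Run 1: user request → agent reasons from scratch → multi-step trial and error → done (slow, unstable)
Run 2: similar request → agent reasons from scratch → multi-step trial and error → done (repeated work)
Run 3: similar request → agent reasons from scratch → ...
With Skills:
Run 1: user request → agent reasons → done → 💾 saved as a skill
Run 2: similar request → matches an existing skill → executes directly ✅ (fast, stable)
Run 3: similar request → matches an existing skill → executes directly ✅
Core structure of a skill (conceptual implementation):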
"""
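Claude Skills: a conceptual implementation
Skill = a reusable agent workflow template
Each skill holds trigger conditions, execution steps, required tools, and a context template.
⚠️ Note: Skills is a product-level Anthropic feature. The code below illustrates the core design idea
and is not an official API. For real usage, consult Anthropic's official documentation.
"""
from __future__ import annotations  # lets `Skill | None` annotations work before Python 3.10

import json
from dataclasses import dataclass, field
from datetime import datetime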
@dataclass # @dataclass自动生成__init__等方法
class Skill:
"""Agent技能定义"""
name: str # 技能名称
description: str # 技能描述(用于匹配)
trigger_patterns: list[str] # 触发条件(关键词/意图描述)
steps: list[dict] # 执行步骤模板
required_tools: list[str] # 所需工具
context_template: str # 上下文模板(含占位符)
created_at: str = field(default_factory=lambda: datetime.now().isoformat())
success_count: int = 0 # 成功执行次数
avg_tokens: int = 0 # 平均Token消耗
@dataclass
class SkillExecutionResult:
"""技能执行结果"""
skill_name: str
success: bool
output: str
tokens_used: int
steps_completed: int
class SkillRegistry:
"""技能注册与管理中心"""
def __init__(self):
self.skills: dict[str, Skill] = {}
def register(self, skill: Skill):
"""注册一个技能"""
self.skills[skill.name] = skill
print(f"💾 技能已注册: {skill.name}")
def match(self, user_request: str) -> Skill | None:
"""根据用户请求匹配最合适的技能
简化实现:关键词匹配。实际产品会使用embedding相似度。
"""
best_match = None
best_score = 0
for skill in self.skills.values():
score = sum(
1 for pattern in skill.trigger_patterns
if pattern.lower() in user_request.lower()
)
if score > best_score:
best_score = score
best_match = skill
return best_match if best_score > 0 else None
def save(self, filepath: str):
"""持久化保存所有技能"""
data = {}
for name, skill in self.skills.items():
data[name] = {
"name": skill.name,
"description": skill.description,
"trigger_patterns": skill.trigger_patterns,
"steps": skill.steps,
"required_tools": skill.required_tools,
"context_template": skill.context_template,
"success_count": skill.success_count,
}
with open(filepath, "w", encoding="utf-8") as f: # with自动管理文件关闭
json.dump(data, f, ensure_ascii=False, indent=2)
class SkillableAgent:
"""支持Skills的Agent"""
def __init__(self, name: str, skill_registry: SkillRegistry):
self.name = name
self.skills = skill_registry
def run(self, user_request: str) -> str:
"""执行请求:优先匹配已有技能,否则从头推理"""
# 1. 尝试匹配已有技能
matched_skill = self.skills.match(user_request)
if matched_skill:
print(f"⚡ 匹配到技能: {matched_skill.name} (已成功{matched_skill.success_count}次)")
return self._execute_skill(matched_skill, user_request)
# 2. 无匹配 → 从头推理
print(f"🧠 无匹配技能,从头推理...")
result = self._reason_from_scratch(user_request)
# 3. 成功后,询问是否保存为技能(可自动化)
print(f"💡 提示: 此工作流可保存为技能以供复用")
return result
def _execute_skill(self, skill: Skill, user_request: str) -> str:
"""按照技能模板执行"""
output_parts = []
for i, step in enumerate(skill.steps): # enumerate同时获取索引和元素
action = step["action"]
# 将模板中的占位符替换为实际输入
if "{user_input}" in action:
action = action.replace("{user_input}", user_request)
print(f" Step {i+1}: {step.get('description', action)}")
# 实际执行(简化:这里直接输出步骤描述)
output_parts.append(f"[{step.get('description', '')}] 完成")
skill.success_count += 1
return "\n".join(output_parts)
def _reason_from_scratch(self, user_request: str) -> str:
"""从头推理(常规Agent流程)"""
return f"[从头推理完成] {user_request}"
    def learn_skill(self, name: str, description: str, trigger_patterns: list[str],
                    steps: list[dict], required_tools: list[str] | None = None):
        """Learn a new skill from a successful execution."""
        skill = Skill(
            name=name,
            description=description,
            trigger_patterns=trigger_patterns,
            steps=steps,
            required_tools=required_tools or [],  # None means "no tools declared"
            context_template="",
        )
        self.skills.register(skill)
# === 使用示例 ===
registry = SkillRegistry()
# 预定义一些技能
registry.register(Skill(
name="代码审查",
description="对代码进行全面的安全性、性能、可读性审查",
trigger_patterns=["代码审查", "review", "检查代码", "代码质量"],
steps=[
{"description": "读取目标代码文件", "action": "read_file({user_input})"},
{"description": "检查安全漏洞", "action": "analyze_security"},
{"description": "检查性能问题", "action": "analyze_performance"},
{"description": "检查代码风格", "action": "check_style"},
{"description": "生成审查报告", "action": "generate_report"},
],
required_tools=["read_file", "code_analyzer", "report_generator"],
))
registry.register(Skill(
name="周报生成",
description="根据本周工作记录自动生成周报",
trigger_patterns=["周报", "weekly report", "本周总结"],
steps=[
{"description": "收集本周Git提交记录", "action": "git_log_this_week"},
{"description": "收集本周会议记录", "action": "get_meeting_notes"},
{"description": "收集本周任务完成情况", "action": "get_task_status"},
{"description": "汇总并生成周报", "action": "generate_weekly_report"},
],
required_tools=["git", "calendar", "task_tracker", "document_writer"],
))
# 使用Agent
agent = SkillableAgent("Claude", registry)
# 匹配到已有技能 → 直接执行
print("--- 请求1: 有匹配技能 ---")
result = agent.run("请帮我做一下这个PR的代码审查")
print(f"结果: {result}\n")
# 无匹配技能 → 从头推理
print("--- 请求2: 无匹配技能 ---")
result = agent.run("帮我分析这个数据集的异常值")
print(f"结果: {result}\n")
# 从成功执行中学习新技能
agent.learn_skill(
name="异常值分析",
description="分析数据集中的异常值并生成报告",
trigger_patterns=["异常值", "outlier", "数据异常"],
steps=[
{"description": "加载数据集", "action": "load_dataset({user_input})"},
{"description": "统计描述分析", "action": "descriptive_stats"},
{"description": "异常值检测(IQR+Z-score)", "action": "detect_outliers"},
{"description": "可视化异常分布", "action": "plot_outliers"},
{"description": "生成分析报告", "action": "generate_report"},
],
)
# 再次请求 → 匹配到刚学习的技能
print("--- 请求3: 匹配到新学习的技能 ---")
result = agent.run("这批数据有很多异常值,帮我分析一下")
print(f"结果: {result}")
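`SkillRegistry.match` uses exact keyword containment, and its docstring notes that a real product would use embedding similarity. As a dependency-free middle ground, the sketch below scores a request against example phrases using Jaccard overlap of character bigrams. This is a stand-in technique, not an embedding model and not how Claude matches skills; `match_skill`, the example phrases, and the 0.3 threshold are all assumptions made for illustration.

```python
from typing import Dict, List, Optional


def bigrams(text: str) -> set:
    """Character bigrams of the text, lower-cased with whitespace removed."""
    squeezed = "".join(text.lower().split())
    return {squeezed[i:i + 2] for i in range(len(squeezed) - 1)}


def similarity(a: str, b: str) -> float:
    """Jaccard overlap of character bigrams, in the range [0, 1]."""
    grams_a, grams_b = bigrams(a), bigrams(b)
    if not grams_a or not grams_b:
        return 0.0
    return len(grams_a & grams_b) / len(grams_a | grams_b)


def match_skill(request: str, skills: Dict[str, List[str]],
                threshold: float = 0.3) -> Optional[str]:
    """Return the skill whose best example phrase clears the threshold, else None."""
    best_name, best_score = None, threshold
    for name, examples in skills.items():
        score = max((similarity(request, example) for example in examples), default=0.0)
        if score >= best_score:
            best_name, best_score = name, score
    return best_name


# Hypothetical skills, each described by a few example phrasings
skills = {
    "code review": ["review this pull request", "check code quality"],
    "weekly report": ["write my weekly report", "summarize this week"],
}

print(match_skill("Could you review the pull request I opened?", skills))
print(match_skill("Translate this paragraph into French", skills))
```

Unlike keyword containment, this tolerates small wording changes ("the pull request" versus "this pull request"), while the threshold keeps unrelated requests on the reason-from-scratch path.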
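Key differences between Skills and a traditional prompt:
| Dimension | Traditional prompt | Skills |
|---|---|---|
| Execution | Reasons from scratch every time | Matches a template, then runs its steps |
| Stability | May differ from run to run | Highly consistent |
| Speed | Needs multiple rounds of reasoning | Skips reasoning and executes directly |
| Token usage | High | Low (less reasoning overhead) |
| Evolution | Depends on prompt tuning | Learns automatically from successful runs |
| Product form | Claude chat | Claude Skills panel |
📌 Why Skills matter as a product: Skills move an agent from "a beginner every time" to "more practiced with use". This resembles how people acquire skills: from deliberate thinking (System 2) to automatic execution (System 1). Claude's Skills panel lets users view, edit, and share skills, turning the agent's capabilities into a manageable asset.
📖 Cross-references: agent design pattern basics (Prompt Chaining, Routing, and so on) → AI Agent开发实战/01-Agent基础与架构 §4; how an agent memory system persists skills → AI Agent开发实战/12-Agent记忆系统
4. OpenAI Operator and Computer Use
4.1 OpenAI Operator
Operator is an AI agent that OpenAI released in January 2025; it can use a computer interface the way a person does.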
class OpenAIOperator:
"""
OpenAI Operator架构
核心能力:
1. 视觉感知屏幕内容
2. 理解GUI元素
3. 执行鼠标/键盘操作
4. 多步骤任务执行
"""
def __init__(self):
self.vision_model = GPT4Vision()
self.action_model = ActionPredictionModel()
self.browser = BrowserAutomation()
self.os_interface = OSInterface()
def execute_task(self, task_description: str) -> TaskResult:
"""
执行需要操作计算机界面的任务
示例任务:
- "在Amazon上搜索蓝牙耳机并按评分排序"
- "在Gmail中查找上周的会议邀请并添加到日历"
"""
max_steps = 50
step = 0
state_history = []
while step < max_steps:
# 1. 截取当前屏幕
screenshot = self.os_interface.capture_screen()
# 2. 视觉理解
ui_elements = self.vision_model.analyze_ui(screenshot)
# 3. 决策下一步行动
action = self.action_model.predict(
task=task_description,
current_state=screenshot,
ui_elements=ui_elements,
history=state_history
)
# 4. 执行行动
if action.type == "click":
self.os_interface.click(action.target)
elif action.type == "type":
self.os_interface.type(action.text)
elif action.type == "scroll":
self.os_interface.scroll(action.direction, action.amount)
elif action.type == "key":
self.os_interface.press_key(action.key)
elif action.type == "complete":
return TaskResult.success(action.result)
# 5. 记录状态
state_history.append({
"step": step,
"screenshot": screenshot,
"action": action,
"ui_elements": ui_elements
})
step += 1
time.sleep(0.5) # 等待界面响应
return TaskResult.incomplete(state_history)
class ComputerUseAgent:
"""
通用计算机使用Agent
能够操作:
- 浏览器
- 桌面应用
- 文件系统
- 命令行
"""
    def __init__(self, vision_model, max_steps: int = 50):
        self.tools = {
            "screenshot": ScreenshotTool(),
            "mouse": MouseControlTool(),
            "keyboard": KeyboardTool(),
            "shell": ShellTool(),
            "browser": BrowserTool()
        }
        # Vision-language model used by _observe(); assumed to expose describe(image, prompt=...)
        self.vision_model = vision_model
        # Step budget so that a task which never completes cannot loop forever
        self.max_steps = max_steps

    def run(self, instruction: str) -> ExecutionTrace:
        """Execute an instruction that requires operating the computer."""
        trace = ExecutionTrace()
        for _ in range(self.max_steps):
            if self._is_complete():
                break
            # Perceive the environment
            observation = self._observe()
            trace.add_observation(observation)
            # Reason about the next step
            thought = self._reason(instruction, trace)
            trace.add_thought(thought)
            # Select an action
            action = self._select_action(thought)
            trace.add_action(action)
            # Execute it
            result = self._execute(action)
            trace.add_result(result)
        return trace

    def _observe(self) -> Observation:
        """Observe the current state of the computer."""
        screenshot = self.tools["screenshot"].capture()
        # Analyze the screenshot with the vision model
        analysis = self.vision_model.describe(
            screenshot,
            prompt=(
                "Describe the current computer state, including open applications, "
                "visible UI elements, and any relevant text content."
            ),
        )
        return Observation(
            screenshot=screenshot,
            description=analysis.description,
            ui_elements=analysis.elements
        )
4.2 Computer Use: technical implementation
class ComputerUseImplementation:
"""Computer Use Agent的技术实现细节"""
@staticmethod
def visual_grounding():
"""
视觉定位:将自然语言指令映射到屏幕坐标
"""
class VisualGroundingModel:
"""
基于视觉-语言模型的UI元素定位
"""
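            def __init__(self):
                self.model = CLIP()  # or a similar vision-language model
                # Hypothetical detectors used by locate_element()
                self.ocr = OCRDetector()
                self.icon_detector = IconDetector()

            def locate_element(self, screenshot: Image, description: str) -> BoundingBox | None:
                """
                Locate the described element in a screenshot.
                Args:
                    screenshot: the screen capture
                    description: element description, e.g. "search button"
                Returns:
                    BoundingBox: element position (x1, y1, x2, y2),
                    or None when no candidate region matches
                """
                # Step 1: detect clickable text with OCR
                text_regions = self.ocr.detect(screenshot)
                # Step 2: detect icons with an icon-detection model
                icon_regions = self.icon_detector.detect(screenshot)
                # Step 3: rank all candidates by vision-language similarity
                all_regions = text_regions + icon_regions
                best_match = None
                best_score = 0
                for region in all_regions:
                    # Crop the candidate region
                    crop = screenshot.crop(region.bbox)
                    # Score the crop against the text description
                    score = self.model.similarity(crop, description)
                    if score > best_score:
                        best_score = score
                        best_match = region
                return best_match.bbox if best_match else None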
@staticmethod
def action_space_design():
"""
动作空间设计
"""
# 定义Agent可以执行的动作
ACTION_SPACE = {
# 鼠标操作
"mouse_move": {
"description": "移动鼠标到指定坐标",
"parameters": {"x": int, "y": int}
},
"mouse_click": {
"description": "在指定位置点击",
"parameters": {"x": int, "y": int, "button": ["left", "right"]}
},
"mouse_drag": {
"description": "拖拽操作",
"parameters": {"start": (int, int), "end": (int, int)}
},
"mouse_scroll": {
"description": "滚动",
"parameters": {"direction": ["up", "down"], "amount": int}
},
# 键盘操作
"key_press": {
"description": "按下按键",
"parameters": {"key": str}
},
"type_text": {
"description": "输入文本",
"parameters": {"text": str}
},
"hotkey": {
"description": "组合键",
"parameters": {"keys": List[str]}
},
# 系统操作
"screenshot": {
"description": "截取屏幕",
"parameters": {}
},
"wait": {
"description": "等待界面响应",
"parameters": {"seconds": float}
},
# 任务控制
"complete": {
"description": "标记任务完成",
"parameters": {"result": str}
},
"fail": {
"description": "标记任务失败",
"parameters": {"reason": str}
}
}
return ACTION_SPACE
@staticmethod
def safety_mechanisms():
"""
Computer Use的安全机制
"""
class SafetyGuardrails:
def __init__(self):
self.restricted_actions = [
"delete_system_files",
"modify_system_settings",
"access_sensitive_data"
]
self.confirmation_required = [
"make_purchase",
"send_email",
"delete_files",
"modify_permissions"
]
def check_action(self, action: Action) -> SafetyResult:
"""检查行动是否安全"""
# 检查是否在禁止列表
if action.type in self.restricted_actions:
return SafetyResult.reject("Action not allowed")
# 检查是否需要确认
if action.type in self.confirmation_required:
return SafetyResult.require_confirmation(
f"This action will {action.description}. Proceed?"
)
# 检查异常模式
if self._detect_suspicious_pattern(action):
return SafetyResult.require_review("Suspicious pattern detected")
return SafetyResult.allow()
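            def _detect_suspicious_pattern(self, action: Action) -> bool:
                """Detect suspicious operation patterns."""
                # Placeholder: the actual safety checks would go here.
                # Returning False matches the declared bool return type.
                return False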
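The guardrail class leaves `_detect_suspicious_pattern` empty. One simple heuristic, offered here as an assumption and not as how Operator or Claude Computer Use works, is to flag an action that repeats too often within a short window, which often indicates a stuck loop. The sketch keeps the original check order (reject, then confirm, then pattern check); `Verdict`, `Guardrails`, and the window and repeat limits are hypothetical.

```python
from collections import deque
from dataclasses import dataclass
from enum import Enum


class Verdict(Enum):
    ALLOW = "allow"
    CONFIRM = "confirm"      # ask the user before proceeding
    REVIEW = "review"        # suspicious pattern, escalate
    REJECT = "reject"


@dataclass(frozen=True)
class Action:
    type: str
    target: str = ""


class Guardrails:
    RESTRICTED = {"delete_system_files", "modify_system_settings", "access_sensitive_data"}
    NEEDS_CONFIRMATION = {"make_purchase", "send_email", "delete_files", "modify_permissions"}

    def __init__(self, window: int = 5, max_repeats: int = 3):
        self.recent = deque(maxlen=window)   # sliding window of allowed-type actions
        self.max_repeats = max_repeats

    def check(self, action: Action) -> Verdict:
        if action.type in self.RESTRICTED:
            return Verdict.REJECT
        if action.type in self.NEEDS_CONFIRMATION:
            return Verdict.CONFIRM
        self.recent.append(action)
        # The same action repeated too often in the window suggests a stuck loop
        if self.recent.count(action) > self.max_repeats:
            return Verdict.REVIEW
        return Verdict.ALLOW


guard = Guardrails()
click = Action("mouse_click", "submit-button")
verdicts = [guard.check(click) for _ in range(4)]
print([v.value for v in verdicts])
print(guard.check(Action("send_email")).value)
print(guard.check(Action("delete_system_files")).value)
```

Repetition is only one possible signal; a fuller detector might also look at navigation to unexpected domains or at text on the page that tries to redirect the agent.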
4.3 Browser automation
class BrowserAutomation:
"""浏览器自动化实现"""
def __init__(self):
self.driver = PlaywrightDriver()
self.page = None
async def navigate(self, url: str): # async def定义协程函数
"""导航到指定URL"""
self.page = await self.driver.new_page() # await等待异步操作完成
await self.page.goto(url)
async def search(self, query: str, engine: str = "google"):
"""在搜索引擎中搜索"""
if engine == "google":
await self.navigate("https://www.google.com")
await self.page.fill('input[name="q"]', query)
await self.page.press('input[name="q"]', "Enter")
# 等待结果加载
await self.page.wait_for_selector("#search")
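    async def fill_form(self, form_data: Dict[str, str]):
        """Fill in a form."""
        for field, value in form_data.items():
            # Try several selector strategies in turn
            selectors = [
                f'input[name="{field}"]',
                f'input[id="{field}"]',
                f'input[placeholder*="{field}"]',
                f'textarea[name="{field}"]'
            ]
            for selector in selectors:
                try:
                    # Short timeout (ms) so a non-matching selector fails fast
                    # instead of waiting for the default timeout
                    await self.page.fill(selector, value, timeout=2000)
                    break
                except Exception:
                    # Selector did not match or the fill failed; try the next one
                    continue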
async def extract_data(self, extraction_rules: List[Rule]) -> List[Dict]:
"""根据规则提取网页数据"""
results = []
for rule in extraction_rules:
elements = await self.page.query_selector_all(rule.selector)
for element in elements:
item = {}
for field, field_rule in rule.fields.items():
value = await element.eval_on_selector(
field_rule.selector,
"el => el.textContent"
)
item[field] = value.strip() if value else None # 链式调用:strip去除空白
results.append(item)
return results
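    async def monitor_changes(self, selector: str, callback):
        """Watch a page element for DOM changes and report them to `callback`."""
        # Expose the Python callback to the page so the observer can call it
        # (a given name can be exposed only once per page)
        await self.page.expose_function("__onPageChange", callback)
        # Pass the selector as an argument instead of interpolating it into the
        # script, so quotes in the selector cannot break or inject JavaScript
        await self.page.evaluate(
            """(selector) => {
                new MutationObserver((mutations) => {
                    window.__pageChanges = window.__pageChanges || [];
                    window.__pageChanges.push(mutations);
                    window.__onPageChange(mutations.length);
                }).observe(
                    document.querySelector(selector),
                    { childList: true, subtree: true }
                );
            }""",
            selector,
        )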
5. AI agent architecture design
5.1 Layered agent architecture
┌────────────────────────────────────────────────────────────────────────┐
│                     AI agent layered architecture                      │
├────────────────────────────────────────────────────────────────────────┤
│                                                                        │
│   ┌────────────────────────────────────────────────────────────────┐   │
│   │                       Application layer                        │   │
│   │  • Task definition  • User interaction  • Result display       │   │
│   └────────────────────────────────────────────────────────────────┘   │
│                                   ↓                                    │
│   ┌────────────────────────────────────────────────────────────────┐   │
│   │                         Planning layer                         │   │
│   │  • Goal decomposition  • Strategy choice  • Dynamic replanning│   │
│   └────────────────────────────────────────────────────────────────┘   │
│                                   ↓                                    │
│   ┌────────────────────────────────────────────────────────────────┐   │
│   │                        Reasoning layer                         │   │
│   │  • Logical reasoning  • Causal analysis  • Hypothesis checks   │   │
│   └────────────────────────────────────────────────────────────────┘   │
│                                   ↓                                    │
│   ┌────────────────────────────────────────────────────────────────┐   │
│   │                        Execution layer                         │   │
│   │  • Tool calls  • API requests  • Code execution                │   │
│   └────────────────────────────────────────────────────────────────┘   │
│                                   ↓                                    │
│   ┌────────────────────────────────────────────────────────────────┐   │
│   │                        Perception layer                        │   │
│   │  • Text understanding  • Image recognition  • Speech processing│   │
│   └────────────────────────────────────────────────────────────────┘   │
│                                   ↓                                    │
│   ┌────────────────────────────────────────────────────────────────┐   │
│   │                          Memory layer                          │   │
│   │  • Working memory  • Long-term memory  • Knowledge retrieval   │   │
│   └────────────────────────────────────────────────────────────────┘   │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘
5.2 Core component implementation
class AgentCoreArchitecture:
"""Agent核心架构实现"""
class PerceptionModule:
"""感知模块"""
def __init__(self):
self.text_processor = TextProcessor()
self.vision_processor = VisionProcessor()
self.audio_processor = AudioProcessor()
def process(self, input_data: Union[str, Image, Audio]) -> Perception:
"""统一感知处理入口"""
if isinstance(input_data, str): # isinstance检查类型
return self._process_text(input_data)
elif isinstance(input_data, Image):
return self._process_image(input_data)
elif isinstance(input_data, Audio):
return self._process_audio(input_data)
else:
raise ValueError(f"Unsupported input type: {type(input_data)}")
def _process_text(self, text: str) -> TextPerception:
"""文本感知处理"""
# 意图识别
intent = self.text_processor.classify_intent(text)
# 实体提取
entities = self.text_processor.extract_entities(text)
# 情感分析
sentiment = self.text_processor.analyze_sentiment(text)
return TextPerception(
raw_text=text,
intent=intent,
entities=entities,
sentiment=sentiment
)
    class MemoryModule:
        """Memory module."""

        def __init__(self, vector_store: VectorStore):
            self.working_memory = WorkingMemory(capacity=7)  # Miller's law (7 ± 2 items)
            self.episodic_memory = EpisodicMemory(vector_store)
            self.semantic_memory = SemanticMemory(vector_store)
            self.procedural_memory = ProceduralMemory()

        def store(self, experience: Experience):
            """Store an experience."""
            # Working memory
            self.working_memory.add(experience)
            # Episodic memory (long-term), only for significant experiences
            if experience.is_significant:
                self.episodic_memory.store(experience)
            # Extract knowledge into semantic memory
            knowledge = self._extract_knowledge(experience)
            self.semantic_memory.store(knowledge)
            # Update procedural memory (skills)
            if experience.type == "skill_execution":
                self.procedural_memory.update(experience)

        def retrieve(self, query: str, context: Context) -> RetrievedInfo:
            """Retrieve relevant information."""
            # Query every memory store
            working_results = self.working_memory.search(query)
            episodic_results = self.episodic_memory.search(query, top_k=5)
            semantic_results = self.semantic_memory.search(query, top_k=5)
            procedural_results = self.procedural_memory.match(context)
            # Fuse the four result lists into one ranking
            fused = self._fusion_rank(
                working_results,
                episodic_results,
                semantic_results,
                procedural_results
            )
            return fused
    class PlanningModule:
        """Planning module."""

        def __init__(self, llm):
            self.llm = llm
            self.plan_library = PlanLibrary()

        def create_plan(self, goal: Goal, context: Context) -> Plan:
            """Create an execution plan."""
            # Try to reuse a plan from the library first
            similar_plan = self.plan_library.find_similar(goal)
            if similar_plan and similar_plan.success_rate > 0.8:
                return self._adapt_plan(similar_plan, context)
            # Otherwise generate a new plan
            return self._generate_plan(goal, context)

        def _generate_plan(self, goal: Goal, context: Context) -> Plan:
            """Generate a plan with the LLM."""
            prompt = f"""Create a step-by-step plan to achieve the following goal:
Goal: {goal.description}
Context:
{context.to_prompt()}
Available tools: {context.available_tools}
Generate a detailed plan with:
1. Sequential steps
2. Each step's expected outcome
3. Dependencies between steps
4. Potential failure points and alternatives
Output as structured JSON."""
            plan_json = self.llm.generate(prompt, format="json")
            return Plan.from_json(plan_json)

        def replan(self, current_plan: Plan, failure: Failure) -> Plan:
            """Replan after a failure."""
            prompt = f"""The current plan failed. Create a revised plan.
Original Plan: {current_plan.to_json()}
Failure: {failure.description}
Current State: {failure.state}
Provide a new plan that addresses the failure."""
            new_plan_json = self.llm.generate(prompt, format="json")
            return Plan.from_json(new_plan_json)
    class ToolModule:
        """Tool module."""

        def __init__(self, llm):
            self.llm = llm  # used by _select_tool to pick a tool
            self.tools: Dict[str, Tool] = {}
            self.tool_descriptions = []

        def register(self, tool: Tool):
            """Register a tool."""
            self.tools[tool.name] = tool
            self.tool_descriptions.append(tool.get_description())

        def select_and_execute(self, intent: Intent, context: Context) -> ToolResult:
            """Select a suitable tool and run it."""
            # Tool selection
            tool_name = self._select_tool(intent, context)
            tool = self.tools.get(tool_name)
            if tool is None:
                # The LLM may name a tool that was never registered
                return ToolResult.failure(f"Unknown tool: {tool_name}")
            # Parameter extraction
            parameters = self._extract_parameters(intent, tool)
            # Execution
            try:
                result = tool.execute(**parameters)  # unpack the dict into keyword arguments
                return ToolResult.success(result)
            except Exception as e:
                return ToolResult.failure(str(e))

        def _select_tool(self, intent: Intent, context: Context) -> str:
            """Select a tool based on the intent."""
            prompt = f"""Given the user intent and available tools, select the most appropriate tool.
Intent: {intent.description}
Available Tools:
{self._format_tool_descriptions()}
Select the best tool and explain why."""
            selection = self.llm.generate(prompt)
            return self._parse_tool_selection(selection)
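The `retrieve` method above hands four result lists to `_fusion_rank`, which this section does not define. One common way to merge rankings from independent retrievers is reciprocal rank fusion (RRF). The sketch below is an assumed stand-in rather than the author's implementation: the function name `reciprocal_rank_fusion`, the constant `k = 60` (a conventional default), and the sample note ids are all illustrative.

```python
from collections import defaultdict
from typing import Dict, Hashable, List, Sequence


def reciprocal_rank_fusion(
    ranked_lists: Sequence[Sequence[Hashable]],
    k: int = 60,
) -> List[Hashable]:
    """Merge several best-first rankings into one.

    Each item scores sum(1 / (k + rank)) over the lists that contain it,
    with rank starting at 1, so items ranked highly by several stores win.
    """
    scores: Dict[Hashable, float] = defaultdict(float)
    for ranking in ranked_lists:
        for rank, item in enumerate(ranking, start=1):
            scores[item] += 1.0 / (k + rank)
    # sorted() is stable, so tied items keep their first-seen order.
    return sorted(scores, key=lambda item: scores[item], reverse=True)


# Hypothetical results from three memory stores, best match first.
working = ["note-3", "note-1"]
episodic = ["note-1", "note-7", "note-3"]
semantic = ["note-7", "note-1"]

fused = reciprocal_rank_fusion([working, episodic, semantic])
print(fused)
```

RRF only needs rank positions, so it can combine stores whose raw similarity scores are not comparable with each other.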
5.3 Agent Communication Protocol
class AgentCommunicationProtocol:
    """Communication protocol for multi-agent systems."""

    class Message:
        """Message format exchanged between agents."""

        def __init__(
            self,
            sender: str,
            receiver: str,
            message_type: str,
            content: Dict,
            conversation_id: str,
            timestamp: Optional[float] = None
        ):
            self.sender = sender
            self.receiver = receiver
            self.message_type = message_type  # request, response, inform, delegate
            self.content = content
            self.conversation_id = conversation_id
            # Default to the creation time when no timestamp is given
            self.timestamp = time.time() if timestamp is None else timestamp
    class CommunicationBus:
        """Message bus connecting agents."""

        def __init__(self):
            self.channels: Dict[str, asyncio.Queue] = {}
            self.subscribers: Dict[str, List[str]] = {}

        def register_agent(self, agent_id: str):
            """Register an agent on the bus."""
            self.channels[agent_id] = asyncio.Queue()

        async def send(self, message: Message):
            """Deliver a message to its receiver's queue."""
            if message.receiver not in self.channels:
                raise ValueError(f"Unknown receiver: {message.receiver}")
            await self.channels[message.receiver].put(message)

        async def receive(self, agent_id: str, timeout: float = None) -> Optional[Message]:
            """Wait for the next message; returns None on timeout."""
            if agent_id not in self.channels:
                raise ValueError(f"Unknown agent: {agent_id}")
            try:
                return await asyncio.wait_for(
                    self.channels[agent_id].get(),
                    timeout=timeout
                )
            except asyncio.TimeoutError:
                return None

        def subscribe(self, agent_id: str, topic: str):
            """Subscribe an agent to a topic."""
            if topic not in self.subscribers:
                self.subscribers[topic] = []
            self.subscribers[topic].append(agent_id)

        async def publish(self, topic: str, message: Message):
            """Publish a message to every subscriber of a topic."""
            for subscriber in self.subscribers.get(topic, []):
                # Message defines no copy() method, so take a shallow copy
                # (requires `import copy`) and re-address it per subscriber
                routed = copy.copy(message)
                routed.receiver = subscriber
                await self.send(routed)
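To make the send/receive/publish flow concrete, here is a minimal runnable sketch of the same idea. It is an assumption-laden simplification, not the chapter's class: `Message` is modeled as a frozen dataclass so that `dataclasses.replace` can produce the per-subscriber copy, and the agent ids and the `plan-ready` topic are made up for the demo.

```python
import asyncio
import time
from dataclasses import dataclass, field, replace
from typing import Dict, List, Optional


@dataclass(frozen=True)
class Message:
    sender: str
    receiver: str
    message_type: str  # request, response, inform, delegate
    content: dict
    conversation_id: str
    timestamp: float = field(default_factory=time.time)


class CommunicationBus:
    def __init__(self) -> None:
        self.channels: Dict[str, asyncio.Queue] = {}
        self.subscribers: Dict[str, List[str]] = {}

    def register_agent(self, agent_id: str) -> None:
        self.channels[agent_id] = asyncio.Queue()

    def subscribe(self, agent_id: str, topic: str) -> None:
        self.subscribers.setdefault(topic, []).append(agent_id)

    async def send(self, message: Message) -> None:
        if message.receiver not in self.channels:
            raise ValueError(f"Unknown receiver: {message.receiver}")
        await self.channels[message.receiver].put(message)

    async def receive(self, agent_id: str, timeout: Optional[float] = None) -> Optional[Message]:
        try:
            return await asyncio.wait_for(self.channels[agent_id].get(), timeout)
        except asyncio.TimeoutError:
            return None  # nothing arrived in time

    async def publish(self, topic: str, message: Message) -> None:
        for subscriber in self.subscribers.get(topic, []):
            # replace() builds a copy of the message with a new receiver.
            await self.send(replace(message, receiver=subscriber))


async def demo() -> List[Optional[Message]]:
    bus = CommunicationBus()
    for agent_id in ("planner", "researcher", "writer"):
        bus.register_agent(agent_id)
    bus.subscribe("researcher", "plan-ready")
    bus.subscribe("writer", "plan-ready")
    announcement = Message("planner", "*", "inform", {"plan": "v1"}, "conv-1")
    await bus.publish("plan-ready", announcement)
    return [
        await bus.receive("researcher", timeout=0.1),
        await bus.receive("writer", timeout=0.1),
        await bus.receive("planner", timeout=0.05),  # empty queue, times out
    ]


results = asyncio.run(demo())
print([m.receiver if m else None for m in results])
```

Each subscriber gets its own queue entry, so a slow consumer does not block the others; the trade-off is that the bus keeps undelivered messages in memory until they are read.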
6. Multi-Agent Collaboration Systems
6.1 Collaboration Modes
┌────────────────────────────────────────────────────────────────────────┐
│                    Multi-Agent Collaboration Modes                     │
├────────────────────────────────────────────────────────────────────────┤
│                                                                        │
│  1. Hierarchical                                                       │
│                                                                        │
│          ┌───────────┐                                                 │
│          │  Manager  │                                                 │
│          └─────┬─────┘                                                 │
│        ┌───────┼───────┐                                               │
│        ↓       ↓       ↓                                               │
│      ┌────┐  ┌────┐  ┌────┐                                            │
│      │ A1 │  │ A2 │  │ A3 │                                            │
│      └────┘  └────┘  └────┘                                            │
│                                                                        │
│  2. Peer-to-Peer                                                       │
│                                                                        │
│      ┌─────┐          ┌─────┐                                          │
│      │ A1  │←────────→│ A2  │                                          │
│      └──┬──┘          └──┬──┘                                          │
│         ↓                ↓                                             │
│      ┌─────┐          ┌─────┐                                          │
│      │ A3  │←────────→│ A4  │                                          │
│      └─────┘          └─────┘                                          │
│                                                                        │
│  3. Market-based                                                       │
│                                                                        │
│      ┌────────────────────────────────────────────┐                    │
│      │            Task auction market             │                    │
│      │ Task X ──→ [Bid: A1:$5, A2:$8] ──→ A1 wins │                    │
│      │ Task Y ──→ [Bid: A2:$3, A3:$7] ──→ A2 wins │                    │
│      └────────────────────────────────────────────┘                    │
│                                                                        │
│  4. Pipeline                                                           │
│                                                                        │
│    Input → [A1: extract] → [A2: analyze] → [A3: generate] → Output     │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘
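Of the four modes in the diagram, the pipeline is the simplest to express in code: each agent's output becomes the next agent's input. The following is a minimal sketch under stated assumptions. `run_pipeline` and the three stage functions are hypothetical stand-ins for A1, A2, and A3, and real stages would call an LLM or a tool instead of doing string processing.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Tuple

Stage = Callable[[Any], Awaitable[Any]]


async def run_pipeline(
    stages: List[Tuple[str, Stage]], payload: Any
) -> Tuple[Any, List[str]]:
    """Feed each stage's output into the next and record the execution order."""
    trace: List[str] = []
    for name, stage in stages:
        payload = await stage(payload)
        trace.append(name)
    return payload, trace


# Hypothetical stand-ins for A1 (extract), A2 (analyze), A3 (generate).
async def extract(text: str) -> List[str]:
    return text.lower().split()


async def analyze(words: List[str]) -> dict:
    return {"word_count": len(words), "unique": len(set(words))}


async def generate(stats: dict) -> str:
    return f"{stats['word_count']} words, {stats['unique']} unique"


output, trace = asyncio.run(
    run_pipeline(
        [("extract", extract), ("analyze", analyze), ("generate", generate)],
        "Agents call tools and tools call agents",
    )
)
print(output)
```

Because stages run strictly in order, a pipeline is easy to debug from its trace, but total latency is the sum of all stages; the hierarchical mode trades that simplicity for parallelism.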
6.2 Multi-Agent System Implementation
class MultiAgentSystem:
    """Multi-agent collaboration system."""

    def __init__(self):
        self.agents: Dict[str, Agent] = {}
        self.coordinator = Coordinator()
        self.communication_bus = CommunicationBus()

    def register_agent(self, agent: Agent):
        """Register an agent."""
        self.agents[agent.id] = agent
        self.communication_bus.register_agent(agent.id)
        agent.set_communication_bus(self.communication_bus)

    async def execute_collaborative_task(
        self,
        task: ComplexTask,
        collaboration_mode: str = "hierarchical"
    ) -> CollaborativeResult:
        """Run a task in the requested collaboration mode."""
        if collaboration_mode == "hierarchical":
            return await self._hierarchical_execution(task)
        elif collaboration_mode == "peer":
            return await self._peer_execution(task)  # not shown in this section
        elif collaboration_mode == "market":
            return await self._market_execution(task)
        else:
            raise ValueError(f"Unknown collaboration mode: {collaboration_mode}")

    async def _hierarchical_execution(self, task: ComplexTask) -> CollaborativeResult:
        """Hierarchical execution."""
        # 1. Select the manager agent
        manager = self.coordinator.select_manager(task)
        # 2. The manager decomposes the task
        subtasks = manager.decompose_task(task)
        # 3. Assign subtasks (maps subtask.id -> agent_id)
        assignments = self.coordinator.assign_subtasks(subtasks, self.agents)
        # 4. Run in parallel. The assignments are keyed by subtask id,
        #    so look each subtask object up before executing it
        subtasks_by_id = {subtask.id: subtask for subtask in subtasks}
        results = await asyncio.gather(*[
            self.agents[agent_id].execute(subtasks_by_id[subtask_id])
            for subtask_id, agent_id in assignments.items()
        ])
        # 5. The manager integrates the results
        final_result = manager.integrate_results(results)
        return CollaborativeResult(
            output=final_result,
            execution_trace=results,
            coordination_log=self.coordinator.get_log()
        )
    async def _market_execution(self, task: ComplexTask) -> CollaborativeResult:
        """Market-based execution (auction)."""
        # 1. Decompose the task
        subtasks = self.coordinator.decompose_task(task)
        # 2. Allocate by auction
        assignments = {}
        all_bids = {}  # subtask.id -> list of (agent_id, cost)
        for subtask in subtasks:
            # Collect bids
            bids = []
            for agent_id, agent in self.agents.items():
                if agent.can_handle(subtask):
                    cost = agent.estimate_cost(subtask)
                    bids.append((agent_id, cost))
            all_bids[subtask.id] = bids
            # The lowest bid wins
            if bids:
                winner = min(bids, key=lambda bid: bid[1])
                assignments[subtask.id] = winner[0]
        # 3. Execute
        results = await self._execute_assignments(assignments)
        return CollaborativeResult(
            output=self._aggregate_results(results),
            assignments=assignments,
            bids=all_bids  # bids for every subtask, not only the last one
        )
class Coordinator:
    """Coordinator: assigns tasks and resolves conflicts."""

    def __init__(self, llm=None):
        self.llm = llm  # needed by decompose_task
        self.task_queue = PriorityQueue()
        self.agent_status = {}

    def decompose_task(self, task: ComplexTask) -> List[SubTask]:
        """Decompose a complex task into subtasks."""
        if self.llm is None:
            raise RuntimeError("decompose_task requires an LLM client")
        prompt = f"""Decompose the following complex task into manageable subtasks:
Task: {task.description}
Requirements: {task.requirements}
Provide a list of subtasks with:
1. Subtask description
2. Dependencies on other subtasks
3. Estimated complexity
4. Required capabilities
Output as JSON."""
        decomposition = self.llm.generate(prompt, format="json")
        return [SubTask.from_dict(s) for s in decomposition["subtasks"]]

    def assign_subtasks(
        self,
        subtasks: List[SubTask],
        agents: Dict[str, Agent]
    ) -> Dict[str, str]:
        """Assign each subtask to a suitable agent (returns subtask.id -> agent_id)."""
        assignments = {}
        for subtask in subtasks:
            # Score every available agent
            scores = {}
            for agent_id, agent in agents.items():
                if agent.is_available():
                    score = self._compute_match_score(agent, subtask)
                    scores[agent_id] = score
            # Pick the best match; subtasks with no available agent stay unassigned
            if scores:
                best_agent = max(scores, key=scores.get)
                assignments[subtask.id] = best_agent
        return assignments
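    def _compute_match_score(self, agent: Agent, subtask: SubTask) -> float:
        """Score how well an agent fits a subtask (weights sum to 1.0)."""
        score = 0.0
        # Capability match (weight 0.4); a subtask with no requirements matches fully
        required = set(subtask.required_capabilities)
        if required:
            capability_match = len(set(agent.capabilities) & required) / len(required)
        else:
            capability_match = 1.0
        score += capability_match * 0.4
        # Past performance (weight 0.3)
        if subtask.type in agent.performance_history:
            score += agent.performance_history[subtask.type] * 0.3
        # Current load (weight 0.2); assumes max_capacity > 0
        load_factor = 1.0 - (agent.current_load / agent.max_capacity)
        score += load_factor * 0.2
        # Communication overhead (weight 0.1)
        if subtask.dependencies:
            # Intended to favor collaborating agents. As written, the bonus
            # is the same for every agent, so it does not change the ranking
            score += 0.1
        return score

    def resolve_conflict(self, conflict: Conflict) -> Resolution:
        """Resolve a conflict between agents."""
        if conflict.type == "resource_contention":
            # Resource contention: priorities or time slicing
            return self._resolve_resource_conflict(conflict)
        elif conflict.type == "goal_conflict":
            # Goal conflict: negotiation or escalation to a superior
            return self._resolve_goal_conflict(conflict)
        elif conflict.type == "communication_failure":
            # Communication failure: retry or alternative routing
            return self._resolve_communication_failure(conflict)
        else:
            raise ValueError(f"Unknown conflict type: {conflict.type}")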
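The decomposition prompt asks the LLM for dependencies between subtasks, yet the hierarchical executor launches every subtask at once. One way to respect those dependencies is to run subtasks in waves, where a wave contains every subtask whose prerequisites have finished. The sketch below uses the standard-library `graphlib.TopologicalSorter` (Python 3.9+). The function `run_in_waves`, the `fake_subtask` coroutine, and the dependency map are illustrative assumptions, not part of the system above.

```python
import asyncio
from graphlib import TopologicalSorter
from typing import Awaitable, Callable, Dict, List, Set


async def run_in_waves(
    dependencies: Dict[str, Set[str]],
    run_subtask: Callable[[str], Awaitable[str]],
) -> List[List[str]]:
    """Run subtasks wave by wave and return the ids executed in each wave."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    waves: List[List[str]] = []
    while sorter.is_active():
        # Sorted only so that the recorded trace is deterministic.
        ready = sorted(sorter.get_ready())
        await asyncio.gather(*(run_subtask(task_id) for task_id in ready))
        sorter.done(*ready)  # unlocks subtasks that were waiting on this wave
        waves.append(ready)
    return waves


async def fake_subtask(task_id: str) -> str:
    await asyncio.sleep(0)  # placeholder for agent.execute(subtask)
    return f"{task_id}: done"


# Maps each subtask id to the ids it depends on (hypothetical example data).
deps = {
    "collect": set(),
    "analyze": {"collect"},
    "chart": {"collect"},
    "report": {"analyze", "chart"},
}

waves = asyncio.run(run_in_waves(deps, fake_subtask))
print(waves)
```

Subtasks inside one wave still run concurrently, so independent work keeps its parallelism while dependent work waits for its inputs.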
6.3 Agent Team Example
class ResearchTeam:
    """Research team: an example of multi-agent collaboration."""

    def __init__(self):
        self.system = MultiAgentSystem()
        # Create the specialist agents
        self.planner = PlannerAgent("planner")
        self.researcher = ResearchAgent("researcher")
        self.analyst = AnalystAgent("analyst")
        self.writer = WriterAgent("writer")
        self.reviewer = ReviewerAgent("reviewer")
        # Register them with the system
        for agent in [self.planner, self.researcher, self.analyst, self.writer, self.reviewer]:
            self.system.register_agent(agent)

    async def conduct_research(self, topic: str) -> ResearchReport:
        """Run the collaborative research workflow."""
        # Phase 1: planning (the plan is not consumed by the later phases in this example)
        plan = await self.planner.create_research_plan(topic)
        # Phase 2: information gathering (in parallel)
        search_tasks = [
            self.researcher.search_academic(topic),
            self.researcher.search_news(topic),
            self.researcher.search_web(topic)
        ]
        search_results = await asyncio.gather(*search_tasks)
        # Phase 3: analysis
        analysis = await self.analyst.analyze(search_results)
        # Phase 4: writing
        draft = await self.writer.write_report(analysis)
        # Phase 5: review and revision
        review = await self.reviewer.review(draft)
        final_report = await self.writer.revise(draft, review)
        return final_report
class PlannerAgent(Agent):
    """Planning agent."""

    async def create_research_plan(self, topic: str) -> ResearchPlan:
        """Create a research plan."""
        # Determine the research scope
        scope = self._determine_scope(topic)
        # Identify the key questions
        key_questions = self._identify_questions(topic, scope)
        # Plan one research step per question
        steps = []
        for question in key_questions:
            steps.append(ResearchStep(
                question=question,
                sources=self._identify_sources(question),
                methods=self._select_methods(question)
            ))
        return ResearchPlan(
            topic=topic,
            scope=scope,
            steps=steps,
            timeline=self._estimate_timeline(steps)
        )


class ResearchAgent(Agent):
    """Research agent."""

    async def search_academic(self, topic: str) -> List[Paper]:
        """Search academic papers."""
        # For example via the Google Scholar API or the arXiv API
        raise NotImplementedError

    async def search_news(self, topic: str) -> List[NewsArticle]:
        """Search news."""
        # For example via a news API
        raise NotImplementedError

    async def search_web(self, topic: str) -> List[WebPage]:
        """Search the web."""
        # For example via a search engine API
        raise NotImplementedError


class AnalystAgent(Agent):
    """Analysis agent."""

    async def analyze(self, sources: List[Source]) -> Analysis:
        """Analyze the collected information."""
        # Integrate the information
        integrated = self._integrate_information(sources)
        # Identify patterns
        patterns = self._identify_patterns(integrated)
        # Analyze trends
        trends = self._analyze_trends(integrated)
        # Identify gaps
        gaps = self._identify_gaps(integrated)
        return Analysis(
            summary=integrated,
            patterns=patterns,
            trends=trends,
            gaps=gaps
        )
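In the information-gathering phase above, a plain `asyncio.gather` raises as soon as any one search fails, which discards the results of the searches that succeeded. A hedged alternative is to pass `return_exceptions=True` and separate results from errors afterwards. The helper `gather_tolerant` and the three search coroutines below are hypothetical stand-ins for the `ResearchAgent` methods.

```python
import asyncio
from typing import Awaitable, Dict, List, Tuple


async def gather_tolerant(
    named_calls: Dict[str, Awaitable[List[str]]],
) -> Tuple[Dict[str, List[str]], Dict[str, str]]:
    """Await all calls concurrently; return (results, errors) keyed by source name."""
    names = list(named_calls)
    outcomes = await asyncio.gather(*named_calls.values(), return_exceptions=True)
    results: Dict[str, List[str]] = {}
    errors: Dict[str, str] = {}
    for name, outcome in zip(names, outcomes):
        if isinstance(outcome, BaseException):
            errors[name] = f"{type(outcome).__name__}: {outcome}"
        else:
            results[name] = outcome
    return results, errors


# Hypothetical search coroutines; the news search simulates an outage.
async def search_academic(topic: str) -> List[str]:
    return [f"paper about {topic}"]


async def search_news(topic: str) -> List[str]:
    raise TimeoutError("news API did not respond")


async def search_web(topic: str) -> List[str]:
    return [f"page about {topic}"]


async def demo() -> Tuple[Dict[str, List[str]], Dict[str, str]]:
    topic = "agent safety"
    return await gather_tolerant({
        "academic": search_academic(topic),
        "news": search_news(topic),
        "web": search_web(topic),
    })


results, errors = asyncio.run(demo())
print(results)
print(errors)
```

The analyst can then work with whatever sources did return, and the error map can be surfaced in the final report so that missing coverage is visible rather than silent.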
7. Agent Safety and Controllability
7.1 Security Architecture
┌────────────────────────────────────────────────────────────────────────────┐
│                        Agent Security Architecture                         │
├────────────────────────────────────────────────────────────────────────────┤
│                                                                            │
│   ┌────────────────────────────────────────────────────────────────────┐   │
│   │                     Application-layer security                     │   │
│   │ • Input validation  • Output filtering  • Sensitive-data detection │   │
│   └────────────────────────────────────────────────────────────────────┘   │
│                                     ↓                                      │
│   ┌────────────────────────────────────────────────────────────────────┐   │
│   │                      Behavior-layer security                       │   │
│   │ • Action review  • Permission control  • Anomaly detection         │   │
│   └────────────────────────────────────────────────────────────────────┘   │
│                                     ↓                                      │
│   ┌────────────────────────────────────────────────────────────────────┐   │
│   │                      Execution-layer security                      │   │
│   │ • Sandbox isolation  • Resource limits  • Audit logging            │   │
│   └────────────────────────────────────────────────────────────────────┘   │
│                                     ↓                                      │
│   ┌────────────────────────────────────────────────────────────────────┐   │
│   │                      Infrastructure security                       │   │
│   │ • Network security  • Data encryption  • Access control            │   │
│   └────────────────────────────────────────────────────────────────────┘   │
│                                                                            │
└────────────────────────────────────────────────────────────────────────────┘
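As one concrete instance of the application layer's output filtering and sensitive-data detection, the sketch below redacts matches before text leaves the agent. It is a minimal illustration under stated assumptions: the two regular expressions and the `[REDACTED:<label>]` format are made up for this example, and a production system would rely on a maintained detector rather than two hand-written patterns.

```python
import re
from typing import List, Tuple

# Illustrative patterns only (assumptions, not a complete detector).
REDACTION_RULES: List[Tuple[str, "re.Pattern[str]"]] = [
    ("EMAIL", re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")),
    ("SECRET", re.compile(r"(?i)\b(api[_-]?key|token|password)\s*[:=]\s*\S+")),
]


def redact(text: str) -> Tuple[str, List[str]]:
    """Replace matches with [REDACTED:<label>] and report which rules fired."""
    fired: List[str] = []
    for label, pattern in REDACTION_RULES:
        text, count = pattern.subn(f"[REDACTED:{label}]", text)
        if count:
            fired.append(label)
    return text, fired


clean, fired = redact("Contact ops@example.com, api_key=sk-12345 is set.")
print(clean)
print(fired)
```

Returning the list of fired rules alongside the cleaned text lets the behavior layer log or escalate the event instead of silently rewriting the output.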
7.2 Safety Mechanism Implementation
class AgentSafetyFramework:
    """Agent safety framework."""

    class InputGuardrails:
        """Input guardrails (requires `import re`)."""

        def __init__(self):
            self.forbidden_patterns = [
                r"ignore previous instructions",
                r"disregard safety",
                r"bypass security",
                r"execute.*system.*command",
            ]
            self.sensitive_keywords = [
                "password", "secret", "key", "token", "credential"
            ]

        def validate(self, user_input: str) -> ValidationResult:
            """Validate user input."""
            # Check for injection attempts
            for pattern in self.forbidden_patterns:
                if re.search(pattern, user_input, re.IGNORECASE):
                    return ValidationResult.reject(
                        "Potential prompt injection detected"
                    )
            # Check for requests that may expose sensitive information
            if self._is_sensitive_info_request(user_input):
                return ValidationResult.require_confirmation(
                    "This request may involve sensitive information"
                )
            # Content safety check
            safety_score = self._check_content_safety(user_input)
            if safety_score < 0.5:
                return ValidationResult.reject("Content violates safety policy")
            return ValidationResult.allow()

        def _is_sensitive_info_request(self, text: str) -> bool:
            """Detect whether the text asks for sensitive information."""
            text_lower = text.lower()
            # Substring match: "key" also matches words such as "keyboard"
            return any(keyword in text_lower for keyword in self.sensitive_keywords)
    class ActionGuardrails:
        """Action guardrails."""

        def __init__(self):
            self.risk_levels = {
                "read_file": "low",
                "write_file": "medium",
                "delete_file": "high",
                "execute_code": "high",
                "network_request": "medium",
                "database_query": "medium",
                "send_email": "high",
                "make_payment": "critical"
            }

        def evaluate(self, action: Action) -> SafetyDecision:
            """Evaluate whether an action is safe to run."""
            risk_level = self.risk_levels.get(action.type, "unknown")
            if risk_level in ("critical", "unknown"):
                # Fail closed: unrecognized action types are treated like critical ones
                return SafetyDecision.require_human_approval(
                    f"Action '{action.type}' ({risk_level} risk) requires explicit approval"
                )
            elif risk_level == "high":
                # Check for anomalous patterns
                if self._detect_anomaly(action):
                    return SafetyDecision.require_confirmation()
                # Check whether the action is outside the normal scope
                if not self._is_within_normal_scope(action):
                    return SafetyDecision.require_confirmation()
            elif risk_level == "medium":
                # Log but allow
                self._log_action(action)
            return SafetyDecision.allow()

        def _detect_anomaly(self, action: Action) -> bool:
            """Detect anomalous action patterns."""
            # Placeholder for the detection logic,
            # e.g. many delete operations within a short time
            return False
    class OutputGuardrails:
        """Output guardrails."""

        def filter(self, output: str, context: Context) -> FilteredOutput:
            """Filter unsafe output."""
            # Redact leaked sensitive data
            if self._contains_sensitive_data(output):
                output = self._redact_sensitive_data(output)
            # Content safety filter
            if self._contains_harmful_content(output):
                return FilteredOutput.block("Harmful content detected")
            # Factual accuracy check (for critical information)
            if context.requires_fact_checking:
                accuracy = self._check_factual_accuracy(output)
                if accuracy < 0.8:
                    output = self._add_disclaimer(output)
            return FilteredOutput.allow(output)
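    class SandboxedExecution:
        """Sandboxed execution environment (requires the `docker` package)."""

        def __init__(self):
            self.docker_client = docker.from_env()

        def execute(self, code: str, timeout: int = 30) -> ExecutionResult:
            """Run code inside a sandbox."""
            # Create a throwaway container
            container = self.docker_client.containers.run(
                "python:3.9-slim",
                # Pass the code as its own argument. Interpolating it into a
                # shell string breaks on quotes and allows command injection
                command=["python", "-c", code],
                detach=True,
                mem_limit="512m",
                cpu_quota=100000,  # 1 CPU with the default 100 ms period
                network_mode="none",  # no network access
                read_only=True,  # read-only filesystem
                security_opt=["no-new-privileges"]
            )
            try:
                result = container.wait(timeout=timeout)
                logs = container.logs().decode("utf-8")
                return ExecutionResult(
                    success=result["StatusCode"] == 0,
                    output=logs,
                    exit_code=result["StatusCode"]
                )
            finally:
                # Always remove the container, including after a timeout
                container.remove(force=True)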
    class HumanInTheLoop:
        """Human-in-the-loop mechanism."""

        def __init__(self, approval_modes: Dict[str, str]):
            """
            Args:
                approval_modes: approval mode per risk level
                    - "auto": run automatically
                    - "confirm": requires confirmation
                    - "approve": requires explicit approval
            """
            self.approval_modes = approval_modes

        async def request_approval(
            self,
            action: Action,
            context: Context
        ) -> ApprovalResult:
            """Request human approval."""
            risk_level = self._assess_risk(action)
            mode = self.approval_modes.get(risk_level, "approve")
            if mode == "auto":
                return ApprovalResult.approved()
            elif mode == "confirm":
                # Send a notification and wait for confirmation
                notification = self._create_notification(action, context)
                response = await self._send_and_wait(notification, timeout=60)
                if response and response.confirmed:
                    return ApprovalResult.approved()
                else:
                    return ApprovalResult.denied("Not confirmed")
            elif mode == "approve":
                # Explicit approval required
                request = self._create_approval_request(action, context)
                response = await self._send_and_wait(request, timeout=300)
                if response and response.approved:
                    return ApprovalResult.approved(response.conditions)
                else:
                    return ApprovalResult.denied(response.reason if response else "Timeout")
            # Fail closed on a misconfigured mode instead of returning None
            return ApprovalResult.denied(f"Unknown approval mode: {mode}")
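The `_detect_anomaly` hook in `ActionGuardrails` is left as a placeholder, with "many delete operations within a short time" given as the example. A sliding-window counter is one simple way to implement that idea. The sketch below is an assumption, not the chapter's implementation: the class name `RateAnomalyDetector`, the limit of 3 actions per 10 seconds, and the injectable clock are all illustrative choices.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class RateAnomalyDetector:
    """Flags an action type once it occurs more than `limit` times within `window` seconds."""

    def __init__(
        self,
        limit: int = 3,
        window: float = 10.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.limit = limit
        self.window = window
        self.clock = clock  # injectable so that tests can control time
        self._events: Dict[str, Deque[float]] = defaultdict(deque)

    def is_anomalous(self, action_type: str) -> bool:
        now = self.clock()
        events = self._events[action_type]
        events.append(now)
        # Drop timestamps that have fallen out of the window.
        while events and now - events[0] > self.window:
            events.popleft()
        return len(events) > self.limit


# Simulated timeline: four deletes in 3 seconds, then one much later.
fake_now = [0.0]
detector = RateAnomalyDetector(limit=3, window=10.0, clock=lambda: fake_now[0])
flags = []
for t in (0.0, 1.0, 2.0, 3.0, 20.0):
    fake_now[0] = t
    flags.append(detector.is_anomalous("delete_file"))
print(flags)
```

Only the fourth delete inside the window is flagged; by the time of the fifth, the earlier events have expired, so normal activity resumes without manual reset.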
7.3 Explainability and Auditing
class AgentExplainability:
    """Agent explainability framework."""

    class DecisionTracer:
        """Decision tracer."""

        def __init__(self):
            self.trace = DecisionTrace()

        def log_decision(
            self,
            decision_point: str,
            context: Dict,
            reasoning: str,
            decision: str,
            alternatives: List[str]
        ):
            """Record one decision and the reasoning behind it."""
            self.trace.add_step(DecisionStep(
                timestamp=time.time(),
                decision_point=decision_point,
                context=context,
                reasoning=reasoning,
                decision=decision,
                alternatives_considered=alternatives,
                confidence=self._calculate_confidence(reasoning)
            ))

        def generate_explanation(self, detail_level: str = "summary") -> str:
            """Generate an explanation of the decisions."""
            if detail_level == "summary":
                return self._generate_summary()
            elif detail_level == "detailed":
                return self._generate_detailed_explanation()
            elif detail_level == "technical":
                return self._generate_technical_explanation()
            else:
                raise ValueError(f"Unknown detail level: {detail_level}")
    class AuditLogger:
        """Audit logging system."""

        def __init__(self, storage_backend):
            self.storage = storage_backend

        def log_interaction(self, interaction: AgentInteraction):
            """Record an agent interaction."""
            audit_record = {
                "timestamp": interaction.timestamp,
                "session_id": interaction.session_id,
                "user_id": interaction.user_id,
                "input": self._sanitize(interaction.input),
                "actions_taken": [
                    {
                        "action": action.type,
                        "parameters": self._sanitize(action.parameters),
                        "result": action.result,
                        "timestamp": action.timestamp
                    }
                    for action in interaction.actions
                ],
                "output": self._sanitize(interaction.output),
                "safety_checks": [
                    {
                        "check_type": check.type,
                        "result": check.result,
                        "timestamp": check.timestamp
                    }
                    for check in interaction.safety_checks
                ]
            }
            self.storage.store(audit_record)

        def query_history(
            self,
            user_id: Optional[str] = None,
            time_range: Optional[Tuple[datetime, datetime]] = None,
            action_type: Optional[str] = None
        ) -> List[AuditRecord]:
            """Query the audit history."""
            # Filters use MongoDB-style operators; the backend must understand them
            filters = {}
            if user_id:
                filters["user_id"] = user_id
            if time_range:
                filters["timestamp"] = {"$gte": time_range[0], "$lte": time_range[1]}
            if action_type:
                filters["actions_taken.action"] = action_type
            return self.storage.query(filters)
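`AuditLogger` relies on a storage backend that accepts `store(record)` and `query(filters)`, where filters may contain `$gte`/`$lte` ranges and dotted paths such as `actions_taken.action`. For local testing, a small in-memory backend can emulate that contract. The sketch below is an assumption about the filter semantics (equality, inclusive ranges, and fan-out over lists), not a specification of any real database, and `InMemoryAuditStore` is a hypothetical name.

```python
from typing import Any, Dict, List


def _values_at(record: Any, dotted_path: str) -> List[Any]:
    """Collect every value reachable via the dotted path, fanning out over lists."""
    current = [record]
    for key in dotted_path.split("."):
        next_level: List[Any] = []
        for node in current:
            if isinstance(node, list):
                next_level.extend(
                    item[key] for item in node if isinstance(item, dict) and key in item
                )
            elif isinstance(node, dict) and key in node:
                next_level.append(node[key])
        current = next_level
    return current


def _matches(value: Any, condition: Any) -> bool:
    """Equality, or an inclusive range when the condition uses $gte/$lte."""
    if isinstance(condition, dict):
        if value is None:
            return False
        if "$gte" in condition and not value >= condition["$gte"]:
            return False
        if "$lte" in condition and not value <= condition["$lte"]:
            return False
        return True
    return value == condition


class InMemoryAuditStore:
    def __init__(self) -> None:
        self._records: List[Dict[str, Any]] = []

    def store(self, record: Dict[str, Any]) -> None:
        self._records.append(record)

    def query(self, filters: Dict[str, Any]) -> List[Dict[str, Any]]:
        # A record matches when every filter is satisfied by at least one value.
        return [
            record
            for record in self._records
            if all(
                any(_matches(v, cond) for v in _values_at(record, path))
                for path, cond in filters.items()
            )
        ]


store = InMemoryAuditStore()
store.store({"user_id": "u1", "timestamp": 100, "actions_taken": [{"action": "read_file"}]})
store.store({"user_id": "u1", "timestamp": 200, "actions_taken": [{"action": "delete_file"}]})
store.store({"user_id": "u2", "timestamp": 300, "actions_taken": [{"action": "delete_file"}]})

hits = store.query({
    "user_id": "u1",
    "timestamp": {"$gte": 150, "$lte": 250},
    "actions_taken.action": "delete_file",
})
print(hits)
```

Integer timestamps are used here for brevity; `datetime` values work the same way because they support the same comparison operators.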
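8. Hands-On Project: Building an Autonomous Research Agent
8.1 Project Overview
Build an AI agent that can complete research tasks autonomously. It should be able to:
1. Understand the research topic
2. Search for and collect information
3. Analyze and synthesize the information
4. Generate a research report
8.2 Full Implementation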
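# autonomous_research_agent.py
# Postpone annotation evaluation: the classes below reference dataclasses
# (such as ResearchReport) that are defined further down in this file.
from __future__ import annotations

import asyncio
import json
from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict, Optional


@dataclass
class ResearchConfig:
    """Configuration for the research agent."""
    max_search_results: int = 10
    max_analysis_depth: int = 3
    output_format: str = "markdown"
    include_citations: bool = True
    fact_check: bool = True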
class AutonomousResearchAgent:
    """
    Autonomous research agent.

    Capabilities:
    1. Topic analysis and question generation
    2. Multi-source information retrieval
    3. Source credibility assessment
    4. Synthesis and report generation
    """

    def __init__(self, llm_client, search_client, config: Optional[ResearchConfig] = None):
        self.llm = llm_client
        self.search = search_client
        self.config = config or ResearchConfig()
        # Submodules
        self.topic_analyzer = TopicAnalyzer(llm_client)
        self.information_retriever = InformationRetriever(search_client)
        self.credibility_assessor = CredibilityAssessor()
        self.synthesizer = InformationSynthesizer(llm_client)
        self.report_generator = ReportGenerator(llm_client)

    async def conduct_research(self, topic: str) -> ResearchReport:
        """
        Run the full research workflow.

        Args:
            topic: the research topic
        Returns:
            ResearchReport: the research report
        """
        print(f"🔬 Starting research on: {topic}")
        # Phase 1: topic analysis
        print("📊 Phase 1: Analyzing the research topic...")
        research_questions = await self.topic_analyzer.analyze(topic)
        print(f"   Generated {len(research_questions)} research questions")
        # Phase 2: information retrieval
        print("🔍 Phase 2: Retrieving relevant information...")
        all_sources = []
        for question in research_questions:
            sources = await self.information_retriever.retrieve(
                question,
                max_results=self.config.max_search_results
            )
            all_sources.extend(sources)
        print(f"   Collected {len(all_sources)} sources")
        # Phase 3: credibility assessment
        print("✅ Phase 3: Assessing source credibility...")
        assessed_sources = []
        for source in all_sources:
            assessment = self.credibility_assessor.assess(source)
            if assessment.score > 0.6:  # drop low-credibility sources
                assessed_sources.append((source, assessment))
        assessed_sources.sort(key=lambda x: x[1].score, reverse=True)
        print(f"   Kept {len(assessed_sources)} high-credibility sources")
        # Phase 4: synthesis
        print("🧩 Phase 4: Synthesizing information...")
        synthesis = await self.synthesizer.synthesize(
            topic,
            assessed_sources,
            depth=self.config.max_analysis_depth
        )
        print("   Synthesis complete")
        # Phase 5: report generation
        print("📝 Phase 5: Generating the research report...")
        report = await self.report_generator.generate(
            topic=topic,
            synthesis=synthesis,
            sources=assessed_sources,
            config=self.config
        )
        print("✨ Research complete!")
        return report
class TopicAnalyzer:
    """Topic analyzer."""

    def __init__(self, llm):
        self.llm = llm

    async def analyze(self, topic: str) -> List[ResearchQuestion]:
        """Analyze the topic and generate research questions."""
        prompt = f"""Analyze the following research topic and generate specific research questions.
Topic: {topic}
Generate 5-7 specific research questions that:
1. Cover different aspects of the topic
2. Are answerable through research
3. Build upon each other logically
4. Range from factual to analytical
Output as JSON list with fields: question, type (factual/analytical/comparative), priority (1-5)"""
        response = await self.llm.generate(prompt, format="json")
        questions = json.loads(response)  # parse the JSON string into Python objects
        # **q unpacks each dict into keyword arguments for the dataclass
        return [ResearchQuestion(**q) for q in questions]
class InformationRetriever:
    """Information retriever."""

    def __init__(self, search_client):
        self.search = search_client

    async def retrieve(
        self,
        query: ResearchQuestion,
        max_results: int = 10
    ) -> List[InformationSource]:
        """Retrieve relevant information, split evenly between web and academic search."""
        sources = []
        # Web search
        web_results = await self.search.web_search(
            query.question,
            num_results=max_results // 2
        )
        for result in web_results:
            content = await self._fetch_content(result.url)
            sources.append(InformationSource(
                type="web",
                title=result.title,
                url=result.url,
                content=content,
                timestamp=datetime.now()
            ))
        # Academic search
        academic_results = await self.search.academic_search(
            query.question,
            num_results=max_results // 2
        )
        for result in academic_results:
            sources.append(InformationSource(
                type="academic",
                title=result.title,
                url=result.url,
                content=result.abstract,
                authors=result.authors,
                publication_date=result.date,
                timestamp=datetime.now()
            ))
        return sources

    async def _fetch_content(self, url: str) -> str:
        """Fetch the content of a web page."""
        # Page fetching is left to the reader. Raising here avoids silently
        # returning None, which would break the later len(content) checks
        raise NotImplementedError
class CredibilityAssessor:
    """Credibility assessor."""

    def assess(self, source: InformationSource) -> CredibilityAssessment:
        """Assess the credibility of a source."""
        scores = {
            "domain_authority": self._assess_domain(source),
            "content_quality": self._assess_content(source),
            "recency": self._assess_recency(source),
            "citations": self._assess_citations(source)
        }
        # Weighted average (weights sum to 1.0)
        weights = {
            "domain_authority": 0.3,
            "content_quality": 0.3,
            "recency": 0.2,
            "citations": 0.2
        }
        overall_score = sum(scores[k] * weights[k] for k in scores)
        return CredibilityAssessment(
            score=overall_score,
            breakdown=scores,
            flags=self._identify_red_flags(source)
        )

    def _assess_domain(self, source: InformationSource) -> float:
        """Assess domain authority."""
        trusted_domains = {
            ".edu": 0.9,
            ".gov": 0.9,
            "wikipedia.org": 0.7,
            "arxiv.org": 0.85,
            "ieee.org": 0.85
        }
        # Note: this is a substring match on the whole URL, so it also matches
        # URLs that merely contain the text, for example in a query string
        for domain, score in trusted_domains.items():
            if domain in source.url:
                return score
        return 0.5  # default score

    def _assess_content(self, source: InformationSource) -> float:
        """Assess content quality."""
        # Heuristics based on length, structure, and references
        content = source.content
        score = 0.5
        # Length check
        if len(content) > 1000:
            score += 0.1
        # Structure check
        if any(marker in content for marker in ["##", "###", "Introduction", "Conclusion"]):
            score += 0.1
        # Reference check
        if "http" in content or "Source:" in content:
            score += 0.1
        return min(score, 1.0)

    def _assess_recency(self, source: InformationSource) -> float:
        """Assess recency."""
        if not source.publication_date:
            return 0.5
        # Assumes publication_date is a naive datetime, like datetime.now()
        age_days = (datetime.now() - source.publication_date).days
        if age_days < 30:
            return 1.0
        elif age_days < 365:
            return 0.8
        elif age_days < 365 * 3:
            return 0.6
        else:
            return 0.4

    def _assess_citations(self, source: InformationSource) -> float:
        """Assess citations."""
        if source.type == "academic":
            return 0.8  # academic sources start high by default
        # Look for citation markers in the content
        content = source.content
        citation_markers = ["[1]", "[2]", "Source:", "According to"]
        citation_count = sum(1 for marker in citation_markers if marker in content)
        return min(0.5 + citation_count * 0.1, 1.0)
class InformationSynthesizer:
    """Information synthesizer."""

    def __init__(self, llm):
        self.llm = llm

    async def synthesize(
        self,
        topic: str,
        sources: List[tuple],
        depth: int = 3
    ) -> Synthesis:
        """Synthesize multiple sources."""
        # Extract key points
        key_points = await self._extract_key_points(sources)
        # Identify consensus and disagreement
        consensus, disagreements = self._identify_agreements_and_conflicts(key_points)
        # Build the argument structure
        arguments = await self._build_arguments(key_points, depth)
        # Identify knowledge gaps
        gaps = self._identify_knowledge_gaps(topic, key_points)
        return Synthesis(
            topic=topic,
            key_findings=key_points,
            consensus_areas=consensus,
            contested_areas=disagreements,
            argument_structure=arguments,
            knowledge_gaps=gaps
        )

    async def _extract_key_points(
        self,
        sources: List[tuple]
    ) -> List[KeyPoint]:
        """Extract key points from the sources."""
        # Only the top 5 sources are sent to the LLM (the list is sorted by score)
        all_content = "\n\n".join([
            f"Source {i+1} (credibility: {assessment.score}): {source.content}"
            for i, (source, assessment) in enumerate(sources[:5])
        ])
        prompt = f"""Extract key information points from the following sources.
Sources:
{all_content}
Extract 10-15 key points. For each point:
1. State the fact/claim clearly
2. Note which sources support it
3. Assess confidence level (high/medium/low)
Output as JSON list."""
        response = await self.llm.generate(prompt, format="json")
        points = json.loads(response)
        return [KeyPoint(**p) for p in points]
class ReportGenerator:
    """Report generator."""

    def __init__(self, llm):
        self.llm = llm

    async def generate(
        self,
        topic: str,
        synthesis: Synthesis,
        sources: List[tuple],
        config: ResearchConfig
    ) -> ResearchReport:
        """Generate the research report."""
        # Generate the outline
        outline = await self._generate_outline(topic, synthesis)
        # Write section by section
        sections = []
        for section in outline.sections:
            content = await self._write_section(section, synthesis, config)
            sections.append(ReportSection(
                title=section.title,
                content=content,
                level=section.level
            ))
        # Add references
        if config.include_citations:
            references = self._format_references(sources)
        else:
            references = []
        # Assemble the report
        report_content = self._assemble_report(sections, references, config)
        return ResearchReport(
            title=f"Research Report: {topic}",
            content=report_content,
            sections=sections,
            sources_used=len(sources),
            generation_date=datetime.now(),
            metadata={
                "topic": topic,
                "config": config,
                "synthesis_summary": synthesis.summary()
            }
        )

    def _assemble_report(
        self,
        sections: List[ReportSection],
        references: List[str],
        config: ResearchConfig
    ) -> str:
        """Assemble the final report in the configured format."""
        if config.output_format == "markdown":
            return self._to_markdown(sections, references)
        elif config.output_format == "html":
            return self._to_html(sections, references)
        else:
            return self._to_text(sections, references)

    def _to_markdown(
        self,
        sections: List[ReportSection],
        references: List[str]
    ) -> str:
        """Render the report as Markdown."""
        lines = []
        # Title (taken from the first section, which is rendered again below)
        lines.append(f"# {sections[0].title if sections else 'Research Report'}")
        lines.append(f"\n*Generated on {datetime.now().strftime('%Y-%m-%d')}*\n")
        # Body
        for section in sections:
            prefix = "#" * section.level
            lines.append(f"\n{prefix} {section.title}\n")
            lines.append(section.content)
        # References
        if references:
            lines.append("\n## References\n")
            for i, ref in enumerate(references, 1):
                lines.append(f"{i}. {ref}")
        return "\n".join(lines)
# Data class definitions
@dataclass
class ResearchQuestion:
    question: str
    type: str
    priority: int


@dataclass
class InformationSource:
    type: str
    title: str
    url: str
    content: str
    timestamp: datetime
    authors: Optional[List[str]] = None
    publication_date: Optional[datetime] = None


@dataclass
class CredibilityAssessment:
    score: float
    breakdown: Dict[str, float]
    flags: List[str]


@dataclass
class KeyPoint:
    statement: str
    supporting_sources: List[int]
    confidence: str


@dataclass
class Synthesis:
    topic: str
    key_findings: List[KeyPoint]
    consensus_areas: List[str]
    contested_areas: List[str]
    argument_structure: Dict
    knowledge_gaps: List[str]

    def summary(self) -> str:
        return f"Synthesis of {len(self.key_findings)} key points on '{self.topic}'"


@dataclass
class ReportSection:
    title: str
    content: str
    level: int


@dataclass
class ResearchReport:
    title: str
    content: str
    sections: List[ReportSection]
    sources_used: int
    generation_date: datetime
    metadata: Dict
# Usage example
async def main():
    """Entry point."""
    # Initialize the components (real LLM and search clients are required)
    # llm_client = ...
    # search_client = ...
    # Create the agent
    # agent = AutonomousResearchAgent(llm_client, search_client)
    # Run the research
    # report = await agent.conduct_research("Large Language Models in Healthcare")
    # Save the report
    # with open("research_report.md", "w", encoding="utf-8") as f:
    #     f.write(report.content)
    print("Autonomous research agent framework loaded")
    print("Usage:")
    print("  agent = AutonomousResearchAgent(llm_client, search_client)")
    print("  report = await agent.conduct_research('your research topic')")


if __name__ == "__main__":
    asyncio.run(main())  # create an event loop and run the top-level coroutine
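The `_assess_domain` heuristic in the listing above checks whether a trusted domain string occurs anywhere in the URL. A stricter variant, sketched here as an assumption rather than as the chapter's method, parses the URL and compares only the hostname suffix. The function name `domain_authority` is hypothetical, and the scores are copied from the listing and remain illustrative, not calibrated.

```python
from urllib.parse import urlparse

# Scores copied from the listing above; illustrative, not calibrated.
TRUSTED_SUFFIXES = {
    "edu": 0.9,
    "gov": 0.9,
    "wikipedia.org": 0.7,
    "arxiv.org": 0.85,
    "ieee.org": 0.85,
}
DEFAULT_SCORE = 0.5


def domain_authority(url: str) -> float:
    """Score a URL by its hostname suffix rather than by substring search."""
    host = (urlparse(url).hostname or "").lower()
    for suffix, score in TRUSTED_SUFFIXES.items():
        # Match the exact host or a subdomain of it, never a partial label.
        if host == suffix or host.endswith("." + suffix):
            return score
    return DEFAULT_SCORE


print(domain_authority("https://arxiv.org/list/cs.AI/recent"))
print(domain_authority("https://evil.example/?ref=arxiv.org"))
print(domain_authority("https://notarxiv.org/paper"))
```

The trade-off is that suffix matching is stricter than the substring version: hosts under country-specific academic domains that do not end in `.edu` fall back to the default score unless they are added to the table.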
8.3 Running and Extending
# Install dependencies
pip install aiohttp beautifulsoup4 python-dotenv
# Configure API keys
export OPENAI_API_KEY="your-key"
export SERPER_API_KEY="your-search-key"
# Run the agent
python autonomous_research_agent.py
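The two `export` lines above put the API keys into the environment, and the agent has to read them at startup. A small helper that fails fast with one clear message tends to be easier to debug than a failure in the middle of a research run. This is a minimal sketch: `load_api_keys` is a hypothetical helper, and it only checks that the variables are present and non-empty, not that the keys are valid.

```python
import os
from typing import Dict, Mapping

REQUIRED_KEYS = ("OPENAI_API_KEY", "SERPER_API_KEY")


def load_api_keys(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Return the required keys, or raise one error listing everything that is missing."""
    missing = [name for name in REQUIRED_KEYS if not env.get(name, "").strip()]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: env[name] for name in REQUIRED_KEYS}


# Demo with an explicit mapping; call load_api_keys() to read the real environment.
keys = load_api_keys({"OPENAI_API_KEY": "your-key", "SERPER_API_KEY": "your-search-key"})
print(sorted(keys))
```

Accepting the environment as a parameter keeps the helper testable without touching the real process environment.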
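Extension ideas:
1. Add more data sources (databases, APIs, and so on)
2. Support multiple languages
3. Add visual report generation
4. Integrate a fact-checking API
5. Support a collaborative research mode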
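Summary
A new generation of AI agents is moving from simple tool calls toward autonomous execution of complex tasks. Key trends include:
- End-to-end autonomy: from Manus to Operator, agents can independently carry a task from understanding the requirement to delivering the result
- Multimodal perception: integrating vision, audio, and other modalities lets agents operate real-world interfaces
- Multi-agent collaboration: complex tasks call for several specialized agents working together (subagent/orchestrator pattern)
- Skill reuse and evolution: mechanisms such as Claude Skills move agents from "reasoning from scratch every time" toward "getting better with use"
- Safety and control: as agent capabilities grow, guardrails and human-in-the-loop mechanisms become essential
In the future, AI agents will become intelligent partners in human work and daily life, substantially raising productivity and creativity while remaining under human oversight and aligned with human values.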
References
Papers
- ReAct: Synergizing Reasoning and Acting in Language Models (Yao et al., 2022)
- Reflexion: Language Agents with Verbal Reinforcement Learning (Shinn et al., 2023)
- Voyager: An Open-Ended Embodied Agent with Large Language Models (Wang et al., 2023)
Projects
- AutoGPT: https://github.com/Significant-Gravitas/AutoGPT
- LangChain: https://github.com/langchain-ai/langchain
- Microsoft AutoGen: https://github.com/microsoft/autogen
Products
- Manus: https://manus.im
- Claude Code: https://docs.anthropic.com/en/docs/agents-and-tools/claude-code
- OpenAI Operator: https://openai.com/operator
Document version: 1.0 Author: AI Learning Team
Last updated: 2026-02-12 Applies to: LLM学习教程 (LLM Learning Tutorial) v2026