第十一章 从零构建Agent框架¶
⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。
核心理念:理解Agent框架的本质,具备"造轮子"的能力
导航: 上一章:GUI Agent | 下一章:Agent Memory系统 | 目录
1. 为什么要从零构建¶
1.1 理解框架黑箱¶
LangChain、CrewAI 等框架极大地降低了 Agent 开发门槛,但也隐藏了大量关键细节:
| 被隐藏的细节 | 实际影响 |
|---|---|
| Prompt 如何拼接 | 无法精确控制 LLM 行为 |
| 工具调用如何路由 | 出错时难以调试 |
| 记忆如何管理 | Token 超限时不知如何处理 |
| 错误如何恢复 | 生产环境稳定性差 |
1.2 定制化需求¶
实际生产中,通用框架常常不够用:
- 安全合规:企业内部 API 调用需要审计日志,框架不支持
- 性能优化:特定场景下需要定制 Token 管理策略
- 私有协议:对接内部工具系统,框架的工具抽象不匹配
1.3 学习 Agent 核心原理¶
构建过程中你会深入理解:
- ReAct 循环的本质——LLM 的结构化输出 + 条件分支
- 工具调用的本质——JSON Schema 描述 + 函数映射
- 记忆管理的本质——上下文窗口的有限资源调度
💡 本章目标:用 ~500 行 Python 代码,不依赖 LangChain/CrewAI,构建一个功能完整的 Agent 框架。
2. 框架整体架构设计¶
2.1 Agent 核心循环¶
Agent 的本质是一个 感知→推理→行动→观察 的闭环:
┌─────────────────────────────────────────┐
│ Agent 核心循环 │
│ │
│ 用户输入 ──→ [感知] 解析意图 │
│ │ │
│ ▼ │
│ [推理] LLM思考 │
│ │ │
│ ┌──────┴──────┐ │
│ ▼ ▼ │
│ 需要工具? 直接回答 │
│ │ │ │
│ ▼ ▼ │
│ [行动] 调用工具 输出结果 │
│ │ │
│ ▼ │
│ [观察] 获取结果 │
│ │ │
│ └──→ 回到 [推理] ──→ ... │
└─────────────────────────────────────────┘
从数学角度,Agent 的行为可以形式化为一个策略函数:
其中 \(s_t\) 是当前状态,\(h_t = \{(a_1, o_1), \ldots, (a_{t-1}, o_{t-1})\}\) 是历史轨迹,\(a_t\) 是下一步行动,\(o_t\) 是观察结果。
2.2 模块化设计¶
我们的框架包含四个核心模块:
mini_agent/
├── __init__.py # 包入口
├── llm.py # LLM引擎模块
├── tools.py # 工具系统
├── prompt.py # 提示模板引擎
├── memory.py # 记忆管理
├── agent.py # Agent核心循环
├── multi_agent.py # 多Agent编排
└── utils.py # 工具函数
2.3 依赖安装¶
3. LLM 引擎模块¶
LLM 引擎是 Agent 的"大脑",负责与大模型 API 交互。
3.1 基础封装¶
"""mini_agent/llm.py - LLM引擎模块"""
from __future__ import annotations
import json
import time
from dataclasses import dataclass, field
from typing import Any, Generator
import tiktoken
from openai import OpenAI
@dataclass
class LLMResponse:
"""LLM响应的标准封装"""
content: str | None = None
tool_calls: list[dict[str, Any]] = field(default_factory=list) # 工具调用列表,默认空列表(每实例独立)
usage: dict[str, int] = field(default_factory=dict)
model: str = ""
latency_ms: float = 0.0
@property
def has_tool_calls(self) -> bool:
return len(self.tool_calls) > 0
@dataclass
class ModelConfig:
"""模型配置"""
api_key: str
model: str = "gpt-4o-mini"
base_url: str | None = None # 兼容 DeepSeek、Qwen 等
temperature: float = 0.7
max_tokens: int = 4096
timeout: float = 60.0
class LLMEngine:
"""
LLM引擎:统一封装多模型API调用。
支持:
- OpenAI (gpt-4o, gpt-4o-mini)
- DeepSeek (deepseek-chat, deepseek-reasoner)
- Qwen (qwen-plus, qwen-turbo)
- 任何兼容 OpenAI API 格式的模型
"""
# 常用模型的 Token 编码映射
ENCODING_MAP: dict[str, str] = {
"gpt-4o": "o200k_base",
"gpt-4o-mini": "o200k_base",
"gpt-4": "cl100k_base",
"gpt-3.5-turbo": "cl100k_base",
}
def __init__(self, config: ModelConfig) -> None:
self.config = config
self.client = OpenAI(
api_key=config.api_key,
base_url=config.base_url,
timeout=config.timeout,
)
# 累计使用量跟踪
self.total_prompt_tokens: int = 0
self.total_completion_tokens: int = 0
self.total_cost: float = 0.0
def chat(
self,
messages: list[dict[str, Any]],
tools: list[dict[str, Any]] | None = None,
tool_choice: str = "auto",
**kwargs: Any,
) -> LLMResponse:
"""
发送聊天请求并返回标准化响应。
Args:
messages: OpenAI格式的消息列表
tools: 工具定义列表 (JSON Schema)
tool_choice: 工具选择策略 ("auto", "none", "required")
"""
start = time.perf_counter()
# 构建请求参数
params: dict[str, Any] = {
"model": self.config.model,
"messages": messages,
"temperature": self.config.temperature,
"max_tokens": self.config.max_tokens,
**kwargs,
}
if tools:
params["tools"] = tools
params["tool_choice"] = tool_choice
# 调用API
response = self.client.chat.completions.create(**params)
latency = (time.perf_counter() - start) * 1000
# 解析响应
choice = response.choices[0]
message = choice.message
# 提取工具调用
tool_calls = []
if message.tool_calls:
for tc in message.tool_calls:
tool_calls.append({
"id": tc.id,
"name": tc.function.name,
"arguments": json.loads(tc.function.arguments) if tc.function.arguments else {}, # 防止arguments为None或空字符串
})
# 记录Token使用
usage = {}
if response.usage:
usage = {
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens,
}
self.total_prompt_tokens += response.usage.prompt_tokens
self.total_completion_tokens += response.usage.completion_tokens
return LLMResponse(
content=message.content,
tool_calls=tool_calls,
usage=usage,
model=response.model,
latency_ms=latency,
)
def chat_stream(
self,
messages: list[dict[str, Any]],
**kwargs: Any,
) -> Generator[str, None, None]: # Generator[产出类型, send类型, 返回类型]:类型注解表示这是一个yield产出str的生成器
"""流式输出(用于实时显示回答)"""
params: dict[str, Any] = {
"model": self.config.model,
"messages": messages,
"temperature": self.config.temperature,
"max_tokens": self.config.max_tokens,
"stream": True,
**kwargs,
}
stream = self.client.chat.completions.create(**params)
for chunk in stream:
if chunk.choices and chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
def count_tokens(self, text: str) -> int:
"""计算文本的Token数量"""
encoding_name = self.ENCODING_MAP.get(
self.config.model, "cl100k_base"
)
encoding = tiktoken.get_encoding(encoding_name)
return len(encoding.encode(text))
def count_messages_tokens(self, messages: list[dict[str, Any]]) -> int:
"""
计算消息列表的总Token数。
参考: https://platform.openai.com/docs/guides/chat/introduction
每条消息额外消耗 ~4 tokens (role, content 等元数据)
"""
total = 0
for msg in messages:
total += 4 # 消息元数据开销
for key, value in msg.items():
if isinstance(value, str):
total += self.count_tokens(value)
total += 2 # 回复起始开销
return total
def get_usage_report(self) -> str:
"""获取累计使用量报告"""
return (
f"📊 Token使用统计:\n"
f" Prompt: {self.total_prompt_tokens:,}\n"
f" Completion: {self.total_completion_tokens:,}\n"
f" Total: {self.total_prompt_tokens + self.total_completion_tokens:,}"
)
3.2 多模型适配示例¶
# 使用 OpenAI
openai_config = ModelConfig(api_key="sk-xxx", model="gpt-4o-mini")
engine_openai = LLMEngine(openai_config)
# 使用 DeepSeek(兼容 OpenAI 格式)
deepseek_config = ModelConfig(
api_key="sk-xxx",
model="deepseek-chat",
base_url="https://api.deepseek.com",
)
engine_deepseek = LLMEngine(deepseek_config)
# 使用 Qwen(阿里通义千问)
qwen_config = ModelConfig(
api_key="sk-xxx",
model="qwen-plus",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
engine_qwen = LLMEngine(qwen_config)
4. 工具系统¶
工具系统让 Agent 能够执行实际操作——它是 Agent 从"聊天机器人"进化为"行动者"的关键。
4.1 工具注册装饰器¶
"""mini_agent/tools.py - 工具系统"""
from __future__ import annotations
import inspect
import json
import math
import subprocess
from typing import Any, Callable, get_type_hints
# ──────────────────────────────────────
# 类型映射: Python类型 → JSON Schema类型
# ──────────────────────────────────────
TYPE_MAP: dict[type, str] = {
str: "string",
int: "integer",
float: "number",
bool: "boolean",
list: "array",
dict: "object",
}
class Tool:
"""工具的统一抽象"""
def __init__(
self,
func: Callable[..., Any],
name: str | None = None,
description: str | None = None,
) -> None:
self.func = func
self.name = name or func.__name__
self.description = description or (func.__doc__ or "").strip()
self.schema = self._generate_schema()
def _generate_schema(self) -> dict[str, Any]:
"""
自动从函数签名生成 OpenAI Function Calling 所需的 JSON Schema。
原理:利用 inspect.signature 获取参数名和默认值,
利用 get_type_hints 获取类型注解,自动映射为 JSON Schema。
"""
sig = inspect.signature(self.func) # 获取函数签名(参数名、默认值)
hints = get_type_hints(self.func) # 获取类型注解(参数类型),用于自动生成JSON Schema
properties: dict[str, Any] = {}
required: list[str] = []
for param_name, param in sig.parameters.items():
if param_name == "self":
continue
# 获取类型
python_type = hints.get(param_name, str)
json_type = TYPE_MAP.get(python_type, "string")
# 构建属性Schema
prop: dict[str, Any] = {"type": json_type}
# 尝试从docstring提取参数描述
if self.func.__doc__:
for line in self.func.__doc__.split("\n"):
stripped = line.strip()
if stripped.startswith(f"{param_name}:") or \
stripped.startswith(f"{param_name} :"):
prop["description"] = stripped.split(":", 1)[1].strip()
break
properties[param_name] = prop
# 没有默认值的参数是必填的
if param.default is inspect.Parameter.empty:
required.append(param_name)
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": {
"type": "object",
"properties": properties,
"required": required,
},
},
}
def execute(self, **kwargs: Any) -> str:
"""执行工具并返回字符串结果"""
try:
result = self.func(**kwargs)
return str(result)
except Exception as e:
return f"❌ 工具执行错误 [{self.name}]: {e}"
class ToolRegistry:
"""工具注册中心:管理所有可用工具"""
def __init__(self) -> None:
self._tools: dict[str, Tool] = {}
def register(
self,
name: str | None = None,
description: str | None = None,
) -> Callable:
"""
工具注册装饰器。
用法:
@registry.register(name="search", description="搜索网络")
def web_search(query: str) -> str:
...
"""
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
tool = Tool(func, name=name, description=description)
self._tools[tool.name] = tool
return func
return decorator
def get(self, name: str) -> Tool | None:
"""获取工具"""
return self._tools.get(name)
def execute(self, name: str, arguments: dict[str, Any]) -> str:
"""按名称执行工具"""
tool = self._tools.get(name)
if not tool:
return f"❌ 未找到工具: {name}"
return tool.execute(**arguments)
def get_schemas(self) -> list[dict[str, Any]]:
"""获取所有工具的JSON Schema(用于传给LLM)"""
return [tool.schema for tool in self._tools.values()]
def list_tools(self) -> list[str]:
"""列出所有已注册工具名"""
return list(self._tools.keys())
# ──────────────────────────────────
# 全局工具注册表 & 内置工具
# ──────────────────────────────────
registry = ToolRegistry()
@registry.register(description="执行数学表达式计算,支持加减乘除、幂运算、三角函数等")
def calculator(expression: str) -> str:
"""
安全计算数学表达式。
expression: 要计算的数学表达式,如 '2**10 + math.sqrt(144)'
"""
allowed_names = {
k: v for k, v in math.__dict__.items()
if not k.startswith("_")
}
allowed_names["abs"] = abs
allowed_names["round"] = round
try:
# ⚠️ 生产环境请使用 asteval 或 sympy 等安全计算库
# eval 即使限制了 __builtins__,仍可能通过属性链逃逸
# 增加 AST 白名单检查,仅允许安全的数学节点类型
import ast
tree = ast.parse(expression, mode='eval')
for node in ast.walk(tree):
if not isinstance(node, (ast.Expression, ast.BinOp, ast.UnaryOp,
ast.Constant, ast.Call, ast.Name, ast.Attribute,
ast.Add, ast.Sub, ast.Mult, ast.Div, ast.Mod,
ast.Pow, ast.USub, ast.UAdd)):
return f"计算错误: 不支持的表达式节点类型 {type(node).__name__}"
result = eval(expression, {"__builtins__": {}}, allowed_names)
return f"计算结果: {result}"
except Exception as e:
return f"计算错误: {e}"
@registry.register(description="执行Python代码片段并返回标准输出")
def python_executor(code: str) -> str:
"""
在子进程中安全执行Python代码。
code: 要执行的Python代码字符串
"""
try:
result = subprocess.run(
["python", "-c", code],
capture_output=True,
text=True,
timeout=30,
)
output = result.stdout.strip()
if result.returncode != 0:
output += f"\n错误: {result.stderr.strip()}"
return output or "(无输出)"
except subprocess.TimeoutExpired:
return "❌ 代码执行超时(30s)"
@registry.register(description="获取当前日期和时间")
def get_current_time() -> str:
"""返回当前的日期和时间"""
from datetime import datetime
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
4.2 自定义工具示例¶
# 用户可以轻松添加自己的工具
@registry.register(description="搜索网络获取最新信息")
def web_search(query: str, max_results: int = 5) -> str:
"""
使用搜索API查找信息。
query: 搜索查询关键词
max_results: 最大返回结果数
"""
# 这里用模拟数据演示;生产中替换为真实搜索API
return f"搜索 '{query}' 的结果:\n1. 相关结果A\n2. 相关结果B\n3. 相关结果C"
4.3 查看自动生成的 Schema¶
import json
schemas = registry.get_schemas()
print(json.dumps(schemas[0], indent=2, ensure_ascii=False))
输出:
{
"type": "function",
"function": {
"name": "calculator",
"description": "执行数学表达式计算,支持加减乘除、幂运算、三角函数等",
"parameters": {
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "要计算的数学表达式,如 '2**10 + math.sqrt(144)'"
}
},
"required": ["expression"]
}
}
}
5. 提示模板引擎¶
提示词(Prompt)决定了 Agent 的行为模式。一个好的提示模板引擎需要做到:结构化、可复用、动态注入。
5.1 提示模板设计¶
"""mini_agent/prompt.py - 提示模板引擎"""
from __future__ import annotations
from datetime import datetime
from typing import Any
class PromptTemplate:
"""
提示模板引擎。
支持变量替换: {variable_name}
支持条件块和工具描述动态注入。
"""
def __init__(self, template: str) -> None:
self.template = template
def render(self, **kwargs: Any) -> str:
"""渲染模板,替换所有占位符"""
result = self.template
for key, value in kwargs.items():
result = result.replace(f"{{{key}}}", str(value))
return result
# ──────────────────────────────────
# 预定义的Prompt模板
# ──────────────────────────────────
REACT_SYSTEM_PROMPT = PromptTemplate("""你是一个智能AI助手,能够使用工具来帮助用户解决问题。
## 当前时间
{current_time}
## 可用工具
{tool_descriptions}
## 工作方式
你遵循 ReAct(Reasoning + Acting)模式工作:
1. **思考(Thought)**:分析用户问题,决定下一步行动
2. **行动(Action)**:如果需要,调用合适的工具
3. **观察(Observation)**:分析工具返回的结果
4. 重复以上步骤直到能给出最终答案
## 重要原则
- 仔细分析问题后再决定是否需要调用工具
- 如果可以直接回答,无需调用工具
- 每步只调用一个工具
- 根据工具返回的结果进行推理,不要编造事实
- 给出清晰、有条理的最终回答
""")
SUMMARIZE_PROMPT = PromptTemplate("""请将以下对话历史压缩为一段简洁的摘要,保留关键信息:
{conversation}
要求:
1. 保留用户的核心需求
2. 保留关键的工具调用结果
3. 保留重要的结论
4. 控制在200字以内
""")
RESEARCH_SYSTEM_PROMPT = PromptTemplate("""你是一个专业的研究助手,擅长通过多步搜索和分析来回答复杂问题。
## 当前时间
{current_time}
## 可用工具
{tool_descriptions}
## 研究方法
1. 将复杂问题分解为子问题
2. 对每个子问题进行搜索和分析
3. 综合所有信息形成完整答案
4. 给出参考来源
## 输出格式
- 使用 Markdown 格式
- 包含清晰的标题和段落
- 关键数据用粗体标注
""")
def format_tool_descriptions(tools_schemas: list[dict[str, Any]]) -> str:
"""将工具Schema格式化为人类可读的描述文本"""
descriptions = []
for schema in tools_schemas:
func = schema["function"]
name = func["name"]
desc = func["description"]
params = func["parameters"].get("properties", {})
param_strs = []
for pname, pinfo in params.items():
ptype = pinfo.get("type", "string")
pdesc = pinfo.get("description", "")
param_strs.append(f" - {pname} ({ptype}): {pdesc}")
params_text = "\n".join(param_strs) if param_strs else " (无参数)"
descriptions.append(f"- **{name}**: {desc}\n 参数:\n{params_text}")
return "\n\n".join(descriptions)
def build_system_prompt(
tools_schemas: list[dict[str, Any]],
template: PromptTemplate = REACT_SYSTEM_PROMPT,
) -> str:
"""构建完整的系统提示词"""
tool_desc = format_tool_descriptions(tools_schemas)
return template.render(
current_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
tool_descriptions=tool_desc,
)
6. Agent 核心循环实现¶
这是全章最核心的部分——实现 ReAct 循环。
6.1 ReAct 循环原理¶
ReAct(Reasoning + Acting)的核心流程:
Thought: 我需要查找2024年GDP数据
Action: web_search(query="2024年中国GDP")
Observation: 中国2024年GDP约为...
Thought: 我已经获得了数据,现在可以回答
Answer: 根据搜索结果...
循环终止条件(满足任一即停止): 1. LLM 直接给出回答(无 tool_calls) 2. 达到最大迭代次数 \(N_{\max}\) 3. 遇到不可恢复的错误
6.2 完整 Agent 类实现¶
"""mini_agent/agent.py - Agent核心循环"""
from __future__ import annotations
import json
import logging
from dataclasses import dataclass, field
from typing import Any
from .llm import LLMEngine, LLMResponse, ModelConfig
from .memory import ConversationMemory
from .prompt import build_system_prompt, REACT_SYSTEM_PROMPT, PromptTemplate
from .tools import ToolRegistry, registry as default_registry
logger = logging.getLogger(__name__)
@dataclass
class AgentConfig:
"""Agent配置"""
name: str = "MiniAgent"
max_iterations: int = 10 # 最大迭代次数
max_retries: int = 2 # 工具调用失败重试次数
verbose: bool = True # 是否打印中间步骤
system_template: PromptTemplate = field(
default_factory=lambda: REACT_SYSTEM_PROMPT
)
@dataclass
class AgentStep:
"""Agent单步执行记录"""
step_num: int
thought: str | None = None # LLM的思考
action: str | None = None # 调用的工具名
action_input: dict | None = None # 工具参数
observation: str | None = None # 工具返回结果
is_final: bool = False # 是否为最终回答
class Agent:
"""
ReAct Agent 核心实现。
工作流程:
1. 接收用户输入
2. 构建系统提示词(动态注入工具描述)
3. 进入 ReAct 循环
4. 返回最终结果
Usage:
agent = Agent(
llm=LLMEngine(ModelConfig(api_key="sk-xxx")),
tools=registry,
)
result = agent.run("帮我计算 2^10 + 3^5")
"""
def __init__(
self,
llm: LLMEngine,
tools: ToolRegistry | None = None,
memory: ConversationMemory | None = None,
config: AgentConfig | None = None,
) -> None:
self.llm = llm
self.tools = tools or default_registry
self.memory = memory or ConversationMemory(max_tokens=8000)
self.config = config or AgentConfig()
self.steps: list[AgentStep] = []
def run(self, user_input: str) -> str:
"""
执行Agent主循环。
Args:
user_input: 用户输入的问题或指令
Returns:
Agent的最终回答
"""
self.steps = []
# 1. 构建系统提示词
system_prompt = build_system_prompt(
self.tools.get_schemas(),
self.config.system_template,
)
# 2. 初始化消息列表
messages: list[dict[str, Any]] = [
{"role": "system", "content": system_prompt},
]
# 加入历史记忆
messages.extend(self.memory.get_messages())
# 加入当前用户输入
messages.append({"role": "user", "content": user_input})
# 3. ReAct循环
for i in range(1, self.config.max_iterations + 1):
step = AgentStep(step_num=i)
if self.config.verbose:
print(f"\n{'='*50}")
print(f"🔄 Step {i}/{self.config.max_iterations}")
# 调用LLM
response = self._call_llm(messages)
# 情况A: LLM直接给出回答(无工具调用)
if not response.has_tool_calls:
step.thought = response.content
step.is_final = True
self.steps.append(step)
if self.config.verbose:
print(f"💬 最终回答: {response.content}")
# 保存到记忆
self.memory.add_message("user", user_input)
self.memory.add_message("assistant", response.content or "")
return response.content or ""
# 情况B: LLM请求调用工具
# 先记录assistant消息(包含tool_calls)
assistant_msg = self._build_assistant_message(response)
messages.append(assistant_msg)
# 执行所有工具调用
for tool_call in response.tool_calls:
tool_name = tool_call["name"]
tool_args = tool_call["arguments"]
tool_id = tool_call["id"]
step.thought = response.content
step.action = tool_name
step.action_input = tool_args
if self.config.verbose:
print(f"🤔 思考: {response.content or '(思考中...)'}")
print(f"🔧 调用工具: {tool_name}")
print(f" 参数: {json.dumps(tool_args, ensure_ascii=False)}")
# 执行工具(带重试)
observation = self._execute_with_retry(tool_name, tool_args)
step.observation = observation
if self.config.verbose:
# 截断过长输出
display = observation[:500] + "..." if len(observation) > 500 else observation
print(f"👁️ 观察: {display}")
# 将工具结果加入消息
messages.append({
"role": "tool",
"tool_call_id": tool_id,
"content": observation,
})
self.steps.append(step)
# 超过最大迭代次数
fallback = "⚠️ 已达到最大推理步数,以下是目前的分析结果:\n"
if self.steps:
last = self.steps[-1]
if last.thought:
fallback += last.thought
return fallback
def _call_llm(self, messages: list[dict[str, Any]]) -> LLMResponse:
"""调用LLM,传入工具定义"""
tools_schemas = self.tools.get_schemas()
return self.llm.chat(
messages=messages,
tools=tools_schemas if tools_schemas else None,
)
def _build_assistant_message(self, response: LLMResponse) -> dict[str, Any]:
"""构建包含tool_calls的assistant消息"""
msg: dict[str, Any] = {"role": "assistant"}
if response.content:
msg["content"] = response.content
if response.tool_calls:
msg["tool_calls"] = [
{
"id": tc["id"],
"type": "function",
"function": {
"name": tc["name"],
"arguments": json.dumps(tc["arguments"]),
},
}
for tc in response.tool_calls
]
return msg
def _execute_with_retry(
self, tool_name: str, arguments: dict[str, Any]
) -> str:
"""执行工具调用,失败时自动重试"""
last_error = ""
for attempt in range(self.config.max_retries + 1):
result = self.tools.execute(tool_name, arguments)
if not result.startswith("❌"):
return result
last_error = result
if attempt < self.config.max_retries:
logger.warning(
f"工具 {tool_name} 第{attempt+1}次调用失败,重试中..."
)
return last_error
def get_execution_trace(self) -> str:
"""获取完整的执行轨迹(用于调试)"""
trace_lines = [f"📋 执行轨迹 ({len(self.steps)} steps):"]
for step in self.steps:
trace_lines.append(f"\n--- Step {step.step_num} ---")
if step.thought:
trace_lines.append(f" Thought: {step.thought}")
if step.action:
trace_lines.append(f" Action: {step.action}({step.action_input})")
if step.observation:
trace_lines.append(f" Observation: {step.observation[:200]}")
if step.is_final:
trace_lines.append(" [FINAL ANSWER]")
return "\n".join(trace_lines)
6.3 快速使用¶
from mini_agent.llm import LLMEngine, ModelConfig
from mini_agent.agent import Agent
# 初始化
llm = LLMEngine(ModelConfig(api_key="sk-xxx", model="gpt-4o-mini"))
agent = Agent(llm=llm)
# 运行
answer = agent.run("帮我计算 2的10次方加上144的平方根")
print(answer)
# 查看执行轨迹
print(agent.get_execution_trace())
预期输出:
==================================================
🔄 Step 1/10
🤔 思考: 我需要计算 2^10 + √144,让我使用计算器工具
🔧 调用工具: calculator
参数: {"expression": "2**10 + math.sqrt(144)"}
👁️ 观察: 计算结果: 1036.0
==================================================
🔄 Step 2/10
💬 最终回答: 2的10次方 (1024) 加上144的平方根 (12) 等于 **1036**。
7. 记忆管理¶
Agent 的记忆系统解决一个核心问题:如何在有限的上下文窗口中保留最有价值的信息。
7.1 对话历史与滑动窗口¶
"""mini_agent/memory.py - 记忆管理"""
from __future__ import annotations
from collections import deque
from dataclasses import dataclass, field
from typing import Any
import tiktoken
@dataclass
class Message:
"""消息结构"""
role: str # "user", "assistant", "system", "tool"
content: str
metadata: dict[str, Any] = field(default_factory=dict)
class ConversationMemory:
"""
对话记忆管理器。
策略:
1. 维护完整的对话历史
2. 当Token超限时,使用滑动窗口 + 最旧消息移除
3. 可选:摘要压缩(将旧消息压缩为摘要)
"""
def __init__(
self,
max_tokens: int = 8000,
encoding_name: str = "cl100k_base",
) -> None:
self.max_tokens = max_tokens
self.encoding = tiktoken.get_encoding(encoding_name)
self._messages: deque[Message] = deque() # 双端队列:左侧弹出O(1),适合FIFO消息管理
self._summary: str = "" # 被压缩的历史摘要
def add_message(self, role: str, content: str, **metadata: Any) -> None:
"""添加一条消息"""
self._messages.append(Message(role=role, content=content, metadata=metadata))
self._trim_if_needed()
def get_messages(self) -> list[dict[str, str]]:
"""获取当前记忆中的消息列表(OpenAI格式)"""
messages = []
# 如果有压缩摘要,作为system消息注入
if self._summary:
messages.append({
"role": "system",
"content": f"[历史对话摘要]\n{self._summary}",
})
for msg in self._messages:
messages.append({"role": msg.role, "content": msg.content})
return messages
def _count_tokens(self, text: str) -> int:
return len(self.encoding.encode(text))
def _total_tokens(self) -> int:
total = 0
if self._summary:
total += self._count_tokens(self._summary)
for msg in self._messages:
total += self._count_tokens(msg.content) + 4 # 消息元数据
return total
def _trim_if_needed(self) -> None:
"""当Token数超限时,移除最旧的消息"""
while self._total_tokens() > self.max_tokens and len(self._messages) > 1:
self._messages.popleft()
def clear(self) -> None:
"""清空所有记忆"""
self._messages.clear()
self._summary = ""
def set_summary(self, summary: str) -> None:
"""设置压缩后的历史摘要"""
self._summary = summary
@property
def message_count(self) -> int:
return len(self._messages)
@property
def token_count(self) -> int:
return self._total_tokens()
7.2 摘要压缩¶
当对话变长时,可以用 LLM 自身来压缩历史:
def compress_memory(agent: Agent) -> None:
"""使用LLM压缩对话历史"""
from .prompt import SUMMARIZE_PROMPT
messages = agent.memory.get_messages()
if len(messages) < 6: # 对话太短不需要压缩
return
# 将前半部分对话压缩为摘要
mid = len(messages) // 2
old_messages = messages[:mid]
conversation_text = "\n".join(
f"{m['role']}: {m['content']}" for m in old_messages
)
summary_prompt = SUMMARIZE_PROMPT.render(conversation=conversation_text)
response = agent.llm.chat([
{"role": "user", "content": summary_prompt}
])
# 更新记忆
agent.memory.set_summary(response.content or "")
# 移除已压缩的旧消息
# 注意:此处直接访问了_messages私有属性,建议为Memory类添加
# 公开方法如 remove_oldest(n) 来封装此操作
for _ in range(mid):
if agent.memory._messages:
agent.memory._messages.popleft()
8. 多 Agent 编排¶
多个 Agent 协作可以解决更复杂的任务。我们实现一个简单但实用的 Supervisor(主管)模式。
8.1 Supervisor 架构¶
┌─────────────┐
│ Supervisor │ (任务分发 & 结果聚合)
│ Agent │
└──────┬──────┘
┌─────┼─────┐
▼ ▼ ▼
┌─────┐┌─────┐┌─────┐
│搜索 ││分析 ││写作 │
│Agent││Agent││Agent│
└─────┘└─────┘└─────┘
8.2 实现代码¶
"""mini_agent/multi_agent.py - 多Agent编排"""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from typing import Any
from .agent import Agent, AgentConfig
from .llm import LLMEngine
from .tools import ToolRegistry
@dataclass
class AgentMessage:
"""Agent之间传递的消息"""
sender: str
receiver: str
content: str
metadata: dict[str, Any] = field(default_factory=dict)
class SupervisorAgent:
"""
主管Agent:负责分解任务、分发给子Agent、聚合结果。
工作流程:
1. 接收用户任务
2. 用LLM分解为子任务
3. 将子任务分发给对应的子Agent
4. 收集结果并综合回答
"""
def __init__(
self,
llm: LLMEngine,
sub_agents: dict[str, Agent],
) -> None:
self.llm = llm
self.sub_agents = sub_agents
self.message_log: list[AgentMessage] = []
def run(self, task: str) -> str:
"""执行多Agent协作任务"""
# Step 1: 让LLM分解任务
agent_names = list(self.sub_agents.keys())
decompose_prompt = f"""你是一个任务分解专家。请将以下任务分解为子任务,并分配给合适的Agent。
可用Agent: {json.dumps(agent_names, ensure_ascii=False)}
任务: {task}
请以JSON格式返回,示例:
[
{{"agent": "agent_name", "subtask": "具体子任务描述"}},
...
]
只返回JSON数组,不要其他内容。"""
response = self.llm.chat([
{"role": "user", "content": decompose_prompt}
])
# Step 2: 解析任务分配
try:
content = response.content or "[]"
# 清理可能的markdown代码块标记
content = content.strip().strip("```json").strip("```").strip()
subtasks = json.loads(content)
except json.JSONDecodeError:
return f"任务分解失败: {response.content}"
# Step 3: 依次执行子任务
results: list[dict[str, str]] = []
for item in subtasks:
agent_name = item.get("agent", "")
subtask = item.get("subtask", "")
agent = self.sub_agents.get(agent_name)
if not agent:
results.append({
"agent": agent_name,
"result": f"⚠️ 未找到Agent: {agent_name}",
})
continue
print(f"\n📤 分发给 [{agent_name}]: {subtask}")
result = agent.run(subtask)
results.append({"agent": agent_name, "result": result})
# 记录消息
self.message_log.append(AgentMessage(
sender="supervisor", receiver=agent_name, content=subtask,
))
self.message_log.append(AgentMessage(
sender=agent_name, receiver="supervisor", content=result,
))
# Step 4: 综合结果
synthesis_prompt = f"""你是一个结果综合专家。请根据以下子任务的执行结果,给出完整的最终回答。
原始任务: {task}
各Agent执行结果:
{json.dumps(results, ensure_ascii=False, indent=2)}
请给出结构化的综合回答:"""
final_response = self.llm.chat([
{"role": "user", "content": synthesis_prompt}
])
return final_response.content or ""
8.3 使用多 Agent¶
from mini_agent.llm import LLMEngine, ModelConfig
from mini_agent.agent import Agent, AgentConfig
from mini_agent.multi_agent import SupervisorAgent
from mini_agent.tools import ToolRegistry, registry
llm = LLMEngine(ModelConfig(api_key="sk-xxx", model="gpt-4o-mini"))
# 创建专职Agent
researcher = Agent(
llm=llm,
tools=registry,
config=AgentConfig(name="researcher", max_iterations=5),
)
analyst = Agent(
llm=llm,
tools=registry,
config=AgentConfig(name="analyst", max_iterations=5),
)
# 主管Agent
supervisor = SupervisorAgent(
llm=llm,
sub_agents={"researcher": researcher, "analyst": analyst},
)
result = supervisor.run("分析Python和Rust在AI领域的应用前景")
print(result)
9. 完整项目:智能研究助手¶
将所有模块集成,构建一个能够 搜索→分析→生成报告 的智能研究助手。
9.1 项目结构¶
research_assistant/
├── main.py # 入口文件
├── mini_agent/ # 我们构建的框架
│ ├── __init__.py
│ ├── llm.py
│ ├── tools.py
│ ├── prompt.py
│ ├── memory.py
│ ├── agent.py
│ └── multi_agent.py
└── requirements.txt
9.2 __init__.py 包入口¶
"""mini_agent - 从零构建的轻量级Agent框架"""
from .llm import LLMEngine, ModelConfig, LLMResponse
from .tools import Tool, ToolRegistry, registry
from .prompt import PromptTemplate, build_system_prompt
from .memory import ConversationMemory
from .agent import Agent, AgentConfig
__all__ = [ # 定义模块公开API,控制from package import *的导出范围
"LLMEngine", "ModelConfig", "LLMResponse",
"Tool", "ToolRegistry", "registry",
"PromptTemplate", "build_system_prompt",
"ConversationMemory",
"Agent", "AgentConfig",
]
9.3 主程序¶
"""main.py - 智能研究助手"""
import os
from mini_agent import (
LLMEngine, ModelConfig, Agent, AgentConfig,
ToolRegistry, ConversationMemory,
)
from mini_agent.prompt import RESEARCH_SYSTEM_PROMPT
from mini_agent.tools import registry
def setup_research_agent() -> Agent:
"""配置研究助手Agent"""
# 添加研究专用工具
@registry.register(description="搜索学术论文和技术文章")
def search_papers(query: str, year: int = 2024) -> str:
"""
搜索学术论文。
query: 搜索关键词
year: 发表年份筛选
"""
# 生产中替换为 Semantic Scholar / arXiv API
return (
f"搜索 '{query}' ({year}年) 的论文结果:\n"
f"1. [论文A] {query}的最新进展 - 2024\n"
f"2. [论文B] {query}在工业界的应用 - 2024\n"
f"3. [论文C] {query}综述 - 2024"
)
@registry.register(description="读取和分析网页内容")
def read_webpage(url: str) -> str:
"""
读取网页内容。
url: 要读取的网页URL
"""
# 生产中使用 requests + BeautifulSoup
return f"[模拟] 网页 {url} 的主要内容:这是一篇关于AI技术的文章..."
@registry.register(description="生成Markdown格式的研究报告")
def generate_report(title: str, sections: str) -> str:
"""
生成结构化报告。
title: 报告标题
sections: 报告各节内容(JSON格式)
"""
return f"# {title}\n\n{sections}\n\n---\n*报告生成完成*"
# 创建Agent
llm = LLMEngine(ModelConfig(
api_key=os.getenv("OPENAI_API_KEY", "sk-xxx"),
model="gpt-4o-mini",
))
agent = Agent(
llm=llm,
tools=registry,
memory=ConversationMemory(max_tokens=12000),
config=AgentConfig(
name="ResearchAssistant",
max_iterations=8,
verbose=True,
system_template=RESEARCH_SYSTEM_PROMPT,
),
)
return agent
def main() -> None:
"""交互式研究助手"""
agent = setup_research_agent()
print("🔬 智能研究助手已启动")
print(" 输入问题开始研究,输入 'quit' 退出\n")
while True:
user_input = input("👤 你: ").strip()
if user_input.lower() in ("quit", "exit", "q"):
print("\n" + agent.llm.get_usage_report())
print("👋 再见!")
break
if not user_input:
continue
answer = agent.run(user_input)
print(f"\n🤖 助手: {answer}\n")
if __name__ == "__main__":
main()
9.4 requirements.txt¶
10. 与主流框架对比¶
10.1 功能对比¶
| 特性 | 我们的框架 | LangGraph | CrewAI |
|---|---|---|---|
| 代码量 | ~500行 | ~50,000行 | ~30,000行 |
| 学习曲线 | 低 | 高 | 中 |
| ReAct循环 | ✅ | ✅ | ✅ |
| 工具系统 | ✅ 自动Schema | ✅ 丰富生态 | ✅ 内置工具 |
| 多Agent | ✅ Supervisor | ✅ 图编排 | ✅ 角色扮演 |
| 记忆管理 | ✅ 基础 | ✅ 检查点 | ✅ 基础 |
| 流式输出 | ✅ | ✅ | ✅ |
| 可观测性 | 基础日志 | LangSmith | 内置 |
| 生产就绪 | ⚠️ 需完善 | ✅ | ✅ |
| 定制灵活性 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐ |
10.2 何时自研 vs 使用框架¶
选择自研框架: - 需要深度定制 Agent 行为 - 安全/合规要求严格,需要完全掌控代码 - 团队有足够的工程能力 - 需要极致的性能优化
选择成熟框架: - 快速原型验证 - 需要丰富的工具生态 - 团队对 Agent 技术不够熟悉 - 需要开箱即用的可观测性
10.3 架构演进建议¶
MVP阶段 成长阶段 成熟阶段
┌──────────┐ ┌──────────────┐ ┌───────────────┐
│ 自研框架 │ → │ 自研 + 框架 │ → │ 平台化Agent │
│ ~500行 │ │ 组件混用 │ │ 基础设施 │
└──────────┘ └──────────────┘ └───────────────┘
验证想法 补齐短板 规模化运营
📝 本章小结¶
本章从零构建了一个完整的Agent框架,深入理解了Agent运行机制的每个环节:
- ✅ 理解了自研Agent框架的价值(去除黑箱、完全掌控、极致优化)
- ✅ 掌握了Agent框架的模块化架构设计(LLM引擎 / 工具系统 / 提示模板 / 记忆管理)
- ✅ 实现了LLM引擎模块(统一接口、流式输出、Token计数)
- ✅ 实现了工具系统(装饰器注册、JSON Schema自动生成、安全执行)
- ✅ 实现了Prompt模板引擎(变量渲染、预定义模板、工具描述格式化)
- ✅ 实现了Agent核心ReAct循环(Observe→Think→Act→Observe)
- ✅ 实现了记忆管理模块(滑动窗口、Token裁剪、LLM摘要压缩)
- ✅ 实现了多Agent编排(Supervisor模式、任务分解、结果综合)
- ✅ 完成了完整项目集成(Research Assistant研究助手)
- ✅ 对比了自研框架与主流框架(LangGraph/CrewAI/OpenAI SDK)的取舍
✅ 学习检查清单¶
- 能解释自研Agent框架相比使用LangChain的优劣势
- 能实现LLM引擎模块(统一调用接口、流式输出、Token计数)
- 能实现工具注册系统(装饰器模式、JSON Schema自动生成)
- 能实现ReAct核心循环(解析tool_calls → 执行工具 → 注入结果 → 继续推理)
- 能实现对话记忆管理(滑动窗口、Token裁剪、摘要压缩)
- 能实现Supervisor多Agent编排(任务分解 → 子Agent执行 → 结果综合)
- 理解Agent框架的模块划分原则(LLM引擎/工具/Prompt/记忆/编排)
- 能将各模块集成为完整可运行的Agent项目
- 了解自研框架与LangGraph/CrewAI/OpenAI SDK的功能对应关系
12. 面试题¶
Q1: ReAct与Plan-and-Execute有什么区别?各适用于什么场景?
ReAct是“边想边做”,每步都可能调整方向,适合探索性任务和需要实时反馈的场景。Plan-and-Execute是“先规划再执行”,适合目标明确、步骤可预测的任务。实践中常用混合模式:先规划大纲,每步用ReAct执行,并根据结果动态调整计划。
Q2: 为什么要自研Agent框架而不直接用LangChain?
自研的价值在于:① 深入理解原理,调试问题时不再“黑箱”。② 定制化能力更强,安全/合规要求严格时需完全掌控代码。③ 性能可极致优化,去除不需要的抽象层。但快速原型验证、生态丰富的场景还是应该用成熟框架。
Q3: Agent框架中的工具调用如何保证安全性?
关键措施:① 白名单制:只允许调用注册的工具。② 参数验证:JSON Schema校验输入。③ 权限分级:危险操作(文件写入、代码执行)需人工审批。④ 资源限制:超时、内存限制、网络限制。⑤ 日志审计:记录所有工具调用。
13. 练习与扩展¶
练习1:添加新工具(⭐)¶
为 ToolRegistry 添加一个文件读写工具,使 Agent 能够创建和读取本地文件。
练习2:实现并行工具调用(⭐⭐)¶
当 LLM 返回多个 tool_calls 时,当前实现是串行执行。请使用 asyncio 改造为并行执行。
提示:
import asyncio
async def execute_tools_parallel(
tools: ToolRegistry,
tool_calls: list[dict],
) -> list[str]:
tasks = [
asyncio.to_thread(tools.execute, tc["name"], tc["arguments"])
for tc in tool_calls
]
return await asyncio.gather(*tasks)
练习3:添加人类审批节点(⭐⭐)¶
在工具执行前加入人类确认环节——当 Agent 要执行"危险"操作(如代码执行、文件写入)时,先打印操作内容,等待用户输入 y/n 确认。
练习4:持久化对话历史(⭐⭐⭐)¶
将 ConversationMemory 扩展为支持 SQLite 持久化,使得 Agent 重启后能恢复上次对话。
练习5:实现 Planning Agent(⭐⭐⭐)¶
在 ReAct 之上实现 Plan-and-Execute 模式: 1. Agent 先制定完整计划(步骤列表) 2. 逐步执行每个步骤 3. 根据中间结果动态调整计划
🔗 下一步¶
下一章我们将深入Agent的记忆系统,学习如何让Agent具备长期记忆能力。
继续学习: 12-Agent记忆系统
祝你学习愉快! 🎉
最后更新日期:2026-02-12 适用版本:AI Agent开发实战教程 v2026