跳转至

第十一章 从零构建Agent框架

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

核心理念:理解Agent框架的本质,具备"造轮子"的能力

参考项目:hello-agents Chapter 7


导航: 上一章:GUI Agent | 下一章:Agent Memory系统 | 目录


1. 为什么要从零构建

1.1 理解框架黑箱

LangChain、CrewAI 等框架极大地降低了 Agent 开发门槛,但也隐藏了大量关键细节:

被隐藏的细节 实际影响
Prompt 如何拼接 无法精确控制 LLM 行为
工具调用如何路由 出错时难以调试
记忆如何管理 Token 超限时不知如何处理
错误如何恢复 生产环境稳定性差

1.2 定制化需求

实际生产中,通用框架常常不够用:

  • 安全合规:企业内部 API 调用需要审计日志,框架不支持
  • 性能优化:特定场景下需要定制 Token 管理策略
  • 私有协议:对接内部工具系统,框架的工具抽象不匹配

1.3 学习 Agent 核心原理

构建过程中你会深入理解:

  1. ReAct 循环的本质——LLM 的结构化输出 + 条件分支
  2. 工具调用的本质——JSON Schema 描述 + 函数映射
  3. 记忆管理的本质——上下文窗口的有限资源调度

💡 本章目标:用 ~500 行 Python 代码,不依赖 LangChain/CrewAI,构建一个功能完整的 Agent 框架。


2. 框架整体架构设计

2.1 Agent 核心循环

Agent 的本质是一个 感知→推理→行动→观察 的闭环:

Text Only
┌─────────────────────────────────────────┐
│              Agent 核心循环               │
│                                         │
│   用户输入 ──→ [感知] 解析意图           │
│                  │                      │
│                  ▼                      │
│              [推理] LLM思考             │
│                  │                      │
│           ┌──────┴──────┐               │
│           ▼             ▼               │
│      需要工具?      直接回答             │
│           │             │               │
│           ▼             ▼               │
│      [行动] 调用工具  输出结果           │
│           │                             │
│           ▼                             │
│      [观察] 获取结果                     │
│           │                             │
│           └──→ 回到 [推理] ──→ ...      │
└─────────────────────────────────────────┘

从数学角度,Agent 的行为可以形式化为一个策略函数:

\[\pi(a_t | s_t, h_t) = \text{LLM}(a_t | \text{prompt}(s_t, h_t))\]

其中 \(s_t\) 是当前状态,\(h_t = \{(a_1, o_1), \ldots, (a_{t-1}, o_{t-1})\}\) 是历史轨迹,\(a_t\) 是下一步行动,\(o_t\) 是观察结果。

2.2 模块化设计

我们的框架包含四个核心模块:

Text Only
mini_agent/
├── __init__.py          # 包入口
├── llm.py               # LLM引擎模块
├── tools.py             # 工具系统
├── prompt.py            # 提示模板引擎
├── memory.py            # 记忆管理
├── agent.py             # Agent核心循环
├── multi_agent.py       # 多Agent编排
└── utils.py             # 工具函数

2.3 依赖安装

Bash
pip install openai>=1.0.0 tiktoken pydantic>=2.0

3. LLM 引擎模块

LLM 引擎是 Agent 的"大脑",负责与大模型 API 交互。

3.1 基础封装

Python
"""mini_agent/llm.py - LLM引擎模块"""

from __future__ import annotations

import json
import time
from dataclasses import dataclass, field
from typing import Any, Generator

import tiktoken
from openai import OpenAI

@dataclass
class LLMResponse:
    """LLM响应的标准封装"""
    content: str | None = None
    tool_calls: list[dict[str, Any]] = field(default_factory=list)  # 工具调用列表,默认空列表(每实例独立)
    usage: dict[str, int] = field(default_factory=dict)
    model: str = ""
    latency_ms: float = 0.0

    @property
    def has_tool_calls(self) -> bool:
        return len(self.tool_calls) > 0

@dataclass
class ModelConfig:
    """模型配置"""
    api_key: str
    model: str = "gpt-4o-mini"
    base_url: str | None = None          # 兼容 DeepSeek、Qwen 等
    temperature: float = 0.7
    max_tokens: int = 4096
    timeout: float = 60.0

class LLMEngine:
    """
    LLM引擎:统一封装多模型API调用。

    支持:
    - OpenAI (gpt-4o, gpt-4o-mini)
    - DeepSeek (deepseek-chat, deepseek-reasoner)
    - Qwen (qwen-plus, qwen-turbo)
    - 任何兼容 OpenAI API 格式的模型
    """

    # 常用模型的 Token 编码映射
    ENCODING_MAP: dict[str, str] = {
        "gpt-4o": "o200k_base",
        "gpt-4o-mini": "o200k_base",
        "gpt-4": "cl100k_base",
        "gpt-3.5-turbo": "cl100k_base",
    }

    def __init__(self, config: ModelConfig) -> None:
        self.config = config
        self.client = OpenAI(
            api_key=config.api_key,
            base_url=config.base_url,
            timeout=config.timeout,
        )
        # 累计使用量跟踪
        self.total_prompt_tokens: int = 0
        self.total_completion_tokens: int = 0
        self.total_cost: float = 0.0

    def chat(
        self,
        messages: list[dict[str, Any]],
        tools: list[dict[str, Any]] | None = None,
        tool_choice: str = "auto",
        **kwargs: Any,
    ) -> LLMResponse:
        """
        发送聊天请求并返回标准化响应。

        Args:
            messages: OpenAI格式的消息列表
            tools: 工具定义列表 (JSON Schema)
            tool_choice: 工具选择策略 ("auto", "none", "required")
        """
        start = time.perf_counter()

        # 构建请求参数
        params: dict[str, Any] = {
            "model": self.config.model,
            "messages": messages,
            "temperature": self.config.temperature,
            "max_tokens": self.config.max_tokens,
            **kwargs,
        }
        if tools:
            params["tools"] = tools
            params["tool_choice"] = tool_choice

        # 调用API
        response = self.client.chat.completions.create(**params)
        latency = (time.perf_counter() - start) * 1000

        # 解析响应
        choice = response.choices[0]
        message = choice.message

        # 提取工具调用
        tool_calls = []
        if message.tool_calls:
            for tc in message.tool_calls:
                tool_calls.append({
                    "id": tc.id,
                    "name": tc.function.name,
                    "arguments": json.loads(tc.function.arguments) if tc.function.arguments else {},  # 防止arguments为None或空字符串
                })

        # 记录Token使用
        usage = {}
        if response.usage:
            usage = {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens,
            }
            self.total_prompt_tokens += response.usage.prompt_tokens
            self.total_completion_tokens += response.usage.completion_tokens

        return LLMResponse(
            content=message.content,
            tool_calls=tool_calls,
            usage=usage,
            model=response.model,
            latency_ms=latency,
        )

    def chat_stream(
        self,
        messages: list[dict[str, Any]],
        **kwargs: Any,
    ) -> Generator[str, None, None]:  # Generator[产出类型, send类型, 返回类型]:类型注解表示这是一个yield产出str的生成器
        """流式输出(用于实时显示回答)"""
        params: dict[str, Any] = {
            "model": self.config.model,
            "messages": messages,
            "temperature": self.config.temperature,
            "max_tokens": self.config.max_tokens,
            "stream": True,
            **kwargs,
        }
        stream = self.client.chat.completions.create(**params)
        for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

    def count_tokens(self, text: str) -> int:
        """计算文本的Token数量"""
        encoding_name = self.ENCODING_MAP.get(
            self.config.model, "cl100k_base"
        )
        encoding = tiktoken.get_encoding(encoding_name)
        return len(encoding.encode(text))

    def count_messages_tokens(self, messages: list[dict[str, Any]]) -> int:
        """
        计算消息列表的总Token数。
        参考: https://platform.openai.com/docs/guides/chat/introduction
        每条消息额外消耗 ~4 tokens (role, content 等元数据)
        """
        total = 0
        for msg in messages:
            total += 4  # 消息元数据开销
            for key, value in msg.items():
                if isinstance(value, str):
                    total += self.count_tokens(value)
        total += 2  # 回复起始开销
        return total

    def get_usage_report(self) -> str:
        """获取累计使用量报告"""
        return (
            f"📊 Token使用统计:\n"
            f"  Prompt: {self.total_prompt_tokens:,}\n"
            f"  Completion: {self.total_completion_tokens:,}\n"
            f"  Total: {self.total_prompt_tokens + self.total_completion_tokens:,}"
        )

3.2 多模型适配示例

Python
# 使用 OpenAI
openai_config = ModelConfig(api_key="sk-xxx", model="gpt-4o-mini")
engine_openai = LLMEngine(openai_config)

# 使用 DeepSeek(兼容 OpenAI 格式)
deepseek_config = ModelConfig(
    api_key="sk-xxx",
    model="deepseek-chat",
    base_url="https://api.deepseek.com",
)
engine_deepseek = LLMEngine(deepseek_config)

# 使用 Qwen(阿里通义千问)
qwen_config = ModelConfig(
    api_key="sk-xxx",
    model="qwen-plus",
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
engine_qwen = LLMEngine(qwen_config)

4. 工具系统

工具系统让 Agent 能够执行实际操作——它是 Agent 从"聊天机器人"进化为"行动者"的关键。

4.1 工具注册装饰器

Python
"""mini_agent/tools.py - 工具系统"""

from __future__ import annotations

import inspect
import json
import math
import subprocess
from typing import Any, Callable, get_type_hints

# ──────────────────────────────────────
# 类型映射: Python类型 → JSON Schema类型
# ──────────────────────────────────────
TYPE_MAP: dict[type, str] = {
    str: "string",
    int: "integer",
    float: "number",
    bool: "boolean",
    list: "array",
    dict: "object",
}

class Tool:
    """工具的统一抽象"""

    def __init__(
        self,
        func: Callable[..., Any],
        name: str | None = None,
        description: str | None = None,
    ) -> None:
        self.func = func
        self.name = name or func.__name__
        self.description = description or (func.__doc__ or "").strip()
        self.schema = self._generate_schema()

    def _generate_schema(self) -> dict[str, Any]:
        """
        自动从函数签名生成 OpenAI Function Calling 所需的 JSON Schema。

        原理:利用 inspect.signature 获取参数名和默认值,
        利用 get_type_hints 获取类型注解,自动映射为 JSON Schema。
        """
        sig = inspect.signature(self.func)  # 获取函数签名(参数名、默认值)
        hints = get_type_hints(self.func)  # 获取类型注解(参数类型),用于自动生成JSON Schema

        properties: dict[str, Any] = {}
        required: list[str] = []

        for param_name, param in sig.parameters.items():
            if param_name == "self":
                continue

            # 获取类型
            python_type = hints.get(param_name, str)
            json_type = TYPE_MAP.get(python_type, "string")

            # 构建属性Schema
            prop: dict[str, Any] = {"type": json_type}

            # 尝试从docstring提取参数描述
            if self.func.__doc__:
                for line in self.func.__doc__.split("\n"):
                    stripped = line.strip()
                    if stripped.startswith(f"{param_name}:") or \
                       stripped.startswith(f"{param_name} :"):
                        prop["description"] = stripped.split(":", 1)[1].strip()
                        break

            properties[param_name] = prop

            # 没有默认值的参数是必填的
            if param.default is inspect.Parameter.empty:
                required.append(param_name)

        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": {
                    "type": "object",
                    "properties": properties,
                    "required": required,
                },
            },
        }

    def execute(self, **kwargs: Any) -> str:
        """执行工具并返回字符串结果"""
        try:
            result = self.func(**kwargs)
            return str(result)
        except Exception as e:
            return f"❌ 工具执行错误 [{self.name}]: {e}"

class ToolRegistry:
    """工具注册中心:管理所有可用工具"""

    def __init__(self) -> None:
        self._tools: dict[str, Tool] = {}

    def register(
        self,
        name: str | None = None,
        description: str | None = None,
    ) -> Callable:
        """
        工具注册装饰器。

        用法:
            @registry.register(name="search", description="搜索网络")
            def web_search(query: str) -> str:
                ...
        """
        def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
            tool = Tool(func, name=name, description=description)
            self._tools[tool.name] = tool
            return func
        return decorator

    def get(self, name: str) -> Tool | None:
        """获取工具"""
        return self._tools.get(name)

    def execute(self, name: str, arguments: dict[str, Any]) -> str:
        """按名称执行工具"""
        tool = self._tools.get(name)
        if not tool:
            return f"❌ 未找到工具: {name}"
        return tool.execute(**arguments)

    def get_schemas(self) -> list[dict[str, Any]]:
        """获取所有工具的JSON Schema(用于传给LLM)"""
        return [tool.schema for tool in self._tools.values()]

    def list_tools(self) -> list[str]:
        """列出所有已注册工具名"""
        return list(self._tools.keys())

# ──────────────────────────────────
# 全局工具注册表 & 内置工具
# ──────────────────────────────────
registry = ToolRegistry()

@registry.register(description="执行数学表达式计算,支持加减乘除、幂运算、三角函数等")
def calculator(expression: str) -> str:
    """
    安全计算数学表达式。
    expression: 要计算的数学表达式,如 '2**10 + math.sqrt(144)'
    """
    allowed_names = {
        k: v for k, v in math.__dict__.items()
        if not k.startswith("_")
    }
    allowed_names["abs"] = abs
    allowed_names["round"] = round
    try:
        # ⚠️ 生产环境请使用 asteval 或 sympy 等安全计算库
        # eval 即使限制了 __builtins__,仍可能通过属性链逃逸
        # 增加 AST 白名单检查,仅允许安全的数学节点类型
        import ast
        tree = ast.parse(expression, mode='eval')
        for node in ast.walk(tree):
            if not isinstance(node, (ast.Expression, ast.BinOp, ast.UnaryOp,
                                    ast.Constant, ast.Call, ast.Name, ast.Attribute,
                                    ast.Add, ast.Sub, ast.Mult, ast.Div, ast.Mod,
                                    ast.Pow, ast.USub, ast.UAdd)):
                return f"计算错误: 不支持的表达式节点类型 {type(node).__name__}"
        result = eval(expression, {"__builtins__": {}}, allowed_names)
        return f"计算结果: {result}"
    except Exception as e:
        return f"计算错误: {e}"

@registry.register(description="执行Python代码片段并返回标准输出")
def python_executor(code: str) -> str:
    """
    在子进程中安全执行Python代码。
    code: 要执行的Python代码字符串
    """
    try:
        result = subprocess.run(
            ["python", "-c", code],
            capture_output=True,
            text=True,
            timeout=30,
        )
        output = result.stdout.strip()
        if result.returncode != 0:
            output += f"\n错误: {result.stderr.strip()}"
        return output or "(无输出)"
    except subprocess.TimeoutExpired:
        return "❌ 代码执行超时(30s)"

@registry.register(description="获取当前日期和时间")
def get_current_time() -> str:
    """返回当前的日期和时间"""
    from datetime import datetime
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

4.2 自定义工具示例

Python
# 用户可以轻松添加自己的工具
@registry.register(description="搜索网络获取最新信息")
def web_search(query: str, max_results: int = 5) -> str:
    """
    使用搜索API查找信息。
    query: 搜索查询关键词
    max_results: 最大返回结果数
    """
    # 这里用模拟数据演示;生产中替换为真实搜索API
    return f"搜索 '{query}' 的结果:\n1. 相关结果A\n2. 相关结果B\n3. 相关结果C"

4.3 查看自动生成的 Schema

Python
import json
schemas = registry.get_schemas()
print(json.dumps(schemas[0], indent=2, ensure_ascii=False))

输出:

JSON
{
  "type": "function",
  "function": {
    "name": "calculator",
    "description": "执行数学表达式计算,支持加减乘除、幂运算、三角函数等",
    "parameters": {
      "type": "object",
      "properties": {
        "expression": {
          "type": "string",
          "description": "要计算的数学表达式,如 '2**10 + math.sqrt(144)'"
        }
      },
      "required": ["expression"]
    }
  }
}

5. 提示模板引擎

提示词(Prompt)决定了 Agent 的行为模式。一个好的提示模板引擎需要做到:结构化可复用动态注入

5.1 提示模板设计

Python
"""mini_agent/prompt.py - 提示模板引擎"""

from __future__ import annotations

from datetime import datetime
from typing import Any

class PromptTemplate:
    """
    提示模板引擎。

    支持变量替换: {variable_name}
    支持条件块和工具描述动态注入。
    """

    def __init__(self, template: str) -> None:
        self.template = template

    def render(self, **kwargs: Any) -> str:
        """渲染模板,替换所有占位符"""
        result = self.template
        for key, value in kwargs.items():
            result = result.replace(f"{{{key}}}", str(value))
        return result

# ──────────────────────────────────
# 预定义的Prompt模板
# ──────────────────────────────────

REACT_SYSTEM_PROMPT = PromptTemplate("""你是一个智能AI助手,能够使用工具来帮助用户解决问题。

## 当前时间
{current_time}

## 可用工具
{tool_descriptions}

## 工作方式
你遵循 ReAct(Reasoning + Acting)模式工作:
1. **思考(Thought)**:分析用户问题,决定下一步行动
2. **行动(Action)**:如果需要,调用合适的工具
3. **观察(Observation)**:分析工具返回的结果
4. 重复以上步骤直到能给出最终答案

## 重要原则
- 仔细分析问题后再决定是否需要调用工具
- 如果可以直接回答,无需调用工具
- 每步只调用一个工具
- 根据工具返回的结果进行推理,不要编造事实
- 给出清晰、有条理的最终回答
""")

SUMMARIZE_PROMPT = PromptTemplate("""请将以下对话历史压缩为一段简洁的摘要,保留关键信息:

{conversation}

要求:
1. 保留用户的核心需求
2. 保留关键的工具调用结果
3. 保留重要的结论
4. 控制在200字以内
""")

RESEARCH_SYSTEM_PROMPT = PromptTemplate("""你是一个专业的研究助手,擅长通过多步搜索和分析来回答复杂问题。

## 当前时间
{current_time}

## 可用工具
{tool_descriptions}

## 研究方法
1. 将复杂问题分解为子问题
2. 对每个子问题进行搜索和分析
3. 综合所有信息形成完整答案
4. 给出参考来源

## 输出格式
- 使用 Markdown 格式
- 包含清晰的标题和段落
- 关键数据用粗体标注
""")

def format_tool_descriptions(tools_schemas: list[dict[str, Any]]) -> str:
    """将工具Schema格式化为人类可读的描述文本"""
    descriptions = []
    for schema in tools_schemas:
        func = schema["function"]
        name = func["name"]
        desc = func["description"]
        params = func["parameters"].get("properties", {})

        param_strs = []
        for pname, pinfo in params.items():
            ptype = pinfo.get("type", "string")
            pdesc = pinfo.get("description", "")
            param_strs.append(f"    - {pname} ({ptype}): {pdesc}")

        params_text = "\n".join(param_strs) if param_strs else "    (无参数)"
        descriptions.append(f"- **{name}**: {desc}\n  参数:\n{params_text}")

    return "\n\n".join(descriptions)

def build_system_prompt(
    tools_schemas: list[dict[str, Any]],
    template: PromptTemplate = REACT_SYSTEM_PROMPT,
) -> str:
    """构建完整的系统提示词"""
    tool_desc = format_tool_descriptions(tools_schemas)
    return template.render(
        current_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        tool_descriptions=tool_desc,
    )

6. Agent 核心循环实现

这是全章最核心的部分——实现 ReAct 循环。

6.1 ReAct 循环原理

ReAct(Reasoning + Acting)的核心流程:

Text Only
Thought:  我需要查找2024年GDP数据
Action:   web_search(query="2024年中国GDP")
Observation: 中国2024年GDP约为...
Thought:  我已经获得了数据,现在可以回答
Answer:   根据搜索结果...

循环终止条件(满足任一即停止): 1. LLM 直接给出回答(无 tool_calls) 2. 达到最大迭代次数 \(N_{\max}\) 3. 遇到不可恢复的错误

6.2 完整 Agent 类实现

Python
"""mini_agent/agent.py - Agent核心循环"""

from __future__ import annotations

import json
import logging
from dataclasses import dataclass, field
from typing import Any

from .llm import LLMEngine, LLMResponse, ModelConfig
from .memory import ConversationMemory
from .prompt import build_system_prompt, REACT_SYSTEM_PROMPT, PromptTemplate
from .tools import ToolRegistry, registry as default_registry

logger = logging.getLogger(__name__)

@dataclass
class AgentConfig:
    """Agent配置"""
    name: str = "MiniAgent"
    max_iterations: int = 10          # 最大迭代次数
    max_retries: int = 2              # 工具调用失败重试次数
    verbose: bool = True              # 是否打印中间步骤
    system_template: PromptTemplate = field(
        default_factory=lambda: REACT_SYSTEM_PROMPT
    )

@dataclass
class AgentStep:
    """Agent单步执行记录"""
    step_num: int
    thought: str | None = None        # LLM的思考
    action: str | None = None         # 调用的工具名
    action_input: dict | None = None  # 工具参数
    observation: str | None = None    # 工具返回结果
    is_final: bool = False            # 是否为最终回答

class Agent:
    """
    ReAct Agent 核心实现。

    工作流程:
    1. 接收用户输入
    2. 构建系统提示词(动态注入工具描述)
    3. 进入 ReAct 循环
    4. 返回最终结果

    Usage:
        agent = Agent(
            llm=LLMEngine(ModelConfig(api_key="sk-xxx")),
            tools=registry,
        )
        result = agent.run("帮我计算 2^10 + 3^5")
    """

    def __init__(
        self,
        llm: LLMEngine,
        tools: ToolRegistry | None = None,
        memory: ConversationMemory | None = None,
        config: AgentConfig | None = None,
    ) -> None:
        self.llm = llm
        self.tools = tools or default_registry
        self.memory = memory or ConversationMemory(max_tokens=8000)
        self.config = config or AgentConfig()
        self.steps: list[AgentStep] = []

    def run(self, user_input: str) -> str:
        """
        执行Agent主循环。

        Args:
            user_input: 用户输入的问题或指令

        Returns:
            Agent的最终回答
        """
        self.steps = []

        # 1. 构建系统提示词
        system_prompt = build_system_prompt(
            self.tools.get_schemas(),
            self.config.system_template,
        )

        # 2. 初始化消息列表
        messages: list[dict[str, Any]] = [
            {"role": "system", "content": system_prompt},
        ]

        # 加入历史记忆
        messages.extend(self.memory.get_messages())

        # 加入当前用户输入
        messages.append({"role": "user", "content": user_input})

        # 3. ReAct循环
        for i in range(1, self.config.max_iterations + 1):
            step = AgentStep(step_num=i)

            if self.config.verbose:
                print(f"\n{'='*50}")
                print(f"🔄 Step {i}/{self.config.max_iterations}")

            # 调用LLM
            response = self._call_llm(messages)

            # 情况A: LLM直接给出回答(无工具调用)
            if not response.has_tool_calls:
                step.thought = response.content
                step.is_final = True
                self.steps.append(step)

                if self.config.verbose:
                    print(f"💬 最终回答: {response.content}")

                # 保存到记忆
                self.memory.add_message("user", user_input)
                self.memory.add_message("assistant", response.content or "")

                return response.content or ""

            # 情况B: LLM请求调用工具
            # 先记录assistant消息(包含tool_calls)
            assistant_msg = self._build_assistant_message(response)
            messages.append(assistant_msg)

            # 执行所有工具调用
            for tool_call in response.tool_calls:
                tool_name = tool_call["name"]
                tool_args = tool_call["arguments"]
                tool_id = tool_call["id"]

                step.thought = response.content
                step.action = tool_name
                step.action_input = tool_args

                if self.config.verbose:
                    print(f"🤔 思考: {response.content or '(思考中...)'}")
                    print(f"🔧 调用工具: {tool_name}")
                    print(f"   参数: {json.dumps(tool_args, ensure_ascii=False)}")

                # 执行工具(带重试)
                observation = self._execute_with_retry(tool_name, tool_args)
                step.observation = observation

                if self.config.verbose:
                    # 截断过长输出
                    display = observation[:500] + "..." if len(observation) > 500 else observation
                    print(f"👁️ 观察: {display}")

                # 将工具结果加入消息
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_id,
                    "content": observation,
                })

            self.steps.append(step)

        # 超过最大迭代次数
        fallback = "⚠️ 已达到最大推理步数,以下是目前的分析结果:\n"
        if self.steps:
            last = self.steps[-1]
            if last.thought:
                fallback += last.thought
        return fallback

    def _call_llm(self, messages: list[dict[str, Any]]) -> LLMResponse:
        """调用LLM,传入工具定义"""
        tools_schemas = self.tools.get_schemas()
        return self.llm.chat(
            messages=messages,
            tools=tools_schemas if tools_schemas else None,
        )

    def _build_assistant_message(self, response: LLMResponse) -> dict[str, Any]:
        """构建包含tool_calls的assistant消息"""
        msg: dict[str, Any] = {"role": "assistant"}
        if response.content:
            msg["content"] = response.content
        if response.tool_calls:
            msg["tool_calls"] = [
                {
                    "id": tc["id"],
                    "type": "function",
                    "function": {
                        "name": tc["name"],
                        "arguments": json.dumps(tc["arguments"]),
                    },
                }
                for tc in response.tool_calls
            ]
        return msg

    def _execute_with_retry(
        self, tool_name: str, arguments: dict[str, Any]
    ) -> str:
        """执行工具调用,失败时自动重试"""
        last_error = ""
        for attempt in range(self.config.max_retries + 1):
            result = self.tools.execute(tool_name, arguments)
            if not result.startswith("❌"):
                return result
            last_error = result
            if attempt < self.config.max_retries:
                logger.warning(
                    f"工具 {tool_name}{attempt+1}次调用失败,重试中..."
                )
        return last_error

    def get_execution_trace(self) -> str:
        """获取完整的执行轨迹(用于调试)"""
        trace_lines = [f"📋 执行轨迹 ({len(self.steps)} steps):"]
        for step in self.steps:
            trace_lines.append(f"\n--- Step {step.step_num} ---")
            if step.thought:
                trace_lines.append(f"  Thought: {step.thought}")
            if step.action:
                trace_lines.append(f"  Action: {step.action}({step.action_input})")
            if step.observation:
                trace_lines.append(f"  Observation: {step.observation[:200]}")
            if step.is_final:
                trace_lines.append("  [FINAL ANSWER]")
        return "\n".join(trace_lines)

6.3 快速使用

Python
from mini_agent.llm import LLMEngine, ModelConfig
from mini_agent.agent import Agent

# 初始化
llm = LLMEngine(ModelConfig(api_key="sk-xxx", model="gpt-4o-mini"))
agent = Agent(llm=llm)

# 运行
answer = agent.run("帮我计算 2的10次方加上144的平方根")
print(answer)

# 查看执行轨迹
print(agent.get_execution_trace())

预期输出:

Text Only
==================================================
🔄 Step 1/10
🤔 思考: 我需要计算 2^10 + √144,让我使用计算器工具
🔧 调用工具: calculator
   参数: {"expression": "2**10 + math.sqrt(144)"}
👁️ 观察: 计算结果: 1036.0

==================================================
🔄 Step 2/10
💬 最终回答: 2的10次方 (1024) 加上144的平方根 (12) 等于 **1036**。

7. 记忆管理

Agent 的记忆系统解决一个核心问题:如何在有限的上下文窗口中保留最有价值的信息

7.1 对话历史与滑动窗口

Python
"""mini_agent/memory.py - 记忆管理"""

from __future__ import annotations

from collections import deque
from dataclasses import dataclass, field
from typing import Any

import tiktoken

@dataclass
class Message:
    """消息结构"""
    role: str          # "user", "assistant", "system", "tool"
    content: str
    metadata: dict[str, Any] = field(default_factory=dict)

class ConversationMemory:
    """
    对话记忆管理器。

    策略:
    1. 维护完整的对话历史
    2. 当Token超限时,使用滑动窗口 + 最旧消息移除
    3. 可选:摘要压缩(将旧消息压缩为摘要)
    """

    def __init__(
        self,
        max_tokens: int = 8000,
        encoding_name: str = "cl100k_base",
    ) -> None:
        self.max_tokens = max_tokens
        self.encoding = tiktoken.get_encoding(encoding_name)
        self._messages: deque[Message] = deque()  # 双端队列:左侧弹出O(1),适合FIFO消息管理
        self._summary: str = ""  # 被压缩的历史摘要

    def add_message(self, role: str, content: str, **metadata: Any) -> None:
        """添加一条消息"""
        self._messages.append(Message(role=role, content=content, metadata=metadata))
        self._trim_if_needed()

    def get_messages(self) -> list[dict[str, str]]:
        """获取当前记忆中的消息列表(OpenAI格式)"""
        messages = []
        # 如果有压缩摘要,作为system消息注入
        if self._summary:
            messages.append({
                "role": "system",
                "content": f"[历史对话摘要]\n{self._summary}",
            })
        for msg in self._messages:
            messages.append({"role": msg.role, "content": msg.content})
        return messages

    def _count_tokens(self, text: str) -> int:
        return len(self.encoding.encode(text))

    def _total_tokens(self) -> int:
        total = 0
        if self._summary:
            total += self._count_tokens(self._summary)
        for msg in self._messages:
            total += self._count_tokens(msg.content) + 4  # 消息元数据
        return total

    def _trim_if_needed(self) -> None:
        """当Token数超限时,移除最旧的消息"""
        while self._total_tokens() > self.max_tokens and len(self._messages) > 1:
            self._messages.popleft()

    def clear(self) -> None:
        """清空所有记忆"""
        self._messages.clear()
        self._summary = ""

    def set_summary(self, summary: str) -> None:
        """设置压缩后的历史摘要"""
        self._summary = summary

    @property
    def message_count(self) -> int:
        return len(self._messages)

    @property
    def token_count(self) -> int:
        return self._total_tokens()

7.2 摘要压缩

当对话变长时,可以用 LLM 自身来压缩历史:

Python
def compress_memory(agent: Agent) -> None:
    """使用LLM压缩对话历史"""
    from .prompt import SUMMARIZE_PROMPT

    messages = agent.memory.get_messages()
    if len(messages) < 6:  # 对话太短不需要压缩
        return

    # 将前半部分对话压缩为摘要
    mid = len(messages) // 2
    old_messages = messages[:mid]
    conversation_text = "\n".join(
        f"{m['role']}: {m['content']}" for m in old_messages
    )

    summary_prompt = SUMMARIZE_PROMPT.render(conversation=conversation_text)
    response = agent.llm.chat([
        {"role": "user", "content": summary_prompt}
    ])

    # 更新记忆
    agent.memory.set_summary(response.content or "")
    # 移除已压缩的旧消息
    # 注意:此处直接访问了_messages私有属性,建议为Memory类添加
    # 公开方法如 remove_oldest(n) 来封装此操作
    for _ in range(mid):
        if agent.memory._messages:
            agent.memory._messages.popleft()

8. 多 Agent 编排

多个 Agent 协作可以解决更复杂的任务。我们实现一个简单但实用的 Supervisor(主管)模式

8.1 Supervisor 架构

Text Only
        ┌─────────────┐
        │  Supervisor  │  (任务分发 & 结果聚合)
        │    Agent     │
        └──────┬──────┘
         ┌─────┼─────┐
         ▼     ▼     ▼
      ┌─────┐┌─────┐┌─────┐
      │搜索 ││分析 ││写作 │
      │Agent││Agent││Agent│
      └─────┘└─────┘└─────┘

8.2 实现代码

Python
"""mini_agent/multi_agent.py - 多Agent编排"""

from __future__ import annotations

import json
from dataclasses import dataclass, field
from typing import Any

from .agent import Agent, AgentConfig
from .llm import LLMEngine
from .tools import ToolRegistry

@dataclass
class AgentMessage:
    """Agent之间传递的消息"""
    sender: str
    receiver: str
    content: str
    metadata: dict[str, Any] = field(default_factory=dict)

class SupervisorAgent:
    """
    主管Agent:负责分解任务、分发给子Agent、聚合结果。

    工作流程:
    1. 接收用户任务
    2. 用LLM分解为子任务
    3. 将子任务分发给对应的子Agent
    4. 收集结果并综合回答
    """

    def __init__(
        self,
        llm: LLMEngine,
        sub_agents: dict[str, Agent],
    ) -> None:
        self.llm = llm
        self.sub_agents = sub_agents
        self.message_log: list[AgentMessage] = []

    def run(self, task: str) -> str:
        """执行多Agent协作任务"""

        # Step 1: 让LLM分解任务
        agent_names = list(self.sub_agents.keys())
        decompose_prompt = f"""你是一个任务分解专家。请将以下任务分解为子任务,并分配给合适的Agent。

可用Agent: {json.dumps(agent_names, ensure_ascii=False)}

任务: {task}

请以JSON格式返回,示例:
[
  {{"agent": "agent_name", "subtask": "具体子任务描述"}},
  ...
]

只返回JSON数组,不要其他内容。"""

        response = self.llm.chat([
            {"role": "user", "content": decompose_prompt}
        ])

        # Step 2: 解析任务分配
        try:
            content = response.content or "[]"
            # 清理可能的markdown代码块标记
            content = content.strip().strip("```json").strip("```").strip()
            subtasks = json.loads(content)
        except json.JSONDecodeError:
            return f"任务分解失败: {response.content}"

        # Step 3: 依次执行子任务
        results: list[dict[str, str]] = []
        for item in subtasks:
            agent_name = item.get("agent", "")
            subtask = item.get("subtask", "")

            agent = self.sub_agents.get(agent_name)
            if not agent:
                results.append({
                    "agent": agent_name,
                    "result": f"⚠️ 未找到Agent: {agent_name}",
                })
                continue

            print(f"\n📤 分发给 [{agent_name}]: {subtask}")
            result = agent.run(subtask)
            results.append({"agent": agent_name, "result": result})

            # 记录消息
            self.message_log.append(AgentMessage(
                sender="supervisor", receiver=agent_name, content=subtask,
            ))
            self.message_log.append(AgentMessage(
                sender=agent_name, receiver="supervisor", content=result,
            ))

        # Step 4: 综合结果
        synthesis_prompt = f"""你是一个结果综合专家。请根据以下子任务的执行结果,给出完整的最终回答。

原始任务: {task}

各Agent执行结果:
{json.dumps(results, ensure_ascii=False, indent=2)}

请给出结构化的综合回答:"""

        final_response = self.llm.chat([
            {"role": "user", "content": synthesis_prompt}
        ])

        return final_response.content or ""

8.3 使用多 Agent

Python
from mini_agent.llm import LLMEngine, ModelConfig
from mini_agent.agent import Agent, AgentConfig
from mini_agent.multi_agent import SupervisorAgent
from mini_agent.tools import ToolRegistry, registry

llm = LLMEngine(ModelConfig(api_key="sk-xxx", model="gpt-4o-mini"))

# 创建专职Agent
researcher = Agent(
    llm=llm,
    tools=registry,
    config=AgentConfig(name="researcher", max_iterations=5),
)
analyst = Agent(
    llm=llm,
    tools=registry,
    config=AgentConfig(name="analyst", max_iterations=5),
)

# 主管Agent
supervisor = SupervisorAgent(
    llm=llm,
    sub_agents={"researcher": researcher, "analyst": analyst},
)

result = supervisor.run("分析Python和Rust在AI领域的应用前景")
print(result)

9. 完整项目:智能研究助手

将所有模块集成,构建一个能够 搜索→分析→生成报告 的智能研究助手。

9.1 项目结构

Text Only
research_assistant/
├── main.py             # 入口文件
├── mini_agent/         # 我们构建的框架
│   ├── __init__.py
│   ├── llm.py
│   ├── tools.py
│   ├── prompt.py
│   ├── memory.py
│   ├── agent.py
│   └── multi_agent.py
└── requirements.txt

9.2 __init__.py 包入口

Python
"""mini_agent - 从零构建的轻量级Agent框架"""

from .llm import LLMEngine, ModelConfig, LLMResponse
from .tools import Tool, ToolRegistry, registry
from .prompt import PromptTemplate, build_system_prompt
from .memory import ConversationMemory
from .agent import Agent, AgentConfig

__all__ = [  # 定义模块公开API,控制from package import *的导出范围
    "LLMEngine", "ModelConfig", "LLMResponse",
    "Tool", "ToolRegistry", "registry",
    "PromptTemplate", "build_system_prompt",
    "ConversationMemory",
    "Agent", "AgentConfig",
]

9.3 主程序

Python
"""main.py - 智能研究助手"""

import os
from mini_agent import (
    LLMEngine, ModelConfig, Agent, AgentConfig,
    ToolRegistry, ConversationMemory,
)
from mini_agent.prompt import RESEARCH_SYSTEM_PROMPT
from mini_agent.tools import registry

def setup_research_agent() -> Agent:
    """配置研究助手Agent"""

    # 添加研究专用工具
    @registry.register(description="搜索学术论文和技术文章")
    def search_papers(query: str, year: int = 2024) -> str:
        """
        搜索学术论文。
        query: 搜索关键词
        year: 发表年份筛选
        """
        # 生产中替换为 Semantic Scholar / arXiv API
        return (
            f"搜索 '{query}' ({year}年) 的论文结果:\n"
            f"1. [论文A] {query}的最新进展 - 2024\n"
            f"2. [论文B] {query}在工业界的应用 - 2024\n"
            f"3. [论文C] {query}综述 - 2024"
        )

    @registry.register(description="读取和分析网页内容")
    def read_webpage(url: str) -> str:
        """
        读取网页内容。
        url: 要读取的网页URL
        """
        # 生产中使用 requests + BeautifulSoup
        return f"[模拟] 网页 {url} 的主要内容:这是一篇关于AI技术的文章..."

    @registry.register(description="生成Markdown格式的研究报告")
    def generate_report(title: str, sections: str) -> str:
        """
        生成结构化报告。
        title: 报告标题
        sections: 报告各节内容(JSON格式)
        """
        return f"# {title}\n\n{sections}\n\n---\n*报告生成完成*"

    # 创建Agent
    llm = LLMEngine(ModelConfig(
        api_key=os.getenv("OPENAI_API_KEY", "sk-xxx"),
        model="gpt-4o-mini",
    ))

    agent = Agent(
        llm=llm,
        tools=registry,
        memory=ConversationMemory(max_tokens=12000),
        config=AgentConfig(
            name="ResearchAssistant",
            max_iterations=8,
            verbose=True,
            system_template=RESEARCH_SYSTEM_PROMPT,
        ),
    )
    return agent

def main() -> None:
    """交互式研究助手"""
    agent = setup_research_agent()

    print("🔬 智能研究助手已启动")
    print("   输入问题开始研究,输入 'quit' 退出\n")

    while True:
        user_input = input("👤 你: ").strip()
        if user_input.lower() in ("quit", "exit", "q"):
            print("\n" + agent.llm.get_usage_report())
            print("👋 再见!")
            break
        if not user_input:
            continue

        answer = agent.run(user_input)
        print(f"\n🤖 助手: {answer}\n")

if __name__ == "__main__":
    main()

9.4 requirements.txt

Text Only
openai>=1.0.0
tiktoken>=0.5.0
pydantic>=2.0.0

10. 与主流框架对比

10.1 功能对比

特性 我们的框架 LangGraph CrewAI
代码量 ~500行 ~50,000行 ~30,000行
学习曲线
ReAct循环
工具系统 ✅ 自动Schema ✅ 丰富生态 ✅ 内置工具
多Agent ✅ Supervisor ✅ 图编排 ✅ 角色扮演
记忆管理 ✅ 基础 ✅ 检查点 ✅ 基础
流式输出
可观测性 基础日志 LangSmith 内置
生产就绪 ⚠️ 需完善
定制灵活性 ⭐⭐⭐⭐⭐ ⭐⭐⭐ ⭐⭐

10.2 何时自研 vs 使用框架

选择自研框架: - 需要深度定制 Agent 行为 - 安全/合规要求严格,需要完全掌控代码 - 团队有足够的工程能力 - 需要极致的性能优化

选择成熟框架: - 快速原型验证 - 需要丰富的工具生态 - 团队对 Agent 技术不够熟悉 - 需要开箱即用的可观测性

10.3 架构演进建议

Text Only
    MVP阶段           成长阶段            成熟阶段
 ┌──────────┐    ┌──────────────┐    ┌───────────────┐
 │ 自研框架  │ →  │ 自研 + 框架   │ →  │ 平台化Agent   │
 │ ~500行   │    │ 组件混用      │    │ 基础设施       │
 └──────────┘    └──────────────┘    └───────────────┘
   验证想法         补齐短板           规模化运营

📝 本章小结

本章从零构建了一个完整的Agent框架,深入理解了Agent运行机制的每个环节:

  1. ✅ 理解了自研Agent框架的价值(去除黑箱、完全掌控、极致优化)
  2. ✅ 掌握了Agent框架的模块化架构设计(LLM引擎 / 工具系统 / 提示模板 / 记忆管理)
  3. ✅ 实现了LLM引擎模块(统一接口、流式输出、Token计数)
  4. ✅ 实现了工具系统(装饰器注册、JSON Schema自动生成、安全执行)
  5. ✅ 实现了Prompt模板引擎(变量渲染、预定义模板、工具描述格式化)
  6. ✅ 实现了Agent核心ReAct循环(Observe→Think→Act→Observe)
  7. ✅ 实现了记忆管理模块(滑动窗口、Token裁剪、LLM摘要压缩)
  8. ✅ 实现了多Agent编排(Supervisor模式、任务分解、结果综合)
  9. ✅ 完成了完整项目集成(Research Assistant研究助手)
  10. ✅ 对比了自研框架与主流框架(LangGraph/CrewAI/OpenAI SDK)的取舍

✅ 学习检查清单

  • 能解释自研Agent框架相比使用LangChain的优劣势
  • 能实现LLM引擎模块(统一调用接口、流式输出、Token计数)
  • 能实现工具注册系统(装饰器模式、JSON Schema自动生成)
  • 能实现ReAct核心循环(解析tool_calls → 执行工具 → 注入结果 → 继续推理)
  • 能实现对话记忆管理(滑动窗口、Token裁剪、摘要压缩)
  • 能实现Supervisor多Agent编排(任务分解 → 子Agent执行 → 结果综合)
  • 理解Agent框架的模块划分原则(LLM引擎/工具/Prompt/记忆/编排)
  • 能将各模块集成为完整可运行的Agent项目
  • 了解自研框架与LangGraph/CrewAI/OpenAI SDK的功能对应关系

12. 面试题

Q1: ReAct与Plan-and-Execute有什么区别?各适用于什么场景?

ReAct是“边想边做”,每步都可能调整方向,适合探索性任务和需要实时反馈的场景。Plan-and-Execute是“先规划再执行”,适合目标明确、步骤可预测的任务。实践中常用混合模式:先规划大纲,每步用ReAct执行,并根据结果动态调整计划。

Q2: 为什么要自研Agent框架而不直接用LangChain?

自研的价值在于:① 深入理解原理,调试问题时不再“黑箱”。② 定制化能力更强,安全/合规要求严格时需完全掌控代码。③ 性能可极致优化,去除不需要的抽象层。但快速原型验证、生态丰富的场景还是应该用成熟框架。

Q3: Agent框架中的工具调用如何保证安全性?

关键措施:① 白名单制:只允许调用注册的工具。② 参数验证:JSON Schema校验输入。③ 权限分级:危险操作(文件写入、代码执行)需人工审批。④ 资源限制:超时、内存限制、网络限制。⑤ 日志审计:记录所有工具调用。


13. 练习与扩展

练习1:添加新工具(⭐)

ToolRegistry 添加一个文件读写工具,使 Agent 能够创建和读取本地文件。

练习2:实现并行工具调用(⭐⭐)

当 LLM 返回多个 tool_calls 时,当前实现是串行执行。请使用 asyncio 改造为并行执行。

提示

Python
import asyncio

async def execute_tools_parallel(
    tools: ToolRegistry,
    tool_calls: list[dict],
) -> list[str]:
    tasks = [
        asyncio.to_thread(tools.execute, tc["name"], tc["arguments"])
        for tc in tool_calls
    ]
    return await asyncio.gather(*tasks)

练习3:添加人类审批节点(⭐⭐)

在工具执行前加入人类确认环节——当 Agent 要执行"危险"操作(如代码执行、文件写入)时,先打印操作内容,等待用户输入 y/n 确认。

练习4:持久化对话历史(⭐⭐⭐)

ConversationMemory 扩展为支持 SQLite 持久化,使得 Agent 重启后能恢复上次对话。

练习5:实现 Planning Agent(⭐⭐⭐)

在 ReAct 之上实现 Plan-and-Execute 模式: 1. Agent 先制定完整计划(步骤列表) 2. 逐步执行每个步骤 3. 根据中间结果动态调整计划


🔗 下一步

下一章我们将深入Agent的记忆系统,学习如何让Agent具备长期记忆能力。

继续学习: 12-Agent记忆系统


祝你学习愉快! 🎉


最后更新日期:2026-02-12 适用版本:AI Agent开发实战教程 v2026