跳转至

项目2: AI Agent工作流

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

难度: ⭐⭐⭐⭐ 高级 时间: 15-20小时 涉及知识: Agent系统、工具调用、LangChain Agents、任务规划


📖 项目概述

项目背景

传统的AI应用通常是"一问一答"的模式,用户提问,AI回答。但这种方式无法处理复杂的多步骤任务。AI Agent(智能体)是一种更高级的AI应用形式,它能够: - 理解用户的目标 - 自主规划任务步骤 - 调用各种工具完成任务 - 根据结果调整策略 - 最终达成用户的目标

AI Agent可以应用于各种场景,如:自动化办公、数据分析、代码生成、研究助手等。

项目目标

构建一个功能完整的AI Agent工作流系统,能够: - 理解用户的复杂任务 - 自动规划执行步骤 - 调用多种工具完成任务 - 支持多Agent协作 - 提供可视化的执行过程 - 支持自我反思和优化

技术栈

  • Agent框架: LangChain Agents
  • 大模型: OpenAI GPT-4 / 通义千问 / Claude
  • 工具集成: Serper搜索、Python REPL、文件操作、API调用
  • 记忆系统: LangChain Memory
  • 前端框架: Streamlit
  • 任务规划: LangChain Plan-and-Execute
  • 可视化: Graphviz, Mermaid

🏗️ 项目结构

Text Only
agent-workflow/
├── app/                      # 应用主目录
│   ├── __init__.py
│   ├── main.py              # FastAPI主应用
│   ├── config.py            # 配置文件
│   ├── agent/               # Agent模块
│   │   ├── __init__.py
│   │   ├── base_agent.py    # 基础Agent类
│   │   ├── planner.py       # 任务规划器
│   │   ├── executor.py      # 任务执行器
│   │   └── reflector.py     # 反思器
│   ├── tools/               # 工具模块
│   │   ├── __init__.py
│   │   ├── search.py        # 搜索工具
│   │   ├── python.py        # Python工具
│   │   ├── file.py          # 文件工具
│   │   ├── api.py           # API工具
│   │   └── calculator.py    # 计算器工具
│   ├── memory/              # 记忆模块
│   │   ├── __init__.py
│   │   ├── conversation.py  # 对话记忆
│   │   ├── entity.py        # 实体记忆
│   │   └── vector.py        # 向量记忆
│   └── api/                 # API路由
│       ├── __init__.py
│       ├── agent.py         # Agent API
│       └── workflow.py      # 工作流API
├── frontend/                 # 前端目录
│   ├── app.py              # Streamlit应用
│   ├── pages/              # 页面组件
│   │   ├── home.py
│   │   ├── agent_chat.py
│   │   └── workflow_view.py
│   └── components/         # UI组件
│       ├── agent_card.py
│       └── workflow_diagram.py
├── data/                    # 数据目录
│   ├── memory/             # 记忆存储
│   ├── logs/               # 执行日志
│   └── workflows/          # 工作流定义
├── tests/                   # 测试目录
│   ├── test_agent.py
│   ├── test_tools.py
│   └── test_workflow.py
├── utils/                   # 工具函数
│   ├── __init__.py
│   ├── logger.py           # 日志工具
│   └── visualizer.py       # 可视化工具
├── requirements.txt         # Python依赖
├── Dockerfile              # Docker配置
├── docker-compose.yml      # Docker Compose配置
└── README.md              # 项目说明

🎯 核心功能

1. Agent核心

  • 任务理解: 理解用户的复杂任务目标
  • 任务规划: 将任务分解为可执行的步骤
  • 工具选择: 根据任务需求选择合适的工具
  • 执行监控: 监控任务执行过程
  • 结果验证: 验证任务完成情况

2. 工具集成

  • 搜索工具: 使用搜索引擎获取信息
  • Python工具: 执行Python代码
  • 文件工具: 读写文件
  • API工具: 调用外部API
  • 计算器工具: 执行数学计算

3. 记忆系统

  • 对话记忆: 记录对话历史
  • 实体记忆: 记录重要实体信息
  • 向量记忆: 基于向量存储长期记忆

4. 多Agent协作

  • 角色定义: 定义不同Agent的职责
  • 任务分配: 自动分配任务给合适的Agent
  • 协作机制: Agent之间的信息共享
  • 冲突解决: 处理Agent之间的冲突

5. 可视化

  • 执行过程可视化: 展示Agent的执行过程
  • 工作流图: 可视化任务分解和执行流程
  • 工具调用记录: 展示工具调用的详细记录

💻 代码实现

1. 配置文件 (app/config.py)

Python
"""
Agent工作流系统配置文件
"""
import os
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    """应用配置"""

    # API配置
    API_HOST: str = "0.0.0.0"
    API_PORT: int = 8000
    API_PREFIX: str = "/api/v1"

    # LLM配置
    OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY", "")
    OPENAI_MODEL: str = "gpt-4o"
    OPENAI_TEMPERATURE: float = 0.0
    OPENAI_MAX_TOKENS: int = 2000

    # 搜索工具配置
    SERPER_API_KEY: str = os.getenv("SERPER_API_KEY", "")
    SEARCH_ENGINE: str = "google"
    MAX_SEARCH_RESULTS: int = 5

    # Agent配置
    MAX_ITERATIONS: int = 10
    EARLY_STOPPING: bool = True
    VERBOSE: bool = True

    # 记忆配置
    MEMORY_DIR: str = "./data/memory"
    MAX_MEMORY_SIZE: int = 1000

    # 工作流配置
    WORKFLOW_DIR: str = "./data/workflows"
    LOG_DIR: str = "./data/logs"

    # Python工具配置
    PYTHON_TIMEOUT: int = 30
    PYTHON_MAX_OUTPUT: int = 10000

    class Config:
        env_file = ".env"
        case_sensitive = True

# 全局配置实例
settings = Settings()

2. 基础Agent类 (app/agent/base_agent.py)

Python
"""
基础Agent类
"""
from typing import Any
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from app.config import settings
from app.tools import get_all_tools

class BaseAgent:
    """基础Agent类"""

    def __init__(
        self,
        name: str = "助手",
        role: str = "通用助手",
        tools: list | None = None,
        llm: ChatOpenAI | None = None,
        memory: ConversationBufferMemory | None = None
    ):
        """
        初始化Agent

        Args:
            name: Agent名称
            role: Agent角色
            tools: 工具列表
            llm: 语言模型
            memory: 记忆
        """
        self.name = name
        self.role = role

        # 初始化LLM
        if llm is None:
            self.llm = ChatOpenAI(
                model=settings.OPENAI_MODEL,
                temperature=settings.OPENAI_TEMPERATURE,
                max_tokens=settings.OPENAI_MAX_TOKENS,
                openai_api_key=settings.OPENAI_API_KEY
            )
        else:
            self.llm = llm

        # 初始化工具
        if tools is None:
            self.tools = get_all_tools()
        else:
            self.tools = tools

        # 初始化记忆
        if memory is None:
            self.memory = ConversationBufferMemory(
                memory_key="chat_history",
                return_messages=True
            )
        else:
            self.memory = memory

        # 创建Agent
        self._create_agent()

    def _create_agent(self):
        """创建Agent(使用 create_tool_calling_agent,替代已弃用的 initialize_agent)"""
        from langchain.agents import AgentExecutor, create_tool_calling_agent
        from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

        # 定义系统提示
        system_prompt = f"""你是一个{self.role},名称为{self.name}

你的职责是帮助用户完成各种任务。你可以使用以下工具:
{self._get_tools_description()}

使用工具时,请遵循以下规则:
1. 仔细分析用户的需求
2. 选择最合适的工具
3. 正确调用工具
4. 分析工具返回的结果
5. 如果需要,可以多次调用工具
6. 最终给出清晰的答案

请始终使用中文回答。"""

        prompt = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            MessagesPlaceholder("chat_history", optional=True),
            ("human", "{input}"),
            MessagesPlaceholder("agent_scratchpad"),
        ])

        agent = create_tool_calling_agent(self.llm, self.tools, prompt)
        self.agent_executor = AgentExecutor(
            agent=agent,
            tools=self.tools,
            memory=self.memory,
            verbose=settings.VERBOSE,
            max_iterations=settings.MAX_ITERATIONS,
            early_stopping_method="generate" if settings.EARLY_STOPPING else None,
        )

    def _get_tools_description(self) -> str:
        """获取工具描述"""
        descriptions = []
        for tool in self.tools:
            descriptions.append(f"- {tool.name}: {tool.description}")
        return "\n".join(descriptions)

    def run(self, input_text: str) -> dict[str, Any]:
        """
        运行Agent

        Args:
            input_text: 用户输入

        Returns:
            执行结果
        """
        try:  # try/except捕获异常,防止程序崩溃
            result = self.agent_executor.invoke({"input": input_text})
            return {
                "success": True,
                "result": result["output"],
                "agent": self.name,
                "role": self.role
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "agent": self.name,
                "role": self.role
            }

    async def arun(self, input_text: str) -> dict[str, Any]:  # async def定义协程函数
        """
        异步运行Agent

        Args:
            input_text: 用户输入

        Returns:
            执行结果
        """
        try:
            result = await self.agent_executor.ainvoke({"input": input_text})  # await等待异步操作完成
            return {
                "success": True,
                "result": result["output"],
                "agent": self.name,
                "role": self.role
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "agent": self.name,
                "role": self.role
            }

    def clear_memory(self):
        """清除记忆"""
        self.memory.clear()

    def get_memory(self) -> list[dict[str, Any]]:
        """获取记忆"""
        return self.memory.load_memory_variables({})

3. 任务规划器 (app/agent/planner.py)

Python
"""
任务规划器
"""
from typing import Any
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from app.config import settings

class TaskPlanner:
    """任务规划器"""

    def __init__(self, llm: ChatOpenAI = None):
        """
        初始化任务规划器

        Args:
            llm: 语言模型
        """
        if llm is None:
            self.llm = ChatOpenAI(
                model=settings.OPENAI_MODEL,
                temperature=0.0,
                openai_api_key=settings.OPENAI_API_KEY
            )
        else:
            self.llm = llm

        self.prompt_template = PromptTemplate(
            input_variables=["task", "available_tools"],
            template="""你是一个任务规划专家。请将以下任务分解为可执行的步骤。

任务描述:
{task}

可用工具:
{available_tools}

请按照以下格式输出任务分解:

步骤1: [步骤描述]
- 使用工具: [工具名称]
- 目标: [步骤目标]
- 预期输出: [预期输出]

步骤2: [步骤描述]
- 使用工具: [工具名称]
- 目标: [步骤目标]
- 预期输出: [预期输出]

...

请确保:
1. 每个步骤都是可执行的
2. 步骤之间有合理的依赖关系
3. 选择的工具是合适的
4. 预期输出是明确的
"""
        )

    def plan(
        self,
        task: str,
        available_tools: list[str]
    ) -> list[dict[str, Any]]:
        """
        规划任务

        Args:
            task: 任务描述
            available_tools: 可用工具列表

        Returns:
            任务步骤列表
        """
        # 格式化工具列表
        tools_description = "\n".join([f"- {tool}" for tool in available_tools])

        # 生成规划
        prompt = self.prompt_template.format(
            task=task,
            available_tools=tools_description
        )

        response = self.llm.invoke(prompt)

        # 解析响应
        steps = self._parse_plan(response.content)

        return steps

    def _parse_plan(self, plan_text: str) -> list[dict[str, Any]]:
        """
        解析规划文本

        Args:
            plan_text: 规划文本

        Returns:
            任务步骤列表
        """
        steps = []
        current_step = None

        for line in plan_text.split("\n"):
            line = line.strip()

            # 检测步骤
            if line.startswith("步骤"):
                if current_step:
                    steps.append(current_step)
                current_step = {"description": line, "tool": "", "goal": "", "output": ""}

            # 检测工具
            elif line.startswith("- 使用工具:"):
                if current_step:
                    current_step["tool"] = line.split(":")[1].strip()

            # 检测目标
            elif line.startswith("- 目标:"):
                if current_step:
                    current_step["goal"] = line.split(":")[1].strip()

            # 检测预期输出
            elif line.startswith("- 预期输出:"):
                if current_step:
                    current_step["output"] = line.split(":")[1].strip()

        # 添加最后一个步骤
        if current_step:
            steps.append(current_step)

        return steps

    def optimize_plan(
        self,
        steps: list[dict[str, Any]],
        feedback: str
    ) -> list[dict[str, Any]]:
        """
        优化规划

        Args:
            steps: 原始步骤
            feedback: 反馈信息

        Returns:
            优化后的步骤
        """
        # 将步骤转换为文本
        steps_text = "\n".join([
            f"步骤{i+1}: {step['description']}\n"
            f"  工具: {step['tool']}\n"
            f"  目标: {step['goal']}"
            for i, step in enumerate(steps)  # enumerate同时获取索引和元素
        ])

        prompt = f"""基于以下反馈,优化任务规划:

原始规划:
{steps_text}

反馈:
{feedback}

请提供优化后的规划(使用相同的格式):"""

        response = self.llm.invoke(prompt)
        optimized_steps = self._parse_plan(response.content)

        return optimized_steps

4. 工具定义 (app/tools/init.py)

Python
"""
工具模块初始化
"""
from app.tools.search import SearchTool
from app.tools.python import PythonTool
from app.tools.file import FileTool
from app.tools.calculator import CalculatorTool

def get_all_tools():
    """获取所有工具"""
    return [
        SearchTool(),
        PythonTool(),
        FileTool(),
        CalculatorTool()
    ]

5. 搜索工具 (app/tools/search.py)

Python
"""
搜索工具
"""
import requests
from typing import Any
from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field
from app.config import settings

class SearchInput(BaseModel):  # Pydantic BaseModel:自动数据验证和序列化
    """搜索输入"""
    query: str = Field(description="搜索查询")
    num_results: int = Field(default=5, description="返回结果数量")

class SearchTool(BaseTool):
    """搜索工具"""

    name = "search"
    description = "使用搜索引擎搜索信息"
    args_schema = SearchInput

    def __init__(self):
        """初始化搜索工具"""
        super().__init__()  # super()调用父类方法
        self.api_key = settings.SERPER_API_KEY
        self.base_url = "https://google.serper.dev/search"  # ⚠️ 需要API密钥

    def _run(
        self,
        query: str,
        num_results: int = 5
    ) -> str:
        """
        执行搜索

        Args:
            query: 搜索查询
            num_results: 返回结果数量

        Returns:
            搜索结果
        """
        try:
            # 调用搜索API
            headers = {
                "X-API-KEY": self.api_key,
                "Content-Type": "application/json"
            }

            payload = {
                "q": query,
                "num": min(num_results, settings.MAX_SEARCH_RESULTS)
            }

            response = requests.post(
                self.base_url,
                headers=headers,
                json=payload
            )

            response.raise_for_status()
            data = response.json()

            # 解析结果
            results = []
            if "organic" in data:
                for item in data["organic"][:num_results]:
                    results.append({
                        "title": item.get("title", ""),
                        "link": item.get("link", ""),
                        "snippet": item.get("snippet", "")
                    })

            # 格式化输出
            output = f"搜索查询: {query}\n\n"
            for i, result in enumerate(results, 1):
                output += f"{i}. {result['title']}\n"
                output += f"   链接: {result['link']}\n"
                output += f"   摘要: {result['snippet']}\n\n"

            return output

        except Exception as e:
            return f"搜索失败: {str(e)}"

    async def _arun(
        self,
        query: str,
        num_results: int = 5
    ) -> str:
        """异步执行搜索"""
        return self._run(query, num_results)

6. Python工具 (app/tools/python.py)

Python
"""
Python执行工具
"""
import sys
import io
import contextlib
from typing import Any
from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field
from app.config import settings

class PythonInput(BaseModel):
    """Python输入"""
    code: str = Field(description="要执行的Python代码")

class PythonTool(BaseTool):
    """Python执行工具"""

    name = "python"
    description = "执行Python代码并返回输出"
    args_schema = PythonInput

    def __init__(self):
        """初始化Python工具"""
        super().__init__()
        self.timeout = settings.PYTHON_TIMEOUT
        self.max_output = settings.PYTHON_MAX_OUTPUT

    def _run(self, code: str) -> str:
        """
        执行Python代码

        Args:
            code: Python代码

        Returns:
            执行输出
        """
        try:
            # 重定向输出
            output_buffer = io.StringIO()
            error_buffer = io.StringIO()

            with contextlib.redirect_stdout(output_buffer), \
                 contextlib.redirect_stderr(error_buffer):

                # 执行代码
                exec_globals = {
                    "__name__": "__main__",
                    "__builtins__": __builtins__
                }

                exec(code, exec_globals)

            # 获取输出
            stdout = output_buffer.getvalue()
            stderr = error_buffer.getvalue()

            # 组合输出
            result = ""
            if stdout:
                result += f"输出:\n{stdout}\n"
            if stderr:
                result += f"错误:\n{stderr}\n"

            if not result:
                result = "代码执行成功,无输出"

            # 限制输出长度
            if len(result) > self.max_output:
                result = result[:self.max_output] + "\n... (输出被截断)"

            return result

        except Exception as e:
            return f"执行失败: {str(e)}"

    async def _arun(self, code: str) -> str:
        """异步执行Python代码"""
        return self._run(code)

7. 文件工具 (app/tools/file.py)

Python
"""
文件操作工具
"""
from pathlib import Path
from typing import Any
from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field

class FileReadInput(BaseModel):
    """文件读取输入"""
    path: str = Field(description="文件路径")

class FileWriteInput(BaseModel):
    """文件写入输入"""
    path: str = Field(description="文件路径")
    content: str = Field(description="文件内容")

class FileListInput(BaseModel):
    """文件列表输入"""
    directory: str = Field(default=".", description="目录路径")

class FileReadTool(BaseTool):
    """文件读取工具"""

    name = "read_file"
    description = "读取文件内容"
    args_schema = FileReadInput

    def _run(self, path: str) -> str:
        """
        读取文件

        Args:
            path: 文件路径

        Returns:
            文件内容
        """
        try:
            file_path = Path(path)
            if not file_path.exists():
                return f"文件不存在: {path}"

            content = file_path.read_text(encoding="utf-8")

            # 限制返回长度
            max_length = 5000
            if len(content) > max_length:
                content = content[:max_length] + "\n... (内容被截断)"

            return content

        except Exception as e:
            return f"读取失败: {str(e)}"

    async def _arun(self, path: str) -> str:
        """异步读取文件"""
        return self._run(path)

class FileWriteTool(BaseTool):
    """文件写入工具"""

    name = "write_file"
    description = "写入文件内容"
    args_schema = FileWriteInput

    def _run(self, path: str, content: str) -> str:
        """
        写入文件

        Args:
            path: 文件路径
            content: 文件内容

        Returns:
            操作结果
        """
        try:
            file_path = Path(path)

            # 创建父目录
            file_path.parent.mkdir(parents=True, exist_ok=True)

            # 写入文件
            file_path.write_text(content, encoding="utf-8")

            return f"文件写入成功: {path}"

        except Exception as e:
            return f"写入失败: {str(e)}"

    async def _arun(self, path: str, content: str) -> str:
        """异步写入文件"""
        return self._run(path, content)

class FileListTool(BaseTool):
    """文件列表工具"""

    name = "list_files"
    description = "列出目录中的文件"
    args_schema = FileListInput

    def _run(self, directory: str = ".") -> str:
        """
        列出文件

        Args:
            directory: 目录路径

        Returns:
            文件列表
        """
        try:
            dir_path = Path(directory)
            if not dir_path.exists():
                return f"目录不存在: {directory}"

            items = []
            for item in dir_path.iterdir():
                item_type = "目录" if item.is_dir() else "文件"
                items.append(f"{item_type}: {item.name}")

            if not items:
                return "目录为空"

            return "\n".join(items)

        except Exception as e:
            return f"列出失败: {str(e)}"

    async def _arun(self, directory: str = ".") -> str:
        """异步列出文件"""
        return self._run(directory)

8. 计算器工具 (app/tools/calculator.py)

Python
"""
计算器工具
"""
import ast
import operator
from typing import Any
from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field

class CalculatorInput(BaseModel):
    """计算器输入"""
    expression: str = Field(description="数学表达式")

class CalculatorTool(BaseTool):
    """计算器工具"""

    name = "calculator"
    description = "执行数学计算"
    args_schema = CalculatorInput

    # 支持的运算符
    operators = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.Pow: operator.pow,
        ast.Mod: operator.mod,
        ast.USub: operator.neg,
    }

    def _run(self, expression: str) -> str:
        """
        执行计算

        Args:
            expression: 数学表达式

        Returns:
            计算结果
        """
        try:
            # 解析表达式
            node = ast.parse(expression, mode='eval')

            # 计算结果
            result = self._eval(node.body)

            return f"计算结果: {result}"

        except Exception as e:
            return f"计算失败: {str(e)}"

    def _eval(self, node):
        """递归计算AST节点"""
        if isinstance(node, ast.Num):  # isinstance检查类型
            return node.n
        elif isinstance(node, ast.Constant):
            return node.value
        elif isinstance(node, ast.BinOp):
            left = self._eval(node.left)
            right = self._eval(node.right)
            op_type = type(node.op)
            if op_type in self.operators:
                return self.operators[op_type](left, right)
            else:
                raise ValueError(f"不支持的运算符: {op_type}")
        elif isinstance(node, ast.UnaryOp):
            operand = self._eval(node.operand)
            op_type = type(node.op)
            if op_type in self.operators:
                return self.operators[op_type](operand)
            else:
                raise ValueError(f"不支持的一元运算符: {op_type}")
        else:
            raise ValueError(f"不支持的节点类型: {type(node)}")

    async def _arun(self, expression: str) -> str:
        """异步执行计算"""
        return self._run(expression)

9. FastAPI主应用 (app/main.py)

Python
"""
FastAPI主应用
"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import uvicorn

from app.config import settings
from app.api import agent, workflow

# 创建FastAPI应用
app = FastAPI(
    title="AI Agent工作流系统",
    description="基于LangChain的智能Agent工作流系统",
    version="1.0.0"
)

# 配置CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 注册路由
app.include_router(
    agent.router,
    prefix=settings.API_PREFIX,
    tags=["agent"]
)
app.include_router(
    workflow.router,
    prefix=settings.API_PREFIX,
    tags=["workflow"]
)

@app.get("/")
async def root():
    """根路径"""
    return {
        "message": "AI Agent工作流系统",
        "version": "1.0.0",
        "docs": "/docs"
    }

@app.get("/health")
async def health_check():
    """健康检查"""
    return {"status": "healthy"}

if __name__ == "__main__":
    uvicorn.run(
        "app.main:app",
        host=settings.API_HOST,
        port=settings.API_PORT,
        reload=True
    )

10. Agent API (app/api/agent.py)

Python
"""
Agent API
"""
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from app.agent.base_agent import BaseAgent
from app.tools import get_all_tools

router = APIRouter()

# 存储Agent实例
agents = {}

class AgentCreateRequest(BaseModel):
    """创建Agent请求"""
    name: str
    role: str
    tools: list[str] | None = None

class AgentChatRequest(BaseModel):
    """Agent聊天请求"""
    message: str

class AgentChatResponse(BaseModel):
    """Agent聊天响应"""
    success: bool
    result: str | None = None
    error: str | None = None
    agent: str
    role: str

@router.post("/agent/create", response_model=dict)
async def create_agent(request: AgentCreateRequest):
    """
    创建Agent

    Args:
        request: 创建请求

    Returns:
        Agent信息
    """
    try:
        # 获取工具
        all_tools = get_all_tools()
        if request.tools:
            selected_tools = [
                tool for tool in all_tools
                if tool.name in request.tools
            ]
        else:
            selected_tools = all_tools

        # 创建Agent
        agent = BaseAgent(
            name=request.name,
            role=request.role,
            tools=selected_tools
        )

        # 存储Agent
        agent_id = request.name
        agents[agent_id] = agent

        return {
            "message": "Agent创建成功",
            "agent_id": agent_id,
            "name": agent.name,
            "role": agent.role,
            "tools": [tool.name for tool in agent.tools]
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@router.post("/agent/{agent_id}/chat", response_model=AgentChatResponse)
async def agent_chat(agent_id: str, request: AgentChatRequest):
    """
    Agent聊天

    Args:
        agent_id: Agent ID
        request: 聊天请求

    Returns:
        聊天响应
    """
    try:
        # 获取Agent
        if agent_id not in agents:
            raise HTTPException(status_code=404, detail="Agent不存在")

        agent = agents[agent_id]

        # 执行聊天
        result = agent.run(request.message)

        return result
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@router.delete("/agent/{agent_id}")
async def delete_agent(agent_id: str):
    """
    删除Agent

    Args:
        agent_id: Agent ID

    Returns:
        删除结果
    """
    if agent_id not in agents:
        raise HTTPException(status_code=404, detail="Agent不存在")

    del agents[agent_id]

    return {"message": "Agent删除成功"}

@router.get("/agent/list")
async def list_agents():
    """
    列出所有Agent

    Returns:
        Agent列表
    """
    agent_list = []
    for agent_id, agent in agents.items():
        agent_list.append({
            "agent_id": agent_id,
            "name": agent.name,
            "role": agent.role,
            "tools": [tool.name for tool in agent.tools]
        })

    return {"agents": agent_list}

11. Streamlit前端 (frontend/app.py)

Python
"""
Streamlit前端应用
"""
import streamlit as st
import requests

# 配置页面
st.set_page_config(
    page_title="AI Agent工作流系统",
    page_icon="🤖",
    layout="wide"
)

# API配置
API_BASE_URL = "http://localhost:8000/api/v1"

def main():
    """主函数"""
    st.title("🤖 AI Agent工作流系统")

    # 侧边栏 - Agent管理
    with st.sidebar:
        st.header("Agent管理")

        # 创建Agent
        st.subheader("创建新Agent")
        agent_name = st.text_input("Agent名称", value="助手")
        agent_role = st.text_input("Agent角色", value="通用助手")

        # 工具选择
        all_tools = ["search", "python", "read_file", "write_file", "list_files", "calculator"]
        selected_tools = st.multiselect("选择工具", all_tools, default=all_tools)

        if st.button("创建Agent"):
            with st.spinner("正在创建Agent..."):
                try:
                    response = requests.post(
                        f"{API_BASE_URL}/agent/create",
                        json={
                            "name": agent_name,
                            "role": agent_role,
                            "tools": selected_tools
                        }
                    )

                    if response.status_code == 200:
                        st.success("Agent创建成功!")
                        st.json(response.json())
                    else:
                        st.error(f"创建失败: {response.text}")
                except Exception as e:
                    st.error(f"创建失败: {str(e)}")

        st.divider()

        # Agent列表
        st.subheader("Agent列表")
        if st.button("刷新列表"):
            try:
                response = requests.get(f"{API_BASE_URL}/agent/list")
                if response.status_code == 200:
                    data = response.json()
                    for agent in data["agents"]:
                        with st.expander(f"{agent['name']} ({agent['role']})"):
                            st.write(f"**Agent ID**: {agent['agent_id']}")
                            st.write(f"**工具**: {', '.join(agent['tools'])}")
            except Exception as e:
                st.error(f"获取列表失败: {str(e)}")

    # 主区域 - Agent聊天
    st.header("Agent对话")

    # Agent选择
    col1, col2 = st.columns([1, 3])
    with col1:
        agent_id = st.text_input("Agent ID", value="助手")

    # 初始化聊天历史
    if "messages" not in st.session_state:
        st.session_state.messages = []

    # 显示聊天历史
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    # 用户输入
    if prompt := st.chat_input("请输入您的任务..."):  # 海象运算符:=:赋值并判断——获取用户输入的同时检查是否非空
        # 显示用户消息
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)

        # 获取Agent回复
        with st.chat_message("assistant"):
            with st.spinner("Agent正在思考..."):
                try:
                    response = requests.post(
                        f"{API_BASE_URL}/agent/{agent_id}/chat",
                        json={"message": prompt}
                    )

                    if response.status_code == 200:
                        result = response.json()

                        if result["success"]:
                            st.markdown(result["result"])
                            st.session_state.messages.append({
                                "role": "assistant",
                                "content": result["result"]
                            })
                        else:
                            st.error(f"执行失败: {result['error']}")
                    else:
                        st.error(f"请求失败: {response.text}")
                except Exception as e:
                    st.error(f"请求失败: {str(e)}")

if __name__ == "__main__":
    main()

12. 依赖文件 (requirements.txt)

Text Only
fastapi==0.115.0
uvicorn[standard]==0.32.0
python-multipart==0.0.12
pydantic==2.10.0
pydantic-settings==2.6.0
langchain==0.3.7
openai==1.55.0
requests==2.32.0
streamlit==1.40.0
python-dotenv==1.0.1
google-search-results==2.4.2

13. 环境变量文件 (.env.example)

Text Only
# OpenAI API配置
OPENAI_API_KEY=your_openai_api_key_here

# Serper搜索API配置
SERPER_API_KEY=your_serper_api_key_here

🚀 部署说明

1. 本地部署

步骤1: 克隆项目

Bash
git clone https://github.com/yourusername/agent-workflow.git
cd agent-workflow

步骤2: 创建虚拟环境

Bash
python -m venv venv

# 激活虚拟环境
# Windows
venv\Scripts\activate
# Linux/Mac
source venv/bin/activate

步骤3: 安装依赖

Bash
pip install -r requirements.txt

步骤4: 配置环境变量

Bash
cp .env.example .env
# 编辑.env文件,填入API密钥

步骤5: 启动后端服务

Bash
python -m app.main

步骤6: 启动前端服务

Bash
cd frontend
streamlit run app.py

2. Docker部署

Bash
# 构建镜像
docker build -t agent-workflow .

# 运行容器
docker run -d \
  --name agent-workflow \
  -p 8000:8000 \
  -p 8501:8501 \
  -e OPENAI_API_KEY=your_api_key_here \
  -e SERPER_API_KEY=your_serper_key_here \
  agent-workflow

🔧 扩展方向

1. 功能扩展

  • 多Agent协作: 实现Agent之间的协作机制
  • 任务队列: 添加任务队列管理
  • 工作流模板: 预定义常用工作流
  • 自定义工具: 支持用户自定义工具
  • 插件系统: 支持第三方插件

2. 性能优化

  • 异步执行: 全面支持异步操作
  • 缓存机制: 缓存常用查询结果
  • 并行执行: 并行执行独立任务
  • 资源限制: 限制Agent资源使用

3. 用户体验

  • 可视化: 增强执行过程可视化
  • 调试模式: 添加调试和日志功能
  • 导出功能: 支持导出工作流
  • 分享功能: 支持分享Agent配置

4. 企业功能

  • 权限管理: 添加用户权限控制
  • 审计日志: 完整的操作审计
  • 监控告警: 系统监控和告警
  • API限流: API访问限流

📚 学习收获

完成本项目后,你将掌握:

  • Agent系统原理: 理解智能体的核心概念
  • LangChain Agents: 熟练使用LangChain构建Agent
  • 工具开发: 开发自定义工具
  • 任务规划: 实现任务分解和规划
  • 记忆系统: 管理Agent的记忆
  • 多Agent协作: 实现Agent之间的协作
  • 系统设计: 设计复杂的AI系统

🎉 开始学习

现在你已经了解了整个AI Agent工作流系统的实现,开始动手构建你自己的智能Agent吧!

推荐学习顺序: 1. 先运行简单的Agent,体验基本功能 2. 然后尝试复杂任务,观察Agent的规划过程 3. 开发自定义工具,扩展Agent能力 4. 实现多Agent协作,处理复杂场景

祝你学习顺利! 💪