项目2: AI Agent工作流¶
⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。
难度: ⭐⭐⭐⭐ 高级 时间: 15-20小时 涉及知识: Agent系统、工具调用、LangChain Agents、任务规划
📖 项目概述¶
项目背景¶
传统的AI应用通常是"一问一答"的模式,用户提问,AI回答。但这种方式无法处理复杂的多步骤任务。AI Agent(智能体)是一种更高级的AI应用形式,它能够: - 理解用户的目标 - 自主规划任务步骤 - 调用各种工具完成任务 - 根据结果调整策略 - 最终达成用户的目标
AI Agent可以应用于各种场景,如:自动化办公、数据分析、代码生成、研究助手等。
项目目标¶
构建一个功能完整的AI Agent工作流系统,能够: - 理解用户的复杂任务 - 自动规划执行步骤 - 调用多种工具完成任务 - 支持多Agent协作 - 提供可视化的执行过程 - 支持自我反思和优化
技术栈¶
- Agent框架: LangChain Agents
- 大模型: OpenAI GPT-4 / 通义千问 / Claude
- 工具集成: Serper搜索、Python REPL、文件操作、API调用
- 记忆系统: LangChain Memory
- 前端框架: Streamlit
- 任务规划: LangChain Plan-and-Execute
- 可视化: Graphviz, Mermaid
🏗️ 项目结构¶
Text Only
agent-workflow/
├── app/ # 应用主目录
│ ├── __init__.py
│ ├── main.py # FastAPI主应用
│ ├── config.py # 配置文件
│ ├── agent/ # Agent模块
│ │ ├── __init__.py
│ │ ├── base_agent.py # 基础Agent类
│ │ ├── planner.py # 任务规划器
│ │ ├── executor.py # 任务执行器
│ │ └── reflector.py # 反思器
│ ├── tools/ # 工具模块
│ │ ├── __init__.py
│ │ ├── search.py # 搜索工具
│ │ ├── python.py # Python工具
│ │ ├── file.py # 文件工具
│ │ ├── api.py # API工具
│ │ └── calculator.py # 计算器工具
│ ├── memory/ # 记忆模块
│ │ ├── __init__.py
│ │ ├── conversation.py # 对话记忆
│ │ ├── entity.py # 实体记忆
│ │ └── vector.py # 向量记忆
│ └── api/ # API路由
│ ├── __init__.py
│ ├── agent.py # Agent API
│ └── workflow.py # 工作流API
├── frontend/ # 前端目录
│ ├── app.py # Streamlit应用
│ ├── pages/ # 页面组件
│ │ ├── home.py
│ │ ├── agent_chat.py
│ │ └── workflow_view.py
│ └── components/ # UI组件
│ ├── agent_card.py
│ └── workflow_diagram.py
├── data/ # 数据目录
│ ├── memory/ # 记忆存储
│ ├── logs/ # 执行日志
│ └── workflows/ # 工作流定义
├── tests/ # 测试目录
│ ├── test_agent.py
│ ├── test_tools.py
│ └── test_workflow.py
├── utils/ # 工具函数
│ ├── __init__.py
│ ├── logger.py # 日志工具
│ └── visualizer.py # 可视化工具
├── requirements.txt # Python依赖
├── Dockerfile # Docker配置
├── docker-compose.yml # Docker Compose配置
└── README.md # 项目说明
🎯 核心功能¶
1. Agent核心¶
- 任务理解: 理解用户的复杂任务目标
- 任务规划: 将任务分解为可执行的步骤
- 工具选择: 根据任务需求选择合适的工具
- 执行监控: 监控任务执行过程
- 结果验证: 验证任务完成情况
2. 工具集成¶
- 搜索工具: 使用搜索引擎获取信息
- Python工具: 执行Python代码
- 文件工具: 读写文件
- API工具: 调用外部API
- 计算器工具: 执行数学计算
3. 记忆系统¶
- 对话记忆: 记录对话历史
- 实体记忆: 记录重要实体信息
- 向量记忆: 基于向量存储长期记忆
4. 多Agent协作¶
- 角色定义: 定义不同Agent的职责
- 任务分配: 自动分配任务给合适的Agent
- 协作机制: Agent之间的信息共享
- 冲突解决: 处理Agent之间的冲突
5. 可视化¶
- 执行过程可视化: 展示Agent的执行过程
- 工作流图: 可视化任务分解和执行流程
- 工具调用记录: 展示工具调用的详细记录
💻 代码实现¶
1. 配置文件 (app/config.py)¶
Python
"""
Agent工作流系统配置文件
"""
import os
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
"""应用配置"""
# API配置
API_HOST: str = "0.0.0.0"
API_PORT: int = 8000
API_PREFIX: str = "/api/v1"
# LLM配置
OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY", "")
OPENAI_MODEL: str = "gpt-4o"
OPENAI_TEMPERATURE: float = 0.0
OPENAI_MAX_TOKENS: int = 2000
# 搜索工具配置
SERPER_API_KEY: str = os.getenv("SERPER_API_KEY", "")
SEARCH_ENGINE: str = "google"
MAX_SEARCH_RESULTS: int = 5
# Agent配置
MAX_ITERATIONS: int = 10
EARLY_STOPPING: bool = True
VERBOSE: bool = True
# 记忆配置
MEMORY_DIR: str = "./data/memory"
MAX_MEMORY_SIZE: int = 1000
# 工作流配置
WORKFLOW_DIR: str = "./data/workflows"
LOG_DIR: str = "./data/logs"
# Python工具配置
PYTHON_TIMEOUT: int = 30
PYTHON_MAX_OUTPUT: int = 10000
class Config:
env_file = ".env"
case_sensitive = True
# 全局配置实例
settings = Settings()
2. 基础Agent类 (app/agent/base_agent.py)¶
Python
"""
基础Agent类
"""
from typing import Any
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from app.config import settings
from app.tools import get_all_tools
class BaseAgent:
"""基础Agent类"""
def __init__(
self,
name: str = "助手",
role: str = "通用助手",
tools: list | None = None,
llm: ChatOpenAI | None = None,
memory: ConversationBufferMemory | None = None
):
"""
初始化Agent
Args:
name: Agent名称
role: Agent角色
tools: 工具列表
llm: 语言模型
memory: 记忆
"""
self.name = name
self.role = role
# 初始化LLM
if llm is None:
self.llm = ChatOpenAI(
model=settings.OPENAI_MODEL,
temperature=settings.OPENAI_TEMPERATURE,
max_tokens=settings.OPENAI_MAX_TOKENS,
openai_api_key=settings.OPENAI_API_KEY
)
else:
self.llm = llm
# 初始化工具
if tools is None:
self.tools = get_all_tools()
else:
self.tools = tools
# 初始化记忆
if memory is None:
self.memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True
)
else:
self.memory = memory
# 创建Agent
self._create_agent()
def _create_agent(self):
"""创建Agent(使用 create_tool_calling_agent,替代已弃用的 initialize_agent)"""
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
# 定义系统提示
system_prompt = f"""你是一个{self.role},名称为{self.name}。
你的职责是帮助用户完成各种任务。你可以使用以下工具:
{self._get_tools_description()}
使用工具时,请遵循以下规则:
1. 仔细分析用户的需求
2. 选择最合适的工具
3. 正确调用工具
4. 分析工具返回的结果
5. 如果需要,可以多次调用工具
6. 最终给出清晰的答案
请始终使用中文回答。"""
prompt = ChatPromptTemplate.from_messages([
("system", system_prompt),
MessagesPlaceholder("chat_history", optional=True),
("human", "{input}"),
MessagesPlaceholder("agent_scratchpad"),
])
agent = create_tool_calling_agent(self.llm, self.tools, prompt)
self.agent_executor = AgentExecutor(
agent=agent,
tools=self.tools,
memory=self.memory,
verbose=settings.VERBOSE,
max_iterations=settings.MAX_ITERATIONS,
early_stopping_method="generate" if settings.EARLY_STOPPING else None,
)
def _get_tools_description(self) -> str:
"""获取工具描述"""
descriptions = []
for tool in self.tools:
descriptions.append(f"- {tool.name}: {tool.description}")
return "\n".join(descriptions)
def run(self, input_text: str) -> dict[str, Any]:
"""
运行Agent
Args:
input_text: 用户输入
Returns:
执行结果
"""
try: # try/except捕获异常,防止程序崩溃
result = self.agent_executor.invoke({"input": input_text})
return {
"success": True,
"result": result["output"],
"agent": self.name,
"role": self.role
}
except Exception as e:
return {
"success": False,
"error": str(e),
"agent": self.name,
"role": self.role
}
async def arun(self, input_text: str) -> dict[str, Any]: # async def定义协程函数
"""
异步运行Agent
Args:
input_text: 用户输入
Returns:
执行结果
"""
try:
result = await self.agent_executor.ainvoke({"input": input_text}) # await等待异步操作完成
return {
"success": True,
"result": result["output"],
"agent": self.name,
"role": self.role
}
except Exception as e:
return {
"success": False,
"error": str(e),
"agent": self.name,
"role": self.role
}
def clear_memory(self):
"""清除记忆"""
self.memory.clear()
def get_memory(self) -> list[dict[str, Any]]:
"""获取记忆"""
return self.memory.load_memory_variables({})
3. 任务规划器 (app/agent/planner.py)¶
Python
"""
任务规划器
"""
from typing import Any
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from app.config import settings
class TaskPlanner:
"""任务规划器"""
def __init__(self, llm: ChatOpenAI = None):
"""
初始化任务规划器
Args:
llm: 语言模型
"""
if llm is None:
self.llm = ChatOpenAI(
model=settings.OPENAI_MODEL,
temperature=0.0,
openai_api_key=settings.OPENAI_API_KEY
)
else:
self.llm = llm
self.prompt_template = PromptTemplate(
input_variables=["task", "available_tools"],
template="""你是一个任务规划专家。请将以下任务分解为可执行的步骤。
任务描述:
{task}
可用工具:
{available_tools}
请按照以下格式输出任务分解:
步骤1: [步骤描述]
- 使用工具: [工具名称]
- 目标: [步骤目标]
- 预期输出: [预期输出]
步骤2: [步骤描述]
- 使用工具: [工具名称]
- 目标: [步骤目标]
- 预期输出: [预期输出]
...
请确保:
1. 每个步骤都是可执行的
2. 步骤之间有合理的依赖关系
3. 选择的工具是合适的
4. 预期输出是明确的
"""
)
def plan(
self,
task: str,
available_tools: list[str]
) -> list[dict[str, Any]]:
"""
规划任务
Args:
task: 任务描述
available_tools: 可用工具列表
Returns:
任务步骤列表
"""
# 格式化工具列表
tools_description = "\n".join([f"- {tool}" for tool in available_tools])
# 生成规划
prompt = self.prompt_template.format(
task=task,
available_tools=tools_description
)
response = self.llm.invoke(prompt)
# 解析响应
steps = self._parse_plan(response.content)
return steps
def _parse_plan(self, plan_text: str) -> list[dict[str, Any]]:
"""
解析规划文本
Args:
plan_text: 规划文本
Returns:
任务步骤列表
"""
steps = []
current_step = None
for line in plan_text.split("\n"):
line = line.strip()
# 检测步骤
if line.startswith("步骤"):
if current_step:
steps.append(current_step)
current_step = {"description": line, "tool": "", "goal": "", "output": ""}
# 检测工具
elif line.startswith("- 使用工具:"):
if current_step:
current_step["tool"] = line.split(":")[1].strip()
# 检测目标
elif line.startswith("- 目标:"):
if current_step:
current_step["goal"] = line.split(":")[1].strip()
# 检测预期输出
elif line.startswith("- 预期输出:"):
if current_step:
current_step["output"] = line.split(":")[1].strip()
# 添加最后一个步骤
if current_step:
steps.append(current_step)
return steps
def optimize_plan(
self,
steps: list[dict[str, Any]],
feedback: str
) -> list[dict[str, Any]]:
"""
优化规划
Args:
steps: 原始步骤
feedback: 反馈信息
Returns:
优化后的步骤
"""
# 将步骤转换为文本
steps_text = "\n".join([
f"步骤{i+1}: {step['description']}\n"
f" 工具: {step['tool']}\n"
f" 目标: {step['goal']}"
for i, step in enumerate(steps) # enumerate同时获取索引和元素
])
prompt = f"""基于以下反馈,优化任务规划:
原始规划:
{steps_text}
反馈:
{feedback}
请提供优化后的规划(使用相同的格式):"""
response = self.llm.invoke(prompt)
optimized_steps = self._parse_plan(response.content)
return optimized_steps
4. 工具定义 (app/tools/init.py)¶
Python
"""
工具模块初始化
"""
from app.tools.search import SearchTool
from app.tools.python import PythonTool
from app.tools.file import FileTool
from app.tools.calculator import CalculatorTool
def get_all_tools():
"""获取所有工具"""
return [
SearchTool(),
PythonTool(),
FileTool(),
CalculatorTool()
]
5. 搜索工具 (app/tools/search.py)¶
Python
"""
搜索工具
"""
import requests
from typing import Any
from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field
from app.config import settings
class SearchInput(BaseModel): # Pydantic BaseModel:自动数据验证和序列化
"""搜索输入"""
query: str = Field(description="搜索查询")
num_results: int = Field(default=5, description="返回结果数量")
class SearchTool(BaseTool):
"""搜索工具"""
name = "search"
description = "使用搜索引擎搜索信息"
args_schema = SearchInput
def __init__(self):
"""初始化搜索工具"""
super().__init__() # super()调用父类方法
self.api_key = settings.SERPER_API_KEY
self.base_url = "https://google.serper.dev/search" # ⚠️ 需要API密钥
def _run(
self,
query: str,
num_results: int = 5
) -> str:
"""
执行搜索
Args:
query: 搜索查询
num_results: 返回结果数量
Returns:
搜索结果
"""
try:
# 调用搜索API
headers = {
"X-API-KEY": self.api_key,
"Content-Type": "application/json"
}
payload = {
"q": query,
"num": min(num_results, settings.MAX_SEARCH_RESULTS)
}
response = requests.post(
self.base_url,
headers=headers,
json=payload
)
response.raise_for_status()
data = response.json()
# 解析结果
results = []
if "organic" in data:
for item in data["organic"][:num_results]:
results.append({
"title": item.get("title", ""),
"link": item.get("link", ""),
"snippet": item.get("snippet", "")
})
# 格式化输出
output = f"搜索查询: {query}\n\n"
for i, result in enumerate(results, 1):
output += f"{i}. {result['title']}\n"
output += f" 链接: {result['link']}\n"
output += f" 摘要: {result['snippet']}\n\n"
return output
except Exception as e:
return f"搜索失败: {str(e)}"
async def _arun(
self,
query: str,
num_results: int = 5
) -> str:
"""异步执行搜索"""
return self._run(query, num_results)
6. Python工具 (app/tools/python.py)¶
Python
"""
Python执行工具
"""
import sys
import io
import contextlib
from typing import Any
from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field
from app.config import settings
class PythonInput(BaseModel):
"""Python输入"""
code: str = Field(description="要执行的Python代码")
class PythonTool(BaseTool):
"""Python执行工具"""
name = "python"
description = "执行Python代码并返回输出"
args_schema = PythonInput
def __init__(self):
"""初始化Python工具"""
super().__init__()
self.timeout = settings.PYTHON_TIMEOUT
self.max_output = settings.PYTHON_MAX_OUTPUT
def _run(self, code: str) -> str:
"""
执行Python代码
Args:
code: Python代码
Returns:
执行输出
"""
try:
# 重定向输出
output_buffer = io.StringIO()
error_buffer = io.StringIO()
with contextlib.redirect_stdout(output_buffer), \
contextlib.redirect_stderr(error_buffer):
# 执行代码
exec_globals = {
"__name__": "__main__",
"__builtins__": __builtins__
}
exec(code, exec_globals)
# 获取输出
stdout = output_buffer.getvalue()
stderr = error_buffer.getvalue()
# 组合输出
result = ""
if stdout:
result += f"输出:\n{stdout}\n"
if stderr:
result += f"错误:\n{stderr}\n"
if not result:
result = "代码执行成功,无输出"
# 限制输出长度
if len(result) > self.max_output:
result = result[:self.max_output] + "\n... (输出被截断)"
return result
except Exception as e:
return f"执行失败: {str(e)}"
async def _arun(self, code: str) -> str:
"""异步执行Python代码"""
return self._run(code)
7. 文件工具 (app/tools/file.py)¶
Python
"""
文件操作工具
"""
from pathlib import Path
from typing import Any
from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field
class FileReadInput(BaseModel):
"""文件读取输入"""
path: str = Field(description="文件路径")
class FileWriteInput(BaseModel):
"""文件写入输入"""
path: str = Field(description="文件路径")
content: str = Field(description="文件内容")
class FileListInput(BaseModel):
"""文件列表输入"""
directory: str = Field(default=".", description="目录路径")
class FileReadTool(BaseTool):
"""文件读取工具"""
name = "read_file"
description = "读取文件内容"
args_schema = FileReadInput
def _run(self, path: str) -> str:
"""
读取文件
Args:
path: 文件路径
Returns:
文件内容
"""
try:
file_path = Path(path)
if not file_path.exists():
return f"文件不存在: {path}"
content = file_path.read_text(encoding="utf-8")
# 限制返回长度
max_length = 5000
if len(content) > max_length:
content = content[:max_length] + "\n... (内容被截断)"
return content
except Exception as e:
return f"读取失败: {str(e)}"
async def _arun(self, path: str) -> str:
"""异步读取文件"""
return self._run(path)
class FileWriteTool(BaseTool):
"""文件写入工具"""
name = "write_file"
description = "写入文件内容"
args_schema = FileWriteInput
def _run(self, path: str, content: str) -> str:
"""
写入文件
Args:
path: 文件路径
content: 文件内容
Returns:
操作结果
"""
try:
file_path = Path(path)
# 创建父目录
file_path.parent.mkdir(parents=True, exist_ok=True)
# 写入文件
file_path.write_text(content, encoding="utf-8")
return f"文件写入成功: {path}"
except Exception as e:
return f"写入失败: {str(e)}"
async def _arun(self, path: str, content: str) -> str:
"""异步写入文件"""
return self._run(path, content)
class FileListTool(BaseTool):
"""文件列表工具"""
name = "list_files"
description = "列出目录中的文件"
args_schema = FileListInput
def _run(self, directory: str = ".") -> str:
"""
列出文件
Args:
directory: 目录路径
Returns:
文件列表
"""
try:
dir_path = Path(directory)
if not dir_path.exists():
return f"目录不存在: {directory}"
items = []
for item in dir_path.iterdir():
item_type = "目录" if item.is_dir() else "文件"
items.append(f"{item_type}: {item.name}")
if not items:
return "目录为空"
return "\n".join(items)
except Exception as e:
return f"列出失败: {str(e)}"
async def _arun(self, directory: str = ".") -> str:
"""异步列出文件"""
return self._run(directory)
8. 计算器工具 (app/tools/calculator.py)¶
Python
"""
计算器工具
"""
import ast
import operator
from typing import Any
from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field
class CalculatorInput(BaseModel):
"""计算器输入"""
expression: str = Field(description="数学表达式")
class CalculatorTool(BaseTool):
"""计算器工具"""
name = "calculator"
description = "执行数学计算"
args_schema = CalculatorInput
# 支持的运算符
operators = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
ast.Pow: operator.pow,
ast.Mod: operator.mod,
ast.USub: operator.neg,
}
def _run(self, expression: str) -> str:
"""
执行计算
Args:
expression: 数学表达式
Returns:
计算结果
"""
try:
# 解析表达式
node = ast.parse(expression, mode='eval')
# 计算结果
result = self._eval(node.body)
return f"计算结果: {result}"
except Exception as e:
return f"计算失败: {str(e)}"
def _eval(self, node):
"""递归计算AST节点"""
if isinstance(node, ast.Num): # isinstance检查类型
return node.n
elif isinstance(node, ast.Constant):
return node.value
elif isinstance(node, ast.BinOp):
left = self._eval(node.left)
right = self._eval(node.right)
op_type = type(node.op)
if op_type in self.operators:
return self.operators[op_type](left, right)
else:
raise ValueError(f"不支持的运算符: {op_type}")
elif isinstance(node, ast.UnaryOp):
operand = self._eval(node.operand)
op_type = type(node.op)
if op_type in self.operators:
return self.operators[op_type](operand)
else:
raise ValueError(f"不支持的一元运算符: {op_type}")
else:
raise ValueError(f"不支持的节点类型: {type(node)}")
async def _arun(self, expression: str) -> str:
"""异步执行计算"""
return self._run(expression)
9. FastAPI主应用 (app/main.py)¶
Python
"""
FastAPI主应用
"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from app.config import settings
from app.api import agent, workflow
# 创建FastAPI应用
app = FastAPI(
title="AI Agent工作流系统",
description="基于LangChain的智能Agent工作流系统",
version="1.0.0"
)
# 配置CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 注册路由
app.include_router(
agent.router,
prefix=settings.API_PREFIX,
tags=["agent"]
)
app.include_router(
workflow.router,
prefix=settings.API_PREFIX,
tags=["workflow"]
)
@app.get("/")
async def root():
"""根路径"""
return {
"message": "AI Agent工作流系统",
"version": "1.0.0",
"docs": "/docs"
}
@app.get("/health")
async def health_check():
"""健康检查"""
return {"status": "healthy"}
if __name__ == "__main__":
uvicorn.run(
"app.main:app",
host=settings.API_HOST,
port=settings.API_PORT,
reload=True
)
10. Agent API (app/api/agent.py)¶
Python
"""
Agent API
"""
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from app.agent.base_agent import BaseAgent
from app.tools import get_all_tools
router = APIRouter()
# 存储Agent实例
agents = {}
class AgentCreateRequest(BaseModel):
"""创建Agent请求"""
name: str
role: str
tools: list[str] | None = None
class AgentChatRequest(BaseModel):
"""Agent聊天请求"""
message: str
class AgentChatResponse(BaseModel):
"""Agent聊天响应"""
success: bool
result: str | None = None
error: str | None = None
agent: str
role: str
@router.post("/agent/create", response_model=dict)
async def create_agent(request: AgentCreateRequest):
"""
创建Agent
Args:
request: 创建请求
Returns:
Agent信息
"""
try:
# 获取工具
all_tools = get_all_tools()
if request.tools:
selected_tools = [
tool for tool in all_tools
if tool.name in request.tools
]
else:
selected_tools = all_tools
# 创建Agent
agent = BaseAgent(
name=request.name,
role=request.role,
tools=selected_tools
)
# 存储Agent
agent_id = request.name
agents[agent_id] = agent
return {
"message": "Agent创建成功",
"agent_id": agent_id,
"name": agent.name,
"role": agent.role,
"tools": [tool.name for tool in agent.tools]
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/agent/{agent_id}/chat", response_model=AgentChatResponse)
async def agent_chat(agent_id: str, request: AgentChatRequest):
"""
Agent聊天
Args:
agent_id: Agent ID
request: 聊天请求
Returns:
聊天响应
"""
try:
# 获取Agent
if agent_id not in agents:
raise HTTPException(status_code=404, detail="Agent不存在")
agent = agents[agent_id]
# 执行聊天
result = agent.run(request.message)
return result
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/agent/{agent_id}")
async def delete_agent(agent_id: str):
"""
删除Agent
Args:
agent_id: Agent ID
Returns:
删除结果
"""
if agent_id not in agents:
raise HTTPException(status_code=404, detail="Agent不存在")
del agents[agent_id]
return {"message": "Agent删除成功"}
@router.get("/agent/list")
async def list_agents():
"""
列出所有Agent
Returns:
Agent列表
"""
agent_list = []
for agent_id, agent in agents.items():
agent_list.append({
"agent_id": agent_id,
"name": agent.name,
"role": agent.role,
"tools": [tool.name for tool in agent.tools]
})
return {"agents": agent_list}
11. Streamlit前端 (frontend/app.py)¶
Python
"""
Streamlit前端应用
"""
import streamlit as st
import requests
# 配置页面
st.set_page_config(
page_title="AI Agent工作流系统",
page_icon="🤖",
layout="wide"
)
# API配置
API_BASE_URL = "http://localhost:8000/api/v1"
def main():
"""主函数"""
st.title("🤖 AI Agent工作流系统")
# 侧边栏 - Agent管理
with st.sidebar:
st.header("Agent管理")
# 创建Agent
st.subheader("创建新Agent")
agent_name = st.text_input("Agent名称", value="助手")
agent_role = st.text_input("Agent角色", value="通用助手")
# 工具选择
all_tools = ["search", "python", "read_file", "write_file", "list_files", "calculator"]
selected_tools = st.multiselect("选择工具", all_tools, default=all_tools)
if st.button("创建Agent"):
with st.spinner("正在创建Agent..."):
try:
response = requests.post(
f"{API_BASE_URL}/agent/create",
json={
"name": agent_name,
"role": agent_role,
"tools": selected_tools
}
)
if response.status_code == 200:
st.success("Agent创建成功!")
st.json(response.json())
else:
st.error(f"创建失败: {response.text}")
except Exception as e:
st.error(f"创建失败: {str(e)}")
st.divider()
# Agent列表
st.subheader("Agent列表")
if st.button("刷新列表"):
try:
response = requests.get(f"{API_BASE_URL}/agent/list")
if response.status_code == 200:
data = response.json()
for agent in data["agents"]:
with st.expander(f"{agent['name']} ({agent['role']})"):
st.write(f"**Agent ID**: {agent['agent_id']}")
st.write(f"**工具**: {', '.join(agent['tools'])}")
except Exception as e:
st.error(f"获取列表失败: {str(e)}")
# 主区域 - Agent聊天
st.header("Agent对话")
# Agent选择
col1, col2 = st.columns([1, 3])
with col1:
agent_id = st.text_input("Agent ID", value="助手")
# 初始化聊天历史
if "messages" not in st.session_state:
st.session_state.messages = []
# 显示聊天历史
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.markdown(message["content"])
# 用户输入
if prompt := st.chat_input("请输入您的任务..."): # 海象运算符:=:赋值并判断——获取用户输入的同时检查是否非空
# 显示用户消息
st.session_state.messages.append({"role": "user", "content": prompt})
with st.chat_message("user"):
st.markdown(prompt)
# 获取Agent回复
with st.chat_message("assistant"):
with st.spinner("Agent正在思考..."):
try:
response = requests.post(
f"{API_BASE_URL}/agent/{agent_id}/chat",
json={"message": prompt}
)
if response.status_code == 200:
result = response.json()
if result["success"]:
st.markdown(result["result"])
st.session_state.messages.append({
"role": "assistant",
"content": result["result"]
})
else:
st.error(f"执行失败: {result['error']}")
else:
st.error(f"请求失败: {response.text}")
except Exception as e:
st.error(f"请求失败: {str(e)}")
if __name__ == "__main__":
main()
12. 依赖文件 (requirements.txt)¶
Text Only
fastapi==0.115.0
uvicorn[standard]==0.32.0
python-multipart==0.0.12
pydantic==2.10.0
pydantic-settings==2.6.0
langchain==0.3.7
openai==1.55.0
requests==2.32.0
streamlit==1.40.0
python-dotenv==1.0.1
google-search-results==2.4.2
13. 环境变量文件 (.env.example)¶
Text Only
# OpenAI API配置
OPENAI_API_KEY=your_openai_api_key_here
# Serper搜索API配置
SERPER_API_KEY=your_serper_api_key_here
🚀 部署说明¶
1. 本地部署¶
步骤1: 克隆项目¶
步骤2: 创建虚拟环境¶
Bash
python -m venv venv
# 激活虚拟环境
# Windows
venv\Scripts\activate
# Linux/Mac
source venv/bin/activate
步骤3: 安装依赖¶
步骤4: 配置环境变量¶
步骤5: 启动后端服务¶
步骤6: 启动前端服务¶
2. Docker部署¶
Bash
# 构建镜像
docker build -t agent-workflow .
# 运行容器
docker run -d \
--name agent-workflow \
-p 8000:8000 \
-p 8501:8501 \
-e OPENAI_API_KEY=your_api_key_here \
-e SERPER_API_KEY=your_serper_key_here \
agent-workflow
🔧 扩展方向¶
1. 功能扩展¶
- 多Agent协作: 实现Agent之间的协作机制
- 任务队列: 添加任务队列管理
- 工作流模板: 预定义常用工作流
- 自定义工具: 支持用户自定义工具
- 插件系统: 支持第三方插件
2. 性能优化¶
- 异步执行: 全面支持异步操作
- 缓存机制: 缓存常用查询结果
- 并行执行: 并行执行独立任务
- 资源限制: 限制Agent资源使用
3. 用户体验¶
- 可视化: 增强执行过程可视化
- 调试模式: 添加调试和日志功能
- 导出功能: 支持导出工作流
- 分享功能: 支持分享Agent配置
4. 企业功能¶
- 权限管理: 添加用户权限控制
- 审计日志: 完整的操作审计
- 监控告警: 系统监控和告警
- API限流: API访问限流
📚 学习收获¶
完成本项目后,你将掌握:
- Agent系统原理: 理解智能体的核心概念
- LangChain Agents: 熟练使用LangChain构建Agent
- 工具开发: 开发自定义工具
- 任务规划: 实现任务分解和规划
- 记忆系统: 管理Agent的记忆
- 多Agent协作: 实现Agent之间的协作
- 系统设计: 设计复杂的AI系统
🎉 开始学习¶
现在你已经了解了整个AI Agent工作流系统的实现,开始动手构建你自己的智能Agent吧!
推荐学习顺序: 1. 先运行简单的Agent,体验基本功能 2. 然后尝试复杂任务,观察Agent的规划过程 3. 开发自定义工具,扩展Agent能力 4. 实现多Agent协作,处理复杂场景
祝你学习顺利! 💪