MCP与工具生态¶
⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。
深入掌握Model Context Protocol (MCP)协议规范、Server/Client开发,构建可复用的Agent工具生态。
📌 定位说明:本章聚焦MCP协议的编码实战(Server/Client开发)。MCP协议的研究前沿与生态全景请参考 LLM学习/04-前沿探索/09-MCP协议与工具生态。
📖 章节导读¶
Model Context Protocol (MCP) 是由Anthropic推动的开放标准协议,旨在为AI模型提供统一的工具和数据访问接口。MCP正在形成跨框架工具生态的通用基础——就像HTTP之于Web,MCP之于AI Agent。本章将深入理解MCP协议、掌握Server/Client开发,并与传统Function Calling进行对比。
🎯 学习目标¶
- 理解MCP协议的设计理念和核心概念
- 掌握MCP Server/Client架构
- 能够用Python SDK开发MCP Server
- 理解Function Calling与MCP的区别和联系
- 学会集成常用MCP工具
- 完成实战项目:开发自定义MCP Server
📖 前置知识¶
- Agent基础与Tool Calling(第一章)
- Python异步编程(asyncio)
- 基本的客户端/服务器架构知识
- JSON-RPC概念
1. MCP概述¶
1.1 什么是MCP?¶
Model Context Protocol (MCP) 是一个开放的通信协议,标准化了AI应用与外部数据源/工具之间的交互方式。
类比理解:
USB协议 之于 外设连接 ≈ MCP 之于 AI工具连接
没有USB: 每种外设需要专用接口 → 没有MCP: 每个工具需要专门适配
有了USB: 统一接口,即插即用 → 有了MCP: 统一协议,工具通用
1.2 为什么需要MCP?¶
没有MCP之前的痛点:
# ❌ 传统方式:每个模型/框架需要单独适配工具
# OpenAI的工具定义
openai_tools = [{
"type": "function",
"function": {"name": "search", "parameters": {...}}
}]
# LangChain的工具定义
from langchain_core.tools import tool
@tool
def search(query: str) -> str: ...
# CrewAI的工具定义
from crewai_tools import tool
@tool("search")
def search(query: str) -> str: ...
# 问题: 同一个工具要写3次!不同框架之间不通用
# ✅ MCP方式:一次开发,到处使用
# 开发一个MCP Server
# → OpenAI Agent可以用
# → Claude可以用
# → LangGraph可以用
# → 任何支持MCP的客户端都可以用
1.3 MCP核心概念¶
┌─────────────────────────────────────────────────┐
│ MCP 架构 │
│ │
│ ┌──────────┐ MCP协议 ┌──────────────┐ │
│ │ MCP Host │ ◄──────────────► │ MCP Server │ │
│ │ (AI应用) │ JSON-RPC 2.0 │ (工具提供方) │ │
│ │ │ │ │ │
│ │ ┌──────┐ │ │ ┌──────────┐ │ │
│ │ │Client│ │ │ │Resources │ │ │
│ │ └──────┘ │ │ │Tools │ │ │
│ │ │ │ │Prompts │ │ │
│ └──────────┘ └──────────────┘ │
└─────────────────────────────────────────────────┘
三大核心能力:
| 能力 | 说明 | 类比 |
|---|---|---|
| Resources | 提供数据/上下文(只读) | 文件系统中的文件 |
| Tools | 可执行的操作(有副作用) | 命令行工具 |
| Prompts | 预定义的提示模板 | 函数模板 |
1.4 MCP协议通信¶
MCP基于JSON-RPC 2.0协议,支持两种主要传输方式:
"""
MCP通信协议说明
"""
# === 1. stdio传输(本地进程) ===
# 适用于: 本地工具,VS Code/Claude Desktop集成
# MCP Client和Server通过stdin/stdout通信
# 启动方式: 作为子进程运行
# === 2. Streamable HTTP传输(HTTP远程,推荐) ===
# 适用于: 远程服务,云端部署
# MCP Client通过HTTP与Server通信,支持流式响应
# 支持网络部署和多客户端连接
# 注意: 一些实现仍使用SSE,但在最新规范/实现中更推荐Streamable HTTP
# === JSON-RPC消息示例 ===
# 客户端请求: 调用工具
request = {
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "search_web",
"arguments": {"query": "MCP protocol"}
}
}
# 服务器响应
response = {
"jsonrpc": "2.0",
"id": 1,
"result": {
"content": [
{
"type": "text",
"text": "搜索结果: MCP是Anthropic提出的开放协议..."
}
]
}
}
# 通知(无需响应)
notification = {
"jsonrpc": "2.0",
"method": "notifications/resources/updated",
"params": {"uri": "file:///data/report.md"}
}
2. MCP Server开发¶
2.1 安装MCP Python SDK¶
2.2 第一个MCP Server¶
"""
第一个MCP Server:简单的计算器工具
文件: calculator_server.py
"""
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
import json
# 创建MCP Server实例
server = Server("calculator-server")
# === 注册工具列表 ===
@server.list_tools()
async def list_tools() -> list[Tool]:
"""返回该Server提供的所有工具"""
return [
Tool(
name="add",
description="计算两个数的和",
inputSchema={
"type": "object",
"properties": {
"a": {"type": "number", "description": "第一个数"},
"b": {"type": "number", "description": "第二个数"},
},
"required": ["a", "b"],
},
),
Tool(
name="multiply",
description="计算两个数的乘积",
inputSchema={
"type": "object",
"properties": {
"a": {"type": "number", "description": "第一个数"},
"b": {"type": "number", "description": "第二个数"},
},
"required": ["a", "b"],
},
),
Tool(
name="power",
description="计算幂运算 a的b次方",
inputSchema={
"type": "object",
"properties": {
"base": {"type": "number", "description": "底数"},
"exponent": {"type": "number", "description": "指数"},
},
"required": ["base", "exponent"],
},
),
]
# === 处理工具调用 ===
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
"""处理工具调用请求"""
if name == "add":
result = arguments["a"] + arguments["b"]
return [TextContent(type="text", text=f"计算结果: {arguments['a']} + {arguments['b']} = {result}")]
elif name == "multiply":
result = arguments["a"] * arguments["b"]
return [TextContent(type="text", text=f"计算结果: {arguments['a']} × {arguments['b']} = {result}")]
elif name == "power":
result = arguments["base"] ** arguments["exponent"]
return [TextContent(type="text", text=f"计算结果: {arguments['base']}^{arguments['exponent']} = {result}")]
else:
return [TextContent(type="text", text=f"未知工具: {name}")]
# === 启动Server ===
async def main():
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options(),
)
if __name__ == "__main__":
import asyncio
asyncio.run(main())
2.3 提供Resources(数据资源)¶
"""
MCP Server - 提供Resources
Resources是只读的数据源,类似文件系统
"""
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Resource, TextContent, Tool
import json
from pathlib import Path
server = Server("data-server")
# 模拟数据
DATASET = {
"users": [
{"id": 1, "name": "张三", "role": "engineer"},
{"id": 2, "name": "李四", "role": "designer"},
{"id": 3, "name": "王五", "role": "manager"},
],
"projects": [
{"id": "P001", "name": "Agent平台", "status": "active"},
{"id": "P002", "name": "RAG系统", "status": "completed"},
],
}
# === 注册资源列表 ===
@server.list_resources()
async def list_resources() -> list[Resource]:
"""返回可用的数据资源"""
return [
Resource(
uri="data://users",
name="用户列表",
description="系统中所有用户的信息",
mimeType="application/json",
),
Resource(
uri="data://projects",
name="项目列表",
description="所有项目的状态信息",
mimeType="application/json",
),
]
# === 读取资源内容 ===
@server.read_resource()
async def read_resource(uri: str) -> str:
"""根据URI读取资源内容"""
if uri == "data://users":
return json.dumps(DATASET["users"], ensure_ascii=False, indent=2)
elif uri == "data://projects":
return json.dumps(DATASET["projects"], ensure_ascii=False, indent=2)
else:
raise ValueError(f"未知资源: {uri}")
# === 同时提供工具 ===
@server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="query_user",
description="根据ID查询用户信息",
inputSchema={
"type": "object",
"properties": {
"user_id": {"type": "integer", "description": "用户ID"},
},
"required": ["user_id"],
},
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
if name == "query_user":
user_id = arguments["user_id"]
for user in DATASET["users"]:
if user["id"] == user_id:
return [TextContent(type="text", text=json.dumps(user, ensure_ascii=False))]
return [TextContent(type="text", text=f"未找到ID为{user_id}的用户")]
return [TextContent(type="text", text=f"未知工具: {name}")]
async def main():
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream, server.create_initialization_options())
if __name__ == "__main__":
import asyncio
asyncio.run(main())
2.4 提供Prompts(提示模板)¶
"""
MCP Server - 提供Prompts
Prompts是预定义的提示模板,帮助用户更高效地使用工具
"""
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Prompt, PromptMessage, PromptArgument, TextContent
server = Server("prompt-server")
@server.list_prompts()
async def list_prompts() -> list[Prompt]:
"""返回可用的提示模板"""
return [
Prompt(
name="code_review",
description="代码审查提示模板",
arguments=[
PromptArgument(
name="code",
description="要审查的代码",
required=True,
),
PromptArgument(
name="language",
description="编程语言",
required=False,
),
],
),
Prompt(
name="summarize_paper",
description="论文摘要提示模板",
arguments=[
PromptArgument(
name="paper_title",
description="论文标题",
required=True,
),
PromptArgument(
name="focus_area",
description="关注的方面",
required=False,
),
],
),
]
@server.get_prompt()
async def get_prompt(name: str, arguments: dict | None = None) -> list[PromptMessage]:
"""获取填充后的提示模板"""
arguments = arguments or {}
if name == "code_review":
code = arguments.get("code", "")
language = arguments.get("language", "Python")
return [
PromptMessage(
role="user",
content=TextContent(
type="text",
text=f"""请对以下{language}代码进行全面审查:
请从以下方面进行审查:
1. **正确性**: 代码逻辑是否正确?
2. **性能**: 是否有性能优化的空间?
3. **安全性**: 是否存在安全漏洞?
4. **可读性**: 代码是否清晰易懂?
5. **最佳实践**: 是否符合{language}的编码规范?
请给出具体的改进建议和示例代码。""",
),
),
]
elif name == "summarize_paper":
title = arguments.get("paper_title", "")
focus = arguments.get("focus_area", "整体内容")
return [
PromptMessage(
role="user",
content=TextContent(
type="text",
text=f"""请为论文《{title}》生成一份结构化摘要。
重点关注: {focus}
请按以下格式输出:
## 论文信息
- 标题:
- 研究问题:
## 核心方法
(描述论文提出的主要方法)
## 关键发现
(列出主要实验结果和发现)
## 创新点
(总结论文的主要贡献)
## 局限性
(分析不足和未来工作方向)""",
),
),
]
raise ValueError(f"未知提示模板: {name}")
async def main():
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream, server.create_initialization_options())
if __name__ == "__main__":
import asyncio
asyncio.run(main())
3. MCP Client开发¶
3.1 连接MCP Server¶
"""
MCP Client:连接并调用MCP Server
"""
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
import asyncio
import json
async def main():
# 配置要连接的MCP Server
server_params = StdioServerParameters(
command="python",
args=["calculator_server.py"], # 第2.2节的计算器Server
)
# 建立连接
# 嵌套async with:外层建立传输层连接,内层初始化MCP会话,退出时自动清理资源
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
# 初始化连接
await session.initialize()
# === 1. 获取可用工具列表 ===
tools = await session.list_tools()
print("可用工具:")
for tool in tools.tools:
print(f" - {tool.name}: {tool.description}")
# === 2. 调用工具 ===
result = await session.call_tool("add", {"a": 15, "b": 27})
print(f"\n调用add(15, 27): {result.content[0].text}")
result = await session.call_tool("multiply", {"a": 6, "b": 7})
print(f"调用multiply(6, 7): {result.content[0].text}")
result = await session.call_tool("power", {"base": 2, "exponent": 10})
print(f"调用power(2, 10): {result.content[0].text}")
# === 3. 获取资源(如果Server提供) ===
try:
resources = await session.list_resources()
print("\n可用资源:")
for resource in resources.resources:
print(f" - {resource.uri}: {resource.name}")
except Exception:
print("\n该Server未提供资源")
asyncio.run(main())
3.2 将MCP工具集成到Agent¶
"""
将MCP Server的工具集成到OpenAI Agent中
MCP + OpenAI Tool Calling的桥接
"""
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import OpenAI
import asyncio
import json
class MCPAgentBridge:
"""MCP到OpenAI Agent的桥接器"""
def __init__(self):
self.openai = OpenAI()
self.sessions: dict[str, ClientSession] = {}
self.tool_to_session: dict[str, str] = {}
async def connect_server(self, name: str, command: str, args: list[str]):
"""连接一个MCP Server"""
server_params = StdioServerParameters(command=command, args=args)
# 注意:实际使用中需要管理连接的生命周期
# 注意:实际应用中需要保存上下文管理器引用,在 disconnect 中调用 __aexit__
self._stdio_cm = stdio_client(server_params)
read, write = await self._stdio_cm.__aenter__()
self._session_cm = ClientSession(read, write)
session = await self._session_cm.__aenter__()
await session.initialize()
self.sessions[name] = session
# 获取并注册工具
tools = await session.list_tools()
for tool in tools.tools:
self.tool_to_session[tool.name] = name
def get_openai_tools(self) -> list[dict]:
"""将所有MCP工具转换为OpenAI Tool Calling格式
⚠️ 同步版本未实现,MCP SDK的工具获取为异步操作,
请使用异步版本 get_all_tools()。
"""
raise NotImplementedError("请使用异步方法 get_all_tools()")
return []
async def get_all_tools(self) -> list[dict]:
"""异步获取所有工具定义"""
all_tools = []
for session_name, session in self.sessions.items():
tools = await session.list_tools()
for tool in tools.tools:
all_tools.append({
"type": "function",
"function": {
"name": tool.name,
"description": tool.description or "",
"parameters": tool.inputSchema,
},
})
return all_tools
async def call_tool(self, name: str, arguments: dict) -> str:
"""调用MCP工具"""
session_name = self.tool_to_session.get(name)
if not session_name:
return json.dumps({"error": f"工具 {name} 不存在"})
session = self.sessions[session_name]
result = await session.call_tool(name, arguments)
# 提取文本内容
# 带条件的列表推导式:遍历result.content,用hasattr过滤出有text属性的元素
texts = [content.text for content in result.content if hasattr(content, "text")]
return "\n".join(texts)
async def chat(self, user_message: str) -> str:
"""与Agent对话"""
tools = await self.get_all_tools()
messages = [
{"role": "system", "content": "你是一个智能助手,可以使用各种工具来帮助用户。"},
{"role": "user", "content": user_message},
]
max_iterations = 10 # 防止无限循环
for _ in range(max_iterations):
response = self.openai.chat.completions.create(
model="gpt-4o",
messages=messages,
tools=tools if tools else None,
)
message = response.choices[0].message
if not message.tool_calls:
return message.content
messages.append(message)
for tool_call in message.tool_calls:
result = await self.call_tool(
tool_call.function.name,
json.loads(tool_call.function.arguments),
)
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": result,
})
# 使用示例
async def main():
bridge = MCPAgentBridge()
# 连接多个MCP Server
await bridge.connect_server("calculator", "python", ["calculator_server.py"])
await bridge.connect_server("data", "python", ["data_server.py"])
# 对话
response = await bridge.chat("帮我计算 2的20次方,然后查询用户ID为1的信息")
print(f"回答: {response}")
# asyncio.run(main())
4. Function Calling vs MCP对比¶
4.1 详细对比¶
"""
Function Calling vs MCP 对比说明
"""
comparison = {
"Function Calling": {
"定义": "OpenAI提出的让模型调用函数的能力",
"范围": "单一模型提供商的功能",
"工具定义": "JSON Schema,嵌入在API请求中",
"执行方": "客户端(开发者的代码)",
"通信": "HTTP API请求/响应",
"状态": "无状态",
"优点": [
"简单直接,无需额外基础设施",
"与OpenAI API深度集成",
"低延迟(无额外通信开销)",
],
"缺点": [
"工具定义与应用代码耦合",
"不同框架间不通用",
"无标准的工具发现机制",
],
},
"MCP": {
"定义": "开放的AI工具通信标准协议",
"范围": "跨模型、跨框架的通用标准",
"工具定义": "MCP Server声明,支持动态发现",
"执行方": "MCP Server(独立进程/服务)",
"通信": "JSON-RPC 2.0 over stdio/Streamable HTTP",
"状态": "有状态(支持会话和资源订阅)",
"优点": [
"一次开发,到处使用",
"支持动态工具发现",
"提供Resources和Prompts等额外能力",
"独立部署和版本管理",
],
"缺点": [
"额外的复杂度(需要运行Server进程)",
"通信开销(JSON-RPC序列化)",
"生态还在早期发展阶段",
],
},
}
4.2 何时用Function Calling,何时用MCP?¶
| 场景 | 推荐方案 | 原因 |
|---|---|---|
| 快速原型开发 | Function Calling | 简单直接,无需额外基础设施 |
| 单一应用,工具固定 | Function Calling | 不需要跨应用复用 |
| 工具被多个应用/框架使用 | MCP | 一次开发,多处复用 |
| 工具需独立部署和更新 | MCP | Server独立运行,不影响客户端 |
| 需要动态工具发现 | MCP | MCP原生支持工具列表查询 |
| 需要数据资源和模板 | MCP | MCP提供Resources和Prompts |
| IDE/编辑器集成 | MCP | VS Code/Claude Desktop等原生支持 |
4.3 混合使用模式¶
"""
混合模式:在OpenAI Agent中同时使用Function Calling和MCP
"""
from openai import OpenAI
import json
client = OpenAI()
class HybridAgent:
"""混合工具Agent:同时支持本地函数和MCP工具"""
def __init__(self):
self.local_tools = {} # Function Calling工具
self.local_schemas = []
self.mcp_bridge = None # MCP桥接器
def add_local_tool(self, name: str, description: str, parameters: dict, func):
"""注册本地工具(Function Calling)"""
self.local_tools[name] = func
self.local_schemas.append({
"type": "function",
"function": {
"name": name,
"description": description,
"parameters": parameters,
},
})
async def get_all_tools(self) -> list[dict]:
"""获取所有工具(本地 + MCP)"""
all_tools = self.local_schemas.copy()
if self.mcp_bridge:
mcp_tools = await self.mcp_bridge.get_all_tools()
all_tools.extend(mcp_tools)
return all_tools
async def execute_tool(self, name: str, arguments: dict) -> str:
"""执行工具:优先本地,否则通过MCP"""
if name in self.local_tools:
result = self.local_tools[name](**arguments)
# 三元表达式+isinstance多类型检查:dict/list转JSON字符串,其他类型转str
return json.dumps(result, ensure_ascii=False) if isinstance(result, (dict, list)) else str(result)
if self.mcp_bridge:
return await self.mcp_bridge.call_tool(name, arguments)
return json.dumps({"error": f"工具 {name} 不存在"})
5. 常用MCP工具集成¶
5.1 文件系统MCP Server¶
"""
文件系统MCP Server
提供文件读写、目录浏览等能力
"""
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent, Resource
from pathlib import Path
import json
server = Server("filesystem-server")
# 限制访问的根目录(安全措施)
ALLOWED_ROOT = Path("./workspace")
def safe_path(filepath: str) -> Path:
"""安全路径检查,防止路径穿越攻击"""
full_path = (ALLOWED_ROOT / filepath).resolve()
if not str(full_path).startswith(str(ALLOWED_ROOT.resolve())):
raise ValueError("路径超出允许范围")
return full_path
@server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="read_file",
description="读取文件内容",
inputSchema={
"type": "object",
"properties": {
"path": {"type": "string", "description": "文件相对路径"},
"encoding": {"type": "string", "description": "编码格式", "default": "utf-8"},
},
"required": ["path"],
},
),
Tool(
name="write_file",
description="写入文件内容",
inputSchema={
"type": "object",
"properties": {
"path": {"type": "string", "description": "文件路径"},
"content": {"type": "string", "description": "文件内容"},
},
"required": ["path", "content"],
},
),
Tool(
name="list_directory",
description="列出目录内容",
inputSchema={
"type": "object",
"properties": {
"path": {"type": "string", "description": "目录路径", "default": "."},
},
},
),
Tool(
name="search_files",
description="在文件中搜索内容",
inputSchema={
"type": "object",
"properties": {
"pattern": {"type": "string", "description": "搜索的文件名模式,如 *.py"},
"keyword": {"type": "string", "description": "要搜索的关键词"},
},
"required": ["pattern"],
},
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
try:
if name == "read_file":
path = safe_path(arguments["path"])
encoding = arguments.get("encoding", "utf-8")
content = path.read_text(encoding=encoding)
return [TextContent(type="text", text=content)]
elif name == "write_file":
path = safe_path(arguments["path"])
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(arguments["content"], encoding="utf-8")
return [TextContent(type="text", text=f"✅ 文件已写入: {arguments['path']}")]
elif name == "list_directory":
path = safe_path(arguments.get("path", "."))
if not path.is_dir():
return [TextContent(type="text", text=f"❌ 不是目录: {arguments.get('path', '.')}")]
items = []
for item in sorted(path.iterdir()):
icon = "📁" if item.is_dir() else "📄"
size = item.stat().st_size if item.is_file() else 0
items.append(f"{icon} {item.name} ({size} bytes)" if size else f"{icon} {item.name}/")
return [TextContent(type="text", text="\n".join(items) or "(空目录)")]
elif name == "search_files":
pattern = arguments["pattern"]
keyword = arguments.get("keyword", "")
path = ALLOWED_ROOT
results = []
for file_path in path.rglob(pattern):
if keyword and file_path.is_file():
content = file_path.read_text(encoding="utf-8", errors="ignore")
if keyword in content:
results.append(str(file_path.relative_to(ALLOWED_ROOT)))
elif not keyword:
results.append(str(file_path.relative_to(ALLOWED_ROOT)))
return [TextContent(type="text", text="\n".join(results[:20]) or "未找到匹配文件")]
return [TextContent(type="text", text=f"未知工具: {name}")]
except Exception as e:
return [TextContent(type="text", text=f"❌ 错误: {str(e)}")]
async def main():
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream, server.create_initialization_options())
if __name__ == "__main__":
import asyncio
asyncio.run(main())
5.2 数据库MCP Server¶
"""
SQLite数据库MCP Server
提供SQL查询和数据操作能力
"""
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent, Resource
import sqlite3
import json
server = Server("sqlite-server")
DB_PATH = "data.db"
def get_connection():
"""获取数据库连接"""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
@server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="query",
description="执行SQL查询(只支持SELECT)",
inputSchema={
"type": "object",
"properties": {
"sql": {"type": "string", "description": "SQL查询语句(仅SELECT)"},
},
"required": ["sql"],
},
),
Tool(
name="list_tables",
description="列出数据库中所有表",
inputSchema={"type": "object", "properties": {}},
),
Tool(
name="describe_table",
description="查看表结构",
inputSchema={
"type": "object",
"properties": {
"table_name": {"type": "string", "description": "表名"},
},
"required": ["table_name"],
},
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
try:
conn = get_connection()
if name == "query":
sql = arguments["sql"].strip()
# 安全检查:只允许单条SELECT语句(禁止多语句注入)
if not sql.upper().startswith("SELECT") or ";" in sql:
return [TextContent(type="text", text="❌ 安全限制:只允许单条SELECT查询")]
cursor = conn.execute(sql)
rows = [dict(row) for row in cursor.fetchall()] # 将sqlite3.Row对象转为普通dict,便于JSON序列化
conn.close()
return [TextContent(type="text", text=json.dumps(rows, ensure_ascii=False, indent=2))]
elif name == "list_tables":
cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = [row["name"] for row in cursor.fetchall()]
conn.close()
return [TextContent(type="text", text=f"数据库表: {', '.join(tables)}")]
elif name == "describe_table":
table = arguments["table_name"]
# ⚠️ 安全检查:白名单验证表名,防止SQL注入
valid_tables_cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
valid_tables = {row["name"] for row in valid_tables_cursor.fetchall()}
if table not in valid_tables:
conn.close()
return [TextContent(type="text", text=f"❌ 表 '{table}' 不存在")]
cursor = conn.execute(f"PRAGMA table_info({table})")
columns = []
for row in cursor.fetchall():
columns.append(f" {dict(row)['name']} ({dict(row)['type']})")
conn.close()
return [TextContent(type="text", text=f"表 {table} 的结构:\n" + "\n".join(columns))]
conn.close()
return [TextContent(type="text", text=f"未知工具: {name}")]
except Exception as e:
return [TextContent(type="text", text=f"❌ 错误: {str(e)}")]
async def main():
# 初始化示例数据库
conn = sqlite3.connect(DB_PATH)
conn.execute("""CREATE TABLE IF NOT EXISTS papers (
id INTEGER PRIMARY KEY,
title TEXT, authors TEXT, year INTEGER, citations INTEGER
)""")
conn.execute("INSERT OR IGNORE INTO papers VALUES (1, 'Attention Is All You Need', 'Vaswani et al.', 2017, 90000)")
conn.execute("INSERT OR IGNORE INTO papers VALUES (2, 'BERT', 'Devlin et al.', 2019, 80000)")
conn.execute("INSERT OR IGNORE INTO papers VALUES (3, 'ReAct', 'Yao et al.', 2023, 1500)")
conn.commit()
conn.close()
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream, server.create_initialization_options())
if __name__ == "__main__":
import asyncio
asyncio.run(main())
5.3 MCP配置文件¶
{
"mcpServers": {
"filesystem": {
"command": "python",
"args": ["servers/filesystem_server.py"],
"env": {
"WORKSPACE_ROOT": "/path/to/workspace"
}
},
"database": {
"command": "python",
"args": ["servers/sqlite_server.py"],
"env": {
"DB_PATH": "./data.db"
}
},
"calculator": {
"command": "python",
"args": ["servers/calculator_server.py"]
}
}
}
6. 实战项目:开发自定义MCP Server¶
6.1 项目:GitHub集成MCP Server¶
"""
实战项目:GitHub MCP Server
提供GitHub仓库信息查询、Issue管理等能力
"""
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent, Resource
import json
import os
from datetime import datetime
server = Server("github-server")
# 模拟GitHub API数据(实际应用中使用httpx调用GitHub API)
MOCK_REPOS = {
"langchain-ai/langgraph": {
"name": "langgraph",
"description": "Build resilient language agents as graphs.",
"stars": 8500,
"language": "Python",
"last_updated": "2026-02-01",
"issues": [
{"id": 1, "title": "StateGraph循环检测bug", "state": "open", "labels": ["bug"]},
{"id": 2, "title": "添加Redis Checkpointer", "state": "open", "labels": ["enhancement"]},
{"id": 3, "title": "文档:条件路由示例", "state": "closed", "labels": ["documentation"]},
],
},
"openai/openai-agents-python": {
"name": "openai-agents-python",
"description": "OpenAI Agents SDK for Python",
"stars": 5200,
"language": "Python",
"last_updated": "2026-01-28",
"issues": [
{"id": 1, "title": "Handoff状态丢失问题", "state": "open", "labels": ["bug"]},
{"id": 2, "title": "支持自定义Tracing后端", "state": "open", "labels": ["enhancement"]},
],
},
}
@server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="get_repo_info",
description="获取GitHub仓库基本信息",
inputSchema={
"type": "object",
"properties": {
"repo": {"type": "string", "description": "仓库全名,格式: owner/repo"},
},
"required": ["repo"],
},
),
Tool(
name="list_issues",
description="列出仓库的Issues",
inputSchema={
"type": "object",
"properties": {
"repo": {"type": "string", "description": "仓库全名"},
"state": {"type": "string", "enum": ["open", "closed", "all"], "description": "Issue状态"},
"label": {"type": "string", "description": "标签过滤"},
},
"required": ["repo"],
},
),
Tool(
name="search_repos",
description="搜索GitHub仓库",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string", "description": "搜索关键词"},
"language": {"type": "string", "description": "编程语言过滤"},
},
"required": ["query"],
},
),
Tool(
name="get_repo_readme",
description="获取仓库的README内容",
inputSchema={
"type": "object",
"properties": {
"repo": {"type": "string", "description": "仓库全名"},
},
"required": ["repo"],
},
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
try:
if name == "get_repo_info":
repo_name = arguments["repo"]
repo = MOCK_REPOS.get(repo_name)
if not repo:
return [TextContent(type="text", text=f"未找到仓库: {repo_name}")]
info = f"""📦 仓库: {repo_name}
📝 描述: {repo['description']}
⭐ Stars: {repo['stars']}
💻 语言: {repo['language']}
📅 最近更新: {repo['last_updated']}
🐛 Open Issues: {sum(1 for i in repo['issues'] if i['state'] == 'open')}"""
return [TextContent(type="text", text=info)]
elif name == "list_issues":
repo_name = arguments["repo"]
state = arguments.get("state", "all")
label = arguments.get("label")
repo = MOCK_REPOS.get(repo_name)
if not repo:
return [TextContent(type="text", text=f"未找到仓库: {repo_name}")]
issues = repo["issues"]
if state != "all":
issues = [i for i in issues if i["state"] == state]
if label:
issues = [i for i in issues if label in i["labels"]]
if not issues:
return [TextContent(type="text", text="没有匹配的Issues")]
lines = [f"📋 {repo_name} Issues (filter: state={state}):"]
for issue in issues:
icon = "🟢" if issue["state"] == "open" else "🔴"
labels = " ".join(f"[{l}]" for l in issue["labels"])
lines.append(f" {icon} #{issue['id']} {issue['title']} {labels}")
return [TextContent(type="text", text="\n".join(lines))]
elif name == "search_repos":
query = arguments["query"].lower()
language = arguments.get("language", "").lower()
results = []
for repo_name, repo in MOCK_REPOS.items():
if query in repo_name.lower() or query in repo["description"].lower():
if not language or repo["language"].lower() == language:
results.append(f"⭐ {repo['stars']} | {repo_name} - {repo['description']}")
if not results:
return [TextContent(type="text", text=f"未找到匹配'{query}'的仓库")]
return [TextContent(type="text", text="搜索结果:\n" + "\n".join(results))]
elif name == "get_repo_readme":
repo_name = arguments["repo"]
# 模拟README内容
return [TextContent(type="text", text=f"# {repo_name}\n\n这是{repo_name}的README内容...")]
return [TextContent(type="text", text=f"未知工具: {name}")]
except Exception as e:
return [TextContent(type="text", text=f"❌ 错误: {str(e)}")]
# === Resources: 提供热门仓库列表 ===
@server.list_resources()
async def list_resources() -> list[Resource]:
return [
Resource(
uri="github://trending",
name="热门AI Agent仓库",
description="当前最热门的AI Agent相关仓库列表",
mimeType="application/json",
),
]
@server.read_resource()
async def read_resource(uri: str) -> str:
if uri == "github://trending":
# sorted+lambda对字典items按stars降序排序,解构赋值name,data分别获取键和值
trending = [
{"repo": name, "stars": data["stars"], "description": data["description"]}
for name, data in sorted(MOCK_REPOS.items(), key=lambda x: x[1]["stars"], reverse=True)
]
return json.dumps(trending, ensure_ascii=False, indent=2)
raise ValueError(f"未知资源: {uri}")
async def main():
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream, server.create_initialization_options())
if __name__ == "__main__":
import asyncio
asyncio.run(main())
7. 面试常考题¶
Q1: MCP和Function Calling有什么区别?¶
高频面试题 ⭐⭐⭐⭐⭐
答: Function Calling是模型提供商(如OpenAI)的专有功能,工具定义嵌入在API请求中,工具执行在客户端;MCP是开放标准协议,工具以独立Server形式运行,支持跨模型/跨框架复用。MCP额外提供Resources(数据资源)和Prompts(模板)能力。
Q2: MCP用什么通信协议?为什么?¶
面试题 ⭐⭐⭐⭐
答: MCP基于JSON-RPC 2.0协议。选择JSON-RPC因为:1)简单标准,广泛支持;2)支持请求-响应和通知两种模式;3)方便batch请求;4)传输层可插拔(stdio/Streamable HTTP等)。
Q3: 如何保证MCP Server的安全性?¶
面试题 ⭐⭐⭐⭐
答: 1)路径安全检查,防止路径穿越;2)SQL注入防护,只允许SELECT;3)权限最小化原则;4)限制访问范围(如指定根目录);5)输入参数验证;6)速率限制;7)审计日志。
📝 本章小结¶
- ✅ 理解了MCP协议的设计理念和核心概念
- ✅ 掌握了MCP Server开发(Tools/Resources/Prompts)
- ✅ 学会了MCP Client连接和工具调用
- ✅ 理解了Function Calling与MCP的区别
- ✅ 完成了GitHub MCP Server实战项目
💡 Agent协议全景:MCP解决Agent调用工具的问题,A2A解决Agent相互协作的问题,ANP解决Agent跨网络发现和信任的问题。三者构成Agent通信协议栈的三个层级,详见 04-多Agent系统与实战 §2.3。
✅ 学习检查清单¶
- 能解释MCP协议的三大核心能力(Tools/Resources/Prompts)
- 能用Python SDK开发一个MCP Server
- 能编写MCP Client连接并调用Server
- 理解stdio和Streamable HTTP两种主要传输方式的区别
- 能对比Function Calling和MCP的优劣
- 能将MCP工具集成到OpenAI Agent中
- 完成自定义MCP Server实战项目
- 了解MCP安全最佳实践
🔗 下一步¶
下一章我们将学习多Agent系统设计与综合实战项目。
继续学习: 04-多Agent系统与实战
📚 参考资料¶
- MCP Official Specification
- MCP Python SDK
- MCP Servers Repository
- Anthropic MCP Blog
- OpenAI Function Calling Guide
祝你学习愉快! 🎉
最后更新日期:2026-02-12 适用版本:AI Agent开发实战教程 v2026
