09. MCP协议与AI工具生态系统¶
⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。
📌 定位说明:本章侧重MCP协议的研究前沿与生态全景分析。MCP Server/Client编码实战请参考 AI Agent开发实战/03-MCP与工具生态。
目录¶
1. MCP协议概述¶
1.1 什么是MCP¶
MCP(Model Context Protocol,模型上下文协议)是 Anthropic 于 2024 年 11 月发布的开放协议,基于 JSON-RPC 2.0,旨在标准化 AI 应用(模型宿主)与外部工具、数据源之间的连接方式。
Text Only
┌─────────────────────────────────────────────────────────────────┐
│ MCP协议核心目标 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. 标准化接口 │
│ • 统一的工具发现机制 │
│ • 一致的调用语义 │
│ • 跨平台兼容性 │
│ │
│ 2. 解耦架构 │
│ • 模型与工具分离 │
│ • 独立演进 │
│ • 即插即用 │
│ │
│ 3. 安全可控 │
│ • 细粒度权限控制 │
│ • 审计追踪 │
│ • 人在回路 │
│ │
│ 4. 生态开放 │
│ • 开源协议 │
│ • 社区驱动 │
│ • 工具共享 │
│ │
└─────────────────────────────────────────────────────────────────┘
1.2 MCP与传统工具调用的对比¶
| 特性 | 传统Function Calling | MCP协议 |
|---|---|---|
| 标准化 | 各平台独立实现 | 统一协议规范 |
| 发现机制 | 静态配置 | 动态发现 |
| 能力描述 | 简单schema | 丰富的metadata |
| 权限控制 | 应用层控制 | 协议层支持 |
| 跨平台 | 困难 | 原生支持 |
| 生态 | 封闭 | 开放共享 |
1.3 MCP生态系统概览¶
Text Only
┌─────────────────────────────────────────────────────────────────┐
│ MCP生态系统 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Claude │ │ GPT-4 │ │ Gemini │ │
│ │ Desktop │ │ (OpenAI) │ │ (Google) │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └──────────────────┼──────────────────┘ │
│ ↓ │
│ ┌─────────────────────────┐ │
│ │ MCP Client SDK │ │
│ │ (Python/TypeScript/...)│ │
│ └───────────┬─────────────┘ │
│ ↓ │
│ ┌─────────────────────────┐ │
│ │ MCP Protocol │ │
│ │ (JSON-RPC/stdio/sse) │ │
│ └───────────┬─────────────┘ │
│ ↓ │
│ ┌────────────────┼────────────────┐ │
│ ↓ ↓ ↓ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ File System │ │ GitHub │ │ Slack │ │
│ │ MCP Server │ │ MCP Server │ │ MCP Server │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ PostgreSQL │ │ Brave │ │ Stripe │ │
│ │ MCP Server │ │ Search │ │ MCP Server │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
2. MCP架构与核心概念¶
2.1 核心组件¶
Python
"""
MCP核心组件架构
1. MCP Server: 提供工具、资源和能力的后端服务
2. MCP Client: 与Server通信的客户端组件
3. Protocol: 定义通信规范
4. Transport: 数据传输层(stdio/sse/http)
"""
import inspect
import json
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
class MCPError(Exception):
    """Root of the MCP exception hierarchy; catch it to handle any MCP error."""
class ToolNotFoundError(MCPError):
    """Error type for a tool that could not be found."""
class PermissionDeniedError(MCPError):
    """Error type for an operation rejected for lack of permission."""
@dataclass
class MCPTool:
    """Definition of a tool that an MCP server exposes.

    Attributes are stored with Python naming; ``to_dict`` converts them to the
    camelCase keys used on the wire.
    """

    name: str
    description: str
    input_schema: Dict[str, Any]  # JSON Schema describing the call arguments
    output_schema: Optional[Dict[str, Any]] = None  # optional JSON Schema of the result
    annotations: Optional[Dict[str, Any]] = None  # optional free-form metadata

    def to_dict(self) -> Dict:
        """Return the wire representation of this tool.

        Optional fields that are unset are omitted instead of being emitted as
        ``null``, so the payload never carries a null where an object is
        expected.
        """
        payload: Dict[str, Any] = {
            "name": self.name,
            "description": self.description,
            "inputSchema": self.input_schema,
        }
        if self.output_schema is not None:
            payload["outputSchema"] = self.output_schema
        if self.annotations is not None:
            payload["annotations"] = self.annotations
        return payload
@dataclass
class MCPResource:
    """Definition of a resource that an MCP server exposes, keyed by URI."""

    uri: str
    name: str
    description: str
    mime_type: Optional[str] = None  # optional MIME type of the content

    def to_dict(self) -> Dict:
        """Return the wire representation of this resource.

        ``mimeType`` is only included when a MIME type was provided, so the
        payload never carries ``"mimeType": null``.
        """
        payload = {
            "uri": self.uri,
            "name": self.name,
            "description": self.description,
        }
        if self.mime_type is not None:
            payload["mimeType"] = self.mime_type
        return payload
@dataclass
class MCPCapability:
    """Flags recording which feature groups are advertised as capabilities."""

    tools: bool = False
    resources: bool = False
    prompts: bool = False
    logging: bool = False

    def to_dict(self) -> Dict:
        """Return a dict with one empty-object entry per enabled flag.

        Disabled flags are left out entirely; key order is tools, resources,
        prompts, logging.
        """
        switches = (
            ("tools", self.tools),
            ("resources", self.resources),
            ("prompts", self.prompts),
            ("logging", self.logging),
        )
        return {feature: {} for feature, enabled in switches if enabled}
2.2 协议消息格式¶
Python
class MCPMessage:
    """Common base for MCP messages; it only records the JSON-RPC version."""

    def __init__(self, jsonrpc: str = "2.0") -> None:
        # Version string that subclasses copy into their serialized form.
        self.jsonrpc = jsonrpc
class MCPRequest(MCPMessage):
    """A request message: a method call that expects a response."""

    def __init__(self, method: str, params: Dict = None, request_id: str = None):
        """Build a request.

        Args:
            method: Name of the method to invoke.
            params: Method parameters; an empty dict is used when omitted.
            request_id: Id used to correlate the response. A random UUID
                string is generated only when this is ``None``.
        """
        super().__init__()
        # Compare against None rather than truthiness: a caller-supplied id
        # such as 0 or "" must be preserved so the response can be matched.
        self.id = self._generate_id() if request_id is None else request_id
        self.method = method
        self.params = params or {}

    def to_dict(self) -> Dict:
        """Return the request as a dict ready for JSON serialization."""
        return {
            "jsonrpc": self.jsonrpc,
            "id": self.id,
            "method": self.method,
            "params": self.params,
        }

    def _generate_id(self) -> str:
        """Return a fresh random UUID4 string."""
        import uuid

        return str(uuid.uuid4())
class MCPResponse(MCPMessage):
    """A response message carrying either a result or an error object.

    Note: ``__init__`` stores the error object in the instance attribute
    ``error``, which on instances shadows the ``error`` classmethod defined
    below. Call the factory on the class (``MCPResponse.error(...)``) and read
    the attribute on instances (``response.error``).
    """

    def __init__(self, request_id: str, result: Any = None, error: Dict = None):
        """Store the id of the request being answered and its outcome."""
        super().__init__()
        self.id = request_id
        self.result = result
        self.error = error

    def to_dict(self) -> Dict:
        """Return the response as a dict with exactly one of error/result."""
        payload = {
            "jsonrpc": self.jsonrpc,
            "id": self.id,
        }
        # An error object, once supplied, always wins over the result.
        if self.error is not None:
            payload["error"] = self.error
        else:
            payload["result"] = self.result
        return payload

    @classmethod
    def success(cls, request_id: str, result: Any) -> "MCPResponse":
        """Create a successful response for ``request_id``."""
        return cls(request_id, result=result)

    @classmethod
    def error(cls, request_id: str, code: int, message: str, data: Any = None) -> "MCPResponse":
        """Create an error response for ``request_id``.

        ``data`` is attached whenever it is not ``None``, so falsy but
        meaningful values (``0``, ``""``, ``[]``) are kept.
        """
        error_obj: Dict[str, Any] = {"code": code, "message": message}
        if data is not None:
            error_obj["data"] = data
        return cls(request_id, error=error_obj)
class MCPNotification(MCPMessage):
    """A one-way message: it has a method and params but no id."""

    def __init__(self, method: str, params: Dict = None):
        """Store the method name and its params (empty dict when omitted)."""
        super().__init__()
        self.method = method
        self.params = params or {}

    def to_dict(self) -> Dict:
        """Return the notification as a dict ready for JSON serialization."""
        serialized = {"jsonrpc": self.jsonrpc}
        serialized["method"] = self.method
        serialized["params"] = self.params
        return serialized
# MCP protocol method names
class MCPMethods:
    """Namespace of the method-name strings used in MCP messages."""

    # Initialization handshake
    INITIALIZE = "initialize"
    # Notification sent by the client once initialization has completed.
    # The MCP specification names it "notifications/initialized".
    INITIALIZED = "notifications/initialized"
    # Tools
    TOOLS_LIST = "tools/list"
    TOOLS_CALL = "tools/call"
    # Resources
    RESOURCES_LIST = "resources/list"
    RESOURCES_READ = "resources/read"
    RESOURCES_SUBSCRIBE = "resources/subscribe"
    # Prompts
    PROMPTS_LIST = "prompts/list"
    PROMPTS_GET = "prompts/get"
    # Completion
    COMPLETION_COMPLETE = "completion/complete"
    # Logging
    LOGGING_SET_LEVEL = "logging/setLevel"
    # Liveness check
    PING = "ping"
    # Notifications
    NOTIFICATIONS_PROGRESS = "notifications/progress"
    NOTIFICATIONS_MESSAGE = "notifications/message"
    NOTIFICATIONS_RESOURCES_UPDATED = "notifications/resources/updated"
2.3 传输层实现¶
Python
import asyncio # Python标准异步库
from typing import AsyncIterator
import sys
class MCPTransport(ABC):
    """Abstract transport that carries serialized MCP messages as strings."""

    @abstractmethod
    def read(self) -> AsyncIterator[str]:
        """Return an async iterator over incoming messages.

        Declared as a plain ``def`` so the signature matches how callers use
        it (``async for msg in transport.read()``). An ``async def`` without
        ``yield`` would instead be a coroutine returning an iterator.
        Concrete transports implement it as an async generator.
        """
        raise NotImplementedError

    @abstractmethod
    async def write(self, message: str) -> None:
        """Send one serialized message."""

    @abstractmethod
    async def close(self) -> None:
        """Shut the transport down."""
class StdioTransport(MCPTransport):
    """Transport that exchanges newline-delimited messages over text streams.

    Defaults to the process's ``sys.stdin`` / ``sys.stdout``.
    """

    def __init__(self, stdin=None, stdout=None):
        """Use the given streams, falling back to the process's stdio."""
        self.stdin = sys.stdin if stdin is None else stdin
        self.stdout = sys.stdout if stdout is None else stdout
        self._closed = False

    async def read(self) -> AsyncIterator[str]:
        """Yield each non-blank input line, stripped, until EOF or close().

        ``readline`` blocks, so it runs in the default executor to keep the
        event loop responsive. A read error is reported on stderr and ends
        the stream.
        """
        # get_running_loop() is the supported way to reach the loop from
        # inside a coroutine.
        loop = asyncio.get_running_loop()
        while not self._closed:
            try:
                line = await loop.run_in_executor(None, self.stdin.readline)
            except Exception as e:
                print(f"Read error: {e}", file=sys.stderr)
                break
            if not line:  # empty string means EOF
                break
            line = line.strip()
            if line:
                yield line

    async def write(self, message: str) -> None:
        """Write one message followed by a newline and flush; no-op once closed."""
        if not self._closed:
            self.stdout.write(message + "\n")
            self.stdout.flush()

    async def close(self) -> None:
        """Mark the transport closed; the streams themselves are left open."""
        self._closed = True
class SSETransport(MCPTransport):
    """Transport that receives Server-Sent Events and sends via HTTP POST.

    ``connect()`` must be awaited before ``read()`` or ``write()``.
    """

    def __init__(self, endpoint_url: str):
        """Remember the endpoint; no network activity happens here."""
        self.endpoint_url = endpoint_url
        self.session = None
        # Initialised here so read()/close() before connect() see None
        # instead of raising AttributeError.
        self.response = None
        self._closed = False

    async def connect(self):
        """Open the HTTP session and start the event-stream GET request."""
        import aiohttp

        session = aiohttp.ClientSession()
        try:
            response = await session.get(
                self.endpoint_url,
                headers={"Accept": "text/event-stream"},
            )
        except BaseException:
            # Do not leak the session when the request fails or is cancelled.
            await session.close()
            raise
        self.session = session
        self.response = response

    async def read(self) -> AsyncIterator[str]:
        """Yield the payload of every non-empty ``data: `` line.

        Raises:
            RuntimeError: If ``connect()`` has not been called.
        """
        if not self.response:
            raise RuntimeError("Not connected")
        async for raw_line in self.response.content:
            if self._closed:
                break
            line = raw_line.decode("utf-8").strip()
            if line.startswith("data: "):
                data = line[6:]  # drop the "data: " prefix
                if data:
                    yield data

    async def write(self, message: str) -> None:
        """POST ``message`` (a JSON string) to the endpoint as a JSON body.

        Raises:
            RuntimeError: If ``connect()`` has not been called.
        """
        if not self.session:
            raise RuntimeError("Not connected")
        async with self.session.post(
            self.endpoint_url,
            json=json.loads(message),
        ) as response:
            await response.text()

    async def close(self) -> None:
        """Close the event stream and the session; safe to call repeatedly."""
        self._closed = True
        if self.response:
            self.response.close()
            self.response = None
        if self.session:
            await self.session.close()
            self.session = None
class WebSocketTransport(MCPTransport):
    """Transport that exchanges messages over a WebSocket connection."""

    def __init__(self, ws_url: str):
        """Remember the URL; the socket is opened later by ``connect()``."""
        self.ws_url = ws_url
        self.websocket = None
        self._closed = False

    async def connect(self):
        """Open the WebSocket connection to ``ws_url``."""
        import websockets

        self.websocket = await websockets.connect(self.ws_url)

    async def read(self) -> AsyncIterator[str]:
        """Yield incoming messages until the socket ends or close() is called.

        Raises:
            RuntimeError: If ``connect()`` has not been called.
        """
        socket = self.websocket
        if not socket:
            raise RuntimeError("Not connected")
        async for incoming in socket:
            if self._closed:
                return
            yield incoming

    async def write(self, message: str) -> None:
        """Send one message.

        Raises:
            RuntimeError: If ``connect()`` has not been called.
        """
        socket = self.websocket
        if not socket:
            raise RuntimeError("Not connected")
        await socket.send(message)

    async def close(self) -> None:
        """Flag the transport as closed and close the socket if it was opened."""
        self._closed = True
        socket = self.websocket
        if socket:
            await socket.close()
3. MCP服务器实现¶
3.1 基础服务器框架¶
Python
class MCPServer:
    """MCP server: registries of tools, resources and prompts plus a JSON-RPC
    dispatcher that serves them over an ``MCPTransport``.

    Tool and resource handlers may be plain callables or coroutine functions;
    awaitable results are awaited before being returned to the client.
    """

    def __init__(
        self,
        name: str,
        version: str,
        capabilities: MCPCapability = None
    ):
        """Create an empty server.

        Args:
            name: Server name reported in the ``initialize`` result.
            version: Server version reported in the ``initialize`` result.
            capabilities: Initial capability flags; a fresh ``MCPCapability``
                is created when omitted. Registering a tool, resource or
                prompt switches the matching flag on.
        """
        self.name = name
        self.version = version
        self.capabilities = capabilities or MCPCapability()
        # Registered tools and their handlers, keyed by tool name
        self._tools: Dict[str, MCPTool] = {}
        self._tool_handlers: Dict[str, Callable] = {}
        # Registered resources and their handlers, keyed by URI
        self._resources: Dict[str, MCPResource] = {}
        self._resource_handlers: Dict[str, Callable] = {}
        # Prompt templates, keyed by prompt name
        self._prompts: Dict[str, Dict] = {}
        # Filled in by the initialize request
        self._client_info: Optional[Dict] = None
        self._client_capabilities: Optional[Dict] = None
        # Transport currently being served by run()
        self._transport: Optional[MCPTransport] = None

    def register_tool(
        self,
        name: str,
        description: str,
        input_schema: Dict,
        handler: Callable,
        output_schema: Dict = None,
        annotations: Dict = None
    ):
        """Register a tool and the callable that implements it.

        The handler is called with the request's ``arguments`` unpacked as
        keyword arguments. Re-registering a name replaces the earlier entry.
        """
        self._tools[name] = MCPTool(
            name=name,
            description=description,
            input_schema=input_schema,
            output_schema=output_schema,
            annotations=annotations,
        )
        self._tool_handlers[name] = handler
        self.capabilities.tools = True

    def register_resource(
        self,
        uri: str,
        name: str,
        description: str,
        handler: Callable,
        mime_type: str = None
    ):
        """Register a resource and the zero-argument callable that reads it."""
        self._resources[uri] = MCPResource(
            uri=uri,
            name=name,
            description=description,
            mime_type=mime_type,
        )
        self._resource_handlers[uri] = handler
        self.capabilities.resources = True

    def register_prompt(
        self,
        name: str,
        description: str,
        template: str,
        arguments: List[Dict] = None
    ):
        """Register a prompt whose ``template`` is filled with ``str.format``."""
        self._prompts[name] = {
            "name": name,
            "description": description,
            "template": template,
            "arguments": arguments or [],
        }
        self.capabilities.prompts = True

    @staticmethod
    async def _invoke(handler: Callable, arguments: Optional[Dict] = None) -> Any:
        """Call ``handler(**arguments)`` and await the result if it is awaitable."""
        outcome = handler(**(arguments or {}))
        if inspect.isawaitable(outcome):
            outcome = await outcome
        return outcome

    async def handle_request(self, request: MCPRequest) -> MCPResponse:
        """Dispatch one request to its handler and return the response.

        Unknown methods produce error -32601. Any exception escaping a handler
        is converted into error -32603, so this method does not raise.
        """
        method = request.method
        params = request.params
        try:
            if method == MCPMethods.INITIALIZE:
                return await self._handle_initialize(params, request.id)
            elif method == MCPMethods.TOOLS_LIST:
                return await self._handle_tools_list(request.id)
            elif method == MCPMethods.TOOLS_CALL:
                return await self._handle_tools_call(params, request.id)
            elif method == MCPMethods.RESOURCES_LIST:
                return await self._handle_resources_list(request.id)
            elif method == MCPMethods.RESOURCES_READ:
                return await self._handle_resources_read(params, request.id)
            elif method == MCPMethods.PROMPTS_LIST:
                return await self._handle_prompts_list(request.id)
            elif method == MCPMethods.PROMPTS_GET:
                return await self._handle_prompts_get(params, request.id)
            elif method == MCPMethods.PING:
                return MCPResponse.success(request.id, {})
            else:
                return MCPResponse.error(
                    request.id,
                    code=-32601,
                    message=f"Method not found: {method}"
                )
        except Exception as e:
            # Protocol boundary: report the failure instead of crashing run().
            return MCPResponse.error(
                request.id,
                code=-32603,
                message=str(e)
            )

    async def _handle_initialize(
        self,
        params: Dict,
        request_id: str
    ) -> MCPResponse:
        """Record the client's info/capabilities and describe this server."""
        self._client_info = params.get("clientInfo")
        self._client_capabilities = params.get("capabilities", {})
        # NOTE(review): the client's protocolVersion is echoed back without
        # being validated against versions this server supports.
        protocol_version = params.get("protocolVersion", "2024-11-05")
        result = {
            "protocolVersion": protocol_version,
            "capabilities": self.capabilities.to_dict(),
            "serverInfo": {
                "name": self.name,
                "version": self.version
            }
        }
        return MCPResponse.success(request_id, result)

    async def _handle_tools_list(self, request_id: str) -> MCPResponse:
        """Return every registered tool definition."""
        tools = [tool.to_dict() for tool in self._tools.values()]
        return MCPResponse.success(request_id, {"tools": tools})

    async def _handle_tools_call(
        self,
        params: Dict,
        request_id: str
    ) -> MCPResponse:
        """Run a tool and wrap ``str(result)`` as a single text content item.

        Unknown tools yield error -32602; handler exceptions yield -32603.
        """
        tool_name = params.get("name")
        arguments = params.get("arguments", {})
        if tool_name not in self._tools:
            return MCPResponse.error(
                request_id,
                code=-32602,
                message=f"Tool not found: {tool_name}"
            )
        handler = self._tool_handlers[tool_name]
        try:
            result = await self._invoke(handler, arguments)
        except Exception as e:
            return MCPResponse.error(
                request_id,
                code=-32603,
                message=f"Tool execution failed: {str(e)}"
            )
        return MCPResponse.success(request_id, {
            "content": [{"type": "text", "text": str(result)}]
        })

    async def _handle_resources_list(self, request_id: str) -> MCPResponse:
        """Return every registered resource definition."""
        resources = [res.to_dict() for res in self._resources.values()]
        return MCPResponse.success(request_id, {"resources": resources})

    async def _handle_resources_read(
        self,
        params: Dict,
        request_id: str
    ) -> MCPResponse:
        """Read a resource through its handler and return it as text content.

        Unknown URIs yield error -32602; handler exceptions yield -32603.
        """
        uri = params.get("uri")
        if uri not in self._resources:
            return MCPResponse.error(
                request_id,
                code=-32602,
                message=f"Resource not found: {uri}"
            )
        handler = self._resource_handlers[uri]
        try:
            content = await self._invoke(handler)
        except Exception as e:
            return MCPResponse.error(
                request_id,
                code=-32603,
                message=f"Resource read failed: {str(e)}"
            )
        return MCPResponse.success(request_id, {
            "contents": [{
                "uri": uri,
                "mimeType": self._resources[uri].mime_type,
                "text": content
            }]
        })

    async def _handle_prompts_list(self, request_id: str) -> MCPResponse:
        """Return the public description of every registered prompt.

        The raw ``template`` string is a server-side implementation detail and
        is left out of the listing.
        """
        prompts = [
            {key: value for key, value in prompt.items() if key != "template"}
            for prompt in self._prompts.values()
        ]
        return MCPResponse.success(request_id, {"prompts": prompts})

    async def _handle_prompts_get(
        self,
        params: Dict,
        request_id: str
    ) -> MCPResponse:
        """Fill a prompt template with the supplied arguments.

        Unknown prompts and missing template arguments yield error -32602.
        """
        name = params.get("name")
        arguments = params.get("arguments", {})
        if name not in self._prompts:
            return MCPResponse.error(
                request_id,
                code=-32602,
                message=f"Prompt not found: {name}"
            )
        prompt_def = self._prompts[name]
        template = prompt_def["template"]
        try:
            filled = template.format(**arguments)
        except KeyError as e:
            return MCPResponse.error(
                request_id,
                code=-32602,
                message=f"Missing argument: {e}"
            )
        return MCPResponse.success(request_id, {
            "description": prompt_def["description"],
            "messages": [{
                "role": "user",
                "content": {"type": "text", "text": filled}
            }]
        })

    async def run(self, transport: MCPTransport):
        """Serve requests from ``transport`` until its read stream ends.

        Unparseable input is answered with error -32700 and JSON that is not
        an object with -32600; in both cases the loop keeps running.
        Notifications (a method without an id) are accepted and ignored.
        """
        self._transport = transport
        async for message_str in transport.read():
            try:
                message = json.loads(message_str)
            except json.JSONDecodeError as e:
                error_response = MCPResponse.error(
                    None,
                    code=-32700,
                    message=f"Parse error: {str(e)}"
                )
                await transport.write(json.dumps(error_response.to_dict()))
                continue
            if not isinstance(message, dict):
                # Valid JSON but not a message object (e.g. a number or list);
                # membership tests below would otherwise raise or misbehave.
                error_response = MCPResponse.error(
                    None,
                    code=-32600,
                    message="Invalid Request"
                )
                await transport.write(json.dumps(error_response.to_dict()))
                continue
            if "method" not in message:
                continue
            if "id" in message:
                request = MCPRequest(
                    method=message["method"],
                    params=message.get("params", {}),
                    request_id=message["id"]
                )
                # Echo the caller's id verbatim, even when it is falsy (e.g. 0).
                request.id = message["id"]
                response = await self.handle_request(request)
                await transport.write(json.dumps(response.to_dict()))
            # else: a notification, which needs no response
3.2 示例:文件系统MCP服务器¶
Python
class FileSystemMCPServer(MCPServer):
    """Example MCP server exposing file operations confined to one directory.

    Every path supplied by a client is resolved against ``root_path`` and
    rejected when it points outside it.
    """

    def __init__(self, root_path: str = "."):
        """Create the server rooted at ``root_path`` (resolved to absolute)."""
        super().__init__(
            name="filesystem-server",
            version="1.0.0",
            capabilities=MCPCapability(tools=True, resources=True)
        )
        self.root_path = Path(root_path).resolve()
        self._setup_tools()
        self._setup_resources()

    def _setup_tools(self):
        """Register the file-system tools with their JSON Schemas."""
        # Read a file
        self.register_tool(
            name="read_file",
            description="读取文件内容",
            input_schema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "文件路径(相对root)"
                    }
                },
                "required": ["path"]
            },
            handler=self._read_file
        )
        # Write a file
        self.register_tool(
            name="write_file",
            description="写入文件内容",
            input_schema={
                "type": "object",
                "properties": {
                    "path": {"type": "string"},
                    "content": {"type": "string"}
                },
                "required": ["path", "content"]
            },
            handler=self._write_file,
            annotations={
                "destructive": ["write"],
                "idempotent": False
            }
        )
        # List a directory
        self.register_tool(
            name="list_directory",
            description="列出目录内容",
            input_schema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "目录路径(相对root)"
                    }
                },
                "required": ["path"]
            },
            handler=self._list_directory
        )
        # Search by glob pattern
        self.register_tool(
            name="search_files",
            description="搜索文件",
            input_schema={
                "type": "object",
                "properties": {
                    "pattern": {
                        "type": "string",
                        "description": "搜索模式(glob)"
                    }
                },
                "required": ["pattern"]
            },
            handler=self._search_files
        )
        # File metadata
        self.register_tool(
            name="get_file_info",
            description="获取文件元数据",
            input_schema={
                "type": "object",
                "properties": {
                    "path": {"type": "string"}
                },
                "required": ["path"]
            },
            handler=self._get_file_info
        )

    def _setup_resources(self):
        """Register the root-directory listing as a resource."""
        self.register_resource(
            uri="file:///root",
            name="Root Directory",
            description="文件系统根目录",
            handler=self._read_root_resource,
            mime_type="application/json"
        )

    def _resolve_path(self, path: str) -> Path:
        """Resolve ``path`` against the root and verify it stays inside it.

        Raises:
            PermissionError: If the resolved path lies outside ``root_path``.
        """
        resolved = (self.root_path / path).resolve()
        try:
            resolved.relative_to(self.root_path)
        except ValueError:
            raise PermissionError(f"Access denied: {path}")
        return resolved

    async def _read_file(self, path: str) -> str:
        """Return the UTF-8 text of the file at ``path``.

        Raises:
            FileNotFoundError: If nothing exists at ``path``.
            ValueError: If ``path`` is not a regular file.
            PermissionError: If ``path`` escapes the root.
        """
        file_path = self._resolve_path(path)
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {path}")
        if not file_path.is_file():
            raise ValueError(f"Not a file: {path}")
        return file_path.read_text(encoding="utf-8")

    async def _write_file(self, path: str, content: str) -> str:
        """Write ``content`` as UTF-8 to ``path``, creating parent directories."""
        file_path = self._resolve_path(path)
        file_path.parent.mkdir(parents=True, exist_ok=True)
        file_path.write_text(content, encoding="utf-8")
        return f"Successfully wrote to {path}"

    async def _list_directory(self, path: str) -> str:
        """Return a JSON array describing the entries of the directory.

        Each entry has ``name``, ``type`` and ``size`` (``null`` for
        non-files).

        Raises:
            FileNotFoundError: If nothing exists at ``path``.
            ValueError: If ``path`` is not a directory.
            PermissionError: If ``path`` escapes the root.
        """
        dir_path = self._resolve_path(path)
        if not dir_path.exists():
            raise FileNotFoundError(f"Directory not found: {path}")
        if not dir_path.is_dir():
            raise ValueError(f"Not a directory: {path}")
        entries = []
        for entry in dir_path.iterdir():
            entry_type = "directory" if entry.is_dir() else "file"
            entries.append({
                "name": entry.name,
                "type": entry_type,
                "size": entry.stat().st_size if entry.is_file() else None
            })
        return json.dumps(entries, indent=2)

    async def _search_files(self, pattern: str) -> str:
        """Return a JSON array of root-relative paths matching the glob.

        Matches whose real location is outside ``root_path`` are skipped.
        """
        results = []
        for match in self.root_path.glob(pattern):
            try:
                # The containment check must use the resolved path: a purely
                # lexical check accepts "root/../x" (and symlinks leading out
                # of the root) because the root is still a textual prefix.
                match.resolve().relative_to(self.root_path)
                results.append(str(match.relative_to(self.root_path)))
            except ValueError:
                continue
        return json.dumps(results, indent=2)

    async def _get_file_info(self, path: str) -> str:
        """Return a JSON object with metadata for the entry at ``path``.

        Raises:
            FileNotFoundError: If nothing exists at ``path``.
            PermissionError: If ``path`` escapes the root.
        """
        file_path = self._resolve_path(path)
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {path}")
        stat = file_path.stat()
        info = {
            "name": file_path.name,
            "path": str(file_path.relative_to(self.root_path)),
            "type": "directory" if file_path.is_dir() else "file",
            "size": stat.st_size,
            # NOTE(review): st_ctime is the metadata-change time on Unix and
            # the creation time only on Windows.
            "created": datetime.fromtimestamp(stat.st_ctime).isoformat(),
            "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
            # Last three octal digits of the mode, e.g. "644"
            "permissions": oct(stat.st_mode)[-3:]
        }
        return json.dumps(info, indent=2)

    async def _read_root_resource(self) -> str:
        """Return a JSON object listing the entries directly under the root."""
        entries = []
        for entry in self.root_path.iterdir():
            entries.append({
                "name": entry.name,
                "type": "directory" if entry.is_dir() else "file"
            })
        return json.dumps({
            "root": str(self.root_path),
            "entries": entries
        }, indent=2)
# Run the file-system server over stdio
async def main():
    """Parse ``--root`` from the command line and serve that directory."""
    from argparse import ArgumentParser

    cli = ArgumentParser(description="File System MCP Server")
    cli.add_argument("--root", default=".", help="Root directory path")
    options = cli.parse_args()

    fs_server = FileSystemMCPServer(root_path=options.root)
    stdio = StdioTransport()
    # Status lines go to stderr; StdioTransport writes messages to stdout.
    print("File System MCP Server starting...", file=sys.stderr)
    print(f"Root path: {fs_server.root_path}", file=sys.stderr)
    await fs_server.run(stdio)


if __name__ == "__main__":
    # asyncio.run creates the event loop and drives the top-level coroutine.
    asyncio.run(main())
4. MCP客户端集成¶
4.1 客户端实现¶
Python
class MCPClient:
    """MCP client: sends requests over a transport and matches responses by id.

    A background reader task is started automatically with the first request.
    Running ``_handle_incoming()`` yourself is also supported; only one
    reader is ever active per client.
    """

    def __init__(self, transport: "MCPTransport"):
        """Wrap ``transport``; nothing is sent until ``initialize()``."""
        self.transport = transport
        # Futures awaiting a response, keyed by request id
        self._pending_requests: Dict[str, asyncio.Future] = {}
        self._server_capabilities: Optional[Dict] = None
        self._server_info: Optional[Dict] = None
        self._initialized = False
        self._request_counter = 0
        # Background task consuming transport.read(), and a flag that is True
        # while some _handle_incoming() call is actively reading.
        self._reader_task: Optional[asyncio.Task] = None
        self._reading = False

    async def initialize(
        self,
        client_name: str,
        client_version: str,
        capabilities: Dict = None
    ) -> Dict:
        """Perform the initialize handshake and return the server's result.

        Raises:
            MCPError: If the server answers with an error or the request
                times out.
        """
        request = MCPRequest(
            method=MCPMethods.INITIALIZE,
            params={
                "protocolVersion": "2024-11-05",
                "capabilities": capabilities or {},
                "clientInfo": {
                    "name": client_name,
                    "version": client_version
                }
            }
        )
        response = await self._send_request(request)
        if response.error:
            raise MCPError(f"Initialization failed: {response.error}")
        self._server_capabilities = response.result.get("capabilities", {})
        self._server_info = response.result.get("serverInfo", {})
        self._initialized = True
        # Tell the server that the handshake is complete.
        await self._send_notification(MCPMethods.INITIALIZED, {})
        return response.result

    def _require_initialized(self) -> None:
        """Raise RuntimeError unless ``initialize()`` has succeeded."""
        if not self._initialized:
            raise RuntimeError("Client not initialized")

    async def list_tools(self) -> "List[MCPTool]":
        """Fetch the server's tool definitions.

        Raises:
            RuntimeError: If the client is not initialized.
            MCPError: If the server answers with an error.
        """
        self._require_initialized()
        response = await self._send_request(MCPRequest(method=MCPMethods.TOOLS_LIST))
        if response.error:
            raise MCPError(f"Failed to list tools: {response.error}")
        return [
            MCPTool(
                name=t["name"],
                # Tolerate servers that leave the description out.
                description=t.get("description", ""),
                input_schema=t["inputSchema"],
                output_schema=t.get("outputSchema"),
                annotations=t.get("annotations")
            )
            for t in response.result.get("tools", [])
        ]

    async def call_tool(
        self,
        tool_name: str,
        arguments: Dict
    ) -> Dict:
        """Invoke ``tool_name`` with ``arguments`` and return the raw result.

        Raises:
            RuntimeError: If the client is not initialized.
            MCPError: If the server answers with an error.
        """
        self._require_initialized()
        request = MCPRequest(
            method=MCPMethods.TOOLS_CALL,
            params={
                "name": tool_name,
                "arguments": arguments
            }
        )
        response = await self._send_request(request)
        if response.error:
            raise MCPError(f"Tool call failed: {response.error}")
        return response.result

    async def list_resources(self) -> "List[MCPResource]":
        """Fetch the server's resource definitions.

        Raises:
            RuntimeError: If the client is not initialized.
            MCPError: If the server answers with an error.
        """
        self._require_initialized()
        response = await self._send_request(MCPRequest(method=MCPMethods.RESOURCES_LIST))
        if response.error:
            raise MCPError(f"Failed to list resources: {response.error}")
        return [
            MCPResource(
                uri=r["uri"],
                name=r["name"],
                description=r.get("description", ""),
                mime_type=r.get("mimeType")
            )
            for r in response.result.get("resources", [])
        ]

    async def read_resource(self, uri: str) -> Dict:
        """Read the resource at ``uri`` and return the raw result.

        Raises:
            RuntimeError: If the client is not initialized.
            MCPError: If the server answers with an error.
        """
        self._require_initialized()
        request = MCPRequest(
            method=MCPMethods.RESOURCES_READ,
            params={"uri": uri}
        )
        response = await self._send_request(request)
        if response.error:
            raise MCPError(f"Resource read failed: {response.error}")
        return response.result

    def _ensure_reader(self) -> None:
        """Start the background reader unless one is already active."""
        if self._reading:
            return
        if self._reader_task is None or self._reader_task.done():
            self._reader_task = asyncio.get_running_loop().create_task(
                self._handle_incoming()
            )

    def _fail_pending(self, reason: str) -> None:
        """Fail every outstanding request so callers need not wait for the timeout."""
        pending, self._pending_requests = self._pending_requests, {}
        for future in pending.values():
            if not future.done():
                future.set_exception(MCPError(reason))

    async def _send_request(self, request: "MCPRequest") -> "MCPResponse":
        """Send ``request`` and wait up to 30 seconds for its response.

        Raises:
            MCPError: On timeout, or if the connection ends first.
        """
        future = asyncio.get_running_loop().create_future()
        self._pending_requests[request.id] = future
        # Without an active reader nobody would ever resolve the future.
        self._ensure_reader()
        try:
            await self.transport.write(json.dumps(request.to_dict()))
            response_data = await asyncio.wait_for(future, timeout=30.0)
            return MCPResponse(
                request_id=response_data.get("id"),
                result=response_data.get("result"),
                error=response_data.get("error")
            )
        except asyncio.TimeoutError:
            raise MCPError(f"Request timeout: {request.method}")
        finally:
            self._pending_requests.pop(request.id, None)

    async def _send_notification(self, method: str, params: Dict):
        """Send a notification; no response is awaited."""
        notification = MCPNotification(method=method, params=params)
        await self.transport.write(json.dumps(notification.to_dict()))

    async def _handle_incoming(self):
        """Read messages until the transport ends, routing each one.

        Responses (an id plus ``result`` or ``error``) resolve the matching
        pending future; messages with a ``method`` go to
        ``_handle_notification``. Unparseable or non-object input is reported
        on stderr and skipped. Returns at once if another reader is active.
        """
        if self._reading:
            return
        self._reading = True
        try:
            async for message_str in self.transport.read():
                try:
                    message = json.loads(message_str)
                except json.JSONDecodeError:
                    print(f"Failed to parse message: {message_str}", file=sys.stderr)
                    continue
                if not isinstance(message, dict):
                    print(f"Failed to parse message: {message_str}", file=sys.stderr)
                    continue
                if "id" in message and ("result" in message or "error" in message):
                    future = self._pending_requests.pop(message["id"], None)
                    if future is not None and not future.done():
                        future.set_result(message)
                elif "method" in message:
                    await self._handle_notification(message)
        finally:
            self._reading = False
            # The stream is gone; nothing can answer what is still pending.
            self._fail_pending("Connection closed")

    async def _handle_notification(self, notification: Dict):
        """Print progress and log-message notifications; ignore the rest."""
        method = notification.get("method")
        params = notification.get("params", {})
        if method == MCPMethods.NOTIFICATIONS_PROGRESS:
            print(f"Progress: {params.get('message', '')}")
        elif method == MCPMethods.NOTIFICATIONS_MESSAGE:
            level = params.get("level", "info")
            message = params.get("message", "")
            print(f"[{level.upper()}] {message}")

    async def close(self):
        """Stop the background reader, if any, and close the transport."""
        task, self._reader_task = self._reader_task, None
        if task is not None and not task.done():
            task.cancel()
        await self.transport.close()
4.2 与LLM集成¶
Python
class MCPEnhancedLLM:
    """Chat wrapper that lets an LLM call tools served by an MCP client.

    NOTE(review): ``llm_client`` is assumed to expose an async
    ``generate(messages=..., tools=...)`` returning an object with
    ``content`` and ``tool_calls`` in an OpenAI-style shape; confirm against
    the actual client.
    """

    def __init__(self, llm_client, mcp_client: "MCPClient"):
        """Store both clients; tools are loaded later by ``initialize()``."""
        self.llm = llm_client
        self.mcp = mcp_client
        self._tools: List[MCPTool] = []

    async def initialize(self):
        """Initialize the MCP connection and cache the server's tool list."""
        await self.mcp.initialize(
            client_name="mcp-enhanced-llm",
            client_version="1.0.0"
        )
        self._tools = await self.mcp.list_tools()

    async def chat(self, user_message: str) -> str:
        """Answer ``user_message``, running at most one round of tool calls.

        If the first model reply requests tools, they are executed and the
        model is asked again with the results; otherwise the first reply is
        returned directly.
        """
        system_prompt = self._build_system_prompt()
        base_messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message}
        ]
        response = await self.llm.generate(
            messages=base_messages,
            tools=self._format_tools_for_llm()
        )
        if not response.tool_calls:
            return response.content

        tool_results = await self._execute_tool_calls(response.tool_calls)
        final_response = await self.llm.generate(
            messages=[
                *base_messages,
                # The assistant turn must carry the tool calls so that the
                # tool messages after it have something to refer back to.
                {
                    "role": "assistant",
                    "content": response.content,
                    "tool_calls": response.tool_calls
                },
                *self._format_tool_results(tool_results)
            ]
        )
        return final_response.content

    def _build_system_prompt(self) -> str:
        """Return the system prompt listing each tool as ``- name: description``."""
        tools_desc = "\n".join(
            f"- {tool.name}: {tool.description}"
            for tool in self._tools
        )
        return (
            "You are an AI assistant with access to external tools.\n"
            "Available tools:\n"
            f"{tools_desc}\n"
            "When you need to use a tool, respond with a tool call in the appropriate format.\n"
            "After receiving tool results, incorporate them into your response."
        )

    def _format_tools_for_llm(self) -> List[Dict]:
        """Convert the cached MCP tools into function-tool dicts for the LLM."""
        return [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.input_schema
                }
            }
            for tool in self._tools
        ]

    async def _execute_tool_calls(self, tool_calls: List[Dict]) -> List[Dict]:
        """Run each tool call in order and return one result dict per call.

        A failure, including malformed JSON arguments from the model, is
        reported as an ``"Error: ..."`` content string instead of aborting
        the whole chat turn.
        """
        results = []
        for call in tool_calls:
            tool_name = call["function"]["name"]
            try:
                arguments = json.loads(call["function"]["arguments"])
                result = await self.mcp.call_tool(tool_name, arguments)
                content = json.dumps(result)
            except Exception as e:
                content = f"Error: {str(e)}"
            results.append({
                "tool_call_id": call["id"],
                "role": "tool",
                "name": tool_name,
                "content": content
            })
        return results

    def _format_tool_results(self, results: List[Dict]) -> List[Dict]:
        """Reduce result dicts to the ``tool`` messages sent back to the LLM."""
        return [
            {
                "role": "tool",
                "tool_call_id": r["tool_call_id"],
                "content": r["content"]
            }
            for r in results
        ]
5. 工具生态系统¶
5.1 官方工具服务器¶
Text Only
┌─────────────────────────────────────────────────────────────────┐
│ MCP官方工具服务器(示意:部分包名并非官方发布,请以 modelcontextprotocol/servers 仓库为准) │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 开发工具 │
│ ├── @modelcontextprotocol/server-github # GitHub集成 │
│ ├── @modelcontextprotocol/server-git # Git操作 │
│ └── @modelcontextprotocol/server-postgres # PostgreSQL │
│ │
│ 搜索与信息 │
│ ├── @modelcontextprotocol/server-brave-search # Brave搜索 │
│ ├── @modelcontextprotocol/server-fetch # HTTP请求 │
│ └── @modelcontextprotocol/server-puppeteer # 浏览器自动化 │
│ │
│ 文件与存储 │
│ ├── @modelcontextprotocol/server-filesystem # 文件系统 │
│ ├── @modelcontextprotocol/server-s3 # AWS S3 │
│ └── @modelcontextprotocol/server-sqlite # SQLite │
│ │
│ 通信与协作 │
│ ├── @modelcontextprotocol/server-slack # Slack │
│ ├── @modelcontextprotocol/server-telegram # Telegram │
│ └── @modelcontextprotocol/server-discord # Discord │
│ │
│ 商业与支付 │
│ ├── @modelcontextprotocol/server-stripe # Stripe支付 │
│ └── @modelcontextprotocol/server-shopify # Shopify │
│ │
└─────────────────────────────────────────────────────────────────┘
5.2 社区工具生态¶
Python
class MCPToolRegistry:
    """In-memory catalogue of MCP servers, keyed by server id."""

    def __init__(self):
        """Start with an empty catalogue."""
        self._servers: Dict[str, ServerInfo] = {}

    def register(self, server_info: "ServerInfo"):
        """Add ``server_info``, replacing any entry with the same id."""
        self._servers[server_info.id] = server_info

    def search(self, query: str, category: str = None) -> "List[ServerInfo]":
        """Return servers whose name, description or tags contain ``query``.

        Matching is a case-insensitive substring test. When ``category`` is
        given, only servers in exactly that category are considered. Results
        keep registration order.
        """
        needle = query.lower()  # lower-case once instead of per comparison
        results = []
        for server in self._servers.values():
            if category and server.category != category:
                continue
            haystacks = [server.name, server.description]
            # A server without tags (None) is searched by name/description only.
            haystacks.extend(server.tags or ())
            if any(needle in text.lower() for text in haystacks):
                results.append(server)
        return results

    def get_install_command(self, server_id: str) -> str:
        """Return the shell command associated with a registered server.

        npm, pip and docker packages get a generated command; any other
        package manager falls back to the server's own ``install_command``.

        Raises:
            ValueError: If ``server_id`` is not registered.
        """
        server = self._servers.get(server_id)
        if not server:
            raise ValueError(f"Server not found: {server_id}")
        manager = server.package_manager
        if manager == "npm":
            return f"npx {server.package_name}"
        if manager == "pip":
            return f"pip install {server.package_name}"
        if manager == "docker":
            return f"docker run {server.docker_image}"
        return server.install_command
# Example: ServerInfo entries describing community-maintained MCP servers.
# NOTE(review): package names and GitHub URLs below are illustrative and have
# not been verified here — confirm each against its upstream before relying on it.
COMMUNITY_SERVERS = [
ServerInfo(
id="notion",
name="Notion MCP Server",
description="Read and write Notion pages and databases",
category="productivity",
tags=["notion", "wiki", "documentation"],
package_manager="npm",
package_name="@suekou/mcp-notion-server",
author="suekou",
github_url="https://github.com/suekou/mcp-notion-server"
),
ServerInfo(
id="obsidian",
name="Obsidian MCP Server",
description="Interact with Obsidian vaults",
category="productivity",
tags=["obsidian", "notes", "markdown"],
package_manager="npm",
package_name="@mcp-get/obsidian-mcp-server",
author="community",
github_url="https://github.com/calclavia/mcp-obsidian"
),
ServerInfo(
id="browserbase",
name="Browserbase MCP Server",
description="Cloud browser automation",
category="automation",
tags=["browser", "automation", "scraping"],
package_manager="npm",
package_name="@browserbasehq/mcp-server-browserbase",
author="Browserbase",
github_url="https://github.com/browserbase/mcp-server-browserbase"
),
ServerInfo(
id="supabase",
name="Supabase MCP Server",
description="Manage Supabase databases and auth",
category="database",
tags=["supabase", "postgresql", "backend"],
package_manager="npm",
package_name="@supabase/mcp-server-supabase",
author="Supabase",
github_url="https://github.com/supabase/supabase-mcp"
),
ServerInfo(
id="cloudflare",
name="Cloudflare MCP Server",
description="Manage Cloudflare workers and resources",
category="cloud",
tags=["cloudflare", "cdn", "serverless"],
package_manager="npm",
package_name="@cloudflare/mcp-server-cloudflare",
author="Cloudflare",
github_url="https://github.com/cloudflare/mcp-server-cloudflare"
),
ServerInfo(
id="vercel",
name="Vercel MCP Server",
description="Deploy and manage Vercel projects",
category="cloud",
tags=["vercel", "deployment", "nextjs"],
package_manager="npm",
package_name="@vercel/mcp-server-vercel",
author="Vercel",
github_url="https://github.com/vercel/mcp-server-vercel"
)
]
5.3 工具组合与编排¶
Python
class MCPToolComposer:
    """Runs multi-step workflows whose steps call tools on several MCP servers.

    A step names its tool as ``"<server>.<tool>"``. Its arguments may contain
    ``${reference}`` placeholders that are resolved against the workflow
    context before the call.
    """

    # Matches one ${...} placeholder and captures the reference path.
    _REF_PATTERN = re.compile(r"\$\{([^}]+)\}")

    def __init__(self):
        """Start with no servers and no workflows."""
        self._clients: Dict[str, MCPClient] = {}
        self._workflows: Dict[str, Workflow] = {}

    async def add_server(self, name: str, transport: "MCPTransport"):
        """Connect to a server over ``transport`` and register it as ``name``."""
        client = MCPClient(transport)
        await client.initialize(
            client_name="tool-composer",
            client_version="1.0.0"
        )
        self._clients[name] = client

    def define_workflow(self, name: str, steps: "List[WorkflowStep]"):
        """Store a workflow under ``name``, replacing any earlier definition."""
        self._workflows[name] = Workflow(name=name, steps=steps)

    async def execute_workflow(
        self,
        workflow_name: str,
        inputs: Dict
    ) -> "WorkflowResult":
        """Run the steps of ``workflow_name`` in order and return the result.

        Each step's result is stored in the context under the step's name so
        later steps can reference it.

        Raises:
            ValueError: If the workflow or a referenced server is unknown, or
                a ``tool_ref`` is not of the form ``"<server>.<tool>"``.
        """
        workflow = self._workflows.get(workflow_name)
        if not workflow:
            raise ValueError(f"Workflow not found: {workflow_name}")
        context = WorkflowContext(inputs=inputs)
        for step in workflow.steps:
            # Split on the first dot only, so tool names may contain dots.
            server_name, separator, tool_name = step.tool_ref.partition(".")
            if not separator or not tool_name:
                raise ValueError(f"Invalid tool reference: {step.tool_ref}")
            client = self._clients.get(server_name)
            if not client:
                raise ValueError(f"Server not found: {server_name}")
            arguments = self._resolve_arguments(step.arguments, context)
            result = await client.call_tool(tool_name, arguments)
            context.set_step_result(step.name, result)
        return WorkflowResult(
            outputs=context.outputs,
            execution_log=context.log
        )

    def _resolve_arguments(
        self,
        arguments: Dict,
        context: "WorkflowContext"
    ) -> Dict:
        """Return a copy of ``arguments`` with ``${...}`` references resolved.

        A string that is exactly one placeholder is replaced by the referenced
        value itself, keeping its type. Placeholders embedded in a longer
        string are replaced by ``str()`` of the referenced value. Non-string
        values and strings without placeholders are passed through unchanged.
        """
        def substitute(match):
            return str(context.resolve_reference(match.group(1)))

        resolved = {}
        for key, value in arguments.items():
            if not isinstance(value, str):
                resolved[key] = value
                continue
            whole = self._REF_PATTERN.fullmatch(value)
            if whole:
                resolved[key] = context.resolve_reference(whole.group(1))
            else:
                resolved[key] = self._REF_PATTERN.sub(substitute, value)
        return resolved
# 示例:数据分析工作流
async def data_analysis_workflow_example():
    """Example: build and run a three-step "daily_report" workflow.

    Registers the ``filesystem``, ``postgres`` and ``slack`` servers, then
    chains a SQL query, a file write and a Slack notification, and returns
    the result of executing that workflow with empty inputs.
    """
    composer = MCPToolComposer()

    # Register the servers one after another, each over its own transport.
    for server_name in ("filesystem", "postgres", "slack"):
        await composer.add_server(server_name, StdioTransport())

    # Step 1: fetch today's sales rows.
    query_step = WorkflowStep(
        name="query_data",
        tool_ref="postgres.query",
        arguments={"sql": "SELECT * FROM sales WHERE date = CURRENT_DATE"},
    )
    # Step 2: write the query result to a report file.
    report_step = WorkflowStep(
        name="generate_report",
        tool_ref="filesystem.write_file",
        arguments={
            "path": "/reports/daily_sales.md",
            "content": "${query_data.result}",
        },
    )
    # Step 3: tell the team where the report is.
    notify_step = WorkflowStep(
        name="notify_team",
        tool_ref="slack.send_message",
        arguments={
            "channel": "#sales",
            "message": "Daily report generated: ${generate_report.path}",
        },
    )

    composer.define_workflow(
        name="daily_report",
        steps=[query_step, report_step, notify_step],
    )

    return await composer.execute_workflow("daily_report", inputs={})
6. 安全与权限管理¶
6.1 安全模型¶
Python
class MCPSecurityManager:
    """Per-client permission registry with an in-memory audit trail."""

    def __init__(self):
        self._permissions: Dict[str, PermissionSet] = {}
        self._audit_log: List[AuditEntry] = []

    def grant_permission(
        self,
        client_id: str,
        resource: str,
        actions: List[str]
    ):
        """Grant ``actions`` on ``resource`` to ``client_id``.

        A permission set is created for the client on first use.
        """
        if client_id in self._permissions:
            target = self._permissions[client_id]
        else:
            target = self._permissions[client_id] = PermissionSet()
        target.grant(resource, actions)

    def check_permission(
        self,
        client_id: str,
        resource: str,
        action: str
    ) -> bool:
        """Return whether ``client_id`` may perform ``action`` on ``resource``.

        Clients without any permission set are always denied.
        """
        perm_set = self._permissions.get(client_id)
        return perm_set.check(resource, action) if perm_set else False

    def log_access(
        self,
        client_id: str,
        resource: str,
        action: str,
        result: str
    ):
        """Append an audit entry, stamped with the current local time."""
        self._audit_log.append(
            AuditEntry(
                timestamp=datetime.now(),
                client_id=client_id,
                resource=resource,
                action=action,
                result=result
            )
        )
class PermissionSet:
    """Allow-list mapping resource patterns to the actions granted on them.

    Patterns use shell-style wildcards. Matching is always case-sensitive so
    that a permission decision does not depend on the host platform.
    """

    def __init__(self):
        self._grants: Dict[str, List[str]] = {}

    def grant(self, resource: str, actions: List[str]):
        """Grant ``actions`` on the pattern ``resource``.

        Actions already granted for the pattern are not recorded twice. An
        empty ``actions`` list registers the pattern without granting
        anything.
        """
        granted = self._grants.setdefault(resource, [])
        for action in actions:
            if action not in granted:
                granted.append(action)

    def check(self, resource: str, action: str) -> bool:
        """Return True if any matching pattern grants ``action`` (or ``"*"``)."""
        return any(
            self._match_resource(pattern, resource)
            and ("*" in granted or action in granted)
            for pattern, granted in self._grants.items()
        )

    def _match_resource(self, pattern: str, resource: str) -> bool:
        """Case-sensitive shell-style match of ``resource`` against ``pattern``.

        NOTE: ``*`` also matches ``/``, so callers should normalise resource
        paths (e.g. collapse ``..`` segments) before checking them.
        """
        from fnmatch import fnmatchcase

        # fnmatch.fnmatch() case-normalises on some platforms; fnmatchcase()
        # never does, which is what a security check needs.
        return fnmatchcase(resource, pattern)
# 安全配置示例
# Example security configuration: per-client permissions, rate limits and
# audit settings.
SECURITY_CONFIG = {
    # Permissions keyed by client id.
    "clients": {
        "claude-desktop": {
            "permissions": [
                {
                    # Read/write access limited to the projects directory.
                    "resource": "filesystem:///home/user/projects/*",
                    "actions": ["read", "write"]
                },
                {
                    "resource": "filesystem:///etc/*",
                    # Access forbidden: no actions are listed.
                    # NOTE(review): with an allow-list check such as
                    # PermissionSet.check, an empty list grants nothing but
                    # does not override a broader grant - confirm intent.
                    "actions": []
                },
                {
                    "resource": "github://*",
                    "actions": ["read", "write"]
                }
            ]
        }
    },
    # Request quotas applied by default.
    "rate_limits": {
        "default": {
            "requests_per_minute": 60,
            "requests_per_hour": 1000
        }
    },
    # Audit logging settings.
    "audit": {
        "log_all_requests": True,
        "retention_days": 30
    }
}
6.2 安全最佳实践¶
Python
class MCPSecurityBestPractices:
    """Illustrative MCP security best practices.

    Every static method demonstrates one practice and returns the artifact
    it illustrates (example configs or a helper class), so the examples can
    be inspected or reused instead of being discarded as unused locals.
    """

    @staticmethod
    def principle_of_least_privilege():
        """Principle of least privilege.

        1. Grant only the permissions that are needed.
        2. Restrict access by resource path.
        3. Review permissions regularly.

        Returns:
            A dict with a ``"good"`` (narrow) and a ``"bad"`` (overly broad)
            example permission.
        """
        # Good practice: restricted to one specific directory.
        good_permission = {
            "resource": "filesystem:///home/user/project/*",
            "actions": ["read", "write"]
        }
        # Bad practice: far too broad.
        bad_permission = {
            "resource": "filesystem:///*",
            "actions": ["*"]
        }
        return {"good": good_permission, "bad": bad_permission}

    @staticmethod
    def input_validation():
        """Input validation.

        1. Validate that paths are legal.
        2. Check argument types.
        3. Guard against injection attacks.

        Returns:
            The ``InputValidator`` helper class.
        """

        class InputValidator:
            @staticmethod
            def validate_path(path: str, allowed_prefix: str) -> bool:
                """Return True if ``path`` lies inside ``allowed_prefix``.

                Both paths are normalised lexically; symlinks are NOT
                resolved, so resolve them first when that matters.
                """
                import os

                normalized = os.path.normpath(path)
                prefix = os.path.normpath(allowed_prefix)

                # After normalisation a remaining ".." component means the
                # path climbs above its starting point. Compare whole
                # components so names like "a..b.txt" stay valid.
                if ".." in normalized.split(os.sep):
                    return False

                # Match on a directory boundary: a plain startswith() would
                # accept "/home/user/project_evil" for "/home/user/project".
                if normalized == prefix:
                    return True
                return normalized.startswith(prefix.rstrip(os.sep) + os.sep)

            @staticmethod
            def sanitize_sql(query: str) -> str:
                """Reject queries containing a few well-known dangerous tokens.

                This is a naive deny-list for illustration only; real code
                must use parameterized queries.

                Raises:
                    ValueError: If a dangerous pattern is found.
                """
                lowered = query.lower()
                dangerous = [";", "--", "/*", "*/", "xp_"]
                for d in dangerous:
                    if d in lowered:
                        raise ValueError(f"Potentially dangerous SQL pattern: {d}")
                return query

        return InputValidator

    @staticmethod
    def audit_logging():
        """Audit logging.

        1. Record every tool call.
        2. Record permission checks.
        3. Retain logs long enough.

        Returns:
            The ``AuditLogger`` helper class.
        """
        import hashlib
        import json
        from datetime import datetime

        class AuditLogger:
            def __init__(self, storage):
                # ``storage`` must provide a ``store(entry)`` method.
                self.storage = storage

            def log_tool_call(
                self,
                client_id: str,
                tool_name: str,
                arguments: Dict,
                result: Any,
                duration_ms: int
            ):
                """Store one audit entry describing a tool call.

                Only a hash of the arguments is recorded; a call counts as
                successful when ``result`` is not None.
                """
                entry = {
                    "timestamp": datetime.now().isoformat(),
                    "type": "tool_call",
                    "client_id": client_id,
                    "tool_name": tool_name,
                    "arguments_hash": self._hash_arguments(arguments),
                    "success": result is not None,
                    "duration_ms": duration_ms
                }
                self.storage.store(entry)

            def _hash_arguments(self, arguments: Dict) -> str:
                """Return a short, key-order-independent SHA-256 digest of the arguments."""
                json_str = json.dumps(arguments, sort_keys=True)
                return hashlib.sha256(json_str.encode()).hexdigest()[:16]

        return AuditLogger

    @staticmethod
    def secure_transport():
        """Secure transport.

        1. Encrypt with TLS.
        2. Verify the server's identity.
        3. Prevent man-in-the-middle attacks.

        Returns:
            A dict with example ``"sse"`` and ``"websocket"`` configurations.
        """
        # SSE over HTTPS
        secure_sse_config = {
            "endpoint": "https://mcp-server.example.com/sse",
            "verify_ssl": True,
            "client_cert": "/path/to/client.crt",
            "client_key": "/path/to/client.key"
        }
        # WebSocket over WSS
        secure_ws_config = {
            "url": "wss://mcp-server.example.com/ws",
            "ssl_context": "custom_ssl_context"
        }
        return {"sse": secure_sse_config, "websocket": secure_ws_config}
7. 实践项目:构建MCP服务器¶
7.1 项目概述¶
构建一个天气查询MCP服务器,提供以下功能:
1. 查询当前天气
2. 查询天气预报
3. 查询空气质量指数
7.2 完整实现¶
Python
#!/usr/bin/env python3
"""
Weather MCP Server
提供天气查询功能的MCP服务器
"""
import asyncio
import json
import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Awaitable, Callable, Dict, List, Optional

import aiohttp
@dataclass
class WeatherConfig:
    """Configuration for the weather service."""
    # API key sent with every request to the weather API.
    api_key: str
    # Root URL that endpoint paths are appended to.
    base_url: str = "https://api.weatherapi.com/v1"
    # Default location name.
    default_location: str = "Beijing"
class WeatherMCPServer:
    """Weather MCP server speaking line-delimited JSON-RPC 2.0 over stdio.

    Exposes three tools (current weather, forecast, air quality) backed by
    the HTTP API configured in ``WeatherConfig`` and two static JSON
    resources (icons, cities).
    """

    def __init__(self, config: WeatherConfig):
        self.config = config
        self.name = "weather-server"
        self.version = "1.0.0"
        # Tool/resource descriptors and the coroutine handlers serving them.
        self.tools: Dict[str, Dict] = {}
        self.tool_handlers: Dict[str, Callable[..., Awaitable[str]]] = {}
        self.resources: Dict[str, Dict] = {}
        self.resource_handlers: Dict[str, Callable[[], Awaitable[str]]] = {}
        self._setup_tools()
        self._setup_resources()

    # ------------------------------------------------------------------
    # Registration
    # ------------------------------------------------------------------

    def _setup_tools(self):
        """Register the tool descriptors and their handlers."""
        # Current weather
        self.tools["get_current_weather"] = {
            "name": "get_current_weather",
            "description": "获取指定位置的当前天气信息",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "城市名称或坐标(如:Beijing, 39.9,116.4)"
                    },
                    "units": {
                        "type": "string",
                        "enum": ["metric", "imperial"],
                        "description": "温度单位",
                        "default": "metric"
                    }
                },
                "required": ["location"]
            }
        }
        self.tool_handlers["get_current_weather"] = self._handle_current_weather

        # Forecast
        self.tools["get_forecast"] = {
            "name": "get_forecast",
            "description": "获取未来几天的天气预报",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "城市名称或坐标"
                    },
                    "days": {
                        "type": "integer",
                        "description": "预报天数(1-10)",
                        "minimum": 1,
                        "maximum": 10,
                        "default": 3
                    }
                },
                "required": ["location"]
            }
        }
        self.tool_handlers["get_forecast"] = self._handle_forecast

        # Air quality
        self.tools["get_air_quality"] = {
            "name": "get_air_quality",
            "description": "获取指定位置的空气质量指数",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "城市名称或坐标"
                    }
                },
                "required": ["location"]
            }
        }
        self.tool_handlers["get_air_quality"] = self._handle_air_quality

    def _setup_resources(self):
        """Register the resource descriptors and their handlers."""
        # Weather icon set
        self.resources["weather://icons"] = {
            "uri": "weather://icons",
            "name": "Weather Icons",
            "description": "天气状况图标集合",
            "mimeType": "application/json"
        }
        self.resource_handlers["weather://icons"] = self._handle_icons_resource

        # List of supported cities
        self.resources["weather://cities"] = {
            "uri": "weather://cities",
            "name": "Supported Cities",
            "description": "支持查询天气的城市列表",
            "mimeType": "application/json"
        }
        self.resource_handlers["weather://cities"] = self._handle_cities_resource

    # ------------------------------------------------------------------
    # Upstream HTTP access
    # ------------------------------------------------------------------

    def _base_params(self, location: str) -> Dict:
        """Query parameters shared by every upstream request."""
        return {"key": self.config.api_key, "q": location, "aqi": "yes"}

    async def _fetch_json(self, endpoint: str, params: Dict) -> Dict:
        """GET ``<base_url>/<endpoint>`` and return the decoded JSON body.

        Raises:
            RuntimeError: On a non-200 status. The message carries the
                upstream error text when the error body is JSON, otherwise
                "Unknown error".
        """
        url = f"{self.config.base_url}/{endpoint}"
        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=params) as response:
                if response.status != 200:
                    try:
                        # content_type=None: error pages are not always
                        # served as application/json.
                        error_data = await response.json(content_type=None)
                    except (aiohttp.ContentTypeError, ValueError):
                        error_data = None
                    message = "Unknown error"
                    if isinstance(error_data, dict):
                        error = error_data.get("error")
                        if isinstance(error, dict):
                            message = error.get("message", message)
                    raise RuntimeError(f"API error: {message}")
                return await response.json()

    @staticmethod
    def _format_location(location_info: Dict) -> str:
        """Render the upstream location record as ``"<name>, <country>"``."""
        return f"{location_info['name']}, {location_info['country']}"

    # ------------------------------------------------------------------
    # Tool handlers
    # ------------------------------------------------------------------

    async def _handle_current_weather(self, location: str, units: str = "metric") -> str:
        """Return the current weather for ``location`` as a JSON string.

        Raises:
            ValueError: If ``units`` is neither "metric" nor "imperial".
        """
        if units not in ("metric", "imperial"):
            raise ValueError(f"Unsupported units: {units!r}")

        data = await self._fetch_json("current.json", self._base_params(location))
        current = data["current"]
        location_info = data["location"]

        metric = units == "metric"
        temp_unit = "°C" if metric else "°F"
        temp = current["temp_c"] if metric else current["temp_f"]
        feels_like = current["feelslike_c"] if metric else current["feelslike_f"]
        # The air-quality block may be absent; fall back to "N/A" per field.
        air_quality = current.get("air_quality", {})

        result = {
            "location": self._format_location(location_info),
            "local_time": location_info["localtime"],
            "temperature": f"{temp}{temp_unit}",
            "feels_like": f"{feels_like}{temp_unit}",
            "condition": current["condition"]["text"],
            "humidity": f"{current['humidity']}%",
            "wind": f"{current['wind_kph']} km/h {current['wind_dir']}",
            "uv_index": current["uv"],
            "visibility": f"{current['vis_km']} km",
            "air_quality": {
                "pm2_5": air_quality.get("pm2_5", "N/A"),
                "pm10": air_quality.get("pm10", "N/A"),
                "us_epa_index": air_quality.get("us-epa-index", "N/A")
            }
        }
        return json.dumps(result, indent=2, ensure_ascii=False)

    async def _handle_forecast(self, location: str, days: int = 3) -> str:
        """Return a ``days``-day forecast for ``location`` as a JSON string.

        Raises:
            ValueError: If ``days`` is not an integer in 1..10 (the range
                declared in the tool's input schema).
        """
        if isinstance(days, bool) or not isinstance(days, int) or not 1 <= days <= 10:
            raise ValueError(f"days must be an integer between 1 and 10, got {days!r}")

        params = {**self._base_params(location), "days": days, "alerts": "yes"}
        data = await self._fetch_json("forecast.json", params)

        forecast_list = []
        for day in data["forecast"]["forecastday"]:
            day_data = day["day"]
            forecast_list.append({
                "date": day["date"],
                "condition": day_data["condition"]["text"],
                "max_temp_c": day_data["maxtemp_c"],
                "min_temp_c": day_data["mintemp_c"],
                "avg_temp_c": day_data["avgtemp_c"],
                "max_wind_kph": day_data["maxwind_kph"],
                "avg_humidity": day_data["avghumidity"],
                "chance_of_rain": day_data["daily_chance_of_rain"],
                "chance_of_snow": day_data["daily_chance_of_snow"],
                "uv_index": day_data["uv"]
            })

        result = {
            "location": self._format_location(data["location"]),
            "forecast": forecast_list
        }
        return json.dumps(result, indent=2, ensure_ascii=False)

    async def _handle_air_quality(self, location: str) -> str:
        """Return air-quality readings for ``location`` as a JSON string."""
        data = await self._fetch_json("current.json", self._base_params(location))
        location_info = data["location"]
        aqi = data["current"].get("air_quality", {})

        result = {
            "location": self._format_location(location_info),
            "local_time": location_info["localtime"],
            "air_quality": {
                "co": aqi.get("co", "N/A"),
                "no2": aqi.get("no2", "N/A"),
                "o3": aqi.get("o3", "N/A"),
                "so2": aqi.get("so2", "N/A"),
                "pm2_5": aqi.get("pm2_5", "N/A"),
                "pm10": aqi.get("pm10", "N/A"),
                "us_epa_index": aqi.get("us-epa-index", "N/A"),
                "gb_defra_index": aqi.get("gb-defra-index", "N/A")
            }
        }
        return json.dumps(result, indent=2, ensure_ascii=False)

    # ------------------------------------------------------------------
    # Resource handlers
    # ------------------------------------------------------------------

    async def _handle_icons_resource(self) -> str:
        """Return the condition-name -> emoji mapping as a JSON string."""
        icons = {
            "sunny": "☀️",
            "partly_cloudy": "⛅",
            "cloudy": "☁️",
            "rainy": "🌧️",
            "snowy": "🌨️",
            "stormy": "⛈️",
            "foggy": "🌫️",
            "windy": "💨"
        }
        return json.dumps(icons, indent=2)

    async def _handle_cities_resource(self) -> str:
        """Return the static list of supported cities as a JSON string."""
        cities = [
            {"name": "Beijing", "country": "China", "lat": 39.9, "lon": 116.4},
            {"name": "Shanghai", "country": "China", "lat": 31.2, "lon": 121.5},
            {"name": "Tokyo", "country": "Japan", "lat": 35.7, "lon": 139.7},
            {"name": "New York", "country": "USA", "lat": 40.7, "lon": -74.0},
            {"name": "London", "country": "UK", "lat": 51.5, "lon": -0.1},
            {"name": "Paris", "country": "France", "lat": 48.9, "lon": 2.3},
            {"name": "Sydney", "country": "Australia", "lat": -33.9, "lon": 151.2}
        ]
        return json.dumps(cities, indent=2)

    # ------------------------------------------------------------------
    # JSON-RPC plumbing
    # ------------------------------------------------------------------

    @staticmethod
    def _result(request_id: Any, result: Dict) -> Dict:
        """Build a JSON-RPC success response."""
        return {"jsonrpc": "2.0", "id": request_id, "result": result}

    @staticmethod
    def _error(request_id: Any, code: int, message: str) -> Dict:
        """Build a JSON-RPC error response."""
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {"code": code, "message": message}
        }

    async def handle_request(self, request: Dict) -> Optional[Dict]:
        """Dispatch one decoded JSON-RPC message.

        Returns:
            The response dict, or ``None`` when ``request`` is a
            notification (it has no ``"id"`` member), because JSON-RPC 2.0
            forbids replying to notifications.
        """
        if not isinstance(request, dict):
            # Batches and scalars are not supported by this server.
            return self._error(None, -32600, "Invalid Request")

        if "id" not in request:
            # Notification such as "notifications/initialized": nothing to
            # do and nothing to send back.
            return None

        method = request.get("method")
        params = request.get("params") or {}
        request_id = request["id"]

        try:
            if method == "initialize":
                return self._handle_initialize(request_id, params)
            elif method == "tools/list":
                return self._handle_tools_list(request_id)
            elif method == "tools/call":
                return await self._handle_tools_call(request_id, params)
            elif method == "resources/list":
                return self._handle_resources_list(request_id)
            elif method == "resources/read":
                return await self._handle_resources_read(request_id, params)
            elif method == "ping":
                return self._result(request_id, {})
            else:
                return self._error(request_id, -32601, f"Method not found: {method}")
        except Exception as e:
            # Protocol boundary: turn any unexpected failure into an
            # "internal error" response instead of killing the server loop.
            return self._error(request_id, -32603, str(e))

    def _handle_initialize(self, request_id: Any, params: Dict) -> Dict:
        """Answer ``initialize`` with protocol version, capabilities and server info."""
        return self._result(request_id, {
            "protocolVersion": "2024-11-05",
            "capabilities": {
                "tools": {},
                "resources": {}
            },
            "serverInfo": {
                "name": self.name,
                "version": self.version
            }
        })

    def _handle_tools_list(self, request_id: Any) -> Dict:
        """Answer ``tools/list`` with every registered tool descriptor."""
        return self._result(request_id, {"tools": list(self.tools.values())})

    async def _handle_tools_call(self, request_id: Any, params: Dict) -> Dict:
        """Answer ``tools/call`` by running the named tool with its arguments.

        Unknown tools and non-object arguments yield error -32602; any
        exception raised by the handler yields error -32603.
        """
        tool_name = params.get("name")
        # Treat an explicit null the same as missing arguments.
        arguments = params.get("arguments") or {}

        if tool_name not in self.tool_handlers:
            return self._error(request_id, -32602, f"Tool not found: {tool_name}")
        if not isinstance(arguments, dict):
            return self._error(request_id, -32602, "Tool arguments must be an object")

        try:
            handler = self.tool_handlers[tool_name]
            result = await handler(**arguments)
        except Exception as e:
            return self._error(request_id, -32603, f"Tool execution failed: {str(e)}")
        return self._result(request_id, {
            "content": [{"type": "text", "text": result}]
        })

    def _handle_resources_list(self, request_id: Any) -> Dict:
        """Answer ``resources/list`` with every registered resource descriptor."""
        return self._result(request_id, {"resources": list(self.resources.values())})

    async def _handle_resources_read(self, request_id: Any, params: Dict) -> Dict:
        """Answer ``resources/read`` with the text content of the requested URI."""
        uri = params.get("uri")
        if uri not in self.resource_handlers:
            return self._error(request_id, -32602, f"Resource not found: {uri}")

        try:
            handler = self.resource_handlers[uri]
            content = await handler()
        except Exception as e:
            return self._error(request_id, -32603, f"Resource read failed: {str(e)}")
        return self._result(request_id, {
            "contents": [{
                "uri": uri,
                "mimeType": self.resources[uri].get("mimeType"),
                "text": content
            }]
        })

    async def run(self):
        """Serve requests read line by line from stdin until EOF.

        Responses go to stdout, one JSON document per line; diagnostics go
        to stderr so they never corrupt the protocol stream.
        """
        print(f"Weather MCP Server v{self.version} starting...", file=sys.stderr)
        loop = asyncio.get_running_loop()
        while True:
            try:
                # readline() blocks, so run it in the default executor.
                line = await loop.run_in_executor(None, sys.stdin.readline)
                if not line:
                    break  # EOF: the client closed the pipe.
                line = line.strip()
                if not line:
                    continue
                try:
                    request = json.loads(line)
                except json.JSONDecodeError as e:
                    response = self._error(None, -32700, f"Parse error: {str(e)}")
                else:
                    response = await self.handle_request(request)
                # Notifications produce no response.
                if response is not None:
                    print(json.dumps(response), flush=True)
            except Exception as e:
                print(f"Error: {e}", file=sys.stderr)
async def main():
    """Entry point: build the server from the environment and run it.

    Reads the API key from ``WEATHER_API_KEY``; when it is missing or empty
    an error is printed to stderr and the process exits with status 1.
    """
    import os

    api_key = os.environ.get("WEATHER_API_KEY")
    if api_key:
        await WeatherMCPServer(WeatherConfig(api_key=api_key)).run()
        return

    print("Error: WEATHER_API_KEY environment variable not set", file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())
7.3 配置与使用¶
Text Only
// claude_desktop_config.json
{
"mcpServers": {
"weather": {
"command": "python",
"args": ["/path/to/weather_mcp_server.py"],
"env": {
"WEATHER_API_KEY": "your-api-key-here"
}
}
}
}
Bash
# 安装依赖
pip install aiohttp
# 设置API密钥
export WEATHER_API_KEY="your-weatherapi-key"
# 运行服务器
python weather_mcp_server.py
7.4 测试客户端¶
Python
# test_weather_client.py
import asyncio
import json
from weather_mcp_server import WeatherMCPServer, WeatherConfig
async def test_server():
    """Smoke-test the weather MCP server by sending it two requests in-process.

    Sends ``initialize`` followed by ``tools/list`` straight to
    ``handle_request`` and prints each response as indented JSON.
    """
    server = WeatherMCPServer(WeatherConfig(api_key="test-key"))

    init_request = {
        "jsonrpc": "2.0",
        "id": "1",
        "method": "initialize",
        "params": {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "test-client", "version": "1.0"}
        }
    }
    tools_request = {
        "jsonrpc": "2.0",
        "id": "2",
        "method": "tools/list"
    }

    # (label, request) pairs, sent in order.
    exchanges = [
        ("Initialize response:", init_request),
        ("\nTools list:", tools_request),
    ]
    for label, request in exchanges:
        response = await server.handle_request(request)
        print(label, json.dumps(response, indent=2))


if __name__ == "__main__":
    asyncio.run(test_server())
总结¶
MCP协议代表了AI工具集成的标准化方向,其核心价值在于:
- 标准化接口:统一的协议规范使工具开发和集成更加简单
- 开放生态:开源协议促进社区贡献和工具共享
- 安全可控:内置权限管理和审计机制
- 灵活部署:支持多种传输方式和部署模式
随着MCP生态的发展,我们可以期待:
- 更多官方和社区工具服务器
- 更完善的权限和安全机制
- 与更多AI平台的深度集成
- 工具编排和自动化工作流
MCP正在成为AI应用开发的基础设施,类似于HTTP对于Web的意义。
参考资源¶
官方资源¶
- MCP Specification: https://modelcontextprotocol.io
- MCP GitHub: https://github.com/modelcontextprotocol
- Python SDK: https://github.com/modelcontextprotocol/python-sdk
- TypeScript SDK: https://github.com/modelcontextprotocol/typescript-sdk
社区资源¶
- MCP Servers Collection: https://github.com/modelcontextprotocol/servers
- Awesome MCP: https://github.com/punkpeye/awesome-mcp-servers
- MCP Inspector: https://github.com/modelcontextprotocol/inspector
教程与文档¶
- MCP Quick Start: https://modelcontextprotocol.io/quickstart
- MCP Server Development: https://modelcontextprotocol.io/development
- Security Best Practices: https://modelcontextprotocol.io/security
文档版本: 1.0 作者: AI Learning Team
最后更新日期:2026-02-12 适用版本:LLM学习教程 v2026