项目5 - 多智能体协作¶
难度: ⭐⭐⭐⭐⭐ 专家级 预计时间: 5-8小时 目标: 实现多智能体协作/竞争策略
1. 项目介绍¶
1.1 多智能体强化学习¶
与单智能体的区别: - 多个智能体同时学习和决策 - 智能体之间可能存在协作或竞争关系 - 环境非平稳(其他智能体的策略在变化)
典型场景: - 机器人团队协作 - 自动驾驶车队 - 游戏AI(如Dota2、星际争霸) - 资源分配
1.2 问题分类¶
| 类型 | 描述 | 示例 |
|---|---|---|
| 完全协作 | 所有智能体共享相同奖励 | 团队任务 |
| 完全竞争 | 零和博弈 | 棋类游戏 |
| 混合 | 既有协作又有竞争 | 足球比赛 |
1.3 成功标准¶
- 智能体学会协作策略
- 系统整体性能优于单智能体
- 良好的可扩展性
2. 多智能体环境¶
2.1 环境设计¶
Python
import numpy as np
import gymnasium as gym
from gymnasium import spaces
class MultiAgentEnv:
    """Base multi-agent grid-world environment (fully cooperative).

    Each of ``n_agents`` agents occupies a cell on a ``grid_size`` x
    ``grid_size`` grid and must reach its own target cell.  All agents
    receive the same shared team reward, making this a fully
    cooperative setting.
    """

    def __init__(self, n_agents=2, grid_size=10):
        self.n_agents = n_agents
        self.grid_size = grid_size
        # Per-agent observation: a 3-channel grid image
        # (channel 0: self, channel 1: other agents, channel 2: targets).
        self.observation_space = spaces.Box(
            low=0, high=1, shape=(grid_size, grid_size, 3), dtype=np.float32
        )
        self.action_space = spaces.Discrete(5)  # 0: stay, 1-4: up/down/left/right
        self.reset()

    def reset(self):
        """Reset the environment.

        Agent and target positions are sampled uniformly at random.

        Returns:
            tuple: ``(observations, info)`` where ``observations`` is a
            list with one observation per agent and ``info`` is an empty
            dict (gymnasium-style reset signature).
        """
        self.agent_positions = [
            (np.random.randint(0, self.grid_size),
             np.random.randint(0, self.grid_size))
            for _ in range(self.n_agents)
        ]
        self.target_positions = [
            (np.random.randint(0, self.grid_size),
             np.random.randint(0, self.grid_size))
            for _ in range(self.n_agents)
        ]
        return self._get_observations(), {}

    def _get_observations(self):
        """Build one egocentric observation per agent."""
        observations = []
        for i in range(self.n_agents):
            # float32 to match the dtype declared in observation_space.
            obs = np.zeros((self.grid_size, self.grid_size, 3), dtype=np.float32)
            # Mark agent positions: the observing agent in channel 0,
            # all other agents (with a weaker value) in channel 1.
            for j, pos in enumerate(self.agent_positions):
                if j == i:
                    obs[pos[0], pos[1], 0] = 1.0  # self
                else:
                    obs[pos[0], pos[1], 1] = 0.5  # other agents
            # Mark all target positions in channel 2.
            for target in self.target_positions:
                obs[target[0], target[1], 2] = 1.0
            observations.append(obs)
        return observations

    def step(self, actions):
        """Advance the environment by one joint step.

        Args:
            actions: list with one discrete action per agent
                (0: stay, 1: up, 2: down, 3: left, 4: right).

        Returns:
            tuple: ``(observations, rewards, done, truncated, info)``
            following the gymnasium 5-tuple convention; ``truncated``
            is always ``False`` here.
        """
        # Move each agent, clamping to the grid boundaries.
        for i, action in enumerate(actions):
            x, y = self.agent_positions[i]
            if action == 1:    # up
                x = max(0, x - 1)
            elif action == 2:  # down
                x = min(self.grid_size - 1, x + 1)
            elif action == 3:  # left
                y = max(0, y - 1)
            elif action == 4:  # right
                y = min(self.grid_size - 1, y + 1)
            self.agent_positions[i] = (x, y)
        # Cooperative setting: every agent gets the same team reward.
        rewards = self._compute_rewards()
        done = self._check_done()
        observations = self._get_observations()
        info = {'agent_positions': self.agent_positions}
        return observations, rewards, done, False, info

    def _compute_rewards(self):
        """Compute the shared team reward (cooperative scenario)."""
        total_reward = 0
        # Each agent contributes a bonus on reaching its target, or a
        # penalty proportional to its Manhattan distance otherwise.
        for i, agent_pos in enumerate(self.agent_positions):
            target_pos = self.target_positions[i]
            distance = abs(agent_pos[0] - target_pos[0]) + abs(agent_pos[1] - target_pos[1])
            if distance == 0:
                total_reward += 10.0   # reached target
            else:
                total_reward -= 0.1 * distance  # distance penalty
        # All agents share the identical reward.
        rewards = [total_reward] * self.n_agents
        return rewards

    def _check_done(self):
        """Return True when every agent sits on its own target."""
        for i, agent_pos in enumerate(self.agent_positions):
            if agent_pos != self.target_positions[i]:
                return False
        return True

    def render(self):
        """Print an ASCII view of the grid ('.'=empty, 'T'=target, digit=agent)."""
        # Explicit single-character string dtype (np.zeros(dtype=str) is fragile).
        grid = np.full((self.grid_size, self.grid_size), '.', dtype='<U1')
        # Targets first, so agents drawn afterwards overwrite them.
        for target in self.target_positions:
            grid[target[0], target[1]] = 'T'
        for i, pos in enumerate(self.agent_positions):
            grid[pos[0], pos[1]] = str(i)
        print('\n'.join([' '.join(row) for row in grid]))
        print()
3. 独立学习者方法¶
3.1 独立Q-Learning¶
Python
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from collections import deque
import random
class MultiAgentQLearning:
    """Independent Q-learning: each agent trains its own Q-network.

    Every agent has a private network, optimizer and replay buffer and
    treats the other agents as part of the (non-stationary) environment.
    A single epsilon-greedy exploration schedule is shared by all agents.
    """

    def __init__(self, n_agents, obs_shape, n_actions, lr=1e-3, gamma=0.99):
        """
        Args:
            n_agents: number of agents.
            obs_shape: per-agent observation shape as (H, W, C).
            n_actions: size of the discrete action space.
            lr: Adam learning rate for every agent's network.
            gamma: discount factor.
        """
        self.n_agents = n_agents
        self.n_actions = n_actions
        self.gamma = gamma
        # One independent Q-network (and optimizer) per agent.
        self.q_networks = [
            self._build_network(obs_shape, n_actions)
            for _ in range(n_agents)
        ]
        self.optimizers = [
            optim.Adam(net.parameters(), lr=lr)
            for net in self.q_networks
        ]
        # One private experience-replay buffer per agent.
        self.replay_buffers = [deque(maxlen=10000) for _ in range(n_agents)]
        # Shared epsilon-greedy schedule, decayed once per update() call.
        self.epsilon = 1.0
        self.epsilon_decay = 0.995
        self.epsilon_min = 0.01

    def _build_network(self, obs_shape, n_actions):
        """Build a small conv net mapping (C, H, W) observations to Q-values."""
        return nn.Sequential(
            nn.Conv2d(obs_shape[2], 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(64 * obs_shape[0] * obs_shape[1], 256),
            nn.ReLU(),
            nn.Linear(256, n_actions)
        )

    def select_actions(self, observations, training=True):
        """Epsilon-greedily pick one action per agent.

        Args:
            observations: list of (H, W, C) observations, one per agent.
            training: when True, explore with probability epsilon.

        Returns:
            list[int]: one discrete action per agent.
        """
        actions = []
        for i, obs in enumerate(observations):
            if training and random.random() < self.epsilon:
                action = random.randint(0, self.n_actions - 1)
            else:
                with torch.no_grad():  # inference only; no autograd graph
                    # Add a batch dim and reorder HWC -> CHW for Conv2d.
                    obs_tensor = torch.as_tensor(
                        np.asarray(obs), dtype=torch.float32
                    ).unsqueeze(0).permute(0, 3, 1, 2)
                    q_values = self.q_networks[i](obs_tensor)
                    action = q_values.argmax().item()
            actions.append(action)
        return actions

    def store_transitions(self, observations, actions, rewards, next_observations, done):
        """Store each agent's transition in its own replay buffer."""
        for i in range(self.n_agents):
            self.replay_buffers[i].append((
                observations[i], actions[i], rewards[i],
                next_observations[i], done
            ))

    def update(self, batch_size=32):
        """Run one Q-learning update per agent and decay epsilon.

        Agents whose buffer holds fewer than ``batch_size`` transitions
        are skipped.

        Returns:
            float: mean TD loss over the agents that were updated
            (0 when none were).
        """
        losses = []
        for i in range(self.n_agents):
            if len(self.replay_buffers[i]) < batch_size:
                continue
            # Sample a minibatch and unzip it into per-field tuples.
            batch = random.sample(self.replay_buffers[i], batch_size)
            obs, actions, rewards, next_obs, dones = zip(*batch)
            # Tensors in NCHW layout for the conv net.
            obs = torch.as_tensor(np.array(obs), dtype=torch.float32).permute(0, 3, 1, 2)
            actions = torch.as_tensor(actions, dtype=torch.long)
            rewards = torch.as_tensor(rewards, dtype=torch.float32)
            next_obs = torch.as_tensor(np.array(next_obs), dtype=torch.float32).permute(0, 3, 1, 2)
            dones = torch.as_tensor(dones, dtype=torch.float32)
            # TD target uses the same network (no target network here):
            # r + gamma * max_a' Q(s', a') * (1 - done).
            current_q = self.q_networks[i](obs).gather(1, actions.unsqueeze(1)).squeeze()
            with torch.no_grad():
                next_q = self.q_networks[i](next_obs).max(1)[0]
                target_q = rewards + self.gamma * next_q * (1 - dones)
            # Functional loss avoids constructing an nn.MSELoss module per step.
            loss = nn.functional.mse_loss(current_q, target_q)
            self.optimizers[i].zero_grad()
            loss.backward()
            self.optimizers[i].step()
            losses.append(loss.item())
        # Decay exploration once per update() call.
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
        return np.mean(losses) if losses else 0
4. 参数共享方法¶
4.1 参数共享的优势¶
- 样本效率:所有智能体共享经验
- 可扩展性:智能体数量不影响网络大小
- 对称性:相同类型的智能体学习相同策略
4.2 参数共享实现¶
Python
class ParameterSharingQLearning:
"""参数共享Q-Learning"""
def __init__(self, n_agents, obs_shape, n_actions, lr=1e-3, gamma=0.99):
self.n_agents = n_agents
self.n_actions = n_actions
self.gamma = gamma
# 所有智能体共享同一个Q网络
self.q_network = self._build_network(obs_shape, n_actions)
self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
# 共享的经验回放缓冲区
self.replay_buffer = deque(maxlen=100000)
self.epsilon = 1.0
self.epsilon_decay = 0.995
self.epsilon_min = 0.01
def _build_network(self, obs_shape, n_actions):
"""构建Q网络"""
return nn.Sequential(
nn.Conv2d(obs_shape[2], 32, kernel_size=3, padding=1),
nn.ReLU(),
nn.Conv2d(32, 64, kernel_size=3, padding=1),
nn.ReLU(),
nn.Flatten(),
nn.Linear(64 * obs_shape[0] * obs_shape[1], 256),
nn.ReLU(),
nn.Linear(256, n_actions)
)
def select_actions(self, observations, agent_ids=None, training=True):
"""为所有智能体选择动作"""
actions = []
for i, obs in enumerate(observations):
if training and random.random() < self.epsilon:
action = random.randint(0, self.n_actions - 1)
else:
with torch.no_grad():
obs_tensor = torch.FloatTensor(obs).unsqueeze(0).permute(0, 3, 1, 2)
q_values = self.q_network(obs_tensor)
action = q_values.argmax().item()
actions.append(action)
return actions
def store_transitions(self, observations, actions, rewards, next_observations, done):
"""存储所有智能体的经验到共享缓冲区"""
for i in range(self.n_agents):
self.replay_buffer.append((
observations[i], actions[i], rewards[i],
next_observations[i], done
))
def update(self, batch_size=64):
"""更新共享网络"""
if len(self.replay_buffer) < batch_size:
return 0
# 采样
batch = random.sample(self.replay_buffer, batch_size)
obs, actions, rewards, next_obs, dones = zip(*batch)
# 转换为张量
obs = torch.FloatTensor(np.array(obs)).permute(0, 3, 1, 2)
actions = torch.LongTensor(actions)
rewards = torch.FloatTensor(rewards)
next_obs = torch.FloatTensor(np.array(next_obs)).permute(0, 3, 1, 2)
dones = torch.FloatTensor(dones)
# Q-Learning更新
current_q = self.q_network(obs).gather(1, actions.unsqueeze(1)).squeeze()
with torch.no_grad():
next_q = self.q_network(next_obs).max(1)[0]
target_q = rewards + self.gamma * next_q * (1 - dones)
loss = nn.MSELoss()(current_q, target_q)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# 衰减探索率
self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
return loss.item()
5. 训练流程¶
Python
def train_multi_agent(env, agent, num_episodes=1000):
    """Run the standard interaction/learning loop for a multi-agent system.

    Each episode is capped at 100 steps.  Progress (100-episode average
    reward and current epsilon) is printed every 100 episodes.

    Args:
        env: environment exposing gymnasium-style reset()/step().
        agent: learner exposing select_actions / store_transitions /
            update and an ``epsilon`` attribute.
        num_episodes: number of training episodes.

    Returns:
        tuple: ``(agent, episode_rewards)``.
    """
    history = []
    for ep in range(num_episodes):
        obs, _ = env.reset()
        total = 0
        finished = False
        # Bounded episode: at most 100 environment steps.
        for _ in range(100):
            if finished:
                break
            acts = agent.select_actions(obs, training=True)
            nxt_obs, rews, finished, _, _ = env.step(acts)
            agent.store_transitions(obs, acts, rews, nxt_obs, finished)
            agent.update()
            total += sum(rews)
            obs = nxt_obs
        history.append(total)
        if (ep + 1) % 100 == 0:
            recent = np.mean(history[-100:])
            print(f"Episode {ep + 1}, Avg Reward: {recent:.2f}, "
                  f"Epsilon: {agent.epsilon:.3f}")
    return agent, history
# 对比实验
def compare_methods():
    """Train independent learners and parameter sharing on the same
    environment, then plot their episode-reward curves side by side."""
    n_agents = 3
    env = MultiAgentEnv(n_agents=n_agents)
    obs_shape = (10, 10, 3)
    n_actions = 5

    # Run 1: one Q-network per agent.
    print("=== Training Independent Learners ===")
    indep = MultiAgentQLearning(n_agents, obs_shape, n_actions)
    _, curve_indep = train_multi_agent(env, indep, num_episodes=500)

    # Run 2: a single shared Q-network.
    print("\n=== Training Parameter Sharing ===")
    shared = ParameterSharingQLearning(n_agents, obs_shape, n_actions)
    _, curve_shared = train_multi_agent(env, shared, num_episodes=500)

    # Plot the two learning curves for comparison.
    import matplotlib.pyplot as plt
    plt.figure(figsize=(10, 6))
    plt.plot(curve_indep, label='Independent', alpha=0.7)
    plt.plot(curve_shared, label='Parameter Sharing', alpha=0.7)
    plt.xlabel('Episode')
    plt.ylabel('Total Reward')
    plt.title('Multi-Agent Learning Comparison')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.savefig('multi_agent_comparison.png', dpi=150)
    plt.show()


if __name__ == "__main__":
    compare_methods()
6. 项目总结¶
学到的技能¶
- 多智能体环境设计
- 独立学习者方法
- 参数共享方法
- 协作策略学习
关键概念¶
Text Only
多智能体RL:
├── 环境非平稳
├── 信用分配问题
├── 方法:
│ ├── 独立学习者
│ ├── 参数共享
│ └── 集中训练分散执行
└── 挑战:
├── 可扩展性
└── 通信与协调
扩展方向¶
- MADDPG:多智能体DDPG
- QMIX:值函数分解
- MAPPO:多智能体PPO
- 通信学习:学习何时通信
恭喜完成所有实战项目!
→ 回到:05-实战项目README