跳转至

项目5 - 多智能体协作

难度: ⭐⭐⭐⭐⭐ 专家级 预计时间: 5-8小时 目标: 实现多智能体协作/竞争策略


1. 项目介绍

1.1 多智能体强化学习

与单智能体的区别: - 多个智能体同时学习和决策 - 智能体之间可能存在协作或竞争关系 - 环境非平稳(其他智能体的策略在变化)

典型场景: - 机器人团队协作 - 自动驾驶车队 - 游戏AI(如Dota2、星际争霸) - 资源分配

1.2 问题分类

| 类型 | 描述 | 示例 |
| --- | --- | --- |
| 完全协作 | 所有智能体共享相同奖励 | 团队任务 |
| 完全竞争 | 零和博弈 | 棋类游戏 |
| 混合 | 既有协作又有竞争 | 足球比赛 |

1.3 成功标准

  • 智能体学会协作策略
  • 系统整体性能优于单智能体
  • 良好的可扩展性

2. 多智能体环境

2.1 环境设计

Python
import numpy as np
import gymnasium as gym
from gymnasium import spaces

class MultiAgentEnv:
    """Cooperative grid-world multi-agent environment.

    ``n_agents`` agents live on a ``grid_size`` x ``grid_size`` grid, each
    with its own target cell.  All agents receive the same shared reward
    (fully cooperative setting).  ``step`` follows the gymnasium-style
    5-tuple convention ``(obs, rewards, terminated, truncated, info)``.
    """

    def __init__(self, n_agents=2, grid_size=10):
        self.n_agents = n_agents
        self.grid_size = grid_size

        # Per-agent observation/action spaces.  Observations are 3-channel
        # grid images: channel 0 = self, channel 1 = other agents,
        # channel 2 = targets.
        self.observation_space = spaces.Box(
            low=0, high=1, shape=(grid_size, grid_size, 3), dtype=np.float32
        )
        self.action_space = spaces.Discrete(5)  # 0: stay, 1-4: up/down/left/right

        self.reset()

    def reset(self):
        """Reset the environment.

        Returns:
            (observations, info): list of per-agent observations and an
            empty info dict.
        """
        # Random initial agent positions (overlaps are allowed; there is
        # no collision handling in this base environment).
        self.agent_positions = [
            (np.random.randint(0, self.grid_size),
             np.random.randint(0, self.grid_size))
            for _ in range(self.n_agents)
        ]

        # One random target cell per agent.
        self.target_positions = [
            (np.random.randint(0, self.grid_size),
             np.random.randint(0, self.grid_size))
            for _ in range(self.n_agents)
        ]

        return self._get_observations(), {}

    def _get_observations(self):
        """Build one egocentric 3-channel observation per agent."""
        observations = []

        for i in range(self.n_agents):
            # float32 to match the declared observation_space dtype
            # (previously defaulted to float64).
            obs = np.zeros((self.grid_size, self.grid_size, 3), dtype=np.float32)

            # Channel 0 marks the observing agent itself; channel 1 marks
            # every other agent at half intensity.
            for j, pos in enumerate(self.agent_positions):
                if j == i:
                    obs[pos[0], pos[1], 0] = 1.0  # self
                else:
                    obs[pos[0], pos[1], 1] = 0.5  # other agents

            # Channel 2 marks every target cell (all targets are visible
            # to every agent, not just the agent's own target).
            for target in self.target_positions:
                obs[target[0], target[1], 2] = 1.0

            observations.append(obs)

        return observations

    def step(self, actions):
        """
        Apply one action per agent.

        Args:
            actions: list with one discrete action (0-4) per agent.

        Returns:
            observations, rewards, terminated, truncated, info — the
            gymnasium-style 5-tuple.  ``truncated`` is always False here;
            episode-length limits are handled by the training loop.
        """
        # Move every agent, clamping to the grid boundary.
        for i, action in enumerate(actions):
            x, y = self.agent_positions[i]

            if action == 1:    # up
                x = max(0, x - 1)
            elif action == 2:  # down
                x = min(self.grid_size - 1, x + 1)
            elif action == 3:  # left
                y = max(0, y - 1)
            elif action == 4:  # right
                y = min(self.grid_size - 1, y + 1)
            # action == 0: stay put

            self.agent_positions[i] = (x, y)

        # Cooperative setting: every agent receives the same shared reward.
        rewards = self._compute_rewards()
        done = self._check_done()
        observations = self._get_observations()
        info = {'agent_positions': self.agent_positions}

        return observations, rewards, done, False, info

    def _compute_rewards(self):
        """Shared cooperative reward.

        +10 for each agent standing on its own target, minus 0.1 x the
        Manhattan distance otherwise; the single total is broadcast to
        every agent.
        """
        total_reward = 0

        for i, agent_pos in enumerate(self.agent_positions):
            target_pos = self.target_positions[i]
            distance = abs(agent_pos[0] - target_pos[0]) + abs(agent_pos[1] - target_pos[1])

            if distance == 0:
                total_reward += 10.0  # agent reached its target
            else:
                total_reward -= 0.1 * distance  # distance penalty

        # All agents share the same reward signal.
        return [total_reward] * self.n_agents

    def _check_done(self):
        """Episode terminates once every agent sits on its own target."""
        for i, agent_pos in enumerate(self.agent_positions):
            if agent_pos != self.target_positions[i]:
                return False
        return True

    def render(self):
        """Print an ASCII view: '.' empty, 'T' target, digit = agent id."""
        # np.full with an explicit single-char dtype replaces the fragile
        # np.zeros(dtype=str) + fill idiom; the printed output is identical.
        grid = np.full((self.grid_size, self.grid_size), '.', dtype='<U1')

        # Mark targets first ...
        for target in self.target_positions:
            grid[target[0], target[1]] = 'T'

        # ... then agents, so an agent covers the target it stands on.
        # NOTE: with a 1-char cell, agent ids >= 10 are truncated.
        for i, pos in enumerate(self.agent_positions):
            grid[pos[0], pos[1]] = str(i)

        print('\n'.join([' '.join(row) for row in grid]))
        print()

3. 独立学习者方法

3.1 独立Q-Learning

Python
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from collections import deque
import random

class MultiAgentQLearning:
    """Independent Q-learning: each agent trains its own Q-network on its
    own replay buffer, treating all other agents as part of the
    (non-stationary) environment.
    """

    def __init__(self, n_agents, obs_shape, n_actions, lr=1e-3, gamma=0.99):
        """
        Args:
            n_agents: number of agents.
            obs_shape: per-agent observation shape as (H, W, C).
            n_actions: size of the discrete action space.
            lr: Adam learning rate.
            gamma: discount factor.
        """
        self.n_agents = n_agents
        self.n_actions = n_actions
        self.gamma = gamma

        # One independent Q-network per agent.
        self.q_networks = [
            self._build_network(obs_shape, n_actions)
            for _ in range(n_agents)
        ]

        # One optimizer per network.
        self.optimizers = [
            optim.Adam(net.parameters(), lr=lr)
            for net in self.q_networks
        ]

        # One bounded FIFO replay buffer per agent.
        self.replay_buffers = [deque(maxlen=10000) for _ in range(n_agents)]

        # Epsilon-greedy exploration schedule (decayed on each update()).
        self.epsilon = 1.0
        self.epsilon_decay = 0.995
        self.epsilon_min = 0.01

    def _build_network(self, obs_shape, n_actions):
        """Build a small conv Q-network mapping (C, H, W) -> Q-values."""
        return nn.Sequential(
            nn.Conv2d(obs_shape[2], 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(64 * obs_shape[0] * obs_shape[1], 256),
            nn.ReLU(),
            nn.Linear(256, n_actions)
        )

    def select_actions(self, observations, training=True):
        """Epsilon-greedy action selection, one action per agent.

        Args:
            observations: list of per-agent (H, W, C) observations.
            training: when True, explore with probability ``epsilon``.

        Returns:
            List of integer actions, one per agent.
        """
        actions = []

        for i, obs in enumerate(observations):
            if training and random.random() < self.epsilon:
                action = random.randint(0, self.n_actions - 1)
            else:
                with torch.no_grad():  # inference only; no gradients needed
                    # Add a batch dim and convert HWC -> CHW for Conv2d.
                    obs_tensor = torch.FloatTensor(obs).unsqueeze(0).permute(0, 3, 1, 2)
                    q_values = self.q_networks[i](obs_tensor)
                    action = q_values.argmax().item()

            actions.append(action)

        return actions

    def store_transitions(self, observations, actions, rewards, next_observations, done):
        """Append each agent's transition to that agent's own buffer."""
        for i in range(self.n_agents):
            self.replay_buffers[i].append((
                observations[i], actions[i], rewards[i],
                next_observations[i], done
            ))

    def update(self, batch_size=32):
        """Run one DQN-style update per agent (when its buffer is full enough).

        Returns:
            Mean loss over the agents that were updated, or 0 if none were.
        """
        losses = []

        for i in range(self.n_agents):
            if len(self.replay_buffers[i]) < batch_size:
                continue

            # Sample a minibatch and unzip into per-field tuples.
            batch = random.sample(self.replay_buffers[i], batch_size)
            obs, actions, rewards, next_obs, dones = zip(*batch)

            # Convert to tensors; HWC -> CHW for the conv network.
            obs = torch.FloatTensor(np.array(obs)).permute(0, 3, 1, 2)
            actions = torch.LongTensor(actions)
            rewards = torch.FloatTensor(rewards)
            next_obs = torch.FloatTensor(np.array(next_obs)).permute(0, 3, 1, 2)
            dones = torch.FloatTensor(dones)

            # Q(s, a) for the taken actions.  squeeze(1), not squeeze():
            # a bare squeeze() would also drop the batch dimension when
            # batch_size == 1 and mis-broadcast against target_q.
            current_q = self.q_networks[i](obs).gather(1, actions.unsqueeze(1)).squeeze(1)

            # Bootstrapped target; no target network in this simple variant.
            with torch.no_grad():
                next_q = self.q_networks[i](next_obs).max(1)[0]
                target_q = rewards + self.gamma * next_q * (1 - dones)

            loss = nn.MSELoss()(current_q, target_q)

            self.optimizers[i].zero_grad()
            loss.backward()
            self.optimizers[i].step()

            losses.append(loss.item())

        # Decay exploration once per update() call.
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

        return np.mean(losses) if losses else 0

4. 参数共享方法

4.1 参数共享的优势

  • 样本效率:所有智能体共享经验
  • 可扩展性:智能体数量不影响网络大小
  • 对称性:相同类型的智能体学习相同策略

4.2 参数共享实现

Python
class ParameterSharingQLearning:
    """Parameter-sharing Q-learning: all agents act with and train a single
    shared Q-network, pooling their transitions in one replay buffer.
    """

    def __init__(self, n_agents, obs_shape, n_actions, lr=1e-3, gamma=0.99):
        """
        Args:
            n_agents: number of agents.
            obs_shape: per-agent observation shape as (H, W, C).
            n_actions: size of the discrete action space.
            lr: Adam learning rate.
            gamma: discount factor.
        """
        self.n_agents = n_agents
        self.n_actions = n_actions
        self.gamma = gamma

        # A single Q-network shared by every agent.
        self.q_network = self._build_network(obs_shape, n_actions)
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)

        # Shared replay buffer pooling all agents' experience.
        self.replay_buffer = deque(maxlen=100000)

        # Epsilon-greedy exploration schedule (decayed on successful updates).
        self.epsilon = 1.0
        self.epsilon_decay = 0.995
        self.epsilon_min = 0.01

    def _build_network(self, obs_shape, n_actions):
        """Build a small conv Q-network mapping (C, H, W) -> Q-values."""
        return nn.Sequential(
            nn.Conv2d(obs_shape[2], 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(64 * obs_shape[0] * obs_shape[1], 256),
            nn.ReLU(),
            nn.Linear(256, n_actions)
        )

    def select_actions(self, observations, agent_ids=None, training=True):
        """Epsilon-greedy action selection via the shared network.

        Args:
            observations: list of per-agent (H, W, C) observations.
            agent_ids: unused here; kept for API compatibility with
                variants that condition the shared policy on agent id.
            training: when True, explore with probability ``epsilon``.

        Returns:
            List of integer actions, one per agent.
        """
        actions = []

        for i, obs in enumerate(observations):
            if training and random.random() < self.epsilon:
                action = random.randint(0, self.n_actions - 1)
            else:
                with torch.no_grad():  # inference only
                    # Add a batch dim and convert HWC -> CHW for Conv2d.
                    obs_tensor = torch.FloatTensor(obs).unsqueeze(0).permute(0, 3, 1, 2)
                    q_values = self.q_network(obs_tensor)
                    action = q_values.argmax().item()

            actions.append(action)

        return actions

    def store_transitions(self, observations, actions, rewards, next_observations, done):
        """Append every agent's transition to the single shared buffer."""
        for i in range(self.n_agents):
            self.replay_buffer.append((
                observations[i], actions[i], rewards[i],
                next_observations[i], done
            ))

    def update(self, batch_size=64):
        """Run one DQN-style update on the shared network.

        Returns:
            The scalar loss, or 0 if the buffer is not yet large enough.
        """
        if len(self.replay_buffer) < batch_size:
            return 0

        # Sample a minibatch and unzip into per-field tuples.
        batch = random.sample(self.replay_buffer, batch_size)
        obs, actions, rewards, next_obs, dones = zip(*batch)

        # Convert to tensors; HWC -> CHW for the conv network.
        obs = torch.FloatTensor(np.array(obs)).permute(0, 3, 1, 2)
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        next_obs = torch.FloatTensor(np.array(next_obs)).permute(0, 3, 1, 2)
        dones = torch.FloatTensor(dones)

        # Q(s, a) for the taken actions.  squeeze(1), not squeeze():
        # a bare squeeze() would also drop the batch dimension when
        # batch_size == 1 and mis-broadcast against target_q.
        current_q = self.q_network(obs).gather(1, actions.unsqueeze(1)).squeeze(1)

        # Bootstrapped target; no target network in this simple variant.
        with torch.no_grad():
            next_q = self.q_network(next_obs).max(1)[0]
            target_q = rewards + self.gamma * next_q * (1 - dones)

        loss = nn.MSELoss()(current_q, target_q)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Decay exploration after a successful update.
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

        return loss.item()

5. 训练流程

Python
def train_multi_agent(env, agent, num_episodes=1000):
    """Train a multi-agent system by repeated episode rollouts.

    Args:
        env: environment exposing ``reset() -> (observations, info)`` and
            ``step(actions) -> (observations, rewards, done, truncated, info)``.
        agent: learner exposing ``select_actions``, ``store_transitions``,
            ``update`` and an ``epsilon`` attribute (used for logging).
        num_episodes: number of training episodes.

    Returns:
        (agent, episode_rewards): the trained agent and the per-episode
        total reward summed over all agents.
    """
    episode_rewards = []

    for episode in range(num_episodes):
        observations, _ = env.reset()
        episode_reward = 0
        done = False
        step = 0

        # Cap each episode at 100 steps to avoid unbounded rollouts.
        while not done and step < 100:
            actions = agent.select_actions(observations, training=True)
            next_observations, rewards, done, _, info = env.step(actions)
            agent.store_transitions(observations, actions, rewards, next_observations, done)
            agent.update()  # per-step learning; the returned loss is not used here

            episode_reward += sum(rewards)
            observations = next_observations
            step += 1

        episode_rewards.append(episode_reward)

        # Periodic progress report over the trailing 100 episodes.
        if (episode + 1) % 100 == 0:
            avg_reward = np.mean(episode_rewards[-100:])
            print(f"Episode {episode + 1}, Avg Reward: {avg_reward:.2f}, "
                  f"Epsilon: {agent.epsilon:.3f}")

    return agent, episode_rewards

# Comparison experiment
def compare_methods():
    """Train the independent-learner and parameter-sharing baselines on the
    same environment and plot their learning curves side by side."""
    n_agents = 3
    obs_shape = (10, 10, 3)
    n_actions = 5
    env = MultiAgentEnv(n_agents=n_agents)

    # Baseline 1: one Q-network per agent.
    print("=== Training Independent Learners ===")
    _, rewards_independent = train_multi_agent(
        env, MultiAgentQLearning(n_agents, obs_shape, n_actions), num_episodes=500
    )

    # Baseline 2: a single Q-network shared by all agents.
    print("\n=== Training Parameter Sharing ===")
    _, rewards_sharing = train_multi_agent(
        env, ParameterSharingQLearning(n_agents, obs_shape, n_actions), num_episodes=500
    )

    # Plot both learning curves on one figure and save it to disk.
    import matplotlib.pyplot as plt

    plt.figure(figsize=(10, 6))
    for series, tag in ((rewards_independent, 'Independent'),
                        (rewards_sharing, 'Parameter Sharing')):
        plt.plot(series, label=tag, alpha=0.7)
    plt.xlabel('Episode')
    plt.ylabel('Total Reward')
    plt.title('Multi-Agent Learning Comparison')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.savefig('multi_agent_comparison.png', dpi=150)
    plt.show()

# Script entry point: run the comparison experiment when executed directly.
if __name__ == "__main__":
    compare_methods()

6. 项目总结

学到的技能

  • 多智能体环境设计
  • 独立学习者方法
  • 参数共享方法
  • 协作策略学习

关键概念

Text Only
多智能体RL:
├── 环境非平稳
├── 信用分配问题
├── 方法:
│   ├── 独立学习者
│   ├── 参数共享
│   └── 集中训练分散执行
└── 挑战:
    ├── 可扩展性
    └── 通信与协调

扩展方向

  1. MADDPG:多智能体DDPG
  2. QMIX:值函数分解
  3. MAPPO:多智能体PPO
  4. 通信学习:学习何时通信

恭喜完成所有实战项目!

→ 回到:05-实战项目README