
03 - DQN Improvements

Study time: 3-4 hours  Importance: ⭐⭐⭐⭐  Key techniques for boosting DQN performance  Prerequisites: DQN basics


🎯 Learning Objectives

After completing this chapter, you will be able to: - Understand DQN's overestimation problem and its solutions - Master improved algorithms such as Double DQN and Dueling DQN - Implement prioritized experience replay - Know ensemble methods such as Rainbow


1. Double DQN

1.1 The Overestimation Problem

Problem: the max operator causes Q-values to be overestimated

\[Y_t = R_{t+1} + \gamma \max_a Q(S_{t+1}, a; \theta^-)\]

Causes: - the max also selects noise - estimation errors therefore push the target upward
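The upward bias is easy to see numerically. In this sketch (assuming nothing beyond NumPy) all true Q-values are zero, yet averaging the max over many noisy estimates gives a clearly positive number:

```python
import numpy as np

rng = np.random.default_rng(0)

# All 5 actions have true Q-value 0, so the true max_a Q(s,a) is 0.
n_actions, n_trials = 5, 10_000
noisy_q = rng.normal(loc=0.0, scale=1.0, size=(n_trials, n_actions))

# Averaging max_a over many noisy estimates reveals the positive bias.
bias = noisy_q.max(axis=1).mean()
print(f"true max: 0.0, average noisy max: {bias:.3f}")  # clearly above 0
```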

1.2 The Double DQN Solution

Idea: decouple action selection from action evaluation

\[Y_t = R_{t+1} + \gamma Q(S_{t+1}, \arg\max_a Q(S_{t+1}, a; \theta); \theta^-)\]
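Why decoupling helps can be checked with a small NumPy sketch: two independent noisy estimates stand in for the online and target networks. Selecting with one and evaluating with the other removes the bias here, because the selection noise is independent of the evaluation noise:

```python
import numpy as np

rng = np.random.default_rng(1)
n_actions, n_trials = 5, 10_000

# Two independent noisy estimates of the same all-zero Q-values, standing
# in for the online network (theta) and the target network (theta^-).
q_online = rng.normal(0.0, 1.0, size=(n_trials, n_actions))
q_target = rng.normal(0.0, 1.0, size=(n_trials, n_actions))

# DQN: select and evaluate with the same estimate -> positive bias.
dqn_value = q_target.max(axis=1).mean()

# Double DQN: select with q_online, evaluate with q_target -> no bias here.
best = q_online.argmax(axis=1)
ddqn_value = q_target[np.arange(n_trials), best].mean()

print(f"DQN: {dqn_value:.3f}, Double DQN: {ddqn_value:.3f}")
```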

1.3 Code Implementation

Python
import torch
import torch.nn as nn

class DoubleDQNAgent(DQNAgent):  # DQNAgent from the previous chapter
    """Double DQN agent"""

    def update(self):
        """Double DQN update"""
        if len(self.replay_buffer) < self.batch_size:
            return None

        states, actions, rewards, next_states, dones = self.replay_buffer.sample(self.batch_size)

        states = torch.FloatTensor(states).to(self.device)
        actions = torch.LongTensor(actions).to(self.device)
        rewards = torch.FloatTensor(rewards).to(self.device)
        next_states = torch.FloatTensor(next_states).to(self.device)
        dones = torch.FloatTensor(dones).to(self.device)

        # Current Q-values
        current_q = self.policy_net(states).gather(1, actions.unsqueeze(1))  # unsqueeze adds a dimension

        # Double DQN target
        with torch.no_grad():  # disable gradient tracking to save memory
            # Select actions with the policy network
            next_actions = self.policy_net(next_states).argmax(1)
            # Evaluate those actions with the target network
            next_q = self.target_net(next_states).gather(1, next_actions.unsqueeze(1)).squeeze(1)  # squeeze removes the extra dimension
            target_q = rewards + (1 - dones) * self.gamma * next_q

        loss = nn.MSELoss()(current_q.squeeze(1), target_q)

        self.optimizer.zero_grad()  # reset gradients
        loss.backward()  # backpropagate to compute gradients
        torch.nn.utils.clip_grad_norm_(self.policy_net.parameters(), max_norm=10)
        self.optimizer.step()  # update parameters

        self.steps += 1
        if self.steps % self.target_update == 0:
            self.target_net.load_state_dict(self.policy_net.state_dict())

        self.epsilon = max(self.epsilon_end, self.epsilon * self.epsilon_decay)

        return loss.item()  # convert the single-element tensor to a Python number

2. Dueling DQN

2.1 Core Idea

Decompose the Q-function:

\[Q(s,a) = V(s) + A(s,a) - \frac{1}{|A|}\sum_{a'}A(s,a')\]

Advantages: - when all actions in a state have similar value, learning V(s) directly is more efficient - reduces redundant learning
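The mean-subtraction term is what makes the decomposition identifiable. A minimal sketch with illustrative numbers: a plain sum V + A is unchanged when a constant shifts between the two streams, whereas the mean-subtracted aggregation is invariant to shifts of A, pinning down what each stream must learn:

```python
import numpy as np

V = 2.0
A = np.array([1.0, -0.5, -0.5])

# Without the mean term, (V, A) is not identifiable: moving a constant c
# between the two streams produces exactly the same Q-values.
c = 10.0
assert np.allclose(V + A, (V + c) + (A - c))

# The mean-subtracted aggregation is invariant to such shifts of A,
# which pins down the decomposition the two streams must learn.
def aggregate(v, a):
    return v + (a - a.mean())

q1 = aggregate(V, A)
q2 = aggregate(V, A + c)  # shifted advantages, identical Q
print(q1, q2)
```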

2.2 Network Architecture

Text Only
          Input
            ↓
   Shared Conv Layers
            ↓
         FC Layer
       ↓         ↓
Value Stream  Advantage Stream
       ↓         ↓
     V(s)      A(s,a)
        ↘     ↙
    Aggregation Layer
            ↓
         Q(s,a)

2.3 Code Implementation

Python
import numpy as np
import torch
import torch.nn as nn

class DuelingDQNNetwork(nn.Module):  # subclass nn.Module to define the network
    """Dueling DQN network"""

    def __init__(self, input_shape, n_actions):
        super(DuelingDQNNetwork, self).__init__()

        # Shared convolutional layers
        self.conv = nn.Sequential(
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )

        conv_out_size = self._get_conv_out(input_shape)

        # Shared fully connected layer
        self.fc_shared = nn.Sequential(
            nn.Linear(conv_out_size, 512),
            nn.ReLU()
        )

        # Value stream
        self.value_stream = nn.Sequential(
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 1)
        )

        # Advantage stream
        self.advantage_stream = nn.Sequential(
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, n_actions)
        )

    def _get_conv_out(self, shape):
        # Dummy forward pass to infer the flattened conv output size
        o = self.conv(torch.zeros(1, *shape))
        return int(np.prod(o.size()))

    def forward(self, x):
        conv_out = self.conv(x).view(x.size()[0], -1)  # flatten the conv features
        shared_out = self.fc_shared(conv_out)

        value = self.value_stream(shared_out)
        advantage = self.advantage_stream(shared_out)

        # Dueling aggregation: Q = V + (A - mean(A))
        q = value + (advantage - advantage.mean(dim=1, keepdim=True))
        return q

3. Prioritized Experience Replay (PER)

3.1 Core Idea

Problem: uniform sampling can waste most of the training on easy samples

Solution: sample according to TD-error-based priorities

\[P(i) = \frac{p_i^\alpha}{\sum_k p_k^\alpha}\]

where \(p_i = |\delta_i| + \epsilon\)

3.2 Importance-Sampling Weights

Problem: prioritized sampling changes the data distribution

Solution: correct for it with importance-sampling weights

\[w_i = \left(\frac{1}{N} \cdot \frac{1}{P(i)}\right)^\beta\]
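Plugging small numbers into the two formulas makes their interaction concrete: high-priority samples are drawn more often but receive smaller weights. A sketch with illustrative values (the hyperparameters are typical choices, not prescribed by the text):

```python
import numpy as np

td_errors = np.array([2.0, 0.5, 0.1, 1.0])
alpha, beta, eps = 0.6, 0.4, 0.01    # illustrative hyperparameters

p = np.abs(td_errors) + eps          # p_i = |delta_i| + eps
probs = p**alpha / np.sum(p**alpha)  # P(i): sampling probabilities

N = len(td_errors)
w = (1.0 / (N * probs))**beta        # w_i: importance-sampling weights
w /= w.max()                         # normalize by the max for stability

print("P(i):", probs.round(3))
print("w_i: ", w.round(3))  # largest TD error -> highest P(i), smallest w_i
```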

3.3 Code Implementation

Python
import numpy as np

class SumTree:
    """Sum tree for efficient proportional sampling"""

    def __init__(self, capacity):
        self.capacity = capacity
        self.tree = np.zeros(2 * capacity - 1)        # internal nodes store subtree sums
        self.data = np.zeros(capacity, dtype=object)  # leaves store the transitions
        self.write = 0
        self.n_entries = 0

    def _propagate(self, idx, change):
        parent = (idx - 1) // 2
        self.tree[parent] += change
        if parent != 0:
            self._propagate(parent, change)

    def _retrieve(self, idx, s):
        left = 2 * idx + 1
        right = left + 1

        if left >= len(self.tree):
            return idx

        if s <= self.tree[left]:
            return self._retrieve(left, s)
        else:
            return self._retrieve(right, s - self.tree[left])

    def total(self):
        return self.tree[0]

    def add(self, priority, data):
        idx = self.write + self.capacity - 1
        self.data[self.write] = data
        self.update(idx, priority)

        self.write = (self.write + 1) % self.capacity
        self.n_entries = min(self.n_entries + 1, self.capacity)

    def update(self, idx, priority):
        change = priority - self.tree[idx]
        self.tree[idx] = priority
        self._propagate(idx, change)

    def get(self, s):
        idx = self._retrieve(0, s)
        data_idx = idx - self.capacity + 1
        return idx, self.tree[idx], self.data[data_idx]
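What the tree computes can be checked against a flat cumulative-sum version: O(n) per query instead of the tree's O(log n), but the retrieval rule is the same. A self-contained sketch showing that sampling frequencies converge to the priority proportions:

```python
import numpy as np

# Proportional sampling with a flat cumulative sum -- the same semantics
# as SumTree._retrieve, just O(n) instead of O(log n) per query.
priorities = np.array([3.0, 1.0, 5.0, 1.0])
cumulative = np.cumsum(priorities)       # [3, 4, 9, 10]; total() would be 10

rng = np.random.default_rng(0)
s = rng.uniform(0.0, cumulative[-1], size=100_000)
chosen = np.searchsorted(cumulative, s)  # first index whose running sum covers s

freq = np.bincount(chosen, minlength=4) / len(s)
print(freq.round(3))  # approaches priorities / total = [0.3, 0.1, 0.5, 0.1]
```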

class PrioritizedReplayBuffer:
    """Prioritized replay buffer"""

    def __init__(self, capacity=100000, alpha=0.6, beta=0.4, beta_increment=0.001):
        self.tree = SumTree(capacity)
        self.alpha = alpha
        self.beta = beta
        self.beta_increment = beta_increment
        self.epsilon = 0.01
        self.capacity = capacity

    def push(self, transition):
        """Add a transition with the current maximum priority"""
        max_priority = np.max(self.tree.tree[-self.tree.capacity:])
        if max_priority == 0:
            max_priority = 1.0

        self.tree.add(max_priority, transition)

    def sample(self, batch_size):
        """Priority-proportional sampling"""
        batch = []
        indices = []
        priorities = []

        segment = self.tree.total() / batch_size

        self.beta = min(1.0, self.beta + self.beta_increment)

        for i in range(batch_size):
            a = segment * i
            b = segment * (i + 1)
            s = np.random.uniform(a, b)

            idx, priority, data = self.tree.get(s)

            priorities.append(priority)
            batch.append(data)
            indices.append(idx)

        # Compute importance-sampling weights
        sampling_probs = np.array(priorities) / self.tree.total()
        is_weights = np.power(self.tree.n_entries * sampling_probs, -self.beta)
        is_weights /= is_weights.max()

        return batch, indices, is_weights

    def update_priorities(self, indices, td_errors):
        """Update priorities (td_errors may be a NumPy array or a PyTorch tensor)"""
        for idx, td_error in zip(indices, td_errors):  # pair each index with its error
            # float() converts a tensor element to a Python scalar
            priority = (abs(float(td_error)) + self.epsilon) ** self.alpha
            self.tree.update(idx, priority)

    def __len__(self):  # __len__ makes len(buffer) work
        return self.tree.n_entries

4. Noisy Networks

4.1 Core Idea

Replacing ε-greedy: use noisy parameters instead of explicit exploration

Parameter noise:

\[\theta = \mu + \sigma \odot \epsilon\]

4.2 Code Implementation

Python
import numpy as np
import torch
import torch.nn as nn

class NoisyLinear(nn.Module):
    """Noisy linear layer"""

    def __init__(self, in_features, out_features, sigma_init=0.017):
        super(NoisyLinear, self).__init__()

        self.in_features = in_features
        self.out_features = out_features

        # Learnable parameters
        self.weight_mu = nn.Parameter(torch.empty(out_features, in_features))
        self.weight_sigma = nn.Parameter(torch.empty(out_features, in_features))
        self.bias_mu = nn.Parameter(torch.empty(out_features))
        self.bias_sigma = nn.Parameter(torch.empty(out_features))

        # Noise buffers (not learned; resampled via reset_noise)
        self.register_buffer('weight_epsilon', torch.empty(out_features, in_features))
        self.register_buffer('bias_epsilon', torch.empty(out_features))

        self.sigma_init = sigma_init
        self.reset_parameters()
        self.reset_noise()

    def reset_parameters(self):
        mu_range = 1 / np.sqrt(self.in_features)
        self.weight_mu.data.uniform_(-mu_range, mu_range)
        self.weight_sigma.data.fill_(self.sigma_init / np.sqrt(self.in_features))
        self.bias_mu.data.uniform_(-mu_range, mu_range)
        self.bias_sigma.data.fill_(self.sigma_init / np.sqrt(self.out_features))

    def reset_noise(self):
        epsilon_in = self._scale_noise(self.in_features)
        epsilon_out = self._scale_noise(self.out_features)
        self.weight_epsilon.copy_(epsilon_out.outer(epsilon_in))
        self.bias_epsilon.copy_(epsilon_out)

    def _scale_noise(self, size):
        # f(x) = sign(x) * sqrt(|x|): factorized Gaussian noise scaling
        x = torch.randn(size)
        return x.sign().mul_(x.abs().sqrt_())

    def forward(self, x):
        if self.training:
            weight = self.weight_mu + self.weight_sigma * self.weight_epsilon
            bias = self.bias_mu + self.bias_sigma * self.bias_epsilon
        else:
            weight = self.weight_mu
            bias = self.bias_mu

        return nn.functional.linear(x, weight, bias)
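The point of the outer product in reset_noise is cost: factorized noise needs only p + q Gaussian draws per reset instead of one per weight. A NumPy sketch mirroring _scale_noise and the noise construction:

```python
import numpy as np

rng = np.random.default_rng(0)

def scale_noise(size):
    # f(x) = sign(x) * sqrt(|x|), mirroring NoisyLinear._scale_noise
    x = rng.normal(size=size)
    return np.sign(x) * np.sqrt(np.abs(x))

p, q = 4, 3                                 # in_features, out_features
eps_in, eps_out = scale_noise(p), scale_noise(q)

weight_epsilon = np.outer(eps_out, eps_in)  # q x p noise matrix
bias_epsilon = eps_out

# Factorized noise needs p + q Gaussian draws per reset instead of the
# p*q + q independent draws, cheap enough to resample every forward pass.
print(weight_epsilon.shape, p + q, p * q + q)
```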

5. Rainbow DQN

5.1 Integrating All the Improvements

Rainbow = DQN + 6 improvements: 1. Double DQN 2. Dueling DQN 3. Prioritized Replay 4. Multi-step Learning 5. Distributional RL 6. Noisy Nets

5.2 Performance Comparison

Algorithm      Relative performance
DQN            100%
Double DQN     117%
Dueling DQN    127%
PER            128%
Noisy Nets     131%
Rainbow        231%

6. Chapter Summary

Core Improvements

Text Only
DQN improvements:
├── Double DQN: fixes overestimation
├── Dueling DQN: decomposes V and A
├── PER: priority-based sampling
├── Noisy Nets: adaptive exploration
└── Rainbow: integrates all of the above

Recommendations:
├── Basic improvements: Double + Dueling
├── Better sampling: add PER
├── Better exploration: add Noisy Nets
└── Best performance: Rainbow

✅ Self-Test Questions

  1. How does Double DQN address the overestimation problem?

  2. What is the architectural advantage of Dueling DQN?

  3. Why does prioritized experience replay need importance-sampling weights?


📚 Further Reading

  1. Van Hasselt et al. (2016) - Double DQN
  2. Wang et al. (2016) - Dueling DQN
  3. Schaul et al. (2016) - PER
  4. Hessel et al. (2018) - Rainbow

→ Next: 04-策略梯度方法.md