跳转至

01 - PPO算法:近端策略优化

学习时间: 4-5小时 重要性: ⭐⭐⭐⭐⭐ 最实用的策略梯度算法 前置知识: 策略梯度、Actor-Critic


🎯 学习目标

完成本章后,你将能够: - 理解PPO的核心思想(裁剪目标函数) - 掌握PPO-Clip和PPO-Penalty两种变体 - 实现完整的PPO算法 - 理解PPO相比TRPO的优势 - 应用PPO解决连续控制问题


1. PPO简介

1.1 从TRPO到PPO

TRPO的问题: - 需要计算Fisher信息矩阵 - 使用共轭梯度法,计算复杂 - 实现困难

PPO的改进: - 使用一阶优化(梯度下降) - 实现简单,效果相当 - 成为最流行的策略梯度算法

1.2 核心思想

目标:限制策略更新的幅度,避免破坏性的大更新

从TRPO到PPO的简化推导

TRPO需要求解带约束的优化问题(KL约束 + 共轭梯度 + 线性搜索),计算复杂。PPO的关键洞察是:可以用更简单的方式近似TRPO的约束效果。

  • TRPO约束:\(\max_\theta L(\theta)\) s.t. \(D_{KL}(\pi_{\theta_{old}} \| \pi_\theta) \leq \delta\)
  • PPO-Penalty:将KL约束变成惩罚项 \(L(\theta) - \beta D_{KL}\)(拉格朗日松弛)
  • PPO-Clip:直接裁剪概率比率 \(r_t(\theta)\),当 \(r_t\) 偏离1太远时梯度为0

裁剪操作隐式地限制了KL散度:当 \(\pi_\theta\)\(\pi_{\theta_{old}}\) 差异过大时,\(r_t(\theta)\) 会超出 \([1-\epsilon, 1+\epsilon]\) 范围,被裁剪后梯度为零,从而阻止进一步偏离。这用一阶优化实现了TRPO二阶优化的效果。

方法: 1. PPO-Clip:裁剪概率比率 2. PPO-Penalty:KL散度惩罚


2. PPO-Clip

2.1 裁剪目标函数

概率比率

\[r_t(\theta) = \frac{\pi_\theta(a_t|s_t)}{\pi_{\theta_{old}}(a_t|s_t)}\]

裁剪目标

\[L^{CLIP}(\theta) = \mathbb{E}_t[\min(r_t(\theta)A_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon)A_t)]\]

分情况分析(为什么clip能防止过大更新)

  • \(A_t > 0\)(好动作,希望增大概率)
  • 目标变为 \(\min(r_t A_t, (1+\epsilon) A_t)\)
  • \(r_t < 1+\epsilon\) 时,梯度正常推动 \(r_t\) 增大
  • \(r_t > 1+\epsilon\) 时,\(\min\) 选择 \((1+\epsilon)A_t\)(常数),梯度为零
  • 效果:允许适度增大概率,但不超过 \(1+\epsilon\)

  • \(A_t < 0\)(坏动作,希望减小概率)

  • 目标变为 \(\min(r_t A_t, (1-\epsilon) A_t)\),由于 \(A_t < 0\)\(\min\) 等价于选取 \(|r_t A_t|\) 更大的项
  • \(r_t > 1-\epsilon\) 时,梯度正常推动 \(r_t\) 减小
  • \(r_t < 1-\epsilon\) 时,\(\min\) 选择 \((1-\epsilon)A_t\)(常数),梯度为零
  • 效果:允许适度减小概率,但不低于 \(1-\epsilon\)

总结:无论优势正负,clip都会在概率比率偏离1过远时将梯度截断为零,隐式地约束了新旧策略之间的KL散度,从而防止破坏性的大步更新。

2.2 完整目标函数

\[L^{CLIP+VF+S}(\theta) = \mathbb{E}_t[L_t^{CLIP}(\theta) - c_1 L_t^{VF}(\theta) + c_2 S[\pi_\theta](s_t)]\]

其中: - \(L_t^{VF}\):值函数损失(MSE) - \(S\):策略熵(鼓励探索)

2.3 代码实现

Python
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.distributions import Categorical

class ActorCritic(nn.Module):  # 继承nn.Module定义网络层
    """Actor-Critic网络"""

    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super(ActorCritic, self).__init__()

        # 共享层
        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh()
        )

        # Actor头
        self.actor = nn.Linear(hidden_dim, action_dim)

        # Critic头
        self.critic = nn.Linear(hidden_dim, 1)

    def forward(self, state):
        features = self.shared(state)
        logits = self.actor(features)
        value = self.critic(features)
        return logits, value

    def get_action_and_value(self, state, action=None):
        """获取动作、对数概率和价值"""
        logits, value = self.forward(state)
        dist = Categorical(logits=logits)

        if action is None:
            action = dist.sample()

        log_prob = dist.log_prob(action)
        entropy = dist.entropy()

        return action, log_prob, entropy, value.squeeze(-1)  # squeeze压缩维度

class PPO:
    """PPO算法"""

    def __init__(self, state_dim, action_dim, lr=3e-4, gamma=0.99,
                 gae_lambda=0.95, clip_epsilon=0.2,
                 value_coef=0.5, entropy_coef=0.01,
                 max_grad_norm=0.5, update_epochs=10, batch_size=64):

        self.gamma = gamma
        self.gae_lambda = gae_lambda
        self.clip_epsilon = clip_epsilon
        self.value_coef = value_coef
        self.entropy_coef = entropy_coef
        self.max_grad_norm = max_grad_norm
        self.update_epochs = update_epochs
        self.batch_size = batch_size

        # 网络
        self.ac = ActorCritic(state_dim, action_dim)
        self.optimizer = optim.Adam(self.ac.parameters(), lr=lr)

    def compute_gae(self, rewards, values, dones, next_value):
        """计算广义优势估计(GAE)"""
        advantages = []
        gae = 0

        for t in reversed(range(len(rewards))):
            if t == len(rewards) - 1:
                next_val = next_value
            else:
                next_val = values[t + 1]

            delta = rewards[t] + self.gamma * next_val * (1 - dones[t]) - values[t]
            gae = delta + self.gamma * self.gae_lambda * (1 - dones[t]) * gae
            advantages.insert(0, gae)

        advantages = torch.tensor(advantages, dtype=torch.float32)
        returns = advantages + torch.tensor(values, dtype=torch.float32)

        return advantages, returns

    def update(self, states, actions, old_log_probs, rewards, values, dones, next_value):
        """PPO更新"""
        # 计算优势函数和回报
        advantages, returns = self.compute_gae(rewards, values, dones, next_value)

        # 归一化优势
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        # 转换为张量
        states = torch.stack(states)  # torch.stack沿新维度拼接张量
        actions = torch.tensor(actions)
        old_log_probs = torch.stack(old_log_probs)

        # 多轮更新
        for _ in range(self.update_epochs):
            # 重新计算log_prob和价值
            _, new_log_probs, entropy, new_values = self.ac.get_action_and_value(states, actions)

            # 概率比率
            ratio = torch.exp(new_log_probs - old_log_probs)

            # 裁剪目标
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1 - self.clip_epsilon, 1 + self.clip_epsilon) * advantages
            actor_loss = -torch.min(surr1, surr2).mean()

            # 值函数损失
            value_loss = nn.MSELoss()(new_values, returns)

            # 熵奖励
            entropy_loss = -entropy.mean()

            # 总损失
            loss = actor_loss + self.value_coef * value_loss + self.entropy_coef * entropy_loss

            # 优化
            self.optimizer.zero_grad()  # 清零梯度
            loss.backward()  # 反向传播计算梯度
            nn.utils.clip_grad_norm_(self.ac.parameters(), self.max_grad_norm)
            self.optimizer.step()  # 更新参数

        return loss.item()  # 将单元素张量转为Python数值

    def select_action(self, state):
        """选择动作"""
        with torch.no_grad():  # 禁用梯度计算,节省内存
            state = torch.FloatTensor(state)
            action, log_prob, _, value = self.ac.get_action_and_value(state)
            return action.item(), log_prob, value.item()

3. PPO-Penalty

3.1 KL散度惩罚

目标函数

\[L^{KLPEN}(\theta) = \mathbb{E}_t\left[\frac{\pi_\theta(a_t|s_t)}{\pi_{\theta_{old}}(a_t|s_t)} A_t - \beta \text{KL}[\pi_{\theta_{old}}(\cdot|s_t) || \pi_\theta(\cdot|s_t)]\right]\]

自适应调整: - 如果KL太大,增加\(\beta\) - 如果KL太小,减小\(\beta\)


4. 关键技巧

4.1 广义优势估计(GAE)

\[\hat{A}_t^{GAE(\gamma,\lambda)} = \sum_{l=0}^{\infty} (\gamma\lambda)^l \delta_{t+l}^V\]

作用:平衡偏差和方差

4.2 多轮更新

  • 使用同一批数据进行多次梯度更新
  • 通常4-10轮

4.3 小批量更新

  • 将数据分成小批次
  • 增加更新稳定性

5. 超参数调优

超参数 典型值 说明
学习率 3e-4 Adam优化器
γ 0.99 折扣因子
GAE λ 0.95 优势估计参数
ε 0.2 裁剪参数
更新轮数 4-10 每批数据更新次数
批次大小 64-512 小批量大小

6. 本章总结

核心概念

Text Only
PPO:
├── 核心思想: 限制策略更新幅度
├── PPO-Clip: 裁剪概率比率
│   └── L^CLIP = min(rA, clip(r)A)
├── PPO-Penalty: KL散度惩罚
└── 优势:
    ├── 实现简单
    ├── 样本效率高
    └── 性能稳定

关键技巧:
├── GAE: 低方差优势估计
├── 多轮更新: 充分利用数据
└── 小批量: 稳定训练

✅ 自测问题

  1. PPO相比TRPO有什么优势?

  2. 裁剪目标函数如何限制策略更新?

  3. GAE中的λ参数有什么作用?


📚 延伸阅读

  1. Schulman et al. (2017)
  2. "Proximal Policy Optimization Algorithms"
  3. arXiv:1707.06347

→ 下一步:02-SAC算法.md