
04 - Policy Gradient Methods

Study time: 4-5 hours | Importance: ⭐⭐⭐⭐⭐ | Methods that optimize the policy directly | Prerequisites: gradient descent, neural network basics


🎯 Learning Objectives

After completing this chapter, you will be able to:

- Understand the core idea of policy gradients
- Master the REINFORCE algorithm
- Understand the role of the baseline
- Implement Actor-Critic methods
- Apply policy gradients to continuous-control problems


1. Introduction to Policy Gradients

1.1 From Value Functions to Policy Functions

Problems with value-function methods:

- Must maintain a Q-table or Q-network
- Need a max operation to select actions
- Struggle with continuous action spaces

Advantages of policy gradients:

- Parameterize the policy directly
- Handle continuous actions naturally
- Can learn stochastic policies

1.2 Policy Representation

Stochastic policy

\[\pi_\theta(a|s) = P(A_t = a | S_t = s; \theta)\]

Common forms:

  • Discrete actions: softmax policy \(\pi_\theta(a|s) = \frac{e^{h(s,a;\theta)}}{\sum_{a'} e^{h(s,a';\theta)}}\)

  • Continuous actions: Gaussian policy \(\pi_\theta(a|s) = \mathcal{N}(\mu(s;\theta), \sigma^2)\)
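Both parameterizations can be sketched directly with `torch.distributions`; the preference, mean, and std values below are arbitrary placeholders:

```python
import torch
from torch.distributions import Categorical, Normal

# Discrete actions: softmax over preferences h(s, a; theta)
h = torch.tensor([2.0, 1.0, 0.5])    # placeholder preferences for 3 actions
pi_discrete = Categorical(logits=h)  # applies the softmax internally
a = pi_discrete.sample()             # sampled action index

# Continuous actions: Gaussian with (state-dependent) mean and std
mu, sigma = torch.tensor(0.3), torch.tensor(0.5)
pi_continuous = Normal(mu, sigma)
u = pi_continuous.sample()           # sampled real-valued action

# Log-probabilities, which the policy gradient will need later
log_p_a = pi_discrete.log_prob(a)
log_p_u = pi_continuous.log_prob(u)
```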

2. The Policy Gradient Theorem

2.1 Objective Function

Expected cumulative reward

\[J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}[R(\tau)]\]

where \(\tau = (s_0, a_0, s_1, a_1, ...)\) is a trajectory and \(R(\tau)\) is its cumulative reward.

2.2 The Policy Gradient Theorem

Theorem

\[\nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}[\nabla_\theta \log \pi_\theta(a|s) \cdot Q^{\pi_\theta}(s,a)]\]

Intuition:

- \(\nabla_\theta \log \pi_\theta(a|s)\): the direction that increases the probability of choosing action a
- \(Q^{\pi_\theta}(s,a)\): how good action a is
- Their product: good actions become more probable, bad actions become less probable

2.3 Full Derivation of the Policy Gradient Theorem

📌 This is the cornerstone of policy optimization theory, and understanding each step of the derivation is essential.

Objective function (episodic case, with initial state distribution \(d_0\)):

\[J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}\left[\sum_{t=0}^{T} \gamma^t r_t\right] = \sum_s d^{\pi_\theta}(s) \sum_a \pi_\theta(a|s) Q^{\pi_\theta}(s,a)\]

where \(d^{\pi_\theta}(s) = \sum_{t=0}^{\infty} \gamma^t P(S_t=s|\pi_\theta)\) is the discounted state visitation frequency.
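As a concrete illustration, \(d^{\pi_\theta}\) can be computed for a hypothetical 2-state chain by truncating the infinite sum; all numbers below are made up for the example:

```python
import numpy as np

# Transition matrix under a fixed policy: P[s, s'] = P(s' | s)
P = np.array([[0.9, 0.1],
              [0.2, 0.8]])
d0 = np.array([1.0, 0.0])  # always start in state 0
gamma = 0.9

# d(s) = sum_t gamma^t * P(S_t = s), truncated at t = 1000
d = np.zeros(2)
p_t = d0.copy()
for t in range(1000):
    d += (gamma ** t) * p_t
    p_t = p_t @ P  # propagate the state distribution one step

# Note d is unnormalized: its entries sum to 1 / (1 - gamma)
```

Dividing by \(1/(1-\gamma)\) would turn it into a proper probability distribution.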

Derivation

Step 1: Expand the gradient of the objective

\[\nabla_\theta J(\theta) = \nabla_\theta \sum_s d^{\pi_\theta}(s) \sum_a \pi_\theta(a|s) Q^{\pi_\theta}(s,a)\]

Note that both \(d^{\pi_\theta}(s)\) and \(Q^{\pi_\theta}(s,a)\) depend on \(\theta\). Differentiating all three factors at once is very complicated. The elegance of the policy gradient theorem is that the gradient never requires differentiating \(d^{\pi_\theta}\).

Step 2: Start from a single state

Define the value function starting from state \(s\):

\[V^{\pi_\theta}(s) = \sum_a \pi_\theta(a|s) Q^{\pi_\theta}(s,a)\]

\(\theta\) 求梯度:

\[\begin{align} \nabla_\theta V^{\pi_\theta}(s) &= \nabla_\theta \left[\sum_a \pi_\theta(a|s) Q^{\pi_\theta}(s,a)\right] \\ &= \sum_a \left[\nabla_\theta \pi_\theta(a|s) \cdot Q^{\pi_\theta}(s,a) + \pi_\theta(a|s) \cdot \nabla_\theta Q^{\pi_\theta}(s,a)\right] \end{align}\]

Step 3: Expand \(\nabla_\theta Q^{\pi_\theta}(s,a)\)

Using the Bellman equation \(Q^{\pi_\theta}(s,a) = R(s,a) + \gamma \sum_{s'} P(s'|s,a) V^{\pi_\theta}(s')\):

\[\nabla_\theta Q^{\pi_\theta}(s,a) = \gamma \sum_{s'} P(s'|s,a) \nabla_\theta V^{\pi_\theta}(s')\]

(Note that \(R(s,a)\) and \(P(s'|s,a)\) do not depend on \(\theta\), so their gradients are zero.)

Step 4: Substitute and unroll the recursion

Substituting step 3 into step 2:

\[\begin{align} \nabla_\theta V^{\pi_\theta}(s) &= \sum_a \nabla_\theta \pi_\theta(a|s) Q^{\pi_\theta}(s,a) + \sum_a \pi_\theta(a|s) \gamma \sum_{s'} P(s'|s,a) \nabla_\theta V^{\pi_\theta}(s') \\ &= \sum_a \nabla_\theta \pi_\theta(a|s) Q^{\pi_\theta}(s,a) + \gamma \sum_{s'} P^{\pi_\theta}(s'|s) \nabla_\theta V^{\pi_\theta}(s') \end{align}\]

where \(P^{\pi_\theta}(s'|s) = \sum_a \pi_\theta(a|s) P(s'|s,a)\) is the state transition probability under the policy.

\(\nabla_\theta V^{\pi_\theta}(s')\) 继续递归展开(展开 \(k\) 步):

\[\nabla_\theta V^{\pi_\theta}(s) = \sum_{k=0}^{\infty} \gamma^k \sum_{s'} P^{\pi_\theta}(S_k=s'|S_0=s) \sum_a \nabla_\theta \pi_\theta(a|s') Q^{\pi_\theta}(s',a)\]

Step 5: Obtain the final result

Take the expectation over the initial state distribution \(s_0 \sim d_0\):

\[\begin{align} \nabla_\theta J(\theta) &= \mathbb{E}_{s_0 \sim d_0}[\nabla_\theta V^{\pi_\theta}(s_0)] \\ &= \sum_s \underbrace{\left(\sum_{k=0}^{\infty} \gamma^k P(S_k=s|\pi_\theta)\right)}_{d^{\pi_\theta}(s)} \sum_a \nabla_\theta \pi_\theta(a|s) Q^{\pi_\theta}(s,a) \\ &= \sum_s d^{\pi_\theta}(s) \sum_a \nabla_\theta \pi_\theta(a|s) Q^{\pi_\theta}(s,a) \end{align}\]

Step 6: Apply the log-derivative trick

Using the identity \(\nabla_\theta \pi_\theta(a|s) = \pi_\theta(a|s) \nabla_\theta \log \pi_\theta(a|s)\):

\[\boxed{\nabla_\theta J(\theta) = \mathbb{E}_{s \sim d^{\pi_\theta}, a \sim \pi_\theta}[\nabla_\theta \log \pi_\theta(a|s) \cdot Q^{\pi_\theta}(s,a)]}\]

This completes the proof. \(\blacksquare\)

Key insight: the most important consequence of the derivation is that the gradient expression contains no derivative of the state distribution \(d^{\pi_\theta}(s)\). This lets us approximate the gradient by sampling, without knowing how the state distribution depends on the policy parameters.
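The log-derivative identity used in step 6, \(\nabla_\theta \pi_\theta = \pi_\theta \nabla_\theta \log \pi_\theta\), is easy to check numerically with autograd; the softmax preferences below are arbitrary:

```python
import torch

theta = torch.tensor([0.5, -0.2, 1.0], requires_grad=True)
a = 1  # fix one action

# Gradient of pi(a) itself
pi = torch.softmax(theta, dim=0)
grad_pi = torch.autograd.grad(pi[a], theta)[0]

# pi(a) times the gradient of log pi(a)
log_pi = torch.log_softmax(theta, dim=0)
grad_log_pi = torch.autograd.grad(log_pi[a], theta)[0]

lhs = grad_pi
rhs = pi[a].detach() * grad_log_pi
# lhs and rhs agree elementwise
```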

2.4 Monte Carlo Estimation

Practical computation

\[\nabla_\theta J(\theta) \approx \frac{1}{N} \sum_{i=1}^N \sum_{t=0}^T \nabla_\theta \log \pi_\theta(a_t^{(i)}|s_t^{(i)}) \cdot G_t^{(i)}\]
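Here \(G_t^{(i)}\) is the discounted return from step \(t\) of the \(i\)-th sampled trajectory. Computed backwards over one episode, it looks like this (a minimal sketch; the reward values are arbitrary):

```python
def discounted_returns(rewards, gamma):
    """Discounted return G_t for every step of one episode, computed backwards."""
    returns, G = [], 0.0
    for r in reversed(rewards):
        G = r + gamma * G
        returns.insert(0, G)
    return returns

# Worked example with gamma = 0.5:
# G_2 = 1, G_1 = 1 + 0.5*1 = 1.5, G_0 = 1 + 0.5*1.5 = 1.75
print(discounted_returns([1, 1, 1], 0.5))  # [1.75, 1.5, 1.0]
```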

3. The REINFORCE Algorithm

3.1 Algorithm Outline

Text Only
Initialize policy parameters θ

For each episode:
    Generate a trajectory τ = (s_0, a_0, r_1, s_1, a_1, ..., s_T) by following π_θ

    For each time step t:
        Compute the return G_t = Σ γ^k r_{t+k+1}

    Policy gradient: ∇J = Σ_t ∇log π_θ(a_t|s_t) · G_t

    Update: θ ← θ + α · ∇J

3.2 Code Implementation

Python
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.distributions import Categorical

class PolicyNetwork(nn.Module):  # subclass nn.Module to define the network layers
    """Policy network"""

    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(PolicyNetwork, self).__init__()

        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
            nn.Softmax(dim=-1)
        )

    def forward(self, state):
        """Return the action probability distribution"""
        return self.net(state)

    def select_action(self, state):
        """Sample an action"""
        probs = self.forward(state)
        # Numerical stability: add a small epsilon so log(0) cannot produce -inf
        probs = probs + 1e-8
        dist = Categorical(probs)
        action = dist.sample()
        log_prob = dist.log_prob(action)
        return action.item(), log_prob  # .item() converts a one-element tensor to a Python number

class REINFORCE:
    """The REINFORCE algorithm"""

    def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99):
        self.gamma = gamma

        # Policy network
        self.policy = PolicyNetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)

    def compute_returns(self, rewards):
        """Compute discounted returns"""
        returns = []
        G = 0
        for r in reversed(rewards):
            G = r + self.gamma * G
            returns.insert(0, G)

        # Normalize the returns (reduces variance)
        returns = torch.tensor(returns, dtype=torch.float32)
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)

        return returns

    def update(self, log_probs, rewards):
        """Policy update"""
        returns = self.compute_returns(rewards)

        # Policy loss
        policy_loss = []
        for log_prob, G in zip(log_probs, returns):  # zip pairs entries by position
            policy_loss.append(-log_prob * G)  # negated because we do gradient ascent

        loss = torch.stack(policy_loss).sum()  # torch.stack concatenates tensors along a new dimension

        # Optimize
        self.optimizer.zero_grad()  # clear old gradients
        loss.backward()  # backpropagate to compute gradients
        self.optimizer.step()  # update the parameters

        return loss.item()

    def train(self, env, num_episodes=1000):
        """Training loop"""
        rewards_history = []

        for episode in range(num_episodes):
            state, _ = env.reset()
            log_probs = []
            rewards = []
            done = False

            # Collect one trajectory
            while not done:
                state_tensor = torch.FloatTensor(state)
                action, log_prob = self.policy.select_action(state_tensor)

                next_state, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated

                log_probs.append(log_prob)
                rewards.append(reward)
                state = next_state

            # Update the policy
            loss = self.update(log_probs, rewards)

            total_reward = sum(rewards)
            rewards_history.append(total_reward)

            if (episode + 1) % 100 == 0:
                avg_reward = np.mean(rewards_history[-100:])
                print(f"Episode {episode + 1}, Avg Reward: {avg_reward:.2f}")

        return rewards_history

4. Baselines

4.1 Why a Baseline Is Needed

Problem: the returns have high variance, which makes the gradient estimate unstable.

Solution: subtract a baseline:

\[\nabla_\theta J(\theta) = \mathbb{E}[\nabla_\theta \log \pi_\theta(a|s) \cdot (Q(s,a) - b(s))]\]

A standard, near-optimal choice of baseline: the state value function \(V(s)\)
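Subtracting a baseline leaves the gradient unbiased, because \(\sum_a \pi_\theta(a|s)\nabla_\theta \log \pi_\theta(a|s) = \nabla_\theta \sum_a \pi_\theta(a|s) = 0\). A quick autograd check on an arbitrary softmax policy:

```python
import torch

theta = torch.tensor([0.3, -1.0, 0.7], requires_grad=True)
pi = torch.softmax(theta, dim=0)
log_pi = torch.log_softmax(theta, dim=0)
b = 5.0  # any fixed baseline value for this state

# E_a[log pi(a) * b] with the sampling distribution held fixed (detached)
expectation = (pi.detach() * log_pi * b).sum()
grad = torch.autograd.grad(expectation, theta)[0]
# grad is numerically the zero vector: the baseline adds no bias
```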

4.2 REINFORCE with a Baseline

Python
class ValueNetwork(nn.Module):
    """Value-function network"""

    def __init__(self, state_dim, hidden_dim=128):
        super(ValueNetwork, self).__init__()

        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )

    def forward(self, state):
        return self.net(state).squeeze(-1)  # squeeze drops the trailing singleton dimension

class REINFORCEWithBaseline:
    """REINFORCE with a baseline"""

    def __init__(self, state_dim, action_dim, lr_policy=1e-3,
                 lr_value=1e-3, gamma=0.99):
        self.gamma = gamma

        # Policy network
        self.policy = PolicyNetwork(state_dim, action_dim)
        self.policy_optimizer = optim.Adam(self.policy.parameters(), lr=lr_policy)

        # Value network (the baseline)
        self.value = ValueNetwork(state_dim)
        self.value_optimizer = optim.Adam(self.value.parameters(), lr=lr_value)

    def update(self, states, log_probs, rewards):
        """Update the policy and the value function"""
        states = torch.stack([torch.FloatTensor(s) for s in states])

        # Compute discounted returns
        returns = []
        G = 0
        for r in reversed(rewards):
            G = r + self.gamma * G
            returns.insert(0, G)
        returns = torch.tensor(returns, dtype=torch.float32)

        # Update the value function
        values = self.value(states)
        value_loss = nn.MSELoss()(values, returns)

        self.value_optimizer.zero_grad()
        value_loss.backward()
        self.value_optimizer.step()

        # Update the policy (using the advantage)
        advantages = returns - values.detach()  # detach so no policy gradient flows into the critic

        policy_loss = []
        for log_prob, advantage in zip(log_probs, advantages):
            policy_loss.append(-log_prob * advantage)

        policy_loss = torch.stack(policy_loss).sum()

        self.policy_optimizer.zero_grad()
        policy_loss.backward()
        self.policy_optimizer.step()

        return policy_loss.item(), value_loss.item()

5. Actor-Critic Methods

5.1 Core Idea

Actor: the policy network, which selects actions. Critic: the value network, which evaluates them.

Advantages:

- The Critic provides low-variance advantage estimates
- Updates can happen online (no need to wait for a complete episode)
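One common way to get an online advantage estimate is the one-step TD error \(\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)\), available after every single transition. This is not the only choice (the A2C code in the next section uses bootstrapped multi-step returns), and the numbers below are placeholders:

```python
# One-step TD error as an online advantage estimate:
# delta = r + gamma * V(s') - V(s)
v_s, v_next = 1.0, 1.5   # placeholder critic estimates for s and s'
r, gamma = 0.5, 0.99
td_error = r + gamma * v_next - v_s
print(td_error)  # approximately 0.985
```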

5.2 A2C Implementation

Python
class ActorCritic(nn.Module):
    """Actor-Critic network"""

    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(ActorCritic, self).__init__()

        # Shared layers
        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
        )

        # Actor head
        self.actor = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
            nn.Softmax(dim=-1)
        )

        # Critic head
        self.critic = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )

    def forward(self, state):
        features = self.shared(state)
        probs = self.actor(features)
        value = self.critic(features)
        return probs, value

    def select_action(self, state):
        """Select an action"""
        probs, value = self.forward(state)
        dist = Categorical(probs)
        action = dist.sample()
        log_prob = dist.log_prob(action)
        return action.item(), log_prob, value

class A2CAgent:
    """A2C agent"""

    def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99,
                 value_coef=0.5, entropy_coef=0.01):
        self.gamma = gamma
        self.value_coef = value_coef
        self.entropy_coef = entropy_coef

        self.model = ActorCritic(state_dim, action_dim)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)

    def compute_returns(self, rewards, values, done):
        """Compute bootstrapped returns"""
        returns = []
        # If the episode was cut off, bootstrap from the critic's last value estimate
        G = 0 if done else values[-1].item()  # [-1] indexes the last element

        for r in reversed(rewards):
            G = r + self.gamma * G
            returns.insert(0, G)

        return torch.tensor(returns, dtype=torch.float32)

    def update(self, states, actions, log_probs, rewards, values, done):
        """Update the Actor and the Critic"""
        states = torch.stack(states)
        actions = torch.tensor(actions)
        values = torch.stack(values).squeeze(-1)

        # Compute bootstrapped returns
        returns = self.compute_returns(rewards, values, done)

        # Re-run the model to get fresh log-probs and entropies
        # (the log_probs collected during the rollout are not reused here)
        probs, new_values = self.model(states)
        dist = Categorical(probs)
        new_log_probs = dist.log_prob(actions)
        entropy = dist.entropy()

        # Advantage estimates
        advantages = returns - new_values.squeeze(-1).detach()

        # Actor loss (policy gradient)
        actor_loss = -(new_log_probs * advantages).mean()

        # Critic loss (value regression)
        critic_loss = nn.MSELoss()(new_values.squeeze(-1), returns)

        # Entropy bonus (encourages exploration)
        entropy_loss = -entropy.mean()

        # Total loss
        loss = actor_loss + self.value_coef * critic_loss + self.entropy_coef * entropy_loss

        # Optimize
        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=0.5)
        self.optimizer.step()

        return loss.item()

6. Chapter Summary

Core Concepts

Text Only
Policy gradients:
├── Optimize policy parameters directly
├── Policy gradient theorem: ∇J = E[∇log π · Q]
└── REINFORCE: Monte Carlo estimation

Variance reduction:
├── Baseline: subtract the state value function
├── Advantage function: A(s,a) = Q(s,a) - V(s)
└── Actor-Critic: online updates

Actor-Critic:
├── Actor: policy network (selects actions)
├── Critic: value network (evaluates actions)
└── A2C: synchronous updates

✅ Self-Check Questions

  1. What are the advantages and disadvantages of policy gradient methods compared with value-function methods?

  2. Why does REINFORCE have high variance? How does a baseline reduce it?

  3. What is the Critic's role in Actor-Critic?

  4. Design an experiment comparing the performance of REINFORCE and A2C.


📚 Further Reading

  1. Sutton et al. (2000), "Policy Gradient Methods for Reinforcement Learning with Function Approximation"

  2. Williams (1992), "Simple Statistical Gradient-Following Algorithms for Connectionist Reinforcement Learning" (the original REINFORCE paper)

Ready to move on to advanced Actor-Critic methods?

→ Next: 05-Actor-Critic高级方法.md