# 04 - Policy Gradient Methods

Study time: 4-5 hours | Importance: ⭐⭐⭐⭐⭐ | Methods that optimize the policy directly | Prerequisites: gradient descent, neural network basics

## 🎯 Learning Objectives

After completing this chapter, you will be able to:

- Understand the core idea of policy gradients
- Master the REINFORCE algorithm
- Understand the role of the baseline
- Implement Actor-Critic methods
- Apply policy gradients to continuous control problems
## 1. Introduction to Policy Gradients

### 1.1 From Value Functions to Policy Functions

Limitations of value-function methods:

- Must maintain a Q-table or Q-network
- Require a max operation to select actions
- Struggle with continuous action spaces

Advantages of policy gradients:

- Parameterize the policy directly
- Handle continuous actions naturally
- Can learn stochastic policies

### 1.2 Policy Representation

A stochastic policy \(\pi_\theta(a|s)\) maps each state to a probability distribution over actions. Common forms:

- Discrete actions: softmax policy

$$\pi_\theta(a|s) = \frac{e^{h(s,a;\theta)}}{\sum_{a'} e^{h(s,a';\theta)}}$$

- Continuous actions: Gaussian policy

$$\pi_\theta(a|s) = \mathcal{N}\big(\mu(s;\theta), \sigma^2\big)$$
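The discrete softmax policy above can be sketched in plain Python, without any framework. This is a minimal illustration; the helper names (`softmax_policy`, `sample_action`) are ours, not part of any library:

```python
import math
import random

def softmax_policy(prefs):
    """Turn action preferences h(s, a) into a probability distribution.

    Subtracting the max preference before exponentiating is a standard
    numerical-stability trick; it does not change the resulting distribution.
    """
    m = max(prefs)
    exps = [math.exp(h - m) for h in prefs]
    z = sum(exps)
    return [e / z for e in exps]

def sample_action(probs, rng=random):
    """Sample an action index from the probability vector."""
    u = rng.random()
    cumulative = 0.0
    for a, p in enumerate(probs):
        cumulative += p
        if u <= cumulative:
            return a
    return len(probs) - 1  # guard against floating-point round-off

# Higher preference => higher probability, but every action keeps some mass,
# so the policy remains stochastic (useful for exploration).
probs = softmax_policy([2.0, 1.0, 0.5])
```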
## 2. The Policy Gradient Theorem

### 2.1 Objective Function

Expected cumulative reward:

$$J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta}\left[R(\tau)\right]$$

where \(\tau = (s_0, a_0, s_1, a_1, \dots)\) is a trajectory and \(R(\tau)\) is its cumulative reward.

### 2.2 The Policy Gradient Theorem

Theorem:

$$\nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}\left[\nabla_\theta \log \pi_\theta(a|s)\, Q^{\pi_\theta}(s,a)\right]$$

Intuition:

- \(\nabla_\theta \log \pi_\theta(a|s)\): the direction that increases the probability of choosing action \(a\)
- \(Q^{\pi_\theta}(s,a)\): how good action \(a\) is
- Their product: good actions get their probability increased, bad actions decreased
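The score term \(\nabla_\theta \log \pi_\theta(a|s)\) has a simple closed form for a tabular softmax policy with one preference \(\theta_k\) per action: \(\partial \log \pi(a) / \partial \theta_k = \mathbb{1}[a = k] - \pi(k)\). A small sketch verifying this against a finite-difference gradient (all function names here are ours):

```python
import math

def softmax(theta):
    m = max(theta)
    exps = [math.exp(t - m) for t in theta]
    z = sum(exps)
    return [e / z for e in exps]

def score(theta, a):
    """Analytic score for a tabular softmax policy with preferences theta:
    d log pi(a) / d theta_k = 1[a == k] - pi(k)."""
    pi = softmax(theta)
    return [(1.0 if k == a else 0.0) - pi[k] for k in range(len(theta))]

def score_numeric(theta, a, eps=1e-6):
    """Central finite-difference estimate of the same gradient."""
    grad = []
    for k in range(len(theta)):
        plus = list(theta)
        plus[k] += eps
        minus = list(theta)
        minus[k] -= eps
        grad.append((math.log(softmax(plus)[a]) - math.log(softmax(minus)[a])) / (2 * eps))
    return grad
```

Note the sign structure: the score is positive for the chosen action's own parameter and negative for the others, which is exactly the "increase this action, implicitly decrease the rest" behavior described above.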
### 2.3 Full Derivation of the Policy Gradient Theorem

📌 This is the cornerstone of policy optimization theory; it is worth understanding every step.

Objective function (episodic case, with starting-state distribution \(d_0\)):

$$J(\theta) = \mathbb{E}_{s_0 \sim d_0}\left[V^{\pi_\theta}(s_0)\right]$$

We will also use the discounted state-visitation frequency \(d^{\pi_\theta}(s) = \sum_{t=0}^{\infty} \gamma^t P(S_t = s \mid \pi_\theta)\).

Derivation:

**Step 1: Expand the gradient of the objective**

$$\nabla_\theta J(\theta) = \nabla_\theta \sum_s d^{\pi_\theta}(s) \sum_a \pi_\theta(a|s)\, Q^{\pi_\theta}(s,a)$$

Note that \(d^{\pi_\theta}(s)\), \(\pi_\theta(a|s)\), and \(Q^{\pi_\theta}(s,a)\) all depend on \(\theta\), so differentiating all three at once is very messy. The elegance of the policy gradient theorem is that the gradient turns out not to require differentiating \(d^{\pi_\theta}\).

**Step 2: Start from a single state**

Define the value function starting from state \(s\):

$$V^{\pi_\theta}(s) = \sum_a \pi_\theta(a|s)\, Q^{\pi_\theta}(s,a)$$

Taking the gradient with respect to \(\theta\) (product rule):

$$\nabla_\theta V^{\pi_\theta}(s) = \sum_a \left[\nabla_\theta \pi_\theta(a|s)\, Q^{\pi_\theta}(s,a) + \pi_\theta(a|s)\, \nabla_\theta Q^{\pi_\theta}(s,a)\right]$$

**Step 3: Expand \(\nabla_\theta Q^{\pi_\theta}(s,a)\)**

Using the Bellman equation \(Q^{\pi_\theta}(s,a) = R(s,a) + \gamma \sum_{s'} P(s'|s,a)\, V^{\pi_\theta}(s')\):

$$\nabla_\theta Q^{\pi_\theta}(s,a) = \gamma \sum_{s'} P(s'|s,a)\, \nabla_\theta V^{\pi_\theta}(s')$$

(Note that \(R(s,a)\) and \(P(s'|s,a)\) do not depend on \(\theta\), so their gradients are zero.)

**Step 4: Substitute and unroll recursively**

Substituting Step 3 into Step 2, and abbreviating \(\phi(s) = \sum_a \nabla_\theta \pi_\theta(a|s)\, Q^{\pi_\theta}(s,a)\):

$$\nabla_\theta V^{\pi_\theta}(s) = \phi(s) + \gamma \sum_{s'} P^{\pi_\theta}(s'|s)\, \nabla_\theta V^{\pi_\theta}(s')$$

where \(P^{\pi_\theta}(s'|s) = \sum_a \pi_\theta(a|s)\, P(s'|s,a)\) is the state-transition probability under the policy.

Unrolling \(\nabla_\theta V^{\pi_\theta}(s')\) recursively over \(k\) steps:

$$\nabla_\theta V^{\pi_\theta}(s) = \sum_{k=0}^{\infty} \sum_{x} \gamma^k\, P(s \to x, k \mid \pi_\theta)\, \phi(x)$$

where \(P(s \to x, k \mid \pi_\theta)\) is the probability of reaching state \(x\) from \(s\) in \(k\) steps under \(\pi_\theta\).

**Step 5: Obtain the final result**

Taking the expectation over the initial state distribution \(s_0 \sim d_0\), the nested sums collapse into the discounted visitation frequency:

$$\nabla_\theta J(\theta) = \sum_s d^{\pi_\theta}(s) \sum_a \nabla_\theta \pi_\theta(a|s)\, Q^{\pi_\theta}(s,a)$$

**Step 6: Apply the log-derivative trick**

Using the identity \(\nabla_\theta \pi_\theta(a|s) = \pi_\theta(a|s)\, \nabla_\theta \log \pi_\theta(a|s)\):

$$\nabla_\theta J(\theta) = \mathbb{E}_{s \sim d^{\pi_\theta},\, a \sim \pi_\theta}\left[\nabla_\theta \log \pi_\theta(a|s)\, Q^{\pi_\theta}(s,a)\right]$$

QED. \(\blacksquare\)

Key insight: the most important feature of the derivation is that the final gradient expression contains no derivative of the state distribution \(d^{\pi_\theta}(s)\). This is what lets us approximate the gradient by sampling, without knowing how the state distribution depends on the policy parameters.
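The sampling claim can be checked numerically on a tiny one-state bandit with a softmax policy, where \(J(\theta) = \sum_a \pi_\theta(a)\, r(a)\) has an exact gradient \(\partial J / \partial \theta_k = \pi(k)\,(r(k) - J)\). The Monte Carlo average of \(\nabla_\theta \log \pi(a)\, r(a)\) over sampled actions should converge to it. This is our own illustrative sketch; all names are ours:

```python
import math
import random

def softmax(theta):
    m = max(theta)
    exps = [math.exp(t - m) for t in theta]
    z = sum(exps)
    return [e / z for e in exps]

def exact_gradient(theta, rewards):
    """Exact dJ/d theta_k for J(theta) = sum_a pi(a) r(a) with a softmax policy:
    dJ/d theta_k = pi(k) * (r(k) - J)."""
    pi = softmax(theta)
    J = sum(p * r for p, r in zip(pi, rewards))
    return [pi[k] * (rewards[k] - J) for k in range(len(theta))]

def mc_gradient(theta, rewards, n_samples=200_000, seed=0):
    """Sample-based estimate of E[grad log pi(a) * r(a)]:
    no derivative of the sampling distribution is ever taken."""
    rng = random.Random(seed)
    pi = softmax(theta)
    grad = [0.0] * len(theta)
    for _ in range(n_samples):
        a = rng.choices(range(len(pi)), weights=pi)[0]
        for k in range(len(theta)):
            score_k = (1.0 if a == k else 0.0) - pi[k]  # d log pi(a) / d theta_k
            grad[k] += score_k * rewards[a]
    return [g / n_samples for g in grad]
```

The estimator only evaluates \(\nabla_\theta \log \pi\) at sampled actions, mirroring how REINFORCE estimates the gradient from trajectories without ever touching the state distribution.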
### 2.4 Monte Carlo Estimation

In practice, the expectation is approximated from sampled trajectories:

$$\nabla_\theta J(\theta) \approx \frac{1}{N} \sum_{i=1}^{N} \sum_{t=0}^{T_i - 1} \nabla_\theta \log \pi_\theta(a_t^i | s_t^i)\, G_t^i$$

where \(G_t^i\) is the return from time step \(t\) of the \(i\)-th trajectory.
## 3. The REINFORCE Algorithm

### 3.1 Algorithm Outline

```text
Initialize policy parameters θ
For each episode:
    Generate a trajectory τ = (s_0, a_0, r_1, s_1, a_1, ..., s_T) by following π_θ
    For each time step t:
        Compute the return G_t = Σ_k γ^k r_{t+k+1}
    Policy gradient: ∇J = Σ_t ∇log π_θ(a_t|s_t) · G_t
    Update: θ ← θ + α · ∇J
```
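The return computation in the outline can be sketched as a standalone helper: a single backward pass over the episode's rewards, reusing each \(G_{t+1}\) to build \(G_t\). The helper name is ours:

```python
def discounted_returns(rewards, gamma=0.99):
    """Compute G_t = r_{t+1} + gamma * r_{t+2} + ... for every step t.

    rewards[t] holds r_{t+1} (the reward received after acting at step t).
    One backward pass gives all returns in O(T) via G_t = r_{t+1} + gamma * G_{t+1}.
    """
    returns = [0.0] * len(rewards)
    g = 0.0
    for t in reversed(range(len(rewards))):
        g = rewards[t] + gamma * g
        returns[t] = g
    return returns

# Example with three unit rewards and gamma = 0.5:
# G_2 = 1, G_1 = 1 + 0.5*1 = 1.5, G_0 = 1 + 0.5*1.5 = 1.75
```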
### 3.2 Implementation

```python
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.distributions import Categorical


class PolicyNetwork(nn.Module):
    """Policy network (subclasses nn.Module to define its layers)."""

    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(PolicyNetwork, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
            nn.Softmax(dim=-1)
        )

    def forward(self, state):
        """Return the action probability distribution."""
        return self.net(state)

    def select_action(self, state):
        """Sample an action from the policy."""
        probs = self.forward(state)
        # Numerical stability: add a small epsilon so log(0) never yields -inf
        probs = probs + 1e-8
        dist = Categorical(probs)
        action = dist.sample()
        log_prob = dist.log_prob(action)
        return action.item(), log_prob  # .item() converts a 1-element tensor to a Python number


class REINFORCE:
    """The REINFORCE algorithm."""

    def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99):
        self.gamma = gamma
        # Policy network
        self.policy = PolicyNetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)

    def compute_returns(self, rewards):
        """Compute discounted returns."""
        returns = []
        G = 0
        for r in reversed(rewards):
            G = r + self.gamma * G
            returns.insert(0, G)
        # Normalize returns (reduces variance)
        returns = torch.tensor(returns, dtype=torch.float32)
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)
        return returns

    def update(self, log_probs, rewards):
        """Policy update."""
        returns = self.compute_returns(rewards)
        # Policy loss
        policy_loss = []
        for log_prob, G in zip(log_probs, returns):  # zip pairs elements by position
            policy_loss.append(-log_prob * G)  # negative sign because we ascend the gradient
        loss = torch.stack(policy_loss).sum()  # torch.stack concatenates tensors along a new dimension
        # Optimize
        self.optimizer.zero_grad()  # clear old gradients
        loss.backward()             # backpropagate
        self.optimizer.step()       # update parameters
        return loss.item()

    def train(self, env, num_episodes=1000):
        """Training loop."""
        rewards_history = []
        for episode in range(num_episodes):
            state, _ = env.reset()
            log_probs = []
            rewards = []
            done = False
            # Collect one trajectory
            while not done:
                state_tensor = torch.FloatTensor(state)
                action, log_prob = self.policy.select_action(state_tensor)
                next_state, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated
                log_probs.append(log_prob)
                rewards.append(reward)
                state = next_state
            # Update the policy on the completed episode
            loss = self.update(log_probs, rewards)
            total_reward = sum(rewards)
            rewards_history.append(total_reward)
            if (episode + 1) % 100 == 0:
                avg_reward = np.mean(rewards_history[-100:])
                print(f"Episode {episode + 1}, Avg Reward: {avg_reward:.2f}")
        return rewards_history
```
## 4. Baselines

### 4.1 Why a Baseline Is Needed

Problem: returns have high variance, which makes the gradient estimate unstable.

Solution: subtract a state-dependent baseline \(b(s)\):

$$\nabla_\theta J(\theta) = \mathbb{E}\left[\nabla_\theta \log \pi_\theta(a|s)\, \big(Q^{\pi_\theta}(s,a) - b(s)\big)\right]$$

This leaves the gradient unbiased, because \(\mathbb{E}_{a \sim \pi_\theta}\left[\nabla_\theta \log \pi_\theta(a|s)\, b(s)\right] = b(s)\, \nabla_\theta \sum_a \pi_\theta(a|s) = b(s)\, \nabla_\theta 1 = 0\).

The standard practical choice of baseline is the state-value function \(V(s)\).
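The unbiasedness of the baseline can be verified numerically for a tabular softmax policy: for any constant \(b\), the expected value of \(\nabla_\theta \log \pi(a)\, b\) under the policy is zero in every component. A small sketch (function names are ours):

```python
import math

def softmax(theta):
    m = max(theta)
    exps = [math.exp(t - m) for t in theta]
    z = sum(exps)
    return [e / z for e in exps]

def expected_baseline_term(theta, b):
    """E_{a~pi}[ d log pi(a)/d theta_k * b ] for every component k,
    using the tabular-softmax score d log pi(a)/d theta_k = 1[a==k] - pi(k).

    Analytically this is b * (pi(k) - pi(k) * sum_a pi(a)) = 0 for every k."""
    pi = softmax(theta)
    n = len(theta)
    return [
        sum(pi[a] * ((1.0 if a == k else 0.0) - pi[k]) * b for a in range(n))
        for k in range(n)
    ]
```

So the baseline changes only the variance of the estimator, never its mean, which is why subtracting \(V(s)\) is "free" in terms of bias.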
### 4.2 REINFORCE with a Baseline

```python
class ValueNetwork(nn.Module):
    """State-value network."""

    def __init__(self, state_dim, hidden_dim=128):
        super(ValueNetwork, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )

    def forward(self, state):
        return self.net(state).squeeze(-1)  # squeeze removes the trailing size-1 dimension


class REINFORCEWithBaseline:
    """REINFORCE with a value-function baseline."""

    def __init__(self, state_dim, action_dim, lr_policy=1e-3,
                 lr_value=1e-3, gamma=0.99):
        self.gamma = gamma
        # Policy network
        self.policy = PolicyNetwork(state_dim, action_dim)
        self.policy_optimizer = optim.Adam(self.policy.parameters(), lr=lr_policy)
        # Value network (the baseline)
        self.value = ValueNetwork(state_dim)
        self.value_optimizer = optim.Adam(self.value.parameters(), lr=lr_value)

    def update(self, states, log_probs, rewards):
        """Update both the policy and the value function."""
        states = torch.stack([torch.FloatTensor(s) for s in states])
        # Compute returns
        returns = []
        G = 0
        for r in reversed(rewards):
            G = r + self.gamma * G
            returns.insert(0, G)
        returns = torch.tensor(returns, dtype=torch.float32)
        # Update the value function
        values = self.value(states)
        value_loss = nn.MSELoss()(values, returns)
        self.value_optimizer.zero_grad()
        value_loss.backward()
        self.value_optimizer.step()
        # Update the policy (using the advantage)
        advantages = returns - values.detach()  # detach: baseline is excluded from the policy gradient
        policy_loss = []
        for log_prob, advantage in zip(log_probs, advantages):
            policy_loss.append(-log_prob * advantage)
        policy_loss = torch.stack(policy_loss).sum()
        self.policy_optimizer.zero_grad()
        policy_loss.backward()
        self.policy_optimizer.step()
        return policy_loss.item(), value_loss.item()
```
## 5. Actor-Critic Methods

### 5.1 Core Idea

Actor: the policy network, which selects actions. Critic: the value network, which evaluates them.

Advantages:

- The Critic provides low-variance advantage estimates
- Updates can happen online (no need to wait for a complete episode)
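Online updating is possible because, with a critic, the advantage can be estimated from a single transition rather than a full-episode return. A minimal sketch of the common one-step TD estimate \(A(s,a) \approx r + \gamma V(s') - V(s)\) (the helper name is ours):

```python
def td_advantage(reward, value_s, value_next, gamma=0.99, done=False):
    """One-step TD advantage estimate: A(s,a) ≈ r + gamma * V(s') - V(s).

    When the episode terminates at s', the bootstrap term gamma * V(s')
    is dropped, since no future reward follows a terminal state.
    """
    bootstrap = 0.0 if done else gamma * value_next
    return reward + bootstrap - value_s
```

Only the critic's value estimates for \(s\) and \(s'\) are needed, so the actor can be updated after every step; the bias this introduces (relative to Monte Carlo returns) is traded for much lower variance.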
### 5.2 An A2C Implementation

```python
class ActorCritic(nn.Module):
    """Combined Actor-Critic network."""

    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(ActorCritic, self).__init__()
        # Shared feature layers
        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
        )
        # Actor head
        self.actor = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
            nn.Softmax(dim=-1)
        )
        # Critic head
        self.critic = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )

    def forward(self, state):
        features = self.shared(state)
        probs = self.actor(features)
        value = self.critic(features)
        return probs, value

    def select_action(self, state):
        """Sample an action; also return its log-probability and the state value."""
        probs, value = self.forward(state)
        dist = Categorical(probs)
        action = dist.sample()
        log_prob = dist.log_prob(action)
        return action.item(), log_prob, value


class A2CAgent:
    """A2C agent."""

    def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99,
                 value_coef=0.5, entropy_coef=0.01):
        self.gamma = gamma
        self.value_coef = value_coef
        self.entropy_coef = entropy_coef
        self.model = ActorCritic(state_dim, action_dim)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)

    def compute_returns(self, rewards, values, done):
        """Compute bootstrapped returns for one rollout segment."""
        returns = []
        # Bootstrap from the critic's last value estimate unless the episode ended
        G = 0.0 if done else values[-1].item()  # [-1] is negative indexing: the last element
        for r in reversed(rewards):
            G = r + self.gamma * G
            returns.insert(0, G)
        return torch.tensor(returns, dtype=torch.float32)

    def update(self, states, actions, log_probs, rewards, values, done):
        """Update the Actor and the Critic."""
        states = torch.stack(states)
        actions = torch.tensor(actions)
        values = torch.stack(values).squeeze(-1)
        # Compute returns (one per reward, so they align with the states)
        returns = self.compute_returns(rewards, values, done)
        # Re-run the network to get fresh log-probs and entropy
        probs, new_values = self.model(states)
        dist = Categorical(probs)
        new_log_probs = dist.log_prob(actions)
        entropy = dist.entropy()
        # Advantage
        advantages = returns - new_values.squeeze(-1).detach()
        # Actor loss (policy gradient)
        actor_loss = -(new_log_probs * advantages).mean()
        # Critic loss (value function)
        critic_loss = nn.MSELoss()(new_values.squeeze(-1), returns)
        # Entropy bonus (encourages exploration)
        entropy_loss = -entropy.mean()
        # Total loss
        loss = actor_loss + self.value_coef * critic_loss + self.entropy_coef * entropy_loss
        # Optimize
        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=0.5)
        self.optimizer.step()
        return loss.item()
```
## 6. Chapter Summary

### Core Concepts

```text
Policy gradients:
├── Optimize policy parameters directly
├── Policy gradient theorem: ∇J = E[∇log π · Q]
└── REINFORCE: Monte Carlo estimation

Variance reduction:
├── Baseline: subtract the state-value function
├── Advantage function: A(s,a) = Q(s,a) - V(s)
└── Actor-Critic: online updates

Actor-Critic:
├── Actor: policy network (selects actions)
├── Critic: value network (evaluates actions)
└── A2C: synchronous updates
```
## ✅ Self-Test Questions

- What are the advantages and disadvantages of policy gradients compared with value-function methods?
- Why does REINFORCE have high variance? How does a baseline reduce it?
- What is the Critic's role in Actor-Critic?
- Design an experiment comparing the performance of REINFORCE and A2C.
## 📚 Further Reading

- Sutton et al. (2000), "Policy Gradient Methods for Reinforcement Learning with Function Approximation"
- Williams (1992), "Simple Statistical Gradient-Following Algorithms for Connectionist Reinforcement Learning" — the original REINFORCE paper

Ready for advanced Actor-Critic methods?

→ Next: 05-Actor-Critic高级方法.md