10 - Reinforcement Learning Basics¶
🎮 What Is Reinforcement Learning?¶
Core Concepts¶
Definition: an agent learns an optimal policy through trial-and-error interaction with an environment.
Key Elements¶
Agent:
- The entity that makes decisions
- Examples: a robot, a game player, a self-driving car
Environment:
- The external world the agent acts in
- Examples: a game world, a real road
State (s):
- The current situation of the environment
- Examples: a board position, a camera frame
Action (a):
- An operation the agent can perform
- Examples: move, attack, brake
Reward (r):
- The feedback the environment gives the agent
- Example: +1 (win), -1 (loss), 0 (draw)
Policy (π):
- The agent's decision rule
- π(a|s): the probability of taking action a in state s
Value function (V):
- The expected cumulative reward
- Vπ(s): the expected return starting from state s and following policy π
Model:
- A model of the environment's dynamics
- P(s'|s,a): the state-transition probability
- R(s,a): the immediate reward
Markov Decision Process (MDP)¶
Definition: a 5-tuple (S, A, P, R, γ)
- S: state space
- A: action space
- P: state-transition probabilities
- R: reward function
- γ: discount factor (0 ≤ γ ≤ 1)
Markov property: the future depends only on the current state, not on the history.
Objective: Maximize Cumulative Return¶
Role of the discount factor γ:
- γ close to 1: emphasizes long-term rewards
- γ close to 0: emphasizes immediate rewards
- γ = 0: fully greedy (only the next reward matters)
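To make the effect of γ concrete, here is a small sketch (the reward sequence is made up for illustration) that computes the discounted return G = r₀ + γ·r₁ + γ²·r₂ + … for one delayed reward under different discount factors:

```python
def discounted_return(rewards, gamma):
    """G = r_0 + γ·r_1 + γ²·r_2 + ..., accumulated from the last reward backwards."""
    g = 0.0
    for r in reversed(rewards):
        g = r + gamma * g
    return g

rewards = [0, 0, 0, 10]  # a single reward, delayed by three steps

print(discounted_return(rewards, 0.99))  # far-sighted: 10·0.99³ ≈ 9.70
print(discounted_return(rewards, 0.5))   # short-sighted: 10·0.5³ = 1.25
print(discounted_return(rewards, 0.0))   # greedy: only the immediate reward counts
```

With γ = 0.99 the delayed reward is almost fully counted; with γ = 0 it is invisible, which is exactly why a greedy agent ignores long-term consequences.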
🎯 Value-Based Methods¶
Q-Function (Action-Value Function)¶
Definition: Q(s,a) is the expected return from taking action a in state s and then following the optimal policy.
Bellman optimality equation:
Q*(s,a) = E[r + γ max_{a'} Q*(s',a')]
Q-Learning¶
Core idea: learn Q-values by temporal-difference (TD) learning.
Update rule:
Q(s,a) ← Q(s,a) + α[r + γ max_{a'} Q(s',a') - Q(s,a)]
- α: learning rate
- r + γ max_{a'} Q(s',a'): the TD target
- Q(s,a): the current estimate
Algorithm outline:
Initialize Q(s,a) arbitrarily
Repeat for each episode:
    Initialize state s
    Repeat for each step:
        Choose action a from Q (ε-greedy policy)
        Take a, observe r, s'
        Q(s,a) ← Q(s,a) + α[r + γ max Q(s',a') - Q(s,a)]
        s ← s'
    until s is terminal
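A single update step worked through with made-up numbers (α = 0.5, γ = 0.9, and an arbitrary transition):

```python
alpha, gamma = 0.5, 0.9
q_sa = 0.0          # current estimate Q(s,a)
reward = 1.0        # observed reward r
max_q_next = 2.0    # max_a' Q(s',a')

td_target = reward + gamma * max_q_next  # 1 + 0.9·2 = 2.8
td_error = td_target - q_sa              # 2.8 - 0 = 2.8
q_sa = q_sa + alpha * td_error           # 0 + 0.5·2.8 = 1.4

print(round(q_sa, 4))  # 1.4
```

The estimate moves half-way (α = 0.5) toward the TD target rather than jumping to it, which smooths out noise in individual transitions.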
Code implementation:
import numpy as np

class QLearning:
    def __init__(self, state_size, action_size, learning_rate=0.1,
                 discount_factor=0.95, epsilon=1.0):
        self.state_size = state_size
        self.action_size = action_size
        self.lr = learning_rate
        self.gamma = discount_factor
        self.epsilon = epsilon          # exploration rate
        self.epsilon_decay = 0.995
        self.epsilon_min = 0.01
        self.q_table = np.zeros((state_size, action_size))

    def act(self, state):
        # ε-greedy policy
        if np.random.rand() <= self.epsilon:
            return np.random.choice(self.action_size)   # explore
        return np.argmax(self.q_table[state])           # exploit

    def learn(self, state, action, reward, next_state, done):
        # Q-learning update
        target = reward
        if not done:
            target = reward + self.gamma * np.amax(self.q_table[next_state])
        self.q_table[state, action] += self.lr * (target - self.q_table[state, action])
        # Decay the exploration rate
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def train(self, env, num_episodes=1000):
        for episode in range(num_episodes):
            state, _ = env.reset()   # gymnasium API: returns (state, info)
            done = False
            total_reward = 0
            while not done:
                action = self.act(state)
                next_state, reward, terminated, truncated, _ = env.step(action)  # gymnasium returns 5 values
                done = terminated or truncated
                self.learn(state, action, reward, next_state, done)
                state = next_state
                total_reward += reward
            if episode % 100 == 0:
                print(f"Episode {episode}, Total Reward: {total_reward}, Epsilon: {self.epsilon:.2f}")
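To see the whole loop run end to end without any external dependency, here is a minimal sketch on a hypothetical 5-state corridor (the environment, reward scheme, and hyperparameters are all made up for illustration; moving right from state 3 reaches the goal and pays reward 1):

```python
import random

random.seed(0)

N_STATES, GOAL = 5, 4          # states 0..4; reaching state 4 ends the episode
LEFT, RIGHT = 0, 1
alpha, gamma, epsilon = 0.5, 0.9, 0.1

q = [[0.0, 0.0] for _ in range(N_STATES)]  # tabular Q(s, a)

def step(state, action):
    """Toy dynamics: deterministic move, reward 1 only on reaching the goal."""
    next_state = min(state + 1, GOAL) if action == RIGHT else max(state - 1, 0)
    reward = 1.0 if next_state == GOAL else 0.0
    return next_state, reward, next_state == GOAL

for _ in range(2000):   # episodes
    s, done = 0, False
    while not done:
        # ε-greedy action selection
        if random.random() < epsilon:
            a = random.choice([LEFT, RIGHT])
        else:
            a = LEFT if q[s][LEFT] > q[s][RIGHT] else RIGHT
        s2, r, done = step(s, a)
        # Q-learning update; bootstrap term is zero at the terminal state
        q[s][a] += alpha * (r + gamma * max(q[s2]) * (not done) - q[s][a])
        s = s2

print([round(max(qs), 2) for qs in q])  # state values grow toward the goal; the terminal state is never updated
```

The learned values should approach γ^(3-s) for states 0..3, which is exactly the discounting of the single delayed reward.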
Deep Q-Network (DQN)¶
Problem: Q-learning stores a table; what if the state space is far too large for that?
Solution: approximate the Q-function with a neural network.
Architecture: state s → fully connected layers → one Q(s,a) output per action.
Key innovations:
1. Experience Replay¶
Problem: consecutive samples are strongly correlated, which destabilizes training.
Solution: store experience in a buffer and train on randomly sampled mini-batches.
from collections import deque

class ReplayBuffer:
    def __init__(self, capacity=10000):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        indices = np.random.choice(len(self.buffer), batch_size, replace=False)
        batch = [self.buffer[i] for i in indices]
        # zip(*batch) regroups per-transition tuples into per-field tuples
        states, actions, rewards, next_states, dones = zip(*batch)
        return (
            np.array(states),
            np.array(actions),
            np.array(rewards),
            np.array(next_states),
            np.array(dones),
        )

    def __len__(self):
        return len(self.buffer)
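A quick usage sketch of the same idea, written in pure standard-library Python so it runs on its own (random.sample stands in for np.random.choice for sampling without replacement; the transition contents are dummy values):

```python
from collections import deque
import random

class ReplayBuffer:
    def __init__(self, capacity=10000):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)  # without replacement
        # zip(*batch) regroups per-transition tuples into per-field tuples
        return tuple(zip(*batch))

    def __len__(self):
        return len(self.buffer)

buf = ReplayBuffer(capacity=100)
for i in range(50):                       # dummy transitions: s=i, s'=i+1
    buf.push(i, 0, 0.0, i + 1, False)

states, actions, rewards, next_states, dones = buf.sample(8)
print(len(buf), len(states))  # 50 8
```

Because the deque has a fixed maxlen, pushing beyond capacity silently evicts the oldest transitions, which is exactly the sliding-window behavior a replay buffer needs.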
2. Target Network¶
Problem: the target Q(s',a') keeps shifting as the network trains, which destabilizes learning.
Solution: use a frozen target network and update it only periodically.
import torch
import torch.nn as nn
import torch.optim as optim

class DQN(nn.Module):
    def __init__(self, state_size, action_size, hidden_size=64):
        super().__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, action_size)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = ReplayBuffer(capacity=100000)
        self.gamma = 0.95        # discount factor
        self.epsilon = 1.0       # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.batch_size = 64
        # Online network
        self.model = DQN(state_size, action_size)
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        # Target network
        self.target_model = DQN(state_size, action_size)
        self.update_target_model()

    def update_target_model(self):
        """Copy the online network's parameters into the target network."""
        self.target_model.load_state_dict(self.model.state_dict())

    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return np.random.choice(self.action_size)
        state = torch.FloatTensor(state).unsqueeze(0)  # unsqueeze adds a batch dimension
        q_values = self.model(state)
        return q_values.argmax().item()  # .item() converts a one-element tensor to a Python number

    def remember(self, state, action, reward, next_state, done):
        self.memory.push(state, action, reward, next_state, done)

    def replay(self):
        if len(self.memory) < self.batch_size:
            return
        # Sample a mini-batch from the replay buffer
        states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)
        states = torch.FloatTensor(states)
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        next_states = torch.FloatTensor(next_states)
        dones = torch.FloatTensor(dones)
        # Current Q-values for the actions actually taken
        current_q_values = self.model(states).gather(1, actions.unsqueeze(1))
        # Target Q-values from the frozen target network
        with torch.no_grad():
            next_q_values = self.target_model(next_states).max(1)[0]
            target_q_values = rewards + (1 - dones) * self.gamma * next_q_values
        # Compute the loss
        loss = nn.MSELoss()(current_q_values.squeeze(), target_q_values)
        # Optimize
        self.optimizer.zero_grad()
        loss.backward()        # backpropagate to compute gradients
        self.optimizer.step()  # update parameters from the gradients
        # Decay the exploration rate
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
Training loop:
def train_dqn(env, agent, num_episodes=1000):
    for episode in range(num_episodes):
        state, _ = env.reset()   # gymnasium API: returns (state, info)
        total_reward = 0
        done = False
        while not done:
            # Select an action
            action = agent.act(state)
            # Take the action
            next_state, reward, terminated, truncated, _ = env.step(action)  # gymnasium returns 5 values
            done = terminated or truncated
            # Store the experience
            agent.remember(state, action, reward, next_state, done)
            # Train on a mini-batch
            agent.replay()
            state = next_state
            total_reward += reward
        # Periodically update the target network
        if episode % 10 == 0:
            agent.update_target_model()
        if episode % 100 == 0:
            print(f"Episode {episode}, Total Reward: {total_reward}, Epsilon: {agent.epsilon:.2f}")
🎭 Policy-Based Methods¶
Policy Gradient¶
Core idea: parameterize the policy π(a|s; θ) directly and optimize it by gradient ascent.
Objective: maximize the expected return J(θ).
Gradient (REINFORCE algorithm):
∇θ J(θ) = E[∇θ log π(a|s; θ) · G_t]
Intuition: increase the log-probability of each action in proportion to the return G_t that followed it, so actions that led to high returns become more likely.
Code implementation:
import torch
import torch.nn as nn
import torch.optim as optim

class PolicyGradient(nn.Module):
    def __init__(self, state_size, action_size, hidden_size=128):
        super().__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, action_size)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.softmax(self.fc2(x), dim=-1)
        return x

    def act(self, state):
        state = torch.FloatTensor(state)
        probs = self.forward(state)
        dist = torch.distributions.Categorical(probs)
        action = dist.sample()
        return action.item(), dist.log_prob(action)

class PGAgent:
    def __init__(self, state_size, action_size, gamma=0.99):
        self.policy = PolicyGradient(state_size, action_size)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=0.01)
        self.gamma = gamma
        self.episode_rewards = []
        self.episode_log_probs = []

    def remember(self, reward, log_prob):
        self.episode_rewards.append(reward)
        self.episode_log_probs.append(log_prob)

    def discount_rewards(self):
        """Compute the discounted return at each step of the episode."""
        discounted = []
        running_add = 0
        for r in reversed(self.episode_rewards):
            running_add = r + self.gamma * running_add
            discounted.insert(0, running_add)
        return torch.FloatTensor(discounted)

    def learn(self):
        # Compute discounted returns
        returns = self.discount_rewards()
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)  # normalize
        # Compute the loss
        log_probs = torch.stack(self.episode_log_probs)
        loss = -torch.sum(log_probs * returns)
        # Optimize
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        # Clear the episode buffers
        self.episode_rewards = []
        self.episode_log_probs = []
Training loop:
def train_policy_gradient(env, agent, num_episodes=1000):
    for episode in range(num_episodes):
        state, _ = env.reset()   # gymnasium API
        total_reward = 0
        done = False
        while not done:
            action, log_prob = agent.policy.act(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            agent.remember(reward, log_prob)
            state = next_state
            total_reward += reward
        # Update the policy at the end of each episode
        agent.learn()
        if episode % 100 == 0:
            print(f"Episode {episode}, Total Reward: {total_reward}")
Actor-Critic¶
Problem: policy gradients have high variance and poor sample efficiency.
Solution: combine them with a learned value function to reduce variance.
Actor (policy network): π(a|s; θ)
Critic (value network): V(s; w)
Advantage function:
A(s,a) = Q(s,a) - V(s)
       = r + γV(s') - V(s)
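Plugging made-up numbers into the TD form of the advantage (γ = 0.99; the reward and value estimates are hypothetical):

```python
gamma = 0.99
r, v_s, v_s_next = 1.0, 5.0, 5.5   # hypothetical reward and critic estimates

advantage = r + gamma * v_s_next - v_s  # 1 + 0.99·5.5 - 5 = 1.445
print(round(advantage, 3))  # 1.445
```

A positive advantage means the action did better than the critic expected, so the actor increases its probability; a negative one decreases it.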
Actor gradient:
∇J(θ) = E[∇log π(a|s) · A(s,a)]
Critic learning:
Update w so that V(s) approaches the observed return.
Code implementation:
class Actor(nn.Module):
    def __init__(self, state_size, action_size, hidden_size=128):
        super().__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, action_size)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.softmax(self.fc2(x), dim=-1)
        return x

class Critic(nn.Module):
    def __init__(self, state_size, hidden_size=128):
        super().__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x

class A2CAgent:
    def __init__(self, state_size, action_size, gamma=0.99):
        self.actor = Actor(state_size, action_size)
        self.critic = Critic(state_size)
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=0.001)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=0.01)
        self.gamma = gamma

    def act(self, state):
        state = torch.FloatTensor(state)
        probs = self.actor(state)
        action = torch.multinomial(probs, 1).item()
        return action

    def learn(self, states, actions, rewards, next_states, dones):
        # Convert to tensors
        states = torch.FloatTensor(states)
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        next_states = torch.FloatTensor(next_states)
        dones = torch.FloatTensor(dones)
        # Critic: compute the TD target
        values = self.critic(states).squeeze()
        next_values = self.critic(next_states).squeeze()
        td_targets = rewards + (1 - dones) * self.gamma * next_values.detach()  # detach() blocks gradients through the bootstrap target
        advantages = td_targets - values
        # Actor loss
        probs = self.actor(states)
        log_probs = torch.log(probs.gather(1, actions.unsqueeze(1))).squeeze()
        actor_loss = -torch.mean(log_probs * advantages.detach())
        # Critic loss
        critic_loss = nn.MSELoss()(values, td_targets)
        # Update both networks
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
🎲 Exploration vs. Exploitation¶
ε-Greedy Policy¶
With probability ε take a random action (explore); otherwise take the action with the highest Q-value (exploit). ε typically starts high and is decayed over training, as in the Q-learning code above.
Entropy Regularization¶
Objective: in addition to the reward, also encourage randomness in the policy by adding an entropy bonus to the objective.
Here H(π) is the policy entropy: H(π) = -Σ_a π(a|s) log π(a|s)
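The entropy term is easy to compute directly; a quick sketch comparing a uniform and a near-deterministic policy over 4 actions (the probability vectors are made up):

```python
import math

def entropy(probs):
    # H(π) = -Σ π(a|s)·log π(a|s); the 0·log 0 terms are treated as 0
    return -sum(p * math.log(p) for p in probs if p > 0)

uniform = [0.25, 0.25, 0.25, 0.25]
peaked = [0.97, 0.01, 0.01, 0.01]

print(round(entropy(uniform), 3))  # log 4 ≈ 1.386 (maximum entropy)
print(round(entropy(peaked), 3))   # much smaller: nearly deterministic
```

Maximizing reward plus a small multiple of H(π) therefore penalizes policies that collapse to a single action too early.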
Optimistic Initialization¶
Initialize Q-values to large values:
- Encourages visiting unexplored state-action pairs
- An instance of "optimism in the face of uncertainty"
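A minimal sketch of the effect (all values made up): with Q initialized above any achievable return, even a purely greedy agent tries every action at least once, because each tried action's estimate drops below the still-optimistic untried ones.

```python
OPTIMISTIC = 5.0        # deliberately above any reachable return
alpha = 0.5
q = [OPTIMISTIC] * 3    # Q(s, a) for 3 actions in a single state

tried = []
for _ in range(3):
    a = max(range(3), key=lambda i: q[i])  # purely greedy, no ε
    tried.append(a)
    reward = 1.0                           # the true reward is much lower than 5
    q[a] += alpha * (reward - q[a])        # bandit-style update toward the reward

print(sorted(tried))  # [0, 1, 2] — every action got explored
```

After the first pull, q[0] falls to 3.0, so the greedy argmax moves on to the next untried action, and so on until all estimates have been pulled down by real data.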
📊 Reinforcement Learning vs. Supervised Learning¶
| Dimension | Supervised learning | Reinforcement learning |
|---|---|---|
| Data | Labeled | Reward signal only |
| Feedback | Immediate correct answer | Delayed reward |
| Objective | Minimize prediction error | Maximize cumulative reward |
| Data distribution | Fixed | Non-stationary (the policy changes) |
| Independence | Independent samples | Correlated samples (sequential) |
| Applications | Classification, regression | Games, robotics |
🎯 Application Scenarios¶
Games¶
Robotics¶
Recommender Systems¶
Finance¶
Autonomous Driving¶
💡 Learning Tips¶
Start Simple¶
Step 1: Grid World
- Small state space
- Discrete actions
- Learn the core concepts
Step 2: CartPole
- Continuous states
- Classic OpenAI Gym environment
- A first DQN project
Step 3: Atari games
- Image input
- CNN + DQN
- Deep reinforcement learning
Step 4: Continuous control (e.g. MuJoCo)
- Continuous action spaces
- Policy gradients
- Actor-Critic
Practical Tips¶
# Using Gymnasium (the gym package was renamed gymnasium)
import gymnasium as gym

# Create an environment
env = gym.make('CartPole-v1')

# Reset
state, info = env.reset()

# Interact
for _ in range(1000):
    action = env.action_space.sample()  # random action
    state, reward, terminated, truncated, info = env.step(action)
    if terminated or truncated:
        state, info = env.reset()

env.close()
Common Pitfalls¶
❌ Ignoring exploration
→ The policy converges prematurely to a suboptimal solution
✅ Use ε-greedy or entropy regularization
❌ High variance, unstable training
→ Training is difficult and results are poor
✅ Use experience replay, target networks, or Actor-Critic
❌ Poor sample efficiency
→ Huge numbers of environment interactions required
✅ Use simulators, or pretrain with imitation learning
Further Reading¶
Classic algorithms:
- DQN, Double DQN, Dueling DQN
- A3C, A2C
- PPO (Proximal Policy Optimization)
- SAC (Soft Actor-Critic)
Frontier directions:
- Offline RL
- Multi-agent RL (MARL)
- Meta-RL
- Model-based RL
Next step: start practicing! Train your first agent in a Gymnasium environment!