附录B - 编程实践指南¶
说明:本附录提供强化学习编程的实用指南,包括Python/NumPy技巧、调试方法和代码规范。
1. Python/NumPy快速入门¶
1.1 必备库导入¶
Python
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict, deque
import random
from typing import List, Tuple, Dict, Callable
import copy
1.2 NumPy核心操作¶
Python
# Array creation
V = np.zeros(10) # value function for 10 states
Q = np.zeros((10, 4)) # Q-table: 10 states, 4 actions
# Indexing and slicing
v_s = V[5] # value of state 5
q_sa = Q[5, 2] # Q-value of state 5, action 2
max_q = np.max(Q[5]) # maximum Q-value of state 5
best_action = np.argmax(Q[5]) # greedy (best) action for state 5
# Conditional operations
mask = Q > 0 # boolean mask
Q[mask] = 0 # set all positive entries to 0
# Broadcasting
rewards = np.array([1, 2, 3, 4, 5]) # np.array builds a NumPy array
discounted = rewards * (0.9 ** np.arange(5)) # per-step discounted rewards
1.3 字典的高级用法¶
Python
from collections import defaultdict
# Dictionary with default value 0
V = defaultdict(float) # defaultdict returns the default when a key is missing
V[5] += 1 # no need to check whether 5 is already a key
# Dictionary whose default value is a list
episodes = defaultdict(list)
episodes[0].append(100) # append directly, no initialization needed
# Nested dictionary
Q = defaultdict(lambda: defaultdict(float))
Q[0][1] = 10.0 # Q-value of state 0, action 1
2. 环境实现模板¶
2.1 标准环境接口¶
Python
class BaseEnvironment:
    """Abstract base class defining the standard RL environment interface."""

    def __init__(self):
        # Subclasses populate these in reset()/step().
        self.state = None
        self.done = False

    def reset(self):
        """Reset the environment to its initial state.

        Returns:
            state: the initial state
            info: dictionary of extra information
        """
        raise NotImplementedError

    def step(self, action):
        """Execute an action.

        Parameters:
            action: the action to execute

        Returns:
            next_state: the next state
            reward: immediate reward
            terminated: whether the episode ended
            truncated: whether the episode was cut short
            info: extra information
        """
        raise NotImplementedError

    def render(self):
        """Visualize the current state (optional)."""

    def get_state_space(self):
        """Return the size of the state space."""
        raise NotImplementedError

    def get_action_space(self):
        """Return the size of the action space."""
        raise NotImplementedError
2.2 完整的Grid World实现¶
Python
import numpy as np
import random
class GridWorld:
    """
    A complete grid-world environment.

    Features:
        - configurable grid size
        - optional obstacles
        - shaped rewards (goal bonus, obstacle penalty, step cost)
        - stochastic "wind" (probability that the chosen action is replaced
          by a uniformly random one)
    """

    def __init__(self, size=5, obstacles=None, wind=0.0, max_steps=100):
        """
        Initialize the grid world.

        Parameters:
            size: grid is size x size
            obstacles: list of obstacle positions [(x1, y1), (x2, y2), ...]
            wind: probability that the action is replaced by a random one
            max_steps: episode is truncated after this many steps
                       (generalizes the previously hard-coded limit of 100)
        """
        self.size = size
        self.start_pos = (0, 0)
        self.goal_pos = (size - 1, size - 1)
        self.obstacles = set(obstacles) if obstacles else set()
        self.wind = wind
        self.max_steps = max_steps
        # Action encoding: 0=up, 1=down, 2=left, 3=right.
        self.action_effects = {
            0: (-1, 0),  # up
            1: (1, 0),   # down
            2: (0, -1),  # left
            3: (0, 1)    # right
        }
        self.reset()

    def reset(self):
        """Reset the environment; returns (initial_state, info_dict)."""
        self.agent_pos = self.start_pos
        self.steps = 0
        self.total_reward = 0
        return self._get_state(), {}

    def _get_state(self):
        """State number of the agent's current position."""
        # Delegate to _pos_to_state instead of duplicating the formula.
        return self._pos_to_state(self.agent_pos)

    def _pos_to_state(self, pos):
        """Convert an (x, y) position to a state number (row-major)."""
        return pos[0] * self.size + pos[1]

    def _state_to_pos(self, state):
        """Convert a state number back to an (x, y) position."""
        return (state // self.size, state % self.size)

    def step(self, action):
        """
        Execute an action.

        Parameters:
            action: 0=up, 1=down, 2=left, 3=right
        Returns:
            next_state: the next state
            reward: immediate reward
            terminated: True when the goal is reached
            truncated: True when the step limit is hit without reaching the goal
            info: extra information (steps, total_reward, position)
        """
        self.steps += 1
        # Wind: with probability `wind`, a uniformly random action is
        # executed instead of the requested one.
        if random.random() < self.wind:
            action = random.randint(0, 3)
        # Compute the new position, clipped to the grid boundaries.
        # int() keeps states/positions as plain Python ints (np.clip
        # would otherwise yield np.int64 tuples).
        dx, dy = self.action_effects[action]
        new_x = int(np.clip(self.agent_pos[0] + dx, 0, self.size - 1))
        new_y = int(np.clip(self.agent_pos[1] + dy, 0, self.size - 1))
        new_pos = (new_x, new_y)
        # Reward shaping: obstacles send the agent back to the start.
        if new_pos in self.obstacles:
            reward = -1.0
            new_pos = self.start_pos
        elif new_pos == self.goal_pos:
            reward = 10.0
        else:
            reward = -0.01  # small step cost to encourage reaching the goal fast
        self.agent_pos = new_pos
        self.total_reward += reward
        # Bug fix: the original folded the step limit into the third return
        # value and always returned truncated=False, contradicting its own
        # docstring. Gym-style semantics: terminated = natural episode end,
        # truncated = time limit reached.
        terminated = new_pos == self.goal_pos
        truncated = (not terminated) and self.steps >= self.max_steps
        info = {
            'steps': self.steps,
            'total_reward': self.total_reward,
            'position': self.agent_pos
        }
        return self._get_state(), reward, terminated, truncated, info

    def render(self):
        """Print an ASCII view of the grid."""
        grid = np.full((self.size, self.size), '.')
        # Mark obstacles.
        for obs in self.obstacles:
            grid[obs] = 'X'
        # Mark start, goal and the agent (agent overwrites start/goal).
        grid[self.start_pos] = 'S'
        grid[self.goal_pos] = 'G'
        grid[self.agent_pos] = 'A'
        print('\n'.join([' '.join(row) for row in grid]))
        print(f"Steps: {self.steps}, Total Reward: {self.total_reward:.2f}")
        print()

    def get_state_space(self):
        """Number of states (size * size)."""
        return self.size * self.size

    def get_action_space(self):
        """Number of actions (always 4)."""
        return 4

    def get_transition_prob(self, state, action, next_state):
        """
        Transition probability P(next_state | state, action), for
        model-based methods.

        NOTE(review): with wind > 0 this is a simplification — it ignores
        that several wind-blown actions can land in the same cell when
        clipped at a wall, so probabilities may not sum exactly to 1 near
        the boundary.
        """
        pos = self._state_to_pos(state)
        next_pos = self._state_to_pos(next_state)
        # Deterministic outcome of the intended action, clipped to the grid.
        dx, dy = self.action_effects[action]
        expected_pos = (
            int(np.clip(pos[0] + dx, 0, self.size - 1)),
            int(np.clip(pos[1] + dy, 0, self.size - 1))
        )
        if self.wind > 0:
            # Intended action succeeds with prob 1 - wind, plus the chance
            # that the random wind action coincides with it (wind / 4).
            if next_pos == expected_pos:
                return 1 - self.wind + self.wind / 4
            else:
                return self.wind / 4
        else:
            return 1.0 if next_pos == expected_pos else 0.0
# Usage example
if __name__ == "__main__":
    # Build a 5x5 windy grid with three obstacles.
    env = GridWorld(size=5, obstacles=[(1, 1), (2, 2), (3, 3)], wind=0.1)

    # Roll out one episode under a uniformly random policy.
    state, _ = env.reset()
    env.render()
    done = False
    while not done:
        action = random.randint(0, 3)
        state, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated
        env.render()
3. 算法实现模板¶
3.1 值迭代模板¶
Python
def _action_values(env, state, V, gamma, n_states, n_actions):
    """One-step lookahead: Q(state, a) for every action under value function V."""
    q_values = []
    for a in range(n_actions):
        q = 0.0
        for s_next in range(n_states):
            prob = env.get_transition_prob(state, a, s_next)
            if prob > 0:  # skip zero-probability transitions
                reward = env.get_reward(state, a, s_next)
                q += prob * (reward + gamma * V[s_next])
        q_values.append(q)
    return q_values


def value_iteration(env, gamma=0.9, theta=1e-8, max_iter=1000):
    """
    Value iteration algorithm.

    Parameters:
        env: environment (must provide get_state_space, get_action_space,
             get_transition_prob and get_reward)
        gamma: discount factor
        theta: convergence threshold on the largest per-sweep value change
        max_iter: maximum number of sweeps
    Returns:
        V: optimal value function
        policy: greedy (optimal) policy, one action index per state
    """
    n_states = env.get_state_space()
    n_actions = env.get_action_space()
    V = np.zeros(n_states)
    # Sweep until the value function stops changing (or max_iter hit).
    for i in range(max_iter):
        delta = 0
        for s in range(n_states):
            v = V[s]
            # Bellman optimality backup (shared helper removes the
            # duplicated lookahead code from the original).
            q_values = _action_values(env, s, V, gamma, n_states, n_actions)
            V[s] = max(q_values) if q_values else 0
            delta = max(delta, abs(v - V[s]))
        if delta < theta:
            print(f"值迭代在{i+1}次迭代后收敛")
            break
    # Extract the greedy policy from the converged value function.
    policy = np.zeros(n_states, dtype=int)
    for s in range(n_states):
        q_values = _action_values(env, s, V, gamma, n_states, n_actions)
        policy[s] = np.argmax(q_values) if q_values else 0
    return V, policy
3.2 Q-Learning模板¶
Python
class QLearningAgent:
    """Tabular Q-learning agent with an epsilon-greedy behavior policy."""

    def __init__(self, n_states, n_actions, alpha=0.1, gamma=0.9, epsilon=0.1):
        self.n_states = n_states
        self.n_actions = n_actions
        self.alpha = alpha      # learning rate
        self.gamma = gamma      # discount factor
        self.epsilon = epsilon  # exploration probability
        # Q-table, initialized to all zeros.
        self.Q = np.zeros((n_states, n_actions))

    def select_action(self, state):
        """Pick an action epsilon-greedily w.r.t. the current Q-table."""
        explore = random.random() < self.epsilon
        if explore:
            return random.randint(0, self.n_actions - 1)
        return np.argmax(self.Q[state])

    def update(self, state, action, reward, next_state, done):
        """One-step Q-learning update toward the TD target."""
        # Terminal transitions bootstrap from nothing.
        target = reward if done else reward + self.gamma * np.max(self.Q[next_state])
        self.Q[state, action] += self.alpha * (target - self.Q[state, action])

    def train(self, env, n_episodes=1000):
        """Run n_episodes of training; returns per-episode total rewards."""
        rewards_history = []
        for episode in range(n_episodes):
            state, _ = env.reset()
            total_reward = 0
            done = False
            while not done:
                action = self.select_action(state)
                next_state, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated
                self.update(state, action, reward, next_state, done)
                total_reward += reward
                state = next_state
            rewards_history.append(total_reward)
            # Anneal exploration, floored at 1%.
            self.epsilon = max(0.01, self.epsilon * 0.995)
            if (episode + 1) % 100 == 0:
                avg_reward = np.mean(rewards_history[-100:])
                print(f"Episode {episode + 1}, Avg Reward: {avg_reward:.2f}, Epsilon: {self.epsilon:.3f}")
        return rewards_history
4. 调试技巧¶
4.1 打印调试¶
Python
def debug_episode(env, agent, verbose=True):
    """Run a single episode, optionally printing per-step details.

    Returns:
        (total_reward, step): episode return and number of steps taken.
    """
    state, _ = env.reset()
    total_reward = 0
    done = False
    step = 0

    if verbose:
        banner = "=" * 50
        print(banner)
        print("开始新的Episode")
        print(banner)

    while not done:
        action = agent.select_action(state)
        next_state, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated

        if verbose:
            print(f"\nStep {step}:")
            print(f" State: {state}")
            print(f" Action: {action}")
            print(f" Next State: {next_state}")
            print(f" Reward: {reward}")
            print(f" Done: {done}")
            if hasattr(agent, 'Q'):  # only agents that expose a Q-table
                print(f" Q-values: {agent.Q[state]}")

        total_reward += reward
        state = next_state
        step += 1

        # Safety valve against non-terminating environments.
        if step > 1000:
            print("Warning: Episode too long, breaking...")
            break

    if verbose:
        print(f"\nEpisode finished after {step} steps")
        print(f"Total reward: {total_reward}")
    return total_reward, step
4.2 可视化训练过程¶
Python
import matplotlib.pyplot as plt
def plot_training_progress(rewards, window=100):
    """Plot per-episode rewards (with a moving average) and their distribution.

    Parameters:
        rewards: list of total rewards, one per episode
        window: moving-average window size
    """
    plt.figure(figsize=(12, 5))
    # Left panel: raw per-episode rewards.
    plt.subplot(1, 2, 1)
    plt.plot(rewards, alpha=0.3, label='Raw')
    # Overlay the moving average only once enough episodes exist.
    if len(rewards) >= window:
        moving_avg = np.convolve(rewards, np.ones(window)/window, mode='valid')
        plt.plot(range(window-1, len(rewards)), moving_avg, label=f'MA({window})')
    plt.xlabel('Episode')
    plt.ylabel('Total Reward')
    plt.title('Training Progress')
    plt.legend()
    plt.grid(True, alpha=0.3)
    # Right panel: histogram of episode returns.
    plt.subplot(1, 2, 2)
    plt.hist(rewards, bins=50, edgecolor='black', alpha=0.7)
    plt.xlabel('Total Reward')
    plt.ylabel('Frequency')
    plt.title('Reward Distribution')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
def visualize_value_function(V, env_size, title="Value Function"):
    """Plot the state-value function as an annotated heatmap.

    Parameters:
        V: flat array of state values, length env_size * env_size
        env_size: grid side length
        title: figure title
    """
    V_matrix = np.asarray(V).reshape(env_size, env_size)  # flat -> 2-D grid
    plt.figure(figsize=(8, 6))
    # Bug fix: the original called sns.heatmap, but seaborn was never
    # imported anywhere in this file (NameError at runtime). Use plain
    # matplotlib to draw an equivalent annotated heatmap instead.
    plt.imshow(V_matrix, cmap='viridis')
    plt.colorbar()
    for i in range(env_size):
        for j in range(env_size):
            plt.text(j, i, f"{V_matrix[i, j]:.2f}",
                     ha='center', va='center', color='white')
    plt.title(title)
    plt.show()
def visualize_policy(policy, env_size, title="Policy"):
    """Render a deterministic policy as a grid of arrows.

    Parameters:
        policy: flat array of greedy action indices, length env_size * env_size
        env_size: grid side length
        title: figure title
    """
    # Action encoding matches GridWorld: 0=up, 1=down, 2=left, 3=right.
    action_arrows = {0: '↑', 1: '↓', 2: '←', 3: '→'}
    policy_matrix = policy.reshape(env_size, env_size)
    plt.figure(figsize=(8, 6))
    for i in range(env_size):
        for j in range(env_size):
            action = policy_matrix[i, j]
            # +0.5 centers each arrow inside its grid cell.
            plt.text(j + 0.5, i + 0.5, action_arrows[action],
                     ha='center', va='center', fontsize=20)
    plt.xlim(0, env_size)
    plt.ylim(env_size, 0)  # inverted y-axis so row 0 is drawn at the top
    plt.gca().set_aspect('equal')
    plt.title(title)
    plt.grid(True)
    plt.show()
4.3 常见错误检查清单¶
Python
def sanity_check(env, agent):
    """Run basic interface checks on an environment/agent pair."""
    print("Running sanity checks...")

    # Environment checks: reset() and the 5-tuple step() contract.
    state, _ = env.reset()
    assert isinstance(state, (int, np.integer)), f"State should be int, got {type(state)}"
    next_state, reward, terminated, truncated, info = env.step(0)
    done = terminated or truncated
    assert isinstance(next_state, (int, np.integer)), "Next state should be int"
    assert isinstance(reward, (int, float)), "Reward should be numeric"
    assert isinstance(terminated, bool), "Terminated should be bool"

    # Agent checks: any Q-table must match the environment's spaces.
    if hasattr(agent, 'Q'):
        assert agent.Q.shape == (env.get_state_space(), env.get_action_space()), \
            "Q-table shape mismatch"

    # Action-selection check: the chosen action must be in range.
    action = agent.select_action(state)
    assert 0 <= action < env.get_action_space(), f"Invalid action: {action}"
    print("✓ All checks passed!")
5. 性能优化¶
5.1 向量化操作¶
Python
# Slow: compute the Q-values with an explicit Python loop.
# (Illustrative snippet — assumes P, R, V, s, gamma, n_actions and n_states
# are defined elsewhere; presumably P is (n_states, n_actions, n_states)
# and R is (n_states, n_actions) — verify against your model.)
q_values = []
for a in range(n_actions):
    q = sum([P[s, a, s_next] * (R[s, a] + gamma * V[s_next])
             for s_next in range(n_states)])
    q_values.append(q)
# Fast: the same computation, vectorized over actions and next states.
q_values = np.sum(P[s] * (R[s][:, None] + gamma * V[None, :]), axis=1)
5.2 使用Numba加速¶
Python
from numba import jit

@jit(nopython=True)  # nopython mode: compiled to machine code, no Python fallback
def fast_value_iteration(P, R, gamma, theta=1e-8, max_iter=1000):
    """Value iteration accelerated with Numba.

    Parameters:
        P: transition tensor, shape (n_states, n_actions, n_states)
        R: reward array indexed as R[s, a] — presumably shape
           (n_states, n_actions), i.e. rewards depend only on (state, action);
           verify against the caller
        gamma: discount factor
        theta: convergence threshold on the largest per-sweep value change
        max_iter: maximum number of sweeps
    Returns:
        V: optimal state-value function, length n_states
    """
    n_states, n_actions, _ = P.shape
    V = np.zeros(n_states)
    for _ in range(max_iter):
        delta = 0.0
        for s in range(n_states):
            v = V[s]
            # Bellman optimality backup: max over actions of expected value.
            max_q = -np.inf
            for a in range(n_actions):
                q = 0.0
                for s_next in range(n_states):
                    # Skip zero-probability transitions to save work.
                    if P[s, a, s_next] > 0:
                        q += P[s, a, s_next] * (R[s, a] + gamma * V[s_next])
                if q > max_q:
                    max_q = q
            V[s] = max_q
            delta = max(delta, abs(v - V[s]))
        # Converged: no state value changed by more than theta.
        if delta < theta:
            break
    return V
6. 代码规范¶
6.1 命名规范¶
Python
# Variable naming
n_states # number of states
n_actions # number of actions
state # current state
next_state # next state
action # action
reward # reward
done # whether the episode has ended
gamma # discount factor
alpha # learning rate
epsilon # exploration rate
V # state-value function
Q # action-value function
policy # policy
pi # policy (mathematical symbol)
# Function naming
def select_action(state): ...
def update_q_value(state, action, target): ...
def compute_return(rewards): ...
def evaluate_policy(policy): ...
6.2 文档字符串规范¶
Python
def train_agent(env, agent, n_episodes, verbose=True):
    """
    Train a reinforcement-learning agent.

    Parameters:
        env: environment object; must implement reset() and step()
        agent: agent object; must implement select_action() and update()
        n_episodes: number of training episodes
        verbose: whether to print training progress

    Returns:
        rewards: list of total rewards, one per episode

    Example:
        >>> env = GridWorld()
        >>> agent = QLearningAgent(25, 4)
        >>> rewards = train_agent(env, agent, 1000)
    """
    pass
7. 实用工具函数¶
Python
class RLUtils:
    """Collection of small reinforcement-learning utility functions."""

    @staticmethod  # static: callable without creating an instance
    def epsilon_greedy(q_values, epsilon):
        """Epsilon-greedy selection over a vector of Q-values."""
        if random.random() < epsilon:
            return random.randint(0, len(q_values) - 1)
        else:
            return np.argmax(q_values)

    @staticmethod
    def softmax(x, temperature=1.0):
        """Sample an action index from a softmax (Boltzmann) distribution.

        Subtracting max(x) before exponentiating keeps the computation
        numerically stable; `temperature` controls how peaked it is.
        """
        exp_x = np.exp((x - np.max(x)) / temperature)
        probs = exp_x / np.sum(exp_x)
        return np.random.choice(len(x), p=probs)

    @staticmethod
    def compute_discounted_return(rewards, gamma):
        """Discounted return G_t at every time step, as a list.

        Fix: the original built the result with returns.insert(0, G),
        which is O(n^2) overall; appending and reversing once is O(n).
        """
        G = 0
        returns = []
        for r in reversed(rewards):
            G = r + gamma * G
            returns.append(G)
        returns.reverse()
        return returns

    @staticmethod
    def moving_average(data, window):
        """Moving average via convolution ('valid' mode: no edge padding)."""
        return np.convolve(data, np.ones(window)/window, mode='valid')

    @staticmethod
    def normalize_rewards(rewards):
        """Standardize rewards to zero mean and unit variance.

        Accepts any array-like (the original required a NumPy array for
        the subtraction to broadcast); the 1e-8 avoids division by zero.
        """
        rewards = np.asarray(rewards, dtype=float)
        mean = np.mean(rewards)
        std = np.std(rewards) + 1e-8
        return (rewards - mean) / std
掌握这些编程技巧后,你将能够更高效地实现和调试强化学习算法!