
附录B - 编程实践指南

说明:本附录提供强化学习编程的实用指南,包括Python/NumPy技巧、调试方法和代码规范。


1. Python/NumPy快速入门

1.1 必备库导入

Python
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict, deque
import random
from typing import List, Tuple, Dict, Callable
import copy

1.2 NumPy核心操作

Python
# Array creation
V = np.zeros(10)           # value function over 10 states
Q = np.zeros((10, 4))      # Q-table: 10 states x 4 actions

# Indexing and slicing
v_s = V[5]                 # value of state 5
q_sa = Q[5, 2]            # Q-value of (state 5, action 2)
max_q = np.max(Q[5])      # largest Q-value in state 5
best_action = np.argmax(Q[5])  # greedy action for state 5

# Conditional operations
mask = Q > 0              # boolean mask
Q[mask] = 0               # zero out every positive entry

# Broadcasting
rewards = np.array([1, 2, 3, 4, 5])  # np.array builds a NumPy array
discounted = rewards * (0.9 ** np.arange(5))  # element-wise discounted rewards

1.3 字典的高级用法

Python
from collections import defaultdict

# Dict whose missing keys default to 0.0
V = defaultdict(float)  # defaultdict returns the default for a missing key
V[5] += 1  # no need to check whether 5 is already present

# Dict whose missing keys default to an empty list
episodes = defaultdict(list)
episodes[0].append(100)  # append directly, list is created on demand

# Nested dict (state -> action -> value)
Q = defaultdict(lambda: defaultdict(float))
Q[0][1] = 10.0  # Q-value of (state 0, action 1)

2. 环境实现模板

2.1 标准环境接口

Python
class BaseEnvironment:
    """Abstract interface shared by all RL environments in this guide."""

    def __init__(self):
        # Concrete subclasses maintain these in reset()/step().
        self.state = None
        self.done = False

    def reset(self):
        """
        Reset the environment to its initial state.

        Returns:
            state: the initial state
            info: dict of extra information
        """
        raise NotImplementedError

    def step(self, action):
        """
        Execute one action.

        Parameters:
            action: the action to take

        Returns:
            next_state: successor state
            reward: immediate reward
            terminated: whether the episode ended naturally
            truncated: whether the episode was cut off early
            info: extra information
        """
        raise NotImplementedError

    def render(self):
        """Visualize the current state (optional)."""
        pass

    def get_state_space(self):
        """Return the size of the state space."""
        raise NotImplementedError

    def get_action_space(self):
        """Return the size of the action space."""
        raise NotImplementedError

2.2 完整的Grid World实现

Python
import numpy as np
import random

class GridWorld:
    """
    A complete grid-world environment.

    Features:
    - configurable grid size
    - optional obstacle cells
    - several reward signals (goal, obstacle, per-step cost)
    - random "wind": with some probability the chosen action is replaced
      by a uniformly random one
    """

    def __init__(self, size=5, obstacles=None, wind=0.0):
        """
        Build the world.

        Parameters:
            size: the grid is size x size cells
            obstacles: list of blocked cells [(x1, y1), (x2, y2), ...]
            wind: probability that the intended action is overridden
        """
        self.size = size
        self.start_pos = (0, 0)
        self.goal_pos = (size - 1, size - 1)
        self.obstacles = set(obstacles) if obstacles else set()
        self.wind = wind

        # Displacement per action: 0=up, 1=down, 2=left, 3=right.
        self.action_effects = {
            0: (-1, 0),   # up
            1: (1, 0),    # down
            2: (0, -1),   # left
            3: (0, 1)     # right
        }

        self.reset()

    def reset(self):
        """Put the agent back on the start cell and clear the counters."""
        self.agent_pos = self.start_pos
        self.steps = 0
        self.total_reward = 0
        return self._get_state(), {}

    def _get_state(self):
        """State id of the agent's current cell."""
        return self._pos_to_state(self.agent_pos)

    def _pos_to_state(self, pos):
        """Row-major flattening: (row, col) -> state id."""
        row, col = pos
        return row * self.size + col

    def _state_to_pos(self, state):
        """Inverse of _pos_to_state: state id -> (row, col)."""
        return (state // self.size, state % self.size)

    def step(self, action):
        """
        Apply one action.

        Parameters:
            action: 0=up, 1=down, 2=left, 3=right

        Returns:
            next_state: successor state id
            reward: immediate reward
            terminated: whether the episode is over
            truncated: always False here
            info: diagnostics (steps, total_reward, position)
        """
        self.steps += 1

        # Wind: with probability `wind`, override with a random action.
        if random.random() < self.wind:
            action = random.randint(0, 3)

        # Move, clamped to the board edges.
        d_row, d_col = self.action_effects[action]
        target = (
            np.clip(self.agent_pos[0] + d_row, 0, self.size - 1),
            np.clip(self.agent_pos[1] + d_col, 0, self.size - 1),
        )

        # Obstacles send the agent back to the start, the goal pays out,
        # every other move incurs a small step penalty to encourage speed.
        if target in self.obstacles:
            reward = -1.0
            target = self.start_pos
        elif target == self.goal_pos:
            reward = 10.0
        else:
            reward = -0.01

        self.agent_pos = target
        self.total_reward += reward

        # Episode ends on reaching the goal or after 100 steps.
        done = (target == self.goal_pos) or (self.steps >= 100)

        info = {
            'steps': self.steps,
            'total_reward': self.total_reward,
            'position': self.agent_pos
        }

        return self._get_state(), reward, done, False, info

    def render(self):
        """Print an ASCII picture of the board plus episode statistics."""
        board = np.full((self.size, self.size), '.')

        # Blocked cells first, then the special cells drawn on top.
        for cell in self.obstacles:
            board[cell] = 'X'
        board[self.start_pos] = 'S'
        board[self.goal_pos] = 'G'
        board[self.agent_pos] = 'A'

        rows = [' '.join(row) for row in board]
        print('\n'.join(rows))
        print(f"Steps: {self.steps}, Total Reward: {self.total_reward:.2f}")
        print()

    def get_state_space(self):
        """Number of discrete states (one per cell)."""
        return self.size * self.size

    def get_action_space(self):
        """Number of discrete actions."""
        return 4

    def get_transition_prob(self, state, action, next_state):
        """
        P(next_state | state, action), for model-based methods.

        NOTE(review): with wind this is a deliberate simplification, not
        the exact distribution — the intended cell gets (1 - wind) plus
        its share of the random action, everything else gets wind / 4.
        """
        pos = self._state_to_pos(state)
        next_pos = self._state_to_pos(next_state)

        # Where the deterministic move would land.
        d_row, d_col = self.action_effects[action]
        expected_pos = (
            np.clip(pos[0] + d_row, 0, self.size - 1),
            np.clip(pos[1] + d_col, 0, self.size - 1)
        )

        if self.wind > 0:
            if next_pos == expected_pos:
                return 1 - self.wind + self.wind / 4
            else:
                return self.wind / 4
        else:
            return 1.0 if next_pos == expected_pos else 0.0

# Usage example
if __name__ == "__main__":
    # Build a 5x5 world with three obstacles and 10% random wind.
    env = GridWorld(size=5, obstacles=[(1, 1), (2, 2), (3, 3)], wind=0.1)

    # Roll out a purely random policy until the episode ends.
    state, _ = env.reset()
    env.render()

    finished = False
    while not finished:
        chosen = random.randint(0, 3)
        state, reward, finished, _, info = env.step(chosen)
        env.render()

3. 算法实现模板

3.1 值迭代模板

Python
def value_iteration(env, gamma=0.9, theta=1e-8, max_iter=1000):
    """
    Value iteration for a finite MDP.

    Parameters:
        env: environment exposing get_state_space(), get_action_space(),
            get_transition_prob(s, a, s_next) and get_reward(s, a, s_next)
        gamma: discount factor
        theta: convergence threshold on the largest per-sweep value change
        max_iter: maximum number of sweeps

    Returns:
        V: optimal state-value function, np.ndarray of shape [n_states]
        policy: greedy policy w.r.t. V, np.ndarray of ints
    """
    n_states = env.get_state_space()
    n_actions = env.get_action_space()

    # Initialize values to zero.
    V = np.zeros(n_states)

    def _action_values(s):
        """One-step lookahead: Q(s, a) for every action under the current V."""
        q_values = []
        for a in range(n_actions):
            q = 0
            for s_next in range(n_states):
                prob = env.get_transition_prob(s, a, s_next)
                if prob > 0:  # skip impossible transitions
                    reward = env.get_reward(s, a, s_next)
                    q += prob * (reward + gamma * V[s_next])
            q_values.append(q)
        return q_values

    # Sweep until the largest update falls below theta.
    for i in range(max_iter):
        delta = 0

        for s in range(n_states):
            v = V[s]
            q_values = _action_values(s)
            V[s] = max(q_values) if q_values else 0
            delta = max(delta, abs(v - V[s]))

        if delta < theta:
            print(f"值迭代在{i+1}次迭代后收敛")
            break

    # Extract the greedy policy from the converged values.
    policy = np.zeros(n_states, dtype=int)
    for s in range(n_states):
        q_values = _action_values(s)
        policy[s] = np.argmax(q_values) if q_values else 0

    return V, policy

3.2 Q-Learning模板

Python
class QLearningAgent:
    """Tabular Q-Learning agent with an epsilon-greedy behavior policy."""

    def __init__(self, n_states, n_actions, alpha=0.1, gamma=0.9, epsilon=0.1):
        # Problem dimensions.
        self.n_states = n_states
        self.n_actions = n_actions
        # Hyperparameters: learning rate, discount factor, exploration rate.
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        # Q-table, initialised to all zeros.
        self.Q = np.zeros((n_states, n_actions))

    def select_action(self, state):
        """Explore uniformly with probability epsilon, otherwise act greedily."""
        explore = random.random() < self.epsilon
        if explore:
            return random.randint(0, self.n_actions - 1)
        return np.argmax(self.Q[state])

    def update(self, state, action, reward, next_state, done):
        """One-step Q-learning backup toward the TD target."""
        # Terminal transitions bootstrap nothing.
        bootstrap = 0.0 if done else self.gamma * np.max(self.Q[next_state])
        target = reward + bootstrap
        td_error = target - self.Q[state, action]
        self.Q[state, action] += self.alpha * td_error

    def train(self, env, n_episodes=1000):
        """Run n_episodes of online interaction; returns per-episode returns."""
        rewards_history = []

        for episode in range(n_episodes):
            state, _ = env.reset()
            episode_return = 0
            finished = False

            while not finished:
                action = self.select_action(state)
                next_state, reward, terminated, truncated, _ = env.step(action)
                finished = terminated or truncated

                self.update(state, action, reward, next_state, finished)

                episode_return += reward
                state = next_state

            rewards_history.append(episode_return)

            # Exponentially decay exploration, floored at 0.01.
            self.epsilon = max(0.01, self.epsilon * 0.995)

            if (episode + 1) % 100 == 0:
                avg_reward = np.mean(rewards_history[-100:])
                print(f"Episode {episode + 1}, Avg Reward: {avg_reward:.2f}, Epsilon: {self.epsilon:.3f}")

        return rewards_history

4. 调试技巧

4.1 打印调试

Python
def debug_episode(env, agent, verbose=True):
    """Run a single episode, optionally printing a step-by-step trace.

    Parameters:
        env: environment exposing reset() and step(action)
        agent: policy object exposing select_action(state)
        verbose: when True, print every transition (plus Q-values when
            the agent exposes a Q table)

    Returns:
        (total_reward, step): cumulative reward and number of steps taken
    """
    cur, _ = env.reset()
    accumulated = 0
    finished = False
    count = 0

    if verbose:
        print("=" * 50)
        print("开始新的Episode")
        print("=" * 50)

    while not finished:
        act = agent.select_action(cur)
        nxt, rew, term, trunc, info = env.step(act)
        finished = term or trunc

        if verbose:
            print(f"\nStep {count}:")
            print(f"  State: {cur}")
            print(f"  Action: {act}")
            print(f"  Next State: {nxt}")
            print(f"  Reward: {rew}")
            print(f"  Done: {finished}")
            if hasattr(agent, 'Q'):
                print(f"  Q-values: {agent.Q[cur]}")

        accumulated += rew
        cur = nxt
        count += 1

        # Safety valve against never-terminating policies.
        if count > 1000:
            print("Warning: Episode too long, breaking...")
            break

    if verbose:
        print(f"\nEpisode finished after {count} steps")
        print(f"Total reward: {accumulated}")

    return accumulated, count

4.2 可视化训练过程

Python
import matplotlib.pyplot as plt

def plot_training_progress(rewards, window=100):
    """Plot raw episode rewards, their moving average, and a histogram."""
    plt.figure(figsize=(12, 5))

    # Left panel: the per-episode reward curve.
    plt.subplot(1, 2, 1)
    plt.plot(rewards, alpha=0.3, label='Raw')

    # Overlay a moving average once enough episodes exist.
    if len(rewards) >= window:
        kernel = np.ones(window) / window
        smoothed = np.convolve(rewards, kernel, mode='valid')
        plt.plot(range(window - 1, len(rewards)), smoothed, label=f'MA({window})')

    plt.xlabel('Episode')
    plt.ylabel('Total Reward')
    plt.title('Training Progress')
    plt.legend()
    plt.grid(True, alpha=0.3)

    # Right panel: distribution of episode rewards.
    plt.subplot(1, 2, 2)
    plt.hist(rewards, bins=50, edgecolor='black', alpha=0.7)
    plt.xlabel('Total Reward')
    plt.ylabel('Frequency')
    plt.title('Reward Distribution')
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

def visualize_value_function(V, env_size, title="Value Function"):
    """Plot the state-value function as an annotated heatmap.

    Uses matplotlib only: the previous version called sns.heatmap, but
    seaborn is never imported in this file, so it raised NameError.

    Parameters:
        V: flat value array of length env_size * env_size
        env_size: side length of the square grid
        title: figure title
    """
    V_matrix = V.reshape(env_size, env_size)  # reshape to a 2D grid

    plt.figure(figsize=(8, 6))
    image = plt.imshow(V_matrix, cmap='viridis')
    plt.colorbar(image)
    # Annotate every cell with its value, like sns.heatmap(annot=True).
    for i in range(env_size):
        for j in range(env_size):
            plt.text(j, i, f"{V_matrix[i, j]:.2f}",
                     ha='center', va='center', color='white')
    plt.title(title)
    plt.show()

def visualize_policy(policy, env_size, title="Policy"):
    """Draw a policy as a grid of direction arrows (0=↑ 1=↓ 2=← 3=→)."""
    arrows = {0: '↑', 1: '↓', 2: '←', 3: '→'}

    grid = policy.reshape(env_size, env_size)

    plt.figure(figsize=(8, 6))
    for row in range(env_size):
        for col in range(env_size):
            chosen = grid[row, col]
            plt.text(col + 0.5, row + 0.5, arrows[chosen],
                    ha='center', va='center', fontsize=20)

    plt.xlim(0, env_size)
    plt.ylim(env_size, 0)  # flip the y-axis so row 0 sits at the top
    plt.gca().set_aspect('equal')
    plt.title(title)
    plt.grid(True)
    plt.show()

4.3 常见错误检查清单

Python
def sanity_check(env, agent):
    """Smoke-test the basic env/agent contracts before a long training run.

    Raises AssertionError describing the first violated expectation.
    NOTE: these are developer-time checks; `assert` is stripped under
    `python -O`, so do not rely on them for runtime validation.
    """
    print("Running sanity checks...")

    # Environment: reset() must return (int_state, info_dict).
    state, _ = env.reset()
    assert isinstance(state, (int, np.integer)), f"State should be int, got {type(state)}"

    # Environment: step() must return the Gym-style 5-tuple.
    # (The previous version also computed an unused `done` local here.)
    next_state, reward, terminated, truncated, info = env.step(0)
    assert isinstance(next_state, (int, np.integer)), "Next state should be int"
    assert isinstance(reward, (int, float)), "Reward should be numeric"
    assert isinstance(terminated, bool), "Terminated should be bool"

    # Agent: Q-table dimensions must match the environment.
    if hasattr(agent, 'Q'):
        assert agent.Q.shape == (env.get_state_space(), env.get_action_space()), \
            "Q-table shape mismatch"

    # Agent: selected actions must lie inside the action space.
    action = agent.select_action(state)
    assert 0 <= action < env.get_action_space(), f"Invalid action: {action}"

    print("✓ All checks passed!")

5. 性能优化

5.1 向量化操作

Python
# Slow: Python loops over actions and successor states
q_values = []
for a in range(n_actions):
    q = sum([P[s, a, s_next] * (R[s, a] + gamma * V[s_next])
             for s_next in range(n_states)])
    q_values.append(q)

# Fast: one vectorized NumPy expression (broadcasts R[s] against V)
q_values = np.sum(P[s] * (R[s][:, None] + gamma * V[None, :]), axis=1)

5.2 使用Numba加速

Python
from numba import jit

@jit(nopython=True)
def fast_value_iteration(P, R, gamma, theta=1e-8, max_iter=1000):
    """Numba-compiled value iteration over explicit nested loops.

    P: (S, A, S) transition probabilities; R: (S, A) rewards.
    Returns the converged value function V of shape (S,).
    """
    n_states, n_actions, _ = P.shape
    V = np.zeros(n_states)

    for _ in range(max_iter):
        biggest_change = 0.0
        for s in range(n_states):
            previous = V[s]
            best = -np.inf
            for a in range(n_actions):
                total = 0.0
                for nxt in range(n_states):
                    if P[s, a, nxt] > 0:
                        total += P[s, a, nxt] * (R[s, a] + gamma * V[nxt])
                if total > best:
                    best = total
            V[s] = best
            biggest_change = max(biggest_change, abs(previous - V[s]))

        # Stop once a full sweep changes no state by more than theta.
        if biggest_change < theta:
            break

    return V

6. 代码规范

6.1 命名规范

Python
# Variable naming
n_states      # number of states
n_actions     # number of actions
state         # current state
next_state    # successor state
action        # action
reward        # reward
done          # episode-finished flag
gamma         # discount factor
alpha         # learning rate
epsilon       # exploration rate

V             # state-value function
Q             # action-value function
policy        # policy
pi            # policy (math symbol)

# Function naming
def select_action(state): ...
def update_q_value(state, action, target): ...
def compute_return(rewards): ...
def evaluate_policy(policy): ...

6.2 文档字符串规范

Python
def train_agent(env, agent, n_episodes, verbose=True):
    """
    Train a reinforcement-learning agent.

    Parameters:
        env: environment object; must implement reset() and step()
        agent: agent object; must implement select_action() and update()
        n_episodes: number of training episodes
        verbose: whether to print training progress

    Returns:
        rewards: list of total reward per episode

    Example:
        >>> env = GridWorld()
        >>> agent = QLearningAgent(25, 4)
        >>> rewards = train_agent(env, agent, 1000)
    """
    pass

7. 实用工具函数

Python
class RLUtils:
    """Grab-bag of small reinforcement-learning helper routines."""

    @staticmethod
    def epsilon_greedy(q_values, epsilon):
        """ε-greedy selection: random with probability ε, else greedy."""
        if random.random() < epsilon:
            return random.randint(0, len(q_values) - 1)
        else:
            return np.argmax(q_values)

    @staticmethod
    def softmax(x, temperature=1.0):
        """Sample an action index from a softmax (Boltzmann) distribution.

        Subtracting max(x) before exponentiating keeps exp() numerically
        stable without changing the resulting probabilities.
        """
        exp_x = np.exp((x - np.max(x)) / temperature)
        probs = exp_x / np.sum(exp_x)
        return np.random.choice(len(x), p=probs)

    @staticmethod
    def compute_discounted_return(rewards, gamma):
        """Return the list of discounted returns G_t for every time step.

        Builds the list back-to-front with append + reverse, which is
        O(n); the previous list.insert(0, ...) version was O(n^2).
        """
        G = 0
        returns = []
        for r in reversed(rewards):
            G = r + gamma * G
            returns.append(G)
        returns.reverse()
        return returns

    @staticmethod
    def moving_average(data, window):
        """Moving average via convolution; yields len(data)-window+1 points."""
        return np.convolve(data, np.ones(window)/window, mode='valid')

    @staticmethod
    def normalize_rewards(rewards):
        """Standardize rewards to zero mean / unit variance.

        Accepts any array-like; the previous version crashed on plain
        Python lists because `list - float` is undefined.
        """
        rewards = np.asarray(rewards, dtype=float)
        mean = np.mean(rewards)
        std = np.std(rewards) + 1e-8  # epsilon guards against zero variance
        return (rewards - mean) / std
掌握这些编程技巧后,你将能够更高效地实现和调试强化学习算法!