跳转至

01 - 值函数近似

学习时间: 3-4小时 重要性: ⭐⭐⭐⭐⭐ 从表格到函数的关键转变 前置知识: Q-Learning、梯度下降


🎯 学习目标

完成本章后,你将能够: - 理解为什么需要函数近似 - 掌握线性函数近似的方法 - 理解特征工程的重要性 - 实现基于梯度的值函数更新 - 分析函数近似的收敛性挑战


1. 为什么需要函数近似

1.1 表格方法的局限

状态空间爆炸: - 围棋:\(10^{170}\) 个状态 - 机器人控制:连续状态空间(无限) - Atari游戏:\(256^{84 \times 84}\) 像素状态

存储问题: - 表格需要 \(O(|S| \times |A|)\) 内存 - 大多数状态永远不会被访问 - 无法泛化到未见过状态

1.2 函数近似的优势

泛化能力: - 相似状态有相似的值 - 从未见过的状态也能估计 - 参数数量远小于状态数

连续状态空间: - 可以处理连续变量(位置、速度等) - 自然扩展到高维输入


2. 线性函数近似

2.1 基本形式

状态值函数

\[\hat{v}(s, \mathbf{w}) = \mathbf{w}^T \mathbf{x}(s) = \sum_{i=1}^{d} w_i x_i(s)\]

动作值函数

\[\hat{q}(s, a, \mathbf{w}) = \mathbf{w}^T \mathbf{x}(s, a)\]

其中: - \(\mathbf{w}\):权重向量(参数) - \(\mathbf{x}(s)\):状态s的特征向量 - \(d\):特征维度

2.2 特征工程

常见特征类型

Python
import numpy as np

# 1. 多项式特征
def polynomial_features(state, degree=2):
    """多项式特征"""
    x, y = state
    features = [1, x, y, x**2, x*y, y**2]
    return np.array(features)  # np.array创建NumPy数组

# 2. 径向基函数(RBF)
def rbf_features(state, centers, widths):
    """RBF特征"""
    features = []
    for c, w in zip(centers, widths):  # zip按位置配对
        dist = np.linalg.norm(np.array(state) - c)  # np.linalg线性代数运算
        features.append(np.exp(-dist**2 / (2 * w**2)))
    return np.array(features)

# 3. 瓷砖编码(Tile Coding)
class TileCoding:
    """瓷砖编码特征"""

    def __init__(self, ranges, num_tiles, num_tilings):
        self.ranges = ranges  # 各维度的范围
        self.num_tiles = num_tiles
        self.num_tilings = num_tilings
        self.d = num_tiles ** len(ranges) * num_tilings

    def get_features(self, state):
        """获取特征向量(稀疏)"""
        features = np.zeros(self.d)

        for tiling in range(self.num_tilings):
            # 计算偏移
            offset = tiling * 0.5 / self.num_tilings

            # 计算瓷砖索引
            indices = []
            for i, (s, (low, high)) in enumerate(zip(state, self.ranges)):  # enumerate同时获取索引和元素
                scaled = (s - low) / (high - low) + offset
                tile_idx = int(scaled * self.num_tiles)
                tile_idx = max(0, min(tile_idx, self.num_tiles - 1))
                indices.append(tile_idx)

            # 设置特征
            feature_idx = tiling * (self.num_tiles ** len(state))
            for i, idx in enumerate(indices):
                feature_idx += idx * (self.num_tiles ** i)

            features[feature_idx] = 1

        return features

2.3 梯度下降更新

目标:最小化均方误差

\[J(\mathbf{w}) = \mathbb{E}[(v_\pi(S) - \hat{v}(S, \mathbf{w}))^2]\]

梯度下降

\[\mathbf{w}_{t+1} = \mathbf{w}_t + \alpha [v_\pi(S_t) - \hat{v}(S_t, \mathbf{w}_t)] \nabla_\mathbf{w} \hat{v}(S_t, \mathbf{w}_t)\]

对于线性函数近似

\[\nabla_\mathbf{w} \hat{v}(s, \mathbf{w}) = \mathbf{x}(s)\]

因此:

\[\mathbf{w}_{t+1} = \mathbf{w}_t + \alpha [v_\pi(S_t) - \hat{v}(S_t, \mathbf{w}_t)] \mathbf{x}(S_t)\]

3. 基于梯度的TD学习

3.1 梯度下降TD(0)

半梯度TD(0)

\[\mathbf{w}_{t+1} = \mathbf{w}_t + \alpha [R_{t+1} + \gamma \hat{v}(S_{t+1}, \mathbf{w}_t) - \hat{v}(S_t, \mathbf{w}_t)] \nabla_\mathbf{w} \hat{v}(S_t, \mathbf{w}_t)\]

为什么叫"半梯度"? - 只对估计值 \(\hat{v}(S_t, \mathbf{w})\) 求梯度 - 不对目标值 \(R_{t+1} + \gamma \hat{v}(S_{t+1}, \mathbf{w})\) 求梯度 - 不是真正的梯度下降,但实践中有效

3.2 代码实现

Python
import numpy as np

class LinearTDAgent:
    """线性函数近似的TD(0)智能体"""

    def __init__(self, feature_dim, alpha=0.01, gamma=0.9):
        self.d = feature_dim
        self.alpha = alpha
        self.gamma = gamma
        self.w = np.zeros(feature_dim)  # 权重向量

    def value(self, features):
        """计算状态值"""
        return np.dot(self.w, features)  # np.dot矩阵/向量点乘

    def update(self, features, reward, next_features, done):
        """
        TD(0)更新

        参数:
            features: 当前状态特征
            reward: 即时奖励
            next_features: 下一个状态特征
            done: 是否终止
        """
        # 当前估计
        v_current = self.value(features)

        # TD目标
        if done:
            td_target = reward
        else:
            v_next = self.value(next_features)
            td_target = reward + self.gamma * v_next

        # TD误差
        td_error = td_target - v_current

        # 梯度下降更新
        self.w += self.alpha * td_error * features

    def train(self, env, feature_extractor, num_episodes=1000):
        """训练智能体"""
        history = []

        for episode in range(num_episodes):
            state, _ = env.reset()
            features = feature_extractor(state)

            total_reward = 0
            done = False

            while not done:
                # ε-贪婪策略(基于当前值函数)
                action = self.select_action(state, feature_extractor, env)

                next_state, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated
                next_features = feature_extractor(next_state)

                self.update(features, reward, next_features, done)

                total_reward += reward
                features = next_features

            history.append(total_reward)

            if (episode + 1) % 100 == 0:
                avg_reward = np.mean(history[-100:])
                print(f"Episode {episode + 1}, Avg Reward: {avg_reward:.2f}")

        return history

    def select_action(self, state, feature_extractor, env, epsilon=0.1):
        """ε-贪婪动作选择"""
        if np.random.random() < epsilon:
            return np.random.randint(env.get_action_space())

        # 评估所有动作
        q_values = []
        for action in range(env.get_action_space()):
            # 假设有动作特征,或只对状态值函数
            features = feature_extractor(state, action)
            q_values.append(self.value(features))

        return np.argmax(q_values)

3.3 梯度下降Q-Learning

Python
class LinearQLearning:
    """线性函数近似的Q-Learning"""

    def __init__(self, feature_dim, n_actions, alpha=0.01, gamma=0.9):
        self.d = feature_dim
        self.n_actions = n_actions
        self.alpha = alpha
        self.gamma = gamma
        # 每个动作一组权重
        self.w = np.zeros((n_actions, feature_dim))

    def q_value(self, features, action):
        """计算Q(s,a)"""
        return np.dot(self.w[action], features)

    def update(self, features, action, reward, next_features, done):
        """Q-Learning更新"""
        # 当前Q值
        q_current = self.q_value(features, action)

        # Q-Learning目标
        if done:
            q_target = reward
        else:
            # 最大Q值
            q_next_values = [self.q_value(next_features, a)
                           for a in range(self.n_actions)]
            q_target = reward + self.gamma * max(q_next_values)

        # TD误差
        td_error = q_target - q_current

        # 更新对应动作的权重
        self.w[action] += self.alpha * td_error * features

4. 收敛性分析

4.1 收敛性挑战

致命三元组(Deadly Triad): 1. 函数近似 2. 自举(Bootstrapping) 3. Off-Policy

当三者同时存在时,算法可能发散。

4.2 收敛条件

线性函数近似 + TD(0): - On-Policy:收敛(收敛到局部最优) - Off-Policy:可能发散

保证收敛的方法: - 梯度TD(GTD, GTD2, TDC) - 经验回放 - 使用On-Policy方法

4.3 稳定性技巧

Python
# 1. 经验回放
class ReplayBuffer:
    """经验回放缓冲区"""

    def __init__(self, capacity=10000):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        return zip(*batch)

# 2. 目标网络
class TargetNetwork:
    """目标网络(延迟更新)"""

    def __init__(self, main_network, update_freq=100):
        self.main = main_network
        self.target = copy.deepcopy(main_network)
        self.update_freq = update_freq
        self.step = 0

    def get_target(self, features):
        return self.target.value(features)

    def update(self):
        self.step += 1
        if self.step % self.update_freq == 0:
            self.target = copy.deepcopy(self.main)

5. 实践练习

练习1:实现线性函数近似的CartPole

Python
import gymnasium as gym

# 创建环境
env = gym.make('CartPole-v1')

# 定义特征提取器
def extract_features(observation, action=None):
    """提取多项式特征"""
    x, x_dot, theta, theta_dot = observation

    # 简单的多项式特征
    features = np.array([
        1.0,
        x, x_dot, theta, theta_dot,
        x**2, x_dot**2, theta**2, theta_dot**2,
        x * theta, x_dot * theta_dot
    ])

    return features

# 训练
agent = LinearQLearning(feature_dim=11, n_actions=2, alpha=0.001)
# ... 训练代码

练习2:比较不同特征的效果

Python
def compare_features(env, num_episodes=500):
    """比较不同特征表示的效果"""

    # 1. 原始特征
    def raw_features(state):
        return np.array(state)

    # 2. 多项式特征
    def poly_features(state):
        x, y = state[:2]  # 切片操作,取前n个元素
        return np.array([1, x, y, x**2, x*y, y**2])

    # 3. RBF特征
    centers = [np.array([i, j]) for i in range(5) for j in range(5)]
    def rbf_features(state):
        return rbf_features_impl(state, centers, widths=1.0)

    feature_extractors = [
        ('Raw', raw_features),
        ('Polynomial', poly_features),
        ('RBF', rbf_features)
    ]

    results = {}
    for name, extractor in feature_extractors:
        agent = LinearTDAgent(feature_dim=len(extractor(env.reset()[0])),
                            alpha=0.01)
        history = agent.train(env, extractor, num_episodes)
        results[name] = history

    # 绘制对比
    for name, history in results.items():
        plt.plot(history, label=name, alpha=0.7)
    plt.legend()
    plt.show()

6. 本章总结

核心概念

Text Only
函数近似:
├── 为什么: 状态空间太大,需要泛化
├── 线性近似: v̂(s,w) = w^T x(s)
├── 特征工程:
│   ├── 多项式特征
│   ├── RBF特征
│   └── 瓷砖编码
└── 梯度更新:
    ├── 半梯度TD(0)
    └── 梯度下降Q-Learning

收敛性:
├── 致命三元组: 函数近似 + 自举 + Off-Policy
├── 稳定性技巧:
│   ├── 经验回放
│   └── 目标网络
└── 保证收敛: 梯度TD方法

✅ 自测问题

  1. 为什么表格方法在大状态空间下不可行?函数近似如何解决?

  2. 什么是"半梯度"?为什么不对目标值求梯度?

  3. 致命三元组是什么?如何避免发散?

  4. 设计一个适合MountainCar环境的特征表示。


📚 延伸阅读

  1. Sutton & Barto (2018)
  2. 《Reinforcement Learning: An Introduction》第9章

  3. Boyan & Moore (1995)

  4. "Generalization in Reinforcement Learning"

准备好学习深度Q网络了吗?

→ 下一步:02-DQN详解.md