跳转至

06 - 分层强化学习与多目标强化学习

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

学习时间: 4-5小时 重要性: ⭐⭐⭐⭐ 解决复杂长程任务与多目标权衡 前置知识: MDP基础、DQN/PPO、策略梯度


🎯 学习目标

完成本章后,你将能够: - 理解分层强化学习(HRL)的Options框架 - 掌握Feudal NetworksHAM的核心思想 - 了解目标条件RL(Goal-Conditioned RL)与 HER - 掌握多目标RL的帕累托优化方法 - 理解自博弈(Self-Play)的原理与应用


Part I: 分层强化学习(Hierarchical RL)

1. 为什么需要分层?

1.1 问题:长程稀疏奖励

标准RL在面对长期规划任务时效率极低:

Text Only
做一顿饭的RL分解:
├── 底层动作空间(每个时间步)
│   ├── 左手移动(-10cm, 0, 5cm)
│   ├── 右手旋转(5°, -3°, 0°)
│   ├── 手指施力(2N)
│   └── ... 连续高维动作
├── 总步数: ~10000步
├── 奖励: 只在做完饭时 +1
└── 问题: 在10000步中随机探索几乎不可能找到正确序列

分层分解:
├── 高层: 选择子任务("打开冰箱" → "拿出材料" → "切菜" → "炒菜")
├── 中层: 选择子目标("手移到冰箱把手" → "抓住" → "拉开")
└── 底层: 生成关节动作
每一层的决策空间和时间尺度更小 → 更容易学习

1.2 分层RL的核心优势

优势 说明
时间抽象 高层策略以更长的时间尺度决策
状态抽象 高层只关注任务相关的宏观特征
可复用性 底层技能可跨任务复用
探索效率 在子目标空间而非原始动作空间探索

2. Options框架

2.1 定义

Sutton, Precup, Singh (1999) 提出的Options框架是分层RL的理论基础。

一个Option \(\omega = (I_\omega, \pi_\omega, \beta_\omega)\) 由三部分组成: - \(I_\omega \subseteq S\)初始化集合(在什么状态下可以启动这个option) - \(\pi_\omega: S \times A \to [0,1]\)内部策略(option执行什么动作) - \(\beta_\omega: S \to [0,1]\)终止条件(option什么时候结束)

直觉:一个Option就是一个"技能"。比如"走到门口"是一个option,它有自己的策略(怎么走),有触发条件(我离门不太远时可用),有终止条件(到门口了就结束)。

2.2 Semi-MDP (SMDP)

引入Options后,高层策略在时间扩展的框架中决策:

\[Q_\Omega(s, \omega) = \mathbb{E}\left[ r_1 + \gamma r_2 + \cdots + \gamma^{k-1} r_k + \gamma^k \max_{\omega'} Q_\Omega(s', \omega') \right]\]

其中 \(k\) 是option执行的持续时间。

2.3 代码实现

Python
import numpy as np
from typing import Callable, Optional, Tuple
from dataclasses import dataclass

@dataclass  # @dataclass自动生成__init__等方法
class Option:
    """
    Options框架中的一个Option(技能/子策略)

    参考: Sutton, Precup, Singh (1999)
    "Between MDPs and Semi-MDPs: A Framework for Temporal Abstraction in RL"
    """
    name: str                                      # option名称
    initiation_set: Callable[[np.ndarray], bool]  # 初始化条件 I(s) → bool
    policy: Callable[[np.ndarray], int]           # 内部策略 π(s) → a
    termination: Callable[[np.ndarray], float]    # 终止概率 β(s) → [0,1]

class OptionsFramework:
    """
    Options框架的SMDP Q-Learning实现

    高层策略: 选择Option(使用Q-Learning over options)
    底层策略: 每个Option的内部策略
    """

    def __init__(self, n_states: int, options: list, gamma: float = 0.99, lr: float = 0.1):
        """
        参数:
            n_states: 状态数量(离散状态空间)
            options: Option列表
            gamma: 折扣因子
            lr: Q值学习率
        """
        self.options = options
        self.n_options = len(options)
        self.gamma = gamma
        self.lr = lr

        # 高层Q值: Q(s, option)
        self.Q = np.zeros((n_states, self.n_options))

    def select_option(self, state: int, epsilon: float = 0.1) -> int:
        """
        高层策略: ε-greedy选择option
        只能从当前状态可用的options中选择
        """
        # 筛选可用options
        available = [
            i for i, opt in enumerate(self.options)  # enumerate同时获取索引和元素
            if opt.initiation_set(state)
        ]

        if not available:
            raise ValueError(f"状态 {state} 没有可用option")

        if np.random.random() < epsilon:
            return np.random.choice(available)

        # 在可用options中选Q值最大的
        q_vals = [(i, self.Q[state, i]) for i in available]
        return max(q_vals, key=lambda x: x[1])[0]  # lambda匿名函数

    def execute_option(self, env, state: int, option_idx: int) -> Tuple[int, float, int]:
        """
        执行一个option直到终止

        返回:
            next_state: option终止时的状态
            total_reward: 累积折扣奖励
            steps: 执行的步数
        """
        option = self.options[option_idx]
        total_reward = 0.0
        steps = 0
        current_state = state

        while True:
            # 使用option的内部策略选择动作
            action = option.policy(current_state)
            next_state, reward, done, _ = env.step(action)

            total_reward += (self.gamma ** steps) * reward
            steps += 1
            current_state = next_state

            # 检查终止条件
            if done or np.random.random() < option.termination(current_state):
                break

        return current_state, total_reward, steps

    def update_q(self, state: int, option_idx: int, reward: float,
                 next_state: int, steps: int):
        """
        SMDP Q-Learning更新

        Q(s, ω) ← Q(s, ω) + α[R + γ^k max_ω' Q(s', ω') - Q(s, ω)]
        """
        best_next = np.max(self.Q[next_state])
        target = reward + (self.gamma ** steps) * best_next
        self.Q[state, option_idx] += self.lr * (target - self.Q[state, option_idx])

# === 使用示例 ===
def create_navigation_options():
    """创建导航任务的options"""

    # Option 1: 向右走到边界
    go_right = Option(
        name="向右走",
        initiation_set=lambda s: s % 10 < 9,       # 不在右边界时可用
        policy=lambda s: 1,                          # 动作1=向右
        termination=lambda s: 1.0 if s % 10 == 9 else 0.0  # 到右边界就终止
    )

    # Option 2: 向下走到边界
    go_down = Option(
        name="向下走",
        initiation_set=lambda s: s // 10 < 9,       # 不在下边界时可用
        policy=lambda s: 2,                          # 动作2=向下
        termination=lambda s: 1.0 if s // 10 == 9 else 0.0  # 到下边界就终止
    )

    # Option 3: 走到目标区域
    go_to_goal = Option(
        name="走到目标",
        initiation_set=lambda s: True,               # 任何状态可用
        policy=lambda s: 1 if s % 10 < 9 else 2,    # 先右再下
        termination=lambda s: 1.0 if s == 99 else 0.1  # 接近目标时有概率终止
    )

    return [go_right, go_down, go_to_goal]

3. Option-Critic架构

3.1 端到端学习Options

Bacon, Harb, Precup (2017) 提出了Option-Critic,可以端到端学习options的策略和终止条件(而非手动设计)。

Text Only
Option-Critic 架构:
┌─────────────────────────────────────┐
│ 高层: 策略 over Options (π_Ω)      │
│   选择哪个option来执行              │
├─────────────────────────────────────┤
│ 中间: Option内部策略 (π_ω)          │
│   每个option里如何选择动作           │
├─────────────────────────────────────┤
│ 终止: 终止函数 (β_ω)               │
│   option什么时候结束                │
└─────────────────────────────────────┘
三个组件都通过梯度下降端到端学习

3.2 代码实现

Python
import torch
import torch.nn as nn
import torch.nn.functional as F

class OptionCritic(nn.Module):  # 继承nn.Module定义网络层
    """
    Option-Critic 架构

    端到端学习:
    1. 高层策略: 选择option(基于Q_Ω)
    2. option内部策略: 产生动作
    3. 终止函数: 决定option何时结束

    参考: Bacon et al., "The Option-Critic Architecture" (AAAI 2017)
    """

    def __init__(self, state_dim: int, n_actions: int, n_options: int = 4):
        """
        参数:
            state_dim: 状态向量维度
            n_actions: 原子动作数量
            n_options: option数量
        """
        super().__init__()  # super()调用父类方法
        self.n_options = n_options
        self.n_actions = n_actions

        # 共享特征提取器
        self.features = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU()
        )

        # Q_Ω(s, ω): 高层Q值(选择option)
        self.q_omega = nn.Linear(128, n_options)

        # π_ω(a|s): 每个option的内部策略
        # n_options个独立的动作分布
        self.option_policies = nn.ModuleList([
            nn.Linear(128, n_actions) for _ in range(n_options)
        ])

        # β_ω(s): 每个option的终止概率
        self.terminations = nn.Linear(128, n_options)

    def get_features(self, state: torch.Tensor) -> torch.Tensor:
        """提取共享特征"""
        return self.features(state)

    def get_q_omega(self, state: torch.Tensor) -> torch.Tensor:
        """计算所有option的Q值"""
        feat = self.get_features(state)
        return self.q_omega(feat)  # (batch, n_options)

    def get_option_policy(self, state: torch.Tensor, option: int) -> torch.Tensor:
        """获取某个option的动作概率分布"""
        feat = self.get_features(state)
        logits = self.option_policies[option](feat)
        return F.softmax(logits, dim=-1)   # (batch, n_actions)  # F.xxx PyTorch函数式API

    def get_termination(self, state: torch.Tensor, option: int) -> torch.Tensor:
        """获取某个option的终止概率"""
        feat = self.get_features(state)
        term_probs = torch.sigmoid(self.terminations(feat))  # (batch, n_options)
        return term_probs[:, option]  # (batch,)

    def select_option(self, state: torch.Tensor, epsilon: float = 0.1) -> int:
        """ε-greedy选择option"""
        if torch.rand(1).item() < epsilon:  # 将单元素张量转为Python数值
            return torch.randint(self.n_options, (1,)).item()
        q_values = self.get_q_omega(state)
        return q_values.argmax(dim=-1).item()

    def select_action(self, state: torch.Tensor, option: int) -> int:
        """根据当前option的策略选择动作"""
        action_probs = self.get_option_policy(state, option)
        return torch.multinomial(action_probs, 1).item()

class OptionCriticTrainer:
    """Option-Critic训练器"""

    def __init__(self, model: OptionCritic, lr: float = 1e-3, gamma: float = 0.99,
                 termination_reg: float = 0.01):
        """
        参数:
            model: OptionCritic模型
            lr: 学习率
            gamma: 折扣因子
            termination_reg: 终止正则化系数
                (鼓励options不要太快终止,促进时间抽象)
        """
        self.model = model
        self.gamma = gamma
        self.termination_reg = termination_reg
        self.optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    def update(self, state, option, action, reward, next_state, done):
        """
        一步更新

        三个梯度同时更新:
        1. Q_Ω: 高层Q值 (类似DQN)
        2. π_ω: option策略 (策略梯度)
        3. β_ω: 终止函数 (优势函数驱动)
        """
        state_t = torch.FloatTensor(state).unsqueeze(0)  # unsqueeze增加一个维度
        next_state_t = torch.FloatTensor(next_state).unsqueeze(0)

        # --- Q_Ω 更新 ---
        q_omega = self.model.get_q_omega(state_t)[0, option]

        with torch.no_grad():  # 禁用梯度计算,节省内存
            # 下一状态的option选择考虑终止
            next_q = self.model.get_q_omega(next_state_t)
            beta_next = self.model.get_termination(next_state_t, option)
            # 如果终止→选新option(max Q),否则继续当前option
            target = reward + self.gamma * (1 - done) * (
                beta_next * next_q.max() + (1 - beta_next) * next_q[0, option]
            )

        q_loss = F.mse_loss(q_omega, target)

        # --- π_ω 策略梯度 ---
        action_probs = self.model.get_option_policy(state_t, option)
        log_prob = torch.log(action_probs[0, action] + 1e-8)

        with torch.no_grad():
            advantage = reward + self.gamma * (1 - done) * next_q.max() - q_omega

        policy_loss = -log_prob * advantage.detach()  # 分离计算图,不参与梯度计算

        # --- β_ω 终止梯度 ---
        beta = self.model.get_termination(state_t, option)
        with torch.no_grad():
            # 终止的优势: 终止后选最优option vs 继续当前option
            term_advantage = next_q.max() - next_q[0, option]

        # 只有当终止有优势时才鼓励终止 + 正则化鼓励不要过早终止
        termination_loss = beta * (term_advantage - self.termination_reg)

        # 总损失
        total_loss = q_loss + policy_loss + termination_loss

        self.optimizer.zero_grad()  # 清零梯度
        total_loss.backward()  # 反向传播计算梯度
        self.optimizer.step()  # 更新参数

        return {
            'q_loss': q_loss.item(),
            'policy_loss': policy_loss.item(),
            'termination_loss': termination_loss.item()
        }

4. 目标条件强化学习(Goal-Conditioned RL)

4.1 核心思想

在标准RL中,目标是固定的。Goal-Conditioned RL 将目标 \(g\) 作为额外输入,使策略可以泛化到不同目标:

\[\pi(a|s, g) \quad \text{—— 策略同时依赖状态和目标}\]

4.2 Hindsight Experience Replay (HER)

Andrychowicz et al. (OpenAI, 2017) 提出的HER是Goal-Conditioned RL的关键突破。

核心洞察:即使智能体没能到达预定目标,它到达的位置本身也可以作为一个"事后目标"来学习。

Text Only
HER的直觉:
┌────────────────────────────────────────┐
│ 原始经验:                              │
│   目标: 到达A点                        │
│   实际: 到达了B点                      │
│   奖励: -1 (失败)                      │
│                                        │
│ HER重新标记:                           │
│   目标: 到达B点 (把实际到达的作为目标)  │
│   实际: 到达了B点                      │
│   奖励: 0 (成功!)                      │
│                                        │
│ → 从"失败"中也能学到有用的信息         │
└────────────────────────────────────────┘

4.3 代码实现

Python
import numpy as np
from collections import deque
from typing import Dict, List, Tuple

class HindsightExperienceReplay:
    """
    Hindsight Experience Replay (HER)

    将失败的轨迹重新标记为成功经验,
    解决稀疏奖励+目标条件RL的样本效率问题。

    参考: Andrychowicz et al., "Hindsight Experience Replay" (2017)
    """

    def __init__(
        self,
        buffer_size: int = 100000,
        n_sampled_goal: int = 4,
        strategy: str = 'future'
    ):
        """
        参数:
            buffer_size: 经验池大小
            n_sampled_goal: 每个transition重新标记的目标数
            strategy: HER目标采样策略
                - 'future': 从同轨迹的未来状态中采样(最常用)
                - 'final': 使用轨迹最后到达的状态
                - 'random': 随机采样
                - 'episode': 从同一episode中随机采样
        """
        self.buffer = deque(maxlen=buffer_size)
        self.n_sampled_goal = n_sampled_goal
        self.strategy = strategy

    def store_episode(self, episode: List[Dict]):
        """
        存储一个episode并进行HER重新标记

        参数:
            episode: transition列表,每个包含:
                {state, action, reward, next_state, goal, achieved_goal, done}
        """
        # 1. 存储原始经验
        for transition in episode:
            self.buffer.append(transition)

        # 2. HER重标记: 为每个transition生成额外的成功经验
        for idx, transition in enumerate(episode):
            sampled_goals = self._sample_goals(episode, idx)

            for new_goal in sampled_goals:
                # 用新目标重新计算奖励
                new_reward = self._compute_reward(
                    transition['achieved_goal'], new_goal
                )

                # 创建重标记的transition
                her_transition = {
                    'state': transition['state'],
                    'action': transition['action'],
                    'reward': new_reward,
                    'next_state': transition['next_state'],
                    'goal': new_goal,
                    'achieved_goal': transition['achieved_goal'],
                    'done': new_reward == 0  # 达到新目标=成功
                }
                self.buffer.append(her_transition)

    def _sample_goals(self, episode: List[Dict], current_idx: int) -> List[np.ndarray]:
        """根据策略采样替代目标"""
        goals = []

        for _ in range(self.n_sampled_goal):
            if self.strategy == 'future':
                # 从当前时间步之后的状态中采样(最有效的策略)
                if current_idx < len(episode) - 1:
                    future_idx = np.random.randint(current_idx + 1, len(episode))
                    goals.append(episode[future_idx]['achieved_goal'])
            elif self.strategy == 'final':
                # 使用episode最终到达的状态
                goals.append(episode[-1]['achieved_goal'])  # [-1]负索引取最后元素
            elif self.strategy == 'episode':
                # 从同episode中随机采样
                rand_idx = np.random.randint(0, len(episode))
                goals.append(episode[rand_idx]['achieved_goal'])

        return goals

    def _compute_reward(self, achieved_goal: np.ndarray, desired_goal: np.ndarray,
                        threshold: float = 0.05) -> float:
        """
        稀疏奖励函数:到达目标 → 0,否则 → -1
        """
        dist = np.linalg.norm(achieved_goal - desired_goal)  # np.linalg线性代数运算
        return 0.0 if dist < threshold else -1.0

    def sample(self, batch_size: int = 256) -> Dict:
        """从buffer中随机采样一个batch"""
        indices = np.random.randint(0, len(self.buffer), size=batch_size)
        batch = [self.buffer[i] for i in indices]

        return {
            'states': np.array([t['state'] for t in batch]),  # np.array创建NumPy数组
            'actions': np.array([t['action'] for t in batch]),
            'rewards': np.array([t['reward'] for t in batch]),
            'next_states': np.array([t['next_state'] for t in batch]),
            'goals': np.array([t['goal'] for t in batch]),
            'dones': np.array([t['done'] for t in batch])
        }

Part II: 多目标强化学习(Multi-Objective RL)

5. 多目标优化基础

5.1 问题定义

在多目标RL中,有多个奖励函数需要同时优化:

\[\max_\pi \mathbf{J}(\pi) = \left( J_1(\pi), J_2(\pi), \ldots, J_m(\pi) \right)\]

帕累托最优:如果没有其他策略在所有目标上都不差且至少一个目标更好,则称 \(\pi\) 是帕累托最优的。

Text Only
双目标示例: 自动驾驶
├── 目标1: 最短到达时间 (快)
├── 目标2: 最少碰撞风险 (安全)
└── 帕累托前沿:

    安全 ↑
    │    ●  非常安全但很慢
    │   ● ●
    │  ●   ● ← 帕累托前沿
    │ ●     ●    (所有帕累托最优策略)
    │●       ●
    │         ● 很快但不太安全
    └──────────── →  速度

    前沿上的每个点都是"不可能在不牺牲一个目标的情况下改进另一个目标"

5.2 方法分类

方法 描述 优势 劣势
线性加权 \(R = w_1 R_1 + w_2 R_2\) 简单 只能找到凸前沿
约束法 优化一个目标,约束其余 可找非凸前沿 需设阈值
帕累托法 同时优化所有目标 找到整个前沿 计算昂贵
偏好条件 策略以偏好为额外输入 一个模型适应所有偏好 训练复杂

5.3 线性加权方法

Python
import numpy as np
import torch
import torch.nn as nn

class LinearScalarization:
    """
    线性加权法: 最简单的多目标RL方法

    将多个目标的标量化: R = Σ w_i × R_i
    通过改变权重w遍历帕累托前沿

    局限: 只能找到帕累托前沿的凸包部分
    """

    def __init__(self, n_objectives: int):
        self.n_objectives = n_objectives

    def scalarize(self, rewards: np.ndarray, weights: np.ndarray) -> float:
        """
        将多目标奖励向量标量化

        参数:
            rewards: 多目标奖励向量 (n_objectives,)
            weights: 权重向量 (n_objectives,), sum=1
        """
        assert len(rewards) == len(weights) == self.n_objectives  # assert断言
        assert abs(sum(weights) - 1.0) < 1e-6, "权重必须归一化"
        return np.dot(rewards, weights)  # np.dot矩阵/向量点乘

    def sweep_pareto_front(self, n_points: int = 11):
        """
        通过遍历不同权重组合来近似帕累托前沿 (2目标)
        """
        if self.n_objectives != 2:
            raise ValueError("帕累托前沿遍历只支持2目标")

        weight_sets = []
        for i in range(n_points):
            w1 = i / (n_points - 1)
            w2 = 1 - w1
            weight_sets.append(np.array([w1, w2]))

        return weight_sets  # 每组权重训练一个策略

5.4 帕累托条件策略网络

Python
class ParetoConditionedPolicy(nn.Module):
    """
    帕累托条件策略网络

    将偏好权重作为额外输入,使一个策略网络
    能够在不同权重下表现出不同的帕累托最优行为。

    训练时随机采样权重 → 模型学会适应不同偏好
    推理时指定权重 → 输出对应的策略

    参考: Abels et al., "Dynamic Weights in Multi-Objective DRL" (ICML 2019)
    """

    def __init__(self, state_dim: int, action_dim: int, n_objectives: int):
        super().__init__()

        # 输入 = state + preference weights
        input_dim = state_dim + n_objectives

        self.network = nn.Sequential(
            nn.Linear(input_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, action_dim)
        )

        self.n_objectives = n_objectives

    def forward(self, state: torch.Tensor, preference: torch.Tensor) -> torch.Tensor:
        """
        根据状态和偏好权重输出动作

        参数:
            state: (batch, state_dim)
            preference: (batch, n_objectives),权重向量
        """
        # 拼接状态和偏好
        x = torch.cat([state, preference], dim=-1)  # torch.cat沿已有维度拼接张量
        return self.network(x)

    def train_step(self, batch, optimizer):
        """
        训练一步: 随机采样偏好权重
        """
        states = batch['states']
        batch_size = states.shape[0]

        # 随机采样偏好权重(Dirichlet分布保证和为1)
        preferences = torch.distributions.Dirichlet(
            torch.ones(self.n_objectives)
        ).sample((batch_size,))

        # 计算标量化的奖励
        multi_rewards = batch['multi_rewards']  # (batch, n_objectives)
        scalar_rewards = (multi_rewards * preferences).sum(dim=-1)

        # 正常的策略梯度/DQN更新...
        actions = self.forward(states, preferences)
        loss = -scalar_rewards.mean()  # 简化

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        return loss.item()

6. 自博弈(Self-Play)

6.1 核心思想

Self-Play 是让智能体与自身的历史版本或当前版本对弈来提升能力。

经典成功案例: - AlphaGo / AlphaZero: 围棋、象棋、将棋 - OpenAI Five: Dota 2 - Cicero: 外交策略游戏

6.2 分类

Text Only
Self-Play方法:
├── 朴素自博弈(Naive Self-Play)
│   └── 永远与最新版本对弈
│       缺点: 可能不稳定(策略循环)
├── 虚拟自博弈(Fictitious Self-Play)
│   └── 对手是历史策略的均匀混合
│       优势: 收敛到纳什均衡
├── 人口自博弈(Population-Based Self-Play)
│   └── 维护一个策略群体,多样化对手
│       优势: 避免过拟合特定对手
└── PFSP(优先虚拟自博弈)
    └── 根据胜率优先选择弱势对手
        优势: 加速训练,减少灾难性遗忘

6.3 代码框架

Python
from collections import deque
import numpy as np
import copy

class SelfPlayTrainer:
    """
    Self-Play训练框架

    维护一个历史策略池,智能体与历史版本对弈提升
    """

    def __init__(self, agent, pool_size: int = 20, save_interval: int = 1000):
        """
        参数:
            agent: RL智能体
            pool_size: 历史策略池大小
            save_interval: 每多少步保存一次快照
        """
        self.agent = agent
        self.opponent_pool = deque(maxlen=pool_size)
        self.save_interval = save_interval
        self.step_count = 0

        # 保存初始版本
        self.opponent_pool.append(copy.deepcopy(agent))

    def select_opponent(self, strategy: str = 'pfsp') -> object:
        """
        选择对手

        参数:
            strategy: 对手选择策略
                - 'latest': 最新版本
                - 'uniform': 均匀随机
                - 'pfsp': 优先选择弱势对手
        """
        if strategy == 'latest':
            return self.opponent_pool[-1]
        elif strategy == 'uniform':
            return np.random.choice(list(self.opponent_pool))
        elif strategy == 'pfsp':
            return self._pfsp_select()

    def _pfsp_select(self) -> object:
        """
        优先虚拟自博弈(PFSP)选择
        优先选择当前agent胜率较低的对手
        → 在薄弱环节上更多训练
        """
        if len(self.opponent_pool) <= 1:
            return self.opponent_pool[0]

        # 计算对每个历史对手的胜率
        # 胜率低的对手权重更高(优先挑战弱点)
        win_rates = np.array([
            self._estimate_win_rate(opponent)
            for opponent in self.opponent_pool
        ])

        # 转化为采样权重(胜率越低 → 权重越高)
        weights = (1 - win_rates) ** 2  # 二次权重,强调弱势匹配
        weights = weights / weights.sum()

        idx = np.random.choice(len(self.opponent_pool), p=weights)
        return self.opponent_pool[idx]

    def _estimate_win_rate(self, opponent, n_games: int = 10) -> float:
        """估计对某个对手的胜率(简化版)"""
        # 实际中会维护一个胜率统计表
        return 0.5  # 简化

    def train_step(self):
        """一步训练"""
        # 选择对手
        opponent = self.select_opponent('pfsp')

        # 对弈并训练(具体实现取决于环境)
        # result = play_game(self.agent, opponent)
        # self.agent.learn(result)

        self.step_count += 1

        # 定期保存快照
        if self.step_count % self.save_interval == 0:
            snapshot = copy.deepcopy(self.agent)
            self.opponent_pool.append(snapshot)

7. 面试要点

7.1 高频问题

  1. 什么是Options框架?它如何实现时间抽象?
  2. Option = (初始化集合, 内部策略, 终止条件)
  3. 高层策略选择option,底层策略执行→时间尺度分离

  4. Option-Critic相比手工设计Options的优势?

  5. 端到端学习,无需人工设计,可自动发现有意义的子目标和技能

  6. HER如何解决稀疏奖励问题?

  7. 将失败经验重新标记为"事后成功"→大幅增加正样本

  8. 帕累托最优策略一定是线性加权能找到的吗?

  9. 不一定,线性加权只能找到帕累托前沿的凸包部分

  10. Self-Play为什么有效?PFSP相比Naive Self-Play的优势?

  11. Self-Play将自身作为不断提升的对手→ 自举式进步
  12. PFSP优先挑战弱点→更均衡→避免策略循环

📌 关键要点总结

  1. 分层RL通过Options框架实现时间抽象,解决长程规划问题
  2. Option-Critic可以端到端学习option的策略和终止条件
  3. HER通过事后重标记将失败转化为学习信号
  4. 多目标RL需要考虑帕累托最优,线性加权只是最简单的方法
  5. Self-Play是竞争性环境中的强大训练方法,PFSP是工程实践中的常用策略

📚 延伸阅读

  • PPO算法
  • 模仿学习
  • 安全强化学习
  • 论文: Sutton et al., "Between MDPs and Semi-MDPs" (1999)
  • 论文: Bacon et al., "The Option-Critic Architecture" (2017)
  • 论文: Andrychowicz et al., "Hindsight Experience Replay" (2017)