
05 - Frontiers of Model-Based Methods

Study time: 4-5 hours | Importance: ⭐⭐⭐⭐⭐ | Advanced methods combining models with planning | Prerequisites: model-based methods, MCTS


🎯 Learning Objectives

After completing this chapter, you will be able to:

- Understand the core ideas behind MuZero
- Describe Dreamer's world-model approach
- Apply model predictive control (MPC)
- Use model-based methods to solve complex tasks


1. Introduction to Model-Based Methods

1.1 Why Model-Based Methods?

Limitations of model-free methods:

- Low sample efficiency
- Difficulty with long-horizon planning
- No way to exploit the structure of the environment

Advantages of model-based methods:

- High sample efficiency
- Ability to plan ahead
- Better generalization

1.2 A Brief History

Text Only
Evolution of model-based methods:
├── Dyna-Q (1990)
├── PILCO (2011)
├── World Models (2018)
├── MuZero (2019)
├── Dreamer (2020)
└── DreamerV3 (2023)

2. MuZero

2.1 Core Idea

Learning an implicit model:

- Does not learn the environment model directly
- Learns implicit representations of value, policy, and reward
- Uses MCTS for planning

Three neural networks:

1. Representation network: maps observations to hidden states
2. Dynamics network: predicts the next hidden state and the reward
3. Prediction network: predicts value and policy

2.2 Algorithm Flow

Python
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np

class MuZeroNetwork(nn.Module):  # subclass nn.Module to define the network
    """MuZero networks"""

    def __init__(self, obs_shape, action_dim, hidden_dim=256, num_blocks=16):
        super(MuZeroNetwork, self).__init__()

        self.action_dim = action_dim
        self.hidden_dim = hidden_dim

        # Representation network: observation -> hidden state
        self.representation = nn.Sequential(
            nn.Conv2d(obs_shape[0], 32, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(64 * (obs_shape[1] // 4) * (obs_shape[2] // 4), hidden_dim),
            nn.ReLU()
        )

        # Dynamics network: (state, action) -> (next state, reward)
        self.dynamics = nn.Sequential(
            nn.Linear(hidden_dim + action_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU()
        )
        self.reward_head = nn.Linear(hidden_dim, 1)

        # Prediction network: state -> (value, policy)
        self.prediction = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU()
        )
        self.value_head = nn.Linear(hidden_dim, 1)
        self.policy_head = nn.Linear(hidden_dim, action_dim)

    def represent(self, observation):
        """表示网络"""
        return self.representation(observation)

    def dynamics_step(self, hidden_state, action):
        """动力学网络"""
        action_onehot = torch.zeros(action.size(0), self.action_dim)
        action_onehot.scatter_(1, action.unsqueeze(1), 1.0)  # unsqueeze增加一个维度

        x = torch.cat([hidden_state, action_onehot], dim=-1)  # concatenate along the feature dimension
        next_hidden = self.dynamics(x)
        reward = self.reward_head(next_hidden)

        return next_hidden, reward

    def predict(self, hidden_state):
        """预测网络"""
        features = self.prediction(hidden_state)
        value = self.value_head(features)
        policy_logits = self.policy_head(features)

        return value, policy_logits

class MuZero:
    """MuZero算法"""

    def __init__(self, obs_shape, action_dim, lr=1e-3, gamma=0.997):
        self.network = MuZeroNetwork(obs_shape, action_dim)
        self.optimizer = optim.Adam(self.network.parameters(), lr=lr)
        self.gamma = gamma
        self.action_dim = action_dim

    def mcts(self, root_state, num_simulations=800):
        """
        蒙特卡洛树搜索

        参数:
            root_state: 根节点隐藏状态
            num_simulations: 模拟次数

        返回:
            动作概率分布
        """
        # 简化的MCTS实现
        # 实际实现需要维护树结构

        with torch.no_grad():  # disable gradient tracking to save memory
            root_value, root_policy = self.network.predict(root_state)

            # use the policy network's output as the action probabilities
            action_probs = torch.softmax(root_policy, dim=-1)

        return action_probs

    def train_step(self, observations, actions, rewards, values, policies):
        """
        训练一步

        参数:
            observations: 观察序列
            actions: 动作序列
            rewards: 奖励序列
            values: 目标价值
            policies: 目标策略
        """
        # 表示网络
        hidden_state = self.network.represent(observations)

        # unroll the model for k steps
        k = len(actions)
        pred_values = []
        pred_policies = []
        pred_rewards = []

        for t in range(k):
            # prediction network
            value, policy_logits = self.network.predict(hidden_state)
            pred_values.append(value)
            pred_policies.append(policy_logits)

            # dynamics network
            hidden_state, reward = self.network.dynamics_step(hidden_state, actions[t])
            pred_rewards.append(reward)

        # compute the k-step losses
        value_loss = sum([F.mse_loss(pred_values[i], values[i]) for i in range(k)])  # F.* is PyTorch's functional API
        policy_loss = sum([F.cross_entropy(pred_policies[i], policies[i]) for i in range(k)])  # cross-entropy against target policies
        reward_loss = sum([F.mse_loss(pred_rewards[i], rewards[i]) for i in range(k)])

        total_loss = value_loss + policy_loss + reward_loss

        self.optimizer.zero_grad()  # clear accumulated gradients
        total_loss.backward()  # backpropagate
        self.optimizer.step()  # update parameters

        return total_loss.item()  # convert the one-element tensor to a Python float
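The `mcts` method above is only a placeholder that returns the prior policy. A real implementation maintains a search tree and backs values up along the visited paths. Below is a minimal, self-contained sketch of that tree search with the pUCT selection rule used by AlphaZero/MuZero; note that `predict` and `dynamics` here are toy deterministic stand-ins for the learned networks, not the implementation above:

```python
import math

# Toy stand-ins for MuZero's learned networks (assumptions for illustration):
# the "hidden state" is an integer and the model is deterministic.
def predict(state):
    """Return (value, prior policy over 2 actions)."""
    return 0.5, [0.6, 0.4]

def dynamics(state, action):
    """Return (next hidden state, reward)."""
    return state * 2 + action, 0.1

class Node:
    def __init__(self, prior):
        self.prior = prior          # P(a) from the policy network
        self.visit_count = 0
        self.value_sum = 0.0
        self.children = {}          # action -> Node
        self.state = None
        self.reward = 0.0

    def value(self):
        return self.value_sum / self.visit_count if self.visit_count else 0.0

def ucb_score(parent, child, c_puct=1.25):
    """pUCT rule: exploit the mean value, explore by prior and visit counts."""
    u = c_puct * child.prior * math.sqrt(parent.visit_count) / (1 + child.visit_count)
    return child.value() + u

def run_mcts(root_state, num_simulations=50, gamma=0.997):
    root = Node(prior=1.0)
    root.state = root_state
    _, policy = predict(root_state)
    root.children = {a: Node(prior=p) for a, p in enumerate(policy)}

    for _ in range(num_simulations):
        node, path = root, [root]
        # selection: walk down until reaching an unexpanded leaf
        while node.children:
            parent = node
            action, node = max(parent.children.items(),
                               key=lambda kv: ucb_score(parent, kv[1]))
            path.append(node)
        # expansion: step the dynamics model from the parent's state
        node.state, node.reward = dynamics(path[-2].state, action)
        value, policy = predict(node.state)
        node.children = {a: Node(prior=p) for a, p in enumerate(policy)}
        # backup: propagate the discounted value along the path
        for n in reversed(path):
            n.value_sum += value
            n.visit_count += 1
            value = n.reward + gamma * value

    # normalized root visit counts form the improved (search) policy
    visits = [root.children[a].visit_count for a in sorted(root.children)]
    return [v / sum(visits) for v in visits]
```

With equal predicted values everywhere, visit counts concentrate in proportion to the priors, so the search policy favors action 0 here.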

3. Dreamer

3.1 Core Idea

Learning a world model:

- Learns the environment's transition model
- Imagines the future in latent space
- Learns the policy from imagined rollouts

Three components:

1. Representation model: encodes observations into latent states
2. Transition model: predicts the next latent state
3. Reward model: predicts rewards

3.2 Implementation

Python
class DreamerWorldModel(nn.Module):
    """Dreamer世界模型"""

    def __init__(self, obs_dim, action_dim, latent_dim=32, hidden_dim=200):
        super(DreamerWorldModel, self).__init__()

        self.action_dim = action_dim
        self.latent_dim = latent_dim

        # Representation model: observation -> latent state
        self.encoder = nn.Sequential(
            nn.Linear(obs_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, latent_dim * 2)  # mean and log standard deviation
        )

        # Transition model: (latent state, action) -> next latent state
        self.transition = nn.Sequential(
            nn.Linear(latent_dim + action_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, latent_dim * 2)
        )

        # Reward model: latent state -> reward
        self.reward_model = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )

        # Decoder: latent state -> observation (for reconstruction)
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, obs_dim)
        )

    def encode(self, obs):
        """编码观察为潜状态分布"""
        params = self.encoder(obs)
        mean, log_std = torch.chunk(params, 2, dim=-1)
        return mean, log_std

    def sample_latent(self, mean, log_std):
        """从潜状态分布采样"""
        std = torch.exp(log_std)
        eps = torch.randn_like(std)
        return mean + std * eps

    def predict_next(self, latent, action):
        """预测下一潜状态"""
        x = torch.cat([latent, action], dim=-1)
        params = self.transition(x)
        mean, log_std = torch.chunk(params, 2, dim=-1)
        return mean, log_std

    def predict_reward(self, latent):
        """预测奖励"""
        return self.reward_model(latent)

    def decode(self, latent):
        """解码潜状态为观察"""
        return self.decoder(latent)

class Dreamer:
    """Dreamer算法"""

    def __init__(self, obs_dim, action_dim, lr=1e-3, gamma=0.99):
        self.world_model = DreamerWorldModel(obs_dim, action_dim)
        self.optimizer = optim.Adam(self.world_model.parameters(), lr=lr)
        self.gamma = gamma
        self.action_dim = action_dim

    def imagine_trajectory(self, initial_latent, policy, horizon=15):
        """
        想象未来轨迹

        参数:
            initial_latent: 初始潜状态
            policy: 策略网络
            horizon: 想象步数

        返回:
            想象的潜状态、动作、奖励序列
        """
        latents = [initial_latent]
        actions = []
        rewards = []

        latent = initial_latent

        for _ in range(horizon):
            # the policy selects an action
            with torch.no_grad():
                action = policy(latent)

            # predict the next latent state
            mean, log_std = self.world_model.predict_next(latent, action)
            latent = self.world_model.sample_latent(mean, log_std)

            # predict the reward
            reward = self.world_model.predict_reward(latent)

            latents.append(latent)
            actions.append(action)
            rewards.append(reward)

        return latents, actions, rewards

    def train_world_model(self, observations, actions, rewards, next_observations):
        """训练世界模型"""
        # 编码观察
        mean, log_std = self.world_model.encode(observations)
        latent = self.world_model.sample_latent(mean, log_std)

        # reconstruction loss
        reconstructed = self.world_model.decode(latent)
        recon_loss = F.mse_loss(reconstructed, observations)

        # KL divergence between the encoder posterior and the transition prior
        next_mean, next_log_std = self.world_model.encode(next_observations)
        pred_mean, pred_log_std = self.world_model.predict_next(latent, actions)

        # KL(encoder || transition) = KL(q(z'|o') || p(z'|z,a))
        kl_loss = torch.mean(torch.sum(
            pred_log_std - next_log_std +
            (next_log_std.exp().pow(2) + (next_mean - pred_mean).pow(2)) / (2 * pred_log_std.exp().pow(2)) - 0.5,
            dim=-1
        ))

        # reward prediction loss
        pred_rewards = self.world_model.predict_reward(latent)
        reward_loss = F.mse_loss(pred_rewards, rewards)

        # total loss
        total_loss = recon_loss + 0.1 * kl_loss + reward_loss

        self.optimizer.zero_grad()
        total_loss.backward()
        self.optimizer.step()

        return total_loss.item()
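The code above trains the world model and imagines trajectories, but does not show how the actor itself improves: in Dreamer the policy is trained by backpropagating imagined returns through the (frozen) model. A minimal, self-contained sketch of that idea, using small hypothetical stand-ins for the transition and reward models rather than the classes above, and omitting Dreamer's value network and λ-returns:

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
latent_dim, action_dim, horizon = 8, 2, 5

# Frozen stand-ins for the learned world model (deterministic for simplicity).
transition = nn.Linear(latent_dim + action_dim, latent_dim)
reward_model = nn.Linear(latent_dim, 1)
for p in [*transition.parameters(), *reward_model.parameters()]:
    p.requires_grad_(False)

# The actor is trained purely inside the world model ("in imagination").
actor = nn.Sequential(nn.Linear(latent_dim, 32), nn.Tanh(),
                      nn.Linear(32, action_dim), nn.Tanh())
optimizer = torch.optim.Adam(actor.parameters(), lr=1e-2)

def imagined_return(latent, gamma=0.99):
    """Roll the model forward with the actor and sum discounted rewards."""
    total = torch.zeros(latent.size(0), 1)
    for t in range(horizon):
        action = actor(latent)
        latent = torch.tanh(transition(torch.cat([latent, action], dim=-1)))
        total = total + gamma ** t * reward_model(latent)
    return total.mean()

latent0 = torch.randn(16, latent_dim)
return_before = imagined_return(latent0).item()
for _ in range(100):
    loss = -imagined_return(latent0)  # gradient ascent on the imagined return
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
return_after = imagined_return(latent0).item()
```

Because the whole rollout is differentiable, gradients flow through the frozen model into the actor, which is exactly why Dreamer can learn behaviors without touching the real environment.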

4. Model Predictive Control (MPC)

4.1 Core Idea

Online planning:

- Replan at every time step
- Simulate with the learned world model
- Select the best action sequence (and execute only its first action)

4.2 Implementation

Python
class MPC:
    """模型预测控制"""

    def __init__(self, world_model, horizon=10, num_samples=1000):
        self.world_model = world_model
        self.horizon = horizon
        self.num_samples = num_samples

    def plan(self, current_obs, action_candidates):
        """
        规划最优动作序列

        参数:
            current_obs: 当前观察
            action_candidates: 候选动作序列

        返回:
            最优动作
        """
        best_action = None
        best_return = -float('inf')

        # encode the current observation
        with torch.no_grad():
            mean, log_std = self.world_model.encode(current_obs)
            latent = self.world_model.sample_latent(mean, log_std)

        # evaluate each candidate action sequence
        for action_seq in action_candidates:
            total_reward = 0
            current_latent = latent

            for action in action_seq:
                # predict the next latent state
                with torch.no_grad():
                    next_mean, next_log_std = self.world_model.predict_next(
                        current_latent, action
                    )
                    current_latent = self.world_model.sample_latent(next_mean, next_log_std)

                    # predict the reward
                    reward = self.world_model.predict_reward(current_latent)
                    total_reward += reward.item()

            if total_reward > best_return:
                best_return = total_reward
                best_action = action_seq[0]

        return best_action

    def random_shooting(self, current_obs):
        """随机采样动作序列"""
        # 生成随机动作候选
        action_candidates = [
            [torch.randn(self.world_model.action_dim) for _ in range(self.horizon)]
            for _ in range(self.num_samples)
        ]

        return self.plan(current_obs, action_candidates)

    def cem(self, current_obs, num_iterations=5, elite_frac=0.1):
        """交叉熵方法(CEM)优化"""
        action_dim = self.world_model.action_dim

        # initialize the action distribution
        mean = torch.zeros(self.horizon, action_dim)
        std = torch.ones(self.horizon, action_dim)

        for _ in range(num_iterations):
            # sample action sequences from the current distribution
            action_sequences = []
            for _ in range(self.num_samples):
                actions = []
                for t in range(self.horizon):
                    action = torch.normal(mean[t], std[t])
                    actions.append(action)
                action_sequences.append(actions)

            # evaluate each sequence under the world model
            returns = []
            for action_seq in action_sequences:
                total_reward = 0
                with torch.no_grad():
                    mean_latent, log_std_latent = self.world_model.encode(current_obs)
                    latent = self.world_model.sample_latent(mean_latent, log_std_latent)

                    for action in action_seq:
                        next_mean, next_log_std = self.world_model.predict_next(latent, action)
                        latent = self.world_model.sample_latent(next_mean, next_log_std)
                        reward = self.world_model.predict_reward(latent)
                        total_reward += reward.item()

                returns.append(total_reward)

            # select the elite samples
            returns = torch.tensor(returns)
            elite_idxs = returns.argsort(descending=True)[:int(self.num_samples * elite_frac)]

            elite_actions = [action_sequences[i] for i in elite_idxs]

            # refit the distribution to the elites
            for t in range(self.horizon):
                actions_t = torch.stack([seq[t] for seq in elite_actions])  # stack along a new batch dimension
                mean[t] = actions_t.mean(dim=0)
                std[t] = actions_t.std(dim=0) + 1e-6

        return mean[0]  # execute only the first action
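To see the CEM loop converge end to end, it helps to swap the learned world model for dynamics we know in closed form. A minimal sketch on a hypothetical 1-D integrator (x' = x + a, reward -x'²), where the best first action from x0 = 2 is about -2; `rollout_return` and `cem_plan` are illustrative helpers, not part of the class above:

```python
import torch

torch.manual_seed(0)
horizon, num_samples, elite_frac = 5, 200, 0.1

def rollout_return(x0, actions):
    """Known toy dynamics x' = x + a with reward -x'^2 (drive x to the origin)."""
    x, total = x0, torch.zeros(())
    for a in actions:
        x = x + a
        total = total - x ** 2
    return total

def cem_plan(x0, num_iterations=5):
    mean = torch.zeros(horizon)
    std = torch.ones(horizon)
    for _ in range(num_iterations):
        # sample action sequences from the current Gaussian
        samples = mean + std * torch.randn(num_samples, horizon)
        returns = torch.stack([rollout_return(x0, seq) for seq in samples])
        # refit mean/std to the elite fraction, as in the cem method above
        elite = samples[returns.argsort(descending=True)[:int(num_samples * elite_frac)]]
        mean, std = elite.mean(dim=0), elite.std(dim=0) + 1e-6
    return mean[0]  # MPC executes only the first action

first_action = cem_plan(x0=2.0)
```

After a few iterations the sampling distribution concentrates around the optimal plan, and the returned first action lands near -2, the step that cancels the initial displacement.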

5. Method Comparison

| Method  | Core idea                 | Strengths                               | Weaknesses                   |
|---------|---------------------------|-----------------------------------------|------------------------------|
| MuZero  | Implicit model + MCTS     | General-purpose, excellent performance  | Computationally expensive    |
| Dreamer | World model + imagination | High sample efficiency                  | Model errors accumulate      |
| MPC     | Online planning           | Flexible, can handle constraints        | Demanding real-time planning |

6. Chapter Summary

Core Concepts

Text Only
Frontiers of model-based methods:
├── MuZero: implicit model + MCTS
├── Dreamer: world model + imagination
├── MPC: online planning
└── Shared goal: sample efficiency + long-horizon planning

Key challenges:
├── Model accuracy
├── Computational cost
└── Error accumulation

Applications

  • Robot control
  • Game AI (Go, chess)
  • Autonomous-driving planning
  • Resource scheduling

✅ Self-Check Questions

  1. What is the main difference between MuZero and AlphaZero?

  2. How does Dreamer use its world model for learning?

  3. What advantages does MPC have over policy-learning methods?


📚 Further Reading

  1. Schrittwieser et al. (2020) - Mastering Atari, Go, Chess and Shogi by Planning with a Learned Model (MuZero)
  2. Hafner et al. (2020) - Dream to Control: Learning Behaviors by Latent Imagination
  3. Hafner et al. (2023) - Mastering Diverse Domains through World Models (DreamerV3)

Congratulations on completing all the frontier topics!

→ Back to: 06 - Frontier Topics README