
01 - Imitation Learning

Study time: 3-4 hours | Importance: ⭐⭐⭐⭐ | Learning from expert data | Prerequisites: supervised learning, policy gradients


🎯 Learning Objectives

After completing this chapter, you will be able to:

  • Understand the core idea of imitation learning
  • Master behavior cloning
  • Understand the DAgger algorithm
  • Understand the concept of inverse reinforcement learning
  • Apply imitation learning to practical problems


1. Introduction to Imitation Learning

1.1 Why Imitation Learning

Problems with traditional RL:

  • Exploration is inefficient
  • Designing a reward function is difficult
  • Training takes a long time

Advantages of imitation learning:

  • Learns directly from expert demonstrations
  • No reward function needs to be designed
  • Learning is fast

1.2 Application Scenarios

  • Autonomous driving
  • Robot manipulation
  • Game AI
  • Dialogue systems

2. Behavior Cloning

2.1 Core Idea

Treat the expert's state-action pairs as labeled training examples, casting the RL problem as supervised learning:

\[\min_\theta \mathbb{E}_{(s,a) \sim \mathcal{D}}[L(\pi_\theta(s), a)]\]

where \(\mathcal{D} = \{(s_i, a_i)\}\) is the expert demonstration dataset.

2.2 Code Implementation

```python
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

class BehaviorCloning:
    """Behavior cloning: supervised regression from states to expert actions."""

    def __init__(self, state_dim, action_dim, hidden_dim=256, lr=3e-4):
        # Policy network
        self.policy = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
            nn.Tanh()  # squash to [-1, 1] for continuous actions
        )

        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
        self.loss_fn = nn.MSELoss()

    def train(self, expert_data, num_epochs=100, batch_size=64):
        """
        Train the behavior-cloning policy.

        Args:
            expert_data: (states, actions) expert demonstrations
        """
        states, actions = expert_data
        n_samples = len(states)

        for epoch in range(num_epochs):
            # Shuffle the data each epoch
            indices = np.random.permutation(n_samples)

            total_loss = 0
            n_batches = 0

            for i in range(0, n_samples, batch_size):
                batch_indices = indices[i:i+batch_size]

                batch_states = torch.FloatTensor(states[batch_indices])
                batch_actions = torch.FloatTensor(actions[batch_indices])

                # Forward pass
                pred_actions = self.policy(batch_states)

                # Compute the loss
                loss = self.loss_fn(pred_actions, batch_actions)

                # Backward pass
                self.optimizer.zero_grad()  # clear accumulated gradients
                loss.backward()             # backpropagate
                self.optimizer.step()       # update parameters

                total_loss += loss.item()  # convert the scalar tensor to a Python float
                n_batches += 1

            if (epoch + 1) % 10 == 0:
                avg_loss = total_loss / n_batches
                print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {avg_loss:.4f}")

    def select_action(self, state):
        """Select an action for a single state."""
        with torch.no_grad():  # no gradients needed at inference time
            state = torch.FloatTensor(state).unsqueeze(0)  # add a batch dimension
            action = self.policy(state)
            return action.squeeze(0).numpy()  # drop the batch dimension

# Collecting expert data
def collect_expert_data(env, expert_policy, num_episodes=100):
    """Collect demonstration data by rolling out the expert policy."""
    states = []
    actions = []

    for episode in range(num_episodes):
        state, _ = env.reset()
        done = False

        while not done:
            # The expert policy chooses the action
            action = expert_policy(state)

            states.append(state)
            actions.append(action)

            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            state = next_state

    return np.array(states), np.array(actions)
```
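Independent of the PyTorch class above, the supervised nature of behavior cloning shows up even in a toy 1-D example (standard library only; the expert rule and all numbers are hypothetical). The "expert" acts as a = 2s, and fitting a linear policy by gradient descent on the MSE recovers that mapping:

```python
import random

random.seed(0)

# Hypothetical expert: action = 2.0 * state (unknown to the learner)
states = [random.uniform(-1, 1) for _ in range(200)]
actions = [2.0 * s for s in states]

# Linear policy pi(s) = w * s, trained by gradient descent on the MSE
w, lr = 0.0, 0.1
for _ in range(500):
    # d/dw of mean (w*s - a)^2
    grad = sum(2 * (w * s - a) * s for s, a in zip(states, actions)) / len(states)
    w -= lr * grad

print(f"learned w = {w:.3f}")  # close to the expert's 2.0
```

No rewards, returns, or exploration are involved: the quality of the cloned policy depends entirely on the coverage of the demonstration data.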

2.3 Problems and Limitations

The compounding-error problem:

  • Mistakes made by the policy accumulate over a trajectory
  • The visited state distribution drifts away from the expert's
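The compounding effect can be made concrete with a minimal standard-library sketch (the assumptions are purely illustrative): suppose the cloned policy makes an independent mistake with probability `eps` at each step and cannot recover once it leaves the expert's state distribution. The chance of staying on-distribution then decays exponentially with the horizon:

```python
def failure_prob(eps: float, horizon: int) -> float:
    """Probability of drifting off the expert's state distribution within
    `horizon` steps, assuming independent per-step errors with probability
    `eps` and no recovery once off-distribution."""
    return 1.0 - (1.0 - eps) ** horizon

eps = 0.01  # a 1% per-step imitation error
for T in (10, 100, 1000):
    print(f"T={T}: failure probability = {failure_prob(eps, T):.3f}")
```

Even a 1% per-step error rate leaves well under a 50% chance of completing a 100-step episode entirely on-distribution, which motivates the interactive fix below.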

Solution: DAgger


3. DAgger (Dataset Aggregation)

3.1 Core Idea

Collect data iteratively:

  1. Collect data with the current policy
  2. Ask the expert to label the visited states
  3. Aggregate the labeled data into the dataset
  4. Retrain the policy on the aggregated dataset

3.2 Algorithm

```python
class DAgger:
    """DAgger (Dataset Aggregation) algorithm."""

    def __init__(self, state_dim, action_dim, expert_policy,
                 hidden_dim=256, lr=3e-4):
        self.expert_policy = expert_policy
        self.bc = BehaviorCloning(state_dim, action_dim, hidden_dim, lr)

        # Aggregated dataset
        self.states = []
        self.actions = []

    def train(self, env, num_iterations=10, episodes_per_iter=10):
        """
        DAgger training loop.

        Args:
            env: environment
            num_iterations: number of DAgger iterations
            episodes_per_iter: episodes collected per iteration
        """
        for iteration in range(num_iterations):
            print(f"\n=== Iteration {iteration + 1}/{num_iterations} ===")

            # 1. Collect states with the current policy
            new_states = []
            for _ in range(episodes_per_iter):
                state, _ = env.reset()
                done = False

                while not done:
                    if len(self.states) == 0:
                        # First iteration: roll out the expert policy
                        action = self.expert_policy(state)
                    else:
                        # Later iterations: roll out the learned policy
                        action = self.bc.select_action(state)
                        # Add noise for exploration
                        action += np.random.normal(0, 0.1, size=action.shape)

                    new_states.append(state)
                    next_state, reward, terminated, truncated, _ = env.step(action)
                    done = terminated or truncated
                    state = next_state

            # 2. Ask the expert to label the visited states
            print(f"Labeling {len(new_states)} states...")
            new_actions = [self.expert_policy(s) for s in new_states]

            # 3. Aggregate into the dataset
            self.states.extend(new_states)
            self.actions.extend(new_actions)

            print(f"Dataset size: {len(self.states)}")

            # 4. Retrain the policy on the aggregated dataset
            expert_data = (np.array(self.states), np.array(self.actions))
            self.bc.train(expert_data, num_epochs=50)

            # Evaluate the current policy
            avg_reward = self.evaluate(env, num_episodes=5)
            print(f"Average reward: {avg_reward:.2f}")

    def evaluate(self, env, num_episodes=10):
        """Evaluate the learned policy."""
        total_rewards = []

        for _ in range(num_episodes):
            state, _ = env.reset()
            done = False
            episode_reward = 0

            while not done:
                action = self.bc.select_action(state)
                next_state, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated
                state = next_state
                episode_reward += reward

            total_rewards.append(episode_reward)

        return np.mean(total_rewards)

    def select_action(self, state):
        return self.bc.select_action(state)
```
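The DAgger loop can be exercised without an environment library. Below is a self-contained toy version (every name and number is illustrative, not from the original): a 1-D system with dynamics s' = s + a, a hypothetical expert a = -s that drives the state to zero, and a linear policy a = w·s refit by closed-form least squares after each round of expert labeling:

```python
def expert(s):
    # Hypothetical expert: drive the state toward 0
    return -s

def rollout(w, s0, steps=20):
    """Visit states under the *current* policy a = w * s."""
    s, visited = s0, []
    for _ in range(steps):
        visited.append(s)
        s = s + w * s  # toy dynamics: s' = s + a
    return visited

D_states, D_actions = [], []
w = 0.5  # a deliberately bad initial policy (drives the state away from 0)
for it in range(5):
    # 1. Collect states under the current policy
    new_states = [x for s0 in (-1.0, 0.3, 0.8) for x in rollout(w, s0)]
    # 2. Label them with the expert, and 3. aggregate
    D_states += new_states
    D_actions += [expert(s) for s in new_states]
    # 4. Refit by closed-form least squares: w = sum(s*a) / sum(s*s)
    w = sum(s * a for s, a in zip(D_states, D_actions)) / sum(s * s for s in D_states)

print(f"final w = {w:.3f}")  # converges to -1, matching the expert
```

Because the expert labels states the *learner* actually visits (including the divergent ones produced by the bad initial policy), the refit policy is trained exactly where it previously made mistakes.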

4. Inverse Reinforcement Learning (Inverse RL)

4.1 Core Idea

Recover the reward function from expert demonstrations.

Find a reward function \(r_\theta\) under which the expert policy outperforms other policies:

\[\max_\theta \mathbb{E}_{\pi_E}[r_\theta(s,a)] - \mathbb{E}_{\pi_\theta}[r_\theta(s,a)]\]

That is, the expert's expected reward should be higher than the learned policy's expected reward.

4.2 Maximum Entropy IRL

```python
class MaxEntIRL:
    """Maximum entropy inverse RL with a linear reward, r(s,a) = w · phi(s,a)."""

    def __init__(self, feature_dim, lr=1e-3):
        # Reward function parameters
        self.reward_weights = nn.Parameter(torch.zeros(feature_dim))
        self.optimizer = optim.Adam([self.reward_weights], lr=lr)

    def reward(self, features):
        """Reward for a single feature vector."""
        return torch.dot(self.reward_weights, features)

    def compute_feature_expectations(self, trajectories):
        """Average feature counts over a set of trajectories."""
        feature_sum = torch.zeros(len(self.reward_weights))

        for traj in trajectories:
            for features in traj:
                feature_sum += features

        return feature_sum / len(trajectories)

    def train(self, expert_trajectories, policy_trajectories, num_iters=100):
        """
        Fit the reward weights.

        Args:
            expert_trajectories: list of expert trajectories (feature vectors)
            policy_trajectories: list of current-policy trajectories
        """
        expert_features = self.compute_feature_expectations(expert_trajectories)
        policy_features = self.compute_feature_expectations(policy_trajectories)

        for iteration in range(num_iters):
            # Maximize the reward gap between expert and policy.
            # (Full MaxEnt IRL re-solves the policy under the current reward
            # each iteration; with fixed trajectories, as here, this linear
            # objective is unbounded, so treat this as a simplified sketch.)
            loss = -torch.dot(self.reward_weights, expert_features - policy_features)

            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

            if (iteration + 1) % 10 == 0:
                print(f"Iter {iteration + 1}, Loss: {loss.item():.4f}")
```
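The gradient used in `train` above is just the difference of feature expectations. A standard-library sketch with made-up 2-D feature expectations (all numbers hypothetical) shows the update direction; note again that without re-solving the policy under the current reward, the weights simply grow along `expert_feat - policy_feat`:

```python
# Hypothetical feature expectations, e.g. averaged over trajectories
expert_feat = [1.0, 0.2]
policy_feat = [0.4, 0.6]

# Gradient ascent on E_expert[r] - E_policy[r] with r(s,a) = w · phi(s,a)
w, lr = [0.0, 0.0], 0.05
for _ in range(100):
    grad = [e - p for e, p in zip(expert_feat, policy_feat)]
    w = [wi + lr * g for wi, g in zip(w, grad)]

print(w)  # grows along the direction expert_feat - policy_feat
```

The learned weights reward the first feature (which the expert visits more) and penalize the second (which the current policy over-visits); in a full IRL loop the policy would then be re-optimized under this reward and the feature expectations recomputed.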

5. Chapter Summary

Method Comparison

| Method | Pros | Cons | When to Use |
| --- | --- | --- | --- |
| Behavior cloning | Simple and direct | Compounding error | Abundant demonstration data |
| DAgger | Fixes distribution shift | Needs an online expert | Interactive environments |
| Inverse RL | Learns a reward function | Computationally complex | When the expert's intent must be understood |

Core Concepts

```text
Imitation learning:
├── Behavior cloning: supervised learning
├── DAgger: iterative data aggregation
└── Inverse RL: recover the reward function

Key challenges:
├── Distribution shift
├── Compounding error
└── Obtaining expert data
```

✅ Self-Test Questions

  1. Why does behavior cloning suffer from compounding errors?

  2. How does DAgger address the distribution-shift problem?

  3. How does inverse reinforcement learning differ from direct imitation learning?


📚 Further Reading

  1. Pomerleau (1989) - ALVINN (behavior cloning)
  2. Ross et al. (2011) - DAgger
  3. Ziebart et al. (2008) - MaxEnt IRL

→ Next: 02-离线强化学习.md