08 - 安全强化学习(Safe RL)¶
⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。
学习时间: 4-5小时 重要性: ⭐⭐⭐⭐⭐ 现实部署中必须考虑的安全性 前置知识: MDP基础、PPO、约束优化
🎯 学习目标¶
完成本章后,你将能够: - 理解约束MDP(CMDP)的形式化定义 - 掌握拉格朗日方法求解约束RL的原理 - 了解CPO、PCPO、FOCOPS等安全RL算法 - 理解现实场景中安全约束的设计方法 - 了解Safe RL在自动驾驶、机器人等领域的应用
1. 为什么需要安全强化学习?¶
1.1 不安全的RL¶
标准RL的目标是最大化累积回报,但完全不考虑安全性:
问题:在现实环境中,某些行为虽然能获得高回报,但是危险的、违规的或不可逆转的。
不安全RL的现实风险:
├── 自动驾驶
│ └── 为快速到达目的地而闯红灯、高速变道
├── 机器人控制
│ └── 为完成任务而对人类施力过大
├── 推荐系统
│ └── 为提高点击率推荐极端内容
├── 金融交易
│ └── 为高收益做出高风险杠杆操作
└── LLM对齐
└── 为满足用户请求而输出有害内容
1.2 核心思想¶
安全RL = 在满足安全约束的前提下,最大化累积回报。
2. 约束马尔可夫决策过程(CMDP)¶
2.1 形式化定义¶
CMDP 在标准MDP基础上增加了代价函数和约束:
其中: - \(S, A, P, r, \gamma\):与MDP相同 - \(c_i: S \times A \to \mathbb{R}\):第 \(i\) 个代价函数 - \(d_i \in \mathbb{R}\):第 \(i\) 个约束的阈值
安全RL优化目标:
直觉:最大化奖励,但约束"代价"(如碰撞次数、能耗)不超过阈值。
2.2 代码建模¶
import numpy as np
from dataclasses import dataclass, field
from typing import List, Tuple, Callable
@dataclass # @dataclass自动生成__init__等方法
class CMDPConfig:
"""
约束MDP配置
将标准MDP扩展为包含安全约束的CMDP
"""
state_dim: int # 状态维度
action_dim: int # 动作维度
gamma: float = 0.99 # 折扣因子
cost_functions: List[Callable] = field(default_factory=list) # 代价函数列表
cost_limits: List[float] = field(default_factory=list) # 约束阈值列表
def validate(self):
"""验证约束配置的一致性"""
assert len(self.cost_functions) == len(self.cost_limits), \ # assert断言
"代价函数和约束阈值数量必须一致"
for d in self.cost_limits:
assert d >= 0, "约束阈值必须为非负数"
class SafetyGymEnv:
"""
简化版安全强化学习环境
模拟一个有障碍物的导航任务
"""
def __init__(self, grid_size: int = 10, n_hazards: int = 5):
"""
参数:
grid_size: 网格大小
n_hazards: 危险区域数量
"""
self.grid_size = grid_size
self.goal = np.array([grid_size - 1, grid_size - 1], dtype=float) # np.array创建NumPy数组
self.state = np.array([0.0, 0.0])
# 随机放置危险区域
rng = np.random.default_rng(42)
self.hazards = [
rng.uniform(1, grid_size - 1, size=2) for _ in range(n_hazards)
]
self.hazard_radius = 1.0
def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, float, bool]:
"""
执行一步
返回:
next_state: 下一个状态
reward: 奖励(靠近目标 → 正奖励)
cost: 安全代价(进入危险区 → cost=1)
done: 是否结束
"""
# 移动
self.state = np.clip(self.state + action, 0, self.grid_size - 1)
# 奖励:到目标的负距离
dist_to_goal = np.linalg.norm(self.state - self.goal) # np.linalg线性代数运算
reward = -dist_to_goal / self.grid_size
# 安全代价:是否进入危险区
cost = 0.0
for hazard in self.hazards:
if np.linalg.norm(self.state - hazard) < self.hazard_radius:
cost = 1.0 # 进入危险区域
break
done = dist_to_goal < 0.5 # 到达目标
if done:
reward = 10.0 # 到达目标的奖励
return self.state.copy(), reward, cost, done
def reset(self) -> np.ndarray:
"""重置环境"""
self.state = np.array([0.0, 0.0])
return self.state.copy()
3. 拉格朗日方法¶
3.1 原理¶
将约束优化问题转化为无约束的拉格朗日对偶问题:
交替优化: 1. 固定 \(\lambda\),用PPO/TRPO更新策略 \(\pi\) 2. 固定 \(\pi\),用梯度下降更新乘子 \(\lambda\)
3.2 直觉理解¶
拉格朗日乘子 λ 的角色:
┌─────────────────────────────────────────┐
│ λ 小 → 安全约束不紧 → 策略更自由 │
│ λ 大 → 安全约束很紧 → 策略更保守 │
│ │
│ 自动调节机制: │
│ • 当前代价 > 阈值 → λ 增大 → 更保守 │
│ • 当前代价 < 阈值 → λ 减小 → 更激进 │
│ │
│ 最终效果: λ 收敛到使约束恰好满足的值 │
└─────────────────────────────────────────┘
3.3 PPO-Lagrangian 实现¶
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
class PPOLagrangian:
"""
PPO-Lagrangian: 基于拉格朗日乘子的安全PPO
核心思想: 将CMDP转化为对偶问题
- 策略网络最大化 reward - λ * cost
- 乘子 λ 通过梯度下降自动调节
参考: Ray et al., "Benchmarking Safe Exploration in Deep RL" (2019)
"""
def __init__(
self,
state_dim: int,
action_dim: int,
cost_limit: float = 25.0,
lambda_lr: float = 0.05,
policy_lr: float = 3e-4,
gamma: float = 0.99,
clip_eps: float = 0.2
):
"""
参数:
state_dim: 状态空间维度
action_dim: 动作空间维度
cost_limit: 安全约束阈值(每回合平均代价上限)
lambda_lr: 拉格朗日乘子的学习率
policy_lr: 策略网络的学习率
gamma: 折扣因子
clip_eps: PPO裁剪范围
"""
# 策略网络
self.actor = nn.Sequential(
nn.Linear(state_dim, 64),
nn.Tanh(),
nn.Linear(64, 64),
nn.Tanh(),
nn.Linear(64, action_dim),
)
# 奖励价值网络
self.reward_critic = nn.Sequential(
nn.Linear(state_dim, 64),
nn.Tanh(),
nn.Linear(64, 64),
nn.Tanh(),
nn.Linear(64, 1),
)
# 代价价值网络(额外的Critic估计代价回报)
self.cost_critic = nn.Sequential(
nn.Linear(state_dim, 64),
nn.Tanh(),
nn.Linear(64, 64),
nn.Tanh(),
nn.Linear(64, 1),
)
self.optimizer = optim.Adam(
list(self.actor.parameters()) +
list(self.reward_critic.parameters()) +
list(self.cost_critic.parameters()),
lr=policy_lr
)
# 拉格朗日乘子(可学习参数,log空间确保非负)
self.log_lambda = torch.tensor(0.0, requires_grad=True)
self.lambda_optimizer = optim.Adam([self.log_lambda], lr=lambda_lr)
self.cost_limit = cost_limit
self.gamma = gamma
self.clip_eps = clip_eps
@property # @property将方法变为属性访问
def lam(self) -> float:
"""当前的拉格朗日乘子值(通过exp确保非负)"""
return self.log_lambda.exp().item() # 将单元素张量转为Python数值
def update(self, batch: dict):
"""
执行一次PPO-Lagrangian更新
参数:
batch: 包含以下键的字典
- states: 状态 (N, state_dim)
- actions: 动作 (N, action_dim)
- rewards: 奖励 (N,)
- costs: 代价 (N,)
- old_log_probs: 旧策略下的log概率 (N,)
- reward_advantages: 奖励优势函数 (N,)
- cost_advantages: 代价优势函数 (N,)
"""
states = batch['states']
actions = batch['actions']
old_log_probs = batch['old_log_probs']
reward_adv = batch['reward_advantages']
cost_adv = batch['cost_advantages']
# === Step 1: 更新策略 ===
# 计算新的log概率
action_dist = torch.distributions.Normal(self.actor(states), 0.5)
new_log_probs = action_dist.log_prob(actions).sum(-1)
# 重要性采样比率
ratio = (new_log_probs - old_log_probs).exp()
# PPO裁剪目标(奖励部分)
surr1 = ratio * reward_adv
surr2 = torch.clamp(ratio, 1 - self.clip_eps, 1 + self.clip_eps) * reward_adv
reward_loss = -torch.min(surr1, surr2).mean()
# 代价部分(拉格朗日惩罚)
cost_surr1 = ratio * cost_adv
cost_surr2 = torch.clamp(ratio, 1 - self.clip_eps, 1 + self.clip_eps) * cost_adv
cost_loss = torch.max(cost_surr1, cost_surr2).mean()
# 总损失 = 奖励损失 + λ × 代价损失
lam = self.log_lambda.exp().detach() # 分离计算图,不参与梯度计算
total_policy_loss = reward_loss + lam * cost_loss
self.optimizer.zero_grad() # 清零梯度
total_policy_loss.backward() # 反向传播计算梯度
self.optimizer.step() # 更新参数
# === Step 2: 更新拉格朗日乘子 ===
# 如果平均代价超过阈值 → 增大λ → 策略更保守
avg_cost = batch['costs'].sum() / batch.get('n_episodes', 1)
lambda_loss = -self.log_lambda * (avg_cost - self.cost_limit)
self.lambda_optimizer.zero_grad()
lambda_loss.backward()
self.lambda_optimizer.step()
return {
'reward_loss': reward_loss.item(),
'cost_loss': cost_loss.item(),
'lambda': self.lam,
'avg_cost': avg_cost.item()
}
4. 主要安全RL算法¶
4.1 算法对比¶
| 算法 | 年份 | 核心思想 | 约束满足 | 计算开销 |
|---|---|---|---|---|
| PPO-Lagrangian | 2019 | 拉格朗日对偶 | 近似满足 | 低 |
| CPO | 2017 | 信赖域+约束 | 严格满足 | 高 |
| PCPO | 2020 | 投影约束策略优化 | 严格满足 | 中 |
| FOCOPS | 2020 | 一阶约束优化 | 近似满足 | 低 |
| CUP | 2022 | 约束更新投影 | 严格满足 | 中 |
| SafetyLayer | 2017 | 动作空间投影 | 严格满足 | 低 |
4.2 CPO(Constrained Policy Optimization)¶
Joshua Achiam et al., 2017 提出的CPO是安全RL的里程碑算法。
核心思想:在TRPO的信赖域框架中加入安全约束,保证每次更新: 1. 奖励单调提升(来自TRPO的保证) 2. 代价不超过限制(来自约束条件)
更新规则:
其中 \(A^R\) 和 \(A^C\) 分别是奖励和代价的优势函数。
class CPOUpdate:
"""
CPO更新步骤的简化实现
通过二次规划在信赖域内找到满足约束的最优策略更新方向
"""
def __init__(self, delta: float = 0.01, cost_limit: float = 25.0):
"""
参数:
delta: 信赖域大小(KL散度上限)
cost_limit: 每回合代价上限
"""
self.delta = delta
self.cost_limit = cost_limit
def compute_update(
self,
reward_grad: torch.Tensor,
cost_grad: torch.Tensor,
fisher_matrix: torch.Tensor,
current_cost: float
) -> torch.Tensor:
"""
计算CPO的策略更新方向
核心: 在Fisher信息矩阵定义的信赖域内,
找到最大化奖励梯度且满足代价约束的方向
参数:
reward_grad: 奖励目标的梯度 g
cost_grad: 代价约束的梯度 b
fisher_matrix: Fisher信息矩阵 H (近似KL的Hessian)
current_cost: 当前策略的平均代价 J_C(π_k)
返回:
策略更新方向 Δθ
"""
# 求解 H^{-1}g 和 H^{-1}b (使用共轭梯度法)
H_inv_g = self._conjugate_gradient(fisher_matrix, reward_grad)
H_inv_b = self._conjugate_gradient(fisher_matrix, cost_grad)
# 计算二次规划的系数
q = reward_grad.dot(H_inv_g) # g^T H^{-1} g
r = reward_grad.dot(H_inv_b) # g^T H^{-1} b
s = cost_grad.dot(H_inv_b) # b^T H^{-1} b
c = current_cost - self.cost_limit # 约束余量
# 分情况求解(详见CPO论文Appendix)
if s == 0:
# 代价不依赖于策略更新方向 → 退化为标准TRPO
lam = torch.sqrt(q / (2 * self.delta))
update = H_inv_g / lam
elif c < 0:
# 约束已满足,有余量
# 可行域内最大化奖励
lam = torch.sqrt(q / (2 * self.delta))
nu = max(0, (lam * c - r) / s)
update = (H_inv_g - nu * H_inv_b) / lam
else:
# 约束被违反,需要恢复
# 优先满足约束,在约束面上最大化奖励
lam = torch.sqrt(q / (2 * self.delta))
nu = (lam * c - r) / s
update = (H_inv_g - nu * H_inv_b) / lam
return update
def _conjugate_gradient(
self, A: torch.Tensor, b: torch.Tensor, n_iter: int = 10
) -> torch.Tensor:
"""
共轭梯度法求解 Ax = b
避免显式计算和存储 A^{-1}
"""
x = torch.zeros_like(b)
r = b.clone()
p = r.clone()
for _ in range(n_iter):
Ap = A @ p
alpha = r.dot(r) / (p.dot(Ap) + 1e-8)
x = x + alpha * p
r_new = r - alpha * Ap
beta = r_new.dot(r_new) / (r.dot(r) + 1e-8)
p = r_new + beta * p
r = r_new
if r.norm() < 1e-10:
break
return x
4.3 Safety Layer方法¶
最简单的安全RL方法——在动作输出后加一个安全投影层:
import torch
import numpy as np
from typing import Tuple
class SafetyLayer:
"""
Safety Layer: 动作空间安全投影
核心思想: 在策略网络输出动作后,
将不安全的动作投影到最近的安全动作
a_safe = argmin_{a'} ||a' - a_unsafe||²
s.t. c(s, a') ≤ 0
优势: 可以组合任何RL算法,不需修改训练过程
参考: Dalal et al., "Safe Exploration in Continuous Action Spaces" (2018)
"""
def __init__(self, constraint_model: nn.Module, safety_margin: float = 0.1):
"""
参数:
constraint_model: 预训练的约束函数模型 c(s,a)
c(s,a) > 0 表示不安全
safety_margin: 安全裕度
"""
self.constraint_model = constraint_model
self.safety_margin = safety_margin
def project_action(
self, state: torch.Tensor, unsafe_action: torch.Tensor
) -> torch.Tensor:
"""
将不安全的动作投影到安全集合
使用线性化近似 + 二次规划求解:
c(s, a) ≈ c(s, a₀) + ∇_a c(s, a₀) · (a - a₀)
"""
state = state.requires_grad_(False)
action = unsafe_action.clone().requires_grad_(True)
# 计算约束值和梯度
cost = self.constraint_model(state, action)
cost.backward()
grad = action.grad.clone() # ∇_a c(s, a)
if cost.item() + self.safety_margin <= 0:
# 本身就是安全的,不需要投影
return unsafe_action
# 线性化约束下的投影(解析解)
# a_safe = a - [(c + margin) / ||∇c||²] * ∇c
correction = ((cost.item() + self.safety_margin) /
(grad.norm()**2 + 1e-8)) * grad
safe_action = unsafe_action - correction
return safe_action.detach()
5. 安全探索¶
5.1 探索与安全的矛盾¶
标准探索 vs 安全探索:
┌─────────────────────────────┐ ┌────────────────────────────┐
│ 标准ε-greedy探索 │ │ 安全探索 │
│ • 随机选择动作 │ │ • 只在安全集合内随机 │
│ • 可能触发危险 │ │ • 需要安全知识/模型 │
│ • 适合仿真环境 │ │ • 适合真实环境部署 │
│ • 简单高效 │ │ • 计算开销更大 │
└─────────────────────────────┘ └────────────────────────────┘
5.2 常见安全探索方法¶
| 方法 | 原理 | 适用场景 |
|---|---|---|
| Shield | 预计算安全动作集 | 离散动作空间 |
| CBF (控制屏障函数) | 基于Lyapunov稳定性理论 | 连续控制 |
| Recovery RL | 学习恢复策略+任务策略 | 通用 |
| Constrained BO | 贝叶斯优化+约束 | 样本极珍贵 |
5.3 控制屏障函数(CBF)¶
class ControlBarrierFunction:
"""
控制屏障函数(CBF): 安全集合不变性保证
核心思想:
定义安全集合 C = {x : h(x) ≥ 0}
设计控制输入使 ḣ(x) + α*h(x) ≥ 0
→ 保证系统永远不会离开安全集合
适用于连续状态和动作空间的安全控制
"""
def __init__(self, barrier_fn, dynamics_fn, alpha: float = 1.0):
"""
参数:
barrier_fn: 屏障函数 h(x),h(x)≥0 表示安全
dynamics_fn: 系统动力学 ẋ = f(x) + g(x)u
alpha: CBF参数(越大越保守)
"""
self.h = barrier_fn # 屏障函数
self.f = dynamics_fn # 动力学模型
self.alpha = alpha
def safe_action(
self, state: np.ndarray, desired_action: np.ndarray
) -> np.ndarray:
"""
计算满足CBF约束的最近安全动作
优化问题:
min ||u - u_desired||²
s.t. Lf h(x) + Lg h(x) · u + α h(x) ≥ 0
其中 Lf, Lg 是Lie导数
"""
# 计算屏障函数值
h_val = self.h(state)
# 计算Lie导数(CBF约束的线性系数)
Lf_h = self._lie_derivative_f(state) # 漂移项
Lg_h = self._lie_derivative_g(state) # 控制项
# CBF约束: Lf_h + Lg_h @ u + alpha * h >= 0
constraint_val = Lf_h + Lg_h @ desired_action + self.alpha * h_val
if constraint_val >= 0:
# 期望动作是安全的
return desired_action
# 否则,投影到约束面
# QP解析解: u* = u_des + λ * Lg_h^T
# λ = max(0, -(Lf_h + Lg_h @ u_des + α*h) / ||Lg_h||²)
lam = -constraint_val / (np.dot(Lg_h, Lg_h) + 1e-8) # np.dot矩阵/向量点乘
safe_u = desired_action + lam * Lg_h
return safe_u
def _lie_derivative_f(self, state):
"""计算关于漂移动力学的Lie导数 Lf h"""
# 简化实现:使用数值差分
eps = 1e-4
h0 = self.h(state)
grad_h = np.zeros_like(state)
for i in range(len(state)):
state_plus = state.copy()
state_plus[i] += eps
grad_h[i] = (self.h(state_plus) - h0) / eps
f_val = self.f(state, np.zeros(len(state))) # 零输入下的漂移
return np.dot(grad_h, f_val)
def _lie_derivative_g(self, state):
"""计算关于控制的Lie导数 Lg h"""
eps = 1e-4
h0 = self.h(state)
grad_h = np.zeros_like(state)
for i in range(len(state)):
state_plus = state.copy()
state_plus[i] += eps
grad_h[i] = (self.h(state_plus) - h0) / eps
# 假设仿射控制系统 ẋ = f(x) + g(x)u
# Lg_h = grad_h^T @ g(x)
# 简化:假设g(x) = I(单位矩阵)
return grad_h
6. 现实应用案例¶
6.1 自动驾驶中的安全RL¶
自动驾驶安全约束设计:
├── 硬约束(绝对不能违反)
│ ├── 不能闯红灯
│ ├── 不能逆行
│ ├── 不能撞到行人
│ └── 速度不能超过限速
├── 软约束(尽量满足)
│ ├── 保持安全跟车距离
│ ├── 变道时保持平稳
│ └── 减少急刹车频率
└── 优化目标
├── 高效到达目的地
├── 乘客舒适度
└── 能耗最小化
6.2 机器人安全交互¶
class SafeRobotController:
"""
安全机器人控制器示例
约束: 关节力矩限制 + 与人体距离限制
"""
def __init__(self, max_torque: float = 50.0, min_human_dist: float = 0.3):
"""
参数:
max_torque: 最大关节力矩 (Nm)
min_human_dist: 与人体最小距离 (m)
"""
self.max_torque = max_torque
self.min_human_dist = min_human_dist
def cost_torque(self, state: dict, action: np.ndarray) -> float:
"""力矩超限代价"""
# 超过最大力矩的部分
excess = np.maximum(np.abs(action) - self.max_torque, 0)
return float(np.sum(excess))
def cost_distance(self, state: dict, action: np.ndarray) -> float:
"""距离人体过近的代价"""
human_pos = state.get('human_position', None)
robot_pos = state.get('robot_position', None)
if human_pos is None or robot_pos is None:
return 0.0
dist = np.linalg.norm(np.array(robot_pos) - np.array(human_pos))
if dist < self.min_human_dist:
return 1.0 # 距离过近,代价=1
return 0.0
def get_cost_functions(self):
"""返回所有代价函数及其阈值"""
return [
{'fn': self.cost_torque, 'limit': 0.0, 'name': '力矩限制'},
{'fn': self.cost_distance, 'limit': 0.0, 'name': '人体距离'},
]
7. 开源工具与库¶
7.1 Safety Gymnasium¶
import safety_gymnasium
# 创建安全环境
env = safety_gymnasium.make('SafetyPointGoal1-v0')
# 环境返回额外的 cost 信号
obs, info = env.reset()
for _ in range(1000):
action = env.action_space.sample()
obs, reward, cost, terminated, truncated, info = env.step(action)
# reward: 任务奖励
# cost: 安全代价(碰到障碍物=1, 否则=0)
if terminated or truncated:
obs, info = env.reset()
7.2 OmniSafe¶
import omnisafe
# 一行代码训练安全RL Agent
agent = omnisafe.Agent(
algo='PPOLag', # PPO-Lagrangian
env_id='SafetyPointGoal1-v0',
custom_cfgs={
'lagrange_cfgs': {
'cost_limit': 25.0, # 每回合代价上限
'lagrangian_multiplier_init': 0.001,
}
}
)
agent.learn()
# 支持的算法: PPOLag, CPO, PCPO, FOCOPS, CUP, OnCRPO 等
8. 面试要点¶
8.1 高频问题¶
- 什么是CMDP?与MDP有何区别?
-
CMDP增加了代价函数和约束阈值,目标是在约束下最大化奖励
-
拉格朗日方法如何解决约束RL?
-
引入乘子将约束推到目标函数中,交替优化策略和乘子
-
CPO和PPO-Lagrangian的区别?
- CPO在信赖域内直接求解约束优化(更理论完备)
-
PPO-Lag将约束隐含在乘子惩罚中(更简单实用)
-
Safety Layer的优势和劣势?
- 优势:可搭配任何RL算法,推理时保证安全
-
劣势:需要预训练约束模型,线性化近似可能不准
-
CBF如何保证安全?
- 通过屏障函数定义安全集合,Lie导数条件保证集合不变性
📌 关键要点总结¶
- 安全RL通过CMDP形式化——在约束下优化奖励
- 拉格朗日方法是最常用的框架——自动调节安全严格程度
- CPO提供了理论最优解,PPO-Lag提供了实用方案
- Safety Layer和CBF可在推理时直接保证安全
- 现实中通常需要多层安全机制——训练时约束 + 推理时投影 + 监控
📚 延伸阅读¶
- 奖励设计与Reward Hacking
- PPO算法
- RLHF与人类反馈
- 论文: Achiam et al., "Constrained Policy Optimization" (2017)
- 论文: Ray et al., "Benchmarking Safe Exploration in Deep RL" (2019)
- 工具: OmniSafe (PKU)