跳转至

06 - 死锁

死锁条件与资源环路示意图

建议学习时间:2.5 小时 🎯 难度等级:⭐⭐⭐⭐ 📋 前置知识:进程与线程、同步与互斥基础


📋 本章目录


一、死锁的定义与必要条件

1.1 什么是死锁?

死锁(Deadlock) 是指两个或多个进程各自持有某些资源,同时等待对方释放资源,导致所有进程都无限期阻塞的状态。

Text Only
  进程 A                    进程 B
  ┌──────┐                ┌──────┐
  │持有 R1│──── 请求 R2 ──→│持有 R2│
  │      │←── 请求 R1 ────│      │
  └──────┘                └──────┘
       ↑                      ↑
       └──── 互相等待,永久阻塞 ─┘

1.2 死锁的四个必要条件

死锁发生当且仅当以下四个条件同时成立:

条件 英文名 说明 类比
互斥 Mutual Exclusion 资源不能被多个进程共享,一次只能被一个进程使用 一把锁只能一个人持有
持有并等待 Hold and Wait 进程持有已获得的资源,同时等待获取其他资源 拿着一把钥匙等另一把
不可剥夺 No Preemption 已分配给进程的资源不能被强制收回,只能由进程主动释放 不能抢别人手中的钥匙
循环等待 Circular Wait 存在进程的循环等待链:P1→P2→P3→…→P1 一圈人互相等对方传钥匙

⚠️ 破坏任何一个条件就能阻止死锁发生。这是死锁预防策略的理论基础。


二、资源分配图

2.1 有向图表示

资源分配图(Resource Allocation Graph, RAG) 用有向图描述进程与资源之间的关系:

  • 圆形节点:进程(P1, P2, …)
  • 方形节点:资源类型(R1, R2, …),其中的小圆点表示资源实例数
  • 请求边:P → R(进程请求资源)
  • 分配边:R → P(资源已分配给进程)
Text Only
示例:无死锁
                    ┌────┐
          请求      │ R1 │
  P1 ─────────────→ │ ●  │
                    └────┘
                      │ 分配
                    ┌────┐
                    │ P2 │
                    └────┘

示例:有死锁
                    ┌────┐
          请求      │ R1 │ 分配
  P1 ─────────────→ │ ●  │──────→ P2
   ↑                └────┘         │
   │                               │ 请求
   │ 分配     ┌────┐               │
   └─────────│ R2 │←──────────────┘
             │ ●  │
             └────┘
  P1 持有 R2,请求 R1
  P2 持有 R1,请求 R2  → 循环等待 → 死锁!

2.2 检测环路

  • 如果资源分配图中无环,则一定无死锁
  • 如果资源分配图中有环
  • 每种资源只有一个实例一定死锁
  • 每种资源有多个实例可能死锁(需进一步分析)

三、死锁预防

通过在系统设计阶段破坏四个必要条件之一来预防死锁:

3.1 破坏互斥条件

方法:将资源改为可共享访问。

可行性 说明
部分可行 只读文件、SPOOLing 技术(如打印机假脱机)可以共享
大部分不可行 很多资源本质上不可共享(如写操作、锁)

3.2 破坏持有并等待条件

方法:要求进程在开始执行前一次性申请所有需要的资源。

Text Only
进程开始前:
  申请(R1, R2, R3) → 全部获得才运行,否则等待

优点:简单,不会死锁
缺点:① 资源利用率低(早早占有但暂不使用)
      ② 可能导致饥饿(需要多种资源的进程长时间得不到运行)
      ③ 进程可能事先不知道需要哪些资源

3.3 破坏不可剥夺条件

方法: - 若进程请求新资源失败,则释放已持有的所有资源,重新申请 - 或者允许高优先级进程强行抢占低优先级进程的资源

Text Only
进程 P 持有R1,请求R2失败:
  释放 R1 → 同时请求 R1 和 R2 → 重新尝试

优点:适用于状态可保存/恢复的资源(CPU寄存器、内存页面)
缺点:不适用于不可保存状态的资源(打印机正在打印不能中途剥夺)

3.4 破坏循环等待条件(最实用)

方法:对所有资源进行全局编号,进程必须按编号递增的顺序请求资源。

Text Only
资源编号:R1=1, R2=2, R3=3, R4=4

规则:进程必须先申请编号小的资源,再申请编号大的

合法序列:  锁(R1) → 锁(R3) → 锁(R4)     ✓
非法序列:  锁(R3) → 锁(R1)                ✗(违反顺序)

为什么有效?不可能形成环路!
  如果 P1 持有 Ri 并请求 Rj (j > i),
  而 P2 持有 Rj 并请求 Rk (k > j),
  不可能存在某个进程持有大号资源并请求小号资源,
  因此不会形成 P1→P2→...→P1 的环。

💡 锁排序(Lock Ordering) 是实际编程中最常用的死锁预防策略。

3.5 预防策略对比

策略 破坏条件 实用性 副作用
资源共享 互斥 大多数资源无法共享
一次性申请 持有并等待 资源利用率低、饥饿
可剥夺 不可剥夺 资源状态需可保存
资源编号 循环等待 需全局规划编号

四、死锁避免 —— 银行家算法

4.1 安全状态与不安全状态

  • 安全状态:系统能找到一个进程执行顺序(安全序列),使所有进程都能顺利完成
  • 不安全状态不存在安全序列(不一定死锁,但有死锁风险)
  • 死锁状态 ⊂ 不安全状态
Text Only
                ┌────────────────────┐
                │    所有状态         │
                │  ┌──────────────┐  │
                │  │ 不安全状态    │  │
                │  │ ┌──────────┐ │  │
                │  │ │ 死锁状态  │ │  │
                │  │ └──────────┘ │  │
                │  └──────────────┘  │
                │    安全状态         │
                └────────────────────┘

4.2 银行家算法(Banker's Algorithm)

Dijkstra 提出,类比银行贷款:银行家不会把所有现金全部贷出,确保始终能满足至少一个客户的最大需求。

核心数据结构

设系统有 n 个进程、m 种资源:

数据结构 维度 说明
Available[m] 向量 系统当前可用的各种资源数量
Max[n][m] 矩阵 每个进程对每种资源的最大需求量
Allocation[n][m] 矩阵 每个进程当前已分配的各种资源数量
Need[n][m] 矩阵 每个进程还需要的各种资源数量(Need = Max - Allocation)

安全性检查算法

Text Only
1. 初始化:Work = Available, Finish[i] = false (i=0..n-1)
2. 查找满足以下条件的进程 Pi:
   Finish[i] == false 且 Need[i] <= Work
3. 若找到:
   Work = Work + Allocation[i]   // Pi 完成后释放资源
   Finish[i] = true
   回到步骤 2
4. 若所有 Finish[i] == true → 系统处于安全状态
   否则 → 不安全状态

资源请求算法

Text Only
进程 Pi 请求资源 Request[i]:
1. 若 Request[i] > Need[i] → 错误(超出最大需求)
2. 若 Request[i] > Available → 等待(资源不足)
3. 假装分配(试探性分配):
   Available = Available - Request[i]
   Allocation[i] = Allocation[i] + Request[i]
   Need[i] = Need[i] - Request[i]
4. 运行安全性检查:
   安全 → 正式分配
   不安全 → 回滚假分配,进程 Pi 等待

4.3 完整 Python 实现

Python
import numpy as np

class BankersAlgorithm:
    """银行家算法完整实现"""

    def __init__(self, available, max_matrix, allocation_matrix):
        self.available = np.array(available)
        self.max_matrix = np.array(max_matrix)
        self.allocation = np.array(allocation_matrix)
        self.need = self.max_matrix - self.allocation
        self.n_processes = len(max_matrix)
        self.n_resources = len(available)

    def is_safe(self):
        """安全性检查算法,返回 (是否安全, 安全序列)"""
        work = self.available.copy()
        finish = [False] * self.n_processes
        safe_sequence = []

        while True:
            found = False
            for i in range(self.n_processes):
                if not finish[i] and all(self.need[i] <= work):
                    # 进程 i 可以完成
                    work += self.allocation[i]
                    finish[i] = True
                    safe_sequence.append(f"P{i}")
                    found = True
                    break  # 从头重新扫描

            if not found:
                break

        is_safe = all(finish)
        return is_safe, safe_sequence

    def request_resources(self, process_id, request):
        """进程 process_id 请求资源"""
        request = np.array(request)
        pid = process_id

        # 步骤1:检查请求是否超过需求
        if any(request > self.need[pid]):
            return False, "错误:请求超过最大需求"

        # 步骤2:检查资源是否足够
        if any(request > self.available):
            return False, "资源不足,进程需等待"

        # 步骤3:试探性分配
        self.available -= request
        self.allocation[pid] += request
        self.need[pid] -= request

        # 步骤4:安全性检查
        safe, sequence = self.is_safe()

        if safe:
            return True, f"分配成功!安全序列: {' → '.join(sequence)}"
        else:
            # 回滚
            self.available += request
            self.allocation[pid] -= request
            self.need[pid] += request
            return False, "分配会导致不安全状态,已回滚"

    def display_state(self):
        """显示当前系统状态"""
        print("=" * 60)
        print(f"Available (可用资源): {self.available}")
        print()
        headers = [f"R{j}" for j in range(self.n_resources)]
        print(f"{'进程':<6} {'Allocation':<20} {'Max':<20} {'Need':<20}")
        print("-" * 60)
        for i in range(self.n_processes):
            alloc_str = str(self.allocation[i])
            max_str = str(self.max_matrix[i])
            need_str = str(self.need[i])
            print(f"P{i:<5} {alloc_str:<20} {max_str:<20} {need_str:<20}")
        print("=" * 60)

# ==================== 测试示例 ====================
if __name__ == "__main__":
    # 系统有 3 种资源 (A, B, C)
    available = [3, 3, 2]

    # 5 个进程的最大需求
    max_matrix = [
        [7, 5, 3],  # P0
        [3, 2, 2],  # P1
        [9, 0, 2],  # P2
        [2, 2, 2],  # P3
        [4, 3, 3],  # P4
    ]

    # 当前已分配资源
    allocation_matrix = [
        [0, 1, 0],  # P0
        [2, 0, 0],  # P1
        [3, 0, 2],  # P2
        [2, 1, 1],  # P3
        [0, 0, 2],  # P4
    ]

    banker = BankersAlgorithm(available, max_matrix, allocation_matrix)

    print("【初始状态】")
    banker.display_state()

    # 安全性检查
    safe, sequence = banker.is_safe()
    print(f"\n系统是否安全: {safe}")
    print(f"安全序列: {' → '.join(sequence)}")

    # P1 请求 (1, 0, 2)
    print("\n【P1 请求资源 (1, 0, 2)】")
    success, msg = banker.request_resources(1, [1, 0, 2])
    print(msg)

    if success:
        banker.display_state()

    # P4 请求 (3, 3, 0)
    print("\n【P4 请求资源 (3, 3, 0)】")
    success, msg = banker.request_resources(4, [3, 3, 0])
    print(msg)

运行输出

Text Only
【初始状态】
Available (可用资源): [3 3 2]
进程     Allocation           Max                  Need
------------------------------------------------------------
P0     [0 1 0]              [7 5 3]              [7 4 3]
P1     [2 0 0]              [3 2 2]              [1 2 2]
P2     [3 0 2]              [9 0 2]              [6 0 0]
P3     [2 1 1]              [2 2 2]              [0 1 1]
P4     [0 0 2]              [4 3 3]              [4 3 1]

系统是否安全: True
安全序列: P1 → P3 → P4 → P0 → P2

【P1 请求资源 (1, 0, 2)】
分配成功!安全序列: P1 → P3 → P4 → P0 → P2

【P4 请求资源 (3, 3, 0)】
资源不足,进程需等待

五、死锁检测

5.1 等待图(Wait-For Graph)

当每种资源只有一个实例时,可将资源分配图简化为等待图

Text Only
资源分配图:                等待图(去掉资源节点):
P1 → R1 → P2              P1 → P2
P2 → R2 → P3              P2 → P3
P3 → R3 → P1              P3 → P1  → 有环 → 死锁!

5.2 检测算法(多实例资源)

类似银行家算法的安全性检查,但不检查 Need 矩阵,而是检查当前的请求矩阵 Request:

Python
def detect_deadlock(available, allocation, request):
    """
    死锁检测算法
    available: 当前可用资源向量
    allocation: 当前分配矩阵
    request: 当前请求矩阵(各进程等待的资源)
    返回: (是否有死锁, 死锁进程列表)
    """
    n = len(allocation)
    work = available.copy()
    finish = [False] * n

    # 初始:Allocation[i] 全为 0 的进程标记完成(不占资源)
    for i in range(n):
        if all(a == 0 for a in allocation[i]):
            finish[i] = True

    changed = True
    while changed:
        changed = False
        for i in range(n):
            if not finish[i] and all(request[i][j] <= work[j]
                                     for j in range(len(work))):
                # 进程 i 的请求可被满足 → 假设其完成并释放资源
                work = [work[j] + allocation[i][j] for j in range(len(work))]
                finish[i] = True
                changed = True

    deadlocked = [f"P{i}" for i in range(n) if not finish[i]]
    return len(deadlocked) > 0, deadlocked

# 测试
available = [0, 0, 0]
allocation = [
    [0, 1, 0],
    [2, 0, 0],
    [3, 0, 3],
    [2, 1, 1],
    [0, 0, 2],
]
request = [
    [0, 0, 0],
    [2, 0, 2],
    [0, 0, 0],
    [1, 0, 0],
    [0, 0, 2],
]

has_deadlock, processes = detect_deadlock(available, allocation, request)
print(f"存在死锁: {has_deadlock}")
print(f"死锁进程: {processes}")

5.3 检测频率

  • 每次资源请求失败时检测 —— 开销大但能立即发现
  • 定期检测(如每分钟) —— 折中方案
  • CPU 利用率下降时检测 —— 利用抖动信号

六、死锁恢复

检测到死锁后,需要恢复系统运行。主要有两种方法:

6.1 进程终止

策略 说明 代价
终止所有死锁进程 简单粗暴,一定能解除 代价最大
逐个终止 每次终止一个成本最小的进程,然后重新检测 需要反复检测

选择终止哪个进程的依据: - 进程优先级 - 进程已运行时间 / 剩余时间 - 进程已使用的资源量 - 进程是否可安全重启 - 进程类型(交互式 vs 批处理)

6.2 资源剥夺

从某些进程中强制收回资源,分配给死锁进程: - 选择"牺牲者"(被剥夺资源的进程) - 回滚牺牲者到安全状态 - 注意避免饥饿——不能总是选同一个进程当牺牲者


七、活锁与饥饿

7.1 活锁(Livelock)

进程没有被阻塞,而是不断重试但总是失败——在"忙等待"中没有任何实质进展。

Text Only
经典比喻:两个人在走廊相遇
  - 两人同时向左让路 → 仍然挡住
  - 两人同时向右让路 → 仍然挡住
  - 不断重复...谁都过不去

与死锁的区别: - 死锁:进程都被阻塞,不消耗 CPU - 活锁:进程都在运行(消耗 CPU),但没有进展

解决方法:引入随机退避时间(如网络协议中的指数退避)。

7.2 饥饿(Starvation)

某个进程长时间得不到资源,尽管系统没有死锁。

原因:不公平的调度或资源分配策略(如总是优先服务短进程、高优先级进程)

解决方法老化(Aging)机制——随着等待时间增加,逐步提升进程的优先级

7.3 三者对比

特征 死锁 活锁 饥饿
进程状态 阻塞 运行(忙等) 就绪但等待
CPU 消耗 有(浪费)
涉及进程 多进程循环等待 多进程互相让步 单进程被忽视
能否自行解除 不能 可能(随机化) 可能(老化)

八、实际中的死锁案例

8.1 数据库死锁(SELECT FOR UPDATE)

SQL
-- 事务A                          -- 事务B
BEGIN;                             BEGIN;  -- 事务保证操作原子性
SELECT * FROM accounts             SELECT * FROM accounts
  WHERE id = 1 FOR UPDATE;           WHERE id = 2 FOR UPDATE;
-- 锁定了 id=1                    -- 锁定了 id=2

SELECT * FROM accounts             SELECT * FROM accounts
  WHERE id = 2 FOR UPDATE;           WHERE id = 1 FOR UPDATE;
-- 等待 id=2 的锁...              -- 等待 id=1 的锁...
-- 死锁!

解决方案: 1. 锁排序:总是按 id 从小到大的顺序加锁 2. 超时机制:设置 innodb_lock_wait_timeout 3. MySQL 自动检测:InnoDB 会检测死锁并回滚其中一个事务

SQL
-- MySQL 查看死锁信息
SHOW ENGINE INNODB STATUS;

-- 设置锁等待超时(秒)
SET innodb_lock_wait_timeout = 5;

8.2 Java 多线程死锁演示

Java
public class DeadlockDemo {
    private static final Object lockA = new Object();
    private static final Object lockB = new Object();

    public static void main(String[] args) {
        // 线程1:先锁A再锁B
        Thread t1 = new Thread(() -> {
            synchronized (lockA) {  // synchronized同步锁,保证线程安全
                System.out.println("Thread1: 持有 lockA");
                try { Thread.sleep(100); } catch (InterruptedException e) {}
                System.out.println("Thread1: 等待 lockB...");
                synchronized (lockB) {
                    System.out.println("Thread1: 获得 lockB");
                }
            }
        });

        // 线程2:先锁B再锁A → 与线程1顺序相反 → 死锁!
        Thread t2 = new Thread(() -> {
            synchronized (lockB) {
                System.out.println("Thread2: 持有 lockB");
                try { Thread.sleep(100); } catch (InterruptedException e) {}
                System.out.println("Thread2: 等待 lockA...");
                synchronized (lockA) {
                    System.out.println("Thread2: 获得 lockA");
                }
            }
        });

        t1.start();
        t2.start();
    }
}

检测方法

Bash
# 获取 Java 进程的线程栈(含死锁检测)
jstack <pid>
# 输出中会显示:
# Found one Java-level deadlock:
# =============================
# "Thread-0": waiting to lock monitor ...
# "Thread-1": waiting to lock monitor ...

8.3 Python 多线程死锁演示

Python
import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()

def thread1_func():
    lock_a.acquire()
    print("Thread1: 持有 lock_a")
    time.sleep(0.1)

    print("Thread1: 等待 lock_b...")
    lock_b.acquire()  # 阻塞!Thread2 持有 lock_b
    print("Thread1: 获得 lock_b")

    lock_b.release()
    lock_a.release()

def thread2_func():
    lock_b.acquire()
    print("Thread2: 持有 lock_b")
    time.sleep(0.1)

    print("Thread2: 等待 lock_a...")
    lock_a.acquire()  # 阻塞!Thread1 持有 lock_a
    print("Thread2: 获得 lock_a")

    lock_a.release()
    lock_b.release()

t1 = threading.Thread(target=thread1_func)
t2 = threading.Thread(target=thread2_func)
t1.start()
t2.start()
t1.join(timeout=5)
t2.join(timeout=5)

if t1.is_alive() or t2.is_alive():
    print("检测到死锁!程序无法正常结束。")

Python 中避免死锁:使用 threading.Lockacquire(timeout=...) 设置超时。

Python
# 使用 timeout 避免永久等待
if not lock_b.acquire(timeout=2):
    print("获取 lock_b 超时,释放已持有的锁")
    lock_a.release()

8.4 哲学家就餐问题

5 个哲学家围坐圆桌,每两人之间有一根筷子(共 5 根)。每个哲学家需要同时拿起左右两根筷子才能吃饭。

Text Only
        P0
    🥢      🥢
  P4          P1
   🥢        🥢
    P3      P2
        🥢

若所有人同时拿起左手边筷子 → 死锁!

解法一:限制就餐人数

最多允许 4 个哲学家同时拿筷子,保证至少一人能拿到两根。

解法二:破坏循环等待(奇偶编号法)

Python
import threading

class DiningPhilosophers:
    """哲学家就餐问题 — 奇偶编号法"""

    def __init__(self, n=5):
        self.n = n
        self.chopsticks = [threading.Lock() for _ in range(n)]

    def philosopher(self, pid):
        left = pid
        right = (pid + 1) % self.n

        # 奇数编号的哲学家先拿右后拿左(破坏循环等待)
        if pid % 2 == 0:
            first, second = left, right
        else:
            first, second = right, left

        for _ in range(3):  # 吃3次
            # 思考
            print(f"哲学家{pid} 正在思考")

            # 拿筷子
            self.chopsticks[first].acquire()
            self.chopsticks[second].acquire()

            print(f"哲学家{pid} 正在吃饭 🍜")

            # 放下筷子
            self.chopsticks[second].release()
            self.chopsticks[first].release()

        print(f"哲学家{pid} 吃饱了")

if __name__ == "__main__":
    dp = DiningPhilosophers(5)
    threads = []
    for i in range(5):
        t = threading.Thread(target=dp.philosopher, args=(i,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    print("所有哲学家都吃饱了,没有发生死锁!")

解法三:使用 Semaphore 限制同时就餐人数

Python
import threading

chopsticks = [threading.Lock() for _ in range(5)]
semaphore = threading.Semaphore(4)  # 最多4人同时尝试拿筷子

def philosopher(pid):
    left = pid
    right = (pid + 1) % 5

    for _ in range(3):
        semaphore.acquire()  # 进入餐厅(最多4人)
        chopsticks[left].acquire()
        chopsticks[right].acquire()

        print(f"哲学家{pid} 正在吃饭")

        chopsticks[right].release()
        chopsticks[left].release()
        semaphore.release()  # 离开餐厅

threads = [threading.Thread(target=philosopher, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("无死锁完成!")

九、如何避免死锁的实践建议

9.1 锁排序(Lock Ordering)

最重要的规则:为所有锁定义一个全局顺序,所有线程都按相同顺序获取锁。

Python
# ✗ 错误:两个线程以不同顺序获取锁
def transfer_bad(from_acc, to_acc, amount):
    from_acc.lock.acquire()
    to_acc.lock.acquire()
    # 如果另一个线程同时执行 transfer(to_acc, from_acc, ...) → 死锁!

# ✓ 正确:始终按 id 从小到大获取锁
def transfer_good(from_acc, to_acc, amount):
    first = min(from_acc, to_acc, key=lambda x: x.id)  # lambda匿名函数:简洁的单行函数
    second = max(from_acc, to_acc, key=lambda x: x.id)
    first.lock.acquire()
    second.lock.acquire()
    # 执行转账...
    second.lock.release()
    first.lock.release()

9.2 超时机制

Python
# Python: 使用 timeout
if lock.acquire(timeout=5):
    try:  # try/except捕获异常
        # 临界区操作
        pass
    finally:
        lock.release()
else:
    print("获取锁超时,进行重试或其他处理")
Java
// Java: 使用 tryLock
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.TimeUnit;

ReentrantLock lock = new ReentrantLock();
if (lock.tryLock(5, TimeUnit.SECONDS)) {
    try {  // try/catch捕获异常
        // 临界区操作
    } finally {
        lock.unlock();
    }
} else {
    System.out.println("获取锁超时");
}

9.3 实践清单

建议 说明
✅ 锁排序 定义全局锁顺序,所有线程遵守
✅ 使用超时 tryLock(timeout) 代替无限等待
✅ 最小化锁范围 只在必要的最小代码块内持有锁
✅ 避免嵌套锁 尽量不在持有一个锁时去获取另一个锁
✅ 使用高级并发工具 ConcurrentHashMapBlockingQueue 等无需手动加锁
✅ 监控与日志 生产环境启用死锁检测(如 JVM 死锁检测、数据库死锁日志)
❌ 避免长时间持锁 不在持有锁时做网络请求、磁盘 I/O 等耗时操作

十、面试高频题 📋

题目 1:什么是死锁?发生的四个必要条件是什么?

:死锁是两个或多个进程互相等待对方释放资源,导致所有进程永久阻塞的状态。四个必要条件:① 互斥——资源不可共享;② 持有并等待——持有资源同时等待新资源;③ 不可剥夺——已分配资源不能强制收回;④ 循环等待——进程间形成环形等待链。四个条件必须同时满足才会死锁。

题目 2:如何预防死锁?

:通过破坏四个必要条件之一:① 破坏互斥——使资源可共享(有限适用);② 破坏持有并等待——要求进程一次性申请所有资源;③ 破坏不可剥夺——允许抢占或请求失败时释放已有资源;④ 破坏循环等待——资源编号,按序申请(最实用)。

题目 3:解释银行家算法的核心思想

:银行家算法是一种死锁避免算法。在每次分配资源前,先进行"试探性分配",然后检查系统是否仍处于安全状态(能否找到一个安全序列使所有进程完成)。若安全则批准分配,否则拒绝并让进程等待。核心数据结构为 Available(可用)、Max(最大需求)、Allocation(已分配)、Need(还需要)四个矩阵。

题目 4:死锁预防、避免、检测的区别?

预防——在设计阶段静态地破坏必要条件,限制最严但开销可能大。避免——运行时动态检查每次分配是否安全(如银行家算法),灵活但需要预知最大需求。检测——允许死锁发生,定期运行检测算法,发现后恢复。三种策略由严到宽,对应不同的资源利用率和实现复杂度权衡。

题目 5:什么是活锁?与死锁有什么区别?

:活锁是进程没有被阻塞,但不断重试相同的操作总是失败,没有实质进展。区别:死锁中进程被阻塞不消耗 CPU;活锁中进程在运行但白白消耗 CPU。如两人在走廊互相让路总是同方向。解决方法:引入随机退避时间。

题目 6:请描述一个实际的死锁场景及解决方案

:数据库转账场景:事务 A 锁定账户 1 后请求锁定账户 2,同时事务 B 锁定账户 2 请求锁定账户 1 → 死锁。解决方案:按账户 ID 排序加锁——两个事务都先锁 ID 小的账户再锁 ID 大的,破坏循环等待条件。数据库还可设置锁超时或依赖数据库引擎的自动死锁检测(如 MySQL InnoDB)。

题目 7:哲学家就餐问题有哪些解法?

:① 限制最多 4 人同时拿筷子(Semaphore),保证至少一人能吃;② 奇偶编号法——偶数先拿左手边,奇数先拿右手边,破坏循环等待;③ 仲裁者法——引入服务员角色(互斥锁),一次只允许一人尝试拿筷子;④ Chandy/Misra 解法——基于资源层次编号的分布式方案。

题目 8:Java 中如何检测死锁?

:① 使用 jstack <pid> 命令,输出线程栈并自动检测死锁;② 使用 ThreadMXBeanfindDeadlockedThreads() API 在程序中编程检测;③ JConsole/VisualVM 图形化工具可直接显示死锁信息。检测到后可记录日志、发告警,生产环境中配合超时和重试机制应对。

题目 9:数据库 InnoDB 如何处理死锁?

:InnoDB 维护一个等待图(wait-for graph),在每次锁请求时检测是否形成环路。若检测到死锁,InnoDB 会选择回滚代价最小的事务(通常是修改行数最少的事务),释放其持有的锁,并向客户端返回错误码 1213。开发者应捕获此错误并重试事务

题目 10:生产环境中如何避免死锁?

① 锁排序——所有操作按固定顺序获取锁;② 减小锁粒度——用行级锁代替表级锁;③ 使用超时——tryLock(timeout) 避免永久等待;④ 最小化临界区——持锁时间越短越好;⑤ 使用无锁数据结构——如 ConcurrentHashMap⑥ 监控——开启死锁日志和告警。


十一、练习题 ✏️

练习 1:判断死锁

系统有 3 种资源 A(7)、B(2)、C(6),当前状态:

进程 Allocation Max Available
P0 (0,1,0) (7,5,3) (3,3,2)
P1 (2,0,0) (3,2,2)
P2 (3,0,2) (9,0,2)
P3 (2,1,1) (2,2,2)
P4 (0,0,2) (4,3,3)
  1. 计算 Need 矩阵
  2. 系统是否处于安全状态?给出安全序列
  3. 若 P1 请求 (1,0,2),能否批准?

练习 2:资源分配图

画出以下资源分配图并判断是否死锁: - 资源 R1(1个实例)分配给 P1 - 资源 R2(1个实例)分配给 P2 - P1 请求 R2 - P2 请求 R1

练习 3:编程实现死锁检测

用 Python 实现死锁检测算法,接收 Available、Allocation、Request 三个矩阵作为输入,输出是否存在死锁及死锁进程列表。

练习 4:修复死锁代码

以下 Python 代码存在死锁,请找出原因并修复:

Python
import threading  # 线程池/多线程:并发执行任务

lock1 = threading.Lock()
lock2 = threading.Lock()
lock3 = threading.Lock()

def worker_a():
    lock1.acquire()
    lock2.acquire()
    lock3.acquire()
    print("Worker A done")
    lock3.release()
    lock2.release()
    lock1.release()

def worker_b():
    lock2.acquire()
    lock3.acquire()
    lock1.acquire()
    print("Worker B done")
    lock1.release()
    lock3.release()
    lock2.release()

def worker_c():
    lock3.acquire()
    lock1.acquire()
    lock2.acquire()
    print("Worker C done")
    lock2.release()
    lock1.release()
    lock3.release()

练习 5:设计无死锁的转账系统

设计并实现一个银行转账系统(Python),支持多线程并发转账且不会死锁。要求: - 多个账户,每个账户有余额 - 支持并发转账 transfer(from_id, to_id, amount) - 使用锁排序策略避免死锁 - 编写测试用例验证正确性


📚 延伸阅读

  • 《操作系统概念》(恐龙书)第 8 章 —— 死锁
  • 《现代操作系统》第 6 章 —— 死锁
  • 《Java 并发编程实战》第 10 章 —— 避免活跃性危险
  • MySQL 死锁排查指南
  • Go 死锁检测机制
  • Dijkstra 原始论文:Cooperating Sequential Processes(1965)