08 - 同步与并发¶
⏰ 建议学习时间:3 小时 🎯 难度等级:⭐⭐⭐⭐⭐ 📋 前置知识:进程与线程、CPU调度、内存管理基础
📋 本章目录¶
- 临界区问题
- 互斥锁(Mutex)
- 信号量(Semaphore)
- 条件变量
- 读写锁
- 经典同步问题
- 管程(Monitor)
- 原子操作与CAS
- 内存屏障与happens-before
- 并发原语对照表
- 面试高频题
- 练习题
临界区问题¶
什么是临界区¶
临界区(Critical Section) 是指进程中访问共享资源的那段代码。当多个进程并发执行时,如果不对临界区进行保护,就会产生竞态条件(Race Condition)。
进程结构:
┌──────────────┐
│ 入口区 │ ← 申请进入临界区(加锁)
├──────────────┤
│ 临界区 │ ← 访问共享资源
├──────────────┤
│ 退出区 │ ← 释放临界区(解锁)
├──────────────┤
│ 剩余区 │ ← 其他代码
└──────────────┘
临界区的三个要求¶
| 要求 | 含义 | 说明 |
|---|---|---|
| 互斥(Mutual Exclusion) | 同一时刻只有一个进程在临界区内 | 最基本的要求 |
| 前进(Progress) | 没有进程在临界区时,请求进入的进程应能进入 | 不能无限推迟决策 |
| 有限等待(Bounded Waiting) | 进程请求进入临界区后,等待次数有上限 | 防止饥饿 |
Peterson算法(两进程互斥)¶
Peterson算法是一个经典的软件解决方案,适用于两个进程之间的互斥。
# Peterson算法 —— 两进程互斥的软件实现
# 共享变量
flag = [False, False] # flag[i]=True 表示进程i想要进入临界区
turn = 0 # 指示谁有权进入临界区
def process_0():
"""进程0"""
global turn
flag[0] = True # 表示进程0想进入临界区
turn = 1 # 谦让:让进程1先来
# 等待条件:进程1也想进入 且 轮到进程1
while flag[1] and turn == 1:
pass # 忙等待
# ===== 临界区 =====
# 访问共享资源
# ===== 退出区 =====
flag[0] = False # 进程0不再需要临界区
def process_1():
"""进程1"""
global turn
flag[1] = True # 表示进程1想进入临界区
turn = 0 # 谦让:让进程0先来
while flag[0] and turn == 0:
pass # 忙等待
# ===== 临界区 =====
# 访问共享资源
# ===== 退出区 =====
flag[1] = False
正确性分析:
- 互斥性:假设两个进程同时想进入临界区,
turn值只能是0或1,不可能两个进程同时满足while条件为False,因此只有一个能进入。 - 前进性:如果只有一个进程想进入(如
flag[1]=False),则while条件不满足,直接进入。 - 有限等待:如果进程0在等待,进程1退出临界区后
flag[1]=False,进程0必定能进入。
注意:在现代多核处理器上,由于指令重排序和缓存一致性问题,Peterson算法需要配合内存屏障才能正确工作。
互斥锁(Mutex)¶
概念与使用¶
互斥锁是最基本的同步原语,保证同一时刻只有一个线程持有锁。
import threading
counter = 0
lock = threading.Lock()
def increment():
global counter
for _ in range(100000):
lock.acquire() # 加锁
counter += 1 # 临界区
lock.release() # 解锁
# 推荐使用 with 语句(自动加锁/解锁)
def increment_safe():
global counter
for _ in range(100000):
with lock:
counter += 1
# 测试
t1 = threading.Thread(target=increment_safe)
t2 = threading.Thread(target=increment_safe)
t1.start(); t2.start()
t1.join(); t2.join()
print(f"Counter: {counter}") # 200000 # Counter计数器:统计元素出现次数
自旋锁 vs 睡眠锁¶
| 特性 | 自旋锁(Spinlock) | 睡眠锁(Mutex/阻塞锁) |
|---|---|---|
| 等待方式 | 忙等待(循环检查) | 阻塞挂起(进入等待队列) |
| CPU消耗 | 持续占用CPU | 不占用CPU |
| 上下文切换 | 无 | 有(挂起和唤醒) |
| 适用场景 | 临界区很短、多核系统 | 临界区较长、单核系统 |
| 持有期间可否睡眠 | 不可以 | 可以 |
| Linux内核对应 | spin_lock | mutex_lock |
// Linux内核中的自旋锁
spinlock_t lock;
spin_lock_init(&lock);
spin_lock(&lock);
// 临界区(必须非常短!)
spin_unlock(&lock);
// Linux内核中的互斥锁
struct mutex lock; // struct结构体:自定义复合数据类型
mutex_init(&lock);
mutex_lock(&lock);
// 临界区(可以较长,可以睡眠)
mutex_unlock(&lock);
选择原则:临界区执行时间 < 两次上下文切换的时间 → 用自旋锁;否则用睡眠锁。
信号量(Semaphore)¶
信号量的概念¶
信号量(Semaphore)是Dijkstra于1965年提出的同步机制,是一个整型变量,通过两个原子操作来访问。
计数信号量与二值信号量¶
| 类型 | 取值范围 | 用途 |
|---|---|---|
| 计数信号量 | 0, 1, 2, 3, ... | 控制对有限资源的访问(如连接池) |
| 二值信号量 | 0, 1 | 等价于互斥锁,实现互斥 |
P操作(wait)和V操作(signal)¶
P操作(wait / down / proberen):
S = S - 1
if S < 0:
将进程加入等待队列,阻塞
V操作(signal / up / verhogen):
S = S + 1
if S <= 0:
从等待队列唤醒一个进程
P和V操作都是原子操作,不可被中断。
用信号量实现互斥¶
import threading
# 二值信号量实现互斥
mutex = threading.Semaphore(1) # 初始值为1
shared_data = 0
def worker(name):
global shared_data
for _ in range(10000):
mutex.acquire() # P操作 (wait)
shared_data += 1 # 临界区
mutex.release() # V操作 (signal)
t1 = threading.Thread(target=worker, args=("T1",))
t2 = threading.Thread(target=worker, args=("T2",))
t1.start(); t2.start()
t1.join(); t2.join()
print(f"shared_data = {shared_data}") # 20000
用信号量实现同步¶
import threading
import time
# 信号量实现同步:保证A在B之前执行
sync_sem = threading.Semaphore(0) # 初始值为0
def task_a():
print("Task A: 完成数据准备")
time.sleep(1) # 模拟耗时操作
print("Task A: 数据准备完成,通知Task B")
sync_sem.release() # V操作,信号量+1
def task_b():
print("Task B: 等待Task A完成...")
sync_sem.acquire() # P操作,如果信号量=0则阻塞
print("Task B: 收到通知,开始处理数据")
t_b = threading.Thread(target=task_b)
t_a = threading.Thread(target=task_a)
t_b.start(); t_a.start()
t_b.join(); t_a.join()
# 输出:
# Task B: 等待Task A完成...
# Task A: 完成数据准备
# Task A: 数据准备完成,通知Task B
# Task B: 收到通知,开始处理数据
条件变量¶
wait / signal / broadcast 语义¶
条件变量允许线程在某个条件不满足时挂起等待,条件满足时被唤醒。
| 操作 | 语义 | 说明 |
|---|---|---|
| wait() | 释放关联的锁,阻塞等待 | 被唤醒后自动重新获取锁 |
| notify() / signal() | 唤醒一个等待的线程 | 不释放自己持有的锁 |
| notify_all() / broadcast() | 唤醒所有等待的线程 | 所有线程竞争获取锁 |
与互斥锁配合使用¶
import threading
import time
condition = threading.Condition() # 内部包含一个Lock
items = []
def producer():
for i in range(5):
time.sleep(0.5)
with condition:
items.append(f"item-{i}")
print(f"[生产者] 生产了 item-{i},通知消费者")
condition.notify() # 唤醒一个等待的消费者
def consumer():
while True:
with condition:
while not items: # 使用while而不是if,防止虚假唤醒
print("[消费者] 没有数据,等待中...")
condition.wait() # 释放锁并等待
item = items.pop(0)
print(f"[消费者] 消费了 {item}")
if item == "item-4":
break
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t2.start(); t1.start()
t1.join(); t2.join()
重要:使用
while循环检查条件而非if,因为可能存在虚假唤醒(Spurious Wakeup)。
读写锁¶
读者优先 vs 写者优先¶
| 策略 | 描述 | 特点 |
|---|---|---|
| 读者优先 | 只要有读者在读,新的读者可以直接进入 | 写者可能饥饿 |
| 写者优先 | 有写者等待时,阻止新的读者进入 | 读者可能饥饿 |
| 公平策略 | 按请求到达的顺序处理 | 无饥饿但性能一般 |
实际应用场景¶
- 数据库系统:查询(读)远多于更新(写)
- 缓存系统:缓存命中(读)远多于缓存更新(写)
- 配置文件:读取频繁,更新偶尔
- DNS缓存:大量查询,少量更新
import threading
import time
class ReadWriteLock:
"""简单的读写锁实现(读者优先)"""
def __init__(self):
self.readers = 0
self.mutex = threading.Lock() # 保护readers计数
self.write_lock = threading.Lock() # 写互斥锁
def acquire_read(self):
with self.mutex:
self.readers += 1
if self.readers == 1:
self.write_lock.acquire() # 第一个读者阻止写者
def release_read(self):
with self.mutex:
self.readers -= 1
if self.readers == 0:
self.write_lock.release() # 最后一个读者释放写锁
def acquire_write(self):
self.write_lock.acquire()
def release_write(self):
self.write_lock.release()
# 示例使用
rw_lock = ReadWriteLock()
shared_data = {"value": 0}
def reader(name):
for _ in range(3):
rw_lock.acquire_read()
print(f"[读者{name}] 读取: {shared_data['value']}")
time.sleep(0.1)
rw_lock.release_read()
def writer(name):
for i in range(3):
rw_lock.acquire_write()
shared_data['value'] += 1
print(f"[写者{name}] 写入: {shared_data['value']}")
time.sleep(0.2)
rw_lock.release_write()
经典同步问题¶
生产者-消费者问题¶
问题描述:生产者产生数据放入有界缓冲区,消费者从缓冲区取数据消费。缓冲区满时生产者等待,缓冲区空时消费者等待。
import threading
import queue
import time
import random
buffer = queue.Queue(maxsize=5)
def producer(name):
for i in range(10):
item = f"{name}-item-{i}"
buffer.put(item)
print(f"[生产者{name}] 生产: {item}, 缓冲区大小: {buffer.qsize()}")
time.sleep(random.uniform(0.1, 0.5))
def consumer(name):
while True:
try: # try/except捕获异常
item = buffer.get(timeout=3)
print(f"[消费者{name}] 消费: {item}, 缓冲区大小: {buffer.qsize()}")
buffer.task_done()
time.sleep(random.uniform(0.2, 0.6))
except queue.Empty:
print(f"[消费者{name}] 超时退出")
break
# 启动
threads = []
for i in range(2):
t = threading.Thread(target=producer, args=(f"P{i}",))
threads.append(t)
for i in range(3):
t = threading.Thread(target=consumer, args=(f"C{i}",))
threads.append(t)
for t in threads:
t.start()
for t in threads:
t.join()
信号量版本(经典实现):
import threading
import time
import random
BUFFER_SIZE = 5
buffer = [None] * BUFFER_SIZE
in_idx = 0 # 生产者放入位置
out_idx = 0 # 消费者取出位置
mutex = threading.Semaphore(1) # 互斥访问缓冲区
empty = threading.Semaphore(BUFFER_SIZE) # 空槽位数量
full = threading.Semaphore(0) # 满槽位数量
def producer(name):
global in_idx
for i in range(8):
item = f"{name}-{i}"
empty.acquire() # P(empty):等待空槽位
mutex.acquire() # P(mutex):互斥访问
buffer[in_idx] = item
print(f"[生产者{name}] 放入 {item} 到位置 {in_idx}")
in_idx = (in_idx + 1) % BUFFER_SIZE
mutex.release() # V(mutex)
full.release() # V(full):增加一个满槽位
time.sleep(random.uniform(0.1, 0.3))
def consumer(name):
global out_idx
for _ in range(8):
full.acquire() # P(full):等待满槽位
mutex.acquire() # P(mutex):互斥访问
item = buffer[out_idx]
print(f"[消费者{name}] 取出 {item} 从位置 {out_idx}")
out_idx = (out_idx + 1) % BUFFER_SIZE
mutex.release() # V(mutex)
empty.release() # V(empty):增加一个空槽位
time.sleep(random.uniform(0.2, 0.4))
# 注意: P操作顺序不能颠倒!先P(empty/full)再P(mutex),否则可能死锁
读者-写者问题¶
问题描述:多个读者可以同时读,但写者必须独占访问。
import threading
import time
import random
# === 读者优先版本 ===
class ReaderWriterProblem:
def __init__(self):
self.data = "初始数据"
self.reader_count = 0
self.mutex = threading.Lock() # 保护reader_count
self.write_lock = threading.Lock() # 写互斥(读写互斥、写写互斥)
def reader(self, name):
for _ in range(3):
# 读者进入
with self.mutex:
self.reader_count += 1
if self.reader_count == 1:
self.write_lock.acquire() # 第一个读者锁定写者
# 读取数据(多个读者可以并行)
print(f" [读者{name}] 正在读取: '{self.data}' (当前读者数: {self.reader_count})")
time.sleep(random.uniform(0.1, 0.3))
# 读者退出
with self.mutex:
self.reader_count -= 1
if self.reader_count == 0:
self.write_lock.release() # 最后一个读者释放写锁
time.sleep(random.uniform(0.1, 0.2))
def writer(self, name):
for i in range(2):
self.write_lock.acquire() # 写者独占
self.data = f"{name}的第{i+1}次写入"
print(f"[写者{name}] >>> 写入: '{self.data}'")
time.sleep(random.uniform(0.2, 0.4))
self.write_lock.release() # 写者释放
time.sleep(random.uniform(0.1, 0.3))
# 测试
rw = ReaderWriterProblem()
threads = []
for i in range(3):
threads.append(threading.Thread(target=rw.reader, args=(f"R{i}",)))
for i in range(2):
threads.append(threading.Thread(target=rw.writer, args=(f"W{i}",)))
for t in threads:
t.start()
for t in threads:
t.join()
print(f"最终数据: '{rw.data}'")
哲学家就餐问题¶
问题描述:5个哲学家围坐圆桌,每两人之间有一根筷子,需要拿到左右两根筷子才能吃饭。
解法一:资源层次法(Resource Hierarchy)
import threading
import time
import random
NUM_PHILOSOPHERS = 5
chopsticks = [threading.Lock() for _ in range(NUM_PHILOSOPHERS)]
def philosopher_hierarchy(id):
"""资源层次法:总是先拿编号小的筷子"""
left = id
right = (id + 1) % NUM_PHILOSOPHERS
# 总是先获取编号较小的筷子
first = min(left, right)
second = max(left, right)
for meal in range(3):
# 思考
print(f"哲学家{id}: 思考中...")
time.sleep(random.uniform(0.1, 0.3))
# 拿筷子(按编号顺序)
chopsticks[first].acquire()
chopsticks[second].acquire()
# 吃饭
print(f"哲学家{id}: 正在吃第{meal+1}顿饭 🍜")
time.sleep(random.uniform(0.1, 0.2))
# 放筷子
chopsticks[second].release()
chopsticks[first].release()
print(f"哲学家{id}: 已吃饱")
# 启动
threads = [threading.Thread(target=philosopher_hierarchy, args=(i,))
for i in range(NUM_PHILOSOPHERS)]
for t in threads:
t.start()
for t in threads:
t.join()
print("所有哲学家都吃饱了!")
解法二:服务生解法(Waiter/限制同时进餐人数)
import threading
import time
import random
NUM_PHILOSOPHERS = 5
chopsticks = [threading.Lock() for _ in range(NUM_PHILOSOPHERS)]
# 最多允许4个哲学家同时尝试拿筷子(防止5个都拿了左筷子导致死锁)
waiter = threading.Semaphore(NUM_PHILOSOPHERS - 1)
def philosopher_waiter(id):
"""服务生解法:限制同时进餐人数"""
left = id
right = (id + 1) % NUM_PHILOSOPHERS
for meal in range(3):
print(f"哲学家{id}: 思考中...")
time.sleep(random.uniform(0.1, 0.3))
waiter.acquire() # 向服务生请求许可
chopsticks[left].acquire()
chopsticks[right].acquire()
print(f"哲学家{id}: 正在吃第{meal+1}顿饭 🍜")
time.sleep(random.uniform(0.1, 0.2))
chopsticks[right].release()
chopsticks[left].release()
waiter.release() # 归还许可
print(f"哲学家{id}: 已吃饱")
threads = [threading.Thread(target=philosopher_waiter, args=(i,))
for i in range(NUM_PHILOSOPHERS)]
for t in threads:
t.start()
for t in threads:
t.join()
print("所有哲学家都吃饱了!")
两种解法对比:
| 方面 | 资源层次法 | 服务生解法 |
|---|---|---|
| 原理 | 按编号顺序获取资源 | 限制并发获取资源的数量 |
| 最大并发 | 2(5个哲学家) | 2(最多4个尝试) |
| 复杂度 | 需要确定全局资源顺序 | 需要额外的信号量 |
| 适用性 | 资源有自然顺序时 | 通用场景 |
管程(Monitor)¶
概念与结构¶
管程是一种高级同步原语,将共享数据和对数据的操作封装在一起,保证同一时刻最多有一个进程在管程内执行。
管程结构:
┌─────────────────────────────────┐
│ Monitor │
│ ┌─────────────────────────┐ │
│ │ 共享数据 │ │
│ │ (condition variables) │ │
│ └─────────────────────────┘ │
│ │
│ ┌─────────────────────────┐ │
│ │ procedure 1() │ │ ← 同一时刻只有一个进程
│ │ procedure 2() │ │ 在管程内执行
│ │ procedure 3() │ │
│ └─────────────────────────┘ │
│ │
│ ┌─────────────────────────┐ │
│ │ 初始化代码 │ │
│ └─────────────────────────┘ │
│ │
│ 入口队列 → [P1][P2][P3]... │
└─────────────────────────────────┘
管程的关键特性: 1. 互斥:管程内一次只能有一个活跃进程 2. 条件变量:进程可在条件不满足时等待,条件满足时被唤醒 3. 封装:共享数据只能通过管程内的过程访问
Java synchronized 与 Monitor¶
Java的synchronized关键字基于管程模型实现:
public class MonitorExample {
private int count = 0;
// synchronized方法 —— 整个方法是管程的一个过程
public synchronized void increment() { // synchronized同步锁,保证线程安全
count++; // 互斥访问
}
// 等价于
public void incrementExplicit() {
synchronized (this) { // 获取管程锁
count++;
}
}
// 条件变量的使用(wait/notify)
private boolean ready = false;
public synchronized void waitForReady() throws InterruptedException {
while (!ready) {
wait(); // 释放管程锁,加入等待队列
}
// 条件满足,继续执行
}
public synchronized void setReady() {
ready = true;
notifyAll(); // 唤醒所有等待的线程
}
}
Java管程模型分析: - 每个Java对象都可以作为管程(有一把内置锁/Monitor Lock) - synchronized获取对象的内置锁 - wait()释放锁并进入等待集合 - notify()/notifyAll()唤醒等待集合中的线程 - ReentrantLock + Condition是更灵活的管程实现
原子操作与CAS¶
Compare-And-Swap 原理¶
CAS(Compare-And-Swap)是一条CPU指令级的原子操作,是无锁并发编程的基础。
CAS(内存地址V, 期望值A, 新值B):
if V的当前值 == A:
将V设为B
return true // 交换成功
else:
return false // 交换失败(说明被其他线程修改了)
# CAS的伪代码模拟
import threading # 线程池/多线程:并发执行任务
class AtomicInteger:
"""使用CAS思想实现的原子整数(Python模拟)"""
def __init__(self, value=0):
self._value = value
self._lock = threading.Lock() # Python层面模拟原子性
def compare_and_swap(self, expected, new_value):
"""CAS操作"""
with self._lock: # 实际CPU中这是一条原子指令
if self._value == expected:
self._value = new_value
return True
return False
def get(self):
return self._value
def increment_and_get(self):
"""无锁自增"""
while True:
current = self._value
if self.compare_and_swap(current, current + 1):
return current + 1
# CAS失败,重试(自旋)
# 测试
atom = AtomicInteger(0)
def worker():
for _ in range(10000):
atom.increment_and_get()
threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"最终值: {atom.get()}") # 40000
ABA问题¶
问题描述:线程T1读取值为A,T2将A改为B再改回A,T1执行CAS时发现值仍为A,认为没有被修改过,CAS成功。但实际上值已经被"动过"了。
解决方案:
| 方案 | 原理 | Java实现 |
|---|---|---|
| 版本号/时间戳 | 每次修改都增加版本号,CAS时同时比较值和版本号 | AtomicStampedReference |
| 标记位 | 用布尔标记是否被修改过 | AtomicMarkableReference |
Java AtomicInteger 示例¶
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;
public class CASExample {
// 基本原子操作
private static AtomicInteger counter = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
// 原子自增
counter.incrementAndGet(); // ++counter (原子操作)
counter.getAndIncrement(); // counter++ (原子操作)
// CAS操作
boolean success = counter.compareAndSet(2, 10); // 期望2,设为10
System.out.println("CAS结果: " + success + ", 当前值: " + counter.get());
// 解决ABA问题
AtomicStampedReference<Integer> stampedRef =
new AtomicStampedReference<>(100, 0); // 初始值100,版本号0
int stamp = stampedRef.getStamp();
// 同时比较值和版本号
stampedRef.compareAndSet(100, 200, stamp, stamp + 1);
}
}
内存屏障与happens-before¶
指令重排序¶
处理器和编译器为了优化性能,可能会对指令进行重排序:
// 原始代码
int a = 1; // 语句1
int b = 2; // 语句2
int c = a + b; // 语句3
// 可能被重排序为(不影响单线程结果):
int b = 2; // 语句2
int a = 1; // 语句1
int c = a + b; // 语句3
// 但在多线程中可能导致问题!
重排序类型: 1. 编译器重排序:编译器优化调整指令顺序 2. CPU指令重排序:处理器乱序执行 3. 内存系统重排序:缓存/写缓冲区导致的可见性问题
volatile 语义(Java)¶
public class VolatileExample {
// volatile保证:可见性 + 禁止重排序
private volatile boolean flag = false;
private int data = 0;
// 线程A
public void writer() {
data = 42; // 语句1
flag = true; // 语句2(volatile写)
// volatile写之前的所有写操作,对后续volatile读可见
// 语句1和2不会被重排序
}
// 线程B
public void reader() {
if (flag) { // 语句3(volatile读)
int result = data; // 语句4
// 保证能读到data=42
// 语句3和4不会被重排序
}
}
}
volatile的作用: - 可见性:对volatile变量的写,对后续的读立即可见(刷新到主内存/从主内存读取) - 有序性:volatile写之前的操作不会被重排到volatile写之后;volatile读之后的操作不会被重排到volatile读之前 - 不保证原子性:volatile int i; i++ 不是原子的
happens-before 规则¶
Java内存模型(JMM)定义了happens-before关系来保证多线程程序的可见性和有序性:
| 规则 | 描述 |
|---|---|
| 程序顺序规则 | 同一线程中,前面的操作 happens-before 后面的操作 |
| 锁规则 | unlock操作 happens-before 后续的lock操作(同一把锁) |
| volatile规则 | volatile写 happens-before 后续的volatile读(同一变量) |
| 传递性 | 如果A happens-before B,B happens-before C,则A happens-before C |
| 线程启动规则 | Thread.start() happens-before 该线程中的所有操作 |
| 线程终止规则 | 线程中的所有操作 happens-before Thread.join()返回 |
| 中断规则 | Thread.interrupt() happens-before 被中断线程检测到中断 |
| 终结器规则 | 构造函数完成 happens-before finalize()方法开始 |
并发原语对照表¶
| 概念 | Python | Java | C/POSIX | Go |
|---|---|---|---|---|
| 互斥锁 | threading.Lock | synchronized / ReentrantLock | pthread_mutex_t | sync.Mutex |
| 信号量 | threading.Semaphore | java.util.concurrent.Semaphore | sem_t | chan模拟 |
| 条件变量 | threading.Condition | Condition (Lock接口) | pthread_cond_t | sync.Cond |
| 读写锁 | — (需自行实现) | ReadWriteLock / ReentrantReadWriteLock | pthread_rwlock_t | sync.RWMutex |
| 原子操作 | — (GIL保护部分操作) | AtomicInteger / AtomicLong 等 | atomic_int (C11) / __sync_* | sync/atomic |
| 屏障 | threading.Barrier | CyclicBarrier / CountDownLatch | pthread_barrier_t | sync.WaitGroup |
| 线程安全队列 | queue.Queue | BlockingQueue | 需自行实现 | chan |
面试高频题¶
1. 什么是竞态条件?如何避免?¶
答:竞态条件(Race Condition)指多个进程/线程并发访问和操作共享数据时,最终结果依赖于执行的时序。例如两个线程同时执行count++,可能导致结果不正确。避免方法包括:使用互斥锁保护临界区、使用原子操作(CAS)、使用线程安全的数据结构、避免共享状态(如使用消息传递)。
2. 信号量和互斥锁有什么区别?¶
答:互斥锁(Mutex)是二值的,只能由持有者释放,用于保护临界区的互斥访问。信号量(Semaphore)是计数值,可以由任何线程释放(不一定是获取者),既可以实现互斥(二值信号量)也可以实现同步(计数信号量)。互斥锁有所有权概念(谁加锁谁解锁),信号量没有。互斥锁通常还有优先级继承等高级特性来避免优先级反转。
3. 什么是死锁?如何预防和避免?¶
答:死锁是多个进程相互等待对方持有的资源,导致都无法继续执行。四个必要条件:互斥、持有并等待、不可抢占、循环等待。预防方法:破坏任一条件即可——一次申请所有资源(破坏持有并等待)、允许抢占、资源排序(破坏循环等待)。避免方法:银行家算法动态检查安全状态。检测和恢复:进程等待图检测环路,终止死锁进程或资源抢占。
4. 生产者-消费者问题为什么P操作的顺序不能颠倒?¶
答:经典信号量解法中必须先P(empty/full)再P(mutex)。如果先P(mutex)再P(empty):当缓冲区满时,生产者先获取了mutex锁,然后P(empty)阻塞等待消费者释放空间;但消费者需要P(mutex)才能消费,而mutex被生产者持有——死锁!正确顺序是先检查资源条件(empty/full),再获取互斥锁(mutex),这样即使阻塞在资源条件上也不会持有mutex锁。
5. 什么是优先级反转?如何解决?¶
答:优先级反转指高优先级任务因等待低优先级任务持有的锁而被阻塞,而中优先级任务抢占了低优先级任务的执行,导致高优先级任务间接被中优先级任务延迟。经典案例是"火星探路者号"任务故障。解决方案:优先级继承——低优先级任务在持有锁期间临时继承等待该锁的最高优先级任务的优先级;优先级天花板——将锁的优先级设为所有可能访问该锁的任务中的最高优先级。
6. CAS的优缺点是什么?¶
答:优点:无锁,避免了线程阻塞和上下文切换的开销;不会造成死锁。缺点:①ABA问题——值被改了又改回来,CAS检测不到(用版本号解决);②自旋开销——竞争激烈时大量线程反复重试,浪费CPU;③只能保证单个变量的原子性——无法像锁那样保护一段代码的原子性。适用场景:竞争不激烈、操作简单(如计数器递增)的场景。
7. 什么是虚假唤醒?为什么wait要用while循环?¶
答:虚假唤醒是指在没有其他线程signal/notify的情况下,wait返回了。这是POSIX和Java规范允许的行为(出于实现效率考虑)。因此等待条件的代码模式必须使用while循环而非if判断:while (!condition) { wait(); }。这样即使发生虚假唤醒,也会重新检查条件,只有条件真正满足才继续执行。另外,notify可能唤醒了一个线程,但在该线程获取锁之前,条件又被其他线程改变了。
8. volatile能保证线程安全吗?¶
答:不能完全保证。volatile只保证可见性(对变量的修改立即对其他线程可见)和有序性(禁止指令重排序),但不保证原子性。例如volatile int i; i++不是线程安全的,因为i++包含读取、加1、写回三步操作,volatile不能保证这三步的原子性。对于复合操作(check-then-act、read-modify-write),仍需使用锁或原子类。volatile适用于一写多读的场景(如标志位)。
9. synchronized和ReentrantLock的区别?¶
答:①synchronized是JVM内置关键字,ReentrantLock是API层面的锁;②synchronized自动释放锁(退出同步块/异常时),ReentrantLock需手动在finally中释放;③ReentrantLock提供更多功能:可中断加锁(lockInterruptibly)、超时加锁(tryLock)、公平锁选项、多个Condition条件变量;④性能上JDK6以后synchronized经过偏向锁、轻量级锁优化,性能与ReentrantLock相当;⑤synchronized不可降级为读锁,而ReentrantReadWriteLock支持读写分离。
10. 哲学家就餐问题有哪些解法?¶
答:①资源层次法:为筷子编号,每个哲学家总是先拿编号小的筷子,破坏循环等待条件;②服务生解法:引入一个服务生(信号量),最多允许n-1个哲学家同时拿筷子;③奇偶法:奇数号哲学家先拿左筷子,偶数号先拿右筷子,破坏统一的获取顺序;④chandy/Misra解法:通过令牌传递实现分布式解法;⑤超时重试:拿到一只筷子后尝试拿另一只,超时则放下重试(tryLock)。实际中通常使用资源层次法或限制并发数的方法。
练习题¶
1. 使用信号量实现一个"H₂O水分子生成"问题:有两类线程,一类产生氢原子,一类产生氧原子。需要2个氢原子和1个氧原子组合才能形成水分子。请设计同步机制确保正确的组合。
2. 分析以下代码是否存在竞态条件,如果有,如何修复?
balance = 1000
def withdraw(amount):
global balance
if balance >= amount:
balance -= amount
return True
return False
3. 实现一个有界阻塞队列(Bounded Blocking Queue),支持put(item)和get()操作,使用条件变量而非信号量实现。
4. 解释为什么Peterson算法在现代多核处理器上需要内存屏障才能正确工作?如何通过添加内存屏障修复?
5. 考虑一个银行转账场景:transfer(from_account, to_account, amount)。如果使用锁保护每个账户,如何避免两个线程同时执行transfer(A, B, 100)和transfer(B, A, 200)时产生的死锁?
延伸阅读¶
- 《操作系统概念》第9版 —— 第6、7章(同步与死锁)
- 《Java并发编程实战》—— Brian Goetz
- 《深入理解Java虚拟机》—— 第12章 Java内存模型与线程
- 《The Art of Multiprocessor Programming》—— Maurice Herlihy
- Java Memory Model (JSR-133)
- Linux Kernel Synchronization