Java并发编程进阶¶
🎯 学习目标¶
完成本章学习后,你将能够: - 深入理解Java内存模型(JMM)及happens-before规则 - 掌握volatile和synchronized的底层实现原理 - 理解AQS框架设计思想与CLH队列 - 熟练使用并发工具类解决实际问题 - 深入理解线程池的参数配置与最佳实践 - 掌握CompletableFuture异步编程 - 理解主流并发集合的底层原理 - 对比虚拟线程与传统线程的差异
1. Java内存模型(JMM)¶
1.1 JMM核心概念¶
Java内存模型(Java Memory Model)定义了多线程程序中变量的访问规则,解决的核心问题是可见性、有序性和原子性。
Text Only
┌─────────────────────────────────────────────┐
│ 主内存 (Main Memory) │
│ 存储所有共享变量(堆中的实例变量、静态变量) │
└──────────┬─────────────────┬────────────────┘
│ read/write │ read/write
┌──────▼──────┐ ┌──────▼──────┐
│ 工作内存 │ │ 工作内存 │
│ (线程A私有) │ │ (线程B私有) │
│ CPU缓存/寄存器│ │ CPU缓存/寄存器│
└─────────────┘ └─────────────┘
- 可见性:线程A修改了共享变量,线程B能否立即看到?
- 有序性:编译器和CPU可能对指令重排序,多线程下可能观察到意外顺序
- 原子性:操作是否不可分割(如
i++并非原子操作:read→add→write)
1.2 happens-before规则¶
JMM通过happens-before关系保证内存可见性。如果操作A happens-before操作B,则A的结果对B可见。
Java
// 8条 happens-before 规则:
// 1. 程序顺序规则:同一线程中,前面的操作happens-before后面的操作
// 2. 监视器锁规则:unlock happens-before 后续同一锁的lock
// 3. volatile规则:volatile写 happens-before 后续对同一变量的volatile读
// 4. 线程启动规则:Thread.start() happens-before 新线程中的任何操作
// 5. 线程终止规则:线程中的所有操作 happens-before Thread.join()返回
// 6. 中断规则:interrupt() happens-before 被检测到中断
// 7. 终结器规则:构造器 happens-before finalize()
// 8. 传递性:A hb B, B hb C → A hb C
// 示例:volatile保证可见性
class VolatileExample {
private int a = 0;
private volatile boolean flag = false; // volatile变量
// 线程A
public void writer() {
a = 42; // 1. 普通写
flag = true; // 2. volatile写 — 将a=42也刷新到主内存
}
// 线程B
public void reader() {
if (flag) { // 3. volatile读 — 从主内存刷新(包括a的新值)
System.out.println(a); // 4. 保证能看到42
// 根据volatile规则:2 hb 3
// 根据程序顺序规则:1 hb 2, 3 hb 4
// 根据传递性:1 hb 4 → a=42对线程B可见
}
}
}
2. volatile深入¶
2.1 volatile的语义¶
Java
// volatile两大语义:
// 1. 可见性:写入volatile变量立即刷新到主内存,读取时从主内存加载
// 2. 禁止重排序:通过插入内存屏障(Memory Barrier)
// 内存屏障类型:
// StoreStore屏障 → volatile写之前:确保前面的普通写已刷新
// StoreLoad屏障 → volatile写之后:最重的屏障,确保写操作对后续读可见
// LoadLoad屏障 → volatile读之后:确保后续读取是最新值
// LoadStore屏障 → volatile读之后:确保后续写不会被重排到volatile读之前
// volatile适用场景:
// ✅ 状态标志(boolean flag)
// ✅ 一次性安全发布(单例的DCL)
// ✅ 独立观察(如传感器读数)
// ❌ 复合操作(如 volatile int count; count++; → 非原子)
2.2 双重检查锁(DCL)单例¶
Java
public class Singleton {
// 必须加volatile!防止指令重排导致返回未初始化的对象
private static volatile Singleton INSTANCE;
private Singleton() {}
public static Singleton getInstance() {
if (INSTANCE == null) { // 第一次检查(无锁,快速返回)
synchronized (Singleton.class) {
if (INSTANCE == null) { // 第二次检查(加锁后确认)
INSTANCE = new Singleton();
// new对象分3步:1.分配内存 2.初始化 3.赋值引用
// 没有volatile时,2和3可能重排序 → 另一个线程拿到未初始化的对象
}
}
}
return INSTANCE;
}
}
// 更推荐的单例写法:枚举(天然线程安全 + 防反射 + 防序列化破坏)
public enum SingletonEnum {
INSTANCE;
public void doSomething() { }
}
3. synchronized深入¶
3.1 对象头结构(Mark Word)¶
Text Only
// 在HotSpot中,每个Java对象由以下结构组成:
// 对象头(Header)= Mark Word + 类指针(Klass Pointer) [+ 数组长度]
// 实例数据(Instance Data)
// 对齐填充(Padding)
// Mark Word(64位JVM,8字节)— 存储锁状态
// ┌────────────────────────────────────────────────────────┐
// │ 无锁态(001): [hashcode(31) | age(4) | biased(0) | 01] │
// │ 偏向锁(101): [threadId(54) | epoch(2)| age(4) | 1 | 01]│
// │ 轻量级锁(00): [指向栈帧中Lock Record的指针(62) | 00] │
// │ 重量级锁(10): [指向Monitor对象的指针(62) | 10] │
// │ GC标记(11): [空 | 11] │
// └────────────────────────────────────────────────────────┘
3.2 锁升级过程¶
Java
// 无锁 → 偏向锁 → 轻量级锁 → 重量级锁(只能升级,不能降级)
// 偏向锁(Biased Locking)
// - 场景:总是同一个线程获取锁(无竞争)
// - 原理:在Mark Word中记录线程ID,同线程再次进入时无需任何同步操作
// - 撤销:当其他线程尝试获取锁时,偏向锁被撤销,升级为轻量级锁
// - 注意:JDK15后默认关闭偏向锁(-XX:-UseBiasedLocking)
// 轻量级锁(Lightweight Lock)
// - 场景:交替执行(线程交替获取锁,实际竞争少)
// - 原理:CAS将Mark Word替换为指向栈帧中Lock Record的指针
// - 自旋:获取失败时不立即阻塞,而是忙等待(自适应自旋)
// - 升级:自旋超过阈值 → 升级为重量级锁
// 重量级锁(Heavyweight Lock)
// - 场景:激烈竞争
// - 原理:通过操作系统Mutex实现,涉及用户态/内核态切换
// - 包含:EntryList(等待获取锁的线程)、WaitSet(wait()的线程)
// - 开销最大
// 示例:观察锁升级(需要JOL工具)
// import org.openjdk.jol.info.ClassLayout;
// Object obj = new Object();
// System.out.println(ClassLayout.parseInstance(obj).toPrintable());
// synchronized (obj) {
// System.out.println(ClassLayout.parseInstance(obj).toPrintable());
// }
4. AQS框架详解¶
4.1 AbstractQueuedSynchronizer原理¶
Java
// AQS是java.util.concurrent中锁和同步器的基石
// ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock 都基于AQS
// 核心要素:
// 1. state(volatile int)— 同步状态
// - ReentrantLock: 0=未锁定, >0=重入次数
// - Semaphore: 剩余许可数
// - CountDownLatch: 剩余计数
// 2. CLH队列 — FIFO双向链表,管理等待线程
// 3. 独占/共享模式
// CLH队列结构:
// head → [Node|SIGNAL] ⇄ [Node|SIGNAL] ⇄ [Node|0] ← tail
// (持锁线程) (等待线程1) (等待线程2)
// 独占模式获取锁的流程(如ReentrantLock.lock()):
// 1. tryAcquire(1) — 尝试CAS获取锁
// 2. 失败 → addWaiter(Node.EXCLUSIVE) — 创建Node加入CLH队列尾部
// 3. acquireQueued() — 自旋检查前驱是否为head
// - 前驱是head且tryAcquire成功 → 设为新head,获取成功
// - 否则 → park()阻塞,等待前驱unpark唤醒
// 释放锁的流程(如ReentrantLock.unlock()):
// 1. tryRelease(1) — state减1
// 2. state == 0 — 完全释放
// 3. unparkSuccessor(head) — 唤醒CLH队列中下一个等待线程
4.2 ReentrantLock vs synchronized¶
Java
import java.util.concurrent.locks.*;
public class LockComparison {
// ========== synchronized ==========
private final Object syncLock = new Object();
public void syncMethod() {
synchronized (syncLock) {
// 自动获取和释放锁
// 不可中断、非公平、无超时
}
}
// ========== ReentrantLock ==========
private final ReentrantLock lock = new ReentrantLock(true); // true=公平锁
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
public void lockMethod() {
lock.lock();
try {
// 手动获取和释放
} finally {
lock.unlock(); // 必须在finally中释放!
}
}
// 可中断获取
public void interruptibleLock() throws InterruptedException {
lock.lockInterruptibly(); // 等待时可被interrupt()中断
try {
// ...
} finally {
lock.unlock();
}
}
// 超时获取
public boolean tryLockWithTimeout() throws InterruptedException {
if (lock.tryLock(3, java.util.concurrent.TimeUnit.SECONDS)) {
try {
return true;
} finally {
lock.unlock();
}
}
return false; // 3秒内未获取到锁
}
// Condition实现精确唤醒(生产者-消费者)
private final int[] buffer = new int[10];
private int count = 0;
public void produce(int item) throws InterruptedException {
lock.lock();
try {
while (count == buffer.length) {
notFull.await(); // 缓冲区满,等待消费者
}
buffer[count++] = item;
notEmpty.signal(); // 通知消费者
} finally {
lock.unlock();
}
}
public int consume() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await(); // 缓冲区空,等待生产者
}
int item = buffer[--count];
notFull.signal(); // 通知生产者
return item;
} finally {
lock.unlock();
}
}
}
// 对比总结
// | 特性 | synchronized | ReentrantLock |
// |-------------------|----------------|--------------------|
// | 实现层面 | JVM内置 | JDK(AQS) |
// | 获取/释放 | 自动 | 手动(try-finally) |
// | 可中断 | ❌ | ✅ lockInterruptibly |
// | 超时获取 | ❌ | ✅ tryLock(timeout) |
// | 公平锁 | ❌(非公平) | ✅ 可选 |
// | Condition条件 | wait/notify(一个)| ✅ 多个Condition |
// | 性能 | JDK6后持平 | 持平 |
// | 推荐 | 简单场景 | 需要高级特性时 |
5. 并发工具类¶
Java
import java.util.concurrent.*;
public class ConcurrencyTools {
// ========== CountDownLatch(一次性倒计数) ==========
// 场景:主线程等待N个子任务全部完成
static void countDownLatchDemo() throws InterruptedException {
int taskCount = 5;
CountDownLatch latch = new CountDownLatch(taskCount);
ExecutorService executor = Executors.newFixedThreadPool(taskCount);
for (int i = 0; i < taskCount; i++) {
final int id = i;
executor.submit(() -> {
try {
Thread.sleep((long) (Math.random() * 1000));
System.out.println("任务" + id + "完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 每完成一个,计数-1
}
});
}
latch.await(); // 阻塞到计数为0
System.out.println("所有任务完成!");
executor.shutdown();
}
// ========== CyclicBarrier(可循环屏障) ==========
// 场景:N个线程互相等待,到齐后一起执行
static void cyclicBarrierDemo() throws Exception {
int parties = 3;
CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
System.out.println("所有选手到达起跑线,开始比赛!");
});
for (int i = 0; i < parties; i++) {
final int id = i;
new Thread(() -> {
try {
System.out.println("选手" + id + "准备中...");
Thread.sleep((long) (Math.random() * 2000));
System.out.println("选手" + id + "到达起跑线");
barrier.await(); // 等待其他线程
System.out.println("选手" + id + "冲刺!");
} catch (Exception e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
// ========== Semaphore(信号量) ==========
// 场景:限制并发访问数(如数据库连接池、限流)
static void semaphoreDemo() {
Semaphore semaphore = new Semaphore(3); // 最多3个并发
for (int i = 0; i < 10; i++) {
final int id = i;
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可(阻塞)
System.out.println("线程" + id + "获取资源,当前可用: "
+ semaphore.availablePermits());
Thread.sleep(2000); // 模拟使用
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(); // 释放许可
}
}).start();
}
}
// ========== Phaser(分阶段同步) ==========
// 场景:多阶段任务,比CyclicBarrier更灵活(可动态注册/注销)
static void phaserDemo() {
Phaser phaser = new Phaser(3); // 3个参与者
for (int i = 0; i < 3; i++) {
final int id = i;
new Thread(() -> {
System.out.println("线程" + id + " 阶段1完成");
phaser.arriveAndAwaitAdvance(); // 等待所有人完成阶段1
System.out.println("线程" + id + " 阶段2完成");
phaser.arriveAndAwaitAdvance(); // 等待所有人完成阶段2
System.out.println("线程" + id + " 全部完成");
phaser.arriveAndDeregister(); // 注销
}).start();
}
}
}
6. 线程池深入¶
6.1 ThreadPoolExecutor 7大参数¶
Java
// 构造器(最完整的7参数版本)
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数(保持存活,即使空闲)
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 非核心线程空闲存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂(设置线程名等)
RejectedExecutionHandler handler // 拒绝策略
) { }
// 执行流程:
// 1. 当前线程数 < corePoolSize → 创建核心线程执行
// 2. 核心满 → 任务放入workQueue
// 3. workQueue满 → 创建非核心线程(直到maximumPoolSize)
// 4. 全满 → 触发RejectedExecutionHandler
// 示例:精准配置线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
4, // 核心4线程
8, // 最大8线程
60, TimeUnit.SECONDS, // 非核心线程空闲60s后回收
new LinkedBlockingQueue<>(100), // 有界队列(容量100)
new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "biz-thread-" + counter.getAndIncrement());
t.setDaemon(false);
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
6.2 四种拒绝策略¶
Java
// 当线程池满且队列满时触发
// 1. AbortPolicy(默认)— 抛RejectedExecutionException
new ThreadPoolExecutor.AbortPolicy();
// 2. CallerRunsPolicy — 由调用线程自己执行(不会丢任务,起到限流效果)
new ThreadPoolExecutor.CallerRunsPolicy();
// 3. DiscardPolicy — 静默丢弃新任务
new ThreadPoolExecutor.DiscardPolicy();
// 4. DiscardOldestPolicy — 丢弃队列中最老的任务,然后重试
new ThreadPoolExecutor.DiscardOldestPolicy();
// 自定义策略:记录日志 + 持久化
RejectedExecutionHandler customHandler = (r, executor1) -> {
log.warn("线程池已满,任务被拒绝: {}", r.toString());
// 可以将任务保存到数据库/消息队列,后续补偿
};
6.3 线程池最佳实践¶
Java
// ❌ 不推荐:Executors工厂方法(可能OOM)
// Executors.newFixedThreadPool(N) → LinkedBlockingQueue无界队列 → OOM
// Executors.newCachedThreadPool() → maximumPoolSize=Integer.MAX_VALUE → OOM
// Executors.newSingleThreadExecutor() → 同上无界队列
// ✅ 推荐:手动创建ThreadPoolExecutor,明确所有参数
// 线程数设置经验:
// CPU密集型:N + 1(N = CPU核数)
// IO密集型:2 * N 或 N * (1 + W/C)(W=等待时间, C=计算时间)
int cpuCores = Runtime.getRuntime().availableProcessors();
// 线程池监控
ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(() -> {
System.out.printf(
"Pool: active=%d, poolSize=%d, queueSize=%d, completed=%d%n",
executor.getActiveCount(),
executor.getPoolSize(),
executor.getQueue().size(),
executor.getCompletedTaskCount()
);
}, 0, 5, TimeUnit.SECONDS);
// 优雅关闭
executor.shutdown(); // 不再接受新任务,等待已提交任务完成
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
executor.shutdownNow(); // 超时强制关闭
executor.awaitTermination(10, TimeUnit.SECONDS);
}
7. CompletableFuture异步编程¶
Java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureDemo {
static ExecutorService executor = Executors.newFixedThreadPool(4);
public static void main(String[] args) throws Exception {
// ===== 创建异步任务 =====
// supplyAsync — 有返回值
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // CompletableFuture 异步编程,支持链式回调
sleep(1000);
return "Hello";
}, executor);
// runAsync — 无返回值
CompletableFuture<Void> voidFuture = CompletableFuture.runAsync(() -> {
System.out.println("异步执行,无返回值");
});
// ===== 链式转换 =====
CompletableFuture<Integer> lengthFuture = future
.thenApply(s -> s + " World") // 同步转换
.thenApply(String::length); // 链式转换
// thenCompose — 异步链(flatMap,避免嵌套CompletableFuture)
CompletableFuture<String> composed = future
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s.toUpperCase()));
// ===== 组合多个Future =====
CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> {
sleep(500); return "Alice";
}, executor);
CompletableFuture<Integer> scoreFuture = CompletableFuture.supplyAsync(() -> {
sleep(800); return 95;
}, executor);
// thenCombine — 两个Future都完成后合并结果
CompletableFuture<String> combined = userFuture.thenCombine(scoreFuture,
(user, score) -> user + " 得分: " + score);
System.out.println(combined.get()); // "Alice 得分: 95"
// allOf — 等待所有Future完成
CompletableFuture<Void> all = CompletableFuture.allOf(userFuture, scoreFuture);
all.join(); // 阻塞到全部完成
// anyOf — 任意一个完成就返回
CompletableFuture<Object> any = CompletableFuture.anyOf(userFuture, scoreFuture);
System.out.println("最先完成: " + any.get());
// ===== 异常处理 =====
CompletableFuture<String> safeFuture = CompletableFuture.supplyAsync(() -> {
if (true) throw new RuntimeException("出错了!");
return "ok";
}).exceptionally(ex -> {
System.err.println("异常: " + ex.getMessage());
return "fallback";
});
// handle — 统一处理结果和异常
CompletableFuture<String> handled = CompletableFuture.supplyAsync(() -> {
return "data";
}).handle((result, ex) -> {
if (ex != null) return "error: " + ex.getMessage();
return "success: " + result;
});
// ===== 实战:并行调用多个API =====
CompletableFuture<String> profileFuture =
CompletableFuture.supplyAsync(() -> fetchProfile(1L), executor);
CompletableFuture<String> ordersFuture =
CompletableFuture.supplyAsync(() -> fetchOrders(1L), executor);
CompletableFuture<String> recsFuture =
CompletableFuture.supplyAsync(() -> fetchRecommendations(1L), executor);
// 三个请求并行执行,全部完成后组合
String result = profileFuture.thenCombine(ordersFuture, (p, o) -> p + " | " + o)
.thenCombine(recsFuture, (po, r) -> po + " | " + r)
.get();
System.out.println(result);
executor.shutdown();
}
static String fetchProfile(Long id) { sleep(300); return "Profile-" + id; }
static String fetchOrders(Long id) { sleep(500); return "Orders-" + id; }
static String fetchRecommendations(Long id) { sleep(400); return "Recs-" + id; }
static void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
8. 并发集合¶
8.1 ConcurrentHashMap原理¶
Java
// JDK7:分段锁(Segment数组 + HashEntry数组)
// 每个Segment是一把ReentrantLock,最多16个并发写
// JDK8+:CAS + synchronized + 红黑树
// 数组(Node[]) + 链表 + 红黑树(与HashMap结构类似,但线程安全)
// 写操作:对桶头节点加synchronized(粒度更细:锁单个桶而非段)
// 读操作:无锁(volatile保证可见性)
// 初始化/扩容:CAS操作
// 关键方法
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 原子操作
map.putIfAbsent("key", 1); // key不存在时才put
map.computeIfAbsent("key", k -> 42); // key不存在时计算再put
map.compute("key", (k, v) -> (v == null) ? 1 : v + 1); // 原子累加
map.merge("key", 1, Integer::sum); // 合并(累加)
// 注意:size()是估算值;批量操作(forEach, search, reduce)默认并行阈值可调
map.forEach(1, (k, v) -> System.out.println(k + "=" + v)); // 并行遍历
8.2 其他并发集合¶
Java
// CopyOnWriteArrayList — 读多写极少场景
// 写时复制整个数组(写操作加锁),读操作无锁
// 适用:事件监听器列表、配置信息等
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("item"); // 内部复制整个数组
// BlockingQueue — 生产者-消费者核心组件
// ArrayBlockingQueue — 有界数组,创建时指定容量
// LinkedBlockingQueue — 可选有界链表(默认Integer.MAX_VALUE)
// PriorityBlockingQueue — 优先级队列(无界)
// SynchronousQueue — 无缓冲,生产者直接交给消费者
// DelayQueue — 延迟队列(元素到期才能取出)
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
queue.put("task"); // 队列满时阻塞
queue.offer("task", 3, TimeUnit.SECONDS); // 超时返回false
String item = queue.take(); // 队列空时阻塞
String item2 = queue.poll(3, TimeUnit.SECONDS); // 超时返回null
9. Fork/Join框架¶
Java
import java.util.concurrent.*;
/**
* Fork/Join框架 — 分治思想 + 工作窃取算法
* 适合递归可分的计算密集型任务
*/
public class ForkJoinDemo {
// 计算数组元素之和
static class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 10000;
private final long[] array;
private final int start, end;
SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// 基础情况:直接计算
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
}
// 递归拆分
int mid = (start + end) / 2;
SumTask left = new SumTask(array, start, mid);
SumTask right = new SumTask(array, mid, end);
left.fork(); // 异步执行左半部分
long rightResult = right.compute(); // 同步执行右半部分
long leftResult = left.join(); // 等待左半部分完成
return leftResult + rightResult;
}
}
public static void main(String[] args) {
long[] array = new long[10_000_000];
for (int i = 0; i < array.length; i++) array[i] = i + 1;
ForkJoinPool pool = new ForkJoinPool(); // 默认线程数 = CPU核数
long result = pool.invoke(new SumTask(array, 0, array.length));
System.out.println("Sum = " + result);
// 工作窃取(Work Stealing):空闲线程从其他线程的双端队列尾部窃取任务
}
}
10. 虚拟线程 vs 传统线程(Java 21)¶
Java
public class VirtualVsPlatformThread {
public static void main(String[] args) throws Exception {
// ===== 性能对比 =====
int taskCount = 100_000;
// 平台线程:受OS线程数限制,10万个线程几乎不可能
// long start = System.currentTimeMillis();
// for (int i = 0; i < taskCount; i++) {
// Thread.ofPlatform().start(() -> {
// try { Thread.sleep(1000); } catch (Exception e) {}
// });
// } // 可能OOM: unable to create native thread
// 虚拟线程:轻松创建10万个
long start = System.currentTimeMillis();
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < taskCount; i++) {
executor.submit(() -> {
try { Thread.sleep(Duration.ofSeconds(1)); } catch (Exception e) {}
});
}
} // try-with-resources自动关闭并等待
long elapsed = System.currentTimeMillis() - start;
System.out.println("虚拟线程完成 " + taskCount + " 个任务,耗时: " + elapsed + "ms");
// ===== 关键区别 =====
// | 维度 | 平台线程 (Platform) | 虚拟线程 (Virtual) |
// |-----------|------------------------|--------------------------|
// | 映射 | 1:1 映射OS线程 | M:N 映射(多虚拟:少平台) |
// | 内存 | ~1MB 栈空间 | ~几KB,按需增长 |
// | 创建数量 | 数千级 | 百万级 |
// | 调度 | OS内核调度 | JVM用户态调度 |
// | 适用场景 | CPU密集型 | IO密集型 |
// | 线程池 | 需要池化复用 | 不需要池化(即用即抛) |
// | synchronized | 会pin住载体线程 | 推荐用ReentrantLock替代 |
// ===== 注意事项 =====
// 1. 虚拟线程中应避免长时间的synchronized块(会pin住载体线程)
// 推荐改用ReentrantLock
// 2. 不要池化虚拟线程(它们本身就是轻量的)
// 3. ThreadLocal在虚拟线程中有效但要注意内存,推荐Scoped Values
// 4. CPU密集型任务用平台线程仍然更合适
}
}
✏️ 练习¶
- 基础:使用
CountDownLatch模拟赛跑场景(5名运动员准备 → 裁判发枪 → 所有人跑完 → 输出成绩) - 中级:用
ThreadPoolExecutor手动创建线程池(4核心、8最大、100队列),提交200个任务,观察拒绝策略触发;补充线程池监控 - 高级:使用
CompletableFuture实现:并行调用price-service(获取价格)、stock-service(获取库存)、discount-service(获取折扣),三者均有超时处理和fallback,最终组合为商品详情
📋 面试要点¶
- volatile和synchronized区别? — volatile保证可见性和有序性但不保证原子性(适合单读写);synchronized保证三者(互斥锁)
- AQS原理? — volatile state + CLH双向队列 + CAS;独占模式(ReentrantLock)和共享模式(Semaphore/CountDownLatch)
- 线程池核心参数和执行流程? — 7参数(corePoolSize → workQueue → maximumPoolSize → handler),核心满放队列 → 队列满建非核心 → 全满触发拒绝
- 为什么不推荐Executors工厂方法? — FixedThreadPool/SingleThread用无界队列(可能OOM),CachedPool最大线程数无限(可能OOM)
- ConcurrentHashMap JDK7 vs 8? — 7分段锁(Segment+ReentrantLock),8 CAS+synchronized+红黑树(锁粒度从Segment细化到桶)
- 死锁条件?如何排查? — 互斥、请求与保持、不可剥夺、循环等待;jstack查看线程状态,Arthas thread -b
- CompletableFuture vs Future? — Future只能阻塞get();CompletableFuture支持链式回调(thenApply/thenCombine)、异常处理、组合
- 虚拟线程的使用注意? — 避免长synchronized(改用ReentrantLock)、不要池化、不适合CPU密集型
📌 下一步学习:16-数据库访问技术 — 掌握Java数据库持久层技术栈