跳转至

Java并发编程进阶

🎯 学习目标

完成本章学习后,你将能够: - 深入理解Java内存模型(JMM)及happens-before规则 - 掌握volatile和synchronized的底层实现原理 - 理解AQS框架设计思想与CLH队列 - 熟练使用并发工具类解决实际问题 - 深入理解线程池的参数配置与最佳实践 - 掌握CompletableFuture异步编程 - 理解主流并发集合的底层原理 - 对比虚拟线程与传统线程的差异

Java并发进阶能力栈


1. Java内存模型(JMM)

1.1 JMM核心概念

Java内存模型(Java Memory Model)定义了多线程程序中变量的访问规则,解决的核心问题是可见性有序性原子性

Text Only
┌─────────────────────────────────────────────┐
│                  主内存 (Main Memory)          │
│    存储所有共享变量(堆中的实例变量、静态变量)     │
└──────────┬─────────────────┬────────────────┘
           │  read/write     │  read/write
    ┌──────▼──────┐   ┌──────▼──────┐
    │ 工作内存      │   │ 工作内存      │
    │ (线程A私有)   │   │ (线程B私有)   │
    │ CPU缓存/寄存器│   │ CPU缓存/寄存器│
    └─────────────┘   └─────────────┘
  • 可见性:线程A修改了共享变量,线程B能否立即看到?
  • 有序性:编译器和CPU可能对指令重排序,多线程下可能观察到意外顺序
  • 原子性:操作是否不可分割(如i++并非原子操作:read→add→write)

1.2 happens-before规则

JMM通过happens-before关系保证内存可见性。如果操作A happens-before操作B,则A的结果对B可见。

Java
// 8条 happens-before 规则:

// 1. 程序顺序规则:同一线程中,前面的操作happens-before后面的操作
// 2. 监视器锁规则:unlock happens-before 后续同一锁的lock
// 3. volatile规则:volatile写 happens-before 后续对同一变量的volatile读
// 4. 线程启动规则:Thread.start() happens-before 新线程中的任何操作
// 5. 线程终止规则:线程中的所有操作 happens-before Thread.join()返回
// 6. 中断规则:interrupt() happens-before 被检测到中断
// 7. 终结器规则:构造器 happens-before finalize()
// 8. 传递性:A hb B, B hb C → A hb C

// 示例:volatile保证可见性
class VolatileExample {
    private int a = 0;
    private volatile boolean flag = false;  // volatile变量

    // 线程A
    public void writer() {
        a = 42;         // 1. 普通写
        flag = true;    // 2. volatile写 — 将a=42也刷新到主内存
    }

    // 线程B
    public void reader() {
        if (flag) {     // 3. volatile读 — 从主内存刷新(包括a的新值)
            System.out.println(a);  // 4. 保证能看到42
            // 根据volatile规则:2 hb 3
            // 根据程序顺序规则:1 hb 2, 3 hb 4
            // 根据传递性:1 hb 4 → a=42对线程B可见
        }
    }
}

2. volatile深入

2.1 volatile的语义

Java
// volatile两大语义:
// 1. 可见性:写入volatile变量立即刷新到主内存,读取时从主内存加载
// 2. 禁止重排序:通过插入内存屏障(Memory Barrier)

// 内存屏障类型:
// StoreStore屏障 → volatile写之前:确保前面的普通写已刷新
// StoreLoad屏障  → volatile写之后:最重的屏障,确保写操作对后续读可见
// LoadLoad屏障   → volatile读之后:确保后续读取是最新值
// LoadStore屏障  → volatile读之后:确保后续写不会被重排到volatile读之前

// volatile适用场景:
// ✅ 状态标志(boolean flag)
// ✅ 一次性安全发布(单例的DCL)
// ✅ 独立观察(如传感器读数)
// ❌ 复合操作(如 volatile int count; count++; → 非原子)

2.2 双重检查锁(DCL)单例

Java
public class Singleton {
    // 必须加volatile!防止指令重排导致返回未初始化的对象
    private static volatile Singleton INSTANCE;

    private Singleton() {}

    public static Singleton getInstance() {
        if (INSTANCE == null) {                  // 第一次检查(无锁,快速返回)
            synchronized (Singleton.class) {
                if (INSTANCE == null) {          // 第二次检查(加锁后确认)
                    INSTANCE = new Singleton();
                    // new对象分3步:1.分配内存 2.初始化 3.赋值引用
                    // 没有volatile时,2和3可能重排序 → 另一个线程拿到未初始化的对象
                }
            }
        }
        return INSTANCE;
    }
}

// 更推荐的单例写法:枚举(天然线程安全 + 防反射 + 防序列化破坏)
public enum SingletonEnum {
    INSTANCE;
    public void doSomething() { }
}

3. synchronized深入

3.1 对象头结构(Mark Word)

Text Only
// 在HotSpot中,每个Java对象由以下结构组成:
// 对象头(Header)= Mark Word + 类指针(Klass Pointer) [+ 数组长度]
// 实例数据(Instance Data)
// 对齐填充(Padding)

// Mark Word(64位JVM,8字节)— 存储锁状态
// ┌────────────────────────────────────────────────────────┐
// │ 无锁态(001):  [hashcode(31)  | age(4) | biased(0) | 01] │
// │ 偏向锁(101):  [threadId(54)  | epoch(2)| age(4) | 1 | 01]│
// │ 轻量级锁(00): [指向栈帧中Lock Record的指针(62)      | 00] │
// │ 重量级锁(10): [指向Monitor对象的指针(62)            | 10] │
// │ GC标记(11):   [空                                  | 11] │
// └────────────────────────────────────────────────────────┘

3.2 锁升级过程

Java
// 无锁 → 偏向锁 → 轻量级锁 → 重量级锁(只能升级,不能降级)

// 偏向锁(Biased Locking)
// - 场景:总是同一个线程获取锁(无竞争)
// - 原理:在Mark Word中记录线程ID,同线程再次进入时无需任何同步操作
// - 撤销:当其他线程尝试获取锁时,偏向锁被撤销,升级为轻量级锁
// - 注意:JDK15后默认关闭偏向锁(-XX:-UseBiasedLocking)

// 轻量级锁(Lightweight Lock)
// - 场景:交替执行(线程交替获取锁,实际竞争少)
// - 原理:CAS将Mark Word替换为指向栈帧中Lock Record的指针
// - 自旋:获取失败时不立即阻塞,而是忙等待(自适应自旋)
// - 升级:自旋超过阈值 → 升级为重量级锁

// 重量级锁(Heavyweight Lock)
// - 场景:激烈竞争
// - 原理:通过操作系统Mutex实现,涉及用户态/内核态切换
// - 包含:EntryList(等待获取锁的线程)、WaitSet(wait()的线程)
// - 开销最大

// 示例:观察锁升级(需要JOL工具)
// import org.openjdk.jol.info.ClassLayout;
// Object obj = new Object();
// System.out.println(ClassLayout.parseInstance(obj).toPrintable());
// synchronized (obj) {
//     System.out.println(ClassLayout.parseInstance(obj).toPrintable());
// }

4. AQS框架详解

4.1 AbstractQueuedSynchronizer原理

Java
// AQS是java.util.concurrent中锁和同步器的基石
// ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock 都基于AQS

// 核心要素:
// 1. state(volatile int)— 同步状态
//    - ReentrantLock: 0=未锁定, >0=重入次数
//    - Semaphore: 剩余许可数
//    - CountDownLatch: 剩余计数
// 2. CLH队列 — FIFO双向链表,管理等待线程
// 3. 独占/共享模式

// CLH队列结构:
//   head → [Node|SIGNAL] ⇄ [Node|SIGNAL] ⇄ [Node|0] ← tail
//            (持锁线程)     (等待线程1)      (等待线程2)

// 独占模式获取锁的流程(如ReentrantLock.lock()):
// 1. tryAcquire(1)          — 尝试CAS获取锁
// 2. 失败 → addWaiter(Node.EXCLUSIVE) — 创建Node加入CLH队列尾部
// 3. acquireQueued()        — 自旋检查前驱是否为head
//    - 前驱是head且tryAcquire成功 → 设为新head,获取成功
//    - 否则 → park()阻塞,等待前驱unpark唤醒

// 释放锁的流程(如ReentrantLock.unlock()):
// 1. tryRelease(1)          — state减1
// 2. state == 0             — 完全释放
// 3. unparkSuccessor(head)  — 唤醒CLH队列中下一个等待线程

4.2 ReentrantLock vs synchronized

Java
import java.util.concurrent.locks.*;

public class LockComparison {

    // ========== synchronized ==========
    private final Object syncLock = new Object();

    public void syncMethod() {
        synchronized (syncLock) {
            // 自动获取和释放锁
            // 不可中断、非公平、无超时
        }
    }

    // ========== ReentrantLock ==========
    private final ReentrantLock lock = new ReentrantLock(true); // true=公平锁
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    public void lockMethod() {
        lock.lock();
        try {
            // 手动获取和释放
        } finally {
            lock.unlock(); // 必须在finally中释放!
        }
    }

    // 可中断获取
    public void interruptibleLock() throws InterruptedException {
        lock.lockInterruptibly(); // 等待时可被interrupt()中断
        try {
            // ...
        } finally {
            lock.unlock();
        }
    }

    // 超时获取
    public boolean tryLockWithTimeout() throws InterruptedException {
        if (lock.tryLock(3, java.util.concurrent.TimeUnit.SECONDS)) {
            try {
                return true;
            } finally {
                lock.unlock();
            }
        }
        return false; // 3秒内未获取到锁
    }

    // Condition实现精确唤醒(生产者-消费者)
    private final int[] buffer = new int[10];
    private int count = 0;

    public void produce(int item) throws InterruptedException {
        lock.lock();
        try {
            while (count == buffer.length) {
                notFull.await();   // 缓冲区满,等待消费者
            }
            buffer[count++] = item;
            notEmpty.signal();      // 通知消费者
        } finally {
            lock.unlock();
        }
    }

    public int consume() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();  // 缓冲区空,等待生产者
            }
            int item = buffer[--count];
            notFull.signal();       // 通知生产者
            return item;
        } finally {
            lock.unlock();
        }
    }
}

// 对比总结
// | 特性              | synchronized   | ReentrantLock      |
// |-------------------|----------------|--------------------|
// | 实现层面          | JVM内置         | JDK(AQS)          |
// | 获取/释放         | 自动            | 手动(try-finally) |
// | 可中断            | ❌              | ✅ lockInterruptibly |
// | 超时获取          | ❌              | ✅ tryLock(timeout)  |
// | 公平锁            | ❌(非公平)     | ✅ 可选             |
// | Condition条件     | wait/notify(一个)| ✅ 多个Condition    |
// | 性能              | JDK6后持平      | 持平                |
// | 推荐              | 简单场景        | 需要高级特性时       |

5. 并发工具类

Java
import java.util.concurrent.*;

public class ConcurrencyTools {

    // ========== CountDownLatch(一次性倒计数) ==========
    // 场景:主线程等待N个子任务全部完成
    static void countDownLatchDemo() throws InterruptedException {
        int taskCount = 5;
        CountDownLatch latch = new CountDownLatch(taskCount);
        ExecutorService executor = Executors.newFixedThreadPool(taskCount);

        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            executor.submit(() -> {
                try {
                    Thread.sleep((long) (Math.random() * 1000));
                    System.out.println("任务" + id + "完成");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown();  // 每完成一个,计数-1
                }
            });
        }
        latch.await();  // 阻塞到计数为0
        System.out.println("所有任务完成!");
        executor.shutdown();
    }

    // ========== CyclicBarrier(可循环屏障) ==========
    // 场景:N个线程互相等待,到齐后一起执行
    static void cyclicBarrierDemo() throws Exception {
        int parties = 3;
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
            System.out.println("所有选手到达起跑线,开始比赛!");
        });

        for (int i = 0; i < parties; i++) {
            final int id = i;
            new Thread(() -> {
                try {
                    System.out.println("选手" + id + "准备中...");
                    Thread.sleep((long) (Math.random() * 2000));
                    System.out.println("选手" + id + "到达起跑线");
                    barrier.await();  // 等待其他线程
                    System.out.println("选手" + id + "冲刺!");
                } catch (Exception e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }

    // ========== Semaphore(信号量) ==========
    // 场景:限制并发访问数(如数据库连接池、限流)
    static void semaphoreDemo() {
        Semaphore semaphore = new Semaphore(3);  // 最多3个并发

        for (int i = 0; i < 10; i++) {
            final int id = i;
            new Thread(() -> {
                try {
                    semaphore.acquire();  // 获取许可(阻塞)
                    System.out.println("线程" + id + "获取资源,当前可用: "
                            + semaphore.availablePermits());
                    Thread.sleep(2000);   // 模拟使用
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    semaphore.release();  // 释放许可
                }
            }).start();
        }
    }

    // ========== Phaser(分阶段同步) ==========
    // 场景:多阶段任务,比CyclicBarrier更灵活(可动态注册/注销)
    static void phaserDemo() {
        Phaser phaser = new Phaser(3);  // 3个参与者

        for (int i = 0; i < 3; i++) {
            final int id = i;
            new Thread(() -> {
                System.out.println("线程" + id + " 阶段1完成");
                phaser.arriveAndAwaitAdvance();  // 等待所有人完成阶段1

                System.out.println("线程" + id + " 阶段2完成");
                phaser.arriveAndAwaitAdvance();  // 等待所有人完成阶段2

                System.out.println("线程" + id + " 全部完成");
                phaser.arriveAndDeregister();    // 注销
            }).start();
        }
    }
}

6. 线程池深入

6.1 ThreadPoolExecutor 7大参数

Java
// 构造器(最完整的7参数版本)
public ThreadPoolExecutor(
    int corePoolSize,       // 核心线程数(保持存活,即使空闲)
    int maximumPoolSize,    // 最大线程数
    long keepAliveTime,     // 非核心线程空闲存活时间
    TimeUnit unit,          // 时间单位
    BlockingQueue<Runnable> workQueue,  // 任务队列
    ThreadFactory threadFactory,        // 线程工厂(设置线程名等)
    RejectedExecutionHandler handler    // 拒绝策略
) { }

// 执行流程:
// 1. 当前线程数 < corePoolSize → 创建核心线程执行
// 2. 核心满 → 任务放入workQueue
// 3. workQueue满 → 创建非核心线程(直到maximumPoolSize)
// 4. 全满 → 触发RejectedExecutionHandler

// 示例:精准配置线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    4,                                    // 核心4线程
    8,                                    // 最大8线程
    60, TimeUnit.SECONDS,                 // 非核心线程空闲60s后回收
    new LinkedBlockingQueue<>(100),       // 有界队列(容量100)
    new ThreadFactory() {
        private final AtomicInteger counter = new AtomicInteger(1);
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "biz-thread-" + counter.getAndIncrement());
            t.setDaemon(false);
            return t;
        }
    },
    new ThreadPoolExecutor.CallerRunsPolicy()  // 拒绝策略
);

6.2 四种拒绝策略

Java
// 当线程池满且队列满时触发

// 1. AbortPolicy(默认)— 抛RejectedExecutionException
new ThreadPoolExecutor.AbortPolicy();

// 2. CallerRunsPolicy — 由调用线程自己执行(不会丢任务,起到限流效果)
new ThreadPoolExecutor.CallerRunsPolicy();

// 3. DiscardPolicy — 静默丢弃新任务
new ThreadPoolExecutor.DiscardPolicy();

// 4. DiscardOldestPolicy — 丢弃队列中最老的任务,然后重试
new ThreadPoolExecutor.DiscardOldestPolicy();

// 自定义策略:记录日志 + 持久化
RejectedExecutionHandler customHandler = (r, executor1) -> {
    log.warn("线程池已满,任务被拒绝: {}", r.toString());
    // 可以将任务保存到数据库/消息队列,后续补偿
};

6.3 线程池最佳实践

Java
// ❌ 不推荐:Executors工厂方法(可能OOM)
// Executors.newFixedThreadPool(N)       → LinkedBlockingQueue无界队列 → OOM
// Executors.newCachedThreadPool()       → maximumPoolSize=Integer.MAX_VALUE → OOM
// Executors.newSingleThreadExecutor()   → 同上无界队列

// ✅ 推荐:手动创建ThreadPoolExecutor,明确所有参数

// 线程数设置经验:
// CPU密集型:N + 1(N = CPU核数)
// IO密集型:2 * N 或 N * (1 + W/C)(W=等待时间, C=计算时间)

int cpuCores = Runtime.getRuntime().availableProcessors();

// 线程池监控
ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(() -> {
    System.out.printf(
        "Pool: active=%d, poolSize=%d, queueSize=%d, completed=%d%n",
        executor.getActiveCount(),
        executor.getPoolSize(),
        executor.getQueue().size(),
        executor.getCompletedTaskCount()
    );
}, 0, 5, TimeUnit.SECONDS);

// 优雅关闭
executor.shutdown();                          // 不再接受新任务,等待已提交任务完成
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
    executor.shutdownNow();                   // 超时强制关闭
    executor.awaitTermination(10, TimeUnit.SECONDS);
}

7. CompletableFuture异步编程

Java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureDemo {

    static ExecutorService executor = Executors.newFixedThreadPool(4);

    public static void main(String[] args) throws Exception {

        // ===== 创建异步任务 =====
        // supplyAsync — 有返回值
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {  // CompletableFuture 异步编程,支持链式回调
            sleep(1000);
            return "Hello";
        }, executor);

        // runAsync — 无返回值
        CompletableFuture<Void> voidFuture = CompletableFuture.runAsync(() -> {
            System.out.println("异步执行,无返回值");
        });

        // ===== 链式转换 =====
        CompletableFuture<Integer> lengthFuture = future
            .thenApply(s -> s + " World")             // 同步转换
            .thenApply(String::length);                // 链式转换

        // thenCompose — 异步链(flatMap,避免嵌套CompletableFuture)
        CompletableFuture<String> composed = future
            .thenCompose(s -> CompletableFuture.supplyAsync(() -> s.toUpperCase()));

        // ===== 组合多个Future =====
        CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> {
            sleep(500); return "Alice";
        }, executor);
        CompletableFuture<Integer> scoreFuture = CompletableFuture.supplyAsync(() -> {
            sleep(800); return 95;
        }, executor);

        // thenCombine — 两个Future都完成后合并结果
        CompletableFuture<String> combined = userFuture.thenCombine(scoreFuture,
            (user, score) -> user + " 得分: " + score);
        System.out.println(combined.get());  // "Alice 得分: 95"

        // allOf — 等待所有Future完成
        CompletableFuture<Void> all = CompletableFuture.allOf(userFuture, scoreFuture);
        all.join();  // 阻塞到全部完成

        // anyOf — 任意一个完成就返回
        CompletableFuture<Object> any = CompletableFuture.anyOf(userFuture, scoreFuture);
        System.out.println("最先完成: " + any.get());

        // ===== 异常处理 =====
        CompletableFuture<String> safeFuture = CompletableFuture.supplyAsync(() -> {
            if (true) throw new RuntimeException("出错了!");
            return "ok";
        }).exceptionally(ex -> {
            System.err.println("异常: " + ex.getMessage());
            return "fallback";
        });

        // handle — 统一处理结果和异常
        CompletableFuture<String> handled = CompletableFuture.supplyAsync(() -> {
            return "data";
        }).handle((result, ex) -> {
            if (ex != null) return "error: " + ex.getMessage();
            return "success: " + result;
        });

        // ===== 实战:并行调用多个API =====
        CompletableFuture<String> profileFuture =
            CompletableFuture.supplyAsync(() -> fetchProfile(1L), executor);
        CompletableFuture<String> ordersFuture =
            CompletableFuture.supplyAsync(() -> fetchOrders(1L), executor);
        CompletableFuture<String> recsFuture =
            CompletableFuture.supplyAsync(() -> fetchRecommendations(1L), executor);

        // 三个请求并行执行,全部完成后组合
        String result = profileFuture.thenCombine(ordersFuture, (p, o) -> p + " | " + o)
            .thenCombine(recsFuture, (po, r) -> po + " | " + r)
            .get();
        System.out.println(result);

        executor.shutdown();
    }

    static String fetchProfile(Long id) { sleep(300); return "Profile-" + id; }
    static String fetchOrders(Long id) { sleep(500); return "Orders-" + id; }
    static String fetchRecommendations(Long id) { sleep(400); return "Recs-" + id; }

    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

8. 并发集合

8.1 ConcurrentHashMap原理

Java
// JDK7:分段锁(Segment数组 + HashEntry数组)
// 每个Segment是一把ReentrantLock,最多16个并发写

// JDK8+:CAS + synchronized + 红黑树
// 数组(Node[]) + 链表 + 红黑树(与HashMap结构类似,但线程安全)
// 写操作:对桶头节点加synchronized(粒度更细:锁单个桶而非段)
// 读操作:无锁(volatile保证可见性)
// 初始化/扩容:CAS操作

// 关键方法
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

// 原子操作
map.putIfAbsent("key", 1);           // key不存在时才put
map.computeIfAbsent("key", k -> 42); // key不存在时计算再put
map.compute("key", (k, v) -> (v == null) ? 1 : v + 1); // 原子累加
map.merge("key", 1, Integer::sum);   // 合并(累加)

// 注意:size()是估算值;批量操作(forEach, search, reduce)默认并行阈值可调
map.forEach(1, (k, v) -> System.out.println(k + "=" + v)); // 并行遍历

8.2 其他并发集合

Java
// CopyOnWriteArrayList — 读多写极少场景
// 写时复制整个数组(写操作加锁),读操作无锁
// 适用:事件监听器列表、配置信息等
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("item");  // 内部复制整个数组

// BlockingQueue — 生产者-消费者核心组件
// ArrayBlockingQueue   — 有界数组,创建时指定容量
// LinkedBlockingQueue  — 可选有界链表(默认Integer.MAX_VALUE)
// PriorityBlockingQueue — 优先级队列(无界)
// SynchronousQueue     — 无缓冲,生产者直接交给消费者
// DelayQueue           — 延迟队列(元素到期才能取出)

BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
queue.put("task");      // 队列满时阻塞
queue.offer("task", 3, TimeUnit.SECONDS);  // 超时返回false
String item = queue.take();  // 队列空时阻塞
String item2 = queue.poll(3, TimeUnit.SECONDS);  // 超时返回null

9. Fork/Join框架

Java
import java.util.concurrent.*;

/**
 * Fork/Join框架 — 分治思想 + 工作窃取算法
 * 适合递归可分的计算密集型任务
 */
public class ForkJoinDemo {

    // 计算数组元素之和
    static class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 10000;
        private final long[] array;
        private final int start, end;

        SumTask(long[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            if (end - start <= THRESHOLD) {
                // 基础情况:直接计算
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += array[i];
                }
                return sum;
            }
            // 递归拆分
            int mid = (start + end) / 2;
            SumTask left = new SumTask(array, start, mid);
            SumTask right = new SumTask(array, mid, end);
            left.fork();   // 异步执行左半部分
            long rightResult = right.compute();  // 同步执行右半部分
            long leftResult = left.join();       // 等待左半部分完成
            return leftResult + rightResult;
        }
    }

    public static void main(String[] args) {
        long[] array = new long[10_000_000];
        for (int i = 0; i < array.length; i++) array[i] = i + 1;

        ForkJoinPool pool = new ForkJoinPool();  // 默认线程数 = CPU核数
        long result = pool.invoke(new SumTask(array, 0, array.length));
        System.out.println("Sum = " + result);
        // 工作窃取(Work Stealing):空闲线程从其他线程的双端队列尾部窃取任务
    }
}

10. 虚拟线程 vs 传统线程(Java 21)

Java
public class VirtualVsPlatformThread {

    public static void main(String[] args) throws Exception {

        // ===== 性能对比 =====
        int taskCount = 100_000;

        // 平台线程:受OS线程数限制,10万个线程几乎不可能
        // long start = System.currentTimeMillis();
        // for (int i = 0; i < taskCount; i++) {
        //     Thread.ofPlatform().start(() -> {
        //         try { Thread.sleep(1000); } catch (Exception e) {}
        //     });
        // }  // 可能OOM: unable to create native thread

        // 虚拟线程:轻松创建10万个
        long start = System.currentTimeMillis();
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < taskCount; i++) {
                executor.submit(() -> {
                    try { Thread.sleep(Duration.ofSeconds(1)); } catch (Exception e) {}
                });
            }
        }  // try-with-resources自动关闭并等待
        long elapsed = System.currentTimeMillis() - start;
        System.out.println("虚拟线程完成 " + taskCount + " 个任务,耗时: " + elapsed + "ms");

        // ===== 关键区别 =====
        // | 维度       | 平台线程 (Platform)     | 虚拟线程 (Virtual)       |
        // |-----------|------------------------|--------------------------|
        // | 映射      | 1:1 映射OS线程           | M:N 映射(多虚拟:少平台)  |
        // | 内存      | ~1MB 栈空间              | ~几KB,按需增长            |
        // | 创建数量  | 数千级                   | 百万级                    |
        // | 调度      | OS内核调度               | JVM用户态调度              |
        // | 适用场景  | CPU密集型                | IO密集型                  |
        // | 线程池    | 需要池化复用              | 不需要池化(即用即抛)      |
        // | synchronized | 会pin住载体线程       | 推荐用ReentrantLock替代   |

        // ===== 注意事项 =====
        // 1. 虚拟线程中应避免长时间的synchronized块(会pin住载体线程)
        //    推荐改用ReentrantLock
        // 2. 不要池化虚拟线程(它们本身就是轻量的)
        // 3. ThreadLocal在虚拟线程中有效但要注意内存,推荐Scoped Values
        // 4. CPU密集型任务用平台线程仍然更合适
    }
}

✏️ 练习

  1. 基础:使用CountDownLatch模拟赛跑场景(5名运动员准备 → 裁判发枪 → 所有人跑完 → 输出成绩)
  2. 中级:用ThreadPoolExecutor手动创建线程池(4核心、8最大、100队列),提交200个任务,观察拒绝策略触发;补充线程池监控
  3. 高级:使用CompletableFuture实现:并行调用price-service(获取价格)、stock-service(获取库存)、discount-service(获取折扣),三者均有超时处理和fallback,最终组合为商品详情

📋 面试要点

  1. volatile和synchronized区别? — volatile保证可见性和有序性但不保证原子性(适合单读写);synchronized保证三者(互斥锁)
  2. AQS原理? — volatile state + CLH双向队列 + CAS;独占模式(ReentrantLock)和共享模式(Semaphore/CountDownLatch)
  3. 线程池核心参数和执行流程? — 7参数(corePoolSize → workQueue → maximumPoolSize → handler),核心满放队列 → 队列满建非核心 → 全满触发拒绝
  4. 为什么不推荐Executors工厂方法? — FixedThreadPool/SingleThread用无界队列(可能OOM),CachedPool最大线程数无限(可能OOM)
  5. ConcurrentHashMap JDK7 vs 8? — 7分段锁(Segment+ReentrantLock),8 CAS+synchronized+红黑树(锁粒度从Segment细化到桶)
  6. 死锁条件?如何排查? — 互斥、请求与保持、不可剥夺、循环等待;jstack查看线程状态,Arthas thread -b
  7. CompletableFuture vs Future? — Future只能阻塞get();CompletableFuture支持链式回调(thenApply/thenCombine)、异常处理、组合
  8. 虚拟线程的使用注意? — 避免长synchronized(改用ReentrantLock)、不要池化、不适合CPU密集型

📌 下一步学习16-数据库访问技术 — 掌握Java数据库持久层技术栈