跳转至

🔄 C++并发编程

📋 学习目标

通过本章学习,你将能够:

  • 掌握 std::thread 的创建、管理和参数传递
  • 理解互斥量(mutex)家族及 RAII 锁管理
  • 使用条件变量实现线程间同步
  • 运用 std::async/std::future/std::promise 进行异步编程
  • 掌握原子操作与内存序模型
  • 了解 C++20 协程的基本用法
  • 能独立实现线程池、生产者-消费者等经典并发模式
  • 具备并发面试题的解答能力

C++并发工具箱总览

上图将线程、同步、原子与协程能力放在同一视图,便于建立并发知识地图。


1️⃣ std::thread 线程基础

1.1 创建与管理线程

C++
#include <iostream>
#include <thread>

void hello() {
    std::cout << "Hello from thread "
              << std::this_thread::get_id() << std::endl;
}

int main() {
    std::thread t(hello);  // 创建线程

    std::cout << "Main thread: " << std::this_thread::get_id() << std::endl;
    std::cout << "Hardware concurrency: "
              << std::thread::hardware_concurrency() << std::endl;

    t.join();  // 等待线程完成
    return 0;
}

1.2 join 与 detach

C++
#include <iostream>
#include <thread>
#include <chrono>

void worker(int id, int ms) {
    std::this_thread::sleep_for(std::chrono::milliseconds(ms));
    std::cout << "Worker " << id << " done\n";
}

int main() {
    // join: 主线程等待子线程完成
    std::thread t1(worker, 1, 100);
    t1.join();  // 阻塞直到 t1 完成

    // detach: 分离线程,后台独立运行
    std::thread t2(worker, 2, 50);
    t2.detach();  // t2 在后台运行,生命周期与主线程无关

    // ⚠️ 注意:线程对象销毁前必须 join 或 detach,否则 std::terminate
    // std::thread t3(worker, 3, 10);  // 未 join/detach → 程序终止

    std::this_thread::sleep_for(std::chrono::milliseconds(200));
    return 0;
}

1.3 线程参数传递

C++
#include <iostream>
#include <thread>
#include <string>

// 值传递
void by_value(int x) {
    x = 100;
}

// 引用传递 —— 必须使用 std::ref
void by_ref(int& x) {
    x = 100;
}

// 移动语义
void by_move(std::string s) {
    std::cout << "Moved string: " << s << std::endl;
}

// 成员函数
struct Task {
    void run(int id) {
        std::cout << "Task::run id=" << id << std::endl;
    }
};

int main() {
    // 值传递
    int a = 0;
    std::thread t1(by_value, a);
    t1.join();
    std::cout << "a after by_value: " << a << std::endl;  // 0(未改变)

    // 引用传递
    int b = 0;
    std::thread t2(by_ref, std::ref(b));  // ⚠️ 必须 std::ref
    t2.join();
    std::cout << "b after by_ref: " << b << std::endl;    // 100

    // 移动传递
    std::string str = "Hello Concurrency";
    std::thread t3(by_move, std::move(str));  // std::move 将左值转为右值引用,转移所有权
    t3.join();
    // str 已被移动,不可再用

    // 成员函数
    Task task;
    std::thread t4(&Task::run, &task, 42);
    t4.join();

    // Lambda
    int val = 10;
    std::thread t5([&val]() { val += 5; });
    t5.join();
    std::cout << "val after lambda: " << val << std::endl;  // 15

    return 0;
}

1.4 RAII 线程管理(C++20 jthread)

C++
#include <iostream>
#include <thread>
#include <stop_token>

// C++20 jthread: 自动 join + 支持协作取消
void cancellable_work(std::stop_token stoken, int id) {
    while (!stoken.stop_requested()) {
        std::cout << "Worker " << id << " running...\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    std::cout << "Worker " << id << " stopped.\n";
}

int main() {
    std::jthread jt(cancellable_work, 1);
    std::this_thread::sleep_for(std::chrono::milliseconds(350));
    jt.request_stop();  // 请求停止
    // jt 析构时自动 join,无需手动调用
    return 0;
}

2️⃣ 互斥量与锁

2.1 std::mutex 基础

C++
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>

std::mutex mtx;  // std::mutex 互斥量,保护共享数据
int counter = 0;

void increment(int n) {
    for (int i = 0; i < n; ++i) {
        mtx.lock();
        ++counter;      // 临界区
        mtx.unlock();
    }
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i)
        threads.emplace_back(increment, 100000);
    for (auto& t : threads) t.join();

    std::cout << "Counter: " << counter << std::endl;  // 400000(正确)
    return 0;
}

2.2 lock_guard(RAII 自动加锁/解锁)

C++
#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx;
int shared_data = 0;

void safe_increment(int n) {
    for (int i = 0; i < n; ++i) {
        std::lock_guard<std::mutex> lock(mtx);  // 构造时加锁
        ++shared_data;
        // 离开作用域自动解锁,即使抛异常也安全
    }
}

int main() {
    std::thread t1(safe_increment, 100000);
    std::thread t2(safe_increment, 100000);
    t1.join(); t2.join();
    std::cout << "Result: " << shared_data << std::endl;  // 200000
    return 0;
}

2.3 unique_lock(灵活锁管理)

C++
#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx;

void flexible_locking() {
    // 延迟加锁
    std::unique_lock<std::mutex> lock(mtx, std::defer_lock);
    // ... 做一些不需要锁的工作 ...
    lock.lock();    // 手动加锁
    std::cout << "Locked section\n";
    lock.unlock();  // 手动解锁

    // 尝试加锁(非阻塞)
    if (lock.try_lock()) {
        std::cout << "Got the lock\n";
        lock.unlock();
    }

    // 可以转移所有权
    std::unique_lock<std::mutex> lock2(std::move(lock));
}

// 避免死锁:std::lock 同时锁定多个互斥量
std::mutex mtx_a, mtx_b;

void no_deadlock() {
    std::unique_lock<std::mutex> la(mtx_a, std::defer_lock);
    std::unique_lock<std::mutex> lb(mtx_b, std::defer_lock);
    std::lock(la, lb);  // 原子地锁定两个互斥量,避免死锁
    std::cout << "Both locked safely\n";
}

// C++17: std::scoped_lock(更简洁的多锁方案)
void scoped_lock_demo() {
    std::scoped_lock lock(mtx_a, mtx_b);  // 同时锁定,自动解锁
    std::cout << "scoped_lock: both locked\n";
}

2.4 shared_mutex(读写锁)

C++
#include <iostream>
#include <thread>
#include <shared_mutex>
#include <vector>
#include <map>

class ThreadSafeMap {
    mutable std::shared_mutex rw_mutex_;
    std::map<std::string, int> data_;

public:
    // 读操作 —— 共享锁,多个读者可并发
    int read(const std::string& key) const {
        std::shared_lock<std::shared_mutex> lock(rw_mutex_);
        auto it = data_.find(key);
        return (it != data_.end()) ? it->second : -1;
    }

    // 写操作 —— 独占锁,互斥访问
    void write(const std::string& key, int value) {
        std::unique_lock<std::shared_mutex> lock(rw_mutex_);
        data_[key] = value;
    }

    size_t size() const {
        std::shared_lock<std::shared_mutex> lock(rw_mutex_);
        return data_.size();
    }
};

int main() {
    ThreadSafeMap tsm;

    // 多个写者
    std::vector<std::thread> writers;
    for (int i = 0; i < 5; ++i)
        writers.emplace_back([&tsm, i]() {
            tsm.write("key" + std::to_string(i), i * 10);
        });
    for (auto& t : writers) t.join();

    // 多个读者并发
    std::vector<std::thread> readers;
    for (int i = 0; i < 5; ++i)
        readers.emplace_back([&tsm, i]() {
            std::cout << "key" << i << " = " << tsm.read("key" + std::to_string(i)) << "\n";
        });
    for (auto& t : readers) t.join();

    return 0;
}

3️⃣ 条件变量 std::condition_variable

C++
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>

std::mutex mtx;
std::condition_variable cv;
std::queue<int> data_queue;
bool finished = false;

void producer() {
    for (int i = 0; i < 10; ++i) {
        {
            std::lock_guard<std::mutex> lock(mtx);
            data_queue.push(i);
            std::cout << "Produced: " << i << "\n";
        }
        cv.notify_one();  // 通知一个等待的消费者
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
    {
        std::lock_guard<std::mutex> lock(mtx);
        finished = true;
    }
    cv.notify_all();  // 通知所有消费者:生产结束
}

void consumer(int id) {
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        // wait 接受谓词,防止虚假唤醒(spurious wakeup)
        cv.wait(lock, []{ return !data_queue.empty() || finished; });

        while (!data_queue.empty()) {
            int val = data_queue.front();
            data_queue.pop();
            std::cout << "Consumer " << id << " got: " << val << "\n";
        }

        if (finished && data_queue.empty()) break;
    }
}

int main() {
    std::thread prod(producer);
    std::thread cons1(consumer, 1);
    std::thread cons2(consumer, 2);

    prod.join();
    cons1.join();
    cons2.join();
    return 0;
}

要点cv.wait(lock, predicate) 等价于 while(!predicate()) cv.wait(lock);,能正确处理虚假唤醒。


4️⃣ std::async / std::future / std::promise

4.1 std::async 异步任务

C++
#include <iostream>
#include <future>
#include <cmath>

double heavy_computation(double x) {
    std::this_thread::sleep_for(std::chrono::milliseconds(200));
    return std::sqrt(x * x + 1.0);
}

int main() {
    // std::launch::async   → 强制新线程执行
    // std::launch::deferred → 延迟执行(调用 get() 时才执行)
    // 默认 = async | deferred,由实现决定

    auto f1 = std::async(std::launch::async, heavy_computation, 3.0);  // std::async 异步执行任务,返回future
    auto f2 = std::async(std::launch::async, heavy_computation, 4.0);

    // 主线程可以做其他事...
    std::cout << "Waiting for results...\n";

    // get() 阻塞直到结果就绪
    double r1 = f1.get();
    double r2 = f2.get();

    std::cout << "Results: " << r1 << ", " << r2 << std::endl;
    return 0;
}

4.2 std::promise / std::future 手动传值

C++
#include <iostream>
#include <thread>
#include <future>

void compute(std::promise<int> prom, int x) {
    try {
        if (x < 0) throw std::runtime_error("Negative input!");
        int result = x * x;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        prom.set_value(result);  // 设置结果
    } catch (...) {
        prom.set_exception(std::current_exception());  // 传递异常
    }
}

int main() {
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();

    std::thread t(compute, std::move(prom), 7);

    try {
        int result = fut.get();  // 阻塞等待结果
        std::cout << "Result: " << result << std::endl;  // 49
    } catch (const std::exception& e) {
        std::cerr << "Error: " << e.what() << std::endl;
    }

    t.join();
    return 0;
}

4.3 std::shared_future 多线程共享

C++
#include <iostream>
#include <thread>
#include <future>
#include <vector>

int main() {
    auto shared_fut = std::async(std::launch::async, []() {
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        return 42;
    }).share();  // 转为 shared_future

    // 多个线程可以同时 get()
    std::vector<std::thread> threads;
    for (int i = 0; i < 3; ++i) {
        threads.emplace_back([shared_fut, i]() {
            int val = shared_fut.get();
            std::cout << "Thread " << i << " got: " << val << "\n";
        });
    }
    for (auto& t : threads) t.join();
    return 0;
}

5️⃣ std::atomic 原子操作与内存序

5.1 原子变量基础

C++
#include <iostream>
#include <thread>
#include <atomic>
#include <vector>

std::atomic<int> counter{0};

void atomic_increment(int n) {
    for (int i = 0; i < n; ++i) {
        counter.fetch_add(1, std::memory_order_relaxed);
        // 等价于 counter++ (默认 memory_order_seq_cst)
    }
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i)
        threads.emplace_back(atomic_increment, 100000);
    for (auto& t : threads) t.join();

    std::cout << "Atomic counter: " << counter.load() << std::endl;  // 400000
    return 0;
}

5.2 CAS(Compare-And-Swap)操作

C++
#include <iostream>
#include <atomic>
#include <thread>
#include <vector>

std::atomic<int> value{0};

// 无锁递增——使用 CAS 循环
void cas_increment(int n) {
    for (int i = 0; i < n; ++i) {
        int expected = value.load(std::memory_order_relaxed);
        while (!value.compare_exchange_weak(
            expected, expected + 1,
            std::memory_order_release,
            std::memory_order_relaxed)) {
            // expected 自动更新为当前值,循环重试
        }
    }
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i)
        threads.emplace_back(cas_increment, 100000);
    for (auto& t : threads) t.join();
    std::cout << "CAS result: " << value.load() << std::endl;  // 400000
    return 0;
}

5.3 内存序详解

C++
#include <atomic>
#include <thread>
#include <iostream>
#include <cassert>

/*
 * C++ 内存序(Memory Order)—— 从宽松到严格
 *
 * memory_order_relaxed    - 仅保证原子性,不保证顺序
 * memory_order_acquire    - 本线程中,后续读写不会被重排到此操作之前
 * memory_order_release    - 本线程中,之前的读写不会被重排到此操作之后
 * memory_order_acq_rel    - acquire + release 的组合
 * memory_order_seq_cst    - 顺序一致性(默认,最严格,性能最低)
 *
 * 核心规则:
 * - release 写 与 acquire 读 配对,构成 happens-before 关系
 * - seq_cst 保证全局一致的操作顺序(所有线程看到相同的顺序)
 */

std::atomic<bool> ready{false};
std::atomic<int> data{0};

void writer() {
    data.store(42, std::memory_order_relaxed);    // ① 先写数据
    ready.store(true, std::memory_order_release);  // ② release 屏障
}

void reader() {
    while (!ready.load(std::memory_order_acquire)) {}  // ③ acquire 屏障
    int value = data.load(std::memory_order_relaxed);   // ④ 一定能看到 42
    assert(value == 42);  // 由 ②→③ 的 happens-before 保证
    std::cout << "Read value: " << value << std::endl;
}

int main() {
    std::thread t1(writer);
    std::thread t2(reader);
    t1.join(); t2.join();
    return 0;
}

5.4 自旋锁实现

C++
#include <atomic>
#include <thread>
#include <iostream>
#ifdef _MSC_VER
#include <intrin.h>   // MSVC: _mm_pause()
#endif

class SpinLock {
    std::atomic_flag flag_{};  // C++20: 默认初始化为 clear(ATOMIC_FLAG_INIT 已弃用)
public:
    void lock() {
        while (flag_.test_and_set(std::memory_order_acquire)) {
            // 自旋等待(pause 指令降低 CPU 功耗与总线争用)
            #if defined(__x86_64__) || defined(__i386__)
            __builtin_ia32_pause();       // GCC / Clang
            #elif defined(_M_X64) || defined(_M_IX86)
            _mm_pause();                  // MSVC
            #endif
        }
    }
    void unlock() {
        flag_.clear(std::memory_order_release);
    }
};

SpinLock spin;
int shared_counter = 0;

void spin_increment(int n) {
    for (int i = 0; i < n; ++i) {
        spin.lock();
        ++shared_counter;
        spin.unlock();
    }
}

int main() {
    std::thread t1(spin_increment, 100000);
    std::thread t2(spin_increment, 100000);
    t1.join(); t2.join();
    std::cout << "Spinlock counter: " << shared_counter << std::endl;
    return 0;
}

6️⃣ C++20 协程

6.1 协程基础概念

Text Only
传统函数:    调用 → 执行 → 返回 (一次性)
协程:        调用 → 执行 → 挂起 → 恢复 → 执行 → 挂起 → ... → 返回

关键字:
  co_await  — 挂起协程,等待异步操作完成
  co_yield  — 挂起并产出一个值(生成器模式)
  co_return — 完成协程并返回最终值

6.2 Generator 示例(co_yield)

C++
#include <coroutine>
#include <iostream>
#include <optional>

template<typename T>
struct Generator {
    struct promise_type {
        T current_value;

        Generator get_return_object() {
            return Generator{
                std::coroutine_handle<promise_type>::from_promise(*this)
            };
        }
        std::suspend_always initial_suspend() { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        std::suspend_always yield_value(T value) {
            current_value = value;
            return {};
        }
        void return_void() {}
        void unhandled_exception() { std::terminate(); }
    };

    std::coroutine_handle<promise_type> handle_;

    Generator(std::coroutine_handle<promise_type> h) : handle_(h) {}
    ~Generator() { if (handle_) handle_.destroy(); }

    // 禁止拷贝
    Generator(const Generator&) = delete;
    Generator& operator=(const Generator&) = delete;
    Generator(Generator&& other) : handle_(other.handle_) { other.handle_ = nullptr; }

    std::optional<T> next() {
        if (!handle_ || handle_.done()) return std::nullopt;
        handle_.resume();
        if (handle_.done()) return std::nullopt;
        return handle_.promise().current_value;
    }
};

// 使用 co_yield 生成斐波那契数列
Generator<uint64_t> fibonacci() {
    uint64_t a = 0, b = 1;
    while (true) {
        co_yield a;
        auto next = a + b;
        a = b;
        b = next;
    }
}

int main() {
    auto fib = fibonacci();
    for (int i = 0; i < 15; ++i) {
        auto val = fib.next();
        if (val) std::cout << *val << " ";
    }
    std::cout << std::endl;
    // 输出: 0 1 1 2 3 5 8 13 21 34 55 89 144 233 377
    return 0;
}

6.3 简易 Task(co_await / co_return)

C++
#include <coroutine>
#include <iostream>

struct Task {
    struct promise_type {
        int result_value;
        Task get_return_object() {
            return Task{std::coroutine_handle<promise_type>::from_promise(*this)};
        }
        std::suspend_never initial_suspend() { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        void return_value(int v) { result_value = v; }
        void unhandled_exception() { std::terminate(); }
    };

    std::coroutine_handle<promise_type> handle_;
    Task(std::coroutine_handle<promise_type> h) : handle_(h) {}
    ~Task() { if (handle_) handle_.destroy(); }

    int get_result() { return handle_.promise().result_value; }
};

Task compute_async(int x) {
    // 在实际应用中,co_await 可以等待 I/O 完成
    std::cout << "Computing " << x << " * " << x << "...\n";
    co_return x * x;
}

int main() {
    Task t = compute_async(7);
    std::cout << "Result: " << t.get_result() << std::endl;  // 49
    return 0;
}

7️⃣ 线程池实现

C++
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <queue>
#include <vector>
#include <future>

class ThreadPool {
public:
    explicit ThreadPool(size_t num_threads) : stop_(false) {
        for (size_t i = 0; i < num_threads; ++i) {
            workers_.emplace_back([this]() {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(queue_mutex_);
                        condition_.wait(lock, [this]{
                            return stop_ || !tasks_.empty();
                        });
                        if (stop_ && tasks_.empty()) return;
                        task = std::move(tasks_.front());
                        tasks_.pop();
                    }
                    task();
                }
            });
        }
    }

    // 提交任务,返回 future
    template<typename F, typename... Args>
    auto submit(F&& f, Args&&... args)
        -> std::future<decltype(f(args...))>
    {
        using return_type = decltype(f(args...));
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        std::future<return_type> result = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            if (stop_) throw std::runtime_error("submit on stopped pool");
            tasks_.emplace([task]() { (*task)(); });
        }
        condition_.notify_one();
        return result;
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            stop_ = true;
        }
        condition_.notify_all();
        for (auto& worker : workers_)
            worker.join();
    }

private:
    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex queue_mutex_;
    std::condition_variable condition_;
    bool stop_;
};

int main() {
    ThreadPool pool(4);

    // 提交多个任务
    std::vector<std::future<int>> results;
    for (int i = 0; i < 10; ++i) {
        results.push_back(pool.submit([i]() {
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
            return i * i;
        }));
    }

    for (auto& fut : results)
        std::cout << fut.get() << " ";
    std::cout << std::endl;
    // 输出: 0 1 4 9 16 25 36 49 64 81
    return 0;
}

8️⃣ 生产者-消费者模式(有界缓冲区)

C++
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
#include <optional>

template<typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(size_t capacity) : capacity_(capacity) {}

    void push(T item) {
        std::unique_lock<std::mutex> lock(mtx_);
        not_full_.wait(lock, [this]{ return queue_.size() < capacity_; });
        queue_.push(std::move(item));
        not_empty_.notify_one();
    }

    T pop() {
        std::unique_lock<std::mutex> lock(mtx_);
        not_empty_.wait(lock, [this]{ return !queue_.empty(); });
        T item = std::move(queue_.front());
        queue_.pop();
        not_full_.notify_one();
        return item;
    }

    // 带超时的出队
    std::optional<T> try_pop(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mtx_);
        if (!not_empty_.wait_for(lock, timeout, [this]{ return !queue_.empty(); }))
            return std::nullopt;
        T item = std::move(queue_.front());
        queue_.pop();
        not_full_.notify_one();
        return item;
    }

private:
    std::queue<T> queue_;
    size_t capacity_;
    std::mutex mtx_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
};

int main() {
    BoundedQueue<int> bq(5);  // 容量为 5

    // 3 个生产者
    std::vector<std::thread> producers;
    for (int p = 0; p < 3; ++p) {
        producers.emplace_back([&bq, p]() {
            for (int i = 0; i < 5; ++i) {
                int val = p * 100 + i;
                bq.push(val);
                std::cout << "P" << p << " -> " << val << "\n";
            }
        });
    }

    // 2 个消费者
    std::vector<std::thread> consumers;
    for (int c = 0; c < 2; ++c) {
        consumers.emplace_back([&bq, c]() {
            for (int i = 0; i < 7; ++i) {  // 每个消费者取 7 个
                int val = bq.pop();
                std::cout << "  C" << c << " <- " << val << "\n";
            }
        });
    }

    // 还有 1 个元素由主线程消费
    for (auto& t : producers) t.join();
    for (auto& t : consumers) t.join();
    int last = bq.pop();
    std::cout << "Main <- " << last << "\n";

    return 0;
}

9️⃣ 并发数据结构设计技巧

9.1 无锁栈(Lock-Free Stack)

C++
#include <atomic>
#include <memory>
#include <iostream>

template<typename T>
class LockFreeStack {
    struct Node {
        T data;
        Node* next;
        Node(T val) : data(std::move(val)), next(nullptr) {}
    };

    std::atomic<Node*> head_{nullptr};

public:
    void push(T val) {
        Node* new_node = new Node(std::move(val));
        new_node->next = head_.load(std::memory_order_relaxed);
        // CAS 循环:如果 head 未变,则更新成功
        while (!head_.compare_exchange_weak(
            new_node->next, new_node,
            std::memory_order_release,
            std::memory_order_relaxed)) {}
    }

    // ⚠️ ABA 问题 & Use-After-Free 风险:
    //   1. ABA:线程A读到 old_head=X,线程B pop X 再 push 新节点到同一地址X,
    //      线程A的 CAS 看到 head 仍是 X 便成功,但 X->next 已失效 → 数据结构损坏。
    //   2. Use-After-Free:线程A读到 old_head->next 之前,线程B 可能已 delete old_head。
    //   生产环境解决方案:Hazard Pointers、Epoch-Based Reclamation 或
    //   std::atomic<std::shared_ptr<Node>>(C++20)。
    bool pop(T& result) {
        Node* old_head = head_.load(std::memory_order_acquire);
        while (old_head && !head_.compare_exchange_weak(
            old_head, old_head->next,
            std::memory_order_acq_rel,
            std::memory_order_acquire)) {}
        if (!old_head) return false;
        result = std::move(old_head->data);
        delete old_head;  // 简化处理,生产环境需用 hazard pointer 或 RCU
        return true;
    }

    ~LockFreeStack() {
        T dummy;
        while (pop(dummy)) {}
    }
};

int main() {
    LockFreeStack<int> stack;
    stack.push(1);
    stack.push(2);
    stack.push(3);

    int val;
    while (stack.pop(val))
        std::cout << val << " ";  // 3 2 1
    std::cout << std::endl;
    return 0;
}

9.2 并发设计原则

Text Only
📌 并发数据结构设计要点:

1. 减小锁粒度
   - 全局一把锁 → 分段锁(如 ConcurrentHashMap 思路)
   - 读写分离 → shared_mutex

2. 无锁编程层次
   - Wait-free  > Lock-free  > Obstruction-free > Lock-based
   - 无锁并非总是更快,争用低时 mutex 可能更优

3. 避免伪共享(False Sharing)
   - 不同线程频繁访问同一缓存行的不同变量
   - 用 alignas(64) 或 std::hardware_destructive_interference_size 对齐

4. 注意 ABA 问题
   - CAS 中值 A→B→A,CAS 会误判"未变"
   - 解决:带版本号的指针(tagged pointer)

5. 内存回收
   - 无锁结构中删除节点需要安全回收
   - 方案:Hazard Pointer、Epoch-Based Reclamation、RCU
C++
// 伪共享示例与解决
#include <atomic>
#include <thread>
#include <iostream>
#include <chrono>

// ❌ 有伪共享:两个原子变量可能在同一缓存行
struct BadAlign {
    std::atomic<int> a{0};
    std::atomic<int> b{0};
};

// ✅ 避免伪共享:强制对齐到不同缓存行
struct GoodAlign {
    alignas(64) std::atomic<int> a{0};
    alignas(64) std::atomic<int> b{0};
};

template<typename T>
void benchmark(const char* name) {
    T data;
    auto start = std::chrono::high_resolution_clock::now();
    std::thread t1([&]{ for(int i=0;i<10000000;++i) data.a.fetch_add(1, std::memory_order_relaxed); });
    std::thread t2([&]{ for(int i=0;i<10000000;++i) data.b.fetch_add(1, std::memory_order_relaxed); });
    t1.join(); t2.join();
    auto end = std::chrono::high_resolution_clock::now();
    auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
    std::cout << name << ": " << ms << " ms\n";
}

int main() {
    benchmark<BadAlign>("BadAlign (false sharing)");
    benchmark<GoodAlign>("GoodAlign (no false sharing)");
    return 0;
}

🔟 读写锁完整实现

C++
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <thread>
#include <vector>

// 手动实现读写锁(不依赖 shared_mutex)
class ReadWriteLock {
    std::mutex mtx_;
    std::condition_variable cv_;
    int readers_ = 0;
    bool writer_active_ = false;
    int waiting_writers_ = 0;  // 写者优先

public:
    void lock_read() {
        std::unique_lock<std::mutex> lock(mtx_);
        // 写者优先:有等待的写者时,新读者需等待
        cv_.wait(lock, [this]{ return !writer_active_ && waiting_writers_ == 0; });
        ++readers_;
    }

    void unlock_read() {
        std::unique_lock<std::mutex> lock(mtx_);
        --readers_;
        if (readers_ == 0) cv_.notify_all();
    }

    void lock_write() {
        std::unique_lock<std::mutex> lock(mtx_);
        ++waiting_writers_;
        cv_.wait(lock, [this]{ return !writer_active_ && readers_ == 0; });
        --waiting_writers_;
        writer_active_ = true;
    }

    void unlock_write() {
        std::unique_lock<std::mutex> lock(mtx_);
        writer_active_ = false;
        cv_.notify_all();
    }
};

int main() {
    ReadWriteLock rwlock;
    int shared_data = 0;

    std::vector<std::thread> threads;

    // 5 个读者
    for (int i = 0; i < 5; ++i) {
        threads.emplace_back([&rwlock, &shared_data, i]() {
            for (int j = 0; j < 3; ++j) {
                rwlock.lock_read();
                std::cout << "Reader " << i << " reads: " << shared_data << "\n";
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
                rwlock.unlock_read();
            }
        });
    }

    // 2 个写者
    for (int i = 0; i < 2; ++i) {
        threads.emplace_back([&rwlock, &shared_data, i]() {
            for (int j = 0; j < 3; ++j) {
                rwlock.lock_write();
                ++shared_data;
                std::cout << "  Writer " << i << " wrote: " << shared_data << "\n";
                std::this_thread::sleep_for(std::chrono::milliseconds(20));
                rwlock.unlock_write();
            }
        });
    }

    for (auto& t : threads) t.join();
    return 0;
}

🎯 面试题精选

Q1: 死锁如何避免?

Text Only
死锁四个必要条件(Coffman 条件):
1. 互斥 —— 资源被独占
2. 持有并等待 —— 持有资源的同时等待其他资源
3. 不可抢占 —— 资源不可被强制释放
4. 循环等待 —— 存在资源的循环等待链

避免策略:
✅ 固定加锁顺序 —— 所有线程以相同顺序获取锁
✅ std::lock() / std::scoped_lock —— 原子地锁定多个互斥量
✅ try_lock 超时机制 —— 获取失败时释放已有锁并重试
✅ 减少锁粒度 —— 缩小临界区,减少冲突概率
✅ 无锁算法 —— 使用 atomic 操作替代 mutex

Q2: atomic 和 mutex 的区别?

Text Only
              atomic                         mutex
──────────────────────────────────────────────────────────
实现层级     硬件指令(CAS/LL-SC)           OS 内核(futex/临界区)
适用场景     简单变量的原子读/写/RMW         保护复杂的临界区代码段
开销         极低(纳秒级,无上下文切换)     较高(可能导致线程阻塞/唤醒)
阻塞方式     非阻塞(自旋)                  阻塞(线程挂起)
组合操作     不支持(仅单个变量原子)         支持(锁内可操作多个变量)

选择原则:
- 单个变量的简单操作 → atomic
- 多个变量的复合操作 → mutex
- 高争用场景 → mutex(自旋浪费 CPU)
- 低争用、性能敏感 → atomic

Q3: 什么是内存序?为什么需要它?

Text Only
CPU 和编译器为了优化性能,可能会重排指令执行顺序。
在单线程中这不会有问题,但在多线程中,重排可能导致一个线程
看到另一个线程的操作顺序与代码编写顺序不同。

内存序(Memory Order)就是告诉编译器和 CPU:
"这个原子操作和周围的操作之间的顺序约束是什么"

从宽松到严格:
  relaxed  → 仅原子性,无顺序保证(性能最好)
  acquire  → 此操作之后的读写不会被重排到之前
  release  → 此操作之前的读写不会被重排到之后
  seq_cst  → 全局顺序一致(默认,最安全,性能最差)

实践建议:没有充分理由不要使用非 seq_cst 内存序

Q4: std::condition_variable 为什么需要配合 unique_lock?

Text Only
1. wait() 内部需要原子地释放锁并挂起线程:
   - 释放锁 → 让其他线程能修改条件
   - 挂起线程 → 避免忙等
   这两步必须是原子的,否则存在竞态条件

2. unique_lock 支持 lock()/unlock() 动态操作
   lock_guard 不支持,所以不能用

3. 被唤醒后,wait() 会自动重新获取锁
   然后检查谓词(防虚假唤醒),这也需要持有锁

Q5: 什么是虚假唤醒(Spurious Wakeup)?

Text Only
操作系统实现中,condition_variable::wait() 可能在没有
notify 的情况下返回。这不是 bug,而是底层实现的特性。

解决方案:始终使用带谓词的 wait:
    cv.wait(lock, []{ return condition_met; });

等价于:
    while (!condition_met) cv.wait(lock);

这样即使虚假唤醒,也会重新检查条件。

Q6: std::async 的 launch 策略有什么区别?

Text Only
std::launch::async    → 保证在新线程上执行
std::launch::deferred → 延迟执行,在 get()/wait() 时在调用线程执行
默认(async | deferred)→ 由实现决定

⚠️ 陷阱:
- 默认策略下 future 析构会阻塞(等待任务完成)
- deferred 会在 get() 的线程执行,可能不是你想要的
- 建议:明确指定 std::launch::async,避免意外

Q7: 线程池相比直接创建线程有什么优势?

Text Only
1. 避免频繁创建/销毁线程的开销(线程创建涉及 OS 资源分配)
2. 控制并发度,防止线程过多耗尽系统资源
3. 任务排队机制,支持背压策略
4. 线程复用,减少上下文切换
5. 统一的错误处理和生命周期管理

适用场景:
  - Web 服务器处理请求
  - 批量数据处理
  - 任何需要频繁执行短时任务的场景

Q8: compare_exchange_weak 和 compare_exchange_strong 的区别?

Text Only
weak:   可能虚假失败(即使预期值匹配也可能失败)
        → 用在循环中,性能更好(某些架构上避免内存屏障)

strong: 保证只在预期值不匹配时才失败
        → 用在非循环场景,或者失败重试代价很高时

一般实践:
  while(!x.compare_exchange_weak(exp, desired)) {} // ← 适合 weak
  if (x.compare_exchange_strong(exp, desired)) {}   // ← 单次尝试用 strong

Q9: C++20 的 jthread 相比 thread 有何改进?

Text Only
1. 自动 join:析构时自动调用 join(),不会忘记 join 导致 terminate
2. 协作取消:内置 stop_token/stop_source 机制
   - request_stop() —— 请求取消
   - stop_token.stop_requested() —— 检查是否收到取消请求
3. 可中断的条件变量等待(condition_variable_any + stop_token)

本质上是 thread + RAII + 协作取消 的组合

✅ 学习检查清单

主题 掌握情况
std::thread 创建/join/detach
线程参数传递(值/引用/移动)
std::mutex 基本用法
lock_guard / unique_lock / scoped_lock
shared_mutex 读写锁
condition_variable 条件变量
std::async / future / promise
std::atomic 原子操作
内存序模型(relaxed/acquire/release/seq_cst)
CAS 操作与自旋锁
C++20 协程(co_await/co_yield/co_return)
C++20 jthread 与协作取消
线程池实现
生产者—消费者模式
无锁数据结构
伪共享、ABA 问题等陷阱
死锁的四个条件与避免策略
能回答 8 道以上并发面试题

📚 推荐阅读


下一步:学完并发编程后,建议结合 15-CMake与工程实践.md 搭建多线程项目,并通过 ThreadSanitizer(-fsanitize=thread)检测数据竞争。