🔄 C++ Concurrency¶
📋 Learning Objectives¶
After working through this chapter, you will be able to:
- Create and manage std::thread objects and pass arguments to them
- Understand the mutex family and RAII lock management
- Synchronize threads with condition variables
- Use std::async / std::future / std::promise for asynchronous programming
- Master atomic operations and the memory-order model
- Understand the basics of C++20 coroutines
- Independently implement classic concurrency patterns such as thread pools and producer-consumer
- Answer common concurrency interview questions
The diagram above places threads, synchronization, atomics, and coroutines in a single view, giving you a map of the concurrency landscape.
1️⃣ std::thread Basics¶
1.1 Creating and Managing Threads¶
C++
#include <iostream>
#include <thread>
void hello() {
std::cout << "Hello from thread "
<< std::this_thread::get_id() << std::endl;
}
int main() {
std::thread t(hello); // create a thread
std::cout << "Main thread: " << std::this_thread::get_id() << std::endl;
std::cout << "Hardware concurrency: "
<< std::thread::hardware_concurrency() << std::endl;
t.join(); // wait for the thread to finish
return 0;
}
1.2 join and detach¶
C++
#include <iostream>
#include <thread>
#include <chrono>
void worker(int id, int ms) {
std::this_thread::sleep_for(std::chrono::milliseconds(ms));
std::cout << "Worker " << id << " done\n";
}
int main() {
// join: the main thread waits for the worker to finish
std::thread t1(worker, 1, 100);
t1.join(); // blocks until t1 completes
// detach: the thread runs independently in the background
std::thread t2(worker, 2, 50);
t2.detach(); // t2 runs in the background, unbound from the main thread's lifetime
// ⚠️ Note: a thread object must be joined or detached before it is destroyed, otherwise std::terminate is called
// std::thread t3(worker, 3, 10); // never joined/detached → program terminates
std::this_thread::sleep_for(std::chrono::milliseconds(200));
return 0;
}
1.3 Passing Arguments to Threads¶
C++
#include <iostream>
#include <thread>
#include <string>
// pass by value
void by_value(int x) {
x = 100;
}
// pass by reference — std::ref is required at the call site
void by_ref(int& x) {
x = 100;
}
// move semantics
void by_move(std::string s) {
std::cout << "Moved string: " << s << std::endl;
}
// member function
struct Task {
void run(int id) {
std::cout << "Task::run id=" << id << std::endl;
}
};
int main() {
// by value
int a = 0;
std::thread t1(by_value, a);
t1.join();
std::cout << "a after by_value: " << a << std::endl; // 0 (unchanged)
// by reference
int b = 0;
std::thread t2(by_ref, std::ref(b)); // ⚠️ std::ref is required
t2.join();
std::cout << "b after by_ref: " << b << std::endl; // 100
// by move
std::string str = "Hello Concurrency";
std::thread t3(by_move, std::move(str)); // std::move casts the lvalue to an rvalue reference, transferring ownership
t3.join();
// str has been moved from — do not use it again
// member function
Task task;
std::thread t4(&Task::run, &task, 42);
t4.join();
// lambda
int val = 10;
std::thread t5([&val]() { val += 5; });
t5.join();
std::cout << "val after lambda: " << val << std::endl; // 15
return 0;
}
1.4 RAII Thread Management (C++20 jthread)¶
C++
#include <iostream>
#include <thread>
#include <chrono>
#include <stop_token>
// C++20 jthread: automatic join + cooperative cancellation
void cancellable_work(std::stop_token stoken, int id) {
while (!stoken.stop_requested()) {
std::cout << "Worker " << id << " running...\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
std::cout << "Worker " << id << " stopped.\n";
}
int main() {
std::jthread jt(cancellable_work, 1);
std::this_thread::sleep_for(std::chrono::milliseconds(350));
jt.request_stop(); // ask the worker to stop
// jt joins automatically in its destructor — no manual join needed
return 0;
}
2️⃣ Mutexes and Locks¶
2.1 std::mutex Basics¶
C++
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
std::mutex mtx; // mutex protecting the shared counter
int counter = 0;
void increment(int n) {
for (int i = 0; i < n; ++i) {
mtx.lock();
++counter; // critical section
mtx.unlock();
}
}
int main() {
std::vector<std::thread> threads;
for (int i = 0; i < 4; ++i)
threads.emplace_back(increment, 100000);
for (auto& t : threads) t.join();
std::cout << "Counter: " << counter << std::endl; // 400000 (correct)
return 0;
}
2.2 lock_guard (RAII Lock/Unlock)¶
C++
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mtx;
int shared_data = 0;
void safe_increment(int n) {
for (int i = 0; i < n; ++i) {
std::lock_guard<std::mutex> lock(mtx); // locks on construction
++shared_data;
// unlocked automatically when the scope ends, even if an exception is thrown
}
}
int main() {
std::thread t1(safe_increment, 100000);
std::thread t2(safe_increment, 100000);
t1.join(); t2.join();
std::cout << "Result: " << shared_data << std::endl; // 200000
return 0;
}
2.3 unique_lock (Flexible Lock Management)¶
C++
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mtx;
void flexible_locking() {
// deferred locking
std::unique_lock<std::mutex> lock(mtx, std::defer_lock);
// ... do work that doesn't need the lock ...
lock.lock(); // lock manually
std::cout << "Locked section\n";
lock.unlock(); // unlock manually
// try to acquire without blocking
if (lock.try_lock()) {
std::cout << "Got the lock\n";
lock.unlock();
}
// lock ownership can be transferred
std::unique_lock<std::mutex> lock2(std::move(lock));
}
// Deadlock avoidance: std::lock acquires multiple mutexes at once
std::mutex mtx_a, mtx_b;
void no_deadlock() {
std::unique_lock<std::mutex> la(mtx_a, std::defer_lock);
std::unique_lock<std::mutex> lb(mtx_b, std::defer_lock);
std::lock(la, lb); // locks both mutexes with a deadlock-avoidance algorithm
std::cout << "Both locked safely\n";
}
// C++17: std::scoped_lock — the concise multi-lock option
void scoped_lock_demo() {
std::scoped_lock lock(mtx_a, mtx_b); // locks both, unlocks automatically
std::cout << "scoped_lock: both locked\n";
}
2.4 shared_mutex (Reader-Writer Lock)¶
C++
#include <iostream>
#include <thread>
#include <shared_mutex>
#include <string>
#include <vector>
#include <map>
class ThreadSafeMap {
mutable std::shared_mutex rw_mutex_;
std::map<std::string, int> data_;
public:
// read — shared lock, multiple readers run concurrently
int read(const std::string& key) const {
std::shared_lock<std::shared_mutex> lock(rw_mutex_);
auto it = data_.find(key);
return (it != data_.end()) ? it->second : -1;
}
// write — exclusive lock
void write(const std::string& key, int value) {
std::unique_lock<std::shared_mutex> lock(rw_mutex_);
data_[key] = value;
}
size_t size() const {
std::shared_lock<std::shared_mutex> lock(rw_mutex_);
return data_.size();
}
};
int main() {
ThreadSafeMap tsm;
// several writers
std::vector<std::thread> writers;
for (int i = 0; i < 5; ++i)
writers.emplace_back([&tsm, i]() {
tsm.write("key" + std::to_string(i), i * 10);
});
for (auto& t : writers) t.join();
// several concurrent readers
std::vector<std::thread> readers;
for (int i = 0; i < 5; ++i)
readers.emplace_back([&tsm, i]() {
std::cout << "key" << i << " = " << tsm.read("key" + std::to_string(i)) << "\n";
});
for (auto& t : readers) t.join();
return 0;
}
3️⃣ Condition Variables: std::condition_variable¶
C++
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <queue>
std::mutex mtx;
std::condition_variable cv;
std::queue<int> data_queue;
bool finished = false;
void producer() {
for (int i = 0; i < 10; ++i) {
{
std::lock_guard<std::mutex> lock(mtx);
data_queue.push(i);
std::cout << "Produced: " << i << "\n";
}
cv.notify_one(); // wake one waiting consumer
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
{
std::lock_guard<std::mutex> lock(mtx);
finished = true;
}
cv.notify_all(); // tell all consumers that production has finished
}
void consumer(int id) {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
// wait takes a predicate, which guards against spurious wakeups
cv.wait(lock, []{ return !data_queue.empty() || finished; });
while (!data_queue.empty()) {
int val = data_queue.front();
data_queue.pop();
std::cout << "Consumer " << id << " got: " << val << "\n";
}
if (finished && data_queue.empty()) break;
}
}
int main() {
std::thread prod(producer);
std::thread cons1(consumer, 1);
std::thread cons2(consumer, 2);
prod.join();
cons1.join();
cons2.join();
return 0;
}
Key point:
cv.wait(lock, predicate) is equivalent to while (!predicate()) cv.wait(lock); — it handles spurious wakeups correctly.
4️⃣ std::async / std::future / std::promise¶
4.1 std::async Asynchronous Tasks¶
C++
#include <iostream>
#include <thread>
#include <chrono>
#include <future>
#include <cmath>
double heavy_computation(double x) {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
return std::sqrt(x * x + 1.0);
}
int main() {
// std::launch::async → force execution on a new thread
// std::launch::deferred → lazy execution (runs only when get() is called)
// default = async | deferred — the implementation decides
auto f1 = std::async(std::launch::async, heavy_computation, 3.0); // runs asynchronously and returns a future
auto f2 = std::async(std::launch::async, heavy_computation, 4.0);
// the main thread is free to do other work...
std::cout << "Waiting for results...\n";
// get() blocks until the result is ready
double r1 = f1.get();
double r2 = f2.get();
std::cout << "Results: " << r1 << ", " << r2 << std::endl;
return 0;
}
4.2 std::promise / std::future: Passing Values Manually¶
C++
#include <iostream>
#include <thread>
#include <chrono>
#include <stdexcept>
#include <future>
void compute(std::promise<int> prom, int x) {
try {
if (x < 0) throw std::runtime_error("Negative input!");
int result = x * x;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
prom.set_value(result); // publish the result
} catch (...) {
prom.set_exception(std::current_exception()); // forward the exception to the future
}
}
int main() {
std::promise<int> prom;
std::future<int> fut = prom.get_future();
std::thread t(compute, std::move(prom), 7);
try {
int result = fut.get(); // block until the result is ready
std::cout << "Result: " << result << std::endl; // 49
} catch (const std::exception& e) {
std::cerr << "Error: " << e.what() << std::endl;
}
t.join();
return 0;
}
4.3 std::shared_future: Sharing a Result Across Threads¶
C++
#include <iostream>
#include <thread>
#include <chrono>
#include <future>
#include <vector>
int main() {
auto shared_fut = std::async(std::launch::async, []() {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
return 42;
}).share(); // convert the future into a shared_future
// multiple threads may call get() concurrently
std::vector<std::thread> threads;
for (int i = 0; i < 3; ++i) {
threads.emplace_back([shared_fut, i]() {
int val = shared_fut.get();
std::cout << "Thread " << i << " got: " << val << "\n";
});
}
for (auto& t : threads) t.join();
return 0;
}
5️⃣ std::atomic: Atomic Operations and Memory Order¶
5.1 Atomic Variable Basics¶
C++
#include <iostream>
#include <thread>
#include <atomic>
#include <vector>
std::atomic<int> counter{0};
void atomic_increment(int n) {
for (int i = 0; i < n; ++i) {
counter.fetch_add(1, std::memory_order_relaxed);
// equivalent to counter++ (which uses memory_order_seq_cst by default)
}
}
int main() {
std::vector<std::thread> threads;
for (int i = 0; i < 4; ++i)
threads.emplace_back(atomic_increment, 100000);
for (auto& t : threads) t.join();
std::cout << "Atomic counter: " << counter.load() << std::endl; // 400000
return 0;
}
5.2 CAS (Compare-And-Swap)¶
C++
#include <iostream>
#include <atomic>
#include <thread>
#include <vector>
std::atomic<int> value{0};
// lock-free increment via a CAS loop
void cas_increment(int n) {
for (int i = 0; i < n; ++i) {
int expected = value.load(std::memory_order_relaxed);
while (!value.compare_exchange_weak(
expected, expected + 1,
std::memory_order_release,
std::memory_order_relaxed)) {
// on failure, expected is reloaded with the current value; retry
}
}
}
int main() {
std::vector<std::thread> threads;
for (int i = 0; i < 4; ++i)
threads.emplace_back(cas_increment, 100000);
for (auto& t : threads) t.join();
std::cout << "CAS result: " << value.load() << std::endl; // 400000
return 0;
}
5.3 Memory Order in Detail¶
C++
#include <atomic>
#include <thread>
#include <iostream>
#include <cassert>
/*
* C++ memory orders — from weakest to strongest
*
* memory_order_relaxed - atomicity only, no ordering guarantees
* memory_order_acquire - in this thread, no later read/write may be reordered before this operation
* memory_order_release - in this thread, no earlier read/write may be reordered after this operation
* memory_order_acq_rel - acquire + release combined
* memory_order_seq_cst - sequential consistency (the default; strictest, slowest)
*
* Core rules:
* - a release store paired with an acquire load establishes a happens-before relationship
* - seq_cst yields one global order of operations that every thread observes
*/
std::atomic<bool> ready{false};
std::atomic<int> data{0};
void writer() {
data.store(42, std::memory_order_relaxed); // ① write the data first
ready.store(true, std::memory_order_release); // ② release barrier
}
void reader() {
while (!ready.load(std::memory_order_acquire)) {} // ③ acquire barrier
int value = data.load(std::memory_order_relaxed); // ④ guaranteed to see 42
assert(value == 42); // guaranteed by the ②→③ happens-before edge
std::cout << "Read value: " << value << std::endl;
}
int main() {
std::thread t1(writer);
std::thread t2(reader);
t1.join(); t2.join();
return 0;
}
5.4 Implementing a Spinlock¶
C++
#include <atomic>
#include <thread>
#include <iostream>
#ifdef _MSC_VER
#include <intrin.h> // MSVC: _mm_pause()
#endif
class SpinLock {
std::atomic_flag flag_{}; // C++20: default-initializes to clear (ATOMIC_FLAG_INIT is deprecated)
public:
void lock() {
while (flag_.test_and_set(std::memory_order_acquire)) {
// spin-wait (the pause instruction reduces power draw and bus contention)
#if defined(__x86_64__) || defined(__i386__)
__builtin_ia32_pause(); // GCC / Clang
#elif defined(_M_X64) || defined(_M_IX86)
_mm_pause(); // MSVC
#endif
}
}
void unlock() {
flag_.clear(std::memory_order_release);
}
};
SpinLock spin;
int shared_counter = 0;
void spin_increment(int n) {
for (int i = 0; i < n; ++i) {
spin.lock();
++shared_counter;
spin.unlock();
}
}
int main() {
std::thread t1(spin_increment, 100000);
std::thread t2(spin_increment, 100000);
t1.join(); t2.join();
std::cout << "Spinlock counter: " << shared_counter << std::endl;
return 0;
}
6️⃣ C++20 Coroutines¶
6.1 Coroutine Basics¶
Text Only
Ordinary function: call → run → return (one shot)
Coroutine: call → run → suspend → resume → run → suspend → ... → return
Keywords:
co_await — suspend the coroutine until an asynchronous operation completes
co_yield — suspend and produce a value (generator pattern)
co_return — finish the coroutine, optionally with a final value
6.2 A Generator (co_yield)¶
C++
#include <coroutine>
#include <cstdint>
#include <iostream>
#include <optional>
template<typename T>
struct Generator {
struct promise_type {
T current_value;
Generator get_return_object() {
return Generator{
std::coroutine_handle<promise_type>::from_promise(*this)
};
}
std::suspend_always initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
std::suspend_always yield_value(T value) {
current_value = value;
return {};
}
void return_void() {}
void unhandled_exception() { std::terminate(); }
};
std::coroutine_handle<promise_type> handle_;
Generator(std::coroutine_handle<promise_type> h) : handle_(h) {}
~Generator() { if (handle_) handle_.destroy(); }
// non-copyable
Generator(const Generator&) = delete;
Generator& operator=(const Generator&) = delete;
Generator(Generator&& other) : handle_(other.handle_) { other.handle_ = nullptr; }
std::optional<T> next() {
if (!handle_ || handle_.done()) return std::nullopt;
handle_.resume();
if (handle_.done()) return std::nullopt;
return handle_.promise().current_value;
}
};
// generate Fibonacci numbers with co_yield
Generator<uint64_t> fibonacci() {
uint64_t a = 0, b = 1;
while (true) {
co_yield a;
auto next = a + b;
a = b;
b = next;
}
}
int main() {
auto fib = fibonacci();
for (int i = 0; i < 15; ++i) {
auto val = fib.next();
if (val) std::cout << *val << " ";
}
std::cout << std::endl;
// output: 0 1 1 2 3 5 8 13 21 34 55 89 144 233 377
return 0;
}
6.3 A Minimal Task (co_await / co_return)¶
C++
#include <coroutine>
#include <iostream>
struct Task {
struct promise_type {
int result_value;
Task get_return_object() {
return Task{std::coroutine_handle<promise_type>::from_promise(*this)};
}
std::suspend_never initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void return_value(int v) { result_value = v; }
void unhandled_exception() { std::terminate(); }
};
std::coroutine_handle<promise_type> handle_;
Task(std::coroutine_handle<promise_type> h) : handle_(h) {}
~Task() { if (handle_) handle_.destroy(); }
int get_result() { return handle_.promise().result_value; }
};
Task compute_async(int x) {
// in real code, co_await would wait here for I/O to complete
std::cout << "Computing " << x << " * " << x << "...\n";
co_return x * x;
}
int main() {
Task t = compute_async(7);
std::cout << "Result: " << t.get_result() << std::endl; // 49
return 0;
}
7️⃣ Implementing a Thread Pool¶
C++
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <type_traits>
#include <chrono>
#include <queue>
#include <vector>
#include <future>
class ThreadPool {
public:
explicit ThreadPool(size_t num_threads) : stop_(false) {
for (size_t i = 0; i < num_threads; ++i) {
workers_.emplace_back([this]() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex_);
condition_.wait(lock, [this]{
return stop_ || !tasks_.empty();
});
if (stop_ && tasks_.empty()) return;
task = std::move(tasks_.front());
tasks_.pop();
}
task();
}
});
}
}
// submit a task; returns a future for its result
template<typename F, typename... Args>
auto submit(F&& f, Args&&... args)
-> std::future<std::invoke_result_t<F, Args...>>
{
using return_type = std::invoke_result_t<F, Args...>;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> result = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex_);
if (stop_) throw std::runtime_error("submit on stopped pool");
tasks_.emplace([task]() { (*task)(); });
}
condition_.notify_one();
return result;
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex_);
stop_ = true;
}
condition_.notify_all();
for (auto& worker : workers_)
worker.join();
}
private:
std::vector<std::thread> workers_;
std::queue<std::function<void()>> tasks_;
std::mutex queue_mutex_;
std::condition_variable condition_;
bool stop_;
};
int main() {
ThreadPool pool(4);
// submit several tasks
std::vector<std::future<int>> results;
for (int i = 0; i < 10; ++i) {
results.push_back(pool.submit([i]() {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
return i * i;
}));
}
for (auto& fut : results)
std::cout << fut.get() << " ";
std::cout << std::endl;
// output: 0 1 4 9 16 25 36 49 64 81
return 0;
}
8️⃣ Producer-Consumer (Bounded Buffer)¶
C++
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <queue>
#include <vector>
#include <optional>
template<typename T>
class BoundedQueue {
public:
explicit BoundedQueue(size_t capacity) : capacity_(capacity) {}
void push(T item) {
std::unique_lock<std::mutex> lock(mtx_);
not_full_.wait(lock, [this]{ return queue_.size() < capacity_; });
queue_.push(std::move(item));
not_empty_.notify_one();
}
T pop() {
std::unique_lock<std::mutex> lock(mtx_);
not_empty_.wait(lock, [this]{ return !queue_.empty(); });
T item = std::move(queue_.front());
queue_.pop();
not_full_.notify_one();
return item;
}
// pop with a timeout
std::optional<T> try_pop(std::chrono::milliseconds timeout) {
std::unique_lock<std::mutex> lock(mtx_);
if (!not_empty_.wait_for(lock, timeout, [this]{ return !queue_.empty(); }))
return std::nullopt;
T item = std::move(queue_.front());
queue_.pop();
not_full_.notify_one();
return item;
}
private:
std::queue<T> queue_;
size_t capacity_;
std::mutex mtx_;
std::condition_variable not_full_;
std::condition_variable not_empty_;
};
int main() {
BoundedQueue<int> bq(5); // capacity 5
// 3 producers
std::vector<std::thread> producers;
for (int p = 0; p < 3; ++p) {
producers.emplace_back([&bq, p]() {
for (int i = 0; i < 5; ++i) {
int val = p * 100 + i;
bq.push(val);
std::cout << "P" << p << " -> " << val << "\n";
}
});
}
// 2 consumers
std::vector<std::thread> consumers;
for (int c = 0; c < 2; ++c) {
consumers.emplace_back([&bq, c]() {
for (int i = 0; i < 7; ++i) { // each consumer takes 7 items
int val = bq.pop();
std::cout << " C" << c << " <- " << val << "\n";
}
});
}
// one item remains for the main thread to consume
for (auto& t : producers) t.join();
for (auto& t : consumers) t.join();
int last = bq.pop();
std::cout << "Main <- " << last << "\n";
return 0;
}
9️⃣ Concurrent Data Structure Design Techniques¶
9.1 Lock-Free Stack¶
C++
#include <atomic>
#include <memory>
#include <iostream>
template<typename T>
class LockFreeStack {
struct Node {
T data;
Node* next;
Node(T val) : data(std::move(val)), next(nullptr) {}
};
std::atomic<Node*> head_{nullptr};
public:
void push(T val) {
Node* new_node = new Node(std::move(val));
new_node->next = head_.load(std::memory_order_relaxed);
// CAS loop: succeeds only if head has not changed meanwhile
while (!head_.compare_exchange_weak(
new_node->next, new_node,
std::memory_order_release,
std::memory_order_relaxed)) {}
}
// ⚠️ ABA & use-after-free hazards:
// 1. ABA: thread A reads old_head = X; thread B pops X and then pushes a new node at the same address X;
//    A's CAS sees head still equal to X and succeeds, but X->next is stale → corrupted structure.
// 2. Use-after-free: thread B may delete old_head before thread A reads old_head->next.
// Production-grade fixes: hazard pointers, epoch-based reclamation, or
// std::atomic<std::shared_ptr<Node>> (C++20).
bool pop(T& result) {
Node* old_head = head_.load(std::memory_order_acquire);
while (old_head && !head_.compare_exchange_weak(
old_head, old_head->next,
std::memory_order_acq_rel,
std::memory_order_acquire)) {}
if (!old_head) return false;
result = std::move(old_head->data);
delete old_head; // simplified; production code needs hazard pointers or RCU
return true;
}
~LockFreeStack() {
T dummy;
while (pop(dummy)) {}
}
};
int main() {
LockFreeStack<int> stack;
stack.push(1);
stack.push(2);
stack.push(3);
int val;
while (stack.pop(val))
std::cout << val << " "; // 3 2 1
std::cout << std::endl;
return 0;
}
9.2 Concurrency Design Principles¶
Text Only
📌 Key points when designing concurrent data structures:
1. Reduce lock granularity
   - one global lock → striped/segmented locks (the ConcurrentHashMap approach)
   - separate readers from writers → shared_mutex
2. The lock-free hierarchy
   - Wait-free > Lock-free > Obstruction-free > Lock-based
   - lock-free is not always faster; under low contention a mutex may win
3. Avoid false sharing
   - different threads frequently touching different variables on the same cache line
   - align with alignas(64) or std::hardware_destructive_interference_size
4. Beware the ABA problem
   - a value goes A→B→A and CAS wrongly concludes "unchanged"
   - fix: version-tagged pointers
5. Memory reclamation
   - deleting nodes in lock-free structures requires safe reclamation
   - options: hazard pointers, epoch-based reclamation, RCU
C++
// false-sharing demo and fix
#include <atomic>
#include <thread>
#include <iostream>
#include <chrono>
// ❌ false sharing: both atomics may land on the same cache line
struct BadAlign {
std::atomic<int> a{0};
std::atomic<int> b{0};
};
// ✅ fixed: force each atomic onto its own cache line
struct GoodAlign {
alignas(64) std::atomic<int> a{0};
alignas(64) std::atomic<int> b{0};
};
template<typename T>
void benchmark(const char* name) {
T data;
auto start = std::chrono::high_resolution_clock::now();
std::thread t1([&]{ for(int i=0;i<10000000;++i) data.a.fetch_add(1, std::memory_order_relaxed); });
std::thread t2([&]{ for(int i=0;i<10000000;++i) data.b.fetch_add(1, std::memory_order_relaxed); });
t1.join(); t2.join();
auto end = std::chrono::high_resolution_clock::now();
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
std::cout << name << ": " << ms << " ms\n";
}
int main() {
benchmark<BadAlign>("BadAlign (false sharing)");
benchmark<GoodAlign>("GoodAlign (no false sharing)");
return 0;
}
🔟 A Complete Reader-Writer Lock Implementation¶
C++
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <thread>
#include <chrono>
#include <vector>
// a hand-rolled reader-writer lock (without shared_mutex)
class ReadWriteLock {
std::mutex mtx_;
std::condition_variable cv_;
int readers_ = 0;
bool writer_active_ = false;
int waiting_writers_ = 0; // writer preference
public:
void lock_read() {
std::unique_lock<std::mutex> lock(mtx_);
// writer preference: new readers wait while any writer is queued
cv_.wait(lock, [this]{ return !writer_active_ && waiting_writers_ == 0; });
++readers_;
}
void unlock_read() {
std::unique_lock<std::mutex> lock(mtx_);
--readers_;
if (readers_ == 0) cv_.notify_all();
}
void lock_write() {
std::unique_lock<std::mutex> lock(mtx_);
++waiting_writers_;
cv_.wait(lock, [this]{ return !writer_active_ && readers_ == 0; });
--waiting_writers_;
writer_active_ = true;
}
void unlock_write() {
std::unique_lock<std::mutex> lock(mtx_);
writer_active_ = false;
cv_.notify_all();
}
};
int main() {
ReadWriteLock rwlock;
int shared_data = 0;
std::vector<std::thread> threads;
// 5 readers
for (int i = 0; i < 5; ++i) {
threads.emplace_back([&rwlock, &shared_data, i]() {
for (int j = 0; j < 3; ++j) {
rwlock.lock_read();
std::cout << "Reader " << i << " reads: " << shared_data << "\n";
std::this_thread::sleep_for(std::chrono::milliseconds(10));
rwlock.unlock_read();
}
});
}
// 2 writers
for (int i = 0; i < 2; ++i) {
threads.emplace_back([&rwlock, &shared_data, i]() {
for (int j = 0; j < 3; ++j) {
rwlock.lock_write();
++shared_data;
std::cout << " Writer " << i << " wrote: " << shared_data << "\n";
std::this_thread::sleep_for(std::chrono::milliseconds(20));
rwlock.unlock_write();
}
});
}
for (auto& t : threads) t.join();
return 0;
}
🎯 Selected Interview Questions¶
Q1: How do you avoid deadlock?¶
Text Only
The four necessary conditions for deadlock (the Coffman conditions):
1. Mutual exclusion — resources are held exclusively
2. Hold and wait — a thread holds resources while waiting for more
3. No preemption — resources cannot be forcibly reclaimed
4. Circular wait — a cycle of threads, each waiting on the next one's resource
Avoidance strategies:
✅ Fixed lock ordering — all threads acquire locks in the same order
✅ std::lock() / std::scoped_lock — acquire multiple mutexes with a deadlock-avoidance algorithm
✅ try_lock with timeout — on failure, release the locks you hold and retry
✅ Smaller lock granularity — shrink critical sections to reduce contention
✅ Lock-free algorithms — replace mutexes with atomic operations
Q2: What is the difference between atomic and mutex?¶
                 atomic                              mutex
──────────────────────────────────────────────────────────
Implementation   hardware instructions (CAS/LL-SC)   OS kernel (futex / critical section)
Use case         atomic read/write/RMW of one var    protecting complex critical sections
Overhead         very low (ns-scale, no ctx switch)  higher (may block/wake threads)
Blocking         non-blocking (spins)                blocking (thread is suspended)
Compound ops     no (a single variable only)         yes (several variables under one lock)
Rules of thumb:
- simple operations on a single variable → atomic
- compound operations over several variables → mutex
- high contention → mutex (spinning wastes CPU)
- low contention, performance-sensitive → atomic
Q3: What is memory order, and why do we need it?¶
To optimize performance, the CPU and the compiler may reorder instructions.
Within a single thread this is invisible, but across threads, one thread may
observe another thread's operations in a different order than written.
A memory order tells the compiler and CPU:
"these are the ordering constraints between this atomic operation and the
operations around it."
From weakest to strongest:
relaxed → atomicity only, no ordering guarantees (fastest)
acquire → later reads/writes cannot be reordered before this operation
release → earlier reads/writes cannot be reordered after this operation
seq_cst → one globally consistent order (the default; safest, slowest)
Practical advice: stick with seq_cst unless you have a solid, measured reason not to.
Q4: Why does std::condition_variable require a unique_lock?¶
1. wait() must atomically release the lock and suspend the thread:
   - releasing the lock lets other threads modify the condition
   - suspending the thread avoids busy-waiting
   These two steps must happen atomically, or a race condition ensues.
2. unique_lock supports lock()/unlock() at runtime;
   lock_guard does not, so it cannot be used.
3. After being woken, wait() reacquires the lock automatically,
   then rechecks the predicate (against spurious wakeups) — which also requires holding the lock.
Q5: What is a spurious wakeup?¶
Due to how operating systems implement it, condition_variable::wait() may
return without any notify. This is not a bug but a property of the underlying implementation.
Solution: always use the predicate form of wait:
cv.wait(lock, []{ return condition_met; });
which is equivalent to:
while (!condition_met) cv.wait(lock);
so the condition is rechecked even after a spurious wakeup.
Q6: How do the std::async launch policies differ?¶
std::launch::async → guaranteed to run on a new thread
std::launch::deferred → lazy; runs on the calling thread at get()/wait()
default (async | deferred) → the implementation decides
⚠️ Pitfalls:
- under the default policy, the future's destructor may block (waiting for the task)
- deferred runs on whichever thread calls get(), which may not be what you want
- recommendation: specify std::launch::async explicitly to avoid surprises
Q7: What are the advantages of a thread pool over creating threads directly?¶
1. Avoids the cost of repeatedly creating and destroying threads (thread creation allocates OS resources)
2. Caps concurrency, preventing a thread explosion from exhausting system resources
3. Queues tasks, enabling backpressure strategies
4. Reuses threads, reducing context switches
5. Centralizes error handling and lifetime management
Typical uses:
- web servers handling requests
- batch data processing
- any workload that runs many short-lived tasks
Q8: compare_exchange_weak vs compare_exchange_strong?¶
weak: may fail spuriously (even when the expected value matches)
      → use inside loops; faster on some architectures (avoids extra barriers on LL/SC machines)
strong: fails only when the expected value genuinely doesn't match
      → use for one-shot attempts, or when retrying on failure is expensive
Common practice:
while (!x.compare_exchange_weak(exp, desired)) {} // ← weak fits loops
if (x.compare_exchange_strong(exp, desired)) {}   // ← strong for single attempts
Q9: What does C++20 jthread improve over thread?¶
1. Automatic join: the destructor calls join(), so a forgotten join can no longer terminate the program
2. Cooperative cancellation: built-in stop_token/stop_source machinery
   - request_stop() — ask the thread to stop
   - stop_token.stop_requested() — check whether a stop was requested
3. Interruptible condition-variable waits (condition_variable_any + stop_token)
In essence: thread + RAII + cooperative cancellation.
✅ Self-Check List¶
| Topic | Mastered |
|---|---|
| std::thread creation / join / detach | ⬜ |
| Passing thread arguments (value / reference / move) | ⬜ |
| std::mutex basics | ⬜ |
| lock_guard / unique_lock / scoped_lock | ⬜ |
| shared_mutex reader-writer lock | ⬜ |
| condition_variable | ⬜ |
| std::async / future / promise | ⬜ |
| std::atomic operations | ⬜ |
| Memory-order model (relaxed / acquire / release / seq_cst) | ⬜ |
| CAS operations and spinlocks | ⬜ |
| C++20 coroutines (co_await / co_yield / co_return) | ⬜ |
| C++20 jthread and cooperative cancellation | ⬜ |
| Thread-pool implementation | ⬜ |
| Producer-consumer pattern | ⬜ |
| Lock-free data structures | ⬜ |
| Pitfalls: false sharing, the ABA problem, etc. | ⬜ |
| The four deadlock conditions and avoidance strategies | ⬜ |
| Can answer 8+ of the interview questions above | ⬜ |
📚 Further Reading¶
- "C++ Concurrency in Action" (Anthony Williams)
- "The Art of Multiprocessor Programming" (Herlihy & Shavit)
- cppreference: Thread support
- cppreference: Atomic operations
- cppreference: Coroutines
Next step: after finishing this chapter, build a multithreaded project following 15-CMake与工程实践.md, and hunt for data races with ThreadSanitizer (-fsanitize=thread).