跳转至

并发编程

并发编程

📚 章节概述

Rust 的并发编程模型基于消息传递和共享状态,编译器会帮助你在编译时发现数据竞争。本章将介绍线程、通道、消息传递和共享状态的并发编程技术。

🎯 学习目标

  • 理解 Rust 的并发模型
  • 学会创建和管理线程
  • 掌握通道(channel)的使用
  • 理解共享状态并发
  • 学会使用 Mutex 和 Arc
  • 掌握 Send 和 Sync trait

📖 线程

1.1 创建线程

Rust
use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}

1.2 等待线程完成

Rust
use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    handle.join().unwrap();

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}

1.3 使用 move 闭包

Rust
use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(move || {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}

📨 消息传递

2.1 创建通道

Rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

2.2 发送多个值

Rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}

2.3 多个生产者

Rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}

🔒 共享状态并发

3.1 使用 Mutex

Rust
use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }

    println!("m = {:?}", m);
}

3.2 多线程共享 Mutex

Rust
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));  // Arc 线程安全引用计数 + Mutex 互斥锁
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

3.3 为什么需要 Arc 而不是 Rc

Rc<T> 不是线程安全的,不能在多线程中使用;Arc<T> 使用原子操作维护引用计数,适用于多线程场景:

Rust
use std::sync::Mutex;
use std::thread;

fn main() {
    // ❌ 编译错误!Rc 没有实现 Send,不能跨线程传递
    // use std::rc::Rc;
    // let counter = Rc::new(Mutex::new(0));
    // thread::spawn(move || { ... }); // 编译错误

    // ✅ 正确:使用 Arc(原子引用计数)
    use std::sync::Arc;
    let counter = Arc::new(Mutex::new(0));
    let counter_clone = Arc::clone(&counter);

    let handle = thread::spawn(move || {
        let mut num = counter_clone.lock().unwrap();
        *num += 1;
    });

    handle.join().unwrap();
    println!("Result: {}", *counter.lock().unwrap());
}

Rc vs Arc:

特性 Rc Arc
线程安全
性能开销 稍高(原子操作)
使用场景 单线程 多线程

🔄 Send 和 Sync trait

4.1 Send trait

Send trait 表示类型的所有权可以在线程间传递:

Rust
use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(move || {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}

4.2 Sync trait

Sync trait 表示类型的引用可以在线程间安全共享:

Rust
use std::sync::Arc;
use std::thread;

fn main() {
    // Arc<T> 要求 T: Sync,因为多个线程可能同时通过 Arc 引用 T
    let data = Arc::new(vec![1, 2, 3]);

    let data_clone = Arc::clone(&data);
    let handle = thread::spawn(move || {
        // 这里通过 Arc 的共享引用访问 data,要求 Vec<i32>: Sync
        println!("Here's a vector: {:?}", data_clone);
    });

    // 主线程仍可访问 data
    println!("Main thread: {:?}", data);
    handle.join().unwrap();
}

📝 练习题

练习 1:线程创建

Rust
use std::thread;
use std::time::Duration;

// TODO: 创建 5 个线程,每个线程打印自己的 ID
// 等待所有线程完成

fn main() {
    // TODO: 实现
}

练习 2:消息传递

Rust
use std::sync::mpsc;
use std::thread;

// TODO: 创建一个生产者-消费者模型
// 生产者发送 1-10 的数字
// 消费者接收并打印这些数字

fn main() {
    // TODO: 实现
}

练习 3:共享状态

Rust
use std::sync::{Arc, Mutex};
use std::thread;

// TODO: 使用 Mutex 和 Arc 实现一个线程安全的计数器
// 10 个线程各增加计数器 1000 次
// 最终结果应该是 10000

fn main() {
    // TODO: 实现
}

练习 4:任务队列

Rust
use std::sync::{Arc, Mutex};
use std::thread;

// TODO: 实现一个简单的任务队列
// - 创建一个包含多个任务的队列
// - 多个工作线程从队列中获取任务并执行
// - 使用 Mutex 和通道实现

fn main() {
    // TODO: 实现
}

💡 最佳实践

1. 优先使用消息传递

Rust
// 好:使用通道
use std::sync::mpsc;
use std::thread;

let (tx, rx) = mpsc::channel();
thread::spawn(move || {
    tx.send(42).unwrap();
});
let received = rx.recv().unwrap();

// 避免:过度使用共享状态

2. 使用 Arc 而不是 Rc

Rust
// 好:多线程使用 Arc
use std::sync::Arc;
let data = Arc::new(vec![1, 2, 3]);

// 避免:多线程使用 Rc
use std::rc::Rc;
let data = Rc::new(vec![1, 2, 3]); // 编译错误

3. 避免死锁

Rust
// 好:按固定顺序获取锁
let mut a = mutex_a.lock().unwrap();
let mut b = mutex_b.lock().unwrap();

// 避免:不同线程以不同顺序获取锁,可能导致死锁
let mut b = mutex_b.lock().unwrap();
let mut a = mutex_a.lock().unwrap();

4. 使用线程池

Rust
// 好:使用线程池管理线程
use threadpool::ThreadPool;
let pool = ThreadPool::new(4);

for i in 0..10 {
    pool.execute(move || {
        println!("Task {}", i);
    });
}

pool.join();

⚠️ 常见错误

1. 忘记 join 线程

Rust
// 错误:主线程可能先结束
thread::spawn(|| {
    println!("Hello from thread");
});

// 正确:等待线程完成
let handle = thread::spawn(|| {
    println!("Hello from thread");
});
handle.join().unwrap();

2. 忘记使用 move

Rust
// 错误:编译错误
let v = vec![1, 2, 3];
thread::spawn(|| {
    println!("{:?}", v);
});

// 正确:使用 move
let v = vec![1, 2, 3];
thread::spawn(move || {
    println!("{:?}", v);
});

3. 死锁

Rust
// 错误:可能导致死锁
let mutex1 = Arc::new(Mutex::new(1));
let mutex2 = Arc::new(Mutex::new(2));

let mutex1_clone = Arc::clone(&mutex1);
let mutex2_clone = Arc::clone(&mutex2);

let handle1 = thread::spawn(move || {
    let _lock1 = mutex1_clone.lock().unwrap();
    thread::sleep(Duration::from_millis(100));
    let _lock2 = mutex2_clone.lock().unwrap();
});

let handle2 = thread::spawn(move || {
    let _lock2 = mutex2.lock().unwrap();
    thread::sleep(Duration::from_millis(100));
    let _lock1 = mutex1.lock().unwrap();
});

handle1.join().unwrap();
handle2.join().unwrap();

📚 扩展阅读

🎯 本章小结

本章介绍了 Rust 的并发编程:

  • ✅ 理解 Rust 的并发模型
  • ✅ 学会创建和管理线程
  • ✅ 掌握通道(channel)的使用
  • ✅ 理解共享状态并发
  • ✅ 学会使用 Mutex 和 Arc
  • ✅ 掌握 Send 和 Sync trait

下一章: 我们将学习 Rust 的智能指针,包括 Box、Rc、Arc 和循环引用。