跳转至

异步编程

异步编程

📚 章节概述

Rust 的异步编程模型基于 Future 和 async/await 语法,提供了高效的并发执行能力。本章将介绍异步编程的基础概念、运行时和最佳实践。

🎯 学习目标

  • 理解异步编程的概念
  • 掌握 async/await 语法
  • 学会使用 Future trait
  • 了解异步运行时
  • 掌握异步 I/O 和网络编程
  • 理解异步错误处理

📖 异步编程基础

1.1 什么是异步编程

异步编程允许程序在等待 I/O 操作时执行其他任务,提高资源利用率:

Rust
// 同步代码
fn sync_read_file() -> String {
    // 阻塞等待文件读取完成
    std::fs::read_to_string("file.txt").unwrap()
}

// 异步代码
async fn async_read_file() -> String {  // async fn 声明异步函数,返回Future
    // 非阻塞读取文件
    tokio::fs::read_to_string("file.txt").await.unwrap()
}

1.2 async/await 语法

Rust
async fn hello_world() {
    println!("Hello, world!");
}

#[tokio::main]
async fn main() {
    hello_world().await;  // .await 等待异步操作完成
}

🔮 Future Trait

2.1 Future 基础

Rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct SimpleFuture;

impl Future for SimpleFuture {
    type Output = i32;

    // self: Pin<&mut Self> 固定内存位置(防止自引用失效); cx: 携带Waker的上下文; 返回Pending(未就绪)或Ready(完成)
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        Poll::Ready(42)
    }
}

2.2 使用 Future

Rust
use std::future::Future;
use std::time::Duration;

async fn delay(duration: Duration) {
    tokio::time::sleep(duration).await;
}

#[tokio::main]
async fn main() {
    delay(Duration::from_secs(1)).await;
    println!("Done!");
}

⚡ 异步运行时

3.1 Tokio 运行时

TOML
# Cargo.toml
[dependencies]
tokio = { version = "1.0", features = ["full"] }
Rust
use tokio::time::{sleep, Duration};

async fn task1() {
    sleep(Duration::from_secs(1)).await;
    println!("Task 1 completed");
}

async fn task2() {
    sleep(Duration::from_secs(2)).await;
    println!("Task 2 completed");
}

#[tokio::main]
async fn main() {
    tokio::join!(task1(), task2());
}

3.2 异步任务

Rust
use tokio::task;
use std::time::Duration;

async fn background_task() {
    loop {
        println!("Background task running");
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

#[tokio::main]
async fn main() {
    task::spawn(background_task());  // tokio::spawn 在运行时上派生异步任务

    tokio::time::sleep(Duration::from_secs(5)).await;
    println!("Main task completed");
}

🌐 异步 I/O

4.1 异步文件操作

Rust
use tokio::fs;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};

async fn read_file() -> io::Result<String> {
    let content = fs::read_to_string("file.txt").await?;
    Ok(content)
}

async fn write_file(content: &str) -> io::Result<()> {
    fs::write("file.txt", content).await?;
    Ok(())
}

#[tokio::main]
async fn main() -> io::Result<()> {
    write_file("Hello, async!").await?;
    let content = read_file().await?;
    println!("Content: {}", content);
    Ok(())
}

4.2 异步网络编程

⚠️ tokio::spawnSend + 'static 约束tokio::spawn 要求传入的 Future 满足 Send + 'static, 这意味着闭包捕获的所有变量必须是 Send(可跨线程传递)且不包含非 'static 引用。 如果遇到 "future cannot be sent between threads safely" 编译错误,通常需要: - 用 Arc 代替 Rc - 用 tokio::sync::Mutex 代替 std::sync::Mutex(在 .await 跨越点持有锁时) - 用 move 闭包转移所有权,避免引用捕获

Rust
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::io;

async fn handle_connection(mut socket: TcpStream) -> io::Result<()> {
    let mut buf = [0; 1024];
    loop {
        let n = socket.read(&mut buf).await?;
        if n == 0 {
            return Ok(());
        }
        socket.write_all(&buf[..n]).await?;
    }
}

#[tokio::main]
async fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
        let (socket, _) = listener.accept().await?;
        tokio::spawn(async move {
            handle_connection(socket).await.unwrap();
        });
    }
}

🔗 异步并发

5.1 join!

Rust
use tokio::time::{sleep, Duration};

async fn task1() -> &'static str {
    sleep(Duration::from_secs(1)).await;
    "Task 1"
}

async fn task2() -> &'static str {
    sleep(Duration::from_secs(2)).await;
    "Task 2"
}

#[tokio::main]
async fn main() {
    let (result1, result2) = tokio::join!(task1(), task2());
    println!("{}: {}", result1, result2);
}

5.2 select!

Rust
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);

    tokio::spawn(async move {
        for i in 0..10 {
            tx.send(i).await.unwrap();
            sleep(Duration::from_millis(100)).await;
        }
    });

    loop {
        tokio::select! {
            Some(value) = rx.recv() => {
                println!("Received: {}", value);
            }
            _ = sleep(Duration::from_secs(1)) => {
                println!("Timeout");
                break;
            }
        }
    }
}

🔄 异步迭代器

6.1 Stream 基础

Rust
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    let mut stream = stream::iter(vec![1, 2, 3, 4, 5]);

    while let Some(value) = stream.next().await {
        println!("Received: {}", value);
    }
}

6.2 Stream 适配器

Rust
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    let stream = stream::iter(0..10)
        .filter(|x| async move { x % 2 == 0 })
        .map(|x| x * 2);

    futures::pin_mut!(stream); // 将stream固定在栈上; filter/map适配器产生的Stream是!Unpin的,必须固定才能调用.next()

    while let Some(value) = stream.next().await {
        println!("Received: {}", value);
    }
}

📝 练习题

练习 1:基础异步函数

Rust
// TODO: 实现一个异步函数,模拟耗时操作
async fn expensive_operation() -> u32 {
    // TODO: 使用 tokio::time::sleep 模拟耗时操作
    // TODO: 返回一个计算结果
    todo!()
}

#[tokio::main]
async fn main() {
    // TODO: 调用异步函数并打印结果
}

练习 2:并发任务

Rust
// TODO: 实现多个异步任务并发执行
async fn task1() -> String {
    // TODO: 返回 "Task 1"
    todo!()
}

async fn task2() -> String {
    // TODO: 返回 "Task 2"
    todo!()
}

async fn task3() -> String {
    // TODO: 返回 "Task 3"
    todo!()
}

#[tokio::main]
async fn main() {
    // TODO: 使用 tokio::join! 并发执行所有任务
}

练习 3:异步文件操作

Rust
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

// TODO: 实现异步文件复制
async fn copy_file(src: &str, dst: &str) -> tokio::io::Result<()> {
    // TODO: 读取源文件
    // TODO: 写入目标文件
    todo!()
}

#[tokio::main]
async fn main() -> tokio::io::Result<()> {
    // TODO: 测试文件复制
}

练习 4:异步 HTTP 客户端

Rust
use reqwest;

// TODO: 实现一个异步 HTTP GET 请求
async fn fetch_url(url: &str) -> reqwest::Result<String> {
    // TODO: 发送 GET 请求
    // TODO: 返回响应内容
    todo!()
}

#[tokio::main]
async fn main() -> reqwest::Result<()> {
    // TODO: 并发获取多个 URL 的内容
}

💡 最佳实践

1. 使用异步 I/O

Rust
// 好:使用异步 I/O
use tokio::fs;
let content = fs::read_to_string("file.txt").await?;

// 避免:使用同步 I/O
let content = std::fs::read_to_string("file.txt")?;

2. 正确处理错误

Rust
// 好:使用 ? 传播错误
async fn read_file() -> io::Result<String> {
    let content = fs::read_to_string("file.txt").await?;
    Ok(content)
}

// 避免:忽略错误
async fn read_file() -> String {
    fs::read_to_string("file.txt").await.unwrap()
}

3. 使用合理的超时

Rust
// 好:设置超时
use tokio::time::{timeout, Duration};

let result = timeout(Duration::from_secs(5), async_operation()).await;

// 避免:无限等待
let result = async_operation().await;

4. 避免阻塞异步代码

Rust
// 好:使用异步版本
tokio::task::spawn_blocking(|| {
    // CPU 密集型任务
});

// 避免:在异步代码中阻塞
let result = expensive_computation(); // 阻塞整个运行时

⚠️ 常见错误

1. 忘记 await

Rust
// 错误:忘记 await
async fn do_something() {
    println!("Doing something");
}

#[tokio::main]
async fn main() {
    do_something(); // 不会执行
}

// 正确:使用 await
#[tokio::main]
async fn main() {
    do_something().await;
}

2. 在同步代码中使用异步

Rust
// 错误:在同步代码中调用异步
fn main() {
    do_something().await; // 编译错误
}

// 正确:使用异步运行时
#[tokio::main]
async fn main() {
    do_something().await;
}

3. 阻塞异步运行时

Rust
// 错误:阻塞异步运行时
#[tokio::main]
async fn main() {
    std::thread::sleep(Duration::from_secs(1)); // 阻塞整个运行时
}

// 正确:使用异步 sleep
#[tokio::main]
async fn main() {
    tokio::time::sleep(Duration::from_secs(1)).await;
}

📚 扩展阅读

🎯 本章小结

本章介绍了 Rust 的异步编程:

  • ✅ 理解异步编程的概念
  • ✅ 掌握 async/await 语法
  • ✅ 学会使用 Future trait
  • ✅ 了解异步运行时
  • ✅ 掌握异步 I/O 和网络编程
  • ✅ 理解异步错误处理

下一章: 我们将通过一个完整的实战项目,综合运用前面所学的 Rust 知识。