Rust异步编程模式Skill rust-async-patterns

这个技能是关于掌握Rust中的异步编程,使用async/await语法、tokio运行时和futures生态系统,用于高效的并发I/O操作,如构建异步Web服务器、客户端、处理并发请求等。关键词:Rust、异步编程、tokio、async/await、并发、I/O操作、Web服务器、异步模式、futures。

后端开发 0 次安装 0 次浏览 更新于 3/25/2026

名称: rust-async-patterns 用户可调用: false 描述: 当使用Rust异步编程与tokio、async/await和futures时使用。当编写异步Rust代码时使用。 允许工具:

  • Bash
  • Read

Rust 异步模式

掌握Rust中的异步编程,使用async/await语法、tokio运行时和futures生态系统进行并发I/O操作。

Async/Await 基础

基本异步函数:

async fn fetch_data() -> String {
    String::from("data")
}

#[tokio::main]
async fn main() {
    let data = fetch_data().await;
    println!("{}", data);
}

Cargo.toml 设置:

[dependencies]
tokio = { version = "1", features = ["full"] }

Tokio 运行时

不同运行时配置:

// 多线程运行时(默认)
#[tokio::main]
async fn main() {
    // 代码在这里
}

// 单线程运行时
#[tokio::main(flavor = "current_thread")]
async fn main() {
    // 代码在这里
}

// 手动创建运行时
use tokio::runtime::Runtime;

fn main() {
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        println!("运行异步代码");
    });
}

生成任务

创建并发任务:

use tokio::task;

#[tokio::main]
async fn main() {
    let task1 = task::spawn(async {
        println!("任务 1");
        42
    });

    let task2 = task::spawn(async {
        println!("任务 2");
        100
    });

    let result1 = task1.await.unwrap();
    let result2 = task2.await.unwrap();

    println!("结果: {}, {}", result1, result2);
}

使用 move 生成:

#[tokio::main]
async fn main() {
    let data = String::from("hello");

    let handle = task::spawn(async move {
        println!("{}", data);
    });

    handle.await.unwrap();
}

使用 reqwest 进行异步 HTTP

安装 reqwest:

[dependencies]
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }

发起 HTTP 请求:

use reqwest;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let response = reqwest::get("https://api.github.com/users/rust-lang")
        .await?
        .text()
        .await?;

    println!("{}", response);
    Ok(())
}

并发请求:

use reqwest;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let urls = vec![
        "https://api.github.com/users/rust-lang",
        "https://api.github.com/users/tokio-rs",
    ];

    let mut handles = vec![];

    for url in urls {
        let handle = tokio::spawn(async move {
            reqwest::get(url).await?.text().await
        });
        handles.push(handle);
    }

    for handle in handles {
        let response = handle.await??;
        println!("{}", response);
    }

    Ok(())
}

Select 和 Join

使用 tokio::select! 竞争 futures:

use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    tokio::select! {
        _ = sleep(Duration::from_secs(1)) => {
            println!("计时器先完成");
        }
        _ = async_operation() => {
            println!("操作先完成");
        }
    }
}

async fn async_operation() {
    sleep(Duration::from_secs(2)).await;
}

使用 tokio::join! 进行并发执行:

use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (r1, r2, r3) = tokio::join!(
        async { sleep(Duration::from_secs(1)).await; 1 },
        async { sleep(Duration::from_secs(1)).await; 2 },
        async { sleep(Duration::from_secs(1)).await; 3 },
    );

    println!("结果: {}, {}, {}", r1, r2, r3);
}

通道

mpsc 通道用于消息传递:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);

    tokio::spawn(async move {
        for i in 0..10 {
            tx.send(i).await.unwrap();
        }
    });

    while let Some(value) = rx.recv().await {
        println!("收到: {}", value);
    }
}

多个生产者:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);

    for i in 0..3 {
        let tx = tx.clone();
        tokio::spawn(async move {
            tx.send(format!("来自 {} 的消息", i)).await.unwrap();
        });
    }

    drop(tx); // 当所有发送者被删除时关闭通道

    while let Some(msg) = rx.recv().await {
        println!("{}", msg);
    }
}

oneshot 通道:

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();

    tokio::spawn(async move {
        tx.send("结果").unwrap();
    });

    let result = rx.await.unwrap();
    println!("{}", result);
}

同步原语

Mutex 用于共享状态:

use tokio::sync::Mutex;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = tokio::spawn(async move {
            let mut num = counter.lock().await;
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.await.unwrap();
    }

    println!("结果: {}", *counter.lock().await);
}

RwLock 用于读写访问:

use tokio::sync::RwLock;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let data = Arc::new(RwLock::new(vec![1, 2, 3]));

    // 多个读者
    let data1 = Arc::clone(&data);
    let data2 = Arc::clone(&data);

    let reader1 = tokio::spawn(async move {
        let d = data1.read().await;
        println!("读者 1: {:?}", *d);
    });

    let reader2 = tokio::spawn(async move {
        let d = data2.read().await;
        println!("读者 2: {:?}", *d);
    });

    // 一个写者
    let data3 = Arc::clone(&data);
    let writer = tokio::spawn(async move {
        let mut d = data3.write().await;
        d.push(4);
    });

    reader1.await.unwrap();
    reader2.await.unwrap();
    writer.await.unwrap();
}

Semaphore 用于限制并发:

use tokio::sync::Semaphore;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(3));
    let mut handles = vec![];

    for i in 0..10 {
        let permit = semaphore.clone();
        let handle = tokio::spawn(async move {
            let _permit = permit.acquire().await.unwrap();
            println!("任务 {} 获得许可", i);
            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            println!("任务 {} 释放许可", i);
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.await.unwrap();
    }
}

使用异步流:

use tokio_stream::{self as stream, StreamExt};

#[tokio::main]
async fn main() {
    let mut stream = stream::iter(vec![1, 2, 3, 4, 5]);

    while let Some(value) = stream.next().await {
        println!("{}", value);
    }
}

创建自定义流:

use tokio_stream::{Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};

struct Counter {
    count: usize,
    max: usize,
}

impl Stream for Counter {
    type Item = usize;

    fn poll_next(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>> {
        if self.count < self.max {
            let current = self.count;
            self.count += 1;
            Poll::Ready(Some(current))
        } else {
            Poll::Ready(None)
        }
    }
}

#[tokio::main]
async fn main() {
    let mut counter = Counter { count: 0, max: 5 };

    while let Some(value) = counter.next().await {
        println!("{}", value);
    }
}

超时和间隔

使用超时:

use tokio::time::{timeout, Duration};

async fn slow_operation() -> String {
    tokio::time::sleep(Duration::from_secs(5)).await;
    String::from("完成")
}

#[tokio::main]
async fn main() {
    match timeout(Duration::from_secs(2), slow_operation()).await {
        Ok(result) => println!("成功: {}", result),
        Err(_) => println!("操作超时"),
    }
}

使用间隔:

use tokio::time::{interval, Duration};

#[tokio::main]
async fn main() {
    let mut interval = interval(Duration::from_secs(1));

    for _ in 0..5 {
        interval.tick().await;
        println!("滴答");
    }
}

错误处理

使用 ? 传播错误:

use reqwest;

async fn fetch_url(url: &str) -> Result<String, reqwest::Error> {
    let response = reqwest::get(url).await?;
    let body = response.text().await?;
    Ok(body)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let content = fetch_url("https://example.com").await?;
    println!("{}", content);
    Ok(())
}

阻塞操作

运行CPU密集型任务:

use tokio::task;

fn blocking_operation() -> u64 {
    // CPU密集型工作
    (0..1_000_000).sum()
}

#[tokio::main]
async fn main() {
    let result = task::spawn_blocking(|| {
        blocking_operation()
    }).await.unwrap();

    println!("结果: {}", result);
}

异步特质

使用 async-trait 箱:

[dependencies]
async-trait = "0.1"
use async_trait::async_trait;

#[async_trait]
trait Repository {
    async fn find(&self, id: u64) -> Option<String>;
    async fn save(&self, data: String) -> Result<(), String>;
}

struct UserRepository;

#[async_trait]
impl Repository for UserRepository {
    async fn find(&self, id: u64) -> Option<String> {
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        Some(format!("用户 {}", id))
    }

    async fn save(&self, data: String) -> Result<(), String> {
        println!("保存: {}", data);
        Ok(())
    }
}

何时使用此技能

使用 rust-async-patterns 当您需要:

  • 构建异步Web服务器或客户端
  • 高效处理并发I/O操作
  • 并发发起多个HTTP请求
  • 实现生产者-消费者模式
  • 处理异步数据流
  • 跨异步任务管理共享状态
  • 控制并发限制
  • 处理超时和取消
  • 构建事件驱动系统
  • 异步处理数据

最佳实践

  • 使用 tokio::spawn 处理CPU无关任务
  • 使用 spawn_blocking 处理CPU密集型工作
  • 优先选择通道而非带锁的共享状态
  • 使用 select! 竞争futures
  • 使用 join! 进行并发独立操作
  • 为网络操作设置适当超时
  • 使用 Semaphore 限制并发操作
  • 避免在await点持有锁
  • 在异步上下文中使用 Arc 进行共享所有权
  • 使用 Result 正确处理错误

常见陷阱

  • 跨await持有 std::sync::Mutex(应使用 tokio::sync::Mutex)
  • 在spawn中未使用move与闭包
  • 忘记await futures
  • 使用CPU密集型工作阻塞运行时
  • 创建过多任务而无限制
  • 未正确处理取消
  • 使用错误的通道类型
  • 锁顺序不当导致死锁
  • 未适当配置运行时
  • 忽略生成任务中的错误

资源