Rust异步模式Skill rust-async-patterns

此技能专注于Rust语言的异步编程模式,涵盖Tokio运行时、并发任务执行、通道通信、错误处理、优雅关闭、异步特性、流处理等。适用于开发高性能异步应用,如网络服务、并发系统,并包含调试技巧和最佳实践。关键词:Rust异步编程、Tokio、并发模式、错误处理、网络服务、异步应用。

后端开发 0 次安装 0 次浏览 更新于 3/22/2026

名称: rust-async-patterns 描述: 通过Tokio、异步特性、错误处理和并发模式掌握Rust异步编程。在构建异步Rust应用、实现并发系统或调试异步代码时使用。

Rust 异步模式

使用Tokio运行时的异步Rust编程的生产模式,包括任务、通道、流和错误处理。

何时使用此技能

  • 构建异步Rust应用
  • 实现并发网络服务
  • 使用Tokio进行异步I/O
  • 正确处理异步错误
  • 调试异步代码问题
  • 优化异步性能

核心概念

1. 异步执行模型

Future (惰性) → poll() → Ready(值) | Pending
                ↑           ↓
              Waker ← 运行时调度

2. 关键抽象

概念 用途
Future 可能稍后完成的惰性计算
async fn 返回impl Future的函数
await 挂起直到future完成
Task 并发运行的spawned future
Runtime 轮询futures的执行器

快速开始

# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
futures = "0.3"
async-trait = "0.1"
anyhow = "1.0"
tracing = "0.1"
tracing-subscriber = "0.3"
use tokio::time::{sleep, Duration};
use anyhow::Result;

#[tokio::main]
async fn main() -> Result<()> {
    // 初始化追踪
    tracing_subscriber::fmt::init();

    // 异步操作
    let result = fetch_data("https://api.example.com").await?;
    println!("获得: {}", result);

    Ok(())
}

async fn fetch_data(url: &str) -> Result<String> {
    // 模拟异步操作
    sleep(Duration::from_millis(100)).await;
    Ok(format!("来自 {} 的数据", url))
}

模式

模式 1: 并发任务执行

use tokio::task::JoinSet;
use anyhow::Result;

// 生成多个并发任务
async fn fetch_all_concurrent(urls: Vec<String>) -> Result<Vec<String>> {
    let mut set = JoinSet::new();

    for url in urls {
        set.spawn(async move {
            fetch_data(&url).await
        });
    }

    let mut results = Vec::new();
    while let Some(res) = set.join_next().await {
        match res {
            Ok(Ok(data)) => results.push(data),
            Ok(Err(e)) => tracing::error!("任务失败: {}", e),
            Err(e) => tracing::error!("加入错误: {}", e),
        }
    }

    Ok(results)
}

// 带并发限制
use futures::stream::{self, StreamExt};

async fn fetch_with_limit(urls: Vec<String>, limit: usize) -> Vec<Result<String>> {
    stream::iter(urls)
        .map(|url| async move { fetch_data(&url).await })
        .buffer_unordered(limit) // 最大并发任务数
        .collect()
        .await
}

// 选择第一个完成的
use tokio::select;

async fn race_requests(url1: &str, url2: &str) -> Result<String> {
    select! {
        result = fetch_data(url1) => result,
        result = fetch_data(url2) => result,
    }
}

模式 2: 通道通信

use tokio::sync::{mpsc, broadcast, oneshot, watch};

// 多生产者,单消费者
async fn mpsc_example() {
    let (tx, mut rx) = mpsc::channel::<String>(100);

    // 生成生产者
    let tx2 = tx.clone();
    tokio::spawn(async move {
        tx2.send("Hello".to_string()).await.unwrap();
    });

    // 消费
    while let Some(msg) = rx.recv().await {
        println!("获得: {}", msg);
    }
}

// 广播:多生产者,多消费者
async fn broadcast_example() {
    let (tx, _) = broadcast::channel::<String>(100);

    let mut rx1 = tx.subscribe();
    let mut rx2 = tx.subscribe();

    tx.send("Event".to_string()).unwrap();

    // 两个接收器都收到消息
    let _ = rx1.recv().await;
    let _ = rx2.recv().await;
}

// 单次:单值,单次使用
async fn oneshot_example() -> String {
    let (tx, rx) = oneshot::channel::<String>();

    tokio::spawn(async move {
        tx.send("Result".to_string()).unwrap();
    });

    rx.await.unwrap()
}

// 监视:单生产者,多消费者,最新值
async fn watch_example() {
    let (tx, mut rx) = watch::channel("initial".to_string());

    tokio::spawn(async move {
        loop {
            // 等待变化
            rx.changed().await.unwrap();
            println!("新值: {}", *rx.borrow());
        }
    });

    tx.send("updated".to_string()).unwrap();
}

模式 3: 异步错误处理

use anyhow::{Context, Result, bail};
use thiserror::Error;

#[derive(Error, Debug)]
pub enum ServiceError {
    #[error("网络错误: {0}")]
    Network(#[from] reqwest::Error),

    #[error("数据库错误: {0}")]
    Database(#[from] sqlx::Error),

    #[error("未找到: {0}")]
    NotFound(String),

    #[error("超时 {0:?} 后")]
    Timeout(std::time::Duration),
}

// 使用anyhow处理应用错误
async fn process_request(id: &str) -> Result<Response> {
    let data = fetch_data(id)
        .await
        .context("获取数据失败")?;

    let parsed = parse_response(&data)
        .context("解析响应失败")?;

    Ok(parsed)
}

// 使用自定义错误处理库代码
async fn get_user(id: &str) -> Result<User, ServiceError> {
    let result = db.query(id).await?;

    match result {
        Some(user) => Ok(user),
        None => Err(ServiceError::NotFound(id.to_string())),
    }
}

// 超时包装器
use tokio::time::timeout;

async fn with_timeout<T, F>(duration: Duration, future: F) -> Result<T, ServiceError>
where
    F: std::future::Future<Output = Result<T, ServiceError>>,
{
    timeout(duration, future)
        .await
        .map_err(|_| ServiceError::Timeout(duration))?
}

模式 4: 优雅关闭

use tokio::signal;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;

async fn run_server() -> Result<()> {
    // 方法 1: CancellationToken
    let token = CancellationToken::new();
    let token_clone = token.clone();

    // 生成尊重取消的任务
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = token_clone.cancelled() => {
                    tracing::info!("任务正在关闭");
                    break;
                }
                _ = do_work() => {}
            }
        }
    });

    // 等待关闭信号
    signal::ctrl_c().await?;
    tracing::info!("收到关闭信号");

    // 取消所有任务
    token.cancel();

    // 给任务时间清理
    tokio::time::sleep(Duration::from_secs(5)).await;

    Ok(())
}

// 方法 2: 广播通道用于关闭
async fn run_with_broadcast() -> Result<()> {
    let (shutdown_tx, _) = broadcast::channel::<()>(1);

    let mut rx = shutdown_tx.subscribe();
    tokio::spawn(async move {
        tokio::select! {
            _ = rx.recv() => {
                tracing::info!("收到关闭");
            }
            _ = async { loop { do_work().await } } => {}
        }
    });

    signal::ctrl_c().await?;
    let _ = shutdown_tx.send(());

    Ok(())
}

模式 5: 异步特性

use async_trait::async_trait;

#[async_trait]
pub trait Repository {
    async fn get(&self, id: &str) -> Result<Entity>;
    async fn save(&self, entity: &Entity) -> Result<()>;
    async fn delete(&self, id: &str) -> Result<()>;
}

pub struct PostgresRepository {
    pool: sqlx::PgPool,
}

#[async_trait]
impl Repository for PostgresRepository {
    async fn get(&self, id: &str) -> Result<Entity> {
        sqlx::query_as!(Entity, "SELECT * FROM entities WHERE id = $1", id)
            .fetch_one(&self.pool)
            .await
            .map_err(Into::into)
    }

    async fn save(&self, entity: &Entity) -> Result<()> {
        sqlx::query!(
            "INSERT INTO entities (id, data) VALUES ($1, $2)
             ON CONFLICT (id) DO UPDATE SET data = $2",
            entity.id,
            entity.data
        )
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn delete(&self, id: &str) -> Result<()> {
        sqlx::query!("DELETE FROM entities WHERE id = $1", id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
}

// 特性对象用法
async fn process(repo: &dyn Repository, id: &str) -> Result<()> {
    let entity = repo.get(id).await?;
    // 处理...
    repo.save(&entity).await
}

模式 6: 流和异步迭代

use futures::stream::{self, Stream, StreamExt};
use async_stream::stream;

// 从异步迭代器创建流
fn numbers_stream() -> impl Stream<Item = i32> {
    stream! {
        for i in 0..10 {
            tokio::time::sleep(Duration::from_millis(100)).await;
            yield i;
        }
    }
}

// 处理流
async fn process_stream() {
    let stream = numbers_stream();

    // 映射和过滤
    let processed: Vec<_> = stream
        .filter(|n| futures::future::ready(*n % 2 == 0))
        .map(|n| n * 2)
        .collect()
        .await;

    println!("{:?}", processed);
}

// 分块处理
async fn process_in_chunks() {
    let stream = numbers_stream();

    let mut chunks = stream.chunks(3);

    while let Some(chunk) = chunks.next().await {
        println!("处理块: {:?}", chunk);
    }
}

// 合并多个流
async fn merge_streams() {
    let stream1 = numbers_stream();
    let stream2 = numbers_stream();

    let merged = stream::select(stream1, stream2);

    merged
        .for_each(|n| async move {
            println!("获得: {}", n);
        })
        .await;
}

模式 7: 资源管理

use std::sync::Arc;
use tokio::sync::{Mutex, RwLock, Semaphore};

// 共享状态使用RwLock(读多写少时优选)
struct Cache {
    data: RwLock<HashMap<String, String>>,
}

impl Cache {
    async fn get(&self, key: &str) -> Option<String> {
        self.data.read().await.get(key).cloned()
    }

    async fn set(&self, key: String, value: String) {
        self.data.write().await.insert(key, value);
    }
}

// 连接池使用信号量
struct Pool {
    semaphore: Semaphore,
    connections: Mutex<Vec<Connection>>,
}

impl Pool {
    fn new(size: usize) -> Self {
        Self {
            semaphore: Semaphore::new(size),
            connections: Mutex::new((0..size).map(|_| Connection::new()).collect()),
        }
    }

    async fn acquire(&self) -> PooledConnection<'_> {
        let permit = self.semaphore.acquire().await.unwrap();
        let conn = self.connections.lock().await.pop().unwrap();
        PooledConnection { pool: self, conn: Some(conn), _permit: permit }
    }
}

struct PooledConnection<'a> {
    pool: &'a Pool,
    conn: Option<Connection>,
    _permit: tokio::sync::SemaphorePermit<'a>,
}

impl Drop for PooledConnection<'_> {
    fn drop(&mut self) {
        if let Some(conn) = self.conn.take() {
            let pool = self.pool;
            tokio::spawn(async move {
                pool.connections.lock().await.push(conn);
            });
        }
    }
}

调试技巧

// 启用tokio-console进行运行时调试
// Cargo.toml: tokio = { features = ["tracing"] }
// 运行: RUSTFLAGS="--cfg tokio_unstable" cargo run
// 然后: tokio-console

// 工具化异步函数
use tracing::instrument;

#[instrument(skip(pool))]
async fn fetch_user(pool: &PgPool, id: &str) -> Result<User> {
    tracing::debug!("获取用户");
    // ...
}

// 跟踪任务生成
let span = tracing::info_span!("worker", id = %worker_id);
tokio::spawn(async move {
    // 轮询时进入跨度
}.instrument(span));

最佳实践

该做的

  • 使用 tokio::select! - 用于竞速futures
  • 优先使用通道 - 尽可能避免共享状态
  • 使用 JoinSet - 用于管理多个任务
  • 使用追踪进行工具化 - 用于调试异步代码
  • 处理取消 - 检查 CancellationToken

不该做的

  • 不要阻塞 - 不要在异步中使用 std::thread::sleep
  • 不要在跨await时持有锁 - 会导致死锁
  • 不要无限制生成任务 - 使用信号量限制
  • 不要忽略错误 - 使用 ? 传播或记录
  • 不要忘记Send边界 - 对于生成的futures

资源