名称: rust-async-patterns 用户可调用: false 描述: 当使用Rust异步编程与tokio、async/await和futures时使用。当编写异步Rust代码时使用。 允许工具:
- Bash
- Read
Rust 异步模式
掌握Rust中的异步编程,使用async/await语法、tokio运行时和futures生态系统进行并发I/O操作。
Async/Await 基础
基本异步函数:
async fn fetch_data() -> String {
String::from("data")
}
#[tokio::main]
async fn main() {
let data = fetch_data().await;
println!("{}", data);
}
Cargo.toml 设置:
[dependencies]
tokio = { version = "1", features = ["full"] }
Tokio 运行时
不同运行时配置:
// 多线程运行时(默认)
#[tokio::main]
async fn main() {
// 代码在这里
}
// 单线程运行时
#[tokio::main(flavor = "current_thread")]
async fn main() {
// 代码在这里
}
// 手动创建运行时
use tokio::runtime::Runtime;
fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
println!("运行异步代码");
});
}
生成任务
创建并发任务:
use tokio::task;
#[tokio::main]
async fn main() {
let task1 = task::spawn(async {
println!("任务 1");
42
});
let task2 = task::spawn(async {
println!("任务 2");
100
});
let result1 = task1.await.unwrap();
let result2 = task2.await.unwrap();
println!("结果: {}, {}", result1, result2);
}
使用 move 生成:
#[tokio::main]
async fn main() {
let data = String::from("hello");
let handle = task::spawn(async move {
println!("{}", data);
});
handle.await.unwrap();
}
使用 reqwest 进行异步 HTTP
安装 reqwest:
[dependencies]
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
发起 HTTP 请求:
use reqwest;
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
let response = reqwest::get("https://api.github.com/users/rust-lang")
.await?
.text()
.await?;
println!("{}", response);
Ok(())
}
并发请求:
use reqwest;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let urls = vec![
"https://api.github.com/users/rust-lang",
"https://api.github.com/users/tokio-rs",
];
let mut handles = vec![];
for url in urls {
let handle = tokio::spawn(async move {
reqwest::get(url).await?.text().await
});
handles.push(handle);
}
for handle in handles {
let response = handle.await??;
println!("{}", response);
}
Ok(())
}
Select 和 Join
使用 tokio::select! 竞争 futures:
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
tokio::select! {
_ = sleep(Duration::from_secs(1)) => {
println!("计时器先完成");
}
_ = async_operation() => {
println!("操作先完成");
}
}
}
async fn async_operation() {
sleep(Duration::from_secs(2)).await;
}
使用 tokio::join! 进行并发执行:
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let (r1, r2, r3) = tokio::join!(
async { sleep(Duration::from_secs(1)).await; 1 },
async { sleep(Duration::from_secs(1)).await; 2 },
async { sleep(Duration::from_secs(1)).await; 3 },
);
println!("结果: {}, {}, {}", r1, r2, r3);
}
通道
mpsc 通道用于消息传递:
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
tokio::spawn(async move {
for i in 0..10 {
tx.send(i).await.unwrap();
}
});
while let Some(value) = rx.recv().await {
println!("收到: {}", value);
}
}
多个生产者:
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
for i in 0..3 {
let tx = tx.clone();
tokio::spawn(async move {
tx.send(format!("来自 {} 的消息", i)).await.unwrap();
});
}
drop(tx); // 当所有发送者被删除时关闭通道
while let Some(msg) = rx.recv().await {
println!("{}", msg);
}
}
oneshot 通道:
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
tx.send("结果").unwrap();
});
let result = rx.await.unwrap();
println!("{}", result);
}
同步原语
Mutex 用于共享状态:
use tokio::sync::Mutex;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = tokio::spawn(async move {
let mut num = counter.lock().await;
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
println!("结果: {}", *counter.lock().await);
}
RwLock 用于读写访问:
use tokio::sync::RwLock;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let data = Arc::new(RwLock::new(vec![1, 2, 3]));
// 多个读者
let data1 = Arc::clone(&data);
let data2 = Arc::clone(&data);
let reader1 = tokio::spawn(async move {
let d = data1.read().await;
println!("读者 1: {:?}", *d);
});
let reader2 = tokio::spawn(async move {
let d = data2.read().await;
println!("读者 2: {:?}", *d);
});
// 一个写者
let data3 = Arc::clone(&data);
let writer = tokio::spawn(async move {
let mut d = data3.write().await;
d.push(4);
});
reader1.await.unwrap();
reader2.await.unwrap();
writer.await.unwrap();
}
Semaphore 用于限制并发:
use tokio::sync::Semaphore;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let semaphore = Arc::new(Semaphore::new(3));
let mut handles = vec![];
for i in 0..10 {
let permit = semaphore.clone();
let handle = tokio::spawn(async move {
let _permit = permit.acquire().await.unwrap();
println!("任务 {} 获得许可", i);
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
println!("任务 {} 释放许可", i);
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
}
流
使用异步流:
use tokio_stream::{self as stream, StreamExt};
#[tokio::main]
async fn main() {
let mut stream = stream::iter(vec![1, 2, 3, 4, 5]);
while let Some(value) = stream.next().await {
println!("{}", value);
}
}
创建自定义流:
use tokio_stream::{Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};
struct Counter {
count: usize,
max: usize,
}
impl Stream for Counter {
type Item = usize;
fn poll_next(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>
) -> Poll<Option<Self::Item>> {
if self.count < self.max {
let current = self.count;
self.count += 1;
Poll::Ready(Some(current))
} else {
Poll::Ready(None)
}
}
}
#[tokio::main]
async fn main() {
let mut counter = Counter { count: 0, max: 5 };
while let Some(value) = counter.next().await {
println!("{}", value);
}
}
超时和间隔
使用超时:
use tokio::time::{timeout, Duration};
async fn slow_operation() -> String {
tokio::time::sleep(Duration::from_secs(5)).await;
String::from("完成")
}
#[tokio::main]
async fn main() {
match timeout(Duration::from_secs(2), slow_operation()).await {
Ok(result) => println!("成功: {}", result),
Err(_) => println!("操作超时"),
}
}
使用间隔:
use tokio::time::{interval, Duration};
#[tokio::main]
async fn main() {
let mut interval = interval(Duration::from_secs(1));
for _ in 0..5 {
interval.tick().await;
println!("滴答");
}
}
错误处理
使用 ? 传播错误:
use reqwest;
async fn fetch_url(url: &str) -> Result<String, reqwest::Error> {
let response = reqwest::get(url).await?;
let body = response.text().await?;
Ok(body)
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let content = fetch_url("https://example.com").await?;
println!("{}", content);
Ok(())
}
阻塞操作
运行CPU密集型任务:
use tokio::task;
fn blocking_operation() -> u64 {
// CPU密集型工作
(0..1_000_000).sum()
}
#[tokio::main]
async fn main() {
let result = task::spawn_blocking(|| {
blocking_operation()
}).await.unwrap();
println!("结果: {}", result);
}
异步特质
使用 async-trait 箱:
[dependencies]
async-trait = "0.1"
use async_trait::async_trait;
#[async_trait]
trait Repository {
async fn find(&self, id: u64) -> Option<String>;
async fn save(&self, data: String) -> Result<(), String>;
}
struct UserRepository;
#[async_trait]
impl Repository for UserRepository {
async fn find(&self, id: u64) -> Option<String> {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
Some(format!("用户 {}", id))
}
async fn save(&self, data: String) -> Result<(), String> {
println!("保存: {}", data);
Ok(())
}
}
何时使用此技能
使用 rust-async-patterns 当您需要:
- 构建异步Web服务器或客户端
- 高效处理并发I/O操作
- 并发发起多个HTTP请求
- 实现生产者-消费者模式
- 处理异步数据流
- 跨异步任务管理共享状态
- 控制并发限制
- 处理超时和取消
- 构建事件驱动系统
- 异步处理数据
最佳实践
- 使用 tokio::spawn 处理CPU无关任务
- 使用 spawn_blocking 处理CPU密集型工作
- 优先选择通道而非带锁的共享状态
- 使用 select! 竞争futures
- 使用 join! 进行并发独立操作
- 为网络操作设置适当超时
- 使用 Semaphore 限制并发操作
- 避免在await点持有锁
- 在异步上下文中使用 Arc 进行共享所有权
- 使用 Result 正确处理错误
常见陷阱
- 跨await持有 std::sync::Mutex(应使用 tokio::sync::Mutex)
- 在spawn中未使用move与闭包
- 忘记await futures
- 使用CPU密集型工作阻塞运行时
- 创建过多任务而无限制
- 未正确处理取消
- 使用错误的通道类型
- 锁顺序不当导致死锁
- 未适当配置运行时
- 忽略生成任务中的错误