1. Streams:异步数据流
1.1 Streams 与 Iterator 的异同
Rust 的 Iterator 是同步的,通过 next() 方法逐个获取数据。而 Stream 是 async 版本的 Iterator,它使用 next().await 来获取数据项。
示例:将 Iterator 转换为 Stream
use trpl::{stream_from_iter, StreamExt};let numbers = vec![1, 2, 3, 4, 5];
let stream = stream_from_iter(numbers.into_iter());while let Some(value) = stream.next().await {println!("Received: {}", value);
}
此示例中:
stream_from_iter()将Iterator转换为Stream。- 通过
stream.next().await按顺序异步获取数据项。
2. 组合 Streams
2.1 构建 Stream 处理异步消息
在实际应用中,我们经常需要从网络、数据库或消息队列中接收数据。这时,可以用 trpl::channel 创建 Stream 来异步处理数据。
use trpl::{channel, ReceiverStream};fn get_messages() -> impl Stream<Item = String> {let (tx, rx) = channel();spawn_task(async move {for letter in "abcdefghij".chars() {tx.send(letter.to_string()).await.unwrap();}});ReceiverStream::new(rx)
}while let Some(msg) = get_messages().next().await {println!("Message: {}", msg);
}
get_messages返回一个Stream,每次next().await便能获取新的数据项。- 通过
spawn_task启动异步任务,定期向Stream发送数据。
3. 控制 Stream 速率与超时
3.1 timeout:为 Stream 设置超时
当处理外部数据时,我们可能希望对每个 Stream 数据项设定超时时间,以避免某个数据源长时间无响应。
use trpl::{StreamExt, sleep, Duration};let messages = get_messages().timeout(Duration::from_millis(200));while let Some(result) = messages.next().await {match result {Ok(msg) => println!("Message: {}", msg),Err(_) => println!("Timeout occurred!"),}
}
timeout()方法为Stream每个数据项设置超时时间。- 当数据在 200ms 内到达时,正常输出,否则触发超时逻辑。
3.2 throttle:限制 Stream 处理速率
有时,我们希望 Stream 以固定的速率生成数据,而不是尽可能快地处理。
use trpl::StreamExt;let throttled_messages = get_messages().throttle(Duration::from_millis(100));
throttle()方法限制Stream处理频率,每 100ms 处理一个数据项。- 避免
Stream过快地填充下游处理逻辑。
4. 合并多个 Streams
4.1 merge:合并多个 Stream
在某些情况下,我们可能有多个 Stream 数据源,例如:
- 一个 Stream 处理用户输入
- 一个 Stream 处理传感器数据
可以使用 merge() 将它们合并到一个 Stream,以便统一处理:
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals().map(|i| format!("Interval: {}", i));let merged = messages.merge(intervals);while let Some(event) = merged.next().await {println!("Received: {}", event);
}
messages处理异步消息,带 200ms 超时。intervals生成时间间隔数据(Interval: 1,Interval: 2, …)。merge()方法合并两个Stream,同时接收消息和时间间隔。
4.2 take:限制 Stream 处理的项数
有时,我们希望 Stream 只处理有限数量的数据项。例如,限制为 10 条:
let limited_stream = merged.take(10);
这样,merged 只会输出 10 条数据,然后 Stream 自动结束。
5. 处理 Stream 可能的错误
在异步系统中,消息通道的 send 操作可能会失败,例如 tx.send(msg).await.unwrap();。
如果通道关闭,send 会返回 Err。因此,我们应当合理地处理这些错误,而不是 unwrap()。
if let Err(e) = tx.send(msg).await {println!("Error sending message: {:?}", e);break;
}
在真实应用中,应当根据错误类型采取适当的恢复策略,而不是直接 break 退出。
6. 总结
Stream适用于异步数据流,类似Iterator,但支持await。timeout可为Stream每个数据项设置超时时间。throttle限制Stream生成数据的速率。merge将多个Stream合并,便于处理多个数据源。take限制Stream处理的最大数据项数。- 合理处理
send失败,避免异步任务意外崩溃。
🚀 适用场景:
- 处理 WebSocket、Kafka、数据库监听 等 流式数据。
- 限流
API调用,避免发送太多请求。 - 处理用户事件流,如 键盘输入、鼠标点击。
通过 Stream 及其扩展方法,我们可以轻松构建高效的异步数据处理系统。Rust 提供了强大的 async 生态,让我们能更轻松地编写安全、高性能的并发代码!