Rust Async 并发编程:处理任意数量的 Future 与 Stream

1. Streams:异步数据流

1.1 Streams 与 Iterator 的异同

Rust 的 Iterator 是同步的,通过 next() 方法逐个获取数据。而 Streamasync 版本的 Iterator,它使用 next().await 来获取数据项。

示例:将 Iterator 转换为 Stream

use trpl::{stream_from_iter, StreamExt};let numbers = vec![1, 2, 3, 4, 5];
let stream = stream_from_iter(numbers.into_iter());while let Some(value) = stream.next().await {println!("Received: {}", value);
}

此示例中:

  • stream_from_iter()Iterator 转换为 Stream
  • 通过 stream.next().await 按顺序异步获取数据项。

2. 组合 Streams

2.1 构建 Stream 处理异步消息

在实际应用中,我们经常需要从网络、数据库或消息队列中接收数据。这时,可以用 trpl::channel 创建 Stream 来异步处理数据。

use trpl::{channel, ReceiverStream};fn get_messages() -> impl Stream<Item = String> {let (tx, rx) = channel();spawn_task(async move {for letter in "abcdefghij".chars() {tx.send(letter.to_string()).await.unwrap();}});ReceiverStream::new(rx)
}while let Some(msg) = get_messages().next().await {println!("Message: {}", msg);
}
  • get_messages 返回一个 Stream,每次 next().await 便能获取新的数据项。
  • 通过 spawn_task 启动异步任务,定期向 Stream 发送数据。

3. 控制 Stream 速率与超时

3.1 timeout:为 Stream 设置超时

当处理外部数据时,我们可能希望对每个 Stream 数据项设定超时时间,以避免某个数据源长时间无响应。

use trpl::{StreamExt, sleep, Duration};let messages = get_messages().timeout(Duration::from_millis(200));while let Some(result) = messages.next().await {match result {Ok(msg) => println!("Message: {}", msg),Err(_) => println!("Timeout occurred!"),}
}
  • timeout() 方法为 Stream 每个数据项设置超时时间。
  • 当数据在 200ms 内到达时,正常输出,否则触发超时逻辑。

3.2 throttle:限制 Stream 处理速率

有时,我们希望 Stream 以固定的速率生成数据,而不是尽可能快地处理。

use trpl::StreamExt;let throttled_messages = get_messages().throttle(Duration::from_millis(100));
  • throttle() 方法限制 Stream 处理频率,每 100ms 处理一个数据项。
  • 避免 Stream 过快地填充下游处理逻辑。

4. 合并多个 Streams

4.1 merge:合并多个 Stream

在某些情况下,我们可能有多个 Stream 数据源,例如:

  • 一个 Stream 处理用户输入
  • 一个 Stream 处理传感器数据

可以使用 merge() 将它们合并到一个 Stream,以便统一处理:

let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals().map(|i| format!("Interval: {}", i));let merged = messages.merge(intervals);while let Some(event) = merged.next().await {println!("Received: {}", event);
}
  • messages 处理异步消息,带 200ms 超时。
  • intervals 生成时间间隔数据(Interval: 1, Interval: 2, …)。
  • merge() 方法合并两个 Stream,同时接收消息和时间间隔。

4.2 take:限制 Stream 处理的项数

有时,我们希望 Stream 只处理有限数量的数据项。例如,限制为 10 条:

let limited_stream = merged.take(10);

这样,merged 只会输出 10 条数据,然后 Stream 自动结束。

5. 处理 Stream 可能的错误

在异步系统中,消息通道的 send 操作可能会失败,例如 tx.send(msg).await.unwrap();

如果通道关闭,send 会返回 Err。因此,我们应当合理地处理这些错误,而不是 unwrap()

if let Err(e) = tx.send(msg).await {println!("Error sending message: {:?}", e);break;
}

在真实应用中,应当根据错误类型采取适当的恢复策略,而不是直接 break 退出。


6. 总结

  • Stream 适用于异步数据流,类似 Iterator,但支持 await
  • timeout 可为 Stream 每个数据项设置超时时间。
  • throttle 限制 Stream 生成数据的速率。
  • merge 将多个 Stream 合并,便于处理多个数据源。
  • take 限制 Stream 处理的最大数据项数。
  • 合理处理 send 失败,避免异步任务意外崩溃。

🚀 适用场景:

  • 处理 WebSocketKafka数据库监听流式数据
  • 限流 API 调用,避免发送太多请求。
  • 处理用户事件流,如 键盘输入、鼠标点击

通过 Stream 及其扩展方法,我们可以轻松构建高效的异步数据处理系统。Rust 提供了强大的 async 生态,让我们能更轻松地编写安全、高性能的并发代码!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/70990.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

蓝桥杯 路径之谜

路径之谜 题目描述 小明冒充 XX 星球的骑士&#xff0c;进入了一个奇怪的城堡。 城堡里边什么都没有&#xff0c;只有方形石头铺成的地面。 假设城堡地面是 nnnn 个方格。如下图所示。 按习俗&#xff0c;骑士要从西北角走到东南角。可以横向或纵向移动&#xff0c;但不能斜着走…

3-5 WPS JS宏 工作表的移动与复制学习笔记

************************************************************************************************************** 点击进入 -我要自学网-国内领先的专业视频教程学习网站 *******************************************************************************************…

聊聊Java的SPI机制

个人自建博客地址 什么是SPI呢&#xff1f; SPI全称Service Provider Interface&#xff0c;翻译过来就是服务提供者接口。调用方提供接口声明&#xff0c;服务提供方对接口进行实现&#xff0c;提供服务的一种机制&#xff0c;服务提供方往往是第三方或者是外部扩展。 下面…

langchain4j+local-ai小试牛刀

序 本文主要研究一下如何本地运行local-ai并通过langchain4j集成调用。 步骤 curl安装 curl https://localai.io/install.sh | sh% Total % Received % Xferd Average Speed Time Time Time CurrentDload Upload Total Spent Left Speed 100 21509 …

什么是“零日漏洞”(Zero-Day Vulnerability)?为何这类攻击被视为高风险威胁?

正文 零日漏洞&#xff08;Zero-Day Vulnerability&#xff09; 是指软件、硬件或系统中存在的、尚未被开发者发现或修复的安全漏洞。攻击者在开发者意识到漏洞存在之前&#xff08;即“零日”内&#xff09;利用该漏洞发起攻击&#xff0c;因此得名。这类漏洞的“零日”特性使…

鸿蒙 ArkUI 实现 2048 小游戏

2048 是一款经典的益智游戏&#xff0c;玩家通过滑动屏幕合并相同数字的方块&#xff0c;最终目标是合成数字 2048。本文基于鸿蒙 ArkUI 框架&#xff0c;详细解析其实现过程&#xff0c;帮助开发者理解如何利用声明式 UI 和状态管理构建此类游戏。 一、核心数据结构与状态管理…

Milvus高性能向量数据库与大模型结合

Milvus | 高性能向量数据库&#xff0c;为规模而构建Milvus 是一个为 GenAI 应用构建的开源向量数据库。使用 pip 安装&#xff0c;执行高速搜索&#xff0c;并扩展到数十亿个向量。https://milvus.io/zh Milvus 是什么&#xff1f; Milvus 是一种高性能、高扩展性的向量数据…

kettle插件-自定义函数-数据脱敏

平常我们在使用kettle抽取数据的时候会涉及到敏感数据邀请脱敏或者进行掩码的需求&#xff0c;今天我们使用自定义函数插件来实现这些需求。 1、将自定义函数插件&#xff08;kettle-func-plugin.zip&#xff09;解压后放到kettle的plugins目录下面&#xff0c;然后重启服务。…

LeetCode 每日一题 2025/2/24-2025/3/2

记录了初步解题思路 以及本地实现代码&#xff1b;并不一定为最优 也希望大家能一起探讨 一起进步 目录 2/24 1656. 设计有序流2/25 2502. 设计内存分配器2/26 1472. 设计浏览器历史记录2/27 2296. 设计一个文本编辑器2/28 2353. 设计食物评分系统3/1 131. 分割回文串3/2 2/24 …

C++动态与静态转换区别详解

文章目录 前言一、 类型检查的时机二、安全性三、适用场景四、代码示例对比总结 前言 在 C 中&#xff0c;dynamic_cast 和 static_cast 是两种不同的类型转换操作符&#xff0c;主要区别体现在类型检查的时机、安全性和适用场景上。以下是它们的核心区别&#xff1a; 一、 类…

探秘《矩阵之美》:解锁矩阵的无限魅力

在这个数据驱动的时代&#xff0c;矩阵作为数学中的瑰宝&#xff0c;不仅在理论研究中占据核心地位&#xff0c;更在工程技术、计算机科学、物理学、经济学等众多领域发挥着不可替代的作用。今天&#xff0c;让我们通过中科院大学耿修瑞老师&#xff08;中科院空天信息研究院研…

【MySQL】(2) 库的操作

SQL 关键字&#xff0c;大小写不敏感。 一、查询数据库 show databases; 注意加分号&#xff0c;才算一句结束。 二、创建数据库 {} 表示必选项&#xff0c;[] 表示可选项&#xff0c;| 表示任选其一。 示例&#xff1a;建议加上 if not exists 选项。 三、字符集编码和排序…

Vue3实现文件上传、下载及预览全流程详解(含完整接口调用)

文章目录 一、环境准备1.1 创建Vue3项目1.2 安装依赖1.3 配置Element Plus 二、文件上传实现2.1 基础上传组件2.2 自定义上传逻辑&#xff08;Axios实现&#xff09; 三、文件下载实现3.1 直接下载&#xff08;已知文件URL&#xff09;3.2 后端接口下载&#xff08;二进制流&am…

分布式数据存储:提升系统弹性与性能的技术之路

分布式数据存储:提升系统弹性与性能的技术之路 在当今数据爆炸式增长的时代,传统的单机存储系统已无法满足大规模、高并发、低延迟的需求。尤其是在大数据、云计算和物联网的推动下,数据存储面临着前所未有的挑战。分布式数据存储应运而生,通过将数据分布在多个物理节点上…

在编译Linux的内核镜像和模块时,必须先编译内核镜像,再编译模块,顺序不可随意调整的原因

问&#xff1a;在编译Linux的内核镜像和模块时,必须先编译内核镜像,再编译模块,顺序不可随意调整 答&#xff1a;在编译 Linux 内核和模块时&#xff0c;必须先编译内核镜像&#xff0c;再编译模块&#xff0c;顺序不可随意调整。 原因&#xff1a; 模块依赖内核的头文件和符…

免费使用 DeepSeek API 教程及资源汇总

免费使用 DeepSeek API 教程及资源汇总 一、DeepSeek API 资源汇总1.1 火山引擎1.2 百度千帆1.3 阿里百炼1.4 腾讯云 二、其他平台2.1 华为云2.2 硅基流动 三、总结 DeepSeek-R1 作为 2025 年初发布的推理大模型&#xff0c;凭借其卓越的逻辑推理能力和成本优势&#xff0c;迅速…

千峰React:案例二

完成对html文档还有css的引入&#xff0c;引入一下数据&#xff1a; import { func } from prop-types import ./购物车样式.css import axios from axios import { useImmer } from use-immer import { useEffect } from reactfunction Item() {return (<li classNameacti…

用DeepSeek生成批量删除处理 PDF第一页工具

安装依赖库 在运行程序之前&#xff0c;请确保安装所需的库&#xff1a; pip install pymupdf python-docx Python 程序代码 import os import fitz # PyMuPDF from docx import Documentdef delete_pdf_first_page(input_path, output_path):"""删除 PDF…

redis的下载和安装详解

一、下载redis安装包 进入redis官网查看当前稳定版本&#xff1a; https://redis.io/download/发现此时的稳定版本是6.2.4&#xff0c; 此时可以去这个网站下载6.2.4稳定版本的tar包。 暂时不考虑不在windows上使用redis&#xff0c;那样将无法发挥redis的性能 二、上传tar…

如何使用 Jenkins 实现 CI/CD 流水线:从零开始搭建自动化部署流程

如何使用 Jenkins 实现 CI/CD 流水线:从零开始搭建自动化部署流程 在软件开发过程中,持续集成(CI)和持续交付(CD)已经成为现代开发和运维的标准实践。随着代码的迭代越来越频繁,传统的手动部署方式不仅低效,而且容易出错。为了提高开发效率和代码质量,Jenkins作为一款…