[转] 封装并发任务方法

news/2025/10/30 14:50:34/文章来源:https://www.cnblogs.com/Answer1215/p/19177012

作者:谢杰

该文章是并发异步操作系列文章第五篇。

有了前面几篇文章所介绍的知识铺垫后,本系列最终篇,我们来封装一个能够指定并发上限的方法。

需求

先来过一下需求,封装一个异步方法 runWithConcurrency,如下:

async function runWithConcurrency(items, worker, maxConcurrency) {}

该方法接收 3 个参数:

  • items:要处理的任务列表
  • worker:处理单个任务的异步函数
  • maxConcurrency:最大并发量

先假设这个方法我们已经写好了,那么外部在使用的时候,大概是这么用的:

// 示例 1:模拟任务
const items = Array.from({ length: 8 }, (_, i) => i + 1);const worker = async (n) => {// 模拟耗时 200~800ms 的异步任务await new Promise((r) => setTimeout(r, 200 + Math.random() * 600));console.log(`处理完成:task=${n}`);return n * n; // 返回结果
};const results = await runWithConcurrency(items, worker, 3);
console.log('结果(顺序与输入一致):', results);
// 示例 2:真实网络请求(批量拉取)
const urls = ['/api/user/1','/api/user/2','/api/user/3','/api/user/4','/api/user/5',
];const fetchUser = async (url) => {const res = await fetch(url);if (!res.ok) throw new Error(`请求失败:${res.status}`);const data = await res.json();console.log('已获取:', url);return data;
};const users = await runWithConcurrency(urls, fetchUser, 2);
console.log('所有用户:', users);

分析与实现

明确了需求后,接下来我们来逐步实现。假设外界调用的时候,是这么调用的:

await runWithConcurrency(urls, fetchUser, 2);

那么这里传递的 2 是什么?

没错,是并发的上限,我们可以将其想象成有两个工人,如下图

┌──────────────────────────────────────────────────┐
│ [任务五]  [任务四]  [任务三]  [任务二]  [任务一] │  ← i(下一个)
└──────────────────────────────────────────────────┘│         ││         │▼         ▼工人 2     工人 1

工人1 先领取了任务一,然后开始执行任务一

工人2 领取任务二,然后开始执行任务二

那么任务三交给谁呢?究竟由工人1 领取还是工人2 领取呢?

那得看哪一个工人的工作先完成,假设工人1 先完成了任务一,那么这个工人就去领取任务三;反之,如果是工人2 先完成了任务二,则是由工人2 去领取任务三。依此类推,后面的任务四、任务五也是这样,哪个工人手上没活了,就去领取下一个任务。这和我们现实生活中的场景,也是一致的。

这种模式在代码里对应的是:固定数量的工人 + 一个共享的“下一个任务”指针

接下来落地到具体的代码。首先,我们来确定工人的人数:

async function runWithConcurrency(items, worker, maxConcurrency) {const n = Math.max(1, Math.min(maxConcurrency, items.length));
}

这里的 n 是实际工人的人数。有的同学会觉得很奇怪,为什么不直接用 maxConcurrency?因为:

  • 工人数不能超过任务总数(否则有些工人一上来就没活干)
  • 至少要有 1 个工人

接下来这 n 个工人开始干活:

const workers = [];
for (let k = 0; k < n; k++) workers.push(spawn());

这里的 workers 用来收集每个工人(spawn())返回的 Promise。一旦调用 spawn(),工人就会立刻去领取任务并开始执行;数组里收集的是“工人何时收工”的 Promise,而不是每个任务的 Promise。

紧接着是“工人如何领活儿”。该方法需要不断地把任务分配给工人,直到任务列表为空:

let i = 0; // 共享任务指针:指向“下一个要处理的下标”async function spawn() {// 每个工人都需要不断地从任务队列中领取任务while (i < items.length) {const idx = i++;                 // 领取当前要处理的任务下标await worker(items[idx], idx);   // 处理单个任务(工人内部串行)// 处理完继续回到 while,再领下一单;直到任务列表为空}// 跳出 while:说明任务已经被领完,这个工人可以正常收工
}

这里要强调两点:

  • 每个工人内部是串行的:比如工人 1 领了任务一,必须等这单完成(await 返回)后才能去领下一单。并发来自“有 n 个工人同时在干”,而不是让单个工人内部再并发。
  • 共享指针是安全的const idx = i++ 这一步在同一次调用栈里是同步完成的(读取→使用→自增),不会被别的工人中途打断,因此不会出现两个工人领到同一单。真正“让出执行权”的地方发生在 await worker(...) 之后。

最后,等待所有工人完工,这里使用 Promise.allSettled

await Promise.allSettled(workers);

这一步不会启动任务,只是“在门口等所有工人跑完自己手上的任务”。

allSettled 的好处是:即便某个工人因为某一单失败而变成 rejected,也不会短路,其他工人仍会把剩下的活干完,更符合“批量任务尽量跑完”的诉求。

最终完整的代码如下:

async function runWithConcurrency(items, worker, maxConcurrency) {if (!items?.length) return;// i 是“下一个要被领取的任务索引”。多个工人共享这个变量。// 在 JS 的单线程事件循环中,“读取 i -> 使用 i -> i++”这一小段代码在一次// 宏/微任务中是原子的(不会被其他 JS 执行栈打断),因此可作为简单的任务分发指针。let i = 0;// workers 用来收集每个“工人”的 Promise,后面用 allSettled 等待他们全部结束。const workers = [];// spawn = 启动一个工人:不停从任务池里领取下一个索引并处理,直到没有任务可领async function spawn() {// 当还有未处理的任务(i < items.length)就继续循环while (i < items.length) {// 领取当前要处理的任务索引,然后自增以留给下一个任务const idx = i++;// 执行该任务:必须 await,保证这个工人在“串行处理自己的任务队列”// 如果不 await,就会在单个工人内部产生更高的并发,超出 maxConcurrency 的约束await worker(items[idx], idx);// 若 worker 抛错(reject),该 spawn() 的 Promise 会变为 rejected;// 但我们外层会用 Promise.allSettled,所以不会影响其它工人继续工作。}// 退出 while:说明任务已经被领完,这个工人可以正常收工(resolve)}// 计算实际需要启动的工人数:// - 不能超过任务总数(否则有些工人一上来就没活干)// - 至少 1 个const n = Math.max(1, Math.min(maxConcurrency, items.length));// 启动 n 个工人,每个工人都是一个独立的异步执行体(Promise)for (let k = 0; k < n; k++) workers.push(spawn());// 等待所有工人“收工”。使用 allSettled:// - 不会因为某个工人失败而中断等待(all 会短路,allSettled 不会)// - 适合“多个任务相互独立,允许部分失败也要跑完”的场景await Promise.allSettled(workers);// 函数到这里 resolve,表示全部任务都已尝试完成(成功或失败),所有工人均结束。
}

至此,带并发上限的并发执行方法就写完了。

写在最后

这一次我们实现了通用的 runWithConcurrency(items, worker, maxConcurrency),用固定数量的工人配合共享游标分发任务,工人内部串行、整体受控并发;配套示例也证明它能稳妥跑完批量请求和本地批处理场景。

使用时有几处要点值得牢牢记住:

  1. 并发从调用 spawn() 的那一刻就开始了,Promise.allSettled(workers)只是在门口把所有工人等到收工,并不会启动任务、更不会决定顺序;
  2. 共享指针用的是 const idx = i++,在单线程调用栈里是同步完成的,不会出现两个人领到同一单;
  3. 工人内部必须 await worker(...),否则单个工人会把并发继续堆高,超出你的 maxConcurrency
  4. 另外,workers 装的是“工人的promise”而不是“每个任务的promise”,若需要拿回结果,可以在工人里按下标写回一个 out[idx] 再返回。空列表可以直接早退,maxConcurrency 也最好做下限与取整的校验。

落到真实项目,还方法可以继续打磨:

  1. 给它包上超时与取消,必要时做重试与指数退避;
  2. 按需暴露进度回调与观测埋点,用数据调参;
  3. 在高压场景里引入优先级、背压和动态并发,保护上下游。

掌握并灵活运用这套基建,小到脚本批处理,大到服务侧限流,你的异步就能既稳又快。


-EOF-

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/950804.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MyEMS:重塑能源管理格局的智能化利器

在“双碳”目标引领与企业降本增效需求的双重驱动下,能源管理已从传统的粗放式统计升级为精细化、智能化的核心管理模块。MyEMS(My Energy Management System)作为一款开源且高度可定制的能源管理系统,凭借其独特的…

2025 年防撞护栏生产厂家最新推荐排行榜:涵盖铝合金 / Q235/Q355B 桥梁及景观、灯光、河道、公路、喷塑、道路护栏企业

引言 随着交通基础设施建设不断推进,防撞护栏作为保障道路与桥梁安全的核心设施,市场需求持续增长。但当前市场上生产厂家数量繁杂,产品质量、生产能力与服务水平差异显著,部分厂家存在原材料以次充好、工艺落后、…

成人草书培训市场:北兰亭引领专业风尚

在当今社会,成人对自我修养和艺术追求的需求日益增长,草书培训成为了许多人提升自我、陶冶情操的热门选择。成人草书培训市场蕴含着诸多优势,吸引着众多爱好者投身其中。成人草书培训的行业优势与特点成人草书培训具…

逆向基础--反编译介绍(002)

逆向基础--反编译介绍(002)一. 机器码机器码是指:CPU能够直接理解和执行的二进制指令。它是计算机世界的"母语",由0和1组成的序列。不管是高级语言(java,C#),还是中级语言(C和C++)最终都会转换成机器…

2025年锤片式粉碎机供货厂家权威推荐榜单:食品添加剂‍粉碎机/白糖粉碎机 /五谷杂粮粉碎机源头厂家精选

在食品加工、化工原料和制药行业,高效粉碎设备正成为提升生产效能的关键环节。 根据行业分析数据显示,全球粉碎机械设备市场规模预计将持续增长,其中锤片式粉碎机因结构简单、适用性广等特点,在细分市场中占有重要…

2025年热门的吸塑塑料托盘TOP品牌厂家排行榜

2025年热门的吸塑塑料托盘TOP品牌厂家排行榜 随着物流行业的快速发展,吸塑塑料托盘因其轻量化、耐用性强、环保可回收等优势,成为仓储物流领域的重要工具。2025年,市场对高品质塑料托盘的需求持续增长,各大品牌厂…

【项目复现上新】突破推理瓶颈!LightLLM轻量化部署新范式,打造高性能法律智能体

当前大语言模型(LLM)虽能力突飞猛进,却难逃“知识静态滞后”与“专业内容幻觉”两大痛点。在法律、医疗等强合规场景中,这几乎是“致命缺陷。而 RAG(检索增强生成)框架虽能通过融合外部知识库破解此困,却在落地…

2025年跨境电商APP权威推荐榜单:海外跨境电商/跨境电商系统/跨境电商开店服务商精选

随着全球电商市场持续扩张,跨境电商服务生态日趋完善。数据显示,2024年全球跨境电商市场规模预计将保持两位数增长,其中东南亚、拉美等新兴市场增速尤为显著。 在这一背景下,专业的跨境电商服务商通过整合营销、物…

麒麟服务器v11安装

麒麟服务器v11安装银河麒麟系统下载地址:https://www.kylinos.cn/support/trial/download 启动盘制作工具rufus下载地址:https://rufus.ie/zh/download

2025年铜箔PCB滤芯源头厂家权威推荐榜单:光纤滤芯/CMP抛光液滤芯 /墨水滤芯源头厂家精选

在高端电子制造领域,铜箔PCB滤芯作为确保电路板生产质量的关键部件,其性能直接影响产品良率。据行业统计数据显示,2025年全球PCB专用滤芯市场规模预计达到42亿元,中国市场的年均增长率稳定在18%以上。本报告基于产…

世界500强k8s全栈架构师

下载地址 https://8ma.co/res/R49W9GSW.zstitle { width: 280px; text-align: center; font-size: 26px } .zsimgweixin { width: 280px } .zsimgali { width: 280px; padding: 0px 0px 50px 0px } .zsleft { float: l…

2025年马口铁罐灌装压盖一体机源头厂家权威推荐榜单:啤酒小型灌装机/小型啤酒灌装机/啤酒桶倒置灌装机源头厂家精选

在啤酒生产行业,灌装压盖设备的精度与效率直接决定了产品的最终质量。相关数据显示,灌装过程中的氧摄入量每降低0.1ppm,啤酒保鲜期可延长30天以上。 在马口铁罐啤酒包装领域,灌装压盖一体机已成为现代化生产线的核…

[转] 并发与并行

作者:谢杰 该文章是并发异步操作系列文章第四篇。今天我们来解决一个很多同学经常搞混的概念对:并发和并行。 这两个词在日常交流中常常被混用,但在编程领域,它们指的是完全不同的执行模式。理解它们的区别,不仅能…

2025年评价高的卫生级阀门厂家最新热销排行

2025年评价高的卫生级阀门厂家最新热销排行在食品、制药、乳品等卫生要求严格的行业中,卫生级阀门作为关键设备,其质量直接影响生产安全和产品品质。2025年,随着行业标准的不断提高和技术的持续创新,一批优秀的卫生…

开源能源管理系统:赋能能源转型的 “透明化” 工具

在全球 “碳中和” 目标与能源结构转型的双重驱动下,企业、公共设施乃至家庭对能源的 “精细化管理” 需求日益迫切。传统商业能源管理系统(EMS)常受限于高昂授权费、封闭架构与定制化门槛,难以适配多样化的能源监…

详细介绍:Dockerfile 镜像构建实战

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

https接口的支持判断,http升级到https需求

https接口的支持判断,http升级到https需求https接口的支持判断,http升级到https需求 1.以下是仅仅支持http,不支持https的提示。http://myapi.mytest.com:8081/pinganzong/getCity {"resCode":"999999&…

源杰科技国产激光器芯片-光模块源头

源杰科技(股票代码:688498.SH)是中国光芯片领域的核心企业,近年来在AI算力爆发、数据中心升级和国产替代加速的背景下,实现了从“电信市场承压”到“数通业务爆发”的战略转型。以下从公司概况、技术实力、产品布…

2025 年仿石漆厂家最新推荐榜,技术实力与市场口碑深度解析的优质企业精选批刮 / 别墅 / 批发 / 出口仿石漆推荐

引言 当前仿石漆行业规模持续扩大,市场中品牌数量激增,但产品质量、技术水平与服务能力却呈现显著差异。部分产品存在仿石效果失真、耐候性差、环保不达标等问题,不仅影响建筑外观持久性,还可能引发安全与健康隐患…

2025 年外墙涂料厂家最新推荐榜,技术实力与市场口碑深度解析外墙涂料工程 / 外墙涂料翻新推荐

引言 为精准筛选 2025 年优质外墙涂料品牌,建筑装饰装修材料协会联合行业权威检测机构开展专项测评,测评覆盖全国 120 余家涂料企业。测评采用 “三维九项” 评价体系,从技术实力(研发投入占比、专利数量)、产品性…