JAVA接口调用限速器

目录

1、并发限速

2、串行限速  


 

需求:批量调用第三方ERP接口,对方接口限流时,减缓调用速率。

1、并发限速


@Slf4j
@RestController
public class ApiCallTask {//第三方接口@Resourceprivate ErpService erpService;//异步线程池@Resourceprivate ThreadPoolTaskExecutor taskExecutor;//定时调度器@Resourceprivate ThreadPoolTaskScheduler taskScheduler;private static final BlockingQueue<Seller> sellerQueue = new LinkedBlockingQueue<>(1000);private static final RateLimiter rateLimiter = RateLimiter.create(200.0 / 60.0);private static final ObjectMapper objectMapper = new ObjectMapper();private static final int MAX_RETRY_COUNT = 5;private static final int BATCH_SIZE = 10;@Scheduled(cron = "0 0 2 * * ?")@RequestMapping(value = "/jobAfterSalesSync")public ResponseEntity<String> jobAfterSalesSync() {log.info("开始同步商家售后数据...");Map<String, String> queryMap = Maps.newHashMap();queryMap.put("status", "2");List<商家seller> sellerList = erpService.getSellerList(queryMap);List<商家seller> sellerList = sellerList  != null ? sellerList  : new ArrayList<>();log.info("共 {} 个商家待处理", sellerList.size());for (Seller seller : sellerList) {if (!sellerQueue.offer(seller)) {log.warn("队列已满,商家 {} 未加入队列", seller.getSellerName());} else {log.debug("商家 {} 已加入队列", seller.getSellerName());}}processBatch(); // 启动分批处理log.info("任务已提交,线程池活跃线程数: {}", taskExecutor.getActiveCount());return ResponseEntity.ok("任务已触发");}/*** 异步处理任务* taskScheduler与rateLimiter的分工*      processBatch 中每批完成后等待 3 秒再调度下一批,这是批次之间的宏观控制。Instant.now().plusSeconds(3)*      rateLimiter.acquire():在每批内部的 10 个任务中,控制每个 API 调用的微观速率。* 队列作用*      processBatch 在每次批次完成后检查 sellerQueue.isEmpty()。如果队列非空,通过 taskScheduler.schedule 调度下一次 processBatch,形成递归调用。保证所有seller都被处理*      限流(801)时,handleRetry 确保 Seller 被重新加入 sellerQueue。即使队列满,也通过延迟重试保证任务不丢失。* CompletableFuture作用*      CompletableFuture 是对传统 Future 的增强,支持链式调用、异常处理和任务组合,用于异步执行 callErpApi,实现每批 10 个 Seller 的并发处理。*      将 callErpApi 的执行从主线程中分离出来,提交给线程池(如 taskExecutor)异步运行。*      submit 方法返回一个 Future 对象(这里未使用返回值),表示任务已交给线程池处理。*      CompletableFuture.runAsync 创建异步任务,执行 callErpApi。在批处理中,每个 Seller 的 API 调用是独立的异步任务。*      futures 收集所有任务的 CompletableFuture 实例。*      使用 CompletableFuture.allOf 等待一批任务全部完成,然后触发后续操作(如调度下一批)。*      通过 .exceptionally 或 .whenComplete 处理异步任务中的异常,确保任务链不会因错误中断。* taskExecutor 的整体作用*      异步执行:*      将 processBatch 和 callErpApi 从主线程(定时任务或 HTTP 请求线程)中分离出来,避免阻塞主线程。*      例如,HTTP 请求可以快速响应,而实际处理在后台进行。*      并发处理:*      在 processBatch 中,10 个 callErpApi 任务可以并行执行(取决于线程池大小),提高处理效率。*      例如,如果线程池核心线程数为 10,则每批 10 个任务可以同时运行。*      与 taskScheduler 的分工:*      taskExecutor:负责执行具体的任务(processBatch 和 callErpApi)。*      taskScheduler:负责调度任务的执行时间(例如批次间隔 3 秒或限流重试延迟)。*      与代码目标的关系*      分批执行:taskExecutor 使每批 10 个任务并发运行。*      持续执行:与 taskScheduler 配合,确保队列非空时任务持续调度。*      限流控制:rateLimiter 限制速率,taskExecutor 提供并发支持,二者结合实现高效且受控的处理。*/private void processBatch() {if (sellerQueue.isEmpty()) {log.info("队列处理完成,剩余大小: {}", sellerQueue.size());return;}List<Seller> batch = new ArrayList<>();int drained = sellerQueue.drainTo(batch, BATCH_SIZE); // 取出最多 10 个,出队log.info("从队列中取出 {} 个元素,开始处理", drained);List<CompletableFuture<Void>> futures = new ArrayList<>();/**** 对于每批的 10 个 Seller,使用 CompletableFuture.runAsync 将 callErpApi(seller) 提交到 taskExecutor 执行。* runAsync 的第二个参数指定了执行器(taskExecutor),确保这些任务在 taskExecutor 的线程池中并行运行。*/for (Seller seller : batch) {CompletableFuture<Void> future = CompletableFuture.runAsync(() -> callErpApi(seller), taskExecutor);futures.add(future);}// 等待当前批次所有任务完成。在批次完成后执行回调,检查队列并调度下一次 processBatch(延迟 3 秒)。CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((result, throwable) -> {if (throwable != null) {log.error("批处理异常: {}", throwable.getMessage());}log.info("完成一批处理,剩余队列大小: {}", sellerQueue.size());if (!sellerQueue.isEmpty()) {// 只要 sellerQueue 中还有元素,processBatch 会在每次批次完成后通过 taskScheduler.schedule 重新调用自己。//限流(801 错误)时,handleRetry 会将 Seller 重新加入 sellerQueue,保持队列非空。//每次批次完成后,只要队列非空,就延迟 3 秒调度下一批,直到队列为空。taskScheduler.schedule(this::processBatch, Instant.now().plusSeconds(3));} else {log.info("队列已空,任务结束");}});}//调用ERP接口private void callErpApi(Seller seller) {rateLimiter.acquire();try {String response = request(seller);if (StringUtils.isBlank(response)) {log.info("商家 {} 处理成功(队列剩余: {})", seller.getSellerName(), sellerQueue.size());} else {JsonNode jsonResponse = objectMapper.readTree(response);if (jsonResponse.has("code") && jsonResponse.get("code").asInt() == 801) {int waitTime = extractWaitTime(jsonResponse.get("message").asText());log.warn("限流,商家 {} 暂停 {} 秒后重试", seller.getSellerName(), waitTime);//限流时,sellerQueue.offer(seller) 尝试入队。如果队列满,延迟 waitTime 秒后重试。handleRetry(seller, waitTime);} else {log.warn("其他错误,商家: {}, 接口返回: {},视为成功", seller.getSellerName(), response);}}} catch (Exception e) {log.error("API 调用异常,商家: {},视为成功", seller.getSellerName(), e);}}//重试处理private void handleRetry(Seller seller, int waitTime) {if (seller.getRetryCount() < MAX_RETRY_COUNT) {seller.incrementRetry();boolean requeued = sellerQueue.offer(seller);//入队if (requeued) {log.info("商家 {} 重试次数: {},已重新入队,等待下次批处理", seller.getSellerName(), seller.getRetryCount());} else {log.warn("队列已满,商家 {} 延迟 {} 秒后重试", seller.getSellerName(), waitTime);taskScheduler.schedule(() -> handleRetry(seller, waitTime), Instant.now().plusSeconds(waitTime));}} else {log.error("商家 {} 达到最大重试次数 {},丢弃", seller.getSellerName(), MAX_RETRY_COUNT);}}//获取限速接口中等待时间private static int extractWaitTime(String message) {Pattern pattern = Pattern.compile("(\\d+)\\s*秒");Matcher matcher = pattern.matcher(message);return matcher.find() ? Integer.parseInt(matcher.group(1)) : 30;}//请求接口public String request(Seller seller) {Map<String, Object> params = Maps.newHashMap();String[] range = DateUtil.getDateRange(14);params.put("limit", 200);params.put("page", 1);params.put("start_time", range[0]);params.put("end_time", range[1]);params.put("shop_nick", seller.getSellerName());try {String result = erpService.afterSalesData(params);return  result != null ? result  : "";} catch (Exception e) {log.error("请求 API 失败,商家: {}", seller.getSellerName(), e);return "{}";}}}

2、串行限速  

以上代码仍然有限速问题,调用接口限速频率太高,改造并优化。


@Slf4j
@RestController
public class ErpApiCallTask {@Resourceprivate ErpService erpService;@Resourceprivate ThreadPoolTaskExecutor taskExecutor;@Resourceprivate ThreadPoolTaskScheduler taskScheduler;private static final BlockingQueue<Seller> sellerQueue = new LinkedBlockingQueue<>(1000);private static final RateLimiter rateLimiter = RateLimiter.create(1.0 / 5.0); // 每 5 秒 1 次private static final ObjectMapper objectMapper = new ObjectMapper();private static final int MAX_RETRY_COUNT = 5;private static final long WAIT_INTERVAL = 30000; // 等待 30 秒检查新入队元素@Scheduled(cron = "0 0 2 * * ?")@RequestMapping(value = "/jobAfterSalesSync")public ResponseEntity<String> jobAfterSalesSync() {log.info("开始从ERP系统同步商家售后数据...");Map<String, String> queryMap = Maps.newHashMap();queryMap.put("status", "2");List<商家seller> sellerList = erpService.getSellerList(queryMap);log.info("共 {} 个商家待处理", sellerList.size());for (Seller seller : sellerList) {if (!sellerQueue.offer(seller)) {log.warn("队列已满,商家 {} 未加入队列,当前队列大小: {}", seller.getSellerName(), sellerQueue.size());} else {log.debug("商家 {} 已加入队列,当前队列大小: {}", seller.getSellerName(), sellerQueue.size());}}log.info("队列初始化完成,当前队列大小: {}", sellerQueue.size());// 异步启动处理taskExecutor.submit(this::processQueue);log.info("任务已提交,线程池活跃线程数: {}", taskExecutor.getActiveCount());return ResponseEntity.ok("任务已触发");}/*** 串行处理队列,使用 RateLimiter 控制每 5 秒 1 次调用,队列为空时等待新入队元素* 移除批处理和并发:原代码按批次处理(每次 10 个),并通过 CompletableFuture 并发执行。现在改为 processQueue,串行处理队列中的每个 Seller。* 移除 CompletableFuture:不需要并发,直接在单线程中顺序调用 callErpApi。* 串行执行:使用 orderQueue.poll() 逐个取出 Seller,每次处理一个后等待 5 秒。* 保留 taskExecutor.submit:异步启动处理,避免阻塞主线程(定时任务或 HTTP 请求)。处理逻辑在后台线程中串行执行。* 使用 RateLimiter 控制速率,在每次调用 callErpApi 前获取令牌,确保5秒最多 1 次调用。* 相比 Thread.sleep(1000),RateLimiter 更灵活,能动态调整速率并处理突发请求。* 限流处理,限流(801)时,延迟 waitTime 秒后重新入队。队列满时递归重试,确保任务不丢失。* 新增等待机制:当 orderQueue.isEmpty() 时,不直接退出,而是等待 WAIT_INTERVAL(秒),然后再次检查队列。如果等待后队列仍为空,设置 hasMoreTasks = false,结束循环;否则继续处理。* 新增标志变量 hasMoreTasks:用布尔变量控制外层循环,避免无限等待。*/private void processQueue() {boolean hasMoreTasks = true;while (hasMoreTasks) {if (!sellerQueue.isEmpty()) {log.info("开始处理队列,当前队列大小: {}", sellerQueue.size());Seller seller = sellerQueue.poll(); // 取出队列头部元素,if (seller != null) {log.info("从队列中取出商家: {},剩余队列大小: {}", seller.getSellerName(), sellerQueue.size());rateLimiter.acquire(); // 获取令牌,控制速率callErpApi(seller);}} else {log.info("队列当前为空,等待 {} 毫秒检查新入队元素", WAIT_INTERVAL);try {Thread.sleep(WAIT_INTERVAL); // 等待一段时间,检查是否有新元素} catch (InterruptedException e) {log.error("等待被中断", e);Thread.currentThread().interrupt();}if (sellerQueue.isEmpty()) {log.info("等待后队列仍为空,任务结束");hasMoreTasks = false; // 队列仍为空,结束循环} else {log.info("检测到新入队元素,继续处理,当前队列大小: {}", sellerQueue.size());}}}log.info("队列处理完成,剩余大小: {}", sellerQueue.size());}private void callErpApi(Seller seller) {try {String response = request(seller);if (StringUtils.isBlank(response)) {log.info("商家 {} 处理成功,当前队列大小: {}", seller.getSellerName(), sellerQueue.size());} else {JsonNode jsonResponse = objectMapper.readTree(response);if (jsonResponse.has("code") && jsonResponse.get("code").asInt() == 801) {int waitTime = extractWaitTime(jsonResponse.get("message").asText());log.warn("限流,商家 {} 暂停 {} 秒后重试,当前队列大小: {}",seller.getSellerName(), waitTime, sellerQueue.size());handleRetry(seller, waitTime);} else {log.warn("其他错误,商家: {},接口返回: {},视为成功,当前队列大小: {}",seller.getSellerName(), response, sellerQueue.size());}}} catch (Exception e) {log.error("API 调用异常,商家: {},视为成功,当前队列大小: {}",seller.getSellerName(), sellerQueue.size(), e);}}private void handleRetry(Seller seller, int waitTime) {if (seller.getRetryCount() < MAX_RETRY_COUNT) {seller.incrementRetry();// 延迟 waitTime 秒后重新入队taskScheduler.schedule(() -> {boolean requeued = sellerQueue.offer(seller);if (requeued) {log.info("商家 {} 重试次数: {},已重新入队,当前队列大小: {}",seller.getSellerName(), seller.getRetryCount(), sellerQueue.size());} else {log.warn("队列已满,商家 {} 延迟 {} 秒后重试,当前队列大小: {}",seller.getSellerName(), waitTime, sellerQueue.size());handleRetry(seller, waitTime); // 递归重试}}, Instant.now().plusSeconds(waitTime));} else {log.error("商家 {} 达到最大重试次数 {},丢弃,当前队列大小: {}",seller.getSellerName(), MAX_RETRY_COUNT, sellerQueue.size());}}private static int extractWaitTime(String message) {Pattern pattern = Pattern.compile("(\\d+)\\s*秒");Matcher matcher = pattern.matcher(message);return matcher.find() ? Integer.parseInt(matcher.group(1)) : 30;}public String request(Seller seller) {Map<String, Object> params = Maps.newHashMap();String[] range = DateUtil.getDateRange(14);params.put("limit", 200);params.put("page", 1);params.put("start_time", range[0]);params.put("end_time", range[1]);params.put("shop_nick", seller.getSellerName());try {String result = erpService.afterSalesSyncHandler(params);return result  != null ? result  : "";} catch (Exception e) {log.error("请求 API 失败,商家: {}", seller.getSellerName(), e);return "{}";}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/899622.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

STM32 CAN控制器硬件资源与用法

1、硬件结构图 以STM32F4为例&#xff0c;他有2个can控制器&#xff0c;分别为 CAN1 CAN2。 每个CAN控制器&#xff0c;都有3个发送邮箱、2个接收fifo&#xff0c;每个接收fifo又由3个接收邮箱组成。也即每个CAN控制器都有9个邮箱&#xff0c;其中3个供发送用&#xff0c;3个…

【C++ 继承】—— 青花分水、和而不同,继承中的“明明德”与“止于至善”

欢迎来到ZyyOvO的博客✨&#xff0c;一个关于探索技术的角落&#xff0c;记录学习的点滴&#x1f4d6;&#xff0c;分享实用的技巧&#x1f6e0;️&#xff0c;偶尔还有一些奇思妙想&#x1f4a1; 本文由ZyyOvO原创✍️&#xff0c;感谢支持❤️&#xff01;请尊重原创&#x1…

Qt warning LNK4042: 对象被多次指定;已忽略多余的指定

一、常规原因&#xff1a; pro或pri 文件中源文件被多次包含 解决&#xff1a;删除变量 SOURCES 和 HEADERS 中重复条目 二、误用 对于某些pri库可以使用如下代码简写包含 INCLUDEPATH $$PWDHEADERS $$PWD/*.hSOURCES $$PWD/*.cpp但是假如该目录下只有头文件&#xff0c;没…

Visual Studio Code 无法打开源文件解决方法

&#x1f308; 个人主页&#xff1a;Zfox_ &#x1f525; 系列专栏&#xff1a;Linux &#x1f525; 系列专栏&#xff1a;C从入门到精通 目录 一&#xff1a;&#x1f525; 突发状况 二&#xff1a;&#x1f525; 共勉 一&#xff1a;&#x1f525; 突发状况 &#x1f42c;…

js文字两端对齐

目录 一、问题 二、原因及解决方法 三、总结 一、问题 1.text-align: justify; 不就可以了吗&#xff1f;但是实际测试无效 二、原因及解决方法 1.原因&#xff1a;text-align只对非最后一行文字有效。只有一行文字时&#xff0c;text-align无效&#xff0c;要用text-alig…

LeetCode算法题(Go语言实现)_20

题目 给你两个下标从 0 开始的整数数组 nums1 和 nums2 &#xff0c;请你返回一个长度为 2 的列表 answer &#xff0c;其中&#xff1a; answer[0] 是 nums1 中所有 不 存在于 nums2 中的 不同 整数组成的列表。 answer[1] 是 nums2 中所有 不 存在于 nums1 中的 不同 整数组成…

每天认识一个设计模式-桥接模式:在抽象与实现的平行宇宙架起彩虹桥

一、前言&#xff1a;虚拟机桥接的启示 使用过VMware或者Docker的同学们应该都接触过网络桥接&#xff0c;在虚拟机网络配置里&#xff0c;桥接模式是常用的网络连接方式。选择桥接模式时&#xff0c;虚拟机会通过虚拟交换机与物理网卡相连&#xff0c;获取同网段 IP 地址&…

java笔记02

运算符 1.隐式转换和强制转换 类型转换的分类 1.隐式转换&#xff1a; 取值范围小的数值 转换为 取值范围大的数值 2.强制转换&#xff1a; 取值范围大的数值 转换为 取值范围小的数值隐式转换的两种提升规则 取值范围小的&#xff0c;和取值范围大的进行运算&#xff0c;小的…

Redis-07.Redis常用命令-集合操作命令

一.集合操作命令 SADD key member1 [member2]&#xff1a; sadd set1 a b c d sadd set1 a 0表示没有添加成功&#xff0c;因为集合中已经有了这个元素了&#xff0c;因此无法重复添加。 SMEMBERS key: smembers set1 SCARD key&#xff1a; scard set1 SADD key member1 …

李飞飞、吴佳俊团队新作:FlowMo如何以零卷积、零对抗损失实现ImageNet重构新巅峰

目录 一、摘要 二、引言 三、相关工作 四、方法 基于扩散先前的离散标记化器利用广告 架构 阶段 1A&#xff1a;模式匹配预训练 阶段 1B&#xff1a;模式搜索后训练 采样 第二阶段&#xff1a;潜在生成建模 五、Coovally AI模型训练与应用平台 六、实验 主要结果 …

CSS3:现代Web设计的魔法卷轴

一、布局革命&#xff1a;从平面到多维空间 1.1 Grid布局的次元突破 星际战舰布局系统 .galaxy {display: grid;grid-template-areas: "nav nav nav""sidebar content ads""footer footer footer";grid-template-rows: 80px 1fr 120p…

美观快速的react 的admin框架

系统特色&#xff1a; - &#x1f3a8; 精心设计的UI主题系统&#xff0c;提供优雅的配色方案和视觉体验 - &#x1f4e6; 丰富完整的组件库&#xff0c;包含大量开箱即用的高质量组件 - &#x1f528; 详尽的组件使用示例&#xff0c;降低开发者的学习成本 - &#x1f680…

【C++】 string底层封装的模拟实现

目录 前情提要Member functions —— 成员函数构造函数拷贝构造函数赋值运算符重载析构函数 Element access —— 元素访问Iterator —— 迭代器Capacity —— 容量sizecapacityclearemptyreserveresize Modifiers —— 修改器push_backappendoperator(char ch)operator(const …

计算机网络相关知识小结

计算机网络 1.计算机网络&#xff1a;独立计算机&#xff0c;通信线路连接&#xff0c;实现资源共享 2.组成&#xff1a;资源子网和通信子网 3.拓扑分类 4.范围&#xff1a;LAN, MAN. WAN 5、有线和无线 6.按照方向&#xff1a;单工、双工&#xff0c;全双工 7.传输对象方式&a…

16-CSS3新增选择器

知识目标 掌握属性选择器的使用掌握关系选择器的使用掌握结构化伪类选择器的使用掌握伪元素选择器的使用 如何减少文档内class属性和id属性的定义&#xff0c;使文档变得更加简洁&#xff1f; 可以通过属性选择器、关系选择器、结构化伪类选择器、伪元素选择器。 1. 属性选择…

【弹性计算】异构计算云服务和 AI 加速器(四):FPGA 虚拟化技术

《异构计算云服务和 AI 加速器》系列&#xff0c;共包含以下文章&#xff1a; 异构计算云服务和 AI 加速器&#xff08;一&#xff09;&#xff1a;功能特点异构计算云服务和 AI 加速器&#xff08;二&#xff09;&#xff1a;适用场景异构计算云服务和 AI 加速器&#xff08;…

Java进阶——位运算

位运算直接操作二进制位&#xff0c;在处理底层数据、加密算法、图像处理等领域具有高效性能和效率。本文将深入探讨Java中的位运算。 本文目录 一、位运算简介1. 与运算2. 或运算异或运算取反运算左移运算右移运算无符号右移运算 二、位运算的实际应用1. 权限管理2. 交换两个变…

OpenAI深夜直播「偷袭」谷歌!GPT-4o原生图像生成:奥特曼带梗图,AGI战场再燃战火

引言&#xff1a;AI战场的「闪电战」 当谷歌刚刚发布「地表最强」Gemini 2.5 Pro时&#xff0c;OpenAI立即以一场深夜直播「闪电反击」——GPT-4o的原生图像生成功能正式上线&#xff01;从自拍变梗图到相对论漫画&#xff0c;奥特曼&#xff08;OpenAI团队&#xff09;用一连…

鸿蒙harmonyOS:笔记 正则表达式

从给出的文本中&#xff0c;按照既定的相关规则&#xff0c;匹配出符合的数据&#xff0c;其中的规则就是正则表达式&#xff0c;使用正则表达式&#xff0c;可以使得我们用简洁的代码就能实现一定复杂的逻辑&#xff0c;比如判断一个邮箱账号是否符合正常的邮箱账号&#xff0…

[首发]烽火HG680-KD-海思MV320芯片-2+8G-安卓9.0-强刷卡刷固件包

烽火HG680-KD-海思MV320芯片-28G-安卓9.0-强刷卡刷固件包 U盘强刷刷机步骤&#xff1a; 1、强刷刷机&#xff0c;用一个usb2.0的8G以下U盘&#xff0c;fat32&#xff0c;2048块单分区格式化&#xff08;强刷对&#xff35;盘非常非常挑剔&#xff0c;usb2.0的4G U盘兼容的多&a…