java实现请求缓冲合并

业务背景:

我们对外提供了一个rest接口给第三方业务进行调用,但是由于第三方框架限制,导致会发送大量相似无效请求,例如:接口入参json包含两个字段,createBy和receiverList,完整的入参json示例如下:

{"createBy": "aa","receiverList": ["bb","cc"]
}

实际第三方业务会进行多次调用接口,每次传递的数据可能如下:

{"createBy": "aa","receiverList": ["bb"]
}
或者
{"createBy": "aa","receiverList": ["cc"]
}
或者
{"createBy": "bb","receiverList": ["cc"]
}
或者
{"createBy": "aa","receiverList": ["bb","cc"]
}

所有需要对第三方业务传递过来的数据进行缓冲合并处理,减轻真正的后台服务的压力。

代码实现

package com.demo;import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Component;import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;/*** Description: 请求合并管理类*/
@Slf4j
@Component
public class RequestMerger {// 线程池核心线程数private final int corePoolSize = 200;// 任务执行超时时间,单位:毫秒private final int timeout = 5 * 60 * 1000;// 队列,队列长度为Integer.MAX_VALUEprivate final LinkedBlockingQueue<String> requestQueue = new LinkedBlockingQueue<>();// 定时器,所有任务共用线程池private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(corePoolSize,new CustomizableThreadFactory("schedule-executor-"));// 是否关闭标志private final AtomicBoolean isShutdown = new AtomicBoolean(false);/*** 构造函数,用于初始化请求合并器。** @param batchSize   每次合并的最大请求数量。* @param delayMillis 合并请求的周期间隔,单位为毫秒。*/public RequestMerger(int batchSize, long delayMillis) {// 启动定时器,定期合并请求,延迟delayMillis后开始,之后每隔delayMillis执行一次scheduler.scheduleAtFixedRate(() -> {if (!isShutdown.get()) {List<String> batch = new ArrayList<>(batchSize);int drainedCount = requestQueue.drainTo(batch, batchSize);log.info("==>scheduler,drainedCount:{},nowQueueCount:{}", drainedCount, requestQueue.size());if (!batch.isEmpty()) {// 异步执行任务,防止业务执行时间过长导致业务整体延迟过大scheduler.submit(() -> {sendRequestBatch(batch);});}}}, delayMillis, delayMillis, TimeUnit.MILLISECONDS);}/*** 发送请求批次的方法。** @param batch 请求批次。*/private void sendRequestBatch(List<String> batch) {Future<?> future = scheduler.submit(() -> {try {// 在这里实现你的请求发送逻辑// 可以使用HTTP客户端库(如Apache HttpClient或OkHttp)来发送请求// ...System.out.println("Sending batch of " + batch.size() + " requests");} catch (Exception e) {// 异常处理逻辑System.err.println("Error sending requests: " + e.getMessage());}});// 尝试获取任务结果,如果超过超时时间则抛出TimeoutException异常,进行取消任务try {// 超时时间,单位:毫秒future.get(timeout, TimeUnit.MILLISECONDS);} catch (TimeoutException | ExecutionException e) {// 超时或执行异常时取消任务future.cancel(true);} catch (Exception e) {log.error("==>任务执行异常", e);// 任务执行异常future.cancel(true);}}/*** 在对象销毁前执行的关闭操作。* 该方法从请求队列中拉取所有未处理的请求,并将它们批量发送。* 无参数和返回值。*/@PreDestroypublic void shutdown() {isShutdown.set(true);List<String> batch = new ArrayList<>();// 获取请求队列中的剩余所有请求int drainedCount = requestQueue.drainTo(batch);log.info("==>shutdown,drainedCount:{},nowQueueCount:{}", drainedCount, requestQueue.size());// 批量发送收集到的剩余请求sendRequestBatch(batch);// 关闭定时执行器scheduler.shutdown();try {if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {log.error("Scheduler did not terminate gracefully within 60 seconds, force shutting down.");scheduler.shutdownNow();}} catch (InterruptedException e) {log.warn("Interrupted during scheduler termination, force shutting down.");scheduler.shutdownNow();Thread.currentThread().interrupt();}}/*** 向请求队列中添加一个请求。如果服务未关闭,则直接添加到请求队列中;* 如果服务已关闭,则将该请求作为一批请求发送。** @param request 要添加的请求字符串。*/public void addRequest(String request) throws InterruptedException {// 检查服务是否已关闭if (!isShutdown.get()) {// 未关闭,直接添加到请求队列requestQueue.put(request);} else {// 已关闭,将当前请求作为一批发送List<String> batch = new ArrayList<>();batch.add(request);sendRequestBatch(batch);}}
}

参考资料

https://gitee.com/huangjuncong/mumux-framework/tree/master/merge-request/src/main/java/com/mumux/concurrent

注意:此代码容易导致数据丢失。例如:调用add方法将10个元素放入队列,但是真正获取到9个元素。
造成原因:FlushThread#add()中使用offer方法将数据放入队列,如果此时队列已满,返回值为false,实际数据未进入队列,需要额外对数据进行处理。
修改建议:调大队列长度,并且将offer方法改为put方法,保证数据最终进入队列。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/823763.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Postman之版本信息查看

Postman之版本信息查看 一、为何需要查看版本信息&#xff1f;二、查看Postman的版本信息的步骤 一、为何需要查看版本信息&#xff1f; 不同的版本之间可能存在功能和界面的差异。 二、查看Postman的版本信息的步骤 1、打开 Postman 2、打开设置项 点击页面右上角的 “Set…

Java中的容器

Java中的容器主要包括以下几类&#xff1a; Collection接口及其子接口/实现类&#xff1a; List 接口及其实现类&#xff1a; ArrayList&#xff1a;基于动态数组实现的列表&#xff0c;支持随机访问&#xff0c;插入和删除元素可能导致大量元素移动。LinkedList&#xff1a;基…

只用键盘的技巧

技巧一&#xff1a;将常用软件固定在任务栏使用winnum/winT(shift)打开 技巧二&#xff1a;winX快捷键&#xff08;显示快捷键的快捷键&#xff09; ALT F4    关闭当前应用程序 技巧三&#xff1a;使用好Chrome快键键 ctrl h&#xff1b;历史纪录。 ctrl shift esc&am…

充电桩--OCPP 充电通讯协议介绍

一、OCPP协议介绍 OCPP的全称是 Open Charge Point Protocol 即开放充电点协议&#xff0c; 它是免费开放的协议&#xff0c;该协议由位于荷兰的组织 OCA&#xff08;开放充电联盟&#xff09;进行制定。Open Charge Point Protocol (OCPP) 开放充电点协议用于充电站(CS)和任何…

大模型开发轻松入门——(1)从搭建自己的环境开始

pip install openai import openai import osfrom dotenv import load_dotenv, find_dotenv _ load_dotenv(find_dotenv())openai.api_key os.getenv(OPENAI_API_KEY)

mac IDEA激活 亲测有效

1、官网下载mac版本IDEA并安装 2、打开激活页面 3、下载脚本文件 链接: https://pan.baidu.com/s/1I2BqdfxSJv1A96422rflnA?pwdm494 提取码: m494 4、命令行到该界面&#xff0c;执行 sudo bash idea.sh 可能出现的问题&#xff1a; 查看sh文件&#xff0c;targetFilePath…

vue快速入门(十六)事件修饰符

注释很详细&#xff0c;直接上代码 上一篇 新增内容 事件修饰符之阻止冒泡事件修饰符之阻止默认行为 源码 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdev…

重定向原理和缓冲区

文章目录 重定向缓冲区 正文开始前给大家推荐个网站&#xff0c;前些天发现了一个巨牛的 人工智能学习网站&#xff0c; 通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。 点击跳转到网站。 重定向 内核中为了管理被打开的文件&#xff0c;一定会存在描述一…

ESP8266闪存文件系统(SPIFFS)

开发环境&#xff1a; 1、安装ESP8266的开发环境&#xff0c;如Arduino IDE。 2、下载并安装ESP8266的相关开发库和工具。 我们使用的是Arduino IDE。 基本介绍&#xff1a; 每一个ESP8266都配有一个闪存&#xff0c;这个闪存很像是一个小硬盘&#xff0c;我们上传的文件就被…

MCU最小系统晶振模块设计

单片机的心脏&#xff1a;晶振 晶振模块 单片机有两个心脏&#xff0c;一个是8M的心脏&#xff0c;一个是32.768的心脏 8M的精度较低&#xff0c;所以需要外接一个32.768khz 为什么是8MHZ呢&#xff0c;因为内部自带的 频率越高&#xff0c;精度越高&#xff0c;功耗越大&am…

[Java EE] 多线程(二): 线程的创建与常用方法(下)

2.3 启动一个线程–>start() 之前我们已经看到了如何通过重写run()方法来创建一个线程对象,但是线程对象被创建出来并不意味着线程就开始运行了. 覆写run方法是给线程提供了所要做的事情的指令清单创建线程对象就是把干活的人叫了过来.而调用start方法,就是喊一声"行…

贪心法确定补水地点

贪心算法是一个简单有趣的算法&#xff0c;它总是做出当前看来最好的选择&#xff0c;每次的局部最优选择最终可以产生整体最优解或整体最优解的近似。本文将介绍如何用贪心法解决补水问题。 1. 补水问题 2升水可以走 k k k英里&#xff0c;水站可以把水补满为2升&#xff0c…

【五十四】【算法分析与设计】Manacher算法,Manacher算法作用,Manacher算法流程,Manacher算法证明,Manacher算法代码

Manacher算法作用 1. 给你一个字符串str&#xff0c;要你求这个字符串的最长回文子串的长度&#xff0c;或者求这个字符串的最长回文子串在str中开始位置的下标。 2. 暴力解法&#xff0c;中心扩散算法&#xff0c;时间复杂度O(N*2)。Manacher算法可以用O(N)解决这个问题。…

鸿蒙相关岗位需求突增!你具体知道都有哪些岗位吗?

1 月 18 日&#xff0c;鸿蒙 Next 预览版面向开发者正式开放申请。至此&#xff0c;鸿蒙原生应用版图已成型&#xff0c;这个中国自主研发的操作系统&#xff0c;正式走上了独立之路。 随后迎来了不少互联网公司与华为鸿蒙原生应用达成了合作&#xff0c;像我们常见的阿里、京…

【Android GUI】FramebufferNativeWindow与Surface

文章目录 显示整体体系FramebufferNativeWindowFramebufferNativeWindow构造函数 dequeueBufferSurface总结参考 显示整体体系 native window为OpenGL与本地窗口系统之间搭建了桥梁。 这个窗口系统中&#xff0c;有两类本地窗口&#xff0c;nativewindow1是能直接显示在屏幕的…

上班族副业指南:六种实用赚钱途径

在现今竞争激烈的社会中&#xff0c;许多上班族都选择开辟副业来增加收入与实现自我价值。副业不仅能够增强经济安全感&#xff0c;还能满足个人兴趣爱好&#xff0c;并为未来铺设更坚实的财务基石。本文将为你揭示六种适合上班族的副业选择&#xff0c;帮助你找到最适合自己的…

JookDB下载安装使用

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

CUDA优化入门

本文记录了我的cuda学习经历&#xff0c;和大多数人一样&#xff0c;通过优化矩阵乘法的过程来了解一些基本的概念。仓库链接&#xff1a; GiteeGithub Refences NVIIDA Fermi Architecture WhitepaperCUDA C Programming GuideCUDA C Best Practices Guide 其中Fermi架构是…

LINUX中使用cron定时任务被隐藏,咋回事?

一、问题现象 线上服务器运行过程中&#xff0c;进程有莫名进程被启动&#xff0c;怀疑是有定时任务自动启动&#xff0c;当你用常规方法去查看&#xff0c;比如使用crontab去查看定时器任务&#xff0c;提示no crontab for root 或者使用cat到/var/spool/cron目录下去查看定时…

反射

目录 01、Java反射机制概述1.1、使用反射&#xff0c;实现同上的操作、调用私有属性 02、理解Class类并获取Class实例2.1、Class类的理解2.2、获取Class实例的4种方式2.3、Class实例对应的结构的说明 03、ClassLoader的理解3.1、ClassLoader的理解3.2、使用ClassLoader加载配置…