通过Redisson构建延时队列并实现注解式消费

目录

  • 一、序言
  • 二、延迟队列实现
    • 1、Redisson延时消息监听注解和消息体
    • 2、Redisson延时消息发布器
    • 3、Redisson延时消息监听处理器
  • 三、测试用例
  • 四、结语

一、序言

两个月前接了一个4万的私活,做一个线上商城小程序,在交易过程中不可避免的一个问题就是用户下单后的订单自动取消。

目前成熟的方案有通过RabbitMQ+死信队列RabbitMQ+延迟消息插件RocketMQ定时消息推送Redisson延时队列来实现。

考虑到商城的定位和用户体量,以及系统维护成本,其实完全没有必要引入消息中间件,借助Redis其实就可以轻松实现这个需求。

加上Redisson客户端本身就已经实现了很多分布式集合工具类,借助阻塞队列和延时队列就可轻松搞定。

当然,为了使用方便以及团队协作,顺便模仿@RabbitListener封装了一套基于注解的消息消费,废话不多说,直接上代码。


二、延迟队列实现

1、Redisson延时消息监听注解和消息体

延迟消息监听器定义:

/*** Redisson延时队列监听器** @author Nick Liu* @date 2024/11/13*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RedissonDelayedQueueListener {/*** 队列名称* @return*/String queueName();
}

消息体定义:

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RedisDelayedMsgDTO {/*** 消息内容*/private String msg;/*** 队列名称*/private String queueName;/*** 延时时间*/private long delayTime;private TimeUnit timeUnit;
}

2、Redisson延时消息发布器

@Slf4j
@Component
@RequiredArgsConstructor
public class RedissonDelayedMsgPublisher {private final RedissonClient redissonClient;/*** 发布延时信息* @param delayedMsgDTO*/public void publishDelayedMsg(RedisDelayedMsgDTO delayedMsgDTO) {log.info("开始发布延迟消息: {}", FastJsonUtils.toJsonString(delayedMsgDTO));RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(delayedMsgDTO.getQueueName());RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);delayedQueue.offer(delayedMsgDTO.getMsg(), delayedMsgDTO.getDelayTime(), delayedMsgDTO.getTimeUnit());}
}

这里我们借助RBlockingQueueRDelayedQueue来实现,只有当延迟消息快到期时,消费者才能从阻塞队列拉取到消息,否则消费者将一直阻塞。

3、Redisson延时消息监听处理器

这里我们定义了一个BeanPostProcessor 的实现,目的就是为了扫描Spring容器中所有带RedissonDelayedQueueListener注解的Bean实例和方法。

/*** Redisson延迟队列Bean后处理器* @author Nick Liu* @date 2025/1/3*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RedissonDelayedQueuePostProcessor implements BeanPostProcessor {private final RedissonClient redissonClient;@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {// 获取最终的目标运行时对象Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);Method[] methods = clazz.getDeclaredMethods();for (Method m : methods) {if (!m.isAnnotationPresent(RedissonDelayedQueueListener.class)) {continue;}// 如果Bean上的方法有Redisson队列监听注解,则启动一个线程监听队列RedissonDelayedQueueListener annotation = m.getAnnotation(RedissonDelayedQueueListener.class);CompletableFuture.runAsync(() -> {log.info("开始监听Redisson延时队列[{}]消息", annotation.queueName());while (true) {RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(annotation.queueName());redissonClient.getDelayedQueue(blockingQueue);try {String msg = blockingQueue.take();MDC.put(CommonConst.X_REQUEST_ID, SerialNoUtils.generateSimpleUUID());log.info("监听到队列[{}]延时消息: {}", annotation.queueName(), msg);m.invoke(bean, msg);MDC.remove(CommonConst.X_REQUEST_ID);} catch (Exception e) {log.error(e.getMessage(), e);}}});}return bean;}}

这里我们扫描到指定Bean的方法后,会开启一个异步线程,并轮询拉取延时消息,如果消息没过期,异步线程将会一直阻塞等待。


三、测试用例

/*** @author Nick Liu* @date 2025/2/2*/
@Slf4j
@RestController
@RequiredArgsConstructor
public class RedissonDelayedMsgController {private static final String DELAYED_QUEUE = "redisson:delayed:queue";private final RedissonDelayedMsgPublisher redissonDelayedMsgPublisher;@GetMapping("/delayed/msg")public ResponseEntity<RedisDelayedMsgDTO> publishDelayedMsg() {RedisDelayedMsgDTO redisDelayedMsgDTO = new RedisDelayedMsgDTO();redisDelayedMsgDTO.setQueueName(DELAYED_QUEUE);redisDelayedMsgDTO.setMsg("This is a delayed msg");redisDelayedMsgDTO.setDelayTime(10);redisDelayedMsgDTO.setTimeUnit(TimeUnit.SECONDS);redissonDelayedMsgPublisher.publishDelayedMsg(redisDelayedMsgDTO);return ResponseEntity.ok(redisDelayedMsgDTO);}@RedissonDelayedQueueListener(queueName = DELAYED_QUEUE)public void handleDelayedMsg(String msg) {log.info("Received delayed msg: {}", msg);}
}

启动服务后,Bean后处理器会启动异步线程监听延时消息,如下:

2025-02-02 16:46:04.271 INFO  [] [ForkJoinPool.commonPool-worker-2] [c.xlyj.common.message.RedissonDelayedQueuePostProcessor.lambda$postProcessAfterInitialization$0():44] - 开始监听Redisson延时队列[redisson:delayed:queue]消息

浏览器直接输入http://localhost:8000/delayed/msg发布延时消息,10s后消费者进行处理,如下:

2025-02-02 16:43:11.107 INFO  [e810d175b0e24e71a4b9e517366b4aa6] [ForkJoinPool.commonPool-worker-2] [c.xlyj.common.message.RedissonDelayedQueuePostProcessor.lambda$postProcessAfterInitialization$0():51] - 监听到队列[redisson:delayed:queue]延时消息: This is a delayed msg
2025-02-02 16:43:11.108 INFO  [e810d175b0e24e71a4b9e517366b4aa6] [ForkJoinPool.commonPool-worker-2] [com.xlyj.contoller.RedissonDelayedMsgController.handleDelayedMsg():40] - Received delayed msg: This is a delayed msg

四、结语

虽说通过Redisson实现的延迟队列也能实现支付订单的自动取消,但是可用性相比专业的消息中间件还是尚有不足的。

比如消息生产者发送消息没有确认机制,消息消费也没有确认机制,这两个环节都有可能导致消息丢失。

当然我们可以通过其它保障机制去补偿,比如再加上定时任务扫表,把扫描时间可以设置长一点,保证最终的一致性。

在大型项目中还是优先推荐专业的消息中间件去实现延时消息消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/67792.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MVC 文件夹:架构之美与实际应用

MVC 文件夹:架构之美与实际应用 引言 MVC(Model-View-Controller)是一种设计模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种架构模式不仅提高了代码的可维护性和可扩展性,而且使得开发流程更加清晰。本文将深入探讨MVC文…

【PyQt】lambda函数,实现动态传递参数

为什么需要 lambda&#xff1f; 在 PyQt5 中&#xff0c;clicked 信号默认会传递一个布尔值&#xff08;表示按钮是否被选中&#xff09;。如果我们希望将按钮的文本内容传递给槽函数&#xff0c;需要通过 lambda 函数显式传递参数。 这样可以实现将按钮内容传递给槽函数&…

pytorch深度Q网络

人工智能例子汇总&#xff1a;AI常见的算法和例子-CSDN博客 DQN 引入了深度神经网络来近似Q函数&#xff0c;解决了传统Q-learning在处理高维状态空间时的瓶颈&#xff0c;尤其是在像 Atari 游戏这样的复杂环境中。DQN的核心思想是使用神经网络 Q(s,a;θ)Q(s, a; \theta)Q(s,…

Baklib构建高效协同的基于云的内容中台解决方案

内容概要 随着云计算技术的飞速发展&#xff0c;内容管理的方式也在不断演变。企业面临着如何在数字化转型过程中高效管理和协同处理内容的新挑战。为应对这些挑战&#xff0c;引入基于云的内容中台解决方案显得尤为重要。 Baklib作为创新型解决方案提供商&#xff0c;致力于…

DeepSeek-R1 论文. Reinforcement Learning 通过强化学习激励大型语言模型的推理能力

论文链接&#xff1a; [2501.12948] DeepSeek-R1: Incentivizing Reasoning Capability in LLMs via Reinforcement Learning 实在太长&#xff0c;自行扔到 Model 里&#xff0c;去翻译去提问吧。 工作原理&#xff1a; 主要技术&#xff0c;就是训练出一些专有用途小模型&…

C++泛型编程指南03-CTAD

文章目录 C17 自定义类型推断指引&#xff08;CTAD&#xff09;深度解析一、基础概念1. 核心作用2. 工作原理 二、标准库中的 CTAD 应用1. 容器类型推导2. 智能指针推导3. 元组类型推导 三、自定义推导指引语法1. 基本语法结构2. 典型应用场景 四、推导指引设计模式1. 迭代器范…

deepseek+vscode自动化测试脚本生成

近几日Deepseek大火,我这里也尝试了一下,确实很强。而目前vscode的AI toolkit插件也已经集成了deepseek R1,这里就介绍下在vscode中利用deepseek帮助我们完成自动化测试脚本的实践分享 安装AI ToolKit并启用Deepseek 微软官方提供了一个针对AI辅助的插件,也就是 AI Toolk…

电介质超表面中指定涡旋的非线性生成

涡旋光束在众多领域具有重要应用&#xff0c;但传统光学器件产生涡旋光束的方式限制了其在集成系统中的应用。超表面的出现为涡旋光束的产生带来了新的可能性&#xff0c;尤其是在非线性领域&#xff0c;尽管近些年来已经有一些研究&#xff0c;但仍存在诸多问题&#xff0c;如…

基于Springboot+mybatis+mysql+html图书管理系统2

基于Springbootmybatismysqlhtml图书管理系统2 一、系统介绍二、功能展示1.用户登陆2.用户主页3.图书查询4.还书5.个人信息修改6.图书管理&#xff08;管理员&#xff09;7.学生管理&#xff08;管理员&#xff09;8.废除记录&#xff08;管理员&#xff09; 三、数据库四、其它…

重构字符串(767)

767. 重构字符串 - 力扣&#xff08;LeetCode&#xff09; 解法&#xff1a; class Solution { public:string reorganizeString(string s){string res;//因为1 < s.length < 500 &#xff0c; uint64_t 类型足够uint16_t n s.size();if (n 0) {return res;}unordere…

本地部署DeepSeek方法

本地部署完成后的效果如下图&#xff0c;整体与chatgpt类似&#xff0c;只是模型在本地推理。 我们在本地部署主要使用两个工具&#xff1a; ollamaopen-webui ollama是在本地管理和运行大模型的工具&#xff0c;可以直接在terminal里和大模型对话。open-webui是提供一个类…

游戏引擎 Unity - Unity 启动(下载 Unity Editor、生成 Unity Personal Edition 许可证)

Unity Unity 首次发布于 2005 年&#xff0c;属于 Unity Technologies Unity 使用的开发技术有&#xff1a;C# Unity 的适用平台&#xff1a;PC、主机、移动设备、VR / AR、Web 等 Unity 的适用领域&#xff1a;开发中等画质中小型项目 Unity 适合初学者或需要快速上手的开…

【开源免费】基于Vue和SpringBoot的公寓报修管理系统(附论文)

本文项目编号 T 186 &#xff0c;文末自助获取源码 \color{red}{T186&#xff0c;文末自助获取源码} T186&#xff0c;文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…

Haskell语言的多线程编程

Haskell语言的多线程编程 Haskell是一种基于函数式编程范式的编程语言&#xff0c;以其强大的类型系统和懒惰求值著称。近年来&#xff0c;随着多核处理器的发展&#xff0c;多线程编程变得日益重要。虽然Haskell最初并不是为了多线程而设计&#xff0c;但它的设计理念和工具集…

《苍穹外卖》项目学习记录-Day11订单统计

根据起始时间和结束时间&#xff0c;先把begin放入集合中用while循环当begin不等于end的时候&#xff0c;让begin加一天&#xff0c;这样就把这个区间内的时间放到List集合。 查询每天的订单总数也就是查询的时间段是大于当天的开始时间&#xff08;0点0分0秒&#xff09;小于…

【python】python油田数据分析与可视化(源码+数据集)【独一无二】

&#x1f449;博__主&#x1f448;&#xff1a;米码收割机 &#x1f449;技__能&#x1f448;&#xff1a;C/Python语言 &#x1f449;专__注&#x1f448;&#xff1a;专注主流机器人、人工智能等相关领域的开发、测试技术。 【python】python油田数据分析与可视化&#xff08…

FBX SDK的使用:基础知识

Windows环境配置 FBX SDK安装后&#xff0c;目录下有三个文件夹&#xff1a; include 头文件lib 编译的二进制库&#xff0c;根据你项目的配置去包含相应的库samples 官方使用案列 动态链接 libfbxsdk.dll, libfbxsdk.lib是动态库&#xff0c;需要在配置属性->C/C->预…

【单层神经网络】基于MXNet库简化实现线性回归

写在前面 同最开始的两篇文章 完整程序及注释 导入使用的库# 基本 from mxnet import autograd, nd, gluon # 模型、网络 from mxnet.gluon import nn from mxnet import init # 学习 from mxnet.gluon import loss as gloss # 数据集 from mxnet.gluon…

【爬虫】JS逆向解决某药的商品价格加密

⭐️⭐️⭐️⭐️⭐️欢迎来到我的博客⭐️⭐️⭐️⭐️⭐️ 🐴作者:秋无之地 🐴简介:CSDN爬虫、后端、大数据领域创作者。目前从事python爬虫、后端和大数据等相关工作,主要擅长领域有:爬虫、后端、大数据开发、数据分析等。 🐴欢迎小伙伴们点赞👍🏻、收藏⭐️、…

OpenAI开源战略反思:中国力量推动AI产业变革

在周五的Reddit问答会上&#xff0c;OpenAI首席执行官Sam Altman罕见承认公司正面临来自中国科技企业的强劲挑战。这位向来强硬的硅谷领军者坦言&#xff0c;以深度求索&#xff08;DeepSeek&#xff09;为代表的中国AI公司正在改写行业游戏规则。 这场历时三小时的对话揭示了…