RocketMQ 延迟消息

RocketMQ 延迟消息

RocketMQ 消费者启动流程

什么是延迟消息

RocketMQ 延迟消息是指,生产者发送消息给消费者消息,消费者需要等待一段时间后才能消费到。

使用场景

用户下单之后,15分钟未支付,对支付账单进行提醒或者关单处理。

RocketMQ 开源版本的消息不支持任意时间精度,只支持5s 10s 1m等等。

Broker 如何处理延迟消息

消息投递如下:

  1. 生产者发送一个延迟消息到一个 topic
  2. Broker 判断是个延迟消息后,将消息暂存
  3. Broker 通过延迟服务, 先检查消息是否过期,如果到期将消息投递到目标 topic
  4. 消费者消费topic中的投递延迟消息。

开源RocketMQ 的消息不支持任意精度,默认支持 18个 level:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

Broker 在启动的时候,会创建一个内部 topic:“SCHEDULE_TOPIC_XXXX” 根据延迟 level 数量,创建对应数量的 队列。 也就是说 18 level 对应了18 个队列。

具体可以在 代码TopicConfigManager.java 中 看到:

private static final int SCHEDULE_TOPIC_QUEUE_NUM = 18;

要注意的是,Broker 一般是集群模式
部署,也就是说,每个Broker 都会有18个队列。

TopicConfigManager#TopicConfigManager(BrokerController brokerController)

生产者消息延迟发送

代码示例如下:

Message msg=new Message();
msg.setTopic("TopicA");
msg.setTags("Tag");
msg.setBody("this is a delay message".getBytes());
//设置延迟level为5,对应延迟1分钟
msg.setDelayTimeLevel(5);
producer.send(msg);

Broker 存储延迟消息

上一篇文章已经谈到,Broker 收到消费者消息后,会进行消息存储,然后再转发到消费队列(ConsumerQueue),然后再推给消费者。

其实一旦消息转发到

存储延迟消息的流程也类似

  1. 确定延迟消息投递到topic 哪个队列。存储生产者写入的消息时,将消息转发到 ConsumeQueue 中,消费者就能消费到。 延迟消息不能立即消息到,于是将 topic 名称修改为 SCHEDULE_TOPIC_XXX,并根据延迟消息级别,确定投递到哪个队列上。同时还会将原来消息要发送到的目标 topic 和队列记录投递到哪个队列。

代码在CommitLog#asyncPutMessage 中

设置延迟消息的投递队列信息代码如下:

 // Delay Deliveryif (msg.getDelayTimeLevel() > 0) {// 如果设置的级别超过了最大级别,重置延迟级别if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}
// 计算延迟消息应该投递到 SCHEDULE_TOPIC_XXXX 到哪个队列。topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueId// 记录原始 topic ,queueid,方便后期投递到目标 topicMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 更新消息投递目标为 SCHEDULE_TOPIC_XXX,queueIdmsg.setTopic(topic);msg.setQueueId(queueId);}

消息转发

消息转发过程其实中会对延迟消息做一些特殊处理

CommitLog中的消息转发到CosumeQueue中是异步进行的。在转发过程中,会对延迟消息进行特殊处理,主要是计算这条延迟消息需要在什么时候进行投递。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/35461.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PostgreSQL查询慢sql原因和优化方案

PostgreSQL sql查询慢优化方案有一下几种解决方案: 1.关闭会话 查询慢sql的执行会话,关闭进程。 查看数据库后台连接进程 SELECT count(*) FROM pg_stat_activity;SELECT * FROM pg_stat_activity; 查看数据库后台连接进程,但是此条SQL不…

python提取pdf图片

import fitz import re import osdef save_pdf_img(path, save_path):path: pdf的路径save_path : 图片存储的路径# 使用正则表达式来查找图片checkXO r"/Type(? */XObject)"checkIM r"/Subtype(? */Image)"# 打开pdfdoc fitz.open(path)# 图片计数im…

用HARU-Net增强核分割:一种基于混合注意的残差u块网络

文章目录 Enhancing Nucleus Segmentation with HARU-Net: A Hybrid Attention Based Residual U-Blocks Network摘要本文方法损失函数后处理消融实验 Enhancing Nucleus Segmentation with HARU-Net: A Hybrid Attention Based Residual U-Blocks Network 摘要 核图像分割是…

W6100-EVB-PICO 做TCP Server进行回环测试(六)

前言 上一章我们用W6100-EVB-PICO开发板做TCP 客户端连接服务器进行数据回环测试,那么本章将用开发板做TCP服务器来进行数据回环测试。 TCP是什么?什么是TCP Server?能干什么? TCP (Transmission Control Protocol) 是一种面向连…

zabbix监控安装部署

目录 一、环境 二、配置 1.配置yum源,这里用的清华的 2.过滤一下安装包,查看依赖包 安装依赖包 3.配置数据库 开机自启 创建数据库 创建用户 授权 导入数据到数据库 查看zabbix数据库有没有表和数据 4.修改zabbix配置文件 1.修改zabbix配置…

去趋势化一个心电图信号、信号功率谱、低通IIR滤波器并平滑信号、对滤波器引起的延迟进行补偿研究(Matlab代码实现)

💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…

SPM实现framework自动管理和分发

一、前言 Swift Package Manager (SPM) 是苹果官方提供的用于管理 Swift 项目的依赖关系和构建过程的工具。它是一个集成在 Swift 编程语言中的包管理器,用于解决在开发过程中管理和构建包依赖项的需求。 那么如何使用SPM管理和分发Objective C编写的二进制库呢&a…

HOT86-单词拆分

leetcode原题链接:单词拆分 题目描述 给你一个字符串 s 和一个字符串列表 wordDict 作为字典。请你判断是否可以利用字典中出现的单词拼接出 s 。注意:不要求字典中出现的单词全部都使用,并且字典中的单词可以重复使用。 示例 1&#xff1a…

不同路径 II——力扣63

class Solution {public:int uniquePathsWithObstacles(vector<vector<int>>& obstacleGrid) {int n=

一键登录是如何在登录方式中脱颖而出的?

首先&#xff0c;我们先了解一下登录方式的演变过程&#xff0c;大致可以分为三个阶段。分别是账号密码登录、短信验证码登录和一键登录。 阶段一&#xff1a;账号密码登录 账号密码登录是一种常见的用户身份验证方式&#xff0c;用户需要输入一个唯一的账号和对应的密码来登…

【APITable】教程:创建并运行一个自建小程序

1.进入APITable&#xff0c;在想要创建小程序的看板页面点击右上角的【小程序】&#xff0c;进入小程序编辑页面。 2.创建一个新的小程序区。 点击【 添加小程序】 点击创建小程序&#xff0c;选择模板&#xff0c;输入名字。 3.确定后进入小程序部署引导页面。 4.打开Xshell 7…

初识鸿蒙跨平台开发框架ArkUI-X

HarmonyOS是一款面向万物互联时代的、全新的分布式操作系统。在传统的单设备系统能力基础上&#xff0c;HarmonyOS提出了基于同一套系统能力、适配多种终端形态的分布式理念&#xff0c;能够支持手机、平板、智能穿戴、智慧屏、车机等多种终端设备&#xff0c;提供全场景&#…

99. for循环练习题-3种方式输出0-9

【目录】 文章目录 99. for循环练习题-3种方式输出0-91. for循环和while循环的区别2. 输出 0~(n-1)的数字2.1 基础代码2.2 自定义函数代码2.3 异常处理语句代码 【正文】 99. for循环练习题-3种方式输出0-9 1. for循环和while循环的区别 for循环和while循环都用于重复执行特定…

Linux一些常见的命令

1. 基础命令 1. ls&#xff1a; 列出目录内容。- 例如&#xff1a;ls -l 以长格式列出文件和目录。2. cd&#xff1a; 切换工作目录。- 例如&#xff1a;cd /home/user 进入 /home/user 目录。3. pwd&#xff1a; 显示当前工作目录的路径。4. mkdir&#xff1a; 创建新目录。-…

flink-对齐和不对齐,精准一次和至少一次

精准一次怎么保证&#xff1f;可以设置为以下2个 对齐 当有一个barrier比较快时&#xff0c;输入缓冲区阻塞&#xff0c;当另外一个barrier到来时&#xff0c;才进行备份&#xff0c;所以数据不会重复。优点&#xff1a;不会造成数据重复缺点&#xff1a;会造成数据积压&#x…

ChatGPT Plus和ChatGPT对比

模型规模更大&#xff0c;参数数量超过6万亿&#xff0c;比ChatGPT大很多训练数据更丰富&#xff0c;包括不同语言、领域和类型的数据语言理解和生成能力更强&#xff0c;能够更准确地理解和生成文本可解释性和可控性更好&#xff0c;支持更多的调参和控制参数&#xff0c;生成…

uni-app和springboot完成前端后端对称加密解密流程

概述 使用对称加密的方式实现。前端基于crypto-js。uni-app框架中是在uni.request的基础上&#xff0c;在拦截器中处理的。springboot在Filter中完成解密工作。 uni-app 项目中引入crypto-js。 npm install crypto-js加密方法 const SECRET_KEY CryptoJS.enc.Utf8.parse(…

最强自动化测试框架Playwright(20)- iframe

一个页面可以附加一个或多个 Frame 对象。每个页面都有一个主框架&#xff0c;并且假定页面级交互&#xff08;如&#xff09;在主框架中运行。click frame_locator 使用 iframe 时&#xff0c;可以创建一个框架定位器&#xff0c;该定位器将进入 iframe 并允许选择该 iframe…

idea模板的使用(配置xml文件模板)

1. 问题的引出 我们在日常项目中可以发现&#xff0c;sql映射文件和mybatis主配置文件&#xff0c;以及application.yml文件中有很多固定不变的内容&#xff0c;为了方面使用&#xff0c;所以可以把这些xml文件设置为模板 2. 创建模板的步骤 按照图片一步一步进行即可 点击…

gcc编译选项之预处理向源码传参和条件编译

一、是什么? 预处理:是指在进行加工前准备工作. gcc 选项 文件名字 二、使用步骤 1.向源码传参 gcc -save-temps -DSENSOR_TYPE=SONY_IMX477_MIPI_8M_30FPS_12BIT hello.c -o hello 代码如下(示例): #include <stdio.h> #include <stdlib.h>typedef enum …