RocketMQ源码 发送顺序消息源码分析

前言

rocketmq 发送顺序消息和普通消息的主流程区别大部分一致的,区别在于:普通消息发送时,从所有broker的队列集合中 轮询选择一个队列,而顺序队列可以提供用户自定义消息队列选择器,从NameServer 分配的顺序 broker集合中选择一个队列。

源码版本:4.9.3

源码架构图

源码分析

发送普通消息源码在另外一篇文章https://blog.csdn.net/hzwangmr/article/details/135411495,这里主要阅读和普通消息有差异的部分。

顺序消息源码入口

org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message, org.apache.rocketmq.client.producer.MessageQueueSelector, java.lang.Object)

可以看到系统提供了一个 MessageQueueSelector 消息队列选择器,用于自定义选择队列的逻辑。

    /*** Same to {@link #send(Message)} with message queue selector specified.** @param msg Message to send.* @param selector Message queue selector, through which we get target message queue to deliver message to.* @param arg Argument to work along with message queue selector.* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,* {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.* @throws MQClientException if there is any client error.* @throws RemotingException if there is any network-tier error.* @throws MQBrokerException if there is any error with broker.* @throws InterruptedException if the sending thread is interrupted.*/@Overridepublic SendResult send(Message msg, MessageQueueSelector selector, Object arg)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {msg.setTopic(withNamespace(msg.getTopic()));return this.defaultMQProducerImpl.send(msg, selector, arg);}

触发队列选择器源码

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl

这里可以看到在调用发送消息核心 sendKernelImpl() 方法之前,会调用 selector.select() 函数,执行我们自定的选择逻辑。那么自定义的逻辑具体是什么呢?接着往下看

    private SendResult sendSelectImpl(Message msg,MessageQueueSelector selector,Object arg,final CommunicationMode communicationMode,final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);// 获取topic路由数据TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {MessageQueue mq = null;try {// 解析发布消息队列List<MessageQueue> messageQueueList =mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());// 克隆消息Message userMessage = MessageAccessor.cloneMessage(msg);String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());userMessage.setTopic(userTopic);// 利用消息队列选择器,选择一个队列mq = mQClientFactory.getClientConfig().queueWithNamespace(// 自定义选择队列selector.select(messageQueueList, userMessage, arg));} catch (Throwable e) {throw new MQClientException("select message queue threw exception.", e);}long costTime = System.currentTimeMillis() - beginStartTime;if (timeout < costTime) {throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");}if (mq != null) {// 如果选择出来的 MessageQueue 存在,这调用核心发送消息函数,发送消息return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);} else {throw new MQClientException("select message queue return null.", null);}}validateNameServerSetting();throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);}

源码工程自带发送顺序消息实例

可以看到,工程自带的发送顺序消息的example实例,针对 id相同的数据,选择了相同的消息队列,这样对于同一个实体的数据变化一定是有顺序的。

public class Producer {public static void main(String[] args) throws UnsupportedEncodingException {try {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 100; i++) {int orderId = i % 10;Message msg =new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// 自定义选择逻辑,可以理解为将相同的 orderId订单id,投递到相同的队列中Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}producer.shutdown();} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}
}

其他

topic对应的顺序队列怎么来的?

mq生产者客户端,在发送消息前,会从 NameServer中读取消息队里指定 topic对应的 topic路由信息,然后写到转换和缓存在内存多个数据结构里。其中,有一处就是下方的将 topic路由数据转换为topic 已发布信息,这里写入了messageQueueList (顺序消息待选择队列)。

转换逻辑是:

  1. 将路由信息写入topic发布信息;
  2. 判断当前路由信息是不是顺序 topic 配置;
  3. 解析顺序消息 topic配置;
  4. 遍历 broker,遍历broker 队列数量;
    1. 封装消息队列数据。topic -> broker ->队里id;
  5. 顺序队列数据,添加到 MessageQueueList 集合中;
    public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {TopicPublishInfo info = new TopicPublishInfo();// 写入路由数据info.setTopicRouteData(route);// 顺序消息配置,顺序消息分配了多少个brokerif (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {// 是顺序消息// 解析顺序消息topic配置String[] brokers = route.getOrderTopicConf().split(";");// 遍历顺序 broker,从namersrv拿到的broker数量for (String broker : brokers) {String[] item = broker.split(":");// 特定broker中的队列数量int nums = Integer.parseInt(item[1]);for (int i = 0; i < nums; i++) {// 分装队列数据,topic--brokerName--队列idMessageQueue mq = new MessageQueue(topic, item[0], i);info.getMessageQueueList().add(mq);}}// 是顺序topic,打标info.setOrderTopic(true);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/602890.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用JavaScript实现动态生成并管理购物车的深入解析

一、引言 在当前的互联网时代&#xff0c;电子商务已成为我们日常生活的重要组成部分。购物车作为电子商务网站的核心功能之一&#xff0c;其实现方式对于用户体验至关重要。本文将深入探讨如何使用JavaScript实现一个动态生成并管理购物车的功能&#xff0c;并详细介绍其实现…

Linux Shell数学运算与条件测试

一、Shell数学运算 1.Shell常见的算术运算符号 序号算术运算符号意义1、-、*、/、%加、减、乘、除、取余2**幂运算3、–自增或自减4&&、||、&#xff01;与、或、非5、!相等、不相等&#xff0c;也可写成6、、-、*、/、%赋值运算符&#xff0c;a1相等于aa1 2.Shell常…

.NET Standard 支持的 .NET Framework 和 .NET Core

.NET Standard 是针对多个 .NET 实现推出的一套正式的 .NET API 规范。 推出 .NET Standard 的背后动机是要提高 .NET 生态系统中的一致性。 .NET 5 及更高版本采用不同的方法来建立一致性&#xff0c;这种方法在大多数情况下都不需要 .NET Standard。 但如果要在 .NET Framewo…

QT 高DPI解决方案

一、根据DPI实现动态调整控件大小&#xff08;三种方式&#xff09; 1、QT支持高DPI&#xff08;针对整个进程中所有的UI&#xff09; // main函数中 QApplication::setAttribute(Qt::AA_EnableHighDpiScaling)tips&#xff1a;&#xff08;1&#xff09;如果不想全局设置&am…

Nodejs搭配axios下载图片

新建一个文件夹&#xff0c;npm i axios 实测发现只需保留node_modules文件夹&#xff0c;删除package.json不影响使用 1.纯下载图片 其实该方法不仅可以下载图片&#xff0c;其他的文件都可以下载 const axios require(axios) const fs require(fs) var arrPic [https:…

最大输出 18W,集成 Type-C PD 输出和各种快充输出协议

一、产品简介 IP6510是一款集成同步开关的降压转换器、支持 9 种输出快充协议、支持 Type-C 输出和 USB PD协议&#xff0c;为车载充电器、快充适配器、智能排插提供完整的解决方案。 IP6510 内置功率 MOS&#xff0c;输入电压范围是 4.5V到 32V&#xff0c;输出电压范围是 3…

案例101:基于微信小程序的停车共享管理系统设计与实现

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;SSM JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder X 小程序…

一个命令查看linux系统是Centos还是Ubuntu

目 录 一、 背景介绍 二、一个命令查看linux系统的简单方法 1、 uname -a 2、cat /etc/issue 3、lsb_release -a 4、 dmesg | grep Ubuntu 一、 背景介绍 Linux 系统基本上分为两大类&#xff1a; 1. Red Hat 系列&#xff1a;包括 Red Ha…

使用pytorch-superpoint与pytorch-superglue项目实现训练自己的数据集

superpoint与superglue的组合可以实现基于深度学习的图像配准,官方发布的superpoint与superglue模型均基于coco数据训练,与业务中的实际数据或许存在差距,为此实现基于开源的pytorch-superpoint与pytorch-superglue项目实现训练自己的数据集。然而,在训练pytorch-superpoin…

电缆线标书:打造高质量工程的关键一步

电缆线标书制作是工程项目中至关重要的一环&#xff0c;它不仅仅是一份文件&#xff0c;更是对工程质量和实施过程的全面规划和控制。在电缆线标书中&#xff0c;涉及到的内容十分丰富&#xff0c;包括但不限于工程概况、技术要求、材料清单、施工方案、质量控制等多个方面。 …

【LMM 012】TinyGPT-V:24G显存训练,8G显存推理的高效多模态大模型

论文标题&#xff1a;TinyGPT-V: Efficient Multimodal Large Language Model via Small Backbones 论文作者&#xff1a;Zhengqing Yuan, Zhaoxu Li, Lichao Sun 作者单位&#xff1a;Anhui Polytechnic University, Nanyang Technological University, Lehigh University 论文…

AI:118-基于深度学习的法庭口译实时翻译

🚀点击这里跳转到本专栏,可查阅专栏顶置最新的指南宝典~ 🎉🎊🎉 你的技术旅程将在这里启航! 从基础到实践,深入学习。无论你是初学者还是经验丰富的老手,对于本专栏案例和项目实践都有参考学习意义。 ✨✨✨ 每一个案例都附带有在本地跑过的关键代码,详细讲解供…

VMware中删除虚拟机

虚拟机使用完成后&#xff0c;需要删除虚拟机如何操作呢&#xff1f; 1.首先进入VMware 2.选择需要删除的虚拟机&#xff0c;点击右键 3.直接选择“移除”&#xff1f; 当然不是&#xff0c;这只是从这么目录显示中去掉了&#xff0c;并非 “真正” 删除该虚拟机 注意&#x…

正交投影矩阵与透视投影矩阵的推导

正交投影矩阵 正交投影矩阵的视锥体是一个长方体 [ l , r ] [ b , t ] [ f , n ] [l,r][b,t][f,n] [l,r][b,t][f,n]&#xff0c;我们要把这个长方体转换到一个正方体 [ − 1 , 1 ] [ − 1 , 1 ] [ − 1 , 1 ] [-1,1][-1,1][-1,1] [−1,1][−1,1][−1,1]中&#xff0c;如下图所…

机器学习--ROC AUC

参考 机器学习-ROC曲线 - 知乎 (zhihu.com)https://zhuanlan.zhihu.com/p/347470776一文看懂ROC、AUC - 知乎 (zhihu.com)https://zhuanlan.zhihu.com/p/81202617 在了解之前&#xff0c;我们先来认识一下以下的概念 针对一个二分类问题&#xff0c;将实例分成正类(postive)或…

常见算法(JavaScript版)

持续更新中… 目录 排序冒泡排序选择排序插入排序希尔排序快速排序&#xff08;必须掌握&#xff09;优化枢纽选择 堆排序归并排序 查找算法二分查找 排序 假设以下所有排序都是升序 快速排序在大部分情况下是效率最高的&#xff0c;所以笔试的时候要求写排序算法&#xff0…

SSR 服务器端渲染:提升用户体验的新趋势(下)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

Java学习,一文掌握Java之SpringBoot框架学习文集(5)

&#x1f3c6;作者简介&#xff0c;普修罗双战士&#xff0c;一直追求不断学习和成长&#xff0c;在技术的道路上持续探索和实践。 &#x1f3c6;多年互联网行业从业经验&#xff0c;历任核心研发工程师&#xff0c;项目技术负责人。 &#x1f389;欢迎 &#x1f44d;点赞✍评论…

物联网的感知层、网络层与应用层分享

物联网的概念在很早以前就已经被提出&#xff0c;20世纪末期在美国召开的移动计算和网络国际会议就已经提出了物联网(Internet of Things)这个概念。 最先提出这个概念的是MIT Auto-ID中心的Ashton教授&#xff0c;他在研究RFID技术时&#xff0c;便提出了结合物品编码、互联网…

CMU15-445-Spring-2023-Project #1 - Buffer Pool

前置知识&#xff0c;参考上一篇博客&#xff1a;CMU15-445-Spring-2023-Project #1 - 前置知识&#xff08;lec01-06&#xff09; 在存储管理器中实现缓冲池。缓冲池负责将物理页从主内存来回移动到磁盘。它允许 DBMS 支持大于系统可用内存量的数据库。缓冲池的操作对系统中的…