从零开始读RocketMq源码(二)Message的发送详解

目录

前言

准备

消息发送方式

深入源码

消息发送模式

选择发送方式

同步发送消息

校验消息体

获取Topic订阅信息

高级特性-消息重投

选择消息队列-负载均衡

装载消息体发送消息

压缩消息内容

构造发送message的请求的Header

更新broker故障信息

异步发送消息

总结


前言

上一篇我们已经对RocketMq生产者启动源码进行了学习《从零开始读RocketMq源码(一)生产者启动》那么本篇我们将对生产者发送消息的源码进行学习

准备

如果没看前一篇的,这里还是要强调本篇的rocketmq版本

首先我们从github上拉取rocketmqd的源码链接到本地,使用idea打开。

源码地址:https://github.com/apache/rocketmq

目前最新版本为:5.2.0

那么我们在idea上切换分支为 release-5.2.0

注:请保持和本篇的版本一直,方便后面文章中给出的代码块定位

消息发送方式

在读源码之前我们先了解下mq支持的发送消息的类型。

消息的发送方式有三种,但我们最常用的是同步的方式发送

  • sync 同步:消息发送后,必须等待消息的发送结果返回后,才能发送下一条消息
  • async 异步:消息发送后,不用等待返回结果,直接发送下一条数据,但会设置一个回调方法接收返回结果
  • oneway 单向:消息发送后,不会返回结果,也不会等待,也不会设置回调方法。适用场景日志收集、监控数据和快速通知等对可靠性要求不高但需要高性能的场景

深入源码

首先进入外层的producer.send()方法中

//源码位置:
//包名:org.apache.rocketmq.example.simple
//文件名:Producer
//行数:42
SendResult sendResult = producer.send(msg);

消息发送模式

//源码位置:
//包名:org.apache.rocketmq.client.producer
//文件名:DefaultMQProducer
//行数:431
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {msg.setTopic(withNamespace(msg.getTopic()));//批量发送if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {return sendByAccumulator(msg, null, null);} else {//单条发送return sendDirect(msg, null, null);}
}
  1. 自动批处理发送 -sendByAccumulator()
  • 该方法用于将消息累积到一个批处理容器中,等待足够的消息数量或达到某个时间间隔后,再进行批量发送。
  • 可以显著减少发送次数,提高吞吐量。

     2. 直接发送 -sendDirect()

  • 适用于即时发送或消息已经是批处理消息的情况

本章的重点就是直接发送消息,这也是开发中使用最频发的方式

选择发送方式

//源码位置:
//包名:org.apache.rocketmq.client.producer
//文件名:DefaultMQProducer
//行数:720
public SendResult sendDirect(Message msg, MessageQueue mq,SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {// send in sync modeif (sendCallback == null) {if (mq == null) {//同步不指定队列return this.defaultMQProducerImpl.send(msg);} else {//同步指定队列return this.defaultMQProducerImpl.send(msg, mq);}} else {if (mq == null) {//异步不指定队列this.defaultMQProducerImpl.send(msg, sendCallback);} else {//异步指定队列this.defaultMQProducerImpl.send(msg, mq, sendCallback);}return null;}
}

有上面代码可以知道,方法中提供了三个参数设置:

  • msg :消息体,这个为必填项
  • sendCallback :消息回调对象,如果这个参数不为空,则为异步发送,为空则为同步发送
  • mq :指定的队列(指定与不指定的区别在于后续是否需要对队列负载均衡,下面源码中会讲到)

根据最开始生产者发送消息,我们只传入了msg,所以本次重点看同步不指定队列代码实现

同步发送消息

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:1525
public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

跟踪代码我们可以看到,方法中我们默认设置了CommunicationMode.SYNC 同步发送模式,并且回调参数为空,以及设置了默认超时时间3s

校验消息体

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:704
Validators.checkMessage(msg, this.defaultMQProducer);

该方法就是校验消息内容是否合规

  • 校验消息内容是否不为空,消息大小是否超过最大值maxMessageSize = 1024 * 1024 * 4; // 4M
  • 校验消息发送的topic是否为不为空,以及topic的长度是否超过默认最长值127

获取Topic订阅信息

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:709
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

该方法通过消息体中的topic名称获取topic的订阅信息,该方法在我们上一篇生产者启动中已经出现过了,深入方法内部其实就是先从本地topicPublishInfoTable map中获取数据,没有则从远程nameserver中拉取

高级特性-消息重投

这是rocketMq中一个重要的特性,消息如果投递失败了,会重新投递

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:715
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

这段代码就是获取总过重投的次数:

不难看出,只有发送方式为同步发送时才为1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() =3次,其余发送方式都只有一次机会。

只有同步发送消息才支持消息重投,如果第一次投递失败了,mq还回重试2次投递

找到上面源码位置往下看,其实可以看到下面代码就是使用了一个for循环来进行重投

选择消息队列-负载均衡

通过上面我们知道,最开始并没有指定队列,所以需要程序来获取一个队列。

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:724
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName, resetIndex);

因为自动创建的topic,会被默认分配4个队列(生产环境为手动创建topic以及设置队列数量),所以我们必须使用负载均衡保证队列的合理分配到不同队列上,减轻单个队列的压力

  • topicPublishInfo:为消息发送到指定topic的订阅信息
  • lastBrokerName :为上一次选择的broker名称(如果在集群模式下,topic也会存在于多个broker上,因此记录上一次选择的broker名称可以避免连续选择同一个 Broker,从而实现更好的负载均衡和容错处理
  • resetIndex :重置队列索引位置(根据源码逻辑可知,当消息进行重新投递时会重置topic订阅消息中队列的索引位置)

深入上面源码会发现,队列负载均衡的算法获取索引策略默认就是轮询

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:TopicPublishInfo
//行数:101
int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size());

负载均衡策略

  1. 轮询策略 (Round-Robin)
  2. 随机策略 (Random)
  3. 一致性哈希策略 (Consistent Hashing)
  4. 权重随机策略 (Weighted Random)
  5. 最少连接策略 (Least Connections)

装载消息体发送消息

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:740
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);

该方法就是发现消息的核心方法了,不管是同步发送还是异步发送都会执行该方法

做一些发送消息前的准备,接下深入该方法查看

压缩消息内容

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:898
if (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.COMPRESSED_FLAG;sysFlag |= compressType.getCompressionFlag();msgBodyCompressed = true;
}
  • 首先判断消息是否大于4k( compressMsgBodyOverHowmuch = 1024 * 4),大于则进行压缩,小于则不处理
//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:1070
byte[] data = compressor.compress(body, compressLevel);
  • 传入消息体以及压缩的等级,这里大佬们提供了三种压缩实现,分别基于三种不同的压缩框架

在我们日常工作中,如果需要压缩内容,也可以参考大佬们的实现,学习源码不仅仅是了解框架的本身,也要吸取优秀的地方合理运用

构造发送message的请求的Header

message是Producer发送给Broker的一个请求,我们可以把内容抽象成两部分组成:请求头请求体

  • 请求体就是消息本身数据
  • 请求头 SendMessageRequestHeader 则包含了各种必要的数据,比如topicmessaeQueue等等,更多可直接查看请求头对象源码

最后就是使用基于netty实现的远程调用发送消息到broker中

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:1016
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,brokerName,msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);

更新broker故障信息

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:742
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true);

程序执行到这个位置,说明前面消息发送的流程全部执行完成了,那么我们也知道了消息发送的结果,从而知道broker服务的状态情况,我们需要把当前的broker故障情况更新到 faultItemTable 本地map中,供后续对broker服务的故障规避faultItemTable 该map在前一篇生产者启动中也提到过。

异步发送消息

选择发送方式代码中当sendCallback!=null时则进入异步发送消息

跟踪源码我们可知,异步发送其实就是创建了一个单独的线程,使用Runnable对象实现,因为会返回一个执行结果

//源码位置:
//包名:org.apache.rocketmq.client.impl.producer
//文件名:DefaultMQProducerImpl
//行数:550
Runnable runnable = new Runnable() {@Overridepublic void run() {long costTime = System.currentTimeMillis() - beginStartTime;if (timeout > costTime) {try {sendDefaultImpl(msg, CommunicationMode.ASYNC, newCallBack, timeout - costTime);} catch (Exception e) {newCallBack.onException(e);}} else {newCallBack.onException(new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));}}executeAsyncMessageSend(runnable, msg, newCallBack, timeout, beginStartTime);
};
  • sendDefaultImpl() 该方法就是和同步发送调用的同一个了,唯一区别就是类型 CommunicationMode.ASYNC 和存在回调方法newCallBack
  • executeAsyncMessageSend() 执行异步消息发送

总结

本篇对生产者发送消息源码进行了跟踪学习,你是否也有所收获呢。下一篇我们将对rocketMq的核心组件Broker进行源码解读,Broker负责接收和存储消息,管理消息队列,并将消息分发给消费者, 是担任连接生产者和消费者,确保消息的高效传输和存储,保证系统的可靠性和性能的重要角色。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/42188.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Open3D KDtree的建立与使用

目录 一、概述 1.1kd树原理 1.2kd树搜索原理 1.3kd树构建示例 二、常见的领域搜索方式 2.1K近邻搜索(K-Nearest Neighbors, KNN Search) 2.2半径搜索(Radius Search) 2.3混合搜索(Hybrid Search) …

ai native 模型微调

AI native 模型微调(fine-tuning)是指在预训练模型的基础上,通过对其参数进行进一步训练,使其在特定任务上表现更佳。以下是关于模型微调的一些基本步骤和概念: ### 1. 准备数据集 - **数据收集**:收集适用…

后端之路——登录校验前言(Cookie\ Session\ JWT令牌)

前言:Servlet 【登录校验】这个功能技术的基础是【会话技术】,那么在讲【会话技术】的时候必然要谈到【Cookie】和【Session】这两个东西,那么在这之前必须要先讲一下一个很重要但是很多人都会忽略的一个知识点:【Servlet】 什么是…

Oracle PL/SQL 循环批量执行存储过程

1. 查询存储过程 根据数据字典USER_OBJECTS查询出所有存储过程。 2. 动态拼接字符串(参数等) 根据数据字典USER_ARGUMENTS动态拼接参数。 3. 动态执行 利用EXECUTE IMMEDIATE动态执行无名块。 4. 输出执行信息 利用DBMS_OUTPUT.PUT_LINE输出执行成功与…

Android Gradle 开发与应用 (十): Gradle 脚本最佳实践

目录 1. 使用Gradle Kotlin DSL 1.1 什么是Gradle Kotlin DSL 1.2 迁移到Kotlin DSL 1.3 优势分析 2. 优化依赖管理 2.1 使用依赖版本管理文件 2.2 使用依赖分组 3. 合理使用Gradle插件 3.1 官方插件和自定义插件 3.2 插件管理的最佳实践 4. 任务配置优化 4.1 使用…

Oracle 19c 统一审计表清理

zabbix 收到SYSAUX表空间告警超过90%告警,最后面给出的清理方法只适合ORACLE 统一审计表的清理,传统审计表的清理SYS.AUD$不适合,请注意。 SQL> Col tablespace_name for a30 Col used_pct for a10 Set line 120 pages 120 select total.…

STM32实战篇:闪灯 × 流水灯 × 蜂鸣器

IO引脚初始化 即开展某项活动之前所做的准备工作,对于一个IO引脚来说,在使用它之前必须要做一些参数配置(例如:选择工作模式、速率)的工作(即IO引脚的初始化)。 IO引脚初始化流程 1、使能IO引…

LED灯的呼吸功能

"呼吸功能"通常是指 LED 灯的一种工作模式,它模拟人类的呼吸节奏,即 LED 灯的亮度会周期性地逐渐增强然后逐渐减弱,给人一种 LED 在"呼吸"的感觉。这种效果通常用于指示设备的状态或者简单地作为装饰效果。(就…

Spring Boot Security自定义AuthenticationProvider

以下是一个简单的示例,展示如何使用AuthenticationProvider自定义身份验证。首先,创建一个继承自标准AuthenticationProvider的类,并实现authenticate方法。 import com.kamier.security.web.service.MyUser; import org.springframework.se…

【Adobe】Photoshop图层的使用

Adobe Photoshop(简称PS)中的图层是图像处理中一个核心概念,它允许用户以堆叠的方式组织图像的不同部分,从而实现对图像的复杂编辑和处理而不影响原始图像。以下是关于Adobe Photoshop图层的详细介绍: 一、图层的定义 图层就像是透明的纸张,你可以在上面绘制、添加图像…

YOLOv10改进 | EIoU、SIoU、WIoU、DIoU、FocusIoU等二十余种损失函数

一、本文介绍 这篇文章介绍了YOLOv10的重大改进,特别是在损失函数方面的创新。它不仅包括了多种IoU损失函数的改进和变体,如SIoU、WIoU、GIoU、DIoU、EIOU、CIoU,还融合了“Focus”思想,创造了一系列新的损失函数。这些组合形式的…

Android Init Language自学笔记

Android Init Language由五个元素组成:Acttions、Commands、Services、Options和Imports。 Actions和Services隐式声明了一个新的section。所以的Commands和Options都属于最近声明的section。 Services具有唯一的名称,如果重名会报错。 Actions Acti…

解决Spring Boot中的高可用性设计

解决Spring Boot中的高可用性设计 大家好,我是微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿! 1. 高可用性设计概述 1.1 什么是高可用性? 高可用性指系统在面对各种故障和异常情况时,仍…

独立开发者系列(22)——API调试工具apifox的使用

接口的逻辑已经实现,需要对外发布接口,而发布接口的时候,我们需要能自己简单调试接口。当然,其实自己也可以写简单的代码调试自己的接口,因为其实就是简单的request请求或者curl库读取,调整请求方式get或者…

如果MySQL出现 “Too many connections“ 错误,该如何解决?

当你想要连接MySQL时出现"Too many connections" 报错的情况下,该如何解决才能如愿以偿呢?都是哥们儿,就教你两招吧! 1.不想重启数据库的情况下 你可以尝试采取以下方法来解决: 增加连接数限制&#xff1a…

RxJava学习记录

文章目录 1. 总览1.1 基本原理1.2 导入包和依赖 2. 操作符2.1 创建操作符2.2 转换操作符2.3 组合操作符2.4 功能操作符 1. 总览 1.1 基本原理 参考文献 构建流:每一步操作都会生成一个新的Observable节点(没错,包括ObserveOn和SubscribeOn线程变换操作…

asp.netWebForm(.netFramework) CSRF漏洞

asp.netWebForm(.netFramework) CSRF漏洞 CSRF(Cross-Site Request Forgery)跨站请求伪造是一种常见的 Web 应用程序安全漏 洞,攻击者通过诱使已认证用户在受信任的网站上执行恶意操作,从而利用用户的身份 执行未经授权的操作。攻…

echarts实现3D饼图

先看下最终效果 实现思路 使用echarts-gl的曲面图&#xff08;surface&#xff09;类型 通过parametric绘制曲面参数实现3D效果 代码实现 <template><div id"surfacePie"></div> </template> <script setup>import {onMounted} fro…

简单的找到自己需要的flutter ui 模板

简单的找到自己需要的flutter ui 模板 网站 https://flutterawesome.com/ 简介 我原本以为会很难用 实际上不错 很简单 打开后界面类似于,右上角可以搜索 点击view github 相当简单 很oks

RabbitMq,通过prefetchCount限制消费并发数

1.问题:项目瓶颈,通过rabbitMq来异步上传图片,由于并发上传的图片过多导致阿里OSS异常, 解决方法:通过prefetchCount限制图片上传OSS的并发数量 2.定义消费者 Component AllArgsConstructor Slf4j public class ReceiveFaceImageEvent {private final UPloadService uploadSe…