中间件-RocketMQ

RocketMQ

  • 基本架构
  • 消息模型
  • 消费者消费消息模式
  • 顺序消息机制
  • 延迟消息
  • 批量消息
  • 事务消息
  • 消息重试
  • 最佳实践

基本架构

在这里插入图片描述

nameServer: 维护broker列表信息,客户端连接时只需要连接nameServer。可配置成集群。
broker:broker分为master和slave,master负责消息的发送和消费,slave是master的备份。master-slaver的集群方式,master挂掉时候slave不能主动转换为master提供服务(5.X版本后可以通过配置实现mater挂掉后slave转为master提供服务)。
leader-follower的集群方式,即高可用集群,各个broker是对等的,通过选举产生leader(在dashboart中显示为master),如果leader挂掉,在剩下的follower(显示为slave)中选举再产生新的leader。注意,只有超过半数的几点存活,才能选举出leader。

消息模型

在这里插入图片描述
⽣产者和消费者都可以指定⼀个Topic发送消息或者拉取消息。⽽Topic是⼀个逻辑概念。
Topic中的消息会分布在后⾯多个MessageQueue当中。这些MessageQueue会分布到⼀个或者多个broker中。

消费者消费消息模式

广播模式:所有关注topic的消费者都收到消息。广播模式下消息队列的消费位点由客户端自己维护,消费失败服务端不会重发。
集群模式:同一个消费者组只有一个成员收到消息。集群模式下消费点位由服务端维护,消费者组的所有成员共用一个位点,消费失败服务端会重发。

顺序消息机制

  1. ⽣产者只有将⼀批有顺序要求的消息,放到同⼀个MesasgeQueue上,通过MessageQueue的FIFO特性保证这⼀批消息的顺序。如果不指定MessageSelector对象,
    那么⽣产者会采⽤轮询的⽅式将多条消息依次发送到不同的MessageQueue上。
  2. 消费者需要实现MessageListenerOrderly接⼝,实际上在服务端,处理MessageListenerOrderly时,会给⼀个MessageQueue加锁,拿到MessageQueue上所有的消息,然后再去读取下⼀个MessageQueue的消息。
  3. 消费消息失败时,不建议抛出异常,可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT作为替代。因为消费者端只进⾏有限次数的重试。如果⼀条消息处理失败,RocketMQ会将后续消息阻塞住,让消费者进⾏重试。但是,如果消费者⼀直处理失败,超过最⼤重试次数,那么RocketMQ就会跳过这⼀条消息,处理后⾯的消息,这会造成消息乱序。

延迟消息

  1. 定固定的延迟级别:对于指定固定延迟级别的延迟消息,RocketMQ的实现⽅式是预设⼀个系统Topic,名字叫做SCHEDULE_TOPIC_XXXXX。在这个Topic下,预设了18个MessageQueue。这⾥每个对列就对应了⼀种延迟级别。然后每次扫描这18个队列⾥的消息,进⾏延迟操作就可以了。
  2. 指定消息发送时间:RocketMQ是通过时间轮算法实现。

批量消息

⽣产者要发送的消息⽐较多时,可以将多条消息合并成⼀个批量消息,⼀次性发送出去。这样可以减少⽹络IO,提升消息发送的吞吐量。同⼀批消息的Topic必须相同,另外,不⽀持延迟消息。还有批量消息的⼤⼩不要超过1M,如果太⼤就需要⾃⾏分割。

事务消息

在这里插入图片描述

  1. ⽣产者将消息发送⾄ApacheRocketMQ服务端。
  2. ApacheRocketMQ服务端将消息持久化成功之后,向⽣产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
  3. ⽣产者开始执⾏本地事务逻辑。
  4. ⽣产者根据本地事务执⾏结果向服务端提交⼆次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:⼆次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。⼆次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断⽹或者是⽣产者应⽤重启的特殊情况下,若服务端未收到发送者提交的⼆次确认结果,或服务端收到的⼆次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息⽣产者即⽣产者集群中任⼀⽣产者实例发起消息回查。
  6. ⽣产者收到消息回查后,需要检查对应消息的本地事务执⾏的最终结果。
  7. ⽣产者根据检查到的本地事务的最终状态再次提交⼆次确认,服务端仍按照步骤4对半事务消息进⾏处理。

消息重试

RocketMQ的消费者端,如果处理消息失败了,Broker是会将消息重新进⾏投送的。⽽在重试时,RocketMQ实际上会为每个消费者组创建⼀个对应的重试队列。重试的消息会进⼊⼀个“%RETRY%”+ConsumeGroup的队列中。
RocketMQ默认允许每条消息最多重试16次,每次重试的间隔时间如下:
在这里插入图片描述
如果消息重试16次后仍然失败,消息将不再投递,转为进⼊死信队列。重试次数可以通过consumer.setMaxReconsumeTimes(20);将重试次数设定为20次。当定制的重试次数超过16次后,消息的重试时间间隔均为2⼩时。
如果消息超过最⼤重试次数,RocketMQ会将消息发送到死信队列。⼀个死信队列对应⼀个消费组。死信队列的默认权限为2(禁读)。如果需要处理死信队列的消息,需要把权限修改为6(可读可写后)消费该Topic的消息进行处理。队列中超过有效期(默认3天)的消息会被删除,不管有没有消费。

最佳实践

  1. ⼀个应⽤尽可能⽤⼀个Topic,⽽消息⼦类型则可以⽤tags来标识。tags过滤消息的性能很高,相当于索引。
  2. 消费端幂等控制:RocketMQ的每条消息都有⼀个唯⼀的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以⽤这个MessageId来作为判断幂等的关键依据。但是,这个MessageId是⽆法保证全局唯⼀的,也会有冲突的情况。所以在⼀些对幂等性要求严格的场景,最好是使⽤业务上唯⼀的⼀个标识⽐较靠谱。例如订单ID。⽽这个业务标识可以使⽤Message的Key来进⾏传递。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/905699.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

anaconda3如何切换虚拟环境

在 Anaconda3 中切换虚拟环境可以通过 命令行 或 Anaconda Navigator 图形界面实现。以下是详细步骤: 方法1:通过命令行切换(推荐) 1. 查看所有虚拟环境 conda env list # 或 conda info --envs 输出示例: base …

【vue】axios网络请求介绍

一、基础使用 1.引入js文件 2.在methods中的函数里写 axios.get(路径) .then((res))>{ console.log(res.data);//控制台打印结果数据 this.listArrres.data//定义数组来接收返回来的数据 }) 二、参数传递 参数传递一般在路径后面使用 params:{ num:2,…

机器学习 --- KNN算法

机器学习 — KNN算法 文章目录 机器学习 --- KNN算法一,sklearn机器学习概述二,KNN算法---分类2.1样本距离判断2.2 KNN算法原理2.3 KNN缺点2.4 API2.5 使用sklearn中鸢尾花数据集实现KNN 一,sklearn机器学习概述 获取数据、数据处理、特征工…

Spring Boot 中的重试机制

Retryable 注解简介 Retryable 注解是 Spring Retry 模块提供的,用于自动重试可能会失败的方法。在微服务架构和分布式系统中,服务之间的调用可能会因为网络问题、服务繁忙等原因失败。使用 Retryable 可以提高应用的稳定性和容错能力 1。 使用步骤 &…

FPGA生成随机数的方法

FPGA生成随机数的方法,目前有以下几种: 1、震荡采样法 实现方式一:通过低频时钟作为D触发器的时钟输入端,高频时钟作为D触发器的数据输入端,使用高频采样低频,利用亚稳态输出随机数。 实现方式二:使用三个…

(五)毛子整洁架构(分布式日志/Redis缓存/OutBox Pattern)

文章目录 项目地址一、结构化日志1.1 使用Serilog1. 安装所需要的包2. 注册服务和配置3. 安装Seq服务 1.2 添加分布式id中间件1. 添加中间件2. 注册服务3. 修改Application的LoggingBehavior 二、Redis缓存2.1 添加缓存1. 创建接口ICaching接口2. 实现ICaching接口3. 注册Cachi…

Vue.js 全局导航守卫:深度解析与应用

在 Vue.js 开发中,导航守卫是一项极为重要的功能,它为开发者提供了对路由导航过程进行控制的能力。其中,全局导航守卫更是在整个应用的路由切换过程中发挥着关键作用。本文将深入探讨全局导航守卫的分类、作用以及参数等方面内容。 一、全局…

使用FastAPI和React以及MongoDB构建全栈Web应用05 FastAPI快速入门

一、FastAPI概述 1.1 什么是FastAPI FastAPI is a modern, high-performance Python web framework designed for building APIs. It’s rapidly gaining popularity due to its ease of use, speed, and powerful features. Built on top of Starlette, FastAPI leverages a…

如何查看打开的 git bash 窗口是否是管理员权限打开

在 git bash 中输入: net session >nul 2>&1 && (echo Ok) || (echo Failed) 显示 OK 》是管理员权限; 显示 Failed 》不是管理员权限。 如何删除此步生成的垃圾文件: 新建一个 .txt 文件,输入以下代码…

得物0509面试手撕题目解答

题目 使用两个栈(一个无序栈和一个空栈)将无序栈中的元素转移到空栈,使其有序,不允许使用其他数据结构。 示例:输入:[3, 1, 6, 4, 2, 5],输出:[6, 5, 4, 3, 2, 1] 思路与代码 如…

基于 Nexus 在 Dockerfile 配置 yum, conda, pip 仓库的方法和参考

在 Nexus 配置代理仓库的方法,可参考 pypi 的配置博客:https://hellogitlab.com/CI/docker/create_your_nexus_2 更多代理格式,参考官方文档,如 pypi:https://help.sonatype.com/en/pypi-repositories.html 配置 yum…

[6-8] 编码器接口测速 江协科技学习笔记(7个知识点)

1 2 在STM32微控制器的定时器模块中,CNT通常指的是定时器的计数器值。以下是CNT是什么以及它的用途: 是什么: • CNT:代表定时器的当前计数值。在STM32中,定时器从0开始计数,直到达到预设的自动重装载值&am…

RabbitMQ ③-Spring使用RabbitMQ

Spring使用RabbitMQ 创建 Spring 项目后&#xff0c;引入依赖&#xff1a; <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp --> <dependency><groupId>org.springframework.boot</groupId><artifac…

海外IP被误封解决方案

这里使用Google Cloud和Cloudflare来实现&#xff0c;解决海外服务器被误封IP&#xff0c;访问不到的问题。 这段脚本的核心目的&#xff0c;是自动监测你在 Cloudflare 上管理的 VPS 域名是否可达&#xff0c;一旦发现域名无法 Ping 通&#xff0c;就会帮你更换IP&#xff1a…

一个基于 Spring Boot 的实现,用于代理百度 AI 的 OCR 接口

一个基于 Spring Boot 的实现&#xff0c;用于代理百度 AI 的 OCR 接口 BaiduAIController.javaBaiduAIConfig.java在 application.yml 或 application.properties 中添加配置&#xff1a;application.yml同时&#xff0c;需要在Spring Boot应用中配置RestTemplate&#xff1a;…

GPT-4o 遇强敌?英伟达 Eagle 2.5 视觉 AI 王者登场

前言&#xff1a; 在人工智能领域&#xff0c;视觉语言模型的竞争愈发激烈。GPT-4o 一直是该领域的佼佼者&#xff0c;但英伟达的 Eagle 2.5 横空出世&#xff0c;凭借其 80 亿参数的精简架构&#xff0c;在长上下文多模态任务中表现出色&#xff0c;尤其是在视频和高分辨率图像…

将语言融入医学视觉识别与推理:一项综述|文献速递-深度学习医疗AI最新文献

Title 题目 Integrating language into medical visual recognition and reasoning: A survey 将语言融入医学视觉识别与推理&#xff1a;一项综述 01 文献速递介绍 检测以及语义分割&#xff09;是无数定量疾病评估和治疗规划的基石&#xff08;利特延斯等人&#xff0c…

Ubuntu24.04版本解决RK3568编译器 libmpfr.so.4: cannot open shared object

问题描述 在Ubuntu24.04版本上编译RK3568应用程序关于libmpfr.so.4: cannot open shared object问题&#xff0c;如下所示&#xff1a; /tools/ToolsChain/rockchip/rockchip_rk3568/host/bin/../libexec/gcc/aarch64-buildroot-linux-gnu/9.3.0/cc1plus: error while loadin…

产线视觉检测设备技术方案:基于EFISH-SCB-RK3588/SAIL-RK3588的国产化替代赛扬N100/N150全场景技术解析

一、核心硬件选型与替代优势‌ ‌1. 算力与AI加速能力‌ ‌异构八核架构‌&#xff1a;采用4Cortex-A76&#xff08;2.4GHz&#xff09;4Cortex-A55&#xff08;1.8GHz&#xff09;设计&#xff0c;支持视觉算法并行处理&#xff08;如模板匹配、缺陷分类&#xff09; 相机采…

python如何合并excel单元格

在Python中合并Excel单元格&#xff0c;常用openpyxl库实现。以下是详细步骤和示例代码&#xff1a; 方法一&#xff1a;使用 openpyxl 库 步骤说明&#xff1a; 安装库&#xff1a; pip install openpyxl导入库并加载文件&#xff1a; from openpyxl import load_workbook# …