【RabbitMQ】应用问题、仲裁队列(Raft算法)和HAProxy负载均衡

    🔥个人主页: 中草药

🔥专栏:【中间件】企业级中间件剖析


一、幂等性保障

什么是幂等性?

        幂等性是指对一个系统进行重复调用(相同参数),无论同一操作执行多少次,这些请求对系统的影响都是相同的效果,结果都与执行一次相同。

        消息可能因网络重传、消费者异常重启、消息重复投递等导致重复消费,需确保多次处理不会产生副作用。

RabbitMQ 重复消息的来源

场景原因
生产者重复发送生产者未收到 Broker 的 ACK,触发重试机制(如网络抖动、Broker 未及时响应)
消费者重复消费消费者处理消息后未及时 ACK,消息重新入队(如消费者崩溃、处理超时)
Broker 消息堆积消息因队列配置(如死信队列、TTL)被多次重新投递

MQ的幂等性保障

对于 MQ 而言,幂等性是指同一条消息,多次消费,对系统的影响是相同的。

一般消息中间件的消息传输保障分为三个层级。

  1. At most once: 最多一次。消息可能会丢失,但绝不会重复传输.
  2. At least once: 最少一次。消息绝不会丢失,但可能会重复传输.
  3. Exactly once: 恰好一次。每条消息肯定会被传输一次且仅传输一次.

        RabbitMQ 支持 "最多一次" 和 "最少一次"。对于 "恰好一次", 目前 RabbitMQ 还做不到,不仅是 RabbitMQ, 目前市面上主流的消息中间件,都做不到这一点.

实现方案

1、唯一标识 + 去重表

原理:为每条消息分配唯一 ID(如 UUID、业务主键),消费前检查该 ID 是否已处理。

实现步骤

生产者:在消息头(Header)中添加唯一标识(如 message_id)。

消费者

        消费前查询去重表(如 Redis 或数据库),判断 message_id 是否存在。

        若不存在,处理消息并写入去重表;若存在,直接 ACK 消息。

优化

        去重表设计:可以使用 Redis 的原子性操作 setnx 来保证幂等性,将唯一 ID 作为 key 放到 redis 中(SETNX messageID 1). 返回 1,说明之前没有消费过,正常消费。返回 0,说明这条消息之前已消费过,抛弃.

        过期时间:为去重表记录设置 TTL,避免数据无限膨胀。


2、业务逻辑判断

在业务逻辑层面实现消息处理的幂等性。

例如: 通过检查数据库中是否已存在相关数据记录,或者使用乐观锁机制来避免更新已被其他事务更改的数据,再或者在处理消息之前,先检查相关业务的状态,确保消息对应的操作尚未执行,然后才进行处理,具体根据业务场景来处理

二、顺序性保障

        在分布式系统中,消息的顺序性保障是确保消息按照生产者发送的先后顺序被消费者处理的机制。RabbitMQ 作为消息中间件,默认不提供严格的全局顺序保证,但可通过特定设计和配置实现部分场景下的顺序性。

顺序性问题的根源

RabbitMQ 默认无法保证全局顺序性的原因:

  • 多消费者并行消费:一个队列绑定多个消费者时,消息可能被无序处理。

  • 消息重试与重新入队:消费者处理失败的消息重新入队后,可能插入到队列中间。

  • 交换机路由策略:使用 directtopic 或 headers 交换机时,消息可能分散到不同队列。

  • 网络延迟与分区:网络抖动可能导致消息到达 Broker 的顺序与发送顺序不一致。

顺序性保障方案

1、单一队列 + 单一消费者

  • 原理:同一队列仅绑定一个消费者,串行处理消息。

  • 适用场景:低吞吐量但对顺序性要求极高的场景(如金融交易)。

  • 实现

    • 生产者将所有消息发送到同一队列。

    • 队列仅允许一个消费者连接(设置 prefetch_count=1)。

    • 消费者禁用自动 ACK,处理完一条消息后手动确认。

2、分区消费

        单个消费者的吞吐太低了,当需要多个消费者以提高处理速度时,可以使用分区消费,把一个队列分割成多个分区,每个分区由一个消费者处理,以此来保持每个分区内消息的顺序性.

Rabbitmq本身并不支持分区消费,需要业务逻辑去实现,或者借助spring-cloud-stream来实现

Partitioning with the RabbitMQ Binder :: Spring Cloud Stream

实现效果演示

3、消息确认机制
        使用手动消息确认机制,消费者在处理完一条消息后,显式地发送确认,这样RabbitMQ才会移除并继续发送下一条消息.

4、业务逻辑控制
        在某些情况下,即使消息乱序到达,也可以在业务逻辑层面实现顺序控制,比如通过在消息中嵌入序列号,并在消费时根据这些信息来处理

由于RabbitMO本身并不保证全局的严格顺序性,所以以上所提供的方案往往需要搭配混合使用,特别是在分布式系统中,在实际应用开发中,根据具体的业务需求,需要结合多种策略来实现所需要的顺序保证.

三、消息积压

常见原因

1、消息生产过快:在高流量或者高负载的情况下,生产者以极高的速率发送消息,超过了消费者的处理能力,包括一些流量激增的情况(活动促销)

2、消费者处理能力不足:消费者处理处理消息的速度跟不上消息生产的速度,也会导致消息在队列中积压,可能原因有:

  • 消费端业务逻辑复杂,耗时长
  • 消费端代码性能低
  • 系统资源限制,如 CPU、内存、磁盘 I/O 等也会限制消费者处理消息的效率.
  • 异常处理处理不当。消费者在处理消息时出现异常,导致消息无法被正确处理和确认.

3、网络问题:因为网络延迟或不稳定,消费者无法及时接收或确认消息,最终导致消息积压

4、RabbitMQ 服务器配置问题

  • 未设置合理的 prefetch count:消费者一次拉取过多消息,导致内存压力。
  • 队列未持久化:重启后消息丢失,需重新处理积压。
  • 未使用惰性队列(Lazy Queue):高吞吐场景下内存不足。

解决方案

1)提高消费者效率
        a. 增加消费者实例数量,比如新增机器
        b. 优化业务逻辑,比如使用多线程来处理业务
        c. 设置 prefetchCount, 当一个消费者阻塞时,消息转发到其他未阻塞的消费者.
        d. 消息发生异常时,设置合适的重试策略,或者转入到死信队列

2)限制生产者速率。比如流量控制,限流算法等
        a. 流量控制:在消息生产者中实现流量控制逻辑,根据消费者处理能力动态调整发送速率
        b. 限流:使用限流工具,为消息发送速率设置一个上限
        c. 设置过期时间。如果消息过期未消费,可以配置死信队列,以避免消息丢失,并减少对主队列的压力

3)资源与配置优化   比如升级 RabbitMQ 服务器的硬件,调整 RabbitMQ 的配置参数等

在选择策略的时候需要实际考虑业务的需求和系统的实际承载能力

四、Raft算法

        Raft 是一种专为 分布式一致性 设计的共识算法。其核心目标是通过 强可理解性 解决传统 Paxos 算法的复杂性,同时保证分布式系统的 高可用性 和 数据一致性

分解问题

将共识问题拆分为三个子问题:

领导人选举(Leader Election):系统中仅有一个 Leader 负责处理客户端请求。

日志复制(Log Replication):Leader 将操作日志同步到所有 Follower 节点。

安全性(Safety):确保所有节点最终状态一致,避免数据冲突。

核心机制

节点角色

  • Leader:唯一处理客户端请求的节点,负责日志复制和心跳维持。

  • Follower:被动接收 Leader 的日志和心跳,不主动响应客户端,不直接处理客户端请求。

  • Candidate:选举过程中的临时角色(Follower 超时未收到心跳后成为 Candidate,开始尝试通过 投票过程成为新的Leader)。

正常的情况下,集群中只有一个Leader,剩下的节点都是follower

任期(Term)

  • 全局单调递增的整数(类似“逻辑时钟”),每个任期至多一个 Leader。

  • 节点间通信携带 Term,用于检测过期信息(如旧 Leader 的请求会被拒绝)。

        Raft 将时间划分成任意长度的任期(term).每一段任期从一次选举开始,在这个时候会有一个或者多个candidate 尝试去成为leader,在成功完成一次leaderelection之后,一个leader就会一直节管理集群直到任期结束,在某些情况下,一次选举无法选出 leader,这个时候这个任期会以没有leader 而结束(如下图t3).同时一个新的任期(包含一次新的选举)会很快重新开始

通信

Raft算法中的服务器节点之间采用RPC进行通信,主要由两类RPC请求:

  • RequestVote RPCs: 请求投票,由 candidate 在选举过程中发出

  • AppendEntries RPCs: 追加条目,由leader 发出,用来做日志复制和提供心跳机制

选举过程

可以通过此网站动画来理解投票选举过程Raft Consensus Algorithm

        Raft 采用一种心跳机制来触发 leader 选举,当服务器启动的时候,都是follow状态.如果follower在election timeout内没有收到来自leader的心跳(可能没有选出leader,也可能leader挂了,或者leader与follower之间网络故障),则会主动发起选举.

步骤如下:
1、率先超时的节点,自增当前任期号然后切换为 candidate 状态,并投自己一票

2、以并行的方式发送一个 RequestVote RPCs 给集群中的其他服务器节点(企图得到它们的投票)

3、等待其他节点的回复

此时可能会出现三种结果

a、赢得选举,自己成为Leader(包括自己的一票),新的Leader会给其他节点发布消息,避免其余节点触发新的选举

b、其他节点赢得了选举,未成功选举的节点在接受到消息时,会自动转化为follower

c、一段时间内没有收到majority投票,保持candidate状态,重新发出选举

        没有任何节点获得majority投票.比如所有的 follower 同时变成 candidate,然后它们都将票投给自己,那这样就没有 candidate 能得到超过半数的投票了.当这种情况发生的时候,每个candidate 都会进行一次超时响应,然后通过自增任期号来开启一轮新的选举,并启动另一轮的RequestVote RPCs.如果没有额外的措施,这种无结果的投票可能会无限重复下去.

        为了解决上述问题,Raft 采用 随机选举超时时间(randomized election timeouts)来确保很少产生无结果的投票,并且就算发生了也能很快地解决。为了防止选票一开始就被瓜分,选举超时时间是从一个固定的区间(比如,150-300ms)中随机选择。这样可以把服务器分散开来以确保在大多数情况下会只有一个服务器率先结束超时,那么这个时候,它就可以赢得选举并在其他服务器结束超时之前发送心跳。

五、仲裁队列

        RabbitMQ 的 仲裁队列(Quorum Queues) 是 RabbitMQ 3.8 版本引入的一种新型队列类型,专为 高可用性和数据一致性 场景设计。它基于 Raft 一致性协议实现,替代了传统的镜像队列(Mirrored Queues),在节点故障时能更可靠地保证数据安全。

        在集群环境之中,如果某一节点宕机故障,其中原本的信息也会发生丢失,仲裁队列可以在rabbitmq之间进行队列数据的复制,保障集群系统的高可用性。

节点宕机之前

节点宕机后,消息丢失了 

使用仲裁队列

@Bean("quorumQueue")
public Queue quorumQueue() {return QueueBuilder.durable("quorum_queue").quorum().build();
}

可以观察到,仲裁队列后面有一个+2,表示队列中有两个镜像节点,点进去可以看到队列详细

此时如果发生单个节点宕机,队列里的消息不会丢失

六、HAProxy负载均衡

        面对大量的业务访问,高并发请求,试想如果一个集群中有3个节点,我们在写代码时,访问哪个节点呢?
答案是访问任何一个节点都可以.
这时候就存在两个问题:
1、如果我们访问的是node1,但是node1挂了,咱们的程序也会出现问题,所以最好是有一个统一的入口,一个节点故障时,流量可以及时转移到其他节点.

2、如果所有的客户端都与node1建议连接,那么node1的网络负载必然会大大增加,而其他节点又由于没有那么多的负载而造成硬件资源的浪费.

        这时,负载均衡显得尤为重要,HAProxy(High Availability Proxy)是一款开源的 高性能TCP/HTTP负载均衡器 和 反向代理,广泛用于分发流量、提升系统可用性和扩展性。

快速上手

Ubuntu安装

#更新软件包
sudo apt-get update#查找haproxy
sudo apt listlgrep haproxy#安装haproxy
sudo apt-get install haproxy

验证安装

#查看服务状态
sudo systemctl status haproxy#查看版本
haproxy -v#如果要设置HAProxy服务开机自启,可以使用
sudo systemctl enable haproxy

 修改haproxy.cfg

vim /etc/haproxy/haproxy.cfg

# haproxy web 管理界面
listen stats    #设置一个监听器,统计HAProxy的统计信息bind *:8100        #指定了监听器绑定到的IP地址和端口mode http          #监听器的工作模式为HTTPstats enable       #启用统计页面stats realm Haproxy\ Statisticsstats uri /stats auth admin:admin    #登录账号密码
# 配置负载均衡
Listen rabbitmgbind *:5670mode tcp              #Rabbitmq使用的AMQP协议是一个基于TCP的协议balance roundrobin    #制定负载均衡策略为轮询server    rabbitmgl 127.0.0.1:5672 check inter 5000 rise 2 fall 3server    rabbitmq2 127.0.0.1:5673 check inter 5000 rise 2 fall 3server    rabbitmg3 127.0.0.1:5674 check inter 5000 rise 2 fall 3

重启HAProxy

sudo systemctl restart haproxy

此时可以通过访问 http://ip:8100/  查看HAProxy

修改配置文件

spring:rabbitmq:addresses: amqp://study:study@ip:5670/Test

此时成功实现了负载均衡,也实现了节点宕机后,流量的及时转移


自信与骄傲有异:信者常沉着,而骄傲者常浮扬。                                                ——梁启超

🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀

以上,就是本期的全部内容啦,若有错误疏忽希望各位大佬及时指出💐

  制作不易,希望能对各位提供微小的帮助,可否留下你免费的赞呢🌸 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/83334.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

51 单片机头文件 reg51.h 和 reg52.h 详解

51 单片机头文件详解 51 单片机的头文件reg51.h和reg52.h是开发中非常重要的文件,它们定义了单片机的特殊功能寄存器 (SFR) 和位地址。以下是对这两个头文件的详细解析: 1. 头文件概述 reg51.h:针对标准 8051 单片机(4KB ROM, 128B RAM) reg52.h:针对增强型 8052 单片…

前端的面试笔记——JavaScript篇(二)

一、instanceof 在 JavaScript 里,instanceof 是一个相当实用的运算符,它的主要功能是检查某个对象是否属于特定构造函数的实例。这里需要明确的是,判断的依据并非对象的类型,而是其原型链。下面为你详细介绍它的用法和特点&…

”一维前缀和“算法原理及模板

前缀和,就是通过一种方法来求出数组中某个连续区间的元素的和的办法。我们通常先预处理出来一个前缀和数组,然后把数组中进行元素填充后再进行后续使用。 我们通过一道模板题或许能更加理解其意思。 现在的问题就是:如果我们用暴力枚举来记录…

5.13/14 linux安装centos及一些操作命令随记

一、环境准备 VMware Workstation版本选择建议 CentOS 7 ISO镜像下载指引 虚拟机硬件配置建议(内存/处理器/磁盘空间) 二、系统基础命令 一、环境准备 1.VMware Workstation版本选择建议 版本选择依据 选择VMware Workstation的版本时&#xff0c…

spring学习->sprintboot

spring IoC(控制翻转): 控制:资源的控制权(资源的创建,获取,销毁等) 反转:和传统方式不一样(用上面new什么),不用new让ioc来发现你用什么,然后我来给什么 DI:(依赖注入) 依赖:组件的依赖关系。如newsController依赖NewsServi…

iOS 阅后即焚功能的实现

iOS阅后即焚功能实现步骤 一、功能设计要点 消息类型支持:文本、图片、视频、音频等。销毁触发条件: 接收方首次打开消息后启动倒计时。消息存活时间可配置(如5秒、1分钟)。 安全要求: 端到端加密(E2EE&a…

OpenHarmony 开源鸿蒙南向开发——linux下使用make交叉编译第三方库——mqtt库

准备工作 请依照这篇文章搭建环境 OpenHarmony 开源鸿蒙南向开发——linux下使用make交叉编译第三方库——环境配置_openharmony交叉编译-CSDN博客 下载 wget ftp://ftp.gnutls.org/gcrypt/gnutls/v3.5/gnutls-3.5.9.tar.xz 解压 tar -xf mkdir ./out cd ./out Cmake命…

武汉SMT贴片工艺优化与生产效能提升路径

内容概要 随着华中地区电子制造产业集群的快速发展,武汉SMT贴片行业面临工艺升级与效能提升的双重挑战。本文聚焦SMT生产全流程中的关键环节,从钢网印刷精度控制、回流焊温度曲线优化、AOI检测系统迭代三大核心工艺出发,结合区域产业链特点提…

线程池(ThreadPoolExecutor)实现原理和源码细节是Java高并发面试和实战开发的重点

一、线程池核心流程图 ----------------- | 提交任务 | submit/execute -----------------|v ----------------- | 判断核心线程数 | < corePoolSize&#xff1f; -----------------|Yes |Nov v [创建新线程] -----------------| 队列是否满&a…

学习海康VisionMaster之直方图工具

一&#xff1a;进一步学习了 今天学习下VisionMaster中的直方图工具&#xff1a;就是统计在ROI范围内进行灰度级分布的统计 二&#xff1a;开始学习 1&#xff1a;什么是直方图工具&#xff1f; 直方图工具针对输入灰度图像的指定ROI区域&#xff0c;输出该区域的图像灰度直方…

计算机网络 : Socket编程

计算机网络 &#xff1a; Socket编程 目录 计算机网络 &#xff1a; Socket编程引言1.UDP网络编程1.1 网络地址与端口转换函数1.2 本地环回1.3 EchoServer1.4 DictServer1.5 DictServer封装版1.6 简单聊天室 2.TCP网络编程2.1 TCP Socket API详解2.2 Echo Server2.3 Echo Serve…

Elasticsearch/OpenSearch 中doc_values的作用

目录 1. 核心作用 2. 适用场景 3. 与 index 参数的对比 4. 典型配置示例 场景 1&#xff1a;仅用于聚合&#xff0c;禁止搜索 场景 2&#xff1a;优化大字段存储 5. 性能调优建议 6. 底层原理 doc_values 是 Elasticsearch/OpenSearch 中用于优化查询和聚合的列式存储结…

使用mermaid 语言绘画时序图和链路图

给大家展示一下效果&#xff0c; 官方地址&#xff1a;https://mermaid.nodejs.cn/ 官方开发地&#xff1a;https://mermaid.nodejs.cn/intro/#google_vignette graph LR%% 样式定义&#xff08;完全保留&#xff09; classDef user fill:#E1F5FE,stroke:#0288D1;classDef …

C++ Kafka客户端(cppkafka)安装与问题解决指南

一、cppkafka简介 cppkafka是一个现代C的Apache Kafka客户端库&#xff0c;它是对librdkafka的高级封装&#xff0c;旨在简化使用librdkafka的过程&#xff0c;同时保持最小的性能开销。 #mermaid-svg-qDUFSYLBf8cKkvdw {font-family:"trebuchet ms",verdana,arial,…

STM32的ADC模块中,**采样时机(Sampling Time)**和**转换时机(Conversion Time),获取数据的时机详解

在STM32的ADC模块中&#xff0c;**采样时机&#xff08;Sampling Time&#xff09;和转换时机&#xff08;Conversion Time&#xff09;**是ADC工作流程中的两个关键阶段&#xff0c;直接影响采样精度和系统实时性。以下是详细解析&#xff1a; 1. 采样时机&#xff08;Samplin…

Pageassist安装(ollama+deepseek-r1)

page-assist网站&#xff1a;https://github.com/n4ze3m/page-assist 首先电脑配置node.js&#xff0c;管理员打开命令窗口输入下面命令下载bun npm install -g buncd 到你想要安装page-assist的地方&#xff08;推荐桌面&#xff09; 输入下列命令 git clone https://gith…

APC 荧光通道专用!Elabscience® CD11b 抗体激发 / 发射光谱精准匹配流式检测

内容概要 Elabscience APC Anti-Mouse/Human CD11b Antibody [M1/70]&#xff08;货号&#xff1a;E-AB-F1081E&#xff09;是一款高特异性荧光标记抗体&#xff0c;适用于流式细胞术&#xff08;FCM&#xff09;&#xff0c;可精准检测小鼠和人类样本中的 CD11b 髓系细胞&…

entity线段材质设置

在cesium中,我们可以改变其entity线段材质,这里以直线为例. 首先我们先创建一条直线 const redLine viewer.entities.add({polyline: {positions: Cesium.Cartesian3.fromDegreesArray([-75,35,-125,35,]),width: 5,material:material, 保存后可看到在地图上创建了一条线段…

大模型数据分析破局之路20250512

大模型数据分析破局之路 本文面向 AI 初学者、数据分析从业者与企业技术负责人&#xff0c;围绕大模型如何为数据分析带来范式转变展开&#xff0c;从传统数据分析困境谈起&#xff0c;延伸到 LLM MCP 的协同突破&#xff0c;最终落脚在企业实践建议。 &#x1f30d; 开篇导语…

【MySQL】索引太多会怎样?

在 MySQL 中&#xff0c;虽然索引可以显著提高查询效率&#xff0c;但过多的索引&#xff08;如超过 5-6 个&#xff09;会带来以下弊端&#xff1a; 1. 存储空间占用增加 每个索引都需要额外的磁盘空间存储索引树&#xff08;BTree&#xff09;。对于大表来说&#xff0c;多个…