消息队列如何保证消息可靠性(kafka以及RabbitMQ)

目录

RabbitMQ保证消息可靠性

生产者丢失消息

MQ丢失消息

消费端丢失了数据

Kakfa的消息可靠性

生产者的消息可靠性

Kakfa的消息可靠性

消费者的消息可靠性


RabbitMQ保证消息可靠性

生产者丢失消息

1.事务消息保证

生产者在发送消息之前,开启事务消息随后生产者发送消息,消息发送之后,如果消息没有被MQ接收到的话,生产者会收到异常报错,生产者回滚事务,然后重试消息,如果收到了消息,就能提交事务了

@Autowired
private RabbitTemplate rabbitTemplate;public void sendTransactionalMessage() {ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory();Channel channel = connectionFactory.createConnection().createChannel(false);try {channel.txSelect(); // 开启事务channel.basicPublish("exchange", "routing.key", null, "message".getBytes());channel.txCommit(); // 提交事务} catch (Exception e) {channel.txRollback(); // 出错回滚}
}

2.使用confirm机制

  • 普通confirm机制,就是发送消息之后,等待服务器confirm之后再发送下一个消息
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("消息成功发送到Broker");} else {System.out.println("消息发送失败,原因:" + cause);}
});rabbitTemplate.convertAndSend("exchange", "routing.key", "message");
  • 批量confirm机制,每发送一批消息之后,等待服务器confirm
Channel channel = connection.createChannel(false);
channel.confirmSelect();for (int i = 0; i < 100; i++) {channel.basicPublish("exchange", "routing.key", null, ("msg" + i).getBytes());
}
channel.waitForConfirms(); // 等待所有消息确认
  • 异步confirm机制,服务器confirm一个或者多个消息之后,客户端(生产者)能够通过回调函数来确定消息是否被confirm(推荐)
SortedSet<Long> pendingSet = Collections.synchronizedSortedSet(new TreeSet<>());
channel.confirmSelect();channel.addConfirmListener(new ConfirmListener() {public void handleAck(long tag, boolean multiple) {if (multiple) pendingSet.headSet(tag + 1).clear();else pendingSet.remove(tag);}public void handleNack(long tag, boolean multiple) {System.err.println("未确认消息:" + tag);if (multiple) pendingSet.headSet(tag + 1).clear();else pendingSet.remove(tag);}
});while (true) {long seq = channel.getNextPublishSeqNo();channel.basicPublish("demo.exchange", "demo.key",MessageProperties.PERSISTENT_TEXT_PLAIN, "hello".getBytes());pendingSet.add(seq);
}

MQ丢失消息

防止MQ的丢失数据的话,方法就是开启RabbitMQ的持久化,消息写入之后(也就是到了MQ之后)就直接持久化到磁盘中,即使Rabbimq自己挂了之后,会恢复数据。

设置持久化步骤

  • 创建queue的时候直接设置持久化,此时就能持久化queue的元数据(不是消息)
@Bean
public Queue durableQueue() {return new Queue("myQueue", true); // true 表示持久化
}
  • 发送消息的时候指定消息为deliveryMode设置为2,也就是设置消息为持久化,此时消息可以持久化磁盘上
MessageProperties props = new MessageProperties();
props.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message = new Message("content".getBytes(), props);
rabbitTemplate.send("exchange", "routing.key", message);

极端情况:

消息写到RabbitMQ之后,但是还没有持久化到磁盘之后直接挂了,导致内存中消息丢失。

解决方法:持久化与生产者的confirm机制配合,当且仅当持久化了消息之后,再confirm,避免数据与消息丢失,此时生产者收不到ack,也是可以自己重发

消费端丢失了数据

意思就是消息已经拉取到了信息,还没有处理(注意这是已经告诉MQ我拉取到数据了),结果进程挂了,重启之后继续消费下一条消息,导致中间的这一条没有消费到,此时数据丢失了。

利用ack机制处理

取消RabbiMQ的自动ack,也就是一个api,可以在消费端消费完了消息之后再调用api告诉MQ我们收到并且处理了该消息。如果没有返回ack,RabbitMQ会把该消息分配给其他的consumer处理,消息不会丢失。通过配置处理

spring:rabbitmq:listener:simple:acknowledge-mode: manual

Kakfa的消息可靠性

生产者的消息可靠性

在kafka中,可以在producer(生产段)设置一个参数,也就是ack=all,要求每个数据,必须写入所有的replica(也就是所有该分区的副本),才认为是接收成功。该参数设置的是你的leader接收到消息后,所有的follower都同步到消息后才认为写成功

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all"); // 等待所有副本确认
props.put("retries", Integer.MAX_VALUE); // 无限重试
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");producer.send(record, (metadata, exception) -> {if (exception == null) {System.out.println("发送成功:" + metadata.offset());} else {exception.printStackTrace();}
});
producer.close();

Kakfa的消息可靠性

kafka默认是会将消息持久化到磁盘上的,但是还是有情况会导致丢失数据

kafka某个broker宕机,随后重新选举partition的leader。倘若在该broker中的partition中的leader副本中的消息,还没有被其他broker中的follower同步,此时同步缺失的数据就丢失了,也就是少了一些数据

解决方法:

  • 给 topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。
  • 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系。
  • 在 producer 端设置 acks=all :这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了。
  • 在 producer 端设置 retries=MAX (很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。

按照上面的配置之后,leader的切换就不会导致数据缺失了。

消费者的消息可靠性

唯一可能也是类似于RabbitMQ中的,也就是说你消费到该消息的时候,消费者自动提交offset,让kafka以为你消费好了该消息,但是自己还没处理就宕机后,会导致重启后没有消费该消息。

解决方法:

关闭kafka默认的自动提交offset,通过消费端业务逻辑处理完消息后,再手动提交offset,当然这里就是会导致重复消费了,这里就是幂等性的问题了。比如你刚处理完,还没提交 offset,结果自己挂了,此时肯定会重复消费一次

手动提交api:consumer.commitSync();

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("enable.auto.commit", "false"); // 关闭自动提交
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理消息System.out.println("处理消息:" + record.value());}// 手动提交 offsetconsumer.commitSync();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/905235.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何查看项目是否支持最新 Android 16K Page Size 一文汇总

前几天刚聊过 《Google 开始正式强制 Android 适配 16 K Page Size》 之后&#xff0c;被问到最多的问题是「怎么查看项目是否支持 16K Page Size」 &#xff1f;其实有很多直接的方式&#xff0c;但是最难的是当你的项目有很多依赖时&#xff0c;怎么知道这个「不支持的动态库…

HttpServletResponse的理解

HttpServletResponse 是 Java Servlet API 提供的一个接口 常用方法 方法用途setContentType(String type)设置响应内容类型&#xff08;如 "application/json"、"text/html"&#xff09;setStatus(int sc)设置响应状态码&#xff08;如 200、404&#x…

可灵 AI:开启 AI 视频创作新时代

在当今数字化浪潮中&#xff0c;人工智能&#xff08;AI&#xff09;技术正以前所未有的速度渗透到各个领域&#xff0c;尤其是在内容创作领域&#xff0c;AI 的应用正引发一场革命性的变革。可灵 AI 作为快手团队精心打造的一款前沿 AI 视频生成工具&#xff0c;宛如一颗璀璨的…

用 AltSnap 解锁 Windows 窗口管理的“魔法”

你有没有遇到过这样的场景&#xff1a;电脑屏幕上堆满了窗口&#xff0c;想快速调整它们的大小和位置&#xff0c;却只能拖来拖去&#xff0c;费时又费力&#xff1f;或者你是个多任务狂魔&#xff0c;喜欢一边写代码、一边看文档、一边刷视频&#xff0c;却发现 Windows 自带的…

深度策略梯度算法PPO

一、策略梯度核心思想和原理 从时序差分算法Q学习到深度Q网络&#xff0c;这些算法都侧重于学习和优化价值函数&#xff0c;属于基于价值的强化学习算法&#xff08;Value-based&#xff09;。 1. 基于策略方法的主要思想&#xff08;Policy-based&#xff09; 基于价值类方…

【LaTeX】Word插入LaTeX行间公式如何编号和对齐

在 Word 文档中插入公式&#xff0c;需要用到 LaTeX \LaTeX LATE​X 。但遗憾的是&#xff0c;Word 只支持部分 LaTeX \LaTeX LATE​X 语法&#xff0c;这就导致很多在 Markdown 能正常渲染的公式在 Word 中无法正常显示。 “内嵌”和“显示” 首先介绍一下 Word 的“内嵌”…

互联网大厂Java面试实战:Spring Boot到微服务的技术问答解析

&#x1f4aa;&#x1f3fb; 1. Python基础专栏&#xff0c;基础知识一网打尽&#xff0c;9.9元买不了吃亏&#xff0c;买不了上当。 Python从入门到精通 &#x1f601; 2. 毕业设计专栏&#xff0c;毕业季咱们不慌忙&#xff0c;几百款毕业设计等你选。 ❤️ 3. Python爬虫专栏…

spring boot3.0自定义校验注解:文章状态校验示例

文章目录 Spring Boot 自定义校验注解&#xff1a;状态校验示例一、创建 State 注解步骤&#xff1a;1. 创建自定义注解&#xff1a;2. 实现校验逻辑&#xff1a; 二、 实现自定义校验步骤:1. 在实体类中使用自定义校验注解 State&#xff1a;2. 添加 State 注解&#xff1a; 总…

无侵入式弹窗体验_探索 Chrome 的 Close Watcher API

1. 引言 在网页开发中,弹窗(Popup)是一种常见的交互方式,用于提示用户进行操作、确认信息或展示关键内容。然而,传统的 JavaScript 弹窗方法如 alert()、confirm() 和 prompt() 存在诸多问题,包括阻塞主线程、样式不可定制等。 为了解决这些问题,Chrome 浏览器引入了 …

调出事件查看器界面的4种方法

方法1. 方法2. 方法3. 方法4.

Ubuntu 安装远程桌面连接RDP方式

1. 安装 XFCE4 桌面环境 如果你的 Ubuntu 系统默认使用 GNOME 或其它桌面环境&#xff0c;可以安装轻量级的 XFCE4&#xff1a; sudo apt update sudo apt install xfce4 xfce4-goodies 说明&#xff1a;xfce4-goodies 包含额外的插件和工具&#xff08;如面板插件、终端等&a…

LWIP传输层协议笔记

传输协议简介 文件/图片/视频 都是一堆二进制数据 经过传输层来传输 这两种协议有什么区别呢&#xff1f; 传输层的TCP/UDP三个步骤 TCP使用传输流程 1、三次握手 作用&#xff1a;三次握手就是建立连接的过程 2、传输数据 作用&#xff1a;建立连接完成之后&#xff…

数据分析与逻辑思维:六步解决业务难题;参考书籍《数据分析原理:6步解决业务分析难题 (周文全, 黄怡媛, 马炯雄)》

文章目录 一、懂业务&#xff1a;业务背景与逻辑前提1.1 明确业务目标与问题定义1.2 培养批判性思维与高于业务视角 二、定指标&#xff1a;构建科学的指标体系2.1 指标拆解与维度分析2.2 典型指标体系案例&#xff1a;用户与业务视角 三、选方法&#xff1a;匹配业务需求的分析…

开启WSL的镜像网络模式

开启WSL的镜像网络模式 前提 Windows主机系统版本高于Windows 11 22H2。WLS版本>2.0。 可输入wsl --version查看当前系统wsl版本。 修改设置 图形界面修改 在开始菜单中搜索&#xff1a;wsl settings&#xff0c;结果如下图所示&#xff1a; 点击“打开”&#xff0…

Python爬虫第20节-使用 Selenium 爬取小米商城空调商品

目录 前言 一、 本文目标 二、环境准备 2.1 安装依赖 2.2 配置 ChromeDriver 三、小米商城页面结构分析 3.1 商品列表结构 3.2 分页结构 四、Selenium 自动化爬虫实现 4.1 脚本整体结构 4.2 代码实现 五、关键技术详解 5.1 Selenium 启动与配置 5.2 页面等待与异…

聚类分析的原理、常用算法及其应用

聚类分析的原理、常用算法及其应用 一、聚类分析的基本原理 &#xff08;一&#xff09;什么是聚类分析 聚类分析是一种无监督学习方法&#xff0c;其目标是将数据集中的样本划分为若干个簇&#xff0c;每个簇包含相似的样本。聚类分析的核心思想是通过某种相似性度量&#…

Aware和InitializingBean接口以及@Autowired注解失效分析

Aware 接口用于注入一些与容器相关信息&#xff0c;例如&#xff1a; ​ a. BeanNameAware 注入 Bean 的名字 ​ b. BeanFactoryAware 注入 BeanFactory 容器 ​ c. ApplicationContextAware 注入 ApplicationContext 容器 ​ d. EmbeddedValueResolverAware 注入 解析器&a…

JDK 安装与配置

JDK 全称是 Java SE Development Kit&#xff0c;翻译成中文就是&#xff1a;Java 标准版开发包&#xff0c;是 Sun 公司&#xff08;后被 Oracle 公司收购&#xff09;专门外 Java 开发人员提供的一套用于开发 Java 应用程序的工具包。 JDK 提供了用于编译和运行 Java 应用程序…

防火墙来回路径不一致导致的业务异常

案例拓扑&#xff1a; 拓扑描述&#xff1a; 服务器有2块网卡&#xff0c;内网网卡2.2.2.1/24 网关2.2.254 提供内网用户访问&#xff1b; 外网网卡1.1.1.1/24&#xff0c;外网网关1.1.1.254 80端口映射到公网 这个时候服务器有2条默认路由&#xff0c;分布是0.0.0.0 0.0.0.0 1…

Java面试高频问题(36-37)

三十六、服务网格核心能力与设计模式 服务网格架构分层模型 mermaid graph TB subgraph 数据平面 ASidecar代理 -->拦截流量 BEnvoy B -->协议转换 CHTTP/gRPC B -->策略执行 D熔断/限流 end subgraph 控制平面 E配置中心 -->下发策略 Fistiod F -->证书管理 …