简识MQ之Kafka、ActiveMQ、RabbitMQ、RocketMQ传递机制

四种主流消息队列(Kafka、ActiveMQ、RabbitMQ、RocketMQ)的生产者与消费者传递信息的机制说明,以及实际使用中的注意事项和示例:


1. Apache Kafka

传递机制

  • 模型:基于 发布-订阅模型,生产者向 主题(Topic) 发送消息,消费者订阅主题并消费消息。
  • 核心流程
    1. 生产者将消息发送到 Kafka 集群的 Broker,根据 分区策略(如轮询、哈希)将消息写入对应的分区(Partition)。
    2. 消费者通过消费者组(Consumer Group)订阅主题,每个分区的数据会被分配给组内的消费者(通过 Rebalance 机制)。
    3. 消费者从分区中拉取消息(poll 方式)并处理。

示例代码(Kafka 生产者)

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "message"));
producer.close();

示例代码(Kafka 消费者)

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}
}

注意事项

  1. 分区与顺序性

    • Kafka 不保证跨分区的消息顺序,但单个分区内的消息按顺序存储。
    • 示例:发送订单创建事件时,需将同一用户的消息发送到同一分区(通过 key)。
  2. 消费者组与 Rebalance

    • 消费者组内成员变化时(如新增消费者),会触发分区重新分配(Rebalance),可能导致短暂消息不可读。
    • 建议:避免频繁增减消费者实例。
  3. 消息持久化

    • 生产者可通过 acks=all 确保消息写入所有副本后返回成功,但会增加延迟。
    • 适用场景:对消息可靠性要求极高的场景(如金融交易)。

2. Apache ActiveMQ

传递机制

  • 模型:支持 点对点(Queue) 和 发布-订阅(Topic) 模型。
  • 核心流程
    1. 生产者发送消息到队列或主题。
    2. 消息通过 异步/同步 方式传递给消费者(默认异步)。
    3. 可启用 持久化,消息存储到磁盘以防 Broker宕机。

示例代码(ActiveMQ 生产者)

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("my-queue");MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("Hello ActiveMQ!");
producer.send(message);
connection.close();

示例代码(ActiveMQ 消费者)

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("my-queue");MessageConsumer consumer = session.createConsumer(queue);
TextMessage message = (TextMessage) consumer.receive();
System.out.println("Received: " + message.getText());
consumer.acknowledge(); // 手动确认消息
connection.close();

注意事项

  1. 消息持久化

    • 需设置 DeliveryMode.PERSISTENT,否则消息可能丢失。
    • 示例:关键业务消息(如订单支付通知)必须持久化。
  2. 事务支持

    • 生产者和消费者可通过事务确保消息的原子性(发送/接收一致性)。
    • 风险:长事务可能导致性能下降。
  3. 死信队列(DLQ)

    • 配置 deadLetterExchange 和 deadLetterRoutingKey 处理无法消费的消息。
    • 示例:超过重试次数的消息自动进入 DLQ。

3. RabbitMQ

传递机制

  • 模型:灵活的消息路由模型,基于 交换器(Exchange) 和 绑定(Binding)
  • 核心流程
    1. 生产者将消息发送到交换器,并附带路由键(Routing Key)。
    2. 交换器根据类型(如 Direct、Topic、Headers)将消息路由到绑定的队列。
    3. 消费者从队列中拉取消息。

示例代码(RabbitMQ 生产者Producers)

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection()) {Channel channel = connection.createChannel();String exchangeName = "direct-exchange";channel.exchangeDeclare(exchangeName, "direct");String routingKey = "user.login";AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(2) // 持久化.build();channel.basicPublish(exchangeName, routingKey, props, "Login Event".getBytes());
}

示例代码(RabbitMQ 消费者Consumers)

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection()) {Channel channel = connection.createChannel();String queueName = "user_queue";channel.queueDeclare(queueName, true, false, false, null);String exchangeName = "direct-exchange";channel.queueBind(queueName, exchangeName, "user.login");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Received: " + message);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});
}

注意事项

  1. 消息确认机制

    • 消费者需发送 ACK 确认消息处理,避免重复消费。
    • 示例:使用 channel.basicAck() 或 channel.basicNack()
  2. 死信队列配置

    • 在队列声明时配置 x-dead-letter-exchange 和 x-dead-letter-routing-key
    • 示例:处理失败的消息进入专用队列。
  3. 内存限制

    • RabbitMQ 默认限制队列大小为内存中的一定比例,需根据业务调整 vm_memory_high_watermark

4. RocketMQ

传递机制

  • 模型:基于 主题(Topic) 和 队列(Queue) 的分布式模型。
  • 核心流程
    1. 生产者Producers发送消息到主题,主题将消息路由到多个队列(负载均衡)。
    2. 消费者Consumers通过消费者组(Consumer Group)订阅主题,从队列中拉取消息。
    • 顺序消息:同一队列内的消息按顺序消费。
    • 广播消息:消费者组内每个消费者都收到同一条消息(仅限 Topic 模型)。

示例代码(RocketMQ 生产者Producers)

DefaultMQProducer producer = new DefaultMQProducer("my-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();Message msg = new Message("my-topic", "Order-123".getBytes(), "JSON".getBytes());
SendResult sendResult = producer.send(msg);
System.out.println("Send Result: " + sendResult);producer.shutdown();

示例代码(RocketMQ 消费者Consumers)

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("my-topic", "*"); // 订阅所有队列consumer.registerMessageListener(new MessageListener() {@Overridepublic void consume(Message msg, ConsumeContext context) throws Exception {System.out.println("Received: " + new String(msg.getBody()));context.commitMessage(msg); // 提交消费位移}
});
consumer.start();

注意事项

  1. 事务消息

    • 生产者和消费者可通过事务确保消息的最终一致性。
    • 示例:订单创建成功后,发送支付通知(若失败则回滚)。
  2. 消息顺序性

    • 严格顺序场景需指定 MessageQueueSelector,确保同一订单的所有消息进入同一队列。
  3. 消息堆积

    • 消费者处理能力不足时,消息会堆积在队列中,需监控并扩容消费者实例。

总结对比

特性KafkaActiveMQRabbitMQRocketMQ
模型发布-订阅(仅 Topic)支持点对点和发布-订阅灵活路由(多种交换器)主题+队列(顺序/广播)
持久化支持分区副本支持消息持久化和事务支持队列和消息持久化支持消息持久化和事务
顺序性单分区有序不保证(除非事务)可通过队列保证单队列严格有序
适用场景高吞吐、日志/事件流通用、企业级消息系统复杂路由、多协议支持高可靠、顺序消息、分布式事务

通用注意事项

  1. 消息幂等性:防止重复消费(如订单支付场景)。
  2. 监控与告警:关注队列长度、消息堆积、消费者延迟。
  3. 序列化与压缩:选择高效的序列化方式(如 Protobuf)和压缩算法(如 GZIP)。
  4. 连接池管理:避免频繁创建/关闭连接,影响性能。

5、注意MQ的Kafka、ActiveMQ、RabbitMQ、RocketMQ区别;

        URL: 浅识MQ的 Kafka、ActiveMQ、RabbitMQ、RocketMQ区别-CSDN博客

6、注意:持久化策略

        URL:浅聊MQ之Kafka、RabbitMQ、ActiveMQ、RocketMQ持久化策略-CSDN博客

   

(望各位潘安、各位子健/各位彦祖、于晏不吝赐教!多多指正!🙏)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/71736.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Websocket——心跳检测

1. 前言&#xff1a;为什么需要心跳机制&#xff1f; 在现代的实时网络应用中&#xff0c;保持客户端和服务端的连接稳定性是非常重要的。尤其是在长时间的网络连接中&#xff0c;存在一些异常情况&#xff0c;导致服务端无法及时感知到客户端的断开&#xff0c;可能造成不必要…

tailwindcss 前端 css 框架 无需写css 快速构建页面

版本&#xff1a;VUE3 TS 框架 vite 文章中使用tailwindcss 版本&#xff1a; ^3.4.17 简介&#xff1a; Tailwind CSS 一个CSS 框架&#xff0c;提供组件化的样式&#xff0c;直接在HTML 中编写样式&#xff0c;无需额外自定义CSS &#xff0c;快速&#xff01; 简洁&#…

MFC开发:如何创建第一个MFC应用程序

文章目录 一、概述二、MFC 的主要组件三、创建一个MFC窗口四、控件绑定消息函数 一、概述 MFC 是微软提供的一个 C 类库&#xff0c;用于简化 Windows 应用程序的开发。它封装了 Windows API&#xff0c;提供面向对象的接口&#xff0c;帮助开发者更高效地创建图形用户界面&am…

【Git版本控制器】第四弹——分支管理,合并冲突,--no-ff,git stash

&#x1f381;个人主页&#xff1a;我们的五年 &#x1f50d;系列专栏&#xff1a;Linux网络编程 &#x1f337;追光的人&#xff0c;终会万丈光芒 &#x1f389;欢迎大家点赞&#x1f44d;评论&#x1f4dd;收藏⭐文章 ​ 相关笔记&#xff1a; https://blog.csdn.net/djd…

AI助力小微企业技术开发规范化管理 | 杂谈

AI助力小微企业技术开发规范化管理 在小型技术研发企业中&#xff0c;人员配置紧张&#xff0c;往往一名员工需要承担多项职务和任务。例如&#xff0c;后端程序开发人员可能同时要负责需求调研、数据库设计、后端设计及开发&#xff0c;甚至在某些情况下还需兼任架构师的角色。…

SpringBoot+Vue+微信小程序的猫咖小程序平台(程序+论文+讲解+安装+调试+售后)

感兴趣的可以先收藏起来&#xff0c;还有大家在毕设选题&#xff0c;项目以及论文编写等相关问题都可以给我留言咨询&#xff0c;我会一一回复&#xff0c;希望帮助更多的人。 系统介绍 在当下这个高速发展的时代&#xff0c;网络科技正以令人惊叹的速度不断迭代更新。从 5G …

DeepSeek提效实操革命,全场景应用指南 AI提示词万能公式四步法以及对话技巧

欢迎来到涛涛聊AI DeepSeek系列文章 三块显示器如何摆放效率最高&#xff0c;让deepseek给深度思考下 阿里云免费试用 DeepSeek大模型。 限时送 100 万 tokens&#xff0c;快来抢先免费体验&#xff01;AI 助手不再出现系统繁忙阿里云免费试用 DeepSeek大模型。 限时送 100 万 …

智慧教室与无纸化同屏技术方案探讨与实现探究

引言 随着教育信息化的不断发展&#xff0c;智慧教室和无纸化同屏技术逐渐成为提升教学效率和质量的重要手段。大牛直播SDK凭借其强大的音视频处理能力和丰富的功能特性&#xff0c;在智慧教室和无纸化同屏领域积累了众多成功案例。本文将深入探讨基于大牛直播SDK的智慧教室、…

Linux MySQL 8.0.29 忽略表名大小写配置

Linux MySQL 8.0.29 忽略表名大小写配置 问题背景解决方案遇到的问题&#xff1a; 问题背景 突然发现有个大写的表报不存在。 在Windows上&#xff0c;MySQL是默认支持忽略大小写的。 这个时候你要查询一下是不是没有配置&#xff1a; SHOW VARIABLES LIKE lower_case_table…

【蓝桥杯单片机】第十三届省赛第二场

一、真题 二、模块构建 1.编写初始化函数(init.c) void Cls_Peripheral(void); 关闭led led对应的锁存器由Y4C控制关闭蜂鸣器和继电器 2.编写LED函数&#xff08;led.c&#xff09; void Led_Disp(unsigned char ucLed); 将ucLed取反的值赋给P0 开启锁存器 关闭锁存…

【CMake 教程】常用函数与构建案例解析(三)

一、CMake 常用函数简析 1. 条件判断 if() / elseif() / else() 在 CMake 脚本中&#xff0c;条件判断是控制逻辑的重要工具。if() 支持多种比较语句&#xff0c;包括数值、字符串、布尔值和变量存在性等。在条件满足时执行特定逻辑代码&#xff0c;下面是典型语法&#xff1…

ASP.NET Core 8.0学习笔记(二十七)——数据迁移:Migrations深入与其他迁移命令

一、数据库架构的管理 1.EF Core提供两种方式来保持EF Core的模型与数据库保持同步。 (1)以数据库为准&#xff1a;反向工程&#xff08;Db First&#xff09;&#xff0c;适用于中大型工程 (2)以代码为准&#xff1a;数据迁移&#xff08;Code First&#xff09;&#xff0c;…

Python 基本语法的详细解释

目录 &#xff08;1&#xff09;注释 &#xff08;2&#xff09;缩进 &#xff08;3&#xff09;变量和数据类型 变量定义 数据类型 &#xff08;4&#xff09;输入和输出 输出&#xff1a;print() 函数 输入&#xff1a;input() 函数 &#xff08;1&#xff09;注释 注…

20-R 绘图 - 饼图

R 绘图 - 饼图 R 语言提供来大量的库来实现绘图功能。 饼图&#xff0c;或称饼状图&#xff0c;是一个划分为几个扇形的圆形统计图表&#xff0c;用于描述量、频率或百分比之间的相对关系。 R 语言使用 pie() 函数来实现饼图&#xff0c;语法格式如下&#xff1a; pie(x, l…

Ubuntu 22.04 一键部署MinerU1.1.0

MinerU MinerU是一款将PDF转化为机器可读格式的工具&#xff08;如markdown、json&#xff09;&#xff0c;可以很方便地抽取为任意格式。 MinerU诞生于书生-浦语的预训练过程中&#xff0c;我们将会集中精力解决科技文献中的符号转化问题&#xff0c;希望在大模型时代为科技发…

紫光同创开发板使用教程(二):sbit文件下载

sbit文件相当于zynq里面的bit文件&#xff0c;紫光的fpga工程编译完成后会自动生成sbit文件&#xff0c;因工程编译比较简单&#xff0c;这里不在讲解工程编译&#xff0c;所以我这里直接下载sbit文件。 1.工程编译完成后&#xff0c;可以看到Flow列表里面没有报错&#xff0c…

DeepSeek 部署全指南:常见问题解析与最新技术实践

引言 随着开源大模型DeepSeek的爆火&#xff0c;其部署需求激增&#xff0c;但用户在实际操作中常面临服务器压力、本地部署性能瓶颈、API配置复杂等问题。本文结合2025年最新技术动态&#xff0c;系统梳理DeepSeek部署的核心问题与解决方案&#xff0c;并分享行业实践案例&am…

Vue02

Vue02 绑定class样式 字符串写法&#xff0c;适用于&#xff1a;样式的类名不确定&#xff0c;需要动态指定 数组写法&#xff0c;适用于&#xff1a;要绑定的样式个数不确定&#xff0c;名字也不确定 对象写法&#xff0c;适用于&#xff1a;要绑定的样式个数缺点&#xff…

超导量子计算机的最新进展:走向实用化的量子革命

超导量子计算机的最新进展:走向实用化的量子革命 大家好,我是 Echo_Wish,今天我们来聊聊科技圈最炙手可热的话题之一——超导量子计算机。近年来,量子计算领域可谓是风起云涌,而超导量子计算机作为主流路线之一,已经在学术界和工业界取得了不少突破性进展。 那么,超导…

LangChain构建行业知识库实践:从架构设计到生产部署全指南

文章目录 引言:行业知识库的进化挑战一、系统架构设计1.1 核心组件拓扑1.2 模块化设计原则二、关键技术实现2.1 文档预处理流水线2.2 混合检索增强三、领域适配优化3.1 医学知识图谱融合3.2 检索结果重排序算法四、生产环境部署4.1 性能优化方案4.2 安全防护体系五、评估与调优…