消息队列 - 解析 RabbitMQ 的运行机制

在线工具站
  • 推荐一个程序员在线工具站:程序员常用工具(http://cxytools.com),有时间戳、JSON格式化、文本对比、HASH生成、UUID生成等常用工具,效率加倍嘎嘎好用。
程序员资料站
  • 推荐一个程序员编程资料站:程序员的成长之路(http://cxyroad.com),收录了一些列的技术教程、各大面试专题,还有常用开发工具的教程。
小报童专栏精选Top100
  • 推荐一个小报童专栏导航站:小报童精选Top100(http://xbt100.top),收录了生财有术项目精选、AI海外赚钱、纯银的产品分析等专栏,陆续会收录更多的专栏,欢迎体验~

在现代分布式系统中,消息队列扮演着至关重要的角色,它不仅能够解耦系统各个组件,还可以提高系统的伸缩性和容错性。RabbitMQ 作为流行的消息队列中间件,以其稳定性和灵活性广受开发者欢迎。

一、RabbitMQ 简介

RabbitMQ 是一个开源的消息代理软件,它实现了高级消息队列协议(AMQP),并支持多种消息传输协议。RabbitMQ 使用 Erlang 语言编写,具有高并发、高可用的特点。它可以运行在多个操作系统上,并支持多种编程语言的客户端库。

二、RabbitMQ 的核心概念

在深入 RabbitMQ 的运行机制之前,我们需要先了解一些核心概念:

  1. Producer(生产者):消息的发送方,负责将消息发送到交换器(Exchange)。
  2. Consumer(消费者):消息的接收方,负责从队列(Queue)中获取并处理消息。
  3. Exchange(交换器):接收来自生产者的消息,并根据绑定规则将消息路由到一个或多个队列。常见的交换器类型有 direct、fanout、topic 和 headers。
  4. Queue(队列):用于存储消息,消费者可以从队列中获取消息进行处理。
  5. Binding(绑定):定义了交换器和队列之间的关系,确定消息的路由规则。

三、RabbitMQ 的运行机制

RabbitMQ 的运行机制可以分为消息生产、消息路由和消息消费三个阶段。

1. 消息生产

消息生产者负责将消息发送到 RabbitMQ 服务器。生产者通过连接(Connection)和信道(Channel)与 RabbitMQ 通信。连接是物理 TCP 连接,而信道是建立在连接之上的虚拟连接,用于减少连接建立和销毁的开销。

生产者发送消息的过程:
  1. 创建连接和信道

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 连接和信道创建完成
    }
    
  2. 声明交换器和队列

    channel.exchangeDeclare("exchange_name", "direct");
    channel.queueDeclare("queue_name", true, false, false, null);
    channel.queueBind("queue_name", "exchange_name", "routing_key");
    
  3. 发送消息

    String message = "Hello, RabbitMQ!";
    channel.basicPublish("exchange_name", "routing_key", null, message.getBytes());
    

2. 消息路由

消息到达 RabbitMQ 服务器后,首先进入交换器。交换器根据绑定规则和路由键将消息路由到相应的队列。不同类型的交换器有不同的路由策略:

  • Direct Exchange:根据完全匹配的路由键将消息发送到相应的队列。
  • Fanout Exchange:将消息广播到所有绑定到该交换器的队列,不关心路由键。
  • Topic Exchange:根据通配符匹配的路由键将消息发送到相应的队列。
  • Headers Exchange:根据消息头中的属性匹配,将消息路由到相应的队列。

3. 消息消费

消费者通过订阅队列来接收消息。消费者可以是一个服务或应用程序,它们通过连接和信道与 RabbitMQ 服务器通信,从队列中获取消息进行处理。

消费者接收消息的过程:
  1. 创建连接和信道

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 连接和信道创建完成
    }
    
  2. 声明队列

    channel.queueDeclare("queue_name", true, false, false, null);
    
  3. 订阅消息

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Received message: " + message);
    };
    channel.basicConsume("queue_name", true, deliverCallback, consumerTag -> {});
    

四、RabbitMQ 的高级特性

除了基本的消息生产、路由和消费,RabbitMQ 还提供了一些高级特性,帮助开发者构建更复杂和可靠的消息系统。

1. 消息确认(Message Acknowledgment)

消息确认机制确保消息在被消费者成功处理后才从队列中移除。消费者可以手动确认消息,防止消息丢失。

boolean autoAck = false;
channel.basicConsume("queue_name", autoAck, (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");// 处理消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}, consumerTag -> {});

2. 死信队列(Dead Letter Queue)

当消息被拒绝、过期或队列达到最大长度时,可以将消息转发到死信队列进行特殊处理。

Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx_exchange");
channel.queueDeclare("queue_name", true, false, false, args);

3. 消息持久化(Message Durability)

将消息和队列设置为持久化,确保在 RabbitMQ 重启后消息不会丢失。

channel.queueDeclare("queue_name", true, false, false, null);
channel.basicPublish("exchange_name", "routing_key", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

4. 流量控制(Flow Control)

RabbitMQ 提供了流量控制机制,防止生产者发送过多消息导致消费者处理不过来。可以通过信道上的 QoS 设置来限制每次传递的消息数。

channel.basicQos(1);

五、RabbitMQ 的监控和管理

RabbitMQ 提供了丰富的监控和管理工具,帮助开发者维护和优化消息系统。

1. 管理插件(Management Plugin)

RabbitMQ 提供了一个管理插件,可以通过 Web 界面查看队列、交换器、连接、信道等信息,并执行管理操作。

rabbitmq-plugins enable rabbitmq_management

管理界面可以通过浏览器访问:http://localhost:15672,默认用户名和密码均为 guest

2. CLI 工具(Command Line Interface)

RabbitMQ 提供了一组命令行工具,用于管理和监控 RabbitMQ 实例。

# 查看队列状态
rabbitmqctl list_queues
# 查看交换器状态
rabbitmqctl list_exchanges
# 查看连接状态
rabbitmqctl list_connections

3. 监控指标(Monitoring Metrics)

RabbitMQ 提供了丰富的监控指标,可以集成到 Prometheus、Grafana 等监控系统中,实时监控 RabbitMQ 的性能和健康状态。

# 启用 Prometheus 插件
rabbitmq-plugins enable rabbitmq_prometheus

六、总结

RabbitMQ 是一个功能强大、灵活易用的消息队列中间件,它通过生产者、交换器、队列和消费者等核心组件实现了高效的消息传递。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/856345.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

xshell使用vi命令:bash:vim:command not found

你们好&#xff0c;我是金金金。 场景 此时我通过xshell客户端连接到了远程的虚拟机。想用vi命令编辑一个文件时&#xff0c;显示&#xff1a;bash: vim: command not found 排查 看报错提示就可以知道&#xff0c;没找到vim命令 解决 使用包管理器 apt 来安装 vim 更新你的软…

大模型开发Embedding技术介绍

什么是Embedding&#xff1f; 在自然语言处理&#xff08;NLP&#xff09;和机器学习中&#xff0c;Embedding 是一种将高维数据映射到低维连续空间的技术。Embedding 允许我们将词语、句子或其他类型的数据表示成向量&#xff0c;这些向量捕捉了数据的语义和上下文信息。 Em…

数据结构(中)

完全二叉树的第6层有10个结点&#xff0c;那么有&#xff08;21&#xff09;个叶子结点。 10-52*2*2*2 设树中某结点不是根结点&#xff0c;则离它最近的祖先结点是双亲结点 一颗有5个结点的深度为3的二叉树采用顺序存储方式存储&#xff0c;存储数组的大小至少为7 看深度&…

民宿小程序在线预约系统开发,提高品牌影响力

在旅游业发展旺盛的当下&#xff0c;也带动了各地民宿的发展。在科技的支持下&#xff0c;民宿小程序得到了快速发展&#xff0c;凭借方便快捷的优势为大众带来新的体验。 民宿小程序的发展为用户提供了便捷的预订渠道&#xff0c;用户可以根据对房间的要求选择&#xff0c;能…

深(广)度优先遍历

994. 腐烂的橘子 BFS &#xff08;广度优先搜索&#xff09;可以看成是层序遍历。从某个结点出发&#xff0c;BFS 首先遍历到距离为 1 的结点&#xff0c;然后是距离为 2、3、4…… 的结点。因此&#xff0c;BFS 可以用来求最短路径问题。BFS 先搜索到的结点&#xff0c;一定是…

[HGAME 2022 week1]Matryoshka(古典密码混合)

题目&#xff1a; 直接说方法&#xff1a; 首先这是一段盲文&#xff0c;要先将盲文反转&#xff0c; 然后再用摩斯密码转换 将得到的字符串去掉“,”后&#xff0c;base16解码 在尝试维吉尼亚密码 再用base64解码 然后用凯撒密码 最后栅栏密码&#xff08;22栏&#xff09;

第5章 不确定性与风险分析 作业

第5章 不确定性与风险分析 作业 一单选题&#xff08;共25题&#xff0c;100分&#xff09; (单选题)当产销量( )盈亏平衡点时,销售收入()总成本。 A. 大于,大于 B. 等于,小于 C. 小于,大于 D. 大于,小于 正确答案: A:大于,大于; (单选题)已知单位产品售价为P,年固定成本为F,…

Linux-账号和权限管理

目录 一、管理用户账号 1、用户账号类型 2、UID--身份标识 3、UID的分类 ​4、用户账号文件​ 5、chage-修改账号密码 5.1、chage—使用格式&#xff1a; 5.2、chage—使用参数&#xff1a; ​6、添加用户账号与管理 6.1、useradd—添加用户 6.2、passwd—设置/修改…

【HarmonyOS NEXT】har 包的构建生成过程

Har模块文件结构 构建HAR 打包规则 开源HAR除了默认不需要打包的文件&#xff08;build、node_modules、oh_modules、.cxx、.previewer、.hvigor、.gitignore、.ohpmignore&#xff09;和.gitignore/.ohpmignore中配置的文件&#xff0c;cpp工程的CMakeLists.txt&#xff0c;…

3d隐藏模型为什么就不见了?---模大狮模型网

在3D建模和设计过程中&#xff0c;经常会遇到需要隐藏某些模型的情况。然而&#xff0c;有时候隐藏之后再也找不到这些模型了。这种情况可能让人感到困惑和沮丧。本文将探讨3D隐藏模型后“消失”的原因&#xff0c;并提供一些解决方法&#xff0c;帮助您更好地管理和查找隐藏的…

npm报错:request to https://registry.npm.taobao.org failed处理办法

npm报错&#xff1a;request to https://registry.npm.taobao.org failed处理办法 npm报错&#xff1a;request to https://registry.npm.taobao.org failed, reason certificate has expired 看提示是淘宝镜像过期了。找了一下资料&#xff0c;好像是npm 淘宝镜像已经从 regi…

在Apache Flink中,TableAggregateFunction是一种用户自定义的聚合函数,它允许你实现自定义的聚合逻辑

在Apache Flink中&#xff0c;TableAggregateFunction是一种用户自定义的聚合函数&#xff0c;它允许你实现自定义的聚合逻辑。以下是一个Java代码示例&#xff0c;展示了如何实现和使用TableAggregateFunction。 假设我们想要创建一个简单的表聚合函数&#xff0c;用于计算一…

基于深度学习的图像风格迁移

基于深度学习的图像风格迁移 图像风格迁移&#xff08;Image Style Transfer&#xff09;是一种将一幅图像的风格应用到另一幅图像的方法&#xff0c;使目标图像在保持其原有内容的同时呈现出参考图像的风格。深度学习&#xff0c;特别是卷积神经网络&#xff08;CNN&#xff…

Linux-笔记 g++: internal compiler error: Killed (program cc1plus)报错

前言 编译buildroot的时候报错了&#xff0c;通过查阅资料发现问题可能是编译器进程 cc1plus 被系统终止了。这种情况通常发生在编译过程中消耗了大量的系统资源&#xff0c;特别是内存&#xff0c;而系统为了释放资源而终止了该进程&#xff0c;如系统的物理内存&#xff08;R…

循环的结构

一.简介 循环结构&#xff0c;一般常用在while&#xff0c;do…while&#xff0c;for循环三个语法&#xff0c;但我们一般来常用的是for循环&#xff0c;while与do…while我们只需要掌握就可以。 于此同时&#xff0c;我们需要掌握一下循环控制的关键字&#xff0c;开始循环时…

服务端⾼并发分布式结构演进之路

在进行技术学习过程中&#xff0c;由于大部分读者没有经历过一些中大型系统的实际经验&#xff0c;导致无法从全局理解一些概念&#xff0c;所以本文以一个"电子商务"应用为例&#xff0c;介绍从一百个到千万级并发情况下服务端的架构的演进过程&#xff0c;同时列举…

【绝对有用】什么是I/O密集型任务 什么是CPU密集型任务,异步IO 如何提高程序的效率?

I/O密集型任务和CPU密集型任务是计算机科学中两种不同类型的工作负载&#xff0c;它们的性能瓶颈在不同的资源上。理解这两者的区别和如何利用异步I/O提高程序效率对开发高效应用程序非常重要。 I/O密集型任务 I/O密集型任务是指那些主要受限于输入/输出操作&#xff08;例如…

SpringBoot:SpringBoot集成Druid监控慢SQL

一、前言 数据库连接池是一个至关重要的组成部分&#xff0c;一个优秀的数据库连接池可以显著提高应用程序的性能和可伸缩性。常见的连接池&#xff1a;Druid、HikariCP、C3P0、DBCP等等&#xff0c;不过目前大部分都是使用Druid或者SpringBoot默认的HikariCP&#xff01; 本文…

一个完整的Flutter应用

15.2 Flutter APP代码结构 | 《Flutter实战第二版》 我们先来创建一个全新的Flutter工程&#xff0c;命名为"github_client_app" 我们在项目根目录下分别创建imgs和fonts、jsons、l10n文件夹 工程目录如下&#xff1a; 在lib下创建文件夹如下&#xff1a; 在“jso…

服务器上设置pnpm环境变量

首先&#xff0c;确认 pnpm 是否已经安装&#xff1a; ls /www/server/nodejs/v20.10.0/bin/pnpm如果输出包含 pnpm&#xff0c;那么说明 pnpm 已经安装。 如果没有看到 pnpm&#xff0c;你可能需要重新安装它&#xff1a; npm install -g pnpm接下来&#xff0c;确保 PATH …