RabbitMQ 的介绍与使用

一. 简介

1> 什么是MQ

消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。
其主要用途:不同进程Process/线程Thread之间通信。

那么为什么会产生消息队列呢?有几个原因:

  • 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;

  • 不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;

MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ。本文主要介绍RabbitMq。

2> 什么是RabbitMQ

RabbitMQ 是一个消息代理:它接受和转发消息。您可以将其视为邮局:当您将要寄的邮件放入邮箱时,您可以确信信使最终会将邮件发送给您的收件人。在本例中,RabbitMQ 是邮箱、邮局和信使。

RabbitMQ 与邮局的主要区别在于它不处理纸张,而是接受、存储和转发二进制数据块——消息

RabbitMQ 和一般意义上的消息传递使用了一些术语。

  • 生产_仅仅意味着发送。发送消息的程序称为_生产者

  • _队列_是 RabbitMQ 中邮箱的名称。虽然消息会流经 RabbitMQ 和您的应用程序,但它们只能存储在_队列_中。_队列_仅受主机内存和磁盘限制的约束,它本质上是一个大型消息缓冲区。许多_生产者_可以发送消息到一个队列,并且许多_消费者_可以尝试从一个_队列_中接收数据。这就是我们表示队列的方式

其是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。
AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ 是一个开源的 AMQP 实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

3> 相关概念

通常我们谈到队列服务,会有三个概念:发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上,多做了一层抽象,在发消息者和队列之间,加入了交换器 (Exchange)。这样发消息者和队列就没有直接联系,转而变成发消息者把消息给交换器,交换器根据调度策略再把消息给队列。那么,其中比较重要的概念有4个,分别为:虚拟主机,交换机,队列,和绑定。

  • 虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单, RabbitMQ 当中,用户只能在虚拟主机的 粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ 服务器 都有一个默认的虚拟主机“/”。
  • 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的 消息。这里有一个比较重要的概念:路由键。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据
    该路由键。
  • 绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。

二. 实现

Spring Boot 集成 RabbitMQ

Spring Boot 集成 RabbitMQ 非常简单,如果只是简单的使用配置非常少,Spring Boot 提供了spring-boot-starter-amqp 项目对消息各种支持。

1. 简单使用
1>配置 pom 包,主要是添加 spring-boot-starter-amqp 的支持
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2>配置文件application.yml

配置 RabbitMQ 的安装地址、端口以及账户信息

# 配置文件 spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false username: root password: 123456 # RabbitMQ配置 rabbitmq: host: 192.168.146.1 port: 5672 username: admin password: 123456

我这里还配置了数据库

3>队列配置
package com.nianxi.mybatisplus.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Bean public Queue Queue() { return new Queue("hello"); } }
4>发送者

rabbitTemplate 是 Spring Boot 提供的默认实现

package com.nianxi.mybatisplus.mapper; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; @Component public class HelloSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "hello " + new Date(); System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("hello", context); } }
5>接收者
package com.nianxi.mybatisplus.mapper; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "hello") public class HelloReceiver { @RabbitHandler public void process(String hello) { System.out.println("Receiver : " + hello); } }
6> 测试
package com.nianxi.mybatisplus; import com.nianxi.mybatisplus.mapper.HelloSender; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest public class RabbitMqHelloTest { @Autowired private HelloSender helloSender; @Test public void hello() throws Exception { helloSender.send(); } }

注意:发送者和接收者的 queue name 必须一致,不然不能接收

2.RabbitTemplate

**RabbitTemplate**是SpringAMQP提供的一个高级消息操作模板,**用于在与RabbitMQ进行交互时进行消息的发送和接收操作。**它是对底层AMQP协议的封装,简化了与RabbitMQ的交互过程, 是SpringAMQP中的核心类,提供声明式方式处理RabbitMQ,包括发送和接收消息、消息转换、属性设置及回调机制。通过配置和正确使用,简化了RabbitMQ的集成与操作。

1> 发送消息

**RabbitTemplate**提供了多种发送消息的方法,包括同步发送和异步发送。通过指定交换机、路由键和消息体,我们可以将消息发送到 RabbitMQ 服务器上的指定位置。此外,RabbitTemplate还支持消息的确认机制,以确保消息被成功发送和接收。

rabbitTemplate.convertAndSend("exchangeName", "routingKey", message);
2> 接收消息

除了发送消息外,**RabbitTemplate**还提供了接收消息的功能。通过调用相关方法,我们可以从指定的队列中接收消息,并进行相应的处理。这通常涉及到监听队列、处理消息和确认消息接收等步骤。

Message receivedMessage = rabbitTemplate.receive("queueName"); MyMessage myMessage = rabbitTemplate.receiveAndConvert("queueName", MyMessage.class);
3> 消息转换

**RabbitTemplate支持消息的自动转换。这意味着我们可以将 Java 对象作为消息体发送,而RabbitTemplate会自动将其转换为可序列化的格式(如 JSON 或 XML)。同样地,当从队列中接收消息时,RabbitTemplate**也可以自动将消息体转换回 Java 对象。

Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter(); rabbitTemplate.setMessageConverter(messageConverter);
4> 消息属性设置

在发送消息时,我们可以设置各种消息属性,如消息的优先级、持久化标志、过期时间等。这些属性可以通过**MessageProperties对象进行设置,并在发送消息时传递给RabbitTemplate**。

import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class MessageSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String exchange, String routingKey, String message, int priority, boolean persistent, int ttl) { // 创建MessageProperties MessageProperties properties = new MessageProperties(); // 设置优先级,值范围0-9,其中0为最低优先级,9为最高优先级 properties.setPriority(priority); // 设置消息持久化 properties.setDeliveryMode(persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT); // 设置消息的过期时间,单位为毫秒 properties.setExpiration(String.valueOf(ttl)); // 使用MessageBuilder构建Message对象 Message msg = MessageBuilder.withBody(message.getBytes()) .setContentEncoding("UTF-8") .setContentType("text/plain") .setMessageId(UUID.randomUUID().toString()) // 可选,设置消息ID .setTimestamp(new Date()) // 可选,设置时间戳 .setHeaders(Collections.singletonMap("x-custom-header", "value")) // 可选,设置自定义头 .andProperties(properties) .build(); // 发送消息 rabbitTemplate.convertAndSend(exchange, routingKey, msg); } }
5> 回调机制

**RabbitTemplate**支持发送消息时的回调机制。这意味着在发送消息后,我们可以注册一个回调函数来处理发送结果或接收响应。这对于需要异步处理发送结果或接收响应的场景非常有用。

**setConfirmCallback方法是RabbitTemplate**类中的一个回调方法,用于处理消息的确认(acknowledgment)结果。当消息成功发送到RabbitMQ的交换机时,会触发确认回调,你可以在该回调中处理相应的逻辑。

  • correlationData:关联数据,可以是任意类型的对象,通常用于唯一标识消息。

  • ack:布尔值,表示消息是否成功发送到交换机。true表示成功,false表示失败。

  • cause:失败的原因,当ackfalse时,此参数会提供一个可选的异常信息。

    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (ack) {
    // 消息发送成功
    System.out.println(“Message sent successfully”);
    } else {
    // 消息发送失败,进行处理
    System.out.println("Message sent failed: " + cause);
    }
    });

6> 异步消息处理

RabbitTemplate支持异步消息处理,你可以注册ConfirmCallbackReturnCallback来处理消息的确认和返回结果。ConfirmCallback用于确认消息是否成功发送到交换机,ReturnCallback用于处理无法路由到队列的消息。

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { // 消息发送成功 } else { // 消息发送失败,进行处理 } }); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 处理无法路由到队列的消息 });
3.使用 RabbitTemplate 的步骤
1> 配置 RabbitTemplate

在使用**RabbitTemplate**之前,我们需要对其进行配置。这通常涉及到设置连接工厂、交换机、队列和绑定等。这些配置可以通过 XML 配置或 Java 配置完成。

2> 创建 RabbitTemplate 实例

一旦配置完成,我们可以创建一个**RabbitTemplate**实例。这个实例将使用我们提供的配置来与 RabbitMQ 服务器进行交互。

3> 发送消息

使用**RabbitTemplate**的发送方法,我们可以将消息发送到指定的交换机和路由键。我们可以指定消息体、消息属性和其他发送选项。

4> 接收消息

要接收消息,我们可以使用**RabbitTemplate**的接收方法或结合监听器来监听指定的队列。当消息到达时,我们可以处理消息并执行相应的业务逻辑。

5> 处理异常和错误

在使用**RabbitTemplate**时,我们还需要考虑异常和错误处理。例如,当发送消息失败或接收消息时发生异常时,我们需要有相应的处理机制来确保系统的稳定性和可靠性。

4.RabbitTemplate 的优势与注意事项
优势
  1. 简化操作RabbitTemplate封装了底层细节,使得开发者能够专注于业务逻辑的实现,而无需关心底层的消息传输细节。
  2. 灵活性RabbitTemplate提供了丰富的配置选项和扩展点,使得开发者能够根据实际需求进行定制和优化。
  3. 性能优化RabbitTemplate内部进行了性能优化,如连接池管理、消息缓存等,以提高消息传输的效率和可靠性。
注意事项
  1. 配置正确性:确保RabbitTemplate的配置正确无误,包括连接工厂、交换机、队列和绑定等的设置。错误的配置可能导致消息无法正确发送或接收。
  2. 异常处理:在使用RabbitTemplate时,要充分考虑异常处理机制,确保在发生异常时能够及时发现并处理。
  3. 资源释放:在使用完RabbitTemplate后,要确保释放相关资源,如关闭连接、释放连接池中的连接等,以避免资源泄漏和性能问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1141642.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RabbitMQ HAProxy 负载均衡

文章目录 前言当Java中指定的端口号绑定的rabbitmq服务挂掉了之后&#xff0c;我们的程序是否还能够成功访问到rabbitmq服务呢什么是 HAProxy 负载均衡HAProxy 安装修改HAProxy配置文件使用HAProxy结论 前言 前面我们学习了 rabbitmq 搭建集群&#xff0c;并且为了解决集群中…

RISC架构下实时操作系统移植:项目应用

RISC架构下实时操作系统移植&#xff1a;从原理到实战的深度实践在工业自动化、智能驾驶和边缘计算飞速发展的今天&#xff0c;嵌入式系统早已不再是“跑个循环”的简单设备。越来越多的应用要求毫秒级响应、任务间精确协同、资源高效调度——这些正是实时操作系统&#xff08;…

STM32在Proteus 8 Professional中的仿真可行性深度剖析

STM32能在Proteus里“跑起来”吗&#xff1f;——一次不绕弯的仿真实战复盘最近带学生做课程设计&#xff0c;又碰上了那个老问题&#xff1a;“老师&#xff0c;我还没拿到开发板&#xff0c;能不能先用Proteus仿真一下STM32的代码&#xff1f;”这问题听着简单&#xff0c;但…

从零开始:使用Hadoop处理物联网数据的完整指南

从零开始&#xff1a;使用Hadoop处理物联网数据的完整指南关键词&#xff1a;Hadoop、物联网数据、数据处理、分布式计算、大数据摘要&#xff1a;本文旨在为读者提供一份从零基础开始&#xff0c;使用Hadoop处理物联网数据的完整指南。首先介绍了物联网数据处理的背景和使用Ha…

CAPL实现远程诊断请求自动响应:实战案例

用CAPL打造“会说话”的虚拟ECU&#xff1a;远程诊断自动响应实战全解析你有没有遇到过这样的场景&#xff1f;新项目刚启动&#xff0c;硬件还没影儿&#xff0c;测试团队却急着要验证诊断协议&#xff1b;或者产线检测卡在某个负响应逻辑上&#xff0c;真实ECU死活不肯配合复…

Betaflight在F4飞控板上的配置优化:全面讲解

Betaflight在F4飞控板上的配置优化&#xff1a;从底层机制到飞行手感的全面调校 你有没有过这样的体验&#xff1f;——刚组装好一台穿越机&#xff0c;装上高端电机、轻量化机架、碳纤螺旋桨&#xff0c;结果一飞起来却“软绵无力”&#xff0c;转弯拖泥带水&#xff0c;油门…

永磁同步电机无差拍预测控制加延时补偿:探索高效电机控制之路

永磁同步电机无差拍预测控制加延时补偿在电机控制领域&#xff0c;永磁同步电机&#xff08;PMSM&#xff09;凭借其高功率密度、高效率等优点&#xff0c;广泛应用于工业、交通等众多领域。而如何实现对PMSM的精准控制&#xff0c;一直是研究的热点。今天咱们就来聊聊永磁同步…

[内网流媒体] 零信任理念在内网工具中的落地

零信任的核心 零信任强调“永不信任,始终验证”。即便在内网,也假设网络不可信、设备不可信、用户可能被劫持。对实时画面工具,零信任的落地关乎访问控制、最小权限和持续验证。 落地原则 身份优先 所有访问都需身份验证(口令/Token/单点登录),不提供匿名入口。 最小权…

Kafka Connect详解:大数据ETL的得力助手

Kafka Connect详解&#xff1a;大数据ETL的得力助手 关键词&#xff1a;Kafka Connect、ETL、数据管道、连接器、分布式系统、数据集成、大数据 摘要&#xff1a;本文将深入探讨Kafka Connect的核心概念和工作原理&#xff0c;这个专为Apache Kafka设计的可扩展、可靠的数据集成…

vh6501测试busoff:硬件工程师实战案例解析

vh6501测试Bus-Off&#xff1a;硬件工程师的实战指南从一个真实问题说起某新能源车型在路试中偶发“整车通信中断”故障&#xff0c;仪表黑屏、动力降级。售后排查未发现硬件损坏&#xff0c;日志显示BMS模块突然停止发送报文&#xff0c;但其他节点并未崩溃。最终定位到&#…

模拟电子技术驱动的振荡器设计:从零实现教程

从零构建一个正弦波振荡器&#xff1a;模拟电路的艺术与实战 你有没有试过&#xff0c;只用几个电阻、电容和一块运放&#xff0c;让电路“自己”发出稳定的正弦波&#xff1f;没有单片机、没有代码、也没有复杂的数字逻辑——一切全靠模拟反馈的精妙平衡。这正是 文氏桥振荡器…

Keil下载与串口烧录模式对比图解说明

Keil下载与串口烧录&#xff1a;从开发到量产的程序写入全解析 在嵌入式系统的世界里&#xff0c;代码写得再漂亮&#xff0c;最终也得“刷进去”才算真正落地。而如何把编译好的固件可靠、高效地写入MCU Flash&#xff0c;是每个工程师都绕不开的问题。 面对琳琅满目的工具和…

手把手解析74194四位移位寄存器引脚定义

从零搞懂74194&#xff1a;一块芯片如何让数据“左右横跳”&#xff1f;你有没有想过&#xff0c;那些会流动的LED灯、键盘扫描电路&#xff0c;甚至老式收音机的频道指示条&#xff0c;是怎么实现“一个亮完下一个亮”的&#xff1f;背后藏着一种看似不起眼却极为关键的数字器…

[内网流媒体] 从审计视角看内网服务设计

审计关注什么 谁在什么时候访问了什么资源; 是否有未经授权的访问; 是否符合公司安全/合规要求; 发生问题时能否追溯责任与影响范围。 关键设计点 访问日志 记录时间、IP、路径/流标识、状态码、鉴权结果、User-Agent。 按天滚动,统一时间格式,便于分析与留存。 身份与权…

七段数码管显示数字:基于STM32的硬件连接说明

从点亮一个“8”开始&#xff1a;深入理解STM32驱动七段数码管的底层逻辑 你有没有试过&#xff0c;第一次用单片机点亮一个数字时的那种兴奋&#xff1f; 不是OLED上绚丽的图形&#xff0c;也不是串口打印出的一行数据——而是当你按下复位键&#xff0c;那几个红红的“ 8 …

openmv与stm32通信入门必看:手把手教程(从零实现)

OpenMV与STM32通信实战指南&#xff1a;从零搭建视觉控制系统当你的小车开始“看见”世界想象这样一个场景&#xff1a;你面前的小车不需要遥控&#xff0c;自己就能锁定红色球并追着跑&#xff1b;仓库里的机械臂看到二维码就知道该往哪搬货&#xff1b;机器人通过手势识别理解…

操作指定目录下的文件,对特定参数赋值,接口函数

操作指定目录下的文件,对特定参数赋值,接口函数 操作 /usrdata/root/params.ini文件 并对某些参数赋值 这里为 record_stream参数赋值 #include <stdio.h> #include <string.h> #include <stdlib.h> #include <unistd.h>#define PARAM_FILE "…

MATLAB仿真bp神经网络预测电力负荷 商品形式:程序 实现功能:使用前几日负荷数据预测未来...

MATLAB仿真bp神经网络预测电力负荷 商品形式&#xff1a;程序 实现功能&#xff1a;使用前几日负荷数据预测未来负荷数据 使用bp神经网络 得到误差分析图电力负荷预测这活儿挺有意思的&#xff0c;咱们今天用MATLAB整点实际的。先说说思路&#xff1a;拿前7天的负荷数据当输入…

[内网流媒体] 能长期使用的内网工具具备哪些特征

长期可用性的核心要素 稳定性与可恢复 崩溃自动重启;采集/编码异常可回退;健康检查可观测。 可配置与可调优 分辨率/帧率/质量/端口/鉴权均可配置,且有安全上限。 安全与合规 默认有口令/网段限制/日志;支持审计与合规要求。 可维护与可升级 配置管理、版本化;兼容性考虑,…

Keil5开发环境搭建:手把手教程(从零配置)

Keil5开发环境搭建&#xff1a;从零开始的实战指南你有没有过这样的经历&#xff1f;买了一块崭新的STM32开发板&#xff0c;兴致勃勃地打开电脑准备“点灯”&#xff0c;结果卡在第一步——Keil打不开、编译报错一堆、下载程序失败……最后只能对着闪烁的ST-Link指示灯发呆。别…