RabbitMQ消息队列快速入门

RabbitMQ消息队列快速入门

初始MQ

MQ全称为Message Queue,即消息队列,是在消息的传输过程中保存消息的容器。它是典型的生产者-消费者模型
生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。消息的生产和消费都是异步的,可以解耦发送者和接收者之间的通信,提高系统的可扩展性和可靠性

技术选型

目比较常见的MQ实现有:

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka
RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

RabbitMQ

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:
Messaging that just works — RabbitMQ

安装

基于Docker来安装RabbitMQ

docker pull rabbitmq

运行

docker run \-e RABBITMQ_DEFAULT_USER=daybreak \-e RABBITMQ_DEFAULT_PASS=123456 \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq

在安装命令中有两个映射的端口:

  • 15672:RabbitMQ提供的管理控制台的端口
  • 5672:RabbitMQ的消息发送处理接口

运行成功后,访问http://ip:15672,输入username和password即可进入管理控制台。

RabbitMQ架构

  • publisher:生产者,也就是发送消息的一方
  • consumer:消费者,也就是消费消息的一方
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

Spring AMQP

由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。
但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:Spring AMQP

SpringAmqp的官方地址:Spring AMQP

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

交换机

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。
  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列。
  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符。
  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

声明队列和交换机

基于Bean方式声明
package com.itheima.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfiguration {/*** 声明交换机* @return*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("daybreak.fanout");}/*** 声明队列* @return*/@Beanpublic Queue fanoutQueue(){return new Queue("fanout.queue");}/*** 绑定队列和交换机* @param fanoutQueue3* @param fanoutExchange* @return*/@Beanpublic Binding FanoutBinding3(Queue fanoutQueue, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);}
}
基于注解声明

声明Direct模式的交换机和队列:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue", durable = "true"),exchange = @Exchange(name = "daybreak.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue(String msg){System.out.println("消费者收到了direct.queue的消息:" + msg);
}

声明Topic模式的交换机和队列:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue"),exchange = @Exchange(name = "daybreak.topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue(String msg){System.out.println("消费者接收到topic.queue的消息:【" + msg + "】");
}

快速入门

导入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

添加配置

在application.yml中添加配置:

spring:rabbitmq:host: 192.168.200.130 # 你的虚拟机IPport: 5672 # 端口virtual-host: / # 虚拟主机username: daybreak # 用户名password: 123456 # 密码

配置JSON转换器

Spring的消息发送代码接收的消息体是一个Object:

在数据传输时,它会把发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
然而默认情况下Spring采用的序列化方式是JDK序列化。

JDK序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差

显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

使用JSON方式序列化需要引入以下依赖:

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>

注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

配置消息转换器,在服务的启动类中添加一个Bean即可:

@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}

接收端

package com.itheima.consumer.listeners;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class MyListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "daybreak.queue", durable = "true"),exchange = @Exchange(name = "daybreak.direct", type = ExchangeTypes.DIRECT),key = "demo"))public void listenDirectQueue(String msg){System.out.println("消费者收到了direct.queue的消息:" + msg);}
}

发送端

package com.itheima.publisher;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class MyPublisher {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void myTest(){String exchangeName = "daybreak.direct";String routingKey = "demo";String msg = "Hello,RabbitMQ";rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);}
}

启动发送端和接收端后,运行结果如下:

消费者收到了direct.queue的消息:Hello,RabbitMQ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/156153.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java实现围棋算法

围棋是一种源自中国的棋类游戏&#xff0c;也是世界上最古老、最复杂的棋类游戏之一。该游戏由黑白两方交替放置棋子在棋盘上进行&#xff0c;目的是将自己的棋子占据更多的空间&#xff0c;并将对手的棋子围死或吃掉&#xff0c;最终获得胜利。围棋不仅是一种游戏&#xff0c;…

前端面试题总结

前端面试题总结 1.[从输入url到页面加载完成中间发生了什么](https://zhuanlan.zhihu.com/p/519407969) 1.从输入url到页面加载完成中间发生了什么

【Node.js】大前端技能最通俗易懂的讲解 快速入门必看

目录 1、概述前端工具VSCode安装 2、NodeJS的安装 3、NodeJS了解和快速入门 4、NodeJS实现HttpServer服务 5、NodeJS实现操作MySQL数据库 Node.js是一个基于Chrome V8引擎的JavaScript运行环境&#xff0c;它允许开发者在服务器端执行Node.js是一个基于Chrome V8引擎的Ja…

01-了解微服务架构的演变过程和微服务技术栈

微服务 微服务架构演变 单体架构:将业务的所有功能集中在一个项目中开发最后打成一个包部署 优点: 架构简单, 部署成本低,适合小型项目缺点: 耦合度高, 升级维护困难 分布式架构:根据业务功能对系统做拆分,每个业务功能模块作为独立项目开发称为一个服务 优点: 降低服务耦合…

CSS中常用的伪类选择器

一 、伪类&#xff08;不存在的类&#xff0c;特殊的类&#xff09; -伪类用来描述一个元素的特殊状态 比如&#xff1a;第一个元素&#xff0c;被点击的元素&#xff0c;鼠标移入的元素 -特点&#xff1a;一般请情况下&#xff0c;使用&#xff1a;开头 1、 :first-child …

金融企业为啥不选择云服务器还是考虑服务器托管

尽管云主机在近年来的发展中取得了巨大的成功&#xff0c;但在金融行业中&#xff0c;一些客户仍然倾向于将服务器托管到数据中心&#xff0c;而不是使用云主机。以下是一些金融客户选择将服务器托管到数据中心的原因&#xff1a; 数据安全性&#xff1a;金融行业对数据的安全性…

Arduino驱动Si7021温湿度传感器(温湿度传感器)

目录 1、传感器特性 2、控制器和传感器连线图 3、驱动程序 Si7021温湿度传感器,应用了专用的数字模块采集技术和温湿度传感技术,具有极高的可靠性与卓越的长期稳定性。同时其体积小巧、精度高,特别是拥有毫秒级测试转换时间(DHT系列需要约2s的转换时间),启动测量与读…

抖音预约服务小程序开发:前端与后端技术的完美融合

开发抖音预约服务小程序成为了一种有趣而又实用的尝试。本篇文章&#xff0c;小编会与大家共同探讨抖音预约服务小程序开发的前端与后端技术融合的关键要点。 一、前端技术选择与设计 1.小程序框架 开发抖音预约服务小程序的前端&#xff0c;首先需要选择一个适用的小程序框…

Ubuntu设设置默认外放和麦克风设备

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、pulseaudio 是什么&#xff1f;二、配置外放1.查看所有的外放设备2.设定默认的外放设备3.设定外放设备的声音强度4.设定外放设备静音 三、配置麦克风1.查看…

C#winform门诊医生系统+sqlserver

C#winform门诊医生系统sqlserver说明文档 运行前附加数据库.mdf&#xff08;或sql生成数据库&#xff09; 主要技术&#xff1a;基于C#winform架构和sql server数据库 功能模块&#xff1a; 个人中心&#xff1a;修改个人信息、打开照片并进行修改 预约挂号&#xff1a;二级…

轻松驾驭Linux命令:账户查看、目录文件操作详解

&#x1f3a5; 屿小夏 &#xff1a; 个人主页 &#x1f525;个人专栏 &#xff1a; Linux系统操作 &#x1f304; 莫道桑榆晚&#xff0c;为霞尚满天&#xff01; 文章目录 &#x1f4d1;引言&#x1f324;️查看账户☁️whoami☁️who &#x1f324;️ls和目录文件的创建删除☁…

Vue2系列 — 修饰符.sync

.sync 修饰符以前存在于 Vue1.0 版本里&#xff0c;后来在2.0版本中移除了 .sync 修饰符。 但是在 2.0 发布之后的实际应用中&#xff0c;发现 .sync 还是有其适用之处&#xff0c;比如在开发可复用的组件库时。我们需要做的只是让子组件改变父组件状态的代码更容易被区分。 从…

【Spring Boot】如何运用Spring Cache并设置缓存失效时间

简单描述 Spring Cache是一个框架&#xff0c;实现了基于注解的缓存功能&#xff0c;只需要简单地加一个注解&#xff0c;就能实现缓存功能。Spring Cache提供了一层抽象&#xff0c;底层可以切换不同的cache实现。具体就是通过CacheManager接口来统一不同的缓存技术。CacheMan…

JavaScript的学习之BOM和DOM,就这一篇就OK了!(超详细)

目录 Day28 JavaScript(2) 1、BOM对象 1.1 window对象 1.2 Location ( 地址栏)对象 1.3 本地存储对象 2、DOM对象(JS核心) 2.1 查找标签 2.2 绑定事件 2.3 操作标签 2.4 常用事件 Day28 JavaScript(2) 1、BOM对象 BOM:Broswer object model,即浏览器提供我们开发者…

算法学习 day27

第二十七天 美化数组的最少删除数 2216. 美化数组的最少删除数 - 力扣&#xff08;LeetCode&#xff09; class Solution { public:int minDeletion(vector<int>& nums) {int len nums.size();if(len 0) return 0;int res 0,cur 0;for(int i 1;i < len;i)…

企业数字化转型所需的数据在哪里找?企业数据运营有什么用?

现阶段&#xff0c;越来越多企业考虑数字化转型。特别是中小型企业&#xff0c;他们察觉到&#xff1a;数字化转型的关键在于数据的运营。只有通过数据的有效管理和不断挖掘&#xff0c;企业才可以更好地了解市场需求&#xff0c;优化业务流程&#xff0c;提高决策效率&#xf…

【设计模式】聊聊职责链模式

原理和实现 模板模式变化的是其中一个步骤&#xff0c;而责任链模式变化的是整个流程。 将请求的发送和接收解耦合&#xff0c;让多个接收对象有机会可以处理这个请求&#xff0c;形成一个链条。不同的处理器负责自己不同的职责。 定义接口 public interface Filter {/*** …

[AutoSar]在Davinci developer中mapping Com interface port

目录 关键词平台说明一、实现步骤1.1 新建一个需要接入Com interface 的SWC1.2 Data mapping1.3 选择SWC和信号为分开的还是group1.4 添加前缀后缀1.5 在SWC中使用 关键词 嵌入式、C语言、autosar 平台说明 项目ValueOSautosar OSautosar厂商vector芯片厂商TI编程语言C&…

杭电oj 2053 Switch游戏 C语言

本题用到了异或“^”的性质&#xff1a;0与任何数异或都为该数本身&#xff0c;相同的两个数异或为0&#xff1b; #include <stdio.h>void main() {int n, i, out;while (~scanf_s("%d", &n)){out 0;for (i 1; i < n; i){if (n % i 0)out ^ 1;}pri…