RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(SpringBoot整合)

文章目录

  • 前言
  • 一、WorkQueues模型
    • 消息发送
    • 消息接收
    • 能者多劳
  • 二、交换机类型
    • 1.Fanout交换机
      • 消息发送
      • 消息接收
    • 2.Direct交换机
      • 消息接收
      • 消息发送
    • 3.Topic交换机
      • 消息发送
      • 消息接收
  • 三、编程式声明队列和交换机
    • fanout示例
    • direct示例
    • 基于注解
  • 四、消息转换器
  • 总结


前言

WorkQueues模型、Fanout交换机、Direct交换机、Topic交换机、基于SpringBoot注解声明队列和交换机、消息转换器。


一、WorkQueues模型

  • Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
  • 当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。

在这里插入图片描述
在控制台创建一个work.queue的队列:
在这里插入图片描述

消息发送

循环发送,模拟大量消息堆积现象。

/*** workQueue* 向队列中不停发送消息,模拟消息堆积。*/
@Test
public void testWorkQueue() throws InterruptedException {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, message_";for (int i = 0; i < 50; i++) {// 发送消息,每20毫秒发送一次,相当于每秒发送50条消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}

消息接收

模拟多个消费者绑定同一个队列,我们添加2个方法,并且设置不同睡眠时间模拟不同性能读取

@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);
}@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);
}

在这里插入图片描述

  • 消费者1很快完成了自己的25条消息
  • 消费者2却在缓慢的处理自己的25条消息。

也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。

能者多劳

在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

再次测试,发现结果如下:
在这里插入图片描述

可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升。
正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量

二、交换机类型

1.Fanout交换机

发送流程:
在这里插入图片描述

  • 可以有多个队列
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机
  • 交换机把消息发送给绑定过的所有队列
  • 订阅队列的消费者都能拿到消息

在控制台创建fanout.queue1、fanoutqueue2队列和dragon.fanout交换机,并将队列绑定到交换机:
在这里插入图片描述

在这里插入图片描述

消息发送

@Test
public void testFanoutExchange() {// 交换机名称String exchangeName = "dragon.fanout";// 消息String message = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, "", message);
}

消息接收

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

交换机的作用是什么?

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange的会将消息路由到每个绑定的队列

2.Direct交换机

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在这里插入图片描述
在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

在控制台声明两个队列direct.queue1和direct.queue2,然后声明一个direct类型的交换机,绑定队列和交换机(使用red和blue作为key,绑定direct.queue1到dragon.direct;使用red和yellow作为key,绑定direct.queue2到dragon.direct):
在这里插入图片描述

在这里插入图片描述

消息接收

@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

消息发送

@Test
public void testSendDirectExchange() {// 交换机名称String exchangeName = "dragon.direct";// 消息String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

绑定红色的都会接收到信息:
在这里插入图片描述

描述下Direct交换机与Fanout交换机的差异?

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

3.Topic交换机

  • Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。
    只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!
  • BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert
  • 通配符规则:
    • #:匹配一个或多个词
    • *:匹配不多不少恰好1个词
    • 举例:
      dragon.#:能够匹配dragon.stu.insert 或者 dragon.stu
      dragon.*:只能匹配dragon.stu

在这里插入图片描述
假如此时publisher发送的消息使用的RoutingKey共有四种:

  • china.news 代表有中国的新闻消息;
  • china.weather 代表中国的天气消息;
  • japan.news 则代表日本新闻
  • japan.weather 代表日本的天气消息;

解释:

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:
    • china.news
    • china.weather
  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:
    • china.news
    • japan.news

在这里插入图片描述

消息发送

/*** topicExchange*/
@Test
public void testSendTopicExchange() {// 交换机名称String exchangeName = "dragon.topic";// 消息String message = "喜报!孙悟空大战哥斯拉,胜!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

消息接收

@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

描述下Direct交换机与Topic交换机的差异?

  • Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割
  • Topic交换机与队列绑定时的bindingKey可以指定通配符
  • #:代表0个或多个词
  • *:代表1个词

三、编程式声明队列和交换机

SpringAMQP提供了一个Queue类,用来创建队列:

fanout示例

@Configuration
public class FanoutConfiguration {/*** 声明交换机* @return Fanout类型交换机*/@Beanpublic FanoutExchange fanoutExchange(){
//    还可以是  return  ExchangeBuilder.fanoutExchange("dragon.fanout").build();return new FanoutExchange("dragon.fanout");}/*** 第1个队列*/@Beanpublic Queue fanoutQueue1(){
//   还可以是  return  QueueBuilder.durable("fanout.queue1").build();return new Queue("fanout.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
//绑定队列和交换机的另一方法
//    @Bean
//    public Binding bindingQueue2(){
//        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
//    }
}

direct示例

@Configuration
public class DirectConfiguration {/*** 声明交换机* @return Direct类型交换机*/@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange("dragon.direct").build();}/*** 第1个队列*/@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}/*** 第2个队列*/@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}
}

基于注解

上面的代码还是很多的,基于注解的方式也能够代替上面的繁杂配置,下面演示direct和topic交换机,队列的代码

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "dragon.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "dragon.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),key = "china.#"
))
public void listenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

四、消息转换器

随便创建一个队列,然后发送Map对象,你会发现消息格式很不友好

@Test
public void testSendMap() throws InterruptedException {// 准备消息Map<String,Object> msg = new HashMap<>();msg.put("name", "柳岩");msg.put("age", 21);// 发送消息rabbitTemplate.convertAndSend("object.queue", msg);
}

控制台查看:
在这里插入图片描述

  • Spring的消息发送代码接收的消息体是一个Object,而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
  • 只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
    • 数据体积过大
      有安全漏洞
      可读性差
  • 根据上面测试,显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

j解决:
引入依赖:

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>

注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。其转换器配置。
添加配置:

@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}

总结

以上就是所有讲解。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/163692.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C plus plus

环境配置 vscodewindows vscode c 环境配置(终极版)_vscode配置c/c环境_BangBang的博客-CSDN博客VsCode安装和配置C环境详细全流程_vscode安装c-CSDN博客MinGW、MinGW-w64 与TDM-GCC 应该如何选择&#xff1f; - 知乎、VsCode安装和配置C环境详细全流程_vscode安装c-CSDN博客 …

​LeetCode解法汇总5-正则表达式匹配​

目录链接&#xff1a; 力扣编程题-解法汇总_分享记录-CSDN博客 GitHub同步刷题项目&#xff1a; https://github.com/September26/java-algorithms 原题链接&#xff1a;力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 描述&#xff1a; 「HTML 实…

Educoder中Hive综合应用案例——用户学历查询

第1关:查询每一个用户从出生到现在的总天数 ---------- 禁止修改 ----------drop database if exists mydb cascade; ---------- 禁止修改 -------------------- begin ---------- ---创建mydb数据库 create database mydb;---使用mydb数据库 use mydb;---创建表user create …

电脑找不到xinput1_3.dll怎么修复,快速处理dll问题的5个方法分享

在使用电脑的过程中&#xff0c;我们常常会遇到一些常见的问题&#xff0c;其中之一就是“电脑缺少xinput1_3.dll”。这个问题可能会影响到我们对电脑的使用体验&#xff0c;甚至导致某些软件无法正常运行。在我遇到这个问题并解决之后&#xff0c;我深刻地体会到了解决问题的重…

迅镭激光板材切割自动化生产线中标高端机械装备龙头豪迈集团!

近年来&#xff0c;中国制造业逐步由低端制造业向高端制造业迈进、由劳动密集型向技术密集型转变&#xff0c;智能制造带动了制造业生产环节的自动化、信息化、数字化、智能化的迭代升级。 位于山东省的高端机械装备龙头——豪迈集团&#xff0c;紧跟国家发展战略&#xff0c;加…

【Spring集成MyBatis】MyBatis的Dao层实现(基于配置,非注解开发)

文章目录 1. MyBatis的dao层实现(传统方式)——需要写接口及其实现类2. MyBatis的代理开发方式——仅需写接口 1. MyBatis的dao层实现(传统方式)——需要写接口及其实现类 传统方式就是在项目下边建立dao包&#xff0c;里面包含接口及其实现类&#xff0c;文件结构如下&#x…

交直流一体化电源系统测试步骤详解

交直流一体化电源拥有高度适应性&#xff0c;可以用于不同的电力需求领域。但是为了确保其质量和性能&#xff0c;需要对交直流一体化电源进行各项测试以保证正常工作。本文纳米软件将介绍交直流一体化电源的测试方法&#xff0c;以及如何用交直流一体化电源测试系统进行测试。…

Java,数据结构与集合源码,关于Map接口的实现类(HashMap、LinkedHashMap)

HashMap中的元素的特点&#xff1a; HashMap中的所有key之间是不可重复的、无序的。所有的key构成一个Set集合。 HashMap中的所有的value彼此之间是可重复的、无序的。所有的value构成一个Collection集合。 HashMap中的一对key-value&#xff0c;就构成了一个entry。Map中的ent…

python常用第三方模块---openyxl

openyxl模块&#xff1a;用于处理excel文件的木块&#xff0c;可以Excel中的数据进行写入和读取 函数或类的名称功能描述load_workbook(filename)打开已经存在的工作簿&#xff0c;结果为工作簿对象 workbook.sheetnames 工作簿对象的sheetnames属性&#xff0c;用户获取所有工…

深入理解 pytest Fixture 方法及其应用!

当涉及到编写自动化测试时&#xff0c;测试框架和工具的选择对于测试用例的设计和执行非常重要。在Python 中&#xff0c;pytest是一种广泛使用的测试框架&#xff0c;它提供了丰富的功能和灵活的扩展性。其中一个很有用的功 能是fixture方法&#xff0c;它允许我们初始化测试环…

《实现领域驱动设计》笔记——上下文映射图

一个项目的上下文映射图可以用方式来表示。比较容易的一种是画一个简单的框图表示两个或多个限界上下文之间的映射关系。该框图表示了不同的限界上下文在解决方案空间中是如何通过集成相互关联的。另一种更详细的方式是通过限界上下文集成的源代码实现来表示。 上下文映射图为什…

微软WHQL认证

windows驱动开发要摆脱在测试模式下的开发&#xff0c;需要通过WHQL认证。 1&#xff1a;申请EV代码签名证书。EV代码签名证书在后续注册Windows硬件开发中心帐户&#xff0c;以及提交WHQL认证前为驱动程序进行数字签名等流程中都需要用到&#xff0c;所以申请EV代码签名证书是…

唯一索引和普通索引的使用上要注意什么

考虑下面一种情况&#xff1a; select name from CUser where id_card xxxxxxxyyyyyyzzzzz;你可能会将id_card作为主键了&#xff0c;但最好别这么做。你想想这么长一串的字符串做主键&#xff0c;查询时候效率其实是比较低的&#xff0c;其实是建议选择其他的作为主键。 那么…

BUUCTF [SWPU2019]我有一只马里奥 1

BUUCTF:https://buuoj.cn/challenges 题目描述&#xff1a; 得到的 flag 请包上 flag{} 提交。 密文&#xff1a; 下载附件&#xff0c;得到一个.exe文件。 解题思路&#xff1a; 1、双击.exe文件运行&#xff0c;得到一个1.txt文本。打开&#xff0c;如下图。 2、提示我们…

Mysql中正则表达式Regexp常见用法

Mysql中正则表达式Regexp常见用法_regexp不包含-CSDN博客

List转string 逗号分隔

List转string 逗号分隔 1、将list转化为逗号分割的字符串 String str String.join(",", list); String str StringUtils.json(list.toArray(), ",");   2、将逗号分隔的字符串转换为List List<String> list Arrays.asList(str.split("…

当老师应该选文科还是理科

教育不断发展和改革&#xff0c;教师职业的选择也越来越受到关注。许多人在选择专业时都会考虑成为一名教师&#xff0c;但对于选择文科还是理科却感到困惑。本文将探讨当老师应该选文科还是理科。 文科注重的是人文素养和社会科学方面的知识&#xff0c;而理科则注重自然科学和…

如何做一个简单的深度集成学习框架

使用同一个框架&#xff0c;独立在一个数据集上面&#xff0c;分别训练多次&#xff0c;每个单独模型训练超参数可以一样&#xff0c;也可以不一样&#xff0c;最后若干个训练好的独立模型在测试集数据上面做最后集中决策。 实例代码如下&#xff1a; class MyEnsemble(nn.Modu…

问题 上位机程序重启

/ 1、上位机程序重启&#xff0c; 读线程被杀死&#xff0c;mcu usb busy&#xff0c;数据在fifo发不出去 下次线程重启后&#xff0c;在fifo里的数据首先被发送出去。 //

红旗Asianux Server Linux V8 安装万里数据库(GreatSQL)

红旗Asianux Server Linux V8 安装万里数据库&#xff08;GreatSQL&#xff09; 红旗Asianux介绍&#xff1a; 红旗Asianux Server Linux 8.0是为云时代重新设计的操作系统&#xff0c;为云时代的到来引入了大量新功能&#xff0c;包括用于配置管理、快速迁移框架、编程语言和…