SpringBoot整合RabbitMQ快速实战

目录

引入依赖

配置文件

不同模式下使用springboot收发消息

直连模式

 生产者

 消费者

Fanout模式

生产者

消费者

Topic主题模式

生产者

消费者

Headers模式

生产者

 消费者

补充Quorum队列

生产者

消费者


引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

 注意下版本。不同版本下的配置方式会有变化。


配置文件

       所有的基础运行环境都在application.properties中进行配置。所有配置以spring.rabbitmq开头。通常按照示例进行一些基础的必要配置就可以跑了。

server.port=8080
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/mirror# 单词推送消息数量
spring.rabbitmq.listener.simple.prefetch=1
# 消费者的消费线程数量
spring.rabbitmq.listener.simple.concurrency=5
# 消费者的最大线程数量
spring.rabbitmq.listener.simple.max-concurrency=10
# 手动确认消息
spring.rabbitmq.listener.simple.acknowledge-mode=none

不同模式下使用springboot收发消息

工具类

public class MyConstants {public static final String QUEUE_QUORUM = "quorumQueue";public static final String QUEUE_STREAM = "streamQueue";//direct模式,直接发送到队列public static final String QUEUE_DIRECT = "directqueue";//fanout模式public static final String EXCHANGE_FANOUT = "fanoutExchange";public static final String QUEUE_FANOUT_Q1 = "fanout.q1";public static final String QUEUE_FANOUT_Q2 = "fanout.q2";public static final String QUEUE_FANOUT_Q3 = "fanout.q3";public static final String QUEUE_FANOUT_Q4 = "fanout.q4";//topic模式public static final String EXCHANGE_TOPIC = "topicExchange";public static final String QUEUE_TOPIC1 = "hunan.eco";public static final String QUEUE_TOPIC2 = "hunan.IT";public static final String QUEUE_TOPIC3 = "hebei.eco";public static final String QUEUE_TOPIC4 = "hebei.IT";//header模式public static final String EXCHANGE_HEADER = "headerExchange";public static final String QUEUE_TXTYP1 = "txTyp1";public static final String QUEUE_BUSTYP1 = "busTyp1";public static final String QUEUE_TXBUSTYP1 = "txbusTyp1";}

直连模式

/*** 直连模式只需要声明队列,所有消息都通过队列转发。*/
@Configuration
public class DirectConfig {@Beanpublic Queue directQueue() {return new Queue(MyConstants.QUEUE_DIRECT);}
}
 生产者
	@ApiOperation(value="direct发送接口",notes="直接发送到队列。task模式")@GetMapping(value="/directSend")public Object directSend(String message) throws AmqpException, UnsupportedEncodingException {//设置部分请求参数MessageProperties messageProperties = new MessageProperties();messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);messageProperties.setPriority(2);//设置消息转换器,如jsonrabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());//将对象转换成json再发送。
//		rabbitTemplate.convertandsend("",Object);//发消息rabbitTemplate.send("directqueue",new Message(message.getBytes("UTF-8"),messageProperties));return "message sended : "+message;}
 消费者
	//直连模式的多个消费者,会分到其中一个消费者进行消费。类似task模式//通过注入RabbitContainerFactory对象,来设置一些属性,相当于task里的channel.basicQos@RabbitListener(queues=MyConstants.QUEUE_DIRECT,containerFactory="qos_4")public void directReceive22(Message message, Channel channel, String messageStr) {System.out.println("consumer1 received message : " +messageStr);}@RabbitListener(queues=MyConstants.QUEUE_DIRECT)public void directReceive2(String message) {System.out.println("consumer2 received message : " +message);}
@Configuration
public class RabbitmqConfig {@Bean(name="qos_4")public SimpleRabbitListenerContainerFactory getSimpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setMaxConcurrentConsumers(4);factory.setConnectionFactory(connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//手动确认return factory;}
}

Fanout模式

/*** Fanout模式需要声明exchange,并绑定queue,由exchange负责转发到queue上。*/
@Configuration
public class FanoutConfig {//声明队列@Beanpublic Queue fanoutQ1() {return new Queue(MyConstants.QUEUE_FANOUT_Q1);}@Beanpublic Queue fanoutQ2() {return new Queue(MyConstants.QUEUE_FANOUT_Q2);}@Beanpublic Queue fanoutQ3() {return new Queue(MyConstants.QUEUE_FANOUT_Q3);}@Beanpublic Queue fanoutQ4() {return new Queue(MyConstants.QUEUE_FANOUT_Q4);}//声明exchange@Beanpublic FanoutExchange setFanoutExchange() {return new FanoutExchange(MyConstants.EXCHANGE_FANOUT);}//声明Binding,exchange与queue的绑定关系@Beanpublic Binding bindQ1() {return BindingBuilder.bind(fanoutQ1()).to(setFanoutExchange());}@Beanpublic Binding bindQ2() {return BindingBuilder.bind(fanoutQ2()).to(setFanoutExchange());}@Beanpublic Binding bindQ3() {return BindingBuilder.bind(fanoutQ3()).to(setFanoutExchange());}@Beanpublic Binding bindQ4() {return BindingBuilder.bind(fanoutQ4()).to(setFanoutExchange());}}
生产者
	@ApiOperation(value="fanout发送接口",notes="发送到fanoutExchange。消息将往该exchange下的所有queue转发")@GetMapping(value="/fanoutSend")public Object fanoutSend(String message) throws AmqpException, UnsupportedEncodingException {MessageProperties messageProperties = new MessageProperties();messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);//fanout模式只往exchange里发送消息。分发到exchange下的所有queuerabbitTemplate.send(MyConstants.EXCHANGE_FANOUT, "", new Message(message.getBytes("UTF-8"),messageProperties));Message message2 = MessageBuilder.withBody(message.getBytes()).setMessageId(UUID.randomUUID().toString()).build();rabbitTemplate.send(message2);return "message sended : "+message;}
消费者
	//fanout模式接收还是只指定队列@RabbitListener(queues=MyConstants.QUEUE_FANOUT_Q1)public void fanoutReceiveq1(String message) {System.out.println("fanoutReceive q1 received message : " +message);}@RabbitListener(queues=MyConstants.QUEUE_FANOUT_Q2)public void fanoutReceiveq2(String message) {System.out.println("fanoutReceive q2 received message : " +message);}@RabbitListener(queues=MyConstants.QUEUE_FANOUT_Q3)public void fanoutReceiveq3(String message) {System.out.println("fanoutReceive q3 received message : " +message);}@RabbitListener(queues=MyConstants.QUEUE_FANOUT_Q4)public void fanoutReceiveq4(String message) {System.out.println("fanoutReceive q4 received message : " +message);}

Topic主题模式

@Configuration
public class TopicConfig {//声明队列@Beanpublic Queue topicQ1() {return new Queue(MyConstants.QUEUE_TOPIC1);}@Beanpublic Queue topicQ2() {return new Queue(MyConstants.QUEUE_TOPIC2);}@Beanpublic Queue topicQ3() {return new Queue(MyConstants.QUEUE_TOPIC3);}@Beanpublic Queue topicQ4() {return new Queue(MyConstants.QUEUE_TOPIC4);}//声明exchange@Beanpublic TopicExchange setTopicExchange() {return new TopicExchange(MyConstants.EXCHANGE_TOPIC);}//声明binding,需要声明一个roytingKey@Beanpublic Binding bindTopicHebei1() {return BindingBuilder.bind(topicQ1()).to(setTopicExchange()).with("hunan.*");}@Beanpublic Binding bindTopicHebei2() {return BindingBuilder.bind(topicQ2()).to(setTopicExchange()).with("*.IT");}@Beanpublic Binding bindTopicHebei3() {return BindingBuilder.bind(topicQ3()).to(setTopicExchange()).with("*.eco");}@Beanpublic Binding bindTopicHebei4() {return BindingBuilder.bind(topicQ4()).to(setTopicExchange()).with("hebei.*");}}
生产者
@ApiOperation(value="topic发送接口",notes="发送到topicExchange。exchange转发消息时,会往routingKey匹配的queue发送,*代表一个单词,#代表0个或多个单词。")@ApiImplicitParam(name="routingKey",value="路由关键字")@GetMapping(value="/topicSendHunanIT")public Object topicSend(String routingKey,String message) throws AmqpException, UnsupportedEncodingException {if(null == routingKey) {routingKey="hebei.IT";}MessageProperties messageProperties = new MessageProperties();messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);//fanout模式只往exchange里发送消息。分发到exchange下的所有queuerabbitTemplate.send("topicExchange", routingKey, new Message(message.getBytes("UTF-8"),messageProperties));return "message sended : routingKey >"+routingKey+";message > "+message;}
消费者
	//topic Receiver//注意这个模式会有优先匹配原则。例如发送routingKey=hunan.IT,那匹配到hunan.*(hunan.IT,hunan.eco),之后就不会再去匹配*.IT(hebei.IT)@RabbitListener(queues=MyConstants.QUEUE_TOPIC1)public void topicReceiveq1(String message) {System.out.println("topic hunan.eco received message : " +message);}@RabbitListener(queues=MyConstants.QUEUE_TOPIC2)public void topicReceiveq2(String message) {System.out.println("topic hunan.IT received message : " +message);}@RabbitListener(queues=MyConstants.QUEUE_TOPIC3)public void topicReceiveq3(String message) {System.out.println("topic hebei.eco received message : " +message);}@RabbitListener(queues=MyConstants.QUEUE_TOPIC4)public void topicReceiveq4(String message) {System.out.println("topic hebei.IT received message : " +message);}

Headers模式

@Configuration
public class HeaderConfig {//声明queue@Beanpublic Queue headQueueTxTyp1() {return new Queue(MyConstants.QUEUE_TXTYP1);}@Beanpublic Queue headQueueBusTyp1() {return new Queue(MyConstants.QUEUE_BUSTYP1);}@Beanpublic Queue headQueueTxBusTyp() {return new Queue(MyConstants.QUEUE_TXBUSTYP1);}//声明exchange@Beanpublic HeadersExchange setHeaderExchange() {return new HeadersExchange(MyConstants.EXCHANGE_HEADER);}//声明Binding//绑定header中txtyp=1的队列。header的队列匹配可以用mathces和exisits@Beanpublic Binding bindHeaderTxTyp1() {return BindingBuilder.bind(headQueueTxTyp1()).to(setHeaderExchange()).where("txTyp").matches("1");}//绑定Header中busTyp=1的队列。@Bean public Binding bindHeaderBusTyp1() {return BindingBuilder.bind(headQueueBusTyp1()).to(setHeaderExchange()).where("busTyp").matches("1");}//绑定Header中txtyp=1或者busTyp=1的队列。@Bean public Binding bindHeaderTxBusTyp1() {Map<String,Object> condMap = new HashMap<>();condMap.put("txTyp", "1");condMap.put("busTyp", "1");
//		return BindingBuilder.bind(headQueueTxBusTyp()).to(setHeaderExchange()).whereAny(new String[] {"txTyp","busTyp"}).exist();return BindingBuilder.bind(headQueueTxBusTyp()).to(setHeaderExchange()).whereAny(condMap).match();}
}
生产者
@ApiOperation(value="header发送接口",notes="发送到headerExchange。exchange转发消息时,不再管routingKey,而是根据header条件进行转发。")@GetMapping(value="/headerSend")public Object headerSend(String txTyp,String busTyp,String message) throws AmqpException, UnsupportedEncodingException {if(null == txTyp) {txTyp="0";}if(null == busTyp) {busTyp="0";}MessageProperties messageProperties = new MessageProperties();messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);messageProperties.setHeader("txTyp", txTyp);messageProperties.setHeader("busTyp", busTyp);//fanout模式只往exchange里发送消息。分发到exchange下的所有queuerabbitTemplate.send("headerExchange", "uselessRoutingKey", new Message(message.getBytes("UTF-8"),messageProperties));return "message sended : txTyp >"+txTyp+";busTyp > "+busTyp;}
 消费者
    //header receiver//这个模式不再根据routingKey转发,而是根据header中的匹配条件进行转发@RabbitListener(queues=MyConstants.QUEUE_TXTYP1)public void headerReceiveq1(String message) {System.out.println("header txTyp1 received message : " +message);}@RabbitListener(queues=MyConstants.QUEUE_BUSTYP1)public void headerReceiveq2(String message) {System.out.println("header busTyp1 received message : " +message);}@RabbitListener(queues=MyConstants.QUEUE_TXBUSTYP1)public void headerReceiveq3(String message) {System.out.println("header txbusTyp1 received message : " +message);}

补充Quorum队列

/*** @desc 声明一个Quorum队列*/
@Configuration
public class QuorumConfig {@Beanpublic Queue quorumQueue() {Map<String,Object> params = new HashMap<>();params.put("x-queue-type","quorum");return new Queue(MyConstants.QUEUE_QUORUM,true,false,false,params);}
}
生产者
	@ApiOperation(value="quorum队列发送接口",notes="直接发送到队列。Quorum队列")@GetMapping(value="/directQuorum")public Object directQuorum(String message) throws AmqpException, UnsupportedEncodingException {//设置部分请求参数MessageProperties messageProperties = new MessageProperties();messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);messageProperties.setPriority(2);//设置消息转换器,如jsonrabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());//将对象转换成json再发送。
//		rabbitTemplate.convertandsend("",Object);//发消息rabbitTemplate.send(MyConstants.QUEUE_QUORUM,new Message(message.getBytes("UTF-8"),messageProperties));return "message sended : "+message;}
消费者
	@RabbitListener(queues = MyConstants.QUEUE_QUORUM)public void quorumReceiver(String message){System.out.println("quorumReceiver received message : "+ message);}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/656585.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

什么是Vue Vue入门案例

一、什么是Vue 概念&#xff1a;Vue (读音 /vjuː/&#xff0c;类似于 view) 是一套 构建用户界面 的 渐进式 框架 Vue2官网&#xff1a;Vue.js 1.什么是构建用户界面 基于数据渲染出用户可以看到的界面 2.什么是渐进式 所谓渐进式就是循序渐进&#xff0c;不一定非得把V…

华为radius认证

组网需求 如图1所示&#xff0c;用户同处于huawei域&#xff0c;Router作为目的网络接入服务器。用户需要通过服务器的远端认证才能通过Router访问目的网络。在Router上的远端认证方式如下&#xff1a; Router对接入用户先用RADIUS服务器进行认证&#xff0c;如果认证没有响应…

(M)UNITY三段攻击制作

三段攻击逻辑 基本逻辑&#xff1a; 人物点击攻击按钮进入攻击状态&#xff08;bool isAttack&#xff09; 在攻击状态下&#xff0c; 一旦设置的触发器&#xff08;trigger attack&#xff09;被触发&#xff0c;设置的计数器&#xff08;int combo&#xff09;查看目前攻击…

【美团】无人机-大数据开发工程师

更新时间&#xff1a;2024/01/29 工作地点&#xff1a;北京市 事业群&#xff1a;到家事业群 工作经验&#xff1a;3年 部门介绍 为了更好地提升城市即时配送的效率与体验&#xff0c;美团于2017年启动了无人机配送服务的探索&#xff0c;通过科技创新推动履约工具变革&#x…

基于JAVA+SpringBoot+Vue的前后端分离的美食分享推荐平台2

✌全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取项目下载方式&#x1f345; 一、项目背景介绍&#xff1a; 在当今社会&#xff0…

代码随想录算法训练营day35 || 860.柠檬水找零,406. 根据身高重建队列,452. 用最少数量的箭引爆气球

视频讲解&#xff1a; 贪心算法&#xff0c;看上去复杂&#xff0c;其实逻辑都是固定的&#xff01;LeetCode&#xff1a;860.柠檬水找零_哔哩哔哩_bilibili 贪心算法&#xff0c;不要两边一起贪&#xff0c;会顾此失彼 | LeetCode&#xff1a;406.根据身高重建队列_哔哩哔哩_b…

自然语言处理(NLP)技术使用

自然语言处理&#xff08;NLP&#xff09;技术使用 以下是一些自然语言处理&#xff08;NLP&#xff09;技术的例子&#xff1a;以上只是一些NLP技术的例子&#xff0c;还有许多其他的技术和应用&#xff0c;如文本分类、文本生成、问答系统等。NLP技术的发展正逐渐改变人们与计…

手撕红黑树

目录 性质 插入规则 调整方法 插入在grandfather的左子树 uncle存在为红色&#xff08;变色&#xff09; uncle不存在或存在为黑色&#xff08;旋转变色&#xff09; 插入在grandfather的右子树 uncle存在且为红色&#xff08;变色&#xff09; uncle不存在或者存在为黑…

Whatsapp 相关(七) -网络请求

本篇主要用来完善上篇文章 frida 监测网络请求的. whatsapp相关(五)- frida监测网络请求 1: 脚本 本次的脚本与上次的区别是,之前只能输出请求的地址,本次优化后,可输出请求参数,结果等. 代码如下: Java.perform(function () {var HttpURLConnection Java.use(java.net.H…

Bean 的作用域有哪些?

Spring 中 Bean 的作用域通常有下面几种&#xff1a; singleton : IoC 容器中只有唯一的 bean 实例。Spring 中的 bean 默认都是单例的&#xff0c;是对单例设计模式的应用。prototype : 每次获取都会创建一个新的 bean 实例。也就是说&#xff0c;连续 getBean() 两次&#x…

Vue之初识路由

路由有什么用&#xff1f; 在我们修改地址栏路径时&#xff0c;切换显示匹配的组件 VueRouter的使用(5 2) 5个基础步骤(固定) 1.下载:下载 VueRouter模块到当前工程&#xff0c;版本3.6.5 yarn add vue-router3.6.5 npm i vue-router3.6.52.引入 import VueRouter from v…

如何在DBeaver中重命名数据库

前言 DBeaver是一款强大的开源通用数据库管理和开发工具&#xff0c;支持多种数据库类型。在某些数据库系统中&#xff0c;你可以直接通过DBeaver的图形界面来重命名数据库名称。本文将详细介绍如何在DBeaver中进行数据库重命名操作。 重要提示&#xff1a; 对于不同的数据库…

15EG使用vivado2021.1实现LWIP的网络传输

创建工程模板在hello_world中已经介绍过了&#xff0c;这里直接从配置完zynq ip核开始&#xff0c;由于使用vivado的版本不同&#xff0c;配置ZYNQ时需要用到的tcl文件我会放在工程文件夹下的file文件夹中 配置好IP核后&#xff0c;右键设计模块&#xff0c;点击Generate Outpu…

【通信系统】MIMO阵列信号来向DOA估计实现~含FOCUSS、OMP、贝叶斯学习(SBL)等稀疏重构法和常规、子空间法、空间平滑滤波法

MIMO阵列目标信号来向估计原理与实现~基于常规法、子空间变换法和稀疏恢复法 写在最前前言空间谱估计的历史发展 仿真原理离散时间阵列信号模型波束形成矩阵(完备字典)回波生成空间平滑滤波传统方法CBF~常规波束成型Capon~最小方差无失真响应法ML~最大似然估计法 子空间方法MUS…

05. 交换机的基本配置

文章目录 一. 初识交换机1.1. 交换机的概述1.2. Ethernet_ll格式1.3. MAC分类1.4. 冲突域1.5. 广播域1.6. 交换机的原理1.7. 交换机的3种转发行为 二. 初识ARP2.1. ARP概述2.2. ARP报文格式2.3. ARP的分类2.4. 免费ARP的作用 三. 实验专题3.1. 实验1&#xff1a;交换机的基本原…

十一:常用类

文章目录 01、字符串相关的类1.1、理解String的不可变性1.2、String不同实例化方式的对比1.3、String不同拼接操作的对比1.3.1、String使用陷阱 1.4、String的一道面试题1.5、JVM中涉及字符串的内存结构1.6、String的常用方法11.7、String的常用方法21.8、String的常用方法31.9…

防御保护笔记02

防火墙 防火墙的主要职责在于&#xff1a;控制和防护 ---- 安全策略 --- 防火墙可以根据安全策略来抓取流量 防火墙分类 按物理特性划分 软件防火墙 硬件防火墙 按性能划分 百兆级防火墙 吞吐量&#xff1a;指对网络、设备、端口、虚电路或其他设施&#xff0c;单位时间内成…

.locked.locked1勒索病毒爆发:如何有效保护和恢复您的文件

引言&#xff1a; 随着网络犯罪的不断演变&#xff0c;.locked.locked1勒索病毒成为当前数字世界中的一项威胁。本文将深入介绍.locked.locked1勒索病毒的特征&#xff0c;提供一些建议的数据恢复方法&#xff0c;并分享一些预防措施&#xff0c;以确保用户的数据免受威胁。如…

JS和CSS实现的原生轮播图

JSCSS实现滑动轮播图 使用JS加CSS来实现的幻灯片&#xff0c;主要使用的是CSS的transform属性中的translate来实现&#xff0c;适合与用户交互的轮播图&#xff0c;展现轮播图的数量&#xff0c;用户可自由进行选择。 <!DOCTYPE html> <html lang"en">&…

实际项目演示:Python RegEx在数据处理中的应用!

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com 正则表达式&#xff08;Regular Expressions&#xff0c;简称 RegEx&#xff09;是一种强大的文本匹配和搜索工具&#xff0c;它在数据处理、文本解析和字符串操作中发挥着关键作用。Python 提供了内置的 re 模块…