SpringBoot连接多RabbitMQ源

转自:

SpringBoot连接多RabbitMQ源 - 掘金在实际开发中,很多场景需要异步处理,这时就需要用到RabbitMQ,而且随着场景的增多程序可能需要连接多个RabbitMQ。SpringBoot本身提供了默认的配置可以快速配置连接RabbitMQ,但是只能连接一个RabbitMQ,当需要连接多个RabbitMQ时,默认的配置就…https://juejin.cn/post/6844904039797243917


在实际开发中,很多场景需要异步处理,这时就需要用到RabbitMQ,而且随着场景的增多程序可能需要连接多个RabbitMQ。SpringBoot本身提供了默认的配置可以快速配置连接RabbitMQ,但是只能连接一个RabbitMQ,当需要连接多个RabbitMQ时,默认的配置就不太适用了,需要单独编写每个连接。

在SpringBoot框架中,我们常用的两个类一般是:

  • RabbitTemplate:作为生产、消费消息使用;
  • RabbitAdmin:作为申明、删除交换机和队列,绑定和解绑队列和交换机的绑定关系使用。

所以我们连接多个RabbitMQ就需要重新建立连接、重新实现这两个类。 代码如下:

配置

application.properties配置文件需要配置两个连接:


 

server.port=8080# rabbitmq
v2.spring.rabbitmq.host=host
v2.spring.rabbitmq.port=5672
v2.spring.rabbitmq.username=username
v2.spring.rabbitmq.password=password
v2.spring.rabbitmq.virtual-host=virtual-host
#consume 手动 ack
v2.spring.rabbitmq.listener.simple.acknowledge-mode=manual
#1.当mandatory标志位设置为true时,
#   如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,
#   那么broker会调用basic.return方法将消息返还给生产者;
#2.当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;通俗的讲,
#   mandatory标志告诉broker代理服务器至少将消息route到一个队列中,
#   否则就将消息return给发送者;
v2.spring.rabbitmq.template.mandatory=true
#publisher confirms 发送确认
v2.spring.rabbitmq.publisher-confirms=true
#returns callback :
#   1.未送达exchange
#   2.送达exchange却未送道queue的消息 回调returnCallback.(注意)出现2情况时,publisher-confirms 回调的是true
v2.spring.rabbitmq.publisher-returns=true
v2.spring.rabbitmq.listener.simple.prefetch=5# rabbitmq
v1.spring.rabbitmq.host=host
v1.spring.rabbitmq.port=5672
v1.spring.rabbitmq.username=username
v1.spring.rabbitmq.password=password
v1.spring.rabbitmq.virtual-host=virtual-host
#consume 手动 ack
v1.spring.rabbitmq.listener.simple.acknowledge-mode=manual
#1.当mandatory标志位设置为true时,
#   如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,
#   那么broker会调用basic.return方法将消息返还给生产者;
#2.当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;通俗的讲,
#   mandatory标志告诉broker代理服务器至少将消息route到一个队列中,
#   否则就将消息return给发送者;
v1.spring.rabbitmq.template.mandatory=true
#publisher confirms 发送确认
v1.spring.rabbitmq.publisher-confirms=true
#returns callback :
#   1.未送达exchange
#   2.送达exchange却未送道queue的消息 回调returnCallback.(注意)出现2情况时,publisher-confirms 回调的是true
v1.spring.rabbitmq.publisher-returns=true
v1.spring.rabbitmq.listener.simple.prefetch=5

重写连接工厂

需要注意的是,在多源的情况下,需要在某个连接加上@Primary注解,表示主连接,默认使用这个连接;

package com.example.config.rabbitmq;import com.alibaba.fastjson.JSON;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;/*** Created by shuai on 2019/4/23.*/
@Configuration
public class MultipleRabbitMQConfig {@Bean(name = "v2ConnectionFactory")public CachingConnectionFactory hospSyncConnectionFactory(@Value("${v2.spring.rabbitmq.host}") String host,@Value("${v2.spring.rabbitmq.port}") int port,@Value("${v2.spring.rabbitmq.username}") String username,@Value("${v2.spring.rabbitmq.password}") String password,@Value("${v2.spring.rabbitmq.virtual-host}") String virtualHost,@Value("${v2.spring.rabbitmq.publisher-confirms}") Boolean publisherConfirms,@Value("${v2.spring.rabbitmq.publisher-returns}") Boolean publisherReturns) {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost);connectionFactory.setPublisherConfirms(publisherConfirms);connectionFactory.setPublisherReturns(publisherReturns);return connectionFactory;}@Bean(name = "v2RabbitTemplate")public RabbitTemplate firstRabbitTemplate(@Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,@Value("${v2.spring.rabbitmq.template.mandatory}") Boolean mandatory) {RabbitTemplate v2RabbitTemplate = new RabbitTemplate(connectionFactory);v2RabbitTemplate.setMandatory(mandatory);v2RabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {if (!ack) {
//                    LOGGER.info("{} 发送RabbitMQ消息 ack确认 失败: [{}]", this.name, JSON.toJSONString(object));} else {
//                    LOGGER.info("{} 发送RabbitMQ消息 ack确认 成功: [{}]", this.name, JSON.toJSONString(object));}});v2RabbitTemplate.setReturnCallback((message, code, s, exchange, routingKey) -> {
//                LOGGER.error("{} 发送RabbitMQ消息returnedMessage,出现异常,Exchange不存在或发送至Exchange却没有发送到Queue中,message:[{}], code[{}], s[{}], exchange[{}], routingKey[{}]", new Object[]{this.name, JSON.toJSONString(message), JSON.toJSONString(code), JSON.toJSONString(s), JSON.toJSONString(exchange), JSON.toJSONString(routingKey)});});return v2RabbitTemplate;}@Bean(name = "v2ContainerFactory")public SimpleRabbitListenerContainerFactory hospSyncFactory(@Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,@Value("${v2.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,@Value("${v2.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));factory.setPrefetchCount(prefetch);return factory;}@Bean(name = "v2RabbitAdmin")public RabbitAdmin iqianzhanRabbitAdmin(@Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}// mq主连接@Bean(name = "v1ConnectionFactory")@Primarypublic CachingConnectionFactory publicConnectionFactory(@Value("${v1.spring.rabbitmq.host}") String host,@Value("${v1.spring.rabbitmq.port}") int port,@Value("${v1.spring.rabbitmq.username}") String username,@Value("${v1.spring.rabbitmq.password}") String password,@Value("${v1.spring.rabbitmq.virtual-host}") String virtualHost,@Value("${v1.spring.rabbitmq.publisher-confirms}") Boolean publisherConfirms,@Value("${v1.spring.rabbitmq.publisher-returns}") Boolean publisherReturns) {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost);connectionFactory.setPublisherConfirms(publisherConfirms);connectionFactory.setPublisherReturns(publisherReturns);return connectionFactory;}@Bean(name = "v1RabbitTemplate")@Primarypublic RabbitTemplate publicRabbitTemplate(@Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory,@Value("${v1.spring.rabbitmq.template.mandatory}") Boolean mandatory) {RabbitTemplate v1RabbitTemplate = new RabbitTemplate(connectionFactory);v1RabbitTemplate.setMandatory(mandatory);v1RabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {if (!ack) {
//                    LOGGER.info("{} 发送RabbitMQ消息 ack确认 失败: [{}]", this.name, JSON.toJSONString(object));} else {
//                    LOGGER.info("{} 发送RabbitMQ消息 ack确认 成功: [{}]", this.name, JSON.toJSONString(object));}});v1RabbitTemplate.setReturnCallback((message, code, s, exchange, routingKey) -> {
//                LOGGER.error("{} 发送RabbitMQ消息returnedMessage,出现异常,Exchange不存在或发送至Exchange却没有发送到Queue中,message:[{}], code[{}], s[{}], exchange[{}], routingKey[{}]", new Object[]{this.name, JSON.toJSONString(message), JSON.toJSONString(code), JSON.toJSONString(s), JSON.toJSONString(exchange), JSON.toJSONString(routingKey)});});return v1RabbitTemplate;}@Bean(name = "v1ContainerFactory")@Primarypublic SimpleRabbitListenerContainerFactory insMessageListenerContainer(@Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory,@Value("${v1.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,@Value("${v1.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));factory.setPrefetchCount(prefetch);return factory;}@Bean(name = "v1RabbitAdmin")@Primarypublic RabbitAdmin publicRabbitAdmin(@Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}
}

 

创建Exchange、Queue并绑定

再实现RabbitAdmin后,我们就需要根据RabbitAdmin创建对应的交换机和队列,并建立绑定关系

package com.example.config.rabbitmq;import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;
import javax.annotation.Resource;/*** 创建Queue、Exchange并建立绑定关系* Created by shuai on 2019/5/16.*/
@Configuration
public class MyRabbitMQCreateConfig {@Resource(name = "v2RabbitAdmin")private RabbitAdmin v2RabbitAdmin;@Resource(name = "v1RabbitAdmin")private RabbitAdmin v1RabbitAdmin;@PostConstructpublic void RabbitInit() {v2RabbitAdmin.declareExchange(new TopicExchange("exchange.topic.example.new", true, false));v2RabbitAdmin.declareQueue(new Queue("queue.example.topic.new", true));v2RabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("queue.example.topic.new", true))        //直接创建队列.to(new TopicExchange("exchange.topic.example.new", true, false))    //直接创建交换机 建立关联关系.with("routing.key.example.new"));    //指定路由Key}
}

生产者

为了后续验证每个连接都建立成功,并且都能生产消息,生产者这里分别使用新生成的RabbitTemplate发送一条消息。

package com.example.topic;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
public class TopicProducer {@Resource(name = "v1RabbitTemplate")private RabbitTemplate v1RabbitTemplate;@Resource(name = "v2RabbitTemplate")private RabbitTemplate v2RabbitTemplate;public void sendMessageByTopic() {String content1 = "This is a topic type of the RabbitMQ message example from v1RabbitTemplate";v1RabbitTemplate.convertAndSend("exchange.topic.example.new","routing.key.example.new",content1);String content2 = "This is a topic type of the RabbitMQ message example from v2RabbitTemplate";v2RabbitTemplate.convertAndSend("exchange.topic.example.new","routing.key.example.new",content2);}
}

消费者

这里需要注意在配置消费队列时,需要标识ContainerFactory

package com.example.topic;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RabbitListener(queues = "queue.example.topic.new", containerFactory = "v2ContainerFactory")
public class TopicConsumer {@RabbitHandlerpublic void consumer(String message) {System.out.println(message);}
}

 这样就完成了SpringBoot连接多个RabbitMQ源的示例了,再写一段测试代码验证下。

测试验证

package com.example.test;import com.example.topic.TopicProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQMultipleTest {@Autowiredprivate TopicProducer topicProducer;@Testpublic void topicProducerTest() {topicProducer.sendMessageByTopic();}
}

 

执行测试代码,验证结果为:

 

验证SpringBoot连接多RabbitMQ源成功!

github地址:Spring Boot 教程、技术栈、示例代码 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/329668.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

满足其中一个条件则可_农村分户好处多,但并非人人都可分户!满足这4个条件才可以申请...

分户,一般指子女成年或者成家后从父母户口里面独立出去,自立一户;也可以是指夫妻离婚后一方将户口独立出去(离婚也可以不分户,变更婚姻状态就行)。简单的说分户是指原本在一个户口本上的人口,现在分出去自成一个户口本…

Synchronized的实现原理(一)

转载自 Synchronized的实现原理(一)synchronized,是Java中用于解决并发情况下数据同步访问的一个很重要的关键字。当我们想要保证一个共享资源在同一时间只会被一个线程访问到时,我们可以在代码中使用synchronized关键字对类或者对…

FOSS历史回顾:三代开源人的故事

现在是2016年,你环顾一下四周,开源早已无处不在了。开源无论是规范、形式、以及面貌都和最初的大相径庭,然而事实上,这也预示着新一代的开源程序员们的崛起。下面我们尝试解释下。 (以下这一段落为作者自谦&#xff09…

Spring中@Autowired、@Qualifier、@Resource的区别

转自: Spring中Autowired、Qualifier、Resource的区别_老周聊架构的博客-CSDN博客_qualifier和resource区别1、AutowiredAutowired 可以单独使用。如果单独使用,它将按类型装配。因此,如果在容器中声明了多个相同类型的bean,则会…

map分组后取前10个_海关数据 | 图解前10个月外贸

*内容转载自微信公众号:海关发布RECOMMEND【 推荐阅读 】海关数据 | 图解前三季度我国外贸海关数据 | 图解8月外贸海关数据 | 一图看懂前7个月外贸海关数据 | 图解上半年度外贸增3.9%声明本微信订阅号不以商业营利为目的,不排除部分文字内容或图片转载自…

回顾build 2016:你好,这是微软迄今最好的Windows开发平台

按:本文作者陈计节,ThoughtWorks 高级咨询师。多年的跨平台 .NET 开发者,全栈工程师,技术布道师。擅长互联网应用程序的设计、开发和运维等工作。 在最近的开发者大会(Build 2016)上,微软面向开…

深入理解多线程(二)—— Java的对象模型

转载自 深入理解多线程(二)—— Java的对象模型上一篇文章中简单介绍过synchronized关键字的方式,其中,同步代码块使用monitorenter和monitorexit两个指令实现,同步方法使用ACC_SYNCHRONIZED标记符实现。后面几篇文章会…

8.1-CPU结构(学习笔记)

【README】 本文总结自bilibili《计算机组成原理(哈工大刘宏伟)》的视频讲解,非常棒,墙裂推荐; 【1】CPU结构 Cpu的首要功能就是解释指令;功能列表如下: 1) 取指令:从内存中读取…

生物信息 python 书籍_用python做生物信息数据分析(1-环境准备)

写在前面四五年前,接触生物信息的时候,阴差阳错,我选择用perl。事实上,直到嫌我,我还是认为我当初的选择,完全正确!。在做一些小文本的快速处理上,perl在我看来,从来最优…

8.2-指令周期(学习笔记)

【README】 本文总结自bilibili《计算机组成原理(哈工大刘宏伟)》的视频讲解,非常棒,墙裂推荐; 【1】指令周期 【1.1】指令周期概述 1)指令周期:取出并执行一条指令所需的全部时间&#xff1…

深入理解多线程(三)—— Java的对象头

转载自 深入理解多线程(三)—— Java的对象头上一篇文章中我们从HotSpot的源码入手,介绍了Java的对象模型。这一篇文章在上一篇文章的基础上再来介绍一下Java的对象头。主要介绍一下对象头的作用,结构以及他和锁的关系。 Java对象…

python tkinter 背景色改变不了_python - Tkinter背景颜色问题 - 堆栈内存溢出

我有一个脚本,其中包含Tkinter模块,我想每隔3分钟更改一次背景颜色,例如绿色3分钟,然后橙色,然后红色。 我有显示绿色的代码,但无法更改它。当我在代码中创建函数时,会遇到一些不同的错误&#…

回顾微软近年来对于Linux和开源的策略

2014年十月,在旧金山举办的一场活动中,微软的CEO Satya Nadella向公众表示,微软“爱Linux”。作为昔日的竞争对手,微软对Linux的态度逐渐从敌对转变为合作。自那次发言以来,微软在开源方面频频重拳出击,似乎…

深入理解多线程(四)—— Moniter的实现原理

转载自 深入理解多线程(四)—— Moniter的实现原理本文是《深入理解多线程系列文章》的第四篇。点击查看原文,阅读该系列所有文章。 在深入理解多线程(一)——Synchronized的实现原理中介绍过关于Synchronize的实现原理…

(转)Spring Boot通过ImportBeanDefinitionRegistrar动态注入Bean

转自: Spring Boot通过ImportBeanDefinitionRegistrar动态注入Bean - 掘金在阅读SpringBoot源码时,看到SpringBoot中大量使用ImportBeanDefinitionRegistrar来实现Bean的动态注入。它是Spring中一个强大的扩展接口。本篇文章来https://juejin.cn/post/6…

通过图书编号查询python_Python图书接口调用代码实例

1.[代码][Python]代码#!/usr/bin/python# -*- coding: utf-8 -*-import json, urllibfrom urllib import urlencode#----------------------------------# 图书电商数据调用示例代码 - 聚合数据# 在线接口文档:http://www.juhe.cn/docs/50#-------------…

深入理解多线程(五)—— Java虚拟机的锁优化技术

转载自 深入理解多线程(五)—— Java虚拟机的锁优化技术本文是《深入理解多线程》的第五篇文章,前面几篇文章中我们从synchronized的实现原理开始,一直介绍到了Monitor的实现原理。这一篇在前几篇的基础上,深入介绍一下…

Visual Studio Code 1.0正式发布

Visual Studio Code 是一个运行于 OS X,Windows 和 Linux 之上的,针对于编写现代 web 和云应用的跨平台编辑器。 这标志着 Microsoft 第一次向开发者们提供了一款真正的跨平台编辑器。虽然完整版的 Visual Studio 仍然是只能运行在 Windows 之上&#xf…

springboot使用ImportBeanDefinitionRegistrar 动态注册bean

【README】 1.采用 ImportBeanDefinitionRegistrar 动态注册bean,应用场景有: 如 一个后端服务需要用到多个 rabbitmq集群客户端,kafka客户端;这时就需要手动注册多个同类型的bean,但不同beanName,并用 …

python 线性回归函数_Python实现的简单线性回归算法实例分析

本文实例讲述了Python实现的简单线性回归算法。分享给大家供大家参考,具体如下:用python实现R的线性模型(lm)中一元线性回归的简单方法,使用R的women示例数据,R的运行结果:> summary(fit)Call:lm(formula weight ~…