做团购网站需要什么威海网站建设威海
news/
2025/10/5 14:05:13/
文章来源:
做团购网站需要什么,威海网站建设威海,做设计常用的素材网站,如何免费制作网站SpringBoot教程(十五) | SpringBoot集成RabbitMq
RabbitMq是我们在开发过程中经常会使用的一种消息队列。今天我们来研究研究rabbitMq的使用。
rabbitMq的官网#xff1a; rabbitmq.com/
rabbitMq的安装这里先略过#xff0c;因为我尝试了几次都失败了#xff0c;后面等我…SpringBoot教程(十五) | SpringBoot集成RabbitMq
RabbitMq是我们在开发过程中经常会使用的一种消息队列。今天我们来研究研究rabbitMq的使用。
rabbitMq的官网 rabbitmq.com/
rabbitMq的安装这里先略过因为我尝试了几次都失败了后面等我安装成功了会把详细的文章发出来。目前是使用公司的环境进行的调试。
1. 一些概念
RabbitMQ是一个开源的消息代理和队列服务器用来实现各个应用服务间的数据共享跨平台 跨语言。RabbitMQ是使用erlang语言编写的并且基于AMQP协议实现。
所有的消息队列产品模型抽象上来说都是类似的过程。生产者创建消息然后发布到消息队列中由消费者进行消费。
而rabbitMQ也是类似的有生产者消费者角色。其内部结构如下图所示。 那么接下来我们就来介绍一下RabbitMQ中的这些概念。
1. Message:
消息就是我们需要传递和共享的信息消息由一些列的可选属性组成包括路由键优先级是否持久化等信息
2. Publisher
消息的生产者也是一个向交换机发布消息的客户端应用程序。
3. Exchange:
交换机这是RabbitMQ中的一个非常重要的概念在rabbitMq中生产者产生的消息都不是直接发送到队列中去的而是发送到了交换机中交换机会通过一定的规则绑定队列交换机会根据相应的路由规则发送给对服务器中的队列。
4. Binding:
绑定 用于交换机和消息列队之间的关联。一个绑定就是基于路由键routing-key将交换机和消息队列连接起来的路由规则。所以可以将交换机理解成一个有绑定有成的路由表。
5. Queue:
消息队列用来保存消息直到发送给消费者。它是消息的容器也是消息的终点。一个消息可以投入一个或多个队列中。消息一直在对队列里边等待消费者连接到这个队列将其消费。
6. Connection:
网络连接比如一个TCP连接。
7. Channel
信道多路复用连接中的一条独立的双向数据流通道。信道是简历在真实的TCP连接内的虚拟连接。AMQP命令都是通过信道发出去的不管是发布消息、订阅队列还是接收消息这些动作都是通过信道完成的。因为对于操作系统过来说建立和销毁TCP都是非常昂贵的开销所以引入了信道的概念以复用一条TCP连接。
8. Consumer
消息的消费者表示一个从消息队列中取得消息的客户端应用。
9. Virtual Host
虚拟主机标识一批交换机、消息队列和相关对象。 虚拟主机是相同的身份认证和加密环境的独立服务器域。 每个vhost本质就是一个mini版的rabbitMQ服务器拥有自己的队列交换机绑定和权限机制。vhost是AMQP概念的基础必须在连接时指定RabbitMQ的默认vhost是/.
10. Broker
标识消息队列服务器实体。
2. Exchange类型
Exchange分发消息的时候根据类型的不同分发策略有所区别目前常见的有四种类型 direct、fanout、topic、headers。 headers匹配AMQP消息的header而不是路由键此外headers交换机和direct交换机完成一直但是性能差很多几乎用不到了所以直接看另外三种类型。
2.1 direct交换机 消息中的路由键routing key如果和Binding中的bing key一致交换机就将消息发送到队列的队列中。路由键要完全匹配单个传播。
2.2 fanout 每个发到fanout类型交换机的消息都会分到所有绑定的队列上去。fanout交换器不处理路由键只是简单的将队列绑定到交换机上每个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播每台子网内的主机都获得了一份复制的消息。fanout类型转发消息是最快的。
2.3 topic topic交换机通过模式匹配分配路由的路由键属性将路由键和某个模式进行匹配此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词这些单词之间用.隔开。它同样会识别两个通配符 # 和* 。 #匹配0个或多个单词 * 匹配一个单词
3. springBoot集成RabbitMQ
SpringBoot集成rabbitMQ还是比较简单的因为springBoot使用RabbitTemplate对常用操作进行了封装。
接下来我们来看一下集成过程。首先导入依赖。
xml复制代码!-- 无需在parent的配置文件中添加 --
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency然后在springBoot的配置文件 application.yml中配置rabbitMQ连接信息
yml复制代码server:port: 7890spring:rabbitmq:host: 172.15.33.52port: 5672username: rootpassword: 123456接下来我们我们分三种交换机进行演示。
3.1 direct
首先是配置类在配置类中我们需要声明交换机队列和绑定关系。
java复制代码Configuration
public class DirectExchangeConfig {public static final String DIRECT_QUEUE directQueue;public static final String DIRECT_QUEUE2 directQueue2;public static final String DIRECT_EXCHANGE directExchange;public static final String DIRECT_ROUTING_KEY direct;Beanpublic Queue directQueue() {return new Queue(DIRECT_QUEUE, true);}Beanpublic Queue directQueue2() {return new Queue(DIRECT_QUEUE2, true);}Beanpublic DirectExchange directExchange() {return new DirectExchange(DIRECT_EXCHANGE, true, false);}Beanpublic Binding bindingDirectExchange(Queue directQueue, DirectExchange directExchange) {return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY);}Beanpublic Binding bindingDirectExchange2(Queue directQueue2, DirectExchange directExchange) {return BindingBuilder.bind(directQueue2).to(directExchange).with(DIRECT_ROUTING_KEY);}}这里我们创建了一个叫directExchange的交换机绑定了directQueue和directQueue2两个队列路由键是direct.
消息的生产者我们通过一个Controller来进行模拟,直接引用rabbitTemplate
java复制代码RestController
Slf4j
RequestMapping(/direct)
public class DirectController {private final RabbitTemplate rabbitTemplate;public DirectController(RabbitTemplate rabbitTemplate) {this.rabbitTemplate rabbitTemplate;}/*** direct交换机为直连模式交换机* 根据消息携带的路由键将消息投递给对应队列*** return*/GetMapping(send)public Object sendMsg() {rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE, DirectExchangeConfig.DIRECT_ROUTING_KEY, 发送一条测试消息direct);return direct消息发送成功;}当我在浏览器访问对应连接的时候就会生产一条消息发送到directExchange交换机路由key为direct, 消息内容为发送一条测试消息direct
接下来我们来看消息的消费者。
java复制代码package com.lsqingfeng.action.rabbitmq.direct;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** className: DirectQueueListener* description: 直连交换机的监听器* author: sh.Liu* date: 2021-08-23 16:03*/
Slf4j
Component
public class DirectQueueListener {/*** 尽管设置了两个消费者但是只有一个能够消费成功* 多次发送则轮训消费* DirectReceiver消费者收到消息1 : 发送一条测试消息direct* DirectReceiver消费者收到消息2 : 发送一条测试消息direct* DirectReceiver消费者收到消息1 : 发送一条测试消息direct* DirectReceiver消费者收到消息2 : 发送一条测试消息direct** 一个交换机可以绑定多个队列。如果通过路由key可以匹配到多个队列消费的时候也只能有一个进行消费* param testMessage*/RabbitHandlerRabbitListener(queues DirectExchangeConfig.DIRECT_QUEUE)public void process(String testMessage) {System.out.println(DirectReceiver消费者收到消息1 : testMessage);}RabbitHandlerRabbitListener(queues DirectExchangeConfig.DIRECT_QUEUE)public void process2(String testMessage) {System.out.println(DirectReceiver消费者收到消息2 : testMessage);}RabbitHandlerRabbitListener(queues DirectExchangeConfig.DIRECT_QUEUE2)public void process3(String testMessage) {System.out.println(DirectReceiver消费者收到消息3 : testMessage);}}当我们访问浏览器生产消息会观察控制台结果 DirectReceiver消费者收到消息1 : 发送一条测试消息direct DirectReceiver消费者收到消息3 : 发送一条测试消息direct 在发送一次 DirectReceiver消费者收到消息3 : 发送一条测试消息direct DirectReceiver消费者收到消息2 : 发送一条测试消息direct 由于我们又两个队列都绑定了交换机且routeKey一样所以会打印两条。要注意direct只有routeKey完全匹配的时候才能被消费同时每个队列中的消息只会 被消费一次。
3.2 fanout
配置类
java复制代码Configuration
public class FanoutExchangeConfig {public static final String FANOUT_QUEUE fanoutQueue;public static final String FANOUT_QUEUE2 fanoutQueue2;public static final String FANOUT_QUEUE3 fanoutQueue3;public static final String FANOUT_EXCHANGE fanoutExchange;public static final String FANOUT_ROUTING_KEY fanout;Beanpublic Queue fanoutQueue() {return new Queue(FANOUT_QUEUE, true);}Beanpublic Queue fanoutQueue2() {return new Queue(FANOUT_QUEUE2, true);}Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(FANOUT_EXCHANGE, true, false);}Beanpublic Binding bindingFanoutExchange(Queue fanoutQueue, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);}Beanpublic Binding bindingFanoutExchange2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}这里也是用一个Fanout类型的交换机绑定了两个队列要注意在这种模式下是不需要指定routing-Key的因为所有绑定的队列都会收到消息。
生产者代码如下
java复制代码RestController
Slf4j
RequestMapping(/fanout)
public class FanoutController {private final RabbitTemplate rabbitTemplate;public FanoutController(RabbitTemplate rabbitTemplate) {this.rabbitTemplate rabbitTemplate;}/*** fanout交换机为扇形模式交换机* 消息会发送到所有绑定的队列上。* return*/GetMapping(send)public Object sendMsg() {rabbitTemplate.convertAndSend(FanoutExchangeConfig.FANOUT_EXCHANGE, null, 发送一条测试消息fanout);return fanout消息发送成功;}
}消息的消费者
java复制代码Slf4j
Component
public class FanoutQueueListener {/*** fanout交换机 扇型交换机这个交换机没有路由键概念就算你绑了路由键也是无视的。 这个交换机在接收到消息后会直接转发到绑定到它上面的所有队列* 同一个队列监听多次只会消费一次。* 交换机绑定的多个队列都可以收到消息* param testMessage*/RabbitHandlerRabbitListener(queues FanoutExchangeConfig.FANOUT_QUEUE)public void process(String testMessage) {System.out.println(FanoutReceiver消费者收到消息1 : testMessage);}RabbitHandlerRabbitListener(queues FanoutExchangeConfig.FANOUT_QUEUE)public void process2(String testMessage) {System.out.println(FanoutReceiver消费者收到消息2 : testMessage);}RabbitHandlerRabbitListener(queues FanoutExchangeConfig.FANOUT_QUEUE2)public void process3(String testMessage) {System.out.println(FanoutReceiver消费者收到消息3 : testMessage);}}打印结果 FanoutReceiver消费者收到消息1 : 发送一条测试消息fanout FanoutReceiver消费者收到消息3 : 发送一条测试消息fanout 因为方法1和方法2监听的是同一个队列只有一个可以消费成功。多次执行两个方法交替执行。
3.3 topic
主题交换机会根据routing-Key的匹配规则将消息发送到符合规则的队列中。
配置类
java复制代码/*** className: TopicExchangeConfig* description:* * (星号) 用来表示一个单词 (必须出现的)* # (井号) 用来表示任意数量零个或多个单词* author: sh.Liu* date: 2021-08-23 15:49*/
Configuration
public class TopicExchangeConfig {public static final String TOPIC_QUEUE topicQueue;public static final String TOPIC_QUEUE2 topicQueue2;public static final String TOPIC_QUEUE3 topicQueue3;public static final String TOPIC_EXCHANGE topicExchange;public static final String TOPIC_ROUTING_KEY topic*;Beanpublic Queue topicQueue() {return new Queue(TOPIC_QUEUE, true);}Beanpublic Queue topicQueue2() {return new Queue(TOPIC_QUEUE2, true);}Beanpublic Queue topicQueue3() {return new Queue(TOPIC_QUEUE3, true);}Beanpublic TopicExchange topicExchange() {return new TopicExchange(TOPIC_EXCHANGE, true, false);}Beanpublic Binding bindingTopicExchange(Queue topicQueue, TopicExchange topicExchange) {return BindingBuilder.bind(topicQueue).to(topicExchange).with(topic.#);}Beanpublic Binding bindingTopicExchange2(Queue topicQueue2, TopicExchange topicExchange) {return BindingBuilder.bind(topicQueue2).to(topicExchange).with(test.#);}Beanpublic Binding bindingTopicExchange3(Queue topicQueue3, TopicExchange topicExchange) {return BindingBuilder.bind(topicQueue3).to(topicExchange).with(#);}
}这里要注意我们的绑定管关系。分别是topic.#, test.*, #
#: 代表所有* 代表有且只有一个。
消息的发送者我们将routingKey作为参数方便我们看效果
java复制代码RestController
Slf4j
RequestMapping(/topic)
public class TopicController {private final RabbitTemplate rabbitTemplate;public TopicController(RabbitTemplate rabbitTemplate) {this.rabbitTemplate rabbitTemplate;}GetMapping(send)public Object sendMsg(String routingKey) {rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, routingKey, 发送一条测试消息topic);return topic消息发送成功;}消息的消费者
java复制代码/*** className: TopicQueueListener* description: 主题交换机的监听器* author: sh.Liu* date: 2021-08-23 16:03*/
Slf4j
Component
public class TopicQueueListener {/*** topic: 主题交换机* param testMessage*/RabbitHandlerRabbitListener(queues TopicExchangeConfig.TOPIC_QUEUE)public void process(String testMessage) {System.out.println(TopicReceiver消费者收到消息1 : testMessage);}RabbitHandlerRabbitListener(queues TopicExchangeConfig.TOPIC_QUEUE)public void process2(String testMessage) {System.out.println(TopicReceiver消费者收到消息2 : testMessage);}RabbitHandlerRabbitListener(queues TopicExchangeConfig.TOPIC_QUEUE2)public void process3(String testMessage) {System.out.println(TopicReceiver消费者收到消息3 : testMessage);}RabbitHandlerRabbitListener(queues TopicExchangeConfig.TOPIC_QUEUE3)public void process4(String testMessage) {System.out.println(TopicReceiver消费者收到消息4 : testMessage);}}请求http://localhost:7890/topic/send?routingKeytest.a
结果 TopicReceiver消费者收到消息3 : 发送一条测试消息topic TopicReceiver消费者收到消息4 : 发送一条测试消息topic 代表 test.* 和 # 与路由key匹配成功
请求http://localhost:7890/topic/send?routingKeytopic.123 TopicReceiver消费者收到消息1 : 发送一条测试消息topic TopicReceiver消费者收到消息4 : 发送一条测试消息topic 代表 topic.# 和 # 匹配成功
请求 http://localhost:7890/topic/send?routingKeytest TopicReceiver消费者收到消息4 : 发送一条测试消息topic test.* 后面必须要有一个单词
请求 http://localhost:7890/topic/send?routingKeytest.aaa TopicReceiver消费者收到消息4 : 发送一条测试消息topic TopicReceiver消费者收到消息3 : 发送一条测试消息topic test.*和 #匹配成功
请求http://localhost:7890/topic/send?routingKeytest.aaa.b TopicReceiver消费者收到消息4 : 发送一条测试消息topic 只对# 匹配成功 因为test.*只能匹配一个单词aaa.b代表两个
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/928320.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!