【RabbitMQ】整合 SpringBoot,实现工作队列、发布/订阅、路由和通配符模式

文章目录

  • 工作队列模式
    • 引入依赖
    • 配置
    • 声明
    • 生产者代码
    • 消费者代码
  • 发布/订阅模式
    • 引入依赖
    • 声明
    • 生产者代码
      • 发送消息
    • 消费者代码
    • 运行程序
  • 路由模式
    • 声明
    • 生产者代码
    • 消费者代码
    • 运行程序
  • 通配符模式
    • 声明
    • 生产者代码
    • 消费者代码
    • 运行程序

工作队列模式

引入依赖

我们在创建 SpringBoot 项目的时候,选上这两个依赖即可 |380

或者在依赖中加入

<dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-amqp</artifactId>  
</dependency>

配置

将配置文件后缀改成 yml 之后,进行配置image.png|372

#配置 RabbitMQ 的基本信息
spring:rabbitmq:  host: 127.0.0.1 #RabbitMQ 服务器的地址  port: 15673  #RabbitMQ的TCP协议的端口号,而不是管理平台的端口号。默认为5672  username: guest  password: guest  virtual-host: coding #默认为 /

或者这样写

spring:rabbitmq:addresses: amqp://guest:guest@127.0.0.1:5672/coding
  • 格式为: amqp://username:password@ip:port/virtual-host

声明

注意引入的是这个包
image.png

package org.example.rabbitmq.config;  import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.core.Queue;  
import org.springframework.amqp.core.QueueBuilder;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  @Configuration  
public class RabbitMQConfig {  // 声明一个队列,来自第三方包,就是一个对象  @Bean("workQueue")  public Queue workQueue(){  return QueueBuilder.durable(Constants.WORK_QUEUE).build();  }  
}

生产者代码

package org.example.rabbitmq.controller;  import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.web.bind.annotation.RequestMapping;  
import org.springframework.web.bind.annotation.RestController;  @RestController  
@RequestMapping("/producer")  
public class ProducerController {  @Autowired  private RabbitTemplate rabbitTemplate;  @RequestMapping("/work")  public String work() {  // 使用内置交换机的话,RoutingKey 和队列名称一致  rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, "hello spring amqp: work...");  return "发送成功";  }  
}
  • 在运行程序之后,队列不会被立马创建出来
  • 需要发送消息之后才会被创建image.png|278

消费者代码

消费者是通过实现一个监听类,来监听有没有消息

  • 采用一个注解—— @RabbitListener

@RabbitListenerSpring 框架中用于监听 RabbitMQ 队列的注解,通过使用这个注解,可以定义一个方法,以便从 RabbitMQ 队列中接收消息。

  • 该注解支持多种参数类型,这些参数类型代表了从 RabbitMQ 接收到的消息和相关信息
  • 以下是一些常用的参数类型:
    • String:返回消息的内容
    • Message (org.spring.framework.ampq.core.Message):Spring AMPQMessage 类,返回原始的消息体以及消息的属性,如消息 ID,内容,队列信息等
    • Channel (com.rabbitmq.client.Channel):RabbitMQ 的通道对象,可以用于进行高级的操作,如手动确认消息
package org.example.rabbitmq.listener;  import org.apache.logging.log4j.message.Message;  
import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.rabbit.annotation.RabbitListener;  
import org.springframework.stereotype.Component;  @Component  
public class WorkListener {  @RabbitListener(queues = Constants.WORK_QUEUE)  public void queueListener1(Message message) {  System.out.println("listener 1 [" + Constants.WORK_QUEUE + "] 接收到消息:" + message);  }  @RabbitListener(queues = Constants.WORK_QUEUE)  public void queueListener2(String message) {  System.out.println("listener 2 [" + Constants.WORK_QUEUE + "] 接收到消息:" + message);  }  
}

发布/订阅模式

在发布/订阅模式中,多了一个 Exchange 角色。Exchange 常见有三种类型,分别代表不同的路由规则

  • Fanout: 广播,将消息交给所有绑定到交换机的队列 (Publish/Subscribe 模式)
  • Direct: 定向,把消息交给符合指定 Routing Key 的队列(Routing 模式)
  • Topic: 通配符,把消息交给符合 Routing pattern (路由模式) 的队列(Topics 模式)

引入依赖

我们在创建 SpringBoot 项目的时候,选上这两个依赖即可 |380

或者在依赖中加入

<dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-amqp</artifactId>  
</dependency>

声明

package org.example.rabbitmq.config;  import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.core.*;  
import org.springframework.beans.factory.annotation.Qualifier;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  @Configuration  
public class RabbitMQConfig {  /**  * 二、发布/订阅模式  * 声明队列、声明交换机、声明队列和交换机的绑定  * @return  */  @Bean("fanoutQueue1")  // @Bean注解:交给Spring进行管理, 括号里面是指定名称  public Queue fanoutQueue1() {  return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();  }  @Bean("fanoutQueue2")  public Queue fanoutQueue2() {  return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();  }  @Bean("fanoutExchange")  // 声明交换机有很多种类型:FanoutExchange、DirectExchange、TopicExchange  public FanoutExchange fanoutExchange() {  return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build();  }  @Bean("fanoutQueueBinding1")  public Binding fanoutQueueBinding1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue1") Queue queue) {  return BindingBuilder.bind(queue).to(fanoutExchange);  }  @Bean("fanoutQueueBinding2")  public Binding fanoutQueueBinding2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue2") Queue queue) {  return BindingBuilder.bind(queue).to(fanoutExchange);  }  
}

生产者代码

image.png|276

  1. 声明队列
  2. 声明交换机
  3. 声明交换机和队列的绑定
  4. 发送消息

发送消息

package org.example.rabbitmq.controller;  import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.web.bind.annotation.RequestMapping;  
import org.springframework.web.bind.annotation.RestController;  @RestController  
@RequestMapping("/producer")  
public class ProducerController {  @Autowired  private RabbitTemplate rabbitTemplate;  @RequestMapping("/fanout")  public String fanout() {  rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE,"","hello spring amqp:fanout...");  return "发送成功";  }  
}

消费者代码

package org.example.rabbitmq.listener;  import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.rabbit.annotation.RabbitListener;  
import org.springframework.stereotype.Component;  @Component  
public class FanoutListener {  @RabbitListener(queues = Constants.FANOUT_QUEUE1)  public void queueListener1(String message) {  System.out.println("队列[" + Constants.FANOUT_QUEUE1 + "] 接收到消息:" + message);  }  @RabbitListener(queues = Constants.FANOUT_QUEUE2)  public void queueListener2(String message) {  System.out.println("队列[" + Constants.FANOUT_QUEUE2 + "] 接收到消息:" + message);  }  
}

运行程序

  1. 运行项目,调用接口发送消息
    • http://127.0.0.1:8080/producer/fanout
    • image.png

image.png

  1. 监听类收到消息,并打印
    image.png

路由模式

交换机类型为 Direct 时,会把消息交给符合指定 Routing Key 的队列

  • 队列和交换机的绑定,不是任意的绑定了,而是要制定一个 RoutingKey(路由 key
  • 消息的发送方在向 Exchange 发送消息时,也需要指定消息的 RoutingKey
  • Exchange 也不再把消息交给每一个绑定的 key,而是根据消息的 RoutingKey 进行判断,只有队列的 RoutingKey 和消息的 RoutingKey 完全一致,才会接收消息

image.png|315

声明

按照这个图片,进行绑定image.png|385

/**  * 三、 路由模式  * 声明队列、声明交换机、声明队列和交换机的绑定  * @return  */  
@Bean("directQueue1")  
public Queue directQueue1(){  return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();  
}  @Bean("directQueue2")  
public Queue directQueue2(){  return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();  
}  @Bean("directExchange")  
// 声明交换机有很多种类型:FanoutExchange、DirectExchange、TopicExchange  
public DirectExchange directExchange() {  return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();  
}  @Bean("directQueueBinding1")  
public Binding directQueueBinding1(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue1") Queue queue) {  return BindingBuilder.bind(queue).to(directExchange).with("a");  
}  @Bean("directQueueBinding2")  
public Binding directQueueBinding2(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) {  return BindingBuilder.bind(queue).to(directExchange).with("a");  
}  @Bean("directQueueBinding3")  
public Binding directQueueBinding3(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) {  return BindingBuilder.bind(queue).to(directExchange).with("b");  
}  @Bean("directQueueBinding4")  
public Binding directQueueBinding4(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) {  return BindingBuilder.bind(queue).to(directExchange).with("c");  
}

生产者代码

package org.example.rabbitmq.controller;  import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.web.bind.annotation.PathVariable;  
import org.springframework.web.bind.annotation.RequestMapping;  
import org.springframework.web.bind.annotation.RestController;  @RestController  
@RequestMapping("/producer")  
public class ProducerController {  @Autowired  private RabbitTemplate rabbitTemplate;  /**  * 三、路由模式  * @param routingKey  * @return  */  @RequestMapping("/direct/{routingKey}")  //从路径中拿到这个routingKey  public String direct(@PathVariable("routingKey") String routingKey) {  rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, routingKey,"hello spring amqp:direct, my routing key is" + routingKey);  return "发送成功";  }  
}

消费者代码

package org.example.rabbitmq.listener;  import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.rabbit.annotation.RabbitListener;  
import org.springframework.stereotype.Component;  @Component  
public class DirectListener {  @RabbitListener(queues = Constants.DIRECT_QUEUE1)  public void queueListener1(String message) {  System.out.println("队列[" + Constants.DIRECT_QUEUE1 + "] 接收到消息:" + message);  }  @RabbitListener(queues = Constants.DIRECT_QUEUE2)  public void queueListener2(String message) {  System.out.println("队列[" + Constants.DIRECT_QUEUE2 + "] 接收到消息:" + message);  }  
}

运行程序

  1. 运行项目

  2. 调用接口发送 routingKeya 的消息

    • http://127.0.0.1:8080/producer/direct/a
    • 观察后端日志,队列 1 和 2 都收到消息 image.png
  3. 调用接口发送 routingKeyb 的消息

    • http://127.0.0.1:8080/producer/direct/b
    • 观察后端日志,队列 2 收到消息image.png|347
  4. 调用接口发送 routingKeyc 的消息

    • http://127.0.0.1:8080/producer/direct/c
    • 观察后端日志,队列 2 收到消息|372

通配符模式

TopicsRouting 模式的区别是:

  1. topics 模式使用的交换机类型为 topicRouting 模式使用的是 direct
  2. topic 类型的交换机在匹配规则上进行了扩展,Binding Key 支持通配符匹配

image.png|419

  • * 表示一个单词
  • # 表示多个单词

声明

/**  * 四、通配符模式  * 声明队列、声明交换机、声明队列和交换机的绑定  * @return  */  
@Bean("topicQueue1")  
public Queue topicQueue1(){  return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();  
}  @Bean("topicQueue2")  
public Queue topicQueue2(){  return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();  
}  @Bean("topicExchange")  
public TopicExchange topicExchange() {  return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();  
}  @Bean("topicQueueBinding1")  
public Binding topicQueueBinding1(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue1") Queue queue) {  return BindingBuilder.bind(queue).to(topicExchange()).with("*.a.*");  
}  @Bean("topicQueueBinding2")  
public Binding topicQueueBinding2(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue2") Queue queue) {  return BindingBuilder.bind(queue).to(topicExchange()).with("*.*.b");  
}  @Bean("topicQueueBinding3")  
public Binding topicQueueBinding3(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue2") Queue queue) {  return BindingBuilder.bind(queue).to(topicExchange()).with("c.#");  
}

生产者代码

package org.example.rabbitmq.controller;  import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.web.bind.annotation.PathVariable;  
import org.springframework.web.bind.annotation.RequestMapping;  
import org.springframework.web.bind.annotation.RestController;  @RestController  
@RequestMapping("/producer")  
public class ProducerController {  @Autowired  private RabbitTemplate rabbitTemplate;  /**  * 四、通配符模式  * @param routingKey  * @return  */  @RequestMapping("/topic/{routingKey}")  public String topic(@PathVariable("routingKey") String routingKey) {  rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE,routingKey, "hello spring amqp:topic, my routing key is " + routingKey);  return "发送成功";  }  
}

消费者代码

运行程序

  1. 运行程序

  2. 调用接口发送 routingKeyqqq.a.b 的消息

    • http://127.0.0.1:8080/producer/topic/qqq.a.b
    • 观察后端日志,队列 1 和队列 2 均收到消息image.png|435
  3. 调用接口发送 routingKeyc.abc.fff 的消息

    • http://127.0.0.1:8080/producer/topic/c.abc.fff
    • 观察后端日志,队列 2 收到信息image.png
  4. 调用接口发送 routingKeyg.h.j 的消息

    • http://127.0.0.1:8080/producer/topic/g.h.j
    • 观察后端日志,没有队列收到消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/81255.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python-92:最大乘积区间问题

问题描述 小R手上有一个长度为 n 的数组 (n > 0)&#xff0c;数组中的元素分别来自集合 [0, 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]。小R想从这个数组中选取一段连续的区间&#xff0c;得到可能的最大乘积。 你需要帮助小R找到最大乘积的区间&#xff0c;并输出这…

windows触摸板快捷指南

以下是结构化整理后的触控手势说明&#xff0c;采用清晰的层级划分和标准化表述&#xff1a; **触控手势操作规范****1. 单指操作****2. 双指操作****3. 三指操作****4. 四指操作** **优化说明&#xff1a;** 触控手势操作规范 1. 单指操作 手势功能描述等效操作单击滑动选择…

VSCode launch.json 配置参数详解

使用 launch.json 配置调试环境时&#xff0c;会涉及到多个参数&#xff0c;用于定义调试器的行为和目标执行环境。以下是一些常用的配置参数&#xff1a; 1、"type" &#xff1a;指定调试器的类型&#xff0c;例如 "node" 表示 Node.js 调试器&#xff0…

mAP、AP50、AR50:目标检测中的核心评价指标解析

在目标检测任务中&#xff0c;评价指标是衡量模型性能的核心工具。其中&#xff0c;mAP&#xff08;mean Average Precision&#xff09;、AP50&#xff08;Average Precision at IoU0.5&#xff09;和AR50&#xff08;Average Recall at IoU0.5&#xff09;是最常用的指标。本…

【论文阅读】A Survey on Multimodal Large Language Models

目录 前言一、 背景与核心概念1-1、多模态大语言模型&#xff08;MLLMs&#xff09;的定义 二、MLLMs的架构设计2-1、三大核心模块2-2、架构优化趋势 三、训练策略与数据3-1、 三阶段训练流程 四、 评估方法4-1、 闭集评估&#xff08;Closed-set&#xff09;4-2、开集评估&…

[已解决] LaTeX “Unicode character“ 报错 (中文字符处理)

问题&#xff1a; 写 LaTeX 文档&#xff0c;特别是包含中文时&#xff0c;经常遇到类似下图的 “Unicode character XXXXXX” 报错 (X) Unicode character 本 (U672C) LaTeX [行 xx, 列 x] (X) Unicode character 报 (U62A5) LaTeX [行 xx, 列 x] ...这通常意味着我们的 LaTe…

现货黄金跌破 3160 美元,市场行情剧烈波动​

在 5 月 16 日的交易时段中&#xff0c;现货黄金市场出现戏剧性变化&#xff0c;价格短时间内大幅跳水。截至当日 20:04&#xff0c;现货黄金短线下挫 20 美元&#xff0c;一举跌破 3160 美元 / 盎司&#xff0c;日内跌幅达 2.56%&#xff1b;纽约期金日内也大跌 2%&#xff0c…

智慧校园(含实验室)智能化专项汇报方案

该方案聚焦智慧校园(含实验室)智能化建设,针对传统实验室在运营监管、环境监测、安全管控、排课考勤等方面的问题,依据《智慧校园总体框架》等标准,设计数字孪生平台、实验室综合管理平台、消安电一体化平台三大核心平台,涵盖通信、安防、建筑设备管理等设施,涉及 395 个…

【Python爬虫 !!!!!!政府招投标数据爬虫项目--医疗实例项目文档(提供源码!!!)!!!学会Python爬虫轻松赚外快】

政府招投标数据爬虫项目--医疗实例项目文档 1. 项目概述1.1 项目目标1.2 技术栈2. 系统架构2.1 模块划分2.2 流程示意图3. 核心模块设计3.1 反爬处理模块(`utils/anti_crawler.py`)3.1.1 功能特性3.1.2 关键代码3.2 爬虫模块(`crawler/spiders/`)3.2.1 基类设计(`base_spi…

RabbitMQ是什么?应用场景有哪些?

RabbitMQ 是一款开源的消息代理中间件,基于 AMQP(高级消息队列协议)实现,用于在分布式系统中进行异步通信和消息传递。它通过将消息的发送者和接收者解耦,提高了系统的可扩展性、可靠性和灵活性。 核心特点 多协议支持:不仅支持 AMQP,还兼容 STOMP、MQTT 等多种消息协议…

RT Thread FinSH(msh)调度逻辑

文章目录 概要FinSH功能FinSH调度逻辑细节小结 概要 RT-Thread&#xff08;Real-Time Thread&#xff09;作为一款开源的嵌入式实时操作系统&#xff0c;在嵌入式设备领域得到了广泛应用。 该系统不仅具备强大的任务调度功能&#xff0c;还集成了 FinSH命令行系统&#xff0c…

我司助力高校打造「智慧创新AI学习中心」

为推动AI教育融合跨领域应用&#xff0c;东吴大学于2025年4月举行「智慧创新AI学习中心」揭牌仪式&#xff0c;并宣布正式启动AI特色课程与教学空间建置计画。此次建置由我司协助整体教室空间与设备规划&#xff0c;导入最新NVIDIA GeForce RTX 50系列桌上型电脑&#xff0c;并…

给你的matplotlib images添加scale Bar

​Scale Bar&#xff08;比例尺&#xff09;用于直观表示图像与实际物理尺寸&#xff08;如微米、毫米等&#xff09;的对应关系。例如&#xff0c;在显微镜图像中&#xff0c;比例尺可以标注“75μm”表示图中某线段对应的实际长度。 这里分享使用matplotlib中的imshow结合ma…

基于React的高德地图api教程004:线标记绘制、修改、删除功能实现

文章目录 4、线绘制4.1 绘制线标记4.1.1 开启线标记绘制模式4.1.2 绘制线标记4.1.3 关闭线标记模式4.2 可视化线标记数据面板4.3 修改线标记4.3.1 修改线标记路径4.3.2 修改线标记名称和颜色4.4 删除线标记4.5 定位线标记4.6 代码下载4.04、线绘制 4.1 绘制线标记 4.1.1 开启…

lc42接雨水

1.原题 42. 接雨水 - 力扣&#xff08;LeetCode&#xff09; 给定 n 个非负整数表示每个宽度为 1 的柱子的高度图&#xff0c;计算按此排列的柱子&#xff0c;下雨之后能接多少雨水。 2.题目解析 这一题是经常被考到的一道算法题&#xff0c;其中最简单最好用的方法就是双指…

【读代码】端到端多模态语言模型Ultravox深度解析

一、项目基本介绍 Ultravox是由Fixie AI团队开发的开源多模态大语言模型,专注于实现音频-文本的端到端实时交互。项目基于Llama 3、Mistral等开源模型,通过创新的跨模态投影架构,绕过了传统语音识别(ASR)的中间步骤,可直接将音频特征映射到语言模型的高维空间。 核心优…

力扣HOT100之二叉树:98. 验证二叉搜索树

这道题之前也刷过&#xff0c;自己做了一遍&#xff0c;发现卡在了第70多个样例&#xff0c;才发现自己没有利用二叉搜索树的性质&#xff0c;但凡涉及到二叉搜索树&#xff0c;应该首先考虑中序遍历&#xff01;&#xff01;&#xff01; 被卡住的测试样例是这样的&#xff1a…

Centos7.9同步外网yum源至内网

curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo curl -o /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-7.repo yum makecache yum repolist安装软件 yum install -y yum-utils createrepo # yum-utils包含re…

HMDB51数据集划分

生成训练集、验证集和测试集 每个split文件应该包含&#xff1a; 训练集(id1): 70个视频测试集(id2): 30个视频未使用(id0): 剩余视频 这是一个70/30的训练/测试分割比例。标记为0的视频被排除在当前实验之外。实际上训练集&#xff08;id1&#xff09;&#xff0c;验证集&am…

Spring Boot 项目的计算机专业论文参考文献

技术范围&#xff1a;SpringBoot、Vue、SSM、HLMT、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、小程序、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容&#xff1a;免费功能设计、开题报告、任务书、中期检查PPT、系统功能实现、代码编写、论文编写和辅导、论文…