分布式之RabbitMQ的使用(3)QueueBuilder - 详解

news/2025/11/14 21:47:29/文章来源:https://www.cnblogs.com/yangykaifa/p/19223533

文章目录

  • 前言
  • 一、概述
  • 二、功能特点
  • 三、准备工作
    • 创建CreateRabbitMQConfig
    • 创建listening
    • 创建QueueBuilderController
  • 四、常用属性及方法介绍
    • (一)创建持久化队列
      • 语法
      • 代码编写
        • CreateRabbitMQConfig
        • QueueBuilderController
        • ListenRabbitMQConfig
        • 测试
    • (二)创建非持久化队列
    • (三)创建排他队列
    • (四) 创建自动删除队列
  • (五)设置队列长度限制
  • 五、使用步骤
    • (一)引入相关依赖
    • (二)创建队列对象
    • (三)后续操作

前言

本篇帖子是上一篇分布式之RabbitMQ的使用(3)的续写。

一、概述

QueueBuilder 是在与消息队列系统(如 RabbitMQ)集成时,用于以编程方式构建队列(Queue)的工具类或构建器模式的实现。它提供了一种便捷的方法来配置队列的各种属性,使得在应用程序中能够根据具体需求灵活创建不同类型和特性的队列。

二、功能特点

  1. 属性配置灵活性:通过一系列方法,可以轻松设置队列的关键属性,如是否持久化、是否排他、是否自动删除等,以满足不同的业务场景和消息处理要求。
  2. 与消息队列系统紧密集成:通常是特定消息队列客户端库(如 Spring AMQP 用于 Spring Boot 与RabbitMQ 集成)的一部分,能够无缝对接相应的消息队列服务,确保创建的队列在系统中正确生效。

三、准备工作

创建CreateRabbitMQConfig

新建config文件夹,并在其文件夹下创建类CreateRabbitMQConfig

package com.hsh.config;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CreateRabbitMQConfig {
}

创建listening

新建listen文件夹,并在其文件夹下创建类ListenRabbitMQConfig

package com.hsh.listen;
import org.springframework.stereotype.Component;
@Component
public class ListenRabbitMQConfig {
}

创建QueueBuilderController

在controller文件夹下创建类QueueBuilderController

package com.hsh.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/queueBuilderController")
public class QueueBuilderController {
@Autowired
private RabbitTemplate rabbitTemplate;
}

四、常用属性及方法介绍

(一)创建持久化队列

语法

方法: QueueBuilder.durable(String queueName)
说明:用于创建一个持久化队列。持久化队列在消息队列服务器重启或发生意外故障后,其队列定义(包括队列名称、属性等)以及队列中尚未被消费的消息不会丢失,会在服务器恢复正常后继续存在,可以继续被消费者消费。
语法如下示例:

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
Queue durableQueue = QueueBuilder.durable("myDurableQueue")
.build();

在上述示例中,创建了一个名为 myDurableQueue 的持久化队列。

代码编写

我们以创建简单队列的持久化队列为例。

CreateRabbitMQConfig

在上面新建的CreateRabbitMQConfig中编写,用于在项目加载时创建队列

package com.hsh.config;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CreateRabbitMQConfig {
@Bean
public Queue myDurableQueue () {
Queue durableQueue = QueueBuilder.durable("myDurableQueue")
.build();
return durableQueue;
}
}
QueueBuilderController

编写QueueBuilderController,用于用户发起请求,也就是生产者。

package com.hsh.controller;
import com.hsh.pojo.Goods;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/queueBuilderController")
public class QueueBuilderController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/send")
public String index(){
Goods goods =new Goods();
for (int i = 0; i < 10; i++){
goods.setGoodsId(i);
rabbitTemplate.convertAndSend("myDurableQueue", goods);
}
return "发送成功";
}
}
ListenRabbitMQConfig

编写ListenRabbitMQConfig用于监听队列

package com.hsh.listen;
import com.hsh.pojo.Goods;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.utils.SerializationUtils;
import org.springframework.stereotype.Component;
@Component
public class ListenRabbitMQConfig {
// 不在这里创建队列了 @RabbitListener(queuesToDeclare = @Queue("work"))
// 我们这个myDurableQueue在上面的 CreateRabbitMQConfig创建,这样更加灵活,也是为什么使用QueueBuilder的原因。
@RabbitListener(queues = "myDurableQueue")
public void receive(Message  message){
Goods goods = (Goods) SerializationUtils.deserialize(message.getBody());
System.out.println("消费者===========:" + goods);
}
}
测试

直接访问http://localhost:8080/queueBuilderController/send
控制台如下
在这里插入图片描述

(二)创建非持久化队列

方法: QueueBuilder.nonDurable(String queueName)
说明:创建一个非持久化队列。非持久化队列在某些情况下(如连接关闭、相关消费者进程结束等),队列及其可能剩余的消息通常会被自动删除,不具备在服务器重启等情况下的数据保留能力。
示例:

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
Queue nonDurableQueue = QueueBuilder.nonDurable("myNonDurableQueue")
.build();

这里创建了一个名为 myNonDurableQueue 的非持久化队列。

和上面创建持久化队列差不多就是把CreateRabbitMQConfig的内容换一下不再演示。

(三)创建排他队列

  • 方法: QueueBuilder.exclusive(String queueName)
  • 说明:排他队列是连接专属的,只有创建它的连接可以使用它,并且当连接关闭时,该队列会被自动删除。
  • 示例:
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    Queue exclusiveQueue = QueueBuilder.exclusive("myExclusiveQueue")
    .build();

此示例创建了一个名为 myExclusiveQueue 的排他队列。

(四) 创建自动删除队列

  • 方法: QueueBuilder.autoDelete(String queueName)
  • 说明:自动删除队列在所有消费者都取消订阅或者队列中的消息都被消费完后(满足其中一个条件即可),队列会会被自动删除。
  • 示例:
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    Queue autoDeleteQueue = QueueBuilder.autoDelete("myAutoDeleteQueue")
    .build();

创建了一个名为 myAutoDeleteQueue 的自动删除队列。

(五)设置队列长度限制

  • 方法: QueueBuilder.durable(String queueName).maxLength(int maxLength) (以在持久化队列上设置为例,非持久化等其他类型队列设置方式类似)
  • 说明:用于设置队列的最大长度限制,即队列中最多能容纳的消息数量。当队列中的消息数量达到该限制时,后续发送到该队列的消息可能会根据消息队列系统的策略进行相应处理(如被丢弃等)。
  • 示例:
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    Queue queueWithLengthLimit = QueueBuilder.durable("myQueueWithLimit")
    .maxLength(1000)
    .build();

在上述示例中,创建了一个名为 myQueueWithLimit 的持久化队列,并设置其最大长度为 1000 条消息。

五、使用步骤

(一)引入相关依赖

在使用 QueueBuilder 之前,需要确保项目中引入了与消息队列系统对应的客户端库。例如,在 SpringBoot 中集成 RabbitMQ 并使用 QueueBuilder 时,需要在项目的 pom.xml(Maven 项目)或build.gradle(Gradle 项目)中引入 Spring AMQP 依赖:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

(二)创建队列对象

根据业务需求,选择合适的 QueueBuilder 方法来创建队列对象。也就是上面的CreateRabbitMQConfig中编写如下代码例如:

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
// 创建一个持久化队列
Queue durableQueue = QueueBuilder.durable("myDurableQueue")
.build();
// 创建一个非持久化队列
Queue nonDurableQueue = QueueBuilder.nonDurable("myNonDurableQueue")
.build();

(三)后续操作

创建好队列对象后,可以根据具体的消息队列集成场景进行后续操作,比如:

将队列绑定到交换机(在使用交换机的消息队列架构中):

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
// 创建一个持久化队列
@Bean
public Queue myQueue() {
return QueueBuilder.durable("myQueue")
.build();
}
// 创建一个持久化的直接交换机
@Bean
public DirectExchange myDirectExchange() {
return ExchangeBuilder.directExchange("myDirectExchange")
.durable(true)
.build();
}
// 创建队列与交换机之间的绑定关系
/*
*/
@Bean
public Binding binding(DirectExchange myDirectExchange, Queue myQueue) {
return BindingBuilder.bind(myQueue)
.to(myDirectExchange)
.with("routingKey");
}
}
  • myQueue 方法使用 QueueBuilder 创建了一个名为 myQueue 的持久化队列,并通过 @Bean 注解将其注册为 Spring 容器中的一个 Bean。
  • myDirectExchange 方法使用 ExchangeBuilder 创建了一个名为myDirectExchange 的持久化直接交换机,同样通过 @Bean 注解将其注册为 Spring 容器中的一个 Bean。
  • binding 方法通过 @Bean 注解创建了一个 Binding 对象,用于将前面创建的 myQueue 队列和myDirectExchange 交换机进行绑定,并且指定了路由键为 routingKey 。Spring 会自动将已经创建好的 myDirectExchangemyQueue 这两个 Bean 作为参数传递给 binding 方法,无需手动干预。

binding方法:

  • 调用时机:Spring 在创建 Binding 这个 Bean 时会自动调用 binding 方法。因为这个方法也被标注了 @Bean 注解,所以 Spring 会识别它并按照创建 Bean 的流程来处理。
  • 参数传递: binding 方法的参数是 DirectExchange myDirectExchangeQueue myQueue 。Spring 会通过方法参数的类型来自动匹配已经在 Spring 容器中创建好的相应 Bean 实例并作为参数传递进来。也就是说,Spring 会找到之前通过 myDirectExchange 方法创建并注册的直接交换机 Bean 和通过 myQueue 方法创建并注册的队列 Bean,然后将它们分别传递给 binding 方法,以便在方法内部使用 BindingBuilder 将队列和交换机进行绑定操作。

将队列提供给消息生产者和消费者使用:
在消息生产者端,可以将消息发送到创建好的队列中。例如,在 Spring AMQP 中:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Queue myQueue;
public void sendMessage(String message) {
rabbitTemplate.convertAndSend(myQueue.getName(), message);
}
}

在上述代码中,通过 RabbitTemplate 将消息发送到了名为 myQueue 的队列中(这里假设 myQueue 是之前通过 QueueBuilder 创建的队列)。

在消息消费者端,可以从创建好的队列中获取消息进行处理。例如:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MessageConsumer {
@RabbitListener(queues = "myQueue")
public void receiveMessage(String message) {
// 处理接收到的消息
System.out.println("Received message: " " + message);
}
}

这里通过 RabbitListener 注解监听名为 myQueue 的队列,当队列中有消息时,会调用receiveMessage 方法进行处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/965728.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2025年市面上口碑好的出国留学中介机构哪家强,全球联申/名校录取/留学就业一体化/背景提升/语言培训中介哪家好

2025年市面上口碑好的出国留学中介机构哪家强,全球联申/名校录取/留学就业一体化/背景提升/语言培训中介哪家好行业权威榜单发布,优质机构综合测评 随着全球化教育理念的深入普及和留学需求的持续攀升,出国留学中介…

网络犯罪新手段:黑客如何利用IT技术实施货物盗窃

本文详细揭露了网络犯罪分子如何通过网络钓鱼、远程监控工具和身份盗窃等手段入侵货运公司IT系统,窃取高价值货物。文章分析了攻击手法、行业漏洞及防护措施,涉及RMM工具、运输管理系统安全等关键技术内容。网络犯罪…

采用 OpenCASCADE 提取布尔运算后平面图形的外轮廓

采用 OpenCASCADE 提取布尔运算后平面图形的外轮廓pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas"…

很多争论不是认知问题,而是数学问题

🧠 很多争论不是认知问题,而是数学问题一、我们日常遇到的大部分争论,其实都是“数学逻辑问题” 很多人以为自己在讨论观点,其实他们在无意识地做数学,只是形式极其粗糙:没定义:集合不明确 没量化:变量无值 没…

11/14

今天没课,真爽

25.11.14

P13046 求出每个位置后面第一个变成 \(d\) 倍数的位置就能线性 dp 了,又是树上结构的样子,略去不叙。 前面这块有个很抽象的基环树做法,我们不要学习这个。 考虑为啥不能直接哈希,因为可能没有逆元,那么发现其实是…

题解:AtCoder ARC209D A_A_i

闲话帅炸了。 这是主播被 \(n=1\) 的 case 卡爆了,望周知。 题意 给定长度为 \(n\) 的序列 \(a\),值域为 \([1,n]\),有一些位置未确定。你需要给这些未确定的位置的确定取值,使得序列 \(b_i=a_{a_i}\) 的字典序最小…

代码制作数学动画 python manim jjmpeg - 何苦

代码制作数学动画 python manim jjmpeg安装 manimpip install manim安装ffmpeg 下载未完待续。。。

重组融合蛋白技术概述

重组融合蛋白的基本概念 重组融合蛋白是通过基因工程技术将两个或多个不同基因的编码序列连接,在宿主细胞中表达产生的单一多肽链。这种技术使得研究人员能够将不同蛋白质的功能域进行组合,创造出具有新特性的蛋白质…

OpenEuler安装宝塔

准备环境 sudo dnf update -y sudo dnf install -y wget curl vim #确认系统架构和版本,宝塔官方支持的系统主要是 CentOS/RHEL 7/8、Ubuntu 16-22,openEuler属于RHEL兼容系统,所以通常可以用CentOS安装脚本 uname -…

20230827 - Balancer 攻击事件:价格操纵 + 精度丢失的经典组合拳

攻击背景介绍 2023.08.27(没错是 2023 不是 2025),Balancer V2 的稳定币池遭到了黑客攻击,导致多条链上价值约 368k 美元的资产被盗。黑客利用 rounding down(精度丢失)问题操纵 bb-a-USDC 的价格,从稳定币池中…

我的标题2

我的内容2222222

破解cocos creator 2.3.2, 让它支持M芯片

cocos creator 貌似低于2.4.3的版本,都无法在mac m芯片上运行, 之所以不能运行,还是因为electron版本太低导致 对它进行逆向,破解app.asar, 然后升级electron相关的东西,并且升级vue ui组建 开源地址:https://gi…

Kotlin Coroutines

https://kotlinlang.org/docs/coroutines-overview.html 协程是作为三方库进行提供的,类似 javax <properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><kotlin.co…

我的标题

我的内容111111

深入解析:软考中级-系统集成项目管理工程师**的超详细知识点笔记。

深入解析:软考中级-系统集成项目管理工程师**的超详细知识点笔记。2025-11-14 21:14 tlnshuju 阅读(0) 评论(0) 收藏 举报pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !…

GeoScene Pro试用申请

GeoScene Pro试用申请 按在线的文档配置 GeoScene Pro申请试用 - 易智瑞教育网 1)要求你输入许可的,如下图,这时候需要更换【许可类型】为“指定用户许可”(4.0版本软件这里写的是“授权用户许可”) 更换许可…

题解:P13573 [CCPC 2024 重庆站] Pico Park

P13573:区间 DP、组合数学VP 的时候没题可跟了,就开了这题切掉了,结果 VP 结束发现正赛就一个队伍过了??? 若 \(x\) 用缩小枪击中了 \(y\),则从 \(x\) 向 \(y\) 连一条有向边。注意到,任何一个时刻得到的图是若…

【AI智能体】Coze 提取对标账号短视频生成视频文案实战详解 - 指南

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

Java Benchmark使用

如何测量Java代码的性能 在 Java 中,可以使用多种方法来测量一段代码的执行性能。使用 System.currentTimeMillis()是最常见的方法 long startTime = System.currentTimeMillis();// 需要测量的代码块 for (int i = 0…