RabbitMQ Stream插件使用详解

2.4版为RabbitMQ流插件引入了对RabbitMQStream插件Java客户端的初始支持。

  • RabbitStreamTemplate
  • StreamListener容器

将spring rabbit流依赖项添加到项目中:

<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-stream</artifactId><version>3.1.4</version>
</dependency>

您可以使用RabbitAdmin bean,使用QueueBuilder.stream()方法指定队列类型,正常地配置队列。例如:

@Bean
Queue stream() {return QueueBuilder.durable("stream.queue1").stream().build();
}

然而,这仅在您还使用non-stream 组件(如SimpleMessageListenerContainer或DirectMessageListeneerContainer)时才有效,因为在打开AMQP连接时会触发管理员来声明定义的bean。如果您的应用程序仅使用流组件,或者您希望使用高级流配置功能,则应改为配置StreamAdmin:

@Bean
StreamAdmin streamAdmin(Environment env) {return new StreamAdmin(env, sc -> {sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();sc.stream("stream.queue2").create();});
}

一、Sending Messages

RabbitStreamTemplate提供RabbitTemplate(AMQP)功能的子集。

public interface RabbitStreamOperations extends AutoCloseable {CompletableFuture<Boolean> send(Message message);CompletableFuture<Boolean> convertAndSend(Object message);CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);MessageBuilder messageBuilder();MessageConverter messageConverter();StreamMessageConverter streamMessageConverter();@Overridevoid close() throws AmqpException;}

RabbitStreamTemplate实现具有以下构造函数和属性:

public RabbitStreamTemplate(Environment environment, String streamName) {
}public void setMessageConverter(MessageConverter messageConverter) {
}public void setStreamConverter(StreamMessageConverter streamConverter) {
}public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}

MessageConverter在convertAndSend方法中用于将对象转换为Spring AMQP消息。

StreamMessageConverter用于将Spring AMQP消息转换为本机流消息。

您也可以直接发送本机流消息;使用messageBuilder()方法提供对生产者的消息生成器的访问。

ProducerCustomizer提供了一种机制,用于在生成生产者之前对其进行自定义。

 二、Receiving Messages

异步消息接收由StreamListenerContainer(以及使用@RabbitListener时的StreamRabbitListerContainerFactory)提供。

侦听器容器需要一个Environment以及一个流名称。

您可以使用经典的MessageListener接收Spring AMQP消息,也可以使用新接口接收本地流消息:

public interface StreamMessageListener extends MessageListener {void onStreamMessage(Message message, Context context);}

有关支持的属性的信息,请参阅消息侦听器容器配置。

与模板类似,容器具有ConsumerCustomizer属性。

有关自定义环境和使用者的信息,请参阅Java客户端文档。

使用@RabbitListener时,配置StreamRabbitListerContainerFactory;此时,大多数@RabbitListener属性(并发等)将被忽略。仅支持id、队列、autoStartup和containerFactory。此外,队列只能包含一个流名称。

三、Examples

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");template.setProducerCustomizer((name, builder) -> builder.name("test"));return template;
}@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {return new StreamRabbitListenerContainerFactory(env);
}@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {...
}@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);factory.setNativeListener(true);factory.setConsumerCustomizer((id, builder) -> {builder.name("myConsumer").offset(OffsetSpecification.first()).manualTrackingStrategy();});return factory;
}@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {...context.storeOffset();
}@Bean
Queue stream() {return QueueBuilder.durable("test.stream.queue1").stream().build();
}@Bean
Queue stream() {return QueueBuilder.durable("test.stream.queue2").stream().build();
}

2.4.5版将adviceChain属性添加到StreamListenerContainer(及其工厂)。还提供了一个新的工厂bean来创建一个无状态重试拦截器,该拦截器带有一个可选的StreamMessageRecoverer,用于在使用原始流消息时使用。

@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {StreamRetryOperationsInterceptorFactoryBean rfb =new StreamRetryOperationsInterceptorFactoryBean();rfb.setRetryOperations(retryTemplate);rfb.setStreamMessageRecoverer((msg, context, throwable) -> {...});return rfb;
}

四、Super Streams

超级流是分区流的抽象概念,通过将多个流队列绑定到具有参数x-Super-Stream:true的交换来实现。

1、调配

为了方便起见,可以通过定义类型为SuperStream的单个bean来提供超级流。

@Bean
SuperStream superStream() {return new SuperStream("my.super.stream", 3);
}

RabbitAdmin检测到这个bean,并将声明交换(my.super.stream)和3个队列(分区)-my.super-stream-n,其中n是0,1,2,绑定的路由密钥等于n。

如果您还希望通过AMQP向exchange 发布,您可以提供自定义路由密钥:

@Bean
SuperStream superStream() {return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i).mapToObj(j -> "rk-" + j).collect(Collectors.toList()));
}

key 的数量必须等于分区的数量。

2、向超级流生产消息

你必须向 RabbitStreamTemplate 添加一个 superStreamRoutingFunction

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");template.setSuperStreamRouting(message -> {// some logic to return a String for the client's hashing algorithm});return template;
}

你也可以通过AMQP发布,使用 RabbitTemplate

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/821042.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java-spring 图灵 04

在Spring框架中&#xff0c;可以使用org.springframework.core.io.support.ResourcePatternResolver接口的resolveBasePackage方法来将指定的基础包解析为用于包搜索路径的模式规范。 例如&#xff0c;如果基础包是com.example.app&#xff0c;则可以使用resolveBasePackage方法…

微信小程序-绘制图片并分享下载(painter)

1、引入painter插件 painter官网地址 1.1 可通过官网的方法引入painter插件&#xff0c; 官方插件下载地址 1.2 可下载本文附带的插件包直接引入 1.2.1 复制下载下来的文件中的painter文件夹&#xff0c;将其放在components目录下 1.2.2 页面中引入并使用 .json {"…

Mac版2024 CleanMyMac X 4.15.2 核心功能详解 cleanmymac这个软件怎么样?cleanmymac到底好不好用?

近些年伴随着苹果生态的蓬勃发展&#xff0c;越来越多的用户开始尝试接触Mac电脑。然而很多人上手Mac后会发现&#xff0c;它的使用逻辑与Windows存在很多不同&#xff0c;而且随着使用时间的增加&#xff0c;一些奇奇怪怪的文件也会占据有限的磁盘空间&#xff0c;进而影响使用…

如何在 VM 虚拟机中安装 OpenEuler 操作系统保姆级教程(附链接)

一、VMware Workstation 虚拟机 若没有安装虚拟机的可以参考下篇文章进行安装&#xff1a; 博客链接https://eclecticism.blog.csdn.net/article/details/135713915 二、OpenEuler 镜像 点击链接前往官网 官网 选择第一个即可 三、安装 OpenEuler 打开虚拟机安装 Ctrl …

家居网购项目(手写分页)

文章目录 1.后台管理—分页显示1.程序框架图2.编写数据模型Page.java 3.编写dao层1.修改FurnDao增加方法 2.修改FurnDaoImpl增加方法 3.单元测试FurnDaoTest 4.编写service层1.修改FurnService增加方法 2.修改FurnServiceImpl增加方法3.单元测试FurnServiceTest 5.编写DataUtil…

计算机系列之操作系统的系统

2、大话操作系统的启动 当按下开机键时&#xff0c;BIOS 就会开始执行 ​ BIOS 就是放在主板上 ROM 里面的一段程序。 ​ ROM Read Only Memory&#xff08;只能读取的内存&#xff09; ​ 所以 BIOS 在出厂的时候就可以直接写死在 ROM 里面。 ​ 每次开机的时候&#xff…

windows下已经创建好了虚拟环境,但是切换不了的解决方法

用得多Ubuntu&#xff0c;今天用Windows重新更新anaconda出问题&#xff0c;重新安装之后&#xff0c;打开pycharm发现打开终端之后&#xff0c;刚开始是ps的状态&#xff0c;后面试了网上改cmd的方法&#xff0c;终端变成c盘开头了 切换到虚拟环境如下&#xff1a;目前的shell…

ROS 2边学边练(26)-- 监测参数变化(C++)

前言 通常&#xff0c;一个节点需要对其自身参数或另一个节点的参数的更改做出响应。ParameterEventHandler类使监听参数更改变得容易&#xff0c;这样代码就可以对它们做出响应。 动动手 创建一个包 进入工作空间根路径的src下&#xff08;ros2_ws/src&#xff09;&#xff…

【Python基础】—— scipy.spatial.KDTree、matplotlib.pyplot、imageio

scipy.spatial参考博客&#xff1a;Python点云处理——建立KDtree 1 KDtree算法原理 KDtree构建出了一种类似于二叉树的树形数据存储结构&#xff0c;每一层都对应原始数据中相应的维度&#xff0c;以K层为一个循环&#xff0c;因此被称为KDtree。 每一层的左右子树的划分依据…

视频自定义字幕,中英文,彩色的,你也可以,不会不知道吧

前言 关于【SSD系列】&#xff1a; 前端一些有意思的内容&#xff0c;旨在3-10分钟里&#xff0c;有所获&#xff0c;又不为所累。 字幕&#xff0c;大家见过吧&#xff0c;其实你也可以&#xff0c;真的可以&#xff0c;真的真的可以。不难&#xff0c;不难&#xff0c;真的…

如何评估一个RAG(检索增强生成)系统

本文首发自博客文章 如何评估一个RAG&#xff08;检索增强生成&#xff09;系统 RAG 概念最初来源于 2020 年 Facebook 的一篇论文&#xff0c;这是 Facebook 博客对论文内容的进一步解释 &#x1f449;《检索增强生成&#xff1a;简化智能自然语言处理模型的创建》。大家都知…

【C++对于C语言的扩充】函数重载、引用以及内联函数

文章目录 &#x1f680;前言&#x1f680;函数重载注意&#xff1a;✈️为什么C可以实现函数重载&#xff0c;而C语言却不行呢&#xff1f; &#x1f680;引用✈️引用的特性✈️C中为什么要引入引用✈️引用与指针的区别 &#x1f680;内联函数✈️内联函数特性 &#x1f680;…

GitHub提交PR

本教程只做开源代码库Github工程提交pr的教程&#xff0c;不做其他的深入的讲解 Github和Gitlab的操作类似&#xff0c;只不过Github叫PR&#xff0c;GitLab叫MR&#xff0c;基本上做法是一致的 以开源项目QuickChat为例 https://github.com/Binx98/QuickChat https://github…

C++项目 -- 负载均衡OJ(一)comm

C项目 – 负载均衡OJ&#xff08;一&#xff09;comm 文章目录 C项目 -- 负载均衡OJ&#xff08;一&#xff09;comm一、项目宏观结构1.项目功能2.项目结构 二、comm公共模块1.util.hpp2.log.hpp 一、项目宏观结构 1.项目功能 本项目的功能为一个在线的OJ&#xff0c;实现类似…

研发岗-统信UOS系统配置npm git等前端常用配置

第一步 获取root权限 配置环境等都需要用到root权限&#xff0c;所以我们先获取到root权限&#xff0c;方便下面的操作 下载软件 在UOS应用商店下载的所需应用 版本都比较低 安装node 官网下载了【arm64】的包&#xff0c;解压到指定文件夹&#xff0c;设置链接&#xff0…

苹果电脑启动磁盘是什么意思 苹果电脑磁盘清理软件 mac找不到启动磁盘 启动磁盘没有足够的空间来进行分区

当你一早打开苹果电脑&#xff0c;结果系统突然提示&#xff1a; “启动磁盘已满&#xff0c;需要删除部分文件”。你会怎么办&#xff1f;如果你认为单纯靠清理废纸篓或者删除大型文件就能释放你的启动磁盘上的空间&#xff0c;那就大错特错了。其实苹果启动磁盘的清理技巧有很…

STM32之HAL开发——CubeMX配置串行Flash文件系统

配置流程 在开始配置FATFS前&#xff0c;需要提前配置好RCC的时钟&#xff0c;以及时钟的频率&#xff0c;另外还要配置好Debug选项&#xff08;选择串行&#xff09; 选项介绍 文件系统适用于SD卡&#xff0c;Disk磁盘等&#xff0c;需要我们将对应的驱动打开才可以使用。 …

【vue】Pinia-2 安装Pinia,使用store

1. 安装Pinia 在项目路径下执行npm install pinia 在package.json中查看 2. 使用store 在main.js中添加 import { createPinia } from pinia const pinia createPinia()修改createApp方法 最后示例如下&#xff08;三处修改&#xff09; import { createApp } from vue //…

SimpleImputer缺失数据处理报错解决方案

作者Toby&#xff0c;来源公众号&#xff1a;Python风控建模&#xff0c;SimpleImputer缺失数据处理报错解决方案 今天有学员反馈缺失值代码报错&#xff0c;由于sklearn缺失值处理的包升级&#xff0c;下面把官网最新的缺失值处理代码奉上。 参考https://scikit-learn.org/st…

请把「睡一个好觉」,当成一天里最重要的事来管理

我发现许多人都有这么一种情况&#xff1a;明明知道睡眠很重要&#xff0c;但却总是有意无意地熬夜。 比如&#xff1a; 给自己排了太多的学习和工作量&#xff0c;一不小心就到了凌晨一两点&#xff1b; 总觉得过去的一天什么都没干&#xff0c;宁愿在网上闲逛&#xff0c;也不…