RocketMq多环境自动隔离

一、多环境隔离场景

        当多个环境使用同一套rocketmq的服务的时候,如果不对环境进行隔离,将会导致消息被错误的环境消费,因此可以采用两种方式进行隔离。

方式1
        通过$Value注入,但是这个需要每个环境都维护自己的topic等信息,比较麻烦。

方式2:

        通过环境配置进行自动隔离,比如dev、test、pre、prod等不同环境只需要简单配置一个选项,所有的消息将被自动隔离,这样各个环境共用一套rocketmq服务即可,不需要分环境搭建,无论开发、测试都非常简便,整个公司可以共用一套。

二、多环境自动隔离的原理

        多环境隔离,利用BeanPostProcessor的postProcessBeforeInitialization在监听器实例初始前把对应topic、consumerGroup进行修改,发送消息的时候,也根据环境进行区分要发到那个环境的topic和consumerGroup上去。

三、代码示例

  3.1、增加配置文件

# 自定义属性
rocketmq:environment:# 隔离环境名称,拼接到topic后,xxx_topic_pre,默认空字符串;# 也可根据spring.profiles.active的值# name: pre# 启动隔离,会自动在topic上拼接激活的配置文件,达到自动隔离的效果# 默认为true,配置类:RocketMqEnvirIsolationConfigisolation: true

  3.2、编写配置类

   原理:

      Spring生命周期中BeanPostProcessor在类初始化前,执行postProcessBeforeInitialization中的内容。使用@Component注解,用来把该类扫描到spring容器中进行统一管理。这要就可以在类初始化前,把监听器类的topic/group/tag等修改成自己想要的,然后实例化的时候用的就是改后值。

import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.NonNull;
import org.springframework.util.StringUtils;/*** RocketMQ多环境隔离配置* 原理:对于每个配置的Bean在实例化前,拿到Bean的监听器注解把group或者topic改掉*/
@Configuration
public class RocketMqEnvirIsolationConfig implements BeanPostProcessor {@Value("${rocket.environment.isolation:true}")private boolean enabledIsolation;//Springboot的使用的环境dev、test、pre、prod@Value("${spring.prifiles.active}")private String environmentName;@Overridepublic Object postProcessBeforeInitialization(@NonNull Object bean,@NonNull String beanName) throws BeansException {// DefaultRocketMQListenerContainer是监听器实现类if (bean instanceof DefaultRocketMQListenerContainer) {DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;// 开启消息隔离情况下获取隔离配置,此处隔离topic,根据自己的需求隔离group或者tagif (enabledIsolation && StringUtils.hasText(environmentName)) {container.setTopic(String.join("_", container.getTopic(), environmentName));container.setConsumerGroup(String.join("_", container.getConsumerGroup(), environmentName));}return container;}return bean;}
}

 3.3、生产者发送消息

import com.alibaba.fastjson.JSONObject;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.constant.RocketMqDelayLevel;
import com.codecoord.rocketmq.domain.RocketMqMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.LocalDateTime;@RestController
@RequestMapping("/rocketmq")
@Slf4j
public class RocketMqController {@Resource(name = "rocketMQTemplate")private RocketMQTemplate rocketMqTemplate;@Value("${spring.profiles.active}")private String environmentName;@Value("${rocketmq.isolation:true}")private boolean enabledIsolation;@GetMapping("/sendMsg")public String sendMessage() {String destination = "user_audit_queue";//是否租户隔离 if(enabledIsolation){destination = String.join("_", destination , environmentName);}RocketMqMessage message = new RocketMqMessage();message.setId(System.currentTimeMillis());message.setMessage("这是一个测试消息!");message.setCurrentDate(LocalDate.now()); // Java时间字段需要单独处理,否则会序列化失败message.setCurrentDateTime(LocalDateTime.now());message.setVersion("1.0");//1、发送同步消息,消息成功发送到Broker时才返回,message可以入参批量消息// 通过SendResult来处理发送结果// SendResult sendResult = rocketMqTemplate.syncSend(destination, message);/// 发送时指定业务key/*Message<RocketMqMessage> buildMessage = MessageBuilder.withPayload(message)// 设置keys.setHeader(RocketMQHeaders.KEYS, message.getId()).build();SendResult sendResult = rocketMqTemplate.syncSend(destination, buildMessage);*///2、 发送延迟消息Message<RocketMqMessage> buildMessage = MessageBuilder.withPayload(message).build();SendResult sendResult = rocketMqTemplate.syncSend(destination, buildMessage, 3000, RocketMqDelayLevel.FIVE_SECOND);//3、发送同步有序消息,需要指定hashKey,可以用业务唯一键// rocketMqTemplate.syncSendOrderly(destination, message, message.getId().toString());//4、发送异步消息,消息发送后及时返回,然后通过回调方法通知// rocketMqTemplate.asyncSend(destination, message, new SendCallback() {//     @Override//     public void onSuccess(SendResult sendResult) {//         log.info("消息发送成功【{}】", JSONObject.toJSONString(sendResult));//     }////     @Override//     public void onException(Throwable e) {//         log.error("消息发送失败【{}】", e.getMessage());//     }// });//5、 发送异步有序消息,需要指定hashKey,可以用业务唯一键// rocketMqTemplate.asyncSendOrderly(destination, message, message.getId().toString(), new SendCallback() {//     @Override//     public void onSuccess(SendResult sendResult) {//         log.info("消息发送成功【{}】", JSONObject.toJSONString(sendResult));//     }////     @Override//     public void onException(Throwable e) {//         log.error("消息发送失败【{}】", e.getMessage());//     }// });//6、 发送单向消息// rocketMqTemplate.sendOneWay(destination, message);//7、 发送单向有序消息,通过MessageBuilder构建// Message<RocketMqMessage> buildMessage = MessageBuilder.withPayload(message).build();// rocketMqTemplate.sendOneWayOrderly(destination, buildMessage, message.getId().toString());//8、发送和接收回调消息,需要实现 RocketMQReplyListener 监听器类才可以,否则将会超时错误// rocketMqTemplate.sendAndReceive(destination, message, new RocketMQLocalRequestCallback<String>() {//     @Override//     public void onSuccess(String message) {//         log.info("消息发送成功,消息类型【{}】", message);//     }////     @Override//     public void onException(Throwable e) {//         log.error("消息发送失败", e);//     }// });//9、 调用抽象类方法发送,最终也是syncSend// rocketMqTemplate.convertAndSend(destination, "convertAndSend");//10、转换消息和发送,底层使用的是syncSend(destination, message),将会被RocketEntityMessageListener消费// Message<RocketMqMessage> buildMessage = MessageBuilder.withPayload(message)//         // 设置请求头//         .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE)//         .build();// 将会被RocketEntityMessageListener03消费// Message<Object> buildMessage = MessageBuilder.withPayload(new Object()).build();// rocketMqTemplate.send(destination, buildMessage);//11、 发送批量消息,批量消息最终转为单挑进行发送// List<Message<String>> msgList = new ArrayList<>();// for (int i = 0; i < 10; i++) {//     msgList.add(MessageBuilder.withPayload("消息:" + i).build());// }// rocketMqTemplate.syncSend(destination, msgList);return message;}/*** 直接将对象进行传输,也可以自己进行json转化后传输*/@RequestMapping("/messageExt/message")public SendResult convertAndSend() {String destination = "user_audit_topic";JSONObject jsonObject = new JSONObject();jsonObject.put("type", "messageExt");return rocketMqTemplate.syncSend(destination, jsonObject);}
}

3.4、消费者监听消息

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.jeecg.common.base.BaseMap;@Component
@RocketMQMessageListener(topic = "user_audit_queue",consumerGroup = "user_audit_queue")
@Slf4j
public class UserAuditQueueListener implements RocketMQListener<BaseMap> {@Overridepublic void onMessage(BaseMap baseMap) {//消费者监听到消息进行消费....}}

四、消息中时间类型的支持

RocketMQ内置使用的转换器是RocketMQMessageConverter中MessageConverterConfiguration方法,转换JSON时使用的是MappingJackson2MessageConverter,但是其不支持Java的时间类型,比如LocalDate、Date等,当消息实体中存在上面的时间类型字段时将会报以下错误:

java.lang.RuntimeException: cannot convert message to class com.codecoord.rocketmq.domain.RocketMqMessageat org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.doConvertMessage(DefaultRocketMQListenerContainer.java:486) ~[rocketmq-spring-boot-2.2.1.jar:2.2.1]at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.handleMessage(DefaultRocketMQListenerContainer.java:399) ~[rocketmq-spring-boot-2.2.1.jar:2.2.1]at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.access$100(DefaultRocketMQListenerContainer.java:71) ~[rocketmq-spring-boot-2.2.1.jar:2.2.1]at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer$DefaultMessageListenerConcurrently.consumeMessage(DefaultRocketMQListenerContainer.java:359) ~[rocketmq-spring-boot-2.2.1.jar:2.2.1]at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:392) [rocketmq-client-4.9.1.jar:4.9.1]at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_231]at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_231]at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_231]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_231]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_231]at java.lang.Thread.run(Thread.java:748) [na:1.8.0_231]
Caused by: org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Expected array or string.

 

     所以需要自定义消息转换器,将MappingJackson2MessageConverter进行替换,然后添加支持时间模块,代码如下:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;import java.util.List;/*** 序列化器处理*/
@Configuration
public class RocketMqConfig {/*** 解决RocketMQ Jackson不支持Java时间类型配置*/@Bean@Primarypublic RocketMQMessageConverter createRocketMQMessageConverter() {RocketMQMessageConverter converter = new RocketMQMessageConverter();CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();for (MessageConverter messageConverter : messageConverterList) {if (messageConverter instanceof MappingJackson2MessageConverter) {MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();objectMapper.registerModules(new JavaTimeModule());}}return converter;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/854022.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LabVIEW与C#的区别及重新开发自动测试程序的可行性分析

LabVIEW和C#是两种广泛使用的编程语言&#xff0c;各自有不同的应用领域和特点。本文将详细比较LabVIEW与C#在自动测试程序开发中的区别&#xff0c;并分析将已完成的LabVIEW自动测试程序重新用C#开发的合理性。本文帮助评估这种转换的必要性和潜在影响。 LabVIEW与C#的区别 开…

C++编程:vector容器的简单模拟实现

前言&#xff1a; 在C标准库&#xff08;STL&#xff09;中&#xff0c;vector容器是最常见使用的动态数组。它结合了链表与数组的优点&#xff0c;提供了灵活的大小调整与高效的随机访问。本文将简单的对vector容器进行介绍并且对vector容器简单的模拟实现。 一、vector的文…

Mybatis的面试题

1. 什么是一级缓存什么是二级缓存&#xff1f; MyBatis是一款优秀的持久层框架&#xff0c;它提供了一级缓存和二级缓存来提高数据库访问性能。 一级缓存 一级缓存是指在同一个SqlSession中进行的缓存。当MyBatis执行查询时&#xff0c;查询结果会被缓存在SqlSession的内存中…

uniapp实现路由拦截——实战案例(二)

uniapp如何实现登录路由拦截&#xff1f; 今天再次介绍一下 uni-simple-router 插件&#xff0c;记得最初使用时&#xff0c;是在三年以前了&#xff0c;这里简单介绍通过自动读取 pages.json 作为路由表的方式&#xff0c;欢迎指教~ 文章目录 uniapp如何实现登录路由拦截&…

LangChain入门学习笔记(二)——LangChain表达式语言(LCEL)

基于LangChain框架编写大模型应用的过程就像垒积木&#xff0c;其中的积木就是Prompts&#xff0c;LLMs和各种OutputParser等。如何将这些积木组织起来&#xff0c;除了使用基本Python语法调用对应类的方法&#xff0c;一种更灵活的方法就是使用位于LangChain-Core层中的LCEL&a…

SwiftUI 6.0(Xcode 16)全新 @Entry 和 @Previewable 宏让开发妙趣横生

概览 如火如荼的 WWDC 2024 已进入第五天&#xff0c;苹果开发平台中众多海量新功能都争先恐后的喷薄欲出。 在这里就让我们从中挑两个轻松有趣的新功能展示给小伙伴们吧&#xff1a;它们分别是 全新的 Entry 和 Previewable 宏。 在本篇博文中&#xff0c;您将学到如下内容&a…

dayjs将星期的第一天设置为周一

默认引入的dayjs的语言是英文&#xff0c;一周的开始是星期日&#xff0c;当使用dayjs().startOf(week)的时候&#xff0c;就不是周一而是周日了。 import dayjs from "dayjs" import "dayjs/locale/zh-cn" import updateLocale from "dayjs/plugin/…

【Python】 探索 CatBoost:高效的机器学习分类与回归工具

我们都找到天使了 说好了 心事不能偷藏着 什么都 一起做 幸福得 没话说 把坏脾气变成了好沟通 我们都找到天使了 约好了 负责对方的快乐 阳光下 的山坡 你素描 的以后 怎么抄袭我脑袋 想的 &#x1f3b5; 薛凯琪《找到天使了》 在机器学习领域中&#xff…

【C++ 11 新特性】lambda 表达式详解

文章目录 1. 常见 lambda 面试题&#x1f58a; 1. 常见 lambda 面试题&#x1f58a; &#x1f34e;① 如果⼀个 lambda 表达式作为参数传递给⼀个函数&#xff0c;那这个函数可以使⽤这个 lambda 表达式捕获的变量吗 ? &#x1f427; 函数本身无法直接访问到 lambda表达式捕获…

vue3实现表格的分页以及确认消息弹窗

表格的分页实例展示 效果1:表格按照每行10条数据分页,且编号也会随之分页自增 实现按照页码分页效果 第二页 展示编号根据分页自动增长 固定表格高度 这边设置了滚动条,同时表格高度实现自适应滚动条高度 template部分 表格代码 编号是按照页码条数进行循环并根据索引自增…

力扣191. 位1的个数

Problem: 191. 位1的个数 文章目录 题目描述思路复杂度Code 题目描述 思路 题目规定数值的范围不会超过32位整形数 1.定义统计个数的变量oneCount&#xff1b;由于每次与给定数字求与的变量mask初始化为1 2.for循环从0~32&#xff0c;每一次拿mask与给定数字求与运算&#xff…

【Linux应用】Linux系统的设备管理——Udev

1.udev概述 udev是 Linux2.6内核里的一个功能&#xff0c;它替代了原来的 devfs&#xff0c;成为当前 Linux 默认的设备管理工具&#xff0c;能够根据系统中的硬件设备的状态动态更新设备文件&#xff0c;包括设备文件的创建&#xff0c;删除等。 udev以守护进程的形式运行&am…

YOLOv10的使用总结

目录 YOLOv10介绍 部署和使用示例 微调训练 YOLO模型因其在计算成本和检测性能之间的平衡而在实时目标检测中很受欢迎。前几天YOLOv10也刚刚发布了。我们这篇文章就来看看YOLOv10有哪些改进&#xff0c;如何部署&#xff0c;以及微调。 概述 实时物体检测旨在以较低的延迟准…

JC/T 2653-2022 不发火砂浆检测

不发火砂浆是指当材料与金属等坚硬物发生摩擦、冲击或冲擦等机械作用时&#xff0c;不产生火花或火星的砂浆。 JC/T 2653-2022 不发火砂浆检测项目&#xff1a; 测试项目 测试方法 外观 JC/T 2653 抗压强度 GB/T 17671 抗折强度 GB/T 17671 保水率 JGJ/T 70 凝结时间…

23.1 时间-获取时间、休眠、超时

1. 获取时间 时间是个重要的编程元素&#xff0c;可用于计算间隔、同步服务器以及控制超时。 计算机中的时间分为以下两种形式&#xff1a; 墙钟时间&#xff1a;以12或24小时为周期不断重复&#xff0c;不同的地区和季节会因时区或夏令时而异。单调时间&#xff1a;从一个时…

CSS 实现电影信息卡片

CSS 实现电影信息卡片 效果展示 CSS 知识点 CSS 综合知识运用 页面整体布局 <div class"card"><div class"poster"><img src"./poster.jpg" /></div><div class"details"><img src"./avtar…

这 10 种架构师,不合格!

大家好&#xff0c;我是君哥。 架构师这个岗位是好多程序员努力的方向&#xff0c;尤其是刚毕业的时候&#xff0c;对架构师有一种崇拜感。毕竟从初级到架构要经历好几次级别飞跃。 工作时间久了&#xff0c;发现架构师这个岗位&#xff0c;其实定义非常广泛&#xff0c;根据工…

工具清单 - 项目管理

# 工具清单 Bonobo Git Server在新窗口打开 - Set up your own self hosted git server on IIS for Windows. Manage users and have full control over your repositories with a nice user friendly graphical interface. (Source Code在新窗口打开) MIT C#Fossil在新窗口打…

windows用脚本编译qt的项目

mingw的 cd build ::设置jom环境 set PATHC:\Qt\Qt5.15.2\Tools\mingw810_32\bin;%PATH% set PATHC:\Qt\Qt5.15.2\5.15.2\mingw81_32\bin;%PATH% ::设置Qt环境 amd64_x86 或者 amd64 ::CALL "D:\Program Files (x86)\Microsoft Visual Studio\2017\Enterprise\VC\Auxilia…

Element-ui中Table表格无法显示

Element-ui中Table表格无法显示 在使用过程中发现样式正常显示但是table就是不显示&#xff0c;研究了一段时间后&#xff0c;发现问题是项目结构的问题 当你创建vue和安装el的时候&#xff0c;一定要注意进入到正确的项目文件夹&#xff0c;如果在外面也出现一个package.jso…