RocketMQ 09 SpringBoot 整合

RocketMQ 09 SpringBoot 整合

目前还没有官方的starter

pom.xml

		<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-common</artifactId><version>4.6.1</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.6.1</version></dependency>

Producer配置

Config配置类

用于在系统启动时初始化producer参数并启动

package com.mashibing.rmq.controller;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MQConfig {public static final Logger LOGGER = LoggerFactory.getLogger(MQConfig.class);@Value("${rocketmq.producer.groupName}")private String groupName;@Value("${rocketmq.producer.namesrvAddr}")private String namesrvAddr;@Beanpublic DefaultMQProducer getRocketMQProducer() {DefaultMQProducer producer;producer = new DefaultMQProducer(this.groupName);producer.setNamesrvAddr(this.namesrvAddr);try {producer.start();System.out.println("start....");LOGGER.info(String.format("producer is start ! groupName:[%s],namesrvAddr:[%s]", this.groupName,this.namesrvAddr));} catch (MQClientException e) {LOGGER.error(String.format("producer is error {}", e.getMessage(), e));}return producer;}
}

Service消息发送类

package com.mashibing.rmq.service;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MQService {@AutowiredDefaultMQProducer producer;public Object sendMsg(String string) {for (int i = 0; i < 1; i++) {Message message = new Message("tpk02", "xx".getBytes());try {return producer.send(message);} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();} }return null;}
}

配置文件

spring.application.name=mq01
rocketmq.producer.namesrvAddr=192.168.150.131:9876
rocketmq.producer.groupName=${spring.application.name}
server.port=8081

Consumer配置

Config配置类

package com.mashibing.rmq.controller;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MQConfig {public static final Logger logger = LoggerFactory.getLogger(MQConfig.class);@Value("${rocketmq.consumer.namesrvAddr}")private String namesrvAddr;@Value("${rocketmq.consumer.groupName}")private String groupName;@Value("${rocketmq.consumer.topics}")private String topics;@Beanpublic DefaultMQPushConsumer getRocketMQConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);consumer.setNamesrvAddr(namesrvAddr);consumer.subscribe(topics, "*");consumer.registerMessageListener(new MyMessageListener() );consumer.start();return consumer;}
}

消息处理类

package com.mashibing.rmq.controller;import java.util.List;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;public class MyMessageListener implements MessageListenerConcurrently {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.println("来啦!!22!");for (MessageExt msg : msgs) {System.out.println(new String(msg.getBody()));;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
}

配置文件

spring.application.name=mq02
rocketmq.producer.namesrvAddr=192.168.150.131:9876
rocketmq.producer.groupName=${spring.application.name}rocketmq.consumer.topics=tpk02

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/819984.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

视频怎么用二维码来分享?手机扫码看视频更方便

怎么把制作的视频快速以二维码的形式分享给其他人呢&#xff1f;视频是很多内容重要的一种展示方式&#xff0c;为了能够让其他人更加方便的查看视频内容&#xff0c;现在很多人会选择视频转二维码的方式来提供预览。将视频存储到云端然后通过扫码来调取&#xff0c;不仅减少了…

DHCP小实验

实验要求&#xff1a; 看拓扑有两个网段则我们首先需要对200.1.1.0/26进行子网划分&#xff0c;划分为两个子网&#xff0c;为200.1.1.0/27和200.1.1.32/27 我门就可以一边一个网段了&#xff0c;左边为200.1.1.0/27&#xff0c;右边为200.1.1.32/27 1、配置PC1&#xff0c;2…

2024蓝桥杯——宝石问题

先展示题目 声明 以下代码仅是我的个人看法&#xff0c;在自己考试过程中的优化版&#xff0c;本人考试就踩了很多坑&#xff0c;我会—一列举出来。代码可能很多&#xff0c;但是总体时间复杂度不高只有0(N) 函数里面的动态数组我没有写开辟判断和free&#xff0c;这里我忽略…

MySQL——基础

SQL 全称 Structured Query Language&#xff0c;结构化查询语言。操作关系型数据库的编程语言&#xff0c;定义了一套操作关系型数据库统一标准 。 SQL 通用语法 SQL语句可以单行或多行书写&#xff0c;以分号结尾。SQL语句可以使用空格/缩进来增强语句的可读性。MySQL数据库…

一文速览铁威马TOS 6全新“文件管理”

TOS 6 Beta已经上线一段时间了&#xff0c;各位铁粉用着怎么样呢&#xff1f;今天就和大家分享&#xff0c;TOS 6全新文件管理。 为了向用户提供更流畅、更便捷的文件管理体验&#xff0c;铁威马的研发团队积极借鉴了Windows OS和Mac OS在文件管理方面的优点&#xff0c;投入巨…

【echarts】echarts入门教程,学会如何编写echarts代码

echarts模板 使用&#xff01;为html来创建一个模板。 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><titl…

Open3D (C++) 点云投影至主成分空间

目录 一、算法原理二、代码实现三、结果展示四、相关连接Open3D (C++) 点云投影至主成分空间由CSDN点云侠原创,爬虫自重。如果你不是在点云侠的博客中看到该文章,那么此处便是不要脸的爬虫。 一、算法原理 p r o j

原始部落版本潮玩宇宙小程序定制大逃杀游戏APP开发H5游戏

原始部落版本潮玩宇宙小程序定制大逃杀游戏APP开发H5游戏 潮玩宇宙小程序定制大逃杀游戏APP开发H5游戏 潮玩宇宙大逃杀小游戏模块成品源码&#xff0c;可嵌入任何平台系统&#xff0c;增加用户粘性&#xff0c;消除泡沫&#xff0c;短视频直播引流。 玩家选择一间房间躲避杀手…

第二期书生浦语大模型训练营第五次作业

部署LMDeploy并对话 配置LMDeploy运行环境 安装好环境&#xff0c;并成功激活 使用transformer运行大模型 使用LMDeploy模型量化(lite) KV8量化和W4A16量化。KV8量化是指将逐 Token&#xff08;Decoding&#xff09;生成过程中的上下文 K 和 V 中间结果进行 INT8 量化&#…

【安装部署】Apache SeaTunnel 和 Web快速安装详解

版本说明 由于作者目前接触当前最新版本为2.3.4 但是官方提供的web版本未1.0.0&#xff0c;不兼容2.3.4&#xff0c;因此这里仍然使用2.3.3版本。 可以自定义兼容处理&#xff0c;官方提供了文档&#xff1a;https://mp.weixin.qq.com/s/Al1VmBoOKu2P02sBOTB6DQ 因为大部分用…

引领智能互联时代,紫光展锐赋能百业创新发展

随着5G技术的快速发展&#xff0c;各行各业对通信技术的需求也在不断升级。紫光展锐持续深耕5G垂直行业&#xff0c;不断推进5G标准演进&#xff0c;从R15到R16&#xff0c;再到R17&#xff0c;展锐携手生态合作伙伴&#xff0c;不断推出创新性解决方案&#xff0c;在5G RedCap…

【Unity】RPG小游戏创建游戏中的交互

RPG小游戏创建游戏中的交互 创建可交互的物体的公共的父类&#xff08;Interactable&#xff09;InteractableObject 类NPCObject 类PickableObject 类 创建可交互的物体的公共的父类&#xff08;Interactable&#xff09; InteractableObject 类 using System.Collections; u…

【攻防世界】warmup

[HCTF 2018]WarmUp全网最详细解释_[hctf 2018]warmup的解-CSDN博客 php://filter 读取源码&#xff08;文件&#xff09; php://input 执行php代码&#xff0c;需要post请求提交数据 Content-Type为image/jpeg text. 绕过后缀的有文件格式有php,php3,php4,php5,pht…

【Unity 实用工具篇】 | UIEffect 实现一系列UGUI特效,灰度、负片、像素化特效

前言 【Unity 实用工具篇】 | UIEffect 实现一系列UGUI特效&#xff0c;灰度、负片、像素化特效一、UGUI特效插件&#xff1a;UIEffect1.1 介绍1.2 效果展示1.3 使用说明及下载 二、组件属性面板三、代码操作组件四、组件常用方法示例4.1 使用灰度特效做头像(关卡)选择 总结 前…

03-JAVA设计模式-迭代器模式

迭代器模式 什么是迭代器模式 迭代器模式&#xff08;demo1.Iterator Pattern&#xff09;是Java中一种常用的设计模式&#xff0c;它提供了一种顺序访问一个聚合对象中各个元素&#xff0c;而又不需要暴露该对象的内部表示的方法。迭代器模式将遍历逻辑从聚合对象中分离出来…

IP地址归属地与旅游业应用

在当今数字化时代&#xff0c;IP地址归属地已成为许多行业的重要工具&#xff0c;其中包括旅游业。IP地址归属地是指将特定IP地址与其地理位置相关联的过程。在旅游业中&#xff0c;利用IP地址归属地可以提供多种应用&#xff0c;从客户定位到个性化推广&#xff0c;以及旅游数…

树--排序二叉树的删除

一、二叉排序树的删除 二叉排序树的删除情况比较复杂&#xff0c;有以下三种情况需要考虑。 删除叶子节点 &#xff08;比如&#xff1a;2,5,9,10&#xff09;删除只有一个子树的节点&#xff08;比如&#xff1a;1&#xff09;删除有两个子树的节点 &#xff08;比如&#x…

YAPI第一次创建项目

黑马程序员JavaWeb开发教程 文章目录 1、添加项目2、添加分类3、添加接口 1、添加项目 2、添加分类 3、添加接口

数据结构:线性表————单链表专题

&#x1f308;个人主页&#xff1a;小新_- &#x1f388;个人座右铭&#xff1a;“成功者不是从不失败的人&#xff0c;而是从不放弃的人&#xff01;”&#x1f388; &#x1f381;欢迎各位→点赞&#x1f44d; 收藏⭐️ 留言&#x1f4dd; &#x1f3c6;所属专栏&#xff1…

STM32的GPIO端口的八种模式解析

目录 STM32的GPIO端口的八种模式解析 一、上拉输入模式 二、下拉输入模式 三、浮空输入模式 四、模拟输入模式 五、推挽输出模式 六、开漏输出模式 七、复用推挽输出模式 八、复用开漏输出模式 STM32的GPIO端口的八种模式解析 在学习STM32的过程中&#xff0c;GPIO端口…