RocketMQ的简单使用

这里需要创建2.x版本的springboot项目 

导入依赖

    <dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.6</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency></dependencies>

定义配置文件

server:port: 3000rocketmq:name-server: xxx.xxx.xxx.xxx:9876  # NameServer 地址producer:group: rocketmq-4x-service_common-message-execute_pg # 全局发送者组定义

生产者定义

这里的生产者有两个,一个是普通的,一个是延时。

import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.yhy.MessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.messaging.support.MessageBuilder;@Component
@Slf4j
public class GeneralMessageDemoProduce {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public SendResult sendMessage(String topic, String tag, String keys, MessageEvent messageSendEvent) {SendResult sendResult;try{StringBuilder destinationBuilder = StrUtil.builder().append(topic);if(StrUtil.isNotBlank(tag)){destinationBuilder.append(":").append(tag);}Message<?> message = MessageBuilder.withPayload(messageSendEvent).setHeader(MessageConst.PROPERTY_KEYS,keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();// 设置消息的延时级别sendResult=rocketMQTemplate.syncSend(destinationBuilder.toString(),message,2000L);log.info("[普通消息] 消息发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);}catch(Throwable ex){log.error("[普通消息] 消息发送失败,消息体:{}", JSON.toJSONString(messageSendEvent), ex);throw ex;}return sendResult;}
}

延时的

@Component
@Slf4j
public class ScheduleProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public SendResult sendMessage(String topic, String tag, String keys, MessageEvent messageSendEvent ) {SendResult sendResult;try {StringBuilder destinationBuilder = StrUtil.builder().append(topic);if(StrUtil.isNotBlank(tag)){destinationBuilder.append(":").append(tag);}Message<?> message = MessageBuilder.withPayload(messageSendEvent).setHeader(MessageConst.PROPERTY_KEYS,keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();// 设置消息的延时级别sendResult=rocketMQTemplate.syncSend(destinationBuilder.toString(),message,2000L,6);log.info("[延时消息] 消息发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);}catch(Throwable ex){log.error("[延时消息] 消息发送失败,消息体:{}", JSON.toJSONString(messageSendEvent), ex);throw ex;}return sendResult;}
}

消费者定义

这里也是两个消费者,普通的和延时的不在同一个主题的内

@Slf4j
@Component
@RocketMQMessageListener(topic = "rocketmq-yhy_topic",selectorExpression = "general",consumerGroup = "rocketmq-demo_general-message_cg"
)
public class GeneralMessageDemoConsume implements RocketMQListener<MessageEvent> {@Overridepublic void onMessage(MessageEvent message) {log.info("接到RocketMQ消息,消息体:{}", JSON.toJSONString(message));}
}
import com.alibaba.fastjson.JSON;
import com.yhy.MessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(topic = "Delay",selectorExpression = "general",consumerGroup = "rocketmq-demo_general-message_cg"
)
public class GeneralMessageDemoConsume_Delay implements RocketMQListener<MessageEvent> {@Overridepublic void onMessage(MessageEvent message) {log.info("接到RocketMQ的延时消息,消息体:{}", JSON.toJSONString(message));}
}

发送消息

这里直接在启动类发送。

@SpringBootApplication
@RestController
public class RocketMQDemoApplication {@Autowiredprivate GeneralMessageDemoProduce generalMessageDemoProduce;@Autowiredprivate ScheduleProducer scheduleProducer;@PostMapping("/test/send/general-message")public String sendGeneralMessage() {String keys= UUID.randomUUID().toString();MessageEvent messageEvent=new MessageEvent("消息具体内容——yhy",keys);SendResult sendResult=generalMessageDemoProduce.sendMessage("rocketmq-yhy_topic","general",keys,messageEvent);SendResult sendResult2=scheduleProducer.sendMessage("Delay","general",keys,messageEvent);System.out.println(sendResult.getSendStatus().name() );System.out.println(sendResult2.getSendStatus().name());return sendResult.getSendStatus().name();}public static void main(String[] args) {SpringApplication.run(RocketMQDemoApplication.class, args);}
}

postman触发

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/797231.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于SSM+Jsp+Mysql的人事管理系统

开发语言&#xff1a;Java框架&#xff1a;ssm技术&#xff1a;JSPJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包…

CV最新论文|4月5日 arXiv更新论文合集

以下内容由马拉AI整理&#xff0c;今天为大家带来4月5日 arXiv 计算机视觉和模式识别相关论文&#xff1a; 1、Know Your Neighbors: Improving Single-View Reconstruction via Spatial Vision-Language Reasoning 了解你的邻居&#xff1a;通过空间视觉-语言推理改进单视图…

深入理解JVM的内存结构及GC机制(2)

虚拟机栈占用的是操作系统内存&#xff0c;每个线程对应一个虚拟机栈&#xff0c;它是线程私有的&#xff0c;生命周期和线程一样&#xff0c;每个方法被执行时产生一个栈帧&#xff08;Statck Frame&#xff09;&#xff0c;栈帧用于存储局部变量表、动态链接、操作数和方法出…

大语言模型落地的关键技术:RAG

1、什么是RAG&#xff1f; RAG 是检索增强生成&#xff08;Retrieval-Augmented Generation&#xff09;的简称&#xff0c;是当前最火热的大语言模型应用落地的关键技术&#xff0c;主要用于提高语言模型的效果和准确性。它结合了两种主要的NLP方法&#xff1a;检索&#xff…

Anaconda 安装pytorch 问题

问题 clobbererror: this transaction has incompatible packages due to a shared path. packages: nvidia/win-64::cuda-cupti-11.8.87-0, nvidia/win-64::cuda-nvtx-11.8.86-0 path: ‘metadata_conda_debug.yaml’ 打开 cmd 输入 nvida-smi &#xff0c;可以看见本机的NI…

post请求搜索功能爬虫

<!--爬虫仅支持1.8版本的jdk--> <!-- 爬虫需要的依赖--> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.2</version> </dependency>…

2023年下半年网络工程师上午真题及答案解析

1.当计算机突然断电时&#xff0c;( )中存储的信息会丢失。 A.光盘 B.ROM C.RAM D.硬盘 2.进程的状态有就绪态、运行态、阻塞态&#xff0c;其中( )的变化是不可能直接发生的。 A.就绪态到运行态 B.阻塞态到就绪态 C.运行态到阻塞态 D.阻塞态到运行态 3.分…

老板们注意了,AI可能在悄悄威胁你的工作

前天,科技新闻大佬The Register发了一篇文章,说的是AI在科研领域的管理角色越来越大,可能会让管理岗位变得过时,听起来是不是有点儿疯狂? ESMT Berlin的研究小伙伴们发现,AI能够以更大的规模和效率来管理研究项目,比如审查科学文献和预测创新化合物等等,而不是取代人类…

docker用来解决什么问题

2024年4月6日&#xff0c;周六下午 Docker用于解决软件开发、部署和运行过程中的一系列问题&#xff0c;包括但不限于以下几点&#xff1a; 环境一致性问题&#xff1a;在软件开发和部署过程中&#xff0c;由于不同环境的配置差异&#xff0c;经常会出现“在我的电脑上可以运行…

漂亮国的无人餐厅的机器人骚操作

导语 大家好&#xff0c;我是智能仓储物流技术研习社的社长&#xff0c;你的老朋友&#xff0c;老K。行业群 新书《智能物流系统构成与技术实践》 知名企业 读者福利&#xff1a; &#x1f449;抄底-仓储机器人-即买即用-免调试 智能制造-话题精读 1、西门子、ABB、汇川&#x…

线性结构与非线性结构

线性结构与非线性结构 数据结构包括:线性结构和非线性结构。 线性结构 1)线性结构作为最常用的数据结构&#xff0c;其特点是数据元素之间存在一对一的线性关系。 2)线性结构有两种不同的存储结构&#xff0c;即顺序存储结构和链式存储结构。 顺序存储的线性表称为顺序表&a…

react api:createContext

使用 createContext 创建组件能够提供与读取的 上下文&#xff08;context&#xff09;。 ** const SomeContext createContext(defaultValue) 在任意组件外调用 createContext 创建一个上下文。 import { createContext } from ‘react’; const ThemeContext createConte…

P2036 [COCI2008-2009 #2] PERKET(DFS)

# [COCI2008-2009 #2] PERKET ## 题目描述 Perket 是一种流行的美食。为了做好 Perket&#xff0c;厨师必须谨慎选择食材&#xff0c;以在保持传统风味的同时尽可能获得最全面的味道。你有 n 种可支配的配料。对于每一种配料&#xff0c;我们知道它们各自的酸度 s 和苦度 b。…

深入探讨string类的奥秘

标题&#xff1a;深入探索C String类的奥秘 一、String类简介 在C编程中&#xff0c;字符串处理是非常常见的一种操作。C标准库为我们提供了一种名为String的类&#xff0c;用于处理字符串。String类在头文件中定义&#xff0c;它提供了许多成员函数和友元函数&#xff0c;使…

大模型日报2024-04-07

大模型日报 2024-04-07 大模型资讯 EURUS&#xff1a;针对推理优化的大型语言模型套件&#xff0c;取得开源模型多项基准测试的最先进成果 摘要: EURUS是一套针对推理能力进行优化的大型语言模型&#xff08;LLMs&#xff09;&#xff0c;在多项多样化的基准测试中取得了最先进…

算法练习----力扣每日一题------7

原题链接 1483. 树节点的第 K 个祖先 - 力扣&#xff08;LeetCode&#xff09; 题目解析 要求编写一个TreeAncestor类&#xff0c;需要为其写两个函数。该类是一个无规律的多叉树&#xff0c;多叉树的父节点一定是0号节点 1. TreeAncestor(int n, vector<int>&…

Android Hal service compatibility matrix

hal service 1&#xff09;增加声明xml文件 <manifest version"1.0" type"framework"><hal format"aidl"><name>ltd.faw.native_log_service</name><interface><name>INativeLogServiceInterface</name…

学习vue3第十四节 Teleport 内置组件介绍

<Teleport></Teleport> 作用目的&#xff1a; 用于将指定的组件或者元素传送到指定的位置&#xff1b; 通常是自定义的全局通用弹窗&#xff0c;绑定到 body 上&#xff0c;而不是在当前元素上面&#xff1b; 使用方法&#xff1a; 接收两个参数 to: 要将目标传…

MySQL数据库——4、数据类型

在 MySQL 数据库中&#xff0c;数据类型用于定义表中列&#xff08;字段&#xff09;可以存储的数据类型。MySQL 提供了丰富的数据类型&#xff0c;可以满足不同类型数据的存储需求。 MySQL 中一些常用的数据类型及其用途&#xff1a; 整数类型&#xff08;Integer Types&…

图片切换案例

<!DOCTYPE html> <html lang"en"> <head> <meta charset"UTF-8"> <meta name"viewport" content"widthdevice-width, initial-scale1.0"> <title>图片切换案例</title> </head> …