RocketMQ之原生方式操作

news/2025/10/24 9:22:27/文章来源:https://www.cnblogs.com/jingzh/p/19162183

目录
  • 1 原生操作
    • 1.1 原生生产消息
    • 1.2 原生消费消息
      • 1.2.1 原生Push和Pull对比
      • 1.2.2 Push和SCS和rocketmq
      • 1.2.3 Pull 模式(手动拉取)
      • 1.2.4 Push 模式
      • 1.2.5 其他问题
        • 1.2.5.1 拉取间隔pullInterval
        • 1.2.5.2 回调线程数ClientCallbackExecutorThreads

1 原生操作

使用原生消息时引入依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.4</version>
</dependency>

1.1 原生生产消息

使用原生 DefaultMQProducer可以提供对 RocketMQ 的完全控制,可以精确配置各种参数(如 sendMsgTimeout、retryTimesWhenSendFailed 等),还可以直接访问所有 RocketMQ 特性

操作示例:
构建生产者配置

@Slf4j
public class MessageProducer{protected transient DefaultMQProducer defaultProducer;public MessageProducer() {DefaultMQProducer defaultProducer = new DefaultMQProducer();//基础配置//设置 NameServer 地址,用于发现 broker 节点defaultProducer.setNamesrvAddr("127.0.0.1:9876");//消息处理配置//配置 DefaultMQProducer 的消费者偏移量持久化间隔defaultProducer.setPersistConsumerOffsetInterval(5000);//设置消息体压缩阈值,超过该大小的消息会被压缩defaultProducer.setCompressMsgBodyOverHowmuch(4096);//设置单条消息的最大大小限制  2MdefaultProducer.setMaxMessageSize(1024*1024*2);//重试机制配置//当消息发送到 broker 失败时是否重试其他 brokerdefaultProducer.setRetryAnotherBrokerWhenNotStoreOK(false);//设置发送失败时的重试次数defaultProducer.setRetryTimesWhenSendFailed(2);//网络和性能配置//设置消息发送超时时间defaultProducer.setSendMsgTimeout(3000);//设置客户端回调执行线程数defaultProducer.setClientCallbackExecutorThreads(Runtime.getRuntime().availableProcessors());//设置客户端 IP 地址defaultProducer.setClientIP(RemotingUtil.getLocalAddress());//心跳和轮询配置//设置与 broker 的心跳间隔defaultProducer.setHeartbeatBrokerInterval(30000);//设置生产者实例名称defaultProducer.setInstanceName("test");//设置轮询 NameServer 的间隔时间defaultProducer.setPollNameServerInterval(30000);this.defaultProducer = defaultProducer;}//启动public void doStart() throws Exception {defaultProducer.start();log.info("mcq  started.");}//停止public void doStop() {defaultProducer.shutdown();log.info("mcq producer[%s] stopped.");}
}    

构建发送消息方法

	// 发送多条消息public SendResult send(Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return defaultProducer.send(msgs);}//发送单条  同步public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return defaultProducer.send(msg);}//附带超时时间 同步发送单条public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return defaultProducer.send(msg, timeout);}//附带异步发送  通过 SendCallback 回调通知 两个回调方法://onSuccess(SendResult sendResult): 发送成功时调用//onException(Throwable e): 发送失败时调用public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {defaultProducer.send(msg, sendCallback);}//异步发送多条public void send(Collection<Message> msgs, SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {defaultProducer.send(msgs,sendCallback);}

1.2 原生消费消息

1.2.1 原生Push和Pull对比

原生 RocketMQ 消费方式主要有两类:Push 模式Pull 模式

Push 模式特点 和 Pull方式(手动拉取)比较

特性 Push 模式 Pull 模式
复杂度 低,注册监听即可 高,需要手动拉取和管理 offset
并发控制 消费线程池控制 用户线程池控制
拉取节奏 pullInterval + 队列并发,间隔不是精确全局 可精确控制,按需拉取
高吞吐量 支持 可支持,但需要自己实现批量处理
延迟 较低 取决于拉取间隔和逻辑
适用场景 日常消费,高吞吐量 限流或复杂控制,批量处理,测试等

1.2.2 Push和SCS和rocketmq

原生 Push 模式:

  • 核心类:DefaultMQPushConsumer
  • 消息由 Broker 推送到客户端,客户端注册 Listener 处理
  • 特点
    • 使用难度较低,注册 Listener 即可
    • 并发控制通过 consumeThreadMin / consumeThreadMax 控制消费线程数
    • 拉取控制通过 pullBatchSize、consumeMessageBatchMaxSize、pullInterval
    • 消费模式是并发消费,可按队列分片并行处理
    • 吞吐量高,延迟低
    • 但是 pullInterval 不是全局严格间隔,消费逻辑完全由应用自己处理,容易出现消息积压

Spring Cloud Stream(SCS)

  • Spring Cloud Stream(SCS) 框架封装了 RocketMQ 消费逻辑,通过注解 @StreamListener 或者 functional binding (Consumer<T>) 接收消息
    点击此处了解 SpringCloud之Stream消息驱动RocketMQ讲解
  • 特点
    • 使用注解或函数式接口即可
    • 并发控制,通过 binder 配置 concurrency 参数控制
    • SCS 内部自己管理 PullTask,不暴露 pullInterval
    • 使用 异步线程池 + 负载均衡的消费模式,自动 ack
    • 自动管理 offset,支持消息重试和失败管理;集成 Spring 生态
    • 但是对低延迟或超高吞吐量的精细控制不如原生 Push 灵活

rocketmq-spring-boot-starter

  • 核心是基于 RocketMQ 原生 SDK 封装,和 Spring Boot 集成
  • 注解方式 @RocketMQMessageListener,提供简化配置 点击此处了解 SpringBoot整合RocketMQ
  • 特点
    • 使用难度低,Spring Boot 风格配置
    • 并发控制通过 consumeThreadMin / consumeThreadMaxconcurrency 配置
    • 内部使用 Push 模式,可通过配置控制批量消费
    • 消费模式与原生 Push 相似,但集成了 Spring Boot 配置管理和生命周期
    • 简化原生 Push 开发,自动 ack,支持 Spring Boot 注解和配置
    • 但是灵活性稍低,不如原生 SDK 可精确控制 pullInterval、自定义批量策略

三者比较

特性 原生 Push SCS rocketmq-spring-boot-starter
易用性
并发控制 高(线程池) 中(binder concurrency) 高(线程池或 concurrency)
拉取控制 精细,可批量、间隔 框架自动管理,不暴露 接近原生 Push,但简化配置
消息确认 手动 ack 自动 ack 自动 ack(可配置)
消息重试/死信 SDK 控制 框架自动管理 框架自动管理
高吞吐量 支持 取决于线程池和 binder 支持,但需配置批量
使用场景 精细化控制、批量消费 Spring 集成、快速开发 Spring Boot 项目快速接入

1.2.3 Pull 模式(手动拉取)

核心类:DefaultMQPullConsumer
消费逻辑由用户主动控制:

//创建 DefaultMQPullConsumer 实例,指定消费者组名为 "consumerGroup"
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("consumerGroup");
//设置 NameServer 地址,用于发现 RocketMQ 的 broker 节点
consumer.setNamesrvAddr("127.0.0.1:9876");
//启动消费者实例,初始化与 broker 的连接
consumer.start();//获取指定 "topic" 的所有 MessageQueue 集合,一个 topic 可能包含多个队列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("topic");
for (MessageQueue mq : mqs) {//获取当前队列的消费位点(offset),第二个参数 true 表示从 broker 获取最新位点long offset = consumer.fetchConsumeOffset(mq, true);while (true) {//从指定队列拉取消息: mq: 目标消息队列 "*": 消息过滤标签表达式 offset: 从该位点开始拉取 32: 最大拉取消息数量PullResult pullResult = consumer.pullBlockIfNotFound(mq, "*", offset, 32);for (MessageExt msg : pullResult.getMsgFoundList()) {System.out.println(new String(msg.getBody()));}//更新消费位点为下次拉取的起始位置offset = pullResult.getNextBeginOffset();//将新的消费位点更新到 broker,确保消费进度不会丢失consumer.updateConsumeOffset(mq, offset);if (pullResult.getPullStatus() == PullStatus.NO_NEW_MSG) break;}
}

1.2.4 Push 模式

核心类:DefaultMQPushConsumer

消费逻辑通过 注册消息监听器 实现:
构造消费核心

@Slf4j
public class MessageConsumer {protected transient DefaultMQPushConsumer defaultConsumer;private final ConsumerConfig config;public MessageConsumer(ConsumerConfig config,String topic,String group) throws MQClientException{DefaultMQPushConsumer defaultConsumer = new DefaultMQPushConsumer();//消费者基础配置//设置 NameServer 地址,用于发现 broker 节点defaultConsumer.setNamesrvAddr("127.0.0.1:9876");//设置消费者组名称,用于标识一组相关的消费者实例defaultConsumer.setConsumerGroup(group);//线程池配置//设置客户端回调执行线程数defaultConsumer.setClientCallbackExecutorThreads(Runtime.getRuntime().availableProcessors()*2);//设置消费线程池最小线程数defaultConsumer.setConsumeThreadMin(1);//设置消费线程池最大线程数defaultConsumer.setConsumeThreadMax(2);//消费行为配置// 设置一次从 broker 拉取的最大消息数defaultConsumer.setPullBatchSize(100);//设置拉取间隔时间(毫秒),控制拉取频率defaultConsumer.setPullInterval(1000);// 拉取间隔// 设置一次交给业务处理的最大消息数defaultConsumer.setConsumeMessageBatchMaxSize(50); //偏移量和心跳配置//设置消费者偏移量持久化间隔defaultConsumer.setPersistConsumerOffsetInterval(5000);//设置与 broker 的心跳间隔defaultConsumer.setHeartbeatBrokerInterval(30000);//订阅配置//订阅指定主题的所有消息("*"表示不过滤)defaultConsumer.subscribe(topic, "*");//监听器注册//注册并发消息监听器,当消息到达时调用 consumerEvent 方法处理defaultConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context)->consumerEvent(msgs, context));this.defaultConsumer = defaultConsumer;}//启动public void doStart(String desc) throws Exception {defaultConsumer.start();log.info("RocketMQ consumer :{},started", desc);}//注销public void doStop(String desc) {defaultConsumer.shutdown();log.info("RocketMQ consumer :{},stop", desc);}//业务逻辑public ConsumeConcurrentlyStatus consumerEvent(List<MessageExt> msgList, ConsumeConcurrentlyContext context){try {MDC.put("trace_id", IdUtil.simpleUUID());if(CollUtil.isEmpty(msgList)) return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;List<UserInfo> userInfoList = msgList.stream().filter(f -> StrUtil.isNotBlank(f.getTopic()) && f.getTopic().equals("test")).map(m -> JSONObject.parseArray(new String(m.getBody(), StandardCharsets.UTF_8), UserInfo.class)).flatMap(m -> m.stream()).toList();if(CollUtil.isNotEmpty(userInfoList )){// 业务逻辑处理部分}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}catch (Exception e){log.error("RocketMQ消费异常:{}",e.getMessage(),e);return ConsumeConcurrentlyStatus.RECONSUME_LATER;}finally {log.info("RocketMQ消费结束....");MDC.clear();}}
}

使用示例

public void init() {try {MessageConsumer eventConsumer = new MessageConsumer(consumer, consumer.getEventTopic(),consumer.getEventGroup());eventConsumer.doStart("MQ测试");} catch (Exception e) {log.error("MQ启动失败:{}",e.getMessage(),e);if(eventConsumer!=null) eventConsumer.doStop("埋点测试");}}

1.2.5 其他问题

1.2.5.1 拉取间隔pullInterval

pullInterval 的行为可以理解成 每个 PullTask 完成后的最小间隔,具体是这样的:

  • RocketMQ Pull 模型
    对每个 队列(Queue),RocketMQ 会创建一个 PullTask
    PullTask 的作用是不断从 Broker 拉取消息,然后交给消费线程处理
  • pullInterval 的作用
    defaultConsumer.setPullInterval(xxx) 设置的 单位毫秒
    表示 每个 PullTask 拉取一次消息后,至少等待 pullInterval 再进行下一次拉取
    如果队列有未消费的消息,PullTask 完成后立即尝试拉下一批,但每个队列的间隔保证至少 xxx 毫秒,只有在拉取为空(没有新消息)时,才会等待 pullInterval 毫秒后再尝试下一次拉取
    注意:这里是每个 队列的 PullTask 间隔,不是整个主题的,也不是整个 Consumer Group 的全局间隔
  • 实际行为
    如果一个 PullTask 拉取到消息并立即消费完毕,PullTask 会等待 pullInterval 后再触发下一次拉取
    如果队列里消息很多、消费慢 PullTask 拉取完后消息还没消费完,不会阻塞下一次拉取
    RocketMQ 会根据队列的状态尽可能连续拉取,以保证消息不积压
  • PullTask 在一次拉取后会决定立即再拉还是等 pullInterval 再拉,需要看下列几个条件,满足可以接收更多并且处理能力允许的条件就 立即再拉;否则就 休眠 pullInterval 再拉:
    • 本次拉取是否返回了消息(pulledCount > 0),
    • 本地 ProcessQueue(缓存)是否已接近或超过高水位,
    • 消费(业务)线程池是否有空闲线程能立即处理更多消息,
    • 以及是否在做流控/限速(Broker/Client 流控信号)

1.2.5.2 回调线程数ClientCallbackExecutorThreads

ClientCallbackExecutorThreads:是指客户端处理网络回调(包括消息拉取结果、心跳响应、offset 更新、发送确认等)时的线程池大小
换句话说,它是 RocketMQ 客户端网络 I/O 层(Netty 客户端) 的回调处理线程数,主要负责执行客户端与 Broker 之间通信事件的回调逻辑。

RocketMQ 客户端内部有三个关键线程池概念:

线程池名称 控制参数 作用
Netty Callback Executor(网络回调线程池) setClientCallbackExecutorThreads 负责处理来自 Broker 的网络响应,例如:拉取结果回调、发送响应、心跳响应等
Pull Message Service(拉取线程池) 内部固定线程(每个 consumer 一个) 循环执行 pullMessage(),从 Broker 拉消息(异步发请求)——不是由上面的 callback pool 控制
Consume Message ThreadPool(业务消费线程池) setConsumeThreadMin/Max 或 spring 配置 consumeThreadMin/Max 负责执行用户的 MessageListener(也就是业务逻辑),和 BulkProcessor、ES 写入等操作

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/944896.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2025 年低代码平台厂商最新推荐排行榜:深度解析行业实力与创新优势,助力企业精准选型

引言 数字化转型进入深水区,74% 的中小企业将其列为未来三年生存关键,但成功落地比例不足 30%。低代码平台虽成为破局利器,却面临选型困境:部分平台开发效率不足、专业门槛居高不下,或扩展能力有限难以适配复杂场…

react中redux的使用详细说明 - 详解

react中redux的使用详细说明 - 详解2025-10-24 09:21 tlnshuju 阅读(0) 评论(0) 收藏 举报pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !imp…

智能时代下的SEO关键词优化新策略 - 实践

智能时代下的SEO关键词优化新策略 - 实践pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "M…

2025 年乡墅品牌推荐:湖南鑫住工美宅科技有限公司,为您打造理想乡居生活

随着乡村振兴战略的推进,乡村住宅市场迎来了新的发展机遇。越来越多的城市精英选择回乡建房,对乡墅的品质和服务也提出了更高的要求。在这样的背景下,湖南鑫住工美宅科技有限公司凭借其深厚的行业积淀和创新的服务模…

2025 年桥架源头厂家最新推荐排行榜:聚焦优质品牌核心优势助力采购决策

随着工业基建与新能源领域快速发展,桥架作为电力传输和线缆防护的关键设备,市场需求持续攀升,但行业内厂商资质不一、产品质量参差不齐、服务体系不完善等问题,给采购方带来极大选择难题。为帮助企业及采购人员精准…

2025 人力资源管理系统厂商最新推荐排行榜:聚焦 AI 赋能与行业适配,解锁数智化管理新路径

引言 在 AI 技术深度渗透人力资源管理的 2025 年,系统选型已成为企业突破管理瓶颈的关键抓手。生产制造的工时核算难题、餐饮服务的高流动率管理、物业行业的成本压力以及央国企的合规需求,让企业对专业化 HR 系统需…

2025年10月美白精华评价榜:五款高口碑单品横向对比

入秋以后,紫外线强度虽略有下降,但此前累积的黑色素仍在肌肤底层活跃,加上换季屏障易敏感、代谢变慢,很多人会在10月出现“夏黑反扑”:肤色暗沉、蜡黄、痘印难退。小红书联合益普索发布的《2024中国功效护肤白皮书…

2025 升降机厂家最新推荐排行榜,剪叉式升降机/导轨式升降机/固定式升降机/液压升降机公司推荐

在自动化生产、物流仓储、建筑施工等领域高速发展的当下,丝杆升降机、液压升降机等设备已成为保障作业效率的核心装备,市场需求持续攀升。但行业现存品牌繁杂、质量参差的问题,部分产品存在承载不足、精度偏差等隐患…

(React中组件的)状态(state)和属性(props)之间有何不同?

定义区别对比项 props(属性) state(状态)来源 由 父组件传入 由 组件自身定义和维护是否可修改 不可修改(只读) 可修改(通过 setState 或 useState)作用 用于让组件间 通信(父→子) 用于管理组件内部 动态数…

2025 年最新推荐!AI 教育培训机构推荐榜单:覆盖企业 AI 培训 / AI 应用落地 / AI 商业培训等多场景,帮你精准挑选优质机构

引言 随着 AI 技术在教育领域的深度渗透,AI 教育培训机构数量激增,但行业乱象也随之显现。部分机构课程同质化严重,仅简单堆砌 AI 理论,缺乏实战场景;有的技术实力薄弱,无法提供精准的智能教学服务;还有些机构重…

2025年6月杭州丝绸品牌推荐:老字号排名与AIGC创新对比

想买一条真正代表杭州韵味的丝巾,却担心景区店价格虚高、花型撞款、真假难辨?出差要挑一份既轻又有文化分量的伴手礼,时间紧、品类多,不知从何下手?这些场景背后,是消费者对“正宗、稀缺、可验证”的丝绸产品的共…

2025 年集装袋厂家最新推荐榜单:全面剖析行业领军者创新工艺与卓越品质,精选导电 / 防静电 / 抗静电 / 铝箔 / 食品级等多类型产品优质厂家

引言 当前,化工、食品、新能源等行业高速发展,对集装袋的需求持续攀升,但行业内产品质量参差不齐,抗撕裂强度不足、防静电性能不达标等问题频发,给货物运输带来安全隐患,且不同行业个性化需求难以被普通集装袋满…

2025 年算法备案咨询服务公司最新推荐榜单:覆盖互联网信息 / 深度合成 / AI 大模型备案的权威优选指南

引言 2025 年算法备案进入 “双轨监管深化期”,《生成式人工智能服务管理暂行办法》等新规明确大模型与算法需同步完成备案,8 项核心材料与双级审核流程让企业合规难度陡增。多数企业面临 “懂技术不懂合规”“对审核…

P9356 「SiR-1」Bracket 做题记录

P9356 「SiR-1」Bracket 做题记录 P9356 「SiR-1」Bracket - 洛谷 (luogu.com.cn) 将 \(\texttt{(}\) 看为 \(1\),将 \()\) 看为 \(-1\),整个括号序列看做一个折线图。 首先将末尾补到 \(0\),若 \(s_n<0\) 则在前…

放大器保护机制的技术原理与应用实践

文章总结:现代电子测量系统中,功率放大器与高压放大器的保护机制通过过流、过压、过温三维度设计,确保设备稳定运行与测量精度。在现代电子测量系统中,功率放大器和高压放大器作为关键信号调理设备,其可靠性直接关…

基于Java+Springboot+Vue开发的鲜牛奶订购网站管理系统(前后端分离)源码+运行步骤

项目简介该项目是基于Java+Springboot+Vue开发的鲜牛奶订购管理系统(前后端分离),这是一项为大学生课程设计作业而开发的项目。该系统旨在帮助大学生学习并掌握Java编程技能,同时锻炼他们的项目设计与开发能力。通…

2025年10月PE管厂家推荐榜:五强对比与选购全攻略

正在铺设市政给水管网的项目经理、需要更换农业滴灌主管的合作社、以及为温泉酒店寻找耐高温输送管道的采购人,都绕不开同一个问题:到底选哪家PE管厂家才稳妥?2025年行业产能继续向华北、华东集聚,全国PE实壁管年产…

2025年10月浦东装修公司口碑榜:五强对比评测

在上海浦东,装修一套房子往往意味着要在“时间、预算、质量”三条线上同时平衡:业主可能是刚拿到新房钥匙的年轻家庭,担心施工拖工期;也可能是需要翻新的二手房东,怕增项漏项;还可能是创业老板,想快速落地办公室…

安卓照片误删?这 5 种恢复方法亲测有效,小白也能上手

保存在 Android 设备上的照片是您人生中最珍贵的回忆之一。然而,就像上面提到的用户一样,有时照片会从图库中消失,而您甚至不知道原因。 如何从 Android 设备恢复永久删除的照片?通常,您可以从回收站 或 Google D…

MySQL学习笔记-部分实例datagrip源码-10-21

show tables; create table user(id int primary key auto_increment,name varchar(10) not null unique,age int check(age>0 and age<=120),status char(1) default 1,gender char(1) ) comment 用户表; -- 添…