010 rocketmq批量消息

文章目录

  • 批量消息
  • BatchProducer.java
  • BatchConsumer.java

批量消息

批量发送可以提⾼发送性能,但有⼀定的限制:
topic 相同
waitStoreMsgOK 相同 (⾸先我们建设消息的iswaitstoremsgok=true(默认为true), 如果没有异常,我们将始终收到"OK",org.apache.rocketmq.common.message.Message#isWaitStoreMsgOK)
不支持延时发送
⼀批消息的大小不能⼤于 4M(DefaultMQProducer.maxMessageSize)
大小限制需要特殊注意,因为消息是动态的,不注意的话就可能超限,就会报错:
计算消息的大小
= (topic + body + (key + value) * N) * 吞吐量

int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {tmpSize += entry.getKey().length() + entry.getValue().length();
}

BatchProducer.java

package com.example.rocketmq.demo.batch;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;public class BatchProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.DefaultMQProducer producer = newDefaultMQProducer("please_rename_unique_group_name");// Specify name server addresses.producer.setNamesrvAddr("localhost:9876");//Launch the instance.producer.start();String topic = "TopicTest";List<Message> messages = new ArrayList<>();messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));//then you could split the large list into small ones:ListSplitter splitter = new ListSplitter(messages);while (splitter.hasNext()) {try {List<Message>  listItem = splitter.next();SendResult sendResult = producer.send(listItem);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();//handle the error}}//Shut down once the producer instance is not longer in use.producer.shutdown();}
}class ListSplitter implements Iterator<List<Message>> {private final int SIZE_LIMIT = 1024 * 1024 * 4;private final List<Message> messages;private int currIndex;public ListSplitter(List<Message> messages) {this.messages = messages;}@Override public boolean hasNext() {return currIndex < messages.size();}@Override public List<Message> next() {int nextIndex = currIndex;int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {Message message = messages.get(nextIndex);//计算消息的大小 = (topic + body + (key + value) * N) * 吞吐量int tmpSize = message.getTopic().length() + message.getBody().length;//属性值的添加Map<String, String> properties = message.getProperties();for (Map.Entry<String, String> entry : properties.entrySet()) {//+key + valuetmpSize += entry.getKey().length() + entry.getValue().length();}tmpSize = tmpSize + 20; //for log overheadif (tmpSize > SIZE_LIMIT) {//it is unexpected that single message exceeds the SIZE_LIMIT//here just let it go, otherwise it will block the splitting processif (nextIndex - currIndex == 0) {//if the next sublist has no element, add this one and then break, otherwise just breaknextIndex++;}break;}if (tmpSize + totalSize > SIZE_LIMIT) {break;} else {totalSize += tmpSize;}}List<Message> subList = messages.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;}
}

BatchConsumer.java

package com.example.rocketmq.demo.batch;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class BatchConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {// Instantiate with specified consumer group name.DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");// Specify name server addresses.consumer.setNamesrvAddr("localhost:9876");// Subscribe one more more topics to consume.consumer.subscribe("TopicTest", "*");// Register callback to execute on arrival of messages fetched from brokers.consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//Launch the consumer instance.consumer.start();System.out.printf("Consumer Started.%n");}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/72219.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

6.6.6 嵌入式SQL

文章目录 2个核心问题识别SQL语句主语言和SQL通信完整导图 2个核心问题 SQL语句嵌入高级语言需要解决的2个核心问题是&#xff1a;如何识别嵌入语句&#xff1f;如何让主语言&#xff08;比如C,C语言&#xff09;和SQL通信&#xff1f; 识别SQL语句 为了识别主语言中嵌入的SQL…

Windows安装sql server2017

看了下官网的文档&#xff0c;似乎只有ubuntu18.04可以安装&#xff0c;其他debian系的都不行&#xff0c;还有通过docker的方式安装的。 双击进入下载的ISO&#xff0c;点击执行可执行文件&#xff0c;并选择“是” 不要勾选 警告而已&#xff0c;不必理会 至少勾选这两…

RuoYi框架介绍,以及如何基于Python使用RuoYi框架

若依框架&#xff08;RuoYi&#xff09;是一款基于Spring Boot和Vue.js的开源快速开发平台&#xff0c;广泛应用于企业级应用开发。它提供了丰富的功能模块和代码生成工具&#xff0c;帮助开发者快速搭建后台管理系统。 主要特点 前后端分离&#xff1a;前端采用Vue.js&#x…

从零搭建Tomcat:深入理解Java Web服务器的工作原理

Tomcat是Java生态中最常用的Web服务器之一&#xff0c;广泛应用于Java Web应用的部署和运行。本文将带你从零开始搭建一个简易的Tomcat服务器&#xff0c;深入理解其工作原理&#xff0c;并通过代码实现一个基本的Servlet容器。 1. Tomcat的基本概念 Tomcat是一个开源的Servl…

京东云DeepSeek-R1模型一键部署教程,基于智算GCS【成本2元】

使用京东云智算一键部署DeepSeek-R1模型&#xff0c;京东云智算服务AI平台GCS支持DeepSeek-R1模型预装环境&#xff0c;支持1.5B、7B、32B及70B参数模型环境&#xff0c;用户可在GCS中快速启动&#xff0c;使用ChatbotUI或者Open-WebUI作为用户界面&#xff0c;进行测试并接入业…

Jenkins 自动打包项目镜像部署到服务器 ---(前端项目)

Jenkins 新增前端项目Job 指定运行的节点 选择部署运行的节点标签&#xff0c;dev标签对应开发环境 节点的远程命令执行配置 jenkins完整流程 配置源码 拉取 Credentials添加 触发远程构建 配置后可以支持远程触发jenkins构建&#xff08;比如自建的CICD自动化发布平台&…

7.2 - 定时器之计算脉冲宽度实验

文章目录 1 实验任务2 系统框图3 软件设计 1 实验任务 本实验任务是通过CPU私有定时器来计算按键按下的时间长短。 2 系统框图 参见7.1。 3 软件设计 注意事项&#xff1a; 定时器是递减计数的&#xff0c;需要考虑StartCount&#xff1c;EndCount的情况。 /***********…

双机热备旁挂组网实验

1拓扑图 2.要求 1 、 SW3 的流量 正常情况下&#xff1a; SW1_VRF-->FW1--->SW1_Public--->R5 故障情况下&#xff1a; SW2_VRF-->FW2--->SW2_Public--->R6 2 、 SW4 的流量 正常情况下&#xff1a; SW2_VRF-->FW2--->SW2_Public--->R6 故障情…

2025春新生培训数据结构(树,图)

教学目标&#xff1a; 1&#xff0c;清楚什么是树和图&#xff0c;了解基本概念&#xff0c;并且理解其应用场景 2&#xff0c;掌握一种建图&#xff08;树&#xff09;方法 3&#xff0c;掌握图的dfs和树的前中后序遍历 例题与习题 2025NENU新生培训&#xff08;树&#…

HTML 日常开发常用标签

文章目录 HTML 日常开发常用标签1、基本结构标签2、内容标签3、多媒体标签4、表单标签5、列表和定义标签6、表格标签7、链接和图像8、元数据9、语义化标签&#xff08;HTML5新增&#xff09;10、框架和内联11、交互12、过时或不推荐使用的标签 HTML 日常开发常用标签 1、基本结…

7.1.1 计算机网络的组成

文章目录 物理组成功能组成工作方式完整导图 物理组成 计算机网络是将分布在不同地域的计算机组织成系统&#xff0c;便于相互之间资源共享、传递信息。 计算机网络的物理组成包括硬件和软件。硬件中包含主机、前端处理器、连接设备、通信线路。软件中包含协议和应用软件。 功…

【AI论文】MedVLM-R1:通过强化学习激励视觉语言模型(VLMs)的医疗推理能力

摘要&#xff1a;推理是推进医学影像分析的关键前沿领域&#xff0c;其中透明度和可信度对于赢得临床医生信任和获得监管批准起着核心作用。尽管医学视觉语言模型&#xff08;VLMs&#xff09;在放射学任务中展现出巨大潜力&#xff0c;但大多数现有VLM仅给出最终答案&#xff…

国产RISCV64 也能跑AI

Banana Pi BPI-F3 进控时空 K1开发板 AI人工智能AI 部署工具使用手册_bianbu software-CSDN博客 文章置顶了 有兴趣的可以一起留言探索&#xff0c;非常有意思&#xff1a; 我最近接触到了进迭时空研发的 Spacengine™&#xff0c;这是一套能在进迭时空 RISC-V 系列芯片上部署…

APISIX Dashboard上的配置操作

文章目录 登录配置路由配置消费者创建后端服务项目配置上游再创建一个路由测试 登录 http://192.168.10.101:9000/user/login?redirect%2Fdashboard 根据docker 容器里的指定端口&#xff1a; 配置路由 通过apisix 的API管理接口来创建&#xff08;此路由&#xff0c;直接…

【WPF】绑定报错:双向绑定需要 Path 或 XPath

背景 最开始使用的是 TextBlock: <ItemsControl ItemsSource"{Binding CameraList}"><ItemsControl.ItemsPanel><ItemsPanelTemplate><StackPanel Orientation"Horizontal"/></ItemsPanelTemplate></ItemsControl.Item…

Kotlin协变与逆变区别

在Kotlin中&#xff0c;协变和逆变是泛型编程中的两个重要概念&#xff0c;它们允许我们在类型系统中更加灵活地处理类型关系。 1.协变&#xff1a;协变允许我们使用比原始类型更具体的类型。在kotlin中&#xff0c;通过在类型参数上加out关键字来表示协变,生产者&#xff0c;例…

如何调试Linux内核?

通过创建一个最小的根文件系统&#xff0c;并使用QEMU和GDB进行调试。 1.准备工作环境 确保系统上安装了所有必要的工具和依赖项。 sudo apt-get update //更新一下软件包 sudo apt-get install build-essential git libncurses-dev bison flex libssl-dev qemu-system-x…

Java 调试模式下 Redisson 看门狗失效

一、场景分析 前几天在做分布式锁测试&#xff1a; 在调试模式下&#xff0c;lock.lock() 之后打上断点&#xff0c;想测试一下在当前线程放弃锁之前&#xff0c;别的线程能否获取得到锁。 发现调试模式下&#xff0c;看门狗机制失效了&#xff0c;Redis 上 30 秒后&#xff0…

GPT-4.5震撼登场,AI世界再掀波澜!(3)

GPT-4.5震撼登场&#xff0c;AI世界再掀波澜! GPT-4.5震撼登场&#xff0c;AI世界再掀波澜!(2) &#xff08;一&#xff09;伦理困境&#xff1a;如何抉择 GPT-4.5 的强大功能在为我们带来诸多便利的同时&#xff0c;也引发了一系列深刻的伦理问题&#xff0c;这些问题犹如高…

【数据挖掘】Pandas

Pandas 是 Python 进行 数据挖掘 和 数据分析 的核心库之一&#xff0c;提供了强大的 数据清洗、预处理、转换、分析 和 可视化 功能。它通常与 NumPy、Matplotlib、Seaborn、Scikit-Learn 等库结合使用&#xff0c;帮助构建高效的数据挖掘流程。 &#x1f4cc; 1. 读取数据 P…