rocketmq默认就是可以批量消费的,但需要设置多个参数一起配合。
我们只需要知道他是怎么消费的,就可以很精准的设置他的批量消费参数。
我们看看DefaultMQPushConsumer源码中的这几个参数:
/*** 消费消息线程,最小数目*/private int consumeThreadMin = 20;/*** 消费消息线程,最大数目*/private int consumeThreadMax = 64;/*** 拉消息间隔,如果为了降低拉取速度,可以设置大于0的值*/private long pullInterval = 0;/*** 消费一批消息,最大数*/private int consumeMessageBatchMaxSize = 1;/*** 拉消息,一次拉多少条*/private int pullBatchSize = 32;
rocketmq的批量消费,简单来说就是开了一个线程池,启动多个线程去拉数据,再回调Listener去处理。
consumeThreadMin和consumeThreadMax是这个线程池的最大最小线程数,通过设置这个可以控制每次处理的线程数。
pullInterval可以理解为可调度线程池的时间参数,单位是毫秒。设置这个是为了降低消费的次数,从而达到每批消费的个数都比较多。如果本来量就很大,就没必要设置这个值了。
consumeMessageBatchMaxSize是一个最大消费限制参数,取值范围是[0,1024],也就是说,rocketmq线程池中的每一个线程,一次只能拿出1024条数据。所以如果想设置批量消费,这个值也要调大。
pullBatchSize是设置一次拉多少条,很好理解。
总结
所以说,让rocketmq批量消费,至少需要这几个值协同处理。
举个例子(pull模式并发消费):
consumer.setPullInterval(10000);
consumer.setConsumeThreadMin(5);
consumer.setConsumeThreadMax(10);
consumer.setConsumeMessageBatchMaxSize(1001);
consumer.setPullBatchSize(1000);
这个就是降低线程数,从而提升每次拉起的数据。
在没有这样配置之前,每次处理就是一条,配置了这个,每次处理就变成几十条或几百条。