前言
RocketMQ 5.0 消费者引入了一种新的消费模式:Pop 消费模式,目的是解决 Push 消费模式的一些痛点。
 RocketMQ 4.x 之前,消费模式分为两种:
- Pull:拉模式,消费者自行拉取消息、上报消费结果
- Push:推模式,消费者主动拉取消息,看起来像是 Broker 主动推送消息,主动上报消费结果
Pop 消费模式可以看作是 Push 的升级版,Push 消费模式存在以下痛点:
- 客户端负责队列重平衡、消息拉取、消费位点上报、消费失败重试等功能,逻辑太重了,不利于多语言客户端发展
- 队列数/消费者数量变更会触发重平衡操作,期间消息无法消费,容易造成消息堆积
- 队列和消费者强绑定,消费者数量超过队列数后,无法再水平扩容
- 队列和消费者强绑定,消费者僵死状态下导致消息堆积
Pop 消费模式就是要解决这些痛点的,它的设计目标:
- 消费者只管消息拉取、消息消费、上报 ACK,客户端 SDK 轻量级
- 队列不再绑定消费者,消费者可以消费所有队列消息
- 消费者可以很方便的水平扩容
要实现这个目标还是有不小的挑战,看看 RocketMQ 是如何做到的吧。
设计难点
Pop 消费模式是存在一些设计难点的。
 Push 模式下队列为什么要和消费者绑定?也就是一个队列同一时间只允许被一个消费者消费,因为这样实现起来简单啊,主要体现在:
- Broker 消息投递处理简单,根据消费者请求的拉取位点投递
- Broker 消费位点管理简单,无需记录消息是否被消费,只需要记录消费位点,位点前的都消费了,位点后的都没消费
如果改成 Pop 模式,首先面临的挑战有:
 1、Broker 消息投递时,要记录哪些消息已经投递了,哪些消息还没投递
 Pop 模式下一个队列可以被多个消费者消费,假设现在队列里面有 1~10 号消息,消费者A 拉取了 1~3 号消息,消费者B 再拉取的时候,Broker 必须知道 1~3 号消息已经投递过在消费中了,不能再投递给消费者B了,得投递 4 号及之后的消息给消费者B才行。
 
2、Broker 要记录哪些消息消费成功了,哪些消息消费失败了,不能单纯记录消费位点了
 因为队列可以被多个消费者消费,大家都在上报队列粒度的消费位点,Broker 没法管理了。
你看,为了可以让队列被多个消费者消费,Broker 已经不能再按队列维度去管理信息了,必须精确到消息粒度,这就需要额外的数据结构来支撑。
设计实现
取消客户端队列 Rebalance
 客户端重平衡,不仅限制了消费者的消费能力,还增加了客户端复杂度,重平衡期间消息无法被消费,还容易造成消息堆积。Pop 消费模式下,消费者可以消费所有队列,也就不再需要客户端重平衡了。消费者查询给自己分配的队列时,Proxy 返回的是 Broker 维度且 queueId=-1 的逻辑队列,Broker 端会投递所有队列的消息。
 
invisibleTime
 Pop 消费模式下,消息投递后会有一个invisibleTime的概念,即消息的不可见时间,默认是 60 秒。比如 M1 投递给消费者A后,在不可见时间段内,其它消费者是无法消费这条消息的。
 
 另外 Broker 还提供了changeInvisibleTime()接口修改单条消息的不可见时间,比如消息消费失败后,会根据重试次数来设置新的不可见时间。
 
CK & ACK 消息
 RocketMQ 为了记录消息是否被消费成功,引入了 CK 和 ACK 消息,以及一个专属的 Topic:rmq_sys_REVIVE_LOG_{clusterName}。
 消费者在拉取消息时,Broker 端会给拉取到的这一批消息发一个 CK 消息,CK 消息记录了各消息的偏移量可以定位到具体消息,同时用位图记录了各消息的 ACK 情况。Consumer 消费成功后会调用ackMessage接口,Broker 会发送一个对应的 ACK 消息;消费失败后会调用changeInvisibleTime接口,延长消息不可见时间,底层是先发一个旧消息的 ACK 消息,再发一个新消息的 CK 消息。
 
 上述流程的运转是基于消费者正常处理消费结果的前提下的,如果消费者挂了,既不发 ack 也不发 nack,Broker 又该怎么处理这些消息呢?其实也能正常处理,因为 CK 和 ACK 消息均是延时消息,延迟的时间即消息的不可见时间,CK 消息会提前一秒消费,目的是匹配 ACK 消息。
 RocketMQ 会启动八个线程消费 REVIVE Topic 对应的八个队列,匹配 CK 消息里还有哪些消息没被 ack,再将这些没被 ack 的消息发送到 RETRY Topic,消费者就可以重新消费了。
源码
先通过流程图熟悉下 Pop 消费模式的大体流程:
 
ProcessQueue
PushConsumer 启动后,会定时向 Proxy 查询分配的队列:
protected void startUp() throws Exception {......scanAssignmentsFuture = scheduler.scheduleWithFixedDelay(() -> {try {// 扫描分配的队列scanAssignments();} catch (Throwable t) {log.error("Exception raised while scanning the load assignments, clientId={}", clientId, t);}}, 1, 5, TimeUnit.SECONDS);......
}
Pop 消费模式下,客户端无须知道具体的队列数据,因为 Broker 会投递所有队列消息,所以 Proxy 只返回 Broker 维度且 queueId=-1 的一个逻辑队列即可。
 消费者拿到分配的队列后,紧接着调用syncProcessQueue()方法处理队列,有两种情况:
- 队列不再分配给自己,停止拉取消息,移除队列
- 新分配的队列,立即拉取消息
对于新分配的队列,消费者会创建ProcessQueue对象开始拉取队列消息:
private void receiveMessageImmediately() {// Broker端点列表 gRPC负载均衡调用final Endpoints endpoints = mq.getBroker().getEndpoints();// 构建请求final ReceiveMessageRequest request = consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression);// 发请求final ListenableFuture<ReceiveMessageResult> future = consumer.receiveMessage(request, mq,consumer.getPushConsumerSettings().getLongPollingTimeout());Futures.addCallback(future, new FutureCallback<ReceiveMessageResult>() {@Overridepublic void onSuccess(ReceiveMessageResult result) {// 处理消息onReceiveMessageResult(result);}}
}
PopMessageProcessor
Proxy 端会开启 gRPC 服务,GrpcMessagingApplication 用来处理客户端的请求,拉取消息对应的处理方法是receiveMessage,最终会交给 Broker 的 PopMessageProcessor 处理,主要步骤:
- 校验请求参数、队列写权限等等
- 构建消息过滤器 ExpressionMessageFilter
- 按照 1/5 的概率先尝试从重试队列里拉取消息
- 遍历所有队列拉取消息,直到拉取最大消息数
- 如果普通队列拉取的消息数没达到最大消息数,再尝试拉取重试队列
- 如果还有剩余消息,通知其它被长轮询挂起的请求继续拉取
- 返回结果
private RemotingCommand processRequest(final Channel channel, RemotingCommand request)throws RemotingCommandException {......// 参数、权限校验// Topic配置TopicConfig topicConfig =this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());// 消费组订阅配置SubscriptionGroupConfig subscriptionGroupConfig =this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());// 消息过滤ExpressionMessageFilter messageFilter = ......// POP模式下,消费者要遍历所有队列,随机一个下标开始读取,都从0开始读的话,存在热点队列,竞争锁int randomQ = random.nextInt(100);int reviveQid; // REVEIE队列ID 顺序消息999 普通消息:0~7轮询if (requestHeader.isOrder()) {reviveQid = KeyBuilder.POP_ORDER_REVIVE_QUEUE;} else {reviveQid = (int) Math.abs(ckMessageNumber.getAndIncrement() % this.brokerController.getBrokerConfig().getReviveQueueNum());}int commercialSizePerMsg = this.brokerController.getBrokerConfig().getCommercialSizePerMsg();GetMessageResult getMessageResult = new GetMessageResult(commercialSizePerMsg);long restNum = 0; // 剩余消息数// 平均每5次请求,读取一下重试队列boolean needRetry = randomQ % 5 == 0;if (needRetry && !requestHeader.isOrder()) {restNum = popMsgFromQueue(true, getMessageResult, requestHeader, queueId, restNum, reviveQid,channel, popTime, messageFilter,startOffsetInfo, msgOffsetInfo, orderCountInfo);}// POP模式下,请求的queueId=-1,读所有队列if (requestHeader.getQueueId() < 0) {for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {int queueId = (randomQ + i) % topicConfig.getReadQueueNums();restNum = popMsgFromQueue(false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, messageFilter,startOffsetInfo, msgOffsetInfo, orderCountInfo);}}// 拉取到的消息数量不满足消费者期望的数量,接着拉取重试队列:%RETRY%{consumerGroup}_{topic}......if (!getMessageResult.getMessageBufferList().isEmpty()) {// 拉取到消息,且队列里面还有消息,通知其它拉取请求if (restNum > 0) {notifyMessageArriving(requestHeader.getTopic(), requestHeader.getConsumerGroup(),requestHeader.getQueueId());}} else {// 没有消息,进入长轮询状态 挂起int pollingResult = polling(channel, request, requestHeader);}......
}
通过队列获取消息的方法是popMsgFromQueue,虽然队列可以被多个消费者消费,但是同一个消费组下,针对同一个队列拉取消息的行为必须保证串行,所以 Broker 首先会构建一个topic@group@queueId格式的字符串作为 lockKey 保证加锁成功,然后再获取消息拉取位点 offset,调用 MessageStore 获取消息,最终给这批消息记录 CK 消息。
private long popMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult,PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid,Channel channel, long popTime,ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo,StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) {String topic = isRetry ? KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),requestHeader.getConsumerGroup()) : requestHeader.getTopic();// 给队列加锁 同一消费组下的队列串行读取 topic@group@queueIdString lockKey =topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId;// 加锁if (!queueLockManager.tryLock(lockKey)) {restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;return restNum;}// 拉取位点offset = getPopOffset(topic, requestHeader, queueId, true, lockKey);// 获取消息GetMessageResult getMessageTmpResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), topic, queueId, offset,requestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), messageFilter);if (!getMessageTmpResult.getMessageMapedList().isEmpty()) {// 追加CheckPointappendCheckPoint(requestHeader, topic, reviveQid, queueId, offset, getMessageTmpResult, popTime, this.brokerController.getBrokerConfig().getBrokerName());}......queueLockManager.unLock(lockKey);
}

 这里的锁实现很简单,通过 AtomicBoolean CAS 修改上锁标记位来判断是否加锁成功:
static class TimedLock {private final AtomicBoolean lock;private volatile long lockTime;
}
加锁成功后,Broker 得知道该从哪里开始给消费者投递消息,这就是拉取位点的获取:
- 首先取已提交的消费位点
- 如果 CK 缓冲区有已投递的 PopCheckPoint,则取缓冲区的拉取位点,避免已经投递过的消息重复投递
private long getPopOffset(String topic, PopMessageRequestHeader requestHeader, int queueId, boolean init,String lockKey) {// 已提交的消费位点long offset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),topic, queueId);if (offset < 0) {// 还没消费过 判断是从最旧还是最新的开始消费if (ConsumeInitMode.MIN == requestHeader.getInitMode()) {offset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId);} else {// pop last one,then commit offset.offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 1;// max & no consumer offsetif (offset < 0) {offset = 0;}if (init) {this.brokerController.getConsumerOffsetManager().commitOffset("getPopOffset",requestHeader.getConsumerGroup(), topic,queueId, offset);}}}// CK缓冲区的位点,这些是已投递、还没提交消费的位点long bufferOffset = this.popBufferMergeService.getLatestOffset(lockKey);if (bufferOffset < 0) {return offset;} else {return bufferOffset > offset ? bufferOffset : offset;}
}
有了拉取位点,接下来就是通过 MessageStore 查询消息了,底层会读取 consumequeue 和 commitlog 文件,这里不赘述。拉取到消息列表后,Broker 会给这一批消息记录一个 CK 消息,用于后续匹配 ACK 消息。
PopCheckPoint
CK 消息对应的类是 PopCheckPoint,主要记录了:消息拉取时间、消息偏移量、不可见时间、消息 ACK 位图等等。
public class PopCheckPoint {// 起始偏移量@JSONField(name = "so")private long startOffset;// 消息拉取时间@JSONField(name = "pt")private long popTime;@JSONField(name = "it")private long invisibleTime;// 位图 收到ACK消息则把对应位设为1@JSONField(name = "bm")private int bitMap;// 消息数量@JSONField(name = "n")private byte num;@JSONField(name = "q")private byte queueId;@JSONField(name = "t")private String topic;// 消费组@JSONField(name = "c")private String cid;@JSONField(name = "ro")private long reviveOffset;// 消息增量偏移量@JSONField(name = "d")private List<Integer> queueOffsetDiff;@JSONField(name = "bn")String brokerName;
}
构建好 PopCheckPoint 对象,Broker 会把它作为一个普通消息写入 commitlog 持久化,消息内容:
topic: rmq_sys_REVIVE_LOG_{clusterName}
queueId: 0~7轮询
tag: ck
body: json(PopCheckPoint)
deliverTimeMs: invisibleTime-1s
CK 消息存储完毕后,就可以正常返回消息了。需要注意的是,消息在返回给消费者前,Broker 会给消息设置一个很重要的属性:POP_CK。它是消息关联的 CK 消息的句柄字符串,消费者基于该属性来 ack 消息。
messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK,ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(key), responseHeader.getPopTime(), responseHeader.getInvisibleTime(),responseHeader.getReviveQid(), messageExt.getTopic(), messageQueue.getBrokerName(), messageExt.getQueueId(), msgQueueOffset)
);
POP_CK由以下八个属性构成,空格连接,通过该属性可以快速定位到 CK 消息。
{ckQueueOffset} {popTime} {invisibleTime} {reviveQid} {0/1} {brokerName} {queueId} {msgQueueOffset}# 示例值
2 1698656741635 60000 0 0 broker-a 3 2
AckMessageProcessor
消费者消费完消息后,会调用eraseMessage()擦除消息,也就是根据消费结果判断是 ack 还是 nack。
public void eraseMessage(MessageViewImpl messageView, ConsumeResult consumeResult) {statsConsumptionResult(consumeResult);ListenableFuture<Void> future = ConsumeResult.SUCCESS.equals(consumeResult) ? ackMessage(messageView) :nackMessage(messageView);future.addListener(() -> evictCache(messageView), MoreExecutors.directExecutor());
}
如果消费成功,则调用ackMessage接口,Broker 的处理方式也很简单,就是构建一个 AckMsg 对象,然后把它作为消息体发一个 ACK 消息。
private RemotingCommand processRequest(final Channel channel, RemotingCommand request,boolean brokerAllowSuspend) throws RemotingCommandException {......AckMsg ackMsg = new AckMsg();ackMsg.setAckOffset(requestHeader.getOffset());ackMsg.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfo));ackMsg.setConsumerGroup(requestHeader.getConsumerGroup());ackMsg.setTopic(requestHeader.getTopic());ackMsg.setQueueId(requestHeader.getQueueId());ackMsg.setPopTime(ExtraInfoUtil.getPopTime(extraInfo));ackMsg.setBrokerName(ExtraInfoUtil.getBrokerName(extraInfo));if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {顺序消息处理}// 构建消息存储MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(reviveTopic);msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.charset));msgInner.setQueueId(rqId);msgInner.setTags(PopAckConstants.ACK_TAG);msgInner.setBornTimestamp(System.currentTimeMillis());msgInner.setBornHost(this.brokerController.getStoreHost());msgInner.setStoreHost(this.brokerController.getStoreHost());// 延时消息 拉取时间+不可见时间msgInner.setDeliverTimeMs(ExtraInfoUtil.getPopTime(extraInfo) + ExtraInfoUtil.getInvisibleTime(extraInfo));msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg));msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));PutMessageResult putMessageResult = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);return response;
}
ACK 消息内容如下:
topic: rmq_sys_REVIVE_LOG_{clusterName}
queueId: 0~7轮询
tag: ack
body: json(AckMsg)
deliverTimeMs: invisibleTime
注意:CK & ACK 消息同属一个Topic
ChangeInvisibleTimeProcessor
如果消费失败,会根据重试次数调用changeInvisibleDuration接口延长消息不可见时间。Broker 最终会由 ChangeInvisibleTimeProcessor 处理请求,因为消息是按照紧凑的方式顺序写入 commitlog,所以写入后就不支持修改了,所谓的修改消息不可见时间,其实是先发一个新的 CK 消息,再发一个旧消息的 ACK 消息。
private RemotingCommand processRequest(final Channel channel, RemotingCommand request,boolean brokerAllowSuspend) throws RemotingCommandException {......// add new cklong now = System.currentTimeMillis();PutMessageResult ckResult = appendCheckPoint(requestHeader, ExtraInfoUtil.getReviveQid(extraInfo), requestHeader.getQueueId(), requestHeader.getOffset(), now, ExtraInfoUtil.getBrokerName(extraInfo));// ack old msgackOrigin(requestHeader, extraInfo);
}
所以,无论消费者是调用 ack 还是 nack,消息一定会被 ack 掉。另外还有一种场景就是消费者异常,没有上报消费结果,Broker 在消息不可见时间到期后会把这些没被 ack 掉的消息发到重试队列里,让其它消费者消费。
PopReviveService
CK & ACK 消息存储下来以后,由谁来处理呢?AckMessageProcessor 会开启八个线程,消费 REVIVE Topic 的八个队列,线程对应的服务是 PopReviveService。
 PopReviveService 线程每秒执行一次,消费 REVIVE Topic 里的消息。如果是 CK 消息则重新构建 PopCheckPoint 对象;如果是 ACK 消息则更新 PopCheckPoint 里的位图,通过 tag 来区分消息类型。
 最后处理 PopCheckPoint,已经被 ack 的消息不做处理,未被 ack 的消息会构建一条新消息重新发送到重试队列,Topic 规则是:%RETRY%{consumerGroup}_{topic}。
 
public void run() {......ConsumeReviveObj consumeReviveObj = new ConsumeReviveObj();// 消费Revive队列消息,构建PopCheckPoint放入mapconsumeReviveMessage(consumeReviveObj);// 处理PopCheckPointmergeAndRevive(consumeReviveObj);
}
protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {HashMap<String, PopCheckPoint> map = consumeReviveObj.map;// 消费位点long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(PopAckConstants.REVIVE_GROUP, reviveTopic, queueId);while (true) {// 查询REVIVE队列消息List<MessageExt> messageExts = getReviveMessage(offset, queueId);for (MessageExt messageExt : messageExts) {//ck消息if (PopAckConstants.CK_TAG.equals(messageExt.getTags())) {String raw = new String(messageExt.getBody(), DataConverter.charset);PopCheckPoint point = JSON.parseObject(raw, PopCheckPoint.class);map.put(point.getTopic() + point.getCId() + point.getQueueId() + point.getStartOffset() + point.getPopTime(), point);} else if (PopAckConstants.ACK_TAG.equals(messageExt.getTags())) {// ack消息String raw = new String(messageExt.getBody(), DataConverter.charset);AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class);PopCheckPoint point = map.get(ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime());// 设置CheckPoint ACK位图int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());if (indexOfAck > -1) {point.setBitMap(DataConverter.setBit(point.getBitMap(), indexOfAck, true));} else {POP_LOGGER.error("invalid ack index, {}, {}", ackMsg, point);}}        }}
}
CK 消息到期后,会触发reviveMsgFromCk恢复没有被 ack 的消息,处理方式是构建新消息发到重试队列。
private void reviveMsgFromCk(PopCheckPoint popCheckPoint) throws Throwable {for (int j = 0; j < popCheckPoint.getNum(); j++) {if (DataConverter.getBit(popCheckPoint.getBitMap(), j)) {// 已经ack了,跳过continue;}// 查询实际消息long msgOffset = popCheckPoint.ackOffsetByIndex((byte) j);MessageExt messageExt = getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName());if (messageExt == null) {POP_LOGGER.warn("reviveQueueId={},can not get biz msg topic is {}, offset is {} , then continue ",queueId, popCheckPoint.getTopic(), msgOffset);continue;}//skip ck from last epochif (popCheckPoint.getPopTime() < messageExt.getStoreTimestamp()) {POP_LOGGER.warn("reviveQueueId={},skip ck from last epoch {}", queueId, popCheckPoint);continue;}// 重试 构建一条消息重新Put到commitlog topic:%RETRY%{consumerGroup}_{topic}reviveRetry(popCheckPoint, messageExt);}
}
PopBufferMergeService
为了记录消息的 ack 状态,RocketMQ 需要存储额外的 CK & ACK 消息,开销还是比较大的。而消息存储的目的是为了匹配消息的 ack 状态,把未被 ack 的消息发送到重试队列里。
 消息消费一般是很快的,意味着正常情况下,很快就能收到消费者发过来的 ack 和 nack 请求,那为什么不直接在内存里完成匹配呢?这样做可以大幅提升性能,消息落盘只作为一个兜底方案。
 RocketMQ 是支持优先在内存里匹配 ack 消息的,默认是关闭状态,需要手动把enablePopBufferMerge打开。
 开启内存匹配后,PopCheckPoint 会优先只追加到缓冲区,只有当缓冲区添加失败才会落地磁盘。
public boolean addCk(PopCheckPoint point, int reviveQueueId, long reviveQueueOffset, long nextBeginOffset) {// key: point.getT() + point.getC() + point.getQ() + point.getSo() + point.getPt()if (!brokerController.getBrokerConfig().isEnablePopBufferMerge()) {// 启用内存匹配return false;}if (!serving) {return false;}long now = System.currentTimeMillis();if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() + 1500) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.warn("[PopBuffer]add ck, timeout, {}, {}", point, now);}return false;}if (this.counter.get() > brokerController.getBrokerConfig().getPopCkMaxBufferSize()) {POP_LOGGER.warn("[PopBuffer]add ck, max size, {}, {}", point, this.counter.get());return false;}PopCheckPointWrapper pointWrapper = new PopCheckPointWrapper(reviveQueueId, reviveQueueOffset, point, nextBeginOffset);if (!checkQueueOk(pointWrapper)) {// 队列默认不超过20000个return false;}// 添加到commitOffsetsputOffsetQueue(pointWrapper);// 添加到缓冲区this.buffer.put(pointWrapper.getMergeKey(), pointWrapper);this.counter.incrementAndGet();if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.info("[PopBuffer]add ck, {}", pointWrapper);}return true;
}
同样的,Broker 在接收到消息 ack 请求时,也会优先只改缓冲区的 PopCheckPoint ack 位图,无需存储 ACK 消息。
 public boolean addAk(int reviveQid, AckMsg ackMsg) {if (!brokerController.getBrokerConfig().isEnablePopBufferMerge()) {return false;}if (!serving) {return false;}try {PopCheckPointWrapper pointWrapper = this.buffer.get(ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime() + ackMsg.getBrokerName());if (pointWrapper == null) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, no ck, {}", reviveQid, ackMsg);}return false;}if (pointWrapper.isJustOffset()) {return false;}PopCheckPoint point = pointWrapper.getCk();long now = System.currentTimeMillis();if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() + 1500) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, almost timeout for revive, {}, {}, {}", reviveQid, pointWrapper, ackMsg, now);}return false;}if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime() - 1500) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.warn("[PopBuffer]add ack fail, rqId={}, stay too long, {}, {}, {}", reviveQid, pointWrapper, ackMsg, now);}return false;}// 直接更改内存里的位图int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());if (indexOfAck > -1) {markBitCAS(pointWrapper.getBits(), indexOfAck);} else {POP_LOGGER.error("[PopBuffer]Invalid index of ack, reviveQid={}, {}, {}", reviveQid, ackMsg, point);return true;}if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.info("[PopBuffer]add ack, rqId={}, {}, {}", reviveQid, pointWrapper, ackMsg);}return true;} catch (Throwable e) {POP_LOGGER.error("[PopBuffer]add ack error, rqId=" + reviveQid + ", " + ackMsg, e);}return false;}
内存资源有限,一直往里堆 PopCheckPoint 也不行啊,这时候就需要做两件事:
- 把内存里已经匹配完 ack 的 PopCheckPoint 移除掉
- 隔太久还没 ack 掉的 PopCheckPoint,必须要落盘存储了
PopBufferMergeService 本身也是个线程,会每隔 5ms 扫描一次缓冲区,执行上述操作。
 
 内存里的 PopCheckPoint 移除前需要满足两个条件:
- PopCheckPoint 消息已经持久化了
- PopCheckPoint 在内存里就已经全被 ack 掉了
还没有完全被 ack 掉的 PopCheckPoint 移除前需要做两件事:
- PopCheckPoint 消息持久化
- 已经被 ack 掉的消息也要持久化
private void scan() {long startTime = System.currentTimeMillis();int count = 0, countCk = 0;// 迭代缓冲区的PopCheckPointIterator<Map.Entry<String, PopCheckPointWrapper>> iterator = buffer.entrySet().iterator();while (iterator.hasNext()) {Map.Entry<String, PopCheckPointWrapper> entry = iterator.next();PopCheckPointWrapper pointWrapper = entry.getValue();// CheckPoint已持久化,或已被ACK,从缓冲区删除// just process offset(already stored at pull thread), or buffer ck(not stored and ack finish)if (pointWrapper.isJustOffset() && pointWrapper.isCkStored() || isCkDone(pointWrapper)|| isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored()) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.info("[PopBuffer]ck done, {}", pointWrapper);}iterator.remove();counter.decrementAndGet();continue;}// 把超时或停留时间超10s的CheckPoint从缓冲区删除,删除前要先持久化PopCheckPoint point = pointWrapper.getCk();long now = System.currentTimeMillis();boolean removeCk = !this.serving;// ck will be timeoutif (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut()) {removeCk = true;}// 内存停留时间超过10秒,也要移除掉if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime()) {removeCk = true;}if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime() * 2L) {POP_LOGGER.warn("[PopBuffer]ck finish fail, stay too long, {}", pointWrapper);}// double checkif (isCkDone(pointWrapper)) {continue;} else if (pointWrapper.isJustOffset()) {// just offset should be in store.if (pointWrapper.getReviveQueueOffset() < 0) {// reviveQueueOffset<0代表还没存储,要先存储putCkToStore(pointWrapper, false);countCk++;}continue;} else if (removeCk) {// put buffer ak to storeif (pointWrapper.getReviveQueueOffset() < 0) {putCkToStore(pointWrapper, false);countCk++;}if (!pointWrapper.isCkStored()) {continue;}for (byte i = 0; i < point.getNum(); i++) {// reput buffer ak to store// 存储ack消息if (DataConverter.getBit(pointWrapper.getBits().get(), i)&& !DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {if (putAckToStore(pointWrapper, i)) {count++;markBitCAS(pointWrapper.getToStoreBits(), i);}}}if (isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored()) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.info("[PopBuffer]ck finish, {}", pointWrapper);}iterator.remove();counter.decrementAndGet();continue;}}}// 扫描commitOffsets,提交消费位点int offsetBufferSize = scanCommitOffset();long eclipse = System.currentTimeMillis() - startTime;if (eclipse > brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() - 1000) {POP_LOGGER.warn("[PopBuffer]scan stop, because eclipse too long, PopBufferEclipse={}, " +"PopBufferToStoreAck={}, PopBufferToStoreCk={}, PopBufferSize={}, PopBufferOffsetSize={}",eclipse, count, countCk, counter.get(), offsetBufferSize);this.serving = false;} else {if (scanTimes % countOfSecond1 == 0) {POP_LOGGER.info("[PopBuffer]scan, PopBufferEclipse={}, " +"PopBufferToStoreAck={}, PopBufferToStoreCk={}, PopBufferSize={}, PopBufferOffsetSize={}",eclipse, count, countCk, counter.get(), offsetBufferSize);}}scanTimes++;if (scanTimes >= countOfMinute1) {counter.set(this.buffer.size());scanTimes = 0;}
}
内存里的 PopCheckPoint 除了会根据唯一键,构建一个 Map,还会根据topic@cid@queueId构建一个 QueueWithTime 队列连接起来。
ConcurrentHashMap<String/*topic@cid@queueId*/, QueueWithTime<PopCheckPointWrapper>> commitOffsets =new ConcurrentHashMap<>();
QueueWithTime 队列的用途有两个:
- 内存里的 PopCheckPoint 处理完毕后,提交消费位点
- 消息拉取时,获取拉取位点,避免已经投递的消息重复投递
内存里的 PopCheckPoint 只要持久化了或者全被 ack 掉了,就可以提交消费位点了。因为这些消息要么被成功消费了,要么后续在处理 CK 消息时也会被发送到重试队列里。提交消费位点的方法是commitOffset()
private boolean commitOffset(final PopCheckPointWrapper wrapper) {if (wrapper.getNextBeginOffset() < 0) {return true;}final PopCheckPoint popCheckPoint = wrapper.getCk();final String lockKey = wrapper.getLockKey();// 加锁if (!queueLockManager.tryLock(lockKey)) {return false;}try {// 旧的消费位点final long offset = brokerController.getConsumerOffsetManager().queryOffset(popCheckPoint.getCId(), popCheckPoint.getTopic(), popCheckPoint.getQueueId());if (wrapper.getNextBeginOffset() > offset) {if (brokerController.getBrokerConfig().isEnablePopLog()) {POP_LOGGER.info("Commit offset, {}, {}", wrapper, offset);}} else {// maybe store offset is not correct.POP_LOGGER.warn("Commit offset, consumer offset less than store, {}, {}", wrapper, offset);}// 提交消费位点brokerController.getConsumerOffsetManager().commitOffset(getServiceName(),popCheckPoint.getCId(), popCheckPoint.getTopic(), popCheckPoint.getQueueId(), wrapper.getNextBeginOffset());} finally {queueLockManager.unLock(lockKey);}return true;
}
尾巴
RocketMQ 5.0 的 Pop 消费模式是 Push 模式的升级版,它解决了原先 Push 模式下队列只能由一个消费者消费的问题、去除了客户端繁重的重平衡逻辑、降低了消息堆积的风险。核心逻辑是 Broker 给每次拉取的一批消息发一个 CK 延时消息,客户端 ack 时再发一个 ACK 延时消息,消息到期后对 CK 消息做 ACK 匹配,把未被 ack 掉的消息发到重试独立里。这种模式下,不可避免的会带来额外开销,所以 RocketMQ 也支持优先在内存里完成匹配,CK 和 ACK 消息就不用存储了。