rocketmq源码阅读(5)——消费者消费流程

Posted by New Boy on January 20, 2019

前言

在消费者初始化过程中,我们看到了PullRequest的分发,接下来就是看看具体的消费流程。大体的流程是这样的:

  1. PullRequest的处理
  2. 回调PullCallBack处理pullResult
  3. ConsumeMessageService处理消息

PullRequest的处理

PullMessageService在拿到PullRequest后,直接转发给DefauleMQPushConsumerImpl进行处理:

在拿到PullRequest后,DefauleMQPushConsumerImpl不会马上就去进行RPC调用。如果ProcessQueue中缓存的条数大于1000条,或者大于100M,会停50微秒再将PullRequest重新投递到PullMessageService中,以达到流量控制,避免压力过大。当然,以上数值均为默认值。

这里主要就是一些细节的处理,参数的拼凑,再交由pullAPIWrapper进行RPC通信,当然会传入回调接口。细节的代码就不贴了。

在PullRequest的中,还加杂了PullCallBack的初始化,这个会在看处理pullRequest时再看看。

回调PullCallBack处理PullResult

首先来看看PullCallBack的匿名内部类的实现吧

PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData);

            switch (pullResult.getPullStatus()) {
                case FOUND:
                    long prevRequestOffset = pullRequest.getNextOffset();
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                    long pullRT = System.currentTimeMillis() - beginTimestamp;
                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                                                                       pullRequest.getMessageQueue().getTopic(), pullRT);

                    long firstMsgOffset = Long.MAX_VALUE;
                    if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    } else {
                        //第一条消息的offset
                        firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                                                                            pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                        //拉取来的消息放入processQueue中
                        boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                        //提交给consumerMessageService处理
                        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                            pullResult.getMsgFoundList(),
                            processQueue,
                            pullRequest.getMessageQueue(),
                            dispatchToConsume);

                        if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                                                                   DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                        } else {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        }
                    }

                    if (pullResult.getNextBeginOffset() < prevRequestOffset
                        || firstMsgOffset < prevRequestOffset) {
                        log.warn(
                            "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                            pullResult.getNextBeginOffset(),
                            firstMsgOffset,
                            prevRequestOffset);
                    }

                    break;
                case NO_NEW_MSG:
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
                case NO_MATCHED_MSG:
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
                case OFFSET_ILLEGAL:
                    log.warn("the pull request offset illegal, {} {}",
                             pullRequest.toString(), pullResult.toString());
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    pullRequest.getProcessQueue().setDropped(true);
                    DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                        @Override
                        public void run() {
                            try {
                                DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false);

                                DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                log.warn("fix the pull request offset, {}", pullRequest);
                            } catch (Throwable e) {
                                log.error("executeTaskLater Exception", e);
                            }
                        }
                    }, 10000);
                    break;
                default:
                    break;
            }
        }
    }

    @Override
    public void onException(Throwable e) {
        if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            log.warn("execute the pull request exception", e);
        }

        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    }
};

可以看到总共会返回4中拉取状态:FOUND,NO_NEW_MSG,NO_MATCHED_MSG,OFFSET_ILLEGAL。

对于中间两种,会更新本地的offset,然后重新投递PullRequest。对于OFFSET_ILLEGAL,会清理该ProcessQueue。

主要还是来看看FOUND的情况,先是记录一下拉取的时间,然后就把请求丢给consumeMessageService。ConsumeMessageService包括顺序消费(ConsumeMessageOrderlyService)和无序消费(ConsumeMessageConcurrentlyService)。先来看看无序消费

无序消费

//org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService#submitConsumeRequest
//batch消费size
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
    ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
    try {
        this.consumeExecutor.submit(consumeRequest);
    } catch (RejectedExecutionException e) {
        this.submitConsumeRequestLater(consumeRequest);
    }
} else {
    //对于拉取的消息size大于配置的值,会将消息根据长度进行切分,组成多个ConsumeRequest进行提交
    for (int total = 0; total < msgs.size(); ) {
        List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
        for (int i = 0; i < consumeBatchSize; i++, total++) {
            if (total < msgs.size()) {
                msgThis.add(msgs.get(total));
            } else {
                break;
            }
        }

        ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            for (; total < msgs.size(); total++) {
                msgThis.add(msgs.get(total));
            }

            this.submitConsumeRequestLater(consumeRequest);
        }
    }
}

这里主要是构建了一个ConsumeRequest对象,然后扔到线程池中执行,线程池是这样的

this.consumeExecutor = new ThreadPoolExecutor(
    //core thread number,默认为20
    this.defaultMQPushConsumer.getConsumeThreadMin(),
    //max thread number,默认为64
    this.defaultMQPushConsumer.getConsumeThreadMax(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.consumeRequestQueue,
    new ThreadFactoryImpl("ConsumeMessageThread_"));

再看一下ConsumeRequest的run方法。在DefaultMQPushConsumerImpl中,我们看到了consumeMessageHookList这个变量,在ConsumeRequest中,就是触发这个钩子时候,关于钩子就这么一笔带过了,不贴上来了。

long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
    //重置其中标记为"重试"的消息的topic
    ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
    if (msgs != null && !msgs.isEmpty()) {
        for (MessageExt msg : msgs) {
            MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
        }
    }
    //让注册的listener去消费这些记录
    status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
    log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
             RemotingHelper.exceptionSimpleDesc(e),
             ConsumeMessageConcurrentlyService.this.consumerGroup,
             msgs,
             messageQueue);
    hasException = true;
}
...
//处理消费结果,消费失败会进行重试
if (!processQueue.isDropped()) {
    ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
    log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}

对于listener消费的结果,有两种返回结果:

public enum ConsumeConcurrentlyStatus {
    /**
     * Success consumption
     */
    CONSUME_SUCCESS,
    /**
     * Failure consumption,later try to consume
     */
    RECONSUME_LATER;
}

众所周知,rocketmq对消息会有重试机制,对于消费失败的消息会重新消费,直至达到配置的值(默认为16),达到配置的值后会丢入DLQ,也就是死信队列,此时就需要人为干预了。接下来就看看它到底是怎么实现消费重试机制的。(写到这里,正好想到最近项目准备切换成kafka,正好学习借鉴一下:D)

//org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService#processConsumeResult
//...
switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        //...
        break;
    case CLUSTERING:
        List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
        //处理发送失败消息集合,消费成功时,ackIndex = consumeRequest.getMsgs().size() - 1;,失败时ack=-1
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            //重新发送消息
            boolean result = this.sendMessageBack(msg, context);
            if (!result) {
                //发送失败?重试次数+1。broker也会增加重试次数?
                msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                msgBackFailed.add(msg);
            }
        }

        if (!msgBackFailed.isEmpty()) {
            consumeRequest.getMsgs().removeAll(msgBackFailed);

            this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
        }
        break;
    default:
        break;
}

//移除消费队列中的消息,从ProcessQueue的treeMap中移除消费成功的消息,并返回第一条消息的offset,removeMessage中加了写锁,保证写过程线程安全
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
    this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}

其中sendMessageBack方法即将消息扔到相关consumeGroup的重试队列中。这里就涉及到延迟队列相关的内容。总共会重试16次,16次都失败后会丢进死信队列,需要人工干预。“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”是默认设置的延迟等级。

在最后,会更新offset,这样在MQClientInstace就可以定时将offset更新到broker端了。

有序消费

在看有序消费之前还是有些疑惑的:

1.怎么保证ProcessQueue只有一次拉取的消息集合?

2.在消息消费失败后又是怎么处理的?

带着问题,我们来看看相关的流程

有序消费中,也会生成个ConsumeRequest对象,其中的run方法:

final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
    if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
        || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
        final long beginTime = System.currentTimeMillis();
        for (boolean continueConsume = true; continueConsume; ) {

            final int consumeBatchSize =
                ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

            //从本地缓存中拉取批量消息
            List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
            if (!msgs.isEmpty()) {
                final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

                ConsumeOrderlyStatus status = null;

                ConsumeMessageContext consumeMessageContext = null;
                //...

                long beginTimestamp = System.currentTimeMillis();
                ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                boolean hasException = false;
                try {
                    //消息消费锁
                    this.processQueue.getLockConsume().lock();

                    status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                } catch (Throwable e) {
                } finally {
                    this.processQueue.getLockConsume().unlock();
                }
                //...

                ConsumeMessageOrderlyService.this.getConsumerStatsManager()
                    .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

                //处理改次消息返回结果,
                continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
            } else {
                continueConsume = false;
            }
        }
    } else {

        ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
    }
}

我们来看看rocketmq内部是怎么实现顺序消费的:

1.给messageQueue对象加锁

//org.apache.rocketmq.client.impl.consumer.MessageQueueLock
private ConcurrentMap<MessageQueue, Object> mqLockTable = new ConcurrentHashMap<MessageQueue, Object>();
public Object fetchLockObject(final MessageQueue mq) {
    Object objLock = this.mqLockTable.get(mq);
    if (null == objLock) {
        objLock = new Object();
        Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
        if (prevLock != null) {
            objLock = prevLock;
        }
    }
    return objLock;
}

2.给messageQueue相应的ProcessQueue加“锁”

ProcessQueue中的锁其实只是一个为“locked”的布尔变量。在ConsumeMessageOrderlyService中起了一个schedule任务,对所有的ProcessQueue的locked变量置为true

至于这个变量其实主要是用在处理pullRequest时:

if (processQueue.isLocked()) {
    if (!pullRequest.isLockedFirst()) {
        //从broker端获取offset
        final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
        boolean brokerBusy = offset < pullRequest.getNextOffset();
        log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                 pullRequest, offset, brokerBusy);
        if (brokerBusy) {
            log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                     pullRequest, offset);
        }

        pullRequest.setLockedFirst(true);
        pullRequest.setNextOffset(offset);
    }
} else {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    log.info("pull message later because not locked in broker, {}", pullRequest);
    return;
}

其主要的目的还是保证rebalance服务已经结束,队列已分配好,offset也更新到最新。

3.消息消费锁

this.processQueue.getLockConsume().lock();,这是一个ReentrantLock,保证listerner只能处理一批次的消息。

4.consumingMsgOrderlyTreeMap的使用

consumingMsgOrderlyTreeMap其实是Processqueue中的一个变量,官方注释是这么写的:A subset of msgTreeMap, will only be used when orderly consume.

举个例子来说,一次PullRequest可能拉取了10条Message,不过默认每次消费的条数是1,那在执行this.processQueue.takeMessags方法时,consumingMsgOrderlyTreeMap存放的就是msgTreeMap中的第一个消息

通过上面几点,我们可以看到客户端对于顺序消费的把控。接着再来看看对消费结果进行处理的过程,也就是ConsumeMessageOrderlyService.this.processConsumeResult():

不同于无序消费,顺序消费包括SUCCESSSUSPEND_CURRENT_QUEUE_A_MOMENT这两种状态,来看看核心的代码

//...
//成功消费
case SUCCESS:
    //获取当前消费的offset,并清空consumingMsgOrderlyTreeMap,返回的offset用于更新缓存中的offset
    commitOffset = consumeRequest.getProcessQueue().commit();
    this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
    break;
//重试
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
    if (checkReconsumeTimes(msgs)) {
        //将消息从consumingMsgOrderlyTreeMap中移除,放回msgTreeMap中,稍后进行重新消费
        consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
        //重新提交消费请求
        this.submitConsumeRequestLater(
            consumeRequest.getProcessQueue(),
            consumeRequest.getMessageQueue(),
            context.getSuspendCurrentQueueTimeMillis());
        continueConsume = false;
    } else {
        commitOffset = consumeRequest.getProcessQueue().commit();
    }
    break;
default:
	break;
//...
if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {       this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}

再来看看SUSPEND_CURRENT_QUEUE_A_MOMENT的情况

再来看看checkReconsumeTimes()方法:

private boolean checkReconsumeTimes(List<MessageExt> msgs) {
    //设置重试次数
    boolean suspend = false;
    if (msgs != null && !msgs.isEmpty()) {
        for (MessageExt msg : msgs) {
            //getMaxReconsumeTimes()默认返回Integer.MAX_VALUE
            if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {
                MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes()));
                //重新发送消息失败
                if (!sendMessageBack(msg)) {
                    suspend = true;
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                }
            } else {
                suspend = true;
                msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
            }
        }
    }
    return suspend;
}

因为getMaxReconsumeTimes()默认返回Integer.MAX_VALUE,所以在相当长的一段时间里会一直尝试重新消费,而不会去拉取消息。

思考

1.在使用顺序消费时,一定要小心使用SUSPEND_CURRENT_QUEUE_A_MOMENT这个状态,如果出现这个状态,会有一段时间阻塞在相应的消息上,不会继续往下消费。如果要用还是建议设置maxReconsumeTimes的值。

2.对于同一个MessageQueue,只会有一个PullRequest对象,该对象在负载均衡服务中生成。