前言
在消费者初始化过程中,我们看到了PullRequest的分发,接下来就是看看具体的消费流程。大体的流程是这样的:
- PullRequest的处理
- 回调PullCallBack处理pullResult
- ConsumeMessageService处理消息
PullRequest的处理
PullMessageService在拿到PullRequest后,直接转发给DefauleMQPushConsumerImpl进行处理:
在拿到PullRequest后,DefauleMQPushConsumerImpl不会马上就去进行RPC调用。如果ProcessQueue中缓存的条数大于1000条,或者大于100M,会停50微秒再将PullRequest重新投递到PullMessageService中,以达到流量控制,避免压力过大。当然,以上数值均为默认值。
这里主要就是一些细节的处理,参数的拼凑,再交由pullAPIWrapper进行RPC通信,当然会传入回调接口。细节的代码就不贴了。
在PullRequest的中,还加杂了PullCallBack的初始化,这个会在看处理pullRequest时再看看。
回调PullCallBack处理PullResult
首先来看看PullCallBack的匿名内部类的实现吧
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
//第一条消息的offset
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
//拉取来的消息放入processQueue中
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
//提交给consumerMessageService处理
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
};
可以看到总共会返回4中拉取状态:FOUND,NO_NEW_MSG,NO_MATCHED_MSG,OFFSET_ILLEGAL。
对于中间两种,会更新本地的offset,然后重新投递PullRequest。对于OFFSET_ILLEGAL,会清理该ProcessQueue。
主要还是来看看FOUND的情况,先是记录一下拉取的时间,然后就把请求丢给consumeMessageService。ConsumeMessageService包括顺序消费(ConsumeMessageOrderlyService)和无序消费(ConsumeMessageConcurrentlyService)。先来看看无序消费
无序消费
//org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService#submitConsumeRequest
//batch消费size
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
//对于拉取的消息size大于配置的值,会将消息根据长度进行切分,组成多个ConsumeRequest进行提交
for (int total = 0; total < msgs.size(); ) {
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest);
}
}
}
这里主要是构建了一个ConsumeRequest对象,然后扔到线程池中执行,线程池是这样的
this.consumeExecutor = new ThreadPoolExecutor(
//core thread number,默认为20
this.defaultMQPushConsumer.getConsumeThreadMin(),
//max thread number,默认为64
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));
再看一下ConsumeRequest的run方法。在DefaultMQPushConsumerImpl中,我们看到了consumeMessageHookList这个变量,在ConsumeRequest中,就是触发这个钩子时候,关于钩子就这么一笔带过了,不贴上来了。
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
//重置其中标记为"重试"的消息的topic
ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
//让注册的listener去消费这些记录
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
}
...
//处理消费结果,消费失败会进行重试
if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
对于listener消费的结果,有两种返回结果:
public enum ConsumeConcurrentlyStatus {
/**
* Success consumption
*/
CONSUME_SUCCESS,
/**
* Failure consumption,later try to consume
*/
RECONSUME_LATER;
}
众所周知,rocketmq对消息会有重试机制,对于消费失败的消息会重新消费,直至达到配置的值(默认为16),达到配置的值后会丢入DLQ,也就是死信队列,此时就需要人为干预了。接下来就看看它到底是怎么实现消费重试机制的。(写到这里,正好想到最近项目准备切换成kafka,正好学习借鉴一下:D)
//org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService#processConsumeResult
//...
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
//...
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
//处理发送失败消息集合,消费成功时,ackIndex = consumeRequest.getMsgs().size() - 1;,失败时ack=-1
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
//重新发送消息
boolean result = this.sendMessageBack(msg, context);
if (!result) {
//发送失败?重试次数+1。broker也会增加重试次数?
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
//移除消费队列中的消息,从ProcessQueue的treeMap中移除消费成功的消息,并返回第一条消息的offset,removeMessage中加了写锁,保证写过程线程安全
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
其中sendMessageBack方法即将消息扔到相关consumeGroup的重试队列中。这里就涉及到延迟队列相关的内容。总共会重试16次,16次都失败后会丢进死信队列,需要人工干预。“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”是默认设置的延迟等级。
在最后,会更新offset,这样在MQClientInstace就可以定时将offset更新到broker端了。
有序消费
在看有序消费之前还是有些疑惑的:
1.怎么保证ProcessQueue只有一次拉取的消息集合?
2.在消息消费失败后又是怎么处理的?
带着问题,我们来看看相关的流程
有序消费中,也会生成个ConsumeRequest对象,其中的run方法:
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
//从本地缓存中拉取批量消息
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
ConsumeOrderlyStatus status = null;
ConsumeMessageContext consumeMessageContext = null;
//...
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
//消息消费锁
this.processQueue.getLockConsume().lock();
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
} finally {
this.processQueue.getLockConsume().unlock();
}
//...
ConsumeMessageOrderlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
//处理改次消息返回结果,
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
} else {
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
我们来看看rocketmq内部是怎么实现顺序消费的:
1.给messageQueue对象加锁
//org.apache.rocketmq.client.impl.consumer.MessageQueueLock
private ConcurrentMap<MessageQueue, Object> mqLockTable = new ConcurrentHashMap<MessageQueue, Object>();
public Object fetchLockObject(final MessageQueue mq) {
Object objLock = this.mqLockTable.get(mq);
if (null == objLock) {
objLock = new Object();
Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
if (prevLock != null) {
objLock = prevLock;
}
}
return objLock;
}
2.给messageQueue相应的ProcessQueue加“锁”
ProcessQueue中的锁其实只是一个为“locked”的布尔变量。在ConsumeMessageOrderlyService中起了一个schedule任务,对所有的ProcessQueue的locked变量置为true
至于这个变量其实主要是用在处理pullRequest时:
if (processQueue.isLocked()) {
if (!pullRequest.isLockedFirst()) {
//从broker端获取offset
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy);
if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
pullRequest, offset);
}
pullRequest.setLockedFirst(true);
pullRequest.setNextOffset(offset);
}
} else {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
其主要的目的还是保证rebalance服务已经结束,队列已分配好,offset也更新到最新。
3.消息消费锁
即this.processQueue.getLockConsume().lock();,这是一个ReentrantLock,保证listerner只能处理一批次的消息。
4.consumingMsgOrderlyTreeMap的使用
consumingMsgOrderlyTreeMap其实是Processqueue中的一个变量,官方注释是这么写的:A subset of msgTreeMap, will only be used when orderly consume.
举个例子来说,一次PullRequest可能拉取了10条Message,不过默认每次消费的条数是1,那在执行this.processQueue.takeMessags方法时,consumingMsgOrderlyTreeMap存放的就是msgTreeMap中的第一个消息
通过上面几点,我们可以看到客户端对于顺序消费的把控。接着再来看看对消费结果进行处理的过程,也就是ConsumeMessageOrderlyService.this.processConsumeResult():
不同于无序消费,顺序消费包括SUCCESS和SUSPEND_CURRENT_QUEUE_A_MOMENT这两种状态,来看看核心的代码
//...
//成功消费
case SUCCESS:
//获取当前消费的offset,并清空consumingMsgOrderlyTreeMap,返回的offset用于更新缓存中的offset
commitOffset = consumeRequest.getProcessQueue().commit();
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
//重试
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs)) {
//将消息从consumingMsgOrderlyTreeMap中移除,放回msgTreeMap中,稍后进行重新消费
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
//重新提交消费请求
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
} else {
commitOffset = consumeRequest.getProcessQueue().commit();
}
break;
default:
break;
//...
if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}
再来看看SUSPEND_CURRENT_QUEUE_A_MOMENT的情况
再来看看checkReconsumeTimes()方法:
private boolean checkReconsumeTimes(List<MessageExt> msgs) {
//设置重试次数
boolean suspend = false;
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
//getMaxReconsumeTimes()默认返回Integer.MAX_VALUE
if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {
MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes()));
//重新发送消息失败
if (!sendMessageBack(msg)) {
suspend = true;
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
}
} else {
suspend = true;
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
}
}
}
return suspend;
}
因为getMaxReconsumeTimes()默认返回Integer.MAX_VALUE,所以在相当长的一段时间里会一直尝试重新消费,而不会去拉取消息。
思考
1.在使用顺序消费时,一定要小心使用SUSPEND_CURRENT_QUEUE_A_MOMENT这个状态,如果出现这个状态,会有一段时间阻塞在相应的消息上,不会继续往下消费。如果要用还是建议设置maxReconsumeTimes的值。
2.对于同一个MessageQueue,只会有一个PullRequest对象,该对象在负载均衡服务中生成。