rocketmq源码阅读(1)——生产者初始化与消息的发送

Posted by New Boy on January 5, 2019

前言

其实这开头还是在全部写完才加上去的。其实这篇文章也不是那么入门了,毕竟也涉及到源码了。不说熟练掌握rocketmq的架构,至少得熟悉rocketmq的整体流程,包括broker,queue,namesrv。当然,本文并未涵盖事务消息,VIPChannel,延迟消息,队列选择算法的选择等,一些高级用法的源码会在之后慢慢补充起来。不过话说,估计也没人看——、

先放个最简单的producer的demo吧,其实org.apache.rocketmq.client.producer.DefaultMQProducerTest中的测试类就很全,不过要稍微复杂一点:

public static void producer() throws Exception{
    String producerGroup = "testGroup";
    DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.setInstanceName("dasdas");

    producer.start();

    String topic = "TBW";
    String tags = "";
    byte[] data = new byte[100];
    Message message = new Message(topic, tags, data);
    message.setKeys("keys");
    producer.send(message);
}

以上就是一个基本的生产者初始化的过程,其实主要就是4个部分:生产者创建生产者启动消息创建消息发送,那核心的其实就是producer.start()producer.send(message).

producer初始化

一层层追溯进去,这是第一步

//org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.java
public void start() throws MQClientException {
    this.start(true);
}

这里的this.start(true);里面的true指启动MQClientInstance,MQClientInstance是个rocketmq-client中很重要的一块,所以单独提一下

接着就是具体的启动过程:

//org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.java
public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }

            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                                            null);
            }

            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            if (startFactory) {
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                     this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                                        + this.serviceState
                                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                                        null);
        default:
            break;
    }

    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}

比较多,就一行一行来跟踪吧

1.配置校验与调整

this.checkConfig();

if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
    this.defaultMQProducer.changeInstanceNameToPID();
}

this.checkConfig();校验producerGroupName的名字,取为“DEFAULT_PRODUCER”会抛出异常

this.defaultMQProducer.changeInstanceNameToPID();如果instanceName为“DEFAULT”,将修改为PID

2.MQClientInstance的创建与注册与启动

this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
    mQClientFactory.start();
}

在读代码前,先简单讲讲MQClientInstance吧。在客户端中,MQClientInstance和MQClientId是一一对应的关系。而MQClientId=”IP@instanceName@UnitName”,对于broker端而言,MQClientId就相当于一个唯一的标志。所以这里就涉及到一些坑,详细的在看MQClientInstance源码的时候再说。接着看producer的初始化吧

MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook) 这一句就相当于获取MQClientInstance对象。

//org/apache/rocketmq/client/impl/MQClientManager
private static MQClientManager instance = new MQClientManager();
private AtomicInteger factoryIndexGenerator = new AtomicInteger();
private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
    new ConcurrentHashMap<String, MQClientInstance>();

private MQClientManager() {

}

public static MQClientManager getInstance() {
    return instance;
}

其实可以看到,MQClientManager是用了单例模式饿汉式的写法。里面维护了一个自增Id和MQClientInstance的map

public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    String clientId = clientConfig.buildMQClientId();
    MQClientInstance instance = this.factoryTable.get(clientId);
    if (null == instance) {
        instance =
            new MQClientInstance(clientConfig.cloneClientConfig(),
                                 this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) {
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        }
    }

    return instance;
}

逻辑是这样的,就是生成clientId(IP@instanceName@UnitName),然后找是否存在对应的MQClientInstance,若无则创建对象。

接下来就是mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); 顾名思义,将生产者注册到MQClientInstance中,跟进去其实也就是一行代码

//org/apache/rocketmq/client/impl/factory/MQClientInstance.java中producerTable的定义
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);

接着是this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());,topicPublishInfoTable中存了所有我们会投递消息的topic信息。而这里getCreateTopicKey()是默认自动创建的topic=”TBW102”,源码给的注释是“Just for testing or demo program”,而且线上一般都会禁止topic自动创建

那么topicPublishInfoTable在什么时候更新呢?在我们发送消息的时候,就会去更新这个topicPublishInfoTable。另外,在mqClientInstance中,在启动时会有ScheduledTask,会定时从nameSrv中获取相关的topic信息进行更新,当然,这些会在看MQClientInstance源码时再细看。

到这里,producer初始化就差不多了,剩下的就是MQClientInstance的启动,这个会另写一篇细看

producer推送消息

顺着send(Message message),很快就找到了相关的逻辑,在DefaultMQProducerImplsendDefaultImpl方法。因为代码太长,不方便阅读,故部分小细节就直接在代码中写了。部分涉及到延迟容错模块会在下文细写

//org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);

    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    //根据topic名称获取该topic相关的信息,包括所有的queue,本次消息投递的queue
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        //同步方式默认发送3次,有三种方式,async,sync,oneway。除了sync,其余的方式都只尝试一次
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            //获取上一次发送的broker
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            //选择发送的queue,这里涉及到了选择发送队列的算法策略,会细看
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        //处理超时,中断发送请求
                        callTimeout = true;
                        break;
                    }

                    //发送消息,涉及相关rpc调用
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    //更新broker的延迟
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }

                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    //涉及到延迟容错相关内容
                    continue;
                } catch (MQClientException e) {
                    continue;
                } catch (MQBrokerException e) {
                    continue;
                } catch (InterruptedException e) {
                    throw e;
                }
            } else {
                break;
            }
        }

        if (sendResult != null) {
            return sendResult;
        }

        throw mqClientException;
    }
}

消息发送主要看一下下面几行

//根据topic名称获取该topic相关的信息,包括所有的queue,本次消息投递的queue
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

//选择发送的queue,这里涉及到了选择发送队列的算法
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

//发送消息,涉及相关rpc调用
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);

1.获取topic信息

查看tryToFindTopicPublishInfo()方法

//org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    //从本地缓存中获取该topic的信息
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        //从nameSrv获取该topic信息,并更新本地缓存
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        //如果缺少topicRouterInfo,再跟踪进去后,会发现,其实就是更新默认topic"TBW102"的相关信息
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

主要还是updateTopicRouteInfoFromNameServer()方法,接着看

//org/apache/rocketmq/client/impl/factory/MQClientInstance.java
private final Lock lockNamesrv = new ReentrantLock();
//加锁,同一个mqClientInstance
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
    try {
        TopicRouteData topicRouteData;
        if (isDefault && defaultMQProducer != null) {
            //更新默认topic--TBW102的相关信息
            topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                1000 * 3);
            if (topicRouteData != null) {
                for (QueueData data : topicRouteData.getQueueDatas()) {
                    int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                    data.setReadQueueNums(queueNums);
                    data.setWriteQueueNums(queueNums);
                }
            }
        } else {
            //通过rpc获取topicRoute信息,
            topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
        }
        if (topicRouteData != null) {
            TopicRouteData old = this.topicRouteTable.get(topic);
            //判断topic相关的queue和broker是否有变更
            boolean changed = topicRouteDataIsChange(old, topicRouteData);
            if (!changed) {
                //是否更新topicPublishInfoTable,主要是判断旧的TopicPublishInfo是否为null,以及messageQueue是否为空,如果为空则需要更新
                changed = this.isNeedUpdateTopicRouteInfo(topic);
            } else {
                log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
            }

            if (changed) {
                TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

                for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                    this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                }

                //遍历更新producer中的topic信息
                // Update Pub info
                {
                    TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                    publishInfo.setHaveTopicRouterInfo(true);
                    Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<String, MQProducerInner> entry = it.next();
                        MQProducerInner impl = entry.getValue();
                        if (impl != null) {
                            impl.updateTopicPublishInfo(topic, publishInfo);
                        }
                    }
                }

                // Update sub info
                //更新消费者相关topic信息,和上面更新生产者类似
                {
                    Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                    Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<String, MQConsumerInner> entry = it.next();
                        MQConsumerInner impl = entry.getValue();
                        if (impl != null) {
                            impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                        }
                    }
                }
                log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                //更新mqClientInstance中的topic信息
                this.topicRouteTable.put(topic, cloneTopicRouteData);
                return true;
            }
        } else {
            log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
        }
    } catch (Exception e) {
        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
            log.warn("updateTopicRouteInfoFromNameServer Exception", e);
        }
    } finally {
        this.lockNamesrv.unlock();
    }
}

代码还挺多,不过就是更新TopicRouteInfo的相关逻辑,主要是一些细节的处理,注释中都写了。

mqClientInstance里的mQClientAPIImpl作为rpc调用统一的入口,会在后续关于rocketmq-remoting的源码阅读中,结合rocketmq对netty的实践进行分析

2.选择发送队列

接着看一下selectOneMessageQueue()方法,这里又涉及到了一个MQFaultStrategy类,不过这里没考虑延迟容错,延迟容错东西也多,会在下面详细写。

先简单提一下吧,举个例子,有3个broker:brokerA,brokerB,borkerC,如果因为网络原因,和brokerC的连接非常慢,如果producer开启了延迟容错的功能,那么客户端就会停止往brokerC的queue中投递消息,而是一段时间后重新尝试,当然,延迟容错默认是不开启的

如果没考虑延迟容错,MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); 跟踪这段代码,可以看到

//org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        //可以理解为index++,index为ThreadLocal变量
        int index = this.sendWhichQueue.getAndIncrement();
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int pos = Math.abs(index++) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            //如果不是上一个broker,则返回该queue。保证失败后不往同一个broker中投递消息
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}

public MessageQueue selectOneMessageQueue() {
    int index = this.sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}

主要就是轮询queue

消息发送

以上,我们已经确定了要投递的broker和queue,接下来就是消息发送了,也就是DefaultMQProducerImpl.sendKernelImpl()这一部分

因为本文还未涉及到vipChannel,事务消息,延迟队列等相关内容,这些后续会进行添加。

//org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
//for MessageBatch,ID has been set in the generating process
if (!(msg instanceof MessageBatch)) {
    MessageClientIDSetter.setUniqID(msg);
}

int sysFlag = 0;
boolean msgBodyCompressed = false;
//消息内容大于4k,会进行压缩,加上标记
if (this.tryToCompressMessage(msg)) {
    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
    msgBodyCompressed = true;
}

//如果为事务消息,加上标记
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}

//说实话没用过,也是一个钩子接口,可以在初始化生产者的时候注册进去
if (hasCheckForbiddenHook()) {
    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
    checkForbiddenContext.setCommunicationMode(communicationMode);
    checkForbiddenContext.setBrokerAddr(brokerAddr);
    checkForbiddenContext.setMessage(msg);
    checkForbiddenContext.setMq(mq);
    checkForbiddenContext.setUnitMode(this.isUnitMode());
    this.executeCheckForbiddenHook(checkForbiddenContext);
}
//消息发送钩子
if (this.hasSendMessageHook()) {
    context = new SendMessageContext();
    context.setProducer(this);
    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    context.setCommunicationMode(communicationMode);
    context.setBornHost(this.defaultMQProducer.getClientIP());
    context.setBrokerAddr(brokerAddr);
    context.setMessage(msg);
    context.setMq(mq);
    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (isTrans != null && isTrans.equals("true")) {
        context.setMsgType(MessageType.Trans_Msg_Half);
    }

    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
        context.setMsgType(MessageType.Delay_Msg);
    }
    this.executeSendMessageHookBefore(context);
}

//构建请求,用于rpc交互
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
    if (reconsumeTimes != null) {
        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
    }

    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
    if (maxReconsumeTimes != null) {
        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
    }
}

SendResult sendResult = null;
switch (communicationMode) {
    case ASYNC:
        Message tmpMessage = msg;
        if (msgBodyCompressed) {
            //If msg body was compressed, msgbody should be reset using prevBody.
            //Clone new message using commpressed message body and recover origin massage.
            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
            tmpMessage = MessageAccessor.cloneMessage(msg);
            msg.setBody(prevBody);
        }
        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeAsync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            tmpMessage,
            requestHeader,
            timeout - costTimeAsync,
            communicationMode,
            sendCallback,
            topicPublishInfo,
            this.mQClientFactory,
            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
            context,
            this);
        break;
    case ONEWAY:
    case SYNC:
        long costTimeSync = System.currentTimeMillis() - beginStartTime;
        //发送超时
        if (timeout < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            msg,
            requestHeader,
            timeout - costTimeSync,
            communicationMode,
            context,
            this);
        break;
    default:
        assert false;
        break;
}

if (this.hasSendMessageHook()) {
    context.setSendResult(sendResult);
    this.executeSendMessageHookAfter(context);
}

return sendResult;

其实这一块主要涉及到几个hook,重要的MQClientAPIImpl之后会详细写,包括netty的应用

RPCHook

在初始化producer时会有一个RPCHook的参数。简单提一下,在发送消息以及接收后会触发RPCHook,代码不多,就把相关的直接copy过来了

//org/apache/rocketmq/remoting/RPCHook
public interface RPCHook {
    void doBeforeRequest(final String remoteAddr, final RemotingCommand request);

    void doAfterResponse(final String remoteAddr, final RemotingCommand request,
        final RemotingCommand response);
}
//org/apache/rocketmq/remoting/netty/NettyRemotingClient
if (this.rpcHook != null) {
    this.rpcHook.doBeforeRequest(addr, request);
}
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
    throw new RemotingTimeoutException("invokeSync call timeout");
}
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
if (this.rpcHook != null) {
    this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
}

可以看到,在请求前会调用doBeforeRequest(),返回结果后会调用doAfterResponse()。

延迟容错

如果producer开启了setSendLatencyFaultEnable,则启用了延迟容错的功能。其核心主要在org.apache.rocketmq.client.latency包下,其中LatencyFaultToleranceImpl类中维护了一个faultItemTable,FaultItem类是这样的

class FaultItem implements Comparable<FaultItem> {
    	//name指broker名称
        private final String name;
    	//该broker当前的延迟
        private volatile long currentLatency;
    	//该broker可以调用的时间。其实可以结合下面的isAvailable()来看,如果当前时间大于startTimestamp,则该broker"可用",其实这里的可用并非真正的可用,而是比如过了10分钟,我可以去试试这个broker现在是不是可用
        private volatile long startTimestamp;
    	public boolean isAvailable() {
            return (System.currentTimeMillis() - startTimestamp) >= 0;
        }
        @Override
        public int compareTo(final FaultItem other) {
            if (this.isAvailable() != other.isAvailable()) {
                if (this.isAvailable())
                    return -1;

                if (other.isAvailable())
                    return 1;
            }

            if (this.currentLatency < other.currentLatency)
                return -1;
            else if (this.currentLatency > other.currentLatency) {
                return 1;
            }

            if (this.startTimestamp < other.startTimestamp)
                return -1;
            else if (this.startTimestamp > other.startTimestamp) {
                return 1;
            }

            return 0;
        }
}

按代码中看到的那样,currentLatency和startTimestamp又是怎么计算出来的呢?

其实回到DefaultMQProducerImpl.sendDefaultImpl()中可以看到,无论是发送成功还是失败,都会调用一次this.updateFaultItem():

this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

其中(endTimestamp - beginTimestampPrev)就是发送消息的网络延迟,也就是FaultItem中的currentLatency,我们接着往下看。

//org/apache/rocketmq/client/latency/MQFaultStrategy.java
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        //currentLatency 15S以上-600000L......
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}

private long computeNotAvailableDuration(final long currentLatency) {
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i];
    }
    return 0;
}

这里就是计算duration的地方,computeNotAvailableDuration(final long currentLatency)这算法举个例子来说,如果currentLatency为16s,返回的结果就是600000L,也就是600s,如果currentLatency为8s,返回的结果就是180s。往下看其实就发现,FaultItem里的startTimestamp=System.currentTimeMillis() + duration;

其实latencyMax数组和notAvailableDuration数组是写死的,其实producer是可以set这两个数组的,这其中的值自然可以根据实际情况进行人为调整。

现在在回过头来看sendDefaultImpl中的selectOneMessageQueue,其实就好理解了

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    //延迟容错
    if (this.sendLatencyFaultEnable) {
        try {
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;

                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                //判断该队列是否可用
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }

            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            //获取该broker的写队列数量
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        //每次取+1(轮询)
        return tpInfo.selectOneMessageQueue();
    }
    //获得 lastBrokerName 对应的一个消息队列,不考虑该队列的可用性
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

在未开启延迟容错时,我们是直接取上次broker以外的broker中的queue(如果发送没超时,就是按顺序去一个)。

如果开启了延迟容错,如果broker被认为“不可用”,则会latencyFaultTolerance.pickOneAtLeast();这其中的算法也贴出来好了。

@Override
public String pickOneAtLeast() {
    final Enumeration<FaultItem> elements = this.faultItemTable.elements();
    List<FaultItem> tmpList = new LinkedList<FaultItem>();
    while (elements.hasMoreElements()) {
        final FaultItem faultItem = elements.nextElement();
        tmpList.add(faultItem);
    }

    if (!tmpList.isEmpty()) {
        //洗牌
        Collections.shuffle(tmpList);

        //排序
        Collections.sort(tmpList);

        final int half = tmpList.size() / 2;
        if (half <= 0) {
            return tmpList.get(0).getName();
        } else {
            final int i = this.whichItemWorst.getAndIncrement() % half;
            return tmpList.get(i).getName();
        }
    }

    return null;
}

至于排序规则,可以参考FaultItem类的定义

总结思考

1.其实对于InstanceName设置还是有一些坑。首先,在单机单进程下,如果要配置多个rocketmq集群,instanceName或者unitName一定是要设置的。因为MQClientInstance和clientId是1:1的,如果都按IP@PID走的话,那其实配置的多个生产者或者消费者都是共用了同一个MQClientInstance,那就代表了同样的nameSrv,broker等配置,这必然事与愿违。

2.另外在单机多进程下,如果代码中写死了InstanceName,导致两个进程的clientId相同(也就是同样的程序在同一个机子上起了两个。。。)。生产者这时发消息没有问题,但如果进行消费,那在broker端就认为这是同一个消费者,那就会涉及到重复消费,以及存在queue未被分配client实例的情况。当然这涉及到消费者以及负载均衡相关的内容,其中的原因会在后面提到。

3….未完

花了一天多的时间,粗略整理了一份最基本的producer的源码,竟然这么多,再加上在下实力也实在有限,定不能很好参透精髓,仅仅是个人粗鄙的理解。如有理解错误之处,非常欢迎指正。