消息队列在data中的应用

Posted by New Boy on June 30, 2019

在不久前,在data和exchange项目中,我们将所有用到rocketmq的地方全部切换到了kafka上。在结汇项目中,消息队列也已经用了近一年多,期间也踩过几个坑,此wiki用以记录消息队列在data和exchange中的实践

不经意的TIMEOUT_CLEAN_QUEUE

记得在去年11月初的时候,确切来说是在ebay API的限流正式放开之后。某日早上,日常检查线上error日志,发现了几条之前从未出现过的信息

sendKernelImpl exception, resend at once, InvokeID: 6395301748774370577, RT: 213ms, Broker: MessageQueue [topic=ORDER, brokerName=prd-idc-broker-1, queueId=10]
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 2  DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 209ms, size of queue: 0
....(具体的消息体)

可以大致看出来,broker太繁忙了,消息发送失败了!去hbase中查了一下(应用在消费这些消息后会入hbase),果然没有找到这些消息!为了暂时减少这种情况的发生,先降低了拉取数据的频率。

本着“翻山越岭,纵横四海”的精神,从git拉下了4.2.0版本的rocketmq源码,还是有必要确定一下为什么会发生这种情况,这样才能采取适合的措施。

溯源

org.apache.rocketmq.broker.latency.BrokerFastFailure#cleanExpiredRequestInQueue方法中,我找到了这条日志

[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while....

再接着分析之前,先简单提一下rocketmq中的通信模块。rocketmq中的RPC也是基于netty来实现的。抛开别的细节,当我们的producer端掉用sendMessage方法时,对应就会有个broker接收到这个请求,之后broker会把这个请求丢到与之相对应的线程池中进行异步处理。

那什么时候会触发[TIMEOUT_CLEAN_QUEUE]呢?broker在启动的时候,会开启很多的异步服务,其中就包括一个BrokerFastFailure服务,这个服务有个功能就是定时去清理“过期”的请求。“过期”请求被干掉后,就会抛出[TIMEOUT_CLEAN_QUEUE]相关异常

怎么才算“过期”请求呢?在请求丢进对应的线程池处理之前,会生成一个时间戳,记录这个请求创建的时间。当触发cleanExpiredRequestInQueue方法时,会从线程池的阻塞队列中取出首个请求,如果当前时间比请求创建时间多200ms(默认设为200ms),则会干掉该请求,并抛出异常

然后再来看看相应的处理生产者请求的线程池的配置(相关参数均为默认参数)

this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
  1,
  1,
  1000 * 60,
  TimeUnit.MILLISECONDS,
  new LinkedBlockingQueue<Runnable>(10000),
  new ThreadFactoryImpl("SendMessageThread_"));

我刚看到的时候,我比较疑惑的是,它的核心线程数和最大线程数竟然只设为了1!这不就相当于单线程去处理请求了吗?在多核处理器的时代,我不得不怀疑这样设置真的好吗?不过很快,我又注意到在这个核心线程数配置上方有这么一行注释

/**
* thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default
* value is 1.
*/

到这里,就不得不提一下rockemtq的存储了,rocketmq是直接使用了磁盘文件来作为底层的存储,所有topic,所有queue的消息都会写入到~/store/commitLog这个文件夹下。众所周知,rocketmq默认使用了异步刷盘的方式,很大程度上提高了系统的吞吐量。而所谓异步刷盘,即消息先写入到ByteBuffer中,然后由FlushRealTimeService服务异步刷新到磁盘上。

因此,消息写入到ByteBuffer的过程就必须保证是线程安全的!那么就需要用到锁,在rocketmq 4.0版本之前,默认是使用了ReentrantLock来保证线程安全,但在4.0版本之后,默认是通过AtomicBoolean来实现的自旋锁以保证线程安全。那么核心线程数设为1以及上面那段注释也就好理解了。自旋锁虽然效率要远高于互斥锁,但自旋锁会一直占用CPU,如果其余线程短时间内没有获取到,会降低CPU的性能,我想这应该就是broker中将核心线程数设为1的原因

broker服务可用性

broker要保证服务的可用性,很容易想到,主要就是要保证内存(这里主要指jvm)和外存(这里主要指磁盘,因为broker的数据是直接写入到文件中的)在容量上不会出现问题,保证中间件可以持续提供服务。

在外存的处理上

1.定时清理磁盘文件

在内存的处理上

1.阻塞队列设置了大小

2.定时清理阻塞队列中的

3.beginTimeInLock

问题解决方案

当时有准备了以下几个方案:

1.最显而易见的方法就是直接增大BrokerConfig中的waitTimeMillsInSendQueue这个字段的值

2.切换中间件。回归到业务本身,data用于拉取店铺的订单数据,通过rocketmq发送给下游的结汇业务。而rocketmq最早是阿里借鉴了kafka的设计,为交易系统做的一套中间件。这么来看,其实号称百万级的并发写入的kafka似乎更符合我们的业务场景。

其实可以单单从底层的存储来看看kafka和rocketmq的区别。

1.偷张rocketmq的图

2.讲讲kafka的底层存储的区别

因为正好kafka已经在项目的其他地方有用到,所以抽时间把rocketmq的流量全切到了kafka上。一是保证rocketmq的稳定,保证其他的业务系统的稳定使用,二也是为了我们data-exchange系统的稳定