RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
RocketMQ 前身叫做MetaQ,在MetaQ发布3.0版本的时候改名为 RocketMQ
RocketMQ本质上的设计思路和Kafka类似,但是和Kafka不同的是其使用Java进行开发,由于在国内的Java受众群体远远多于Scala、Erlang,所以RocketMQ是很多以Java语言为主的公司的首选。同样的RocketMQ和Kafka都是Apache基金会中的顶级项目,他们社区的活跃度都非常高,项目更新迭代也非常快。
RocketMQ是阿里review kafka的java版,如果消息性能要求高,用 RocketMQ 与 Kafka 可以更优
消息队列在实际应用中常用的使用场景,包含应用解耦、异步处理、流量削锋、消息通讯、日志处理等。
RocketMQ 官网:http://rocketmq.apache.org
RocketMQ Github:https://github.com/apache/rocketmq
Kafka 官网:http://kafka.apache.org
ActiveMQ 官网:http://activemq.apache.org
RabbitMQ 官网:https://www.rabbitmq.com
ZeroMQ 官网:https://zeromq.org
MetaMQ RocketMQ的前世今生
某公司一直用的消息中间件是MetaMQ现在,网上相关的资料也不是很多,今天去想淘宝为什会把MetaMQ给替换成了RocketMQ。就网上搜索了一下,这两个居然是爷孙关系。
阿里巴巴消息中间件起源于2001年的五彩石项目,Notify在这期间应运而生,用于交易核心消息的流转。
至2010年,B2B开始大规模使用ActiveMQ作为消息内核,随着阿里业务的快速发展,急需一款支持顺序消息,拥有海量消息堆积能力的消息中间件,MetaQ 1.0在2011年诞生。
到2012年,MetaQ已经发展到了MetaQ 3.0,并抽象出了通用的消息引擎RocketMQ。
随后,将RocketMQ进行了开源,阿里的消息中间件正式走入了公众的视野。
到2015年,RocketMQ已经经历了多年双十一的洗礼,在可用性、可靠性以及稳定性等方面都有出色的表现。与此同时,云计算大行其道,阿里消息中间件基于RocketMQ推出了Aliware MQ 1.0,开始为阿里云上成千上万家企业提供消息服务。
到今年,MetaQ在2016年双十一承载了万亿级消息的流转,跨越了一个新的里程碑,同时RocketMQ进入Apache 孵化。
RocketMQ 产品发展历史
大约经历了三个主要版本迭代:
1)Metaq 1.x(Metamorphosis)
由开源社区killme2008维护,开源社区非常活跃 https://github.com/killme2008/Metamorphosis
2)Metaq 2.x
于2012年10月份上线,在淘宝内部被广泛使用。
3)RocketMQ 3.x
基于公司内部开源共建原则, RocketMQ项目只维护核心功能,且去除了所有其他运行时依赖,核心功能最简化。每个BU的个性化需求都在RocketMQ项目之上进行深度定制。RocketMQ向其他BU提供的仅仅是Jar包,例如要定制一个Broker,那么只需要依赖rocketmq-broker这个jar包即可,可通过API进行交互,如果定制client,则依赖rocketmq-client这个jar包,对其提供的api进行再封装。
在RocketMQ项目基础上衍生的项目如下
- com.taobao.metaq v3.0 = RocketMQ + 淘宝个性化需求,为淘宝应用提供消息服务。
- com.alipay.zpullmsg v1.0 = RocketMQ + 支付宝个性化需求,为支付宝应用提供消息服务。
- com.alibaba.commonmq v1.0 = Notify + RocketMQ + B2B个性化需求,为B2B应用提供消息服务。
一、 MQ背景&选型
消息队列作为分布式、高并发系统的核心组件之一,能够帮助业务系统解构,提升开发效率和系统稳定性。
MQ 主要具有以下优势:
1)削峰填谷:主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题
2)系统解耦:解决不同重要程度、不同能力级别系统之间依赖导致一死全死
3)提升性能:当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统
4)蓄流压测:线上有些链路不好压测,可以通过堆积一定量消息再放开来压测
目前主流的MQ主要是RocketMQ、kafka、RabbitMQ等
RocketMQ相比于RabbitMQ、kafka具有主要优势特性有:
1)支持事务型消息(消息发送和DB操作保持两方的最终一致性,rabbitmq和kafka不支持)
2)支持结合rocketmq的多个系统之间数据最终一致性(多方事务,二方事务是前提)
3)支持18个级别的延迟消息(rabbitmq和kafka不支持)
4)支持指定次数和时间间隔的失败消息重发(kafka不支持,rabbitmq需要手动确认)
5)支持consumer端tag过滤,减少不必要的网络传输(rabbitmq和kafka不支持)
6)支持重复消费(rabbitmq不支持,kafka支持)
Rocketmq、kafka、Rabbitmq的详细对比,请参照下表格:
二、RocketMQ集群概述
1. RocketMQ集群部署结构
1) Name Server
Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
2) Broker
Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的Broker Name,不同的Broker Id来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。
每个Broker与Name Server集群中的所有节点建立长连接,定时(每隔30s)注册Topic信息到所有Name Server。Name Server定时(每隔10s)扫描所有存活broker的连接,如果Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的连接。
3) Producer
Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
Producer每隔30s(由ClientConfig的pollNameServerInterval)从Name server获取所有topic队列的最新情况,这意味着如果Broker不可用,Producer最多30s能够感知,在此期间内发往Broker的所有消息都会失败。
Producer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s中扫描所有存活的连接,如果Broker在2分钟内没有收到心跳数据,则关闭与Producer的连接。
4) Consumer
Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
Consumer每隔30s从Name server获取topic的最新队列情况,这意味着Broker不可用时,Consumer最多最需要30s才能感知。
Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接;并向该Consumer Group的所有Consumer发出通知,Group内的Consumer重新分配队列,然后继续消费。
当Consumer得到master宕机通知后,转向slave消费,slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。但是一旦master恢复,未同步过去的消息会被最终消费掉。
消费者队列是消费者连接之后(或者之前有连接过)才创建的。我们将原生的消费者标识由 {IP}@{消费者group}扩展为 {IP}@{消费者group}{topic}{tag},例如xxx.xxx.xxx.xxx@mqtest_producer-group_2m2sTest_tag-zyk。任何一个元素不同,都认为是不同的消费端,每个消费端会拥有一份自己消费对列(默认是broker对列数量*broker数量)。新挂载的消费者对列中拥有commitlog中的所有数据。
如果有需要,可以查看Rocketmq更多源码解析
三、 Rocketmq如何支持分布式事务消息
场景
A(存在DB操作)、B(存在DB操作)两方需要保证分布式事务一致性,通过引入中间层MQ,
A和MQ保持事务一致性(异常情况下通过MQ反查A接口实现check),
B和MQ保证事务一致(通过重试),从而达到最终事务一致性。
原理:大事务 = 小事务 + 异步
1. MQ与DB一致性原理(两方事务)
流程图
上图是RocketMQ提供的保证MQ消息、DB事务一致性的方案。
MQ消息、DB操作一致性方案:
1)发送消息到MQ服务器,此时消息状态为SEND_OK。此消息为consumer不可见。
2)执行DB操作;DB执行成功Commit DB操作,DB执行失败Rollback DB操作。
3)如果DB执行成功,回复MQ服务器,将状态为COMMIT_MESSAGE;如果DB执行失败,回复MQ服务器,将状态改为ROLLBACK_MESSAGE。注意此过程有可能失败。
4)MQ内部提供一个名为“事务状态服务”的服务,此服务会检查事务消息的状态,如果发现消息未COMMIT,则通过Producer启动时注册的TransactionCheckListener来回调业务系统,业务系统在checkLocalTransactionState方法中检查DB事务状态,如果成功,则回复COMMIT_MESSAGE,否则回复ROLLBACK_MESSAGE。
说明:
上面以DB为例,其实此处可以是任何业务或者数据源。
以上SEND_OK、COMMIT_MESSAGE、ROLLBACK_MESSAGE 均是client jar提供的状态,在MQ服务器内部是一个数字。
TransactionCheckListener 是在消息的commit或者rollback消息丢失的情况下才会回调(上图中灰色部分)。这种消息丢失只存在于断网或者RocketMQ集群挂了的情况下。当RocketMQ集群挂了,如果采用异步刷盘,存在1s内数据丢失风险,异步刷盘场景下保障事务没有意义。所以如果要核心业务用RocketMQ解决分布式事务问题,建议选择同步刷盘模式。
2. 多系统之间数据一致性(多方事务)
当需要保证多方(超过2方)的分布式一致性,上面的两方事务一致性(通过RocketMQ的事务性消息解决)已经无法支持。这个时候需要引入TCC模式思想(Try-Confirm-Cancel,不清楚的自行百度)。
以上图交易系统为例:
1)交易系统创建订单(往DB插入一条记录),同时发送订单创建消息。通过RocketMQ事务性消息保证一致性
2)接着执行完成订单所需的同步核心RPC服务(非核心的系统通过监听MQ消息自行处理,处理结果不会影响交易状态)。执行成功更改订单状态,同时发送MQ消息。
3)交易系统接受自己发送的订单创建消息,通过定时调度系统创建延时回滚任务(或者使用RocketMQ的重试功能,设置第二次发送时间为定时任务的延迟创建时间。在非消息堵塞的情况下,消息第一次到达延迟为1ms左右,这时可能RPC还未执行完,订单状态还未设置为完成,第二次消费时间可以指定)。延迟任务先通过查询订单状态判断订单是否完成,完成则不创建回滚任务,否则创建。 PS:多个RPC可以创建一个回滚任务,通过一个消费组接受一次消息就可以;也可以通过创建多个消费组,一个消息消费多次,每次消费创建一个RPC的回滚任务。 回滚任务失败,通过MQ的重发来重试。
以上是交易系统和其他系统之间保持最终一致性的解决方案。
3. 案例分析
1) 单机环境下的事务示意图
如下为A给B转账的例子。
1锁定A的账户
2锁定B的账户
3检查A账户是否有1元
4A的账户扣减1元
5给B的账户加1元
6解锁B的账户
7解锁A的账户
步骤动作
以上过程在代码层面,甚至可以简化到在一个事物中,执行两条sql语句。
2) 分布式环境下事务
和单机事务不同,A、B账户可能不在同一个DB中,此时无法像在单机情况下使用事务来实现。
此时可以通过一下方式实现,将转账操作分成两个操作。
a) A账户
1锁定A的账户
2检查A账户是否有1元
3A的账户扣减1元
4解锁A的账户
步骤动作
b) MQ消息
A账户数据发生变化时,发送MQ消息,MQ服务器将消息推送给转账系统,转账系统来给B账号加钱。
c) B账户
1锁定B的账户
2给B的账户加1元
3解锁B的账户
步骤动作
四、 顺序消息
RocketMq有3中消息类型
1. 普通消费
2. 顺序消费
3. 事务消费
顺序消费场景:在网购的时候,我们需要下单,那么下单需要假如有三个顺序:
第一、创建订单
第二:订单付款
第三:订单完成
也就是这三个环节要有顺序,这个订单才有意义,RocketMQ可以保证顺序消费。
RocketMQ 实现顺序消费的原理:produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息
注意:是把把消息发到同一个队列(queue),不是同一个topic,默认情况下一个topic包括4个queue
1. 顺序消息缺陷
发送顺序消息,无法利用集群Fail Over特性,消费顺序消息的并行度依赖于队列数量队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题遇到消息失败的消息,无法跳过,当前队列消费暂停。
2. 原理
produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息。
注意:把消息发到同一个队列(queue),不是同一个topic,默认情况下一个topic包括4个queue
3. 扩展
可以通过实现发送消息的队列选择器方法,实现部分顺序消息。
举例:比如一个数据库通过MQ来同步,只需要保证每个表的数据是同步的就可以。解析binlog,将表名作为队列选择器的参数,这样就可以保证每个表的数据到同一个对列里面,从而保证表数据的顺序消费
五、 最佳实践
1. Producer
1) Topic
一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。
只有发送消息设置了tags,消费方在订阅消息时,才可以利用tags 在broker做消息过滤。
2) key
每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过 topic,key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key 尽可能唯一,这样可以避免潜在的哈希冲突。
// 订单Id
String orderId= "20034568923546";
message.setKeys(orderId);
3) 日志
消息发送成功或者失败,要打印消息日志,务必要打印 send result 和key 字段。
4) send
send消息方法,只要不抛异常,就代表发送成功。但是发送成功会有多个状态,在sendResult里定义。
SEND_OK:消息发送成功
FLUSH_DISK_TIMEOUT:消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步到Slave时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
SLAVE_NOT_AVAILABLE:消息发送成功,但是此时slave不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
2. Consumer
1) 幂等
RocketMQ使用的消息原语是At Least Once,所以consumer可能多次收到同一个消息,此时务必做好幂等。
幂等(idempotent、idempotence)是一个数学与计算机学概念,常见于抽象代数中。
在编程中一个幂等操作的特点是:其任意多次执行所产生的影响(结果),均与一次执行的影响相同。
幂等函数,或幂等方法,是指可以使用相同参数重复执行,并能获得相同结果的函数。这些函数不会影响系统状态,也不用担心重复执行会对系统造成改变。
例如,“setTrue()”函数就是一个幂等函数,无论多次执行,其结果都是一样的。更复杂的操作幂等保证,是利用唯一交易号(流水号)实现。
2) 日志
消费时记录日志,以便后续定位问题。
3) 批量消费
尽量使用批量方式消费方式,可以很大程度上提高消费吞吐量。
MQ 消息队列的底层原理
1、生产者
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
for (int i = 0; i < 128; i++)
try {
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes
(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
2、消费者
public class PushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
3、RocketMQ架构原理
对于RocketMQ先抛出几个问题:
RocketMQ的topic和队列是什么样的,和Kafka的分区有什么不同?
RocketMQ网络模型是什么样的,和Kafka对比如何?
RocketMQ消息存储模型是什么样的,如何保证高可靠的存储,和Kafka对比如何?
3.1 RocketMQ架构图
对于RocketMQ的架构图,在大体上来看和Kafka并没有太多的差别,
但是在很多细节上是有很多差别的,接下来会一一进行讲述。
3.2 RocketMQ名词解释
在3.1的架构中我们有 多个Producer,多个主Broker,多个从Broker
每个Producer可以对应多个Topic,每个Consumer也可以消费多个Topic,多对多的关系
Broker信息会上报至NameServer,Consumer会从NameServer中拉取Broker和Topic的信息。
Producer:消息生产者,向Broker发送消息的客户端
Consumer:消息消费者,从Broker读取消息的客户端
Broker:消息中间的处理节点,这里和kafka不同,kafka的Broker没有主从的概念,都可以写入请求以及备份其他节点数据,RocketMQ只有主Broker节点才能写,一般也通过主节点读,当主节点有故障或者一些其他特殊情况才会使用从节点读,有点类似- 于mysql的主从架构。
Topic:消息主题,一级消息类型,生产者向其发送消息, 消费者读取其消息。
Group:分为ProducerGroup,ConsumerGroup,代表某一类的生产者和消费者,一般来说同一个服务可以作为Group,同一个Group一般来说发送和消费的消息都是一样的。
Tag:Kafka中没有这个概念,Tag是属于二级消息类型,一般来说业务有关联的可以使用同一个Tag,比如订单消息队列,使用Topic_Order,Tag可以分为Tag_食品订单,Tag_服装订单等等。
Queue: 在kafka中叫Partition,每个Queue内部是有序的,在RocketMQ中分为读和写两种队列,一般来说读写队列数量一致,如果不一致就会出现很多问题。
NameServer:Kafka中使用的是ZooKeeper保存Broker的地址信息,以及Broker的Leader的选举,在RocketMQ中并没有采用选举Broker的策略,所以采用了无状态的NameServer来存储,由于NameServer是无状态的,集群节点之间并不会通信,所以上传数据的时候都需要向所有节点进行发送。
很多朋友都在问什么是无状态呢?状态的有无实际上就是数据是否会做存储,有状态的话数据会被持久化,无状态的服务可以理解就是一个内存服务,NameServer本身也是一个内存服务,所有数据都存储在内存中,重启之后都会丢失。
3.3 Topic 和 Queue
在RocketMQ中的每一条消息,都有一个Topic,用来区分不同的消息。一个Topic主题一般会有多个消息的订阅者,当生产者发布消息到某个主题时,订阅了这个主题的消费者都可以接收到生产者写入的新消息。
在Topic中有分为了多个Queue,这其实是我们发送/读取消息通道的最小单位,我们发送消息都需要指定某个写入某个Queue,拉取消息的时候也需要指定拉取某个Queue,所以我们的顺序消息可以基于我们的Queue维度保持队列有序,如果想做到全局有序那么需要将Queue大小设置为1,这样所有的数据都会在Queue中有序。
在上图中我们的Producer会通过一些策略进行Queue的选择:
1)非顺序消息:非顺序消息一般直接采用轮训发送的方式进行发送。
2)顺序消息:根据某个Key,比如我们常见的订单Id,用户Id,进行Hash,将同一类数据放在同一个队列中,保证我们的顺序性。
我们同一组Consumer也会根据一些策略来选Queue,常见的比如平均分配或者一致性Hash分配。
要注意的是当Consumer出现下线或者上线的时候,这里需要做重平衡,也就是Rebalance,RocketMQ的重平衡机制如下:
1)定时拉取broker,topic的最新信息
2)每隔20s做重平衡
3)随机选取当前Topic的一个主Broker,这里要注意的是不是每次重平衡所有主Broker都会被选中,因为会存在一个Broker再多个Broker的情况
4)获取当前Broker,当前ConsumerGroup的所有机器ID
5)然后进行策略分配
由于重平衡是定时做的,所以这里有可能会出现某个Queue同时被两个Consumer消费,所以会出现消息重复投递。
Kafka的重平衡机制和RocketMQ不同,Kafka的重平衡是通过Consumer和Coordinator联系来完成的,当Coordinator感知到消费组的变化,会在心跳过程中发送重平衡的信号,然后由一个ConsumerLeader进行重平衡选择,然后再由Coordinator将结果通知给所有的消费者。
Queue 读写数量不一致
在RocketMQ中Queue被分为读和写两种,在最开始接触RocketMQ的时候,一直以为读写队列数量配置不一致不会出现什么问题的,比如当消费者机器很多的时候我们配置很多读的队列,但是实际过程中发现会出现消息无法消费和根本没有消息消费的情况。
当写的队列数量大于读的队列的数量,当大于读队列这部分ID的写队列的数据会无法消费,因为不会将其分配给消费者。
当读的队列数量大于写的队列数量,那么多的队列数量就不会有消息被投递进来。
这个功能在RocketMQ在我看来明显没什么用,因为基本上都会设置为读写队列大小一样,这个为啥不直接将其进行统一,反而容易让用户配置不一样出现错误。
这个问题在RocketMQ的Issue里也没有收到好的答案。
3.4 消费模型
一般来说消息队列的消费模型分为两种:MQPullConsumer 和 MQPushConsumer,基于推送的消息(push)模型和基于拉取(poll)的消息模型。
1)基于推送模型的消息系统,由消息代理记录消费状态。
消息代理将消息推送到消费者后,标记这条消息为已经被消费,但是这种方式无法很好地保证消费的处理语义。比如当我们把已经把消息发送给消费者之后,由于消费进程挂掉或者由于网络原因没有收到这条消息,如果我们在消费代理将其标记为已消费,这个消息就永久丢失了。如果我们利用生产者收到消息后回复这种方法,消息代理需要记录消费状态,这种不可取。
用过RocketMQ的同学肯定不禁会想到,在RocketMQ中不是提供了两种消费者吗?
MQPullConsumer和MQPushConsumer,其中MQPushConsumer不就是我们的推模型吗?
其实这两种模型都是客户端主动去拉消息,其中的实现区别如下:
1)MQPullConsumer
每次拉取消息需要传入拉取消息的offset和每次拉取多少消息量,具体拉取哪里的消息,拉取多少是由客户端控制。
2)MQPushConsumer
同样也是客户端主动拉取消息,但是消息进度是由服务端保存,Consumer会定时上报自己消费到哪里,所以Consumer下次消费的时候是可以找到上次消费的点,一般来说使用PushConsumer我们不需要关心offset和拉取多少数据,直接使用即可。
集群消费和广播消费
消费模式我们分为两种,集群消费,广播消费:
1)集群消费
同一个GroupId都属于一个集群,一般来说一条消息,只会被任意一个消费者处理。
2)广播消费
广播消费的消息会被集群中所有消费者进行消息,但是要注意:因为广播消费的offset在服务端保存成本太高,所以客户端每一次重启都会从最新消息消费,而不是上次保存的offset。
3.5 网络模型
在Kafka中使用的原生的socket实现网络通信,而RocketMQ使用的是Netty网络框架,现在越来越多的中间件都不会直接选择原生的socket,而是使用的Netty框架,主要得益于下面几个原因:
1)API使用简单,不需要关心过多的网络细节,更专注于中间件逻辑。
2)性能高。
3)成熟稳定,jdk nio的bug都被修复了。
选择框架是一方面,而想要保证网络通信的高效,网络线程模型也是一方面,我们常见的有
1+N(1个Acceptor线程,N个IO线程)
1+N+M(1个acceptor线程,N个IO线程,M个worker线程)等模型
RocketMQ使用的是 1+N1+N2+M 的模型,如下图所示:
1个acceptor线程,N1个IO线程,N2个线程用来做Shake-hand,SSL验证,编解码,M个线程用来做业务处理。这样的好处将编解码,和SSL验证等一些可能耗时的操作放在了一个单独的线程池,不会占据我们业务线程和IO线程。
3.6 高可靠的分布式存储模型
做为一个好的消息系统,高性能的存储,高可用都不可少。
3.6.1 高性能日志存储
RocketMQ和Kafka的存储核心设计有很大的不同,所以其在写入性能方面也有很大的差别,这是2016年阿里中间件团队对RocketMQ和Kafka不同Topic下做的性能测试:
产品Topic数量发送端并发数发送端RT(ms)发送端TPS消费端TPS
RocketMQ6480089000086000
12880097800077000
256800107500075000
Kafka648005136000136000
1282562385008500
25625613322152352
从上可以看出:
Kafka在Topic数量由64增长到256时,吞吐量下降了98.37%
RocketMQ在Topic数量由64增长到256时,吞吐量只下降了16%
这是为什么呢?kafka一个topic下面的所有消息都是以partition的方式分布式的存储在多个节点上。同时在kafka的机器上,每个Partition其实都会对应一个日志目录,在目录下面会对应多个日志分段。所以如果Topic很多的时候Kafka虽然写文件是顺序写,但实际上文件过多,会造成磁盘IO竞争非常激烈。
那RocketMQ为什么在多Topic的情况下,依然还能很好的保持较多的吞吐量呢?
我们首先来看一下RocketMQ中比较关键的文件:
这里有四个目录(这里的解释就直接用RocketMQ官方的了):
commitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;
config:保存一些配置信息,包括一些Group,Topic以及Consumer消费offset等信息。
consumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。
Consumer即可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:HOME \store\index\${fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。 我们发现我们的消息主体数据并没有像Kafka一样写入多个文件,而是写入一个文件,这样我们的写入IO竞争就非常小,可以在很多Topic的时候依然保持很高的吞吐量。有同学说这里的ConsumeQueue写是在不停的写入呢,并且ConsumeQueue是以Queue维度来创建文件,那么文件数量依然很多,在这里ConsumeQueue的写入的数据量很小,每条消息只有20个字节,30W条数据也才6M左右,所以其实对我们的影响相对Kafka的Topic之间影响是要小很多的。我们整个的逻辑可以如下:
Producer不断的再往CommitLog添加新的消息,有一个定时任务ReputService会不断的扫描新添加进来的CommitLog,然后不断的去构建ConsumerQueue和Index。
注意:这里指的都是普通的硬盘,在SSD上面多个文件并发写入和单个文件写入影响不大。
读取消息
Kafka中每个Partition都会是一个单独的文件,所以当消费某个消息的时候,会很好的出现顺序读,我们知道OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取,将数据放入PageCache,所以Kafka的读取消息性能比较好。
RocketMQ读取流程如下:
先读取ConsumerQueue中的offset对应CommitLog物理的offset
根据offset读取CommitLog
ConsumerQueue也是每个Queue一个单独的文件,并且其文件体积小,所以很容易利用PageCache提高性能。而CommitLog,由于同一个Queue的连续消息在CommitLog其实是不连续的,所以会造成随机读,RocketMQ对此做了几个优化:
Mmap映射读取,Mmap的方式减少了传统IO将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销
使用DeadLine调度算法+SSD存储盘
由于Mmap映射受到内存限制,当不在Mmmap映射这部分数据的时候(也就是消息堆积过多),默认是内存的40%,会将请求发送到SLAVE,减缓Master的压力
3.6.2 可用性
3.6.2.1 集群模式
我们首先需要选择一种集群模式,来适应我们可忍耐的可用程度,一般来说分为四种:
1)单Master
这种模式,可用性最低,但是成本也是最低,一旦宕机,所有都不可用。这种一般只适用于本地测试。
2)单Master多SLAVE
这种模式,可用性一般,如果主宕机,那么所有写入都不可用,读取依然可用,如果master磁盘损坏,可以依赖slave的数据。
3)多Master
这种模式,可用性一般,如果出现部分master宕机,那么这部分master上的消息都不可消费,也不可写数据,如果一个Topic的队列在多个Master上都有,那么可以保证没有宕机的那部分可以正常消费,写入。如果master的磁盘损坏会导致消息丢失。
4)多Master多Slave
这种模式,可用性最高,但是维护成本也最高,当master宕机了之后,只会出现在这部分master上的队列不可写入,但是读取依然是可以的,并且如果master磁盘损坏,可以依赖slave的数据。
一般来说投入生产环境的话都会选择第四种,来保证最高的可用性。
3.6.2.2 消息的可用性
当我们选择好了集群模式之后,那么我们需要关心的就是怎么去存储和复制这个数据,rocketMQ对消息的刷盘提供了同步和异步的策略来满足我们的,当我们选择同步刷盘之后,如果刷盘超时会给返回FLUSH_DISK_TIMEOUT,如果是异步刷盘不会返回刷盘相关信息,选择同步刷盘可以尽最大程度满足我们的消息不会丢失。
除了存储有选择之后,我们的主从同步提供了同步和异步两种模式来进行复制,当然选择同步可以提升可用性,但是消息的发送RT时间会下降10%左右。
3.6.3 Dleger
我们上面对于master-slave部署模式已经做了很多分析,我们发现,当master出现问题的时候,我们的写入怎么都会不可用,除非恢复master,或者手动将我们的slave切换成master,导致了我们的Slave在多数情况下只有读取的作用。RocketMQ在最近的几个版本中推出了Dleger-RocketMQ,使用Raft协议复制CommitLog,并且自动进行选主,这样master宕机的时候,写入依然保持可用。
有关Dleger-RocketMQ的信息更多的可以查看这篇文章:Dledger-RocketMQ 基于Raft协议的commitlog存储库
3.7 定时/延时消息
定时消息和延时消息在实际业务场景中使用的比较多,比如下面的一些场景:
1)订单超时未支付自动关闭,因为在很多场景中下单之后库存就被锁定了,这里需要将其进行超时关闭。
2)需要一些延时的操作,比如一些兜底的逻辑,当做完某个逻辑之后,可以发送延时消息比如延时半个小时,进行兜底检查补偿。
3)在某个时间给用户发送消息,同样也可以使用延时消息。
在开源版本的RocketMQ中延时消息并不支持任意时间的延时,需要设置几个固定的延时等级,目前默认设置为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,从1s到2h分别对应着等级1到18,而阿里云中的版本(要付钱)是可以支持40天内的任何时刻(毫秒级别)。
我们先看下在RocketMQ中定时任务原理图:
Step1:Producer在自己发送的消息上设置好需要延时的级别。
Step2: Broker发现此消息是延时消息,将Topic进行替换成延时Topic,每个延时级别都会作为一个单独的queue,将自己的Topic作为额外信息存储。
Step3: 构建ConsumerQueue
Step4: 定时任务定时扫描每个延时级别的ConsumerQueue。
Step5: 拿到ConsumerQueue中的CommitLog的Offset,获取消息,判断是否已经达到执行时间
Step6: 如果达到,那么将消息的Topic恢复,进行重新投递。如果没有达到则延迟没有达到的这段时间执行任务。
可以看见延时消息是利用新建单独的Topic和Queue来实现的,如果我们要实现40天之内的任意时间度,基于这种方案,那么需要402460601000个queue,这样的成本是非常之高的,那阿里云上面的支持任意时间是怎么实现的呢?这里猜测是持久化二级TimeWheel时间轮,二级时间轮用于替代我们的ConsumeQueue,保存Commitlog-Offset,然后通过时间轮不断的取出当前已经到了的时间,然后再次投递消息。具体的实现逻辑需要后续会单独写一篇文章。
3.8 事务消息
事务消息同样的也是RocketMQ中的一大特色,其可以帮助我们完成分布式事务的最终一致性
具体使用事务消息步骤如下:
Step1:调用sendMessageInTransaction发送事务消息
Step2: 如果发送成功,则执行本地事务。
Step3: 如果执行本地事务成功则发送commit,如果失败则发送rollback。
Step4: 如果其中某个阶段比如commit发送失败,rocketMQ会进行定时从Broker回查,本地事务的状态。
事务消息的使用整个流程相对之前几种消息使用比较复杂,下面是事务消息实现的原理图:
Step1: 发送事务消息,这里也叫做halfMessage,会将Topic替换为HalfMessage的Topic。
Step2: 发送commit或者rollback,如果是commit这里会查询出之前的消息,然后将消息复原成原Topic,并且发送一个OpMessage用于记录当前消息可以删除。如果是rollback这里会直接发送一个OpMessage删除。
Step3: 在Broker有个处理事务消息的定时任务,定时对比halfMessage和OpMessage,如果有OpMessage且状态为删除,那么该条消息必定commit或者rollback,所以就可以删除这条消息。
Step4: 如果事务超时(默认是6s),还没有opMessage,那么很有可能commit信息丢了,这里会去反查我们的Producer本地事务状态。
Step5: 根据查询出来的信息做Step2。
我们发现RocketMQ实现事务消息也是通过修改原Topic信息,和延迟消息一样,然后模拟成消费者进行消费,做一些特殊的业务逻辑。当然我们还可以利用这种方式去做RocketMQ更多的扩展。