RocketMQ高级特性

消息模型

RocketMQ主要由ProducerBrokerConsumer三部分组成,Producer负责生产消息,Consumer负责消费消息,Broker负责存储消息。

Broker在实际部署过程中对应一台服务器,每个Broker可存储多个Topic消息每个Topic消息也可分片存储于不同的BrokerMessageQueue用于存储消息的物理地址,每个Topic中的消息地址存储于多个MessageQueue中ConsumerGroup由多个Consumer 实例构成

每个Broker下都会生成对应的Topic的文件夹,每个Topic文件夹下会为每个队列生成一个以队列id作为文件名的文件夹,在文件夹内才是对应的MessageQueue文件

消息生产者

RocketMQ提供多种发送方式,同步发送异步发送顺序发送单向发送同步异步方式均需要Broker返回确认信息,单向发送不需要;

生产者中会把同一类Producer组成一个集合,叫做生产者组,这类Producer发送同一类消息发送逻辑一致。若发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组其他生产者实例提交回溯消费

消息消费者

一个消息消费者会从Broker服务器拉取消息,从用户应用的角度而言提供了两种消费形式拉取式消费推动式消费

  • 拉取式消费的应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
  • 推动式消费模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高,底层也是通过拉取模式来实现的,Broker建立长连接达到消息实时接收的效果

消费者同样会把同一类Consumer组成一个集合,叫做消费者组,这类Consumer通常消费同一类消息消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡容错的目标变得非常容易。消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ支持两种消息模式集群消费Clustering广播消费Broadcasting

  • 集群消费模式:相同Consumer Group的每个Consumer实例平均分摊消息。不同的Consumer Group全量接收消息。
  • 广播消费模式:相同Consumer Group的每个Consumer实例都接收全量的消息

Topic

Topic表示一类消息的集合,每个Topic包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位

同一个Topic下的数据,会分片保存到不同的Broker上,而每一个分片单位MessageQueueMessageQueue是生产者发送消息与消费者消费消息的最小单位

Broker Server

消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关元数据,包括消费者组消费进度偏移主题队列消息等。Broker Server要保证高可用需要搭建主从集群架构。RocketMQ中有普通集群Dledger高可用集群两种Broker架构模式。

普通集群

该集群模式下会给每个节点分配一个固定角色Master负责响应客户端的请求存储消息Slave则只负责对Master的消息进行同步保存,并响应部分客户端的读请求,消息同步方式分为同步同步异步同步。该集群模式下各节点角色无法进行切换,即Master节点挂了,则这一组Broker就不可用了,Slave不会顶上去成为Master。

Dledger高可用集群

Dledger是RocketMQ 4.5引入的实现高可用集群的一项技术。该模式下集群会随机选出一个节点作为Master,当Master节点挂了后,会从Slave中自动选出一个节点升级成为Master。

Dledger会从集群中选举出Master节点完成Master节点往Slave节点的消息同步,且接管Broker的CommitLog消息存储。Dledger是使用Raft算法来进行节点选举的。

每个节点有LeaderFollowerCandidate三个状态,正常运行情况下,集群中会有一个leader,其他都是Follower且只响应Leader和Candidate的请求,而客户端的请求全部由Leader处理,即使有客户端请求到了一个Follower也会将请求转发到Leader

集群刚启动时每个节点都是Follower状态,之后集群内部会发送一个timeout信号,所有Follower转成Candidate去拉取选票,获得大多数选票的节点选为Leader,其他候选人转为Follower。若一个timeout信号发出时,没有选出Leader,将会重新开始一次新的选举。Leader节点会往其他节点发送心跳信号,确认其Leader状态。

然后启动定时器,若指定时间内未收到Leader心跳,则转为Candidate状态,然后向其他成员发起投票请求,若收到半数以上成员的投票,则Candidate会晋升为Leader,然后Leader也有可能会退化成Follower

在Raft协议中会将时间分为一些任意时间长度的时间片段叫做term。term会使用一个全局唯一连续递增的编号作为标识,起到一个逻辑时钟的作用。

每个term时间片里都会进行新的选举,每个Candidate都会努力争取成为Leader。Leader节点在一个term时间片里会保持Leader状态,同一时间段内,集群中只会有一个Leader。某些情况下形成不了多数派,那该term可能直到结束都没有leader,直到下一个term再重新发起选举,也就没有了Zookeeper中的脑裂问题。而在每次重新选举的过程中, Leader也有可能会退化成为Follower。在该集群中Leader节点会不断变化

每次选举的过程中,每个节点都会存储当前term编号,并在节点之间进行交流时带上自己的term编号。若一个节点发现他的编号比另外一个小,则会将自己的编号更新为较大的那一个。若Leader或Candidate发现自己的编号不是最新的,则会自动转成Follower。若接收到的请求term编号小于自己的编号term将会拒绝执行

在选举过程中,Raft协议会通过心跳机制发起Leader选举。节点都是从Follower状态开始,若收到了来自Leader或Candidate的心跳RPC请求,则会保持Follower状态,避免争抢成为Candidate。而Leader会往其他节点发送心跳信号,来确认自己的地位。若Follower两个timeout信号内没有收到Leader的心跳信号,则会认为Leader挂了,发起新一轮选举。

选举开始后,每个Follower会增加自己当前的term,并将自己转为Candidate。然后向其他节点发起投票请求,请求时默认投自己一票。之后Candidate状态可能会发生以下三种变化:

  • 赢得选举成为Leader:若在一个term内收到了大多数的选票,将会在接下的剩余term时间内称为Leader,然后就可通过发送心跳确立自己的地位。每一个Server在一个term内只能投一张选票,并且按先到先得的原则投出
  • 其他节点成为Leader:在等待投票时,可能会收到其他Server发出心跳信号,说明Leader已产生。这时通过比较自己的term编号和RPC过来的term编号,若比对方大说明Leader的term过期,则拒绝该RPC并继续保持候选人身份,若对方编号不比自己小则承认对方的地位,转为follower。
  • 选票被瓜分选举失败:若无Candidate获取大多数选票,则无Leader产生,Candidate们等待超时后发起另一轮选举,为防止下一次选票还被瓜分,Raft采用随机Election Timeout即随机休眠时间机制防止选票被持续瓜分。通过将timeout随机设为一段区间上的某个值,因此很大概率会有某个Candidate率先超时然后赢得大部分选票。

以三个节点的集群为例,集群启动时三个节点都是Follower,发起投票后三个节点都会给自己投票,一轮投票下来三个节点的term都是1,选举不出Leader;三个节点会进入随机休眠,然后开始新一轮投票;

Dledger还会采用Raft协议进行多副本消息同步数据同步会通过uncommitted阶段commited阶段两个阶段来完成。

Leader Broker上的Dledger收到一条数据后,会标记为uncommitted状态,然后他通过自己的DledgerServer组件把该uncommitted数据发给Follower Broker的DledgerServer组件。

Follower Broker的DledgerServer收到uncommitted消息后,必须返回一个Ack给Leader Broker的Dledger。若Leader Broker收到超过半数的Follower Broker返回的Ack之后,就会把消息标记为committed状态

Leader Broker上的DledgerServer就会发送committed消息给Follower Broker上的DledgerServer,让他们把消息也标记为committed状态

消息存储

分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。RocketMQ采用的是类似于Kafka的文件存储机制,即直接用磁盘文件来保存消息,而不需要借助MySQL这一类索引工具

  • MQ收到一条消息后,需要向生产者返回一个ACK响应,并将消息存储起来。
  • MQ Push一条消息给消费者后,等待消费者的ACK响应,需要将消息标记为已消费。若没有标记为消费,MQ会不断尝试往消费者推送这条消息。
  • MQ需要定期删除一些过期的消息,这样才能保证服务一直可用。

目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过一般网卡传输速度。但是磁盘随机写速度只有大概100KB/sRocketMQ的消息用顺序写保证了消息存储的速度。

Linux操作系统分为用户态内核态文件操作网络操作需要涉及这两种形态的切换,免不了进行数据复制。一台服务器把本机磁盘文件的内容发送到客户端,一般分为读取本地文件内容将读取的内容通过网络发送出去两个步骤。看似简单的操作,实际进行了4 次数据复制

  • 磁盘复制数据到内核态内存
  • 内核态内存复制到用户态内存
  • 然后从用户态内存复制到网络驱动内核态内存
  • 最后从网络驱动的内核态内存复制到网卡中进行传输

通过使用mmap方式,可省去向用户态内存复制,提高速度。这种机制在Java中是通过NIO包中的MappedByteBuffer实现的。RocketMQ充分利用该特性,也就是所谓的零拷贝技术,提高消息存盘网络发送速度

采用MappedByteBuffer这种内存映射的方式有几个限制:一次只能映射1.5~2G的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因

零拷贝在Java NIO中提供了mmapsendfile两种实现方式,mmap适合比较小的文件sendfile适合传递比较大的文件

消息存储结构

RocketMQ消息的存储分为三个部分:

  • CommitLog存储消息的元数据,所有消息都会顺序存入到CommitLog文件当中。CommitLog由多个文件组成,每个文件固定大小1G以第一条消息的偏移量为文件名
  • ConsumerQueue存储消息在CommitLog的索引,一个MessageQueue一个文件,记录当前MessageQueue被哪些消费者组消费到了哪一条CommitLog每个Broker下都会生成对应的Topic的文件夹,每个Topic文件夹下会为每个队列生成一个以队列id作为文件名的文件夹,在文件夹内才是对应的MessageQueue文件
  • IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程

  • abort:该文件是RocketMQ用来判断程序是否正常关闭的一个标识文件。正常情况下在启动时创建关闭服务时删除。若遇到服务器宕机或kill -9等一些非正常关闭服务的情况,该abort文件不会删除,RocketMQ就可判断上一次服务是非正常关闭,做一些数据恢复的操作。
  • checkpoint数据存盘检查点
  • config/*.json:将Topic配置消费者组配置、消费者组消息偏移量Offset关键配置信息进行存盘保存。

刷盘机制

RocketMQ需要将消息存储到磁盘上才能保证断电后消息不丢失,才可以让存储的消息量超出内存的限制。为了提高性能,会尽量保证磁盘的顺序写。消息在写入磁盘时有SYNC_FLUSH同步刷盘ASYNC_FLUSH异步刷盘两种写磁盘的方式。通过Broker配置文件中flushDiskType参数设置。

  • 同步刷盘返回写成功状态时,消息已经被写入磁盘。消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。

  • 异步刷盘:返回写成功状态时,消息可能只是被写入了内存的PAGECACHE写操作的返回快吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作快速写入。

主从复制

若Broker以集群方式部署,会有一个Master节点和多个Slave节点,消息需要从Master复制到Slave上。而消息复制的方式分为同步复制异步复制。通过Broker配置文件里的brokerRole参数进行设置,该参数可以被设置成ASYNC_MASTERSYNC_MASTERSLAVE三个值中的一个。

  • 同步复制等Master和Slave都写入消息成功后才反馈给客户端写入成功的状态。若Master故障Slave上有全部数据备份,很容易恢复数据。同步复制会增大数据写入延迟降低系统的吞吐量

  • 异步复制Master写入消息成功就反馈给客户端写入成功的状态,再异步将消息复制给Slave节点。系统拥有较低延迟较高吞吐量。但若Master故障而有些数据没有完成复制就会造成数据丢失

负载均衡

Producer负载均衡

Producer发送消息时,默认轮询目标Topic下的所有MessageQueue,并采用递增取模方式往不同的MessageQueue上发送消息,让消息平均落在不同的Queue上。由于MessageQueue分布在不同的Broker,故消息也会发送到不同的Broker上。生产者在发送消息时,可指定一个MessageQueueSelector,通过该对象来将消息发送到指定的MessageQueue上,可通过该方式保证消息局部有序

Consumer负载均衡

Consumer也是以MessageQueue为单位来进行负载均衡,分为集群模式广播模式。广播模式下每条消息都会投递给订阅了Topic的所有消费者实例,在Consumer分配Queue时,所有Consumer都分到所有的Queue。

集群消费模式每条消息只需要投递到订阅该TopicConsumer Group下的一个实例,RocketMQ采用主动拉取方式拉取并消费消息,在拉取时需明确指定拉取哪一条MessageQueue

每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时会按照Queue的数量实例的数量平均分配Queue给每个实例

每次分配时都会将MessageQueue消费者ID进行排序,再用不同的分配算法进行分配。内置的分配的算法共有六种,分别对应AllocateMessageQueueStrategy下的六种实现类可在Consumer中直接指定默认情况下使用的是最简单的平均分配策略

  • AllocateMachineRoomNearby将同机房的Consumer和Broker优先分配在一起。该策略可通过一个machineRoomResolver对象来定制Consumer和Broker的机房解析规则。还需要引入另外一个分配策略来对同机房的Broker和Consumer进行分配。一般用平均分配策略轮询分配策略
  • AllocateMessageQueueAveragely平均分配,将所有MessageQueue平均分给每一个消费者
  • AllocateMessageQueueAveragelyByCircle轮询分配,轮流的给一个消费者分配一个MessageQueue。
  • AllocateMessageQueueByConfig: 直接指定一个messageQueue列表,类似于广播模式,直接指定所有队列。
  • AllocateMessageQueueByMachineRoom按逻辑机房的概念进行分配。对BrokerName和ConsumerIdc有定制化的配置。
  • AllocateMessageQueueConsistentHash一致性哈希策略只需要指定一个虚拟节点数,用一个哈希环算法,虚拟节点是为了让Hash数据在环上分布更为均匀。

Consumer平均分配

消息重试

广播模式的消息是不存在消息重试机制,消息消费失败后不会再重新进行发送,继续消费新的消息。对于普通消息,当消费者消费消息失败后,可通过设置返回状态达到消息重试的结果

集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置,有返回Action.ReconsumeLater返回NULL抛出异常三种配置方式。

重试消息会进入一个%RETRY%+ConsumeGroup队列中,默认允许每条消息最多重试16重试时间跟延迟消息的延迟级别是对应的,不过取的是延迟级别的后16级别。

重试次数 与上次重试的间隔时间 重试次数 与上次重试的间隔时间
1 10秒 9 7分钟
2 30秒 10 8分钟
3 1分钟 11 9分钟
4 2分钟 12 10分钟
5 3分钟 13 20分钟
6 4分钟 14 30分钟
7 5分钟 15 1小时
8 6分钟 16 2小时

若消息重试16次后仍然失败,消息将不再投递转为进入死信队列,可通过consumer.setMaxReconsumeTimes(20)自定义重试次数,当定制的重试次数超过16次后,消息重试时间间隔均为2小时

消息最大重试次数的设置对相同GroupID下的所有Consumer实例有效。且最后启动的Consumer会覆盖之前启动的Consumer的配置

死信队列

当一条消息消费失败会自动进行消息重试,若消息超过最大重试次数,RocketMQ认为该消息有问题,但不会立刻将该消息丢弃,而是将其发送到这个消费者组对应的一种特殊队列死信队列。死信队列名称%DLQ%+ConsumGroup

  • 一个死信队列对应一个ConsumGroup,而不是对应某个消费者实例。
  • 若一个ConsumeGroup没有产生死信队列,RocketMQ就不会为其创建相应的死信队列。
  • 一个死信队列包含了该ConsumeGroup里的所有死信消息,而不区分该消息Topic
  • 死信队列中的消息不会再被消费者正常消费
  • 死信队列的有效期跟正常消息相同默认3,对应broker.conf中的fileReservedTime属性。超过该最长时间的消息都会被删除,而不管消息是否消费过。

通常一条消息进入了死信队列,意味着消息在消费处理的过程中出现了比较严重的错误,且无法自行恢复。此时,一般需要人工去查看死信队列中的消息,对错误原因进行排查。然后对死信消息进行处理,比如转发到正常的Topic重新进行消费或者丢弃。

消息幂等

  • 发送时消息重复:当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 若此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同且MessageID也相同的消息。
  • 投递时消息重复:消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,RocketMQ服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且MessageID也相同的消息。
  • 负载均衡时消息重复:包括但不限于网络抖动、Broker重启以及订阅方应用重启,当RocketMQ的Broker或客户端重启、扩容或缩容时,会触发Rebalance,此时消费者可能会收到重复消息。

在RocketMQ中,无法保证每个消息只被投递一次,所以要在业务上自行来保证消息消费的幂等性。RocketMQ的每条消息都有一个唯一的MessageId,该参数在多次投递的过程中是不会改变,业务上可用MessageId来作为判断幂等的关键依据。

MessageId无法保证全局唯一,也会有冲突的情况。所以在一些对幂等性要求严格的场景,最好是使用业务上唯一的一个标识如订单ID。而该业务标识可使用MessageKey来进行传递

消息零丢失

  • 生产者使用事务消息机制。
  • Broker配置同步刷盘+Dledger主从架构
  • 消费者不要使用异步消费。
  • 整个MQ挂了之后准备降级方案

消息积压处理

若Topic下的MessageQueue配置得是足够多的,每个Consumer实际上会分配多个MessageQueue来进行消费。可通过增加Consumer服务节点数量来加快消息的消费,等积压消息消费完了,再恢复成正常情况。最极限的情况是把Consumer的节点个数设置成跟MessageQueue的个数相同。此时再继续增加Consumer的服务节点就没有用了。

若Topic下的MessageQueue配置不够多,就不能用上面这种增加Consumer节点个数的方法。若要快速处理积压的消息,可创建一个新的Topic配置足够多的MessageQueue,把所有消费者节点的目标Topic转向新的Topic,并紧急上线一组新的消费者只负责消费旧Topic中的消息,并转储到新的Topic中,在新的Topic上就可以通过增加消费者个数来提高消费速度了。

若RocketMQ原本是采用的普通方式搭建主从架构,而现在想要中途改为使用Dledger高可用集群,这时若不想历史消息丢失,就需要先将消息进行对齐,也就是要消费者把所有的消息都消费完,再来切换主从架构。因为Dledger集群会接管RocketMQ原有的CommitLog日志,切换主从架构时,若有消息没有消费完,这些消息是存在旧的CommitLog中的,就无法再进行消费了。该场景下也是需要尽快的处理掉积压的消息。