存储概要设计
RocketMQ 主要存储的文件包括Comitlog文件、ConsumeQueue 文件、IndexFile文件。
RocketMQ将所有主题的消息存储在同-个文件中,确保消息发送时顺序写文件, 尽最大的能力确保消息发送的高性能与高吞吐量。
为了提高消息消费的效率, RocketMQ引入了ConsumeQueue消息队列文件, 每个消息主题包含多个消息消费队列,每一个消息队列有一个消息文件。
IndexFile索引文件, 其主要设计理念就是为了加速消息的检索性能, 根据消息的属性快速从 Commitlog 文件中检索消息。
为什么单机kafka topic过多会导致性能下降,RocketMq下降不明显?
单机情况下,Kafka由于是一个topic的分区日志对应一个文件,当topic数量过多的时候,会出现大量IO竞争,导致吞吐率大幅下降
参考阿里中间件团队博客:http://jm.taobao.org/2016/04/07/kafka-vs-rocketmq-topic-amout/

RocketMq存储文件

- commitlog :消息存储目录。
- config :运行期间一些配置信息,主要包括下列信息
- consumerFilter.json :主题消息过滤信息。
- consumerOffset.json :集群消费模式消息消费进度。
- delayOffset.json:延时消息队列拉取进度
- …
- consumeQueue :消息消费队列存储目录。
- index :消息索引文件存储目录。
- abort :如果存在 abort 文件说明Broker 非正常关闭,该文件默认启动时创建,正常退出之前删除。
- checkpoint:文件检测点,存储 commitlog 文件最后一次刷盘时间戳、 consumeQueue最后一次刷盘时间、 index 索引文件最后一次刷盘时间戳。
commitlog
Commitlog文件存储目录为store/commitlog目录,每一个文件默认1个G,一个文件写满后再创建另外一个,以该文件中第一个偏移量为文件名,偏移量小于 20 位用 0 补齐。
将消息写入到commitlog后,broker会根据配置去执行同步刷盘或异步刷盘,然后再执行HA机制。


ConsumeQueue文件
该文件可以看成是 Commitlog 关于消息消费的“索引”文件,consumequeue的第一级目录为消息主题,第二级目录为主题的消息队列。
为了加速 ConsumeQueue 消息条目的检索速度与节省磁盘空间,每一个 Consumequeue 条目不会存储消息的全量信息。


单个ConsumeQueue文件中默认包含30万个条目,单个文件的长度为30w× 20字节,单个 ConsumeQueue文件可以看出是一个ConsumeQueue条目的数组,其下标为 ConsumeQueue的逻辑偏移量,消息消费进度存储的偏移量即逻辑偏移量。 ConsumeQueue即为 Commitlog文件的索引文件。
其构建机制是当消息到达 Commitlog 文件后 ,由专门的线程产生消息转发任务,从而构建消息消费队列文件与下文提到的索引文件。
Index 索引文件
RocketMQ引入了Hash索引机制为消息建立索引, HashMap 的设计包含两个基本点 : Hash槽与Hash冲突的链表结构。

IndexHeader头部,包含40个字节,记录该IndexFile的统计信息,其结构如下。
- beginTimestamp :该索引 文件中包含消息 的最小存储时间。
- endTimestamp : 该索引文件中包含消息的最大存储时间。
- beginPhyoffset : 该索引文件中包含消息的最小物理偏移量( commitlog 文件偏移量)。
- endPhyoffset :该索引 文件中包含消息 的最大物理偏移量( commitlog 文件偏移量)。
- hashslotCount: hashslot个数,并不是 hash 槽使用的个数,在这里意义不大。
- indexCount: Index条目列表当前已使用的个数, Index条目在Index条目列表中按顺序存储。
Hash槽, 一个IndexFile默认包含500万个Hash槽,每个Hash 槽存储的是落在该 Hash 槽的 hashcode 最新的 Index 的索引。
Index条目列表,默认一个索引文件包含 2000 万个条目,每一个 Index 条目结构如下。
- hashcode:key的 hashcode。
- phyoffset :消息对应的物理偏移量。
- timedif:该消息存储时间与第一条消息的时间戳的差值,小于 0 该消息无效 。
- prelndexNo :该条目的前一条记录的Index 索引, 当出现hash冲突时 , 构建的链表结构。
checkpoint
checkpoint 的作用是记录 Comitlog 、 ConsumeQueue 、 Index 文件的刷 盘时间点, 固定长度为 4k ,其中只用该文件的前面 24个字节。

实时更新消息队列与与索引文件
消息消费队列文件、消息属性索引文件都是基于CommitLog文件构建的,当消息生产者提交的消息存储在Commitlog文件中,ConsumeQueue、IndexFile需要及时更新,否则消息无法及时被消费,根据消息属性查找消息也会出现较大延迟。
RocketMQ通过开启一个线程ReputMessageServcie来准实时转发CommitLog文件更新事件,相应的任务处理器根据转发的消息及时更新ConsumeQueue、IndexFile文件。
消息队列与与索引文件恢复
由于RocketMQ存储首先将消息全量存储在Commitlog文件中,然后异步生成转发任务更新ConsumeQueue、Index文件。**如果消息成功存储到Commitlog文件中,转发任务未成功执行,此时消息服务器Broker由于某个原因看机,导致Commitlog、ConsumeQueue、IndexFile文件数据不一致。**如果不加以人工修复的话,会有一部分消息即便在Commitlog文件中存在,但由于并没有转发到Consumequeue,这部分消息将永远不会被消费者消费。
根据abort是否存在判断是否正常停机
- 正常停机,默认从倒数第三个文件开始进行恢复,遍历Commitlog。
- 异常停止,需要从最后一个文件往前走,找到第一个消息存储正常的文件。
如果commitlog目录没有消息文件,如果在消息消费队列目录下存在文件,则需要销毁。
文件刷盘机制
RocketMQ的存储与读写是基于JDK NIO的内存映射机制(MappedByteBuffer)的,消息存储时首先将消息追加到内存,再根据配置的刷盘策略在不同时间进行刷写磁盘。
- 如果是同步刷盘,消息追加到内存后,将同步调用MappedByteBuffer的force()方法;
- 如果是异步刷盘,在消息追加到内存后立刻返回给消息发送端。RocketMQ使用一个单独的线程按照某一个设定的频率执行刷盘操作。
通过在broker配置文件中配置flushDiskType来设定刷盘方式,可选值为ASYNCFLUSH(异步刷盘)、SYNC_FLUSH(同步刷盘),默认为异步刷盘。
文件过期机制
由于RocketMQ操作CommitLog、ConsumeQueu巳文件是基于内存映射机制并在启动的时候会加载commitlog、ConsumeQueue目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以需要引人一种机制来删除己过期的文件。
RocketMQ清除过期文件的方法是:如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除,RocketMQ不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为72小时,通过在Broker配置文件中设置fileReservedTime来改变过期时间,单位为小时.(如果对文件使用touch 命令,会更新文件的时间)
RocketMQ 在如下三种情况任意之一满足的情况下将继续执行删除文件操作。
- 指定删除文件的时间点,RocketMQ通过deleteWhen设置一天的固定时间执行一次删除过期文件操作,默认为凌晨4点。
- 磁盘空间是否充足,如果磁盘空间不充足,则返回true,表示应该触发过期文件删除操作。
- 预留,手工触发,可以通过调用excuteDeleteFilesManualy方法手工触发过期文件删除,目前RocketMQ暂未封装手工触发文件删除的命令。
HA
主从同步
主要分同步复制和异步复制
从服务器在启动的时候主动向主服务器建立TCP长连接,然后获取服务器的commitlog最大偏移量,以此偏移量向主服务器主动拉取消息,主服务器根据偏移量,与自身commitlog文件的最大偏移量进行比较,如果大于从服务器的commitlog偏移量,主服务器将向从服务器返回一定数量的消息,该过程循环进行,达到主从服务器数据同步。
- 同步复制:保证消息不丢失,但是会影响效率
- 异步复制:消息少量丢失,效率更高
读写分离
RocketMQ读写分离与其他中间件的实现方式完全不同,RocketMQ是消费者首先向主服务器发起拉取消息请求,然后主服务器返回一批消息,然后会根据主服务器负载压力与主从同步情况,向从服务器建议下次消息拉取是从主服务器还是从从服务器拉取。
Dlegder
官方最新的4.5版本已经利用Dlegder(基于raft协议的CommitLog存储库)支持故障转移(failover),当主节点挂后,从节点能自动切换成主节点(前提一组broker有至少有3个以上的奇数节点)