RocketMQ消息消费概述
消息消费以组的模式开展,一个消费组内可以包含多个消费者,每一个消费组可订阅多个主题,消费组之间有集群模式与广播模式两种消费模式。
- 集群模式: 主题下的同一条消息只允许被其中一个消费者消费。
- 广播模式: 主题下的同一条消息将被集群内的所有消费者消费一次。
消息服务器与消费者之间的消息传送也有两种方式:推模式、拉模式。
- 拉模式 是消费端主动发起拉消息请求
- 推模式 是消息到达消息服务器后,推送给消息消费者。(RocketMQ消息推模式的实现基于拉模式,在拉模式上包装一层,一个拉取任务完成后开始下一个拉取任务。)
消息消费流程
消息拉取
PullMessageService实现
PullMessageService在初始化后会调用start方法,一直不断的去broker拉取消息。
PullMessageService从消息服务器默认每次拉取32条消息,按消息的队列偏移量顺序存放在ProcessQueue中,PullMessageService然后将消息提交到消费者消费线程池,消息成功消费后从ProcessQueue中移除。

消息队列负载与重新分布机制
RocketMQ消息队列重新分布是由RebalanceService线程来实现的。一个MQClientInstance持有一个RebalanceService实现,并随着MQC!ientlnstance的启动而启动。
RebalanceService线程默认每隔20s执行一次mqClientFactory.doRebalance()方法,可以使用-Drocketmq.client.r巳balance.waitlnterval=interval来改变默认值。
MQClientlinstance遍历已注册的消费者,对消费者执行doRebalance()方法。
每个DefaultMQPushConsumerlmpl都持有一个单独的Rebalancelmpl对象,该方法主要是遍历订阅信息对每个主题的队列进行重新负载。
主要进行负载均衡的方法为
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic
负载均衡的实现
前提
消费者在启动的时候会向MQClientlnstance中注册消费者,然后MQClientlnstance会向所有的Broker发送心跳包,心跳包中包含MQClientlnstance的消费者信息。
RocketMQ用一个叫ClientID的概念,来唯一标记一个客户端实例,一个客户端实例对于Broker而言会开辟一个Netty的客户端实例。 而ClientID是由ClientIP+InstanceName构成,故如果一个进程中多个实例(无论Producer还是Consumer)ClientIP和InstanceName都一样,他们将公用一个内部实例(同一套网络连接,线程资源等)
此外,此ClientID在对于Consumer负载均衡的时候起到唯一标识的作用,一旦多个实例(无论不同进程、不通机器、还是同一进程)的多个Consumer实例有一样的ClientID,负载均衡的时候必然RocketMQ任然会把两个实例当作一个client(因为同样一个clientID)。
ClientConfig中可以看到CID的生成方式。
消息队列负载机制遵循一个通用的思想:一个消息队列同一时间只允许被一个消费者消费,一个消费者可以消费多个消息队列
同一个消息队列只会分配给一个消费者, 故如果消费者个数大于消息队列数量, 则有些消费者无法消费消息。
- 从主题订阅信息缓存表中获取主题的队列信息;发送请求从Broker中该消费组内当前所有的消费者客户端ID(随机选择一台Broker)
- 对cidAll(所有的消费者id)和mqAll(所有的消息队列)进行排序。
- 使用负载均衡策略进行分配,有6种负载策略(原来是5种,最新版本源码中有6种,参考rebalance包)
- AllocateMessageQueueAveragely 平均分配
- AllocateMessageQueueAveragelyByCircle 平均轮询分配
- AllocateMessageQueueConsistentHash 一致性哈希
- AllocateMessageQueueByConfig 根据配置指定消费
- AllocateMessageQueueByMachineRoom 根据Broker部署机房名
- AllocateMachineRoomNearby 根据最近的机房
对比消息队列是否发生变化,主要思路是遍历当前负载队列集合,如果队列不在新分配队列集合中,需要将该队列停止消费并保存消费进度;遍历已分配的队列,如果队列不在队列负载表中(processQueueTable)则需要创建该队列拉取任务PullRequest,然后添加到PullMessageService线程的pullRequestQueue中,PullMessageService才会继续拉取任务。
- 分配到队列,开始消费数据
由于每次进行队列重新负载时会从Broker实时查询出当前消费组内所有消费者,并且对消息队列、消费者列表进行排序,这样新加入的消费者就会在队列重新分布时分配到消费队列从而消费消息。

消息消费
如果是广播模式,业务方返回RECONSUME_LATER,消息并不会重新被消费,只是以警告级别输出到日志文件。
如果是集群模式,只有在业务方返回RECONSUME_LATER时,该批消息都需要发ACK消息,如果消息发送ACK失败,则直接将本批ACK消费发送失败的消息再次封装为ConsumeRequest,然后延迟5s后重新消费。如果ACK消息发送成功,则该消息会延迟消费。消费成功的消息不会进行ACK的返回,直接通过消费进度同步到Broker上面。
在执行消息消费的前后,都要验证验证一下ProcessQueue的isDroped状态值,如果设置为tru,将不进行处理,如果由于由新的消费者加入或原先的消费者出现若机导致原先分给消费者的队列在负载之后分配给别的消费者,那么在应用程序的角度来看的话,消息会被重复消费。
1 2 3 4 5 6 7
| if(!isDroped){ do Consume }
if(!isDroped){ submit result }
|
消息消费ACK机制&消息重试
官方文档引述
Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer消费消息失败通常可以认为有以下几种情况:
- 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10秒后再重试。
- 由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep 30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。
RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。
Consumer启动时会自动订阅重试主题消息。
RocketMQ 消息重试是以消费组为单位,而不是主题,消息重试主题名为**%RETRY%+消费组名**。 消费者在启动的时候会自动订阅该主题,参与该主题的消息队列负载。
只有当消费模式为MessageModel.CLUSTERING(集群模式) 时,Broker才会自动进行重试,对于广播消息是不会重试的。
集群消费模式下,当消息消费失败,RocketMQ会通过消息重试机制重新投递消息,努力使该消息消费成功。
消费者消费消息以后,只有返回ConsumeConcurrentlyStatus 消费成功状态,才不会进入消息重试,否则Broker会对消息进行重新投递。
broker会根据返回的ACK数据进行如下操作
- 创建重试主题,重试主题名称:%RETRY%+消费组名称,并从重试队列中随机选择一个队列,并构建TopicConfig主题配置信息。
- 设置消息重试次数,如果消息已重试次数超过maxReconsumeTimes,再次改变newTopic主题为DLQ(”%DLQ%”),该主题的权限为只写,说明消息一旦进入到DLQ队列中,RocketMQ将不负责再次调度进行消费了,需要人工干预。(死信队列)
- 根据原先的消息创建一个新的消息对象,重试消息会拥有自己的唯一消息ID(msgld)并存人到commitlog文件中,并不会去更新原先消息,而是会将原先的主题、消息ID存入消息的属性中,主题名称为重试主题,其他属性与原先消息保持相同
- 在存入Commitlog文件之前,如果消息的延迟级别delayTimeLevel大于0,替换消息的主题与队列为定时任务主题“SCHEDULE_TOPIC_XXXX”,队列ID为延迟级别减1。再次将消息主题、队列存入消息的属性中,键分别为:PROPERTYREALTOPIC、PROPERTY_REALQUEUE_ID
- ACK 消息存入 CommitLog 文件后 ,将依托 RocketMQ 定时消息机制在延迟时间到期 后再次将消息拉取,提交消费线程池
消费进度管理
广播模式:同一个消费组的所有消息消费者都需要消费主题下的所有消息,也就是同组内的消费者的消息消费行为是对立的,互相不影响,故消息进度需要独立存储,最理想的存储地方应该是与消费者绑定。
广播模式消息消费进度存储在消费者本地,其实现类LocalFileOffsetStore。
集群模式:同一个消费组内的所有消息消费者共享消息主题下的所有消息,同一条消息(同一个消息消费队列)在同一时间只会被消费组内的一个消费者消费,并且随着消费队列的动态变化重新负载,所以消费进度需要保存在一个每个消费者都能访问到的地方。
集群模式消息进度存储文件存放在消息服务端 Broker,RemoteBrokerOffsetStore。
消费者在消费完数据后,调用processConsumeResult方法 updateOffset方法更新内存中的offset, persist方法会持久化offset到本地或者RemoteBroker。消费者在处理完一个消费任务以后都会用当前队列的最小偏移量来更新消费进度(内存中更新,不做持久化)。
消费者会在下面两种情况下做持久化
- 负载均衡,消费者移除部分queue的时候,见removeUnnecessaryMessageQueue()方法
- MQClientInstance startScheduledTask()方法中 定时调用persistAllConsumerOffset()方法 默认5秒同步一次。
Broker 端默认 10s 持久化一次消息进度。

定时消息&消息重试
定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。
延迟级别
RocketMQ只支持特定级别的延迟消息
要支持任意时间精度的定时调度,不可避免地需要在Broker层做消息排序,再加上持久化方面的考量,将不可避免地带来具大的性能消耗。
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。
broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。**注意,messageDelayLevel是broker的属性,不属于某个topic。**发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:
- level == 0,消息为非延迟消息
- 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
- level > maxLevel,则level== maxLevel,例如level==20,延迟2h
ps :超过18以后,也只会取前18个level。
18个level可以自定义,支持 s m h d,最多到天
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX(XXX不是占位符。。。真的就叫XXX)的topic中。
根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。
broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。
需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。
1 2
| // MessageStoreConfig.java private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
|
public boolean parseDelayLevel() { HashMap<String, Long> timeUnitTable = new HashMap<String, Long>(); timeUnitTable.put("s", 1000L); timeUnitTable.put("m", 1000L * 60); timeUnitTable.put("h", 1000L * 60 * 60); timeUnitTable.put("d", 1000L * 60 * 60 * 24);
String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel(); try { String[] levelArray = levelString.split(" "); for (int i = 0; i < levelArray.length; i++) { String value = levelArray[i]; String ch = value.substring(value.length() - 1); Long tu = timeUnitTable.get(ch);
int level = i + 1; if (level > this.maxDelayLevel) { this.maxDelayLevel = level; } long num = Long.parseLong(value.substring(0, value.length() - 1)); long delayTimeMillis = tu * num; this.delayLevelTable.put(level, delayTimeMillis); } } catch (Exception e) { log.error("parseDelayLevel exception", e); log.info("levelString String = {}", levelString); return false; }
return true; }
|
producer发送定时消息
设置消息延迟级别
1 2
| Message msg = new Message(...); msg.setDelayTimeLevel(level);
|
broker存储定时消息
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX(XXX不是占位符。。。真的就叫XXX)的topic中。
根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| 1: //【CommitLog.java】 2: /** 3: * 添加消息,返回消息结果 4: * 5: * @param msg 消息 6: * @return 结果 7: */ 8: public PutMessageResult putMessage(final MessageExtBrokerInner msg) { 9: // ....(省略代码) 10: 11: // 定时消息处理 12: final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); 13: if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE// 14: || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { 15: // Delay Delivery 16: if (msg.getDelayTimeLevel() > 0) { 17: if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { 18: msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); 19: } 20: 21: // 存储消息时,延迟消息进入 `Topic` 为 `SCHEDULE_TOPIC_XXXX` 。 22: topic = ScheduleMessageService.SCHEDULE_TOPIC; 23: 24: // 延迟级别 与 消息队列编号 做固定映射 25: queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); 26: 27: // Backup real topic, queueId 28: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); 29: MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); 30: msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); 31: 32: msg.setTopic(topic); 33: msg.setQueueId(queueId); 34: } 35: } 36: 37: // ....(省略代码) 38: } 39: 40: //【ScheduleMessageService.java】 41: /** 42: * 根据 延迟级别 计算 消息队列编号 43: * QueueId = DelayLevel - 1 44: * 45: * @param delayLevel 延迟级别 46: * @return 消息队列编号 47: */ 48: public static int delayLevel2QueueId(final int delayLevel) { 49: return delayLevel - 1; 50: }
|
Broker 发送定时消息
对 SCHEDULE_TOPIC_XXXX 每条消费队列对应单独一个定时任务进行轮询,发送到达投递时间【计划消费时间】 的消息。 具体实现见ScheduleMessageService.java

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
| 1: /** 2: * 发送(投递)延迟消息定时任务 3: */ 4: class DeliverDelayedMessageTimerTask extends TimerTask { 5: /** 6: * 延迟级别 7: */ 8: private final int delayLevel; 9: /** 10: * 位置 11: */ 12: private final long offset; 13: 14: public DeliverDelayedMessageTimerTask(int delayLevel, long offset) { 15: this.delayLevel = delayLevel; 16: this.offset = offset; 17: } 18: 19: @Override 20: public void run() { 21: try { 22: this.executeOnTimeup(); 23: } catch (Exception e) { 24: // XXX: warn and notify me 25: log.error("ScheduleMessageService, executeOnTimeup exception", e); 26: ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask( 27: this.delayLevel, this.offset), DELAY_FOR_A_PERIOD); 28: } 29: } 30: 31: /** 32: * 纠正可投递时间。 33: * 因为发送级别对应的发送间隔可以调整,如果超过当前间隔,则修正成当前配置,避免后面的消息无法发送。 34: * 35: * @param now 当前时间 36: * @param deliverTimestamp 投递时间 37: * @return 纠正结果 38: */ 39: private long correctDeliverTimestamp(final long now, final long deliverTimestamp) { 40: long result = deliverTimestamp; 41: 42: long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel); 43: if (deliverTimestamp > maxTimestamp) { 44: result = now; 45: } 46: 47: return result; 48: } 49: 50: public void executeOnTimeup() { 51: ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)); 52: 53: long failScheduleOffset = offset; 54: 55: if (cq != null) { 56: SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); 57: if (bufferCQ != null) { 58: try { 59: long nextOffset = offset; 60: int i = 0; 61: for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { 62: long offsetPy = bufferCQ.getByteBuffer().getLong(); 63: int sizePy = bufferCQ.getByteBuffer().getInt(); 64: long tagsCode = bufferCQ.getByteBuffer().getLong(); 65: 66: long now = System.currentTimeMillis(); 67: long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); 68: 69: nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); 70: 71: long countdown = deliverTimestamp - now; 72: 73: if (countdown <= 0) { // 消息到达可发送时间 74: MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy); 75: if (msgExt != null) { 76: try { 77: // 发送消息 78: MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); 79: PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner); 80: if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { // 发送成功 81: continue; 82: } else { // 发送失败 83: // XXX: warn and notify me 84: log.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", msgExt.getTopic(), msgExt.getMsgId()); 85: 86: // 安排下一次任务 87: ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD); 88: 89: // 更新进度 90: ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); 91: return; 92: } 93: } catch (Exception e) { 94: // XXX: warn and notify me 95: log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt=" 96: + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + offsetPy + ",sizePy=" + sizePy, e); 97: } 98: } 99: } else { 100: // 安排下一次任务 101: ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown); 102: 103: // 更新进度 104: ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); 105: return; 106: } 107: } // end of for 108: 109: nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); 110: 111: // 安排下一次任务 112: ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE); 113: 114: // 更新进度 115: ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); 116: return; 117: } finally { 118: bufferCQ.release(); 119: } 120: } // end of if (bufferCQ != null) 121: else { // 消费队列已经被删除部分,跳转到最小的消费进度 122: long cqMinOffset = cq.getMinOffsetInQueue(); 123: if (offset < cqMinOffset) { 124: failScheduleOffset = cqMinOffset; 125: log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset=" 126: + cqMinOffset + ", queueId=" + cq.getQueueId()); 127: } 128: } 129: } // end of if (cq != null) 130: 131: ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE); 132: } 133: 134: /** 135: * 设置消息内容 136: * 137: * @param msgExt 消息 138: * @return 消息 139: */ 140: private MessageExtBrokerInner messageTimeup(MessageExt msgExt) { 141: MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); 142: msgInner.setBody(msgExt.getBody()); 143: msgInner.setFlag(msgExt.getFlag()); 144: MessageAccessor.setProperties(msgInner, msgExt.getProperties()); 145: 146: TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag()); 147: long tagsCodeValue = 148: MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags()); 149: msgInner.setTagsCode(tagsCodeValue); 150: msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties())); 151: 152: msgInner.setSysFlag(msgExt.getSysFlag()); 153: msgInner.setBornTimestamp(msgExt.getBornTimestamp()); 154: msgInner.setBornHost(msgExt.getBornHost()); 155: msgInner.setStoreHost(msgExt.getStoreHost()); 156: msgInner.setReconsumeTimes(msgExt.getReconsumeTimes()); 157: 158: msgInner.setWaitStoreMsgOK(false); 159: MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL); 160: 161: msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC)); 162: 163: String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID); 164: int queueId = Integer.parseInt(queueIdStr); 165: msgInner.setQueueId(queueId); 166: 167: return msgInner; 168: } 169: }
|
Broker 持久化定时发送进度
定时消息发送进度存储在文件(../config/delayOffset.json)里
每 10s 定时持久化发送进度。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| 1: // 【ScheduleMessageService.java】 2: /** 3: public void start() { 4: // 定时发送消息 5: for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { 6: Integer level = entry.getKey(); 7: Long timeDelay = entry.getValue(); 8: Long offset = this.offsetTable.get(level); 9: if (null == offset) { 10: offset = 0L; 11: } 12: 13: if (timeDelay != null) { 14: this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); 15: } 16: } 17: 18: // 定时持久化发送进度 19: this.timer.scheduleAtFixedRate(new TimerTask() { 20: 21: @Override 22: public void run() { 23: try { 24: ScheduleMessageService.this.persist(); 25: } catch (Exception e) { 26: log.error("scheduleAtFixedRate flush exception", e); 27: } 28: } 29: }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); 30: }
|
顺序消费
RocketMQ支持局部顺序消息消费,也就是保证同一个消息队列上的消息顺序消费。不支持消息全局顺序消费,如果要实现某一主题的全局顺序消息消费,可以将该主题的队列数设置为1,牺牲高可用性。
官方文档引用
消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ可以严格的保证消息有序。
顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。
- 全局顺序
对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。
适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
- 分区顺序
对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。
适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。
参考producer的发送方式,需要指定sharding key。
顺序消费的实现
顺序消息消费与并发消息消费的第一个关键区别:顺序消息在创建消息队列拉取任务时需要在Broker服务器锁定该消息队列。
如果经过消息队列重新负载(分配)后,分配到新的消息队列时,首先需要尝试向Broker发起锁定该消息队列的请求,如果返回加锁成功则创建该消息队列的拉取任务,否则将跳过,等待其他消费者释放该消息队列的锁,然后在下一次队列重新负载时再尝试加锁。
会不会出现当消息队列重新负载时,原先由自己处理的消息队列被另外一个消费者分配,此时如果还未来得及将ProceeQueue解除锁定,就被另外一个消费者添加进去,此时会存储多个消息消费者同时消费一个消息队列?
答案是不会的,因为当一个新的消费队列分配给消费者时,在添加其拉取任务之前必须先向Broker发送对该消息队列加锁请求,只有加锁成功后,才能添加拉取消息,否则等到下一次负载后,只有消费队列被原先占有的消费者释放后,才能开始新的拉取任务。集群模式下,如果未锁定处理队列,则延迟该队列的消息消费。
队列锁的实现
顺序消息消费的各个环节基本都是围绕消息消费队列(MessageQueue)与消息处理队列(ProceeQueue)展开的。消息消费进度拉取,消息进度消费都要判断ProceeQueue的locked是否为true,设置ProceeQueu巳为true的前提条件是消息消费者(cid)向Broker端发送锁定消息队列的请求并返回加锁成功。
消息过滤
RocketMQ支持表达式过滤与类过滤两种模式,其中表达式又分为TAG和SQL92。
类过滤模式允许提交-个过滤类到FilterServer,消息消费者从FilterServer拉取消息,消息经过FilterServer时会执行过滤逻辑。
表达式模式分为TAG与SQL92表达式,SQL92表达式以消息属性过滤上下文,实现SQL条件过滤表达式而TAG模式就是简单为消息定义标签,根据消息属性tag进行匹配。
tag过滤的实现
消息发送者在消息发送时如果设置了消息的tags属性,存储在消息属性中,先存储在CommitLog文件中,然后转发到消息消费队列,消息消费队列会用8个字节存储消息tag的hashcode,之所以不直接存储tag字符串,是因为将ConumeQueue设计为定长结构,加快消息消费的加载性能。
如果直接用字符串匹配的话,过滤时会对服务器造成比较大的压力,字符串匹配需要一个一个去比
在Broker端拉取消息时午,遍历ConsumeQueue,只对比消息tag的hashcode,如果匹配则返回,否则忽略该消息。
Consume在收到消息后,同样需要先对消息进行过滤,只是此时比较的是消息tag的值而不再是hashcode。