消息发送的三种方式
- 同步:同步等待,直到消息服务器返回发送结果。
- 异步 :指定消息发送成功后的回掉函数,然后调用消息发送API 后,立即返回,消息发送者线程不阻塞 ,直到运行结束,消息发送成功或失败的回调任务在一个新的线程中执行。
- 单向:消息发送者向 MQ 执行发送消息 API 时,直接返回,不等待消息服务器的结果,只管发,不在乎消息是否成功存储在消息服务器上。
启动producer流程
消息发送流程
消息长度验证
验证消息是否符合相应的规范,具体的规范要求是主题名称、 消息体不能为空 、 消息长度不能等于0且默认不能超过允许发送消息的最大长度 4M (maxMessageSize=l024 * 1024 * 4 )。
查找主题路由
- 如果生产者中缓存了 topic的路由信息,如果该路由信息中包含了消息队列,则直接返回该路由信息,
- 如果没有缓存或没有包含消息队列,则向 NameServer查询该topic的路由信息。
- 如果最终未找到路由信息,则抛出异常。
选择消息队列
RocketMq支持默认的消息投递方式和保证部分顺序的投递方式
默认消息投递方式
参考org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl方法
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#mqFaultStrategy
默认投递方式会把mq消息均匀的发送到目前可用的broker中,对于发送失败的情况,会通过重试的方式,把消息发到可用的broker的队列上
消息发送高可用主要通过两个手段 : 重试与 Broker 规避。 Broker规避就是在一次消息发送过程中发现错误,在某一时间段内,消息生产者不会选择该 Broker(消息服务器)上的消息队列,提高发送消息的成功率。
保证部分顺序(不支持消息发送高可用)
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl
producer传入MessageQueueSelector,来确定消息发送到哪一个队列上,如果发送失败则会抛出异常,比如某个key被分配到aBroker的1队列上,aBroker宕机,producer会直接抛出异常而不是重试,发到bBroker的1队列上。
MessageQueueSelector目前有三个实现类
- SelectMessageQueueByHash 根据附加参数的Hash值,按照消息队列列表的大小取余数,得到消息队列的index
- SelectMessageQueueByRandom 使用了简单的随机数选择算法
- SelectMessageQueueByMachineRoom 开源的版本没有具体的实现

批量消息
rocketMq也支持批量消息,批量消息发送是将同一主题的多条消息一起打包发送到消息服务端,减少网络调用次数,提高网络传输效率。