0%

rocketmq05_TransactionalMessage

事务消息简介

RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。

事务消息能保证producer发送消息的一致性。

Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示。

RocketMQ事务消息流程概要

上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

事务消息发送及提交

  1. 发送消息(half消息)。

  2. 服务端响应消息写入结果。

  3. 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。

  4. 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

补偿流程

  1. 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
  2. Producer收到回查消息,检查回查消息对应的本地事务的状态
  3. 根据本地事务状态,重新Commit或者Rollback

其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。

RocketMQ事务消息设计

事务消息在一阶段对用户不可见

在RocketMQ事务消息的主要流程中,一阶段的消息如何对用户不可见。其中,事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的。那么,如何做到写入消息但是对用户不可见呢?RocketMQ事务消息的做法是:如果消息是half消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故消费端无法消费half类型的消息,然后RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。

在RocketMQ中,消息在服务端的存储结构如下,每条消息都会有对应的索引信息,Consumer通过ConsumeQueue这个二级索引来读取消息实体内容,其流程如下:

RocketMQ的具体实现策略是:写入的如果事务消息,对消息的Topic和Queue等属性进行替换,同时将原来的Topic和Queue信息存储到消息的属性中,正因为消息主题被替换,故消息并不会转发到该原主题的消息消费队列,消费者无法感知消息的存在,不会消费。其实改变消息主题是RocketMQ的常用“套路”,回想一下延时消息的实现机制。

Commit和Rollback操作以及Op消息的引入

在完成一阶段写入一条对用户不可见的消息后,二阶段如果是Commit操作,则需要让消息对用户可见;如果是Rollback则需要撤销一阶段的消息。先说Rollback的情况。对于Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息(实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的)。但是区别于这条消息没有确定状态(Pending状态,事务悬而未决),需要一个操作来标识这条消息的最终状态。RocketMQ事务消息方案中引入了Op消息的概念,用Op消息标识事务消息已经确定的状态(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。引入Op消息后,事务消息无论是Commit或者Rollback都会记录一个Op操作。Commit相对于Rollback只是在写入Op消息前创建Half消息的索引。

Op消息的存储和对应关系

RocketMQ将Op消息写入到全局一个特定的Topic中通过源码中的方法—TransactionalMessageUtil.buildOpTopic();这个Topic是一个内部的Topic(像Half消息的Topic一样),不会被用户消费。Op消息的内容为对应的Half消息的存储的Offset,这样通过Op消息能索引到Half消息进行后续的回查操作。

Half消息的索引构建

在执行二阶段Commit操作时,需要构建出Half消息的索引。一阶段的Half消息由于是写到一个特殊的Topic,所以二阶段构建索引时需要读取出Half消息,并将Topic和Queue替换成真正的目标的Topic和Queue,之后通过一次普通消息的写入操作来生成一条对用户可见的消息。所以RocketMQ事务消息二阶段其实是利用了一阶段存储的消息的内容,在二阶段时恢复出一条完整的普通消息,然后走一遍消息写入流程。

如何处理二阶段失败的消息?

如果在RocketMQ事务消息的二阶段过程中失败了,例如在做Commit操作时,出现网络问题导致Commit失败,那么需要通过一定的策略使这条消息最终被Commit。RocketMQ采用了一种补偿机制,称为“回查”。Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。Broker端通过对比Half消息和Op消息进行事务消息的回查并且推进CheckPoint(记录那些事务消息的状态是确定的)。

值得注意的是,rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,rocketmq默认回滚该消息。

事务消息实战

方案一

1
2
3
4
5
@Transactional
public void createOrder() {
save();//将数据存入数据库
sendMq();//发送消息到mq
}

方案一有以下弊端。

  1. 如果消息发送成功,在提交事务的时候JVM突然挂掉,事务没有成功提交,导致两个系统之间数据不一致。
  2. 由于消息是在事务提交之前提交,发送的消息内容是订单实体的内容,会造成在消费端进行消费时如果需要验证订单是否存在时可能出现订单不存在的情况。
  3. 发送成功,但是mq返回超时,导致事务回滚,出现数据不一致

方案二

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Transactional
public void createOrder() {
//执行下订单相关的业务流程,例如操作本地数据库落库相关代码
save();
//生成事务消息唯一业务表示,将该业务表示组装到待发送的消息体中
//往待发送消息表中插入一条记录,本次唯一消息发送业务工D,消息σso{消息主题、消息tag、消息体}、创建时间、发送状态
createMqTransactionalLog();
}

@Scheduled
private void sendMq() {
//同时需要引人定时机制,去扫描待发送消息记录,避免消息丢失。

}

方案二在方案一的基础上,增加了补偿机制。但是也引入了其他问题。

  1. 消息有可能重复发送,但在消费端可以通过唯一业务编号来进行去重设计。
  2. 实现过于复杂,为了避免极端情况下的消息丢失,需要使用定时任务。

方案三(RocketMq的官方推荐方式)

1. 使用rocketmq事务消息的producer发送消息

1
2
3
4
5
6
7
8
@Transactional
public void createOrder() {
//执行下订单相关的业务流程,例如操作本地数据库落库相关代码
//生成事务消息唯一业务表示,将该业务表示组装到待发送的消息体中,方便消息消费端进行幂等消费。
save();
//调用消息客户端AP工,发送事务 prepare消息消费。
producer.sendMessageInTransaction(msg, null);
}

2.实现事务的监听接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
//从消息中获取业务唯一ID。
String bizUniNo=msg. getUserproperty(" bizUniNo");
//将 bizUniNo 入库,表名: t_message_transaction,表结构 bizUniNo(主键),业务类型。
return LocalTransactionState.UNKNOW
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
//从数据库中查询t_message_transaction表中是否存在bizUniNo
//如果存在 返回事务已提交的状态
//如果不存在 返回事务LocalTransactionState.UNKNOW
//超过重试次数,返回事务回滚
return LocalTransactionState.COMMIT_MESSAGE;
}
}

Transactionlistener实现要点如下

execute LocalTransaction:该方法主要是设置本地事务状态,与业务方代码在一个事务中,故在这里,主要是向 t_message_transaction添加一条记录,在事务回査时,如果存在记录,就认为是该消息需要提交,其返回值建议返还 LocalTransactionState.UNKNOW。

checkLocaltransaction:该方法主要是告知 RocketMQ消息是需要提交还是回滚,如果本地事务表( t_message_transaction)存在记录,则认为提交;如果不存在,可以设置回査次数,如果指定次数内还是未査到消息,则回滚,否则返回未知。

事务消息使用上的限制

  1. 事务消息不支持延时消息和批量消息。
  2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
  3. 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
  4. 事务性消息可能不止一次被检查或消费。
  5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
  6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。