Appearance
事务消息详解
一、事务消息的基础概念
事务消息的本质是 “两阶段提交” 的优化:
- 半消息(Half Message):又称“预处理消息”,是事务消息的核心。它有两个关键特性:
- 持久化:半消息会写入Broker的
CommitLog(全局日志),保证不丢失; - 不可见性:半消息不会同步到
ConsumeQueue(消费者拉取的索引队列),因此消费者无法感知。
- 持久化:半消息会写入Broker的
- 事务状态:生产者执行完本地事务后,需向Broker汇报以下状态:
COMMIT_MESSAGE:本地事务成功,Broker将半消息转为“可见普通消息”;ROLLBACK_MESSAGE:本地事务失败,Broker删除半消息;UNKNOW:状态未知,Broker后续会发起事务回查。
二、事务消息的实现原理(核心流程)
RocketMQ的事务消息流程可拆解为5个关键步骤,涉及生产者、Broker、消费者三方协同:
1. 发送半消息(第一阶段:预处理)
生产者通过TransactionMQProducer发送事务消息,流程如下:
- 生产者构造事务消息:设置消息属性
TRAN_MSG=true(标记为事务消息)、TRANSACTION_ID(唯一事务ID,关联本地事务); - Broker处理半消息:
- Broker接收消息后,检查
TRAN_MSG属性,确认是事务消息; - 将半消息写入
CommitLog(持久化); - 不写入
ConsumeQueue:确保消费者无法拉取; - 返回
SendStatus.SEND_OK给生产者,告知半消息发送成功。
- Broker接收消息后,检查
关键设计:半消息的持久化是后续所有操作的基础——即使Broker宕机,重启后仍能从
CommitLog恢复半消息。
2. 执行本地事务(第二阶段:业务操作)
生产者收到半消息成功的响应后,执行本地事务逻辑(如数据库插入、更新)。例如:
java
// TransactionListener#executeLocalTransaction(生产者实现)
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String transactionId = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_ID);
try {
// 执行本地事务:插入订单
orderDao.insertOrder(transactionId, orderInfo);
return LocalTransactionState.COMMIT_MESSAGE; // 本地事务成功
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE; // 本地事务失败
}
}注意:本地事务需保证幂等性(避免重复执行),因为后续可能触发事务回查。
3. 上报事务状态(第二阶段:提交/回滚)
生产者执行完本地事务后,需向Broker上报最终状态,流程如下:
- 生产者调用
endTransaction接口,携带transactionId和LocalTransactionState; - Broker接收状态后,分情况处理:
- COMMIT:将半消息的索引写入
ConsumeQueue,使其变为“可见普通消息”,消费者可拉取; - ROLLBACK:从
CommitLog中标记/删除半消息(实际是写入一条“删除标记”,后续由后台线程清理); - UNKNOW:Broker暂不处理,后续发起事务回查。
- COMMIT:将半消息的索引写入
关键:若上报失败(如网络波动),生产者需重试,但需保证幂等(Broker会忽略重复的状态请求)。
4. 事务回查(补偿机制:解决状态丢失)
若生产者因网络故障、进程宕机等原因未上报状态,Broker需主动发起事务回查,确保状态最终一致。
回查的实现细节:
- Broker的定时任务:Broker内置
TransactionalMessageCheckService定时任务(默认1分钟执行一次),扫描CommitLog中**状态为PREPARED(未确认)且超过超时时间(默认6秒)**的半消息; - 构造回查请求:Broker根据半消息的
producerGroup(生产者组)和transactionId,通过NameServer找到生产者实例,发送CHECK_TRANSACTION_STATE请求; - 生产者处理回查:生产者实现
TransactionListener#checkLocalTransaction方法,查询本地事务状态(如通过transactionId查数据库):java@Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String transactionId = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_ID); // 查询本地事务状态:订单是否存在 Order order = orderDao.getOrderByTransactionId(transactionId); if (order != null) { return LocalTransactionState.COMMIT_MESSAGE; // 本地事务成功 } else { return LocalTransactionState.ROLLBACK_MESSAGE; // 本地事务失败 } } - Broker处理回查结果:若返回
COMMIT则提交,ROLLBACK则回滚;若仍返回UNKNOW,则重复回查直到超过最大次数(默认15次)。
超时处理:超过最大回查次数后,Broker会自动回滚半消息(标记为无效,后续清理)。
5. 消费者消费消息(最终一致性)
当Broker将半消息转为“可见普通消息”后,消费者的逻辑与普通消息完全一致:
- 消费者从
ConsumeQueue拉取消息; - 执行消费逻辑(如减库存);
- 消费成功后向Broker确认
ACK,Broker删除消息。
注意:消费者需处理幂等性(如根据
transactionId判断是否已消费),避免重复执行。
三、事务消息的底层存储设计
RocketMQ的事务消息未额外引入数据库,而是复用CommitLog和ConsumeQueue,通过消息属性和状态标记实现事务跟踪:
1. CommitLog中的事务消息
CommitLog是RocketMQ的全局日志文件,所有消息(包括半消息)都写入此处。事务消息的关键属性:
TRAN_MSG=true:标记为事务消息;TRANSACTION_ID:唯一事务ID;MESSAGE_STATUS:记录事务状态(PREPARED/COMMITTED/ROLLBACKED)。
例如,CommitLog中的一条半消息结构:
Topic: order_topic
Tags: create_order
Keys: order_123
Body: {"orderId":123,"amount":100}
Properties: {
"TRAN_MSG": "true",
"TRANSACTION_ID": "tx_123456",
"MESSAGE_STATUS": "PREPARED"
}2. ConsumeQueue的可见性控制
ConsumeQueue是消费者的“索引队列”,存储消息在CommitLog中的偏移量和长度。半消息不会写入ConsumeQueue,因此消费者无法拉取;只有当Broker收到COMMIT指令后,才会将半消息的索引写入ConsumeQueue,此时消息变为可见。
四、事务消息的高可用设计
RocketMQ的事务消息通过Broker集群的主从复制保证高可用:
- 主Broker:处理半消息发送、状态上报、回查请求;
- 从Broker:同步主Broker的
CommitLog(半消息会被同步); - 故障转移:若主Broker宕机,从Broker会升级为新主,继续处理事务消息的后续流程(如回查、提交)。
五、事务消息的使用限制
- 不支持延迟/顺序消息:延迟消息需要指定时间后可见,顺序消息需要严格顺序,与事务消息的“状态驱动可见性”冲突;
- 生产者需实现
TransactionListener:必须重写executeLocalTransaction(执行本地事务)和checkLocalTransaction(处理回查); - 事务ID需唯一:生产者需保证
TRANSACTION_ID全局唯一,否则会导致回查逻辑错误; - 回查需幂等:生产者处理回查时,不能重复执行本地事务(需通过
transactionId判断状态)。
六、事务消息的典型场景
事务消息适用于 “本地事务执行”与“消息发送”强关联的场景:
- 电商订单:创建订单(本地事务)→ 发送减库存消息;
- 支付回调:更新支付状态(本地事务)→ 发送发货消息;
- 金融转账:转出账户扣款(本地事务)→ 发送转入账户加款消息。
七、总结:事务消息的核心逻辑
RocketMQ的事务消息通过**“半消息预处理→本地事务执行→状态上报→回查补偿”的流程,解决了分布式系统中“本地事务”与“消息发送”的原子性问题,最终实现最终一致性**。其核心设计亮点:
- 半消息的“持久化+不可见”:保证消息不丢失且不提前消费;
- 回查机制:解决状态上报失败的异常场景;
- 复用现有存储:无需额外数据库,性能接近普通消息。
通过以上流程,RocketMQ的事务消息完美平衡了一致性与性能,成为分布式系统中“消息驱动事务”的首选方案。
