Skip to content

事务消息详解

一、事务消息的基础概念

事务消息的本质是 “两阶段提交” 的优化:

  1. 半消息(Half Message):又称“预处理消息”,是事务消息的核心。它有两个关键特性:
    • 持久化:半消息会写入Broker的CommitLog(全局日志),保证不丢失;
    • 不可见性:半消息不会同步到ConsumeQueue(消费者拉取的索引队列),因此消费者无法感知。
  2. 事务状态:生产者执行完本地事务后,需向Broker汇报以下状态:
    • COMMIT_MESSAGE:本地事务成功,Broker将半消息转为“可见普通消息”;
    • ROLLBACK_MESSAGE:本地事务失败,Broker删除半消息;
    • UNKNOW:状态未知,Broker后续会发起事务回查

二、事务消息的实现原理(核心流程)

RocketMQ的事务消息流程可拆解为5个关键步骤,涉及生产者、Broker、消费者三方协同:


1. 发送半消息(第一阶段:预处理)

生产者通过TransactionMQProducer发送事务消息,流程如下:

  • 生产者构造事务消息:设置消息属性TRAN_MSG=true(标记为事务消息)、TRANSACTION_ID(唯一事务ID,关联本地事务);
  • Broker处理半消息
    1. Broker接收消息后,检查TRAN_MSG属性,确认是事务消息;
    2. 将半消息写入CommitLog(持久化);
    3. 不写入ConsumeQueue:确保消费者无法拉取;
    4. 返回SendStatus.SEND_OK给生产者,告知半消息发送成功。

关键设计:半消息的持久化是后续所有操作的基础——即使Broker宕机,重启后仍能从CommitLog恢复半消息。


2. 执行本地事务(第二阶段:业务操作)

生产者收到半消息成功的响应后,执行本地事务逻辑(如数据库插入、更新)。例如:

java
// TransactionListener#executeLocalTransaction(生产者实现)
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    String transactionId = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_ID);
    try {
        // 执行本地事务:插入订单
        orderDao.insertOrder(transactionId, orderInfo);
        return LocalTransactionState.COMMIT_MESSAGE; // 本地事务成功
    } catch (Exception e) {
        return LocalTransactionState.ROLLBACK_MESSAGE; // 本地事务失败
    }
}

注意:本地事务需保证幂等性(避免重复执行),因为后续可能触发事务回查。


3. 上报事务状态(第二阶段:提交/回滚)

生产者执行完本地事务后,需向Broker上报最终状态,流程如下:

  • 生产者调用endTransaction接口,携带transactionIdLocalTransactionState
  • Broker接收状态后,分情况处理:
    • COMMIT:将半消息的索引写入ConsumeQueue,使其变为“可见普通消息”,消费者可拉取;
    • ROLLBACK:从CommitLog标记/删除半消息(实际是写入一条“删除标记”,后续由后台线程清理);
    • UNKNOW:Broker暂不处理,后续发起事务回查

关键:若上报失败(如网络波动),生产者需重试,但需保证幂等(Broker会忽略重复的状态请求)。


4. 事务回查(补偿机制:解决状态丢失)

若生产者因网络故障、进程宕机等原因未上报状态,Broker需主动发起事务回查,确保状态最终一致。

回查的实现细节:

  1. Broker的定时任务:Broker内置TransactionalMessageCheckService定时任务(默认1分钟执行一次),扫描CommitLog中**状态为PREPARED(未确认)且超过超时时间(默认6秒)**的半消息;
  2. 构造回查请求:Broker根据半消息的producerGroup(生产者组)和transactionId,通过NameServer找到生产者实例,发送CHECK_TRANSACTION_STATE请求;
  3. 生产者处理回查:生产者实现TransactionListener#checkLocalTransaction方法,查询本地事务状态(如通过transactionId查数据库):
    java
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String transactionId = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_ID);
        // 查询本地事务状态:订单是否存在
        Order order = orderDao.getOrderByTransactionId(transactionId);
        if (order != null) {
            return LocalTransactionState.COMMIT_MESSAGE; // 本地事务成功
        } else {
            return LocalTransactionState.ROLLBACK_MESSAGE; // 本地事务失败
        }
    }
  4. Broker处理回查结果:若返回COMMIT则提交,ROLLBACK则回滚;若仍返回UNKNOW,则重复回查直到超过最大次数(默认15次)。

超时处理:超过最大回查次数后,Broker会自动回滚半消息(标记为无效,后续清理)。


5. 消费者消费消息(最终一致性)

当Broker将半消息转为“可见普通消息”后,消费者的逻辑与普通消息完全一致:

  • 消费者从ConsumeQueue拉取消息;
  • 执行消费逻辑(如减库存);
  • 消费成功后向Broker确认ACK,Broker删除消息。

注意:消费者需处理幂等性(如根据transactionId判断是否已消费),避免重复执行。

三、事务消息的底层存储设计

RocketMQ的事务消息未额外引入数据库,而是复用CommitLogConsumeQueue,通过消息属性状态标记实现事务跟踪:

1. CommitLog中的事务消息

CommitLog是RocketMQ的全局日志文件,所有消息(包括半消息)都写入此处。事务消息的关键属性:

  • TRAN_MSG=true:标记为事务消息;
  • TRANSACTION_ID:唯一事务ID;
  • MESSAGE_STATUS:记录事务状态(PREPARED/COMMITTED/ROLLBACKED)。

例如,CommitLog中的一条半消息结构:

Topic: order_topic
Tags: create_order
Keys: order_123
Body: {"orderId":123,"amount":100}
Properties: {
  "TRAN_MSG": "true",
  "TRANSACTION_ID": "tx_123456",
  "MESSAGE_STATUS": "PREPARED"
}

2. ConsumeQueue的可见性控制

ConsumeQueue是消费者的“索引队列”,存储消息在CommitLog中的偏移量和长度。半消息不会写入ConsumeQueue,因此消费者无法拉取;只有当Broker收到COMMIT指令后,才会将半消息的索引写入ConsumeQueue,此时消息变为可见。

四、事务消息的高可用设计

RocketMQ的事务消息通过Broker集群的主从复制保证高可用:

  • 主Broker:处理半消息发送、状态上报、回查请求;
  • 从Broker:同步主Broker的CommitLog(半消息会被同步);
  • 故障转移:若主Broker宕机,从Broker会升级为新主,继续处理事务消息的后续流程(如回查、提交)。

五、事务消息的使用限制

  1. 不支持延迟/顺序消息:延迟消息需要指定时间后可见,顺序消息需要严格顺序,与事务消息的“状态驱动可见性”冲突;
  2. 生产者需实现TransactionListener:必须重写executeLocalTransaction(执行本地事务)和checkLocalTransaction(处理回查);
  3. 事务ID需唯一:生产者需保证TRANSACTION_ID全局唯一,否则会导致回查逻辑错误;
  4. 回查需幂等:生产者处理回查时,不能重复执行本地事务(需通过transactionId判断状态)。

六、事务消息的典型场景

事务消息适用于 “本地事务执行”与“消息发送”强关联的场景:

  • 电商订单:创建订单(本地事务)→ 发送减库存消息;
  • 支付回调:更新支付状态(本地事务)→ 发送发货消息;
  • 金融转账:转出账户扣款(本地事务)→ 发送转入账户加款消息。

七、总结:事务消息的核心逻辑

RocketMQ的事务消息通过**“半消息预处理→本地事务执行→状态上报→回查补偿”的流程,解决了分布式系统中“本地事务”与“消息发送”的原子性问题,最终实现最终一致性**。其核心设计亮点:

  • 半消息的“持久化+不可见”:保证消息不丢失且不提前消费;
  • 回查机制:解决状态上报失败的异常场景;
  • 复用现有存储:无需额外数据库,性能接近普通消息。

通过以上流程,RocketMQ的事务消息完美平衡了一致性性能,成为分布式系统中“消息驱动事务”的首选方案。