Appearance
Seata 实战示例
要理解Seata四种模式的代码落地,我们结合电商“下单→扣库存→支付”的经典场景,分别给出可运行的核心代码+配置,并解释关键逻辑。
准备工作
- 部署Seata Server:参考Seata官方文档部署TC集群(建议用Nacos做注册中心)。
- 数据库初始化:
- AT模式需创建
undo_log表(所有参与AT模式的数据库都要); - 业务表:
t_order(订单)、t_stock(库存)、t_account(账户)。
- AT模式需创建
一、AT模式(无侵入,最常用)
AT模式是无侵入的分布式事务解决方案,核心是@GlobalTransactional注解+数据源代理。
1. 依赖配置(所有服务都需引入)
xml
<!-- Spring Cloud Alibaba Seata -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<version>2023.0.1.0</version> <!-- 适配Spring Boot 3.x,若用2.x则选2.2.9.RELEASE -->
</dependency>
<!-- 数据库驱动(以MySQL为例) -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<!-- 数据源(以Druid为例) -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.20</version>
</dependency>2. 核心配置(application.yml)
yaml
spring:
application:
name: order-service # 服务名
datasource:
type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://localhost:3306/seata_order?useSSL=false&serverTimezone=UTC
username: root
password: root
cloud:
alibaba:
seata:
tx-service-group: order_tx_group # 事务组(需与TC配置一致)
seata:
registry:
type: nacos # 注册中心类型
nacos:
server-addr: 127.0.0.1:8848 # Nacos地址
namespace: seata_namespace # 命名空间(需提前创建)
config:
type: nacos # 配置中心类型
nacos:
server-addr: 127.0.0.1:88483. 数据源代理(关键!AT模式必须)
Seata通过DataSourceProxy拦截SQL,生成undo_log。需替换原数据源:
java
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import javax.sql.DataSource;
@Configuration
public class SeataDataSourceConfig {
// 1. 配置原始数据源(Druid)
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource rawDataSource() {
return new DruidDataSource();
}
// 2. 用Seata代理数据源
@Bean
public DataSourceProxy dataSourceProxy(DataSource rawDataSource) {
return new DataSourceProxy(rawDataSource);
}
// 3. 配置MyBatis的SqlSessionFactory(使用代理数据源)
@Bean
public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean factory = new SqlSessionFactoryBean();
factory.setDataSource(dataSourceProxy);
factory.setMapperLocations(new PathMatchingResourcePatternResolver()
.getResources("classpath:mapper/*.xml")); // MyBatis映射文件路径
return factory.getObject();
}
}4. 业务代码
(1)全局事务发起方:订单服务
用@GlobalTransactional标记全局事务边界,调用库存服务和支付服务:
java
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper; // 订单DAO
@Autowired
private StockFeignClient stockFeignClient; // 库存服务Feign客户端
@Autowired
private AccountFeignClient accountFeignClient; // 账户服务Feign客户端
/**
* 创建订单(全局事务入口)
* @param order 订单信息(包含商品ID、数量、用户ID、金额)
*/
@GlobalTransactional(name = "createOrderTx", rollbackFor = Exception.class)
public void createOrder(Order order) {
// 1. 插入订单(本地事务,AT模式自动生成undo_log)
orderMapper.insert(order);
System.out.println("订单创建成功,orderId: " + order.getId());
// 2. 调用库存服务扣减库存(分布式调用,XID自动传递)
stockFeignClient.reduceStock(order.getProductId(), order.getNum());
System.out.println("库存扣减成功,productId: " + order.getProductId());
// 3. 调用账户服务扣减余额(分布式调用)
accountFeignClient.reduceBalance(order.getUserId(), order.getAmount());
System.out.println("余额扣减成功,userId: " + order.getUserId());
}
}(2)分支事务:库存服务
无需额外注解,Seata自动拦截SQL并注册分支事务:
java
@Service
public class StockService {
@Autowired
private StockMapper stockMapper;
/**
* 扣减库存(分支事务)
* @param productId 商品ID
* @param num 扣减数量
*/
public void reduceStock(Long productId, Integer num) {
// 直接执行SQL,Seata自动生成undo_log
stockMapper.reduceStock(productId, num);
}
}对应的MyBatis映射文件(StockMapper.xml):
xml
<update id="reduceStock">
UPDATE t_stock SET num = num - #{num} WHERE product_id = #{productId} AND num >= #{num}
</update>5. 运行流程
- 订单服务调用
createOrder,@GlobalTransactional触发TM向TC申请XID; - Feign调用库存服务时,XID通过
ThreadLocal传递到库存服务; - 库存服务执行
reduceStock,Seata拦截SQL,生成undo_log,提交本地事务,并向TC注册分支事务; - 若所有分支执行成功,TM向TC发起全局提交,TC通知所有RM删除
undo_log; - 若任一分支失败(如库存不足抛出异常),TM向TC发起全局回滚,TC通知所有RM执行
undo_log恢复数据。
二、TCC模式(手动补偿,灵活)
TCC模式需要手动实现Try/Confirm/Cancel三个方法,适用于非关系型数据库或第三方服务(如支付接口)。
1. 依赖配置
同AT模式,但无需数据源代理(TCC不依赖undo_log)。
2. 核心代码
以库存服务为例,实现“冻结库存→确认扣减→取消冻结”的TCC逻辑:
(1)TCC接口定义
需用@TccAction注解标记Try方法,并指定Confirm和Cancel方法:
java
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TccAction;
// LocalTCC注解:标记这是一个TCC接口(Seata 1.5+推荐)
@LocalTCC
public interface StockTccService {
/**
* Try阶段:冻结库存(预留资源)
* @param productId 商品ID
* @param num 冻结数量
* @param requestId 幂等唯一标识(如订单ID)
* @return 是否成功
*/
@TccAction(name = "freezeStock", commitMethod = "confirmFreeze", rollbackMethod = "cancelFreeze")
boolean freezeStock(
BusinessActionContext context, // Seata自动传递的上下文(包含XID、BranchID)
@BusinessActionContextParameter(paramName = "productId") Long productId, // 传递到Confirm/Cancel的参数
@BusinessActionContextParameter(paramName = "num") Integer num,
@BusinessActionContextParameter(paramName = "requestId") String requestId
);
/**
* Confirm阶段:确认扣减库存(Try成功后执行)
* @param context 上下文
* @return 是否成功
*/
boolean confirmFreeze(BusinessActionContext context);
/**
* Cancel阶段:取消冻结库存(Try失败后执行)
* @param context 上下文
* @return 是否成功
*/
boolean cancelFreeze(BusinessActionContext context);
}(2)TCC实现类
需处理幂等性(避免重复执行)和空回滚(Try未执行时Cancel不处理):
java
import io.seata.rm.tcc.api.BusinessActionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class StockTccServiceImpl implements StockTccService {
@Autowired
private StockMapper stockMapper;
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final String LOCK_KEY_PREFIX = "tcc:freeze:"; // Redis幂等键前缀
@Override
public boolean freezeStock(BusinessActionContext context, Long productId, Integer num, String requestId) {
// 1. 幂等检查:避免重复冻结(如Try重试)
String lockKey = LOCK_KEY_PREFIX + requestId;
if (redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.MINUTES)) {
// 2. 冻结库存(将可用库存转为冻结库存)
int rows = stockMapper.freezeStock(productId, num);
if (rows > 0) {
return true;
} else {
// 冻结失败,删除幂等键
redisTemplate.delete(lockKey);
return false;
}
}
// 已处理过,直接返回成功
return true;
}
@Override
public boolean confirmFreeze(BusinessActionContext context) {
// 1. 从上下文获取参数
Long productId = Long.parseLong(context.getActionContext("productId").toString());
Integer num = Integer.parseInt(context.getActionContext("num").toString());
String requestId = context.getActionContext("requestId").toString();
// 2. 确认扣减:将冻结库存转为实际扣减(可用库存=可用-冻结,冻结=0)
stockMapper.confirmFreeze(productId, num);
// 3. 删除幂等键
redisTemplate.delete(LOCK_KEY_PREFIX + requestId);
return true;
}
@Override
public boolean cancelFreeze(BusinessActionContext context) {
// 1. 从上下文获取参数
Long productId = Long.parseLong(context.getActionContext("productId").toString());
Integer num = Integer.parseInt(context.getActionContext("num").toString());
String requestId = context.getActionContext("requestId").toString();
// 2. 检查Try是否执行(幂等键是否存在)
if (redisTemplate.hasKey(LOCK_KEY_PREFIX + requestId)) {
// 3. 取消冻结:将冻结库存转回可用库存
stockMapper.cancelFreeze(productId, num);
// 4. 删除幂等键
redisTemplate.delete(LOCK_KEY_PREFIX + requestId);
}
// Try未执行,直接返回成功(避免空回滚)
return true;
}
}(3)全局事务发起方(订单服务)
调用TCC的Try方法,@GlobalTransactional会自动触发Confirm/Cancel:
java
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private StockTccService stockTccService; // 注入TCC服务
@Autowired
private AccountTccService accountTccService;
@GlobalTransactional(name = "createOrderTccTx", rollbackFor = Exception.class)
public void createOrder(Order order) {
// 1. 插入订单(本地事务)
orderMapper.insert(order);
// 2. 调用库存TCC的Try方法(冻结库存)
boolean stockResult = stockTccService.freezeStock(
null, // Seata自动填充上下文
order.getProductId(),
order.getNum(),
order.getId().toString() // requestId用订单ID(幂等)
);
if (!stockResult) {
throw new RuntimeException("库存冻结失败");
}
// 3. 调用账户TCC的Try方法(冻结余额)
boolean accountResult = accountTccService.freezeBalance(
null,
order.getUserId(),
order.getAmount(),
order.getId().toString()
);
if (!accountResult) {
throw new RuntimeException("余额冻结失败");
}
}
}3. 运行流程
- 订单服务调用
freezeStock(Try阶段),冻结库存并记录幂等键; - 若所有Try成功,TM向TC发起全局提交,TC通知所有RM执行
confirmFreeze(确认扣减); - 若任一Try失败(如库存不足),TM向TC发起全局回滚,TC通知所有RM执行
cancelFreeze(取消冻结); - 幂等键确保Try/Confirm/Cancel不会重复执行。
三、SAGA模式(长事务,最终一致)
SAGA模式适用于长链路、异步的事务(如电商“下单→扣库存→支付→发货→通知”),核心是正向流程+补偿流程。
Seata的SAGA模式支持两种实现方式:
- 注解式(简单场景);
- 状态机式(复杂场景,推荐)。
1. 依赖配置
需额外引入SAGA依赖:
xml
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-saga-processors</artifactId>
<version>1.8.0</version>
</dependency>2. 状态机式实现(推荐)
SAGA状态机用JSON配置定义正向流程和补偿流程,Seata根据状态机自动执行。
(1)状态机配置文件(saga_create_order.json)
放在src/main/resources/saga目录下,定义“下单→扣库存→支付”的流程:
json
{
"name": "createOrderSaga",
"comment": "创建订单的SAGA流程",
"startState": "createOrder",
"states": {
// 1. 正向步骤1:创建订单
"createOrder": {
"type": "ServiceTask",
"serviceName": "orderService",
"serviceMethod": "createOrder",
"compensateState": "cancelOrder", // 补偿步骤:取消订单
"nextState": "reduceStock"
},
// 2. 正向步骤2:扣减库存
"reduceStock": {
"type": "ServiceTask",
"serviceName": "stockService",
"serviceMethod": "reduceStock",
"compensateState": "restoreStock", // 补偿步骤:恢复库存
"nextState": "reduceBalance"
},
// 3. 正向步骤3:扣减余额
"reduceBalance": {
"type": "ServiceTask",
"serviceName": "accountService",
"serviceMethod": "reduceBalance",
"compensateState": "restoreBalance", // 补偿步骤:恢复余额
"endState": true
},
// 补偿步骤1:取消订单
"cancelOrder": {
"type": "ServiceTask",
"serviceName": "orderService",
"serviceMethod": "cancelOrder"
},
// 补偿步骤2:恢复库存
"restoreStock": {
"type": "ServiceTask",
"serviceName": "stockService",
"serviceMethod": "restoreStock"
},
// 补偿步骤3:恢复余额
"restoreBalance": {
"type": "ServiceTask",
"serviceName": "accountService",
"serviceMethod": "restoreBalance"
}
}
}(2)业务服务实现
每个正向步骤和补偿步骤对应一个业务方法:
java
// 订单服务
@Service("orderService")
public class OrderService {
@Autowired
private OrderMapper orderMapper;
// 正向:创建订单
public void createOrder(Map<String, Object> param) {
Long productId = Long.parseLong(param.get("productId").toString());
Integer num = Integer.parseInt(param.get("num").toString());
Long userId = Long.parseLong(param.get("userId").toString());
BigDecimal amount = new BigDecimal(param.get("amount").toString());
Order order = new Order();
order.setProductId(productId);
order.setNum(num);
order.setUserId(userId);
order.setAmount(amount);
order.setStatus(1); // 1: 未支付
orderMapper.insert(order);
// 将订单ID存入上下文,供补偿流程使用
param.put("orderId", order.getId());
}
// 补偿:取消订单
public void cancelOrder(Map<String, Object> param) {
Long orderId = Long.parseLong(param.get("orderId").toString());
orderMapper.updateStatus(orderId, 0); // 0: 已取消
}
}
// 库存服务
@Service("stockService")
public class StockService {
@Autowired
private StockMapper stockMapper;
// 正向:扣减库存
public void reduceStock(Map<String, Object> param) {
Long productId = Long.parseLong(param.get("productId").toString());
Integer num = Integer.parseInt(param.get("num").toString());
stockMapper.reduceStock(productId, num);
}
// 补偿:恢复库存
public void restoreStock(Map<String, Object> param) {
Long productId = Long.parseLong(param.get("productId").toString());
Integer num = Integer.parseInt(param.get("num").toString());
stockMapper.restoreStock(productId, num);
}
}(3)触发SAGA流程
通过SagaEngine触发状态机:
java
import io.seata.saga.engine.SagaEngine;
import io.seata.saga.engine.context.SagaContext;
import io.seata.saga.engine.context.impl.JsonSagaContext;
import io.seata.saga.processor.DefaultSagaProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@Service
public class SagaTriggerService {
@Autowired
private SagaEngine sagaEngine;
@Autowired
private DefaultSagaProcessor sagaProcessor;
public void triggerCreateOrderSaga(Order order) {
// 1. 构造流程参数
Map<String, Object> param = new HashMap<>();
param.put("productId", order.getProductId());
param.put("num", order.getNum());
param.put("userId", order.getUserId());
param.put("amount", order.getAmount());
// 2. 初始化SAGA上下文
SagaContext context = new JsonSagaContext();
context.setBizId(order.getId().toString()); // 业务唯一标识(如订单ID)
// 3. 触发SAGA流程(状态机文件路径:saga/createOrderSaga.json)
sagaEngine.start("saga/createOrderSaga.json", param, context);
}
}3. 运行流程
- 调用
triggerCreateOrderSaga,SagaEngine加载状态机配置; - 按顺序执行正向流程:
createOrder→reduceStock→reduceBalance; - 若所有正向步骤成功,流程结束;
- 若某一步失败(如
reduceBalance时余额不足),SagaEngine反向执行补偿流程:restoreBalance→restoreStock→cancelOrder; - 补偿流程确保所有资源恢复到事务前状态,最终一致。
四、XA模式(强一致,依赖数据库)
XA模式是传统2PC的实现,依赖数据库的XA协议(如MySQL的XA START/XA END),适用于强一致场景(如金融核心系统)。
1. 依赖配置
需引入XA数据源依赖(以Druid为例):
xml
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.2.20</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.20</version>
</dependency>2. XA数据源配置
需用DruidXADataSource替换原数据源(XA模式必须):
java
import com.alibaba.druid.pool.xa.DruidXADataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
import javax.sql.XADataSource;
@Configuration
public class XADataSourceConfig {
// 1. 配置XA数据源(DruidXADataSource)
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public XADataSource xaDataSource() {
return new DruidXADataSource();
}
// 2. 用Seata代理XA数据源
@Bean
public DataSourceProxy dataSourceProxy(XADataSource xaDataSource) {
return new DataSourceProxy(xaDataSource);
}
// 3. 配置MyBatis的SqlSessionFactory
@Bean
public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean factory = new SqlSessionFactoryBean();
factory.setDataSource(dataSourceProxy);
factory.setMapperLocations(new PathMatchingResourcePatternResolver()
.getResources("classpath:mapper/*.xml"));
return factory.getObject();
}
}3. 业务代码
与AT模式类似,用@GlobalTransactional标记全局事务,Seata自动处理XA两阶段提交:
java
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private StockFeignClient stockFeignClient;
@Autowired
private AccountFeignClient accountFeignClient;
@GlobalTransactional(name = "createOrderXaTx", rollbackFor = Exception.class)
public void createOrder(Order order) {
// 1. 插入订单(XA本地事务,未提交)
orderMapper.insert(order);
// 2. 调用库存服务扣减库存(XA分支事务,未提交)
stockFeignClient.reduceStock(order.getProductId(), order.getNum());
// 3. 调用账户服务扣减余额(XA分支事务,未提交)
accountFeignClient.reduceBalance(order.getUserId(), order.getAmount());
}
}4. 运行流程
- 订单服务调用
createOrder,TM向TC申请XID; - 每个分支服务执行SQL时,Seata通过XA数据源发起
XA START,执行SQL后XA END(但未提交); - 所有分支执行完成后,TM向TC发起全局提交,TC通知所有RM执行
XA COMMIT; - 若任一分支失败,TM向TC发起全局回滚,TC通知所有RM执行
XA ROLLBACK; - XA模式的核心是数据库层面的两阶段提交,Seata仅负责协调。
总结:四种模式的代码对比
| 模式 | 核心代码特征 | 适用场景 |
|---|---|---|
| AT | @GlobalTransactional+数据源代理 | 关系型数据库、低侵入 |
| TCC | 手动实现Try/Confirm/Cancel+@LocalTCC | 非关系型数据库、第三方服务 |
| SAGA | 状态机配置+正向/补偿方法 | 长链路、异步、最终一致 |
| XA | XA数据源配置+@GlobalTransactional | 强一致、金融核心系统 |
注意:所有模式都需确保XID传递(Seata Starter自动处理Feign/RestTemplate),生产环境需配置高可用TC集群(Nacos注册中心)。
