分布式事务解决方案

大多数场景下应用都只需要操作单一数据库,该情况下的事务称之为本地事务(Local Transaction)。本地事务的ACID特性是数据库直接提供支持。在JDBC编程中通过java.sql.Connection对象来开启关闭提交事务。

1
2
3
4
5
6
7
8
9
10
Connection conn = ... ; 	//获取数据库连接
conn.setAutoCommit(false);  //开启事务
try {
// 执行增删改查sql
conn.commit();  // 提交事务
} catch (Exception e) {
conn.rollback(); // 事务回滚
} finally{
conn.close(); // 关闭链接
}

绝大部分公司都进行了数据库拆分服务化,完成某一个业务功能可能需要横跨多个服务,操作多个数据库,需要操作的资源位于多个资源服务器上分布式事务就是为了保证不同资源服务器数据一致性。典型的分布式事务场景:垮库事务分库分表服务化

DTP模型

构成DTP模型的5个基本元素:

  • AP应用程序Application Program:用于定义事务边界即定义事务的开始和结束,并在事务边界内对资源进行操作
  • RM资源管理器Resource Manager:如数据库文件系统等,并提供访问资源的方式
  • TM事务管理器Transaction Manager:负责分配事务唯一标识,监控事务的执行进度,并负责事务的提交、回滚等
  • CRM通信资源管理器Communication Resource Manager:控制一个TM域内或者跨TM域的分布式应用之间的通信
  • CP通信协议Communication Protocol:提供CRM提供的分布式应用节点之间的底层通信服务

XA规范

在DTP本地模型实例中,由AP应用程序RMs资源管理器TM事务管理器组成,不需要其他元素,AP、RM和TM之间彼此都需要进行交互。XA规范主要作用是定义了RM-TM的交互接口,还对两阶段提交协议进行了优化

两阶段协议是在OSI TP标准中提出的,在DTP参考模型中,指定了全局事务的提交要使用两阶段提交协议;而XA规范只是定义了两阶段提交协议中需要使用到的接口,也就是上述提到的RM-TM交互的接口

两阶段提交协议

两阶段提交协议不是在XA规范中提出,但XA规范对其进行了优化,将提交过程划分为两个阶段

第一阶段TM通知各个RM准备提交它们的事务分支;若RM判断自己进行的工作可以被提交,则对工作内容进行持久化,再给TM肯定答复;若发生了其他情况则给TM的都是否定答复。在发送了否定答复并回滚了工作后,RM就可以丢弃该事务分支信息。

第二阶段TM根据第一阶段各个RM prepare的结果,决定是提交还是回滚事务;若所有RMprepare成功,则TM通知所有RM进行提交;若RM prepare失败则TM通知所有RM回滚事务分支

二阶段提交看起来确实能够提供原子性的操作,但二阶段提交还是有几个缺点:

同步阻塞问题:两阶段提交方案下全局事务的ACID特性是依赖于RM的,一个全局事务内部包含了多个独立的事务分支,这一组事务分支要不都成功要不都失败,各个事务分支的ACID特性共同构成了全局事务的ACID特性可重复读隔离级别不足以保证分布式事务一致性,若使用MySQL来支持XA分布式事务最好将事务隔离级别设置为SERIALIZABLE,而SERIALIZABLE是四个事务隔离级别中最高且执行效率最低一个级别。

单点故障:由于协调者的重要性,一旦协调者TM发生故障,参与者RM会一直阻塞下去。尤其在第二阶段,协调者发生故障,所有参与者还都处于锁定事务资源状态中,而无法继续完成事务操作。若协调者挂掉,可重新选举一个协调者,但无法解决因为协调者宕机导致的参与者处于阻塞状态的问题

数据不一致:在第二阶段中当协调者参与者发送commit请求之后,发生了局部网络异常或在发送commit请求过程中协调者发生故障,会导致只有一部分参与者接受到了commit请求,而在这部分参与者接到commit请求之后就会执行commit操作,但是其他部分未接到commit请求的机器则无法执行事务提交。于是整个分布式系统便出现了数据不一致性的现象。


常见分布式事务解决方案

JTA/XA规范实现

针对实现了JDBC规范中规定的实现XADataSource接口的数据库连接池,典型的XADataSource实现包括:

  • MySQL官方提供的com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
  • 阿里巴巴开源的druid连接池,对应的实现类为com.alibaba.druid.pool.xa.DruidXADataSource
  • tomcat-jdbc连接池提供的org.apache.tomcat.jdbc.pool.XADataSource
1
2
3
4
5
6
<!-- MySQL JDBC实现了XA规范 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.39</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// true表示打印XA语句,,用于调试
boolean logXaCommands = true;
// 获得资源管理器操作接口实例 RM1
Connection conn1 = DriverManager.getConnection("jdbc:mysql://localhost:3306/db_user", "root", "root");
XAConnection xaConn1 = new MysqlXAConnection((com.mysql.jdbc.Connection) conn1, logXaCommands);
XAResource rm1 = xaConn1.getXAResource();
// 获得资源管理器操作接口实例 RM2
Connection conn2 = DriverManager.getConnection("jdbc:mysql://localhost:3306/db_account", "root", "root");
XAConnection xaConn2 = new MysqlXAConnection((com.mysql.jdbc.Connection) conn2, logXaCommands);
XAResource rm2 = xaConn2.getXAResource();
// AP请求TM执行一个分布式事务,TM生成全局事务id
byte[] gtrid = "g12345".getBytes();
int formatId = 1;
try {
// ==============分别执行RM1和RM2上的事务分支====================
// TM生成rm1上的事务分支id
byte[] bqual1 = "b00001".getBytes();
Xid xid1 = new MysqlXid(gtrid, bqual1, formatId);
// 执行rm1上的事务分支
rm1.start(xid1, XAResource.TMNOFLAGS);// One of TMNOFLAGS, TMJOIN, or TMRESUME.
PreparedStatement ps1 = conn1.prepareStatement("INSERT into user(name) VALUES ('Eleven')");
ps1.execute();
rm1.end(xid1, XAResource.TMSUCCESS);
// TM生成rm2上的事务分支id
byte[] bqual2 = "b00002".getBytes();
Xid xid2 = new MysqlXid(gtrid, bqual2, formatId);
// 执行rm2上的事务分支
rm2.start(xid2, XAResource.TMNOFLAGS);
PreparedStatement ps2 = conn2.prepareStatement("INSERT into account(user_id, money) VALUES (1, 10000000)");
ps2.execute();
rm2.end(xid2, XAResource.TMSUCCESS);
// ===================两阶段提交================================
// phase1:询问所有的RM 准备提交事务分支
int rm1_prepare = rm1.prepare(xid1);
int rm2_prepare = rm2.prepare(xid2);
// phase2:提交所有事务分支
boolean onePhase = false;
//TM判断有2个事务分支,所以不能优化为一阶段提交
if (rm1_prepare == XAResource.XA_OK && rm2_prepare == XAResource.XA_OK) {
// 所有事务分支都prepare成功,提交所有事务分支
rm1.commit(xid1, onePhase);
rm2.commit(xid2, onePhase);
} else {// 如果有事务分支没有成功,则回滚
rm1.rollback(xid1);
rm2.rollback(xid2);
}
} catch (XAException e) { // 如果出现异常,也要进行回滚
e.printStackTrace();
}

开源框架AtomikosTransactionEssentials开源的免费产品,ExtremeTransactions上商业版需要收费。

TransactionEssentials实现了JTA/XA规范中的事务管理器应该实现的相关接口,如UserTransaction实现了com.atomikos.icatch.jta.UserTransactionImp,用户只需要直接操作该类,TransactionManager实现了com.atomikos.icatch.jta.UserTransactionManagerTransaction实现了com.atomikos.icatch.jta.TransactionImp。

1
2
3
4
5
6
7
8
9
10
11
12
<!-- JTA规范扩展包 -->
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>jta</artifactId>
<version>1.1</version>
</dependency>
<!-- atomikos JTA/XA全局事务 -->
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>transactions-jdbc</artifactId>
<version>4.0.6</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
private static AtomikosDataSourceBean createAtomikosDataSourceBean(String dbName) {
// 连接池基本属性
Properties p = new Properties();
p.setProperty("url", "jdbc:mysql://localhost:3306/" + dbName);
p.setProperty("user", "root");
p.setProperty("password", "root");
// 使用AtomikosDataSourceBean封装com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
// 设置resourceName 唯一
ds.setUniqueResourceName(dbName);
ds.setXaDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource");
ds.setXaProperties(p);
return ds;
}
public static void main(String[] args) {
AtomikosDataSourceBean ds1 = createAtomikosDataSourceBean("db_user");
AtomikosDataSourceBean ds2 = createAtomikosDataSourceBean("db_account");
Connection conn1 = null;
Connection conn2 = null;
PreparedStatement ps1 = null;
PreparedStatement ps2 = null;
UserTransaction userTransaction = new UserTransactionImp();
try {
// 开启事务
userTransaction.begin();
// 执行db1上的sql
conn1 = ds1.getConnection();
ps1 = conn1.prepareStatement("INSERT into user(name) VALUES (?)", Statement.RETURN_GENERATED_KEYS);
ps1.setString(1, "Eleven");
ps1.executeUpdate();
ResultSet generatedKeys = ps1.getGeneratedKeys();
int userId = -1;
while (generatedKeys.next()) {
// 获得自动生成的userId
userId = generatedKeys.getInt(1);
}
// 模拟异常 ,直接进入catch代码块,2个都不会提交
// int i=1/0;
// 执行db2上的sql
conn2 = ds2.getConnection();
ps2 = conn2.prepareStatement("INSERT into account(user_id,money) VALUES (?,?)");
ps2.setInt(1, userId);
ps2.setDouble(2, 10000000);
ps2.executeUpdate();
// 两阶段提交
userTransaction.commit();
} catch (Exception e) {
try {
e.printStackTrace();
userTransaction.rollback();
} catch (SystemException e1) {
e1.printStackTrace();
}
} finally {
try {
ps1.close();
ps2.close();
conn1.close();
conn2.close();
ds1.close();
ds2.close();
} catch (Exception ignore) {
}
}
}

Seata AT模式

Seata相比与其它分布式事务框架有以下几个优势:

  • 应用层基于SQL解析实现了自动补偿,从而最大程度的降低业务侵入性
  • 将分布式事务中TC事务协调者独立部署,负责事务的注册、回滚
  • 通过全局锁实现了写隔离读隔离

Seata提供了ATTCCSAGAXA事务模式,AT模式是Seata首推模式,Seata有TC事务协调者TM事务管理器RM资源管理器三大角色,TC为单独部署的Server服务端,TMRM为嵌入到应用中的Client客户端

  • TC事务协调者:维护全局和分支事务的状态驱动全局事务提交或回滚
  • TM事务管理器定义全局事务的范围开始全局事务、提交或回滚全局事务
  • RM资源管理器:管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务状态,并驱动分支事务提交或回滚

Seata分布式事务生命周期

TM请求TC开启一个全局事务,TC会生成一个XID作为该全局事务的编号,XID会在微服务的调用链路中传播,保证将多个微服务的子事务关联在一起;RM请求TC将本地事务注册为全局事务的分支事务通过全局事务的XID进行关联;TM请求TC告诉XID对应的全局事务是进行提交还是回滚;TC驱动RM们将XID对应的自己的本地事务进行提交还是回滚。

AT模式的核心是对业务无侵入,是一种改进后的两阶段提交,前提是基于支持本地ACID事务的关系型数据库Java应用通过JDBC访问数据库。通过两阶段提交:

  • 一阶段业务数据回滚日志记录同一个本地事务中提交,释放本地锁和连接资源,本地事务提交前,需确保先拿到全局锁,否则不能提交本地事务,且拿全局锁的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务释放本地锁
  • 二阶段提交异步化非常快速地完成,回滚通过一阶段的回滚日志进行反向补偿

第一阶段

一阶段解析SQL类型是更新删除还是新增、表名条件等相关信息,根据解析得到的条件信息,生成查询语句定位数据生成前置镜像,然后执行业务SQL,根据前镜像的结果,通过主键定位数据得到后置镜像,把前置镜像后置镜像数据以及业务SQL相关的信息组成一条回滚日志记录,插入到UNDO_LOG表中。提交前向TC注册分支申请目标表中,对应主键值的记录的全局锁,业务数据的更新和前面步骤中生成的UNDO LOG一并提交,将本地事务提交的结果上报给TC。

第二阶段回滚

二阶段回滚,收到TC分支回滚请求,开启一个本地事务,通过XID和Branch ID查找到相应的UNDO LOG记录,拿 UNDO LOG中的后镜与当前数据进行比较,若有不同说明数据被当前全局事务之外的动作做了修改。该情况需要根据配置策略来做处理,根据UNDO LOG中前镜像和业务SQL相关信息生成并执行回滚的语句,提交本地事务并把本地事务的执行结果即分支事务回滚结果上报给TC事务协调者

第二阶段提交

二阶段提交,即分布式事务操作成功TC通知RM异步删除undolog,收到TC分支提交请求,把请求放入一个异步任务队列中,马上返回提交成功结果给TC,异步任务阶段的分支提交请求将异步和批量地删除相应UNDO LOG记录

Seata AT模式存在的问题:

  • 性能损耗:一条Update的SQL需要与TC通讯获取全局事务xid、before image解析SQL查询一次数据库、after image查询一次数据库、insert undo log写一次数据库、before commit与TC通讯判断锁冲突,这些操作都需要同步远程通讯RPC,且undo log写入时blob字段插入性能也不高。每条写SQL都会增加这么多开销,粗略估计会增加5倍响应时间
  • 性价比:为了进行自动补偿,需要对所有交易生成前后镜像并持久化,在实际业务场景下分布式事务失败需要回滚的有多少比率,按照二八原则预估,为了20%的交易回滚,需要将80%的成功交易的响应时间增加5倍,这样的代价相比于让应用开发一个补偿交易是否是值得
  • 全局锁:相比XA,Seata虽然在一阶段成功后会释放数据库锁,但一阶段在commit前全局锁的判定也拉长了对数据锁的占有时间,这个开销比XA的prepare低多少需要根据实际业务场景进行测试。全局锁的引入实现了隔离性,但带来的问题就是阻塞,降低并发性,尤其是热点数据,这个问题会更加严重。
  • 回滚锁释放时间:Seata在回滚时,需要先删除各节点的undo log,然后才能释放TC内存中的锁,所以若第二阶段是回滚,释放锁的时间会更长
  • 死锁问题:Seata的引入全局锁会额外增加死锁风险,若出现死锁会不断进行重试,最后靠等待全局锁超时,这种方式并不优雅,也延长了对数据库锁的占有时间

柔性事务TCC

TCC是比较常用的一种柔性事务方案。开源的TCC框架:Tcc-TransactionHmilyByteTCCEasyTransactionSeata TCC

TCC两阶段提交与XA两阶段提交的区别是:XA是资源层面的分布式事务,强一致性,在两阶段提交的整个过程中,一直会持有资源的锁,TCC是业务层面的分布式事务,最终一致性,不会一直持有资源的锁

TCC事务的优点有效了的避免了XA两阶段提交占用资源锁时间过长导致的性能底下的问题。相对于AT模式,TCC模式对业务代码有一定的侵入性,但TCC模式无AT模式的全局行锁,TCC性能会比AT模式高很多。

TCC事务的缺点是主业务服务和从业务服务都需要进行改造,从业务方改造成本更高。原来只需要提供一个接口,现在需要改造成tryconfirmcanel三个接口开发成本高。

空回滚

在没有调用TCC资源Try方法的情况下,调用了二阶段的Cancel方法,Cancel方法需要识别出这是一个空回滚然后直接返回成功,空回滚出现的原因是Try超时丢包分布式事务回滚触发Cancel,出现未收到Try,收到Cancel的情况

悬挂

悬挂CancelTry先执行,要运行空回滚,但要拒绝空回滚之后的Try操作,悬挂出现的原因是Try超时拥堵分布式事务回滚触发Cancel,之后拥堵的Try到达

幂等控制

Try,Confirm,Cancel都需要保证幂等性,因为网络抖动拥堵可能会超时,事务管理器会对资源进行重试操作,所以很可能一个业务操作会被重复调用,为了不因为重复调用而多次占用资源,需要对服务设计时进行幂等控制,通常可用事务xid业务主键判重来控制。

TCC设计注意事项

以扣钱场景为例,场景为A转账30元给B,A和B账户在不同的服务。在微服务架构下,很有可能出现网络超时重发机器宕机等一系列的异常,出现空回滚幂等悬挂的问题。

对于以下示例,都是先执行账户A的try方法,从而执行账户B的try方法,若成功则执行账户A的confirm方法然后执行账户B的confirm方法,若失败则执行账户A的concel方法,然后再执行账户B的cancel方法

方案A

方案C优于方案B优于方案A

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 账户A
try:
检查余额是否够30元     
扣减30元
confirm:

cancel:
增加30元
# 账户B
try:
增加30元       
confirm:
空       
cancel:
减少30元

方案B

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 账户A
try:
检查余额是否够30元
扣减30元
confirm:
空       
cancel:
增加30元
# 账户B
try:
空       
confirm:
增加30元       
cancel:

方案C

需要创建local_transaction_log日志表用于幂等性空回滚try悬挂处理时校验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 账户A
try:
try幂等校验     
try悬挂处理     
检查余额是否够30元     
扣减30元       
confirm:
空       
cancel:
cancel幂等校验
cancel空回滚处理
增加可用余额30元
# 账户B
try:

confirm:
confirm幂等校验     
正式增加30元
cancel:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@Service
@Slf4j
public class AccountServiceImpl implements AccountService {
@Autowired
AccountMapper accountMapper;
@Autowired
Bank2FeignClient bank2FeignClient;
// try方法执行逻辑:try幂等校验,try悬挂处理,检查余额是否足够扣减,扣减金额
// 只要标记@Hmily就是try方法,在注解中指定confirm、cancel两个方法的名字
@Transactional(timeout = 60)
@Hmily(confirmMethod = "commit", cancelMethod = "rollback")
@Override
public void transfer(String fromAccountNo, String toAccountNo, Double amount) {
// 获取全局事务id
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("bank1 try begin 开始执行...xid:{}", transId);
// 幂等判断 判断local_transaction_log表中是否有try日志记录,如果有则不再执行
if (accountMapper.isExistTransactionLogByType(transId, TransactionEnum.TRY.getValue()) > 0) {
log.info("bank1 try 已经执行,无需重复执行,xid:{}", transId);
return;
}
// try悬挂处理,如果cancel、confirm有一个已经执行了,try不再执行
if (accountMapper.isExistTransactionLogByType(transId, TransactionEnum.CONFIRM.getValue()) > 0
|| accountMapper.isExistTransactionLogByType(transId, TransactionEnum.CANCEL.getValue()) > 0) {
log.info("bank1 try悬挂处理 cancel或confirm已经执行,不允许执行try,xid:{}", transId);
return;
}
// 扣减金额
if (accountMapper.subtractAccountBalance(fromAccountNo, amount) <= 0) {
// 扣减失败
throw new RuntimeException("bank1 try 扣减金额失败,xid:" + transId);
}
// 插入try执行记录,用于幂等判断
accountMapper.addTransactionLog(transId, TransactionEnum.TRY.getValue());
// 转账,远程调用bank2
if (!bank2FeignClient.transferTo(toAccountNo, amount)) {
throw new RuntimeException("bank1 远程调用bank2微服务失败,xid:" + transId);
}
log.info("bank2 request end 结束执行...xid:{}", transId);
if (amount == 20) {
throw new RuntimeException("人为制造异常,xid:" + transId);
}
log.info("bank1 try end 结束执行...xid:{}", transId);
}
@Transactional
public void commit(String fromAccountNo, String toAccountNo, Double amount) {
// 获取全局事务id
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("bank1 confirm begin 开始执行...xid:{},accountNo:{},amount:{}", transId, fromAccountNo, amount);
}
// cancel方法执行逻辑: 1.cancel幂等校验 2.cancel空回滚处理 3.增加可用余额
@Transactional(timeout = 60)
public void rollback(String fromAccountNo, String toAccountNo, Double amount) {
// 获取全局事务id
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("bank1 cancel begin 开始执行...xid:{}", transId);
// cancel幂等校验
if (accountMapper.isExistTransactionLogByType(transId, TransactionEnum.CANCEL.getValue()) > 0) {
log.info("bank1 cancel 已经执行,无需重复执行,xid:{}", transId);
return;
}
// cancel空回滚处理,如果try没有执行,cancel不允许执行
if (accountMapper.isExistTransactionLogByType(transId, TransactionEnum.TRY.getValue()) <= 0) {
log.info("bank1 空回滚处理,try没有执行,不允许cancel执行,xid:{}", transId);
return;
}
// 增加可用余额
accountMapper.addAccountBalance(fromAccountNo, amount);
//插入一条cancel的执行记录
accountMapper.addTransactionLog(transId, TransactionEnum.CANCEL.getValue());
log.info("bank1 cancel end 结束执行...xid:{}", transId);
}
}

上面的设计并不能进行并发控制,即隔离性的保证,对业务模型进行优化,在业务模型中增加冻结金额字段,用来表示账户有多少金额处以冻结状态

对于用户下单场景,整个业务逻辑由仓储服务、订单服务、帐户服务三个微服务构成,分别完成对给定的商品扣除库存数量、根据采购需求创建订单、从用户帐户中扣除余额。


柔性事务:可靠消息最终一致性方案实现

本地消息表方案

本地消息表这个方案最初是eBay提出的,此方案的核心是通过本地事务保证数据业务操作和消息的一致性,然后通过定时任务将消息发送至消息中间件,待确认消息发送给消费方成功再将消息删除。

Rocketmq事务消息实现


柔性事务:最大努力通知

最大努力通知型是最简单的一种柔性事务,是分布式事务中对一致性要求最低的一种,适用于一些最终一致性时间敏感度低的业务,且被动方处理结果不影响主动方的处理结果,典型的使用场景:银行通知商户通知等。最大努力通知型的实现方案,一般符合以下特点,且需要实现消息重复通知机制息校对机制

  • 不可靠消息:业务活动主动方,在完成业务处理之后,向业务活动的被动方发送消息,直到通知N次后不再通知,允许消息丢失
  • 定期校对:业务活动的被动方,根据定时策略,向业务活动主动方查询,主动方提供查询接口,恢复丢失的业务消息