Seata分布式事务原理

以下是分布式事务的核心逻辑,确切地说是Seata中TM事务管理器的核心逻辑,跟本地事务的核心逻辑很类似。Seata的AT模式也是基于本地事务的,对本地事务的处理都是通过代理对象DataSourceProxy来完成的,DataSourceProxy是RM资源管理器的具体实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
 * Client-side (TM) template that wraps a business callback in a global transaction.
 */
public class TransactionalTemplate {
/**
 * Runs {@code business} according to the propagation behavior declared in its
 * {@code TransactionInfo}: resolves the current global transaction, optionally suspends it,
 * begins a transaction, executes the business logic, then commits or handles the failure.
 *
 * @param business callback supplying the transaction info and the business logic
 * @return whatever {@code business.execute()} returns
 * @throws Throwable any error from the business logic or from transaction handling
 */
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. Get transactionInfo
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// Fetch the current global transaction; when one exists it is returned with role GlobalTransactionRole.Participant
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
Propagation propagation = txInfo.getPropagation(); // propagation behavior requested for this transaction
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
switch (propagation) { // handle each propagation behavior
case NOT_SUPPORTED: // if a transaction exists, suspend it
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
}
return business.execute(); // run the business method outside any transaction
case REQUIRES_NEW: // if a transaction exists, suspend it and start a new one
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
tx = GlobalTransactionContext.createNew();
}
break; // continue with the new transaction
case SUPPORTS: // if no transaction exists, execute without one
if (notExistingTransaction(tx)) {
return business.execute();
}
break; // a transaction exists: continue inside it
case REQUIRED: // use the current transaction if present, otherwise a new one is created below
break;
case NEVER: // an existing transaction is an error
if (existingTransaction(tx)) {
throw new TransactionException(String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s", tx.getXid()));
} else { // no transaction: run the business method directly
return business.execute();
}
case MANDATORY: // a missing transaction is an error
if (notExistingTransaction(tx)) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
break; // continue inside the current transaction
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
if (tx == null) { // no current global transaction: create a new one (role GlobalTransactionRole.Launcher)
tx = GlobalTransactionContext.createNew();
}
// set current tx config to holder
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {// only a Launcher-role tx actually sends the begin request to the TC; other roles do nothing here
beginTransaction(txInfo, tx); // begin the global transaction
Object rs;
try {// Do Your Business
rs = business.execute(); // run the business logic
} catch (Throwable ex) { // business failure: let the transaction react (e.g. roll back), then rethrow
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
commitTransaction(tx); // commit the global transaction
return rs;
} finally { //5. clear
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion();
cleanUp();
}
} finally { // If the transaction is suspended, resume it.
if (suspendedResourcesHolder != null) {
tx.resume(suspendedResourcesHolder);
}
}
}
}

首先获取当前全局事务:通过RootContext获取全局事务的Xid,若Xid不为空则说明是嵌套事务,此时通过Xid创建一个角色为Participant的GlobalTransaction。这个角色非常重要。然后根据事务的传播属性作相应的处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Entry point for obtaining {@code GlobalTransaction} handles on the client side.
 */
public class GlobalTransactionContext {

    /**
     * Returns a handle for the global transaction whose XID is held by {@code RootContext},
     * or {@code null} when no XID is present.
     *
     * @return a Participant-role transaction in status Begin, or {@code null}
     */
    public static GlobalTransaction getCurrent() {
        final String boundXid = RootContext.getXID();
        return boundXid == null
                ? null
                : new DefaultGlobalTransaction(boundXid, GlobalStatus.Begin, GlobalTransactionRole.Participant);
    }

    /**
     * Creates a fresh transaction handle via the no-arg {@code DefaultGlobalTransaction} constructor.
     *
     * @return a new, not yet begun global transaction
     */
    public static GlobalTransaction createNew() {
        return new DefaultGlobalTransaction();
    }
}
/**
 * Creates a transaction handle with no XID, status UnKnown and role Launcher.
 */
DefaultGlobalTransaction() {
this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);
}

开启全局事务

若全局事务GlobalTransaction的角色是GlobalTransactionRole.Launcher,则向TC事务协调者发送开启全局事务请求并获取全局事务Xid;若角色为Participant则说明是嵌套事务或分支事务,不需要开启新的全局事务,故直接返回。若需要开启全局事务,则通过RPC向TC发送开启全局事务请求,获取全局事务Xid,并通过RootContext将其绑定到线程上下文中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class TransactionalTemplate {

    /**
     * Begins the global transaction, firing the before/after-begin hooks around it.
     *
     * @param txInfo supplies the timeout and name passed to {@code tx.begin}
     * @param tx     the transaction to begin
     * @throws TransactionalExecutor.ExecutionException with code BeginFailure when begin fails
     */
    private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx)
            throws TransactionalExecutor.ExecutionException {
        try {
            triggerBeforeBegin();
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
            triggerAfterBegin();
        } catch (TransactionException beginFailure) {
            // Wrap so the caller can tell which phase failed.
            throw new TransactionalExecutor.ExecutionException(
                    tx, beginFailure, TransactionalExecutor.Code.BeginFailure);
        }
    }
}
public class DefaultGlobalTransaction implements GlobalTransaction {
/**
 * Begins the global transaction. Only a Launcher-role handle contacts the transaction
 * manager; any other role just asserts that an XID is already present and returns.
 *
 * @param timeout transaction timeout forwarded to the transaction manager
 * @param name    transaction name forwarded to the transaction manager
 * @throws TransactionException if the transaction manager fails to begin the transaction
 * @throws IllegalStateException if an XID is already bound in {@code RootContext}
 */
public void begin(int timeout, String name) throws TransactionException {
if (role != GlobalTransactionRole.Launcher) {
assertXIDNotNull();
return; // business methods may nest and create several GlobalTransaction objects; this role check keeps non-Launcher handles from beginning a transaction
}
assertXIDNull();
String currentXid = RootContext.getXID();
if (currentXid != null) {
throw new IllegalStateException("Global transaction already exists, can't begin a new global transaction, currentXid = " + currentXid);
}
xid = transactionManager.begin(null, null, name, timeout); // obtain the global transaction id
status = GlobalStatus.Begin; // mark the global transaction as begun
RootContext.bind(xid); // bind the xid through RootContext
}
}
/**
 * TM-side transaction manager that talks to the TC through synchronous RPC.
 */
public class DefaultTransactionManager implements TransactionManager {
/**
 * Sends a {@code GlobalBeginRequest} and returns the XID from the response.
 * Only {@code name} and {@code timeout} are copied onto the request here;
 * {@code applicationId} and {@code transactionServiceGroup} are not used in this method.
 *
 * @return the global transaction XID
 * @throws TransactionException when the response code is Failed or the call times out
 */
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request); // synchronous RPC to the Seata server to obtain the global transaction id
if (response.getResultCode() == ResultCode.Failed) {
throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
}
return response.getXid(); // XID returned by the RPC; format: ipAddress + ":" + port + ":" + tranId
}
/**
 * Sends {@code request} synchronously through {@code TmNettyRemotingClient}.
 *
 * @throws TransactionException (code IO) when the call times out
 */
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
try {
return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
} catch (TimeoutException toe) {
throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
}
}
}

最终RPC调用到事务协调者TC中DefaultCoordinator的doGlobalBegin方法,默认超时时间是60s。通过ipAddress、port、transactionId生成全局唯一的事务Xid,并最终将全局事务信息插入global_table表中,且将全局事务Xid返回给事务管理器TM。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {

    /**
     * Handles a global-begin request: asks the core to begin a transaction and
     * writes the resulting XID into the response.
     *
     * @throws TransactionException if the core fails to begin the transaction
     */
    protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
            throws TransactionException {
        String newXid = core.begin(
                rpcContext.getApplicationId(),
                rpcContext.getTransactionServiceGroup(),
                request.getTransactionName(),
                request.getTimeout());
        response.setXid(newXid);
    }
}
public class DefaultCore implements Core {
/**
 * Creates a {@code GlobalSession}, registers the root session manager as its lifecycle
 * listener, begins the session, posts a {@code GlobalTransactionEvent} and returns the XID.
 *
 * @return the XID of the new global session
 * @throws TransactionException if beginning the session fails
 */
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout); // create the global session
// NOTE(review): per the article, SessionHolder picks the concrete SessionManager from the configured store mode via the @LoadLevel annotation - not visible here
session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
session.begin(); // NOTE(review): per the article, Seata supports file/db/redis stores, chosen by store.mode in the server's file.conf
eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC, session.getTransactionName(), session.getBeginTime(), null, session.getStatus()));
return session.getXid();
}
}
public class GlobalSession implements SessionLifecycle, SessionStorable {

    /**
     * Marks this session as begun and active, records the begin time, then
     * notifies every registered lifecycle listener through {@code onBegin}.
     *
     * @throws TransactionException if a listener fails
     */
    public void begin() throws TransactionException {
        this.status = GlobalStatus.Begin;
        this.beginTime = System.currentTimeMillis();
        this.active = true;
        // Listeners are notified only after the in-memory state has been updated.
        for (SessionLifecycleListener listener : lifecycleListeners) {
            listener.onBegin(this);
        }
    }
}
public class GlobalSession implements SessionLifecycle, SessionStorable {

    /**
     * Factory wrapper around the four-argument constructor.
     *
     * @return a newly constructed global session
     */
    public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName, int timeout) {
        return new GlobalSession(applicationId, txServiceGroup, txName, timeout);
    }

    /**
     * Builds a session with a generated transaction id, status Begin, and an XID
     * derived from that transaction id.
     */
    public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout) {
        // The id is generated first because the XID is derived from it at the end.
        this.transactionId = UUIDGenerator.generateUUID();
        this.status = GlobalStatus.Begin;

        this.applicationId = applicationId;
        this.transactionServiceGroup = transactionServiceGroup;
        this.transactionName = transactionName;
        this.timeout = timeout;

        this.xid = XID.generateXID(this.transactionId);
    }
}
public class XID {

    /**
     * Builds an XID of the form {@code ipAddress:port:tranId}.
     *
     * @param tranId the transaction id to embed
     * @return the colon-separated XID string
     */
    public static String generateXID(long tranId) {
        StringBuilder xid = new StringBuilder();
        xid.append(ipAddress).append(":");
        xid.append(port).append(":");
        xid.append(tranId);
        return xid.toString();
    }
}
public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {
/**
 * Lifecycle callback for a session begin: stores the session via {@code addGlobalSession}.
 *
 * @throws TransactionException if storing the session fails
 */
public void onBegin(GlobalSession globalSession) throws TransactionException {
addGlobalSession(globalSession); // delegate to the concrete SessionManager's addGlobalSession to store the session
}
}
@LoadLevel(name = "db", scope = Scope.PROTOTYPE)
public class DataBaseSessionManager extends AbstractSessionManager implements Initialize {

    /**
     * Writes the global session through the transaction store manager: as GLOBAL_ADD
     * when {@code taskName} is blank, otherwise as GLOBAL_UPDATE.
     *
     * @throws StoreException       if the store reports the write as unsuccessful
     * @throws TransactionException declared by the session-manager contract
     */
    public void addGlobalSession(GlobalSession session) throws TransactionException {
        // Both paths differ only in the log operation written.
        final LogOperation operation = StringUtils.isBlank(taskName)
                ? LogOperation.GLOBAL_ADD
                : LogOperation.GLOBAL_UPDATE;
        final boolean written = transactionStoreManager.writeSession(operation, session);
        if (!written) {
            throw new StoreException("addGlobalSession failed.");
        }
    }
}

分支事务执行

分支事务的执行依赖于本地事务:通过DataSourceProxy代理类创建的ConnectionProxy来完成分支事务的提交、回滚等操作,通过ConnectionProxy代理类中创建的PreparedStatementProxy来完成具体SQL的执行。若不需要全局锁且不是AT模式,则直接执行本地事务;否则根据具体SQL类型获取对应的事务执行器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class PreparedStatementProxy extends AbstractPreparedStatementProxy implements PreparedStatement, ParametersHolder {
/**
 * Executes the statement through {@code ExecuteTemplate}, which decides how the
 * underlying {@code statement.execute()} callback is run.
 *
 * @throws SQLException if execution fails
 */
public boolean execute() throws SQLException {
return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
}
}
/**
 * Chooses an {@code Executor} for a proxied statement based on the recognized SQL type
 * and runs it.
 */
public class ExecuteTemplate {
/**
 * Convenience overload that passes no pre-computed SQL recognizers.
 */
public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException {
return execute(null, statementProxy, statementCallback, args);
}
/**
 * Runs the statement. When no global lock is required and the branch type is not AT,
 * the callback is invoked directly on the target statement. Otherwise an executor is
 * selected by SQL type (a {@code PlainExecutor} when the SQL is not recognized, a
 * {@code MultiExecutor} when several recognizers apply) and executed.
 *
 * @param sqlRecognizers recognizers for the SQL, or null/empty to have them looked up
 * @throws SQLException any failure; non-SQLException throwables are wrapped
 */
public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers, StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException {
if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
// no global lock required AND not AT mode: run the statement directly against the target
return statementCallback.execute(statementProxy.getTargetStatement(), args);
}
String dbType = statementProxy.getConnectionProxy().getDbType();
if (CollectionUtils.isEmpty(sqlRecognizers)) {
sqlRecognizers = SQLVisitorFactory.get(statementProxy.getTargetSQL(), dbType);
}
Executor<T> executor;
if (CollectionUtils.isEmpty(sqlRecognizers)) {
executor = new PlainExecutor<>(statementProxy, statementCallback);
} else {
if (sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
switch (sqlRecognizer.getSQLType()) {
case INSERT: // the InsertExecutor is loaded by dbType through EnhancedServiceLoader (the article uses MySQLInsertExecutor as its example)
executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType, new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class}, new Object[]{statementProxy, statementCallback, sqlRecognizer});
break;
case UPDATE:
executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case DELETE:
executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
default:
executor = new PlainExecutor<>(statementProxy, statementCallback);
break;
}
} else {
executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
}
}
T rs;
try { // NOTE(review): per the article, for an INSERT this reaches BaseTransactionalExecutor.execute via MySQLInsertExecutor
rs = executor.execute(args);
} catch (Throwable ex) {
// Normalize every failure to SQLException, as the method signature declares.
if (!(ex instanceof SQLException)) {
ex = new SQLException(ex);
}
throw (SQLException) ex;
}
return rs;
}
}

执行分支事务时首先绑定全局事务Xid,并将事务的自动提交设置为false。若发生锁冲突则会失败重试,默认重试次数为30;每次冲突时LockRetryPolicy会清理undo日志和锁的key,并回滚本地事务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {
/**
 * Binds the current global XID (if any) to the connection proxy, copies the
 * global-lock requirement from {@code RootContext} onto it, then delegates to {@code doExecute}.
 *
 * @throws Throwable whatever {@code doExecute} throws
 */
public T execute(Object... args) throws Throwable {
String xid = RootContext.getXID();
if (xid != null) {
statementProxy.getConnectionProxy().bind(xid); // bind the global transaction id
}
statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
return doExecute(args); // for DML this is the doExecute of the subclass AbstractDMLBaseExecutor
}
}
/**
 * Base executor for DML statements; runs them with auto-commit disabled.
 * NOTE(review): {@code execute(Callable)}, {@code doRetryOnLockConflict} and {@code onException}
 * below reference {@code connection} and are invoked as {@code new LockRetryPolicy(...).execute(...)};
 * they appear to belong to a nested LockRetryPolicy class that this excerpt has flattened - confirm.
 */
public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
/**
 * Dispatches on the connection's auto-commit flag.
 */
public T doExecute(Object... args) throws Throwable { // both paths end up executing with manual commit
AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); // the proxied Connection
if (connectionProxy.getAutoCommit()) {
return executeAutoCommitTrue(args); // switches the connection to manual commit, executes and commits
} else {
return executeAutoCommitFalse(args); // connection is already in manual-commit mode
}
}
/**
 * Temporarily disables auto-commit, runs the statement plus commit under the
 * lock-retry policy, and always restores auto-commit and resets the context afterwards.
 */
protected T executeAutoCommitTrue(Object[] args) throws Throwable {
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
try {
connectionProxy.setAutoCommit(false); // switch the local transaction to manual commit
return new LockRetryPolicy(connectionProxy).execute(() -> {
T result = executeAutoCommitFalse(args);
connectionProxy.commit();
return result;
});
} catch (Exception e) {
// When the retry policy is enabled it has already rolled back in onException.
if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
connectionProxy.getTargetConnection().rollback(); // roll back the local transaction
}
throw e;
} finally {
connectionProxy.getContext().reset();
connectionProxy.setAutoCommit(true);
}
}
/**
 * Runs {@code callable} with retry-on-lock-conflict when the flag is set, otherwise once.
 */
public <T> T execute(Callable<T> callable) throws Exception {
if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT) { // per the article, defaults to true
return doRetryOnLockConflict(callable);
} else {
return callable.call();
}
}
/**
 * Calls {@code callable} until it succeeds; on a {@code LockConflictException} it cleans up,
 * sleeps via the retry controller and tries again. Any other exception is cleaned up and rethrown.
 */
protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception {
LockRetryController lockRetryController = new LockRetryController();
while (true) {
try {
return callable.call(); // in executeAutoCommitTrue this runs the statement and connectionProxy.commit()
} catch (LockConflictException lockConflict) {
onException(lockConflict); // clear undo items and lock keys, roll back the local transaction
lockRetryController.sleep(lockConflict);
} catch (Exception e) {
onException(e);
throw e;
}
}
}
/**
 * Clears the buffered undo items and lock keys and rolls back the target connection.
 */
protected void onException(Exception e) throws Exception {
ConnectionContext context = connection.getContext();
context.getUndoItems().clear();
context.getLockKeysBuffer().clear();
connection.getTargetConnection().rollback();
}
}
/**
 * Controls the back-off between retries after a global lock conflict.
 */
public class LockRetryController {

    /**
     * Consumes one retry attempt and pauses before the next try.
     *
     * <p>If the sleep is interrupted, the thread's interrupt status is restored so that
     * callers further up the stack can still observe the interruption; the method then
     * returns normally, as before.
     *
     * @param e the lock-conflict exception that triggered the retry; used as the cause
     *          when the retry budget is exhausted
     * @throws LockWaitTimeoutException when no retry attempts remain
     */
    public void sleep(Exception e) throws LockWaitTimeoutException {
        // lockRetryTimes is the remaining budget (the article states a default of 30).
        if (--lockRetryTimes < 0) {
            throw new LockWaitTimeoutException("Global lock wait timeout", e);
        }
        try {
            // lockRetryInternal is the pause in milliseconds (the article states a default of 10ms).
            Thread.sleep(lockRetryInternal);
        } catch (InterruptedException interrupted) {
            // Do not swallow the interruption: re-assert the flag for the caller.
            Thread.currentThread().interrupt();
        }
    }
}

最终会调用executeAutoCommitFalse方法来生成前置镜像、执行本地事务SQL并生成后置镜像,再通过前置镜像和后置镜像生成undo_log,并将其保存到缓存中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
/**
 * Executes the statement between a before-image and an after-image snapshot and
 * prepares the undo log from the two images.
 *
 * @throws NotSupportYetException for multi-column primary keys on a non-MySQL database
 * @throws Exception any failure from image building or statement execution
 */
protected T executeAutoCommitFalse(Object[] args) throws Exception {
if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
throw new NotSupportYetException("multi pk only support mysql!");
}
TableRecords beforeImage = beforeImage(); // build the before image
T result = statementCallback.execute(statementProxy.getTargetStatement(), args); // execute the business SQL
TableRecords afterImage = afterImage(beforeImage); // build the after image (NOTE(review): the article says it is read at read-uncommitted isolation - not visible here)
prepareUndoLog(beforeImage, afterImage); // build the undo log from the before and after images
return result;
}
}
public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {

    /**
     * Buffers the lock keys and the undo item for the executed statement on the
     * connection proxy. Does nothing when both images contain no rows.
     *
     * @param beforeImage rows as they were before the statement ran
     * @param afterImage  rows as they are after the statement ran
     * @throws SQLException if building the lock key or undo item fails
     */
    protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
        boolean noRowsTouched = beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty();
        if (noRowsTouched) {
            return;
        }

        ConnectionProxy proxy = statementProxy.getConnectionProxy();

        // For a DELETE the lock keys come from the before image; otherwise from the after image.
        final TableRecords recordsToLock;
        if (sqlRecognizer.getSQLType() == SQLType.DELETE) {
            recordsToLock = beforeImage;
        } else {
            recordsToLock = afterImage;
        }
        proxy.appendLockKey(buildLockKey(recordsToLock));

        // The undo item is only appended to the connection proxy here.
        proxy.appendUndoLog(buildUndoItem(beforeImage, afterImage));
    }
}

然后调用ConnectionProxy的commit方法完成分支事务注册,再将undo日志保存到undo_log表中,并提交本地事务。内层的事务提交不会再走锁冲突重试机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class ConnectionProxy extends AbstractConnectionProxy {
/**
 * Commits through the lock-retry policy. On an {@code SQLException} the connection is
 * rolled back when it is not in auto-commit mode; any other exception is wrapped in an
 * {@code SQLException}.
 */
public void commit() throws SQLException {
try {
LOCK_RETRY_POLICY.execute(() -> {
doCommit();
return null;
});
} catch (SQLException e) {
if (targetConnection != null && !getAutoCommit()) {
rollback();
}
throw e;
} catch (Exception e) {
throw new SQLException(e);
}
}
/**
 * Picks the commit path based on the connection context.
 */
private void doCommit() throws SQLException {
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit(); // register the branch, flush the undo log, commit the local transaction
} else if (context.isGlobalLockRequire()) {
processLocalCommitWithGlobalLocks(); // local commit that takes global locks into account
} else {
targetConnection.commit(); // plain local commit
}
}
/**
 * Registers the branch transaction, flushes the undo log, commits the target connection
 * and optionally reports success. A failure while flushing or committing is reported as
 * failed and rethrown as {@code SQLException}.
 */
private void processGlobalTransactionCommit() throws SQLException {
try {
register(); // register the branch transaction
} catch (TransactionException e) {
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try { // the UndoLogManager is chosen by dbType (the article uses MySQLUndoLogManager as its example)
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this); // persist the undo log (undo_log table, per the article)
targetConnection.commit(); // commit the local database transaction
} catch (Throwable ex) {
report(false);
throw new SQLException(ex);
}
if (IS_REPORT_SUCCESS_ENABLE) {
report(true);
}
context.reset();
}
}
public static class LockRetryPolicy {

    /**
     * Runs {@code callable} once when LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT is
     * set; otherwise runs it through the retry-on-lock-conflict loop.
     *
     * @param callable the work to run
     * @return the callable's result
     * @throws Exception whatever the callable or the retry loop throws
     */
    public <T> T execute(Callable<T> callable) throws Exception {
        // With the flag off, retrying happens at this level.
        if (!LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT) {
            return doRetryOnLockConflict(callable);
        }
        // With the flag on, just call through without retrying here.
        return callable.call();
    }
}

分支事务注册:通过资源管理器RM向TC发起分支事务注册请求,返回分支branchId,并将其绑定到ConnectionContext中。

1
2
3
4
5
6
7
8
9
10
public class ConnectionProxy extends AbstractConnectionProxy {

    /**
     * Registers this connection's work as an AT branch and stores the returned
     * branch id in the context. Skipped when there is no undo log or no lock key buffered.
     *
     * @throws TransactionException if branch registration fails
     */
    private void register() throws TransactionException {
        boolean hasWorkToRegister = context.hasUndoLog() && !context.getLockKeysBuffer().isEmpty();
        if (hasWorkToRegister) {
            Long registeredBranchId = DefaultResourceManager.get().branchRegister(
                    BranchType.AT,
                    getDataSourceProxy().getResourceId(),
                    null,
                    context.getXid(),
                    null,
                    context.buildLockKeys());
            context.setBranchId(registeredBranchId);
        }
    }
}

通过RPC注册分支事务,最终执行DefaultCoordinator的doBranchRegister方法:首先通过全局事务Xid从global_table表中查询全局事务,检查其是否存在;然后获取全局锁并收集行锁,将其保存到lock_table表中;最后将分支事务信息保存到branch_table表中,并返回分支事务branchId。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {

    /**
     * Handles a branch-register request: registers the branch through the core and
     * writes the resulting branch id into the response.
     *
     * @throws TransactionException if registration fails
     */
    protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response, RpcContext rpcContext)
            throws TransactionException {
        Long registeredBranchId = core.branchRegister(
                request.getBranchType(),
                request.getResourceId(),
                rpcContext.getClientId(),
                request.getXid(),
                request.getApplicationData(),
                request.getLockKey());
        response.setBranchId(registeredBranchId);
    }
}
public class DefaultCore implements Core {
/**
 * Delegates branch registration to the core implementation selected by {@code branchType}.
 *
 * @return the id of the registered branch
 * @throws TransactionException if the delegate fails
 */
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
return getCore(branchType).branchRegister(branchType, resourceId, clientId, xid, applicationData, lockKeys);
}
}
public abstract class AbstractCore implements Core {
/**
 * Registers a branch under the global session identified by {@code xid}. Under the
 * session lock it checks the session status, creates the branch session, locks it and
 * adds it to the global session; if adding fails with a runtime exception the branch
 * lock is released again.
 *
 * @return the id of the new branch session
 * @throws TransactionException if the global session is missing or registration fails
 */
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
return SessionHolder.lockAndExecute(globalSession, () -> {
globalSessionStatusCheck(globalSession);
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId, applicationData, lockKeys, clientId);
branchSessionLock(globalSession, branchSession); // acquire the global lock for the branch's rows (stored in lock_table in DB mode, per the article)
try {
globalSession.addBranch(branchSession); // add the branch session to the global session
} catch (RuntimeException ex) {
branchSessionUnlock(branchSession); // release the lock again (DB mode deletes the lock_table rows, per the article)
throw new BranchTransactionException(FailedToAddBranch, String.format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(), branchSession.getBranchId()), ex);
}
return branchSession.getBranchId();
});
}
/**
 * Looks up the global session for {@code xid} and fails if none is found.
 *
 * @param withBranchSessions forwarded to {@code SessionHolder.findGlobalSession}
 * @throws GlobalTransactionException (GlobalTransactionNotExist) when no session is found
 */
private GlobalSession assertGlobalSessionNotNull(String xid, boolean withBranchSessions) throws TransactionException {
// in DB mode the session is loaded from the database; withBranchSessions=true includes branch data (per the article)
GlobalSession globalSession = SessionHolder.findGlobalSession(xid, withBranchSessions);
if (globalSession == null) { // not found - the transaction may already have finished or timed out
throw new GlobalTransactionException(TransactionExceptionCode.GlobalTransactionNotExist, String.format("Could not found global transaction xid = %s, may be has finished.", xid));
}
return globalSession;
}
}
public class GlobalSession implements SessionLifecycle, SessionStorable {
/**
 * Notifies every lifecycle listener of the new branch, marks the branch as Registered
 * and adds it to this session.
 *
 * @throws TransactionException if a listener fails
 */
public void addBranch(BranchSession branchSession) throws TransactionException {
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onAddBranch(this, branchSession); // listener stores the branch (branch_table in DB mode, per the article)
}
branchSession.setStatus(BranchStatus.Registered); // mark the branch session as registered
add(branchSession);
}
}
@LoadLevel(name = "db", scope = Scope.PROTOTYPE)
public class DataBaseSessionManager extends AbstractSessionManager implements Initialize {

    /**
     * Writes the branch session through the transaction store manager as BRANCH_ADD.
     * Nothing is written when {@code taskName} is non-blank.
     *
     * @throws StoreException       if the store reports the write as unsuccessful
     * @throws TransactionException declared by the session-manager contract
     */
    public void addBranchSession(GlobalSession globalSession, BranchSession session) throws TransactionException {
        if (!StringUtils.isNotBlank(taskName)) {
            final boolean stored = transactionStoreManager.writeSession(LogOperation.BRANCH_ADD, session);
            if (!stored) {
                throw new StoreException("addBranchSession failed.");
            }
        }
    }
}

若分支事务执行失败,会调用rollback方法回滚本地事务,然后通过RPC向TC报告该分支事务执行失败。之后在TM向TC发送事务回滚请求、TC遍历所有已注册的分支事务时,会判断分支事务状态,若为失败则不会再做多余的回调操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ConnectionProxy extends AbstractConnectionProxy {
/**
 * Rolls back the target connection, reports the branch as failed when it was
 * registered within a global transaction, and resets the context.
 */
public void rollback() throws SQLException {
targetConnection.rollback(); // roll back the local transaction
if (context.inGlobalTransaction() && context.isBranchRegistered()) {
report(false); // tell the TC that this branch failed
}
context.reset();
}
/**
 * Reports the phase-one outcome of this branch, retrying up to REPORT_RETRY_COUNT times.
 * Does nothing when no branch id is set.
 *
 * @param commitDone true reports PhaseOne_Done, false reports PhaseOne_Failed
 * @throws SQLException when the last retry also fails
 */
private void report(boolean commitDone) throws SQLException {
if (context.getBranchId() == null) {
return;
}
int retry = REPORT_RETRY_COUNT;
while (retry > 0) {
try {
DefaultResourceManager.get().branchReport(BranchType.AT, context.getXid(), context.getBranchId(), commitDone ? BranchStatus.PhaseOne_Done : BranchStatus.PhaseOne_Failed, null);
return;
} catch (Throwable ex) {
retry--;
// Only the final failure is surfaced; earlier ones just consume a retry.
if (retry == 0) {
throw new SQLException("Failed to report branch status " + commitDone, ex);
}
}
}
}
}

事务提交

分布式事务的提交只能由TM中角色为Launcher的全局事务来完成:向TC发起全局事务提交请求,RPC调用DefaultCoordinator#doGlobalCommit。若事务提交失败会重试,默认重试5次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class TransactionalTemplate {

    /**
     * Commits the global transaction, firing the before/after-commit hooks around it.
     *
     * @param tx the transaction to commit
     * @throws TransactionalExecutor.ExecutionException with code CommitFailure when commit fails
     */
    private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            triggerBeforeCommit();
            tx.commit();
            triggerAfterCommit();
        } catch (TransactionException commitFailure) {
            // 4.1 Failed to commit
            throw new TransactionalExecutor.ExecutionException(
                    tx, commitFailure, TransactionalExecutor.Code.CommitFailure);
        }
    }
}
public class DefaultGlobalTransaction implements GlobalTransaction {
/**
 * Commits the global transaction. A Participant-role handle returns immediately.
 * Otherwise the commit is sent through the transaction manager, retrying on any
 * throwable; afterwards the transaction is suspended if its XID is still the one in
 * {@code RootContext}.
 *
 * @throws TransactionException when the last retry also fails
 */
public void commit() throws TransactionException {
// business methods may nest and create several GlobalTransaction objects; this role check keeps Participant handles from committing
if (role == GlobalTransactionRole.Participant) {
return;
}
assertXIDNotNull();
int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
try {
while (retry > 0) {
try { // sends the global commit request (GlobalCommitRequest, handled by DefaultCoordinator#doGlobalCommit)
status = transactionManager.commit(xid); // commit; retried on failure (5 times by default, per the article)
break;
} catch (Throwable ex) {
retry--;
if (retry == 0) {
throw new TransactionException("Failed to report global commit", ex);
}
}
}
} finally {
if (xid.equals(RootContext.getXID())) {
suspend();
}
}
}
}
public class DefaultTransactionManager implements TransactionManager {

    /**
     * Sends a {@code GlobalCommitRequest} for {@code xid} synchronously and returns
     * the global status carried by the response.
     *
     * @param xid the global transaction to commit
     * @return the status reported in the response
     * @throws TransactionException if the synchronous call fails
     */
    public GlobalStatus commit(String xid) throws TransactionException {
        GlobalCommitRequest request = new GlobalCommitRequest();
        request.setXid(xid);
        GlobalCommitResponse commitResponse = (GlobalCommitResponse) syncCall(request);
        return commitResponse.getGlobalStatus();
    }
}

TC收到提交事务请求后,首先检查当前全局事务是否存在,若不存在则直接返回GlobalStatus.Finished;若支持异步提交,则将全局事务的状态改为AsyncCommitting(异步提交),然后通过异步任务提交分支事务;若不支持,则同步调用doGlobalCommit方法遍历并提交各分支事务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {

    /**
     * Handles a global-commit request: commits through the core and writes the
     * resulting global status into the response.
     *
     * @throws TransactionException if the core fails to commit
     */
    protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
            throws TransactionException {
        GlobalStatus commitStatus = core.commit(request.getXid());
        response.setGlobalStatus(commitStatus);
    }
}
public class DefaultCore implements Core {
/**
 * Commits the global transaction identified by {@code xid}. Returns Finished when no
 * session is found. Otherwise, under the session lock, the session is closed and either
 * switched to asynchronous commit or marked Committing for a synchronous commit.
 *
 * @return the resulting global status (AsyncCommitting is reported to the caller as Committed)
 * @throws TransactionException if the commit fails
 */
public GlobalStatus commit(String xid) throws TransactionException {
GlobalSession globalSession = SessionHolder.findGlobalSession(xid); // in DB mode this queries global_table (per the article)
if (globalSession == null) { // no such global transaction: report it as Finished
return GlobalStatus.Finished;
}
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// just lock changeStatus
boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
// Highlight: Firstly, close the session, then no more branch can be registered.
globalSession.closeAndClean();
if (globalSession.getStatus() == GlobalStatus.Begin) {
if (globalSession.canBeCommittedAsync()) {
globalSession.asyncCommit(); // asynchronous commit (the default for AT mode, per the article); the caller then sees Committed
return false;
} else {
globalSession.changeStatus(GlobalStatus.Committing);
return true; // commit the branches synchronously
}
}
return false;
});
if (shouldCommit) { // async commit not possible: commit the branches synchronously
boolean success = doGlobalCommit(globalSession, false);
if (success && !globalSession.getBranchSessions().isEmpty()) {
globalSession.asyncCommit();
return GlobalStatus.Committed;
} else {
return globalSession.getStatus();
}
} else {
return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
}
}
}
public class GlobalSession implements SessionLifecycle, SessionStorable {
/**
 * Hands this session to the async-committing session manager and changes its status
 * to AsyncCommitting.
 *
 * @throws TransactionException if storing the session or changing the status fails
 */
public void asyncCommit() throws TransactionException { // sessions in AsyncCommitting are later processed by DefaultCoordinator#handleAsyncCommitting
this.addSessionLifecycleListener(SessionHolder.getAsyncCommittingSessionManager());
// NOTE(review): the original comment claimed this sets the global_table status to 1 (Begin); what is persisted here depends on the manager's addGlobalSession - confirm
SessionHolder.getAsyncCommittingSessionManager().addGlobalSession(this);
this.changeStatus(GlobalStatus.AsyncCommitting); // update the status to GlobalStatus.AsyncCommitting
}
}

事务协调者TC服务启动类Server的main方法中会调用DefaultCoordinator的init方法,从而初始化异步延时线程池,用于异步提交分支事务。不论同步提交还是异步提交,分支事务最终都是通过调用doGlobalCommit方法完成的。AT模式默认支持异步提交分支事务。分支事务的提交是通过给每个分支的资源管理器RM发送提交请求来完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
// Single-threaded scheduler that runs handleAsyncCommitting.
private ScheduledThreadPoolExecutor asyncCommitting = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AsyncCommitting", 1));
/**
 * Commits every session held by the async-committing session manager whose status is
 * AsyncCommitting. A {@code TransactionException} from one session is ignored so the
 * remaining sessions are still processed.
 */
protected void handleAsyncCommitting() {
Collection<GlobalSession> asyncCommittingSessions = SessionHolder.getAsyncCommittingSessionManager().allSessions();
if (CollectionUtils.isEmpty(asyncCommittingSessions)) {
return;
}
for (GlobalSession asyncCommittingSession : asyncCommittingSessions) { // iterate over the global transactions
try {
if (GlobalStatus.AsyncCommitting != asyncCommittingSession.getStatus()) {
continue;
}
asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
core.doGlobalCommit(asyncCommittingSession, true); // commit the session's branches
} catch (TransactionException ex) {
// NOTE(review): failure is swallowed in this excerpt - consider logging it
}
}
}
/**
 * Schedules the periodic background tasks: rollback retry, commit retry, async
 * commit, timeout check and undo-log deletion. Each task swallows its exceptions.
 * NOTE(review): the empty catch blocks hide failures in this excerpt - consider logging.
 */
public void init() {
retryRollbacking.scheduleAtFixedRate(() -> {
try {
handleRetryRollbacking(); // rollback-retry task (1s interval by default, per the article)
} catch (Exception e) {
}
}, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
retryCommitting.scheduleAtFixedRate(() -> {
try {
handleRetryCommitting(); // commit-retry task (1s interval by default, per the article)
} catch (Exception e) {
}
}, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
asyncCommitting.scheduleAtFixedRate(() -> {
try {
handleAsyncCommitting(); // async-commit task (1s interval by default, per the article)
} catch (Exception e) {
}
}, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
timeoutCheck.scheduleAtFixedRate(() -> {
try {
timeoutCheck();
} catch (Exception e) {
}
}, 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);
undoLogDelete.scheduleAtFixedRate(() -> {
try {
undoLogDelete();
} catch (Exception e) {
}
}, UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}
}
public class DefaultCore implements Core {

    /**
     * Drives the phase-two commit of a global session.
     *
     * @param globalSession the global transaction to commit
     * @param retrying      {@code true} when invoked from a retry / async task,
     *                      {@code false} on the first synchronous attempt
     * @return {@code true} when the commit flow completed; {@code false} when it
     *         stopped early or branches are still pending
     * @throws TransactionException when a branch commit throws on the first
     *                              (non-retrying) attempt
     */
    public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
        boolean success = true;
        // Publish the "commit started" event (end time not known yet).
        eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC, globalSession.getTransactionName(), globalSession.getBeginTime(), null, globalSession.getStatus()));
        if (!globalSession.isSaga()) {
            for (BranchSession branch : globalSession.getSortedBranches()) {
                // On the first attempt, branches that can be committed asynchronously are skipped.
                if (!retrying && branch.canBeCommittedAsync()) {
                    continue;
                }
                // A branch whose phase one failed has nothing to commit; just drop it.
                if (branch.getStatus() == BranchStatus.PhaseOne_Failed) {
                    globalSession.removeBranch(branch);
                    continue;
                }
                try {
                    // Ask the branch-type specific core to commit this branch.
                    BranchStatus outcome = getCore(branch.getBranchType()).branchCommit(globalSession, branch);
                    switch (outcome) {
                        case PhaseTwo_Committed:
                            // Committed: the branch can be removed from the session.
                            globalSession.removeBranch(branch);
                            continue;
                        case PhaseTwo_CommitFailed_Unretryable:
                            // Unretryable failure ends the commit as failed unless the
                            // global session may still be committed asynchronously.
                            if (!globalSession.canBeCommittedAsync()) {
                                SessionHelper.endCommitFailed(globalSession);
                                return false;
                            }
                            continue;
                        default:
                            if (!retrying) {
                                globalSession.queueToRetryCommit();
                                return false;
                            }
                            if (!globalSession.canBeCommittedAsync()) {
                                return false;
                            }
                            continue;
                    }
                } catch (Exception ex) {
                    // First attempt: queue for retry and surface the failure.
                    // While retrying: ignore and move on to the next branch.
                    if (!retrying) {
                        globalSession.queueToRetryCommit();
                        throw new TransactionException(ex);
                    }
                }
            }
            if (globalSession.hasBranch()) {
                return false;
            }
        } else {
            // Saga sessions are delegated to the saga core.
            success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
        }
        if (success && globalSession.getBranchSessions().isEmpty()) {
            SessionHelper.endCommitted(globalSession);
            // Publish the "commit finished" event with the end timestamp.
            eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC, globalSession.getTransactionName(), globalSession.getBeginTime(), System.currentTimeMillis(), globalSession.getStatus()));
        }
        return success;
    }
}

TC发送的分支事务提交请求最终会调用RM的AsyncWorker#branchCommit方法,将请求放入队列中;在init方法中创建了异步处理任务,每隔1s执行一次。这里做的仅仅是删除undo_log表中对应的记录,因为AT模式下本地事务在执行完成后就已提交,这里无需做特别的处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
public class AsyncWorker implements ResourceManagerInbound {

    private static final BlockingQueue<Phase2Context> ASYNC_COMMIT_BUFFER = new LinkedBlockingQueue<>(ASYNC_COMMIT_BUFFER_LIMIT);

    /**
     * Queues a phase-two commit request for asynchronous processing and reports
     * the branch as committed right away.
     *
     * @return always {@code BranchStatus.PhaseTwo_Committed}, even when the buffer
     *         is full and the request could not be queued (a warning is logged)
     */
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
        if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
            LOGGER.warn("Async commit buffer is full, rejected branch commit: xid = " + xid + ", branchId = " + branchId);
        }
        return BranchStatus.PhaseTwo_Committed;
    }

    /**
     * Starts the single-threaded timer that drains the commit buffer once per
     * second (first run after 10 ms).
     */
    public synchronized void init() {
        ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AsyncWorker", 1, true));
        timerExecutor.scheduleAtFixedRate(() -> {
            try {
                doBranchCommits();
            } catch (Throwable e) {
                // Never let an exception escape: it would cancel the fixed-rate schedule.
                LOGGER.error(e.getMessage(), e);
            }
        }, 10, 1000 * 1, TimeUnit.MILLISECONDS);
    }

    /**
     * Drains the commit buffer, groups the contexts by resource id and deletes
     * the matching undo-log rows in batches of at most
     * {@code UNDOLOG_DELETE_LIMIT_SIZE}. Phase two of a successful transaction
     * only has to discard the undo log, which is why it can run asynchronously.
     */
    private void doBranchCommits() {
        if (ASYNC_COMMIT_BUFFER.isEmpty()) {
            return;
        }
        Map<String, List<Phase2Context>> mappedContexts = new HashMap<>(DEFAULT_RESOURCE_SIZE);
        List<Phase2Context> contextsGroupedByResourceId;
        Phase2Context polled;
        while ((polled = ASYNC_COMMIT_BUFFER.poll()) != null) {
            contextsGroupedByResourceId = CollectionUtils.computeIfAbsent(mappedContexts, polled.resourceId, key -> new ArrayList<>());
            contextsGroupedByResourceId.add(polled);
        }
        for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
            Connection conn = null;
            DataSourceProxy dataSourceProxy;
            try {
                try {
                    DataSourceManager resourceManager = (DataSourceManager) DefaultResourceManager.get().getResourceManager(BranchType.AT);
                    dataSourceProxy = resourceManager.get(entry.getKey());
                    if (dataSourceProxy == null) {
                        throw new ShouldNeverHappenException("Failed to find resource on " + entry.getKey());
                    }
                    conn = dataSourceProxy.getPlainConnection();
                } catch (SQLException sqle) {
                    LOGGER.warn("Failed to get connection for async committing on " + entry.getKey(), sqle);
                    continue;
                }
                contextsGroupedByResourceId = entry.getValue();
                Set<String> xids = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
                Set<Long> branchIds = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
                for (Phase2Context commitContext : contextsGroupedByResourceId) {
                    xids.add(commitContext.xid);
                    branchIds.add(commitContext.branchId);
                    int maxSize = Math.max(xids.size(), branchIds.size());
                    if (maxSize == UNDOLOG_DELETE_LIMIT_SIZE) {
                        // Batch is full: flush it and start collecting the next one.
                        batchDeleteUndoLogQuietly(dataSourceProxy, xids, branchIds, conn);
                        xids.clear();
                        branchIds.clear();
                    }
                }
                // Flush the remainder, if any. An empty remainder must NOT end the
                // whole method: the batches already flushed still need the commit
                // below, and the other resources still need to be processed.
                if (!CollectionUtils.isEmpty(xids) && !CollectionUtils.isEmpty(branchIds)) {
                    batchDeleteUndoLogQuietly(dataSourceProxy, xids, branchIds, conn);
                }
                if (!conn.getAutoCommit()) {
                    conn.commit();
                }
            } catch (Throwable e) {
                LOGGER.error(e.getMessage(), e);
                try {
                    if (conn != null) {
                        conn.rollback();
                    }
                } catch (SQLException rollbackEx) {
                    LOGGER.warn("Failed to rollback JDBC resource while deleting undo_log", rollbackEx);
                }
            } finally {
                if (conn != null) {
                    try {
                        conn.close();
                    } catch (SQLException closeEx) {
                        LOGGER.warn("Failed to close JDBC resource while deleting undo_log", closeEx);
                    }
                }
            }
        }
    }

    /** Deletes one batch of undo-log rows; a failure is logged and otherwise ignored (best effort). */
    private void batchDeleteUndoLogQuietly(DataSourceProxy dataSourceProxy, Set<String> xids, Set<Long> branchIds, Connection conn) {
        try {
            UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(xids, branchIds, conn);
        } catch (Exception ex) {
            LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
        }
    }
}

事务回滚

同事务的提交一样,事务的回滚也只能由TM中Launcher角色的全局事务发起;若回滚出现异常同样会重试(默认5次),最终也是通过RPC调用TC执行回滚操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class TransactionalTemplate {

    /**
     * Finishes the global transaction after the business method threw.
     * The transaction is rolled back when the transaction info says the exception
     * triggers a rollback; otherwise (including a {@code null} txInfo) it is committed.
     *
     * @throws TransactionalExecutor.ExecutionException carrying the rollback outcome
     *         (or whatever {@code commitTransaction} raises on the commit path)
     */
    private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
        boolean rollbackRequired = txInfo != null && txInfo.rollbackOn(originalException);
        if (!rollbackRequired) {
            // This exception does not trigger a rollback, so commit instead.
            commitTransaction(tx);
            return;
        }
        try {
            rollbackTransaction(tx, originalException);
        } catch (TransactionException txe) {
            // The rollback itself failed.
            throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.RollbackFailure, originalException);
        }
    }

    /**
     * Rolls the global transaction back between the before/after rollback hooks,
     * then always throws an {@code ExecutionException} whose code reports whether
     * the rollback is done or still retrying.
     */
    private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
        triggerBeforeRollback();
        tx.rollback();
        triggerAfterRollback();
        TransactionalExecutor.Code outcome;
        if (GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())) {
            outcome = TransactionalExecutor.Code.RollbackRetrying;
        } else {
            outcome = TransactionalExecutor.Code.RollbackDone;
        }
        throw new TransactionalExecutor.ExecutionException(tx, outcome, originalException);
    }
}
public class DefaultGlobalTransaction implements GlobalTransaction {

    /**
     * Requests a global rollback from the transaction manager.
     * Only a transaction that is not a {@code Participant} performs the rollback;
     * a participant returns immediately. The report is attempted up to
     * {@code ROLLBACK_RETRY_COUNT} times ({@code DEFAULT_TM_ROLLBACK_RETRY_COUNT}
     * when that value is not positive).
     *
     * @throws TransactionException when the last attempt also fails
     */
    public void rollback() throws TransactionException {
        // Nested business methods may each create a GlobalTransaction; participants
        // leave begin/commit/rollback to the transaction that launched it.
        if (role == GlobalTransactionRole.Participant) {
            return;
        }
        assertXIDNotNull();
        int maxAttempts;
        if (ROLLBACK_RETRY_COUNT <= 0) {
            maxAttempts = DEFAULT_TM_ROLLBACK_RETRY_COUNT;
        } else {
            maxAttempts = ROLLBACK_RETRY_COUNT;
        }
        try {
            for (int remaining = maxAttempts; remaining > 0; remaining--) {
                try {
                    status = transactionManager.rollback(xid);
                    break;
                } catch (Throwable ex) {
                    // Out of attempts once this was the last remaining one.
                    if (remaining == 1) {
                        throw new TransactionException("Failed to report global rollback", ex);
                    }
                }
            }
        } finally {
            // Unbind the XID if it is the one currently bound to the root context.
            if (xid.equals(RootContext.getXID())) {
                suspend();
            }
        }
    }
}
public class DefaultTransactionManager implements TransactionManager {

    /**
     * Sends a {@code GlobalRollbackRequest} for the given XID through
     * {@code syncCall} and returns the global status carried by the response.
     *
     * @param xid id of the global transaction to roll back
     * @return the global transaction status reported in the response
     */
    public GlobalStatus rollback(String xid) throws TransactionException {
        GlobalRollbackRequest request = new GlobalRollbackRequest();
        request.setXid(xid);
        // Synchronous call; the article notes it is served by DefaultCoordinator#doGlobalRollback.
        return ((GlobalRollbackResponse) syncCall(request)).getGlobalStatus();
    }
}

最终通过TC的DefaultCoordinator#doGlobalRollback方法(内部委托给DefaultCore),遍历所有的分支事务,给所有分支发送事务回滚通知;若回滚成功,则释放全局锁、删除branch_table信息并移除缓存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {

    /**
     * Handles a global rollback request: rolls back the global transaction named
     * by the request's XID through the core and stores the resulting global
     * status on the response.
     */
    protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response, RpcContext rpcContext) throws TransactionException {
        String xid = request.getXid();
        response.setGlobalStatus(core.rollback(xid));
    }
}
public class DefaultCore implements Core {

    /**
     * Rolls back the global transaction identified by {@code xid}.
     *
     * @return {@code GlobalStatus.Finished} when no such session is found,
     *         otherwise the session's status after the rollback attempt
     */
    public GlobalStatus rollback(String xid) throws TransactionException {
        GlobalSession session = SessionHolder.findGlobalSession(xid);
        if (session == null) {
            return GlobalStatus.Finished;
        }
        session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // Under the session lock: close the session and move it from Begin to
        // Rollbacking. Any other starting status means there is nothing to start here.
        boolean shouldRollBack = SessionHolder.lockAndExecute(session, () -> {
            session.close();
            if (session.getStatus() != GlobalStatus.Begin) {
                return false;
            }
            session.changeStatus(GlobalStatus.Rollbacking);
            return true;
        });
        if (shouldRollBack) {
            doGlobalRollback(session, false);
        }
        return session.getStatus();
    }

    /**
     * Drives the phase-two rollback of a global session, visiting its branches in
     * reverse sorted order.
     *
     * @param globalSession the global transaction to roll back
     * @param retrying      {@code true} when invoked from a retry task
     * @return {@code true} when the rollback flow completed; {@code false} when a
     *         branch could not be rolled back or branches are still registered
     * @throws TransactionException when rolling back a branch throws
     */
    public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
        boolean success = true;
        // Publish the "rollback started" event (end time not known yet).
        eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC, globalSession.getTransactionName(), globalSession.getBeginTime(), null, globalSession.getStatus()));
        if (!globalSession.isSaga()) {
            for (BranchSession branch : globalSession.getReverseSortedBranches()) {
                // A branch whose phase one failed has nothing to undo; just drop it.
                if (branch.getStatus() == BranchStatus.PhaseOne_Failed) {
                    globalSession.removeBranch(branch);
                    continue;
                }
                try {
                    BranchStatus outcome = branchRollback(globalSession, branch);
                    switch (outcome) {
                        case PhaseTwo_Rollbacked:
                            // Rolled back: the branch can be removed from the session.
                            globalSession.removeBranch(branch);
                            continue;
                        case PhaseTwo_RollbackFailed_Unretryable:
                            // Unretryable failure ends the global rollback as failed.
                            SessionHelper.endRollbackFailed(globalSession);
                            return false;
                        default:
                            // Any other status: queue for retry on the first attempt, then stop.
                            if (!retrying) {
                                globalSession.queueToRetryRollback();
                            }
                            return false;
                    }
                } catch (Exception ex) {
                    if (!retrying) {
                        globalSession.queueToRetryRollback();
                    }
                    throw new TransactionException(ex);
                }
            }
            // Re-read the session: if branches are still registered, the rollback is not finished.
            GlobalSession reloaded = SessionHolder.findGlobalSession(globalSession.getXid());
            if (reloaded != null && reloaded.hasBranch()) {
                return false;
            }
        } else {
            // Saga sessions are delegated to the saga core.
            success = getCore(BranchType.SAGA).doGlobalRollback(globalSession, retrying);
        }
        if (success) {
            SessionHelper.endRollbacked(globalSession);
            // Publish the "rollback finished" event with the end timestamp.
            eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC, globalSession.getTransactionName(), globalSession.getBeginTime(), System.currentTimeMillis(), globalSession.getStatus()));
        }
        return success;
    }
}

RM收到TC回滚通知后,通过DataSourceManager#branchRollback方法进行相应的处理,主要逻辑是通过全局事务xid和分支事务branchId从undo_log表中查询出undo_log,然后将其解析并遍历执行rollback补偿,最后将其从undo_log表中删除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
public class DataSourceManager extends AbstractResourceManager implements Initialize {

    /**
     * Rolls back one branch transaction on the data source registered under
     * {@code resourceId} by replaying its undo log.
     *
     * @return {@code PhaseTwo_Rollbacked} on success;
     *         {@code PhaseTwo_RollbackFailed_Unretryable} when the undo fails with
     *         code {@code BranchRollbackFailed_Unretriable};
     *         {@code PhaseTwo_RollbackFailed_Retryable} for any other
     *         {@code TransactionException}
     * @throws ShouldNeverHappenException when no data source is registered for
     *                                    {@code resourceId}
     */
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
        DataSourceProxy dataSourceProxy = get(resourceId);
        if (dataSourceProxy == null) {
            // Name the missing resource so the failure can be diagnosed from the log.
            throw new ShouldNeverHappenException("Failed to find resource on " + resourceId);
        }
        try {
            UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
        } catch (TransactionException te) {
            // Map the failure code onto the branch status reported to the caller.
            if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
            } else {
                return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
            }
        }
        return BranchStatus.PhaseTwo_Rollbacked;
    }
}
public abstract class AbstractUndoLogManager implements UndoLogManager {

    /**
     * Rolls back one branch by replaying its undo log inside a single local
     * transaction on a plain (non-proxied) connection.
     *
     * <p>Every undo-log row found for {@code branchId}/{@code xid} is decoded and
     * its SQL undo entries are executed in reverse order; the rows are then
     * deleted and the transaction committed. When no row exists, a marker row is
     * written through {@code insertUndoLogWithGlobalFinished} instead. If a row's
     * state does not allow undoing, the method returns without replaying.
     * A {@code SQLIntegrityConstraintViolationException} makes the whole attempt
     * start over on a fresh connection.
     *
     * @throws TransactionException a {@code BranchTransactionException} with code
     *         {@code BranchRollbackFailed_Retriable} when any other failure occurs
     */
    public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
        Connection conn = null;
        ResultSet rs = null;
        PreparedStatement selectPST = null;
        boolean originalAutoCommit = true;
        for (; ; ) {
            try {
                conn = dataSourceProxy.getPlainConnection();
                // Remember the auto-commit mode (restored in finally) and switch it
                // off so the compensation and the undo-log delete commit together.
                if (originalAutoCommit = conn.getAutoCommit()) {
                    conn.setAutoCommit(false);
                }
                // Look up the undo log of this branch.
                selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
                selectPST.setLong(1, branchId);
                selectPST.setString(2, xid);
                rs = selectPST.executeQuery();
                boolean exists = false;
                while (rs.next()) {
                    exists = true;
                    int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
                    if (!canUndo(state)) {
                        return;
                    }
                    String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
                    Map<String, String> context = parseContext(contextString);
                    byte[] rollbackInfo = getRollbackInfo(rs);
                    // Pick the parser named in the row's context, or the default one.
                    String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                    UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance() : UndoLogParserFactory.getInstance(serializer);
                    BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
                    try {
                        // Expose the serializer name for the duration of the replay.
                        setCurrentSerializer(parser.getName());
                        List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                        // Undo in the reverse of the order the statements were recorded.
                        if (sqlUndoLogs.size() > 1) {
                            Collections.reverse(sqlUndoLogs);
                        }
                        for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                            TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                            sqlUndoLog.setTableMeta(tableMeta);
                            AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(), sqlUndoLog);
                            // Execute the compensating statement.
                            undoExecutor.executeOn(conn);
                        }
                    } finally {
                        removeCurrentSerializer();
                    }
                }
                if (exists) {
                    // Compensation applied: the undo log is no longer needed.
                    deleteUndoLog(xid, branchId, conn);
                    conn.commit();
                } else {
                    insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                    conn.commit();
                }
                return;
            } catch (SQLIntegrityConstraintViolationException e) {
                // Deliberately empty: resources are released in finally and the
                // enclosing loop retries the whole undo.
            } catch (Throwable e) {
                if (conn != null) {
                    try {
                        conn.rollback();
                    } catch (SQLException rollbackEx) {
                        // Best effort; the original failure is reported below.
                    }
                }
                throw new BranchTransactionException(BranchRollbackFailed_Retriable, String.format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid, branchId, e.getMessage()), e);
            } finally {
                // Release each JDBC resource on its own, so that a failure while
                // closing one of them cannot leak the others (the connection above all).
                if (rs != null) {
                    try {
                        rs.close();
                    } catch (SQLException closeEx) {
                        // Best effort.
                    }
                }
                if (selectPST != null) {
                    try {
                        selectPST.close();
                    } catch (SQLException closeEx) {
                        // Best effort.
                    }
                }
                if (conn != null) {
                    try {
                        if (originalAutoCommit) {
                            conn.setAutoCommit(true);
                        }
                    } catch (SQLException closeEx) {
                        // Best effort; the connection is still closed below.
                    }
                    try {
                        conn.close();
                    } catch (SQLException closeEx) {
                        // Best effort.
                    }
                }
            }
        }
    }
}