事务调用原理

@Transactional生效的方法的调用,还是走AOP代理调用逻辑,即通过ReflectiveMethodInvocationproceed()调用逻辑,不过这里调用的拦截器链不是通过注解自定义配置的,而是在ProxyTransactionManagementConfiguration配置类中配置的TransactionInterceptor

1
2
3
4
5
6
7
8
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
public Object invoke(MethodInvocation invocation) throws Throwable {
// 获取代理对象的class属性
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// 在事务中执行目标方法,在这埋了一个钩子函数invocation::proceed,用来回调目标方法
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
}

这里解释事务的核心逻辑,首先通过事务属性主要是传播属性是否为嵌套事务等判断是否有必要创建事务,然后通过invocation.proceedWithInvocation()函数式接口回到ReflectiveMethodInvocationproceed()中去调用真正的目标方法,若调用目标方法抛出异常,则会走事务回滚逻辑,否则提交事务。

TransactionAttributeSource配置类中导入的,TransactionAttribute做切点匹配时已经解析完成其包含事务属性,PlatformTransactionManager是配置类中添加其包含数据源,连接点标识符joinpointIdentification是匹配事务时得到的并设置到事务属性的descriptor中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
// 获取在配置类中添加的事务属源对象,在创建代理进行匹配时将解析的事务属性赋值
TransactionAttributeSource tas = getTransactionAttributeSource();
// 获取解析后的事务属性信息,创建代理时已调用getTransactionAttribute,这里从解析后的缓存中获取
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 获取配置的事务管理器对象
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
// 从tx属性对象中获取出标注了@Transactionl的方法描述符
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { // 处理声明式事务
// 若有必要创建事务
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal;
try {
retVal = invocation.proceedWithInvocation(); // 调用钩子函数进行回调目标方法
} catch (Throwable ex) { // 抛出异常进行回滚处理
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
} finally {
cleanupTransactionInfo(txInfo); // 清空我们的线程变量中transactionInfo的值
}
commitTransactionAfterReturning(txInfo); // 提交事务
return retVal;
} else { // 编程式事务:(回调偏向)
final ThrowableHolder throwableHolder = new ThrowableHolder();
try {
Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
try {
return invocation.proceedWithInvocation();
} catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
} else {
throw new ThrowableHolderException(ex);
}
} else {
throwableHolder.throwable = ex;
return null;
}
} finally {
cleanupTransactionInfo(txInfo);
}
});
if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
}
return result;
} catch (ThrowableHolderException ex) {
throw ex.getCause();
} catch (TransactionSystemException ex2) {
if (throwableHolder.throwable != null) {
ex2.initApplicationException(throwableHolder.throwable);
}
throw ex2;
} catch (Throwable ex2) {
throw ex2;
}
}
}
}

若事务没有定义名称,则把连接点的名称定义成事务的名称,然后获取当前事务状态,并将事物状态和事物属性等信息封装成一个TransactionInfo对象便于后续使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// 若没定义名字,把连接点的名称定义成事务的名称
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {//获取一个事务状态
status = tm.getTransaction(txAttr);
}
}
// 把事物状态和事物属性等信息封装成一个TransactionInfo对象
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, String joinpointIdentification, @Nullable TransactionStatus status) {
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
txInfo.newTransactionStatus(status);
}
txInfo.bindToThread(); //把事务信息对象绑定到当前线程变量中
return txInfo;
}
}

在执行事务前会将上层事务状态,从事务信息线程本地变量中获取存入oldTransactionInfo,若为顶层事务该值肯定为null,但嵌套事务肯定不为null,当执行完用户代码逻辑后在finally中调用cleanupTransactionInfo从而调用restoreThreadLocalStatus将上层事务信息再返回线程本地变量中。

1
2
3
4
5
6
7
8
9
10
11
12
protected static final class TransactionInfo {
public void newTransactionStatus(@Nullable TransactionStatus status) {
this.transactionStatus = status;
}
private void bindToThread() {
this.oldTransactionInfo = transactionInfoHolder.get();
transactionInfoHolder.set(this);
}
private void restoreThreadLocalStatus() {
transactionInfoHolder.set(this.oldTransactionInfo);
}
}

首先New一个事务对象DataSourceTransactionObject,其实主要是看数据库连接持有者ConnectionHolder是否存在,然后判断是否已存在事务对象,判断依据是通过事务对象中的是否存在ConnectionHolder,以及ConnectionHolder中事务是否已经激活,若已经存在事务说明是嵌套事务,则走handleExistingTransaction单独的嵌套事务逻辑。若不存在说明是新事务,则根据事务的传播属性进行相关的逻辑处理。

当事务传播行为是MANDATORY直接抛出异常,当事务传播行为是REQUIREDREQUIRES_NEWNESTED将开启一个新事务,首先挂起当前事务,由于顶层事务当前还没创建事务不需要挂起,故suspend(null)传入null。newSynchronization允许开启同步事务,status构造事务状态对象并将事务信息封装进去,然后doBegin开启一个新事务,最后将当前事务信息绑定到线程本地变量中,为嵌套事务提供支持

事务传播行为类型 外部不存在事务 外部存在事务 备注
REQUIRED(默认) 开启新事务 融合到外部事务中 外层影响内层,内层影响外层
SUPPORTS 不开启新事务 融合到外部事务中 外层影响内层,内层影响外层
REQUIRES_NEW 开启新事务 挂起外部事务,创建新的事务 外层不影响内层,内层影响外层
NOT_SUPPORTED 不开启新事务 挂起外部事务,不开启事务 外层不影响内层,内层影响外层
且内层没有事务,即使异常也不滚
NEVER 不开启新事务 抛出异常 不常用
MANDATORY 抛出异常 融合到外部事务中 外层影响内层,内层影响外层
NESTED 开启新事务 融合到外部事务中,SavePoint机制 外层影响内层,内层不会影响外层
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
Object transaction = doGetTransaction(); // 尝试获取一个事务对象
// Cache debug flag to avoid repeated checks.
boolean debugEnabled = logger.isDebugEnabled();
if (definition == null) { // 判断从上一个方法传递进来的事务属性是不是为空
definition = new DefaultTransactionDefinition();
}
if (isExistingTransaction(transaction)) { // 判断是不是已经存在了事务对象(事务嵌套)
return handleExistingTransaction(definition, transaction, debugEnabled);//处理存在的事务
}

if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {//检查事务设置的超时时间
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// 若当前的事务属性为PROPAGATION_MANDATORY表示必须运行在事务中,若当前没有事务就抛出异常
// 由于isExistingTransaction(transaction)跳过了这里,说明当前是不存在事务的,那么就会抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
// PROPAGATION_REQUIRED:当前存在事务就加入到当前的事务,没有就新开一个
// PROPAGATION_REQUIRES_NEW:新开一个事务,若当前存在事务就挂起当前事务
// PROPAGATION_NESTED: 表示若当前正有一个事务在运行中,则该方法应该运行在一个嵌套的事务中,
// 被嵌套的事务可独立于封装事务进行提交或回滚(保存点),若封装事务不存在,行为就像PROPAGATION_REQUIRES_NEW
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 经过上面的isExistingTransaction(transaction)判断当前是不存在事务的,故在此处是挂起当前事务传递一个null进去
SuspendedResourcesHolder suspendedResources = suspend(null);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); // 可进行同步
// 构造事务状态对象,newTransaction=true代表是一个新事务
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition); //开启一个新的事物
prepareSynchronization(status, definition);// 把当前的事务信息绑定到线程本地变量中
return status;
} catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
} else { //创建一个空的事务
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
}

TransactionSynchronizationManager事务同步管理器对象,该类中都是局部线程变量,用来保存当前事务的信息,第一次从这里去线程变量中获取事务连接持有器对象,通过数据源为key去获取,由于第一次进来开始事务,事务同步管理器中没有被存放,所以此时获取出来的conHolder为null。

doBegin中首先判断事务对象有没有数据库连接持有器,对于顶层事务明显是没有,则从数据源中获取一个Connection连接,然后新建一个将ConnectionHolder并将其设置其中,将synchronizedWithTransaction属性设置为true,通过将autoCommit属性这是为false来开启一个事务,将ConnectionHolder的transactionActive属性设置为true,为嵌套事务做准备,最后将该数据源的该连接存入事务同步管理器中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean {
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject(); // 创建一个数据源事务对象
txObject.setSavepointAllowed(isNestedTransactionAllowed()); // 是否允许当前事务设置保持点
ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject; //返回事务对象
}
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
// 若第一次进来开始事务,txObject.hasConnectionHolder()返回的null,表示不存在事务
return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; // 强制转换事物对象
Connection con = null;
try {
// 判断事务对象没有数据库连接持有器
if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection(); // 若没有,则通过数据源获取一个数据库连接对象
// 把数据库连接包装成一个ConnectionHolder对象,然后设置到txObject对象中去
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
// 标记当前的连接是一个同步事务
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
// 设置isReadOnly、隔离级别
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// setAutoCommit默认为true,即每条SQL语句在各自的一个事务中执行。
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false); // 开启事务
}
prepareTransactionalConnection(con, definition); // 判断事务为只读事务
txObject.getConnectionHolder().setTransactionActive(true); // 设置事务激活
int timeout = determineTimeout(definition); // 设置事务超时时间
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// 绑定数据源和连接到同步管理器上,把数据源作为key,数据库连接作为value,设置到线程本地变量中
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
} catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource()); // 释放数据库连接
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
if (status.isNewSynchronization()) {
//绑定事务激活
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
//当前事务的隔离级别
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ? definition.getIsolationLevel() : null);
//是否为只读事务
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
//事务的名称
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
TransactionSynchronizationManager.initSynchronization();
}
}
}

若执行异常则回滚事务,否则提交事务,不论是提交事务还是回滚事务之前都做了一个判断,当status.isNewTransaction()false时不会去执行响应的提交或回滚逻辑,主要是为了支持融合事务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
triggerBeforeCompletion(status);
if (status.hasSavepoint()) {
status.rollbackToHeldSavepoint();
} else if (status.isNewTransaction()) {
doRollback(status);
} else {
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
doSetRollbackOnly(status);
}
}
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
} catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
if (unexpectedRollback) {
throw new UnexpectedRollbackException("Transaction rolled back because it has been marked as rollback-only");
}
} finally {
cleanupAfterCompletion(status);
}
}
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
if (status.hasSavepoint()) {
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
} else if (status.isNewTransaction()) {
unexpectedRollback = status.isGlobalRollbackOnly();
doCommit(status);
} else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
if (unexpectedRollback) {
throw new UnexpectedRollbackException("Transaction silently rolled back because it has been marked as rollback-only");
}
} catch (UnexpectedRollbackException ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
} catch (TransactionException ex) {
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
} else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
} catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
try {
triggerAfterCommit(status);
} finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
} finally {
cleanupAfterCompletion(status);
}
}
}
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean {
protected void doRollback(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
try {
con.rollback();
} catch (SQLException ex) {
throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
}
}
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
try {
con.commit();
} catch (SQLException ex) {
throw new TransactionSystemException("Could not commit JDBC transaction", ex);
}
}
}

存在嵌套事务

要触发嵌套事务,若调用的是本类方法一点要保证将动态代理暴露在线程中并结合AopContext使用。嵌套事务的处理同样是根据事务的传播性来处理的。若事务传播性为NEVER则嵌套事务直接抛出异常;

若事务传播性为REQUIRES_NEW则挂起外部事务,然后创建一个新的事务,并将旧的事务信息暂存,执行事务和顶层事务没什么区别;

若事务传播性为SUPPORTS则融合到外部事务中,返回的事务状态中newTransactionnewSynchronization属性都为false,事务的newConnectionHoldermustRestoreAutoCommit属性也为默认值false,由于newSynchronization属性为false这内部事务执行完后提交事务triggerAfterCommit不会做任务处理,当内部事务执行完毕后交由外部事务处理相关提交事务和回滚事务逻辑;

若事务传播性为NOT_SUPPORTED则挂起外部事务,返回的事务状态transaction属性为nullnewTransaction属性为false,newSynchronization属性为true,故执行完后也不会有事务的提交。

若事务传播性为NESTED则融合到外部事务中,且创建一个保存点,返回的事务状态中,newTransactionnewSynchronization属性为false。这里的内层不影响外层指的是可将内层异常捕获内层事务回滚,外层事务正常提交,若是其他几种融合到外部事务的传播属性捕获异常会导致程序异常,因为其在回滚时会通过doSetRollbackOnly(status)将事务设置为只能回滚的,若捕获内层异常,在外层事务提交时将抛出异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException {
// NEVER存在外部事务则抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation 'never'");
}
// NOT_SUPPORTED存在外部事务则挂起外部事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
// 挂起存在的事物
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 创建一个新的非事物状态,保存了上一个存在事物状态的属性
return prepareTransactionStatus(definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}

// REQUIRES_NEW存在外部事务则挂起外部事务,创建新的事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
// 挂起已经存在的事物
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
// 是否需要新开启同步
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
//创建一个新的事物状态(包含了挂起的事务的属性)
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//开启新的事物
doBegin(transaction, definition);
//把新的事物状态设置到当前的线程变量中去
prepareSynchronization(status, definition);
return status;
} catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}

// NESTED存在外部事务则融合到外部事务中,应用层面和REQUIRED一样,源码层面
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException("Transaction manager does not allow nested transactions by default - specify 'nestedTransactionAllowed' property with value 'true'");
}
// 是否支持保存点:非JTA事务走这个分支。AbstractPlatformTransactionManager默认是true,JtaTransactionManager复写了该方法false,DataSourceTransactionManager没有复写,还是true,
if (useSavepointForNestedTransaction()) {
//开启一个新的事物
DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
// 为事物设置一个回退点,savepoint可以在一组事务中,设置一个回滚点,点以上的不受影响,点以下的回滚。即外层影响内层,内层不会影响外层
status.createAndHoldSavepoint();
return status;
} else { // JTA事务走这个分支,创建新事务
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
}

挂起外部事务其实就是将外部事务的从事务同步管理器线程本地变量中获取到封装成一个SuspendedResourcesHolder,然后清空事务同步管理器中的数据。顶层事务这里返回null

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
// 嵌套事务 已经激活,在doBegin后激活
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
suspendedResources = doSuspend(transaction);
}
//获取已存在的事物的名称
String name = TransactionSynchronizationManager.getCurrentTransactionName();
//清空线程变量的
TransactionSynchronizationManager.setCurrentTransactionName(null);
//获取出只读事务的名称
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
//清空线程变量的
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
//获取已存在事务的隔离级别
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
//清空隔离级别
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
//获取激活标志
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
//清空标记Actual=外层事务
TransactionSynchronizationManager.setActualTransactionActive(false);
//把上诉从线程变量中获取出来的存在事务属性封装为挂起的事务属性返回出去
return new SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
} catch (RuntimeException | Error ex) {
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
} else if (transaction != null) {
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
} else {
return null;
}
}
}
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean {
protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
}

若为融合事务status.isNewSynchronization()一定为false,则prepareSynchronization中不会做任何事情。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
protected final DefaultTransactionStatus prepareTransactionStatus(TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
prepareSynchronization(status, definition);
return status;
}
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
if (status.isNewSynchronization()) {
//绑定事务激活
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
//当前事务的隔离级别
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ? definition.getIsolationLevel() : null);
//是否为只读事务
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
//事务的名称
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
TransactionSynchronizationManager.initSynchronization();
}
}
}

在执行完嵌套事务的内层事务后在finally中会调用cleanupAfterCompletion方法,并将上层事务resume,即将事务的相关信息放回事务同步管理器中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
status.setCompleted();
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.clear();
}
if (status.isNewTransaction()) {
doCleanupAfterCompletion(status.getTransaction());
}
if (status.getSuspendedResources() != null) {
if (status.isDebug()) {
logger.debug("Resuming suspended transaction after completion of inner transaction");
}
Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
}
}
protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
throws TransactionException {

if (resourcesHolder != null) {
Object suspendedResources = resourcesHolder.suspendedResources;
if (suspendedResources != null) {
doResume(transaction, suspendedResources);
}
List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
if (suspendedSynchronizations != null) {
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
doResumeSynchronization(suspendedSynchronizations);
}
}
}
protected void doResume(@Nullable Object transaction, Object suspendedResources) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}