Seata集成原理

Seata的集成包括两部分:一部分是对@GlobalTransactional、@GlobalLock分布式事务注解的支持,另一部分是对分支事务用到的一些RPC组件的处理,如RestTemplate、Feign,以及添加SeataHandlerInterceptor等。

对@GlobalTransactional、@GlobalLock分布式事务注解的支持,是通过SeataAutoConfiguration配置类中注入的GlobalTransactionScanner来完成的。该类实现了InitializingBean接口,在其初始化时会通过afterPropertiesSet方法初始化资源管理器RMClient和事务管理器TMClient。该类还继承了AbstractAutoProxyCreator抽象类,在Bean初始化时通过后置处理器,为类上或方法上带有@GlobalTransactional、@GlobalLock注解的Bean创建代理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
// Auto-configuration excerpt. It applies unless the "enabled" property under
// StarterConstants.SEATA_PREFIX is set to a value other than "true" (matchIfMissing = true).
@ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties")
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
@Configuration
@EnableConfigurationProperties({SeataProperties.class})
public class SeataAutoConfiguration {
// Registers the GlobalTransactionScanner, built from the configured application id,
// the transaction service group and the FailureHandler. Skipped when a
// GlobalTransactionScanner bean is already defined.
@Bean
@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
@ConditionalOnMissingBean(GlobalTransactionScanner.class)
public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
}
}
// Excerpt of the bean that initializes the Seata TM/RM clients and proxies beans that use
// TCC or the @GlobalTransactional / @GlobalLock annotations. Fields referenced below
// (initialized, interceptor, PROXYED_SET, ...) are declared outside this excerpt.
public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
// Subscribes to changes of DISABLE_GLOBAL_TRANSACTION, then initializes the clients
// at most once unless global transactions are currently disabled.
public void afterPropertiesSet() { // initialize TMClient and RMClient
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)this);
if (disableGlobalTransaction) {
return;
}
if (initialized.compareAndSet(false, true)) {
initClient();
}
}
// Validates applicationId/txServiceGroup, initializes both clients and registers the
// shutdown hook. Throws IllegalArgumentException when either identifier is null or empty.
private void initClient() {
if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
}
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey); // initialize TMClient
RMClient.init(applicationId, txServiceGroup); // initialize RMClient
registerSpringShutdownHook();
}
// Chooses an interceptor for the bean (TCC or global-transaction) and proxies the bean.
// Bean names already recorded in PROXYED_SET, and beans without the annotations, are
// returned untouched. Any Exception is rethrown wrapped in a RuntimeException.
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
try { // AOP logic; per the original note this is called from the bean post-processing step to attach the global-transaction interceptor
synchronized (PROXYED_SET) {
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) { // TCC mode
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor);
} else { // beans using @GlobalTransactional or @GlobalLock
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) {
return bean; // neither annotation found on the target class or its interfaces: skip creating the AOP proxy
}
if (interceptor == null) {
// The global-transaction interceptor is created lazily and reused for later beans.
if (globalTransactionalInterceptor == null) {
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
}
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
// The bean is already an AOP proxy: add the advisors to its existing advised support
// at index 0 instead of wrapping it again.
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
// NOTE(review): the original comment on getAdvicesAndAdvisorsForBean was truncated.
// Presumably this class overrides it to return the interceptor chosen above; the
// override is not shown in this excerpt - confirm.
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}
// Returns true when any non-null class in the array carries @GlobalTransactional on the
// class itself, or has a public method annotated with @GlobalTransactional or @GlobalLock.
private boolean existsAnnotation(Class<?>[] classes) {
if (CollectionUtils.isNotEmpty(classes)) {
for (Class<?> clazz : classes) {
if (clazz == null) {
continue;
}
GlobalTransactional trxAnno = clazz.getAnnotation(GlobalTransactional.class);
if (trxAnno != null) { // the class itself is annotated with @GlobalTransactional
return true;
}
Method[] methods = clazz.getMethods();
for (Method method : methods) {
trxAnno = method.getAnnotation(GlobalTransactional.class);
if (trxAnno != null) { // the method is annotated with @GlobalTransactional
return true;
}

GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);
if (lockAnno != null) { // the method is annotated with @GlobalLock
return true;
}
}
}
}
return false;
}
}

带有@GlobalTransactional注解的方法在执行时会被GlobalTransactionalInterceptor拦截,并调用其invoke方法,最终通过TransactionalTemplate的execute方法来执行全局事务方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// Excerpt of the method interceptor that routes @GlobalTransactional / @GlobalLock methods
// through the transaction and lock templates. Fields such as disable, degradeCheck,
// failureHandler and EVENT_BUS are declared outside this excerpt.
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {
private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();
private final GlobalLockTemplate globalLockTemplate = new GlobalLockTemplate();
// Resolves the most specific target method and, unless interception is disabled or degraded,
// dispatches to handleGlobalTransaction or handleGlobalLock depending on which annotation
// is present. Methods declared by Object and unannotated methods simply proceed.
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
// Interception is off when disabled outright, or when degrade checking is on and the
// degrade counter has reached its allowed limit.
boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
if (!localDisable) {
if (globalTransactionalAnnotation != null) { // method carries @GlobalTransactional
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) { // method carries @GlobalLock
return handleGlobalLock(methodInvocation, globalLockAnnotation);
}
}
}
return methodInvocation.proceed();
}
// Runs the invocation through transactionalTemplate with a TransactionInfo built from the
// annotation. ExecutionException codes are mapped to failureHandler callbacks and the
// original exception or cause is rethrown. When degradeCheck is on, a DegradeCheckEvent
// carrying the success flag is always posted.
Object handleGlobalTransaction(final MethodInvocation methodInvocation, final GlobalTransactional globalTrxAnno) throws Throwable {
boolean succeed = true;
try {
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
public String name() {
String name = globalTrxAnno.name();
if (!StringUtils.isNullOrEmpty(name)) {
return name; // transaction name declared on the @GlobalTransactional annotation
}
return formatMethod(methodInvocation.getMethod()); // otherwise the formatted signature of the annotated method
}
@Override
public TransactionInfo getTransactionInfo() { // reset the value of timeout
int timeout = globalTrxAnno.timeoutMills(); // global transaction timeout
// A non-positive or default annotation value falls back to the configured default.
if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
timeout = defaultGlobalTransactionTimeout;
}
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTimeOut(timeout);
transactionInfo.setName(name());
transactionInfo.setPropagation(globalTrxAnno.propagation());
transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());
transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());
// Collect rollback and no-rollback rules, by class and by class name, in declaration order.
Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (String rbRule : globalTrxAnno.rollbackForClassName()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
transactionInfo.setRollbackRules(rollbackRules);
return transactionInfo;
}
});
} catch (TransactionalExecutor.ExecutionException e) {
TransactionalExecutor.Code code = e.getCode();
switch (code) {
case RollbackDone:
throw e.getOriginalException();
case BeginFailure:
// Only begin and commit failures mark the invocation as unsuccessful for degrade checking.
succeed = false;
failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case CommitFailure:
succeed = false;
failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case RollbackFailure:
failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
case RollbackRetrying:
failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
default:
throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
}
} finally {
if (degradeCheck) {
EVENT_BUS.post(new DegradeCheckEvent(succeed));
}
}
}
}

TransactionalTemplate中是全局事务的核心逻辑:首先获取当前全局事务,若不为空则说明是分支事务,并将事务角色设置为GlobalTransactionRole.Participant;然后根据事务的传播属性做相应的处理,若为新事务则创建角色为GlobalTransactionRole.Launcher的新事务,并发送beginTransaction请求给事务协调者TC以获取全局事务的Xid。接着执行业务逻辑,业务逻辑中若有分支事务则对分支事务进行处理;若发生异常,则由Launcher角色向事务协调者TC发送回滚请求,从而通过undo_log回滚所有分支事务;若事务成功,也是由Launcher角色向事务协调者TC发送提交请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// Excerpt of the template that runs a business callback inside a global transaction.
public class TransactionalTemplate {
// Runs the business callback according to the propagation setting in its TransactionInfo.
// Depending on that setting it suspends an existing transaction, runs without one, or
// begins and commits a transaction around the callback. A Throwable from the callback is
// passed to completeTransactionAfterThrowing and rethrown. A suspended transaction is
// resumed in the outer finally block.
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. Get transactionInfo
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// Look up the current global transaction; per the original note, a non-null result has the role GlobalTransactionRole.Participant
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
Propagation propagation = txInfo.getPropagation(); // propagation configured for this transaction
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
switch (propagation) { // handle each propagation setting
case NOT_SUPPORTED: // suspend the transaction if one exists
if (existingTransaction(tx)) { // helper not shown; per the original note it tests whether tx is null
suspendedResourcesHolder = tx.suspend();
}
return business.execute(); // then run the business method outside any transaction
case REQUIRES_NEW: // suspend an existing transaction, then start a new one
if (existingTransaction(tx)) { // helper not shown; per the original note it tests whether tx is null
suspendedResourcesHolder = tx.suspend();
tx = GlobalTransactionContext.createNew();
}
break; // continue with the new transaction
case SUPPORTS: // without a transaction, execute non-transactionally
if (notExistingTransaction(tx)) { // helper not shown; per the original note it tests whether tx is null
return business.execute();
}
break; // a transaction exists: continue within it
case REQUIRED: // use the current transaction if present; otherwise a new one is created below
break;
case NEVER: // throw if a transaction exists
if (existingTransaction(tx)) { // helper not shown; per the original note it tests whether tx is null
throw new TransactionException(String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s", tx.getXid()));
} else { // otherwise run the business method directly
return business.execute();
}
case MANDATORY: // throw if no transaction exists
if (notExistingTransaction(tx)) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
break; // continue and execute within the current transaction
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
if (tx == null) { // no current global transaction: create a new one (role GlobalTransactionRole.Launcher per the original note)
tx = GlobalTransactionContext.createNew();
}
// set current tx config to holder
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {// per the original note: a Launcher sends the beginTransaction request to the TC, any other role does nothing
beginTransaction(txInfo, tx); // begin the global transaction
Object rs;
try {// Do Your Business
rs = business.execute(); // run the business logic
} catch (Throwable ex) { // business exception that may require a rollback
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
commitTransaction(tx); // commit the global transaction
return rs;
} finally { //5. clear
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion();
cleanUp();
}
} finally { // If the transaction is suspended, resume it.
if (suspendedResourcesHolder != null) {
tx.resume(suspendedResourcesHolder);
}
}
}
}
/**
 * Returns a handle on the global transaction bound to the current context.
 *
 * @return {@code null} when {@code RootContext} holds no XID; otherwise a
 *         {@code DefaultGlobalTransaction} for that XID with status {@code Begin}
 *         and role {@code Participant}
 */
public static GlobalTransaction getCurrent() {
    final String boundXid = RootContext.getXID();
    return boundXid == null
            ? null
            : new DefaultGlobalTransaction(boundXid, GlobalStatus.Begin, GlobalTransactionRole.Participant);
}

数据源代理

通过@EnableAutoDataSourceProxy中导入的AutoDataSourceProxyRegistrar类,完成SeataDataSourceBeanPostProcessor和SeataAutoDataSourceProxyCreator这两个类的注册。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Type-level annotation that imports AutoDataSourceProxyRegistrar.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(AutoDataSourceProxyRegistrar.class)
@Documented
public @interface EnableAutoDataSourceProxy {
// Passed to SeataAutoDataSourceProxyCreator by the registrar; defaults to false.
boolean useJdkProxy() default false;
// Entries to exclude from proxying (the consumers match them against bean class names); empty by default.
String[] excludes() default {};
String dataSourceProxyMode() default "AT"; // defaults to AT mode
}
/**
 * Registers the data-source post-processor and the auto proxy creator, configured from the
 * attributes of {@code @EnableAutoDataSourceProxy} on the importing class.
 */
public class AutoDataSourceProxyRegistrar implements ImportBeanDefinitionRegistrar {
    private static final String ATTRIBUTE_KEY_USE_JDK_PROXY = "useJdkProxy";
    private static final String ATTRIBUTE_KEY_EXCLUDES = "excludes";
    private static final String ATTRIBUTE_KEY_DATA_SOURCE_PROXY_MODE = "dataSourceProxyMode";
    public static final String BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR = "seataDataSourceBeanPostProcessor";
    public static final String BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR = "seataAutoDataSourceProxyCreator";

    /**
     * Reads the annotation attributes and registers each of the two bean definitions,
     * unless a definition with the same bean name is already present in the registry.
     */
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        Map<String, Object> attrs = importingClassMetadata.getAnnotationAttributes(EnableAutoDataSourceProxy.class.getName());
        boolean jdkProxy = Boolean.parseBoolean(attrs.get(ATTRIBUTE_KEY_USE_JDK_PROXY).toString());
        String[] excludedNames = (String[]) attrs.get(ATTRIBUTE_KEY_EXCLUDES);
        String proxyMode = (String) attrs.get(ATTRIBUTE_KEY_DATA_SOURCE_PROXY_MODE);

        // Post-processor definition: constructor takes (excludes, dataSourceProxyMode).
        if (!registry.containsBeanDefinition(BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR)) {
            BeanDefinitionBuilder postProcessorBuilder = BeanDefinitionBuilder.genericBeanDefinition(SeataDataSourceBeanPostProcessor.class);
            postProcessorBuilder.addConstructorArgValue(excludedNames);
            postProcessorBuilder.addConstructorArgValue(proxyMode);
            registry.registerBeanDefinition(BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR, postProcessorBuilder.getBeanDefinition());
        }

        // Proxy-creator definition: constructor takes (useJdkProxy, excludes, dataSourceProxyMode).
        if (!registry.containsBeanDefinition(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)) {
            BeanDefinitionBuilder proxyCreatorBuilder = BeanDefinitionBuilder.genericBeanDefinition(SeataAutoDataSourceProxyCreator.class);
            proxyCreatorBuilder.addConstructorArgValue(jdkProxy);
            proxyCreatorBuilder.addConstructorArgValue(excludedNames);
            proxyCreatorBuilder.addConstructorArgValue(proxyMode);
            registry.registerBeanDefinition(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR, proxyCreatorBuilder.getBeanDefinition());
        }
    }
}

除了对@GlobalTransactional注解的支持之外,还需要为数据源DataSource对象创建DataSourceProxy代理。该逻辑由SeataAutoConfiguration配置类中的内部配置类SeataDataSourceConfiguration来完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Nested configuration for data-source proxying. It applies unless the
// enableAutoDataSourceProxy / enable-auto-data-source-proxy property under
// StarterConstants.SEATA_PREFIX is set to a value other than "true" (matchIfMissing = true).
@Configuration
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = {"enableAutoDataSourceProxy", "enable-auto-data-source-proxy"}, havingValue = "true", matchIfMissing = true)
static class SeataDataSourceConfiguration {
// Post-processor built from the configured excludes and proxy mode; skipped when a
// SeataDataSourceBeanPostProcessor bean already exists.
@Bean(BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR)
@ConditionalOnMissingBean(SeataDataSourceBeanPostProcessor.class)
public SeataDataSourceBeanPostProcessor seataDataSourceBeanPostProcessor(SeataProperties seataProperties) {
return new SeataDataSourceBeanPostProcessor(seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
}
// Auto proxy creator built from the JDK-proxy flag, excludes and proxy mode; skipped when a
// SeataAutoDataSourceProxyCreator bean already exists.
@Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
@ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(), seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
}
}

首先通过SeataDataSourceBeanPostProcessor后置处理器,在DataSource初始化完成后创建一个DataSourceProxy代理,并将其缓存到dataSourceProxyMap中。然后通过AbstractAutoProxyCreator的子类SeataAutoDataSourceProxyCreator为所有的DataSource真正地创建代理;在执行DataSource的方法时会调用SeataAutoDataSourceProxyAdvice的invoke方法,从dataSourceProxyMap中获取对应的已缓存的代理对象,进而调用代理对象中的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/**
 * Bean post-processor that registers every non-excluded {@code DataSource} bean with
 * {@code DataSourceProxyHolder} once the bean has been initialized.
 */
public class SeataDataSourceBeanPostProcessor implements BeanPostProcessor {
    private final List<String> excludes;
    private final BranchType dataSourceProxyMode;

    /**
     * @param excludes            class names of data sources that must not be registered
     * @param dataSourceProxyMode "XA" (case-insensitive) selects XA; any other value selects AT
     */
    public SeataDataSourceBeanPostProcessor(String[] excludes, String dataSourceProxyMode) {
        this.excludes = Arrays.asList(excludes);
        if (BranchType.XA.name().equalsIgnoreCase(dataSourceProxyMode)) {
            this.dataSourceProxyMode = BranchType.XA;
        } else {
            this.dataSourceProxyMode = BranchType.AT;
        }
    }

    /**
     * Registers the data source with the holder unless its class name is excluded.
     *
     * @return the target data source when the bean is itself a {@code SeataDataSourceProxy},
     *         otherwise the bean unchanged
     */
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (!(bean instanceof DataSource)) {
            return bean;
        }
        boolean excluded = excludes.contains(bean.getClass().getName());
        if (!excluded) {
            // Not excluded: create (or look up) the proxy and keep it in the holder.
            DataSourceProxyHolder.get().putDataSource((DataSource) bean, dataSourceProxyMode);
        }
        if (bean instanceof SeataDataSourceProxy) {
            SeataDataSourceProxy alreadyProxied = (SeataDataSourceProxy) bean;
            return alreadyProxied.getTargetDataSource();
        }
        return bean;
    }
}
/**
 * Excerpt of the holder that maps each original {@code DataSource} to its Seata proxy.
 */
public class DataSourceProxyHolder {
    private static final int MAP_INITIAL_CAPACITY = 8;
    private ConcurrentHashMap<DataSource, SeataDataSourceProxy> dataSourceProxyMap;

    /**
     * Returns the proxy for the given data source, creating and caching one if absent.
     *
     * @param dataSource          a plain data source or an existing Seata proxy
     * @param dataSourceProxyMode branch type the proxy must have
     * @return the argument itself when it is already a proxy of the requested branch type;
     *         otherwise the cached or newly created proxy for the underlying data source
     */
    public SeataDataSourceProxy putDataSource(DataSource dataSource, BranchType dataSourceProxyMode) {
        DataSource underlying = dataSource;
        if (dataSource instanceof SeataDataSourceProxy) {
            SeataDataSourceProxy existingProxy = (SeataDataSourceProxy) dataSource;
            if (dataSourceProxyMode == existingProxy.getBranchType()) {
                return existingProxy;
            }
            // Branch type differs: key the cache on the data source the proxy wraps.
            underlying = existingProxy.getTargetDataSource();
        }
        return CollectionUtils.computeIfAbsent(this.dataSourceProxyMap, underlying, BranchType.XA == dataSourceProxyMode ? DataSourceProxyXA::new : DataSourceProxy::new);
    }
}
public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator { // automatically creates proxies for DataSource beans
// Stores the excludes, builds the introduction advisor around SeataAutoDataSourceProxyAdvice,
// and enables target-class proxying unless useJdkProxy is true.
// The excludes and advisor fields are declared outside this excerpt.
public SeataAutoDataSourceProxyCreator(boolean useJdkProxy, String[] excludes, String dataSourceProxyMode) {
this.excludes = Arrays.asList(excludes);
this.advisor = new DefaultIntroductionAdvisor(new SeataAutoDataSourceProxyAdvice(dataSourceProxyMode));
setProxyTargetClass(!useJdkProxy);
}
// Every bean that is not skipped receives the same single advisor.
protected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource customTargetSource) throws BeansException {
return new Object[]{advisor};
}
// A bean is proxied via SeataAutoDataSourceProxyAdvice only when beanClass is a DataSource type,
// is not already a SeataProxy type, and is not listed in excludes; otherwise it is skipped.
protected boolean shouldSkip(Class<?> beanClass, String beanName) {
return !DataSource.class.isAssignableFrom(beanClass) || SeataProxy.class.isAssignableFrom(beanClass) || excludes.contains(beanClass.getName());
}
}
/**
 * Method interceptor that redirects {@code DataSource} calls to the matching Seata proxy
 * held by {@code DataSourceProxyHolder}, and introduces the {@code SeataProxy} marker interface.
 */
public class SeataAutoDataSourceProxyAdvice implements MethodInterceptor, IntroductionInfo {
    private final BranchType dataSourceProxyMode;
    private final Class<? extends SeataDataSourceProxy> dataSourceProxyClazz;

    /**
     * @param dataSourceProxyMode "AT" or "XA", case-insensitive
     * @throws IllegalArgumentException for any other mode
     */
    public SeataAutoDataSourceProxyAdvice(String dataSourceProxyMode) {
        final BranchType resolvedMode;
        final Class<? extends SeataDataSourceProxy> resolvedProxyClass;
        if (BranchType.AT.name().equalsIgnoreCase(dataSourceProxyMode)) {
            resolvedMode = BranchType.AT;
            resolvedProxyClass = DataSourceProxy.class;
        } else if (BranchType.XA.name().equalsIgnoreCase(dataSourceProxyMode)) {
            resolvedMode = BranchType.XA;
            resolvedProxyClass = DataSourceProxyXA.class;
        } else {
            throw new IllegalArgumentException("Unknown dataSourceProxyMode: " + dataSourceProxyMode);
        }
        this.dataSourceProxyMode = resolvedMode;
        this.dataSourceProxyClazz = resolvedProxyClass;
        // The resolved mode also becomes the default branch type of RootContext.
        RootContext.setDefaultBranchType(this.dataSourceProxyMode);
    }

    /**
     * Delegates to the Seata proxy when a global lock is required or the context's branch type
     * matches this advice, and the proxy class declares the invoked method; otherwise proceeds
     * with the original invocation.
     */
    public Object invoke(MethodInvocation invocation) throws Throwable {
        boolean proxyApplies = RootContext.requireGlobalLock() || dataSourceProxyMode == RootContext.getBranchType();
        if (!proxyApplies) {
            return invocation.proceed();
        }
        Method invoked = invocation.getMethod();
        Object[] callArgs = invocation.getArguments();
        Method proxyMethod = BeanUtils.findDeclaredMethod(dataSourceProxyClazz, invoked.getName(), invoked.getParameterTypes());
        if (proxyMethod == null) {
            return invocation.proceed();
        }
        SeataDataSourceProxy proxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis(), dataSourceProxyMode);
        // Call the same-named method on the Seata proxy instead of the raw data source.
        return proxyMethod.invoke(proxy, callArgs);
    }

    /** Exposes {@code SeataProxy} as the only introduced interface. */
    public Class<?>[] getInterfaces() {
        return new Class[]{SeataProxy.class};
    }
}

DataSourceProxy中创建了Connection的代理对象ConnectionProxy,ConnectionProxy重写了事务的提交、回滚等方法,且在其超类AbstractConnectionProxy中为Statement和PreparedStatement也生成了代理对象StatementProxy和PreparedStatementProxy。这些类中做了相应的扩展,从而实现了分布式事务的相关逻辑。由此也可以知道,Seata的AT模式是依赖于数据库本地事务的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
 * Excerpt of the data-source proxy: connections handed out are wrapped in {@code ConnectionProxy}.
 */
public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
    /**
     * Obtains a connection from the target data source and wraps it.
     *
     * @throws SQLException if the target data source cannot provide a connection
     */
    public ConnectionProxy getConnection() throws SQLException {
        return new ConnectionProxy(this, targetDataSource.getConnection());
    }
}
// Excerpt of the connection proxy that overrides commit, rollback and setAutoCommit.
// doCommit, report, context and LOCK_RETRY_POLICY are declared outside this excerpt.
public class ConnectionProxy extends AbstractConnectionProxy {
// Runs doCommit through LOCK_RETRY_POLICY. On SQLException the connection is rolled back
// (only when a target connection exists and auto-commit is off) and the exception is
// rethrown; any other Exception is wrapped in a SQLException.
public void commit() throws SQLException {
try {
LOCK_RETRY_POLICY.execute(() -> {
doCommit();
return null;
});
} catch (SQLException e) {
if (targetConnection != null && !getAutoCommit()) {
rollback();
}
throw e;
} catch (Exception e) {
throw new SQLException(e);
}
}
// Rolls back the target connection, calls report(false) when inside a global transaction
// with a registered branch, and finally resets the connection context.
public void rollback() throws SQLException {
targetConnection.rollback();
if (context.inGlobalTransaction() && context.isBranchRegistered()) {
report(false);
}
context.reset();
}
// When auto-commit is being switched on while currently off, and the context is in a global
// transaction or requires a global lock, doCommit runs before the flag reaches the target connection.
public void setAutoCommit(boolean autoCommit) throws SQLException {
if ((context.inGlobalTransaction() || context.isGlobalLockRequire()) && autoCommit && !getAutoCommit()) {
doCommit();
}
targetConnection.setAutoCommit(autoCommit);
}
}
// Excerpt of the base connection proxy: statements created through it are wrapped in
// StatementProxy / PreparedStatementProxy.
public abstract class AbstractConnectionProxy implements Connection {
// Creates a statement on the target connection and wraps it.
public Statement createStatement() throws SQLException {
Statement targetStatement = getTargetConnection().createStatement();
return new StatementProxy(this, targetStatement);
}
// Prepares the SQL on the target connection and wraps the result. In AT branch mode, when
// the SQL is recognized as exactly one INSERT statement, the statement is prepared with the
// table's primary-key column names; every other case uses the plain prepareStatement(sql).
public PreparedStatement prepareStatement(String sql) throws SQLException {
String dbType = getDbType();
PreparedStatement targetPreparedStatement = null;
if (BranchType.AT == RootContext.getBranchType()) {
List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(), sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
// NOTE(review): presumably the key column names are passed so generated keys can be read back after the insert - confirm.
String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
}
}
}
// Fallback for non-AT mode and for anything other than a single recognized INSERT.
if (targetPreparedStatement == null) {
targetPreparedStatement = getTargetConnection().prepareStatement(sql);
}
return new PreparedStatementProxy(this, targetPreparedStatement, sql);
}
}

RPC集成

对于RestTemplate的集成,是通过SeataRestTemplateAutoConfiguration配置类来完成的:在@PostConstruct注解标注的初始化方法中给RestTemplate添加SeataRestTemplateInterceptor拦截器。该拦截器的作用是:若当前全局事务Xid存在,则将其设置到请求的header中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
 * Auto-configuration that appends the Seata interceptor to every injected {@code RestTemplate}.
 */
@Configuration(proxyBeanMethods = false)
public class SeataRestTemplateAutoConfiguration {
    @Bean
    public SeataRestTemplateInterceptor seataRestTemplateInterceptor() {
        return new SeataRestTemplateInterceptor();
    }

    // Optional injection: stays null when the context defines no RestTemplate.
    @Autowired(required = false)
    private Collection<RestTemplate> restTemplates;

    @Autowired
    private SeataRestTemplateInterceptor seataRestTemplateInterceptor;

    /**
     * Adds the Seata interceptor at the end of each template's interceptor list.
     * Does nothing when no templates were injected.
     */
    @PostConstruct
    public void init() {
        if (this.restTemplates == null) {
            return;
        }
        for (RestTemplate template : restTemplates) {
            // Work on a copy of the current interceptor list, then install the extended copy.
            List<ClientHttpRequestInterceptor> chain = new ArrayList<>(template.getInterceptors());
            chain.add(this.seataRestTemplateInterceptor);
            template.setInterceptors(chain);
        }
    }
}
/**
 * Client-side interceptor that forwards the current global transaction XID as a request header.
 */
public class SeataRestTemplateInterceptor implements ClientHttpRequestInterceptor {
    /**
     * Wraps the outgoing request and, when {@code RootContext} holds a non-empty XID, adds it
     * under the {@code RootContext.KEY_XID} header before continuing the execution.
     */
    public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes, ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
        HttpRequestWrapper outgoing = new HttpRequestWrapper(httpRequest);
        String currentXid = RootContext.getXID();
        boolean hasXid = !StringUtils.isEmpty(currentXid);
        if (hasXid) {
            outgoing.getHeaders().add(RootContext.KEY_XID, currentXid);
        }
        return clientHttpRequestExecution.execute(outgoing, bytes);
    }
}

对于Feign的集成,是通过SeataFeignClientAutoConfiguration中的内部配置类FeignBeanPostProcessorConfiguration注册的后置处理器来完成的,主要是通过SeataFeignObjectWrapper为Client对象创建一个SeataFeignClient代理,在该代理中会将全局事务的Xid设置到Header中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// Feign integration, applied only when the Client class is on the classpath and ordered
// before FeignAutoConfiguration.
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(Client.class)
@AutoConfigureBefore(FeignAutoConfiguration.class)
public class SeataFeignClientAutoConfiguration {
// Nested configuration that declares the post-processors and the wrapper they rely on.
@Configuration(proxyBeanMethods = false)
protected static class FeignBeanPostProcessorConfiguration {
// Post-processor that passes every bean through SeataFeignObjectWrapper.wrap.
@Bean
SeataBeanPostProcessor seataBeanPostProcessor(SeataFeignObjectWrapper seataFeignObjectWrapper) {
return new SeataBeanPostProcessor(seataFeignObjectWrapper);
}
// NOTE(review): SeataContextBeanPostProcessor is not shown in this excerpt; its role is not documented here.
@Bean
SeataContextBeanPostProcessor seataContextBeanPostProcessor(BeanFactory beanFactory) {
return new SeataContextBeanPostProcessor(beanFactory);
}
// Wrapper that replaces Feign Client beans with their Seata-aware counterparts.
@Bean
SeataFeignObjectWrapper seataFeignObjectWrapper(BeanFactory beanFactory) {
return new SeataFeignObjectWrapper(beanFactory);
}
}
}
// Excerpt: the seataFeignObjectWrapper field and the constructor are not shown here.
public class SeataBeanPostProcessor implements BeanPostProcessor {
// Hands every bean to the wrapper before initialization and returns whatever the wrapper
// gives back (the wrapper returns non-Client beans unchanged).
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return this.seataFeignObjectWrapper.wrap(bean);
}
}
/**
 * Excerpt of the wrapper that swaps Feign {@code Client} beans for Seata-aware clients.
 */
public class SeataFeignObjectWrapper {
    /**
     * Wraps a Feign client bean in the Seata variant that matches its concrete type.
     *
     * @return the bean unchanged when it is not a {@code Client} or is already a
     *         {@code SeataFeignClient}; otherwise the Seata-aware replacement
     */
    Object wrap(Object bean) {
        boolean needsWrapping = bean instanceof Client && !(bean instanceof SeataFeignClient);
        if (!needsWrapping) {
            return bean;
        }
        if (bean instanceof LoadBalancerFeignClient) {
            LoadBalancerFeignClient lbClient = (LoadBalancerFeignClient) bean;
            return new SeataLoadBalancerFeignClient(lbClient.getDelegate(), factory(), clientFactory(), this);
        }
        if (bean instanceof FeignBlockingLoadBalancerClient) {
            FeignBlockingLoadBalancerClient blockingClient = (FeignBlockingLoadBalancerClient) bean;
            return new SeataFeignBlockingLoadBalancerClient(blockingClient.getDelegate(), beanFactory.getBean(BlockingLoadBalancerClient.class), this);
        }
        // Any other Client implementation gets the generic Seata wrapper.
        return new SeataFeignClient(this.beanFactory, (Client) bean);
    }
}
/**
 * Excerpt of the Feign client decorator that forwards the global transaction XID as a header.
 * The delegate field and MAP_SIZE constant are declared outside this excerpt.
 */
public class SeataFeignClient implements Client {
    /** Executes the request through the delegate after adding the XID header when one is bound. */
    public Response execute(Request request, Request.Options options) throws IOException {
        return this.delegate.execute(getModifyRequest(request), options);
    }

    /**
     * Returns the request itself when no XID is bound; otherwise a copy of the request whose
     * headers additionally carry the XID under {@code RootContext.KEY_XID}.
     */
    private Request getModifyRequest(Request request) {
        String currentXid = RootContext.getXID();
        if (StringUtils.isEmpty(currentXid)) {
            return request;
        }
        // Copy the existing headers into a fresh map, then set the XID entry.
        Map<String, Collection<String>> mergedHeaders = new HashMap<>(MAP_SIZE);
        mergedHeaders.putAll(request.headers());
        List<String> xidValues = new ArrayList<>();
        xidValues.add(currentXid);
        mergedHeaders.put(RootContext.KEY_XID, xidValues);
        return Request.create(request.method(), request.url(), mergedHeaders, request.body(), request.charset());
    }
}

以上是针对请求发送端的处理;分支事务接收到请求时,也需要从请求中获取全局事务Xid并设置到RootContext中,该功能是通过SeataHandlerInterceptorConfiguration配置类中添加的SeataHandlerInterceptor拦截器来完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// Web MVC configuration, active only in a web application, that installs the Seata
// handler interceptor.
@ConditionalOnWebApplication
public class SeataHandlerInterceptorConfiguration implements WebMvcConfigurer {
// Registers a new SeataHandlerInterceptor for every request path ("/**").
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new SeataHandlerInterceptor()).addPathPatterns("/**");
}
}
/**
 * Spring MVC interceptor that binds the global transaction XID carried in the incoming
 * request header to {@code RootContext} for the duration of the request, and unbinds it
 * once the request has completed. The {@code log} field is declared outside this excerpt.
 */
public class SeataHandlerInterceptor implements HandlerInterceptor {
    /**
     * Binds the XID from the {@code RootContext.KEY_XID} header when no XID is bound yet.
     *
     * @return always {@code true}, so request handling continues
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        String xid = RootContext.getXID();
        String rpcXid = request.getHeader(RootContext.KEY_XID);
        if (StringUtils.isBlank(xid) && rpcXid != null) {
            RootContext.bind(rpcXid);
            // The message is concatenated inside the guard, so it is only built when debug is on.
            if (log.isDebugEnabled()) {
                log.debug("bind " + rpcXid + " to RootContext");
            }
        }
        return true;
    }

    /**
     * Unbinds the XID after the request completes. Nothing happens when no XID is bound or
     * the request carried no XID header. When the unbound XID differs from the one in the
     * header, it is bound back, because it was not the one this request installed.
     */
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception e) {
        if (StringUtils.isNotBlank(RootContext.getXID())) {
            String rpcXid = request.getHeader(RootContext.KEY_XID);
            if (StringUtils.isEmpty(rpcXid)) {
                return;
            }
            String unbindXid = RootContext.unbind();
            if (log.isDebugEnabled()) {
                log.debug("unbind " + unbindXid + " from RootContext");
            }
            if (!rpcXid.equalsIgnoreCase(unbindXid)) {
                if (unbindXid != null) {
                    RootContext.bind(unbindXid);
                }
            }
        }
    }
}