赞
踩
在该核心类中主要创建事务管理器之 GlobalTransactionScanner
实例 & 资源管理器 SeataAutoDataSourceProxyCreator
。
@GlobalTransactional
注解的bean被Cglib代理,方便增强目标方法从而实现全局事务的控制。@GlobalLock
、@LocalTCC
注解的bean被Cglib代理,方便增强目标方法从而实现局部事务的提交 or 回滚。SeataAutoDataSourceProxyCreator控制资源管理器RM中DataSource,只要操作数据库就会被Seata代理------At模式。
Seata解决分布式事务的应用中几乎所有的bean均会被GlobalTransactionScanner拦截处理。至少在at、tcc模式下,tm角色中的@GlobalTransactional注解的目标bean、RM角色中的@LocalTCC、@TwoPhaseBusinessAction注解的bean将被拦截生成代理。
public class GlobalTransactionScanner extends AbstractAutoProxyCreator{ private MethodInterceptor globalTransactionalInterceptor; private MethodInterceptor interceptor; @Override protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) { interceptor = null; // TCC模式RM角色中,@LocalTCC、@TwoPhaseBusinessAction注解的bean生成代理 if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) { interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName)); ... } else {//该分支一般情况下在TM角色中成立,tcc、at模式均依赖@GlobalTransactional生成全局事务ID ... // 如果当前bean没有@GlobalTransactional、@GlobalLock注解,则不需要seata管理bean,直接返回去 if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) { return bean; } ... globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook); interceptor = globalTransactionalInterceptor; } ... return bean; } }
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor { private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate(); private final GlobalLockTemplate<Object> globalLockTemplate = new GlobalLockTemplate<>(); public Object invoke(final MethodInvocation methodInvocation) throws Throwable {//执行通知advice核心逻辑 Class clazz = AopUtils.getTargetClass(methodInvocation.getThis()); Class<?> targetClass = methodInvocation.getThis() != null ? clazz : null; Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass); if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) { final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod); GlobalTransactional gta = getAnnotation(method, targetClass, GlobalTransactional.class); final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class); boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes); if (!localDisable) {//#1 if (globalTransactionalAnnotation != null) { return handleGlobalTransaction(methodInvocation, gta); } else if (globalLockAnnotation != null) { return handleGlobalLock(methodInvocation); } } } return methodInvocation.proceed(); } private Object handleGlobalLock(final MethodInvocation methodInvocation) throws Exception { return globalLockTemplate.execute(() -> { return methodInvocation.proceed(); }); } private Object handleGlobalTransaction(MethodInvocation invocation,GlobalTransactional globalTrxAnno){ boolean succeed = true; return transactionalTemplate.execute(new TransactionalExecutor() { @Override public Object execute() throws Throwable { return invocation.proceed();//真正开始目标方法 } ... @Override public TransactionInfo getTransactionInfo() { TransactionInfo transactionInfo = new TransactionInfo(); ... return transactionInfo; } }); } }
步骤1:证明GlobalTransactional
& GlobalLock
具有互斥性。优先执行GlobalTransactional注解的功能。
核心功能主要是在执行目标方法前,调用事务协调器TC生成当前分布式事务的全局事务ID。
public class TransactionalTemplate { public Object execute(TransactionalExecutor business) throws Throwable { // 1 get transactionInfo 注解存在事务的相关信息 TransactionInfo txInfo = business.getTransactionInfo(); // 1.1 get or create a transaction DefaultGlobalTransaction GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); // 1.2 Handle the Transaction propatation and the branchType Propagation propagation = txInfo.getPropagation(); SuspendedResourcesHolder suspendedResourcesHolder = null; switch (propagation) { ...//事务传播策略 } triggerBeforeBegin(); // 2. begin transaction 核心是生成与当前请求对应的全局事务ID tx.begin(txInfo.getTimeOut(), txInfo.getName());//tx:DefaultGlobalTransaction triggerAfterBegin(); // 3.执行目标方法 try { // Do Your Business rs = business.execute(); } catch (Throwable ex) { // 3.此内部存在RollbackRule即回滚策略。用户可以自定义回滚策略,回滚与否用户可以自定义选择 completeTransactionAfterThrowing(txInfo, tx, ex); throw ex; } // 5. everything is fine, commit. commitTransaction(tx); return rs; } }
RootContext内部维护了跟线程绑定的上下文容器ContextCore。Seata提供了两种类型的实现类FastThreadLocalContextCore【默认】、ThreadLocalContextCore。
COMMIT_RETRY_COUNT:提交失败默认重试5次。
提交即发起 GlobalCommitRequest
类型的请求通知TC「事务协调器」服务。根据RM执行结果判断提交还是回滚。
RM服务不管是被远程调用还是本地执行都必须显式抛出相关异常才能触发TM向TC发送回滚的通知,否则触发向TC发送提交的通知。TC根据TM的通知类型真正回调RM触发其对应的commit or rollback。
public class DefaultGlobalTransaction implements GlobalTransaction { public void commit() throws TransactionException { ... int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT; try { while (retry > 0) { try { status = transactionManager.commit(xid); //status = transactionManager.rollback(xid); break; } catch (Throwable ex) { retry--; } } } } }
public class GlobalLockTemplate<T> { public Object execute(Callable<T> business) throws Exception { Object rs; boolean hasInGlobalLock = RootContext.requireGlobalLock(); if (!hasInGlobalLock) {//如果没有全局锁,则初始化全局锁 RootContext.bindGlobalLockFlag(); } try { rs = business.call(); } finally { if (!hasInGlobalLock) {//释放全局锁 RootContext.unbindGlobalLockFlag(); } } return rs; } }
@GlobalLock注解存在必要性:因为并不是所有的数据库操作都需要开启全局事务,而开启全局事务是一个比较重的操作,需要向 TC 发起开启全局事务等 RPC 过程,而@GlobalLock注解只会在执行过程中查询全局锁是否存在,不会去开启全局事务,因此在不需要全局事务,而又需要检查全局锁避免脏读脏写时,使用@GlobalLock注解是一个更加轻量的操作。
从上述得知,TC服务中添加全局锁GlobalLock,在RPC远程调用RM时并不会同@GlobalTransactional似的在请求头传递全局锁,说明@GlobalLock只能用于RM中。
如果TM是通过RPC方式调用RM,以Feign为例。SeataFeignClientAutoConfiguration自动装配类重新初始化Feign客户端之SeataFeignClient。在SeataFeignClient内部会将全局事务ID添加到请求头中。
RM方利用拦截器SeataHandlerInterceptor获取请求头的全局事务ID。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。