Seata 应用部分源码分析

为了清晰理解 Seata 工作机制,在出现问题时能快速找到原因并给出对应的解决方案,下面对 Seata 应用部分源码进行分析。这部分主要关注 Seata 在应用中的介入部分,对于 TM、RM 的实现部分分析会另开一文进行记录。这里主要针对 AT 模式进行展开,其他模式可基于相似的起点开始源码的阅读和学习。

Seata 自动配置

在 Spring Boot 应用中,自动配置的起点就在于 spring.factories 文件。启动过程中会由 SPI 机制读入其中的配置类并注入 Spring 容器中。

seata-spring-boot-starter 包中的 spring.factories 载入了 SeataAutoConfigurationSeataDataSourceAutoConfiguration 自动配置类。

SeataAutoConfiguration 注入 GlobalTransactionScannerFailureHandler

SeataDataSourceAutoConfiguration 注入 SeataAutoDataSourceProxyCreator

基于注入的这些 Bean 开启了应用的 Seata 集成。

全局事务切面介入

GlobalTransactionScanner 实现了 InitializingBean 从而利用 Bean 生命周期回调对自身进行初始化,其中调用了关键的 initClint 方法来实现。

GlobalTransactionScanner 还通过继承 AbstractAutoProxyCreator 成为了 BeanPostProcessor 得到修改其他 Bean 的能力。GlobalTransactionScanner 通过重写 wrapIfNecessary 方法实现了针对 Seata 的 Bean 增强逻辑。

wrapIfNecessary 首先通过 doCheckers 利用 ScannerChecker 快速排除已完成增强、Bean 黑名单、配置类等特殊 Bean。随后通过 InterfaceParser 检查每个 Bean 是否需要进行 Seata 增强,为需要增强的 Bean 构造代理调用处理器,并将代理调用处理器作为切面插入完成对 Bean 的增强。

值得注意的是,ScannerCheckerInterfaceParser 都通过 Seata 准备的 SPI 机制来加载相关的实现类。检查相关实现可以得到其增强逻辑。ScannerChecker 当前有三个实现,排除了配置类、黑名单包、作用域内这几类 Bean 。InterfaceParserDefaultInterfaceParser GlobalTransactionalInterceptorParser TccActionInterceptorParser 三个实现,DefaultInterfaceParser 相当于兜底的空实现,GlobalTransactionalInterceptorParser 扫描 public 方法上存在 @GlobalTransactional@GlobalLock 注解的类,返回 GlobalTransactionalInterceptorHandler 实例作为代理调用处理器,其中传入已扫描的需代理方法。TccActionInterceptorParser 内部判断逻辑则相对复杂,但会类似地返回 TccActionInterceptorHandler 实例作为代理调用处理器。

AT 模式调用处理器逻辑

GlobalTransactionalInterceptorHandler 继承了 AbstractProxyInvocationHandler 从而将代理的调用过程处理转移到 doInvoke 方法。其中首先提取 GlobalTransactionalGlobalLock 注解,并针对存在注解的情况分别调用 handleGlobalTransactionhandleGlobalLock 进行增强处理,不存在注解的场景下则直接透传调用。

全局事务处理

handleGlobalTransaction 将真实调用包装到 TransactionalTemplate 中执行。并在作为发起者捕获事务执行异常时交由 FailureHandler 处理。

TransactionalTemplate 执行全局事务控制。首先检查事务传播行为并在需要时挂起当前事务、创建全局事务上下文,在不需要全局事务特性时则直接透传调用并返回。由于整体包装在 try-finally 结构中,挂起的事务在退出时也可以自动恢复。在事务上下文准备就绪后即进入全局事务的正式执行流程。可简化如下

1
2
3
4
5
6
7
8
9
10
11
12
13
try {
this.beginTransaction(txInfo, tx);
try {
rs = business.execute();
} catch (Throwable e) {
this.completeTransactionAfterThrowing(txInfo, tx, var17);
}
this.commitTransaction(tx, txInfo);
} finally {
this.resumeGlobalLockConfig(previousConfig);
this.triggerAfterCompletion(tx);
this.cleanUp();
}

beginTransaction 中在作为全局事务发起方时,最终调用到DefaultGlobalTransactionbegin 方法,在作为发起方时调用 TransactionManagerbegin 方法,在 TM 角色的视角下开启全局事务。

completeTransactionAfterThrowing 中则调用 rollbackTransaction 回滚事务,或在抛出的异常可忽略时调用 commitTransaction 提交事务。

rollbackTransaction 在作为全局事务发起方调用时会调用 DefaultGlobalTransactionrollback 方法,在作为发起方时带重试地调用 TransactionManagerrollback 方法,在 TM 角色的视角下回滚全局事务。

commitTransaction 与 rollbackTransaction 类似,在作为发起方调用时转到 DefaultGlobalTransactioncommit 方法,带重试地调用 TransactionManagercommit 方法,在 TM 角色的视角下提交全局事务。与 rollbackTransaction 不同的是,在提交前会检查全局事务超时时间,在已超时的情况下执行转为回滚逻辑。

全局锁处理

handleGlobalLock 将真实调用包装到 GlobalLockTemplate 中执行。内部在真实调用前后在事务上下文中进行锁标记的开启和关闭。检查逻辑则会在业务逻辑中由代理数据源产生的一系列增强中发挥作用。

数据源代理切面介入

GlobalTransactionScanner 类似,SeataAutoDataSourceProxyCreator 也基于 AbstractAutoProxyCreator 并在 wrapIfNecessary 中筛选 DataSource 进行代理增强。进行增强时也根据 Seata 事务模式选择了不同的数据源代理实现,包括 AT 模式下的 DataSourceProxy 和 XA 模式下的 DataSourceProxyXA

数据源代理机制

DataSourceProxy 构造时,会调用内部的 init 方法,其中在原始数据源上获取相关元信息并使用 DefaultResourceManagerregisterResource 方法向分支类型对应的 RM 实现注册数据库资源。不同的分支模式下对应的 RM 实现是不一致的,它们都实现 ResourceManager 接口,并通过 getBranchType 的返回值来与分支类型对应。AT 模式下的 RM 实现为 DataSourceManager

在数据源代理构造时作为 RM 的初始化工作之外,作为数据源本身的工作更是作为数据源代理的核心工作。这些工作通过实现 DataSource 接口来达成。该接口实现的功能就只有获取数据连接这一个功能,数据源代理的实现便是在原始数据源上获取数据库连接后创建 ConnectionProxy 增强,并将增强后的连接代理作为连接返回。

数据库连接代理机制

返回的连接代理 ConnectionProxy 实现了数据库连接接口,对需要的方法进行了增强。它继承了 AbstractConnectionProxy 从而得到了一些公共的代理增强行为。此外,对 Connection 接口中的 commitrollback 等在数据库连接使用中的关键方法进行实现,加入了全局锁相关的检查、重试机制。

提交逻辑

在连接代理实现的 commit 方法实现中,将本身的提交行为使用 LockRetryPolicy 进行了包装,在出现全局锁冲突时进行一定次数的提交重试。在这个带重试逻辑的事务提交逻辑中,会使用 processGlobalTransactionCommitprocessLocalCommitWithGlobalLocks 分别处理全局事务提交情况和带全局锁的本地提交情况。在不涉及全局事务和全局锁的情况下则无需检查直接提交本地事务即可完成。这个过程中发生错误的情况下则需要转向回滚逻辑。

processGlobalTransactionCommit 中首先会携带锁信息向 RM 注册分支事务。在这个注册过程中就可能发现全局锁冲突的情况,这时就将超时作为异常上报并在重试机制的帮助下进行重试。在未发现全局锁冲突的情况下,便将全局事务产生的 undo_log 写入数据库并在同一个本地事务中进行提交。提交后不管成功还是失败,都需要将结果向 RM 回报分支状态。

processLocalCommitWithGlobalLocks 由于不涉及分布式事务,只是在执行本地事务提交前使用 RM 进行全局锁检查即可执行提交动作。

回滚逻辑

rollback 方法实现中,则将本地事务进行回滚后向 RM 回报回滚失败状态。

数据库声明(Statement)代理机制

ConnectionProxy 继承的 AbstractConnectionProxy 中,对从数据库连接中获取 Statement 相关的方法进行了实现,将数据库本身返回的 Statement PreparedStatement 替换成增强的 StatementProxy PreparedStatementProxy

在这些代理的 Statement 中,值得注意的点在于对 Statement 进行执行时的 execute executeQuery executeUpdate 系列方法实际上被转发到 ExecuteTemplateexecute 方法中去执行。其执行过程可分为3个步骤:首先构造解析 SQL 的 SQLRecognizer;随后利用 SQLRecognizer 识别 SQL 语句类型并根据数据库类型选择对应的 Executor 实现;最后使用取得的 Executor 执行分支模式下需要进行的增强 SQL 逻辑。在使用 MySQL 的场景下,INSERT、UPDATE、DELETE、SELECT_FOR_UPDATE 类型语句分别使用 MySQLInsertExecutor UpdateExecutor DeleteExecutor SelectForUpdateExecutor 执行器进行增强。而普通的 SELECT 类型语句由于不需要额外处理,直接使用 PlainExecutor 透传调用即可。

MySQLInsertExecutor

MySQLInsertExecutor 中实际上只重写了获取主键值相关的 InsertExcutor 内两个获取主键值相关的方法。更多的逻辑则完全通过继承 BaseInsertExecutor 及间接继承 AbstractDMLBaseExecutor 而来。通过这条继承链,语句的执行被重塑为“前镜像(beforeImage)-执行SQL-后镜像(afterImage)-UndoLog构造”的整体流程。在开启自动提交的场景下则需要辅以关闭自动提交、带重试策略的自动提交、重新开启自动提交,以及异常时调用回滚的过程。

具体到 BaseInsertExecutor 来看:前镜像直接返回空镜像;后镜像基于主键再次向数据库查询来构建。

UpdateExecutor

UpdateExecutor 的前镜像提取更新语句中的条件及镜像字段作为 SELECT FOR UPDATE 的条件及字段,对更新的记录取得本地锁,并直接利用加锁的同时查询的结果构造前镜像。后镜像则利用前镜像的记录提取主键作为条件,从数据库再次查询来构建。

DeleteExecutor

DeleteExecutor 前镜像构建方式与 UpdateExecutor 类似。后镜像则直接返回空记录。

SelectForUpdateExecutor

与前面几项 DML 执行器不同,SelectForUpdateExecutor 不涉及修改,因此也不需要进行前镜像、后镜像之类操作。而是在执行后根据全局锁获取是否成功来决定能否返回结果。此外,这一过程还具备一定的重试机制来减少失败。

Seata 应用部分总结

总的来说,在应用程序的部分,Seata 通过 Spring Boot 的自动配置机制,注入了两个 BeanPostProcessor 对 Spring 容器中的 Bean 进行增强。一方面为用户代码添加处理切面,起到 TM 的作用;另一方面为数据源进行代理,并对数据源上衍生的连接、声明添加处理切面,起到 RM 的作用。

Seata整合

对于 AT 模式下的全局锁机制,主要在连接代理、声明代理中起作用。在执行 DML 操作时,声明代理加入前代理、后代理机制,并准备 UndoLog 数据。而对于全局锁的检查则集中在连接代理层面,在执行提交动作前检查全局锁冲突、开启分布式事务分支、将 UndoLog 写入数据库,最后执行数据库本地事务提交并向 RM 汇报分支事务状态。

Seata AT 时序

附录

配置动态更新机制

GlobalTransactionScannerGlobalTransactionalInterceptorHandler 都实现了 ConfigurationChangeListener 从而在配置项发生变更时获得调用,并在对应的回调中进行配置变更相关的处理。GlobalTransactionScanner 在配置变更时会重新调用 initClient 从而对 TM 和 RM 配置进行更新。GlobalTransactionalInterceptorHandler 在配置变更时还会检查降级相关配置并进行对应的更新。总的来说,Seata 中的动态配置更新都通过 ConfigurationChangeListener 来实现。

参考资料

seata 源码解析(图解_秒懂_史上最全) - 疯狂创客圈 - 博客园

GitHub - apache/incubator-seata at 2.0.0