Seata 应用部分源码分析
为了清晰理解 Seata 工作机制,在出现问题时能快速找到原因并给出对应的解决方案,下面对 Seata 应用部分源码进行分析。这部分主要关注 Seata 在应用中的介入部分,对于 TM、RM 的实现部分分析会另开一文进行记录。这里主要针对 AT 模式进行展开,其他模式可基于相似的起点开始源码的阅读和学习。
Seata 自动配置
在 Spring Boot 应用中,自动配置的起点就在于 spring.factories 文件。启动过程中会由 SPI 机制读入其中的配置类并注入 Spring 容器中。
seata-spring-boot-starter 包中的 spring.factories 载入了 SeataAutoConfiguration
、SeataDataSourceAutoConfiguration
自动配置类。
SeataAutoConfiguration
注入 GlobalTransactionScanner
、FailureHandler
。
SeataDataSourceAutoConfiguration
注入 SeataAutoDataSourceProxyCreator
。
基于注入的这些 Bean 开启了应用的 Seata 集成。
全局事务切面介入
GlobalTransactionScanner
实现了 InitializingBean
从而利用 Bean 生命周期回调对自身进行初始化,其中调用了关键的 initClint
方法来实现。
GlobalTransactionScanner
还通过继承 AbstractAutoProxyCreator
成为了 BeanPostProcessor
得到修改其他 Bean 的能力。GlobalTransactionScanner
通过重写 wrapIfNecessary
方法实现了针对 Seata 的 Bean 增强逻辑。
wrapIfNecessary
首先通过 doCheckers 利用 ScannerChecker
快速排除已完成增强、Bean 黑名单、配置类等特殊 Bean。随后通过 InterfaceParser
检查每个 Bean 是否需要进行 Seata 增强,为需要增强的 Bean 构造代理调用处理器,并将代理调用处理器作为切面插入完成对 Bean 的增强。
值得注意的是,ScannerChecker
和 InterfaceParser
都通过 Seata 准备的 SPI 机制来加载相关的实现类。检查相关实现可以得到其增强逻辑。ScannerChecker
当前有三个实现,排除了配置类、黑名单包、作用域内这几类 Bean 。InterfaceParser
有 DefaultInterfaceParser
GlobalTransactionalInterceptorParser
TccActionInterceptorParser
三个实现,DefaultInterfaceParser
相当于兜底的空实现,GlobalTransactionalInterceptorParser
扫描 public 方法上存在 @GlobalTransactional
或 @GlobalLock
注解的类,返回 GlobalTransactionalInterceptorHandler
实例作为代理调用处理器,其中传入已扫描的需代理方法。TccActionInterceptorParser
内部判断逻辑则相对复杂,但会类似地返回 TccActionInterceptorHandler
实例作为代理调用处理器。
AT 模式调用处理器逻辑
GlobalTransactionalInterceptorHandler
继承了 AbstractProxyInvocationHandler
从而将代理的调用过程处理转移到 doInvoke
方法。其中首先提取 GlobalTransactional
及 GlobalLock
注解,并针对存在注解的情况分别调用 handleGlobalTransaction
或 handleGlobalLock
进行增强处理,不存在注解的场景下则直接透传调用。
全局事务处理
handleGlobalTransaction
将真实调用包装到 TransactionalTemplate
中执行。并在作为发起者捕获事务执行异常时交由 FailureHandler
处理。
TransactionalTemplate
执行全局事务控制。首先检查事务传播行为并在需要时挂起当前事务、创建全局事务上下文,在不需要全局事务特性时则直接透传调用并返回。由于整体包装在 try-finally 结构中,挂起的事务在退出时也可以自动恢复。在事务上下文准备就绪后即进入全局事务的正式执行流程。可简化如下
1 | try { |
beginTransaction
中在作为全局事务发起方时,最终调用到DefaultGlobalTransaction
的 begin
方法,在作为发起方时调用 TransactionManager
的 begin
方法,在 TM 角色的视角下开启全局事务。
completeTransactionAfterThrowing
中则调用 rollbackTransaction
回滚事务,或在抛出的异常可忽略时调用 commitTransaction
提交事务。
rollbackTransaction
在作为全局事务发起方调用时会调用 DefaultGlobalTransaction
的 rollback
方法,在作为发起方时带重试地调用 TransactionManager
的 rollback
方法,在 TM 角色的视角下回滚全局事务。
commitTransaction 与 rollbackTransaction
类似,在作为发起方调用时转到 DefaultGlobalTransaction
的 commit
方法,带重试地调用 TransactionManager
的 commit
方法,在 TM 角色的视角下提交全局事务。与 rollbackTransaction
不同的是,在提交前会检查全局事务超时时间,在已超时的情况下执行转为回滚逻辑。
全局锁处理
handleGlobalLock
将真实调用包装到 GlobalLockTemplate
中执行。内部在真实调用前后在事务上下文中进行锁标记的开启和关闭。检查逻辑则会在业务逻辑中由代理数据源产生的一系列增强中发挥作用。
数据源代理切面介入
与 GlobalTransactionScanner
类似,SeataAutoDataSourceProxyCreator
也基于 AbstractAutoProxyCreator
并在 wrapIfNecessary
中筛选 DataSource
进行代理增强。进行增强时也根据 Seata 事务模式选择了不同的数据源代理实现,包括 AT 模式下的 DataSourceProxy
和 XA 模式下的 DataSourceProxyXA
。
数据源代理机制
在 DataSourceProxy
构造时,会调用内部的 init
方法,其中在原始数据源上获取相关元信息并使用 DefaultResourceManager
的 registerResource
方法向分支类型对应的 RM 实现注册数据库资源。不同的分支模式下对应的 RM 实现是不一致的,它们都实现 ResourceManager
接口,并通过 getBranchType
的返回值来与分支类型对应。AT 模式下的 RM 实现为 DataSourceManager
。
在数据源代理构造时作为 RM 的初始化工作之外,作为数据源本身的工作更是作为数据源代理的核心工作。这些工作通过实现 DataSource
接口来达成。该接口实现的功能就只有获取数据连接这一个功能,数据源代理的实现便是在原始数据源上获取数据库连接后创建 ConnectionProxy
增强,并将增强后的连接代理作为连接返回。
数据库连接代理机制
返回的连接代理 ConnectionProxy
实现了数据库连接接口,对需要的方法进行了增强。它继承了 AbstractConnectionProxy
从而得到了一些公共的代理增强行为。此外,对 Connection
接口中的 commit
和 rollback
等在数据库连接使用中的关键方法进行实现,加入了全局锁相关的检查、重试机制。
提交逻辑
在连接代理实现的 commit
方法实现中,将本身的提交行为使用 LockRetryPolicy
进行了包装,在出现全局锁冲突时进行一定次数的提交重试。在这个带重试逻辑的事务提交逻辑中,会使用 processGlobalTransactionCommit
和 processLocalCommitWithGlobalLocks
分别处理全局事务提交情况和带全局锁的本地提交情况。在不涉及全局事务和全局锁的情况下则无需检查直接提交本地事务即可完成。这个过程中发生错误的情况下则需要转向回滚逻辑。
在 processGlobalTransactionCommit
中首先会携带锁信息向 RM 注册分支事务。在这个注册过程中就可能发现全局锁冲突的情况,这时就将超时作为异常上报并在重试机制的帮助下进行重试。在未发现全局锁冲突的情况下,便将全局事务产生的 undo_log 写入数据库并在同一个本地事务中进行提交。提交后不管成功还是失败,都需要将结果向 RM 回报分支状态。
processLocalCommitWithGlobalLocks
由于不涉及分布式事务,只是在执行本地事务提交前使用 RM 进行全局锁检查即可执行提交动作。
回滚逻辑
在 rollback
方法实现中,则将本地事务进行回滚后向 RM 回报回滚失败状态。
数据库声明(Statement)代理机制
在 ConnectionProxy
继承的 AbstractConnectionProxy
中,对从数据库连接中获取 Statement 相关的方法进行了实现,将数据库本身返回的 Statement
PreparedStatement
替换成增强的 StatementProxy
PreparedStatementProxy
。
在这些代理的 Statement 中,值得注意的点在于对 Statement 进行执行时的 execute
executeQuery
executeUpdate
系列方法实际上被转发到 ExecuteTemplate
的 execute
方法中去执行。其执行过程可分为3个步骤:首先构造解析 SQL 的 SQLRecognizer
;随后利用 SQLRecognizer
识别 SQL 语句类型并根据数据库类型选择对应的 Executor
实现;最后使用取得的 Executor
执行分支模式下需要进行的增强 SQL 逻辑。在使用 MySQL 的场景下,INSERT、UPDATE、DELETE、SELECT_FOR_UPDATE 类型语句分别使用 MySQLInsertExecutor
UpdateExecutor
DeleteExecutor
SelectForUpdateExecutor
执行器进行增强。而普通的 SELECT 类型语句由于不需要额外处理,直接使用 PlainExecutor
透传调用即可。
MySQLInsertExecutor
MySQLInsertExecutor
中实际上只重写了获取主键值相关的 InsertExcutor
内两个获取主键值相关的方法。更多的逻辑则完全通过继承 BaseInsertExecutor
及间接继承 AbstractDMLBaseExecutor
而来。通过这条继承链,语句的执行被重塑为“前镜像(beforeImage)-执行SQL-后镜像(afterImage)-UndoLog构造”的整体流程。在开启自动提交的场景下则需要辅以关闭自动提交、带重试策略的自动提交、重新开启自动提交,以及异常时调用回滚的过程。
具体到 BaseInsertExecutor
来看:前镜像直接返回空镜像;后镜像基于主键再次向数据库查询来构建。
UpdateExecutor
UpdateExecutor
的前镜像提取更新语句中的条件及镜像字段作为 SELECT FOR UPDATE 的条件及字段,对更新的记录取得本地锁,并直接利用加锁的同时查询的结果构造前镜像。后镜像则利用前镜像的记录提取主键作为条件,从数据库再次查询来构建。
DeleteExecutor
DeleteExecutor
前镜像构建方式与 UpdateExecutor
类似。后镜像则直接返回空记录。
SelectForUpdateExecutor
与前面几项 DML 执行器不同,SelectForUpdateExecutor
不涉及修改,因此也不需要进行前镜像、后镜像之类操作。而是在执行后根据全局锁获取是否成功来决定能否返回结果。此外,这一过程还具备一定的重试机制来减少失败。
Seata 应用部分总结
总的来说,在应用程序的部分,Seata 通过 Spring Boot 的自动配置机制,注入了两个 BeanPostProcessor 对 Spring 容器中的 Bean 进行增强。一方面为用户代码添加处理切面,起到 TM 的作用;另一方面为数据源进行代理,并对数据源上衍生的连接、声明添加处理切面,起到 RM 的作用。
对于 AT 模式下的全局锁机制,主要在连接代理、声明代理中起作用。在执行 DML 操作时,声明代理加入前代理、后代理机制,并准备 UndoLog 数据。而对于全局锁的检查则集中在连接代理层面,在执行提交动作前检查全局锁冲突、开启分布式事务分支、将 UndoLog 写入数据库,最后执行数据库本地事务提交并向 RM 汇报分支事务状态。
附录
配置动态更新机制
GlobalTransactionScanner
和 GlobalTransactionalInterceptorHandler
都实现了 ConfigurationChangeListener
从而在配置项发生变更时获得调用,并在对应的回调中进行配置变更相关的处理。GlobalTransactionScanner
在配置变更时会重新调用 initClient 从而对 TM 和 RM 配置进行更新。GlobalTransactionalInterceptorHandler
在配置变更时还会检查降级相关配置并进行对应的更新。总的来说,Seata 中的动态配置更新都通过 ConfigurationChangeListener
来实现。