Spring 动态数据源事务处理

在一般的 Spring 应用中,如果底层数据库访问采用的是 MyBatis,那么在大多数情况下,只使用一个单独的数据源,Spring 的事务管理在大多数情况下都是有效的。然而,在一些复杂的业务场景下,如需要在某一时刻访问不同的数据库,由于 Spring 对于事务管理实现的方式,可能不能达到预期的效果。本文将简要介绍 Spring 中事务的实现方式,并对以 MyBatis 为底层数据库访问的系统为例,提供多数据源事务处理的解决方案

Spring 事务的实现原理

常见地,在 Spring 中添加事务的方式通常都是在对应的方法或类上加上 @Transactional 注解显式地将这部分处理加上事务,对于 @Transactional 注解,Spring 会在 org.springframework.transaction.annotation.AnnotationTransactionAttributeSource 定义方法拦截的匹配规则(即 AOP 部分中的 PointCut),而具体的处理逻辑(即 AOP 中的 Advice)则是在 org.springframework.transaction.interceptor.TransactionInterceptor 中定义

具体事务执行的调用链路如下

spring-transaction-flow.png

Spring 对于事务切面采取的具体行为实现如下:

public class TransactionInterceptor 
    extends TransactionAspectSupport 
    implements MethodInterceptor, Serializable {
    
    // 这里的方法定义为 MethodInterceptor,即 AOP 实际调用点
    @Override
	@Nullable
	public Object invoke(MethodInvocation invocation) throws Throwable {
		// Work out the target class: may be {@code null}.
		// The TransactionAttributeSource should be passed the target class
		// as well as the method, which may be from an interface.
		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

		// Adapt to TransactionAspectSupport's invokeWithinTransaction...
        // invokeWithinTransaction 为父类 TransactionAspectSupport 定义的方法
		return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
			@Override
			@Nullable
			public Object proceedWithInvocation() throws Throwable {
				return invocation.proceed();
			}
			@Override
			public Object getTarget() {
				return invocation.getThis();
			}
			@Override
			public Object[] getArguments() {
				return invocation.getArguments();
			}
		});
	}
}

继续进入 TransactionAspectSupport 的 invokeWithinTransaction 方法:

public abstract class TransactionAspectSupport 
    implements BeanFactoryAware, InitializingBean {
    protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
			final InvocationCallback invocation) throws Throwable {
        // 省略响应式事务和编程式事务的处理逻辑

        // 当前事务管理的实际
		PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
		final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

		if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
			// Standard transaction demarcation with getTransaction and commit/rollback calls.
            /*
            	检查在当前的执行上下文中,是否需要创建新的事务,这是因为当前执行的业务处理可能在上一个已经开始
            	的事务处理中
            */
			TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

			Object retVal;
			try {
				// This is an around advice: Invoke the next interceptor in the chain.
				// This will normally result in a target object being invoked.
				retVal = invocation.proceedWithInvocation(); // 实际业务代码的业务处理
			}
			catch (Throwable ex) {
				// target invocation exception
				completeTransactionAfterThrowing(txInfo, ex); // 出现异常的回滚处理
				throw ex;
			}
			finally {
				cleanupTransactionInfo(txInfo);
			}

			if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
				// Set rollback-only in case of Vavr failure matching our rollback rules...
				TransactionStatus status = txInfo.getTransactionStatus();
				if (status != null && txAttr != null) {
					retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
				}
			}

            // 如果没有出现异常,则提交本次事务
			commitTransactionAfterReturning(txInfo);
			return retVal;
		}
	}
}

在获取事务信息对象时,首先需要获取到对应的事务状态对象 TransactionStatus,这个状态对象决定了 Spring 后续要对当前事务采取的何种行为,具体代码在 org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction

// 这里的 definition 是通过解析 @Transactional 注解中的属性得到的配置对象
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
    throws TransactionException {

    // Use defaults if no transaction definition given.
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

    /*
    	这里获取事务相关的对象(如持有的数据库连接等),具体由子类来定义相关的实现
    */
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();

    // 如果当前已经在一个事务中,那么需要按照定义的属性采取对应的行为
    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        return handleExistingTransaction(def, transaction, debugEnabled);
    }

    // Check definition settings for new transaction.
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }

    // No existing transaction found -> check propagation behavior to find out how to proceed.
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
            "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    // 需要重新开启一个新的事务的情况,具体在 org.springframework.transaction.TransactionDefinition 有相关的定义
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
             def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
             def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
        }
        try {
            // 开启一个新的事务
            return startTransaction(def, transaction, debugEnabled, suspendedResources);
        }
        catch (RuntimeException | Error ex) {
            resume(null, suspendedResources);
            throw ex;
        }
    }
    else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + def);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
}

在 AbstractPlatformTransactionManager 中已经定义了事务处理的大体框架,而实际的事务实现则交由具体的子类实现,在一般情况下,由 org.springframework.jdbc.datasource.DataSourceTransactionManager 采取具体的实现

主要关注的点在于对于事务信息对象的创建,事务的开启、提交回滚操作,具体对应的代码如下:

事务信息对象的创建代码:

protected Object doGetTransaction() {
    /*
    	简单地理解,DataSourceTransactionObject 就是一个持有数据库连接的资源对象
    */
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    /*
    	TransactionSynchronizationManager 是用于管理在事务执行过程相关的信息对象的一个工具类,基本上
    	这个类持有的事务信息贯穿了整个 Spring 事务管理
    */
    ConnectionHolder conHolder =
        (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

开启事务对应的源代码:

protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
        /*
        	如果当前事务对象没有持有数据库连接,则需要从对应的 DataSource 中获取对应的连接
        */
        if (!txObject.hasConnectionHolder() ||
            txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            Connection newCon = obtainDataSource().getConnection();
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }

        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();

        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);
        txObject.setReadOnly(definition.isReadOnly());

        // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
        // so we don't want to do it unnecessarily (for example if we've explicitly
        // configured the connection pool to set it already).
        
        /*
        	由于当前的事务已经交由 Spring 进行管理,那么在这种情况下,原有数据库连接的自动提交
        	必须是关闭的,因为如果开启了自动提交,那么实际上就相当于每一次的 SQL 都会执行一次事务的提交,
        	这种情况下事务的管理没有意义
        */
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            if (logger.isDebugEnabled()) {
                logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
            }
            con.setAutoCommit(false);
        }

        prepareTransactionalConnection(con, definition);
        txObject.getConnectionHolder().setTransactionActive(true);

        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }

        // Bind the connection holder to the thread.
        
        /*
        	如果是新创建的事务,那么需要绑定这个数据库连接对象到这个事务中,使得后续再进来的业务处理
        	能够顺利地进入原有的事务中
        */
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
        }
    }

    catch (Throwable ex) {
        if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, obtainDataSource());
            txObject.setConnectionHolder(null, false);
        }
        throw new CannotCreateTransactionException("Could not open

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/303173.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

已解决 ValueError: Data cardinality is ambiguous 问题

博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》 — 面试准备的宝典&#xff01;《IDEA开发秘籍》 — 提升你的IDEA技能&#xff01;《100天精通Golang》…

WPF 导航界面悬浮两行之间的卡片 漂亮的卡片导航界面 WPF漂亮渐变颜色 WPF漂亮导航头界面 UniformGrid漂亮展现

在现代应用程序设计中&#xff0c;一个漂亮的WPF导航界面不仅为用户提供视觉上的享受&#xff0c;更对提升用户体验、增强功能可发现性和应用整体效率起到至关重要的作用。以下是对WPF漂亮导航界面重要性的详尽介绍&#xff1a; 首先&#xff0c;引人入胜的首页界面是用户与软…

Redis原理篇(Dict的收缩扩容机制和渐进式rehash)

Dict&#xff08;即字典&#xff09; Redis是一种键值型数据库&#xff0c;其中键与值的映射关系就是Dict实现的。 Dict通过三部分组成&#xff1a;哈希表&#xff08;DictHashTable&#xff09;&#xff0c;哈希节点(DictEntry)&#xff0c;字典&#xff08;Dict&#xff09…

【docker】centos7安装harbor

目录 零、前提一、下载离线包二、安装三、访问四、开机自启 零、前提 1.前提是已经安装了docker和docker-compose 一、下载离线包 1. csdn资源&#xff1a;harbor-offline-installer-v2.10.0.tgz 2. 百度云盘&#xff08;提取码&#xff1a;ap3t&#xff09;&#xff1a;harbo…

Nvidia Jetson AGX Orin使用CAN与底盘通信(ROS C++ 驱动)

文章目录 一、Nvidia Jetson AGX Orin使用CAN通信1.1 CAN使能配置修改GPIO口功能1.2 can收发测试 二、通过CAN协议编写CAN的SocketCan ROS1驱动程序2.1 通讯协议2.2 接收数据节点2.3 发送数据节点2.4 功能包配置 三、ROS2驱动程序 一、Nvidia Jetson AGX Orin使用CAN通信 参考…

python股票分析挖掘预测技术指标知识之蜡烛图指标(6)

本人股市多年的老韭菜&#xff0c;各种股票分析书籍&#xff0c;技术指标书籍阅历无数&#xff0c;萌发想法&#xff0c;何不自己开发个股票预测分析软件&#xff0c;选择python因为够强大&#xff0c;它提供了很多高效便捷的数据分析工具包。 我们已经初步的接触与学习其中数…

Java中的String类:深入分析与高级应用

Java中的String类&#xff1a;深入分析与高级应用 1. String类基础1.1 概述1.2 不可变性的好处1.3 字符串常量池 2. 创建String对象3. String类常用方法4. 内存管理4.1 字符串常量池4.2 intern方法 5. String与StringBuilder/StringBuffer6. 性能考虑7. 结论 Java中的String类是…

【Bootstrap学习 day14】

分页 分页是通过将内容分成单独的页面来组织内容的过程&#xff0c;分页导航一般用于文章列表页&#xff0c;下载列表、图片列表等&#xff0c;由于数据很多&#xff0c;不可能在一页显示&#xff0c;一般分页导航包括上一页&#xff0c;下一页、数字页码等。 基础的分页 要创…

【Python机器学习】线性模型——用于二分类的线性模型

线性模型也广泛用于分类问题&#xff0c;对于二分类问题&#xff0c;可以用以下公式进行预测&#xff1a; yw[0]*x[0]w[1]*x[1]…………w[p]*x[p]b>0 公式与现行回归的公式非常类似&#xff0c;但没有返回特征的加权求和&#xff0c;而是为预测设置了阈值。如果函数值小于…

Unity 欧盟UMP用户隐私协议Android接入指南

Unity 欧盟UMP用户协议Android接入指南 官方文档链接开始接入mainTemplate.gradle 中引入CustomUnityPlayerActivity 导入UMP相关的包java类中新增字段初始化UMPSDK方法调用![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/d882171b068c46a1b956e80425f3a9cf.png)测…

Linux操作系统基础(06):Linux的文件类型和颜色

1.Linux文件类型 在Linux系统中&#xff0c;文件类型是指文件的种类或类型&#xff0c;它决定了系统对文件的处理方式&#xff0c;文件类型的作用在于告诉系统如何处理文件&#xff0c;不同类型的文件会有不同的默认行为和处理方式&#xff0c;Linux系统中常见的文件类型包括 …

轻松玩转书生·浦语大模型趣味Demo

轻松玩转书生浦语大模型趣味 Demo 轻松玩转书生浦语大模型趣味 Demo 1 大模型及 InternLM 模型简介 1.1 什么是大模型&#xff1f;1.2 InternLM 模型全链条开源 2 InternLM-Chat-7B 智能对话 Demo 2.1 环境准备2.2 模型下载2.3 代码准备2.4 终端运行2.5 web demo 运行 3 Lagen…

大数据 Hive - 实现SQL执行

文章目录 MapReduce实现SQL的原理Hive的架构Hive如何实现join操作小结 MapReduce的出现大大简化了大数据编程的难度&#xff0c;使得大数据计算不再是高不可攀的技术圣殿&#xff0c;普通工程师也能使用MapReduce开发大数据程序。 但是对于经常需要进行大数据计算的人&#xff…

QT5.14 实现ModbusTCP客户端 Demo

本文在QT5.14平台&#xff0c;基于QModbusClientTcp类&#xff0c;实现了客户端对单个寄存器的读写&#xff0c;用ModbusSlave做服务器做测试。 1.界面 (1)更改读按钮的名称为bt_Read (2)更改写按钮的名称为bt_Write 2.修改pro文件的第三行 greaterThan(QT_MAJOR_VERSION, 4)…

快速幂算法总结

知识概览 快速幂可以在O(logk)的时间复杂度之内求出来的结果。 例题展示 快速幂 题目链接 活动 - AcWing 系统讲解常用算法与数据结构&#xff0c;给出相应代码模板&#xff0c;并会布置、讲解相应的基础算法题目。https://www.acwing.com/problem/content/877/ 代码 #inc…

Rapberry Pi 4 安装VxWorks笔记

Rapberry Pi 4 安装VxWorks笔记 本文章发表与我的github page&#xff1a; Rapberry Pi 4 安装VxWorks笔记 | Hi, I am watershade. Welcome to my pages. 在github page会有更好体验和更多文章。 一、概述 ROS2推荐的操作系统是ubuntu,众所周知&#xff0c;linux并不是实时…

基于php应用的文件管理器eXtplorer部署网站并内网穿透远程访问

文章目录 1. 前言2. eXtplorer网站搭建2.1 eXtplorer下载和安装2.2 eXtplorer网页测试2.3 cpolar的安装和注册 3.本地网页发布3.1.Cpolar云端设置3.2.Cpolar本地设置 4.公网访问测试5.结语 1. 前言 通过互联网传输文件&#xff0c;是互联网最重要的应用之一&#xff0c;无论是…

7 种常见的前端安全攻击

文章目录 七种常见的前端攻击1.跨站脚本&#xff08;XSS&#xff09;2.依赖性风险3.跨站请求伪造&#xff08;CSRF&#xff09;4.点击劫持5.CDN篡改6. HTTPS 降级7.中间人攻击 随着 Web 应用程序对业务运营变得越来越重要&#xff0c;它们也成为更有吸引力的网络攻击目标。但不…

test mutation-02-变异测试 mutate-test-kata入门介绍

拓展阅读 开源 Auto generate mock data for java test.(便于 Java 测试自动生成对象信息) 开源 Junit performance rely on junit5 and jdk8.(java 性能测试框架。性能测试。压测。测试报告生成。) test 系统学习-04-test converate 测试覆盖率 jacoco 原理介绍 mutate-te…

SkyWalking介绍和Docker环境下部署

一、Skywalking概述 1、Skywalking介绍 Skywalking是分布式系统的应用程序性能监视工具&#xff0c;专为微服务&#xff0c;云原生架构和基于容器&#xff08;Docker&#xff0c;K8S,Mesos&#xff09;架构而设计&#xff0c;它是一款优秀的APM&#xff08;Application Perfo…
最新文章