(十)springboot实战——springboot3下的webflux项目mysql数据库事务处理

前言

WebFlux 是 Spring Framework 5.0 中引入的一种新型反应式编程模型,支持非阻塞 I/O,适用于高并发、高吞吐量的应用程序。在 WebFlux 应用程序中使用事务需要注意以下几点。使用 Reactive R2DBC:WebFlux 支持使用 Reactive R2DBC 访问关系型数据库。R2DBC 是一个反应式的数据库连接规范,它允许开发人员以响应式方式访问关系型数据库。在 WebFlux 应用程序中使用 Reactive R2DBC 可以实现非阻塞式的数据库操作。使用 @Transactional 注解:在 WebFlux 应用程序中,可以使用 @Transactional 注解声明事务边界,将多个数据库操作绑定到同一事务中。需要注意的是,@Transactional 注解需要与 Reactive R2DBC 结合使用,确保事务管理器与 R2DBC 兼容。遵循响应式编程原则:在 WebFlux 应用程序中,需要遵循响应式编程的原则,使用 Mono 和 Flux 等响应式类型来处理数据流,而不是传统的阻塞式方法。这意味着在事务中涉及到的所有操作,都需要返回 Mono 或 Flux 对象,并且保证响应式链条的正确性。

本节内容以关系型数据库mysql为例,通过使用spring-boot-starter-data-r2dbc框架完成关系型数据库的调用,并通过具体的案例实现webflux应用下的数据库事务管理,包含最佳的使用实战案例,通过spring的AOP实现最终的事务控制。

正文

①引入必要的pom组件,其中spring-boot-starter-aop可以不引入,如果不使用aop管理r2dbc的数据库事务

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-aop</artifactId>
</dependency>

<dependency>
	<groupId>org.projectlombok</groupId>
	<artifactId>lombok</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/org.springdoc/springdoc-openapi-starter-webflux-ui -->
<dependency>
	<groupId>org.springdoc</groupId>
	<artifactId>springdoc-openapi-starter-webflux-ui</artifactId>
	<version>2.3.0</version>
</dependency>

<!--        响应式 Spring Data R2dbc-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/io.asyncer/r2dbc-mysql -->
<dependency>
	<groupId>io.asyncer</groupId>
	<artifactId>r2dbc-mysql</artifactId>
	<version>1.0.6</version>
</dependency>

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-test</artifactId>
	<scope>test</scope>
</dependency>
<dependency>
	<groupId>io.projectreactor</groupId>
	<artifactId>reactor-test</artifactId>
	<scope>test</scope>
</dependency>

②创建控制层UserController请求接口测试方法:新增用户

@Operation(summary = "新增用户", description = "新增用户")
@PostMapping(value = "saveUser")
public Mono<ApiResponse> saveUser(@RequestBody User user) {
	Mono<User> userMono = userService.saveUser(user);
	return  userMono.map(ApiResponse::success);
}

③创建UserService的业务接口层

/**
 * 新增用户
 *
 * @param user
 * @return
 */
Mono<User> saveUser(User user);

 ④创建UserServiceImpl的业务实现层

    @Override
    public Mono<User> saveUser(User user) {
        return userRepository.save(user);
    }

 ⑤创建UserRepository数据操作层

package com.yundi.atp.repository;

import com.yundi.atp.entity.User;
import org.springframework.data.r2dbc.repository.Query;
import org.springframework.data.r2dbc.repository.R2dbcRepository;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Mono;


@Repository
public interface UserRepository extends R2dbcRepository<User, Integer> {
    /**
     * 根据用户名获取用户信息
     *
     * @param name
     * @return
     */
    Mono<User> findUsersByName(@Param("name") String name);

    /**
     * 更新用户信息
     *
     * @param user
     * @return
     */
    @Query("update user set name = :#{#user.name}, age = :#{#user.age} where id = :#{#user.id}")
    Mono<User> updateUser(@Param("user") User user);


    /**
     * 动态更新用户信息
     *
     * @param user
     * @return
     */
    @Query("UPDATE User SET " +
            "name = CASE WHEN :#{#user.name} IS NULL THEN name ELSE :#{#user.name} END, " +
            "age = CASE WHEN :#{#user.age} IS NULL THEN age ELSE :#{#user.age} END " +
            "WHERE id = :#{#user.id}")
    Mono<User> updateUserQuery(@Param("user") User user);
}

⑥ 正常通过swagger工具访问用户新增接口,数据可以正常插入数据库

⑦在保存用户的业务方法中人为创建一个异常,验证是否能够正常保存数据

@Override
public Mono<User> saveUser(User user) {
	Mono<User> monoUser = userRepository.save(user);
    //人为创建一个数学异常
	System.out.println(1/0);
	return monoUser;
}

 ⑧使用swagger工具访问用户新增接口,从打印的日志来看,数据并没有插入成功,并抛出了异常,但是这里并没有数据库的事务处理

PS:这里可能会误导初学者,认为webflux的r2dbc具备天然的事务处理机制,其实不然,这是因为webflux是响应式非阻塞式编程,所有操作都是异步执行,导致业务方法会跳过用户保存的操作而先去执行异常的部分代码,直接消费异常处理结果,真正的用户保存方法并没有执行。

⑨使用@Transactional,加入事务处理注解,查看是否会有事务执行

@Transactional(rollbackFor = Exception.class)
@Override
public Mono<User> saveUser(User user) {
	Mono<User> mono = userRepository.save(user);
	System.out.println(1/0);
	return mono;
}

⑩使用swagger工具访问用户新增接口,从打印的日志来看,加入@Transactional确实切入了事务,但是由于异步执行,跳过了用户保存的方法,用户保存sql并未执行,这里的事务看起来并没有起什么作用

⑪使用Mono的flatMap方法将业务方法改为如下方式,先执行用户保存方法,在触发异常,查看打印结果

@Transactional(rollbackFor = Exception.class)
@Override
public Mono<User> saveUser(User user) {
	return userRepository.save(user).flatMap(item -> {
		System.out.println("item:" + item);
		System.out.println(1 / 0);
		return Mono.just(item);
	});
}

⑫使用swagger工具访问用户新增接口,从打印的日志来看,数据持久化sql操作真实执行了,但并没有真实插入数据库,事务确定已经真实生效,数据进行了回滚

⑬利用事务的传播机制,测试内层事务出现异常,内外层事务都会回滚

@Transactional(rollbackFor = Exception.class)
@Override
public Mono<User> saveUser(User user) {
	return userRepository.save(user).flatMap(item -> {
		System.out.println("item:" + item);
		return userRepository.findUsersByName(item.getName()).flatMap(it -> {
			System.out.println("it:" + it);
			System.out.println(1 / 0);
			return Mono.just(it);
		});
	});
}

⑭使用swagger工具访问用户新增接口,从打印的日志来看,内外层的sql都执行了,由于内层异常,导致全局的异常回滚,说明异常的传播机制是shengx

⑮最佳实践:使用aop切面实现数据库事务的统一管理,创建全局事务处理配置类ReactiveTransactionConfig

package com.yundi.atp.config;

import io.r2dbc.spi.ConnectionFactory;
import jakarta.annotation.Resource;
import org.springframework.aop.Advisor;
import org.springframework.aop.aspectj.AspectJExpressionPointcut;
import org.springframework.aop.support.DefaultPointcutAdvisor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.r2dbc.connection.R2dbcTransactionManager;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
import org.springframework.transaction.interceptor.NameMatchTransactionAttributeSource;
import org.springframework.transaction.interceptor.TransactionInterceptor;


@Configuration
@EnableTransactionManagement
public class ReactiveTransactionConfig {
    @Resource
    private ConnectionFactory connectionFactory;

    @Bean
    public ReactiveTransactionManager reactiveTransactionManager() {
        return new R2dbcTransactionManager(connectionFactory);
    }

    @Bean(name = "myTransactionInterceptor")
    public TransactionInterceptor myTransactionInterceptor() {
        //写事务控制
        DefaultTransactionAttribute txAttr_REQUIRED = new DefaultTransactionAttribute();
        txAttr_REQUIRED.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        txAttr_REQUIRED.setTimeout(2000);

        //读事务控制
        DefaultTransactionAttribute txAttr_REQUIRED_READONLY = new DefaultTransactionAttribute();
        txAttr_REQUIRED_READONLY.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);

        NameMatchTransactionAttributeSource source = new NameMatchTransactionAttributeSource();
        source.addTransactionalMethod("page*", txAttr_REQUIRED_READONLY);
        source.addTransactionalMethod("find*", txAttr_REQUIRED_READONLY);
        source.addTransactionalMethod("query*", txAttr_REQUIRED_READONLY);
        source.addTransactionalMethod("list*", txAttr_REQUIRED_READONLY);
        source.addTransactionalMethod("get*", txAttr_REQUIRED_READONLY);

        source.addTransactionalMethod("save*", txAttr_REQUIRED);
        source.addTransactionalMethod("add*", txAttr_REQUIRED);
        source.addTransactionalMethod("update*", txAttr_REQUIRED);
        source.addTransactionalMethod("remove*", txAttr_REQUIRED);
        source.addTransactionalMethod("delete*", txAttr_REQUIRED);
        return new TransactionInterceptor(reactiveTransactionManager(), source);
    }

    /**
     * 切点
     *
     * @return
     */
    @Bean
    public Advisor advisor() {
        AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();
        pointcut.setExpression("execution(* com.yundi.atp.service..*(..))");
        return new DefaultPointcutAdvisor(pointcut, myTransactionInterceptor());
    }


}

⑯使用swagger工具访问用户新增接口,从打印的日志来看,不加@Transactional,依然可以实现事务处理

结语

至此,关于webflux项目数据库事务的内容到这里就结束了,我们下期见。。。。。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/361332.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

前端大厂面试题探索编辑部——第四期

目录 题目 单选题 题解 JavaScript的异步编程 Promise 异步函数async/await 关于Ajax 题目 这期只有一道题&#xff0c;我们来详细讲讲JavaScript的异步编程&#xff0c;当然&#xff0c;异步编程是许多编程语言都有的一种编程思想&#xff0c;我们在前端这个领域&#…

Python爬虫:XPath基本语法

XPath&#xff08;XML Path Language&#xff09;是一种用于在XML文档中定位元素的语言。它使用路径表达式来选择节点或节点集&#xff0c;类似于文件系统中的路径表达式。 不啰嗦&#xff0c;讲究使用&#xff0c;直接上案例。 导入 pip3 install lxmlfrom lxml import etr…

springboot中获取配置文件中属性值的几种方式

目录 第一章、使用Value注解第二章、使用PropertySource注解第三章、使用Configurationproperties注解第四章、使用Java Properties类第五章、使用Environment接口 友情提醒: 先看文章目录&#xff0c;大致了解文章知识点结构&#xff0c;点击文章目录可直接跳转到文章指定位置…

旋转花键磨损后如何修复?

旋转花键是机械传动中的一种&#xff0c;传递机械扭矩的&#xff0c;在轴的外表有纵向的键槽&#xff0c;套在轴上的旋转件也有对应的键槽&#xff0c;可保持跟轴同步旋转。在旋转的同时&#xff0c;有的还可以在轴上作纵向滑动&#xff0c;如变速箱换档齿轮等。 我们在使用某些…

VirtualBox安装Ubuntu22.04

目录 1、新建虚拟机 1.1、设置内存大小 1.2、创建虚拟硬盘 2、虚拟机设置 2.1、设置启动顺序​编辑 2.2、选择iso镜像文件 2.3、设置网络(桥接网卡) 3、启动 3.1、设置语言环境 3.2、系统更新安装(不更新) 3.3、选择键盘布局(默认即可) 3.4、选择安装类型 3.5、网…

项目实战:一个基于标准库的具备最值获取的万能容器实现

目录 写在前面 需求 分析 接口设计 项目实现 一些思考与总结 致谢 写在前面 刚刚介绍了变参模板和完美转发&#xff0c;现在换一换脑子做一个小的项目实战吧。博主最近学习的是标准库&#xff0c;总体来说&#xff0c;我认为标准库中的内容是很trivial的&#xff0c;重点…

STM32实时时钟(RTC)的配置和使用方法详解

实时时钟&#xff08;RTC&#xff09;是STM32系列微控制器上的一个重要模块&#xff0c;用于提供准确的时间和日期信息。在本文中&#xff0c;我们将详细介绍STM32实时时钟的配置和使用方法。 ✅作者简介&#xff1a;热爱科研的嵌入式开发者&#xff0c;修心和技术同步精进 ❤欢…

【C++】默认成员函数

与普通成员函数差距较大&#xff0c;形式对于我们比较陌生&#xff0c;但这是语法&#xff0c;是我们是必须要掌握的。 目录 类的默认成员函数&#xff1a;构造函数&#xff1a;概念&#xff1a;语法&#xff1a;特性&#xff1a; 析构函数&#xff1a;概念&#xff1a;语法&a…

Django模型(七)

一、聚合与分组查询 1.1、准备数据 class Cook(models.Model):"""厨师"""name = models.CharField(max_length=32,verbose_name=厨师名)level = models.IntegerField(verbose_name=厨艺等级)age = models.IntegerField(verbose_name=年龄)sect …

消息中间件之RocketMQ源码分析(三)

RocketMQ中的Consumer启动流程 RocketMQ客户端中有两个独立的消费者实现类分别为DefaultMQPullConsumer和DefaultMQPushConsumer&#xff0c; DefaultMQPullConsumer DefaultMQPullConsumer,该消费者使用时需要用户主动从Broker中Pull消息和消费消息&#xff0c;提交消费位点…

Springboot-SpringCloud学习

文章目录 web项目开发历史 SpringBootSpring以及Springboot是什么微服务第一个Springboot项目配置如何编写 yaml自动装配原理集成 web开发&#xff08;业务核心&#xff09;集成 数据库 Druid分布式开发&#xff1a;Dubbo&#xff08;RPC&#xff09; zookeeperswagger&#x…

掌握使用 React 和 Ant Design 的个人博客艺术之美

文章目录 前言在React的海洋中起航安装 Create React App安装Ant Design 打造个性化的博客风格通过路由实现多页面美化与样式定制部署与分享总结 前言 在当今数字时代&#xff0c;个人博客成为表达观点、分享经验和展示技能的独特平台。在这个互联网浪潮中&#xff0c;选择使用…

新火种AI|脑洞照进现实!马斯克正式官宣,已将芯片连入大脑...

作者&#xff1a;小岩 编辑&#xff1a;彩云 2024年1月30日&#xff0c;马斯克在社交媒体上宣布&#xff0c;他的公司Neuralink已经完成了首例人类大脑芯片植入手术&#xff0c;并且目前手术恢复状况良好。这一突破性进展意味着人类距离实现大脑与电脑的直接连接更近了一步。…

【mysql】InnoDB引擎的索引

目录 1、B树索引 1.1 二叉树 1.1.1 二分查找&#xff08;对半查找&#xff09; 1.1.2 树&#xff08;Tree&#xff09; 1.1.2.1 树的定义 1.1.2.2 树的特点 1.1.2.3 二叉树 1.1.2.4 二叉查找&#xff08;搜索&#xff09;树 1.2 B树 1.2.1 聚簇索引&#xff08;clust…

AI-数学-高中-14-函数零点存在定理和运用

原作者视频&#xff1a;【函数综合】【考点精华】1零点存在性定理的运用&#xff08;基础&#xff09;_哔哩哔哩_bilibili 1.定义&#xff1a; 2.零点存在定义&#xff1a; 2.函数零点与图像焦点的转化 零点如果不好求&#xff0c;将函数化成两个函数再画图&#xff0c;看函数…

防抖和节流?有什么区别?如何实现?

#一、是什么 本质上是优化高频率执行代码的一种手段 如&#xff1a;浏览器的 resize、scroll、keypress、mousemove 等事件在触发时&#xff0c;会不断地调用绑定在事件上的回调函数&#xff0c;极大地浪费资源&#xff0c;降低前端性能 为了优化体验&#xff0c;需要对这类…

计算机网络-物理层传输介质(导向传输介质-双绞线 同轴电缆 光纤和非导向性传输介质-无线波 微波 红外线 激光)

文章目录 传输介质及分类导向传输介质-双绞线导向传输介质-同轴电缆导向传输介质-光纤非导向性传输介质小结 传输介质及分类 物理层规定电气特性&#xff1a;规定电气信号对应的数据 导向传输介质-双绞线 双绞线的主要作用是传输数据和语音信息。它通过将两根导线以特定的方…

python爬虫2

1.table 是表格&#xff0c;tr是行&#xff0c;td是列 ul li是无序列标签用的较多&#xff0c;ol li是有序列标签 最基本的结构 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><title> Title </title>…

【JavaEE】UDP协议与TCP协议

作者主页&#xff1a;paper jie_博客 本文作者&#xff1a;大家好&#xff0c;我是paper jie&#xff0c;感谢你阅读本文&#xff0c;欢迎一建三连哦。 本文于《JavaEE》专栏&#xff0c;本专栏是针对于大学生&#xff0c;编程小白精心打造的。笔者用重金(时间和精力)打造&…

CSS复合选择器

目录 1.什么是复合选择器 2.后代选择器 3.子选择器 4.并集选择器 5.伪类选择器 5.1链接伪类选择器 5.2 :focus 伪类选择器 6.总结 7.补充 7.1相邻兄弟选择器&#xff08;也叫加号选择器&#xff09; 7.2通用兄弟选择器&#xff08;也叫波浪号选择器&#xff09; 1. 什…