SpringBoot整合Canal+RabbitMQ监听数据变更(对rabbit进行模块封装)

SpringBoot+Canal(监听MySQL的binlog)+RabbitMQ(处理保存变更记录)

在SpringBoot中采用一种与业务代码解耦合的方式,来实现数据的变更记录,记录的内容是新数据,如果是更新操作还得有旧数据内容。
使用Canal来监听MySQL的binlog变化可以实现这个需求,可是在监听到变化后需要马上保存变更记录,除非再做一些逻辑处理,于是又结合了RabbitMQ来处理保存变更记录的操作。

  • 启动MySQL环境,并开启binlog
  • 启动Canal环境,为其创建一个MySQL账号,然后以Slave的形式连接MySQL
  • Canal服务模式设为TCP,用Java编写客户端代码,监听MySQL的binlog修改
  • Canal服务模式设为RabbitMQ,启动RabbitMQ环境,配置Canal和RabbitMQ的连接,用消息队列去接收binlog修改事件

预先在model实体中准备

短信实体

@Data
@ApiModel(description = "短信实体")
public class MsmVo{
	
	@ApiModelProperty(value="phone")
	private String phone;

	@ApiModelProperty(value = "短信模板code")
	private  String templateCode;

	@ApiModelProperty(value="短信模板参数")
	private Map<String,Object> param;
}

排班实体

@Data
@ApiModel(description = "OrderMqVo")
public class OrderMqVo{
	
	@ApiModelProperty(value="可预约数")
	private Integer reserverdNumber

	@ApiModelProperty(value = "剩余预约数")
	private Integer availableNumber;

	@ApiModelProperty(value = "排班id")
	private String scheduleId;

	@ApiModelProperty(value = "短信实体")
	private MsmVo msmVo;
}

一、安装RabbitMQ

docker pull rabbitmq:nanagemnet
docker run -d -p 5672:5672 -p 12672:15672 --name rabbitmq rabbitmq:nanagement

访问:http://IP:15672
在这里插入图片描述

二、rabbit-util模块封装

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
	<groupId>com.alibaba</groupId>
	<artifactId>fastjson</artifactId>
</dependency>

创建一个RabbitService用来发送消息

@Service
public class RabbitService{
	
	@Autowired
	private RabbitTemplate rabbitTemplate;

	//发送消息
	public boolean sendMessage(String exchange,String routingKey,Object message){
		rabbitTemplate.convertAndSend(exchange,routingKey,message);
		return true;
	}
}

创建mq消息转化器

@Configuration
public class MQConfig{
	
	@Bean
	public MessageConverter messageConverter(){
		return new Jackson2JsonMessageConverter();
	}
}

添加常量配置类

public class MqConst{
	
	//预约下单
	public static final String EXCHANGE_DIRECT_ORDER = "exchange.direct.order";
	public static final String ROUTING_ORDER = "order";
	//队列
	public static final String QUEUE_ORDER = "queue.order";

	//短信
	public static final String EXCHANGE_DIRECT_MSM = "exchange.direct.msm";
	public static final String ROUTING_MSM_ITEM = "msm.item";
	pulib static final String Queue_MSM_item = "queue.msm.item";
}

三、短信模块service-sms

将二中的模块依赖引入

<dependency>
	<groupId>com.michael</groupId>
	<artifactId>rabbit_util</artifactId>
	<version>xxx</version>
</dependency>

配置文件application.properties

spring.rabbitmq.host=192.168.44.168
spring.rabbitmq.port=5672
spring.rabbit.uername=guest
spring.rabbitmq.password=guest

Service发送消息

public interface MsmService{
	//发送手机验证码
	boolean send(String phone,String code);
	
	//MQ使用发送短信的接口
	boolean send(MsmVo msmVo);
}
@Service
public class MsmServiceImpl implements MsmService{

	@Override
	public boolean send(String phone,String code){
		//判断手机号是否为空
		if(StringUtils.isEmpty(phone)){
			return false;
		}

		//整合阿里云相关参数,短信服务
		DefaultProfile profile = DefaultProfile.getProfile(ConstantPropertiesUtils.REGION_Id,
			ConstantPropertiesUtils.ACCESS_KEY_ID,
			ConstantPropertiesUtils.SECRET
		);
		IAcsClient client = new DefaultAcsClient(profile);
		CommonRequest request = new CommonRequest();

		request.setMethod(MethodType.POST);
		request.setDomain("dysmsapi.aliyuncs.com");
		request.setVersion("2018-08-08");
		request.setAction("SendSms");

		//手机号
		request.putQueryParameter("PhoneNumbers",phone);
		//签名名称
		request.putQueryParameter("SignName","我的网站");
		//模板
		request.putQueryParameter("TemplateCode","SMS_180051135");
		//验证码使用json格式{"code":"123456"}
		Map<String,Object> param = new HashMap();
		param.put("code",code);
		request.putQueryParameter("TemplateParam",JSONObject.toJSONString(param));
		
		//调用方法进行短信发送
		try{
			CommonResponse response = client.getCommonResponse(request);
			System.out.println(response.getData());
			return response.getHttpResponse().isSuccess();
		}catch(ServerException e){
			e.printStackTrace();
		}catch(ClientException e){
			e.printStackTrace();
		}
		return false;
	}
	
	@Override
	public boolean send(MsmVo msmVo){
		
		if(!StringUtils.isEmpty(msmVO.getPhone())){
			String code = (String)msmVo.getParam().get("code");
			boolean isSend = this.send(msmVo.getPhone(),code);
			return isSend;
		}
		return false;
	}
}

创建mq监控器

@Component
public class MsmReceiver{
	
	@Autowired
	private MsmService msmService;

	//监听
	@RabbitListener(bindings = @QueueBinding(
		value = @Queue(value = MqConst.QUEUE_MSM_ITEM,durable = "true"),
		exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_MSM),
		key = {MqConst.ROUTING_MSM_ITEM}
	))
	public void send(MsmVo msmVo,Message message,Channel channel){
		msmService.ssend(msmVo);
	}
}

四、业务类

生成订单之后,发送短信并更新数量

①、业务模块中引入依赖

rabbit-util

②、添加配置

spring.rabbitmq.host=192.168.44.165
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

③、service接口以及实现类

@Override
public void update(Schedule schedule){
	schedule.setUpdata(new Date());
	scheduleRepository.save(schedule);
}

④、receiver包中创建MQ监听器

@Component
public class HospitalReceiver{
	
	@Autowired
	private ScheduleService scheduleService;

	@Autowired
	private RabbitService rabbitService;

	//监听
	@RabbitListener(
		bindings = @QueueBinding(
			value = @Queue(value = MqConst.QUEUE_ORDER,durable = "true"),
			exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_ORDER),
			key = {MqConst.ROUTING_ORDER}
		)
	)
	public void receiver(OrderMqVo orderMqVo,Message message,Channel channle) throws IOException{
		//下单成功,更新数据
		Schedule schedule = scheduleService.getScheduleId(orderMqVo.getScheduleId());
		schedule.setReservedNumber(orderMqVo.getReservedNumber());
		schedule.setAvailableNumber(orderMqVo.getAvailableNumber);
		scheduleService.update(schedule);

		//发送短信
		MsmVo msmVo = orderMqVo.getMsmVo();
		if(null != msmVo){
			rebbitService.sendMessage(MqConst.QUEUE_MSM_ITEM,MqConst.ROUTING_MSM_ITEM,msmVo);
		}
	}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/122372.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

React事件绑定的方式有哪些?区别?

一、是什么 在react应用中&#xff0c;事件名都是用小驼峰格式进行书写&#xff0c;例如onclick要改写成onClick 最简单的事件绑定如下&#xff1a; class ShowAlert extends React.Component { showAlert() { console.log("Hi"); } render() { ret…

接口测试工具的实验,Postman、Swagger、knife4j(黑马头条)

一、Postman 最常用的接口测试软件&#xff0c;需要注意点&#xff1a;在进行post请求时&#xff0c;需要选择JSON形式发送 输入JSON字符串&#xff0c;比如&#xff1a; {"maxBehotTime": "2021-04-19 00:19:09","minBehotTime": "2021-…

MacOS升级后命令行出现xcrun: error: invalid active developer path报错信息

在Mac上用g编译cpp文件时&#xff0c;出现以下&#xff08;类似于工具环境问题的&#xff09;报错&#xff1a; 解决方案&#xff1a;重新安装最新版的MacOS Command Line Tools xcode-select --install重新尝试编译&#xff1a; 编译成功&#xff08;忽略这个warning&…

Leetcode—515.在每个树行中找最大值【中等】

2023每日刷题&#xff08;二十三&#xff09; Leetcode—515.在每个树行中找最大值 DFS实现代码 /*** Definition for a binary tree node.* struct TreeNode {* int val;* struct TreeNode *left;* struct TreeNode *right;* };*/ /*** Note: The returned arra…

说说对Fiber架构的理解?解决了什么问题?

一、问题 JavaScript引擎和页面渲染引擎两个线程是互斥的&#xff0c;当其中一个线程执行时&#xff0c;另一个线程只能挂起等待 如果 JavaScript 线程长时间地占用了主线程&#xff0c;那么渲染层面的更新就不得不长时间地等待&#xff0c;界面长时间不更新&#xff0c;会导…

【微服务部署】五、Jenkins+Docker一键打包部署NodeJS(Vue)项目的Docker镜像步骤详解

NodeJS&#xff08;Vue&#xff09;项目也可以通过打包成Docker镜像的方式进行部署&#xff0c;原理是先将项目打包成静态页面&#xff0c;然后再将静态页面直接copy到Nginx镜像中运行。 一、服务器环境配置 前面说明了服务器Nginx的安装和配置&#xff0c;这里稍微有些不同&a…

机器学习中的关键组件

机器学习中的关键组件 数据 每个数据集由一个个样本组成&#xff0c;大多时候&#xff0c;它们遵循独立同分布。样本有时也叫作数据点或数据实例&#xff0c;通常每个样本由一组称为特征或协变量的属性组成。机器学习会根据这些属性进行预测&#xff0c;预测得到的称为标签或…

2023亚太杯数学建模C题思路分析

文章目录 0 赛题思路1 竞赛信息2 竞赛时间3 建模常见问题类型3.1 分类问题3.2 优化问题3.3 预测问题3.4 评价问题 4 建模资料5 最后 0 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 1 竞赛信息 2023年第十三…

微信小程序使用阿里巴巴矢量图标

一&#xff0c;介绍 微信小程序使用图标有两种方式&#xff0c;一种是在线获取&#xff0c;一种是下载到本地使用&#xff0c; 第一种在线获取的有个缺点就是图标是灰色的&#xff0c;不能显示彩色图标&#xff0c;而且第一种是每次请求资源的&#xff0c;虽然很快&#xff0…

工业园区一般用多大规格的电表?

随着我国经济的快速发展&#xff0c;工业园区在各地区如雨后春笋般崛起。作为电力系统的重要组成部分&#xff0c;电表的选择与应用对于工业园区的稳定运行至关重要。那么&#xff0c;工业园区一般用的是多大规格的电表呢&#xff1f;下面&#xff0c;小编就来给大家揭秘一下&a…

基于SSM框架的管理系统-计算机毕设 附源码 23402

基于SSM框架的管理系统 摘 要 随着社会的发展&#xff0c;社会的各行各业都在利用信息化时代的优势。计算机的优势和普及使得各种信息系统的开发成为必需。在目前的形势下&#xff0c;无论是从国家到企业再到家庭&#xff0c;计算机都发挥着其不可替代的作用&#xff0c;可以说…

数据源、映射器的复用

开发环境&#xff1a; Windows 11 家庭中文版Microsoft Visual Studio Community 2019VTK-9.3.0.rc0vtk-example参考代码目的&#xff1a;学习与总结 demo解决问题&#xff1a;复用球体数据源、映射器&#xff0c;vtkSmartPointer与std::vector、vtkNew与std::array的搭配使用…

overflow: auto滚动条跳到指定位置

点击对应模块跳转页面&#xff0c;滚动到对应模块&#xff0c;露出到可视范围 代码&#xff1a; scrollToCurrentCard() {// treeWrapper是包裹多个el-tree组件的父级元素&#xff0c;也是设置overflow:auto的元素let treeWrapper document.getElementsByClassName(treeWrapp…

调试 Mahony 滤波算法的思考 10

调试 Mahony 滤波算法的思考 1. 说在前面的2.Mahony滤波算法的核心思想3. 易懂的理解 Mahony 滤波算法的过程4. 其他的一些思考5. 民间 9轴评估板 1. 说在前面的 之前调试基于QMI8658 6轴姿态解算的时候&#xff0c;我对Mahony滤波的认识还比较浅薄。初次的学习和代码的移植让…

超全大厂UI库分享,可免费套用!

今天我们要给大家分享的是TDesign、Arco Design、Ant Design、Material design等6个优秀的大厂UI库&#xff0c;一次性打包送给大家&#xff0c;通通免费用。大厂UI库都是经过无数次的事件检验的&#xff0c;扛住了许多种使用场景和突发情况的组件资源库&#xff0c;是前人的经…

超声波热量表和电磁热量表有哪些区别?

随着我们能源消耗日益增长&#xff0c;热量计量已成为节能减排、能源管理的重要手段。热量表是用于测量热能消耗的仪表&#xff0c;其中超声波热量表和电磁热量表是常见的两种类型。下面&#xff0c;就由小编来为大家详细的介绍下超声波热量表和电磁热量表的区别&#xff0c;一…

C语言C位出道心法(四):文件操作

C语言C位出道心法(一):基础语法 C语言C位出道心法(二):结构体|结构体指针|链表 C语言C位出道心法(三):共用体|枚举 C语言C位出道心法(四):文件操作 一:C语言操作文件认知升维: 二:文件打开 三:文件读写操作 忙着去耍帅,后期补充完整.................................

SuperMap iDesktopX基于地形DEM数据做最佳路径分析

问题1: 现有某山区的DEM高程数据,以及该地区电塔位置数据(DT)、电力维护工作基地(GZJD)(电力工作人员的工作居住地)。为了安全稳定的供电,电力工程师需要随时对所有的电塔进行检查维护。为了减少工作人员在路上的耗费,我们需要根据地形、电塔和工作基地的位置点来进行路径分…

PBJ | IF=13.8 利用ChIP-seq和ATAC-seq技术揭示MdRAD5B调控苹果耐旱性的双重分子作用机制

2023年10月24日&#xff0c;西北农林科技大学园艺学院管清美教授团队在Plant Biotechnology Journal&#xff08;最新IF&#xff1a;13.8&#xff09;上发表题为“The chromatin remodeller MdRAD5B enhances drought tolerance by coupling MdLHP1-mediated H3K27me3 in apple…

C语言实现将一个数组逆序输出,使用指针数组操作

完整代码&#xff1a; // 将一个数组逆序输出&#xff0c;使用指针数组操作 #include<stdio.h>//将一个数组逆序输出 void reverse(int *arr,int len){//头指针int *startarr;//尾指针int *endarrlen-1;//通过交换数组中前后所有的数&#xff0c;来使数组逆序while (sta…
最新文章