Golang使用消息队列(RabbitMQ)

最近在使用Golang做了一个网盘项目(类似百度网盘),这个网盘项目有一个功能描述如下:用户会删除一个文件到垃圾回收站,回收站的文件有一个时间期限,比如24h,24h后数据库中记录和oss中文件会被删除,在之前的版本中,可以使用定时任务来检查数据库记录中删除时间来判断是否删除,但是这不是最佳的,因此考虑如何基于RabbitMQ来实现这个功能。

使用RabbitMQ的架构

在这里插入图片描述

代码

因为前端有点麻烦,这里全部使用Golang后端来模拟实现整个架构,包括生产端和消费端。这里有一些细节

  • 注意交换机和队列的绑定,一定要细心
  • 交换机一旦声明了就不能更改,如果要发生一些属性的更改,就要删除原来的内容,重新生成
  • 下列的内容不包含RabbitMQ持久化的内容
package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
	"strings"
)

func InitRabbitMQ() *amqp.Connection {
	mq := "amqp"
	host := "127.0.0.1"
	port := "5672"
	user := "root"
	pwd := "root"
	dns := strings.Join([]string{mq, "://", user, ":", pwd,
		"@", host, ":", port, "/"}, "")
	conn, err := amqp.Dial(dns)
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %v", err)
	}
	return conn
}

func InitMainExchangeAndQueue(ch *amqp.Channel, userID string) *amqp.Channel {
	// 队列信息
	exchangeName := "main_exchange"
	queueName := fmt.Sprintf("user_queue_%s", userID)
	messageTTL := int32(300000)

	// 声明主交换机
	err := ch.ExchangeDeclare(
		exchangeName, // 交换机名
		"direct",     // Exchange type
		false,        // Durable
		false,        // Auto-deleted
		false,        // Internal
		false,        // No-wait
		nil,          // Arguments
	)
	if err != nil {
		log.Fatalf("Failed to declare an main exchange: %v", err)
	}

	// 声明用户队列
	_, err = ch.QueueDeclare(
		queueName, // 队列名
		false,     // Durable
		false,     // Delete when unused
		false,     // Exclusive
		false,     // No-wait
		amqp.Table{
			"x-dead-letter-routing-key": "dead",          // routing-key
			"x-dead-letter-exchange":    "dead_exchange", // 死信交换机
			"x-message-ttl":             messageTTL,      // TTL
		},
	)
	if err != nil {
		log.Fatalf("Failed to declare a queue: %v", err)
	}

	// 绑定
	err = ch.QueueBind(queueName, userID, "main_exchange", false, nil)
	if err != nil {
		log.Fatalf("Failed to bind queue to exchange: %v", err)
	}

	return ch
}

func InitDeadExchangeAndQueue(ch *amqp.Channel) {
	// 声明死信交换机
	err := ch.ExchangeDeclare(
		"dead_exchange",
		amqp.ExchangeDirect,
		true,
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		log.Fatalf("Failed to declare an dead exchange: %v", err)
	}

	// 声明一个死信队列
	_, err = ch.QueueDeclare(
		"dead_queue",
		true,
		false,
		false,
		false,
		nil)
	if err != nil {
		log.Fatalf("Failed to declare a queue: %v", err)
	}

	// 绑定
	err = ch.QueueBind("dead_queue", "dead", "dead_exchange", false, nil)
	if err != nil {
		log.Fatalf("Failed to bind queue to exchange: %v", err)
	}
}

func PublishMessage(ch *amqp.Channel, userID, fileID string) {
	// 用户信息
	message := fmt.Sprintf("%s|%s", userID, fileID)
	exchangeName := "main_exchange"
	// 发布用户消息
	err := ch.Publish(
		exchangeName, // Exchange
		userID,       // Routing key
		false,        // Mandatory
		false,        // Immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		})
	if err != nil {
		log.Fatalf("Failed to publish a message: %v", err)
	}
	log.Printf("Message sent to user %s: %s", userID, message)
}

func ConsumeTTL(ch *amqp.Channel) {
	// 声明死信交换机
	err := ch.ExchangeDeclare(
		"dead_exchange", // 交换机名
		"direct",        // Exchange type
		true,            // Durable
		false,           // Auto-deleted
		false,           // Internal
		false,           // No-wait
		nil,             // Arguments
	)
	if err != nil {
		log.Fatalf("Failed to declare a dead letter exchange: %v", err)
	}

	// 创建消费者并阻塞等待消费死信队列中的消息
	megs, err := ch.Consume(
		"dead_queue", // Queue
		"",           // Consumer
		false,        // Auto-acknowledge
		false,        // Exclusive
		false,        // No-local
		false,        // No-wait
		nil,          // Args
	)
	if err != nil {
		log.Fatalf("Failed to register a consumer for dead letter queue: %v", err)
	}

	// 使用无限循环一直监听
	fmt.Println("Waiting for message from dead_queue......")
	for d := range megs {
		// 实际中,处理消息的逻辑,例如删除文件或其他操作
		fmt.Println(string(d.Body))

		// 消费完成后手动确认消息
		err = d.Ack(false)
		if err != nil {
			log.Fatalf("Failed to ack message: %v", err)
		}
	}
}

func Consume(ch *amqp.Channel, userID string) {
	// 下面的信息可以通过前后端进行传递
	queueName := fmt.Sprintf("user_queue_%s", userID)

	// 消费消息
	megs, err := ch.Consume(
		queueName, // Queue
		"",        // Consumer
		true,      // Auto-acknowledge
		false,     // Exclusive
		false,     // No-local
		false,     // No-wait
		nil,       // Args
	)
	if err != nil {
		log.Fatalf("Failed to register a consumer: %v", err)
	}

	// 这里直接是由前端发送过来的API进行触发,所以不用一直阻塞监听
	d, ok := <-megs
	if !ok {
		log.Fatalf("Failed to get message: %v", err)
	}
	fmt.Println(string(d.Body))
	// 消息完成后确认消息
	err = d.Ack(true)
	if err != nil {
		log.Fatalf("Failed to ack message: %v", err)
	}
}

func main() {
	// 获取客户端
	client := InitRabbitMQ()
	defer client.Close()

	ch, err := client.Channel()
	if err != nil {
		log.Fatalf("Failed to open a channel: %v", err)
	}
	defer ch.Close()

	//ConsumeTTL(ch)

	// 构造dead_exchange及dead_queue
	// InitDeadExchangeAndQueue(ch)

	// 假设这是web请求信息
	//var userID1 = "test-id1"
	//var fileID1 = "file1"

	// 构造main_exchange及user_queue
	//ch = InitMainExchangeAndQueue(ch, userID1)
	// 针对用户1:假设还消息没有过期时候就被recovery,即在user_queue中就被消费,实际中发布消息的这部分逻辑应当放在前端中
	//PublishMessage(ch, userID1, fileID1)

	//time.Sleep(20 * time.Second)

	// 模拟后端消费消息
	//Consume(ch, userID1)

	// 针对用户2:模拟其不被后端消费,过期到死信队列中
	var userID2 = "test-id2"
	var fileID2 = "file2"
	ch = InitMainExchangeAndQueue(ch, userID2)
	PublishMessage(ch, userID2, fileID2)
	// 注意这个消息没有被消费,理论上应当被死信队列消费
}

从dead_exchange中消费:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/82511.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

vue中实现订单支付倒计时

需求 创建订单后15分钟内进行支付&#xff0c;否则订单取消。 实现 思路&#xff1a; 获取当前时间和支付超时时间&#xff08;在创建时间的基础上增加15分钟即为超时时间&#xff0c;倒计时多久根据自己的实际需求&#xff0c;这里为15分钟&#xff09;&#xff0c;支付超时…

后端开发12.商品模块

概述 简介 商品模块这个设计的非常复杂 效果图 数据库

FPGA原理与结构——ROM IP的使用与测试

一、前言 本文介绍Block Memory Generator v8.4 IP核 实现ROM&#xff0c;在学习一个IP核的使用之前&#xff0c;首先需要对于IP核的具体参数和原理有一个基本的了解&#xff0c;具体可以参考&#xff1a; FPGA原理与结构——块RAM&#xff08;Block RAM,BRAM&#xff09;http…

KUST_LI计算机视觉实验室服务器安装与管理

第一步&#xff1a;安装 Linux-Ubuntu系统 系统语言设置为英文 ENGLISH&#xff0c;防止系统 BUG&#xff1b;选择-清除整个磁盘并安装系统&#xff1b;设置用户名和密码&#xff0c;实验室统一其余全部默认设置 开机后设置磁盘挂载 在系统设置中找到 desk 打开&#xff0c;…

从零开始 Spring Cloud 12:Sentinel

从零开始 Spring Cloud 12&#xff1a;Sentinel 1.初识 Sentinel 1.1雪崩问题 1.1.1什么是雪崩问题 微服务中&#xff0c;服务间调用关系错综复杂&#xff0c;一个微服务往往依赖于多个其它微服务。 如图&#xff0c;如果服务提供者I发生了故障&#xff0c;当前的应用的部分…

简单认识Docker的资源控制

文章目录 一、CPU资源限制1.设置CPU使用率上限2.设置CPU资源占用比&#xff08;设置多个容器才有效&#xff09;3.设置容器与CPU绑核 二、内存资源限制三、对磁盘I/O配额的限制 一、CPU资源限制 1.设置CPU使用率上限 Linux通过CFS&#xff08;Completely Fair Scheduler&#…

【jenkins】jenkins流水线构建打包jar,生成docker镜像,重启docker服务的过程,在jenkins上一键完成,实现提交代码自动构建的功能

【jenkins】jenkins流水线构建打包jar&#xff0c;生成docker镜像&#xff0c;重启docker服务的过程&#xff0c;在jenkins上一键完成&#xff0c;实现提交代码自动构建&#xff0c;服务重启&#xff0c;服务发布的功能。一键实现。非常的舒服。 1. 启动脚本 shell脚本 这是 s…

vue3 路由缓存问题

目录 解决问题的思路&#xff1a; 解决问题的方案&#xff1a; 1、给roter-view添加key&#xff08;破坏复用机制&#xff0c;强制销毁重建&#xff09; 2、使用beforeRouteUpdate导航钩子 3、使用watch监听路由 vue3路由缓存&#xff1a;当用户从/users/johnny导航到/use…

【王道-进程与线程】

#pic_center R 1 R_1 R1​ R 2 R^2 R2 目录 知识框架No.0 引言No.1 进程的概念、组成、特征一、进程的概念二、进程的组成1、PCB进程控制块2、程序段/数据段 三、程序是如何运行的&#xff1f;四、进程的特征五、总结 No.2 进程的状态转换和组织一、进程的状态1、创建态、就绪态…

(牛客网)链表相加(二)

嗯哼~ 题目 描述 假设链表中每一个节点的值都在 0 - 9 之间&#xff0c;那么链表整体就可以代表一个整数。 给定两个这种链表&#xff0c;请生成代表两个整数相加值的结果链表。 数据范围&#xff1a;0 ≤ n,m ≤ 1000000&#xff0c;链表任意值 0 ≤ val ≤ 9 要求&#x…

c++优先级队列的模拟实现代码

了解&#xff1a; 1.优先队列是一种容器适配器&#xff0c;根据严格的弱排序标准&#xff0c;它的第一个元素总是它所包含的元素中最大的。 2. 类似于堆&#xff0c;在堆中可以随时插入元素&#xff0c;并且只能检索最大堆元素(优先队列中位于顶部的元素)。 3. 优先队列被实现为…

IDEA 如何制作代码补丁?IDEA 生成 patch 和使用 patch

什么是升级补丁&#xff1f; 比如你本地修复的 bug&#xff0c;需要把增量文件发给客户&#xff0c;很多场景下大家都需要手工整理修改的文件&#xff0c;并整理好目录&#xff0c;这个很麻烦。那有没有简单的技巧呢&#xff1f;看看 IDEA 生成 patch 和使用 patch 的使用。 介…

Ubuntu安装Apache+Php

环境&#xff1a;ubuntu 22.04 虚拟机 首先更新一下 sudo apt-get update sudo apt-get upgrade安装Apache2&#xff1a; sudo apt-get install apache2 输入y&#xff0c;继续。等着他恐龙抗浪抗浪的下载安装就好了 打开浏览器访问http://localhost/ 安装php&#xff1a; …

【Linux】进程信号篇Ⅰ:信号的产生(signal、kill、raise、abort、alarm)、信号的保存(core dump)

文章目录 一、 signal 函数&#xff1a;用户自定义捕捉信号二、信号的产生1. 通过中断按键产生信号2. 调用系统函数向进程发信号2.1 kill 函数&#xff1a;给任意进程发送任意信号2.2 raise 函数&#xff1a;给调用进程发送任意信号2.3 abort 函数&#xff1a;给调用进程发送 6…

【C语言】数组概述

&#x1f6a9;纸上得来终觉浅&#xff0c; 绝知此事要躬行。 &#x1f31f;主页&#xff1a;June-Frost &#x1f680;专栏&#xff1a;C语言 &#x1f525;该篇将带你了解 一维数组&#xff0c;二维数组等相关知识。 目录&#xff1a; &#x1f4d8;前言&#xff1a;&#x1f…

vue3+ts+vite使用el-breadcrumb实现面包屑组件,实现面包屑过渡动画

简介 使用 element-plus 的 el-breadcrumb 组件&#xff0c;实现根据页面路由动态生成面包屑导航&#xff0c;并实现面包屑导航的切换过渡动画 一、先看效果加粗样式 1.1 静态效果 1.2 动态效果 二、全量代码 <script lang"ts" setup> import { ref, watch…

JVM基础了解

JVM 是java虚拟机。 作用&#xff1a;运行并管理java源码文件锁生成的Class文件&#xff1b;在不同的操作系统上安装不同的JVM&#xff0c;从而实现了跨平台的保证。一般在安装完JDK或者JRE之后&#xff0c;其中就已经内置了JVM&#xff0c;只需要将Class文件交给JVM即可 写好的…

安装pyrender和OSMesa

1&#xff09;安装 pyrender Pyrender是一个基于OpenGL的库&#xff0c;可以加载和渲染三维网格、点云、相机等对象3。 pip install pyrender 2&#xff09;理解PyOpenGL和OSMesa的关系是: PyOpenGL是Python的OpenGL绑定库&#xff08;接口壳子&#xff09;,它提供了在Python中…

day9 STM32 I2C总线通信

I2C总线简介 I2C总线介绍 I2C&#xff08;Inter-Integrated Circuit&#xff09;总线&#xff08;也称IIC或I2C&#xff09;是由PHILIPS公司开发的两线式串行总线&#xff0c;用于连接微控制器及其外围设备&#xff0c;是微电子通信控制领域广泛采用的一种总线标准。 它是同步通…

基于ACF,AMDF算法的语音编码matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3.部分核心程序 .......................................................................... plotFlag …