go对rabbitmq基本操作

一、安装rabbitmq

  • 1、直接使用docker拉取镜像

    docker pull rabbitmq:3.8
    
  • 2、启动容器

    docker run \
     -e RABBITMQ_DEFAULT_USER=admin \
     -e RABBITMQ_DEFAULT_PASS=123456 \
     -v mq-plugins:/plugins \
     --name rabbit01 \
     --hostname rabbit01 --restart=always \
     -p 15672:15672 \
     -p 5672:5672 \
     -d \
     rabbitmq:3.8
    
  • 3、关于端口的介绍

    • 15672的给浏览器控制台使用的
    • 5672是给程序调用的
  • 4、进入到rabbit01容器中

    docker exec -it rabbit01 /bin/bash
    
  • 5、开启可视化界面操作

    rabbitmq-plugins enable rabbitmq_management
    
  • 6、客户端直接访问xx:15672

  • 7、或者直接用别人搞好的镜像

    docker run \
     -e RABBITMQ_DEFAULT_USER=admin \
     -e RABBITMQ_DEFAULT_PASS=123456 \
     -v mq-plugins:/plugins \
     --name rabbit02 \
     --hostname rabbit02 --restart=always \
     -p 15672:15672 \
     -p 5672:5672 \
     -d \
     rabbitmq:3.8-management
    

二、go语言对rabbitmq基本操作

  • 1、安装依赖包

    go get github.com/streadway/amqp
    
  • 2、基本的连接操作

    package main
    
    import (
    	"fmt"
    	"github.com/streadway/amqp"
    )
    
    func main() {
    	// 连接rabbitmq
        // conn,_ := amqp.Dial("amqp://用户名:密码@IP:端口号/虚拟机空间名称")   // 端口号:5672
    	conn, _ := amqp.Dial("amqp://admin:123456@localhost:5672//") // 端口号:5672
    	defer conn.Close()
    
    	// 打开通道
    	ch, err := conn.Channel()
    	fmt.Println(err)
    	defer ch.Close()
    }
    
  • 3、由于部分每个地方都要使用,封装成一个方法

    package utils
    
    import (
    	"fmt"
    	"github.com/streadway/amqp"
    )
    
    func RabbitmqUtils() *amqp.Channel {
    	// 连接rabbitmq
    	conn, _ := amqp.Dial("amqp://admin:123456@localhost:5672//") // 端口号:5672
    	//defer conn.Close()
    	// 打开通道
    	ch, err := conn.Channel()
    	fmt.Println(err)
    	//defer ch.Close()
    	return ch
    }
    
  • 4、创建一个队列,然后到可视化界面查看是否自动创建

    func main() {
    	// 创建一个队列
        // durable, autoDelete, exclusive, noWait bool
    	queue, err := utils.RabbitmqUtils().QueueDeclare("simple_queue", false, false, false, false, nil)
    	fmt.Println(queue.Name, err)
    }
    

    在这里插入图片描述

  • 5、关于创建队列几个参数的介绍

    • 第一个参数是队列名称
    • 第二个参数是队列是否持久化
    • 第三个参数是是否自动删除
    • 第四个参数是队列是否可以被其他队列访问
    • 第五个参数是设置为true则表示不等待服务器回执信息.函数将返回NULL,可以提高访问速度

三、简单模式

  • 1、根据官网图来看,简单模式是不需要交换机的

    在这里插入图片描述

  • 2、定义生产者,向队列中发送消息(注意要先创建队列)

    func main() {
        /**
    	第一个参数是交换机名称
    	第二个参数是队列名称
    	第三个参数是 如果生产者生产的任务没有正常进入队列中,设置为true会返还给生产者,设置为false会直接丢弃
    	第四个参数是 路由的时候
    	第五个参数是消息体
    	*/
    	err := utils.RabbitmqUtils().Publish("", "simple_queue", false, false, amqp.Publishing{
    		Body: []byte("hello word"),
    	})
    	fmt.Println(err)
    }
    
  • 3、查看可是界面是否存在一条消息

  • 4、创建消费者,来获取消息内容

    /**
    第一个参数是队列名称
    第二个参数自己给当前消费者命名
    第三个参数是否自动应答
    第三个参数队列是否可以被其他队列访问
    第四个参数
    第五个参数设置为true则表示不等待服务器回执信息.函数将返回NULL,可以提高访问速度
    */
    msgChan, err := utils.RabbitmqUtils().Consume("simple_queue", "", false, false, false, false, nil)
    fmt.Println(err)
    for msg := range msgChan {
        fmt.Println(string(msg.Body))
    }
    

四、工作模式

  • 1、工作模式是指一个生产者多个消费者,在简单模式上扩展成多个消费者,每个消费者只能交替来消费消息

  • 2、定义2个消费者来消费消息

    func main() {
    	msgChan, err := utils.RabbitmqUtils().Consume("work_queue", "", true, false, false, true, nil)
    	fmt.Println(err)
    	for msg := range msgChan {
    		fmt.Println("消费者1:", string(msg.Body))
    	}
    }
    
  • 3、生产多条消息

    func main() {
    	for i := 0; i < 10; i++ {
    		_ = utils.RabbitmqUtils().Publish("", "work_queue", false, false, amqp.Publishing{
    			Body: []byte(fmt.Sprintf("hello word %d", i)),
    		})
    	}
    }
    
  • 4、消费结果

    在这里插入图片描述

五、发布订阅模式

  • 1、发布订阅模式同样是一个生产者生产消息,多个消费者来消费,与上面的工作模式的区别是:工作模式是一个消费者消费后,另外一个消费者就消费不到了,发布订阅模式是不管有几个消费者都可以消费到消息

  • 2、使用goapi来创建交换机和队列

    func main() {
    	// 1.创建2个队列
    	queue1, _ := utils.RabbitmqUtils().QueueDeclare("first_queue", true, false, false, true, nil)
    	queue2, _ := utils.RabbitmqUtils().QueueDeclare("second_queue", true, false, false, true, nil)
    	// 2.创建一个交换机
    	_ = utils.RabbitmqUtils().ExchangeDeclare("first_exchange", amqp.ExchangeDirect, true, false, false, false, nil)
    	// 3.队列和交换机绑定在一起
    	_ = utils.RabbitmqUtils().QueueBind(queue1.Name, "", "first_exchange", true, nil)
    	_ = utils.RabbitmqUtils().QueueBind(queue2.Name, "", "first_exchange", true, nil)
    }
    
  • 3、消费者只需要绑定队列来消费消息就可以

    func main() {
    	msgChan, err := utils.RabbitmqUtils().Consume("first_queue", "", true, false, false, true, nil)
    	fmt.Println(err)
    	for msg := range msgChan {
    		fmt.Println("消费者1:", string(msg.Body))
    	}
    }
    
  • 4、生产者只需要把消息发送到交换机里面就可以,交换机会根据绑定的队列来推送消息

    func main() {
    	_ = utils.RabbitmqUtils().Publish("first_exchange", "", false, false, amqp.Publishing{
    		Body: []byte("hello word"),
    	})
    }
    
  • 5、可以查看控制台两个消费者都接收到消息

六、路由模式

  • 1、路由模式和上面的发布订阅模式有点类似,只是在上面的基础上添加的路由key

  • 2、使用go-api创建交换机和队列,并且对其绑定

    func main() {
    	// 1.创建2个队列
    	queue1, _ := utils.RabbitmqUtils().QueueDeclare("first_queue_key", true, false, false, true, nil)
    	queue2, _ := utils.RabbitmqUtils().QueueDeclare("second_queue_key", true, false, false, true, nil)
    	// 2.创建一个交换机
    	err := utils.RabbitmqUtils().ExchangeDeclare("second_exchange", amqp.ExchangeDirect, true, false, false, false, nil)
    	if err != nil {
    		fmt.Println(err)
    	}
    	// 3.队列和交换机绑定在一起
    	_ = utils.RabbitmqUtils().QueueBind(queue1.Name, "info", "second_exchange", true, nil)
    	_ = utils.RabbitmqUtils().QueueBind(queue2.Name, "info", "second_exchange", true, nil)
    	_ = utils.RabbitmqUtils().QueueBind(queue2.Name, "error", "second_exchange", true, nil)
    }
    
  • 3、定义消费者

    func main() {
    	msgChan, err := utils.RabbitmqUtils().Consume("first_queue_key", "", true, false, false, true, nil)
    	fmt.Println(err)
    	for msg := range msgChan {
    		fmt.Println("消费者1:", string(msg.Body))
    	}
    }
    
  • 4、定义生产者

    func main() {
        // 消费者会根据绑定的路由key来获取消息
    	_ = utils.RabbitmqUtils().Publish("second_exchange", "error", false, false, amqp.Publishing{
    		Body: []byte("hello word"),
    	})
    }
    

七、主题模式

  • 1、主题模式和上面路由模式差不多,只是多了一个模糊匹配
    • *表示只匹配一个单词
    • #表示匹配多个单词

八、简单对其封装

  • 1、封装代码

    package utils
    
    import (
    	"errors"
    	"fmt"
    	"github.com/streadway/amqp"
    	"log"
    )
    
    // MQURL url的格式 amqp://账号:密码@rabbitmq服务器地址:端口号/vhost
    const MQURL = "amqp://admin:123456@localhost:5672//"
    
    type RabbitMQ struct {
    	conn    *amqp.Connection
    	channel *amqp.Channel
    	MQUrl   string
    }
    
    // NewRabbitMQ 创建RabbitMQ的结构体实例
    func NewRabbitMQ() *RabbitMQ {
    	rabbitMQ := &RabbitMQ{
    		MQUrl: MQURL,
    	}
    	var err error
    	// 创建rabbitMQ连接
    	rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MQUrl)
    	if err != nil {
    		rabbitMQ.failOnErr(err, "创建连接错误")
    	}
    	rabbitMQ.channel, err = rabbitMQ.conn.Channel()
    	if err != nil {
    		rabbitMQ.failOnErr(err, "获取channel失败")
    	}
    	return rabbitMQ
    }
    
    // Binding 创建交换机和队列并且绑定在一起
    func (r *RabbitMQ) Binding(queueName, exchange, key, routerKey string) {
    	// 1.创建1个队列
    	queue1, err := r.channel.QueueDeclare(queueName, true, false, false, true, nil)
    	if err != nil {
    		r.failOnErr(err, "创建队列失败")
    	}
    	if exchange != "" && key == "" {
    		r.failOnErr(errors.New("错误"), "请传递交换机链接类型")
    	}
    	if exchange != "" {
    		// 2.创建一个交换机
    		err1 := r.channel.ExchangeDeclare(exchange, key, true, false, false, false, nil)
    		if err1 != nil {
    			r.failOnErr(err, "创建交换机失败")
    		}
    		// 3.队列和交换机绑定在一起
    		if err := r.channel.QueueBind(queue1.Name, routerKey, exchange, true, nil); err != nil {
    			fmt.Println("1111")
    			r.failOnErr(err, "交换机和队列绑定失败")
    		}
    	}
    	fmt.Println("创建成功")
    }
    
    // failOnErr 定义内部错误处理
    func (r *RabbitMQ) failOnErr(err error, message string) {
    	if err != nil {
    		log.Fatalf("%s:%s", message, err)
    		panic(fmt.Sprintf("%s:%s", message, err))
    	}
    }
    
    func (r *RabbitMQ) Close() {
    	defer func(Conn *amqp.Connection) {
    		err := Conn.Close()
    		if err != nil {
    			r.failOnErr(err, "关闭链接失败")
    		}
    	}(r.conn)
    	defer func(Channel *amqp.Channel) {
    		err := Channel.Close()
    		if err != nil {
    			r.failOnErr(err, "关闭通道失败")
    		}
    	}(r.channel)
    }
    
    func (r *RabbitMQ) Qos(prefetchCount, prefetchSize int, global bool) {
    	err := r.channel.Qos(prefetchCount, prefetchSize, global)
    	if err != nil {
    		r.failOnErr(err, "限流失败")
    	}
    }
    
    // Publish 发布者
    func (r *RabbitMQ) Publish(exchange, routerKey, message string) {
    	// 2.发送数据到队列中
    	if err := r.channel.Publish(
    		exchange,
    		routerKey,
    		false, // 如果为true的时候会根据exchange的类型和routKey规则,如果无法找到符合条件的队列那么会把发送的消息发挥给发送者
    		false, // 如果为true的时候当exchane发送消息到队列后发现队列上没有绑定消费者则会把消息发还给发送者
    		amqp.Publishing{
    			Body: []byte(message),
    		},
    	); err != nil {
    		r.failOnErr(err, "发送消息失败")
    	}
    	fmt.Println("恭喜你,消息发送成功")
    }
    
    // Consumer 消费者
    func (r *RabbitMQ) Consumer(queueName string, callback func(message []byte)) {
    	// 2.接收消息
    	message, err := r.channel.Consume(
    		queueName,
    		"",    // 区分多个消费者
    		true,  // 是否自动应答
    		false, // 是否具有排他性
    		false, // 如果为true的时候,表示不能将同一个connection中发送的消息传递给connection中的消费者
    		false, // 队列消费是否阻塞
    		nil,
    	)
    	if err != nil {
    		r.failOnErr(err, "接收消息失败")
    	}
    	fmt.Println("消费者等待消费...")
    	forever := make(chan bool)
    	// 使用协程处理消息
    	go func() {
    		for d := range message {
    			log.Printf("接收到的消息:%s", d.Body)
    			callback(d.Body)
    		}
    	}()
    	<-forever
    }
    
  • 2、简单模式的使用

    func main() {
    	mq := utils.NewRabbitMQ()
    	mq.Consumer("simple_queue1", func(message []byte) {
    		fmt.Println(string(message))
    	})
    	defer mq.Close()
    }
    
    func main() {
    	mq := utils.NewRabbitMQ()
    	mq.Binding("simple_queue1", "", "", "")
    	defer mq.Close()
    	mq.Publish("", "simple_queue1", "你好水痕")
    }
    
  • 3、工作模式的使用

    func main() {
    	mq := utils.NewRabbitMQ()
    	mq.Consumer("work_queue1", func(message []byte) {
    		fmt.Println("消费者2", string(message))
    	})
    	defer mq.Close()
    }
    
    func main() {
    	mq := utils.NewRabbitMQ()
    	defer mq.Close()
    	for i := 0; i < 10; i++ {
    		mq.Publish("", "work_queue1", fmt.Sprintf("你好水痕%d", i))
    	}
    }
    
  • 4、交换机带路由的时候

    func main() {
    	mq := utils.NewRabbitMQ()
    	mq.Binding("first_queue1", "first_exchange1", amqp.ExchangeDirect, "info")
    	mq.Binding("first_queue1", "first_exchange1", amqp.ExchangeDirect, "error")
    	mq.Binding("first_queue2", "first_exchange1", amqp.ExchangeDirect, "info")
    	defer mq.Close()
    }
    
    func main() {
    	mq := utils.NewRabbitMQ()
    	mq.Consumer("first_queue2", func(message []byte) {
    		fmt.Println("消费者2", string(message))
    	})
    	defer mq.Close()
    }
    
    func main() {
    	mq := utils.NewRabbitMQ()
    	defer mq.Close()
    	mq.Publish("first_exchange1", "error", "你好水痕")
    }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/190067.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数据结构 / 计算机内存分配

1. Linux 32位系统内存分配 栈(stack): 先进后出, 栈区变量先定义的后分配内存, 栈区地址从高到低分配堆(heap): 先进先出, 栈区变量先定义的先分配内存, 堆区地址从低到高分配堆栈溢出: 表示的是栈区内存耗尽, 称为溢出. 例如: 每次调用递归都需要在栈区申请内存, 如果递归太深…

【LLM_04】自然语言处理基础_2

一、神经网络1、循环神经网络&#xff08;RNN&#xff09;2、门控循环单元&#xff08;GRU&#xff09;3、长短期记忆网络&#xff08;LSTM&#xff09;4、双向RNN5、卷积神经网络&#xff08;CNN&#xff09; 二、注意力机制1、注意力机制原理介绍2、注意力机制的各种变式3、注…

车载通信架构 —— 传统车内通信网络FlexRay(较高速度高容错、较灵活拓扑结构)

车载通信架构 —— 传统车内通信网络FlexRay(较高速度高容错、较灵活拓扑结构) 我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 屏蔽力是信息过载时代一个人的特殊竞争力,任何消耗你的人和事,…

leetCode 1080.根到叶路径上的不足节点 + 递归 + 图解

给你二叉树的根节点 root 和一个整数 limit &#xff0c;请你同时删除树中所有 不足节点 &#xff0c;并返回最终二叉树的根节点。假如通过节点 node 的每种可能的 “根-叶” 路径上值的总和全都小于给定的 limit&#xff0c;则该节点被称之为 不足节点 &#xff0c;需要被删除…

目录树自动生成器 golang+fyne

go tree 代码实现请看 gitee 仓库链接 有很多生成目录树的工具&#xff0c;比如windows自带的tree命令&#xff0c;nodejs的treer&#xff0c;tree-cli等等。这些工具都很成熟、很好用&#xff0c;有较完善的功能。 但是&#xff0c;这些工具全部是命令式的&#xff0c;如果…

vs2019中出现Debug Error的原因

一般出现这种错误表示你的某个变量没有正确赋值&#xff0c;或者说本身在你的C程序中加了assert断言&#xff0c;assert的作用是先计算表达式expression,如果其值为假&#xff0c;那么它会打印一条错误信息 #include<assert.h> void assert(int expression); 例子&…

01、copilot+pycharm

之——free for student 目录 之——free for student 杂谈 正文 1.for student 2.pycharm 3.使用 杂谈 copilot是github推出的AI程序员&#xff0c;将chatgpt搬到了私人终端且无token限制&#xff0c;下面是使用方法。 GitHub Copilot 是由 GitHub 与 OpenAI 合作开发的…

网络篇---第一篇

系列文章目录 文章目录 系列文章目录前言一、HTTP 响应码有哪些?分别代表什么含义?二、Forward 和 Redirect 的区别?三、Get 和 Post 请求有哪些区别?前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男…

【ArcGIS Pro微课1000例】0037:ArcGIS Pro中模型构建器的使用---以shp批量转kml/kmz为例

文章目录 一、ArcGIS Pro模型构建器介绍二、shp批量转kml/kmz1. 打开模型构建器2. 添加工作空间4. 添加【创建要素图层】工具5. 添加【图层转kml】工具6. 输出文件命名7. 运行模型三、模型另存为1.py文件2. 保存为工具一、ArcGIS Pro模型构建器介绍 模型构建器是一种可视化编程…

项目去除git版本控制

我 | 在这里 &#x1f575;️ 读书 | 长沙 ⭐软件工程 ⭐ 本科 &#x1f3e0; 工作 | 广州 ⭐ Java 全栈开发&#xff08;软件工程师&#xff09; &#x1f383; 爱好 | 研究技术、旅游、阅读、运动、喜欢流行歌曲 ✈️已经旅游的地点 | 新疆-乌鲁木齐、新疆-吐鲁番、广东-广州…

C++连接MySQL失败解决

通过localhost连接&#xff0c;错误 遂改用127.0.0.1&#xff0c;协议不匹配&#xff0c;错误 在my.conf添加skip–sslon&#xff0c;重启mysql&#xff0c;连接成功 ref: https://www.cnblogs.com/walkersss/p/17037086.html

从前序与中序遍历序列构造二叉树(C++实现)

从前序与中序遍历序列构造二叉树 题目思路分析代码代码讲解 题目 思路分析 我们可以通过递归实现的二叉树构建函数。它根据给定的先序遍历序列和中序遍历序列构建一棵二叉树&#xff0c;并返回根节点。可以创建一个_build 函数&#xff0c;该函数负责构建二叉树的节点&#xff…

【深入剖析K8s】容器技术基础(三):深入理解容器镜像 文件角度

容器里的进程‘看到’’的文件系统 可能你立刻就能想到,这应该是&#xff3f;个关于MountNamespace的问题:容器里的应用进程理应‘看到”一套完全独立的文件系统这样它就可以在自己的容器目录&#xff08;比如&#xff0f;tmp&#xff09;下进行操作’而完全不会受宿主机以及其…

DataGrip 2023.2.3(IDE数据库开发)

DataGrip是一款数据库集成开发环境&#xff08;IDE&#xff09;&#xff0c;用于数据库管理和开发。 DataGrip提供了许多强大的功能&#xff0c;如SQL语句编辑、数据库连接管理、数据导入和导出、数据库比较和同步等等。它支持多种数据库&#xff0c;如MySQL、PostgreSQL、Ora…

CSDN动态发了但是主页面看不见已发的动态

问题描述&#xff1a; 今天在写csdn动态的时候&#xff0c;发了五个动态&#xff0c;但是主页面的“最近”看不到我发的动态&#xff0c;我还以为是csdn动态每天的发送量有数量限制。去这个地方点我的发现 右上角全是“审核中”的字样 按理说是不可能审核这么久的&#xff08…

Arduino驱动温湿度气压光照传感器模块

目录 一、简介二、原理图三、使用方法四、实验现象 一、简介 点击图片购买 HTU21D特性&#xff1a;HTU21D基于法国Humirel公司高性能的湿度感应元件制成&#xff0c;传感器输出标准IIC格式。同时具有很高的温度精度和湿度精度。HTU21专为低功耗小体积应用设计&#xff0c;具有很…

【挑战业余一周拿证】CSDN官方课程目录

一、亚马逊云科技简介 二、在云中计算 三、全球基础设施和可靠性 四、联网 五、存储和数据库 六、安全性 七、监控和分析 八、定价和支持 九、迁移和创新 十、云之旅 关注订阅号 CSDN 官方中文视频&#xff08;免费&#xff09;&#xff1a;点击进入 一、亚马逊云科…

90. 打家劫舍II (房子围成一圈)

题目 题解 class Solution:def rob(self, nums: List[int]) -> int:def dp(nums: List[int]) -> int:N len(nums)# 定义状态&#xff1a;dp[i]表示从第i个房屋开始偷窃&#xff0c;能够偷到的最高金额dp [0 for i in range(N)]for i in range(N-1, -1, -1):if i N-1:…

【二叉树】oj题

在处理oj题之前我们需要先处理一下之前遗留的问题 在二叉树中寻找为x的节点 BTNode* BinaryTreeFind(BTNode* root, int x) {if (root NULL)return NULL;if (root->data x)return root;BTNode* ret1 BinaryTreeFind(root->left, x);BTNode* ret2 BinaryTreeFind(ro…

Alfred v5.1.4(mac快速启动)

Mac效率办公软件哪个好&#xff1f;Alfred是一款Mac电脑上的快速启动和工作流自动化工具&#xff0c;它可以帮助用户快速访问文件、应用程序、web搜索和系统工具&#xff0c;提高工作效率。以下是Alfred的特点&#xff1a; 快速启动&#xff1a;用户可以通过Alfred快速启动应用…
最新文章