详解一下RabbitMQ中的channel.Publish

函数定义(来自 github.com/streadway/amqp)

func (ch *Channel) Publish(exchange string,key string,mandatory bool,immediate bool,msg Publishing,
) error

这个方法的作用是:向指定的交换机 exchange 发送一条消息 msg,带上路由键 key。

参数名类型含义
exchangestring指定要将消息发布到哪个 交换机(exchange)。可以是 “” 表示默认交换机。
keystring路由键(routing key),根据交换机类型决定消息怎么路由。
mandatorybool是否强制投递。若为 true 且无法路由到队列,则会触发 Basic.Return(需要监听返回)。
immediatebool是否立即投递。很少使用,RabbitMQ 通常不支持,建议设为 false
msgPublishing消息体及其元数据(headers、content-type、body等)

📦 msg(Publishing)结构

type Publishing struct {ContentType     stringContentEncoding stringDeliveryMode    uint8 // 1=非持久化 2=持久化Priority        uint8CorrelationId   stringReplyTo         stringExpiration      stringMessageId       stringTimestamp       time.TimeType            stringUserId          stringAppId           stringBody            []byteHeaders         Table
}

常用字段:
Body: 消息内容([]byte)
ContentType: 比如 “application/json”,“text/plain”
DeliveryMode: 2 表示持久化消息,1 表示不持久化(内存中)
Headers: 自定义属性(可以设置 key-value)

✅ 使用示例:基本用法

body := "Hello RabbitMQ!"
err := channel.Publish("my-exchange", // exchange 这表示你要将消息发布到一个叫 "my-exchange" 的交换机。"my-key",      // routing key 会被用于匹配绑定在交换机上的队列。false,         // mandatory 如果消息无法路由到队列,不返回任何信息。false,         // immediate 不要求立即投递(几乎总是 false)。amqp.Publishing{ ContentType: "text/plain",Body:        []byte(body), //消息正文,以字节形式传递。DeliveryMode: amqp.Persistent, // 2 = 持久化},
)
if err != nil {log.Fatalf("Publish failed: %s", err)
}

通俗的讲一下mandatory和immediate两个参数及其应用场景

参数通俗解释
mandatory“找不到接收方要告诉我”(确保消息不被悄悄丢掉)
immediate“没有消费者就别投了”(对方不在线就别发)
err := ch.Publish("logs", "debug.key",true,  // mandatoryfalse,msg,
)

结果:
如果没有任何队列绑定了 “logs” 交换机并匹配 “debug.key”,这条消息就会被退回来,你可以通过监听 channel.NotifyReturn() 获取退回消息。

⚠️ 注意:RabbitMQ 早就不支持 immediate = true了!这个参数基本是“历史遗留”。几乎都设置为false
RabbitMQ 默认就不支持 immediate。 设置 immediate = true 会直接报错:“immediate=true” not supported

带 mandatory 回退处理机制的 RabbitMQ 生产者完整示例代码

✅ 功能概览
启动 RabbitMQ 连接与通道
使用 mandatory = true 发布消息
使用 NotifyReturn() 接收“退回的消息”
输出退回原因和消息内容

package mainimport ("log""time""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接 RabbitMQconn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "连接失败")defer conn.Close()ch, err := conn.Channel()failOnError(err, "打开通道失败")defer ch.Close()// 声明交换机(topic 类型)err = ch.ExchangeDeclare("my-exchange", // name"topic",       // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments)failOnError(err, "声明交换机失败")// 设置 return 回退监听(必须在 Publish 之前设置)returns := ch.NotifyReturn(make(chan amqp.Return))// 模拟发送消息,但没有任何队列绑定这个 key → 消息将被退回err = ch.Publish("my-exchange", // exchange"unmatched.key", // routing keytrue,          // mandatory:要求通知投递失败false,         // immediate:RabbitMQ 不支持amqp.Publishing{ContentType:  "text/plain",Body:         []byte("This message will be returned"),DeliveryMode: amqp.Persistent,},)if err != nil {log.Printf("Publish error: %s", err)}// 检查是否被回退(注意:这是异步的)select {case ret := <-returns:log.Println("❗ 消息被退回!")log.Printf("原因:%s", ret.ReplyText)log.Printf("交换机:%s", ret.Exchange)log.Printf("路由键:%s", ret.RoutingKey)log.Printf("内容:%s", string(ret.Body))case <-time.After(2 * time.Second):log.Println("✅ 消息已成功路由(没有被退回)")}
}

🧪 测试说明
你不绑定任何队列到 my-exchange + unmatched.key,运行这段代码会看到:

❗ 消息被退回!
原因:NO_ROUTE
交换机:my-exchange
路由键:unmatched.key
内容:This message will be returned

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/378.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

docker使用sh脚本创建容器,保持容器正常运行,异常关闭后马上重启

docker run -d --name dadeName \--memory5120m \-p 40060:80 \-p 40061:3306 \-v "$data:$dockerData" \-v "$img:$dockerImg" \--restartalways \ # 关键参数&#xff1a;总是重启dade:120 \/bin/bash -c "/www/start.sh && tail -f /dev/…

3516cv610在sample_aiisp上多创一路编码流,方法

3516cv610在sample_aiisp上多创一路编码流&#xff0c;方法 首先确保 vpss grp0有视频流 最好保证 已经有一路视频流能推出来 多创一路编码流思路为 将 vpss grp0又绑定给 vpss_chn1 vpss_chn1有绑定给 venc_chn1 这样我们就多创了一路视频流。 这里思路完全正确 可以实现…

Leetcode 3566. Partition Array into Two Equal Product Subsets

Leetcode 3566. Partition Array into Two Equal Product Subsets 1. 解题思路2. 代码实现 题目链接&#xff1a;3566. Partition Array into Two Equal Product Subsets 1. 解题思路 这一题我的实现还是比较暴力的&#xff0c;首先显而易见的&#xff0c;若要满足题目要求&…

waitpid的waitstatus 含义源码解读

当我们在调用pid_t waitpid(pid_t pid, int *stat_loc, int options)时&#xff0c;其中第二个参数stat_loc会提供子进程退出的详细信息&#xff0c;为此posix还提供了一组宏来解析这个status. 在\glibc\bits\waitstatus.h /* If WIFEXITED(STATUS), the low-order 8 bits of …

MATLAB实战:传染病模型仿真实现

以下是一个使用MATLAB实现传染病模型&#xff08;SIR和SEIR&#xff09;仿真的完整解决方案&#xff0c;包含参数分析和干预措施模拟&#xff1a; %% 传染病模型仿真工具箱 % 包含SIR、SEIR模型&#xff0c;支持参数调整和干预措施模拟 % 使用ode45求解微分方程 function epi…

系统架构设计师(一):计算机系统基础知识

系统架构设计师&#xff08;一&#xff09;&#xff1a;计算机系统基础知识 引言计算机系统概述计算机硬件处理器处理器指令集常见处理器 存储器总线总线性能指标总线分类按照总线在计算机中所处的位置划分按照连接方式分类按照功能分类 接口接口分类 计算机软件文件系统文件类…

汽车安全:功能安全FuSa、预期功能安全SOTIF与网络安全Cybersecurity 解析

汽车安全的三重防线&#xff1a;深入解析FuSa、SOTIF与网络安全技术 现代汽车已成为装有数千个传感器的移动计算机&#xff0c;安全挑战比传统车辆复杂百倍。 随着汽车智能化、网联化飞速发展&#xff0c;汽车电子电气架构已从简单的分布式控制系统演变为复杂的移动计算平台。现…

GitHub 趋势日报 (2025年05月31日)

&#x1f4ca; 由 TrendForge 系统生成 | &#x1f310; https://trendforge.devlive.org/ &#x1f310; 本日报中的项目描述已自动翻译为中文 &#x1f4c8; 今日获星趋势图 今日获星趋势图 1153 prompt-eng-interactive-tutorial 509 BillionMail 435 ai-agents-for-begin…

飞腾D2000与FPGA结合的主板

UD VPX-404是基于高速模拟/数字采集回放、FPGA信号实时处理、CPU主控、高速SSD实时存储架构开发的一款高度集成的信号处理组合模块&#xff0c;采用6U VPX架构&#xff0c;模块装上外壳即为独立整机&#xff0c;方便用户二次开发。 UD VPX-404模块的国产率可达到100%&#xff0…

前端面经 协商缓存和强缓存

HHTTPTTP缓存 协商缓存和强缓存 核心区别是否向服务器发起请求验证资源过期 强缓存 浏览器直接读取本地缓存,不发请求 HTTP响应头 Cache-Control:max-age3600资源有效期 Expires优先级低 如果有效浏览器返回200(浏览器换伪造的200) 应用静态资源 协商缓存 OK如果 1强缓…

【NLP 78、手搓Transformer模型结构】

你以为走不出的淤泥&#xff0c;也迟早会云淡风轻 —— 25.5.31 引言 ——《Attention is all you need》 《Attention is all you need》这篇论文可以说是自然语言处理领域的一座里程碑&#xff0c;它提出的 Transformer 结构带来了一场技术革命。 研究背景与目标 在 Transfo…

Baklib内容中台革新企业知识实践

Baklib智能知识中枢构建 作为现代企业知识管理的核心架构&#xff0c;Baklib内容中台通过整合多源异构数据形成智能化知识中枢&#xff0c;实现从信息采集到价值转化的全链路管理。其底层采用跨平台数据贯通技术&#xff0c;支持API接口与企业现有CRM、ERP系统无缝对接&#x…