[每周一更]-(第38期):Go常见的操作消息队列

在这里插入图片描述

在Go语言中,常见的消息队列有以下几种:

  • RabbitMQ:RabbitMQ是一个开源的AMQP(高级消息队列协议)消息代理软件,用于支持多种编程语言,包括Go语言。RabbitMQ提供了可靠的消息传递机制和灵活的路由规则,可以用于处理大量的消息和任务。
  • Apache Kafka:Apache Kafka是一个开源的分布式流处理平台,也可以用作消息队列,用于处理高容量的消息流和实时数据。Kafka提供了高吞吐量、低延迟的消息传递机制,并且具有良好的可伸缩性和可靠性。
  • NSQ:NSQ是一个开源的实时分布式消息平台,用于处理大规模的消息和数据流。NSQ提供了高可靠性和低延迟的消息传递机制,并且具有良好的可扩展性和可伸缩性。
  • NATS:NATS是一个轻量级、高性能的消息系统,用于支持分布式应用程序和微服务。NATS提供了简单易用的API和协议,具有高可靠性、低延迟和高吞吐量的消息传递机制。
  • Redis:Redis是一个开源的内存数据库,也可以用作消息队列。Redis提供了支持发布订阅模式、阻塞队列等特性,可以用于处理实时数据和大量的消息。
  • ActiveMQ:ActiveMQ是一个开源的消息代理软件,用于支持多种消息传递协议和编程语言。ActiveMQ提供了高可靠性、可伸缩性和可扩展性的消息传递机制,可以用于处理大规模的消息和任务。

除了以上常见的消息队列,还有一些其他的开源消息系统和组件,例如RocketMQ、ZeroMQ等,也可以用于处理消息和任务。

在选择消息队列时,需要根据具体的业务需求和性能要求进行选择,并且需要考虑安全性、可靠性和扩展性等因素,确保消息传递的可靠性和性能。

消息队列的使用场景有哪些?

不同的消息队列适用于不同的场景,以下是常见的使用场景:

  • RabbitMQ:RabbitMQ适用于需要可靠的消息传递和灵活的路由规则的场景,例如电商网站的订单处理、银行的支付处理等。
  • Apache Kafka:Kafka适用于处理大规模的消息和数据流,例如社交媒体的实时消息、大型网站的日志处理等。
  • NSQ:NSQ适用于需要高可靠性和低延迟的场景,例如在线游戏的实时消息、金融交易的实时处理等。
  • NATS:NATS适用于需要高可靠性、低延迟和高吞吐量的场景,例如移动互联网应用的实时通信、物联网设备的数据传输等。
  • Redis:Redis适用于需要快速处理大量消息的场景,例如在线聊天、实时数据分析等。
  • ActiveMQ:ActiveMQ适用于需要支持多种消息传递协议和编程语言的场景,例如企业应用集成、分布式系统的消息传递等。

当然,这些场景只是一些常见的示例,具体的使用场景还需要根据业务需求和性能要求来选择。需要根据消息队列的特性、优缺点和性能指标进行评估和比较,选择最适合自己业务需求的消息队列。

使用示例

操作RabbitMQ

我们有一个需求,需要向多个客户端发送消息,可以使用RabbitMQ作为消息队列,Go作为开发语言。

1、安装RabbitMQ并启动服务。

2、安装amqp库:


go get github.com/streadway/amqp

    

3、生产者向消息队列中发送消息:


package main

import (
	"fmt"
	"log"

	"github.com/streadway/amqp"
)

func main() {
	// 连接RabbitMQ服务器
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %s", err)
	}
	defer conn.Close()

	// 创建一个channel
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("Failed to open a channel: %s", err)
	}
	defer ch.Close()

	// 声明一个名为"hello"的queue
	q, err := ch.QueueDeclare(
		"hello", // 队列名
		false,   // 是否持久化
		false,   // 是否自动删除
		false,   // 是否具有排他性
		false,   // 是否阻塞
		nil,     // 额外的参数
	)
	if err != nil {
		log.Fatalf("Failed to declare a queue: %s", err)
	}

	// 发送消息到队列
	body := "Hello, World!"
	err = ch.Publish(
		"",     // exchange名
		q.Name, // queue名
		false,  // 是否强制发送
		false,  // 是否立即发送
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		},
	)
	if err != nil {
		log.Fatalf("Failed to publish a message: %s", err)
	}

	fmt.Println("Message sent successfully")
}

4、消费者从消息队列中获取消息并处理:


package main

import (
	"log"

	"github.com/streadway/amqp"
)

func main() {
	// 连接RabbitMQ服务器
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %s", err)
	}
	defer conn.Close()

	// 创建一个channel
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("Failed to open a channel: %s", err)
	}
	defer ch.Close()

	// 声明一个名为"hello"的queue
	q, err := ch.QueueDeclare(
		"hello", // 队列名
		false,   // 是否持久化
		false,   // 是否自动删除
		false,   // 是否具有排他性
		false,   // 是否阻塞
		nil,     // 额外的参数
	)
	if err != nil {
		log.Fatalf("Failed to declare a queue: %s", err)
	}

	// 消费队列中的消息
	msgs, err := ch.Consume(
		q.Name, // queue名
		"",     // 消费者名
		true,   // 是否自动应答

操作Kafka

我们使用 github.com/segmentio/kafka-go 库作为 Kafka Go 客户端。在生产者示例中,我们通过 kafka.DialLeader 方法连接到 Kafka 集群,
然后使用 conn.WriteMessages 方法向 Kafka 集群发送消息。在消费者示例中,我们通过 kafka.NewReader 方法创建一个 Kafka 消费者,
然后通过 r.ReadMessage 方法从 Kafka 集群读取消息。

在示例中,我们还使用了一个 sigchan 信号通道来监听操作系统的信号并退出程序。

安装 Kafka Go 客户端:

go get -u github.com/segmentio/kafka-go

生产者示例:


package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
)

func main() {
    topic := "my-topic"
    partition := 0
    conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
    if err != nil {
        log.Fatal("failed to dial leader:", err)
    }
    defer conn.Close()

    // 发送消息
    msg := kafka.Message{
        Value: []byte("Hello, Kafka!"),
    }
    _, err = conn.WriteMessages(msg)
    if err != nil {
        log.Fatal("failed to write message:", err)
    }

    fmt.Println("message sent")
}


消费者示例:

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"

    "github.com/segmentio/kafka-go"
)

func main() {
    topic := "my-topic"
    partition := 0
    offset := kafka.LastOffset

    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   topic,
        Partition: partition,
        MinBytes: 10e3, // 10KB
        MaxBytes: 10e6, // 10MB
        MaxWait:  10 * time.Second,
    })

    // 接收消息
    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

    for {
        select {
        case <-sigchan:
            log.Println("received signal, exiting...")
            return
        default:
            msg, err := r.ReadMessage(context.Background())
            if err != nil {
                log.Fatal("failed to read message:", err)
            }

            fmt.Println(string(msg.Value))
        }
    }
}


操作NSQ

使用 github.com/nsqio/go-nsq 库作为 NSQ Go 客户端。在生产者示例中,我们通过 nsq.NewProducer 方法创建一个 NSQ 生产者,

然后使用 producer.Publish 方法向 NSQ 集群发送消息。在消费者示例中,我们通过 nsq.NewConsumer 方法创建一个 NSQ 消费者,

然后使用 consumer.AddHandler 方法设置消息处理函数。最后,我们通过 consumer.ConnectToNSQLookupd 方法连接到 NSQ 集群,

并使用 select {} 语句保持消费者程序不退出。

需要注意的是,上述示例中我们使用了一个空的 select {} 语句来保持消费者程序不退出。在实际生产环境中,我们需要在程序中添加正确的退出逻辑。

例如,使用一个 sigchan 信号通道来监听操作系统的信号并退出程序。

安装 NSQ Go 客户端:


安装 NSQ Go 客户端:

生产者示例:

package main

import (
    "fmt"

    "github.com/nsqio/go-nsq"
)

func main() {
    producer, err := nsq.NewProducer("localhost:4150", nsq.NewConfig())
    if err != nil {
        panic(err)
    }

    // 发送消息
    err = producer.Publish("my-topic", []byte("Hello, NSQ!"))
    if err != nil {
        panic(err)
    }

    producer.Stop()

    fmt.Println("message sent")
}


消费者示例:

package main

import (
    "fmt"
    "log"

    "github.com/nsqio/go-nsq"
)

type MyHandler struct{}

func (*MyHandler) HandleMessage(msg *nsq.Message) error {
    fmt.Println(string(msg.Body))
    return nil
}

func main() {
    consumer, err := nsq.NewConsumer("my-topic", "my-channel", nsq.NewConfig())
    if err != nil {
        panic(err)
    }

    consumer.AddHandler(&MyHandler{})

    err = consumer.ConnectToNSQLookupd("localhost:4161")
    if err != nil {
        panic(err)
    }

    fmt.Println("consumer started")

    select {}
}


操作NATS

使用 github.com/nats-io/nats.go 库作为 NATS Go 客户端。在生产者示例中,我们通过 nats.Connect 方法创建一个 NATS 连接,

然后使用 nc.Publish 方法向 NATS 集群发送消息。在消费者示例中,我们通过 nc.Subscribe 方法订阅 my-topic 主题,并使用一个回调函数处理接收到的消息。

最后,我们使用一个空的 select {} 语句保持消费者程序不退出。

需要注意的是,上述示例中我们使用了一个空的 select {} 语句来保持消费者程序不退出。在实际生产环境中,

我们需要在程序中添加正确的退出逻辑。例如,使用一个 sigchan 信号通道来监听操作系统的信号并退出程序。

安装 NATS Go 客户端:

go get github.com/nats-io/nats.go

生产者示例:

package main

import (
    "fmt"
    "time"

    "github.com/nats-io/nats.go"
)

func main() {
    nc, err := nats.Connect("nats://localhost:4222")
    if err != nil {
        panic(err)
    }
    defer nc.Close()

    // 发送消息
    err = nc.Publish("my-topic", []byte("Hello, NATS!"))
    if err != nil {
        panic(err)
    }

    fmt.Println("message sent")
}

消费者示例:

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

func main() {
    nc, err := nats.Connect("nats://localhost:4222")
    if err != nil {
        panic(err)
    }
    defer nc.Close()

    // 订阅消息
    _, err = nc.Subscribe("my-topic", func(msg *nats.Msg) {
        fmt.Println(string(msg.Data))
    })
    if err != nil {
        panic(err)
    }

    fmt.Println("consumer started")

    select {}
}

操作Redis

安装 Redis Go 客户端:

go get github.com/go-redis/redis

连接 Redis:

package main

import (
    "fmt"

    "github.com/go-redis/redis"
)

func main() {
    // 创建 Redis 客户端
    client := redis.NewClient(&redis.Options{
        Addr:     "localhost:6379",
        Password: "", // 没有设置密码
        DB:       0,  // 使用默认数据库
    })

    // 检查 Redis 是否正常连接
    pong, err := client.Ping().Result()
    fmt.Println(pong, err)
}

Redis String 类型操作:

package main

import (
    "fmt"

    "github.com/go-redis/redis"
)

func main() {
    // 创建 Redis 客户端
    client := redis.NewClient(&redis.Options{
        Addr:     "localhost:6379",
        Password: "", // 没有设置密码
        DB:       0,  // 使用默认数据库
    })

    // 设置字符串
    err := client.Set("key", "value", 0).Err()
    if err != nil {
        panic(err)
    }

    // 获取字符串
    val, err := client.Get("key").Result()
    if err != nil {
        panic(err)
    }
    fmt.Println("key", val)

    // 删除字符串
    err = client.Del("key").Err()
    if err != nil {
        panic(err)
    }
}

Redis List 类型操作:

package main

import (
    "fmt"

    "github.com/go-redis/redis"
)

func main() {
    // 创建 Redis 客户端
    client := redis.NewClient(&redis.Options{
        Addr:     "localhost:6379",
        Password: "", // 没有设置密码
        DB:       0,  // 使用默认数据库
    })

    // 将元素添加到列表
    err := client.RPush("list", "a", "b", "c").Err()
    if err != nil {
        panic(err)
    }

    // 获取列表长度
    length, err := client.LLen("list").Result()
    if err != nil {
        panic(err)
    }
    fmt.Println("list length:", length)

    // 获取列表中的元素
    val, err := client.LRange("list", 0, -1).Result()
    if err != nil {
        panic(err)
    }
    fmt.Println("list elements:", val)

    // 弹出列表左侧元素
    elem, err := client.LPop("list").Result()
    if err != nil {
        panic(err)
    }
    fmt.Println("popped element:", elem)
}

操作ActiveMQ

在Go中操作ActiveMQ,可以使用go-stomp库。以下是一个简单的示例代码,用于连接到ActiveMQ,并向队列发送消息:

package main

import (
	"fmt"
	"github.com/go-stomp/stomp"
)

func main() {
	conn, err := stomp.Dial("tcp", "localhost:61613")
	if err != nil {
		fmt.Println(err)
		return
	}
	defer conn.Disconnect()

	msg := "hello, activemq"
	err = conn.Send("/queue/test", "text/plain", []byte(msg), nil)
	if err != nil {
		fmt.Println(err)
		return
	}

	fmt.Printf("Message sent: %s\n", msg)
}

在该示例中,我们通过stomp.Dial()方法连接到ActiveMQ的默认端口61613。然后,我们使用conn.Send()方法向队列“/queue/test”发送消息。

要从队列中接收消息,请使用conn.Subscribe()方法。以下是一个示例代码:

package main

import (
	"fmt"
	"github.com/go-stomp/stomp"
)

func main() {
	conn, err := stomp.Dial("tcp", "localhost:61613")
	if err != nil {
		fmt.Println(err)
		return
	}
	defer conn.Disconnect()

	sub, err := conn.Subscribe("/queue/test", stomp.AckAuto)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer sub.Unsubscribe()

	for {
		msg := <-sub.C
		fmt.Printf("Received message: %s\n", string(msg.Body))
	}
}

在该示例中,我们使用conn.Subscribe()方法订阅队列“/queue/test”。然后,我们通过使用sub.C通道来接收来自队列的消息。收到消息后,我们使用string(msg.Body)将其转换为字符串,并打印到控制台上。

需要注意的是,这只是一个简单的示例代码。在实际应用中,您需要考虑诸如异常处理、连接丢失、重连等问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/264832.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

开源 AI 新秀崛起:Bittensor 更像是真正的“OpenAI”

强大的人工智能正在飞速发展&#xff0c;而完全由 OpenAI、Midjourney、Google&#xff08;Bard&#xff09;这样的少数公司控制 AI 不免让人感到担忧。在这样的背景下&#xff0c;试图用创新性解决方案处理人工智能中心化问题、权力集中于少数公司的 Bittensor&#xff0c;可谓…

让生活更智能,P1600边缘智能网关带你进入智能家居新时代

一、什么是P1600边缘智能网关&#xff1f; 在科技日新月异的今天&#xff0c;我们的生活已经被各种智能产品所包围。而在这个智能化的浪潮中&#xff0c;P1600边缘智能网关以其独特的优势&#xff0c;成为了智能家居的重要组成部分。那么&#xff0c;什么是P1600边缘智能网关呢…

力扣每日一题day38[106. 从中序与后序遍历序列构造二叉树]

给定两个整数数组 inorder 和 postorder &#xff0c;其中 inorder 是二叉树的中序遍历&#xff0c; postorder 是同一棵树的后序遍历&#xff0c;请你构造并返回这颗 二叉树 。 示例 1: 输入&#xff1a;inorder [9,3,15,20,7], postorder [9,15,7,20,3] 输出&#xff1a;[…

【数据结构之顺序表】

数据结构学习笔记---002 数据结构之顺序表1、介绍线性表1.1、什么是线性表? 2、什么是顺序表?2.1、概念及结构2.2、顺序表的分类 3、顺序表接口的实现3.1、顺序表动态存储结构的Seqlist.h3.1.1、定义顺序表的动态存储结构3.1.2、声明顺序表各个接口的函数 3.2、顺序表动态存储…

Redis取最近10条记录

有时候我们有这样的需求&#xff0c;就是取最近10条数据展示&#xff0c;这些数据不需要存数据库&#xff0c;只用于暂时最近的10条&#xff0c;就没必要在用到Mysql类似的数据库&#xff0c;只需要用redis即可&#xff0c;这样既方便也快&#xff01; 具体取最近10条的方法&a…

【Amazon 实验①】使用Amazon WAF做基础 Web Service 防护

文章目录 一、实验介绍二、实验环境准备三、验证实验环境四、Web ACLs 配置 & AWS 托管规则4.1 Web ACLs 介绍4.2 Managed Rules 托管规则4.3 防护常见威胁类型&#xff08;sql注入&#xff0c;XSS&#xff09;4.4 实验步骤4.4.1 创建Web ACL4.4.2 测试用例4.4.3 测试结果4…

格雷码独热码生成

一、基本原理 参考&#xff1a;Author Loudrs https://blog.csdn.net/Loudrs/article/details/130542638 自然二进制码转格雷码 //自然二进制数转格雷码 module bin2gray #(parameter width 4 //定义数据的位宽参数为4)(input [width - 1 : 0] bin,output [width - 1 : …

Leetcode 435 无重叠区间

题意理解&#xff1a; 给定一个区间的集合 intervals 要求需要移除区间&#xff0c;使剩余区间互不重叠 目标&#xff1a;最少需要移除几个区间。 解题思路&#xff1a; 采用贪心思路解题&#xff0c;什么是全局最优解&#xff0c;什么是局部最优解。 全局最优解&#xff0c;删…

苏州耕耘无忧物联网:降本增效,设备维护管理数字化转型的引领者

随着科技的快速发展和工业4.0的推动&#xff0c;设备维护管理已经从传统的被动式、经验式维护&#xff0c;转向了更加积极主动、数据驱动的维护模式。在这个过程中&#xff0c;苏州耕耘无忧物联科技有限公司以其深厚的技术积累和丰富的管理经验&#xff0c;引领着设备维护管理数…

7. 结构型模式 - 代理模式

亦称&#xff1a; Proxy 意图 代理模式是一种结构型设计模式&#xff0c; 让你能够提供对象的替代品或其占位符。 代理控制着对于原对象的访问&#xff0c; 并允许在将请求提交给对象前后进行一些处理。 问题 为什么要控制对于某个对象的访问呢&#xff1f; 举个例子&#xff…

Redis单机、主从、哨兵、集群配置

单机配置启动 Redis安装 下载地址&#xff1a;Download | Redis 安装步骤&#xff1a; 1: 安装gcc编译器&#xff1a;yum install gcc 2: 将下载好的redis‐5.0.3.tar.gz文件放置在/usr/local文件夹下&#xff0c;并解压redis‐5.0.3.tar.gz文件 wget http://download.re…

【Linux】权限篇(二)

权限目录 1. 前言2. 权限2.1 修改权限2.2 有无权限的对比2.3 另外一个修改权限的方法2.3.1 更改用户角色2.3.2 修改文件权限属性 3. 第一个属性列4. 目录权限5. 默认权限 1. 前言 在之前的一篇博客中分享了关于权限的一些知识&#xff0c;这次紧接上次的进行&#xff0c;有需要…

flask之文件管理网页(上传,下载,搜索,登录,注册) -- 翔山 第一版

前面说要做一个可以注册&#xff0c;登录&#xff0c;搜索&#xff0c;上传下载的网页&#xff0c;初版来了 第一版主代码 from flask import request, Flask, render_template, redirect, url_for, send_from_directory import bcrypt import ossavePath os.path.join(os.ge…

Qt中字符串转换为JS的函数执行

简介 在 QML 中&#xff0c;将 JavaScript 字符串转换为函数通常涉及使用 Function 构造函数或 eval() 函数。但是&#xff0c;QML 的环境对 JavaScript 的支持有一定的限制&#xff0c;因此不是所有的 JavaScript 功能都可以在 QML 中直接使用。 以下介绍都是在Qt5.12.1…

企业出海-如何保护客户账户安全?

近年来国内企业竞争日益激烈&#xff0c;许多企业在这般环境下难以持续发展。那么该如何获得业务的可持续性增长&#xff0c;如何获取更多的客户的同时开阔公司的视野&#xff1f;出海便是如今帮助国内企业能快速发展壮大的潮流之一&#xff0c;摆脱了局限于国内发展的束缚奔向…

智能优化算法应用:基于晶体结构算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于晶体结构算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于晶体结构算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.晶体结构算法4.实验参数设定5.算法结果6.…

CSS-SVG-环形进度条

线上代码地址 <div class"circular-progress-bar"><svg><circle class"circle-bg" /><circle class"circle-progress" style"stroke-dasharray: calc(2 * 3.1415 * var(--r) * (var(--percent) / 100)), 1000" …

Linux--shell练习题

1、写一个 bash脚本以输出数字 0 到 100 中 7 的倍数(0 7 14 21...)的命令。 vim /shell/homework1.sh #!/bin/bash for num in {1..100} doif [[ num%7 -eq o ]];thenecho $numfi done执行输出脚本查看输出结果 输出结果&#xff1a; 2、写一个 bash脚本以统计一个文本文件…

MATLAB - 使用 YOLO 和基于 PCA 的目标检测,对 UR5e 的半结构化智能垃圾箱拣选进行 Gazebo 仿真

系列文章目录 前言 本示例展示了在 Gazebo 中使用 Universal Robots UR5e cobot 模拟智能垃圾桶拣选的详细工作流程。本示例提供的 MATLAB 项目包括初始化、数据生成、感知、运动规划和积分器模块&#xff08;项目文件夹&#xff09;&#xff0c;可创建完整的垃圾桶拣选工作流…

智能优化算法应用:基于变色龙算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于变色龙算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于变色龙算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.变色龙算法4.实验参数设定5.算法结果6.参考文…
最新文章