NSQ 实现逻辑探秘

1 什么是 NSQ

NSQ 是一个消息队列中间件,用 go 实现,有如下特点:

  1. 分布式: 它提供了分布式的、去中心化且没有单点故障的拓扑结构,稳定的消息传输发布保障,能够具有高容错和高可用特性。

  2. 易于扩展: 它支持水平扩展,没有中心化的消息代理( Broker ),内置的发现服务让集群中增加节点非常容易。

  3. 运维方便: 它非常容易配置和部署,灵活性高。

  4. 高度集成: 现在已经有官方的 Golang、Python 和 JavaScript 客户端,社区也有了其他各个语言的客户端库方便接入,自定义客户端也非常容易。

2 名词解释

名词

释义

Topic

一个 topic 就是程序发布消息的一个逻辑键,当程序第一次发布消息时就会创建 topic

Channel

类似 kafka 中的消费组,是消费者之间的负载均衡。每当一个发布者发送一条消息到一个 topic,消息会被复制到所有消费者连接的 channel 上,然后将消息随机推送到其中一个消费者

nsqd

nsq 核心逻辑所在,负责接收消息、排队消息、并投递消息给消费者。可以同时运行多个 nsqd,不同的 nsqd 相互独立。如果存在 nsqlookup, 则会连接到 nsqlookup, 并向其同步 topic 和 channel 信息。

nsqlookup

负责管理拓扑信息。客户端通过查询 nsqlookupd 来发现指定 topic 和 channel 的 nsqd 地址。有两个接口: TCP 接口, nsqd 用它来广播; HTTP 接口,客户端用它来发现和管理。

3 整体介绍

3.1 数据流动

topic 中的消息会被复制到多个 channel 中,并将消息推送到其中一个消费这个 channel 的消费者手中。

3.2 nsqlookup 和 nsqd

图中表明了 3 中类型的连接:

  1. 黑实线带箭头。consumer 会直连 nsqd,并从 nsqd 获取消息

  2. 蓝虚线。consumer 会询问 nsqlookup 某个 topic 在哪些 nsqd 上存在,nsqlookup 会返回 nsqd 的信息

  3. 灰虚线。nsqd 和 nsqlookup 会建立一个长连接,并在 topic 或者 channel 发生变更时,将 topic 和 channel 信息同步到 nsqlookup

4 详细介绍

4.1 生产者

4.1.1 生产的负载均衡

go sdk 中提供的 NewProducer 函数需要提供 nsqd 的地址,因此生产者会直连某个 nsqd,并向其投递消息。如果希望生产也能实现负载均衡,则需要我们自己做进一步的封装,大概逻辑是:

  1. 向 nsqlookup 获取所有 nsqd 的信息

  2. 建立多个 producer, 每个 producer 与不同的 nsqd 建立连接

  3. 每次生产消息时,随机挑选一个 producer 发布消息

初次之外,还需要一个守护任务,定时从 nsqlookup 刷新 nsqd 信息,添加新上线的 nsqd,移除异常的 nsqd

4.1.2 消息类型

nsq 支持的消息类型包括:

  1. 普通消息。该消息发布后,会尽快的推送到消费者那儿。

  2. 延迟消息。生产者可以指定消息延迟多少时间后再推送给消费者。需要注意的是,延迟时间并不保证精确,即并不是在延迟时间到达后,消息一定会被消费,包括如下原因:

    1. 在延迟时间到达后,消息只是可以推送给消费者,但是如果消费已经积压,则延迟消息依然需要排队

    2. 延迟时间的计算并不精确,这在 4.2.3 节会进行介绍

    3. 延迟消息一旦被存储到磁盘,则会丢失延迟时间信息,因而就会退化成普通消息。消息何时会存储到磁盘将在 4.2.1 节中介绍

  3. 临时消息。当 topic 名称以 "#ephemeral" 结尾时,就被认为是临时 topic,临时 topic 中的消息不会被持久化,且当不再有消费者消费时,临时 topic 会被删除。

4.2 Nsqd

4.2.1 消息接收

在 nsq 实现中,topic 和 channel 都维护了内存队列 msgChan 和磁盘队列 backendQueue,消息会被优先写入内存队列中。消息一旦写入磁盘队列,消息将会丢失延迟信息。生产者在发布消息后,消息首先会进入 topic 的消息队列,然后会被复制到这个 topic 关联的 channel。

临时 topic 没有磁盘队列,所以一旦 msgChan 满,则临时 topic 中的消息将会被丢弃。

nsq 支持延迟消息,因而 channel 还会额外维护一个延迟队列。在消息到期后,消息会从延迟队列中取出并发送到 channel 的消息队列中等待发送。延迟队列的实现将在 4.2.3 节中介绍。

整体流程大致如下

4.2.2 消息推送

如上所述,消息会被复制到所有的 channel,然后将其推送到消费端。但是消息的推送需要兼顾消费端的消费能力。nsq 通过 rdy(ready 的缩写)和 InFlight 队列实现推送速度的控制。

rdy 是消费端通过 TCP 请求设置的,表明当前消费端允许 nsqd 推送多少个消息过来。InFlight 队列是 channel 维护的,存储的是当前已推送但是没有收到响应的消息,响应包括 FIN 和 REQ 两种。FIIN 表示消息成功消费,REQ 表示消费失败,消息需要重新推送。如果 InFlight 中的消息数量已经大于了 rdy,则 nsqd 会停止推送消息。

nsqd 会定时处理 InFlight 队列中的消息,如果发现消息超时未回复,则会从 InFlight 队列中移除,重新推送。

若 nsqd 收到 REQ 回复,则会将消息放入到延迟队列中,延迟时间是消费端在 REQ 回复中设置的。

4.2.3 定时逻辑的处理

nsqd 有两个场景涉及到定时任务:

  1. 延迟消息的延迟推送

  2. InFlight 队列中消息的超时判断

nsqd 借鉴了 redis 的过期算法,主要逻辑如下:

  1. 每隔 QueueScanInterval (默认值 100ms) 时间唤醒一次

  2. 随机从所有 channel 中选择 QueueScanSelectionCount (默认20) 数量的 channel 开始处理。

  3. InFlight 队列和延迟消息队列使用最小堆实现,因此可以非常快速的找到最早过期的消息。nsqd 的定时逻辑每次被唤醒的时候,都会从 InFlight 队列和延迟消息队列中找出所有到期的消息,然后将其推送出去

  4. 如果有超过 25% 的 channel 存在过期的消息,则回到第 2 步,继续处理。

4.2.4 与 nsqlookup 的交互

  1. nsqd 会开启一个守护任务,在 topic 新增/删除,channel 新增/删除的时候,将事件告知 nsqlookup。

  2. 在 nsqd 刚与 nsqlookup 建立连接时,将会同步当前 nsqd 的 topic 和 channel 信息。

  3. 在 nsqd 同步 topic 和 channel 失败的时候,nsqd 会和 nsqlookup 断开连接,并在下次需要和 nsqlookup 通信时,尝试重新建立连接。

通过这种机制,保证 nsqlookup 始终能够保存最新的 topic 和 channel 信息。

4.3 消费端

4.3.1 连接 nsqd

go sdk 提供了多种方法去和 nsqd 建立连接:

  1. ConnectToNSQD: 提供单个 nsqd 的地址,并与之建立连接

  2. ConnectToNSQDs: 提供一组 nsqd 地址,分别和他们建立连接

  3. ConnectToNSQLookupd:提供一个 nsqlookup 地址,消费端从 nsqlookup 查询 nsqd 地址

  4. ConnectToNSQLookupds:提供一组 nsqlookup 地址,每次随机挑选一个 nsqlookup 查询 nsqd 地址

根据 nsqd 的实现,首选第 4 种方法

如果是通过 nsqlookup 发现 nsqd,消费端会定时查询 nsqlookup, 刷新本地的 nsqd 地址。

4.3.2 分配 rdy

在 4.2.2 节中了解到,消费速度是由消费端控制的。消费端会根据自己的情况,向 nsqd 发送 “RDY” 命令,从而控制 nsqd 最多发送多少个消息过来。

与此同时,消费端会维护 MaxInFlight 配置,表示消费端可以并发处理的消费总数。分配给每个 nsqd 的 rdy 之和必须小于 MaxInFlight. 

因此,给每个 nsqd 分配多少个 rdy,什么时候会分配 rdy,就成了消费速度控制的关键。

有如下几个场景消费端会发送 RDY 命令:

  1. 在和 nsqd 建立连接的时候。消费端会将 MaxInFlight 平均分配给每个 nsqd,但是至少会分配 1,即最坏情况下,每个 nsqd 只能串形推送消息

  2. 在消费失败的时候,消费端会默认进入退避模式,此时会将所有 nsqd 的 rdy 设置为0。在等待一段时间后,会开始消费恢复流程,此时会随机选择一个 nsqd,为其分配 rdy = 1。重试逻辑将在 4.3.3 节中详细介绍

  3. 消费端会启动一个守护任务,它会在 nsqd 数量大于 MaxInFlight 时,会将长时间未收到消息或者长时间未修改过 rdy 的 nsqd 的 rdy 置为0,并尽可能的将更多的 nsqd 的 rdy 置为1,从而保证在 nsqd 数量大于 MaxInFlight 时,每个 nsqd 的消息都有可能被消费到

4.3.3 失败重试

当我们消费失败时,会进行重试,重试是通过消费端向 nsqd 发送 “REQ” 命令实现,“REQ” 命令包含一个 delay 字段,用于告知 nsqd 应该延迟多久再推送。delay 的默认计算公式如下:

// DefaultRequeueDelay 是默认的重试时间,默认 90s
// Attempts 是这个消息重试的次数
// MaxRequeueDelay 是延迟时间的上限,默认 15m
delay = MIN(DefaultRequeueDelay * Attempts, MaxRequeueDelay)

当重试次数大于 MaxAttempts,消费端会直接向 nsqd 发送 “FIN”,从而结束重试。

默认情况下,即我们使用了自动 ACK 机制,重试会进入退避模式,具体逻辑如下:

  1. 维护了 backoffCounter 字段,表示进入到退避模式的次数。

  2. 计算退避的时间。当前提供两种策略,指数退避以及随机退避,时间的计算和 backoffCounter 正相关,但是最大不会大于 MaxBackoffDuration (默认 2m)

  3. 将当前所有 nsqd 的 rdy 置为 0

  4. 在退避时间到期后,会尝试进行恢复,即随机找到一个 nsqd,将它的rdy置为1,尝试重新开始消费

  5. 每次消费成功时,会将 backoffCounter - 1,如果 backoffCounter 为0,则重新平均分配 rdy,结束退避模式

QA

是否会丢消息

这个和生产者,消费者,nsqd 三方都有关系,我们这里只考虑 nsqd 会不会可能丢消息。

如果是临时 topic 则可能会丢失消息,但是其他消息,正常情况下不会丢失消息。但是若 nsqd 下线,则该 nsqd 上的消息将无法消费到。

是否能实现有序消费

nsq 的设计就没有考虑支持有序消费的场景,即使消息都发布到了同一个 nsqd,消费端也只开启了一个协程进行消费,但是由于以下两个原因,也很难保证一定是顺序消费:

  1. 消息重新推送是重新入队,此时排在了消息队列的末尾

  2. nsqd 在内存队列满的时候会将消息写入磁盘,而磁盘中的队列和内存队列的顺序是无法保证的

是否能更改消费进度,对消息进行重放

当前不支持这个能力

在发布/消费延迟消息的时候需要注意什么

  1. 延迟消息会进入延迟队列,延迟队列是内存中的最小堆,如果延迟消息很多又没有被及时消费,则可能导致内存/cpu占用高

  2. 延迟消息在一些场景下会被写入磁盘,一旦写入磁盘,则会丢失延迟信息,变成一个普通的消息

重试策略会影响消费速度吗

在如下环境下做了一个测试:

  1. 使用的是单机版部署的 nsq

  2. 消费者和生产者直连 nsqd

  3. 有足够的数据进行消费

控制变量包括:

  1. 消费失败率:failRate

结论:

即使消费失败率是万分之一,即成功率达到4个9,对消费速度的影响还是显而易见的。随着失败率的提高,消费速度会越来越慢

failRate = 0

curCount 表示每秒消费的数量

failRate = 1

failRate = 10

测试代码如下

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"github.com/nsqio/go-nsq"
	gonsq "github.com/nsqio/go-nsq"
)

var maxInFlight = 100
var failRate = 10 // 单位:万分之一,即如果 failRate = 100,表示错误率是 1%

var ticker = time.NewTicker(time.Second)
var count = int32(0)
var mux sync.Mutex

func init() {
	go func() {
		for {
			select {
			case <-ticker.C:
				mux.Lock()
				fmt.Printf("curCount: %d\n", atomic.LoadInt32(&count))
				atomic.StoreInt32(&count, 0)
				mux.Unlock()
			}
		}
	}()
}

type NsqHandler struct {
	id int
}

func InitNSQConsumer(topic string) *nsq.Consumer {
	config := nsq.NewConfig()
	config.MaxInFlight = maxInFlight
	config.MaxAttempts = 3
	consumer, err := nsq.NewConsumer(topic, "main", config)

	if err != nil {
		fmt.Printf("new consumer fail, error: %+v\n", err)
		return nil
	}
	consumer.AddHandler(&NsqHandler{id: 1})

	if err := consumer.ConnectToNSQD("169.254.1.10:4150"); err != nil {
		fmt.Printf("look up fail, error: %+v\n", err)
		return nil
	}
	return consumer
}

func InitNSQProducer() *nsq.Producer {
	nsqdUrl := "169.254.1.10:4150"
	config := nsq.NewConfig()
	p, err := nsq.NewProducer(nsqdUrl, config)
	if err != nil {
		fmt.Printf("new producer fail, error: %+v\n", err)
		return nil
	}
	return p
}

func main() {
	topic := "detector.cloudwalker.detect_result.test"

	consumer := InitNSQConsumer(topic)
	if consumer == nil {
		fmt.Printf("consumer is nil\n")
		return
	}
	time.Sleep(3 * time.Second)
	producer := InitNSQProducer()
	if producer == nil {
		fmt.Printf("producer is nil\n")
		return
	}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(gi int) {
			defer wg.Done()
			for j := 0; j < 500000; j++ {
				if err := producer.Publish(topic, []byte(fmt.Sprintf("%s-%d-%d", "aaaabbbbbcccccddddd", gi, j))); err != nil {
					fmt.Printf("publish error: %+v\n", err)
					return
				}
			}
		}(i)
	}
	wg.Wait()
	time.Sleep(time.Minute)
}

func (h *NsqHandler) HandleMessage(message *gonsq.Message) error {
	if rand.Intn(10000) < failRate {
		return fmt.Errorf("always error")
	}
	mux.Lock()
	defer mux.Unlock()
	atomic.AddInt32(&count, 1)
	return nil
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/34009.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

星辰秘典:揭开Python项目的神秘密码——2048游戏

✨博主&#xff1a;命运之光 &#x1f338;专栏&#xff1a;Python星辰秘典 &#x1f433;专栏&#xff1a;web开发&#xff08;html css js&#xff09; ❤️专栏&#xff1a;Java经典程序设计 ☀️博主的其他文章&#xff1a;点击进入博主的主页 前言&#xff1a;你好&#x…

深度学习与神经网络

文章目录 引言1. 神经网络1.1 什么是神经网络1.2 神经元1.3 多层神经网络 2. 激活函数2.1 什么是激活函数2.2 激活函数的作用2.3 常用激活函数解析2.4 神经元稀疏 3. 设计神经网络3.1 设计思路3.2 对隐含层的感性认识 4. 深度学习4.1 什么是深度学习4.2 推理和训练4.3 训练的相…

python语法(高阶)-多线程编程

""" 演示多线程编程的使用 """ import time import threadingdef sing(msg):while True:print(msg)time.sleep(1)return Nonedef dance(msg):while True:print(msg)time.sleep(1)return Noneif __name__ __main__:# 创建一个唱歌的线程&#xf…

html实现好看的多种风格导航菜单(附源码)

文章目录 1.设计来源1.1 顶部导航菜单1.1.1 界面风格1-一二级连体导航菜单1.1.2 界面风格2-二级导航下拉框1.1.3 界面风格3-系统开始风格1.1.4 界面风格4-购物类导航菜单1.1.5 界面风格5 - 带搜索扩展的导航条1.1.6 界面风格6-火热效果多级导航条 1.2 悬浮按钮菜单1.2.1 界面风…

电力系统系统潮流分析【IEEE 57 节点】(Matlab代码实现)

&#x1f4a5; &#x1f4a5; &#x1f49e; &#x1f49e; 欢迎来到本博客 ❤️ ❤️ &#x1f4a5; &#x1f4a5; &#x1f3c6; 博主优势&#xff1a; &#x1f31e; &#x1f31e; &#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 …

Ceph分布式存储系统搭建

目录 安装部署示例 &#xff08;一&#xff09;准备环境 1). 设置主机名 2). 关闭防火墙 3).添加sdb磁盘并格式化 4).配置hosts解析文件 5).配置免密登录 6).同步时区 7). 安装 Ceph 包 &#xff08;二&#xff09;创建 Ceph 集群 1、 安装ceph-deploy管理工具 2、 …

Linux 用户名称高亮和最近路径显示

1、通常情况下&#xff0c;Linux中的路径名称会不断叠加显示&#xff0c;如下图&#xff0c;这样看起来会很长。 2、为了设置路径只是当前最近的文件路径&#xff0c;先进入自己的家目录&#xff0c;然后进入.bashrc&#xff1a; 3、在.bashrc文件中的最后一行加入以下内容…

C国演义 [第三章]

第三章 组合分析步骤递归函数的返回值和参数递归结束的条件单层逻辑 组合总和 III 组合 力扣链接 给定两个整数 n 和 k&#xff0c;返回范围 [1, n] 中所有可能的 k 个数的组合。 你可以按 任何顺序 返回答案。 示例 1&#xff1a; 输入&#xff1a;n 4, k 2 输出&#xff1…

Echarts区域面积areaStyle用图片进行纹理填充

React DOM结构代码&#xff1a; import fillImg from xx/fillImg.png; // 填充纹理图片...... {/* 趋势图填充纹理图片 */} <img id"fillImg" src{fillImg} style{{ width: 0 }} /> <div id"line" style{{ width: 100%, height: 300 }}></…

蓝绿发布、灰度发布和滚动发布

系列文章目录 文章目录 系列文章目录一、1.金丝雀发布&#xff08;Canary Release&#xff09;的工作原理&#xff1a;2.滚动发布&#xff08;Rolling Release&#xff09;3.蓝绿发布&#xff08;Blue-Green Deployment&#xff09;有钱人玩的&#xff01; 总结 一、 当涉及到…

深入理解深度学习——注意力机制(Attention Mechanism):带掩码的多头注意力(Masked Multi-head Attention)

分类目录&#xff1a;《深入理解深度学习》总目录 相关文章&#xff1a; 注意力机制&#xff08;AttentionMechanism&#xff09;&#xff1a;基础知识 注意力机制&#xff08;AttentionMechanism&#xff09;&#xff1a;注意力汇聚与Nadaraya-Watson核回归 注意力机制&#…

WPF中的Behavior及Behavior在MVVM模式下的应用

WPF中的Behavior及Behavior在MVVM模式下的应用 在WPF中&#xff0c;Behaviors&#xff08;行为&#xff09;是一种可重用的组件&#xff0c;可以附加到任何UI元素上&#xff0c;以添加特定的交互行为或功能。Behaviors可以通过附加属性或附加行为的方式来实现。 Behavior并不…

知识蒸馏学习记录(二)

上一篇博文中我们介绍了知识蒸馏的一些基础知识&#xff0c;这里我们来学习其到底是如何完成知识蒸馏过程的。 知识蒸馏为何可以让学生网络模型小却性能强&#xff1f; 详细很多同学与我有相同的疑问&#xff0c;尽管它依靠不同的蒸馏温度T可以学得一些hard target标注无法包…

三维空间刚体运动之旋转矩阵与变换矩阵

1. 旋转矩阵 1.1 点、向量和坐标系 点&#xff1a;点是空间中的基本元素&#xff0c;没有长度&#xff0c;没有体积&#xff1b; 向量&#xff1a;把两个点连接起来&#xff0c;就构成了向量&#xff0c;向量可以看成从某点指向另一点的一个箭头&#xff1b;只有当我们指定这…

hive基于新浪微博的日志数据分析——项目及源码

有需要本项目的全套资源资源以及部署服务可以私信博主&#xff01;&#xff01;&#xff01; 该系统的目的是利用大数据技术&#xff0c;分析新浪微博的日志数据&#xff0c;从而探索用户行为、内容传播和移动设备等各个层面的特性和动向。这项研究为公司和个人在制定营销战略、…

Redis数据库的简介、部署及常用命令

Redis数据库的简介、部署及常用命令 一、关系数据库与非关系型数据库概述1、关系型数据库2、非关系型数据库3、关系数据库与非关系型数据库区别4、非关系型数据库产生背景 二、Redis简介1、Redis服务器程序的单线程模型2、Redis的优点 三、Redis部署四、Redis 命令工具1、redis…

【Openvino03】深入了解OpenVINO™ 工具包与Jupyter Notebooks工程

接上一篇&#xff0c;本篇将以OpenVINO™ 工具包、Jupyter Notebook工具以及OpenVINO™ Notebooks工程为基础&#xff0c;依照构建环境、工具学习、案例学习、实战部署的顺序引导初学者完成从0到1学习人工智能的全过程&#xff0c;希望众多对人工智能感兴趣的开发者&#xff0c…

说说@EnableConfigurationProperties那点事

两者的对比 ConfigurationProperties 使用ConfigurationProperties的时候&#xff0c;把配置类的属性与yml配置文件绑定起来的时候&#xff0c;还需要加上Component注解才能绑定并注入IOC容器中&#xff0c;若不加上Component&#xff0c;则会无效。 EnableConfigurationPro…

RNN其中的X.reshape

假设RNN中的输入为2528&#xff0c;2是batchsize可以理解为有几句话&#xff0c;5是timestep可以理解为有几个词&#xff0c;28是vocab_size。如下就是两个句子&#xff0c;每个句子由5个单词组成。28则为每个单词的词向量&#xff0c;在此略去。 在输入的时候&#xff0c;首先…