基于 Redis 发布订阅实现服务注册与发现

写在前面

其实很少有公司会使用 Redis 来实现服务注册与发现,通常是ETCD、NACOS、ZOOKEEPER等等,但是也不妨碍我们了解。本文会先介绍 Redis 的发布/订阅模式,接着基于这个模式实现服务注册与发现。

Redis发布订阅流程图:
在这里插入图片描述

Redis 发布订阅

1. 简介

Redis的发布订阅功能主要由PUBLISHSUBSCRIBEPSUBSCRIBE 等命令组成的。

通过执行 SUBSCRIBE 命令,客户端可以订阅一个或多个频道,从而成为这个频道的订阅者。
在这里插入图片描述

每当有其他客户端向这个被订阅的频道发送消息的时候,频道的所有订阅者都会收到这条消息。

在这里插入图片描述
当然,客户端还可以通过PSUBSCRIBE订阅一个或多个模式,从而成为这些模式的订阅者,也就是模糊匹配

2. 订阅

每当一个客户端执行SUBSCRIBE命令订阅某个或某些频道的时候,这个客户端与被订阅者之间就会建立起一种订阅关系。而Redis会将这种订阅关系保存到pubsub_channels 这个字典中,这个字典的键是某个被订阅的频道,而值是一个链表,这个链表记录了所有订阅这个频道的客户端

在这里插入图片描述
每当有客户端执行了SUBSCRIBE命令订阅某个或某些频道的时候,服务器都会将客户端与被订阅的频道在 pubsub_channels字典中进行关联。

3. 退订

如果进行退订UNSUBSCRIBE,那么服务器会从pubsub_channels中接触客户端与被退订频道之间的关联。当这个key中,已经没有订阅者,那么会将这个key进行删除。例如下面的client7
在这里插入图片描述

4. 发布消息

当一个Redis客户端执行 PUBLISH channel message 命令将消息 message 发送给channel的时候,将消息发送给channel频道的所有订阅者(本文不讨论pattern模式)

服务注册与发现

我们了解完redis的发布订阅流程之后,我们来基于这个发布订阅来实现一个服务注册与发现的功能。

Redis服务发现与注册流程图:
在这里插入图片描述

1. 对象定义

redis服务发现与注册的结构体

type RedisRegistryService struct {
	config *RedisConfig // the config about redis
	cli *redis.Client // client for redis
	rwLock *sync.RWMutex // rwLock lock groupList when update service instance
	// vgroupMapping to store the cluster group
	// eg: map[cluster_name_key]cluster_name
	vgroupMapping map[string]string
	// groupList store all addresses under this cluster
	// eg: map[cluster_name][]{service_instance1,service_instance2...}
	groupList map[string][]*ServiceInstance
	ctx context.Context
}

订阅的消息内容,为key 以及 value ,而key就是服务的name,value就是服务的具体地址

type NotifyMessage struct {
	// key = registry.redis.${cluster}_ip:port
	Key   string `json:"key"`
	Value string `json:"value"`
}

2. 对象加载

新建一个redis服务注册与发现对象,并且在创建的这个对象的时候,我们会做两件事情

  1. 将redis中所已存在的key都load一次,存到本地缓存中。
  2. 开启一些协程进行发布订阅,不断监听上游的注册消息
func newRedisRegisterService(config *ServiceConfig, redisConfig *RedisConfig) RegistryService {
	if redisConfig == nil {
		log.Fatalf("redis config is nil")
		panic("redis config is nil")
	}

	cfg := &redis.Options{
		Addr:     redisConfig.ServerAddr,
		Username: redisConfig.Username,
		Password: redisConfig.Password,
		DB:       redisConfig.DB,
	}
	cli := redis.NewClient(cfg)

	vgroupMapping := config.VgroupMapping
	groupList := make(map[string][]*ServiceInstance)

	redisRegistryService := &RedisRegistryService{
		config:        redisConfig,
		cli:           cli,
		ctx:           context.Background(),
		rwLock:        &sync.RWMutex{},
		vgroupMapping: vgroupMapping,
		groupList:     groupList,
	}

	// loading all server at init time
	redisRegistryService.load()
	// subscribe at real time
	go redisRegistryService.subscribe()
	return redisRegistryService
}

3. 服务加载

load 函数:将所有 key 都 scan 出来,再遍历所有的key,拿到对应的value,进行一次初始化操作,加载到本地缓存中

func (s *RedisRegistryService) load() {
	// find all the server list redis register by redisFileKeyPrefix
	keys, _, err := s.cli.Scan(s.ctx, 0, fmt.Sprintf("%s*", redisFileKeyPrefix), 0).Result()
	if err != nil {
		log.Errorf("RedisRegistryService-Scan-Key-Error:%s", err)
		return
	}
	for _, key := range keys {
		clusterName := s.getClusterNameByKey(key)
		val, err := s.cli.Get(s.ctx, key).Result()
		if err != nil {
			log.Errorf("RedisRegistryService-Get-Key:%s, Err:%s", key, err)
			continue
		}
		ins, err := s.getServerInstance(val)
		if err != nil {
			log.Errorf("RedisRegistryService-getServerInstance-val:%s, Err:%s", val, err)
			continue
		}
		// put server instance list in group list
		s.rwLock.Lock()
		if s.groupList[clusterName] == nil {
			s.groupList[clusterName] = make([]*ServiceInstance, 0)
		}
		s.groupList[clusterName] = append(s.groupList[clusterName], ins)
		s.rwLock.Unlock()
	}
}

4.服务发现

通过 key 从 vgroupMapping 找到对应的 value

func (s *RedisRegistryService) Lookup(key string) (r []*ServiceInstance, err error) {
	s.rwLock.RLock()
	defer s.rwLock.RUnlock()
	cluster := s.vgroupMapping[key]
	if cluster == "" {
		err = fmt.Errorf("cluster doesnt exit")
		return
	}
	r = s.groupList[cluster]
	return
}

5. 服务注册

  1. key 和 value set到 redis 中
  2. key 和 value 通过 Channel 发布出去
  3. 另外开启一个协程将进行保活
func (s *RedisRegistryService) register(key, value string) (err error) {
	_, err = s.cli.HSet(s.ctx, key, value).Result()
	if err != nil {
		return
	}

	msg := &NotifyMessage{
		Key:   key,
		Value: value,
	}

	s.cli.Publish(s.ctx, redisRegisterChannel, msg)

	go func() {
		s.keepAlive(s.ctx, key)
	}()

	return
}

6. 服务订阅

订阅 Subscribe Channel 监听上游服务,并对服务的 key 和 value 进行更新操作。 注意这里对map进行读写的时候要加上读写锁,防止线程不安全。

func (s *RedisRegistryService) subscribe() {
	go func() {
		msgs := s.cli.Subscribe(s.ctx, redisRegisterChannel).Channel()
		for msg := range msgs {
			var data *NotifyMessage
			err := json.Unmarshal([]byte(msg.Payload), &data)
			if err != nil {
				log.Errorf("RedisRegistryService-subscribe-Subscribe-Err:%+v", err)
				continue
			}
			// get cluster name by key
			clusterName := s.getClusterNameByKey(data.Key)
			ins, err := s.getServerInstance(data.Value)
			if err != nil {
				log.Errorf("RedisRegistryService-subscribe-getServerInstance-value:%s, Err:%s", data.Value, err)
				continue
			}
			s.rwLock.Lock()
			if s.groupList[clusterName] == nil {
				s.groupList[clusterName] = make([]*ServiceInstance, 0)
			}
			s.groupList[clusterName] = append(s.groupList[clusterName], ins)
			s.rwLock.Unlock()
		}
	}()

	return
}

注意一点:redis的发布订阅的消息是不存储到日志的,也没有ack确认。 所以如果发生的消息的丢失,就需要业务自己承担了,比如自己实现一个ack,发送的时候进行消息日志的存储。

完整代码:
https://github.com/CocaineCong/incubator-seata-go/blob/discovery/redis/pkg/discovery/redis.go

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/579826.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

北京半导体展会2024时间(入场时间+闭馆时间)

2024年第二十一届中国国际半导体博览会(IC CHINA) 时 间:2024 年 9 月 5 一 7 日 地 点:中国北京 北人亦创国际会展中心 作为中国半导体行业协会主办的唯一展览会,自 2003 年起已连续成功举办二十届,是…

ZCT-80零序电流互感器配继电器组合用,电流200/1.5mA JOSEF约瑟

ZCT-80零序电流互感器配继电器组合用,电流200/1.5mA ■ 互感器用于接地保护的装置 ■ 检测零序电流的互感器 ■ 适用于EOCR继电器 功能特点: 专用于剩余电流的检测。 与ELR继电器配合使用。 产品外壳采用ABS阻燃材料,抗干扰能力强,测量…

RFC 6071: IP Security (IPsec) and Internet Key Exchange (IKE) Document Roadmap

![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/96882d1fb67b4383bc77c4dd421f7b

Ubuntu中的 Everything 搜索软件 ==> fsearch

本文所使用的 Ubuntu 系统版本是 Ubuntu 22.04 ! 在 Windows 中,我经常使用 Everything 来进行文件搜索,搜索效率比 Windows 自带的高出千百倍。 那么在 Ubuntu 系统中,有没有类似的软件呢?那必须有,它就是 FSearch 。…

【单调栈】3113. 边界元素是最大值的子数组数目

本文涉及的基础知识点 单调栈分类、封装和总结 LeetCode 3113. 边界元素是最大值的子数组数目 给你一个 正 整数数组 nums 。 请你求出 nums 中有多少个子数组,满足子数组中 第一个 和 最后一个 元素都是这个子数组中的 最大 值。 示例 1: 输入&#…

区块链 | OpenSea:Wyvern protocol

目录 Wyvern on the OpenSea 1 交易流程 1.1 卖家 1.2 买家 2 组成部分 2.1 WyvernProxyRegistry 2.2 OwnableDelegateProxy 2.3 NFT Contract 2.4 OpenSea Order Book 2.5 Wyvern Exchange Contract 3 总结 🥑原文:Wyvern on the …

交通气象站监测站

TH-GQX8交通运输在人们的日常生活中扮演着越来越重要的角色。然而,气候变化、环境污染等因素对交通安全产生了极大的影响。为了应对这些挑战,交通气象站监测站应运而生,成为守护交通安全的重要利器。 一、交通气象站监测站的功能 交通气象站…

路透社:美国SEC将拒绝以太坊ETF

4月25日,据路透社报道,美国SEC在下个月将拒绝以太坊现货ETF申请。根据4位知情人士表示,在最近几周与美国证券交易委员会(SEC)进行了会议之后,美国发行商和其他公司预计SEC将拒绝他们推出与以太坊价格挂钩的…

OpenMesh 网格高斯曲率计算(二)

文章目录 一、简介二、实现代码三、实现效果参考资料一、简介 Mesh曲率特征通常指的是在三维几何网格(Mesh)上计算的曲率相关的一系列特征,包括主曲率、高斯曲率、平均曲率等。这些曲率特征提供了对Mesh表面形状的详细描述,对于表面形状分析、形状比较和几何建模等领域非常…

《C++的类型转换》

目录 一、c语言中的类型转换 1、隐式类型转化: 2、强制类型转化: 3、缺点 二、c新的类型转换 1、内置类型转为自定义类型 3、自定义类型转换为内置类型 三、C的规范的强制类型转换 1、C新增四种规范的类型转换的原因 2、static_cast 3、reint…

头歌实践教学平台:CG5-v1.0-简单光照效果

第2关:OpenGL球体镜面反射 一.任务描述 根据提示,在右侧修改代码,并自己绘制出图形。平台会对你编写的代码进行测试。 1.本关任务 为在场景中增加光照,需要执行以下步骤。 (1).设置一个或多个光源,设定它的有关属性…

信息系统项目管理师0074:数据集成(5信息系统工程—5.3系统集成—5.3.3数据集成)

点击查看专栏目录 文章目录 5.3.3数据集成1.数据集成层次2.异构数据集成5.3.3数据集成 数据集成的目的是运用一定的技术手段将系统中的数据按一定的规则组织成为一个整体,使得用户能有效地对数据进行操作。数据集成处理的主要对象是系统中各种异构数据库中的数据。数据仓库技术…

eclipse导入工程提示Project has no explicit encoding set

eclipse导入工程提示Project has no explicit encoding set 文章目录 eclipse导入工程提示Project has no explicit encoding set一、Eclipse的工程导入二、可能的问题1.在工程名下有黄色叹号 一、Eclipse的工程导入 用Eclipse的导入可以将原有工程导入到新环境中 具体方法是&…

1. 房屋租赁管理系统(基于springboot/vue的Java项目)

1.此系统的受众 1.1 在校学习的学生,可用于日常学习使用或是毕业设计使用 1.2 毕业一到两年的开发人员,用于锻炼自己的独立功能模块设计能力,增强代码编写能力。 1.3 亦可以部署为商化项目使用。 2. 技术栈 jdk8springbootvue2mysq5.7&8…

区块链与Web3.0:区块链项目的推广

数字信息时代,一场革命正在酝酿中,那就是区块链与Web3.0的结合。这种结合将会改变我们对于信息传输、存储和使用的方式,并有可能推动媒体行业向新的高度发展。这种转变不仅关系到我们如何获取和使用信息,也涉及到如何用创新的方式…

四、OSPF域间路由

注:区域(area)是以接口进行划分的 描述: R1的g0/0/1接口属于area 0 √ R1属于区域0和区域1 1.设计原则 1、OSPF区域的设计原则: 骨干区域有且只能存在一个 非骨干区域必须和骨干区域相连 多区域时&#…

VulnHub靶机 DC-9 靶机 详细渗透过程

VulnHub靶机 DC-9 打靶实战 详细渗透过程 目录 VulnHub靶机 DC-9 打靶实战 详细渗透过程一、将靶机配置导入到虚拟机当中二、渗透测试主机发现端口扫描Web渗透SQL注入登入后台文件包含SSH爆破提权 一、将靶机配置导入到虚拟机当中 靶机地址: https://www.vulnhub.…

【MHA】MySQL高可用MHA介绍1-功能,架构,优势,案例

目录 一 MHA 介绍 1 MHA功能 自动化主服务器监控和故障转移 交互式(手动启动的)主故障转移 非交互式主故障转移 在线切换主机 2 主服务器故障转移的难点 二 MHA架构 1 MHA组件 2 自定义扩展(脚本) 三 MHA优势 1 MHA可以…

锂电池SOH预测 | 基于BP神经网络的锂电池SOH预测(附matlab完整源码)

锂电池SOH预测 锂电池SOH预测完整代码锂电池SOH预测 锂电池的SOH(状态健康度)预测是一项重要的任务,它可以帮助确定电池的健康状况和剩余寿命,从而优化电池的使用和维护策略。 SOH预测可以通过多种方法实现,其中一些常用的方法包括: 容量衰减法:通过监测电池的容量衰减…

jupyter notebook设置代码自动补全

jupyter notebook设置代码自动补全 Anaconda Prompt窗口执行 pip install jupyter_contrib_nbextensionsjupyter contrib nbextensions install --userpip install jupyter_nbextensions_configuratorjupyter nbextensions_configurator enable --user按如下图片设置 卸载jed…
最新文章