RPC教程 6.负载均衡

1.负载均衡策略

假设有多个服务实例,而每个实例都提供相同的功能,为了提高整个系统的吞吐量,每个实例部署在不同的机器上。客户端可以选择任意一个实例进行调用,获取想要的结果。那如何选择呢?取决于负载均衡的策略。

  • 随机选择策略 - 从服务列表中随机选择一个。
  • 轮询算法(Round Robin) - 依次调度不同的服务器,每次调度执行 i = (i + 1) mode n。
  • 加权轮询(Weight Round Robin) - 在轮询算法的基础上,为每个服务实例设置一个权重,高性能的机器赋予更高的权重,也可以根据服务实例的当前的负载情况做动态的调整,例如考虑最近3分钟部署服务器的 CPU、内存消耗情况。

2.服务发现 

需要负载均衡那就需要有多个提供相同功能的服务实例。那服务发现是什么意思呢?现在我们有多个提供相同功能的服务实例,那客户端要获取该服务的地址,就需要服务中心返回一个地址给客戶端。这个就是服务发现。

那我们先实现一个基础的服务发现模块 Discovery(定义成interface接口类型)。

我们定义两个类型:

  • SelectMode 代表不同的负载均衡策略,简单起见,这里仅实现 Random 和 RoundRobin 两种策略。
  • Discovery 是一个接口类型,包含了服务发现所需要的最基本的接口。
    • Refresh() 从注册中心更新服务列表
    • Update(servers []string) 手动更新服务列表
    • Get(mode SelectMode) 根据负载均衡策略,选择一个服务实例
    • GetAll() 返回所有的服务实例

 代码在xclient文件夹中。

//discovery.go
type SelectMode int

const (
	RandomSelect SelectMode = iota
	RoundRobinSelect
)

type Discovery interface {
	Refresh() error
	Update(servers []string) error
	Get(mode SelectMode) (string, error)
	GetAll() ([]string, error)
}

 紧接着,我们实现一个不需要注册中心,服务列表由手工维护的服务发现的结构体:MultiServersDiscovery。

可以用编码的方式在客户端中配置服务的地址,服务器不需要进行更多的配置,如果添加或删除了某些服务,可以调用MultipleServersDiscovery.Update来动态更新服务。

客户端使用NewMultipleServersDiscovery方法设置该服务的网络和地址。

//discovery.go
type MultiServerDiscovery struct {
	rwMutex sync.RWMutex //protect following,即是保护servers,index
	servers []string
	index   int
}

//使用例子: NewMultiServerDiscovery([]string{"tcp@127.0.0.1:100","tcp@127.0.0.1:2100"})
func NewMultiServerDiscovery(servers []string) *MultiServerDiscovery {
	d := &MultiServerDiscovery{
		servers: servers,
	}

	d.index = rand.Intn(math.MaxInt32 - 1)
	return d
}
  • index 记录 Round Robin 算法已经轮询到的位置,为了避免每次从 0 开始,初始化时随机设定一个值。

 然后,实现 Discovery 接口

var _ Discovery = (*MultiServerDiscovery)(nil)

func (d *MultiServerDiscovery) Refresh() error {
	return nil
}

func (d *MultiServerDiscovery) Update(servers []string) error {
	d.rwMutex.Lock()
	defer d.rwMutex.Unlock()
	d.servers = servers
	return nil
}

func (d *MultiServerDiscovery) Get(mode SelectMode) (string, error) {
    //这里不能用d.rwMutex.RLock(),因为d.index有更新
	d.rwMutex.Lock()
	defer d.rwMutex.Unlock()
	n := len(d.servers)
	if n == 0 {
		return "", errors.New("rpc discovery: no available servers")
	}

	switch mode {
	case RandomSelect:
		return d.servers[rand.Intn(n)], nil
	case RoundRobinSelect:
		s := d.servers[d.index%n]
		d.index = (d.index + 1) % n
		return s, nil
	default:
		return "", errors.New("rpc discovery: not supported select mode")
	}
}

func (d *MultiServerDiscovery) GetAll() ([]string, error) {
	d.rwMutex.RLock()
	defer d.rwMutex.RUnlock()
    // return a copy of d.servers
	servers := make([]string, len(d.servers))
	copy(servers, d.servers)
	return servers, nil
}

3.为什么选择客户端负载均衡

RPC client 和 server 建立是长连接, 因而基于连接的负载均衡没有太大意义, 所以 该RPC 负载均衡是基于每次调用。也就是客户在同一个 client 发的请求希望它被负载均衡到所有服务端。

一般来说负载均衡器是独立的, 被放置在服务消费者和提供者之间. 代理通常需要保存请求响应副本, 因此有性能消耗也会造成额外延迟. 当请求量大时, lb (load balance)可能会变成瓶颈, 并且此时 lb 单点故障会影响整个服务。

客户端负载将lb 的功能集成到客户端进程里,然后使用负载均衡策略选择一个目标服务地址,向目标服务发起请求。LB能力被分散到每一个服务消费者的进程内部,同时服务消费方和服务提供方之间是直接调用,没有额外开销,性能比较好。

但用客户端负载均衡也有坏处, 若有多种不同的语言栈,就要配合开发多种不同的客户端,有一定的研发和维护成本。

4.支持负载均衡的客户端

之前对外使用的客户端是Dail(...),这里我们也要向用户暴露一个支持负载均衡的客户端,叫做 XClient。

//xclient.go
type XClient struct {
	d       Discovery
	mode    SelectMode
	opt     *geerpc.Option
	mutex   sync.Mutex
	clients map[string]*geerpc.Client
}

func NewXClient(d Discovery, mode SelectMode, opt *geerpc.Option) *XClient {
	return &XClient{
		d:       d,
		mode:    mode,
		opt:     opt,
		clients: make(map[string]*geerpc.Client),
	}
}

func (xc *XClient) Close() error {
	xc.mutex.Lock()
	defer xc.mutex.Unlock()
	for key, client := range xc.clients {
		//只是关闭,没有其他的对错误的处理
		client.Close()
		delete(xc.clients, key)
	}
	return nil
}

XClient 的构造函数需要传入三个参数,服务发现实例 Discovery、负载均衡模式 SelectMode 以及协议选项 Option。为了尽量地复用已经创建好的 Socket 连接,使用 clients 保存创建成功的 Client 实例,并提供 Close 方法在结束后,关闭已经建立的连接。

我们之前是使用dial函数来创建一个客户端,那我们为了可以复用已经创建好的socket连接,这里我们也实现一个dial函数,在内部复用socket连接。

func (xc *XClient) dial(rpcAddr string) (*geerpc.Client, error) {
	xc.mutex.Lock()
	defer xc.mutex.Unlock()

	client, ok := xc.clients[rpcAddr]
	if ok && !client.IsAvailable() {
		client.Close()
		delete(xc.clients, rpcAddr)
		client = nil
	}
	if client == nil {
		var err error
		client, err = geerpc.XDial(rpcAddr, xc.opt)
		if err != nil {
			return nil, err
		}
		xc.clients[rpcAddr] = client
	}
	return client, nil
}

func (xc *XClient) call(rpcAddr string, ctx context.Context, serviceMethod string, args, reply interface{}) error {
	//获取sokcet连接(复用)
	client, err := xc.dial(rpcAddr)
	if err != nil {
		return err
	}
	return client.Call(ctx, serviceMethod, args, reply)
}

// serviceMethod 例子:"Foo.SUM"
func (xc *XClient) Call(ctx context.Context, serviceMethod string, args, reply any) error {
	//通过负载均衡策略得到服务实例
	rpcAddr, err := xc.d.Get(xc.mode)
	if err != nil {
		return err
	}

	return xc.call(rpcAddr, ctx, serviceMethod, args, reply)
}

之后实现一个调用服务的方法Call。该方法主要是三步:

  1. 通过负载均衡策略得到服务实例
  2. 获取sokcet连接(复用)
  3. 最终调用client.Call去发送服务请求

另外,我们为 XClient 添加一个常用功能:Broadcast。

Broadcast 表示向所有服务器发送请求,只有所有服务器正确返回时才会成功。

Broadcast 是 XClient 的一个方法, 你可以将一个请求发送到这个服务的所有节点。
如果所有的节点都正常返回,没有错误的话, Broadcast将返回其中的一个节点的返回结果。 如果有节点返回错误的话,将返回这些错误信息中的一个。

func (xc *XClient) Broadcast(ctx context.Context, serviceMethod string, args, reply any) error {
	//获取所有的服务实例
	servers, err := xc.d.GetAll()
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	var mutex sync.Mutex //protect e and replyDone
	var e error

	replyDone := reply == nil
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	for _, rpcAddr := range servers {
		wg.Add(1)
		//fmt.Printf("rpcAddrstring addr: %p\n", &rpcAddr) //其rpcAddr的地址都是一样的
		go func(rpcAddr string) {
			defer wg.Done()
			var clonedReply any
			if reply != nil {
				//reply是指针的,所以需要使用Elem()
				clonedReply = reflect.New(reflect.ValueOf(reply).Elem().Type()).Interface()
			}
			//xc.call方法中的参数clonedReply不能使用reply
			err := xc.call(rpcAddr, ctx, serviceMethod, args, clonedReply)
			mutex.Lock()
			defer mutex.Unlock()

			if err != nil && e == nil {//e==nil表明e还没有被赋值
				e = err
				cancel() // if any call failed, cancel unfinished calls
			}
			if err == nil && !replyDone {
				reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(clonedReply).Elem())
				replyDone = true
			}
		}(rpcAddr)
	}
	wg.Wait()
	return e
}

// //另一种写法,go协程中没有参数
// func (xc *XClient) Broadcast(ctx context.Context, serviceMethod string, args, reply any) error {
//     .............
// 	for _, rpcAddr := range servers {
// 		wg.Add(1)
// 		//fmt.Printf("rpcAddrstring addr: %p\n", &rpcAddr) //其rpcAddr的地址都是一样的
//         addr:=rpcAddr
// 		go func() {
// 			defer wg.Done()
// 		err := xc.call(addr, ctx, serviceMethod, args, clonedReply)
//          ......
// 		}()
// 	}
//     ................
// }

 需要注意的几点:

  1. 为了提升性能,请求是并发的。而要等待协程去访问服务实例结束,所以可以使用sync.WaitGroup来阻塞等待。
  2. 并发情况下,xc.call中不能使用reply入参,需要每个协程都有自己的clonedReply参数,不然就需要用锁来控制reply,这就不值得了。每个协程都有自己的clonedReply,获得结果后,再把clonedReply赋值给reply就行,这样就只需要使用互斥锁保证 error 和 reply 能被正确赋值即可。
  3. 借助 context.WithCancel 确保有错误发生时,快速失败。

5.测试 

首先,启动 RPC 服务的代码还是类似的,Sum 是正常的方法,Sleep 用于验证 XClient 的超时机制能否正常运作。

type My int

type Args struct{ Num1, Num2 int }

func (m *My) Sum(args Args, reply *int) error {
	*reply = args.Num1 + args.Num2
	return nil
}

func (m *My) Sleep(args Args, reply *int) error {
	time.Sleep(time.Second * time.Duration(args.Num1))
	*reply = args.Num1 + args.Num2
	return nil
}

接着,有两个函数,clientCall调用单个服务实例,broadcast 调用所有服务实例。

这两个函数代码也是基本相似的,主要不同就是协程函数内的操作不同。

// 调用单个服务实例
func clientCall(addr1, addr2 string) {
	d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})
	xc := xclient.NewXClient(d, xclient.RandomSelect, nil)
	defer xc.Close()

	var wg sync.WaitGroup

	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			var reply int = 1324
			if err := xc.Call(context.Background(), "My.Sum", &Args{Num1: i, Num2: i * i}, &reply); err != nil {
				log.Println("call Foo.Sum error:", err)
			}
			fmt.Println("reply: ", reply)
		}(i)
	}
	wg.Wait()
}

func broadcast(addr1, addr2 string) {
	d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})
	xc := xclient.NewXClient(d, xclient.RandomSelect, nil)
	defer xc.Close()

	var wg sync.WaitGroup

	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			var reply int = 1324
			if err := xc.Broadcast(context.Background(), "My.Sum", &Args{Num1: i, Num2: i * i}, &reply); err != nil {
				fmt.Println("Broadcast call Foo.Sum error:", err)
			}
			fmt.Println("Broadcast reply: ", reply)

			ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
			defer cancel()
			var replyTimeout int = 1324
			if err := xc.Broadcast(ctx, "My.Sleep", &Args{Num1: i, Num2: i * i}, &replyTimeout); err != nil {
				fmt.Println("Broadcast call Foo.Sum error:", err)
			}
			fmt.Println("timeout Broadcast reply: ", replyTimeout)

		}(i)
	}
	wg.Wait()
}

func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)

	//start two servers
	go startServer(ch1)
	go startServer(ch2)

	addr1 := <-ch1
	addr2 := <-ch2
	time.Sleep(time.Second)
	clientCall(addr1, addr2)
	broadcast(addr1, addr2)
}

效果

 

完整代码:https://github.com/liwook/Go-projects/tree/main/geerpc/6-load-balance

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/353698.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【WPF.NET开发】WPF中的双向功能

本文内容 FlowDirectionFlowDocumentSpan 元素非文本元素的 FlowDirection数字替换 与其他任何开发平台不同&#xff0c;WPF 具有许多支持双向内容快速开发的功能&#xff0c;例如&#xff0c;同一文档中混合了从左到右和从右到左的数据。 同时&#xff0c;WPF 也为需要双向功…

JVM基础知识汇总篇

☆* o(≧▽≦)o *☆嗨~我是小奥&#x1f379; &#x1f4c4;&#x1f4c4;&#x1f4c4;个人博客&#xff1a;小奥的博客 &#x1f4c4;&#x1f4c4;&#x1f4c4;CSDN&#xff1a;个人CSDN &#x1f4d9;&#x1f4d9;&#x1f4d9;Github&#xff1a;传送门 &#x1f4c5;&a…

企业计算机中了360后缀勒索病毒怎么办,360后缀勒索病毒解密流程

企业计算机服务器在生产运营过程中发挥着巨大作用&#xff0c;为企业带来极大便利&#xff0c;存储着企业的重要核心数据&#xff0c;但同时也成为众多勒索病毒攻击的目标。近期&#xff0c;云天数据恢复中心接到很多企业的求助&#xff0c;企业的计算机服务器遭到了360后缀勒索…

多文件开发

当所有的类都写在main.m这个源文件之中、将不利于后期的维护和团队开发 推荐的方式 把1个类写在1个模块之中&#xff0c;而1个模块至少包含两个文件 h头文件 1.写的类声明因为要用到Foundation框架中的类NS0 bject所以在这个头文件中要引入 Foundationa 2.框架的头文件 3.然后…

qemu搭建arm64 linux kernel环境

一、环境准备 ubuntu 22.04 内核源码&#xff1a;linux-6.6.1 &#xff08;直接上最新版&#xff09; 下载链接&#xff1a;The Linux Kernel Archives 交叉编译工具链&#xff1a; sudo apt-get install gcc-12-aarch64-linux-gnu 具体能用的版本gcc-XX-arch64-linux-gnu…

python_ACM模式《剑指offer刷题》链表2

题目&#xff1a; 例如 面试tips&#xff1a; 询问有无时间复杂度或空间复杂度的限制。 思路&#xff1a; 本题的本质就是复杂链表的深拷贝 1. 暴力解法 → 第一次遍历原链表时构建一个复制了next的新链表&#xff0c;第二次遍历原链表&#xff0c;对每个原链表的节点的ran…

幻兽帕鲁服务器多少钱?4核16G支持32人在线吗?

4核16G服务器是幻兽帕鲁Palworld推荐的配置&#xff0c;阿里云和腾讯云均推出针对幻兽帕鲁的4核16G服务器&#xff0c;阿里云4核16G幻兽帕鲁专属服务器32元1个月、66元3个月&#xff0c;腾讯云4核16G14M服务器66元1个月、277元3个月、1584元一年。云服务器吧yunfuwuqiba.com分享…

一、MongoDB、express的安装和基本使用

数据库【Sqlite3、MongoDB、Mysql】简介&小记 Sqlite3&#xff1a; SQLite3是一个轻量级的数据库系统&#xff0c;它被设计成嵌入式数据库。这意味着它是一个包含在应用程序中的数据库&#xff0c;而不是独立运行的系统服务。适用场景&#xff1a;如小型工具、游戏、本地…

node.js Redis SETNX命令实现分布式锁解决超卖/定时任务重复执行问题

Redis SETNX 命令背后的原理探究 当然&#xff0c;让我们通过一个简单的例子&#xff0c;使用 Redis CLI&#xff08;命令行界面&#xff09;来模拟获取锁和释放锁的过程。 在此示例中 获取锁: # 首先&#xff0c;设置锁密钥的唯一值和过期时间(秒) 127.0.0.1:6379> SET …

ChatGPT 官方中文页面上线

根据页面显示&#xff0c;OpenAI 现已推出 ChatGPT 的多语言功能 Alpha 版测试&#xff0c;允许用户选择不同语言的界面进行交互。 如下图所示&#xff0c;ChatGPT 会检测系统当前所使用的语言&#xff0c;并提示用户进行语言切换。 用户也可通过设置页面选择其他语言。目前&a…

企业转型:虚拟化对云计算的影响

虚拟化被认为是IT行业最优秀的技术之一。虚拟化提供的灵活性和效率&#xff0c;有助于企业根据不断变化的需求扩展其IT基础设施。虚拟化是云基础设施的基础&#xff0c;允许按需动态分配和管理计算资源。这种适应性对于满足现代企业的多样化需求至关重要&#xff0c;因为现代企…

深度学习之处理多维特征的输入

我们首先来看一个糖尿病的数据集&#xff1a; 在数据集中&#xff0c;我们称每一行叫做sample&#xff0c;表示一个样本&#xff0c;称每一列是feature&#xff0c;也就是特征在数据库里面这就是一个关系表&#xff0c;每一行叫做记录&#xff0c;每一列叫做字段。 每一个样本都…

山西电力市场日前价格预测【2024-01-29】

日前价格预测 预测说明&#xff1a; 如上图所示&#xff0c;预测明日&#xff08;2024-01-29&#xff09;山西电力市场全天平均日前电价为279.99元/MWh。其中&#xff0c;最高日前电价为397.38元/MWh&#xff0c;预计出现在07:45。最低日前电价为0.00元/MWh&#xff0c;预计出…

【计算机专业学习委员必备自动化催作业通知】

文章目录 前言一、前期准备zfile部署mysql服务搭建 二、编写python脚本python代码 三、总结 前言 大家好&#xff01;我是一名计算机专业的菜鸟&#xff0c;作为这个专业的学习委员&#xff0c;我觉得收电子版作业是一件非常麻烦的事情&#xff0c;作业实验科目也比较多&#…

RLHF学习

整体流程 三个步骤分解&#xff1a; 预训练一个语言模型 (LM) &#xff1b;聚合问答数据并训练一个奖励模型 (Reward Model&#xff0c;RM) &#xff1b;用强化学习 (RL) 方式微调 LM。 RW RM 的训练是 RLHF 区别于旧范式的开端。这一模型接收一系列文本并返回一个标量奖励&…

探索Pyecharts关系图绘制技巧:炫酷效果与创意呈现【第42篇—python:Pyecharts水球图】

文章目录 Pyecharts绘制多种炫酷关系网图引言准备工作代码实战1. 基本关系网图2. 自定义节点样式和边样式3. 关系网图的层级结构4. 添加标签和工具提示5. 动态关系网图6. 高级关系网图 - Les Miserables 示例7. 自定义关系网图布局8. 添加背景图9. 3D 关系网图10. 热力关系网图…

使用PCL进行法向量可视化

使用PCL进行法向量可视化 文章目录 1、使用PCL进行法向量可视化2、计算所有点的法线并显示3、计算一个子集的法线 1、使用PCL进行法向量可视化 #include <iostream> #include <pcl/io/pcd_io.h> #include <pcl/visualization/pcl_visualizer.h> #include &l…

Qt使用中文字符串乱码的问题

文章目录 vs编译器下第一种解决方式第二种解决方式 Qt编译器下 我们在使用qt的时候有时候会遇到打印中文字符串的时候出现中文乱码的问题&#xff0c;主要是由于Qt的QString字符串存储的方式是使用utf-8的编码方式&#xff0c;如果我们本地的文件是使用GBK方式的编码再使用中文…

DAY09_SpringBoot—整合SpringMVCSpringMVC参数取值用法

目录 1 SpringMVC1.1 SpringMVC框架介绍1.2 SpringMVC入门案例1.2.1 创建项目1.2.2 添加依赖项1.2.3 检查pom.xml文件1.2.4 编辑YML配置文件1.2.5 在templates中添加index.html文件1.2.6 默认页面跳转机制 1.3 RequestMapping注解测试1.3.1 编辑HelloController1.3.2 页面请求效…

【计算机网络】深入掌握计算机网络的核心要点(面试专用)

写在前面 前言四层模型网络地址管理Linux下设置ipARP请求包总结 前言 计算机网络是指将分散的计算机设备通过通信线路连接起来&#xff0c;形成一个统一的网络。为了使得各个计算机之间能够相互通信&#xff0c;需要遵循一定的协议和规范。OSI参考模型和TCP/IP参考模型是计算机…
最新文章