【Spring Cloud Alibaba】1.4 Nacos服务注册流程和原理解析

文章目录

    • 1.前言
    • 2. 服务注册的基本流程
    • 3. 服务注册的核心代码分析
      • 3.1. `NacosNamingService`
        • NamingProxy 服务端通信的核心类
        • NamingClientProxy nacos 2.x 版本服务端通信核心接口
      • 3.2 NamingGrpcClientProxy 详解
        • RpcClient类
          • RpcClient类核心方法 `start`
      • 3.3 NamingHttpClientProxy详解
        • 方法 callServer
    • 4. 心跳机制
      • nacos v1.x 版本
      • Nacos 2.x 版本

1.前言

本章我们来聊聊nacos 服务注册流程和原理解析。在微服务架构中,服务之间需要相互通信,因此服务如何发现彼此变得非常重要。这就需要使用到服务注册中心,在 Spring Cloud Alibaba 中,Nacos 就充当了这个角色。

Nacos 提供了服务发现、服务配置和服务元数据等服务治理功能。作为服务注册中心,Nacos 可以实现服务的自动注册、发现和健康检查等功能。

本章, 将深入剖析 Nacos 的服务注册流程和实现原理 将从 Nacos 服务启动注册开始,逐步解析其后续的心跳维持、服务发现等过程,以及在出现故障时 Nacos 如何进行处理。

在这里插入图片描述

2. 服务注册的基本流程

Nacos服务注册流程大致如下:

  1. 服务提供者启动: 当服务提供者启动后,会向Nacos Server发送一个注册请求,请求中包含了服务的基本信息,例如服务名称,服务地址,服务端口等。

  2. Nacos Server接收请求: Nacos Server接收到注册请求后,会对服务信息进行处理。处理包括存储服务信息,检查服务是否已存在等。

  3. 服务提供者心跳检测: 服务提供者在注册成功后,会定期向Nacos Server发送心跳,以证明自己还在运行。如果Nacos Server在一定时间内没有收到服务提供者的心跳,会认为该服务已经不可用,并将其从注册列表中移除。

  4. 服务消费者查询: 服务消费者需要调用某个服务时,会向Nacos Server查询该服务的信息。Nacos Server会返回存储的服务提供者的信息给服务消费者。

  5. 服务消费者调用: 服务消费者根据Nacos Server返回的服务信息,选择一个合适的服务提供者进行调用。

Nacos服务注册流程包括服务提供者的注册,心跳检测,以及服务消费者的查询和调用。这个过程通过Nacos Server的管理,实现了服务的高可用和负载均衡。
在这里插入图片描述

3. 服务注册的核心代码分析

3.1. NacosNamingService

该类实现了NamingService接口,提供了服务注册、注销、查询等方法。
NacosNamingService是Nacos中负责服务注册与发现的核心组件, NacosNamingService的一些主要功能和相关的源码解析:

  1. 服务注册:registerInstance(String serviceName, String groupName, Instance instance)方法允许服务提供者将自己的实例注册到指定的服务名下。
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    Instance instanceForServer = new Instance();
    instanceForServer.setIp(instance.getIp());
    instanceForServer.setPort(instance.getPort());
    instanceForServer.setWeight(instance.getWeight());
    instanceForServer.setMetadata(instance.getMetadata());
    instanceForServer.setClusterName(instance.getClusterName());
    instanceForServer.setServiceName(serviceName);
    instanceForServer.setEnabled(instance.isEnabled());
    instanceForServer.setHealthy(instance.isHealthy());
    instanceForServer.setEphemeral(instance.isEphemeral());
    serverProxy.registerService(serverName, groupName, instanceForServer);
}
  1. 服务发现:selectInstances(String serviceName, String groupName, boolean healthy)方法允许服务消费者发现指定服务名下的所有实例。
public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {
    return serverProxy.queryInstancesOfService(serviceName, groupName, 0, healthy);
}
  1. 服务下线:deregisterInstance(String serviceName, String groupName, Instance instance)方法允许服务提供者将自己的实例从指定的服务名下注销。
public void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    serverProxy.deregisterService(serviceName, instance);
}
  1. 获取服务列表:getServicesOfServer(int pageNo, int pageSize, String groupName)方法允许获取指定分页和组名下的服务列表。
public ListView<String> getServicesOfServer(int pageNo, int pageSize, String groupName) throws NacosException {
    return serverProxy.getServiceList(pageNo, pageSize, groupName);
}
NamingProxy 服务端通信的核心类

所有这些方法的实现都依赖于serverProxy对象,这个对象是NamingProxy类的实例,它包含了与Nacos服务器进行RPC通信的逻辑。在NacosNamingService进行服务注册或发现的时候,实际上是通过NamingProxy将请求发送到Nacos服务器,然后接收并处理服务器的响应。
在这里插入图片描述

我之前在1.x版本中看源码的时候 与服务端通信的代理工具类命名为 NamingProxy serverProxy;
但是在新版本中已经变为NamingClientProxy clientProxy;
在这里插入图片描述

这个变化可能在大家阅读源码的时候会注意到,从表面看是命名好像变了,大部分方法的实现都是一样,其实底层是发生了很多变化。

在早期的版本中,Nacos使用NamingProxy ServerProxy这个对象来处理与Nacos服务器的交互。然而,这种方式在处理复杂交互逻辑时,例如服务实例的注册、注销和查询等操作,可能会变得复杂并且容易出错

Nacos的开发者引入了NamingClientProxy接口,来封装与Nacos服务器交互的逻辑。通过实现这个接口,可以创建多个代理,每个代理处理一种特定的交互方式,例如HTTP或者gRPC。这样做的好处是可以将交互逻辑的实现细节封装在代理之中,使得对外提供的API变得更加清晰和简单。
我们通过源码可以看到
在这里插入图片描述
NamingProxyNamingClientProxy都是Nacos在不同时期设计的与服务端通信的对象,用于处理与Nacos服务器的交互。

  1. 设计上的差异:NamingProxy是一个具体的类,用于处理与Nacos服务器之间的HTTP交互。而NamingClientProxy是一个接口,定义了所有与Nacos服务器交互的对象应该具备的方法。通过实现这个接口,可以创建多个代理来处理不同类型的交互,例如HTTP和gRPC。

  2. 使用上的差异:在早期版本的Nacos中,NamingProxy负责所有的与Nacos服务器的交互。在后续版本中,这一角色被NamingClientProxy接口和它的实现类取代。这意味着,新的交互方式可以通过添加新的NamingClientProxy实现来支持,而不需要修改NamingProxy的代码,可以以多态的形式进行扩展,抛弃了最早单一的类实现。

这个变化使得Nacos的代码更加具有可维护性和扩展性。

此外,使用NamingClientProxy还有一个好处是,可以更容易的添加新的交互方式。只需要实现新的NamingClientProxy,就可以处理新的交互方式,而不需要修改现有的代码。

NamingClientProxy nacos 2.x 版本服务端通信核心接口

服务注册和发现默认使用gRPC协议进行通信, 使用NamingGrpcClientProxy
在这里插入图片描述
NamingClientProxy 有两个实现类

NamingGrpcClientProxyNamingHttpClientProxy都是NamingClientProxy接口的实现类,用于定义具体的和Nacos Server进行交互的方法。不同的是,他们使用的通信协议不同。

  1. NamingGrpcClientProxy:这个类使用gRPC协议与Nacos服务器进行交互。gRPC是一个高性能、开源和通用的RPC框架,设计初衷是实现微服务之间的通信。gRPC协议在大规模服务间的通信、低延迟的场景下可能会有更好的性能。在Nacos 2.2.0版本开始,服务注册和发现默认使用gRPC协议进行通信。

  2. NamingHttpClientProxy:这个类使用HTTP协议与Nacos服务器进行交互。HTTP协议是Internet上应用最为广泛的一种网络协议,更加通用。在某些情况下,例如网络环境限制只能使用HTTP协议时,可以选择NamingHttpClientProxy

这两个实现类均定义了服务注册、服务注销、服务发现、服务列表查询等方法,只是底层通信协议的差异。在Nacos的客户端创建时,会根据配置选择使用哪个实现类,然后通过这个代理类与Nacos Server进行交互。

根据一位同学的测试,发现 使用NamingHttpClientProxy 频繁注册和注销和心跳 会导致服务端的内存短时间暴增,大家注意避雷,能升版本就升版本。详细issue 可以参考高频心跳导致nacos server内存持续高位且心跳停止内存未回收 #11424

场景
使用nacos 2.2.3版本
注册100个服务 使用api 发送心跳 nacos server内存持续增长,且心跳停止以后,内存未回收

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

3.2 NamingGrpcClientProxy 详解

在新版本中,Nacos 默认使用 gRPC 作为通信协议替换了 HTTP。下面是几个主要原因:

  1. 性能:相比 HTTP/1.1gRPC 基于 HTTP/2,支持多路复用请求优先级双向流等特性,能够支持更高的并发连接,而且延迟更低。
  2. 语言无关:gRPC 支持多种语言,包括 Java、C++、Python、Go、Node.js、Ruby等,方便在不同语言的环境中使用。
  3. 强类型:gRPC 使用 Protobuf 作为序列化工具,定义服务接口和消息类型,生成强类型的代码,避免手动处理 JSON
  4. 易于扩展:支持拦截器和自定义元数据,方便进行认证、负载均衡、日志记录等操作。
  5. 流控制:gRPC 内置了流控制机制,可以有效地控制消息的发送速率,防止因流量过大而导致的服务故障。

因此,新版本的Nacos采用gRPC作为通信协议,可以提供更高的性能,更好的兼容性和扩展性,同时也可以带来更好的开发体验。

源码解析
requestToServer方法是NamingGrpcClientProxy 类中的核心方法,负责将请求发送到Nacos服务器,并得到返回的响应。

requestToServer将请求的安全性相关的头信息添加到请求中,然后根据是否设置了请求超时时间,决定使用哪种方式发送请求。接着,检查服务器返回的响应是否成功,如果成功,则检查响应的类型是否和期望的响应类匹配,如果匹配,则返回响应,如果不匹配,则记录错误日志。在请求过程中如果发生异常。

private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass)
        throws NacosException {
    try {
        // 添加安全相关的头信息到请求中
        request.putAllHeader(
                getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
        // 根据是否设置了请求超时时间,来决定使用哪种方式来发送请求
        Response response =
                requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
        // 检查服务器返回的响应是否成功
        if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
            throw new NacosException(response.getErrorCode(), response.getMessage());
        }
        // 检查服务器返回的响应类型是否和期望的响应类匹配
        if (responseClass.isAssignableFrom(response.getClass())) {
            return (T) response;
        }
        // 如果不匹配,记录错误日志
        NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'",
                response.getClass().getName(), responseClass.getName());
    } catch (NacosException e) {
        throw e;
    } catch (Exception e) {
    
        throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
    }
    // 如果服务器返回的响应无效,抛出NacosException
    throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
}

getSecurityHeaders 主要是获取请求头信息,如果我们设置了账号密码鉴权,发送的所有请求都需要携带token。如果我们跟源码会发现。核心的是SecurityProxy ,主要用于处理Nacos客户端与服务端之间的安全认证相关的逻辑。

在Nacos中,为了保障系统的安全,通常会进行一些安全认证的操作,比如登录认证、权限认证等。这些安全认证的操作通常涉及到一些复杂的逻辑,比如密码的加密、Token的生成和验证等。

SecurityProxy类就是用来封装这些安全认证相关的逻辑的。比如在Nacos客户端启动时,SecurityProxy会负责进行登录操作,向Nacos服务端发送用户名和密码,获取登录后的Token;在发送请求到Nacos服务端时,SecurityProxy会将Token添加到请求的Header中,用于服务端验证请求的合法性。

此外,SecurityProxy还封装了一些其他的功能,比如定时刷新Token,获取当前的安全信息等。通过SecurityProxy,可以使Nacos的其他模块不需要关心具体的安全认证逻辑,只需要通过SecurityProxy提供的接口进行操作即可,从而降低了系统的复杂性。

public class SecurityProxy implements Closeable {
    
    private ClientAuthPluginManager clientAuthPluginManager;
    
   
    public SecurityProxy(List<String> serverList, NacosRestTemplate nacosRestTemplate) {
        clientAuthPluginManager = new ClientAuthPluginManager();
        clientAuthPluginManager.init(serverList, nacosRestTemplate);
    }
  
    public void login(Properties properties) {
        if (clientAuthPluginManager.getAuthServiceSpiImplSet().isEmpty()) {
            return;
        }
        for (ClientAuthService clientAuthService : clientAuthPluginManager.getAuthServiceSpiImplSet()) {
            clientAuthService.login(properties);
        }
    }
 
    public Map<String, String> getIdentityContext(RequestResource resource) {
        Map<String, String> header = new HashMap<>(1);
        for (ClientAuthService clientAuthService : clientAuthPluginManager.getAuthServiceSpiImplSet()) {
            LoginIdentityContext loginIdentityContext = clientAuthService.getLoginIdentityContext(resource);
            for (String key : loginIdentityContext.getAllKey()) {
                header.put(key, loginIdentityContext.getParameter(key));
            }
        }
        return header;
    }
    
    @Override
    public void shutdown() throws NacosException {
        clientAuthPluginManager.shutdown();
    }
}

我们还看到一个核心类RpcClient
在这里插入图片描述

RpcClient类

RpcClient是远程客户端的抽象类,用于连接到服务器并发送请求。它具有启动、关闭、发送请求等功能,并提供了用于处理连接事件和重连的线程池。它还可以注册连接事件监听器和服务器请求处理器。

  1. start(): 启动客户端,并连接到服务器。 会创建一个ScheduledExecutorService,用于处理连接事件和重连事件。然后,尝试连接服务器,如果连接成功,则设置当前连接为新连接,并将客户端状态设置为RUNNING;如果连接失败,则调用switchServerAsync()进行切换服务器。
  1. shutdown(): 关闭客户端。该方法将客户端状态设置为SHUTDOWN,并关闭客户端的事件执行器和当前连接。

  2. isWaitInitiated(): 检查客户端是否等待初始化。

  3. isRunning(): 检查客户端是否正在运行。

  4. isShutdown(): 检查客户端是否已关闭。

  5. request(): 发送同步请求到服务器并返回响应。

  6. asyncRequest(): 发送异步请求到服务器。

  7. requestFuture(): 发送异步请求到服务器,并返回一个RequestFuture对象,可以用于获取响应。

  8. connectToServer(): 连接到指定的服务器。

  9. handleServerRequest(): 处理从服务器接收到的请求。

  10. registerConnectionListener(): 注册连接事件监听器。

  11. registerServerRequestHandler(): 注册服务器请求处理器。

  12. nextRpcServer(): 获取下一个RPC服务器。

  13. currentRpcServer(): 获取当前RPC服务器。

  14. resolveServerInfo(): 解析服务器地址。

  15. getConnectionType(): 获取客户端的连接类型。

  16. rpcPortOffset(): 获取RPC端口的偏移量。

  17. getLabels(): 获取客户端的标签。

RpcClient类核心方法 start

详细代码略了,有兴趣可以直接看源码。此处只是简要说明一下流程和设计

  1. rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING); 这段是使用CAS(Compare And Swap)操作来改变客户端的状态,从初始化状态到启动状态,这是一种保证线程安全的方式。
  2. 创建线程池clientEventExecutor,用于处理客户端的任务。
  3. 提交一个任务到线程池,这个任务是一个无限循环,主要任务是监听事件队列,当接收到连接事件时,会根据连接的状态进行通知。
  4. 提交另一个任务到线程池,这个任务也是一个无限循环,主要任务是处理重连的上下文,如果没有重连上下文,那么就进行健康检查,如果检查失败,那么就设置客户端的状态为UNHEALTHY,并创建一个新的重连上下文。
  5. 尝试与服务器建立连接,如果连接失败,那么就进行重试。如果所有的重试都失败,那么就异步切换服务器。如果连接成功,那么就设置当前的连接,并改变客户端状态为RUNNING,并通知事件队列已经连接。
  6. 注册处理连接重置请求的处理器。
  7. 注册处理客户端检测请求的处理器。

start这个方法设计思想主要体现在两个方面

  1. 一是采用了状态机模式来管理RPC客户端的状态,通过CAS操作保证状态的线程安全性
  2. 二是通过线程池和阻塞队列来处理事件和进行重连操作,以实现事件驱动,使代码逻辑更清晰,同时提高代码的可维护性和可扩展性
public final void start() throws NacosException {
    // 使用CAS操作确保start()只执行一次
    boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
    if (!success) {
        return;
    }

    // 创建一个定时线程池,用于处理客户端事件和重连
    clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
        Thread t = new Thread(r);
        t.setName("com.alibaba.nacos.client.remote.worker");
        t.setDaemon(true);
        return t;
    });

    // 提交一个任务到线程池,这个任务一直监听事件队列,当发现连接事件时,会根据连接的状态进行相应的通知
    clientEventExecutor.submit(() -> {
        while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
            ConnectionEvent take;
            try {
                take = eventLinkedBlockingQueue.take();
                if (take.isConnected()) {
                    notifyConnected();
                } else if (take.isDisConnected()) {
                    notifyDisConnected();
                }
            } catch (Throwable e) {
                
            }
        }
    });

    // 提交另一个任务到线程池,这个任务主要进行客户端的重连
    clientEventExecutor.submit(() -> {
        while (true) {
            try {
                if (isShutdown()) {
                    break;
                }
                ReconnectContext reconnectContext = reconnectionSignal
                        .poll(rpcClientConfig.connectionKeepAlive(), TimeUnit.MILLISECONDS);
                if (reconnectContext == null) {
                    // check alive time.
                    // 如果获取不到重连上下文,那么就进行健康检查,如果检查失败,那么就设置客户端的状态为UNHEALTHY,并创建一个新的重连上下文
                    // 如果健康检查通过,那么更新最后活跃时间戳
                }

                if (reconnectContext.serverInfo != null) {
                    // clear recommend server if server is not in server list.
                    // 如果获取到重连上下文,那么就进行重连。在重连前会检查推荐的服务器是否在服务器列表中,如果不在就忽略
                }
                reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
            } catch (Throwable throwable) {
                // 忽略异常
            }
        }
    });

    // 尝试连接到服务器,如果连接失败会尝试重试,所有重试失败后异步切换服务器,连接成功后改变客户端状态为RUNNING
    // 偿试连接到服务器, 并设置启动状态
    int startUpRetryTimes = rpcClientConfig.retryTimes();
    while (startUpRetryTimes > 0 && connectToServer == null) {
        // 详细代码略了,有兴趣可以直接看源码。此处只是简要说明一下流程和设计读取下一个服务信息并尝试连接,如果失败就打印日志并重试
    }

    // 如果连接成功,设置当前连接为成功的连接,并改变状态为RUNNING,并通知事件队列已经连接
    // 否则,异步切换服务器
    if (connectToServer != null) {
        this.currentConnection = connectToServer;
        rpcClientStatus.set(RpcClientStatus.RUNNING);
        eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
    } else {
        switchServerAsync();
    }

    // 注册处理连接重置请求的处理器和客户端检测请求的处理器
    registerServerRequestHandler(new ConnectResetRequestHandler());
    registerServerRequestHandler(request -> {
        if (request instanceof ClientDetectionRequest) {
            return new ClientDetectionResponse();
        }
        return null;
    });
}

 

3.3 NamingHttpClientProxy详解

NamingHttpClientProxy 是Nacos服务发现功能中的一个重要组件,主要负责和Nacos服务器的HTTP交互,包括注册服务、注销服务、查询服务和更新服务等操作。 但是由于其有很多弊端在 新版本中已经默认弃用。默认使用grpc。

NamingHttpClientProxy 使用 HTTP 协议进行通信,虽然 HTTP 是一种简单、通用的协议,但在一些场景下, 存在以下弊端:

  1. 性能问题:HTTP 基于文本,数据传输效率较低;而且每次请求都需要建立连接,对服务器的资源消耗大,当并发量大时,性能可能会下降。
  2. 长连接问题:HTTP 本身不支持长连接,需要通过一些手段(如 Keep-Alive、Websocket 等)来实现,但这样会增加复杂性。
  3. 缺乏实时性:HTTP 是一种请求-响应模型,客户端主动发送请求,服务器才会响应,不适合需要实时更新的场景。 对于需要高并发、低延迟、实时更新、强类型和元数据支持的场景,

NamingHttpClientProxy 已不是最佳选择。所以在新版本中,Nacos 使用 gRPC 替代了 HTTP,以解决这些问题。

下面是NamingHttpClientProxy 类的部分源码:

可以看到,NamingHttpClientProxy主要使用了HTTP协议进行通信,通过发送不同的HTTP请求(例如GET、POST和DELETE)实现不同的操作。同时,它还负责将HTTP响应的JSON字符串解析为Java对象,方便后续的处理。

需要注意的是,这个类并不负责维护和Nacos服务器的连接,而只是发送HTTP请求。连接的维护由其他部分的代码负责。

public class NamingHttpClientProxy {
    ...
    public void registerService(String serviceName, Instance instance) throws NacosException {
        Map<String, String> params = ...; // 构建请求参数
        // 向Nacos服务器发送注册服务的请求
        reqAPI(UtilAndComs.namingServletPath + "/instance", headers, params, "POST");
    }
    ...
    public void deregisterService(String serviceName, Instance instance) throws NacosException {
        Map<String, String> params = ...; // 构建请求参数
        // 向Nacos服务器发送注销服务的请求
        reqAPI(UtilAndComs.namingServletPath + "/instance", headers, params, "DELETE");
    }
    ...
    public ServiceInfo queryInstancesOfService(String serviceName, ...) throws NacosException {
        Map<String, String> params = ...; // 构建请求参数
        // 向Nacos服务器发送查询服务的请求
        String result = reqAPI(UtilAndComs.namingServletPath + "/instance/list", headers, params);
        // 解析返回的JSON字符串为ServiceInfo对象
        return JacksonUtils.toObj(result, ServiceInfo.class);
    }
    ...
}
方法 callServer
 public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer, String method) throws NacosException {
    long start = System.currentTimeMillis(); // 记录请求开始的时间
    long end = 0;
    String namespace = params.get(CommonParams.NAMESPACE_ID); // 从参数中获取命名空间
    String group = params.get(CommonParams.GROUP_NAME); // 从参数中获取分组名
    String serviceName = params.get(CommonParams.SERVICE_NAME); // 从参数中获取服务名
    params.putAll(getSecurityHeaders(namespace, group, serviceName)); // 添加安全头
    Header header = NamingHttpUtil.builderHeader();  
    
    // 构建请求 URL
    String url;
    if (curServer.startsWith(HTTPS_PREFIX) || curServer.startsWith(HTTP_PREFIX)) {
        url = curServer + api;
    } else {
        if (!InternetAddressUtil.containsPort(curServer)) {
            curServer = curServer + InternetAddressUtil.IP_PORT_SPLITER + serverPort;
        }
        url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
    }
    
    try {
        // 发送 HTTP 请求,并获取响应结果
        HttpRestResult<String> restResult = nacosRestTemplate
                .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
        end = System.currentTimeMillis();  
        
        // 记录请求的指标
        MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
                .observe(end - start);
        
        // 根据响应结果处理并返回结果
        if (restResult.ok()) {
            return restResult.getData();
        }
        if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
            return StringUtils.EMPTY;
        }
        throw new NacosException(restResult.getCode(), restResult.getMessage());
    } catch (NacosException e) {
        NAMING_LOGGER.error("[NA] failed to request", e);
        throw e;
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to request", e);
        throw new NacosException(NacosException.SERVER_ERROR, e);
    }
}

    

4. 心跳机制

服务提供者通过定期发送心跳请求来告知注册中心自身的健康状态。注册中心根据心跳信息判断服务提供者的可用性,并及时更新注册表。

nacos v1.x 版本

使用http 请求发送心跳导致很多问题。具体源码在BeatReactor

在 Nacos 1.x 中,Nacos 客户端周期性地向 Nacos 服务器发送心跳,以保持服务实例的存活状态。这个心跳机制主要通过 com.alibaba.nacos.client.naming.beat.BeatReactor 类实现。

当 Nacos 客户端注册服务实例时,BeatReactor 会为该服务实例创建一个心跳任务,并将这个任务加入到定时任务队列中。这个任务会周期性地向 Nacos 服务器发送心跳。

具体的代码如下:

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    SERVICE_BEAT_MAP.putIfAbsent(serviceName, new ConcurrentHashMap<>());
    SERVICE_BEAT_MAP.get(serviceName).put(beatInfo.getIp() + "#" + beatInfo.getPort(), beatInfo);
    beatInfo.setPeriod(beatInfo.getPeriod() > 0 ? beatInfo.getPeriod() : DEFAULT_HEART_BEAT_INTERVAL);
    executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
}

BeatTask 是一个 Runnable,它的 run 方法会发送心跳:

@Override
public void run() {
    long nextTime = beatInfo.getPeriod();
    try {
        int code = namingProxy.sendBeat(beatInfo, false);
        long interval = switchDomain.getBeatInterval();
        if (code == HttpURLConnection.HTTP_OK) {
            ...
        } else if (code == HttpURLConnection.HTTP_NOT_MODIFIED) {
            ...
        } else if (code == HttpURLConnection.HTTP_FORBIDDEN) {
            ...
        }
        nextTime = interval;
        beatInfo.setPeriod(nextTime);
    } catch (Exception e) {
        LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}", beatInfo, e);
    } finally {
        executorService.schedule(this, nextTime, TimeUnit.MILLISECONDS);
    }
}

可以看到,在 BeatTaskrun 方法中,首先会调用 namingProxy.sendBeat 方法发送心跳,然后根据发送结果决定下一次心跳的间隔时间,最后调度下一次的心跳任务。

 

/**
 * @author harold
 */
public class BeatReactor {

    private ScheduledExecutorService executorService;

    private NamingProxy serverProxy;

    public final Map<String, BeatInfo> dom2Beat = new ConcurrentHashMap<String, BeatInfo>();

    public BeatReactor(NamingProxy serverProxy) {
        this(serverProxy, UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);
    }

    public BeatReactor(NamingProxy serverProxy, int threadCount) {
        this.serverProxy = serverProxy;

        executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.beat.sender");
                return thread;
            }
        });
    }

    public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        dom2Beat.put(buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo);
        executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
    }

    public void removeBeatInfo(String serviceName, String ip, int port) {
        NAMING_LOGGER.info("[BEAT] removing beat: {}:{}:{} from beat map.", serviceName, ip, port);
        BeatInfo beatInfo = dom2Beat.remove(buildKey(serviceName, ip, port));
        if (beatInfo == null) {
            return;
        }
        beatInfo.setStopped(true);
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
    }

    private String buildKey(String serviceName, String ip, int port) {
        return serviceName + Constants.NAMING_INSTANCE_ID_SPLITTER
            + ip + Constants.NAMING_INSTANCE_ID_SPLITTER + port;
    }

    class BeatTask implements Runnable {

        BeatInfo beatInfo;

        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }

        @Override
        public void run() {
            if (beatInfo.isStopped()) {
                return;
            }
            long result = serverProxy.sendBeat(beatInfo);
            long nextTime = result > 0 ? result : beatInfo.getPeriod();
            executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
        }
    }
}

Nacos 2.x 版本

在 Nacos 2.x 版本中,心跳机制的实现发生了一些改变。在最新的 2.x 版本中,Nacos 客户端会与服务器建立 gRPC 长连接,通过这个连接,服务器可以实时感知到客户端的运行状态,无需客户端周期性地发送心跳。

这种新的设计方式减少了网络请求的数量,提高了系统的性能和稳定性。所以在 Nacos 2.x 的源码中可能找不到心跳相关的代码,因为不再需要客户端主动发送心跳。

并不意味着心跳机制在 Nacos 2.x 中完全消失。实际上,Nacos 服务器仍然会定期检测与每个客户端的 gRPC 长连接的状态,以确定客户端的存活状态。这种被动的“心跳”检测方式,与传统的心跳机制在效果上是一致的。

RpcClient 中的健康检查,是向服务端请求一个HealthCheckRequest对象空数据,也可以把此作为心跳类型标识。
在这里插入图片描述
实际的消息体
在这里插入图片描述
实际的请求体为

metadata {
  type: "HealthCheckRequest"
  clientIp: "172.24.36.36"
}
body {
  value: "{\"headers\":{},\"module\":\"internal\"}"
}

·healthCheck 方法·

    private boolean healthCheck() {
        HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
        if (this.currentConnection == null) {
            return false;
        }
        int reTryTimes = rpcClientConfig.healthCheckRetryTimes();
        while (reTryTimes >= 0) {
            reTryTimes--;
            try {
                Response response = this.currentConnection
                        .request(healthCheckRequest, rpcClientConfig.healthCheckTimeOut());
                // not only check server is ok, also check connection is register.
                return response != null && response.isSuccess();
            } catch (NacosException e) {
                // ignore
            }
        }
        return false;
    }

此处就设计的比较巧妙了,但是这样心跳的间隔目前被硬编码到代码里了。心跳间隔为 rpcClientConfig.connectionKeepAlive() .硬编码在DefaultGrpcClientConfig 里面 默认为5000毫秒。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/216244.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

电子书制作神器!错过等十年

众所周知&#xff0c;随着科技的飞速发展&#xff0c;电子书已成为越来越多人的首选阅读方式。但制作电子书并不费力&#xff0c;一个制作电子书的神器就能解决这些问题。 那这款神器究竟有何魅力&#xff1f;它能帮助我们制作出怎样的电子书&#xff1f; 首先&#xff0c;这款…

PyQt6 QFontComboBox字体组合框控件

​锋哥原创的PyQt6视频教程&#xff1a; 2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~共计35条视频&#xff0c;包括&#xff1a;2024版 PyQt6 Python桌面开发 视频教程(无废话…

Spring 声明式事务

Spring 声明式事务 1.Spring 事务管理概述1.1 事务管理的重要性1.2 Spring事务管理的两种方式1.2.1 编程式事务管理1.2.2 声明式事务管理 1.3 为什么选择声明式事务管理 2. 声明式事务管理2.1 基本用法2.2 常用属性2.2.1 propagation&#xff08;传播行为&#xff09;2.2.2 iso…

Python代码编译并生成Docker镜像

Python代码编译并生成Docker镜像 前言 实际python项目交付时往往有针对关键代码进行保护的需求&#xff0c;本文介绍了一种简单可行的方案&#xff1a;1. 在Linux系统上先将 .py 文件编译为 .so 文件&#xff0c;2. 将整个项目打包成Docker镜像&#xff08;解决 .so 文件的环…

业务场景中Hive解析Json常用案例

业务场景中Hive解析Json常用案例 json在线工具 json格式转换在线工具 https://tool.lu/json/format格式互转&#xff1a; // 格式化可以合并整行显示 {"name":"John Doe","age":35,"email":"johnexample.com"}// 格式化…

二进制动态插桩工具intel PIN的学习笔记

前言 最近两周为了课程汇报学习了intel PIN这个动态插桩&#xff08;dynamic instrument&#xff09;工具&#xff0c;总体的学习感受还是挺累的。一方面&#xff0c;这个方向比较小众&#xff0c;相关的二手资料比较少&#xff0c;能参考的也就只有官方手册这种一手资料&…

分类预测 | Matlab实现NGO-KELM北方苍鹰算法优化核极限学习机分类预测

分类预测 | Matlab实现NGO-KELM北方苍鹰算法优化核极限学习机分类预测 目录 分类预测 | Matlab实现NGO-KELM北方苍鹰算法优化核极限学习机分类预测分类效果基本描述程序设计参考资料 分类效果 基本描述 1.Matlab实现NGO-KELM北方苍鹰算法优化核极限学习机分类预测&#xff08;完…

深入学习锁--Lock各种使用方法

一、什么是Lock Lock是一个接口,通常所说的可重入锁是指Lock的一个实现子类ReentrantLock 二、Lock实现步骤&#xff1a; ①创建锁对象Lock lock new ReentrantLock(); ②加锁lock.lock(); ③释放锁lock.unlock(); import java.util.concurrent.locks.Lock; import java.util…

智安网络|语音识别技术:从历史到现状与未来展望

语音识别技术是一种将语音信号转化为可识别的文本或命令的技术&#xff0c;近年来得到了广泛应用和关注。 一. 语音识别的发展现状 1.历史发展 语音识别技术的起源可以追溯到20世纪50年代&#xff0c;但直到近年来取得了显著的突破和进展。随着计算机性能的提升和深度学习算法…

nn.AdaptiveAvgPool2d(output_size)输入和输出怎么回事?

前言 nn.AdaptiveAvgPool2d(output_size) 函数作用&#xff1a;自适应进行平均池化。不用管输入、stride、padding&#xff0c;函数参数只有输出大小&#xff0c;其他的这个函数帮你搞定。 问题就是&#xff0c;我想知道他是咋搞定的&#xff1f; 1 函数的使用 先把例子摆上…

认识Docker

Docker 是世界领先的软件容器平台&#xff0c;所以想要搞懂 Docker 的概念我们必须先从容器开始说起。 1.1 什么是容器? 先来看看容器较为官方 一句话概括容器&#xff1a;容器就是将软件打包成标准化单元&#xff0c;以用于开发、交付和部署。 容器镜像是轻量的、可执行的…

openEuler 20.03 (LTS-SP2) aarch64 cephadm 部署ceph18.2.0【1】离线部署 准备基础环境

准备3台虚拟机服务器(均可访问公网) 10.2.1.176 &#xff08;作为操作机&#xff09; 10.2.1.191 10.2.1.219 安装基础工具 yum install -y vim 配置hosts 编辑/etc/hosts&#xff0c;添加 10.2.1.176 ceph-176 10.2.1.191 ceph-191 10.2.1.219 ceph-219 配置免密登录…

Nginx 简单入门操作

前言:之前的文章有些过就不罗嗦了。 Nginx 基础内容 是什么? Nginx 是一个轻量级的 HTTP 服务器,采用事件驱动、异步非阻塞处理方式的服务器,它具有极好的 IO 性能,常用于 HTTP服务器(包含动静分离)、正向代理、反向代理、负载均衡 等等. Nginx 和 Node.js 在很多方…

harmonyOS学习笔记之stateStyles

stateStyles:多态样式 stateStyles可以依据组件的内部状态的不同,设置不同的样式 stateStyles是属性方法,可以根据状态来设置样式,类似于css伪类,但是语法不一样,ArkUI提供了四种状态: focused:获焦态 normal:正常态 pressed:按压态 disable:不可用态例如: Entry Component …

【云原生系列】Kubernetes知识点

目录 概念 基础架构 单master节点 多master节点 组件 Master节点核心组件 其他组件 请求发送流程 插件 核心资源 调度资源 Pod 创建pod组件间调用流程 pod生命周期&#xff1a; 初始化容器 镜像拉取策略 重启策略 钩子函数 探针 探针的实现方式 DownwardAP…

文档保密不漏泄:上海迅软DSE提升企业效率的文档管理利器大揭秘!

你是否面临这些问题&#xff1f;公司的电子文档繁多且分散&#xff0c;无法统一管理&#xff1b;员工粗心误删重要文件&#xff0c;导致文件无法找回&#xff1b;硬盘驱动损坏&#xff0c;数据丢失无法恢复&#xff1b;办公电脑丢失&#xff0c;数据不但找不回来&#xff0c;重…

idea连接mysql详细讲解

IDEA连接mysql又报错&#xff01;Server returns invalid timezone. Go to Advanced tab and set serverTimezone prope 前进的道路充满荆棘。 错误界面 IDEA连接mysql&#xff0c;地址&#xff0c;用户名&#xff0c;密码&#xff0c;数据库名&#xff0c;全都配置好了&…

gitlab高级功能之mirroring - push mirroring(一)

今天给大家介绍一个gitlab很高级也是非常有用的功能 - gitlab的mirroring&#xff0c;你可以将仓库镜像到外部或从外部镜像仓库过来&#xff0c;从而可以实现分支、标签和提交的自动同步。 文章目录 1. mirroring的实现方式2. push mirroring2.1 简介2.2 说明 3. 配置推送镜像3…

15、pytest的fixture调用fixture

官方实例 # content of test_append.py import pytest# Arrange pytest.fixture def first_entry():return "a"# Arrange pytest.fixture def order(first_entry):return [first_entry]def test_string(order):# Actorder.append("b")# Assertassert orde…

车间工人个人简历15篇

在车间工人职位的求职过程中想脱颖而出&#xff0c;获得心仪的工作机会&#xff0c;参考这15篇精选的车间工人简历案例&#xff0c;无论您是初学者还是经验丰富的工人&#xff0c;这些专业简历将为您提供灵感和实用建议。让您的简历吸引HR的关注&#xff0c;轻松斩获心仪职位。…