Nacos源码解读07——集群数据同步

Distro协议背景

Distro 协议是 Nacos 社区自研的⼀种 AP 分布式协议,是面向临时实例设计的⼀种分布式协议,
其保证了在某些 Nacos 节点宕机后,整个临时实例处理系统依旧可以正常工作。作为⼀种有状态
的中间件应用的内嵌协议,Distro 保证了各个 Nacos 节点对于海量注册请求的统⼀协调和存储。

设计思想

1.Nacos 每个节点是平等的都可以处理写请求,同时把新数据同步到其他节点。
2. 每个节点只负责部分数据,定时发送自己负责数据的校验值到其他节点来保持数据⼀致性。
3. 每个节点独立处理读请求,及时从本地发出响应。

原理

数据初始化

新加入的 Distro 节点会进行全量数据拉取。具体操作是轮询所有的 Distro 节点,通过向其他的机
器发送请求拉取全量数据。
在这里插入图片描述
在全量拉取操作完成之后,Nacos 的每台机器上都维护了当前的所有注册上来的非持久化实例数
据。

数据校验

在 Distro 集群启动之后,各台机器之间会定期的发送心跳。心跳信息主要为各个机器上的所有数据
的元信息(之所以使用元信息,是因为需要保证网络中数据传输的量级维持在⼀个较低水平)。这
种数据校验会以心跳的形式进行,即每台机器在固定时间间隔会向其他机器发起⼀次数据校验请求。
在这里插入图片描述
⼀旦在数据校验过程中,某台机器发现其他机器上的数据与本地数据不⼀致,则会发起⼀次全量拉
取请求,将数据补齐。

写操作

对于⼀个已经启动完成的 Distro 集群,在⼀次客户端发起写操作的流程中,当注册非持久化的实例
的写请求打到某台 Nacos 服务器时,Distro 集群处理的流程图如下
在这里插入图片描述
整个步骤包括几个部分(图中从上到下顺序):
 前置的 Filter 拦截请求,并根据请求中包含的 IP 和 port 信息计算其所属的 Distro 责任节点,
并将该请求转发到所属的 Distro 责任节点上。
 责任节点上的 Controller 将写请求进行解析。
 Distro 协议定期执行 Sync 任务,将本机所负责的所有的实例信息同步到其他节点上。

读操作

由于每台机器上都存放了全量数据,因此在每⼀次读操作中,Distro 机器会直接从本地拉取数据。
快速响应。
在这里插入图片描述
这种机制保证了 Distro 协议可以作为⼀种 AP 协议,对于读操作都进行及时的响应。在网络分区
的情况下,对于所有的读操作也能够正常返回;当网络恢复时,各个 Distro 节点会把各数据分片的
数据进行合并恢复。

Distro总结

Distro 协议是 Nacos 对于临时实例数据开发的⼀致性协议。其数据存储在缓存中,并且会在启动
时进行全量数据同步,并定期进行数据校验
在 Distro 协议的设计思想下,每个 Distro 节点都可以接收到读写请求。所有的 Distro 协议的请
求场景主要分为三种情况:

  1. 当该节点接收到属于该节点负责的实例的写请求时,直接写入。
  2. 当该节点接收到不属于该节点负责的实例的写请求时,将在集群内部路由,转发给对应的节点,
    从而完成读写。
  3. 当该节点接收到任何读请求时,都直接在本机查询并返回(因为所有实例都被同步到了每台机
    器上)。
    Distro 协议作为 Nacos 的内嵌临时实例⼀致性协议,保证了在分布式环境下每个节点上面的服务
    信息的状态都能够及时地通知其他节点,可以维持数十万量级服务实例的存储和⼀致性。

Distro寻址

什么是Distro寻址

寻址是指当检测到Naocs集群节点变化时能后及时更新节点信息

寻址模式

Nacos支持两种寻址模式分别为「文件寻址」和「地址服务器寻址」
文件寻址

默认为文件寻址,可以通过参数「nacos.core.member.lookup.type」设置取值为「file」或者「address-server」

文件寻址路径默认为 「${user.home}/nacos/conf/cluster.conf」

文件寻址cluster.conf配置文件的内容格式为「ip1:port,ip2:port」

地址寻址

地址服务器寻址默认为:http://jmenv.tbsite.net:8080/serverlist;其中域名、端口、url均可自定义

检测到集群节点变更时会更新缓存并发布MembersChangeEvent事件

为防止新节点没有初始化好,当检测到新节点加入时先设置该节点状态为DOWN,该节点不参与通信

过几秒通过节点之间通信将已初始化的新节点状态由DOWN设置为UP,该节点正式参与通信

Distro启动

DistroClientComponentRegistry和DistroHttpRegistry会在
Bean构造完成之后会执行doRegister方法进行Distro的注册

    // DistroClientComponentRegistry
    @PostConstruct
    public void doRegister() {
        // 创建distro处理器
        DistroClientDataProcessor dataProcessor = new DistroClientDataProcessor(clientManager, distroProtocol,
                upgradeJudgement);
        // 创建distro传输代理对象
        DistroTransportAgent transportAgent = new DistroClientTransportAgent(clusterRpcClientProxy,
                serverMemberManager);
        // 创建失败处理器,该处理器主要是添加失败重试任务
        DistroClientTaskFailedHandler taskFailedHandler = new DistroClientTaskFailedHandler(taskEngineHolder);
        // 注册Nacos:Naming:v2:ClientData类型数据的数据仓库实现
        componentHolder.registerDataStorage(DistroClientDataProcessor.TYPE, dataProcessor);
        // 注册Nacos:Naming:v2:ClientData类型的DistroData数据处理器
        componentHolder.registerDataProcessor(dataProcessor);
        // 注册Nacos:Naming:v2:ClientData类型数据的数据传输代理对象实现
        componentHolder.registerTransportAgent(DistroClientDataProcessor.TYPE, transportAgent);
        // 注册Nacos:Naming:v2:ClientData类型的失败任务处理器
        componentHolder.registerFailedTaskHandler(DistroClientDataProcessor.TYPE, taskFailedHandler);
    }
 
 
    // DistroHttpRegistry
    @PostConstruct
    public void doRegister() {
        // 创建distro数据存储对象
        componentHolder.registerDataStorage(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
                new DistroDataStorageImpl(dataStore, distroMapper));
        // 创建distro传输代理对象http
        componentHolder.registerTransportAgent(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, new DistroHttpAgent(memberManager));
        // 创建失败处理器,该处理器主要是添加失败重试任务
        componentHolder.registerFailedTaskHandler(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
                new DistroHttpCombinedKeyTaskFailedHandler(taskEngineHolder));
        // 添加distro的http延时任务处理器
        taskEngineHolder.registerNacosTaskProcessor(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
                new DistroHttpDelayTaskProcessor(globalConfig, taskEngineHolder));
        // 注册数据处理器
        componentHolder.registerDataProcessor(consistencyService);
    }

DistroTaskEngineHolder

Distro的任务执行引擎他去做Distro的执行
这个类中有一个延时执行引擎和一个立即执行引擎,在创建类时会给延时执行引擎设置一个默认的任务处理器。DistroHttpRegistry中会注册一个任务处理器,则v1版本的话不会使用默认的任务处理器。

@Component
public class DistroTaskEngineHolder {
    
    private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine();
    
    private final DistroExecuteTaskExecuteEngine executeWorkersManager = new DistroExecuteTaskExecuteEngine();
    
    public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
        // 延时任务引擎设置默认任务处理器
        DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
        delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
    }
    
    public DistroDelayTaskExecuteEngine getDelayTaskExecuteEngine() {
        return delayTaskExecuteEngine;
    }
    
    public DistroExecuteTaskExecuteEngine getExecuteWorkersManager() {
        return executeWorkersManager;
    }
    public void registerNacosTaskProcessor(Object key, NacosTaskProcessor nacosTaskProcessor) {
        this.delayTaskExecuteEngine.addProcessor(key, nacosTaskProcessor);
    }
}

DistroProtocol 加了@Component注解所以在bean实例化的时候会调用构造方法

    public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
            DistroTaskEngineHolder distroTaskEngineHolder) {
        this.memberManager = memberManager;
        this.distroComponentHolder = distroComponentHolder;
        this.distroTaskEngineHolder = distroTaskEngineHolder;
        //启动Distro任务
        startDistroTask();
    }
    private void startDistroTask() {
      // 判断是否是单机模式,如果是单机模式,不需要进行同步 直接设置初始化完成
        if (EnvUtil.getStandaloneMode()) {
            isInitialized = true;
            return;
        }
        // 开启验证任务
        startVerifyTask();
        // 开始加载全量拉取distro数据快照
        startLoadTask();
    }

Distro 定时校验同步节点数据

DistroProtocol

    private void startVerifyTask() {
        //开启一个定时线程池 定时五秒执行执行DistroVerifyTimedTask的run方法 去做校验同步
        GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTimedTask(memberManager, distroComponentHolder,
                        distroTaskEngineHolder.getExecuteWorkersManager()),
                DistroConfig.getInstance().getVerifyIntervalMillis());
    }

DistroVerifyTimedTask

    @Override
    public void run() {
        try {
          //获取集群其他节点
            List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("server list is: {}", targetServer);
            }
            //遍历数据类型  在Nacos server启动时初始化时两种类型HTTP和gRPC
            for (String each : distroComponentHolder.getDataStorageTypes()) {
                // 对dataStorage内的数据进行验证
                verifyForDataStorage(each, targetServer);
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
        }
    }
private void verifyForDataStorage(String type, List<Member> targetServer) {
        // 根据类型拿到对应的数据存储器  这里以GRPC为例
        DistroDataStorage dataStorage = distroComponentHolder.findDataStorage(type);
        // 判断该存储器有没有初始化完成(在启动时会全量拉取数据,拉去完则会设置FinishInitial为true)
        if (!dataStorage.isFinishInitial()) {
            Loggers.DISTRO.warn("data storage {} has not finished initial step, do not send verify data",
                    dataStorage.getClass().getSimpleName());
            return;
        }
        // 拿到要校验的distro数据集合
        List<DistroData> verifyData = dataStorage.getVerifyData();
        if (null == verifyData || verifyData.isEmpty()) {
            return;
        }
        for (Member member : targetServer) {
            // 获取到对应的传输代理http or rpc
            DistroTransportAgent agent = distroComponentHolder.findTransportAgent(type);
            if (null == agent) {
                continue;
            }
            // 获取到distro传输代理对象,依次添加任务到立即执行引擎
            executeTaskExecuteEngine.addTask(member.getAddress() + type,
                    new DistroVerifyExecuteTask(agent, verifyData, member.getAddress(), type));
        }
    }
@Override
public List<DistroData> getVerifyData() {
    List<DistroData> result = new LinkedList<>(); // 一组DistroData
    for (String each : clientManager.allClientId()) {
        Client client = clientManager.getClient(each);
        if (null == client || !client.isEphemeral()) { // 无效client或者非临时节点
            continue;
        }
        // 判断client是否为本几点负责的逻辑为ClientManagerDelegate#isResponsibleClient。即:属于ConnectionBasedClient并且isNative为true表示该client是直连到该节点的。
        if (clientManager.isResponsibleClient(client)) {
            // 构造Verify Data 主要信息为clientId,还有一个版本信息作为保留字段,目前都是0。
            DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(), 0);
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            DistroData data = new DistroData(distroKey,
                    ApplicationUtils.getBean(Serializer.class).serialize(verifyData)); // 序列化校验数据
            data.setType(DataOperation.VERIFY);
            result.add(data);
        }
    }
    return result;
}

DistroVerifyExecuteTask

    @Override
    public void run() {
        //遍历要校验的distro数据集合
        for (DistroData each : verifyData) {
            try {
                 // // 判断是否支持回调传输数据  grpc支持回调
                if (transportAgent.supportCallbackTransport()) {
                 // 发送验证数据带回调
                    doSyncVerifyDataWithCallback(each);
                } else {
                 // 发送验证数据
                    doSyncVerifyData(each);
                }
            } catch (Exception e) {
                Loggers.DISTRO
                        .error("[DISTRO-FAILED] verify data for type {} to {} failed.", resourceType, targetServer, e);
            }
        }
    }
    private void doSyncVerifyData(DistroData data) {
        transportAgent.syncVerifyData(data, targetServer);
    }
    @Override
    public boolean syncVerifyData(DistroData verifyData, String targetServer) {
        //判断是否存在目标服务器
        if (isNoExistTarget(targetServer)) {
            return true;
        }
        // replace target server as self server so that can callback.
        verifyData.getDistroKey().setTargetServer(memberManager.getSelf().getAddress());
        //创建请求
        DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
        //获取要发送请求的成员
        Member member = memberManager.find(targetServer);
        //节点状态检查需UP状态,即:可通信状态
        if (checkTargetServerStatusUnhealthy(member)) {
            Loggers.DISTRO.warn("[DISTRO] Cancel distro verify caused by target server {} unhealthy", targetServer);
            return false;
        }
        try {
            //发送请求
            Response response = clusterRpcClientProxy.sendRequest(member, request);
            return checkResponse(response);
        } catch (NacosException e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] Verify distro data failed! ", e);
        }
        return false;
    }

发送RPC请求

    public Response sendRequest(Member member, Request request, long timeoutMills) throws NacosException {
        RpcClient client = RpcClientFactory.getClient(memberClientKey(member));
        if (client != null) {
            return client.request(request, timeoutMills);
        } else {
            throw new NacosException(CLIENT_INVALID_PARAM, "No rpc client related to member: " + member);
        }
    }

客户端处理校验请求

DistroDataRequestHandler

    @Override
    public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
        try {
            switch (request.getDataOperation()) {
                //校验请求 
                case VERIFY:
                    return handleVerify(request.getDistroData(), meta);
                 ......
                default:
                    return new DistroDataResponse();
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
            DistroDataResponse result = new DistroDataResponse();
            result.setErrorCode(ResponseCode.FAIL.getCode());
            result.setMessage("handle distro request with exception");
            return result;
        }
    }
    private DistroDataResponse handleVerify(DistroData distroData, RequestMeta meta) {
        DistroDataResponse result = new DistroDataResponse();
        //如果缓存存在client则校验成功,刷新client保鲜时间,否则校验失败。
        if (!distroProtocol.onVerify(distroData, meta.getClientIp())) {
            result.setErrorInfo(ResponseCode.FAIL.getCode(), "[DISTRO-FAILED] distro data verify failed");
        }
        return result;
    }

public boolean onVerify(DistroData distroData, String sourceAddress) {
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO] Receive verify data type: {}, key: {}", distroData.getType(),
                    distroData.getDistroKey());
        }
        String resourceType = distroData.getDistroKey().getResourceType();
        DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
        if (null == dataProcessor) {
            Loggers.DISTRO.warn("[DISTRO] Can't find verify data process for received data {}", resourceType);
            return false;
        }
        // 使用DistroData处理器来处理校验请求
        return dataProcessor.processVerifyData(distroData, sourceAddress);
    }

DistroClientDataProcessor

    @Override
    public boolean processVerifyData(DistroData distroData, String sourceAddress) {
        //获取Distro客户端校验信息
        DistroClientVerifyInfo verifyData = ApplicationUtils.getBean(Serializer.class)
                .deserialize(distroData.getContent(), DistroClientVerifyInfo.class);
         //根据clientId来验证不同的客户端       
        if (clientManager.verifyClient(verifyData.getClientId())) {
            return true;
        }
        Loggers.DISTRO.info("client {} is invalid, get new client from {}", verifyData.getClientId(), sourceAddress);
        return false;
    }

ConnectionBasedClientManager

    @Override
    public boolean verifyClient(String clientId) {
        ConnectionBasedClient client = clients.get(clientId);
         // 如果已经存在,则更新最近一次有效连接时间
        if (null != client) {
            client.setLastRenewTime();
            return true;
        }
        //如果不存在返回false
        return false;
    }

Distro定时校验总结

节点之间发送校验数据是在全量同步后进行的;发送校验的频率默认为5秒钟一次;校验数据包括clientId和version,其中version为保留字段当前为0;接受到校验数据后如果缓存中存在该client表示校验成功,同时更新保鲜时间,否则校验失败

节点刚加入全量拉取数据快照

启动拉取任务

    private void startLoadTask() {
        DistroCallback loadCallback = new DistroCallback() {
            @Override
            public void onSuccess() {
                isInitialized = true;
            }
            
            @Override
            public void onFailed(Throwable throwable) {
                isInitialized = false;
            }
        };
        GlobalExecutor.submitLoadDataTask(
                new DistroLoadDataTask(memberManager, distroComponentHolder, DistroConfig.getInstance(), loadCallback));
    }

DistroLoadDataTask

数据初始化读取任务

    @Override
    public void run() {
        try {
         //从集群中其他节点全量加载数据
            load();
            //如果没有加载成功延迟30秒钟重新执行一次,可以通过参数「nacos.core.protocol.distro.data.load_retry_delay_ms」指定
            if (!checkCompleted()) {
                GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
            } else {
                loadCallback.onSuccess();
                Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
            }
        } catch (Exception e) {
            loadCallback.onFailed(e);
            Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
        }
    }

从其他节点拉取全量数据

    private void load() throws Exception {
        while (memberManager.allMembersWithoutSelf().isEmpty()) {
            Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
            TimeUnit.SECONDS.sleep(1);
        }
        while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
            Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
            TimeUnit.SECONDS.sleep(1);
        }
        //不同的数据类型缓存快照,此处有gRPC和http两类数据类型
        for (String each : distroComponentHolder.getDataStorageTypes()) {
            if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
                loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
            }
        }
    }
private boolean loadAllDataSnapshotFromRemote(String resourceType) {
    DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == transportAgent || null == dataProcessor) {
        return false;
    }
    //获取集群中除了本节点的其他节点,循环重试获取快照,直到有成功节点返回快照,成功后设置状态状态完成初始化
    for (Member each : memberManager.allMembersWithoutSelf()) { 
        try {
            //获取集群其他节点数据
           DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
           //处理数据同步到本地
            boolean result = dataProcessor.processSnapshot(distroData);
            if (result) {
                distroComponentHolder.findDataStorage(resourceType).finishInitial(); // 设置为完成初始化
                return true;
            }
        } catch (Exception e) {
           
        }
    }
    return false;
}

发起读取数据快照请求

@Override
public DistroData getDatumSnapshot(String targetServer) {
    Member member = memberManager.find(targetServer);
    if (checkTargetServerStatusUnhealthy(member)) {
        throw new DistroException(
                String.format("[DISTRO] Cancel get snapshot caused by target server %s unhealthy", targetServer));
    }
    DistroDataRequest request = new DistroDataRequest();
   // 设置请求操作为SNAPSHOT
    request.setDataOperation(DataOperation.SNAPSHOT); 
    try {
       // 发起请求快照数据
        Response response = clusterRpcClientProxy.sendRequest(member, request);
        if (checkResponse(response)) {
            return ((DistroDataResponse) response).getDistroData();
        } else {
            throw new DistroException(
                    String.format("[DISTRO-FAILED] Get snapshot request to %s failed, code: %d, message: %s",
                            targetServer, response.getErrorCode(), response.getMessage()));
        }
    } catch (NacosException e) {
        throw new DistroException("[DISTRO-FAILED] Get distro snapshot failed! ", e);
    }
}

服务端接收读取快照请求

DistroDataRequestHandler

    @Override
    public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
        try {
            switch (request.getDataOperation()) {
                   ......
                case SNAPSHOT:
                    return handleSnapshot();
                   ......
                default:
                    return new DistroDataResponse();
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
            DistroDataResponse result = new DistroDataResponse();
            result.setErrorCode(ResponseCode.FAIL.getCode());
            result.setMessage("handle distro request with exception");
            return result;
        }
    }
    private DistroDataResponse handleSnapshot() {
        DistroDataResponse result = new DistroDataResponse();
        //读取快照数据
        DistroData distroData = distroProtocol.onSnapshot(DistroClientDataProcessor.TYPE);
        result.setDistroData(distroData);
        return result;
    }
@Override
public DistroData getDatumSnapshot() {
    List<ClientSyncData> datum = new LinkedList<>();
    // 把本节点的所有client数据全部封装
    for (String each : clientManager.allClientId()) {
       // 获取对应客户端
        Client client = clientManager.getClient(each);
        if (null == client || !client.isEphemeral()) {
            continue;
        }
        // 针对客户端生成数据
        datum.add(client.generateSyncData());
    }
     // 封装响应数据
    ClientSyncDatumSnapshot snapshot = new ClientSyncDatumSnapshot();
    snapshot.setClientSyncDataList(datum);
    // 将数据序列化成json字符串 再转成字节数据
    byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(snapshot); 
    // 返回响应数据
    return new DistroData(new DistroKey(DataOperation.SNAPSHOT.name(), TYPE), data);
}

生成服务数据

client数据信息,命名空间、分组名称、服务名称、节点Instance信息(IP、端口等等)。

public ClientSyncData generateSyncData() {
    List<String> namespaces = new LinkedList<>();
    List<String> groupNames = new LinkedList<>();
    List<String> serviceNames = new LinkedList<>();
    List<InstancePublishInfo> instances = new LinkedList<>();
    for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
        namespaces.add(entry.getKey().getNamespace());
        groupNames.add(entry.getKey().getGroup());
        serviceNames.add(entry.getKey().getName());
        instances.add(entry.getValue());
    }
    return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances);
}

总结

集群中每个节点都拥有所有的快照数据;在节点启动时会从集群中其他节点中的一个节点同步快照数据并缓存在Map中;缓存的数据类型分类两类分别为HTTP和gRPC;具体数据即客户端注册节点信息含命名空间、分组名称、服务名称、节点Instance信息等。

增量数据同步

当某台机器发现其他机器上的数据与本地数据不⼀致 会触发一次

校验数据失败处理

    @Override
    public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) {
        if (isNoExistTarget(targetServer)) {
            callback.onSuccess();
            return;
        }
        DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
        Member member = memberManager.find(targetServer);
        try {
            //注意这里有一个DistroVerifyCallbackWrapper的回调类当调用结束之后会调用到onResponse方法
            DistroVerifyCallbackWrapper wrapper = new DistroVerifyCallbackWrapper(targetServer,
                    verifyData.getDistroKey().getResourceKey(), callback, member);
            clusterRpcClientProxy.asyncRequest(member, request, wrapper);
        } catch (NacosException nacosException) {
            callback.onFailed(nacosException);
        }
    }
    @Override
        public void onResponse(Response response) {
            
            if (checkResponse(response)) {
                NamingTpsMonitor.distroVerifySuccess(member.getAddress(), member.getIp());
                distroCallback.onSuccess();
            } else {
             //校验失败处理
                Loggers.DISTRO.info("Target {} verify client {} failed, sync new client", targetServer, clientId);
                //发送一个校验失败的事件 
                NotifyCenter.publishEvent(new ClientEvent.ClientVerifyFailedEvent(clientId, targetServer));
                //更新为失败 
                NamingTpsMonitor.distroVerifyFail(member.getAddress(), member.getIp());
                distroCallback.onFailed(null);
            }
        }

DistroClientDataProcessor 订阅了
DistroClientDataProcessor会监听到ClientVerifyFailedEvent事件

    @Override
    public void onEvent(Event event) {
        // 必须是集群
        if (EnvUtil.getStandaloneMode()) {
            return;
        }
        //必须是grpc请求
        if (!upgradeJudgement.isUseGrpcFeatures()) {
            return;
        }
        //校验失败事件
        if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
            syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
        } else {
        //同步事件
            syncToAllServer((ClientEvent) event);
        }
    }

校验失败任务客户端处理

private void syncToVerifyFailedServer(ClientEvent.ClientVerifyFailedEvent event) {
        // 判断客户端是否存在
        Client client = clientManager.getClient(event.getClientId());
        if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
            return;
        }
        // 如果存在,则开始进行同步
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        // 这里也是发送的 action为add的事件 注意后续的处理流程 他们最后和syncToAllServer方法一样最后都由DistroDelayTaskProcessor处理
        distroProtocol.syncToTarget(distroKey, DataOperation.ADD, event.getTargetServer(), 0L);

public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                targetServer);
        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
        // 添加到延时任务执行引擎来执行     
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
        }
    }

同步集群的其他节点

private void syncToAllServer(ClientEvent event) {
    Client client = event.getClient();
    // Only ephemeral data sync by Distro, persist client should sync by raft.
    if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
        return;
    }
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
       // 当客户端断开连接事件ClientDisconnectEvent时,向其他节点同步DELETE操作
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.DELETE);
    } else if (event instanceof ClientEvent.ClientChangedEvent) {
       // 当客户端变更事件ClientChangedEvent时,向其他节点同步CHANGE操作
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.CHANGE);
    }
}
    public void sync(DistroKey distroKey, DataOperation action, long delay) {
        //遍历所有需要同步的节点
        for (Member each : memberManager.allMembersWithoutSelf()) {
            //执行数据增量更新
            syncToTarget(distroKey, action, each.getAddress(), delay);
        }
    }
    /**
     * Start to sync to target server.
     *
     * @param distroKey    distro key of sync data
     * @param action       the action of data operation
     * @param targetServer target server
     * @param delay        delay time for sync
     */
    public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                targetServer);
        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
        //添加任务让执行器去做执行
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
        }
    }

后续看任务队列执行同步数据操作

任务队列执行同步数据操作

DistroDelayTaskProcessor会去执行处理延时任务 因为之前发的Action是ADD所以会走ADD的逻辑

    @Override
    public boolean process(NacosTask task) {
         // 不处理非延迟任务
        if (!(task instanceof DistroDelayTask)) {
            return true;
        }
        DistroDelayTask distroDelayTask = (DistroDelayTask) task;
        DistroKey distroKey = distroDelayTask.getDistroKey();
        switch (distroDelayTask.getAction()) {
            case DELETE:
                DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
                distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
                return true;
            case CHANGE:
            case ADD:
            // 添加任务到立即执行任务引擎     
                DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
                distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
                return true;
            default:
                return false;
        }
    }

DistroSyncChangeTask继承了AbstractDistroExecuteTask而AbstractDistroExecuteTask又继承AbstractExecuteTask ,AbstractExecuteTask 内容实现了Runnable所以会执行AbstractDistroExecuteTask的run方法

    @Override
    public void run() {
        String type = getDistroKey().getResourceType();
        DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);
        if (null == transportAgent) {
            Loggers.DISTRO.warn("No found transport agent for type [{}]", type);
            return;
        }
        Loggers.DISTRO.info("[DISTRO-START] {}", toString());
         grpc支持回调
        if (transportAgent.supportCallbackTransport()) {
            doExecuteWithCallback(new DistroExecuteCallback());
        } else {
            executeDistroTask();
        }
    }
    @Override
    protected void doExecuteWithCallback(DistroCallback callback) {
        String type = getDistroKey().getResourceType();
        //获取DistroData 
        DistroData distroData = getDistroData(type);
        if (null == distroData) {
            Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
            return;
        }
        //执行同步数据
        getDistroComponentHolder().findTransportAgent(type)
                .syncData(distroData, getDistroKey().getTargetServer(), callback);
    }

DistroClientTransportAgent

@Override
public boolean syncData(DistroData data, String targetServer) {
    if (isNoExistTarget(targetServer)) {
        return true;
    }
    // 构造请求数据并设置数据类型
    DistroDataRequest request = new DistroDataRequest(data, data.getType());
    // 查找目标节点缓存数据
    Member member = memberManager.find(targetServer);
    // 节点状态检查需UP状态,即:可通信状态
    if (checkTargetServerStatusUnhealthy(member)) {
        Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy", targetServer);
        return false;
    }
    try {
        // 向目标节点发送数据
        Response response = clusterRpcClientProxy.sendRequest(member, request);
        return checkResponse(response);
    } catch (NacosException e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! ", e);
    }
    return false;
}

客户端节点处理增量同步请求

DistroDataRequestHandler ADD、CHANGE、DELETE都是由handleSyncData方法进行处理

    @Override
    public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
        try {
            switch (request.getDataOperation()) {
                ......
                case ADD:
                case CHANGE:
                case DELETE:
                    return handleSyncData(request.getDistroData());
                ...... 
                default:
                    return new DistroDataResponse();
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
            DistroDataResponse result = new DistroDataResponse();
            result.setErrorCode(ResponseCode.FAIL.getCode());
            result.setMessage("handle distro request with exception");
            return result;
        }
    }
private DistroDataResponse handleSyncData(DistroData distroData) {
    DistroDataResponse result = new DistroDataResponse();
    if (!distroProtocol.onReceive(distroData)) {
        result.setErrorCode(ResponseCode.FAIL.getCode());
        result.setMessage("[DISTRO-FAILED] distro data handle failed");
    }
    return result;
}
    public boolean onReceive(DistroData distroData) {
        Loggers.DISTRO.info("[DISTRO] Receive distro data type: {}, key: {}", distroData.getType(),
                distroData.getDistroKey());
        String resourceType = distroData.getDistroKey().getResourceType();
        DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
        if (null == dataProcessor) {
            Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
            return false;
        }
        //同步信息
        return dataProcessor.processData(distroData);
    }

DistroClientDataProcessor

    @Override
    public boolean processData(DistroData distroData) {
        switch (distroData.getType()) {
            case ADD:
            case CHANGE:
                ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
                        .deserialize(distroData.getContent(), ClientSyncData.class);
                //将同步过来的Client信息进行缓存        
                handlerClientSyncData(clientSyncData);
                return true;
                //响应删除操作,从clients缓存中移除。
            case DELETE:
                String deleteClientId = distroData.getDistroKey().getResourceKey();
                Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
                clientManager.clientDisconnected(deleteClientId);
                return true;
            default:
                return false;
        }
    }

移除信息

@Override
public boolean clientDisconnected(String clientId) {
    Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
    ConnectionBasedClient client = clients.remove(clientId);
    if (null == client) {
        return true;
    }
    client.release();
    NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
    return true;
}

同步信息

    private void handlerClientSyncData(ClientSyncData clientSyncData) {
        Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
        //Client信息缓存 需要的是从其他节点通过过来的Client信息,ConnectionBasedClient属性isNative为false表示该连接时从其他节点同步过来的;true表示该连接客户端直接连接的。
        clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
        //获取Client信息
        Client client = clientManager.getClient(clientSyncData.getClientId());
        //更新Client的Service以及Instance信息
        upgradeClient(client, clientSyncData);
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/228248.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【软件推荐】文本转语音,语音转wav,导入ue5

文字转语音 在线免费文字转语音 - TTSMaker官网 | 马克配音https://ttsmaker.cn/ 文件转换器 语音转wav Convertio — 文件转换器https://convertio.co/zh/

[英语学习][10][Word Power Made Easy]的精读与翻译优化

[序言] 下面这段话, 译者翻译没有太大问题, 就是某些单词上, 跟他理解得不一样. 另外还有一个关键的定语从句, 我认为译者理解不到位, 导致翻译不够通顺. [英文学习的目标] 提升自身的英语水平, 对日后编程技能的提升有很大帮助. 希望大家这次能学到东西, 同时加入我的社区讨…

静态HTTP和动态HTTP的混合使用:最佳实践

在当今的互联网环境中&#xff0c;静态HTTP和动态HTTP各有其优势和局限。静态HTTP具有速度快、安全性高和易于维护的特点&#xff0c;而动态HTTP则能够实现动态交互和处理大量动态数据。为了充分利用两者的优势&#xff0c;越来越多的网站开始采用静态HTTP和动态HTTP混合使用的…

两个分数相加。

输入两个分数&#xff0c;例如3/41/2&#xff0c;输出3/41/25/4。 运行程序时&#xff0c;如下图所示&#xff1a; 输入样例1: 1/61/2输出样例2: 1/61/22/3 #include<stdio.h> int gcd(int a,int b) //求最大公约数&#xff08;Greatest Common Divisor&…

代码随想录第二十七天(一刷C语言)|分发饼干摆动序列最大子数组和

创作目的&#xff1a;为了方便自己后续复习重点&#xff0c;以及养成写博客的习惯。 一、分发饼干 思路&#xff1a;参考carl文档 局部最优就是大饼干喂给胃口大的&#xff0c;充分利用饼干尺寸喂饱一个&#xff0c;全局最优就是喂饱尽可能多的小孩。尝试使用贪心策略&#x…

FastAPI之声明请求参数示例数据

在Pydantic模型中添加额外的JSON模式数据 您可以声明Pydantic模型的示例&#xff0c;这些示例将被添加到生成的JSON模式中。 示例代码 from fastapi import FastAPI from pydantic import BaseModelapp FastAPI()class Item(BaseModel):name: strdescription: str | None …

设计模式——单例模式(Singleton Pattern)

概述 单例模式确保一个类只有一个实例&#xff0c;而且自行实例化并向整个系统提供整个实例&#xff0c;这个类称为单例类&#xff0c;它提供全局访问的方法。单例模式是一种对象创建型模式。单例模式有三个要点&#xff1a;一是某个类只能有一个实例&#xff1b;二是它必须自行…

WebStorm:Mac/Win上强大的JavaScript开发工具

WebStorm是JetBrains公司开发的针对Mac和Windows系统的JavaScript开发工具。它为开发者提供了一站式的代码编辑、调试、测试和版本控制等功能&#xff0c;帮助你更高效地进行Web开发。新版本的WebStorm 2023在性能和用户体验方面都做出了重大改进&#xff0c;让你的JavaScript开…

冰箱镜头除雾解决方案

冰箱镜头除雾解决方案 1.0 雾气产生原因 由于温度和湿度的变化,导致空气中的水汽凝结在镜头上,形成一层细小的水滴,从而影响了镜头的透光性和清晰度。 这种情况跟汽车玻璃、眼镜等物体表面起雾的原理是一样的, 雾的形成条件 (1)湿度–充足的水汽:①水汽输送(向岸风…

【RHCE】openlab搭建web网站

网站需求&#xff1a; 1、基于域名 www.openlab.com 可以访问网站内容为 welcome to openlab!!! 增加映射 [rootlocalhost ~]# vim /etc/hosts 创建网页 [rootlocalhost ~]# mkdir -p /www/openlab [rootlocalhost ~]# echo welcome to openlab > /www/openlab/index.h…

[足式机器人]Part2 Dr. CAN学习笔记-数学基础Ch0-2 特征值与特征向量

本文仅供学习使用 本文参考&#xff1a; B站&#xff1a;DR_CAN Dr. CAN学习笔记-数学基础Ch0-2 特征值与特征向量 1. 定义1.1 线性变换1.2 求解特征值&#xff0c;特征向量1.3 应用&#xff1a;对角化矩阵——解耦Decouple 2. Summary 1. 定义 A v ⃗ λ v ⃗ A\vec{v}\lambd…

广播和组播

1. 广播 1.1 知识点 INADDR_ANY代表本机所有地址 常用方法当你将套接字绑定到INADDR_ANY&#xff0c;它会监听所有可用的网络接口&#xff0c;这意味着它将接受来自所有本地IP地址的传入连接或数据包 1.1.1 广播的流程 广播发送端&#xff1a; ----> 添加广播属性 1、建立套…

无需繁琐编程 开启高效数据分析之旅!

不学编程做R统计分析&#xff1a;图形界面R Commander官方手册 R Commander是 R 的图形用户界面&#xff0c;不需要键入命令就可通过熟悉的菜单和对话框来访问 R 统计软件。 R 和 R Commander 均可免费安装于所有常见的操作系统——Windows、Mac OS X 和 Linux/UNIX。 本书作…

DataGrip常见问题

查询语句结果没有输出在output中 进行如下配置 配置后查询结果输出在output中 左侧数据库链接信息导航栏被隐藏 以上导航栏被隐藏&#xff0c;按下图操作调出

自定义TypeHandler 将mysql返回的逗号分隔的String转换到List

sql执行如下&#xff1a; 这里我定义的接受类&#xff1a; 但是这里报了错JSON parse error: Cannot deserialize value of type java.util.ArrayList<java.lang.String>from Object value (token JsonToken.START_OBJECT); nested exception is com.fasterxml.jackson…

第六届“强网”拟态防御国际精英挑战赛 持续引领技术发展新潮流

12月6日&#xff0c;第六届“强网”拟态防御国际精英挑战赛在南京紫金山实验室盛大开幕&#xff0c;来自全球的40支顶尖战队开启了连续72小时的高强度对决。 本届挑战赛由江苏省委网信办指导&#xff0c;南京市委网信办、紫金山实验室、中国通信学会主办&#xff0c;是第三届网…

高精度加法,减法,乘法,除法(下)(C语言)

前言 上一篇博客我们分享了高精度加法&#xff0c;减法,这一期我将为大家讲解高精度乘法和高精度除法。那让我们开始吧&#xff01; 对加法和减法感兴趣的话就点我 文章目录 1&#xff0c;乘法2&#xff0c;除法3&#xff0c;尾声 1&#xff0c;乘法 让我们想想我们平时做数学…

使用高防IP防护有哪些优势

高防IP是针对互联网服务器在遭受大流量的DDoS攻击后导致服务不可用的情况下&#xff0c;推出的付费增值服务&#xff0c;用户可以通过配置高防IP&#xff0c;将攻击流量引流到高防IP&#xff0c;确保源站的稳定可靠。高防IP相当于搭建完转发的服务器。 高防IP有两种接入方式&a…

vue如何解决el-select下拉框显示ID不显示label问题

<template><el-select v-model"searchObj.saleAreaId" change"searchClick" placeholder"请选择" class"inpt1" clearablesize"small"><el-option v-for"item in saleAreaList" :key"item.id…

机器学习实战:预测波士顿房价

前言&#xff1a; Hello大家好&#xff0c;我是Dream。 今天来学习一下机器学习中一个非常经典的案例&#xff1a;预测波士顿房价&#xff0c;在此过程中也会补充很多重要的知识点&#xff0c;欢迎大家一起前来探讨学习~ 一、导入数据 在这个项目中&#xff0c;我们利用马萨诸…
最新文章