【Seata源码学习 】 篇二 TM与RM初始化过程

【Seata源码学习 】 篇二 TM与RM初始化过程

1.GlobalTransactionScanner 初始化

GlobalTransactionScanner 实现了InitializingBean 接口,在初始化后将执行自定义的初始化方法

io.seata.spring.annotation.GlobalTransactionScanner#afterPropertiesSet

   @Override
    public void afterPropertiesSet() {
    		//是否禁用了全局事务
        if (disableGlobalTransaction) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global transaction is disabled.");
            }
            return;
        }
        //初始化客户端
        initClient();
    }

io.seata.spring.annotation.GlobalTransactionScanner#initClient

private void initClient() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Initializing Global Transaction Clients ... ");
        }
        if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) {
            LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " +
                    "please change your default configuration as soon as possible " +
                    "and we don't recommend you to use default tx-service-group's value provided by seata",
                    DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);
        }
        if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
            throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
        }
        //init TM
        //初始化事务管理器
        TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
        }
        //init RM
        //初始化资源管理器
        RMClient.init(applicationId, txServiceGroup);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
        }

        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global Transaction Clients are initialized. ");
        }
        //注册应用上下文关闭回调方法
        registerSpringShutdownHook();

    }

image-20231113213409054

2. 初始化事务管理器 TM

流程图

image-20231114222639955

实例化 TmNettyRemotingClient

io.seata.tm.TMClient#init(java.lang.String, java.lang.String, java.lang.String, java.lang.String)

 public static void init(String applicationId, String transactionServiceGroup) {
 				//TM进行netty网络通信的客户端
 				// applicationId 当前应用id  transactionServiceGroup 事务分组名称
        TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
        tmNettyRemotingClient.init();
    }

首先看下获取实例的方法

io.seata.core.rpc.netty.TmNettyRemotingClient#getInstance(java.lang.String, java.lang.String)

public static TmNettyRemotingClient getInstance(String applicationId, String transactionServiceGroup) {
        TmNettyRemotingClient tmNettyRemotingClient = getInstance();
        tmNettyRemotingClient.setApplicationId(applicationId);
        tmNettyRemotingClient.setTransactionServiceGroup(transactionServiceGroup);
        return tmNettyRemotingClient;
    }

io.seata.core.rpc.netty.TmNettyRemotingClient#getInstance()

    public static TmNettyRemotingClient getInstance() {
    		//双检锁,保证只有一个实例
        if (instance == null) {
            synchronized (TmNettyRemotingClient.class) {
                if (instance == null) {
                		//netty的配置
                    NettyClientConfig nettyClientConfig = new NettyClientConfig();
                    //消息处理线程池
                    //核心线程和最大线程都是16 没有非核心线程
                    //有界的阻塞队列 容量为200
                    final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
                        nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
                        KEEP_ALIVE_TIME, TimeUnit.SECONDS,
                        new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
                        new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),
                            nettyClientConfig.getClientWorkerThreads()),
                        RejectedPolicies.runsOldestTaskPolicy());
                     //创建实例
                    instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
                }
            }
        }
        return instance;
    }

io.seata.core.rpc.netty.TmNettyRemotingClient#TmNettyRemotingClient

private TmNettyRemotingClient(NettyClientConfig nettyClientConfig,
                                  EventExecutorGroup eventExecutorGroup,
                                  ThreadPoolExecutor messageExecutor) {
          //调用父类构造器
        super(nettyClientConfig, eventExecutorGroup, messageExecutor, NettyPoolKey.TransactionRole.TMROLE);
        //基于SPI机制加载鉴权签名组件 AuthSigner
        this.signer = EnhancedServiceLoader.load(AuthSigner.class);
        // set enableClientBatchSendRequest
        // 是否开启了批量发送请求对配置。默认 false
        this.enableClientBatchSendRequest = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.ENABLE_TM_CLIENT_BATCH_SEND_REQUEST,
                DefaultValues.DEFAULT_ENABLE_TM_CLIENT_BATCH_SEND_REQUEST);
        //监听配置是否有变化
        ConfigurationCache.addConfigListener(ConfigurationKeys.ENABLE_TM_CLIENT_BATCH_SEND_REQUEST, new ConfigurationChangeListener() {
            @Override
            public void onChangeEvent(ConfigurationChangeEvent event) {
                String dataId = event.getDataId();
                String newValue = event.getNewValue();
                if (ConfigurationKeys.ENABLE_TM_CLIENT_BATCH_SEND_REQUEST.equals(dataId) && StringUtils.isNotBlank(newValue)) {
                    enableClientBatchSendRequest = Boolean.parseBoolean(newValue);
                }
            }
        });
    }

实例化 AbstractNettyRemotingClient

io.seata.core.rpc.netty.AbstractNettyRemotingClient#AbstractNettyRemotingClient

 public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup,
                                       ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {
        //调用父类构造器 用于处理消息的线程池
        super(messageExecutor);
        //当前事务角色
        this.transactionRole = transactionRole;
        //创建 NettyClientBootstrap 实例 
        clientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, transactionRole);
        //消息处理器  
        clientBootstrap.setChannelHandlers(new ClientHandler());
        //channel管理器
        clientChannelManager = new NettyClientChannelManager(
            new NettyPoolableFactory(this, clientBootstrap), getPoolKeyFunction(), nettyClientConfig);
    }

实例化 AbstractNettyRemoting

io.seata.core.rpc.netty.AbstractNettyRemoting#AbstractNettyRemoting

  public AbstractNettyRemoting(ThreadPoolExecutor messageExecutor) {
        //设置处理消息的线程池
        this.messageExecutor = messageExecutor;
    }

初始化 TmNettyRemotingClient

回到

io.seata.tm.TMClient#init(java.lang.String, java.lang.String, java.lang.String, java.lang.String)

创建完成 TmNettyRemotingClient 实例后,调用init方法

public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
        TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);
        tmNettyRemotingClient.init();
    }
    @Override
    public void init() {
        // registry processor
        //注册请求处理器
        registerProcessor();
        if (initialized.compareAndSet(false, true)) {
            //调用父类初始化方法 
            // 1. 定时重连
            // 2. 超时检测
            super.init();
            //如果事务分组不为空
            if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {
                //通过channel管理器建立链接
                getClientChannelManager().reconnect(transactionServiceGroup);
            }
        }
    }

io.seata.core.rpc.netty.TmNettyRemotingClient#registerProcessor

   private void registerProcessor() {
        //根据不同的消息类型,使用不同的消息处理器
        //两个处理器 一种对消息进行处理
        //还有一种是处理心跳
        // 1.registry TC response processor
        ClientOnResponseProcessor onResponseProcessor =
                new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
        //注册就是将 消息处理器与线程池封装成一对pair,然后在进一步封装成map,map对key为消息类型,value为封装对pair
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);
        // 2.registry heartbeat message processor
        ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
    }
 @Override
    public void registerProcessor(int requestCode, RemotingProcessor processor, ExecutorService executor) {
        Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
        this.processorTable.put(requestCode, pair);
    }

初始化 AbstractNettyRemotingClient

io.seata.core.rpc.netty.AbstractNettyRemotingClient#init

    @Override
    public void init() {
        //周期线程池 第一次在60秒后通过连接管理器重新建立链接,之后每10秒重新建立一次链接
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                clientChannelManager.reconnect(getTransactionServiceGroup());
            }
        }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
        if (this.isEnableClientBatchSendRequest()) {
            mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                MAX_MERGE_SEND_THREAD,
                KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
            mergeSendExecutorService.submit(new MergedSendRunnable());
        }
        //启动一个周期线程池,每3秒检查一次请求是否超时
        super.init();
        //启动netty客户端
        clientBootstrap.start();
    }

io.seata.core.rpc.netty.NettyClientBootstrap#start

启动过程中一共设置了4个消息处理器

  1. IdleStateHandler 处理心跳
  2. ProtocolV1Decoder 消息解码
  3. ProtocolV1Encoder 消息编码
  4. ClientHandler 消息处理
 @Override
    public void start() {
        if (this.defaultEventExecutorGroup == null) {
            this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
                new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()),
                    nettyClientConfig.getClientWorkerThreads()));
        }
        this.bootstrap.group(this.eventLoopGroupWorker).channel(
            nettyClientConfig.getClientChannelClazz()).option(
            ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(
            ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).option(
            ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()).option(ChannelOption.SO_RCVBUF,
            nettyClientConfig.getClientSocketRcvBufSize());

        if (nettyClientConfig.enableNative()) {
            if (PlatformDependent.isOsx()) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("client run on macOS");
                }
            } else {
                bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
                    .option(EpollChannelOption.TCP_QUICKACK, true);
            }
        }

        bootstrap.handler(
            new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(
                        new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
                            nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                            nettyClientConfig.getChannelMaxAllIdleSeconds()))
                        .addLast(new ProtocolV1Decoder())
                        .addLast(new ProtocolV1Encoder());
                    if (channelHandlers != null) {
                        addChannelPipelineLast(ch, channelHandlers);
                    }
                }
            });

        if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
            LOGGER.info("NettyClientBootstrap has started");
        }
    }

io.seata.core.rpc.netty.AbstractNettyRemotingClient.ClientHandler

  @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            if (!(msg instanceof RpcMessage)) {
                return;
            }
            processMessage(ctx, (RpcMessage) msg);
        }

io.seata.core.rpc.netty.AbstractNettyRemoting#processMessage

protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
        }
        Object body = rpcMessage.getBody();
        //顶层接口 MessageTypeAware
        if (body instanceof MessageTypeAware) {
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            //根据消息的类型获取不同的RemotingProcessor进行处理
            final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            if (pair != null) {
                //如果线程池不为空,使用线程池执行 前面封装pair时,线程池都是null
                if (pair.getSecond() != null) {
                    try {
                        pair.getSecond().execute(() -> {
                            try {
                                pair.getFirst().process(ctx, rpcMessage);
                            } catch (Throwable th) {
                                LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                            } finally {
                                MDC.clear();
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                            "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                        if (allowDumpStack) {
                            String name = ManagementFactory.getRuntimeMXBean().getName();
                            String pid = name.split("@")[0];
                            long idx = System.currentTimeMillis();
                            try {
                                String jstackFile = idx + ".log";
                                LOGGER.info("jstack command will dump to " + jstackFile);
                                Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
                            } catch (IOException exx) {
                                LOGGER.error(exx.getMessage());
                            }
                            allowDumpStack = false;
                        }
                    }
                } else {
                    //如果消息处理器对应的线程池是空的,则直接处理
                    try {
                        pair.getFirst().process(ctx, rpcMessage);
                    } catch (Throwable th) {
                        LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                    }
                }
            } else {
                LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
            }
        } else {
            LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
        }
    }

初始化 AbstractNettyRemoting

io.seata.core.rpc.netty.AbstractNettyRemoting#init

public void init() {
        //每3秒检查一次请求是否超时
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
                    MessageFuture future = entry.getValue();
                    if (future.isTimeout()) {
                        futures.remove(entry.getKey());
                        RpcMessage rpcMessage = future.getRequestMessage();
                        future.setResultMessage(new TimeoutException(String
                            .format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
                        }
                    }
                }

                nowMills = System.currentTimeMillis();
            }
        }, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
    }

TM客户端与 TC 建立连接

io.seata.core.rpc.netty.NettyClientChannelManager#reconnect

void reconnect(String transactionServiceGroup) {
        List<String> availList = null;
        try {
            //根据事务分组名称找seata服务端地址列表 默认根据File配置映射关系查找
            //tx-service-group 事务分组名
            //vgroup-mapping.事务分组名=分组seata服务列表名
            //seata.service.grouplist.分组seata服务列表名=seata服务地址
            availList = getAvailServerList(transactionServiceGroup);
        } catch (Exception e) {
            LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
            return;
        }
        if (CollectionUtils.isEmpty(availList)) {
            RegistryService registryService = RegistryFactory.getInstance();
            String clusterName = registryService.getServiceGroup(transactionServiceGroup);
            //如果找不到任何seata server 服务配置列表,抛出异常
            if (StringUtils.isBlank(clusterName)) {
                LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",
                        ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,
                        transactionServiceGroup);
                return;
            }

            if (!(registryService instanceof FileRegistryServiceImpl)) {
                LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);
            }
            return;
        }
        Set<String> channelAddress = new HashSet<>(availList.size());
        try {
            for (String serverAddress : availList) {
                try {
                    //与所有seata server建立长连接
                    acquireChannel(serverAddress);
                    channelAddress.add(serverAddress);
                } catch (Exception e) {
                    LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(),
                        serverAddress, e.getMessage(), e);
                }
            }
        } finally {
            if (CollectionUtils.isNotEmpty(channelAddress)) {
                List<InetSocketAddress> aliveAddress = new ArrayList<>(channelAddress.size());
                for (String address : channelAddress) {
                    String[] array = address.split(":");
                    aliveAddress.add(new InetSocketAddress(array[0], Integer.parseInt(array[1])));
                }
                RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, aliveAddress);
            } else {
                RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, Collections.emptyList());
            }
        }
    }

3.初始化资源管理器 RM

io.seata.rm.RMClient#init

    public static void init(String applicationId, String transactionServiceGroup) {
        //获取RmNettyRemotingClient实例
        RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
        //设置资源管理器
        rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
        //设置资源事务管理器
        rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
        //初始化RmNettyRemotingClient
        rmNettyRemotingClient.init();
    }

实例化 RmNettyRemotingClient

io.seata.core.rpc.netty.RmNettyRemotingClient#getInstance(java.lang.String, java.lang.String)

  public static RmNettyRemotingClient getInstance(String applicationId, String transactionServiceGroup) {
        //获取实例,并创建消息处理线程池
        RmNettyRemotingClient rmNettyRemotingClient = getInstance();
        //设置应用id
        rmNettyRemotingClient.setApplicationId(applicationId);
        //设置事务分组名
        rmNettyRemotingClient.setTransactionServiceGroup(transactionServiceGroup);
        return rmNettyRemotingClient;
    }

io.seata.core.rpc.netty.RmNettyRemotingClient#getInstance()

public static RmNettyRemotingClient getInstance() {
        //双检锁创建实例 保证单例
        if (instance == null) {
            synchronized (RmNettyRemotingClient.class) {
                if (instance == null) {
                    //netty客户端配置
                    NettyClientConfig nettyClientConfig = new NettyClientConfig();
                    //消息处理线程池
                    final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
                        nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
                        KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
                        new NamedThreadFactory(nettyClientConfig.getRmDispatchThreadPrefix(),
                            nettyClientConfig.getClientWorkerThreads()), new ThreadPoolExecutor.CallerRunsPolicy());
                    instance = new RmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
                }
            }
        }
        return instance;
    }

RmNettyRemotingClient 及其父类的实例化过程都与TM是一致的,我们可以看下继承关系图

截屏2023-11-16 21.15.23

真正有区别的地方在于TM客户端初始化的过程与RM客户端初始化的过程

初始化 RmNettyRemotingClient

//获取RmNettyRemotingClient实例
        RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
        //设置资源管理器
        rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
        //设置资源事务管理器
        rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
        //初始化RmNettyRemotingClient
        rmNettyRemotingClient.init();

io.seata.core.rpc.netty.RmNettyRemotingClient#init

    public void init() {
        // 注册消息处理器
        registerProcessor();
        if (initialized.compareAndSet(false, true)) {
            super.init();

            // Found one or more resources that were registered before initialization
            // 与TC建立连接前 会先判断资源是否存在
            if (resourceManager != null
                    && !resourceManager.getManagedResources().isEmpty()
                    && StringUtils.isNotBlank(transactionServiceGroup)) {
                getClientChannelManager().reconnect(transactionServiceGroup);
            }
        }
    }

io.seata.core.rpc.netty.RmNettyRemotingClient#registerProcessor

private void registerProcessor() {
        //注册类五种不同的消息处理器
        // 1.registry rm client handle branch commit processor
        // 分支事务提交
        RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
        super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
        // 2.registry rm client handle branch rollback processor
        // 分支事务回滚
        RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
        super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
        // 3.registry rm handler undo log processor
        // 回滚日志处理
        RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
        // 4.registry TC response processor
        // TC响应处理
        ClientOnResponseProcessor onResponseProcessor =
            new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);
        // 5.registry heartbeat message processor
        // 心跳信息处理
        ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
    }

以上就是TM和RM实例化的过程,至于不同的消息处理器的实现我们放到后面去看

4. TM和RM初始化总结

截屏2023-11-16 21.51.27

两者其实过程是一致的,TM客户端对象TMClient主要是实例并初始化TmNettyRemotingClient,RM客户端对象RMClient主要是实例并初始化RmNettyRemotingClient。两者类继承关系如下所示

截屏2023-11-16 21.15.23

NettyRemotingClient 对象主要是在初始化方法中消息处理器并与TC服务端建立长连接,TM与RM注册的消息处理器是不同的,并且RM在与TC建立连接前会先判断数据库资源是否存在。TmNettyRemotingClient与RmNettyRemotingClient都将共同的方法放到抽象父类 AbstractNettyRemotingClient 中 。

父类 AbstractNettyRemotingClient 封装了原生的Netty信息,用于创建Netty客户端对象,并在初始化方法中启动一个周期线程去定期重新发起连接请求

AbstractNettyRemotingClient 的父类 AbstractNettyRemoting 主要是在执行初始化方法时启动 一个周期线程池,每隔3秒检测一次发送的消息集合中是否有消息超时,默认的超时时间为30秒

io.seata.common.DefaultValues#DEFAULT_RPC_RM_REQUEST_TIMEOUT

    long DEFAULT_RPC_RM_REQUEST_TIMEOUT = Duration.ofSeconds(30).toMillis();

我们可以通过设置 transport.rpcRmRequestTimeout (毫秒)去改变这个默认的值

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/154352.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C++ 模板 (一)

1. 泛型编程 如何实现一个通用的交换函数呢&#xff1f; void Swap(int& left, int& right) { int temp left; left right; right temp; } void Swap(double& left, double& right) { double temp left; left right; right temp; } void Swap(char&…

【152.乘积最大子数组】

目录 一、题目描述二、算法原理三、代码实现 一、题目描述 二、算法原理 三、代码实现 class Solution { public:int maxProduct(vector<int>& nums) {int nnums.size();vector<int> f(n);vector<int> g(n);f[0]g[0]nums[0];int retnums[0];for(int i1;…

如何在聊天记录中实时查找大量的微信群二维码

10-5 如果你有需要从微信里收到的大量信息中实时找到别人发到群里的二维码&#xff0c;那本文非常适合你阅读&#xff0c;因为本文的教程&#xff0c;可以让你在海量的微信消息中&#xff0c;实时地把二维码自动挑出来&#xff0c;并且帮你分类保存。 如果你是做网推的&#…

【AI视野·今日NLP 自然语言处理论文速览 第六十五期】Mon, 30 Oct 2023

AI视野今日CS.NLP 自然语言处理论文速览 Mon, 30 Oct 2023 Totally 67 papers &#x1f449;上期速览✈更多精彩请移步主页 Daily Computation and Language Papers An Approach to Automatically generating Riddles aiding Concept Attainment Authors Niharika Sri Parasa,…

C#WPF文本转语音实例

本文介绍C#WPF文本转语音实例 实现方法:使用类库(SpeechSynthesizer )实现的。 一、首先是安装程序包。 二、创建项目 需要添加引用using System.Speech.Synthesis; UI界面 <Windowx:Class="TextToSpeechDemo.MainWindow"xmlns="http://schemas.micr…

PostgreSQL基于Citus实现的分布式集群

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#x1f4e3;&#x1f4e3;&#x1f4e3; 哈喽&#xff01;大家好&#xff0c;我是【IT邦德】&#xff0c;江湖人称jeames007&#xff0c;10余年DBA及大数据工作经验 一位上进心十足的【大数据领域博主】&#xff01;&#x1f61c;&am…

这五个步骤,网络有故障,自己都能解决

网络出现故障了怎么办&#xff1f;大部分弱电人一开始无从下手。 网络故障是弱电工作中最易常见的问题&#xff0c;尤其是我们弱电人经常与网络打交道&#xff0c;那么如何才能进行网络排查&#xff0c;快速解决问题呢&#xff1f; 解决和排查网络故障&#xff0c;要有基本的…

【漏洞复现】maccms苹果cms 命令执行漏洞

漏洞描述 感谢提供更多信息。“苹果CMS” 似乎是指 “Maccms”&#xff0c;这是一款开源的内容管理系统&#xff0c;主要用于搭建视频网站。Maccms 提供了一套完整的解决方案&#xff0c;包括用户管理、视频上传、分类管理、数据统计等功能&#xff0c;使用户能够方便地创建和…

Sentinel入门

一、Sentinel介绍 Sentinel 是阿里云开发的一款用于流量控制、熔断降级、系统负载保护的轻量级库。它可以帮助开发者保障系统的稳定性&#xff0c;在分布式服务架构中&#xff0c;Sentinel 能够对服务提供一定的保护&#xff0c;避免因为某个服务的故障而影响全局。 Sentinel …

HMM与LTP词性标注之LTP介绍

文章目录 LTP 上图缺点&#xff1a;参数太多&#xff0c;中文语料库匮乏 注意力机制&#xff0c;相当于给每一个词赋予一个权重&#xff0c;权重越大的越重要。 bert的缺点&#xff1a;神经元太多&#xff0c;较慢。 LTP 如果只是需要做词性的识别&#xff0c;那么用LTP就可…

【MATLAB源码-第80期】基于蚯蚓优化算法(EOA)的无人机三维路径规划,输出做短路径图和适应度曲线

操作环境&#xff1a; MATLAB 2022a 1、算法描述 蚯蚓优化算法&#xff08;Earthworm Optimisation Algorithm, EOA&#xff09;是一种启发式算法&#xff0c;灵感来源于蚯蚓在自然界中的行为模式。蚯蚓优化算法主要模仿了蚯蚓在寻找食物和逃避天敌时的行为策略。以下是蚯蚓…

C语言--字符串详解(多角度分析,什么是字符串?字符串如何存储?字符串如何应用?字符串常用的库函数有哪些?)

目录 一、前言 &#x1f4a6;什么是字符串 &#x1f4a6;字符串如何存储&#xff1f; 二、字符串常量和字符数组 &#x1f4a6;字符串常量 ✨什么是字符串常量&#xff1f; ✨字符串常量与指针 &#x1f4a6;字符数组 ✨字符数组的应用 &#x1f4a6;字符串常量与字符数组的…

手撕无头单链表

&#x1f493; 博客主页&#xff1a;江池俊的博客⏩ 收录专栏&#xff1a;数据结构探索&#x1f449;专栏推荐&#xff1a;✅C语言初阶之路 ✅C语言进阶之路&#x1f4bb;代码仓库&#xff1a;江池俊的代码仓库&#x1f525;编译环境&#xff1a;Visual Studio 2022&#x1f38…

LCD1602指定位置显示字符串-详细版

本文为博主 日月同辉&#xff0c;与我共生&#xff0c;csdn原创首发。希望看完后能对你有所帮助&#xff0c;不足之处请指正&#xff01;一起交流学习&#xff0c;共同进步&#xff01; > 发布人&#xff1a;日月同辉,与我共生_单片机-CSDN博客 > 欢迎你为独创博主日月同…

工业智能网关:工业物联网智慧连接

工业智能网关是工业互联网的核心设备之一&#xff0c;它承担着连接设备、搜集数据、分析数据、设备控制和实现设备智能化管理的重要任务。随着工业互联网的快速发展&#xff0c;工业智能网关的作用变得越来越重要。 计讯物联5G/4G工业智能网关是一种连接工厂设备与互联网的关键…

考虑区域多能源系统集群协同优化的联合需求侧响应模型(matlab代码)

该程序复现《考虑区域多能源系统集群协同优化的联合需求侧响应模型》文献模型&#xff0c;程序的核心是对多个区域级多能源系统互联系统进行多目标优化&#xff0c;并且考虑联合需求侧响应&#xff0c;以多个区域多能源系统运行总成本最小、碳排放最小为目标&#xff0c;建立多…

腾讯云轻量服务器购买优惠,腾讯云轻量应用服务器优惠购买方法

你是否曾经为如何选择合适的服务器而苦恼&#xff1f;在互联网的海洋中&#xff0c;如何找到一个性价比高&#xff0c;性能稳定&#xff0c;价格合理的服务器供应商&#xff0c;确实是一个让人头疼的问题。今天&#xff0c;我要向你介绍的&#xff0c;是腾讯云轻量应用服务器的…

【问题处理】WPS提示不能启动此对象的源应用程序如何处理?

哈喽&#xff0c;大家好&#xff0c;我是雷工&#xff01; 最近在用WPS打开word文件中&#xff0c;插入的Excel附件时&#xff0c;无法打开&#xff0c;提示&#xff1a;“不能启动此对象的源应用程序”。 经过上网查找处理办法&#xff0c;尝试解决&#xff0c;现将解决过程记…

python科研绘图:圆环图

圆环图是一种特殊的图形&#xff0c;它可以显示各个部分与整体之间的关系。圆环图由两个或多个大小不一的饼图叠加而成&#xff0c;中间被挖空&#xff0c;看起来像一个甜甜圈。因此&#xff0c;圆环图也被称为“甜甜圈”图。 与饼图相比&#xff0c;圆环图的空间利用率更高&a…

腾讯云新客户优惠服务器88元/年,540元/3年,另有5年优惠服务器

在选择云服务器时&#xff0c;首先需要考虑的是性能与配置是否与自己的需求相匹配。对于小型网站或者个人博客&#xff0c;轻量应用服务器是一个不错的选择。腾讯云双十一活动中&#xff0c;2核2G轻量应用服务器的活动优惠价为88元/年&#xff0c;2核4G轻量应用服务器的活动优惠…
最新文章