【源码分析】zeebe actor模型源码解读

zeebe actor 模型🙋‍♂️

如果有阅读过zeebe 源码的朋友一定能够经常看到actor.run() 之类的语法,那么这篇文章就围绕actor.run 方法,说说zeebe actor 的模型。

环境⛅

zeebe release-8.1.14

actor.run() 是怎么开始的🌈

zeebe actor 模型

LongPollingActivateJobsHandler.java

以LongPollingActivateJobsHandler 的激活任务方法为例,我们可以看到run 方法实际上执行ActorControl类的run 方法,让我们进到run 方法中。

	private ActorControl actor;

    public void activateJobs(final InflightActivateJobsRequest request) {
        actor.run(
                () -> {
                    final InFlightLongPollingActivateJobsRequestsState state =
                            getJobTypeState(request.getType());

                    if (state.shouldAttempt(failedAttemptThreshold)) {
                        activateJobsUnchecked(state, request);
                    } else {
                        completeOrResubmitRequest(request, false);
                    }
                });
    }

ActorControl

可以看到scheduleRunnable 的目标是构造ActorJob,然后将job 添加到ActorTask 中,添加的方式分为insert 和submit。其实到这里我们就可以认为actor.run 就已经结束了,因为insert 和submit 方法主要就是将job 添加到task 的jobQueues 中,对于job 的执行要等到队列不断被线程pop 到当前job 的时候。

	final ActorTask task;
	
    @Override
    public void run(final Runnable action) {
        scheduleRunnable(action);
    }
    private void scheduleRunnable(final Runnable runnable) {
        final ActorThread currentActorThread = ActorThread.current();

        if (currentActorThread != null && currentActorThread.getCurrentTask() == task) {
            final ActorJob newJob = currentActorThread.newJob();
            newJob.setRunnable(runnable);
            newJob.onJobAddedToTask(task);

            // 插入到执行队列
            task.insertJob(newJob);
        } else {
            final ActorJob job = new ActorJob();
            job.setRunnable(runnable);
            job.onJobAddedToTask(task);

            // 提交到外部队列
            // submit 实际上是将task 放到thread group 里边
            task.submit(job);
        }
    }

job 是怎么被执行的⚡

并不是任意一个ActorControl 都可以执行run 方法的,按照上图所示,Actor 会在broker 生命周期开始要进行注册 ,也就是说ActorControl 中的task 会注册到taskQueues。然后“线程池”不断从taskQueues 中pop 出task,每个task 中又会有多个job,按照策略选取不同的job 执行,我们可以认为job 就是actor.run(Runnable runnable) 中的runnable。

Gateway.java

gateway 注册task

  
  private CompletableFuture<ActivateJobsHandler> submitActorToActivateJobs(
      final ActivateJobsHandler handler) {
    final var future = new CompletableFuture<ActivateJobsHandler>();
    final var actor =
        Actor.newActor()
            .name("ActivateJobsHandler")
            .actorStartedHandler(handler.andThen(t -> future.complete(handler)))
            .build();
	
	// 将task 注册到TaskQueues
    actorSchedulingService.submitActor(actor);
    return future;
  }

ActorThreadGroup.java

就是上面提到的“线程池”,负责初始化每一条ActorThread 线程,并为其分配默认的WorkStealingGroup

	protected final String groupName;
    protected final ActorThread[] threads;
    protected final WorkStealingGroup tasks;
    protected final int numOfThreads;

	// 构造器,初始化每条线程,并为其分配一个默认的WorkStealingGroup 任务队列
    public ActorThreadGroup(
            final String groupName, final int numOfThreads, final ActorSchedulerBuilder builder) {
        this.groupName = groupName;
        this.numOfThreads = numOfThreads;

        tasks = new WorkStealingGroup(numOfThreads);

        threads = new ActorThread[numOfThreads];

        for (int t = 0; t < numOfThreads; t++) {
            final String threadName = String.format("%s-%d", groupName, t);
            final ActorThread thread =
                    builder
                            .getActorThreadFactory()
                            .newThread(
                                    threadName,
                                    t,
                                    this,
                                    tasks,
                                    builder.getActorClock(),
                                    builder.getActorTimerQueue(),
                                    builder.isMetricsEnabled());

            threads[t] = thread;
        }
    }
	
	// start
    public void start() {
        for (final ActorThread actorThread : threads) {

            // 启动每一个ActorThread
            actorThread.start();
        }
    }

ActorThread.java

ActorThread 继承自Thread,可以看到start=>run=>doWork 的引用流程,在doWork 方法中,首先从taskScheduler 中获取当前task,然后执行当前task


	// 继承自Thread 
    @Override
    public synchronized void start() {
        if (UNSAFE.compareAndSwapObject(
                this, STATE_OFFSET, ActorThreadState.NEW, ActorThreadState.RUNNING)) {
            
            // super.start 会执行下面的run 方法
            super.start();
        } else {
            throw new IllegalStateException("Cannot start runner, not in state 'NEW'.");
        }
    }

	// 主要执行doWork 方法
    @Override
    public void run() {
        idleStrategy.init();
		
        while (state == ActorThreadState.RUNNING) {
            try {
                doWork();
            } catch (final Exception e) {
                LOG.error("Unexpected error occurred while in the actor thread {}", getName(), e);
            }
        }

        state = ActorThreadState.TERMINATED;

        terminationFuture.complete(null);
    }
	private void doWork() {
        submittedCallbacks.drain(this);

        if (clock.update()) {
            timerJobQueue.processExpiredTimers(clock);
        }

		// 从taskScheduler 中获取当前task
        currentTask = taskScheduler.getNextTask();

        if (currentTask != null) {
            final var actorName = currentTask.actor.getName();
            try (final var timer = actorMetrics.startExecutionTimer(actorName)) {

                // 执行当前任务
                executeCurrentTask();
            }
            if (actorMetrics.isEnabled()) {
                actorMetrics.updateJobQueueLength(actorName, currentTask.estimateQueueLength());
                actorMetrics.countExecution(actorName);
            }
        } else {
            idleStrategy.onIdle();
        }
    }

	private void executeCurrentTask() {
        final var properties = currentTask.getActor().getContext();
        MDC.setContextMap(properties);
        idleStrategy.onTaskExecuted();

        boolean resubmit = false;

        try {
			// 真正执行当前任务
            resubmit = currentTask.execute(this);
        } catch (final Throwable e) {
            FATAL_ERROR_HANDLER.handleError(e);
            LOG.error("Unexpected error occurred in task {}", currentTask, e);
        } finally {
            MDC.remove("actor-name");
            clock.update();
        }

        if (resubmit) {
            currentTask.resubmit();
        }
    }

ActorTask.java

ActorTask 的执行流程,它会不断从订阅的列表中拉取job,poll 方法会更新当前currentJob, 如果一次逻辑执行中从fastlaneJobs 中poll 到了任务,那么currentJob != null 会短路返回true,而不进行poll(),从这里可以看到submittedJobs 和fastlaneJobs 的区别!

找到job 后开始执行当前job


	public boolean execute(final ActorThread runner) {
        schedulingState.set(TaskSchedulingState.ACTIVE);

        boolean resubmit = false;

        // 不断从订阅的列表中拉取job,poll 方法会更新当前currentJob, 如果一次逻辑执行中从fastlaneJobs 中poll 到了任务,那么currentJob != null 会短路返回true,而不进行poll()
        while (!resubmit && (currentJob != null || poll())) {
            currentJob.execute(runner);

            switch (currentJob.schedulingState) {
                case TERMINATED -> {
                    final ActorJob terminatedJob = currentJob;

                    // 从fastlaneJobs任务集合中拉取任务
                    currentJob = fastLaneJobs.poll();

                    // 如果是通过订阅触发的任务
                    if (terminatedJob.isTriggeredBySubscription()) {
                        final ActorSubscription subscription = terminatedJob.getSubscription();

                        // 如果订阅是一次性的,那么在订阅触发后则将订阅移除
                        if (!subscription.isRecurring()) {
                            removeSubscription(subscription);
                        }

                        // 执行订阅的回调任务
                        subscription.onJobCompleted();
                    } else {
                      
                        runner.recycleJob(terminatedJob);
                    }
                }
                case QUEUED ->
                    // the task is experiencing backpressure: do not retry it right now, instead re-enqueue
                    // the actor task.
                    // this allows other tasks which may be needed to unblock the backpressure to run
                        resubmit = true;
                default -> {
                }
            }

            if (shouldYield) {
                shouldYield = false;
                resubmit = currentJob != null;
                break;
            }
        }

        if (currentJob == null) {
            resubmit = onAllJobsDone();
        }

        return resubmit;
    }
    private boolean poll() {
        boolean result = false;

        result |= pollSubmittedJobs();
        result |= pollSubscriptions();

        return result;
    }

ActorJob.java

ActorJob 的执行逻辑

还记得上面说过ActorJob 可以理解为runnable 的吗,在invoke 中ActorJob 中的runnable 真正执行了,至此job 的执行过程结束

	void execute(final ActorThread runner) {
        actorThread = runner;
        observeSchedulingLatency(runner.getActorMetrics());
        try {

            // 执行actor 的 callable 或者 runnable 方法
            invoke();

            if (resultFuture != null) {
                resultFuture.complete(invocationResult);
                resultFuture = null;
            }

        } catch (final Throwable e) {
            FATAL_ERROR_HANDLER.handleError(e);
            task.onFailure(e);
        } finally {
            actorThread = null;

            // 无论那种情况,成功或者失败,都要判断是否job 应该被resubmitted
            // in any case, success or exception, decide if the job should be resubmitted
            if (isTriggeredBySubscription() || runnable == null) {
                schedulingState = TaskSchedulingState.TERMINATED;
            } else {
                schedulingState = TaskSchedulingState.QUEUED;
                scheduledAt = System.nanoTime();
            }
        }
    }
    
    private void invoke() throws Exception {
        if (callable != null) {
            invocationResult = callable.call();
        } else {
            // only tasks triggered by a subscription can "yield"; everything else just executes once
            if (!isTriggeredBySubscription()) {
                final Runnable r = runnable;
                runnable = null;
                r.run();
            } else {
            	// runnable 真正执行
                runnable.run();
            }
        }
    }

总结📝

本文中的激活例子其实只是列举了Actor 的实现原理,想一想文中提到的功能用一个真正的线程池可以很好的解决。但是actor模型 的特性远不仅如此,对于其他特性在zeebe 中是如何实现的还请读者自己去挖掘🤏~

zeebe 团队真的是太喜欢functional programming了,找一个方法的调用链头都大了😅

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/182097.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【SpringMVC】 三层架构

一.lombok工具包 中央仓库查找这个工具包:https://mvnrepository.com/ 给类添加Data注解就可以获取gettter和setter方法 , 这样我们就不必写getter 和 setter 方法. 也可以给成员属性添加单独的getter 和 setter , 针对某个成员属性单独添加setter或setter方法. 二.如果使用spr…

【华为数通HCIP | 网络工程师】821-IGP高频题、易错题之OSPF(1)

个人名片&#xff1a; &#x1f43c;作者简介&#xff1a;一名大三在校生&#xff0c;喜欢AI编程&#x1f38b; &#x1f43b;‍❄️个人主页&#x1f947;&#xff1a;落798. &#x1f43c;个人WeChat&#xff1a;hmmwx53 &#x1f54a;️系列专栏&#xff1a;&#x1f5bc;️…

spark的算子

spark的算子 1.spark的单Value算子 Spark中的单Value算子是指对一个RDD中的每个元素进行操作&#xff0c;并返回一个新的RDD。下面详细介绍一些常用的单Value算子及其功能&#xff1a; map&#xff1a;逐条映射&#xff0c;将RDD中的每个元素通过指定的函数转换成另一个值&am…

SWT/Jface(2): 表格的编辑

前言 上节说到, 创建和渲染表格需要如下几个步骤: 接收源数据数组(也可以是单个对象或者其他集合类型): TableViewer.setInput(Object)渲染接收的数据 渲染表头: TableViewer.setLabelProvider(IBaseLabelProvider)渲染内容: TableViewer.setContentProvider(IContentProvide…

Vue框架学习笔记——Vue实例中el和data的两种写法

文章目录 前文提要Vue实例的el第一种写法第二种写法小结 Vue实例中data第一种写法&#xff0c;对象式效果图片第二种写法&#xff0c;函数式效果图片小结 前文提要 本文仅做自己的学习记录&#xff0c;如有错误&#xff0c;请多谅解 Vue实例的el 第一种写法 <body><…

工业一体全国产方案,米尔T113核心板

入门级HMI屏作为嵌入式系统中重要组成部分&#xff0c;大部分都是串口屏&#xff1b;其功能简单、成本低等特点&#xff0c;使用历史悠久、应用广泛&#xff0c;而随着信息技术的快速发展&#xff0c;行业需求不断升级&#xff0c;工程师使用了大量串口屏后&#xff0c;发现串口…

微服务保护 Sentinel

1.初识Sentinel 文章目录 1.初识Sentinel1.1.雪崩问题及解决方案1.1.1.雪崩问题1.1.2.超时处理1.1.3.仓壁模式1.1.4.断路器1.1.5.限流1.1.6.总结 1.2.服务保护技术对比1.3.Sentinel介绍和安装1.3.1.初识Sentinel1.3.2.安装Sentinel 1.4.微服务整合Sentinel 2.流量控制2.1.簇点链…

python opencv 放射变换和图像缩放-实现图像平移旋转缩放

python opencv 放射变换和图像缩放-实现图像平移旋转缩放 我们实现这次实验主要用到cv2.resize和cv2.warpAffine cv2.warpAffine主要是传入一个图像矩阵&#xff0c;一个M矩阵&#xff0c;输出一个dst结果矩阵&#xff0c;计算公式如下&#xff1a; cv2.resize则主要使用fx&…

Arm64版本的centos编译muduo库遇到的问题的归纳

环境&#xff1a;Mac m2 pro下的VMware虚拟机中Arm64 centos ./build.sh 执行后提示如下 cmake -DCMAKE_BUILD_TYPErelease -DCMAKE_INSTALL_PREFIX…/release-install-cpp11 -DCMAKE_EXPORT_COMPILE_COMMANDSON /root/package/muduo-master – Boost version: 1.69.0 – Co…

【双指针】和为 s 的两个数字

和为 s 的两个数字 文章目录 和为 s 的两个数字题目描述算法思路暴力枚举双指针 代码编写Java代码C代码编写 LCR 179. 查找总价格为目标值的两个商品 - 力扣&#xff08;LeetCode&#xff09; 题目描述 购物车内的商品价格按照升序记录于数组 price。请在购物车中找到两个商品…

Go语言中结构体的使用和示例

结构体&#xff08;简称struct&#xff09;用于创建不同数据类型的成员集合&#xff0c;放入一个单一的变量中。虽然数组用于将相同数据类型的多个值存储在单一变量中&#xff0c;但结构体用于将不同数据类型的多个值存储在单一变量中。结构体对于将数据组合在一起以创建记录非…

云安全之盾:ZStack 云主机安全防护解决方案全方位保护云环境

随着云计算的蓬勃发展&#xff0c;网络威胁愈发复杂&#xff0c;涵盖了从勒索病毒到APT攻击的各种威胁类型。在这一风云变幻的网络安全环境下&#xff0c;云主机安全不再仅仅是一个选项&#xff0c;它是信息系统安全的核心要素。云轴科技ZStack 云主机安全防护解决方案是为了满…

国家超级计算济南中心低代码平台应用实践

摘要&#xff1a;文章主要介绍了济南超算使用低代码平台明道云解决了一系列业务问题&#xff0c;包括资产管理、人员与机构管理、流程制度管理等。通过明道云平台&#xff0c;济南超算成功地将不同部门的业务信息进行整合&#xff0c;提高了工作效率和管理水平。文章还强调了明…

操作系统 day13(RR)

RR&#xff08;时间片轮转&#xff09; 响应时间&#xff1a;系统中有10个进程正在并发执行&#xff0c;如果时间片为1秒&#xff0c;则一个进程被响应可能需要等待9秒。也就是说&#xff0c;如果用户在自己进程的时间片外通过键盘发出调试命令&#xff0c;可能需要等待9秒才能…

如何在AD上创建完整的项目

首先&#xff0c;我们先安装好AD&#xff0c;这里我使用的是AD22&#xff0c;安装过程如下&#xff1a; Altium Designer 22下载安装教程-CSDN博客 Altium Designer 22是全球领先的PCB设计软件之一&#xff0c;为电路板设计师提供了一种集成的解决方案&#xff0c;旨在简化和加…

Python大语言模型实战-记录一次用MetaGPT框架实现爬虫任务的完整过程

1、模型选择&#xff1a;GPT4 2、需求&#xff1a;在win10操作系统环境下&#xff0c;基于python3.10解释器&#xff0c;爬取豆瓣电影Top250的相关信息&#xff0c;包括电影详情链接&#xff0c;图片链接&#xff0c;影片中文名&#xff0c;影片外国名&#xff0c;评分&#x…

回归预测 | MATLAB实现SCN随机配置网络多输入单输出回归预测

回归预测 | MATLAB实现SCN随机配置网络多输入单输出回归预测 目录 回归预测 | MATLAB实现SCN随机配置网络多输入单输出回归预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 Matlab实现SCN随机配置网络多变量回归预测 1.data为数据集&#xff0c;7个输入特征&#xff0…

港口大型设备状态监测及预测性维护策略

在现代港口运营中&#xff0c;大型设备的正常运行对于保障港口作业的高效性至关重要。为了实现设备的可靠性和持续性&#xff0c;港口管理者需要采取一系列状态监测和预测性维护策略。 推进自动化和智能化是提高港口大型设备状态监测和维护管理效率的重要途径。通过应用先进的…

Node使用Nvm安装双版本切换(node两个版本同时用怎么办?不同的项目Node版本要求不一样怎么办?)

先把node.js卸载 开始—>添加删除程序—>node npm -v node -v //检查是否还存在&#xff0c;卸载成功就行了NVM下载 github下载 百度网盘下载 打开安装包以管理员身份安装&#xff0c;要是记得这个路径并且必须全是英文 使用nvm安装两个使用的node版本 cmd以管理员…
最新文章