Luigi任务调度框架学习2:运行每一个Task,避免因判定完成导致跳过执行Task主程序

在上一篇Luigi的线性调度文章中(Luigi任务调度框架学习1:线性调用流程),我们知道Task运行的时候:

每个任务是否完成有两次判定,即:进行判定(未完成) =》运行def run(self)函数 =》进行判定(完成) =》运行后续的Task;如果第一次判定就完成,则不会执行当前Task的def run(self)函数

但是Luigi支持的判定条件只有文件与SQL,在有些情况下(尤其是定时任务),我们希望它梳理并运行整个拓扑,而不是根据判定去决定是否运行,因此本文来解决这个问题,让我们能够直接运行Task

文章目录

  • 解决方法
  • 问题解析
    • 1. Luigi多Task多次运行效果
    • 2. 无限调用Task示例程序

解决方法

博主写了一个工具类:

import luigi
import os


class NeverStopLuigi(luigi.Task):
    """无限执行Task的工具类"""
    never_stop = luigi.BoolParameter(True)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._out_put_file = self.__class__.__name__  # 用当前类名,作为无限重启确认的文件名
        if self.never_stop:
            outputs = luigi.task.flatten(self.output())
            for out in outputs:
                if isinstance(out, luigi.LocalTarget) and out.exists():
                    os.remove(self.output().path)

    def run(self):
        with open(self._out_put_file, "w") as f:
            f.write(".")

    def output(self):
        return luigi.LocalTarget(self._out_put_file)

之后程序只需要继承NeverStopLuigi类后覆写两个函数(def requires(self)def run(self))即可运行:

class MyTask(NeverStopLuigi):

    def requires(self):  # 1. 用于定义前置程序
        print(f"运行MyTask的 requires程序")
        return []

    def run(self):  # 2. 这个Task运行的主程序
        super(MyTask, self).run() # 调用NeverStopLuigi的run函数
        print(f"运行MyTask的 ======= 主程序 ======")

在这个工具类中,主要内容有:

  1. 依靠初始化Task时删除掉检测的文件,来实现首次检测output不通过
  2. 调用run函数先写文件,实现二次检测output通过

至于用于检测的文件名,用最不容易重名的类名self.__class__.__name__来作为中间的文件名,这里是为了文件能够循环执行,而不用uuid这种随意生成的文件,最后无法删除(其实是可以在最后一个Task通过获取前面的所有output删除的,但没有直接继承的写法简洁明了)

问题解析

1. Luigi多Task多次运行效果

import luigi


class PreClass(luigi.Task):
    out_file = "pre_class.text"

    def requires(self):  # 所需的前置函数,也可以不写,默认为空
        print(f"运行PreClass的 requires程序")
        return []

    def output(self):  # 完成的依据
        print(f"运行PreClass的 output程序")
        return luigi.LocalTarget(self.out_file)  # 判断这一部分是否完成会去查看是否有out_file文件(pre_class.text)

    def run(self):  # 这一部分执行的主函数
        print(f"运行PreClass的 ======= 主程序 ======")
        with open(self.out_file, "a") as file:
            file.write("啊哈哈哈")


class Run1Class(luigi.Task):
    out_file = "run_1_class.text"

    # 这里不覆写 output是默认空
    def requires(self):  # 所需的前置函数
        print(f"运行Run1Class的 requires程序")
        return [PreClass()]

    def output(self):  # 完成的依据
        print(f"运行Run1Class的 output程序")
        return luigi.LocalTarget(self.out_file)  # 判断这一部分是否完成会去查看是否有out_file文件(pre_class.text)

    def run(self):  # 这一部分执行的主函数
        print(f"运行Run1Class ======= 主程序 ======")
        with open(self.out_file, "a") as file:
            file.write("啊哈哈哈")


class Run2Class(luigi.Task):
    out_file = "run_2_class.text"

    # 这里不覆写 output是默认空
    def requires(self):  # 所需的前置函数
        print(f"运行Run2Class的 requires程序")
        return [Run1Class()]

    def run(self):  # 这一部分执行的主函数
        print(f"运行Run2Class ======= 主程序 ======")
        with open(self.out_file, "a") as file:
            file.write("啊哈哈哈")


if __name__ == '__main__':
    luigi.build([Run2Class()], local_scheduler=True)  # 只写入最后输入的数据即可

最后的输出效果如下:

===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 3 ran successfully:
    - 1 PreClass()
    - 1 Run1Class()
    - 1 Run2Class()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

运行Run2Class的 requires程序
运行Run1Class的 output程序
运行Run1Class的 requires程序
运行PreClass的 output程序
运行PreClass的 requires程序
运行PreClass的 requires程序
运行PreClass的 ======= 主程序 ======
运行Run1Class的 requires程序
运行PreClass的 output程序
运行Run1Class ======= 主程序 ======
运行Run2Class的 requires程序
运行Run1Class的 output程序
运行Run2Class ======= 主程序 ======

调度顺序如上所示

2. 无限调用Task示例程序

import luigi
import os


class NeverStopLuigi(luigi.Task):
    """工具类"""
    never_stop = luigi.BoolParameter(True)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._out_put_file = self.__class__.__name__  # 用当前类名,作为无限重启确认的文件名
        if self.never_stop:
            outputs = luigi.task.flatten(self.output())
            for out in outputs:
                if isinstance(out, luigi.LocalTarget) and out.exists():
                    os.remove(self.output().path)

    def run(self):
        with open(self._out_put_file, "w") as f:
            f.write(".")

    def output(self):
        return luigi.LocalTarget(self._out_put_file)


class PreClass(NeverStopLuigi):

    def requires(self):  # 所需的前置函数,也可以不写,默认为空
        print(f"运行PreClass的 requires程序")
        return []

    def run(self):  # 这一部分执行的主函数
        super(PreClass, self).run()
        print(f"运行PreClass的 ======= 主程序 ======")


class Run1Class(NeverStopLuigi):

    def requires(self):  # 所需的前置函数
        print(f"运行Run1Class的 requires程序")
        return [PreClass()]

    def run(self):  # 这一部分执行的主函数
        super(Run1Class, self).run()
        print(f"运行Run1Class ======= 主程序 ======")


class Run2Class(NeverStopLuigi):

    def requires(self):  # 所需的前置函数
        print(f"运行Run2Class的 requires程序")
        return [Run1Class()]

    def run(self):  # 这一部分执行的主函数
        super(Run2Class, self).run()
        print(f"运行Run2Class ======= 主程序 ======")


if __name__ == '__main__':
    luigi.build([Run2Class()], local_scheduler=True)  # 只写入最后输入的数据即可

如上程序,不论运行多少次都会得到:

运行Run2Class的 requires程序
运行Run1Class的 requires程序
运行PreClass的 requires程序
运行PreClass的 requires程序
运行PreClass的 ======= 主程序 ======
运行Run1Class的 requires程序
运行Run1Class ======= 主程序 ======
运行Run2Class的 requires程序
运行Run2Class ======= 主程序 ======

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/499144.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

vue实现文字一个字一个字的显示(开箱即用)

图示&#xff1a; 核心代码 Vue.prototype.$showHtml function (str, haveCallback null) {let timeFlag let abcStr for (let i 0; i < str.length; i) {(function (i) {timeFlag setTimeout(function () {abcStr str[i]haveCallback(abcStr)if ((i 1) str.length…

SI24R2E:智能电子学生卡2.4GHz考勤方案

今年年初教育部发布的《关于加强中小学生手机管理工作的通知》中提出&#xff0c;学生手机有限带入校园&#xff0c;原则上不得将个人手机带入校园&#xff0c;禁止带入课堂&#xff1b;应设立校内公共电话、建立班主任沟通热线、探索使用具备通话功能的电子学生证或提供其他家…

List操作add,clear,addall报错UnsupportedOperationException的解决办法

ArrayList和Arrays.ArrayList是两码事 ArrayList 支持 add&#xff0c;clear&#xff0c;addall Arrays.ArrayList不支持add&#xff0c;clear&#xff0c;addall 这个方法的使用时候&#xff0c;传递的数组必须是对象数组&#xff0c;而不是基本数据类型 JDK源码 /** *返回由…

LLM之RAG实战(三十六)| 使用LangChain实现多模态RAG

我们之前介绍的RAG&#xff0c;更多的是使用输入text来查询相关文档。在某些情况下&#xff0c;信息可以出现在图像或者表格中&#xff0c;然而&#xff0c;之前的RAG则无法检测到其中的内容。 针对上述情况&#xff0c;我们可以使用多模态大模型来解决&#xff0c;比如GPT-4-V…

【微服务】Sentinel(流量控制)

文章目录 1.基本介绍1.Sentinel是什么2.Sentinel主要特性3.Sentinel核心功能1.流量控制2.熔断降级3.消息削峰填谷 4.Sentinel两个组成部分 2.Sentinel控制台显示1.需求分析2.下载3.运行1.进入cmd2.输入java -jar sentinel-dashboard-1.8.0.jar3.查看默认端口8080 4.访问1.账号和…

数据结构课设-基于Python的校园导航系统(附源码)

一月份的数据结构课设完成后&#xff0c;我对Python的了解也更加深刻。现将课设报告及源码开源&#xff0c;不足之处希望大家指正。源码我放在博客主页的资源中&#xff0c;需要的话大家自行下载&#xff08;用户信息保存在 users.json 文件中&#xff0c;地图信息保存在 campu…

GooglePlay无法下载应用问题

问题如下 解决方法 1、实际上是因为google尚未添加apk downloader扩展程序 2、添加该扩展程序后&#xff0c;在应用中搜索应用名即可 欧克&#xff01;下载完成

IDEA设置内存大小不生效

IDEA设置内存大小不生效 100%可行的方法 -Xms512m -Xmx4096m1.首先要找对idea加载的是哪个配置文件。 2.找到idea启动文件夹&#xff0c;编辑idea.bat 添加打印修改文件路径的代码&#xff0c;运行idea.bat打印一下你的配置文件路径&#xff0c;找到路径 修改 然后运行idea.…

机器学习作业二之KNN算法

KNN&#xff08;K- Nearest Neighbor&#xff09;法即K最邻近法&#xff0c;最初由 Cover和Hart于1968年提出&#xff0c;是一个理论上比较成熟的方法&#xff0c;也是最简单的机器学习算法之一。该方法的思路非常简单直观&#xff1a;如果一个样本在特征空间中的K个最相似&…

vs2022 关于Python项目无法识别中文的解决方法

这是针对于vs2022安装和使用教程&#xff08;详细&#xff09;-CSDN博客 Python项目无法识别中文的解决方法的文章 一、问题 1.输入代码 print("你好Hello world&#xff01;") 2.启动&#xff0c;发现代码里有中文报错 二、解决方法 1.选择菜单栏里的工具->…

超实用的Maven指南

文章目录 实战记录&#x1f4dd;Maven 指令大全 &#x1f31f;找到没有被使用的jar&#xff08;analyze&#xff09;分析jar是被哪个maven引入&#xff08;tree&#xff09;&#x1f31f; dependencies&#xff08;Maven依赖&#xff09;build-resources&#xff08;资源导入&a…

如何提高知识库系统管理水平?

我们都有过这样的经历--遇到问题或紧急请求时&#xff0c;第一时间就是向知识库系统寻求帮助。很多时候&#xff0c;当你翻遍了无穷无尽的文档&#xff0c;却发现没有任何东西能够摆脱此时的困境&#xff0c;这时&#xff0c;向服务台提交工单成了不可避免的解决方式&#xff0…

基于Java的新生入学报到管理系统的设计与实现(论文+源码+PPT)_kaic

摘 要 21世纪的今天&#xff0c;随着社会的不断发展与进步&#xff0c;人们对于信息科学化的认识&#xff0c;已由低层次向高层次发展&#xff0c;由原来的感性认识向理性认识提高&#xff0c;管理工作的重要性已逐渐被人们所认识&#xff0c;科学化的管理&#xff0c;使信息…

2024年2月游戏手柄线上电商(京东天猫淘宝)综合热销排行榜

鲸参谋监测的线上电商&#xff08;京东天猫淘宝&#xff09;游戏手柄品牌销售数据已出炉&#xff01;2月游戏手柄销售数据呈现出强劲的增长势头。 根据鲸参谋数据显示&#xff0c;今年2月游戏手柄月销售量累计约43万件&#xff0c;同比去年上涨了78%&#xff1b;销售额累计达1…

Stable Diffusion 模型分享:SDXL Unstable Diffusers ☛ YamerMIX(混合风格)

本文收录于《AI绘画从入门到精通》专栏,专栏总目录:点这里,订阅后可阅读专栏内所有文章。 文章目录 模型介绍生成案例案例一案例二案例三案例四案例五案例六案例七案例八下载地址模型介绍

每日一题 --- 快乐数[力扣][Go]

快乐数 题目&#xff1a;202. 快乐数 编写一个算法来判断一个数 n 是不是快乐数。 「快乐数」 定义为&#xff1a; 对于一个正整数&#xff0c;每一次将该数替换为它每个位置上的数字的平方和。然后重复这个过程直到这个数变为 1&#xff0c;也可能是 无限循环 但始终变不到…

Spring用到了哪些设计模式?

目录 Spring 框架中⽤到了哪些设计模式&#xff1f;工厂模式单例模式1.饿汉式&#xff0c;线程安全2.懒汉式&#xff0c;线程不安全3.懒汉式&#xff0c;线程安全4.双重检查锁&#xff08;DCL&#xff0c; 即 double-checked locking&#xff09;5.静态内部类6.枚举单例 代理模…

AI 文字转语音工具以及它们的官网收集(值得收藏)

目前比较成熟的 AI 文字转语音工具以及它们的官网&#xff1a; 百度语音合成 (https://ai.baidu.com/tech/speech/tts)&#xff1a; 百度语音合成是百度 AI 推出的语音合成服务&#xff0c;支持多种语言和音色&#xff0c;可以用于语音播报、智能客服、有声阅读等场景。 阿里云…

使用Kaggle API快速下载Kaggle数据集

前言 在使用Kaggle网站下载数据集时&#xff0c;直接在网页上点击下载可能会很慢&#xff0c;甚至会出现下载失败的情况。本文将介绍如何使用Kaggle API快速下载数据集。 具体步骤 安装Kaggle API包 在终端中输入以下命令来安装Kaggle API相关的包&#xff1a; pip install…

对 CSS 工程化的理解

CSS 工程化是为了解决以下问题&#xff1a; 宏观设计&#xff1a;CSS 代码如何组织、如何拆分、模块结构怎样设计&#xff1f;编码优化&#xff1a;怎样写出更好的 CSS&#xff1f;构建&#xff1a;如何处理我的 CSS&#xff0c;才能让它的打包结果最优&#xff1f;可维护性&a…