Java使用elasticjob实现定时任务(v2.1.5)

elastic是一个定时任务库

https://shardingsphere.apache.org/elasticjob/index_zh.html

bff3a57751ea742698917f55e0557d5e.png

项目结构

ba89260324b237010225fa45c99d4290.png

​依赖

        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-core</artifactId>
            <version>2.1.5</version>
        </dependency>

实现simplejob

simplejob是使用最多、最简单的定时任务

任务类

定时任务类需要实现相应的定时任务接口(idea快捷键 ctrl+i)

public class MySimpleJob implements SimpleJob

然后在实现的execute里写定时任务的逻辑

public class MySimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("分片项: " + shardingContext.getShardingItem() +
                ",总分片项数: " + shardingContext.getShardingTotalCount());
    }
}

定时任务配置

新建App.java

public class App {
} 

添加配置信息(都写在App.java里)

1)zookeeper配置信息(zookeeper作为注册中心,elasticjob将服务注册到zookeeper)

zookeeper搭建可以看我的这一篇文章

在windows搭建zookeeper(单机/集群) - 知乎
    /**
     * 注册中心zookeeper
     */
    public static CoordinatorRegistryCenter zkCenter() {
        // 参数1: zk的地址(集群就写多个,中间用逗号隔开),参数2: 命名空间
        var zc =
                new ZookeeperConfiguration("localhost:2181", "java-simple-job");
        var crc = new ZookeeperRegistryCenter(zc);
        // 初始化注册中心
        crc.init();
        return crc;
    }

2)simplejob任务配置

    /**
     * simple-job配置
     *
     * @return
     */
    public static LiteJobConfiguration configurationSimple() {
        // 1,job核心配置
        var jcc = JobCoreConfiguration
                // 参数1: 任务名称,参数2: cron表达式(0/10 -> 10秒执行一次),参数3: 分片项数量
                .newBuilder("mySimpleJob", "0/10 * * * * ?", 2)
                .build();
        // 2,job类型配置
        // 参数1: 核心配置,参数2: 任务类的全类名
        var jtc = new SimpleJobConfiguration(jcc, MySimpleJob.class.getCanonicalName());
        // 3,job根配置 (LiteJobConfiguration)
        return LiteJobConfiguration.newBuilder(jtc)
                // 有这个才能重新布置任务,否则修改不会生效
                .overwrite(true)
                .build();
    }

3)启动定时任务

    public static void main(String[] args) {
        // 启动定时任务
        // 参数1: 注册中心;参数2: 配置
        new JobScheduler(zkCenter(), configurationSimple()).init();
    }

启动

6e23bf6b17513046cfb242a2287ed547.png

因为我们设置的分片数量是2,所以可以启动另一个定时任务,elasticjob会自动分配任务

5ae8c0ea77479358af4513c9115a76eb.png

684c25b9f11c32a15e400d0da0eeb7ca.png

复制运行配置

启动两个任务,可以看到自动分配任务,原本是一个服务执行分片1和0,现在是分别执行单个任务

dbaa405674086c2f1e712ed5dcce613b.png

caa9dfdcb430d467751bcf161e5e8115.png

dataflow任务

dataflow任务适合处理流式作业,和simplejob不同,分为数据抓取和处理,先获取数据然后进行处理

订单类(被处理的类)

public class Order {
    private Integer orderId;
    // 0 未处理; 1 已处理
    private Integer status;

    @Override
    public String toString() {
        return "Order{" +
                "orderId=" + orderId +
                ", status=" + status +
                '}';
    }

    public Integer getOrderId() {
        return orderId;
    }

    public void setOrderId(Integer orderId) {
        this.orderId = orderId;
    }

    public Integer getStatus() {
        return status;
    }

    public void setStatus(Integer status) {
        this.status = status;
    }
}

任务类

实现接口,有两个方法,对应抓取和处理,抓取方法的返回值会交给处理方法

public class MyDataflowJob implements DataflowJob<Order> { 
    // 抓取数据
    @Override
    public List<Order> fetchData(ShardingContext shardingContext) {
       return null;
    }

    // 处理数据
    @Override
    public void processData(ShardingContext shardingContext, List<Order> data) { 
    }
} 

具体逻辑:初始化100个order,然后抓取指定数据(status为0 并且 订单号%分片总数 == 当前分片项)的订单进行处理,返回值交给处理方法,处理方法进行处理(将order的status设置为1)

public class MyDataflowJob implements DataflowJob<Order> {
    private List<Order> orders = new ArrayList<Order>();

    {
        // 实例化该类时执行
        for (int i = 0; i < 100; i++) {
            Order order = new Order();
            order.setOrderId(i + 1);
            // 未处理
            order.setStatus(0);
            orders.add(order);
        }
    }

    // 抓取数据
    @Override
    public List<Order> fetchData(ShardingContext shardingContext) {
        // 将 订单号%分片总数 == 当前分片项 的订单进行处理
        var orderList = orders.stream()
                // 过滤状态为0的
                .filter(o -> o.getStatus() == 0)
                .filter(o -> o.getOrderId() % shardingContext.getShardingTotalCount()
                        == shardingContext.getShardingItem())
                // 放入集合
                .collect(toList());
        List<Order> subList = null;
        if (orderList != null && orderList.size() > 0) {
            // (抓)截取list
            subList = orderList.subList(0, 10);
        }
        try {
            // 休眠3秒
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        LocalTime time = LocalTime.now();
        System.out.println(time + "我是分片项: " + shardingContext.getShardingItem() + ",我抓取的数据是: " + subList);
        return subList;
    }

    // 处理数据
    @Override
    public void processData(ShardingContext shardingContext, List<Order> data) {
        // 设置为已处理,下次不会再抓取到
        data.forEach(o -> o.setStatus(1));
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        LocalTime time = LocalTime.now();
        System.out.println(time + "我是分片项: " + shardingContext.getShardingItem() + ",正在处理数据!");
    }
}

App.java

1)dataflow任务配置

   /**
     * dataflow-job配置
     *
     * @return
     */
    public static LiteJobConfiguration configurationDataflow() {
        // 1,job核心配置
        var jcc = JobCoreConfiguration
                // 参数1: 任务名称,参数2: cron表达式(0/10 -> 10秒执行一次),参数3: 分片项数量
                .newBuilder("myDataflowJob", "0/10 * * * * ?", 2)
                .build();
        // 2,job类型配置
        // 参数1: 核心配置,参数2: 任务类的全类名,参数3: 是否开启定时任务(不开则只执行1次)
        var jtc =
                new DataflowJobConfiguration(jcc, MyDataflowJob.class.getCanonicalName(), true);
        // 3,job根配置 (LiteJobConfiguration)
        return LiteJobConfiguration.newBuilder(jtc)
                // 有这个才能重新布置任务,否则修改不会生效
                .overwrite(true)
                .build();
    }

2)main方法

    public static void main(String[] args) {
        // 启动定时任务
        // 参数1: 注册中心;参数2: 配置
        new JobScheduler(zkCenter(), configurationDataflow()).init();
    }


启动

0363103046d4c3da2737246318e51fde.png

script任务

可以运行脚本文件(cmd、python……)

d盘下新建test.txt,修改内容后重命名为.cmd

%1这些是用来接收elastic传递来的参数的

echo running cmd cript: %1,%2,%3,%4,%5

App.java

1)任务配置

    /**
     * script-job配置
     *
     * @return
     */
    public static LiteJobConfiguration configurationScript() {
        // 1,job核心配置
        var jcc = JobCoreConfiguration
                // 参数1: 任务名称,参数2: cron表达式(0/10 -> 10秒执行一次),参数3: 分片项数量
                .newBuilder("myScriptJob", "0/10 * * * * ?", 2)
                .build();
        // 2,job类型配置
        // 参数1: 核心配置,参数2: 任务脚本所在目录
        var jtc =
                new ScriptJobConfiguration(jcc, "d:/test.cmd");
        // 3,job根配置 (LiteJobConfiguration)
        return LiteJobConfiguration.newBuilder(jtc)
                // 有这个才能重新布置任务,否则修改不会生效
                .overwrite(true)
                .build();
    }

2)main方法

    public static void main(String[] args) {
        // 启动定时任务
        // 参数1: 注册中心;参数2: 配置
        new JobScheduler(zkCenter(), configurationScript()).init();
    }

0c3768fe9861e8e2cb0bfc86fc9b7c97.png

后续更文:springboot整合(2.1.5和3.0.0-alpha)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/10322.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

5.Java循环控制语句

Java循环控制语句 循环是Java中应用最为广泛的一个知识点&#xff0c;所以也是很需要掌握的。所谓循环&#xff0c;即通过判断条件&#xff0c;重复执行一段代码&#xff0c;根据条件的变化&#xff0c;来确定代码是否执行&#xff0c;执行次数。 一、循环结构 1、while循环…

C风格的字符串赋值方式

文章目录&#xff08;1&#xff09;C语言中&#xff0c;没有字符串类型但可以用字符数组模拟字符串。&#xff08;2&#xff09;C语言中&#xff0c;字符串是以’\0’作结尾字符。&#xff08;3&#xff09;C语言中&#xff0c;字符串常量本质上是一个无名的字符数组。C风格的字…

代码自动发布系统

之前是jenkins发现gitlab代码更新了就自动获取直接部署到服务器 现在是jenkins自动获取Code之后打包成镜像上传到仓库然后通知docker去拉取更新的镜像 分析 旧∶ 代码发布环境提前准备&#xff0c;以主机为颗粒度静态 新: 代码发布环境多套&#xff0c;以容器为颗粒度编译 …

适合销售使用的CRM系统特点

销售人员抱怨CRM系统太复杂&#xff0c;这是一个很重要的问题。毕竟&#xff0c;如果系统太难使用&#xff0c;会导致CRM实用率和效率下降&#xff0c;最终影响公司的运作。在这篇文章中&#xff0c;我们来探讨当销售抱怨crm客户系统太复杂了&#xff0c;企业该如何解决。 缺少…

VCS4 debug with DVE

1、重点讲解&#xff1a; 在verilog源代码中嵌入VCD 系统函数&#xff0c;重点如testbench文件中。VCD文件是VCS产生的仿真波形文件&#xff0c;未经压缩&#xff0c;占用空间较大。VCD是压缩后的波形文件。 编译、仿真以生成VCD文件。 在后处理模式中使用激活DVElog对产生的…

NodeJS Cluster模块基础教程

Cluster简介 默认情况下&#xff0c;Node.js不会利用所有的CPU&#xff0c;即使机器有多个CPU。一旦这个进程崩掉&#xff0c;那么整个 web 服务就崩掉了。 应用部署到多核服务器时&#xff0c;为了充分利用多核 CPU 资源一般启动多个 NodeJS 进程提供服务&#xff0c;这时就…

当ChatGPT续写《红楼梦》,能替代原著吗?

来源: 清华大学出版社 近段时间&#xff0c;人工智能聊天机器人ChatGPT火爆网络&#xff0c;“AI写作是否会让文字工作者被替代&#xff1f;”成为人们关注并持续讨论的话题。 闲聊、问答、解题、写代码、写诗、创作小说&#xff0c; 连续回答&#xff0c;不断纠错&#xff0c…

拥抱自动化测试,快速升职加薪丄Selenium+Pytest自动化测试框架教你如何做到

目录&#xff1a;导读 引言 SeleniumPytest自动化测试框架是目前最流行的自动化测试工具之一&#xff0c;其强大的功能和易用性援助许多开发人员和测试人员。 selenium自动化 pytest测试框架禅道实战 选用的测试网址为我电脑本地搭建的禅道 conftest.py更改 config.ini更…

MyBatis配置文件 —— 相关标签详解

目录 一、Mybatis配置文件 — properties标签 二、Mybatis配置文件 — settings标签 三、Mybatis配置文件 — plugins标签 四、Mybatis配置文件 — typeAliases标签 五、Mybatis配置文件 — environments标签 六、Mybatis配置文件 — mappers标签 一、Mybatis配置文件 —…

2023年第十四届蓝桥杯 C++ B组参赛经验总结

没错&#xff0c;今年本菜狗又来啦~~ hhh &#xff0c; 文章当时比赛完就写完了&#xff0c; 发的有点晚 比赛成绩 &#xff08;等出来我就写这里&#xff09; 感觉最多省二 估计没省一了555 赛前准备 赛前把蓝桥杯课基本都刷了 &#xff0c; 但是还是感觉有点慌 刷题经验 …

【网络原理】网络通信与协议

✨个人主页&#xff1a;bit me&#x1f447; ✨当前专栏&#xff1a;Java EE初阶&#x1f447; 目 录一. 网络发展史二. 网络通信基础1. IP地址2. 端口号3. 认识协议&#xff08;核心概念&#xff09;4. 五元组5. 协议分层6. 封装和分用一. 网络发展史 独立模式&#xff1a;计…

springboot从2.1.3升级到2.3.5后控制台自动输出http请求日志RequestResponseBodyMethodProcessor

springboot从2.1.3升级到2.3.5后控制台自动输出http请求日志RequestResponseBodyMethodProcessor和RequestMappingHandlerMapping推荐使用第二个方案简单 明了 方便 快捷方案一第一步定义TurboFilter第二步配置logback方案二 直接配置logback的配置XML推荐使用第二个方案简单 明…

【三十天精通 Vue 3】 第四天 Vue 3的模板语法详解

✅创作者&#xff1a;陈书予 &#x1f389;个人主页&#xff1a;陈书予的个人主页 &#x1f341;陈书予的个人社区&#xff0c;欢迎你的加入: 陈书予的社区 &#x1f31f;专栏地址: 三十天精通 Vue 3 文章目录引言一、Vue 3 模板语法概述1. Vue 3 模板语法的简介2. Vue 3 模板…

Openlayers(五)点位聚合Cluster

Openlayers&#xff08;五&#xff09;点位聚合Cluster 1.业务问题 由于点位在地图上显示过多&#xff0c;会造成页面卡顿、点位标注信息相互叠加导致看不清 优化后效果 不断放大层级 2.聚合类Cluster OpenLayers 中聚合是通过 ol.source.Cluster 实现&#xff0c;聚合的原…

Flink的窗口机制

窗口机制 tumble&#xff08;滚动窗口&#xff09; hop&#xff08;滑动窗口&#xff09; session&#xff08;会话窗口&#xff09; cumulate&#xff08;渐进式窗口&#xff09; Over&#xff08;聚合窗口&#xff09; 滚动窗口&#xff08;tumble&#xff09; 概念 滚…

系统复杂度之【高性能】

系统复杂度之【高性能】 今天我们来谈一谈系统复杂度的根源之【高性能】 对性能的不懈追求一直是人类科技持续发展的核心动力。例如计算机&#xff0c;从电子管计算机到晶体管计算机&#xff0c;再到集成电路计算机&#xff0c;运算性能从每秒几次提高到每秒几亿次。然而&#…

VUE_学习笔记

一、 xx 二、模板语法 1.模板语法之差值语法 &#xff1a;{{ }} 主要研究&#xff1a;{{ 这里可以写什么}} 在data中声明的变量、函数等都可以。常量只要是合法的javascript表达式&#xff0c;都可以。模板表达式都被放在沙盒中&#xff0c;只能访问全局变量的一个白名单&a…

【微服务笔记14】微服务组件之Config配置中心高可用环境搭建

这篇文章&#xff0c;主要介绍微服务组件之Config配置中心高可用环境搭建。 目录 一、高可用Config配置中心 1.1、高可用配置中心介绍 1.2、搭建Eureka注册中心 1.3、搭建ConfigServer服务端 &#xff08;1&#xff09;引入依赖 &#xff08;2&#xff09;添加配置文件 …

Jetson nano部署剪枝YOLOv8

目录前言一、YOLOv8模型剪枝训练1. Pretrain[option]1.1 项目的克隆1.2 数据集1.3 训练2. Constraint training3. Prune4. finetune二、YOLOv8模型剪枝部署1. 源码下载2. 环境配置2.1 trtexec环境变量设置3. ONNX导出3.1 Transpose节点的添加3.2 Resize节点解析的问题4. 运行4.…

FIFO的工作原理及其设计

1.简介 FIFO( First Input First Output)简单说就是指先进先出。FIFO存储器是一个先入先出的双口缓冲器&#xff0c;即第一个进入其内的数据第一个被移出&#xff0c;其中一个口是存储器的输入口&#xff0c;另一个口是存储器的输出口。 对于单片FIFO来说&#xff0c;主要有两种…