读源码系列文章--开源项目openjob之alarm告警模块

一、背景

告警模块,作为大多数应用都存在的一个基础功能,今天我们就以开源项目openjob 为例,分析其设计及实现。

首先,我们梳理一下需求:

  • 支持多种告警方式,包括钉钉、飞书、微信和webhook。
  • 方便业务模块的接入,这里采用本地事件驱动的方式来解耦模块间的依赖。

本文将针对这两个问题,先画出多个告警的类及接口设计,再讲述本地事件驱动,最后是异步消费队列中的任务。

二、接口及类的设计

在这里插入图片描述

在这里插入图片描述
源码内容就不在这里赘述了。

四个实现类的差异在于方法send()和channel(),大多数公共的实现在抽象类AbstractChannel中。

三、模块架构图

在这里插入图片描述
我们可以看到,通过本地事件驱动机制,告警模块和其他业务模块做到了解耦。

事件监听者,订阅事件,转换为任务存入到LinkedBlockingQueue队列中。

同时,启动两个线程池(pullExecutor线程池负责拉取队列中的任务,consumerExecutor线程池负责执行任务,也即告警)

在这里插入图片描述
还有两个基础类 io.openjob.common.task.BaseConsumer和io.openjob.common.task.TaskQueue。
在这里插入图片描述
TaskQueue是对LinkedBlockingQueue的一个简单封装,入队在本地事件监听者,出队则在下一个类。
抽象类BaseConsumer包括两个线程池:pullExecutor和consumerExecutor。

  • 线程池pullExecutor的作用是:读取TaskQueue中的任务保存至第二个线程池consumerExecutor里
  • 线程池consumerExecutor的作用是:异步执行任务,调用AlarmService.alarm()。

四、本地事件驱动机制

1、定义事件AlarmEvent

package io.openjob.server.alarm.event;

import io.openjob.server.alarm.dto.AlarmEventDTO;
import org.springframework.context.ApplicationEvent;

/**
 * @author stelin swoft@qq.com
 * @since 1.0.6
 */
public class AlarmEvent extends ApplicationEvent {
    public AlarmEvent(AlarmEventDTO alarmEventDTO) {
        super(alarmEventDTO);
    }
}

2、定义事件的发布者AlarmEventPublisher

package io.openjob.server.alarm.event;

import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;

/**
 * @author stelin swoft@qq.com
 * @since 1.0.6
 */
@Component
public class AlarmEventPublisher implements ApplicationEventPublisherAware {

    private static ApplicationEventPublisher applicationEventPublisher;

    /**
     * Publish event
     *
     * @param applicationEvent applicationEvent
     */
    public static void publishEvent(ApplicationEvent applicationEvent) {
        applicationEventPublisher.publishEvent(applicationEvent);
    }

    @Override
    public void setApplicationEventPublisher(@NonNull ApplicationEventPublisher applicationEventPublisher) {
        AlarmEventPublisher.applicationEventPublisher = applicationEventPublisher;
    }
}

业务模块发布事件:

AlarmEventPublisher.publishEvent(new AlarmEvent(alarmEventDTO));

3、事件监听者AlarmEventListener(重点)

类初始化的时候,初始化任务队列TaskQueue,然后启动两个线程池。
作为事件监听者,使用注解EventListener,把事件内容保存至任务队列TaskQueue。

package io.openjob.server.alarm.event;

import io.openjob.common.task.TaskQueue;
import io.openjob.server.alarm.dto.AlarmEventDTO;
import io.openjob.server.alarm.task.EventTaskConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

/**
 * @author stelin swoft@qq.com
 * @since 1.0.6
 */
@Slf4j
@Component
public class AlarmEventListener {
    private final TaskQueue<AlarmEventDTO> queue;

    @Autowired
    public AlarmEventListener() {
        queue = new TaskQueue<>(0L, 1024);
        EventTaskConsumer consumer = new EventTaskConsumer(
                0L,
                1,
                4,
                "Openjob-heartbeat-executor",
                1024,
                "Openjob-heartbeat-consumer",
                queue
        );
        consumer.start();
    }

    /**
     * Alarm listener
     *
     * @param alarmEvent alarmEvent
     */
    @EventListener
    public void alarmListener(AlarmEvent alarmEvent) {
        try {
            AlarmEventDTO event = (AlarmEventDTO) alarmEvent.getSource();
            // 取得事件的内容,放入任务队列
            queue.submit(event);
        } catch (Throwable throwable) {
            log.error("Alarm event add failed!", throwable);
        }
    }
}

  • start()方法,包括两个线程池:consumerExecutor和pullExecutor,
    consumerExecutor这里只有初始化,并没有放入任务,待类EventTaskConsumer的consume()实现。pullExecutor详见下文。
		consumerExecutor = new ThreadPoolExecutor(
                this.consumerCoreThreadNum,
                this.consumerMaxThreadNum,
                30,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(10240),
                new ThreadFactory() {
                    private final AtomicInteger index = new AtomicInteger(1);

                    @Override
                    public Thread newThread(@Nonnull Runnable r) {
                        return new Thread(r, String.format("%s-%d-%d", consumerThreadName, id, index.getAndIncrement()));
                    }
                },
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
        consumerExecutor.allowCoreThreadTimeOut(true);

4、生产线程池pullExecutor

  • start()方法,每次从队列中拉取一定数量的任务
		this.pullExecutor = new ThreadPoolExecutor(
                1,
                1,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<>(1), r -> new Thread(r, "pull"));

        this.pullExecutor.submit(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    List<T> tasks = this.pollTasks();
                    if (tasks.size() < this.pollSize) {
                        if (tasks.isEmpty()) {
                            Thread.sleep(this.pollIdleTime);
                            continue;
                        }
                        Thread.sleep(this.pollSleepTime);
                    }
                }
            } catch (Throwable ex) {
                log.warn("Task consumer failed! message={}", ex.getMessage());
            }
        });
  • pollTasks(),每次拉取一定量的任务,转放入消费线程池(消费逻辑不一)
    private synchronized List<T> pollTasks() {
        // 每次拉取一定量的任务
        List<T> tasks = queues.poll(this.pollSize);
        if (!tasks.isEmpty()) {
            this.activePollNum.incrementAndGet();
            // 放入消费线程池,异步执行任务
            this.consume(id, tasks);
        }
        return tasks;
    }

其中consume()是一个抽象方法,交由具体类去实现。见下文类EventTaskConsumer

public abstract void consume(Long id, List<T> tasks);

5、消费线程EventTaskConsumer

package io.openjob.server.alarm.task;

import io.openjob.server.alarm.dto.AlarmEventDTO;
import io.openjob.server.alarm.service.AlarmService;
import io.openjob.common.OpenjobSpringContext;
import io.openjob.common.task.BaseConsumer;
import io.openjob.common.task.TaskQueue;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

/**
 * @author stelin swoft@qq.com
 * @since 1.0.6
 */
@Slf4j
public class EventTaskConsumer extends BaseConsumer<AlarmEventDTO> {
    public EventTaskConsumer(Long id,
                             Integer consumerCoreThreadNum,
                             Integer consumerMaxThreadNum,
                             String consumerThreadName,
                             Integer pollSize,
                             String pollThreadName, TaskQueue<AlarmEventDTO> queues) {
        super(id, consumerCoreThreadNum, consumerMaxThreadNum, consumerThreadName, pollSize, pollThreadName, queues, 5000L, 5000L);
    }

    @Override
    public void consume(Long id, List<AlarmEventDTO> tasks) {
         // 异步执行任务
        this.consumerExecutor.submit(new EventTaskRunnable(tasks));
    }

    private static class EventTaskRunnable implements Runnable {
        private final List<AlarmEventDTO> tasks;

        private EventTaskRunnable(List<AlarmEventDTO> tasks) {
            this.tasks = tasks;
        }

        @Override
        public void run() {
            try {
                // 执行告警,详细实现见后
                OpenjobSpringContext.getBean(AlarmService.class).alarm(this.tasks);
            } catch (Throwable throwable) {
                log.error("Alarm event consume failed!", throwable);
            }
        }
    }
}

五、执行告警任务

前文我们讲到告警的策略有多种,具体采用哪种策略,是由任务决定。
所以,首先保存策略对应的实现,再取得任务的属性后,反查其实现类,最后执行调用。

  • 保存策略对应的告警实现
@Service
public class AlarmService {
    private final AlertRuleDAO alertRuleDAO;
    private final DelayDAO delayDAO;
    private final JobDAO jobDAO;
    private final AppDAO appDAO;
    private final Map<String, AlarmChannel> channelMap = new HashMap<>();

    @Autowired
    public AlarmService(List<AlarmChannel> channels, AlertRuleDAO alertRuleDAO, DelayDAO delayDAO, JobDAO jobDAO, AppDAO appDAO) {
        this.alertRuleDAO = alertRuleDAO;
        this.delayDAO = delayDAO;
        this.jobDAO = jobDAO;
        this.appDAO = appDAO;
        channels.forEach(c -> channelMap.put(c.channel().getType(), c));
    }

注入接口AlarmChannel的所有实现类(这种写法的好处是不需要枚举),在类实例化的时候,遍历所有的实现类,保存至Map集合。

private AlarmChannel getChannel(String alertMethod) {
        return Optional.ofNullable(this.channelMap.get(alertMethod))
                .orElseThrow(() -> new RuntimeException("Alarm method not supported! method=" + alertMethod));
    }

根据任务的属性,反查接口的实现类。

AlarmChannel channel = this.getChannel(r.getMethod());
channel.send(alarmDTO);

最后调用实现类的send()方法,这样就实现了告警的灵活配置。

六、总结

当遇到一个接口多种实现的时候,利用jdk的多态性和抽象类,源码实现可以看到设计模式中的策略模式与工厂模式。

通过本文,使用事件驱动机制,降低模块之间的耦合。

顺便说一下,openjob是支持延迟任务的,不过它的实现比较复杂,并没有采用常见的开源方案。

本文就告警模块的源码给出了一个梳理与分析,希望可以帮助到你。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/598757.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C++实现二叉搜索树(模型)

目录 1.二叉搜索树的概念 2.二叉搜索树的实现 2.1总体代码预览 2.2各个函数实现原理 链表结构体 二叉搜索树的成员变量 二叉搜索树的插入 二叉搜索树的查找 二叉搜索树的遍历 二叉搜索树的删除 1.二叉搜索树的概念 二叉搜索树又称二叉排序树&#xff0c;它或者是一棵空树&#…

CSS中文本样式(详解网页文本样式)

目录 一、Text介绍 1.概念 2.特点 3.用法 4.应用 二、Text语法 1.文本格式 2.文本颜色 3.文本的对齐方式 4.文本修饰 5.文本转换 6.文本缩进 7.color&#xff1a;设置文本颜色。 8.font-family&#xff1a;设置字体系列。 9.font-size&#xff1a;设置字体大小。…

做好源代码防泄密的10条准则

#深度好文计划# 近年来&#xff0c;电脑以及互联网应用在中国的普及和发展&#xff0c;已经深入到社会每个角落&#xff0c; 政府&#xff0c;经济&#xff0c;军事&#xff0c;社会&#xff0c;文化和人们生活等各方面都越来越依赖于电脑和网络。企业需要花费大量的时间精力去…

PC小程序解密及反编译

一、小程序包解密 小程序原始加密包位置C:\Users\administrator\Documents\WeChat Files\Applet\wx234324324324 二、wxappUnpacker反编译 npm install./bingo D:\temp\小程序包解密\wxpack\wx234324324324.wxapkg 三、查看反编译后的文件

Fluence Developer Rewards 国内 每个账号收2000元

# 国内有金主支持 每个账号收2000元 Fluence Developer Rewards account_line 白名单见附件 # 感兴趣的请留言 或加微信 Fluence Developer Rewards This repo allows one to generate a proof signature for Fluence dev reward claiming. 感兴趣 Caution Beware of s…

MapReduce的Shuffle过程

Shuffle是指从 Map 产生输出开始,包括系统执行排序以及传送Map输出到Reduce作为输入的过程. Shuffle 阶段可以分为 Map 端的 Shuffle 阶段和 Reduce 端的 Shuffle 阶段. Shuffle 阶段的工作过程,如图所示: Map 端的 Shuffle 阶段 1&#xff09;每个输入分片会让一个 Map 任务…

STM32F407VET6 学习笔记1:GPIO引脚认识分类与开发板原理图

今日学习STM32F407VET6 &#xff0c;首先从基本原理图、引脚方面开始做个初步理解并整理&#xff1a; 这里使用的学习开发板是在嘉立创购买的 立创梁山派天空星&#xff0c;芯片是 STM32F407VET6 主要对这个芯片的引脚做一些归纳认识、对开发学习板原理图设计进行认识理解:最…

上传文件至linux服务器失败

目录 前言异常排查使用df -h命令查看磁盘使用情况使用du -h --max-depth1命令查找占用空间最大的文件夹 原因解决补充&#xff1a;删除文件后&#xff0c;磁盘空间无法得到释放 前言 使用XFTP工具上传文件至CentOS服务器失败 异常 排查 使用df -h命令查看磁盘使用情况 发现磁盘…

IDEA中git的常用操作(保姆级教学)

IDEA中git的常用操作&#xff08;保姆级教学&#xff09; 以下是git的工作原理&#xff0c;觉得繁琐的可以跳过不看 Workspace&#xff1a;工作区 (平时存放代码的地方) Index / Stage&#xff1a;暂存区&#xff08;用于临时存放存放你的改动&#xff0c;事实上就是一个文件&…

电脑实时监控软件分享|好用实时屏幕监控软件有哪些?

在当今数字化工作环境和远程办公日益普及的背景下&#xff0c;电脑实时监控软件成为了企业管理、教育监控、家庭监护等多个领域的必备工具。这些软件不仅能够帮助管理者实时了解员工的工作状态&#xff0c;确保工作效率&#xff0c;还能有效防止数据泄露&#xff0c;保护企业或…

现场面试题

这里写目录标题 1.sql1.1 只保留学生的最新成绩1.2 统计通话号码数1.3 更新地址 2.基础题2.1 请求序列第N位的值: 0, 1, 1, 2, ,3, 5, 8, 13, 21, 34.....第N位的值2.2 请写一段java代码&#xff0c;输出存在重复字母的单词 1.sql 1.1 只保留学生的最新成绩 表student中记录学…

Vue-组件中的data

一个组件的data选项必须是一个函数。保证每个组件实例&#xff0c;维护独立的一份数据对象。如下图&#xff1a; 组件一旦封装好了&#xff0c;可以使用多次&#xff0c;比如数字框组件使用了三次&#xff1a; 每次创建新的组件实例&#xff0c;都会重新执行一次data函数&#…

leetcode-字符串的排列-100

题目要求 思路 1.因为只涉及到字符&#xff0c;因此可以进行排序 2.创建临时字符串&#xff0c;当临时字符串temp的长度等于str的长度&#xff0c;作为判出条件。 3.创建一个标记的数组&#xff0c;每次在temp中插入一个字符&#xff0c;便在对应的数组下标设置为1&#xff0c…

【PDF技巧】PDF限制编辑密码忘记了,如何编辑文件?

PDF文件打开之后&#xff0c;发现编辑功能都是灰色的&#xff0c;无法使用&#xff0c;无法编辑PDF文件&#xff0c;遇到这种情况&#xff0c;是因为PDF文件设置了限制编辑导致的。一般情况下&#xff0c;我们只需要输入PDF密码&#xff0c;将限制编辑取消就可以正常编辑文件了…

【ARM Cortex-M3指南】8:中断行为

文章目录 八、中断行为8.1 中断/异常流程8.1.1 压栈8.1.2 取向量8.1.3 寄存器更新 8.2 异常退出8.3 嵌套中断8.4 末尾连锁中断8.5 延迟到达8.6 进一步了解异常返回值8.7 中断等待8.8 中断相关的错误8.8.1 压栈8.8.2 出栈8.8.3 取向量8.8.4 非法返回 八、中断行为 8.1 中断/异常…

GPU术语

可向量化循环 可向量化循环通常是指在编程中&#xff0c;能够被转换为向量操作或矩阵运算的循环结构。 在传统编程中&#xff0c;对于数组或向量中的每个元素执行相同的操作时&#xff0c;开发者可能会使用for循环逐一进行处理。然而&#xff0c;许多现代编程语言和库提供了向…

从哪些方面可以看出现货黄金价格走势?

现货黄金价格的走势受到多种因素的影响&#xff0c;我们可以从宏观经济环境、货币政策、供需关系、市场情绪和技术分析几个主要方面来观察和分析这一贵金属的价格动态。现货黄金作为全球投资市场中的避险资产&#xff0c;其价格波动往往能体现出复杂的经济和政治变化。 宏观经济…

(论文阅读-优化器)Orca: A Modular Query Optimizer Architecture for Big Data

目录 摘要 一、简介 二、背景知识 2.1 大规模并行处理 2.2 SQL on Hadoop 三、Orca架构 四、查询优化 4.1 优化工作流 4.2 并行查询优化 五、Metadata Exchange 六、可行性 6.1 Minimal Repros 6.2 优化器准确性测试 七、实验 八、相关工作 8.1 查询优化基础 8…

Python专题:二、Python小游戏,体验Python的魅力

希望先通过一个小的游戏让大家先对Python感兴趣&#xff0c;兴趣是最好的老师。 小游戏的运行结果&#xff1a; 1、在sublime编辑器里面写如下代码&#xff1a; import randomnum random.randint(1, 100) # 获得一个随机数 is_done False # 是否猜中的标记 count 0 # 玩…

视频号小店怎么做?品从哪里来?如何推广售卖?一文详解

大家好&#xff0c;我是电商笨笨熊 视频号小店作为一个刚推出不久的项目&#xff0c;可谓是站在风口&#xff0c;遍地红利&#xff0c;也正是我们进入的最佳时机。 但是面对一个新的项目&#xff0c;自是存在着多种疑问&#xff0c;尤其是对于一些从未踏足电商市场的新手玩家…
最新文章