[自研开源] MyData 数据集成之数据过滤 v0.7.2

开源地址:gitee | github
详细介绍:MyData 基于 Web API 的数据集成平台
部署文档:用 Docker 部署 MyData
使用手册:MyData 使用手册
试用体验:https://demo.mydata.work
交流Q群:430089673

概述

本篇基于 数据集成之任务流程 介绍任务执行时的数据过滤的使用场景和配置操作。

使用场景

业务系统与mydata集成时,核心是数据的来和去,在这两个方向上分别实现:数据预清洗数据权限控制

  1. 数据预清洗,从api获取数据后 过滤排除掉“脏”数据,然后再入库用于数据集成;

    在这里插入图片描述

    例如:接口返回的某字段值不能为空、字段值长度在指定范围等;

    以下代码是 提供数据 类型的任务执行过程:

    // 提供数据
    case MdConstant.DATA_PRODUCER:
        // 调用api 获取json
        String json = ApiUtil.read(taskInfo);
        // 将json按字段映射 解析为业务数据
        jobDataService.parseData(taskInfo, json);
        // 根据条件过滤数据
        jobDataFilterService.doFilter(taskInfo);
        // 保存业务数据
        jobDataService.saveTaskData(taskInfo);
        // 更新环境变量
        jobVarService.saveVarValue(taskInfo, json);
    	break;
    

    jobDataFilterService.doFilter 是对数据的预过滤处理,详见 JobDataFilterService.java

    public void doFilter(TaskInfo task) {
        Assert.notNull(task);
    	// 获取业务数据
        List<Map> dataList = task.getProduceDataList();
        // 获取配置的过滤条件
        List<BizDataFilter> dataFilters = task.getDataFilters();
    
        if (CollUtil.isEmpty(dataList) || CollUtil.isEmpty(dataFilters)) {
            return;
        }
    
        // 定义新的数据集合,用于存储 过滤后的数据
        List<Map> filterDatas = ListUtil.toList();
        // 遍历数据,并进行过滤
        dataList.forEach(data -> {
    
            boolean isCorrect = false;
    
            for (BizDataFilter filter : dataFilters) {
                String key = filter.getKey();
                Object filterValue = filter.getValue();
                String op = filter.getOp();
    
                // 当数据中 不包含 过滤的字段名,则执行下一项过滤
                if (!data.containsKey(key)) {
                    continue;
                }
    
                // 当数据中 指定字段的值 无效,则过滤该数据
                Object dataValue = data.get(key);
                if (ObjectUtil.isNull(dataValue)) {
                    isCorrect = true;
                    break;
                }
    
                // 判断业务数据值 和 过滤数据值 都可对比,否则过滤条件无效
                if (!(dataValue instanceof Comparable && filterValue instanceof Comparable)) {
                    break;
                }
    
                String cDataValue = dataValue.toString();
                String cFilterValue = filterValue.toString();
                // 根据op类型,过滤数据
                switch (op) {
                    case MdConstant.DATA_OP_EQ:
                        // 等于
                        isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) == 0);
                        break;
                    case MdConstant.DATA_OP_NE:
                        // 不等于
                        isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) != 0);
                        break;
                    case MdConstant.DATA_OP_GT:
                        // 大于
                        isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) > 0);
                        break;
                    case MdConstant.DATA_OP_GTE:
                        // 大于等于
                        isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) >= 0);
                        break;
                    case MdConstant.DATA_OP_LT:
                        // 小于
                        isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) < 0);
                        break;
                    case MdConstant.DATA_OP_LTE:
                        // 小于等于
                        isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) <= 0);
                        break;
    
                    default:
                        throw new RuntimeException("JobDataFilter: 不支持的过滤操作");
                }
            }
    
            // 当 未被过滤,则添加到过滤结果
            if (isCorrect) {
                filterDatas.add(data);
            }
        });
    
        task.setProduceDataList(filterDatas);
    
        task.appendLog("过滤前的业务数据:{}", dataList);
        task.appendLog("过滤条件:{}", dataFilters);
        task.appendLog("过滤后的业务数据:{}", filterDatas);
    }
    

    注:目前0.7版本暂时实现了关系运算,后续增加函数处理;

  2. 数据权限控制,限制应用接收的数据范围,即符合条件的数据才能共享给应用;

    在这里插入图片描述

    以下代码是 消费数据 类型任务的执行过程:

    // 消费数据
    case MdConstant.DATA_CONSUMER:
        List<BizDataFilter> filters = taskInfo.getDataFilters();
        if (CollUtil.isNotEmpty(filters)) {
            // 解析过滤条件值中的 自定义字符串
            parseFilterValue(filters);
            // 排除值为null的条件
            filters = filters.stream().filter(filter -> filter.getValue() != null).collect(Collectors.toList());
        }
        String dataCode = taskInfo.getDataCode();
        if (StrUtil.isNotEmpty(dataCode)) {
            // 根据过滤条件 查询数据
            List<Map> dataList = bizDataDAO.list(MdUtil.getBizDbCode(taskInfo.getTenantId(), taskInfo.getProjectId(), taskInfo.getEnvId()), dataCode, filters);
            taskInfo.setConsumeDataList(dataList);
            // 根据字段映射转换为api参数
            jobDataService.convertData(taskInfo);
        }
        // 调用api传输数据
        ApiUtil.write(taskInfo);
        break;
    

    bizDataDAO.list 方法支持按配置条件查询数据,详见 BizDataDAO.java

    public List<Map> list(String dbCode, String dataCode, List<BizDataFilter> bizDataFilters) {
            MongoTemplate mongoTemplate = mongoFactory.getTemplate(dbCode);
            Query query = new Query();
            // 遍历数据过滤条件
            if (CollUtil.isNotEmpty(bizDataFilters)) {
                // mongodb的查询条件集合
                List<Criteria> criteriaList = CollUtil.newArrayList();
                for (BizDataFilter bizDataFilter : bizDataFilters) {
                    // 条件key
                    String key = bizDataFilter.getKey();
                    // 条件操作
                    String op = bizDataFilter.getOp();
                    // 条件值
                    Object value = bizDataFilter.getValue();
    
                    // 根据条件操作类型 调用mongodb对应的查询方法
                    Criteria criteria = Criteria.where(key);
                    switch (op) {
                        case MdConstant.DATA_OP_EQ:
                            criteria.is(value);
                            break;
                        case MdConstant.DATA_OP_NE:
                            criteria.ne(value);
                            break;
                        case MdConstant.DATA_OP_GT:
                            criteria.gt(value);
                            break;
                        case MdConstant.DATA_OP_GTE:
                            criteria.gte(value);
                            break;
                        case MdConstant.DATA_OP_LT:
                            criteria.lt(value);
                            break;
                        case MdConstant.DATA_OP_LTE:
                            criteria.lte(value);
                            break;
    
                        default:
                            throw new RuntimeException("BizDataDAO: 不支持的过滤操作");
                    }
                    // 存入mongodb的查询条件集合
                    criteriaList.add(criteria);
                }
    
                // mongodb查询条件集合 加入查询中
                query.addCriteria(new Criteria().andOperator(criteriaList));
            }
    
            // 执行查询
            return mongoTemplate.find(query, Map.class, dataCode);
        }
    

配置操作

  1. 创建任务过程请参考 使用手册

  2. 在创建任务界面中,添加数据过滤条件

    如下图过滤条件是 salary > 600
    在这里插入图片描述

  3. 执行任务后 通过日志详情可以看到数据入库前预清洗;
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/463377.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

spring boot nacos注册微服务示例demo_亲测成功

spring boot nacos注册微服务示例demo_亲测成功 先安装好Nacos Nacos安装使用 创建Maven项目 结构如图 例如项目名为: test-demo 下面有个子模块: test-demo-data-process 父模块pom.xml <?xml version"1.0" encoding"UTF-8"?> <project …

【Flink SQL】Flink SQL 基础概念(三):SQL 动态表 连续查询

《Flink SQL 基础概念》系列&#xff0c;共包含以下 5 篇文章&#xff1a; Flink SQL 基础概念&#xff08;一&#xff09;&#xff1a;SQL & Table 运行环境、基本概念及常用 APIFlink SQL 基础概念&#xff08;二&#xff09;&#xff1a;数据类型Flink SQL 基础概念&am…

数据有噪声?滤它!Python数据滤波详解

文章目录 维纳滤波巴特沃斯滤波器中值滤波排序滤波 Python科学计算&#xff1a;数组&#x1f4af;数据生成&#x1f4af;数据交互&#x1f4af;微积分&#x1f4af;插值&#x1f4af;拟合&#x1f4af;FFT&#x1f4af;卷积 维纳滤波 信号经过系统之后&#xff0c;相当于进行…

简单的arduino实验理解串口通信(uart为例)独立硬件的信息交互

前言 接触过单片机的人都知道串口通信&#xff0c;可以通过另一个短文了解,其中入门的应该就是串口通信了。UART全拼的个人理解为通用的异步接收和发送。常见两根短线作为通信线&#xff0c;一般使用TXD和RXD标记。对于两块通信的芯片来说&#xff0c;接收和发送是相对的&…

Stargo 管理部署 Starrocks 集群

配置主机间 ssh 互信 ssh-copy-id hadoop02 ssh-copy-id hadoop03配置系统参数 ############################ Swap检查 ############################ echo 0 | sudo tee /proc/sys/vm/swappiness########################### 内核参数检查 ########################## echo…

PHP+golang开源办公系统CRM管理系统

基于ThinkPHP6 Layui MySQL的企业办公系统。集成系统设置、人事管理、消息管理、审批管理、日常办公、客户管理、合同管理、项目管理、财务管理、电销接口集成、在线签章等模块。系统简约&#xff0c;易于功能扩展&#xff0c;方便二次开发。 服务器运行环境要求 PHP > 7.…

2.3 物理层设备

2.3 物理层设备 &#xff08;一&#xff09;中继器 产生原因 由于存在损耗&#xff0c;在线路上传输的信号功率会逐渐衰减&#xff0c;衰减到一定程度时将造成信号失真&#xff0c;因此会导致接收错误。 中继器的功能 对信号进行再生和还原&#xff0c;对衰减的信号进行放大…

ArkTs的资源Resource类型怎么转为string

使用ResourceManager同步转换 请参看&#xff1a;ResourceManager.getStringSync9 例子&#xff1a; try { let testStr: string this.context.resourceManager.getStringSync($r(app.string.test).id); } catch (error) { console.error(getStringSync failed, error code…

GEE数据集——全球( 30 弧秒)尺度地下水模型GLOBGM v1.0数据集

全球尺度地下水模型GLOBGM v1.0 GLOBGM v1.0 数据集是全球地下水建模的一个重要里程碑&#xff0c;提供了 30 弧秒 PCR-GLOBWB-MODFLOW 模型的并行实施。该数据集由 Jarno Verkaik 等人开发&#xff0c;以赤道约 1 公里的空间分辨率全面展示了全球地下水动态。该数据集利用两个…

VUE-组件间通信(一)props

props 1、单向绑定 props是父组件给子组件传输数据 当父组件的属性变化时&#xff0c;将传导给子组件&#xff0c;但是反过来不会 2、使用示例 子组件&#xff08;类似于方法&#xff09; <template> <div><h2>姓名:{{ name }}</h2><h2>性别:{{…

前端之CSS 创建css--行内引入、内联样式、外联样式

创建css有三种创建样式&#xff0c;行内引入、内联引入、外联引入。 行内引入 在行内标签引入 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>行内样式</title> </head> <body>…

Ubuntu 虚拟机安装

最小化安装后常用工具 sudo apt-get install vim# ifconfig apt install net-tools # nload apt install nload # 很多都要用到 apt install build-essential # 开发相关 apt install gcc gapt install iproute2 ntpdate tcpdump telnet traceroute \ nfs-kernel-server nfs…

mac打开exe文件的三大方法 mac怎么运行exe文件 mac打开exe游戏 macbookpro打开exe

exe文件是Windows系统的可执行文件&#xff0c;虽然Mac系统上无法直接打开exe文件&#xff0c;但是你可以在Mac电脑上安装双系统或者虚拟机来实现mac电脑上运行exe文件。除了这两种方法之外&#xff0c;你还可以在Mac电脑上使用类虚拟机软件打开exe文件&#xff0c;这三种方法各…

Java学习笔记------常用API(五)

爬虫 从网站中获取 import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.MalformedURLException; import java.net.URL; import java.net.URLConnection; import java.util.regex.Matcher; import java.util.reg…

动态规划(算法竞赛、蓝桥杯)--单调队列优化烽火传递

1、B站视频链接&#xff1a;E43【模板】单调队列优化DP 烽火传递_哔哩哔哩_bilibili 题目链接&#xff1a;https://loj.ac/p/10180 #include <bits/stdc.h> using namespace std; const int N2e510; int n,m,w[N],f[N],q[N];int main(){cin>>n>>m;for(int …

生产线上的“变形金刚”:码垛机器人的崛起

在工业的森林里&#xff0c;有一种神奇的生物——码垛机器人。它们以精确无误的动作和不知疲倦的身躯&#xff0c;在生产线上演绎着一幕幕现代版的“变形金刚”。这些机械奇才不仅解放了人类的双手&#xff0c;更是以它们的“魔法”提升了生产效率&#xff0c;降低了成本&#…

[SAP ABAP] 使用事务码SU3改变日期与时间格式

当我们执行上述代码&#xff0c;返回结果如下所示 我们发现获取当前系统日期返回的日期格式并不是MM/DD/YYYY&#xff0c;而是YYYY.MM.DD的日期格式&#xff0c;那么我们怎样才能使得MM/DD/YYYY这种日期格式生效&#xff1f; 我们可以使用事务码SU3来改变日期或时间格式 配置完…

【强化学习笔记一】初识强化学习(定义、应用、分类、性能指标、小车上山案例及代码)

文章目录 第1章 初识强化学习1.1 强化学习及其关键元素1.2 强化学习的应用1.3 强化学习的分类1.3.1 按任务分类1.3.2 按算法分类 1.4 强化学习算法的性能指标1.5 案例&#xff1a;基于Gym库的智能体/环境接口1.5.1 安装Gym库1.5.2 使用Gym库1.5.3 小车上山1.5.3.1 有限动作空间…

软件实例,餐厅酒水寄存管理系统软件,酒水寄存登记表软件操作教程

软件实例&#xff0c;餐厅酒水寄存管理系统软件&#xff0c;酒水寄存登记表软件操作教程 一、前言 以下软件操作以 佳易王酒水寄存管理系统软件V16.0为例说明 件文件下载可以点击最下方官网卡片——软件下载——试用版软件下载 1、酒水寄存管理系统软件可以管理多个品类的物…
最新文章