How to build a custom hook for Apache Atlas

Atlas is an open-source metadata and data-asset management platform. By design it is built from three components: a graph database, a NoSQL store, and a search engine (typically JanusGraph over HBase, with Solr or Elasticsearch for indexing), all of them open source.

The open-source metadata management tools currently on the market include Atlas, DataHub, OpenMetadata, and others. Which is best for secondary development? If you develop in Java, it is still Atlas: flexible and simple. The other two require Python and a mix of languages.

Atlas ships with real-time metadata collection hooks for components such as HBase, Hive, Impala, and Sqoop; for other systems you can implement the hook yourself. A hook works roughly as follows.

To understand hooks, first understand the data source. A so-called hook requires cooperation from the source system: it is really a listening mechanism inside that system, a listener program sitting between the client (where the SQL is written) and the execution engine, able to capture the SQL as it is parsed. If the source system offers no such mechanism, this kind of listening capture is impossible.
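Hive is a concrete example of such a mechanism: its post-execution hook point is exactly where the bundled Atlas hook plugs in. Adding org.apache.atlas.hive.hook.HiveHook to hive.exec.post.hooks in hive-site.xml makes Hive hand every parsed operation to the hook, which then forwards it to Atlas.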

Even if you cannot write a listener, Atlas can still cope: the Kafka queue in the middle is itself a real-time listening channel, and as long as your program submits messages in the format Atlas expects, you get metadata management. Kafka carries two topics: ATLAS_HOOK, which Atlas consumes for incoming hook messages, and ATLAS_ENTITIES, on which Atlas publishes entity change notifications. Write data that satisfies these formats and the metadata is ingested in real time.

Atlas metadata management splits into two parts, the API and Kafka. Before we get to Kafka, let's first talk about what a model is.

A model is similar to a JDBC connection or a Presto catalog: it is the registration information for a metadata source. Whatever database or program you connect to, its databases, tables, views, columns, and so on must be registered, because these differ between systems; Hive and HBase certainly do not have the same attributes. So you build a model first, and there are two ways to do it. One is the Java API, sketched below.
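Here is a minimal sketch of the Java route, assuming an Atlas server on localhost:21000 with the default admin/admin credentials; it registers the same clickhouse_db type that the JSON further down defines (only one attribute shown for brevity):

import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;

import java.util.Collections;

public class RegisterClickhouseDbType {
    public static void main(String[] args) throws Exception {
        // Connect to Atlas with basic auth (adjust URL/credentials for your deployment)
        AtlasClientV2 client = new AtlasClientV2(
                new String[]{"http://localhost:21000"},
                new String[]{"admin", "admin"});

        // Entity type clickhouse_db, inheriting from the built-in DataSet type
        AtlasEntityDef dbDef = new AtlasEntityDef("clickhouse_db", "clickhouse_db", "1.0");
        dbDef.setServiceType("clickhouse");
        dbDef.setSuperTypes(Collections.singleton("DataSet"));

        // One optional string attribute; the full JSON below declares several more
        AtlasAttributeDef clusterName = new AtlasAttributeDef("clusterName", "string");
        clusterName.setIsOptional(true);
        dbDef.addAttribute(clusterName);

        // Submit the typedef; Atlas rejects the call if the type name already exists
        AtlasTypesDef typesDef = new AtlasTypesDef();
        typesDef.setEntityDefs(Collections.singletonList(dbDef));
        client.createAtlasTypeDefs(typesDef);
    }
}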

The other is to submit a model JSON.

The Atlas source tree contains many such JSON model files (for example under addons/models) that serve as good templates. The following registers a clickhouse_db entity type:

curl -i -X POST -H "Content-Type: application/json" -d '{
    "enumDefs": [],
    "structDefs": [],
    "classificationDefs": [],
    "entityDefs": [
        {
      "category": "ENTITY",
      "version": 1,
      "name": "clickhouse_db",
      "description": "clickhouse_db",
      "typeVersion": "1.0",
      "serviceType": "clickhouse",
      "attributeDefs": [
        {
          "name": "location",
          "typeName": "string",
          "isOptional": true,
          "cardinality": "SINGLE",
          "valuesMinCount": 0,
          "valuesMaxCount": 1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": 5
        },
        {
          "name": "clusterName",
          "typeName": "string",
          "isOptional": true,
          "cardinality": "SINGLE",
          "valuesMinCount": 0,
          "valuesMaxCount": 1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": 8
        },
        {
          "name": "parameters",
          "typeName": "map<string,string>",
          "isOptional": true,
          "cardinality": "SINGLE",
          "valuesMinCount": 0,
          "valuesMaxCount": 1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": -1
        },
        {
          "name": "ownerType",
          "typeName": "string",
          "isOptional": true,
          "cardinality": "SINGLE",
          "valuesMinCount": 0,
          "valuesMaxCount": 1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": -1
        }
      ],
      "superTypes": [
        "DataSet"
      ],
      "subTypes": [],
      "relationshipAttributeDefs": [
        {
          "name": "inputToProcesses",
          "typeName": "array<Process>",
          "isOptional": true,
          "cardinality": "SET",
          "valuesMinCount": -1,
          "valuesMaxCount": -1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": -1,
          "relationshipTypeName": "dataset_process_inputs",
          "isLegacyAttribute": false
        },
        {
          "name": "schema",
          "typeName": "array<avro_schema>",
          "isOptional": true,
          "cardinality": "SET",
          "valuesMinCount": -1,
          "valuesMaxCount": -1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": -1,
          "relationshipTypeName": "avro_schema_associatedEntities",
          "isLegacyAttribute": false
        },
        {
          "name": "tables",
          "typeName": "array<clickhouse_table>",
          "isOptional": true,
          "cardinality": "SET",
          "valuesMinCount": -1,
          "valuesMaxCount": -1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": -1,
          "relationshipTypeName": "clickhouse_table_db",
          "isLegacyAttribute": false
        },
        {
          "name": "meanings",
          "typeName": "array<AtlasGlossaryTerm>",
          "isOptional": true,
          "cardinality": "SET",
          "valuesMinCount": -1,
          "valuesMaxCount": -1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": -1,
          "relationshipTypeName": "AtlasGlossarySemanticAssignment",
          "isLegacyAttribute": false
        },
        {
          "name": "outputFromProcesses",
          "typeName": "array<Process>",
          "isOptional": true,
          "cardinality": "SET",
          "valuesMinCount": -1,
          "valuesMaxCount": -1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": -1,
          "relationshipTypeName": "process_dataset_outputs",
          "isLegacyAttribute": false
        }
      ],
      "businessAttributeDefs": {}
    }
    ],
    "relationshipDefs": []
}' --user admin:admin "http://localhost:21000/api/atlas/v2/types/typedefs"
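
If the call succeeds, Atlas echoes the created definitions back in the response; you can also verify the registration afterwards with GET /api/atlas/v2/types/typedef/name/clickhouse_db.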

This step registers the database-level types: the database, the tables, the columns, and so on.

The next step is to map the relationships between database, table, and column:

# POST /api/atlas/v2/types/typedefs
{
  "entityDefs": [],
  "classificationDefs": [],
  "structDefs": [],
  "enumDefs": [],
  "relationshipDefs": [
    {
      "category": "RELATIONSHIP",
      "version": 1,
      "name": "clickhouse_table_db",
      "description": "clickhouse_table_db",
      "typeVersion": "1.0",
      "serviceType": "clickhouse",
      "attributeDefs": [],
      "relationshipCategory": "AGGREGATION",
      "propagateTags": "NONE",
      "endDef1": {
        "type": "clickhouse_table",
        "name": "db",
        "isContainer": false,
        "cardinality": "SINGLE",
        "isLegacyAttribute": false
      },
      "endDef2": {
        "type": "clickhouse_db",
        "name": "tables",
        "isContainer": true,
        "cardinality": "SET",
        "isLegacyAttribute": false
      }
    },
    {
      "category": "RELATIONSHIP",
      "version": 1,
      "name": "clickhouse_table_columns",
      "description": "clickhouse_table_columns",
      "typeVersion": "1.0",
      "serviceType": "clickhouse",
      "attributeDefs": [],
      "relationshipCategory": "COMPOSITION",
      "propagateTags": "NONE",
      "endDef1": {
        "type": "clickhouse_table",
        "name": "columns",
        "isContainer": true,
        "cardinality": "SET",
        "isLegacyAttribute": false
      },
      "endDef2": {
        "type": "clickhouse_column",
        "name": "table",
        "isContainer": false,
        "cardinality": "SINGLE",
        "isLegacyAttribute": false
      }
    },
    {
      "category": "RELATIONSHIP",
      "version": 1,
      "name": "clickhouse_table_storagedesc",
      "description": "clickhouse_table_storagedesc",
      "typeVersion": "1.0",
      "serviceType": "clickhouse",
      "attributeDefs": [],
      "relationshipCategory": "ASSOCIATION",
      "propagateTags": "NONE",
      "endDef1": {
        "type": "clickhouse_table",
        "name": "sd",
        "isContainer": false,
        "cardinality": "SINGLE",
        "isLegacyAttribute": false
      },
      "endDef2": {
        "type": "clickhouse_storagedesc",
        "name": "table",
        "isContainer": false,
        "cardinality": "SINGLE",
        "isLegacyAttribute": false
      }
    }
  ]
}

The relationships map the database-table-column-attribute links, and this mapping is what drives navigation between linked entities in the UI. Note the relationshipCategory values above: AGGREGATION for table-db (the database is a container of tables), COMPOSITION for table-columns (a column cannot exist without its table), and ASSOCIATION for table-storage descriptor (a plain reference).

Step 2: write data through Kafka

Entities can be written either by calling the REST API (POST /api/atlas/v2/entity/bulk) or by submitting a message to Kafka. The hook message envelope looks like this (nodes collapsed by a JSON viewer show as Array[1] and Object{...}; the complete payload appears in the Flink example below):

{
    "version": {
        "version": "1.0.0",
        "versionParts": Array[1]
    },
    "msgCompressionKind": "NONE",
    "msgSplitIdx": 1,
    "msgSplitCount": 1,
    "msgSourceIP": "10.45.1.116",
    "msgCreatedBy": "bi",
    "msgCreationTime": 1710575827820,
    "message": {
        "type": "ENTITY_CREATE_V2",
        "user": "bi",
        "entities": {
            "entities": [
                {
                    "typeName": "clickhouse_table",
                    "attributes": {
                        "owner": "bi",
                        "ownerType": "USER",
                        "sd": Object{...},
                        "tableType": "MANAGED",
                        "createTime": 1710575827000,
                        "qualifiedName": "test.wuxl_0316_ss@primary",
                        "columns": [
                            Object{...},
                            Object{...}
                        ],
                        "name": "wuxl_0316_ss",
                        "comment": "测试表",
                        "parameters": {
                            "transient_lastDdlTime": "1710575827"
                        },
                        "db": {
                            "typeName": "clickhouse_db",
                            "attributes": {
                                "owner": "bi",
                                "ownerType": "USER",
                                "qualifiedName": "test@primary",
                                "clusterName": "primary",
                                "name": "test",
                                "description": "",
                                "location": "hdfs://HDFS80727/bi/test.db",
                                "parameters": {

                                }
                            },
                            "guid": "-861237351166886",
                            "version": 0,
                            "proxy": false
                        }
                    },
                    "guid": "-861237351166888",
                    "version": 0,
                    "proxy": false
                },
                Object{...},
                Object{...},
                Object{...},
                Object{...}
            ]
        }
    }
}
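
Any Kafka client can publish this envelope. Below is a minimal Java sketch, assuming Kafka is reachable on localhost:9092 and the default topic name ATLAS_HOOK; the message value is simply the JSON above serialized to a single line.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class AtlasHookProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // The message value is the ENTITY_CREATE_V2 envelope shown above,
        // serialized as one line (placeholder here; paste the full JSON)
        String message = "{\"version\":{\"version\":\"1.0.0\", ...}}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Block until the broker acknowledges the write
            producer.send(new ProducerRecord<>("ATLAS_HOOK", message)).get();
        }
    }
}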

It can also be submitted with Flink:

-- Use Flink SQL to write a message into the Kafka topic that ships with Atlas
CREATE TABLE ads_zdm_offsite_platform_daren_rank_df_to_kafka (
        data string
) WITH (
  'connector' = 'kafka',
  'topic' = 'ATLAS_HOOK',
  'properties.bootstrap.servers' = 'localhost:9092', 
  'format' = 'raw'
);
 
insert into ads_zdm_offsite_platform_daren_rank_df_to_kafka
select '{"version":{"version":"1.0.0","versionParts":[1]},"msgCompressionKind":"NONE","msgSplitIdx":1,"msgSplitCount":1,"msgSourceIP":"10.45.1.116","msgCreatedBy":"bi","msgCreationTime":1710575827820,"message":{"type":"ENTITY_CREATE_V2","user":"bi","entities":{"entities":[{"typeName":"clickhouse_table","attributes":{"owner":"bi","ownerType":"USER","sd":{"typeName":"clickhouse_storagedesc","attributes":{"qualifiedName":"test.wuxl_0316_ss@primary_storage","name":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe","location":"hdfs://HDFS80727/bi/test.db/wuxl_0316_ss","compressed":false,"inputFormat":"org.apache.hadoop.mapred.TextInputFormat","outputFormat":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat","parameters":{"serialization.format":"1"}},"guid":"-861237351166887","version":0,"proxy":false},"tableType":"MANAGED","createTime":1710575827000,"qualifiedName":"test.wuxl_0316_ss@primary","columns":[{"typeName":"clickhouse_column","attributes":{"qualifiedName":"test.wuxl_0316_ss.column_tt_1@primary","name":"column_tt_1","comment":"测试字段1","type":"string","table":{"typeName":"clickhouse_table","attributes":{"qualifiedName":"test.wuxl_0316_ss@primary"},"guid":"-861237351166888","version":0,"proxy":false}},"guid":"-861237351166890","version":0,"proxy":false},{"typeName":"clickhouse_column","attributes":{"qualifiedName":"test.wuxl_0316_ss.column_tt_2@primary","name":"column_tt_2","comment":"测试字段2","type":"string","table":{"typeName":"clickhouse_table","attributes":{"qualifiedName":"test.wuxl_0316_ss@primary"},"guid":"-861237351166888","version":0,"proxy":false}},"guid":"-861237351166891","version":0,"proxy":false}],"name":"wuxl_0316_ss","comment":"测试表","parameters":{"transient_lastDdlTime":"1710575827"},"db":{"typeName":"clickhouse_db","attributes":{"owner":"bi","ownerType":"USER","qualifiedName":"test@primary","clusterName":"primary","name":"test","description":"","location":"hdfs://HDFS80727/bi/test.db","parameters":{}},"guid":"-861237351166886","version":0,"proxy":false}},"guid":"-861237351166888","version":0,"proxy":false},{"typeName":"clickhouse_db","attributes":{"owner":"bi","ownerType":"USER","qualifiedName":"test@primary","clusterName":"primary","name":"test","description":"","location":"hdfs://HDFS80727/bi/test.db","parameters":{}},"guid":"-861237351166886","version":0,"proxy":false},{"typeName":"clickhouse_storagedesc","attributes":{"qualifiedName":"test.wuxl_0316_ss@primary_storage","name":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe","location":"hdfs://HDFS80727/bi/test.db/wuxl_0316_ss","compressed":false,"inputFormat":"org.apache.hadoop.mapred.TextInputFormat","outputFormat":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat","parameters":{"serialization.format":"1"}},"guid":"-861237351166887","version":0,"proxy":false},{"typeName":"clickhouse_column","attributes":{"qualifiedName":"test.wuxl_0316_ss.column_tt_1@primary","name":"column_tt_1","comment":"测试字段1","type":"string","table":{"typeName":"clickhouse_table","attributes":{"qualifiedName":"test.wuxl_0316_ss@primary"},"guid":"-861237351166888","version":0,"proxy":false}},"guid":"-861237351166890","version":0,"proxy":false},{"typeName":"clickhouse_column","attributes":{"qualifiedName":"test.wuxl_0316_ss.column_tt_2@primary","name":"column_tt_2","comment":"测试字段2","type":"string","table":{"typeName":"clickhouse_table","attributes":{"qualifiedName":"test.wuxl_0316_ss@primary"},"guid":"-861237351166888","version":0,"proxy":false}},"guid":"-861237351166891","version":0,"proxy":false}]}}}' as data
;
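
The 'raw' format here matters: it writes the selected string column through as the Kafka message value unchanged, one JSON envelope per message, which is what the Atlas notification consumer expects.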
 

Atlas exposes equally convenient interfaces for custom tables, applications, reports, and more; real-time change information can be pushed through the API or through Kafka, which makes real-time monitoring straightforward.
