探索利用 LineageLogger 获取hive的字段级血缘关系

apache hive 源码中有 org.apache.hadoop.hive.ql.hooks.LineageLogger 类可以获取 insert hql 的字段之间的关系。但是又由于 org.apache.hadoop.hive.ql.optimizer.Optimizer的原因,使我们重写 hook 类无法实现字段级血缘。

  if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_LINEAGE_INFO) // 版本 4.0+加入
      || postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.PostExecutePrinter")
      || postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.LineageLogger")
      // 版本 2.3 加入
      || postExecHooks.contains("org.apache.atlas.hive.hook.HiveHook")) {
    transformations.add(new Generator(postExecHooks));
  }

现在考虑通过LineageLogger 搭配日志监测服务来实现字段级血缘

  1. 加入插件 conf/hive-site.xml
  <property>
    <name>hive.exec.post.hooks</name>
    <value>org.apache.hadoop.hive.ql.hooks.LineageLogger</value>
  </property>
  1. 打开日志 conf/log4j.properties
log4j.logger.org.apache.hadoop.hive.ql.hooks.LineageLogger=INFO
  1. hive任务日志目录
>set system:hive.log.dir; # 服务日志

>set hive.querylog.location; #查询日志


/tmp/hive-{用户名}/

4.写脚本监测

# -*- coding: utf-8 -*-
import hashlib
import json
import os.path
from json import JSONDecodeError

import requests

log_path_list = [
    "/tmp/root/hive.log"
]


def read_hive_log(file_path):
    """
    读取Hive日志文件并返回包含关键词的行内容列表

    参数:
    file_path (str):Hive日志文件的路径

    返回:
    content (list):包含关键词的行内容json列表
    """
    save_dict = {}
    if os.path.exists('./hash_index.log'):
        try:
            with open("./hash_index.log", 'r') as f:
                file_content = f.read()
                if file_content != '':
                    save_dict = json.loads(file_content)
        except json.JSONDecodeError as e:
            print(f"无法将文件内容转换为JSON:{e}")

    new_file = log_path.split("/")[-1]


    if new_file in save_dict.keys():
        old_size = save_dict.get(new_file).get('size', 0)
        line_index = save_dict.get('index', 0)
    else:
        # print("此为新文件,从头开始读取")
        old_size = 0
        line_index = 0


    is_new_file = False
    try:
        new_size: int = os.path.getsize(file_path)
    except Exception as e:
        print("读取文件大小失败:", e)
        new_size = 0
    if (new_file not in save_dict.keys()) or (new_file in save_dict.keys() and (new_size < old_size or old_size == 0)):
        is_new_file = True

    content = []

    is_new_file_only_one = is_old_file_only_one = is_just_info_only_one = False
    try:
        with open(file_path, 'r', encoding='utf-8', errors='replace') as log_file:
            for line_number, line in enumerate(log_file, 1):
                if search_keyword in line:
                    if is_new_file:
                        if not is_new_file_only_one:
                            print("是新文件,从头开始读取")
                            is_new_file_only_one = True
                        content.append((line_number, line.split(search_keyword)[-1]))
                        line_index = line_number

                    else:
                        if line_number >= line_index:
                            if not is_old_file_only_one:
                                print("是旧文件,从上次读取位置继续读取: {}".format(line_index))
                                is_old_file_only_one = True
                            content.append((line_number, line.split(search_keyword)[-1]))
                            line_index = line_number

    except Exception as e:
        print(f"读取Hive日志文件失败:{e}")
    return content, new_size, line_index, new_file


def parse_vertice(vertices):
    """
    解析顶点数据并返回顶点字典

    参数:
    vertices(list): 顶点数据列表

    返回值:
    vertex_dict(dict): 顶点字典,键为顶点ID,值为元组,包含数据库名、表名和列名(如果顶点类型为列)

    """
    vertex_dict = {}
    for vertex in vertices:
        vertex_id = vertex.get("id", "")
        vertex_type = vertex.get("vertexType", "")
        vertex_names = vertex.get("vertexId", "").split(".")

        if len(vertex_names) >= 3:
            db_name = vertex_names[0]
            tb_name = vertex_names[1]
            col_name = vertex_names[-1] if vertex_type == "COLUMN" else ""

            if col_name not in partition_field:
                vertex_dict.setdefault(vertex_id, {"db": db_name, "tb": tb_name, "col": col_name})
    return vertex_dict


def parse_edge(edges):
    """
    解析边的函数

    参数:
    edges (list): 边的列表

    返回值:
    list: 边元素的列表,每个元素为一个元组,包含源节点列表、目标节点列表和表达式
    """
    edge_elem_list = []
    for edge in edges:
        source_arr = edge.get("sources", [])
        target_arr = edge.get("targets", [])
        expression = edge.get("expression", "")
        edge_type = edge.get("edgeType", "")
        edge_elem_list.append({"source": source_arr, "target": target_arr, "exp": expression, "type": edge_type})
    return edge_elem_list

def parse_lineage_log(content: list):
    column_info_dict = {}

    # 去重数据
    for (line_number, line) in content:
        try:
            lineage_dict = json.loads(line)
            vertex_dict = parse_vertice(lineage_dict.get('vertices', []))
            edge_list = parse_edge(lineage_dict.get('edges', []))
            tb, column_info = get_column_depend(vertex_dict, edge_list)
            column_info_dict[tb] = column_info
        except JSONDecodeError as e:
            print("json解析错误: {}".format(line))
            print("该行错误位置: {}".format(line_number))

    return column_info_dict


if __name__ == '__main__':
    print("开始启动....")
    log_dict = {}
    for log_path in log_path_list:
        contents, file_size, index, new_file_name = read_hive_log(log_path)
        column_info_dicts = parse_lineage_log(contents)
        print("{} 文件执行完".format(log_path))
        log_dict.setdefault(log_path.split('/')[-1], dict(size=file_size, index=index, file=new_file_name))
    with open("./hash_index.log", 'w') as f:
        f.write(json.dumps(log_dict))
    print("执行结束...")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/608995.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

亚马逊云科技中国峰会:与你开启云计算与前沿技术的探索之旅

亚马逊云科技中国峰会&#xff1a;与你开启云计算与前沿技术的探索之旅 Hello,我是科技博主Maynor&#xff0c;非常高兴地向你们推荐亚马逊云科技中国峰会&#xff0c;这是一场将于 5 月 29 日至 30 日在上海世博中心举办的科技盛会&#xff0c;如果你对云计算、行业发展新趋势…

IDEA 使用maven编译,控制台出现乱码问题的解决方式

前言 使用idea进行maven项目的编译时&#xff0c;控制台输出中文的时候出现乱码的情况。 通常出现这样的问题&#xff0c;都是因为编码格式不一样导致的。既然是maven出的问题&#xff0c;我们在idea中查找下看可以如何设置文件编码。 第一种方式 在pom.xml文件中&#xff…

Meta FAIR: 深层网络不合理的低效性

这篇文章的标题"The Unreasonable Ineffectiveness of the Deeper Layers"巧妙地呼应了著名物理学家尤金维格纳在1960年发表的一篇论文"数学在自然科学中不合理的有效性"(The Unreasonable Effectiveness of Mathematics in the Natural Sciences)。维格纳…

FPGA+炬力ARM实现VR视频播放器方案

FPGA炬力ARM方案&#xff0c;单个视频源信号&#xff0c;同时驱动两个LCD屏显示&#xff0c;实现3D 沉浸式播放 客户应用&#xff1a;VR视频播放器 主要功能&#xff1a; 1.支持多种格式视频文件播放 2.支持2D/3D 效果实时切换播放 3.支持TF卡/U盘文件播放 4.支持定制化配置…

Linux运维:centos环境变量

前言 在 Linux 运维工作中&#xff0c;管理环境变量是至关重要的一项任务。在 CentOS 环境下&#xff0c;正确配置环境变量可以使系统更加高效和易于管理。 本文将重点讨论 CentOS 环境下的环境变量设置&#xff0c;并就python的环境变量配置方案进行讲解&#xff08;不包含Ano…

AutoModelForCausalLM.from_pretrained 函数调用本地权重报错

文章目录 1、代码报错的位置&#xff08;前情提要&#xff09;finetune_lora.shfintune_clm_lora.py 2、报错截图2.1、huggingfaces上的 meta-llama/Llama-2-7b-chat-hf2.2、服务器上模型文件路径 3、特别注意事项 1、代码报错的位置&#xff08;前情提要&#xff09; 在终端直…

06.命令的组合使用

命令的组合使用 1.查询当前整个系统每个进程的线程数 我们经常遇到这样的问题&#xff0c;比如某台服务器的CPU 使用率飙升&#xff0c;通过top命令查看是某个程序&#xff08;例如java&#xff09;占用的cpu比较大&#xff0c;现在需要查询java各个进程下的线程数情况。可以通…

Reactor Netty HTTP 服务器端-响应式编程-014

🤗 ApiHug {Postman|Swagger|Api...} = 快↑ 准√ 省↓ GitHub - apihug/apihug.com: All abou the Apihug apihug.com: 有爱,有温度,有质量,有信任ApiHug - API design Copilot - IntelliJ IDEs Plugin | Marketplace The Next Generation API Development Platform …

风扇开启执行逻辑

执行流程 public static void businessExecutionWork(){//以下为业务逻辑部分System.out.println("1、根据电池包控制风扇服务执行 开始!");//1、获取电池包电压、电流、环境温度//获取电池包电压、电流、环境温度ObtainBatteryDataService obtainBatteryDataServic…

Docker 怎么将映射出的路径设置为非root用户权限

在Docker中&#xff0c;容器的根文件系统默认是由root用户拥有的。如果想要在映射到宿主机的路径时设置为非root用户权限&#xff0c;可以通过以下几种方式来实现&#xff1a; 1. 使用具有特定UID和GID的非root用户运行容器&#xff1a; 在运行容器时&#xff0c;你可以使用-u…

监控异地组网怎么组网?

监控异地组网是指在不同地域的网络环境下&#xff0c;实现对监控设备的远程访问和管理。在传统的网络环境下&#xff0c;由于网络限制和设备配置等问题&#xff0c;监控设备的远程访问往往受到一定的限制和困扰。为了解决这个问题&#xff0c;引入了天联组网技术&#xff0c;实…

Mysql幻读

幻读指的是一个事务在前后两次查询同一个范围的时候&#xff0c;后一次查询看到了前一次查询没有看到的行。 幻读仅专指“新插入的行” 在可重复读隔离级别下&#xff0c;普通的查询是快照读&#xff0c;是不会看到别的事务插入的数据的。因此&#xff0c;幻读在“当前读”下…

【JavaEE网络】HTTP/HTTPS协议的工作原理与格式详解

目录 HTTP/HTTPSHTTP是什么理解“应用层协议”理解HTTP协议的工作过程HTTP协议格式 HTTP/HTTPS HTTP是什么 应用层&#xff0c;一方面是需要自定义协议&#xff0c;一方面也会用到一些现成的协议 HTTP及HTTPS是应用层重点协议 使用浏览器&#xff0c;打开网站&#xff0c;这…

自动化运维工具——Ansible

一、Ansible的概念&#xff1a; 1.Ansible的介绍&#xff1a; Ansible是一个基于Python开发的配置管理和应用部署工具&#xff0c;现在也在自动化管理领域大放异彩。它融合了众多老牌运维工具的优点&#xff0c;Pubbet和Saltstack能实现的功能&#xff0c;Ansible基本上都可以…

OpenHarmony 实战开发(南向)-Docker编译环境搭建

Docker环境介绍 OpenHarmony为开发者提供了两种Docker环境&#xff0c;以帮助开发者快速完成复杂的开发环境准备工作。两种Docker环境及适用场景如下&#xff1a; 独立Docker环境&#xff1a;适用于直接基于Ubuntu、Windows操作系统平台进行版本编译的场景。 基于HPM的Docker…

2024车载测试还有发展吗?

2024年已过接近1/4了,你是不是还在围观车载测试行业的发展? 现在入车载测试还来得及吗? 如何高效学习车载测试呢? 首先我们看一下车载测试行情发展,通过某大平台,我们后去数据如下: 这样的数据可以预估一下未来车载测试还是会持续发展. 随着科技的发展和汽车行业的不断创新,…

第08章 IP分类编址和无分类编址

8.1 本章目标 了解IP地址的用途和种类了解分类编址和无分类编址区别掌握IP地址、子网掩码、网关概念及使用掌握子网划分及超网划分方法掌握无分类编址的改变和使用 8.2 IP地址的用途和种类 分类编址&#xff1a;造成地址的浪费&#xff0c;以及地址不够用&#xff1b;无分类编…

labview技术交流-字符串数组连接成字符串

应用场景 我们可能需要将一维的字符串数组转换成一整条字符串&#xff0c;然后方便记录在数据库或表格中的一个单元格中。 代码展示 方案一 我们使用for循环完成这样的功能需求&#xff0c;见下图&#xff1a; 这种方案可能相对基础和普通&#xff0c;但是它更方便和易于扩展…

在Flask中使用Celery完成异步和定时任务(Flask、Celery、Redis)

编程目标 通过使用Flask和Celery&#xff0c;实现一个简单的Web应用程序&#xff0c;能够接收HTTP POST请求&#xff0c;并异步发送电子邮件。 说明 使用Flask创建一个简单的Web应用程序&#xff0c;包含一个HTTP POST路由&#xff0c;用于接收发送电子邮件的请求。使用Cele…

【Java SE】对象的比较

&#x1f970;&#x1f970;&#x1f970;来都来了&#xff0c;不妨点个关注叭&#xff01; &#x1f449;博客主页&#xff1a;欢迎各位大佬!&#x1f448; 本期内容满满干货&#xff0c;将会深入介绍对象与对象之间是如何进行比较的&#xff0c;我们知道基本数据类型是可以直…
最新文章