[Spark SQL]Spark SQL读取Kudu,写入Hive

SparkUnit

Function:用于获取Spark Session

package com.example.unitl

import org.apache.spark.sql.SparkSession

object SparkUnit {
  def getLocal(appName: String): SparkSession = {
    SparkSession.builder().appName(appName).master("local[*]").getOrCreate()
  }

  def getLocal(appName: String, supportHive: Boolean): SparkSession = {
    if (supportHive) getLocal(appName,"local[*]",true)
    else getLocal(appName)
  }

  def getLocal(appName:String,master:String,supportHive:Boolean): SparkSession = {
    if (supportHive) SparkSession.builder().appName(appName).master(master).enableHiveSupport().getOrCreate()
    else  SparkSession.builder().appName(appName).master(master).getOrCreate()
  }

  def stopSs(ss:SparkSession): Unit ={
    if (ss != null) {
      ss.stop()
    }
  }
}

log4j.properties

Function:设置控制台输出级别

# Set everything to be logged to the console
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

KTV

Function:读取kudu,写入hive。Kudu_To_Hive,简称KTV

package com.example.dao

import com.example.unitl.SparkUnit
import org.apache.spark.sql.SparkSession

object KTV {
  def getKuduTableDataFrame(ss: SparkSession): Unit = {
    // 读取kudu
    // 获取tb对象
    val kuduTb = ss.read.format("org.apache.kudu.spark.kudu")
      .option("kudu.master", "10.168.1.12:7051")
      .option("kudu.table", "impala::realtimedcs.bakup_db") // Tips:注意指定库
      .load()

    // create view
    kuduTb.createTempView("v1")

    val kudu_unit1_df = ss.sql(
      """
        |SELECT * FROM `sources_tb1`
        |WHERE `splittime` = "2021-07-11"
        |""".stripMargin)

    // print
    kudu_unit1_df.printSchema()
    kudu_unit1_df.show()

    // load of memory
    kudu_unit1_df.createOrReplaceTempView("v2")
  }

  def insertHive(ss: SparkSession): Unit = {
    // create table
    ss.sql(
      """
        |USE `bakup_db`
        |""".stripMargin)

    ss.sql(
      """
        |  CREATE TABLE IF NOT EXISTS `bak_tb1`(
        |   `id` int,
        |   `packtimestr` string,
        |   `dcs_name` string,
        |   `dcs_type` string,
        |   `dcs_value` string,
        |   `dcs_as` string,
        |   `dcs_as2` string)
        | PARTITIONED BY (
        |   `splittime` string)
        |""".stripMargin)
    println("创建表成功!")

    // create view
    ss.sql(
      """
        |INSERT INTO `bakup_db`
        |SELECT * FROM bak_tb1
        |""".stripMargin)
    println("保存成功!")
  }

  def main(args: Array[String]): Unit = {
    //get ss
    val ss = SparkUnit.getLocal("KTV", true)
    // 做动态分区, 所以要先设定partition参数
    // default是false, 需要额外下指令打开这个开关
    ss.sqlContext.setConf("hive.exec.dynamic.partition;","true");
    ss.sqlContext.setConf("hive.exec.dynamic.partition.mode","nonstrict");

    // 调用方法
    getKuduTableDataFrame(ss)
    insertHive(ss)

    // 关闭连接
    SparkUnit.stopSs(ss)
  }
}

运行:

运行时请将hive的配置文件 hive-site.xml文件,复制到项目resource下。

hue查看写入的数据:

略

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/455451.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

探索考古文字场景,基于YOLOv8全系列【n/s/m/l/x】参数模型开发构建文本考古场景下的甲骨文字符图像检测识别系统

甲骨文是一种非常历史悠久的古老文字,在前面我们基本上很少有涉及这块的内容,最近正好在做文字相关的项目开发研究,就想着基于甲骨文的场景来开发对应的检测识别系统,在前文中我们基于YOLOv5、YOLOv7和YOLOv9开发构建了在仿真数据…

c++:类和对象中:拷贝构造和赋值运算符重载详解

c:类和对象 构造函数和析构函数详解 文章目录 c:类和对象构造函数和析构函数详解 前言一、拷贝构造怎么写拷贝构造1.拷贝构造也是构造函数的一种,构造函数没有值.所以拷贝构造也没有返回值**2.拷贝构造只有一个形参,正常这个形参是自定义类型对象的引用.3. 如果我们没有显示写…

Excel判断CD两列在EF两列的列表中是否存在

需求 需要将CD两列的ID和NAME组合起来,查询EF两列的ID和NAME组合起来的列表中是否存在? 比如,判断第二行的“123456ABC”在EF的第二行到第四行中是否存在,若存在则显示Y,不存在则显示N 实现的计算公式 IF(ISNUMBER…

文字弹性跳动CSS3代码

文字弹性跳动CSS3代码,源码由HTMLCSSJS组成,记事本打开源码文件可以进行内容文字之类的修改,双击html文件可以本地运行效果,也可以上传到服务器里面,重定向这个界面 下载地址 文字弹性跳动CSS3代码

YOLOv9实例分割教程|(一)训练教程

专栏介绍:YOLOv9改进系列 | 包含深度学习最新创新,主力高效涨点!!! 一、创建数据集及数据配置文件 创新一个文件夹存放分割数据集,包含一个images和labels文件夹。标签格式如下所示: 创新数据集…

Linux——文件系统与软硬链接

目录 前言 一、物理上的磁盘 二、磁盘存储的逻辑抽象结构 三、块组内容与工作原理 1.inode 2.Data bolcks 3.inode 位图 4.bolck bitmap 5.GDT 6.Super Block 四、软硬链接 五、软硬链接链接目录 前言 我们之前学习了文件标识符,重定向,缓…

Spring框架----AOP全集

一:AOP概念的引入 首先我们来看一下登录的原理 如上图所示这是一个基本的登录原理图,但是如果我们想要在这个登录之上添加一些新的功能,比如权限校验 那么我们能想到的就有两种方法: ①:通过对源代码的修改实现 ②&a…

Android实现点击获取验证码60秒后重新获取功能

本文实例为大家分享了Android实现点击获取验证码60秒后重新获取的具体代码,供大家参考,具体内容如下 上代码 /*** Created by Xia_焱 on 2017/5/7.*/public class CountDownTimerUtils extends CountDownTimer {private TextView mTextView;/*** param…

Requests教程-17-请求代理设置

上一小节我们学习了requests解决乱码的方法,本小节我们讲解一下requests设置代理的方法。 代理基本原理 代理实际上指的就是代理服务器, 英文叫作proxy server ,它的功能是代理网络用户去取得网络信息。形象地说,它是网络信息的中…

Android Studio入门——页面跳转

1.工程目录 2.MainActivity package com.example.demo01;import android.content.Intent; import android.os.Bundle; import android.view.View; import android.widget.TextView;import androidx.appcompat.app.AppCompatActivity;public class MainActivity extends AppCo…

前端开发小技巧 - 【Vue3 + TS】 - 在 TS + Vue3 中使用 Pinia,实现 Pinia 的持久化,优化Pinia(仓库统一管理)

前言 ts 中使用 pinia 和 Vue3 基本一致,唯一的不同点在于,需要根据接口文档给 state 标注类型,也要给 actions 标注类型;以下都是 组合式API 的写法,选项式API 的写法大家可以去官网看看; Pinia&#xff…

Text-to-SQL 工具Vanna进阶|数据库对话机器人的多轮对话

跟数据库对话机器人对话,我可不止一个问题。 可能基于第一句问话,还有第二句、第三句问话。。。第N句对话。所以本文测试了多轮对话功能。 单轮对话的环境搭建参考博客 Text-to-SQL 工具Vanna + MySQL本地部署 | 数据库对话机器人 我的数据是这样 1. 基础配置 import vann…

深入解析Java中锁机制以及底层原理

一、概述 1.1 背景 概念:锁是多线程编程中的机制,用于控制对共享资源的访问。可以防止多个线程同时修改或读取共享资源,从而保证线程安全。 作用:锁用于实现线程间的互斥和协调,确保在多线程环境下对共享资源的访问顺…

前端性能优化——javascript

优化处理: 讲javascript脚本文件放到body标记的后面 减少页面当中所包含的script标记的数量 课堂练习: 脚本优化处理 使用原生JavaScript完成操作过程。 document.querySelector document.querySelectorAll classList以及类的操作API Element.class…

LLM之RAG实战(二十九)| 探索RAG PDF解析

对于RAG来说,从文档中提取信息是一种不可避免的场景,确保从源文件中提取出有效的内容对于提高最终输出的质量至关重要。 文件解析过程在RAG中的位置如图1所示: 在实际工作中,非结构化数据比结构化数据丰富得多。如果这些海量数据无…

记录西门子:SCL设置不同顺序

一台搅拌的设备,需要控制三种料的进料顺序和进料重量,顺序和重量可以随便设定,也可以是几十种料。触摸屏上面有A、B、C三种液体原料,需要设定三种液体原料重量,并设定序号。 假设如下面所示设定:那将先打开…

java数据结构与算法刷题-----LeetCode90. 子集 II

java数据结构与算法刷题目录(剑指Offer、LeetCode、ACM)-----主目录-----持续更新(进不去说明我没写完):https://blog.csdn.net/grd_java/article/details/123063846 文章目录 解题思路:时间复杂度O( n 2 ∗ n n^2*n n2∗n),空间复杂度O(n) 7…

开箱机自动化装箱技术:原理、应用与未来趋势

在物流、电商等行业中,开箱机作为自动化装箱技术的核心设备,正逐渐改变着传统的包装方式,为企业带来了效率和成本的双重优化。星派将从开箱机的原理出发,详细解析其应用领域,并展望未来的发展趋势。 一、开箱机的工作原…

《高效便捷,探索快递柜系统架构的智慧之路》

随着电商业务的蓬勃发展,快递柜系统作为一种高效、便捷的最后一公里配送解决方案,正在受到越来越多企业和消费者的青睐。本篇博客将深入探讨快递柜系统的架构设计理念、优势和实践,帮助读者了解如何构建智能化的快递柜系统,提升物…

【UE5】非持枪状态蹲姿移动的动画混合空间

项目资源文末百度网盘自取 在BlendSpace文件夹中单击右键选择动画(Animation)中的混合空间(Blend Space) ,选择SK_Female_Skeleton,命名为BS_NormalCrouch 打开BS_NormalCrouch 水平轴表示角色的方向,命名为Direction,方向的最…