离线数仓数据导出-hive数据同步到mysql

离线数仓数据导出-hive数据同步到mysql

  • MySQL建库建表
  • 数据导出

为方便报表应用使用数据,需将ads各指标的统计结果导出到MySQL数据库中。
datax支持hive同步MySQL:仅仅支持hive存储的hdfs文件导出。所以reader选hdfs-reader,writer选mysql-writer。

在这里插入图片描述
null值 在hive和mysql里的存储格式不一样,需要告诉DataX应该如何转换。
在这里插入图片描述

MySQL建库建表

12.1.1 创建数据库

CREATE DATABASE IF NOT EXISTS gmall_report DEFAULT CHARSET utf8 COLLATE utf8_general_ci;

建mysql表的,
1字段个数要和hive中的ads层数据保持一致,
2字段类型要和hive对等替换,
3字段顺序也要一致
每张表要有主键

1)各活动补贴率
dt activity_id activity_name 三个主键联合而成

DROP TABLE IF EXISTS `ads_activity_stats`;
CREATE TABLE `ads_activity_stats`  (
  `dt` date NOT NULL COMMENT '统计日期',
  `activity_id` varchar(16) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '活动ID',
  `activity_name` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '活动名称',
  `start_date` varchar(16) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '活动开始日期',
  `reduce_rate` decimal(16, 2) NULL DEFAULT NULL COMMENT '补贴率',
  PRIMARY KEY (`dt`, `activity_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '活动统计' ROW_FORMAT = Dynamic;

数据导出

DataX配置文件生成脚本
方便起见,此处提供了DataX配置文件批量生成脚本,脚本内容及使用方式如下。
1)在~/bin目录下创建gen_export_config.py脚本
[atguigu@hadoop102 bin]$ vim ~/bin/gen_export_config.py
脚本内容如下

# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb

#MySQL相关配置,需根据实际情况作出修改
mysql_host = "hadoop102"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "000000"

#HDFS NameNode相关配置,需根据实际情况作出修改
hdfs_nn_host = "hadoop102"
hdfs_nn_port = "8020"

#生成配置文件的目标路径,可根据实际情况作出修改
output_path = "/opt/module/datax/job/export"


def get_connection():
    return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)


def get_mysql_meta(database, table):
    connection = get_connection()
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall


def get_mysql_columns(database, table):
    return map(lambda x: x[0], get_mysql_meta(database, table))


def generate_json(target_database, target_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "path": "${exportdir}",
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        "column": ["*"],
                        "fileType": "text",
                        "encoding": "UTF-8",
                        "fieldDelimiter": "\t",
                        "nullFormat": "\\N"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "replace",
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": get_mysql_columns(target_database, target_table),
                        "connection": [
                            {
                                "jdbcUrl":
                                    "jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + target_database + "?useUnicode=true&characterEncoding=utf-8",
                                "table": [target_table]
                            }
                        ]
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([target_database, target_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    target_database = ""
    target_table = ""

    options, arguments = getopt.getopt(args, '-d:-t:', ['targetdb=', 'targettbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--targetdb'):
            target_database = opt_value
        if opt_name in ('-t', '--targettbl'):
            target_table = opt_value

    generate_json(target_database, target_table)

if __name__ == '__main__':
    main(sys.argv[1:])

在~/bin目录下创建gen_export_config.sh脚本
[atguigu@hadoop102 bin]$ vim ~/bin/gen_export_config.sh
脚本内容如下。

#!/bin/bash

python ~/bin/gen_export_config.py -d gmall_report -t ads_activity_stats
python ~/bin/gen_export_config.py -d gmall_report -t ads_coupon_stats
python ~/bin/gen_export_config.py -d gmall_report -t ads_new_buyer_stats
python ~/bin/gen_export_config.py -d gmall_report -t ads_order_by_province
python ~/bin/gen_export_config.py -d gmall_report -t ads_page_path
python ~/bin/gen_export_config.py -d gmall_report -t ads_repeat_purchase_by_tm
python ~/bin/gen_export_config.py -d gmall_report -t ads_sku_cart_num_top3_by_cate
python ~/bin/gen_export_config.py -d gmall_report -t ads_trade_stats
python ~/bin/gen_export_config.py -d gmall_report -t ads_trade_stats_by_cate
python ~/bin/gen_export_config.py -d gmall_report -t ads_trade_stats_by_tm
python ~/bin/gen_export_config.py -d gmall_report -t ads_traffic_stats_by_channel
python ~/bin/gen_export_config.py -d gmall_report -t ads_user_action
python ~/bin/gen_export_config.py -d gmall_report -t ads_user_change
python ~/bin/gen_export_config.py -d gmall_report -t ads_user_retention
python ~/bin/gen_export_config.py -d gmall_report -t ads_user_stats

3)为gen_export_config.sh脚本增加执行权限
[atguigu@hadoop102 bin]$ chmod +x ~/bin/gen_export_config.sh
4)执行gen_export_config.sh脚本,生成配置文件
[atguigu@hadoop102 bin]$ gen_export_config.sh
5)观察生成的配置文件

[atguigu@hadoop102 bin]$ ls /opt/module/datax/job/export/

编写每日导出脚本
(1)在hadoop102的/home/atguigu/bin目录下创建hdfs_to_mysql.sh
[atguigu@hadoop102 bin]$ vim hdfs_to_mysql.sh
(2)编写如下内容


#! /bin/bash

DATAX_HOME=/opt/module/datax

#DataX导出路径不允许存在空文件,该函数作用为清理空文件
handle_export_path(){
  target_file=$1
  for i in `hadoop fs -ls -R $target_file | awk '{print $8}'`; do
    hadoop fs -test -z $i
    if [[ $? -eq 0 ]]; then
      echo "$i文件大小为0,正在删除"
      hadoop fs -rm -r -f $i
    fi
  done

}


#数据导出
export_data() {
  datax_config=$1
  export_dir=$2
  hadoop fs -test -e $export_dir
  if [[ $? -eq 0 ]]
  then
    handle_export_path $export_dir
    file_count=$(hadoop fs -ls $export_dir | wc -l)
    if [ $file_count -gt 0 ]
    then
      set -e;
      $DATAX_HOME/bin/datax.py -p"-Dexportdir=$export_dir" $datax_config
      set +e;
    else 
      echo "$export_dir 目录为空,跳过~"
    fi
  else
    echo "路径 $export_dir 不存在,跳过~"
  fi
}


case $1 in
  "ads_new_buyer_stats")
    export_data /opt/module/datax/job/export/gmall_report.ads_new_buyer_stats.json /warehouse/gmall/ads/ads_new_buyer_stats
  ;;
  "ads_order_by_province")
    export_data /opt/module/datax/job/export/gmall_report.ads_order_by_province.json /warehouse/gmall/ads/ads_order_by_province
  ;;
  "ads_page_path")
    export_data /opt/module/datax/job/export/gmall_report.ads_page_path.json /warehouse/gmall/ads/ads_page_path
  ;;
  "ads_repeat_purchase_by_tm")
    export_data /opt/module/datax/job/export/gmall_report.ads_repeat_purchase_by_tm.json /warehouse/gmall/ads/ads_repeat_purchase_by_tm
  ;;
  "ads_trade_stats")
    export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats.json /warehouse/gmall/ads/ads_trade_stats
  ;;
  "ads_trade_stats_by_cate")
    export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_cate.json /warehouse/gmall/ads/ads_trade_stats_by_cate
  ;;
  "ads_trade_stats_by_tm")
    export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_tm.json /warehouse/gmall/ads/ads_trade_stats_by_tm
  ;;
  "ads_traffic_stats_by_channel")
    export_data /opt/module/datax/job/export/gmall_report.ads_traffic_stats_by_channel.json /warehouse/gmall/ads/ads_traffic_stats_by_channel
  ;;
  "ads_user_action")
    export_data /opt/module/datax/job/export/gmall_report.ads_user_action.json /warehouse/gmall/ads/ads_user_action
  ;;
  "ads_user_change")
    export_data /opt/module/datax/job/export/gmall_report.ads_user_change.json /warehouse/gmall/ads/ads_user_change
  ;;
  "ads_user_retention")
    export_data /opt/module/datax/job/export/gmall_report.ads_user_retention.json /warehouse/gmall/ads/ads_user_retention
  ;;
  "ads_user_stats")
    export_data /opt/module/datax/job/export/gmall_report.ads_user_stats.json /warehouse/gmall/ads/ads_user_stats
  ;;
  "ads_activity_stats")
    export_data /opt/module/datax/job/export/gmall_report.ads_activity_stats.json /warehouse/gmall/ads/ads_activity_stats
  ;;
  "ads_coupon_stats")
    export_data /opt/module/datax/job/export/gmall_report.ads_coupon_stats.json /warehouse/gmall/ads/ads_coupon_stats
  ;;
  "ads_sku_cart_num_top3_by_cate")
    export_data /opt/module/datax/job/export/gmall_report.ads_sku_cart_num_top3_by_cate.json /warehouse/gmall/ads/ads_sku_cart_num_top3_by_cate
  ;;

"all")
  export_data /opt/module/datax/job/export/gmall_report.ads_new_buyer_stats.json /warehouse/gmall/ads/ads_new_buyer_stats
  export_data /opt/module/datax/job/export/gmall_report.ads_order_by_province.json /warehouse/gmall/ads/ads_order_by_province
  export_data /opt/module/datax/job/export/gmall_report.ads_page_path.json /warehouse/gmall/ads/ads_page_path
  export_data /opt/module/datax/job/export/gmall_report.ads_repeat_purchase_by_tm.json /warehouse/gmall/ads/ads_repeat_purchase_by_tm
  export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats.json /warehouse/gmall/ads/ads_trade_stats
  export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_cate.json /warehouse/gmall/ads/ads_trade_stats_by_cate
  export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_tm.json /warehouse/gmall/ads/ads_trade_stats_by_tm
  export_data /opt/module/datax/job/export/gmall_report.ads_traffic_stats_by_channel.json /warehouse/gmall/ads/ads_traffic_stats_by_channel
  export_data /opt/module/datax/job/export/gmall_report.ads_user_action.json /warehouse/gmall/ads/ads_user_action
  export_data /opt/module/datax/job/export/gmall_report.ads_user_change.json /warehouse/gmall/ads/ads_user_change
  export_data /opt/module/datax/job/export/gmall_report.ads_user_retention.json /warehouse/gmall/ads/ads_user_retention
  export_data /opt/module/datax/job/export/gmall_report.ads_user_stats.json /warehouse/gmall/ads/ads_user_stats
  export_data /opt/module/datax/job/export/gmall_report.ads_activity_stats.json /warehouse/gmall/ads/ads_activity_stats
  export_data /opt/module/datax/job/export/gmall_report.ads_coupon_stats.json /warehouse/gmall/ads/ads_coupon_stats
  export_data /opt/module/datax/job/export/gmall_report.ads_sku_cart_num_top3_by_cate.json /warehouse/gmall/ads/ads_sku_cart_num_top3_by_cate
  ;;
esac

(3)增加脚本执行权限
[atguigu@hadoop102 bin]$ chmod +x hdfs_to_mysql.sh
(4)脚本用法
[atguigu@hadoop102 bin]$ hdfs_to_mysql.sh all

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/558950.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

架构师系列-搜索引擎ElasticSearch(十)- 索引别名及重建

索引别名 别名,有点类似数据库的视图,别名一般都会和一些过滤条件相结合,可以做到即使是同一个索引上,让不同人看到不同的数据。 别名的作用 在开发中,一般随着业务需求的迭代,较老的业务逻辑就要面临更新…

架构设计-权限系统之通用的权限系统设计方案

一个系统,如果没有安全控制,是十分危险的,一般安全控制包括身份认证和权限管理。用户访问时,首先需要查看此用户是否是合法用户,然后检查此用户可以对那些资源进行何种操作,最终做到安全访问。身份认证的方…

Junit 基础-ApiHug准备-测试篇-009

🤗 ApiHug {Postman|Swagger|Api...} 快↑ 准√ 省↓ GitHub - apihug/apihug.com: All abou the Apihug apihug.com: 有爱,有温度,有质量,有信任ApiHug - API design Copilot - IntelliJ IDEs Plugin | Marketplace 注解 J…

STM32直接存储器存取DMA

前提知识: 1、STM32F103内部存储器结构以及映射 STM32F103的程序存储器、数据存储器、寄存器和IO端口被组织在同一个4GB的线性地址空间内。数据字节以小端模式存放在存储器中。即低地址中存放的是字数据的低字节,高地址中存放的是字数据的高字节 可访问…

React间接实现一个动态组件逻辑

在开发一个浏览器插件的时候,用的plasmo框架和react支持的,里面使用react开发一个菜单功能,但是又不想使用react-router,所以就想着能不能使用一个很简单的方式做一个替代方案?那肯定是可以。 我在引入一个组件后&…

C语言 | Leetcode C语言题解之第40题组合总和II

题目: 题解: int** ans; int* ansColumnSizes; int ansSize;int* sequence; int sequenceSize;int** freq; int freqSize;void dfs(int pos, int rest) {if (rest 0) {int* tmp malloc(sizeof(int) * sequenceSize);memcpy(tmp, sequence, sizeof(int…

Golang | Leetcode Golang题解之第37题解数独

题目: 题解: func solveSudoku(board [][]byte) {var line, column [9][9]boolvar block [3][3][9]boolvar spaces [][2]intfor i, row : range board {for j, b : range row {if b . {spaces append(spaces, [2]int{i, j})} else {digit : b - 1line…

如何实现外网访问内网ip?公网端口映射或内网映射来解决

本地搭建服务器应用,在局域网内可以访问,但在外网不能访问。如何实现外网访问内网ip?主要有两种方案:路由器端口映射和快解析内网映射。根据自己本地网络环境,结合是否有公网IP,是否有路由权限,…

Vast+产品展厅 | Vastbase G100数据库是什么架构?(1)

Vastbase G100是海量数据融合了多年对各行业应用场景的深入理解,基于openGauss内核开发的企业级关系型数据库。 了解Vastbase G100的架构,可以帮助您确保数据库系统的高效、可靠和安全运行。 “Vast产品展厅”将分两期,为您详细讲解Vastbas…

Excel模板导入、导出工具类

1.引入maven依赖&#xff0c;利用hutool的excel读取 Hutool-poi对excel读取、写入 <dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.16</version></dependency> <depen…

手写Java设计模式之工厂模式,附源码解读

工厂模式&#xff08;Factory Pattern&#xff09;是 Java 中最常用的设计模式之一&#xff0c;这种类型的设计模式属于创建型模式&#xff0c;它提供了一种创建对象的最佳方式。 工厂模式提供了一种创建对象的方式&#xff0c;而无需指定要创建的具体类。 工厂模式属于创建型…

智慧园区引领产业智能化升级:科技创新驱动打造智慧化、高效化产业新未来

随着全球科技革命的深入推进&#xff0c;以大数据、云计算、物联网、人工智能等为代表的新一代信息技术正深刻改变着传统产业的发展模式。在这一背景下&#xff0c;智慧园区作为产业智能化升级的重要载体和平台&#xff0c;正以其前瞻性的规划、创新的科技和卓越的实践&#xf…

桥接模式【结构型模式C++】

1.概述 桥接模式是一种结构型设计模式&#xff0c;是用于把抽象化与实现化解耦&#xff0c;使得二者可以独立变化。这种类型的设计模式属于结构型模式&#xff0c;它通过提供抽象化和实现化之间的桥接结构&#xff0c;来实现二者的解耦。 这种模式涉及到一个作为桥接的接口&am…

完整、免费的把pdf转word文档

在线工具网 https://www.orcc.online/pdf 支持pdf转word&#xff0c;免费、完整、快捷 登录网站 https://orcc.online/pdf 选择需要转换的pdf文件&#xff1a; 等待转换完成 点击蓝色文件即可下载 无限制&#xff0c;完整转换。

C语言-浮点数在内存中的存储

目录 C语言-浮点数在内存中的存储 练习 浮点数的存储 浮点数存的过程 浮点数在内存的存储 浮点数取的过程 C语言-浮点数在内存中的存储 常见的浮点数&#xff1a;3.14149、1E10&#xff08;科学计数法表示形式&#xff1a;1.0*10^10&#xff09;等&#xff0c;浮点数家族…

数据赋能(62)——要求:数据管理部门能力

“要求&#xff1a;数据管理部门能力”是作为标准的参考内容编写的。 在实施数据赋能中&#xff0c;数据管理部门的能力体现在多个方面&#xff0c;关键能力如下图所示。 在实施数据赋能的过程中&#xff0c;数据管理部门应具备的关键能力如下。 数据治理与标准化能力 数据管…

数字技术重塑园区管理,数字园区开启智慧化新篇章,引领产业创新发展

目录 一、引言 二、数字技术重塑园区管理 1、智能化基础设施建设 2、数据驱动的决策管理 3、精细化服务模式 三、数字园区开启智慧化新篇章 1、智慧化运营管理 2、智慧化产业创新 3、智慧化生态环境 四、引领产业创新发展 1、推动传统产业转型升级 2、培育新兴产业…

A Geolocation Databases Study(2011年)第五部分:Evalution Model

下载地址:A Geolocation Databases Study | IEEE Journals & Magazine | IEEE Xplore 被引次数:195 Shavitt Y, Zilberman N. A geolocation databases study[J]. IEEE Journal on Selected Areas in Communications, 2011, 29(10): 2044-2056. 5. Discussion 在我们讨…

JavaSE——常用API进阶二(6/8)-ZoneId、ZoneDateTime、Instant(常见方法、用法示例)

目录 ZoneId 常见方法 用法示例 ZoneDateTime 常见方法 用法示例 Instant 常见方法 用法示例 如果在开发中我们有这样的需求&#xff1a;我们的系统需要获取美国现在的时间&#xff0c;或者其他地区的时间给用户观看&#xff0c;或者进行一些处理&#xff0c;那应该怎…
最新文章