实现HBase表和RDB表的转化(附Java源码资源)

实现HBase表和RDB表的转化

在这里插入图片描述
在这里插入图片描述

一、引入

转化为HBase表的三大来源:RDB Table、Client API、Files

在这里插入图片描述
如何构造通用性的代码模板实现向HBase表的转换,是一个值得考虑的问题。这篇文章着重讲解RDB表向HBase表的转换。

首先,我们需要分别构造rdb和hbase的对象,根据批处理的思想,我们可以考虑批量将rdb中的数据导出,并且转化为List<Put>的格式,直接导入HBase表中,最后释放资源,伪代码模板如下:

rdb=...
hbase=...
rdb.init();
hbase.init();
while(rdb.hasNextBatch()){
	List<Put> batch = rdb.nextBatch();
	hbase.putBatch(batch);
}
hbase.close();
rdb.close();

二、代码讲解

1. 目录结构

在这里插入图片描述

2. 具体实现
  • transfer.properties
    在这里插入图片描述

内含HBase和RDB转换所有配置信息的配置文件,因为该配置文件是在启动时就需要进行配置,因此我们需要按以下图片进行配置导入配置文件:
在这里插入图片描述

  1. Run/Debug Configurations中,新建一个Application
  2. 配置好主类
  3. 配置好配置文件的具体路径
  • RDB 接口
public interface RDB extends Com {
    // 要提升性能,需要使用批处理
    boolean hasNextBatch() throws SQLException;// 是否存在下一个批次
    List<Put> nextBatch() throws SQLException;// 一个put代表往一个hbase表的一行的一个列族的一个列插入一条数据,对Hbase来说,批次就是List<Put>
}
  • RDB 实现类
public class RDBImpl implements RDB {
    private static Logger logger = Logger.getLogger(RDBImpl.class);
    // JDBC 的基本元素:连接对象(装载[驱动]、[URL]、[账号]、[密码])->执行对象(SQL语句)->结果集
    private Properties config;
    /**
     * 它们需要设置成全局变量的原因是它们需要共享
     */
    private Connection con;
    private PreparedStatement pst;
    private ResultSet rst;
    // 定义每个批次处理的记录数的最大数量
    private int batchSize;
    // hbase的行键对应rdb的列的列名
    private String hbaseRowKeyRdbCol;
    private Map<String,Map<String,String>> hbaseRdbColMapping;

    // RDB配置可以灵活地从外部传入(构造方法),从内部读取(config())
    public RDBImpl(Properties config) {
        this.config = config;
    }

    @Override
    public Properties config() {
        return config;
    }

    /**
     * 内部资源初始化
     */
    @Override
    public void init() throws Exception{
        con = getConnection();
        logger.info("RDB 创建 [ 连接 ] 对象成功");
        pst = getStatement(con);
        logger.info("RDB 创建 [ 执行 ] 对象成功");
        rst = getResult(pst);
        logger.info("RDB 创建 [ 结果集 ] 成功");
        batchSize = batchSize();
        hbaseRdbColMapping = hbaseRdbColumnsMapping();
    }

    @Override
    public void close() {
        closeAll(rst,pst,con);
    }


    private String driver(){
        return checkAndGetConfig("rdb.driver");
    }

    private String url(){
        return checkAndGetConfig("rdb.url");
    }

    private String username(){
        return checkAndGetConfig("rdb.username");
    }

    private String password(){
        return checkAndGetConfig("rdb.password");
    }

    private String sql(){
        return checkAndGetConfig("rdb.sql");
    }

    private int batchSize(){
        return Integer.parseInt(checkAndGetConfig("rdb.batchSize"));
    }

    // java.sql下的Connection
    private Connection getConnection() throws ClassNotFoundException, SQLException {
        // 装载驱动
        Class.forName(driver());
        // 获取并返回连接对象
        return DriverManager.getConnection(url(),username(),password());
    }
    private PreparedStatement getStatement(Connection con) throws SQLException {
        return con.prepareStatement(sql());
    }
    private ResultSet getResult(PreparedStatement statement) throws SQLException {
        return statement.executeQuery();
    }
    /**
     * hbase 列族和列与rdb中列的映射关系
     *             hbase列族   hbase列  rdb列
     * @return Map<String,Map<String,String>>
     */
    private Map<String, Map<String,String>> hbaseRdbColumnsMapping(){
        String mapping = checkAndGetConfig("rdb.hbase.columns.mapping");
        Map<String,Map<String,String>> map = new HashMap<>();
        String[] pss = mapping.split(",");
        for(String ps : pss){
            String[] pp = ps.split("->");
            String[] p = pp[0].split(":");
            String rdbCol = pp[1],hbaseColFamily,hbaseColName;
            if(p.length==1){
                hbaseRowKeyRdbCol = pp[1];
            }else {
                hbaseColFamily = p[0];
                hbaseColName = p[1];
                if(!map.containsKey(hbaseColFamily)){
                    map.put(hbaseColFamily,new HashMap<>());
                }
                map.get(hbaseColFamily).put(hbaseColName,rdbCol);
            }
        }
        return map;
    }

    /**
     * 将RDB的列转化为字节数组(需要确定列的数据类型)
     * @param rdbColumn
     * @return
     * @throws SQLException
     */

    private byte[] toBytesFromRdb(String rdbColumn) throws SQLException {
        Object obj = rst.getObject(rdbColumn);
        if(obj instanceof String){
            return Bytes.toBytes((String)obj);
        } else if(obj instanceof Float){
            return Bytes.toBytes(((Float)obj).floatValue());
        } else if(obj instanceof Double){
            return Bytes.toBytes(((Double)obj).doubleValue());
        } else if(obj instanceof BigDecimal){
            return Bytes.toBytes((BigDecimal)obj);
        } else if(obj instanceof Short){
            return Bytes.toBytes(((Short) obj).shortValue());
        } else if(obj instanceof Integer){
            return Bytes.toBytes(((Integer)obj).intValue());
        } else if(obj instanceof Boolean){
            return Bytes.toBytes((Boolean)((Boolean) obj).booleanValue());
        } else {
            throw new SQLException("HBase不支持转化为字节数组的类型:"+obj.getClass().getName());
        }
    }

    /**
     * 将HBase的列名或列族名转化为字节数组
     * @param name
     * @return
     */
    private byte[] toBytes(String name){
        return Bytes.toBytes(name);
    }

    // 最后一个批次的数据最少有一条
    @Override
    public boolean hasNextBatch() throws SQLException{
        return rst.next();
    }

    @Override
    public List<Put> nextBatch() throws SQLException{
        // 预先分配容量
        List<Put> list = new ArrayList<>(batchSize);
        int count = 0;
        do{
            /**
             * 如何将一行解析为多个put(结合配置文件)
             * 对每条数据,创建一个带行键的put,向put中放入HBase列族名,HBase列名,RDB列名
             */
            Put put = new Put(toBytesFromRdb(hbaseRowKeyRdbCol));
            for (Map.Entry<String, Map<String, String>> e : hbaseRdbColMapping.entrySet()) {
                String columnFamily = e.getKey();
                for (Map.Entry<String, String> s : e.getValue().entrySet()) {
                    String hbaseColumn = s.getKey();
                    String rdbColumn = s.getValue();
                    // 需要将内容转变为字节数组传入方法
                    put.addColumn(toBytes(columnFamily),toBytes(hbaseColumn),toBytesFromRdb(rdbColumn));
                }
            }
            list.add(put);
        }while(++count<batchSize && rst.next());
        return list;
    }

}

如何理解一行转化为多个put?
在这里插入图片描述
结果集的实质?
在这里插入图片描述
rst.next() 的两个作用

rst.next();
// 1.判定是否存在下一个有效行
// 2.若存在下一个有效行,则指向该有效行

a. 只通过config作为参数构造rdb
b. 以JDBC为核心,需要连接对象(驱动,URL,账号,密码)=>执行对象(SQL)=>结果集,这些都需要被设计为全局变量(因为需要被共享)
c. 既实现了RDB接口,还实现了RDB的继承接口Com中的init()、close()进行资源的初始化和释放,checkAndGetConfig()根据传入的配置文件获取配置信息并且赋值给全局变量。
d. 重点:我们还需要对RDB和HBase的映射关系进行解析,最终解析出RDB列名,HBase列族名,HBase列名,具体如何解析参考配置文件transfer.properties,并将解析出来的名字构造成一个Put对象,由于构造Put对象只能放字节数组,所以需要转化为字节数组的方法,又因为解析RDB的列名需要考虑列的数据类型,而解析HBase的列族或列名不需要考虑,因此需要有两个转换方法==ToBytesFromRDB()和ToBytes()==分别实现两种情况的字节数组转化。

  • HBase接口
public interface HBase extends Com {
    // RDBImpl的nextBatch()返回的就是List<Put>,直接放入HBase表即可。
    void putBatch(List<Put> batch) throws IOException;
}
  • HBase实现类
public class HBaseImpl implements HBase {
    private static Logger loggerHBase = Logger.getLogger(HBaseImpl.class);
    private Properties config;
    private Connection con;
    private Table hbaseTable;


    public HBaseImpl(Properties config) {
        this.config = config;
    }

    @Override
    public Properties config() {
        return config;
    }

    @Override
    public void init() throws Exception {
        con = getCon();
        loggerHBase.info("HBase 创建 [ 连接 ] 成功");
        hbaseTable = checkAndGetTable(con);
        loggerHBase.info("HBase 创建 [ 数据表 ] 成功");
    }

    @Override
    public void close() {
        closeAll(hbaseTable,con);
    }

    private String tableName(){
        return checkAndGetConfig("hbase.table.name");
    }
    private String zkUrl(){
        return checkAndGetConfig("hbase.zk");
    }

    private Connection getCon() throws IOException {
        // hadoop.conf的configuration
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum",zkUrl());
        return ConnectionFactory.createConnection(config);
    }

    private Table checkAndGetTable(Connection con) throws IOException {
        /**
         * Admin : HBase DDL
         */
        Admin admin = con.getAdmin();
        TableName tableName = TableName.valueOf(tableName());
        // 通过tableName判定表是否存在
        if(!admin.tableExists(tableName)){
            throw new IOException("HBase表不存在异常:"+tableName);
        }
        /**
         * Table : HBase DML & DQL
         */
        // 传入的参数可以是TableName tableName,ExecutorService pool(表操作可以并发)
        return con.getTable(tableName);
    }

    @Override
    public void putBatch(List<Put> batch) throws IOException{
        hbaseTable.put(batch);
    }
}

HBase的实现类和RDB的实现类也非常类似:
先重写HBase接口中的方法和Com接口中的方法,发现往里放数据需要构造一个Table对象,而Table对象的构建需要一个连接对象和TableName,因此在构造了两个方法tableName()获取配置信息中的TableName(注意:此时的TableName是字符串类型),zkUrl()获取zk.url作为配置构造连接对象。

  • Com接口
public interface Com {
    Logger logger = Logger.getLogger(Com.class);
    // 获取配置对象
    Properties config();

    // 初始化资源
    void init() throws Exception;

    // 释放资源
    void close();

    default String checkAndGetConfig(String key){
        if(!config().containsKey(key)){
            // 因为该方法可能被用于HBase和RDB
            throw new RuntimeException("配置项缺失异常:"+key);
        }
        String item = config().getProperty(key);
        logger.info(String.format("获取配置项 %s : %s",key,item));
        return item;
    }

    default void closeAll(AutoCloseable...acs){
        for (AutoCloseable ac : acs) {
            if (Objects.nonNull(ac)) {
                try {
                    ac.close();
                    logger.info(String.format("释放 %s 成功",ac.getClass().getName()));
                } catch (Exception e) {
                    logger.error("释放资源异常:"+e);
                }
            }
        }
    }
}

在Com接口中,设计了一些普通方法config()实现配置的导出,init()、close()资源的初始化和关闭;同样还设计了一些无需实现的默认方法便于实现init()和close()方法。这些方法适用于RDB和HBase的实现类。

  • RDBToHBase接口
public interface RDBToHBase {
    // 创建一个RDB对象
    void setRDB(RDB rdb);
    // 创建一个HBase对象
    void setHBase(HBase hbase);
    // 进行数据的传输
    void startTransfer();
}
  • RDBToHBase实现类
public class RDBToHBaseImpl implements RDBToHBase {
    // 日志显示
    private static Logger loggerRH = Logger.getLogger(RDBToHBaseImpl.class);
    private RDB rdb;
    private HBase hbase;

    @Override
    public void setRDB(RDB rdb) {
        this.rdb = rdb;
    }

    @Override
    public void setHBase(HBase hbase) {
        this.hbase = hbase;
    }

    @Override
    public void startTransfer() {
        try {
            rdb.init();
            loggerRH.info("RDB 初始化成功");
            hbase.init();
            loggerRH.info("HBase 初始化成功");
            loggerRH.info("数据从 RDB 迁移至 HBase 开始...");
            int count = 0;
            while (rdb.hasNextBatch()) {
                final List<Put> batch = rdb.nextBatch();
                hbase.putBatch(batch);
                loggerRH.info(String.format("第 %d 批:%d 条数据插入成功",++count,batch.size()));
            }
            loggerRH.info("数据从 RDB 迁移至 HBase 结束...");
        } catch (Exception e){
            loggerRH.error("将 RDB 数据批量迁移至 HBase 异常",e);
        } finally{
            hbase.close();
            rdb.close();
        }
    }
}
  • AppRDBToHBase 实现类
public class AppRDBToHBase
{
    private static Logger logger = Logger.getLogger(AppRDBToHBase.class);
    private static void start(String[] args){
        try {
            if (Objects.isNull(args) || args.length == 0 || Objects.isNull(args[0])) {
                throw new NullPointerException("配置文件路径空指针异常");
            }
            final String PATH = args[0];
            final File file = new File(PATH);
            if (!file.exists() || file.length() == 0 || !file.canRead()) {
                throw new IOException("配置文件不存在、不可读、空白");
            }
            Properties config = new Properties();
            // final String path = args[0];
            config.load(new FileReader(file));

            RDB rdb = new RDBImpl(config);
            HBase hBase = new HBaseImpl(config);
            RDBToHBase rdbToHBase = new RDBToHBaseImpl();
            rdbToHBase.setRDB(rdb);
            rdbToHBase.setHBase(hBase);
            rdbToHBase.startTransfer();
        }catch(Exception e){
            logger.error("配置异常",e);
        }
    }
    public static void main( String[] args ) {
        start(args);
    }
}

对于传入的配置文件路径,既要检查路径本身,也要检查路径代表的文件本身。
通过流的方式将文件进行配置,并且利用该配置构造RDB和HBase并进行数据的传输

其他:日志文件系统Log.4j的应用
  • 准备:需要在Resources模块下配置log4j.properties文件
  • 注意:
    • 日志文件信息的输出方式有三种logger.error()、logger.info()、logger.warn() ,除了对错误信息进行输出之外,也要习惯于补充正常信息的输出,以增强代码的可读性。
    • log.4j除了在控制台打印日志信息之外,还能在磁盘下的日志文件中打印日志信息,因此在导入log4j.properties文件之后需要修改日志文件的路径。
    • 对于不同类或接口下的logger,需要注意进行名字的区分。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/462606.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JUnit 面试题及答案整理,最新面试题

JUnit中的断言&#xff08;Assert&#xff09;有哪些类型&#xff1f; JUnit提供了多种断言类型来帮助测试代码的正确性。常见的断言类型包括&#xff1a; 1、assertEquals&#xff1a; 用于检查两个值是否相等。如果不相等&#xff0c;测试失败。 2、assertTrue和assertFal…

Midjourney订阅攻略

订阅Midjourney的攻略主要包括以下几个步骤&#xff1a; 访问Midjourney官网&#xff1a;首先&#xff0c;你需要访问Midjourney的官方网站。你可以通过搜索引擎找到它&#xff0c;或者通过社交媒体、论坛等渠道获取官方网站链接。了解订阅选项&#xff1a;在官网上&#xff0…

HarmonyOS NEXT应用开发—视频全屏切换案例

介绍 本示例介绍了Video组件和ohos.window接口实现媒体全屏的功能。 该场景多用于首页瀑布流媒体播放等。 效果图预览 使用说明&#xff1a; 点击全屏按钮&#xff0c;横屏媒体窗口。点击恢复窗口按钮&#xff0c;恢复媒体窗口。 实现步骤 在Video组件内调用 onFullscreen…

并发编程之join 方法的详细解析

3.8 join 方法详解 为什么需要 join 下面的代码执行&#xff0c;打印 r 是什么&#xff1f; static int r 0; public static void main(String[] args) throws InterruptedException {test1(); } private static void test1() throws InterruptedException {log.debug(&quo…

CSS学习(2)-盒子模型

1. CSS 长度单位 px &#xff1a;像素。em &#xff1a;相对元素 font-size 的倍数。rem &#xff1a;相对根字体大小&#xff0c;html标签就是根。% &#xff1a;相对父元素计算。 注意&#xff1a; CSS 中设置长度&#xff0c;必须加单位&#xff0c;否则样式无效&#xff…

《JAVA与模式》之简单工厂模式

系列文章目录 文章目录 系列文章目录前言一、简单工厂模式二、简单工厂模式的优缺点前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看懂了就去分享给你的码吧。 在阎宏博士的《JAVA与模式》一…

如何在IDEA 中设置背景图片

在IDEA 中设置背景图片&#xff0c;可以按照以下步骤操作&#xff1a; 1、打开 IntelliJ IDEA 软件&#xff0c;进入代码编辑主界面。 点击编辑窗口上方的“File”菜单项。 2、在下拉子菜单中&#xff0c;选择“Settings”选项&#xff08;如果你使用的是 macOS&#xff0c;可…

双指针、bfs与图论

1238. 日志统计 - AcWing题库 import java.util.*;class PII implements Comparable<PII>{int x, y;public PII(int x, int y){this.x x;this.y y;}public int compareTo(PII o){return Integer.compare(x, o.x);} }public class Main{static int N 100010, D, K;st…

Android 面试题及答案整理,最新面试题

Android中Intent的作用及分类 Intent在Android中用作组件之间传递信息&#xff0c;它可以用于启动活动(activity)、服务(service)和发送广播(broadcast)。Intent主要分为显式Intent和隐式Intent两大类。 1、显式Intent&#xff1a; 直接指定了要启动的组件名称&#xff08;比如…

Lua中文语言编程源码-第一节,更改llex.c词法分析器模块, 使Lua支持中文关键词。

源码已经更新在CSDN的码库里&#xff1a; git clone https://gitcode.com/funsion/CLua.git 在src文件夹下的llex.c&#xff0c;是Lua的词法分析器模块。 增加中文保留字标识符列表&#xff0c;保留英文保留字标识符列表。 搜索“ORDER RESERVED”&#xff0c;将原始代码 …

嵌入式硬件设计(一)|利用 NodeMCU-ESP8266 开发板和继电器结合APP“点灯•blinker”制作Wi-Fi智能开关(附有关硬件详细资料)

概述 本文主要讲述利用 NodeMCU-ESP8266 开发板和继电器通过手机 APP “ 点灯 • Blinker ” 制作一款能够由手机控制的WiFi 智能开关&#xff0c;从而实现智能物联。NodeMCU 是基于 Lua 的开源固件&#xff0c;ESP8266-NodeMCU是一个开源硬件开发板&#xff0c;支持WiFi功能&a…

IDEA创建Sping项目只能勾选17和21,没有Java8?

解决办法: 替换创建项目的源 我们只知道IDEA页面创建Spring项目&#xff0c;其实是访问spring initializr去创建项目。故我们可以通过阿里云国服去间接创建Spring项目。将https://start.spring.io/或者http://start.springboot.io/替换为 https://start.aliyun.com/

OpenCV系列文章目录(持续更新中......)

引言&#xff1a; OpenCV是一个开源的计算机视觉库&#xff0c;由英特尔公司开发并开源的一组跨平台的C函数和少量的C函数组成&#xff0c;用于实时图像处理、计算机视觉和机器学习等应用领域。OpenCV可以在包括Windows、Linux、macOS等各种操作系统平台上使用&#xff0c;具…

Github Copilot 工具,无需账号,一键激活

① 无需账号&#xff0c;100%认证成功&#xff01;0风险&#xff0c;可联网可更新&#xff0c;&#xff0c;支持copilot版本升级&#xff0c;支持chat ② 支持windows、mac、linux系统等设备 ③一号通用&#xff0c;支持所有IDE(AppCode,CLion,DataGrip,GoLand,IntelliJ IDEA …

面向对象(C# )

面向对象&#xff08;C# &#xff09; 文章目录 面向对象&#xff08;C# &#xff09;ref 和 out传值调用和引用调用ref 和 out 的使用ref 和 out 的区别 结构体垃圾回收GC封装成员属性索引器静态成员静态类静态构造函数拓展方法运算符重载内部类和分布类 继承里氏替换继承中的…

图像处理ASIC设计方法 笔记11 像素误差与字长优化

P108 P105 定点误差分析与字长优化 1 像素误差是什么原因导致的? 在本书所说的算法中,像素误差是由几次定点运算累加导致的: 首先由行(列)号与定点正弦/正切值计算出该行(列)的小数平移量,然后将这些小数平移量截取一定字长用来计算插值核,再将这些插值核也截取一…

python统计分析——单变量分布之量化变异度

参考资料&#xff1a;python统计分析【托马斯】 1、极差 极差仅仅是最高值和最低值之间的差异。使用函数为&#xff1a;numpy.ptp()。代码如下&#xff1a; import numpy as npxnp.arange(1,11) np.ptp(x) ptp代表“峰值到峰值”&#xff0c;唯一应该注意的异常值&#xff0c…

LCD屏的应用

一、LCD屏应用 Linux下一切皆文件&#xff0c;我们的LCD屏再系统中也是一个文件&#xff0c;设备文件&#xff1a;/dev/fb0。 如果要在LCD屏显示数据&#xff0c;那我们就可以把数据写入LCD屏的设备文件。 1.显示颜色块 LCD屏分辨&#xff1a;800*480 像素 32位:说明一个像…

HarmonyOS NEXT应用开发—发布图片评论

介绍 本示例将通过发布图片评论场景&#xff0c;介绍如何使用startAbilityForResult接口拉起相机拍照&#xff0c;并获取相机返回的数据。 效果图预览 使用说明 通过startAbilityForResult接口拉起相机&#xff0c;拍照后获取图片地址。 实现思路 创建CommentData类&#…

CSS中如何设置单行或多行内容超出后,显示省略号

1. 设置超出显示省略号 css设置超出显示省略号可分两种情况&#xff1a; 单行文本溢出显示省略号…多行文本溢出显示省略号… 但使用的核心代码是一样的&#xff1a;需要先使用 overflow:hidden;来把超出的部分隐藏&#xff0c;然后使用text-overflow:ellipsis;当文本超出时…
最新文章