Hadoop 之 Hbase 配置与使用(四)

Hadoop 之 Hbase 配置与使用

  • 一.Hbase 下载
    • 1.Hbase 下载
  • 二.Hbase 配置
    • 1.单机部署
    • 2.伪集群部署(基于单机配置)
    • 3.集群部署
      • 1.启动 hadoop 集群
      • 2.启动 zookeeper 集群
      • 3.启动 hbase 集群
      • 4.集群启停脚本
  • 三.测试
    • 1.Pom 配置
    • 2.Yml 配置
    • 3.Hbase 配置类
    • 4.Hbase 连接池配置
    • 5.测试类
    • 6.启动类
    • 7.测试

一.Hbase 下载

HBase 是一个分布式的、面向列的开源数据库:Hbase API

1.Hbase 下载

Hbase 下载

在这里插入图片描述

跳转到下载链接

在这里插入图片描述

二.Hbase 配置

1.单机部署

## 1.创建安装目录
mkdir -p /usr/local/hbase
## 2.将压缩包拷贝到虚拟机并解压缩
tar zxvf hbase-3.0.0-alpha-4-bin.tar.gz -C /usr/local/hbase/
## 3.添加环境变量
echo 'export HBASE_HOME=/usr/local/hbase/hbase-3.0.0-alpha-4' >> /etc/profile
echo 'export PATH=${HBASE_HOME}/bin:${PATH}' >> /etc/profile
source /etc/profile
## 4.指定 JDK 版本
echo 'export JAVA_HOME=/usr/local/java/jdk-11.0.19' >> $HBASE_HOME/conf/hbase-env.sh
## 5.创建 hbase 存储目录
mkdir -p /home/hbase/data
## 6.修改配置
vim $HBASE_HOME/conf/hbase-site.xml
     添加如下信息
	 <property>
	    <name>hbase.rootdir</name>
	    <value>file:///home/hbase/data</value>
	 </property>

在这里插入图片描述

## 1.进入安装目录
cd $HBASE_HOME
## 2.启动服务
./bin/start-hbase.sh

在这里插入图片描述

## 1.进入安装目录
cd $HBASE_HOME
## 2.关闭服务
./bin/stop-hbase.sh

在这里插入图片描述

2.伪集群部署(基于单机配置)

## 1.修改 hbase-env.sh 
echo 'export JAVA_HOME=/usr/local/java/jdk-11.0.19' >> $HBASE_HOME/conf/hbase-env.sh
echo 'export HBASE_MANAGES_ZK=true' >> $HBASE_HOME/conf/hbase-env.sh
## 2.修改 hbase_site.xml
vim $HBASE_HOME/conf/hbase-site.xml
	 <!-- 将 hbase 数据保存到 hdfs -->
	 <property>
	    <name>hbase.rootdir</name>
	    <value>hdfs://nn/hbase</value>
	 </property>
	 <!-- 分布式配置 -->
	 <property>
	    <name>hbase.cluster.distributed</name>
	    <value>true</value>
	 </property>
	 <!-- 配置 ZK 地址 -->
	 <property>
	    <name>hbase.zookeeper.quorum</name>
	    <value>nn</value>
	 </property>
	 <!-- 配置 JK 地址 -->
	 <property>
	    <name>dfs.replication</name>
	    <value>1</value>
	 </property>
## 3.修改 regionservers 的 localhost 为 nn
echo nn > $HBASE_HOME/conf/regionservers

在这里插入图片描述

## 1.进入安装目录
cd $HADOOP_HOME
## 2.启动 hadoop 服务
./sbin/start-all.sh

在这里插入图片描述

## 1.进入安装目录
cd $HBASE_HOME
## 2.启动服务
./bin/start-hbase.sh

在这里插入图片描述

## 1.进入安装目录
cd $HBASE_HOME
## 2.关闭主节点服务(直接关服务是关不掉的,如图)
. bin/hbase-daemon.sh stop master
## 3.关闭服务
./bin/stop-hbase.sh

在这里插入图片描述

3.集群部署

## 1.创建 zookeeper 数据目录
mkdir -p $HBASE_HOME/zookeeper/data
## 2.进入安装目录
cd $HBASE_HOME/conf
## 3.修改环境配置
vim hbase-env.sh
## 添加 JDK / 启动外置 Zookeeper
	# JDK
	export JAVA_HOME=/usr/local/java/jdk-11.0.19
	# Disable Zookeeper
	export HBASE_MANAGES_ZK=false
## 4.修改 hbase-site.xml
vim hbase-site.xml
## 配置如下信息
	<!--允许的最大同步时钟偏移-->
	<property>
	  <name>hbase.master.maxclockskew</name>`
	  <value>6000</value>
	</property>
	<!--配置 HDFS 存储实例-->
	<property>
	  <name>hbase.rootdir</name>
	  <value>hdfs://nn:9000/hbase</value>
	</property>
	<!--启用分布式配置-->
	<property>
	  <name>hbase.cluster.distributed</name>
	  <value>true</value>
	</property>
	<!--配置 zookeeper 集群节点-->
	<property>
	  <name>hbase.zookeeper.quorum</name>
	  <value>zk1,zk2,zk3</value>
	</property>
	<!--配置 zookeeper 数据目录-->
	<property>
	  <name>hbase.zookeeper.property.dataDir</name>
	  <value>/usr/local/hbase/hbase-3.0.0-alpha-4/zookeeper/data</value>
	</property>
	<!-- Server is not running yet -->
	<property>
      <name>hbase.wal.provider</name>
      <value>filesystem</value>
    </property>
## 5.清空 regionservers 并添加集群节点域名
echo '' > regionservers
echo 'nn' >> regionservers
echo 'nd1' >> regionservers
echo 'nd2' >> regionservers
## 6.分别为 nd1 / nd2 创建 hbase 目录
mkdir -p /usr/local/hbase
## 7.分发 hbase 配置到另外两台虚拟机 nd1 / nd2
scp -r /usr/local/hbase/hbase-3.0.0-alpha-4 root@nd1:/usr/local/hbase
scp -r /usr/local/hbase/hbase-3.0.0-alpha-4 root@nd2:/usr/local/hbase
## 8.分发环境变量配置
scp /etc/profile root@nd1:/etc/profile
scp /etc/profile root@nd2:/etc/profile

1.启动 hadoop 集群

Hadoop 集群搭建参考:Hadoop 搭建

## 1.启动 hadoop
cd $HADOOP_HOME
. sbin/start-all.sh

在这里插入图片描述

## 1.关闭 hadoop 安全模式
hadoop dfsadmin -safemode leave

在这里插入图片描述

2.启动 zookeeper 集群

ZOOKEEPER 集群搭建说明

## 1.启动 zookeeper 集群
zkServer.sh start && ssh root@zk2 "source /etc/profile && zkServer.sh start && exit" && ssh root@zk3 "source /etc/profile && zkServer.sh start && exit"
## 2.查看状态
zkServer.sh status && ssh root@zk2 "source /etc/profile && zkServer.sh status && exit" && ssh root@zk3 "source /etc/profile && zkServer.sh status && exit"

在这里插入图片描述

3.启动 hbase 集群

## 1.分别为 nn /nd1 / nd2 配置 zookeeper 域名解析
echo '192.168.1.100 zk1' >> /etc/hosts
echo '192.168.1.101 zk2' >> /etc/hosts
echo '192.168.1.102 zk3' >> /etc/hosts
## 2.启动 habase
cd $HBASE_HOME
. bin/start-hbase.sh
## 3.停止服务
. bin/hbase-daemon.sh stop master
. bin/hbase-daemon.sh stop regionserver
. bin/stop-hbase.sh


在这里插入图片描述

查看 UI 监控:http://192.168.1.6:16010/master-status

在这里插入图片描述

4.集群启停脚本

#!/bin/bash

case $1 in
"start")
	## start hadoop
	start-all.sh
	## start zookeeper (先配置免密登录)
	zkServer.sh start && ssh root@zk2 "source /etc/profile && zkServer.sh start && exit" && ssh root@zk3 "source /etc/profile && zkServer.sh start && exit"
	## start hbase
	start-hbase.sh
	;;
"stop")
	## stop hbase
	ssh root@nd1 "source /etc/profile && hbase-daemon.sh stop regionserver && stop-hbase.sh && exit"
	ssh root@nd2 "source /etc/profile && hbase-daemon.sh stop regionserver && stop-hbase.sh && exit"
	hbase-daemon.sh stop master  && hbase-daemon.sh stop regionserver && stop-hbase.sh
	## stop zookeeper
	zkServer.sh stop && ssh root@zk2 "source /etc/profile && zkServer.sh stop && exit" && ssh root@zk3 "source /etc/profile && zkServer.sh stop && exit"
	## stop hadoop
	stop-all.sh
	;;
*)
	echo "pls inout start|stop"
	;;
esac

三.测试

## 1.为 Windows 增加 Hosts 配置,添加 Hbase 集群域名解析 编辑如下文件
C:\Windows\System32\drivers\etc\hosts
## 2.增加如下信息
192.168.1.6 nn
192.168.1.7 nd1
192.168.1.8 nd2

测试配置效果

在这里插入图片描述

JDK 版本

在这里插入图片描述

工程结构

在这里插入图片描述

1.Pom 配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>hbase-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <spring.version>2.7.8</spring.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.28</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.32</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>3.0.0-alpha-4</version>
        </dependency>


    </dependencies>

</project>

2.Yml 配置

hbase:
  zookeeper:
    quorum: 192.168.1.100,192.168.1.101,192.168.1.102
    property:
      clientPort: 2181
  master:
    ip: 192.168.1.6
    port: 16000

3.Hbase 配置类

package org.example.config;


import org.apache.hadoop.hbase.HBaseConfiguration;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author Administrator
 * @Description
 * @create 2023-07-25 0:26
 */
@Configuration
public class HbaseConfig {

    @Value("${hbase.zookeeper.quorum}")
    private String zookeeperQuorum;

    @Value("${hbase.zookeeper.property.clientPort}")
    private String clientPort;

    @Value("${hbase.master.ip}")
    private String ip;

    @Value("${hbase.master.port}")
    private int masterPort;

    @Bean
    public org.apache.hadoop.conf.Configuration hbaseConfiguration(){
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum",zookeeperQuorum);
        conf.set("hbase.zookeeper.property.clientPort",clientPort);
        conf.set("hbase.masters", ip + ":" + masterPort);
        conf.set("hbase.client.keyvalue.maxsize","20971520");
        return HBaseConfiguration.create(conf);
    }

}

4.Hbase 连接池配置

package org.example.config;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Enumeration;
import java.util.Vector;

/**
 * @author Administrator
 * @Description
 * @create 2023-07-25 22:39
 */
@Slf4j
@Component
public class HbaseConnectionPool {

    /**
     * 连接池的初始大小
     * 连接池的创建步长
     * 连接池最大的大小
     */
    private int nInitConnectionAmount = 3;
    private int nIncrConnectionAmount = 3;
    private int nMaxConnections = 20;

    /**
     * 存放连接池中数据库连接的向量
     */
    private Vector vcConnections = new Vector();

    /**
     * 注入连接配置
     */
    @Resource
    private Configuration hbaseConfiguration;

    /**
     * 初始化连接
     */
    @PostConstruct
    public void init() {
        createConnections(nInitConnectionAmount);
    }

    /**
     * 获取可用连接
     * @return
     */
    public synchronized Connection getConnection() {
        Connection conn;
        while (null == (conn =getFreeConnection())){
            try {
                wait(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 返回获得的可用的连接
        return conn;
    }

    /**
     * 释放连接
     * @param conn
     */
    public synchronized void releaseConnection(Connection conn) {
        ConnectionWrapper connWrapper;
        Enumeration enumerate = this.vcConnections.elements();
        while(enumerate.hasMoreElements()) {
            connWrapper = (ConnectionWrapper) enumerate.nextElement();
            if (conn == connWrapper.getConnection()) {
                connWrapper.setBusy(false);
                break;
            }
        }
    }

    /**
     * 获取可用连接 当前无可用连接则创建 如果已达到最大连接数则返回 null 阻塞后重试获取
     * @return
     */
    private Connection getFreeConnection() {
        Connection conn;
        if (null == (conn = findFreeConnection())) {
            // 创建新连接
            createConnections(nIncrConnectionAmount);
            // 查看是否有可用连接
            if (null == (conn = findFreeConnection())) {
                return null;
            }
        }
        return conn;
    }

    /**
     * 查找可用连接
     * @return
     */
    private Connection findFreeConnection() {
        ConnectionWrapper connWrapper;
        //遍历向量内连接对象
        Enumeration enumerate = vcConnections.elements();
        while (enumerate.hasMoreElements()) {
            connWrapper = (ConnectionWrapper) enumerate.nextElement();
            //判断当前连接是否被占用
            if (!connWrapper.isBusy()) {
                connWrapper.setBusy(true);
                return connWrapper.getConnection();
            }
        }
        // 返回 NULL
        return null;
    }

    /**
     * 创建新连接
     * @param counts
     */
    private void createConnections(int counts) {
        // 循环创建指定数目的数据库连接
        try {
            for (int i = 0; i < counts; i++) {
                if (this.nMaxConnections > 0  && this.vcConnections.size() >= this.nMaxConnections) {
                    log.warn("已达到最大连接数...");
                    break;
                }
                // 创建一个新连接并加到向量
                vcConnections.addElement(new ConnectionWrapper(newConnection()));
            }
        } catch (Exception e) {
            log.error("创建连接失败...");
        }
    }

    /**
     * 创建新连接
     * @return
     */
    private Connection newConnection() {
        /** hbase 连接 */
        Connection conn = null;
        // 创建一个数据库连接
        try {
            conn = ConnectionFactory.createConnection(hbaseConfiguration);
        } catch (Exception e) {
            log.error("HBase 连接失败...");
        }

        // 返回创建的新的数据库连接
        return conn;
    }

    /**
     * 封装连接对象
     */
    @Data
    class ConnectionWrapper {

        /**
         * 数据库连接
         */
        private Connection connection;

        /**
         * 此连接是否正在使用的标志,默认没有正在使用
         */
        private boolean busy = false;

        /**
         * 构造函数,根据一个 Connection 构告一个 PooledConnection 对象
         */
        public ConnectionWrapper(Connection connection) {
            this.connection = connection;
        }
    }

}

5.测试类

package org.example.controller;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.ColumnValueFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.util.Bytes;
import org.example.config.HbaseConnectionPool;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.*;

/**
 * @author Administrator
 *
 * 可利用 aop 进行连接获取和释放处理
 *
 * @Description
 * @create 2023-07-25 23:06
 */
@Slf4j
@RestController
@RequestMapping("/hbase")
public class HbaseController {

    @Resource
    private HbaseConnectionPool pool;

    /**
     * 表名
     */
    private String tbl_user = "tbl_user";

    /**
     * 创建表(不允许重复创建)
     */
    @GetMapping("/create")
    public void createTable(){
        Connection conn = null;
        //获取连接
        try {
            conn = pool.getConnection();
            Admin admin = conn.getAdmin();
            TableName tableName = TableName.valueOf(tbl_user);
            if (!admin.tableExists(tableName)){
                //指定表名
                TableDescriptorBuilder tdb_user = TableDescriptorBuilder.newBuilder(tableName);
                //添加列族(info,data)
                ColumnFamilyDescriptor hcd_info = ColumnFamilyDescriptorBuilder.of("name");
                ColumnFamilyDescriptor hcd_data = ColumnFamilyDescriptorBuilder.of("age");
                tdb_user.setColumnFamily(hcd_info);
                tdb_user.setColumnFamily(hcd_data);
                //创建表
                TableDescriptor td = tdb_user.build();
                admin.createTable(td);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (null != conn){
                pool.releaseConnection(conn);
            }
        }
    }

    /**
     * 删除表(不允许删除不存在的表)
     */
    @GetMapping("/drop")
    public void dropTable(){
        Connection conn = null;
        try {
            conn = pool.getConnection();
            Admin admin = conn.getAdmin();
            TableName tableName = TableName.valueOf(tbl_user);
            if (admin.tableExists(tableName)){
                admin.disableTable(tableName);
                admin.deleteTable(tableName);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            if (null != conn){
                pool.releaseConnection(conn);
            }
        }
    }

    /**
     * 插入测试
     */
    @GetMapping("/insert")
    public void insert(){
        log.info("---插入一列数据---1");
        putData(tbl_user, "row1", "name", "a", "zhangSan");
        putData(tbl_user, "row1", "age", "a", "18");
        log.info("---插入多列数据---2");
        putData(tbl_user, "row2", "name",
                Arrays.asList("a", "b", "c"),  Arrays.asList("liSi", "wangWu", "zhaoLiu"));
        log.info("---插入多列数据---3");
        putData(tbl_user, "row3", "age",
                Arrays.asList("a", "b", "c"),  Arrays.asList("18","19","20"));
        log.info("---插入多列数据---4");
        putData(tbl_user, "row4", "age",
                Arrays.asList("a", "b", "c"),  Arrays.asList("30","19","20"));

    }

    /**
     * 插入数据(单条)
     * @param tableName 表名
     * @param rowKey rowKey
     * @param columnFamily 列族
     * @param column 列
     * @param value 值
     * @return true/false
     */
    public boolean putData(String tableName, String rowKey, String columnFamily, String column,
                           String value) {
        return putData(tableName, rowKey, columnFamily, Arrays.asList(column),
                Arrays.asList(value));
    }

    /**
     * 插入数据(批量)
     * @param tableName 表名
     * @param rowKey rowKey
     * @param columnFamily 列族
     * @param columns 列
     * @param values 值
     * @return true/false
     */
    public boolean putData(String tableName, String rowKey, String columnFamily,
                           List<String> columns, List<String> values) {
        Connection conn = null;
        try {
            conn = pool.getConnection();
            Table table = conn.getTable(TableName.valueOf(tableName));
            Put put = new Put(Bytes.toBytes(rowKey));
            for (int i=0; i<columns.size(); i++) {
                put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columns.get(i)), Bytes.toBytes(values.get(i)));
            }
            table.put(put);
            table.close();
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        } finally {
            if (null != conn){
                pool.releaseConnection(conn);
            }
        }
    }

    /**
     * 查询测试
     */
    @GetMapping("/query")
    public void getResultScanner(){
        log.info("全表数据:{}",getData(tbl_user));
        log.info("过滤器,按年龄 [18]:{}",getData(tbl_user,new ColumnValueFilter(Bytes.toBytes("age"), Bytes.toBytes("a"), CompareOperator.EQUAL, Bytes.toBytes("18"))));
        log.info("根据 rowKey [row1]:{}",getData(tbl_user,"row1"));
        log.info("根据 rowKey 列族 列 [row2 name a]:{}",getData(tbl_user,"row2","name","a"));
    }

    /**
     * 获取数据(全表数据)
     * @param tableName 表名
     * @return map
     */
    public List<Map<String, String>> getData(String tableName) {
        List<Map<String, String>> list = new ArrayList<>();
        Connection conn = null;
        try {
            conn = pool.getConnection();
            Table table = conn.getTable(TableName.valueOf(tableName));
            Scan scan = new Scan();
            ResultScanner resultScanner = table.getScanner(scan);
            for(Result result : resultScanner) {
                HashMap<String, String> map = new HashMap<>(result.listCells().size());
                map.put("row", Bytes.toString(result.getRow()));
                for (Cell cell : result.listCells()) {
                    //列族
                    String family = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
                    //列
                    String qualifier = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                    //值
                    String data = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                    map.put(family + ":" + qualifier, data);
                }
                list.add(map);
            }
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (null != conn){
                pool.releaseConnection(conn);
            }
        }
        return list;
    }

    /**
     * 获取数据(根据 filter)
     * @param tableName 表名
     * @param filter 过滤器
     * @return map
     */
    public List<Map<String, String>> getData(String tableName, Filter filter) {
        List<Map<String, String>> list = new ArrayList<>();
        Connection conn = null;
        try {
            conn = pool.getConnection();
            Table table = conn.getTable(TableName.valueOf(tableName));
            Scan scan = new Scan();
            // 添加过滤器
            scan.setFilter(filter);
            ResultScanner resultScanner = table.getScanner(scan);
            for(Result result : resultScanner) {
                HashMap<String, String> map = new HashMap<>(result.listCells().size());
                map.put("row", Bytes.toString(result.getRow()));
                for (Cell cell : result.listCells()) {
                    String family = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
                    String qualifier = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                    String data = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                    map.put(family + ":" + qualifier, data);
                }
                list.add(map);
            }
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (null != conn){
                pool.releaseConnection(conn);
            }
        }
        return list;
    }

    /**
     * 获取数据(根据 rowKey)
     * @param tableName 表名
     * @param rowKey rowKey
     * @return map
     */
    public Map<String, String> getData(String tableName, String rowKey) {
        HashMap<String, String> map = new HashMap<>();
        Connection conn = null;
        try {
            conn = pool.getConnection();
            Table table = conn.getTable(TableName.valueOf(tableName));
            Get get = new Get(Bytes.toBytes(rowKey));
            Result result = table.get(get);
            if (result != null && !result.isEmpty()) {
                for (Cell cell : result.listCells()) {
                    String family = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
                    String qualifier = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                    String data = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                    map.put(family + ":" + qualifier, data);
                }
            }
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (null != conn){
                pool.releaseConnection(conn);
            }
        }
        return map;
    }

    /**
     * 获取数据(根据 rowKey 列族 列)
     * @param tableName 表名
     * @param rowKey rowKey
     * @param columnFamily 列族
     * @param columnQualifier 列
     * @return map
     */
    public String getData(String tableName, String rowKey, String columnFamily,
                          String columnQualifier) {
        String data = "";
        Connection conn = null;
        try {
            conn = pool.getConnection();
            Table table = conn.getTable(TableName.valueOf(tableName));
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQualifier));
            Result result = table.get(get);
            if (result != null && !result.isEmpty()) {
                Cell cell = result.listCells().get(0);
                data = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
            }
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (null != conn){
                pool.releaseConnection(conn);
            }
        }
        return data;
    }

    /**
     * 删除数据
     */
    @GetMapping("/delete")
    public void delete(){
        log.info("---删除 rowKey --- row1 ");
        deleteData(tbl_user,"row1");
        log.info("---删除 rowKey 列族 --- row2 age ");
        deleteData(tbl_user,"row2","age");
    }

    /**
     * 删除数据(根据 rowKey)
     * @param tableName 表名
     * @param rowKey rowKey
     */
    public void deleteData(String tableName, String rowKey) {
        Connection conn = null;
        try {
            conn = pool.getConnection();
            Table table = conn.getTable(TableName.valueOf(tableName));
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            table.delete(delete);
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (null != conn){
                pool.releaseConnection(conn);
            }
        }
    }

    /**
     * 删除数据(根据 row key,列族)
     * @param tableName 表名
     * @param rowKey rowKey
     * @param columnFamily 列族
     */
    public void deleteData(String tableName, String rowKey, String columnFamily) {
        Connection conn = null;
        try {
            conn = pool.getConnection();
            Table table = conn.getTable(TableName.valueOf(tableName));
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            delete.addFamily(columnFamily.getBytes());
            table.delete(delete);
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (null != conn){
                pool.releaseConnection(conn);
            }
        }
    }

}


6.启动类

package org.example;


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author Administrator
 */
@SpringBootApplication
public class HbaseDemo {
    public static void main(String[] args) {
        SpringApplication.run(HbaseDemo.class,args);
    }
}

7.测试

创建表:http://127.0.0.1:8080/hbase/create
插入:http://127.0.0.1:8080/hbase/insert
查询:http://127.0.0.1:8080/hbase/query
删除:http://127.0.0.1:8080/hbase/delete
删除表:http://127.0.0.1:8080/hbase/drop

查看 UI

在这里插入图片描述

测试输出日志

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/44882.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

关于PyTorch中一维卷积Conv1d的理解

首先明确一点&#xff0c;PyTorch中的一维卷积是从左往右做的&#xff0c;不是从上往下。 然后明确第二点&#xff0c;一维卷积和二维卷积最大的区别在于&#xff0c;一维卷积的卷积方向只有一个维度&#xff0c;一维卷积的卷积核不像二维卷积核一样可以左右和上下两个维度移动…

【使用时空RBF-NN进行非线性系统识别】实现了 RBF、分数 RBF 和时空 RBF 神经网络,用于非线性系统识别研究(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 2.1 算例1 2.2 算例2 &#x1f389;3 参考文献 &#x1f308;4 Matlab代码实现 &#x1f4a5;1 概述 本文用于非线性系统识别任务的径向基函数神经网络&#xff08;RBF-NN&#xff09;的三种变体。特别是&#xff0c;我实现…

大模型,开源干不掉闭源

开源大模型对闭源大模型的冲击&#xff0c;变得非常猛烈。 今年3月&#xff0c;Meta发布了Llama&#xff08;羊驼&#xff09;&#xff0c;很快成为AI社区内最强大的开源大模型&#xff0c;也是许多模型的基座模型。有人戏称&#xff0c;当前的大模型集群&#xff0c;就是一堆各…

刘铁猛C#教程笔记——操作符

C#语言中的操作符 表中位于同一行的操作符优先级相同&#xff0c;从上到下优先级依次减弱&#xff1b; 操作符的用法举例 成员访问运算符——“.”&#xff1a;用于访问类中的成员或者访问位于某个名空间中的类&#xff0c;如&#xff1a; using System; using System.Collec…

25.3 matlab里面的10中优化方法介绍——Nelder-Mead法(matlab程序)

1.简述 fminsearch函数用来求解多维无约束的线性优化问题 用derivative-free的方法找到多变量无约束函数的最小值 语法 x fminsearch(fun,x0) x fminsearch(fun,x0,options) [x,fval] fminsearch(...) [x,fval,exitflag] fminsearch(...) [x,fval,exitflag,output] fmins…

使用sftp

一、背景 新项目组前端部署方式是Build打包生成dist文件&#xff0c;交由后端部署。后来知道了vscode安装sftp前端可以自行部署。 二、实操 1、vscode安装sftp 2、 配置 ①F1 / ctrlshiftp ②命令行输入sftp -> 选择 sftp: Config ③配置信息介绍 {"name"…

vscode默认gbk编码格式打开

目录 1. 问题描述2. 解决方案 1. 问题描述 每次打开vscode都是utf-8格式打开文件&#xff0c;然后满屏的中文乱码&#xff0c;自己手动换成gbk编码 后中文显示正常&#xff0c;但是换多了很烦。 2. 解决方案 ctrlshiftP 点首选项&#xff1a;打开用户设置 加上这行在最后&…

SpringBoot静态资源访问及参数处理

静态资源访问&#xff1a; 资源访问&#xff1a; 1&#xff1a;Spring Boot 支持静态和模板化的欢迎页面。它首先在配置的静态内容位置中查找index.html文件。如果未找到&#xff0c;则查找index相关模板。如果找到任一&#xff0c;它将自动用作应用程序的欢迎页面。 2&…

Elasticsearch笔记

一、ElasticSearch概述 ElasticSearch&#xff08;简称ES&#xff09;是一个分布式、RESTful 风格的搜索引擎、数据分析引擎。ES底层是基于Apache Lucene搜索引擎库实现的&#xff0c;但是ES的目的是通过简单的RESTful API来隐藏Lucene的复杂性&#xff0c;从而让全文搜索变得简…

Redisson实现简单消息队列:优雅解决缓存清理冲突

在项目中&#xff0c;缓存是提高应用性能和响应速度的关键手段之一。然而&#xff0c;当多个模块在短时间内发布工单并且需要清理同一个接口的缓存时&#xff0c;容易引发缓存清理冲突&#xff0c;导致缓存失效的问题。为了解决这一难题&#xff0c;我们采用Redisson的消息队列…

【MCU学习】RTthread工程介绍

RT-Thread架构 RT-Thread诞生于2006年&#xff0c;是一款以开源、中立、社区化发展起来的物联网操作系统。 RT-Thread主要采用 C 语言编写&#xff0c;浅显易懂&#xff0c;且具有方便移植的特性&#xff08;可快速移植到多种主流 MCU 及模组芯片上&#xff09;。RT-Thread把面…

cocosCreator 之 ScrollView

版本&#xff1a;3.4.0 参考&#xff1a;ScrollView组件 简介 ScrollView组件作为滚动容器来使用&#xff0c;它的实现通过ScrollBar组件来展示内容的位置和Mask组件显示指定区域&#xff0c;来保证有限的区域内显示更多的内容。 它的构成部分&#xff1a; ScrollBar滚动条相…

03 shell 编程

变量 语言型 编译型语言 解释型语言 shell脚本语言是解释型语言shell脚本的本质&#xff1a;shell命令的有序集合 shell 编程的基本过程 基本过程分为三步&#xff1a; step1. 建立 shell 文件 包含任意多行操作系统命令或shell命令的文本文件; step2. 赋予shell文件执行…

23 自定义控件

案例&#xff1a;组合Spin Box和Horizontal Slider实现联动 新建Qt设计师界面&#xff1a; 选择Widget&#xff1a; 选择类名&#xff08;生成.h、.cpp、.ui文件&#xff09; 在smallWidget.ui中使用Spin Box和Horizontal Slider控件 可以自定义数字区间&#xff1a; 在主窗口w…

脑电信号处理与特征提取——1. 脑电、诱发电位和事件相关电位(胡理)

目录 一、 脑电、诱发电位和事件相关电位 1.1 EEG基本知识 1.2 经典的ERPs成分及研究 1.2.1 ERPs命名规则及分类 1.2.2 常见的脑电成分 1.2.3 P300及Oddball范式 1.2.4 N400成分 一、 脑电、诱发电位和事件相关电位 1.1 EEG基本知识 EEG(Electroencephalogram)&#x…

MFC第二十天 数值型关联变量 和单选按钮与复选框的开发应用

文章目录 数值型关联变量数值型关联变量的种类介绍 单选按钮与复选框单选按钮的组内选择原理解析单选按钮和复选框以及应用数值型关联变量的开发CMainDlg.cppCInputDlg.hCInputDlg.cpp 附录 数值型关联变量 数值型关联变量的种类介绍 1、 数值型关联变量&#xff1a; a)控件型…

全志F1C200S嵌入式驱动开发(解决spi加载过慢的问题)

【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing @163.com】 之前的几个章节当中,我们陆续解决了spi-nor驱动的问题、uboot支持spi-nor的问题。按道理来说,下面要做的应该就是用uboot的loady命令把kernel、dtb、rootfs这些文件下载到ddr,然…

执行 yum install gcc 报 【-bash: $‘yum\302\240install\302\240gcc‘: 未找到命令】

执行 yum install gcc 报错 找了一圈&#xff0c;执行&#xff1a;sudo apt-get install yum 执行&#xff1a;wget http://yum.baseurl.org/download/3.2/yum-3.2.28.tar.gz 在线下载yum完成 对其进行解压&#xff1a;tar zxvf yum-3.2.28.tar.gz 解压后如下&#xff1a; 执行…

Tiny Player (js) - 轻量好用、免费开源的 web 视频播放开发组件,内置硬解、软解视频功能

一款简单好用的 JS 视频播放器&#xff0c;完美解决我遇到的移动端播放视频的需求&#xff0c;安利给各位。 关于 Tiny Player Tiny Player 是一个极简的视频播放器 JS 库&#xff0c;内置硬解、软解视频功能&#xff0c;支持原生控件样式以及自定义控件样式&#xff0c;小巧…

Android Service启动ANR原理

一、前言 在Service组件StartService()方式启动流程分析文章中&#xff0c;针对Context#startService()启动Service流程分析了源码&#xff0c;其实关于Service启动还有一个比较重要的点是Service启动的ANR&#xff0c;因为因为线上出现了上百例的"executing service &quo…
最新文章