深入Doris实时数仓:导入本地数据

码到三十五 : 个人主页

心中有诗画,指尖舞代码,目光览世界,步履越千山,人间尽值得 !

< 免责声明 >

  • 避免对文章进行过度解读,因为每个人的知识结构和认知背景不大同,没有一种通用的解决方案。
  • 对于文章观点,不必急于评判。融入其中,审视自我,尝试从旁观者角度认清当前的成长阶段。
  • 真正的智慧在于实践中找到适合自己的方式和路径。 day day up ! 一起加油吧 ~~

本文主要介绍如何从客户端导入本地的数据。

目录

    • 一、Doris数据导入总览
      • 支持的数据源
      • 按场景划分
      • 按导入方式划分
      • 支持的数据格式
      • 导入的原子性保证
      • 同步及异步导入
    • 二、本地导入的Stream Load模式
      • 2.1 Stream Load原理
      • 2.2 导入数据
    • 三、本地导入的MySQL Load模式
      • 3.1 基本原理
      • 3.2 导入数据
    • 四、实时数仓结合机器学习、大模型的发展趋势

一、Doris数据导入总览

在这里插入图片描述

支持的数据源

Doris 提供多种数据导入方案,可以针对不同的数据源进行选择不同的数据导入方式。

按场景划分

数据源导入方式
对象存储(s3),HDFS使用Broker导入数据
本地文件导入本地数据
Kafka订阅Kafka数据
Mysql、PostgreSQL,Oracle,SQLServer通过外部表同步数据
通过JDBC导入使用JDBC同步数据
导入JSON格式数据JSON格式数据导入

按导入方式划分

导入方式名称使用方式
Spark Load通过Spark导入外部数据
Broker Load通过Broker导入外部存储数据
Stream Load流式导入数据(本地文件及内存数据)
Routine Load导入Kafka数据
Insert Into外部表通过INSERT方式导入数据
S3 LoadS3协议的对象存储数据导入
MySQL LoadMySQL客户端导入本地数据

支持的数据格式

不同的导入方式支持的数据格式略有不同。

导入方式支持的格式
Broker Loadparquet、orc、csv、gzip
Stream Loadcsv、json、parquet、orc
Routine Loadcsv、json
MySQL Loadcsv

导入的原子性保证

Doris 的每一个导入作业,不论是使用 Broker Load 进行批量导入,还是使用 INSERT 语句进行单条导入,都是一个完整的事务操作。导入事务可以保证一批次内的数据原子生效,不会出现部分数据写入的情况。

同时,一个导入作业都会有一个 Label。这个 Label 是在一个数据库(Database)下唯一的,用于唯一标识一个导入作业。Label 可以由用户指定,部分导入功能也会由系统自动生成。

Label 是用于保证对应的导入作业,仅能成功导入一次。一个被成功导入的 Label,再次使用时,会被拒绝并报错 Label already used。通过这个机制,可以在 Doris 侧做到 At-Most-Once 语义。如果结合上游系统的 At-Least-Once 语义,则可以实现导入数据的 Exactly-Once 语义。

关于原子性保证的最佳实践,可以参阅 导入事务和原子性。

同步及异步导入

导入方式分为同步和异步。对于同步导入方式,返回结果即表示导入成功还是失败。而对于异步导入方式,返回成功仅代表作业提交成功,不代表数据导入成功,需要使用对应的命令查看导入作业的运行状态。

目前Doris支持两种从本地导入数据的模式:

  • Stream Load
  • MySQL Load

二、本地导入的Stream Load模式

Stream Load 用于将本地文件导入到 Doris 中。
在这里插入图片描述

2.1 Stream Load原理

Stream load 是一个同步的导入方式,用户通过发送 HTTP 协议发送请求将本地文件或数据流导入到 Doris 中。Stream load 同步执行导入并返回导入结果。用户可直接通过请求的返回体判断本次导入是否成功。

Stream load 主要适用于导入本地文件,或通过程序导入数据流中的数据。

相比于直接使用 curl 的单并发导入,更推荐使用 专用导入工具 Doris Streamloader 该工具是一款用于将数据导入 Doris 数据库的专用客户端工具,可以提供 多并发导入 的功能,降低大数据量导入的耗时。拥有以下功能:

  • 并发导入,实现 Stream Load 的多并发导入。可以通过 workers 值设置并发数。
  • 多文件导入,一次导入可以同时导入多个文件及目录,支持设置通配符以及会自动递归获取文件夹下的所有文件。
  • 断点续传,在导入过程中可能出现部分失败的情况,支持在失败点处进行继续传输。
  • 自动重传,在导入出现失败的情况后,无需手动重传,工具会自动重传默认的次数,如果仍然不成功,打印出手动重传的命令。

不同于其他命令的提交方式,Stream Load 是通过 HTTP 协议与 Doris 进行连接交互的。

该方式中涉及 HOST:PORT 应为 HTTP 协议端口。

  • BE 的 HTTP 协议端口,默认为 8040。
  • FE 的 HTTP 协议端口,默认为 8030。但须保证客户端所在机器网络能够联通 BE 所在机器。

本文文档我们以 curl 命令为例演示如何进行数据导入。

文档最后,我们给出一个使用 Java 导入数据的代码示例

2.2 导入数据

Stream Load 的请求体如下:

PUT /api/{db}/{table}/_stream_load

1. 创建一张表

通过 CREATE TABLE 命令在demo创建一张表用于存储待导入的数据。具体的导入方式请查阅 CREATE TABLE 命令手册。示例如下:

CREATE TABLE IF NOT EXISTS load_local_file_test
(
    id INT,
    age TINYINT,
    name VARCHAR(50)
)
unique key(id)
DISTRIBUTED BY HASH(id) BUCKETS 3;

2. 导入数据

执行以下 curl 命令导入本地文件:

 curl -u user:passwd -H "label:load_local_file_test" -T /path/to/local/demo.txt http://host:port/api/demo/load_local_file_test/_stream_load
  • user:passwd 为在 Doris 中创建的用户。初始用户为 admin / root,密码初始状态下为空。
  • host:port 为 BE 的 HTTP 协议端口,默认是 8040,可以在 Doris 集群 WEB UI页面查看。
  • label: 可以在 Header 中指定 Label 唯一标识这个导入任务。

关于 Stream Load 命令的更多高级操作,请参阅 Stream Load 命令文档。

3. 等待导入结果

Stream Load 命令是同步命令,返回成功即表示导入成功。如果导入数据较大,可能需要较长的等待时间。示例如下:

{
    "TxnId": 1003,
    "Label": "load_local_file_test",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 1000000,
    "NumberLoadedRows": 1000000,
    "NumberFilteredRows": 1,
    "NumberUnselectedRows": 0,
    "LoadBytes": 40888898,
    "LoadTimeMs": 2144,
    "BeginTxnTimeMs": 1,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 325,
    "WriteDataTimeMs": 1933,
    "CommitAndPublishTimeMs": 106,
    "ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005"
}
  • Status 字段状态为 Success 即表示导入成功。
  • 其他字段的详细介绍,请参阅 Stream Load 命令文档。

导入建议
Stream Load 只能导入本地文件。
建议一个导入请求的数据量控制在 1 - 2 GB 以内。如果有大量本地文件,可以分批并发提交。

Java 代码示例
这里通过一个简单的 JAVA 示例来执行 Stream Load:

package demo.doris;

import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.FileEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/*
这是一个 Doris Stream Load 示例,需要依赖
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.13</version>
</dependency>
 */
public class DorisStreamLoader {
    //可以选择填写 FE 地址以及 FE 的 http_port,但须保证客户端和 BE 节点的连通性。
    private final static String HOST = "your_host";
    private final static int PORT = 8040;
    private final static String DATABASE = "db1";   // 要导入的数据库
    private final static String TABLE = "tbl1";     // 要导入的表
    private final static String USER = "root";      // Doris 用户名
    private final static String PASSWD = "";        // Doris 密码
    private final static String LOAD_FILE_NAME = "/path/to/1.txt"; // 要导入的本地文件路径

    private final static String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
            HOST, PORT, DATABASE, TABLE);

    private final static HttpClientBuilder httpClientBuilder = HttpClients
            .custom()
            .setRedirectStrategy(new DefaultRedirectStrategy() {
                @Override
                protected boolean isRedirectable(String method) {
                    // 如果连接目标是 FE,则需要处理 307 redirect。
                    return true;
                }
            });

    public void load(File file) throws Exception {
        try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut put = new HttpPut(loadUrl);
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(USER, PASSWD));

            // 可以在 Header 中设置 stream load 相关属性,这里我们设置 label 和 column_separator。
            put.setHeader("label","label1");
            put.setHeader("column_separator",",");

            // 设置导入文件。
            // 这里也可以使用 StringEntity 来传输任意数据。
            FileEntity entity = new FileEntity(file);
            put.setEntity(entity);

            try (CloseableHttpResponse response = client.execute(put)) {
                String loadResult = "";
                if (response.getEntity() != null) {
                    loadResult = EntityUtils.toString(response.getEntity());
                }

                final int statusCode = response.getStatusLine().getStatusCode();
                if (statusCode != 200) {
                    throw new IOException(
                            String.format("Stream load failed. status: %s load result: %s", statusCode, loadResult));
                }

                System.out.println("Get load result: " + loadResult);
            }
        }
    }

    private String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded);
    }

    public static void main(String[] args) throws Exception{
        DorisStreamLoader loader = new DorisStreamLoader();
        File file = new File(LOAD_FILE_NAME);
        loader.load(file);
    }
}

注意:这里 http client 的版本要是4.5.13

<dependency>
   <groupId>org.apache.httpcomponents</groupId>
   <artifactId>httpclient</artifactId>
   <version>4.5.13</version>
</dependency>

三、本地导入的MySQL Load模式

该语句兼容MySQL标准的LOAD DATA语法,方便用户导入本地数据,并降低学习成本。

MySQL Load 同步执行导入并返回导入结果。用户可直接通过SQL返回信息判断本次导入是否成功。

MySQL Load 主要适用于导入客户端本地文件,或通过程序导入数据流中的数据。

3.1 基本原理

MySQL Load和Stream Load功能相似, 都是导入本地文件到Doris集群中, 因此MySQL Load实现复用了StreamLoad的基础导入能力:

  • FE接收到客户端执行的MySQL Load请求, 完成SQL解析工作

  • FE将Load请求拆解,并封装为StreamLoad的请求.

  • FE选择一个BE节点发送StreamLoad请求

  • 发送请求的同时, FE会异步且流式的从MySQL客户端读取本地文件数据, 并实时的发送到StreamLoad的HTTP请求中.

  • MySQL客户端数据传输完毕, FE等待StreamLoad完成, 并展示导入成功或者失败的信息给客户端.

3.2 导入数据

1. 创建一张表

通过 CREATE TABLE 命令在demo创建一张表用于存储待导入的数据

CREATE TABLE IF NOT EXISTS load_local_file_test
(
id INT,
age TINYINT,
name VARCHAR(50)
)
unique key(id)
DISTRIBUTED BY HASH(id) BUCKETS 3;

2. 导入数据 在MySQL客户端下执行以下 SQL 命令导入本地文件:

LOAD DATA
LOCAL
INFILE '/path/to/local/demo.txt'
INTO TABLE demo.load_local_file_test

关于 MySQL Load 命令的更多高级操作,请参阅 MySQL Load 命令文档。

3. 等待导入结果

MySQL Load 命令是同步命令,返回成功即表示导入成功。如果导入数据较大,可能需要较长的等待时间。示例如下:

Query OK, 1 row affected (0.17 sec)
Records: 1  Deleted: 0  Skipped: 0  Warnings: 0
  • 如果出现上述结果, 则表示导入成功。导入失败, 会抛出错误,并在客户端显示错误原因
  • 其他字段的详细介绍,请参阅 MySQL Load 命令文档。

4. 导入建议

  • MySQL Load 只能导入本地文件(可以是客户端本地或者连接的FE节点本地), 而且支持CSV格式。
  • 建议一个导入请求的数据量控制在 1 - 2 GB 以内。如果有大量本地文件,可以分批并发提交。

四、实时数仓结合机器学习、大模型的发展趋势

智能化实时数据处理与分析

  1. 实时数据处理与机器学习的结合:随着机器学习技术的不断发展,未来的实时数仓将更加注重与机器学习算法的结合。通过引入机器学习模型,实时数仓能够实现对数据的实时分析和预测,为企业提供即时的业务洞察。例如,在零售行业中,通过机器学习模型对实时销售数据进行分析,可以预测未来的销售趋势,从而及时调整库存和营销策略。

  2. 大模型的应用:大模型,尤其是基石模型等先进的人工智能工具,将在实时数仓中发挥重要作用。这些模型具有强大的数据处理和分析能力,能够处理海量的实时数据,并从中提取有价值的信息。通过与实时数仓的结合,大模型可以实现更高效的数据处理和更精准的预测分析。

增强数据质量和数据治理

  1. 数据清洗与验证:机器学习算法可以用于实时数据的清洗和验证,确保进入实时数仓的数据质量。通过训练模型来识别异常数据、纠正错误,并自动进行数据标准化和归一化处理,从而提高数据的准确性和可靠性。

  2. 数据治理的智能化:结合机器学习技术,实时数仓可以实现更智能的数据治理。例如,利用算法对数据进行分类、标签化和归档,以便更好地组织和管理数据。同时,通过监控数据的使用情况和访问权限,确保数据的安全性和合规性。

优化性能和降低成本

  1. 性能优化:机器学习技术可以用于优化实时数仓的性能。通过训练模型来预测数据访问模式和查询需求,从而自动调整数据存储和索引策略,提高数据查询和处理的速度。此外,还可以利用机器学习技术对硬件资源进行智能调度和优化,以提高实时数仓的整体性能。

  2. 降低成本:通过机器学习技术的引入,实时数仓可以更加高效地利用存储和计算资源,从而降低运营成本。例如,利用预测模型来优化数据存储策略,减少不必要的数据冗余和存储开销。同时,通过智能调度算法来合理分配计算资源,提高资源利用率并降低能耗。

综上所述,实时数仓与机器学习、大模型的结合将推动数据处理和分析的智能化、高效化和安全化发展。这将为企业提供更准确、更及时的业务洞察和决策支持,助力企业在快速变化的市场环境中保持竞争优势。



听说...关注下面公众号的人都变牛了,纯技术,纯干货 !

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/570513.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Java探索之旅】解密构造方法 对象初始化的关键一步

&#x1f3a5; 屿小夏 &#xff1a; 个人主页 &#x1f525;个人专栏 &#xff1a; Java编程秘籍 &#x1f304; 莫道桑榆晚&#xff0c;为霞尚满天&#xff01; 文章目录 &#x1f4d1;前言一、对象的构造及初始化1.1 构造方法1.2 构造方法的特性1.3 默认初始化1.4 就地初始化…

新手可以能做视频号小店,其实,视频号远没你想象中难!

大家好&#xff0c;我是电商花花。 最近注意到一个又一个新手小白提供视频号小店成功逆袭&#xff0c;实现了自己的创业梦想。 最近电商行业在飞速发展&#xff0c;越来越多的人开始关注视频号小店这个新兴的市场和平台。 有的新手拼命的往里扎&#xff0c;但是不少新手商家…

数据库之数据库恢复技术思维导图+大纲笔记

大纲笔记&#xff1a; 事务的基本概念 事务 定义 用户定义的一个数据库操作系列&#xff0c;这些操作要么全做&#xff0c;要么全不做&#xff0c;是一个不可分割的基本单位 语句 BEGIN TRANSACTION 开始 COMMIT 提交&#xff0c;提交事务的所有操作 ROLLBACK 回滚&#xff0c…

UE5 GAS开发P35,36,37,38,39 将药水修改为AbilitySystem效果

这几节课都是将药水修改成更方便使用的AbilitySystem效果的Actor,分别为增加血量,增加蓝量,暂时获得最大生命值上限 AuraEffectActor.h // Fill out your copyright notice in the Description page of Project Settings. #pragma once #include "CoreMinimal.h" #…

浅析Java的字符串的底层和相关知识(恳请大佬指正)

本期经验和建议的总结&#xff1a; 在拼接字符串的时候&#xff0c;如果大量拼接时建议使用StringBuilder&#xff0c;在转为字符串。 1&#xff1a;Java的号比较的原理&#xff1a; 在Java中&#xff0c;号在对基本数据类型进行比较时&#xff0c;比较的时具体的数值大小例…

【网络安全】跨站脚本攻击(XSS)

专栏文章索引&#xff1a;网络安全 有问题可私聊&#xff1a;QQ&#xff1a;3375119339 目录 一、XSS简介 二、XSS漏洞危害 三、XSS漏洞类型 1.反射型XSS 2.存储型XSS 3.DOM型XSS 四、XSS漏洞防御 一、XSS简介 XSS&#xff08;Cross-Site Scripting&#xff09; XSS 被…

【认真白嫖】注册免费域名

一、eu.org官网 https://nic.eu.org/&#xff0c;始于1996年&#xff0c;对个人和组织是免费注册&#xff0c;页面还真有96年的风格&#xff0c;点进去注册就行。 二、注册 使用随机生成一个虚拟英国或者美国地址的网站&#xff0c;会提高通过的概率。 https://www.haoweic…

朴素贝叶斯算法分类

def loadDataSet():postingList[[my, dog, has, flea, problems, help, please], #切分的词条[maybe, not, take, him, to, dog, park, stupid],[my, dalmation, is, so, cute, I, love, him],[stop, posting, stupid, worthless, garbage],[mr, licks, ate, my, steak, …

微信小程序开发六(自定义组件)

自定义组件的创建&#xff1a; 如何创建&#xff1a; 右键选择新建component 创建完成之后需要打开app.json&#xff0c;这是全局使用这个组件&#xff0c;想要单独的页面使用&#xff0c;就在当前页面的json文件中定义 "usingComponents": {"my-zj": &quo…

为什么光电测径仪质量更稳定可靠?

光电测径仪与激光扫描式测径仪都是目前常用的外径自动化测量设备&#xff0c;他们能实现的功能相同&#xff0c;但为什么说光电测径仪更稳定可靠&#xff0c;下面一起来看一下。 光电测径仪测量原理 测头部件是测径仪的核心部件&#xff0c;它的作用是将被测物在CCD芯片上清晰…

【Git教程】(十七)发行版交付 — 概述及使用要求,执行过程及其实现,替代解决方案 ~

Git教程 发行版交付 1️⃣ 概述2️⃣ 使用要求3️⃣ 执行过程及其实现3.1 预备阶段&#xff1a;创建 stable 分支3.2 预备并创建发行版3.3 创建补丁 4️⃣ 替代解决方案 对于每个项目或产品来说&#xff0c;发布版本的创建都需要一定的时间&#xff0c;其具体过程因各公司或组…

HarmonyOS开发案例:【闹钟】

介绍 使用后台代理提醒&#xff0c;实现一个简易闹钟。要求完成以下功能&#xff1a; 展示指针表盘或数字时间。添加、修改和删除闹钟。展示闹钟列表&#xff0c;并可打开和关闭单个闹钟。闹钟到设定的时间后弹出提醒。将闹钟的定时数据保存到轻量级数据库。 相关概念 [Canva…

翻译《The Old New Thing》 - Why are HANDLE return values so inconsistent?

Why are HANDLE return values so inconsistent? - The Old New Thing (microsoft.com)https://devblogs.microsoft.com/oldnewthing/20040302-00/?p40443 Raymond Chen 2004年01月27日 简介 在处理 Windows 编程中的句柄时&#xff0c;开发者需要面对的一个挑战是不同函数可…

时间步长问题。tensorflow训练lstm时序模型,输出层实际输出维度和期待维度不一致

设置输出维度为1. Dense(1) 但结果跑出来的输出维度每次都是三维的。 模型设置&#xff1a; 输入x维度&#xff08;2250&#xff0c;48&#xff0c;2&#xff09; 输入y 维度&#xff08;2250&#xff0c;&#xff09; 和 &#xff08;2250&#xff0c;1&#xff09; 但模型预测…

盲人咖啡厅导航:科技之光点亮独立生活新里程

在这个繁华的世界中&#xff0c;咖啡厅不仅是人们社交聚会、休闲阅读的场所&#xff0c;更是无数人心灵栖息的一方天地。然而&#xff0c;对于视障群体而言&#xff0c;独自前往这样的公共场所往往面临重重挑战。幸运的是&#xff0c;一款名为蝙蝠避障专为盲人设计的辅助应用&a…

Day 5 广告管理

Day 5 广告管理 这里会总结构建项目过程中遇到的问题,主要流程&#xff0c;以及一些个人思考&#xff01;&#xff01; 学习方法&#xff1a; 1 github源码 文档 官网 2 内容复现 &#xff0c;实际操作 项目源码同步更新到github 欢迎大家star~ 后期会更新并上传前端项目 创建…

光速记单词-brother开头的单词

1. 思维导图 1.1 brother 1.2 mom 1.3 dad 1.4 man 2. 视频链接

13. Spring AOP(一)思想及使用

1. 什么是Spring AOP AOP的全称是Aspect Oriented Programming&#xff0c;也就是面向切面编程&#xff0c;是一种思想。它是针对OOP(面向对象编程)的一种补充&#xff0c;是对某一类事情的集中处理。比如一个博客网站的登陆验证功能&#xff0c;在用户进行新增、编辑、删除博…

js手写call、bind、apply

目录 call与applyapply bind call和apply和bind有两种实现方式&#xff0c;第一种是隐式绑定&#xff0c;第二种是通过new 无论是通过隐式绑定实现还是通过new实现&#xff0c;核心都是针对this的绑定规则 具体关于this的绑定规则可以看我这一篇博客 this绑定规则 call与apply…

【热议】硕士和读博士洗碗区别的两大理论

::: block-1 “时问桫椤”是一个致力于为本科生到研究生教育阶段提供帮助的不太正式的公众号。我们旨在在大家感到困惑、痛苦或面临困难时伸出援手。通过总结广大研究生的经验&#xff0c;帮助大家尽早适应研究生生活&#xff0c;尽快了解科研的本质。祝一切顺利&#xff01;—…