物流实时数仓:数仓搭建(ODS)

系列文章目录

物流实时数仓:采集通道搭建
物流实时数仓:数仓搭建


文章目录

  • 系列文章目录
  • 前言
  • 一、IDEA环境准备
    • 1.pom.xml
    • 2.目录创建
  • 二、代码编写
    • 1.log4j.properties
    • 2.CreateEnvUtil.java
    • 3.KafkaUtil.java
    • 4.OdsApp.java
  • 三、代码测试
  • 总结


前言

现在我们开始进行数仓的搭建,我们用Kafka来代替数仓的ods层。
基本流程为使用Flink从MySQL读取数据然后写入Kafka中


一、IDEA环境准备

1.pom.xml

写入项目需要的配置

<properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <flink.version>1.17.0</flink.version>
        <hadoop.version>3.2.3</hadoop.version>
        <flink-cdc.version>2.3.0</flink-cdc.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-reload4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

基本上项目需要的所有jar包都有了,不够以后在加。

2.目录创建

在这里插入图片描述按照以上目录结构进行目录创建

二、代码编写

1.log4j.properties

log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

2.CreateEnvUtil.java

这个文件中有两个方法
创建初始化Flink的env
Flink连接mysql的MySqlSource

package com.atguigu.tms.realtime.utils;


import com.esotericsoftware.minlog.Log;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.connect.json.DecimalFormat;
import org.apache.kafka.connect.json.JsonConverterConfig;

import java.util.HashMap;

public class CreateEnvUtil {
    public static StreamExecutionEnvironment getStreamEnv(String[] args) {
        // 1.1 指定流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.检查点相关设置
        // 2.1 开启检查点
        env.enableCheckpointing(6000L, CheckpointingMode.EXACTLY_ONCE);
        // 2.2 设置检查点的超时时间
        env.getCheckpointConfig().setCheckpointTimeout(120000L);
        // 2.3 设置job取消之后 检查点是否保留
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 2.4 设置两个检查点之间的最小时间间隔
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000L);
        // 2.5 设置重启策略
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1), Time.seconds(3)));
        // 2.6 设置状态后端
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/tms/ck");
        // 2.7 设置操作hdfs用户
        // 获取命令行参数
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String hdfsUserName = parameterTool.get("hadoop-user-name", "atguigu");
        System.setProperty("HADOOP_USER_NAME", hdfsUserName);
        return env;
        
    }

    public static MySqlSource<String> getMysqlSource(String option, String serverId, String[] args) {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String mysqlHostname = parameterTool.get("hadoop-user-name", "hadoop102");
        int mysqlPort = Integer.parseInt(parameterTool.get("mysql-port", "3306"));
        String mysqlUsername = parameterTool.get("mysql-username", "root");
        String mysqlPasswd = parameterTool.get("mysql-passwd", "000000");
        option = parameterTool.get("start-up-option", option);
        serverId = parameterTool.get("server-id", serverId);

        // 创建配置信息 Map 集合,将 Decimal 数据类型的解析格式配置 k-v 置于其中
        HashMap config = new HashMap<>();
        config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
        // 将前述 Map 集合中的配置信息传递给 JSON 解析 Schema,该 Schema 将用于 MysqlSource 的初始化
        JsonDebeziumDeserializationSchema jsonDebeziumDeserializationSchema =
                new JsonDebeziumDeserializationSchema(false, config);

        MySqlSourceBuilder<String> builder = MySqlSource.<String>builder()
                .hostname(mysqlHostname)
                .port(mysqlPort)
                .username(mysqlUsername)
                .password(mysqlPasswd)
                .deserializer(jsonDebeziumDeserializationSchema);
        switch (option) {
            // 读取实时数据
            case "dwd":
                String[] dwdTables = new String[]{
                        "tms.order_info",
                        "tms.order_cargo",
                        "tms.transport_task",
                        "tms.order_org_bound"};
                return builder
                        .databaseList("tms")
                        .tableList(dwdTables)
                        .startupOptions(StartupOptions.latest())
                        .serverId(serverId)
                        .build();

            // 读取维度数据
            case "realtime_dim":
                String[] realtimeDimTables = new String[]{
                        "tms.user_info",
                        "tms.user_address",
                        "tms.base_complex",
                        "tms.base_dic",
                        "tms.base_region_info",
                        "tms.base_organ",
                        "tms.express_courier",
                        "tms.express_courier_complex",
                        "tms.employee_info",
                        "tms.line_base_shift",
                        "tms.line_base_info",
                        "tms.truck_driver",
                        "tms.truck_info",
                        "tms.truck_model",
                        "tms.truck_team"};
                return builder
                        .databaseList("tms")
                        .tableList(realtimeDimTables)
                        .startupOptions(StartupOptions.initial())
                        .serverId(serverId)
                        .build();


        }

        Log.error("不支持操作类型");
        return null;

    }
}

3.KafkaUtil.java

该文件中有一个方法,创建Flink连接Kafka需要的Sink

package com.atguigu.tms.realtime.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.kafka.clients.producer.ProducerConfig;

public class KafkaUtil {
    private static final String KAFKA_SERVER = "hadoop102:9092,hadoop103:9092,hadoop104:9092";

    public static KafkaSink<String> getKafkaSink(String topic, String transIdPrefix, String[] args) {
        // 将命令行参数对象封装为 ParameterTool 类对象
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        // 提取命令行传入的 key 为 topic 的配置信息,并将默认值指定为方法参数 topic
        // 当命令行没有指定 topic 时,会采用默认值
        topic = parameterTool.get("topic", topic);
        // 如果命令行没有指定主题名称且默认值为 null 则抛出异常
        if (topic == null) {
            throw new IllegalArgumentException("主题名不可为空:命令行传参为空且没有默认值!");
        }

        // 获取命令行传入的 key 为 bootstrap-servers 的配置信息,并指定默认值
        String bootstrapServers = parameterTool.get("bootstrap-severs", KAFKA_SERVER);
        // 获取命令行传入的 key 为 transaction-timeout 的配置信息,并指定默认值
        String transactionTimeout = parameterTool.get("transaction-timeout", 15 * 60 * 1000 + "");


        return KafkaSink.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(topic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setTransactionalIdPrefix(transIdPrefix)
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeout)
                .build();

    }

    public static KafkaSink<String> getKafkaSink(String topic, String[] args) {
        return getKafkaSink(topic, topic + "_trans", args);

    }
}

4.OdsApp.java

Ods层的app创建,负责读取和写入数据

package com.atguigu.tms.realtime.app.ods;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import com.esotericsoftware.minlog.Log;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class OdsApp {
    public static void main(String[] args) throws Exception {
        // 1.获取流处理环境并指定检查点
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        env.setParallelism(4);


        // 2 使用FlinkCDC从MySQL中读取数据-事实数据
        String dwdOption = "dwd";
        String dwdServerId = "6030";
        String dwdsourceName = "ods_app_dwd_source";

        mysqlToKafka(dwdOption, dwdServerId, dwdsourceName, env, args);

        // 3 使用FlinkCDC从MySQL中读取数据-维度数据
        String realtimeDimOption = "realtime_dim";
        String realtimeDimServerId = "6040";
        String realtimeDimsourceName = "ods_app_realtimeDim_source";

        mysqlToKafka(realtimeDimOption, realtimeDimServerId, realtimeDimsourceName, env, args);

        env.execute();


    }

    public static void mysqlToKafka(String option, String serverId, String sourceName, StreamExecutionEnvironment env, String[] args) {

        MySqlSource<String> MySqlSource = CreateEnvUtil.getMysqlSource(option, serverId, args);

        SingleOutputStreamOperator<String> dwdStrDS = env.fromSource(MySqlSource, WatermarkStrategy.noWatermarks(), sourceName)
                .setParallelism(1)
                .uid(option + sourceName);


        // 3 简单ETL
        SingleOutputStreamOperator<String> processDS = dwdStrDS.process(
                new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String jsonStr, ProcessFunction<String, String>.Context ctx, Collector<String> out) {
                        try {
                            JSONObject jsonObj = JSONObject.parseObject(jsonStr);
                            if (jsonObj.getJSONObject("after") != null && !"d".equals(jsonObj.getString("op"))) {
//                                System.out.println(jsonObj);
                                Long tsMs = jsonObj.getLong("ts_ms");
                                jsonObj.put("ts", tsMs);
                                jsonObj.remove("ts_ms");
                                String jsonString = jsonObj.toJSONString();
                                out.collect(jsonString);
                            }

                        } catch (Exception e) {
                            Log.error("从Flink-CDC得到的数据不是一个标准的json格式",e);
                        }
                    }
                }
        ).setParallelism(1);
        // 4 按照主键进行分组,避免出现乱序
        KeyedStream<String, String> keyedDS = processDS.keyBy((KeySelector<String, String>) jsonStr -> {
            JSONObject jsonObj = JSON.parseObject(jsonStr);
            return jsonObj.getJSONObject("after").getString("id");
        });

        //将数据写入Kafka

        keyedDS.sinkTo(KafkaUtil.getKafkaSink("tms_ods", sourceName + "_transPre", args))
                .uid(option + "_ods_app_sink");
    }
}

三、代码测试

在虚拟机启动我们需要的组件,目前需要hadoop、zk、kafka和MySQL。
在这里插入图片描述
先开一个消费者进行消费。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_ods

然后运行OdsApp.java
他会先读取维度数据,因为维度数据需要全量更新之前的数据。
在这里插入图片描述
当他消费结束后,我们运行jar包,获取事实数据。

java -jar tms-mock-2023-01-06.jar 

如果能消费到新数据,代表通道没问题,ODS层创建完成。

在这里插入图片描述


总结

至此ODS搭建完成。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/186510.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Centos7 离线安装 CDH7.1.7

1. 安装CDH的准备工作&#xff08;所有节点都要执行&#xff09; 1.1 准备环境 角色 IP k8s-master 192.168.181.129 k8s-node1 192.168.181.130 k8s-node2 192.168.181.131 1.2 安装JDK # https://www.oracle.com/java/technologies/downloads/#java11 wget rpm -ivh…

C#,数值计算——插值和外推,Powvargram的计算方法与源程序

1 文本格式 using System; namespace Legalsoft.Truffer { /// <summary> /// Functor for variogram v(r)ar^b, /// where b is specified, a is fitted from the data. /// </summary> public class Powvargram { private do…

技术分享 | 在 IDE 插件开发中接入 JCEF 框架

项目背景 当前的开发环境存在多种不同语言的 IDE&#xff0c;如 JetBrains 全家桶、Eclipse、Android Studio 和 VS Code 等等。由于每个 IDE 各有其特定的语言和平台要求&#xff0c;因此开发 IDE 插件时&#xff0c;需要投入大量资源才能尽可能覆盖大部分工具。同时&#xf…

【精选】框架初探篇之——MyBatis的CRUD及配置文件

MyBatis增删改查 MyBatis新增 新增用户 持久层接口添加方法 void add(User user);映射文件添加标签 <insert id"add" parameterType"com.mybatis.pojo.User">insert into user(username,sex,address) values(# {username},# {sex},# {address}) <…

SOLIDWORKS 2024新功能之CAM篇

SOLIDWORKS 2024 新功能 CAM篇目录概述 • 附加探测周期参数 • 反转切割的固定循环螺纹加工 • 包含装配体的零件的正确进给/速度数据 • Heidenhain 探测类型 • 2.5 轴特征向导中岛屿的终止条件 • 链接轮廓铣削操作的切入引导和切出引导参数 • 螺纹铣削操作的最小孔…

从0开始学习JavaScript--深入了解JavaScript框架

JavaScript框架在现代Web开发中扮演着关键角色&#xff0c;为开发者提供了丰富的工具和抽象层&#xff0c;使得构建复杂的、高性能的Web应用变得更加容易。本文将深入探讨JavaScript框架的核心概念、常见框架的特点以及它们在实际应用中的使用。 JavaScript框架的作用 JavaSc…

NX二次开发UF_CSYS_edit_matrix_of_object 函数介绍

文章作者&#xff1a;里海 来源网站&#xff1a;https://blog.csdn.net/WangPaiFeiXingYuan UF_CSYS_edit_matrix_of_object Defined in: uf_csys.h int UF_CSYS_edit_matrix_of_object(tag_t object_id, tag_t matrix_id ) overview 概述 Updates the specified coordinat…

Matplotlib自定义坐标刻度_Python数据分析与可视化

自定义坐标刻度 主次要刻度隐藏刻度与标签花哨的刻度格式格式生成器与定位器 虽然matplotlib默认的坐标轴定位器与格式生成器可以满足大部分需求&#xff0c;但是并非对每一幅图都合适。 主次要刻度 学习前最好有对matplotlib图形的对象层级较为了解&#xff0c;例如查看前面…

leetcode刷题详解三

2. 两数相加 思路&#xff1a;直接加&#xff0c;注意进位条件不要用if&#xff0c;核心代码在于sum l1->val l2->val carry; ListNode* addTwoNumbers(ListNode* l1, ListNode* l2) {ListNode* dummy new ListNode();ListNode* dummy_head dummy;int carry 0;int …

Ubuntu 22.04.3编译AOSP13刷机

文章目录 设备信息下载AOSP并切换分支获取设备驱动编译系统编译遇到的问题Cannot allocate memoryUbuntu设置USB调试刷机参考链接 设备信息 手机&#xff1a;Pixel 4XL 下载AOSP并切换分支 在清华大学开源软件镜像站下载初始化包aosp-latest.tar。 解压缩&#xff0c;切换到…

【汉诺塔 —— (经典分治递归)】

汉诺塔 —— &#xff08;经典分治递归&#xff09; 一.汉诺塔介绍二.分治算法解决汉诺塔问题三.汉诺塔问题的代码实现四.主函数测试展示 一.汉诺塔介绍 汉诺塔问题源自印度一个古老的传说&#xff0c;印度教的“创造之神”梵天创造世界时做了 3 根金刚石柱&#xff0c;其中的一…

机器视觉尺寸测量仪 助力打造智能工厂!

摘要&#xff1a;机器视觉系统基本的特点就是提高生产的灵活性和自动化程度&#xff0c;目前机器视觉技术在蓬勃发展中&#xff0c;机器视觉尺寸测量仪是基于机器视觉原理制造而成的在线几何尺寸精密仪器。本文系统介绍一下该类测量设备。 机器视觉是什么&#xff1f; 简单来讲…

从0开始学习JavaScript--JavaScript数据类型与数据结构

JavaScript作为一门动态、弱类型的脚本语言&#xff0c;拥有丰富的数据类型和数据结构&#xff0c;这些构建了语言的基础&#xff0c;为开发者提供了灵活性和表达力。本文将深入探讨JavaScript中的各种数据类型&#xff0c;包括基本数据类型和复杂数据类型&#xff0c;并介绍常…

2023.11.24制作一个常用的登录注册模板(包含密码验证、输出格式验证、验证码等功能)

2023.11.24制作一个常用的登录注册模板&#xff08;包含密码验证、输出格式验证、验证码等功能&#xff09; 1. 简介2. 功能3. 页面效果3.1 登录页面3.2 忘记密码页3.3 注册页面 1. 简介 比较喜欢简洁风&#xff0c;只是用bootstrap进行简单装饰 制作一个模板&#xff0c;日常…

Leetcode---372周赛

题目列表 2937. 使三个字符串相等 2938. 区分黑球与白球 2939. 最大异或乘积 2940. 找到 Alice 和 Bob 可以相遇的建筑 一、使三个字符串相等 这题把题目意思读懂&#xff0c;正常模拟就行&#xff0c;简单来说就是看三个字符串的最长公共前缀有多长&#xff0c; 代码如下…

学习Pandas 二(Pandas缺失值处理、数据离散化、合并、交叉表与透视表、分组与聚合)

文章目录 六、高级处理-缺失值处理6.1 检查是否有缺失值6.2 缺失值处理6.3 不是缺失值NaN&#xff0c;有默认标记的 七、高级处理-数据离散化7.1 什么是数据的离散化7.2 为什么要离散化7.3 如何实现数据的离散化 八、高级处理-合并8.1 pc.concat实现合并&#xff0c;按方向进行…

x-www-form-urlencoded的含义解释,getReader()和getParameter()的区别

1、x-www-form-urlencoded x-www-form-urlencoded是一种编码格式&#xff0c;它是一种常见的编码方式&#xff0c;用于在HTTP请求中 传输表单数据 。在这种编码方式下&#xff0c;表单数据被编码为URL格式&#xff0c;然后作为请求体&#xff08;payload&#xff09;发送。 需要…

前端大厂(腾讯、字节跳动、阿里......)校招面试真题解析,让你面试轻松无压力!

前言 校招很重要&#xff0c;应届生的身份很珍贵&#xff01;在校招的时候与我们竞争的大部分都是没有工作经验的学生&#xff0c;而且校招企业对学生的包容度高&#xff0c;一般对企业来说&#xff0c;社招更看重实际工作经验&#xff0c;而校招更愿意“培养人”&#xff0c;校…

FindMy技术用于旅行箱

旅行箱&#xff0c;那是出门在外的我们不可或缺的伙伴。无论是商务出差&#xff0c;还是短途旅行&#xff0c;亦或是长途度假&#xff0c;旅行箱都以其便捷的方式&#xff0c;陪伴着我们的整个行程。 然而&#xff0c;在旅途中&#xff0c;丢失旅行箱是一件非常棘手的问题&…

【Web】PhpBypassTrick相关例题wp

目录 ①[NSSCTF 2022 Spring Recruit]babyphp ②[鹤城杯 2021]Middle magic ③[WUSTCTF 2020]朴实无华 ④[SWPUCTF 2022 新生赛]funny_php 明天中期考&#xff0c;先整理些小知识点冷静一下 ①[NSSCTF 2022 Spring Recruit]babyphp payload: a[]1&b1[]1&b2[]2&…
最新文章