尚硅谷大数据项目《在线教育之实时数仓》笔记008

视频地址:尚硅谷大数据项目《在线教育之实时数仓》_哔哩哔哩_bilibili

目录

第10章 数仓开发之DWS层

P066

P067

P068

P069

P070

P071

P072

P073

P074

P075

P076

P077

P078

P079

P080

P081

P082


第10章 数仓开发之DWS层

P066

第10章 数仓开发之DWS层

设计要点:
(1)DWS层的设计参考指标体系。
(2)DWS层表名的命名规范为dws_数据域_统计粒度_业务过程_统计周期(window)。
注:window 表示窗口对应的时间范围。
10.1 流量域来源关键词粒度页面浏览各窗口汇总表
10.1.1 主要任务

    从 Kafka 页面浏览明细主题读取数据,过滤搜索行为,使用自定义 UDTF(一进多出)函数对搜索内容分词。统计各窗口各关键词出现频次,写入 ClickHouse。
10.1.2 思路分析

尚硅谷大数据项目之在线教育数仓\尚硅谷大数据项目之在线教育数仓-3实时\资料\13.总线矩阵及指标体系

在线教育实时指标体系.xlsx

P067

DwsTrafficSourceKeywordPageViewWindow

//TODO 1 创建环境设置状态后端
//TODO 2 自定义拆词函数
//TODO 3 读取kafka中的page_log数据
//TODO 4 过滤数据得到搜索的关键字
//TODO 5 使用自定义函数对关键字拆词
//TODO 6 分组开窗合并计算
//TODO 7 转换为流
//TODO 8 写出到clickHouse中
//TODO 9 运行任务

package com.atguigu.edu.realtime.util;

import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;

/**
 * @author yhm
 * @create 2023-04-25 16:05
 */
public class KeyWordUtil {
    public static ArrayList<String> analyze(String text) {
        StringReader reader = new StringReader(text);
        IKSegmenter ikSegmenter = new IKSegmenter(reader, true);
        ArrayList<String> strings = new ArrayList<>();
        try {
            Lexeme lexeme = null;
            while ((lexeme = ikSegmenter.next()) != null) {
                String keyWord = lexeme.getLexemeText();
                strings.add(keyWord);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return strings;
    }

    public static void main(String[] args) {
        String s = "Apple iPhoneXSMax (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待";
        ArrayList<String> strings = analyze(s);
        System.out.println(strings);
    }
}

P068

User-defined Functions | Apache Flink

DwsTrafficSourceKeywordPageViewWindow

//TODO 1 创建环境设置状态后端
//TODO 2 自定义拆词函数
//TODO 3 读取kafka中的page_log数据
//TODO 4 过滤数据得到搜索的关键字
//TODO 5 使用自定义函数对关键字拆词

//TODO 6 分组开窗合并计算
//TODO 7 转换为流
//TODO 8 写出到clickHouse中
//TODO 9 运行任务

P069

package com.atguigu.edu.realtime.app.dws;

import com.atguigu.edu.realtime.app.func.KeyWordUDTF;
import com.atguigu.edu.realtime.bean.KeywordBean;
import com.atguigu.edu.realtime.common.EduConfig;
import com.atguigu.edu.realtime.common.EduConstant;
import com.atguigu.edu.realtime.util.ClickHouseUtil;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * @author yhm
 * @create 2023-04-25 16:01
 */
public class DwsTrafficSourceKeywordPageViewWindow {
    public static void main(String[] args) throws Exception {
        //TODO 1 创建环境设置状态后端
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //TODO 2 自定义拆词函数
        tableEnv.createTemporarySystemFunction("ik_analyze", new KeyWordUDTF());

        //TODO 3 读取kafka中的page_log数据
        String topicName = "dwd_traffic_page_log";
        String groupId = "dws_traffic_source_keyword_page_view_window";
        tableEnv.executeSql("create table page_log(\n" +
                "    common map<String,String>,\n" +
                "    page map<String,String>,\n" +
                "    ts bigint, \n" +
                "    row_time as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss')), \n" +
                "    WATERMARK FOR row_time AS row_time - INTERVAL '3' SECOND" +
                ")" + KafkaUtil.getKafkaDDL(topicName, groupId));

        //TODO 4 过滤数据得到搜索的关键字
        //① page 字段下 item 字段不为 null;
        //② page 字段下 last_page_id 为 search;
        //③ page 字段下 item_type 为 keyword。
        Table searchTable = tableEnv.sqlQuery("select \n" +
                "    page['item'] full_word,\n" +
                "    row_time\n" +
                "from page_log\n" +
                "where page['item'] is not null \n" +
                "and page['item_type'] ='keyword'\n" +
                // "and page['last_page_id'] = 'search'" +
                "");
        tableEnv.createTemporaryView("search_table", searchTable);

        //TODO 5 使用自定义函数对关键字拆词
        Table splitTable = tableEnv.sqlQuery("select \n" +
                "    keyword,\n" +
                "    row_time\n" +
                "from search_table ,\n" +
                "lateral table (ik_analyze(full_word)) as t(keyword)");
        tableEnv.createTemporaryView("split_table", splitTable);
        tableEnv.executeSql("select * from split_table").print();

        //TODO 6 分组开窗合并计算

        //TODO 7 转换为流

        //TODO 8 写出到clickHouse中

        //TODO 9 运行任务
    }
}

P070

Window Aggregation | Apache Flink

P071

10.1.4 ClickHouse 建表语句

drop table if exists dws_traffic_source_keyword_page_view_window;

create table if not exists dws_traffic_source_keyword_page_view_window

(

    stt           DateTime,

    edt           DateTime,

    source        String,

    keyword       String,

    keyword_count UInt64,

    ts            UInt64

) engine = ReplacingMergeTree(ts)

      partition by toYYYYMMDD(stt)

      order by (stt, edt, source, keyword);

package com.atguigu.edu.realtime.util;

import com.atguigu.edu.realtime.bean.KeywordBean;
import com.atguigu.edu.realtime.bean.TransientSink;
import com.atguigu.edu.realtime.common.EduConfig;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.lang.reflect.Field;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * @author yhm
 * @create 2023-04-25 18:23
 */
public class ClickHouseUtil {
    // 设计泛型 通过传入的数据类型自动补充sql 写出到clickhouse
    public static <T> SinkFunction<T> getJdbcSink(String sql) {
        return JdbcSink.<T>sink(sql, new JdbcStatementBuilder<T>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, T obj) throws SQLException {
                        // T是泛型,明文是不知道什么类型的,需要使用反射获取
                        Field[] declaredFields = obj.getClass().getDeclaredFields();
                        int skip = 0;
                        for (int i = 0; i < declaredFields.length; i++) {
                            Field field = declaredFields[i];
                            field.setAccessible(true);

                            // 获取属性的注解
                            TransientSink annotation = field.getAnnotation(TransientSink.class);
                            if (annotation != null) {
                                skip++;
                                continue;
                            }

                            // 使用类模板的属性名 get对象 获取值
                            try {
                                Object o = field.get(obj);
                                preparedStatement.setObject(i + 1 - skip, o);
                            } catch (IllegalAccessException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }, JdbcExecutionOptions.builder()
                        .withBatchIntervalMs(5000L)
                        .withBatchSize(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(EduConfig.CLICKHOUSE_URL)
                        .withDriverName(EduConfig.CLICKHOUSE_DRIVER)
                        .build());
    }
}

[atguigu@node001 ~]$ sudo systemctl start clickhouse-server
[atguigu@node001 ~]$ clickhouse-client -m

node001 :) SHOW DATABASES;

node001 :) CREATE DATABASE edu_realtime;
node001 :) SHOW DATABASES;

node001 :) USE edu_realtime;
node001 :) SHOW TABLES FROM edu_realtime;

node001 :) SELECT * FROM dws_traffic_source_keyword_page_view_window;

[atguigu@node001 ~]$ sudo systemctl start clickhouse-server
[atguigu@node001 ~]$ clickhouse-client -m
ClickHouse client version 20.4.5.36 (official build).
Connecting to localhost:9000 as user default.
Connected to ClickHouse server version 20.4.5 revision 54434.

node001 :) CREATE DATABASE edu_realtime;

CREATE DATABASE edu_realtime

Ok.

0 rows in set. Elapsed: 0.044 sec. 

node001 :) SHOW DATABASES;

SHOW DATABASES

┌─name───────────────────────────┐
│ _temporary_and_external_tables │
│ default                        │
│ edu_realtime                   │
│ system                         │
└────────────────────────────────┘

4 rows in set. Elapsed: 0.031 sec. 

node001 :) SHOW TABLES FROM edu_realtime;

SHOW TABLES FROM edu_realtime

Ok.

0 rows in set. Elapsed: 0.028 sec. 

node001 :) use edu_realtime;

USE edu_realtime

Ok.

0 rows in set. Elapsed: 0.002 sec. 

node001 :) create table if not exists dws_traffic_source_keyword_page_view_window
:-] (
:-]     stt           DateTime,
:-]     edt           DateTime,
:-]     source        String,
:-]     keyword       String,
:-]     keyword_count UInt64,
:-]     ts            UInt64
:-] ) engine = ReplacingMergeTree(ts)
:-]       partition by toYYYYMMDD(stt)
:-]       order by (stt, edt, source, keyword);

CREATE TABLE IF NOT EXISTS dws_traffic_source_keyword_page_view_window
(
    `stt` DateTime, 
    `edt` DateTime, 
    `source` String, 
    `keyword` String, 
    `keyword_count` UInt64, 
    `ts` UInt64
)
ENGINE = ReplacingMergeTree(ts)
PARTITION BY toYYYYMMDD(stt)
ORDER BY (stt, edt, source, keyword)

Ok.

0 rows in set. Elapsed: 0.016 sec. 

node001 :) use edu_realtime;
USE edu_realtime

Ok.

0 rows in set. Elapsed: 0.002 sec. 

node001 :) SHOW TABLES FROM edu_realtime;
SHOW TABLES FROM edu_realtime

┌─name────────────────────────────────────────┐
│ dws_traffic_source_keyword_page_view_window │
└─────────────────────────────────────────────┘

1 rows in set. Elapsed: 0.007 sec. 

node001 :) SELECT * FROM dws_traffic_source_keyword_page_view_window;

SELECT *
FROM dws_traffic_source_keyword_page_view_window

┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-21 23:58:30 │ 2022-02-21 23:58:40 │ SEARCH │ java    │            19 │ 1699513951000 │
│ 2022-02-21 23:58:30 │ 2022-02-21 23:58:40 │ SEARCH │ 前端    │            27 │ 1699513951000 │
│ 2022-02-21 23:58:50 │ 2022-02-21 23:59:00 │ SEARCH │ 前端    │            14 │ 1699513951000 │
│ 2022-02-21 23:59:00 │ 2022-02-21 23:59:10 │ SEARCH │ 大      │            19 │ 1699513951000 │
│ 2022-02-21 23:59:00 │ 2022-02-21 23:59:10 │ SEARCH │ 数据库  │             4 │ 1699513951000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘
┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-21 23:59:50 │ 2022-02-22 00:00:00 │ SEARCH │ hadoop  │            19 │ 1699513951000 │
│ 2022-02-21 23:59:50 │ 2022-02-22 00:00:00 │ SEARCH │ python  │            19 │ 1699513951000 │
│ 2022-02-21 23:59:50 │ 2022-02-22 00:00:00 │ SEARCH │ 前端    │            39 │ 1699513951000 │
│ 2022-02-21 23:59:50 │ 2022-02-22 00:00:00 │ SEARCH │ 数据库  │            19 │ 1699513951000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘
┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-21 23:59:00 │ 2022-02-21 23:59:10 │ SEARCH │ 数据    │            19 │ 1699513951000 │
│ 2022-02-21 23:59:20 │ 2022-02-21 23:59:30 │ SEARCH │ java    │            33 │ 1699513951000 │
│ 2022-02-21 23:59:30 │ 2022-02-21 23:59:40 │ SEARCH │ flink   │            20 │ 1699513951000 │
│ 2022-02-21 23:59:30 │ 2022-02-21 23:59:40 │ SEARCH │ java    │            20 │ 1699513951000 │
│ 2022-02-21 23:59:30 │ 2022-02-21 23:59:40 │ SEARCH │ 前端    │            19 │ 1699513951000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘
┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-22 00:00:10 │ 2022-02-22 00:00:20 │ SEARCH │ 多线程  │            20 │ 1699513951000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘
┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-22 00:00:20 │ 2022-02-22 00:00:30 │ SEARCH │ flink   │             4 │ 1699513951000 │
│ 2022-02-22 00:00:20 │ 2022-02-22 00:00:30 │ SEARCH │ java    │            27 │ 1699513951000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘
┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-21 20:51:40 │ 2022-02-21 20:51:50 │ SEARCH │ 多线程  │             1 │ 1699513903000 │
│ 2022-02-21 20:52:00 │ 2022-02-21 20:52:10 │ SEARCH │ hadoop  │             1 │ 1699449059000 │
│ 2022-02-21 20:53:10 │ 2022-02-21 20:53:20 │ SEARCH │ 多线程  │             1 │ 1699447298000 │
│ 2022-02-21 20:54:20 │ 2022-02-21 20:54:30 │ SEARCH │ 大      │             1 │ 1699447298000 │
│ 2022-02-21 20:54:20 │ 2022-02-21 20:54:30 │ SEARCH │ 数据    │             1 │ 1699447298000 │
│ 2022-02-21 23:59:50 │ 2022-02-22 00:00:00 │ SEARCH │ 前端    │             3 │ 1699449067000 │
│ 2022-02-21 23:59:50 │ 2022-02-22 00:00:00 │ SEARCH │ 数据库  │             1 │ 1699449067000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘
┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-22 00:00:10 │ 2022-02-22 00:00:20 │ SEARCH │ 多线程  │             2 │ 1699449067000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘

1003 rows in set. Elapsed: 0.114 sec. Processed 1.00 thousand rows, 54.17 KB (8.79 thousand rows/s., 474.47 KB/s.) 

node001 :) 

package com.atguigu.edu.realtime.app.dws;

import com.atguigu.edu.realtime.app.func.KeyWordUDTF;
import com.atguigu.edu.realtime.bean.KeywordBean;
import com.atguigu.edu.realtime.common.EduConfig;
import com.atguigu.edu.realtime.common.EduConstant;
import com.atguigu.edu.realtime.util.ClickHouseUtil;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * @author yhm
 * @create 2023-04-25 16:01
 */
public class DwsTrafficSourceKeywordPageViewWindow {
    public static void main(String[] args) throws Exception {
        //TODO 1 创建环境设置状态后端
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //TODO 2 自定义拆词函数
        tableEnv.createTemporarySystemFunction("ik_analyze", new KeyWordUDTF());

        //TODO 3 读取kafka中的page_log数据
        String topicName = "dwd_traffic_page_log";
        String groupId = "dws_traffic_source_keyword_page_view_window";
        tableEnv.executeSql("create table page_log(\n" +
                "    common map<String,String>,\n" +
                "    page map<String,String>,\n" +
                "    ts bigint, \n" +
                "    row_time as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss')), \n" +
                "    WATERMARK FOR row_time AS row_time - INTERVAL '3' SECOND" +
                ")" + KafkaUtil.getKafkaDDL(topicName, groupId));

        //TODO 4 过滤数据得到搜索的关键字
        //① page 字段下 item 字段不为 null;
        //② page 字段下 last_page_id 为 search;
        //③ page 字段下 item_type 为 keyword。
        Table searchTable = tableEnv.sqlQuery("select \n" +
                "    page['item'] full_word,\n" +
                "    row_time\n" +
                "from page_log\n" +
                "where page['item'] is not null \n" +
                "and page['item_type'] ='keyword'\n" +
                // "and page['last_page_id'] = 'search'" +
                "");
        tableEnv.createTemporaryView("search_table", searchTable);

        //TODO 5 使用自定义函数对关键字拆词
        Table splitTable = tableEnv.sqlQuery("select \n" +
                "    keyword,\n" +
                "    row_time\n" +
                "from search_table ,\n" +
                "lateral table (ik_analyze(full_word)) as t(keyword)");
        tableEnv.createTemporaryView("split_table", splitTable);
        //tableEnv.executeSql("select * from split_table").print();

        //TODO 6 分组开窗合并计算
        Table keywordBeanTable = tableEnv.sqlQuery("select \n" +
                "    date_format(TUMBLE_START(\n" +
                "    row_time, INTERVAL '10' second),'yyyy-MM-dd HH:mm:ss') stt,\n" +
                "    date_format(TUMBLE_END(\n" +
                "    row_time, INTERVAL '10' second),'yyyy-MM-dd HH:mm:ss') edt,\n" +
                "\n" + "'" + EduConstant.KEYWORD_SEARCH + "' source," +
                "    0 keywordLength,\n" +
                "    keyword,\n" +
                "    count(*) keyword_count,\n" +
                "    UNIX_TIMESTAMP()*1000 ts\n" +
                "from split_table\n" +
                "group by TUMBLE(row_time, INTERVAL '10' second),keyword");

        //TODO 7 转换为流
        DataStream<KeywordBean> keywordBeanDataStream = tableEnv.toDataStream(keywordBeanTable, KeywordBean.class);
        keywordBeanDataStream.print();

        //TODO 8 写出到clickHouse中
        keywordBeanDataStream.addSink(ClickHouseUtil.<KeywordBean>getJdbcSink("insert into dws_traffic_source_keyword_page_view_window values(?,?,?,?,?,?)"));

        //TODO 9 运行任务
        env.execute();
    }
}

P072

package com.atguigu.edu.realtime.bean;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * @author yhm
 * @create 2023-04-25 18:37
 */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TransientSink {
}

P073

10.2 流量域版本-来源-地区-访客类别粒度页面浏览各窗口汇总表
10.2.1 主要任务

DWS 层是为 ADS 层服务的,通过对指标体系的分析,本节汇总表中需要有会话数、页面浏览数、浏览总时长、独立访客数、跳出会话数五个度量字段。我们的任务是统计这五个指标,并将数据写入 ClickHouse 汇总表。

P074

DwsTrafficVcSourceArIsNewPageViewWindow

TODO 1 ~ TODO 6

P075

DwsTrafficVcSourceArIsNewPageViewWindow

TODO 7 ~ TODO 9

P076

PhoenixUtil、public static <T> List<T> queryList(String sql, Class<T> clazz) {}

P077

DimUtil、public static JSONObject getDimInfoNoCache(String tableName, Tuple2<String, String>... columnNamesAn {}

[atguigu@node001 ~]$ start-hbase.sh
[atguigu@node001 ~]$ cd /opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/
[atguigu@node001 apache-phoenix-5.0.0-HBase-2.0-bin]$ bin/sqlline.py node001:2181

P078

DwsTrafficVcSourceArIsNewPageViewWindow

TODO 10

P079

10.2 流量域版本-来源-地区-访客类别粒度页面浏览各窗口汇总表

10.2.2 思路分析

4)旁路缓存优化

外部数据源的查询常常是流式计算的性能瓶颈。以本程序为例,每次查询都要连接 Hbase,数据传输需要做序列化、反序列化,还有网络传输,严重影响时效性。可以通过旁路缓存对查询进行优化。

P080

DimUtil、public static JSONObject getDimInfo(String tableName, Tuple2<String, String>... columnNamesAndValues) {}

P081

DimUtil 、public static void deleteCached(String tableName, String id) {}

[atguigu@node001 ~]$ redis-server ./my_redis.conf 
[atguigu@node001 ~]$ redis-cli 
127.0.0.1:6379> ping
PONG
127.0.0.1:6379> 
[atguigu@node001 ~]$ /opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/bin/sqlline.py node001:2181

P082

10.2 流量域版本-来源-地区-访客类别粒度页面浏览各窗口汇总表

10.2.2 思路分析

6)异步 IO

DwsTrafficVcSourceArIsNewPageViewWindow

//TODO 10 维度关联

[atguigu@node001 ~]$ clickhouse-client -m
ClickHouse client version 20.4.5.36 (official build).
Connecting to localhost:9000 as user default.
Connected to ClickHouse server version 20.4.5 revision 54434.

node001 :) show databases;

SHOW DATABASES

┌─name───────────────────────────┐
│ _temporary_and_external_tables │
│ default                        │
│ edu_realtime                   │
│ system                         │
└────────────────────────────────┘

4 rows in set. Elapsed: 0.019 sec. 

node001 :) use edu_realtime;

USE edu_realtime

Ok.

0 rows in set. Elapsed: 0.007 sec. 

node001 :) drop table if exists dws_traffic_vc_source_ar_is_new_page_view_window;

DROP TABLE IF EXISTS dws_traffic_vc_source_ar_is_new_page_view_window

Ok.

0 rows in set. Elapsed: 0.007 sec. 

node001 :) create table dws_traffic_vc_source_ar_is_new_page_view_window(
:-] stt DateTime,
:-] edt DateTime,
:-] version_code String,
:-] source_id String,
:-] source_name String,
:-] ar String,
:-] province_name String,
:-] is_new String,
:-] uv_count UInt64,
:-] total_session_count UInt64,
:-] page_view_count UInt64,
:-] total_during_time UInt64,
:-] jump_session_count UInt64,
:-] ts UInt64
:-] ) engine = ReplacingMergeTree(ts)
:-] partition by toYYYYMMDD(stt)
:-] order by(stt, edt, version_code, source_id, source_name, ar, province_name, is_new);

CREATE TABLE dws_traffic_vc_source_ar_is_new_page_view_window
(
    `stt` DateTime, 
    `edt` DateTime, 
    `version_code` String, 
    `source_id` String, 
    `source_name` String, 
    `ar` String, 
    `province_name` String, 
    `is_new` String, 
    `uv_count` UInt64, 
    `total_session_count` UInt64, 
    `page_view_count` UInt64, 
    `total_during_time` UInt64, 
    `jump_session_count` UInt64, 
    `ts` UInt64
)
ENGINE = ReplacingMergeTree(ts)
PARTITION BY toYYYYMMDD(stt)
ORDER BY (stt, edt, version_code, source_id, source_name, ar, province_name, is_new)

Ok.

0 rows in set. Elapsed: 0.043 sec. 

node001 :) select * from edu_realtime.dws_traffic_vc_source_ar_is_new_page_view_window;

SELECT *
FROM edu_realtime.dws_traffic_vc_source_ar_is_new_page_view_window

Ok.

0 rows in set. Elapsed: 0.071 sec. 

node001 :) 
package com.atguigu.edu.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.app.func.DimAsyncFunction;
import com.atguigu.edu.realtime.bean.DwsTrafficForSourcePvBean;
import com.atguigu.edu.realtime.util.*;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

/**
 * @author yhm
 * @create 2023-04-26 16:08
 */
public class DwsTrafficVcSourceArIsNewPageViewWindow {
    public static void main(String[] args) throws Exception {
        //TODO 1 创建环境设置状态后端
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);

        //TODO 2 读取pageLog主题数据
        String pageTopic = "dwd_traffic_page_log";
        String groupId = "dws_traffic_vc_source_ar_is_new_page_view_window";
        KafkaSource<String> pageSource = KafkaUtil.getKafkaConsumer(pageTopic, groupId);
        DataStreamSource<String> pageStream = env.fromSource(pageSource, WatermarkStrategy.noWatermarks(), "page_log");

        //TODO 3 读取独立访客数据
        String uvTopic = "dwd_traffic_unique_visitor_detail";
        KafkaSource<String> uvSource = KafkaUtil.getKafkaConsumer(uvTopic, groupId);
        DataStreamSource<String> uvStream = env.fromSource(uvSource, WatermarkStrategy.noWatermarks(), "uv_detail");

        //TODO 4 读取跳出用户数据
        String jumpTopic = "dwd_traffic_user_jump_detail";
        KafkaSource<String> jumpSource = KafkaUtil.getKafkaConsumer(jumpTopic, groupId);
        DataStreamSource<String> jumpStream = env.fromSource(jumpSource, WatermarkStrategy.noWatermarks(), "jump_detail");

        //TODO 5 转换数据结构
        SingleOutputStreamOperator<DwsTrafficForSourcePvBean> pageBeanStream = pageStream.map(new MapFunction<String, DwsTrafficForSourcePvBean>() {
            @Override
            public DwsTrafficForSourcePvBean map(String value) throws Exception {
                // 将page_log的一条日志转换为一个对应的javaBean
                JSONObject jsonObject = JSON.parseObject(value);
                JSONObject common = jsonObject.getJSONObject("common");
                JSONObject page = jsonObject.getJSONObject("page");
                Long ts = jsonObject.getLong("ts");

                return DwsTrafficForSourcePvBean.builder()
                        .versionCode(common.getString("vc"))
                        .sourceId(common.getString("sc"))
                        .ar(common.getString("ar"))
                        .isNew(common.getString("is_new"))
                        .uvCount(0L)
                        .totalSessionCount(page.getString("last_page_id") == null ? 1L : 0L)
                        .pageViewCount(1L)
                        .totalDuringTime(page.getLong("during_time"))
                        .jumpSessionCount(0L)
                        .ts(ts)
                        .build();
            }
        });

        SingleOutputStreamOperator<DwsTrafficForSourcePvBean> uvBeanStream = uvStream.map(new MapFunction<String, DwsTrafficForSourcePvBean>() {
            @Override
            public DwsTrafficForSourcePvBean map(String value) throws Exception {
                // 将page_log的一条日志转换为一个对应的javaBean
                JSONObject jsonObject = JSON.parseObject(value);
                JSONObject common = jsonObject.getJSONObject("common");
                Long ts = jsonObject.getLong("ts");

                return DwsTrafficForSourcePvBean.builder()
                        .versionCode(common.getString("vc"))
                        .sourceId(common.getString("sc"))
                        .ar(common.getString("ar"))
                        .isNew(common.getString("is_new"))
                        .uvCount(1L)
                        .totalSessionCount(0L)
                        .pageViewCount(0L)
                        .totalDuringTime(0L)
                        .jumpSessionCount(0L)
                        .ts(ts)
                        .build();
            }
        });

        SingleOutputStreamOperator<DwsTrafficForSourcePvBean> jumpBeanStream = jumpStream.map(new MapFunction<String, DwsTrafficForSourcePvBean>() {
            @Override
            public DwsTrafficForSourcePvBean map(String value) throws Exception {
                // 将page_log的一条日志转换为一个对应的javaBean
                JSONObject jsonObject = JSON.parseObject(value);
                JSONObject common = jsonObject.getJSONObject("common");
                Long ts = jsonObject.getLong("ts");

                return DwsTrafficForSourcePvBean.builder()
                        .versionCode(common.getString("vc"))
                        .sourceId(common.getString("sc"))
                        .ar(common.getString("ar"))
                        .isNew(common.getString("is_new"))
                        .uvCount(0L)
                        .totalSessionCount(0L)
                        .pageViewCount(0L)
                        .totalDuringTime(0L)
                        .jumpSessionCount(1L)
                        .ts(ts)
                        .build();
            }
        });

        //TODO 6 合并3条数据流
        DataStream<DwsTrafficForSourcePvBean> unionStream = pageBeanStream.union(uvBeanStream).union(jumpBeanStream);

        //TODO 7 添加水位线
        SingleOutputStreamOperator<DwsTrafficForSourcePvBean> withWaterMarkStream = unionStream.assignTimestampsAndWatermarks(WatermarkStrategy.<DwsTrafficForSourcePvBean>forBoundedOutOfOrderness(Duration.ofSeconds(15L)).withTimestampAssigner(new SerializableTimestampAssigner<DwsTrafficForSourcePvBean>() {
            @Override
            public long extractTimestamp(DwsTrafficForSourcePvBean element, long recordTimestamp) {
                return element.getTs();
            }
        }));

        //TODO 8 分组开窗
        WindowedStream<DwsTrafficForSourcePvBean, String, TimeWindow> windowStream = withWaterMarkStream.keyBy(new KeySelector<DwsTrafficForSourcePvBean, String>() {
            @Override
            public String getKey(DwsTrafficForSourcePvBean value) throws Exception {
                return value.getVersionCode()
                        + value.getSourceId()
                        + value.getAr()
                        + value.getIsNew();
            }
        }).window(TumblingEventTimeWindows.of(Time.seconds(10L)));

        //TODO 9 聚合统计
        SingleOutputStreamOperator<DwsTrafficForSourcePvBean> reduceStream = windowStream.reduce(new ReduceFunction<DwsTrafficForSourcePvBean>() {
            @Override
            public DwsTrafficForSourcePvBean reduce(DwsTrafficForSourcePvBean value1, DwsTrafficForSourcePvBean value2) throws Exception {
                // 合并相同common信息的数据
                value1.setTotalSessionCount(value1.getTotalSessionCount() + value2.getTotalSessionCount());
                value1.setUvCount(value1.getUvCount() + value2.getUvCount());
                value1.setTotalDuringTime(value1.getTotalDuringTime() + value2.getTotalDuringTime());
                value1.setJumpSessionCount(value1.getJumpSessionCount() + value2.getJumpSessionCount());
                value1.setPageViewCount(value1.getPageViewCount() + value2.getPageViewCount());
                return value1;
            }
        }, new ProcessWindowFunction<DwsTrafficForSourcePvBean, DwsTrafficForSourcePvBean, String, TimeWindow>() {
            @Override
            public void process(String s, Context context, Iterable<DwsTrafficForSourcePvBean> elements, Collector<DwsTrafficForSourcePvBean> out) throws Exception {
                TimeWindow timeWindow = context.window();
                String start = DateFormatUtil.toYmdHms(timeWindow.getStart());
                String end = DateFormatUtil.toYmdHms(timeWindow.getEnd());
                for (DwsTrafficForSourcePvBean element : elements) {
                    element.setStt(start);
                    element.setEdt(end);
                    // 修正时间戳
                    element.setTs(System.currentTimeMillis());
                    out.collect(element);
                }
            }
        });
        reduceStream.print();

        //TODO 10 维度关联
        reduceStream.map(new MapFunction<DwsTrafficForSourcePvBean, DwsTrafficForSourcePvBean>() {
            @Override
            public DwsTrafficForSourcePvBean map(DwsTrafficForSourcePvBean value) throws Exception {
                // 关联来源名称
                String sourceId = value.getSourceId();
                String provinceId = value.getAr();
                JSONObject dimBaseSource = DimUtil.getDimInfo("DIM_BASE_SOURCE", sourceId);
                String sourceName = dimBaseSource.getString("SOURCE_SITE");
                value.setSourceName(sourceName);
                JSONObject dimBaseProvince = DimUtil.getDimInfo("DIM_BASE_PROVINCE", provinceId);
                String provinceName = dimBaseProvince.getString("NAME");
                value.setProvinceName(provinceName);
                return value;
            }
        }).print();

        // 异步操作
        // 关联来源表
        SingleOutputStreamOperator<DwsTrafficForSourcePvBean> sourceBeanStream = AsyncDataStream.unorderedWait(reduceStream, new DimAsyncFunction<DwsTrafficForSourcePvBean>("DIM_BASE_SOURCE") {
            @Override
            public void join(DwsTrafficForSourcePvBean obj, JSONObject jsonObject) throws Exception {
                String sourceName = jsonObject.getString("SOURCE_SITE");
                obj.setSourceName(sourceName);
            }

            @Override
            public String getKey(DwsTrafficForSourcePvBean obj) {
                return obj.getSourceId();
            }
        }, 1, TimeUnit.MINUTES);

        // 关联省份
        SingleOutputStreamOperator<DwsTrafficForSourcePvBean> dimBeanStream = AsyncDataStream.unorderedWait(sourceBeanStream, new DimAsyncFunction<DwsTrafficForSourcePvBean>("DIM_BASE_PROVINCE") {
            @Override
            public void join(DwsTrafficForSourcePvBean obj, JSONObject jsonObject) throws Exception {
                String provinceName = jsonObject.getString("NAME");
                obj.setProvinceName(provinceName);
            }

            @Override
            public String getKey(DwsTrafficForSourcePvBean obj) {
                return obj.getAr();
            }
        }, 1, TimeUnit.MINUTES);

        //TODO 11 写出到clickHouse
        dimBeanStream.addSink(ClickHouseUtil.getJdbcSink(" " +
                "insert into dws_traffic_vc_source_ar_is_new_page_view_window values" +
                "(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"));

        // TODO 12 执行任务
        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/193891.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Java游戏 王者荣耀

GameFrame类 所需图片&#xff1a; package 王者荣耀;import java.awt.*; import java.awt.event.ActionEvent; import java.awt.event.ActionListener; import java.awt.event.KeyAdapter; import java.awt.event.KeyEvent; import java.io.File; import java.util.ArrayList…

2023年亚太杯APMCM数学建模大赛A题水果采摘机器人的图像识别

2023年亚太杯APMCM数学建模大赛 A题 水果采摘机器人的图像识别 原题再现 中国是世界上最大的苹果生产国&#xff0c;年产量约3500万吨。同时&#xff0c;中国也是世界上最大的苹果出口国&#xff0c;世界上每两个苹果中就有一个是中国出口的&#xff0c;世界上超过六分之一的…

Docker-简介、基本操作

目录 Docker理解 1、Docker本质 2、Docker与虚拟机的区别 3、Docker和JVM虚拟化的区别 4、容器、镜像的理解 5、Docker架构 Docker客户端 Docker服务器 Docker镜像 Docker容器 镜像仓库 Docker基本操作 1、Docker镜像仓库 镜像仓库分类 镜像仓库命令 docker lo…

CV计算机视觉每日开源代码Paper with code速览-2023.11.22

点击CV计算机视觉&#xff0c;关注更多CV干货 论文已打包&#xff0c;点击进入—>下载界面 点击加入—>CV计算机视觉交流群 1.【语义分割】Mobile-Seed: Joint Semantic Segmentation and Boundary Detection for Mobile Robots 论文地址&#xff1a;https://arxiv.or…

高效视频剪辑:按指定时长批量分割视频,释放无尽创意

随着数字媒体技术的不断发展&#xff0c;视频剪辑已经成为日常生活中不可或缺的一部分。无论是制作电影、电视剧&#xff0c;还是创意生活短视频&#xff0c;视频剪辑都扮演着重要的角色。然而&#xff0c;对于许多非专业人士来说&#xff0c;视频剪辑可能是一项复杂而耗时的任…

C#,《小白学程序》第二十五课:大数乘法(BigInteger Multiply)的Karatsuba算法及源代码

1 文本格式 /// <summary> /// 《小白学程序》第二十五课&#xff1a;大数&#xff08;BigInteger&#xff09;的Karatsuba乘法 /// Multiplies two bit strings X and Y and returns result as long integer /// </summary> /// <param name"a">&…

如何在Ubuntu系统上安装Redis

Redis的下载 Redis安装包分为windows版和Linux版当前示例中介绍的是Linux版本Linux的下载地址&#xff1a;Index of /releases/ (redis.io)本次下载的压缩包为&#xff1a;redis-6.2.14.tar.gzRedis的安装 将压缩包通过ssh远程工具上传到Linux服务器中解压压缩包 tar -zxvf red…

深度学习18

卷积层 查看每个数据 使用tensorboard查看 池化层 使用数据集进行训练 创建实例&#xff0c;使用tensorboard进行显示 最大池化保留了图片信息&#xff0c;神经网络训练的数据量大大减小&#xff0c;可以加快训练 非线性激活 非线性激活为神经网络加入了一些非线性的特质…

蓝桥杯每日一题2023.11.27

题目描述 星系炸弹 - 蓝桥云课 (lanqiao.cn) 题目分析 对于此题目一一枚举即可 #include<bits/stdc.h> using namespace std; bool is_r(int n) {if((n % 4 0 && n % 100 ! 0)|| n % 400 0)return true;return false; } int mm[13] {0, 31, 28, 31, 30, 3…

【日常总结】优雅升级Swagger 2 升至 3.0, 全局设置 content-type application/json

目录 一、场景 二、问题 三、解决方案 四、延伸 上一节&#xff1a;【日常总结】Swagger-ui 导入 showdoc &#xff08;优雅升级Swagger 2 升至 3.0&#xff09;-CSDN博客 一、场景 接上一节&#xff1a;在 Swagger3Config extends WebMvcConfigurationSupport&#xff0c…

ECShop 4.x collection_listSQL注入

漏洞描述 ECShop是一款B2C独立网店系统&#xff0c;适合企业及个人快速构建个性化网上商店。系统是基于PHP语言及MYSQL数据库构架开发的跨平台开源程序 影响版本&#xff1a;ecshop4.0.7及以下 漏洞环境及利用 docker环境搭建 访问8080端口&#xff0c;数据库主机为mysql&a…

vue day2

1、指令修饰符&#xff1a;.指明一些指令后缀&#xff0c;不同后缀封装不同处理操作 按键修饰符&#xff1a;keyup.enter v-model修饰符&#xff1a; v-model.trim&#xff1a;去首位空格 v-model.number&#xff1a;转数字 事件修饰符&#xff1a; 阻止事件冒泡&#xff1…

毫米波雷达DOA角度计算-----DBF算法

DBF算法实现程序如下&#xff1a; 输入&#xff1a; parameter 是 毫米波雷达的参数设置。 antVec 是 目标点的8个虚拟天线的非相参积累数据。 function [angle,doa_abs] dbfMethod(parameter,antVec)txAntenna parameter.txAntenna; % 发射天线 [1 1]rxAntenna para…

交换技术-电路交换-报文交换-分组交换

交换技术是指主机之间、通信设备之间或主机与通信设备之间为交换信息所采用的数据格式和交换装置的方式。按交换技术可分为&#xff1a;电路交换、报文交换和分组交换。 电路交换 交换(switching)&#xff0c;就是按照某种方式动态地分配传输线路的资源。 电路交换是在源结点…

MFC、VC++操作excel后,excel程序进程无法正常退出的非暴力处理方法

先说处理方式 1、最low的方式&#xff1a;强制结束进程 //打开进程得到进程句柄 HANDLE hProcessOpenProcess(PROCESS_ALL_ACCESS,FALSE,Pid); if(hProcess!NULL) { //结束进程 if (TerminateProcess(hProcess,0)){printf("结束进程成功\n");return 0;} }这种方式…

带你用uniapp从零开发一个仿小米商场_10. 首页开发

图标菜单栏开发 轮播图开发完成后,就是图标菜单栏了 可以看出这些图标都是一样的样式,所以可以勇哥flex布局让他们每个占百分之20 代码如下,既然都是一样的那就直接用个循环嵌套一下 data数据如下 同样,为了能让这段代码能在别的地方也用到,我直接把它封装成组件 <templ…

nodejs+vue+elementui学生竞赛管理系统65o97

高校人才培养计划的重要组成部分&#xff0c;是实现人才培养目标、培养学生体育 能力与创新思维、学生竟赛管理系统检验学生综合素质与实践能力的重要手段与综合性实践教学环节。而我所在学院多采用半手工管理学生竟赛的方式&#xff0c;所以有必要开发学生竟赛管理系统来对学生…

成为AI产品经理——TPR、FPR、ROC、AUC

目录 一、PR图、BEP 1.PR图 2.BEP 二、灵敏度、特异度 1.灵敏度 2.特异度 三、真正率、假正率 1.真正率 2.假正率 三、ROC、AUC 1.ROC 2.AUC 四、KS值 一、PR图、BEP 1.PR图 二分类问题模型通常输出的是一个概率值&#xff0c;我们需要设定一个阈值&#xff…

金蝶Apusic应用服务器 任意文件上传漏洞复现

0x01 产品简介 金蝶Apusic应用服务器&#xff08;Apusic Application Server&#xff0c;AAS&#xff09;是一款标准、安全、高效、集成并具丰富功能的企业级应用服务器软件&#xff0c;全面支持JakartaEE8/9的技术规范&#xff0c;提供满足该规范的Web容器、EJB容器以及WebSer…

【uniapp】微信运行报错TypeError_ Cannot read property ‘FormData‘ of undefined

文章目录 一、报错详情&#xff1a;二、解决&#xff1a; 一、报错详情&#xff1a; 二、解决&#xff1a; npm install axios0.27.2 #或者 npm install axios1.3.4
最新文章