广告数仓:采集通道创建

系列文章目录

广告数仓:采集通道创建


文章目录

  • 系列文章目录
  • 前言
  • 一、环境和模拟数据准备
    • 1.hadoop集群
    • 2.mysql安装
    • 3.生成曝光测试数据
  • 二、广告管理平台数据采集
    • 1.安装DataX
    • 2.上传脚本生成器
    • 3.生成传输脚本
    • 4.编写全量传输脚本
  • 三、曝光点击检测数据采集
    • 1.安装Zookeeper
    • 2.安装Kafka
    • 3.安装Flume
    • 4.Local Files->Flume->Kafka
      • 1.编写Flume配置文件
      • 2.功能测试
      • 3.编写启动脚本
    • 5.Kafka->Flume->HDFS
      • 1.编写拦截器
      • 2.编写配置文件
      • 3.测试功能
      • 4.编写启动脚本
  • 总结


前言

常用的大数据技术,基本都学完,啃个项目玩玩,这个项目来源于尚桂谷最新的广告数仓


一、环境和模拟数据准备

1.hadoop集群

集群搭建可以看我的hadoop专栏

这里不在演示了。
建议吧hadoop把hadoop版本换成3.3.4不然后期可能会有依赖错误(血的教训)
在这里插入图片描述

2.mysql安装

此次项目使用的是Mysql8,所有实验需要的软件可以在B站尚桂谷平台找到。
上传jar包
在这里插入图片描述
卸载可能影响的依赖

 sudo yum remove mysql-libs

安装需要的依赖

sudo yum install libaio autoconf

用安装脚本一键安装

sudo bash install_mysql.sh

安装完成后后远程连接工具测试一下。
由于mysql8新增了一个公钥验证,所以我们要修改一个默认参数
在这里插入图片描述

在这里插入图片描述
之后新建数据库,将我们的广告平台管理数据导入
在这里插入图片描述

在这里插入图片描述

3.生成曝光测试数据

上传数据模拟器文件
在这里插入图片描述
这里稍微修改一个配置文件,把刚刚的公钥检测加上
在这里插入图片描述

java -jar NginxDataGenerator-1.0-SNAPSHOT-jar-with-dependencies.jar

在这里插入图片描述
我门用hadoop102和hadoop103来生成曝光数据

scp -r ad_mock/ 192.168.10.103:/opt/module/ad_mock

创建一个生成脚本
在这里插入图片描述

#!/bin/bash

for i in hadoop102 hadoop103
do
echo "========== $i =========="
        ssh $i "cd /opt/module/ad_mock ; java -jar /opt/module/ad_mock/NginxDataGenerator-1.0-SNAPSHOT-jar-with-dependencies.jar >/dev/null  2>&1 &"
done

二、广告管理平台数据采集

1.安装DataX

datax是阿里的一个开源工具,作用和sqoop差不多,主要是用来在不同数据库之间传输数据
github官方下载链接
也可以用资料里给的
在这里插入图片描述
直接解压即可
然后执行自带的一个测试脚本
在这里插入图片描述

python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json

在这里插入图片描述

2.上传脚本生成器

在这里插入图片描述
按需要修改一下脚本
在这里插入图片描述

mysql.username=root
mysql.password=000000
mysql.host=hadoop102
mysql.port=3306
hdfs.uri=hdfs://hadoop102:8020
is.seperated.tables=0

mysql.database.import=ad
mysql.tables.import=
import_out_dir=/opt/module/datax/job/import

3.生成传输脚本

java -jar datax-config-generator-1.0-SNAPSHOT-jar-with-dependencies.jar

在这里插入图片描述

4.编写全量传输脚本

vim ~/bin/ad_mysql_to_hdfs_full.sh
#!/bin/bash

DATAX_HOME=/opt/module/datax

# 如果传入日期则do_date等于传入的日期,否则等于前一天日期
if [ -n "$2" ] ;then
    do_date=$2
else
    do_date=`date -d "-1 day" +%F`
fi

#处理目标路径,此处的处理逻辑是,如果目标路径不存在,则创建;若存在,则清空,目的是保证同步任务可重复执行
handle_targetdir() {
  hadoop fs -test -e $1
  if [[ $? -eq 1 ]]; then
    echo "路径$1不存在,正在创建......"
    hadoop fs -mkdir -p $1
  else
    echo "路径$1已经存在"
    fs_count=$(hadoop fs -count $1)
    content_size=$(echo $fs_count | awk '{print $3}')
    if [[ $content_size -eq 0 ]]; then
      echo "路径$1为空"
    else
      echo "路径$1不为空,正在清空......"
      hadoop fs -rm -r -f $1/*
    fi
  fi
}

#数据同步
#参数:arg1-datax 配置文件路径;arg2-源数据所在路径
import_data() {
  handle_targetdir $2
  python $DATAX_HOME/bin/datax.py -p"-Dtargetdir=$2" $1
}

case $1 in
"product")
  import_data /opt/module/datax/job/import/ad.product.json /origin_data/ad/db/product_full/$do_date
  ;;
"ads")
  import_data /opt/module/datax/job/import/ad.ads.json /origin_data/ad/db/ads_full/$do_date
  ;;
"server_host")
  import_data /opt/module/datax/job/import/ad.server_host.json /origin_data/ad/db/server_host_full/$do_date
  ;;
"ads_platform")
  import_data /opt/module/datax/job/import/ad.ads_platform.json /origin_data/ad/db/ads_platform_full/$do_date
  ;;
"platform_info")
  import_data /opt/module/datax/job/import/ad.platform_info.json /origin_data/ad/db/platform_info_full/$do_date
  ;;
"all")
  import_data /opt/module/datax/job/import/ad.product.json /origin_data/ad/db/product_full/$do_date
  import_data /opt/module/datax/job/import/ad.ads.json /origin_data/ad/db/ads_full/$do_date
  import_data /opt/module/datax/job/import/ad.server_host.json /origin_data/ad/db/server_host_full/$do_date
  import_data /opt/module/datax/job/import/ad.ads_platform.json /origin_data/ad/db/ads_platform_full/$do_date
  import_data /opt/module/datax/job/import/ad.platform_info.json /origin_data/ad/db/platform_info_full/$do_date
  ;;
esac

赋予执行权限
然后启动hadoop集群
在这里插入图片描述
执行脚本

ad_mysql_to_hdfs_full.sh all 2023-01-07

在这里插入图片描述

三、曝光点击检测数据采集

1.安装Zookeeper

Zookeeper安装

2.安装Kafka

Kafka安装
kafka安装大部分都要把专栏方法相同,不同点是要添加一个配置参数。
在这里插入图片描述
每个节点都要做对应的修改

3.安装Flume

Flume安装
我们还需要修改一下日志文件
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
修改完成后分发即可。

4.Local Files->Flume->Kafka

1.编写Flume配置文件

在这里插入图片描述

#定义组件
a1.sources = r1
a1.channels = c1

#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/ad_mock/log/.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json

#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092
a1.channels.c1.kafka.topic = ad_log
a1.channels.c1.parseAsFlumeEvent = false

#组装 
a1.sources.r1.channels = c1

2.功能测试

我们先启动zookeeper和kafka
在这里插入图片描述
然后启动Flume进行日志采集

bin/flume-ng agent -n a1 -c conf/ -f job/ad_file_to_kafka.conf -Dflume.root.logger=info,console

在这里插入图片描述
然后在kafka启动一个ad_log消费者,如果没有他会自动创建

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ad_log

然后用ad_mock.sh生成数据,观察消费者是否能消费到数据。
在这里插入图片描述

3.编写启动脚本

因为ad_mock.sh在102和103同时生成数据,所以我们需要把102的配置文件同步过去。

scp -r job/ hadoop103:/opt/module/flume/

为flume编写一个启动脚本

vim ~/bin/ad_f1.sh
#!/bin/bash

case $1 in
"start"){
        for i in hadoop102 hadoop103
        do
                echo " --------启动 $i 采集flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/ad_file_to_kafka.conf >/dev/null 2>&1 &"
        done
};; 
"stop"){
        for i in hadoop102 hadoop103
        do
                echo " --------停止 $i 采集flume-------"
                ssh $i "ps -ef | grep ad_file_to_kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
        done

};;
esac

编写之后启动它
在这里插入图片描述
再次启动kafka消费者,然后生成数据,如果仍然能出现数据,说明脚本没问题。

5.Kafka->Flume->HDFS

1.编写拦截器

用idea创建一个Maven工程,名字随意。
添加Maven依赖

<dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.11.0</version>
        </dependency>
</dependencies>

创建拦截器类
在这里插入图片描述
TimestampInterceptor.java

package com.atguigu.ad.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TimestampInterceptor implements Interceptor {
    private Pattern pattern;

    @Override
    public void initialize() {
        pattern = Pattern.compile(".*t=(\\d{13}).*");

    }

    @Override
    public Event intercept(Event event) {
        // 提取数据中的时间戳,补充到header中
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);
        //去除日志中的双引号
        String sublog = log.substring(1, log.length() - 1);
        String result = sublog.replaceAll("\"\u0001\"", "\u0001");
        event.setBody(result.getBytes(StandardCharsets.UTF_8));

        // 正则匹配获取时间戳
        Matcher matcher = pattern.matcher(result);
        if (matcher.matches()) {
            String ts = matcher.group(1);
            headers.put("timestamp", ts);
        } else {
            return null;
        }

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        Iterator<Event> iterator = events.iterator();

        while (iterator.hasNext()){
            Event next = iterator.next();
            Event intercept = intercept(next);
            if (intercept == null) {
                iterator.remove();
            }
        }
        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

直接打包上传到104 flume/lib目录
在这里插入图片描述
在这里插入图片描述

2.编写配置文件

 vim job/ad_kafka_to_hdfs.conf 
#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = ad_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.ad.flume.interceptor.TimestampInterceptor$Builder

#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/ad/log/ad_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

#组装 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.测试功能

将hadoop,zookeeper,kafka和之前配置的flume脚本全部启动
在这里插入图片描述
在104开启Flume采集

bin/flume-ng agent -n a1 -c conf/ -f job/ad_kafka_to_hdfs.conf -Dflume.root.logger=info,console

用ad_mock.sh生成数据。
在这里插入图片描述

4.编写启动脚本

vim ~/bin/ad_f2.sh

#!/bin/bash

case $1 in
"start")
        echo " --------启动 hadoop104 日志数据flume-------"
        ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/ad_kafka_to_hdfs.conf >/dev/null 2>&1 &"
;;
"stop")

        echo " --------停止 hadoop104 日志数据flume-------"
        ssh hadoop104 "ps -ef | grep ad_kafka_to_hdfs | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac

最后启动脚本在测试一下
在这里插入图片描述


总结

采集通道的创建就到这里了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/29795.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

大数据周会-本周学习内容总结018

开会时间&#xff1a;2023.06.18 15:00 线下会议 01【调研-数据分析&#xff08;质量、ETL、可视化&#xff09;】 ETL&#xff0c;是英文Extract-Transform-Load的缩写&#xff0c;用来描述将数据从来源端经过抽取&#xff08;extract&#xff09;、转换&#xff08;transform…

开源游戏区块链项目分享:Unity开发的独立区块链

Arouse Blockchain [Unity独立区块链] ❗️千万别被误导&#xff0c;上图内容虽然都在项目中可寻&#xff0c;但与目前区块链的业务代码关联不大&#xff0c;仅供宣传作用(总得放些图看着好看)。之所以有以上内容是项目有个目标功能是希望每个用户在区块链上都有一个独一无二的…

如何看待 Facebook 上线支付功能?

随着科技的不断进步&#xff0c;电子支付在我们的生活中变得越来越普遍。最近&#xff0c;Facebook宣布推出自己的支付功能&#xff0c;这引起了广泛的关注和讨论。作为世界上最大的社交媒体平台之一&#xff0c;Facebook进入支付领域的举措无疑具有重要意义。那么&#xff0c;…

13年测试老鸟,带你详探服务端的接口测试,测试内卷之路...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 服务器的接口测试…

数据库第三章(SQL)

目录 1.SQL语言 索引 1.SQL语言 sql语言是个非过程性语言 sql的特点 1.综合统一&#xff0c;把增删查改都统一了起来 2.高度非过程化&#xff0c;不关心过程 3.面向集合的操作方式 sql基本语法 drop是删除表 delete是删除表的某个元组 安全方面&#xff1a;grant授权 revo…

EXCEL函数笔记1(数学函数、文本函数、日期函数)

数学函数 取整&#xff1a;INT(number) 取余&#xff1a;MOD(number,除数) 四舍五入&#xff1a;ROUND(number&#xff0c;保留几位小数) 取绝对值&#xff1a;ABS(number) 根号处理&#xff1a;SQRT&#xff08;number&#xff09; 0到1随机数&#xff1a;RAND&#xff08;&am…

【机器学习】十大算法之一 “朴素贝叶斯”

作者主页&#xff1a;爱笑的男孩。的博客_CSDN博客-深度学习,活动,python领域博主爱笑的男孩。擅长深度学习,活动,python,等方面的知识,爱笑的男孩。关注算法,python,计算机视觉,图像处理,深度学习,pytorch,神经网络,opencv领域.https://blog.csdn.net/Code_and516?typeblog个…

【LeetCode】每日一题 -- 1171. 从链表中删去总和值为零的连续节点 -- Java Version

题目链接&#xff1a;https://leetcode.cn/problems/remove-zero-sum-consecutive-nodes-from-linked-list/ 1. 题解&#xff08;1171. 从链表中删去总和值为零的连续节点&#xff09; 2021年字节二面真题 1.1 暴力解法&#xff1a;穷举 时间复杂度 O(n2)&#xff0c;空间复杂…

【论文】attention is all you need

重点在第三节 attention is all you need摘要1. 绪论2. 背景3. 模型架构3.1 编码器和解码器堆叠 3.2 注意力3.2.1 缩放点积注意力&#xff08;Scaled Dot-Product Attention&#xff09;3.2.2 多头注意力机制3.2.3 模型中注意力的应用 3.3 职位感知前馈网络&#xff08;Positio…

前端中间件Midway的使用

一、 关于midway1. 解决什么痛点2. 期望达到什么效果 二、创建应用并使用1. 创建midway应用2. 认识Midway2.1 目录结构2.2 Controller2.3 路由2.4 获取请求参数2.5 Web中间件2.6 组件使用2.7 服务(service) 三、写到最后 一、 关于midway Midway 是阿里巴巴 - 淘宝前端架构团队…

基于深度学习的高精度安全背心检测识别系统(PyTorch+Pyside6+YOLOv5模型)

摘要&#xff1a;基于深度学习的高精度安全背心检测识别系统可用于日常生活中或野外来检测与定位安全背心目标&#xff0c;利用深度学习算法可实现图片、视频、摄像头等方式的安全背心目标检测识别&#xff0c;另外支持结果可视化与图片或视频检测结果的导出。本系统采用YOLOv5…

微服务: 01-rabbitmq的应用场景及安装(docker)

目录 1. rabbitmq前言简介: 1.1 RabbitMQ的几个重要作用&#xff1a; -> 1.1.1 解耦&#xff1a; -> 1.1.2 异步通信&#xff1a; -> 1.1.3 流量削峰&#xff1a; -> 1.1.4 消息传递的可靠性和持久性&#xff1a; 2. rabbitmq的安装(docker版) -> 2.1 …

SpringMVC 学习整理

文章目录 一、SpringMVC 简介1.1 什么是MVC1.2 什么是Spring MVC1.3 Spring MVC的特点 二、SpringMVC 快速入门三、RequestMapping注解说明四、SpringMVC获取请求参数4.1 通过ServletAPI获取请求参数4.2 通过控制器方法的形参获取请求参数4.3 通过RequestParam接收请求参数4.4 …

Rust语言从入门到入坑——(2)Rust在windows上搭建开发环境

文章目录 0 引入1、搭建 Visual Studio Code 开发环境1.1、安装 Rust 编译工具1.2 、VS Code安装 2、官网在线3、总结4、引用 0 引入 开始搭建一个适合在windows上运行的Rust环境。 Rust支持的程序语言很多&#xff1a;可详见官网介绍 1、搭建 Visual Studio Code 开发环境 …

[架构之路-211]- 需求- 软架构前的需求理解:ADMEMS标准化、有序化、结构化、层次化需求矩阵 =》需求框架

目录 前言&#xff1a; 一、什么是ADMES: 首先&#xff0c;需求是分层次的&#xff1a; 其次&#xff0c;需求是有结构的&#xff0c;有维度的 再次&#xff0c;不同层次需求、不同维度需求之间可以相互转化&#xff08;难点、经验积累&#xff09; 最终&#xff0c;标准…

【雕爷学编程】Arduino动手做(114)---US-015高分辨超声波模块

37款传感器与执行器的提法&#xff0c;在网络上广泛流传&#xff0c;其实Arduino能够兼容的传感器模块肯定是不止这37种的。鉴于本人手头积累了一些传感器和执行器模块&#xff0c;依照实践出真知&#xff08;一定要动手做&#xff09;的理念&#xff0c;以学习和交流为目的&am…

Floyd 判圈算法(Floyd Cycle Detection Algorithm)

Floyd 判圈算法(Floyd Cycle Detection Algorithm) 前言 Floyd判圈算法属于对指针操作的算法&#xff0c;它一般需要且仅需要两个指针&#xff0c;通过设定不同的指针移动速度&#xff0c;来判定链表或有限状态机中是否存在环。人为规定移动较快的指针称为快速指针(fast poin…

给初级测试工程师的一些避坑建议

我遇到的大多数开发人员都不怎么热衷于测试。有些会去做测试&#xff0c;但大多数都不测试&#xff0c;不愿意测试&#xff0c;或者勉而为之。我喜欢测试&#xff0c;并且比起编写新的代码&#xff0c;愉快地花更多的时间在测试中。我认为&#xff0c;正是因为专注于测试&#…

【Turfjs的java版本JTS】前面讲了Turfjs可以实现几何计算,空间计算的功能,如果后端要做这项功能也有类似的类库,JTS

JTS Java Topology Suite 几何计算&#xff1a; 1. 前端js就用这个 Turfjs的类库。参考网站&#xff1a; 计算两线段相交点 | Turf.js中文网 2. 后端java语言就可以用 JTS这个类库&#xff0c;参考网站&#xff1a; JTS参考网站&#xff1a; 1. https://github.com/locatio…

Windows11 安装 CUDA/cuDNN+Pytorch

一、准备工作&#xff1a; 查看torch版本&#xff1a;进入python交互环境&#xff1a; >>>import torch >>>torch.__version__ 查看cuda版本&#xff1a;CMD窗口 nvcc --version 如果版本不一致&#xff0c;需要卸载再重装。 二、安装 Windows 安装 CU…
最新文章