尚硅谷 (Atguigu) Big Data Project《在线教育之采集系统》(Online Education Data Collection System) Notes 002

Video: 尚硅谷大数据项目《在线教育之采集系统》 (Bilibili)

Contents

P032

P033

P033

P034

P035

P036


P032

P033

# 1. Define and name the components (no sink is needed: the Kafka channel terminates this flow)
a1.sources = r1
a1.channels = c1

# 2. Configure the source (TAILDIR)
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/data_mocker/01-onlineEducation/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/flume-1.9.0/taildir_position.json
a1.sources.r1.batchSize = 100

# 3. Configure the channel (Kafka channel)
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = node001:9092,node002:9092,node003:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

# 4. Assemble: bind the source to the channel
a1.sources.r1.channels = c1
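
Note that this agent defines no sink: with a Kafka channel, events flow from the TAILDIR source straight into Kafka, and whatever consumes topic_log (a console consumer, or a second Flume agent) picks them up from there. Before starting the agent, make sure the topic exists; brokers may auto-create it (auto.create.topics.enable), but creating it explicitly is safer. A minimal sketch (the partition and replication counts are assumptions, not from the notes):

[atguigu@node001 ~]$ kafka-topics.sh --bootstrap-server node001:9092 --create --topic topic_log --partitions 3 --replication-factor 2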

P033

If the Flume agent is started while the Kafka brokers are down, the Kafka channel's producer keeps logging connection warnings like these:

2023-07-26 11:13:42,136 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available.
2023-07-26 11:13:42,139 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node -3 could not be established. Broker may not be available.
2023-07-26 11:13:42,241 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node -2 could not be established. Broker may not be available.
2023-07-26 11:13:43,157 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node -3 could not be established. Broker may not be available.
2023-07-26 11:13:43,164 (kafka-producer-network-thread | producer-1) [WARN - org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:671)] [Producer clientId=producer-1] Connection to node -2 could not be established. Broker may not be available.

Likewise, when ZooKeeper is not running, connection attempts to it fail:

[2023-07-26 11:03:06,989] INFO Opening socket connection to server node002/192.168.10.102:2181. (org.apache.zookeeper.ClientCnxn)
[2023-07-26 11:03:06,989] INFO SASL config status: Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2023-07-26 11:03:06,992] WARN Session 0x0 for sever node002/192.168.10.102:2181, Closing socket connection. Attempting reconnect except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)
java.net.ConnectException: Connection refused (拒绝连接)
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:344)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1290)
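
To get past these errors, verify that ZooKeeper and Kafka are actually running before starting Flume. A quick sanity check (assuming zkServer.sh is on the PATH and nc is installed):

[atguigu@node001 ~]$ jpsall                  # QuorumPeerMain and Kafka should appear on every node
[atguigu@node001 ~]$ zkServer.sh status      # run on each ZooKeeper node
[atguigu@node001 ~]$ nc -zv node002 2181     # is the ZooKeeper client port reachable?
[atguigu@node001 ~]$ nc -zv node001 9092     # is the Kafka broker port reachable?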

Once all services are up, Flume works!

On node001, start Hadoop, ZooKeeper, and Kafka first, then start Flume:

[atguigu@node001 ~]$ cd /opt/module/flume/flume-1.9.0/
[atguigu@node001 flume-1.9.0]$ bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf
Info: Sourcing environment configuration script /opt/module/flume/flume-1.9.0/conf/flume-env.sh
Info: Including Hadoop libraries found via (/opt/module/hadoop/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via () for Hive access
...
[atguigu@node001 ~]$ jpsall
================ node001 ================
6368 NodeManager
5793 NameNode
2819 QuorumPeerMain
6598 JobHistoryServer
5960 DataNode
6681 Application
4955 Kafka
7532 Jps
================ node002 ================
4067 NodeManager
2341 Kafka
3942 ResourceManager
4586 ConsoleConsumer
5131 Jps
1950 QuorumPeerMain
3742 DataNode
================ node003 ================
3472 NodeManager
3235 DataNode
1959 QuorumPeerMain
3355 SecondaryNameNode
2347 Kafka
3679 Jps
[atguigu@node001 ~]$ 
[atguigu@node002 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic topic_log
[atguigu@node001 ~]$ mock.sh
[atguigu@node001 ~]$ 
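
The TAILDIR source records its read offsets in the position file configured earlier; inspecting it confirms the agent has picked up the log files:

[atguigu@node001 ~]$ cat /opt/module/flume/flume-1.9.0/taildir_position.json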

P034

# Job file: /opt/module/flume/flume-1.9.0/job/file_to_kafka.conf


# 1. Define and name the components (no sink is needed: the Kafka channel terminates this flow)
a1.sources = r1
a1.channels = c1


# 2. Configure the source (TAILDIR), now with an ETL interceptor
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/data_mocker/01-onlineEducation/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/flume-1.9.0/taildir_position.json
a1.sources.r1.batchSize = 100

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.ETLInterceptor$Builder


# 3. Configure the channel (Kafka channel)
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = node001:9092,node002:9092,node003:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false


# 4. Assemble: bind the source to the channel
a1.sources.r1.channels = c1
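
The interceptor class referenced above, ETLInterceptor.java: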
package com.atguigu.flume.interceptor;

import com.atguigu.flume.interceptor.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

public class ETLInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    /**
     * Filter out dirty data (events whose body is not complete JSON).
     *
     * @param event the incoming Flume event
     * @return the event if its body is valid JSON, otherwise null
     */
    @Override
    public Event intercept(Event event) {
        // 1. Get the data from the event body
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);

        // 2. Check whether the data is complete, valid JSON
        if (JSONUtil.isJSONValidate(log)) {
            return event;
        }

        return null;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        // Drop every event whose body is not valid JSON
        Iterator<Event> iterator = list.iterator();
        while (iterator.hasNext()) {
            Event event = iterator.next();
            if (intercept(event) == null) {
                iterator.remove();
            }
        }
        return list;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
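
The JSON validation helper it depends on, JSONUtil.java: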
package com.atguigu.flume.interceptor.utils;

import com.alibaba.fastjson.JSONObject;

public class JSONUtil {
    public static boolean isJSONValidate(String log) {
        try {
            // fastjson throws an exception if the string is not complete, valid JSON
            JSONObject.parseObject(log);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
}
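
For the agent to load this interceptor, the project must be packaged and the jar dropped onto Flume's classpath. A minimal sketch, assuming a standard Maven project whose build bundles fastjson into the jar (the artifact name below is illustrative, not from the notes):

mvn clean package -DskipTests
cp target/flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/module/flume/flume-1.9.0/lib/

With the jar in place, restart the agent and append one valid, one malformed, and another valid JSON line to app.log; only the two valid lines should reach the topic_log consumer: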
[atguigu@node001 log]$ echo '{"id":1}' >> app.log
[atguigu@node001 log]$ echo '{"id": }' >> app.log
[atguigu@node001 log]$ echo '{"id":2}' >> app.log
[atguigu@node001 log]$ 

P035

Start/stop script for the collection Flume agents. For reference, the version from an earlier Atguigu project (a hadoop102/hadoop103 cluster running file-flume-kafka.conf):

#! /bin/bash

case $1 in
"start"){
        for i in hadoop102 hadoop103
        do
                echo " -------- starting collection flume on $i --------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/job/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1 &"
        done
};;
"stop"){
        for i in hadoop102 hadoop103
        do
                echo " -------- stopping collection flume on $i --------"
                ssh $i "ps -ef | grep file-flume-kafka | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
        done
};;
esac
Adapted for this cluster (node001/node002). Note that the stop branch must grep for the actual job file name, file_to_kafka, or it will never find the process:

#! /bin/bash

case $1 in
"start"){
        for i in node001 node002
        do
                echo " -------- starting collection flume on $i --------"
                ssh $i "nohup /opt/module/flume/flume-1.9.0/bin/flume-ng agent --conf-file /opt/module/flume/flume-1.9.0/job/file_to_kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/flume-1.9.0/log1.txt 2>&1 &"
        done
};;
"stop"){
        for i in node001 node002
        do
                echo " -------- stopping collection flume on $i --------"
                ssh $i "ps -ef | grep file_to_kafka | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
        done
};;
esac
The version actually used here, f1.sh (only node001 generates mock logs, so a single agent suffices):

#! /bin/bash

case $1 in
"start") {
        echo " -------- starting collection flume --------"
        ssh node001 "nohup /opt/module/flume/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/module/flume/flume-1.9.0/conf/ -f /opt/module/flume/flume-1.9.0/job/file_to_kafka.conf >/dev/null 2>&1 &"
};;
"stop") {
        echo " -------- stopping collection flume --------"
        ssh node001 "ps -ef | grep file_to_kafka | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
};;
esac
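
A sketch of deploying and using the script (the ~/bin location is an assumption):

[atguigu@node001 ~]$ chmod +x ~/bin/f1.sh
[atguigu@node001 ~]$ f1.sh start    # launches the agent on node001 in the background
[atguigu@node001 ~]$ f1.sh stop     # kills the agent by grepping for the job file name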

P036

## 1. Define the components
a1.sources = r1
a1.channels = c1
a1.sinks = k1


## 2. Configure the source (Kafka source)
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = node001:9092,node002:9092,node003:9092
a1.sources.r1.kafka.consumer.group.id = topic_log
a1.sources.r1.kafka.topics = topic_log
a1.sources.r1.batchSize = 1000
a1.sources.r1.batchDurationMillis = 1000
a1.sources.r1.useFlumeEventFormat = false


## 3. Configure the channel (file channel)
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/flume-1.9.0/checkpoint/behavior1
a1.channels.c1.useDualCheckpoints = false
a1.channels.c1.dataDirs = /opt/module/flume/flume-1.9.0/data/behavior1/
a1.channels.c1.capacity = 1000000
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.keep-alive = 3


## 4. Configure the sink (HDFS sink)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/edu/log/edu_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false


## Write gzip-compressed output files (CompressedStream) rather than native DataStream files.
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

#a1.sinks.k1.hdfs.rollInterval = 10
#a1.sinks.k1.hdfs.rollSize = 134217728
#a1.sinks.k1.hdfs.rollCount = 0


## 5. Assemble: bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
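
One caveat (an addition, not from the original notes): the %Y-%m-%d escape in hdfs.path is resolved from each event's timestamp header, and the HDFS sink fails on events that lack one. If no upstream interceptor supplies the header, a quick fallback is to use the sink host's clock:

## Assumption: no timestamp header is set upstream
a1.sinks.k1.hdfs.useLocalTimeStamp = true

Start the agent: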
[atguigu@node001 ~]$ cd /opt/module/flume/flume-1.9.0/
[atguigu@node001 flume-1.9.0]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf 
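
Once mock data flows through, gzip-compressed log files should land under the date-partitioned path configured above (the date below matches this run; adjust as needed):

[atguigu@node001 ~]$ hadoop fs -ls /origin_data/edu/log/edu_log/2023-07-26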
