04数据平台Flume

Flume 功能

在这里插入图片描述
Flume主要作用,就是实时读取服务器本地磁盘数据,将数据写入到 HDFS。

Flume是 Cloudera提供的高可用,高可靠性,分布式的海量日志采集、聚合和传输的系统工具。

Flume 架构

Flume组成架构如下图所示:
在这里插入图片描述

Agent

每个 Agent 代表着一个 JVM 进程,它以事件的方式将数据从源头送至目的地。
Agent 由 3 个部分组成,Source、Channel、Sink。

  • Source:Source是负责接收数据到Flume Agent 的组件,Source 组件可以处理各种类型、各种格式的日志数据。
  • Sink:Sink不断轮询Channel中的事件并且批量移除他们,并将这些数据批量写入到存储或者索引系统,或被发送到另一个Flume Agent。
  • Channel:Channel是位于Source 和Sink 之间的缓冲区。因此,Channel允许 Source和 Sink 有不同的运行速率。Channel 是线程安全的, 可以同时处理几个 Source 的写操作和多个 Sink 的读操作。
    Flume自带两种 Channel:Memory Channel 和File Channel。
    Memory Channel 是一个内存队列,Source将事件写入其尾部,Sink从其头部读取事件。 Memory Channel 将源写入的事件存储在堆上。由于它将所有数据存储在内存中,因此提供了高吞吐量。 它最适合那些不担心数据丢失的流。 它不适合涉及数据丢失的数据流。
    File Channel将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。
  • Event:Flume 传输数据的基本单元,数据以 Event的形式将数据从源头送至目的地。Event由Header 和 Body 组成,Header用来存放event 的属性,为 K-V结构,Body 用于存放数据,为字节数组结构。

安装 flume

话不多说,直接开始安装 flume。

  1. 前往 http://archive.apache.org/dist/flume/ , 选择1.9.0
  2. 将apache-flume-1.9.0-bin.tar.gz使用 sftp上传到linux的/opt/software目录下
    在这里插入图片描述
  3. 解压apache-flume-1.9.0-bin.tar.gz到/opt/module/目录下
[logan@hadoop101 hadoop]$ tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
  1. 创建软链接
cd /opt/module
ln -snf flume-1.9.0/ flume
  1. 删除jar 包,否则会报错
cd /opt/module/flume
rm lib/guava-11.0.2.jar

注意删除guava,一定要配置 Hadoop 环境变量,否则会报错:

Caused by: java.lang.ClassNotFoundException: com.google.common.collect.Lists
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 1 more
  1. 修改log4j.properties配置文件
[logan@hadoop101 flume]$ vim conf/log4j.properties 
# 注意是修改内容
flume.log.dir=/opt/module/flume/logs
  1. 验证 flume
[logan@hadoop101 flume]$ /opt/module/flume/bin/flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9

Flume日志采集

配置解析

需要采集的日志文件分布在hadoop101,hadoop102两台日志服务器,故需要在hadoop101,hadoop102上配置日志采集 Flume。Flume需要采集日志文件内容,并对日志格式(JSON)进行校验,然后将校验通过的日志发送到 Kafka。
此处选择TailDirSource和 KafkaChannel。

  • TailDirSource优势:支持断点续传、多目录。
  • KafkaChannel优势:省去了 Sink,提高了效率。
  • 日志采集 Flume关键配置 :
    在这里插入图片描述

Flume日志采集配置

  1. 创建Flume 配置文件
[logan@hadoop101 flume]$ pwd
/opt/module/flume
[logan@hadoop101 flume]$ mkdir job
[logan@hadoop101 flume]$ vim job/file_to_kafka.conf
#定义组件
a1.sources = r1
a1.channels = c1

#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.logan.gmall.flume.interceptor.ETLInterceptor$Builder

#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop101:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

#组装 
a1.sources.r1.channels = c1
  1. 编写拦截器
    • 创建Maven工程flume-interceptor
    • 创建包:com.logan.gmall.flume.interceptor
    • 在pom.xml文件中添加如下配置
    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <scope>provided</scope>
        </dependency>
    
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    
    • 在com.logan.gmall.flume.utils包下创建JSONUtil类
    package com.logan.gmall.flume.utils;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONException;
    
    public class JSONUtil {
        /**
         * 通过异常判定是否是JSON字符串
         */
        public static boolean isJSONValidate(String input){
            try {
                JSON.parseObject(input);
                return true;
            } catch (JSONException e) {
                return false;
            }
        }
    }
    
    • 在com.logan.gmall.flume.interceptor包下创建ETLInterceptor类
    package com.logan.gmall.flume.interceptor;
    
    import com.logan.gmall.flume.utils.JSONUtil;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.StandardCharsets;
    import java.util.Iterator;
    import java.util.List;
    
    public class ETLInterceptor implements Interceptor {
        @Override
        public void initialize() {
    
        }
    
        @Override
        public Event intercept(Event event) {
    
            //1、获取body当中的数据并转成字符串
            byte[] body = event.getBody();
            String log = new String(body, StandardCharsets.UTF_8);
            //2、判断字符串是否是一个合法的json,是:返回当前event;不是:返回null
            if (JSONUtil.isJSONValidate(log)) {
                return event;
            } else {
                return null;
            }
        }
    
    
        @Override
        public List<Event> intercept(List<Event> list) {
    
            Iterator<Event> iterator = list.iterator();
    
            while (iterator.hasNext()){
                Event next = iterator.next();
                if(intercept(next)==null){
                    iterator.remove();
                }
            }
    
            return list;
        }
    
        public static class Builder implements Interceptor.Builder{
    
            @Override
            public Interceptor build() {
                return new ETLInterceptor();
            }
    
            @Override
            public void configure(Context context) {
    
            }
        }
    
    
        @Override
        public void close() {
    
        }
    }
    
    • 打包
      在这里插入图片描述
    • 将打包文件放到hadoop101 的/opt/moduie/flume/lib文件夹下

采集测试

  1. 启动Zookeeper、Kafka集群
  2. 启动 hadoop101上的日志采集Flume
[logan@hadoop101 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console
  1. 启动一个Kafka的Console-Consumer
[logan@hadoop101 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop11:9092 --topic topic_log
  1. 生成模拟数据
[logan@hadoop101 module]$ mkdir -p /opt/module/applog/log/
[logan@hadoop101 module]$ vim /opt/module/applog/log/app.2023-12-02.log
{"common":{"ar":"500000","ba":"iPhone","ch":"Appstore","is_new":"1","md":"iPhone Xs","mid":"mid_552171","os":"iOS 13.3.1","uid":"919","vc":"v2.1.134"},"displays":[{"display_type":"activity","item":"1","item_type":"activity_id","order":1,"pos_id":5},{"display_type":"activity","item":"2","item_type":"activity_id","order":2,"pos_id":5},{"display_type":"query","item":"19","item_type":"sku_id","order":3,"pos_id":4},{"display_type":"query","item":"3","item_type":"sku_id","order":4,"pos_id":2},{"display_type":"query","item":"5","item_type":"sku_id","order":5,"pos_id":2},{"display_type":"promotion","item":"19","item_type":"sku_id","order":6,"pos_id":4},{"display_type":"query","item":"14","item_type":"sku_id","order":7,"pos_id":2},{"display_type":"query","item":"9","item_type":"sku_id","order":8,"pos_id":2},{"display_type":"promotion","item":"35","item_type":"sku_id","order":9,"pos_id":1}],"page":{"during_time":9853,"page_id":"home"},"ts":1672512476000}
{"actions":[{"action_id":"favor_add","item":"9","item_type":"sku_id","ts":1672512480386},{"action_id":"get_coupon","item":"2","item_type":"coupon_id","ts":1672512483772}],"common":{"ar":"500000","ba":"iPhone","ch":"Appstore","is_new":"1","md":"iPhone Xs","mid":"mid_552171","os":"iOS 13.3.1","uid":"919","vc":"v2.1.134"},"displays":[{"display_type":"promotion","item":"19","item_type":"sku_id","order":1,"pos_id":4},{"display_type":"promotion","item":"14","item_type":"sku_id","order":2,"pos_id":5},{"display_type":"query","item":"21","item_type":"sku_id","order":3,"pos_id":1},{"display_type":"query","item":"11","item_type":"sku_id","order":4,"pos_id":2},{"display_type":"promotion","item":"28","item_type":"sku_id","order":5,"pos_id":1}],"page":{"during_time":10158,"item":"9","item_type":"sku_id","last_page_id":"home","page_id":"good_detail","source_type":"promotion"},"ts":1672512477000}
  1. 观察kafka是否有消费到数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/218489.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

SSM项目实战-前端-在Index.vue中展示第一页数据

1、util/request.js import axios from "axios";let request axios.create({baseURL: "http://localhost:8080",timeout: 50000 });export default request 2、api/schedule.js import request from "../util/request.js";export let getSchedu…

力扣刷题day1(两数相加,回文数,罗马数转整数)

题目1&#xff1a;1.两数之和 思路1和解析&#xff1a; //1.暴力枚举解法(历遍两次数组&#xff0c;时间复杂度O&#xff08;N^2)&#xff0c;空间复杂度O&#xff08;1&#xff09; int* twoSum(int* nums, int numsSize, int target, int* returnSize) {for (int i 0; i &…

【恋上数据结构】二叉堆学习笔记

二叉堆 需求分析 Top K 问题 什么是 Top K 问题&#xff1f; 从海量数据中找出前 K 个数据。 比如&#xff1a;从 100 万个整数中找出最大的 100 个整数Top K 问题的解法之一&#xff1a;可以用数据结构 “堆” 来解决。 堆 堆是一种【完全二叉树】&#xff0c;可以分为【…

SpringBoot自定义异常处理机制

说明&#xff1a;在完整的项目结构中&#xff0c;我们通常会创建一套自定义的异常处理机制&#xff0c;在系统可能出现异常的地方手动抛出这些异常&#xff0c;可以快速定位到异常代码片段&#xff0c;提高系统的可维护性。 本文介绍在SpringBoot项目中&#xff0c;搭建一套自…

2023.12.1 --数据仓库之 拉链表

目录 什么是拉链表 为什么要做拉链表? 没使用拉链表: 使用了拉链表: 题中订单拉链表的形成过程 实现语句 什么是拉链表 拉链表是缓慢渐变维的一种解决方案. 拉链表,记录每条信息的生命周期,一旦一条记录的生命周期结束,就重新开始一条新的记录,并把当前日期放入生效开始…

EI论文复现:基于组合双向拍卖的共享储能机制研究程序代码!

本程序参考EI期刊论文《基于组合双向拍卖的共享储能机制研究》&#xff0c;文中的组合双向拍卖交易机制较为新颖&#xff0c;本质上属于博弈范畴&#xff0c;共享储能是目前的研究热点&#xff0c;牵涉到共享储能参与者的投标策略和收益函数&#xff0c;文中所提模型可为电力市…

【兔子王赠书第10期】零基础入门Python,看这篇就够啦!

文章目录 写在前面推荐图书前言为什么要学习编程如何学习编程本书内容获得帮助 推荐理由粉丝福利写在后面 写在前面 粉丝福利第10期来啦&#xff0c;本期博主给大家推荐一本非常适合零基础入门Python的图书&#xff1a;《Python超能学习手册》&#xff0c;祝大家读完本书后都可…

深入微服务架构 | 微服务与k8s架构解读

微服务项目架构解读 ① 什么是微服务&#xff1f; 微服务是指开发一个单个小型的但有业务功能的服务&#xff0c;每个服务都有自己的处理和轻量通讯机制&#xff0c;可以部署在单个或多个服务器上。 微服务也指一种种松耦合的、有一定的有界上下文的面向服务架构。也就是说&…

<Linux>(极简关键、省时省力)《Linux操作系统原理分析之linux存储管理(5)》(21)

《Linux操作系统原理分析之linux存储管理&#xff08;5&#xff09;》&#xff08;21&#xff09; 6 Linux存储管理6.6 Linux 物理空间管理6.6.1 Linux 物理内存空间6.6.2 物理页面的管理6.6.3 空闲页面管理——buddy 算法 6.7 内存的分配与释放6.7.1 物理内存分配的数据结构 6…

运维工具之MobaXterm工具安装和使用

一、MobaXterm工具简介 MobaXterm是远程计算的终极工具箱。在一个Windows应用程序中&#xff0c;它提供了大量的功能&#xff0c;这些功能是为程序员、网站管理员、it管理员以及几乎所有需要以更简单的方式处理远程工作的用户量身定制的。MobaXterm在一个开箱即用的可移植exe文…

ros2与stm32通讯比较优秀的串口库

这个是我确定的串口库&#xff1a;serial: serial::Serial Class Reference (wjwwood.io) 我也不知道其他的串口库了&#xff0c;我就知道几个&#xff0c;然后我觉得这个是3个里面学习周期比较短&#xff0c;然后质量比较可靠的库 我隐隐觉得这个串口库就是ros1选择的串口库…

如何在Linux环境搭建本地SVN服务器并结合cpolar实现公网访问

目录 前言 1. Ubuntu安装SVN服务 2. 修改配置文件 2.1 修改svnserve.conf文件 2.2 修改passwd文件 2.3 修改authz文件 3. 启动svn服务 4. 内网穿透 4.1 安装cpolar内网穿透 4.2 创建隧道映射本地端口 5. 测试公网访问 6. 配置固定公网TCP端口地址 6.1 保留一个固定…

SVN 版本管理

SVN 文件状态 这里有一张图片可以说明&#xff1a;

C#中内置的泛型委托Func与Action

简介 从C# 3.0起很少需要自己声明委托。System.Func 是一个泛型委托&#xff0c;它可以表示带有返回值的方法。它可以接受一个到多个输入参数&#xff0c;并返回一个指定类型的结果。System.Func 委托的最后一个类型参数表示方法的返回值类型。而System.Action系列代表返回voi…

文章解读与仿真程序复现思路——中国电机工程学报EI\CSCD\北大核心《考虑富氧燃烧技术的电–气–热综合能源系统低碳经济调度》

这个标题涉及到一个关于能源系统和经济调度的复杂主题。让我们逐步解读&#xff1a; 电–气–热综合能源系统&#xff1a; 指的是一个综合的能源系统&#xff0c;包括了电力、气体&#xff08;可能是天然气等&#xff09;、热能等多个能源形式。这种系统的设计和优化旨在使不同…

vue+electron问题汇总

1. Vue_Bug Failed to fetch extension, trying 4 more times 描述&#xff1a;项目启动时报错 解决&#xff1a;注释图片中内容 2. Module not found: Error: Can’t resolve ‘fs’ in 描述&#xff1a;项目启动报错 解决&#xff1a;vue.config.js中添加图中数据 3.导入…

8.7 矢量图层点要素点分布(Point displacement)使用

文章目录 前言点分布&#xff08;Point displacement&#xff09;QGis代码实现 总结 前言 前面介绍了矢量-点要素-单一符号、矢量-点要素-分类符号、矢量-点要素-分级符号以及矢量-点要素-基于规则的使用本章介绍如何使用点分布&#xff08;Point displacement&#xff09;说明…

Java集合常见问题

目录 Java集合 1.前言2.集合3.Collection接口类3.1 List接口3.1.1 ArrayList&#xff08;常用&#xff09;3.1.2 LinkedList&#xff08;常用&#xff09;3.1.3 Vector&#xff08;不常用&#xff09; 3.2 Set接口3.2.1 HashSet&#xff08;常用&#xff09;3.2.2 LinkedHash…

软件设计中如何画各类图之五用例图(Use Case Diagram):系统功能需求与用户交互的图形化描述

目录 1 前言2 用例图基本介绍3 用例图的符号及说明3.1 用例&#xff08;Use Case&#xff09;3.2 参与者&#xff08;Actor&#xff09;3.2 关系&#xff08;Relationships&#xff09; 4 画用例图的步骤4.1 确定系统边界4.2 识别参与者4.3 定义用例4.4 绘制关系4.5 完善细节 5…

webpack学习-2.管理资源

webpack学习-2.管理资源 1.这章要干嘛2.加载css注意顺序&#xff01; 3.总结 1.这章要干嘛 管理资源&#xff0c;什么意思呢&#xff1f;管理什么资源&#xff1f;项目中经常会 导入各种各样的css文件&#xff0c;图片文件&#xff0c;字体文件&#xff0c;数据文件等等&#…
最新文章