Flume的安装及使用

Flume的安装及使用

文章目录

  • Flume的安装及使用
      • Flume的安装
        • 1、上传至虚拟机,并解压
        • 2、重命名目录,并配置环境变量
        • 3、查看flume版本
        • 4、测试flume
        • 5、flume的使用

Flume的安装

1、上传至虚拟机,并解压
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /usr/local/soft/

在环境变量中增加如下命令,可以使用 soft 快速切换到 /usr/local/soft

alias soft=‘cd /usr/local/soft/’

2、重命名目录,并配置环境变量
mv apache-flume-1.9.0-bin/ flume-1.9.0
vim /etc/profile
source /etc/profile
3、查看flume版本
flume-ng version
[root@master soft]# flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9
[root@master soft]# 
4、测试flume
  • 监控端口,将数据打印至控制台

    # a1表示 agent名称,具体定义在启动命令中 -n 参数中
    # r1表示a1中的source名称
    a1.sources = r1
    # k1表示a1的sink名称
    a1.sinks = k1
    # c1表示a1中的channel名称
    a1.channels = c1
    
    # 表示a1的输入源类型为netcat端口类型
    a1.sources.r1.type = netcat
    
    # 表示a1的监听主机,表示任意ip都可以
    a1.sources.r1.bind = 0.0.0.0
    # 表示a1的监听端口号
    a1.sources.r1.port = 6666
    
    # 表示a1的sinkc输出的目的地是控制台的logger类型
    a1.sinks.k1.type = logger
    
    # 表示channel类型是memory内存类型
    a1.channels.c1.type = memory
    
    # 表示a1的channel总容量为1000 event
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity = 100
    
    # 表示将r1和c1连接起来
    a1.sources.r1.channels = c1
    
    # 表示将K1 和 c1 连接起来
    a1.sinks.k1.channel=c1
    

启动命令

flume-ng agent -c $FLUME_HOME/conf/ -n a1 -f $FLUME_HOME/job/natcat2logger.conf -Dflume.root.logger=INFO,console

​ 参数说明:

​ -c 表示配置文件存储在conf目录中

​ -n 表示agent起名为a1

​ -f 表示flume启动读取的配置文件

​ -Dflume.root.logger=INFO,console 表示flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别 ,日志级别包括: log、info、warn、 error

6666端口来源:
在这里插入图片描述

在这里插入图片描述

  • 监控一个目录,将数据打印出来

    • 配置文件
    # 首先先给agent起一个名字 叫a1
    # 分别给source channel sink取名字
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    
    # 分别对source、channel、sink进行配置
    
    # 配置source
    # 将source的类型指定为 spooldir 用于监听一个目录下文件的变化
    # 因为每个组件可能会出现相同的属性名称,所以在对每个组件进行配置的时候 
    # 需要加上 agent的名字.sources.组件的名字.属性 = 属性值
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /root/data/
    a1.sources.r1.fileSuffix = .ok
    a1.sources.r1.fileHeader = true
    
    # 给r1这个souces配置一个拦截器并取名为 i1
    a1.sources.r1.interceptors = i1 
    # 将拦截器i1的类型设置为timestamp 会将处理数据的时间以毫秒的格式插入event的header中
    # a1.sources.r1.interceptors.i1.type = timestamp
    # 将拦截器i1的类型设置为regex_filter 会根据正则表达式过滤数据
    a1.sources.r1.interceptors.i1.type = regex_filter
    # 配置正则表达式
    # 如果匹配到那么获取整行数据 
    a1.sources.r1.interceptors.i1.regex = [0-9]+
    # excludeEvents = true 表示将匹配到的过滤,未匹配到的放行
    # a1.sources.r1.interceptors.i1.excludeEvents = true
    
    # 配置sink
    # 使用logger作为sink组件,可以将收集到数据直接打印到控制台
    a1.sinks.k1.type = logger
    
    # 配置channel
    # 将channel的类型设置为memory,表示将event缓存在内存中
    a1.channels.c1.type = memory
    
    # 组装
    # 将sources的channels属性指定为c1
    a1.sources.r1.channels = c1
    
    # 将sinks的channel属性指定为c1
    a1.sinks.k1.channel = c1
    
    • 启动agent
    flume-ng agent -n a1 -f ./spoolingtest.conf -Dflume.root.logger=DEBUG,console
    
    • 新建/root/data目录
    mkdir /root/data
    
    • 在/root/data/目录下新建文件,输入内容,观察flume进程打印的日志
    # 随意在a.txt中加入一些内容
    vim /root/data/a.txt
    
5、flume的使用
  • spoolingToHDFS.conf

    • 配置文件
    # a表示给agent命名为a
    # 给source组件命名为r1
    a.sources = r1
    # 给sink组件命名为k1
    a.sinks = k1 
    # 给channel组件命名为c1
    a.channels = c1
    #指定spooldir的属性
    a.sources.r1.type = spooldir 
    a.sources.r1.spoolDir = /root/data 
    a.sources.r1.fileHeader = true 
    a.sources.r1.interceptors = i1 
    a.sources.r1.interceptors.i1.type = timestamp
    #指定sink的类型
    a.sinks.k1.type = hdfs
    a.sinks.k1.hdfs.path = /flume/data/dir1
    # 指定文件名前缀
    a.sinks.k1.hdfs.filePrefix = student
    # 指定达到多少数据量写一次文件 单位:bytes
    a.sinks.k1.hdfs.rollSize = 102400
    # 指定多少条写一次文件
    a.sinks.k1.hdfs.rollCount = 1000
    # 指定文件类型为 流 来什么输出什么
    a.sinks.k1.hdfs.fileType = DataStream
    # 指定文件输出格式 为text
    a.sinks.k1.hdfs.writeFormat = text
    # 指定文件名后缀
    a.sinks.k1.hdfs.fileSuffix = .txt
    
    #指定channel
    a.channels.c1.type = memory 
    a.channels.c1.capacity = 1000
    # 表示sink每次会从channel里取多少数据
    a.channels.c1.transactionCapacity = 100
    # 组装
    a.sources.r1.channels = c1 
    a.sinks.k1.channel = c1
    
    • 在 /root/data/目录下准备数据
    The Zen of Python, by Tim Peters
    
    Beautiful is better than ugly.
    Explicit is better than implicit.
    Simple is better than complex.
    Complex is better than complicated.
    Flat is better than nested.
    Sparse is better than dense.
    Readability counts.
    Special cases aren't special enough to break the rules.
    Although practicality beats purity.
    Errors should never pass silently.
    Unless explicitly silenced.
    In the face of ambiguity, refuse the temptation to guess.
    There should be one-- and preferably only one --obvious way to do it.
    Although that way may not be obvious at first unless you're Dutch.
    Now is better than never.
    Although never is often better than *right* now.
    If the implementation is hard to explain, it's a bad idea.
    If the implementation is easy to explain, it may be a good idea.
    Namespaces are one honking great idea -- let's do more of those!
    
    • 启动agent
    flume-ng agent -n a -f ./spoolingToHDFS.conf -Dflume.root.logger=DEBUG,console
    

    注意如果Hadoop是3.x版本,那么需要将Hdfs中的lib目录下的guava包复制到Flume

    执行如下:

    mv $FLUME_HOME/lib/guava-11.0.2.jar $FLUME_HOME/lib/guava-11.0.2.jar.bak

    cd /usr/local/soft/hadoop-3.1.3/share/hadoop/hdfs/lib

    cp guava-27.0-jre.jar $FLUME_HOME/lib/

  • httpToLogger

    • 配置文件
    # a表示给agent命名为a
    # 给source组件命名为r1
    a.sources = r1
    # 给sink组件命名为k1
    a.sinks = k1 
    # 给channel组件命名为c1
    a.channels = c1
    #指定http的属性
    a.sources.r1.type = http
    a.sources.r1.port = 6666 
    
    #指定sink的类型
    a.sinks.k1.type = logger
    #指定channel
    a.channels.c1.type = memory 
    a.channels.c1.capacity = 1000
    # 表示sink每次会从channel里取多少数据
    a.channels.c1.transactionCapacity = 100
    # 组装
    a.sources.r1.channels = c1 
    a.sinks.k1.channel = c1
    
    • 启动

      • 先启动agent
      flume-ng agent -n a -f ./httpToLogger.conf -Dflume.root.logger=DEBUG,console
      
      • 再使用curl发起一个http请求
      curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"},{ "headers" :{"a2" : "a11","b2" : "b11"},"body" : "hello~http~flume2~"}]' http://master:6666
      
  • exec

    • 配置文件

      # Name the components on this agent
      a2.sources = r2
      a2.sinks = k2
      a2.channels = c2
      
      # Describe/configure the source
      a2.sources.r2.type = exec
      a2.sources.r2.command = tail -F /usr/local/soft/hive-3.1.2/log/hive.log
      
      # Describe the sink
      a2.sinks.k2.type = hdfs
      a2.sinks.k2.hdfs.path = /flume/hivelog/%Y%m%d/%H
      #上传文件的前缀
      a2.sinks.k2.hdfs.filePrefix = logs-
      #是否按照时间滚动文件夹
      a2.sinks.k2.hdfs.round = true
      #多少时间单位创建一个新的文件夹
      a2.sinks.k2.hdfs.roundValue = 1
      #重新定义时间单位
      a2.sinks.k2.hdfs.roundUnit = hour
      #是否使用本地时间戳
      a2.sinks.k2.hdfs.useLocalTimeStamp = true
      #积攒多少个Event才flush到HDFS一次
      a2.sinks.k2.hdfs.batchSize = 100
      #设置文件类型,可支持压缩
      a2.sinks.k2.hdfs.fileType = DataStream
      #多久生成一个新的文件
      a2.sinks.k2.hdfs.rollInterval = 60
      #设置每个文件的滚动大小
      a2.sinks.k2.hdfs.rollSize = 134217700
      #文件的滚动与Event数量无关
      a2.sinks.k2.hdfs.rollCount = 0
      
      a2.channels.c2.type = memory
      a2.channels.c2.capacity = 1000
      a2.channels.c2.transactionCapacity = 100
      
      
      a2.sources.r2.channels = c2
      a2.sinks.k2.channel = c2 
      
      
    • 启动

      启动flume

      bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
      

      启动hive,之后查看HDFS

  • 单数据源多输出

    ​ Flume1监控文件内容变动,将监控到的内容分别给到flume2和flume3,flume2将内容写到HDFS, Flume3将数据写到本地文件系统

    • 配置文件

      flume1.conf 
      
      #Named
      a1.sources = r1
      a1.channels = c1 c2
      a1.sinks = k1 k2
      
      #Source
      a1.sources.r1.type = TAILDIR
      a1.sources.r1.filegroups = f1
      a1.sources.r1.filegroups.f1 = /usr/local/soft/flume-1.9.0/job/taildir/.*\.txt
      # 对于监听的文件监听位置信息需要保存到该json文件中
      a1.sources.r1.positionFile = /usr/local/soft/flume-1.9.0/job/position/position.json
      
      # channel selector
      a1.sources.r1.selector.type = replicating
      
      #Channel
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 10000
      a1.channels.c1.transactionCapacity = 100
      
      a1.channels.c2.type = memory
      a1.channels.c2.capacity = 10000
      a1.channels.c2.transactionCapacity = 100
      
      #Sink
      a1.sinks.k1.type = avro
      a1.sinks.k1.hostname = localhost
      a1.sinks.k1.port = 7777
      
      a1.sinks.k2.type = avro
      a1.sinks.k2.hostname = localhost
      a1.sinks.k2.port = 8888
      
      
      #Bind
      a1.sources.r1.channels = c1 c2  
      a1.sinks.k1.channel = c1 
      a1.sinks.k2.channel = c2 
      
      
      
      
      flume2.conf
      
      a2.sources = r1
      a2.channels = c1
      a2.sinks = k1 
      
      #Source
      a2.sources.r1.type = avro
      a2.sources.r1.bind = localhost
      a2.sources.r1.port = 7777
      
      #Channel
      a2.channels.c1.type = memory
      a2.channels.c1.capacity = 10000
      a2.channels.c1.transactionCapacity = 100
      
      #Sink
      a2.sinks.k1.type = hdfs
      a2.sinks.k1.hdfs.path = /flume/replicating/%Y%m%d/%H
      a2.sinks.k1.hdfs.filePrefix = logs-
      a2.sinks.k1.hdfs.round = true
      a2.sinks.k1.hdfs.roundValue = 1
      a2.sinks.k1.hdfs.roundUnit = hour
      a2.sinks.k1.hdfs.useLocalTimeStamp = true
      a2.sinks.k1.hdfs.batchSize = 100
      a2.sinks.k1.hdfs.fileType = DataStream
      a2.sinks.k1.hdfs.rollInterval = 60
      a2.sinks.k1.hdfs.rollSize = 134217700
      a2.sinks.k1.hdfs.rollCount = 0
      
      #Bind
      a2.sources.r1.channels = c1 
      a2.sinks.k1.channel = c1 
      
      
      flume3.conf 
      
      #Named
      a3.sources = r1
      a3.channels = c1
      a3.sinks = k1 
      
      #Source
      a3.sources.r1.type = avro
      a3.sources.r1.bind = localhost
      a3.sources.r1.port = 8888
      
      #Channel
      a3.channels.c1.type = memory
      a3.channels.c1.capacity = 10000
      a3.channels.c1.transactionCapacity = 100
      
      #Sink
      a3.sinks.k1.type = file_roll
      # 将数据保存到本地路径中 不断滚动创建,如果某个时间段没有数据,那么会创建一个空的文件 
      a3.sinks.k1.sink.directory = /usr/local/soft/flume-1.9.0/job/fileroll
      
      #Bind
      a3.sources.r1.channels = c1 
      

    a3.sinks.k1.channel = c1

    
    * 启动
    
    ```shell
    flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/Replicating/flume3.conf -n a3 -Dflume.root.logger=INFO,console
    
    flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/Replicating/flume2.conf -n a2 -Dflume.root.logger=INFO,console
    
    flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/Replicating/flume1.conf -n a1 -Dflume.root.logger=INFO,console
    
    
    
    
    
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/567688.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

python来实现nmap扫描

今天分享一个用python实现nmap扫描的方法,以下是实现步骤 代码如下: import subprocessmissing_ips {166.139.144.163, 31.47.8.35, 58.242.86.191, 212.178.135.62, 103.1.35.114} port "7" for missing_ip in missing_ips:# 构造nmap命令…

【Elasticsearch】Elasticsearch 从入门到精通(二):基础使用

《Elasticsearch 从入门到精通》共包含以下 2 2 2 篇文章: Elasticsearch 从入门到精通(一):基本介绍Elasticsearch 从入门到精通(二):基础使用 😊 如果您觉得这篇文章有用 ✔️ 的…

基于MLP算法实现交通流量预测(Pytorch版)

在海量的城市数据中,交通流量数据无疑是揭示城市运行脉络、洞察出行规律的关键要素之一。实时且精准的交通流量预测不仅能为交通规划者提供科学决策依据,助力提升道路使用效率、缓解交通拥堵,还能为公众出行提供参考,实现个性化导…

C++ :设计模式实现

文章目录 原则单一职责原则开闭原则依赖倒置原则接口隔离原则里氏替换原则 设计模式单例模式观察者模式策略模式代理模式 原则 单一职责原则 定义: 即一个类只负责一项职责 问题: 类 T 负责两个不同的职责:职责 P1,职责 P2。当…

爆破、批量PoC扫描工具 -- POC-T

免责声明 请勿利用文章内的相关技术从事非法测试,由于传播、利用此文所提供的信息而造成的任何直接或者间接的后果及损失,均由使用者本人负责,作者不为此承担任何责任。工具来自网络,安全性自测,如有侵权请联系删除。…

【java】27:java绘图

坐标体系 - 介绍: 下图说明了Java坐标系。坐标原点位于左上角,以像素为单位。在Java坐标系中,第一个是x坐标,表示当前位置为水平方向,距离坐标原点个像素;第二个是y坐标,表示当前位置为垂直方向…

视频不够清晰怎么办?教你几种有效方法

在我们日常生活中,有时候我们会遇到不清晰的视频,这给我们带来了很多不便。那么,怎么将不清晰的视频变清晰呢?本文将为您介绍一些常用的软件工具,帮助您提升视频的清晰度。 方法一:使用AI技术 AI技术可以通…

springboot-异步、定时、邮件任务

目录 一,前言 二,异步 2.1,案例: 1,首先创建一个service: 2,Controller: ① 想办法告诉spring我们的异步方法是异步的,所以要在方法上添加注解 Async ②去springboot主程序中开…

【Java--数据结构】模拟实现ArrayList

欢迎关注个人主页:逸狼 创造不易,可以点点赞吗~ 如有错误,欢迎指出~ 目录 LIst 顺序表ArrayList 顺序表优点 IList接口 ArrayList中定义要操作的数组 在MyArrayList中 重写接口方法 新增元素 在指定位置插入元素 pos不合法异常 判断和查找元素…

Bentley二次开发教程19-文件及模型管理-参照操作

参照操作 模型参照(*.dgn) 当我们需要与同专业,或者跨专业协同配合时,总是无可避免的需要参照他人的模型。若想通过编程的方式提前将参照模型与指定场景绑定起来,那么就需要掌握模型参照的方法。关于该方法大致的使用…

python创建线程和结束线程

👽发现宝藏 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。【点击进入巨牛的人工智能学习网站】。 python创建线程和结束线程 在 Python 中,线程是一种轻量级的执行单元&#xff…

C++-DAY1

思维导图 有以下定义,说明哪些量可以改变哪些不可以改变? const char *p; const (char *) p; char *const p; const char* const p; char const *p; (char *) const p; char const* const p; const char *p:指针 p 所指向的内容不可改…

【C++庖丁解牛】C++11---右值引用和移动语义

🍁你好,我是 RO-BERRY 📗 致力于C、C、数据结构、TCP/IP、数据库等等一系列知识 🎄感谢你的陪伴与支持 ,故事既有了开头,就要画上一个完美的句号,让我们一起加油 目录 1 左值引用和右值引用2 左…

是德软件89600 RFID使用笔记

文章目录 1、进入RFID软件:2、RFID软件解调设置项3、如何查看一段指令数据本文是日常工作的笔记分享。 lauch VSA(矢量频谱分析)后会出现以下界面: 当然这是因为频谱仪的输入有信号才显示如下: 否则就显示频谱仪的噪底 这里的设置过程同一般的频谱仪,比如中心频率、span…

逆向修改app就可以游戏充值到账?

hello ,大家好, 现在市场仍然流行着非常多的传奇类游戏私服或者其他类型的游戏私服,随着私服越来越多(很多并不合法),越来越多的人加入了破解,逆向修改,或者代充的队伍并从中获利。这里我给大家分享一下这些做代充的常规的做法,以及大家作为游戏服务器如何避坑做强校验…

ApiHug 的初心-ApiHug101

视频 秒懂 ApiHug -019 HOPE 🔥 H.O.P.E.: Help other people excellent 💝 是这个项目最初的初心 🤗 ApiHug {Postman|Swagger|Api...} 快↑ 准√ 省↓ 🏠 gitee github search ApiHug ApiHug 🤗 ApiHug {Post…

数据结构(学习笔记)王道

一、绪论 1.1 数据结构的基本概念 数据:是信息的载体,是描述客观事物属性的数、字符以及所有输入到计算机中并被计算机程序识别和处理的符号的集合。(计算机程序加工的原料)数据元素:数据的基本单位,由若干…

相关电路整理(工程)相关FOC电路整理

1. 基于STM32G4的FOC电机驱动学习板 1.1 防反接电路 电源正确接入时 电流从 VIN 端流向负载,经由 Q3(NMOS) 通向地(GND)。在上电瞬间,由于 MOS 管的体二极管效应,地回路通过体二极管接通。接下来,由于 Vgs…

【sping】在logback-spring.xml 获取项目名称

在日志文件中我们想根据spring.application.name 创建出的文件夹。 也不想死在XML文件中。 application.yml spring:application:name: my-demo logback-spring.xml <springProperty name"application_name" scope"context" source"spring.app…

Unity类银河恶魔城学习记录13-4 p145 Save Skill Tree源代码

Alex教程每一P的教程原代码加上我自己的理解初步理解写的注释&#xff0c;可供学习Alex教程的人参考 此代码仅为较上一P有所改变的代码 【Unity教程】从0编程制作类银河恶魔城游戏_哔哩哔哩_bilibili GameData.cs using System.Collections; using System.Collections.Generic…