Flink日志采集-ELK可视化实现

一、各组件版本

组件版本
Flink1.16.1
kafka2.0.0
Logstash6.5.4
Elasticseach6.3.1
Kibana6.3.1

  针对按照⽇志⽂件⼤⼩滚动⽣成⽂件的⽅式,可能因为某个错误的问题,需要看好多个⽇志⽂件,还有Flink on Yarn模式提交Flink任务,在任务执行完毕或者任务报错后container会被回收从而导致日志丢失,为了方便排查问题可以把⽇志⽂件通过KafkaAppender写⼊到kafka中,然后通过ELK等进⾏⽇志搜索甚⾄是分析告警。

二、Flink配置将日志写入Kafka

2.1 flink-conf.yaml增加下面两行配置信息

env.java.opts.taskmanager: -DyarnContainerId=$CONTAINER_ID
env.java.opts.jobmanager: -DyarnContainerId=$CONTAINER_ID

2.2 log4j.properties配置案例如下

##################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
##################################################################
# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
monitorInterval=30

# This affects logging for both user code and Flink
#rootLogger.appenderRef.file.ref = MainAppender
rootLogger.level = INFO
rootLogger.appenderRef.kafka.ref = Kafka
rootLogger.appenderRef.file.ref = RollingFileAppender

# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
logger.shaded_zookeeper.level = INFO

# Log all infos in the given file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = false
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 500MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10

#appender.main.name = MainAppender
#appender.main.type = RollingFile
#appender.main.append = true
#appender.main.fileName = ${sys:log.file}
#appender.main.filePattern = ${sys:log.file}.%i
#appender.main.layout.type = PatternLayout
#appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
#appender.main.policies.type = Policies
#appender.main.policies.size.type = SizeBasedTriggeringPolicy
#appender.main.policies.size.size = 100MB
#appender.main.policies.startup.type = OnStartupTriggeringPolicy
#appender.main.strategy.type = DefaultRolloverStrategy
#appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}

# kafka
appender.kafka.type = Kafka
appender.kafka.name = Kafka
appender.kafka.syncSend = true
appender.kafka.ignoreExceptions = false
appender.kafka.topic = flink_logs
appender.kafka.property.type = Property
appender.kafka.property.name = bootstrap.servers
appender.kafka.property.value = xxx1:9092,xxx2:9092,xxx3:9092
appender.kafka.layout.type = JSONLayout
apender.kafka.layout.value = net.logstash.log4j.JSONEventLayoutV1
appender.kafka.layout.compact = true
appender.kafka.layout.complete = false

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
#logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF

#通过 flink on yarn 模式还可以添加⾃定义字段
# 日志路径
appender.kafka.layout.additionalField1.type = KeyValuePair
appender.kafka.layout.additionalField1.key = logdir
appender.kafka.layout.additionalField1.value = ${sys:log.file}
# flink-job-name
appender.kafka.layout.additionalField2.type = KeyValuePair
appender.kafka.layout.additionalField2.key = flinkJobName
appender.kafka.layout.additionalField2.value = ${sys:flinkJobName}
# 提交到yarn的containerId
appender.kafka.layout.additionalField3.type = KeyValuePair
appender.kafka.layout.additionalField3.key = yarnContainerId
appender.kafka.layout.additionalField3.value = ${sys:yarnContainerId}

  上⾯的 appender.kafka.layout.type 可以使⽤ JSONLayout ,也可以⾃定义。

  ⾃定义需要将上⾯的appender.kafka.layout.type 和 appender.kafka.layout.value 修改成如下:

appender.kafka.layout.type = PatternLayout
appender.kafka.layout.pattern ={"log_level":"%p","log_timestamp":"%d{ISO8601}","log_thread":"%t","log_file":"%F","l
og_line":"%L","log_message":"'%m'","log_path":"%X{log_path}","job_name":"${sys:flink
_job_name}"}%n

2.3 基于Flink on yarn模式提交任务前期准备

2.3.1 需要根据kafka的版本在flink/lib⽬录下放⼊kafka-clients的jar包

在这里插入图片描述

2.3.2 kafka处于启动状态

2.3.3 Flink Standalone集群

# 根据kafka的版本放⼊kafka-clients
kafka-clients-3.1.0.jar
# jackson对应的jar包
jackson-annotations-2.13.3.jar
jackson-core-2.13.3.jar
jackson-databind-2.13.3.jar

2.4 Flink on yarn任务提交案例

/root/software/flink-1.16.1/bin/flink run-application \
-t yarn-application \
-D yarn.application.name=TopSpeedWindowing \
-D parallelism.default=3 \
-D jobmanager.memory.process.size=2g \
-D taskmanager.memory.process.size=2g \
-D env.java.opts="-DflinkJobName=TopSpeedWindowing" \
/root/software/flink-1.16.1/examples/streaming/TopSpeedWindowing.jar

【注意】启动脚本需要加入这个参数,日志才能采集到任务名称(-D env.java.opts="-DflinkJobName=xxx")

  消费flink_logs案例

{
    instant: {
        epochSecond: 1698723428,
        nanoOfSecond: 544000000,
    },
    thread: 'flink-akka.actor.default-dispatcher-17',
    level: 'INFO',
    loggerName: 'org.apache.flink.runtime.rpc.akka.AkkaRpcService',
    message: 'Stopped Akka RPC service.',
    endOfBatch: false,
    loggerFqcn: 'org.apache.logging.slf4j.Log4jLogger',
    threadId: 68,
    threadPriority: 5,
    logdir: '/yarn/container-logs/application_1697779774806_0046/container_1697779774806_0046_01_000002/taskmanager.log',
    flinkJobName: 'flink-log-collect-test',
    yarnContainerId: 'container_1697779774806_0046_01_000002',
}

  ⽇志写⼊Kafka之后可以通过Logstash接⼊elasticsearch,然后通过kibana进⾏查询或搜索

三、LogStash部署

  部署过程略,网上都有

  需要注意Logstash内部kafka-clients和Kafka版本兼容问题,需要根据Kafka版本选择合适的Logstash版本

  将以下内容写⼊config/logstash-sample.conf ⽂件中

input {
	kafka {
		bootstrap_servers => ["xxx1:9092,xxx2:9092,xxx3:9092"] 
		group_id => "logstash-group"
		topics => ["flink_logs"] 
		consumer_threads => 3 
		type => "flink-logs" 
		codec => "json"
		auto_offset_reset => "latest"
	}
}

output {
	elasticsearch {
		hosts => ["192.168.1.249:9200"] 
		index => "flink-log-%{+YYYY-MM-dd}"
	}
}

  Logstash启动:

logstash-6.5.4/bin/logstash -f logstash-6.5.4/config/logstash-sample.conf 2>&1 >logstash-6.5.4/logs/logstash.log &

四、Elasticsearch部署

  部署过程略,网上都有

  注意需要用root用户以外的用户启动Elasticsearch

  启动脚本:

Su elasticsearchlogtest

elasticsearch-6.3.1/bin/elasticsearch

在这里插入图片描述

  Windows访问ES客户端推荐使用ElasticHD,本地运行后可以直连ES
在这里插入图片描述

五、Kibana部署

  部署过程略,网上都有

  启动脚本:

  kibana-6.3.1-linux-x86_64/bin/kibana

5.1 配置规则

在这里插入图片描述
在这里插入图片描述

5.2 日志分析

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/114300.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Linux基础环境开发工具的使用(yum,vim,gcc,g++)

Linux基础环境开发工具的使用[yum,vim,gcc,g] 一.yum1.yum的快速入门1.yum安装软件2.yum卸载软件 2.yum的生态环境1.操作系统的分化2.四个问题1.服务器是谁提供的呢?2.服务器上的软件是谁提供的呢?3.为什么要提供呢?4.yum是如何得知目标服务器的地址和下载链接呢?5.软件源 …

VMware打开centos黑屏解决方法汇总以及解决出现的bug(Centos7系统网络异常等)

VMware打开centos黑屏解决方法汇总 前言:一. VMware打开centos黑屏解决方法汇总一 .情况情况一:情况二情况三 二. 解决方法最简单的方法:一. 以管理员权限在命令行执行1. 管理员身份运行cmd2. 输入“netsh winsock reset”,回车3. 重启电脑即…

Python条件判断的运用

问题 在生活中,我们可以通过判断条件是否成立,来决定执行哪个分支。选择语句有多种形式:if语句,if-else语句,if-elif-else语句等。 Python使用if条件判断语句来实现条件判断时,可以在多个循环中实现对问题的…

Linux-----nginx的简介,nginx搭载负载均衡以及nginx部署前后端分离项目

目录 nginx的简介 是什么 nginx的特点以及功能 Nginx负载均衡 下载 安装 负载均衡 nginx的简介 是什么 Nginx是一个高性能的开源Web服务器和反向代理服务器。它的设计目标是为了解决C10k问题,即在同一时间内支持上万个并发连接。 Nginx采用事件驱动的异…

【云原生-K8s】Kubernetes安全组件CIS基准kube-beach安装及使用

基础介绍kube-beach介绍kube-beach 下载百度网盘下载wget下载 kube-beach安装kube-beach使用基础参数配置信息解读示例结果说明 基础介绍 为了保证集群以及容器应用的安全,Kubernetes 提供了多种安全机制,限制容器的行为,减少容器和集群的攻…

身份证二要素核验API:提高身份验证的精确性与效率

前言 在数字时代,身份验证已经成为各行各业的重要环节。无论是金融交易、电子商务还是在线服务,确保用户身份的准确性至关重要。身份证二要素核验API,作为一种先进的技术解决方案,正在逐渐崭露头角,为身份验证带来了精…

el-table中的el-input标签修改值,但界面未更新,解决方法

el-table中的el-input标签修改值,界面未更新 在el-table中的el-input里面写的change事件根本不触发,都不打印,试了网络上各种方法都没用 然后换成input事件,input事件会触发,但界面也未更新。我在触发事件的时候&…

Mac允许任何来源的的安装包进行安装

首先打开终端,开启“任何来源”,执行如下命令: sudo spctl --master-disable 然后回车,继续输入密码(密码输入时是不可见的),然后回车。 接着打开【系统偏好设置】,选择【安全性与…

react-router

一、react-router是什么 react-router是一个用于管理React应用程序中路由的库。路由是指确定应用程序如何根据URL路径来渲染不同的组件和页面。使用react-router可以将应用程序的不同页面映射到不同的URL路径,以及在用户导航时动态地加载适当的组件。这样&#xff…

【实战Flask API项目指南】之五 RESTful API设计

实战Flask API项目指南之 RESTful API设计 本系列文章将带你深入探索实战Flask API项目指南,通过跟随小菜的学习之旅,你将逐步掌握 Flask 在实际项目中的应用。让我们一起踏上这个精彩的学习之旅吧! 前言 当小菜踏入Flask后端开发的世界时…

sublime怎么调中文?

Sublime Text是一个功能强大的文本编辑器,它被广泛使用于编码过程中。在开发过程中,Sublime Text界面的语言设置通常默认为英语,无法直接输入中文。那么如何调整Sublime Text编辑器的设置,以允许在界面中输入和编辑中文呢&#xf…

嵌入式web boa配置流程详解

boa配置流程详解 前期准备boa介绍操作目的下载boa 配置流程1.解压boa服务器2.配置Makefile3.编译boa服务器4.修改boa配置文件5.增加权限并编译cgi6.测试demo7.错误示例 附录A history附录B boa.conf 前期准备 boa介绍 Boa服务器是一个小巧高效的web服务器,是一个运…

虹科示波器 | 汽车免拆检修 | 2012 款上汽大众帕萨特车 发动机偶尔无法起动

一、故障现象 一辆2012款上汽大众帕萨特车,搭载CFB发动机,累计行驶里程约为12万km。车主反映,将点火开关置于起动挡,偶尔只能听到“咔哒”一声,起动机没有反应,类似蓄电池亏电时起动发动机的现象。为此&…

优思学院|质量管理工作困难重重,为何仍有人乐此不疲?

有一位网友说道:质量管理工作困难重重,为何仍有人乐此不疲?我完全理解他的困惑,质量管理工作的确是个不容易的任务,充满挑战,需要不断的努力和耐心。尽管如此,很多人依然选择从事这个领域的工作…

学习c++的第二天

目录 数据类型 基本数据类型 typedef 声明 枚举类型 类型转换 变量类型 变量定义 变量声明 左值(Lvalues)和右值(Rvalues) 变量作用域 数据类型 基本数据类型 C 为程序员提供了种类丰富的内置数据类型和用户自定义的数…

uni-app 应对微信小程序最新隐私协议接口要求的处理方法

这里给大家分享我在网上总结出来的一些知识,希望对大家有所帮助 一,问题起因 最新在开发小程序的时候,调用微信小程序来获取用户信息的时候经常报错一个问题 fail api scope is not declared in the privacy agreement,api更具公告…

Windows Server 2008安装

1.典型 2.稍后安装操作系统 3.Microsoft Windows 4.尽量虚拟机名称也改一下,我忘记改了 5. 默认40G差不多了,不用修改了 6.直接点完成 7.配置处理器和镜像 8.中文简体 9.现在安装 10.第一个就行(完全安装) 11.我接受&#xff0c…

HarmonyOS数据管理与应用数据持久化(一)

一. 数据管理概述 功能介绍 数据管理为开发者提供数据存储、数据管理能力,比如联系人应用数据可以保存到数据库中,提供数据库的安全、可靠等管理机制。 数据存储:提供通用数据持久化能力,根据数据特点,分为用户首选项、…

设计模式_状态模式

状态模式 介绍 设计模式定义案例问题堆积在哪里解决办法状态模式一个对象 状态可以发生改变 不同的状态又有不同的行为逻辑游戏角色 加载不同的技能 每个技能有不同的:攻击逻辑 攻击范围 动作等等1 状态很多 2 每个状态有自己的属性和逻辑每种状态单独写一个类 角色…

银河麒麟x86版、银河麒麟arm版操作系统编译zlmediakit

脚本 # 安装依赖 gcc-c.x86_64 这个不加的话会有问题 sudo yum -y install gcc gcc-c libssl-dev libsdl-dev libavcodec-dev libavutil-dev ffmpeg git openssl-devel gcc-c.x86_64mkdir -p /home/zenglg cd /home/zenglg git clone --depth 1 https://gitee.com/xia-chu…