如何使用Logstash搜集日志传输到es集群并使用kibana检测

引言:上一期我们进行了对Elasticsearch和kibana的部署,今天我们来解决如何使用Logstash搜集日志传输到es集群并使用kibana检测

目录

Logstash部署

1.安装配置Logstash

(1)安装

(2)测试文件

(3)配置

grok

1、手动输入日志数据

数据链路

2、手动输入数据,并存储到 es

数据链路

3、自定义日志1

数据链路

5、nginx access 日志

数据链路

6、nginx error日志

数据链路

7、filebate 传输给 logstash

filebeat 日志模板

Logstash部署

  • 服务器

安装软件主机名IP地址系统版本配置
LogstashElk10.12.153.71centos7.5.18042核4G
  • 软件版本:logstash-7.13.2.tar.gz

1.安装配置Logstash

Logstash运行同样依赖jdk,本次为节省资源,故将Logstash安装在了10.12.153.71节点。

(1)安装
tar zxf /usr/local/package/logstash-7.13.2.tar.gz -C /usr/local/
(2)测试文件

标准输入=>标准输出

1、启动logstash

2、logstash启动后,直接进行数据输入

3、logstash处理后,直接进行返回

input {
    stdin {}
}
output {
    stdout {
        codec => rubydebug
    }
}

标准输入=>标准输出及es集群

1、启动logstash

2、启动后直接在终端输入数据

3、数据会由logstash处理后返回并存储到es集群中

input {
    stdin {}
}
output {
    stdout {
        codec => rubydebug
    }
    elasticsearch {
      hosts => ["10.12.153.71","10.12.153.72","10.12.153.133"]
      index => 'logstash-debug-%{+YYYY-MM-dd}'
    }
}

端口输入=>字段匹配=>标准输出及es集群

1、由tcp 的8888端口将日志发送到logstash

2、数据被grok进行正则匹配处理

3、处理后,数据将被打印到终端并存储到es

input {
    tcp {
        port => 8888
    }
}
filter {
    grok {
        match => {"message" => "%{DATA:key} %{NUMBER:value:int}"} 
            
    }
}
output {
    stdout {
        codec => rubydebug
    }
    elasticsearch {
      hosts => ["10.12.153.71","10.12.153.72","10.12.153.133"]
      index => 'logstash-debug-%{+YYYY-MM-dd}'
    }
}
# yum install -y nc
# free -m |awk 'NF==2{print $1,$3}' |nc logstash_ip 8888

文件输入=>字段匹配及修改时间格式修改=>es集群

1、直接将本地的日志数据拉去到logstash当中

2、将日志进行处理后存储到es

input {
    file {
        type => "nginx-log"
        path => "/var/log/nginx/error.log"
        start_position => "beginning" # 此参数表示在第一次读取日志时从头读取
        # sincedb_path => "自定义位置"  # 此参数记录了读取日志的位置,默认在 data/plugins/inputs/file/.sincedb*
        
    }
}
filter {
    grok {
        match => { "message" => '%{DATESTAMP:date} [%{WORD:level}] %{DATA:msg} client: %{IPV4:cip},%{DATA}"%{DATA:url}"%{DATA}"%{IPV4:host}"'}    
    }    
    date {
        match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]    
    }    
}
​
output {
  if [type] == "nginx-log" {
        elasticsearch {
            hosts => ["10.12.153.71","10.12.153.72","10.12.153.133"]
            index => 'logstash-audit_log-%{+YYYY-MM-dd}'
      }
    }
  }


filebeat => 字段匹配 => 标准输出及es

input {
  beats {
    port => 5000
  }
}
filter {
    grok {
        match => {"message" => "%{IPV4:cip}"}   
    }
}
output {
        elasticsearch {
            hosts => ["192.168.249.139:9200","192.168.249.149:9200","192.168.249.159:9200"]
            index => 'test-%{+YYYY-MM-dd}'
      }
    stdout { codec => rubydebug }
}
(3)配置

创建目录,我们将所有input、filter、output配置文件全部放到该目录中。

mkdir -p /usr/local/logstash-7.13.2/etc/conf.d
vim /usr/local/logstash-7.13.2/etc/conf.d/input.conf
input { 
kafka {
    type => "audit_log"
    codec => "json"
    topics => "nginx"
    decorate_events => true
    bootstrap_servers => "10.12.153.71","10.12.153.72","10.12.153.133"
  }
}
​
vim /usr/local/logstash-7.13.2/etc/conf.d/filter.conf
filter {
    json { # 如果日志原格式是json的,需要用json插件处理
        source => "message"
        target => "nginx" # 组名
    }
}
​
vim /usr/local/logstash-7.13.2/etc/conf.d/output.conf
output {
  if [type] == "audit_log" {
      elasticsearch {
      hosts => ["10.12.153.71","10.12.153.72","10.12.153.133"]
      index => 'logstash-audit_log-%{+YYYY-MM-dd}'
      }
    }
  }

(3)启动

cd /usr/local/logstash-7.13.2
nohup bin/logstash -f etc/conf.d/  --config.reload.automatic &

grok

1、手动输入日志数据

一般为debug 方式,检测 ELK 集群是否健康,这种方法在 logstash 启动后可以直接手动数据数据,并将格式化后的数据打印出来。

数据链路

1、启动logstash

2、logstash启动后,直接进行数据输入

3、logstash处理后,直接进行返回

input {
    stdin {}
}
output {
    stdout {
        codec => rubydebug
    }
}

2、手动输入数据,并存储到 es

数据链路

1、启动logstash

2、启动后直接在终端输入数据

3、数据会由logstash处理后返回并存储到es集群中

input {
    stdin {}
}
output {
    stdout {
        codec => rubydebug
    }
    elasticsearch {
      hosts => ["10.12.153.71","10.12.153.72","10.12.153.133"]
      index => 'logstash-debug-%{+YYYY-MM-dd}'
    }
}

3、自定义日志1

数据链路

1、由tcp 的8888端口将日志发送到logstash

2、数据被grok进行正则匹配处理

3、处理后,数据将被打印到终端并存储到es

input {
    tcp {
        port => 8888
    }
}
filter {
    grok {
        match => {"message" => "%{DATA:key} %{NUMBER:value:int}"}   
    }
}
output {
    stdout {
        codec => rubydebug
    }
    elasticsearch {
      hosts => [""10.12.153.71","10.12.153.72","10.12.153.133""]
      index => 'logstash-debug-%{+YYYY-MM-dd}'
    }
}
# yum install -y nc
# free -m |awk 'NF==2{print $1,$3}' |nc logstash_ip 8888
​
4、自定义日志2
数据链路
1、由tcp 的8888端口将日志发送到logstash

2、数据被grok进行正则匹配处理

3、处理后,数据将被打印到终端

input {
    tcp {
        port => 8888
    }
}
filter {
    grok {
        match => {"message" => "%{WORD:username}\:%{WORD:passwd}\:%{INT:uid}\:%{INT:gid}\:%{DATA:describe}\:%{DATA:home}\:%{GREEDYDATA:shell}"}
            
    }
}
output {
    stdout {
        codec => rubydebug
    }
}
​
# cat /etc/passwd | nc logstash_ip 8888

5、nginx access 日志

数据链路

1、在filebeat配置文件中,指定kafka集群ip [output.kafka] 的指定topic当中

2、在logstash配置文件中,input区域内指定kafka接口,并指定集群ip和相应topic

3、logstash 配置filter 对数据进行清洗

4、将数据通过 output 存储到es指定index当中

5、kibana 添加es 索引,展示数据

input {
    kafka {
        type => "audit_log"
        codec => "json"
        topics => "haha"
        #decorate_events => true
        #enable_auto_commit => true
        auto_offset_reset => "earliest"
        bootstrap_servers => ["192.168.52.129:9092,192.168.52.130:9092,192.168.52.131:9092"]
      }
}
​
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG} %{QS:x_forwarded_for}"}    
    }    
    date {
        match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]    
    }    
    geoip {
        source => "lan_ip"    
    }
}
​
output {
  if [type] == "audit_log" {
        stdout {
                codec => rubydebug
        }
      elasticsearch {
      hosts => ["192.168.52.129","192.168.52.130","192.168.52.131"]
      index => 'tt-%{+YYYY-MM-dd}'
      }
    }
  }
​
  
  #filebeat 配置
  filebeat.prospectors:
- input_type: log
  paths:
    -  /opt/logs/server/nginx.log
  json.keys_under_root: true
  json.add_error_key: true
  json.message_key: log
​
output.kafka:   
  hosts: [""10.12.153.71","10.12.153.72","10.12.153.133""]
  topic: 'nginx'
​
  # nginx 配置
  log_format main        '{"user_ip":"$http_x_real_ip","lan_ip":"$remote_addr","log_time":"$time_iso8601","user_req":"$request","http_code":"$status","body_bytes_sents":"$body_bytes_sent","req_time":"$request_time","user_ua":"$http_user_agent"}';
    access_log  /var/log/nginx/access.log  main;
​

6、nginx error日志

数据链路

1、直接将本地的日志数据拉去到logstash当中

2、将日志进行处理后存储到es

input {
    file {
        type => "nginx-log"
        path => "/var/log/nginx/error.log"
        start_position => "beginning"
    }
}
filter {
    grok {
        match => { "message" => '%{DATESTAMP:date} [%{WORD:level}] %{DATA:msg} client: %{IPV4:cip},%{DATA}"%{DATA:url}"%{DATA}"%{IPV4:host}"'}    
    }    
    date {
        match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]    
    }    
}
​
output {
  if [type] == "nginx-log" {
        elasticsearch {
            hosts => [""10.12.153.71:9200","10.12.153.72:9200","10.12.153.133:9200""]
            index => 'logstash-audit_log-%{+YYYY-MM-dd}'
      }
    }
  }

7、filebate 传输给 logstash

input {
  beats {
    port => 5000
  }
}
filter {
    grok {
        match => {"message" => "%{IPV4:cip}"}   
    }
}
output {
        elasticsearch {
            hosts => ["192.168.249.139:9200","192.168.249.149:9200","192.168.249.159:9200"]
            index => 'test-%{+YYYY-MM-dd}'
      }
    stdout { codec => rubydebug }
}
​
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
output.logstash:
  hosts: ["192.168.52.134:5000"]

filebeat 日志模板

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
output.kafka:
  hosts: ["192.168.52.129:9092","192.168.52.130:9092","192.168.52.131:9092"]
  topic: haha
  partition.round_robin:
    reachable_only: true
  required_acks: 1

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/416127.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

高压高能碳陶瓷无感电阻的制作以及应用?

由于现有需求,许多现代电子电路和设备都会经历瞬态脉冲和浪涌。这反过来又导致需要“设计”瞬态浪涌保护,尤其是在电机控制器等电路中。当电机启动时,此时消耗的电流过大,可能导致电阻器故障。同样,如果电容器用于电机…

基于Python3的数据结构与算法 - 05 堆排序

目录 一、堆排序之树的基础知识 1. 树的定义 2. 树的一些概念 二、堆排序二叉树的基本知识 1. 二叉树的定义 2. 二叉树的存储方式(表达方式) 2.1 顺序存储方式 三、堆 1. 堆的定义 2. 堆的向下调整性质 四、堆排序的过程 1. 建造堆 五、时…

如何用好应用权限,保护隐私数据?银河麒麟桌面操作系统V10 SP1 2303 update2新功能解析

为您介绍银河麒麟桌面操作系统V10 SP1 2303 update2隐私设置和权限管理功能,为您的个人数据安全保驾护航。 说到个人数据隐私,在科技重塑生活本质的数字世界,个人信息遭受持续威胁。2018年,某国际知名社交平台因安全系统漏洞而遭…

CSS:弹性盒子Flexible Box布局

CSS:Flexible Box弹性盒子布局 一、flex布局原理 ​ flex是flexible Box的缩写,意为 ”弹性布局“,用来为盒状模型提供最大的灵活性,任何一个容器都可以指定为flex布局。 当我们的父盒子设置为flex布局之后,子元素的 float 、clear 和 vert…

抖音无水印视频关键词批量下载操作说明|视频批量采集工具

抖音无水印视频关键词批量下载工具是一款便捷实用的软件,通过关键词搜索功能可以轻松进行视频批量下载。QQ:290615413 以下是操作步骤及功能介绍: 打开软件后进入关键词搜索页面: 在软件中找到第一个选项卡,即为关键词搜索功能。 …

JOISC2022 复制粘贴(区间DP,字符串hash)

题目描述 题面 分析 这道题考场没有任何头绪,赛后也是看了许多题解才明白状态设计和转移的一步步思考过程。 首先我们需要想到 无论是屏幕上的字符串,还是剪切板上的字符串,在任何时候都必须是目标串的子串。这个非常好像,如果不…

视频汇聚/存储/压缩/诊断平台EasyCVR视频联网整合方案应用特点

随着科技的不断发展,监控视频在各个领域的应用越来越广泛。为了更好地管理和利用这些视频资源,视频联网与整合的需求也越来越多。通过视频联网技术将不同地理位置或不同设备的视频资源进行整合,实现实时共享和集中管理。视频联网整合方案的应…

快速创建百度百科,打造专属品牌词条

本文迅推客传媒将为小白详细讲解如何创建百度百科,并提供详细的教程。 创建百度百科账号 请先注册一个。注册完成后,登录百度账号。在搜索框中输入“百度百科”,进入百度百科官网。 选择创建词条类型 根据自己的需要选择相应的分类。例如&…

Outlook邮箱IMAP怎么开启?服务器怎么填?

Outlook邮箱IMAP服务器如何开启?Outlook设置IMAP的方法? Outlook邮箱作为其中的佼佼者,被广大用户所青睐。但在使用Outlook邮箱时,许多用户可能会碰到一个问题:如何开启IMAP服务?下面,蜂邮EDM就…

学习大数据,所必需的java基础(6)

文章目录 集合Set集合介绍HashSet集合的介绍和使用LinkedHashSet的介绍以及使用哈希值哈希值的计算方式HashSet的存储去重的过程 Map集合Map的介绍HashMap的介绍以及使用HashMap的两种遍历方式方式1:获取key,然后再根据key获取value方式2:同时…

trie树(前缀树)

前缀树 1. 前缀树的的介绍2.前缀树的实现2.1插入功能2.2删除功能2.3查找前缀和查找单词功能2.4 哈希表版本 1. 前缀树的的介绍 在计算机科学中,trie,又称前缀树或字典树,是一种有序树,用于保存关联数组,其中的键通常是…

《Spring Security 简易速速上手小册》第1章 Spring Security 概述(2024 最新版)

文章目录 1.1 Spring Security 的重要性1.1.1 基础知识详解1.1.2 主要案例:用户认证与授权1.1.3 拓展案例 1:OAuth2 社交登录1.1.4 拓展案例 2:JWT 认证 1.2 Spring Security 的核心特性1.2.1 基础知识详解1.2.2 主要案例:基于角色…

11.网络游戏逆向分析与漏洞攻防-游戏网络架构逆向分析-接管游戏接收网络数据包的操作

内容参考于:易道云信息技术研究院VIP课 上一个内容:接管游戏发送数据的操作 码云地址(master 分支):https://gitee.com/dye_your_fingers/titan 码云版本号:8256eb53e8c16281bc1a29cb8d26d352bb5bbf4c 代…

Android Duplicate class 排除重复类

一、起因: 在迭代开发的时候,发现2个ijk很多类重复。但又2个库实现的功能是不一样,目前不能合并。但又想保留2个功能。需要排除其中一个库。 二、报错如何下图: 三、解决方法: 3.1 在terminal 也就是命令行处输入 …

js 面试 什么是WebSockets?HTTP和HTTPS有什么不同?web worker是什么?

概念: webSocket 是一种在客户端和服务端之间建立持久连接的协议,它提供全双工通信通道,是服务器可以主动向客户端推送数据,同时也可以接受客户端发送的数据。 1 webSocket与https区别? 在网络通信中,We…

【mysql版本修改】

1、使用telnet确认当前mysql版本号 telnet <MySQL服务器IP地址> <MySQL端口号> telnet 192.168.38.20 33062、使用strings查看/usr/sbin/mysqld中包含版本号的字符串 # 查看/usr/sbin/mysqld文件中是否包含对应的版本号 strings /usr/sbin/mysqld | grep 5.7.30 …

Unity | 动态读取C#程序集实现热更新

目录 一、动态语言 二、创建C#dll 1.VS中创建一个C#语言的库工程 2.添加UnityEngine.dll的依赖 3.编写代码&#xff0c;生成dll 三、Unity使用dll 一、动态语言 计算机编程语言可以根据它们如何将源代码转换为可以执行的代码来分类为静态语言和动态语言。 静态语言&…

Spark Bloom Filter Join

1 综述 1.1 目的 Bloom Filter Join&#xff0c;或者说Row-level Runtime Filtering&#xff08;还额外有一条Semi-Join分支&#xff09;&#xff0c;是Spark 3.3对运行时过滤的一个最新补充   之前运行时过滤主要有两个&#xff1a;动态分区裁剪DPP&#xff08;开源实现&am…

ISO_IEC_18598-2016自动化基础设施管理(AIM)系统国际标准解读(一)

██ ISO_IEC_18598-2016是什么标准&#xff1f; ISO/IEC 18598国际标准是由ISO&#xff08;国际标准化组织&#xff09;/IEC&#xff08;国际电工委员会&#xff09;联合技术委员会1-信息技术的第25分委员会-信息技术设备互连小组制定的关于信息基础设施自动化管理的国际标准&…

C++中atomic的使用

atomic使用 概述介绍使用场景头文件atomic的使用创建load()store()exchange()compare_exchange_weakcompare_exchange_strong&#xff08;&#xff09;fetch_add()fetch_sub()fetch_and()fetch_or()fetch_xor() 示例实现代码运行结果 概述 本文只要讲述C11中atomic的使用&…
最新文章