【工具类】正则表达式过滤器(过滤日志、过滤文件)

1. 【工具类】正则表达式过滤器(过滤日志、过滤文件)

  • 1. 【工具类】正则表达式过滤器(过滤日志、过滤文件)
    • 1.1. 划重点
    • 1.2. 参数说明
    • 1.3. 正则表达式 regular.json 内容如下
    • 1.4. 举例
    • 1.5. 代码说明

1.1. 划重点

功能: python实现的支持对文件进行正则表达式过滤,不同的过滤模板,维护不同的正则表达式文件即可,方便跨平台通用

  • 编写自己的正则表达式,主要填写 regexp 字段,并保存为 regular.json 文件,格式如下
[
{"id": "","regexp": ".*hello world.*","ignore":0,"add":"","time_id":""},
{"id": "","regexp": "^my name is knowledgebao.*","ignore":0,"add":"","time_id":""}
]
  • 将下边python代码保存为 filter_file.py 文件,准备要过滤的文件 test.log
  • 执行 python3 filter_file.py -i test.log -r regular.json -o output.log 其中 output.log 是过滤后的文件
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import re
import argparse
import os
import logging
import json
import uuid
import csv
from datetime import datetime

logging.basicConfig(format='[%(asctime)s.%(msecs)03d] [%(levelname).1s] [%(filename)s:%(lineno)d] %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S')
logger = logging.getLogger(__name__)

#  日志级别设置


def set_log_level(level):
    log_levels = {
        0: logging.DEBUG,
        1: logging.INFO,
        2: logging.WARNING,
        3: logging.ERROR,
        4: logging.CRITICAL
    }
    if level in log_levels:
        logger.setLevel(log_levels[level])


# 正则表达式解析结构体,对应 json 文件中的一条记录
class regexp_info:
    def __init__(self):
        self.id = None              #
        self.ignore = None          #
        self.add = None             # 额外添加在日志前边的字符串
        self.regexp_text = None     # 原始正则表达式内容
        self.regexp = None          # 编译后的正则表达式句柄
        self.time_id = None         #

        self.time = None

    def __str__(self) -> str:
        return f"id: {self.id}, ignore: {self.ignore}, add: {self.add}, time_id: {self.time_id} regexp: {self.regexp_text}"


# 常用正则表达式
class regexp_utils:
    pattern_times = [
        # ... [0411 15:46:57.447] log info
        [re.compile(r".*\[(\d{4} \d{2}:\d{2}:\d{2}\.\d{3})\]"),
         "%m%d %H:%M:%S.%f"],
        # ... [2024-04-26 12:00:37.125 D|E|I|W] log info
        [re.compile(
            r".*\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) [D|E|I|W]\]"), "%Y-%m-%d %H:%M:%S.%f"]
    ]

# 日志解析类,过滤日志文件


class filter_file:
    def __init__(self):
        self.regexps = {}
        self.pattern_time = None
        self.f = None
        self.all_line = 0
        self.process_line = 0
        self.valid_line = 0
        self.valid_times = []

    # 解析正则表达式
    def parse_regexp(self, regexp_path, regexp_list, output, time_info):
        self.pattern_time = time_info[0]
        self.format_time = time_info[1]
        if regexp_path and os.path.exists(regexp_path):
            with open(regexp_path, 'r') as f:
                data = json.load(f)
                for val in data:
                    info = regexp_info()
                    info.id = val.get("id")
                    info.ignore = val.get("ignore")
                    # .replace("\\n", "\n").replace("\\t", "\t")
                    info.add = val.get("add")
                    info.regexp_text = val.get("regexp")
                    if info.regexp_text is None or info.regexp_text == "":
                        continue
                    try:
                        info.regexp = re.compile(info.regexp_text)
                    except Exception as e:
                        logger.error(f"regexp: {info.regexp_text} error: {e}")
                        raise e
                    info.time_id = val.get("time_id")
                    self.regexps[info.id] = info
                    logger.info(f"{info}")
        if regexp_list:
            for regexp in regexp_list:
                if regexp is None or regexp == "":
                    continue
                info = regexp_info()
                info.id = uuid.uuid4().hex
                info.regexp_text = ".*"+regexp+".*"
                try:
                    info.regexp = re.compile(info.regexp_text)
                except Exception as e:
                    logger.error(f"regexp: {info.regexp_text} error: {e}")
                    raise e
                self.regexps[info.id] = info
                logger.info(f"{info}")

        if output:
            self.f = open(output, 'w+')

    def __write_file(self, text):
        if self.f and text and text != "":
            self.f.write(text)

    # return t1-t2
    def __get_time_delta(self, t1, t2):
        if t1 is None or t2 is None:
            return None
        if not (isinstance(t1, datetime) and isinstance(t2, datetime)):
            return None

        delta_time = str(t1 - t2)
        if len(delta_time) < 14:
            delta_time += "."
            delta_time += "0" * (14-len(delta_time))
        return delta_time

    def __process_time(self, line, regexp):
        time = None
        if self.pattern_time is None:
            return time
        date_time = self.pattern_time.findall(line)
        if len(date_time) > 0:
            time = datetime.strptime(date_time[0], self.format_time)
            if regexp.time_id and regexp.time_id in self.regexps and self.regexps[regexp.time_id].time:
                delta = self.__get_time_delta(
                    time, self.regexps[regexp.time_id].time)
                logger.debug(
                    f"{delta}, {time}, {self.regexps[regexp.time_id].time}")
                self.__write_file(f"{delta}-")
                self.valid_times.append(delta)
            if regexp.add:
                self.valid_times.append(f"begin:{time}")
        return time

    def __parse_line(self, line):
        self.valid_line += 1
        # logger.debug(f"beg process: {line}")
        for regexp in self.regexps.values():
            # logger.debug(f"regexp: {regexp}")
            if regexp.ignore or regexp.regexp is None or regexp.regexp_text == "":
                continue

            if regexp.regexp.search(line):
                logger.debug(f"{regexp.id}: {line}")
                if self.f:
                    # 打印额外信息
                    self.__write_file(regexp.add)
                    # 获取和打印时间差
                    regexp.time = self.__process_time(line, regexp)
                    # 打印日志内容
                    self.__write_file(line)
                    self.process_line += 1
                    break

    def parses(self, file_path):
        for line in open(file_path, "rb"):
            self.all_line += 1
            try:
                line = line.decode("utf8", errors="replace")
                if line and line != "":
                    self.__parse_line(line)
            except Exception as e:
                logger.warning(f"parse lines {self.all_line} failed: {e}")
            # if self.all_line > 10:  # for test
            #     break

    def print_result(self, to_csv, csv_name):
        logger.info(
            f"all line: {self.all_line}, decode line: {self.valid_line}, process line: {self.process_line}")

        write = None
        if self.valid_times:
            if to_csv:
                f = open(csv_name, 'w', newline='')
                write = csv.writer(f)
            item_time_deltes = []
            for val in self.valid_times:
                if val.startswith("begin"):
                    if item_time_deltes and write:
                        write.writerow(item_time_deltes)
                    item_time_deltes = []
                    item_time_deltes.append(val)
                    print(f"\n{val}",end=" ")
                    continue
                item_time_deltes.append(val)
                print(val,end=" ")
            if item_time_deltes and write:
                write.writerow(item_time_deltes)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.description = 'please enter correct para'
    parser.add_argument("-i", "--file", help="log file", type=str)
    parser.add_argument("-r", "--regular_file",
                        help="regular json file", type=str)
    parser.add_argument("-rl", "--regular_list",
                        help="regular text, support many", type=str, nargs='+')
    parser.add_argument("-o", "--output", help="log output file",
                        type=str, default="output.log")
    parser.add_argument(
        "-l", "--log_level", help="0-4, debug,info,warning,error,critical", type=int, default=1)
    parser.add_argument("-t", "--time_format",
                        help="0: [0411 15:46:57.447], 1:[2024-04-26 12:00:37.125 D|E|I|W]", type=int, default=0)
    parser.add_argument("-oc", "--output_csv", help="del_time output to csv file", type=int, default=0)

    args = parser.parse_args()

    logger.info(f"log level {args.log_level}")
    logger.info(f"input file {args.file}")
    logger.info(f"output file {args.output}")
    logger.info(f"regular file {args.regular_file}")
    logger.info(f"regular_list {args.regular_list}")

    set_log_level(args.log_level)

    if not os.path.exists(args.file):
        logger.error(f"input log file {args.file} not exist")
        exit(1)

    log_process = filter_file()
    try:
        log_process.parse_regexp(args.regular_file, args.regular_list,
                                 args.output, regexp_utils.pattern_times[args.time_format])
        logger.info(f"begin parse {args.file}")
        logger.info(f"==================================")
        log_process.parses(args.file)
        log_process.print_result(to_csv=args.output_csv, csv_name=f"{args.output}.csv")
    except Exception as e:
        logger.error(f"parse log file {args.file} failed: {e}")

1.2. 参数说明

usage: filter_file.py [-h] [-i FILE] [-r REGULAR_FILE] [-rl REGULAR_LIST [REGULAR_LIST ...]] [-o OUTPUT] [-l LOG_LEVEL] [-t TIME_FORMAT]

please enter correct para

optional arguments:
  -h, --help            show this help message and exit
  -i FILE, --file FILE  log file
  -r REGULAR_FILE, --regular_file REGULAR_FILE
                        regular json file
  -rl REGULAR_LIST [REGULAR_LIST ...], --regular_list REGULAR_LIST [REGULAR_LIST ...]
                        regular text, support many
  -o OUTPUT, --output OUTPUT
                        log output file
  -l LOG_LEVEL, --log_level LOG_LEVEL
                        0-4, debug,info,warning,error,critical
  -t TIME_FORMAT, --time_format TIME_FORMAT
                        0: [0411 15:46:57.447], 1:[2024-04-26 12:00:37.125 D|E|I|W]
  • -i 原始待处理文件
  • -r regular_file,指定正则表达式文件
  • -rl regular_list, 指定正则表达式字符串,可以指定多个, 与 -r 类似,直接命令行输入正则表达式
  • -o 指定输出文件,默认是 output.log
  • -l 日志级别,默认是info
  • -t 指定时间格式,默认是0,表示 [0411 15:46:57.447]

1.3. 正则表达式 regular.json 内容如下

[
{"id": "xxx","regexp": "xxxx","ignore":0,"add":"","time_id":""}
]
  • id 用来定义唯一标识,可用来关联其他条目,目前可以计算时间差
  • regexp 是正则表达式,用于过滤有效日志
  • ignore 表示是否忽略该行,0表示不忽略,1表示忽略
  • add 表示添加的字符串,比如添加换行符,添加在对应日志的前边
  • time_id 表示与哪个 id 关联,目前主要是用来计算时间差

1.4. 举例

  • 举例 python3 filter_file.py -i test.log -o output.log -r regular.json

reqular.json 内容如下

[
{"id": "001","regexp": ".*0000000000000.*","ignore":0,"add":"\n","time_id":""},
{"id": "002","regexp": ".*222222222.*","ignore":0,"add":"","time_id":""},
{"id": "003","regexp": ".*333333333.*","ignore":0,"add":"","time_id":"001"},
{"id": "004","regexp": ".*555555555.*","ignore":0,"add":"","time_id":""},
{"id": "005","regexp": ".*777777777.*","ignore":1,"add":"","time_id":""},
{"id": "006","regexp": ".*999999999.*","ignore":0,"add":"","time_id":"001"}
]

test.log 内容如下

[15:47;28.931][D][0411 15:43:21.040] log 0000000000000 ppfejf
[15:47;28.931][W][0411 15:43:21.040] log 1111111111111ppfejf
[15:47;28.931][I][0411 15:43:22.040] log 2222222222222ppfejf
[15:47;28.931][E][0411 15:43:23.040] log 33333333333333ppfejf
[15:47;28.931][D][0411 15:43:24.040] log 444444444444444ppfejf
[15:47;28.931][W][0411 15:43:24.040] log 555555555555555ppfejf
[15:47;28.931][I][0411 15:43:24.040] log 666666666666666fejf
[15:47;28.931][E][0411 15:43:24.040] log 3333333333333ppfejf
[15:47;28.931][D][0411 15:43:24.040] log 7777777777777777ppfejf
[15:47;28.931][W][0411 15:43:24.040] log 3333333333333ppfejf
[15:47;28.931][I][0411 15:43:24.040] log 888888888888888ppfejf
[15:47;28.931][E][0411 15:43:24.040] log 999999999999999ppfejf
[15:47;28.931][E][0411 15:43:24.040] log 3333333333333ppfejf

output.log 输出结果如下


[15:47;28.931][D][0411 15:43:21.040] log 0000000000000 ppfejf
[15:47;28.931][I][0411 15:43:22.040] log 2222222222222ppfejf
0:00:02.000000_[15:47;28.931][E][0411 15:43:23.040] log 33333333333333ppfejf
[15:47;28.931][W][0411 15:43:24.040] log 555555555555555ppfejf
0:00:03.000000_[15:47;28.931][E][0411 15:43:24.040] log 3333333333333ppfejf
0:00:03.000000_[15:47;28.931][W][0411 15:43:24.040] log 3333333333333ppfejf
0:00:03.000000_[15:47;28.931][E][0411 15:43:24.040] log 999999999999999ppfejf
0:00:03.000000_[15:47;28.931][E][0411 15:43:24.040] log 3333333333333ppfejf

1.5. 代码说明

  • set_log_level 用来设置日志级别
  • regexp_info 类,用来解析正则表达式
  • regexp_utils 类,常用正则表达式,比如解析获取文本中的时间等
  • filter_file 类,用来解析正则表达式,读文件,过滤每一行,还可以计算时间差

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/586356.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JAVA前端快速入门基础_javascript入门(02)

写在前面:本文用于快速学会简易的JS&#xff0c;仅做扫盲和参考作用 1.JavaScript函数 什么是函数:执行特定任务的代码块 1.1定义&#xff1a; 使用function来进行定义(类似于python里面的def 或者java和c里面的void&#xff0c;int这些返回类型开头)。定义规则如下: func…

【17】JAVASE-集合专题【从零开始学JAVA】

Java零基础系列课程-JavaSE基础篇 Lecture&#xff1a;波哥 Java 是第一大编程语言和开发平台。它有助于企业降低成本、缩短开发周期、推动创新以及改善应用服务。如今全球有数百万开发人员运行着超过 51 亿个 Java 虚拟机&#xff0c;Java 仍是企业和开发人员的首选开发平台。…

【linuxC语言】进程概念与fork

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、进程的概念二、进程基本函数2.1 fork函数2.2 getpid与getppid函数 三、示例代码总结 前言 在 Linux 系统编程中&#xff0c;进程是计算机中正在执行的程序…

【Spring基础】关于Spring IoC的那些事

文章目录 一、如何理解IoC1.1 Spring IOC 概述1.2 IoC 是什么 二、Ioc 配置的方式2.1 xml 配置2.2 Java 配置2.3 注解配置 三、依赖注入的方式3.1 setter方式3.2 构造函数3.3 注解注入 小结 一、如何理解IoC 1.1 Spring IOC 概述 控制反转 IoC(Inversion of Control)是一种设计…

分辨率与像素

一 概念 分辨率: 分辨率指的是图像或显示器屏幕上可见的像素数量&#xff0c;通常以水平像素数和垂直像素数表示。例如&#xff0c;一个分辨率为1920x1080的屏幕意味着在水平方向上有1920个像素&#xff0c;在垂直方向上有1080个像素。分辨率决定了图像或屏幕上能够显示的细节…

神经网络反向传播算法

今天我们来看一下神经网络中的反向传播算法&#xff0c;之前介绍了梯度下降与正向传播~ 神经网络的反向传播 专栏&#xff1a;&#x1f48e;实战PyTorch&#x1f48e; 反向传播算法&#xff08;Back Propagation&#xff0c;简称BP&#xff09;是一种用于训练神经网络的算…

qt5-入门-2D绘图-Graphics View 架构

参考&#xff1a; Qt Graphics View Framework_w3cschool https://www.w3cschool.cn/learnroadqt/4mvj1j53.html C GUI Programming with Qt 4, Second Edition 本地环境&#xff1a; win10专业版&#xff0c;64位&#xff0c;Qt 5.12 基础知识 QPainter比较适合少量绘图的情…

蓝桥杯如何准备国赛?

目录 一、赛前准备 1、如何刷题&#xff0c;刷哪些题&#xff1f; 2、记录&#xff08;主要看个人习惯&#xff09; CSDN博客 写注释 3、暴力骗分 4、从出题人的角度出发&#xff0c;应该如何骗分 二、赛中注意事项 一、赛前准备 1、如何刷题&#xff0c;刷哪些题&…

Ubuntu 24.04安装搜狗输入法-解决闪屏问题

问题描述 在Ubuntu 24.04 LTS系统中按照官方安装指导《Ubuntu20.04安装搜狗输入法步骤》安装搜狗输入法后&#xff1a; 会出现屏幕闪烁&#xff0c;无法正常使用的问题&#xff1b;系统搜索框和gnome-text-editor无法使用搜狗输入法&#xff1b; 原因分析 闪屏可能是Ubuntu…

ESP32-C3第二路串口(非调试)串口打通(1)

1. 概述与引脚复用 《ESP32-C3 系列芯片技术规格书》中提到&#xff0c;ESP32-C3系列芯片中有两路串口。 第1路串口就是常用的调试串口&#xff0c;在笔者使用的ESP32-C3-DevKitC-02开发板中&#xff0c;这一路串口通过CP2102 USB转UART桥芯片与电脑的USB口相连接&#xff0c;…

c4d渲染动画只能渲染1帧怎么回事?c4d云渲染解决1秒停止

当您在C4D中尝试渲染动画时&#xff0c;如果只渲染出了一个静止的帧&#xff0c;这通常意味着您的设置中存在一些问题。动画本身是由一系列连续的静态图像&#xff08;帧&#xff09;组成的&#xff0c;如果只生成了一帧&#xff0c;那么显然是渲染设置出现了错误。为了解决这个…

如何利用快解析远程访问NAS、FTP、Web服务

什么是内网、外网&#xff1f; 所谓内网就是内部建立的局域网络或办公网络。一家公司或一个家庭有多台计算机&#xff0c;他们利用不同网络布局将这一台或多台计算机或其它设备连接起来构成一个局部的办公或者资源共享网络&#xff0c;我们就称它为内部网络&#xff0c;也叫内…

微服务之SpringCloud AlibabaSeata处理分布式事务

一、概述 1.1背景 一次业务操作需要跨多个数据源或需要跨多个系统进行远程调用&#xff0c;就会产生分布式事务问题 but 关系型数据库提供的能力是基于单机事务的&#xff0c;一旦遇到分布式事务场景&#xff0c;就需要通过更多其他技术手段来解决问题。 全局事务&#xff1a;…

计算机网络4——网络层4内部路由选择协议

文章目录 一、有关路由选择协议的几个基本概念1、理想的路由算法2、分层次的路由选择协议 二、内部网关协议 RIP1、协议 RIP 的工作原理2、特点3、距离向量算法4、坏消息传播慢 三、内部网关协议 OSPF1、基本特点2、OSPF 的五种分组类型 本节将讨论几种常用的路由选择协议&…

【Mac】mac 安装 prometheus 报错 prometheus: prometheus: cannot execute binary file

1、官网下载 Download | Prometheus 这里下载的是prometheus-2.51.2.linux-amd64.tar.gz 2、现象 解压之后启动Prometheus 启动脚本&#xff1a; nohup ./prometheus --config.fileprometheus.yml > prometheus.out 2>&1 & prometheus.out日志文件&#xff…

【C++】:类和对象(下)

目录 一&#xff0c;再谈构造函数1.初始化列表2. 隐式类型转换的过程及其优化3. 隐式类型转换的使用4. explcit关键字5. 单参数和多参数构造函数的隐式类型转换 二&#xff0c;static成员1.静态成员变量2.静态成员函数 三&#xff0c;友元3.1 友元函数3.2 友元类 四&#xff0c…

Vue ui 创建vue项目,详细使用攻略。

1.安装及启动 1.1 Vue ui 使用前提是全局安装vue.js 命令如下 npm install vue -g 1.2 安装过Vue.js 之后 随便在自己系统的一个地方打开命令面板 1.3 使用命令启动vue ui面板创建项目 vue ui 如图运行后显示这种就是启动成功&#xff0c;成功之后会弹出页面或者直接访问你的…

QT5制做两个独立窗口

目录 增加第二个窗口 主窗口文件添加一个私有成员为子窗口 定义两个槽函数和 关联按钮和子窗口和主窗口 添加子窗口成员 子窗口处理函数 补充回顾 增加第二个窗口 1、 2、 3 主窗口文件添加一个私有成员为子窗口 在mainwidget.h文件 同时添加两个槽&#xff1b;来处理…

Visual studio 2019 编程控制CH341A芯片的USB设备

1、硬件 买了个USB可转IIC、或SPI、或UART的设备&#xff0c;主芯片是CH341A 主要说明USB转SPI的应用&#xff0c;绿色跳线帽选择IIC&SPI&#xff0c;用到CS0、SCK、MOSI、MISO这4个引脚 2、软件 2.1、下载CH341A的驱动 点CH341A官网https://www.wch.cn/downloads/CH34…

人工智能工具的强大之处:我用过的最好用的AI工具

人工智能工具的强大之处&#xff1a;我用过的最好用的AI工具 在当今科技迅速发展的时代&#xff0c;人工智能(AI)工具已经成为我们日常生活和工作中不可或缺的一部分。从语音助手到自动化内容创建工具&#xff0c;再到数据分析软件&#xff0c;AI的应用领域广泛且深远。本篇博…
最新文章