1. 【工具类】正则表达式过滤器(过滤日志、过滤文件)
- 1. 【工具类】正则表达式过滤器(过滤日志、过滤文件)
- 1.1. 划重点
- 1.2. 参数说明
- 1.3. 正则表达式 regular.json 内容如下
- 1.4. 举例
- 1.5. 代码说明
1.1. 划重点
功能: 用 python 实现的文件正则表达式过滤工具;不同的过滤模板只需维护不同的正则表达式文件即可,方便跨平台通用
- 编写自己的正则表达式,主要填写 regexp 字段,并保存为
regular.json
文件,格式如下
[
{"id": "","regexp": ".*hello world.*","ignore":0,"add":"","time_id":""},
{"id": "","regexp": "^my name is knowledgebao.*","ignore":0,"add":"","time_id":""}
]
- 将下边python代码保存为
filter_file.py
文件,准备要过滤的文件test.log
- 执行
python3 filter_file.py -i test.log -r regular.json -o output.log
其中 output.log 是过滤后的文件
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import csv
import json
import logging
import os
import re
import sys
import uuid
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
logging.basicConfig(format='[%(asctime)s.%(msecs)03d] [%(levelname).1s] [%(filename)s:%(lineno)d] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
logger = logging.getLogger(__name__)
# 日志级别设置
def set_log_level(level):
log_levels = {
0: logging.DEBUG,
1: logging.INFO,
2: logging.WARNING,
3: logging.ERROR,
4: logging.CRITICAL
}
if level in log_levels:
logger.setLevel(log_levels[level])
# 正则表达式解析结构体,对应 json 文件中的一条记录
class regexp_info:
def __init__(self):
self.id = None #
self.ignore = None #
self.add = None # 额外添加在日志前边的字符串
self.regexp_text = None # 原始正则表达式内容
self.regexp = None # 编译后的正则表达式句柄
self.time_id = None #
self.time = None
def __str__(self) -> str:
return f"id: {self.id}, ignore: {self.ignore}, add: {self.add}, time_id: {self.time_id} regexp: {self.regexp_text}"
# 常用正则表达式
class regexp_utils:
pattern_times = [
# ... [0411 15:46:57.447] log info
[re.compile(r".*\[(\d{4} \d{2}:\d{2}:\d{2}\.\d{3})\]"),
"%m%d %H:%M:%S.%f"],
# ... [2024-04-26 12:00:37.125 D|E|I|W] log info
[re.compile(
r".*\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) [D|E|I|W]\]"), "%Y-%m-%d %H:%M:%S.%f"]
]
# 日志解析类,过滤日志文件
class filter_file:
def __init__(self):
self.regexps = {}
self.pattern_time = None
self.f = None
self.all_line = 0
self.process_line = 0
self.valid_line = 0
self.valid_times = []
# 解析正则表达式
def parse_regexp(self, regexp_path, regexp_list, output, time_info):
self.pattern_time = time_info[0]
self.format_time = time_info[1]
if regexp_path and os.path.exists(regexp_path):
with open(regexp_path, 'r') as f:
data = json.load(f)
for val in data:
info = regexp_info()
info.id = val.get("id")
info.ignore = val.get("ignore")
# .replace("\\n", "\n").replace("\\t", "\t")
info.add = val.get("add")
info.regexp_text = val.get("regexp")
if info.regexp_text is None or info.regexp_text == "":
continue
try:
info.regexp = re.compile(info.regexp_text)
except Exception as e:
logger.error(f"regexp: {info.regexp_text} error: {e}")
raise e
info.time_id = val.get("time_id")
self.regexps[info.id] = info
logger.info(f"{info}")
if regexp_list:
for regexp in regexp_list:
if regexp is None or regexp == "":
continue
info = regexp_info()
info.id = uuid.uuid4().hex
info.regexp_text = ".*"+regexp+".*"
try:
info.regexp = re.compile(info.regexp_text)
except Exception as e:
logger.error(f"regexp: {info.regexp_text} error: {e}")
raise e
self.regexps[info.id] = info
logger.info(f"{info}")
if output:
self.f = open(output, 'w+')
def __write_file(self, text):
if self.f and text and text != "":
self.f.write(text)
# return t1-t2
def __get_time_delta(self, t1, t2):
if t1 is None or t2 is None:
return None
if not (isinstance(t1, datetime) and isinstance(t2, datetime)):
return None
delta_time = str(t1 - t2)
if len(delta_time) < 14:
delta_time += "."
delta_time += "0" * (14-len(delta_time))
return delta_time
def __process_time(self, line, regexp):
time = None
if self.pattern_time is None:
return time
date_time = self.pattern_time.findall(line)
if len(date_time) > 0:
time = datetime.strptime(date_time[0], self.format_time)
if regexp.time_id and regexp.time_id in self.regexps and self.regexps[regexp.time_id].time:
delta = self.__get_time_delta(
time, self.regexps[regexp.time_id].time)
logger.debug(
f"{delta}, {time}, {self.regexps[regexp.time_id].time}")
self.__write_file(f"{delta}-")
self.valid_times.append(delta)
if regexp.add:
self.valid_times.append(f"begin:{time}")
return time
def __parse_line(self, line):
self.valid_line += 1
# logger.debug(f"beg process: {line}")
for regexp in self.regexps.values():
# logger.debug(f"regexp: {regexp}")
if regexp.ignore or regexp.regexp is None or regexp.regexp_text == "":
continue
if regexp.regexp.search(line):
logger.debug(f"{regexp.id}: {line}")
if self.f:
# 打印额外信息
self.__write_file(regexp.add)
# 获取和打印时间差
regexp.time = self.__process_time(line, regexp)
# 打印日志内容
self.__write_file(line)
self.process_line += 1
break
def parses(self, file_path):
for line in open(file_path, "rb"):
self.all_line += 1
try:
line = line.decode("utf8", errors="replace")
if line and line != "":
self.__parse_line(line)
except Exception as e:
logger.warning(f"parse lines {self.all_line} failed: {e}")
# if self.all_line > 10: # for test
# break
def print_result(self, to_csv, csv_name):
logger.info(
f"all line: {self.all_line}, decode line: {self.valid_line}, process line: {self.process_line}")
write = None
if self.valid_times:
if to_csv:
f = open(csv_name, 'w', newline='')
write = csv.writer(f)
item_time_deltes = []
for val in self.valid_times:
if val.startswith("begin"):
if item_time_deltes and write:
write.writerow(item_time_deltes)
item_time_deltes = []
item_time_deltes.append(val)
print(f"\n{val}",end=" ")
continue
item_time_deltes.append(val)
print(val,end=" ")
if item_time_deltes and write:
write.writerow(item_time_deltes)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.description = 'please enter correct para'
parser.add_argument("-i", "--file", help="log file", type=str)
parser.add_argument("-r", "--regular_file",
help="regular json file", type=str)
parser.add_argument("-rl", "--regular_list",
help="regular text, support many", type=str, nargs='+')
parser.add_argument("-o", "--output", help="log output file",
type=str, default="output.log")
parser.add_argument(
"-l", "--log_level", help="0-4, debug,info,warning,error,critical", type=int, default=1)
parser.add_argument("-t", "--time_format",
help="0: [0411 15:46:57.447], 1:[2024-04-26 12:00:37.125 D|E|I|W]", type=int, default=0)
parser.add_argument("-oc", "--output_csv", help="del_time output to csv file", type=int, default=0)
args = parser.parse_args()
logger.info(f"log level {args.log_level}")
logger.info(f"input file {args.file}")
logger.info(f"output file {args.output}")
logger.info(f"regular file {args.regular_file}")
logger.info(f"regular_list {args.regular_list}")
set_log_level(args.log_level)
if not os.path.exists(args.file):
logger.error(f"input log file {args.file} not exist")
exit(1)
log_process = filter_file()
try:
log_process.parse_regexp(args.regular_file, args.regular_list,
args.output, regexp_utils.pattern_times[args.time_format])
logger.info(f"begin parse {args.file}")
logger.info(f"==================================")
log_process.parses(args.file)
log_process.print_result(to_csv=args.output_csv, csv_name=f"{args.output}.csv")
except Exception as e:
logger.error(f"parse log file {args.file} failed: {e}")
1.2. 参数说明
usage: filter_file.py [-h] [-i FILE] [-r REGULAR_FILE] [-rl REGULAR_LIST [REGULAR_LIST ...]] [-o OUTPUT] [-l LOG_LEVEL] [-t TIME_FORMAT]
please enter correct para
optional arguments:
-h, --help show this help message and exit
-i FILE, --file FILE log file
-r REGULAR_FILE, --regular_file REGULAR_FILE
regular json file
-rl REGULAR_LIST [REGULAR_LIST ...], --regular_list REGULAR_LIST [REGULAR_LIST ...]
regular text, support many
-o OUTPUT, --output OUTPUT
log output file
-l LOG_LEVEL, --log_level LOG_LEVEL
0-4, debug,info,warning,error,critical
-t TIME_FORMAT, --time_format TIME_FORMAT
0: [0411 15:46:57.447], 1:[2024-04-26 12:00:37.125 D|E|I|W]
- -i 原始待处理文件
- -r regular_file,指定正则表达式文件
- -rl regular_list, 指定正则表达式字符串,可以指定多个, 与 -r 类似,直接命令行输入正则表达式
- -o 指定输出文件,默认是 output.log
- -l 日志级别,默认是info
- -t 指定时间格式,默认是0,表示 [0411 15:46:57.447]
- -oc 指定是否将时间差额外输出到 csv 文件(文件名为输出文件名加 .csv 后缀),默认是0,表示不输出
1.3. 正则表达式 regular.json 内容如下
[
{"id": "xxx","regexp": "xxxx","ignore":0,"add":"","time_id":""}
]
- id 用来定义唯一标识,可用来关联其他条目,目前可以计算时间差
- regexp 是正则表达式,用于过滤有效日志
- ignore 表示是否忽略该行,0表示不忽略,1表示忽略
- add 表示添加的字符串,比如添加换行符,添加在对应日志的前边
- time_id 表示与哪个 id 关联,目前主要是用来计算时间差
1.4. 举例
- 举例 python3 filter_file.py -i test.log -o output.log -r regular.json
regular.json 内容如下
[
{"id": "001","regexp": ".*0000000000000.*","ignore":0,"add":"\n","time_id":""},
{"id": "002","regexp": ".*222222222.*","ignore":0,"add":"","time_id":""},
{"id": "003","regexp": ".*333333333.*","ignore":0,"add":"","time_id":"001"},
{"id": "004","regexp": ".*555555555.*","ignore":0,"add":"","time_id":""},
{"id": "005","regexp": ".*777777777.*","ignore":1,"add":"","time_id":""},
{"id": "006","regexp": ".*999999999.*","ignore":0,"add":"","time_id":"001"}
]
test.log 内容如下
[15:47;28.931][D][0411 15:43:21.040] log 0000000000000 ppfejf
[15:47;28.931][W][0411 15:43:21.040] log 1111111111111ppfejf
[15:47;28.931][I][0411 15:43:22.040] log 2222222222222ppfejf
[15:47;28.931][E][0411 15:43:23.040] log 33333333333333ppfejf
[15:47;28.931][D][0411 15:43:24.040] log 444444444444444ppfejf
[15:47;28.931][W][0411 15:43:24.040] log 555555555555555ppfejf
[15:47;28.931][I][0411 15:43:24.040] log 666666666666666fejf
[15:47;28.931][E][0411 15:43:24.040] log 3333333333333ppfejf
[15:47;28.931][D][0411 15:43:24.040] log 7777777777777777ppfejf
[15:47;28.931][W][0411 15:43:24.040] log 3333333333333ppfejf
[15:47;28.931][I][0411 15:43:24.040] log 888888888888888ppfejf
[15:47;28.931][E][0411 15:43:24.040] log 999999999999999ppfejf
[15:47;28.931][E][0411 15:43:24.040] log 3333333333333ppfejf
output.log 输出结果如下
[15:47;28.931][D][0411 15:43:21.040] log 0000000000000 ppfejf
[15:47;28.931][I][0411 15:43:22.040] log 2222222222222ppfejf
0:00:02.000000_[15:47;28.931][E][0411 15:43:23.040] log 33333333333333ppfejf
[15:47;28.931][W][0411 15:43:24.040] log 555555555555555ppfejf
0:00:03.000000_[15:47;28.931][E][0411 15:43:24.040] log 3333333333333ppfejf
0:00:03.000000_[15:47;28.931][W][0411 15:43:24.040] log 3333333333333ppfejf
0:00:03.000000_[15:47;28.931][E][0411 15:43:24.040] log 999999999999999ppfejf
0:00:03.000000_[15:47;28.931][E][0411 15:43:24.040] log 3333333333333ppfejf
1.5. 代码说明
- set_log_level 用来设置日志级别
- regexp_info 类,用来解析正则表达式
- regexp_utils 类,常用正则表达式,比如解析获取文本中的时间等
- filter_file 类,用来解析正则表达式,读文件,过滤每一行,还可以计算时间差