1. 【工具类】正则表达式过滤器(过滤日志、过滤文件)
- 1. 【工具类】正则表达式过滤器(过滤日志、过滤文件)
- 1.1. 划重点
- 1.2. 参数说明
- 1.3. 正则表达式 regular.json 内容如下
- 1.4. 举例
- 1.5. 代码说明
1.1. 划重点
功能: python实现的支持对文件进行正则表达式过滤,不同的过滤模板,维护不同的正则表达式文件即可,方便跨平台通用
- 编写自己的正则表达式,主要填写 regexp 字段,并保存为
regular.json
文件,格式如下
[
{"id": "","regexp": ".*hello world.*","ignore":0,"add":"","time_id":""},
{"id": "","regexp": "^my name is knowledgebao.*","ignore":0,"add":"","time_id":""}
]
- 将下边python代码保存为
filter_file.py
文件,准备要过滤的文件test.log
- 执行
python3 filter_file.py -i test.log -r regular.json -o output.log
其中 output.log 是过滤后的文件
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
import argparse
import os
import logging
import json
import uuid
from datetime import datetime

# One-line log format: [time.millis] [level-initial] [file:line] message
logging.basicConfig(format='[%(asctime)s.%(msecs)03d] [%(levelname).1s] [%(filename)s:%(lineno)d] %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S')
logger = logging.getLogger(__name__)
def set_log_level(level):
    """Set the module logger's level from an integer code.

    level: 0=DEBUG, 1=INFO, 2=WARNING, 3=ERROR, 4=CRITICAL;
    any other value leaves the current level unchanged.
    """
    log_levels = {
        0: logging.DEBUG,
        1: logging.INFO,
        2: logging.WARNING,
        3: logging.ERROR,
        4: logging.CRITICAL,
    }
    if level in log_levels:
        logger.setLevel(log_levels[level])
class regexp_info:
    """One parsed regexp record, mirroring a single entry of the json file."""

    def __init__(self):
        self.id = None           # unique identifier, referenced by other entries via time_id
        self.ignore = None       # truthy -> this rule is skipped during filtering
        self.add = None          # extra string written in front of a matching line
        self.regexp_text = None  # raw regular-expression text
        self.regexp = None       # compiled regular-expression object
        self.time_id = None      # id of the entry to compute a time delta against
        self.time = None         # timestamp of the last line matched by this rule

    def __str__(self) -> str:
        return f"id: {self.id}, ignore: {self.ignore}, add: {self.add}, time_id: {self.time_id} regexp: {self.regexp_text}"
class regexp_utils:
    """Pre-compiled regular expressions shared by the filter."""

    # e.g. "[15:48;00.224][I][0411 15:46:57.447] log info"
    # captures the second timestamp: "0411 15:46:57.447"
    pattern_time1 = re.compile(r"^\[\d{2}:\d{2};\d{2}\.\d{3}\]\[.\]\[(\d{4} \d{2}:\d{2}:\d{2}\.\d{3})\]")

    # e.g. "[E][0410 21:44:07.514][xxx][xxx] login"
    # captures the timestamp: "0410 21:44:07.514"
    pattern_time2 = re.compile(r"^\[.\]\[(\d{4} \d{2}:\d{2}:\d{2}\.\d{3})\]")
class filter_file:
    """Filter a log file line by line against a set of regexp rules.

    Matching lines are copied to an optional output file, optionally
    prefixed with an extra string (``add``) and with the time delta to a
    related rule (``time_id``).
    """

    def __init__(self):
        self.regexps = {}        # id -> regexp_info
        self.pattern_time = regexp_utils.pattern_time1  # timestamp extractor
        self.f = None            # output file handle; None -> nothing is written
        self.all_line = 0        # total lines read from the input file
        self.process_line = 0    # lines that matched some rule
        self.valid_line = 0      # lines successfully decoded and examined

    def parse_regexp(self, regexp_path, regexp_list, output):
        """Load the regexp rules and open the output file.

        regexp_path: json file of [{"id","regexp","ignore","add","time_id"}, ...];
                     skipped when empty or missing.
        regexp_list: plain pattern strings; each is wrapped as ".*<text>.*"
                     and given a generated id.
        output:      output file path; opened for writing when non-empty.
        Raises re.error when an expression does not compile.
        """
        if regexp_path and os.path.exists(regexp_path):
            with open(regexp_path, 'r') as f:
                data = json.load(f)
            for val in data:
                info = regexp_info()
                info.id = val.get("id")
                info.ignore = val.get("ignore")
                info.add = val.get("add")
                info.regexp_text = val.get("regexp")
                if info.regexp_text is None or info.regexp_text == "":
                    continue  # entry without an expression is useless
                try:
                    info.regexp = re.compile(info.regexp_text)
                except Exception as e:
                    logger.error(f"regexp: {info.regexp_text} error: {e}")
                    raise  # keep the original traceback
                info.time_id = val.get("time_id")
                self.regexps[info.id] = info
                logger.info(f"{info}")
        if regexp_list:
            for regexp in regexp_list:
                if regexp is None or regexp == "":
                    continue
                info = regexp_info()
                info.id = uuid.uuid4().hex  # command-line rules get a generated id
                info.regexp_text = ".*" + regexp + ".*"
                try:
                    info.regexp = re.compile(info.regexp_text)
                except Exception as e:
                    logger.error(f"regexp: {info.regexp_text} error: {e}")
                    raise
                self.regexps[info.id] = info
                logger.info(f"{info}")
        if output:
            self.f = open(output, 'w+')

    def close(self):
        """Flush and close the output file; safe to call when none was opened."""
        if self.f:
            self.f.close()
            self.f = None

    def __write_file(self, text):
        # Write only when an output file is open and text is non-empty.
        if self.f and text and text != "":
            self.f.write(text)

    def __get_time_delta(self, t1, t2):
        """Return str(t1 - t2) padded to 14 chars plus a trailing "_".

        Returns None when either argument is missing or not a datetime.
        """
        if t1 is None or t2 is None:
            return None
        if not (isinstance(t1, datetime) and isinstance(t2, datetime)):
            return None
        delta_time = str(t1 - t2)
        if len(delta_time) < 14:
            # pad "0:00:03" -> "0:00:03.000000" so the prefixes line up
            delta_time += "."
            delta_time += "0" * (14 - len(delta_time))
        delta_time += "_"
        return delta_time

    def __process_time(self, line, regexp):
        """Extract the line's timestamp and, when the rule has a time_id,
        write the delta to that rule's last timestamp into the output file.

        Returns the parsed datetime, or None when the line has no timestamp.
        """
        time = None
        date_time = self.pattern_time.findall(line)
        if len(date_time) > 0:
            time = datetime.strptime(date_time[0], '%m%d %H:%M:%S.%f')
            if regexp.time_id and regexp.time_id in self.regexps and self.regexps[regexp.time_id].time:
                delta = self.__get_time_delta(time, self.regexps[regexp.time_id].time)
                logger.debug(f"{delta}, {time}, {self.regexps[regexp.time_id].time}")
                self.__write_file(delta)
        return time

    def __parse_line(self, line):
        """Match one decoded line against the rules; first matching rule wins."""
        self.valid_line += 1
        for regexp in self.regexps.values():
            if regexp.ignore or regexp.regexp is None or regexp.regexp_text == "":
                continue  # disabled or empty rule
            if regexp.regexp.search(line):
                logger.debug(f"{regexp.id}: {line}")
                if self.f:
                    # extra prefix string configured for this rule
                    self.__write_file(regexp.add)
                    # time delta relative to the related rule, then remember our time
                    regexp.time = self.__process_time(line, regexp)
                    # finally the matching line itself
                    self.__write_file(line)
                self.process_line += 1
                break

    def parses(self, file_path):
        """Read *file_path* in binary mode and filter every decodable line.

        Decoding errors are replaced; per-line failures are logged, not raised.
        """
        with open(file_path, "rb") as src:  # with-block: no leaked handle
            for raw in src:
                self.all_line += 1
                try:
                    line = raw.decode("utf8", errors="replace")
                    if line and line != "":
                        self.__parse_line(line)
                except Exception as e:
                    logger.warning(f"parse lines {self.all_line} failed: {e}")

    def print_result(self):
        """Log the read/decoded/matched line counters."""
        logger.info(f"all line: {self.all_line}, decode line: {self.valid_line}, process line: {self.process_line}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.description = 'please enter correct para'
    parser.add_argument("-i", "--file", help="log file", type=str)
    parser.add_argument("-r", "--regular_file", help="regular json file", type=str)
    parser.add_argument("-rl", "--regular_list", help="regular text, support many", type=str, nargs='+')
    parser.add_argument("-o", "--output", help="log output file", type=str, default="output.log")
    parser.add_argument("-l", "--log_level", help="0-4, debug,info,warning,error,critical", type=int, default=1)
    args = parser.parse_args()

    logger.info(f"log level {args.log_level}")
    logger.info(f"input file {args.file}")
    logger.info(f"output file {args.output}")
    logger.info(f"regular file {args.regular_file}")
    logger.info(f"regular_list {args.regular_list}")
    set_log_level(args.log_level)

    # Guard args.file is not None: os.path.exists(None) raises TypeError.
    if not args.file or not os.path.exists(args.file):
        logger.error(f"input log file {args.file} not exist")
        exit(1)

    log_process = filter_file()
    try:
        log_process.parse_regexp(args.regular_file, args.regular_list, args.output)
        logger.info(f"begin parse {args.file}")
        logger.info(f"==================================")
        log_process.parses(args.file)
        log_process.print_result()
    except Exception as e:
        logger.error(f"parse log file {args.file} failed: {e}")
    finally:
        log_process.close()  # ensure the output file is flushed and closed
1.2. 参数说明
➜ python filter_file.py --help
usage: filter_file.py [-h] [-i FILE] [-r REGULAR_FILE] [-rl REGULAR_LIST [REGULAR_LIST ...]] [-o OUTPUT] [-l LOG_LEVEL]

please enter correct para

optional arguments:
  -h, --help            show this help message and exit
  -i FILE, --file FILE  log file
  -r REGULAR_FILE, --regular_file REGULAR_FILE
                        regular json file
  -rl REGULAR_LIST [REGULAR_LIST ...], --regular_list REGULAR_LIST [REGULAR_LIST ...]
                        regular text, support many
  -o OUTPUT, --output OUTPUT
                        log output file
  -l LOG_LEVEL, --log_level LOG_LEVEL
                        0-4, debug,info,warning,error,critical
- -i 原始待处理文件
- -r regular_file,指定正则表达式文件
- -rl regular_list, 指定正则表达式字符串,可以指定多个, 与 -r 类似,直接命令行输入正则表达式
- -o 指定输出文件,默认是 output.log
- -l 日志级别,默认是info
1.3. 正则表达式 regular.json 内容如下
[
{"id": "xxx","regexp": "xxxx","ignore":0,"add":"","time_id":""}
]
- id 用来定义唯一标识,可用来关联其他条目,目前可以计算时间差
- regexp 是正则表达式,用于过滤有效日志
- ignore 表示是否忽略该行,0表示不忽略,1表示忽略
- add 表示添加的字符串,比如添加换行符,添加在对应日志的前边
- time_id 表示与哪个 id 关联,目前主要是用来计算时间差
1.4. 举例
- 举例 python3 filter_file.py -i test.log -o output.log -r regular.json
regular.json 内容如下
[
{"id": "001","regexp": ".*0000000000000.*","ignore":0,"add":"\n","time_id":""},
{"id": "002","regexp": ".*222222222.*","ignore":0,"add":"","time_id":""},
{"id": "003","regexp": ".*333333333.*","ignore":0,"add":"","time_id":"001"},
{"id": "004","regexp": ".*555555555.*","ignore":0,"add":"","time_id":""},
{"id": "005","regexp": ".*777777777.*","ignore":1,"add":"","time_id":""},
{"id": "006","regexp": ".*999999999.*","ignore":0,"add":"","time_id":"001"}
]
test.log 内容如下
[15:47;28.931][D][0411 15:43:21.040] log 0000000000000 ppfejf
[15:47;28.931][W][0411 15:43:21.040] log 1111111111111ppfejf
[15:47;28.931][I][0411 15:43:22.040] log 2222222222222ppfejf
[15:47;28.931][E][0411 15:43:23.040] log 33333333333333ppfejf
[15:47;28.931][D][0411 15:43:24.040] log 444444444444444ppfejf
[15:47;28.931][W][0411 15:43:24.040] log 555555555555555ppfejf
[15:47;28.931][I][0411 15:43:24.040] log 666666666666666fejf
[15:47;28.931][E][0411 15:43:24.040] log 3333333333333ppfejf
[15:47;28.931][D][0411 15:43:24.040] log 7777777777777777ppfejf
[15:47;28.931][W][0411 15:43:24.040] log 3333333333333ppfejf
[15:47;28.931][I][0411 15:43:24.040] log 888888888888888ppfejf
[15:47;28.931][E][0411 15:43:24.040] log 999999999999999ppfejf
[15:47;28.931][E][0411 15:43:24.040] log 3333333333333ppfejf
output.log 输出结果如下
[15:47;28.931][D][0411 15:43:21.040] log 0000000000000 ppfejf
[15:47;28.931][I][0411 15:43:22.040] log 2222222222222ppfejf
0:00:02.000000_[15:47;28.931][E][0411 15:43:23.040] log 33333333333333ppfejf
[15:47;28.931][W][0411 15:43:24.040] log 555555555555555ppfejf
0:00:03.000000_[15:47;28.931][E][0411 15:43:24.040] log 3333333333333ppfejf
0:00:03.000000_[15:47;28.931][W][0411 15:43:24.040] log 3333333333333ppfejf
0:00:03.000000_[15:47;28.931][E][0411 15:43:24.040] log 999999999999999ppfejf
0:00:03.000000_[15:47;28.931][E][0411 15:43:24.040] log 3333333333333ppfejf
1.5. 代码说明
- set_log_level 用来设置日志级别
- regexp_info 类,用来解析正则表达式
- regexp_utils 类,常用正则表达式,比如解析获取文本中的时间等
- filter_file 类,用来解析正则表达式,读文件,过滤每一行,还可以计算时间差