refactors

This commit is contained in:
Mira Kristipati 2024-08-14 01:57:15 -04:00
parent 0bcf59f3e0
commit befb8f8297

View file

@ -10,8 +10,7 @@ from typing import cast
import parsy as ps import parsy as ps
from colorama import Back, Fore, Style from colorama import Back, Fore, Style
from dotenv import dotenv_values from parsy import Parser, generate, peek, regex, seq, string
from parsy import Parser, regex, seq, string
LEVEL_UNKNOWN = 255 LEVEL_UNKNOWN = 255
NEWLINE = string("\n") NEWLINE = string("\n")
@ -23,10 +22,16 @@ class Config(dict[str, str]):
def __call__(self, parser, namespace, values, option_string=None) -> None: def __call__(self, parser, namespace, values, option_string=None) -> None:
_ = (parser, option_string) _ = (parser, option_string)
values = cast(str, values) values = cast(str, values)
setattr(namespace, self.dest, format_debug_level(values.upper())[0]) setattr(namespace, self.dest, get_debug_level_const(values.upper())[0])
def __init__(self) -> None: def __init__(self) -> None:
self.dict = {k: v for k, v in dotenv_values(".env").items() if v is not None} self.dict = {}
if os.path.exists(".env"):
from dotenv import dotenv_values
self.dict = {
k: v for k, v in dotenv_values(".env").items() if v is not None
}
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
if not ("LOGFILE" in self.dict or "LOGFILE" in os.environ): if not ("LOGFILE" in self.dict or "LOGFILE" in os.environ):
parser.add_argument( parser.add_argument(
@ -85,10 +90,10 @@ def colorize_status(status: str) -> str:
return status return status
def format_debug_level(level: str) -> tuple[int, str]: def get_debug_level_const(level: str) -> tuple[int, str]:
if level == "DEBUG": if "DEBUG" in level:
return (logging.DEBUG, level) return (logging.DEBUG, level)
elif level == "INFO": elif "INFO" in level:
return (logging.INFO, level) return (logging.INFO, level)
elif "WARN" in level: elif "WARN" in level:
return (logging.WARNING, level) return (logging.WARNING, level)
@ -101,21 +106,51 @@ def format_debug_level(level: str) -> tuple[int, str]:
def colorize_by_level(level: str, text: str) -> str: def colorize_by_level(level: str, text: str) -> str:
if level == "DEBUG": if "DEBUG" in level:
return Style.DIM + text + Style.RESET_ALL return Style.DIM + text + Style.RESET_ALL
elif level == "INFO": elif "INFO" in level:
return Style.BRIGHT + text + Style.RESET_ALL return Style.BRIGHT + text + Style.RESET_ALL
elif level == "WARNING": elif "WARN" in level:
return Fore.YELLOW + text + Style.RESET_ALL return Fore.YELLOW + text + Style.RESET_ALL
elif level == "ERROR": elif "ERR" in level:
return Fore.RED + text + Style.RESET_ALL return Fore.RED + text + Style.RESET_ALL
elif level == "CRITICAL": elif "CRIT" in level:
return Style.BRIGHT + Back.RED + Fore.BLACK + text + Style.RESET_ALL return Style.BRIGHT + Back.RED + Fore.BLACK + text + Style.RESET_ALL
else: else:
return Fore.BLUE + text + Style.RESET_ALL return Fore.BLUE + text + Style.RESET_ALL
# return level # return level
"""
Output functions
"""
class TestPrinterBuilder:
loglevel: int = logging.NOTSET
query: str | re.Pattern[str] | None = None
def __call__(
self, ts: str, level: tuple[int, str], src: str, msg: str
) -> str | None:
level_name = level[1]
if (level[0] >= self.loglevel) or (
self.query is not None and re.search(self.query, msg)
):
res = f"{ts}\t{Fore.BLACK + Back.WHITE + level[1] + Style.RESET_ALL}\t{colorize_by_level(level_name,f'{src}: {msg}')}"
print(res)
return res
def with_loglevel(self, loglevel: int) -> TestPrinterBuilder:
self.loglevel = loglevel
return self
def with_query(self, query: str | re.Pattern) -> TestPrinterBuilder:
self.query = query
return self
""" """
Parser defs Parser defs
""" """
@ -125,8 +160,13 @@ def block_parser(text: str) -> Parser:
return regex(rf"^.*\n^.*{text}.*$\n.*", re.MULTILINE) return regex(rf"^.*\n^.*{text}.*$\n.*", re.MULTILINE)
timestamp: Parser = regex(r"\d{4}-\d{2}-\d{2}.\d{2}:\d{2}:\d{2}(,\d{3})?") timestamp: Parser = (
debug_level: Parser = regex(r"\S+") regex(
r"^\d{4}-\d{2}-\d{2}.\d{2}:\d{2}:\d{2}(\+\d{2}:\d{2})?(,\d{3})?", re.MULTILINE
)
<< string(":").optional()
)
debug_level: Parser = regex(r"[A-Za-z0-9_%-]+") << string(":").optional()
start_block = block_parser("EASYPY JOB START") start_block = block_parser("EASYPY JOB START")
@ -136,90 +176,108 @@ summary_block = block_parser("Task Result Summary")
details_block = block_parser("Task Result Details") details_block = block_parser("Task Result Details")
sections = seq( message = ps.any_char.until(ps.peek(timestamp) | ps.eof).concat()
header=ps.any_char.until(start_block).concat() << start_block << NEWLINE,
test_output=ps.any_char.until(end_block).concat() << end_block << NEWLINE,
post_test=ps.any_char.until(report_block).concat() << report_block,
report=ps.any_char.until(summary_block).concat() << summary_block,
summary=ps.any_char.until(details_block).concat() << details_block,
details=ps.any_char.until(NEWLINE + ps.letter).concat() << NEWLINE,
etc=ps.any_char.until(ps.eof).concat(),
)
messsage = ps.any_char.until(ps.peek(timestamp) | ps.eof).concat()
test_output = seq(
ts=timestamp << SPACE,
level=debug_level.map(format_debug_level) << SPACE,
src=string("[") >> regex(r"[A-Za-z_0-9]+") << string("] "),
msg=messsage.map(lambda x: x.strip()),
).many()
test_details: Parser = seq( @generate
test_name=( def test_output_line():
NEWLINE.optional() ts = yield timestamp << SPACE
<< (SPACE * 4) level = yield debug_level << SPACE
<< (string("|--") | string("`--")) src = yield (string("[") >> regex(r"[A-Za-z_0-9]+") << string("] ")).optional()
msg = yield message.map(lambda x: x.rstrip())
return print_test_line(ts, get_debug_level_const(level), src, msg)
# return {"ts": ts, "level": level, "src": src, "msg": msg}
@generate
def test_details():
test_name = yield (
(
NEWLINE.optional()
<< (SPACE * 4)
<< (string("|--") | string("`--"))
<< ps.whitespace
)
>> ps.any_char.until(ps.whitespace).concat()
<< ps.whitespace << ps.whitespace
) )
>> ps.any_char.until(ps.whitespace).concat() test_results = yield (ps.letter.many().concat())
<< ps.whitespace, print(f" | {colorize_status(test_results)}\t{test_name}")
test_results=(ps.letter.many().concat()).map(colorize_status), return {"test_results": test_results, "test_name": test_name}
)
subtask_details: Parser = seq(
subtask_name=(string("`--") << ps.whitespace)
>> ps.any_char.until(ps.whitespace).concat()
<< ps.whitespace,
subtask_result=(ps.letter.many().concat()).map(colorize_status),
tests=test_details.many(),
)
task_details: Parser = seq(
task_number=ps.any_char.until(string(":")).concat() << string(":") << ps.whitespace,
task_name=ps.any_char.until(ps.whitespace).concat() << ps.whitespace,
subtasks=subtask_details.many(),
)
details: Parser = task_details.many()
""" @generate
Output functions def subtask_details():
""" subtask_name = (
yield (string("`--") << ps.whitespace)
>> ps.any_char.until(ps.whitespace).concat()
<< ps.whitespace
)
subtask_result = yield (ps.letter.many().concat())
print(f" > {colorize_status(subtask_result)}\t{subtask_name}")
tests = yield test_details.many()
return {
"subtask_result": subtask_result,
"subtask_name": subtask_name,
"tests": tests,
}
def output_details(log_details: list) -> None: @generate
for task in log_details: def task_details():
print( task_number = (
f"{task['task_number']}\t {Style.BRIGHT + task['task_name'] + Style.RESET_ALL}" yield ps.any_char.until(string(":")).concat() << string(":") << ps.whitespace
) )
for subtask in task["subtasks"]: task_name = yield (ps.any_char.until(ps.whitespace).concat() << ps.whitespace)
print(f" {subtask['subtask_result']}\t{subtask['subtask_name']}") print(f"{task_number}\t {Style.BRIGHT + task_name + Style.RESET_ALL}")
for test in subtask["tests"]:
print(f" | {test['test_results']}\t{test['test_name']}") subtasks = yield (subtask_details.many())
return {"task_number": task_number, "task_name": task_name, "subtasks": subtasks}
def output_tests( @generate
tests: list, # NOTE: Uncomment the print statements if you actually care what's in these sections
filter_level: int = logging.NOTSET, def sections():
search_term: str | re.Pattern | None = None, header = yield (
) -> None: ps.any_char.until(peek(timestamp)).concat() # TODO: output this
for test in tests: + test_output_line.until(start_block).concat()
level = test["level"] << start_block
if (search_term and not re.search(search_term, test["msg"])) or level[ << NEWLINE
0 )
] >= filter_level:
print( test_output = yield test_output_line.until(end_block) << end_block << NEWLINE
colorize_by_level( post_test = yield (ps.any_char.until(report_block).concat() << report_block)
level[1], f"{test['ts']} {level[1]} {test['src']} {test['msg']}" # print(post_test)
) report = yield (ps.any_char.until(summary_block).concat() << summary_block)
) # print(report)
summary = yield (ps.any_char.until(details_block).concat() << details_block)
# print(summary)
details = yield task_details.many()
etc = yield (ps.any_char.until(ps.eof).concat())
return {
"header": header,
"test_output": test_output,
"post_test": post_test,
"report": report,
"summary": summary,
"details": details,
"etc": etc,
}
# TODO: make this work with streams
if __name__ == "__main__": if __name__ == "__main__":
config = Config() config = Config()
with Path(config["LOGFILE"]).open() as f: with Path(config["LOGFILE"]).open() as f:
log = f.read() log = f.read()
global output_test_line
print_test_line = TestPrinterBuilder()
if loglevel := config.get("LOGLEVEL"):
print_test_line = print_test_line.with_loglevel(cast(int, loglevel))
if query := config.get("QUERY"):
print_test_line = print_test_line.with_query(query)
from icecream import ic
parsed_sections = sections.parse(log) parsed_sections = sections.parse(log)
output_details(details.parse(parsed_sections["details"]))
print("-------------\v")
output_tests(test_output.parse(parsed_sections["test_output"]), config["LOGLEVEL"]) # noqa