From ea736cc6ba51edb7a01b5ac5a58c18217d59cb12 Mon Sep 17 00:00:00 2001 From: "Hany DH." <106172590+Hany-Dh@users.noreply.github.com> Date: Wed, 4 Mar 2026 23:02:02 +0100 Subject: [PATCH 01/17] created test files and modified insightlog according to possible solutions --- check_empty_log.py | 3 ++ insightlog.py | 6 ++- ipv4 regex case.py | 16 ++++++++ problem 4_4_possible solutions.txt | 62 ++++++++++++++++++++++++++++++ test_insightlog.py | 9 +++++ 5 files changed, 94 insertions(+), 2 deletions(-) create mode 100644 check_empty_log.py create mode 100644 ipv4 regex case.py create mode 100644 problem 4_4_possible solutions.txt create mode 100644 test_insightlog.py diff --git a/check_empty_log.py b/check_empty_log.py new file mode 100644 index 0000000..10b9ee7 --- /dev/null +++ b/check_empty_log.py @@ -0,0 +1,3 @@ +from insightlog import get_requests + +print(get_requests("nginx", filepath="empty.log")) \ No newline at end of file diff --git a/insightlog.py b/insightlog.py index 1232f79..dfd8b3d 100644 --- a/insightlog.py +++ b/insightlog.py @@ -59,11 +59,12 @@ 'auth': DEFAULT_AUTH } -IPv4_REGEX = r'(\d+.\d+.\d+.\d+)' +IPv4_REGEX = r'\b((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)\.){3}(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)\b' AUTH_USER_INVALID_USER = r'(?i)invalid\suser\s(\w+)\s' AUTH_PASS_INVALID_USER = r'(?i)failed\spassword\sfor\s(\w+)\s' + # Validator functions def is_valid_year(year): """Check if year's value is valid""" @@ -274,8 +275,9 @@ def get_requests(service, data=None, filepath=None, filters=None): with open(filepath, 'r') as f: filtered_data = f.read() except (IOError, EnvironmentError) as e: + print("DEBUG: File error happened here") print(e.strerror) - return None + return [] else: filtered_data = data diff --git a/ipv4 regex case.py b/ipv4 regex case.py new file mode 100644 index 0000000..a3fa650 --- /dev/null +++ b/ipv4 regex case.py @@ -0,0 +1,16 @@ +from insightlog import analyze_auth_request + +test = "invalid user test from 123a456b789c012" 
+print(analyze_auth_request(test)) + + +import re +from insightlog import IPv4_REGEX + +text = "123a456b789c012" +print(re.findall(IPv4_REGEX, text)) +print(re.findall(r'\d+.\d+', "123a456")) +print(re.findall(r'\d+\.\d+', "123a456")) +print(re.findall(r'\d+\.\d+', "123.456")) +print(analyze_auth_request("failed login from 999.999.999.999")) + diff --git a/problem 4_4_possible solutions.txt b/problem 4_4_possible solutions.txt new file mode 100644 index 0000000..174eed0 --- /dev/null +++ b/problem 4_4_possible solutions.txt @@ -0,0 +1,62 @@ + + +4. problem code location 276 + + except (IOError, EnvironmentError) as e: + print(e.strerror) + return None + +fix option + + except (IOError, EnvironmentError) as e: + print(e.strerror) + return [] + + +more proffesional + +except (IOError, EnvironmentError) as e: + raise FileNotFoundError(f"Log file not found: {filepath}") from e + + +added new print to 277 + +print("DEBUG: File error happened here") + +=========================== + +5. problem code location 62 / 212 + + IPv4_REGEX = r'(\d+.\d+.\d+.\d+)' + +fix option + + IPv4_REGEX = r'(\d+\.\d+\.\d+\.\d+)' + +best choice full correction + +IPv4_REGEX = r'\b((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)\.){3}(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)\b' + + +Python built-in validation: + +import ipaddress + +def extract_ip(text): + for word in text.split(): + try: + return str(ipaddress.IPv4Address(word)) + except: + continue + return None + + + +git clone --branch test-group-a https://github.com/AlexCasF/InsightLog.git + + + + + + + \ No newline at end of file diff --git a/test_insightlog.py b/test_insightlog.py new file mode 100644 index 0000000..a8b9272 --- /dev/null +++ b/test_insightlog.py @@ -0,0 +1,9 @@ +from insightlog import get_requests + +result = get_requests("nginx", filepath="fake_file.log") +print(result) + + +for r in get_requests("nginx", filepath="fake_file.log"): + print(r) + From 003574e01113eeccaa7c8df3d4c9b51111f3b9a1 Mon Sep 17 00:00:00 2001 From: Alex Fischer Date: 
Thu, 5 Mar 2026 12:48:19 +0100 Subject: [PATCH 02/17] wrote a helper function using chardet to replace standard read function --- insightlog.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/insightlog.py b/insightlog.py index 1232f79..bcc663e 100644 --- a/insightlog.py +++ b/insightlog.py @@ -1,5 +1,6 @@ import re import calendar +import chardet from datetime import datetime # Service settings @@ -89,6 +90,17 @@ def is_valid_minute(minute): """Check if minute value is valid""" return (minute == '*') or (59 >= minute >= 0) +# Helper function to check encoding of file before opening, instead of just open in UTF-8 by default +def read_text_file(filepath): + try: + with open(filepath, "rb") as f: + raw_data = f.read() + result = chardet.detect(raw_data) + encoding = result["encoding"] + with open(filepath, "r", encoding=encoding) as f: + return f.read() + except (FileNotFoundError, UnicodeDecodeError): + return None # Utility functions def get_service_settings(service_name): From 839dd8f99eb0862367af1723e15858c824fe9742 Mon Sep 17 00:00:00 2001 From: Alex Fischer Date: Thu, 5 Mar 2026 12:52:16 +0100 Subject: [PATCH 03/17] replaced default file read logic w new helper fct inside "filter_data()" --- insightlog.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/insightlog.py b/insightlog.py index bcc663e..d9babc3 100644 --- a/insightlog.py +++ b/insightlog.py @@ -148,15 +148,13 @@ def filter_data(log_filter, data=None, filepath=None, is_casesensitive=True, is_ """Filter received data/file content and return the results""" return_data = "" if filepath: - try: - with open(filepath, 'r') as file_object: - for line in file_object: - if check_match(line, log_filter, is_regex, is_casesensitive, is_reverse): - return_data += line - return return_data - except (IOError, EnvironmentError) as e: - print(e.strerror) + file_data = read_text_file(filepath) + if file_data is None: return None + for line in file_data.splitlines(True): + 
if check_match(line, log_filter, is_regex, is_casesensitive, is_reverse): + return_data += line + return return_data elif data: for line in data.splitlines(): if check_match(line, log_filter, is_regex, is_casesensitive, is_reverse): From a40b92505635f3457f344c45ed33a4ed498e10b8 Mon Sep 17 00:00:00 2001 From: Alex Fischer Date: Thu, 5 Mar 2026 12:58:10 +0100 Subject: [PATCH 04/17] same for "apply_filters()" --- insightlog.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/insightlog.py b/insightlog.py index d9babc3..d4c85a2 100644 --- a/insightlog.py +++ b/insightlog.py @@ -236,16 +236,14 @@ def analyze_auth_request(request_info): def apply_filters(filters, data=None, filepath=None): """Apply all filters to data or file and return filtered results""" if filepath: - try: - with open(filepath, 'r') as file_object: - filtered_lines = [] - for line in file_object: - if check_all_matches(line, filters): - filtered_lines.append(line) - return ''.join(filtered_lines) - except (IOError, EnvironmentError) as e: - print(e.strerror) + file_data = read_text_file(filepath) + if file_data is None: return None + filtered_lines = [] + for line in file_data.splitlines(True): + if check_all_matches(line, filters): + filtered_lines.append(line) + return ''.join(filtered_lines) elif data: filtered_lines = [] for line in data.splitlines(): From 5b630290a641c607c9ff2bd9d88e138afdd58d4c Mon Sep 17 00:00:00 2001 From: Alex Fischer Date: Thu, 5 Mar 2026 13:03:44 +0100 Subject: [PATCH 05/17] same for "get_requests()" --- insightlog.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/insightlog.py b/insightlog.py index d4c85a2..b773f52 100644 --- a/insightlog.py +++ b/insightlog.py @@ -278,11 +278,8 @@ def get_requests(service, data=None, filepath=None, filters=None): filtered_data = apply_filters(filters, data=data, filepath=filepath) else: if filepath: - try: - with open(filepath, 'r') as f: - filtered_data = f.read() - except 
(IOError, EnvironmentError) as e: - print(e.strerror) + filtered_data = read_text_file(filepath) + if filtered_data is None: return None else: filtered_data = data From dff60a3ce9ec06bf0ddeb03fb0a9cfba87577d63 Mon Sep 17 00:00:00 2001 From: Alex Fischer Date: Fri, 6 Mar 2026 10:52:13 +0100 Subject: [PATCH 06/17] created some non-UTF-8 sample logs for testing --- logs-samples/non-utf-8/apache_nonutf8_latin1.sample | 2 ++ logs-samples/non-utf-8/auth_nonutf8_cp1252.sample | 2 ++ logs-samples/non-utf-8/nginx_nonutf8_cp1252.sample | 2 ++ logs-samples/non-utf-8/nginx_nonutf8_latin1.sample | 2 ++ 4 files changed, 8 insertions(+) create mode 100644 logs-samples/non-utf-8/apache_nonutf8_latin1.sample create mode 100644 logs-samples/non-utf-8/auth_nonutf8_cp1252.sample create mode 100644 logs-samples/non-utf-8/nginx_nonutf8_cp1252.sample create mode 100644 logs-samples/non-utf-8/nginx_nonutf8_latin1.sample diff --git a/logs-samples/non-utf-8/apache_nonutf8_latin1.sample b/logs-samples/non-utf-8/apache_nonutf8_latin1.sample new file mode 100644 index 0000000..2736564 --- /dev/null +++ b/logs-samples/non-utf-8/apache_nonutf8_latin1.sample @@ -0,0 +1,2 @@ +127.0.1.1 - - [04/May/2016:11:31:39 +0000] "GET / HTTP/1.1" 200 612 "-" "apaché-client" +127.0.1.2 - - [04/May/2016:11:32:39 +0000] "GET /admin HTTP/1.1" 403 300 "http://example.com/entrée" "bot-ç" diff --git a/logs-samples/non-utf-8/auth_nonutf8_cp1252.sample b/logs-samples/non-utf-8/auth_nonutf8_cp1252.sample new file mode 100644 index 0000000..d184f07 --- /dev/null +++ b/logs-samples/non-utf-8/auth_nonutf8_cp1252.sample @@ -0,0 +1,2 @@ +May 4 22:00:32 server sshd[1001]: Failed password for root from 120.25.229.167 port 22 ssh2 – legacy +May 4 22:01:32 server sshd[1002]: Invalid user admin from 120.25.229.168 port 22 “old client” diff --git a/logs-samples/non-utf-8/nginx_nonutf8_cp1252.sample b/logs-samples/non-utf-8/nginx_nonutf8_cp1252.sample new file mode 100644 index 0000000..9d684ce --- /dev/null +++ 
b/logs-samples/non-utf-8/nginx_nonutf8_cp1252.sample @@ -0,0 +1,2 @@ +127.0.0.3 - - [24/Apr/2016:06:28:37 +0000] "GET /docs HTTP/1.1" 200 800 "-" "Mozilla “Legacy”" +127.0.0.4 - - [24/Apr/2016:06:29:37 +0000] "GET /price HTTP/1.1" 200 900 "http://example.com/eur" "cost-€-client" diff --git a/logs-samples/non-utf-8/nginx_nonutf8_latin1.sample b/logs-samples/non-utf-8/nginx_nonutf8_latin1.sample new file mode 100644 index 0000000..926b3c0 --- /dev/null +++ b/logs-samples/non-utf-8/nginx_nonutf8_latin1.sample @@ -0,0 +1,2 @@ +127.0.0.1 - - [24/Apr/2016:06:26:37 +0000] "GET / HTTP/1.1" 200 612 "-" "café-browser" +127.0.0.2 - - [24/Apr/2016:06:27:37 +0000] "GET /search?q=olé HTTP/1.1" 200 700 "http://example.com/réf" "agent-ñ" From 330e9ed4411910f818d3fc1f5ef4cd268203698c Mon Sep 17 00:00:00 2001 From: Alex Fischer Date: Fri, 6 Mar 2026 10:58:48 +0100 Subject: [PATCH 07/17] stronger error handling + only trust chardet if confidence > 70% --- insightlog.py | 31 +++++++++++++++++++++---- tests/test_insightlog.py | 49 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 5 deletions(-) diff --git a/insightlog.py b/insightlog.py index b773f52..1770dfe 100644 --- a/insightlog.py +++ b/insightlog.py @@ -95,11 +95,32 @@ def read_text_file(filepath): try: with open(filepath, "rb") as f: raw_data = f.read() - result = chardet.detect(raw_data) - encoding = result["encoding"] - with open(filepath, "r", encoding=encoding) as f: - return f.read() - except (FileNotFoundError, UnicodeDecodeError): + if not raw_data: + return "" + + # Use a deterministic decode chain to avoid brittle single-shot detection. 
+ preferred_encodings = ["utf-8", "utf-8-sig"] + detected = chardet.detect(raw_data) or {} + detected_encoding = detected.get("encoding") + detected_confidence = detected.get("confidence") or 0.0 + + candidate_encodings = list(preferred_encodings) + if detected_encoding and detected_confidence >= 0.70: + candidate_encodings.append(detected_encoding) + candidate_encodings.extend(["cp1252", "latin-1"]) + + seen = set() + for encoding in candidate_encodings: + normalized = encoding.lower() + if normalized in seen: + continue + seen.add(normalized) + try: + return raw_data.decode(encoding) + except UnicodeDecodeError: + continue + return None + except (FileNotFoundError, OSError): return None # Utility functions diff --git a/tests/test_insightlog.py b/tests/test_insightlog.py index 718da2e..eb3b0ab 100644 --- a/tests/test_insightlog.py +++ b/tests/test_insightlog.py @@ -1,4 +1,5 @@ import os +import tempfile from unittest import TestCase from datetime import datetime from insightlog import * @@ -110,4 +111,52 @@ def test_get_requests(self): requests = get_requests('nginx', filepath=nginx_logfile, filters=nginx_filters) self.assertEqual(len(requests), 2, "get_requests#2") + def test_read_text_file_non_utf8_fallbacks(self): + base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + latin1_file = os.path.join(base_dir, 'logs-samples/non-utf-8/nginx_nonutf8_latin1.sample') + cp1252_file = os.path.join(base_dir, 'logs-samples/non-utf-8/nginx_nonutf8_cp1252.sample') + auth_cp1252_file = os.path.join(base_dir, 'logs-samples/non-utf-8/auth_nonutf8_cp1252.sample') + + latin1_data = read_text_file(latin1_file) + self.assertTrue(latin1_data is not None, "read_text_file_non_utf8#1") + self.assertTrue('café-browser' in latin1_data, "read_text_file_non_utf8#2") + self.assertTrue('agent-ñ' in latin1_data, "read_text_file_non_utf8#3") + + cp1252_data = read_text_file(cp1252_file) + self.assertTrue(cp1252_data is not None, "read_text_file_non_utf8#4") + 
self.assertTrue('Mozilla “Legacyâ€' in cp1252_data, "read_text_file_non_utf8#5") + self.assertTrue('cost-€-client' in cp1252_data, "read_text_file_non_utf8#6") + + auth_cp1252_data = read_text_file(auth_cp1252_file) + self.assertTrue(auth_cp1252_data is not None, "read_text_file_non_utf8#7") + self.assertTrue('– legacy' in auth_cp1252_data, "read_text_file_non_utf8#8") + self.assertTrue('“old clientâ€' in auth_cp1252_data, "read_text_file_non_utf8#9") + + def test_get_requests_with_utf8_sig_and_cp1252(self): + base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + cp1252_file = os.path.join(base_dir, 'logs-samples/non-utf-8/nginx_nonutf8_cp1252.sample') + + cp1252_filters = [ + {'filter_pattern': '127.0.0.3', 'is_casesensitive': True, 'is_regex': False, 'is_reverse': False} + ] + cp1252_requests = get_requests('nginx', filepath=cp1252_file, filters=cp1252_filters) + self.assertEqual(len(cp1252_requests), 1, "get_requests_non_utf8#1") + self.assertEqual(cp1252_requests[0]['USERAGENT'], 'Mozilla “Legacyâ€', "get_requests_non_utf8#2") + + with tempfile.NamedTemporaryFile('wb', suffix='.sample', delete=False) as tmp_file: + tmp_file.write( + '127.0.0.9 - - [24/Apr/2016:06:30:37 +0000] "GET / HTTP/1.1" 200 612 "-" "utf8sig-café"\n' + .encode('utf-8-sig') + ) + utf8sig_path = tmp_file.name + try: + utf8sig_filters = [ + {'filter_pattern': '127.0.0.9', 'is_casesensitive': True, 'is_regex': False, 'is_reverse': False} + ] + utf8sig_requests = get_requests('nginx', filepath=utf8sig_path, filters=utf8sig_filters) + self.assertEqual(len(utf8sig_requests), 1, "get_requests_non_utf8#3") + self.assertEqual(utf8sig_requests[0]['USERAGENT'], 'utf8sig-café', "get_requests_non_utf8#4") + finally: + os.remove(utf8sig_path) + # TODO: Add more tests for edge cases and error handling From de42016e1f6ba56fc9394e4dbb4d2838d2d8bde4 Mon Sep 17 00:00:00 2001 From: Alex Fischer Date: Fri, 6 Mar 2026 11:14:32 +0100 Subject: [PATCH 08/17] literally just swapped regex-method as 
proposed in bug description -> tested, works --- insightlog.py | 4 ++-- tests/test_insightlog.py | 8 ++++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/insightlog.py b/insightlog.py index 1232f79..f47f3b9 100644 --- a/insightlog.py +++ b/insightlog.py @@ -123,8 +123,8 @@ def get_date_filter(settings, minute=datetime.now().minute, hour=datetime.now(). def check_match(line, filter_pattern, is_regex=False, is_casesensitive=True, is_reverse=False): """Check if line contains/matches filter pattern""" if is_regex: - check_result = re.match(filter_pattern, line) if is_casesensitive \ - else re.match(filter_pattern, line, re.IGNORECASE) + check_result = re.search(filter_pattern, line) if is_casesensitive \ + else re.search(filter_pattern, line, re.IGNORECASE) else: check_result = (filter_pattern in line) if is_casesensitive else (filter_pattern.lower() in line.lower()) if is_reverse: diff --git a/tests/test_insightlog.py b/tests/test_insightlog.py index 718da2e..ccac845 100644 --- a/tests/test_insightlog.py +++ b/tests/test_insightlog.py @@ -52,6 +52,14 @@ def test_filter_data(self): data = filter_data('120.25.229.167', filepath=file_name, is_reverse=True) self.assertFalse('120.25.229.167' in data, "filter_data#4") + def test_check_match_regex_search_behavior(self): + self.assertTrue(check_match("abc123def", r"\d+", is_regex=True), "check_match_regex#1") + self.assertFalse(check_match("abc123def", r"^\d+", is_regex=True), "check_match_regex#2") + self.assertTrue(check_match("abcABCdef", r"abc", is_regex=True, is_casesensitive=False), + "check_match_regex#3") + filtered_data = filter_data(r"\d+", data="prefix 42 suffix", is_regex=True) + self.assertEqual(filtered_data, "prefix 42 suffix\n", "check_match_regex#4") + def test_get_web_requests(self): nginx_settings = get_service_settings('nginx') base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) From 39aa34c609eb9ddcf3ac2c8e3e071968cd4e30fb Mon Sep 17 00:00:00 2001 From: Alex Fischer 
Date: Fri, 6 Mar 2026 12:15:03 +0100 Subject: [PATCH 09/17] introduced log level filtering + tests -> all tests passed --- insightlog.py | 37 +++++++++++++++++++++++++++++++++++++ tests/test_insightlog.py | 30 ++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/insightlog.py b/insightlog.py index 1232f79..1e7aafc 100644 --- a/insightlog.py +++ b/insightlog.py @@ -62,6 +62,10 @@ IPv4_REGEX = r'(\d+.\d+.\d+.\d+)' AUTH_USER_INVALID_USER = r'(?i)invalid\suser\s(\w+)\s' AUTH_PASS_INVALID_USER = r'(?i)failed\spassword\sfor\s(\w+)\s' +LOG_LEVEL_INFO = 'info' +LOG_LEVEL_WARNING = 'warning' +LOG_LEVEL_ERROR = 'error' +LOG_LEVEL_CHOICES = [LOG_LEVEL_INFO, LOG_LEVEL_WARNING, LOG_LEVEL_ERROR] # Validator functions @@ -222,6 +226,36 @@ def analyze_auth_request(request_info): 'IS_CLOSED': is_closed} +def get_log_level(service, request): + """Infer a normalized log level from a parsed request.""" + if service in ('nginx', 'apache2'): + try: + code = int(request.get('CODE', 0)) + except (TypeError, ValueError): + return LOG_LEVEL_INFO + if 500 <= code: + return LOG_LEVEL_ERROR + if 400 <= code: + return LOG_LEVEL_WARNING + return LOG_LEVEL_INFO + + if service == 'auth': + if request.get('INVALID_USER') or request.get('INVALID_PASS_USER') or request.get('IS_CLOSED'): + return LOG_LEVEL_ERROR + if request.get('IS_PREAUTH'): + return LOG_LEVEL_WARNING + return LOG_LEVEL_INFO + + return LOG_LEVEL_INFO + + +def filter_requests_by_level(requests, service, log_level): + """Filter parsed requests by inferred log level.""" + if not log_level: + return requests + return [request for request in requests if get_log_level(service, request) == log_level] + + # Simplified analyzer functions (replacing the class) def apply_filters(filters, data=None, filepath=None): """Apply all filters to data or file and return filtered results""" @@ -303,6 +337,8 @@ def get_requests(service, data=None, filepath=None, filters=None): parser.add_argument('--service', required=True, 
choices=['nginx', 'apache2', 'auth'], help='Type of log to analyze') parser.add_argument('--logfile', required=True, help='Path to the log file') parser.add_argument('--filter', required=False, default=None, help='String to filter log lines') + parser.add_argument('--log-level', required=False, choices=LOG_LEVEL_CHOICES, + help='Filter parsed requests by inferred log level') args = parser.parse_args() filters = [] @@ -311,6 +347,7 @@ def get_requests(service, data=None, filepath=None, filters=None): requests = get_requests(args.service, filepath=args.logfile, filters=filters) if requests: + requests = filter_requests_by_level(requests, args.service, args.log_level) for req in requests: print(req) diff --git a/tests/test_insightlog.py b/tests/test_insightlog.py index 718da2e..d31321b 100644 --- a/tests/test_insightlog.py +++ b/tests/test_insightlog.py @@ -52,6 +52,36 @@ def test_filter_data(self): data = filter_data('120.25.229.167', filepath=file_name, is_reverse=True) self.assertFalse('120.25.229.167' in data, "filter_data#4") + def test_log_level_helpers(self): + web_info = {'CODE': '200'} + web_warning = {'CODE': '404'} + web_error = {'CODE': '503'} + web_invalid = {'CODE': 'invalid'} + + self.assertEqual(get_log_level('nginx', web_info), LOG_LEVEL_INFO, "log_level#1") + self.assertEqual(get_log_level('apache2', web_warning), LOG_LEVEL_WARNING, "log_level#2") + self.assertEqual(get_log_level('nginx', web_error), LOG_LEVEL_ERROR, "log_level#3") + self.assertEqual(get_log_level('nginx', web_invalid), LOG_LEVEL_INFO, "log_level#4") + + auth_warning = {'IS_PREAUTH': True} + auth_error = {'INVALID_PASS_USER': 'root'} + auth_info = {'IS_PREAUTH': False, 'INVALID_USER': None, 'INVALID_PASS_USER': None, 'IS_CLOSED': False} + + self.assertEqual(get_log_level('auth', auth_warning), LOG_LEVEL_WARNING, "log_level#5") + self.assertEqual(get_log_level('auth', auth_error), LOG_LEVEL_ERROR, "log_level#6") + self.assertEqual(get_log_level('auth', auth_info), LOG_LEVEL_INFO, 
"log_level#7") + + mixed_requests = [ + {'CODE': '200', 'ID': 1}, + {'CODE': '404', 'ID': 2}, + {'CODE': '503', 'ID': 3}, + ] + warning_only = filter_requests_by_level(mixed_requests, 'nginx', LOG_LEVEL_WARNING) + self.assertEqual(len(warning_only), 1, "log_level#8") + self.assertEqual(warning_only[0]['ID'], 2, "log_level#9") + no_filter = filter_requests_by_level(mixed_requests, 'nginx', None) + self.assertEqual(len(no_filter), 3, "log_level#10") + def test_get_web_requests(self): nginx_settings = get_service_settings('nginx') base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) From 4c43466770b13ba046777857cd59e9662ad3d8ec Mon Sep 17 00:00:00 2001 From: Alex Fischer Date: Fri, 6 Mar 2026 12:21:40 +0100 Subject: [PATCH 10/17] introduced time range filter + test --- insightlog.py | 44 ++++++++++++++++++++++++++++++++++++++++ tests/test_insightlog.py | 24 ++++++++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/insightlog.py b/insightlog.py index 1e7aafc..7faac15 100644 --- a/insightlog.py +++ b/insightlog.py @@ -66,6 +66,7 @@ LOG_LEVEL_WARNING = 'warning' LOG_LEVEL_ERROR = 'error' LOG_LEVEL_CHOICES = [LOG_LEVEL_INFO, LOG_LEVEL_WARNING, LOG_LEVEL_ERROR] +TIME_RANGE_FORMAT = '%Y-%m-%d %H:%M:%S' # Validator functions @@ -256,6 +257,35 @@ def filter_requests_by_level(requests, service, log_level): return [request for request in requests if get_log_level(service, request) == log_level] +def parse_datetime_value(value): + """Parse datetime string used by parsed requests and CLI time-range arguments.""" + return datetime.strptime(value, TIME_RANGE_FORMAT) + + +def filter_requests_by_time_range(requests, time_from=None, time_to=None): + """Filter parsed requests by DATETIME range (inclusive).""" + if not time_from and not time_to: + return requests + if time_from and time_to and time_from > time_to: + raise ValueError("time_from must be less than or equal to time_to") + + filtered_requests = [] + for request in requests: + request_datetime = 
request.get('DATETIME') + if not request_datetime: + continue + try: + parsed_datetime = parse_datetime_value(request_datetime) + except ValueError: + continue + if time_from and parsed_datetime < time_from: + continue + if time_to and parsed_datetime > time_to: + continue + filtered_requests.append(request) + return filtered_requests + + # Simplified analyzer functions (replacing the class) def apply_filters(filters, data=None, filepath=None): """Apply all filters to data or file and return filtered results""" @@ -339,8 +369,21 @@ def get_requests(service, data=None, filepath=None, filters=None): parser.add_argument('--filter', required=False, default=None, help='String to filter log lines') parser.add_argument('--log-level', required=False, choices=LOG_LEVEL_CHOICES, help='Filter parsed requests by inferred log level') + parser.add_argument('--time-from', required=False, default=None, + help='Start datetime (inclusive), format: YYYY-MM-DD HH:MM:SS') + parser.add_argument('--time-to', required=False, default=None, + help='End datetime (inclusive), format: YYYY-MM-DD HH:MM:SS') args = parser.parse_args() + try: + time_from = parse_datetime_value(args.time_from) if args.time_from else None + time_to = parse_datetime_value(args.time_to) if args.time_to else None + except ValueError: + parser.error("Invalid datetime format for --time-from/--time-to. 
Use YYYY-MM-DD HH:MM:SS") + + if time_from and time_to and time_from > time_to: + parser.error("--time-from must be less than or equal to --time-to") + filters = [] if args.filter: filters.append({'filter_pattern': args.filter, 'is_casesensitive': True, 'is_regex': False, 'is_reverse': False}) @@ -348,6 +391,7 @@ def get_requests(service, data=None, filepath=None, filters=None): requests = get_requests(args.service, filepath=args.logfile, filters=filters) if requests: requests = filter_requests_by_level(requests, args.service, args.log_level) + requests = filter_requests_by_time_range(requests, time_from, time_to) for req in requests: print(req) diff --git a/tests/test_insightlog.py b/tests/test_insightlog.py index d31321b..4d8a3c2 100644 --- a/tests/test_insightlog.py +++ b/tests/test_insightlog.py @@ -82,6 +82,30 @@ def test_log_level_helpers(self): no_filter = filter_requests_by_level(mixed_requests, 'nginx', None) self.assertEqual(len(no_filter), 3, "log_level#10") + def test_time_range_helpers(self): + parsed_dt = parse_datetime_value('2016-04-24 06:26:37') + self.assertEqual(parsed_dt.year, 2016, "time_range#1") + self.assertEqual(parsed_dt.minute, 26, "time_range#2") + self.assertRaises(ValueError, parse_datetime_value, '2016-04-24') + + requests = [ + {'DATETIME': '2016-04-24 06:26:37', 'ID': 1}, + {'DATETIME': '2016-04-24 06:27:37', 'ID': 2}, + {'DATETIME': '2016-04-24 06:28:37', 'ID': 3}, + {'DATETIME': 'bad-date', 'ID': 4}, + ] + time_from = parse_datetime_value('2016-04-24 06:27:37') + time_to = parse_datetime_value('2016-04-24 06:28:37') + filtered = filter_requests_by_time_range(requests, time_from, time_to) + self.assertEqual(len(filtered), 2, "time_range#3") + self.assertEqual(filtered[0]['ID'], 2, "time_range#4") + self.assertEqual(filtered[1]['ID'], 3, "time_range#5") + + only_from = filter_requests_by_time_range(requests, time_from=time_from) + self.assertEqual(len(only_from), 2, "time_range#6") + self.assertRaises(ValueError, 
filter_requests_by_time_range, + requests, time_to, time_from) + def test_get_web_requests(self): nginx_settings = get_service_settings('nginx') base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) From 342f83d9dc85ad74d6adf511526a84754cdf617c Mon Sep 17 00:00:00 2001 From: Alex Fischer Date: Fri, 6 Mar 2026 12:31:18 +0100 Subject: [PATCH 11/17] added logic and cli flags for csv / json export --- insightlog.py | 47 +++++++++++++++++++++++++++++++++++----- tests/test_insightlog.py | 26 ++++++++++++++++++++++ 2 files changed, 68 insertions(+), 5 deletions(-) diff --git a/insightlog.py b/insightlog.py index 7faac15..eba5035 100644 --- a/insightlog.py +++ b/insightlog.py @@ -1,5 +1,8 @@ import re import calendar +import csv +import io +import json from datetime import datetime # Service settings @@ -67,6 +70,10 @@ LOG_LEVEL_ERROR = 'error' LOG_LEVEL_CHOICES = [LOG_LEVEL_INFO, LOG_LEVEL_WARNING, LOG_LEVEL_ERROR] TIME_RANGE_FORMAT = '%Y-%m-%d %H:%M:%S' +OUTPUT_FORMAT_TEXT = 'text' +OUTPUT_FORMAT_JSON = 'json' +OUTPUT_FORMAT_CSV = 'csv' +OUTPUT_FORMAT_CHOICES = [OUTPUT_FORMAT_TEXT, OUTPUT_FORMAT_JSON, OUTPUT_FORMAT_CSV] # Validator functions @@ -128,8 +135,8 @@ def get_date_filter(settings, minute=datetime.now().minute, hour=datetime.now(). 
def check_match(line, filter_pattern, is_regex=False, is_casesensitive=True, is_reverse=False): """Check if line contains/matches filter pattern""" if is_regex: - check_result = re.match(filter_pattern, line) if is_casesensitive \ - else re.match(filter_pattern, line, re.IGNORECASE) + check_result = re.search(filter_pattern, line) if is_casesensitive \ + else re.search(filter_pattern, line, re.IGNORECASE) else: check_result = (filter_pattern in line) if is_casesensitive else (filter_pattern.lower() in line.lower()) if is_reverse: @@ -286,6 +293,29 @@ def filter_requests_by_time_range(requests, time_from=None, time_to=None): return filtered_requests +def format_requests_as_json(requests): + """Format parsed requests as JSON text.""" + return json.dumps(requests, indent=2, ensure_ascii=False) + + +def format_requests_as_csv(requests): + """Format parsed requests as CSV text.""" + if not requests: + return '' + + fieldnames = list(requests[0].keys()) + for request in requests[1:]: + for key in request.keys(): + if key not in fieldnames: + fieldnames.append(key) + + out_buffer = io.StringIO() + writer = csv.DictWriter(out_buffer, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(requests) + return out_buffer.getvalue() + + # Simplified analyzer functions (replacing the class) def apply_filters(filters, data=None, filepath=None): """Apply all filters to data or file and return filtered results""" @@ -373,6 +403,8 @@ def get_requests(service, data=None, filepath=None, filters=None): help='Start datetime (inclusive), format: YYYY-MM-DD HH:MM:SS') parser.add_argument('--time-to', required=False, default=None, help='End datetime (inclusive), format: YYYY-MM-DD HH:MM:SS') + parser.add_argument('--output-format', required=False, default=OUTPUT_FORMAT_TEXT, choices=OUTPUT_FORMAT_CHOICES, + help='Output format: text, json, or csv') args = parser.parse_args() try: @@ -389,9 +421,14 @@ def get_requests(service, data=None, filepath=None, filters=None): 
filters.append({'filter_pattern': args.filter, 'is_casesensitive': True, 'is_regex': False, 'is_reverse': False}) requests = get_requests(args.service, filepath=args.logfile, filters=filters) - if requests: + if requests is not None: requests = filter_requests_by_level(requests, args.service, args.log_level) requests = filter_requests_by_time_range(requests, time_from, time_to) - for req in requests: - print(req) + if args.output_format == OUTPUT_FORMAT_JSON: + print(format_requests_as_json(requests)) + elif args.output_format == OUTPUT_FORMAT_CSV: + print(format_requests_as_csv(requests), end='') + else: + for req in requests: + print(req) diff --git a/tests/test_insightlog.py b/tests/test_insightlog.py index 4d8a3c2..961ea5d 100644 --- a/tests/test_insightlog.py +++ b/tests/test_insightlog.py @@ -1,4 +1,5 @@ import os +import json from unittest import TestCase from datetime import datetime from insightlog import * @@ -52,6 +53,13 @@ def test_filter_data(self): data = filter_data('120.25.229.167', filepath=file_name, is_reverse=True) self.assertFalse('120.25.229.167' in data, "filter_data#4") + def test_check_match_regex_search_behavior(self): + self.assertTrue(check_match("abc123def", r"\d+", is_regex=True), "check_match#1") + self.assertFalse(check_match("abc123def", r"^\d+", is_regex=True), "check_match#2") + self.assertTrue(check_match("abcABCdef", r"abc", is_regex=True, is_casesensitive=False), "check_match#3") + filtered_data = filter_data(r"\d+", data="prefix 42 suffix", is_regex=True) + self.assertEqual(filtered_data, "prefix 42 suffix\n", "check_match#4") + def test_log_level_helpers(self): web_info = {'CODE': '200'} web_warning = {'CODE': '404'} @@ -106,6 +114,24 @@ def test_time_range_helpers(self): self.assertRaises(ValueError, filter_requests_by_time_range, requests, time_to, time_from) + def test_output_format_helpers(self): + requests = [ + {'DATETIME': '2016-04-24 06:26:37', 'IP': '127.0.0.1', 'CODE': '200'}, + {'DATETIME': '2016-04-24 06:27:37', 
'IP': '127.0.0.2', 'CODE': '404'}, + ] + json_output = format_requests_as_json(requests) + parsed_json = json.loads(json_output) + self.assertEqual(len(parsed_json), 2, "output_format#1") + self.assertEqual(parsed_json[1]['CODE'], '404', "output_format#2") + + csv_output = format_requests_as_csv(requests) + csv_lines = [line for line in csv_output.splitlines() if line] + self.assertEqual(len(csv_lines), 3, "output_format#3") + self.assertTrue('DATETIME' in csv_lines[0], "output_format#4") + self.assertTrue('127.0.0.2' in csv_lines[2], "output_format#5") + self.assertEqual(format_requests_as_csv([]), '', "output_format#6") + self.assertEqual(json.loads(format_requests_as_json([])), [], "output_format#7") + def test_get_web_requests(self): nginx_settings = get_service_settings('nginx') base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) From dfc41d222f7640e9b1721631594ca42462cdbf11 Mon Sep 17 00:00:00 2001 From: Alex Fischer Date: Fri, 6 Mar 2026 12:34:13 +0100 Subject: [PATCH 12/17] updated readme for added cli flags / features (filtering and file export) --- README.md | 40 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index fdb4a32..aebfa88 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,9 @@ InsightLog is a Python script for extracting and analyzing data from server log - Filter log files by date, IP, or custom patterns - Extract web requests and authentication attempts from logs - Analyze logs from Nginx, Apache2, and system Auth logs +- Filter parsed results by inferred log level (`info`, `warning`, `error`) +- Filter parsed results by datetime range (`--time-from`, `--time-to`) +- Export parsed output as plain text, JSON, or CSV ## Installation @@ -26,6 +29,16 @@ You can run the analyzer from the CLI: python3 insightlog.py --service nginx --logfile logs-samples/nginx1.sample --filter 192.10.1.1 ``` +### CLI Options + +- `--service`: `nginx`, `apache2`, or `auth` 
(required) +- `--logfile`: path to the log file (required) +- `--filter`: simple line filter before parsing (optional) +- `--log-level`: `info`, `warning`, or `error` (optional) +- `--time-from`: start datetime, inclusive, format `YYYY-MM-DD HH:MM:SS` (optional) +- `--time-to`: end datetime, inclusive, format `YYYY-MM-DD HH:MM:SS` (optional) +- `--output-format`: `text`, `json`, or `csv` (optional, default: `text`) + More examples: - Analyze Apache2 logs for a specific IP: @@ -43,6 +56,31 @@ More examples: python3 insightlog.py --service nginx --logfile logs-samples/nginx1.sample ``` +- Show only warning-level Apache2 requests (typically 4xx): + ```bash + python3 insightlog.py --service apache2 --logfile logs-samples/apache1.sample --log-level warning + ``` + +- Limit results to a time range: + ```bash + python3 insightlog.py --service nginx --logfile logs-samples/nginx1.sample --time-from "2016-04-24 06:26:00" --time-to "2016-04-24 06:30:00" + ``` + +- Export as JSON: + ```bash + python3 insightlog.py --service auth --logfile logs-samples/auth.sample --output-format json + ``` + +- Export as CSV: + ```bash + python3 insightlog.py --service nginx --logfile logs-samples/nginx1.sample --output-format csv + ``` + +- Combine filters (string + log level + time range + output format): + ```bash + python3 insightlog.py --service nginx --logfile logs-samples/nginx1.sample --filter 192.10.1.1 --log-level info --time-from "2016-04-24 06:26:00" --time-to "2016-04-24 06:27:00" --output-format json + ``` + ## Known Bugs See [KNOWN_BUGS.md](KNOWN_BUGS.md) for a list of current bugs and how to replicate them. @@ -61,4 +99,4 @@ python3 -m unittest discover -s tests -v ## License -This project is licensed under the MIT License. See [LICENSE](LICENSE) for details. \ No newline at end of file +This project is licensed under the MIT License. See [LICENSE](LICENSE) for details. From 12eb7ebadb2d2abd25e09e85bc28c06b64fe6c02 Mon Sep 17 00:00:00 2001 From: "Jacob J." 
Date: Fri, 6 Mar 2026 16:41:00 +0100 Subject: [PATCH 13/17] Changed default values in get_date_filter to None, to handle later --- insightlog.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/insightlog.py b/insightlog.py index 1232f79..ecb1ab5 100644 --- a/insightlog.py +++ b/insightlog.py @@ -98,10 +98,10 @@ def get_service_settings(service_name): else: raise Exception("Service \""+service_name+"\" doesn't exists!") - -def get_date_filter(settings, minute=datetime.now().minute, hour=datetime.now().hour, - day=datetime.now().day, month=datetime.now().month, - year=datetime.now().year): +# Changed the default to None and set default value later to not trigger a type mismatch (logic in Line 110-114) +def get_date_filter(settings, minute=None, hour=None, + day=None, month=None, + year=None): """Get the date pattern that can be used to filter data from logs based on the params""" if not is_valid_year(year) or not is_valid_month(month) or not is_valid_day(day) \ or not is_valid_hour(hour) or not is_valid_minute(minute): From 9cc65d0c2bf86737791cf086cc642f07d2ec2343 Mon Sep 17 00:00:00 2001 From: "Jacob J." 
Date: Fri, 6 Mar 2026 16:45:51 +0100 Subject: [PATCH 14/17] Added logic for setting new default from None --- insightlog.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/insightlog.py b/insightlog.py index ecb1ab5..3a9281b 100644 --- a/insightlog.py +++ b/insightlog.py @@ -98,10 +98,22 @@ def get_service_settings(service_name): else: raise Exception("Service \""+service_name+"\" doesn't exists!") -# Changed the default to None and set default value later to not trigger a type mismatch (logic in Line 110-114) +# Changed the default to None and set default value later to not trigger a type mismatch (logic in Line 106-115) def get_date_filter(settings, minute=None, hour=None, day=None, month=None, year=None): + +# setting the variable now to datetime.now for easier usage later + now = datetime.now() +# All of this are if statements. For example: +# Set minute to now.minute if no argument is given (None). +# Else, set the value to the original arguments given to the function + minute = now.minute if minute is None else minute + hour = now.hour if hour is None else hour + day = now.day if day is None else day + month = now.month if month is None else month + year = now.year if year is None else year + """Get the date pattern that can be used to filter data from logs based on the params""" if not is_valid_year(year) or not is_valid_month(month) or not is_valid_day(day) \ or not is_valid_hour(hour) or not is_valid_minute(minute): From f4f82f2a3ba937d394f66b915965db1d752c243c Mon Sep 17 00:00:00 2001 From: Dein Name Date: Sun, 8 Mar 2026 15:41:09 +0100 Subject: [PATCH 15/17] Fix bug #1: filter_data raises exception instead of returning None --- insightlog.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/insightlog.py b/insightlog.py index 1232f79..93af760 100644 --- a/insightlog.py +++ b/insightlog.py @@ -143,8 +143,7 @@ def filter_data(log_filter, data=None, filepath=None, is_casesensitive=True, is_ return_data += 
line return return_data except (IOError, EnvironmentError) as e: - print(e.strerror) - return None + raise Exception(e.strerror) elif data: for line in data.splitlines(): if check_match(line, log_filter, is_regex, is_casesensitive, is_reverse): From 852e089535314d9a02e1bd1d56691bc7f0953c78 Mon Sep 17 00:00:00 2001 From: "Hany DH." <106172590+Hany-Dh@users.noreply.github.com> Date: Mon, 9 Mar 2026 11:45:55 +0100 Subject: [PATCH 16/17] txt file update __Refactor error handling and update IPv4 regex Updating the text of the file problem 4_4 possible solution .. needs to be merged --- problem 4_4_possible solutions.txt | 53 +++++++++++++++++++++++------- 1 file changed, 41 insertions(+), 12 deletions(-) diff --git a/problem 4_4_possible solutions.txt b/problem 4_4_possible solutions.txt index 174eed0..19a176d 100644 --- a/problem 4_4_possible solutions.txt +++ b/problem 4_4_possible solutions.txt @@ -1,6 +1,6 @@ -4. problem code location 276 +4. problem code location 279 except (IOError, EnvironmentError) as e: print(e.strerror) @@ -13,6 +13,7 @@ fix option return [] +>------------------------------------< more proffesional except (IOError, EnvironmentError) as e: @@ -22,6 +23,8 @@ except (IOError, EnvironmentError) as e: added new print to 277 print("DEBUG: File error happened here") +<--------------------------------------------> + =========================== @@ -37,26 +40,52 @@ best choice full correction IPv4_REGEX = r'\b((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)\.){3}(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)\b' +>--------------------------------< + +IPv4_REGEX = r'(\d+.\d+.\d+.\d+)' + +# Simplified regex for IPv4, can be improved to be more strict if needed + +# correct regex for + +IPv4: r'((25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)\.){3}(25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)' + +this is a standard format for 0-255.0-255.0-255.0-255 formating reading correctly like in real IPv4 addresses + +################################## + + +---> Correcting this part +---> ipv4 = re.findall(IPv4_REGEX, 
request_info) +But your regex contains capturing groups, so findall() returns tuples instead of the full IP string +==== + +def analyze_auth_request(request_info): + """Analyze request info and returns main data""" + + ipv4_match = re.search(IPv4_REGEX, request_info) + ipv4 = ipv4_match.group(0) if ipv4_match else None -Python built-in validation: + is_preauth = '[preauth]' in request_info.lower() -import ipaddress + invalid_user = re.findall(AUTH_USER_INVALID_USER, request_info) + invalid_pass_user = re.findall(AUTH_PASS_INVALID_USER, request_info) -def extract_ip(text): - for word in text.split(): - try: - return str(ipaddress.IPv4Address(word)) - except: - continue - return None + is_closed = 'connection closed by ' in request_info.lower() + return { + 'IP': ipv4, + 'INVALID_USER': invalid_user[0] if invalid_user else None, + 'INVALID_PASS_USER': invalid_pass_user[0] if invalid_pass_user else None, + 'IS_PREAUTH': is_preauth, + 'IS_CLOSED': is_closed + } -git clone --branch test-group-a https://github.com/AlexCasF/InsightLog.git - \ No newline at end of file + From 330374a06415d97c1e84bd8e35f08bdb1f09f6ae Mon Sep 17 00:00:00 2001 From: "Hany DH." <106172590+Hany-Dh@users.noreply.github.com> Date: Mon, 9 Mar 2026 11:56:19 +0100 Subject: [PATCH 17/17] adding new txt file with possible solutions with new name adding new txt file with possible solutions with new name the new name is problems_4_5_possible_solutions.txt --- problems_4_5_possible_solutions.txt | 85 +++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 problems_4_5_possible_solutions.txt diff --git a/problems_4_5_possible_solutions.txt b/problems_4_5_possible_solutions.txt new file mode 100644 index 0000000..e58db08 --- /dev/null +++ b/problems_4_5_possible_solutions.txt @@ -0,0 +1,85 @@ + + +4. 
problem code location 279 + + except (IOError, EnvironmentError) as e: + print(e.strerror) + return None + +fix option + + except (IOError, EnvironmentError) as e: + print(e.strerror) + return [] + + +>------------------------------------< +more proffesional + +except (IOError, EnvironmentError) as e: + raise FileNotFoundError(f"Log file not found: {filepath}") from e + + +added new print to 277 + +print("DEBUG: File error happened here") +<--------------------------------------------> + + +=========================== + +5. problem code location 62 / 212 + + IPv4_REGEX = r'(\d+.\d+.\d+.\d+)' + +fix option + + IPv4_REGEX = r'(\d+\.\d+\.\d+\.\d+)' + +best choice full correction + +IPv4_REGEX = r'\b((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)\.){3}(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)\b' + +>--------------------------------< + +IPv4_REGEX = r'(\d+.\d+.\d+.\d+)' + +# Simplified regex for IPv4, can be improved to be more strict if needed + +# correct regex for + +IPv4: r'((25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)\.){3}(25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)' + +this is a standard format for 0-255.0-255.0-255.0-255 formating reading correctly like in real IPv4 addresses + +################################## + + +---> Correcting this part +---> ipv4 = re.findall(IPv4_REGEX, request_info) +But your regex contains capturing groups, so findall() returns tuples instead of the full IP string +==== + +def analyze_auth_request(request_info): + """Analyze request info and returns main data""" + + ipv4_match = re.search(IPv4_REGEX, request_info) + ipv4 = ipv4_match.group(0) if ipv4_match else None + + is_preauth = '[preauth]' in request_info.lower() + + invalid_user = re.findall(AUTH_USER_INVALID_USER, request_info) + invalid_pass_user = re.findall(AUTH_PASS_INVALID_USER, request_info) + + is_closed = 'connection closed by ' in request_info.lower() + + return { + 'IP': ipv4, + 'INVALID_USER': invalid_user[0] if invalid_user else None, + 'INVALID_PASS_USER': invalid_pass_user[0] if invalid_pass_user else 
None, + 'IS_PREAUTH': is_preauth, + 'IS_CLOSED': is_closed + } + + +