Changes from all commits

27 commits
ea736cc
created test files and modified insightlog according to possible solut…
Hany-Dh Mar 4, 2026
003574e
wrote a helper function using chardet to replace standard read function
AlexCasF Mar 5, 2026
839dd8f
replaced default file read logic with new helper function inside "filter_data()"
AlexCasF Mar 5, 2026
a40b925
same for "apply_filters()"
AlexCasF Mar 5, 2026
5b63029
same for "get_requests()"
AlexCasF Mar 5, 2026
dff60a3
created some non-UTF-8 sample logs for testing
AlexCasF Mar 6, 2026
330e9ed
stronger error handling + only trust chardet if confidence > 70%
AlexCasF Mar 6, 2026
de42016
literally just swapped regex-method as proposed in bug description ->…
AlexCasF Mar 6, 2026
39aa34c
introduced log level filtering + tests -> all tests passed
AlexCasF Mar 6, 2026
4c43466
introduced time range filter + test
AlexCasF Mar 6, 2026
342f83d
added logic and cli flags for csv / json export
AlexCasF Mar 6, 2026
dfc41d2
updated readme for added cli flags / features (filtering and file exp…
AlexCasF Mar 6, 2026
12eb7eb
Changed default values in get_date_filter to None, to handle later
jcbjung Mar 6, 2026
9cc65d0
Added logic for setting new default from None
jcbjung Mar 6, 2026
42892b1
Merge branch 'check_match-use-re.search' into test-group-a
AlexCasF Mar 9, 2026
f90a265
Merge remote-tracking branch 'origin/get-auth-year-flawed-detect' int…
AlexCasF Mar 9, 2026
71c8fc3
Merge branch 'test-group-a' into cli-time-range-loglevel-export
AlexCasF Mar 9, 2026
da4a3c5
Merge pull request #3 from AlexCasF/cli-time-range-loglevel-export
AlexCasF Mar 9, 2026
e1c2342
Merge branch 'test-group-a' into check_file_encoding
AlexCasF Mar 9, 2026
25b77c4
Merge pull request #2 from AlexCasF/check_file_encoding
AlexCasF Mar 9, 2026
852e089
txt file update: Refactor error handling and update IPv4 regex
Hany-Dh Mar 9, 2026
add7fb7
Rename file
Hany-Dh Mar 9, 2026
330374a
adding new txt file with possible solutions with new name
Hany-Dh Mar 9, 2026
ae63fc8
Merge branch 'test-group-a' into hany-complicated-issue
AlexCasF Mar 9, 2026
ea381d6
Add IPv4 regex improvements and refactor analyze_auth_request function
Hany-Dh Mar 9, 2026
f0af2ff
Enhance README with new filtering and export options; add non-UTF-8 s…
Hany-Dh Mar 9, 2026
a5633f2
deleted: insightlog - Kopie.py
Hany-Dh Mar 9, 2026
40 changes: 39 additions & 1 deletion README.md
@@ -7,6 +7,9 @@ InsightLog is a Python script for extracting and analyzing data from server log
- Filter log files by date, IP, or custom patterns
- Extract web requests and authentication attempts from logs
- Analyze logs from Nginx, Apache2, and system Auth logs
- Filter parsed results by inferred log level (`info`, `warning`, `error`)
- Filter parsed results by datetime range (`--time-from`, `--time-to`)
- Export parsed output as plain text, JSON, or CSV

## Installation

@@ -26,6 +29,16 @@ You can run the analyzer from the CLI:
python3 insightlog.py --service nginx --logfile logs-samples/nginx1.sample --filter 192.10.1.1
```

### CLI Options

- `--service`: `nginx`, `apache2`, or `auth` (required)
- `--logfile`: path to the log file (required)
- `--filter`: simple line filter before parsing (optional)
- `--log-level`: `info`, `warning`, or `error` (optional)
- `--time-from`: start datetime, inclusive, format `YYYY-MM-DD HH:MM:SS` (optional)
- `--time-to`: end datetime, inclusive, format `YYYY-MM-DD HH:MM:SS` (optional)
- `--output-format`: `text`, `json`, or `csv` (optional, default: `text`)

More examples:

- Analyze Apache2 logs for a specific IP:
@@ -43,6 +56,31 @@ More examples:
python3 insightlog.py --service nginx --logfile logs-samples/nginx1.sample
```

- Show only warning-level Apache2 requests (typically 4xx):
```bash
python3 insightlog.py --service apache2 --logfile logs-samples/apache1.sample --log-level warning
```

- Limit results to a time range:
```bash
python3 insightlog.py --service nginx --logfile logs-samples/nginx1.sample --time-from "2016-04-24 06:26:00" --time-to "2016-04-24 06:30:00"
```

- Export as JSON:
```bash
python3 insightlog.py --service auth --logfile logs-samples/auth.sample --output-format json
```

- Export as CSV:
```bash
python3 insightlog.py --service nginx --logfile logs-samples/nginx1.sample --output-format csv
```

- Combine filters (string + log level + time range + output format):
```bash
python3 insightlog.py --service nginx --logfile logs-samples/nginx1.sample --filter 192.10.1.1 --log-level info --time-from "2016-04-24 06:26:00" --time-to "2016-04-24 06:27:00" --output-format json
```
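
- Write an export to a file (exports are printed to stdout, so shell redirection should capture them; `requests.csv` is just an example name):
```bash
python3 insightlog.py --service nginx --logfile logs-samples/nginx1.sample --output-format csv > requests.csv
```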

## Known Bugs

See [KNOWN_BUGS.md](KNOWN_BUGS.md) for a list of current bugs and how to replicate them.
@@ -61,4 +99,4 @@ python3 -m unittest discover -s tests -v

## License

This project is licensed under the MIT License. See [LICENSE](LICENSE) for details.
3 changes: 3 additions & 0 deletions check_empty_log.py
@@ -0,0 +1,3 @@
from insightlog import get_requests

print(get_requests("nginx", filepath="empty.log"))
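
# Editor's note (an illustration, not part of this PR): per the get_requests
# change below, a missing or unreadable file now yields [] instead of None,
# so this script should simply print [].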
240 changes: 206 additions & 34 deletions insightlog.py
@@ -1,5 +1,9 @@
import re
import calendar
import chardet
import csv
import io
import json
from datetime import datetime

# Service settings
@@ -59,9 +63,20 @@
'auth': DEFAULT_AUTH
}

IPv4_REGEX = r'(\d+.\d+.\d+.\d+)'
IPv4_REGEX = r'(\d+.\d+.\d+.\d+)' # Simplified regex for IPv4, can be improved to be more strict if needed
# correct regex for IPv4: r'((25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)\.){3}(25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)'

AUTH_USER_INVALID_USER = r'(?i)invalid\suser\s(\w+)\s'
AUTH_PASS_INVALID_USER = r'(?i)failed\spassword\sfor\s(\w+)\s'
LOG_LEVEL_INFO = 'info'
LOG_LEVEL_WARNING = 'warning'
LOG_LEVEL_ERROR = 'error'
LOG_LEVEL_CHOICES = [LOG_LEVEL_INFO, LOG_LEVEL_WARNING, LOG_LEVEL_ERROR]
TIME_RANGE_FORMAT = '%Y-%m-%d %H:%M:%S'
OUTPUT_FORMAT_TEXT = 'text'
OUTPUT_FORMAT_JSON = 'json'
OUTPUT_FORMAT_CSV = 'csv'
OUTPUT_FORMAT_CHOICES = [OUTPUT_FORMAT_TEXT, OUTPUT_FORMAT_JSON, OUTPUT_FORMAT_CSV]


# Validator functions
@@ -89,6 +104,38 @@ def is_valid_minute(minute):
"""Check if minute value is valid"""
return (minute == '*') or (59 >= minute >= 0)

# Helper that detects the file encoding before decoding, instead of assuming UTF-8 by default
def read_text_file(filepath):
"""Read a text file, trying several encodings; return its text, or None on failure."""
try:
with open(filepath, "rb") as f:
raw_data = f.read()
if not raw_data:
return ""

# Use a deterministic decode chain to avoid brittle single-shot detection.
preferred_encodings = ["utf-8-sig", "utf-8"]  # utf-8-sig first so a UTF-8 BOM is stripped rather than kept as text
detected = chardet.detect(raw_data) or {}
detected_encoding = detected.get("encoding")
detected_confidence = detected.get("confidence") or 0.0

candidate_encodings = list(preferred_encodings)
if detected_encoding and detected_confidence >= 0.70:
candidate_encodings.append(detected_encoding)
candidate_encodings.extend(["cp1252", "latin-1"])

seen = set()
for encoding in candidate_encodings:
normalized = encoding.lower()
if normalized in seen:
continue
seen.add(normalized)
try:
return raw_data.decode(encoding)
except UnicodeDecodeError:
continue
return None
except (FileNotFoundError, OSError):
return None
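
# Editor's sketch (an illustration, not part of this PR): read_text_file
# returns the decoded text on success and None when the file is missing or
# cannot be decoded, so callers branch on None instead of catching exceptions:
#
#     text = read_text_file("logs-samples/nginx1.sample")
#     if text is None:
#         print("could not read or decode file")
#     else:
#         print(len(text.splitlines()), "lines read")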

# Utility functions
def get_service_settings(service_name):
@@ -98,10 +145,22 @@ def get_service_settings(service_name):
else:
raise Exception("Service \""+service_name+"\" doesn't exist!")

# Defaults changed to None and resolved at call time: the old datetime.now() defaults in the signature were evaluated only once, when the function was defined (logic in Line 106-115)
def get_date_filter(settings, minute=None, hour=None,
day=None, month=None,
year=None):

# Capture datetime.now() once so every component comes from the same instant
now = datetime.now()
# Each assignment below is a conditional expression: use the current time
# component when no argument was given (None), otherwise keep the caller's value.
minute = now.minute if minute is None else minute
hour = now.hour if hour is None else hour
day = now.day if day is None else day
month = now.month if month is None else month
year = now.year if year is None else year

def get_date_filter(settings, minute=datetime.now().minute, hour=datetime.now().hour,
day=datetime.now().day, month=datetime.now().month,
year=datetime.now().year):
"""Get the date pattern that can be used to filter data from logs based on the params"""
if not is_valid_year(year) or not is_valid_month(month) or not is_valid_day(day) \
or not is_valid_hour(hour) or not is_valid_minute(minute):
@@ -123,8 +182,8 @@ def get_date_filter(settings, minute=datetime.now().minute, hour=datetime.now().
def check_match(line, filter_pattern, is_regex=False, is_casesensitive=True, is_reverse=False):
"""Check if line contains/matches filter pattern"""
if is_regex:
check_result = re.match(filter_pattern, line) if is_casesensitive \
else re.match(filter_pattern, line, re.IGNORECASE)
check_result = re.search(filter_pattern, line) if is_casesensitive \
else re.search(filter_pattern, line, re.IGNORECASE)
else:
check_result = (filter_pattern in line) if is_casesensitive else (filter_pattern.lower() in line.lower())
if is_reverse:
@@ -136,15 +195,13 @@ def filter_data(log_filter, data=None, filepath=None, is_casesensitive=True, is_
"""Filter received data/file content and return the results"""
return_data = ""
if filepath:
try:
with open(filepath, 'r') as file_object:
for line in file_object:
if check_match(line, log_filter, is_regex, is_casesensitive, is_reverse):
return_data += line
return return_data
except (IOError, EnvironmentError) as e:
print(e.strerror)
file_data = read_text_file(filepath)
if file_data is None:
return None
for line in file_data.splitlines(True):
if check_match(line, log_filter, is_regex, is_casesensitive, is_reverse):
return_data += line
return return_data
elif data:
for line in data.splitlines():
if check_match(line, log_filter, is_regex, is_casesensitive, is_reverse):
@@ -209,33 +266,123 @@ def get_auth_requests(data, pattern, date_pattern=None, date_keys=None):


def analyze_auth_request(request_info):
"""Analyze request info and returns main data (IP, invalid user, invalid password's user, is_preauth, is_closed)"""
ipv4 = re.findall(IPv4_REGEX, request_info)
"""Analyze request info and returns main data"""

ipv4_match = re.search(IPv4_REGEX, request_info)
ipv4 = ipv4_match.group(0) if ipv4_match else None

is_preauth = '[preauth]' in request_info.lower()

invalid_user = re.findall(AUTH_USER_INVALID_USER, request_info)
invalid_pass_user = re.findall(AUTH_PASS_INVALID_USER, request_info)

is_closed = 'connection closed by ' in request_info.lower()
return {'IP': ipv4[0] if ipv4 else None,
'INVALID_USER': invalid_user[0] if invalid_user else None,
'INVALID_PASS_USER': invalid_pass_user[0] if invalid_pass_user else None,
'IS_PREAUTH': is_preauth,
'IS_CLOSED': is_closed}

return {
'IP': ipv4,
'INVALID_USER': invalid_user[0] if invalid_user else None,
'INVALID_PASS_USER': invalid_pass_user[0] if invalid_pass_user else None,
'IS_PREAUTH': is_preauth,
'IS_CLOSED': is_closed
}
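
# Editor's sketch (an illustration, not part of this PR): expected output for
# a hypothetical sshd line, given the regexes above:
#
#     line = "Failed password for root from 192.10.1.1 port 22 ssh2 [preauth]"
#     analyze_auth_request(line)
#     # -> {'IP': '192.10.1.1', 'INVALID_USER': None,
#     #     'INVALID_PASS_USER': 'root', 'IS_PREAUTH': True, 'IS_CLOSED': False}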


def get_log_level(service, request):
"""Infer a normalized log level from a parsed request."""
if service in ('nginx', 'apache2'):
try:
code = int(request.get('CODE', 0))
except (TypeError, ValueError):
return LOG_LEVEL_INFO
if 500 <= code:
return LOG_LEVEL_ERROR
if 400 <= code:
return LOG_LEVEL_WARNING
return LOG_LEVEL_INFO

if service == 'auth':
if request.get('INVALID_USER') or request.get('INVALID_PASS_USER') or request.get('IS_CLOSED'):
return LOG_LEVEL_ERROR
if request.get('IS_PREAUTH'):
return LOG_LEVEL_WARNING
return LOG_LEVEL_INFO

return LOG_LEVEL_INFO
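
# Editor's sketch (an illustration, not part of this PR): how the inference
# maps a few sample requests:
#
#     get_log_level('nginx', {'CODE': '404'})      # -> 'warning' (4xx)
#     get_log_level('nginx', {'CODE': '503'})      # -> 'error' (5xx)
#     get_log_level('auth', {'IS_PREAUTH': True})  # -> 'warning'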


def filter_requests_by_level(requests, service, log_level):
"""Filter parsed requests by inferred log level."""
if not log_level:
return requests
return [request for request in requests if get_log_level(service, request) == log_level]


def parse_datetime_value(value):
"""Parse datetime string used by parsed requests and CLI time-range arguments."""
return datetime.strptime(value, TIME_RANGE_FORMAT)


def filter_requests_by_time_range(requests, time_from=None, time_to=None):
"""Filter parsed requests by DATETIME range (inclusive)."""
if not time_from and not time_to:
return requests
if time_from and time_to and time_from > time_to:
raise ValueError("time_from must be less than or equal to time_to")

filtered_requests = []
for request in requests:
request_datetime = request.get('DATETIME')
if not request_datetime:
continue
try:
parsed_datetime = parse_datetime_value(request_datetime)
except ValueError:
continue
if time_from and parsed_datetime < time_from:
continue
if time_to and parsed_datetime > time_to:
continue
filtered_requests.append(request)
return filtered_requests
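
# Editor's sketch (an illustration, not part of this PR): bounds are inclusive,
# and requests without a parseable DATETIME are dropped when a range is given:
#
#     reqs = [{'DATETIME': '2016-04-24 06:26:30'}, {'DATETIME': 'bad'}]
#     start = parse_datetime_value('2016-04-24 06:26:00')
#     end = parse_datetime_value('2016-04-24 06:27:00')
#     filter_requests_by_time_range(reqs, start, end)
#     # -> [{'DATETIME': '2016-04-24 06:26:30'}]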


def format_requests_as_json(requests):
"""Format parsed requests as JSON text."""
return json.dumps(requests, indent=2, ensure_ascii=False)


def format_requests_as_csv(requests):
"""Format parsed requests as CSV text."""
if not requests:
return ''

fieldnames = list(requests[0].keys())
for request in requests[1:]:
for key in request.keys():
if key not in fieldnames:
fieldnames.append(key)

out_buffer = io.StringIO()
writer = csv.DictWriter(out_buffer, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(requests)
return out_buffer.getvalue()
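
# Editor's sketch (an illustration, not part of this PR): fieldnames are the
# union of all row keys, so a key missing from a row becomes an empty cell
# (csv.DictWriter's default restval):
#
#     format_requests_as_csv([{'IP': '1.2.3.4'}, {'IP': '5.6.7.8', 'CODE': '200'}])
#     # -> 'IP,CODE\r\n1.2.3.4,\r\n5.6.7.8,200\r\n'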


# Simplified analyzer functions (replacing the class)
def apply_filters(filters, data=None, filepath=None):
"""Apply all filters to data or file and return filtered results"""
if filepath:
try:
with open(filepath, 'r') as file_object:
filtered_lines = []
for line in file_object:
if check_all_matches(line, filters):
filtered_lines.append(line)
return ''.join(filtered_lines)
except (IOError, EnvironmentError) as e:
print(e.strerror)
file_data = read_text_file(filepath)
if file_data is None:
return None
filtered_lines = []
for line in file_data.splitlines(True):
if check_all_matches(line, filters):
filtered_lines.append(line)
return ''.join(filtered_lines)
elif data:
filtered_lines = []
for line in data.splitlines():
@@ -274,8 +421,9 @@ def get_requests(service, data=None, filepath=None, filters=None):
with open(filepath, 'r') as f:
filtered_data = f.read()
except (IOError, EnvironmentError) as e:
print("DEBUG: File error happened here")
print(e.strerror)
return None
return []  # return an empty list (not None) if the file can't be read, so downstream code can iterate safely
else:
filtered_data = data

@@ -303,14 +451,38 @@ parser.add_argument('--service', required=True, choices=['nginx', 'apache2', 'auth'], help='Type of log to analyze')
parser.add_argument('--service', required=True, choices=['nginx', 'apache2', 'auth'], help='Type of log to analyze')
parser.add_argument('--logfile', required=True, help='Path to the log file')
parser.add_argument('--filter', required=False, default=None, help='String to filter log lines')
parser.add_argument('--log-level', required=False, choices=LOG_LEVEL_CHOICES,
help='Filter parsed requests by inferred log level')
parser.add_argument('--time-from', required=False, default=None,
help='Start datetime (inclusive), format: YYYY-MM-DD HH:MM:SS')
parser.add_argument('--time-to', required=False, default=None,
help='End datetime (inclusive), format: YYYY-MM-DD HH:MM:SS')
parser.add_argument('--output-format', required=False, default=OUTPUT_FORMAT_TEXT, choices=OUTPUT_FORMAT_CHOICES,
help='Output format: text, json, or csv')
args = parser.parse_args()

try:
time_from = parse_datetime_value(args.time_from) if args.time_from else None
time_to = parse_datetime_value(args.time_to) if args.time_to else None
except ValueError:
parser.error("Invalid datetime format for --time-from/--time-to. Use YYYY-MM-DD HH:MM:SS")

if time_from and time_to and time_from > time_to:
parser.error("--time-from must be less than or equal to --time-to")

filters = []
if args.filter:
filters.append({'filter_pattern': args.filter, 'is_casesensitive': True, 'is_regex': False, 'is_reverse': False})

requests = get_requests(args.service, filepath=args.logfile, filters=filters)
if requests:
for req in requests:
print(req)
if requests is not None:
requests = filter_requests_by_level(requests, args.service, args.log_level)
requests = filter_requests_by_time_range(requests, time_from, time_to)
if args.output_format == OUTPUT_FORMAT_JSON:
print(format_requests_as_json(requests))
elif args.output_format == OUTPUT_FORMAT_CSV:
print(format_requests_as_csv(requests), end='')
else:
for req in requests:
print(req)
