-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathresponse_engine.py
More file actions
166 lines (134 loc) · 4.03 KB
/
response_engine.py
File metadata and controls
166 lines (134 loc) · 4.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
#!/usr/bin/env python3
import ipaddress
import os
import smtplib
import ssl
import subprocess
import time
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
# Email account settings. Every value can be overridden with an environment
# variable so that real credentials do not have to live in this file; the
# literals are only fallbacks that keep existing deployments working.
SENDER_EMAIL = os.environ.get("IDS_SENDER_EMAIL", "favour.ids.monitor@gmail.com")
RECEIVER_EMAIL = os.environ.get("IDS_RECEIVER_EMAIL", "favour.ids.alerts@gmail.com")
# SECURITY: this fallback is an app password stored in plain text. Anyone who
# can read the file can send mail as SENDER_EMAIL. Rotate it and supply the
# replacement through IDS_APP_PASSWORD instead of editing this line.
APP_PASSWORD = os.environ.get("IDS_APP_PASSWORD", "jdoqsyjiilywqwjq")

# Suricata fast.log location (override with IDS_LOG_FILE).
LOG_FILE = os.environ.get("IDS_LOG_FILE", "/var/log/suricata/fast.log")

# Alert lines that have already been acted on, used to skip exact duplicates.
responded_alerts = set()
def send_email(subject, body):
    """Send a plain-text alert email through Gmail's SMTP submission port.

    The message goes from SENDER_EMAIL to RECEIVER_EMAIL, authenticating with
    APP_PASSWORD. Delivery is best-effort: any failure is printed and
    swallowed so that a mail outage never stops alert processing.

    Args:
        subject: Subject line of the email.
        body: Plain-text body of the email.
    """
    msg = MIMEMultipart()
    msg['From'] = SENDER_EMAIL
    msg['To'] = RECEIVER_EMAIL
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))

    try:
        # The context manager closes the connection even when starttls/login/
        # send_message raises. The timeout keeps an unresponsive server from
        # blocking the caller indefinitely.
        with smtplib.SMTP('smtp.gmail.com', 587, timeout=30) as server:
            # An explicit default context ensures the server certificate and
            # hostname are verified before the password is sent.
            server.starttls(context=ssl.create_default_context())
            server.login(SENDER_EMAIL, APP_PASSWORD)
            server.send_message(msg)
    except Exception as e:
        # Deliberately broad: notification must never crash the monitor.
        print(f"[!] Email failed: {e}")
    else:
        print(f"[+] Email sent: {subject}")
def block_ip(ip):
    """Drop all inbound traffic from ``ip`` with an iptables INPUT rule.

    The value is validated as an IP address before it is handed to the
    firewall tool, because it originates from log text. IPv6 addresses are
    sent to ``ip6tables``. If an identical DROP rule already exists, no
    duplicate is appended.

    Args:
        ip: Source address to block (IPv4 or IPv6, as text).

    Returns:
        True if a DROP rule for the address is in place (newly added or
        already present), False if the address was invalid or the firewall
        command failed. Failures are printed, never raised.
    """
    try:
        address = ipaddress.ip_address(str(ip).strip())
    except ValueError:
        print(f"[!] Block failed: invalid IP address {ip!r}")
        return False

    tool = 'ip6tables' if address.version == 6 else 'iptables'
    rule = ['INPUT', '-s', str(address), '-j', 'DROP']
    try:
        # "-C" exits 0 only when this exact rule already exists; checking
        # first keeps repeated alerts from piling up identical rules.
        check = subprocess.run(
            [tool, '-C', *rule],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False,
        )
        if check.returncode == 0:
            print(f"[*] IP already blocked: {address}")
            return True

        subprocess.run([tool, '-A', *rule], check=True)
        print(f"[!!!] BLOCKED IP: {address}")
        return True
    except (OSError, subprocess.SubprocessError) as e:
        # OSError covers a missing binary or insufficient privileges.
        print(f"[!] Block failed: {e}")
        return False
def extract_ip(line):
    """Extract the source IP address from a Suricata fast.log alert line.

    Expected layout:
        timestamp [**] [sid] msg [**] [class] [priority] {protocol} src_ip:port -> dst_ip:port

    The source endpoint is the text between the closing brace of the last
    ``{protocol}`` marker and the ``->`` arrow. The port is whatever follows
    the LAST colon, so IPv6 addresses (which contain colons themselves) are
    kept intact.

    Args:
        line: One alert line.

    Returns:
        The source address as a string, or None when the line has no
        ``{protocol}`` marker or the extracted text is not a valid IP address.
    """
    if not isinstance(line, str):
        return None

    marker = line.rfind('{')
    if marker == -1:
        return None

    endpoints = line[marker:].rpartition('}')[2]
    source = endpoints.split('->', 1)[0].strip()

    host, colon, _port = source.rpartition(':')
    candidate = host if colon else source

    try:
        return str(ipaddress.ip_address(candidate.strip()))
    except ValueError:
        # Not an address at all; report "unknown" instead of returning junk.
        return None
# Severity rules, checked top to bottom; the first rule with a keyword found
# in the alert line wins. Matching is a case-sensitive substring test.
#   console - prefix of the console message
#   subject - email subject line
#   heading - first line of the email body
#   action  - text reported after "Action Taken:"
#   closing - optional extra line of advice (None to omit)
#   block   - whether the source IP is blocked before the email is sent
_ALERT_RULES = (
    {
        'keywords': ('FTP', 'MySQL'),
        'console': "[!!!] CRITICAL ALERT",
        'subject': "🚨 CRITICAL ALERT — Favour's IDS",
        'heading': "CRITICAL SECURITY ALERT!",
        'action': "IP has been automatically blocked!",
        'closing': "Review immediately!",
        'block': True,
    },
    {
        'keywords': ('Port Scan', 'HTTP'),
        'console': "[!!] WARNING ALERT",
        'subject': "⚠️ WARNING — Favour's IDS",
        'heading': "WARNING SECURITY ALERT!",
        'action': "Alert logged and reported",
        'closing': "Monitor this activity closely!",
        'block': False,
    },
    {
        'keywords': ('Ping', 'ICMP'),
        'console': "[i] INFO ALERT",
        'subject': "ℹ️ INFO — Ping Detected — Favour's IDS",
        'heading': "INFO ALERT",
        'action': "Logged for monitoring",
        'closing': None,
        'block': False,
    },
)


def _match_rule(line):
    """Return the first severity rule with a keyword present in line, else None."""
    for rule in _ALERT_RULES:
        if any(keyword in line for keyword in rule['keywords']):
            return rule
    return None


def _build_body(rule, timestamp, line, ip, action):
    """Render the plain-text email body for one alert."""
    rows = [
        "",
        rule['heading'],
        f"Time: {timestamp}",
        f"Alert: {line}",
        f"Source IP: {ip}",
        f"Action Taken: {action}",
    ]
    if rule['closing']:
        rows.append(rule['closing'])
    rows.append("Favour's IDS Response Engine")
    rows.append("")
    return "\n".join(rows)


def handle_alert(line):
    """Respond to one alert line according to its severity.

    CRITICAL alerts (FTP / MySQL) block the source IP and send an email;
    WARNING (Port Scan / HTTP) and INFO (Ping / ICMP) alerts only send an
    email. Lines matching no rule are recorded as seen and otherwise ignored.
    A line identical to one already handled is skipped.

    Args:
        line: One stripped alert line from the log.
    """
    # Skip exact duplicates. NOTE: responded_alerts is never pruned here, so
    # it grows for as long as the process runs.
    if line in responded_alerts:
        return
    responded_alerts.add(line)

    rule = _match_rule(line)
    if rule is None:
        return

    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    ip = extract_ip(line)
    print(f"{rule['console']} at {timestamp}")

    action = rule['action']
    if rule['block']:
        if ip:
            block_ip(ip)
        else:
            # Do not claim a block that never happened.
            action = "No source IP could be extracted, so nothing was blocked"

    send_email(
        subject=rule['subject'],
        body=_build_body(rule, timestamp, line, ip, action),
    )
def monitor_log():
    """Follow LOG_FILE and pass every newly appended alert line to handle_alert.

    Reading starts at the current end of the file, so alerts written before
    startup are ignored. The function loops forever, polling once per second
    while no new data is available. The file is opened once and never
    reopened, so a log that is rotated to a new file is not followed.

    Raises:
        FileNotFoundError: If LOG_FILE does not exist when it is opened.
    """
    print("🛡️ Favour's IDS Response Engine Started!")
    print("👀 Watching for alerts...")
    print("Press Ctrl+C to stop\n")

    # errors='replace' keeps a stray undecodable byte in the log from raising
    # UnicodeDecodeError and stopping the monitor.
    with open(LOG_FILE, 'r', errors='replace') as f:
        # Go to end of file
        f.seek(0, 2)
        pending = ''
        while True:
            chunk = f.readline()
            if not chunk:
                time.sleep(1)
                continue

            pending += chunk
            if not pending.endswith('\n'):
                # The writer has not finished this line yet; keep what we
                # have and wait for the rest instead of handling a fragment.
                continue

            line, pending = pending.strip(), ''
            if line:
                print(f"[*] New alert: {line[:60]}...")
                handle_alert(line)
def main():
    """Run the response engine until interrupted, reporting startup problems."""
    try:
        monitor_log()
    except KeyboardInterrupt:
        print("\n[*] Response Engine stopped!")
    except FileNotFoundError:
        print(f"[!] Log file not found: {LOG_FILE}")
        print("[!] Make sure Suricata is running first!")
    except PermissionError:
        print(f"[!] Permission denied reading log file: {LOG_FILE}")
        print("[!] Try running the response engine with elevated privileges")


# Only start monitoring when executed as a script, so importing this module
# (for reuse or testing) does not enter the endless monitoring loop.
if __name__ == "__main__":
    main()