-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpentestpycap.py
More file actions
358 lines (329 loc) · 16.3 KB
/
pentestpycap.py
File metadata and controls
358 lines (329 loc) · 16.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
import scapy.all as scapy
from scapy.layers import dns, http, smb
from scapy.layers.inet import IP, TCP, UDP
from scapy.all import sniff, conf, get_if_addr, TCP, IP, Raw
import socket
import urllib.parse
import base64
import threading
import sys
import time
import random
def sniff(interface):
"""
Esta función inicia el proceso de escucha en la interfaz de red especificada.
Args:
interface (str): El nombre de la interfaz de red en la que escuchar.
"""
scapy.sniff(iface=interface, store=False, prn=sniffed_packet)
def get_ip(domain):
"""
Esta función obtiene la dirección IP de un dominio.
Args:
domain (str): El dominio del que obtener la dirección IP.
Returns:
str: La dirección IP del dominio, o None si el dominio no se puede resolver.
"""
try:
return socket.gethostbyname(domain)
except socket.gaierror:
return None
def parse_http_payload(payload):
"""
Esta función analiza la carga útil de un paquete HTTP para extraer el nombre de usuario y la contraseña.
Args:
payload (str): La carga útil del paquete HTTP.
Returns:
tuple: El nombre de usuario y la contraseña extraídos de la carga útil, o None si no se pueden extraer.
"""
payload = urllib.parse.unquote(payload)
username = None
password = None
if 'usernameLogin' in payload and 'passLogin' in payload:
username = payload.split('usernameLogin=')[1].split('&')[0]
password = payload.split('passLogin=')[1].split('&')[0]
return username, password
# Este Payload captura el nombre de usuario y la contraseña en FTP
def parse_ftp_payload(payload):
username = None
password = None
if 'USER' in payload:
username = payload.split('USER ')[1].split('\r\n')[0]
if 'PASS' in payload:
password = payload.split('PASS ')[1].split('\r\n')[0]
return username, password
# Este Payload captura el nombre de usuario y la contraseña en SMTP, ademas DATA es el inicio de los datos
def parse_smtp_payload(payload):
username = None
password = None
data = None
in_data = False
lines = payload.split('\r\n')
for line in lines:
if in_data:
if line == '.':
# End of data
in_data = False
else:
# Append line to data
if data is None:
data = line
else:
data += '\n' + line
elif line.startswith('AUTH LOGIN'):
# The username and password are base64 encoded
username = base64.b64decode(line.split(' ')[2]).decode('utf-8')
elif line.startswith('AUTH PLAIN'):
# The username and password are base64 encoded and separated by a null byte
auth_str = base64.b64decode(line.split(' ')[2]).decode('utf-8')
username, password = auth_str.split('\x00')[1:]
elif line.startswith('MAIL FROM:'):
username = line.split(':')[1]
elif line.startswith('RCPT TO:'):
password = line.split(':')[1]
elif line == 'DATA':
# Start of data
in_data = True
return username, password, data
def get_icmp_error_description(type, code):
"""
Esta función devuelve la descripción del error ICMP correspondiente al tipo y código proporcionados.
Args:
type (int): El tipo de error ICMP.
code (int): El código de error ICMP.
Returns:
str: La descripción del error ICMP, o "Descripción no disponible" si el tipo y el código no se encuentran en el diccionario.
Ejemplo:
>>> get_icmp_error_description(3, 0)
'Red inalcanzable'
>>> get_icmp_error_description(4, 0)
'La fuente de la red está congestionada'
>>> get_icmp_error_description(100, 100)
'Descripción no disponible'
"""
icmp_error_descriptions = {
3: {
0: "Red inalcanzable",
1: "Host inalcanzable",
2: "Protocolo inalcanzable",
3: "Puerto inalcanzable",
4: "Fragmentación necesaria pero no permitida",
5: "Ruta de origen fallida",
6: "Red desconocida",
7: "Host desconocido",
8: "Host de origen aislado",
9: "Comunicación con red de destino administrativamente prohibida",
10: "Comunicación con host de destino administrativamente prohibida",
11: "Red inalcanzable para el servicio requerido",
12: "Host inalcanzable para el servicio requerido",
},
4: {
0: "La fuente de la red está congestionada",
},
5: {
0: "Redireccionar datagramas para la red",
1: "Redireccionar datagramas para el host",
2: "Redireccionar datagramas para el servicio de red y host",
3: "Redireccionar datagramas para el servicio de host",
},
11: {
0: "Tiempo excedido en tránsito",
1: "Tiempo excedido en reensamblado",
},
}
return icmp_error_descriptions.get(type, {}).get(code, "Descripción no disponible")
def sniffed_packet(packet):
"""
Esta función procesa un paquete capturado, analiza su contenido y escribe la información relevante en un archivo.
Args:
packet (scapy.Packet): El paquete capturado a procesar.
Nota:
Esta función escribe la información del paquete en un archivo llamado 'informe.txt'.
Si el archivo no existe, se creará automáticamente.
"""
try:
with open('informe.txt', 'a') as f:
if packet.haslayer(scapy.ICMP):
# Procesa paquetes ICMP
if packet[scapy.ICMP].type in [3, 4, 5, 11]:
ip_destino = packet[scapy.IP].dst
if ip_destino not in seen_icmp_errors:
seen_icmp_errors.add(ip_destino)
# Guarda información del paquete ICMP en el archivo .txt
f.write("ERROR: Paquete ICMP con error detectado\n")
f.write("Tipo: " + str(packet[scapy.ICMP].type) + "\n")
f.write("Código: " + str(packet[scapy.ICMP].code) + "\n")
f.write("Descripción: " + get_icmp_error_description(packet[scapy.ICMP].type, packet[scapy.ICMP].code) + "\n")
f.write("IP origen: " + packet[scapy.IP].src + "\n")
f.write("IP destino: " + ip_destino + "\n\n")
if packet.haslayer(dns.DNSQR):
# Procesa consultas DNS
domain = packet[dns.DNSQR].qname.decode('utf-8').rstrip('.')
if domain not in seen_domains:
seen_domains.add(domain)
ip = get_ip(domain)
if ip is not None:
# Guarda información del paquete DNS en el archivo .txt
f.write("Paquete DNS detectado\n")
f.write("Protocolo: " + str(packet[scapy.IP].proto) + "\n") # Log protocol
f.write("Puerto destino: " + str(packet[scapy.UDP].dport) + "\n") # Log destination port
f.write("IP origen: " + packet[scapy.IP].src + "\n")
f.write("IP destino (servidor web o página web): " + ip + "\n")
f.write("Nombre de dominio consultado: " + domain + "\n\n")
if packet.haslayer(http.HTTPRequest):
"""
Este bloque de código procesa los paquetes que contienen solicitudes HTTP.
Extrae información relevante como el host, la ruta, la IP de destino y la URL completa.
Si el host no ha sido visto antes, se agrega a la lista de dominios vistos y se escribe una alerta en el archivo de informe.
Si el paquete contiene datos en bruto, se intenta extraer la información de autenticación y se escribe una alerta en el archivo de informe si se encuentra.
"""
# Extrae el host, la ruta, la IP de destino y la URL completa de la solicitud HTTP
http_host = packet[http.HTTPRequest].Host.decode('utf-8')
http_path = packet[http.HTTPRequest].Path.decode('utf-8')
http_ip = packet[scapy.IP].dst
http_url = "http://" + http_host + http_path
# Si el host no ha sido visto antes, agrega una alerta al archivo de informe
if http_host not in seen_domains:
seen_domains.add(http_host)
info = "(Alerta!)Paquete HTTP detectado (No es seguro)\n" + \
"Protocolo: " + str(packet[scapy.IP].proto) + "\n" + \
"Puerto destino: " + str(packet[scapy.TCP].dport) + "\n" + \
"Host: " + http_host + "\n" + \
"URL: " + http_url + "\n" + \
"IP destino (servidor web o página web): " + http_ip + "\n\n"
print("\n" + info) # Imprime la alerta en la consola
f.write(info) # Escribe la alerta en el archivo de informe
# Si el paquete contiene datos en bruto, intenta extraer la información de autenticación
if packet.haslayer(scapy.Raw):
username, password = parse_http_payload(packet[scapy.Raw].load.decode('utf-8'))
if username and password:
info = "(Alerta!)Autenticación capturada mediante HTTP\n" + \
"URL: " + http_url + "\n" + \
"Nombre de usuario: " + username + "\n" + \
"Contraseña: " + password + "\n\n"
print("\n" + info) # Imprime la alerta en la consola
f.write(info) # Escribe la alerta en el archivo de informe
if packet.haslayer(TCP) and packet[TCP].dport == 21 and packet.haslayer(Raw):
# Procesa paquetes FTP
ftp_ip = packet[IP].dst
ftp_port = packet[TCP].dport
try:
ftp_payload = packet[Raw].load.decode('latin-1')
except UnicodeDecodeError:
ftp_payload = packet[Raw].load.decode('latin-1', 'ignore')
if ftp_ip not in seen_domains:
seen_domains.add(ftp_ip)
info = "(Alerta!)Paquete FTP detectado (No es seguro)\n" + \
"Protocolo: " + str(packet[IP].proto) + "\n" + \
"Puerto destino: " + str(ftp_port) + "\n" + \
"IP destino (servidor FTP): " + ftp_ip + "\n" + \
"Carga útil FTP: " + ftp_payload + "\n\n"
print("\n" + info)
f.write(info)
username, password = parse_ftp_payload(ftp_payload)
if username and password:
info = "(Alerta!)Autenticación capturada mediante FTP\n" + \
"IP destino: " + ftp_ip + "\n" + \
"Nombre de usuario: " + username + "\n" + \
"Contraseña: " + password + "\n\n"
print("\n" + info)
f.write(info)
if packet.haslayer(TCP) and packet[TCP].dport == 25 and packet.haslayer(Raw):
# Procesa paquetes SMTP
smtp_ip = packet[IP].dst
smtp_port = packet[TCP].dport
try:
smtp_payload = packet[Raw].load.decode('latin-1')
except UnicodeDecodeError:
smtp_payload = packet[Raw].load.decode('latin-1', 'ignore')
if smtp_ip not in seen_domains:
seen_domains.add(smtp_ip)
info = "(Alerta!)Paquete SMTP detectado (No es seguro)\n" + \
"Protocolo: " + str(packet[IP].proto) + "\n" + \
"Puerto destino: " + str(smtp_port) + "\n" + \
"IP destino (servidor SMTP): " + smtp_ip + "\n" + \
"Carga útil SMTP: " + smtp_payload + "\n\n"
print("\n" + info)
f.write(info)
username, password = parse_smtp_payload(smtp_payload)
if username and password:
info = "(Alerta!)Autenticación capturada mediante SMTP\n" + \
"IP destino: " + smtp_ip + "\n" + \
"Nombre de usuario: " + username + "\n" + \
"Contraseña: " + password + "\n\n"
print("\n" + info)
f.write(info)
if packet.haslayer(TCP) and packet[TCP].dport == 445:
# Procesa paquetes SMB
smb_ip_src = packet[IP].src
smb_ip_dst = packet[IP].dst
smb_port_src = packet[TCP].sport
smb_port_dst = packet[TCP].dport
info = "(Alerta!)Paquete SMB detectado\n" + \
"IP origen: " + smb_ip_src + "\n" + \
"IP destino: " + smb_ip_dst + "\n" + \
"Puerto origen: " + str(smb_port_src) + "\n" + \
"Puerto destino: " + str(smb_port_dst) + "\n\n"
print("\n" + info)
f.write(info)
if packet.haslayer(TCP) and packet[TCP].dport == 23 and packet.haslayer(Raw):
# Procesa paquetes Telnet
telnet_ip_src = packet[IP].src
telnet_ip_dst = packet[IP].dst
telnet_port_src = packet[TCP].sport
telnet_port_dst = packet[TCP].dport
telnet_payload = packet[Raw].load.decode('utf-8')
info = "(Alerta!)Paquete Telnet detectado (No es seguro)\n" + \
"IP origen: " + telnet_ip_src + "\n" + \
"IP destino: " + telnet_ip_dst + "\n" + \
"Puerto origen: " + str(telnet_port_src) + "\n" + \
"Puerto destino: " + str(telnet_port_dst) + "\n" + \
"Carga útil: " + telnet_payload + "\n\n"
print("\n" + info)
f.write(info)
# Fin del bucle de captura
except Exception as e:
print("Error al escribir en el archivo: ", str(e))
# Animación de carga
def loader():
chars = "0123456789ABCDEF"
i = 0
while running: # Bucle que se ejecuta mientras running sea True
line = ''.join(random.choice(chars) for _ in range(i))
sys.stdout.write(f'\r{line}')
sys.stdout.flush()
time.sleep(0.1)
i = (i+1) % 50
def main():
"""
Función principal del script. Inicializa las variables globales, obtiene la interfaz de red predeterminada,
verifica si la interfaz está activa y no es lo0, y luego inicia el proceso de escucha en la interfaz activa.
Si la interfaz no está activa o es lo0, el script se termina.
"""
global seen_domains
global seen_icmp_errors
seen_domains = set()
seen_icmp_errors = set()
# Obtén la interfaz de red predeterminada
active_interface = conf.iface
# Verifica si la interfaz está activa y no es lo0
if get_if_addr(active_interface) and active_interface != "lo0":
# Imprime el mensaje final salto de linea y mensaje seguido del loader al final del mensaje sin salto de line
print("Escuchando en la interfaz: " + active_interface + "\nCapturando trafico y generando archivo txt ... \n", end="\n")
global running
running = True # Variable que controla si el loader se está ejecutando
loader_thread = threading.Thread(target=loader) # Crear un nuevo hilo para el loader
loader_thread.start() # Iniciar el hilo del loader
# Hacer que el script escuche en la interfaz activa
sniff(active_interface)
running = False # Detener el loader
loader_thread.join() # Esperar a que el hilo del loader termine
else:
print("No hay ninguna interfaz activa o es la interfaz lo0. Finalizando el script.")
sys.exit(1)
if __name__ == "__main__":
"""
Si el script se ejecuta como un programa independiente (en lugar de ser importado como un módulo),
se llama a la función main().
"""
main()