-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsingle_alert_formatter.py
More file actions
2101 lines (1834 loc) · 93.5 KB
/
single_alert_formatter.py
File metadata and controls
2101 lines (1834 loc) · 93.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import json
import pyperclip
import sys
import os
import requests
import time
import re
import traceback
import google.generativeai as genai
from datetime import datetime, timedelta
from dotenv import load_dotenv
# --- Load Environment Variables ---
# Pulls VIRUSTOTAL_API_KEY / GEMINI_API_KEY (and anything else) from a .env file.
load_dotenv()
# --- Configuration ---
VIRUSTOTAL_API_KEY = os.getenv('VIRUSTOTAL_API_KEY')  # may be None if not configured
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY')  # may be None if not configured
VIRUSTOTAL_API_URL = "https://www.virustotal.com/api/v3/files/{}"  # format with a file hash
GEMINI_MODEL_NAME = "gemini-2.5-flash"
# Character budget for prompts pasted into ChatGPT; derived thresholds below.
CHATGPT5_HARD_CHAR_LIMIT = 65000
CHATGPT5_SAFETY_MARGIN = 1000
# Effective cap used when building prompts (hard limit minus safety margin).
CHATGPT5_MAX_PROMPT_CHARS = CHATGPT5_HARD_CHAR_LIMIT - CHATGPT5_SAFETY_MARGIN
# Reserved headroom for continuation-chunk headers when splitting payloads.
CHATGPT5_CONTINUATION_BUFFER = 1500
# Advisory thresholds used by print_prompt_size_guidance().
CHATGPT5_LARGE_PROMPT_THRESHOLD = CHATGPT5_MAX_PROMPT_CHARS - 6000
CHATGPT5_NEAR_LIMIT_THRESHOLD = CHATGPT5_MAX_PROMPT_CHARS - 1500
# - Clear actor identification speeds up investigation and response
# ===============================================================================
# Feature note:
# - When an alert is determined to be memory- or shellcode-related (e.g., memory protection, suspicious thread/code injection,
# shellcode execution), the script extracts call stack details from the alert payload and includes them in the Investigation Report.
# This inclusion is conditional and occurs only when these behaviors are directly involved.
# --- Common Process List (Kept for potential future use or reference) ---
# Lower-cased filenames of processes commonly seen on Windows hosts; used to
# distinguish ordinary system/user software from unusual executables.
COMMON_PROCESS_FILENAMES = {
    "explorer.exe", "svchost.exe", "lsass.exe", "csrss.exe", "wininit.exe",
    "services.exe", "conhost.exe", "runtimebroker.exe", "taskhostw.exe",
    "taskhostex.exe", "dwm.exe", "winlogon.exe", "system", "smss.exe",
    "spoolsv.exe", "searchindexer.exe", "sihost.exe", "ctfmon.exe",
    "dllhost.exe", "audiodg.exe", "fontdrvhost.exe", "consent.exe",
    "applicationframehost.exe", "shellexperiencehost.exe", "systemsettings.exe",
    "wmiprvse.exe", "taskmgr.exe", "services.msc", "mmc.exe", "chrome.exe",
    "firefox.exe", "msedge.exe", "iexplore.exe", "winword.exe", "excel.exe",
    "powerpnt.exe", "outlook.exe", "onedrive.exe", "teams.exe", "mstsc.exe",
    "cmd.exe", "powershell.exe", "cscript.exe", "wscript.exe", "regsvr32.exe",
    "7zg.exe", "7zfm.exe", "7z.exe",
}
# --- Helper Functions (general) ---
def condense_alert_json(alert_data):
    """
    Aggressively condense alert JSON by removing verbose/redundant fields
    while preserving ALL investigation-critical data.
    REMOVES (to save space):
    - Elastic metadata (index, shard, sort, version)
    - Verbose kibana.alert.* metadata (keeps core fields only)
    - Policy configuration details (Endpoint.policy.applied.artifacts)
    - Massive DLL lists (process.Ext.dll, Target.process.Ext.dll) - keeps count only
    - Duplicate nested copies of same data
    - Flattened field duplicates (e.g., host.name vs host.hostname)
    - Large code signature arrays (keeps summary only)
    - Empty/null nested objects
    PRESERVES (for investigation):
    - Core process, file, user, host information
    - All SHA256/SHA1/MD5 hashes
    - Critical threat indicators
    - Command lines and arguments
    - Parent-child process relationships
    - Timestamps and event metadata
    - Rule details (name, description, reason)
    - Memory/thread information (call stacks, memory regions)
    - Network indicators
    - Target process information
    """
    import copy
    # Deep-copy so the caller's alert dict is never mutated.
    condensed = copy.deepcopy(alert_data)
    # Fields to remove (non-critical metadata)
    removable_top_level = [
        '_index', '_type', '_id', '_score', '_version',
        'sort', 'fields', '_ignored', 'highlight', '_explanation'
    ]
    for field in removable_top_level:
        condensed.pop(field, None)
    # Work with _source if it exists, otherwise work with root.
    # NOTE: `source` aliases a sub-dict of `condensed`, so in-place edits below
    # modify `condensed` directly.
    source = condensed.get('_source', condensed)
    # === AGGRESSIVE KIBANA.ALERT CLEANUP ===
    if 'kibana' in source and 'alert' in source['kibana']:
        kibana_alert = source['kibana']['alert']
        # Keep ONLY these essential fields
        essential_kibana_fields = {
            'rule': {'name', 'description', 'severity', 'risk_score', 'uuid', 'rule_id'},
            'reason': True,
            'severity': True,
            'risk_score': True,
            'workflow_status': True,
            'original_time': True,
            'url': True,
        }
        # Keep minimal rule info
        if 'rule' in kibana_alert and isinstance(kibana_alert['rule'], dict):
            rule_essential = essential_kibana_fields['rule']
            rule_keys = list(kibana_alert['rule'].keys())
            for key in rule_keys:
                if key not in rule_essential:
                    kibana_alert['rule'].pop(key, None)
        # Remove all other kibana.alert.* bloat
        keys_to_remove = [k for k in list(kibana_alert.keys()) if k not in essential_kibana_fields]
        for key in keys_to_remove:
            kibana_alert.pop(key, None)
    # === AGGRESSIVE ENDPOINT POLICY CLEANUP ===
    # The Endpoint.policy.applied.artifacts object is MASSIVE and not needed for investigation
    if 'Endpoint' in source and isinstance(source['Endpoint'], dict):
        if 'policy' in source['Endpoint'] and isinstance(source['Endpoint']['policy'], dict):
            if 'applied' in source['Endpoint']['policy']:
                applied = source['Endpoint']['policy']['applied']
                # Keep only name and ID
                source['Endpoint']['policy']['applied'] = {
                    'name': applied.get('name', 'N/A'),
                    'id': applied.get('id', 'N/A'),
                    '_note': 'Policy artifacts removed to save space'
                }
    # === AGGRESSIVE DLL LIST CLEANUP ===
    # DLL lists can be 100+ entries - we only need count for investigation
    def condense_dll_list(dll_array):
        """Replace massive DLL arrays with summary"""
        if not isinstance(dll_array, list):
            return dll_array
        if len(dll_array) <= 5:
            return dll_array  # Keep small lists
        # For large lists, keep first 2 and last 2, add summary
        return [
            dll_array[0],
            dll_array[1],
            {
                '_summary': f'{len(dll_array) - 4} DLLs omitted (total: {len(dll_array)})',
                '_note': 'Full DLL list removed to save space'
            },
            dll_array[-2],
            dll_array[-1]
        ]
    # Clean process DLL list
    if 'process' in source and isinstance(source['process'], dict):
        if 'Ext' in source['process'] and isinstance(source['process']['Ext'], dict):
            if 'dll' in source['process']['Ext']:
                source['process']['Ext']['dll'] = condense_dll_list(source['process']['Ext']['dll'])
    # Clean Target.process DLL list
    if 'Target' in source and isinstance(source['Target'], dict):
        if 'process' in source['Target'] and isinstance(source['Target']['process'], dict):
            if 'Ext' in source['Target']['process'] and isinstance(source['Target']['process']['Ext'], dict):
                if 'dll' in source['Target']['process']['Ext']:
                    source['Target']['process']['Ext']['dll'] = condense_dll_list(source['Target']['process']['Ext']['dll'])
    # === REMOVE REDUNDANT FIELDS ===
    # Remove agent.* verbose fields (keep minimal info)
    if 'agent' in source:
        agent_essential = {'id', 'type', 'version'}
        agent_keys = list(source['agent'].keys())
        for key in agent_keys:
            if key not in agent_essential:
                source['agent'].pop(key, None)
    # Remove ecs metadata
    source.pop('ecs', None)
    # Remove observer verbose metadata
    if 'observer' in source:
        source.pop('observer', None)  # Usually duplicates host/agent
    # Remove data_stream metadata
    source.pop('data_stream', None)
    # Remove elastic metadata
    source.pop('elastic', None)
    # === CONDENSE LARGE ARRAYS ===
    # Condense large Events array
    if 'Events' in source and isinstance(source['Events'], list):
        if len(source['Events']) > 2:
            events_count = len(source['Events'])
            source['Events'] = [
                source['Events'][0],
                {'_summary': f'{events_count - 2} events omitted'},
                source['Events'][-1]
            ]
    # === CONDENSE CODE SIGNATURES ===
    def condense_code_signature_array(sig_array):
        """Keep only first signature from array"""
        if not isinstance(sig_array, list):
            return sig_array
        if len(sig_array) == 0:
            return sig_array
        # Keep only first signature (most relevant)
        return [sig_array[0]]
    # Condense process code signatures
    if 'process' in source and isinstance(source['process'], dict):
        if 'Ext' in source['process'] and isinstance(source['process']['Ext'], dict):
            if 'code_signature' in source['process']['Ext']:
                source['process']['Ext']['code_signature'] = condense_code_signature_array(
                    source['process']['Ext']['code_signature']
                )
        if 'parent' in source['process'] and isinstance(source['process']['parent'], dict):
            if 'Ext' in source['process']['parent'] and isinstance(source['process']['parent']['Ext'], dict):
                if 'code_signature' in source['process']['parent']['Ext']:
                    source['process']['parent']['Ext']['code_signature'] = condense_code_signature_array(
                        source['process']['parent']['Ext']['code_signature']
                    )
    # === REMOVE DUPLICATIVE NESTED FIELDS ===
    # Remove kibana.alert.original_event if it duplicates the root event
    if 'kibana' in source and 'alert' in source['kibana']:
        if 'original_event' in source['kibana']['alert']:
            # Keep only if it has unique data not in root event
            original_evt = source['kibana']['alert']['original_event']
            root_evt = source.get('event', {})
            if isinstance(original_evt, dict) and isinstance(root_evt, dict):
                # If they look similar, remove original_event
                # (matching event codes is used as the similarity proxy)
                if original_evt.get('code') == root_evt.get('code'):
                    source['kibana']['alert'].pop('original_event', None)
    # === CONDENSE HOST INFO ===
    # Remove excessive IP arrays (keep first 3)
    if 'host' in source and isinstance(source['host'], dict):
        if 'ip' in source['host'] and isinstance(source['host']['ip'], list):
            if len(source['host']['ip']) > 3:
                ip_count = len(source['host']['ip'])
                source['host']['ip'] = source['host']['ip'][:3] + [f'... {ip_count - 3} more IPs']
        if 'mac' in source['host'] and isinstance(source['host']['mac'], list):
            if len(source['host']['mac']) > 2:
                mac_count = len(source['host']['mac'])
                source['host']['mac'] = source['host']['mac'][:2] + [f'... {mac_count - 2} more MACs']
    # === REMOVE LARGE TEXT FIELDS ===
    # Remove/truncate message field if long (usually duplicates reason)
    if 'message' in source:
        if isinstance(source['message'], str) and len(source['message']) > 500:
            source['message'] = source['message'][:200] + "... [truncated]"
    # Remove event.original if huge
    if 'event' in source and 'original' in source['event']:
        if isinstance(source['event']['original'], str) and len(source['event']['original']) > 2000:
            source['event'].pop('original', None)
    # === CONDENSE RELATED.* ARRAYS ===
    if 'related' in source:
        for key in list(source['related'].keys()):
            if isinstance(source['related'][key], list) and len(source['related'][key]) > 10:
                count = len(source['related'][key])
                source['related'][key] = source['related'][key][:10] + [f'... {count - 10} more']
    # === REMOVE MEMORY_PROTECTION VERBOSE FIELDS ===
    if 'Memory_protection' in source and isinstance(source['Memory_protection'], dict):
        # Keep only essential memory protection indicators
        essential_memory_fields = {'feature', 'self_injection', 'cross_session'}
        mem_keys = list(source['Memory_protection'].keys())
        for key in mem_keys:
            if key not in essential_memory_fields:
                source['Memory_protection'].pop(key, None)
    # === REMOVE RULE METADATA ===
    if 'rule' in source and isinstance(source['rule'], dict):
        # Usually just contains "ruleset": "production" - not useful
        source.pop('rule', None)
    # Update _source if we were working with it
    if '_source' in condensed:
        condensed['_source'] = source
    return condensed
def _take_chunk_portion(text, capacity):
"""Return a tuple of (chunk, remainder) using soft newline boundaries."""
if capacity <= 0:
raise ValueError("capacity must be positive")
if not isinstance(text, str):
text = str(text)
if len(text) <= capacity:
return text, ""
split_idx = capacity
newline_idx = text.rfind('\n', 0, capacity)
# Prefer to split on a newline that is reasonably close to the limit for readability
if newline_idx != -1 and capacity - newline_idx <= 1000:
split_idx = max(newline_idx + 1, 1)
chunk = text[:split_idx]
remainder = text[split_idx:]
if not chunk:
chunk = text[:capacity]
remainder = text[capacity:]
return chunk, remainder
def split_json_into_prompt_chunks(json_text, instructions, max_prompt_chars, continuation_buffer=1500):
    """
    Split JSON payload into multiple prompt-safe chunks while keeping instructions in the first chunk.
    Returns a list of prompt strings, each smaller than max_prompt_chars.

    Raises ValueError when the limits cannot be satisfied (instructions too
    long, too many chunks required, or a chunk that cannot be shrunk to fit).
    """
    if not isinstance(json_text, str):
        json_text = str(json_text)
    if not isinstance(instructions, str):
        instructions = str(instructions)
    if max_prompt_chars <= 0:
        raise ValueError("max_prompt_chars must be positive")
    if continuation_buffer < 0:
        raise ValueError("continuation_buffer must be non-negative")
    # Starting conservative continuation capacity (will be reduced if needed)
    continuation_capacity = max_prompt_chars - continuation_buffer
    if continuation_capacity <= 0:
        raise ValueError("Continuation buffer too large for remaining prompt space.")

    def _render_prompts(chunks):
        """Attach the chunk headers (and final end marker) to raw chunk texts."""
        total = len(chunks)
        rendered = []
        for idx, chunk_text in enumerate(chunks, start=1):
            if idx == 1:
                header = (
                    f"[CHUNK {idx}/{total}] BEGIN ALERT JSON - DO NOT RESPOND OR ANALYZE UNTIL ALL CHUNKS ARE RECEIVED.\n"
                    f"The final chunk will include the exact marker: END_OF_ALERT_JSON_CHUNKS\n"
                    "When you receive the final chunk containing that marker, then process all chunks together and reply.\n\n"
                    f"INSTRUCTIONS (do not alter):\n{instructions}\n\n"
                )
            else:
                final_note = " This is the FINAL CHUNK; after this chunk the sender will provide the marker: END_OF_ALERT_JSON_CHUNKS." if idx == total else ""
                header = (
                    f"[CHUNK {idx}/{total}] CONTINUATION - DO NOT RESPOND. Paste this immediately after the previous chunk so the assistant retains context.{final_note}\n\n"
                )
            # The last chunk carries an explicit end marker so the remote
            # assistant knows all chunks have arrived.
            if idx == total:
                rendered.append(header + chunk_text + "\nEND_OF_ALERT_JSON_CHUNKS\n")
            else:
                rendered.append(header + chunk_text)
        return rendered

    # Because the header length depends on the total number of chunks, we
    # iteratively shrink the continuation payload size until every rendered
    # prompt (header + chunk) fits within max_prompt_chars.
    attempt = 0
    max_attempts = 12
    while True:
        attempt += 1
        # Leave room in the first chunk for the instructions block.
        first_capacity = max(256, continuation_capacity - len(instructions) - 2)
        if first_capacity <= 0:
            raise ValueError("Instructions exceed available prompt capacity.")
        remaining = json_text
        first_chunk, remaining = _take_chunk_portion(remaining, first_capacity)
        tmp_chunks = [first_chunk]
        too_many = False
        while remaining:
            chunk, remaining = _take_chunk_portion(remaining, continuation_capacity)
            if not chunk:
                break
            tmp_chunks.append(chunk)
            if len(tmp_chunks) > 800:
                too_many = True
                break
        if too_many:
            if attempt >= max_attempts or continuation_capacity <= 256:
                raise ValueError("Too many chunks required to satisfy prompt limits.")
            continuation_capacity = max(256, int(continuation_capacity * 0.7))
            continue
        # Render once and validate; previously the identical header-building
        # code was duplicated for a second, redundant final pass.
        prompts = _render_prompts(tmp_chunks)
        if all(len(p) <= max_prompt_chars for p in prompts):
            return prompts
        if attempt >= max_attempts or continuation_capacity <= 256:
            raise ValueError("Chunked prompt still exceeds configured limit.")
        # Reduce by 20% and retry
        continuation_capacity = max(256, int(continuation_capacity * 0.8))
def print_prompt_size_guidance(char_count):
    """Print a one-line advisory about how close *char_count* is to the prompt cap."""
    near_limit = CHATGPT5_NEAR_LIMIT_THRESHOLD
    large_limit = max(CHATGPT5_LARGE_PROMPT_THRESHOLD, 0)
    if char_count >= near_limit:
        print(f"⚠️ CAUTION: Prompt is very close to the current ChatGPT-5.1 cap (~{CHATGPT5_HARD_CHAR_LIMIT:,} chars).")
    elif char_count >= large_limit:
        print("ℹ️ INFO: Prompt is large but still within the reduced limit.")
    else:
        print("✓ Prompt size is within safe limits for ChatGPT-5 extended thinking.")
def _get_nested_dict(data_dict, keys):
current = data_dict
for key in keys:
if not isinstance(current, dict):
return None
current = current.get(key)
return current if isinstance(current, dict) else None
def _contains_npm_token(text):
if not text:
return False
lowered = text.lower()
if 'npm-cli.js' in lowered:
return True
return bool(re.search(r'(^|[\s"\'/\\])npm(\.cmd|\.exe)?([\s"\'/\\]|$)', lowered))
def is_npm_related_process(name, command_line, args):
    """Return True when the process name, command line, or any argument references npm."""
    if (name or '').lower() in {'npm', 'npm.exe', 'npm.cmd'}:
        return True
    if _contains_npm_token(command_line or ''):
        return True
    if isinstance(args, list):
        return any(_contains_npm_token(str(arg)) for arg in args if arg)
    return False
def summarize_npm_command(command_line, args):
    """
    Produce a short human-readable summary of an npm invocation.

    Prefers the structured *args* list; falls back to whitespace-splitting
    *command_line*. Always returns a descriptive string (never raises).
    """
    tokens = []
    if isinstance(args, list) and args:
        tokens = [str(a) for a in args if a]
    elif isinstance(command_line, str) and command_line.strip():
        tokens = [t for t in re.split(r'\s+', command_line.strip()) if t]
    if not tokens:
        return "Unable to parse npm command (no arguments provided)."
    lowered = [t.lower() for t in tokens]
    npm_idx = None
    for idx, tok in enumerate(lowered):
        if tok == 'npm' or tok.endswith('npm') or 'npm-cli.js' in tok or tok.endswith('npm.cmd') or tok.endswith('npm.exe'):
            npm_idx = idx
            break
    # Everything after the npm binary; fall back to the full token list only
    # when npm was never spotted. (Bug fix: previously, when the npm token was
    # the LAST token, the fallback re-included the npm token itself, producing
    # summaries like "npm npm" for a bare `npm` invocation.)
    relevant = tokens[npm_idx + 1:] if npm_idx is not None else tokens
    if not relevant:
        return "NPM invoked without additional arguments."
    primary = relevant[0].lower()
    extra = relevant[1:]

    def _join(values, limit=6):
        # Display at most *limit* values; mark truncation with " ...".
        display = values[:limit]
        tail = " ..." if len(values) > limit else ""
        return " ".join(display) + tail if display else ""

    if primary == 'run':
        script = extra[0] if extra else None
        if script:
            extras = _join(extra[1:])
            return f"npm run `{script}`" + (f" with extras: {extras}" if extras else "")
        return "npm run executed without a script target."
    elif primary in {'install', 'i', 'ci', 'update', 'upgrade', 'uninstall', 'remove', 'exec', 'audit', 'start', 'test', 'build'}:
        joined = _join(extra)
        return f"npm {primary}" + (f" {joined}" if joined else " (no additional arguments).")
    joined_generic = _join(relevant)
    return "npm " + joined_generic if joined_generic else "npm executed without arguments."
def detect_npm_activity(alert_data):
    """Scan the alerted/parent/target process entries and describe any npm usage found.

    Returns a list of dicts (label, name, command_line, args, working_directory,
    summary); empty when nothing npm-related is present.
    """
    findings = []
    candidates = [
        (['process'], "Alerted Process"),
        (['process', 'parent'], "Parent Process"),
        (['Target', 'process'], "Target Process"),
    ]
    for path, label in candidates:
        proc = _get_nested_dict(alert_data, path)
        if not proc:
            continue
        name = proc.get('name') or _basename(proc.get('executable', ''))
        cmd = proc.get('command_line')
        raw_args = proc.get('args')
        arg_list = raw_args if isinstance(raw_args, list) else None
        if not is_npm_related_process(name, cmd, arg_list):
            continue
        findings.append({
            'label': label,
            'name': name or 'npm',
            'command_line': cmd,
            'args': arg_list,
            'working_directory': proc.get('working_directory'),
            'summary': summarize_npm_command(cmd, arg_list)
        })
    return findings
def get_nested_value(data_dict, keys, default="N/A"):
    """Safely retrieves a nested value from a dictionary.

    *keys* may index into lists via numeric strings/ints. Returns *default*
    when the path is missing or the resolved value is None, "", or [].
    """
    node = data_dict
    try:
        last_index = len(keys) - 1
        for i, key in enumerate(keys):
            if isinstance(node, dict):
                node = node.get(key)
            elif isinstance(node, list):
                try:
                    idx = int(key)
                    node = node[idx] if 0 <= idx < len(node) else None
                except (ValueError, TypeError):
                    # Non-numeric key on a list: give up unless this is the
                    # final key, in which case the list itself is returned
                    # (preserves long-standing behavior).
                    if i != last_index:
                        node = None
            else:
                node = None
            if node is None:
                break
        if node in [None, "", []]:
            return default
        return node
    except (TypeError, KeyError, IndexError):
        return default
def format_timestamp(ts_string, default="N/A"):
    """
    Format an ISO 8601 timestamp string as 'YYYY-MM-DD HH:MM:SS TZ±HHMM'.

    Returns *default* for empty/missing input; echoes the original value back
    when it cannot be parsed so callers never lose the raw data.
    """
    if not ts_string or ts_string == default:
        return default
    try:
        if isinstance(ts_string, str) and ts_string.endswith('Z'):
            # datetime.fromisoformat() (pre-3.11) rejects a trailing 'Z'.
            ts_string = ts_string[:-1] + '+00:00'
        dt_obj = datetime.fromisoformat(ts_string)
        return dt_obj.strftime('%Y-%m-%d %H:%M:%S %Z%z')
    except (ValueError, TypeError):
        # TypeError covers non-string truthy inputs (e.g. ints), which
        # previously escaped the ValueError-only handler and crashed.
        return ts_string
def format_list(data_list, default="N/A"):
    """Render a non-empty list as 'a, b, c'; otherwise return *default*."""
    if isinstance(data_list, list) and data_list:
        return ", ".join(str(item) for item in data_list)
    return default
def _zone_from_path(path):
if not isinstance(path, str) or not path:
return "Unknown"
p = path.lower()
if p.startswith("\\\\"): return "UNC/Network"
if ":\\windows\\system32" in p: return "System32"
if ":\\windows\\" in p: return "Windows"
if ":\\program files" in p: return "Program Files"
if "\\appdata\\local\\temp\\" in p or p.endswith("\\temp") or "\\temp\\" in p: return "Temp"
if "\\downloads\\" in p: return "Downloads"
if "\\users\\" in p: return "User Profile"
return "Other"
# --- Signature status helper ---
def get_signature_status_string(sig_info_dict):
    """Normalize a code-signature info dict into a short human-readable status."""
    if not isinstance(sig_info_dict, dict) or not sig_info_dict:
        return "Unsigned or Info Unavailable"
    raw_status = sig_info_dict.get('verified', None)
    if raw_status is None:
        return "Unsigned"
    status = str(raw_status).lower().strip()
    # Order matters: specific failure states are matched before valid states.
    for needle, label in (
        ("revoked", "Revoked"),
        ("expired", "Expired"),
        ("invalid", "Invalid"),
        ("cannot verify", "Cannot Verify"),
    ):
        if needle in status:
            return label
    if "file is not signed" in status or status == "not signed":
        return "Unsigned"
    if "unsigned" in status:
        return "Unsigned"
    if "signed and valid" in status or status in ("valid", "signed"):
        return "Valid"
    return str(raw_status).capitalize() if raw_status else "Status Unknown"
def _drive_letter(path):
if isinstance(path, str) and len(path) >= 2 and path[1] == ":":
return path[0].upper()
return "?"
def _is_lolbin(name):
if not name: return False
lolbins = {
"rundll32.exe","regsvr32.exe","mshta.exe","powershell.exe","cmd.exe","wscript.exe","cscript.exe",
"wmic.exe","msiexec.exe","certutil.exe","bitsadmin.exe","installutil.exe","msbuild.exe",
"forfiles.exe","schtasks.exe","curl.exe","ftp.exe","vssadmin.exe","bcdedit.exe","wbadmin.exe"
}
return name.lower() in lolbins
def _suspicious_parent_child(p_name, c_name):
p = (p_name or "").lower()
c = (c_name or "").lower()
combos = {
"winword.exe": {"powershell.exe","cmd.exe","wscript.exe","cscript.exe","mshta.exe","rundll32.exe","regsvr32.exe","msiexec.exe"},
"excel.exe": {"powershell.exe","cmd.exe","wscript.exe","cscript.exe","mshta.exe","rundll32.exe","regsvr32.exe","msiexec.exe"},
"powerpnt.exe":{"powershell.exe","cmd.exe","wscript.exe","cscript.exe","mshta.exe","rundll32.exe","regsvr32.exe","msiexec.exe"},
"outlook.exe": {"powershell.exe","cmd.exe","wscript.exe","cscript.exe","mshta.exe","rundll32.exe","regsvr32.exe","msiexec.exe"},
"explorer.exe":{"powershell.exe","wscript.exe","cscript.exe","cmd.exe","mshta.exe","rundll32.exe"},
"chrome.exe": {"powershell.exe","cmd.exe","wscript.exe","cscript.exe","mshta.exe","rundll32.exe","regsvr32.exe","msiexec.exe"},
"msedge.exe": {"powershell.exe","cmd.exe","wscript.exe","cscript.exe","mshta.exe","rundll32.exe","regsvr32.exe","msiexec.exe"},
"firefox.exe": {"powershell.exe","cmd.exe","wscript.exe","cscript.exe","mshta.exe","rundll32.exe","regsvr32.exe","msiexec.exe"},
"svchost.exe": {"powershell.exe","cmd.exe"}
}
return c in combos.get(p, set())
def _has_suspicious_flags(cmd):
if not isinstance(cmd, str): return False
s = cmd.lower()
flags = [" -enc", "-encodedcommand", " -nop", "-w hidden", "/bypass", " -nologo", " -exec bypass"]
return any(flag in s for flag in flags)
def _file_ext(path_or_name):
if not isinstance(path_or_name, str): return ""
base = path_or_name.rsplit("\\",1)[-1]
if "." in base: return base.rsplit(".",1)[1].lower()
return ""
def _suspicious_file_ext(ext):
return ext in {"js","vbs","vbe","jse","wsf","wsh","hta","ps1","bat","cmd","scr","dll"}
def extract_username_from_paths(data):
    """
    Attempt to extract a username from Windows file paths when user.name is absent.

    Scans common path fields (file path, executables, working dir, args) for
    'C:\\Users\\<name>\\' patterns, skipping well-known system accounts.

    Returns the first plausible username string, or None if none is found.
    """
    # NOTE: the redundant function-local `import re` was removed; `re` is
    # already imported at module level.
    # Common path fields to check.
    path_fields = [
        ['file', 'path'],
        ['process', 'executable'],
        ['process', 'path'],
        ['process', 'parent', 'executable'],
        ['process', 'parent', 'path'],
        ['process', 'working_directory'],
        ['process', 'args'],
    ]
    # Pattern to match Windows user paths: C:\Users\username\...
    # Captures the username between \Users\ and the next backslash.
    user_path_pattern = re.compile(r'[A-Z]:\\Users\\([^\\]+)\\', re.IGNORECASE)
    # Accounts that exist on every Windows host and are not real users
    # (hoisted out of the loop; previously rebuilt on every match).
    system_accounts = {'public', 'default', 'defaultuser0', 'administrator', 'guest', 'all users'}
    for field_path in path_fields:
        value = get_nested_value(data, field_path, default=None)
        # Handle both string and list values (like process.args).
        if isinstance(value, str):
            values_to_check = [value]
        elif isinstance(value, list):
            values_to_check = [str(v) for v in value if v]
        else:
            values_to_check = []
        for path_str in values_to_check:
            if not path_str:
                continue
            match = user_path_pattern.search(path_str)
            if match:
                username = match.group(1)
                if username.lower() not in system_accounts:
                    return username
    return None
def _get_pid_ppid(data):
    """Return (pid, parent_pid) for the alerted process; 'N/A' when missing."""
    return (
        get_nested_value(data, ['process', 'pid'], default="N/A"),
        get_nested_value(data, ['process', 'parent', 'pid'], default="N/A"),
    )
def _get_start_times(data):
    """Return formatted (child_start, parent_start) timestamps for the process pair."""
    child = format_timestamp(get_nested_value(data, ['process', 'start'], default="N/A"))
    parent = format_timestamp(get_nested_value(data, ['process', 'parent', 'start'], default="N/A"))
    return child, parent
# --- Memory alert helpers ---
def is_memory_related(context, data):
    """Heuristically determine if this alert directly involves memory (e.g., memory protection, code injection, suspicious thread).
    Uses rule name, reason text, and presence of Endgame suspicious_thread_info.
    """
    try:
        # Rule name / reason text mentioning "memory"
        combined_text = f"{context.get('rule_name') or ''} {context.get('reason') or ''}".lower()
        if 'memory' in combined_text:
            return True
        # Endgame module or data hints
        endgame_module = get_nested_value(data, ['endgame', 'module'], default="N/A")
        if isinstance(endgame_module, str) and 'memory' in endgame_module.lower():
            return True
        thread_info = get_nested_value(data, ['endgame', 'data', 'suspicious_thread_info', '0'], default="N/A")
        if isinstance(thread_info, dict) and thread_info:
            return True
        # Other common hints in tags or event.action
        tags = get_nested_value(data, ['tags'], default=[])
        if isinstance(tags, list):
            if any(isinstance(t, str) and 'memory' in t.lower() for t in tags):
                return True
        event_action = get_nested_value(data, ['event', 'action'], default="N/A")
        if isinstance(event_action, str):
            markers = ('suspicious_thread', 'code_injection', 'create_remote_thread')
            if any(marker in event_action.lower() for marker in markers):
                return True
    except Exception:
        # Best-effort heuristic: a malformed alert must never break formatting.
        pass
    return False
def is_shellcode_related(context, data):
    """Detect whether alert references shellcode execution or injection."""
    keywords = ('shellcode', 'shell code', 'shell-code', 'shell_code')

    def _mentions_shellcode(value):
        if not isinstance(value, str):
            return False
        lowered = value.lower()
        return any(keyword in lowered for keyword in keywords)

    try:
        candidate_texts = [
            context.get('rule_name') or '',
            context.get('reason') or '',
            get_nested_value(data, ['event', 'action'], default=''),
            get_nested_value(data, ['threat', 'indicator', 'description'], default=''),
            get_nested_value(data, ['Detection', 'message'], default=''),
        ]
        if any(_mentions_shellcode(text) for text in candidate_texts):
            return True
        tags = get_nested_value(data, ['tags'], default=[])
        if isinstance(tags, list) and any(_mentions_shellcode(str(tag)) for tag in tags):
            return True
    except Exception:
        # Best-effort heuristic: never let a malformed alert break formatting.
        pass
    return False
def _format_stack_frames(frames):
"""Format a list of frames that may be strings or dicts."""
out = []
for fr in frames:
try:
if isinstance(fr, str):
line = fr.strip('\n')
elif isinstance(fr, dict):
module = fr.get('module') or fr.get('image') or fr.get('dll') or ''
symbol = fr.get('symbol_info') or fr.get('symbol') or fr.get('function') or ''
addr = fr.get('address') or fr.get('addr') or ''
offset = fr.get('offset') or ''
headline = []
if module:
headline.append(str(module))
if symbol:
headline.append(str(symbol))
if offset:
headline.append(f"+{offset}")
head = ' | '.join(headline)
detail_pairs = []
preferred_order = [
('protection_provenance_path', 'path'),
('protection_provenance', 'provenance'),
('allocation_private_bytes', 'bytes'),
('callsite_trailing_bytes', 'trailing'),
('callsite_leading_bytes', 'leading'),
]
for key, label in preferred_order:
value = fr.get(key)
if value:
trimmed = str(value)
if len(trimmed) > 140:
trimmed = trimmed[:137] + '...'
detail_pairs.append(f"{label}: {trimmed}")
residual = {
k: v for k, v in fr.items()
if k not in {pair[0] for pair in preferred_order}
and k not in {'module', 'image', 'dll', 'symbol_info', 'symbol', 'function', 'sym', 'address', 'addr', 'offset'}
}
for key, value in residual.items():
if value is None or isinstance(value, (list, dict)):
continue
detail_pairs.append(f"{key}: {value}")
pieces = []
if head:
pieces.append(head)
if addr:
pieces.append(f"addr: {addr}")
if detail_pairs:
pieces.append(" | ".join(detail_pairs))
line = " | ".join(pieces) if pieces else json.dumps(fr, ensure_ascii=False)
else:
line = str(fr)
except Exception:
line = str(fr)
out.append(line)
return out
def extract_call_stack(data):
    """Attempt to extract a call stack from multiple possible fields in the alert payload.

    Search order (order matters — only the FIRST non-empty candidate found
    is rendered, later matches are ignored):
      1. Endgame suspicious_thread_info: explicit stack fields, then any
         key whose name contains 'stack' or 'trace'.
      2. A fixed list of nested paths (Endgame, Kibana original_event,
         process.thread.Ext, ...).
      3. Flattened dotted keys as they appear in an Elastic ``_source`` doc.

    Returns a normalized multi-line string (capped at 60 lines) or "N/A"
    if no candidate field is present.
    """
    # Primary suspected location (Elastic Endgame suspicious thread info)
    suspicious_thread = get_nested_value(data, ['endgame', 'data', 'suspicious_thread_info', '0'], default=None)
    candidate_values = []
    if isinstance(suspicious_thread, dict):
        # Look for explicit stack fields first
        for key in ['call_stack', 'stack_trace', 'stack', 'stack_buffer']:
            val = suspicious_thread.get(key)
            if val:
                candidate_values.append(val)
        # If none explicit, try any field that looks like stack-ish
        if not candidate_values:
            for k, v in suspicious_thread.items():
                if isinstance(k, str) and any(s in k.lower() for s in ['stack', 'trace']):
                    if v:
                        candidate_values.append(v)
    # Other possible locations
    other_paths = [
        ['endgame','data','call_stack'],
        ['kibana','alert','original_event','call_stack'],
        ['kibana','alert','original_event','stack_trace'],
        ['event','call_stack'],
        ['event','stacktrace'],
        ['process','thread','stack'],
        ['process','thread','Ext','call_stack'],
        ['process','thread','Ext','call_stack_summary'],
    ]
    for p in other_paths:
        v = get_nested_value(data, p, default=None)
        if v:
            candidate_values.append(v)
    # Also check common flattened keys used by Elastic _source
    flat_keys = [
        'process.thread.Ext.call_stack',
        'process.thread.Ext.call_stack_summary',
        'kibana.alert.original_event.process.thread.Ext.call_stack',
        'kibana.alert.original_event.process.thread.Ext.call_stack_summary',
        'winlog.event_data.CallTrace',
        'event.original',
        'message',
        'kibana.alert.reason',
    ]
    for k in flat_keys:
        try:
            # data may not be a plain dict here — swallow lookup failures.
            v = data.get(k)
            if v:
                candidate_values.append(v)
        except Exception:
            pass
    if not candidate_values:
        return "N/A"
    # Normalize: prefer the first non-empty candidate
    value = candidate_values[0]
    lines = []
    if isinstance(value, str):
        # Split on common delimiters while keeping readability
        raw_lines = [l for l in value.replace('\r','').split('\n') if l.strip()]
        lines = raw_lines
    elif isinstance(value, list):
        lines = _format_stack_frames(value)
    elif isinstance(value, dict):
        # Some payloads use a dict with frames
        frames = value.get('frames') or value.get('stack') or value.get('trace') or []
        if isinstance(frames, list):
            lines = _format_stack_frames(frames)
        else:
            # Fallback to stringified dict
            return json.dumps(value, ensure_ascii=False)
    else:
        lines = [str(value)]
    # Cap excessively long stacks but keep informative content
    MAX_LINES = 60
    if len(lines) > MAX_LINES:
        head = lines[:MAX_LINES]
        head.append(f"... (truncated, {len(lines)} total frames)")
        lines = head
    return "\n".join(lines) if lines else "N/A"
def extract_call_stack_summary(data):
"""Extract the call stack summary only from common Elastic Endpoint fields.
Returns a list of pretty lines or an empty list if not found.
Preferred fields: process.thread.Ext.call_stack_summary and its original_event variant.
"""
candidates = []
# Try nested forms first
nested_paths = [
['process','thread','Ext','call_stack_summary'],
['kibana','alert','original_event','process','thread','Ext','call_stack_summary'],
]
for p in nested_paths:
v = get_nested_value(data, p, default=None)
if v:
candidates.append(v)
# Also check flattened keys directly on the top-level doc
for k in (
'process.thread.Ext.call_stack_summary',
'kibana.alert.original_event.process.thread.Ext.call_stack_summary',
):
try:
v = data.get(k)
if v:
candidates.append(v)
except Exception:
pass
if not candidates:
return []
value = candidates[0]
lines = []
# Normalize different shapes
if isinstance(value, str):
txt = value.replace('\r','').strip()
# Split on newlines or semicolons commonly used in summaries
if '\n' in txt:
lines = [l.strip() for l in txt.split('\n') if l.strip()]
else:
parts = [p.strip() for p in txt.split(';') if p.strip()]
lines = parts if parts else ([txt] if txt else [])
elif isinstance(value, list):
for item in value:
if isinstance(item, str):
s = item.strip()
if s:
lines.append(s)
elif isinstance(item, dict):
si = item.get('symbol_info') or item.get('summary') or item.get('symbol') or ''
if si:
lines.append(str(si).strip())
elif isinstance(value, dict):
# Some implementations put summary under a key
si = value.get('summary') or value.get('text') or value.get('symbol_info')
if isinstance(si, str):
txt = si.replace('\r','').strip()
if '\n' in txt:
lines = [l.strip() for l in txt.split('\n') if l.strip()]
else:
parts = [p.strip() for p in txt.split(';') if p.strip()]
lines = parts if parts else ([txt] if txt else [])
# Basic prettifying: collapse excessive spaces
pretty = []
for l in lines:
s = ' '.join(str(l).split())