-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
75 lines (59 loc) · 2.09 KB
/
utils.py
File metadata and controls
75 lines (59 loc) · 2.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import os
import json
from cryptography.fernet import Fernet
import gzip
import base64
import hashlib
def _get_cipher():
    """
    Build the Fernet cipher used to encrypt and decrypt payloads.

    The key is read from the ENCRYPTION_KEY environment variable on every
    call. When that variable is unset or empty, a random key is generated
    once and reused for the rest of the process, so that a value encrypted
    by one call can still be decrypted by a later call. Such a fallback key
    is not persisted: anything encrypted with it becomes unreadable once
    the process exits, which is why a warning is logged when it is created.

    Returns:
        A ``Fernet`` instance built from the selected key. Any error Fernet
        raises for a malformed key propagates to the caller.
    """
    key = os.environ.get('ENCRYPTION_KEY')
    if not key:
        # Reuse the key generated earlier in this process, if any. Creating
        # a fresh key per call would make every previously produced token
        # undecryptable.
        key = getattr(_get_cipher, '_fallback_key', None)
        if key is None:
            import logging

            logging.getLogger(__name__).warning(
                'ENCRYPTION_KEY is not set; using a temporary key for this '
                'process. Data encrypted with it cannot be decrypted after '
                'a restart.'
            )
            key = Fernet.generate_key()
            _get_cipher._fallback_key = key
    if isinstance(key, str):
        # Environment values are text; Fernet is given the key as bytes.
        key = key.encode()
    return Fernet(key)
def encrypt_data(data):
    """
    Serialize *data* to JSON, encrypt it, and return the token as text.

    The cipher comes from ``_get_cipher()``. *data* must be something
    ``json.dumps`` accepts; its errors propagate unchanged.
    """
    fernet = _get_cipher()
    plaintext = json.dumps(data).encode()
    token = fernet.encrypt(plaintext)
    return token.decode()
def decrypt_data(encrypted):
    """
    Decrypt a text token produced by ``encrypt_data`` and parse it as JSON.

    The cipher comes from ``_get_cipher()``. Errors from decryption or from
    JSON parsing are not handled here and propagate to the caller.
    """
    fernet = _get_cipher()
    token = encrypted.encode()
    plaintext = fernet.decrypt(token)
    return json.loads(plaintext.decode())
def get_events_from_submission(submission):
    """
    Decrypt and return the event data stored on a Submission.

    The decrypted payload is returned as-is when it is a dict containing a
    'base_time' key; the expected compact shape is
    { base_time: int, events: [[delta_ms, type, file, char_count], ...] }.

    In every other case -- no submission, no 'events_encrypted' value, a
    payload of another shape, or any exception raised while decrypting --
    the empty structure {'base_time': 0, 'events': []} is returned.
    """
    fallback = {'base_time': 0, 'events': []}

    if not submission:
        return fallback
    if not getattr(submission, 'events_encrypted', None):
        return fallback

    try:
        payload = decrypt_data(submission.events_encrypted)
    except Exception:
        # Best effort: unreadable event data is treated as "no events".
        return fallback

    has_base_time = isinstance(payload, dict) and 'base_time' in payload
    return payload if has_base_time else fallback
def files_from_events(event_data):
    """
    Return the sorted list of unique file names found in compact event data.

    Each event is expected to look like [delta_ms, type, filename, char_count].
    The file name is taken from index 2 exactly as recorded; no path
    normalisation (such as stripping directories) is performed here.

    Malformed input is tolerated rather than raised on:
    - a falsy *event_data* (None, {}) or an 'events' value that is missing
      or not a list/tuple yields [];
    - events that are not lists/tuples, have fewer than three items, or
      whose file name is empty or not a string are skipped.
    """
    if not event_data:
        return []

    events = event_data.get('events')
    if not isinstance(events, (list, tuple)):
        return []

    names = set()
    for event in events:
        if not isinstance(event, (list, tuple)) or len(event) <= 2:
            continue
        name = event[2]
        # Only non-empty strings: anything else is not a usable file name
        # and could make the set insertion or the final sort fail.
        if isinstance(name, str) and name:
            names.add(name)
    return sorted(names)
def compress_text_to_b64(text: str) -> str:
    """
    Gzip-compress a UTF-8 string and return the result base64-encoded.

    The gzip header timestamp is fixed at 0, so the same text always
    produces the same output. Without that, the header would carry the
    current time and any hash taken over the compressed bytes would change
    from one second to the next.

    Args:
        text: The text to compress. ``None`` is treated as "nothing to
            store" and yields an empty string.

    Returns:
        ASCII base64 text of the gzip stream, or '' when *text* is None.
    """
    if text is None:
        return ''

    import io

    buffer = io.BytesIO()
    # GzipFile (rather than gzip.compress) so the mtime header field can be
    # pinned on every Python 3 version.
    with gzip.GzipFile(fileobj=buffer, mode='wb', mtime=0) as gz:
        gz.write(text.encode('utf-8'))
    return base64.b64encode(buffer.getvalue()).decode('ascii')
def decompress_b64_to_text(b64: str) -> str:
    """
    Reverse of the compress step: base64-decode, gunzip, decode as UTF-8.

    A falsy *b64* (empty string or None) yields '' without decoding
    anything. Errors from base64 decoding, gunzipping, or UTF-8 decoding
    are not handled here and propagate to the caller.
    """
    return gzip.decompress(base64.b64decode(b64)).decode('utf-8') if b64 else ''
def sha256_of_b64(b64: str) -> str:
    """
    Return the hex SHA-256 digest of the bytes encoded by *b64*.

    The digest is taken over the base64-decoded bytes, not over the base64
    text itself. If decoding raises for any reason, the digest of empty
    input is returned instead of propagating the error.
    """
    try:
        payload = base64.b64decode(b64)
    except Exception:
        # Undecodable input is hashed as if it were empty.
        return hashlib.sha256(b'').hexdigest()
    return hashlib.sha256(payload).hexdigest()