|
6 | 6 | class CompressionService: |
7 | 7 | @staticmethod |
8 | 8 | def _is_safe_path(base_path, target_path): |
9 | | - """Check if target path is within base path (prevents directory traversal)""" |
| 9 | + """Check if target path is within base path (prevents directory traversal) |
| 10 | + |
| 11 | + Note: target_path is joined onto the resolved base and the combined path is |
| 12 | + resolved (symlinks are followed), then checked with Path.is_relative_to(), not a string comparison. |
| 13 | + """ |
10 | 14 | try: |
11 | 15 | base = Path(base_path).resolve() |
12 | | - target = Path(target_path).resolve() |
13 | | - return target.is_relative_to(base) |
| 16 | + # Join target onto base and resolve the result (collapses '..' and follows symlinks), |
| 17 | + # then check that the resolved path is still inside base |
| 18 | + full_path = (base / target_path).resolve() |
| 19 | + return full_path.is_relative_to(base) |
14 | 20 | except (ValueError, RuntimeError): |
15 | 21 | return False |
16 | 22 |
|
17 | 23 | @staticmethod |
18 | 24 | def _safe_extract_member(archive_path, member_name, extract_to): |
19 | | - """Validate that extracted file stays within extract_to directory""" |
20 | | - target_path = Path(extract_to) / member_name |
21 | | - if not CompressionService._is_safe_path(extract_to, target_path): |
| 25 | + """Validate that extracted file stays within extract_to directory |
| 26 | + |
| 27 | + Rejects paths with: |
| 28 | + - Absolute paths |
| 29 | + - Parent directory references (..) |
| 30 | + - Paths that escape the extraction directory |
| 31 | + """ |
| 32 | + # Reject absolute paths |
| 33 | + if Path(member_name).is_absolute(): |
| 34 | + raise ValueError(f"Absolute path in archive is not allowed: {member_name}") |
| 35 | + |
| 36 | + # Reject paths with parent directory references |
| 37 | + if '..' in Path(member_name).parts: |
| 38 | + raise ValueError(f"Path traversal attempt detected: {member_name}") |
| 39 | + |
| 40 | + # Validate the final path stays within extraction directory |
| 41 | + if not CompressionService._is_safe_path(extract_to, member_name): |
22 | 42 | raise ValueError(f"Attempted path traversal in archive: {member_name}") |
23 | 43 | return True |
24 | 44 |
|
@@ -61,11 +81,11 @@ def extract_tar(archive_path, extract_to): |
61 | 81 | # Validate all members before extraction to prevent path traversal |
62 | 82 | for member in tarf.getmembers(): |
63 | 83 | CompressionService._safe_extract_member(archive_path, member.name, extract_to) |
64 | | - # Use data filter for Python 3.12+ or validate manually for earlier versions |
| 84 | + # Use the 'data' filter where tarfile supports it (Python 3.12+, and security backports such as 3.11.4); otherwise rely on the manual validation above |
65 | 85 | try: |
66 | 86 | tarf.extractall(extract_to, filter='data') |
67 | 87 | except TypeError: |
68 | | - # Python < 3.12 doesn't support filter parameter, but we already validated |
| 88 | + # This tarfile has no filter parameter (pre-3.12 without the security backport); fall back to the member-name validation above (link targets are not checked) |
69 | 89 | tarf.extractall(extract_to) |
70 | 90 |
|
71 | 91 | @staticmethod |
|
0 commit comments