diff --git a/gitgalaxy/standards/language_lens.py b/gitgalaxy/standards/language_lens.py index b658710a..b1537eee 100644 --- a/gitgalaxy/standards/language_lens.py +++ b/gitgalaxy/standards/language_lens.py @@ -202,6 +202,13 @@ def inspect( # We explicitly add .txt and .log to the prose override list if ext in {'.md', '.mdx', '.rst', '.rtf', '.txt', '.log'}: + # ---> THE FIX: Catch disguised payloads before the early exit! <--- + shebang_lang = self._tier_2_fingerprint_check(content_sample, ext) + if shebang_lang and shebang_lang != "undeterminable": + self.logger.warning(f"[{name}] IDENTITY CRISIS: Prose Ext '{ext}' contradicts Shebang '{shebang_lang}'") + result["anomaly_flags"].append(f"Identity Masking: Prose Extension ({ext}) vs Shebang ({shebang_lang})") + return self._forge_result("undeterminable", 0.0, 5, f"Identity Conflict ({ext} != {shebang_lang})", result, content_sample) + target_id = "markdown" if ext in {'.md', '.mdx'} else "plaintext" return self._forge_result(target_id, self.thresholds.get("PROSE_CONFIDENCE", 0.95), 1, f"Prose Extension ({ext})", result, content_sample) diff --git a/tests/test_chronometer_timeout.py b/tests/test_chronometer_timeout.py new file mode 100644 index 00000000..348642c8 --- /dev/null +++ b/tests/test_chronometer_timeout.py @@ -0,0 +1,73 @@ +import unittest +import time +from pathlib import Path +from unittest.mock import MagicMock, patch + +# Adjust the import path if necessary based on your actual module structure +from gitgalaxy.physics.chronometer import Chronometer + +class TestChronometerTimeout(unittest.TestCase): + + @patch('gitgalaxy.physics.chronometer.subprocess.Popen') + @patch.object(Chronometer, '_calibrate_temporal_field') # Skip the heavy init sequence + def test_zombie_process_kill_switch(self, mock_calibrate, mock_popen): + """ + Simulates an infinite, hanging Git stream. 
+ Verifies that the Chronometer respects the timeout and successfully + reaps the zombie process at the OS level via kill() and communicate(). + """ + # 1. Setup the Infinite Stream + def infinite_git_log(): + while True: + # Yields a valid line so the internal logic has to do work + yield "mock_hash|1700000000|TestAuthor\n" + yield "src/safe_file.py\n" + + # We attach the generator to a MagicMock so we can assert .close() is called on it later + mock_stdout = MagicMock() + mock_stdout.__iter__.return_value = infinite_git_log() + mock_stderr = MagicMock() + + # 2. Setup the Mock Process + mock_process = MagicMock() + mock_process.stdout = mock_stdout + mock_process.stderr = mock_stderr + mock_popen.return_value = mock_process + + # 3. Initialize Chronometer (calibration is bypassed) + chrono = Chronometer(Path("/mock/repo")) + + # 4. Ignite the escalator with a tiny timeout (50ms) + timeout_limit = 0.05 + start_time = time.time() + + processed_lines, reached_target = chrono._run_git_stream_escalator( + cmd=["git", "log", "mock_args"], + ignored_hashes=set(), + tracked_files=set(), + required_files=10000, + timeout_limit=timeout_limit, + start_time=start_time + ) + + # ===================================================================== + # 5. INVARIANT ASSERTIONS (The Proof) + # ===================================================================== + + # Check that it actually processed lines, but was interrupted by the timeout + self.assertTrue(processed_lines > 0, "The stream should have processed lines before timing out.") + self.assertFalse(reached_target, "The escalator should have aborted before reaching the file target.") + + # --- THE ZOMBIE KILL SWITCH VERIFICATION --- + # Did we send the SIGKILL? + mock_process.kill.assert_called_once() + + # Did we flush the pipes? (Crucial: kill() without communicate() leaves a zombie) + mock_process.communicate.assert_called_once() + + # Did we close the file descriptors to prevent FD leaks? 
+ mock_stdout.close.assert_called_once() + mock_stderr.close.assert_called_once() + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/test_gpu_recorder.py b/tests/test_gpu_recorder.py new file mode 100644 index 00000000..b7f18532 --- /dev/null +++ b/tests/test_gpu_recorder.py @@ -0,0 +1,51 @@ +import unittest +from gitgalaxy.recorders.gpu_recorder import GPURecorder + +class TestGPURecorderEviction(unittest.TestCase): + + def test_destructive_ram_eviction(self): + """ + Verifies Stage 3.3: Destructive RAM Eviction. + Ensures the GPU Recorder physically destroys the input arrays via .pop() + to free memory, preventing OOM crashes on massive repositories. + """ + recorder = GPURecorder(version="6.3.0") + + # 1. Create the dummy arrays (passed by reference) + mock_parsed_files = [ + {"path": f"src/file_{i}.py", "lang_id": "python", "total_loc": 100, "telemetry": {}} + for i in range(5) + ] + + mock_unparsable = [ + {"path": f"bin/payload_{i}.dll", "reason": "Binary"} + for i in range(2) + ] + + # Verify they actually have data before we start + self.assertEqual(len(mock_parsed_files), 5) + self.assertEqual(len(mock_unparsable), 2) + + # 2. Execute the GPU Recorder + result = recorder.record_mission( + parsed_files=mock_parsed_files, + unparsable_files=mock_unparsable, + summary={"unparsable_files": {}}, + forensic_report={}, + repo_name="test_repo" + ) + + # ===================================================================== + # 3. INVARIANT ASSERTIONS (The Proof) + # ===================================================================== + + # A) Did it actually build the payload successfully? + self.assertIn("galaxy", result, "GPU Recorder failed to build the galaxy payload.") + self.assertTrue(len(result["galaxy"]["paths"]) == 5, "GPU Recorder missed files in the output.") + + # B) THE EVICTION CONTRACT: Are the original RAM arrays completely destroyed? 
+ self.assertEqual(len(mock_parsed_files), 0, "FATAL: GPU Recorder failed to evict parsed_files from RAM!") + self.assertEqual(len(mock_unparsable), 0, "FATAL: GPU Recorder failed to evict unparsable_files from RAM!") + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/test_language_lens.py b/tests/test_language_lens.py new file mode 100644 index 00000000..5894c5ff --- /dev/null +++ b/tests/test_language_lens.py @@ -0,0 +1,48 @@ +import unittest +from gitgalaxy.standards.language_lens import LanguageDetector + +class TestLanguageLensSecurity(unittest.TestCase): + + def test_identity_crisis_trap_blocks_disguised_payloads(self): + """ + Simulates a file attempting to mask its identity (e.g., .txt extension but a bash shebang). + Verifies that the Identity Crisis Trap catches the contradiction, strips the file's + identity to 'undeterminable', and forces it to Tier 5 (Absolute Distrust). + """ + # 1. Provide minimal mock definitions so the detector knows what 'bash' and 'txt' are + mock_lang_defs = { + "shell": { + "extensions": [".sh", ".bash"], + "shebangs": ["bash", "sh"] + }, + "plaintext": { + "extensions": [".txt", ".log"] + } + } + + # 2. Initialize the detector + detector = LanguageDetector(language_definitions=mock_lang_defs, comment_definitions={}) + + # 3. Create the malicious file: claims to be text, but acts like a shell script + mock_path = "uploads/innocent.txt" + mock_content = "#!/bin/bash\nrm -rf /\n" + + # 4. Run the inspection + result = detector.inspect(mock_path, content_sample=mock_content) + + # ===================================================================== + # 5. INVARIANT ASSERTIONS (The Proof) + # ===================================================================== + + # 1. Identity MUST be stripped to prevent downstream execution math + self.assertEqual(result["lang_id"], "undeterminable", "Detector failed to strip the identity of the masked payload!") + + # 2. 
Must be banished to Tier 5 (Absolute Distrust) + self.assertEqual(result["lock_tier"], 5, "Detector did not relegate the deceptive file to Tier 5!") + + # 3. Anomaly Flag must be registered in RAM so the SecurityLens can pick it up + self.assertTrue(len(result["anomaly_flags"]) > 0, "No anomaly flag was registered in the telemetry!") + self.assertIn("Identity Masking", result["anomaly_flags"][0], "The specific Identity Masking flag was missing.") + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/test_neural_auditor.py b/tests/test_neural_auditor.py new file mode 100644 index 00000000..dd02f200 --- /dev/null +++ b/tests/test_neural_auditor.py @@ -0,0 +1,60 @@ +import unittest +import tempfile +import struct +import os +from gitgalaxy.physics.neural_auditor import NeuralAuditor + +class TestNeuralAuditorHeaders(unittest.TestCase): + + def setUp(self): + # Build a fresh auditor before each test method (setUp runs per test, not once per class) + self.auditor = NeuralAuditor() + + def test_truncated_safetensors_file(self): + """ + Simulates a file that is too small to even contain the 8-byte + integer required by the safetensors format specification. + """ + # Create a temporary file with a .safetensors extension + with tempfile.NamedTemporaryFile(suffix='.safetensors', delete=False) as temp_file: + # Write exactly 4 bytes of garbage data (struct.unpack requires 8) + temp_file.write(b'\x01\x02\x03\x04') + temp_path = temp_file.name + + try: + # If the auditor is robust, this will NOT crash. + # It should hit the broad except block and return the safe fallback. 
+ result = self.auditor.audit_model(temp_path) + + self.assertEqual(result["architecture"], "Corrupted/Unknown") + self.assertEqual(result["parameters"], "Error") + self.assertEqual(result["quantization"], "Error") + finally: + # Always clean up the temporary file + os.remove(temp_path) + + def test_corrupted_json_header(self): + """ + Simulates a file that has a valid 8-byte size integer, + but the subsequent bytes are corrupted/invalid JSON. + """ + with tempfile.NamedTemporaryFile(suffix='.safetensors', delete=False) as temp_file: + # 1. Pack the number '10' into an 8-byte little-endian unsigned long long + header_size_bytes = struct.pack('