Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions gitgalaxy/standards/language_lens.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,13 @@ def inspect(

# We explicitly add .txt and .log to the prose override list
if ext in {'.md', '.mdx', '.rst', '.rtf', '.txt', '.log'}:
# ---> THE FIX: Catch disguised payloads before the early exit! <---
shebang_lang = self._tier_2_fingerprint_check(content_sample, ext)
if shebang_lang and shebang_lang != "undeterminable":
self.logger.warning(f"[{name}] IDENTITY CRISIS: Prose Ext '{ext}' contradicts Shebang '{shebang_lang}'")
result["anomaly_flags"].append(f"Identity Masking: Prose Extension ({ext}) vs Shebang ({shebang_lang})")
return self._forge_result("undeterminable", 0.0, 5, f"Identity Conflict ({ext} != {shebang_lang})", result, content_sample)

target_id = "markdown" if ext in {'.md', '.mdx'} else "plaintext"
return self._forge_result(target_id, self.thresholds.get("PROSE_CONFIDENCE", 0.95), 1, f"Prose Extension ({ext})", result, content_sample)

Expand Down
73 changes: 73 additions & 0 deletions tests/test_chronometer_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import unittest
import time
from pathlib import Path
from unittest.mock import MagicMock, patch

# Adjust the import path if necessary based on your actual module structure
from gitgalaxy.physics.chronometer import Chronometer

class TestChronometerTimeout(unittest.TestCase):
    """Timeout handling of ``Chronometer._run_git_stream_escalator``."""

    @patch('gitgalaxy.physics.chronometer.subprocess.Popen')
    @patch.object(Chronometer, '_calibrate_temporal_field')  # Skip the heavy init sequence
    def test_zombie_process_kill_switch(self, mock_calibrate, mock_popen):
        """
        Simulates an infinite, hanging Git stream.

        Verifies that the escalator gives up once the timeout elapses and then
        cleans up the child process: ``kill()``, ``communicate()`` and a
        ``close()`` on both pipes must each be called exactly once.
        """
        # 1. Setup the infinite stream
        def infinite_git_log():
            while True:
                # Yield well-formed lines so the escalator has real work to do
                yield "mock_hash|1700000000|TestAuthor\n"
                yield "src/safe_file.py\n"

        # The generator is attached to a MagicMock so .close() can be asserted on later
        mock_stdout = MagicMock()
        mock_stdout.__iter__.return_value = infinite_git_log()
        mock_stderr = MagicMock()

        # 2. Setup the mock process returned by the patched Popen
        mock_process = MagicMock()
        mock_process.stdout = mock_stdout
        mock_process.stderr = mock_stderr
        mock_popen.return_value = mock_process

        # 3. Initialize Chronometer (calibration is bypassed by the patch above)
        chrono = Chronometer(Path("/mock/repo"))

        # 4. Run the escalator with a tiny timeout (50 ms)
        timeout_limit = 0.05
        # NOTE(review): start_time is handed to the escalator, which presumably
        # compares it against time.time() -- keep the wall clock here; confirm.
        start_time = time.time()

        processed_lines, reached_target = chrono._run_git_stream_escalator(
            cmd=["git", "log", "mock_args"],
            ignored_hashes=set(),
            tracked_files=set(),
            required_files=10000,
            timeout_limit=timeout_limit,
            start_time=start_time,
        )

        # =====================================================================
        # 5. INVARIANT ASSERTIONS (The Proof)
        # =====================================================================

        # Some lines were processed, but the timeout cut the run short.
        # assertGreater reports the actual count on failure, unlike assertTrue(x > 0).
        self.assertGreater(processed_lines, 0, "The stream should have processed lines before timing out.")
        self.assertFalse(reached_target, "The escalator should have aborted before reaching the file target.")

        # --- THE ZOMBIE KILL SWITCH VERIFICATION ---
        # The child process must have been killed...
        mock_process.kill.assert_called_once()

        # ...and then reaped (kill() without communicate()/wait() leaves a zombie)
        mock_process.communicate.assert_called_once()

        # Both pipe objects must be closed to prevent file-descriptor leaks
        mock_stdout.close.assert_called_once()
        mock_stderr.close.assert_called_once()

if __name__ == '__main__':
unittest.main()
51 changes: 51 additions & 0 deletions tests/test_gpu_recorder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import unittest
from gitgalaxy.recorders.gpu_recorder import GPURecorder

class TestGPURecorderEviction(unittest.TestCase):
    """Checks that ``GPURecorder.record_mission`` empties the lists it is given."""

    def test_destructive_ram_eviction(self):
        """
        Verifies Stage 3.3: Destructive RAM Eviction.

        After ``record_mission`` returns, the caller's ``parsed_files`` and
        ``unparsable_files`` lists must be empty, while the returned payload
        still accounts for every parsed file.
        """
        recorder = GPURecorder(version="6.3.0")

        # 1. Create the dummy lists (lists are passed by reference, so any
        #    in-place eviction is observable from here afterwards)
        mock_parsed_files = [
            {"path": f"src/file_{i}.py", "lang_id": "python", "total_loc": 100, "telemetry": {}}
            for i in range(5)
        ]

        mock_unparsable = [
            {"path": f"bin/payload_{i}.dll", "reason": "Binary"}
            for i in range(2)
        ]

        # Sanity check: the inputs really hold data before we start
        self.assertEqual(len(mock_parsed_files), 5)
        self.assertEqual(len(mock_unparsable), 2)

        # 2. Execute the GPU Recorder
        result = recorder.record_mission(
            parsed_files=mock_parsed_files,
            unparsable_files=mock_unparsable,
            summary={"unparsable_files": {}},
            forensic_report={},
            repo_name="test_repo",
        )

        # =====================================================================
        # 3. INVARIANT ASSERTIONS (The Proof)
        # =====================================================================

        # A) The payload was built and contains every parsed file.
        #    assertEqual reports the actual count on failure, unlike assertTrue(a == b).
        self.assertIn("galaxy", result, "GPU Recorder failed to build the galaxy payload.")
        self.assertEqual(len(result["galaxy"]["paths"]), 5, "GPU Recorder missed files in the output.")

        # B) THE EVICTION CONTRACT: the caller's original lists are now empty
        self.assertEqual(len(mock_parsed_files), 0, "FATAL: GPU Recorder failed to evict parsed_files from RAM!")
        self.assertEqual(len(mock_unparsable), 0, "FATAL: GPU Recorder failed to evict unparsable_files from RAM!")

if __name__ == '__main__':
unittest.main()
48 changes: 48 additions & 0 deletions tests/test_language_lens.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import unittest
from gitgalaxy.standards.language_lens import LanguageDetector

class TestLanguageLensSecurity(unittest.TestCase):
    """Security checks for ``LanguageDetector.inspect``."""

    def test_identity_crisis_trap_blocks_disguised_payloads(self):
        """
        Simulates a file masking its identity: a ``.txt`` extension with a bash shebang.

        Verifies that the detector reports the contradiction by returning
        ``lang_id == "undeterminable"`` at ``lock_tier == 5`` and by recording
        an "Identity Masking" anomaly flag.
        """
        # 1. Minimal mock definitions covering a shell language and plaintext
        mock_lang_defs = {
            "shell": {
                "extensions": [".sh", ".bash"],
                "shebangs": ["bash", "sh"],
            },
            "plaintext": {
                "extensions": [".txt", ".log"],
            },
        }

        # 2. Initialize the detector
        detector = LanguageDetector(language_definitions=mock_lang_defs, comment_definitions={})

        # 3. Create the malicious file: claims to be text, but starts with a shell shebang
        mock_path = "uploads/innocent.txt"
        mock_content = "#!/bin/bash\nrm -rf /\n"

        # 4. Run the inspection
        result = detector.inspect(mock_path, content_sample=mock_content)

        # =====================================================================
        # 5. INVARIANT ASSERTIONS (The Proof)
        # =====================================================================

        # 1. Identity MUST be stripped
        self.assertEqual(result["lang_id"], "undeterminable", "Detector failed to strip the identity of the masked payload!")

        # 2. Must be banished to Tier 5 (Absolute Distrust)
        self.assertEqual(result["lock_tier"], 5, "Detector did not relegate the deceptive file to Tier 5!")

        # 3. An anomaly flag must be recorded in the result.
        #    assertGreater reports the observed length on failure, unlike assertTrue(len(x) > 0).
        self.assertGreater(len(result["anomaly_flags"]), 0, "No anomaly flag was registered in the telemetry!")
        self.assertIn("Identity Masking", result["anomaly_flags"][0], "The specific Identity Masking flag was missing.")

if __name__ == '__main__':
unittest.main()
60 changes: 60 additions & 0 deletions tests/test_neural_auditor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import unittest
import tempfile
import struct
import os
from gitgalaxy.physics.neural_auditor import NeuralAuditor

class TestNeuralAuditorHeaders(unittest.TestCase):
    """Malformed ``.safetensors`` headers must produce the fallback report, not an exception."""

    def setUp(self):
        # unittest runs setUp before every test method, so each test gets its own auditor
        self.auditor = NeuralAuditor()

    def _make_model_file(self, payload):
        """Write *payload* to a temporary ``.safetensors`` file and return its path.

        The file is registered for removal via ``addCleanup`` so it is deleted
        whether the test passes, fails or errors.
        """
        with tempfile.NamedTemporaryFile(suffix='.safetensors', delete=False) as handle:
            handle.write(payload)
        model_path = handle.name
        self.addCleanup(os.remove, model_path)
        return model_path

    def test_truncated_safetensors_file(self):
        """
        A file too short to hold the 8-byte header-length integer that the
        safetensors format starts with.
        """
        # Only 4 bytes of garbage: an 8-byte struct.unpack cannot succeed on this
        model_path = self._make_model_file(b'\x01\x02\x03\x04')

        # A robust auditor must not raise here; it is expected to return its
        # safe fallback report instead.
        report = self.auditor.audit_model(model_path)

        self.assertEqual(report["architecture"], "Corrupted/Unknown")
        self.assertEqual(report["parameters"], "Error")
        self.assertEqual(report["quantization"], "Error")

    def test_corrupted_json_header(self):
        """
        A file with a valid 8-byte size integer followed by bytes that are
        not valid JSON.
        """
        # The number 10 as an 8-byte little-endian unsigned long long,
        # followed by exactly 10 bytes of un-parsable JSON
        declared_size = struct.pack('<Q', 10)
        model_path = self._make_model_file(declared_size + b'{GARBAGE_}')

        # Decoding the header as JSON should fail inside the auditor, and that
        # failure must be handled there rather than propagate to the caller.
        report = self.auditor.audit_model(model_path)

        self.assertEqual(report["architecture"], "Corrupted/Unknown")

if __name__ == '__main__':
unittest.main()
74 changes: 74 additions & 0 deletions tests/test_redos_poison.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import unittest
import time
from gitgalaxy.core.detector import LogicSplicer
from gitgalaxy.core.prism import Prism

class TestReDoSPoisoning(unittest.TestCase):
    """Timing guards: pathological inputs must not make the parsers hang."""

    def test_logic_splicer_balanced_end_pathological(self):
        """
        Simulates a pathologically nested file (60,000 unbalanced opening braces).

        Verifies that ``_find_balanced_end`` returns an index no greater than
        50,000 and finishes in under half a second.
        """
        # Initialize with a dummy language config
        splicer = LogicSplicer("cpp", {})

        # A massive string of entirely unbalanced opening braces
        poison_string = "{" * 60000

        # perf_counter is monotonic and high-resolution, so the measured
        # duration cannot be skewed by wall-clock adjustments.
        started = time.perf_counter()
        # The search is expected to hit HANDSHAKE_LOOKAHEAD_LIMIT and exit cleanly
        end_idx = splicer._find_balanced_end(poison_string, 0, "{", "}")
        duration = time.perf_counter() - started

        # 1. Did it respect the lookahead limit? (Should stop at 50,000 max)
        self.assertLessEqual(end_idx, 50000, "Splicer failed to clamp the lookahead limit!")

        # 2. Did it finish quickly? (Generous 0.5 s bound to avoid flakiness on slow CI)
        self.assertLess(duration, 0.5, f"LogicSplicer hung for {duration:.2f} seconds!")

    def test_prism_nested_peel_limit(self):
        """
        Simulates a pathologically nested block comment (Rust/Swift style).

        Verifies that ``_refract_nested`` returns in under half a second for
        2,000 nested openers; NESTED_PEEL_LIMIT is expected to bound the loop.
        """
        comment_defs = {
            "mechanical_families": {
                "nested_c": {
                    "delimiters": ["//", "/*", "*/"]
                }
            }
        }
        prism = Prism(comment_definitions=comment_defs, language_definitions={})

        # 2,000 nested opening block comments and a single closer
        poison_comment = "/* " * 2000 + " */"

        started = time.perf_counter()
        # Only the elapsed time matters here, so the return value is deliberately discarded
        prism._refract_nested(poison_comment)
        duration = time.perf_counter() - started

        # If the peel limit works, this exits almost instantly instead of peeling every layer
        self.assertLess(duration, 0.5, f"Prism nested peel hung for {duration:.2f} seconds!")

    def test_prism_refract_unbalanced_quotes(self):
        """
        Simulates massive unbalanced braces and escaped quotes sent to ``refract``.

        Verifies that the main entry point finishes in under half a second and
        is not trapped by the escaped quotes.
        """
        prism = Prism(comment_definitions={}, language_definitions={})

        # 10,000 repetitions of `{\" ` -- 40,000 characters holding 10,000
        # unbalanced braces and 10,000 escaped quotes
        poison_string = "{\\\" " * 10000

        started = time.perf_counter()
        # Go through the main entry point; the return value is deliberately
        # discarded because only the elapsed time is asserted on
        prism.refract(poison_string, "cpp")
        duration = time.perf_counter() - started

        # Even a character-by-character scan should handle 40,000 characters in milliseconds
        self.assertLess(duration, 0.5, f"Prism refract hung for {duration:.2f} seconds!")

if __name__ == '__main__':
unittest.main()
36 changes: 36 additions & 0 deletions tests/test_zero_dependency.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,42 @@ def test_fallback_does_not_crash_signal_processor(self):

except TypeError as e:
self.fail(f"Zero-Dependency Mode crashed the Signal Processor! Error: {e}")

@patch('gitgalaxy.security.security_auditor.ML_AVAILABLE', False)
def test_fallback_does_not_crash_security_auditor(self):
    """
    Simulates a user running GalaxyScope without 'xgboost' or 'pandas' installed.

    With ``ML_AVAILABLE`` patched to ``False``, ``audit_galaxy`` must not
    raise, must return the same number of stars, must not flag an ML threat,
    and must still attach a ``dependency_network`` entry to each star.
    """
    # Imported inside the test so the module is only loaded once the patch is active
    from gitgalaxy.security.security_auditor import SecurityAuditor

    # 1. Initialize the blinded auditor
    auditor = SecurityAuditor(model_path="dummy_path.json")

    # 2. Create a mock star
    mock_stars = [{
        "path": "src/safe_file.py",
        "telemetry": {},
        "raw_imports": ["src/other_file.py"]
    }]

    # 3. Force the auditor to process the stars. Only this call is guarded:
    #    AssertionError is a subclass of Exception, so keeping the assertions
    #    inside the try would misreport a failed invariant as a crash.
    try:
        result_stars = auditor.audit_galaxy(mock_stars)
    except Exception as e:
        self.fail(f"Zero-Dependency Mode crashed the Security Auditor! Error: {e}")

    # 4. INVARIANT ASSERTIONS
    self.assertEqual(len(result_stars), 1, "Auditor should return the exact same number of stars.")

    # Verify the ML threat assessment was safely skipped
    self.assertFalse(result_stars[0].get("is_ml_threat", False), "Blinded auditor should not flag ML threats.")

    # Verify the dependency graph mapping still ran without the ML stack
    self.assertIn("dependency_network", result_stars[0], "Auditor failed to run the dependency graph resolution fallback.")

if __name__ == '__main__':
unittest.main()
Loading