# File: python-sdk/codepathfinder/c_decorators.py
"""
Decorators for C security rules.

Mirrors `go_decorators.py` exactly. The only behavioural difference is the
language tag injected into dataflow IR: ``language="c"`` so the executor
scopes analysis to nodes with ``Node.Language == "c"``.

Pure ``calls()`` matchers (``type == "call_matcher"``) are NOT language-scoped,
matching the @go_rule contract — see PR-11 spec, Gap 1 / Gap 4.
"""

import atexit
import json
import sys
from typing import Callable, List
from dataclasses import dataclass


@dataclass
class CRuleMetadata:
    """Metadata for a C security rule.

    Only ``id`` is required. ``@c_rule`` fills ``name`` and ``message`` with
    derived defaults when the caller leaves them blank.
    """

    id: str
    name: str = ""
    severity: str = "MEDIUM"
    category: str = "security"
    cwe: str = ""
    cve: str = ""
    tags: str = ""
    message: str = ""
    owasp: str = ""


@dataclass
class CRuleDefinition:
    """Complete definition of a C security rule.

    Bundles the metadata, the matcher IR dict produced at decoration time and
    the original (undecorated) rule function.
    """

    metadata: CRuleMetadata
    matcher: dict
    rule_function: Callable


# Module-level registry, appended to by ``@c_rule``.
_c_rules: List[CRuleDefinition] = []
# Guards against registering the atexit hook more than once.
_auto_execute_enabled = False


def _enable_auto_execute() -> None:
    """Enable automatic rule compilation and stdout JSON output at script exit.

    Idempotent: the atexit hook is registered at most once per process.
    """
    global _auto_execute_enabled
    if _auto_execute_enabled:
        return
    _auto_execute_enabled = True

    def _output_rules():
        # Nothing registered (e.g. the registry was cleared): print nothing.
        if not _c_rules:
            return
        # Imported lazily because c_ir imports this module at load time.
        from . import c_ir

        compiled = c_ir.compile_all_rules()
        print(json.dumps(compiled))

    atexit.register(_output_rules)


def _register_rule() -> None:
    """Enable auto-execute when a rule file is run as ``__main__``.

    Must be called directly from ``decorator`` inside ``c_rule``: frame 0 is
    this function, frame 1 is ``decorator`` and frame 2 is the code that
    applied the decorator, whose module name is inspected.
    """
    frame = sys._getframe(2)
    if frame.f_globals.get("__name__") == "__main__":
        _enable_auto_execute()


def c_rule(
    id: str,
    name: str = "",
    severity: str = "MEDIUM",
    category: str = "security",
    cwe: str = "",
    cve: str = "",
    tags: str = "",
    message: str = "",
    owasp: str = "",
) -> Callable:
    """
    Decorator for C security rules. Mirrors @go_rule.

    Sets ``language="c"`` on the DataflowMatcher dict so DataflowExecutor
    scopes analysis to C functions only. Only affects flows() rules
    (``type=="dataflow"``); pure calls() rules remain language-agnostic.

    The decorated function is called once, at decoration time, and must
    return an object with ``to_ir()`` or ``to_dict()``, or a plain dict.

    Args:
        id: Rule identifier; also used in the default message and in the
            error raised for an invalid matcher.
        name: Display name. Defaults to the function name with underscores
            replaced by spaces, title-cased.
        severity, category, cwe, cve, tags, owasp: Stored on the metadata
            unchanged.
        message: Finding message. Defaults to
            ``"Security issue detected by <id>"``.

    Returns:
        A decorator that registers the rule and returns the original
        function unchanged.

    Raises:
        ValueError: If the rule function returns something that is neither a
            matcher object nor a dict.
    """

    def decorator(func: Callable) -> Callable:
        matcher_result = func()

        if hasattr(matcher_result, "to_ir"):
            matcher_dict = matcher_result.to_ir()
        elif hasattr(matcher_result, "to_dict"):
            matcher_dict = matcher_result.to_dict()
        elif isinstance(matcher_result, dict):
            matcher_dict = matcher_result
        else:
            raise ValueError(f"Rule {id} must return a matcher or dict")

        if isinstance(matcher_dict, dict) and matcher_dict.get("type") == "dataflow":
            # Tag a shallow copy rather than the caller's dict: the same dict
            # may be returned by several rule functions (or reused with
            # @cpp_rule), and writing into it would re-tag every sharer.
            matcher_dict = {**matcher_dict, "language": "c"}

        metadata = CRuleMetadata(
            id=id,
            name=name or func.__name__.replace("_", " ").title(),
            severity=severity,
            category=category,
            cwe=cwe,
            cve=cve,
            tags=tags,
            message=message or f"Security issue detected by {id}",
            owasp=owasp,
        )
        rule_def = CRuleDefinition(
            metadata=metadata,
            matcher=matcher_dict,
            rule_function=func,
        )
        _c_rules.append(rule_def)
        # Keep this call here: _register_rule inspects a fixed stack depth.
        _register_rule()

        return func

    return decorator


def get_c_rules() -> List[CRuleDefinition]:
    """Return a snapshot of registered C rules.

    The list is a shallow copy, so mutating it does not affect the registry.
    """
    return _c_rules.copy()


def clear_c_rules() -> None:
    """Clear all registered C rules (test isolation).

    Clears the list in place so that any existing reference to the registry
    object observes the reset instead of keeping a stale list.
    """
    _c_rules.clear()
# File: python-sdk/codepathfinder/c_ir.py
"""
JSON IR (Intermediate Representation) compiler for C security rules.

Mirrors `go_ir.py`. Emits ``language="c"`` in rule metadata for
display/filtering. The same field is also present inside the matcher dict
(injected by ``@c_rule``) for runtime DataflowExecutor scoping.
"""

from typing import List, Dict, Any

from .c_decorators import get_c_rules


def compile_c_rules() -> List[Dict[str, Any]]:
    """Compile all registered C rules into the JSON IR list expected by the Go executor.

    Each entry has a ``rule`` dict (id, name, lower-cased severity, cwe,
    owasp, description, ``language="c"``) and the rule's ``matcher`` dict.
    ``category``, ``cve`` and ``tags`` are not emitted.
    """
    rules = get_c_rules()
    compiled = []

    for rule in rules:
        ir = {
            "rule": {
                "id": rule.metadata.id,
                "name": rule.metadata.name,
                "severity": rule.metadata.severity.lower(),
                "cwe": rule.metadata.cwe,
                "owasp": rule.metadata.owasp,
                # Fallback only matters for metadata built without @c_rule,
                # which already fills in a default message.
                "description": rule.metadata.message
                or f"Security issue: {rule.metadata.id}",
                "language": "c",
            },
            "matcher": rule.matcher,
        }
        compiled.append(ir)

    return compiled


def compile_all_rules() -> List[Dict[str, Any]]:
    """Compile all C rules to the JSON IR array format."""
    return compile_c_rules()


# File: python-sdk/codepathfinder/cpp_decorators.py
"""
Decorators for C++ security rules.

Mirrors `c_decorators.py` / `go_decorators.py`. The only behavioural
difference is the language tag injected into dataflow IR: ``language="cpp"``
so the executor scopes analysis to nodes with ``Node.Language == "cpp"``.

Pure ``calls()`` matchers (``type == "call_matcher"``) are NOT language-scoped,
matching the @go_rule contract — see PR-11 spec, Gap 1 / Gap 4.
"""

import atexit
import json
import sys
from typing import Callable, List
from dataclasses import dataclass


@dataclass
class CppRuleMetadata:
    """Metadata for a C++ security rule.

    Only ``id`` is required. ``@cpp_rule`` fills ``name`` and ``message``
    with derived defaults when the caller leaves them blank.
    """

    id: str
    name: str = ""
    severity: str = "MEDIUM"
    category: str = "security"
    cwe: str = ""
    cve: str = ""
    tags: str = ""
    message: str = ""
    owasp: str = ""


@dataclass
class CppRuleDefinition:
    """Complete definition of a C++ security rule.

    Bundles the metadata, the matcher IR dict produced at decoration time and
    the original (undecorated) rule function.
    """

    metadata: CppRuleMetadata
    matcher: dict
    rule_function: Callable


# Module-level registry, appended to by ``@cpp_rule``.
_cpp_rules: List[CppRuleDefinition] = []
# Guards against registering the atexit hook more than once.
_auto_execute_enabled = False


def _enable_auto_execute() -> None:
    """Enable automatic rule compilation and stdout JSON output at script exit.

    Idempotent: the atexit hook is registered at most once per process.
    """
    global _auto_execute_enabled
    if _auto_execute_enabled:
        return
    _auto_execute_enabled = True

    def _output_rules():
        # Nothing registered (e.g. the registry was cleared): print nothing.
        if not _cpp_rules:
            return
        # Imported lazily because cpp_ir imports this module at load time.
        from . import cpp_ir

        compiled = cpp_ir.compile_all_rules()
        print(json.dumps(compiled))

    atexit.register(_output_rules)


def _register_rule() -> None:
    """Enable auto-execute when a rule file is run as ``__main__``.

    Must be called directly from ``decorator`` inside ``cpp_rule``: frame 0
    is this function, frame 1 is ``decorator`` and frame 2 is the code that
    applied the decorator, whose module name is inspected.
    """
    frame = sys._getframe(2)
    if frame.f_globals.get("__name__") == "__main__":
        _enable_auto_execute()


def cpp_rule(
    id: str,
    name: str = "",
    severity: str = "MEDIUM",
    category: str = "security",
    cwe: str = "",
    cve: str = "",
    tags: str = "",
    message: str = "",
    owasp: str = "",
) -> Callable:
    """
    Decorator for C++ security rules. Mirrors @go_rule / @c_rule.

    Sets ``language="cpp"`` on the DataflowMatcher dict so DataflowExecutor
    scopes analysis to C++ functions only. Only affects flows() rules
    (``type=="dataflow"``); pure calls() rules remain language-agnostic.

    The decorated function is called once, at decoration time, and must
    return an object with ``to_ir()`` or ``to_dict()``, or a plain dict.

    Args:
        id: Rule identifier; also used in the default message and in the
            error raised for an invalid matcher.
        name: Display name. Defaults to the function name with underscores
            replaced by spaces, title-cased.
        severity, category, cwe, cve, tags, owasp: Stored on the metadata
            unchanged.
        message: Finding message. Defaults to
            ``"Security issue detected by <id>"``.

    Returns:
        A decorator that registers the rule and returns the original
        function unchanged.

    Raises:
        ValueError: If the rule function returns something that is neither a
            matcher object nor a dict.
    """

    def decorator(func: Callable) -> Callable:
        matcher_result = func()

        if hasattr(matcher_result, "to_ir"):
            matcher_dict = matcher_result.to_ir()
        elif hasattr(matcher_result, "to_dict"):
            matcher_dict = matcher_result.to_dict()
        elif isinstance(matcher_result, dict):
            matcher_dict = matcher_result
        else:
            raise ValueError(f"Rule {id} must return a matcher or dict")

        if isinstance(matcher_dict, dict) and matcher_dict.get("type") == "dataflow":
            # Tag a shallow copy rather than the caller's dict: the same dict
            # may be returned by several rule functions (or reused with
            # @c_rule), and writing into it would re-tag every sharer.
            matcher_dict = {**matcher_dict, "language": "cpp"}

        metadata = CppRuleMetadata(
            id=id,
            name=name or func.__name__.replace("_", " ").title(),
            severity=severity,
            category=category,
            cwe=cwe,
            cve=cve,
            tags=tags,
            message=message or f"Security issue detected by {id}",
            owasp=owasp,
        )
        rule_def = CppRuleDefinition(
            metadata=metadata,
            matcher=matcher_dict,
            rule_function=func,
        )
        _cpp_rules.append(rule_def)
        # Keep this call here: _register_rule inspects a fixed stack depth.
        _register_rule()

        return func

    return decorator


def get_cpp_rules() -> List[CppRuleDefinition]:
    """Return a snapshot of registered C++ rules.

    The list is a shallow copy, so mutating it does not affect the registry.
    """
    return _cpp_rules.copy()


def clear_cpp_rules() -> None:
    """Clear all registered C++ rules (test isolation).

    Clears the list in place so that any existing reference to the registry
    object observes the reset instead of keeping a stale list.
    """
    _cpp_rules.clear()


# File: python-sdk/codepathfinder/cpp_ir.py
"""
JSON IR (Intermediate Representation) compiler for C++ security rules.

Mirrors `c_ir.py`. Emits ``language="cpp"`` in rule metadata for
display/filtering. The same field is also present inside the matcher dict
(injected by ``@cpp_rule``) for runtime DataflowExecutor scoping.
"""

from typing import List, Dict, Any

from .cpp_decorators import get_cpp_rules


def compile_cpp_rules() -> List[Dict[str, Any]]:
    """Compile all registered C++ rules into the JSON IR list expected by the Go executor.

    Each entry has a ``rule`` dict (id, name, lower-cased severity, cwe,
    owasp, description, ``language="cpp"``) and the rule's ``matcher`` dict.
    ``category``, ``cve`` and ``tags`` are not emitted.
    """
    rules = get_cpp_rules()
    compiled = []

    for rule in rules:
        ir = {
            "rule": {
                "id": rule.metadata.id,
                "name": rule.metadata.name,
                "severity": rule.metadata.severity.lower(),
                "cwe": rule.metadata.cwe,
                "owasp": rule.metadata.owasp,
                # Fallback only matters for metadata built without @cpp_rule,
                # which already fills in a default message.
                "description": rule.metadata.message
                or f"Security issue: {rule.metadata.id}",
                "language": "cpp",
            },
            "matcher": rule.matcher,
        }
        compiled.append(ir)

    return compiled


def compile_all_rules() -> List[Dict[str, Any]]:
    """Compile all C++ rules to the JSON IR array format."""
    return compile_cpp_rules()


# File: python-sdk/rules/c_decorators.py
"""
Backward-compatibility shim. c_decorators has moved to the codepathfinder package.
Import from: from codepathfinder.c_decorators import c_rule
"""
from codepathfinder.c_decorators import (  # noqa: F401
    CRuleMetadata,
    CRuleDefinition,
    c_rule,
    get_c_rules,
    clear_c_rules,
)


# File: python-sdk/rules/c_ir.py
"""
Backward-compatibility shim. c_ir has moved to the codepathfinder package.
"""
from codepathfinder.c_ir import (  # noqa: F401
    compile_c_rules,
    compile_all_rules,
)


# File: python-sdk/rules/cpp_decorators.py
# (This span ends inside the shim's module docstring; its import statement
# follows after this span.)
"""
Backward-compatibility shim. cpp_decorators has moved to the codepathfinder package.
"""
# File: python-sdk/rules/cpp_decorators.py (continued)
# This span starts inside the shim's module docstring, whose last line reads:
#   Import from: from codepathfinder.cpp_decorators import cpp_rule
from codepathfinder.cpp_decorators import (  # noqa: F401
    CppRuleMetadata,
    CppRuleDefinition,
    cpp_rule,
    get_cpp_rules,
    clear_cpp_rules,
)


# File: python-sdk/rules/cpp_ir.py
"""
Backward-compatibility shim. cpp_ir has moved to the codepathfinder package.
"""
from codepathfinder.cpp_ir import (  # noqa: F401
    compile_cpp_rules,
    compile_all_rules,
)


# File: python-sdk/tests/test_c_rule.py
"""Tests for the @c_rule decorator and the C IR compiler."""

import json

import pytest

from codepathfinder import calls, flows
from codepathfinder.presets import PropagationPresets
from codepathfinder.c_decorators import c_rule, get_c_rules, clear_c_rules
from codepathfinder.c_ir import compile_c_rules, compile_all_rules


@pytest.fixture(autouse=True)
def _clear_rules():
    """Reset the global rule registry around every test."""
    # Cleared before and after so no test sees rules left by another.
    clear_c_rules()
    yield
    clear_c_rules()


# ========== Decorator metadata + registration ==========


class TestCRuleDecorator:
    # Each test decorates a throwaway function and then inspects the registry.

    def test_basic_rule_registers_once(self):
        @c_rule(id="C-TEST-001", severity="HIGH", cwe="CWE-78")
        def c_command_injection():
            return calls("system", "popen")

        rules = get_c_rules()
        assert len(rules) == 1
        assert rules[0].metadata.id == "C-TEST-001"
        # Severity is stored as given; lower-casing happens in the IR compiler.
        assert rules[0].metadata.severity == "HIGH"
        assert rules[0].metadata.cwe == "CWE-78"

    def test_default_name_derived_from_func(self):
        @c_rule(id="C-TEST-002")
        def c_unsafe_string_copy():
            return calls("strcpy")

        rules = get_c_rules()
        # Underscores become spaces and the result is title-cased.
        assert rules[0].metadata.name == "C Unsafe String Copy"

    def test_explicit_name_wins(self):
        @c_rule(id="C-TEST-003", name="Override Name")
        def c_anything():
            return calls("foo")

        assert get_c_rules()[0].metadata.name == "Override Name"

    def test_full_metadata(self):
        @c_rule(
            id="C-NET-001",
            name="C SSRF",
            severity="HIGH",
            category="net",
            cwe="CWE-918",
            cve="CVE-2024-9999",
            tags="c,ssrf",
            message="User input flows to network call",
            owasp="A10:2021",
        )
        def c_ssrf():
            return flows(
                from_sources=[calls("recv")],
                to_sinks=[calls("connect")],
                propagates_through=PropagationPresets.standard(),
                scope="global",
            )

        # Every explicitly supplied field must be stored unchanged.
        meta = get_c_rules()[0].metadata
        assert meta.name == "C SSRF"
        assert meta.category == "net"
        assert meta.cve == "CVE-2024-9999"
        assert meta.tags == "c,ssrf"
        assert meta.owasp == "A10:2021"
        assert meta.message == "User input flows to network call"

    def test_default_message_when_missing(self):
        @c_rule(id="C-MSG-001")
        def c_default_msg():
            return calls("strcpy")

        assert (
            get_c_rules()[0].metadata.message
            == "Security issue detected by C-MSG-001"
        )

    def test_returns_underlying_function(self):
        @c_rule(id="C-RET-001")
        def c_identity():
            return calls("strcpy")

        # Decorator must preserve the original callable so atexit + repeated
        # invocations work the same as @go_rule.
        assert callable(c_identity)
        assert c_identity.__name__ == "c_identity"


# ========== Language injection contract ==========


class TestLanguageInjection:
    # Only matchers whose type is "dataflow" receive the language tag.

    def test_language_injected_into_dataflow(self):
        @c_rule(id="C-DF-001", severity="MEDIUM")
        def c_buffer_overflow():
            return flows(
                from_sources=[calls("gets", "scanf")],
                to_sinks=[calls("strcpy", "strcat")],
                propagates_through=PropagationPresets.standard(),
                scope="local",
            )

        matcher = get_c_rules()[0].matcher
        assert matcher["type"] == "dataflow"
        assert matcher["language"] == "c"

    def test_language_NOT_injected_for_call_matcher(self):
        """Pure calls() rules are language-agnostic — same as @go_rule."""

        @c_rule(id="C-CM-001", severity="LOW")
        def c_calls_only():
            return calls("system")

        matcher = get_c_rules()[0].matcher
        assert matcher["type"] == "call_matcher"
        assert "language" not in matcher

    def test_dict_matcher_is_passed_through(self):
        # A raw dict is accepted in place of a matcher object and is tagged
        # like any other dataflow matcher.
        @c_rule(id="C-DICT-001")
        def c_raw_dict():
            return {"type": "dataflow", "sources": [], "sinks": []}

        matcher = get_c_rules()[0].matcher
        assert matcher["language"] == "c"

    def test_invalid_matcher_raises(self):
        # The error message must name the offending rule id.
        with pytest.raises(ValueError, match="C-BAD-001"):

            @c_rule(id="C-BAD-001")
            def c_bad():
                return 42  # not a matcher / dict


# ========== c_ir.compile_c_rules ==========


class TestCIRCompiler:
    def test_compile_empty(self):
        # The autouse fixture cleared the registry, so both entry points
        # return an empty list.
        assert compile_c_rules() == []
        assert compile_all_rules() == []

    def test_compile_single_dataflow_rule(self):
        @c_rule(id="C-001", severity="CRITICAL", cwe="CWE-120", owasp="A03:2021")
        def c_buffer_overflow():
            return flows(
                from_sources=[calls("gets")],
                to_sinks=[calls("strcpy")],
                propagates_through=PropagationPresets.standard(),
                scope="global",
            )

        result = compile_c_rules()
        assert len(result) == 1

        ir = result[0]
        assert ir["rule"]["id"] == "C-001"
        # The compiler lower-cases severity.
        assert ir["rule"]["severity"] == "critical"
        assert ir["rule"]["cwe"] == "CWE-120"
        assert ir["rule"]["owasp"] == "A03:2021"
        assert ir["rule"]["language"] == "c"

        # The language tag appears both in the rule metadata and the matcher.
        matcher = ir["matcher"]
        assert matcher["type"] == "dataflow"
        assert matcher["language"] == "c"
        assert matcher["scope"] == "global"

    def test_compile_call_matcher_rule_keeps_metadata_language(self):
        """`rule.language` is "c" even when the matcher is a pure calls() one."""

        @c_rule(id="C-002", severity="HIGH")
        def c_format_string():
            return calls("printf", "sprintf")

        ir = compile_c_rules()[0]
        assert ir["rule"]["language"] == "c"
        assert ir["matcher"]["type"] == "call_matcher"
        assert "language" not in ir["matcher"]

    def test_compile_default_description_when_message_missing(self):
        # The decorator fills metadata.message with a default when blank,
        # so compile_c_rules must surface that as the IR description.
        @c_rule(id="C-NOMSG-001")
        def c_nomsg():
            return calls("strcpy")

        ir = compile_c_rules()[0]
        assert ir["rule"]["description"] == "Security issue detected by C-NOMSG-001"

    def test_compile_json_serializable(self):
        @c_rule(id="C-JSON-001", severity="HIGH")
        def c_json_round_trip():
            return flows(
                from_sources=[calls("recv")],
                to_sinks=[calls("strcpy")],
                propagates_through=PropagationPresets.standard(),
                scope="global",
            )

        # Round-trip through JSON: the compiled IR must contain only
        # JSON-serializable values.
        encoded = json.dumps(compile_c_rules())
        parsed = json.loads(encoded)
        assert parsed[0]["rule"]["language"] == "c"
        assert parsed[0]["matcher"]["language"] == "c"


# ========== Registry hygiene ==========


class TestRegistryIsolation:
    def test_clear_resets_state(self):
        @c_rule(id="C-X-001")
        def c_x():
            return calls("strcpy")

        assert len(get_c_rules()) == 1
        clear_c_rules()
        assert get_c_rules() == []

    def test_get_returns_a_copy(self):
        @c_rule(id="C-COPY-001")
        def c_copy():
            return calls("strcpy")

        # Emptying the snapshot must leave the registry itself untouched.
        snapshot = get_c_rules()
        snapshot.clear()
        assert len(get_c_rules()) == 1, "external mutation must not affect registry"
# File: python-sdk/tests/test_cpp_rule.py
"""Tests for the @cpp_rule decorator and the C++ IR compiler."""

import json

import pytest

from codepathfinder import calls, flows
from codepathfinder.presets import PropagationPresets
from codepathfinder.cpp_decorators import cpp_rule, get_cpp_rules, clear_cpp_rules
from codepathfinder.cpp_ir import compile_cpp_rules, compile_all_rules


@pytest.fixture(autouse=True)
def _clear_rules():
    """Reset the global rule registry around every test."""
    # Cleared before and after so no test sees rules left by another.
    clear_cpp_rules()
    yield
    clear_cpp_rules()


# ========== Decorator metadata + registration ==========


class TestCppRuleDecorator:
    """Registration and metadata defaults of @cpp_rule."""

    def test_basic_rule_registers_once(self):
        @cpp_rule(id="CPP-TEST-001", severity="HIGH", cwe="CWE-78")
        def cpp_command_injection():
            return calls("system", "popen")

        rules = get_cpp_rules()
        assert len(rules) == 1
        assert rules[0].metadata.id == "CPP-TEST-001"
        assert rules[0].metadata.severity == "HIGH"

    def test_default_name_derived_from_func(self):
        @cpp_rule(id="CPP-TEST-002")
        def cpp_unsafe_resource():
            return calls("fopen")

        # Underscores become spaces and the result is title-cased.
        assert get_cpp_rules()[0].metadata.name == "Cpp Unsafe Resource"

    def test_full_metadata(self):
        @cpp_rule(
            id="CPP-NET-001",
            name="C++ SSRF",
            severity="HIGH",
            category="net",
            cwe="CWE-918",
            cve="CVE-2024-9999",
            tags="cpp,ssrf",
            message="User input flows to network call",
            owasp="A10:2021",
        )
        def cpp_ssrf():
            return flows(
                from_sources=[calls("recv")],
                to_sinks=[calls("connect")],
                propagates_through=PropagationPresets.standard(),
                scope="global",
            )

        meta = get_cpp_rules()[0].metadata
        assert meta.name == "C++ SSRF"
        assert meta.cve == "CVE-2024-9999"
        assert meta.tags == "cpp,ssrf"
        assert meta.owasp == "A10:2021"

    def test_default_message_when_missing(self):
        @cpp_rule(id="CPP-MSG-001")
        def cpp_default_msg():
            return calls("strcpy")

        assert (
            get_cpp_rules()[0].metadata.message
            == "Security issue detected by CPP-MSG-001"
        )

    def test_returns_underlying_function(self):
        @cpp_rule(id="CPP-RET-001")
        def cpp_identity():
            return calls("strcpy")

        assert callable(cpp_identity)
        assert cpp_identity.__name__ == "cpp_identity"


# ========== Language injection contract ==========


class TestLanguageInjection:
    """Only matchers whose type is "dataflow" receive the language tag."""

    def test_language_injected_into_dataflow(self):
        @cpp_rule(id="CPP-DF-001")
        def cpp_resource_management():
            return flows(
                from_sources=[calls("fopen")],
                to_sinks=[calls("write")],
                propagates_through=PropagationPresets.standard(),
                scope="local",
            )

        matcher = get_cpp_rules()[0].matcher
        assert matcher["type"] == "dataflow"
        assert matcher["language"] == "cpp"
        # Critical: must NOT collide with the C decorator's tag.
        assert matcher["language"] != "c"

    def test_language_NOT_injected_for_call_matcher(self):
        @cpp_rule(id="CPP-CM-001", severity="LOW")
        def cpp_calls_only():
            return calls("system")

        matcher = get_cpp_rules()[0].matcher
        assert matcher["type"] == "call_matcher"
        assert "language" not in matcher

    def test_dict_matcher_is_passed_through(self):
        @cpp_rule(id="CPP-DICT-001")
        def cpp_raw_dict():
            return {"type": "dataflow", "sources": [], "sinks": []}

        matcher = get_cpp_rules()[0].matcher
        assert matcher["language"] == "cpp"

    def test_invalid_matcher_raises(self):
        # The error message must name the offending rule id.
        with pytest.raises(ValueError, match="CPP-BAD-001"):

            @cpp_rule(id="CPP-BAD-001")
            def cpp_bad():
                return 42  # not a matcher / dict


# ========== cpp_ir.compile_cpp_rules ==========


class TestCppIRCompiler:
    """Shape of the JSON IR produced by compile_cpp_rules."""

    def test_compile_empty(self):
        assert compile_cpp_rules() == []
        assert compile_all_rules() == []

    def test_compile_single_dataflow_rule(self):
        @cpp_rule(id="CPP-001", severity="CRITICAL", cwe="CWE-120", owasp="A03:2021")
        def cpp_buffer_overflow():
            return flows(
                from_sources=[calls("gets")],
                to_sinks=[calls("strcpy")],
                propagates_through=PropagationPresets.standard(),
                scope="global",
            )

        ir = compile_cpp_rules()[0]
        assert ir["rule"]["id"] == "CPP-001"
        # The compiler lower-cases severity.
        assert ir["rule"]["severity"] == "critical"
        assert ir["rule"]["language"] == "cpp"

        matcher = ir["matcher"]
        assert matcher["type"] == "dataflow"
        assert matcher["language"] == "cpp"

    def test_compile_default_description_when_message_missing(self):
        # Decorator already fills metadata.message with a default when blank;
        # compile_cpp_rules must surface it as the IR description.
        @cpp_rule(id="CPP-NOMSG-001")
        def cpp_nomsg():
            return calls("strcpy")

        ir = compile_cpp_rules()[0]
        assert (
            ir["rule"]["description"] == "Security issue detected by CPP-NOMSG-001"
        )

    def test_compile_json_serializable(self):
        @cpp_rule(id="CPP-JSON-001", severity="HIGH")
        def cpp_json_round_trip():
            return flows(
                from_sources=[calls("recv")],
                to_sinks=[calls("strcpy")],
                propagates_through=PropagationPresets.standard(),
                scope="global",
            )

        encoded = json.dumps(compile_cpp_rules())
        parsed = json.loads(encoded)
        assert parsed[0]["rule"]["language"] == "cpp"
        assert parsed[0]["matcher"]["language"] == "cpp"


# ========== Registry hygiene + decorator independence ==========


class TestRegistryIsolation:
    """The C++ registry is resettable, copy-on-read and separate from C's."""

    def test_clear_resets_state(self):
        @cpp_rule(id="CPP-X-001")
        def cpp_x():
            return calls("strcpy")

        assert len(get_cpp_rules()) == 1
        clear_cpp_rules()
        assert get_cpp_rules() == []

    def test_get_returns_a_copy(self):
        @cpp_rule(id="CPP-COPY-001")
        def cpp_copy():
            return calls("strcpy")

        snapshot = get_cpp_rules()
        snapshot.clear()
        assert len(get_cpp_rules()) == 1, "external mutation must not affect registry"

    def test_c_and_cpp_registries_are_independent(self):
        from codepathfinder.c_decorators import (
            c_rule,
            get_c_rules,
            clear_c_rules,
        )

        clear_c_rules()
        # The autouse fixture only resets the C++ registry, so the C registry
        # is cleaned up in ``finally``: a failing assertion must not leak a C
        # rule into later tests.
        try:

            @c_rule(id="C-INDEP-001")
            def c_only():
                return calls("strcpy")

            @cpp_rule(id="CPP-INDEP-001")
            def cpp_only():
                return calls("strcpy")

            c_rules = get_c_rules()
            cpp_rules = get_cpp_rules()
            assert len(c_rules) == 1 and c_rules[0].metadata.id == "C-INDEP-001"
            assert len(cpp_rules) == 1 and cpp_rules[0].metadata.id == "CPP-INDEP-001"

            clear_cpp_rules()
            assert len(get_c_rules()) == 1, "clear_cpp_rules must not touch C registry"
        finally:
            clear_c_rules()


# ---------------------------------------------------------------------------
# The patch continues with Go changes. The hunk below is a partial view of
# sast-engine/cmd/resolution_report.go and is carried over unedited (line
# breaks restored from the whitespace-collapsed source); it continues past
# the end of this span.
#
# diff --git a/sast-engine/cmd/resolution_report.go b/sast-engine/cmd/resolution_report.go
# index 66d01fc8..89b17bfe 100644
# --- a/sast-engine/cmd/resolution_report.go
# +++ b/sast-engine/cmd/resolution_report.go
# @@ -47,7 +47,7 @@ Use --csv to export unresolved calls with file, line, target, and reason.`,
#   fmt.Println("Building call graph...")
#   logger := output.NewLogger(output.VerbosityDefault)
# - cg, registry, _, err := callgraph.InitializeCallGraph(codeGraph, projectInput, logger)
# + cg, modReg, _, err := callgraph.InitializeCallGraph(codeGraph, projectInput, logger)
#   if err != nil {
#   fmt.Printf("Error building call graph: %v\n", err)
#   return
# @@ -88,6 +88,10 @@ Use --csv to export unresolved calls with file, line, target, and reason.`,
#   }
#   }
# + // Reuse scan.go's helper so both commands stay aligned. It gates
# + // each builder on hasLanguageNodes and merges into cg in place.
# ---------------------------------------------------------------------------
+ buildClikeCallGraphs(cg, codeGraph, projectInput, logger) + fmt.Printf("\nResolution Report for %s\n", projectInput) fmt.Println("===============================================") @@ -128,7 +132,7 @@ Use --csv to export unresolved calls with file, line, target, and reason.`, printTopUnresolvedPatterns(stats, 20) fmt.Println() - fmt.Printf("Module registry: %d modules\n", len(registry.Modules)) + fmt.Printf("Module registry: %d modules\n", len(modReg.Modules)) // Export CSV if requested if csvOutput != "" { diff --git a/sast-engine/dsl/loader.go b/sast-engine/dsl/loader.go index f6f65467..a001e654 100644 --- a/sast-engine/dsl/loader.go +++ b/sast-engine/dsl/loader.go @@ -163,6 +163,8 @@ func hasCodeAnalysisRuleDecorators(filePath string) bool { // rule files where @rule appears at the top level. return strings.Contains(fileContent, "@rule(") || strings.Contains(fileContent, "@go_rule(") || + strings.Contains(fileContent, "@c_rule(") || + strings.Contains(fileContent, "@cpp_rule(") || strings.Contains(fileContent, "from codepathfinder import") || strings.Contains(fileContent, "import codepathfinder") } diff --git a/sast-engine/graph/initialize.go b/sast-engine/graph/initialize.go index 12274298..757850f1 100644 --- a/sast-engine/graph/initialize.go +++ b/sast-engine/graph/initialize.go @@ -170,14 +170,17 @@ func Initialize(directory string, callbacks *ProgressCallbacks) *CodeGraph { close(resultChan) }() - // Collect results + // Collect results. + // Each worker already populated edge.From.OutgoingEdges via localGraph.AddEdge, + // and node pointers are shared across local/global graphs, so we transfer the + // edge structs without re-attaching them — calling codeGraph.AddEdge here would + // double every entry in OutgoingEdges and break callers that walk it (e.g. the + // C/C++ call-graph builders). 
for localGraph := range resultChan { for _, node := range localGraph.Nodes { codeGraph.AddNode(node) } - for _, edge := range localGraph.Edges { - codeGraph.AddEdge(edge.From, edge.To) - } + codeGraph.Edges = append(codeGraph.Edges, localGraph.Edges...) } // Resolve transitive inheritance for Python classes. diff --git a/sast-engine/graph/initialize_test.go b/sast-engine/graph/initialize_test.go index 9d19a7ec..5f5a5def 100644 --- a/sast-engine/graph/initialize_test.go +++ b/sast-engine/graph/initialize_test.go @@ -631,3 +631,141 @@ func TestInitializeWithPythonFileReadError(t *testing.T) { t.Errorf("OnProgress should be called once for unreadable Python file, got %d", progressCalls) } } + +// TestInitialize_NoDuplicateOutgoingEdges guards against a regression where +// the result-collection step in Initialize re-attached every per-file edge to +// its source node (calling codeGraph.AddEdge on edges already inserted by the +// worker). That doubled OutgoingEdges and surfaced as 2× detections on every +// rule that walked the call graph (PR-07/08 C/C++ builders, etc.). +// +// The test parses a tiny C source containing two distinct calls inside one +// function and asserts that the function node ends up with exactly two +// OutgoingEdges, not four. 
+func TestInitialize_NoDuplicateOutgoingEdges(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "test_no_dup_edges") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + src := `void f(const char *s) { + strcpy(0, s); + system(s); +}` + if err := os.WriteFile(filepath.Join(tmpDir, "main.c"), []byte(src), 0644); err != nil { + t.Fatalf("write c source: %v", err) + } + + g := Initialize(tmpDir, nil) + if g == nil { + t.Fatal("Initialize returned nil") + } + + var fn *Node + for _, n := range g.Nodes { + if n != nil && n.Language == "c" && n.Type == "function_definition" && n.Name == "f" { + fn = n + break + } + } + if fn == nil { + t.Fatal("expected function_definition for f") + } + + if got := len(fn.OutgoingEdges); got != 2 { + t.Fatalf("expected 2 outgoing edges (one per distinct call), got %d", got) + } + + seen := map[string]int{} + for _, e := range fn.OutgoingEdges { + if e == nil || e.To == nil { + t.Fatalf("nil edge or destination on f") + } + key := e.To.Name + "@" + e.To.ID + seen[key]++ + } + for k, c := range seen { + if c != 1 { + t.Errorf("edge %s seen %d times, expected exactly 1", k, c) + } + } +} + +// TestInitialize_PreservesDistinctSameLineCalls guards the dedup fix's +// non-regression contract: when several calls live on the same line — either +// distinct targets (`printf("%s", strdup(s));`) or the same target nested +// (`strcpy(a, strcpy(b, c))`) — every call site must remain visible. The fix +// removes a duplicate edge that was attached twice to the same OutgoingEdges +// slice; it must not collapse genuinely distinct sites that happen to share a +// line number. 
+func TestInitialize_PreservesDistinctSameLineCalls(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "test_same_line_calls") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + src := `void f(const char *src) { + char a[16], b[16], c[16]; + printf("%s", strdup(src)); + strcpy(c, strcpy(a, b)); + memcpy(strcat(a, strdup(src)), b, 4); +}` + if err := os.WriteFile(filepath.Join(tmpDir, "main.c"), []byte(src), 0644); err != nil { + t.Fatalf("write c source: %v", err) + } + + g := Initialize(tmpDir, nil) + if g == nil { + t.Fatal("Initialize returned nil") + } + + var fn *Node + for _, n := range g.Nodes { + if n != nil && n.Language == "c" && n.Type == "function_definition" && n.Name == "f" { + fn = n + break + } + } + if fn == nil { + t.Fatal("expected function_definition for f") + } + + // Line 3: printf(...) + strdup(...) — 2 calls + // Line 4: strcpy(c, ...) outer + strcpy(a, b) inner — 2 calls of the same target + // Line 5: memcpy(...) + strcat(...) + strdup(...) — 3 calls + // Total: 7 syntactic call sites. + const expectedTotal = 7 + if got := len(fn.OutgoingEdges); got != expectedTotal { + t.Fatalf("expected %d outgoing edges across all same-line calls, got %d", expectedTotal, got) + } + + byTarget := map[string]int{} + byLine := map[uint32]int{} + for _, e := range fn.OutgoingEdges { + if e == nil || e.To == nil { + t.Fatalf("nil edge or destination on f") + } + byTarget[e.To.Name]++ + byLine[e.To.LineNumber]++ + } + + // Both nested strcpys on line 4 must survive (same target, same line). + if got := byTarget["strcpy"]; got != 2 { + t.Errorf("expected 2 strcpy edges on line 4 (outer + nested), got %d", got) + } + // Distinct targets on line 3 must both survive. + if got := byTarget["printf"]; got != 1 { + t.Errorf("expected 1 printf edge on line 3, got %d", got) + } + // strdup appears once on line 3 and once on line 5 — both must survive. 
+ if got := byTarget["strdup"]; got != 2 { + t.Errorf("expected 2 strdup edges (line 3 + line 5), got %d", got) + } + // Three distinct calls on the multi-call line must all survive. + for _, n := range []string{"memcpy", "strcat"} { + if got := byTarget[n]; got != 1 { + t.Errorf("expected 1 %s edge on the 3-call line, got %d", n, got) + } + } +}