diff --git a/.github/actions/codeclone/action.yml b/.github/actions/codeclone/action.yml index c429947..2016ea6 100644 --- a/.github/actions/codeclone/action.yml +++ b/.github/actions/codeclone/action.yml @@ -10,6 +10,16 @@ branding: color: blue inputs: + python-version: + description: "Python version to use" + required: false + default: "3.13" + + package-version: + description: "CodeClone version from PyPI (empty = latest)" + required: false + default: "" + path: description: "Path to the project root" required: false @@ -20,20 +30,50 @@ inputs: required: false default: "true" + no-progress: + description: "Disable progress output" + required: false + default: "true" + + require-baseline: + description: "Fail if codeclone.baseline.json is missing" + required: false + default: "true" + runs: using: composite steps: + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: ${{ inputs.python-version }} + cache: pip + - name: Install CodeClone shell: bash run: | python -m pip install --upgrade pip - pip install codeclone + if [ -n "${{ inputs.package-version }}" ]; then + pip install "codeclone==${{ inputs.package-version }}" + else + pip install codeclone + fi + + - name: Verify baseline exists + if: ${{ inputs.require-baseline == 'true' }} + shell: bash + run: | + test -f "${{ inputs.path }}/codeclone.baseline.json" - name: Run CodeClone shell: bash run: | + extra="" + if [ "${{ inputs.no-progress }}" = "true" ]; then + extra="--no-progress" + fi if [ "${{ inputs.fail-on-new }}" = "true" ]; then - codeclone "${{ inputs.path }}" --fail-on-new + codeclone "${{ inputs.path }}" --fail-on-new $extra else - codeclone "${{ inputs.path }}" - fi \ No newline at end of file + codeclone "${{ inputs.path }}" $extra + fi diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..70c76d2 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,74 @@ +name: tests + +on: + push: + branches: [ "**" ] + pull_request: + +permissions: + contents: read + +concurrency: + group: tests-${{ github.ref }} + cancel-in-progress: true + +jobs: + test: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: [ "3.10", "3.11", "3.12", "3.13", "3.14" ] + steps: + - name: Checkout + uses: actions/checkout@v6.0.2 + + - name: Set up Python + uses: actions/setup-python@v6.2.0 + with: + python-version: ${{ matrix.python-version }} + allow-prereleases: true + + - name: Set up uv + uses: astral-sh/setup-uv@v5 + with: + enable-cache: true + + - name: Install dependencies + run: uv sync --all-extras --dev + + - name: Run tests + run: uv run pytest --cov=codeclone --cov-report=term-missing --cov-fail-under=98 + + - name: Verify baseline exists + if: ${{ matrix.python-version == '3.13' }} + run: test -f codeclone.baseline.json + + - name: Check for new clones vs baseline + if: ${{ matrix.python-version == '3.13' }} + run: uv run codeclone . --fail-on-new --no-progress + + lint: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v6.0.2 + + - name: Set up Python + uses: actions/setup-python@v6.2.0 + with: + python-version: "3.13" + + - name: Set up uv + uses: astral-sh/setup-uv@v5 + with: + enable-cache: true + + - name: Install dependencies + run: uv sync --all-extras --dev + + - name: Ruff + run: uv run ruff check . + + - name: Mypy + run: uv run mypy . diff --git a/CHANGELOG.md b/CHANGELOG.md index 334d6fa..e86fef2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,71 +1,186 @@ # Changelog +## [1.2.1] - 2026-02-02 + +### Overview + +This release focuses on security hardening, robustness, and long-term maintainability. +No breaking API changes were introduced. + +The goal of this release is to provide users with a safe, deterministic, and CI-friendly +tool suitable for security-sensitive and large-scale environments. + +### Security & Robustness + +- **Path Traversal Protection** + Implemented strict path validation to prevent scanning outside the project root or + accessing sensitive system directories, including macOS `/private` paths. + +- **Cache Integrity Protection** + Added HMAC-SHA256 signing for cache files to prevent cache poisoning and detect tampering. + +- **Parser Safety Limits** + Introduced AST parsing time limits to mitigate risks from pathological or adversarial inputs. + +- **Resource Exhaustion Protection** + Enforced a maximum file size limit (10MB) and a maximum file count per scan to prevent + excessive memory or CPU usage. + +- **Structured Error Handling** + Introduced a dedicated exception hierarchy (`ParseError`, `CacheError`, etc.) and replaced + broad exception handling with graceful, user-friendly failure reporting. + +### Performance Improvements + +- **Optimized AST Normalization** + Replaced expensive `deepcopy` operations with in-place AST normalization, significantly + reducing CPU and memory overhead. + +- **Improved Memory Efficiency** + Added an LRU cache for file reading and optimized string concatenation during fingerprint + generation. + +- **HTML Report Memory Bounds** + HTML reports now read only the required line ranges instead of entire files, reducing peak + memory usage on large codebases. + +### Architecture & Maintainability + +- **Strict Type Safety** + Migrated all optional typing to Python 3.10+ `| None` syntax and achieved 100% `mypy` strict + compliance. + +- **Modular CFG Design** + Split CFG data structures and builder logic into separate modules (`cfg_model.py` and + `cfg.py`) for improved clarity and extensibility. + +- **Template Extraction** + Extracted HTML templates into a dedicated `templates.py` module. + +- Added a `py.typed` marker for downstream type checkers. +- Added `__slots__` to performance-critical classes to reduce per-object memory overhead. + +### CLI & User Experience + +- Added a sequential execution fallback when process pools are unavailable (for example, in + restricted or sandboxed environments). +- Emit clear, user-visible warnings when cache validation fails instead of silently ignoring + corrupted state. +- Hardened HTML report template to safely embed JavaScript template literals and aligned it + with linting requirements. + +### Testing & Quality + +- Expanded unit and integration test coverage across the CLI, CFG construction, cache + handling, scanner, and HTML reporting paths. +- Added security regression tests for dot-dot traversal and symlinked sensitive directories. +- Tightened cache mismatch assertions to verify full state reset. +- Achieved and enforced 98%+ line coverage, with coverage configuration added to + `pyproject.toml`. +- Added GitHub Actions workflow with Python 3.10–3.14 test matrix, including `ruff` and + `mypy` checks. +- CI baseline enforcement now runs on a single pinned Python version to avoid AST dump + differences across interpreter versions. + +### Python Version Consistency for Baseline Checks + +Due to inherent differences in Python’s AST between interpreter versions, baseline +generation and verification must be performed using the same Python version. + +The baseline file now stores the Python version (`major.minor`) used during generation. +When running with `--fail-on-new`, codeclone verifies that the current interpreter version +matches the baseline and exits with code 2 if they differ. + +This design ensures deterministic and reproducible clone detection results while preserving +support for Python 3.10–3.14 across the test matrix. + +### Fixed + +- **CFG Exception Handling** + Fixed incorrect control-flow linking for `try`/`except` blocks. + +- **Pattern Matching Support** + Added missing structural handling for `match`/`case` statements in the CFG. + +- **Block Detection Scaling** + Made `MIN_LINE_DISTANCE` dynamic based on block size to improve clone detection accuracy + across differently sized functions. + +--- + ## [1.2.0] - 2026-02-02 ### BREAKING CHANGES -- **CLI Arguments**: Renamed output flags for brevity and consistency: +- **CLI Arguments** + Renamed output flags for brevity and consistency: - `--json-out` → `--json` - `--text-out` → `--text` - `--html-out` → `--html` - `--cache` → `--cache-dir` -- **Baseline Behavior**: - - The default baseline file location has changed from `~/.config/codeclone/baseline.json` to - `./codeclone.baseline.json`. This encourages committing the baseline file to the repository, simplifying CI/CD - integration. - - The CLI now warns if a baseline file is expected but missing (unless `--update-baseline` is used). + +- **Baseline Behavior** + - The default baseline file location changed from + `~/.config/codeclone/baseline.json` to `./codeclone.baseline.json`. + - The CLI now warns if a baseline file is expected but missing (unless + `--update-baseline` is used). ### Added -- **Detection Engine**: - - **Deep CFG Analysis**: Added support for constructing control flow graphs for `try`/`except`/`finally`, `with`/ - `async with`, and `match`/`case` (Python 3.10+) statements. The tool now analyzes the internal structure of these - blocks instead of treating them as opaque statements. - - **Normalization**: Implemented normalization for Augmented Assignments. Code using `x += 1` is now detected as a - clone of `x = x + 1`. -- **Rich Output**: Integrated `rich` library for professional CLI output, including: - - Color-coded status messages (Success/Warning/Error). - - Progress bars and spinners for long-running tasks. +- **Detection Engine** + - Deep CFG analysis for `try`/`except`/`finally`, `with`/`async with`, and + `match`/`case` (Python 3.10+) statements. + - Normalization for augmented assignments (`x += 1` vs `x = x + 1`). + +- **Rich Output** + - Color-coded status messages. + - Progress indicators for long-running tasks. - Formatted summary tables. -- **CI/CD Improvements**: Clearer separation of arguments in `--help` output (Target, Tuning, Baseline, Reporting). + +- **CI/CD Improvements** + - Clearer argument grouping in `--help` output. ### Improved -- **Baseline**: Enhanced `Baseline` class with safer JSON loading (error handling for corrupted files), better typing ( - using `set` instead of `Set`), and cleaner API for creating instances (`from_groups` accepts path). -- **Cache**: Refactored `Cache` to handle corrupted cache files gracefully by starting fresh instead of crashing. - Updated typing to modern standards. -- **Normalization**: Added `copy.deepcopy` to AST normalization to prevent side effects on the original AST nodes during - fingerprinting. This ensures the AST remains intact for any subsequent operations. -- **Typing**: General typing improvements across `report.py` and other modules to align with Python 3.10+ practices. +- **Baseline** + - Safer JSON loading. + - Improved typing and cleaner construction API. + +- **Cache** + - Graceful recovery from corrupted cache files. + - Updated typing to modern Python standards. + +- **Typing** + - General typing improvements across reporting and normalization modules. + +--- -## [1.1.0] — 2026-01-19 +## [1.1.0] - 2026-01-19 ### Added -- Control Flow Graph (CFG v1) for structural clone detection -- Deterministic CFG-based function fingerprints -- Interactive HTML report with syntax highlighting -- Dark/light theme toggle in HTML report -- Block-level clone visualization +- Control Flow Graph (CFG v1) for structural clone detection. +- Deterministic CFG-based function fingerprints. +- Interactive HTML report with syntax highlighting. +- Block-level clone visualization. ### Changed -- Function clone detection now based on CFG instead of pure AST -- Improved robustness against refactoring and control-flow changes +- Function clone detection now based on CFG instead of pure AST. +- Improved robustness against refactoring and control-flow changes. ### Documentation -- Added `docs/cfg.md` with CFG semantics and limitations -- Added `docs/architecture.md` describing system design +- Added `docs/cfg.md` with CFG semantics and limitations. +- Added `docs/architecture.md` describing system design. --- -## [1.0.0] — 2026-01-17 +## [1.0.0] - 2026-01-17 ### Initial release -- AST-based function clone detection -- Block-level clone detection (Type-3-lite) -- Baseline workflow for CI -- JSON and text reports +- AST-based function clone detection. +- Block-level clone detection (Type-3-lite). +- Baseline workflow for CI. +- JSON and text reports. diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 405ef7e..7987e01 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,83 +1,89 @@ # Contributing to CodeClone -Thank you for your interest in contributing to **CodeClone** 🙌 -CodeClone is an **AST + CFG-based code clone detector** focused on architectural duplication, not textual similarity. +Thank you for your interest in contributing to **CodeClone**. -Contributions are welcome — especially those that improve **signal quality**, **CFG semantics**, and **real-world -usability**. +CodeClone is an **AST + CFG-based code clone detector** focused on architectural duplication, +not textual similarity. + +Contributions are welcome — especially those that improve **signal quality**, **CFG semantics**, +and **real-world usability**. --- -## 🧭 Project Philosophy +## Project Philosophy -Before contributing, please understand the core principles: +Before contributing, please understand the core principles of the project: - **Low noise over high recall** - **Structural and control-flow similarity**, not semantic equivalence - **Deterministic and explainable behavior** - Optimized for **CI usage and architectural analysis** -If a change increases false positives or reduces explainability, it is unlikely to be accepted. +If a change increases false positives or reduces explainability, +it is unlikely to be accepted. --- -## 🧩 Areas Open for Contribution +## Areas Open for Contribution -We especially welcome contributions in: +We especially welcome contributions in the following areas: -- CFG construction and semantics +- Control Flow Graph (CFG) construction and semantics - AST normalization improvements - False-positive reduction - HTML report UX improvements - Performance optimizations -- Documentation and examples +- Documentation and real-world examples --- -## 🐞 Reporting Bugs +## Reporting Bugs Please use the appropriate **GitHub Issue Template**. -When reporting bugs related to clone detection: +When reporting bugs related to clone detection, include: -- provide **minimal reproducible code snippets** -- specify whether the issue is: - - AST-related - - CFG-related - - reporting/UI-related +- minimal reproducible code snippets; +- the Python version used; +- whether the issue is primarily: + - AST-related, + - CFG-related, + - reporting / UI-related. -Screenshots alone are usually insufficient. +Screenshots alone are usually insufficient for analysis. --- -## ⚠️ False Positives +## False Positives + +False positives are **expected edge cases**, not necessarily bugs. -False positives are **expected edge cases**, not failures. +When reporting a false positive: -If reporting a false positive: +- explain **why the detected code is architecturally distinct**; +- avoid arguments based solely on naming, comments, or formatting; +- focus on **control-flow, responsibilities, or structural differences**. -- explain **why the detected code is architecturally distinct** -- avoid arguments based solely on naming or comments -- focus on **control-flow or responsibility differences** +Well-argued false-positive reports are valuable and appreciated. --- -## 🧠 CFG Semantics Discussions +## CFG Semantics Discussions -CFG behavior is intentionally conservative in v1.x. +CFG behavior in CodeClone is intentionally conservative in the 1.x series. -If proposing CFG changes: +If proposing changes to CFG semantics, please include: -- describe current behavior -- describe desired behavior -- explain impact on clone detection quality -- include code examples +- a description of the current behavior; +- the proposed new behavior; +- the expected impact on clone detection quality; +- concrete code examples. -These discussions often require design-level decisions and may be staged across versions. +Such changes often require design-level discussion and may be staged across versions. --- -## 🧪 Development Setup +## Development Setup ```bash git clone https://github.com/orenlab/codeclone.git @@ -96,32 +102,35 @@ pytest Static checks: ```bash -mypy codeclone +mypy +ruff check . +ruff format . ``` --- -## 🧹 Code Style +## Code Style - Python 3.10+ - Type annotations are required -- `mypy --strict` must pass -- Prefer explicit logic over cleverness +- `mypy` must pass +- `ruff check` must pass +- Code must be formatted with `ruff format` +- Prefer explicit, readable logic over clever or implicit constructs --- -## 📦 Versioning +## Versioning CodeClone follows **semantic versioning**: -- `MAJOR`: fundamental detection model changes -- `MINOR`: new detection capabilities (e.g. CFG improvements) -- `PATCH`: bug fixes and UI improvements +- **MAJOR**: fundamental detection model changes +- **MINOR**: new detection capabilities (for example, CFG improvements) +- **PATCH**: bug fixes, performance improvements, and UI/UX polish --- -## 📜 License - -By contributing, you agree that your contributions will be licensed under the **MIT License**. +## License -Thank you for helping improve CodeClone ❤️ +By contributing to CodeClone, you agree that your contributions will be licensed +under the **MIT License**. diff --git a/README.md b/README.md index 62b4e75..21b7cc9 100644 --- a/README.md +++ b/README.md @@ -2,10 +2,11 @@ [![PyPI](https://img.shields.io/pypi/v/codeclone.svg)](https://pypi.org/project/codeclone/) [![Downloads](https://img.shields.io/pypi/dm/codeclone.svg)](https://pypi.org/project/codeclone/) +[![tests](https://github.com/orenlab/codeclone/actions/workflows/tests.yml/badge.svg?branch=main)](https://github.com/orenlab/codeclone/actions/workflows/tests.yml) [![Python](https://img.shields.io/pypi/pyversions/codeclone.svg)](https://pypi.org/project/codeclone/) [![License](https://img.shields.io/pypi/l/codeclone.svg)](LICENSE) -**CodeClone** is a Python code clone detector based on **normalized AST and control-flow graphs (CFG)**. +**CodeClone** is a Python code clone detector based on **normalized Python AST and Control Flow Graphs (CFG)**. It helps teams discover architectural duplication and prevent new copy-paste from entering the codebase via CI. CodeClone is designed to help teams: @@ -14,15 +15,16 @@ CodeClone is designed to help teams: - identify architectural hotspots, - prevent *new* duplication via CI and pre-commit hooks. -Unlike token- or text-based tools, CodeClone operates on **normalized Python AST and CFG**, making it robust against renaming, -formatting, and minor refactoring. +Unlike token- or text-based tools, CodeClone operates on **normalized Python AST and CFG**, making it robust against +renaming, formatting, and minor refactoring. --- ## Why CodeClone? Most existing tools detect *textual* duplication. -CodeClone detects **structural and block-level duplication**, which usually signals missing abstractions or architectural drift. +CodeClone detects **structural and block-level duplication**, which usually signals missing abstractions or +architectural drift. Typical use cases: @@ -40,11 +42,11 @@ Typical use cases: - Detects functions and methods with identical **control-flow structure**. - Based on **Control Flow Graph (CFG)** fingerprinting. - Robust to: - - variable renaming, - - constant changes, - - attribute renaming, - - formatting differences, - - docstrings and type annotations. + - variable renaming, + - constant changes, + - attribute renaming, + - formatting differences, + - docstrings and type annotations. - Ideal for spotting architectural duplication across layers. ### Block-level clone detection (Type-3-lite) @@ -52,29 +54,29 @@ Typical use cases: - Detects repeated **statement blocks** inside larger functions. - Uses sliding windows over CFG-normalized statement sequences. - Targets: - - validation blocks, - - guard clauses, - - repeated orchestration logic. + - validation blocks, + - guard clauses, + - repeated orchestration logic. - Carefully filtered to reduce noise: - - no overlapping windows, - - no clones inside the same function, - - no `__init__` noise, - - size and statement-count thresholds. + - no overlapping windows, + - no clones inside the same function, + - no `__init__` noise, + - size and statement-count thresholds. ### Control-Flow Awareness (CFG v1) - Each function is converted into a **Control Flow Graph**. - CFG nodes contain normalized AST statements. - CFG edges represent structural control flow: - - `if` / `else` - - `for` / `async for` / `while` - - `try` / `except` / `finally` - - `with` / `async with` - - `match` / `case` (Python 3.10+) + - `if` / `else` + - `for` / `async for` / `while` + - `try` / `except` / `finally` + - `with` / `async with` + - `match` / `case` (Python 3.10+) - Current CFG semantics (v1): - - `break` and `continue` are treated as statements (no jump targets), - - after-blocks are explicit and always present, - - focus is on **structural similarity**, not precise runtime semantics. + - `break` and `continue` are treated as statements (no jump targets), + - after-blocks are explicit and always present, + - focus is on **structural similarity**, not precise runtime semantics. This design keeps clone detection **stable, deterministic, and low-noise**. @@ -83,6 +85,7 @@ This design keeps clone detection **stable, deterministic, and low-noise**. - AST + CFG normalization instead of token matching. - Conservative defaults tuned for real-world Python projects. - Explicit thresholds for size and statement count. +- No probabilistic scoring or heuristic similarity thresholds. - Focus on *architectural duplication*, not micro-similarities. ### CI-friendly baseline mode @@ -149,14 +152,26 @@ Commit the generated baseline file to the repository. ### 2. Use in CI ```bash -codeclone . --fail-on-new +codeclone . --fail-on-new --no-progress ``` Behavior: -- ✅ existing clones are allowed, -- ❌ build fails if *new* clones appear, -- ✅ refactoring that removes duplication is always allowed. +- existing clones are allowed, +- the build fails if *new* clones appear, +- refactoring that removes duplication is always allowed. + +`--fail-on-new` exits with a non-zero code when new clones are detected. + +### Python Version Consistency for Baseline Checks + +Due to inherent differences in Python’s AST between interpreter versions, baseline +generation and verification must be performed using the same Python version. + +This ensures deterministic and reproducible clone detection results. + +CI checks therefore pin baseline verification to a single Python version, while the +test matrix continues to validate compatibility across Python 3.10–3.14. --- @@ -164,14 +179,14 @@ Behavior: ```yaml repos: -- repo: local + - repo: local hooks: - - id: codeclone + - id: codeclone name: CodeClone entry: codeclone language: python - args: [".", "--fail-on-new"] - types: [python] + args: [ ".", "--fail-on-new" ] + types: [ python ] ``` --- @@ -204,6 +219,7 @@ repos: 6. Apply conservative filters to suppress noise. See the architectural overview: + - [docs/architecture.md](docs/architecture.md) --- @@ -216,6 +232,7 @@ to improve structural clone detection robustness. The CFG is a **structural abstraction**, not a runtime execution model. See full design and semantics: + - [docs/cfg.md](docs/cfg.md) --- diff --git a/SECURITY.md b/SECURITY.md index 74778e7..533843a 100644 --- a/SECURITY.md +++ b/SECURITY.md @@ -2,70 +2,79 @@ ## Supported Versions -CodeClone is a static analysis tool and does not process untrusted input at runtime in production systems. +CodeClone is a static analysis tool and does not execute analyzed code at runtime. +Nevertheless, security and robustness are treated as first‑class concerns. -That said, we take security seriously. +The following versions currently receive security updates: | Version | Supported | |---------|-----------| -| 1.1.x | ✅ Yes | -| 1.0.x | ❌ No | +| 1.2.x | Yes | +| 1.1.x | No | +| 1.0.x | No | --- -## 🛡️ Security Considerations +## Security Considerations -CodeClone: +CodeClone operates purely on static input and follows a conservative execution model: -- Parses Python source code using `ast` +- Parses Python source code using the standard `ast` module - Does **not** execute analyzed code -- Generates static HTML reports +- Performs analysis in-process with explicit resource limits +- Generates static HTML reports without external dependencies Potential risk areas include: -- malformed source files -- extremely large inputs (resource exhaustion) -- HTML report generation +- malformed or adversarial source files +- extremely large inputs leading to resource exhaustion +- HTML report generation and embedding + +These areas are explicitly tested and hardened, but are still the primary focus of +ongoing security review. --- -## 🚨 Reporting a Vulnerability +## Reporting a Vulnerability -If you believe you have found a security vulnerability, **do not open a public issue**. +If you believe you have discovered a security vulnerability, **do not open a public issue**. -Instead, please report it privately: +Please report it privately via email: -📧 **Email:** `pytelemonbot@mail.ru` -Subject: `Security issue in CodeClone` +**Email:** `pytelemonbot@mail.ru` +**Subject:** `Security issue in CodeClone` -Please include: +When reporting a vulnerability, please include: -- CodeClone version -- Description of the issue -- Steps to reproduce -- Potential impact +- the affected CodeClone version +- a clear description of the issue +- minimal steps to reproduce +- an assessment of potential impact, if known You will receive an acknowledgment within **72 hours**. --- -## 🧪 What Is NOT Considered a Security Issue +## What Is Not Considered a Security Issue + +The following issues are **not** considered security vulnerabilities: -The following are **not** security vulnerabilities: +- false positives or false negatives in clone detection +- performance limitations on very large codebases +- UI or HTML layout issues +- missing CFG edge cases or semantic limitations -- False positives or negatives in clone detection -- Performance issues on very large codebases -- UI / HTML layout problems -- Missing CFG edge cases (tracked as feature issues) +Such issues should be reported through the regular issue tracker as bugs or feature +requests. --- -## 🕒 Disclosure Policy +## Disclosure Policy -- Valid vulnerabilities will be fixed promptly -- A patched release will be published -- Credit will be given unless anonymity is requested +- Confirmed vulnerabilities will be addressed promptly +- A patched release will be published as soon as feasible +- Credit will be given to the reporter unless anonymity is requested --- -Thank you for helping keep CodeClone safe and reliable. \ No newline at end of file +Thank you for helping keep CodeClone secure, reliable, and trustworthy. diff --git a/codeclone.baseline.json b/codeclone.baseline.json new file mode 100644 index 0000000..ce9a169 --- /dev/null +++ b/codeclone.baseline.json @@ -0,0 +1,8 @@ +{ + "functions": [ + "23353998d062bbdf37c345cbe5256b3f5686d956|0-19", + "7d573fa56fb11050f1642f18ca4bb3225e11e194|0-19" + ], + "blocks": [], + "python_version": "3.13" +} \ No newline at end of file diff --git a/codeclone/__init__.py b/codeclone/__init__.py index 901a59b..4cd6ae8 100644 --- a/codeclone/__init__.py +++ b/codeclone/__init__.py @@ -6,7 +6,7 @@ Licensed under the MIT License. """ -from importlib.metadata import version, PackageNotFoundError +from importlib.metadata import PackageNotFoundError, version try: __version__ = version("codeclone") diff --git a/codeclone/baseline.py b/codeclone/baseline.py index ecdadc4..74f2030 100644 --- a/codeclone/baseline.py +++ b/codeclone/baseline.py @@ -9,14 +9,19 @@ from __future__ import annotations import json +from collections.abc import Mapping from pathlib import Path +from typing import Any class Baseline: + __slots__ = ("blocks", "functions", "path", "python_version") + def __init__(self, path: str | Path): self.path = Path(path) self.functions: set[str] = set() self.blocks: set[str] = set() + self.python_version: str | None = None def load(self) -> None: if not self.path.exists(): @@ -26,6 +31,10 @@ def load(self) -> None: data = json.loads(self.path.read_text("utf-8")) self.functions = set(data.get("functions", [])) self.blocks = set(data.get("blocks", [])) + python_version = data.get("python_version") + self.python_version = ( + python_version if isinstance(python_version, str) else None + ) except json.JSONDecodeError as e: raise ValueError(f"Corrupted baseline file at {self.path}: {e}") from e @@ -33,10 +42,7 @@ def save(self) -> None: self.path.parent.mkdir(parents=True, exist_ok=True) self.path.write_text( json.dumps( - { - "functions": sorted(self.functions), - "blocks": sorted(self.blocks), - }, + _baseline_payload(self.functions, self.blocks, self.python_version), indent=2, ensure_ascii=False, ), @@ -45,14 +51,34 @@ def save(self) -> None: @staticmethod def from_groups( - func_groups: dict, block_groups: dict, path: str | Path = "" - ) -> "Baseline": + func_groups: Mapping[str, object], + block_groups: Mapping[str, object], + path: str | Path = "", + python_version: str | None = None, + ) -> Baseline: bl = Baseline(path) bl.functions = set(func_groups.keys()) bl.blocks = set(block_groups.keys()) + bl.python_version = python_version return bl - def diff(self, func_groups: dict, block_groups: dict) -> tuple[set, set]: + def diff( + self, func_groups: Mapping[str, object], block_groups: Mapping[str, object] + ) -> tuple[set[str], set[str]]: new_funcs = set(func_groups.keys()) - self.functions new_blocks = set(block_groups.keys()) - self.blocks return new_funcs, new_blocks + + +def _baseline_payload( + functions: set[str], + blocks: set[str], + python_version: str | None, +) -> dict[str, Any]: + payload: dict[str, Any] = { + "functions": sorted(functions), + "blocks": sorted(blocks), + } + if python_version: + payload["python_version"] = python_version + return payload diff --git a/codeclone/blockhash.py b/codeclone/blockhash.py index c304acf..bd213ee 100644 --- a/codeclone/blockhash.py +++ b/codeclone/blockhash.py @@ -11,7 +11,7 @@ import ast import hashlib -from .normalize import NormalizationConfig, AstNormalizer +from .normalize import AstNormalizer, NormalizationConfig def stmt_hash(stmt: ast.stmt, cfg: NormalizationConfig) -> str: diff --git a/codeclone/blocks.py b/codeclone/blocks.py index cfdc6db..551d243 100644 --- a/codeclone/blocks.py +++ b/codeclone/blocks.py @@ -15,7 +15,7 @@ from .normalize import NormalizationConfig -@dataclass(frozen=True) +@dataclass(frozen=True, slots=True) class BlockUnit: block_hash: str filepath: str @@ -42,7 +42,8 @@ def extract_blocks( blocks: list[BlockUnit] = [] last_start: int | None = None - MIN_LINE_DISTANCE = 5 # suppress overlapping windows + # Allow some overlap (50%), but at least 3 lines apart + min_line_distance = max(block_size // 2, 3) for i in range(len(stmt_hashes) - block_size + 1): start = getattr(body[i], "lineno", None) @@ -50,7 +51,7 @@ def extract_blocks( if not start or not end: continue - if last_start is not None and start - last_start < MIN_LINE_DISTANCE: + if last_start is not None and start - last_start < min_line_distance: continue bh = "|".join(stmt_hashes[i : i + block_size]) diff --git a/codeclone/cache.py b/codeclone/cache.py index 410f464..f652d17 100644 --- a/codeclone/cache.py +++ b/codeclone/cache.py @@ -8,47 +8,178 @@ from __future__ import annotations +import hashlib +import hmac import json import os +import secrets +from collections.abc import Mapping from dataclasses import asdict from pathlib import Path -from typing import Any, Optional +from typing import TYPE_CHECKING, Any, TypedDict, cast + +if TYPE_CHECKING: + from .blocks import BlockUnit + from .extractor import Unit + +from .errors import CacheError + + +class FileStat(TypedDict): + mtime_ns: int + size: int + + +class UnitDict(TypedDict): + qualname: str + filepath: str + start_line: int + end_line: int + loc: int + stmt_count: int + fingerprint: str + loc_bucket: str + + +class BlockDict(TypedDict): + block_hash: str + filepath: str + qualname: str + start_line: int + end_line: int + size: int + + +class CacheEntry(TypedDict): + stat: FileStat + units: list[UnitDict] + blocks: list[BlockDict] + + +class CacheData(TypedDict): + version: str + files: dict[str, CacheEntry] class Cache: + __slots__ = ("data", "load_warning", "path", "secret") + CACHE_VERSION = "1.0" + def __init__(self, path: str | Path): self.path = Path(path) - self.data: dict[str, Any] = {"files": {}} + self.data: CacheData = {"version": self.CACHE_VERSION, "files": {}} + self.secret = self._load_secret() + self.load_warning: str | None = None - def load(self) -> None: - if self.path.exists(): + def _load_secret(self) -> bytes: + """Load or create cache signing secret.""" + # Store secret in the same directory as the cache file, named .cache_secret + # If cache is at ~/.cache/codeclone/cache.json, secret is + # ~/.cache/codeclone/.cache_secret + secret_path = self.path.parent / ".cache_secret" + if secret_path.exists(): + return secret_path.read_bytes() + else: + secret = secrets.token_bytes(32) try: - self.data = json.loads(self.path.read_text("utf-8")) - except json.JSONDecodeError: - # If cache is corrupted, start fresh - self.data = {"files": {}} + self.path.parent.mkdir(parents=True, exist_ok=True) + secret_path.write_bytes(secret) + # Set restrictive permissions on secret file (Unix only) + if os.name == "posix": + secret_path.chmod(0o600) + except OSError: + pass + return secret + + def _sign_data(self, data: Mapping[str, Any]) -> str: + """Create HMAC signature of cache data.""" + # Sort keys for deterministic JSON serialization + data_str = json.dumps(data, sort_keys=True) + return hmac.new(self.secret, data_str.encode(), hashlib.sha256).hexdigest() + + def load(self) -> None: + if not self.path.exists(): + return + + try: + raw = json.loads(self.path.read_text("utf-8")) + stored_sig = raw.get("_signature") + + # Extract data without signature for verification + data = {k: v for k, v in raw.items() if k != "_signature"} + + # Verify signature + expected_sig = self._sign_data(data) + if stored_sig != expected_sig: + self.load_warning = "Cache signature mismatch; ignoring cache." + self.data = {"version": self.CACHE_VERSION, "files": {}} + return + + if data.get("version") != self.CACHE_VERSION: + self.load_warning = ( + "Cache version mismatch " + f"(found {data.get('version')}); ignoring cache." + ) + self.data = {"version": self.CACHE_VERSION, "files": {}} + return + + # Basic structure check + if not isinstance(data.get("files"), dict): + self.load_warning = "Cache format invalid; ignoring cache." + self.data = {"version": self.CACHE_VERSION, "files": {}} + return + + self.data = cast(CacheData, data) + self.load_warning = None + + except (json.JSONDecodeError, ValueError): + self.load_warning = "Cache corrupted; ignoring cache." + self.data = {"version": self.CACHE_VERSION, "files": {}} def save(self) -> None: - self.path.parent.mkdir(parents=True, exist_ok=True) - self.path.write_text( - json.dumps(self.data, ensure_ascii=False, indent=2), - "utf-8", - ) + try: + self.path.parent.mkdir(parents=True, exist_ok=True) + + # Add signature + data_with_sig = {**self.data, "_signature": self._sign_data(self.data)} + + self.path.write_text( + json.dumps(data_with_sig, ensure_ascii=False, indent=2), + "utf-8", + ) + except OSError as e: + raise CacheError(f"Failed to save cache: {e}") from e + + def get_file_entry(self, filepath: str) -> CacheEntry | None: + entry = self.data["files"].get(filepath) + + if entry is None: + return None + + if not isinstance(entry, dict): + return None + + required = {"stat", "units", "blocks"} + if not required.issubset(entry.keys()): + return None - def get_file_entry(self, filepath: str) -> Optional[dict[str, Any]]: - return self.data.get("files", {}).get(filepath) + return entry def put_file_entry( - self, filepath: str, stat_sig: dict[str, Any], units: list, blocks: list + self, + filepath: str, + stat_sig: FileStat, + units: list[Unit], + blocks: list[BlockUnit], ) -> None: - self.data.setdefault("files", {})[filepath] = { + self.data["files"][filepath] = { "stat": stat_sig, - "units": [asdict(u) for u in units], - "blocks": [asdict(b) for b in blocks], + "units": cast(list[UnitDict], cast(object, [asdict(u) for u in units])), + "blocks": cast(list[BlockDict], cast(object, [asdict(b) for b in blocks])), } -def file_stat_signature(path: str) -> dict: +def file_stat_signature(path: str) -> FileStat: st = os.stat(path) return { "mtime_ns": st.st_mtime_ns, diff --git a/codeclone/cfg.py b/codeclone/cfg.py index 167f5f0..9235a7f 100644 --- a/codeclone/cfg.py +++ b/codeclone/cfg.py @@ -9,48 +9,21 @@ from __future__ import annotations import ast -from dataclasses import dataclass, field -from typing import Iterable +from collections.abc import Iterable +from typing import Protocol, cast +from .cfg_model import CFG, Block -# ========================= -# Core CFG structures -# ========================= - - -@dataclass(eq=False) -class Block: - id: int - statements: list[ast.stmt] = field(default_factory=list) - successors: set["Block"] = field(default_factory=set) - is_terminated: bool = False - - def add_successor(self, block: Block) -> None: - self.successors.add(block) - - def __hash__(self) -> int: - return hash(self.id) +__all__ = ["CFG", "CFGBuilder"] - def __eq__(self, other: object) -> bool: - return isinstance(other, Block) and self.id == other.id +TryStar = getattr(ast, "TryStar", ast.Try) -@dataclass -class CFG: - qualname: str - blocks: list[Block] = field(default_factory=list) - - entry: Block = field(init=False) - exit: Block = field(init=False) - - def __post_init__(self) -> None: - self.entry = self.create_block() - self.exit = self.create_block() - - def create_block(self) -> Block: - block = Block(id=len(self.blocks)) - self.blocks.append(block) - return block +class _TryLike(Protocol): + body: list[ast.stmt] + handlers: list[ast.ExceptHandler] + orelse: list[ast.stmt] + finalbody: list[ast.stmt] # ========================= @@ -59,6 +32,8 @@ def create_block(self) -> Block: class CFGBuilder: + __slots__ = ("cfg", "current") + def __init__(self) -> None: self.cfg: CFG self.current: Block @@ -110,8 +85,10 @@ def _visit(self, stmt: ast.stmt) -> None: case ast.AsyncFor(): self._visit_for(stmt) # Structure is identical to For - case ast.Try() | ast.TryStar(): - self._visit_try(stmt) + case ast.Try(): + self._visit_try(cast(_TryLike, stmt)) + case _ if TryStar is not None and isinstance(stmt, TryStar): + self._visit_try(cast(_TryLike, stmt)) case ast.With() | ast.AsyncWith(): self._visit_with(stmt) @@ -185,7 +162,8 @@ def _visit_for(self, stmt: ast.For | ast.AsyncFor) -> None: self.current = after_block def _visit_with(self, stmt: ast.With | ast.AsyncWith) -> None: - # Treat WITH as linear flow (enter -> body -> exit), but preserve block structure + # Treat WITH as linear flow (enter -> body -> exit), but preserve + # block structure # We record the context manager expression in the current block # Then we enter a new block for the body (to separate it structurally) # Then we enter a new block for 'after' (exit) @@ -210,126 +188,73 @@ def _visit_with(self, stmt: ast.With | ast.AsyncWith) -> None: self.current = after_block - def _visit_try(self, stmt: ast.Try | ast.TryStar) -> None: - # Simplified Try CFG: - # Try Body -> [Handlers...] -> Finally/After - # Try Body -> Else -> Finally/After - - try_block = self.cfg.create_block() - self.current.add_successor(try_block) - - # We don't know WHERE in the try block exception happens, so we assume - # any point in try block *could* jump to handlers. - # But for structural hashing, we just process the body. - # Ideally, we should link the try_block (or its end) to handlers? - # A simple approximation: - # 1. Process body. - # 2. Link entry (or end of body) to handlers? - # Let's do: Entry -> BodyBlock. - # Entry -> HandlerBlocks (to represent potential jump). - - # Actually, let's keep it linear but branched. - # Current -> TryBody - # Current -> Handlers (Abstractly representing the jump) + def _visit_try(self, stmt: _TryLike) -> None: + try_entry = self.cfg.create_block() + self.current.add_successor(try_entry) + self.current = try_entry handlers_blocks = [self.cfg.create_block() for _ in stmt.handlers] else_block = self.cfg.create_block() if stmt.orelse else None - final_block = self.cfg.create_block() # This is finally or after + final_block = self.cfg.create_block() - # Link current to TryBody - self.current = try_block - self._visit_statements(stmt.body) + # Process each statement in try body + # Link each to exception handlers + for stmt_node in stmt.body: + if self.current.is_terminated: + break + + # Current statement could raise exception + for h_block in handlers_blocks: + self.current.add_successor(h_block) + + self._visit(stmt_node) - # If try body finishes successfully: + # Normal exit from try if not self.current.is_terminated: if else_block: self.current.add_successor(else_block) else: self.current.add_successor(final_block) - # Handle Else - if else_block: - self.current = else_block - self._visit_statements(stmt.orelse) - if not self.current.is_terminated: - self.current.add_successor(final_block) - - # Handle Handlers - # We assume control flow *could* jump from start of Try to any handler - # (Technically from inside try, but we model structural containment) - # To make fingerprints stable, we just need to ensure handlers are visited - # and linked. - - # We link the *original* predecessor (before try) or the try_block start to handlers? - # Let's link the `try_block` (as a container concept) to handlers. - # But `try_block` was mutated by `_visit_statements`. - # Let's use the `try_block` (start of try) to link to handlers. - for h_block in handlers_blocks: - try_block.add_successor(h_block) - - for handler, h_block in zip(stmt.handlers, handlers_blocks): + # Process handlers + for handler, h_block in zip(stmt.handlers, handlers_blocks, strict=True): self.current = h_block - # Record exception type if handler.type: self.current.statements.append(ast.Expr(value=handler.type)) + self._visit_statements(handler.body) if not self.current.is_terminated: self.current.add_successor(final_block) - # Finally logic: - # If there is a finally block, `final_block` IS the finally block. - # We visit it. Then we create a new `after_finally` block? - # Or `final_block` is the start of finally. + # Process else + if else_block: + self.current = else_block + self._visit_statements(stmt.orelse) + if not self.current.is_terminated: + self.current.add_successor(final_block) + # Process finally + self.current = final_block if stmt.finalbody: - self.current = final_block self._visit_statements(stmt.finalbody) - # And then continue to next code? - # Yes, finally flows to next statement. - # Unless terminated. - - # If no finally, `final_block` is just the merge point (after). - self.current = final_block def _visit_match(self, stmt: ast.Match) -> None: - # Match subject -> Cases -> After - self.current.statements.append(ast.Expr(value=stmt.subject)) - after_block = self.cfg.create_block() - - for case_ in stmt.cases: - case_block = self.cfg.create_block() - self.current.add_successor(case_block) - - # Save current context to restore for next case branching? - # No, 'current' is the match subject block. It branches to ALL cases. - - # Visit Case - # We must set self.current to case_block for visiting body - # But we lose reference to 'match subject block' to link next case! - # So we need a variable `subject_block`. - pass - - # Re-implementing loop correctly subject_block = self.current + after_block = self.cfg.create_block() for case_ in stmt.cases: case_block = self.cfg.create_block() subject_block.add_successor(case_block) self.current = case_block - # We could record the pattern here? - # patterns are complex AST nodes. For now, let's skip pattern structure hash - # and just hash the body. Or dump pattern as statement? - # Pattern is not a statement. - # Let's ignore pattern details for V1, or try to normalize it. - # If we ignore pattern, then `case []:` and `case {}:` look same. - # Ideally: `self.current.statements.append(case_.pattern)` but pattern is not stmt. - # We can wrap in Expr? `ast.Expr(value=case_.pattern)`? - # Pattern is NOT an Expr subclass in 3.10. It's `ast.pattern`. - # So we cannot append it to `statements` list which expects `ast.stmt`. - # We will ignore pattern structure for now (it's structural flow we care about). + + # Record pattern structure + pattern_repr = ast.dump(case_.pattern, annotate_fields=False) + self.current.statements.append( + ast.Expr(value=ast.Constant(value=f"PATTERN:{pattern_repr}")) + ) self._visit_statements(case_.body) if not self.current.is_terminated: diff --git a/codeclone/cfg_model.py b/codeclone/cfg_model.py new file mode 100644 index 0000000..a90d576 --- /dev/null +++ b/codeclone/cfg_model.py @@ -0,0 +1,47 @@ +""" +CodeClone — AST and CFG-based code clone detector for Python +focused on architectural duplication. + +Copyright (c) 2026 Den Rozhnovskiy +Licensed under the MIT License. +""" + +from __future__ import annotations + +import ast +from dataclasses import dataclass, field + + +@dataclass(eq=False, slots=True) +class Block: + id: int + statements: list[ast.stmt] = field(default_factory=list) + successors: set[Block] = field(default_factory=set) + is_terminated: bool = False + + def add_successor(self, block: Block) -> None: + self.successors.add(block) + + def __hash__(self) -> int: + return hash(self.id) + + def __eq__(self, other: object) -> bool: + return isinstance(other, Block) and self.id == other.id + + +@dataclass(slots=True) +class CFG: + qualname: str + blocks: list[Block] = field(default_factory=list) + + entry: Block = field(init=False) + exit: Block = field(init=False) + + def __post_init__(self) -> None: + self.entry = self.create_block() + self.exit = self.create_block() + + def create_block(self) -> Block: + block = Block(id=len(self.blocks)) + self.blocks.append(block) + return block diff --git a/codeclone/cli.py b/codeclone/cli.py index 250a278..677dbf8 100644 --- a/codeclone/cli.py +++ b/codeclone/cli.py @@ -1,36 +1,32 @@ -""" -CodeClone — AST and CFG-based code clone detector for Python -focused on architectural duplication. - -Copyright (c) 2026 Den Rozhnovskiy -Licensed under the MIT License. -""" - from __future__ import annotations import argparse +import os import sys from concurrent.futures import ProcessPoolExecutor, as_completed +from dataclasses import asdict, dataclass from pathlib import Path +from typing import Any, cast from rich.console import Console from rich.panel import Panel from rich.progress import ( + BarColumn, Progress, SpinnerColumn, TextColumn, - BarColumn, TimeElapsedColumn, ) from rich.table import Table from rich.theme import Theme from .baseline import Baseline -from .cache import Cache, file_stat_signature +from .cache import Cache, CacheEntry, FileStat, file_stat_signature +from .errors import CacheError from .extractor import extract_units_from_source from .html_report import build_html_report from .normalize import NormalizationConfig -from .report import build_groups, build_block_groups, to_json, to_text +from .report import build_block_groups, build_groups, to_json_report, to_text from .scanner import iter_py_files, module_name_from_path # Custom theme for Rich @@ -45,6 +41,21 @@ ) console = Console(theme=custom_theme, width=200) +MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB +BATCH_SIZE = 100 + + +@dataclass(slots=True) +class ProcessingResult: + """Result of processing a single file.""" + + filepath: str + success: bool + error: str | None = None + units: list[Any] | None = None + blocks: list[Any] | None = None + stat: FileStat | None = None + def expand_path(p: str) -> Path: return Path(p).expanduser().resolve() @@ -56,31 +67,76 @@ def process_file( cfg: NormalizationConfig, min_loc: int, min_stmt: int, -) -> tuple[str, dict, list, list] | None: +) -> ProcessingResult: + """ + Process a single Python file with comprehensive error handling. + + Args: + filepath: Absolute path to the file + root: Root directory of the scan + cfg: Normalization configuration + min_loc: Minimum lines of code to consider a function + min_stmt: Minimum statements to consider a function + + Returns: + ProcessingResult object indicating success/failure and containing + extracted units/blocks if successful. + """ + try: - source = Path(filepath).read_text("utf-8") - except UnicodeDecodeError: - return None - - stat = file_stat_signature(filepath) - module_name = module_name_from_path(root, filepath) - - units, blocks = extract_units_from_source( - source=source, - filepath=filepath, - module_name=module_name, - cfg=cfg, - min_loc=min_loc, - min_stmt=min_stmt, - ) + # Check file size + try: + st_size = os.path.getsize(filepath) + if st_size > MAX_FILE_SIZE: + return ProcessingResult( + filepath=filepath, + success=False, + error=f"File too large: {st_size} bytes (max {MAX_FILE_SIZE})", + ) + except OSError as e: + return ProcessingResult( + filepath=filepath, success=False, error=f"Cannot stat file: {e}" + ) - return filepath, stat, units, blocks + try: + source = Path(filepath).read_text("utf-8") + except UnicodeDecodeError as e: + return ProcessingResult( + filepath=filepath, success=False, error=f"Encoding error: {e}" + ) + + stat = file_stat_signature(filepath) + module_name = module_name_from_path(root, filepath) + + units, blocks = extract_units_from_source( + source=source, + filepath=filepath, + module_name=module_name, + cfg=cfg, + min_loc=min_loc, + min_stmt=min_stmt, + ) + + return ProcessingResult( + filepath=filepath, + success=True, + units=units, + blocks=blocks, + stat=stat, + ) + + except Exception as e: + return ProcessingResult( + filepath=filepath, + success=False, + error=f"Unexpected error: {type(e).__name__}: {e}", + ) -def print_banner(): +def print_banner() -> None: console.print( Panel.fit( - "[bold white]CodeClone[/bold white] [dim]v1.2.0[/dim]\n" + "[bold white]CodeClone[/bold white] [dim]v1.2.1[/dim]\n" "[italic]Architectural duplication detector[/italic]", border_style="blue", padding=(0, 2), @@ -185,9 +241,13 @@ def main() -> None: print_banner() - root_path = Path(args.root).resolve() - if not root_path.exists(): - console.print(f"[error]Root path does not exist: {root_path}[/error]") + try: + root_path = Path(args.root).resolve() + if not root_path.exists(): + console.print(f"[error]Root path does not exist: {root_path}[/error]") + sys.exit(1) + except Exception as e: + console.print(f"[error]Invalid root path: {e}[/error]") sys.exit(1) console.print(f"[info]Scanning root:[/info] {root_path}") @@ -197,101 +257,213 @@ def main() -> None: cache_path = Path(args.cache_dir).expanduser() cache = Cache(cache_path) cache.load() + if cache.load_warning: + console.print(f"[warning]{cache.load_warning}[/warning]") - all_units: list[dict] = [] - all_blocks: list[dict] = [] + all_units: list[dict[str, Any]] = [] + all_blocks: list[dict[str, Any]] = [] changed_files_count = 0 files_to_process: list[str] = [] + def _get_cached_entry( + fp: str, + ) -> tuple[FileStat | None, CacheEntry | None, str | None]: + try: + stat = file_stat_signature(fp) + except OSError as e: + return None, None, f"[warning]Skipping file {fp}: {e}[/warning]" + cached = cache.get_file_entry(fp) + return stat, cached, None + + def _safe_process_file(fp: str) -> ProcessingResult | None: + try: + return process_file( + fp, + str(root_path), + cfg, + args.min_loc, + args.min_stmt, + ) + except Exception as e: + console.print(f"[warning]Worker failed: {e}[/warning]") + return None + + def _safe_future_result(future: Any) -> tuple[ProcessingResult | None, str | None]: + try: + return future.result(), None + except Exception as e: + return None, str(e) + # Discovery phase with console.status("[bold green]Discovering Python files...", spinner="dots"): - for fp in iter_py_files(str(root_path)): - stat = file_stat_signature(fp) - cached = cache.get_file_entry(fp) - if cached and cached.get("stat") == stat: - all_units.extend(cached.get("units", [])) - all_blocks.extend(cached.get("blocks", [])) - else: - files_to_process.append(fp) + try: + for fp in iter_py_files(str(root_path)): + stat, cached, warn = _get_cached_entry(fp) + if warn: + console.print(warn) + continue + if cached and cached.get("stat") == stat: + all_units.extend( + cast( + list[dict[str, Any]], + cast(object, cached.get("units", [])), + ) + ) + all_blocks.extend( + cast( + list[dict[str, Any]], + cast(object, cached.get("blocks", [])), + ) + ) + else: + files_to_process.append(fp) + except Exception as e: + console.print(f"[error]Scan failed: {e}[/error]") + sys.exit(1) total_files = len(files_to_process) + failed_files = [] # Processing phase if total_files > 0: - if args.no_progress: - console.print(f"[info]Processing {total_files} changed files...[/info]") + + def handle_result(result: ProcessingResult) -> None: + nonlocal changed_files_count + if result.success and result.stat: + cache.put_file_entry( + result.filepath, + result.stat, + result.units or [], + result.blocks or [], + ) + changed_files_count += 1 + if result.units: + all_units.extend([asdict(u) for u in result.units]) + if result.blocks: + all_blocks.extend([asdict(b) for b in result.blocks]) + else: + failed_files.append(f"{result.filepath}: {result.error}") + + def process_sequential(with_progress: bool) -> None: + if with_progress: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), + TimeElapsedColumn(), + console=console, + ) as progress: + task = progress.add_task( + f"Analyzing {total_files} files...", total=total_files + ) + for fp in files_to_process: + result = _safe_process_file(fp) + if result is not None: + handle_result(result) + progress.advance(task) + else: + console.print(f"[info]Processing {total_files} changed files...[/info]") + for fp in files_to_process: + result = _safe_process_file(fp) + if result is not None: + handle_result(result) + + try: with ProcessPoolExecutor(max_workers=args.processes) as executor: - futures = [ - executor.submit( - process_file, - fp, - str(root_path), - cfg, - args.min_loc, - args.min_stmt, + if args.no_progress: + console.print( + f"[info]Processing {total_files} changed files...[/info]" ) - for fp in files_to_process - ] - for future in as_completed(futures): - try: - result = future.result() - except Exception as e: - console.print(f"[warning]Failed to process file: {e}[/warning]") - continue - - if result: - fp, stat, units, blocks = result - cache.put_file_entry(fp, stat, units, blocks) - changed_files_count += 1 - all_units.extend([u.__dict__ for u in units]) - all_blocks.extend([b.__dict__ for b in blocks]) - else: - with Progress( - SpinnerColumn(), - TextColumn("[progress.description]{task.description}"), - BarColumn(), - TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), - TimeElapsedColumn(), - console=console, - ) as progress: - task = progress.add_task( - f"Analyzing {total_files} files...", total=total_files - ) - with ProcessPoolExecutor(max_workers=args.processes) as executor: - futures = [ - executor.submit( - process_file, - fp, - str(root_path), - cfg, - args.min_loc, - args.min_stmt, + + # Process in batches to manage memory + for i in range(0, total_files, BATCH_SIZE): + batch = files_to_process[i : i + BATCH_SIZE] + futures = [ + executor.submit( + process_file, + fp, + str(root_path), + cfg, + args.min_loc, + args.min_stmt, + ) + for fp in batch + ] + + for future in as_completed(futures): + result, err = _safe_future_result(future) + if result is not None: + handle_result(result) + elif err is not None: + console.print( + "[warning]Failed to process batch item: " + f"{err}[/warning]" + ) + + else: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), + TimeElapsedColumn(), + console=console, + ) as progress: + task = progress.add_task( + f"Analyzing {total_files} files...", total=total_files ) - for fp in files_to_process - ] - for future in as_completed(futures): - try: - result = future.result() - except Exception: - # Log error but keep progress bar moving? - # console.print might break progress bar layout, better to rely on rich logging or just skip - # console.print(f"[warning]Failed to process file: {e}[/warning]") - continue - finally: - progress.advance(task) - - if result: - fp, stat, units, blocks = result - cache.put_file_entry(fp, stat, units, blocks) - changed_files_count += 1 - all_units.extend([u.__dict__ for u in units]) - all_blocks.extend([b.__dict__ for b in blocks]) + + # Process in batches + for i in range(0, total_files, BATCH_SIZE): + batch = files_to_process[i : i + BATCH_SIZE] + futures = [ + executor.submit( + process_file, + fp, + str(root_path), + cfg, + args.min_loc, + args.min_stmt, + ) + for fp in batch + ] + + for future in as_completed(futures): + result, err = _safe_future_result(future) + if result is not None: + handle_result(result) + elif err is not None: + # Should rarely happen due to try/except + # in process_file. + console.print( + f"[warning]Worker failed: {err}[/warning]" + ) + progress.advance(task) + except (OSError, RuntimeError, PermissionError) as e: + console.print( + "[warning]Parallel processing unavailable, " + f"falling back to sequential: {e}[/warning]" + ) + process_sequential(with_progress=not args.no_progress) + + if failed_files: + console.print( + f"\n[warning]⚠ {len(failed_files)} files failed to process:[/warning]" + ) + for failure in failed_files[:10]: + console.print(f" • {failure}") + if len(failed_files) > 10: + console.print(f" ... and {len(failed_files) - 10} more") # Analysis phase with console.status("[bold green]Grouping clones...", spinner="dots"): func_groups = build_groups(all_units) block_groups = build_block_groups(all_blocks) - cache.save() + try: + cache.save() + except CacheError as e: + console.print(f"[warning]Failed to save cache: {e}[/warning]") # Reporting func_clones_count = len(func_groups) @@ -300,24 +472,45 @@ def main() -> None: # Baseline Logic baseline_path = Path(args.baseline).expanduser().resolve() - # If user didn't specify path, and default logic applies, baseline_path is now ./codeclone_baseline.json + # If user didn't specify path and default logic applies, baseline_path + # is now ./codeclone_baseline.json baseline = Baseline(baseline_path) baseline_exists = baseline_path.exists() if baseline_exists: baseline.load() + if not args.update_baseline and baseline.python_version: + current_version = f"{sys.version_info.major}.{sys.version_info.minor}" + if baseline.python_version != current_version: + console.print( + "[warning]Baseline Python version mismatch.[/warning]\n" + f"Baseline was generated with Python {baseline.python_version}.\n" + f"Current interpreter: Python {current_version}." + ) + if args.fail_on_new: + console.print( + "[error]Baseline checks require the same Python version to " + "ensure deterministic results. Please regenerate the baseline " + "using the current interpreter.[/error]" + ) + sys.exit(2) else: if not args.update_baseline: console.print( - f"[warning]Baseline file not found at: [bold]{baseline_path}[/bold][/warning]\n" + "[warning]Baseline file not found at: [bold]" + f"{baseline_path}" + "[/bold][/warning]\n" "[dim]Comparing against an empty baseline. " "Use --update-baseline to create it.[/dim]" ) if args.update_baseline: new_baseline = Baseline.from_groups( - func_groups, block_groups, path=baseline_path + func_groups, + block_groups, + path=baseline_path, + python_version=f"{sys.version_info.major}.{sys.version_info.minor}", ) new_baseline.save() console.print(f"[success]✔ Baseline updated:[/success] {baseline_path}") @@ -365,7 +558,7 @@ def main() -> None: out = Path(args.json_out).expanduser().resolve() out.parent.mkdir(parents=True, exist_ok=True) out.write_text( - to_json({"functions": func_groups, "blocks": block_groups}), + to_json_report(func_groups, block_groups), "utf-8", ) console.print(f"[info]JSON report saved:[/info] {out}") @@ -392,8 +585,9 @@ def main() -> None: sys.exit(3) if 0 <= args.fail_threshold < (func_clones_count + block_clones_count): + total = func_clones_count + block_clones_count console.print( - f"\n[error]❌ FAILED: Total clones ({func_clones_count + block_clones_count}) " + f"\n[error]❌ FAILED: Total clones ({total}) " f"exceed threshold ({args.fail_threshold})![/error]" ) sys.exit(2) diff --git a/codeclone/errors.py b/codeclone/errors.py new file mode 100644 index 0000000..c2ab463 --- /dev/null +++ b/codeclone/errors.py @@ -0,0 +1,27 @@ +""" +CodeClone — AST and CFG-based code clone detector for Python +focused on architectural duplication. + +Copyright (c) 2026 Den Rozhnovskiy +Licensed under the MIT License. +""" + + +class CodeCloneError(Exception): + """Base exception for CodeClone.""" + + +class FileProcessingError(CodeCloneError): + """Error processing a source file.""" + + +class ParseError(FileProcessingError): + """AST parsing failed.""" + + +class ValidationError(CodeCloneError): + """Input validation failed.""" + + +class CacheError(CodeCloneError): + """Cache operation failed.""" diff --git a/codeclone/extractor.py b/codeclone/extractor.py index 031f652..02f9730 100644 --- a/codeclone/extractor.py +++ b/codeclone/extractor.py @@ -9,21 +9,24 @@ from __future__ import annotations import ast +import os +import signal +from collections.abc import Iterator +from contextlib import contextmanager from dataclasses import dataclass -from typing import Sequence -from .blocks import extract_blocks, BlockUnit +from .blocks import BlockUnit, extract_blocks from .cfg import CFGBuilder -from .fingerprint import sha1, bucket_loc +from .errors import ParseError +from .fingerprint import bucket_loc, sha1 from .normalize import NormalizationConfig, normalized_ast_dump_from_list - # ========================= # Data structures # ========================= -@dataclass(frozen=True) +@dataclass(frozen=True, slots=True) class Unit: qualname: str filepath: str @@ -39,6 +42,67 @@ class Unit: # Helpers # ========================= +PARSE_TIMEOUT_SECONDS = 5 + + +class _ParseTimeoutError(Exception): + pass + + +@contextmanager +def _parse_limits(timeout_s: int) -> Iterator[None]: + if os.name != "posix" or timeout_s <= 0: + yield + return + + old_handler = signal.getsignal(signal.SIGALRM) + + def _timeout_handler(_signum: int, _frame: object) -> None: + raise _ParseTimeoutError("AST parsing timeout") + + old_limits: tuple[int, int] | None = None + try: + signal.signal(signal.SIGALRM, _timeout_handler) + signal.setitimer(signal.ITIMER_REAL, timeout_s) + + try: + import resource + + old_limits = resource.getrlimit(resource.RLIMIT_CPU) + soft, hard = old_limits + new_soft = ( + min(timeout_s, soft) if soft != resource.RLIM_INFINITY else timeout_s + ) + new_hard = ( + min(timeout_s + 1, hard) + if hard != resource.RLIM_INFINITY + else timeout_s + 1 + ) + resource.setrlimit(resource.RLIMIT_CPU, (new_soft, new_hard)) + except Exception: + # If resource is unavailable or cannot be set, rely on alarm only. + pass + + yield + finally: + signal.setitimer(signal.ITIMER_REAL, 0) + signal.signal(signal.SIGALRM, old_handler) + if old_limits is not None: + try: + import resource + + resource.setrlimit(resource.RLIMIT_CPU, old_limits) + except Exception: + pass + + +def _parse_with_limits(source: str, timeout_s: int) -> ast.AST: + try: + with _parse_limits(timeout_s): + return ast.parse(source) + except _ParseTimeoutError as e: + raise ParseError(str(e)) from e + def _stmt_count(node: ast.AST) -> int: body = getattr(node, "body", None) @@ -46,6 +110,8 @@ def _stmt_count(node: ast.AST) -> int: class _QualnameBuilder(ast.NodeVisitor): + __slots__ = ("stack", "units") + def __init__(self) -> None: self.stack: list[str] = [] self.units: list[tuple[str, ast.FunctionDef | ast.AsyncFunctionDef]] = [] @@ -56,11 +122,11 @@ def visit_ClassDef(self, node: ast.ClassDef) -> None: self.stack.pop() def visit_FunctionDef(self, node: ast.FunctionDef) -> None: - name = ".".join(self.stack + [node.name]) if self.stack else node.name + name = ".".join([*self.stack, node.name]) if self.stack else node.name self.units.append((name, node)) def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None: - name = ".".join(self.stack + [node.name]) if self.stack else node.name + name = ".".join([*self.stack, node.name]) if self.stack else node.name self.units.append((name, node)) @@ -75,28 +141,39 @@ def get_cfg_fingerprint( qualname: str, ) -> str: """ - Build CFG, normalize it into a canonical form, and hash it. + Generate a structural fingerprint for a function using CFG analysis. + + The fingerprint is computed by: + 1. Building a Control Flow Graph (CFG) from the function + 2. Normalizing each CFG block's statements (variable names, constants, etc.) + 3. Creating a canonical representation of the CFG structure + 4. Hashing the representation with SHA-1 + + Functions with identical control flow and normalized statements will + produce the same fingerprint, even if they differ in variable names, + constants, or type annotations. + + Args: + node: Function AST node to fingerprint + cfg: Normalization configuration (what to ignore) + qualname: Qualified name for logging/debugging + + Returns: + 40-character hex SHA-1 hash of the normalized CFG """ builder = CFGBuilder() graph = builder.build(qualname, node) + # Use generator to avoid building large list of strings parts: list[str] = [] - - # Stable order for deterministic hash for block in sorted(graph.blocks, key=lambda b: b.id): - # NOTE: normalized_ast_dump_from_list must accept Sequence[ast.AST] (covariant), - # but even if it still accepts list[ast.AST], passing list[ast.stmt] will fail - # due to invariance. We pass as Sequence[ast.AST] via a typed view. - stmts_as_ast: Sequence[ast.AST] = block.statements - normalized_stmts = normalized_ast_dump_from_list(stmts_as_ast, cfg) - - successor_ids = sorted(succ.id for succ in block.successors) - + succ_ids = ",".join( + str(s.id) for s in sorted(block.successors, key=lambda s: s.id) + ) parts.append( - f"BLOCK[{block.id}]:{normalized_stmts}" - f"|SUCCESSORS:{','.join(map(str, successor_ids))}" + f"BLOCK[{block.id}]:{normalized_ast_dump_from_list(block.statements, cfg)}" + f"|SUCCESSORS:{succ_ids}" ) - return sha1("|".join(parts)) @@ -114,9 +191,9 @@ def extract_units_from_source( min_stmt: int, ) -> tuple[list[Unit], list[BlockUnit]]: try: - tree = ast.parse(source) - except SyntaxError: - return [], [] + tree = _parse_with_limits(source, PARSE_TIMEOUT_SECONDS) + except SyntaxError as e: + raise ParseError(f"Failed to parse {filepath}: {e}") from e qb = _QualnameBuilder() qb.visit(tree) diff --git a/codeclone/html_report.py b/codeclone/html_report.py index ac7d762..eb23f87 100644 --- a/codeclone/html_report.py +++ b/codeclone/html_report.py @@ -9,32 +9,35 @@ from __future__ import annotations import html +import importlib import itertools +from collections.abc import Iterable from dataclasses import dataclass -from pathlib import Path -from string import Template -from typing import Any, Optional, Iterable +from functools import lru_cache +from typing import Any, NamedTuple, cast from codeclone import __version__ +from codeclone.errors import FileProcessingError +from .templates import FONT_CSS_URL, REPORT_TEMPLATE -# ============================ +# ============================ # Pairwise -# ============================ +# ============================ def pairwise(iterable: Iterable[Any]) -> Iterable[tuple[Any, Any]]: a, b = itertools.tee(iterable) next(b, None) - return zip(a, b) + return zip(a, b, strict=False) -# ============================ +# ============================ # Code snippet infrastructure -# ============================ +# ============================ -@dataclass +@dataclass(slots=True) class _Snippet: filepath: str start_line: int @@ -43,28 +46,79 @@ class _Snippet: class _FileCache: - def __init__(self) -> None: - self._lines: dict[str, list[str]] = {} + __slots__ = ("_get_lines_impl", "maxsize") + + def __init__(self, maxsize: int = 128) -> None: + self.maxsize = maxsize + # Create a bound method with lru_cache + # We need to cache on the method to have instance-level caching if we wanted + # different caches per instance. But lru_cache on method actually caches + # on the function object (class level) if not careful, + # or we use a wrapper. + # However, for this script, we usually have one reporter. + # To be safe and cleaner, we can use a method that delegates to a cached + # function, OR just use lru_cache on a method (which requires 'self' to be + # hashable, which it is by default id). + # But 'self' changes if we create new instances. + # Let's use the audit's pattern: cache the implementation. + + self._get_lines_impl = lru_cache(maxsize=maxsize)(self._read_file_range) + + def _read_file_range( + self, filepath: str, start_line: int, end_line: int + ) -> tuple[str, ...]: + if start_line < 1: + start_line = 1 + if end_line < start_line: + return () + + try: + + def _read_with_errors(errors: str) -> tuple[str, ...]: + lines: list[str] = [] + with open(filepath, encoding="utf-8", errors=errors) as f: + for lineno, line in enumerate(f, start=1): + if lineno < start_line: + continue + if lineno > end_line: + break + lines.append(line.rstrip("\n")) + return tuple(lines) - def get_lines(self, filepath: str) -> list[str]: - if filepath not in self._lines: try: - text = Path(filepath).read_text("utf-8") + return _read_with_errors("strict") except UnicodeDecodeError: - text = Path(filepath).read_text("utf-8", errors="replace") - self._lines[filepath] = text.splitlines() - return self._lines[filepath] + return _read_with_errors("replace") + except OSError as e: + raise FileProcessingError(f"Cannot read {filepath}: {e}") from e + + def get_lines_range( + self, filepath: str, start_line: int, end_line: int + ) -> tuple[str, ...]: + return self._get_lines_impl(filepath, start_line, end_line) + class _CacheInfo(NamedTuple): + hits: int + misses: int + maxsize: int | None + currsize: int -def _try_pygments(code: str) -> Optional[str]: + def cache_info(self) -> _CacheInfo: + return cast(_FileCache._CacheInfo, self._get_lines_impl.cache_info()) + + +def _try_pygments(code: str) -> str | None: try: - from pygments import highlight - from pygments.formatters import HtmlFormatter - from pygments.lexers import PythonLexer + pygments = importlib.import_module("pygments") + formatters = importlib.import_module("pygments.formatters") + lexers = importlib.import_module("pygments.lexers") except Exception: return None - result = highlight(code, PythonLexer(), HtmlFormatter(nowrap=True)) + highlight = pygments.highlight + formatter_cls = formatters.HtmlFormatter + lexer_cls = lexers.PythonLexer + result = highlight(code, lexer_cls(), formatter_cls(nowrap=True)) return result if isinstance(result, str) else None @@ -74,21 +128,23 @@ def _pygments_css(style_name: str) -> str: If Pygments is not available or style missing, returns "". """ try: - from pygments.formatters import HtmlFormatter + formatters = importlib.import_module("pygments.formatters") except Exception: return "" try: - fmt = HtmlFormatter(style=style_name) + formatter_cls = formatters.HtmlFormatter + fmt = formatter_cls(style=style_name) except Exception: try: - fmt = HtmlFormatter() + fmt = formatter_cls() except Exception: return "" try: # `.codebox` scope: pygments will emit selectors like `.codebox .k { ... }` - return fmt.get_style_defs(".codebox") + css = fmt.get_style_defs(".codebox") + return css if isinstance(css, str) else "" except Exception: return "" @@ -104,11 +160,7 @@ def _prefix_css(css: str, prefix: str) -> str: if not stripped: out_lines.append(line) continue - if ( - stripped.startswith("/*") - or stripped.startswith("*") - or stripped.startswith("*/") - ): + if stripped.startswith(("/*", "*", "*/")): out_lines.append(line) continue # Selector lines usually end with `{ @@ -134,17 +186,16 @@ def _render_code_block( context: int, max_lines: int, ) -> _Snippet: - lines = file_cache.get_lines(filepath) - s = max(1, start_line - context) - e = min(len(lines), end_line + context) + e = end_line + context if e - s + 1 > max_lines: e = s + max_lines - 1 + lines = file_cache.get_lines_range(filepath, s, e) + numbered: list[tuple[bool, str]] = [] - for lineno in range(s, e + 1): - line = lines[lineno - 1] + for lineno, line in enumerate(lines, start=s): hit = start_line <= lineno <= end_line numbered.append((hit, f"{lineno:>5} | {line.rstrip()}")) @@ -164,13 +215,13 @@ def _render_code_block( filepath=filepath, start_line=start_line, end_line=end_line, - code_html=f'
{body}
', + code_html=f'
{body}
', ) -# ============================ +# ============================ # HTML report builder -# ============================ +# ============================ def _escape(v: Any) -> str: @@ -184,575 +235,6 @@ def _group_sort_key(items: list[dict[str, Any]]) -> tuple[int, int]: ) -REPORT_TEMPLATE = Template(r""" - - - - - -${title} - - - - - -
-
-
-

${title}

-
v${version}
-
-
- -
-
-
- -
-${empty_state_html} - -${func_section} -${block_section} - - -
- - - - -""") - - def build_html_report( *, func_groups: dict[str, list[dict[str, Any]]], @@ -780,19 +262,69 @@ def build_html_report( pyg_dark = _prefix_css(pyg_dark_raw, "html[data-theme='dark']") pyg_light = _prefix_css(pyg_light_raw, "html[data-theme='light']") - # ============================ + # ============================ # Icons (Inline SVG) - # ============================ - ICON_SEARCH = '' - ICON_X = '' - ICON_CHEV_DOWN = '' - # ICON_CHEV_RIGHT = '' - ICON_THEME = '' - ICON_CHECK = '' - ICON_PREV = '' - ICON_NEXT = '' - - # ---------------------------- + # ============================ + ICON_SEARCH = ( + '' + '' + '' + "" + ) + ICON_X = ( + '' + '' + '' + "" + ) + ICON_CHEV_DOWN = ( + '' + '' + "" + ) + # ICON_CHEV_RIGHT = ( + # '' + # '' + # "" + # ) + ICON_THEME = ( + '' + '' + "" + ) + ICON_CHECK = ( + '' + '' + "" + ) + ICON_PREV = ( + '' + '' + "" + ) + ICON_NEXT = ( + '' + '' + "" + ) + + # ---------------------------- # Section renderer # ---------------------------- @@ -810,26 +342,43 @@ def render_section( f'
', '
', f"

{_escape(section_title)} " - f'{len(groups)} groups

', + f'' + f"{len(groups)} groups", f""" -