diff --git a/ontrack.py b/ontrack.py index ee7686b..dbe2c00 100644 --- a/ontrack.py +++ b/ontrack.py @@ -51,6 +51,7 @@ import os import pathlib import pwd +import subprocess import sys import yaml @@ -203,6 +204,62 @@ def get_group_subdirectories( return result +def _run_du( + path: str, + patterns: list[str], + all_files: bool = False, +) -> list[tuple[int, str]]: + """Run ``du(1)`` and return a list of ``(size_bytes, path)`` pairs. + + Uses ``--apparent-size`` and ``--block-size=1`` to report byte-accurate + apparent file sizes, matching ``os.stat().st_size``. When *all_files* is + ``False`` (the default) only directory entries are emitted by ``du``; when + ``True`` both regular files and directories are included via the ``-a`` + flag. + + Args: + path: Root directory to pass to ``du``. + patterns: Shell-style glob patterns forwarded as ``--exclude`` options + to ``du``. Matching files and directories are omitted from the + output. + all_files: Pass ``-a`` to ``du`` so that regular files are included + in the output in addition to directories. + + Returns: + A list of ``(size_in_bytes, absolute_path)`` tuples parsed from + ``du`` stdout, or an empty list if ``du`` cannot be executed or + produces no parsable output. + """ + cmd = ["du", "--apparent-size", "--block-size=1"] + if all_files: + cmd.append("-a") + for pattern in patterns: + cmd.extend(["--exclude", pattern]) + cmd.append(path) + try: + proc = subprocess.run(cmd, capture_output=True, text=True, check=False) + if proc.returncode != 0: + logger.warning( + "du exited with code %d for %s: %s", + proc.returncode, + path, + proc.stderr.strip(), + ) + result: list[tuple[int, str]] = [] + for line in proc.stdout.splitlines(): + if "\t" not in line: + continue + size_str, entry_path = line.split("\t", 1) + try: + result.append((int(size_str), entry_path)) + except ValueError: + logger.debug("Could not parse du output line: %r", line) + return result + except OSError as exc: + logger.warning("Could not run du for %s: %s", path, exc) + return [] + + def get_directory_stats( path: str, groups: list[str] | None = None, @@ -211,19 +268,23 @@ def get_directory_stats( ) -> dict: """Return file count and total size (bytes) for a directory tree. + Uses ``du(1)`` for directory traversal, which correctly handles + directories where the calling user lacks execute permission and is + significantly faster than Python-level ``os.walk`` on large trees or + network filesystems. Directories and files whose base names match any + pattern in *ignore_patterns* are excluded via ``du``'s ``--exclude`` + option. + If *groups* is given, only files owned by users belonging to any of those - Unix groups are counted. If *show_progress* is ``True`` (default: - ``False``), a tqdm progress bar is displayed on stderr for each - subdirectory visited during the walk; set it to ``False`` to suppress all - progress output. Directories and files whose base names match any pattern - in *ignore_patterns* are excluded from the walk and the counts - respectively. + Unix groups are counted. Ownership is determined via ``lstat``; files + whose ownership cannot be determined are included in the totals. Args: path: Root of the directory tree to scan. groups: Optional list of Unix group names; when supplied only files owned by members of these groups are included in the counts. - show_progress: Display a tqdm progress bar on stderr while scanning. + show_progress: Accepted for API compatibility; has no effect because + ``du`` completes the scan in a single subprocess call. ignore_patterns: Shell-style glob patterns (see :func:`_is_ignored`). Matched directories are not descended into; matched files are not counted. ``None`` is treated as an empty list. @@ -235,33 +296,33 @@ def get_directory_stats( for group in groups: allowed_users.update(get_group_members(group)) + # First pass: du without -a lists only directories. Building a set of + # these paths lets us distinguish directory entries from file entries in + # the second pass without an extra stat call per entry. + dir_entries = _run_du(path, patterns, all_files=False) + dir_paths: set[str] = {p for _, p in dir_entries} + + # Second pass: du -a lists both files and directories. + all_entries = _run_du(path, patterns, all_files=True) + file_count = 0 total_size = 0 - walker = os.walk(path) - if show_progress: - walker = tqdm( - walker, - desc=f"Scanning {os.path.basename(path)}", - unit="dir", - file=sys.stderr, - leave=False, - ) - for dirpath, dirnames, filenames in walker: - # Prune ignored subdirectories so os.walk does not descend into them. - if patterns: - dirnames[:] = [d for d in dirnames if not _is_ignored(d, patterns)] - dir_path = pathlib.Path(dirpath) - for filename in filenames: - if patterns and _is_ignored(filename, patterns): - continue - filepath = dir_path / filename + for size, entry_path in all_entries: + if entry_path in dir_paths: + continue # skip directory entries + if allowed_users is not None: try: - if allowed_users is not None and filepath.owner() not in allowed_users: + file_uid = pathlib.Path(entry_path).lstat().st_uid + if _uid_to_username(file_uid) not in allowed_users: continue - total_size += filepath.lstat().st_size - file_count += 1 - except (OSError, KeyError): + except OSError: + # Cannot determine ownership (e.g. no execute permission on the + # containing directory); fall through and count the file anyway + # so that du's reported size is not silently discarded. pass + file_count += 1 + total_size += size + return {"file_count": file_count, "total_size": total_size} @@ -288,9 +349,16 @@ def _build_directory_entry( valid directory. In *light* mode only the path and username are included; file-count and size scanning are skipped. + *groups* is used only to label the output entry (``entry["groups"]``); it + is **not** forwarded to :func:`get_directory_stats`. By the time this + function is called the reporting directory has already been selected + (e.g. by :func:`get_group_subdirectories`), so all files inside it should + be counted unconditionally. + Args: path: Directory to report on. - groups: Optional list of Unix group names forwarded to + groups: Optional list of Unix group names included in the output entry + for display purposes. Does not filter the files counted by :func:`get_directory_stats`. light: When ``True``, skip file-count and size scanning. show_progress: Forward progress display flag to :func:`get_directory_stats`. @@ -308,7 +376,7 @@ def _build_directory_entry( entry["groups"] = groups if not light: stats = get_directory_stats( - path, groups=groups, show_progress=show_progress, ignore_patterns=ignore_patterns + path, show_progress=show_progress, ignore_patterns=ignore_patterns ) entry["file_count"] = stats["file_count"] entry["total_size"] = stats["total_size"] diff --git a/tests/test_ontrack.py b/tests/test_ontrack.py index 9c9767d..63d61db 100644 --- a/tests/test_ontrack.py +++ b/tests/test_ontrack.py @@ -18,6 +18,7 @@ _build_directory_entry, _find_reporting_directories, _is_ignored, + _run_du, _uid_to_username, format_size, get_directory_stats, @@ -993,6 +994,34 @@ def test_build_directory_entry_with_group(tmp_path): assert entry["groups"] == [group_name] +def test_build_directory_entry_group_does_not_filter_stats(tmp_path): + """_build_directory_entry counts all files even when groups are supplied. + + groups is used only as a display label on the entry; it must not be + forwarded to get_directory_stats as a file-ownership filter. The + reporting directory has already been selected by group ownership, so all + files inside it – regardless of who owns them – should be counted. + """ + current_gid = os.getgid() + group_name = grp.getgrgid(current_gid).gr_name + + # Two files: both owned by the current user (who is in group_name). + # The total should be 2 regardless of the groups label. + (tmp_path / "file1.txt").write_text("hello") + (tmp_path / "file2.txt").write_text("world") + + # Call with groups= (display label only) and without groups= (baseline). + entry_with_group = _build_directory_entry(str(tmp_path), groups=[group_name]) + entry_no_group = _build_directory_entry(str(tmp_path)) + + assert entry_with_group is not None + assert entry_no_group is not None + # Stats must be identical regardless of whether groups is supplied. + assert entry_with_group["file_count"] == entry_no_group["file_count"] + assert entry_with_group["total_size"] == entry_no_group["total_size"] + assert entry_with_group["file_count"] == 2 + + # --------------------------------------------------------------------------- # _is_ignored # --------------------------------------------------------------------------- @@ -1208,3 +1237,120 @@ def test_main_no_ignore_key_counts_all_files(tmp_path, capsys): captured = capsys.readouterr() assert "Files : 2" in captured.out + +# --------------------------------------------------------------------------- +# _run_du +# --------------------------------------------------------------------------- + + +def test_run_du_returns_files_and_dirs(tmp_path): + """_run_du with all_files=True returns both file and directory entries.""" + (tmp_path / "a.txt").write_text("hello") # 5 bytes + sub = tmp_path / "sub" + sub.mkdir() + (sub / "b.txt").write_text("world!") # 6 bytes + + entries = _run_du(str(tmp_path), [], all_files=True) + paths = [p for _, p in entries] + assert any("a.txt" in p for p in paths) + assert any("b.txt" in p for p in paths) + assert any(str(tmp_path) == p for p in paths) # root directory itself + + +def test_run_du_dirs_only_excludes_regular_files(tmp_path): + """_run_du with all_files=False lists only directories, not regular files.""" + (tmp_path / "a.txt").write_text("hello") + sub = tmp_path / "sub" + sub.mkdir() + (sub / "b.txt").write_text("world!") + + entries = _run_du(str(tmp_path), [], all_files=False) + paths = [p for _, p in entries] + # Regular files must not appear in the dirs-only output. + assert not any("a.txt" in p for p in paths) + assert not any("b.txt" in p for p in paths) + # Directories must appear. + assert any(str(sub) == p for p in paths) + assert any(str(tmp_path) == p for p in paths) + + +def test_run_du_sizes_match_lstat(tmp_path): + """_run_du apparent sizes match os.lstat().st_size for regular files.""" + content = "hello world" # 11 bytes + f = tmp_path / "file.txt" + f.write_text(content) + + entries = _run_du(str(tmp_path), [], all_files=True) + file_entry = next(((s, p) for s, p in entries if "file.txt" in p), None) + assert file_entry is not None + size_from_du, _ = file_entry + assert size_from_du == f.lstat().st_size + + +def test_run_du_respects_exclude_patterns(tmp_path): + """_run_du honours shell-style exclude patterns.""" + (tmp_path / "visible.txt").write_text("hello") + (tmp_path / ".hidden").write_text("secret") + hidden_dir = tmp_path / ".cache" + hidden_dir.mkdir() + (hidden_dir / "data").write_text("cached") + + entries = _run_du(str(tmp_path), [".*"], all_files=True) + paths = [p for _, p in entries] + assert not any(".hidden" in p for p in paths) + assert not any(".cache" in p for p in paths) + assert any("visible.txt" in p for p in paths) + + +def test_run_du_nonexistent_path(tmp_path): + """_run_du returns an empty list (no exception) for a nonexistent path.""" + result = _run_du(str(tmp_path / "does_not_exist"), []) + assert result == [] + + +def test_run_du_empty_directory(tmp_path): + """_run_du on an empty directory returns exactly one entry for the root.""" + entries = _run_du(str(tmp_path), [], all_files=True) + assert len(entries) == 1 + size, path = entries[0] + assert path == str(tmp_path) + assert size == 0 + + +# --------------------------------------------------------------------------- +# get_directory_stats – no-execute subdirectory +# --------------------------------------------------------------------------- + + +@pytest.mark.skipif(os.getuid() == 0, reason="root bypasses permission checks") +def test_get_directory_stats_no_execute_subdir(tmp_path): + """Stats for accessible files are still reported when a subdir lacks execute permission. + + When a subdirectory has read-only permission (no execute), the tool cannot + stat individual files inside it. The files that ARE accessible (in other + subdirectories and at the top level) must still be counted correctly. + """ + # Accessible file at the top level. + accessible = tmp_path / "accessible.txt" + accessible.write_text("hello") # 5 bytes + + # Subdirectory that will have its execute bit removed. + restricted = tmp_path / "restricted" + restricted.mkdir() + (restricted / "secret.txt").write_text("secret data") # 11 bytes + + # Remove execute permission from the restricted subdirectory. + restricted.chmod(0o444) + + try: + stats = get_directory_stats(str(tmp_path)) + # The top-level accessible file must be counted. + # Files inside the restricted dir are inaccessible to both du and + # Python's stat(), so they may or may not be counted; the important + # thing is that the function does not raise and returns the files it + # can see. + assert stats["file_count"] >= 1 + assert stats["total_size"] >= accessible.lstat().st_size + finally: + # Restore permissions so tmp_path cleanup succeeds. + restricted.chmod(0o755)