Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 99 additions & 31 deletions ontrack.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import os
import pathlib
import pwd
import subprocess
import sys

import yaml
Expand Down Expand Up @@ -203,6 +204,62 @@ def get_group_subdirectories(
return result


def _run_du(
path: str,
patterns: list[str],
all_files: bool = False,
) -> list[tuple[int, str]]:
"""Run ``du(1)`` and return a list of ``(size_bytes, path)`` pairs.

Uses ``--apparent-size`` and ``--block-size=1`` to report byte-accurate
apparent file sizes, matching ``os.stat().st_size``. When *all_files* is
``False`` (the default) only directory entries are emitted by ``du``; when
``True`` both regular files and directories are included via the ``-a``
flag.

Args:
path: Root directory to pass to ``du``.
patterns: Shell-style glob patterns forwarded as ``--exclude`` options
to ``du``. Matching files and directories are omitted from the
output.
all_files: Pass ``-a`` to ``du`` so that regular files are included
in the output in addition to directories.

Returns:
A list of ``(size_in_bytes, absolute_path)`` tuples parsed from
``du`` stdout, or an empty list if ``du`` cannot be executed or
produces no parsable output.
"""
cmd = ["du", "--apparent-size", "--block-size=1"]
if all_files:
cmd.append("-a")
for pattern in patterns:
cmd.extend(["--exclude", pattern])
cmd.append(path)
try:
proc = subprocess.run(cmd, capture_output=True, text=True, check=False)
if proc.returncode != 0:
logger.warning(
"du exited with code %d for %s: %s",
proc.returncode,
path,
proc.stderr.strip(),
)
result: list[tuple[int, str]] = []
for line in proc.stdout.splitlines():
if "\t" not in line:
continue
size_str, entry_path = line.split("\t", 1)
try:
result.append((int(size_str), entry_path))
except ValueError:
logger.debug("Could not parse du output line: %r", line)
return result
except OSError as exc:
logger.warning("Could not run du for %s: %s", path, exc)
return []


def get_directory_stats(
path: str,
groups: list[str] | None = None,
Expand All @@ -211,19 +268,23 @@ def get_directory_stats(
) -> dict:
"""Return file count and total size (bytes) for a directory tree.

Uses ``du(1)`` for directory traversal, which correctly handles
directories where the calling user lacks execute permission and is
significantly faster than Python-level ``os.walk`` on large trees or
network filesystems. Directories and files whose base names match any
pattern in *ignore_patterns* are excluded via ``du``'s ``--exclude``
option.

If *groups* is given, only files owned by users belonging to any of those
Unix groups are counted. If *show_progress* is ``True`` (default:
``False``), a tqdm progress bar is displayed on stderr for each
subdirectory visited during the walk; set it to ``False`` to suppress all
progress output. Directories and files whose base names match any pattern
in *ignore_patterns* are excluded from the walk and the counts
respectively.
Unix groups are counted. Ownership is determined via ``lstat``; files
whose ownership cannot be determined are included in the totals.

Args:
path: Root of the directory tree to scan.
groups: Optional list of Unix group names; when supplied only files
owned by members of these groups are included in the counts.
show_progress: Display a tqdm progress bar on stderr while scanning.
show_progress: Accepted for API compatibility; has no effect because
``du`` completes the scan in a single subprocess call.
ignore_patterns: Shell-style glob patterns (see :func:`_is_ignored`).
Matched directories are not descended into; matched files are not
counted. ``None`` is treated as an empty list.
Expand All @@ -235,33 +296,33 @@ def get_directory_stats(
for group in groups:
allowed_users.update(get_group_members(group))

# First pass: du without -a lists only directories. Building a set of
# these paths lets us distinguish directory entries from file entries in
# the second pass without an extra stat call per entry.
dir_entries = _run_du(path, patterns, all_files=False)
dir_paths: set[str] = {p for _, p in dir_entries}

# Second pass: du -a lists both files and directories.
all_entries = _run_du(path, patterns, all_files=True)

file_count = 0
total_size = 0
walker = os.walk(path)
if show_progress:
walker = tqdm(
walker,
desc=f"Scanning {os.path.basename(path)}",
unit="dir",
file=sys.stderr,
leave=False,
)
for dirpath, dirnames, filenames in walker:
# Prune ignored subdirectories so os.walk does not descend into them.
if patterns:
dirnames[:] = [d for d in dirnames if not _is_ignored(d, patterns)]
dir_path = pathlib.Path(dirpath)
for filename in filenames:
if patterns and _is_ignored(filename, patterns):
continue
filepath = dir_path / filename
for size, entry_path in all_entries:
if entry_path in dir_paths:
continue # skip directory entries
if allowed_users is not None:
try:
if allowed_users is not None and filepath.owner() not in allowed_users:
file_uid = pathlib.Path(entry_path).lstat().st_uid
if _uid_to_username(file_uid) not in allowed_users:
continue
total_size += filepath.lstat().st_size
file_count += 1
except (OSError, KeyError):
except OSError:
# Cannot determine ownership (e.g. no execute permission on the
# containing directory); fall through and count the file anyway
# so that du's reported size is not silently discarded.
pass
file_count += 1
total_size += size

return {"file_count": file_count, "total_size": total_size}


Expand All @@ -288,9 +349,16 @@ def _build_directory_entry(
valid directory. In *light* mode only the path and username are included;
file-count and size scanning are skipped.

*groups* is used only to label the output entry (``entry["groups"]``); it
is **not** forwarded to :func:`get_directory_stats`. By the time this
function is called the reporting directory has already been selected
(e.g. by :func:`get_group_subdirectories`), so all files inside it should
be counted unconditionally.

Args:
path: Directory to report on.
groups: Optional list of Unix group names forwarded to
groups: Optional list of Unix group names included in the output entry
for display purposes. Does not filter the files counted by
:func:`get_directory_stats`.
light: When ``True``, skip file-count and size scanning.
show_progress: Forward progress display flag to :func:`get_directory_stats`.
Expand All @@ -308,7 +376,7 @@ def _build_directory_entry(
entry["groups"] = groups
if not light:
stats = get_directory_stats(
path, groups=groups, show_progress=show_progress, ignore_patterns=ignore_patterns
path, show_progress=show_progress, ignore_patterns=ignore_patterns
)
entry["file_count"] = stats["file_count"]
entry["total_size"] = stats["total_size"]
Expand Down
146 changes: 146 additions & 0 deletions tests/test_ontrack.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
_build_directory_entry,
_find_reporting_directories,
_is_ignored,
_run_du,
_uid_to_username,
format_size,
get_directory_stats,
Expand Down Expand Up @@ -993,6 +994,34 @@ def test_build_directory_entry_with_group(tmp_path):
assert entry["groups"] == [group_name]


def test_build_directory_entry_group_does_not_filter_stats(tmp_path):
"""_build_directory_entry counts all files even when groups are supplied.

groups is used only as a display label on the entry; it must not be
forwarded to get_directory_stats as a file-ownership filter. The
reporting directory has already been selected by group ownership, so all
files inside it – regardless of who owns them – should be counted.
"""
current_gid = os.getgid()
group_name = grp.getgrgid(current_gid).gr_name

# Two files: both owned by the current user (who is in group_name).
# The total should be 2 regardless of the groups label.
(tmp_path / "file1.txt").write_text("hello")
(tmp_path / "file2.txt").write_text("world")

# Call with groups= (display label only) and without groups= (baseline).
entry_with_group = _build_directory_entry(str(tmp_path), groups=[group_name])
entry_no_group = _build_directory_entry(str(tmp_path))

assert entry_with_group is not None
assert entry_no_group is not None
# Stats must be identical regardless of whether groups is supplied.
assert entry_with_group["file_count"] == entry_no_group["file_count"]
assert entry_with_group["total_size"] == entry_no_group["total_size"]
assert entry_with_group["file_count"] == 2


# ---------------------------------------------------------------------------
# _is_ignored
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -1208,3 +1237,120 @@ def test_main_no_ignore_key_counts_all_files(tmp_path, capsys):
captured = capsys.readouterr()
assert "Files : 2" in captured.out


# ---------------------------------------------------------------------------
# _run_du
# ---------------------------------------------------------------------------


def test_run_du_returns_files_and_dirs(tmp_path):
"""_run_du with all_files=True returns both file and directory entries."""
(tmp_path / "a.txt").write_text("hello") # 5 bytes
sub = tmp_path / "sub"
sub.mkdir()
(sub / "b.txt").write_text("world!") # 6 bytes

entries = _run_du(str(tmp_path), [], all_files=True)
paths = [p for _, p in entries]
assert any("a.txt" in p for p in paths)
assert any("b.txt" in p for p in paths)
assert any(str(tmp_path) == p for p in paths) # root directory itself


def test_run_du_dirs_only_excludes_regular_files(tmp_path):
"""_run_du with all_files=False lists only directories, not regular files."""
(tmp_path / "a.txt").write_text("hello")
sub = tmp_path / "sub"
sub.mkdir()
(sub / "b.txt").write_text("world!")

entries = _run_du(str(tmp_path), [], all_files=False)
paths = [p for _, p in entries]
# Regular files must not appear in the dirs-only output.
assert not any("a.txt" in p for p in paths)
assert not any("b.txt" in p for p in paths)
# Directories must appear.
assert any(str(sub) == p for p in paths)
assert any(str(tmp_path) == p for p in paths)


def test_run_du_sizes_match_lstat(tmp_path):
"""_run_du apparent sizes match os.lstat().st_size for regular files."""
content = "hello world" # 11 bytes
f = tmp_path / "file.txt"
f.write_text(content)

entries = _run_du(str(tmp_path), [], all_files=True)
file_entry = next(((s, p) for s, p in entries if "file.txt" in p), None)
assert file_entry is not None
size_from_du, _ = file_entry
assert size_from_du == f.lstat().st_size


def test_run_du_respects_exclude_patterns(tmp_path):
"""_run_du honours shell-style exclude patterns."""
(tmp_path / "visible.txt").write_text("hello")
(tmp_path / ".hidden").write_text("secret")
hidden_dir = tmp_path / ".cache"
hidden_dir.mkdir()
(hidden_dir / "data").write_text("cached")

entries = _run_du(str(tmp_path), [".*"], all_files=True)
paths = [p for _, p in entries]
assert not any(".hidden" in p for p in paths)
assert not any(".cache" in p for p in paths)
assert any("visible.txt" in p for p in paths)


def test_run_du_nonexistent_path(tmp_path):
"""_run_du returns an empty list (no exception) for a nonexistent path."""
result = _run_du(str(tmp_path / "does_not_exist"), [])
assert result == []


def test_run_du_empty_directory(tmp_path):
"""_run_du on an empty directory returns exactly one entry for the root."""
entries = _run_du(str(tmp_path), [], all_files=True)
assert len(entries) == 1
size, path = entries[0]
assert path == str(tmp_path)
assert size == 0


# ---------------------------------------------------------------------------
# get_directory_stats – no-execute subdirectory
# ---------------------------------------------------------------------------


@pytest.mark.skipif(os.getuid() == 0, reason="root bypasses permission checks")
def test_get_directory_stats_no_execute_subdir(tmp_path):
"""Stats for accessible files are still reported when a subdir lacks execute permission.

When a subdirectory has read-only permission (no execute), the tool cannot
stat individual files inside it. The files that ARE accessible (in other
subdirectories and at the top level) must still be counted correctly.
"""
# Accessible file at the top level.
accessible = tmp_path / "accessible.txt"
accessible.write_text("hello") # 5 bytes

# Subdirectory that will have its execute bit removed.
restricted = tmp_path / "restricted"
restricted.mkdir()
(restricted / "secret.txt").write_text("secret data") # 11 bytes

# Remove execute permission from the restricted subdirectory.
restricted.chmod(0o444)

try:
stats = get_directory_stats(str(tmp_path))
# The top-level accessible file must be counted.
# Files inside the restricted dir are inaccessible to both du and
# Python's stat(), so they may or may not be counted; the important
# thing is that the function does not raise and returns the files it
# can see.
assert stats["file_count"] >= 1
assert stats["total_size"] >= accessible.lstat().st_size
finally:
# Restore permissions so tmp_path cleanup succeeds.
restricted.chmod(0o755)