Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,32 @@ cat examples/checksums.csv
> examples/example_content/dir/.hidden_dir/file.txt,file.txt,7d52c7437e9af58dac029dd11b1024df
>```

- **Zip Support:**
sum-buddy now supports processing zip files. When a zip file is encountered, it will:
- Calculate the checksum of the zip file itself.
- List each file inside the zip as `zipfile.zip/filename` with its own checksum.

Example:
```bash
sum-buddy --output-file examples/checksums_zip.csv examples/example_content/
```
> Output
> ```console
> Calculating md5 checksums on examples/example_content/: 100%|████████████████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 15109.16it/s]
> md5 checksums for examples/example_content/ written to examples/checksums_zip.csv
> ```
```bash
cat examples/checksums_zip.csv
```
> Output
> ```console
> filepath,filename,md5
> examples/example_content/file.txt,file.txt,7d52c7437e9af58dac029dd11b1024df
> examples/example_content/testzip.zip,testzip.zip,dcf68ba27f40590ff899b63d44e18836
> examples/example_content/testzip.zip/file.txt,file.txt,7d52c7437e9af58dac029dd11b1024df
> examples/example_content/testzip.zip/dir/file.txt,file.txt,7d52c7437e9af58dac029dd11b1024df
> examples/example_content/dir/file.txt,file.txt,7d52c7437e9af58dac029dd11b1024df
> ```

If only a target directory is passed, the default settings are to ignore hidden files and directories (those that begin with a `.`), use the `md5` algorithm, and print output to `stdout`, which can be piped (`|`).

Expand Down Expand Up @@ -172,9 +198,13 @@ pip install -e ".[dev]"
3. Install pre-commit hook
```bash
pre-commit install
pre-commit autoupdate # optionally update
```
4. Run tests:

### Tests

A dedicated test file, `tests/test_archive.py`, has been added to verify zip file support. This test ensures that both zip files and their contents are correctly processed and checksummed. The test uses a sample archive (`tests/test_archive.zip`) included in the repository.

Run all tests with either of:
```bash
pytest
python -m pytest -v
```
21 changes: 18 additions & 3 deletions src/sumbuddy/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from tqdm import tqdm
import sys
import os
import zipfile

def get_checksums(input_path, output_filepath=None, ignore_file=None, include_hidden=False, algorithm='md5', length=None):
"""
Expand Down Expand Up @@ -49,8 +50,23 @@ def get_checksums(input_path, output_filepath=None, ignore_file=None, include_hi

disable_tqdm = output_filepath is None
for file_path in tqdm(file_paths, desc=f"Calculating {algorithm} checksums on {input_path}", disable=disable_tqdm):
checksum = hasher.checksum_file(file_path, algorithm=algorithm, length=length)
writer.writerow([file_path, os.path.basename(file_path), checksum])
# For files inside zip files (indicated by path containing .zip/)
if '.zip/' in file_path:
zip_index = file_path.find('.zip/')
zip_path = file_path[:zip_index + 4] # include '.zip'
file_in_zip = file_path[zip_index + 5:]
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
# Only try to open if the file exists in the zip
if file_in_zip in zip_ref.namelist():
with zip_ref.open(file_in_zip) as file_in_zip_ref:
checksum = hasher.checksum_file(file_in_zip_ref, algorithm=algorithm, length=length)
writer.writerow([file_path, os.path.basename(file_path), checksum])
else:
print(f"Warning: {file_in_zip} not found in {zip_path}, skipping.")
else:
# For regular files and zip files themselves
checksum = hasher.checksum_file(file_path, algorithm=algorithm, length=length)
writer.writerow([file_path, os.path.basename(file_path), checksum])

finally:
if output_filepath:
Expand All @@ -60,7 +76,6 @@ def get_checksums(input_path, output_filepath=None, ignore_file=None, include_hi
print(f"{algorithm} checksums for {input_path} written to {output_filepath}")

def main():

available_algorithms = ', '.join(hashlib.algorithms_available)

parser = argparse.ArgumentParser(description="Generate CSV with filepath, filename, and checksums for all files in a given directory (or a single file)")
Expand Down
53 changes: 53 additions & 0 deletions src/sumbuddy/archive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import os
import zipfile
import tempfile
import shutil

class ArchiveHandler:
    """Extract zip archives to temp storage and map members to display paths."""

    def __init__(self):
        # Most recently created extraction directory (kept for backward
        # compatibility with callers that read this attribute).
        self.temp_dir = None
        # Every extraction directory created so far; tracked so repeated
        # process_zip() calls do not leak earlier temp dirs when cleanup()
        # is only called once at the end (as Mapper does).
        self._temp_dirs = []

    def process_zip(self, zip_path, root_dir):
        """
        Process a zip file and return paths to its contents.

        Parameters:
        ------------
        zip_path - String. Path to the zip file.
        root_dir - String. Root directory for relative path calculations.

        Returns:
        ---------
        List of tuples (file_path, relative_path) for files in the zip.
        Empty list if zip_path does not exist, is unreadable, or is not
        a valid zip file.
        """
        try:
            if not zipfile.is_zipfile(zip_path):
                return []
        except OSError:
            # is_zipfile opens the path, so a missing/unreadable file raises
            # (e.g. FileNotFoundError); treat it the same as "not a zip".
            return []

        # Fresh temporary directory for this archive's extracted contents.
        self.temp_dir = tempfile.mkdtemp()
        self._temp_dirs.append(self.temp_dir)

        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                # Extract all contents to the temp directory.
                zip_ref.extractall(self.temp_dir)

                file_paths = []
                for member in zip_ref.namelist():
                    # Only add files, not directory entries.
                    if member.endswith('/'):
                        continue
                    full_path = os.path.join(self.temp_dir, member)
                    # The path as it should appear in the CSV: zip_path/member
                    rel_path = f"{zip_path}/{member}"
                    file_paths.append((full_path, rel_path))
            return file_paths
        except Exception:
            # Don't leave partially-extracted temp dirs behind on failure.
            self.cleanup()
            raise

    def cleanup(self):
        """Remove every temporary extraction directory created by this handler."""
        for directory in self._temp_dirs:
            if os.path.exists(directory):
                shutil.rmtree(directory)
        self._temp_dirs = []
        self.temp_dir = None
15 changes: 10 additions & 5 deletions src/sumbuddy/hasher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ class Hasher:
def __init__(self, algorithm='md5'):
self.algorithm = algorithm

def checksum_file(self, file_path, algorithm=None, length=None):
def checksum_file(self, file_path_or_obj, algorithm=None, length=None):
"""
Calculate the checksum of a file using the specified algorithm.

Parameters:
------------
file_path - String. Path to file to apply checksum function.
file_path_or_obj - String or file-like object. Path to file or file-like object to apply checksum function.
algorithm - String. Hash function to use for checksums. Default: 'md5', see options with 'hashlib.algorithms_available'.
length - Integer [optional]. Length of the digest for SHAKE and BLAKE algorithms in bytes.

Expand Down Expand Up @@ -55,9 +55,14 @@ def checksum_file(self, file_path, algorithm=None, length=None):
raise LengthUsedForFixedLengthHashError(algorithm)
hash_func = hashlib.new(algorithm)

# Read the file and update the hash function
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
# Handle both file paths and file-like objects
if isinstance(file_path_or_obj, str):
with open(file_path_or_obj, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_func.update(chunk)
else:
# Assume it's a file-like object
for chunk in iter(lambda: file_path_or_obj.read(4096), b""):
hash_func.update(chunk)

# Return the hash digest
Expand Down
14 changes: 14 additions & 0 deletions src/sumbuddy/mapper.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import os
import zipfile
from sumbuddy.filter import Filter
from sumbuddy.exceptions import EmptyInputDirectoryError, NoFilesAfterFilteringError, NotADirectoryError
from sumbuddy.archive import ArchiveHandler

class Mapper:
def __init__(self):
self.filter_manager = Filter()
self.archive_handler = ArchiveHandler()

def reset_filter(self, ignore_file=None, include_hidden=False):
"""
Expand Down Expand Up @@ -56,7 +59,18 @@ def gather_file_paths(self, input_directory, ignore_file=None, include_hidden=Fa
file_path = os.path.join(root, name)
if self.filter_manager.should_include(file_path, root_directory):
file_paths.append(file_path)
# If it's a zip file, process its contents
if zipfile.is_zipfile(file_path):
try:
zip_contents = self.archive_handler.process_zip(file_path, root_directory)
for _, zip_path in zip_contents:
if self.filter_manager.should_include(zip_path, root_directory):
file_paths.append(zip_path)
finally:
pass

# Perform cleanup after processing all zip files
self.archive_handler.cleanup()
if not has_files:
raise EmptyInputDirectoryError(input_directory)
if not file_paths:
Expand Down
181 changes: 181 additions & 0 deletions tests/test_archive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import tempfile
import zipfile
from pathlib import Path

from sumbuddy.archive import ArchiveHandler
from sumbuddy.mapper import Mapper
from sumbuddy.hasher import Hasher


class TestArchiveHandler:
    """Unit tests for ArchiveHandler zip processing."""

    def test_process_zip_success(self):
        """A valid zip yields one (extracted_path, display_path) pair per file member."""
        fixture = Path(__file__).parent / "test_archive.zip"
        assert fixture.exists(), "Test zip file not found"

        with tempfile.TemporaryDirectory() as scratch:
            results = ArchiveHandler().process_zip(str(fixture), scratch)

            # Expect exactly the two known members of the fixture archive.
            assert len(results) == 2
            display_paths = [str(pair[1]) for pair in results]
            assert any("test_file.txt" in p for p in display_paths)
            assert any("nested_file.txt" in p for p in display_paths)

            # Every member must actually have been extracted to disk.
            for extracted_path, _ in results:
                assert Path(extracted_path).exists()

    def test_process_zip_invalid_file(self):
        """A plain text file is rejected with an empty result, not an error."""
        with tempfile.TemporaryDirectory() as scratch:
            bogus = Path(scratch) / "not_a_zip.txt"
            bogus.write_text("This is not a zip file")

            assert ArchiveHandler().process_zip(str(bogus), scratch) == []

    def test_process_zip_nonexistent_file(self):
        """A path that does not exist yields an empty result."""
        with tempfile.TemporaryDirectory() as scratch:
            missing = Path(scratch) / "nonexistent.zip"

            assert ArchiveHandler().process_zip(str(missing), scratch) == []


class TestMapperWithZip:
    """Tests that Mapper surfaces zip members alongside regular files."""

    def test_gather_file_paths_with_zip(self):
        """Both the archive itself and its members appear in the gathered paths."""
        import shutil

        fixture = Path(__file__).parent / "test_archive.zip"

        with tempfile.TemporaryDirectory() as workdir:
            local_zip = Path(workdir) / "test_archive.zip"
            shutil.copy2(fixture, local_zip)

            gathered = Mapper().gather_file_paths(workdir)

            # The zip file itself is listed...
            assert str(local_zip) in gathered

            # ...and so is each file inside it, under "zip_path/member" paths.
            members = [p for p in gathered if "test_archive.zip/" in p]
            assert len(members) == 2
            assert any("test_file.txt" in p for p in members)
            assert any("nested_file.txt" in p for p in members)

    def test_gather_file_paths_with_zip_and_filter(self):
        """Ignore patterns apply to paths inside archives too."""
        import shutil

        fixture = Path(__file__).parent / "test_archive.zip"

        with tempfile.TemporaryDirectory() as workdir:
            local_zip = Path(workdir) / "test_archive.zip"
            shutil.copy2(fixture, local_zip)

            # Exclude anything under nested_dir, including zip members.
            ignore_path = Path(workdir) / ".ignore"
            ignore_path.write_text("**/nested_dir/**")

            gathered = Mapper().gather_file_paths(workdir, ignore_file=str(ignore_path))

            # The zip file itself survives the filter...
            assert str(local_zip) in gathered

            # ...but only the non-nested member remains.
            members = [p for p in gathered if "test_archive.zip/" in p]
            assert len(members) == 1
            assert any("test_file.txt" in p for p in members)
            assert not any("nested_file.txt" in p for p in members)


class TestHasherWithZip:
    """Tests Hasher against both plain paths and file-like objects."""

    def test_checksum_file_with_file_like_object(self):
        """checksum_file accepts an open member of a zip archive."""
        fixture = Path(__file__).parent / "test_archive.zip"

        with zipfile.ZipFile(fixture, 'r') as archive:
            # Hash the first member via its open file object.
            first_member = archive.namelist()[0]
            with archive.open(first_member) as member_obj:
                digest = Hasher().checksum_file(member_obj)

        # A non-empty hex digest string is expected.
        assert isinstance(digest, str)
        assert len(digest) > 0

    def test_checksum_file_with_zip_file_path(self):
        """checksum_file accepts a path string pointing at a zip file."""
        fixture = Path(__file__).parent / "test_archive.zip"

        digest = Hasher().checksum_file(str(fixture))

        # A non-empty hex digest string is expected.
        assert isinstance(digest, str)
        assert len(digest) > 0


def test_integration_zip_support():
    """End-to-end check: get_checksums emits rows for a zip and its members."""
    from sumbuddy import get_checksums
    import tempfile
    import csv
    import shutil

    fixture = Path(__file__).parent / "test_archive.zip"

    with tempfile.TemporaryDirectory() as workdir:
        local_zip = Path(workdir) / "test_archive.zip"
        shutil.copy2(fixture, local_zip)

        csv_path = Path(workdir) / "checksums.csv"

        # Run the full pipeline on a directory containing the zip.
        get_checksums(workdir, csv_path)

        # The output CSV must have been written.
        assert csv_path.exists()

        with open(csv_path, 'r') as handle:
            rows = list(csv.DictReader(handle))

        # At minimum: the zip file itself plus its two members.
        assert len(rows) >= 3

        # Exactly one row for the archive itself.
        archive_rows = [r for r in rows if r['filename'] == 'test_archive.zip']
        assert len(archive_rows) == 1

        # One row per member, reported as "zip_path/member".
        member_rows = [r for r in rows if 'test_archive.zip/' in r['filepath']]
        assert len(member_rows) == 2

        # Every row carries a non-empty checksum.
        for row in rows:
            assert row['md5'] and len(row['md5']) > 0
Binary file added tests/test_archive.zip
Binary file not shown.