Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/switchyard/sync_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ async def sync_one(
children.append((child_digest, child_body, child_ct))
all_blobs.extend(_extract_blob_digests(child_body))

all_blobs = list(dict.fromkeys(all_blobs))
missing = [d for d in all_blobs if not await storage.has_blob(d)]
if missing:
for digest in missing:
Expand Down Expand Up @@ -149,6 +150,16 @@ async def run_sync_loop(
for marker in pending:
try:
await sync_one(marker, storage, queue, upstream)
except SyncMissingBlobsError as exc:
log.warning(
"Sync deferred for {name}:{ref} — {n} blob(s) not yet"
" available locally, will retry after they arrive: {blobs}",
name=marker.name,
ref=marker.reference,
n=len(exc.missing),
blobs=", ".join(d[:19] for d in exc.missing),
)
await queue.mark_failed(marker)
except Exception:
log.opt(exception=True).error(
"Failed to sync {name}:{ref}",
Expand Down
80 changes: 80 additions & 0 deletions tests/test_sync_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# ABOUTME: Verifies that pending markers are processed and blobs/manifests pushed upstream.
from __future__ import annotations

import asyncio
import hashlib
import json
from pathlib import Path
Expand All @@ -16,6 +17,7 @@
SyncMissingBlobsError,
_extract_blob_digests,
_extract_child_manifests,
run_sync_loop,
sync_one,
)
from switchyard.upstream import UpstreamClient
Expand Down Expand Up @@ -282,3 +284,81 @@ async def test_sync_one_fails_when_child_manifest_blobs_missing(tmp_path: Path)
# Marker should NOT be cleared (sync failed)
remaining = await queue.list_pending()
assert len(remaining) == 1


@respx.mock
async def test_sync_one_deduplicates_missing_blobs(tmp_path: Path) -> None:
"""When the same blob digest appears in multiple layers, it should only be
reported once in the SyncMissingBlobsError."""
storage = Storage(str(tmp_path))
await storage.init()
queue = SyncQueue(str(tmp_path))
await queue.init()

missing_blob = "sha256:deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"
# Same blob referenced twice in two layers
manifest_body = _make_manifest([missing_blob, missing_blob])
ct = "application/vnd.oci.image.manifest.v1+json"
await storage.store_manifest("myapp", "latest", manifest_body, ct)

await queue.enqueue("myapp", "latest")
pending = await queue.list_pending()

upstream = UpstreamClient(BASE)
with pytest.raises(SyncMissingBlobsError) as exc_info:
await sync_one(pending[0], storage, queue, upstream)
await upstream.close()

assert len(exc_info.value.missing) == 1


@respx.mock
async def test_run_sync_loop_logs_missing_blobs_without_traceback(
tmp_path: Path, capfd: pytest.CaptureFixture[str]
) -> None:
"""SyncMissingBlobsError is an expected condition and should be logged as a
warning without a full traceback."""
import loguru
import sys

# Set up loguru to write to stderr so capfd captures it
loguru.logger.remove()
loguru.logger.add(sys.stderr, format="{level} | {message}")

storage = Storage(str(tmp_path))
await storage.init()
queue = SyncQueue(str(tmp_path))
await queue.init()

missing_blob = "sha256:deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"
manifest_body = _make_manifest([missing_blob])
ct = "application/vnd.oci.image.manifest.v1+json"
await storage.store_manifest("myapp", "v1", manifest_body, ct)
await queue.enqueue("myapp", "v1")

upstream = UpstreamClient(BASE)

# Run just one iteration by cancelling after a short delay
async def cancel_after_one_iteration() -> None:
# Give the loop time to process one marker
await asyncio.sleep(0.1)
raise asyncio.CancelledError

task = asyncio.create_task(run_sync_loop(storage, queue, upstream, interval=60))
await asyncio.sleep(0.2)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
await upstream.close()

captured = capfd.readouterr()
stderr = captured.err

# Should contain a WARNING, not an ERROR
assert "WARNING" in stderr
# Should NOT contain "Traceback" (no full stack trace)
assert "Traceback" not in stderr
# Should mention the missing blobs
assert "Missing" in stderr or "missing" in stderr.lower()
Loading