Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 29 additions & 18 deletions src/switchyard/sync_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
from __future__ import annotations

import asyncio
import contextlib
import json
import os
import tempfile
from dataclasses import asdict, dataclass, field
from datetime import UTC, datetime
from datetime import UTC, datetime, timedelta
from pathlib import Path

from loguru import logger
Expand Down Expand Up @@ -35,6 +38,24 @@ def path_key(self) -> str:
return f"{self.name}/{safe_ref}.json"


def _atomic_write(path: Path, content: str) -> None:
"""Write content to path atomically via temp file + rename.

Concurrent readers will either see the old content or the new content,
never a partially-written file.
"""
path.parent.mkdir(parents=True, exist_ok=True)
fd, tmp_path = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
try:
with os.fdopen(fd, "w") as f:
f.write(content)
os.replace(tmp_path, path)
except BaseException:
with contextlib.suppress(OSError):
os.unlink(tmp_path)
raise


class SyncQueue:
def __init__(self, data_dir: str) -> None:
self._pending = Path(data_dir) / "pending"
Expand All @@ -49,11 +70,7 @@ async def enqueue(self, name: str, reference: str) -> Path:
marker = SyncMarker(name=name, reference=reference)
path = self._pending / marker.path_key

def _write() -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(asdict(marker), indent=2))

await asyncio.to_thread(_write)
await asyncio.to_thread(_atomic_write, path, json.dumps(asdict(marker), indent=2))
log.info("Queued sync for {name}:{ref}", name=name, ref=reference)
return path

Expand All @@ -68,8 +85,8 @@ def _scan() -> list[SyncMarker]:
marker = SyncMarker(**data)
if marker.is_ready:
markers.append(marker)
except (json.JSONDecodeError, TypeError, KeyError):
log.warning("Skipping malformed marker: {}", path)
except (json.JSONDecodeError, TypeError, KeyError) as exc:
log.warning("Skipping malformed marker: {} ({})", path, exc)
return markers

return await asyncio.to_thread(_scan)
Expand Down Expand Up @@ -102,10 +119,10 @@ def _nudge() -> int:
marker = SyncMarker(**data)
if not marker.is_ready:
data["next_attempt"] = now
path.write_text(json.dumps(data, indent=2))
_atomic_write(path, json.dumps(data, indent=2))
count += 1
except (json.JSONDecodeError, TypeError, KeyError):
log.warning("Skipping malformed marker: {}", path)
except (json.JSONDecodeError, TypeError, KeyError) as exc:
log.warning("Skipping malformed marker: {} ({})", path, exc)
return count

nudged = await asyncio.to_thread(_nudge)
Expand All @@ -116,15 +133,9 @@ def _nudge() -> int:
async def mark_failed(self, marker: SyncMarker) -> None:
marker.retries += 1
backoff = min(5 * (2**marker.retries), MAX_BACKOFF_SECONDS)
from datetime import timedelta

marker.next_attempt = (datetime.now(UTC) + timedelta(seconds=backoff)).isoformat()
path = self._pending / marker.path_key

def _write() -> None:
path.write_text(json.dumps(asdict(marker), indent=2))

await asyncio.to_thread(_write)
await asyncio.to_thread(_atomic_write, path, json.dumps(asdict(marker), indent=2))
log.warning(
"Sync failed for {name}:{ref} (retry {n}, next in {b}s)",
name=marker.name,
Expand Down
33 changes: 33 additions & 0 deletions tests/test_sync_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,39 @@ async def test_nudge_pending_skips_already_ready(tmp_path: Path) -> None:
assert count == 0


async def test_corrupted_marker_file_skipped_gracefully(tmp_path: Path) -> None:
    """list_pending must tolerate a zero-byte marker file and still return
    the valid markers that sit alongside it."""
    sync_queue = await _make_queue(tmp_path)
    await sync_queue.enqueue("app-a", "v1")

    # Simulate a reader racing a writer: an empty file where JSON should be.
    bad_marker = tmp_path / "pending" / "app-b" / "latest.json"
    bad_marker.parent.mkdir(parents=True, exist_ok=True)
    bad_marker.write_text("")

    markers = await sync_queue.list_pending()
    assert len(markers) == 1
    assert markers[0].name == "app-a"


async def test_atomic_write_produces_valid_marker(tmp_path: Path) -> None:
    """Every on-disk marker must parse as JSON, both right after enqueue and
    again after mark_failed rewrites it."""
    q = await _make_queue(tmp_path)
    marker_path = await q.enqueue("myapp", "sha256:abc123")

    # Freshly enqueued marker is readable, well-formed JSON.
    initial = json.loads(marker_path.read_text())
    assert initial["name"] == "myapp"

    failed = (await q.list_pending())[0]
    await q.mark_failed(failed)

    # The rewrite performed by mark_failed must leave the file parseable too.
    updated = json.loads(marker_path.read_text())
    assert updated["retries"] == 1


async def test_marker_is_ready() -> None:
past = SyncMarker(
name="a",
Expand Down
Loading