Skip to content

Commit d2a47aa

Browse files
committed
Add lockfile journaling
1 parent 3e44287 commit d2a47aa

File tree

2 files changed

+205
-12
lines changed

2 files changed

+205
-12
lines changed

src/_pytask/lockfile.py

Lines changed: 106 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,14 @@ class _Lockfile(msgspec.Struct, forbid_unknown_fields=False):
4747
task: list[_TaskEntry] = msgspec.field(default_factory=list)
4848

4949

50+
class _JournalEntry(msgspec.Struct):
    """A single journal record: one task entry tagged with a lock version.

    ``lock_version`` is stored under the key ``"lock-version"``. The two
    mapping fields default to empty dictionaries when absent.
    """

    lock_version: str = msgspec.field(name="lock-version")
    id: str
    state: str
    depends_on: dict[str, str] = msgspec.field(default_factory=dict)
    produces: dict[str, str] = msgspec.field(default_factory=dict)
56+
57+
5058
def _encode_node_path(path: tuple[str | int, ...]) -> str:
5159
return msgspec.json.encode(path).decode()
5260

@@ -91,6 +99,52 @@ def build_portable_node_id(node: PNode, root: Path) -> str:
9199
return node.name
92100

93101

102+
def _journal_path(path: Path) -> Path:
103+
return path.with_suffix(f"{path.suffix}.journal")
104+
105+
106+
def _append_journal_entry(path: Path, entry: _TaskEntry) -> None:
    """Append ``entry`` as one JSON line to the journal of the lockfile at ``path``.

    The record carries ``CURRENT_LOCKFILE_VERSION`` together with the entry's
    id, state, dependencies and products.
    """
    target = _journal_path(path)
    record = _JournalEntry(
        lock_version=CURRENT_LOCKFILE_VERSION,
        id=entry.id,
        state=entry.state,
        depends_on=entry.depends_on,
        produces=entry.produces,
    )
    # Binary append mode: every call adds one newline-terminated record.
    with target.open("ab") as handle:
        encoded = msgspec.json.encode(record)
        handle.write(encoded + b"\n")
117+
118+
119+
def _read_journal_entries(path: Path) -> list[_JournalEntry]:
    """Load the journal entries stored next to the lockfile at ``path``.

    Returns an empty list when no journal file exists. Blank lines are
    skipped. Reading stops at the first line that fails to decode, so only
    the entries preceding it are returned.

    Raises ``LockfileVersionError`` when a decoded entry carries a lock
    version that differs from ``CURRENT_LOCKFILE_VERSION``.
    """
    try:
        raw = _journal_path(path).read_bytes()
    except FileNotFoundError:
        # Read directly instead of checking ``exists()`` first, so a journal
        # removed between the check and the read cannot raise here.
        return []

    entries: list[_JournalEntry] = []
    for line in raw.splitlines():
        if not line.strip():
            continue
        try:
            entry = msgspec.json.decode(line, type=_JournalEntry)
        except msgspec.DecodeError:
            # NOTE(review): presumably a partially written record; every
            # later line is ignored as well -- confirm this is intended.
            break
        if Version(entry.lock_version) != Version(CURRENT_LOCKFILE_VERSION):
            msg = (
                f"Unsupported lock-version {entry.lock_version!r}. "
                f"Current version is {CURRENT_LOCKFILE_VERSION}."
            )
            raise LockfileVersionError(msg)
        entries.append(entry)
    return entries
140+
141+
142+
def _delete_journal(path: Path) -> None:
    """Remove the journal file belonging to the lockfile at ``path``, if present."""
    # ``missing_ok=True`` folds the existence check into the removal itself,
    # so a journal that disappears concurrently does not raise.
    _journal_path(path).unlink(missing_ok=True)
146+
147+
94148
def read_lockfile(path: Path) -> _Lockfile | None:
95149
if not path.exists():
96150
return None
@@ -147,6 +201,23 @@ def write_lockfile(path: Path, lockfile: _Lockfile) -> None:
147201
tmp.replace(path)
148202

149203

204+
def _apply_journal(lockfile: _Lockfile, entries: list[_JournalEntry]) -> _Lockfile:
205+
if not entries:
206+
return lockfile
207+
task_index = {task.id: task for task in lockfile.task}
208+
for entry in entries:
209+
task_index[entry.id] = _TaskEntry(
210+
id=entry.id,
211+
state=entry.state,
212+
depends_on=entry.depends_on,
213+
produces=entry.produces,
214+
)
215+
return _Lockfile(
216+
lock_version=CURRENT_LOCKFILE_VERSION,
217+
task=list(task_index.values()),
218+
)
219+
220+
150221
def _build_task_entry(session: Session, task: PTask, root: Path) -> _TaskEntry | None:
151222
task_state = task.state()
152223
if task_state is None:
@@ -206,30 +277,40 @@ class LockfileState:
206277
lockfile: _Lockfile
207278
_task_index: dict[str, _TaskEntry] = field(init=False, default_factory=dict)
208279
_node_index: dict[str, dict[str, str]] = field(init=False, default_factory=dict)
280+
_dirty: bool = field(init=False, default=False)
209281

210282
def __post_init__(self) -> None:
211283
self._rebuild_indexes()
212284

213285
@classmethod
214286
def from_path(cls, path: Path, root: Path) -> LockfileState:
215287
existing = read_lockfile(path)
288+
journal_entries = _read_journal_entries(path)
216289
if existing is None:
217290
lockfile = _Lockfile(
218291
lock_version=CURRENT_LOCKFILE_VERSION,
219292
task=[],
220293
)
221-
return cls(
294+
lockfile = _apply_journal(lockfile, journal_entries)
295+
state = cls(
222296
path=path,
223297
root=root,
224-
use_lockfile_for_skip=False,
298+
use_lockfile_for_skip=bool(journal_entries),
225299
lockfile=lockfile,
226300
)
227-
return cls(
301+
if journal_entries:
302+
state._dirty = True
303+
return state
304+
lockfile = _apply_journal(existing, journal_entries)
305+
state = cls(
228306
path=path,
229307
root=root,
230308
use_lockfile_for_skip=True,
231-
lockfile=existing,
309+
lockfile=lockfile,
232310
)
311+
if journal_entries:
312+
state._dirty = True
313+
return state
233314

234315
def _rebuild_indexes(self) -> None:
235316
self._task_index = {task.id: task for task in self.lockfile.task}
@@ -257,7 +338,8 @@ def update_task(self, session: Session, task: PTask) -> None:
257338
task=list(self._task_index.values()),
258339
)
259340
self._rebuild_indexes()
260-
write_lockfile(self.path, self.lockfile)
341+
_append_journal_entry(self.path, entry)
342+
self._dirty = True
261343

262344
def rebuild_from_session(self, session: Session) -> None:
263345
if session.dag is None:
@@ -273,6 +355,15 @@ def rebuild_from_session(self, session: Session) -> None:
273355
)
274356
self._rebuild_indexes()
275357
write_lockfile(self.path, self.lockfile)
358+
_delete_journal(self.path)
359+
self._dirty = False
360+
361+
def flush(self) -> None:
    """Write the lockfile and remove the journal if there are pending changes.

    Does nothing when the state is not marked dirty; clears the dirty flag
    after a successful write.
    """
    if self._dirty:
        write_lockfile(self.path, self.lockfile)
        _delete_journal(self.path)
        self._dirty = False
276367

277368

278369
@hookimpl
@@ -288,13 +379,20 @@ def pytask_unconfigure(session: Session) -> None:
288379
"""Optionally rewrite the lockfile to drop stale entries."""
289380
if session.config.get("command") != "build":
290381
return
291-
if not session.config.get("clean_lockfile"):
292-
return
293382
if session.config.get("dry_run"):
294383
return
384+
if session.config.get("explain"):
385+
return
295386
if session.exit_code != ExitCode.OK:
387+
lockfile_state = session.config.get("lockfile_state")
388+
if lockfile_state is None:
389+
return
390+
lockfile_state.flush()
296391
return
297392
lockfile_state = session.config.get("lockfile_state")
298393
if lockfile_state is None:
299394
return
300-
lockfile_state.rebuild_from_session(session)
395+
if session.config.get("clean_lockfile"):
396+
lockfile_state.rebuild_from_session(session)
397+
else:
398+
lockfile_state.flush()

tests/test_lockfile.py

Lines changed: 99 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,13 +148,108 @@ def func(path):
148148

149149
calls = {"count": 0}
150150

151-
original_write = lockfile_module.write_lockfile
151+
original_append = lockfile_module._append_journal_entry
152152

153-
def _counting_write(path, lockfile):
153+
def _counting_append(path, entry):
154154
calls["count"] += 1
155-
return original_write(path, lockfile)
155+
return original_append(path, entry)
156156

157-
monkeypatch.setattr(lockfile_module, "write_lockfile", _counting_write)
157+
monkeypatch.setattr(lockfile_module, "_append_journal_entry", _counting_append)
158158
lockfile_state.update_task(session, session.tasks[0])
159159

160160
assert calls["count"] == 0
161+
162+
163+
def test_update_task_appends_journal_on_change(tmp_path):
    """Updating a task after swapping its function leaves a non-empty journal."""

    def func(path):
        path.write_text("data")

    def new_func(path):
        path.write_text("changed")

    session = build(
        tasks=[
            TaskWithoutPath(
                name="task",
                function=func,
                produces={"path": PathNode(path=tmp_path / "out.txt")},
            )
        ],
        paths=tmp_path,
    )
    assert session.exit_code == ExitCode.OK

    lockfile_state = session.config["lockfile_state"]
    assert lockfile_state is not None

    changed_task = session.tasks[0]
    changed_task.function = new_func
    lockfile_state.update_task(session, changed_task)

    journal_file = tmp_path / "pytask.lock.journal"
    assert journal_file.exists()
    assert journal_file.read_text().strip()
189+
190+
191+
def test_journal_replay_updates_lockfile_state(tmp_path):
    """A state reloaded from disk reflects the entry recorded in the journal."""

    def func(path):
        path.write_text("data")

    def new_func(path):
        path.write_text("changed")

    session = build(
        tasks=[
            TaskWithoutPath(
                name="task",
                function=func,
                produces={"path": PathNode(path=tmp_path / "out.txt")},
            )
        ],
        paths=tmp_path,
    )
    assert session.exit_code == ExitCode.OK

    lockfile_state = session.config["lockfile_state"]
    assert lockfile_state is not None

    changed_task = session.tasks[0]
    changed_task.function = new_func
    lockfile_state.update_task(session, changed_task)

    lock_path = tmp_path / "pytask.lock"
    assert (tmp_path / "pytask.lock.journal").exists()

    reloaded = lockfile_module.LockfileState.from_path(lock_path, tmp_path)
    replayed = reloaded.get_task_entry("task")
    assert replayed is not None
    assert replayed.state == session.tasks[0].state()
222+
223+
224+
def test_flush_writes_lockfile_and_deletes_journal(tmp_path):
    """Flushing persists the updated entry to the lockfile and removes the journal."""

    def func(path):
        path.write_text("data")

    def new_func(path):
        path.write_text("changed")

    session = build(
        tasks=[
            TaskWithoutPath(
                name="task",
                function=func,
                produces={"path": PathNode(path=tmp_path / "out.txt")},
            )
        ],
        paths=tmp_path,
    )
    assert session.exit_code == ExitCode.OK

    lockfile_state = session.config["lockfile_state"]
    assert lockfile_state is not None

    changed_task = session.tasks[0]
    changed_task.function = new_func
    lockfile_state.update_task(session, changed_task)

    journal_file = tmp_path / "pytask.lock.journal"
    assert journal_file.exists()

    lockfile_state.flush()

    assert not journal_file.exists()
    persisted = read_lockfile(tmp_path / "pytask.lock")
    assert persisted is not None
    by_id = {item.id: item for item in persisted.task}
    assert by_id["task"].state == session.tasks[0].state()

0 commit comments

Comments
 (0)