1313from packaging .version import Version
1414from upath import UPath
1515
16+ from _pytask .journal import JsonlJournal
1617from _pytask .node_protocols import PNode
1718from _pytask .node_protocols import PPathNode
1819from _pytask .node_protocols import PTask
@@ -99,52 +100,24 @@ def build_portable_node_id(node: PNode, root: Path) -> str:
99100 return node .name
100101
101102
102- def _journal_path (path : Path ) -> Path :
103- return path .with_suffix (f"{ path .suffix } .journal" )
104-
105-
106- def _append_journal_entry (path : Path , entry : _TaskEntry ) -> None :
107- journal_path = _journal_path (path )
108- payload = _JournalEntry (
109- lock_version = CURRENT_LOCKFILE_VERSION ,
110- id = entry .id ,
111- state = entry .state ,
112- depends_on = entry .depends_on ,
113- produces = entry .produces ,
103+ def _journal (path : Path ) -> JsonlJournal [_JournalEntry ]:
104+ return JsonlJournal (
105+ path = path .with_suffix (f"{ path .suffix } .journal" ), type_ = _JournalEntry
114106 )
115- with journal_path .open ("ab" ) as journal_file :
116- journal_file .write (msgspec .json .encode (payload ) + b"\n " )
117107
118108
119- def _read_journal_entries (path : Path ) -> list [_JournalEntry ]:
120- journal_path = _journal_path (path )
121- if not journal_path .exists ():
122- return []
123-
124- entries : list [_JournalEntry ] = []
125- for line in journal_path .read_bytes ().splitlines ():
126- if not line .strip ():
127- continue
128- try :
129- entry = msgspec .json .decode (line , type = _JournalEntry )
130- except msgspec .DecodeError :
131- break
109+ def _read_journal_entries (journal : JsonlJournal [_JournalEntry ]) -> list [_JournalEntry ]:
110+ entries = journal .read ()
111+ for entry in entries :
132112 if Version (entry .lock_version ) != Version (CURRENT_LOCKFILE_VERSION ):
133113 msg = (
134114 f"Unsupported lock-version { entry .lock_version !r} . "
135115 f"Current version is { CURRENT_LOCKFILE_VERSION } ."
136116 )
137117 raise LockfileVersionError (msg )
138- entries .append (entry )
139118 return entries
140119
141120
142- def _delete_journal (path : Path ) -> None :
143- journal_path = _journal_path (path )
144- if journal_path .exists ():
145- journal_path .unlink ()
146-
147-
148121def read_lockfile (path : Path ) -> _Lockfile | None :
149122 if not path .exists ():
150123 return None
@@ -285,7 +258,8 @@ def __post_init__(self) -> None:
285258 @classmethod
286259 def from_path (cls , path : Path , root : Path ) -> LockfileState :
287260 existing = read_lockfile (path )
288- journal_entries = _read_journal_entries (path )
261+ journal = _journal (path )
262+ journal_entries = _read_journal_entries (journal )
289263 if existing is None :
290264 lockfile = _Lockfile (
291265 lock_version = CURRENT_LOCKFILE_VERSION ,
@@ -338,7 +312,16 @@ def update_task(self, session: Session, task: PTask) -> None:
338312 task = list (self ._task_index .values ()),
339313 )
340314 self ._rebuild_indexes ()
341- _append_journal_entry (self .path , entry )
315+ journal = _journal (self .path )
316+ journal .append (
317+ _JournalEntry (
318+ lock_version = CURRENT_LOCKFILE_VERSION ,
319+ id = entry .id ,
320+ state = entry .state ,
321+ depends_on = entry .depends_on ,
322+ produces = entry .produces ,
323+ )
324+ )
342325 self ._dirty = True
343326
344327 def rebuild_from_session (self , session : Session ) -> None :
@@ -355,14 +338,14 @@ def rebuild_from_session(self, session: Session) -> None:
355338 )
356339 self ._rebuild_indexes ()
357340 write_lockfile (self .path , self .lockfile )
358- _delete_journal (self .path )
341+ _journal (self .path ). delete ( )
359342 self ._dirty = False
360343
361344 def flush (self ) -> None :
362345 if not self ._dirty :
363346 return
364347 write_lockfile (self .path , self .lockfile )
365- _delete_journal (self .path )
348+ _journal (self .path ). delete ( )
366349 self ._dirty = False
367350
368351
0 commit comments