Skip to content

Commit 2c9b70e

Browse files
committed
Move runtimes to json journal store
1 parent cf4e8fb commit 2c9b70e

File tree

9 files changed

+356
-51
lines changed

9 files changed

+356
-51
lines changed

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ releases are available on [PyPI](https://pypi.org/project/pytask) and
77

88
## Unreleased
99

10-
- Nothing yet.
10+
- Move runtime profiling persistence from SQLite to a JSON snapshot plus append-only
11+
journal in `.pytask/`, keeping runtime data resilient to crashes and compacted on
12+
normal build exits.
1113

1214
## 0.5.8 - 2025-12-30
1315

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ dependencies = [
2424
"attrs>=21.3.0",
2525
"click>=8.1.8,!=8.2.0",
2626
"click-default-group>=1.2.4",
27+
"msgspec>=0.18.6",
2728
"networkx>=2.4.0",
2829
"optree>=0.9.0",
2930
"packaging>=23.0.0",

src/_pytask/journal.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
"""Helpers for append-only JSONL journals."""
2+
3+
from __future__ import annotations
4+
5+
from typing import TYPE_CHECKING
6+
from typing import TypeVar
7+
8+
import msgspec
9+
10+
if TYPE_CHECKING:
11+
from pathlib import Path
12+
13+
T = TypeVar("T")
14+
15+
16+
def append_jsonl(path: Path, payload: msgspec.Struct) -> None:
    """Write ``payload`` as one JSON line at the end of the journal file.

    The file is opened in binary append mode, so existing content is kept and
    the file is created when it does not exist yet. The encoded payload and its
    terminating newline are handed to the file in a single ``write`` call.
    """
    with path.open("ab") as handle:
        record = msgspec.json.encode(payload)
        handle.write(record + b"\n")
20+
21+
22+
def read_jsonl(path: Path, *, type_: type[T]) -> list[T]:
    """Read JSONL entries from a journal, stopping at the first invalid line.

    Parameters
    ----------
    path
        Location of the journal file.
    type_
        Type every non-blank line is decoded into.

    Returns
    -------
    list[T]
        The decoded entries in file order. The list is empty when the journal
        does not exist. Blank lines are skipped. Decoding stops at the first
        line that fails to decode; entries before it are kept and everything
        after it is ignored.

    """
    # Read first and handle absence afterwards: a separate ``exists()`` check
    # can race with another process deleting the journal in between.
    try:
        raw = path.read_bytes()
    except (FileNotFoundError, NotADirectoryError):
        return []

    entries: list[T] = []
    for line in raw.splitlines():
        if not line.strip():
            continue
        try:
            entries.append(msgspec.json.decode(line, type=type_))
        except msgspec.DecodeError:
            # Typically a truncated trailing line; trust only what came before.
            break
    return entries
36+
37+
38+
def delete_if_exists(path: Path) -> None:
    """Delete the file at ``path``; do nothing when it is already gone.

    ``unlink(missing_ok=True)`` performs the removal in one step, so a file
    that disappears between a separate existence check and the unlink (for
    example because another process removed it) no longer raises
    ``FileNotFoundError``.
    """
    path.unlink(missing_ok=True)

src/_pytask/profile.py

Lines changed: 31 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,12 @@
1313

1414
import click
1515
from rich.table import Table
16-
from sqlalchemy.orm import Mapped
17-
from sqlalchemy.orm import mapped_column
1816

1917
from _pytask.click import ColoredCommand
2018
from _pytask.click import EnumChoice
2119
from _pytask.console import console
2220
from _pytask.console import format_task_name
2321
from _pytask.dag import create_dag
24-
from _pytask.database_utils import BaseTable
25-
from _pytask.database_utils import DatabaseSession
2622
from _pytask.exceptions import CollectionError
2723
from _pytask.exceptions import ConfigurationError
2824
from _pytask.node_protocols import PPathNode
@@ -31,6 +27,7 @@
3127
from _pytask.outcomes import TaskOutcome
3228
from _pytask.pluginmanager import hookimpl
3329
from _pytask.pluginmanager import storage
30+
from _pytask.runtime_store import RuntimeState
3431
from _pytask.session import Session
3532
from _pytask.traceback import Traceback
3633

@@ -48,16 +45,6 @@ class _ExportFormats(enum.Enum):
4845
CSV = "csv"
4946

5047

51-
class Runtime(BaseTable):
52-
"""Record of runtimes of tasks."""
53-
54-
__tablename__ = "runtime"
55-
56-
task: Mapped[str] = mapped_column(primary_key=True)
57-
date: Mapped[float]
58-
duration: Mapped[float]
59-
60-
6148
@hookimpl(tryfirst=True)
6249
def pytask_extend_command_line_interface(cli: click.Group) -> None:
6350
"""Extend the command line interface."""
@@ -67,6 +54,7 @@ def pytask_extend_command_line_interface(cli: click.Group) -> None:
6754
@hookimpl
6855
def pytask_post_parse(config: dict[str, Any]) -> None:
6956
"""Register the export option."""
57+
config["runtime_state"] = RuntimeState.from_root(config["root"])
7058
config["pm"].register(ExportNameSpace)
7159
config["pm"].register(DurationNameSpace)
7260
config["pm"].register(FileSizeNameSpace)
@@ -83,26 +71,16 @@ def pytask_execute_task(task: PTask) -> Generator[None, None, None]:
8371

8472

8573
@hookimpl
86-
def pytask_execute_task_process_report(report: ExecutionReport) -> None:
87-
"""Store runtime of successfully finishing tasks in database."""
74+
def pytask_execute_task_process_report(
75+
session: Session, report: ExecutionReport
76+
) -> None:
77+
"""Store runtime of successfully finishing tasks."""
8878
task = report.task
8979
duration = task.attributes.get("duration")
9080
if report.outcome == TaskOutcome.SUCCESS and duration is not None:
91-
_create_or_update_runtime(task.signature, *duration)
92-
93-
94-
def _create_or_update_runtime(task_signature: str, start: float, end: float) -> None:
95-
"""Create or update a runtime entry."""
96-
with DatabaseSession() as session:
97-
runtime = session.get(Runtime, task_signature)
98-
99-
if not runtime:
100-
session.add(Runtime(task=task_signature, date=start, duration=end - start))
101-
else:
102-
for attr, val in (("date", start), ("duration", end - start)):
103-
setattr(runtime, attr, val)
104-
105-
session.commit()
81+
runtime_state = session.config.get("runtime_state")
82+
if runtime_state is not None:
83+
runtime_state.update_task(task, *duration)
10684

10785

10886
@click.command(cls=ColoredCommand)
@@ -189,20 +167,23 @@ class DurationNameSpace:
189167
@staticmethod
190168
@hookimpl
191169
def pytask_profile_add_info_on_task(
192-
tasks: list[PTask], profile: dict[str, dict[str, Any]]
170+
session: Session, tasks: list[PTask], profile: dict[str, dict[str, Any]]
193171
) -> None:
194172
"""Add the runtime for tasks to the profile."""
195-
runtimes = _collect_runtimes(tasks)
173+
runtimes = _collect_runtimes(session, tasks)
196174
for name, duration in runtimes.items():
197175
profile[name]["Duration (in s)"] = round(duration, 2)
198176

199177

200-
def _collect_runtimes(tasks: list[PTask]) -> dict[str, float]:
178+
def _collect_runtimes(session: Session, tasks: list[PTask]) -> dict[str, float]:
201179
"""Collect runtimes."""
202-
with DatabaseSession() as session:
203-
runtimes = [session.get(Runtime, task.signature) for task in tasks]
180+
runtime_state = session.config.get("runtime_state")
181+
if runtime_state is None:
182+
return {}
204183
return {
205-
task.name: r.duration for task, r in zip(tasks, runtimes, strict=False) if r
184+
task.name: duration
185+
for task in tasks
186+
if (duration := runtime_state.get_duration(task)) is not None
206187
}
207188

208189

@@ -313,3 +294,16 @@ def _get_info_names(profile: dict[str, dict[str, Any]]) -> list[str]:
313294
base: set[str] = set()
314295
info_names: list[str] = sorted(base.union(*(set(val) for val in profile.values())))
315296
return info_names
297+
298+
299+
@hookimpl
def pytask_unconfigure(session: Session) -> None:
    """Flush runtime information on normal build exits.

    Nothing is written unless the configured command is ``"build"`` and neither
    the ``dry_run`` nor the ``explain`` option is set. The flush is also skipped
    when no ``runtime_state`` is stored in the configuration.
    """
    config = session.config
    is_regular_build = config.get("command") == "build" and not (
        config.get("dry_run") or config.get("explain")
    )
    if not is_regular_build:
        return

    store = config.get("runtime_state")
    if store is not None:
        store.flush()

src/_pytask/runtime_store.py

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
"""Runtime storage with an append-only journal."""
2+
3+
from __future__ import annotations
4+
5+
from dataclasses import dataclass
6+
from dataclasses import field
7+
from typing import TYPE_CHECKING
8+
9+
import msgspec
10+
from packaging.version import Version
11+
12+
from _pytask.journal import append_jsonl
13+
from _pytask.journal import delete_if_exists
14+
from _pytask.journal import read_jsonl
15+
16+
if TYPE_CHECKING:
17+
from pathlib import Path
18+
19+
from _pytask.node_protocols import PTask
20+
21+
CURRENT_RUNTIME_VERSION = "1"
22+
23+
24+
class RuntimeStoreError(Exception):
    """Base error for failures while reading or writing runtime files."""
26+
27+
28+
class RuntimeStoreVersionError(RuntimeStoreError):
    """Signal that a runtime file declares a version that is not supported."""
30+
31+
32+
class _RuntimeEntry(msgspec.Struct):
    """Runtime record of a single task as kept in the snapshot file."""

    # Identifier of the task the record belongs to.
    id: str
    # Start of the recorded execution.
    date: float
    # Length of the recorded execution (end minus start).
    duration: float
36+
37+
38+
class _RuntimeFile(msgspec.Struct, forbid_unknown_fields=False):
    """Content of the runtimes snapshot file."""

    # Format version; stored under the JSON key "runtime-version".
    runtime_version: str = msgspec.field(name="runtime-version")
    # One record per task; defaults to an empty list when the key is absent.
    task: list[_RuntimeEntry] = msgspec.field(default_factory=list)
41+
42+
43+
class _RuntimeJournalEntry(msgspec.Struct):
    """One line of the append-only runtime journal."""

    # Format version; stored under the JSON key "runtime-version".
    runtime_version: str = msgspec.field(name="runtime-version")
    # Identifier of the task the record belongs to.
    id: str
    # Start of the recorded execution.
    date: float
    # Length of the recorded execution (end minus start).
    duration: float
48+
49+
50+
def _runtimes_path(root: Path) -> Path:
51+
return root / ".pytask" / "runtimes.json"
52+
53+
54+
def _journal_path(path: Path) -> Path:
55+
return path.with_suffix(".journal")
56+
57+
58+
def _read_runtimes(path: Path) -> _RuntimeFile | None:
    """Load and validate the runtimes snapshot stored at ``path``.

    Returns
    -------
    _RuntimeFile | None
        The decoded snapshot, or ``None`` when the file does not exist.

    Raises
    ------
    RuntimeStoreError
        If the file content cannot be decoded into a ``_RuntimeFile``.
    RuntimeStoreVersionError
        If the stored ``runtime-version`` differs from
        ``CURRENT_RUNTIME_VERSION`` or is not a parseable version string.

    """
    # Imported locally so this block does not depend on the file's import list.
    from packaging.version import InvalidVersion  # noqa: PLC0415

    if not path.exists():
        return None
    try:
        data = msgspec.json.decode(path.read_bytes(), type=_RuntimeFile)
    except msgspec.DecodeError:
        msg = "Runtime file has invalid format."
        raise RuntimeStoreError(msg) from None

    # A malformed version string makes ``Version`` raise ``InvalidVersion``.
    # Treat it as unsupported so callers only see the store's own exceptions.
    try:
        supported = Version(data.runtime_version) == Version(CURRENT_RUNTIME_VERSION)
    except InvalidVersion:
        supported = False

    if not supported:
        msg = (
            f"Unsupported runtime-version {data.runtime_version!r}. "
            f"Current version is {CURRENT_RUNTIME_VERSION}."
        )
        raise RuntimeStoreVersionError(msg)
    return data
74+
75+
76+
def _write_runtimes(path: Path, runtimes: _RuntimeFile) -> None:
    """Write ``runtimes`` as JSON to ``path`` via a temporary sibling file.

    The encoded data is first written to a file whose name is ``path`` with
    ``.tmp`` appended to its suffix. That file is then moved over ``path``, so
    the snapshot is replaced in a single step rather than rewritten in place.
    """
    staging = path.with_suffix(f"{path.suffix}.tmp")
    staging.write_bytes(msgspec.json.encode(runtimes))
    staging.replace(path)
81+
82+
83+
def _read_journal(path: Path) -> list[_RuntimeJournalEntry]:
    """Read the journal that belongs to the snapshot at ``path``.

    Returns
    -------
    list[_RuntimeJournalEntry]
        The journal entries in file order, as returned by ``read_jsonl``.

    Raises
    ------
    RuntimeStoreVersionError
        If any entry declares a ``runtime-version`` that differs from
        ``CURRENT_RUNTIME_VERSION`` or is not a parseable version string.

    """
    # Imported locally so this block does not depend on the file's import list.
    from packaging.version import InvalidVersion  # noqa: PLC0415

    # Parse the supported version once instead of once per entry.
    current = Version(CURRENT_RUNTIME_VERSION)
    entries = read_jsonl(_journal_path(path), type_=_RuntimeJournalEntry)
    for entry in entries:
        # A malformed version string makes ``Version`` raise ``InvalidVersion``;
        # report it as an unsupported version rather than leaking that error.
        try:
            supported = Version(entry.runtime_version) == current
        except InvalidVersion:
            supported = False
        if not supported:
            msg = (
                f"Unsupported runtime-version {entry.runtime_version!r}. "
                f"Current version is {CURRENT_RUNTIME_VERSION}."
            )
            raise RuntimeStoreVersionError(msg)
    return entries
94+
95+
96+
def _apply_journal(
    runtimes: _RuntimeFile, entries: list[_RuntimeJournalEntry]
) -> _RuntimeFile:
    """Merge journal entries into a snapshot.

    With an empty journal the given snapshot object is returned unchanged.
    Otherwise a new snapshot is built in which every journal entry replaces
    the record with the same id; for repeated ids the last entry wins.
    """
    if len(entries) == 0:
        return runtimes

    merged = {record.id: record for record in runtimes.task}
    merged.update(
        (
            item.id,
            _RuntimeEntry(id=item.id, date=item.date, duration=item.duration),
        )
        for item in entries
    )
    return _RuntimeFile(
        runtime_version=CURRENT_RUNTIME_VERSION,
        task=list(merged.values()),
    )
110+
111+
112+
def _build_task_id(task: PTask) -> str:
113+
return task.name
114+
115+
116+
@dataclass
class RuntimeState:
    """In-memory task runtimes backed by a snapshot file and a journal.

    Every update is appended to the journal immediately. ``flush`` later
    rewrites the snapshot and removes the journal.
    """

    # Location of the snapshot file; the journal path is derived from it.
    path: Path
    # Current snapshot content, including updates applied in this process.
    runtimes: _RuntimeFile
    # Lookup from task id to its record; mirrors ``runtimes.task``.
    _index: dict[str, _RuntimeEntry] = field(init=False, default_factory=dict)
    # Whether the in-memory state differs from the snapshot on disk.
    _dirty: bool = field(init=False, default=False)

    def __post_init__(self) -> None:
        self._rebuild_index()

    @classmethod
    def from_root(cls, root: Path) -> RuntimeState:
        """Load the state for the project rooted at ``root``.

        The snapshot is read first, then the journal is replayed on top of it.
        A missing snapshot is treated as an empty one. If the journal held any
        entries the state starts out dirty, so the next ``flush`` folds them
        into the snapshot.
        """
        path = _runtimes_path(root)
        snapshot = _read_runtimes(path)
        journal_entries = _read_journal(path)
        if snapshot is None:
            snapshot = _RuntimeFile(
                runtime_version=CURRENT_RUNTIME_VERSION,
                task=[],
            )
        state = cls(path=path, runtimes=_apply_journal(snapshot, journal_entries))
        state._dirty = bool(journal_entries)
        return state

    def _rebuild_index(self) -> None:
        """Recreate the id lookup from ``runtimes.task``."""
        self._index = {entry.id: entry for entry in self.runtimes.task}

    def update_task(self, task: PTask, start: float, end: float) -> None:
        """Record a run of ``task`` from ``start`` to ``end``.

        The record replaces any earlier one for the same task, is appended to
        the journal right away, and marks the state as dirty.
        """
        task_id = _build_task_id(task)
        entry = _RuntimeEntry(id=task_id, date=start, duration=end - start)
        self._index[entry.id] = entry
        # The index is already up to date and keyed by entry id, so only the
        # snapshot needs regenerating; rebuilding the index from it again
        # would be a redundant extra pass on every task.
        self.runtimes = _RuntimeFile(
            runtime_version=CURRENT_RUNTIME_VERSION,
            task=list(self._index.values()),
        )
        journal_entry = _RuntimeJournalEntry(
            runtime_version=CURRENT_RUNTIME_VERSION,
            id=entry.id,
            date=entry.date,
            duration=entry.duration,
        )
        append_jsonl(_journal_path(self.path), journal_entry)
        self._dirty = True

    def get_duration(self, task: PTask) -> float | None:
        """Return the stored duration of ``task`` or ``None`` if unknown."""
        entry = self._index.get(_build_task_id(task))
        if entry is None:
            return None
        return entry.duration

    def flush(self) -> None:
        """Write the snapshot and drop the journal if there are changes.

        The snapshot is written before the journal is deleted, so the journal
        is only removed once its content is part of the snapshot.
        """
        if not self._dirty:
            return
        _write_runtimes(self.path, self.runtimes)
        delete_if_exists(_journal_path(self.path))
        self._dirty = False

src/pytask/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@
6666
from _pytask.pluginmanager import get_plugin_manager
6767
from _pytask.pluginmanager import hookimpl
6868
from _pytask.pluginmanager import storage
69-
from _pytask.profile import Runtime
7069
from _pytask.reports import CollectionReport
7170
from _pytask.reports import DagReport
7271
from _pytask.reports import ExecutionReport
@@ -123,7 +122,6 @@
123122
"PytaskError",
124123
"PythonNode",
125124
"ResolvingDependenciesError",
126-
"Runtime",
127125
"Session",
128126
"ShowCapture",
129127
"Skipped",

0 commit comments

Comments
 (0)