Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
1a3a56f
chore: vscode excluded by gitignore
rpanfili Mar 5, 2026
220d6b0
fix: run ingestion in executor to avoid blocking event loop
rpanfili Mar 17, 2026
4c41347
fix: offload postprocessors and validation to executor to prevent blo…
rpanfili Mar 17, 2026
00e2a46
feat: use postprocessor pool for true concurrent processing
rpanfili Mar 17, 2026
3d31940
fix: increase postprocessor startup timeout from 10s to 60s
rpanfili Mar 17, 2026
87367f5
debug: add timing instrumentation to mapping, postprocessor, and vali…
rpanfili Mar 17, 2026
b7a4745
refactor: pre-load SHACL and validate in-memory to avoid I/O
rpanfili Mar 18, 2026
8a588fd
feat: run SHACL validation in a process pool to bypass GIL and parall…
rpanfili Mar 18, 2026
65867cb
feat: add separate pool size settings for postprocessors and SHACL va…
rpanfili Mar 18, 2026
90492ab
feat: track SHACL process pool queue wait and execution time separate…
rpanfili Mar 18, 2026
24ea3bd
feat: add inprocess postprocessor runtime for running processors in t…
rpanfili Mar 18, 2026
9630bca
feat: run postprocessors on a dedicated thread pool instead of the de…
rpanfili Mar 18, 2026
60c407b
fix: handle SHACL process pool timeout and broken executor errors gra…
rpanfili Mar 18, 2026
5b7c615
fix: offload graph hashing to executor to avoid blocking the event loop
rpanfili Mar 18, 2026
bebf0a6
fix: enable stop_after_attempt(5) retry limit on graph queue put
rpanfili Mar 18, 2026
feaabd3
fix: disable morph_kgc internal multiprocessing to prevent fork deadl…
rpanfili Mar 18, 2026
783c7ea
fix: offload RML mapping to dedicated thread pool to prevent blocking…
rpanfili Mar 18, 2026
0c6b5a7
fix: serialize morph_kgc calls with a lock to prevent thread-safety i…
rpanfili Mar 18, 2026
457740e
Revert "fix: offload RML mapping to dedicated thread pool to prevent …
rpanfili Mar 18, 2026
4a7d68f
perf: run morph_kgc in a subprocess pool for true parallelism without…
rpanfili Mar 18, 2026
4287c90
perf: expose morph_kgc pool size setting and track subprocess queue w…
rpanfili Mar 18, 2026
d674016
fix: read morph_kgc queue wait from worker thread via closure to avoi…
rpanfili Mar 18, 2026
dc22bec
perf: reuse a single persistent ApiClient across requests instead of …
rpanfili Mar 18, 2026
519efbf
fix: create ApiClient directly on the event loop thread instead of in…
rpanfili Mar 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
__pycache__/
dist/
.claude/

.vscode/
67 changes: 46 additions & 21 deletions wordlift_sdk/kg_build/postprocessors.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from __future__ import annotations

import asyncio
import importlib
import inspect
import json
import logging
import select
Expand All @@ -21,6 +24,7 @@

_RUNTIME_ONESHOT = "oneshot"
_RUNTIME_PERSISTENT = "persistent"
_RUNTIME_INPROCESS = "inprocess"


@dataclass(frozen=True)
Expand Down Expand Up @@ -181,7 +185,7 @@ def _ensure_started(self) -> subprocess.Popen[str]:

try:
ready = self._read_message(
process, timeout_seconds=min(self._spec.timeout_seconds, 10)
process, timeout_seconds=min(self._spec.timeout_seconds, 60)
)
except Exception:
self._terminate(process)
Expand Down Expand Up @@ -373,6 +377,23 @@ def _run_persistent(
)


@dataclass(frozen=True)
class InProcessPostprocessor:
class_path: str

def process_graph(
self, graph: Graph, context: PostprocessorContext
) -> Graph | None:
module_name, class_name = self.class_path.split(":", 1)
module = importlib.import_module(module_name)
klass = getattr(module, class_name)
processor = klass()
result = processor.process_graph(graph, context)
if inspect.isawaitable(result):
result = asyncio.run(result)
return result


def _as_bool(value: Any, default: bool) -> bool:
if value is None:
return default
Expand All @@ -399,8 +420,10 @@ def _as_positive_int(value: Any, default: int) -> int:

def _normalize_runtime(value: str | None) -> str:
runtime = (value or _RUNTIME_ONESHOT).strip().lower()
if runtime not in {_RUNTIME_ONESHOT, _RUNTIME_PERSISTENT}:
raise ValueError("POSTPROCESSOR_RUNTIME must be one of: oneshot, persistent.")
if runtime not in {_RUNTIME_ONESHOT, _RUNTIME_PERSISTENT, _RUNTIME_INPROCESS}:
raise ValueError(
"POSTPROCESSOR_RUNTIME must be one of: oneshot, persistent, inprocess."
)
return runtime


Expand Down Expand Up @@ -510,16 +533,17 @@ def load_postprocessors_for_profile(
for spec in specs:
if not spec.enabled:
continue
loaded.append(
LoadedPostprocessor(
name=spec.class_path,
handler=SubprocessPostprocessor(
spec=spec,
root_dir=root_dir,
runtime=resolved_runtime,
),
if resolved_runtime == _RUNTIME_INPROCESS:
handler: GraphPostprocessor = InProcessPostprocessor(
class_path=spec.class_path
)
)
else:
handler = SubprocessPostprocessor(
spec=spec,
root_dir=root_dir,
runtime=resolved_runtime,
)
loaded.append(LoadedPostprocessor(name=spec.class_path, handler=handler))

logger.info(
"Loaded %s postprocessors for profile '%s' from manifest: %s (runtime=%s)",
Expand Down Expand Up @@ -550,16 +574,17 @@ def load_postprocessors(
for spec in specs:
if not spec.enabled:
continue
loaded.append(
LoadedPostprocessor(
name=spec.class_path,
handler=SubprocessPostprocessor(
spec=spec,
root_dir=root_dir,
runtime=resolved_runtime,
),
if resolved_runtime == _RUNTIME_INPROCESS:
handler: GraphPostprocessor = InProcessPostprocessor(
class_path=spec.class_path
)
)
else:
handler = SubprocessPostprocessor(
spec=spec,
root_dir=root_dir,
runtime=resolved_runtime,
)
loaded.append(LoadedPostprocessor(name=spec.class_path, handler=handler))
return loaded


Expand Down
Loading
Loading