From 1a3a56f3bd2756612b7d7a1369682e2f3a0d03da Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Thu, 5 Mar 2026 17:56:03 +0100 Subject: [PATCH 01/24] chore: vscode excluded by gitignore --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 38d49af..2d0c882 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ __pycache__/ dist/ .claude/ + +.vscode/ From 220d6b0134d1f7eb38a4738625ddbeea81033c5e Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Tue, 17 Mar 2026 16:01:25 +0100 Subject: [PATCH 02/24] fix: run ingestion in executor to avoid blocking event loop --- .../url_handler/ingestion_web_page_scrape_url_handler.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/wordlift_sdk/workflow/url_handler/ingestion_web_page_scrape_url_handler.py b/wordlift_sdk/workflow/url_handler/ingestion_web_page_scrape_url_handler.py index a228e26..6c7a719 100644 --- a/wordlift_sdk/workflow/url_handler/ingestion_web_page_scrape_url_handler.py +++ b/wordlift_sdk/workflow/url_handler/ingestion_web_page_scrape_url_handler.py @@ -1,5 +1,7 @@ from __future__ import annotations +import asyncio +import functools import json import logging import re @@ -43,7 +45,10 @@ def __init__( async def __call__(self, url: Url) -> None: settings = self._build_settings(url) - result = run_ingestion(settings) + loop = asyncio.get_event_loop() + result = await loop.run_in_executor( + None, functools.partial(run_ingestion, settings) + ) if not result.pages: failed = [ From 4c41347aa3a9e9c78bb1ed3e9799a0104fdafb6b Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Tue, 17 Mar 2026 16:05:45 +0100 Subject: [PATCH 03/24] fix: offload postprocessors and validation to executor to prevent blocking event loop --- wordlift_sdk/kg_build/protocol.py | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/wordlift_sdk/kg_build/protocol.py b/wordlift_sdk/kg_build/protocol.py index fd8ddc0..6bf88b8 100644 --- a/wordlift_sdk/kg_build/protocol.py +++ b/wordlift_sdk/kg_build/protocol.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import functools import hashlib import logging import os @@ -95,6 +96,7 @@ def __init__( self._mapping_cache: dict[Path, str] = {} self._static_templates_patched = False self._static_templates_lock = asyncio.Lock() + self._postprocessor_lock = asyncio.Lock() canonical_id_strategy = ( str( self.profile.settings.get( @@ -209,7 +211,18 @@ async def callback( if existing_web_page_id: self._reconcile_root_id(graph, existing_web_page_id) - graph = self._apply_postprocessors(graph, url, response, existing_web_page_id) + loop = asyncio.get_event_loop() + async with self._postprocessor_lock: + graph = await loop.run_in_executor( + None, + functools.partial( + self._apply_postprocessors, + graph, + url, + response, + existing_web_page_id, + ), + ) # Canonical IDs must run after custom postprocessors so any nodes minted # by local logic are normalized before graph sync patching. graph = self._core_ids.process_graph( @@ -225,7 +238,9 @@ async def callback( ) self._write_debug_graph(graph, url) - validation_payload = self._validate_graph_if_enabled(graph, url) + validation_payload = await loop.run_in_executor( + None, functools.partial(self._validate_graph_if_enabled, graph, url) + ) graph_metrics = self._kpi.graph_metrics(graph) self._emit_progress( { @@ -284,8 +299,14 @@ async def _patch_static_templates_once(self) -> None: self._ensure_templates_loaded() if self._template_graph and len(self._template_graph) > 0: - validation_payload = self._validate_graph_if_enabled( - self._template_graph, "static_templates" + _loop = asyncio.get_event_loop() + validation_payload = await _loop.run_in_executor( + None, + functools.partial( + self._validate_graph_if_enabled, + self._template_graph, + "static_templates", + ), ) self._emit_progress( { From 00e2a46a89259fe7329771dfc1ddbc81089decac Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Tue, 17 Mar 2026 16:39:24 +0100 Subject: [PATCH 04/24] feat: use postprocessor pool for true concurrent processing --- wordlift_sdk/kg_build/protocol.py | 57 +++++++++++++++++++++++++------ 1 file changed, 47 insertions(+), 10 deletions(-) diff --git a/wordlift_sdk/kg_build/protocol.py b/wordlift_sdk/kg_build/protocol.py index 6bf88b8..9aaecb4 100644 --- a/wordlift_sdk/kg_build/protocol.py +++ b/wordlift_sdk/kg_build/protocol.py @@ -96,7 +96,6 @@ def __init__( self._mapping_cache: dict[Path, str] = {} self._static_templates_patched = False self._static_templates_lock = asyncio.Lock() - self._postprocessor_lock = asyncio.Lock() canonical_id_strategy = ( str( self.profile.settings.get( @@ -117,11 +116,25 @@ def __init__( self._postprocessor_runtime, self.profile.origins.get("postprocessor_runtime", "default"), ) - self._postprocessors = load_postprocessors_for_profile( - root_dir=self.root_dir, - profile_name=self.profile.name, - runtime=self._postprocessor_runtime, + _pool_size = int( + self.profile.settings.get( + "concurrency", self.profile.settings.get("CONCURRENCY", 4) + ) ) + logger.info( + "Postprocessor pool size for profile '%s': %d", + self.profile.name, + _pool_size, + ) + self._postprocessors_queue: asyncio.Queue = asyncio.Queue() + for _ in range(_pool_size): + self._postprocessors_queue.put_nowait( + load_postprocessors_for_profile( + root_dir=self.root_dir, + profile_name=self.profile.name, + runtime=self._postprocessor_runtime, + ) + ) self._shacl_mode = self._resolve_validation_mode( self.profile.settings.get( "shacl_validate_mode", @@ -212,17 +225,21 @@ async def callback( if existing_web_page_id: self._reconcile_root_id(graph, existing_web_page_id) loop = asyncio.get_event_loop() - async with self._postprocessor_lock: + _postprocessors = await self._postprocessors_queue.get() + try: graph = await loop.run_in_executor( None, functools.partial( - self._apply_postprocessors, + self._apply_postprocessors_with, graph, url, response, existing_web_page_id, + _postprocessors, ), ) + finally: + self._postprocessors_queue.put_nowait(_postprocessors) # Canonical IDs must run after custom postprocessors so any nodes minted # by local logic are normalized before graph sync patching. graph = self._core_ids.process_graph( @@ -262,7 +279,11 @@ async def callback( logger.info("Wrote %s triples for %s", len(graph), url) def close(self) -> None: - close_loaded_postprocessors(self._postprocessors) + while not self._postprocessors_queue.empty(): + try: + close_loaded_postprocessors(self._postprocessors_queue.get_nowait()) + except asyncio.QueueEmpty: + break def get_kpi_summary(self) -> dict[str, object]: return self._kpi.summary(self.profile.name) @@ -485,7 +506,23 @@ def _apply_postprocessors( response: WebPageScrapeResponse, existing_web_page_id: str | None, ) -> Graph: - if not self._postprocessors: + return self._apply_postprocessors_with( + graph, + url, + response, + existing_web_page_id, + list(self._postprocessors_queue._queue), # type: ignore[attr-defined] + ) + + def _apply_postprocessors_with( + self, + graph: Graph, + url: str, + response: WebPageScrapeResponse, + existing_web_page_id: str | None, + postprocessors: list, + ) -> Graph: + if not postprocessors: return graph pp_context = self._build_pp_context(url, response, existing_web_page_id) @@ -495,7 +532,7 @@ def _apply_postprocessors( "'api_key', WORDLIFT_KEY, or WORDLIFT_API_KEY." ) - for processor in self._postprocessors: + for processor in postprocessors: graph = processor.run(graph, pp_context) logger.info("Applied postprocessor '%s' for %s", processor.name, url) return graph From 3d319402ba72fbf3668dff74c9b0e6ab6ee1af86 Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Tue, 17 Mar 2026 17:12:06 +0100 Subject: [PATCH 05/24] fix: increase postprocessor startup timeout from 10s to 60s --- wordlift_sdk/kg_build/postprocessors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wordlift_sdk/kg_build/postprocessors.py b/wordlift_sdk/kg_build/postprocessors.py index b67de6b..a41af15 100644 --- a/wordlift_sdk/kg_build/postprocessors.py +++ b/wordlift_sdk/kg_build/postprocessors.py @@ -181,7 +181,7 @@ def _ensure_started(self) -> subprocess.Popen[str]: try: ready = self._read_message( - process, timeout_seconds=min(self._spec.timeout_seconds, 10) + process, timeout_seconds=min(self._spec.timeout_seconds, 60) ) except Exception: self._terminate(process) From 87367f59ce9f06884ca4df5f1fff364f8d332511 Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Tue, 17 Mar 2026 17:41:51 +0100 Subject: [PATCH 06/24] debug: add timing instrumentation to mapping, postprocessor, and validation stages --- wordlift_sdk/kg_build/protocol.py | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/wordlift_sdk/kg_build/protocol.py b/wordlift_sdk/kg_build/protocol.py index 9aaecb4..4b63047 100644 --- a/wordlift_sdk/kg_build/protocol.py +++ b/wordlift_sdk/kg_build/protocol.py @@ -6,6 +6,7 @@ import logging import os import tempfile +import time from dataclasses import asdict from pathlib import Path from types import SimpleNamespace @@ -210,6 +211,7 @@ async def callback( mapping_response = self._mapping_response(response, existing_web_page_id) debug_output: dict[str, str] | None = {} if self.debug_dir else None + _t0 = time.perf_counter() graph = await self.rml_service.apply_mapping( html=response.web_page.html, url=url, @@ -218,6 +220,7 @@ async def callback( response=mapping_response, debug_output=debug_output, ) + _t_mapping = int((time.perf_counter() - _t0) * 1000) if not graph or len(graph) == 0: logger.warning("No triples produced for %s", url) return @@ -225,8 +228,11 @@ async def callback( if existing_web_page_id: self._reconcile_root_id(graph, existing_web_page_id) loop = asyncio.get_event_loop() + _t1 = time.perf_counter() _postprocessors = await self._postprocessors_queue.get() + _t_queue_wait = int((time.perf_counter() - _t1) * 1000) try: + _t2 = time.perf_counter() graph = await loop.run_in_executor( None, functools.partial( @@ -238,6 +244,7 @@ async def callback( _postprocessors, ), ) + _t_postprocessors = int((time.perf_counter() - _t2) * 1000) finally: self._postprocessors_queue.put_nowait(_postprocessors) # Canonical IDs must run after custom postprocessors so any nodes minted @@ -255,9 +262,11 @@ async def callback( ) self._write_debug_graph(graph, url) + _t3 = time.perf_counter() validation_payload = await loop.run_in_executor( None, functools.partial(self._validate_graph_if_enabled, graph, url) ) + _t_validation = int((time.perf_counter() - _t3) * 1000) graph_metrics = self._kpi.graph_metrics(graph) self._emit_progress( { @@ -276,7 +285,15 @@ async def callback( ): raise RuntimeError(f"SHACL validation failed for {url} in fail mode.") await self._write_graph(graph) - logger.info("Wrote %s triples for %s", len(graph), url) + logger.info( + "Wrote %s triples for %s [mapping=%dms queue_wait=%dms postprocessors=%dms validation=%dms]", + len(graph), + url, + _t_mapping, + _t_queue_wait, + _t_postprocessors, + _t_validation, + ) def close(self) -> None: while not self._postprocessors_queue.empty(): @@ -533,8 +550,14 @@ def _apply_postprocessors_with( ) for processor in postprocessors: + _tp = time.perf_counter() graph = processor.run(graph, pp_context) - logger.info("Applied postprocessor '%s' for %s", processor.name, url) + logger.info( + "Applied postprocessor '%s' for %s [%dms]", + processor.name, + url, + int((time.perf_counter() - _tp) * 1000), + ) return graph def _build_pp_context( From b7a47458891791c4ecaf0fb27591afa5d8938343 Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Wed, 18 Mar 2026 09:03:14 +0100 Subject: [PATCH 07/24] refactor: pre-load SHACL and validate in-memory to avoid I/O --- wordlift_sdk/kg_build/protocol.py | 60 ++++++++++++++++++++++--------- 1 file changed, 43 insertions(+), 17 deletions(-) diff --git a/wordlift_sdk/kg_build/protocol.py b/wordlift_sdk/kg_build/protocol.py index 4b63047..d6b7c65 100644 --- a/wordlift_sdk/kg_build/protocol.py +++ b/wordlift_sdk/kg_build/protocol.py @@ -5,7 +5,7 @@ import hashlib import logging import os -import tempfile + import time from dataclasses import asdict from pathlib import Path @@ -19,10 +19,13 @@ from wordlift_sdk.protocol.web_page_import_protocol import ( WebPageImportProtocolInterface, ) +from pyshacl import validate as pyshacl_validate +from rdflib.namespace import SH from wordlift_sdk.validation.shacl import ( ValidationResult, + _load_shapes_graph, + _normalize_schema_org_uris, resolve_shape_specs, - validate_file, ) from .config import ProfileDefinition @@ -164,6 +167,24 @@ def __init__( exclude_builtin_shapes=shacl_exclude_builtin_shapes or None, extra_shapes=shacl_extra_shapes or None, ) + _shacl_validate_mode_for_preload = self._resolve_validation_mode( + self.profile.settings.get( + "shacl_validate_mode", + self.profile.settings.get("SHACL_VALIDATE_MODE", "warn"), + ) + ) + if _shacl_validate_mode_for_preload != "off": + self._shacl_shapes_graph, self._shacl_source_map = _load_shapes_graph( + self._shacl_shape_specs if self._shacl_shape_specs else None + ) + logger.info( + "Pre-loaded %d SHACL shape triples for profile '%s'", + len(self._shacl_shapes_graph), + self.profile.name, + ) + else: + self._shacl_shapes_graph = None + self._shacl_source_map = {} self._import_hash_mode = self._resolve_import_hash_mode( self.profile.settings.get( "import_hash_mode", @@ -745,21 +766,26 @@ def _validate_graph_if_enabled( return summary def _validate_graph(self, graph: Graph) -> ValidationResult: - with tempfile.NamedTemporaryFile(mode="w", suffix=".ttl", delete=False) as f: - tmp = Path(f.name) - try: - graph.serialize(destination=tmp, format="turtle") - return validate_file( - str(tmp), - shape_specs=self._shacl_shape_specs - if self._shacl_shape_specs - else None, - ) - finally: - try: - tmp.unlink(missing_ok=True) - except Exception: - logger.debug("Failed to remove temporary SHACL graph file: %s", tmp) + data_graph = _normalize_schema_org_uris(graph) + conforms, report_graph, report_text = pyshacl_validate( + data_graph, + shacl_graph=self._shacl_shapes_graph, + inference="rdfs", + abort_on_first=False, + allow_infos=True, + allow_warnings=True, + ) + warning_count = sum( + 1 for _ in report_graph.subjects(SH.resultSeverity, SH.Warning) + ) + return ValidationResult( + conforms=conforms, + report_text=report_text, + report_graph=report_graph, + data_graph=data_graph, + shape_source_map=self._shacl_source_map, + warning_count=warning_count, + ) def _summarize_validation(self, result: ValidationResult) -> dict[str, Any]: sh = URIRef("http://www.w3.org/ns/shacl#") From 8a588fd3363fad37461ada1db9bcbaebce7a823d Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Wed, 18 Mar 2026 10:38:21 +0100 Subject: [PATCH 08/24] feat: run SHACL validation in a process pool to bypass GIL and parallelize across CPUs --- wordlift_sdk/kg_build/protocol.py | 119 ++++++++++++++++++++++++------ 1 file changed, 95 insertions(+), 24 deletions(-) diff --git a/wordlift_sdk/kg_build/protocol.py b/wordlift_sdk/kg_build/protocol.py index d6b7c65..4149cc3 100644 --- a/wordlift_sdk/kg_build/protocol.py +++ b/wordlift_sdk/kg_build/protocol.py @@ -5,8 +5,8 @@ import hashlib import logging import os - import time +from concurrent.futures import ProcessPoolExecutor from dataclasses import asdict from pathlib import Path from types import SimpleNamespace @@ -50,6 +50,59 @@ def _path_contains_part(path: str, part: str) -> bool: return part in Path(path).parts +# Module-level state for SHACL worker processes (one copy per process) +_shacl_worker_shapes_graph: Graph | None = None +_shacl_worker_source_map: dict = {} + + +def _init_shacl_worker(shape_specs: list[str] | None) -> None: + global _shacl_worker_shapes_graph, _shacl_worker_source_map + _shacl_worker_shapes_graph, _shacl_worker_source_map = _load_shapes_graph( + shape_specs + ) + + +def _shacl_validate_in_worker(ntriples: str) -> dict: + data_graph = Graph() + data_graph.parse(data=ntriples, format="nt") + data_graph = _normalize_schema_org_uris(data_graph) + conforms, report_graph, _ = pyshacl_validate( + data_graph, + shacl_graph=_shacl_worker_shapes_graph, + inference="rdfs", + abort_on_first=False, + allow_infos=True, + allow_warnings=True, + ) + warning_sources: dict[str, int] = {} + error_sources: dict[str, int] = {} + warning_count = 0 + error_count = 0 + for node in report_graph.subjects(SH.resultSeverity, SH.Warning): + warning_count += 1 + shape = next(report_graph.objects(node, SH.sourceShape), None) + label = _shacl_worker_source_map.get(shape, "unknown") + warning_sources[str(label)] = warning_sources.get(str(label), 0) + 1 + for node in report_graph.subjects(SH.resultSeverity, SH.Violation): + error_count += 1 + shape = next(report_graph.objects(node, SH.sourceShape), None) + label = _shacl_worker_source_map.get(shape, "unknown") + error_sources[str(label)] = error_sources.get(str(label), 0) + 1 + return { + "total": 1, + "pass": bool(conforms), + "fail": not bool(conforms), + "warnings": { + "count": warning_count, + "sources": dict(sorted(warning_sources.items())), + }, + "errors": { + "count": error_count, + "sources": dict(sorted(error_sources.items())), + }, + } + + def _resolve_postprocessor_runtime(settings: dict[str, Any]) -> str: value = settings.get("postprocessor_runtime") if value is None: @@ -167,24 +220,21 @@ def __init__( exclude_builtin_shapes=shacl_exclude_builtin_shapes or None, extra_shapes=shacl_extra_shapes or None, ) - _shacl_validate_mode_for_preload = self._resolve_validation_mode( - self.profile.settings.get( - "shacl_validate_mode", - self.profile.settings.get("SHACL_VALIDATE_MODE", "warn"), - ) - ) - if _shacl_validate_mode_for_preload != "off": - self._shacl_shapes_graph, self._shacl_source_map = _load_shapes_graph( - self._shacl_shape_specs if self._shacl_shape_specs else None + if self._shacl_mode != "off": + self._process_executor: ProcessPoolExecutor | None = ProcessPoolExecutor( + max_workers=_pool_size, + initializer=_init_shacl_worker, + initargs=( + self._shacl_shape_specs if self._shacl_shape_specs else None, + ), ) logger.info( - "Pre-loaded %d SHACL shape triples for profile '%s'", - len(self._shacl_shapes_graph), + "Created SHACL process pool with %d workers for profile '%s'", + _pool_size, self.profile.name, ) else: - self._shacl_shapes_graph = None - self._shacl_source_map = {} + self._process_executor = None self._import_hash_mode = self._resolve_import_hash_mode( self.profile.settings.get( "import_hash_mode", @@ -284,9 +334,7 @@ async def callback( self._write_debug_graph(graph, url) _t3 = time.perf_counter() - validation_payload = await loop.run_in_executor( - None, functools.partial(self._validate_graph_if_enabled, graph, url) - ) + validation_payload = await self._async_validate_if_enabled(loop, graph, url) _t_validation = int((time.perf_counter() - _t3) * 1000) graph_metrics = self._kpi.graph_metrics(graph) self._emit_progress( @@ -322,6 +370,8 @@ def close(self) -> None: close_loaded_postprocessors(self._postprocessors_queue.get_nowait()) except asyncio.QueueEmpty: break + if self._process_executor is not None: + self._process_executor.shutdown(wait=False) def get_kpi_summary(self) -> dict[str, object]: return self._kpi.summary(self.profile.name) @@ -359,13 +409,8 @@ async def _patch_static_templates_once(self) -> None: self._ensure_templates_loaded() if self._template_graph and len(self._template_graph) > 0: _loop = asyncio.get_event_loop() - validation_payload = await _loop.run_in_executor( - None, - functools.partial( - self._validate_graph_if_enabled, - self._template_graph, - "static_templates", - ), + validation_payload = await self._async_validate_if_enabled( + _loop, self._template_graph, "static_templates" ) self._emit_progress( { @@ -742,6 +787,32 @@ def _mapping_response( web_page=response.web_page, ) + async def _async_validate_if_enabled( + self, loop: Any, graph: Graph, url: str + ) -> dict[str, Any] | None: + if self._shacl_mode == "off": + return None + ntriples = graph.serialize(format="nt") + summary = await loop.run_in_executor( + self._process_executor, + functools.partial(_shacl_validate_in_worker, ntriples), + ) + self._kpi.record_validation( + passed=summary["pass"], + warning_count=summary["warnings"]["count"], + error_count=summary["errors"]["count"], + warning_sources=summary["warnings"]["sources"], + error_sources=summary["errors"]["sources"], + ) + logger.info( + "SHACL validation for %s: pass=%s warnings=%s errors=%s", + url, + summary["pass"], + summary["warnings"]["count"], + summary["errors"]["count"], + ) + return summary + def _validate_graph_if_enabled( self, graph: Graph, url: str ) -> dict[str, Any] | None: From 65867cb52743aa28730b8c92626b9b83b94481e3 Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Wed, 18 Mar 2026 10:59:56 +0100 Subject: [PATCH 09/24] feat: add separate pool size settings for postprocessors and SHACL validation --- wordlift_sdk/kg_build/protocol.py | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/wordlift_sdk/kg_build/protocol.py b/wordlift_sdk/kg_build/protocol.py index 4149cc3..a32238c 100644 --- a/wordlift_sdk/kg_build/protocol.py +++ b/wordlift_sdk/kg_build/protocol.py @@ -178,13 +178,20 @@ def __init__( "concurrency", self.profile.settings.get("CONCURRENCY", 4) ) ) + _pp_pool_size = int( + self.profile.settings.get( + "postprocessor_pool_size", + self.profile.settings.get("POSTPROCESSOR_POOL_SIZE", _pool_size), + ) + ) logger.info( - "Postprocessor pool size for profile '%s': %d", + "Postprocessor pool size for profile '%s': %d (concurrency=%d)", self.profile.name, + _pp_pool_size, _pool_size, ) self._postprocessors_queue: asyncio.Queue = asyncio.Queue() - for _ in range(_pool_size): + for _ in range(_pp_pool_size): self._postprocessors_queue.put_nowait( load_postprocessors_for_profile( root_dir=self.root_dir, @@ -221,8 +228,16 @@ def __init__( extra_shapes=shacl_extra_shapes or None, ) if self._shacl_mode != "off": + _shacl_pool_size = int( + self.profile.settings.get( + "shacl_pool_size", + self.profile.settings.get( + "SHACL_POOL_SIZE", max(2, _pool_size // 2) + ), + ) + ) self._process_executor: ProcessPoolExecutor | None = ProcessPoolExecutor( - max_workers=_pool_size, + max_workers=_shacl_pool_size, initializer=_init_shacl_worker, initargs=( self._shacl_shape_specs if self._shacl_shape_specs else None, @@ -230,7 +245,7 @@ def __init__( ) logger.info( "Created SHACL process pool with %d workers for profile '%s'", - _pool_size, + _shacl_pool_size, self.profile.name, ) else: From 90492abde8d60a739ebab0c782034f0b7e690850 Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Wed, 18 Mar 2026 11:44:53 +0100 Subject: [PATCH 10/24] feat: track SHACL process pool queue wait and execution time separately in timing log --- wordlift_sdk/kg_build/protocol.py | 49 ++++++++++++++++++------------- 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/wordlift_sdk/kg_build/protocol.py b/wordlift_sdk/kg_build/protocol.py index a32238c..989d889 100644 --- a/wordlift_sdk/kg_build/protocol.py +++ b/wordlift_sdk/kg_build/protocol.py @@ -62,7 +62,9 @@ def _init_shacl_worker(shape_specs: list[str] | None) -> None: ) -def _shacl_validate_in_worker(ntriples: str) -> dict: +def _shacl_validate_in_worker(ntriples: str, submit_time: float) -> dict: + _queue_wait_ms = int((time.time() - submit_time) * 1000) + _t_start = time.perf_counter() data_graph = Graph() data_graph.parse(data=ntriples, format="nt") data_graph = _normalize_schema_org_uris(data_graph) @@ -100,6 +102,8 @@ def _shacl_validate_in_worker(ntriples: str) -> dict: "count": error_count, "sources": dict(sorted(error_sources.items())), }, + "_queue_wait_ms": _queue_wait_ms, + "_validation_ms": int((time.perf_counter() - _t_start) * 1000), } @@ -348,9 +352,11 @@ async def callback( ) self._write_debug_graph(graph, url) - _t3 = time.perf_counter() - validation_payload = await self._async_validate_if_enabled(loop, graph, url) - _t_validation = int((time.perf_counter() - _t3) * 1000) + ( + validation_payload, + _t_validation_wait, + _t_validation_actual, + ) = await self._async_validate_if_enabled(loop, graph, url) graph_metrics = self._kpi.graph_metrics(graph) self._emit_progress( { @@ -370,13 +376,14 @@ async def callback( raise RuntimeError(f"SHACL validation failed for {url} in fail mode.") await self._write_graph(graph) logger.info( - "Wrote %s triples for %s [mapping=%dms queue_wait=%dms postprocessors=%dms validation=%dms]", + "Wrote %s triples for %s [mapping=%dms postprocessor_wait=%dms postprocessors=%dms validation_wait=%dms validation=%dms]", len(graph), url, _t_mapping, _t_queue_wait, _t_postprocessors, - _t_validation, + _t_validation_wait, + _t_validation_actual, ) def close(self) -> None: @@ -424,7 +431,7 @@ async def _patch_static_templates_once(self) -> None: self._ensure_templates_loaded() if self._template_graph and len(self._template_graph) > 0: _loop = asyncio.get_event_loop() - validation_payload = await self._async_validate_if_enabled( + validation_payload, _, _ = await self._async_validate_if_enabled( _loop, self._template_graph, "static_templates" ) self._emit_progress( @@ -804,29 +811,31 @@ def _mapping_response( async def _async_validate_if_enabled( self, loop: Any, graph: Graph, url: str - ) -> dict[str, Any] | None: + ) -> tuple[dict[str, Any] | None, int, int]: if self._shacl_mode == "off": - return None + return None, 0, 0 ntriples = graph.serialize(format="nt") - summary = await loop.run_in_executor( + result = await loop.run_in_executor( self._process_executor, - functools.partial(_shacl_validate_in_worker, ntriples), + functools.partial(_shacl_validate_in_worker, ntriples, time.time()), ) + validation_queue_wait_ms = result.pop("_queue_wait_ms", 0) + validation_ms = result.pop("_validation_ms", 0) self._kpi.record_validation( - passed=summary["pass"], - warning_count=summary["warnings"]["count"], - error_count=summary["errors"]["count"], - warning_sources=summary["warnings"]["sources"], - error_sources=summary["errors"]["sources"], + passed=result["pass"], + warning_count=result["warnings"]["count"], + error_count=result["errors"]["count"], + warning_sources=result["warnings"]["sources"], + error_sources=result["errors"]["sources"], ) logger.info( "SHACL validation for %s: pass=%s warnings=%s errors=%s", url, - summary["pass"], - summary["warnings"]["count"], - summary["errors"]["count"], + result["pass"], + result["warnings"]["count"], + result["errors"]["count"], ) - return summary + return result, validation_queue_wait_ms, validation_ms def _validate_graph_if_enabled( self, graph: Graph, url: str From 24ea3bda01e10d5097479b729e7259b4a0fd1be6 Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Wed, 18 Mar 2026 12:13:08 +0100 Subject: [PATCH 11/24] feat: add inprocess postprocessor runtime for running processors in the same process --- wordlift_sdk/kg_build/postprocessors.py | 65 +++++++++++++++++-------- 1 file changed, 45 insertions(+), 20 deletions(-) diff --git a/wordlift_sdk/kg_build/postprocessors.py b/wordlift_sdk/kg_build/postprocessors.py index a41af15..4e5a079 100644 --- a/wordlift_sdk/kg_build/postprocessors.py +++ b/wordlift_sdk/kg_build/postprocessors.py @@ -1,5 +1,8 @@ from __future__ import annotations +import asyncio +import importlib +import inspect import json import logging import select @@ -21,6 +24,7 @@ _RUNTIME_ONESHOT = "oneshot" _RUNTIME_PERSISTENT = "persistent" +_RUNTIME_INPROCESS = "inprocess" @dataclass(frozen=True) @@ -373,6 +377,23 @@ def _run_persistent( ) +@dataclass(frozen=True) +class InProcessPostprocessor: + class_path: str + + def process_graph( + self, graph: Graph, context: PostprocessorContext + ) -> Graph | None: + module_name, class_name = self.class_path.split(":", 1) + module = importlib.import_module(module_name) + klass = getattr(module, class_name) + processor = klass() + result = processor.process_graph(graph, context) + if inspect.isawaitable(result): + result = asyncio.run(result) + return result + + def _as_bool(value: Any, default: bool) -> bool: if value is None: return default @@ -399,8 +420,10 @@ def _as_positive_int(value: Any, default: int) -> int: def _normalize_runtime(value: str | None) -> str: runtime = (value or _RUNTIME_ONESHOT).strip().lower() - if runtime not in {_RUNTIME_ONESHOT, _RUNTIME_PERSISTENT}: - raise ValueError("POSTPROCESSOR_RUNTIME must be one of: oneshot, persistent.") + if runtime not in {_RUNTIME_ONESHOT, _RUNTIME_PERSISTENT, _RUNTIME_INPROCESS}: + raise ValueError( + "POSTPROCESSOR_RUNTIME must be one of: oneshot, persistent, inprocess." + ) return runtime @@ -510,16 +533,17 @@ def load_postprocessors_for_profile( for spec in specs: if not spec.enabled: continue - loaded.append( - LoadedPostprocessor( - name=spec.class_path, - handler=SubprocessPostprocessor( - spec=spec, - root_dir=root_dir, - runtime=resolved_runtime, - ), + if resolved_runtime == _RUNTIME_INPROCESS: + handler: GraphPostprocessor = InProcessPostprocessor( + class_path=spec.class_path ) - ) + else: + handler = SubprocessPostprocessor( + spec=spec, + root_dir=root_dir, + runtime=resolved_runtime, + ) + loaded.append(LoadedPostprocessor(name=spec.class_path, handler=handler)) logger.info( "Loaded %s postprocessors for profile '%s' from manifest: %s (runtime=%s)", @@ -550,16 +574,17 @@ def load_postprocessors( for spec in specs: if not spec.enabled: continue - loaded.append( - LoadedPostprocessor( - name=spec.class_path, - handler=SubprocessPostprocessor( - spec=spec, - root_dir=root_dir, - runtime=resolved_runtime, - ), + if resolved_runtime == _RUNTIME_INPROCESS: + handler: GraphPostprocessor = InProcessPostprocessor( + class_path=spec.class_path ) - ) + else: + handler = SubprocessPostprocessor( + spec=spec, + root_dir=root_dir, + runtime=resolved_runtime, + ) + loaded.append(LoadedPostprocessor(name=spec.class_path, handler=handler)) return loaded From 9630bca4cafa0753a38071d355596ff6b98c1567 Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Wed, 18 Mar 2026 12:32:08 +0100 Subject: [PATCH 12/24] feat: run postprocessors on a dedicated thread pool instead of the default executor --- wordlift_sdk/kg_build/protocol.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/wordlift_sdk/kg_build/protocol.py b/wordlift_sdk/kg_build/protocol.py index 989d889..69e17e7 100644 --- a/wordlift_sdk/kg_build/protocol.py +++ b/wordlift_sdk/kg_build/protocol.py @@ -6,7 +6,7 @@ import logging import os import time -from concurrent.futures import ProcessPoolExecutor +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from dataclasses import asdict from pathlib import Path from types import SimpleNamespace @@ -194,6 +194,9 @@ def __init__( _pp_pool_size, _pool_size, ) + self._pp_executor = ThreadPoolExecutor( + max_workers=_pp_pool_size, thread_name_prefix="worai_pp" + ) self._postprocessors_queue: asyncio.Queue = asyncio.Queue() for _ in range(_pp_pool_size): self._postprocessors_queue.put_nowait( @@ -324,7 +327,7 @@ async def callback( try: _t2 = time.perf_counter() graph = await loop.run_in_executor( - None, + self._pp_executor, functools.partial( self._apply_postprocessors_with, graph, @@ -392,6 +395,7 @@ def close(self) -> None: close_loaded_postprocessors(self._postprocessors_queue.get_nowait()) except asyncio.QueueEmpty: break + self._pp_executor.shutdown(wait=False) if self._process_executor is not None: self._process_executor.shutdown(wait=False) From 60c407b2d82191ddf774885a69b9d4bcd573737b Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Wed, 18 Mar 2026 12:54:15 +0100 Subject: [PATCH 13/24] fix: handle SHACL process pool timeout and broken executor errors gracefully --- wordlift_sdk/kg_build/protocol.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/wordlift_sdk/kg_build/protocol.py b/wordlift_sdk/kg_build/protocol.py index 69e17e7..a7fcaf6 100644 --- a/wordlift_sdk/kg_build/protocol.py +++ b/wordlift_sdk/kg_build/protocol.py @@ -6,6 +6,7 @@ import logging import os import time +import concurrent.futures from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from dataclasses import asdict from pathlib import Path @@ -819,10 +820,22 @@ async def _async_validate_if_enabled( if self._shacl_mode == "off": return None, 0, 0 ntriples = graph.serialize(format="nt") - result = await loop.run_in_executor( - self._process_executor, - functools.partial(_shacl_validate_in_worker, ntriples, time.time()), - ) + try: + result = await asyncio.wait_for( + loop.run_in_executor( + self._process_executor, + functools.partial(_shacl_validate_in_worker, ntriples, time.time()), + ), + timeout=120.0, + ) + except (asyncio.TimeoutError, concurrent.futures.BrokenExecutor) as exc: + logger.warning( + "SHACL validation skipped for %s: %s (%s)", + url, + type(exc).__name__, + exc, + ) + return None, 0, 0 validation_queue_wait_ms = result.pop("_queue_wait_ms", 0) validation_ms = result.pop("_validation_ms", 0) self._kpi.record_validation( From 5b7c6157779f2366b621a46df6ff17ad67c6bc51 Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Wed, 18 Mar 2026 13:51:58 +0100 Subject: [PATCH 14/24] fix: offload graph hashing to executor to avoid blocking the event loop --- wordlift_sdk/protocol/graph/graph_queue.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/wordlift_sdk/protocol/graph/graph_queue.py b/wordlift_sdk/protocol/graph/graph_queue.py index 1ad8e33..f104777 100644 --- a/wordlift_sdk/protocol/graph/graph_queue.py +++ b/wordlift_sdk/protocol/graph/graph_queue.py @@ -39,7 +39,8 @@ def __init__(self, client_configuration: Configuration): reraise=True, ) async def put(self, graph: Graph) -> None: - hash = GraphQueue.hash_graph(graph) + loop = asyncio.get_event_loop() + hash = await loop.run_in_executor(None, GraphQueue.hash_graph, graph) if hash not in self.hashes: self.hashes.add(hash) From bebf0a67ba1faf053efa7dd441502e8165cbfb86 Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Wed, 18 Mar 2026 13:52:47 +0100 Subject: [PATCH 15/24] fix: enable stop_after_attempt(5) retry limit on graph queue put --- wordlift_sdk/protocol/graph/graph_queue.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/wordlift_sdk/protocol/graph/graph_queue.py b/wordlift_sdk/protocol/graph/graph_queue.py index f104777..ff14016 100644 --- a/wordlift_sdk/protocol/graph/graph_queue.py +++ b/wordlift_sdk/protocol/graph/graph_queue.py @@ -8,7 +8,13 @@ from rdflib import Graph from rdflib.compare import to_isomorphic from wordlift_client import Configuration -from tenacity import retry, retry_if_exception_type, wait_fixed, after_log +from tenacity import ( + retry, + retry_if_exception_type, + wait_fixed, + after_log, + stop_after_attempt, +) logger = logging.getLogger(__name__) @@ -22,7 +28,7 @@ def __init__(self, client_configuration: Configuration): self.hashes = set() @retry( - # stop=stop_after_attempt(5), # Retry up to 5 times + stop=stop_after_attempt(5), retry=retry_if_exception_type( asyncio.TimeoutError | aiohttp.client_exceptions.ServerDisconnectedError From feaabd3272aebf273378e2dc251127301b5a424e Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Wed, 18 Mar 2026 14:47:54 +0100 Subject: [PATCH 16/24] fix: disable morph_kgc internal multiprocessing to prevent fork deadlocks in threaded context --- wordlift_sdk/structured_data/engine.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/wordlift_sdk/structured_data/engine.py b/wordlift_sdk/structured_data/engine.py index 2fec73d..5107852 100644 --- a/wordlift_sdk/structured_data/engine.py +++ b/wordlift_sdk/structured_data/engine.py @@ -1351,6 +1351,10 @@ def _materialize_graph(mapping_path: Path) -> Graph: config = ( "[CONFIGURATION]\n" "output_format = N-TRIPLES\n" + # Disable morph_kgc internal multiprocessing: on Linux it uses fork() which + # deadlocks when the parent process already has threads running (asyncio pool, + # SHACL ProcessPoolExecutor). The outer pipeline handles concurrency. + "number_of_processes = 1\n" "\n" "[DataSource1]\n" f"mappings = {mapping_path}\n" From 783c7eac16038aeb60e85e34829212cfb3e546b7 Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Wed, 18 Mar 2026 15:01:39 +0100 Subject: [PATCH 17/24] fix: offload RML mapping to dedicated thread pool to prevent blocking the event loop --- wordlift_sdk/kg_build/protocol.py | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/wordlift_sdk/kg_build/protocol.py b/wordlift_sdk/kg_build/protocol.py index a7fcaf6..04cb0e4 100644 --- a/wordlift_sdk/kg_build/protocol.py +++ b/wordlift_sdk/kg_build/protocol.py @@ -198,6 +198,11 @@ def __init__( self._pp_executor = ThreadPoolExecutor( max_workers=_pp_pool_size, thread_name_prefix="worai_pp" ) + # Dedicated executor for RML mapping (morph_kgc is CPU-bound and has no + # async I/O — running it directly on the event loop thread blocks everything). + self._mapping_executor = ThreadPoolExecutor( + max_workers=_pool_size, thread_name_prefix="worai_ml" + ) self._postprocessors_queue: asyncio.Queue = asyncio.Queue() for _ in range(_pp_pool_size): self._postprocessors_queue.put_nowait( @@ -306,13 +311,23 @@ async def callback( debug_output: dict[str, str] | None = {} if self.debug_dir else None _t0 = time.perf_counter() - graph = await self.rml_service.apply_mapping( - html=response.web_page.html, - url=url, - mapping_file_path=mapping_path, - mapping_content=rendered_mapping, - response=mapping_response, - debug_output=debug_output, + # apply_mapping is async def but contains no awaits — it runs morph_kgc + # (pure Python CPU work) synchronously. Running it directly on the event + # loop blocks all other coroutines for ~450ms per URL. Offload to a thread + # so the event loop stays free to schedule I/O for other concurrent URLs. + _mapping_loop = asyncio.get_event_loop() + graph = await _mapping_loop.run_in_executor( + self._mapping_executor, + lambda: asyncio.run( + self.rml_service.apply_mapping( + html=response.web_page.html, + url=url, + mapping_file_path=mapping_path, + mapping_content=rendered_mapping, + response=mapping_response, + debug_output=debug_output, + ) + ), ) _t_mapping = int((time.perf_counter() - _t0) * 1000) if not graph or len(graph) == 0: @@ -397,6 +412,7 @@ def close(self) -> None: except asyncio.QueueEmpty: break self._pp_executor.shutdown(wait=False) + self._mapping_executor.shutdown(wait=False) if self._process_executor is not None: self._process_executor.shutdown(wait=False) From 0c6b5a71497702604725db2f9bee913d69cca9f8 Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Wed, 18 Mar 2026 15:11:18 +0100 Subject: [PATCH 18/24] fix: serialize morph_kgc calls with a lock to prevent thread-safety issues in pyparsing --- wordlift_sdk/structured_data/engine.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/wordlift_sdk/structured_data/engine.py b/wordlift_sdk/structured_data/engine.py index 5107852..cb59f15 100644 --- a/wordlift_sdk/structured_data/engine.py +++ b/wordlift_sdk/structured_data/engine.py @@ -7,6 +7,7 @@ import json import logging import re +import threading from dataclasses import dataclass from importlib import resources from pathlib import Path @@ -28,6 +29,9 @@ from wordlift_sdk.utils.ssl_ca_bundle import resolve_ssl_ca_cert from wordlift_sdk.validation.shacl import ValidationResult, validate_file +# morph_kgc uses rdflib's SPARQL parser (pyparsing) which has global state and +# is NOT thread-safe. Serialize all morph_kgc calls with a module-level lock. +_morph_kgc_lock = threading.Lock() _SCHEMA_BASE = "https://schema.org" _SCHEMA_HTTP = "http://schema.org/" @@ -1360,7 +1364,8 @@ def _materialize_graph(mapping_path: Path) -> Graph: f"mappings = {mapping_path}\n" ) try: - return morph_kgc.materialize(config) + with _morph_kgc_lock: + return morph_kgc.materialize(config) except RuntimeError: raise except Exception as exc: From 457740e32d014a444b7945cf5f0230776a48bee5 Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Wed, 18 Mar 2026 15:22:20 +0100 Subject: [PATCH 19/24] Revert "fix: offload RML mapping to dedicated thread pool to prevent blocking the event loop" This reverts commit 783c7eac16038aeb60e85e34829212cfb3e546b7. --- wordlift_sdk/kg_build/protocol.py | 30 +++++++----------------------- 1 file changed, 7 insertions(+), 23 deletions(-) diff --git a/wordlift_sdk/kg_build/protocol.py b/wordlift_sdk/kg_build/protocol.py index 04cb0e4..a7fcaf6 100644 --- a/wordlift_sdk/kg_build/protocol.py +++ b/wordlift_sdk/kg_build/protocol.py @@ -198,11 +198,6 @@ def __init__( self._pp_executor = ThreadPoolExecutor( max_workers=_pp_pool_size, thread_name_prefix="worai_pp" ) - # Dedicated executor for RML mapping (morph_kgc is CPU-bound and has no - # async I/O — running it directly on the event loop thread blocks everything). - self._mapping_executor = ThreadPoolExecutor( - max_workers=_pool_size, thread_name_prefix="worai_ml" - ) self._postprocessors_queue: asyncio.Queue = asyncio.Queue() for _ in range(_pp_pool_size): self._postprocessors_queue.put_nowait( @@ -311,23 +306,13 @@ async def callback( debug_output: dict[str, str] | None = {} if self.debug_dir else None _t0 = time.perf_counter() - # apply_mapping is async def but contains no awaits — it runs morph_kgc - # (pure Python CPU work) synchronously. Running it directly on the event - # loop blocks all other coroutines for ~450ms per URL. Offload to a thread - # so the event loop stays free to schedule I/O for other concurrent URLs. - _mapping_loop = asyncio.get_event_loop() - graph = await _mapping_loop.run_in_executor( - self._mapping_executor, - lambda: asyncio.run( - self.rml_service.apply_mapping( - html=response.web_page.html, - url=url, - mapping_file_path=mapping_path, - mapping_content=rendered_mapping, - response=mapping_response, - debug_output=debug_output, - ) - ), + graph = await self.rml_service.apply_mapping( + html=response.web_page.html, + url=url, + mapping_file_path=mapping_path, + mapping_content=rendered_mapping, + response=mapping_response, + debug_output=debug_output, ) _t_mapping = int((time.perf_counter() - _t0) * 1000) if not graph or len(graph) == 0: @@ -412,7 +397,6 @@ def close(self) -> None: except asyncio.QueueEmpty: break self._pp_executor.shutdown(wait=False) - self._mapping_executor.shutdown(wait=False) if self._process_executor is not None: self._process_executor.shutdown(wait=False) From 4a7d68f0924c67fd54ea25a6af2ddea3546d57b7 Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Wed, 18 Mar 2026 15:46:37 +0100 Subject: [PATCH 20/24] perf: run morph_kgc in a subprocess pool for true parallelism without pyparsing lock contention --- wordlift_sdk/kg_build/protocol.py | 30 +++++++++++---- wordlift_sdk/structured_data/engine.py | 51 +++++++++++++++++++------- 2 files changed, 61 insertions(+), 20 deletions(-) diff --git a/wordlift_sdk/kg_build/protocol.py b/wordlift_sdk/kg_build/protocol.py index a7fcaf6..ba4205f 100644 --- a/wordlift_sdk/kg_build/protocol.py +++ b/wordlift_sdk/kg_build/protocol.py @@ -198,6 +198,12 @@ def __init__( self._pp_executor = ThreadPoolExecutor( max_workers=_pp_pool_size, thread_name_prefix="worai_pp" ) + # Wraps apply_mapping calls so they run in a thread rather than blocking + # the asyncio event loop. The thread itself blocks on the morph_kgc + # ProcessPoolExecutor slot, leaving the event loop free for I/O. + self._mapping_executor = ThreadPoolExecutor( + max_workers=_pool_size, thread_name_prefix="worai_ml" + ) self._postprocessors_queue: asyncio.Queue = asyncio.Queue() for _ in range(_pp_pool_size): self._postprocessors_queue.put_nowait( @@ -306,13 +312,22 @@ async def callback( debug_output: dict[str, str] | None = {} if self.debug_dir else None _t0 = time.perf_counter() - graph = await self.rml_service.apply_mapping( - html=response.web_page.html, - url=url, - mapping_file_path=mapping_path, - mapping_content=rendered_mapping, - response=mapping_response, - debug_output=debug_output, + # apply_mapping has no awaits — all work is synchronous (morph_kgc). + # Run it in a thread so the event loop stays free for I/O while the + # thread waits for its morph_kgc subprocess slot to become available. + _loop = asyncio.get_event_loop() + graph = await _loop.run_in_executor( + self._mapping_executor, + lambda: asyncio.run( + self.rml_service.apply_mapping( + html=response.web_page.html, + url=url, + mapping_file_path=mapping_path, + mapping_content=rendered_mapping, + response=mapping_response, + debug_output=debug_output, + ) + ), ) _t_mapping = int((time.perf_counter() - _t0) * 1000) if not graph or len(graph) == 0: @@ -397,6 +412,7 @@ def close(self) -> None: except asyncio.QueueEmpty: break self._pp_executor.shutdown(wait=False) + self._mapping_executor.shutdown(wait=False) if self._process_executor is not None: self._process_executor.shutdown(wait=False) diff --git a/wordlift_sdk/structured_data/engine.py b/wordlift_sdk/structured_data/engine.py index cb59f15..9543330 100644 --- a/wordlift_sdk/structured_data/engine.py +++ b/wordlift_sdk/structured_data/engine.py @@ -6,8 +6,10 @@ import hashlib import json import logging +import multiprocessing +import os import re -import threading +from concurrent.futures import ProcessPoolExecutor from dataclasses import dataclass from importlib import resources from pathlib import Path @@ -29,9 +31,34 @@ from wordlift_sdk.utils.ssl_ca_bundle import resolve_ssl_ca_cert from wordlift_sdk.validation.shacl import ValidationResult, validate_file -# morph_kgc uses rdflib's SPARQL parser (pyparsing) which has global state and -# is NOT thread-safe. Serialize all morph_kgc calls with a module-level lock. -_morph_kgc_lock = threading.Lock() + +# Top-level worker — must be module-level to be picklable for ProcessPoolExecutor. +# Each subprocess has its own Python interpreter so pyparsing state is isolated; +# no lock needed and genuine parallelism is possible. +def _morph_kgc_worker(config: str) -> str: + import morph_kgc as _mkgc + + return _mkgc.materialize(config).serialize(format="nt") + + +# Lazy process pool — created on first use in the main process only. +# Worker subprocesses import this module but never call _get_morph_kgc_pool(), +# so they do NOT create their own pools (no recursive process explosion). +_morph_kgc_pool: ProcessPoolExecutor | None = None + + +def _get_morph_kgc_pool() -> ProcessPoolExecutor: + global _morph_kgc_pool + if _morph_kgc_pool is None: + # Use "spawn" context to start workers cleanly without inheriting any + # locks or file descriptors from the parent process. + ctx = multiprocessing.get_context("spawn") + _morph_kgc_pool = ProcessPoolExecutor( + max_workers=os.cpu_count() or 4, + mp_context=ctx, + ) + return _morph_kgc_pool + _SCHEMA_BASE = "https://schema.org" _SCHEMA_HTTP = "http://schema.org/" @@ -1345,13 +1372,6 @@ def _normalize_materialization_error(error: Exception) -> RuntimeError: def _materialize_graph(mapping_path: Path) -> Graph: - try: - import morph_kgc - except ImportError as exc: - raise RuntimeError( - "morph-kgc is required. Install with: pip install morph-kgc" - ) from exc - config = ( "[CONFIGURATION]\n" "output_format = N-TRIPLES\n" @@ -1364,8 +1384,13 @@ def _materialize_graph(mapping_path: Path) -> Graph: f"mappings = {mapping_path}\n" ) try: - with _morph_kgc_lock: - return morph_kgc.materialize(config) + # Submit to subprocess pool — each worker has isolated pyparsing state, + # so calls are genuinely parallel across CPU cores with no lock needed. + # .result() blocks the calling thread (not the asyncio event loop). + ntriples = _get_morph_kgc_pool().submit(_morph_kgc_worker, config).result() + graph = Graph() + graph.parse(data=ntriples, format="nt") + return graph except RuntimeError: raise except Exception as exc: From 4287c906461d36063227e3d91ca23246acb3c575 Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Wed, 18 Mar 2026 15:56:18 +0100 Subject: [PATCH 21/24] perf: expose morph_kgc pool size setting and track subprocess queue wait in timing log --- wordlift_sdk/kg_build/protocol.py | 17 +++++++++- wordlift_sdk/structured_data/engine.py | 43 +++++++++++++++++++++----- 2 files changed, 52 insertions(+), 8 deletions(-) diff --git a/wordlift_sdk/kg_build/protocol.py b/wordlift_sdk/kg_build/protocol.py index ba4205f..ba653fe 100644 --- a/wordlift_sdk/kg_build/protocol.py +++ b/wordlift_sdk/kg_build/protocol.py @@ -41,6 +41,7 @@ ) from .rml_mapping import RmlMappingService from .templates import JinjaRdfTemplateReifier, TemplateTextRenderer +from wordlift_sdk.structured_data.engine import init_morph_kgc_pool, _morph_kgc_tls logger = logging.getLogger(__name__) SEOVOC_SOURCE = URIRef("https://w3id.org/seovoc/source") @@ -198,6 +199,18 @@ def __init__( self._pp_executor = ThreadPoolExecutor( max_workers=_pp_pool_size, thread_name_prefix="worai_pp" ) + _mapping_pool_size = int( + self.profile.settings.get( + "mapping_pool_size", + self.profile.settings.get("MAPPING_POOL_SIZE", os.cpu_count() or 4), + ) + ) + logger.info( + "Mapping pool size for profile '%s': %d", + self.profile.name, + _mapping_pool_size, + ) + init_morph_kgc_pool(_mapping_pool_size) # Wraps apply_mapping calls so they run in a thread rather than blocking # the asyncio event loop. The thread itself blocks on the morph_kgc # ProcessPoolExecutor slot, leaving the event loop free for I/O. @@ -330,6 +343,7 @@ async def callback( ), ) _t_mapping = int((time.perf_counter() - _t0) * 1000) + _t_mapping_wait = getattr(_morph_kgc_tls, "mapping_wait_ms", 0) if not graph or len(graph) == 0: logger.warning("No triples produced for %s", url) return @@ -395,9 +409,10 @@ async def callback( raise RuntimeError(f"SHACL validation failed for {url} in fail mode.") await self._write_graph(graph) logger.info( - "Wrote %s triples for %s [mapping=%dms postprocessor_wait=%dms postprocessors=%dms validation_wait=%dms validation=%dms]", + "Wrote %s triples for %s [mapping_wait=%dms mapping=%dms postprocessor_wait=%dms postprocessors=%dms validation_wait=%dms validation=%dms]", len(graph), url, + _t_mapping_wait, _t_mapping, _t_queue_wait, _t_postprocessors, diff --git a/wordlift_sdk/structured_data/engine.py b/wordlift_sdk/structured_data/engine.py index 9543330..c62ad75 100644 --- a/wordlift_sdk/structured_data/engine.py +++ b/wordlift_sdk/structured_data/engine.py @@ -32,26 +32,48 @@ from wordlift_sdk.validation.shacl import ValidationResult, validate_file +import threading +import time as _time + + # Top-level worker — must be module-level to be picklable for ProcessPoolExecutor. -# Each subprocess has its own Python interpreter so pyparsing state is isolated; -# no lock needed and genuine parallelism is possible. -def _morph_kgc_worker(config: str) -> str: +# Accepts submit_time so it can measure queue wait (time spent waiting for a +# free subprocess slot). Returns (ntriples, queue_wait_ms). +def _morph_kgc_worker(config: str, submit_time: float) -> tuple[str, int]: import morph_kgc as _mkgc + import time as _t - return _mkgc.materialize(config).serialize(format="nt") + queue_wait_ms = int((_t.time() - submit_time) * 1000) + ntriples = _mkgc.materialize(config).serialize(format="nt") + return ntriples, queue_wait_ms +# Thread-local used to pass mapping_wait_ms back to the protocol layer without +# changing the return type of _materialize_graph / apply_mapping. +_morph_kgc_tls = threading.local() + # Lazy process pool — created on first use in the main process only. # Worker subprocesses import this module but never call _get_morph_kgc_pool(), # so they do NOT create their own pools (no recursive process explosion). _morph_kgc_pool: ProcessPoolExecutor | None = None +def init_morph_kgc_pool(max_workers: int) -> None: + """Pre-create the morph_kgc process pool with a specific worker count. + Call once from the protocol __init__ before any mapping work starts. + Subsequent calls are no-ops (pool is only created once). + """ + global _morph_kgc_pool + if _morph_kgc_pool is not None: + return + ctx = multiprocessing.get_context("spawn") + _morph_kgc_pool = ProcessPoolExecutor(max_workers=max_workers, mp_context=ctx) + + def _get_morph_kgc_pool() -> ProcessPoolExecutor: global _morph_kgc_pool if _morph_kgc_pool is None: - # Use "spawn" context to start workers cleanly without inheriting any - # locks or file descriptors from the parent process. + # Fallback if init_morph_kgc_pool was never called. ctx = multiprocessing.get_context("spawn") _morph_kgc_pool = ProcessPoolExecutor( max_workers=os.cpu_count() or 4, @@ -1387,7 +1409,14 @@ def _materialize_graph(mapping_path: Path) -> Graph: # Submit to subprocess pool — each worker has isolated pyparsing state, # so calls are genuinely parallel across CPU cores with no lock needed. # .result() blocks the calling thread (not the asyncio event loop). - ntriples = _get_morph_kgc_pool().submit(_morph_kgc_worker, config).result() + ntriples, queue_wait_ms = ( + _get_morph_kgc_pool() + .submit(_morph_kgc_worker, config, _time.time()) + .result() + ) + # Store wait time in thread-local so protocol.py can read it without + # changing the return type of this function. + _morph_kgc_tls.mapping_wait_ms = queue_wait_ms graph = Graph() graph.parse(data=ntriples, format="nt") return graph From d6740161a88fb1cd936d2104a1f4afd08cf63248 Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Wed, 18 Mar 2026 16:46:42 +0100 Subject: [PATCH 22/24] fix: read morph_kgc queue wait from worker thread via closure to avoid thread-local race --- wordlift_sdk/kg_build/protocol.py | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/wordlift_sdk/kg_build/protocol.py b/wordlift_sdk/kg_build/protocol.py index ba653fe..5d8bcfe 100644 --- a/wordlift_sdk/kg_build/protocol.py +++ b/wordlift_sdk/kg_build/protocol.py @@ -328,10 +328,13 @@ async def callback( # apply_mapping has no awaits — all work is synchronous (morph_kgc). # Run it in a thread so the event loop stays free for I/O while the # thread waits for its morph_kgc subprocess slot to become available. - _loop = asyncio.get_event_loop() - graph = await _loop.run_in_executor( - self._mapping_executor, - lambda: asyncio.run( + # _morph_kgc_tls is thread-local: capture it inside the worker thread + # and pass the value back via a closure dict. + _timing: dict[str, int] = {} + + def _run_mapping() -> Graph | None: + _t_start = time.perf_counter() + result = asyncio.run( self.rml_service.apply_mapping( html=response.web_page.html, url=url, @@ -340,10 +343,18 @@ async def callback( response=mapping_response, debug_output=debug_output, ) - ), - ) - _t_mapping = int((time.perf_counter() - _t0) * 1000) - _t_mapping_wait = getattr(_morph_kgc_tls, "mapping_wait_ms", 0) + ) + mw = getattr(_morph_kgc_tls, "mapping_wait_ms", 0) + _timing["mapping_wait_ms"] = mw + # Subtract queue-wait so mapping= shows actual execution time only, + # consistent with how validation_wait/validation are reported. + _timing["mapping_ms"] = int((time.perf_counter() - _t_start) * 1000) - mw + return result + + _loop = asyncio.get_event_loop() + graph = await _loop.run_in_executor(self._mapping_executor, _run_mapping) + _t_mapping = _timing.get("mapping_ms", int((time.perf_counter() - _t0) * 1000)) + _t_mapping_wait = _timing.get("mapping_wait_ms", 0) if not graph or len(graph) == 0: logger.warning("No triples produced for %s", url) return From dc22bece838798c39c1915d7cf5d3f6fdaad3fd2 Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Wed, 18 Mar 2026 17:10:13 +0100 Subject: [PATCH 23/24] perf: reuse a single persistent ApiClient across requests instead of creating one per graph --- wordlift_sdk/protocol/graph/graph_queue.py | 53 +++++++++++++++++----- 1 file changed, 41 insertions(+), 12 deletions(-) diff --git a/wordlift_sdk/protocol/graph/graph_queue.py b/wordlift_sdk/protocol/graph/graph_queue.py index ff14016..053360e 100644 --- a/wordlift_sdk/protocol/graph/graph_queue.py +++ b/wordlift_sdk/protocol/graph/graph_queue.py @@ -26,6 +26,37 @@ class GraphQueue: def __init__(self, client_configuration: Configuration): self.client_configuration = client_configuration self.hashes = set() + self._api_client: wordlift_client.ApiClient | None = None + self._api_client_lock: asyncio.Lock | None = None + + async def _get_api_client(self) -> wordlift_client.ApiClient: + # Lazy-init the lock (must be created on the event loop). + if self._api_client_lock is None: + self._api_client_lock = asyncio.Lock() + if self._api_client is not None: + return self._api_client + async with self._api_client_lock: + if self._api_client is None: + # ApiClient.__init__ calls ssl.create_default_context() synchronously. + # Run it in a thread so the event loop isn't blocked during cert loading. + loop = asyncio.get_event_loop() + client = await loop.run_in_executor( + None, + lambda: wordlift_client.ApiClient( + configuration=self.client_configuration + ), + ) + await client.__aenter__() + self._api_client = client + return self._api_client + + async def close(self) -> None: + if self._api_client is not None: + try: + await self._api_client.__aexit__(None, None, None) + except Exception: + pass + self._api_client = None @retry( stop=stop_after_attempt(5), @@ -50,19 +81,17 @@ async def put(self, graph: Graph) -> None: if hash not in self.hashes: self.hashes.add(hash) - async with wordlift_client.ApiClient( - configuration=self.client_configuration - ) as api_client: - api_instance = wordlift_client.EntitiesApi(api_client) + api_client = await self._get_api_client() + api_instance = wordlift_client.EntitiesApi(api_client) - try: - await api_instance.create_or_update_entities( - graph.serialize(format="turtle"), - _content_type="text/turtle", - ) - except Exception as e: - logger.error(f"Failed to create entities: {e}", exc_info=e) - raise e + try: + await api_instance.create_or_update_entities( + graph.serialize(format="turtle"), + _content_type="text/turtle", + ) + except Exception as e: + logger.error(f"Failed to create entities: {e}", exc_info=e) + raise e @staticmethod def hash_graph(graph: Graph) -> str: From 519efbff9bb6a54162d4990a2d526553960a9e7a Mon Sep 17 00:00:00 2001 From: Rubens Panfili Date: Wed, 18 Mar 2026 17:19:42 +0100 Subject: [PATCH 24/24] fix: create ApiClient directly on the event loop thread instead of in an executor --- wordlift_sdk/protocol/graph/graph_queue.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/wordlift_sdk/protocol/graph/graph_queue.py b/wordlift_sdk/protocol/graph/graph_queue.py index 053360e..56e818a 100644 --- a/wordlift_sdk/protocol/graph/graph_queue.py +++ b/wordlift_sdk/protocol/graph/graph_queue.py @@ -37,14 +37,11 @@ async def _get_api_client(self) -> wordlift_client.ApiClient: return self._api_client async with self._api_client_lock: if self._api_client is None: - # ApiClient.__init__ calls ssl.create_default_context() synchronously. - # Run it in a thread so the event loop isn't blocked during cert loading. - loop = asyncio.get_event_loop() - client = await loop.run_in_executor( - None, - lambda: wordlift_client.ApiClient( - configuration=self.client_configuration - ), + # ApiClient.__init__ calls ssl.create_default_context() synchronously + # and must run on the event loop thread (it calls asyncio internals). + # Creating it once and caching avoids repeated SSL cert loading per put(). + client = wordlift_client.ApiClient( + configuration=self.client_configuration ) await client.__aenter__() self._api_client = client