diff --git a/.gitignore b/.gitignore index 38d49af..2d0c882 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ __pycache__/ dist/ .claude/ + +.vscode/ diff --git a/wordlift_sdk/kg_build/postprocessors.py b/wordlift_sdk/kg_build/postprocessors.py index b67de6b..4e5a079 100644 --- a/wordlift_sdk/kg_build/postprocessors.py +++ b/wordlift_sdk/kg_build/postprocessors.py @@ -1,5 +1,8 @@ from __future__ import annotations +import asyncio +import importlib +import inspect import json import logging import select @@ -21,6 +24,7 @@ _RUNTIME_ONESHOT = "oneshot" _RUNTIME_PERSISTENT = "persistent" +_RUNTIME_INPROCESS = "inprocess" @dataclass(frozen=True) @@ -181,7 +185,7 @@ def _ensure_started(self) -> subprocess.Popen[str]: try: ready = self._read_message( - process, timeout_seconds=min(self._spec.timeout_seconds, 10) + process, timeout_seconds=min(self._spec.timeout_seconds, 60) ) except Exception: self._terminate(process) @@ -373,6 +377,23 @@ def _run_persistent( ) +@dataclass(frozen=True) +class InProcessPostprocessor: + class_path: str + + def process_graph( + self, graph: Graph, context: PostprocessorContext + ) -> Graph | None: + module_name, class_name = self.class_path.split(":", 1) + module = importlib.import_module(module_name) + klass = getattr(module, class_name) + processor = klass() + result = processor.process_graph(graph, context) + if inspect.isawaitable(result): + result = asyncio.run(result) + return result + + def _as_bool(value: Any, default: bool) -> bool: if value is None: return default @@ -399,8 +420,10 @@ def _as_positive_int(value: Any, default: int) -> int: def _normalize_runtime(value: str | None) -> str: runtime = (value or _RUNTIME_ONESHOT).strip().lower() - if runtime not in {_RUNTIME_ONESHOT, _RUNTIME_PERSISTENT}: - raise ValueError("POSTPROCESSOR_RUNTIME must be one of: oneshot, persistent.") + if runtime not in {_RUNTIME_ONESHOT, _RUNTIME_PERSISTENT, _RUNTIME_INPROCESS}: + raise ValueError( + "POSTPROCESSOR_RUNTIME must be one of: oneshot, persistent, inprocess." + ) return runtime @@ -510,16 +533,17 @@ def load_postprocessors_for_profile( for spec in specs: if not spec.enabled: continue - loaded.append( - LoadedPostprocessor( - name=spec.class_path, - handler=SubprocessPostprocessor( - spec=spec, - root_dir=root_dir, - runtime=resolved_runtime, - ), + if resolved_runtime == _RUNTIME_INPROCESS: + handler: GraphPostprocessor = InProcessPostprocessor( + class_path=spec.class_path ) - ) + else: + handler = SubprocessPostprocessor( + spec=spec, + root_dir=root_dir, + runtime=resolved_runtime, + ) + loaded.append(LoadedPostprocessor(name=spec.class_path, handler=handler)) logger.info( "Loaded %s postprocessors for profile '%s' from manifest: %s (runtime=%s)", @@ -550,16 +574,17 @@ def load_postprocessors( for spec in specs: if not spec.enabled: continue - loaded.append( - LoadedPostprocessor( - name=spec.class_path, - handler=SubprocessPostprocessor( - spec=spec, - root_dir=root_dir, - runtime=resolved_runtime, - ), + if resolved_runtime == _RUNTIME_INPROCESS: + handler: GraphPostprocessor = InProcessPostprocessor( + class_path=spec.class_path ) - ) + else: + handler = SubprocessPostprocessor( + spec=spec, + root_dir=root_dir, + runtime=resolved_runtime, + ) + loaded.append(LoadedPostprocessor(name=spec.class_path, handler=handler)) return loaded diff --git a/wordlift_sdk/kg_build/protocol.py b/wordlift_sdk/kg_build/protocol.py index fd8ddc0..5d8bcfe 100644 --- a/wordlift_sdk/kg_build/protocol.py +++ b/wordlift_sdk/kg_build/protocol.py @@ -1,10 +1,13 @@ from __future__ import annotations import asyncio +import functools import hashlib import logging import os -import tempfile +import time +import concurrent.futures +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from dataclasses import asdict from pathlib import Path from types import SimpleNamespace @@ -17,10 +20,13 @@ from wordlift_sdk.protocol.web_page_import_protocol import ( WebPageImportProtocolInterface, ) +from pyshacl import validate as pyshacl_validate +from rdflib.namespace import SH from wordlift_sdk.validation.shacl import ( ValidationResult, + _load_shapes_graph, + _normalize_schema_org_uris, resolve_shape_specs, - validate_file, ) from .config import ProfileDefinition @@ -35,6 +41,7 @@ ) from .rml_mapping import RmlMappingService from .templates import JinjaRdfTemplateReifier, TemplateTextRenderer +from wordlift_sdk.structured_data.engine import init_morph_kgc_pool, _morph_kgc_tls logger = logging.getLogger(__name__) SEOVOC_SOURCE = URIRef("https://w3id.org/seovoc/source") @@ -45,6 +52,63 @@ def _path_contains_part(path: str, part: str) -> bool: return part in Path(path).parts +# Module-level state for SHACL worker processes (one copy per process) +_shacl_worker_shapes_graph: Graph | None = None +_shacl_worker_source_map: dict = {} + + +def _init_shacl_worker(shape_specs: list[str] | None) -> None: + global _shacl_worker_shapes_graph, _shacl_worker_source_map + _shacl_worker_shapes_graph, _shacl_worker_source_map = _load_shapes_graph( + shape_specs + ) + + +def _shacl_validate_in_worker(ntriples: str, submit_time: float) -> dict: + _queue_wait_ms = int((time.time() - submit_time) * 1000) + _t_start = time.perf_counter() + data_graph = Graph() + data_graph.parse(data=ntriples, format="nt") + data_graph = _normalize_schema_org_uris(data_graph) + conforms, report_graph, _ = pyshacl_validate( + data_graph, + shacl_graph=_shacl_worker_shapes_graph, + inference="rdfs", + abort_on_first=False, + allow_infos=True, + allow_warnings=True, + ) + warning_sources: dict[str, int] = {} + error_sources: dict[str, int] = {} + warning_count = 0 + error_count = 0 + for node in report_graph.subjects(SH.resultSeverity, SH.Warning): + warning_count += 1 + shape = next(report_graph.objects(node, SH.sourceShape), None) + label = _shacl_worker_source_map.get(shape, "unknown") + warning_sources[str(label)] = warning_sources.get(str(label), 0) + 1 + for node in report_graph.subjects(SH.resultSeverity, SH.Violation): + error_count += 1 + shape = next(report_graph.objects(node, SH.sourceShape), None) + label = _shacl_worker_source_map.get(shape, "unknown") + error_sources[str(label)] = error_sources.get(str(label), 0) + 1 + return { + "total": 1, + "pass": bool(conforms), + "fail": not bool(conforms), + "warnings": { + "count": warning_count, + "sources": dict(sorted(warning_sources.items())), + }, + "errors": { + "count": error_count, + "sources": dict(sorted(error_sources.items())), + }, + "_queue_wait_ms": _queue_wait_ms, + "_validation_ms": int((time.perf_counter() - _t_start) * 1000), + } + + def _resolve_postprocessor_runtime(settings: dict[str, Any]) -> str: value = settings.get("postprocessor_runtime") if value is None: @@ -115,11 +179,53 @@ def __init__( self._postprocessor_runtime, self.profile.origins.get("postprocessor_runtime", "default"), ) - self._postprocessors = load_postprocessors_for_profile( - root_dir=self.root_dir, - profile_name=self.profile.name, - runtime=self._postprocessor_runtime, + _pool_size = int( + self.profile.settings.get( + "concurrency", self.profile.settings.get("CONCURRENCY", 4) + ) + ) + _pp_pool_size = int( + self.profile.settings.get( + "postprocessor_pool_size", + self.profile.settings.get("POSTPROCESSOR_POOL_SIZE", _pool_size), + ) + ) + logger.info( + "Postprocessor pool size for profile '%s': %d (concurrency=%d)", + self.profile.name, + _pp_pool_size, + _pool_size, + ) + self._pp_executor = ThreadPoolExecutor( + max_workers=_pp_pool_size, thread_name_prefix="worai_pp" + ) + _mapping_pool_size = int( + self.profile.settings.get( + "mapping_pool_size", + self.profile.settings.get("MAPPING_POOL_SIZE", os.cpu_count() or 4), + ) + ) + logger.info( + "Mapping pool size for profile '%s': %d", + self.profile.name, + _mapping_pool_size, ) + init_morph_kgc_pool(_mapping_pool_size) + # Wraps apply_mapping calls so they run in a thread rather than blocking + # the asyncio event loop. The thread itself blocks on the morph_kgc + # ProcessPoolExecutor slot, leaving the event loop free for I/O. + self._mapping_executor = ThreadPoolExecutor( + max_workers=_pool_size, thread_name_prefix="worai_ml" + ) + self._postprocessors_queue: asyncio.Queue = asyncio.Queue() + for _ in range(_pp_pool_size): + self._postprocessors_queue.put_nowait( + load_postprocessors_for_profile( + root_dir=self.root_dir, + profile_name=self.profile.name, + runtime=self._postprocessor_runtime, + ) + ) self._shacl_mode = self._resolve_validation_mode( self.profile.settings.get( "shacl_validate_mode", @@ -148,6 +254,29 @@ def __init__( exclude_builtin_shapes=shacl_exclude_builtin_shapes or None, extra_shapes=shacl_extra_shapes or None, ) + if self._shacl_mode != "off": + _shacl_pool_size = int( + self.profile.settings.get( + "shacl_pool_size", + self.profile.settings.get( + "SHACL_POOL_SIZE", max(2, _pool_size // 2) + ), + ) + ) + self._process_executor: ProcessPoolExecutor | None = ProcessPoolExecutor( + max_workers=_shacl_pool_size, + initializer=_init_shacl_worker, + initargs=( + self._shacl_shape_specs if self._shacl_shape_specs else None, + ), + ) + logger.info( + "Created SHACL process pool with %d workers for profile '%s'", + _shacl_pool_size, + self.profile.name, + ) + else: + self._process_executor = None self._import_hash_mode = self._resolve_import_hash_mode( self.profile.settings.get( "import_hash_mode", @@ -195,21 +324,63 @@ async def callback( mapping_response = self._mapping_response(response, existing_web_page_id) debug_output: dict[str, str] | None = {} if self.debug_dir else None - graph = await self.rml_service.apply_mapping( - html=response.web_page.html, - url=url, - mapping_file_path=mapping_path, - mapping_content=rendered_mapping, - response=mapping_response, - debug_output=debug_output, - ) + _t0 = time.perf_counter() + # apply_mapping has no awaits — all work is synchronous (morph_kgc). + # Run it in a thread so the event loop stays free for I/O while the + # thread waits for its morph_kgc subprocess slot to become available. + # _morph_kgc_tls is thread-local: capture it inside the worker thread + # and pass the value back via a closure dict. + _timing: dict[str, int] = {} + + def _run_mapping() -> Graph | None: + _t_start = time.perf_counter() + result = asyncio.run( + self.rml_service.apply_mapping( + html=response.web_page.html, + url=url, + mapping_file_path=mapping_path, + mapping_content=rendered_mapping, + response=mapping_response, + debug_output=debug_output, + ) + ) + mw = getattr(_morph_kgc_tls, "mapping_wait_ms", 0) + _timing["mapping_wait_ms"] = mw + # Subtract queue-wait so mapping= shows actual execution time only, + # consistent with how validation_wait/validation are reported. + _timing["mapping_ms"] = int((time.perf_counter() - _t_start) * 1000) - mw + return result + + _loop = asyncio.get_event_loop() + graph = await _loop.run_in_executor(self._mapping_executor, _run_mapping) + _t_mapping = _timing.get("mapping_ms", int((time.perf_counter() - _t0) * 1000)) + _t_mapping_wait = _timing.get("mapping_wait_ms", 0) if not graph or len(graph) == 0: logger.warning("No triples produced for %s", url) return if existing_web_page_id: self._reconcile_root_id(graph, existing_web_page_id) - graph = self._apply_postprocessors(graph, url, response, existing_web_page_id) + loop = asyncio.get_event_loop() + _t1 = time.perf_counter() + _postprocessors = await self._postprocessors_queue.get() + _t_queue_wait = int((time.perf_counter() - _t1) * 1000) + try: + _t2 = time.perf_counter() + graph = await loop.run_in_executor( + self._pp_executor, + functools.partial( + self._apply_postprocessors_with, + graph, + url, + response, + existing_web_page_id, + _postprocessors, + ), + ) + _t_postprocessors = int((time.perf_counter() - _t2) * 1000) + finally: + self._postprocessors_queue.put_nowait(_postprocessors) # Canonical IDs must run after custom postprocessors so any nodes minted # by local logic are normalized before graph sync patching. graph = self._core_ids.process_graph( @@ -225,7 +396,11 @@ async def callback( ) self._write_debug_graph(graph, url) - validation_payload = self._validate_graph_if_enabled(graph, url) + ( + validation_payload, + _t_validation_wait, + _t_validation_actual, + ) = await self._async_validate_if_enabled(loop, graph, url) graph_metrics = self._kpi.graph_metrics(graph) self._emit_progress( { @@ -244,10 +419,28 @@ async def callback( ): raise RuntimeError(f"SHACL validation failed for {url} in fail mode.") await self._write_graph(graph) - logger.info("Wrote %s triples for %s", len(graph), url) + logger.info( + "Wrote %s triples for %s [mapping_wait=%dms mapping=%dms postprocessor_wait=%dms postprocessors=%dms validation_wait=%dms validation=%dms]", + len(graph), + url, + _t_mapping_wait, + _t_mapping, + _t_queue_wait, + _t_postprocessors, + _t_validation_wait, + _t_validation_actual, + ) def close(self) -> None: - close_loaded_postprocessors(self._postprocessors) + while not self._postprocessors_queue.empty(): + try: + close_loaded_postprocessors(self._postprocessors_queue.get_nowait()) + except asyncio.QueueEmpty: + break + self._pp_executor.shutdown(wait=False) + self._mapping_executor.shutdown(wait=False) + if self._process_executor is not None: + self._process_executor.shutdown(wait=False) def get_kpi_summary(self) -> dict[str, object]: return self._kpi.summary(self.profile.name) @@ -284,8 +477,9 @@ async def _patch_static_templates_once(self) -> None: self._ensure_templates_loaded() if self._template_graph and len(self._template_graph) > 0: - validation_payload = self._validate_graph_if_enabled( - self._template_graph, "static_templates" + _loop = asyncio.get_event_loop() + validation_payload, _, _ = await self._async_validate_if_enabled( + _loop, self._template_graph, "static_templates" ) self._emit_progress( { @@ -464,7 +658,23 @@ def _apply_postprocessors( response: WebPageScrapeResponse, existing_web_page_id: str | None, ) -> Graph: - if not self._postprocessors: + return self._apply_postprocessors_with( + graph, + url, + response, + existing_web_page_id, + list(self._postprocessors_queue._queue), # type: ignore[attr-defined] + ) + + def _apply_postprocessors_with( + self, + graph: Graph, + url: str, + response: WebPageScrapeResponse, + existing_web_page_id: str | None, + postprocessors: list, + ) -> Graph: + if not postprocessors: return graph pp_context = self._build_pp_context(url, response, existing_web_page_id) @@ -474,9 +684,15 @@ def _apply_postprocessors( "'api_key', WORDLIFT_KEY, or WORDLIFT_API_KEY." ) - for processor in self._postprocessors: + for processor in postprocessors: + _tp = time.perf_counter() graph = processor.run(graph, pp_context) - logger.info("Applied postprocessor '%s' for %s", processor.name, url) + logger.info( + "Applied postprocessor '%s' for %s [%dms]", + processor.name, + url, + int((time.perf_counter() - _tp) * 1000), + ) return graph def _build_pp_context( @@ -640,6 +856,46 @@ def _mapping_response( web_page=response.web_page, ) + async def _async_validate_if_enabled( + self, loop: Any, graph: Graph, url: str + ) -> tuple[dict[str, Any] | None, int, int]: + if self._shacl_mode == "off": + return None, 0, 0 + ntriples = graph.serialize(format="nt") + try: + result = await asyncio.wait_for( + loop.run_in_executor( + self._process_executor, + functools.partial(_shacl_validate_in_worker, ntriples, time.time()), + ), + timeout=120.0, + ) + except (asyncio.TimeoutError, concurrent.futures.BrokenExecutor) as exc: + logger.warning( + "SHACL validation skipped for %s: %s (%s)", + url, + type(exc).__name__, + exc, + ) + return None, 0, 0 + validation_queue_wait_ms = result.pop("_queue_wait_ms", 0) + validation_ms = result.pop("_validation_ms", 0) + self._kpi.record_validation( + passed=result["pass"], + warning_count=result["warnings"]["count"], + error_count=result["errors"]["count"], + warning_sources=result["warnings"]["sources"], + error_sources=result["errors"]["sources"], + ) + logger.info( + "SHACL validation for %s: pass=%s warnings=%s errors=%s", + url, + result["pass"], + result["warnings"]["count"], + result["errors"]["count"], + ) + return result, validation_queue_wait_ms, validation_ms + def _validate_graph_if_enabled( self, graph: Graph, url: str ) -> dict[str, Any] | None: @@ -664,21 +920,26 @@ def _validate_graph_if_enabled( return summary def _validate_graph(self, graph: Graph) -> ValidationResult: - with tempfile.NamedTemporaryFile(mode="w", suffix=".ttl", delete=False) as f: - tmp = Path(f.name) - try: - graph.serialize(destination=tmp, format="turtle") - return validate_file( - str(tmp), - shape_specs=self._shacl_shape_specs - if self._shacl_shape_specs - else None, - ) - finally: - try: - tmp.unlink(missing_ok=True) - except Exception: - logger.debug("Failed to remove temporary SHACL graph file: %s", tmp) + data_graph = _normalize_schema_org_uris(graph) + conforms, report_graph, report_text = pyshacl_validate( + data_graph, + shacl_graph=self._shacl_shapes_graph, + inference="rdfs", + abort_on_first=False, + allow_infos=True, + allow_warnings=True, + ) + warning_count = sum( + 1 for _ in report_graph.subjects(SH.resultSeverity, SH.Warning) + ) + return ValidationResult( + conforms=conforms, + report_text=report_text, + report_graph=report_graph, + data_graph=data_graph, + shape_source_map=self._shacl_source_map, + warning_count=warning_count, + ) def _summarize_validation(self, result: ValidationResult) -> dict[str, Any]: sh = URIRef("http://www.w3.org/ns/shacl#") diff --git a/wordlift_sdk/protocol/graph/graph_queue.py b/wordlift_sdk/protocol/graph/graph_queue.py index 1ad8e33..56e818a 100644 --- a/wordlift_sdk/protocol/graph/graph_queue.py +++ b/wordlift_sdk/protocol/graph/graph_queue.py @@ -8,7 +8,13 @@ from rdflib import Graph from rdflib.compare import to_isomorphic from wordlift_client import Configuration -from tenacity import retry, retry_if_exception_type, wait_fixed, after_log +from tenacity import ( + retry, + retry_if_exception_type, + wait_fixed, + after_log, + stop_after_attempt, +) logger = logging.getLogger(__name__) @@ -20,9 +26,37 @@ class GraphQueue: def __init__(self, client_configuration: Configuration): self.client_configuration = client_configuration self.hashes = set() + self._api_client: wordlift_client.ApiClient | None = None + self._api_client_lock: asyncio.Lock | None = None + + async def _get_api_client(self) -> wordlift_client.ApiClient: + # Lazy-init the lock (must be created on the event loop). + if self._api_client_lock is None: + self._api_client_lock = asyncio.Lock() + if self._api_client is not None: + return self._api_client + async with self._api_client_lock: + if self._api_client is None: + # ApiClient.__init__ calls ssl.create_default_context() synchronously + # and must run on the event loop thread (it calls asyncio internals). + # Creating it once and caching avoids repeated SSL cert loading per put(). + client = wordlift_client.ApiClient( + configuration=self.client_configuration + ) + await client.__aenter__() + self._api_client = client + return self._api_client + + async def close(self) -> None: + if self._api_client is not None: + try: + await self._api_client.__aexit__(None, None, None) + except Exception: + pass + self._api_client = None @retry( - # stop=stop_after_attempt(5), # Retry up to 5 times + stop=stop_after_attempt(5), retry=retry_if_exception_type( asyncio.TimeoutError | aiohttp.client_exceptions.ServerDisconnectedError @@ -39,23 +73,22 @@ def __init__(self, client_configuration: Configuration): reraise=True, ) async def put(self, graph: Graph) -> None: - hash = GraphQueue.hash_graph(graph) + loop = asyncio.get_event_loop() + hash = await loop.run_in_executor(None, GraphQueue.hash_graph, graph) if hash not in self.hashes: self.hashes.add(hash) - async with wordlift_client.ApiClient( - configuration=self.client_configuration - ) as api_client: - api_instance = wordlift_client.EntitiesApi(api_client) + api_client = await self._get_api_client() + api_instance = wordlift_client.EntitiesApi(api_client) - try: - await api_instance.create_or_update_entities( - graph.serialize(format="turtle"), - _content_type="text/turtle", - ) - except Exception as e: - logger.error(f"Failed to create entities: {e}", exc_info=e) - raise e + try: + await api_instance.create_or_update_entities( + graph.serialize(format="turtle"), + _content_type="text/turtle", + ) + except Exception as e: + logger.error(f"Failed to create entities: {e}", exc_info=e) + raise e @staticmethod def hash_graph(graph: Graph) -> str: diff --git a/wordlift_sdk/structured_data/engine.py b/wordlift_sdk/structured_data/engine.py index 2fec73d..c62ad75 100644 --- a/wordlift_sdk/structured_data/engine.py +++ b/wordlift_sdk/structured_data/engine.py @@ -6,7 +6,10 @@ import hashlib import json import logging +import multiprocessing +import os import re +from concurrent.futures import ProcessPoolExecutor from dataclasses import dataclass from importlib import resources from pathlib import Path @@ -29,6 +32,56 @@ from wordlift_sdk.validation.shacl import ValidationResult, validate_file +import threading +import time as _time + + +# Top-level worker — must be module-level to be picklable for ProcessPoolExecutor. +# Accepts submit_time so it can measure queue wait (time spent waiting for a +# free subprocess slot). Returns (ntriples, queue_wait_ms). +def _morph_kgc_worker(config: str, submit_time: float) -> tuple[str, int]: + import morph_kgc as _mkgc + import time as _t + + queue_wait_ms = int((_t.time() - submit_time) * 1000) + ntriples = _mkgc.materialize(config).serialize(format="nt") + return ntriples, queue_wait_ms + + +# Thread-local used to pass mapping_wait_ms back to the protocol layer without +# changing the return type of _materialize_graph / apply_mapping. +_morph_kgc_tls = threading.local() + +# Lazy process pool — created on first use in the main process only. +# Worker subprocesses import this module but never call _get_morph_kgc_pool(), +# so they do NOT create their own pools (no recursive process explosion). +_morph_kgc_pool: ProcessPoolExecutor | None = None + + +def init_morph_kgc_pool(max_workers: int) -> None: + """Pre-create the morph_kgc process pool with a specific worker count. + Call once from the protocol __init__ before any mapping work starts. + Subsequent calls are no-ops (pool is only created once). + """ + global _morph_kgc_pool + if _morph_kgc_pool is not None: + return + ctx = multiprocessing.get_context("spawn") + _morph_kgc_pool = ProcessPoolExecutor(max_workers=max_workers, mp_context=ctx) + + +def _get_morph_kgc_pool() -> ProcessPoolExecutor: + global _morph_kgc_pool + if _morph_kgc_pool is None: + # Fallback if init_morph_kgc_pool was never called. + ctx = multiprocessing.get_context("spawn") + _morph_kgc_pool = ProcessPoolExecutor( + max_workers=os.cpu_count() or 4, + mp_context=ctx, + ) + return _morph_kgc_pool + + _SCHEMA_BASE = "https://schema.org" _SCHEMA_HTTP = "http://schema.org/" _AGENT_BASE_URL = "https://api.wordlift.io/agent" @@ -1341,22 +1394,32 @@ def _normalize_materialization_error(error: Exception) -> RuntimeError: def _materialize_graph(mapping_path: Path) -> Graph: - try: - import morph_kgc - except ImportError as exc: - raise RuntimeError( - "morph-kgc is required. Install with: pip install morph-kgc" - ) from exc - config = ( "[CONFIGURATION]\n" "output_format = N-TRIPLES\n" + # Disable morph_kgc internal multiprocessing: on Linux it uses fork() which + # deadlocks when the parent process already has threads running (asyncio pool, + # SHACL ProcessPoolExecutor). The outer pipeline handles concurrency. + "number_of_processes = 1\n" "\n" "[DataSource1]\n" f"mappings = {mapping_path}\n" ) try: - return morph_kgc.materialize(config) + # Submit to subprocess pool — each worker has isolated pyparsing state, + # so calls are genuinely parallel across CPU cores with no lock needed. + # .result() blocks the calling thread (not the asyncio event loop). + ntriples, queue_wait_ms = ( + _get_morph_kgc_pool() + .submit(_morph_kgc_worker, config, _time.time()) + .result() + ) + # Store wait time in thread-local so protocol.py can read it without + # changing the return type of this function. + _morph_kgc_tls.mapping_wait_ms = queue_wait_ms + graph = Graph() + graph.parse(data=ntriples, format="nt") + return graph except RuntimeError: raise except Exception as exc: diff --git a/wordlift_sdk/workflow/url_handler/ingestion_web_page_scrape_url_handler.py b/wordlift_sdk/workflow/url_handler/ingestion_web_page_scrape_url_handler.py index a228e26..6c7a719 100644 --- a/wordlift_sdk/workflow/url_handler/ingestion_web_page_scrape_url_handler.py +++ b/wordlift_sdk/workflow/url_handler/ingestion_web_page_scrape_url_handler.py @@ -1,5 +1,7 @@ from __future__ import annotations +import asyncio +import functools import json import logging import re @@ -43,7 +45,10 @@ def __init__( async def __call__(self, url: Url) -> None: settings = self._build_settings(url) - result = run_ingestion(settings) + loop = asyncio.get_event_loop() + result = await loop.run_in_executor( + None, functools.partial(run_ingestion, settings) + ) if not result.pages: failed = [