From 9ff5f4218253077c18449facfeeca365c9856678 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Wed, 4 Mar 2026 16:22:11 -1000 Subject: [PATCH 1/2] Simplified CWL translator a tiny bit now that the duplicate-file bug has been fixed in Streamflow --- tests/translators_loggers/Dockerfile.streamflow | 4 +++- wfcommons/wfbench/translator/templates/cwl/shell.cwl | 2 -- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/translators_loggers/Dockerfile.streamflow b/tests/translators_loggers/Dockerfile.streamflow index 48a2cef5..1c305aa8 100644 --- a/tests/translators_loggers/Dockerfile.streamflow +++ b/tests/translators_loggers/Dockerfile.streamflow @@ -41,7 +41,9 @@ RUN apt-get -y install stress-ng # Streamflow RUN apt-get -y install nodejs -RUN python3 -m pip install --break-system-packages streamflow==0.2.0.dev14 +#RUN python3 -m pip install --break-system-packages streamflow==0.2.0.dev15 +# For now, the above hasnt' been released yet, so we get a commit tag +RUN git clone https://github.com/alpha-unito/streamflow.git && cd streamflow && git checkout 2295eda80ad37214d6f607f75f542b9eca03e121 && python3 -m pip install --break-system-packages . # Add wfcommons user RUN useradd -ms /bin/bash wfcommons diff --git a/wfcommons/wfbench/translator/templates/cwl/shell.cwl b/wfcommons/wfbench/translator/templates/cwl/shell.cwl index 1ff21740..6ef0496c 100644 --- a/wfcommons/wfbench/translator/templates/cwl/shell.cwl +++ b/wfcommons/wfbench/translator/templates/cwl/shell.cwl @@ -15,8 +15,6 @@ arguments: } } cmd = cmd + " > " + runtime.outdir + "/" + inputs.step_name + ".out 2> " + runtime.outdir + "/" + inputs.step_name + ".err"; - cmd = cmd + " ; echo '-- end of stdout for " + inputs.step_name + " --' >> " + runtime.outdir + "/" + inputs.step_name + ".out"; - cmd = cmd + " ; echo '-- end of stderr for " + inputs.step_name + " --' >> " + runtime.outdir + "/" + inputs.step_name + ".err"; return cmd; } shellQuote: false From 3c3ce86a5e67a533844fadea805fda2f922edfda Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Thu, 5 Mar 2026 15:47:03 -1000 Subject: [PATCH 2/2] Updates to the RO-Crate translator --- .../translators_loggers/Dockerfile.streamflow | 6 +- .../test_translators_loggers.py | 3 +- .../translator/templates/cwl/shell.cwl | 2 + wfcommons/wfinstances/logs/ro_crate.py | 61 ++++++++++--------- 4 files changed, 39 insertions(+), 33 deletions(-) diff --git a/tests/translators_loggers/Dockerfile.streamflow b/tests/translators_loggers/Dockerfile.streamflow index 1c305aa8..7585c547 100644 --- a/tests/translators_loggers/Dockerfile.streamflow +++ b/tests/translators_loggers/Dockerfile.streamflow @@ -26,8 +26,6 @@ RUN apt-get -y install gcc-multilib RUN apt-get -y install graphviz libgraphviz-dev RUN apt-get -y install zip - - # Python stuff RUN apt-get -y install python3 python3-pip RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1 @@ -41,9 +39,9 @@ RUN apt-get -y install stress-ng # Streamflow RUN apt-get -y install nodejs -#RUN python3 -m pip install --break-system-packages streamflow==0.2.0.dev15 +RUN python3 -m pip install --break-system-packages streamflow==0.2.0.dev14 # For now, the above hasnt' been released yet, so we get a commit tag -RUN git clone https://github.com/alpha-unito/streamflow.git && cd streamflow && git checkout 2295eda80ad37214d6f607f75f542b9eca03e121 && python3 -m pip install --break-system-packages . +#RUN git clone https://github.com/alpha-unito/streamflow.git && cd streamflow && git checkout 2295eda80ad37214d6f607f75f542b9eca03e121 && python3 -m pip install --break-system-packages . # Add wfcommons user RUN useradd -ms /bin/bash wfcommons diff --git a/tests/translators_loggers/test_translators_loggers.py b/tests/translators_loggers/test_translators_loggers.py index 7bce725f..8ab6a8ed 100644 --- a/tests/translators_loggers/test_translators_loggers.py +++ b/tests/translators_loggers/test_translators_loggers.py @@ -337,7 +337,8 @@ def test_translator(self, backend) -> None: elif backend == "streamflow": parser = ROCrateLogsParser(dirpath / "RO-Crate", steps_to_ignore=["main.cwl#compile_output_files", "main.cwl#compile_log_files"], - file_extensions_to_ignore=[".out", ".err"]) + file_extensions_to_ignore=[".out", ".err"], + instruments_to_ignore=["shell.cwl"]) if parser is not None: sys.stderr.write(f"[{backend}] Parsing the logs...\n") diff --git a/wfcommons/wfbench/translator/templates/cwl/shell.cwl b/wfcommons/wfbench/translator/templates/cwl/shell.cwl index 6ef0496c..1ff21740 100644 --- a/wfcommons/wfbench/translator/templates/cwl/shell.cwl +++ b/wfcommons/wfbench/translator/templates/cwl/shell.cwl @@ -15,6 +15,8 @@ arguments: } } cmd = cmd + " > " + runtime.outdir + "/" + inputs.step_name + ".out 2> " + runtime.outdir + "/" + inputs.step_name + ".err"; + cmd = cmd + " ; echo '-- end of stdout for " + inputs.step_name + " --' >> " + runtime.outdir + "/" + inputs.step_name + ".out"; + cmd = cmd + " ; echo '-- end of stderr for " + inputs.step_name + " --' >> " + runtime.outdir + "/" + inputs.step_name + ".err"; return cmd; } shellQuote: false diff --git a/wfcommons/wfinstances/logs/ro_crate.py b/wfcommons/wfinstances/logs/ro_crate.py index ecb1775b..4eecda91 100644 --- a/wfcommons/wfinstances/logs/ro_crate.py +++ b/wfcommons/wfinstances/logs/ro_crate.py @@ -39,23 +39,27 @@ class ROCrateLogsParser(LogsParser): :type description: Optional[str] :param logger: The logger where to log information/warning or errors (optional). :type logger: Optional[Logger] + :param steps_to_ignore: Names of CWL steps that should be ignored in the translation + :type steps_to_ignore: Optional[list[str]] + :param file_extensions_to_ignore: File extensions that should be ignored in the translation + :type file_extensions_to_ignore: Optional[list[str]] + :param instruments_to_ignore: Names of instruments that should be ignored in the translation + :type instruments_to_ignore: Optional[list[str]] """ def __init__(self, crate_dir: pathlib.Path, description: Optional[str] = None, logger: Optional[Logger] = None, - steps_to_ignore: Optional[list[str]]=[], - file_extensions_to_ignore: Optional[list[str]]=[], + steps_to_ignore: Optional[list[str]] = None, + file_extensions_to_ignore: Optional[list[str]] = None, + instruments_to_ignore: Optional[list[str]] = None, ) -> None: """Create an object of the RO crate parser.""" - # TODO: Decide if these should be RO crate or Streamflow or whatev super().__init__('Streamflow-ROCrate', 'https://w3id.org/workflowhub/workflow-ro-crate/1.0', description, logger) # Sanity check - if steps_to_ignore is None: - steps_to_ignore = [] if not crate_dir.is_dir(): raise OSError(f'The provided path does not exist or is not a folder: {crate_dir}') @@ -71,8 +75,9 @@ def __init__(self, self.task_id_name_map: dict[str, str] = {} self.data_file_id_name_map: dict[str, str] = {} - self.steps_to_ignore = steps_to_ignore - self.file_extensions_to_ignore = file_extensions_to_ignore + self.steps_to_ignore : list[str] = steps_to_ignore or [] + self.file_extensions_to_ignore : list[str] = file_extensions_to_ignore or [] + self.instruments_to_ignore : list[str] = instruments_to_ignore or [] def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow: @@ -204,27 +209,27 @@ def _add_dependencies(self, files, instruments): for child in file.get('in', []): self.workflow.add_dependency(self.task_id_name_map[parent], self.task_id_name_map[child]) - # THIS IS COMMENTED OUT AT IT SEEMS TO ADD TONS OF NON-EXISTING DEPENDENCIES ON WORKFLOW BENCHMARKS - # (FOR INSTANCE, IT TOTALLY BREAKS THE BENCHMARK WORKFLOW DUE TO ALL OF THEM USING shell.cwl#output_files - # parameter_connections = list(filter((lambda x: x.get('@type') == "ParameterConnection"), self.graph_data)) - # for parameter_connection in parameter_connections: - # # parameter_connection["sourceParameter"] is either a single dict or a list of dicts, - # # which is bad design but whatever - # source_parameters = parameter_connection["sourceParameter"] - # if not isinstance(source_parameters, list): - # source_parameters = [source_parameters] - # for item in source_parameters: - # source = item["@id"] - # source = source.rsplit("#", 1)[0] # Trim to get instrument - # - # target = parameter_connection["targetParameter"]["@id"] - # target = target.rsplit("#", 1)[0] # Trim to get instrument - # print("source", source, "----> target", target) - # - # for parent in instruments.get(source, []): - # for child in instruments.get(target, []): - # self.workflow.add_dependency(self.task_id_name_map[parent], self.task_id_name_map[child]) - + parameter_connections = list(filter((lambda x: x.get('@type') == "ParameterConnection"), self.graph_data)) + for parameter_connection in parameter_connections: + # parameter_connection["sourceParameter"] is either a single dict or a list of dicts, + # which is bad design but whatever + source_parameters = parameter_connection["sourceParameter"] + if not isinstance(source_parameters, list): + source_parameters = [source_parameters] + for item in source_parameters: + source = item["@id"] + source = source.rsplit("#", 1)[0] # Trim to get instrument + + target = parameter_connection["targetParameter"]["@id"] + target = target.rsplit("#", 1)[0] # Trim to get instrument + + if source in self.instruments_to_ignore or target in self.instruments_to_ignore: + continue + # print("source", source, "----> target", target) + + for parent in instruments.get(source, []): + for child in instruments.get(target, []): + self.workflow.add_dependency(self.task_id_name_map[parent], self.task_id_name_map[child]) def _time_diff(self, start_time, end_time): diff = datetime.fromisoformat(end_time) - datetime.fromisoformat(start_time)