diff --git a/tests/translators_loggers/Dockerfile.streamflow b/tests/translators_loggers/Dockerfile.streamflow index 48a2cef5..7585c547 100644 --- a/tests/translators_loggers/Dockerfile.streamflow +++ b/tests/translators_loggers/Dockerfile.streamflow @@ -26,8 +26,6 @@ RUN apt-get -y install gcc-multilib RUN apt-get -y install graphviz libgraphviz-dev RUN apt-get -y install zip - - # Python stuff RUN apt-get -y install python3 python3-pip RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1 @@ -42,6 +40,8 @@ RUN apt-get -y install stress-ng # Streamflow RUN apt-get -y install nodejs RUN python3 -m pip install --break-system-packages streamflow==0.2.0.dev14 +# For now, the above hasn't been released yet, so we get a commit tag +#RUN git clone https://github.com/alpha-unito/streamflow.git && cd streamflow && git checkout 2295eda80ad37214d6f607f75f542b9eca03e121 && python3 -m pip install --break-system-packages . # Add wfcommons user RUN useradd -ms /bin/bash wfcommons diff --git a/tests/translators_loggers/test_translators_loggers.py b/tests/translators_loggers/test_translators_loggers.py index 7bce725f..8ab6a8ed 100644 --- a/tests/translators_loggers/test_translators_loggers.py +++ b/tests/translators_loggers/test_translators_loggers.py @@ -337,7 +337,8 @@ def test_translator(self, backend) -> None: elif backend == "streamflow": parser = ROCrateLogsParser(dirpath / "RO-Crate", steps_to_ignore=["main.cwl#compile_output_files", "main.cwl#compile_log_files"], - file_extensions_to_ignore=[".out", ".err"]) + file_extensions_to_ignore=[".out", ".err"], + instruments_to_ignore=["shell.cwl"]) if parser is not None: sys.stderr.write(f"[{backend}] Parsing the logs...\n") diff --git a/wfcommons/wfinstances/logs/ro_crate.py b/wfcommons/wfinstances/logs/ro_crate.py index ecb1775b..4eecda91 100644 --- a/wfcommons/wfinstances/logs/ro_crate.py +++ b/wfcommons/wfinstances/logs/ro_crate.py @@ -39,23 +39,27 @@ class ROCrateLogsParser(LogsParser): :type 
description: Optional[str] :param logger: The logger where to log information/warning or errors (optional). :type logger: Optional[Logger] + :param steps_to_ignore: Names of CWL steps that should be ignored in the translation + :type steps_to_ignore: Optional[list[str]] + :param file_extensions_to_ignore: File extensions that should be ignored in the translation + :type file_extensions_to_ignore: Optional[list[str]] + :param instruments_to_ignore: Names of instruments that should be ignored in the translation + :type instruments_to_ignore: Optional[list[str]] """ def __init__(self, crate_dir: pathlib.Path, description: Optional[str] = None, logger: Optional[Logger] = None, - steps_to_ignore: Optional[list[str]]=[], - file_extensions_to_ignore: Optional[list[str]]=[], + steps_to_ignore: Optional[list[str]] = None, + file_extensions_to_ignore: Optional[list[str]] = None, + instruments_to_ignore: Optional[list[str]] = None, ) -> None: """Create an object of the RO crate parser.""" - # TODO: Decide if these should be RO crate or Streamflow or whatev super().__init__('Streamflow-ROCrate', 'https://w3id.org/workflowhub/workflow-ro-crate/1.0', description, logger) # Sanity check - if steps_to_ignore is None: - steps_to_ignore = [] if not crate_dir.is_dir(): raise OSError(f'The provided path does not exist or is not a folder: {crate_dir}') @@ -71,8 +75,9 @@ def __init__(self, self.task_id_name_map: dict[str, str] = {} self.data_file_id_name_map: dict[str, str] = {} - self.steps_to_ignore = steps_to_ignore - self.file_extensions_to_ignore = file_extensions_to_ignore + self.steps_to_ignore : list[str] = steps_to_ignore or [] + self.file_extensions_to_ignore : list[str] = file_extensions_to_ignore or [] + self.instruments_to_ignore : list[str] = instruments_to_ignore or [] def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow: @@ -204,27 +209,27 @@ def _add_dependencies(self, files, instruments): for child in file.get('in', []): 
self.workflow.add_dependency(self.task_id_name_map[parent], self.task_id_name_map[child]) - # THIS IS COMMENTED OUT AT IT SEEMS TO ADD TONS OF NON-EXISTING DEPENDENCIES ON WORKFLOW BENCHMARKS - # (FOR INSTANCE, IT TOTALLY BREAKS THE BENCHMARK WORKFLOW DUE TO ALL OF THEM USING shell.cwl#output_files - # parameter_connections = list(filter((lambda x: x.get('@type') == "ParameterConnection"), self.graph_data)) - # for parameter_connection in parameter_connections: - # # parameter_connection["sourceParameter"] is either a single dict or a list of dicts, - # # which is bad design but whatever - # source_parameters = parameter_connection["sourceParameter"] - # if not isinstance(source_parameters, list): - # source_parameters = [source_parameters] - # for item in source_parameters: - # source = item["@id"] - # source = source.rsplit("#", 1)[0] # Trim to get instrument - # - # target = parameter_connection["targetParameter"]["@id"] - # target = target.rsplit("#", 1)[0] # Trim to get instrument - # print("source", source, "----> target", target) - # - # for parent in instruments.get(source, []): - # for child in instruments.get(target, []): - # self.workflow.add_dependency(self.task_id_name_map[parent], self.task_id_name_map[child]) - + parameter_connections = list(filter((lambda x: x.get('@type') == "ParameterConnection"), self.graph_data)) + for parameter_connection in parameter_connections: + # parameter_connection["sourceParameter"] is either a single dict or a list of dicts, + # which is bad design but whatever + source_parameters = parameter_connection["sourceParameter"] + if not isinstance(source_parameters, list): + source_parameters = [source_parameters] + for item in source_parameters: + source = item["@id"] + source = source.rsplit("#", 1)[0] # Trim to get instrument + + target = parameter_connection["targetParameter"]["@id"] + target = target.rsplit("#", 1)[0] # Trim to get instrument + + if source in self.instruments_to_ignore or target in 
self.instruments_to_ignore: + continue + # print("source", source, "----> target", target) + + for parent in instruments.get(source, []): + for child in instruments.get(target, []): + self.workflow.add_dependency(self.task_id_name_map[parent], self.task_id_name_map[child]) def _time_diff(self, start_time, end_time): diff = datetime.fromisoformat(end_time) - datetime.fromisoformat(start_time)