Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tests/translators_loggers/Dockerfile.streamflow
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ RUN apt-get -y install gcc-multilib
RUN apt-get -y install graphviz libgraphviz-dev
RUN apt-get -y install zip



# Python stuff
RUN apt-get -y install python3 python3-pip
RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1
Expand All @@ -42,6 +40,8 @@ RUN apt-get -y install stress-ng
# Streamflow
RUN apt-get -y install nodejs
RUN python3 -m pip install --break-system-packages streamflow==0.2.0.dev14
# For now, the above hasn't been released yet, so we get a commit tag
#RUN git clone https://github.com/alpha-unito/streamflow.git && cd streamflow && git checkout 2295eda80ad37214d6f607f75f542b9eca03e121 && python3 -m pip install --break-system-packages .

# Add wfcommons user
RUN useradd -ms /bin/bash wfcommons
Expand Down
3 changes: 2 additions & 1 deletion tests/translators_loggers/test_translators_loggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,8 @@ def test_translator(self, backend) -> None:
elif backend == "streamflow":
parser = ROCrateLogsParser(dirpath / "RO-Crate",
steps_to_ignore=["main.cwl#compile_output_files", "main.cwl#compile_log_files"],
file_extensions_to_ignore=[".out", ".err"])
file_extensions_to_ignore=[".out", ".err"],
instruments_to_ignore=["shell.cwl"])

if parser is not None:
sys.stderr.write(f"[{backend}] Parsing the logs...\n")
Expand Down
61 changes: 33 additions & 28 deletions wfcommons/wfinstances/logs/ro_crate.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,27 @@ class ROCrateLogsParser(LogsParser):
:type description: Optional[str]
:param logger: The logger where to log information/warning or errors (optional).
:type logger: Optional[Logger]
:param steps_to_ignore: Names of CWL steps that should be ignored in the translation (optional).
:type steps_to_ignore: Optional[list[str]]
:param file_extensions_to_ignore: File extensions that should be ignored in the translation (optional).
:type file_extensions_to_ignore: Optional[list[str]]
:param instruments_to_ignore: Names of instruments that should be ignored in the translation (optional).
:type instruments_to_ignore: Optional[list[str]]
"""

def __init__(self,
crate_dir: pathlib.Path,
description: Optional[str] = None,
logger: Optional[Logger] = None,
steps_to_ignore: Optional[list[str]]=[],
file_extensions_to_ignore: Optional[list[str]]=[],
steps_to_ignore: Optional[list[str]] = None,
file_extensions_to_ignore: Optional[list[str]] = None,
instruments_to_ignore: Optional[list[str]] = None,
) -> None:
"""Create an object of the RO crate parser."""

# TODO: Decide whether these should identify RO-Crate, Streamflow, or something else
super().__init__('Streamflow-ROCrate', 'https://w3id.org/workflowhub/workflow-ro-crate/1.0', description, logger)

# Sanity check
if steps_to_ignore is None:
steps_to_ignore = []
if not crate_dir.is_dir():
raise OSError(f'The provided path does not exist or is not a folder: {crate_dir}')

Expand All @@ -71,8 +75,9 @@ def __init__(self,
self.task_id_name_map: dict[str, str] = {}
self.data_file_id_name_map: dict[str, str] = {}

self.steps_to_ignore = steps_to_ignore
self.file_extensions_to_ignore = file_extensions_to_ignore
self.steps_to_ignore : list[str] = steps_to_ignore or []
self.file_extensions_to_ignore : list[str] = file_extensions_to_ignore or []
self.instruments_to_ignore : list[str] = instruments_to_ignore or []


def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
Expand Down Expand Up @@ -204,27 +209,27 @@ def _add_dependencies(self, files, instruments):
for child in file.get('in', []):
self.workflow.add_dependency(self.task_id_name_map[parent], self.task_id_name_map[child])

# THIS IS COMMENTED OUT AS IT SEEMS TO ADD TONS OF NON-EXISTENT DEPENDENCIES ON WORKFLOW BENCHMARKS
# (FOR INSTANCE, IT TOTALLY BREAKS THE BENCHMARK WORKFLOW DUE TO ALL OF THEM USING shell.cwl#output_files)
# parameter_connections = list(filter((lambda x: x.get('@type') == "ParameterConnection"), self.graph_data))
# for parameter_connection in parameter_connections:
# # parameter_connection["sourceParameter"] is either a single dict or a list of dicts,
# # which is bad design but whatever
# source_parameters = parameter_connection["sourceParameter"]
# if not isinstance(source_parameters, list):
# source_parameters = [source_parameters]
# for item in source_parameters:
# source = item["@id"]
# source = source.rsplit("#", 1)[0] # Trim to get instrument
#
# target = parameter_connection["targetParameter"]["@id"]
# target = target.rsplit("#", 1)[0] # Trim to get instrument
# print("source", source, "----> target", target)
#
# for parent in instruments.get(source, []):
# for child in instruments.get(target, []):
# self.workflow.add_dependency(self.task_id_name_map[parent], self.task_id_name_map[child])

parameter_connections = list(filter((lambda x: x.get('@type') == "ParameterConnection"), self.graph_data))
for parameter_connection in parameter_connections:
# parameter_connection["sourceParameter"] is either a single dict or a list of dicts
# (an awkward design), so it is normalized to a list before iterating
source_parameters = parameter_connection["sourceParameter"]
if not isinstance(source_parameters, list):
source_parameters = [source_parameters]
for item in source_parameters:
source = item["@id"]
source = source.rsplit("#", 1)[0] # Trim to get instrument

target = parameter_connection["targetParameter"]["@id"]
target = target.rsplit("#", 1)[0] # Trim to get instrument

if source in self.instruments_to_ignore or target in self.instruments_to_ignore:
continue
# print("source", source, "----> target", target)

for parent in instruments.get(source, []):
for child in instruments.get(target, []):
self.workflow.add_dependency(self.task_id_name_map[parent], self.task_id_name_map[child])

def _time_diff(self, start_time, end_time):
diff = datetime.fromisoformat(end_time) - datetime.fromisoformat(start_time)
Expand Down