From 6dac8ade7f9e6ca3859148407ba1dc95f0d92b82 Mon Sep 17 00:00:00 2001 From: verkaik Date: Wed, 18 Mar 2026 15:29:41 +0100 Subject: [PATCH] Initial commit for pull request. --- docs/api/changelog.rst | 7 +++ imod/common/utilities/partitioninfo.py | 21 ++++++- imod/mf6/multimodel/modelsplitter.py | 11 ++++ imod/mf6/simulation.py | 61 ++++++++++++++++++- imod/templates/mf6/sim-nam.j2 | 3 +- imod/templates/mf6/utl-hpc.j2 | 9 +++ imod/tests/test_mf6/test_ex01_twri.py | 3 +- imod/tests/test_mf6/test_mf6_simulation.py | 1 - .../test_mf6_modelsplitter_transport.py | 45 ++++++++++++++ 9 files changed, 151 insertions(+), 10 deletions(-) create mode 100644 imod/templates/mf6/utl-hpc.j2 diff --git a/docs/api/changelog.rst b/docs/api/changelog.rst index 629ff6f17..b1db21c56 100644 --- a/docs/api/changelog.rst +++ b/docs/api/changelog.rst @@ -18,6 +18,13 @@ Added its first times step is all nodata. This can save a lot of clipping or masking transient models with many timesteps. - Added :meth:`imod.msw.MetaSwapModel.split` to split MetaSWAP models. +- Added ``submodel_label_to_mpi_rank`` argument to + :meth:`imod.mf6.Modflow6Simulation.split` + for specifying the MPI rank for each label in case of parallel simulation. +- Added ``write_hpc_file`` argument to + :meth:`imod.mf6.Modflow6Simulation.write` + for writing the HPC file with the model to MPI rank mappings in case of + parallel simulation. 
Fixed ~~~~~ diff --git a/imod/common/utilities/partitioninfo.py b/imod/common/utilities/partitioninfo.py index 0bd89c419..38d875677 100644 --- a/imod/common/utilities/partitioninfo.py +++ b/imod/common/utilities/partitioninfo.py @@ -1,4 +1,4 @@ -from typing import List, NamedTuple +from typing import List, NamedTuple, Optional import numpy as np @@ -9,9 +9,13 @@ class PartitionInfo(NamedTuple): active_domain: GridDataArray id: int + mpi_rank: int -def create_partition_info(submodel_labels: GridDataArray) -> List[PartitionInfo]: +def create_partition_info( + submodel_labels: GridDataArray, + submodel_label_to_mpi_rank: Optional[dict[int, int]] = None, +) -> List[PartitionInfo]: """ A PartitionInfo is used to partition a model or package. The partition info's of a domain are created using a submodel_labels array. The @@ -31,8 +35,19 @@ def create_partition_info(submodel_labels: GridDataArray) -> List[PartitionInfo] active_domain = ones_like(active_domain).where(active_domain.notnull(), 0) active_domain = active_domain.astype(submodel_labels.dtype) + if submodel_label_to_mpi_rank is not None: + if label_id not in submodel_label_to_mpi_rank: + raise ValueError( + f"Label {label_id} not found in the MPI process mapping." 
+ ) + mpi_rank = submodel_label_to_mpi_rank[label_id] + if mpi_rank < 0: + raise ValueError(f"Negative MPI rank of {mpi_rank} is not allowed.") + else: + mpi_rank = -1 + submodel_partition_info = PartitionInfo( - id=label_id, active_domain=active_domain + id=label_id, mpi_rank=mpi_rank, active_domain=active_domain ) partition_infos.append(submodel_partition_info) diff --git a/imod/mf6/multimodel/modelsplitter.py b/imod/mf6/multimodel/modelsplitter.py index fd6e080c4..69acb8d6d 100644 --- a/imod/mf6/multimodel/modelsplitter.py +++ b/imod/mf6/multimodel/modelsplitter.py @@ -300,3 +300,14 @@ def _get_transport_models( for model_name, model in models.items() if isinstance(model, GroundwaterTransportModel) ] + + def _get_model_to_mpi_rank_mapping(self) -> dict[str, int]: + mpi_mapping = {} + + for id, model_dict in self._partition_id_to_models.items(): + mpi_rank = self.partition_info[id].mpi_rank + if mpi_rank >= 0: + for model_name in model_dict.keys(): + mpi_mapping[model_name] = mpi_rank + + return mpi_mapping diff --git a/imod/mf6/simulation.py b/imod/mf6/simulation.py index 0e2013a8a..afd8c63b2 100644 --- a/imod/mf6/simulation.py +++ b/imod/mf6/simulation.py @@ -92,6 +92,12 @@ def get_packages(simulation: Modflow6Simulation) -> dict[str, Package]: } +def initialize_template_hpc_filein(): + loader = jinja2.PackageLoader("imod", "templates/mf6") + env = jinja2.Environment(loader=loader, keep_trailing_newline=True) + return env.get_template("utl-hpc.j2") + + class Modflow6Simulation(collections.UserDict, ISimulation): """ Modflow6Simulation is a class that represents a Modflow 6 simulation. 
It @@ -250,7 +256,7 @@ def create_time_discretization(self, additional_times, validate: bool = True): timestep_duration=timestep_duration, validate=validate ) - def _render(self, write_context: WriteContext): + def _render(self, write_context: WriteContext, hpc_filein: str | None): """Renders simulation namefile""" d: dict[str, Any] = {} models = [] @@ -284,6 +290,10 @@ def _render(self, write_context: WriteContext): d["exchanges"] = self.get_exchange_relationships() d["solutiongroups"] = [solutiongroups] + + if hpc_filein is not None: + d["hpc_filein"] = hpc_filein + return self._template.render(d) def _write_tdis_package(self, globaltimes, write_context): @@ -300,6 +310,24 @@ def _write_tdis_package(self, globaltimes, write_context): write_context=write_context, ) + def _write_hpc_package(self, hpc_filein_path): + """Write the HPC file""" + + if "split_mpi_rank_mapping" not in self: + raise ValueError("MPI rank mappings are not set.") + + template = initialize_template_hpc_filein() + partitions = [] + for mname, mrank in self["split_mpi_rank_mapping"].items(): + partitions.append((mname, mrank)) + d: dict[str, Any] = {} + d["partitions"] = partitions + + content = template.render(d) + + with open(hpc_filein_path, "w") as f: + f.write(content) + @standard_log_decorator() def write( self, @@ -307,6 +335,7 @@ def write( binary=True, validate: bool = True, use_absolute_paths=False, + write_hpc_file=False, ): """ Write Modflow6 simulation, including assigned groundwater flow and @@ -325,6 +354,9 @@ def write( ``ValidationError``. use_absolute_paths: ({True, False}, optional) True if all paths written to the mf6 inputfiles should be absolute. + write_hpc_file: ({True, False}, optional) + When True, the HPC file is written that is used for parallel + simulation with the Message Passing Interface. 
Examples -------- @@ -356,8 +388,15 @@ def write( directory = pathlib.Path(directory) directory.mkdir(exist_ok=True, parents=True) + # Write the hpc file + if self.is_split() and write_hpc_file: + hpc_filein = "mfsim.hpc" + self._write_hpc_package(directory / hpc_filein) + else: + hpc_filein = None + # Write simulation namefile - mfsim_content = self._render(write_context) + mfsim_content = self._render(write_context, hpc_filein) mfsim_content = prepend_content_with_version_info(mfsim_content) mfsim_path = directory / "mfsim.nam" with open(mfsim_path, "w") as f: @@ -1058,6 +1097,8 @@ def dump( toml_content[key][exchange_class_short].append(path.name) + elif key == "split_mpi_rank_mapping": + raise ValueError("Dump is not yet supported for the HPC input file.") else: path = value.to_file( directory, @@ -1395,6 +1436,7 @@ def split( self, submodel_labels: GridDataArray, ignore_time_purge_empty: Optional[bool] = None, + submodel_label_to_mpi_rank: Optional[dict[int, int]] = None, ) -> Modflow6Simulation: """ Split a simulation in different partitions using a submodel_labels @@ -1416,6 +1458,9 @@ def split( timesteps for each package is a costly operation. Therefore, this option can be set to True to only check the first timestep. If None, the value of the validation settings of the simulation are used. + submodel_label_to_mpi_rank: optional, dict, default None + A dictionary that specifies for each unique label, as in submodel_labels, + the Message Passing Interface (MPI) process rank number. 
Returns ------- @@ -1455,7 +1500,9 @@ def split( ) # Create partition info - partition_info = create_partition_info(submodel_labels) + partition_info = create_partition_info( + submodel_labels, submodel_label_to_mpi_rank + ) # Create new simulation new_simulation = imod.mf6.Modflow6Simulation( @@ -1511,6 +1558,8 @@ def split( ) new_simulation._add_modelsplit_exchanges(exchanges) + model_to_mpi_rank = modelsplitter._get_model_to_mpi_rank_mapping() + new_simulation._add_modelsplit_mpi_rank_mapping(model_to_mpi_rank) new_simulation._set_flow_exchange_options() new_simulation._set_transport_exchange_options() new_simulation._filter_inactive_cells_from_exchanges() @@ -1557,6 +1606,12 @@ def _add_modelsplit_exchanges(self, exchanges_list: list[GWFGWF]) -> None: self["split_exchanges"] = [] self["split_exchanges"].extend(exchanges_list) + def _add_modelsplit_mpi_rank_mapping( + self, mpi_rank_mapping: dict[str, int] + ) -> None: + if self.is_split() and mpi_rank_mapping != {}: + self["split_mpi_rank_mapping"] = mpi_rank_mapping + def _set_flow_exchange_options(self) -> None: # collect some options that we will auto-set for exchange in self["split_exchanges"]: diff --git a/imod/templates/mf6/sim-nam.j2 b/imod/templates/mf6/sim-nam.j2 index 76510482f..d1a513ef3 100644 --- a/imod/templates/mf6/sim-nam.j2 +++ b/imod/templates/mf6/sim-nam.j2 @@ -4,6 +4,8 @@ begin options {%- if nocheck is defined %} nocheck {% endif %} {%- if memory_print_option is defined %} memory_print_option {{memory_print_option}} +{% endif %} +{%- if hpc_filein is defined %} hpc6 filein {{hpc_filein}} {% endif -%} end options @@ -20,7 +22,6 @@ begin exchanges {%- for exgtype, exgfile, exgmnamea, exgmnameb in exchanges %} {{exgtype}} {{exgfile}} {{exgmnamea}} {{exgmnameb}} {%- endfor %} - end exchanges {% for solutiongroup in solutiongroups %}begin solutiongroup {{loop.index}} diff --git a/imod/templates/mf6/utl-hpc.j2 b/imod/templates/mf6/utl-hpc.j2 new file mode 100644 index 000000000..b94894f5f --- 
/dev/null +++ b/imod/templates/mf6/utl-hpc.j2 @@ -0,0 +1,9 @@ +begin options +{% if print_table is defined %} print_table +{% endif -%} +end options + +begin partitions +{% for mname, mrank in partitions %} {{mname}} {{mrank}} +{% endfor -%} +end partitions diff --git a/imod/tests/test_mf6/test_ex01_twri.py b/imod/tests/test_mf6/test_ex01_twri.py index 5709b8df5..0223bb818 100644 --- a/imod/tests/test_mf6/test_ex01_twri.py +++ b/imod/tests/test_mf6/test_ex01_twri.py @@ -378,7 +378,7 @@ def test_gwfmodel_render(twri_model, tmp_path): def test_simulation_render(twri_model): simulation = twri_model write_context = WriteContext(".") - actual = simulation._render(write_context) + actual = simulation._render(write_context, None) expected = textwrap.dedent( """\ @@ -394,7 +394,6 @@ def test_simulation_render(twri_model): end models begin exchanges - end exchanges begin solutiongroup 1 diff --git a/imod/tests/test_mf6/test_mf6_simulation.py b/imod/tests/test_mf6/test_mf6_simulation.py index 2fc5f42c9..c7f4fe3c8 100644 --- a/imod/tests/test_mf6/test_mf6_simulation.py +++ b/imod/tests/test_mf6/test_mf6_simulation.py @@ -507,7 +507,6 @@ def test_exchanges_in_simulation_file(self, transient_twri_model, tmp_path): exchanges GWF6-GWF6 GWF_1_0_GWF_1_1.gwfgwf GWF_1_0 GWF_1_1 GWF6-GWF6 GWF_1_1_GWF_1_2.gwfgwf GWF_1_1 GWF_1_2 - end exchanges """ ) diff --git a/imod/tests/test_mf6/test_multimodel/test_mf6_modelsplitter_transport.py b/imod/tests/test_mf6/test_multimodel/test_mf6_modelsplitter_transport.py index c834c5c81..419b5ee8b 100644 --- a/imod/tests/test_mf6/test_multimodel/test_mf6_modelsplitter_transport.py +++ b/imod/tests/test_mf6/test_multimodel/test_mf6_modelsplitter_transport.py @@ -1,3 +1,4 @@ +import textwrap from filecmp import dircmp from pathlib import Path @@ -198,6 +199,50 @@ def test_split_flow_and_transport_model_evaluate_output_with_species( ) +def test_split_flow_and_transport_model_evaluate_hpc_filein( + tmp_path: Path, flow_transport_simulation: 
Modflow6Simulation +): + simulation = flow_transport_simulation + + flow_model = simulation["flow"] + active = flow_model.domain + + submodel_labels = zeros_like(active) + submodel_labels = submodel_labels.drop_vars("layer") + submodel_labels.values[:, :, 15:] = 1 + submodel_labels = submodel_labels.sel(layer=0, drop=True) + + submodel_label_to_mpi_rank = {0: 10, 1: 100} + + new_simulation = simulation.split( + submodel_labels, submodel_label_to_mpi_rank=submodel_label_to_mpi_rank + ) + new_simulation.write(tmp_path, binary=False, write_hpc_file=True) + + expected_partitions_block = textwrap.dedent( + """\ + begin partitions + flow_0 10 + tpt_a_0 10 + tpt_b_0 10 + tpt_c_0 10 + tpt_d_0 10 + flow_1 100 + tpt_a_1 100 + tpt_b_1 100 + tpt_c_1 100 + tpt_d_1 100 + end partitions + """ + ) + + with open(tmp_path / "mfsim.hpc", mode="r") as hpc_filein: + hpcfile_content = hpc_filein.read() + + # Assert + assert expected_partitions_block in hpcfile_content + + @pytest.mark.parametrize( "advection_pkg", [AdvectionCentral, AdvectionTVD, AdvectionUpstream] )