Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/api/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ Added
its first times step is all nodata. This can save a lot of clipping or masking
transient models with many timesteps.
- Added :meth:`imod.msw.MetaSwapModel.split` to split MetaSWAP models.
- Added ``submodel_label_to_mpi_rank`` argument to
:meth:`imod.mf6.Modflow6Simulation.split`
for specifying the MPI rank of each label in case of a parallel simulation.
- Added ``write_hpc_file`` argument to
:meth:`imod.mf6.Modflow6Simulation.write`
for writing the HPC file with the model to MPI rank mappings in case of
parallel simulation.

Fixed
~~~~~
Expand Down
21 changes: 18 additions & 3 deletions imod/common/utilities/partitioninfo.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, NamedTuple
from typing import List, NamedTuple, Optional

import numpy as np

Expand All @@ -9,9 +9,13 @@
class PartitionInfo(NamedTuple):
active_domain: GridDataArray
id: int
mpi_rank: int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See other comment on line R47 in this file

Suggested change
mpi_rank: int
mpi_rank: Optional[int]



def create_partition_info(submodel_labels: GridDataArray) -> List[PartitionInfo]:
def create_partition_info(
submodel_labels: GridDataArray,
submodel_label_to_mpi_rank: Optional[dict[int, int]] = None,
) -> List[PartitionInfo]:
"""
A PartitionInfo is used to partition a model or package. The partition
info's of a domain are created using a submodel_labels array. The
Expand All @@ -31,8 +35,19 @@ def create_partition_info(submodel_labels: GridDataArray) -> List[PartitionInfo]
active_domain = ones_like(active_domain).where(active_domain.notnull(), 0)
active_domain = active_domain.astype(submodel_labels.dtype)

if submodel_label_to_mpi_rank is not None:
if label_id not in submodel_label_to_mpi_rank:
raise ValueError(
f"Label {label_id} not found in the MPI process mapping."
)
mpi_rank = submodel_label_to_mpi_rank[label_id]
if mpi_rank < 0:
raise ValueError(f"Negative MPI rank of {mpi_rank} is not allowed.")
else:
mpi_rank = -1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really like sentinel values to indicate the mpi_rank is not set. It's more understandable for other developers to set mpi_rank to None here to indicate mpi_rank is not set.


submodel_partition_info = PartitionInfo(
id=label_id, active_domain=active_domain
id=label_id, mpi_rank=mpi_rank, active_domain=active_domain
)
partition_infos.append(submodel_partition_info)

Expand Down
11 changes: 11 additions & 0 deletions imod/mf6/multimodel/modelsplitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,3 +300,14 @@ def _get_transport_models(
for model_name, model in models.items()
if isinstance(model, GroundwaterTransportModel)
]

def _get_model_to_mpi_rank_mapping(self) -> dict[str, int]:
mpi_mapping = {}

for id, model_dict in self._partition_id_to_models.items():
mpi_rank = self.partition_info[id].mpi_rank
if mpi_rank > 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the code in common.utilities.partitioninfo.py you assume -1 is an inactive mpi_rank, but here you assume mpi_rank==0 is inactive as well?

for model_name in model_dict.keys():
mpi_mapping[model_name] = mpi_rank

return mpi_mapping
61 changes: 58 additions & 3 deletions imod/mf6/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@
}


def initialize_template_hpc_filein():
    """Fetch the jinja2 template used to render the MODFLOW 6 HPC input file."""
    environment = jinja2.Environment(
        loader=jinja2.PackageLoader("imod", "templates/mf6"),
        keep_trailing_newline=True,
    )
    return environment.get_template("utl-hpc.j2")


class Modflow6Simulation(collections.UserDict, ISimulation):
"""
Modflow6Simulation is a class that represents a Modflow 6 simulation. It
Expand Down Expand Up @@ -250,7 +256,7 @@
timestep_duration=timestep_duration, validate=validate
)

def _render(self, write_context: WriteContext):
def _render(self, write_context: WriteContext, hpc_filein: str | None):

Check failure on line 259 in imod/mf6/simulation.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 23 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=Deltares_imod-python&issues=AZ0BZrr9kWjQTfwsF08M&open=AZ0BZrr9kWjQTfwsF08M&pullRequest=1790
"""Renders simulation namefile"""
d: dict[str, Any] = {}
models = []
Expand Down Expand Up @@ -284,6 +290,10 @@
d["exchanges"] = self.get_exchange_relationships()

d["solutiongroups"] = [solutiongroups]

if hpc_filein is not None:
d["hpc_filein"] = hpc_filein

return self._template.render(d)

def _write_tdis_package(self, globaltimes, write_context):
Expand All @@ -300,13 +310,32 @@
write_context=write_context,
)

def _write_hpc_package(self, hpc_filein_path):
    """
    Write the MODFLOW 6 HPC input file.

    Parameters
    ----------
    hpc_filein_path : str | pathlib.Path
        Path of the HPC file to write.

    Raises
    ------
    ValueError
        If no model-to-MPI-rank mapping is stored on the simulation
        (``"split_mpi_rank_mapping"`` key), e.g. because the simulation
        was not split with ``submodel_label_to_mpi_rank`` provided.
    """
    if "split_mpi_rank_mapping" not in self:
        raise ValueError("MPI rank mappings are not set.")

    template = initialize_template_hpc_filein()
    # The template iterates over (model name, MPI rank) pairs; no need to
    # build the pair list with an explicit append loop.
    d: dict[str, Any] = {
        "partitions": list(self["split_mpi_rank_mapping"].items())
    }
    content = template.render(d)

    with open(hpc_filein_path, "w") as f:
        f.write(content)

@standard_log_decorator()
def write(
self,
directory=".",
binary=True,
validate: bool = True,
use_absolute_paths=False,
write_hpc_file=False,
):
"""
Write Modflow6 simulation, including assigned groundwater flow and
Expand All @@ -325,6 +354,9 @@
``ValidationError``.
use_absolute_paths: ({True, False}, optional)
True if all paths written to the mf6 inputfiles should be absolute.
write_hpc_file: ({True, False}, optional)
When True, the HPC file is written, which is used for parallel
simulation with the Message Passing Interface.

Examples
--------
Expand Down Expand Up @@ -356,8 +388,15 @@
directory = pathlib.Path(directory)
directory.mkdir(exist_ok=True, parents=True)

# Write the hpc file
if self.is_split() and write_hpc_file:
hpc_filein = "mfsim.hpc"
self._write_hpc_package(directory / hpc_filein)
else:
hpc_filein = None

# Write simulation namefile
mfsim_content = self._render(write_context)
mfsim_content = self._render(write_context, hpc_filein)
mfsim_content = prepend_content_with_version_info(mfsim_content)
mfsim_path = directory / "mfsim.nam"
with open(mfsim_path, "w") as f:
Expand Down Expand Up @@ -1058,6 +1097,8 @@

toml_content[key][exchange_class_short].append(path.name)

elif key == "split_mpi_rank_mapping":
raise ValueError("Dump is not yet supported for the HPC input file.")
else:
path = value.to_file(
directory,
Expand Down Expand Up @@ -1395,6 +1436,7 @@
self,
submodel_labels: GridDataArray,
ignore_time_purge_empty: Optional[bool] = None,
submodel_label_to_mpi_rank: Optional[dict[int, int]] = None,
) -> Modflow6Simulation:
"""
Split a simulation in different partitions using a submodel_labels
Expand All @@ -1416,6 +1458,9 @@
timesteps for each package is a costly operation. Therefore, this
option can be set to True to only check the first timestep. If None,
the value of the validation settings of the simulation are used.
submodel_label_to_mpi_rank: optional, dict, default None
A dictionary that specifies for each unique label, as in submodel_labels,
the Message Passing Interface (MPI) process rank number.

Returns
-------
Expand Down Expand Up @@ -1455,7 +1500,9 @@
)

# Create partition info
partition_info = create_partition_info(submodel_labels)
partition_info = create_partition_info(
submodel_labels, submodel_label_to_mpi_rank
)

# Create new simulation
new_simulation = imod.mf6.Modflow6Simulation(
Expand Down Expand Up @@ -1511,6 +1558,8 @@
)

new_simulation._add_modelsplit_exchanges(exchanges)
model_to_mpi_rank = modelsplitter._get_model_to_mpi_rank_mapping()
new_simulation._add_modelsplit_mpi_rank_mapping(model_to_mpi_rank)
new_simulation._set_flow_exchange_options()
new_simulation._set_transport_exchange_options()
new_simulation._filter_inactive_cells_from_exchanges()
Expand Down Expand Up @@ -1557,6 +1606,12 @@
self["split_exchanges"] = []
self["split_exchanges"].extend(exchanges_list)

def _add_modelsplit_mpi_rank_mapping(
self, mpi_rank_mapping: dict[str, int]
) -> None:
if self.is_split() and mpi_rank_mapping != {}:
self["split_mpi_rank_mapping"] = mpi_rank_mapping

def _set_flow_exchange_options(self) -> None:
# collect some options that we will auto-set
for exchange in self["split_exchanges"]:
Expand Down
3 changes: 2 additions & 1 deletion imod/templates/mf6/sim-nam.j2
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ begin options
{%- if nocheck is defined %} nocheck
{% endif %}
{%- if memory_print_option is defined %} memory_print_option {{memory_print_option}}
{% endif %}
{%- if hpc_filein is defined %} hpc6 filein {{hpc_filein}}
{% endif -%}
end options

Expand All @@ -20,7 +22,6 @@ begin exchanges
{%- for exgtype, exgfile, exgmnamea, exgmnameb in exchanges %}
{{exgtype}} {{exgfile}} {{exgmnamea}} {{exgmnameb}}
{%- endfor %}

end exchanges

{% for solutiongroup in solutiongroups %}begin solutiongroup {{loop.index}}
Expand Down
9 changes: 9 additions & 0 deletions imod/templates/mf6/utl-hpc.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
begin options
{# Fix: the condition previously tested the misspelled variable
   "print_tabe", so passing "print_table" could never enable the option. #}
{% if print_table is defined %} print_table
{% endif -%}
end options

begin partitions
{% for mname, mrank in partitions %} {{mname}} {{mrank}}
{% endfor -%}
end partitions
3 changes: 1 addition & 2 deletions imod/tests/test_mf6/test_ex01_twri.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ def test_gwfmodel_render(twri_model, tmp_path):
def test_simulation_render(twri_model):
simulation = twri_model
write_context = WriteContext(".")
actual = simulation._render(write_context)
actual = simulation._render(write_context, None)

expected = textwrap.dedent(
"""\
Expand All @@ -394,7 +394,6 @@ def test_simulation_render(twri_model):
end models

begin exchanges

end exchanges

begin solutiongroup 1
Expand Down
1 change: 0 additions & 1 deletion imod/tests/test_mf6/test_mf6_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,6 @@ def test_exchanges_in_simulation_file(self, transient_twri_model, tmp_path):
exchanges
GWF6-GWF6 GWF_1_0_GWF_1_1.gwfgwf GWF_1_0 GWF_1_1
GWF6-GWF6 GWF_1_1_GWF_1_2.gwfgwf GWF_1_1 GWF_1_2

end exchanges
"""
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import textwrap
from filecmp import dircmp
from pathlib import Path

Expand Down Expand Up @@ -198,6 +199,50 @@ def test_split_flow_and_transport_model_evaluate_output_with_species(
)


def test_split_flow_and_transport_model_evaluate_hpc_filein(
    tmp_path: Path, flow_transport_simulation: Modflow6Simulation
):
    # Verifies that splitting a simulation with submodel_label_to_mpi_rank
    # and writing with write_hpc_file=True produces an mfsim.hpc file whose
    # "partitions" block lists every partitioned model with its MPI rank.
    simulation = flow_transport_simulation

    flow_model = simulation["flow"]
    active = flow_model.domain

    # Build a 2D label grid from the flow domain: everything defaults to
    # submodel 0; columns 15 and up become submodel 1.
    submodel_labels = zeros_like(active)
    submodel_labels = submodel_labels.drop_vars("layer")
    submodel_labels.values[:, :, 15:] = 1
    submodel_labels = submodel_labels.sel(layer=0, drop=True)

    # Deliberately non-contiguous ranks, to check the mapping is written
    # verbatim rather than renumbered.
    submodel_label_to_mpi_rank = {0: 10, 1: 100}

    new_simulation = simulation.split(
        submodel_labels, submodel_label_to_mpi_rank=submodel_label_to_mpi_rank
    )
    new_simulation.write(tmp_path, binary=False, write_hpc_file=True)

    # Each flow and transport submodel should appear with the rank of the
    # partition it belongs to (label 0 -> 10, label 1 -> 100).
    expected_partitions_block = textwrap.dedent(
        """\
        begin partitions
        flow_0 10
        tpt_a_0 10
        tpt_b_0 10
        tpt_c_0 10
        tpt_d_0 10
        flow_1 100
        tpt_a_1 100
        tpt_b_1 100
        tpt_c_1 100
        tpt_d_1 100
        end partitions
        """
    )

    with open(tmp_path / "mfsim.hpc", mode="r") as hpc_filein:
        hpcfile_content = hpc_filein.read()

    # Assert
    assert expected_partitions_block in hpcfile_content


@pytest.mark.parametrize(
"advection_pkg", [AdvectionCentral, AdvectionTVD, AdvectionUpstream]
)
Expand Down
Loading