diff --git a/httomo/runner/dataset_store_backing.py b/httomo/runner/dataset_store_backing.py index b150fe553..7c69701ad 100644 --- a/httomo/runner/dataset_store_backing.py +++ b/httomo/runner/dataset_store_backing.py @@ -5,18 +5,19 @@ from numpy.typing import DTypeLike from mpi4py import MPI +from httomo.data.hdf._utils.reslice import reslice_memory_estimator from httomo.runner.section import Section, determine_section_padding from httomo.utils import _get_slicing_dim, make_3d_shape_from_shape -def calculate_section_chunk_shape( +def calculate_section_input_chunk_shape( comm: MPI.Comm, global_shape: Tuple[int, int, int], slicing_dim: int, padding: Tuple[int, int], ) -> Tuple[int, int, int]: """ - Calculate chunk shape (w/ or w/o padding) for a section. + Calculate the shape of the section input chunk w/ or w/o padding. """ start = round((global_shape[slicing_dim] / comm.size) * comm.rank) stop = round((global_shape[slicing_dim] / comm.size) * (comm.rank + 1)) @@ -26,29 +27,30 @@ def calculate_section_chunk_shape( return make_3d_shape_from_shape(shape) -def calculate_section_chunk_bytes( +def calculate_section_output_chunk_shape( chunk_shape: Tuple[int, int, int], - dtype: DTypeLike, section: Section, -) -> int: +) -> Tuple[int, int, int]: """ - Calculate the number of bytes in the section output chunk that is written to the store. Ths - accounts for data's non-slicing dims changing during processing, which changes the chunk - shape for the section and thus affects the number of bytes in the chunk. + Calculate the shape of the section output chunk that is written to the store. This + accounts for the data's non-slicing dims changing during processing, which changes the + chunk shape for the section. 
    """
     slicing_dim = _get_slicing_dim(section.pattern) - 1
     non_slice_dims_list = list(chunk_shape)
     non_slice_dims_list.pop(slicing_dim)
-    non_slice_dims = (non_slice_dims_list[0], non_slice_dims_list[1])
+    input_non_slice_dims = (non_slice_dims_list[0], non_slice_dims_list[1])
+    output_non_slice_dims = input_non_slice_dims
     for method in section.methods:
         if method.memory_gpu is None:
             continue
-        non_slice_dims = method.calculate_output_dims(non_slice_dims)
+        output_non_slice_dims = method.calculate_output_dims(output_non_slice_dims)
 
-    return int(
-        np.prod(non_slice_dims) * chunk_shape[slicing_dim] * np.dtype(dtype).itemsize
-    )
+    output_chunk_shape = list(output_non_slice_dims)
+    output_chunk_shape.insert(slicing_dim, chunk_shape[slicing_dim])
+
+    return make_3d_shape_from_shape(output_chunk_shape)
 
 
 class DataSetStoreBacking(Enum):
@@ -56,23 +58,6 @@ class DataSetStoreBacking(Enum):
     File = 2
 
 
-def _non_last_section_in_pipeline(
-    memory_limit_bytes: int,
-    write_chunk_bytes: int,
-    read_chunk_bytes: int,
-) -> DataSetStoreBacking:
-    """
-    Calculate backing of dataset store for non-last sections in pipeline
-    """
-    if (
-        memory_limit_bytes > 0
-        and write_chunk_bytes + read_chunk_bytes >= memory_limit_bytes
-    ):
-        return DataSetStoreBacking.File
-
-    return DataSetStoreBacking.RAM
-
-
 def determine_store_backing(
     comm: MPI.Comm,
     sections: List[Section],
@@ -84,7 +69,7 @@ def determine_store_backing(
     # Get chunk shape created by reader of section `n` (the current section) that will account
     # for padding.
This chunk shape is based on the chunk shape written by the writer of # section `n - 1` (the previous section) - padded_input_chunk_shape = calculate_section_chunk_shape( + padded_input_chunk_shape = calculate_section_input_chunk_shape( comm=comm, global_shape=global_shape, slicing_dim=_get_slicing_dim(sections[section_idx].pattern) - 1, @@ -96,7 +81,7 @@ def determine_store_backing( # Get unpadded chunk shape input to current section (for calculation of bytes in output # chunk for the current section) - input_chunk_shape = calculate_section_chunk_shape( + input_chunk_shape = calculate_section_input_chunk_shape( comm=comm, global_shape=global_shape, slicing_dim=_get_slicing_dim(sections[section_idx].pattern) - 1, @@ -106,21 +91,38 @@ def determine_store_backing( # Get the number of bytes in the input chunk to the section w/ potential modifications to # the non-slicing dims, to then determine the number of bytes in the output chunk written # by the current section - output_chunk_bytes = calculate_section_chunk_bytes( + output_chunk_shape = calculate_section_output_chunk_shape( chunk_shape=input_chunk_shape, - dtype=dtype, section=sections[section_idx], ) + output_chunk_bytes = int(np.prod(output_chunk_shape) * np.dtype(dtype).itemsize) + + # If a reslice operation would occur in moving from the current section to the next + # section, then calculate the number of bytes the reslice operation would take, given the + # input to it (which would be the output chunk of the current section) + reslice_bytes = 0 + if ( + comm.size > 1 + and section_idx < len(sections) - 1 + and sections[section_idx].pattern != sections[section_idx + 1].pattern + ): + ring_algorithm_bytes, reslice_output_bytes = reslice_memory_estimator( + output_chunk_shape, + dtype, + _get_slicing_dim(sections[section_idx].pattern), + _get_slicing_dim(sections[section_idx + 1].pattern), + comm, + ) + reslice_bytes += ring_algorithm_bytes + reslice_output_bytes send_buffer = np.zeros(1, dtype=bool) 
recv_buffer = np.zeros(1, dtype=bool) - store_backing = _non_last_section_in_pipeline( - memory_limit_bytes=memory_limit_bytes, - read_chunk_bytes=padded_input_chunk_bytes, - write_chunk_bytes=output_chunk_bytes, - ) - if store_backing is DataSetStoreBacking.File: + if ( + memory_limit_bytes > 0 + and padded_input_chunk_bytes + output_chunk_bytes + reslice_bytes + >= memory_limit_bytes + ): send_buffer[0] = True # do a logical OR of all the enum variants across the processes diff --git a/tests/runner/test_dataset_store_backing.py b/tests/runner/test_dataset_store_backing.py index 354545f09..bb4123585 100644 --- a/tests/runner/test_dataset_store_backing.py +++ b/tests/runner/test_dataset_store_backing.py @@ -7,10 +7,11 @@ from httomo.runner.dataset_store_backing import ( DataSetStoreBacking, - calculate_section_chunk_shape, - calculate_section_chunk_bytes, + calculate_section_input_chunk_shape, + calculate_section_output_chunk_shape, determine_store_backing, ) +from httomo.runner.output_ref import OutputRef from httomo.runner.pipeline import Pipeline from httomo.runner.section import sectionize from httomo.utils import make_3d_shape_from_shape @@ -63,7 +64,7 @@ def test_calculate_section_chunk_shape( expected_chunk_shape[section_slicing_dim] = ( slicing_dim_len + section_padding[0] + section_padding[1] ) - section_chunk_shape = calculate_section_chunk_shape( + section_chunk_shape = calculate_section_input_chunk_shape( comm=mock_global_comm, global_shape=GLOBAL_SHAPE, slicing_dim=section_slicing_dim, @@ -108,11 +109,13 @@ def test_calculate_section_chunk_bytes_output_dims_change(mocker: MockerFixture) # Check that the number of bytes in the chunk accounts for the non-slicing dims change by # the method in the section - section_output_chunk_bytes = calculate_section_chunk_bytes( + section_output_chunk_shape = calculate_section_output_chunk_shape( chunk_shape=SECTION_INPUT_CHUNK_SHAPE, - dtype=DTYPE, section=sections[0], ) + section_output_chunk_bytes = ( + 
np.prod(section_output_chunk_shape) * np.dtype(DTYPE).itemsize + ) assert section_output_chunk_bytes == EXPECTED_SECTION_OUTPUT_CHUNK_BYTES @@ -170,11 +173,13 @@ def test_calculate_section_chunk_bytes_output_dims_change_and_swap( # Check that the number of bytes in the chunk accounts for the non-slicing dims change by # the method in the section - section_output_chunk_bytes = calculate_section_chunk_bytes( + section_output_chunk_shape = calculate_section_output_chunk_shape( chunk_shape=SECTION_INPUT_CHUNK_SHAPE, - dtype=DTYPE, section=sections[0], ) + section_output_chunk_bytes = ( + np.prod(section_output_chunk_shape) * np.dtype(DTYPE).itemsize + ) assert section_output_chunk_bytes == EXPECTED_SECTION_OUTPUT_CHUNK_BYTES @@ -186,7 +191,7 @@ def test_calculate_section_chunk_bytes_output_dims_change_and_swap( ], ids=["6MB-limit-file-backing", "7MB-limit-ram-backing"], ) -def test_determine_store_backing_non_last_section_pipeline_single_proc( +def test_determine_store_backing_reslice_single_proc( mocker: MockerFixture, memory_limit: int, expected_store_backing: DataSetStoreBacking, @@ -198,6 +203,7 @@ def test_determine_store_backing_non_last_section_pipeline_single_proc( # The dtype and shape combined makes: # - the write chunk ~3.4MB # - the read chunk also ~3.4MB + # - reslice shouldn't occur due to running with a single process DTYPE = np.float32 GLOBAL_SHAPE = (10, 300, 300) @@ -213,13 +219,109 @@ def test_determine_store_backing_non_last_section_pipeline_single_proc( ) sections = sectionize(pipeline) - # For execution of non-last sections in pipelines, the writer must take into account that a - # copy of the chunk is made by the reader of the following section. Therefore, two copies - # of the chunk must be taken into account when deciding the backing of the store. 
+ store_backing = determine_store_backing( + comm=COMM, + sections=sections, + memory_limit_bytes=memory_limit, + dtype=DTYPE, + global_shape=GLOBAL_SHAPE, + section_idx=0, + ) + assert store_backing is expected_store_backing + + +@pytest.mark.parametrize( + "memory_limit, expected_store_backing", + [ + (6 * 1024**2, DataSetStoreBacking.File), + (7 * 1024**2, DataSetStoreBacking.RAM), + ], + ids=["6MB-limit-file-backing", "7MB-limit-ram-backing"], +) +def test_determine_store_backing_no_reslice_single_proc( + mocker: MockerFixture, + memory_limit: int, + expected_store_backing: DataSetStoreBacking, +): + COMM = MPI.COMM_WORLD + + # For a single process, chunk shape = global shape + # + # The dtype and shape combined makes: + # - the write chunk ~3.4MB + # - the read chunk also ~3.4MB + DTYPE = np.float32 + GLOBAL_SHAPE = (10, 300, 300) + + # Define dummy loader and method wrapper objects + loader = make_test_loader(mocker=mocker) + m1 = make_test_method(mocker=mocker, method_name="m1", pattern=Pattern.projection) + m2 = make_test_method( + mocker=mocker, + method_name="m2", + pattern=Pattern.projection, + some_param=(OutputRef(m1, "some-param")), + ) + + # Get list of section objects that represent pipeline + pipeline = Pipeline( + loader=loader, + methods=[m1, m2], + ) + sections = sectionize(pipeline) + + store_backing = determine_store_backing( + comm=COMM, + sections=sections, + memory_limit_bytes=memory_limit, + dtype=DTYPE, + global_shape=GLOBAL_SHAPE, + section_idx=0, + ) + assert store_backing is expected_store_backing + + +@pytest.mark.mpi +@pytest.mark.skipif( + MPI.COMM_WORLD.size != 2, reason="Only rank-2 MPI is supported with this test" +) +@pytest.mark.parametrize( + "memory_limit, expected_store_backing", + [ + (6 * 1024**2, DataSetStoreBacking.File), + (7 * 1024**2, DataSetStoreBacking.RAM), + ], + ids=["6MB-limit-file-backing", "7MB-limit-ram-backing"], +) +def test_determine_store_backing_reslice_two_procs( + mocker: MockerFixture, + memory_limit: 
int, + expected_store_backing: DataSetStoreBacking, +): + COMM = MPI.COMM_WORLD + + # For two processes, chunk shape = half of global shape # - # Note that section 0 is only the section that is "not the last section", so it's the only - # one that will need to account for two copies of the chunk, and thus the main target of - # the test. Hence, why `section_idx=0` is given. + # The dtype and shape combined makes: + # - the write chunk ~1.7MB + # - the read chunk also ~1.7MB + # - the output of reslice also ~1.7MB + # - the intermediate data created by reslice algorithm ~1.7MB + DTYPE = np.float32 + GLOBAL_SHAPE = (10, 300, 300) + + # Define dummy loader and method wrapper objects + loader = make_test_loader(mocker=mocker) + m1 = make_test_method(mocker=mocker, method_name="m1", pattern=Pattern.projection) + m2 = make_test_method(mocker=mocker, method_name="m2", pattern=Pattern.sinogram) + + # Get list of section objects that represent pipeline + pipeline = Pipeline( + loader=loader, + methods=[m1, m2], + ) + sections = sectionize(pipeline) + store_backing = determine_store_backing( comm=COMM, sections=sections, @@ -243,7 +345,7 @@ def test_determine_store_backing_non_last_section_pipeline_single_proc( ], ids=["3MB-limit-file-backing", "4MB-limit-ram-backing"], ) -def test_determine_store_backing_non_last_section_pipeline_two_procs( +def test_determine_store_backing_no_reslice_two_procs( mocker: MockerFixture, memory_limit: int, expected_store_backing: DataSetStoreBacking, @@ -261,7 +363,12 @@ def test_determine_store_backing_non_last_section_pipeline_two_procs( # Define dummy loader and method wrapper objects loader = make_test_loader(mocker=mocker) m1 = make_test_method(mocker=mocker, method_name="m1", pattern=Pattern.projection) - m2 = make_test_method(mocker=mocker, method_name="m2", pattern=Pattern.sinogram) + m2 = make_test_method( + mocker=mocker, + method_name="m2", + pattern=Pattern.projection, + some_param=(OutputRef(m1, "some-param")), + ) # Get list of 
section objects that represent pipeline pipeline = Pipeline( @@ -270,13 +377,61 @@ def test_determine_store_backing_non_last_section_pipeline_two_procs( ) sections = sectionize(pipeline) - # For exeuction of non-last sections in pipelines, the writer must take into account that a - # copy of the chunk is made by the reader of the following section. Therefore, two copies - # of the chunk must be taken into account when deciding the backing of the store. + store_backing = determine_store_backing( + comm=COMM, + sections=sections, + memory_limit_bytes=memory_limit, + dtype=DTYPE, + global_shape=GLOBAL_SHAPE, + section_idx=0, + ) + assert store_backing is expected_store_backing + + +@pytest.mark.parametrize( + "memory_limit, expected_store_backing", + [ + (41 * 1024**2, DataSetStoreBacking.File), + (42 * 1024**2, DataSetStoreBacking.RAM), + ], + ids=["41MB-limit-file-backing", "42MB-limit-ram-backing"], +) +def test_determine_store_backing_large_padding_reslice_single_proc( + mocker: MockerFixture, + memory_limit: int, + expected_store_backing: DataSetStoreBacking, +): + COMM = MPI.COMM_WORLD + + # For a single process, chunk shape = global shape # - # Note that section 0 is only the section that is "not the last section", so it's the only - # one that will need to account for two copies of the chunk, and thus the main target of - # the test. Hence, why `section_idx=0` is given. 
+ # The dtype and shape combined makes: + # - the write chunk ~3.4MB + # - the padded input chunk ~37.7MB (110 * 300 * 300 * 4 / (1024 ** 2)) + # - reslice shouldn't occur due to running with a single process + DTYPE = np.float32 + GLOBAL_SHAPE = (10, 300, 300) + PADDING = (50, 50) + + # Define dummy loader and method wrapper objects + loader = make_test_loader(mocker=mocker) + m1 = make_test_method( + mocker=mocker, method_name="m1", pattern=Pattern.projection, padding=True + ) + m2 = make_test_method(mocker=mocker, method_name="m2", pattern=Pattern.sinogram) + mocker.patch.object( + target=m1, + attribute="calculate_padding", + return_value=PADDING, + ) + + # Get list of section objects that represent pipeline + pipeline = Pipeline( + loader=loader, + methods=[m1, m2], + ) + sections = sectionize(pipeline) + store_backing = determine_store_backing( comm=COMM, sections=sections, @@ -296,7 +451,7 @@ def test_determine_store_backing_non_last_section_pipeline_two_procs( ], ids=["41MB-limit-file-backing", "42MB-limit-ram-backing"], ) -def test_determine_store_backing_non_last_section_pipeline_large_padding_single_proc( +def test_determine_store_backing_large_padding_no_reslice_single_proc( mocker: MockerFixture, memory_limit: int, expected_store_backing: DataSetStoreBacking, @@ -318,7 +473,12 @@ def test_determine_store_backing_non_last_section_pipeline_large_padding_single_ m1 = make_test_method( mocker=mocker, method_name="m1", pattern=Pattern.projection, padding=True ) - m2 = make_test_method(mocker=mocker, method_name="m2", pattern=Pattern.sinogram) + m2 = make_test_method( + mocker=mocker, + method_name="m2", + pattern=Pattern.projection, + some_param=(OutputRef(m1, "some-param")), + ) mocker.patch.object( target=m1, attribute="calculate_padding", @@ -333,13 +493,66 @@ def test_determine_store_backing_non_last_section_pipeline_large_padding_single_ sections = sectionize(pipeline) print(sections) - # For execution of non-last sections which have non-zero padding, 
the reader creates a - # padded copy of the chunk that is made. Therefore, two copies of the chunk must be taken - # into account when deciding the backing of the store. + store_backing = determine_store_backing( + comm=COMM, + sections=sections, + memory_limit_bytes=memory_limit, + dtype=DTYPE, + global_shape=GLOBAL_SHAPE, + section_idx=0, + ) + assert store_backing is expected_store_backing + + +@pytest.mark.mpi +@pytest.mark.skipif( + MPI.COMM_WORLD.size != 2, reason="Only rank-2 MPI is supported with this test" +) +@pytest.mark.parametrize( + "memory_limit, expected_store_backing", + [ + (41 * 1024**2, DataSetStoreBacking.File), + (42 * 1024**2, DataSetStoreBacking.RAM), + ], + ids=["41MB-limit-file-backing", "42MB-limit-ram-backing"], +) +def test_determine_store_backing_large_padding_reslice_two_procs( + mocker: MockerFixture, + memory_limit: int, + expected_store_backing: DataSetStoreBacking, +): + COMM = MPI.COMM_WORLD + + # For two processes, chunk shape = half of global shape # - # Note that section 0 is the only section of the two produced that is "not the last - # section", so it's the only one that will need to account for two copies of the chunk, and - # thus the main target of the test. Hence, why `section_idx=0` is given. 
+ # The dtype and shape combined makes: + # - the write chunk ~1.7MB + # - the padded input chunk ~36.0MB (105 * 300 * 300 * 4 / (1024 ** 2)) + # - the output of reslice also ~1.7MB + # - the intermediate data created by reslice algorithm ~1.7MB + DTYPE = np.float32 + GLOBAL_SHAPE = (10, 300, 300) + PADDING = (50, 50) + + # Define dummy loader and method wrapper objects + loader = make_test_loader(mocker=mocker) + m1 = make_test_method( + mocker=mocker, method_name="m1", pattern=Pattern.projection, padding=True + ) + m2 = make_test_method(mocker=mocker, method_name="m2", pattern=Pattern.sinogram) + mocker.patch.object( + target=m1, + attribute="calculate_padding", + return_value=PADDING, + ) + + # Get list of section objects that represent pipeline + pipeline = Pipeline( + loader=loader, + methods=[m1, m2], + ) + sections = sectionize(pipeline) + store_backing = determine_store_backing( comm=COMM, sections=sections, @@ -363,7 +576,7 @@ def test_determine_store_backing_non_last_section_pipeline_large_padding_single_ ], ids=["37MB-limit-file-backing", "38MB-limit-ram-backing"], ) -def test_determine_store_backing_non_last_section_pipeline_large_padding_two_procs( +def test_determine_store_backing_large_padding_no_reslice_two_procs( mocker: MockerFixture, memory_limit: int, expected_store_backing: DataSetStoreBacking, @@ -385,7 +598,12 @@ def test_determine_store_backing_non_last_section_pipeline_large_padding_two_pro m1 = make_test_method( mocker=mocker, method_name="m1", pattern=Pattern.projection, padding=True ) - m2 = make_test_method(mocker=mocker, method_name="m2", pattern=Pattern.sinogram) + m2 = make_test_method( + mocker=mocker, + method_name="m2", + pattern=Pattern.projection, + some_param=(OutputRef(m1, "some-param")), + ) mocker.patch.object( target=m1, attribute="calculate_padding", @@ -399,13 +617,6 @@ def test_determine_store_backing_non_last_section_pipeline_large_padding_two_pro ) sections = sectionize(pipeline) - # For execution of non-last sections 
which have non-zero padding, the reader creates a - # padded copy of the chunk that is made. Therefore, two copies of the chunk must be taken - # into account when deciding the backing of the store. - # - # Note that section 0 is the only section of the two produced that is "not the last - # section", so it's the only one that will need to account for two copies of the chunk, and - # thus the main target of the test. Hence, why `section_idx=0` is given. store_backing = determine_store_backing( comm=COMM, sections=sections,