diff --git a/pyproject.toml b/pyproject.toml index 346736f6..60c26f3c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,11 +18,13 @@ dependencies = [ "qcore-utils>=2025.12.2", "source_modelling>=2025.12.1", # Data Formats + "dask", + "flacarray", "geopandas", + "h5py", "pandas[parquet, hdf5]", "pyyaml", "xarray[io]", - # Numerics "numpy", "scipy", @@ -35,7 +37,6 @@ dependencies = [ "schema", # For loading realisations "structlog", # Logging. "psutil", # To get the CPU affinity for jobs - ] [project.optional-dependencies] @@ -64,6 +65,7 @@ generate-stoch = "workflow.scripts.generate_stoch:app" merge-ts = "workflow.scripts.merge_ts:app" hf-sim = "workflow.scripts.hf_sim:app" bb-sim = "workflow.scripts.bb_sim:app" +compress-waveform = "workflow.scripts.compress_waveform:app" im-calc = "workflow.scripts.im_calc:app" check-srf = "workflow.scripts.check_srf:app" check-domain = "workflow.scripts.check_domain:app" diff --git a/tests/test_compress_waveform.py b/tests/test_compress_waveform.py new file mode 100644 index 00000000..add99c2e --- /dev/null +++ b/tests/test_compress_waveform.py @@ -0,0 +1,74 @@ +from pathlib import Path + +import numpy as np +import xarray as xr + +from workflow import waveform +from workflow.scripts.compress_waveform import ( + compress_waveform, +) + +# Constants for test data generation +N_COMPONENTS, N_STATIONS, N_TIME = 3, 5, 1000 +DT = 0.05 + + +def _make_test_dataset() -> xr.Dataset: + """Create a simple synthetic waveform dataset for testing.""" + time = np.arange(N_TIME) * DT + waveform = ( + np.sin(time * 2 * np.pi * 1.0) + + np.random.default_rng(42).standard_normal((N_COMPONENTS, N_STATIONS, N_TIME)) + * 0.1 + ) + + return xr.Dataset( + {"waveform": (["component", "station", "time"], waveform.astype(np.float32))}, + coords={ + "component": ["x", "y", "z"], + "station": [f"STA{i:02d}" for i in range(N_STATIONS)], + "time": time, + "lat": ("station", np.linspace(-45, -43, N_STATIONS)), + }, + attrs={"units": "m/s", "source": "test_gen"}, + ) + + +def test_waveform_roundtrip_integrity(tmp_path: Path) -> None: + """Verify waveform values and metadata survive the compression roundtrip.""" + with _make_test_dataset() as ds: + input_path = tmp_path / "input.h5" + original_attrs = ds.attrs + ds.to_netcdf(input_path, engine="h5netcdf") + output_path = tmp_path / "output.h5" + + compress_waveform(input_path, output_path) + restored = waveform.load_waveform_dataset(output_path).compute() + + restored_subset = {k: v for k, v in restored.attrs.items() if k in original_attrs} + assert restored_subset == original_attrs, ( + "Restored attributes do not match original attributes." + ) + + for coord in ds.coords: + np.testing.assert_array_equal(restored[coord].values, ds[coord].values) + + xr.testing.assert_allclose(restored, ds, atol=5e-4) + + +def test_compression_efficiency(tmp_path: Path) -> None: + """Verify the compressed file is actually smaller than the raw values.""" + input_path = tmp_path / "input.h5" + output_path = tmp_path / "output.h5" + + with _make_test_dataset() as ds: + ds.to_netcdf(input_path, engine="h5netcdf") + + compress_waveform(input_path, output_path) + + raw_size = input_path.stat().st_size + compressed_size = output_path.stat().st_size + + assert compressed_size < raw_size, ( + f"Compression failed to reduce size: {compressed_size} >= {raw_size}" + ) diff --git a/uv.lock b/uv.lock index 444957db..d43de148 100644 --- a/uv.lock +++ b/uv.lock @@ -420,6 +420,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/73/86/43fa9f15c5b9fb6e82620428827cd3c284aa933431405d1bcf5231ae3d3e/cligj-0.7.2-py3-none-any.whl", hash = "sha256:c1ca117dbce1fe20a5809dc96f01e1c2840f6dcc939b3ddbb1111bf330ba82df", size = 7069, upload-time = "2021-05-28T21:23:26.877Z" }, ] +[[package]] +name = "cloudpickle" +version = "3.1.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/27/fb/576f067976d320f5f0114a8d9fa1215425441bb35627b1993e5afd8111e5/cloudpickle-3.1.2.tar.gz", hash = "sha256:7fda9eb655c9c230dab534f1983763de5835249750e85fbcef43aaa30a9a2414", size = 22330, upload-time = "2025-11-03T09:25:26.604Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/88/39/799be3f2f0f38cc727ee3b4f1445fe6d5e4133064ec2e4115069418a5bb6/cloudpickle-3.1.2-py3-none-any.whl", hash = "sha256:9acb47f6afd73f60dc1df93bb801b472f05ff42fa6c84167d25cb206be1fbf4a", size = 22228, upload-time = "2025-11-03T09:25:25.534Z" }, +] + [[package]] name = "colorama" version = "0.4.6" @@ -588,6 +597,24 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e7/05/c19819d5e3d95294a6f5947fb9b9629efb316b96de511b418c53d245aae6/cycler-0.12.1-py3-none-any.whl", hash = "sha256:85cef7cff222d8644161529808465972e51340599459b8ac3ccbac5a854e0d30", size = 8321, upload-time = "2023-10-07T05:32:16.783Z" }, ] +[[package]] +name = "dask" +version = "2026.1.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "click" }, + { name = "cloudpickle" }, + { name = "fsspec" }, + { name = "packaging" }, + { name = "partd" }, + { name = "pyyaml" }, + { name = "toolz" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/bd/52/b0f9172b22778def907db1ff173249e4eb41f054b46a9c83b1528aaf811f/dask-2026.1.2.tar.gz", hash = "sha256:1136683de2750d98ea792670f7434e6c1cfce90cab2cc2f2495a9e60fd25a4fc", size = 10997838, upload-time = "2026-01-30T21:04:20.54Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e5/23/d39ccc4ed76222db31530b0a7d38876fdb7673e23f838e8d8f0ed4651a4f/dask-2026.1.2-py3-none-any.whl", hash = "sha256:46a0cf3b8d87f78a3d2e6b145aea4418a6d6d606fe6a16c79bd8ca2bb862bc91", size = 1482084, upload-time = "2026-01-30T21:04:18.363Z" }, +] + [[package]] name = "decorator" version = "5.2.1" @@ -736,6 +763,25 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/60/14/5ef47002ef19bd5cfbc7a74b21c30ef83f22beb80609314ce0328989ceda/fiona-1.10.1-cp313-cp313-win_amd64.whl", hash = "sha256:15751c90e29cee1e01fcfedf42ab85987e32f0b593cf98d88ed52199ef5ca623", size = 24461486, upload-time = "2024-09-16T20:15:13.399Z" }, ] +[[package]] +name = "flacarray" +version = "0.4.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "numpy" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/03/50/14506dea04a52f1a292f3e183209823fc249de19514dfa4532c2d2b522f6/flacarray-0.4.0.tar.gz", hash = "sha256:6da027423733972ce542108d197aff579a089c5c92d480930138f3ce81d02aeb", size = 1304511, upload-time = "2026-03-09T07:12:18.747Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/04/53/6262f9135d9a8b510c847187d440665d121fb6dbe3e6a7e938299f7c0fc8/flacarray-0.4.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:d468f7ce05dd3bddc53a46262055bd7ced112e772103c9abebca8a7db80fba83", size = 357563, upload-time = "2026-03-09T07:12:07.161Z" }, + { url = "https://files.pythonhosted.org/packages/6a/32/7770bbcf62f654128f7651813083d02c80c55c086ae5556daab0d7570a04/flacarray-0.4.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:ebe6a39deb21377c30f06dfc90e47104ba569ccd0e6a0bdfa3fa24d865d8914c", size = 311620, upload-time = "2026-03-09T07:12:08.334Z" }, + { url = "https://files.pythonhosted.org/packages/6e/cd/ebbe2d2b81b1da7522370ed814b1fbb659cc5bddf18c3100796678e95fca/flacarray-0.4.0-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:65a8bc70a7558acbd049b7947e3e89779c629eb938999b2f9542b1f26a917f7d", size = 430258, upload-time = "2026-03-09T07:12:09.637Z" }, + { url = "https://files.pythonhosted.org/packages/5d/7d/363dd6fb58d1ee78318ec56a4f60db0c38f7e0395d08872e7544f08a4d8a/flacarray-0.4.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:e65532ef89a294150e1a757eaba8213b8d60fd38d90c083a427de6f097b0119c", size = 357009, upload-time = "2026-03-09T07:12:10.823Z" }, + { url = "https://files.pythonhosted.org/packages/2b/7c/da49833ea9f378412c2f61478205998e80abdc544b178ffb527fcc0e9fd7/flacarray-0.4.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:0d901059f07354eb421b9859a2484a7db9bb103d4b78567b93eee7f6bc1eb8f6", size = 311063, upload-time = "2026-03-09T07:12:12.289Z" }, + { url = "https://files.pythonhosted.org/packages/fe/1f/acb88c1f54b6ae1f4ecab9c7d925693a2f8fa5f0dd3f2eaf318d59619a8c/flacarray-0.4.0-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:632e486437ebd9029a207a45c403e57365075ecc9507f852a1ae29e06c708230", size = 431720, upload-time = "2026-03-09T07:12:13.463Z" }, + { url = "https://files.pythonhosted.org/packages/96/c4/c39c8078b15b53a951e45d817a42eedc5797a88731976d03c0b4e835b08c/flacarray-0.4.0-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:e70d829e6f3d10c0c708b7b3b8e451c1eac2a18eb6d73580e83953037d56f8bc", size = 311517, upload-time = "2026-03-09T07:12:15.134Z" }, + { url = "https://files.pythonhosted.org/packages/cb/f4/bd371baca60410d3e715866b64cc8eac0aed36b6f7180753fa73530b88de/flacarray-0.4.0-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:4da02341d7963f7b8182adff7e96283ccf667dab14bc95f64eb169a495d07af9", size = 432430, upload-time = "2026-03-09T07:12:16.484Z" }, +] + [[package]] name = "flexcache" version = "0.3" @@ -1091,6 +1137,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/30/a8/e61a8c2b3cc7a597073d9cde1fcbb567e9d827f1db30c93cf80422eac70d/llvmlite-0.46.0-cp314-cp314-win_amd64.whl", hash = "sha256:7821eda3ec1f18050f981819756631d60b6d7ab1a6cf806d9efefbe3f4082d61", size = 39153056, upload-time = "2025-12-08T18:15:33.938Z" }, ] +[[package]] +name = "locket" +version = "1.0.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/2f/83/97b29fe05cb6ae28d2dbd30b81e2e402a3eed5f460c26e9eaa5895ceacf5/locket-1.0.0.tar.gz", hash = "sha256:5c0d4c052a8bbbf750e056a8e65ccd309086f4f0f18a2eac306a8dfa4112a632", size = 4350, upload-time = "2022-04-20T22:04:44.312Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/db/bc/83e112abc66cd466c6b83f99118035867cecd41802f8d044638aa78a106e/locket-1.0.0-py2.py3-none-any.whl", hash = "sha256:b6c819a722f7b6bd955b80781788e4a66a55628b858d347536b7e81325a3a5e3", size = 4398, upload-time = "2022-04-20T22:04:42.23Z" }, +] + [[package]] name = "lxml" version = "6.0.2" @@ -1807,6 +1862,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/3e/b6/3fee2205ce1333eaa85fdf8500de4e412bbc112d77c9b0045cc8d5a6fcec/parse-1.21.0-py2.py3-none-any.whl", hash = "sha256:6d81f7bae0ab25fd72818375c4a9c71c8705256bfc42e8725be609cf8b904aed", size = 20277, upload-time = "2026-02-05T18:33:39.673Z" }, ] +[[package]] +name = "partd" +version = "1.4.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "locket" }, + { name = "toolz" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b2/3a/3f06f34820a31257ddcabdfafc2672c5816be79c7e353b02c1f318daa7d4/partd-1.4.2.tar.gz", hash = "sha256:d022c33afbdc8405c226621b015e8067888173d85f7f5ecebb3cafed9a20f02c", size = 21029, upload-time = "2024-05-06T19:51:41.945Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/71/e7/40fb618334dcdf7c5a316c0e7343c5cd82d3d866edc100d98e29bc945ecd/partd-1.4.2-py3-none-any.whl", hash = "sha256:978e4ac767ec4ba5b86c6eaa52e5a2a3bc748a2ca839e8cc798f1cc6ce6efb0f", size = 18905, upload-time = "2024-05-06T19:51:39.271Z" }, +] + [[package]] name = "pathspec" version = "1.0.4" @@ -2927,6 +2995,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/44/6f/7120676b6d73228c96e17f1f794d8ab046fc910d781c8d151120c3f1569e/toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b", size = 16588, upload-time = "2020-11-01T01:40:20.672Z" }, ] +[[package]] +name = "toolz" +version = "1.1.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/11/d6/114b492226588d6ff54579d95847662fc69196bdeec318eb45393b24c192/toolz-1.1.0.tar.gz", hash = "sha256:27a5c770d068c110d9ed9323f24f1543e83b2f300a687b7891c1a6d56b697b5b", size = 52613, upload-time = "2025-10-17T04:03:21.661Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fb/12/5911ae3eeec47800503a238d971e51722ccea5feb8569b735184d5fcdbc0/toolz-1.1.0-py3-none-any.whl", hash = "sha256:15ccc861ac51c53696de0a5d6d4607f99c210739caf987b5d2054f3efed429d8", size = 58093, upload-time = "2025-10-17T04:03:20.435Z" }, +] + [[package]] name = "tqdm" version = "4.67.3" @@ -3108,7 +3185,10 @@ wheels = [ name = "workflow" source = { editable = "." } dependencies = [ + { name = "dask" }, + { name = "flacarray" }, { name = "geopandas" }, + { name = "h5py" }, { name = "im-calculation" }, { name = "nshmdb" }, { name = "numpy" }, @@ -3149,8 +3229,11 @@ types = [ [package.metadata] requires-dist = [ + { name = "dask" }, { name = "deptry", marker = "extra == 'dev'" }, + { name = "flacarray" }, { name = "geopandas" }, + { name = "h5py" }, { name = "hypothesis", extras = ["numpy"], marker = "extra == 'test'", specifier = ">=6.0.0" }, { name = "im-calculation", specifier = ">=2025.12.5" }, { name = "nshmdb", specifier = ">=2025.12.1" }, diff --git a/workflow/scripts/compress_waveform.py b/workflow/scripts/compress_waveform.py new file mode 100644 index 00000000..f9b0041a --- /dev/null +++ b/workflow/scripts/compress_waveform.py @@ -0,0 +1,89 @@ +"""Compress Waveform. + +Description +----------- +Compress a broadband waveform HDF5 file using FLAC compression. + +Inputs +------ +1. A broadband waveform file (HDF5/NetCDF4 format, output of ``bb-sim``). + +Outputs +------- +A compressed waveform file in HDF5 format with FlacArray-encoded waveform data. + +Environment +----------- +Can be run in the cybershake container. Can also be run from your own +computer using the ``compress-waveform`` command which is installed after running +``pip install workflow@git+https://github.com/ucgmsim/workflow``. + +Usage +----- +``compress-waveform WAVEFORM_FFP OUTPUT_FFP`` + +For More Help +------------- +See the output of ``compress-waveform --help``. +""" + +from pathlib import Path +from typing import Annotated + +import flacarray.hdf5 +import h5py +import typer +import xarray as xr + +from qcore import cli +from workflow import log_utils + +app = typer.Typer() + + +@cli.from_docstring(app) +@log_utils.log_call() +def compress_waveform( + waveform_ffp: Annotated[Path, typer.Argument(dir_okay=False, exists=True)], + output_ffp: Annotated[Path, typer.Argument(dir_okay=False, writable=True)], + level: Annotated[int, typer.Option(min=0, max=8)] = 5, + precision: Annotated[int, typer.Option(min=1)] = 4, +) -> None: + """Compress a broadband waveform file using FLAC. + + Parameters + ---------- + waveform_ffp : Path + Path to the input broadband waveform file (HDF5/NetCDF4). + output_ffp : Path + Path to the output compressed HDF5 file. + level : int, optional + FLAC compression level (0-8). Higher values compress more but + are slower. Defaults to 5. + precision : int, optional + FLAC precision level (in significant digits of input data). Higher values compress less but + have more precision. Defaults to 4. + """ + with ( + xr.open_dataset(waveform_ffp, engine="h5netcdf") as broadband, + ): + broadband.drop_vars("waveform").to_netcdf(output_ffp, engine="h5netcdf") + with h5py.File(output_ffp, "a") as hdf: + group = hdf.create_group("_flac_compressed_waveform") + group.attrs["flac_array"] = True + group.attrs["name"] = "waveform" + group.attrs["shape"] = broadband.waveform.shape + group.attrs["dims"] = broadband.waveform.dims + group.attrs["dtype"] = str(broadband.waveform.dtype) + + flacarray.hdf5.write_array( + broadband.waveform.values, + group, + precision=precision, + level=level, + use_threads=True, + ) + + +if __name__ == "__main__": + app() diff --git a/workflow/scripts/hf_sim.py b/workflow/scripts/hf_sim.py index 1b51894d..ea009afb 100644 --- a/workflow/scripts/hf_sim.py +++ b/workflow/scripts/hf_sim.py @@ -33,6 +33,7 @@ """ import concurrent.futures +import hashlib import subprocess import tempfile from collections.abc import Iterable diff --git a/workflow/waveform.py b/workflow/waveform.py new file mode 100644 index 00000000..a048e74c --- /dev/null +++ b/workflow/waveform.py @@ -0,0 +1,162 @@ +"""API for loading compressed waveform datasets.""" + +from numbers import Integral +from pathlib import Path + +import dask.array as da +import flacarray.decompress +import h5py +import numpy as np +import xarray as xr +from numpy.lib import stride_tricks + + +class FlacH5Wrapper: + """An array-like interface for FLAC compressed waveforms that only allocates memory for the compressed raw bytes. + + Parameters + ---------- + group : h5py.Group + HDF5 group to read from + shape : tuple + Shape of output array. + dtype : np.dtype + Data type of output array. + """ + + def __init__( + self, + group: h5py.Group, + shape: tuple, + dtype: np.dtype, + ) -> None: + """Initialise the FLAC hdf5 wrapper. + + + Parameters + ---------- + group : h5py.Group + HDF5 group to read from + shape : tuple + Shape of output array. + dtype : np.dtype + Data type of output array. + """ + self.shape = shape + self.dtype = dtype + self.ndim = len(shape) + self.stream_gains = np.array(group["stream_gains"]) + self.stream_offsets = np.array(group["stream_offsets"]) + self.stream_starts = np.array(group["stream_starts"]) + self.stream_nbytes = np.array(group["stream_bytes"]) + self.raw_bytes = np.array(group["compressed"]) + + def __getitem__(self, key: tuple[int | slice, ...]) -> np.ndarray: + """Getitem implementation that decompresses a slice of the compressed array data. + + + + Parameters + ---------- + key : tuple[int | slice, ...] + indexing key + + Returns + ------- + np.ndarray + Decompressed waveform array. + """ + keep = np.zeros(self.stream_starts.shape, dtype=bool) + stream_indexer = key[:-1] + sample_indexer = key[-1] + keep[stream_indexer] = True + + step = None + if isinstance(sample_indexer, Integral): + # Handle scalar indices, including negative ones. + index = int(sample_indexer) + if index < 0: + index += self.shape[-1] + if index < 0 or index >= self.shape[-1]: + raise IndexError("Sample index out of range") + first_sample = index + last_sample = index + 1 + else: + # Normalise the sample indexer (last dimension). This ensures that + # first_stream_sample/last_stream_sample passed to the decompressor are + # consistent with the actual indexing semantics. + + if not isinstance(sample_indexer, slice): + raise TypeError( + f"Unsupported index type for samples: {type(sample_indexer)!r}" + ) + start, stop, step = sample_indexer.indices(self.shape[-1]) + first_sample = start + last_sample = stop + + decompressed_data, _ = flacarray.decompress.array_decompress_slice( + compressed=self.raw_bytes, + stream_size=self.shape[-1], # assuming last dim is time/samples + stream_starts=self.stream_starts, + stream_nbytes=self.stream_nbytes, + stream_offsets=self.stream_offsets, + stream_gains=self.stream_gains, + keep=keep, + first_stream_sample=first_sample, + last_stream_sample=last_sample, + is_int64=(self.dtype == np.float64), + ) + + if step not in (None, 1): + decompressed_data = decompressed_data[..., ::step] + + dummy = np.zeros(1, dtype=np.int8) + # The following is a "virtual array" with the same shape as + # the theoretical array but the strides (how much memory each + # element consumes) is zero so the effective memory + # consumption of this array is zero. We are using this to get + # an array-like that can calculate the expected shape from the + # indexing operation without requiring potentially tens of gb + # of memory. + virtual_array = stride_tricks.as_strided( + dummy, shape=self.shape, strides=(0,) * len(self.shape) + ) + target_shape = virtual_array[key].shape + + return decompressed_data.reshape(target_shape) + + +def load_waveform_dataset(dataset_ffp: Path) -> xr.Dataset: + """Read a compressed waveform array as an xarray dataset. + + Parameters + ---------- + dataset_ffp : Path + Path to xarray dataset. + + Returns + ------- + xr.Dataset + The xarray dataset read from disk. + """ + ds = xr.open_dataset( + dataset_ffp, engine="h5netcdf", drop_variables=["_flac_compressed_waveform"] + ) + + with h5py.File(dataset_ffp, "r") as h5: + group = h5["_flac_compressed_waveform"] + shape = tuple(group.attrs["shape"]) + dtype = np.dtype(group.attrs["dtype"]) + name = group.attrs["name"] + dims = group.attrs["dims"] + + wrapper = FlacH5Wrapper(group, shape, dtype) + + waveform_da = da.from_array( + wrapper, + chunks="auto", + name=name, + ) + + ds[name] = (dims, waveform_da) + return ds