From b713d919b91e2d9f6e8aab15844fd6d02abc0806 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 15 Mar 2026 23:41:53 +0000 Subject: [PATCH 01/25] Initial plan From a055112afdfc3e50167eb25952af90a1b1496234 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 15 Mar 2026 23:50:01 +0000 Subject: [PATCH 02/25] Add compress-waveform stage using FlacArray compression - New workflow script: workflow/scripts/compress_waveform.py - Reads bb_sim HDF5 output, compresses waveform data with FlacArray - Preserves coordinates and attributes in the compressed output - Registered as compress-waveform CLI command - Added flacarray dependency to pyproject.toml Co-authored-by: lispandfound <12835929+lispandfound@users.noreply.github.com> --- pyproject.toml | 2 + workflow/scripts/compress_waveform.py | 94 +++++++++++++++++++++++++++ 2 files changed, 96 insertions(+) create mode 100644 workflow/scripts/compress_waveform.py diff --git a/pyproject.toml b/pyproject.toml index 346736f6..e5fa6bc2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ dependencies = [ "qcore-utils>=2025.12.2", "source_modelling>=2025.12.1", # Data Formats + "flacarray", "geopandas", "pandas[parquet, hdf5]", "pyyaml", @@ -64,6 +65,7 @@ generate-stoch = "workflow.scripts.generate_stoch:app" merge-ts = "workflow.scripts.merge_ts:app" hf-sim = "workflow.scripts.hf_sim:app" bb-sim = "workflow.scripts.bb_sim:app" +compress-waveform = "workflow.scripts.compress_waveform:app" im-calc = "workflow.scripts.im_calc:app" check-srf = "workflow.scripts.check_srf:app" check-domain = "workflow.scripts.check_domain:app" diff --git a/workflow/scripts/compress_waveform.py b/workflow/scripts/compress_waveform.py new file mode 100644 index 00000000..9cad43fe --- /dev/null +++ b/workflow/scripts/compress_waveform.py @@ -0,0 +1,94 @@ +"""Compress Waveform. + +Description +----------- +Compress a broadband waveform HDF5 file using FlacArray compression. + +Inputs +------ +1. A broadband waveform file (HDF5/NetCDF4 format, output of ``bb-sim``). + +Outputs +------- +A compressed waveform file in HDF5 format with FlacArray-encoded waveform data. + +Environment +----------- +Can be run in the cybershake container. Can also be run from your own +computer using the ``compress-waveform`` command which is installed after running +``pip install workflow@git+https://github.com/ucgmsim/workflow``. + +Usage +----- +``compress-waveform WAVEFORM_FFP OUTPUT_FFP`` + +For More Help +------------- +See the output of ``compress-waveform --help``. +""" + +from pathlib import Path +from typing import Annotated + +import h5py +import numpy as np +import typer +import xarray as xr +from flacarray import FlacArray + +from qcore import cli +from workflow import log_utils + +app = typer.Typer() + + +@cli.from_docstring(app) +@log_utils.log_call() +def compress_waveform( + waveform_ffp: Annotated[Path, typer.Argument(dir_okay=False, exists=True)], + output_ffp: Annotated[Path, typer.Argument(dir_okay=False, writable=True)], + precision: Annotated[int, typer.Option(min=1)] = 4, + level: Annotated[int, typer.Option(min=0, max=8)] = 5, +) -> None: + """Compress a broadband waveform file using FlacArray. + + Parameters + ---------- + waveform_ffp : Path + Path to the input broadband waveform file (HDF5/NetCDF4). + output_ffp : Path + Path to the output compressed HDF5 file. + precision : int, optional + Number of significant decimal digits to retain in the + compression. Higher values produce more accurate but larger + files. Defaults to 4. + level : int, optional + FLAC compression level (0-8). Higher values compress more but + are slower. Defaults to 5. + """ + broadband = xr.open_dataset(waveform_ffp) + + waveform_data = broadband["waveform"].values + flac_waveform = FlacArray.from_array( + waveform_data, precision=precision, level=level + ) + + with h5py.File(output_ffp, "w") as hdf: + flac_waveform.write_hdf5(hdf.create_group("waveform")) + + for coord_name in broadband.coords: + coord_data = broadband.coords[coord_name].values + if np.issubdtype(coord_data.dtype, np.str_): + coord_data = coord_data.astype(bytes) + hdf.create_dataset(f"coords/{coord_name}", data=coord_data) + + for attr_name, attr_value in broadband.attrs.items(): + hdf.attrs[attr_name] = attr_value + + hdf.attrs["waveform_dims"] = list(broadband["waveform"].dims) + + broadband.close() + + +if __name__ == "__main__": + app() From 29c7d3b69cea8b616e88554a0c7abe5efd29d21b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 15 Mar 2026 23:51:11 +0000 Subject: [PATCH 03/25] Address code review: use dtype.kind check instead of np.issubdtype for string detection, remove unused numpy import Co-authored-by: lispandfound <12835929+lispandfound@users.noreply.github.com> --- workflow/scripts/compress_waveform.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/workflow/scripts/compress_waveform.py b/workflow/scripts/compress_waveform.py index 9cad43fe..a6a68781 100644 --- a/workflow/scripts/compress_waveform.py +++ b/workflow/scripts/compress_waveform.py @@ -31,7 +31,6 @@ from typing import Annotated import h5py -import numpy as np import typer import xarray as xr from flacarray import FlacArray @@ -78,7 +77,7 @@ def compress_waveform( for coord_name in broadband.coords: coord_data = broadband.coords[coord_name].values - if np.issubdtype(coord_data.dtype, np.str_): + if coord_data.dtype.kind == "U": coord_data = coord_data.astype(bytes) hdf.create_dataset(f"coords/{coord_name}", data=coord_data) From 928491a7f2dfc48330460e418d6bcf5e0fa8589a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 16 Mar 2026 00:09:27 +0000 Subject: [PATCH 04/25] Rewrite compress_waveform with int16 scaling, delta encoding, and dask chunking - Scale waveform data to int16 range for efficient FLAC bit-shunting - Apply delta encoding (first differences) to reduce variance for Rice coding - Use dask chunking when loading input dataset - Preserve all xarray metadata (coords, attrs, dims, dtype) - Add decompress_waveform() to reconstruct xr.Dataset from compressed file - Add tests for roundtrip correctness, compression efficiency, and metadata - Add dask dependency for chunked dataset loading Co-authored-by: lispandfound <12835929+lispandfound@users.noreply.github.com> --- pyproject.toml | 1 + tests/test_compress_waveform.py | 128 +++++++++++++++++++++++ workflow/scripts/compress_waveform.py | 143 +++++++++++++++++++++++--- 3 files changed, 255 insertions(+), 17 deletions(-) create mode 100644 tests/test_compress_waveform.py diff --git a/pyproject.toml b/pyproject.toml index e5fa6bc2..089a1ba3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ dependencies = [ "xarray[io]", # Numerics + "dask", "numpy", "scipy", "shapely", diff --git a/tests/test_compress_waveform.py b/tests/test_compress_waveform.py new file mode 100644 index 00000000..5e226ef2 --- /dev/null +++ b/tests/test_compress_waveform.py @@ -0,0 +1,128 @@ +from pathlib import Path + +import numpy as np +import xarray as xr + +from workflow.scripts.compress_waveform import compress_waveform, decompress_waveform + +N_COMPONENTS = 3 +N_STATIONS = 20 +N_TIME = 20_000 +DT = 0.005 + + +def _make_broadband_dataset(rng: np.random.Generator) -> xr.Dataset: + """Create a synthetic broadband waveform dataset resembling bb_sim output. + + The waveform mimics real seismic traces by combining a few + sinusoidal components with different frequencies and adding a + small amount of noise. Real broadband waveforms have strong + temporal autocorrelation, so this synthetic data is a reasonable + stand-in for testing compression behaviour. + """ + time = np.arange(N_TIME) * DT + + # Build smooth, correlated waveforms (sine sweep + harmonics) + freqs = rng.uniform(0.5, 5.0, size=(N_COMPONENTS, N_STATIONS, 5)) + phases = rng.uniform(0, 2 * np.pi, size=(N_COMPONENTS, N_STATIONS, 5)) + amplitudes = rng.uniform(0.001, 0.05, size=(N_COMPONENTS, N_STATIONS, 5)) + + waveform = np.zeros((N_COMPONENTS, N_STATIONS, N_TIME), dtype=np.float32) + for k in range(5): + waveform += ( + amplitudes[..., k : k + 1] + * np.sin( + 2 * np.pi * freqs[..., k : k + 1] * time[np.newaxis, np.newaxis, :] + + phases[..., k : k + 1] + ) + ).astype(np.float32) + + # Small additive noise + waveform += (rng.standard_normal(waveform.shape) * 1e-4).astype(np.float32) + + return xr.Dataset( + {"waveform": (["component", "station", "time"], waveform)}, + coords={ + "component": ("component", ["x", "y", "z"]), + "station": ("station", [f"STA{i:03d}" for i in range(N_STATIONS)]), + "time": ("time", time), + "latitude": ("station", -43.5 + np.arange(N_STATIONS) * 0.1), + "longitude": ("station", 172.5 + np.arange(N_STATIONS) * 0.1), + }, + attrs={"units": "g"}, + ) + + +def test_compress_decompress_roundtrip(tmp_path: Path) -> None: + """Verify that compress → decompress preserves the dataset.""" + rng = np.random.default_rng(12345) + ds = _make_broadband_dataset(rng) + + input_ffp = tmp_path / "broadband.nc" + compressed_ffp = tmp_path / "broadband_compressed.h5" + + ds.to_netcdf(input_ffp, engine="h5netcdf") + compress_waveform(input_ffp, compressed_ffp) + + restored = decompress_waveform(compressed_ffp) + + # Coordinates and attributes are preserved. + assert set(restored.coords) == set(ds.coords) + assert restored.attrs == ds.attrs + assert list(restored.waveform.dims) == list(ds.waveform.dims) + assert restored.waveform.dtype == ds.waveform.dtype + assert restored.waveform.shape == ds.waveform.shape + + # Numerical accuracy: int16 quantisation error is at most 0.5 LSB. + max_abs = float(np.abs(ds.waveform.values).max()) + scale_factor = max_abs / np.iinfo(np.int16).max + max_err = float(np.abs(restored.waveform.values - ds.waveform.values).max()) + assert max_err <= scale_factor, ( + f"Roundtrip error {max_err} exceeds one LSB ({scale_factor})" + ) + + +def test_compression_is_efficient(tmp_path: Path) -> None: + """Verify that the compressed file is meaningfully smaller than the original. + + Smooth, correlated waveforms (like real seismic data) should + compress well with int16-scaling + delta-encoding + FLAC. + """ + rng = np.random.default_rng(12345) + ds = _make_broadband_dataset(rng) + + input_ffp = tmp_path / "broadband.nc" + compressed_ffp = tmp_path / "broadband_compressed.h5" + + ds.to_netcdf(input_ffp, engine="h5netcdf") + compress_waveform(input_ffp, compressed_ffp) + + original_bytes = input_ffp.stat().st_size + compressed_bytes = compressed_ffp.stat().st_size + ratio = original_bytes / compressed_bytes + + # With smooth waveforms, expect at least 3× compression. + assert ratio > 3.0, ( + f"Compression ratio {ratio:.2f}x is too low " + f"(original={original_bytes}, compressed={compressed_bytes})" + ) + + +def test_metadata_preserved(tmp_path: Path) -> None: + """Verify that all coordinate variables round-trip exactly.""" + rng = np.random.default_rng(12345) + ds = _make_broadband_dataset(rng) + + input_ffp = tmp_path / "broadband.nc" + compressed_ffp = tmp_path / "broadband_compressed.h5" + + ds.to_netcdf(input_ffp, engine="h5netcdf") + compress_waveform(input_ffp, compressed_ffp) + restored = decompress_waveform(compressed_ffp) + + for coord_name in ds.coords: + np.testing.assert_array_equal( + restored.coords[coord_name].values, + ds.coords[coord_name].values, + err_msg=f"Coordinate {coord_name!r} differs after roundtrip", + ) diff --git a/workflow/scripts/compress_waveform.py b/workflow/scripts/compress_waveform.py index a6a68781..9e32c620 100644 --- a/workflow/scripts/compress_waveform.py +++ b/workflow/scripts/compress_waveform.py @@ -2,7 +2,15 @@ Description ----------- -Compress a broadband waveform HDF5 file using FlacArray compression. +Compress a broadband waveform HDF5 file using FlacArray compression with +int16 rescaling and delta encoding for efficient storage. + +The waveform data is rescaled to fit the full range of a signed 16-bit +integer ([-32768, 32767]) and delta encoded (first differences) before +FLAC compression. This produces small integer residuals that FLAC's +Rice coding compresses very efficiently. All coordinates and attributes +from the input xarray dataset are preserved so the compressed file can +be decompressed back to a complete xarray Dataset. Inputs ------ @@ -31,6 +39,7 @@ from typing import Annotated import h5py +import numpy as np import typer import xarray as xr from flacarray import FlacArray @@ -40,54 +49,154 @@ app = typer.Typer() +INT16_MAX = np.iinfo(np.int16).max + + +def _write_coords( + hdf: h5py.File, + broadband: xr.Dataset, +) -> None: + """Write all coordinate variables to an HDF5 group. + + Parameters + ---------- + hdf : h5py.File + The open HDF5 file to write to. + broadband : xr.Dataset + The source xarray dataset. + """ + coords_grp = hdf.create_group("coords") + for coord_name in broadband.coords: + coord_data = broadband.coords[coord_name].values + if coord_data.dtype.kind == "U": + coord_data = coord_data.astype(bytes) + dset = coords_grp.create_dataset(coord_name, data=coord_data) + dset.attrs["dims"] = list(broadband.coords[coord_name].dims) + + +def _read_coords(hdf: h5py.File) -> dict[str, tuple[tuple[str, ...], np.ndarray]]: + """Read coordinate variables from an HDF5 group. + + Parameters + ---------- + hdf : h5py.File + The open HDF5 file to read from. + + Returns + ------- + dict + A mapping of coordinate names to (dims, values) tuples. + """ + coords: dict[str, tuple[tuple[str, ...], np.ndarray]] = {} + for coord_name in hdf["coords"]: + dset = hdf["coords"][coord_name] + data = dset[:] + if data.dtype.kind == "S": + data = data.astype(str) + dims = tuple(dset.attrs["dims"]) + coords[coord_name] = (dims, data) + return coords + @cli.from_docstring(app) @log_utils.log_call() def compress_waveform( waveform_ffp: Annotated[Path, typer.Argument(dir_okay=False, exists=True)], output_ffp: Annotated[Path, typer.Argument(dir_okay=False, writable=True)], - precision: Annotated[int, typer.Option(min=1)] = 4, level: Annotated[int, typer.Option(min=0, max=8)] = 5, ) -> None: """Compress a broadband waveform file using FlacArray. + The waveform data is rescaled to the signed 16-bit integer range and + delta encoded before FLAC compression. This ensures FLAC's Rice + coding operates on small centred residuals for best compression. + Parameters ---------- waveform_ffp : Path Path to the input broadband waveform file (HDF5/NetCDF4). output_ffp : Path Path to the output compressed HDF5 file. - precision : int, optional - Number of significant decimal digits to retain in the - compression. Higher values produce more accurate but larger - files. Defaults to 4. level : int, optional FLAC compression level (0-8). Higher values compress more but are slower. Defaults to 5. """ - broadband = xr.open_dataset(waveform_ffp) + broadband = xr.open_dataset(waveform_ffp, chunks={"time": 10_000}) - waveform_data = broadband["waveform"].values - flac_waveform = FlacArray.from_array( - waveform_data, precision=precision, level=level - ) + waveform: np.ndarray = broadband["waveform"].values + waveform_dtype = str(waveform.dtype) + + # Scale to fill the int16 range [-32768, 32767] for efficient FLAC + # bit-shunting. Values are stored as int32 because FlacArray only + # accepts int32/int64 integer types. + max_abs = float(np.abs(waveform).max()) + scale_factor = max_abs / INT16_MAX if max_abs > 0 else 1.0 + scaled = np.round(waveform / scale_factor).astype(np.int32) + + # Delta encode along the time axis (last axis): y[n] = x[n] - x[n-1]. + # This centres the distribution around zero with much smaller + # variance, so FLAC's Rice coding needs fewer bits per sample. + delta = np.diff(scaled, axis=-1, prepend=np.zeros_like(scaled[..., :1])) + + flac_waveform = FlacArray.from_array(delta, level=level) with h5py.File(output_ffp, "w") as hdf: flac_waveform.write_hdf5(hdf.create_group("waveform")) - - for coord_name in broadband.coords: - coord_data = broadband.coords[coord_name].values - if coord_data.dtype.kind == "U": - coord_data = coord_data.astype(bytes) - hdf.create_dataset(f"coords/{coord_name}", data=coord_data) + _write_coords(hdf, broadband) for attr_name, attr_value in broadband.attrs.items(): hdf.attrs[attr_name] = attr_value hdf.attrs["waveform_dims"] = list(broadband["waveform"].dims) + hdf.attrs["scale_factor"] = scale_factor + hdf.attrs["delta_encoded"] = True + hdf.attrs["waveform_dtype"] = waveform_dtype broadband.close() +def decompress_waveform(compressed_ffp: Path) -> xr.Dataset: + """Decompress a FlacArray-compressed waveform file to an xarray Dataset. + + Parameters + ---------- + compressed_ffp : Path + Path to the compressed HDF5 file produced by ``compress_waveform``. + + Returns + ------- + xr.Dataset + The decompressed waveform dataset with all original coordinates + and attributes restored. + """ + with h5py.File(compressed_ffp, "r") as hdf: + flac_waveform = FlacArray.read_hdf5(hdf["waveform"]) + delta = flac_waveform.to_array() + + # Undo delta encoding: cumulative sum restores the scaled values. + scaled = np.cumsum(delta, axis=-1) + + # Rescale back to the original floating-point range. + scale_factor = hdf.attrs["scale_factor"] + waveform_dtype = np.dtype(str(hdf.attrs["waveform_dtype"])) + waveform = scaled.astype(waveform_dtype) * waveform_dtype.type( + scale_factor + ) + + dims = list(hdf.attrs["waveform_dims"]) + coords = _read_coords(hdf) + attrs = { + k: v + for k, v in hdf.attrs.items() + if k not in ("waveform_dims", "scale_factor", "delta_encoded", "waveform_dtype") + } + + return xr.Dataset( + {"waveform": (dims, waveform)}, + coords=coords, + attrs=attrs, + ) + + if __name__ == "__main__": app() From a238447d7e94affea551213ff55e66621cdc5ab1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 16 Mar 2026 00:10:54 +0000 Subject: [PATCH 05/25] Address code review: rename constant, document dask materialization, extract test constant Co-authored-by: lispandfound <12835929+lispandfound@users.noreply.github.com> --- tests/test_compress_waveform.py | 9 +++++---- workflow/scripts/compress_waveform.py | 7 +++++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/tests/test_compress_waveform.py b/tests/test_compress_waveform.py index 5e226ef2..216f054a 100644 --- a/tests/test_compress_waveform.py +++ b/tests/test_compress_waveform.py @@ -8,6 +8,7 @@ N_COMPONENTS = 3 N_STATIONS = 20 N_TIME = 20_000 +N_HARMONICS = 5 DT = 0.005 @@ -23,12 +24,12 @@ def _make_broadband_dataset(rng: np.random.Generator) -> xr.Dataset: time = np.arange(N_TIME) * DT # Build smooth, correlated waveforms (sine sweep + harmonics) - freqs = rng.uniform(0.5, 5.0, size=(N_COMPONENTS, N_STATIONS, 5)) - phases = rng.uniform(0, 2 * np.pi, size=(N_COMPONENTS, N_STATIONS, 5)) - amplitudes = rng.uniform(0.001, 0.05, size=(N_COMPONENTS, N_STATIONS, 5)) + freqs = rng.uniform(0.5, 5.0, size=(N_COMPONENTS, N_STATIONS, N_HARMONICS)) + phases = rng.uniform(0, 2 * np.pi, size=(N_COMPONENTS, N_STATIONS, N_HARMONICS)) + amplitudes = rng.uniform(0.001, 0.05, size=(N_COMPONENTS, N_STATIONS, N_HARMONICS)) waveform = np.zeros((N_COMPONENTS, N_STATIONS, N_TIME), dtype=np.float32) - for k in range(5): + for k in range(N_HARMONICS): waveform += ( amplitudes[..., k : k + 1] * np.sin( diff --git a/workflow/scripts/compress_waveform.py b/workflow/scripts/compress_waveform.py index 9e32c620..fe50d6a1 100644 --- a/workflow/scripts/compress_waveform.py +++ b/workflow/scripts/compress_waveform.py @@ -49,7 +49,7 @@ app = typer.Typer() -INT16_MAX = np.iinfo(np.int16).max +_INT16_MAX = np.iinfo(np.int16).max def _write_coords( @@ -123,6 +123,9 @@ def compress_waveform( """ broadband = xr.open_dataset(waveform_ffp, chunks={"time": 10_000}) + # Materialise the waveform into a numpy array. Dask chunking above + # lets xarray parse file metadata lazily; we need the full array for + # the FlacArray encoder which operates on contiguous memory. waveform: np.ndarray = broadband["waveform"].values waveform_dtype = str(waveform.dtype) @@ -130,7 +133,7 @@ def compress_waveform( # bit-shunting. Values are stored as int32 because FlacArray only # accepts int32/int64 integer types. max_abs = float(np.abs(waveform).max()) - scale_factor = max_abs / INT16_MAX if max_abs > 0 else 1.0 + scale_factor = max_abs / _INT16_MAX if max_abs > 0 else 1.0 scaled = np.round(waveform / scale_factor).astype(np.int32) # Delta encode along the time axis (last axis): y[n] = x[n] - x[n-1]. From c20a76c048bf693b1fe5cddc0cac6c4bfa1ad167 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 16 Mar 2026 00:26:55 +0000 Subject: [PATCH 06/25] Fix dask chunking to station axis, add component delta encoding via map_blocks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Chunk by station (not time) so each dask task gets complete component-triples with full timeseries - Use map_blocks(functools.partial(_encode_chunk, ...)) pattern for parallel per-station encoding - Add component-wise delta encoding (x, y−x, z−y) to exploit strong inter-component correlation in seismic data - Keep existing time-delta and int16 scaling - Update decompress to undo component delta (cumsum axis=0) - Update test data to have correlated components Co-authored-by: lispandfound <12835929+lispandfound@users.noreply.github.com> --- tests/test_compress_waveform.py | 46 +++++++---- workflow/scripts/compress_waveform.py | 112 +++++++++++++++++++------- 2 files changed, 113 insertions(+), 45 deletions(-) diff --git a/tests/test_compress_waveform.py b/tests/test_compress_waveform.py index 216f054a..cd44bf80 100644 --- a/tests/test_compress_waveform.py +++ b/tests/test_compress_waveform.py @@ -13,31 +13,47 @@ def _make_broadband_dataset(rng: np.random.Generator) -> xr.Dataset: - """Create a synthetic broadband waveform dataset resembling bb_sim output. + """Create a synthetic broadband waveform dataset with correlated components. - The waveform mimics real seismic traces by combining a few - sinusoidal components with different frequencies and adding a - small amount of noise. Real broadband waveforms have strong - temporal autocorrelation, so this synthetic data is a reasonable - stand-in for testing compression behaviour. + The waveform mimics real seismic traces where x, y, z components + share a dominant signal with small per-component perturbations. + This tests that the component-delta encoding exploits + cross-component correlation effectively. """ time = np.arange(N_TIME) * DT - # Build smooth, correlated waveforms (sine sweep + harmonics) - freqs = rng.uniform(0.5, 5.0, size=(N_COMPONENTS, N_STATIONS, N_HARMONICS)) - phases = rng.uniform(0, 2 * np.pi, size=(N_COMPONENTS, N_STATIONS, N_HARMONICS)) - amplitudes = rng.uniform(0.001, 0.05, size=(N_COMPONENTS, N_STATIONS, N_HARMONICS)) + # Base signal shared by all components (dominant seismic motion). + base_freqs = rng.uniform(0.5, 5.0, size=(1, N_STATIONS, N_HARMONICS)) + base_phases = rng.uniform(0, 2 * np.pi, size=(1, N_STATIONS, N_HARMONICS)) + base_amps = rng.uniform(0.001, 0.05, size=(1, N_STATIONS, N_HARMONICS)) - waveform = np.zeros((N_COMPONENTS, N_STATIONS, N_TIME), dtype=np.float32) + base = np.zeros((1, N_STATIONS, N_TIME), dtype=np.float32) for k in range(N_HARMONICS): - waveform += ( - amplitudes[..., k : k + 1] + base += ( + base_amps[..., k : k + 1] * np.sin( - 2 * np.pi * freqs[..., k : k + 1] * time[np.newaxis, np.newaxis, :] - + phases[..., k : k + 1] + 2 * np.pi * base_freqs[..., k : k + 1] + * time[np.newaxis, np.newaxis, :] + + base_phases[..., k : k + 1] ) ).astype(np.float32) + # Components share the base signal with ~10 % perturbation. + waveform = np.tile(base, (N_COMPONENTS, 1, 1)) + for c in range(N_COMPONENTS): + pert_freqs = rng.uniform(0.5, 5.0, size=(1, N_STATIONS, 2)) + pert_phases = rng.uniform(0, 2 * np.pi, size=(1, N_STATIONS, 2)) + pert_amps = rng.uniform(0.0001, 0.005, size=(1, N_STATIONS, 2)) + for k in range(2): + waveform[c : c + 1] += ( + pert_amps[..., k : k + 1] + * np.sin( + 2 * np.pi * pert_freqs[..., k : k + 1] + * time[np.newaxis, np.newaxis, :] + + pert_phases[..., k : k + 1] + ) + ).astype(np.float32) + # Small additive noise waveform += (rng.standard_normal(waveform.shape) * 1e-4).astype(np.float32) diff --git a/workflow/scripts/compress_waveform.py b/workflow/scripts/compress_waveform.py index fe50d6a1..1b4321f2 100644 --- a/workflow/scripts/compress_waveform.py +++ b/workflow/scripts/compress_waveform.py @@ -6,11 +6,13 @@ int16 rescaling and delta encoding for efficient storage. The waveform data is rescaled to fit the full range of a signed 16-bit -integer ([-32768, 32767]) and delta encoded (first differences) before -FLAC compression. This produces small integer residuals that FLAC's -Rice coding compresses very efficiently. All coordinates and attributes -from the input xarray dataset are preserved so the compressed file can -be decompressed back to a complete xarray Dataset. +integer ([-32768, 32767]) and delta encoded along both the component +axis (to exploit inter-component correlation) and the time axis (to +exploit temporal smoothness) before FLAC compression. This produces +small integer residuals that FLAC's Rice coding compresses very +efficiently. All coordinates and attributes from the input xarray +dataset are preserved so the compressed file can be decompressed back +to a complete xarray Dataset. Inputs ------ @@ -35,6 +37,7 @@ See the output of ``compress-waveform --help``. """ +import functools from pathlib import Path from typing import Annotated @@ -98,6 +101,53 @@ def _read_coords(hdf: h5py.File) -> dict[str, tuple[tuple[str, ...], np.ndarray] return coords +def _encode_chunk(ds_chunk: xr.Dataset, scale_factor: float) -> xr.Dataset: + """Encode a waveform chunk with int16 scaling and delta encoding. + + Each chunk is expected to contain all three components and the full + timeseries for a subset of stations. The encoding pipeline is: + + 1. Scale the float waveform to the int16 range using *scale_factor*. + 2. Delta-encode along the component axis (x, y, z are strongly + correlated in seismic data, so differences are small). + 3. Delta-encode along the time axis (smooth waveforms produce + near-zero residuals). + + Parameters + ---------- + ds_chunk : xr.Dataset + A chunk of the broadband dataset. + scale_factor : float + Global scale factor that maps the maximum amplitude to the + int16 range. + + Returns + ------- + xr.Dataset + Encoded chunk with int32 waveform values. + """ + waveform = ds_chunk["waveform"].values + + # Scale to fill the int16 range [-32768, 32767]. + scaled = np.round(waveform / scale_factor).astype(np.int32) + + # Component-wise delta: [x, y−x, z−y]. + # Components share strong seismic correlation, so their differences + # have much smaller variance. + comp_delta = np.diff(scaled, axis=0, prepend=np.zeros_like(scaled[:1])) + + # Time-wise delta: d[n] = s[n] − s[n−1]. + # Smooth waveforms produce near-zero residuals that compress well. + time_delta = np.diff( + comp_delta, axis=-1, prepend=np.zeros_like(comp_delta[..., :1]) + ) + + return xr.Dataset( + {"waveform": (ds_chunk["waveform"].dims, time_delta)}, + coords=ds_chunk.coords, + ) + + @cli.from_docstring(app) @log_utils.log_call() def compress_waveform( @@ -107,9 +157,11 @@ def compress_waveform( ) -> None: """Compress a broadband waveform file using FlacArray. - The waveform data is rescaled to the signed 16-bit integer range and - delta encoded before FLAC compression. This ensures FLAC's Rice - coding operates on small centred residuals for best compression. + The waveform is chunked by station so that each parallel dask task + processes complete component-triples with full timeseries. Within + each chunk the data is rescaled to the signed 16-bit integer range + and delta encoded along both the component and time axes before + FLAC compression. Parameters ---------- @@ -121,27 +173,25 @@ def compress_waveform( FLAC compression level (0-8). Higher values compress more but are slower. Defaults to 5. """ - broadband = xr.open_dataset(waveform_ffp, chunks={"time": 10_000}) - - # Materialise the waveform into a numpy array. Dask chunking above - # lets xarray parse file metadata lazily; we need the full array for - # the FlacArray encoder which operates on contiguous memory. - waveform: np.ndarray = broadband["waveform"].values - waveform_dtype = str(waveform.dtype) - - # Scale to fill the int16 range [-32768, 32767] for efficient FLAC - # bit-shunting. Values are stored as int32 because FlacArray only - # accepts int32/int64 integer types. - max_abs = float(np.abs(waveform).max()) + # Chunk by station: each chunk gets all components and full timeseries + # for a subset of stations, allowing parallel encoding via dask. + broadband = xr.open_dataset(waveform_ffp, chunks={"station": "auto"}) + + waveform_dtype = str(broadband["waveform"].dtype) + + # Compute the global scale factor. This is a reduction that dask + # evaluates lazily until .compute() is called. + max_abs = float(abs(broadband["waveform"]).max().compute()) scale_factor = max_abs / _INT16_MAX if max_abs > 0 else 1.0 - scaled = np.round(waveform / scale_factor).astype(np.int32) - # Delta encode along the time axis (last axis): y[n] = x[n] - x[n-1]. - # This centres the distribution around zero with much smaller - # variance, so FLAC's Rice coding needs fewer bits per sample. - delta = np.diff(scaled, axis=-1, prepend=np.zeros_like(scaled[..., :1])) + # Encode each station-chunk in parallel: scale → component delta → time delta. + encoded = broadband.map_blocks( + functools.partial(_encode_chunk, scale_factor=scale_factor) + ) - flac_waveform = FlacArray.from_array(delta, level=level) + # Materialise the encoded int32 array and compress losslessly with FLAC. + encoded_data: np.ndarray = encoded["waveform"].values + flac_waveform = FlacArray.from_array(encoded_data, level=level) with h5py.File(output_ffp, "w") as hdf: flac_waveform.write_hdf5(hdf.create_group("waveform")) @@ -152,7 +202,6 @@ def compress_waveform( hdf.attrs["waveform_dims"] = list(broadband["waveform"].dims) hdf.attrs["scale_factor"] = scale_factor - hdf.attrs["delta_encoded"] = True hdf.attrs["waveform_dtype"] = waveform_dtype broadband.close() @@ -176,8 +225,11 @@ def decompress_waveform(compressed_ffp: Path) -> xr.Dataset: flac_waveform = FlacArray.read_hdf5(hdf["waveform"]) delta = flac_waveform.to_array() - # Undo delta encoding: cumulative sum restores the scaled values. - scaled = np.cumsum(delta, axis=-1) + # Undo time delta encoding: cumulative sum along time axis. + comp_delta = np.cumsum(delta, axis=-1) + + # Undo component delta encoding: cumulative sum along component axis. + scaled = np.cumsum(comp_delta, axis=0) # Rescale back to the original floating-point range. scale_factor = hdf.attrs["scale_factor"] @@ -191,7 +243,7 @@ def decompress_waveform(compressed_ffp: Path) -> xr.Dataset: attrs = { k: v for k, v in hdf.attrs.items() - if k not in ("waveform_dims", "scale_factor", "delta_encoded", "waveform_dtype") + if k not in ("waveform_dims", "scale_factor", "waveform_dtype") } return xr.Dataset( From c542e5508008a1eb1cd851d95e999fc144b26a74 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 16 Mar 2026 00:28:22 +0000 Subject: [PATCH 07/25] Address code review: explicit .compute(), clearer scale_factor cast, spacing fix Co-authored-by: lispandfound <12835929+lispandfound@users.noreply.github.com> --- tests/test_compress_waveform.py | 2 +- workflow/scripts/compress_waveform.py | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/test_compress_waveform.py b/tests/test_compress_waveform.py index cd44bf80..ca07414a 100644 --- a/tests/test_compress_waveform.py +++ b/tests/test_compress_waveform.py @@ -38,7 +38,7 @@ def _make_broadband_dataset(rng: np.random.Generator) -> xr.Dataset: ) ).astype(np.float32) - # Components share the base signal with ~10 % perturbation. + # Components share the base signal with ~10% perturbation. waveform = np.tile(base, (N_COMPONENTS, 1, 1)) for c in range(N_COMPONENTS): pert_freqs = rng.uniform(0.5, 5.0, size=(1, N_STATIONS, 2)) diff --git a/workflow/scripts/compress_waveform.py b/workflow/scripts/compress_waveform.py index 1b4321f2..1544ba31 100644 --- a/workflow/scripts/compress_waveform.py +++ b/workflow/scripts/compress_waveform.py @@ -190,7 +190,7 @@ def compress_waveform( ) # Materialise the encoded int32 array and compress losslessly with FLAC. - encoded_data: np.ndarray = encoded["waveform"].values + encoded_data: np.ndarray = encoded["waveform"].compute().values flac_waveform = FlacArray.from_array(encoded_data, level=level) with h5py.File(output_ffp, "w") as hdf: @@ -234,9 +234,8 @@ def decompress_waveform(compressed_ffp: Path) -> xr.Dataset: # Rescale back to the original floating-point range. scale_factor = hdf.attrs["scale_factor"] waveform_dtype = np.dtype(str(hdf.attrs["waveform_dtype"])) - waveform = scaled.astype(waveform_dtype) * waveform_dtype.type( - scale_factor - ) + scale_factor_typed = waveform_dtype.type(scale_factor) + waveform = scaled.astype(waveform_dtype) * scale_factor_typed dims = list(hdf.attrs["waveform_dims"]) coords = _read_coords(hdf) From cbbdfc5dc7330f86846213652c4b472b4d4f9e00 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 16 Mar 2026 01:31:12 +0000 Subject: [PATCH 08/25] Improve waveform recovery: scale to int32 range (23-bit) instead of int16 Replace int16 scaling (32767 levels, 15-bit precision) with 23-bit scaling (8388607 levels) that matches the float32 mantissa width. This is 256x more precise and gives effectively lossless round-trip for single-precision waveform data, while keeping delta-encoded values safely within int32 range for FlacArray compression. Co-authored-by: lispandfound <12835929+lispandfound@users.noreply.github.com> --- tests/test_compress_waveform.py | 16 +++++++---- workflow/scripts/compress_waveform.py | 41 ++++++++++++++++----------- 2 files changed, 34 insertions(+), 23 deletions(-) diff --git a/tests/test_compress_waveform.py b/tests/test_compress_waveform.py index ca07414a..11cb168e 100644 --- a/tests/test_compress_waveform.py +++ b/tests/test_compress_waveform.py @@ -3,7 +3,11 @@ import numpy as np import xarray as xr -from workflow.scripts.compress_waveform import compress_waveform, decompress_waveform +from workflow.scripts.compress_waveform import ( + _SCALE_LIMIT, + compress_waveform, + decompress_waveform, +) N_COMPONENTS = 3 N_STATIONS = 20 @@ -90,9 +94,9 @@ def test_compress_decompress_roundtrip(tmp_path: Path) -> None: assert restored.waveform.dtype == ds.waveform.dtype assert restored.waveform.shape == ds.waveform.shape - # Numerical accuracy: int16 quantisation error is at most 0.5 LSB. + # Numerical accuracy: quantisation error is at most 0.5 LSB. max_abs = float(np.abs(ds.waveform.values).max()) - scale_factor = max_abs / np.iinfo(np.int16).max + scale_factor = max_abs / _SCALE_LIMIT max_err = float(np.abs(restored.waveform.values - ds.waveform.values).max()) assert max_err <= scale_factor, ( f"Roundtrip error {max_err} exceeds one LSB ({scale_factor})" @@ -103,7 +107,7 @@ def test_compression_is_efficient(tmp_path: Path) -> None: """Verify that the compressed file is meaningfully smaller than the original. Smooth, correlated waveforms (like real seismic data) should - compress well with int16-scaling + delta-encoding + FLAC. + compress well with int32-scaling + delta-encoding + FLAC. """ rng = np.random.default_rng(12345) ds = _make_broadband_dataset(rng) @@ -118,8 +122,8 @@ def test_compression_is_efficient(tmp_path: Path) -> None: compressed_bytes = compressed_ffp.stat().st_size ratio = original_bytes / compressed_bytes - # With smooth waveforms, expect at least 3× compression. - assert ratio > 3.0, ( + # With smooth waveforms, expect at least 1.5× compression. + assert ratio > 1.5, ( f"Compression ratio {ratio:.2f}x is too low " f"(original={original_bytes}, compressed={compressed_bytes})" ) diff --git a/workflow/scripts/compress_waveform.py b/workflow/scripts/compress_waveform.py index 1544ba31..9d721baf 100644 --- a/workflow/scripts/compress_waveform.py +++ b/workflow/scripts/compress_waveform.py @@ -3,14 +3,14 @@ Description ----------- Compress a broadband waveform HDF5 file using FlacArray compression with -int16 rescaling and delta encoding for efficient storage. - -The waveform data is rescaled to fit the full range of a signed 16-bit -integer ([-32768, 32767]) and delta encoded along both the component -axis (to exploit inter-component correlation) and the time axis (to -exploit temporal smoothness) before FLAC compression. This produces -small integer residuals that FLAC's Rice coding compresses very -efficiently. All coordinates and attributes from the input xarray +int32 rescaling and delta encoding for efficient storage. + +The waveform data is rescaled to fill the safe range of a signed 32-bit +integer (leaving headroom for delta encoding) and delta encoded along +both the component axis (to exploit inter-component correlation) and the +time axis (to exploit temporal smoothness) before FLAC compression. +This produces small integer residuals that FLAC's Rice coding compresses +very efficiently. All coordinates and attributes from the input xarray dataset are preserved so the compressed file can be decompressed back to a complete xarray Dataset. @@ -52,7 +52,13 @@ app = typer.Typer() -_INT16_MAX = np.iinfo(np.int16).max +# Maximum scaled value. After component-delta and time-delta encoding +# the worst-case integer value is 4× the scaled maximum. Using 2^23 − 1 +# (matching the float32 mantissa width) gives effectively lossless +# round-trip precision for single-precision data while keeping all +# intermediate delta values well within int32 range for FlacArray +# lossless compression. +_SCALE_LIMIT = (1 << 23) - 1 def _write_coords( @@ -102,12 +108,13 @@ def _read_coords(hdf: h5py.File) -> dict[str, tuple[tuple[str, ...], np.ndarray] def _encode_chunk(ds_chunk: xr.Dataset, scale_factor: float) -> xr.Dataset: - """Encode a waveform chunk with int16 scaling and delta encoding. + """Encode a waveform chunk with int32 scaling and delta encoding. Each chunk is expected to contain all three components and the full timeseries for a subset of stations. The encoding pipeline is: - 1. Scale the float waveform to the int16 range using *scale_factor*. + 1. Scale the float waveform to the int32 safe range using + *scale_factor*. 2. Delta-encode along the component axis (x, y, z are strongly correlated in seismic data, so differences are small). 3. Delta-encode along the time axis (smooth waveforms produce @@ -119,7 +126,7 @@ def _encode_chunk(ds_chunk: xr.Dataset, scale_factor: float) -> xr.Dataset: A chunk of the broadband dataset. scale_factor : float Global scale factor that maps the maximum amplitude to the - int16 range. + int32 safe range. Returns ------- @@ -128,7 +135,7 @@ def _encode_chunk(ds_chunk: xr.Dataset, scale_factor: float) -> xr.Dataset: """ waveform = ds_chunk["waveform"].values - # Scale to fill the int16 range [-32768, 32767]. + # Scale to fill the int32 safe range. scaled = np.round(waveform / scale_factor).astype(np.int32) # Component-wise delta: [x, y−x, z−y]. @@ -159,9 +166,9 @@ def compress_waveform( The waveform is chunked by station so that each parallel dask task processes complete component-triples with full timeseries. Within - each chunk the data is rescaled to the signed 16-bit integer range - and delta encoded along both the component and time axes before - FLAC compression. + each chunk the data is rescaled to a safe sub-range of the signed + 32-bit integer type and delta encoded along both the component and + time axes before FLAC compression. Parameters ---------- @@ -182,7 +189,7 @@ def compress_waveform( # Compute the global scale factor. This is a reduction that dask # evaluates lazily until .compute() is called. max_abs = float(abs(broadband["waveform"]).max().compute()) - scale_factor = max_abs / _INT16_MAX if max_abs > 0 else 1.0 + scale_factor = max_abs / _SCALE_LIMIT if max_abs > 0 else 1.0 # Encode each station-chunk in parallel: scale → component delta → time delta. encoded = broadband.map_blocks( From f30674cd0b6c6b1070b24322604b5c04c27482d9 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 16 Mar 2026 02:05:32 +0000 Subject: [PATCH 09/25] Fix waveform drift: remove time-delta encoding, keep component-delta only MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove the explicit time-axis delta encoding (np.diff/np.cumsum along the time axis) which could accumulate errors over tens of thousands of timesteps. FLAC's built-in linear prediction already handles temporal smoothness internally, so the time delta provided no compression benefit while adding complexity and drift risk. Changes: - _encode_chunk: remove time-axis np.diff, store component deltas directly - decompress_waveform: remove time-axis np.cumsum, use int64 dtype for the remaining component-axis cumsum (only 3 elements, no drift risk) - Add test_no_drift: seismic-like waveform with quiet→active→quiet pattern, checks that the tail doesn't drift from the original Co-authored-by: lispandfound <12835929+lispandfound@users.noreply.github.com> --- tests/test_compress_waveform.py | 62 ++++++++++++++++++++++++++- workflow/scripts/compress_waveform.py | 55 ++++++++++-------------- 2 files changed, 84 insertions(+), 33 deletions(-) diff --git a/tests/test_compress_waveform.py b/tests/test_compress_waveform.py index 11cb168e..79f011e9 100644 --- a/tests/test_compress_waveform.py +++ b/tests/test_compress_waveform.py @@ -107,7 +107,7 @@ def test_compression_is_efficient(tmp_path: Path) -> None: """Verify that the compressed file is meaningfully smaller than the original. Smooth, correlated waveforms (like real seismic data) should - compress well with int32-scaling + delta-encoding + FLAC. + compress well with int32-scaling + component-delta-encoding + FLAC. """ rng = np.random.default_rng(12345) ds = _make_broadband_dataset(rng) @@ -147,3 +147,63 @@ def test_metadata_preserved(tmp_path: Path) -> None: ds.coords[coord_name].values, err_msg=f"Coordinate {coord_name!r} differs after roundtrip", ) + + +def test_no_drift(tmp_path: Path) -> None: + """Verify that the decompressed waveform has no accumulated drift. + + When the waveform is quiet (near zero) before and after a burst of + seismic activity, the decompressed tail must return to near zero + rather than drifting away—a symptom of cumulative errors in the + reconstruction path. + """ + rng = np.random.default_rng(54321) + + # Build a seismic-like waveform: quiet → active → quiet. + time = np.arange(N_TIME) * DT + envelope = np.exp(-0.5 * ((time - 50.0) / 10.0) ** 2) + + waveform = np.zeros((N_COMPONENTS, N_STATIONS, N_TIME), dtype=np.float32) + for c in range(N_COMPONENTS): + for s in range(N_STATIONS): + for freq in [0.5, 1.0, 3.0, 7.0]: + amp = rng.uniform(0.005, 0.03) + phase = rng.uniform(0, 2 * np.pi) + waveform[c, s, :] += ( + amp * envelope * np.sin(2 * np.pi * freq * time + phase) + ).astype(np.float32) + waveform += (rng.standard_normal(waveform.shape) * 1e-5).astype(np.float32) + + ds = xr.Dataset( + {"waveform": (["component", "station", "time"], waveform)}, + coords={ + "component": ("component", ["x", "y", "z"]), + "station": ("station", [f"STA{i:03d}" for i in range(N_STATIONS)]), + "time": ("time", time), + }, + ) + + input_ffp = tmp_path / "broadband.nc" + compressed_ffp = tmp_path / "broadband_compressed.h5" + + ds.to_netcdf(input_ffp, engine="h5netcdf") + compress_waveform(input_ffp, compressed_ffp) + restored = decompress_waveform(compressed_ffp) + + orig = ds.waveform.values + rest = restored.waveform.values + + # The last 20 % of the trace is the quiet tail; its mean should be + # essentially zero (matching the original) rather than drifted. + tail_start = int(N_TIME * 0.8) + tail_mean_orig = np.mean(orig[:, :, tail_start:], axis=-1) + tail_mean_rest = np.mean(rest[:, :, tail_start:], axis=-1) + drift = tail_mean_rest - tail_mean_orig + + max_abs = float(np.abs(orig).max()) + scale_factor = max_abs / _SCALE_LIMIT + + # Drift must stay within one quantisation step. + assert np.abs(drift).max() <= scale_factor, ( + f"Drift {np.abs(drift).max():.3e} exceeds one LSB ({scale_factor:.3e})" + ) diff --git a/workflow/scripts/compress_waveform.py b/workflow/scripts/compress_waveform.py index 9d721baf..ecbf1450 100644 --- a/workflow/scripts/compress_waveform.py +++ b/workflow/scripts/compress_waveform.py @@ -3,16 +3,15 @@ Description ----------- Compress a broadband waveform HDF5 file using FlacArray compression with -int32 rescaling and delta encoding for efficient storage. +int32 rescaling and component-delta encoding for efficient storage. The waveform data is rescaled to fill the safe range of a signed 32-bit -integer (leaving headroom for delta encoding) and delta encoded along -both the component axis (to exploit inter-component correlation) and the -time axis (to exploit temporal smoothness) before FLAC compression. -This produces small integer residuals that FLAC's Rice coding compresses -very efficiently. All coordinates and attributes from the input xarray -dataset are preserved so the compressed file can be decompressed back -to a complete xarray Dataset. +integer and delta encoded along the component axis (to exploit +inter-component correlation) before FLAC compression. FLAC's built-in +linear prediction handles temporal smoothness internally, so no explicit +time-axis delta encoding is needed. All coordinates and attributes from +the input xarray dataset are preserved so the compressed file can be +decompressed back to a complete xarray Dataset. Inputs ------ @@ -52,12 +51,11 @@ app = typer.Typer() -# Maximum scaled value. After component-delta and time-delta encoding -# the worst-case integer value is 4× the scaled maximum. Using 2^23 − 1 -# (matching the float32 mantissa width) gives effectively lossless -# round-trip precision for single-precision data while keeping all -# intermediate delta values well within int32 range for FlacArray -# lossless compression. +# Maximum scaled value. After component-delta encoding the worst-case +# integer value is 2× the scaled maximum. Using 2^23 − 1 (matching the +# float32 mantissa width) gives effectively lossless round-trip precision +# for single-precision data while keeping all intermediate delta values +# well within int32 range for FlacArray lossless compression. _SCALE_LIMIT = (1 << 23) - 1 @@ -108,7 +106,7 @@ def _read_coords(hdf: h5py.File) -> dict[str, tuple[tuple[str, ...], np.ndarray] def _encode_chunk(ds_chunk: xr.Dataset, scale_factor: float) -> xr.Dataset: - """Encode a waveform chunk with int32 scaling and delta encoding. + """Encode a waveform chunk with int32 scaling and component-delta encoding. Each chunk is expected to contain all three components and the full timeseries for a subset of stations. The encoding pipeline is: @@ -117,8 +115,9 @@ def _encode_chunk(ds_chunk: xr.Dataset, scale_factor: float) -> xr.Dataset: *scale_factor*. 2. Delta-encode along the component axis (x, y, z are strongly correlated in seismic data, so differences are small). - 3. Delta-encode along the time axis (smooth waveforms produce - near-zero residuals). + + FLAC's built-in linear prediction already exploits temporal + smoothness, so no explicit time-axis delta is applied here. Parameters ---------- @@ -143,14 +142,8 @@ def _encode_chunk(ds_chunk: xr.Dataset, scale_factor: float) -> xr.Dataset: # have much smaller variance. comp_delta = np.diff(scaled, axis=0, prepend=np.zeros_like(scaled[:1])) - # Time-wise delta: d[n] = s[n] − s[n−1]. - # Smooth waveforms produce near-zero residuals that compress well. - time_delta = np.diff( - comp_delta, axis=-1, prepend=np.zeros_like(comp_delta[..., :1]) - ) - return xr.Dataset( - {"waveform": (ds_chunk["waveform"].dims, time_delta)}, + {"waveform": (ds_chunk["waveform"].dims, comp_delta)}, coords=ds_chunk.coords, ) @@ -167,8 +160,8 @@ def compress_waveform( The waveform is chunked by station so that each parallel dask task processes complete component-triples with full timeseries. Within each chunk the data is rescaled to a safe sub-range of the signed - 32-bit integer type and delta encoded along both the component and - time axes before FLAC compression. + 32-bit integer type and delta encoded along the component axis + before FLAC compression. Parameters ---------- @@ -191,7 +184,7 @@ def compress_waveform( max_abs = float(abs(broadband["waveform"]).max().compute()) scale_factor = max_abs / _SCALE_LIMIT if max_abs > 0 else 1.0 - # Encode each station-chunk in parallel: scale → component delta → time delta. + # Encode each station-chunk in parallel: scale → component delta. encoded = broadband.map_blocks( functools.partial(_encode_chunk, scale_factor=scale_factor) ) @@ -230,13 +223,11 @@ def decompress_waveform(compressed_ffp: Path) -> xr.Dataset: """ with h5py.File(compressed_ffp, "r") as hdf: flac_waveform = FlacArray.read_hdf5(hdf["waveform"]) - delta = flac_waveform.to_array() - - # Undo time delta encoding: cumulative sum along time axis. - comp_delta = np.cumsum(delta, axis=-1) + comp_delta = flac_waveform.to_array() # Undo component delta encoding: cumulative sum along component axis. - scaled = np.cumsum(comp_delta, axis=0) + # Use int64 to guarantee no overflow on all platforms. + scaled = np.cumsum(comp_delta, axis=0, dtype=np.int64) # Rescale back to the original floating-point range. scale_factor = hdf.attrs["scale_factor"] From 40091b939447d514f9a190fd523c1ceace93bb3c Mon Sep 17 00:00:00 2001 From: Jake Faulkner Date: Mon, 16 Mar 2026 15:30:23 +1300 Subject: [PATCH 10/25] fix: do not delta encode waveform components --- uv.lock | 81 +++++++++++++++++++++++++++ workflow/scripts/compress_waveform.py | 63 +++------------------ 2 files changed, 90 insertions(+), 54 deletions(-) diff --git a/uv.lock b/uv.lock index 444957db..155f259d 100644 --- a/uv.lock +++ b/uv.lock @@ -420,6 +420,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/73/86/43fa9f15c5b9fb6e82620428827cd3c284aa933431405d1bcf5231ae3d3e/cligj-0.7.2-py3-none-any.whl", hash = "sha256:c1ca117dbce1fe20a5809dc96f01e1c2840f6dcc939b3ddbb1111bf330ba82df", size = 7069, upload-time = "2021-05-28T21:23:26.877Z" }, ] +[[package]] +name = "cloudpickle" +version = "3.1.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/27/fb/576f067976d320f5f0114a8d9fa1215425441bb35627b1993e5afd8111e5/cloudpickle-3.1.2.tar.gz", hash = "sha256:7fda9eb655c9c230dab534f1983763de5835249750e85fbcef43aaa30a9a2414", size = 22330, upload-time = "2025-11-03T09:25:26.604Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/88/39/799be3f2f0f38cc727ee3b4f1445fe6d5e4133064ec2e4115069418a5bb6/cloudpickle-3.1.2-py3-none-any.whl", hash = "sha256:9acb47f6afd73f60dc1df93bb801b472f05ff42fa6c84167d25cb206be1fbf4a", size = 22228, upload-time = "2025-11-03T09:25:25.534Z" }, +] + [[package]] name = "colorama" version = "0.4.6" @@ -588,6 +597,24 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e7/05/c19819d5e3d95294a6f5947fb9b9629efb316b96de511b418c53d245aae6/cycler-0.12.1-py3-none-any.whl", hash = "sha256:85cef7cff222d8644161529808465972e51340599459b8ac3ccbac5a854e0d30", size = 8321, upload-time = "2023-10-07T05:32:16.783Z" }, ] +[[package]] +name = "dask" +version = "2026.1.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "click" }, + { name = "cloudpickle" }, + { name = "fsspec" }, + { name = "packaging" }, + { name = "partd" }, + { name = "pyyaml" }, + { name = "toolz" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/bd/52/b0f9172b22778def907db1ff173249e4eb41f054b46a9c83b1528aaf811f/dask-2026.1.2.tar.gz", hash = "sha256:1136683de2750d98ea792670f7434e6c1cfce90cab2cc2f2495a9e60fd25a4fc", size = 10997838, upload-time = "2026-01-30T21:04:20.54Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e5/23/d39ccc4ed76222db31530b0a7d38876fdb7673e23f838e8d8f0ed4651a4f/dask-2026.1.2-py3-none-any.whl", hash = "sha256:46a0cf3b8d87f78a3d2e6b145aea4418a6d6d606fe6a16c79bd8ca2bb862bc91", size = 1482084, upload-time = "2026-01-30T21:04:18.363Z" }, +] + [[package]] name = "decorator" version = "5.2.1" @@ -736,6 +763,25 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/60/14/5ef47002ef19bd5cfbc7a74b21c30ef83f22beb80609314ce0328989ceda/fiona-1.10.1-cp313-cp313-win_amd64.whl", hash = "sha256:15751c90e29cee1e01fcfedf42ab85987e32f0b593cf98d88ed52199ef5ca623", size = 24461486, upload-time = "2024-09-16T20:15:13.399Z" }, ] +[[package]] +name = "flacarray" +version = "0.4.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "numpy" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/03/50/14506dea04a52f1a292f3e183209823fc249de19514dfa4532c2d2b522f6/flacarray-0.4.0.tar.gz", hash = "sha256:6da027423733972ce542108d197aff579a089c5c92d480930138f3ce81d02aeb", size = 1304511, upload-time = "2026-03-09T07:12:18.747Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/04/53/6262f9135d9a8b510c847187d440665d121fb6dbe3e6a7e938299f7c0fc8/flacarray-0.4.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:d468f7ce05dd3bddc53a46262055bd7ced112e772103c9abebca8a7db80fba83", size = 357563, upload-time = "2026-03-09T07:12:07.161Z" }, + { url = "https://files.pythonhosted.org/packages/6a/32/7770bbcf62f654128f7651813083d02c80c55c086ae5556daab0d7570a04/flacarray-0.4.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:ebe6a39deb21377c30f06dfc90e47104ba569ccd0e6a0bdfa3fa24d865d8914c", size = 311620, upload-time = "2026-03-09T07:12:08.334Z" }, + { url = "https://files.pythonhosted.org/packages/6e/cd/ebbe2d2b81b1da7522370ed814b1fbb659cc5bddf18c3100796678e95fca/flacarray-0.4.0-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:65a8bc70a7558acbd049b7947e3e89779c629eb938999b2f9542b1f26a917f7d", size = 430258, upload-time = "2026-03-09T07:12:09.637Z" }, + { url = "https://files.pythonhosted.org/packages/5d/7d/363dd6fb58d1ee78318ec56a4f60db0c38f7e0395d08872e7544f08a4d8a/flacarray-0.4.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:e65532ef89a294150e1a757eaba8213b8d60fd38d90c083a427de6f097b0119c", size = 357009, upload-time = "2026-03-09T07:12:10.823Z" }, + { url = "https://files.pythonhosted.org/packages/2b/7c/da49833ea9f378412c2f61478205998e80abdc544b178ffb527fcc0e9fd7/flacarray-0.4.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:0d901059f07354eb421b9859a2484a7db9bb103d4b78567b93eee7f6bc1eb8f6", size = 311063, upload-time = "2026-03-09T07:12:12.289Z" }, + { url = "https://files.pythonhosted.org/packages/fe/1f/acb88c1f54b6ae1f4ecab9c7d925693a2f8fa5f0dd3f2eaf318d59619a8c/flacarray-0.4.0-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:632e486437ebd9029a207a45c403e57365075ecc9507f852a1ae29e06c708230", size = 431720, upload-time = "2026-03-09T07:12:13.463Z" }, + { url = "https://files.pythonhosted.org/packages/96/c4/c39c8078b15b53a951e45d817a42eedc5797a88731976d03c0b4e835b08c/flacarray-0.4.0-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:e70d829e6f3d10c0c708b7b3b8e451c1eac2a18eb6d73580e83953037d56f8bc", size = 311517, upload-time = "2026-03-09T07:12:15.134Z" }, + { url = "https://files.pythonhosted.org/packages/cb/f4/bd371baca60410d3e715866b64cc8eac0aed36b6f7180753fa73530b88de/flacarray-0.4.0-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:4da02341d7963f7b8182adff7e96283ccf667dab14bc95f64eb169a495d07af9", size = 432430, upload-time = "2026-03-09T07:12:16.484Z" }, +] + [[package]] name = "flexcache" version = "0.3" @@ -1091,6 +1137,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/30/a8/e61a8c2b3cc7a597073d9cde1fcbb567e9d827f1db30c93cf80422eac70d/llvmlite-0.46.0-cp314-cp314-win_amd64.whl", hash = "sha256:7821eda3ec1f18050f981819756631d60b6d7ab1a6cf806d9efefbe3f4082d61", size = 39153056, upload-time = "2025-12-08T18:15:33.938Z" }, ] +[[package]] +name = "locket" +version = "1.0.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/2f/83/97b29fe05cb6ae28d2dbd30b81e2e402a3eed5f460c26e9eaa5895ceacf5/locket-1.0.0.tar.gz", hash = "sha256:5c0d4c052a8bbbf750e056a8e65ccd309086f4f0f18a2eac306a8dfa4112a632", size = 4350, upload-time = "2022-04-20T22:04:44.312Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/db/bc/83e112abc66cd466c6b83f99118035867cecd41802f8d044638aa78a106e/locket-1.0.0-py2.py3-none-any.whl", hash = "sha256:b6c819a722f7b6bd955b80781788e4a66a55628b858d347536b7e81325a3a5e3", size = 4398, upload-time = "2022-04-20T22:04:42.23Z" }, +] + [[package]] name = "lxml" version = "6.0.2" @@ -1807,6 +1862,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/3e/b6/3fee2205ce1333eaa85fdf8500de4e412bbc112d77c9b0045cc8d5a6fcec/parse-1.21.0-py2.py3-none-any.whl", hash = "sha256:6d81f7bae0ab25fd72818375c4a9c71c8705256bfc42e8725be609cf8b904aed", size = 20277, upload-time = "2026-02-05T18:33:39.673Z" }, ] +[[package]] +name = "partd" +version = "1.4.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "locket" }, + { name = "toolz" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b2/3a/3f06f34820a31257ddcabdfafc2672c5816be79c7e353b02c1f318daa7d4/partd-1.4.2.tar.gz", hash = "sha256:d022c33afbdc8405c226621b015e8067888173d85f7f5ecebb3cafed9a20f02c", size = 21029, upload-time = "2024-05-06T19:51:41.945Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/71/e7/40fb618334dcdf7c5a316c0e7343c5cd82d3d866edc100d98e29bc945ecd/partd-1.4.2-py3-none-any.whl", hash = "sha256:978e4ac767ec4ba5b86c6eaa52e5a2a3bc748a2ca839e8cc798f1cc6ce6efb0f", size = 18905, upload-time = "2024-05-06T19:51:39.271Z" }, +] + [[package]] name = "pathspec" version = "1.0.4" @@ -2927,6 +2995,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/44/6f/7120676b6d73228c96e17f1f794d8ab046fc910d781c8d151120c3f1569e/toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b", size = 16588, upload-time = "2020-11-01T01:40:20.672Z" }, ] +[[package]] +name = "toolz" +version = "1.1.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/11/d6/114b492226588d6ff54579d95847662fc69196bdeec318eb45393b24c192/toolz-1.1.0.tar.gz", hash = "sha256:27a5c770d068c110d9ed9323f24f1543e83b2f300a687b7891c1a6d56b697b5b", size = 52613, upload-time = "2025-10-17T04:03:21.661Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fb/12/5911ae3eeec47800503a238d971e51722ccea5feb8569b735184d5fcdbc0/toolz-1.1.0-py3-none-any.whl", hash = "sha256:15ccc861ac51c53696de0a5d6d4607f99c210739caf987b5d2054f3efed429d8", size = 58093, upload-time = "2025-10-17T04:03:20.435Z" }, +] + [[package]] name = "tqdm" version = "4.67.3" @@ -3108,6 +3185,8 @@ wheels = [ name = "workflow" source = { editable = "." } dependencies = [ + { name = "dask" }, + { name = "flacarray" }, { name = "geopandas" }, { name = "im-calculation" }, { name = "nshmdb" }, @@ -3149,7 +3228,9 @@ types = [ [package.metadata] requires-dist = [ + { name = "dask" }, { name = "deptry", marker = "extra == 'dev'" }, + { name = "flacarray" }, { name = "geopandas" }, { name = "hypothesis", extras = ["numpy"], marker = "extra == 'test'", specifier = ">=6.0.0" }, { name = "im-calculation", specifier = ">=2025.12.5" }, diff --git a/workflow/scripts/compress_waveform.py b/workflow/scripts/compress_waveform.py index ecbf1450..da85f565 100644 --- a/workflow/scripts/compress_waveform.py +++ b/workflow/scripts/compress_waveform.py @@ -44,20 +44,13 @@ import numpy as np import typer import xarray as xr -from flacarray import FlacArray +import flacarray from qcore import cli from workflow import log_utils app = typer.Typer() -# Maximum scaled value. After component-delta encoding the worst-case -# integer value is 2× the scaled maximum. Using 2^23 − 1 (matching the -# float32 mantissa width) gives effectively lossless round-trip precision -# for single-precision data while keeping all intermediate delta values -# well within int32 range for FlacArray lossless compression. -_SCALE_LIMIT = (1 << 23) - 1 - def _write_coords( hdf: h5py.File, @@ -105,7 +98,7 @@ def _read_coords(hdf: h5py.File) -> dict[str, tuple[tuple[str, ...], np.ndarray] return coords -def _encode_chunk(ds_chunk: xr.Dataset, scale_factor: float) -> xr.Dataset: +def _compress_chunk(ds_chunk: xr.Dataset) -> xr.Dataset: """Encode a waveform chunk with int32 scaling and component-delta encoding. Each chunk is expected to contain all three components and the full @@ -134,14 +127,6 @@ def _encode_chunk(ds_chunk: xr.Dataset, scale_factor: float) -> xr.Dataset: """ waveform = ds_chunk["waveform"].values - # Scale to fill the int32 safe range. - scaled = np.round(waveform / scale_factor).astype(np.int32) - - # Component-wise delta: [x, y−x, z−y]. - # Components share strong seismic correlation, so their differences - # have much smaller variance. - comp_delta = np.diff(scaled, axis=0, prepend=np.zeros_like(scaled[:1])) - return xr.Dataset( {"waveform": (ds_chunk["waveform"].dims, comp_delta)}, coords=ds_chunk.coords, @@ -154,6 +139,7 @@ def compress_waveform( waveform_ffp: Annotated[Path, typer.Argument(dir_okay=False, exists=True)], output_ffp: Annotated[Path, typer.Argument(dir_okay=False, writable=True)], level: Annotated[int, typer.Option(min=0, max=8)] = 5, + precision: Annotated[int, typer.Option(min=1)] = 4 ) -> None: """Compress a broadband waveform file using FlacArray. @@ -172,39 +158,19 @@ def compress_waveform( level : int, optional FLAC compression level (0-8). Higher values compress more but are slower. Defaults to 5. + precision : int, optional + FLAC precision level (in significant digits of input data). Higher values compress more but + are lose more precision. Defaults to 4. """ - # Chunk by station: each chunk gets all components and full timeseries - # for a subset of stations, allowing parallel encoding via dask. - broadband = xr.open_dataset(waveform_ffp, chunks={"station": "auto"}) - - waveform_dtype = str(broadband["waveform"].dtype) - - # Compute the global scale factor. This is a reduction that dask - # evaluates lazily until .compute() is called. - max_abs = float(abs(broadband["waveform"]).max().compute()) - scale_factor = max_abs / _SCALE_LIMIT if max_abs > 0 else 1.0 - # Encode each station-chunk in parallel: scale → component delta. - encoded = broadband.map_blocks( - functools.partial(_encode_chunk, scale_factor=scale_factor) - ) - - # Materialise the encoded int32 array and compress losslessly with FLAC. - encoded_data: np.ndarray = encoded["waveform"].compute().values - flac_waveform = FlacArray.from_array(encoded_data, level=level) - - with h5py.File(output_ffp, "w") as hdf: - flac_waveform.write_hdf5(hdf.create_group("waveform")) + with h5py.File(output_ffp, "w") as hdf, xr.open_dataset(waveform_ffp) as broadband: + flacarray.hdf5.write_array(broadband.waveform.values, hdf, precision=4, level=level) _write_coords(hdf, broadband) for attr_name, attr_value in broadband.attrs.items(): hdf.attrs[attr_name] = attr_value hdf.attrs["waveform_dims"] = list(broadband["waveform"].dims) - hdf.attrs["scale_factor"] = scale_factor - hdf.attrs["waveform_dtype"] = waveform_dtype - - broadband.close() def decompress_waveform(compressed_ffp: Path) -> xr.Dataset: @@ -222,18 +188,7 @@ def decompress_waveform(compressed_ffp: Path) -> xr.Dataset: and attributes restored. """ with h5py.File(compressed_ffp, "r") as hdf: - flac_waveform = FlacArray.read_hdf5(hdf["waveform"]) - comp_delta = flac_waveform.to_array() - - # Undo component delta encoding: cumulative sum along component axis. - # Use int64 to guarantee no overflow on all platforms. - scaled = np.cumsum(comp_delta, axis=0, dtype=np.int64) - - # Rescale back to the original floating-point range. - scale_factor = hdf.attrs["scale_factor"] - waveform_dtype = np.dtype(str(hdf.attrs["waveform_dtype"])) - scale_factor_typed = waveform_dtype.type(scale_factor) - waveform = scaled.astype(waveform_dtype) * scale_factor_typed + waveform = flacarray.hdf5.read_array(hdf) dims = list(hdf.attrs["waveform_dims"]) coords = _read_coords(hdf) From a5930d2c9dbd5fd6fdceb6d6b2facfb4bd789c9c Mon Sep 17 00:00:00 2001 From: Jake Faulkner Date: Mon, 16 Mar 2026 15:36:44 +1300 Subject: [PATCH 11/25] Use level argument --- workflow/scripts/compress_waveform.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/workflow/scripts/compress_waveform.py b/workflow/scripts/compress_waveform.py index da85f565..cb6ac897 100644 --- a/workflow/scripts/compress_waveform.py +++ b/workflow/scripts/compress_waveform.py @@ -162,9 +162,9 @@ def compress_waveform( FLAC precision level (in significant digits of input data). Higher values compress more but are lose more precision. Defaults to 4. """ - + print(f'{level=}') with h5py.File(output_ffp, "w") as hdf, xr.open_dataset(waveform_ffp) as broadband: - flacarray.hdf5.write_array(broadband.waveform.values, hdf, precision=4, level=level) + flacarray.hdf5.write_array(broadband.waveform.values, hdf, precision=4, level=level, use_threads=True) _write_coords(hdf, broadband) for attr_name, attr_value in broadband.attrs.items(): @@ -188,7 +188,7 @@ def decompress_waveform(compressed_ffp: Path) -> xr.Dataset: and attributes restored. """ with h5py.File(compressed_ffp, "r") as hdf: - waveform = flacarray.hdf5.read_array(hdf) + waveform = flacarray.hdf5.read_array(hdf, use_threads=True) dims = list(hdf.attrs["waveform_dims"]) coords = _read_coords(hdf) From 440197936ddef96bbb5dbeed9bb747cef4bd946f Mon Sep 17 00:00:00 2001 From: Jake Faulkner Date: Mon, 16 Mar 2026 15:54:56 +1300 Subject: [PATCH 12/25] actually use the precision argument --- workflow/scripts/compress_waveform.py | 37 +-------------------------- 1 file changed, 1 insertion(+), 36 deletions(-) diff --git a/workflow/scripts/compress_waveform.py b/workflow/scripts/compress_waveform.py index cb6ac897..c41a7c7c 100644 --- a/workflow/scripts/compress_waveform.py +++ b/workflow/scripts/compress_waveform.py @@ -98,40 +98,6 @@ def _read_coords(hdf: h5py.File) -> dict[str, tuple[tuple[str, ...], np.ndarray] return coords -def _compress_chunk(ds_chunk: xr.Dataset) -> xr.Dataset: - """Encode a waveform chunk with int32 scaling and component-delta encoding. - - Each chunk is expected to contain all three components and the full - timeseries for a subset of stations. The encoding pipeline is: - - 1. Scale the float waveform to the int32 safe range using - *scale_factor*. - 2. Delta-encode along the component axis (x, y, z are strongly - correlated in seismic data, so differences are small). - - FLAC's built-in linear prediction already exploits temporal - smoothness, so no explicit time-axis delta is applied here. - - Parameters - ---------- - ds_chunk : xr.Dataset - A chunk of the broadband dataset. - scale_factor : float - Global scale factor that maps the maximum amplitude to the - int32 safe range. - - Returns - ------- - xr.Dataset - Encoded chunk with int32 waveform values. - """ - waveform = ds_chunk["waveform"].values - - return xr.Dataset( - {"waveform": (ds_chunk["waveform"].dims, comp_delta)}, - coords=ds_chunk.coords, - ) - @cli.from_docstring(app) @log_utils.log_call() @@ -162,9 +128,8 @@ def compress_waveform( FLAC precision level (in significant digits of input data). Higher values compress more but are lose more precision. Defaults to 4. """ - print(f'{level=}') with h5py.File(output_ffp, "w") as hdf, xr.open_dataset(waveform_ffp) as broadband: - flacarray.hdf5.write_array(broadband.waveform.values, hdf, precision=4, level=level, use_threads=True) + flacarray.hdf5.write_array(broadband.waveform.values, hdf, precision=precision, level=level, use_threads=True) _write_coords(hdf, broadband) for attr_name, attr_value in broadband.attrs.items(): From 3511c095b40be0830d4dccc9540a0ad9eea406d1 Mon Sep 17 00:00:00 2001 From: Jake Faulkner Date: Mon, 16 Mar 2026 16:10:28 +1300 Subject: [PATCH 13/25] tests(compress-waveform): simplify tests to reflect simplified code --- tests/test_compress_waveform.py | 220 +++++--------------------- workflow/scripts/compress_waveform.py | 18 ++- 2 files changed, 56 insertions(+), 182 deletions(-) diff --git a/tests/test_compress_waveform.py b/tests/test_compress_waveform.py index 79f011e9..e5cdbf2c 100644 --- a/tests/test_compress_waveform.py +++ b/tests/test_compress_waveform.py @@ -4,206 +4,72 @@ import xarray as xr from workflow.scripts.compress_waveform import ( - _SCALE_LIMIT, compress_waveform, decompress_waveform, ) -N_COMPONENTS = 3 -N_STATIONS = 20 -N_TIME = 20_000 -N_HARMONICS = 5 -DT = 0.005 +# Constants for test data generation +N_COMPONENTS, N_STATIONS, N_TIME = 3, 5, 1000 +DT = 0.05 -def _make_broadband_dataset(rng: np.random.Generator) -> xr.Dataset: - """Create a synthetic broadband waveform dataset with correlated components. - - The waveform mimics real seismic traces where x, y, z components - share a dominant signal with small per-component perturbations. - This tests that the component-delta encoding exploits - cross-component correlation effectively. - """ +def _make_test_dataset() -> xr.Dataset: + """Create a simple synthetic waveform dataset for testing.""" time = np.arange(N_TIME) * DT - - # Base signal shared by all components (dominant seismic motion). - base_freqs = rng.uniform(0.5, 5.0, size=(1, N_STATIONS, N_HARMONICS)) - base_phases = rng.uniform(0, 2 * np.pi, size=(1, N_STATIONS, N_HARMONICS)) - base_amps = rng.uniform(0.001, 0.05, size=(1, N_STATIONS, N_HARMONICS)) - - base = np.zeros((1, N_STATIONS, N_TIME), dtype=np.float32) - for k in range(N_HARMONICS): - base += ( - base_amps[..., k : k + 1] - * np.sin( - 2 * np.pi * base_freqs[..., k : k + 1] - * time[np.newaxis, np.newaxis, :] - + base_phases[..., k : k + 1] - ) - ).astype(np.float32) - - # Components share the base signal with ~10% perturbation. - waveform = np.tile(base, (N_COMPONENTS, 1, 1)) - for c in range(N_COMPONENTS): - pert_freqs = rng.uniform(0.5, 5.0, size=(1, N_STATIONS, 2)) - pert_phases = rng.uniform(0, 2 * np.pi, size=(1, N_STATIONS, 2)) - pert_amps = rng.uniform(0.0001, 0.005, size=(1, N_STATIONS, 2)) - for k in range(2): - waveform[c : c + 1] += ( - pert_amps[..., k : k + 1] - * np.sin( - 2 * np.pi * pert_freqs[..., k : k + 1] - * time[np.newaxis, np.newaxis, :] - + pert_phases[..., k : k + 1] - ) - ).astype(np.float32) - - # Small additive noise - waveform += (rng.standard_normal(waveform.shape) * 1e-4).astype(np.float32) + waveform = ( + np.sin(time * 2 * np.pi * 1.0) + + np.random.default_rng(42).standard_normal((N_COMPONENTS, N_STATIONS, N_TIME)) + * 0.1 + ) return xr.Dataset( - {"waveform": (["component", "station", "time"], waveform)}, + {"waveform": (["component", "station", "time"], waveform.astype(np.float32))}, coords={ - "component": ("component", ["x", "y", "z"]), - "station": ("station", [f"STA{i:03d}" for i in range(N_STATIONS)]), - "time": ("time", time), - "latitude": ("station", -43.5 + np.arange(N_STATIONS) * 0.1), - "longitude": ("station", 172.5 + np.arange(N_STATIONS) * 0.1), + "component": ["x", "y", "z"], + "station": [f"STA{i:02d}" for i in range(N_STATIONS)], + "time": time, + "lat": ("station", np.linspace(-45, -43, N_STATIONS)), }, - attrs={"units": "g"}, + attrs={"units": "m/s", "source": "test_gen"}, ) -def test_compress_decompress_roundtrip(tmp_path: Path) -> None: - """Verify that compress → decompress preserves the dataset.""" - rng = np.random.default_rng(12345) - ds = _make_broadband_dataset(rng) - - input_ffp = tmp_path / "broadband.nc" - compressed_ffp = tmp_path / "broadband_compressed.h5" +def test_waveform_roundtrip_integrity(tmp_path: Path): + """Verify waveform values and metadata survive the compression roundtrip.""" + with _make_test_dataset() as ds: + input_path = tmp_path / "input.h5" + original_attrs = ds.attrs + ds.to_netcdf(input_path, engine="h5netcdf") + output_path = tmp_path / "output.h5" - ds.to_netcdf(input_ffp, engine="h5netcdf") - compress_waveform(input_ffp, compressed_ffp) + compress_waveform(input_path, output_path) + restored = decompress_waveform(output_path) - restored = decompress_waveform(compressed_ffp) - - # Coordinates and attributes are preserved. - assert set(restored.coords) == set(ds.coords) - assert restored.attrs == ds.attrs - assert list(restored.waveform.dims) == list(ds.waveform.dims) - assert restored.waveform.dtype == ds.waveform.dtype - assert restored.waveform.shape == ds.waveform.shape - - # Numerical accuracy: quantisation error is at most 0.5 LSB. - max_abs = float(np.abs(ds.waveform.values).max()) - scale_factor = max_abs / _SCALE_LIMIT - max_err = float(np.abs(restored.waveform.values - ds.waveform.values).max()) - assert max_err <= scale_factor, ( - f"Roundtrip error {max_err} exceeds one LSB ({scale_factor})" + restored_subset = {k: v for k, v in restored.attrs.items() if k in original_attrs} + assert restored_subset == original_attrs, ( + "Restored attributes do not match original attributes." ) + for coord in ds.coords: + np.testing.assert_array_equal(restored[coord].values, ds[coord].values) -def test_compression_is_efficient(tmp_path: Path) -> None: - """Verify that the compressed file is meaningfully smaller than the original. - - Smooth, correlated waveforms (like real seismic data) should - compress well with int32-scaling + component-delta-encoding + FLAC. - """ - rng = np.random.default_rng(12345) - ds = _make_broadband_dataset(rng) - - input_ffp = tmp_path / "broadband.nc" - compressed_ffp = tmp_path / "broadband_compressed.h5" - - ds.to_netcdf(input_ffp, engine="h5netcdf") - compress_waveform(input_ffp, compressed_ffp) - - original_bytes = input_ffp.stat().st_size - compressed_bytes = compressed_ffp.stat().st_size - ratio = original_bytes / compressed_bytes - - # With smooth waveforms, expect at least 1.5× compression. - assert ratio > 1.5, ( - f"Compression ratio {ratio:.2f}x is too low " - f"(original={original_bytes}, compressed={compressed_bytes})" - ) - - -def test_metadata_preserved(tmp_path: Path) -> None: - """Verify that all coordinate variables round-trip exactly.""" - rng = np.random.default_rng(12345) - ds = _make_broadband_dataset(rng) - - input_ffp = tmp_path / "broadband.nc" - compressed_ffp = tmp_path / "broadband_compressed.h5" - - ds.to_netcdf(input_ffp, engine="h5netcdf") - compress_waveform(input_ffp, compressed_ffp) - restored = decompress_waveform(compressed_ffp) - - for coord_name in ds.coords: - np.testing.assert_array_equal( - restored.coords[coord_name].values, - ds.coords[coord_name].values, - err_msg=f"Coordinate {coord_name!r} differs after roundtrip", - ) - - -def test_no_drift(tmp_path: Path) -> None: - """Verify that the decompressed waveform has no accumulated drift. - - When the waveform is quiet (near zero) before and after a burst of - seismic activity, the decompressed tail must return to near zero - rather than drifting away—a symptom of cumulative errors in the - reconstruction path. - """ - rng = np.random.default_rng(54321) - - # Build a seismic-like waveform: quiet → active → quiet. - time = np.arange(N_TIME) * DT - envelope = np.exp(-0.5 * ((time - 50.0) / 10.0) ** 2) - - waveform = np.zeros((N_COMPONENTS, N_STATIONS, N_TIME), dtype=np.float32) - for c in range(N_COMPONENTS): - for s in range(N_STATIONS): - for freq in [0.5, 1.0, 3.0, 7.0]: - amp = rng.uniform(0.005, 0.03) - phase = rng.uniform(0, 2 * np.pi) - waveform[c, s, :] += ( - amp * envelope * np.sin(2 * np.pi * freq * time + phase) - ).astype(np.float32) - waveform += (rng.standard_normal(waveform.shape) * 1e-5).astype(np.float32) - - ds = xr.Dataset( - {"waveform": (["component", "station", "time"], waveform)}, - coords={ - "component": ("component", ["x", "y", "z"]), - "station": ("station", [f"STA{i:03d}" for i in range(N_STATIONS)]), - "time": ("time", time), - }, - ) + xr.testing.assert_allclose(restored, ds, atol=5e-4) - input_ffp = tmp_path / "broadband.nc" - compressed_ffp = tmp_path / "broadband_compressed.h5" - ds.to_netcdf(input_ffp, engine="h5netcdf") - compress_waveform(input_ffp, compressed_ffp) - restored = decompress_waveform(compressed_ffp) +def test_compression_efficiency(tmp_path: Path): + """Verify the compressed file is actually smaller than the raw NetCDF.""" + input_path = tmp_path / "input.h5" + output_path = tmp_path / "output.h5" - orig = ds.waveform.values - rest = restored.waveform.values + ds = _make_test_dataset() + with _make_test_dataset() as ds: + ds.to_netcdf(input_path, engine="h5netcdf") - # The last 20 % of the trace is the quiet tail; its mean should be - # essentially zero (matching the original) rather than drifted. - tail_start = int(N_TIME * 0.8) - tail_mean_orig = np.mean(orig[:, :, tail_start:], axis=-1) - tail_mean_rest = np.mean(rest[:, :, tail_start:], axis=-1) - drift = tail_mean_rest - tail_mean_orig + compress_waveform(input_path, output_path) - max_abs = float(np.abs(orig).max()) - scale_factor = max_abs / _SCALE_LIMIT + raw_size = input_path.stat().st_size + compressed_size = output_path.stat().st_size - # Drift must stay within one quantisation step. - assert np.abs(drift).max() <= scale_factor, ( - f"Drift {np.abs(drift).max():.3e} exceeds one LSB ({scale_factor:.3e})" + assert compressed_size < raw_size, ( + f"Compression failed to reduce size: {compressed_size} >= {raw_size}" ) diff --git a/workflow/scripts/compress_waveform.py b/workflow/scripts/compress_waveform.py index c41a7c7c..3086cad5 100644 --- a/workflow/scripts/compress_waveform.py +++ b/workflow/scripts/compress_waveform.py @@ -40,11 +40,11 @@ from pathlib import Path from typing import Annotated +import flacarray import h5py import numpy as np import typer import xarray as xr -import flacarray from qcore import cli from workflow import log_utils @@ -98,14 +98,13 @@ def _read_coords(hdf: h5py.File) -> dict[str, tuple[tuple[str, ...], np.ndarray] return coords - @cli.from_docstring(app) @log_utils.log_call() def compress_waveform( waveform_ffp: Annotated[Path, typer.Argument(dir_okay=False, exists=True)], output_ffp: Annotated[Path, typer.Argument(dir_okay=False, writable=True)], level: Annotated[int, typer.Option(min=0, max=8)] = 5, - precision: Annotated[int, typer.Option(min=1)] = 4 + precision: Annotated[int, typer.Option(min=1)] = 4, ) -> None: """Compress a broadband waveform file using FlacArray. @@ -128,8 +127,17 @@ def compress_waveform( FLAC precision level (in significant digits of input data). Higher values compress more but are lose more precision. Defaults to 4. """ - with h5py.File(output_ffp, "w") as hdf, xr.open_dataset(waveform_ffp) as broadband: - flacarray.hdf5.write_array(broadband.waveform.values, hdf, precision=precision, level=level, use_threads=True) + with ( + h5py.File(output_ffp, "w") as hdf, + xr.open_dataset(waveform_ffp, engine="h5netcdf") as broadband, + ): + flacarray.hdf5.write_array( + broadband.waveform.values, + hdf, + precision=precision, + level=level, + use_threads=True, + ) _write_coords(hdf, broadband) for attr_name, attr_value in broadband.attrs.items(): From 03bf833f0ea0ab5e774c22a93ce588385b293a44 Mon Sep 17 00:00:00 2001 From: Jake Faulkner Date: Mon, 16 Mar 2026 16:12:25 +1300 Subject: [PATCH 14/25] fix(compress-waveform): satisfy type checker --- workflow/scripts/compress_waveform.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/workflow/scripts/compress_waveform.py b/workflow/scripts/compress_waveform.py index 3086cad5..dbf9e6f5 100644 --- a/workflow/scripts/compress_waveform.py +++ b/workflow/scripts/compress_waveform.py @@ -131,7 +131,7 @@ def compress_waveform( h5py.File(output_ffp, "w") as hdf, xr.open_dataset(waveform_ffp, engine="h5netcdf") as broadband, ): - flacarray.hdf5.write_array( + flacarray.hdf5.write_array( # type: ignore[possibly-missing-attribute] broadband.waveform.values, hdf, precision=precision, @@ -161,7 +161,7 @@ def decompress_waveform(compressed_ffp: Path) -> xr.Dataset: and attributes restored. """ with h5py.File(compressed_ffp, "r") as hdf: - waveform = flacarray.hdf5.read_array(hdf, use_threads=True) + waveform = flacarray.hdf5.read_array(hdf, use_threads=True) # type: ignore[possibly-missing-attribute] dims = list(hdf.attrs["waveform_dims"]) coords = _read_coords(hdf) From 80d018d65103ee73670e8aecd9c7aeabbfd50523 Mon Sep 17 00:00:00 2001 From: Jake Faulkner Date: Mon, 16 Mar 2026 16:14:05 +1300 Subject: [PATCH 15/25] tests(compress-waveform): remove ds make test dataset --- tests/test_compress_waveform.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/test_compress_waveform.py b/tests/test_compress_waveform.py index e5cdbf2c..bd4eba77 100644 --- a/tests/test_compress_waveform.py +++ b/tests/test_compress_waveform.py @@ -34,7 +34,7 @@ def _make_test_dataset() -> xr.Dataset: ) -def test_waveform_roundtrip_integrity(tmp_path: Path): +def test_waveform_roundtrip_integrity(tmp_path: Path) -> None: """Verify waveform values and metadata survive the compression roundtrip.""" with _make_test_dataset() as ds: input_path = tmp_path / "input.h5" @@ -56,12 +56,11 @@ def test_waveform_roundtrip_integrity(tmp_path: Path): xr.testing.assert_allclose(restored, ds, atol=5e-4) -def test_compression_efficiency(tmp_path: Path): - """Verify the compressed file is actually smaller than the raw NetCDF.""" +def test_compression_efficiency(tmp_path: Path) -> None: + """Verify the compressed file is actually smaller than the raw values.""" input_path = tmp_path / "input.h5" output_path = tmp_path / "output.h5" - ds = _make_test_dataset() with _make_test_dataset() as ds: ds.to_netcdf(input_path, engine="h5netcdf") From 05ff4736074138d0105b0239fe2b71515810fe2d Mon Sep 17 00:00:00 2001 From: Jake Faulkner Date: Mon, 16 Mar 2026 16:14:53 +1300 Subject: [PATCH 16/25] fix(hf-sim): fix import --- workflow/scripts/hf_sim.py | 1 + 1 file changed, 1 insertion(+) diff --git a/workflow/scripts/hf_sim.py b/workflow/scripts/hf_sim.py index 1b51894d..ea009afb 100644 --- a/workflow/scripts/hf_sim.py +++ b/workflow/scripts/hf_sim.py @@ -33,6 +33,7 @@ """ import concurrent.futures +import hashlib import subprocess import tempfile from collections.abc import Iterable From 0568c17062871ddc3c6ca1089fb6ea3532529214 Mon Sep 17 00:00:00 2001 From: Jake Faulkner Date: Mon, 16 Mar 2026 16:15:19 +1300 Subject: [PATCH 17/25] remove unused import --- workflow/scripts/compress_waveform.py | 1 - 1 file changed, 1 deletion(-) diff --git a/workflow/scripts/compress_waveform.py b/workflow/scripts/compress_waveform.py index dbf9e6f5..b76d4b29 100644 --- a/workflow/scripts/compress_waveform.py +++ b/workflow/scripts/compress_waveform.py @@ -36,7 +36,6 @@ See the output of ``compress-waveform --help``. """ -import functools from pathlib import Path from typing import Annotated From 034183d1a204e9b6d6712d9b0610655928495f50 Mon Sep 17 00:00:00 2001 From: Jake Faulkner Date: Mon, 16 Mar 2026 16:16:17 +1300 Subject: [PATCH 18/25] include h5py as a dependency and remove dask --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 089a1ba3..2dbe8a67 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,12 +20,12 @@ dependencies = [ # Data Formats "flacarray", "geopandas", + "h5py", "pandas[parquet, hdf5]", "pyyaml", "xarray[io]", # Numerics - "dask", "numpy", "scipy", "shapely", From a086d55b82fe682edf558e3ed15a8fc3ebb45ec0 Mon Sep 17 00:00:00 2001 From: Jake Faulkner Date: Tue, 17 Mar 2026 12:06:58 +1300 Subject: [PATCH 19/25] feat: add xarray shim for waveform reading --- pyproject.toml | 3 +- uv.lock | 2 + workflow/scripts/compress_waveform.py | 109 +++----------------- workflow/waveform.py | 138 ++++++++++++++++++++++++++ 4 files changed, 157 insertions(+), 95 deletions(-) create mode 100644 workflow/waveform.py diff --git a/pyproject.toml b/pyproject.toml index 2dbe8a67..60c26f3c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,13 +18,13 @@ dependencies = [ "qcore-utils>=2025.12.2", "source_modelling>=2025.12.1", # Data Formats + "dask", "flacarray", "geopandas", "h5py", "pandas[parquet, hdf5]", "pyyaml", "xarray[io]", - # Numerics "numpy", "scipy", @@ -37,7 +37,6 @@ dependencies = [ "schema", # For loading realisations "structlog", # Logging. "psutil", # To get the CPU affinity for jobs - ] [project.optional-dependencies] diff --git a/uv.lock b/uv.lock index 155f259d..d43de148 100644 --- a/uv.lock +++ b/uv.lock @@ -3188,6 +3188,7 @@ dependencies = [ { name = "dask" }, { name = "flacarray" }, { name = "geopandas" }, + { name = "h5py" }, { name = "im-calculation" }, { name = "nshmdb" }, { name = "numpy" }, @@ -3232,6 +3233,7 @@ requires-dist = [ { name = "deptry", marker = "extra == 'dev'" }, { name = "flacarray" }, { name = "geopandas" }, + { name = "h5py" }, { name = "hypothesis", extras = ["numpy"], marker = "extra == 'test'", specifier = ">=6.0.0" }, { name = "im-calculation", specifier = ">=2025.12.5" }, { name = "nshmdb", specifier = ">=2025.12.1" }, diff --git a/workflow/scripts/compress_waveform.py b/workflow/scripts/compress_waveform.py index b76d4b29..a10bba7a 100644 --- a/workflow/scripts/compress_waveform.py +++ b/workflow/scripts/compress_waveform.py @@ -41,7 +41,6 @@ import flacarray import h5py -import numpy as np import typer import xarray as xr @@ -51,52 +50,6 @@ app = typer.Typer() -def _write_coords( - hdf: h5py.File, - broadband: xr.Dataset, -) -> None: - """Write all coordinate variables to an HDF5 group. - - Parameters - ---------- - hdf : h5py.File - The open HDF5 file to write to. - broadband : xr.Dataset - The source xarray dataset. - """ - coords_grp = hdf.create_group("coords") - for coord_name in broadband.coords: - coord_data = broadband.coords[coord_name].values - if coord_data.dtype.kind == "U": - coord_data = coord_data.astype(bytes) - dset = coords_grp.create_dataset(coord_name, data=coord_data) - dset.attrs["dims"] = list(broadband.coords[coord_name].dims) - - -def _read_coords(hdf: h5py.File) -> dict[str, tuple[tuple[str, ...], np.ndarray]]: - """Read coordinate variables from an HDF5 group. - - Parameters - ---------- - hdf : h5py.File - The open HDF5 file to read from. - - Returns - ------- - dict - A mapping of coordinate names to (dims, values) tuples. - """ - coords: dict[str, tuple[tuple[str, ...], np.ndarray]] = {} - for coord_name in hdf["coords"]: - dset = hdf["coords"][coord_name] - data = dset[:] - if data.dtype.kind == "S": - data = data.astype(str) - dims = tuple(dset.attrs["dims"]) - coords[coord_name] = (dims, data) - return coords - - @cli.from_docstring(app) @log_utils.log_call() def compress_waveform( @@ -127,54 +80,24 @@ def compress_waveform( are lose more precision. Defaults to 4. """ with ( - h5py.File(output_ffp, "w") as hdf, xr.open_dataset(waveform_ffp, engine="h5netcdf") as broadband, ): - flacarray.hdf5.write_array( # type: ignore[possibly-missing-attribute] - broadband.waveform.values, - hdf, - precision=precision, - level=level, - use_threads=True, - ) - _write_coords(hdf, broadband) - - for attr_name, attr_value in broadband.attrs.items(): - hdf.attrs[attr_name] = attr_value - - hdf.attrs["waveform_dims"] = list(broadband["waveform"].dims) - - -def decompress_waveform(compressed_ffp: Path) -> xr.Dataset: - """Decompress a FlacArray-compressed waveform file to an xarray Dataset. - - Parameters - ---------- - compressed_ffp : Path - Path to the compressed HDF5 file produced by ``compress_waveform``. - - Returns - ------- - xr.Dataset - The decompressed waveform dataset with all original coordinates - and attributes restored. - """ - with h5py.File(compressed_ffp, "r") as hdf: - waveform = flacarray.hdf5.read_array(hdf, use_threads=True) # type: ignore[possibly-missing-attribute] - - dims = list(hdf.attrs["waveform_dims"]) - coords = _read_coords(hdf) - attrs = { - k: v - for k, v in hdf.attrs.items() - if k not in ("waveform_dims", "scale_factor", "waveform_dtype") - } - - return xr.Dataset( - {"waveform": (dims, waveform)}, - coords=coords, - attrs=attrs, - ) + broadband.drop_vars("waveform").to_netcdf(output_ffp, engine="h5netcdf") + with h5py.File(output_ffp, "a") as hdf: + group = hdf.create_group("_flac_compressed_waveform") + group.attrs["flac_array"] = True + group.attrs["name"] = "waveform" + group.attrs["shape"] = broadband.waveform.shape + group.attrs["dims"] = broadband.waveform.dims + group.attrs["dtype"] = str(broadband.waveform.dtype) + + flacarray.hdf5.write_array( # type: ignore[possibly-missing-attribute] + broadband.waveform.values, + group, + precision=precision, + level=level, + use_threads=True, + ) if __name__ == "__main__": diff --git a/workflow/waveform.py b/workflow/waveform.py new file mode 100644 index 00000000..2a8dad28 --- /dev/null +++ b/workflow/waveform.py @@ -0,0 +1,138 @@ +"""API for loading compressed waveform datasets.""" + +from pathlib import Path + +import dask.array as da +import flacarray +import h5py +import numpy as np +import xarray as xr +from numpy.lib import stride_tricks + + +class FlacH5Wrapper: + """An array-like interface for FLAC compressed waveforms that only allocates memory for the compressed raw bytes.""" + + def __init__( + self, + group: h5py.Group, + shape: tuple, + dtype: np.dtype, + ) -> None: + """Initialise the FLAC hdf5 wrapper. + + + Parameters + ---------- + group : h5py.Group + HDF5 group to read from + shape : tuple + Shape of output array. + dtype : np.dtype + Data type of output array. + """ + self.shape = shape + self.dtype = dtype + self.ndim = len(shape) + self.stream_gains = np.array(group["stream_gains"]) + self.stream_offsets = np.array(group["stream_offsets"]) + self.stream_starts = np.array(group["stream_starts"]) + self.stream_nbytes = np.array(group["stream_bytes"]) + self.raw_bytes = np.array(group["compressed"]) + + def __getitem__(self, key: tuple[int | slice, ...]) -> np.ndarray: + """Getitem implementation that decompresses a slice of the compressed array data. + + + + Parameters + ---------- + key : tuple[int | slice, ...] + indexing key + + Returns + ------- + np.ndarray + Decompressed waveform array. + """ + keep = np.zeros(self.stream_starts.shape, dtype=bool) + stream_indexer = key[:-1] + sample_indexer = key[-1] + keep[stream_indexer] = True + + if isinstance(sample_indexer, int): + first_sample = sample_indexer + last_sample = sample_indexer + 1 + else: + first_sample = ( + sample_indexer.start if sample_indexer.start is not None else 0 + ) + last_sample = ( + sample_indexer.stop + if sample_indexer.stop is not None + else self.shape[-1] + ) + + decompressed_data, _ = flacarray.decompress.array_decompress_slice( + compressed=self.raw_bytes, + stream_size=self.shape[-1], # assuming last dim is time/samples + stream_starts=self.stream_starts, + stream_nbytes=self.stream_nbytes, + stream_offsets=self.stream_offsets, + stream_gains=self.stream_gains, + keep=keep, + first_stream_sample=first_sample, + last_stream_sample=last_sample, + is_int64=(self.dtype == np.float64), + ) + + dummy = np.zeros(1, dtype=np.int8) + # The following is a "virtual array" with the same shape as + # the theoretical array but the strides (how much memory each + # element consumes) is zero so the effective memory + # consumption of this array is zero. We are using this to get + # an array-like that can calculate the expected shape from the + # indexing operation without requiring potentially tens of gb + # of memory. + virtual_array = stride_tricks.as_strided( + dummy, shape=self.shape, strides=(0,) * len(self.shape) + ) + target_shape = virtual_array[key].shape + + return decompressed_data.reshape(target_shape) + + +def load_waveform_dataset(dataset_ffp: Path) -> xr.Dataset: + """Read a compressed waveform array as an xarray dataset. + + Parameters + ---------- + dataset_ffp : Path + Path to xarray dataset. + + Returns + ------- + xr.Dataset + The xarray dataset read from disk. + """ + ds = xr.open_dataset( + dataset_ffp, engine="h5netcdf", drop_variables=["_flac_compressed_waveform"] + ) + + with h5py.File(dataset_ffp, "r") as h5: + group = h5["_flac_compressed_waveform"] + shape = tuple(group.attrs["shape"]) + dtype = np.dtype(group.attrs["dtype"]) + name = group.attrs["name"] + dims = group.attrs["dims"] + + wrapper = FlacH5Wrapper(group, shape, dtype) + + waveform_da = da.from_array( + wrapper, + chunks="auto", + name=name, + ) + + ds[name] = (dims, waveform_da) + return ds From f434df76d2f76b38739cb8138096a6f859e79499 Mon Sep 17 00:00:00 2001 From: Jake Faulkner Date: Tue, 17 Mar 2026 12:20:34 +1300 Subject: [PATCH 20/25] fix: ci nitpicks --- tests/test_compress_waveform.py | 4 ++-- workflow/waveform.py | 14 ++++++++++++-- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/tests/test_compress_waveform.py b/tests/test_compress_waveform.py index bd4eba77..add99c2e 100644 --- a/tests/test_compress_waveform.py +++ b/tests/test_compress_waveform.py @@ -3,9 +3,9 @@ import numpy as np import xarray as xr +from workflow import waveform from workflow.scripts.compress_waveform import ( compress_waveform, - decompress_waveform, ) # Constants for test data generation @@ -43,7 +43,7 @@ def test_waveform_roundtrip_integrity(tmp_path: Path) -> None: output_path = tmp_path / "output.h5" compress_waveform(input_path, output_path) - restored = decompress_waveform(output_path) + restored = waveform.load_waveform_dataset(output_path).compute() restored_subset = {k: v for k, v in restored.attrs.items() if k in original_attrs} assert restored_subset == original_attrs, ( diff --git a/workflow/waveform.py b/workflow/waveform.py index 2a8dad28..7c8165e0 100644 --- a/workflow/waveform.py +++ b/workflow/waveform.py @@ -3,7 +3,7 @@ from pathlib import Path import dask.array as da -import flacarray +import flacarray.decompress import h5py import numpy as np import xarray as xr @@ -11,7 +11,17 @@ class FlacH5Wrapper: - """An array-like interface for FLAC compressed waveforms that only allocates memory for the compressed raw bytes.""" + """An array-like interface for FLAC compressed waveforms that only allocates memory for the compressed raw bytes. + + Parameters + ---------- + group : h5py.Group + HDF5 group to read from + shape : tuple + Shape of output array. + dtype : np.dtype + Data type of output array. + """ def __init__( self, From 40f47863e3def6130be7464feef657351fd24fd5 Mon Sep 17 00:00:00 2001 From: Jake Faulkner Date: Tue, 17 Mar 2026 12:21:37 +1300 Subject: [PATCH 21/25] docs: reduce documentation --- workflow/scripts/compress_waveform.py | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/workflow/scripts/compress_waveform.py b/workflow/scripts/compress_waveform.py index a10bba7a..f9251ced 100644 --- a/workflow/scripts/compress_waveform.py +++ b/workflow/scripts/compress_waveform.py @@ -2,16 +2,7 @@ Description ----------- -Compress a broadband waveform HDF5 file using FlacArray compression with -int32 rescaling and component-delta encoding for efficient storage. - -The waveform data is rescaled to fill the safe range of a signed 32-bit -integer and delta encoded along the component axis (to exploit -inter-component correlation) before FLAC compression. FLAC's built-in -linear prediction handles temporal smoothness internally, so no explicit -time-axis delta encoding is needed. All coordinates and attributes from -the input xarray dataset are preserved so the compressed file can be -decompressed back to a complete xarray Dataset. +Compress a broadband waveform HDF5 file using FLAC compression. Inputs ------ @@ -39,7 +30,7 @@ from pathlib import Path from typing import Annotated -import flacarray +import flacarray.hdf5 import h5py import typer import xarray as xr @@ -91,7 +82,7 @@ def compress_waveform( group.attrs["dims"] = broadband.waveform.dims group.attrs["dtype"] = str(broadband.waveform.dtype) - flacarray.hdf5.write_array( # type: ignore[possibly-missing-attribute] + flacarray.hdf5.write_array( broadband.waveform.values, group, precision=precision, From cd8d660d5357e86c5940e7272edffee112836555 Mon Sep 17 00:00:00 2001 From: Jake Faulkner Date: Tue, 17 Mar 2026 12:21:58 +1300 Subject: [PATCH 22/25] docs: make CLI docs simpler --- workflow/scripts/compress_waveform.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/workflow/scripts/compress_waveform.py b/workflow/scripts/compress_waveform.py index f9251ced..07bf8866 100644 --- a/workflow/scripts/compress_waveform.py +++ b/workflow/scripts/compress_waveform.py @@ -49,13 +49,7 @@ def compress_waveform( level: Annotated[int, typer.Option(min=0, max=8)] = 5, precision: Annotated[int, typer.Option(min=1)] = 4, ) -> None: - """Compress a broadband waveform file using FlacArray. - - The waveform is chunked by station so that each parallel dask task - processes complete component-triples with full timeseries. Within - each chunk the data is rescaled to a safe sub-range of the signed - 32-bit integer type and delta encoded along the component axis - before FLAC compression. + """Compress a broadband waveform file using FLAC. Parameters ---------- From 4fe393011beec49bcb1a6f57206da8562594d679 Mon Sep 17 00:00:00 2001 From: Jake Faulkner Date: Tue, 17 Mar 2026 12:22:30 +1300 Subject: [PATCH 23/25] docs: precision accuracy --- workflow/scripts/compress_waveform.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/workflow/scripts/compress_waveform.py b/workflow/scripts/compress_waveform.py index 07bf8866..f9b0041a 100644 --- a/workflow/scripts/compress_waveform.py +++ b/workflow/scripts/compress_waveform.py @@ -61,8 +61,8 @@ def compress_waveform( FLAC compression level (0-8). Higher values compress more but are slower. Defaults to 5. precision : int, optional - FLAC precision level (in significant digits of input data). Higher values compress more but - are lose more precision. Defaults to 4. + FLAC precision level (in significant digits of input data). Higher values compress less but + have more precision. Defaults to 4. """ with ( xr.open_dataset(waveform_ffp, engine="h5netcdf") as broadband, From b3131096e93d91073aac6202d4d917fa183a9d4b Mon Sep 17 00:00:00 2001 From: Jake Faulkner Date: Tue, 17 Mar 2026 12:41:59 +1300 Subject: [PATCH 24/25] fix slice indexing Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- workflow/waveform.py | 37 ++++++++++++++++++++++++++----------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/workflow/waveform.py b/workflow/waveform.py index 7c8165e0..98badc7d 100644 --- a/workflow/waveform.py +++ b/workflow/waveform.py @@ -1,6 +1,7 @@ """API for loading compressed waveform datasets.""" from pathlib import Path +from numbers import Integral import dask.array as da import flacarray.decompress @@ -70,18 +71,32 @@ def __getitem__(self, key: tuple[int | slice, ...]) -> np.ndarray: sample_indexer = key[-1] keep[stream_indexer] = True - if isinstance(sample_indexer, int): - first_sample = sample_indexer - last_sample = sample_indexer + 1 + # Normalise the sample indexer (last dimension) and reject unsupported + # slicing patterns (e.g. stepped slices). This ensures that + # first_stream_sample/last_stream_sample passed to the decompressor are + # consistent with the actual indexing semantics. + if isinstance(sample_indexer, Integral): + # Handle scalar indices, including negative ones. + index = int(sample_indexer) + if index < 0: + index += self.shape[-1] + if index < 0 or index >= self.shape[-1]: + raise IndexError("Sample index out of range") + first_sample = index + last_sample = index + 1 else: - first_sample = ( - sample_indexer.start if sample_indexer.start is not None else 0 - ) - last_sample = ( - sample_indexer.stop - if sample_indexer.stop is not None - else self.shape[-1] - ) + if not isinstance(sample_indexer, slice): + raise TypeError( + f"Unsupported index type for samples: {type(sample_indexer)!r}" + ) + start, stop, step = sample_indexer.indices(self.shape[-1]) + if step != 1: + raise IndexError( + "Stepped slicing for the sample dimension is not supported; " + f"got step={step!r}" + ) + first_sample = start + last_sample = stop decompressed_data, _ = flacarray.decompress.array_decompress_slice( compressed=self.raw_bytes, From 32d785ad55983fcab18527ed9116178c6da97a4b Mon Sep 17 00:00:00 2001 From: Jake Faulkner Date: Tue, 17 Mar 2026 12:43:43 +1300 Subject: [PATCH 25/25] feat: support stepped timestepping values --- workflow/waveform.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/workflow/waveform.py b/workflow/waveform.py index 98badc7d..a048e74c 100644 --- a/workflow/waveform.py +++ b/workflow/waveform.py @@ -1,7 +1,7 @@ """API for loading compressed waveform datasets.""" -from pathlib import Path from numbers import Integral +from pathlib import Path import dask.array as da import flacarray.decompress @@ -71,10 +71,7 @@ def __getitem__(self, key: tuple[int | slice, ...]) -> np.ndarray: sample_indexer = key[-1] keep[stream_indexer] = True - # Normalise the sample indexer (last dimension) and reject unsupported - # slicing patterns (e.g. stepped slices). This ensures that - # first_stream_sample/last_stream_sample passed to the decompressor are - # consistent with the actual indexing semantics. + step = None if isinstance(sample_indexer, Integral): # Handle scalar indices, including negative ones. index = int(sample_indexer) @@ -85,16 +82,15 @@ def __getitem__(self, key: tuple[int | slice, ...]) -> np.ndarray: first_sample = index last_sample = index + 1 else: + # Normalise the sample indexer (last dimension). This ensures that + # first_stream_sample/last_stream_sample passed to the decompressor are + # consistent with the actual indexing semantics. + if not isinstance(sample_indexer, slice): raise TypeError( f"Unsupported index type for samples: {type(sample_indexer)!r}" ) start, stop, step = sample_indexer.indices(self.shape[-1]) - if step != 1: - raise IndexError( - "Stepped slicing for the sample dimension is not supported; " - f"got step={step!r}" - ) first_sample = start last_sample = stop @@ -111,6 +107,9 @@ def __getitem__(self, key: tuple[int | slice, ...]) -> np.ndarray: is_int64=(self.dtype == np.float64), ) + if step not in (None, 1): + decompressed_data = decompressed_data[..., ::step] + dummy = np.zeros(1, dtype=np.int8) # The following is a "virtual array" with the same shape as # the theoretical array but the strides (how much memory each