From 85e2d8da1d64029bc24a3c6838eee41db2a7146e Mon Sep 17 00:00:00 2001 From: glopesdev Date: Sat, 22 Mar 2025 12:25:58 +0000 Subject: [PATCH 1/5] Unify read and parse into a single function --- harp/__init__.py | 4 +-- harp/io.py | 75 +++++++++--------------------------------------- harp/reader.py | 63 +++++++++------------------------------- harp/typing.py | 9 ++++-- tests/test_io.py | 37 ++++++++++-------------- 5 files changed, 50 insertions(+), 138 deletions(-) diff --git a/harp/__init__.py b/harp/__init__.py index 31ec97f..d3581a5 100644 --- a/harp/__init__.py +++ b/harp/__init__.py @@ -1,5 +1,5 @@ -from harp.io import REFERENCE_EPOCH, MessageType, parse, read +from harp.io import REFERENCE_EPOCH, MessageType, read from harp.reader import create_reader from harp.schema import read_schema -__all__ = ["REFERENCE_EPOCH", "MessageType", "parse", "read", "create_reader", "read_schema"] +__all__ = ["REFERENCE_EPOCH", "MessageType", "read", "create_reader", "read_schema"] diff --git a/harp/io.py b/harp/io.py index 28ffe1f..7640417 100644 --- a/harp/io.py +++ b/harp/io.py @@ -8,7 +8,7 @@ import pandas as pd from pandas._typing import Axes -from harp.typing import BufferLike +from harp.typing import _BufferLike, _FileLike REFERENCE_EPOCH = datetime(1904, 1, 1) """The reference epoch for UTC harp time.""" @@ -41,48 +41,7 @@ class MessageType(IntEnum): def read( - file: Union[str, bytes, PathLike[Any], BinaryIO], - address: Optional[int] = None, - dtype: Optional[np.dtype] = None, - length: Optional[int] = None, - columns: Optional[Axes] = None, - epoch: Optional[datetime] = None, - keep_type: bool = False, -): - """Read single-register Harp data from the specified file. - - Parameters - ---------- - file - Open file object or filename containing binary data from - a single device register. - address - Expected register address. If specified, the address of - the first message in the file is used for validation. - dtype - Expected data type of the register payload. 
If specified, the - payload type of the first message in the file is used for validation. - length - Expected number of elements in register payload. If specified, the - payload length of the first message in the file is used for validation. - columns - The optional column labels to use for the data values. - epoch - Reference datetime at which time zero begins. If specified, - the result data frame will have a datetime index. - keep_type - Specifies whether to include a column with the message type. - - Returns - ------- - A pandas data frame containing message data, sorted by time. - """ - data = np.fromfile(file, dtype=np.uint8) - return _fromraw(data, address, dtype, length, columns, epoch, keep_type) - - -def parse( - buffer: BufferLike, + file_or_buf: Union[_FileLike, _BufferLike], address: Optional[int] = None, dtype: Optional[np.dtype] = None, length: Optional[int] = None, @@ -90,22 +49,22 @@ def parse( epoch: Optional[datetime] = None, keep_type: bool = False, ): - """Parse single-register Harp data from the specified buffer. + """Read single-register Harp data from the specified file or buffer. Parameters ---------- - buffer - An object that exposes a buffer interface containing binary data from + file_or_buf + File path, open file object, or buffer containing binary data from a single device register. address Expected register address. If specified, the address of - the first message in the buffer is used for validation. + the first message is used for validation. dtype Expected data type of the register payload. If specified, the - payload type of the first message in the buffer is used for validation. + payload type of the first message is used for validation. length Expected number of elements in register payload. If specified, the - payload length of the first message in the buffer is used for validation. + payload length of the first message is used for validation. columns The optional column labels to use for the data values. 
epoch @@ -118,19 +77,13 @@ def parse( ------- A pandas data frame containing message data, sorted by time. """ - data = np.frombuffer(buffer, dtype=np.uint8) - return _fromraw(data, address, dtype, length, columns, epoch, keep_type) - + if isinstance(file_or_buf, (str, PathLike, BinaryIO)) or hasattr(file_or_buf, "readinto"): + # TODO: in the below we ignore the type as otherwise + # we have no way to runtime check _IOProtocol + data = np.fromfile(file_or_buf, dtype=np.uint8) # type: ignore + else: + data = np.frombuffer(file_or_buf, dtype=np.uint8) -def _fromraw( - data: npt.NDArray[np.uint8], - address: Optional[int] = None, - dtype: Optional[np.dtype] = None, - length: Optional[int] = None, - columns: Optional[Axes] = None, - epoch: Optional[datetime] = None, - keep_type: bool = False, -): if len(data) == 0: return pd.DataFrame( columns=columns, diff --git a/harp/reader.py b/harp/reader.py index 3954892..590164c 100644 --- a/harp/reader.py +++ b/harp/reader.py @@ -6,21 +6,21 @@ from math import log2 from os import PathLike from pathlib import Path -from typing import Any, BinaryIO, Callable, Iterable, Mapping, Optional, Protocol, Union +from typing import Callable, Iterable, Mapping, Optional, Protocol, Union from numpy import dtype from pandas import DataFrame, Series from pandas._typing import Axes -from harp.io import MessageType, parse, read +from harp.io import MessageType, read from harp.model import BitMask, GroupMask, Model, PayloadMember, Register from harp.schema import read_schema -from harp.typing import BufferLike +from harp.typing import _BufferLike, _FileLike @dataclass class _ReaderParams: - path: Path + base_path: Path epoch: Optional[datetime] = None keep_type: bool = False @@ -28,16 +28,7 @@ class _ReaderParams: class _ReadRegister(Protocol): def __call__( self, - file: Optional[Union[str, bytes, PathLike[Any], BinaryIO]] = None, - epoch: Optional[datetime] = None, - keep_type: bool = False, - ) -> DataFrame: ... 
- - -class _ParseRegister(Protocol): - def __call__( - self, - buffer: BufferLike, + file_or_buf: Optional[Union[_FileLike, _BufferLike]] = None, epoch: Optional[datetime] = None, keep_type: bool = False, ) -> DataFrame: ... @@ -46,17 +37,14 @@ def __call__( class RegisterReader: register: Register read: _ReadRegister - parse: _ParseRegister def __init__( self, register: Register, read: _ReadRegister, - parse: _ParseRegister, ) -> None: self.register = register self.read = read - self.parse = parse class RegisterMap(UserDict[str, RegisterReader]): @@ -180,16 +168,16 @@ def parser(df: DataFrame): def _create_register_reader(register: Register, params: _ReaderParams): def reader( - file: Optional[Union[str, bytes, PathLike[Any], BinaryIO]] = None, + file_or_buf: Optional[Union[_FileLike, _BufferLike]] = None, columns: Optional[Axes] = None, epoch: Optional[datetime] = params.epoch, keep_type: bool = params.keep_type, ): - if file is None: - file = f"{params.path}_{register.address}.bin" + if file_or_buf is None: + file_or_buf = f"{params.base_path}_{register.address}.bin" data = read( - file, + file_or_buf, address=register.address, dtype=dtype(register.type), length=register.length, @@ -202,30 +190,9 @@ def reader( return reader -def _create_register_parser(register: Register, params: _ReaderParams): - def parser( - buffer: BufferLike, - columns: Optional[Axes] = None, - epoch: Optional[datetime] = params.epoch, - keep_type: bool = params.keep_type, - ): - return parse( - buffer, - address=register.address, - dtype=dtype(register.type), - length=register.length, - columns=columns, - epoch=epoch, - keep_type=keep_type, - ) - - return parser - - def _create_register_handler(device: Model, name: str, params: _ReaderParams): register = device.registers[name] reader = _create_register_reader(register, params) - parser = _create_register_parser(register, params) if register.maskType is not None: key = register.maskType.root @@ -233,15 +200,13 @@ def 
_create_register_handler(device: Model, name: str, params: _ReaderParams): if bitMask is not None: bitmask_parser = _create_bitmask_parser(bitMask) reader = _compose_parser(bitmask_parser, reader, params) - parser = _compose_parser(bitmask_parser, parser, params) - return RegisterReader(register, reader, parser) + return RegisterReader(register, reader) groupMask = None if device.groupMasks is None else device.groupMasks.get(key) if groupMask is not None: groupmask_parser = _create_groupmask_parser(name, groupMask) reader = _compose_parser(groupmask_parser, reader, params) - parser = _compose_parser(groupmask_parser, parser, params) - return RegisterReader(register, reader, parser) + return RegisterReader(register, reader) if register.payloadSpec is not None: member_parsers = [ @@ -253,8 +218,7 @@ def payload_parser(df: DataFrame): return DataFrame({n: f(df) for n, f in member_parsers}, index=df.index) reader = _compose_parser(payload_parser, reader, params) - parser = _compose_parser(payload_parser, parser, params) - return RegisterReader(register, reader, parser) + return RegisterReader(register, reader) columns = ( [name] @@ -262,8 +226,7 @@ def payload_parser(df: DataFrame): else [f"{name}_{i}" for i in range(register.length)] ) reader = partial(reader, columns=columns) - parser = partial(parser, columns=columns) - return RegisterReader(register, reader, parser) + return RegisterReader(register, reader) def create_reader( diff --git a/harp/typing.py b/harp/typing.py index e462ecd..1738f70 100644 --- a/harp/typing.py +++ b/harp/typing.py @@ -1,10 +1,13 @@ import mmap import sys -from typing import Any, Union +from os import PathLike +from typing import Any, BinaryIO, Union from numpy.typing import NDArray if sys.version_info >= (3, 12): - from collections.abc import Buffer as BufferLike + from collections.abc import Buffer as _BufferLike else: - BufferLike = Union[bytes, bytearray, memoryview, mmap.mmap, NDArray[Any]] + _BufferLike = Union[bytes, bytearray, 
memoryview, mmap.mmap, NDArray[Any]] + +_FileLike = Union[str, PathLike[str], BinaryIO] diff --git a/tests/test_io.py b/tests/test_io.py index 74536b3..7d1e4a3 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -5,7 +5,7 @@ import pytest from pytest import mark -from harp.io import REFERENCE_EPOCH, MessageType, format, parse, read +from harp.io import REFERENCE_EPOCH, MessageType, format, read from tests.params import DataFileParam testdata = [ @@ -41,27 +41,20 @@ def test_read(dataFile: DataFileParam): context = pytest.raises if dataFile.expected_error else nullcontext with context(dataFile.expected_error): # type: ignore - path = dataFile.path + file_or_buf = dataFile.path if dataFile.repeat_data: - with open(path, "rb") as f: - buffer = f.read() * dataFile.repeat_data - data = parse( - buffer, - address=dataFile.expected_address, - dtype=dataFile.expected_dtype, - length=dataFile.expected_length, - epoch=dataFile.epoch, - keep_type=dataFile.keep_type, - ) - else: - data = read( - path, - address=dataFile.expected_address, - dtype=dataFile.expected_dtype, - length=dataFile.expected_length, - epoch=dataFile.epoch, - keep_type=dataFile.keep_type, - ) + with open(file_or_buf, "rb") as f: + file_or_buf = f.read() * dataFile.repeat_data + + data = read( + file_or_buf, + address=dataFile.expected_address, + dtype=dataFile.expected_dtype, + length=dataFile.expected_length, + epoch=dataFile.epoch, + keep_type=dataFile.keep_type, + ) + assert len(data) == dataFile.expected_rows assert isinstance(data.index, pd.DatetimeIndex if dataFile.epoch else pd.Index) if dataFile.keep_type: @@ -83,7 +76,7 @@ def test_write(dataFile: DataFileParam): raise AssertionError("expected address must be defined for all write tests") buffer = np.fromfile(dataFile.path, np.uint8) - data = parse( + data = read( buffer, address=dataFile.expected_address, dtype=dataFile.expected_dtype, From 5a99e743bc67578f679b9f0d065cedee1fdf56e9 Mon Sep 17 00:00:00 2001 From: glopesdev Date: Sat, 22 Mar 
2025 12:35:37 +0000 Subject: [PATCH 2/5] Rename write and format functions for clarity --- harp/io.py | 12 ++++++------ tests/test_io.py | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/harp/io.py b/harp/io.py index 7640417..4a93a46 100644 --- a/harp/io.py +++ b/harp/io.py @@ -138,8 +138,8 @@ def read( return result -def write( - file: Union[str, bytes, PathLike[Any], BinaryIO], +def to_file( + file: _FileLike, data: pd.DataFrame, address: int, dtype: Optional[np.dtype] = None, @@ -152,7 +152,7 @@ def write( Parameters ---------- file - Open file object or filename where to store binary data from + File path, or open file object in which to store binary data from a single device register. data Pandas data frame containing message payload. @@ -170,11 +170,11 @@ def write( Optional message type used for all formatted Harp messages. If not specified, data must contain a MessageType column. """ - buffer = format(data, address, dtype, port, epoch, message_type) + buffer = to_buffer(data, address, dtype, port, epoch, message_type) buffer.tofile(file) -def format( +def to_buffer( data: pd.DataFrame, address: int, dtype: Optional[np.dtype] = None, @@ -182,7 +182,7 @@ def format( epoch: Optional[datetime] = None, message_type: Optional[MessageType] = None, ) -> npt.NDArray[np.uint8]: - """Format single-register Harp data as a flat binary buffer. + """Convert single-register Harp data to a flat binary buffer. 
Parameters ---------- diff --git a/tests/test_io.py b/tests/test_io.py index 7d1e4a3..9c0e76b 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -5,7 +5,7 @@ import pytest from pytest import mark -from harp.io import REFERENCE_EPOCH, MessageType, format, read +from harp.io import REFERENCE_EPOCH, MessageType, read, to_buffer from tests.params import DataFileParam testdata = [ @@ -84,5 +84,5 @@ def test_write(dataFile: DataFileParam): keep_type=dataFile.keep_type, ) assert len(data) == dataFile.expected_rows - write_buffer = format(data, address=dataFile.expected_address) + write_buffer = to_buffer(data, address=dataFile.expected_address) assert np.array_equal(buffer, write_buffer) From 99665a62e0821e9a0b354000d4ba4de75f80165a Mon Sep 17 00:00:00 2001 From: glopesdev Date: Sat, 22 Mar 2025 12:36:02 +0000 Subject: [PATCH 3/5] Add conversion functions to base module --- harp/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/harp/__init__.py b/harp/__init__.py index d3581a5..ba6211c 100644 --- a/harp/__init__.py +++ b/harp/__init__.py @@ -1,5 +1,5 @@ -from harp.io import REFERENCE_EPOCH, MessageType, read +from harp.io import REFERENCE_EPOCH, MessageType, read, to_buffer, to_file from harp.reader import create_reader from harp.schema import read_schema -__all__ = ["REFERENCE_EPOCH", "MessageType", "read", "create_reader", "read_schema"] +__all__ = ["REFERENCE_EPOCH", "MessageType", "read", "to_buffer", "to_file", "create_reader", "read_schema"] From 3a1f1b82b32e1eeb29d4e063412c8ee342dd2717 Mon Sep 17 00:00:00 2001 From: glopesdev Date: Sat, 22 Mar 2025 13:28:22 +0000 Subject: [PATCH 4/5] Optional length validation on binary conversion --- harp/io.py | 26 +++++++++++++++++++------- tests/test_io.py | 2 +- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/harp/io.py b/harp/io.py index 4a93a46..209a36f 100644 --- a/harp/io.py +++ b/harp/io.py @@ -139,10 +139,11 @@ def read( def to_file( - file: _FileLike, data: 
pd.DataFrame, + file: _FileLike, address: int, dtype: Optional[np.dtype] = None, + length: Optional[int] = None, port: Optional[int] = None, epoch: Optional[datetime] = None, message_type: Optional[MessageType] = None, @@ -151,16 +152,19 @@ def to_file( Parameters ---------- + data + Pandas data frame containing message payload. file File path, or open file object in which to store binary data from a single device register. - data - Pandas data frame containing message payload. address Register address used to identify all formatted Harp messages. dtype Data type of the register payload. If specified, all data will be converted before formatting the binary payload. + length + Expected number of elements in register payload. If specified, the + number of columns in the input data frame is validated. port Optional port value used for all formatted Harp messages. epoch @@ -170,7 +174,7 @@ def to_file( Optional message type used for all formatted Harp messages. If not specified, data must contain a MessageType column. """ - buffer = to_buffer(data, address, dtype, port, epoch, message_type) + buffer = to_buffer(data, address, dtype, length, port, epoch, message_type) buffer.tofile(file) @@ -178,6 +182,7 @@ def to_buffer( data: pd.DataFrame, address: int, dtype: Optional[np.dtype] = None, + length: Optional[int] = None, port: Optional[int] = None, epoch: Optional[datetime] = None, message_type: Optional[MessageType] = None, @@ -193,6 +198,9 @@ def to_buffer( dtype Data type of the register payload. If specified, all data will be converted before formatting the binary payload. + length + Expected number of elements in register payload. If specified, the + number of columns in the input data frame is validated. port Optional port value used for all formatted Harp messages. epoch @@ -207,7 +215,8 @@ def to_buffer( An array object containing message data formatted according to the Harp binary protocol.
""" - if len(data) == 0: + nrows = len(data) + if nrows == 0: return np.empty(0, dtype=np.uint8) if "MessageType" in data.columns: @@ -231,17 +240,20 @@ def to_buffer( if dtype is not None: payload = payload.astype(dtype, copy=False) + ncols = payload.shape[1] + if length is not None and ncols != length: + raise ValueError(f"expected payload length {length} but got {ncols}") + if port is None: port = 255 payloadtype = _payloadtypefromdtype[payload.dtype] - payloadlength = payload.shape[1] * payload.dtype.itemsize + payloadlength = ncols * payload.dtype.itemsize stride = payloadlength + 6 if is_timestamped: payloadtype |= _PAYLOAD_TIMESTAMP_MASK stride += 6 - nrows = len(data) buffer = np.empty((nrows, stride), dtype=np.uint8) buffer[:, 0] = msgtype buffer[:, 1:5] = [stride - 2, address, port, payloadtype] diff --git a/tests/test_io.py b/tests/test_io.py index 9c0e76b..3a94632 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -84,5 +84,5 @@ def test_write(dataFile: DataFileParam): keep_type=dataFile.keep_type, ) assert len(data) == dataFile.expected_rows - write_buffer = to_buffer(data, address=dataFile.expected_address) + write_buffer = to_buffer(data, address=dataFile.expected_address, length=dataFile.expected_length) assert np.array_equal(buffer, write_buffer) From 63194f5fca91a037d87139a3377b639bce93cf01 Mon Sep 17 00:00:00 2001 From: glopesdev Date: Sat, 22 Mar 2025 15:40:03 +0000 Subject: [PATCH 5/5] Use message type enum name for symmetry --- harp/io.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/harp/io.py b/harp/io.py index 209a36f..5b9091b 100644 --- a/harp/io.py +++ b/harp/io.py @@ -219,9 +219,9 @@ def to_buffer( if nrows == 0: return np.empty(0, dtype=np.uint8) - if "MessageType" in data.columns: - msgtype = data["MessageType"].cat.codes - payload = data[data.columns.drop("MessageType")].values + if MessageType.__name__ in data.columns: + msgtype = data[MessageType.__name__].cat.codes + payload = 
data[data.columns.drop(MessageType.__name__)].values elif message_type is not None: msgtype = message_type payload = data.values