diff --git a/core/src/drivers/plugins/python/shared/iec_type_converter.py b/core/src/drivers/plugins/python/shared/iec_type_converter.py new file mode 100644 index 00000000..70e1b63f --- /dev/null +++ b/core/src/drivers/plugins/python/shared/iec_type_converter.py @@ -0,0 +1,693 @@ +""" +IEC 61131-3 Type Converter + +This module provides centralized type conversion functions for all IEC 61131-3 data types. +It is designed to be used by all plugins (OPC-UA, Modbus, S7Comm, EtherCAT, etc.) to eliminate +code duplication and ensure consistent type handling across the plugin ecosystem. + +Main features: +- Value clamping with proper signed/unsigned handling +- Float <-> integer bit representation conversion +- Modbus register combination/splitting +- IEC_TIMESPEC handling for TIME/DATE/TOD/DT types +- Default value generation +- Endianness handling for multi-register types + +Usage: + from shared.iec_type_converter import IECTypeConverter + + # Clamp value to type bounds + value = IECTypeConverter.clamp_to_type(1000, "SINT") # Returns 127 + + # Convert float to integer representation + int_repr = IECTypeConverter.float_to_int_repr(3.14, "REAL") + + # Combine Modbus registers into value + value = IECTypeConverter.registers_to_value([0x1234, 0x5678], "DINT") +""" + +import ctypes +import struct +from datetime import datetime, timezone +from typing import Any, List, Tuple, Union + +try: + from .iec_type_registry import ( + FLOAT_TYPES, + get_canonical_name, + get_type_info, + ) +except ImportError: + from iec_type_registry import ( + FLOAT_TYPES, + get_canonical_name, + get_type_info, + ) + + +class IECTypeConverter: + """ + Centralized type conversion utilities for IEC 61131-3 data types. + + All methods are static and stateless, making this class thread-safe. + """ + + # ------------------------------------------------------------------------- + # Value Clamping and Type Coercion + # ------------------------------------------------------------------------- + + @staticmethod + def clamp_to_type(value: Union[int, float], type_name: str) -> int: + """ + Clamp a value to the bounds of an IEC type and return a ctypes-compatible value. + + This handles signed/unsigned conversion properly, ensuring that values + outside the valid range are clamped and the result has the correct + bit representation. + + Args: + value: Value to clamp (will be converted to int for integer types) + type_name: IEC type name (e.g., "SINT", "INT", "DINT") + + Returns: + Clamped value with correct ctypes representation + + Raises: + ValueError: If type_name is not a valid IEC type + """ + info = get_type_info(type_name) + if info is None: + raise ValueError(f"Unknown IEC type: {type_name}") + + # Handle float types separately + if info.is_float: + return float(value) + + # Handle string type + if info.is_string: + return str(value) + + # Convert to int and clamp + int_value = int(value) + clamped = max(info.min_value, min(info.max_value, int_value)) + + # Apply ctypes conversion to get correct bit representation + return info.ctype_class(clamped).value + + @staticmethod + def coerce_to_type(value: Any, type_name: str) -> Any: + """ + Coerce a value to the appropriate Python type for an IEC type. + + This is a more permissive version of clamp_to_type that handles + various input types (bool, str, etc.). + + Args: + value: Value to coerce + type_name: IEC type name + + Returns: + Coerced value appropriate for the type + """ + canonical = get_canonical_name(type_name) + if canonical is None: + return value + + info = get_type_info(canonical) + if info is None: + return value + + # Handle BOOL specially + if canonical == "BOOL": + if isinstance(value, bool): + return 1 if value else 0 + elif isinstance(value, (int, float)): + return 1 if value != 0 else 0 + elif isinstance(value, str): + return 1 if value.lower() in ["true", "1", "yes", "on"] else 0 + else: + return 1 if bool(value) else 0 + + # Handle STRING + if info.is_string: + return str(value) + + # Handle floats + if info.is_float: + return float(value) + + # Handle time types (expect tuple or pass through) + if info.is_time: + if isinstance(value, tuple) and len(value) == 2: + return value + elif isinstance(value, datetime): + return IECTypeConverter.datetime_to_timespec(value, canonical) + return value + + # Handle integers + return IECTypeConverter.clamp_to_type(value, canonical) + + # ------------------------------------------------------------------------- + # Float <-> Integer Bit Representation + # ------------------------------------------------------------------------- + + @staticmethod + def float_to_int_repr(value: float, type_name: str) -> int: + """ + Convert a float to its integer bit representation. + + This is used when floats need to be stored in integer buffers, + preserving the exact bit pattern. + + Args: + value: Float value to convert + type_name: "REAL" (32-bit) or "LREAL" (64-bit) + + Returns: + Integer with the same bit pattern as the float + + Raises: + ValueError: If type_name is not REAL or LREAL + """ + canonical = get_canonical_name(type_name) + if canonical not in FLOAT_TYPES: + raise ValueError(f"Type {type_name} is not a float type (REAL/LREAL)") + + try: + if canonical == "REAL": + # 32-bit float: pack as float, unpack as unsigned int + return struct.unpack("I", struct.pack("f", float(value)))[0] + else: # LREAL + # 64-bit double: pack as double, unpack as unsigned long long + return struct.unpack("Q", struct.pack("d", float(value)))[0] + except struct.error: + # Fallback for extreme values + return int(value) + + @staticmethod + def int_repr_to_float(value: int, type_name: str) -> float: + """ + Convert an integer bit representation back to a float. + + This reverses the operation of float_to_int_repr(). + + Args: + value: Integer with float bit pattern + type_name: "REAL" (32-bit) or "LREAL" (64-bit) + + Returns: + Float value + + Raises: + ValueError: If type_name is not REAL or LREAL + """ + canonical = get_canonical_name(type_name) + if canonical not in FLOAT_TYPES: + raise ValueError(f"Type {type_name} is not a float type (REAL/LREAL)") + + try: + if canonical == "REAL": + # 32-bit: unpack unsigned int as float + return struct.unpack("f", struct.pack("I", value))[0] + else: # LREAL + # 64-bit: unpack unsigned long long as double + return struct.unpack("d", struct.pack("Q", value))[0] + except struct.error: + # Fallback + return float(value) + + # ------------------------------------------------------------------------- + # Modbus Register Conversion + # ------------------------------------------------------------------------- + + @staticmethod + def registers_to_value(registers: List[int], type_name: str, big_endian: bool = False) -> int: + """ + Combine Modbus 16-bit registers into a single IEC value. + + Args: + registers: List of 16-bit register values + type_name: IEC type name (or size code: 'B', 'W', 'D', 'L') + big_endian: If True, use big-endian byte order + + Returns: + Combined value + + Raises: + ValueError: If insufficient registers or invalid type + """ + # Handle size codes for backward compatibility + size_code_map = {"B": "BYTE", "W": "WORD", "D": "DWORD", "L": "LWORD", "X": "BOOL"} + if type_name.upper() in size_code_map: + type_name = size_code_map[type_name.upper()] + + info = get_type_info(type_name) + if info is None: + raise ValueError(f"Unknown IEC type: {type_name}") + + # BOOL is handled separately (coils, not registers) + if info.register_count == 0: + raise ValueError(f"Type {type_name} does not use registers") + + if len(registers) < info.register_count: + raise ValueError( + f"Need at least {info.register_count} registers for {type_name}, " + f"got {len(registers)}" + ) + + if info.register_count == 1: + # 8-bit or 16-bit: single register + if info.size_bytes == 1: + return registers[0] & 0xFF + else: + return registers[0] & 0xFFFF + + elif info.register_count == 2: + # 32-bit: 2 registers + if big_endian: + value = (registers[0] << 16) | registers[1] + else: + value = (registers[1] << 16) | registers[0] + + # Apply sign conversion if needed + if info.signed and not info.is_float: + return ctypes.c_int32(value).value + return value + + elif info.register_count == 4: + # 64-bit: 4 registers + if big_endian: + value = ( + (registers[0] << 48) + | (registers[1] << 32) + | (registers[2] << 16) + | registers[3] + ) + else: + value = ( + (registers[3] << 48) + | (registers[2] << 32) + | (registers[1] << 16) + | registers[0] + ) + + # Apply sign conversion if needed + if info.signed and not info.is_float: + return ctypes.c_int64(value).value + return value + + else: + raise ValueError(f"Unsupported register count: {info.register_count}") + + @staticmethod + def value_to_registers(value: int, type_name: str, big_endian: bool = False) -> List[int]: + """ + Split an IEC value into Modbus 16-bit registers. + + Args: + value: IEC value to split + type_name: IEC type name (or size code: 'B', 'W', 'D', 'L') + big_endian: If True, use big-endian byte order + + Returns: + List of 16-bit register values + + Raises: + ValueError: If invalid type + """ + # Handle size codes for backward compatibility + size_code_map = {"B": "BYTE", "W": "WORD", "D": "DWORD", "L": "LWORD", "X": "BOOL"} + if type_name.upper() in size_code_map: + type_name = size_code_map[type_name.upper()] + + info = get_type_info(type_name) + if info is None: + raise ValueError(f"Unknown IEC type: {type_name}") + + if info.register_count == 0: + raise ValueError(f"Type {type_name} does not use registers") + + # Convert to unsigned for bit manipulation + if info.signed and value < 0: + if info.size_bytes == 4: + value = ctypes.c_uint32(value).value + elif info.size_bytes == 8: + value = ctypes.c_uint64(value).value + elif info.size_bytes == 2: + value = ctypes.c_uint16(value).value + elif info.size_bytes == 1: + value = ctypes.c_uint8(value).value + + if info.register_count == 1: + # 8-bit or 16-bit + if info.size_bytes == 1: + return [value & 0xFF] + else: + return [value & 0xFFFF] + + elif info.register_count == 2: + # 32-bit + if big_endian: + return [(value >> 16) & 0xFFFF, value & 0xFFFF] + else: + return [value & 0xFFFF, (value >> 16) & 0xFFFF] + + elif info.register_count == 4: + # 64-bit + if big_endian: + return [ + (value >> 48) & 0xFFFF, + (value >> 32) & 0xFFFF, + (value >> 16) & 0xFFFF, + value & 0xFFFF, + ] + else: + return [ + value & 0xFFFF, + (value >> 16) & 0xFFFF, + (value >> 32) & 0xFFFF, + (value >> 48) & 0xFFFF, + ] + + else: + raise ValueError(f"Unsupported register count: {info.register_count}") + + @staticmethod + def get_register_count(type_name: str) -> int: + """ + Get the number of 16-bit Modbus registers needed for a type. + + Args: + type_name: IEC type name or size code + + Returns: + Number of registers (0 for BOOL, 1 for 8/16-bit, 2 for 32-bit, 4 for 64-bit) + """ + # Handle size codes + size_code_map = {"X": 0, "B": 1, "W": 1, "D": 2, "L": 4} + if type_name.upper() in size_code_map: + return size_code_map[type_name.upper()] + + info = get_type_info(type_name) + if info: + return info.register_count + return 1 # Default fallback + + # ------------------------------------------------------------------------- + # Time Type Conversions (IEC_TIMESPEC) + # ------------------------------------------------------------------------- + + @staticmethod + def timespec_to_milliseconds(tv_sec: int, tv_nsec: int) -> int: + """ + Convert IEC_TIMESPEC (tv_sec, tv_nsec) to milliseconds. + + Args: + tv_sec: Seconds component + tv_nsec: Nanoseconds component + + Returns: + Total time in milliseconds + """ + return (tv_sec * 1000) + (tv_nsec // 1_000_000) + + @staticmethod + def milliseconds_to_timespec(ms: int) -> Tuple[int, int]: + """ + Convert milliseconds to IEC_TIMESPEC format. + + Args: + ms: Time in milliseconds + + Returns: + Tuple of (tv_sec, tv_nsec) + """ + tv_sec = ms // 1000 + tv_nsec = (ms % 1000) * 1_000_000 + return (tv_sec, tv_nsec) + + @staticmethod + def timespec_to_datetime(tv_sec: int, tv_nsec: int, time_type: str) -> datetime: + """ + Convert IEC_TIMESPEC to Python datetime. + + Args: + tv_sec: Seconds component + tv_nsec: Nanoseconds component + time_type: One of "TIME", "DATE", "TOD", "DT" + + Returns: + datetime object (UTC) + """ + canonical = get_canonical_name(time_type) + + if canonical == "TIME": + # TIME is a duration, not a point in time + # Return as datetime offset from epoch + try: + return datetime.fromtimestamp(tv_sec, tz=timezone.utc).replace( + microsecond=tv_nsec // 1000 + ) + except (OSError, OverflowError, ValueError): + return datetime(1970, 1, 1, tzinfo=timezone.utc) + + elif canonical == "TOD": + # TOD: seconds since midnight + hours = tv_sec // 3600 + minutes = (tv_sec % 3600) // 60 + seconds = tv_sec % 60 + microseconds = tv_nsec // 1000 + + today = datetime.now(timezone.utc).date() + try: + return datetime( + today.year, + today.month, + today.day, + hours % 24, # Clamp to valid range + minutes % 60, + seconds % 60, + microseconds % 1_000_000, + tzinfo=timezone.utc, + ) + except (ValueError, OverflowError): + return datetime(today.year, today.month, today.day, tzinfo=timezone.utc) + + elif canonical == "DATE": + # DATE: seconds since epoch, time portion ignored + try: + dt = datetime.fromtimestamp(tv_sec, tz=timezone.utc) + return dt.replace(hour=0, minute=0, second=0, microsecond=0) + except (OSError, OverflowError, ValueError): + return datetime(1970, 1, 1, tzinfo=timezone.utc) + + elif canonical == "DT": + # DT: full date and time from epoch + try: + dt = datetime.fromtimestamp(tv_sec, tz=timezone.utc) + return dt.replace(microsecond=tv_nsec // 1000) + except (OSError, OverflowError, ValueError): + return datetime(1970, 1, 1, tzinfo=timezone.utc) + + else: + # Unknown time type, treat as epoch timestamp + try: + return datetime.fromtimestamp(tv_sec, tz=timezone.utc) + except (OSError, OverflowError, ValueError): + return datetime(1970, 1, 1, tzinfo=timezone.utc) + + @staticmethod + def datetime_to_timespec(dt: datetime, time_type: str) -> Tuple[int, int]: + """ + Convert Python datetime to IEC_TIMESPEC. + + Args: + dt: datetime object + time_type: One of "TIME", "DATE", "TOD", "DT" + + Returns: + Tuple of (tv_sec, tv_nsec) + """ + canonical = get_canonical_name(time_type) + + if canonical == "TOD": + # TOD: extract time portion only (seconds since midnight) + tv_sec = dt.hour * 3600 + dt.minute * 60 + dt.second + tv_nsec = dt.microsecond * 1000 + return (tv_sec, tv_nsec) + + elif canonical == "DATE": + # DATE: midnight of the date + dt_midnight = dt.replace(hour=0, minute=0, second=0, microsecond=0) + tv_sec = int(dt_midnight.timestamp()) + return (tv_sec, 0) + + else: + # TIME, DT: full timestamp + tv_sec = int(dt.timestamp()) + tv_nsec = dt.microsecond * 1000 + return (tv_sec, tv_nsec) + + # ------------------------------------------------------------------------- + # Default Values + # ------------------------------------------------------------------------- + + @staticmethod + def get_default_value(type_name: str) -> Any: + """ + Get the default/safe value for an IEC type. + + Args: + type_name: IEC type name + + Returns: + Appropriate default value for the type + """ + canonical = get_canonical_name(type_name) + if canonical is None: + return 0 + + info = get_type_info(canonical) + if info is None: + return 0 + + if canonical == "BOOL": + return False + elif info.is_float: + return 0.0 + elif info.is_string: + return "" + elif info.is_time: + return (0, 0) # IEC_TIMESPEC + else: + return 0 + + @staticmethod + def get_default_value_for_protocol(type_name: str, protocol: str) -> Any: + """ + Get the default value formatted for a specific protocol. + + Args: + type_name: IEC type name + protocol: Protocol name ("opcua", "modbus", "s7comm") + + Returns: + Protocol-appropriate default value + """ + canonical = get_canonical_name(type_name) + if canonical is None: + return 0 + + info = get_type_info(canonical) + if info is None: + return 0 + + if protocol.lower() == "opcua": + if canonical == "BOOL": + return False + elif info.is_float: + return 0.0 + elif info.is_string: + return "" + elif canonical == "TIME": + return 0 # Milliseconds for OPC-UA + elif canonical in ("DATE", "TOD", "DT"): + return datetime(1970, 1, 1, tzinfo=timezone.utc) + else: + return 0 + + elif protocol.lower() == "modbus": + # Modbus always uses integers/register values + if canonical == "BOOL": + return 0 + else: + return 0 + + else: + return IECTypeConverter.get_default_value(type_name) + + # ------------------------------------------------------------------------- + # Utility Methods + # ------------------------------------------------------------------------- + + @staticmethod + def is_type_signed(type_name: str) -> bool: + """ + Check if an IEC type is signed. + + Args: + type_name: IEC type name + + Returns: + True if signed, False otherwise + """ + info = get_type_info(type_name) + return info.signed if info else False + + @staticmethod + def get_size_bytes(type_name: str) -> int: + """ + Get the size in bytes for an IEC type. + + Args: + type_name: IEC type name + + Returns: + Size in bytes, or 0 if type not found + """ + info = get_type_info(type_name) + return info.size_bytes if info else 0 + + @staticmethod + def swap_endianness_16(value: int) -> int: + """Swap bytes in a 16-bit value.""" + return ((value & 0xFF) << 8) | ((value >> 8) & 0xFF) + + @staticmethod + def swap_endianness_32(value: int) -> int: + """Swap bytes in a 32-bit value.""" + return ( + ((value & 0x000000FF) << 24) + | ((value & 0x0000FF00) << 8) + | ((value & 0x00FF0000) >> 8) + | ((value & 0xFF000000) >> 24) + ) + + @staticmethod + def swap_endianness_64(value: int) -> int: + """Swap bytes in a 64-bit value.""" + return ( + ((value & 0x00000000000000FF) << 56) + | ((value & 0x000000000000FF00) << 40) + | ((value & 0x0000000000FF0000) << 24) + | ((value & 0x00000000FF000000) << 8) + | ((value & 0x000000FF00000000) >> 8) + | ((value & 0x0000FF0000000000) >> 24) + | ((value & 0x00FF000000000000) >> 40) + | ((value & 0xFF00000000000000) >> 56) + ) + + @staticmethod + def swap_endianness(value: int, type_name: str) -> int: + """ + Swap endianness of a value based on its type size. + + Args: + value: Value to swap + type_name: IEC type name + + Returns: + Value with swapped endianness + """ + info = get_type_info(type_name) + if info is None: + return value + + if info.size_bytes == 2: + return IECTypeConverter.swap_endianness_16(value) + elif info.size_bytes == 4: + return IECTypeConverter.swap_endianness_32(value) + elif info.size_bytes == 8: + return IECTypeConverter.swap_endianness_64(value) + else: + return value diff --git a/core/src/drivers/plugins/python/shared/iec_type_registry.py b/core/src/drivers/plugins/python/shared/iec_type_registry.py new file mode 100644 index 00000000..6610b994 --- /dev/null +++ b/core/src/drivers/plugins/python/shared/iec_type_registry.py @@ -0,0 +1,601 @@ +""" +IEC 61131-3 Type Registry + +This module provides a centralized registry of all IEC 61131-3 data types with their +metadata. It serves as the single source of truth for type information used by all +plugins (OPC-UA, Modbus, S7Comm, EtherCAT, etc.). + +The registry eliminates the need for each plugin to maintain its own type definitions, +ensuring consistency across the entire plugin ecosystem. + +Supported types (22 total): +- Boolean: BOOL +- Integer 8-bit: SINT, USINT, BYTE +- Integer 16-bit: INT, UINT, WORD +- Integer 32-bit: DINT, UDINT, DWORD +- Integer 64-bit: LINT, ULINT, LWORD +- Floating point: REAL, LREAL +- Time types: TIME, DATE, TOD, DT +- String: STRING +""" + +import ctypes +from dataclasses import dataclass +from typing import Dict, FrozenSet, Optional, Tuple, Type + + +@dataclass(frozen=True) +class IECTypeInfo: + """ + Immutable metadata for an IEC 61131-3 data type. + + Attributes: + name: Canonical type name (e.g., "SINT", "INT", "DINT") + size_bits: Size in bits (1, 8, 16, 32, 64) + size_bytes: Size in bytes (1, 2, 4, 8) + signed: True for signed types (SINT, INT, DINT, LINT) + min_value: Minimum valid value + max_value: Maximum valid value + ctype_class: Corresponding ctypes class for memory operations + is_float: True for REAL and LREAL + is_time: True for TIME, DATE, TOD, DT + is_string: True for STRING + register_count: Number of 16-bit Modbus registers needed (0 for BOOL) + iec_size_code: IEC size code ('X'=bit, 'B'=byte, 'W'=word, 'D'=dword, 'L'=lword) + aliases: Tuple of alternative names for this type + """ + + name: str + size_bits: int + size_bytes: int + signed: bool + min_value: int + max_value: int + ctype_class: Type + is_float: bool + is_time: bool + is_string: bool + register_count: int + iec_size_code: str + aliases: Tuple[str, ...] + + +# Type bounds constants for clarity +_SINT_MIN, _SINT_MAX = -128, 127 +_USINT_MIN, _USINT_MAX = 0, 255 +_INT_MIN, _INT_MAX = -32768, 32767 +_UINT_MIN, _UINT_MAX = 0, 65535 +_DINT_MIN, _DINT_MAX = -2147483648, 2147483647 +_UDINT_MIN, _UDINT_MAX = 0, 4294967295 +_LINT_MIN, _LINT_MAX = -9223372036854775808, 9223372036854775807 +_ULINT_MIN, _ULINT_MAX = 0, 18446744073709551615 + + +# Complete IEC 61131-3 Type Registry +_IEC_TYPE_REGISTRY: Dict[str, IECTypeInfo] = { + # Boolean type + "BOOL": IECTypeInfo( + name="BOOL", + size_bits=1, + size_bytes=1, + signed=False, + min_value=0, + max_value=1, + ctype_class=ctypes.c_uint8, + is_float=False, + is_time=False, + is_string=False, + register_count=0, # Handled via coils, not registers + iec_size_code="X", + aliases=(), + ), + # 8-bit signed integer + "SINT": IECTypeInfo( + name="SINT", + size_bits=8, + size_bytes=1, + signed=True, + min_value=_SINT_MIN, + max_value=_SINT_MAX, + ctype_class=ctypes.c_int8, + is_float=False, + is_time=False, + is_string=False, + register_count=1, + iec_size_code="B", + aliases=("INT8",), + ), + # 8-bit unsigned integer + "USINT": IECTypeInfo( + name="USINT", + size_bits=8, + size_bytes=1, + signed=False, + min_value=_USINT_MIN, + max_value=_USINT_MAX, + ctype_class=ctypes.c_uint8, + is_float=False, + is_time=False, + is_string=False, + register_count=1, + iec_size_code="B", + aliases=("UINT8",), + ), + # 8-bit unsigned (bit string semantics) + "BYTE": IECTypeInfo( + name="BYTE", + size_bits=8, + size_bytes=1, + signed=False, + min_value=_USINT_MIN, + max_value=_USINT_MAX, + ctype_class=ctypes.c_uint8, + is_float=False, + is_time=False, + is_string=False, + register_count=1, + iec_size_code="B", + aliases=(), + ), + # 16-bit signed integer + "INT": IECTypeInfo( + name="INT", + size_bits=16, + size_bytes=2, + signed=True, + min_value=_INT_MIN, + max_value=_INT_MAX, + ctype_class=ctypes.c_int16, + is_float=False, + is_time=False, + is_string=False, + register_count=1, + iec_size_code="W", + aliases=("INT16",), + ), + # 16-bit unsigned integer + "UINT": IECTypeInfo( + name="UINT", + size_bits=16, + size_bytes=2, + signed=False, + min_value=_UINT_MIN, + max_value=_UINT_MAX, + ctype_class=ctypes.c_uint16, + is_float=False, + is_time=False, + is_string=False, + register_count=1, + iec_size_code="W", + aliases=("UINT16",), + ), + # 16-bit unsigned (bit string semantics) + "WORD": IECTypeInfo( + name="WORD", + size_bits=16, + size_bytes=2, + signed=False, + min_value=_UINT_MIN, + max_value=_UINT_MAX, + ctype_class=ctypes.c_uint16, + is_float=False, + is_time=False, + is_string=False, + register_count=1, + iec_size_code="W", + aliases=(), + ), + # 32-bit signed integer + "DINT": IECTypeInfo( + name="DINT", + size_bits=32, + size_bytes=4, + signed=True, + min_value=_DINT_MIN, + max_value=_DINT_MAX, + ctype_class=ctypes.c_int32, + is_float=False, + is_time=False, + is_string=False, + register_count=2, + iec_size_code="D", + aliases=("INT32",), + ), + # 32-bit unsigned integer + "UDINT": IECTypeInfo( + name="UDINT", + size_bits=32, + size_bytes=4, + signed=False, + min_value=_UDINT_MIN, + max_value=_UDINT_MAX, + ctype_class=ctypes.c_uint32, + is_float=False, + is_time=False, + is_string=False, + register_count=2, + iec_size_code="D", + aliases=("UINT32",), + ), + # 32-bit unsigned (bit string semantics) + "DWORD": IECTypeInfo( + name="DWORD", + size_bits=32, + size_bytes=4, + signed=False, + min_value=_UDINT_MIN, + max_value=_UDINT_MAX, + ctype_class=ctypes.c_uint32, + is_float=False, + is_time=False, + is_string=False, + register_count=2, + iec_size_code="D", + aliases=(), + ), + # 64-bit signed integer + "LINT": IECTypeInfo( + name="LINT", + size_bits=64, + size_bytes=8, + signed=True, + min_value=_LINT_MIN, + max_value=_LINT_MAX, + ctype_class=ctypes.c_int64, + is_float=False, + is_time=False, + is_string=False, + register_count=4, + iec_size_code="L", + aliases=("INT64",), + ), + # 64-bit unsigned integer + "ULINT": IECTypeInfo( + name="ULINT", + size_bits=64, + size_bytes=8, + signed=False, + min_value=_ULINT_MIN, + max_value=_ULINT_MAX, + ctype_class=ctypes.c_uint64, + is_float=False, + is_time=False, + is_string=False, + register_count=4, + iec_size_code="L", + aliases=("UINT64",), + ), + # 64-bit unsigned (bit string semantics) + "LWORD": IECTypeInfo( + name="LWORD", + size_bits=64, + size_bytes=8, + signed=False, + min_value=_ULINT_MIN, + max_value=_ULINT_MAX, + ctype_class=ctypes.c_uint64, + is_float=False, + is_time=False, + is_string=False, + register_count=4, + iec_size_code="L", + aliases=(), + ), + # 32-bit floating point + "REAL": IECTypeInfo( + name="REAL", + size_bits=32, + size_bytes=4, + signed=True, # Floats are inherently signed + min_value=_DINT_MIN, # Integer representation bounds + max_value=_DINT_MAX, + ctype_class=ctypes.c_float, + is_float=True, + is_time=False, + is_string=False, + register_count=2, + iec_size_code="D", + aliases=("FLOAT",), + ), + # 64-bit floating point + "LREAL": IECTypeInfo( + name="LREAL", + size_bits=64, + size_bytes=8, + signed=True, # Floats are inherently signed + min_value=_LINT_MIN, # Integer representation bounds + max_value=_LINT_MAX, + ctype_class=ctypes.c_double, + is_float=True, + is_time=False, + is_string=False, + register_count=4, + iec_size_code="L", + aliases=("DOUBLE",), + ), + # Time duration (stored as IEC_TIMESPEC: tv_sec + tv_nsec) + "TIME": IECTypeInfo( + name="TIME", + size_bits=64, + size_bytes=8, + signed=True, + min_value=_LINT_MIN, + max_value=_LINT_MAX, + ctype_class=ctypes.c_int64, + is_float=False, + is_time=True, + is_string=False, + register_count=4, + iec_size_code="L", + aliases=(), + ), + # Date (stored as IEC_TIMESPEC: seconds since epoch) + "DATE": IECTypeInfo( + name="DATE", + size_bits=64, + size_bytes=8, + signed=True, + min_value=_LINT_MIN, + max_value=_LINT_MAX, + ctype_class=ctypes.c_int64, + is_float=False, + is_time=True, + is_string=False, + register_count=4, + iec_size_code="L", + aliases=("D",), + ), + # Time of day (stored as IEC_TIMESPEC: seconds since midnight) + "TOD": IECTypeInfo( + name="TOD", + size_bits=64, + size_bytes=8, + signed=True, + min_value=_LINT_MIN, + max_value=_LINT_MAX, + ctype_class=ctypes.c_int64, + is_float=False, + is_time=True, + is_string=False, + register_count=4, + iec_size_code="L", + aliases=("TIME_OF_DAY",), + ), + # Date and time (stored as IEC_TIMESPEC: seconds since epoch) + "DT": IECTypeInfo( + name="DT", + size_bits=64, + size_bytes=8, + signed=True, + min_value=_LINT_MIN, + max_value=_LINT_MAX, + ctype_class=ctypes.c_int64, + is_float=False, + is_time=True, + is_string=False, + register_count=4, + iec_size_code="L", + aliases=("DATE_AND_TIME",), + ), + # Variable-length string (IEC_STRING: 1 byte len + 126 bytes body) + "STRING": IECTypeInfo( + name="STRING", + size_bits=127 * 8, + size_bytes=127, + signed=False, + min_value=0, + max_value=126, # Max string length + ctype_class=ctypes.c_char, + is_float=False, + is_time=False, + is_string=True, + register_count=64, # 127 bytes = 64 registers (rounded up) + iec_size_code="S", + aliases=(), + ), +} + +# Build alias lookup table +_ALIAS_TO_CANONICAL: Dict[str, str] = {} +for _type_name, _type_info in _IEC_TYPE_REGISTRY.items(): + for alias in _type_info.aliases: + _ALIAS_TO_CANONICAL[alias.upper()] = _type_name + +# Frozen sets for type categories +INTEGER_TYPES: FrozenSet[str] = frozenset( + [ + "SINT", + "USINT", + "BYTE", + "INT", + "UINT", + "WORD", + "DINT", + "UDINT", + "DWORD", + "LINT", + "ULINT", + "LWORD", + ] +) + +SIGNED_INTEGER_TYPES: FrozenSet[str] = frozenset(["SINT", "INT", "DINT", "LINT"]) + +UNSIGNED_INTEGER_TYPES: FrozenSet[str] = frozenset( + ["USINT", "BYTE", "UINT", "WORD", "UDINT", "DWORD", "ULINT", "LWORD"] +) + +FLOAT_TYPES: FrozenSet[str] = frozenset(["REAL", "LREAL"]) + +TIME_TYPES: FrozenSet[str] = frozenset(["TIME", "DATE", "TOD", "DT"]) + +BIT_STRING_TYPES: FrozenSet[str] = frozenset(["BYTE", "WORD", "DWORD", "LWORD"]) + +ALL_TYPES: FrozenSet[str] = frozenset(_IEC_TYPE_REGISTRY.keys()) + + +def get_type_info(type_name: str) -> Optional[IECTypeInfo]: + """ + Get type information for an IEC 61131-3 type. + + Args: + type_name: Type name (case-insensitive), can be canonical or alias + + Returns: + IECTypeInfo if found, None otherwise + """ + upper_name = type_name.upper() + + # Try direct lookup first + if upper_name in _IEC_TYPE_REGISTRY: + return _IEC_TYPE_REGISTRY[upper_name] + + # Try alias lookup + canonical = _ALIAS_TO_CANONICAL.get(upper_name) + if canonical: + return _IEC_TYPE_REGISTRY[canonical] + + return None + + +def get_canonical_name(type_name: str) -> Optional[str]: + """ + Get the canonical name for a type (resolves aliases). + + Args: + type_name: Type name (case-insensitive), can be canonical or alias + + Returns: + Canonical type name if found, None otherwise + """ + upper_name = type_name.upper() + + if upper_name in _IEC_TYPE_REGISTRY: + return upper_name + + return _ALIAS_TO_CANONICAL.get(upper_name) + + +def is_valid_type(type_name: str) -> bool: + """ + Check if a type name is valid (canonical or alias). + + Args: + type_name: Type name to check (case-insensitive) + + Returns: + True if valid, False otherwise + """ + upper_name = type_name.upper() + return upper_name in _IEC_TYPE_REGISTRY or upper_name in _ALIAS_TO_CANONICAL + + +def get_type_bounds(type_name: str) -> Optional[Tuple[int, int]]: + """ + Get the value bounds for a type. + + Args: + type_name: Type name (case-insensitive) + + Returns: + Tuple of (min_value, max_value) if found, None otherwise + """ + info = get_type_info(type_name) + if info: + return (info.min_value, info.max_value) + return None + + +def get_register_count(type_name: str) -> int: + """ + Get the number of 16-bit Modbus registers needed for a type. + + Args: + type_name: Type name (case-insensitive) + + Returns: + Number of registers, or 0 if type not found or is BOOL + """ + info = get_type_info(type_name) + if info: + return info.register_count + return 0 + + +def get_size_code(type_name: str) -> Optional[str]: + """ + Get the IEC size code for a type. + + Args: + type_name: Type name (case-insensitive) + + Returns: + Size code ('X', 'B', 'W', 'D', 'L', 'S') or None if not found + """ + info = get_type_info(type_name) + if info: + return info.iec_size_code + return None + + +def get_ctype_class(type_name: str) -> Optional[Type]: + """ + Get the ctypes class for a type. + + Args: + type_name: Type name (case-insensitive) + + Returns: + ctypes class or None if not found + """ + info = get_type_info(type_name) + if info: + return info.ctype_class + return None + + +def get_all_types() -> Dict[str, IECTypeInfo]: + """ + Get a copy of the complete type registry. + + Returns: + Dictionary mapping type names to IECTypeInfo objects + """ + return _IEC_TYPE_REGISTRY.copy() + + +def get_types_by_size(size_bytes: int) -> FrozenSet[str]: + """ + Get all types with a specific size in bytes. + + Args: + size_bytes: Size in bytes (1, 2, 4, 8, 127) + + Returns: + Frozen set of type names with that size + """ + return frozenset( + name for name, info in _IEC_TYPE_REGISTRY.items() if info.size_bytes == size_bytes + ) + + +def size_code_to_type_name(size_code: str) -> Optional[str]: + """ + Get a representative type name for an IEC size code. + + This is useful for Modbus operations that use size codes. + + Args: + size_code: IEC size code ('X', 'B', 'W', 'D', 'L') + + Returns: + Representative type name (unsigned variant) or None if invalid + """ + mapping = { + "X": "BOOL", + "B": "BYTE", + "W": "WORD", + "D": "DWORD", + "L": "LWORD", + } + return mapping.get(size_code.upper()) diff --git a/tests/unit/test_iec_type_converter.py b/tests/unit/test_iec_type_converter.py new file mode 100644 index 00000000..5589d249 --- /dev/null +++ b/tests/unit/test_iec_type_converter.py @@ -0,0 +1,466 @@ +""" +Unit tests for IEC 61131-3 Type Converter + +Tests the iec_type_converter module which provides centralized type conversion +functions for all IEC 61131-3 data types. +""" + +import sys +from datetime import datetime, timezone +from pathlib import Path + +import pytest + +# Add shared module to path +shared_path = Path(__file__).parent.parent.parent / "core/src/drivers/plugins/python/shared" +sys.path.insert(0, str(shared_path)) + +from iec_type_converter import IECTypeConverter + + +class TestClampToType: + """Tests for value clamping functionality.""" + + @pytest.mark.parametrize( + "value,type_name,expected", + [ + # SINT bounds (-128 to 127) + (0, "SINT", 0), + (127, "SINT", 127), + (128, "SINT", 127), # Clamped to max + (-128, "SINT", -128), + (-129, "SINT", -128), # Clamped to min + (1000, "SINT", 127), + # USINT/BYTE bounds (0 to 255) + (0, "USINT", 0), + (255, "USINT", 255), + (256, "USINT", 255), # Clamped + (-1, "USINT", 0), # Clamped + (0, "BYTE", 0), + (255, "BYTE", 255), + # INT bounds (-32768 to 32767) + (0, "INT", 0), + (32767, "INT", 32767), + (32768, "INT", 32767), # Clamped + (-32768, "INT", -32768), + (-32769, "INT", -32768), # Clamped + # UINT/WORD bounds (0 to 65535) + (0, "UINT", 0), + (65535, "UINT", 65535), + (65536, "UINT", 65535), # Clamped + (-1, "WORD", 0), # Clamped + # DINT bounds (-2147483648 to 2147483647) + (0, "DINT", 0), + (2147483647, "DINT", 2147483647), + (-2147483648, "DINT", -2147483648), + # UDINT/DWORD bounds (0 to 4294967295) + (0, "UDINT", 0), + (4294967295, "UDINT", 4294967295), + (4294967296, "DWORD", 4294967295), # Clamped + # LINT bounds + (0, "LINT", 0), + (9223372036854775807, "LINT", 9223372036854775807), + (-9223372036854775808, "LINT", -9223372036854775808), + # ULINT/LWORD bounds + (0, "ULINT", 0), + (18446744073709551615, "ULINT", 18446744073709551615), + (-1, "LWORD", 0), # Clamped + ], + ) + def test_clamp_integer_types(self, value, type_name, expected): + """Test clamping for all integer types.""" + result = IECTypeConverter.clamp_to_type(value, type_name) + assert result == expected + + def test_clamp_signed_conversion(self): + """Verify signed values are correctly represented.""" + # -1 as SINT should stay -1, not become 255 + result = IECTypeConverter.clamp_to_type(-1, "SINT") + assert result == -1 + + # -1 as INT should stay -1 + result = IECTypeConverter.clamp_to_type(-1, "INT") + assert result == -1 + + def test_clamp_case_insensitive(self): + """Verify type name is case-insensitive.""" + assert IECTypeConverter.clamp_to_type(100, "sint") == 100 + assert IECTypeConverter.clamp_to_type(100, "SINT") == 100 + assert IECTypeConverter.clamp_to_type(100, "SiNt") == 100 + + def test_clamp_alias_support(self): + """Verify type aliases work correctly.""" + # INT32 is alias for DINT + assert IECTypeConverter.clamp_to_type(2147483648, "INT32") == 2147483647 + + def test_clamp_unknown_type_raises(self): + """Verify ValueError for unknown types.""" + with pytest.raises(ValueError, match="Unknown IEC type"): + IECTypeConverter.clamp_to_type(100, "UNKNOWN_TYPE") + + def test_clamp_float_returns_float(self): + """Verify REAL/LREAL return float values.""" + result = IECTypeConverter.clamp_to_type(3.14, "REAL") + assert isinstance(result, float) + assert result == 3.14 + + result = IECTypeConverter.clamp_to_type(3.14159265358979, "LREAL") + assert isinstance(result, float) + + def test_clamp_string_returns_string(self): + """Verify STRING type returns string.""" + result = IECTypeConverter.clamp_to_type(123, "STRING") + assert result == "123" + + +class TestCoerceToType: + """Tests for value coercion functionality.""" + + def test_coerce_bool_from_various_types(self): + """Test BOOL coercion from various input types.""" + assert IECTypeConverter.coerce_to_type(True, "BOOL") == 1 + assert IECTypeConverter.coerce_to_type(False, "BOOL") == 0 + assert IECTypeConverter.coerce_to_type(1, "BOOL") == 1 + assert IECTypeConverter.coerce_to_type(0, "BOOL") == 0 + assert IECTypeConverter.coerce_to_type(100, "BOOL") == 1 + assert IECTypeConverter.coerce_to_type("true", "BOOL") == 1 + assert IECTypeConverter.coerce_to_type("false", "BOOL") == 0 + assert IECTypeConverter.coerce_to_type("yes", "BOOL") == 1 + assert IECTypeConverter.coerce_to_type("no", "BOOL") == 0 + + +class TestFloatConversion: + """Tests for float <-> integer bit representation conversion.""" + + def test_real_to_int_repr(self): + """Test REAL to integer representation.""" + # 3.14 as 32-bit float + int_repr = IECTypeConverter.float_to_int_repr(3.14, "REAL") + # Verify it can be converted back + back = IECTypeConverter.int_repr_to_float(int_repr, "REAL") + assert abs(back - 3.14) < 0.0001 + + def test_lreal_to_int_repr(self): + """Test LREAL to integer representation.""" + # Pi as 64-bit double + int_repr = IECTypeConverter.float_to_int_repr(3.141592653589793, "LREAL") + back = IECTypeConverter.int_repr_to_float(int_repr, "LREAL") + assert abs(back - 3.141592653589793) < 1e-10 + + def test_real_zero(self): + """Test zero value for REAL.""" + int_repr = IECTypeConverter.float_to_int_repr(0.0, "REAL") + back = IECTypeConverter.int_repr_to_float(int_repr, "REAL") + assert back == 0.0 + + def test_negative_real(self): + """Test negative float value.""" + int_repr = IECTypeConverter.float_to_int_repr(-123.456, "REAL") + back = IECTypeConverter.int_repr_to_float(int_repr, "REAL") + assert abs(back - (-123.456)) < 0.001 + + def test_float_conversion_non_float_type_raises(self): + """Verify error for non-float types.""" + with pytest.raises(ValueError, match="not a float type"): + IECTypeConverter.float_to_int_repr(3.14, "INT") + + with pytest.raises(ValueError, match="not a float type"): + IECTypeConverter.int_repr_to_float(12345, "INT") + + +class TestRegisterConversion: + """Tests for Modbus register conversion.""" + + def test_single_register_byte(self): + """Test 8-bit value from single register.""" + # BYTE: lower 8 bits of register + result = IECTypeConverter.registers_to_value([0x12AB], "BYTE") + assert result == 0xAB + + def test_single_register_word(self): + """Test 16-bit value from single register.""" + result = IECTypeConverter.registers_to_value([0x1234], "WORD") + assert result == 0x1234 + + def test_two_registers_dword_little_endian(self): + """Test 32-bit value from two registers (little-endian).""" + # Little-endian: [low, high] + result = IECTypeConverter.registers_to_value([0x5678, 0x1234], "DWORD", big_endian=False) + assert result == 0x12345678 + + def test_two_registers_dword_big_endian(self): + """Test 32-bit value from two registers (big-endian).""" + # Big-endian: [high, low] + result = IECTypeConverter.registers_to_value([0x1234, 0x5678], "DWORD", big_endian=True) + assert result == 0x12345678 + + def test_four_registers_lword_little_endian(self): + """Test 64-bit value from four registers (little-endian).""" + result = IECTypeConverter.registers_to_value( + [0x0123, 0x4567, 0x89AB, 0xCDEF], + "LWORD", + big_endian=False, + ) + # Little-endian: reg[0] is lowest + expected = 0xCDEF89AB45670123 + assert result == expected + + def test_four_registers_lword_big_endian(self): + """Test 64-bit value from four registers (big-endian).""" + result = IECTypeConverter.registers_to_value( + [0x0123, 0x4567, 0x89AB, 0xCDEF], "LWORD", big_endian=True + ) + # Big-endian: reg[0] is highest + expected = 0x0123456789ABCDEF + assert result == expected + + def test_signed_dint_conversion(self): + """Test signed 32-bit value handling.""" + # -1 as unsigned 32-bit is 0xFFFFFFFF + result = IECTypeConverter.registers_to_value([0xFFFF, 0xFFFF], "DINT", big_endian=False) + assert result == -1 + + def test_signed_lint_conversion(self): + """Test signed 64-bit value handling.""" + # -1 as unsigned 64-bit + result = IECTypeConverter.registers_to_value( + [0xFFFF, 0xFFFF, 0xFFFF, 0xFFFF], "LINT", big_endian=False + ) + assert result == -1 + + def test_size_code_support(self): + """Test backward compatibility with size codes.""" + # 'W' is WORD + result = IECTypeConverter.registers_to_value([0x1234], "W") + assert result == 0x1234 + + # 'D' is DWORD + result = IECTypeConverter.registers_to_value([0x5678, 0x1234], "D", big_endian=False) + assert result == 0x12345678 + + def test_insufficient_registers_raises(self): + """Verify error when not enough registers provided.""" + with pytest.raises(ValueError, match="Need at least 2 registers"): + IECTypeConverter.registers_to_value([0x1234], "DWORD") + + with pytest.raises(ValueError, match="Need at least 4 registers"): + IECTypeConverter.registers_to_value([0x1234, 0x5678], "LWORD") + + +class TestValueToRegisters: + """Tests for value to Modbus register conversion.""" + + def test_byte_to_register(self): + """Test 8-bit value to single register.""" + result = IECTypeConverter.value_to_registers(0xAB, "BYTE") + assert result == [0xAB] + + def test_word_to_register(self): + """Test 16-bit value to single register.""" + result = IECTypeConverter.value_to_registers(0x1234, "WORD") + assert result == [0x1234] + + def test_dword_to_registers_little_endian(self): + """Test 32-bit value to two registers (little-endian).""" + result = IECTypeConverter.value_to_registers(0x12345678, "DWORD", big_endian=False) + assert result == [0x5678, 0x1234] + + def test_dword_to_registers_big_endian(self): + """Test 32-bit value to two registers (big-endian).""" + result = IECTypeConverter.value_to_registers(0x12345678, "DWORD", big_endian=True) + assert result == [0x1234, 0x5678] + + def test_lword_to_registers_little_endian(self): + """Test 64-bit value to four registers (little-endian).""" + result = IECTypeConverter.value_to_registers(0x0123456789ABCDEF, "LWORD", big_endian=False) + assert result == [0xCDEF, 0x89AB, 0x4567, 0x0123] + + def test_lword_to_registers_big_endian(self): + """Test 64-bit value to four registers (big-endian).""" + result = IECTypeConverter.value_to_registers(0x0123456789ABCDEF, "LWORD", big_endian=True) + assert result == [0x0123, 0x4567, 0x89AB, 0xCDEF] + + def test_signed_negative_value(self): + """Test negative signed value conversion.""" + # -1 as DINT + result = IECTypeConverter.value_to_registers(-1, "DINT", big_endian=False) + assert result == [0xFFFF, 0xFFFF] + + def test_roundtrip_conversion(self): + """Verify value -> registers -> value roundtrip.""" + original = 0x12345678 + registers = IECTypeConverter.value_to_registers(original, "DWORD", big_endian=True) + recovered = IECTypeConverter.registers_to_value(registers, "DWORD", big_endian=True) + assert recovered == original + + def test_roundtrip_signed(self): + """Verify signed value roundtrip.""" + original = -12345 + registers = IECTypeConverter.value_to_registers(original, "DINT", big_endian=False) + recovered = IECTypeConverter.registers_to_value(registers, "DINT", big_endian=False) + assert recovered == original + + +class TestTimeConversion: + """Tests for TIME type conversions.""" + + def test_timespec_to_milliseconds(self): + """Test IEC_TIMESPEC to milliseconds conversion.""" + # 1 second, 500 million nanoseconds = 1500 ms + result = IECTypeConverter.timespec_to_milliseconds(1, 500_000_000) + assert result == 1500 + + # 0 seconds, 0 nanoseconds = 0 ms + result = IECTypeConverter.timespec_to_milliseconds(0, 0) + assert result == 0 + + # 10 seconds, 0 nanoseconds = 10000 ms + result = IECTypeConverter.timespec_to_milliseconds(10, 0) + assert result == 10000 + + def test_milliseconds_to_timespec(self): + """Test milliseconds to IEC_TIMESPEC conversion.""" + # 1500 ms = 1 second, 500 million nanoseconds + tv_sec, tv_nsec = IECTypeConverter.milliseconds_to_timespec(1500) + assert tv_sec == 1 + assert tv_nsec == 500_000_000 + + # 0 ms + tv_sec, tv_nsec = IECTypeConverter.milliseconds_to_timespec(0) + assert tv_sec == 0 + assert tv_nsec == 0 + + def test_timespec_to_datetime_tod(self): + """Test TOD (time of day) conversion.""" + # 3600 seconds = 1 hour since midnight + dt = IECTypeConverter.timespec_to_datetime(3600, 0, "TOD") + assert dt.hour == 1 + assert dt.minute == 0 + assert dt.second == 0 + + def test_timespec_to_datetime_date(self): + """Test DATE conversion.""" + # Epoch timestamp for 2024-01-01 + timestamp = 1704067200 # 2024-01-01 00:00:00 UTC + dt = IECTypeConverter.timespec_to_datetime(timestamp, 0, "DATE") + assert dt.year == 2024 + assert dt.month == 1 + assert dt.day == 1 + assert dt.hour == 0 # Time should be zeroed + + def test_timespec_to_datetime_dt(self): + """Test DT (date and time) conversion.""" + # Epoch timestamp with microseconds + timestamp = 1704067200 # 2024-01-01 00:00:00 UTC + dt = IECTypeConverter.timespec_to_datetime(timestamp, 123_000_000, "DT") + assert dt.year == 2024 + assert dt.microsecond == 123000 + + def test_datetime_to_timespec_tod(self): + """Test datetime to TOD conversion.""" + dt = datetime(2024, 1, 1, 13, 30, 45, 500000, tzinfo=timezone.utc) + tv_sec, tv_nsec = IECTypeConverter.datetime_to_timespec(dt, "TOD") + # 13:30:45 = 13*3600 + 30*60 + 45 = 48645 seconds since midnight + assert tv_sec == 48645 + assert tv_nsec == 500_000_000 # 500000 microseconds = 500M nanoseconds + + def test_datetime_to_timespec_date(self): + """Test datetime to DATE conversion.""" + dt = datetime(2024, 1, 1, 13, 30, 45, tzinfo=timezone.utc) + tv_sec, tv_nsec = IECTypeConverter.datetime_to_timespec(dt, "DATE") + # Should be midnight of that day + expected_dt = datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + assert tv_sec == int(expected_dt.timestamp()) + assert tv_nsec == 0 + + +class TestDefaultValues: + """Tests for default value generation.""" + + @pytest.mark.parametrize( + "type_name,expected", + [ + ("BOOL", False), + ("SINT", 0), + ("INT", 0), + ("DINT", 0), + ("REAL", 0.0), + ("LREAL", 0.0), + ("STRING", ""), + ("TIME", (0, 0)), + ("DATE", (0, 0)), + ("TOD", (0, 0)), + ("DT", (0, 0)), + ], + ) + def test_default_values(self, type_name, expected): + """Verify correct default values for all types.""" + result = IECTypeConverter.get_default_value(type_name) + assert result == expected + + def test_protocol_default_opcua_time(self): + """Test OPC-UA protocol default for TIME types.""" + # TIME in OPC-UA is milliseconds + result = IECTypeConverter.get_default_value_for_protocol("TIME", "opcua") + assert result == 0 + + # DATE/TOD/DT in OPC-UA is DateTime + result = IECTypeConverter.get_default_value_for_protocol("DATE", "opcua") + assert isinstance(result, datetime) + + +class TestEndianSwap: + """Tests for endianness swapping.""" + + def test_swap_16(self): + """Test 16-bit endianness swap.""" + assert IECTypeConverter.swap_endianness_16(0x1234) == 0x3412 + + def test_swap_32(self): + """Test 32-bit endianness swap.""" + assert IECTypeConverter.swap_endianness_32(0x12345678) == 0x78563412 + + def test_swap_64(self): + """Test 64-bit endianness swap.""" + assert IECTypeConverter.swap_endianness_64(0x0123456789ABCDEF) == 0xEFCDAB8967452301 + + def test_swap_by_type(self): + """Test endianness swap based on type.""" + assert IECTypeConverter.swap_endianness(0x1234, "INT") == 0x3412 + assert IECTypeConverter.swap_endianness(0x12345678, "DINT") == 0x78563412 + + +class TestUtilityMethods: + """Tests for utility methods.""" + + def test_is_type_signed(self): + """Test signed type detection.""" + assert IECTypeConverter.is_type_signed("SINT") is True + assert IECTypeConverter.is_type_signed("INT") is True + assert IECTypeConverter.is_type_signed("DINT") is True + assert IECTypeConverter.is_type_signed("LINT") is True + assert IECTypeConverter.is_type_signed("USINT") is False + assert IECTypeConverter.is_type_signed("UINT") is False + assert IECTypeConverter.is_type_signed("BYTE") is False + assert IECTypeConverter.is_type_signed("WORD") is False + + def test_get_size_bytes(self): + """Test size in bytes retrieval.""" + assert IECTypeConverter.get_size_bytes("SINT") == 1 + assert IECTypeConverter.get_size_bytes("INT") == 2 + assert IECTypeConverter.get_size_bytes("DINT") == 4 + assert IECTypeConverter.get_size_bytes("LINT") == 8 + assert IECTypeConverter.get_size_bytes("UNKNOWN") == 0 + + def test_get_register_count(self): + """Test register count retrieval.""" + assert IECTypeConverter.get_register_count("BOOL") == 0 + assert IECTypeConverter.get_register_count("BYTE") == 1 + assert IECTypeConverter.get_register_count("WORD") == 1 + assert IECTypeConverter.get_register_count("DWORD") == 2 + assert IECTypeConverter.get_register_count("LWORD") == 4 + # Size codes + assert IECTypeConverter.get_register_count("B") == 1 + assert IECTypeConverter.get_register_count("W") == 1 + assert IECTypeConverter.get_register_count("D") == 2 + assert IECTypeConverter.get_register_count("L") == 4 diff --git a/tests/unit/test_iec_type_registry.py b/tests/unit/test_iec_type_registry.py new file mode 100644 index 00000000..fbdb2685 --- /dev/null +++ b/tests/unit/test_iec_type_registry.py @@ -0,0 +1,381 @@ +""" +Unit tests for IEC 61131-3 Type Registry + +Tests the iec_type_registry module which provides centralized type metadata +for all IEC 61131-3 data types. +""" + +import ctypes +import sys +from pathlib import Path + +import pytest + +# Add shared module to path +shared_path = Path(__file__).parent.parent.parent / "core/src/drivers/plugins/python/shared" +sys.path.insert(0, str(shared_path)) + +from iec_type_registry import ( + ALL_TYPES, + BIT_STRING_TYPES, + FLOAT_TYPES, + INTEGER_TYPES, + SIGNED_INTEGER_TYPES, + TIME_TYPES, + UNSIGNED_INTEGER_TYPES, + IECTypeInfo, + get_all_types, + get_canonical_name, + get_ctype_class, + get_register_count, + get_size_code, + get_type_bounds, + get_type_info, + get_types_by_size, + is_valid_type, + size_code_to_type_name, +) + + +class TestTypeRegistry: + """Tests for the type registry core functions.""" + + def test_all_22_types_present(self): + """Verify all 22 IEC 61131-3 types are in the registry.""" + expected_types = { + "BOOL", + "SINT", + "USINT", + "BYTE", + "INT", + "UINT", + "WORD", + "DINT", + "UDINT", + "DWORD", + "LINT", + "ULINT", + "LWORD", + "REAL", + "LREAL", + "TIME", + "DATE", + "TOD", + "DT", + "STRING", + } + # Note: Some types share the same underlying storage (e.g., BYTE/USINT) + # but are distinct types + assert expected_types.issubset(ALL_TYPES) + + def test_get_type_info_returns_correct_type(self): + """Verify get_type_info returns correct IECTypeInfo objects.""" + info = get_type_info("SINT") + assert info is not None + assert isinstance(info, IECTypeInfo) + assert info.name == "SINT" + assert info.size_bits == 8 + assert info.size_bytes == 1 + assert info.signed is True + assert info.min_value == -128 + assert info.max_value == 127 + + def test_get_type_info_case_insensitive(self): + """Verify type lookup is case-insensitive.""" + info1 = get_type_info("sint") + info2 = get_type_info("SINT") + info3 = get_type_info("SiNt") + assert info1 == info2 == info3 + + def test_get_type_info_unknown_type(self): + """Verify None is returned for unknown types.""" + assert get_type_info("UNKNOWN_TYPE") is None + assert get_type_info("") is None + + def test_aliases_resolve_correctly(self): + """Verify type aliases resolve to canonical types.""" + # INT32 should resolve to DINT + assert get_canonical_name("INT32") == "DINT" + # FLOAT should resolve to REAL + assert get_canonical_name("FLOAT") == "REAL" + # DOUBLE should resolve to LREAL + assert get_canonical_name("DOUBLE") == "LREAL" + # TIME_OF_DAY should resolve to TOD + assert get_canonical_name("TIME_OF_DAY") == "TOD" + # DATE_AND_TIME should resolve to DT + assert get_canonical_name("DATE_AND_TIME") == "DT" + + def test_is_valid_type(self): + """Verify is_valid_type correctly identifies valid types.""" + assert is_valid_type("SINT") is True + assert is_valid_type("INT32") is True # Alias + assert is_valid_type("UNKNOWN") is False + assert is_valid_type("") is False + + +class TestTypeBounds: + """Tests for type value bounds.""" + + @pytest.mark.parametrize( + "type_name,expected_min,expected_max", + [ + ("BOOL", 0, 1), + ("SINT", -128, 127), + ("USINT", 0, 255), + ("BYTE", 0, 255), + ("INT", -32768, 32767), + ("UINT", 0, 65535), + ("WORD", 0, 65535), + ("DINT", -2147483648, 2147483647), + ("UDINT", 0, 4294967295), + ("DWORD", 0, 4294967295), + ("LINT", -9223372036854775808, 9223372036854775807), + ("ULINT", 0, 18446744073709551615), + ("LWORD", 0, 18446744073709551615), + ], + ) + def test_integer_type_bounds(self, type_name, expected_min, expected_max): + """Verify correct bounds for all integer types.""" + bounds = get_type_bounds(type_name) + assert bounds is not None + assert bounds[0] == expected_min + assert bounds[1] == expected_max + + def test_float_types_have_integer_bounds(self): + """Verify float types report integer representation bounds.""" + real_bounds = get_type_bounds("REAL") + assert real_bounds is not None + # REAL uses 32-bit storage + assert real_bounds == (-2147483648, 2147483647) + + lreal_bounds = get_type_bounds("LREAL") + assert lreal_bounds is not None + # LREAL uses 64-bit storage + assert lreal_bounds == (-9223372036854775808, 9223372036854775807) + + +class TestTypeSizes: + """Tests for type size information.""" + + @pytest.mark.parametrize( + "type_name,expected_bytes", + [ + ("BOOL", 1), + ("SINT", 1), + ("USINT", 1), + ("BYTE", 1), + ("INT", 2), + ("UINT", 2), + ("WORD", 2), + ("DINT", 4), + ("UDINT", 4), + ("DWORD", 4), + ("REAL", 4), + ("LINT", 8), + ("ULINT", 8), + ("LWORD", 8), + ("LREAL", 8), + ("TIME", 8), + ("DATE", 8), + ("TOD", 8), + ("DT", 8), + ("STRING", 127), + ], + ) + def test_type_sizes(self, type_name, expected_bytes): + """Verify correct size in bytes for all types.""" + info = get_type_info(type_name) + assert info is not None + assert info.size_bytes == expected_bytes + + +class TestRegisterCounts: + """Tests for Modbus register count calculation.""" + + @pytest.mark.parametrize( + "type_name,expected_count", + [ + ("BOOL", 0), # BOOL uses coils, not registers + ("SINT", 1), + ("USINT", 1), + ("BYTE", 1), + ("INT", 1), + ("UINT", 1), + ("WORD", 1), + ("DINT", 2), + ("UDINT", 2), + ("DWORD", 2), + ("REAL", 2), + ("LINT", 4), + ("ULINT", 4), + ("LWORD", 4), + ("LREAL", 4), + ("TIME", 4), + ], + ) + def test_register_counts(self, type_name, expected_count): + """Verify correct Modbus register count for all types.""" + assert get_register_count(type_name) == expected_count + + +class TestSizeCodes: + """Tests for IEC size code mapping.""" + + @pytest.mark.parametrize( + "type_name,expected_code", + [ + ("BOOL", "X"), + ("SINT", "B"), + ("USINT", "B"), + ("BYTE", "B"), + ("INT", "W"), + ("UINT", "W"), + ("WORD", "W"), + ("DINT", "D"), + ("UDINT", "D"), + ("DWORD", "D"), + ("REAL", "D"), + ("LINT", "L"), + ("ULINT", "L"), + ("LWORD", "L"), + ("LREAL", "L"), + ("STRING", "S"), + ], + ) + def test_size_codes(self, type_name, expected_code): + """Verify correct IEC size codes for all types.""" + assert get_size_code(type_name) == expected_code + + @pytest.mark.parametrize( + "size_code,expected_type", + [ + ("X", "BOOL"), + ("B", "BYTE"), + ("W", "WORD"), + ("D", "DWORD"), + ("L", "LWORD"), + ], + ) + def test_size_code_to_type(self, size_code, expected_type): + """Verify size code to type name conversion.""" + assert size_code_to_type_name(size_code) == expected_type + + +class TestCtypeClasses: + """Tests for ctypes class mapping.""" + + @pytest.mark.parametrize( + "type_name,expected_ctype", + [ + ("BOOL", ctypes.c_uint8), + ("SINT", ctypes.c_int8), + ("USINT", ctypes.c_uint8), + ("BYTE", ctypes.c_uint8), + ("INT", ctypes.c_int16), + ("UINT", ctypes.c_uint16), + ("WORD", ctypes.c_uint16), + ("DINT", ctypes.c_int32), + ("UDINT", ctypes.c_uint32), + ("DWORD", ctypes.c_uint32), + ("LINT", ctypes.c_int64), + ("ULINT", ctypes.c_uint64), + ("LWORD", ctypes.c_uint64), + ("REAL", ctypes.c_float), + ("LREAL", ctypes.c_double), + ], + ) + def test_ctype_classes(self, type_name, expected_ctype): + """Verify correct ctypes classes for all types.""" + assert get_ctype_class(type_name) == expected_ctype + + +class TestTypeCategories: + """Tests for type category sets.""" + + def test_integer_types_set(self): + """Verify INTEGER_TYPES contains all integer types.""" + assert "SINT" in INTEGER_TYPES + assert "INT" in INTEGER_TYPES + assert "DINT" in INTEGER_TYPES + assert "LINT" in INTEGER_TYPES + assert "REAL" not in INTEGER_TYPES # Float + assert "BOOL" not in INTEGER_TYPES + + def test_signed_vs_unsigned_partition(self): + """Verify signed/unsigned sets are mutually exclusive and complete.""" + assert SIGNED_INTEGER_TYPES.isdisjoint(UNSIGNED_INTEGER_TYPES) + assert SIGNED_INTEGER_TYPES | UNSIGNED_INTEGER_TYPES == INTEGER_TYPES + + def test_float_types_set(self): + """Verify FLOAT_TYPES contains exactly REAL and LREAL.""" + assert FLOAT_TYPES == {"REAL", "LREAL"} + + def test_time_types_set(self): + """Verify TIME_TYPES contains all time-related types.""" + assert TIME_TYPES == {"TIME", "DATE", "TOD", "DT"} + + def test_bit_string_types_set(self): + """Verify BIT_STRING_TYPES contains byte/word/dword/lword.""" + assert BIT_STRING_TYPES == {"BYTE", "WORD", "DWORD", "LWORD"} + + +class TestTypeLookupBySize: + """Tests for looking up types by size.""" + + def test_get_types_by_size_1_byte(self): + """Verify 1-byte types are correctly identified.""" + one_byte_types = get_types_by_size(1) + assert "BOOL" in one_byte_types + assert "SINT" in one_byte_types + assert "USINT" in one_byte_types + assert "BYTE" in one_byte_types + assert "INT" not in one_byte_types + + def test_get_types_by_size_2_bytes(self): + """Verify 2-byte types are correctly identified.""" + two_byte_types = get_types_by_size(2) + assert "INT" in two_byte_types + assert "UINT" in two_byte_types + assert "WORD" in two_byte_types + assert "DINT" not in two_byte_types + + def test_get_types_by_size_4_bytes(self): + """Verify 4-byte types are correctly identified.""" + four_byte_types = get_types_by_size(4) + assert "DINT" in four_byte_types + assert "UDINT" in four_byte_types + assert "DWORD" in four_byte_types + assert "REAL" in four_byte_types + assert "LINT" not in four_byte_types + + def test_get_types_by_size_8_bytes(self): + """Verify 8-byte types are correctly identified.""" + eight_byte_types = get_types_by_size(8) + assert "LINT" in eight_byte_types + assert "ULINT" in eight_byte_types + assert "LWORD" in eight_byte_types + assert "LREAL" in eight_byte_types + assert "TIME" in eight_byte_types + assert "DATE" in eight_byte_types + assert "TOD" in eight_byte_types + assert "DT" in eight_byte_types + + +class TestTypeInfoImmutability: + """Tests for IECTypeInfo immutability.""" + + def test_type_info_is_frozen(self): + """Verify IECTypeInfo instances cannot be modified.""" + info = get_type_info("SINT") + with pytest.raises(AttributeError): + info.name = "MODIFIED" + with pytest.raises(AttributeError): + info.size_bits = 999 + + def test_registry_copy_is_independent(self): + """Verify get_all_types returns an independent copy.""" + registry1 = get_all_types() + registry2 = get_all_types() + # Modifying one should not affect the other + del registry1["SINT"] + assert "SINT" in registry2