diff --git a/docs/serial_encryption.md b/docs/serial_encryption.md new file mode 100644 index 00000000..1ac3fadc --- /dev/null +++ b/docs/serial_encryption.md @@ -0,0 +1,185 @@ +# EVO Serial Encryption (E0 FE frames) + +EVO panels with firmware ≥ 7.50.000 encrypt the RS-232 serial bus using a +Paradox-custom AES-256-ECB algorithm. The unencrypted serial port can be +re-enabled with an unlock code from Paradox. + +--- + +## Two distinct E0 FE frame formats + +Both formats share the same two-byte header (`0xE0`, `0xFE`), but differ +in everything that follows. + +### Format 1 — BabyWare compact (firmware < 7.50 / BabyWare software) + +``` +[E0 | status_nibble] [FE] [length] [0x00] [request_nr] [data...] [checksum] [end] +``` + +- `byte[2]` is the **total frame length** (including all header/trailer bytes). +- Data payload is short (typically 4–8 bytes); the proprietary encryption + algorithm is unknown. +- PAI passes these frames to the handler unchanged. They cannot be decrypted. + +### Format 2 — Full-AES (ESP32 firmware / EVO ≥ 7.50) + +``` +[0xE0 | (payload[0] & 0x0F)] [0xFE] [AES-256 ciphertext...] [checksum] +``` + +- **No length byte** at position `[2]`. The low nibble of `byte[0]` echoes + the low nibble of the first byte of the plaintext payload. +- AES ciphertext occupies `ceil(len(payload) / 16) * 16` bytes + (payload is padded to a 16-byte boundary with `0xEE`). +- `byte[-1]` is `sum(all preceding bytes) % 256`. +- Frame boundary is determined by scanning for a **valid checksum at AES block + boundaries**: try `frame_len = 2 + n×16 + 1` for n = 1, 2, 3… and accept + the first position where `sum(frame[:-1]) % 256 == frame[-1]`. No length + field or inter-byte timeout is required. + +#### Example: 37-byte InitiateCommunication + +``` +plaintext : 37 bytes +padded : 48 bytes (3 × 16, trailing 0xEE fill) +frame size : 2 (header) + 48 (AES) + 1 (checksum) = 51 bytes +``` + +--- + +## Encryption algorithm + +Paradox uses a **custom pure-Python AES-256-ECB** implementation +(`paradox/lib/crypto.py`). The same algorithm is used for IP150 TCP +encryption; only the key derivation differs. + +### Key derivation for serial + +``` +key = pc_password_bytes + b"\xee" * (32 - len(pc_password_bytes)) +``` + +- `make_serial_key(password)` in `paradox/connections/serial_encryption.py` + handles `str`, `bytes`, and `int` password types. +- The PC password is the 4-hex-digit code configured on the panel + (e.g. `"0000"` → `b"0000" + b"\xee" * 28`). +- **Do not call `str()` on a `bytes` password** — `str(b"1234")` yields + `"b'1234'"`, producing the wrong key bytes. + +### Padding + +Plaintext is padded with `0xEE` bytes to the next 16-byte boundary before +encryption. After decryption, trailing `0xEE` bytes are stripped. + +--- + +## PAI implementation + +### Outgoing messages (`SerialConnectionProtocol.send_message`) + +When `SERIAL_ENCRYPTED = True`, `connection_made()` wraps the raw asyncio +transport with `EncryptedSerialTransport`. Every call to `transport.write()` +transparently passes through `encrypt_serial_message()` before hitting the +wire. + +``` +PAI message bytes + → encrypt_serial_message(payload, key) + → [E0|nibble][FE][AES...][cs] + → serial port +``` + +### Incoming messages (`SerialConnectionProtocol.data_received`) + +The framer distinguishes the two E0 FE modes by the `SERIAL_ENCRYPTED` config +flag: + +| `SERIAL_ENCRYPTED` | `buffer[1] == 0xFE` action | +|---|---| +| `False` | Read `buffer[2]` as total frame length (BabyWare compact) | +| `True` | Scan `frame_len = 2 + n×16 + 1` (n = 1…7); accept first valid checksum | + +For Format 2, the framer scans incrementally as bytes arrive — no timer is +needed. The decrypted payload is forwarded to `handler.on_message()` +immediately once a valid checksum boundary is found. + +### Configuration + +```python +# pai.conf +SERIAL_ENCRYPTED = True # default: False +PASSWORD = "0000" # PC password (same as used for non-encrypted serial) +``` + +`SERIAL_ENCRYPTED` applies to both `SERIAL_PORT` and `IP_CONNECTION_BARE` connections. +Both use `SerialConnectionProtocol`, so the same framing and decryption logic is active +for either transport. + +--- + +## Capture file decryption (`ip150_connection_decrypt --serial`) + +The `--serial` mode in `paradox/console_scripts/ip150_connection_decrypt.py` +can decrypt capture files produced by the ESP32 or a serial sniffer. + +### Capture file format + +``` +TX [51]: e0 fe 12 c5 ca 4a b7 dc b3 c5 92 06 f6 e9 eb 47 76 1e c9 28 bf 27 54 ee 41 dd d3 ab b4 d0 88 bb b3 ee 36 9b e2 17 50 fd 52 cc 91 19 ... +RX [51]: e0 fe ... +``` + +Each line is a complete framed packet. The tool calls +`decrypt_serial_message(frame, key)` directly on each E0 FE line; no +timeout-based framing is needed since packet boundaries are already known. + +### Usage + +```bash +pai-decrypt capture.serial --serial --pc-password 0000 +``` + +### Decryption behaviour + +| Frame type | Outcome | +|---|---| +| Full-AES E0 FE (≥ 16 bytes AES data) | Decrypted; inner message parsed by panel parsers | +| BabyWare compact E0 FE (< 16 bytes AES data) | Structure displayed (`request_nr`, raw data); content not decryptable | +| Regular Paradox frame | Parsed normally | + +--- + +## File locations + +| File | Role | +|---|---| +| `paradox/lib/crypto.py` | `encrypt()`, `decrypt()`, `encrypt_serial_message()`, `decrypt_serial_message()` | +| `paradox/connections/serial_encryption.py` | `EncryptedSerialTransport`, `make_serial_key()` | +| `paradox/connections/protocols.py` | `SerialConnectionProtocol` — framing and dispatch | +| `paradox/config.py` | `SERIAL_ENCRYPTED` flag | +| `paradox/console_scripts/ip150_connection_decrypt.py` | Offline capture decryption tool | +| `tests/lib/test_serial_crypto.py` | Unit tests for encrypt/decrypt roundtrips | +| `tests/connection/test_serial_protocol.py` | Integration tests for framing and decryption | + +--- + +## Known pitfalls + +- **No length byte in Format 2.** An earlier PAI implementation incorrectly + read `buffer[2]` as the frame length. In Format 2, that byte is AES + ciphertext — roughly half the time it exceeds the actual frame length, + causing the framer to stall. + +- **AES data offset.** An earlier implementation read `frame[3:-1]` for the + ciphertext, skipping three bytes instead of two, so decryption always + produced garbage. + +- **`str(bytes)` key corruption.** Passing a `bytes` password through + `str()` before encoding produces `"b'1234'"` instead of `"1234"`, + generating the wrong key. `make_serial_key()` handles `bytes` directly to + avoid this. + +- **BabyWare compact frames are not AES.** These 12–13 byte frames use a + proprietary algorithm; brute-forcing the PC password across 0000–9999 yields + no valid decryptions. diff --git a/paradox/config.py b/paradox/config.py index 9154ff99..8388494a 100644 --- a/paradox/config.py +++ b/paradox/config.py @@ -29,6 +29,7 @@ class Config: # Serial Connection Details "SERIAL_PORT": "/dev/ttyS1", # Pathname of the Serial Port "SERIAL_BAUD": 9600, # Baud rate of the Serial Port. Use 38400(default setting) or 57600 for EVO + "SERIAL_ENCRYPTED": False, # Set True for EVO panels with full serial encryption (firmware >= 7.50) # IP Connection Details "IP_CONNECTION_HOST": "127.0.0.1", # IP Module address when using direct IP Connection "IP_CONNECTION_PORT": ( @@ -57,7 +58,11 @@ class Config: "KEEP_ALIVE_INTERVAL": 10, # Interval between status updates "IO_TIMEOUT": 0.5, # Timeout for IO operations "LIMITS": {}, # By default all zones will be monitored - "MODULE_PGM_ADDRESSES": ({}, dict, None), # Map of bus module address -> pgm count, e.g. {4: 4} + "MODULE_PGM_ADDRESSES": ( + {}, + dict, + None, + ), # Map of bus module address -> pgm count, e.g. {4: 4} "LABEL_ENCODING": "paradox-en", # Encoding to use when decoding labels. paradox-* or https://docs.python.org/3/library/codecs.html#standard-encodings "LABEL_REFRESH_INTERVAL": ( 15 * 60, diff --git a/paradox/connections/protocols.py b/paradox/connections/protocols.py index b43eff14..8f5fdcbd 100644 --- a/paradox/connections/protocols.py +++ b/paradox/connections/protocols.py @@ -11,6 +11,11 @@ IPMessageResponse, IPMessageType, ) +from paradox.connections.serial_encryption import ( + EncryptedSerialTransport, + make_serial_key, +) +from paradox.lib.crypto import decrypt_serial_message logger = logging.getLogger("PAI").getChild(__name__) @@ -102,6 +107,15 @@ def __del__(self): class SerialConnectionProtocol(ConnectionProtocol): + def connection_made(self, transport): + if cfg.SERIAL_ENCRYPTED and cfg.PASSWORD: + self._serial_key = make_serial_key(cfg.PASSWORD) + transport = EncryptedSerialTransport(transport, self._serial_key) + logger.info("Serial encryption enabled (SERIAL_ENCRYPTED=True)") + else: + self._serial_key = None + super().connection_made(transport) + def send_message(self, message): if cfg.LOGGING_DUMP_PACKETS: logger.debug(f"PAI -> SER {binascii.hexlify(message)}") @@ -116,6 +130,7 @@ def data_received(self, recv_data): min_length = 4 if self.use_variable_message_length else 37 while len(self.buffer) >= min_length: + is_encrypted_frame = False if self.use_variable_message_length: if self.buffer[0] >> 4 == 0: potential_packet_length = 37 @@ -128,7 +143,31 @@ def data_received(self, recv_data): elif self.buffer[0] >> 4 == 0xC: potential_packet_length = self.buffer[1] * 256 + self.buffer[2] elif self.buffer[0] >> 4 == 0xE: - if self.buffer[1] < 37 or self.buffer[1] == 0xFF: + if self.buffer[1] == 0xFE: + if cfg.SERIAL_ENCRYPTED: + # Full-AES E0 FE: frame is [E0|x][FE][n*16 AES bytes][checksum]. + # Scan AES block boundaries for the first valid checksum to + # determine frame length without a timeout. + found = False + for n_blocks in range(1, 8): + frame_len = 2 + n_blocks * 16 + 1 + if len(self.buffer) < frame_len: + break + candidate = self.buffer[:frame_len] + if sum(candidate[:-1]) % 256 == candidate[-1]: + potential_packet_length = frame_len + is_encrypted_frame = True + found = True + break + if not found: + break # Wait for more data + else: + # BabyWare compact E0 FE: length byte at [2] + if len(self.buffer) < 3: + break + potential_packet_length = self.buffer[2] + is_encrypted_frame = True + elif self.buffer[1] < 37 or self.buffer[1] == 0xFF: # MG/SP in 21st century and EVO Live Events. Probable values=0x13, 0x13, 0x00, 0xFF potential_packet_length = 37 else: @@ -144,12 +183,24 @@ def data_received(self, recv_data): frame = self.buffer[:potential_packet_length] - if checksum(frame, min_length): + if is_encrypted_frame or checksum(frame, min_length): self.buffer = self.buffer[len(frame) :] # Remove message if cfg.LOGGING_DUMP_PACKETS: logger.debug(f"SER -> PAI {binascii.hexlify(frame)}") - self.handler.on_message(frame) + if cfg.SERIAL_ENCRYPTED and is_encrypted_frame: + decrypted = decrypt_serial_message(frame, self._serial_key) + if decrypted: + logger.debug( + f"SER DECRYPT: {len(frame)}b E0FE → {len(decrypted)}b" + ) + self.handler.on_message(decrypted) + else: + logger.warning( + f"SER: E0FE AES decrypt failed: {binascii.hexlify(frame)}" + ) + else: + self.handler.on_message(frame) else: self.buffer = self.buffer[1:] diff --git a/paradox/connections/serial_encryption.py b/paradox/connections/serial_encryption.py new file mode 100644 index 00000000..eb5898cc --- /dev/null +++ b/paradox/connections/serial_encryption.py @@ -0,0 +1,36 @@ +"""Transparent AES-256 E0 FE encryption layer for EVO serial connections.""" + +import logging + +from paradox.lib.crypto import encrypt_serial_message + +logger = logging.getLogger("PAI").getChild(__name__) + + +class EncryptedSerialTransport: + """Wraps a serial transport to transparently encrypt outgoing messages.""" + + def __init__(self, transport, key: bytes): + self._transport = transport + self._key = key + + def write(self, data: bytes) -> None: + encrypted = encrypt_serial_message(data, self._key) + logger.debug(f"SER ENCRYPT: {len(data)}b → {len(encrypted)}b E0FE frame") + self._transport.write(encrypted) + + def __getattr__(self, name): + return getattr(self._transport, name) + + +def make_serial_key(password) -> bytes: + """Derive the 32-byte serial encryption key from the panel PC password.""" + if isinstance(password, bytes): + raw = password + elif isinstance(password, int): + raw = str(password).zfill(4).encode("utf-8") + else: + raw = str(password).encode("utf-8") + if len(raw) < 32: + raw = raw + b"\xee" * (32 - len(raw)) + return raw[:32] diff --git a/paradox/console_scripts/ip150_connection_decrypt.py b/paradox/console_scripts/ip150_connection_decrypt.py index 1c8951f0..8918dfc3 100644 --- a/paradox/console_scripts/ip150_connection_decrypt.py +++ b/paradox/console_scripts/ip150_connection_decrypt.py @@ -14,8 +14,10 @@ IPMessageType, IPPayloadConnectResponse, ) +from paradox.connections.serial_encryption import make_serial_key from paradox.hardware import create_panel -from paradox.hardware.parsers import InitiateCommunicationResponse +from paradox.hardware.parsers import Encrypted, InitiateCommunicationResponse +from paradox.lib.crypto import decrypt_serial_message class Colors: # You may need to change color settings @@ -243,6 +245,90 @@ def decrypt_file(file, password, max_packets: int = None): print(f"{Colors.RED}{exc}{Colors.ENDC}") +def decrypt_serial_file(file, pc_password=None, max_packets: int = None): + import re + + line_re = re.compile(r"^(TX|RX) \[(\d+)\]: ([0-9A-Fa-f ]+)$") + panel = create_panel(None) + key_bytes = make_serial_key(pc_password) if pc_password else None + n = 0 + for line in file: + line = line.strip() + if not line: + continue + m = line_re.match(line) + if not m: + print(f"{Colors.RED}Unrecognised line: {line}{Colors.ENDC}") + continue + direction, length, hex_data = m.group(1), int(m.group(2)), m.group(3) + message = bytes.fromhex(hex_data.replace(" ", "")) + if len(message) != length: + print( + f"{Colors.RED}Length mismatch: declared {length}, got {len(message)}{Colors.ENDC}" + ) + continue + + color = Colors.BLUE if direction == "TX" else Colors.GREEN + print( + f"{color}{direction} [{length}]: {binascii.hexlify(message).decode()}{Colors.ENDC}" + ) + + if len(message) >= 2 and message[0] >> 4 == 0xE and message[1] == 0xFE: + # Try full-AES decryption first (ESP32-style frames) + if key_bytes: + plaintext = decrypt_serial_message(message, key_bytes) + if plaintext: + print( + f"{Colors.ON_WHITE} AES-256 decrypted ({len(message)}b → {len(plaintext)}b): " + f"{binascii.hexlify(plaintext).decode()}{Colors.ENDC}" + ) + try: + inner = panel.parse_message( + plaintext, + "topanel" if direction == "TX" else "frompanel", + ) + if inner: + print(f"{Colors.ON_WHITE} Parsed: {inner}{Colors.ENDC}") + if inner.fields.value.po.command == 0: + panel = create_panel(None, inner) + except Exception: + pass + continue + + # Fallback: BabyWare compact E0 FE (algorithm unknown, display structure only) + try: + parsed = Encrypted.parse(message) + print( + f"{Colors.ON_WHITE} Encrypted frame (compact): request_nr={parsed.fields.value.request_nr}" + f" data({len(parsed.fields.value.data)}b)={binascii.hexlify(parsed.fields.value.data).decode()}{Colors.ENDC}" + ) + except Exception: + print( + f"{Colors.ON_WHITE} E0 FE frame ({len(message)}b, no key provided or unknown format){Colors.ENDC}" + ) + else: + try: + parsed = panel.parse_message( + message, "topanel" if direction == "TX" else "frompanel" + ) + if parsed: + print(f"{Colors.ON_WHITE} {parsed}{Colors.ENDC}") + if parsed.fields.value.po.command == 0: + panel = create_panel(None, parsed) + else: + print( + f"{Colors.RED} No parser for message: {binascii.hexlify(message).decode()}{Colors.ENDC}" + ) + except Exception: + print(f"{Colors.RED} Parse error{Colors.ENDC}") + traceback.print_exc() + + n += 1 + if max_packets is not None and n >= max_packets: + print(f"Force stopped on {max_packets} packets") + return + + def main(): parser = argparse.ArgumentParser() parser.add_argument( @@ -255,8 +341,9 @@ def main(): parser.add_argument( "password", type=str, + nargs="?", default="paradox", - help="IP Module password for decryption", + help="IP Module password for decryption (not required in --serial mode)", ) parser.add_argument( "-n", @@ -264,10 +351,24 @@ def main(): type=int, help="Packets to decrypt", ) + parser.add_argument( + "--serial", + action="store_true", + help="Parse a .serial capture file (TX/RX hex lines) instead of IP YAML", + ) + parser.add_argument( + "--pc-password", + type=str, + default=None, + help="PC password for E0 FE AES-256 decryption attempt (serial mode only)", + ) args = parser.parse_args() - decrypt_file(args.file, args.password.encode("utf8"), args.packets) + if args.serial: + decrypt_serial_file(args.file, args.pc_password, args.packets) + else: + decrypt_file(args.file, args.password.encode("utf8"), args.packets) if __name__ == "__main__": diff --git a/paradox/event.py b/paradox/event.py index 7ad41da1..0c1b3864 100644 --- a/paradox/event.py +++ b/paradox/event.py @@ -129,7 +129,7 @@ def call_hook(self, *args, **kwargs): class LiveEvent(Event): def __init__(self, event: Container, event_map: dict, label_provider=None): raw = event.fields.value - if raw.po.command != 0xE: + if raw.po.command != 0xE and hasattr(raw, "event"): raise AssertionError("Message is not an event") # parse event map diff --git a/paradox/hardware/panel.py b/paradox/hardware/panel.py index f2c603ba..4298bf10 100644 --- a/paradox/hardware/panel.py +++ b/paradox/hardware/panel.py @@ -33,9 +33,12 @@ def __init__(self, core, variable_message_length=True): self.variable_message_length = variable_message_length def parse_message(self, message, direction="topanel") -> typing.Optional[Container]: - if message is None or len(message) == 0: + if message is None or len(message) < 2: return None + if message[0] >> 4 == 0xE and message[1] == 0xFE: + return parsers.Encrypted.parse(message) + if direction == "topanel": if message[0] == 0x72 and message[1] == 0: return parsers.InitiateCommunication.parse(message) @@ -46,8 +49,8 @@ def parse_message(self, message, direction="topanel") -> typing.Optional[Contain return parsers.InitiateCommunicationResponse.parse(message) elif message[0] == 0x00 and message[4] > 0: return parsers.StartCommunicationResponse.parse(message) - else: - return None + + return None def get_message(self, name) -> Construct: clsmembers = dict(inspect.getmembers(parsers)) diff --git a/paradox/hardware/parsers.py b/paradox/hardware/parsers.py index 93948094..41e6e967 100644 --- a/paradox/hardware/parsers.py +++ b/paradox/hardware/parsers.py @@ -1,8 +1,30 @@ -from construct import (BitsInteger, BitStruct, Bytes, Const, Default, Enum, - Flag, Int8ub, Int16ub, Nibble, Padding, RawCopy, Struct) +from construct import ( + BitsInteger, + BitStruct, + Bytes, + Checksum, + Const, + Default, + Enum, + Flag, + Int8ub, + Int16ub, + Nibble, + Padding, + RawCopy, + Struct, + this, +) -from .common import (CommunicationSourceIDEnum, HexInt, PacketChecksum, - PacketLength, ProductIdEnum, FamilyIdEnum) +from .common import ( + CommunicationSourceIDEnum, + FamilyIdEnum, + HexInt, + PacketChecksum, + PacketLength, + ProductIdEnum, + calculate_checksum, +) InitiateCommunication = Struct( "fields" @@ -120,3 +142,30 @@ ), "checksum" / PacketChecksum(Bytes(1)), ) + +Encrypted = Struct( + "fields" + / RawCopy( + Struct( + "po" + / BitStruct( + "command" / Const(0xE, Nibble), + "status" + / Struct( + "reserved" / Flag, + "alarm_reporting_pending" / Flag, + "Winload_connected" / Flag, + "NeWare_connected" / Flag, + ), + ), + "source" / Const(0xFE, Int8ub), + "length" / PacketLength(Int8ub), + "_not_used0" / Bytes(1), + "request_nr" / Int8ub, + "data" / Bytes(lambda this: this.length - 7), + ) + ), + "checksum" + / Checksum(Bytes(1), lambda data: calculate_checksum(data), this.fields.data), + "end" / Int8ub, +) diff --git a/paradox/lib/async_message_manager.py b/paradox/lib/async_message_manager.py index 8b7c9b30..7e13ac3f 100644 --- a/paradox/lib/async_message_manager.py +++ b/paradox/lib/async_message_manager.py @@ -14,7 +14,7 @@ class EventMessageHandler(PersistentHandler): def can_handle(self, data: Container) -> bool: assert isinstance(data, Container) values = data.fields.value - return values.po.command == 0xE and (not hasattr(values, "requested_event_nr")) + return values.po.command == 0xE and hasattr(values, "event") class ErrorMessageHandler(PersistentHandler): diff --git a/paradox/lib/crypto.py b/paradox/lib/crypto.py index 73db5aae..c9fdbd04 100644 --- a/paradox/lib/crypto.py +++ b/paradox/lib/crypto.py @@ -476,3 +476,56 @@ def decrypt(ctxt, key): extend(a) return bytes(dtxt) + + +def encrypt_serial_message(payload: bytes, key) -> bytes: + """Wrap a serial message payload in an encrypted E0 FE frame. + + Uses Paradox custom AES-256-ECB. Output format matches the ESP32 IP150 emulator: + [0xE0 | (payload[0] & 0x0F)][0xFE][AES-256(padded_payload)][checksum] + + There is no length byte — the frame boundary is determined by timeout on the + receiving end (matching the panel's own framing behaviour). + + Args: + payload: raw serial message bytes (e.g. a 37-byte InitiateCommunication) + key: PC password bytes; padded to 32 bytes with 0xEE if shorter + + Returns: + Complete E0 FE encrypted serial frame ready to write to the serial port. + """ + if not payload: + return b"" + encrypted = encrypt(payload, key) + frame = bytearray([0xE0 | (payload[0] & 0x0F), 0xFE]) + bytearray(encrypted) + checksum_byte = sum(frame) % 256 + frame.append(checksum_byte) + return bytes(frame) + + +def decrypt_serial_message(frame: bytes, key) -> bytes: + """Decrypt an E0 FE serial frame and return the original payload. + + Expects the ESP32-compatible format: [E0|x][FE][AES_blocks...][checksum] + (no length byte at position [2]). + + Args: + frame: raw E0 FE frame bytes (as received from the serial port) + key: PC password bytes; padded to 32 bytes with 0xEE if shorter + + Returns: + Decrypted payload bytes with trailing 0xEE padding stripped. + Returns b"" if the frame is too short or contains no full AES blocks. + """ + if len(frame) < 4 or frame[0] >> 4 != 0xE or frame[1] != 0xFE: + return b"" + # Full-AES frames: total = 2 + n*16 + 1 => (len - 3) must be a multiple of 16. + # BabyWare compact frames do NOT satisfy this — reject them early. + if (len(frame) - 3) % 16 != 0: + return b"" + # Verify the E0 FE checksum (sum of all bytes except last) before decrypting. + if sum(frame[:-1]) % 256 != frame[-1]: + return b"" + enc_data = frame[2:-1] # Guaranteed to be a multiple of 16 bytes + decrypted = decrypt(enc_data, key) + return decrypted.rstrip(b"\xee") diff --git a/tests/connection/ip/test_parsers.py b/tests/connection/ip/test_parsers.py index a28e6d4b..74894562 100644 --- a/tests/connection/ip/test_parsers.py +++ b/tests/connection/ip/test_parsers.py @@ -1,8 +1,11 @@ +import pytest + from paradox.connections.ip.parsers import ( IPMessageRequest, IPMessageResponse, IPPayloadConnectResponse, ) +from paradox.hardware.parsers import Encrypted def test_IPMessageRequest_defaults(): @@ -112,7 +115,7 @@ def test_parse_ip_payload_connect_response_success(): """Parse a successful IP connect response.""" raw = _build_ip_payload_connect_response( login_status=0x00, - key=b"\xAA" * 16, + key=b"\xaa" * 16, hardware_version=0x0100, ip_firmware_major=0x05, ip_firmware_minor=0x02, @@ -121,7 +124,7 @@ def test_parse_ip_payload_connect_response_success(): data = IPPayloadConnectResponse.parse(raw) assert data.login_status == "success" - assert data.key == b"\xAA" * 16 + assert data.key == b"\xaa" * 16 assert data.hardware_version == 0x0100 assert data.ip_module_serial == b"\x71\x12\x34\x56" @@ -188,7 +191,7 @@ def test_parse_ip_payload_connect_response_hardware_version(): def test_parse_ip_payload_connect_response_ip150_type(): """Parse ip_type as IP150 when first serial byte is 0x71.""" raw = _build_ip_payload_connect_response( - ip_module_serial=b"\x71\xAA\xBB\xCC", + ip_module_serial=b"\x71\xaa\xbb\xcc", ) data = IPPayloadConnectResponse.parse(raw) @@ -227,3 +230,83 @@ def test_parse_ip_payload_connect_response_serial_preserved(): data = IPPayloadConnectResponse.parse(raw) assert data.ip_module_serial == serial + + +# --------------------------------------------------------------------------- +# Encrypted — PARSE tests +# --------------------------------------------------------------------------- +# Real captured frames from EVO192 (firmware 7.50.000+) and SP6000+. +# Format: [E0|status_nibble][FE][length][not_used][request_nr][data...][checksum][end] +# Verifies that Encrypted.parse() extracts the data field correctly. + + +@pytest.mark.parametrize( + "payload_hex", + [ + # from EVO192 7.50.000+ firmware + # tx + ( + "E0 FE 2E 00 12 C5 CA 4A B7 DC B3 C5 92 06 F6 E9 EB 47 76 1E C9 28 BF 27 54 EE 41 DD D3 AB B4 D0 88 BB B3 EE 36 9B E2 17 50 FD 52 CC 91 19" + ), + # rx + ( + "E0 FE 2E 00 12 C5 3F 0A B7 DC 83 97 D4 06 F6 E9 EB 47 56 5C 89 38 BF 35 F0 EA A5 DC C3 2B 95 D2 80 E9 B3 EE 36 9B E2 17 50 FD B4 CC 7C 19" + ), + # tx + ( + "E0 FE 2E 00 12 01 79 71 35 21 C7 F1 C5 3F 0A B7 DC B3 E5 D0 46 E6 E9 F9 E3 72 FA C8 38 3F 06 56 E6 13 DD D3 AB B4 D0 88 BB B3 77 B4 11 19" + ), + # rx + ("E0 FE 0F 00 12 01 3D 77 35 21 F7 03 B4 B8 04"), + ( + "E0 FE 2E 00 13 80 4F F6 7E FD 6A 3B 91 85 52 E2 45 A5 52 DB 28 77 5D DC 9C 64 42 DB BA BE 47 79 71 35 21 F7 A3 83 3F 0A B7 DC E0 69 9B 16" + ), + ( + "E0 FE 2E 00 14 61 CB D8 54 3E 81 E5 F1 2B BC E0 EE BB B2 EE 36 9B E2 17 50 FD 2D 15 F3 05 D7 2F B5 59 19 60 FC 6B F3 CC 76 B8 28 D3 4A 18" + ), + # tx + ("E0 FE 11 00 14 F5 78 3D 21 F7 59 83 BF 54 BC 70 07"), + # rx + ( + "E0 FE 50 00 14 F5 7A 90 21 F7 59 83 0F 0A B7 DC B3 C5 92 06 F6 E9 EB 47 76 1E C9 28 BF 27 54 EE 41 DD D3 AB B4 D0 88 BB B3 EE 36 9B E2 17 50 FD 2D 15 F3 05 D7 2F B5 59 19 60 FC 6B F3 CC 76 B8 8E 81 F7 D4 49 FC 06 BE 6E E3 4E 29 99 BC E5 2A" + ), + # tx + ("E0 FE 11 00 14 E3 42 95 95 56 5A DB 25 19 8C A7 06"), + # rx + ( + "E0 FE 50 00 14 E3 40 38 95 56 5A DB A5 46 DB 28 77 5D DC 9C 64 42 DB BA BE 47 79 71 35 21 F7 A3 83 3F 0A B7 DC B3 C5 92 06 F6 E9 EB 47 76 1E C9 28 BF 27 54 EE 41 DD D3 AB B4 D0 88 B3 A3 FE B6 8B E2 17 50 ED 07 15 F3 05 D7 2F B5 F1 8C FD 29" + ), + # tx + ("E0 FE 11 00 14 A8 76 23 8A F9 6A EF 5A E3 24 81 07"), + # rx + ( + "E0 FE 50 00 14 A8 74 8E 8A F9 6A EF DA 3D B2 83 09 7E BC 6F 4B 9D 95 56 A0 DF A5 46 DB 28 77 5D DC 9C 64 42 DB BA BE 47 79 71 B5 21 F7 A3 83 3F 0A B7 DC B3 C5 92 06 F6 E9 EB 47 76 1E C9 28 BF 27 54 EE 41 DD D3 AB B4 D0 88 BB B3 16 24 69 2A" + ), + # tx + ("E0 FE 11 00 14 B7 E4 97 24 7F E4 CE FB 4F B8 8C 08"), + # rx + ( + "E0 FE 50 00 14 B7 F4 0A 24 7F E4 CE F9 90 CF DA 3D B2 83 09 7E BC 6F 4B 9D 95 56 A0 DF A5 46 DB 28 77 5D DC 9C 64 42 DB BA BE 47 79 71 35 21 F7 A3 83 3F 0A B7 DC B3 C5 92 06 F6 E9 EB 47 76 1E C9 28 BF 27 54 EE 41 DD D3 AB B4 D0 25 B8 72 2A" + ), + # tx + ("E0 FE 11 00 14 F7 BC 57 EC F4 65 C7 A4 1F 84 60 08"), + # rx + ( + "E0 FE 50 00 14 F7 BE FA EC F4 65 C7 24 7F 2B 8A F9 90 CF DA 3D B2 83 09 7E BC 6F 4B 9D 95 56 A0 DF A5 46 DB 28 77 5D DC 9C 64 42 DB BA BE 47 79 71 35 21 F7 A3 83 3F 0A B7 DC B3 C5 92 06 F6 E9 EB 47 76 1E C9 28 BF 27 54 EE 41 DD A3 84 C3 2A" + ), + # tx + ("E0 FE 11 00 14 F9 3F 57 79 71 8F C8 26 F8 10 01 07"), + # rx + ( + "E0 FE 4C 00 14 F9 2F 4A 79 71 8F C8 F7 A3 83 3F 0A B7 DC B3 C5 92 06 F6 E9 EB 47 76 1E C9 28 BF 27 54 EE 41 DD D3 AB B4 D0 88 BB B3 EE 36 9B E2 17 50 FD 2D 15 F3 05 D7 2F B5 59 19 60 FC 6B F3 CC 76 B8 8E 81 F7 D4 49 5D 10 7E 28" + ), + # from SP6000+ + ( + "e0 fe 2e 00 00 78 04 c1 92 06 f6 e9 eb 47 76 1e c9 28 bf 27 54 ee 41 dd d3 ab b4 d0 88 bb b3 ee 36 9b e2 17 50 fd 2d 15 f3 05 20 6b 7f 16" + ), + ], +) +def test_encrypted_parse(payload_hex: str): + payload = bytes.fromhex(payload_hex) + data = Encrypted.parse(payload) + assert data.fields.value.data == payload[5:-2] diff --git a/tests/connection/test_serial_encryption.py b/tests/connection/test_serial_encryption.py new file mode 100644 index 00000000..f21eb025 --- /dev/null +++ b/tests/connection/test_serial_encryption.py @@ -0,0 +1,116 @@ +from paradox.connections.serial_encryption import make_serial_key +from paradox.lib.crypto import decrypt_serial_message, encrypt_serial_message + +# PC password "1234" padded to 32 bytes with 0xEE +PC_PASSWORD = b"1234" + + +def test_roundtrip_37byte_message(): + """Encrypting and decrypting a 37-byte message recovers the original.""" + payload = bytes([0x72] + [0x00] * 35 + [0x72]) # InitiateCommunication + frame = encrypt_serial_message(payload, PC_PASSWORD) + recovered = decrypt_serial_message(frame, PC_PASSWORD) + assert recovered == payload + + +def test_roundtrip_8byte_message(): + """Encrypting and decrypting an 8-byte message recovers the original.""" + payload = bytes([0x50, 0x08, 0x00, 0x00, 0x00, 0x00, 0x40, 0x98]) + frame = encrypt_serial_message(payload, PC_PASSWORD) + recovered = decrypt_serial_message(frame, PC_PASSWORD) + assert recovered == payload + + +def test_encrypted_frame_starts_with_e0_fe(): + """E0 FE frames must start with 0xE0|cmd_nibble and 0xFE.""" + payload = bytes([0x72] + [0x00] * 35 + [0x72]) + frame = encrypt_serial_message(payload, PC_PASSWORD) + assert frame[0] >> 4 == 0xE + assert frame[1] == 0xFE + + +def test_encrypted_frame_checksum(): + """Checksum byte must equal sum of all preceding bytes mod 256.""" + payload = bytes([0x72] + [0x00] * 35 + [0x72]) + frame = encrypt_serial_message(payload, PC_PASSWORD) + expected = sum(frame[:-1]) % 256 + assert frame[-1] == expected + + +def test_decrypt_empty_returns_empty(): + assert decrypt_serial_message(b"", PC_PASSWORD) == b"" + + +def test_decrypt_short_frame_returns_empty(): + assert decrypt_serial_message(b"\xe0\xfe\x00", PC_PASSWORD) == b"" + + +def test_encrypted_frame_size(): + """Frame length must be 2 (header) + ceil(payload/16)*16 (AES) + 1 (checksum).""" + payload = bytes([0x72] + [0x00] * 35 + [0x72]) # 37 bytes → 3 blocks = 48 AES bytes + frame = encrypt_serial_message(payload, PC_PASSWORD) + expected_aes_bytes = ((len(payload) + 15) // 16) * 16 + assert len(frame) == 2 + expected_aes_bytes + 1 + + +# ── BabyWare compact frames from PR #337 (EVO192 v7.70) ───────────────────── +# decrypt_serial_message must return b"" for all BabyWare compact frames, +# even those large enough that the old code would attempt AES decryption. + +_PR337_BABYWARE_FRAMES = [ + # 13-byte TX (already rejected by old code — regression) + bytes.fromhex("E0FE0D0001A55FA8E417009304"), + # 27-byte TX (old code returned 16B garbage) + bytes.fromhex("E0FE1B0000B3FEB64AFF82D038F4157335C5BEB5591970FC00FA0C"), + # 36-byte RX (old code returned 32B garbage) + bytes.fromhex( + "E0FE2400052BFDCC0E388E81F7DC0DFC06BE6EE3" "4E2988DD57DD959BB8470AAF9300CC10" + ), + # 47-byte TX (old code returned 32B garbage) + bytes.fromhex( + "E0FE2F00002BA0369BE25752FD2D15F305D72FB5" + "591960FC6BF3CC76B88E81F7D449FC06BE6EE34E" + "2988DD5700B316" + ), + # 50-byte RX (main regression: old code returned 32B garbage) + bytes.fromhex( + "E0FE320001A5EFACC53587247F2FAADB054142" + "7D2003497ABE670B8D1516A4D369136BA83259" + "DE9424525BFABA4571003A14" + ), +] + + +def test_decrypt_returns_empty_for_babyware_compact_frames(): + """BabyWare compact E0 FE frames from PR #337 must return b'' (never garbage).""" + key = b"0000" + b"\xee" * 28 + for frame in _PR337_BABYWARE_FRAMES: + result = decrypt_serial_message(frame, key) + assert ( + result == b"" + ), f"{len(frame)}-byte BabyWare compact frame: expected b'' but got {len(result)}B" + + +# ── make_serial_key: integer password zero-padding ────────────────────────── + + +def test_make_serial_key_int_zero_pads_to_4_digits(): + """Integer passwords must be zero-padded to 4 digits before encoding.""" + assert make_serial_key(0)[:4] == b"0000" + assert make_serial_key(1)[:4] == b"0001" + assert make_serial_key(100)[:4] == b"0100" + assert make_serial_key(1234)[:4] == b"1234" + + +def test_make_serial_key_string_unchanged(): + """String passwords are encoded as-is.""" + assert make_serial_key("1234")[:4] == b"1234" + assert make_serial_key("abcd")[:4] == b"abcd" + + +def test_make_serial_key_always_32_bytes(): + """Key is always padded to exactly 32 bytes with 0xEE.""" + for pw in [0, 1, 1234, "1234", b"1234"]: + key = make_serial_key(pw) + assert len(key) == 32 + assert key[4:] == b"\xee" * 28 diff --git a/tests/connection/test_serial_protocol.py b/tests/connection/test_serial_protocol.py index 91953c5b..77a2e2cf 100644 --- a/tests/connection/test_serial_protocol.py +++ b/tests/connection/test_serial_protocol.py @@ -1,7 +1,7 @@ import binascii - from unittest.mock import MagicMock, call +from paradox.config import config as cfg from paradox.connections.serial_connection import SerialConnectionProtocol @@ -162,3 +162,185 @@ def test_evo_ram_reading(): cp.data_received(payload) handler.on_message.assert_called_with(payload) + + +# ── Encrypted (E0 FE) framing tests ───────────────────────────────────────── + + +def test_encrypted_message_13bytes_calls_handler(): + """A 13-byte E0 FE message must be framed by length field and delivered.""" + handler = MagicMock() + cp = SerialConnectionProtocol(handler) + + # TX from BabyWare capture: E0 FE 0D 00 14 67 76 68 99 28 00 05 04 + payload = bytes.fromhex("E0FE0D001467766899280005" + "04") + cp.data_received(payload) + + handler.on_message.assert_called_once_with(payload) + + +def test_encrypted_message_12bytes_calls_handler(): + """A 12-byte E0 FE response message must be framed correctly.""" + handler = MagicMock() + cp = SerialConnectionProtocol(handler) + + # RX from BabyWare capture: E0 FE 0C 00 14 67 36 1E A9 00 62 03 + payload = bytes.fromhex("E0FE0C001467361EA9006203") + cp.data_received(payload) + + handler.on_message.assert_called_once_with(payload) + + +def test_encrypted_message_in_chunks(): + """E0 FE frame must be reassembled correctly when delivered in chunks.""" + handler = MagicMock() + cp = SerialConnectionProtocol(handler) + + payload = bytes.fromhex("E0FE0D001467766899280005" + "04") + for byte in payload: + cp.data_received(bytes([byte])) + + handler.on_message.assert_called_once_with(payload) + + +def test_encrypted_then_normal_message(): + """An E0 FE frame followed immediately by a normal message must both be delivered.""" + handler = MagicMock() + cp = SerialConnectionProtocol(handler) + + enc_msg = bytes.fromhex("E0FE0C001467361EA9006203") + norm_msg = binascii.unhexlify("120600000018") + + cp.data_received(enc_msg + norm_msg) + + assert handler.on_message.call_count == 2 + handler.on_message.assert_any_call(enc_msg) + handler.on_message.assert_any_call(norm_msg) + + +def test_encrypted_message_46bytes(): + """A 46-byte E0 FE message from the test captures must be framed correctly.""" + handler = MagicMock() + cp = SerialConnectionProtocol(handler) + + # From test_encryption.py - EVO192 7.50.000+ tx capture + payload = bytes.fromhex( + "E0FE2E0012C5CA4AB7DCB3C59206F6E9EB47761EC928BF2754EE41DDD3ABB4D088BBB3EE369BE21750FD52CC9119" + ) + cp.data_received(payload) + + handler.on_message.assert_called_once_with(payload) + + +def test_encrypted_message_27bytes_calls_handler(): + """A 27-byte BabyWare compact E0 FE frame (from PR #337 TX) must be framed by length byte.""" + handler = MagicMock() + cp = SerialConnectionProtocol(handler) + + payload = bytes.fromhex("E0FE1B0000B3FEB64AFF82D038F4157335C5BEB5591970FC00FA0C") + cp.data_received(payload) + + handler.on_message.assert_called_once_with(payload) + + +def test_encrypted_message_50bytes_calls_handler(): + """A 50-byte BabyWare compact E0 FE frame (from PR #337 RX) must be framed by length byte.""" + handler = MagicMock() + cp = SerialConnectionProtocol(handler) + + payload = bytes.fromhex( + "E0FE320001A5EFACC53587247F2FAADB054142" + "7D2003497ABE670B8D1516A4D369136BA83259" + "DE9424525BFABA4571003A14" + ) + cp.data_received(payload) + + handler.on_message.assert_called_once_with(payload) + + +def test_pr337_sequence_babyware_compact_and_unencrypted(): + """Mixed sequence from PR #337: BabyWare compact followed by normal unencrypted frames.""" + handler = MagicMock() + cp = SerialConnectionProtocol(handler) + + # BabyWare compact TX (13 bytes) + RX (50 bytes) from the PR + compact_tx = bytes.fromhex("E0FE0D0001A55FA8E417009304") + compact_rx = bytes.fromhex( + "E0FE320001A5EFACC53587247F2FAADB054142" + "7D2003497ABE670B8D1516A4D369136BA83259" + "DE9424525BFABA4571003A14" + ) + # InitiateCommunicationResponse (37 bytes, unencrypted, from PR #337) + init_comm_rsp = bytes.fromhex( + "72FF04020000A15A01077001050B20D4" + "001001000F2710201031FF5745564F31" + "3932000083" + ) + + cp.data_received(compact_tx + compact_rx + init_comm_rsp) + + assert handler.on_message.call_count == 3 + handler.on_message.assert_any_call(compact_tx) + handler.on_message.assert_any_call(compact_rx) + handler.on_message.assert_any_call(init_comm_rsp) + + +# ── Encrypted mode integration tests ──────────────────────────────────────── + + +def test_encrypted_mode_outgoing_wraps_in_e0fe(mocker): + """When SERIAL_ENCRYPTED=True, send_message must write an E0 FE frame.""" + mocker.patch.object(cfg, "SERIAL_ENCRYPTED", True) + mocker.patch.object(cfg, "PASSWORD", "1234") + # Avoid asyncio.get_running_loop() in the base class connection_made + from paradox.connections.protocols import ConnectionProtocol + + mocker.patch.object(ConnectionProtocol, "connection_made") + + handler = MagicMock() + cp = SerialConnectionProtocol(handler) + raw_transport = MagicMock() + cp.connection_made(raw_transport) + + # Manually populate the fields that the base connection_made would have set, + # so that check_active() passes. + from paradox.connections.serial_encryption import ( + EncryptedSerialTransport, + make_serial_key, + ) + + key = make_serial_key("1234") + cp.transport = EncryptedSerialTransport(raw_transport, key) + cp._closed = MagicMock() + cp._closed.done.return_value = False + + payload = bytes([0x72] + [0x00] * 35 + [0x72]) + cp.send_message(payload) + + written = raw_transport.write.call_args[0][0] + assert written[0] >> 4 == 0xE + assert written[1] == 0xFE + + +def test_encrypted_mode_incoming_e0fe_is_decrypted(mocker): + """When SERIAL_ENCRYPTED=True, incoming E0 FE AES-256 frames are decrypted.""" + from paradox.lib.crypto import encrypt_serial_message + + mocker.patch.object(cfg, "SERIAL_ENCRYPTED", True) + mocker.patch.object(cfg, "PASSWORD", "1234") + from paradox.connections.protocols import ConnectionProtocol + + mocker.patch.object(ConnectionProtocol, "connection_made") + + handler = MagicMock() + cp = SerialConnectionProtocol(handler) + transport = MagicMock() + cp.connection_made(transport) + + payload = bytes([0x72] + [0x00] * 35 + [0x72]) + key = b"1234" + b"\xee" * 28 + frame = encrypt_serial_message(payload, key) + + cp.data_received(frame) + + handler.on_message.assert_called_once_with(payload) diff --git a/tests/lib/test_async_message_manager.py b/tests/lib/test_async_message_manager.py index dc132204..60321b4a 100644 --- a/tests/lib/test_async_message_manager.py +++ b/tests/lib/test_async_message_manager.py @@ -1,10 +1,9 @@ import asyncio -import pytest from construct import Container +import pytest -from paradox.lib.async_message_manager import (AsyncMessageManager, - EventMessageHandler) +from paradox.lib.async_message_manager import AsyncMessageManager, EventMessageHandler from paradox.lib.handlers import PersistentHandler @@ -89,7 +88,11 @@ async def test_wait_for_message(mocker): @pytest.mark.asyncio async def test_handler_exception(mocker): msg = Container( - fields=Container(value=Container(po=Container(command=0xE), event_source=0xFF)) + fields=Container( + value=Container( + po=Container(command=0xE), event_source=0xFF, event=Container() + ) + ) ) mm = AsyncMessageManager() @@ -98,7 +101,7 @@ async def test_handler_exception(mocker): ) mm.register_handler(eh) - with pytest.raises(Exception): + with pytest.raises(Exception, match="screw it"): await mm.schedule_message_handling(msg) assert len(mm.handler_registry) == 1