From a6b5f59fff8de05c354bbd56da48ddeca382f50f Mon Sep 17 00:00:00 2001 From: Cameron Waldron <1000178+CamW@users.noreply.github.com> Date: Mon, 9 Mar 2026 09:53:52 +0200 Subject: [PATCH 1/7] Add Combus module PGM control (MODULE_PGM_ADDRESSES) Implements control of PGMs on Paradox expansion modules. (PGM4, zone extenders with PGM outputs, etc.) using the 0xA4 command class. Paradox handles "Module" PGMs differently to "Panel" PGMs which are already supported in pai. Protocol: - TX: A4 17 00 00 00 00*12 - RX: A2 07 00 00 00 - Module address = Babyware enrollment slot index (1-254) - No RAM status block; state tracked optimistically after each command Changes: - parsers.py: PerformModulePGMAction + PerformModulePGMActionResponse structs, MODULE_PGM_OUTPUTS_PER_MODULE constant - panel.py (evo): parse_message dispatch for 0xA4/0xA, control_module_pgm_outputs() - panel.py (base): abstract control_module_pgm_outputs stub - config.py: MODULE_PGM_ADDRESSES dict config (address -> pgm_count) - paradox.py: _init_module_pgms(), extended control_output() for module_pgm container - helpers.py: module_pgm topic mapping - homeassistant.py: _publish_module_pgm_configs() for HA autodiscovery - switch.py: ModulePGMSwitch with on_override/off_override payloads - factory.py: make_module_pgm_switch() - test_module_pgm_action.py: 16 tests covering build and parse against captured frames --- paradox/config.py | 1 + paradox/hardware/evo/panel.py | 30 +++ paradox/hardware/evo/parsers.py | 36 ++++ paradox/hardware/panel.py | 4 + paradox/interfaces/mqtt/entities/factory.py | 5 +- paradox/interfaces/mqtt/entities/switch.py | 20 ++ paradox/interfaces/mqtt/helpers.py | 1 + paradox/interfaces/mqtt/homeassistant.py | 10 + paradox/paradox.py | 73 ++++--- tests/hardware/evo/test_module_pgm_action.py | 194 +++++++++++++++++++ tests/hardware/evo/test_pgm.py | 4 +- 11 files changed, 353 insertions(+), 25 deletions(-) create mode 100644 tests/hardware/evo/test_module_pgm_action.py diff --git a/paradox/config.py b/paradox/config.py index 92255287..9154ff99 100644 --- a/paradox/config.py +++ b/paradox/config.py @@ -57,6 +57,7 @@ class Config: "KEEP_ALIVE_INTERVAL": 10, # Interval between status updates "IO_TIMEOUT": 0.5, # Timeout for IO operations "LIMITS": {}, # By default all zones will be monitored + "MODULE_PGM_ADDRESSES": ({}, dict, None), # Map of bus module address -> pgm count, e.g. {4: 4} "LABEL_ENCODING": "paradox-en", # Encoding to use when decoding labels. paradox-* or https://docs.python.org/3/library/codecs.html#standard-encodings "LABEL_REFRESH_INTERVAL": ( 15 * 60, diff --git a/paradox/hardware/evo/panel.py b/paradox/hardware/evo/panel.py index c94534a2..159e1d92 100644 --- a/paradox/hardware/evo/panel.py +++ b/paradox/hardware/evo/panel.py @@ -128,6 +128,8 @@ def parse_message( return parsers.PerformPartitionAction.parse(message) elif message[0] == 0xD0: return parsers.PerformZoneAction.parse(message) + elif message[0] == 0xA4: + return parsers.PerformModulePGMAction.parse(message) else: if message[0] >> 4 == 0x7: return parsers.ErrorMessage.parse(message) @@ -139,6 +141,8 @@ def parse_message( return parsers.PerformActionResponse.parse(message) elif message[0] >> 4 == 0xD: return parsers.PerformZoneActionResponse.parse(message) + elif message[0] >> 4 == 0xA: + return parsers.PerformModulePGMActionResponse.parse(message) # elif message[0] == 0x50 and message[2] == 0x80: # return PanelStatus.parse(message) # elif message[0] == 0x50 and message[2] < 0x80: @@ -304,6 +308,32 @@ async def control_outputs(self, outputs, command) -> bool: logger.info('PGM command: "%s" failed' % command) return reply is not None + async def control_module_pgm_outputs(self, module_address: int, pgm_index: int, command: str) -> bool: + """ + Control PGM module outputs + :param int module_address: bus address of the PGM module + :param int pgm_index: 1-4 index of the PGM output + :param str command: textual command + :return: True if accepted + """ + pgm_commands = ["release"] * parsers.MODULE_PGM_OUTPUTS_PER_MODULE + pgm_commands[pgm_index - 1] = command + + args = {"module_address": module_address, "pgm_commands": pgm_commands} + try: + reply = await self.core.send_wait( + parsers.PerformModulePGMAction, args, reply_expected=0xA + ) + except MappingError: + logger.error('Module PGM command: "%s" is not supported' % command) + return False + + if reply: + logger.info('Module PGM command: "%s" succeeded' % command) + else: + logger.info('Module PGM command: "%s" failed' % command) + return reply is not None + async def control_doors(self, doors, command) -> bool: """ Control Doors diff --git a/paradox/hardware/evo/parsers.py b/paradox/hardware/evo/parsers.py index f498d962..6a7e95cc 100644 --- a/paradox/hardware/evo/parsers.py +++ b/paradox/hardware/evo/parsers.py @@ -823,6 +823,42 @@ def _parse(self, stream, context, path): "checksum" / PacketChecksum(Bytes(1)), ) +MODULE_PGM_OUTPUTS_PER_MODULE = 4 + +PerformModulePGMAction = Struct( + "fields" + / RawCopy( + Struct( + "po" / Struct("command" / Const(0xA4, Int8ub)), + "packet_length" / PacketLength(Int8ub), + "_not_used0" / Padding(1), + "module_address" / Default(Int8ub, 0), + "_not_used1" / Padding(2), + "pgm_commands" / Array(MODULE_PGM_OUTPUTS_PER_MODULE, Default(_PGMCommandEnum, "release")), + "_not_used2" / Padding(12), + ) + ), + "checksum" / PacketChecksum(Bytes(1)), +) + +PerformModulePGMActionResponse = Struct( + "fields" + / RawCopy( + Struct( + "po" + / BitStruct( + "command" / Const(0xA, Nibble), + "_not_used" / Nibble, + ), + "packet_length" / PacketLength(Int8ub), + "_not_used0" / Padding(1), + "module_address" / Int8ub, + "_not_used1" / Padding(2), + ) + ), + "checksum" / PacketChecksum(Bytes(1)), +) + _PGMBroadcastCommandEnum = Enum( Int8ub, no_change=0, diff --git a/paradox/hardware/panel.py b/paradox/hardware/panel.py index 3477d117..f2c603ba 100644 --- a/paradox/hardware/panel.py +++ b/paradox/hardware/panel.py @@ -357,6 +357,10 @@ def control_partitions(self, partitions, command) -> bool: def control_outputs(self, outputs, command) -> bool: raise NotImplementedError("override control_outputs in a subclass") + @abstractmethod + def control_module_pgm_outputs(self, module_address, pgm_index, command) -> bool: + raise NotImplementedError("override control_module_pgm_outputs in a subclass") + @abstractmethod def control_doors(self, doors, command) -> bool: raise NotImplementedError("override control_doors in a subclass") diff --git a/paradox/interfaces/mqtt/entities/factory.py b/paradox/interfaces/mqtt/entities/factory.py index fa674db9..f9296db0 100644 --- a/paradox/interfaces/mqtt/entities/factory.py +++ b/paradox/interfaces/mqtt/entities/factory.py @@ -2,7 +2,7 @@ from paradox.interfaces.mqtt.entities.binary_sensors import ZoneStatusBinarySensor, \ SystemBinarySensor, PartitionBinarySensor from paradox.interfaces.mqtt.entities.sensor import PAIStatusSensor, SystemStatusSensor, ZoneNumericSensor -from paradox.interfaces.mqtt.entities.switch import ZoneBypassSwitch, PGMSwitch +from paradox.interfaces.mqtt.entities.switch import ZoneBypassSwitch, PGMSwitch, ModulePGMSwitch class MQTTAutodiscoveryEntityFactory: @@ -34,6 +34,9 @@ def make_zone_status_numeric_sensor(self, zone, status): def make_pgm_switch(self, pgm): return PGMSwitch(pgm, self.device, self.availability_topic) + def make_module_pgm_switch(self, module_pgm): + return ModulePGMSwitch(module_pgm, self.device, self.availability_topic) + def make_system_status(self, system_key, status): if system_key == 'troubles': return SystemBinarySensor(system_key, status, self.device, self.availability_topic) diff --git a/paradox/interfaces/mqtt/entities/switch.py b/paradox/interfaces/mqtt/entities/switch.py index b8246e89..51c77b35 100644 --- a/paradox/interfaces/mqtt/entities/switch.py +++ b/paradox/interfaces/mqtt/entities/switch.py @@ -47,3 +47,23 @@ def __init__(self, pgm, device, availability_topic: str): self.property = "on" self.pai_entity_type = "pgm" + + +class ModulePGMSwitch(Switch): + def __init__(self, module_pgm, device, availability_topic: str): + super().__init__(device, availability_topic) + self.module_pgm = module_pgm + + self.key = sanitize_key(module_pgm["key"]) + self.label = module_pgm["label"] + self.property = "on" + + self.pai_entity_type = "pgm" + + def serialize(self): + config = super().serialize() + config.update(dict( + payload_on="on_override", + payload_off="off_override", + )) + return config diff --git a/paradox/interfaces/mqtt/helpers.py b/paradox/interfaces/mqtt/helpers.py index cc280e31..59265efe 100644 --- a/paradox/interfaces/mqtt/helpers.py +++ b/paradox/interfaces/mqtt/helpers.py @@ -5,6 +5,7 @@ zone=cfg.MQTT_ZONE_TOPIC, output=cfg.MQTT_OUTPUT_TOPIC, pgm=cfg.MQTT_OUTPUT_TOPIC, + module_pgm=cfg.MQTT_OUTPUT_TOPIC, repeater=cfg.MQTT_REPEATER_TOPIC, bus=cfg.MQTT_BUS_TOPIC, module=cfg.MQTT_MODULE_TOPIC, diff --git a/paradox/interfaces/mqtt/homeassistant.py b/paradox/interfaces/mqtt/homeassistant.py index b325c6f8..a57f7d70 100644 --- a/paradox/interfaces/mqtt/homeassistant.py +++ b/paradox/interfaces/mqtt/homeassistant.py @@ -25,6 +25,7 @@ def __init__(self, alarm): self.partitions = {} self.zones = {} self.pgms = {} + self.module_pgms = {} self.entity_factory = MQTTAutodiscoveryEntityFactory( self.mqtt.availability_topic @@ -69,6 +70,7 @@ def _handle_labels_loaded(self, data): self.zones = data.get("zone", {}) self.pgms = data.get("pgm", {}) + self.module_pgms = data.get("module_pgm", {}) def _publish_when_ready(self, panel: DetectedPanel, status): self.entity_factory.set_device(Device(panel)) @@ -80,6 +82,9 @@ def _publish_when_ready(self, panel: DetectedPanel, status): self._publish_zone_configs(status["zone"]) if "pgm" in status: self._publish_pgm_configs(status["pgm"]) + module_pgms = dict(self.alarm.storage.get_container("module_pgm")) + if module_pgms: + self._publish_module_pgm_configs(module_pgms) if "system" in status: self._publish_system_property_configs(status["system"]) @@ -162,6 +167,11 @@ def _publish_pgm_configs(self, pgm_statuses): pgm_switch_config = self.entity_factory.make_pgm_switch(pgm) self._publish_config(pgm_switch_config) + def _publish_module_pgm_configs(self, module_pgms): + for module_pgm_key, module_pgm in module_pgms.items(): + module_pgm_switch_config = self.entity_factory.make_module_pgm_switch(module_pgm) + self._publish_config(module_pgm_switch_config) + def _publish_system_property_configs(self, system_statuses): for system_key, system_status in system_statuses.items(): for property_name in system_status: diff --git a/paradox/paradox.py b/paradox/paradox.py index 633bba15..ff8a844b 100644 --- a/paradox/paradox.py +++ b/paradox/paradox.py @@ -216,6 +216,7 @@ async def full_connect(self) -> bool: logger.info("Loading data from panel memory") await self.panel.load_memory() + self._init_module_pgms() logger.info("Running") self.run_state = RunState.RUN @@ -489,33 +490,61 @@ async def control_partition(self, partition: str, command: str) -> bool: return accepted + def _init_module_pgms(self): + for addr, pgm_count in cfg.MODULE_PGM_ADDRESSES.items(): + for pgm_index in range(1, pgm_count + 1): + key = f"module{addr}_pgm{pgm_index}" + self.storage.get_container("module_pgm")[key] = { + "id": pgm_index, + "key": key, + "label": f"Module {addr} PGM {pgm_index}", + "module_address": addr, + "pgm_index": pgm_index, + } + self.storage.update_container_object("module_pgm", key, {"on": False}) + async def control_output(self, output, command) -> bool: command = command.lower() logger.debug(f"Control Output: {output} - {command}") outputs_selected = self.storage.get_container("pgm").select(output) - - # Not Found - if len(outputs_selected) == 0: - logger.error("No outputs selected") - return False - - # Apply state changes - accepted = False - try: - accepted = await self.panel.control_outputs(outputs_selected, command) - except NotImplementedError: - logger.error("control_output is not implemented for this alarm type") - except asyncio.CancelledError: - logger.error("control_output canceled") - except asyncio.TimeoutError: - logger.error("control_output timeout") - # Apply state changes - - # Refresh status - self.request_status_refresh() # Trigger status update - - return accepted + if outputs_selected: + accepted = False + try: + accepted = await self.panel.control_outputs(outputs_selected, command) + except NotImplementedError: + logger.error("control_output is not implemented for this alarm type") + except asyncio.CancelledError: + logger.error("control_output canceled") + except asyncio.TimeoutError: + logger.error("control_output timeout") + self.request_status_refresh() + return accepted + + module_pgm_selected = self.storage.get_container("module_pgm").select(output) + if module_pgm_selected: + accepted = False + module_pgm_container = self.storage.get_container("module_pgm") + for key in module_pgm_selected: + out = module_pgm_container[key] + try: + accepted = await self.panel.control_module_pgm_outputs( + out["module_address"], out["pgm_index"], command + ) + except NotImplementedError: + logger.error("control_module_pgm_outputs is not implemented for this alarm type") + except asyncio.CancelledError: + logger.error("control_output canceled") + except asyncio.TimeoutError: + logger.error("control_output timeout") + if accepted: + is_on = command in ("on", "on_override") + self.storage.update_container_object("module_pgm", out["key"], {"on": is_on}) + self.request_status_refresh() + return accepted + + logger.error("No outputs selected") + return False async def send_panic(self, partition_id, panic_type, user_id) -> bool: logger.debug( diff --git a/tests/hardware/evo/test_module_pgm_action.py b/tests/hardware/evo/test_module_pgm_action.py new file mode 100644 index 00000000..671e20d5 --- /dev/null +++ b/tests/hardware/evo/test_module_pgm_action.py @@ -0,0 +1,194 @@ +""" +Tests for PerformModulePGMAction (build) and PerformModulePGMActionResponse (parse). + +PerformModulePGMAction layout (build, PAI -> panel): + Offset Size Field + 0 1 po: 0xA4 (command byte) + 1 1 packet_length = 23 (0x17) + 2 1 _not_used0 + 3 1 module_address + 4 2 _not_used1 + 6 1 pgm1_command (_PGMCommandEnum) + 7 1 pgm2_command + 8 1 pgm3_command + 9 1 module_pgm_command + 10 12 _not_used2 + 22 1 checksum + Total = 23 bytes + +PerformModulePGMActionResponse layout (parse, panel -> PAI): + Offset Size Field + 0 1 po: high nibble = 0xA, low nibble = status flags + 1 1 packet_length = 7 + 2 1 _not_used0 + 3 1 module_address + 4 2 _not_used1 + 6 1 checksum + Total = 7 bytes +""" + +from binascii import unhexlify + +from paradox.hardware.evo.parsers import PerformModulePGMAction, PerformModulePGMActionResponse + + +def _checksum(data: bytes) -> int: + return sum(data) % 256 + + +def _build(module_address, pgm_commands): + return PerformModulePGMAction.build( + {"fields": {"value": {"module_address": module_address, "pgm_commands": pgm_commands}}} + ) + + +# --------------------------------------------------------------------------- +# PerformModulePGMAction — BUILD tests +# --------------------------------------------------------------------------- + + +def test_build_trigger_pgm1_pulse_on(): + """Trigger PGM 1 (pulse on): A4 17 00 04 00 00 04 00 00 00 ... C3""" + raw = _build(4, ["on_override", "release", "release", "release"]) + + assert raw == unhexlify("a4170004000004000000000000000000000000000000c3") + + +def test_build_release_pgm1_pulse_off(): + """Release PGM 1 (pulse off): A4 17 00 04 00 00 02 00 00 00 ... C1""" + raw = _build(4, ["off_override", "release", "release", "release"]) + + assert raw == unhexlify("a4170004000002000000000000000000000000000000c1") + + +def test_build_activate_pgm3_steady_on(): + """Activate PGM 3 steady on: A4 17 00 04 00 00 00 00 03 00 ... C2""" + raw = _build(4, ["release", "release", "on", "release"]) + + assert raw == unhexlify("a4170004000000000300000000000000000000000000c2") + + +def test_build_trigger_module_pgm_pulse_on(): + """Trigger PGM 4 (pulse on): A4 17 00 04 00 00 00 00 00 04 ... C3""" + raw = _build(4, ["release", "release", "release", "on_override"]) + + assert raw == unhexlify("a4170004000000000004000000000000000000000000c3") + + +def test_build_deactivate_pgm1_steady_off(): + """Deactivate PGM 1 steady off (off=1): byte 6 = 0x01, checksum = 0xC0""" + raw = _build(4, ["off", "release", "release", "release"]) + + assert raw[0] == 0xA4 + assert raw[6] == 0x01 # off + assert raw[7] == 0x00 + assert raw[8] == 0x00 + assert raw[9] == 0x00 + assert raw[-1] == _checksum(raw[:-1]) + + +def test_build_activate_pgm2_steady_on(): + """Activate PGM 2 steady on (on=3): byte 7 = 0x03""" + raw = _build(4, ["release", "on", "release", "release"]) + + assert raw[6] == 0x00 + assert raw[7] == 0x03 # on + assert raw[8] == 0x00 + assert raw[9] == 0x00 + assert raw[-1] == _checksum(raw[:-1]) + + +def test_build_packet_length_always_23(): + """Packet length byte is always 23 (0x17) regardless of command.""" + for pgm_idx in range(4): + commands = ["release"] * 4 + commands[pgm_idx] = "on" + raw = _build(4, commands) + assert len(raw) == 23 + assert raw[1] == 0x17 + + +def test_build_command_byte_is_0xa4(): + """First byte is always 0xA4.""" + raw = _build(4, ["on", "release", "release", "release"]) + assert raw[0] == 0xA4 + + +def test_build_module_address_at_byte3(): + """Module address is encoded at byte 3.""" + raw = _build(4, ["on", "release", "release", "release"]) + assert raw[3] == 0x04 + + raw2 = _build(7, ["on", "release", "release", "release"]) + assert raw2[3] == 0x07 + + +def test_build_non_command_bytes_are_zero(): + """Bytes 2, 4, 5 and the 12 trailing padding bytes are always zero.""" + raw = _build(4, ["on", "release", "release", "release"]) + assert raw[2] == 0x00 # _not_used0 + assert raw[4:6] == b"\x00\x00" # _not_used1 + assert raw[10:22] == b"\x00" * 12 # _not_used2 + + +def test_build_only_target_pgm_byte_nonzero(): + """Only the byte for the targeted PGM output is non-zero.""" + for pgm_idx in range(4): + commands = ["release"] * 4 + commands[pgm_idx] = "on" + raw = _build(4, commands) + for i, cmd_byte in enumerate(raw[6:10]): + if i == pgm_idx: + assert cmd_byte == 3 # on + else: + assert cmd_byte == 0 # release + + +def test_build_checksum_correct(): + """Checksum is sum of all preceding bytes mod 256.""" + raw = _build(4, ["on_override", "release", "release", "release"]) + assert raw[-1] == _checksum(raw[:-1]) + + +# --------------------------------------------------------------------------- +# PerformModulePGMActionResponse — PARSE tests +# --------------------------------------------------------------------------- + + +def test_parse_response_known_ack(): + """Parse the known panel ACK: A2 07 00 04 00 00 AD""" + raw = unhexlify("a20700040000ad") + parsed = PerformModulePGMActionResponse.parse(raw) + + assert parsed.fields.value.po.command == 0xA + assert parsed.fields.value.packet_length == 7 + assert parsed.fields.value.module_address == 4 + + +def test_parse_response_po_command_is_nibble_0xa(): + """po.command (high nibble) is always 0xA regardless of low nibble.""" + # Low nibble 2 = WinLoad connected flag + raw = unhexlify("a20700040000ad") + parsed = PerformModulePGMActionResponse.parse(raw) + assert parsed.fields.value.po.command == 0xA + + +def test_parse_response_different_module_address(): + """Parse response for a module at address 7.""" + addr = 7 + body = bytes([0xA2, 0x07, 0x00, addr, 0x00, 0x00]) + cs = _checksum(body) + raw = body + bytes([cs]) + + parsed = PerformModulePGMActionResponse.parse(raw) + assert parsed.fields.value.module_address == addr + + +def test_parse_response_reply_expected_match(): + """po.command == 0xA satisfies the reply_expected=0xA check used in send_wait.""" + raw = unhexlify("a20700040000ad") + parsed = PerformModulePGMActionResponse.parse(raw) + + # This is the exact lambda used in send_wait + reply_expected = 0xA + assert parsed.fields.value.po.command == reply_expected diff --git a/tests/hardware/evo/test_pgm.py b/tests/hardware/evo/test_pgm.py index d8b681c1..ac35319c 100644 --- a/tests/hardware/evo/test_pgm.py +++ b/tests/hardware/evo/test_pgm.py @@ -50,7 +50,7 @@ def test_pgm3_deactivate_and_monitor(): assert a == expected_out -def test_pgm4_activate_and_monitor(): +def test_module_pgm_activate_and_monitor(): expected_out = unhexlify("40130600000008000000000000000300000064") pgms = [4] @@ -60,7 +60,7 @@ def test_pgm4_activate_and_monitor(): assert a == expected_out -def test_pgm4_deactivate_and_monitor(): +def test_module_pgm_deactivate_and_monitor(): expected_out = unhexlify("40130600000008000000000000000100000062") pgms = [4] From 9c22cab4a94ec28017c9ddd1452af8f43efeee90 Mon Sep 17 00:00:00 2001 From: Cameron Waldron <1000178+CamW@users.noreply.github.com> Date: Fri, 13 Mar 2026 08:56:46 +0200 Subject: [PATCH 2/7] Re-raise CancelledError after logging in control_output --- paradox/paradox.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/paradox/paradox.py b/paradox/paradox.py index ff8a844b..a074f05d 100644 --- a/paradox/paradox.py +++ b/paradox/paradox.py @@ -516,6 +516,7 @@ async def control_output(self, output, command) -> bool: logger.error("control_output is not implemented for this alarm type") except asyncio.CancelledError: logger.error("control_output canceled") + raise except asyncio.TimeoutError: logger.error("control_output timeout") self.request_status_refresh() @@ -535,6 +536,7 @@ async def control_output(self, output, command) -> bool: logger.error("control_module_pgm_outputs is not implemented for this alarm type") except asyncio.CancelledError: logger.error("control_output canceled") + raise except asyncio.TimeoutError: logger.error("control_output timeout") if accepted: From 5fd3f0e25749a9ae8cb54a70df92379ea57c1083 Mon Sep 17 00:00:00 2001 From: Cameron Waldron <1000178+CamW@users.noreply.github.com> Date: Tue, 24 Mar 2026 21:23:57 +0200 Subject: [PATCH 3/7] remove unused variable Co-authored-by: Jevgeni Kiski --- paradox/interfaces/mqtt/homeassistant.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paradox/interfaces/mqtt/homeassistant.py b/paradox/interfaces/mqtt/homeassistant.py index a57f7d70..e581e3fc 100644 --- a/paradox/interfaces/mqtt/homeassistant.py +++ b/paradox/interfaces/mqtt/homeassistant.py @@ -168,7 +168,7 @@ def _publish_pgm_configs(self, pgm_statuses): self._publish_config(pgm_switch_config) def _publish_module_pgm_configs(self, module_pgms): - for module_pgm_key, module_pgm in module_pgms.items(): + for _, module_pgm in module_pgms.items(): module_pgm_switch_config = self.entity_factory.make_module_pgm_switch(module_pgm) self._publish_config(module_pgm_switch_config) From dd680c052e94475a4aa39517db4bf4358e645c51 Mon Sep 17 00:00:00 2001 From: Cameron Waldron <1000178+CamW@users.noreply.github.com> Date: Tue, 24 Mar 2026 21:33:08 +0200 Subject: [PATCH 4/7] remove pointless refresh Co-authored-by: Jevgeni Kiski --- paradox/paradox.py | 1 - 1 file changed, 1 deletion(-) diff --git a/paradox/paradox.py b/paradox/paradox.py index a074f05d..1409ace3 100644 --- a/paradox/paradox.py +++ b/paradox/paradox.py @@ -542,7 +542,6 @@ async def control_output(self, output, command) -> bool: if accepted: is_on = command in ("on", "on_override") self.storage.update_container_object("module_pgm", out["key"], {"on": is_on}) - self.request_status_refresh() return accepted logger.error("No outputs selected") From 6741a2c88e1d64f03b1ea64563e93d3dc31c155d Mon Sep 17 00:00:00 2001 From: Cameron Waldron <1000178+CamW@users.noreply.github.com> Date: Tue, 24 Mar 2026 21:33:46 +0200 Subject: [PATCH 5/7] remove unnecessary tests Co-authored-by: Jevgeni Kiski --- tests/hardware/evo/test_module_pgm_action.py | 75 -------------------- 1 file changed, 75 deletions(-) diff --git a/tests/hardware/evo/test_module_pgm_action.py b/tests/hardware/evo/test_module_pgm_action.py index 671e20d5..96c52454 100644 --- a/tests/hardware/evo/test_module_pgm_action.py +++ b/tests/hardware/evo/test_module_pgm_action.py @@ -75,81 +75,6 @@ def test_build_trigger_module_pgm_pulse_on(): assert raw == unhexlify("a4170004000000000004000000000000000000000000c3") -def test_build_deactivate_pgm1_steady_off(): - """Deactivate PGM 1 steady off (off=1): byte 6 = 0x01, checksum = 0xC0""" - raw = _build(4, ["off", "release", "release", "release"]) - - assert raw[0] == 0xA4 - assert raw[6] == 0x01 # off - assert raw[7] == 0x00 - assert raw[8] == 0x00 - assert raw[9] == 0x00 - assert raw[-1] == _checksum(raw[:-1]) - - -def test_build_activate_pgm2_steady_on(): - """Activate PGM 2 steady on (on=3): byte 7 = 0x03""" - raw = _build(4, ["release", "on", "release", "release"]) - - assert raw[6] == 0x00 - assert raw[7] == 0x03 # on - assert raw[8] == 0x00 - assert raw[9] == 0x00 - assert raw[-1] == _checksum(raw[:-1]) - - -def test_build_packet_length_always_23(): - """Packet length byte is always 23 (0x17) regardless of command.""" - for pgm_idx in range(4): - commands = ["release"] * 4 - commands[pgm_idx] = "on" - raw = _build(4, commands) - assert len(raw) == 23 - assert raw[1] == 0x17 - - -def test_build_command_byte_is_0xa4(): - """First byte is always 0xA4.""" - raw = _build(4, ["on", "release", "release", "release"]) - assert raw[0] == 0xA4 - - -def test_build_module_address_at_byte3(): - """Module address is encoded at byte 3.""" - raw = _build(4, ["on", "release", "release", "release"]) - assert raw[3] == 0x04 - - raw2 = _build(7, ["on", "release", "release", "release"]) - assert raw2[3] == 0x07 - - -def test_build_non_command_bytes_are_zero(): - """Bytes 2, 4, 5 and the 12 trailing padding bytes are always zero.""" - raw = _build(4, ["on", "release", "release", "release"]) - assert raw[2] == 0x00 # _not_used0 - assert raw[4:6] == b"\x00\x00" # _not_used1 - assert raw[10:22] == b"\x00" * 12 # _not_used2 - - -def test_build_only_target_pgm_byte_nonzero(): - """Only the byte for the targeted PGM output is non-zero.""" - for pgm_idx in range(4): - commands = ["release"] * 4 - commands[pgm_idx] = "on" - raw = _build(4, commands) - for i, cmd_byte in enumerate(raw[6:10]): - if i == pgm_idx: - assert cmd_byte == 3 # on - else: - assert cmd_byte == 0 # release - - -def test_build_checksum_correct(): - """Checksum is sum of all preceding bytes mod 256.""" - raw = _build(4, ["on_override", "release", "release", "release"]) - assert raw[-1] == _checksum(raw[:-1]) - - # --------------------------------------------------------------------------- # PerformModulePGMActionResponse — PARSE tests # --------------------------------------------------------------------------- From 53d5eff9cf864d77f608b50763ac15ef2a55afce Mon Sep 17 00:00:00 2001 From: Cameron Waldron <1000178+CamW@users.noreply.github.com> Date: Tue, 24 Mar 2026 22:29:35 +0200 Subject: [PATCH 6/7] Review improvements --- paradox/hardware/evo/panel.py | 3 +- paradox/hardware/evo/parsers.py | 6 ++- tests/test_paradox.py | 68 +++++++++++++++++++++++++++++++++ 3 files changed, 74 insertions(+), 3 deletions(-) diff --git a/paradox/hardware/evo/panel.py b/paradox/hardware/evo/panel.py index 159e1d92..10202530 100644 --- a/paradox/hardware/evo/panel.py +++ b/paradox/hardware/evo/panel.py @@ -316,7 +316,8 @@ async def control_module_pgm_outputs(self, module_address: int, pgm_index: int, :param str command: textual command :return: True if accepted """ - pgm_commands = ["release"] * parsers.MODULE_PGM_OUTPUTS_PER_MODULE + assert 1 <= pgm_index <= parsers.MODULE_PGM_PACKET_SLOTS, "pgm_index must be between 1 and %d" % parsers.MODULE_PGM_PACKET_SLOTS + pgm_commands = ["release"] * parsers.MODULE_PGM_PACKET_SLOTS pgm_commands[pgm_index - 1] = command args = {"module_address": module_address, "pgm_commands": pgm_commands} diff --git a/paradox/hardware/evo/parsers.py b/paradox/hardware/evo/parsers.py index 6a7e95cc..8df0069d 100644 --- a/paradox/hardware/evo/parsers.py +++ b/paradox/hardware/evo/parsers.py @@ -823,7 +823,9 @@ def _parse(self, stream, context, path): "checksum" / PacketChecksum(Bytes(1)), ) -MODULE_PGM_OUTPUTS_PER_MODULE = 4 +# Fixed number of PGM command slots in the 0xA4 packet — a protocol constant, +# not the number of PGMs a specific module exposes (which is configured separately). +MODULE_PGM_PACKET_SLOTS = 4 PerformModulePGMAction = Struct( "fields" @@ -834,7 +836,7 @@ def _parse(self, stream, context, path): "_not_used0" / Padding(1), "module_address" / Default(Int8ub, 0), "_not_used1" / Padding(2), - "pgm_commands" / Array(MODULE_PGM_OUTPUTS_PER_MODULE, Default(_PGMCommandEnum, "release")), + "pgm_commands" / Array(MODULE_PGM_PACKET_SLOTS, Default(_PGMCommandEnum, "release")), "_not_used2" / Padding(12), ) ), diff --git a/tests/test_paradox.py b/tests/test_paradox.py index 530f52ef..6f1d890d 100644 --- a/tests/test_paradox.py +++ b/tests/test_paradox.py @@ -39,3 +39,71 @@ async def test_control_doors(mocker): assert await alarm.control_door("Door 1", "unlock") alarm.panel.control_doors.assert_called_once_with([1], "unlock") + + +def _add_module_pgm(alarm, module_address, pgm_index): + key = f"module{module_address}_pgm{pgm_index}" + alarm.storage.get_container("module_pgm")[key] = { + "id": pgm_index, + "key": key, + "label": f"Module {module_address} PGM {pgm_index}", + "module_address": module_address, + "pgm_index": pgm_index, + } + alarm.storage.update_container_object("module_pgm", key, {"on": False}) + return key + + +@pytest.mark.asyncio +async def test_control_output_regular_pgm_routes_to_control_outputs(mocker): + """Regular PGM uses control_outputs, not control_module_pgm_outputs.""" + alarm = Paradox() + alarm.panel = mocker.Mock(spec=Panel) + alarm.panel.control_outputs = AsyncMock(return_value=True) + alarm.panel.control_module_pgm_outputs = AsyncMock(return_value=True) + + alarm.storage.get_container("pgm").deep_merge({1: {"id": 1, "key": "PGM 1"}}) + + assert await alarm.control_output("PGM 1", "on") + alarm.panel.control_outputs.assert_called_once() + alarm.panel.control_module_pgm_outputs.assert_not_called() + + +@pytest.mark.asyncio +async def test_control_output_module_pgm_routes_to_control_module_pgm_outputs(mocker): + """Module PGM uses control_module_pgm_outputs with the correct address and index.""" + alarm = Paradox() + alarm.panel = mocker.Mock(spec=Panel) + alarm.panel.control_outputs = AsyncMock(return_value=True) + alarm.panel.control_module_pgm_outputs = AsyncMock(return_value=True) + + _add_module_pgm(alarm, module_address=4, pgm_index=2) + + assert await alarm.control_output("module4_pgm2", "on_override") + alarm.panel.control_module_pgm_outputs.assert_called_once_with(4, 2, "on_override") + alarm.panel.control_outputs.assert_not_called() + + +@pytest.mark.asyncio +async def test_control_output_module_pgm_optimistic_state(mocker): + """Accepted on_override sets on=True; off_override sets on=False.""" + alarm = Paradox() + alarm.panel = mocker.Mock(spec=Panel) + alarm.panel.control_module_pgm_outputs = AsyncMock(return_value=True) + + key = _add_module_pgm(alarm, module_address=4, pgm_index=1) + + await alarm.control_output(key, "on_override") + assert alarm.storage.get_container("module_pgm")[key]["on"] is True + + await alarm.control_output(key, "off_override") + assert alarm.storage.get_container("module_pgm")[key]["on"] is False + + +@pytest.mark.asyncio +async def test_control_output_no_match_returns_false(mocker): + """Returns False when the output key matches neither pgm nor module_pgm.""" + alarm = Paradox() + alarm.panel = mocker.Mock(spec=Panel) + + assert await alarm.control_output("nonexistent", "on") is False From 7326036c437d9a64ef3cdedc5e801c3ec313801b Mon Sep 17 00:00:00 2001 From: Jevgeni Kiski Date: Wed, 25 Mar 2026 12:41:19 +0200 Subject: [PATCH 7/7] Add validation for module addresses and pgm counts; update test function names --- paradox/paradox.py | 27 ++++++++++++++++++++++++--- tests/hardware/evo/test_pgm.py | 18 +++++++++++------- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/paradox/paradox.py b/paradox/paradox.py index 1409ace3..58d99d0f 100644 --- a/paradox/paradox.py +++ b/paradox/paradox.py @@ -22,6 +22,7 @@ async_loop_unhandled_exception_handler, ) from paradox.hardware import Panel, create_panel +from paradox.hardware.evo.parsers import MODULE_PGM_PACKET_SLOTS from paradox.lib import ps from paradox.lib.async_message_manager import ErrorMessageHandler, EventMessageHandler from paradox.lib.handlers import PersistentHandler @@ -492,6 +493,22 @@ async def control_partition(self, partition: str, command: str) -> bool: def _init_module_pgms(self): for addr, pgm_count in cfg.MODULE_PGM_ADDRESSES.items(): + if not isinstance(addr, int) or not (1 <= addr <= 254): + logger.warning( + "MODULE_PGM_ADDRESSES: invalid module address %r (expected int 1-254), skipping", + addr, + ) + continue + if not isinstance(pgm_count, int) or not ( + 1 <= pgm_count <= MODULE_PGM_PACKET_SLOTS + ): + logger.warning( + "MODULE_PGM_ADDRESSES: invalid pgm_count %r for address %d (expected int 1-%d), skipping", + pgm_count, + addr, + MODULE_PGM_PACKET_SLOTS, + ) + continue for pgm_index in range(1, pgm_count + 1): key = f"module{addr}_pgm{pgm_index}" self.storage.get_container("module_pgm")[key] = { @@ -533,15 +550,19 @@ async def control_output(self, output, command) -> bool: out["module_address"], out["pgm_index"], command ) except NotImplementedError: - logger.error("control_module_pgm_outputs is not implemented for this alarm type") + logger.error( + "control_module_pgm_outputs is not implemented for this alarm type" + ) except asyncio.CancelledError: - logger.error("control_output canceled") + logger.error("control_module_pgm_output canceled") raise except asyncio.TimeoutError: logger.error("control_output timeout") if accepted: is_on = command in ("on", "on_override") - self.storage.update_container_object("module_pgm", out["key"], {"on": is_on}) + self.storage.update_container_object( + "module_pgm", out["key"], {"on": is_on} + ) return accepted logger.error("No outputs selected") diff --git a/tests/hardware/evo/test_pgm.py b/tests/hardware/evo/test_pgm.py index ac35319c..88687bcd 100644 --- a/tests/hardware/evo/test_pgm.py +++ b/tests/hardware/evo/test_pgm.py @@ -1,9 +1,7 @@ -from binascii import hexlify, unhexlify +from binascii import unhexlify from paradox.hardware.evo.adapters import PGMFlags -from paradox.hardware.evo.parsers import (BroadcastRequest, BroadcastResponse, - PerformPGMAction, - PGMBroadcastCommand) +from paradox.hardware.evo.parsers import PerformActionResponse, PerformPGMAction def test_pgm3_activate_and_monitor(): @@ -50,7 +48,7 @@ def test_pgm3_deactivate_and_monitor(): assert a == expected_out -def test_module_pgm_activate_and_monitor(): +def test_pgm4_activate_and_monitor(): expected_out = unhexlify("40130600000008000000000000000300000064") pgms = [4] @@ -60,7 +58,7 @@ def test_module_pgm_activate_and_monitor(): assert a == expected_out -def test_module_pgm_deactivate_and_monitor(): +def test_pgm4_deactivate_and_monitor(): expected_out = unhexlify("40130600000008000000000000000100000062") pgms = [4] @@ -119,4 +117,10 @@ def test_pgm_flags_3(): def test_pgm_confirmation(): - payload = unhexlify("42070000000049") + raw = unhexlify("42070000000049") + parsed = PerformActionResponse.parse(raw) + + assert parsed.fields.value.po.command == 0x4 + assert parsed.fields.value.po.status.Winload_connected is True + assert parsed.fields.value.po.status.alarm_reporting_pending is False + assert parsed.fields.value.packet_length == 7