Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions paradox/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class Config:
"KEEP_ALIVE_INTERVAL": 10, # Interval between status updates
"IO_TIMEOUT": 0.5, # Timeout for IO operations
"LIMITS": {}, # By default all zones will be monitored
"MODULE_PGM_ADDRESSES": ({}, dict, None), # Map of bus module address -> pgm count, e.g. {4: 4}
"LABEL_ENCODING": "paradox-en", # Encoding to use when decoding labels. paradox-* or https://docs.python.org/3/library/codecs.html#standard-encodings
"LABEL_REFRESH_INTERVAL": (
15 * 60,
Expand Down
31 changes: 31 additions & 0 deletions paradox/hardware/evo/panel.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ def parse_message(
return parsers.PerformPartitionAction.parse(message)
elif message[0] == 0xD0:
return parsers.PerformZoneAction.parse(message)
elif message[0] == 0xA4:
return parsers.PerformModulePGMAction.parse(message)
else:
if message[0] >> 4 == 0x7:
return parsers.ErrorMessage.parse(message)
Expand All @@ -139,6 +141,8 @@ def parse_message(
return parsers.PerformActionResponse.parse(message)
elif message[0] >> 4 == 0xD:
return parsers.PerformZoneActionResponse.parse(message)
elif message[0] >> 4 == 0xA:
return parsers.PerformModulePGMActionResponse.parse(message)
# elif message[0] == 0x50 and message[2] == 0x80:
# return PanelStatus.parse(message)
# elif message[0] == 0x50 and message[2] < 0x80:
Expand Down Expand Up @@ -304,6 +308,33 @@ async def control_outputs(self, outputs, command) -> bool:
logger.info('PGM command: "%s" failed' % command)
return reply is not None

async def control_module_pgm_outputs(self, module_address: int, pgm_index: int, command: str) -> bool:
"""
Control PGM module outputs
:param int module_address: bus address of the PGM module
:param int pgm_index: 1-4 index of the PGM output
:param str command: textual command
:return: True if accepted
"""
assert 1 <= pgm_index <= parsers.MODULE_PGM_PACKET_SLOTS, "pgm_index must be between 1 and %d" % parsers.MODULE_PGM_PACKET_SLOTS
pgm_commands = ["release"] * parsers.MODULE_PGM_PACKET_SLOTS
pgm_commands[pgm_index - 1] = command
Comment on lines +318 to +321
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""
pgm_commands = ["release"] * parsers.MODULE_PGM_OUTPUTS_PER_MODULE
pgm_commands[pgm_index - 1] = command
"""
assert 1 <= pgm_index <= parsers.MODULE_PGM_OUTPUTS_PER_MODULE, "pgm_index must be between 1 and %d" % parsers.MODULE_PGM_OUTPUTS_PER_MODULE
pgm_commands = ["release"] * parsers.MODULE_PGM_OUTPUTS_PER_MODULE
pgm_commands[pgm_index - 1] = command

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll include this change but I'm also renaming MODULE_PGM_OUTPUTS_PER_MODULE for clarity so I won't accept this suggestion exactly.


args = {"module_address": module_address, "pgm_commands": pgm_commands}
try:
reply = await self.core.send_wait(
parsers.PerformModulePGMAction, args, reply_expected=0xA
)
except MappingError:
logger.error('Module PGM command: "%s" is not supported' % command)
return False

if reply:
logger.info('Module PGM command: "%s" succeeded' % command)
else:
logger.info('Module PGM command: "%s" failed' % command)
return reply is not None

async def control_doors(self, doors, command) -> bool:
"""
Control Doors
Expand Down
38 changes: 38 additions & 0 deletions paradox/hardware/evo/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,44 @@ def _parse(self, stream, context, path):
"checksum" / PacketChecksum(Bytes(1)),
)

# Fixed number of PGM command slots in the 0xA4 packet — a protocol constant,
# not the number of PGMs a specific module exposes (which is configured separately).
MODULE_PGM_PACKET_SLOTS = 4

PerformModulePGMAction = Struct(
"fields"
/ RawCopy(
Struct(
"po" / Struct("command" / Const(0xA4, Int8ub)),
"packet_length" / PacketLength(Int8ub),
"_not_used0" / Padding(1),
"module_address" / Default(Int8ub, 0),
"_not_used1" / Padding(2),
"pgm_commands" / Array(MODULE_PGM_PACKET_SLOTS, Default(_PGMCommandEnum, "release")),
"_not_used2" / Padding(12),
)
),
"checksum" / PacketChecksum(Bytes(1)),
)

PerformModulePGMActionResponse = Struct(
"fields"
/ RawCopy(
Struct(
"po"
/ BitStruct(
"command" / Const(0xA, Nibble),
"_not_used" / Nibble,
),
"packet_length" / PacketLength(Int8ub),
"_not_used0" / Padding(1),
"module_address" / Int8ub,
"_not_used1" / Padding(2),
)
),
"checksum" / PacketChecksum(Bytes(1)),
)

_PGMBroadcastCommandEnum = Enum(
Int8ub,
no_change=0,
Expand Down
4 changes: 4 additions & 0 deletions paradox/hardware/panel.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,10 @@ def control_partitions(self, partitions, command) -> bool:
def control_outputs(self, outputs, command) -> bool:
raise NotImplementedError("override control_outputs in a subclass")

@abstractmethod
def control_module_pgm_outputs(self, module_address, pgm_index, command) -> bool:
raise NotImplementedError("override control_module_pgm_outputs in a subclass")

@abstractmethod
def control_doors(self, doors, command) -> bool:
raise NotImplementedError("override control_doors in a subclass")
Expand Down
5 changes: 4 additions & 1 deletion paradox/interfaces/mqtt/entities/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from paradox.interfaces.mqtt.entities.binary_sensors import ZoneStatusBinarySensor, \
SystemBinarySensor, PartitionBinarySensor
from paradox.interfaces.mqtt.entities.sensor import PAIStatusSensor, SystemStatusSensor, ZoneNumericSensor
from paradox.interfaces.mqtt.entities.switch import ZoneBypassSwitch, PGMSwitch
from paradox.interfaces.mqtt.entities.switch import ZoneBypassSwitch, PGMSwitch, ModulePGMSwitch


class MQTTAutodiscoveryEntityFactory:
Expand Down Expand Up @@ -34,6 +34,9 @@ def make_zone_status_numeric_sensor(self, zone, status):
def make_pgm_switch(self, pgm):
return PGMSwitch(pgm, self.device, self.availability_topic)

def make_module_pgm_switch(self, module_pgm):
return ModulePGMSwitch(module_pgm, self.device, self.availability_topic)

def make_system_status(self, system_key, status):
if system_key == 'troubles':
return SystemBinarySensor(system_key, status, self.device, self.availability_topic)
Expand Down
20 changes: 20 additions & 0 deletions paradox/interfaces/mqtt/entities/switch.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,23 @@
self.property = "on"

self.pai_entity_type = "pgm"


class ModulePGMSwitch(Switch):
def __init__(self, module_pgm, device, availability_topic: str):
super().__init__(device, availability_topic)
self.module_pgm = module_pgm

self.key = sanitize_key(module_pgm["key"])
self.label = module_pgm["label"]
self.property = "on"

self.pai_entity_type = "pgm"

def serialize(self):
config = super().serialize()
config.update(dict(

Check warning on line 65 in paradox/interfaces/mqtt/entities/switch.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this constructor call with a literal.

See more on https://sonarcloud.io/project/issues?id=ParadoxAlarmInterface_pai&issues=AZzl9FtPlLOgYi19qSbW&open=AZzl9FtPlLOgYi19qSbW&pullRequest=591
payload_on="on_override",
payload_off="off_override",
Comment on lines +66 to +67
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does not work with on and off? Not sure if *_override is required. Can you test?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This works for me

> $ cat switch_fence.sh                                                                                                                                                     
mosquitto_pub -t "paradox/control/outputs/module4_pgm4" -m "off_override"
mosquitto_pub -t "paradox/control/outputs/module4_pgm4" -m "on_override"

switching "*_override" out for "off" and "on" - I get nothing. No relay click, no triggering of the attached device. This is for a relay on a PGM4.

))
return config
1 change: 1 addition & 0 deletions paradox/interfaces/mqtt/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
zone=cfg.MQTT_ZONE_TOPIC,
output=cfg.MQTT_OUTPUT_TOPIC,
pgm=cfg.MQTT_OUTPUT_TOPIC,
module_pgm=cfg.MQTT_OUTPUT_TOPIC,
repeater=cfg.MQTT_REPEATER_TOPIC,
bus=cfg.MQTT_BUS_TOPIC,
module=cfg.MQTT_MODULE_TOPIC,
Expand Down
10 changes: 10 additions & 0 deletions paradox/interfaces/mqtt/homeassistant.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def __init__(self, alarm):
self.partitions = {}
self.zones = {}
self.pgms = {}
self.module_pgms = {}

self.entity_factory = MQTTAutodiscoveryEntityFactory(
self.mqtt.availability_topic
Expand Down Expand Up @@ -69,6 +70,7 @@ def _handle_labels_loaded(self, data):

self.zones = data.get("zone", {})
self.pgms = data.get("pgm", {})
self.module_pgms = data.get("module_pgm", {})

def _publish_when_ready(self, panel: DetectedPanel, status):
self.entity_factory.set_device(Device(panel))
Expand All @@ -80,6 +82,9 @@ def _publish_when_ready(self, panel: DetectedPanel, status):
self._publish_zone_configs(status["zone"])
if "pgm" in status:
self._publish_pgm_configs(status["pgm"])
module_pgms = dict(self.alarm.storage.get_container("module_pgm"))
if module_pgms:
self._publish_module_pgm_configs(module_pgms)
if "system" in status:
self._publish_system_property_configs(status["system"])

Expand Down Expand Up @@ -162,6 +167,11 @@ def _publish_pgm_configs(self, pgm_statuses):
pgm_switch_config = self.entity_factory.make_pgm_switch(pgm)
self._publish_config(pgm_switch_config)

def _publish_module_pgm_configs(self, module_pgms):
for _, module_pgm in module_pgms.items():
module_pgm_switch_config = self.entity_factory.make_module_pgm_switch(module_pgm)
self._publish_config(module_pgm_switch_config)

def _publish_system_property_configs(self, system_statuses):
for system_key, system_status in system_statuses.items():
for property_name in system_status:
Expand Down
93 changes: 72 additions & 21 deletions paradox/paradox.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
async_loop_unhandled_exception_handler,
)
from paradox.hardware import Panel, create_panel
from paradox.hardware.evo.parsers import MODULE_PGM_PACKET_SLOTS
from paradox.lib import ps
from paradox.lib.async_message_manager import ErrorMessageHandler, EventMessageHandler
from paradox.lib.handlers import PersistentHandler
Expand Down Expand Up @@ -216,6 +217,7 @@

logger.info("Loading data from panel memory")
await self.panel.load_memory()
self._init_module_pgms()

logger.info("Running")
self.run_state = RunState.RUN
Expand Down Expand Up @@ -489,33 +491,82 @@

return accepted

def _init_module_pgms(self):
for addr, pgm_count in cfg.MODULE_PGM_ADDRESSES.items():
if not isinstance(addr, int) or not (1 <= addr <= 254):
logger.warning(
"MODULE_PGM_ADDRESSES: invalid module address %r (expected int 1-254), skipping",
addr,
)
continue
if not isinstance(pgm_count, int) or not (
1 <= pgm_count <= MODULE_PGM_PACKET_SLOTS
):
logger.warning(
"MODULE_PGM_ADDRESSES: invalid pgm_count %r for address %d (expected int 1-%d), skipping",
pgm_count,
addr,
MODULE_PGM_PACKET_SLOTS,
)
continue
for pgm_index in range(1, pgm_count + 1):
key = f"module{addr}_pgm{pgm_index}"
self.storage.get_container("module_pgm")[key] = {
"id": pgm_index,
"key": key,
"label": f"Module {addr} PGM {pgm_index}",
"module_address": addr,
"pgm_index": pgm_index,
}
self.storage.update_container_object("module_pgm", key, {"on": False})

async def control_output(self, output, command) -> bool:

Check failure on line 523 in paradox/paradox.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 22 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=ParadoxAlarmInterface_pai&issues=AZzl9FwilLOgYi19qSbX&open=AZzl9FwilLOgYi19qSbX&pullRequest=591
command = command.lower()
logger.debug(f"Control Output: {output} - {command}")

outputs_selected = self.storage.get_container("pgm").select(output)
if outputs_selected:
accepted = False
try:
accepted = await self.panel.control_outputs(outputs_selected, command)
except NotImplementedError:
logger.error("control_output is not implemented for this alarm type")
except asyncio.CancelledError:
logger.error("control_output canceled")
raise
except asyncio.TimeoutError:
logger.error("control_output timeout")
self.request_status_refresh()
return accepted

module_pgm_selected = self.storage.get_container("module_pgm").select(output)
if module_pgm_selected:
accepted = False
module_pgm_container = self.storage.get_container("module_pgm")
for key in module_pgm_selected:
out = module_pgm_container[key]
try:
accepted = await self.panel.control_module_pgm_outputs(
out["module_address"], out["pgm_index"], command
)
except NotImplementedError:
logger.error(
"control_module_pgm_outputs is not implemented for this alarm type"
)
except asyncio.CancelledError:
logger.error("control_module_pgm_output canceled")
raise
except asyncio.TimeoutError:
logger.error("control_output timeout")
if accepted:
is_on = command in ("on", "on_override")
self.storage.update_container_object(
"module_pgm", out["key"], {"on": is_on}
)
return accepted

# Not Found
if len(outputs_selected) == 0:
logger.error("No outputs selected")
return False

# Apply state changes
accepted = False
try:
accepted = await self.panel.control_outputs(outputs_selected, command)
except NotImplementedError:
logger.error("control_output is not implemented for this alarm type")
except asyncio.CancelledError:
logger.error("control_output canceled")
except asyncio.TimeoutError:
logger.error("control_output timeout")
# Apply state changes

# Refresh status
self.request_status_refresh() # Trigger status update

return accepted
logger.error("No outputs selected")
return False
Comment on lines 523 to +569
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this function requires a unit test. It is quite complicated. Especially how will it distinguish if it needs to trigger PGM on the main module or external module.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some tests for this.


async def send_panic(self, partition_id, panic_type, user_id) -> bool:
logger.debug(
Expand Down
Loading