Skip to content

Commit 57d93be

Browse files
committed
feat: Implement direct device trait updates from data protocol messages using dps metadata and add corresponding update listeners.
This uses the same dps converter patern used by q10, but does not share code explicitly.
1 parent f0eb62a commit 57d93be

File tree

16 files changed

+408
-41
lines changed

16 files changed

+408
-41
lines changed

roborock/data/v1/v1_containers.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
ROBOROCK_G20S_Ultra,
3838
)
3939
from roborock.exceptions import RoborockException
40+
from roborock.roborock_message import RoborockDataProtocol
4041

4142
from ..containers import NamedRoomMapping, RoborockBase, RoborockBaseTimer, _attr_repr
4243
from .v1_clean_modes import WashTowelModes
@@ -294,11 +295,11 @@ class StatusV2(RoborockBase):
294295

295296
msg_ver: int | None = None
296297
msg_seq: int | None = None
297-
state: RoborockStateCode | None = _requires_schema_code("state")
298-
battery: int | None = _requires_schema_code("battery")
298+
state: RoborockStateCode | None = field(default=None, metadata={"requires_schema_code": "state", "dps": RoborockDataProtocol.STATE})
299+
battery: int | None = field(default=None, metadata={"requires_schema_code": "battery", "dps": RoborockDataProtocol.BATTERY})
299300
clean_time: int | None = None
300301
clean_area: int | None = None
301-
error_code: RoborockErrorCode | None = _requires_schema_code("error_code")
302+
error_code: RoborockErrorCode | None = field(default=None, metadata={"requires_schema_code": "error_code", "dps": RoborockDataProtocol.ERROR_CODE})
302303
map_present: int | None = None
303304
in_cleaning: RoborockInCleaning | None = None
304305
in_returning: int | None = None
@@ -308,12 +309,12 @@ class StatusV2(RoborockBase):
308309
back_type: int | None = None
309310
wash_phase: int | None = None
310311
wash_ready: int | None = None
311-
fan_power: int | None = _requires_schema_code("fan_power")
312+
fan_power: int | None = field(default=None, metadata={"requires_schema_code": "fan_power", "dps": RoborockDataProtocol.FAN_POWER})
312313
dnd_enabled: int | None = None
313314
map_status: int | None = None
314315
is_locating: int | None = None
315316
lock_status: int | None = None
316-
water_box_mode: int | None = _requires_schema_code("water_box_mode")
317+
water_box_mode: int | None = field(default=None, metadata={"requires_schema_code": "water_box_mode", "dps": RoborockDataProtocol.WATER_BOX_MODE})
317318
water_box_carriage_status: int | None = None
318319
mop_forbidden_enable: int | None = None
319320
camera_status: int | None = None
@@ -330,14 +331,14 @@ class StatusV2(RoborockBase):
330331
debug_mode: int | None = None
331332
collision_avoid_status: int | None = None
332333
switch_map_mode: int | None = None
333-
dock_error_status: RoborockDockErrorCode | None = _requires_schema_code("dock_error_status")
334-
charge_status: int | None = _requires_schema_code("charge_status")
334+
dock_error_status: RoborockDockErrorCode | None = field(default=None, metadata={"requires_schema_code": "dock_error_status"})
335+
charge_status: int | None = field(default=None, metadata={"requires_schema_code": "charge_status", "dps": RoborockDataProtocol.CHARGE_STATUS})
335336
unsave_map_reason: int | None = None
336337
unsave_map_flag: int | None = None
337338
wash_status: int | None = None
338339
distance_off: int | None = None
339340
in_warmup: int | None = None
340-
dry_status: int | None = _requires_schema_code("drying_status")
341+
dry_status: int | None = field(default=None, metadata={"requires_schema_code": "drying_status", "dps": RoborockDataProtocol.DRYING_STATUS})
341342
rdt: int | None = None
342343
clean_percent: int | None = None
343344
rss: int | None = None
@@ -643,9 +644,9 @@ class ConsumableField(FieldNameBase):
643644

644645
@dataclass
645646
class Consumable(RoborockBase):
646-
main_brush_work_time: int | None = field(metadata={"requires_schema_code": "main_brush_life"}, default=None)
647-
side_brush_work_time: int | None = field(metadata={"requires_schema_code": "side_brush_life"}, default=None)
648-
filter_work_time: int | None = field(metadata={"requires_schema_code": "filter_life"}, default=None)
647+
main_brush_work_time: int | None = field(default=None, metadata={"requires_schema_code": "main_brush_life", "dps": RoborockDataProtocol.MAIN_BRUSH_WORK_TIME})
648+
side_brush_work_time: int | None = field(default=None, metadata={"requires_schema_code": "side_brush_life", "dps": RoborockDataProtocol.SIDE_BRUSH_WORK_TIME})
649+
filter_work_time: int | None = field(default=None, metadata={"requires_schema_code": "filter_life", "dps": RoborockDataProtocol.FILTER_WORK_TIME})
649650
filter_element_work_time: int | None = None
650651
sensor_dirty_time: int | None = None
651652
strainer_work_times: int | None = None

roborock/devices/device.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ async def connect(self) -> None:
199199
unsub = await self._channel.subscribe(self._on_message)
200200
try:
201201
if self.v1_properties is not None:
202-
await self.v1_properties.discover_features()
202+
await self.v1_properties.start()
203203
elif self.b01_q10_properties is not None:
204204
await self.b01_q10_properties.start()
205205
except RoborockException:
@@ -216,6 +216,8 @@ async def close(self) -> None:
216216
await self._connect_task
217217
except asyncio.CancelledError:
218218
pass
219+
if self.v1_properties is not None:
220+
self.v1_properties.close()
219221
if self.b01_q10_properties is not None:
220222
await self.b01_q10_properties.close()
221223
if self._unsub:

roborock/devices/device_manager.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ def device_creator(home_data: HomeData, device: HomeDataDevice, product: HomeDat
236236
channel.rpc_channel,
237237
channel.mqtt_rpc_channel,
238238
channel.map_rpc_channel,
239+
channel.add_dps_listener,
239240
web_api,
240241
device_cache=device_cache,
241242
map_parser_config=map_parser_config,

roborock/devices/rpc/v1_channel.py

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from dataclasses import dataclass
1212
from typing import Any, TypeVar
1313

14+
from roborock.callbacks import CallbackList
1415
from roborock.data import HomeDataDevice, NetworkInfo, RoborockBase, UserData
1516
from roborock.devices.cache import DeviceCache
1617
from roborock.devices.transport.channel import Channel
@@ -30,9 +31,10 @@
3031
V1RpcChannel,
3132
create_map_response_decoder,
3233
create_security_data,
34+
decode_data_protocol_message,
3335
decode_rpc_response,
3436
)
35-
from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol
37+
from roborock.roborock_message import RoborockDataProtocol, RoborockMessage, RoborockMessageProtocol
3638
from roborock.roborock_typing import RoborockCommand
3739
from roborock.util import RoborockLoggerAdapter
3840

@@ -188,6 +190,7 @@ def __init__(
188190
self._device_cache = device_cache
189191
self._reconnect_task: asyncio.Task[None] | None = None
190192
self._last_network_info_refresh: datetime.datetime | None = None
193+
self._dps_listeners = CallbackList[dict[RoborockDataProtocol, Any]]()
191194

192195
@property
193196
def is_connected(self) -> bool:
@@ -305,12 +308,14 @@ async def subscribe(self, callback: Callable[[RoborockMessage], None]) -> Callab
305308
loop = asyncio.get_running_loop()
306309
self._reconnect_task = loop.create_task(self._background_reconnect())
307310

308-
if not self.is_local_connected:
309-
# We were not able to connect locally, so fallback to MQTT and at least
310-
# establish that connection explicitly. If this fails then raise an
311-
# error and let the caller know we failed to subscribe.
312-
self._mqtt_unsub = await self._mqtt_channel.subscribe(self._on_mqtt_message)
313-
self._logger.debug("V1Channel connected to device via MQTT")
311+
# Always subscribe to MQTT to receive protocol updates (data points)
312+
# even if we have a local connection. Protocol updates only come via cloud/MQTT.
313+
# Local connection is used for RPC commands, but push notifications come via MQTT.
314+
self._mqtt_unsub = await self._mqtt_channel.subscribe(self._on_mqtt_message)
315+
if self.is_local_connected:
316+
self._logger.debug("V1Channel connected via local and MQTT (for protocol updates)")
317+
else:
318+
self._logger.debug("V1Channel connected via MQTT only")
314319

315320
def unsub() -> None:
316321
"""Unsubscribe from all messages."""
@@ -328,6 +333,16 @@ def unsub() -> None:
328333
self._callback = callback
329334
return unsub
330335

336+
def add_dps_listener(self, listener: Callable[[dict[RoborockDataProtocol, Any]], None]) -> Callable[[], None]:
337+
"""Add a listener for DPS updates.
338+
339+
This will attach a listener to the existing subscription, invoking
340+
the listener whenever new DPS values arrive from the subscription.
341+
This will only work if a subscription has already been setup, which is
342+
handled by the device setup.
343+
"""
344+
return self._dps_listeners.add_callback(listener)
345+
331346
async def _get_networking_info(self, *, prefer_cache: bool = True) -> NetworkInfo:
332347
"""Retrieve networking information for the device.
333348
@@ -428,6 +443,11 @@ def _on_mqtt_message(self, message: RoborockMessage) -> None:
428443
self._logger.debug("V1Channel received MQTT message: %s", message)
429444
if self._callback:
430445
self._callback(message)
446+
try:
447+
if datapoints := decode_data_protocol_message(message):
448+
self._dps_listeners(datapoints)
449+
except RoborockException as e:
450+
self._logger.debug("Error decoding data protocol message: %s", e)
431451

432452
def _on_local_message(self, message: RoborockMessage) -> None:
433453
"""Handle incoming local messages."""

roborock/devices/traits/v1/__init__.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,18 @@
5353
"""
5454

5555
import logging
56+
from collections.abc import Callable
5657
from dataclasses import dataclass, field, fields
5758
from typing import Any, get_args
5859

5960
from roborock.data.containers import HomeData, HomeDataProduct, RoborockBase
6061
from roborock.data.v1.v1_code_mappings import RoborockDockTypeCode
6162
from roborock.devices.cache import DeviceCache
6263
from roborock.devices.traits import Trait
64+
from roborock.exceptions import RoborockException
6365
from roborock.map.map_parser import MapParserConfig
64-
from roborock.protocols.v1_protocol import V1RpcChannel
66+
from roborock.protocols.v1_protocol import V1RpcChannel, decode_data_protocol_message
67+
from roborock.roborock_message import RoborockDataProtocol, RoborockMessage
6568
from roborock.web_api import UserWebApiClient
6669

6770
from . import (
@@ -176,6 +179,7 @@ def __init__(
176179
rpc_channel: V1RpcChannel,
177180
mqtt_rpc_channel: V1RpcChannel,
178181
map_rpc_channel: V1RpcChannel,
182+
add_dps_listener: Callable[[Callable[[dict[RoborockDataProtocol, Any]], None]], Callable[[], None]],
179183
web_api: UserWebApiClient,
180184
device_cache: DeviceCache,
181185
map_parser_config: MapParserConfig | None = None,
@@ -189,6 +193,8 @@ def __init__(
189193
self._web_api = web_api
190194
self._device_cache = device_cache
191195
self._region = region
196+
self._unsub: Callable[[], None] | None = None
197+
self._add_dps_listener = add_dps_listener
192198

193199
self.device_features = DeviceFeaturesTrait(product, self._device_cache)
194200
self.status = StatusTrait(self.device_features, region=self._region)
@@ -227,6 +233,24 @@ def _get_rpc_channel(self, trait: V1TraitMixin) -> V1RpcChannel:
227233
else:
228234
return self._rpc_channel
229235

236+
async def start(self) -> None:
237+
"""Start the properties API and discover features."""
238+
await self.discover_features()
239+
self._unsub = self._add_dps_listener(self._on_dps_update)
240+
241+
def close(self) -> None:
242+
if self._unsub:
243+
self._unsub()
244+
245+
def _on_dps_update(self, dps: dict[RoborockDataProtocol, Any]) -> None:
246+
"""Handle incoming messages from the device.
247+
248+
This will notify all traits of the new values.
249+
"""
250+
_LOGGER.debug("Received message from device: %s", dps)
251+
self.status.update_from_dps(dps)
252+
self.consumables.update_from_dps(dps)
253+
230254
async def discover_features(self) -> None:
231255
"""Populate any supported traits that were not initialized in __init__."""
232256
_LOGGER.debug("Starting optional trait discovery")
@@ -330,6 +354,7 @@ def create(
330354
rpc_channel: V1RpcChannel,
331355
mqtt_rpc_channel: V1RpcChannel,
332356
map_rpc_channel: V1RpcChannel,
357+
add_dps_listener: Callable[[Callable[[dict[RoborockDataProtocol, Any]], None]], Callable[[], None]],
333358
web_api: UserWebApiClient,
334359
device_cache: DeviceCache,
335360
map_parser_config: MapParserConfig | None = None,
@@ -343,6 +368,7 @@ def create(
343368
rpc_channel,
344369
mqtt_rpc_channel,
345370
map_rpc_channel,
371+
add_dps_listener,
346372
web_api,
347373
device_cache,
348374
map_parser_config,

roborock/devices/traits/v1/common.py

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@
55

66
import logging
77
from abc import ABC, abstractmethod
8+
from collections.abc import Callable
89
from dataclasses import fields
9-
from typing import ClassVar
10+
from typing import Any, ClassVar
1011

12+
from roborock.callbacks import CallbackList
1113
from roborock.data import RoborockBase
1214
from roborock.protocols.v1_protocol import V1RpcChannel
15+
from roborock.roborock_message import RoborockDataProtocol
1316
from roborock.roborock_typing import RoborockCommand
1417

1518
_LOGGER = logging.getLogger(__name__)
@@ -173,3 +176,74 @@ def wrapper(*args, **kwargs):
173176

174177
cls.map_rpc_channel = True # type: ignore[attr-defined]
175178
return wrapper
179+
180+
181+
# TODO(allenporter): Merge with roborock.devices.traits.b01.q10.common.TraitUpdateListener
182+
class TraitUpdateListener(ABC):
183+
"""Trait update listener.
184+
185+
This is a base class for traits to support notifying listeners when they
186+
have been updated. Clients may register callbacks to be notified when the
187+
trait has been updated. When the listener callback is invoked, the client
188+
should read the trait's properties to get the updated values.
189+
"""
190+
191+
def __init__(self, logger: logging.Logger) -> None:
192+
"""Initialize the trait update listener."""
193+
self._update_callbacks: CallbackList[None] = CallbackList(logger=logger)
194+
195+
def add_update_listener(self, callback: Callable[[], None]) -> Callable[[], None]:
196+
"""Register a callback when the trait has been updated.
197+
198+
Returns a callable to remove the listener.
199+
"""
200+
# We wrap the callback to ignore the value passed to it.
201+
return self._update_callbacks.add_callback(lambda _: callback())
202+
203+
def _notify_update(self) -> None:
204+
"""Notify all update listeners."""
205+
self._update_callbacks(None)
206+
207+
208+
class DpsDataConverter:
209+
"""Utility to handle the transformation and merging of DPS data into models.
210+
211+
This class pre-calculates the mapping between Data Point IDs and dataclass fields
212+
to optimize repeated updates from device streams.
213+
"""
214+
215+
def __init__(self, dps_type_map: dict[RoborockDataProtocol, type], dps_field_map: dict[RoborockDataProtocol, str]):
216+
"""Initialize the converter for a specific RoborockBase-derived class."""
217+
self._dps_type_map = dps_type_map
218+
self._dps_field_map = dps_field_map
219+
220+
@classmethod
221+
def from_dataclass(cls, dataclass_type: type[RoborockBase]):
222+
"""Initialize the converter for a specific RoborockBase-derived class."""
223+
dps_type_map: dict[RoborockDataProtocol, type] = {}
224+
dps_field_map: dict[RoborockDataProtocol, str] = {}
225+
for field_obj in fields(dataclass_type):
226+
if field_obj.metadata and "dps" in field_obj.metadata:
227+
dps_id = field_obj.metadata["dps"]
228+
dps_type_map[dps_id] = field_obj.type
229+
dps_field_map[dps_id] = field_obj.name
230+
return cls(dps_type_map, dps_field_map)
231+
232+
def update_from_dps(self, target: RoborockBase, decoded_dps: dict[RoborockDataProtocol, Any]) -> bool:
233+
"""Convert and merge raw DPS data into the target object.
234+
235+
Uses the pre-calculated type mapping to ensure values are converted to the
236+
correct Python types before being updated on the target.
237+
238+
Args:
239+
target: The target object to update.
240+
decoded_dps: The decoded DPS data to convert.
241+
242+
Returns:
243+
True if any values were updated, False otherwise.
244+
"""
245+
conversions = RoborockBase.convert_dict(self._dps_type_map, decoded_dps)
246+
for dps_id, value in conversions.items():
247+
field_name = self._dps_field_map[dps_id]
248+
setattr(target, field_name, value)
249+
return bool(conversions)

roborock/devices/traits/v1/consumeable.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,21 @@
55
"""
66

77
from enum import StrEnum
8-
from typing import Self
8+
from typing import Any, Self
99

1010
from roborock.data import Consumable
1111
from roborock.devices.traits.v1 import common
12+
from roborock.roborock_message import RoborockDataProtocol
1213
from roborock.roborock_typing import RoborockCommand
1314

15+
from .common import TraitUpdateListener
16+
1417
__all__ = [
1518
"ConsumableTrait",
1619
]
1720

21+
_DPS_CONVERTER = common.DpsDataConverter.from_dataclass(Consumable)
22+
1823

1924
class ConsumableAttribute(StrEnum):
2025
"""Enum for consumable attributes."""
@@ -35,7 +40,7 @@ def from_str(cls, value: str) -> Self:
3540
raise ValueError(f"Unknown ConsumableAttribute: {value}")
3641

3742

38-
class ConsumableTrait(Consumable, common.V1TraitMixin):
43+
class ConsumableTrait(Consumable, common.V1TraitMixin, TraitUpdateListener):
3944
"""Trait for managing consumable attributes on Roborock devices.
4045
4146
After the first refresh, you can tell what consumables are supported by
@@ -49,3 +54,12 @@ async def reset_consumable(self, consumable: ConsumableAttribute) -> None:
4954
"""Reset a specific consumable attribute on the device."""
5055
await self.rpc_channel.send_command(RoborockCommand.RESET_CONSUMABLE, params=[consumable.value])
5156
await self.refresh()
57+
58+
def update_from_dps(self, decoded_dps: dict[RoborockDataProtocol, Any]) -> None:
59+
"""Update the trait from data protocol push message data.
60+
61+
This handles unsolicited status updates pushed by the device
62+
via RoborockDataProtocol codes (e.g. STATE=121, BATTERY=122).
63+
"""
64+
if _DPS_CONVERTER.update_from_dps(self, decoded_dps):
65+
self._notify_update()

0 commit comments

Comments
 (0)