11"""Module for parsing B01/Q7 map content.
22
3- Observed Q7 `MAP_RESPONSE` payloads follow this decode pipeline:
4- - base64-encoded ASCII
5- - AES-ECB encrypted with the derived map key
6- - PKCS7 padded
7- - ASCII hex for a zlib-compressed SCMap payload
8-
93The inner SCMap blob is parsed with protobuf messages generated from
104`roborock/map/proto/b01_scmap.proto`.
115"""
126
13- import base64
14- import binascii
15- import hashlib
167import io
17- import zlib
188from dataclasses import dataclass
199
20- from Crypto .Cipher import AES
21- from google .protobuf .message import DecodeError , Message
10+ from google .protobuf .message import DecodeError
2211from PIL import Image
2312from vacuum_map_parser_base .config .image_config import ImageConfig
2413from vacuum_map_parser_base .map_data import ImageData , MapData
2514
2615from roborock .exceptions import RoborockException
2716from roborock .map .proto .b01_scmap_pb2 import RobotMap # type: ignore[attr-defined]
28- from roborock .protocol import Utils
2917
3018from .map_parser import ParsedMapData
3119
@@ -46,10 +34,9 @@ class B01MapParser:
4634 def __init__ (self , config : B01MapParserConfig | None = None ) -> None :
4735 self ._config = config or B01MapParserConfig ()
4836
49- def parse (self , raw_payload : bytes , * , serial : str , model : str ) -> ParsedMapData :
50- """Parse a raw MAP_RESPONSE payload and return a PNG + MapData."""
51- inflated = _decode_b01_map_payload (raw_payload , serial = serial , model = model )
52- parsed = _parse_scmap_payload (inflated )
37+ def parse (self , payload : bytes ) -> ParsedMapData :
38+ """Parse an inflated SCMap payload and return a PNG + MapData."""
39+ parsed = _parse_scmap_payload (payload )
5340 size_x , size_y , grid = _extract_grid (parsed )
5441 room_names = _extract_room_names (parsed )
5542
@@ -78,54 +65,13 @@ def parse(self, raw_payload: bytes, *, serial: str, model: str) -> ParsedMapData
7865 )
7966
8067
81- def _derive_map_key (serial : str , model : str ) -> bytes :
82- """Derive the B01/Q7 map decrypt key from serial + model."""
83- model_suffix = model .split ("." )[- 1 ]
84- model_key = (model_suffix + "0" * 16 )[:16 ].encode ()
85- material = f"{ serial } +{ model_suffix } +{ serial } " .encode ()
86- encrypted = Utils .encrypt_ecb (material , model_key )
87- md5 = hashlib .md5 (base64 .b64encode (encrypted ), usedforsecurity = False ).hexdigest ()
88- return md5 [8 :24 ].encode ()
89-
90-
91- def _decode_base64_payload (raw_payload : bytes ) -> bytes :
92- blob = raw_payload .strip ()
93- padded = blob + b"=" * (- len (blob ) % 4 )
94- try :
95- return base64 .b64decode (padded , validate = True )
96- except binascii .Error as err :
97- raise RoborockException ("Failed to decode B01 map payload" ) from err
98-
99-
100- def _decode_b01_map_payload (raw_payload : bytes , * , serial : str , model : str ) -> bytes :
101- """Decode raw B01 `MAP_RESPONSE` payload into inflated SCMap bytes."""
102- # TODO: Move this lower-level B01 transport decode under `roborock.protocols`
103- # so this module only handles SCMap parsing/rendering.
104- encrypted_payload = _decode_base64_payload (raw_payload )
105- if len (encrypted_payload ) % AES .block_size != 0 :
106- raise RoborockException ("Unexpected encrypted B01 map payload length" )
107-
108- map_key = _derive_map_key (serial , model )
109-
110- try :
111- compressed_hex = Utils .decrypt_ecb (encrypted_payload , map_key ).decode ("ascii" )
112- compressed_payload = bytes .fromhex (compressed_hex )
113- return zlib .decompress (compressed_payload )
114- except (ValueError , UnicodeDecodeError , zlib .error ) as err :
115- raise RoborockException ("Failed to decode B01 map payload" ) from err
116-
117-
118- def _parse_proto (blob : bytes , message : Message , * , context : str ) -> None :
119- try :
120- message .ParseFromString (blob )
121- except DecodeError as err :
122- raise RoborockException (f"Failed to parse { context } " ) from err
123-
124-
12568def _parse_scmap_payload (payload : bytes ) -> RobotMap :
12669 """Parse inflated SCMap bytes into a generated protobuf message."""
12770 parsed = RobotMap ()
128- _parse_proto (payload , parsed , context = "B01 SCMap" )
71+ try :
72+ parsed .ParseFromString (payload )
73+ except DecodeError as err :
74+ raise RoborockException ("Failed to parse B01 SCMap" ) from err
12975 return parsed
13076
13177
0 commit comments