-
Notifications
You must be signed in to change notification settings - Fork 75
Expand file tree
/
Copy pathtest_local_decoder_padding.py
More file actions
89 lines (67 loc) · 2.51 KB
/
test_local_decoder_padding.py
File metadata and controls
89 lines (67 loc) · 2.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
from roborock.protocol import create_local_decoder, create_local_encoder
from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol
# Key handed to both create_local_encoder and create_local_decoder in every
# test below, so each encoder/decoder pair is built from the same key.
TEST_LOCAL_KEY = "local_key"
def test_decoder_clean_message():
    """Round-trip one message with no extra bytes around the encoded frame.

    The decoder is expected to return exactly one message whose payload
    equals the payload that was encoded.
    """
    encode = create_local_encoder(TEST_LOCAL_KEY)
    decode = create_local_decoder(TEST_LOCAL_KEY)

    request = RoborockMessage(
        protocol=RoborockMessageProtocol.RPC_REQUEST,
        payload=b"test_payload",
        version=b"1.0",
        seq=1,
        random=123,
    )

    # Feed the encoder output straight into the decoder.
    messages = decode(encode(request))

    assert len(messages) == 1
    assert messages[0].payload == b"test_payload"
def test_decoder_4byte_padding():
    """Check that four leading padding bytes are skipped by the decoder.

    This covers the pre-existing behavior: the encoded frame is still
    decoded into a single message when 4 garbage bytes precede it.
    """
    encode = create_local_encoder(TEST_LOCAL_KEY)
    decode = create_local_decoder(TEST_LOCAL_KEY)

    frame = encode(
        RoborockMessage(
            protocol=RoborockMessageProtocol.RPC_REQUEST,
            payload=b"test_payload",
            version=b"1.0",
        )
    )

    # Four garbage bytes are placed in front of the encoded frame.
    padding = b"\x00\x00\x05\xa1"
    messages = decode(padding + frame)

    assert len(messages) == 1
    assert messages[0].payload == b"test_payload"
def test_decoder_variable_padding():
    """Check that padding longer than four bytes is also handled.

    The encoder and decoder are both built with connect_nonce=123 and
    ack_nonce=456, and the message uses version b"L01". Six garbage bytes
    precede the encoded frame; one message is expected back.
    """
    # Same nonce keyword arguments for both factories.
    nonces = {"connect_nonce": 123, "ack_nonce": 456}
    encode = create_local_encoder(TEST_LOCAL_KEY, **nonces)
    decode = create_local_decoder(TEST_LOCAL_KEY, **nonces)

    frame = encode(
        RoborockMessage(
            protocol=RoborockMessageProtocol.RPC_REQUEST,
            payload=b"test_payload",
            version=b"L01",
        )
    )

    # Six garbage bytes are placed in front of the encoded frame.
    padding = b"\x00\x00\x05\xa1\xff\xff"
    messages = decode(padding + frame)

    assert len(messages) == 1
    assert messages[0].payload == b"test_payload"
def test_decoder_split_padding_variable():
    """Check padding and frame delivered to the decoder in separate calls.

    The same decoder instance first receives six garbage bytes on their own
    (expected to yield no messages) and then the encoded frame (expected to
    yield exactly one message with the original payload).
    """
    # Same nonce keyword arguments for both factories.
    nonces = {"connect_nonce": 123, "ack_nonce": 456}
    encode = create_local_encoder(TEST_LOCAL_KEY, **nonces)
    decode = create_local_decoder(TEST_LOCAL_KEY, **nonces)

    frame = encode(
        RoborockMessage(
            protocol=RoborockMessageProtocol.RPC_REQUEST,
            payload=b"test_payload",
            version=b"L01",
        )
    )
    padding = b"\x00\x00\x05\xa1\xff\xff"  # 6 bytes

    # First chunk: only the padding, so nothing should be decoded yet.
    from_padding = decode(padding)
    assert len(from_padding) == 0

    # Second chunk: the actual encoded message.
    from_frame = decode(frame)
    assert len(from_frame) == 1
    assert from_frame[0].payload == b"test_payload"