forked from OpenBubbles/rustpush
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdecode.py
More file actions
321 lines (248 loc) · 9.67 KB
/
decode.py
File metadata and controls
321 lines (248 loc) · 9.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
import base64
import binascii
import struct
import json
import os
import sys
import zlib
# --- Utility Functions ---
def parse_input(input_str):
    """Auto-detect hex vs Base64 and return the decoded payload as a bytearray.

    All whitespace (including tabs) is removed first. If what remains, after
    lower-casing and dropping any "0x" markers, is an even-length run of hex
    digits it is decoded as hex. Otherwise it is decoded as Base64; the
    URL-safe alphabet ("-" and "_") is accepted alongside the standard one and
    missing "=" padding is added.

    Raises:
        ValueError: if the input cannot be decoded as Base64.
    """
    # str.split() with no argument splits on every kind of whitespace, so a
    # tab-separated hex dump is no longer misclassified as Base64.
    cleaned = "".join(input_str.split())
    normalized_hex = cleaned.lower().replace("0x", "")

    if len(normalized_hex) % 2 == 0 and all(c in "0123456789abcdef" for c in normalized_hex):
        try:
            return bytearray(binascii.unhexlify(normalized_hex))
        except binascii.Error:
            pass  # fall through and try Base64 instead

    # The lenient decoder silently discards characters outside the standard
    # alphabet, so URL-safe input must be mapped to "+" / "/" beforehand.
    b64_text = cleaned.replace("-", "+").replace("_", "/")
    b64_text += "=" * (-len(b64_text) % 4)
    try:
        return bytearray(base64.b64decode(b64_text))
    except (binascii.Error, ValueError) as e:
        raise ValueError(f"Could not parse input as Hex or Base64. Error: {e}") from e
def decompress_if_needed(buffer):
    """Inflate *buffer* when it starts with a plausible zlib header.

    A header is considered plausible when the first byte is 0x78 and the first
    two bytes, read as a big-endian 16-bit number, are divisible by 31.
    Returns a new bytearray with the inflated data on success; returns the
    original *buffer* object when the header does not match or inflation fails.
    A status line is printed in both the success and the failure case.
    """
    header_matches = (
        len(buffer) >= 2
        and buffer[0] == 0x78
        and ((buffer[0] << 8) | buffer[1]) % 31 == 0
    )
    if not header_matches:
        return buffer

    try:
        inflated = zlib.decompress(buffer)
    except zlib.error as e:
        print(f"[-] Warning: Matched Zlib header but decompression failed ({e}). Proceeding as raw data.")
        return buffer

    print(f"[+] Zlib compression detected! Decompressed {len(buffer)} bytes -> {len(inflated)} bytes.")
    return bytearray(inflated)
def zigzag_decode(n):
    """Undo ZigZag encoding: even n maps to n/2, odd n maps to -(n+1)/2."""
    return -((n + 1) // 2) if n % 2 else n // 2
def twos_complement(n, bits):
    """Reinterpret the unsigned value *n* as a signed *bits*-wide integer.

    The reinterpretation only happens when bit ``bits - 1`` is the highest
    set bit of *n*, i.e. *n* fits in *bits* bits and has its sign bit set.
    Any other value is returned unchanged: it is either non-negative at this
    width or too wide to be a *bits*-wide two's-complement encoding at all.
    """
    # Shifting (rather than masking) also rejects values with bits above the
    # sign bit, which would otherwise yield a number outside the signed range.
    if n >> (bits - 1) == 1:
        return n - (1 << bits)
    return n
def decode_varint(buffer, offset):
    """Decode one base-128 varint from *buffer* starting at *offset*.

    Each byte contributes its low seven bits, least-significant group first;
    a byte with the high bit clear ends the number.

    Returns:
        (value, consumed): the decoded integer and the number of bytes read.

    Raises:
        IndexError: if the buffer ends before a terminating byte is found.
    """
    value = 0
    consumed = 0
    total = len(buffer)
    while True:
        position = offset + consumed
        if position >= total:
            raise IndexError("Index out of bounds decoding varint")
        current = buffer[position]
        # Groups never overlap, so OR-ing them in is the same as adding.
        value |= (current & 0x7F) << (7 * consumed)
        consumed += 1
        if current & 0x80 == 0:
            return value, consumed
def to_rust_vec(buffer):
    """Render *buffer* as a Rust ``vec![0x.., 0x..]`` literal.

    An empty (or otherwise falsy) buffer is rendered as ``vec![]``; every byte
    is written as a two-digit lowercase hex literal.
    """
    if not buffer:
        return "vec![]"
    hex_literals = ["0x%02x" % octet for octet in buffer]
    return "vec![{}]".format(", ".join(hex_literals))
# --- Decoders for specific wire types ---
def decode_fixed32(buffer):
    """Interpret a 4-byte little-endian value as int32, uint32 and float32.

    Returns a dict with "int", then "uint" only when it differs from the
    signed reading, then "float". A buffer that is not exactly four bytes long
    yields ``{"error": ...}`` instead of raising.
    """
    if len(buffer) != 4:
        return {"error": "Invalid length for fixed32"}

    (signed,) = struct.unpack("<i", buffer)
    (unsigned,) = struct.unpack("<I", buffer)
    (as_float,) = struct.unpack("<f", buffer)

    decoded = {"int": signed}
    if unsigned != signed:
        decoded["uint"] = unsigned
    decoded["float"] = as_float
    return decoded
def decode_fixed64(buffer):
    """Interpret an 8-byte little-endian value as int64, uint64 and float64.

    Returns a dict with "int", then "uint" only when it differs from the
    signed reading, then "double". A buffer that is not exactly eight bytes
    long yields ``{"error": ...}`` instead of raising.
    """
    if len(buffer) != 8:
        return {"error": "Invalid length for fixed64"}

    (signed,) = struct.unpack("<q", buffer)
    (unsigned,) = struct.unpack("<Q", buffer)
    (as_double,) = struct.unpack("<d", buffer)

    decoded = {"int": signed}
    if unsigned != signed:
        decoded["uint"] = unsigned
    decoded["double"] = as_double
    return decoded
def decode_varint_parts(value):
    """List the plausible interpretations of a decoded varint.

    The result always has "uint" (the raw value). For each width in 8, 16, 32
    and 64 an "int<width>" entry is added when twos_complement() returns
    something different from the raw value, and "sint" is added when
    zigzag_decode() does. Keys appear in that order.
    """
    interpretations = {"uint": value}

    signed_views = ((width, twos_complement(value, width)) for width in (8, 16, 32, 64))
    interpretations.update(
        {f"int{width}": signed for width, signed in signed_views if signed != value}
    )

    zigzag = zigzag_decode(value)
    if zigzag != value:
        interpretations["sint"] = zigzag
    return interpretations
def decode_string_or_bytes(buffer):
    """Classify a length-delimited payload as UTF-8 text or raw bytes.

    Returns a dict with "type" and "value":
      * empty payload  -> type "string|bytes", value ""
      * valid UTF-8    -> type "string", value is the decoded text
      * anything else  -> type "bytes", value is space-separated hex pairs
    """
    if not buffer:
        return {"type": "string|bytes", "value": ""}

    try:
        text = buffer.decode("utf-8")
    except UnicodeDecodeError:
        hex_pairs = " ".join(f"{octet:02x}" for octet in buffer)
        return {"type": "bytes", "value": hex_pairs}
    return {"type": "string", "value": text}
# --- Core Protobuf Reader ---
class BufferReader:
    """Forward-reading cursor over a bytes-like buffer with one rollback point."""

    def __init__(self, buffer):
        self.buffer = buffer
        self.offset = 0        # index of the next unread byte
        self.saved_offset = 0  # position remembered by checkpoint()

    def read_varint(self):
        """Decode one varint at the cursor, advance past it and return it."""
        val, length = decode_varint(self.buffer, self.offset)
        self.offset += length
        return val

    def read_buffer(self, length):
        """Return the next *length* bytes and advance the cursor.

        Raises:
            IndexError: if *length* is negative or exceeds the unread bytes.
        """
        self.check_byte(length)
        res = self.buffer[self.offset : self.offset + length]
        self.offset += length
        return res

    def try_skip_grpc_header(self):
        """Skip a 5-byte gRPC message prefix if one plausibly starts here.

        The prefix is recognised as a zero byte followed by a 4-byte
        big-endian length. If that length claims more bytes than remain after
        the prefix, the cursor is restored and nothing is skipped.
        """
        backup_offset = self.offset
        if self.left_bytes() >= 5 and self.buffer[self.offset] == 0:
            self.offset += 1
            # Unsigned: a signed read turns lengths >= 2**31 negative, which
            # would slip past the sanity check below and skip a bogus header.
            length = struct.unpack('>I', self.buffer[self.offset : self.offset + 4])[0]
            self.offset += 4
            if length > self.left_bytes():
                self.offset = backup_offset

    def left_bytes(self):
        """Return how many bytes remain unread."""
        return len(self.buffer) - self.offset

    def check_byte(self, length):
        """Raise IndexError unless *length* bytes can be read from the cursor."""
        if length < 0:
            # A negative length would move the cursor backwards in read_buffer.
            raise IndexError(f"Negative read length requested: {length}")
        if length > self.left_bytes():
            raise IndexError(f"Not enough bytes left. Requested: {length}, Left: {self.left_bytes()}")

    def checkpoint(self):
        """Remember the current cursor position (overwrites any earlier one)."""
        self.saved_offset = self.offset

    def reset_to_checkpoint(self):
        """Move the cursor back to the position saved by checkpoint()."""
        self.offset = self.saved_offset
# Names for the protobuf wire-type codes this decoder understands.
TYPES = {
    # Codes that can appear in the low three bits of a field tag.
    "VARINT": 0,
    "FIXED64": 1,
    "LENDELIM": 2,
    "FIXED32": 5,
    # Not a real wire type: a 3-bit tag field can never equal -1.
    # Presumably a marker for delimited-stream length entries - TODO confirm.
    "MSG_LEN_DELIMITER": -1,
}
def wire_type_to_string(wire_type):
    """Return the TYPES key whose code equals *wire_type*, or "UNKNOWN(<code>)"."""
    fallback = f"UNKNOWN({wire_type})"
    return next((label for label, code in TYPES.items() if code == wire_type), fallback)
def decode_proto(buffer, parse_delimited=False):
    """Best-effort, schema-less decode of a protobuf message.

    Fields are read one after another until the buffer is exhausted or a field
    cannot be decoded; in the latter case the cursor is rewound to the start
    of that field and decoding stops.

    Length-delimited payloads are first tried as a nested message (accepted
    only if that decodes at least one field and leaves nothing over) and
    otherwise reported as a string or bytes.

    Args:
        buffer: bytes-like message data.
        parse_delimited: when False (the default) a leading gRPC message
            prefix is skipped if present. No length-delimited stream framing
            is performed either way.

    Returns:
        (parts, leftover): a list of dicts with "byte_range", "field_number",
        "type", "content" (plus "inner_hex" and "rust_vec_syntax" for
        length-delimited fields), and the undecoded tail of the buffer.
    """
    reader = BufferReader(buffer)
    if not parse_delimited:
        reader.try_skip_grpc_header()

    fields = []
    while reader.left_bytes() > 0:
        reader.checkpoint()
        try:
            begin = reader.offset
            tag = reader.read_varint()
            field_number, wire_type = tag >> 3, tag & 0b111
            kind = wire_type_to_string(wire_type)
            is_len_delim = wire_type == TYPES["LENDELIM"]
            # Raw payload, kept so length-delimited fields can be re-emitted for Rust.
            payload = b""

            if wire_type == TYPES["VARINT"]:
                content = decode_varint_parts(reader.read_varint())
            elif is_len_delim:
                payload = reader.read_buffer(reader.read_varint())
                try:
                    children, remainder = decode_proto(payload, parse_delimited=False)
                    if len(remainder) != 0 or len(children) == 0:
                        raise ValueError("Not a clean nested protobuf")
                    content, kind = children, "nested_protobuf"
                except Exception:
                    # Not a clean sub-message: fall back to text / raw bytes.
                    fallback = decode_string_or_bytes(payload)
                    content, kind = fallback["value"], fallback["type"]
            elif wire_type == TYPES["FIXED32"]:
                payload = reader.read_buffer(4)
                content = decode_fixed32(payload)
            elif wire_type == TYPES["FIXED64"]:
                payload = reader.read_buffer(8)
                content = decode_fixed64(payload)
            else:
                raise ValueError(f"Unknown wire type: {wire_type}")

            finish = reader.offset
            entry = {
                "byte_range": f"{begin}-{finish} ({finish - begin} B)",
                "field_number": field_number,
                "type": kind,
            }
            # Only length-delimited fields need a byte mapping on the Rust (prost) side.
            if is_len_delim:
                entry["inner_hex"] = binascii.hexlify(payload).decode('ascii')
                entry["rust_vec_syntax"] = to_rust_vec(payload)
            entry["content"] = content
            fields.append(entry)
        except Exception:
            # Undo the partial read so the failed field ends up in the leftover.
            reader.reset_to_checkpoint()
            break

    return fields, reader.read_buffer(reader.left_bytes())
# --- CLI Interface ---
def main():
    """Read a payload from stdin, decode it and write the result as JSON.

    Lines are collected until an empty line or end of input. The joined text
    is parsed as hex/Base64, inflated if it looks like zlib data, decoded as
    protobuf and written to ``decoded_protobuf.json`` in the current
    directory. Any undecoded tail is stored under "leftover_bytes".
    """
    print("=== Protobuf to JSON Decoder (Rust Ready!) ===")
    print("Paste your Base64 or Hex payload below (Zlib auto-detected). Press Enter twice to finish:")

    lines = []
    while True:
        line = sys.stdin.readline()
        # readline() returns "" at end of input (piped stdin, Ctrl-D); without
        # this check the loop would never terminate.
        if line == "" or line == "\n":
            break
        lines.append(line.strip())

    payload = "".join(lines)
    if not payload:
        print("Error: No payload provided.")
        return

    try:
        raw_buffer = parse_input(payload)
        decompressed_buffer = decompress_if_needed(raw_buffer)
    except ValueError as e:
        print(f"[-] {e}")
        return

    parts, leftover = decode_proto(decompressed_buffer)
    output_data = {
        "decoded_messages": parts,
    }

    if leftover:
        hex_leftover = binascii.hexlify(leftover).decode('ascii')
        spaced_hex = ' '.join(hex_leftover[i:i+2] for i in range(0, len(hex_leftover), 2))
        output_data["leftover_bytes"] = spaced_hex
        print(f"[!] Warning: {len(leftover)} bytes were left over (could not be cleanly parsed).")

    output_file = "decoded_protobuf.json"
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(output_data, f, indent=2)
    print(f"[+] Success! Output safely written to {os.path.abspath(output_file)}")


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()