-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathpump_advertiser.py
More file actions
194 lines (159 loc) · 6.92 KB
/
pump_advertiser.py
File metadata and controls
194 lines (159 loc) · 6.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
import logging
from datetime import datetime
from time import sleep
import atexit
import re
from threading import Thread
import subprocess
from log_manager import LogManager
from utils import exec
from bluezero.device import Device
class PumpAdvertiser():
adv_name:str = None
log:logging.Logger = None
instance_id:int = None
adv_started:datetime|None = None
sleep_delay:int = 0.5 # this needs to be very high, since it can SILENTLY DROP COMMANDS!!! debugging this was a fucking pain. state of linux bluetooth in 2026 everyone
connected:bool = False
fake_adv_time:int = 5 # in sec. this is passed down to the OS
adv_thread_time:float = 4.9 # the real value we let the advertisement packets live
already_paired:bool
startup_commands:list[str] = [
"sudo btmgmt power off",
"sudo btmgmt bredr off",
"sudo btmgmt le on",
"sudo btmgmt sc off",
"sudo btmgmt pairable on",
"sudo btmgmt connectable on",
"sudo btmgmt bondable on",
"sudo btmgmt discov on",
"sudo btmgmt io-cap 3", # this is very important!
"sudo btmgmt power on",
# DOES NOT WORK, NEEDS CONFIG WORKAROUND!!!: "sudo btmgmt privacy device"
]
def __is_valid_adv_name(self, s: str) -> bool:
return bool(re.fullmatch(r"Mobile .{0,7}", s))
def __is_kernel_locked_down(self) -> bool:
locked = True
with open("/sys/kernel/security/lockdown", "r") as f:
for f in f.readlines():
if "[none]" in f:
locked = False
break
self.logger.info(f"kernel is locked down? {locked}")
return locked
def __check_kernel_workaround_applied(self) -> bool:
paths = [
'/sys/kernel/debug/bluetooth/hci0/adv_min_interval',
'/sys/kernel/debug/bluetooth/hci0/adv_max_interval'
]
for p in paths:
result = subprocess.run(
["sudo", "cat", p],
capture_output=True,
text=True,
check=True
)
result = result.stdout.strip()
result = int(result)
if result > 100:
self.logger.warning("kernel level interval fix is NOT applied!")
return False
self.logger.info("kernel level interval fix is already applied!")
return True
def __set_kernel_fix(self) -> None:
self.logger.info(f"applying kernel level fix...")
exec("echo 50 | sudo tee /sys/kernel/debug/bluetooth/hci0/adv_min_interval")
exec("echo 60 | sudo tee /sys/kernel/debug/bluetooth/hci0/adv_max_interval")
return
def __init__(self, adv_name:str, already_paired:bool=False, instance_id:int=1):
"""
adv_name: name for advertising
already_paired: to advertise on the other uuid with a bit flip
instance_id: the bluez instance id
"""
self.instance_id = instance_id
self.logger = LogManager.get_logger(self.__class__.__name__)
self.already_paired = already_paired
if already_paired:
self.adv_thread_time = 1.0 # its helpful because we clear the default bluezero advertismeent more often
if not self.__check_kernel_workaround_applied():
if not self.__is_kernel_locked_down():
self.__set_kernel_fix()
else:
self.adv_thread_time = 0.05 # userland workaround if we can not use the kernel level fix. if we spam the commands, we can get down under the 1s+ default advertising interval without any extra effort
# NOTE: the pump only accepts advertisers with a specific naming scheme
self.adv_name = "Mobile " + adv_name
if not self.__is_valid_adv_name(self.adv_name):
raise Exception(f"Invalid advertising name given: {self.adv_name}")
# run btmgmt commands
for c in self.startup_commands:
exec(c)
sleep(self.sleep_delay) # wait for hci to actually perform it. NOTE: make this delay larger if you see errors!
atexit.register(self.stop_adv) # just to be on the safe side
self.logger.warning("always accept the pairing if your desktop environment asks for it!")
return
def __create_adv_cmd(self) -> str:
data = ""
# Flags: we have turned BR/EDR off in self.startup_commands
data += "02 01 06"
# 16-bit Service Class UUIDs: SAKE
data += "03 03 "
data += "81 fe" if self.already_paired else "82 fe"
# Device Name
length = 1 + len(self.adv_name)
data += f"{length:02x} 09 {self.adv_name.encode().hex()}"
data = data.replace(" ", "")
# timeout is how long the bluez object lives (??)
# set duration and timeout to the same for now, idk
full_cmd = f"sudo btmgmt add-adv -d {data} -t {self.fake_adv_time} -D {self.fake_adv_time} {self.instance_id}"
return full_cmd
def __clear_adv(self):
exec("sudo btmgmt clr-adv")
return
def stop_adv(self) -> None:
self.logger.info("advertising stopped")
# WARNING! this is a very hacky and deliberate almost-race-condition... dont change these two lines
self.adv_started = None
self.__clear_adv()
return
def start_adv(self) -> None:
if self.adv_started != None:
self.logger.error(f"advertisement already running? skipping...")
return
self.adv_started = datetime.now()
self.logger.info(f"advertisement started at {self.adv_started} as {self.adv_name}")
thread = Thread(target = self.__adv_thread)
thread.start()
return
def on_connect_cb(self, device:Device):
self.logger.warning(f"device {device.address} connected!")
self.connected = True
self.stop_adv()
# does not work: exec(f"bluetoothctl trust {device.address}") # auto accept it (skip gui check)
return
def on_disconnect_cb(self, device:Device):
self.logger.warning(f"device {device.address} disconnected!")
self.connected = False
self.start_adv()
return
def __adv_thread(self):
# hacky, since bluezero also starts an advertisement, which is not good for us and we need to "fight it"
while True:
if self.adv_started == None:
return
cmd = self.__create_adv_cmd()
exec(cmd)
sleep(self.adv_thread_time)
self.__clear_adv()
# def __get_advertisement_count(self):
# result = subprocess.run(
# ["sudo", "btmgmt", "advinfo"],
# capture_output=True,
# text=True,
# check=True,
# )
# match = re.search(r"Instances list with (\d+) item", result.stdout)
# if not match:
# raise RuntimeError(f"Could not find advertisement count in: {result.stdout}")
# return int(match.group(1))