-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
152 lines (127 loc) · 4.73 KB
/
utils.py
File metadata and controls
152 lines (127 loc) · 4.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
WIFI_RETRY_INTERVAL = 15
def validate_body(data, rules):
"""Validate request body fields.
rules: {field: (type, required, extra)}
extra: None or dict with "enum", "min", "max" keys.
Returns None on success, error string on failure."""
for field, (typ, required, extra) in rules.items():
val = data.get(field)
if val is None:
if required:
return f"'{field}' is required"
continue
if not isinstance(val, typ) or (typ is int and isinstance(val, bool)):
return f"'{field}' must be {typ.__name__}"
if extra:
if "enum" in extra and val not in extra["enum"]:
return f"'{field}' must be one of {extra['enum']}"
if "min" in extra and val < extra["min"]:
return f"'{field}' must be >= {extra['min']}"
if "max" in extra and val > extra["max"]:
return f"'{field}' must be <= {extra['max']}"
return None
def format_time(seconds):
"""
Formats time in hours, minutes and seconds
"""
if seconds is not None:
hours = seconds // 3600 % 24
minutes = seconds % 3600 // 60
seconds = seconds % 3600 % 60
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
return "00:00:00"
def decode(name):
name = name.replace("+", " ")
name = name.replace("%20", " ")
name = name.replace("%2F", "/")
name = name.replace("%3A", ":")
name = name.replace("%3D", "=")
name = name.replace("%3F", "?")
name = name.replace("%23", "#")
name = name.replace("%26", "&")
name = name.replace("%2B", "+")
name = name.replace("%25", "%")
return name
def connect_to_network():
"""
Starts WiFi STA + AP. STA connects to router, AP creates direct-connect network.
STA connection is non-blocking — use wifi_manager_task() for background reconnection.
"""
import network
# --- STA mode (connect to router) ---
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if wlan.isconnected():
print(f"Network already connected. IP: {wlan.ipconfig('addr4')}")
else:
try:
import env
print("Starting WiFi connection in background...")
wlan.connect(env.WIFI_SSID, env.WIFI_PASSWD)
except Exception as e:
print(f"WiFi connection error: {e}")
# --- AP mode (direct connection) ---
try:
import env
ap = network.WLAN(network.AP_IF)
ap.active(True)
ap.config(essid=env.AP_SSID, password=env.AP_PASSWD, authmode=3)
print(f"AP started: {env.AP_SSID} (IP: {ap.ifconfig()[0]})")
except Exception as e:
print(f"AP setup error: {e}")
return True
async def wifi_manager_task(on_connect=None):
"""
Background task that ensures WiFi stays connected.
Retries every WIFI_RETRY_INTERVAL seconds if disconnected.
Calls on_connect(ip_str) when WiFi transitions from disconnected to connected.
"""
import network
import asyncio
wlan = network.WLAN(network.STA_IF)
was_connected = wlan.isconnected()
while True:
try:
if not wlan.isconnected():
was_connected = False
print("WiFi disconnected, retrying connection...")
try:
import env
wlan.connect(env.WIFI_SSID, env.WIFI_PASSWD)
for _ in range(30):
await asyncio.sleep(1)
if wlan.isconnected():
break
except Exception as e:
print(f"WiFi reconnection failed: {e}")
if wlan.isconnected():
if not was_connected and on_connect:
try:
on_connect(wlan.ipconfig('addr4')[0])
except Exception as e:
print(f"on_connect callback error: {e}")
was_connected = True
except Exception as e:
print(f"WiFi manager error: {e}")
await asyncio.sleep(WIFI_RETRY_INTERVAL)
async def led_status_task(error_blinks):
"""LED status indicator.
error_blinks=0: solid off (no WiFi) / solid on (WiFi connected)
error_blinks>0: blink N times, 3s pause, repeat
"""
import network
import asyncio
from machine import Pin
led = Pin(2, Pin.OUT, value=0)
wlan = network.WLAN(network.STA_IF)
while True:
if error_blinks == 0:
led.value(1 if wlan.isconnected() else 0)
await asyncio.sleep(1)
else:
for _ in range(error_blinks):
led.on()
await asyncio.sleep(0.2)
led.off()
await asyncio.sleep(0.2)
await asyncio.sleep(3)