-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcomms.py
More file actions
77 lines (63 loc) · 2.1 KB
/
comms.py
File metadata and controls
77 lines (63 loc) · 2.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from typing import Any, Callable, Coroutine
import ubinascii
import network
import espnow
from .wifi_reset import wifi_reset
class Comms:
def __init__(self):
# A WLAN interface must be active to send()/recv()
self.sta = wifi_reset() # Reset wifi to AP off, STA on and disconnected
self.e = espnow.ESPNow()
self.e.active(True)
def reset(self) -> None:
self.__init__()
def get_mac(self) -> str:
wlan_sta = network.WLAN(network.STA_IF)
wlan_sta.active(True)
wlan_mac = wlan_sta.config("mac")
mac_str = ubinascii.hexlify(wlan_mac).decode()
print(f"MAC address: {mac_str}")
return mac_str
async def advertise(self):
print("Advertising")
async def scan(self):
print("Scanning")
async def send(
self,
message: str,
mac: str | bytes | None = None, # Send to broadcast by default
):
peer_mac: bytes = b""
if mac is None:
peer_mac = b"\xFF\xFF\xFF\xFF\xFF\xFF"
else:
if isinstance(mac, str):
peer_mac = bytes.fromhex(mac)
else:
peer_mac = mac
try:
self.e.add_peer(peer_mac) # Must add_peer() before send()
print("Peer added")
except OSError as e:
if "ESP_ERR_ESPNOW_EXIST" in str(e):
print("Peer already added")
else:
raise e
print(f"Sending to {peer_mac}")
try:
self.e.send(peer_mac, message)
self.e.send(peer_mac, b"end")
except OSError as e:
if "ETIMEDOUT" in str(e):
print("Timeout")
print("Sent message")
async def receive(self, on_receive: Callable[[bytes, str], None]):
while True:
host, msg = self.e.recv()
if msg: # msg == None if timeout in recv()
# convert message to string
msg_str = msg.decode()
# print(host, msg_str)
if msg == b"end":
break
on_receive(host, msg_str)