-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsocket_listener.py
More file actions
36 lines (29 loc) · 1.04 KB
/
socket_listener.py
File metadata and controls
36 lines (29 loc) · 1.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import os
import socket
import time
from dotenv import load_dotenv
from logger import Logger
load_dotenv()
URX_HOST = os.getenv("URX_HOST")
URX_PORT = int(os.getenv("URX_PORT"))
LISTENER_SLEEP_TIME = int(os.getenv("LISTENER_SLEEP_TIME")) if os.getenv("LISTENER_SLEEP_TIME") and int(
os.getenv("LISTENER_SLEEP_TIME")) >= 0 else 1
logger = Logger("Socket Listener")
robot_conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
robot_conn.connect((URX_HOST, URX_PORT))
except ConnectionRefusedError:
logger.error(f"Failed to connect to robot at {URX_HOST}:{URX_PORT}")
exit(1)
logger.info(f"Connected to robot at {URX_HOST}:{URX_PORT}")
while True:
data = robot_conn.recv(1024)
if data is not None:
if type(data) is bytes:
try:
logger.info(f"Message received: {data.decode('utf-8')}")
except ValueError:
logger.info(f"Message received: {data}")
elif type(data) is str:
logger.info(f"Message received: {data}")
time.sleep(LISTENER_SLEEP_TIME)