-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserial_server.py
More file actions
143 lines (124 loc) · 4.41 KB
/
serial_server.py
File metadata and controls
143 lines (124 loc) · 4.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from serial.threaded import *
from serial.tools import list_ports
import multiprocessing as mp
import copy
import time
def to_list(queue: mp.Queue):
queue_list = [queue.get() for _ in range(queue.qsize()) if queue]
return queue_list
class SerialServer(mp.Process):
def __init__(self, port=None, baudrate=9600, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE, queue: mp.Queue=None):
mp.Process.__init__(self)
self.recv_queue = queue
self.send_queue = mp.Queue()
self.port = mp.Queue()
self.port.put(port)
self.baudrate = baudrate
self.bytesize = bytesize
self.parity = parity
self.stopbits = stopbits
self._lock = mp.Lock()
self._running_event = mp.Event()
self._running_event.clear()
self._alive = mp.Event()
self._alive.set()
self._recording_data = mp.Event()
self._recording_data.clear()
try:
self.serv = serial.Serial(port=self.port.get(), baudrate=self.baudrate, bytesize=self.bytesize,
parity=self.parity, stopbits=self.stopbits)
except serial.SerialException as ee:
print(ee)
def run(self):
while self._alive.is_set():
if not self._running_event.is_set():
try: time.sleep(0.05)
except:
self._alive.clear()
self._running_event.clear()
self._recording_data.clear()
break
else:
self._open_serial()
while self.serv.is_open and self._running_event.is_set() and self._recording_data.is_set():
try:
self.recv_queue.put(self.serv.readline().decode("ascii"))
except:
self._running_event.clear()
self._recording_data.clear()
break
else: time.sleep(0.05)
def _open_serial(self):
with self._lock:
if not self.serv.is_open:
s = self.port.get()
self.serv.port = s
self.serv.baudrate = self.baudrate
self.serv.bytesize = self.bytesize
self.serv.parity = self.parity
self.serv.stopbits = self.stopbits
try:
self.serv.open()
except Exception as e:
print("Connection error {}".format(e))
self._running_event.clear()
self._recording_data.clear()
def send_data(self, line):
self._open_serial()
with self._lock:
if self.serv.is_open:
try:
self.serv.write("{}\n".format(line).encode("ascii"))
except:
self._recording_data.clear()
def is_server_pending(self):
return self._running_event.is_set()
def start_recv(self):
self._running_event.set()
def stop_recv(self):
self._running_event.clear()
def record_data_start(self):
self._recording_data.set()
def record_data_stop(self):
self._recording_data.clear()
def is_recording_data(self):
return self._recording_data.is_set()
def set_port(self, port: str = None):
with self._lock:
while not self.port.empty():
self.port.get()
self.port.put(port)
def join(self, timeout=None):
self._running_event.clear()
self._recording_data.clear()
self._alive.clear()
mp.Process.join(self, timeout)
@staticmethod
def list_ports():
return list_ports.comports()
if __name__ == "__main__":
coms = SerialServer.list_ports()
q = mp.Queue()
server = SerialServer(port=coms[0].device, baudrate=115200, queue=q)
i = 0
# l = [x for x in range(100001)]
server.start()
server.start_recv()
server.record_data_start()
while True:
try:
elem = q.get().rstrip() + " " + str(i)
# i = l.pop(1)
i += 1
print(elem)
try:
server.send_data(elem)
except Exception as e:
print("END {}".format(e))
break
except KeyboardInterrupt:
print("Interrupt")
break
server.stop_recv()
server.join()