-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathAllSerial IO Test.py
More file actions
98 lines (83 loc) · 2.5 KB
/
AllSerial IO Test.py
File metadata and controls
98 lines (83 loc) · 2.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import serial
import serial.tools.list_ports
import time
import threading
ports = serial.tools.list_ports.comports()
ser = serial.Serial()
ser.baudrate = 19200
ser.timeout = 0.1
class RxThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
global ser
try:
cts = ser.cts
dsr = ser.dsr
ri = ser.ri
dcd = ser.cd
while True:
rx = ser.read(255)
if len(rx) > 0:
print(F"RX: {rx}")
if (cts != ser.cts):
cts = not cts
print(F"CTS -> {cts}")
if (dsr != ser.dsr):
dsr = not dsr
print(F"DSR -> {dsr}")
if (ri != ser.ri):
ri = not ri
print(F"RING -> {ri}")
if (dcd != ser.cd):
dcd = not dcd
print(F"DCD -> {dcd}")
except Exception as e:
print("RX:", e)
return
class TxThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
global ser
i = 0
print("Starting TX")
while True:
try:
ser.write(b"ABCDEFGH\r\n")
ser.rts = i == 1
ser.dtr = i == 3
i += 1
if i == 4:
i = 0
except Exception as e:
print("TX:", e)
del self
return
time.sleep(0.5)
while True:
time.sleep(0.25)
newports = serial.tools.list_ports.comports()
if newports != ports:
for newport in newports:
if not newport in ports:
print(F"Device new: {newport.device}")
if not ser.is_open:
print(F"Opening {newport.device}")
ser.port = newport.device
ser.open()
txthread = TxThread()
txthread.start()
rxthread = RxThread()
rxthread.start()
break
for port in ports:
if not port in newports:
print("device left", port.device)
if ser.is_open and ser.port == port.device:
print(F"Closing {ser.port}")
ser.close()
ports = newports
ser.close()
rxthread.join()
txthread.join()