-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path__init__.py
More file actions
186 lines (164 loc) · 6.84 KB
/
__init__.py
File metadata and controls
186 lines (164 loc) · 6.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
import paho.mqtt.client as mqtt
from paho.mqtt.enums import MQTTErrorCode
from dataclasses import dataclass
from dataclasses_json import dataclass_json
from typing import *
from pathlib import Path
import os, signal, logging
from threading import Event, Thread
import inspect, traceback
@dataclass_json
@dataclass
class MQTT:
data_sources: List[str]
broker: str
port: int
timeout: int
dis_timeout: int
loglevel: Optional[str] = None
@dataclass_json
@dataclass
class DAEMON:
loglevel: Optional[str] = None
class MAGDaemon(mqtt.Client):
""" Not actually a daemon but rather a wrapper for normal MAGLaboratory items """
@dataclass_json
@dataclass
class Config:
name: str
comment: str
mqtt: MQTT
daemon: DAEMON
_connect_evt = Event()
exit_evt = Event()
checkup_evt = Event()
logger = None
config = None
_disconnect_thread = None
def set_config(self, long_name, cfg_file_name):
files = []
top_path = Path(inspect.stack().pop().filename).parent.absolute()
try:
l_logger = self.__logger
except AttributeError:
l_logger = self.logger
l_logger.debug(f"Top path at {top_path}")
home_dir_config = f"{Path.home()}{os.sep}.config{os.sep}{long_name}"
if top_path.is_relative_to("/usr"):
files = [f"/etc/{long_name}", home_dir_config]
else:
paths = [home_dir_config, top_path]
for path in paths:
file = f"{path}{os.sep}{cfg_file_name}.json"
l_logger.debug(f"Attempting to read {file}")
if not Path(file).is_file():
l_logger.debug("Not a file")
continue
with open(file, "r") as configFile:
l_logger.info(f"Reading config {file}")
self.config = self.Config.from_json(configFile.read())
self.active_config_file = file
l_logger.debug(f"{self.config}")
break
else:
raise FileNotFoundError("Could not locate configuration file.")
def config_log(self, log_obj=None, config_obj=None):
if log_obj is None:
log_obj = self.logger
if config_obj is None:
config_obj = self.config
name = log_obj.name
try:
loglevel = config_obj.loglevel.upper()
""" check if this is a valid loglevel """
if type(logging.getLevelName(loglevel)) is int:
log_obj.setLevel(loglevel)
else:
self.__logger.warning(f"{name} log level not configured. Defaulting to WARNING.")
self.log_obj.setLevel("WARNING")
except (KeyError, AttributeError) as e:
self.__logger.warning(f"{name} log level not configured. Defaulting to WARNING. Caught: {str(e)}")
log_obj.setLevel("WARNING")
def __init__(self, long_name, cfg_file_name):
"""
This is the init function
Arguments:
long_name: a long name string representing the name of this module
cfg_file_name: the name of the configuration file
"""
""" set logging for this module """
self.__logger = logging.getLogger(__name__)
self.__logger.setLevel(logging.DEBUG) # starts as debug when in devel
""" set signal handlers """
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
""" det. where the config files should be """
if self.config is None:
self.set_config(long_name, cfg_file_name)
""" log levels """
self.config_log(self.__logger, self.config.daemon)
self._mqtt_logger = logging.getLogger("PAHO")
self.config_log(self._mqtt_logger, self.config.mqtt)
self.__logger.info("Starting MQTT")
super().__init__(mqtt.CallbackAPIVersion.VERSION2, self.config.name)
def signal_handler(self, signum, _):
""" signal handler helper function """
self.__logger.critical(f"Caught a deadly signal: {signal.Signals(signum).name}")
self._connect_evt.set()
self.exit_evt.set()
self._thread_terminate = True
def on_log(self, client, userdata, level, buf):
if level == mqtt.MQTT_LOG_DEBUG:
self._mqtt_logger.debug(buf)
elif level == mqtt.MQTT_LOG_INFO:
self._mqtt_logger.info(buf)
elif level == mqtt.MQTT_LOG_NOTICE:
self._mqtt_logger.info(buf)
elif level == mqtt.MQTT_LOG_WARNING:
self._mqtt_logger.warning(buf)
else:
self._mqtt_logger.error(buf)
def on_connect(self, client, userdata, flags, rc, properties):
""" subscribes to the relevant channels """
if rc.is_failure:
self.__logger.warning(f"Temporary failure to connect: {str(rc)}")
else:
self.__logger.info(f"Connected {str(rc)}")
for src in self.config.mqtt.data_sources:
self.__logger.debug(f"Subscribing to {src}")
self.subscribe(src)
self._connect_evt.set()
try:
self.__logger.debug("Collecting the disconnect thread.")
self._disconnect_thread.join()
self.__logger.info("Collected the disconnect thread")
except AttributeError as e:
self.__logger.debug(f"Expected exception: {e}")
pass
def on_disconnect(self, client, userdata, flags, rc, properties):
""" handles mqtt disconnects """
self._connect_evt.clear()
if rc.is_failure:
self.__logger.debug(f"Received: {rc}")
self.__logger.debug(traceback.extract_stack())
if self._disconnect_thread is None or not self._disconnect_thread.is_alive():
self.__logger.warning("Unexpected disconnect. Starting disconnect timer.")
self._disconnect_thread = Thread(target=self._discon_thread_fun)
self._disconnect_thread.start()
else:
self.__logger.debug("Duplicate on_disconnect call")
else:
self.__logger.info("Disconnected gracefully")
def _discon_thread_fun(self):
""" sets the exit event if the timeout is not set in time """
self.__logger.debug("Disconnect timer started.")
if not self._connect_evt.wait(self.config.mqtt.dis_timeout):
self.__logger.critical("Disconnect timer triggering program exit.")
self.exit_evt.set()
self._thread_terminate = True
""" some parts are hung on the connect event, so we set it. """
self._connect_evt.set()
self.__logger.debug("Disconnect timer ended.")
def main(self):
self.__logger.info("MQTT Connecting")
self.connect(host=self.config.mqtt.broker, port=self.config.mqtt.port, keepalive=self.config.mqtt.timeout)