-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy patharanet_cloud.py
More file actions
442 lines (345 loc) · 14.8 KB
/
aranet_cloud.py
File metadata and controls
442 lines (345 loc) · 14.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
# Module for querying Aranet sensor data from the Aranet Cloud
# Currently only tested with Aranet CO2 sensors
# Reference: https://www.zabbix.com/la/integrations/aranet
import configparser
import json
import logging
import os
import pandas
import ssl
import time
from typing import Any, Dict, Optional
import urllib.error
import urllib.request
# Expiration time of the login cache in seconds
LOGIN_CACHE_EXPIRATION = 595
# Mappings of id to string for the Aranet sensor and telemetry data
# these can be obtained quering the 'metrics/<space id>' Aranet Cloud API URL
DEFAULT_METRICS_DICT = {"1": "temperature",
"2": "humidity",
"3": "CO2",
"4": "pressure"}
# Logger object
logger = logging.getLogger(__name__)
# SSL context
ssl_context = ssl.SSLContext()
class NotAuthorizedError(Exception):
"""Exception raised when a not authorized (HTTP 401) response is received."""
def __init__(self, message):
super().__init__(message)
class CacheExpired(Exception):
"""Exception raised if the cache has expired."""
def __init__(self, message):
super().__init__(message)
def get_login_data(cache_file) -> Dict[str, Any]:
"""Get the Aranet Cloud login data from the cache file.
Args:
cache_file (str or os.PathLike): A path-like object giving the pathname
of the cache file.
Raises:
CacheExpired: If the cache is older than LOGIN_CACHE_EXPIRATION.
Returns:
Dict[str, Any]: A Dict with the contents of the login data stored in
the cache.
"""
with open(cache_file) as f:
if LOGIN_CACHE_EXPIRATION is not None and LOGIN_CACHE_EXPIRATION > 0:
cache_stat = os.stat(f.fileno())
cache_time = time.time() - cache_stat.st_mtime
if cache_time >= LOGIN_CACHE_EXPIRATION:
raise CacheExpired("Login cache expired")
return json.loads(f.read())
def save_login_data(cache_file, login_data: str) -> None:
"""Save the Aranet Cloud login data to the cache file.
Args:
cache_file (str or os.PathLike): A path-like object giving the pathname
of the cache file.
login_data (str): The login data as returned by the Aranet Cloud.
"""
with open(cache_file, 'w') as f:
f.write(login_data)
logger.info("Saved login data to cache file")
def save_login_data_no_raise(cache_file: Optional,
login_data: str) -> None:
"""Save the Aranet Cloud login data to the cache file (Exception safe).
Args:
cache_file (str, os.PathLike, or None): A path-like object giving the
pathname of the cache file, or None. If None the function does
nothing.
login_data (str): The login data as returned by the Aranet Cloud.
"""
if cache_file is None:
return
try:
save_login_data(cache_file, login_data)
except Exception as e:
logger.error("Error saving the loging data to cache: " + str(e))
def login(aranet_conf, cache_file: Optional = None) -> Dict[str, Any]:
"""Login into the Aranet Cloud and obtain the login data.
If cache_file is not None, this function saves the login data to the cache.
Args:
aranet_conf: Object of the Aranet Cloud configuration.
cache_file (str, os.PathLike, or None): A path-like object giving the
pathname of the cache file. Defaults to None.
Raises:
NotAuthorizedError: If the login request results in a not authorized
(HTTP 401) response.
Returns:
Dict[str, Any]: A Dict with the contents of the login data returned by
the Aranet Cloud.
"""
logger.info("Making login request to Aranet Cloud")
req = urllib.request.Request(
url=aranet_conf["DEFAULT"]["endpoint"] + "/user/login",
method="POST", headers={"Content-Type": "application/json"},
data=json.dumps({"login": aranet_conf["DEFAULT"]["username"],
"passw": aranet_conf["DEFAULT"]["password"]}).encode())
try:
with urllib.request.urlopen(req, context=ssl_context) as f:
data = f.read().decode()
except urllib.error.HTTPError as e:
raise NotAuthorizedError(str(e.reason)) if e.code == 401 \
else Exception("Cannot login into Aranet Cloud: " +
str(e.reason))
login_data = json.loads(data)
if cache_file is not None:
save_login_data_no_raise(cache_file, data)
return login_data
def get_cloud_space_id(aranet_conf, login_data: Dict[str, Any]) -> str:
"""Get the Aranet Cloud space ID from the login data.
The `aranet_conf` configuration object should contain the parameter
`space_name` with the name of the space. Then, this method searches in
`login_data` for this space name and returns the corresponding space ID.
Args:
aranet_conf: Object of the Aranet Cloud configuration.
login_data (Dict[str, Any]): The login data.
Raises:
Exception: If the space name indicated in aranet_conf is not found.
Returns:
str: The Aranet Cloud space ID corresponding to the space name
indicated in aranet_conf.
"""
space_name = aranet_conf["DEFAULT"]["space_name"]
spaces = login_data["spaces"]
if len(spaces) == 0:
raise Exception("Aranet Cloud spaces list is empty")
elif len(spaces) == 1:
spaces_item = spaces.items()
id = list(spaces_item)[0][0]
name = list(spaces_item)[0][1]
if name != space_name:
logger.warn("Aranet Cloud space expected name was " + "\"" +
space_name + "\", but name is " + name)
return id
else:
id_list = [(x, y) for (x, y) in spaces.items() if y == space_name]
if len(id_list) == 0:
raise Exception("Aranet Cloud space list does not have the " +
"\"" + space_name + "\" space")
elif len(id_list) > 1:
raise Exception("Aranet Cloud space list has more than one " +
"space with name \"" + space_name + "\"")
else:
return id_list[0][0]
def __aranet_cloud_request(func):
"""Decorator function for handling the Aranet Cloud login and login cache.
The decorator will obtain the Aranet Cloud authentication token (by
making a reequest to the Aranet Cloud or from the cache) and then
call the function `func`.
The function `func` must receive an aranet configuration object as its
first parameter.
When using this decorator `func` will receive the additional keyword
parameters `cloud_space_id` and `auth_token`
Args:
func (function): Function to call.
Returns:
A function
"""
def do_request(aranet_conf, *args, **kwargs):
login_cache_file = aranet_conf["DEFAULT"].get("login_cache_file")
try:
login_data = get_login_data(login_cache_file)
cached = True
except Exception as e:
logger.info(str(e))
login_data = login(aranet_conf, login_cache_file)
cached = False
func_kwargs = {
'cloud_space_id': get_cloud_space_id(aranet_conf, login_data),
'auth_token': login_data["auth"]
}
try:
data = func(aranet_conf, *args, **kwargs, **func_kwargs)
except NotAuthorizedError as e:
logger.error(str(e))
if cached:
# maybe the login data expired, try login in again
login_data = login(aranet_conf, login_cache_file)
data = func(aranet_conf, *args, **kwargs, **func_kwargs)
else:
raise e
return data
return do_request
def __aranet_cloud_request_json(func):
"""Decorator function for Aranet Cloud API calls returning JSON.
The decorator will call func for obtaining an Aranet Cloud URL, then make
the request, and finally return a Dict representing the obtained JSON.
This decorator uses the decorator __aranet_cloud_request for obtaining
the required authentication token to do the request.
The function `func` must receive an aranet configuration object as its
first parameter and must return the string of the corresponding Aranet
Cloud API URL.
Args:
func (function): Function to call.
Returns:
A function
"""
@__aranet_cloud_request
def do_request(aranet_conf, *args, **kwargs):
req = urllib.request.Request(
url=func(aranet_conf, *args, **kwargs),
method="GET",
headers={"Content-Type": "application/json",
"Authorization": "Bearer " + kwargs['auth_token']})
try:
with urllib.request.urlopen(req, context=ssl_context) as f:
return json.loads(f.read().decode())
except urllib.error.HTTPError as e:
raise NotAuthorizedError(str(e.reason)) if e.code == 401 \
else Exception("Request error: " + str(e.reason))
return do_request
@__aranet_cloud_request_json
def get_sensors_info(
aranet_conf, fields: list[str] = ['metrics', 'telemetry', 'name'],
**kwargs) -> Dict[str, Any]:
"""Get information of sensors from the Aranet Cloud.
Args:
aranet_conf: Object of the Aranet Cloud configuration.
fields (list[str], optional): Fields of the data to request. The fields
available in the Aranet Cloud are:
- `alarms` Alarms raised by the sensor.
- `devices` List of base stations to which the
sensor is paired.
- `files` Number of files stored in the sensor.
- `integrations` ?
- `metrics` Latest data captured by the sensor,
e.g., CO2, temperature, humidity, pressure.
- `name` Name of the sensor.
- `position` Localization of the sensor.
- `rules` Alarm rules for the sensor.
- `skills` ?
- `tagids` Tags identifiers for the tags of the sensor.
- `telemetry` Telemetry data, e.g., battery, RSSI.
- `txint` ?
- `virtual` ?
The defaults is ['metrics', 'telemetry', 'name'].
Raises:
NotAuthorizedError: If the request results in a not authorized
(HTTP 401) response.
Returns:
Dict[str, Any]: A Dict with the contents of the response.
"""
return aranet_conf["DEFAULT"]["endpoint"] + \
"/sensors/" + kwargs['cloud_space_id'] + \
"?fields=" + ','.join(fields)
@__aranet_cloud_request
def get_sensor_data(
aranet_conf, sensor_id, from_time, to_time, timezone="0000",
metrics=list(DEFAULT_METRICS_DICT.keys()),
**kwargs) -> pandas.DataFrame:
"""Get the data for a given sensor from the Aranet Cloud.
Args:
aranet_conf: Object of the Aranet Cloud configuration.
sensor_id (str or int): The ID of the sensor.
from_time (str): The earliest time of the data in the ISO 8601 format.
to_time (str): The latest time of the data in the ISO 8601 format.
timezone (str, optional): The timezone in format hhmm, where hh are the
hours, and mm the minutes. Defaults to "0000".
metrics (list[str], optional): List of metrics IDs of the data to
recover. Defaults to list(DEFAULT_METRICS_DICT.keys()).
Raises:
NotAuthorizedError: If the request results in a not authorized
(HTTP 401) response.
Returns:
pandas.DataFrame: A pandas dataframe with the sensor data.
"""
logger.info("Making request for sensor " + str(sensor_id) + " " +
"data to Aranet Cloud")
aranet_query_url = \
aranet_conf["DEFAULT"]["endpoint"] + \
"/sensors/" + kwargs['cloud_space_id'] + \
"/sensor/" + str(sensor_id) + "/export?" + \
"metric=" + ",".join(metrics) + \
"&from=" + from_time + \
"&to=" + to_time + \
"&timezone=" + timezone
req = urllib.request.Request(
url=aranet_query_url,
method="GET",
headers={"Authorization": "Bearer " + kwargs["auth_token"]})
try:
with urllib.request.urlopen(req, context=ssl_context) as f:
# return data in pandas dataframe
df = pandas.read_csv(f, sep=";", header=1)
logger.info("Downloaded " + str(len(df)) + " data records for " +
"sensor " + str(sensor_id) + " from Aranet cloud")
return df
except urllib.error.HTTPError as e:
raise NotAuthorizedError(str(e.reason)) if e.code == 401 \
else Exception("Error obtaining the sensor data: " + str(e.reason))
@__aranet_cloud_request_json
def get_metrics(aranet_conf, **kwargs) -> Dict[str, Any]:
"""Get available metrics from the Aranet Cloud.
Args:
aranet_conf: Object of the Aranet Cloud configuration.
Raises:
NotAuthorizedError: If the request results in a not authorized
(HTTP 401) response.
Returns:
Dict[str, Any]: A Dict with the contents of the response.
"""
return aranet_conf["DEFAULT"]["endpoint"] + \
"/metrics/" + kwargs["cloud_space_id"]
@__aranet_cloud_request_json
def get_rules(aranet_conf, **kwargs) -> Dict[str, Any]:
"""Get rules from the Aranet Cloud.
Args:
aranet_conf: Object of the Aranet Cloud configuration.
Raises:
NotAuthorizedError: If the request results in a not authorized
(HTTP 401) response.
Returns:
Dict[str, Any]: A Dict with the contents of the response.
"""
return aranet_conf["DEFAULT"]["endpoint"] + \
"/alarms/" + kwargs["cloud_space_id"] + "/rules"
@__aranet_cloud_request_json
def get_gateways(aranet_conf, **kwargs) -> Dict[str, Any]:
"""Get gateways (base stations) information from the Aranet Cloud.
Args:
aranet_conf: Object of the Aranet Cloud configuration.
Raises:
NotAuthorizedError: If the request results in a not authorized
(HTTP 401) response.
Returns:
Dict[str, Any]: A Dict with the contents of the response.
"""
return aranet_conf["DEFAULT"]["endpoint"] + \
"/gateways/" + kwargs["cloud_space_id"]
def read_aranet_conf(file):
"""Reads the Aranet Cloud configuration file
Args:
file (str or os.PathLike): A path-like object giving the pathname of
the configuration file.
Returns:
[configparser.ConfigParser]: A ConfigParser object with the
configuration.
"""
aranet_conf = configparser.ConfigParser(
defaults={
"endpoint": "https://aranet.cloud/api"
}
)
with open(file) as f:
aranet_conf.read_file(f)
return aranet_conf