-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi_resource.py
More file actions
95 lines (65 loc) · 2.93 KB
/
api_resource.py
File metadata and controls
95 lines (65 loc) · 2.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import hmac
import logging
import subprocess

from flask import request
from flask_restful import Resource
from jsonschema import validate, FormatChecker

import helpers.utils as utils
class APIResource(Resource):
    '''Flask-RESTful resource protected by an API key sent in a request header.

    Every HTTP verb goes through ``process_request``, which checks the API
    key, validates the request body against the configured JSON schema and
    only then dispatches to the verb-specific ``process_*`` handler.
    POST runs the configured target command with the arguments supplied in
    the payload; GET and PUT answer 501.
    '''

    # Defined in the class body, so the ``__name__`` lookup on the ``_logger``
    # line below resolves to this value: the logger is named 'APIResource'.
    __name__ = 'APIResource'
    # Parsed configuration; the code reads ``target``, ``api_key.key`` and
    # ``api_key.header`` from it.
    _config = None
    _logger = logging.getLogger(__name__)
    # JSON schema every request body is validated against.
    _json_schema = {}

    def __init__(self, *args) -> None:
        '''Build the resource from its configuration and schema.

        Args:
            *args: ``args[0]`` must be a sequence whose first item is the
                configuration and whose second item is the JSON schema, both
                in whatever form ``utils.json_to_namespace`` and
                ``utils.json_to_object`` accept (presumably JSON text --
                confirm against ``helpers.utils``).
        '''
        super().__init__()
        self._config = utils.json_to_namespace(args[0][0])
        self._json_schema = utils.json_to_object(args[0][1])

    def get(self):
        '''Handle HTTP GET via the common authentication/validation path.'''
        return self.process_request(self.process_get)

    def post(self):
        '''Handle HTTP POST via the common authentication/validation path.'''
        return self.process_request(self.process_post)

    def put(self):
        '''Handle HTTP PUT via the common authentication/validation path.'''
        return self.process_request(self.process_put)

    def process_request(self, callback):
        '''Authenticate and validate the current request, then run ``callback``.

        Args:
            callback: Zero-argument callable producing the response once the
                request has been accepted.

        Returns:
            The 401 response when the API key does not match, the 400
            response when the body cannot be parsed or fails schema
            validation, otherwise whatever ``callback`` returns.
        '''
        if not self._is_authorized():
            return self.process_invalid_apikey()

        # Only parsing and validation are guarded here: a failure inside the
        # handler itself is not an "invalid request" and must not be reported
        # as a 400.
        try:
            pay_load = self.process_payload()
            json_pay_load = utils.json_namespace_to_object(pay_load)
            validate(
                instance=json_pay_load,
                schema=self._json_schema,
                format_checker=FormatChecker(),
            )
        except Exception as error:
            self._logger.error(f'Receive invalid request with error: {error}')
            return self.process_invalid_schema()

        return callback()

    def _is_authorized(self):
        '''Return True when the header API key equals the configured one.

        The comparison is constant-time so response timing does not reveal
        how much of a guessed key is correct.

        NOTE(review): when no key is configured ``get_config_apikey`` returns
        an empty string, and a request that omits the header also yields an
        empty string, so such a request is accepted. This is kept as-is for
        backward compatibility -- confirm it is intended.
        '''
        supplied = self.get_header_apikey()
        expected = self.get_config_apikey()
        if supplied is None or expected is None:
            return False
        return hmac.compare_digest(
            self._as_bytes(supplied), self._as_bytes(expected)
        )

    @staticmethod
    def _as_bytes(value):
        '''Normalise ``value`` to bytes so ``hmac.compare_digest`` accepts it.

        ``compare_digest`` rejects non-ASCII ``str`` arguments and mixed
        str/bytes pairs, so both sides are encoded up front.
        '''
        if isinstance(value, (bytes, bytearray)):
            return bytes(value)
        return str(value).encode('utf-8')

    def process_post(self):
        '''Run the configured target with the payload's ``args`` and report it.

        Returns:
            ``({'status': 200, 'data': <combined stdout/stderr>,
            'code': <exit status>}, 200)`` when the command was started, the
            500 response when it could not be started or its output could
            not be decoded, and the 400 response when the payload's ``args``
            cannot be turned into a command line.
        '''
        # The body is parsed again because handlers take no arguments.
        payload = self.process_payload()
        try:
            # The command is passed as a list and no shell is involved, so
            # the arguments are handed to the target verbatim.
            command = [self._config.target] + payload.args
            self._logger.info(f'Running command: {command}')
            result = subprocess.run(
                command,
                universal_newlines=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
            )
        except (OSError, UnicodeDecodeError) as error:
            # Target missing / not executable, or undecodable output: this is
            # a server-side failure, not a client error.
            self._logger.error(f'Failed to run command: {error}')
            return self.process_execution_error()
        except (AttributeError, TypeError, ValueError) as error:
            # ``args`` missing, not a list, or holding values that cannot be
            # used as command-line arguments.
            self._logger.error(f'Receive invalid request with error: {error}')
            return self.process_invalid_schema()

        return_code = result.returncode
        self._logger.info(f'Return code: {return_code}')
        self._logger.debug(f'Result: {result.stdout}')
        return {'status': 200, 'data': result.stdout, 'code': return_code}, 200

    def process_get(self):
        '''Return the 501 response: GET is not supported.'''
        return {'status': 501, 'message': 'Method not supported'}, 501

    def process_put(self):
        '''Return the 501 response: PUT is not supported.'''
        return {'status': 501, 'message': 'Method not supported'}, 501

    def process_payload(self):
        '''Return the request body converted by ``utils.json_to_namespace``.'''
        return utils.json_to_namespace(request.get_data(as_text=True))

    def process_invalid_schema(self):
        '''Return the 400 response used for unparsable or invalid bodies.'''
        return {'status': 400, 'message': 'Invalid request data'}, 400

    def process_invalid_apikey(self):
        '''Return the 401 response used when the API key check fails.'''
        return {'status': 401, 'message': 'Authorization failed'}, 401

    def process_execution_error(self):
        '''Return the 500 response used when the target command cannot run.'''
        return {'status': 500, 'message': 'Command execution failed'}, 500

    def get_config_apikey(self):
        '''Return the configured API key decoded with ``utils.base64_decode``.

        Returns an empty string when no key is configured.
        '''
        encoded_key = self._config.api_key.key
        if encoded_key:
            return utils.base64_decode(encoded_key)
        return ""

    def get_header_apikey(self):
        '''Return the API key carried by the configured request header.

        Returns an empty string when no header name is configured or the
        request does not carry that header.
        '''
        header_name = self._config.api_key.header
        if header_name:
            return request.headers.get(header_name, "")
        return ""