1212
1313
1414class Megaplan_Auth :
15- __slots__ = ['login' , 'password' , 'host' ,
16- '__proto' , 'accessid' , 'secretkey' , 'domain' ]
15+ __slots__ = [
16+ 'login' , 'password' , 'host' ,
17+ '__proto' , 'accessid' , 'secretkey' , 'domain'
18+ ]
1719
18- def __init__ (self , login , password , host , proto = 'https://' ):
19- self .login = login
20- self .password = self .__password_crypt (password )
21- self .host = host
22- self .__proto = proto
23- self .domain = self .__proto + self .host
24- self .accessid , self .secretkey = self .get_key ()
20+ def __init__ (self , host : str , proto = 'https://' ):
21+ self .host , self .__proto = host , proto
22+ self .domain = f"{ self .__proto } { self .host } "
2523
26- def __password_crypt (self , password ) :
24+ def __password_crypt (self , password : str ) -> str :
2725 return hashlib .md5 (password .encode ()).hexdigest ()
2826
29- def __get_otk (self ):
30- response = requests .post (self .domain + '/BumsCommonApiV01/User/createOneTimeKeyAuth.api' , headers = {
31- 'Accept' : 'application/json' }, data = {'Login' : self .login , 'Password' : self .password })
32- try :
33- resp_json = response .json ()
34- except json .decoder .JSONDecodeError as jsde :
35- print (response .text )
36- raise jsde
27+ def __get_otk (self , login : str , encrupy_password : str ):
28+ response = requests .post (
29+ url = f'{ self .domain } /BumsCommonApiV01/User/createOneTimeKeyAuth.api' ,
30+ headers = {'Accept' : 'application/json' },
31+ data = {'Login' : login , 'Password' : encrupy_password })
32+ resp_json = response .json ()
3733 data = resp_json .get ("data" )
3834 status = resp_json .get ("status" )
3935 code = status .get ("code" )
4036 if code == "error" :
41- raise Exception (status .get ("message" ))
37+ raise ValueError (status .get ("message" ))
4238 else :
4339 return data
4440
45- def get_key (self ):
46- _authdata = requests .post (self .domain + '/BumsCommonApiV01/User/authorize.api' , headers = {
47- 'Accept' : 'application/json' }, data = {'Login' : self .login , 'Password' : self .password , 'OneTimeKey' : self .__get_otk ()}).json ()['data' ]
48- _AccessId = _authdata ['AccessId' ]
49- _SecretKey = _authdata ['SecretKey' ].encode ()
41+ def get_key (self , login : str , password : str ):
42+ encrupy_password = self .__password_crypt (password )
43+ response = requests .post (
44+ url = f'{ self .domain } /BumsCommonApiV01/User/authorize.api' ,
45+ headers = {'Accept' : 'application/json' },
46+ data = {'Login' : login , 'Password' : encrupy_password , 'OneTimeKey' : self .__get_otk (login , encrupy_password )})
47+ resp_json = response .json ()
48+ _AccessId = resp_json ["data" ]["AccessId" ]
49+ _SecretKey = resp_json ["data" ]["SecretKey" ]
5050 return _AccessId , _SecretKey
5151
5252
5353class Megaplan_Api :
54- __slots__ = ['_HOST' , '_HOST_full' , '_today' ,
55- 'AccessId' , 'SecretKey' , 'host' , 'proto' , 'domain' ]
54+ __slots__ = [
55+ '_today' , 'AccessId' , 'SecretKey' ,
56+ 'host' , '__proto' , 'domain' , '_today'
57+ ]
5658
57- def __init__ (self , AccessId , SecretKey , host , proto = 'https://' ):
58- self .host = host
59- self .proto = proto
60- self .domain = self .proto + self .host
61- self ._today = formatdate (time .time ())
62- self .AccessId = AccessId
63- self .SecretKey = SecretKey
59+ def __init__ (self , AccessId : str , SecretKey : str , host : str , proto = 'https://' ):
60+ self .host , self .__proto = host , proto
61+ self .domain = f"{ self .__proto } { self .host } "
62+ self .AccessId , self .SecretKey = AccessId , SecretKey .encode ()
6463
6564 def query_hasher (self , request_type , uri , payload = None ):
65+ self ._today = formatdate (time .time ())
6666 if payload :
67- uri = uri + '?' + urlencode (payload , doseq = True )
68- query = request_type + '\n \n ' + 'application/x-www-form-urlencoded' + '\n ' + \
69- self ._today + '\n ' + self .host + uri
67+ uri = f"{ uri } ?{ urlencode (payload , doseq = True )} "
68+ query = f"{ request_type } \n \n application/x-www-form-urlencoded\n { self ._today } \n { self .host } { uri } "
7069 hash_query = base64 .b64encode (hmac .new (
7170 self .SecretKey ,
7271 query .encode (),
@@ -75,27 +74,31 @@ def query_hasher(self, request_type, uri, payload=None):
7574 Auth_Heared = {
7675 'Date' : self ._today ,
7776 'Accept' : 'application/json' ,
78- 'X-Authorization' : self .AccessId + ':' + hash_query ,
77+ 'X-Authorization' : f" { self .AccessId } : { hash_query } " ,
7978 'Content-Type' : 'application/x-www-form-urlencoded' ,
8079 'accept-encoding' : 'gzip, deflate, br'
8180 }
8281 return Auth_Heared
8382
84- def get_query (self , uri_query , payload = '' ):
83+ def get_query (self , uri_query : str , payload = None ):
8584 head = self .query_hasher ('GET' , uri_query , payload )
8685 response = requests .get (
87- self .domain + uri_query ,
86+ url = f" { self .domain } { uri_query } " ,
8887 headers = head ,
89- params = urlencode (payload , doseq = True ), timeout = 60 )
88+ params = urlencode (payload , doseq = True ) if payload else None , timeout = 60 )
9089 resp_json = response .json ()
9190 status = resp_json .get ("status" )
9291 if status and status .get ("code" ) == "error" :
9392 raise ValueError (status ["message" ])
9493 return resp_json .get ("data" )
9594
96- def post_query (self , uri_query , payload ):
95+ def post_query (self , uri_query : str , payload : dict ):
9796 head = self .query_hasher ('POST' , uri_query , None )
98- return requests .post (self .domain + uri_query , headers = head , data = payload ).json ()
97+ response = requests .post (
98+ url = f"{ self .domain } { uri_query } " ,
99+ headers = head ,
100+ data = payload )
101+ return response .json ()
99102
100103 def __repr__ (self ):
101104 return f"<API [{ self .domain } ]>"
0 commit comments