-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpushNotifications.py
More file actions
130 lines (104 loc) · 4.59 KB
/
pushNotifications.py
File metadata and controls
130 lines (104 loc) · 4.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
"""
Developed by Pablo G. Thuillier
@pablogthuillier
https://github.com/pablogthuillier
For using this library you can write lines like these:
priv_filepath = "certifications/MyAppCert.pem"
cert_filepath = "certifications/MyAppKey.pem"
ios = IOSPushNotifications(privatekey_filepath=priv_filepath,certificate_filepath=cert_filepath)
ios.send_push_notification(message="This is a test push",token_device="abf02b")
Optional arguments in iOS push might be:
- badge (indicates the number badge in the app icon)
- sound (indicates the sound it must be sounded)
"""
from OpenSSL import SSL
from gcm import GCM
from socket import socket
import os
import json
import struct, binascii
class AndroidPushNotifications:
    """
    Sends push notifications to Android devices through the ``GCM`` client.

    Constructor with one argument.
    api_key: the API key string provided by Google
    """

    def __init__(self, api_key=None):
        self.__api_key = api_key

    def send_push_notification(self, **kwargs):
        """
        Send one notification to one device with a GCM plaintext request.

        Required keyword arguments:
        message: the notification text (text or UTF-8 encoded bytes)
        token_device: the registration id of the target device

        Every other keyword argument is copied unchanged into the data
        payload sent along with the message.

        Returns a dict and does not raise for ordinary errors:
        {'status': 'success', 'response': 'android_push_sent'} on success, or
        {'status': 'failed', 'response': 'push_notification_android_error',
        'description': <reason>} when validation or the request fails.
        """
        try:
            if not kwargs:
                raise Exception("Message and token must be given")
            if "message" not in kwargs:
                raise Exception("Message not found")
            if "token_device" not in kwargs:
                raise Exception("Token device not found")
            if not self.__api_key:
                raise Exception("Api key not found")

            message = kwargs["message"]
            # Byte strings are assumed to be UTF-8 already; encoding them again
            # fails (no ``encode`` on Python 3 bytes, implicit ASCII decode on
            # Python 2 str), so only text is encoded.
            if not isinstance(message, bytes):
                message = message.encode('utf-8')

            data = {'message': message}
            for key, value in kwargs.items():
                if key not in ("message", "token_device"):
                    data[key] = value

            gcm = GCM(self.__api_key)
            gcm.plaintext_request(registration_id=kwargs["token_device"], data=data)
            return {'status': 'success', 'response': 'android_push_sent'}
        except Exception as e:
            # Exceptions raised without arguments have an empty ``args`` tuple;
            # fall back to str(e) so the handler itself cannot raise.
            description = e.args[0] if e.args else str(e)
            return {'status': 'failed', 'response': 'push_notification_android_error', 'description': description}
class IOSPushNotifications:
    """
    Sends push notifications to iOS devices over Apple's binary push gateway.

    Constructor arguments:
    privatekey_filepath: the private key filepath (e.g. "certificates/MyAppKey.pem")
    certificate_filepath: the certificate filepath (e.g. "certificates/MyAppCert.pem")
    sandbox: when true, the sandbox gateway host is used instead of production

    Both paths are resolved relative to the directory of this module. If either
    file is missing, the instance is still created and the problem is reported
    by send_push_notification.
    """

    def __init__(self, privatekey_filepath=None, certificate_filepath=None, sandbox=False):
        self.__private_key = ""
        self.__certificate = ""
        self.__sandbox = sandbox
        if privatekey_filepath and certificate_filepath:
            base_dir = os.path.dirname(__file__)
            privatekey_file = os.path.join(base_dir, privatekey_filepath)
            certificate_file = os.path.join(base_dir, certificate_filepath)
            # Only remember the paths when both files really exist.
            if os.path.isfile(privatekey_file) and os.path.isfile(certificate_file):
                self.__private_key = privatekey_file
                self.__certificate = certificate_file

    def send_push_notification(self, **kwargs):
        """
        Send one notification to one device.

        Required keyword arguments:
        message: the alert text (text or UTF-8 encoded bytes)
        token_device: the device token as a hexadecimal string

        Every other keyword argument (e.g. badge, sound) is copied unchanged
        into the "aps" dictionary of the payload.

        Returns a dict and does not raise for ordinary errors:
        {'status': 'success', 'response': 'ios_push_sent'} on success, or
        {'status': 'failed', 'response': 'push_notification_ios_error',
        'description': <reason>} when validation or the connection fails.
        """
        try:
            if not kwargs:
                raise Exception("Message and token device must be given")
            if "message" not in kwargs:
                raise Exception("Message not found")
            if "token_device" not in kwargs:
                raise Exception("Token device not found")
            if not self.__private_key:
                raise Exception("Private key file not found")
            if not self.__certificate:
                raise Exception("Certificate file not found")

            # NOTE(review): this is Apple's legacy binary gateway; confirm it
            # is still reachable before relying on this class.
            if self.__sandbox:
                apn_host = "gateway.sandbox.push.apple.com"
            else:
                apn_host = "gateway.push.apple.com"

            # Build and validate the whole frame before touching the network.
            message = kwargs["message"]
            # json.dumps needs text on Python 3; on Python 2 the output is the
            # same whether the alert is a UTF-8 str or a unicode object.
            if isinstance(message, bytes):
                message = message.decode('utf-8')
            aps = {"alert": message}
            for key, value in kwargs.items():
                if key not in ("message", "token_device"):
                    aps[key] = value
            # json.dumps escapes non-ASCII by default, so this encode is lossless.
            payload = json.dumps({"aps": aps}, separators=(',', ':')).encode('utf-8')

            # The token arrives as "token_device" (the key validated above).
            try:
                token = binascii.unhexlify(kwargs["token_device"])
            except (TypeError, ValueError, binascii.Error):
                token = b""
            if not token:
                raise Exception("Token device is not a valid hexadecimal string")

            # Frame layout: command byte, token length, token, payload length,
            # payload (lengths are big-endian unsigned shorts).
            frame = struct.pack(
                "!cH%dsH%ds" % (len(token), len(payload)),
                b'\x00', len(token), token, len(payload), payload,
            )

            # SSLv23_METHOD negotiates the best protocol both peers support;
            # SSLv3 is insecure and unavailable in current pyOpenSSL.
            ctx = SSL.Context(SSL.SSLv23_METHOD)
            ctx.use_certificate_file(self.__certificate)
            ctx.use_privatekey_file(self.__private_key)

            sock = socket()
            try:
                connection = SSL.Connection(ctx, sock)
                connection.connect((apn_host, 2195))
                # sendall keeps writing until the whole frame has been sent.
                connection.sendall(frame)
                connection.shutdown()
            finally:
                # Always release the socket, even when connect/send fails.
                sock.close()
            return {'status': 'success', 'response': 'ios_push_sent'}
        except Exception as e:
            # Exceptions raised without arguments have an empty ``args`` tuple;
            # fall back to str(e) so the handler itself cannot raise.
            description = e.args[0] if e.args else str(e)
            return {'status': 'failed', 'response': 'push_notification_ios_error', 'description': description}