forked from capocchi/DEVSimPy
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathConnectionThread.py
More file actions
193 lines (153 loc) · 5.54 KB
/
ConnectionThread.py
File metadata and controls
193 lines (153 loc) · 5.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# -*- coding: utf-8 -*-
## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
# ConnectionThread.py ---
# --------------------------------
# Copyright (c) 2020
# L. CAPOCCHI (capocchi@univ-corse.fr)
# SPE Lab - SISU Group - University of Corsica
# --------------------------------
# Version 2.0 last modified: 03/15/20
## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
#
# GENERAL NOTES AND REMARKS:
#
## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
#
# GLOBAL VARIABLES AND FUNCTIONS
#
## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
import os
import sys
import wx
from threading import Thread
import urllib.request, urllib.parse, urllib.error
import tempfile
import zipfile
import gettext
# Conventional gettext alias: strings wrapped in _() below are looked up
# in the translation catalog before being shown or printed.
_ = gettext.gettext
# Reference version of the libraries; UpgradeLibThread.CheckVersion compares
# the version parsed from the download page against this value (as floats).
__version_lib__ = 0.2
class unzip:
    """Extract a zip archive into a directory, reporting progress on stdout.

    Progress is written to ``sys.stdout``: one line per member when
    ``verbose`` is set, otherwise one line each time another ``percent``
    of the members has been processed.
    """

    def __init__(self, verbose = False, percent = 10):
        """Store the reporting options.

        :param verbose: when true, print the name of every member extracted.
        :param percent: step, in percent, between two progress messages in
            non-verbose mode; a value <= 0 disables progress messages.
        """
        self.verbose = verbose
        self.percent = percent

    def extract(self, file, dir):
        """Extract every member of the archive ``file`` below ``dir``.

        :param file: path of the zip archive, or an open binary file object.
        :param dir: destination directory; created when it does not exist.
        :raises zipfile.BadZipFile: if ``file`` is not a valid zip archive.
        """
        # A bare drive specification such as "C:" is never created.
        if not dir.endswith(':') and not os.path.exists(dir):
            os.makedirs(dir)

        with zipfile.ZipFile(file) as zf:
            members = zf.infolist()
            step = self._progress_step(len(members))

            for i, member in enumerate(members):
                if self.verbose:
                    sys.stdout.write(_("Extracting %s\n") % member.filename)
                elif step > 0 and i > 0 and (i % step) == 0:
                    complete = int(i / step) * self.percent
                    # "%%" is a literal percent sign in a %-format string.
                    sys.stdout.write(_("%s %% complete\n") % complete)

                # ZipFile.extract creates missing parent directories, handles
                # directory entries, and strips absolute prefixes and ".."
                # components, so a member cannot be written outside ``dir``.
                zf.extract(member, dir)

    def _progress_step(self, num_files):
        """Return how many members make up one ``percent`` step (0 = no reporting)."""
        if self.percent <= 0:
            return 0
        divisions = 100 / self.percent
        return int(num_files / divisions)

    def _createstructure(self, file, dir):
        """Create below ``dir`` every directory entry listed in ``file``.

        ``extract`` no longer needs this (``ZipFile.extract`` creates the
        directories itself); it is kept so the class interface is unchanged.
        """
        self._makedirs(self._listdirs(file), dir)

    def _makedirs(self, directories, basedir):
        """ Create any directories that don't currently exist """
        for name in directories:
            # makedirs also creates intermediate levels that have no entry
            # of their own in the archive.
            os.makedirs(os.path.join(basedir, name), exist_ok=True)

    def _listdirs(self, file):
        """ Return the sorted directory entries (names ending with '/')
        found in the zip archive ``file``. """
        with zipfile.ZipFile(file) as zf:
            return sorted(name for name in zf.namelist() if name.endswith('/'))
class UpgradeLibThread(Thread):
    """ Worker thread class to attempt upgrade the libraries.

    The thread starts itself on construction. ``run`` fetches the download
    list page and hands the result to ``CheckVersion`` through
    ``wx.CallAfter``.
    """

    def __init__(self, parent):
        """ Initialize and immediately start the worker thread.

        :param parent: object whose ``UpdatePulse(message)`` method is called
            just before the library archive is downloaded.
        """
        Thread.__init__(self)
        self._parent = parent
        # Thread.setDaemon() is deprecated; set the attribute instead.
        self.daemon = True
        self.start()

    def LoadZip(self, url):
        """ Download the archive at ``url`` and, once the user confirms,
        extract it into ``DOMAIN_PATH``.

        NOTE(review): ``DOMAIN_PATH`` is not defined in this module; it is
        presumably made available globally by the application - confirm.

        :param url: address of the zip archive to download.
        """
        with urllib.request.urlopen(url) as response:
            payload = response.read()

        with tempfile.NamedTemporaryFile() as temp:
            temp.write(payload)
            temp.seek(0)

            # Only reached when the archive was written completely.
            dlg = wx.MessageDialog(None, _("Are you sure to upgrade librairies from new version ?"),
                                   _("Upgrade Manager"),
                                   wx.YES_NO | wx.YES_DEFAULT | wx.ICON_QUESTION)
            try:
                confirmed = dlg.ShowModal() == wx.ID_YES
            finally:
                dlg.Destroy()

            if confirmed:
                # Hand over the open file object rather than temp.name: a
                # still-open NamedTemporaryFile cannot be reopened by name
                # on every platform (notably Windows).
                unzipper = unzip()
                unzipper.extract(temp, DOMAIN_PATH)

    @staticmethod
    def _extract_version(text, marker, suffix):
        """ Return the version string found between ``marker`` and the next
        ``suffix`` in ``text``, or None when it is missing or not numeric.

        :param text: page content, as ``str`` or undecoded ``bytes``.
        """
        if isinstance(text, bytes):
            text = text.decode("utf-8", errors="replace")

        start = text.find(marker)
        if start == -1:
            return None
        start += len(marker)

        end = text.find(suffix, start)
        if end == -1:
            return None

        version = text[start:end]
        try:
            float(version)
        except ValueError:
            return None
        return version

    @staticmethod
    def _show_dialog(message, style):
        """ Show a modal 'Update Manager' message box, destroy it, and
        return the identifier of the button that closed it. """
        dlg = wx.MessageDialog(None, message, _('Update Manager'), style)
        try:
            return dlg.ShowModal()
        finally:
            dlg.Destroy()

    def CheckVersion(self, text):
        """ Compare the version advertised on the download page with
        ``__version_lib__`` and offer to install a newer one.

        Scheduled by ``run`` through ``wx.CallAfter``.

        :param text: content of the download page (``bytes`` or ``str``),
            or None when the page could not be fetched.
        """
        if text is None:
            # We can't get to the internet?
            self._show_dialog(_("Unable to connect to the internet."), wx.OK | wx.ICON_ERROR)
            return

        # NOTE(review): these are Google Code URLs; that hosting service is
        # believed to be discontinued - confirm the address is still valid.
        url = "http://devsimpy.googlecode.com/files/DEVSimPy_lib"
        prefix = "_"
        suffix = ".zip"

        version = self._extract_version(text, url + prefix, suffix)
        if version is None:
            # The page was fetched but holds no usable download link.
            self._show_dialog(_("Unable to find the latest version of DEVSimPy libraries."), wx.OK | wx.ICON_ERROR)
            return

        if float(version) > float(__version_lib__):
            # Time to upgrade maybe?
            strs = _("A new version of DEVSimPy libraries is available!\n\n Do you want to download and install it ?")
            answer = self._show_dialog(strs, wx.YES_NO | wx.YES_DEFAULT | wx.ICON_QUESTION)
        else:
            # No upgrade required
            strs = _("You have the latest version of DEVSimPy libraries.")
            answer = self._show_dialog(strs, wx.OK|wx.ICON_INFORMATION)

        if answer == wx.ID_YES:
            ### update the message of the Progess dialog
            self._parent.UpdatePulse(_("Downloading new libraries...."))
            ### go to download zip file
            self.LoadZip(url+prefix+version+suffix)

    def run(self):
        """ Run worker thread: fetch the download page, then schedule
        ``CheckVersion`` with its content (or None on failure).
        """
        url = "http://code.google.com/p/devsimpy/downloads/list"
        try:
            with urllib.request.urlopen(url) as response:
                text = response.read()
        except Exception:
            # No network, HTTP error or anything else: all are reported the
            # same way, by passing None to CheckVersion.
            text = None
        wx.CallAfter(self.CheckVersion, text)

    def finish(self):
        """ Return final value.
        """
        return True