-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrunRemoteTasks.py
More file actions
executable file
·380 lines (327 loc) · 14.6 KB
/
runRemoteTasks.py
File metadata and controls
executable file
·380 lines (327 loc) · 14.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
#!/usr/bin/env python
__author__ = 'Peter Nordin'
__license__ = "GPLv3"
__email__ = "peter.nordin@liu.se"
import argparse
import errno
import os
import shutil as sh
import subprocess
import sys
import time
# Input and output files
gInputTaskList = 'taskList'
gInputServerlist = 'serverlist'
gOutputDoneTaskList = 'doneTasks'
gOutputFailedTaskList = 'failedTasks'

# Program locations for POSIX systems (e.g. Linux)
if os.name == 'posix':
    gSevenZip = r'7z'
    gHopsanclient = r'/opt/hopsan/bin/HopsanServerClient'
# Program locations for Windows
elif os.name == 'nt':
    gSevenZip = r'C:\Program Files\7-Zip\7z.exe'
    gHopsanclient = r'C:\Program Files\Hopsan\bin\HopsanServerClient.exe'
    #gHopsanclient = r'C:\svn\hopsan\trunk\bin\HopsanServerClient.exe'
# Any other platform: fall back to PATH lookup so both names are always
# defined (the client location can still be overridden with --hopsandir)
else:
    gSevenZip = '7z'
    gHopsanclient = 'HopsanServerClient'

# Remote files and identification
gUserID = 'anonymous'
gServerSideScript = 'serverSideRunTask.sh'
gServerSideWD = 'mytask'      # Remote working directory the task is unpacked/run in
gTaskZipFile = 'mytask.zip'   # Archive name created inside each task directory
gOutputFile = 'mytaskoutput.txt'
# Extra files to fetch from gServerSideWD, e.g. ['hopsancli_debug.txt']
gResultFiles = []

# Other settings
gRunOnAll = False   # Run every task once on every server
gSleepTime = 5      # Seconds to wait when all servers/tasks are busy
gNumSlots = 2       # Number of slots requested from each server
gTest = False       # Dry run: print commands and use local stand-ins
def mkdirs(path):
    """Create directory *path* together with any missing parent directories.

    Succeeds silently when *path* already exists as a directory.

    :param path: directory path to create
    :raises OSError: for any other failure, including when *path* already
        exists but is not a directory
    """
    try:
        os.makedirs(path)
    except OSError as exception:
        # os.errno was removed in Python 3.7; the errno module works on 2 and 3.
        # An existing *file* at path must still fail, since callers go on to
        # write into the directory.
        if exception.errno != errno.EEXIST or not os.path.isdir(path):
            raise
def remove(path):
    """Delete the file at *path*; a file that does not exist is not an error.

    :param path: file to delete
    :raises OSError: for any failure other than the file being missing
    """
    try:
        os.remove(path)
    except OSError as exception:
        # os.errno was removed in Python 3.7; the errno module works on 2 and 3.
        if exception.errno != errno.ENOENT:
            raise
def move(src, dst):
    """Move *src* to *dst* via shutil.

    When both paths are the same string nothing is moved; a notice is
    printed instead.
    """
    if src == dst:
        print('In move: Same src and dst')
        return
    sh.move(src, dst)
def appendToFile(filepath, text):
    """Append *text* plus a trailing newline to *filepath*.

    The file is created if it does not exist yet.
    """
    line = text + '\n'
    with open(filepath, 'a+') as handle:
        handle.write(line)
class ServerHandler(object):
    """Pool of server addresses, split into free and currently taken servers."""

    def __init__(self):
        # Free servers; takeserver() hands them out from the front of the list
        self.servers = list()
        # Servers that have been handed out and not yet returned
        self.takenservers = list()

    def readservers(self, serverlistfile):
        """Add every non-empty line of *serverlistfile* as a free server.

        Lines are stripped of surrounding whitespace. Blank lines are skipped,
        as the task-list reader does, so they do not become '' servers.
        An I/O error is printed and otherwise ignored.
        """
        try:
            # Read-only access is sufficient (was 'r+', which needs write permission)
            with open(serverlistfile, 'r') as f:
                for line in f:
                    address = line.strip()
                    if address:
                        self.servers.append(address)
            print(self.servers)
        except IOError as e:
            print("I/O error({0}): {1}, {2}".format(e.errno, e.strerror, serverlistfile))

    def numservers(self):
        """Return the total number of known servers, free and taken."""
        return len(self.servers)+len(self.takenservers)

    def numfreeservers(self):
        """Return the number of servers currently free."""
        return len(self.servers)

    def takeserver(self):
        """Mark the first free server as taken and return it, or None if none is free."""
        if self.numfreeservers() > 0:
            srv = self.servers.pop(0)
            self.takenservers.append(srv)
            return srv
        return None

    def returnserver(self, server):
        """Move *server* from the taken list back to the end of the free list.

        :raises ValueError: if *server* is not currently taken
        """
        # Remove first so that an invalid return raises without also adding
        # an unknown server to the free pool.
        self.takenservers.remove(server)
        self.servers.append(server)
class Experiment(object):
    """One task run on one server, followed by the retrieval of its results.

    ``process`` first holds the client process that runs the task. It is
    later replaced by the client process that fetches the results.
    """

    def __init__(self, server, process, expdir):
        # Server address the task runs on
        self.server = server
        # subprocess.Popen handle, or None when no process was started
        self.process = process
        # Name identifying this experiment in logs and result lists
        self.experimentdirectory = expdir
        # Local destination for logs and retrieved files (may be reassigned later)
        self.outputdir = expdir
        self.taskcompletedok = False
        # Correctly spelled flag; it was previously initialised under a typo,
        # so 'retrievalcompletedok' did not exist until someone assigned it
        self.retrievalcompletedok = False

    @property
    def retrievalcompletdok(self):
        """Backward-compatible alias for the old, misspelled attribute name."""
        return self.retrievalcompletedok

    @retrievalcompletdok.setter
    def retrievalcompletdok(self, value):
        self.retrievalcompletedok = value

    def isrunning(self):
        """Return True while the attached process has not terminated."""
        # NOTE(review): poll() does not drain a stdout=PIPE. A child that writes
        # more than the OS pipe buffer may block and never finish — confirm
        # the client output stays small.
        if self.process is not None:
            return self.process.poll() is None
        return False

    def communicate(self):
        """Collect (stdout, stderr) from the process, or ('', '') without one."""
        if self.process is not None:
            return self.process.communicate()
        return '', ''

    def rc(self):
        """Return the process return code, or 1 (failure) when there is no process."""
        if self.process is not None:
            return self.process.returncode
        return 1
def compressDirectory(srcdir, dst):
    """Zip the contents of *srcdir* recursively into the archive *dst* using 7-Zip.

    7-Zip runs inside *srcdir* and writes an archive named after the basename
    of *dst*. That archive is then moved to *dst*. When gTest is set, the
    command is only printed.

    :param srcdir: directory whose contents are archived
    :param dst: final path of the zip archive
    :returns: True on success, i.e. 7-Zip ran without a fatal error and *dst*
        exists afterwards. False otherwise.
    """
    fname = os.path.basename(dst)
    cmd = [gSevenZip, 'a', '-tzip', fname, '-r', '*']
    print('Compressing: '+srcdir+' to dst: '+dst)
    print(cmd)
    if gTest:
        return True
    remove(dst)
    # 7z 'a' adds to an existing archive. Also drop a stale intermediate
    # archive in srcdir, which differs from dst when dst lies elsewhere.
    intermediate = os.path.join(srcdir, fname)
    remove(intermediate)
    try:
        p = subprocess.Popen(cmd, cwd=srcdir)
    except OSError as e:
        # e.g. 7-Zip not installed: report failure instead of crashing the scheduler
        print('Error: Could not run ' + gSevenZip + ': ' + str(e))
        return False
    returncode = p.wait()
    # 7-Zip exit codes: 0 = no error, 1 = non-fatal warning, >= 2 = fatal/usage error
    if returncode not in (0, 1):
        print('Error: 7-Zip exited with code ' + str(returncode))
        return False
    if not os.path.exists(intermediate):
        return False
    move(intermediate, dst)
    return os.path.exists(dst)
def executeRemoteTask(lwd, address, userid, taskzipfile, serversidescript):
    """Launch the Hopsan server client that uploads a task and runs it remotely.

    Both *taskzipfile* and *serversidescript* are attached. The remote side
    runs the script with /bin/bash, and gNumSlots slots are requested.
    When gTest is set, a short local ping stands in for the client.

    :param lwd: local working directory for the client process
    :param address: server address to connect to
    :returns: the started subprocess.Popen, with stderr merged into its stdout pipe
    """
    print('Running task at: '+address+' Using local WD: '+lwd)
    # NOTE(review): options such as '-u ' + userid are passed as ONE argv
    # element containing a space; presumably the client parser accepts that.
    remote_cmd = [gHopsanclient,
                  '-s', address,
                  '-u ' + userid,
                  '-a ' + taskzipfile,
                  '-a ' + serversidescript,
                  '--numslots', str(gNumSlots),
                  '--shellexec', '/bin/bash ' + os.path.basename(serversidescript)]
    print(remote_cmd)
    if gTest:
        return subprocess.Popen(['ping', '127.0.0.1', '-c', '4'],
                                stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    return subprocess.Popen(remote_cmd, cwd=lwd,
                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
def retriveRemoteTaskResults(lwd, address, userid, outputfile, resultfiles, sequential=False):
    """Launch the Hopsan server client that downloads result files into *lwd*.

    *outputfile* and every entry of *resultfiles* are requested, with
    backslashes converted to '/'. *lwd* is created first if missing.
    When *sequential* is true, the call waits for the client to finish and
    then reports any requested file that is missing locally.

    :returns: the started subprocess.Popen, with stderr merged into its stdout pipe
    """
    print('Retrieving from: '+address+' Using local WD: ' + lwd)
    mkdirs(lwd)
    cmdlist = [gHopsanclient, '-s', address, '-u ' + userid]
    for requested in [outputfile] + list(resultfiles):
        cmdlist.append('--request ' + requested.replace('\\', '/'))
    print(cmdlist)
    if gTest:
        proc = subprocess.Popen(['ping', '127.0.0.1', '-c', '1'],
                                stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    else:
        proc = subprocess.Popen(cmdlist, cwd=lwd,
                                stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if not sequential:
        return proc
    # NOTE(review): wait() with stdout=PIPE can deadlock if the client writes
    # more than the OS pipe buffer; the caller reads the pipe later via communicate().
    proc.wait()
    if not gTest:
        # Result files first, then the main output file
        for fetched in list(resultfiles) + [outputfile]:
            local_path = os.path.join(lwd, fetched)
            if not os.path.exists(local_path):
                print('Error: ' + local_path + ' does not exist!')
    return proc
def toText(data):
    """Return process output *data* as a native text string.

    Under Python 3, subprocess pipes yield bytes, which cannot be concatenated
    with str. Such data is decoded as UTF-8, replacing undecodable bytes.
    Under Python 2, bytes is str, so the value is returned unchanged.
    """
    if isinstance(data, bytes) and not isinstance(data, str):
        return data.decode('utf-8', 'replace')
    return data


if __name__ == "__main__":
    argparser = argparse.ArgumentParser()
    argparser.add_argument('--user', help='User name to use')
    argparser.add_argument('--hopsandir', help='Path to the Hopsan bin directory to use')
    argparser.add_argument('--serverlist', help='Text file with server ip:port line by line')
    argparser.add_argument('--ssscript', help='Server side script to execute')
    argparser.add_argument('--tasklist', help='A list of tasks (directories) to process')
    argparser.add_argument('--runonall', help='Run the task on every server', action='store_true')
    argparser.add_argument('--outdir', help='The destination for received file (will be created if missing)')
    argparser.add_argument('--resultfiles', nargs='*', action='append')
    argparser.add_argument('--numslots', type=int, help='The number of required slots, n>0')
    args = argparser.parse_args()

    # Command line options override the module-level defaults
    if args.runonall:
        gRunOnAll = True
    if args.user:
        gUserID = args.user
    if args.hopsandir:
        gHopsanclient = os.path.realpath(os.path.join(args.hopsandir, 'HopsanServerClient'))
    if args.serverlist:
        gInputServerlist = os.path.realpath(args.serverlist)
    if args.tasklist:
        gInputTaskList = os.path.realpath(args.tasklist)
    if args.ssscript:
        gServerSideScript = os.path.realpath(args.ssscript)
    outdir = os.path.join(os.path.expanduser('~'), 'Temp/RemoteOut')
    if args.outdir:
        outdir = os.path.realpath(args.outdir)
    mkdirs(outdir)
    if args.numslots:
        gNumSlots = max(1, args.numslots)

    print('User: '+gUserID)
    print('HopsanClient: '+gHopsanclient)
    print('ServerSideScript: '+gServerSideScript)
    print('Using output directory: '+outdir)
    print('Run task on every server: '+str(gRunOnAll))
    if args.resultfiles:
        # --resultfiles may be given several times (action='append'); flatten
        gResultFiles = [rf for rfl in args.resultfiles for rf in rfl]
    print('Result files: ' + str(gResultFiles))
    print('Requesting num slots: '+str(gNumSlots))

    SH = ServerHandler()
    SH.readservers(gInputServerlist)
    print('Num servers: '+str(SH.numservers()))
    if SH.numservers() < 1:
        print('Error: No servers')
        sys.exit(-1)

    # Read task directories, one per line, skipping blank lines
    task_dirs = list()
    try:
        with open(gInputTaskList, 'r') as f:
            for line in f:
                line = line.strip()
                if line != '':
                    task_dirs.append(line)
    except IOError as e:
        print("I/O error({0}): {1}, {2}".format(e.errno, e.strerror, gInputTaskList))
        # Nothing can be processed without a task list; fail like a missing server list
        sys.exit(-1)

    # Pending work items are (task directory, pinned server or None).
    # With --runonall, each task is pinned once to each server. Otherwise a
    # fast server could pick up several copies of the same task while another
    # server never runs it.
    if gRunOnAll:
        experiment_list = [(td, srv) for td in task_dirs for srv in list(SH.servers)]
    else:
        experiment_list = [(td, None) for td in task_dirs]
    num_experiments = len(experiment_list)

    running_experiments = list()
    running_retreivals = list()
    done_experiments = list()
    # Entries are (experiment name, task ok, retrieval ok)
    failed_experiments = list()
    # Archives already built during this run, so each task is zipped only once
    zipped_tasks = list()

    # Clear output files
    if num_experiments > 0:
        remove(gOutputDoneTaskList)
        remove(gOutputFailedTaskList)

    # Run until all experiments have been processed
    while len(done_experiments)+len(failed_experiments) < num_experiments:
        havefreeservers = SH.numfreeservers() > 0
        if len(experiment_list) > 0 and havefreeservers:
            # Start every pending item whose (pinned) server is free right now
            for item in list(experiment_list):
                taskdir, pinned = item
                if pinned is None:
                    if SH.numfreeservers() == 0:
                        continue
                elif pinned not in SH.servers:
                    continue
                experiment_list.remove(item)
                expdir = os.path.realpath(taskdir)
                print('Processing experiment: ' + expdir)
                zipfile = os.path.join(expdir, gTaskZipFile)
                if zipfile not in zipped_tasks:
                    remove(zipfile)  # Remove previous version to avoid including it in new zip
                    if compressDirectory(expdir, zipfile):
                        zipped_tasks.append(zipfile)
                    else:
                        print('Error: Failed to compress: ' + expdir)
                        appendToFile(gOutputFailedTaskList, expdir)
                        failed_experiments.append((expdir, False, False))
                    print('\n')
                if zipfile not in zipped_tasks:
                    continue
                if pinned is None:
                    srv = SH.takeserver()
                    expname = expdir
                else:
                    # ServerHandler only hands out the first free server, so
                    # claim the pinned one explicitly
                    SH.servers.remove(pinned)
                    SH.takenservers.append(pinned)
                    srv = pinned
                    expname = expdir+'_'+srv.replace(':', '_')
                p = executeRemoteTask(os.getcwd(), srv, gUserID, zipfile, gServerSideScript)
                running_experiments.append(Experiment(srv, p, expname))
            print('Free servers ' + str(SH.numfreeservers()) + ' out of ' + str(SH.numservers()))

        # Check running tasks; for each finished one, start fetching its results
        for rexp in list(running_experiments):
            if not rexp.isrunning():
                rexp.outputdir = os.path.join(outdir, os.path.basename(rexp.experimentdirectory))
                mkdirs(rexp.outputdir)
                clientSideLog = os.path.join(rexp.outputdir, 'clientSideTaskLog.txt')
                remove(clientSideLog)  # Remove old log file if it exists
                stdout, stderr = rexp.communicate()
                appendToFile(clientSideLog, toText(stdout))
                rexp.taskcompletedok = (rexp.rc() == 0)
                outputfile = os.path.join(gServerSideWD, gOutputFile)
                resultfiles = [os.path.join(gServerSideWD, rf) for rf in gResultFiles]
                p = retriveRemoteTaskResults(rexp.outputdir, rexp.server, gUserID, outputfile, resultfiles, True)
                rexp.process = p
                running_retreivals.append(rexp)
                running_experiments.remove(rexp)

        # Check retrievals, record the outcome and return the server to the pool
        for rret in list(running_retreivals):
            if not rret.isrunning():
                clientSideLog = os.path.join(rret.outputdir, 'clientSideRetrieveLog.txt')
                remove(clientSideLog)  # Remove old log file if it exists
                stdout, stderr = rret.communicate()
                appendToFile(clientSideLog, toText(stdout))
                running_retreivals.remove(rret)
                rret.retrievalcompletedok = (rret.rc() == 0)
                if rret.taskcompletedok and rret.retrievalcompletedok:
                    appendToFile(gOutputDoneTaskList, rret.experimentdirectory)
                    done_experiments.append(rret)
                else:
                    appendToFile(gOutputFailedTaskList, rret.experimentdirectory)
                    failed_experiments.append((rret.experimentdirectory,
                                               rret.taskcompletedok,
                                               rret.retrievalcompletedok))
                SH.returnserver(rret.server)

        if not havefreeservers:
            print('All servers are busy, Still running')
            time.sleep(gSleepTime)
        elif len(experiment_list) == 0 and len(running_experiments) != 0:
            print('Experiments are still running')
            time.sleep(gSleepTime)
        elif len(experiment_list) == 0 and len(running_retreivals) != 0:
            # Only waiting for transfers: sleep briefly, without an extra message
            time.sleep(1)
        print('\n')
        print('NumExperiments:' + str(num_experiments) + ' Done:' + str(len(done_experiments)) + ' RunningTasks:' +
              str(len(running_experiments)) + ' RunningRetreivals:' + str(len(running_retreivals)) + ' Failed:' +
              str(len(failed_experiments)))
        print('Free servers ' + str(SH.numfreeservers()) + ' out of ' + str(SH.numservers()))
        print('\n')
        time.sleep(1)

    # All done; every server should have been returned by now
    print('Done: Free servers '+str(SH.numfreeservers())+' out of '+str(SH.numservers()))
    if len(failed_experiments) > 0:
        print('The following tasks failed:')
        for name, taskok, retrieveok in failed_experiments:
            print(name+' TaskOK: '+str(taskok)+' RetrieveOK: '+str(retrieveok))