Skip to content
23 changes: 16 additions & 7 deletions blacs/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -693,20 +693,29 @@ def on_open_preferences(self,*args,**kwargs):
self.settings.create_dialog()

class ExperimentServer(ZMQServer):
def handler(self, h5_filepath):
print(h5_filepath)
message = self.process(h5_filepath)
def handler(self, data):
"""
This is an override to accept remote messages.
"""
message = self.process(data)
logger.info('Request handler: %s ' % message.strip())
return message

@inmain_decorator(wait_for_return=True)
def process(self,h5_filepath):
def process(self, data):

agnostic_path = data["agnostic_path"]
lyse_host = data["lyse_host"]

# Convert path to local slashes and shared drive prefix:
logger.info('received filepath: %s'%h5_filepath)
h5_filepath = labscript_utils.shared_drive.path_to_local(h5_filepath)
logger.info('received filepath: %s'%agnostic_path)
h5_filepath = labscript_utils.shared_drive.path_to_local(agnostic_path)
logger.info('local filepath: %s'%h5_filepath)
return app.queue.process_request(h5_filepath)

# NASTY CODE STYLE: this is in reference to the global variable `app = BLACS(qapplication)` defined below
# Took me 30 minutes to track down this logic.
# TODO: banish global variables of this type.
return app.queue.process_request(h5_filepath, lyse_host=lyse_host)

if __name__ == '__main__':
if 'tracelog' in sys.argv:
Expand Down
267 changes: 159 additions & 108 deletions blacs/analysis_submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@


class AnalysisSubmission(object):

icon_names = {'checking': ':/qtutils/fugue/hourglass',
'online': ':/qtutils/fugue/tick',
'offline': ':/qtutils/fugue/exclamation',
'': ':/qtutils/fugue/status-offline'}

tooltips = {'checking': 'Checking...',
'online': 'Server is responding',
'offline': 'Server not responding',
'': 'Disabled'}

def __init__(self, BLACS, blacs_ui):
self.inqueue = queue.Queue()
self.BLACS = BLACS
Expand All @@ -41,18 +52,20 @@ def __init__(self, BLACS, blacs_ui):
blacs_ui.analysis.addWidget(self._ui)
self._ui.frame.setMinimumWidth(blacs_ui.queue_controls_frame.sizeHint().width())
elide_label(self._ui.resend_shots_label, self._ui.failed_to_send_frame.layout(), Qt.ElideRight)

self._waiting_for_submission = {}
self.failure_reason = {}
self.send_to_server = False
self.server = ''
self.time_of_last_connectivity_check = 0
self.server_online = {}

# connect signals
self._ui.send_to_server.toggled.connect(lambda state: self._set_send_to_server(state))
self._ui.server.editingFinished.connect(lambda: self._set_server(self._ui.server.text()))
self._ui.clear_unsent_shots_button.clicked.connect(lambda _: self.clear_waiting_files())
self._ui.retry_button.clicked.connect(lambda _: self.check_retry())

self._waiting_for_submission = []
self.failure_reason = None
self.server_online = 'offline'
self.send_to_server = False
self.server = ''
self.time_of_last_connectivity_check = 0

self.mainloop_thread = threading.Thread(target=self.mainloop)
self.mainloop_thread.daemon = True
Expand All @@ -64,16 +77,23 @@ def restore_save_data(self,data):
if "send_to_server" in data:
self.send_to_server = data["send_to_server"]
if "waiting_for_submission" in data:
self._waiting_for_submission = list(data["waiting_for_submission"])
self.inqueue.put(['save data restored', None])
self._waiting_for_submission = dict(data["waiting_for_submission"])
self.inqueue.put(['save data restored', None, None])
self.check_retry()

def get_save_data(self):
return {"waiting_for_submission":list(self._waiting_for_submission),
"server":self.server,
"send_to_server":self.send_to_server
return {"waiting_for_submission": dict(self._waiting_for_submission),
"server": self.server,
"send_to_server": self.send_to_server
}

def _waiting_for_submission_len(self):
length = 0
for k, v in enumerate(self._waiting_for_submission):
length += len(v)

return length

def _set_send_to_server(self,value):
self.send_to_server = value

Expand Down Expand Up @@ -118,24 +138,36 @@ def server_online(self):

@server_online.setter
@inmain_decorator(True)
def server_online(self,value):
self._server_online = str(value)

icon_names = {'checking': ':/qtutils/fugue/hourglass',
'online': ':/qtutils/fugue/tick',
'offline': ':/qtutils/fugue/exclamation',
'': ':/qtutils/fugue/status-offline'}
def server_online(self, value):

tooltips = {'checking': 'Checking...',
'online': 'Server is responding',
'offline': 'Server not responding',
'': 'Disabled'}
self._server_online = value

icon = QIcon(icon_names.get(self._server_online, ':/qtutils/fugue/exclamation-red'))
status = 'online'
tooltip = ''
for server in self._waiting_for_submission:

if server not in value:
value[server] = ''

v = value[server]

if v == 'offline':
status = 'offline'
if tooltip != '':
tooltip += '\n'

tip = self.tooltips.get(status, 'Invalid message {}'.format(status))
tooltip += 'Server {} status: {}'.format(server, tip)

if server not in self.failure_reason:
self.failure_reason[server] = None
tooltip += 'Server not checked yet'

if self.failure_reason[server] is not None:
tooltip += '[[{}]]'.format(self.failure_reason[server])

icon = QIcon(self.icon_names.get(status, ':/qtutils/fugue/exclamation-red'))
pixmap = icon.pixmap(QSize(16, 16))
tooltip = tooltips.get(self._server_online, "Invalid server status: %s" % self._server_online)
if self.failure_reason is not None:
tooltip += '\n' + self.failure_reason

# Update GUI:
self._ui.server_online.setPixmap(pixmap)
Expand All @@ -145,33 +177,46 @@ def server_online(self,value):

@inmain_decorator(True)
def update_waiting_files_message(self):
# if there is only one shot and we haven't encountered failure yet, do
# not show the error frame:
if (self.server_online == 'checking') and (len(self._waiting_for_submission) == 1) and not self._ui.failed_to_send_frame.isVisible():
return
if self._waiting_for_submission:

message = ''
failed = False
for server, shots in self._waiting_for_submission.items():
length = len(shots)

# The server may never have been checked
if server not in self.server_online:
self._server_online[server] = ''

# if there is only one shot and we haven't encountered failure yet, do
# not show the error frame:
if (self.server_online[server] == 'checking') and (length == 1) and not self._ui.failed_to_send_frame.isVisible():
pass
elif length:
if self.server_online[server] == 'checking':
message += 'Server {}: Sending {} shot(s)...'.format(server, length)
else:
message += 'Server {}: {} shot(s) to send...'.format(server, length)

if failed and self._waiting_for_submission_len():
self._ui.failed_to_send_frame.show()
if self.server_online == 'checking':
self._ui.retry_button.hide()
text = 'Sending %s shot(s)...' % len(self._waiting_for_submission)
else:
self._ui.retry_button.show()
text = '%s shot(s) to send' % len(self._waiting_for_submission)
self._ui.resend_shots_label.setText(text)
else:
self._ui.failed_to_send_frame.hide()

self._ui.resend_shots_label.setText(message)

self._ui.retry_button.show()

def get_queue(self):
return self.inqueue

@inmain_decorator(True)
def clear_waiting_files(self):
self._waiting_for_submission = []
self._waiting_for_submission = {}
self.update_waiting_files_message()

@inmain_decorator(True)
def check_retry(self):
self.inqueue.put(['check/retry', None])
self.inqueue.put(['check/retry', None, None])

def mainloop(self):
self._mainloop_logger = logging.getLogger('BLACS.AnalysisSubmission.mainloop')
Expand All @@ -182,39 +227,27 @@ def mainloop(self):
while True:
try:
try:
signal, data = self.inqueue.get(timeout=timeout)
signal, data, lyse_host = self.inqueue.get(timeout=timeout)
except queue.Empty:
timeout = 10
# Periodic checking of connectivity and resending of files.
# Don't trigger a re-check if we already failed a connectivity
# check within the last second:
if (time.time() - self.time_of_last_connectivity_check) > 1:
signal = 'check/retry'
else:
continue
if signal == 'check/retry':
self.check_connectivity()
if self.server_online == 'online':
self.submit_waiting_files()
elif signal == 'file':
continue

if signal == 'file':
if self.send_to_server:
self._waiting_for_submission.append(data)
if self.server_online != 'online':
# Don't stack connectivity checks if many files are
# arriving. If we failed a connectivity check less
# than a second ago then don't check again.
if (time.time() - self.time_of_last_connectivity_check) > 1:
self.check_connectivity()
else:
# But do queue up a check for when we have
# been idle for one second:
timeout = 1
if self.server_online == 'online':
self.submit_waiting_files()

lyse_host = lyse_host if lyse_host != '' else self.server

if lyse_host not in self._waiting_for_submission:
self._waiting_for_submission[lyse_host] = []

self._waiting_for_submission[lyse_host].append(data)

self.submit_waiting_files()
elif signal == 'close':
break
elif signal == 'save data restored':
continue
elif signal == 'check/retry':
self.submit_waiting_files()
else:
raise ValueError('Invalid signal: %s'%str(signal))

Expand All @@ -225,53 +258,71 @@ def mainloop(self):
self._mainloop_logger.exception("Exception in mainloop, continuing")

def check_connectivity(self):
host = self.server
send_to_server = self.send_to_server
if host and send_to_server:
self.server_online = 'checking'
try:
response = zmq_get(self.port, host, 'hello', timeout=1)
self.failure_reason = None
except (TimeoutError, gaierror, AuthenticationFailure) as e:
success = False
self.failure_reason = str(e)
else:
success = (response == 'hello')
if not success:
self.failure_reason = "unexpected reponse: %s" % str(response)

server_online = {}

for server in self._waiting_for_submission:
send_to_server = self.send_to_server
if send_to_server:
server_online[server] = 'checking'
self.server_online = server_online # update GUI

# update GUI
self.server_online = 'online' if success else 'offline'
else:
self.server_online = ''
try:
response = zmq_get(self.port, server, 'hello', timeout=1)
self.failure_reason[k] = None
except (TimeoutError, gaierror, AuthenticationFailure) as e:
success = False
self.failure_reason[k] = str(e)
else:
success = (response == 'hello')
if not success:
self.failure_reason[k] = "unexpected reponse: %s" % str(response)

server_online[server] = 'online' if success else 'offline'
else:
server_online[server] = ''

# update GUI
self.server_online = server_online

self.time_of_last_connectivity_check = time.time()

def submit_waiting_files(self):
success = True
while self._waiting_for_submission and success:
path = self._waiting_for_submission[0]
self._mainloop_logger.info('Submitting run file %s.\n'%os.path.basename(path))
data = {'filepath': labscript_utils.shared_drive.path_to_agnostic(path)}
self.server_online = 'checking'
try:
response = zmq_get(self.port, self.server, data, timeout=1)
self.failure_reason = None
except (TimeoutError, gaierror, AuthenticationFailure) as e:
success = False
self.failure_reason = str(e)
else:
success = (response == 'added successfully')
if not success:
self.failure_reason = "unexpected reponse: %s" % str(response)

server_online = {}
for server, shots in self._waiting_for_submission.items():
success = True

while shots and success:
path = shots[0]
self.server = server

self._mainloop_logger.info('Submitting run file %s.\n'%os.path.basename(path))
data = {'filepath': labscript_utils.shared_drive.path_to_agnostic(path)}

server_online[server] = 'checking'
self.server_online = server_online # update GUI

try:
self._waiting_for_submission.pop(0)
except IndexError:
# Queue has been cleared
pass
if not success:
break
response = zmq_get(self.port, server, data, timeout=1)
self.failure_reason[server] = None
except (TimeoutError, gaierror, AuthenticationFailure) as e:
success = False
self.failure_reason[server] = str(e)
else:
success = (response == 'added successfully')
if not success:
self.failure_reason[server] = "unexpected reponse: %s" % str(response)
try:
shots.pop(0)
except IndexError:
# Queue has been cleared
pass

server_online[server] = 'online' if success else 'offline'

# update GUI
self.server_online = 'online' if success else 'offline'
self.server_online = server_online

self.time_of_last_connectivity_check = time.time()

Loading