From 8704acbfc9d7991c6db9612b3205e38a4b6f1ed9 Mon Sep 17 00:00:00 2001 From: ninsbl Date: Thu, 5 Mar 2026 12:10:50 +0100 Subject: [PATCH 1/6] allow retries in case of ReadTimeouts --- src/actinia/utils.py | 41 +++++++++++++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/src/actinia/utils.py b/src/actinia/utils.py index e7f2753..4438e5e 100644 --- a/src/actinia/utils.py +++ b/src/actinia/utils.py @@ -32,14 +32,16 @@ from datetime import datetime -def request_and_check(method, url, status_code=(200,), **kwargs): - """Function to send a GET request to an URL and check the status code. +def request_and_check(method, url, status_code=(200,), retries=0, **kwargs): + """Send a request with given method to a URL and check the status code. Parameters: method (string): Request method (GET, POST, PUT, DELETE, ...) url (string): URL as string status_code (tuple): Tuple of acceptable status codes to check if it is set; default is 200 + retries (int): Maximal number of retries in case of read timeouts + default is 0. **kwargs: auth (tuple): Tuple of user and password timeout (tuple): Tuple of connection timeout and read timeout @@ -49,15 +51,34 @@ def request_and_check(method, url, status_code=(200,), **kwargs): Returns: (dict): returns text of the response as dictionary - Throws an error if the request does not have the status_code + Throws an error if the request does not have the status_code or + response content be paresd as JSON. """ - resp = requests.request(method, url, **kwargs) - # Use resp.raise_for_status() ? - if resp.status_code == 401: - raise Exception("Wrong user or password. Please check your inputs.") - elif resp.status_code not in status_code: - raise Exception(f"Error {resp.status_code}: {resp.text}") - return json.loads(resp.text) + attempt = 0 + while attempt <= retries: + attempt += 1 + resp = requests.request(method, url, **kwargs) + try: + if resp.status_code not in status_code: + resp.raise_for_status() + except requests.exceptions.ReadTimeout as e: + if attempt >= retries: + raise e + continue + except requests.exceptions.RequestException as e: + if resp.status_code == 401: + raise Exception( + "Wrong user or password. Please check your inputs." + ) from e + raise requests.exceptions.RequestException( + f"Error {resp.status_code}: {resp.text}", e + ) from None + try: + return json.loads(resp.text) + except json.JSONDecodeError: + raise RuntimeError( + "Invalid value returned. Cannot parse JSON from response:", resp.text + ) from None def set_job_names(name, default_name="unknown_job"): From 6b918dd4f6547300554da2b53ff0de90a9f2c8a6 Mon Sep 17 00:00:00 2001 From: ninsbl Date: Thu, 5 Mar 2026 12:13:06 +0100 Subject: [PATCH 2/6] pass retries for ReadTimeout to polling --- src/actinia/job.py | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/src/actinia/job.py b/src/actinia/job.py index 3275f7c..20315a1 100644 --- a/src/actinia/job.py +++ b/src/actinia/job.py @@ -54,9 +54,13 @@ def __update( for key in actinia_json_dict: setattr(self, key, actinia_json_dict[key]) - def poll(self, quiet=False): + def poll(self, quiet=False, retries=0): """ Update job by polling. + + Args: + quiet: Bool if the method should log process status + retries: Integer number of retries in case of ReadTimeout """ if self.status not in ["accepted", "running"]: log.warning("The job is not running and can not be updated.") @@ -67,7 +71,9 @@ def poll(self, quiet=False): "timeout": self.__actinia.timeout, } url = self.urls["status"] - resp = request_and_check("GET", url, status_code=(200, 400), **kwargs) + resp = request_and_check( + "GET", url, status_code=(200, 400), retries=retries, **kwargs + ) if "process_results" not in resp: resp["process_results"] = {} @@ -77,7 +83,7 @@ def poll(self, quiet=False): if not quiet: log.info(f"Status of {self.name} job is {self.status}.") - def poll_until_finished(self, waiting_time=5, quiet=False): + def poll_until_finished(self, waiting_time=5, quiet=False, retries=0): """ Polling job until finished or error. @@ -85,17 +91,15 @@ def poll_until_finished(self, waiting_time=5, quiet=False): waiting_time: Time to wait in seconds for next poll quiet: Bool if the method should log each process status or only changed + retries: Integer number of retries in case of ReadTimeout """ status_accepted_running = True status = None while status_accepted_running: - self.poll(quiet=True) + self.poll(quiet=True, retries=retries) if self.status not in ["accepted", "running"]: status_accepted_running = False - msg = ( - f"Status of {self.name} job is {self.status}: " - f"{self.message}" - ) + msg = f"Status of {self.name} job is {self.status}: {self.message}" if self.status in ["terminated", "error"]: log.error(msg) return 1 @@ -105,18 +109,12 @@ def poll_until_finished(self, waiting_time=5, quiet=False): sleep(waiting_time) if self.status != status and not quiet: status = self.status - msg = ( - f"Status of {self.name} job is {self.status}: " - f"{self.message}" - ) + msg = f"Status of {self.name} job is {self.status}: {self.message}" log.info(msg) def terminate(self): """Terminate the current job""" kwargs = {"auth": self._Job__auth, "timeout": self.__actinia.timeout} - url = ( - f"{self._Job__actinia.url}/resources/" - f"{self.user_id}/{self.resource_id}" - ) + url = f"{self._Job__actinia.url}/resources/{self.user_id}/{self.resource_id}" request_and_check("DELETE", url, **kwargs) log.info("Termination request for job {self.resource_id} committed.") From 1d9c142fa752de23b5ed00f23c326a2165a74527 Mon Sep 17 00:00:00 2001 From: ninsbl Date: Thu, 5 Mar 2026 12:22:49 +0100 Subject: [PATCH 3/6] black -l 79 --- src/actinia/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/actinia/utils.py b/src/actinia/utils.py index 4438e5e..18cbeb5 100644 --- a/src/actinia/utils.py +++ b/src/actinia/utils.py @@ -77,7 +77,8 @@ def request_and_check(method, url, status_code=(200,), retries=0, **kwargs): return json.loads(resp.text) except json.JSONDecodeError: raise RuntimeError( - "Invalid value returned. Cannot parse JSON from response:", resp.text + "Invalid value returned. Cannot parse JSON from response:", + resp.text, ) from None From fc8b619bcb686b820e69686d90bb5b614de7b45a Mon Sep 17 00:00:00 2001 From: ninsbl Date: Thu, 5 Mar 2026 12:23:59 +0100 Subject: [PATCH 4/6] grammar --- src/actinia/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/actinia/utils.py b/src/actinia/utils.py index 18cbeb5..cae1b5c 100644 --- a/src/actinia/utils.py +++ b/src/actinia/utils.py @@ -33,7 +33,7 @@ def request_and_check(method, url, status_code=(200,), retries=0, **kwargs): - """Send a request with given method to a URL and check the status code. + """Send a request with the given method to a URL and check the status code. Parameters: method (string): Request method (GET, POST, PUT, DELETE, ...) From c538cf33a433bc1621d9124f43d76b62e7f446b1 Mon Sep 17 00:00:00 2001 From: ninsbl Date: Thu, 5 Mar 2026 12:25:29 +0100 Subject: [PATCH 5/6] grammar --- src/actinia/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/actinia/utils.py b/src/actinia/utils.py index cae1b5c..07fab33 100644 --- a/src/actinia/utils.py +++ b/src/actinia/utils.py @@ -51,8 +51,8 @@ def request_and_check(method, url, status_code=(200,), retries=0, **kwargs): Returns: (dict): returns text of the response as dictionary - Throws an error if the request does not have the status_code or - response content be paresd as JSON. + Throws an error if the request does not have the right status_code or + the response content cannot be paresd as JSON. """ attempt = 0 while attempt <= retries: From af2630c9f7449853186690395a78b0daa101a90b Mon Sep 17 00:00:00 2001 From: ninsbl Date: Thu, 5 Mar 2026 12:35:03 +0100 Subject: [PATCH 6/6] flake8 --- src/actinia/job.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/actinia/job.py b/src/actinia/job.py index 20315a1..aefbd81 100644 --- a/src/actinia/job.py +++ b/src/actinia/job.py @@ -99,7 +99,10 @@ def poll_until_finished(self, waiting_time=5, quiet=False, retries=0): self.poll(quiet=True, retries=retries) if self.status not in ["accepted", "running"]: status_accepted_running = False - msg = f"Status of {self.name} job is {self.status}: {self.message}" + msg = ( + f"Status of {self.name} job is {self.status}: " + f"{self.message}" + ) if self.status in ["terminated", "error"]: log.error(msg) return 1 @@ -109,12 +112,18 @@ def poll_until_finished(self, waiting_time=5, quiet=False, retries=0): sleep(waiting_time) if self.status != status and not quiet: status = self.status - msg = f"Status of {self.name} job is {self.status}: {self.message}" + msg = ( + f"Status of {self.name} job is {self.status}: " + f"{self.message}" + ) log.info(msg) def terminate(self): """Terminate the current job""" kwargs = {"auth": self._Job__auth, "timeout": self.__actinia.timeout} - url = f"{self._Job__actinia.url}/resources/{self.user_id}/{self.resource_id}" + url = ( + f"{self._Job__actinia.url}/resources/" + f"{self.user_id}/{self.resource_id}" + ) request_and_check("DELETE", url, **kwargs) log.info("Termination request for job {self.resource_id} committed.")