diff --git a/docs/user/hyperion/advanced/alerts.rst b/docs/user/hyperion/advanced/alerts.rst index 03aafd2612..8078db2a67 100644 --- a/docs/user/hyperion/advanced/alerts.rst +++ b/docs/user/hyperion/advanced/alerts.rst @@ -15,6 +15,8 @@ The currently supported events that will generate alerts are: - When Hyperion moves on to a new container (puck). - When Hyperion is paused waiting for beam for more than 10 minutes (xbpm_feedback position stable. Repeats every 10 minutes) - When Hyperion data collection takes longer than 10 minutes (as measured from the last wait-for-beam check) +- When Hyperion supervisor is unable to communicate with hyperion-blueapi +- When Hyperion is unable to fetch the next instruction from agamemnon Graylog Alert Configuration =========================== diff --git a/src/mx_bluesky/common/external_interaction/alerting/_service.py b/src/mx_bluesky/common/external_interaction/alerting/_service.py index 665eb503aa..c502b20ae6 100644 --- a/src/mx_bluesky/common/external_interaction/alerting/_service.py +++ b/src/mx_bluesky/common/external_interaction/alerting/_service.py @@ -3,6 +3,8 @@ from typing import Protocol from urllib.parse import quote, urlencode +from dodal.utils import get_beamline_name + class Metadata(StrEnum): """Metadata fields that can be specified by the caller when raising an alert.""" @@ -47,6 +49,20 @@ def raise_alert( """ pass + def raise_error_alert(self, content: str, metadata: dict[Metadata, str]): + """ + Raise an alert that will be forwarded to beamline staff and EHC controllers for out-of-hours + support. + Args: + content: Plain text content detailing the nature of the incident. + metadata: A dict of strings that can be included as metadata in the alert for + those backends that support it. The summary and content will be included + by default. + + """ + beamline = get_beamline_name("") + self.raise_alert(f"UDC encountered an error on {beamline}", content, metadata) + _alert_service: AlertService diff --git a/src/mx_bluesky/common/external_interaction/alerting/log_based_service.py b/src/mx_bluesky/common/external_interaction/alerting/log_based_service.py index 70993ebee4..b0934b1446 100644 --- a/src/mx_bluesky/common/external_interaction/alerting/log_based_service.py +++ b/src/mx_bluesky/common/external_interaction/alerting/log_based_service.py @@ -6,6 +6,7 @@ from mx_bluesky.common.external_interaction.alerting import Metadata from mx_bluesky.common.external_interaction.alerting._service import ( + AlertService, ExtraMetadata, graylog_url, ispyb_url, @@ -15,7 +16,7 @@ ) -class LoggingAlertService: +class LoggingAlertService(AlertService): """ Implement an alert service that raises alerts by generating a specially formatted log message, that may be intercepted by a logging service such as graylog and diff --git a/src/mx_bluesky/common/external_interaction/callbacks/sample_handling/sample_handling_callback.py b/src/mx_bluesky/common/external_interaction/callbacks/sample_handling/sample_handling_callback.py index 7b53cb9eac..c402429f77 100644 --- a/src/mx_bluesky/common/external_interaction/callbacks/sample_handling/sample_handling_callback.py +++ b/src/mx_bluesky/common/external_interaction/callbacks/sample_handling/sample_handling_callback.py @@ -1,4 +1,3 @@ -from dodal.utils import get_beamline_name from event_model import RunStart, RunStop from mx_bluesky.common.external_interaction.alerting import ( @@ -70,9 +69,7 @@ def _record_exception( sample_status = self._decode_sample_status(exception_type) expeye.update_sample_status(self._sample_id, sample_status) if sample_status == BLSampleStatus.ERROR_BEAMLINE: - beamline = get_beamline_name("") - get_alerting_service().raise_alert( - f"UDC encountered an error on {beamline}", + get_alerting_service().raise_error_alert( f"Hyperion encountered the following beamline error: {reason}", { Metadata.SAMPLE_ID: str(self._sample_id), diff --git a/src/mx_bluesky/hyperion/external_interaction/agamemnon.py b/src/mx_bluesky/hyperion/external_interaction/agamemnon.py index e355b65d54..a2be022dea 100644 --- a/src/mx_bluesky/hyperion/external_interaction/agamemnon.py +++ b/src/mx_bluesky/hyperion/external_interaction/agamemnon.py @@ -1,6 +1,7 @@ import json import os import re +import time from collections.abc import Sequence from enum import StrEnum from os import path @@ -9,7 +10,9 @@ import requests from dodal.utils import get_beamline_name from pydantic import BaseModel +from requests import ConnectionError, HTTPError, Timeout +from mx_bluesky.common.external_interaction.alerting import get_alerting_service from mx_bluesky.common.parameters.components import ( WithVisit, ) @@ -25,6 +28,7 @@ PinTypeParam, SingleSamplePinTypeParam, ) +from mx_bluesky.hyperion.plan_runner import PlanError T = TypeVar("T", bound=WithVisit) MULTIPIN_PREFIX = "multipin" @@ -32,6 +36,9 @@ MULTIPIN_REGEX = rf"^{MULTIPIN_PREFIX}_(\d+)x(\d+(?:\.\d+)?)\+(\d+(?:\.\d+)?)$" MX_GENERAL_ROOT_REGEX = r"^/dls/(?P[^/]+)/data/[^/]*/(?P[^/]+)(?:/|$)" +MAX_TRIES = 3 +RETRY_INITIAL_DELAY_S = 2 + class _InstructionType(StrEnum): WAIT = "wait" @@ -43,7 +50,10 @@ def create_parameters_from_agamemnon() -> Sequence[BaseModel]: mx-bluesky instructions. Returns: The generated sequence of mx-bluesky parameters, or empty list if - no instructions.""" + no instructions. + Raises: + PlanError: if the instructions could not be fetched from Agamemnon + """ beamline_name = get_beamline_name("i03") agamemnon_instruction = _get_next_instruction(beamline_name) if agamemnon_instruction: @@ -72,8 +82,39 @@ def _instruction_and_data(agamemnon_instruction: dict) -> tuple[str, Any]: def _get_parameters_from_url(url: str) -> dict: - response = requests.get(url, headers={"Accept": "application/json"}) - response.raise_for_status() + tries, delay = MAX_TRIES, RETRY_INITIAL_DELAY_S + while tries > 0: + tries -= 1 + try: + response = requests.get(url, headers={"Accept": "application/json"}) + try: + response.raise_for_status() + break + except HTTPError as e: + if 500 <= response.status_code < 600: + LOGGER.warning( + f"Agamemnon returned server error status {response.status_code}, retries left {tries}: {str(e)}" + ) + else: + msg = f"Agamemnon returned unexpected HTTP response status code {response.status_code}" + get_alerting_service().raise_error_alert(msg, {}) + raise PlanError(msg) from e + except ConnectionError as e: + LOGGER.warning( + f"Connection error attempting to connect to agamemnon, retries left {tries}", + exc_info=e, + ) + except Timeout: + LOGGER.warning( + f"Timed out attempting to connect to agamemnon, retries left {tries}" + ) + if tries: + time.sleep(delay) # noqa + delay *= 2 + else: + msg = f"Unable to fetch instruction from agamemnon after {MAX_TRIES} attempts, ending UDC." + get_alerting_service().raise_error_alert(msg, {}) + raise PlanError(msg) return json.loads(response.content) diff --git a/src/mx_bluesky/hyperion/supervisor/_supervisor.py b/src/mx_bluesky/hyperion/supervisor/_supervisor.py index 768b2a038d..4a225a28ce 100644 --- a/src/mx_bluesky/hyperion/supervisor/_supervisor.py +++ b/src/mx_bluesky/hyperion/supervisor/_supervisor.py @@ -1,7 +1,9 @@ +import time from collections.abc import Sequence from blueapi.client.client import BlueapiClient from blueapi.client.event_bus import BlueskyStreamingError +from blueapi.client.rest import ServiceUnavailableError from blueapi.config import ApplicationConfig from blueapi.core import BlueskyContext from blueapi.service.model import TaskRequest @@ -10,6 +12,7 @@ from bluesky.utils import MsgGenerator from pydantic import BaseModel +from mx_bluesky.common.external_interaction.alerting import get_alerting_service from mx_bluesky.common.parameters.constants import Status from mx_bluesky.common.utils.exceptions import CrystalNotFoundError, SampleError from mx_bluesky.common.utils.log import LOGGER @@ -23,6 +26,9 @@ from mx_bluesky.hyperion.plan_runner import PlanError, PlanRunner from mx_bluesky.hyperion.supervisor._task_monitor import TaskMonitor +MAX_TRIES = 3 +RETRY_INITIAL_DELAY_S = 2 + class SupervisorRunner(PlanRunner): """Runner that executes plans by delegating to a remote blueapi instance""" @@ -125,9 +131,38 @@ def shutdown(self): def _run_task_remotely(self, task_request: TaskRequest): try: with TaskMonitor(self.blueapi_client, task_request) as task_monitor: - task_status = self.blueapi_client.run_task( - task_request, on_event=task_monitor.on_blueapi_event - ) + tries, task_status, delay = MAX_TRIES, None, RETRY_INITIAL_DELAY_S + while tries > 0: + tries -= 1 + try: + task_status = self.blueapi_client.run_task( + task_request, on_event=task_monitor.on_blueapi_event + ) + break + except ServiceUnavailableError as e: + LOGGER.warning( + "Could not connect to blueapi client.", exc_info=e + ) + time.sleep(delay) # noqa + delay += delay + except BlueskyStreamingError: + raise + except Exception as e: + get_alerting_service().raise_error_alert( + "Unexpected error communicating with hyperion-blueapi", {} + ) + raise PlanError( + "Unexpected error communicating with hyperion-blueapi" + ) from e + else: + LOGGER.error("Max retries reached, ending UDC.") + get_alerting_service().raise_error_alert( + "hyperion-supervisor stopped UDC because unable to connect to hyperion-blueapi.", + {}, + ) + raise PlanError( + f"Unable to connect to hyperion-blueapi after {MAX_TRIES} attempts, ending UDC" + ) LOGGER.info( f"hyperion-blueapi completed task execution with task_status {task_status}" ) diff --git a/src/mx_bluesky/hyperion/supervisor/_task_monitor.py b/src/mx_bluesky/hyperion/supervisor/_task_monitor.py index 45c344e83d..c347b5b2db 100644 --- a/src/mx_bluesky/hyperion/supervisor/_task_monitor.py +++ b/src/mx_bluesky/hyperion/supervisor/_task_monitor.py @@ -86,9 +86,7 @@ def _reset_timer(self): self._timer.start() def _raise_alert_collection_is_stuck(self): - beamline = get_beamline_name("") - self._alerting_service.raise_alert( - f"UDC encountered an error on {beamline}", + self._alerting_service.raise_error_alert( f"Hyperion Supervisor detected that BlueAPI was stuck for {self.DEFAULT_TIMEOUT_S} seconds.", self._extract_metadata(), ) diff --git a/tests/unit_tests/common/external_interaction/alerting/alerting/test_alerting.py b/tests/unit_tests/common/external_interaction/alerting/alerting/test_alerting.py index 2735f042b0..95a698f530 100644 --- a/tests/unit_tests/common/external_interaction/alerting/alerting/test_alerting.py +++ b/tests/unit_tests/common/external_interaction/alerting/alerting/test_alerting.py @@ -84,3 +84,25 @@ def test_logging_alerting_service_raises_a_log_message_with_additional_metadata_ Metadata.VISIT: "cm14451-2", }, ) + + +@patch("mx_bluesky.common.external_interaction.alerting.log_based_service.LOGGER") +def test_raise_error_alert_delegates_to_raise_alert(mock_logger: MagicMock): + set_alerting_service(LoggingAlertService(CONST.GRAYLOG_STREAM_ID, WARNING)) + get_alerting_service().raise_error_alert( + "Test message", {Metadata.SAMPLE_ID: "123456", Metadata.VISIT: "cm14451-2"} + ) + mock_logger.log.assert_called_once_with( + WARNING, + "***ALERT*** summary=UDC encountered an error on i03 content=Test message", + extra={ + ExtraMetadata.ALERT_SUMMARY: "UDC encountered an error on i03", + ExtraMetadata.ALERT_CONTENT: "Test message", + ExtraMetadata.BEAMLINE: "i03", + ExtraMetadata.GRAYLOG_URL: EXPECTED_GRAYLOG_URL, + ExtraMetadata.ISPYB_URL: "https://ispyb.diamond.ac.uk/samples/sid/123456", + Metadata.SAMPLE_ID: "123456", + ExtraMetadata.PROPOSAL: "cm14451", + Metadata.VISIT: "cm14451-2", + }, + ) diff --git a/tests/unit_tests/common/external_interaction/callbacks/sample_handling/test_sample_handling_callback.py b/tests/unit_tests/common/external_interaction/callbacks/sample_handling/test_sample_handling_callback.py index 648b017907..a5c960abbb 100644 --- a/tests/unit_tests/common/external_interaction/callbacks/sample_handling/test_sample_handling_callback.py +++ b/tests/unit_tests/common/external_interaction/callbacks/sample_handling/test_sample_handling_callback.py @@ -230,8 +230,7 @@ def test_sample_handling_callback_raises_an_alert_when_beamline_error_occurs( run_engine(plan_with_general_exception(exception_type, message)) if expect_alert: - mock_alert_service.raise_alert.assert_called_once_with( - "UDC encountered an error on i03", + mock_alert_service.raise_error_alert.assert_called_once_with( f"Hyperion encountered the following beamline error: {message}", { Metadata.SAMPLE_ID: str(TEST_SAMPLE_ID), diff --git a/tests/unit_tests/hyperion/external_interaction/test_agamemnon.py b/tests/unit_tests/hyperion/external_interaction/test_agamemnon.py index bd3ea24824..9f021219fb 100644 --- a/tests/unit_tests/hyperion/external_interaction/test_agamemnon.py +++ b/tests/unit_tests/hyperion/external_interaction/test_agamemnon.py @@ -3,10 +3,11 @@ from math import isclose from pathlib import PosixPath from typing import cast -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock, Mock, call, patch import pytest from dodal.devices.zebra.zebra import RotationDirection +from requests import ConnectionError, HTTPError, Response, Timeout from mx_bluesky.hyperion._plan_runner_params import Wait from mx_bluesky.hyperion.blueapi.parameters import ( @@ -24,6 +25,7 @@ _instruction_and_data, create_parameters_from_agamemnon, ) +from mx_bluesky.hyperion.plan_runner import PlanError def set_up_agamemnon_params( @@ -417,3 +419,122 @@ def test_create_parameters_from_agamemnon_creates_wait(agamemnon_response): assert len(params) == 1 assert isinstance(params[0], Wait) assert params[0].duration_s == 12.34 + + +@patch("mx_bluesky.hyperion.external_interaction.agamemnon.time.sleep", MagicMock()) +@patch("mx_bluesky.hyperion.external_interaction.agamemnon.requests") +def test_create_parameters_from_agamemnon_retries_on_timeout_error( + mock_requests: MagicMock, +): + mock_requests.get.side_effect = Timeout() + with pytest.raises(PlanError, match="Unable to fetch instruction from agamemnon"): + create_parameters_from_agamemnon() + mock_requests.get.assert_has_calls( + [ + call( + "http://agamemnon.diamond.ac.uk/getnextcollect/i03", + headers={"Accept": "application/json"}, + ), + ] + * 3 + ) + + +@patch("mx_bluesky.hyperion.external_interaction.agamemnon.time.sleep") +@patch("mx_bluesky.hyperion.external_interaction.agamemnon.requests") +def test_create_parameters_from_agamemnon_retries_on_connection_error( + mock_requests: MagicMock, + mock_sleep: MagicMock, +): + mock_requests.get.side_effect = ConnectionError() + parent = MagicMock() + parent.attach_mock(mock_sleep, "sleep") + parent.attach_mock(mock_requests, "requests") + with pytest.raises(PlanError, match="Unable to fetch instruction from agamemnon"): + create_parameters_from_agamemnon() + parent.assert_has_calls( + [ + call.requests.get( + "http://agamemnon.diamond.ac.uk/getnextcollect/i03", + headers={"Accept": "application/json"}, + ), + call.sleep(2), + call.requests.get( + "http://agamemnon.diamond.ac.uk/getnextcollect/i03", + headers={"Accept": "application/json"}, + ), + call.sleep(4), + call.requests.get( + "http://agamemnon.diamond.ac.uk/getnextcollect/i03", + headers={"Accept": "application/json"}, + ), + ] + ) + + +@patch("mx_bluesky.hyperion.external_interaction.agamemnon.time.sleep") +@patch("mx_bluesky.hyperion.external_interaction.agamemnon.requests.get") +def test_create_parameters_from_agamemnon_retries_on_500_error( + mock_requests_get: MagicMock, + mock_sleep: MagicMock, + mock_alert_service: MagicMock, +): + response = Mock(spec=Response) + response.status_code = 500 + response.raise_for_status.side_effect = HTTPError("Test 500 error", response) + mock_requests_get.return_value = response + parent = MagicMock() + parent.attach_mock(mock_sleep, "sleep") + parent.requests.attach_mock(mock_requests_get, "get") + parent.attach_mock(response, "response") + with pytest.raises(PlanError, match="Unable to fetch instruction from agamemnon"): + create_parameters_from_agamemnon() + parent.assert_has_calls( + [ + call.requests.get( + "http://agamemnon.diamond.ac.uk/getnextcollect/i03", + headers={"Accept": "application/json"}, + ), + call.response.raise_for_status(), + call.sleep(2), + call.requests.get( + "http://agamemnon.diamond.ac.uk/getnextcollect/i03", + headers={"Accept": "application/json"}, + ), + call.response.raise_for_status(), + call.sleep(4), + call.requests.get( + "http://agamemnon.diamond.ac.uk/getnextcollect/i03", + headers={"Accept": "application/json"}, + ), + call.response.raise_for_status(), + ] + ) + mock_alert_service.raise_error_alert.assert_called_once_with( + "Unable to fetch instruction from agamemnon after 3 attempts, ending UDC.", {} + ) + + +@patch("mx_bluesky.hyperion.external_interaction.agamemnon.time.sleep") +@patch("mx_bluesky.hyperion.external_interaction.agamemnon.requests.get") +def test_create_parameters_from_agamemnon_fails_on_40x_error_and_ends_udc( + mock_requests_get: MagicMock, + mock_sleep: MagicMock, + mock_alert_service: MagicMock, +): + response = Mock(spec=Response) + response.status_code = 400 + response.raise_for_status.side_effect = HTTPError("Test 400 error", response) + mock_requests_get.return_value = response + parent = MagicMock() + parent.attach_mock(mock_sleep, "sleep") + parent.requests.attach_mock(mock_requests_get, "get") + with pytest.raises( + PlanError, match="Agamemnon returned unexpected HTTP response status code 400" + ): + create_parameters_from_agamemnon() + mock_requests_get.assert_called_once() + mock_sleep.assert_not_called() + mock_alert_service.raise_error_alert.assert_called_once_with( + "Agamemnon returned unexpected HTTP response status code 400", {} + ) diff --git a/tests/unit_tests/hyperion/supervisor/test_supervisor.py b/tests/unit_tests/hyperion/supervisor/test_supervisor.py index 45912a9e37..c14fa44d86 100644 --- a/tests/unit_tests/hyperion/supervisor/test_supervisor.py +++ b/tests/unit_tests/hyperion/supervisor/test_supervisor.py @@ -4,8 +4,17 @@ import pytest from blueapi.client.event_bus import BlueskyStreamingError +from blueapi.client.rest import ( + BlueskyRemoteControlError, + BlueskyRequestError, + InvalidParametersError, + NoContentError, + ServiceUnavailableError, + UnauthorisedAccessError, + UnknownPlanError, +) from blueapi.core import BlueskyContext -from blueapi.service.model import TaskRequest +from blueapi.service.model import TaskRequest, WorkerTask from blueapi.worker import TaskStatus, WorkerState from blueapi.worker.event import TaskError, TaskResult from bluesky import RunEngine, RunEngineInterrupted @@ -380,3 +389,68 @@ def test_sample_error_skips_subsequent_instructions( ) ) mock_blueapi_client.run_task.assert_called_once_with(ANY, on_event=ANY) + + +@patch("mx_bluesky.hyperion.supervisor._supervisor.time.sleep") +def test_supervisor_retries_service_unavailable_error( + mock_sleep: MagicMock, + runner: SupervisorRunner, + external_load_centre_collect_params: LoadCentreCollectParams, + mock_blueapi_client: MagicMock, + mock_alert_service: MagicMock, +): + mock_blueapi_client.run_task.side_effect = ServiceUnavailableError() + parent = MagicMock() + parent.attach_mock(mock_sleep, "sleep") + parent.attach_mock(mock_blueapi_client.run_task, "run_task") + with pytest.raises(PlanError, match="Unable to connect to hyperion-blueapi"): + runner.context.run_engine( + runner.decode_and_execute(TEST_VISIT, [external_load_centre_collect_params]) + ) + parent.assert_has_calls( + [ + call.run_task(ANY, on_event=ANY), + call.sleep(2), + call.run_task(ANY, on_event=ANY), + call.sleep(4), + call.run_task(ANY, on_event=ANY), + ] + ) + mock_alert_service.raise_error_alert.assert_called_once_with( + "hyperion-supervisor stopped UDC because unable to connect to hyperion-blueapi.", + {}, + ) + + +@pytest.mark.parametrize( + "exception_to_raise", + [ + BlueskyRemoteControlError(), + NoContentError(WorkerTask), + UnauthorisedAccessError(), + UnknownPlanError(), + InvalidParametersError([]), + BlueskyRequestError(422, "Test message"), + ], +) +@patch("mx_bluesky.hyperion.supervisor._supervisor.time.sleep") +def test_supervisor_hands_back_baton_if_any_other_error( + mock_sleep: MagicMock, + runner: SupervisorRunner, + external_load_centre_collect_params: LoadCentreCollectParams, + mock_blueapi_client: MagicMock, + exception_to_raise: Exception, + mock_alert_service: MagicMock, +): + mock_blueapi_client.run_task.side_effect = exception_to_raise + with pytest.raises( + PlanError, match="Unexpected error communicating with hyperion-blueapi" + ): + runner.context.run_engine( + runner.decode_and_execute(TEST_VISIT, [external_load_centre_collect_params]) + ) + mock_blueapi_client.run_task.assert_called_once_with(ANY, on_event=ANY) + mock_sleep.assert_not_called() + mock_alert_service.raise_error_alert.assert_called_once_with( + "Unexpected error communicating with hyperion-blueapi", {} + ) diff --git a/tests/unit_tests/hyperion/supervisor/test_task_monitor.py b/tests/unit_tests/hyperion/supervisor/test_task_monitor.py index d19bf30660..30a980b443 100644 --- a/tests/unit_tests/hyperion/supervisor/test_task_monitor.py +++ b/tests/unit_tests/hyperion/supervisor/test_task_monitor.py @@ -115,8 +115,7 @@ def test_task_monitor_alerts_and_cancels_request_if_stuck_not_waiting_for_beam( ) sleep(0.5) blueapi_client.abort.assert_called_with(ANY) - mock_alerting_service.raise_alert.assert_called_once_with( - "UDC encountered an error on i03", + mock_alerting_service.raise_error_alert.assert_called_once_with( "Hyperion Supervisor detected that BlueAPI was stuck for 0.25 seconds.", { Metadata.SAMPLE_ID: "5461074", @@ -137,8 +136,7 @@ def test_task_monitor_alerts_with_no_metadata_and_cancels_request_if_stuck_on_ot with monitor: sleep(0.5) blueapi_client.abort.assert_called_with(ANY) - mock_alerting_service.raise_alert.assert_called_once_with( - "UDC encountered an error on i03", + mock_alerting_service.raise_error_alert.assert_called_once_with( "Hyperion Supervisor detected that BlueAPI was stuck for 0.25 seconds.", {}, )