Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/user/hyperion/advanced/alerts.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ The currently supported events that will generate alerts are:
- When Hyperion moves on to a new container (puck).
- When Hyperion is paused waiting for beam for more than 10 minutes (xbpm_feedback position stable. Repeats every 10 minutes)
- When Hyperion data collection takes longer than 10 minutes (as measured from the last wait-for-beam check)
- When Hyperion supervisor is unable to communicate with hyperion-blueapi
- When Hyperion is unable to fetch the next instruction from agamemnon

Graylog Alert Configuration
===========================
Expand Down
16 changes: 16 additions & 0 deletions src/mx_bluesky/common/external_interaction/alerting/_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from typing import Protocol
from urllib.parse import quote, urlencode

from dodal.utils import get_beamline_name


class Metadata(StrEnum):
"""Metadata fields that can be specified by the caller when raising an alert."""
Expand Down Expand Up @@ -47,6 +49,20 @@ def raise_alert(
"""
pass

def raise_error_alert(self, content: str, metadata: dict[Metadata, str]):
"""
Raise an alert that will be forwarded to beamline staff and EHC controllers for out-of-hours
support.
Args:
content: Plain text content detailing the nature of the incident.
metadata: A dict of strings that can be included as metadata in the alert for
those backends that support it. The summary and content will be included
by default.

"""
beamline = get_beamline_name("")
self.raise_alert(f"UDC encountered an error on {beamline}", content, metadata)


_alert_service: AlertService

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from mx_bluesky.common.external_interaction.alerting import Metadata
from mx_bluesky.common.external_interaction.alerting._service import (
AlertService,
ExtraMetadata,
graylog_url,
ispyb_url,
Expand All @@ -15,7 +16,7 @@
)


class LoggingAlertService:
class LoggingAlertService(AlertService):
"""
Implement an alert service that raises alerts by generating a specially formatted
log message, that may be intercepted by a logging service such as graylog and
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from dodal.utils import get_beamline_name
from event_model import RunStart, RunStop

from mx_bluesky.common.external_interaction.alerting import (
Expand Down Expand Up @@ -70,9 +69,7 @@ def _record_exception(
sample_status = self._decode_sample_status(exception_type)
expeye.update_sample_status(self._sample_id, sample_status)
if sample_status == BLSampleStatus.ERROR_BEAMLINE:
beamline = get_beamline_name("")
get_alerting_service().raise_alert(
f"UDC encountered an error on {beamline}",
get_alerting_service().raise_error_alert(
f"Hyperion encountered the following beamline error: {reason}",
{
Metadata.SAMPLE_ID: str(self._sample_id),
Expand Down
47 changes: 44 additions & 3 deletions src/mx_bluesky/hyperion/external_interaction/agamemnon.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import os
import re
import time
from collections.abc import Sequence
from enum import StrEnum
from os import path
Expand All @@ -9,7 +10,9 @@
import requests
from dodal.utils import get_beamline_name
from pydantic import BaseModel
from requests import ConnectionError, HTTPError, Timeout

from mx_bluesky.common.external_interaction.alerting import get_alerting_service
from mx_bluesky.common.parameters.components import (
WithVisit,
)
Expand All @@ -25,13 +28,17 @@
PinTypeParam,
SingleSamplePinTypeParam,
)
from mx_bluesky.hyperion.plan_runner import PlanError

T = TypeVar("T", bound=WithVisit)
MULTIPIN_PREFIX = "multipin"
MULTIPIN_FORMAT_DESC = "Expected multipin format is multipin_{number_of_wells}x{well_size}+{distance_between_tip_and_first_well}"
MULTIPIN_REGEX = rf"^{MULTIPIN_PREFIX}_(\d+)x(\d+(?:\.\d+)?)\+(\d+(?:\.\d+)?)$"
MX_GENERAL_ROOT_REGEX = r"^/dls/(?P<beamline>[^/]+)/data/[^/]*/(?P<visit>[^/]+)(?:/|$)"

MAX_TRIES = 3
RETRY_INITIAL_DELAY_S = 2


class _InstructionType(StrEnum):
WAIT = "wait"
Expand All @@ -43,7 +50,10 @@ def create_parameters_from_agamemnon() -> Sequence[BaseModel]:
mx-bluesky instructions.
Returns:
The generated sequence of mx-bluesky parameters, or empty list if
no instructions."""
no instructions.
Raises:
PlanError: if the instructions could not be fetched from Agamemnon
"""
beamline_name = get_beamline_name("i03")
agamemnon_instruction = _get_next_instruction(beamline_name)
if agamemnon_instruction:
Expand Down Expand Up @@ -72,8 +82,39 @@ def _instruction_and_data(agamemnon_instruction: dict) -> tuple[str, Any]:


def _get_parameters_from_url(url: str) -> dict:
response = requests.get(url, headers={"Accept": "application/json"})
response.raise_for_status()
tries, delay = MAX_TRIES, RETRY_INITIAL_DELAY_S
while tries > 0:
tries -= 1
try:
response = requests.get(url, headers={"Accept": "application/json"})
try:
response.raise_for_status()
break
except HTTPError as e:
if 500 <= response.status_code < 600:
LOGGER.warning(
f"Agamemnon returned server error status {response.status_code}, retries left {tries}: {str(e)}"
)
else:
msg = f"Agamemnon returned unexpected HTTP response status code {response.status_code}"
get_alerting_service().raise_error_alert(msg, {})
raise PlanError(msg) from e
except ConnectionError as e:
LOGGER.warning(
f"Connection error attempting to connect to agamemnon, retries left {tries}",
exc_info=e,
)
except Timeout:
LOGGER.warning(
f"Timed out attempting to connect to agamemnon, retries left {tries}"
)
if tries:
time.sleep(delay) # noqa
delay *= 2
else:
msg = f"Unable to fetch instruction from agamemnon after {MAX_TRIES} attempts, ending UDC."
get_alerting_service().raise_error_alert(msg, {})
raise PlanError(msg)
return json.loads(response.content)


Expand Down
41 changes: 38 additions & 3 deletions src/mx_bluesky/hyperion/supervisor/_supervisor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import time
from collections.abc import Sequence

from blueapi.client.client import BlueapiClient
from blueapi.client.event_bus import BlueskyStreamingError
from blueapi.client.rest import ServiceUnavailableError
from blueapi.config import ApplicationConfig
from blueapi.core import BlueskyContext
from blueapi.service.model import TaskRequest
Expand All @@ -10,6 +12,7 @@
from bluesky.utils import MsgGenerator
from pydantic import BaseModel

from mx_bluesky.common.external_interaction.alerting import get_alerting_service
from mx_bluesky.common.parameters.constants import Status
from mx_bluesky.common.utils.exceptions import CrystalNotFoundError, SampleError
from mx_bluesky.common.utils.log import LOGGER
Expand All @@ -23,6 +26,9 @@
from mx_bluesky.hyperion.plan_runner import PlanError, PlanRunner
from mx_bluesky.hyperion.supervisor._task_monitor import TaskMonitor

MAX_TRIES = 3
RETRY_INITIAL_DELAY_S = 2


class SupervisorRunner(PlanRunner):
"""Runner that executes plans by delegating to a remote blueapi instance"""
Expand Down Expand Up @@ -125,9 +131,38 @@ def shutdown(self):
def _run_task_remotely(self, task_request: TaskRequest):
try:
with TaskMonitor(self.blueapi_client, task_request) as task_monitor:
task_status = self.blueapi_client.run_task(
task_request, on_event=task_monitor.on_blueapi_event
)
tries, task_status, delay = MAX_TRIES, None, RETRY_INITIAL_DELAY_S
while tries > 0:
tries -= 1
try:
task_status = self.blueapi_client.run_task(
task_request, on_event=task_monitor.on_blueapi_event
)
break
except ServiceUnavailableError as e:
LOGGER.warning(
"Could not connect to blueapi client.", exc_info=e
)
time.sleep(delay) # noqa
delay += delay
except BlueskyStreamingError:
raise
except Exception as e:
get_alerting_service().raise_error_alert(
"Unexpected error communicating with hyperion-blueapi", {}
)
raise PlanError(
"Unexpected error communicating with hyperion-blueapi"
) from e
else:
LOGGER.error("Max retries reached, ending UDC.")
get_alerting_service().raise_error_alert(
"hyperion-supervisor stopped UDC because unable to connect to hyperion-blueapi.",
{},
)
raise PlanError(
f"Unable to connect to hyperion-blueapi after {MAX_TRIES} attempts, ending UDC"
)
LOGGER.info(
f"hyperion-blueapi completed task execution with task_status {task_status}"
)
Expand Down
4 changes: 1 addition & 3 deletions src/mx_bluesky/hyperion/supervisor/_task_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,7 @@ def _reset_timer(self):
self._timer.start()

def _raise_alert_collection_is_stuck(self):
beamline = get_beamline_name("")
self._alerting_service.raise_alert(
f"UDC encountered an error on {beamline}",
self._alerting_service.raise_error_alert(
f"Hyperion Supervisor detected that BlueAPI was stuck for {self.DEFAULT_TIMEOUT_S} seconds.",
self._extract_metadata(),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,25 @@ def test_logging_alerting_service_raises_a_log_message_with_additional_metadata_
Metadata.VISIT: "cm14451-2",
},
)


@patch("mx_bluesky.common.external_interaction.alerting.log_based_service.LOGGER")
def test_raise_error_alert_delegates_to_raise_alert(mock_logger: MagicMock):
set_alerting_service(LoggingAlertService(CONST.GRAYLOG_STREAM_ID, WARNING))
get_alerting_service().raise_error_alert(
"Test message", {Metadata.SAMPLE_ID: "123456", Metadata.VISIT: "cm14451-2"}
)
mock_logger.log.assert_called_once_with(
WARNING,
"***ALERT*** summary=UDC encountered an error on i03 content=Test message",
extra={
ExtraMetadata.ALERT_SUMMARY: "UDC encountered an error on i03",
ExtraMetadata.ALERT_CONTENT: "Test message",
ExtraMetadata.BEAMLINE: "i03",
ExtraMetadata.GRAYLOG_URL: EXPECTED_GRAYLOG_URL,
ExtraMetadata.ISPYB_URL: "https://ispyb.diamond.ac.uk/samples/sid/123456",
Metadata.SAMPLE_ID: "123456",
ExtraMetadata.PROPOSAL: "cm14451",
Metadata.VISIT: "cm14451-2",
},
)
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,7 @@ def test_sample_handling_callback_raises_an_alert_when_beamline_error_occurs(
run_engine(plan_with_general_exception(exception_type, message))

if expect_alert:
mock_alert_service.raise_alert.assert_called_once_with(
"UDC encountered an error on i03",
mock_alert_service.raise_error_alert.assert_called_once_with(
f"Hyperion encountered the following beamline error: {message}",
{
Metadata.SAMPLE_ID: str(TEST_SAMPLE_ID),
Expand Down
Loading
Loading