From 4b2f43587bf4601ccca2a967cc88187fcea99371 Mon Sep 17 00:00:00 2001 From: Jacob Williamson Date: Fri, 8 May 2026 17:02:01 +0100 Subject: [PATCH 01/14] Extend condition context manager to do conversion --- src/daq_queuing_service/app/app.py | 5 +- .../blueapi_interaction/__init__.py | 0 .../blueapi_adapter.py | 0 .../blueapi_interaction/blueapi_call.py | 7 ++ .../plugins/construct_task_request.py | 17 +++- src/daq_queuing_service/task_queue/queue.py | 95 +++++++++++++------ src/daq_queuing_service/worker/worker.py | 2 +- tests/unit_tests/test_blueapi_adapter.py | 5 +- tests/unit_tests/test_worker.py | 5 +- 9 files changed, 100 insertions(+), 36 deletions(-) create mode 100644 src/daq_queuing_service/blueapi_interaction/__init__.py rename src/daq_queuing_service/{ => blueapi_interaction}/blueapi_adapter.py (100%) create mode 100644 src/daq_queuing_service/blueapi_interaction/blueapi_call.py diff --git a/src/daq_queuing_service/app/app.py b/src/daq_queuing_service/app/app.py index 49df90a..514717b 100644 --- a/src/daq_queuing_service/app/app.py +++ b/src/daq_queuing_service/app/app.py @@ -9,8 +9,9 @@ from daq_queuing_service.api.api import create_api_router from daq_queuing_service.api.errors import register_exception_handlers -from daq_queuing_service.blueapi_adapter import BlueapiClientAdapter +from daq_queuing_service.blueapi_interaction.blueapi_adapter import BlueapiClientAdapter from daq_queuing_service.plugins.construct_task_request import ( + construct_blueapi_call_list, construct_blueapi_task_request, ) from daq_queuing_service.task_queue.queue import TaskQueue @@ -47,7 +48,7 @@ def log_task_exception(task: asyncio.Task[NoReturn]): config = load_config() app = FastAPI(lifespan=lifespan) - app.state.queue = TaskQueue() + app.state.queue = TaskQueue(construct_blueapi_call_list) blueapi_rest_client = BlueapiRestClient(config=config.blueapi.api) blueapi_client = BlueapiClient.from_config(config.blueapi) diff --git a/src/daq_queuing_service/blueapi_interaction/__init__.py b/src/daq_queuing_service/blueapi_interaction/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/daq_queuing_service/blueapi_adapter.py b/src/daq_queuing_service/blueapi_interaction/blueapi_adapter.py similarity index 100% rename from src/daq_queuing_service/blueapi_adapter.py rename to src/daq_queuing_service/blueapi_interaction/blueapi_adapter.py diff --git a/src/daq_queuing_service/blueapi_interaction/blueapi_call.py b/src/daq_queuing_service/blueapi_interaction/blueapi_call.py new file mode 100644 index 0000000..6944b68 --- /dev/null +++ b/src/daq_queuing_service/blueapi_interaction/blueapi_call.py @@ -0,0 +1,7 @@ +from blueapi.service.model import TaskRequest +from pydantic import BaseModel + + +class BlueapiCall(BaseModel): + task_request: TaskRequest + parent_task_id: str diff --git a/src/daq_queuing_service/plugins/construct_task_request.py b/src/daq_queuing_service/plugins/construct_task_request.py index 75b7eec..f36145d 100644 --- a/src/daq_queuing_service/plugins/construct_task_request.py +++ b/src/daq_queuing_service/plugins/construct_task_request.py @@ -1,6 +1,7 @@ from blueapi.service.model import TaskRequest -from daq_queuing_service.task import ExperimentDefinition +from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall +from daq_queuing_service.task import ExperimentDefinition, TaskWithPosition def construct_blueapi_task_request( @@ -11,3 +12,17 @@ def construct_blueapi_task_request( params=experiment_definition.params, instrument_session=experiment_definition.instrument_session, ) + + +def construct_blueapi_call_list( + queue: list[TaskWithPosition], history: list[TaskWithPosition] +) -> list[BlueapiCall]: + + call_list = [ + BlueapiCall( + parent_task_id=task.id, + task_request=construct_blueapi_task_request(task.experiment_definition), + ) + for task in queue + ] + return call_list diff --git a/src/daq_queuing_service/task_queue/queue.py b/src/daq_queuing_service/task_queue/queue.py index f89424d..c227e38 100644 --- a/src/daq_queuing_service/task_queue/queue.py +++ b/src/daq_queuing_service/task_queue/queue.py @@ -1,11 +1,18 @@ import asyncio import logging -from collections.abc import Sequence +from collections.abc import Callable, Sequence +from types import TracebackType +from typing import Any from blueapi.worker.event import TaskError, TaskResult from pydantic import BaseModel -from daq_queuing_service.task import Status, Task, TaskWithPosition +from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall +from daq_queuing_service.task import ( + Status, + Task, + TaskWithPosition, +) from daq_queuing_service.task_queue.queue_utils import ( NegativePositionError, TaskIdInUseError, @@ -17,6 +24,10 @@ LOGGER = logging.getLogger(__name__) +Converter = Callable[ + [list[TaskWithPosition], list[TaskWithPosition]], list[BlueapiCall] +] + class TaskRegistry(dict[str, Task]): def __missing__(self, task_id: str) -> Task: @@ -27,13 +38,37 @@ class QueueState(BaseModel): paused: bool +class Modifying(asyncio.Condition): + def __init__(self, on_exit: Callable[[], Any]): + super().__init__() + self._on_exit = on_exit + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc: BaseException | None, + tb: TracebackType | None, + ): + self._on_exit() + + return await super().__aexit__(exc_type, exc, tb) + + class TaskQueue: - def __init__(self): + def __init__( + self, + convert: Converter, + ): self._tasks: TaskRegistry = TaskRegistry() self._queue: list[str] = [] self._history: list[str] = [] - self._condition = asyncio.Condition() + self._call_queue: list[BlueapiCall] = [] self._state: QueueState = QueueState(paused=True) + self._convert = convert + self._modifying = Modifying(on_exit=self._after_modifying) + + def _after_modifying(self): + self._call_queue = self._convert(self._get_queue(), self._get_history()) async def claim_next_task_once_available(self) -> Task: """Waits until a task is available before returning the task. A task is @@ -43,12 +78,12 @@ async def claim_next_task_once_available(self) -> Task: Returns: Task: The task at the top of the queue """ - async with self._condition: + async with self._modifying: while not self._task_available(): - await self._condition.wait() + await self._modifying.wait() task = self._tasks[self._queue[0]] task.claim() - self._condition.notify_all() + self._modifying.notify_all() LOGGER.info(f"Task {task.id} has been claimed") return task @@ -57,9 +92,9 @@ async def wait_until_task_available(self): available if it's at the top of the queue, is not already in progress or claimed, and the queue is not paused. """ - async with self._condition: + async with self._modifying: while not self._task_available(): - await self._condition.wait() + await self._modifying.wait() async def return_task_to_queue(self, task: Task): """Returns a task to the queue that had previously been claimed @@ -71,7 +106,7 @@ async def return_task_to_queue(self, task: Task): TaskNotClaimedError: Raised if the task's status is not have a 'Claimed' """ self._check_task_valid_to_be_returned(task) - async with self._condition: + async with self._modifying: match task.status: case Status.CLAIMED: assert task.id == self._queue[0] @@ -81,7 +116,7 @@ async def return_task_to_queue(self, task: Task): f"Cannot return task {task.id}, " + f"it's status is {task.status}." ) - self._condition.notify_all() + self._modifying.notify_all() LOGGER.info(f"Task {task.id} has been returned to the queue") async def complete_task(self, task: Task, result: TaskResult): @@ -91,7 +126,7 @@ async def complete_task(self, task: Task, result: TaskResult): task (Task): Task to be completed result (TaskResult): The result of the task from blueapi """ - async with self._condition: + async with self._modifying: self._check_task_valid_to_be_returned(task) assert self._queue[0] == task.id, ( f"This task is not at the front of the queue: {task}" @@ -99,7 +134,7 @@ async def complete_task(self, task: Task, result: TaskResult): task.succeed(result) self._queue.pop(0) self._history.append(task.id) - self._condition.notify_all() + self._modifying.notify_all() LOGGER.info(f"Task {task.id} has been completed successfully: {result}") async def fail_task(self, task: Task, errors: list[str | TaskError] | None = None): @@ -110,7 +145,7 @@ async def fail_task(self, task: Task, errors: list[str | TaskError] | None = Non errors (list[str | TaskError] | None, optional): A list of errors that occurred when trying to run the task. Defaults to None. """ - async with self._condition: + async with self._modifying: self._check_task_valid_to_be_returned(task) assert self._queue[0] == task.id, ( f"This task is not at the front of the queue: {task}" @@ -118,7 +153,7 @@ async def fail_task(self, task: Task, errors: list[str | TaskError] | None = Non task.fail(errors) self._queue.pop(0) self._history.append(task.id) - self._condition.notify_all() + self._modifying.notify_all() LOGGER.info(f"Task {task.id} has failed with the following errors: {errors}") async def get_task_by_id(self, task_id: str) -> TaskWithPosition: @@ -134,7 +169,7 @@ async def get_task_by_id(self, task_id: str) -> TaskWithPosition: TaskNotFoundError: Raised if the no task exists with the requested task ID. """ # Returns copy so don't have to be worried about caller modifying task. - async with self._condition: + async with self._modifying: return self._get_task_by_id(task_id) def _get_task_by_id(self, task_id: str) -> TaskWithPosition: @@ -153,7 +188,7 @@ async def get_task_by_position(self, position: int) -> TaskWithPosition | None: if no task exists at the requested position. """ # Returns copy so don't have to be worried about caller modifying task. - async with self._condition: + async with self._modifying: if position < -self.length or position >= self.length: return None return self._get_task_by_id(self._queue[position]) @@ -166,7 +201,7 @@ async def get_queue(self) -> list[TaskWithPosition]: will be run in. """ # Returns copies so don't have to be worried about caller modifying tasks. - async with self._condition: + async with self._modifying: return self._get_queue() async def get_history(self) -> list[TaskWithPosition]: @@ -177,7 +212,7 @@ async def get_history(self) -> list[TaskWithPosition]: chronological order. """ # Returns copies so don't have to be worried about caller modifying tasks. - async with self._condition: + async with self._modifying: return self._get_history() async def get_tasks(self) -> list[TaskWithPosition]: @@ -188,7 +223,7 @@ async def get_tasks(self) -> list[TaskWithPosition]: with the history. """ # Returns copies so don't have to be worried about caller modifying tasks. - async with self._condition: + async with self._modifying: return self._get_history() + self._get_queue() async def add_tasks(self, tasks: list[Task], position: int | None = None) -> None: @@ -202,12 +237,12 @@ async def add_tasks(self, tasks: list[Task], position: int | None = None) -> Non tasks (list[Task]): List of tasks to add position (int | None, optional): Position of the tasks. Defaults to None. """ - async with self._condition: + async with self._modifying: self._validate_new_tasks(tasks) if position is not None: position = self._get_valid_position(position) self._add_tasks(tasks, position) - self._condition.notify_all() + self._modifying.notify_all() LOGGER.info(f"Successfully added tasks to queue: {[task.id for task in tasks]}") async def move_task(self, task_id: str, position: int) -> int: @@ -222,12 +257,12 @@ async def move_task(self, task_id: str, position: int) -> int: Returns: int: The new position of the task (may be different to what was requested) """ - async with self._condition: + async with self._modifying: self._validate_tasks_for_move_or_deletion([task_id]) position = self._get_valid_position(position) self._remove_tasks_from_queue([task_id]) self._queue[position:position] = [task_id] - self._condition.notify_all() + self._modifying.notify_all() new_position = self._queue.index(task_id) LOGGER.info(f"Succesfully moved task {task_id} to position {new_position}") return new_position @@ -243,14 +278,14 @@ async def cancel_tasks(self, task_ids: Sequence[str]) -> list[Task]: Returns: list[Task]: List of the task objects that were removed from the queue. """ - async with self._condition: + async with self._modifying: task_ids = list(task_ids) self._validate_tasks_for_move_or_deletion(task_ids) self._remove_tasks_from_queue(task_ids) tasks = self._remove_tasks_from_registry(task_ids) for task in tasks: task.cancel() - self._condition.notify_all() + self._modifying.notify_all() LOGGER.info(f"Succesfully cancelled tasks: {task_ids}") return tasks @@ -258,11 +293,11 @@ async def clear_history(self): """Clears the history list. Any task in the history list at the time will be deleted permanently and inaccessible. """ - async with self._condition: + async with self._modifying: for task_id in self._history: self._tasks.pop(task_id) self._history.clear() - self._condition.notify_all() + self._modifying.notify_all() LOGGER.info("Succesfully cleared history") async def update_state(self, paused: bool | None = None) -> QueueState: @@ -274,11 +309,11 @@ async def update_state(self, paused: bool | None = None) -> QueueState: Returns: QueueState: The new state of the queue. """ - async with self._condition: + async with self._modifying: self._state = QueueState( paused=self._state.paused if paused is None else paused ) - self._condition.notify_all() + self._modifying.notify_all() LOGGER.info(f"Succesfully updated queue state to {self._state}") return self._state diff --git a/src/daq_queuing_service/worker/worker.py b/src/daq_queuing_service/worker/worker.py index f475e18..09ff909 100644 --- a/src/daq_queuing_service/worker/worker.py +++ b/src/daq_queuing_service/worker/worker.py @@ -15,7 +15,7 @@ from blueapi.worker import ProgressEvent, TaskStatus, WorkerEvent, WorkerState from blueapi.worker.event import TaskError, TaskResult -from daq_queuing_service.blueapi_adapter import BlueapiClientAdapter +from daq_queuing_service.blueapi_interaction.blueapi_adapter import BlueapiClientAdapter from daq_queuing_service.task import ExperimentDefinition, Status, Task from daq_queuing_service.task_queue.queue import TaskQueue diff --git a/tests/unit_tests/test_blueapi_adapter.py b/tests/unit_tests/test_blueapi_adapter.py index 7bb88eb..cec1dd0 100644 --- a/tests/unit_tests/test_blueapi_adapter.py +++ b/tests/unit_tests/test_blueapi_adapter.py @@ -12,7 +12,10 @@ from blueapi.worker import WorkerState from blueapi.worker.event import TaskResult, TaskStatus -from daq_queuing_service.blueapi_adapter import BlueapiClientAdapter, BlueapiResult +from daq_queuing_service.blueapi_interaction.blueapi_adapter import ( + BlueapiClientAdapter, + BlueapiResult, +) async def test_blueapi_adapter_get_state_returns_blueapi_response_with_state(): diff --git a/tests/unit_tests/test_worker.py b/tests/unit_tests/test_worker.py index c72d147..6253deb 100644 --- a/tests/unit_tests/test_worker.py +++ b/tests/unit_tests/test_worker.py @@ -16,7 +16,10 @@ from blueapi.worker import ProgressEvent, TaskStatus, WorkerEvent, WorkerState from pytest import LogCaptureFixture -from daq_queuing_service.blueapi_adapter import BlueapiClientAdapter, BlueapiResult +from daq_queuing_service.blueapi_interaction.blueapi_adapter import ( + BlueapiClientAdapter, + BlueapiResult, +) from daq_queuing_service.task import ExperimentDefinition, Status from daq_queuing_service.task_queue.queue import TaskError, TaskQueue, TaskResult from daq_queuing_service.worker.worker import QueueWorker From b58093e607fb7bb9291f9ec2c53a78f818298846 Mon Sep 17 00:00:00 2001 From: Jacob Williamson Date: Fri, 8 May 2026 18:16:11 +0100 Subject: [PATCH 02/14] Update some tests --- tests/unit_tests/conftest.py | 5 ++++- tests/unit_tests/test_queue.py | 30 +++++++++++++++++++++++++++--- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/tests/unit_tests/conftest.py b/tests/unit_tests/conftest.py index 50b1bed..e4094ae 100644 --- a/tests/unit_tests/conftest.py +++ b/tests/unit_tests/conftest.py @@ -1,6 +1,9 @@ import pytest from blueapi.worker.event import TaskError, TaskResult +from daq_queuing_service.plugins.construct_task_request import ( + construct_blueapi_call_list, +) from daq_queuing_service.task import ExperimentDefinition, Task from daq_queuing_service.task_queue.queue import TaskQueue @@ -20,7 +23,7 @@ def tasks() -> list[Task]: @pytest.fixture async def task_queue(tasks: list[Task]): - queue = TaskQueue() + queue = TaskQueue(convert=construct_blueapi_call_list) await queue.update_state(paused=False) await queue.add_tasks(tasks) return queue diff --git a/tests/unit_tests/test_queue.py b/tests/unit_tests/test_queue.py index 4cc6b01..c4ebc7f 100644 --- a/tests/unit_tests/test_queue.py +++ b/tests/unit_tests/test_queue.py @@ -2,8 +2,13 @@ import copy import pytest +from blueapi.service.model import TaskRequest from blueapi.worker.event import TaskError, TaskResult +from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall +from daq_queuing_service.plugins.construct_task_request import ( + construct_blueapi_call_list, +) from daq_queuing_service.task import ExperimentDefinition, Status, Task from daq_queuing_service.task_queue.queue import ( TaskQueue, @@ -36,6 +41,21 @@ async def test_add_tasks_adds_to_end_when_no_position_given(task_queue: TaskQueu assert set(task_queue._tasks.keys()) == {"0", "1", "2", "3", "4", "new"} +async def test_add_tasks_adds_to_call_queue(): + task_queue = TaskQueue(convert=construct_blueapi_call_list) + await task_queue.add_tasks([make_new_task("new"), make_new_task("new_2")]) + assert task_queue._call_queue == [ + BlueapiCall( + task_request=TaskRequest(name="test", params={}, instrument_session=""), + parent_task_id="new", + ), + BlueapiCall( + task_request=TaskRequest(name="test", params={}, instrument_session=""), + parent_task_id="new_2", + ), + ] + + async def test_add_tasks_with_position_works_as_expected(task_queue: TaskQueue): new_task = make_new_task("new") await task_queue.add_tasks([new_task], 2) @@ -124,7 +144,7 @@ async def test_move_task_works_as_expected_and_returns_new_position( expected_order: list[int], expected_return_value: int, ): - queue = TaskQueue() + queue = TaskQueue(convert=construct_blueapi_call_list) tasks = [make_new_task(str(i)) for i in range(10)] await queue.add_tasks(tasks) task = str(task_to_move) @@ -144,7 +164,11 @@ async def test_move_task_to_position_0_moves_to_position_1_if_first_task_in_prog new_position = await task_queue_in_progress.move_task("4", 0) assert new_position == 1 - assert task_queue_in_progress._queue == ["0", "4", "1", "2", "3"] + expected_order = ["0", "4", "1", "2", "3"] + assert task_queue_in_progress._queue == expected_order + assert [ + call.parent_task_id for call in task_queue_in_progress._call_queue + ] == expected_order async def test_move_task_to_position_0_moves_to_position_0_if_first_task_waiting( @@ -541,7 +565,7 @@ async def test_wait_until_task_available_waits_if_queue_paused( async def test_wait_until_task_available_waits_if_queue_empty(): - task_queue = TaskQueue() + task_queue = TaskQueue(construct_blueapi_call_list) with pytest.raises(asyncio.TimeoutError): await asyncio.wait_for(task_queue.wait_until_task_available(), timeout=0.05) From 1bf6917aafcc101ad1d854d02e93336b2ac3a8ad Mon Sep 17 00:00:00 2001 From: Jacob Williamson Date: Mon, 11 May 2026 08:38:06 +0100 Subject: [PATCH 03/14] WIP --- .../blueapi_interaction/blueapi_call.py | 1 + .../plugins/construct_task_request.py | 2 +- src/daq_queuing_service/task.py | 8 +++++--- src/daq_queuing_service/task_queue/queue.py | 19 ++++++++++++++----- 4 files changed, 21 insertions(+), 9 deletions(-) diff --git a/src/daq_queuing_service/blueapi_interaction/blueapi_call.py b/src/daq_queuing_service/blueapi_interaction/blueapi_call.py index 6944b68..7519388 100644 --- a/src/daq_queuing_service/blueapi_interaction/blueapi_call.py +++ b/src/daq_queuing_service/blueapi_interaction/blueapi_call.py @@ -5,3 +5,4 @@ class BlueapiCall(BaseModel): task_request: TaskRequest parent_task_id: str + blueapi_id: str | None = None diff --git a/src/daq_queuing_service/plugins/construct_task_request.py b/src/daq_queuing_service/plugins/construct_task_request.py index f36145d..06dbdd3 100644 --- a/src/daq_queuing_service/plugins/construct_task_request.py +++ b/src/daq_queuing_service/plugins/construct_task_request.py @@ -15,7 +15,7 @@ def construct_blueapi_task_request( def construct_blueapi_call_list( - queue: list[TaskWithPosition], history: list[TaskWithPosition] + queue: list[TaskWithPosition], history: list[TaskWithPosition], call_history: list[BlueapiCall] ) -> list[BlueapiCall]: call_list = [ diff --git a/src/daq_queuing_service/task.py b/src/daq_queuing_service/task.py index ec6ee56..66156d1 100644 --- a/src/daq_queuing_service/task.py +++ b/src/daq_queuing_service/task.py @@ -7,8 +7,10 @@ from blueapi.worker.event import TaskError, TaskResult from pydantic import BaseModel, Field +from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall -def _create_uuid_str() -> str: + +def create_uuid_str() -> str: return str(uuid4()) @@ -44,13 +46,13 @@ class ExperimentDefinition(BaseModel): class Task(BaseModel): experiment_definition: ExperimentDefinition - id: str = Field(default_factory=_create_uuid_str) + id: str = Field(default_factory=create_uuid_str) status: Status = Status.WAITING time_started: str | None = None time_completed: str | None = None errors: list[str | TaskError] = Field(default_factory=list[str | TaskError]) result: TaskResult | None = None - blueapi_id: str | None = None + blueapi_calls: list[BlueapiCall] = [] def _update_status(self, new_status: Status): """Updates the status of the task, checking that the transition is valid diff --git a/src/daq_queuing_service/task_queue/queue.py b/src/daq_queuing_service/task_queue/queue.py index c227e38..e8aa274 100644 --- a/src/daq_queuing_service/task_queue/queue.py +++ b/src/daq_queuing_service/task_queue/queue.py @@ -25,7 +25,8 @@ LOGGER = logging.getLogger(__name__) Converter = Callable[ - [list[TaskWithPosition], list[TaskWithPosition]], list[BlueapiCall] + [list[TaskWithPosition], list[TaskWithPosition], list[BlueapiCall]], + list[BlueapiCall], ] @@ -50,7 +51,6 @@ async def __aexit__( tb: TracebackType | None, ): self._on_exit() - return await super().__aexit__(exc_type, exc, tb) @@ -63,12 +63,20 @@ def __init__( self._queue: list[str] = [] self._history: list[str] = [] self._call_queue: list[BlueapiCall] = [] + self._queue_history: list[BlueapiCall] = [] self._state: QueueState = QueueState(paused=True) self._convert = convert - self._modifying = Modifying(on_exit=self._after_modifying) + self._modifying = Modifying(on_exit=self._update_call_queue) - def _after_modifying(self): - self._call_queue = self._convert(self._get_queue(), self._get_history()) + def _update_call_queue(self): + self._call_queue = self._convert( + self._get_queue(), self._get_history(), self._queue_history + ) + for task_id in self._queue: + task = self._tasks[task_id] + task.blueapi_calls = [] + for call in self._call_queue: + self._tasks[call.parent_task_id].blueapi_calls.append(call) async def claim_next_task_once_available(self) -> Task: """Waits until a task is available before returning the task. A task is @@ -81,6 +89,7 @@ async def claim_next_task_once_available(self) -> Task: async with self._modifying: while not self._task_available(): await self._modifying.wait() + self._update_call_queue() task = self._tasks[self._queue[0]] task.claim() self._modifying.notify_all() From dba7dcfef167abd6b625f89ce6c1c4bd951b3c8d Mon Sep 17 00:00:00 2001 From: Jacob Williamson Date: Mon, 11 May 2026 10:21:12 +0100 Subject: [PATCH 04/14] Notify condition wait after updating call queue --- src/daq_queuing_service/task_queue/queue.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/src/daq_queuing_service/task_queue/queue.py b/src/daq_queuing_service/task_queue/queue.py index e8aa274..4012c3a 100644 --- a/src/daq_queuing_service/task_queue/queue.py +++ b/src/daq_queuing_service/task_queue/queue.py @@ -77,6 +77,7 @@ def _update_call_queue(self): task.blueapi_calls = [] for call in self._call_queue: self._tasks[call.parent_task_id].blueapi_calls.append(call) + self._modifying.notify_all() async def claim_next_task_once_available(self) -> Task: """Waits until a task is available before returning the task. A task is @@ -92,7 +93,6 @@ async def claim_next_task_once_available(self) -> Task: self._update_call_queue() task = self._tasks[self._queue[0]] task.claim() - self._modifying.notify_all() LOGGER.info(f"Task {task.id} has been claimed") return task @@ -125,7 +125,6 @@ async def return_task_to_queue(self, task: Task): f"Cannot return task {task.id}, " + f"it's status is {task.status}." ) - self._modifying.notify_all() LOGGER.info(f"Task {task.id} has been returned to the queue") async def complete_task(self, task: Task, result: TaskResult): @@ -143,7 +142,6 @@ async def complete_task(self, task: Task, result: TaskResult): task.succeed(result) self._queue.pop(0) self._history.append(task.id) - self._modifying.notify_all() LOGGER.info(f"Task {task.id} has been completed successfully: {result}") async def fail_task(self, task: Task, errors: list[str | TaskError] | None = None): @@ -162,7 +160,6 @@ async def fail_task(self, task: Task, errors: list[str | TaskError] | None = Non task.fail(errors) self._queue.pop(0) self._history.append(task.id) - self._modifying.notify_all() LOGGER.info(f"Task {task.id} has failed with the following errors: {errors}") async def get_task_by_id(self, task_id: str) -> TaskWithPosition: @@ -251,7 +248,6 @@ async def add_tasks(self, tasks: list[Task], position: int | None = None) -> Non if position is not None: position = self._get_valid_position(position) self._add_tasks(tasks, position) - self._modifying.notify_all() LOGGER.info(f"Successfully added tasks to queue: {[task.id for task in tasks]}") async def move_task(self, task_id: str, position: int) -> int: @@ -271,7 +267,6 @@ async def move_task(self, task_id: str, position: int) -> int: position = self._get_valid_position(position) self._remove_tasks_from_queue([task_id]) self._queue[position:position] = [task_id] - self._modifying.notify_all() new_position = self._queue.index(task_id) LOGGER.info(f"Succesfully moved task {task_id} to position {new_position}") return new_position @@ -294,7 +289,6 @@ async def cancel_tasks(self, task_ids: Sequence[str]) -> list[Task]: tasks = self._remove_tasks_from_registry(task_ids) for task in tasks: task.cancel() - self._modifying.notify_all() LOGGER.info(f"Succesfully cancelled tasks: {task_ids}") return tasks @@ -306,7 +300,6 @@ async def clear_history(self): for task_id in self._history: self._tasks.pop(task_id) self._history.clear() - self._modifying.notify_all() LOGGER.info("Succesfully cleared history") async def update_state(self, paused: bool | None = None) -> QueueState: @@ -322,7 +315,6 @@ async def update_state(self, paused: bool | None = None) -> QueueState: self._state = QueueState( paused=self._state.paused if paused is None else paused ) - self._modifying.notify_all() LOGGER.info(f"Succesfully updated queue state to {self._state}") return self._state From 959b9f60a3b47c2739b55dba34c39e9ef5e3d7b2 Mon Sep 17 00:00:00 2001 From: Jacob Williamson Date: Mon, 11 May 2026 16:40:21 +0100 Subject: [PATCH 05/14] Hold status inside blueapi call and update queue to match --- .../blueapi_interaction/blueapi_call.py | 89 ++++- .../plugins/construct_task_request.py | 4 +- src/daq_queuing_service/task.py | 112 ++---- src/daq_queuing_service/task_queue/queue.py | 126 +++--- tests/unit_tests/conftest.py | 40 +- tests/unit_tests/test_queue.py | 371 ++++++++++-------- tests/unit_tests/test_task.py | 164 ++------ 7 files changed, 430 insertions(+), 476 deletions(-) diff --git a/src/daq_queuing_service/blueapi_interaction/blueapi_call.py b/src/daq_queuing_service/blueapi_interaction/blueapi_call.py index 7519388..d01b960 100644 --- a/src/daq_queuing_service/blueapi_interaction/blueapi_call.py +++ b/src/daq_queuing_service/blueapi_interaction/blueapi_call.py @@ -1,8 +1,93 @@ +from datetime import datetime +from enum import StrEnum + from blueapi.service.model import TaskRequest -from pydantic import BaseModel +from blueapi.worker.event import TaskError, TaskResult +from pydantic import BaseModel, Field + + +class Status(StrEnum): + WAITING = "Waiting" # Waiting in the queue + CLAIMED = "Claimed" # Claimed by the worker + IN_PROGRESS = "In progress" # In progress inside BlueAPI + SUCCESS = "Success" # Completed successfully + ERROR = "Error" # Error while trying to run + + @property + def allowed_transitions(self): + allowed_transitions: dict[Status, set[Status]] = { # from: to + Status.WAITING: {Status.CLAIMED}, + Status.CLAIMED: {Status.WAITING, Status.IN_PROGRESS, Status.ERROR}, + Status.IN_PROGRESS: {Status.SUCCESS, Status.ERROR}, + Status.SUCCESS: set(), + Status.ERROR: set(), + } + return allowed_transitions[self] class BlueapiCall(BaseModel): task_request: TaskRequest - parent_task_id: str + parent_task_id: str | None = None + status: Status = Status.WAITING + time_started: str | None = None + time_completed: str | None = None + result: TaskResult | None = None + errors: list[str | TaskError] = Field(default_factory=list[str | TaskError]) blueapi_id: str | None = None + + def _update_status(self, new_status: Status): + """Updates the status of the task, checking that the transition is valid + + Args: + new_status (Status): New status of the task + + Raises: + ValueError: Raised if the transition from the task's current status to the + new status is not permitted. + """ + allowed = self.status.allowed_transitions + if new_status not in allowed: + raise ValueError( + f"Can't go from current state '{self.status}' to '{new_status}'. " + + f"Allowed transitions from {self.status}: {allowed}." + ) + self.status = new_status + + def wait(self): + """Updates the task status to WAITING""" + self._update_status(Status.WAITING) + + def claim(self): + """Updates the task status to CLAIMED""" + self._update_status(Status.CLAIMED) + + def put_in_progress(self): + """Updates the task status to IN_PROGRESS and sets the time_started field to the + current time + """ + self._update_status(Status.IN_PROGRESS) + self.time_started = datetime.now().isoformat() + + def succeed(self, result: TaskResult): + """Updates the task status to SUCCESS, sets the time_completed field to the + current time, and sets the result field with the result from blueapi + + Args: + result (TaskResult): The result of the task from blueapi + """ + self._update_status(Status.SUCCESS) + self.result = result + self.time_completed = datetime.now().isoformat() + + def fail(self, errors: list[str | TaskError] | None = None): + """Updates the task status to ERROR, sets the time_completed field to the + current time, and adds any errors to the errors field. + + Args: + errors (list[str | TaskError] | None, optional): List of errors that + occurred when trying to run the task. Defaults to None. + """ + self._update_status(Status.ERROR) + self.time_completed = datetime.now().isoformat() + if errors: + self.errors.extend(errors) diff --git a/src/daq_queuing_service/plugins/construct_task_request.py b/src/daq_queuing_service/plugins/construct_task_request.py index 06dbdd3..8f3e70e 100644 --- a/src/daq_queuing_service/plugins/construct_task_request.py +++ b/src/daq_queuing_service/plugins/construct_task_request.py @@ -15,7 +15,9 @@ def construct_blueapi_task_request( def construct_blueapi_call_list( - queue: list[TaskWithPosition], history: list[TaskWithPosition], call_history: list[BlueapiCall] + queue: list[TaskWithPosition], + history: list[TaskWithPosition], + call_history: list[BlueapiCall], ) -> list[BlueapiCall]: call_list = [ diff --git a/src/daq_queuing_service/task.py b/src/daq_queuing_service/task.py index 66156d1..e890c70 100644 --- a/src/daq_queuing_service/task.py +++ b/src/daq_queuing_service/task.py @@ -1,38 +1,22 @@ from collections.abc import Mapping -from datetime import datetime -from enum import StrEnum from typing import Any, Self from uuid import uuid4 -from blueapi.worker.event import TaskError, TaskResult +from blueapi.service.model import StrEnum from pydantic import BaseModel, Field -from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall +from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall, Status def create_uuid_str() -> str: return str(uuid4()) -class Status(StrEnum): - WAITING = "Waiting" # Waiting in the queue - CLAIMED = "Claimed" # Claimed by the worker - IN_PROGRESS = "In progress" # In progress inside BlueAPI - SUCCESS = "Success" # Completed successfully - ERROR = "Error" # Error while trying to run - CANCELLED = "Cancelled" # Cancelled before being run - - @property - def allowed_transitions(self): - allowed_transitions: dict[Status, set[Status]] = { # from: to - Status.WAITING: {Status.CLAIMED, Status.CANCELLED}, - Status.CLAIMED: {Status.WAITING, Status.IN_PROGRESS, Status.ERROR}, - Status.IN_PROGRESS: {Status.SUCCESS, Status.ERROR}, - Status.SUCCESS: set(), - Status.ERROR: set(), - Status.CANCELLED: set(), - } - return allowed_transitions[self] +class TaskStatus(StrEnum): + QUEUED = "Queued" + IN_PROGRESS = "In progress" + COMPLETE = "Complete" + CANCELLED = "Cancelled" class ExperimentDefinition(BaseModel): @@ -47,73 +31,27 @@ class ExperimentDefinition(BaseModel): class Task(BaseModel): experiment_definition: ExperimentDefinition id: str = Field(default_factory=create_uuid_str) - status: Status = Status.WAITING - time_started: str | None = None - time_completed: str | None = None - errors: list[str | TaskError] = Field(default_factory=list[str | TaskError]) - result: TaskResult | None = None blueapi_calls: list[BlueapiCall] = [] - - def _update_status(self, new_status: Status): - """Updates the status of the task, checking that the transition is valid - - Args: - new_status (Status): New status of the task - - Raises: - ValueError: Raised if the transition from the task's current status to the - new status is not permitted. - """ - allowed = self.status.allowed_transitions - if new_status not in allowed: - raise ValueError( - f"Can't go from current state '{self.status}' to '{new_status}'. " - + f"Allowed transitions from {self.status}: {allowed}." - ) - self.status = new_status - - def wait(self): - """Updates the task status to WAITING""" - self._update_status(Status.WAITING) - - def claim(self): - """Updates the task status to CLAIMED""" - self._update_status(Status.CLAIMED) - - def put_in_progress(self): - """Updates the task status to IN_PROGRESS and sets the time_started field to the - current time - """ - self._update_status(Status.IN_PROGRESS) - self.time_started = datetime.now().isoformat() - - def succeed(self, result: TaskResult): - """Updates the task status to SUCCESS, sets the time_completed field to the - current time, and sets the result field with the result from blueapi - - Args: - result (TaskResult): The result of the task from blueapi - """ - self._update_status(Status.SUCCESS) - self.result = result - self.time_completed = datetime.now().isoformat() - - def fail(self, errors: list[str | TaskError] | None = None): - """Updates the task status to ERROR, sets the time_completed field to the - current time, and adds any errors to the errors field. - - Args: - errors (list[str | TaskError] | None, optional): List of errors that - occurred when trying to run the task. Defaults to None. - """ - self._update_status(Status.ERROR) - self.time_completed = datetime.now().isoformat() - if errors: - self.errors.extend(errors) + _cancelled: bool = False def cancel(self): - """Sets the task status to CANCELLED""" - self._update_status(Status.CANCELLED) + self._cancelled = True + + @property + def status(self) -> TaskStatus: + if self._cancelled: + return TaskStatus.CANCELLED + if self.blueapi_calls and all( + call.status in [Status.SUCCESS, Status.ERROR] for call in self.blueapi_calls + ): + return TaskStatus.COMPLETE + if any( + call.status + in [Status.IN_PROGRESS, Status.CLAIMED, Status.SUCCESS, Status.ERROR] + for call in self.blueapi_calls + ): + return TaskStatus.IN_PROGRESS + return TaskStatus.QUEUED class TaskWithPosition(Task): diff --git a/src/daq_queuing_service/task_queue/queue.py b/src/daq_queuing_service/task_queue/queue.py index 4012c3a..8cc40f3 100644 --- a/src/daq_queuing_service/task_queue/queue.py +++ b/src/daq_queuing_service/task_queue/queue.py @@ -7,12 +7,8 @@ from blueapi.worker.event import TaskError, TaskResult from pydantic import BaseModel -from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall -from daq_queuing_service.task import ( - Status, - Task, - TaskWithPosition, -) +from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall, Status +from daq_queuing_service.task import Task, TaskStatus, TaskWithPosition from daq_queuing_service.task_queue.queue_utils import ( NegativePositionError, TaskIdInUseError, @@ -63,40 +59,61 @@ def __init__( self._queue: list[str] = [] self._history: list[str] = [] self._call_queue: list[BlueapiCall] = [] + self._call_history: list[BlueapiCall] = [] self._queue_history: list[BlueapiCall] = [] self._state: QueueState = QueueState(paused=True) self._convert = convert self._modifying = Modifying(on_exit=self._update_call_queue) def _update_call_queue(self): - self._call_queue = self._convert( - self._get_queue(), self._get_history(), self._queue_history - ) for task_id in self._queue: task = self._tasks[task_id] - task.blueapi_calls = [] - for call in self._call_queue: - self._tasks[call.parent_task_id].blueapi_calls.append(call) + if task.status == TaskStatus.COMPLETE: + self._queue.remove(task_id) + self._history.append(task_id) + elif task.status != TaskStatus.IN_PROGRESS: + task.blueapi_calls = [] + + self._call_queue = [ + call + for call in self._call_queue + if call.parent_task_id + and call.parent_task_id in self._queue + and self._tasks[call.parent_task_id].status == TaskStatus.IN_PROGRESS + ] + + new_queue = self._convert( + [task for task in self._get_queue() if task.status == TaskStatus.QUEUED], + self._get_history(), + self._queue_history, + ) + + for call in new_queue: + if call.parent_task_id: + self._tasks[call.parent_task_id].blueapi_calls.append(call) + + self._call_queue.extend(new_queue) + self._modifying.notify_all() - async def claim_next_task_once_available(self) -> Task: - """Waits until a task is available before returning the task. A task is - available if it's at the top of the queue, is not already in progress or + async def get_next_call_once_available(self) -> BlueapiCall: + """Waits until a call is available before returning the call. A call is + available if it's at the top of the call queue, is not already in progress or claimed, and the queue is not paused. Returns: - Task: The task at the top of the queue + BlueapiCall: The call at the top of the queue """ async with self._modifying: while not self._task_available(): await self._modifying.wait() self._update_call_queue() - task = self._tasks[self._queue[0]] - task.claim() - LOGGER.info(f"Task {task.id} has been claimed") - return task + call = self._call_queue[0] + call.claim() + LOGGER.info(f"Plan {call} has been claimed") + return call - async def wait_until_task_available(self): + async def wait_until_call_available(self): """Waits until a task is available before returning. A task is available if it's at the top of the queue, is not already in progress or claimed, and the queue is not paused. @@ -105,7 +122,7 @@ async def wait_until_task_available(self): while not self._task_available(): await self._modifying.wait() - async def return_task_to_queue(self, task: Task): + async def return_call_to_queue(self, call: BlueapiCall): """Returns a task to the queue that had previously been claimed Args: @@ -114,20 +131,20 @@ async def return_task_to_queue(self, task: Task): Raises: TaskNotClaimedError: Raised if the task's status is not have a 'Claimed' """ - self._check_task_valid_to_be_returned(task) + self._check_call_valid_to_be_returned(call) async with self._modifying: - match task.status: + match call.status: case Status.CLAIMED: - assert task.id == self._queue[0] - task.wait() + assert call == self._call_queue[0] + call.wait() case _: raise TaskNotClaimedError( - f"Cannot return task {task.id}, " - + f"it's status is {task.status}." + f"Cannot return call {call}, " + + f"it's status is {call.status}." ) - LOGGER.info(f"Task {task.id} has been returned to the queue") + LOGGER.info(f"Call {call} has been returned to the queue") - async def complete_task(self, task: Task, result: TaskResult): + async def complete_call(self, call: BlueapiCall, result: TaskResult): """Sets a task to complete, removes it from the queue and adds it to history Args: @@ -135,16 +152,17 @@ async def complete_task(self, task: Task, result: TaskResult): result (TaskResult): The result of the task from blueapi """ async with self._modifying: - self._check_task_valid_to_be_returned(task) - assert self._queue[0] == task.id, ( - f"This task is not at the front of the queue: {task}" + self._check_call_valid_to_be_returned(call) + assert call == self._call_queue[0], ( + f"This call is not at the front of the queue: {call}" ) - task.succeed(result) - self._queue.pop(0) - self._history.append(task.id) - LOGGER.info(f"Task {task.id} has been completed successfully: {result}") + call.succeed(result) + self._call_history.append(call) + LOGGER.info(f"Plan {call} has been completed successfully: {result}") - async def fail_task(self, task: Task, errors: list[str | TaskError] | None = None): + async def fail_call( + self, call: BlueapiCall, errors: list[str | TaskError] | None = None + ): """Sets a task to failed, removes it from the queue and adds it to history Args: @@ -153,14 +171,10 @@ async def fail_task(self, task: Task, errors: list[str | TaskError] | None = Non occurred when trying to run the task. Defaults to None. """ async with self._modifying: - self._check_task_valid_to_be_returned(task) - assert self._queue[0] == task.id, ( - f"This task is not at the front of the queue: {task}" - ) - task.fail(errors) - self._queue.pop(0) - self._history.append(task.id) - LOGGER.info(f"Task {task.id} has failed with the following errors: {errors}") + self._check_call_valid_to_be_returned(call) + call.fail(errors) + self._call_history.append(call) + LOGGER.info(f"Call {call} has failed with the following errors: {errors}") async def get_task_by_id(self, task_id: str) -> TaskWithPosition: """Returns a task based on it's task ID @@ -334,26 +348,28 @@ def _task_available(self) -> bool: Returns: bool: Whether or not the queue has a task available. """ - if self._state.paused or not self._queue: + if self._state.paused or not self._call_queue: return False - return self._tasks[self._queue[0]].status == Status.WAITING + return self._call_queue[0].status == Status.WAITING - def _check_task_valid_to_be_returned(self, task: Task): + def _check_call_valid_to_be_returned(self, call: BlueapiCall): # Check caller has actual task object not copy # This ensures the caller has claimed the task, reducing the chance a task is # returned that is actually still being run/modified by a different process. # However if the worker crashes we then lose the Task object and can't return # the task? Needs discussion with others. - assert task is self._tasks[task.id] - assert task.id in self._queue, f"This task is not in the queue: {task}" + assert call is self._call_queue[0] + assert call.parent_task_id in self._queue, ( + f"This call has no parent task: {call}" + ) def _get_valid_position(self, position: int) -> int: if position < 0: raise NegativePositionError(f"Position must be >= 0, got {position}") if ( # if position 0 requested but a task is in progress, return position 1 - self.length - and position == 0 - and self._tasks[self._queue[0]].status != Status.WAITING + position == 0 + and self.length + and self._tasks[self._queue[0]].status != TaskStatus.QUEUED ): return 1 return position @@ -411,7 +427,7 @@ def _validate_tasks_for_move_or_deletion(self, task_ids: list[str]): task = self._tasks[task_id] if task_id not in self._queue: raise TaskNotInQueueError(f"Task {task_id} isn't present in queue") - if task.status != Status.WAITING: + if task.status != TaskStatus.QUEUED: raise TaskInProgressError( f"Cannot move task '{task_id}', it is currently in progress!" ) diff --git a/tests/unit_tests/conftest.py b/tests/unit_tests/conftest.py index e4094ae..aa46779 100644 --- a/tests/unit_tests/conftest.py +++ b/tests/unit_tests/conftest.py @@ -31,43 +31,37 @@ async def task_queue(tasks: list[Task]): @pytest.fixture async def task_queue_claimed(task_queue: TaskQueue): - first_task_id = task_queue._queue[0] - first_task = task_queue._tasks[first_task_id] - first_task.claim() + _ = task_queue.get_next_call_once_available() return task_queue @pytest.fixture -async def task_queue_in_progress(task_queue_claimed: TaskQueue): - first_task_id = task_queue_claimed._queue[0] - first_task = task_queue_claimed._tasks[first_task_id] - first_task.blueapi_id = "blueapi_id_0" - first_task.put_in_progress() - return task_queue_claimed +async def task_queue_in_progress(task_queue: TaskQueue): + first_call = await task_queue.get_next_call_once_available() + first_call.put_in_progress() + return task_queue @pytest.fixture async def task_queue_with_history(task_queue: TaskQueue): for i in range(2): - task = await task_queue.claim_next_task_once_available() - task.blueapi_id = f"blueapi_id_{i}" - task.put_in_progress() + call = await task_queue.get_next_call_once_available() + call.put_in_progress() if i % 2: - await task_queue.complete_task( - task, TaskResult(result=None, type="NoneType") + await task_queue.complete_call( + call, TaskResult(result=None, type="NoneType") ) else: - await task_queue.fail_task( - task, [TaskError(type="ValueError", message="Error during plan")] + await task_queue.fail_call( + call, [TaskError(type="ValueError", message="Error during plan")] ) # By this point should have 3 tasks in queue and 2 in history - for i, task_id in enumerate(task_queue._history): + for i, call in enumerate(task_queue._call_history): # Real timestamps will break tests - task_queue._tasks[task_id].time_started = f"2026-04-17T15:0{i}:00.000000" - task_queue._tasks[task_id].time_completed = f"2026-04-17T15:0{i}:59.000000" + call.time_started = f"2026-04-17T15:0{i}:00.000000" + call.time_completed = f"2026-04-17T15:0{i}:59.000000" - task = await task_queue.claim_next_task_once_available() - task.blueapi_id = f"blueapi_id_{2}" - task.put_in_progress() - task.time_started = "2026-04-17T15:02:00.000000" + call = await task_queue.get_next_call_once_available() + call.put_in_progress() + call.time_started = "2026-04-17T15:02:00.000000" return task_queue diff --git a/tests/unit_tests/test_queue.py b/tests/unit_tests/test_queue.py index c4ebc7f..203e65a 100644 --- a/tests/unit_tests/test_queue.py +++ b/tests/unit_tests/test_queue.py @@ -9,7 +9,7 @@ from daq_queuing_service.plugins.construct_task_request import ( construct_blueapi_call_list, ) -from daq_queuing_service.task import ExperimentDefinition, Status, Task +from daq_queuing_service.task import ExperimentDefinition, Status, Task, TaskStatus from daq_queuing_service.task_queue.queue import ( TaskQueue, TaskWithPosition, @@ -77,7 +77,7 @@ async def test_add_task_to_position_0_adds_to_position_1_if_first_task_in_progre ): new_tasks = [make_new_task("new"), make_new_task("new_2")] first_task = await task_queue_in_progress.get_task_by_position(0) - assert first_task and first_task.status == Status.IN_PROGRESS + assert first_task and first_task.status == TaskStatus.IN_PROGRESS await task_queue_in_progress.add_tasks(new_tasks, 0) @@ -98,7 +98,7 @@ async def test_add_task_to_position_0_adds_to_position_0_if_first_task_waiting( ): new_tasks = new_tasks = [make_new_task("new"), make_new_task("new_2")] first_task = await task_queue.get_task_by_position(0) - assert first_task and first_task.status == Status.WAITING + assert first_task and first_task.status == TaskStatus.QUEUED await task_queue.add_tasks(new_tasks, 0) @@ -175,30 +175,17 @@ async def test_move_task_to_position_0_moves_to_position_0_if_first_task_waiting task_queue: TaskQueue, ): task = await task_queue.get_task_by_position(0) - assert task and task.status == Status.WAITING + assert task and task.status == TaskStatus.QUEUED await task_queue.move_task("4", 0) assert task_queue._queue == ["4", "0", "1", "2", "3"] -async def test_move_task_does_not_move_task_that_is_claimed_and_raises_error( - task_queue_claimed: TaskQueue, -): - task = await task_queue_claimed.get_task_by_position(0) - assert task and task.status == Status.CLAIMED - - with pytest.raises(TaskInProgressError): - await task_queue_claimed.move_task("0", 3) - - assert task_queue_claimed._queue == ["0", "1", "2", "3", "4"] - assert set(task_queue_claimed._tasks.keys()) == {"0", "1", "2", "3", "4"} - - async def test_move_task_does_not_move_task_that_is_in_progress_and_raises_error( task_queue_in_progress: TaskQueue, ): task = await task_queue_in_progress.get_task_by_position(0) - assert task and task.status == Status.IN_PROGRESS + assert task and task.status == TaskStatus.IN_PROGRESS with pytest.raises(TaskInProgressError): await task_queue_in_progress.move_task("0", 3) @@ -211,7 +198,7 @@ async def test_move_task_raises_error_if_wrong_task_id_given( task_queue_in_progress: TaskQueue, ): task = await task_queue_in_progress.get_task_by_position(0) - assert task and task.status == Status.IN_PROGRESS + assert task and task.status == TaskStatus.IN_PROGRESS with pytest.raises(TaskNotFoundError): await task_queue_in_progress.move_task("10", 3) @@ -224,24 +211,11 @@ async def test_remove_tasks_works_as_expected(task_queue: TaskQueue): assert task_queue._queue == ["0", "1", "3"] -async def test_remove_tasks_does_not_remove_task_that_is_claimed_and_raises_error( - task_queue_claimed: TaskQueue, -): - task = await task_queue_claimed.get_task_by_position(0) - assert task and task.status == Status.CLAIMED - - with pytest.raises(TaskInProgressError): - await task_queue_claimed.cancel_tasks(["0", "2"]) - - assert task_queue_claimed._queue == ["0", "1", "2", "3", "4"] - assert set(task_queue_claimed._tasks.keys()) == {"0", "1", "2", "3", "4"} - - async def test_remove_tasks_does_not_remove_task_that_is_in_progress_and_raises_error( task_queue_in_progress: TaskQueue, ): task = await task_queue_in_progress.get_task_by_position(0) - assert task and task.status == Status.IN_PROGRESS + assert task and task.status == TaskStatus.IN_PROGRESS with pytest.raises(TaskInProgressError): await task_queue_in_progress.cancel_tasks(["0", "2"]) @@ -279,22 +253,33 @@ async def test_get_queue_only_returns_tasks_in_queue( plan_name="test", sample_id="2", instrument_session="" ), id="2", - status=Status.IN_PROGRESS, - time_started="2026-04-17T15:02:00.000000", - time_completed=None, - errors=[], + blueapi_calls=[ + BlueapiCall( + task_request=TaskRequest(name="test", instrument_session=""), + parent_task_id="2", + status=Status.IN_PROGRESS, + time_started="2026-04-17T15:02:00.000000", + time_completed=None, + errors=[], + ) + ], position=0, - blueapi_id="blueapi_id_2", ), TaskWithPosition( experiment_definition=ExperimentDefinition( plan_name="test", sample_id="3", instrument_session="" ), id="3", - status=Status.WAITING, - time_started=None, - time_completed=None, - errors=[], + blueapi_calls=[ + BlueapiCall( + task_request=TaskRequest(name="test", instrument_session=""), + parent_task_id="3", + status=Status.WAITING, + time_started=None, + time_completed=None, + errors=[], + ) + ], position=1, ), TaskWithPosition( @@ -302,10 +287,16 @@ async def test_get_queue_only_returns_tasks_in_queue( plan_name="test", sample_id="4", instrument_session="" ), id="4", - status=Status.WAITING, - time_started=None, - time_completed=None, - errors=[], + blueapi_calls=[ + BlueapiCall( + task_request=TaskRequest(name="test", instrument_session=""), + parent_task_id="4", + status=Status.WAITING, + time_started=None, + time_completed=None, + errors=[], + ) + ], position=2, ), ] @@ -322,30 +313,42 @@ async def test_get_history_only_returns_tasks_in_history( plan_name="test", sample_id="0", instrument_session="" ), id="0", - status=Status.ERROR, - time_started="2026-04-17T15:00:00.000000", - time_completed="2026-04-17T15:00:59.000000", - errors=[ - TaskError( - outcome="error", type="ValueError", message="Error during plan" + blueapi_calls=[ + BlueapiCall( + task_request=TaskRequest(name="test", instrument_session=""), + parent_task_id="0", + status=Status.ERROR, + time_started="2026-04-17T15:00:00.000000", + time_completed="2026-04-17T15:00:59.000000", + errors=[ + TaskError( + outcome="error", + type="ValueError", + message="Error during plan", + ) + ], + result=None, ) ], position=None, - blueapi_id="blueapi_id_0", - result=None, ), TaskWithPosition( experiment_definition=ExperimentDefinition( plan_name="test", sample_id="1", instrument_session="" ), id="1", - status=Status.SUCCESS, - time_started="2026-04-17T15:01:00.000000", - time_completed="2026-04-17T15:01:59.000000", - errors=[], + blueapi_calls=[ + BlueapiCall( + task_request=TaskRequest(name="test", instrument_session=""), + parent_task_id="1", + status=Status.SUCCESS, + time_started="2026-04-17T15:01:00.000000", + time_completed="2026-04-17T15:01:59.000000", + errors=[], + result=TaskResult(result=None, type="NoneType"), + ) + ], position=None, - blueapi_id="blueapi_id_1", - result=TaskResult(result=None, type="NoneType"), ), ] @@ -362,66 +365,93 @@ async def test_get_tasks_returns_tasks_in_queue_and_history( plan_name="test", sample_id="0", instrument_session="" ), id="0", - status=Status.ERROR, - time_started="2026-04-17T15:00:00.000000", - time_completed="2026-04-17T15:00:59.000000", - errors=[ - TaskError( - outcome="error", type="ValueError", message="Error during plan" + blueapi_calls=[ + BlueapiCall( + task_request=TaskRequest(name="test", instrument_session=""), + parent_task_id="0", + status=Status.ERROR, + time_started="2026-04-17T15:00:00.000000", + time_completed="2026-04-17T15:00:59.000000", + errors=[ + TaskError( + outcome="error", + type="ValueError", + message="Error during plan", + ) + ], + result=None, ) ], position=None, - blueapi_id="blueapi_id_0", - result=None, ), TaskWithPosition( experiment_definition=ExperimentDefinition( plan_name="test", sample_id="1", instrument_session="" ), id="1", - status=Status.SUCCESS, - time_started="2026-04-17T15:01:00.000000", - time_completed="2026-04-17T15:01:59.000000", - errors=[], + blueapi_calls=[ + BlueapiCall( + task_request=TaskRequest(name="test", instrument_session=""), + parent_task_id="1", + status=Status.SUCCESS, + time_started="2026-04-17T15:01:00.000000", + time_completed="2026-04-17T15:01:59.000000", + errors=[], + result=TaskResult(result=None, type="NoneType"), + ) + ], position=None, - blueapi_id="blueapi_id_1", - result=TaskResult(result=None, type="NoneType"), ), TaskWithPosition( experiment_definition=ExperimentDefinition( plan_name="test", sample_id="2", instrument_session="" ), id="2", - status=Status.IN_PROGRESS, - time_started="2026-04-17T15:02:00.000000", - time_completed=None, - errors=[], + blueapi_calls=[ + BlueapiCall( + task_request=TaskRequest(name="test", instrument_session=""), + parent_task_id="2", + status=Status.IN_PROGRESS, + time_started="2026-04-17T15:02:00.000000", + time_completed=None, + errors=[], + ) + ], position=0, - blueapi_id="blueapi_id_2", ), TaskWithPosition( experiment_definition=ExperimentDefinition( plan_name="test", sample_id="3", instrument_session="" ), id="3", - status=Status.WAITING, - time_started=None, - time_completed=None, - errors=[], + blueapi_calls=[ + BlueapiCall( + task_request=TaskRequest(name="test", instrument_session=""), + parent_task_id="3", + status=Status.WAITING, + time_started=None, + time_completed=None, + errors=[], + ) + ], position=1, - blueapi_id=None, ), TaskWithPosition( experiment_definition=ExperimentDefinition( plan_name="test", sample_id="4", instrument_session="" ), id="4", - status=Status.WAITING, - time_started=None, - time_completed=None, - errors=[], + blueapi_calls=[ + BlueapiCall( + task_request=TaskRequest(name="test", instrument_session=""), + parent_task_id="4", + status=Status.WAITING, + time_started=None, + time_completed=None, + errors=[], + ) + ], position=2, - blueapi_id=None, ), ] @@ -490,169 +520,164 @@ async def test_pausing_queue_prevents_task_from_being_claimed(task_queue: TaskQu await task_queue.update_state(paused=True) assert task_queue.state.paused with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for( - task_queue.claim_next_task_once_available(), timeout=0.05 - ) + await asyncio.wait_for(task_queue.get_next_call_once_available(), timeout=0.05) async def test_unpausing_queue_allows_tasks_to_being_claimed(task_queue: TaskQueue): await task_queue.update_state(paused=False) assert not task_queue.state.paused - await task_queue.claim_next_task_once_available() + await task_queue.get_next_call_once_available() -async def test_claim_next_task_once_available_claims_task_and_returns( +async def test_claim_next_call_once_available_claims_task_and_returns( task_queue: TaskQueue, ): next_task = task_queue._tasks[task_queue._queue[0]] - assert next_task and next_task.status == Status.WAITING + next_call = next_task.blueapi_calls[0] + assert next_task and next_task.status == TaskStatus.QUEUED + assert next_call.status == Status.WAITING - claimed_task = await task_queue.claim_next_task_once_available() - assert claimed_task is next_task - assert claimed_task.status == Status.CLAIMED + claimed_call = await task_queue.get_next_call_once_available() + assert claimed_call.parent_task_id == next_call.parent_task_id + assert claimed_call.status == Status.CLAIMED -async def test_claim_next_task_once_available_waits_if_next_task_is_already_claimed( +async def test_claim_next_call_once_available_waits_if_next_task_is_already_claimed( task_queue: TaskQueue, ): - claimed_task = await task_queue.claim_next_task_once_available() - assert claimed_task and claimed_task.status == Status.CLAIMED + claimed_call = await task_queue.get_next_call_once_available() + assert claimed_call and claimed_call.status == Status.CLAIMED with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for( - task_queue.claim_next_task_once_available(), timeout=0.05 - ) + await asyncio.wait_for(task_queue.get_next_call_once_available(), timeout=0.05) -async def test_claim_next_task_once_available_waits_if_next_task_is_already_in_progress( +async def test_claim_next_call_once_available_waits_if_next_task_is_already_in_progress( task_queue: TaskQueue, ): - claimed_task = await task_queue.claim_next_task_once_available() - claimed_task.blueapi_id = "blueapi_id" - claimed_task.put_in_progress() - assert claimed_task and claimed_task.status == Status.IN_PROGRESS + claimed_call = await task_queue.get_next_call_once_available() + claimed_call.put_in_progress() + assert claimed_call and claimed_call.status == Status.IN_PROGRESS with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for( - task_queue.claim_next_task_once_available(), timeout=0.05 - ) + await asyncio.wait_for(task_queue.get_next_call_once_available(), timeout=0.05) -async def test_wait_until_task_available_waits_if_next_task_is_claimed( +async def test_wait_until_call_available_waits_if_next_task_is_claimed( task_queue: TaskQueue, ): - claimed_task = await task_queue.claim_next_task_once_available() - assert claimed_task and claimed_task.status == Status.CLAIMED + claimed_call = await task_queue.get_next_call_once_available() + assert claimed_call and claimed_call.status == Status.CLAIMED with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(task_queue.wait_until_task_available(), timeout=0.05) + await asyncio.wait_for(task_queue.wait_until_call_available(), timeout=0.05) -async def test_wait_until_task_available_waits_if_next_task_is_in_progress( +async def test_wait_until_call_available_waits_if_next_task_is_in_progress( task_queue: TaskQueue, ): - claimed_task = await task_queue.claim_next_task_once_available() - claimed_task.blueapi_id = "blueapi_id" - claimed_task.put_in_progress() - assert claimed_task and claimed_task.status == Status.IN_PROGRESS + claimed_call = await task_queue.get_next_call_once_available() + claimed_call.put_in_progress() + assert claimed_call and claimed_call.status == Status.IN_PROGRESS with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(task_queue.wait_until_task_available(), timeout=0.05) + await asyncio.wait_for(task_queue.wait_until_call_available(), timeout=0.05) -async def test_wait_until_task_available_waits_if_queue_paused( +async def test_wait_until_call_available_waits_if_queue_paused( task_queue: TaskQueue, ): await task_queue.update_state(paused=True) with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(task_queue.wait_until_task_available(), timeout=0.05) + await asyncio.wait_for(task_queue.wait_until_call_available(), timeout=0.05) -async def test_wait_until_task_available_waits_if_queue_empty(): +async def test_wait_until_call_available_waits_if_queue_empty(): task_queue = TaskQueue(construct_blueapi_call_list) with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(task_queue.wait_until_task_available(), timeout=0.05) + await asyncio.wait_for(task_queue.wait_until_call_available(), timeout=0.05) -async def test_wait_until_task_available_does_not_wait_if_conditions_met( +async def test_wait_until_call_available_does_not_wait_if_conditions_met( task_queue: TaskQueue, ): - await asyncio.wait_for(task_queue.wait_until_task_available(), timeout=0.05) + await asyncio.wait_for(task_queue.wait_until_call_available(), timeout=0.05) -async def test_complete_task_puts_task_in_history_and_updates_status_to_complete( +async def test_complete_call_puts_call_in_history_and_updates_status_to_complete( task_queue: TaskQueue, ): - task = await task_queue.claim_next_task_once_available() - task.blueapi_id = "blueapi_id" - task.put_in_progress() - assert task.status == Status.IN_PROGRESS - await task_queue.complete_task(task, TaskResult(result=None, type="NoneType")) - assert task.id not in task_queue._queue - assert task.id in task_queue._history - assert task.status == Status.SUCCESS + call = await task_queue.get_next_call_once_available() + call.put_in_progress() + assert call.status == Status.IN_PROGRESS + await task_queue.complete_call(call, TaskResult(result=None, type="NoneType")) + assert call.parent_task_id not in task_queue._queue + assert call.parent_task_id in task_queue._history + assert call not in task_queue._call_queue + assert call in task_queue._call_history + assert call.status == Status.SUCCESS -async def test_complete_task_must_receive_exact_same_object_as_was_claimed( +async def test_complete_call_must_receive_exact_same_object_as_was_claimed( task_queue: TaskQueue, ): - task = await task_queue.claim_next_task_once_available() - similar_task = task.model_copy() - another_similar_task = copy.copy(task) + call = await task_queue.get_next_call_once_available() + similar_call = call.model_copy() + another_similar_call = copy.copy(call) with pytest.raises(AssertionError): - await task_queue.complete_task( - similar_task, TaskResult(result=None, type="NoneType") + await task_queue.complete_call( + similar_call, TaskResult(result=None, type="NoneType") ) with pytest.raises(AssertionError): - await task_queue.complete_task( - another_similar_task, TaskResult(result=None, type="NoneType") + await task_queue.complete_call( + another_similar_call, TaskResult(result=None, type="NoneType") ) -async def test_fail_task_puts_task_in_history_and_updates_status_to_complete( +async def test_fail_call_puts_task_in_history_and_updates_status_to_complete( task_queue: TaskQueue, ): - task = await task_queue.claim_next_task_once_available() - assert task.status == Status.CLAIMED - await task_queue.fail_task(task) - assert task.id not in task_queue._queue - assert task.id in task_queue._history - assert task.status == Status.ERROR + call = await task_queue.get_next_call_once_available() + assert call.status == Status.CLAIMED + await task_queue.fail_call(call) + assert call.parent_task_id not in task_queue._queue + assert call.parent_task_id in task_queue._history + assert call not in task_queue._call_queue + assert call in task_queue._call_history + assert call.status == Status.ERROR -async def test_fail_task_must_receive_exact_same_object_as_was_claimed( +async def test_fail_call_must_receive_exact_same_object_as_was_claimed( task_queue: TaskQueue, ): - task = await task_queue.claim_next_task_once_available() - similar_task = task.model_copy() - another_similar_task = copy.copy(task) + call = await task_queue.get_next_call_once_available() + similar_call = call.model_copy() + another_similar_call = copy.copy(call) with pytest.raises(AssertionError): - await task_queue.fail_task(similar_task) + await task_queue.fail_call(similar_call) with pytest.raises(AssertionError): - await task_queue.fail_task(another_similar_task) + await task_queue.fail_call(another_similar_call) -async def test_fail_task_with_errors_adds_errors_to_task( +async def test_fail_call_with_errors_adds_errors_to_call( task_queue: TaskQueue, ): - task = await task_queue.claim_next_task_once_available() + call = await task_queue.get_next_call_once_available() error = "This task failed" - await task_queue.fail_task(task, [str(error)]) - assert task.status == Status.ERROR - assert task.errors == ["This task failed"] - assert task.id in task_queue._history - assert task.id not in task_queue._queue + await task_queue.fail_call(call, [str(error)]) + assert call.status == Status.ERROR + assert call.errors == ["This task failed"] -async def test_return_task_to_queue_changes_task_status_to_waiting( +async def test_return_call_to_queue_changes_task_status_to_waiting( task_queue: TaskQueue, ): - task = await task_queue.claim_next_task_once_available() - assert task.status == Status.CLAIMED - await task_queue.return_task_to_queue(task) - assert task.status == Status.WAITING + call = await task_queue.get_next_call_once_available() + assert call.status == Status.CLAIMED + await task_queue.return_call_to_queue(call) + assert call.status == Status.WAITING async def test_return_task_to_queue_raises_error_if_task_has_not_been_claimed( task_queue: TaskQueue, ): - task = await task_queue.claim_next_task_once_available() - task.status = Status.SUCCESS + call = await task_queue.get_next_call_once_available() + call.status = Status.SUCCESS with pytest.raises(TaskNotClaimedError): - await task_queue.return_task_to_queue(task) + await task_queue.return_call_to_queue(call) diff --git a/tests/unit_tests/test_task.py b/tests/unit_tests/test_task.py index 4c7c70c..43034ed 100644 --- a/tests/unit_tests/test_task.py +++ b/tests/unit_tests/test_task.py @@ -1,148 +1,42 @@ import pytest -from blueapi.worker.event import TaskResult +from blueapi.service.model import TaskRequest -from daq_queuing_service.task import ExperimentDefinition, Status, Task +from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall, Status +from daq_queuing_service.task import ExperimentDefinition, Task, TaskStatus @pytest.mark.parametrize( - "old_status, new_status", + "blueapi_call_statuses, expected_task_status", [ - [Status.WAITING, Status.WAITING], - [Status.WAITING, Status.IN_PROGRESS], - [Status.WAITING, Status.SUCCESS], - [Status.WAITING, Status.ERROR], - [Status.CLAIMED, Status.CLAIMED], - [Status.CLAIMED, Status.SUCCESS], - [Status.CLAIMED, Status.CANCELLED], - [Status.IN_PROGRESS, Status.WAITING], - [Status.IN_PROGRESS, Status.CLAIMED], - [Status.IN_PROGRESS, Status.IN_PROGRESS], - [Status.IN_PROGRESS, Status.CANCELLED], - [Status.SUCCESS, Status.WAITING], - [Status.SUCCESS, Status.CLAIMED], - [Status.SUCCESS, Status.IN_PROGRESS], - [Status.SUCCESS, Status.SUCCESS], - [Status.SUCCESS, Status.ERROR], - [Status.SUCCESS, Status.CANCELLED], - [Status.ERROR, Status.WAITING], - [Status.ERROR, Status.CLAIMED], - [Status.ERROR, Status.IN_PROGRESS], - [Status.ERROR, Status.SUCCESS], - [Status.ERROR, Status.ERROR], - [Status.ERROR, Status.CANCELLED], - [Status.CANCELLED, Status.WAITING], - [Status.CANCELLED, Status.CLAIMED], - [Status.CANCELLED, Status.IN_PROGRESS], - [Status.CANCELLED, Status.SUCCESS], - [Status.CANCELLED, Status.ERROR], - [Status.CANCELLED, Status.CANCELLED], + ([Status.WAITING], TaskStatus.QUEUED), + ([Status.CLAIMED], TaskStatus.IN_PROGRESS), + ([Status.IN_PROGRESS], TaskStatus.IN_PROGRESS), + ([Status.SUCCESS], TaskStatus.COMPLETE), + ([Status.ERROR], TaskStatus.COMPLETE), + ([Status.WAITING, Status.WAITING], TaskStatus.QUEUED), + ([Status.WAITING, Status.CLAIMED], TaskStatus.IN_PROGRESS), + ([Status.WAITING, Status.IN_PROGRESS], TaskStatus.IN_PROGRESS), + ([Status.WAITING, Status.SUCCESS], TaskStatus.IN_PROGRESS), + ([Status.WAITING, Status.ERROR], TaskStatus.IN_PROGRESS), + ([Status.CLAIMED, Status.SUCCESS], TaskStatus.IN_PROGRESS), + ([Status.CLAIMED, Status.ERROR], TaskStatus.IN_PROGRESS), + ([Status.IN_PROGRESS, Status.SUCCESS], TaskStatus.IN_PROGRESS), + ([Status.IN_PROGRESS, Status.ERROR], TaskStatus.IN_PROGRESS), + ([Status.SUCCESS, Status.ERROR], TaskStatus.COMPLETE), ], ) -def test__update_status_raises_error_when_transitioned_to_wrong_status( - old_status: Status, new_status: Status +def test_task_status_derived_correctly_from_call_statuses( + blueapi_call_statuses: list[Status], expected_task_status: TaskStatus ): task = Task( + blueapi_calls=[ + BlueapiCall( + status=status, task_request=TaskRequest(name="", instrument_session="") + ) + for status in blueapi_call_statuses + ], experiment_definition=ExperimentDefinition( - plan_name="test", sample_id="sample", instrument_session="" + plan_name="", sample_id="", instrument_session="" ), - status=old_status, ) - with pytest.raises(ValueError): - task._update_status(new_status) - - -@pytest.mark.parametrize( - "old_status, new_status", - [ - [Status.WAITING, Status.CLAIMED], - [Status.WAITING, Status.CANCELLED], - [Status.CLAIMED, Status.WAITING], - [Status.CLAIMED, Status.IN_PROGRESS], - [Status.CLAIMED, Status.ERROR], - [Status.IN_PROGRESS, Status.SUCCESS], - [Status.IN_PROGRESS, Status.ERROR], - ], -) -def test__update_status_changes_status_when_correct_new_status_given( - old_status: Status, new_status: Status -): - task = Task( - experiment_definition=ExperimentDefinition( - plan_name="test", sample_id="sample", instrument_session="" - ), - status=old_status, - ) - task._update_status(new_status) - assert task.status == new_status - - -def test_wait_updates_status_to_waiting(): - task = Task( - experiment_definition=ExperimentDefinition( - plan_name="test", sample_id="sample", instrument_session="" - ), - status=Status.CLAIMED, - ) - task.wait() - assert task.status == Status.WAITING - - -def test_claim_updates_status_to_claimed(): - task = Task( - experiment_definition=ExperimentDefinition( - plan_name="test", sample_id="sample", instrument_session="" - ), - status=Status.WAITING, - ) - task.claim() - assert task.status == Status.CLAIMED - - -def test_put_in_progress_updates_status_to_in_progress_and_adds_fields(): - task = Task( - experiment_definition=ExperimentDefinition( - plan_name="test", sample_id="sample", instrument_session="" - ), - status=Status.CLAIMED, - ) - task.blueapi_id = "blueapi_id" - task.put_in_progress() - assert task.status == Status.IN_PROGRESS - assert task.time_started is not None - assert task.blueapi_id == "blueapi_id" - - -def test_succeed_updates_status_to_success_and_adds_time_completed(): - task = Task( - experiment_definition=ExperimentDefinition( - plan_name="test", sample_id="sample", instrument_session="" - ), - status=Status.IN_PROGRESS, - ) - task.succeed(TaskResult(result=None, type="NoneType")) - assert task.status == Status.SUCCESS - assert task.time_completed is not None - - -def test_fail_updates_status_to_error_and_adds_time_completed_and_errors(): - task = Task( - experiment_definition=ExperimentDefinition( - plan_name="test", sample_id="sample", instrument_session="" - ), - status=Status.IN_PROGRESS, - ) - task.fail(["errors", "more_errors"]) - assert task.status == Status.ERROR - assert task.time_completed is not None - assert task.errors == ["errors", "more_errors"] - - -def test_cancel_updates_status_to_cancelled(): - task = Task( - experiment_definition=ExperimentDefinition( - plan_name="test", sample_id="sample", instrument_session="" - ), - status=Status.WAITING, - ) - task.cancel() - assert task.status == Status.CANCELLED + assert task.status == expected_task_status From ce78f0737ae6579fd3a020b3a7c8d945166b0840 Mon Sep 17 00:00:00 2001 From: Jacob Williamson Date: Mon, 11 May 2026 17:02:37 +0100 Subject: [PATCH 06/14] Rename statuses and update worker --- .../blueapi_interaction/blueapi_call.py | 32 ++++++----- src/daq_queuing_service/task.py | 24 +++++---- src/daq_queuing_service/task_queue/queue.py | 20 +++---- src/daq_queuing_service/worker/worker.py | 53 +++++++++---------- tests/unit_tests/test_task.py | 36 ++++++------- tests/unit_tests/test_worker.py | 30 ++++++----- 6 files changed, 105 insertions(+), 90 deletions(-) diff --git a/src/daq_queuing_service/blueapi_interaction/blueapi_call.py b/src/daq_queuing_service/blueapi_interaction/blueapi_call.py index d01b960..b2f163e 100644 --- a/src/daq_queuing_service/blueapi_interaction/blueapi_call.py +++ b/src/daq_queuing_service/blueapi_interaction/blueapi_call.py @@ -6,7 +6,7 @@ from pydantic import BaseModel, Field -class Status(StrEnum): +class CallStatus(StrEnum): WAITING = "Waiting" # Waiting in the queue CLAIMED = "Claimed" # Claimed by the worker IN_PROGRESS = "In progress" # In progress inside BlueAPI @@ -15,12 +15,16 @@ class Status(StrEnum): @property def allowed_transitions(self): - allowed_transitions: dict[Status, set[Status]] = { # from: to - Status.WAITING: {Status.CLAIMED}, - Status.CLAIMED: {Status.WAITING, Status.IN_PROGRESS, Status.ERROR}, - Status.IN_PROGRESS: {Status.SUCCESS, Status.ERROR}, - Status.SUCCESS: set(), - Status.ERROR: set(), + allowed_transitions: dict[CallStatus, set[CallStatus]] = { # from: to + CallStatus.WAITING: {CallStatus.CLAIMED}, + CallStatus.CLAIMED: { + CallStatus.WAITING, + CallStatus.IN_PROGRESS, + CallStatus.ERROR, + }, + CallStatus.IN_PROGRESS: {CallStatus.SUCCESS, CallStatus.ERROR}, + CallStatus.SUCCESS: set(), + CallStatus.ERROR: set(), } return allowed_transitions[self] @@ -28,14 +32,14 @@ def allowed_transitions(self): class BlueapiCall(BaseModel): task_request: TaskRequest parent_task_id: str | None = None - status: Status = Status.WAITING + status: CallStatus = CallStatus.WAITING time_started: str | None = None time_completed: str | None = None result: TaskResult | None = None errors: list[str | TaskError] = Field(default_factory=list[str | TaskError]) blueapi_id: str | None = None - def _update_status(self, new_status: Status): + def _update_status(self, new_status: CallStatus): """Updates the status of the task, checking that the transition is valid Args: @@ -55,17 +59,17 @@ def _update_status(self, new_status: Status): def wait(self): """Updates the task status to WAITING""" - self._update_status(Status.WAITING) + self._update_status(CallStatus.WAITING) def claim(self): """Updates the task status to CLAIMED""" - self._update_status(Status.CLAIMED) + self._update_status(CallStatus.CLAIMED) def put_in_progress(self): """Updates the task status to IN_PROGRESS and sets the time_started field to the current time """ - self._update_status(Status.IN_PROGRESS) + self._update_status(CallStatus.IN_PROGRESS) self.time_started = datetime.now().isoformat() def succeed(self, result: TaskResult): @@ -75,7 +79,7 @@ def succeed(self, result: TaskResult): Args: result (TaskResult): The result of the task from blueapi """ - self._update_status(Status.SUCCESS) + self._update_status(CallStatus.SUCCESS) self.result = result self.time_completed = datetime.now().isoformat() @@ -87,7 +91,7 @@ def fail(self, errors: list[str | TaskError] | None = None): errors (list[str | TaskError] | None, optional): List of errors that occurred when trying to run the task. Defaults to None. """ - self._update_status(Status.ERROR) + self._update_status(CallStatus.ERROR) self.time_completed = datetime.now().isoformat() if errors: self.errors.extend(errors) diff --git a/src/daq_queuing_service/task.py b/src/daq_queuing_service/task.py index e890c70..331b99c 100644 --- a/src/daq_queuing_service/task.py +++ b/src/daq_queuing_service/task.py @@ -5,14 +5,14 @@ from blueapi.service.model import StrEnum from pydantic import BaseModel, Field -from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall, Status +from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall, CallStatus def create_uuid_str() -> str: return str(uuid4()) -class TaskStatus(StrEnum): +class Status(StrEnum): QUEUED = "Queued" IN_PROGRESS = "In progress" COMPLETE = "Complete" @@ -38,20 +38,26 @@ def cancel(self): self._cancelled = True @property - def status(self) -> TaskStatus: + def status(self) -> Status: if self._cancelled: - return TaskStatus.CANCELLED + return Status.CANCELLED if self.blueapi_calls and all( - call.status in [Status.SUCCESS, Status.ERROR] for call in self.blueapi_calls + call.status in [CallStatus.SUCCESS, CallStatus.ERROR] + for call in self.blueapi_calls ): - return TaskStatus.COMPLETE + return Status.COMPLETE if any( call.status - in [Status.IN_PROGRESS, Status.CLAIMED, Status.SUCCESS, Status.ERROR] + in [ + Status.IN_PROGRESS, + CallStatus.CLAIMED, + CallStatus.SUCCESS, + CallStatus.ERROR, + ] for call in self.blueapi_calls ): - return TaskStatus.IN_PROGRESS - return TaskStatus.QUEUED + return Status.IN_PROGRESS + return Status.QUEUED class TaskWithPosition(Task): diff --git a/src/daq_queuing_service/task_queue/queue.py b/src/daq_queuing_service/task_queue/queue.py index 8cc40f3..49da306 100644 --- a/src/daq_queuing_service/task_queue/queue.py +++ b/src/daq_queuing_service/task_queue/queue.py @@ -7,8 +7,8 @@ from blueapi.worker.event import TaskError, TaskResult from pydantic import BaseModel -from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall, Status -from daq_queuing_service.task import Task, TaskStatus, TaskWithPosition +from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall, CallStatus +from daq_queuing_service.task import Status, Task, TaskWithPosition from daq_queuing_service.task_queue.queue_utils import ( NegativePositionError, TaskIdInUseError, @@ -68,10 +68,10 @@ def __init__( def _update_call_queue(self): for task_id in self._queue: task = self._tasks[task_id] - if task.status == TaskStatus.COMPLETE: + if task.status == Status.COMPLETE: self._queue.remove(task_id) self._history.append(task_id) - elif task.status != TaskStatus.IN_PROGRESS: + elif task.status != Status.IN_PROGRESS: task.blueapi_calls = [] self._call_queue = [ @@ -79,11 +79,11 @@ def _update_call_queue(self): for call in self._call_queue if call.parent_task_id and call.parent_task_id in self._queue - and self._tasks[call.parent_task_id].status == TaskStatus.IN_PROGRESS + and self._tasks[call.parent_task_id].status == Status.IN_PROGRESS ] new_queue = self._convert( - [task for task in self._get_queue() if task.status == TaskStatus.QUEUED], + [task for task in self._get_queue() if task.status == Status.QUEUED], self._get_history(), self._queue_history, ) @@ -134,7 +134,7 @@ async def return_call_to_queue(self, call: BlueapiCall): self._check_call_valid_to_be_returned(call) async with self._modifying: match call.status: - case Status.CLAIMED: + case CallStatus.CLAIMED: assert call == self._call_queue[0] call.wait() case _: @@ -350,7 +350,7 @@ def _task_available(self) -> bool: """ if self._state.paused or not self._call_queue: return False - return self._call_queue[0].status == Status.WAITING + return self._call_queue[0].status == CallStatus.WAITING def _check_call_valid_to_be_returned(self, call: BlueapiCall): # Check caller has actual task object not copy @@ -369,7 +369,7 @@ def _get_valid_position(self, position: int) -> int: if ( # if position 0 requested but a task is in progress, return position 1 position == 0 and self.length - and self._tasks[self._queue[0]].status != TaskStatus.QUEUED + and self._tasks[self._queue[0]].status != Status.QUEUED ): return 1 return position @@ -427,7 +427,7 @@ def _validate_tasks_for_move_or_deletion(self, task_ids: list[str]): task = self._tasks[task_id] if task_id not in self._queue: raise TaskNotInQueueError(f"Task {task_id} isn't present in queue") - if task.status != TaskStatus.QUEUED: + if task.status != Status.QUEUED: raise TaskInProgressError( f"Cannot move task '{task_id}', it is currently in progress!" ) diff --git a/src/daq_queuing_service/worker/worker.py b/src/daq_queuing_service/worker/worker.py index 09ff909..bb486e4 100644 --- a/src/daq_queuing_service/worker/worker.py +++ b/src/daq_queuing_service/worker/worker.py @@ -16,7 +16,8 @@ from blueapi.worker.event import TaskError, TaskResult from daq_queuing_service.blueapi_interaction.blueapi_adapter import BlueapiClientAdapter -from daq_queuing_service.task import ExperimentDefinition, Status, Task +from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall, CallStatus +from daq_queuing_service.task import ExperimentDefinition from daq_queuing_service.task_queue.queue import TaskQueue LOGGER = logging.getLogger(__name__) @@ -38,16 +39,16 @@ def __init__( async def run_loop(self): while True: - next_task = await self._wait_for_next_task() - await self._process_task(next_task) + next_call = await self._wait_for_next_task() + await self._process_call(next_call) self._at_loop_end() def _at_loop_end(self): LOGGER.info("Loop finished") - async def _wait_for_next_task(self): + async def _wait_for_next_task(self) -> BlueapiCall: while True: - await self._queue.wait_until_task_available() + await self._queue.wait_until_call_available() result = await self._client.get_state() if result.value == WorkerState.IDLE: break @@ -55,18 +56,18 @@ async def _wait_for_next_task(self): f"Waiting for BlueAPI worker to be IDLE, currently {result.value}" ) await asyncio.sleep(self.poll_time_s) - return await self._queue.claim_next_task_once_available() + return await self._queue.get_next_call_once_available() - async def _process_task(self, task: Task): - task_request = self._task_request_constructor(task.experiment_definition) - LOGGER.info(f"Sending task {task.id} to BlueAPI") + async def _process_call(self, call: BlueapiCall): + task_request = call.task_request + LOGGER.info(f"Sending call {call} to BlueAPI") result = await self._client.run_task( - task_request, on_event=partial(self._on_blueapi_event, task=task) + task_request, on_event=partial(self._on_blueapi_event, call=call) ) if result.error: - await self._handle_run_task_error(task, result.error) + await self._handle_run_task_error(call, result.error) return assert result.value @@ -75,28 +76,27 @@ async def _process_task(self, task: Task): match task_status.result: case TaskResult(): LOGGER.debug( - f"Task {task.id} completed succesfully: {task_status.result}" + f"Call {call} completed succesfully: {task_status.result}" ) - print("Task completed") - await self._queue.complete_task(task, task_status.result) + await self._queue.complete_call(call, task_status.result) case TaskError(): - LOGGER.debug(f"Task {task.id} failed: {task_status.result}") - await self._queue.fail_task(task, [task_status.result]) + LOGGER.debug(f"Call {call} failed: {task_status.result}") + await self._queue.fail_call(call, [task_status.result]) @staticmethod - def _on_blueapi_event(event: AnyEvent, task: Task): + def _on_blueapi_event(event: AnyEvent, call: BlueapiCall): match event: case WorkerEvent() as worker_event: if ( worker_event.state == WorkerState.RUNNING - and task.status != Status.IN_PROGRESS + and call.status != CallStatus.IN_PROGRESS ): assert worker_event.task_status - task.blueapi_id = worker_event.task_status.task_id + call.blueapi_id = worker_event.task_status.task_id LOGGER.info( - f"Task {task.id} is in progress, blueapi ID: " + task.blueapi_id + f"Call {call} is in progress, blueapi ID: {call.blueapi_id}" ) - task.put_in_progress() + call.put_in_progress() case ProgressEvent(): pass @@ -105,7 +105,7 @@ def _on_blueapi_event(event: AnyEvent, task: Task): async def _handle_run_task_error( self, - task: Task, + call: BlueapiCall, error: InvalidParametersError | UnknownPlanError | ServiceUnavailableError @@ -114,12 +114,11 @@ async def _handle_run_task_error( ): match error: case InvalidParametersError(): - await self._queue.fail_task( - task, - errors=[str(error) for error in error.errors], + await self._queue.fail_call( + call, errors=[str(error) for error in error.errors] ) case UnknownPlanError(): - await self._queue.fail_task(task, ["Unknown plan", str(error)]) + await self._queue.fail_call(call, ["Unknown plan", str(error)]) case BlueskyRemoteControlError() | ServiceUnavailableError(): # We get this error if the blueapi worker is busy or unavailable - await self._queue.return_task_to_queue(task) + await self._queue.return_call_to_queue(call) diff --git a/tests/unit_tests/test_task.py b/tests/unit_tests/test_task.py index 43034ed..a55db12 100644 --- a/tests/unit_tests/test_task.py +++ b/tests/unit_tests/test_task.py @@ -1,32 +1,32 @@ import pytest from blueapi.service.model import TaskRequest -from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall, Status -from daq_queuing_service.task import ExperimentDefinition, Task, TaskStatus +from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall, CallStatus +from daq_queuing_service.task import ExperimentDefinition, Status, Task @pytest.mark.parametrize( "blueapi_call_statuses, expected_task_status", [ - ([Status.WAITING], TaskStatus.QUEUED), - ([Status.CLAIMED], TaskStatus.IN_PROGRESS), - ([Status.IN_PROGRESS], TaskStatus.IN_PROGRESS), - ([Status.SUCCESS], TaskStatus.COMPLETE), - ([Status.ERROR], TaskStatus.COMPLETE), - ([Status.WAITING, Status.WAITING], TaskStatus.QUEUED), - ([Status.WAITING, Status.CLAIMED], TaskStatus.IN_PROGRESS), - ([Status.WAITING, Status.IN_PROGRESS], TaskStatus.IN_PROGRESS), - ([Status.WAITING, Status.SUCCESS], TaskStatus.IN_PROGRESS), - ([Status.WAITING, Status.ERROR], TaskStatus.IN_PROGRESS), - ([Status.CLAIMED, Status.SUCCESS], TaskStatus.IN_PROGRESS), - ([Status.CLAIMED, Status.ERROR], TaskStatus.IN_PROGRESS), - ([Status.IN_PROGRESS, Status.SUCCESS], TaskStatus.IN_PROGRESS), - ([Status.IN_PROGRESS, Status.ERROR], TaskStatus.IN_PROGRESS), - ([Status.SUCCESS, Status.ERROR], TaskStatus.COMPLETE), + ([CallStatus.WAITING], Status.QUEUED), + ([CallStatus.CLAIMED], Status.IN_PROGRESS), + ([CallStatus.IN_PROGRESS], Status.IN_PROGRESS), + ([CallStatus.SUCCESS], Status.COMPLETE), + ([CallStatus.ERROR], Status.COMPLETE), + ([CallStatus.WAITING, CallStatus.WAITING], Status.QUEUED), + ([CallStatus.WAITING, CallStatus.CLAIMED], Status.IN_PROGRESS), + ([CallStatus.WAITING, CallStatus.IN_PROGRESS], Status.IN_PROGRESS), + ([CallStatus.WAITING, CallStatus.SUCCESS], Status.IN_PROGRESS), + ([CallStatus.WAITING, CallStatus.ERROR], Status.IN_PROGRESS), + ([CallStatus.CLAIMED, CallStatus.SUCCESS], Status.IN_PROGRESS), + ([CallStatus.CLAIMED, CallStatus.ERROR], Status.IN_PROGRESS), + ([CallStatus.IN_PROGRESS, CallStatus.SUCCESS], Status.IN_PROGRESS), + ([CallStatus.IN_PROGRESS, CallStatus.ERROR], Status.IN_PROGRESS), + ([CallStatus.SUCCESS, CallStatus.ERROR], Status.COMPLETE), ], ) def test_task_status_derived_correctly_from_call_statuses( - blueapi_call_statuses: list[Status], expected_task_status: TaskStatus + blueapi_call_statuses: list[CallStatus], expected_task_status: Status ): task = Task( blueapi_calls=[ diff --git a/tests/unit_tests/test_worker.py b/tests/unit_tests/test_worker.py index 6253deb..b8ba3e9 100644 --- a/tests/unit_tests/test_worker.py +++ b/tests/unit_tests/test_worker.py @@ -20,6 +20,7 @@ BlueapiClientAdapter, BlueapiResult, ) +from daq_queuing_service.blueapi_interaction.blueapi_call import CallStatus from daq_queuing_service.task import ExperimentDefinition, Status from daq_queuing_service.task_queue.queue import TaskError, TaskQueue, TaskResult from daq_queuing_service.worker.worker import QueueWorker @@ -187,12 +188,12 @@ async def test_worker_run_loop_cycle( with pytest.raises(only_loop_once): await worker.run_loop() - assert first_task.status == Status.SUCCESS + assert first_task.status == Status.COMPLETE worker._client.get_state.assert_called_once() # type: ignore worker._client.run_task.assert_called_once() # type: ignore -async def test_when_parameter_error_then_task_failed_and_error_added_to_task( +async def test_when_parameter_error_then_call_failed_and_error_added_to_call( worker_with_parameter_error: QueueWorker, only_loop_once: type[Exception] ): @@ -202,13 +203,14 @@ async def test_when_parameter_error_then_task_failed_and_error_added_to_task( with pytest.raises(only_loop_once): await worker_with_parameter_error.run_loop() - assert first_task.status == Status.ERROR - assert first_task.errors == ["Unexpected field 'bad_param'"] + first_call = first_task.blueapi_calls[0] + assert first_call.status == CallStatus.ERROR + assert first_call.errors == ["Unexpected field 'bad_param'"] worker_with_parameter_error._client.get_state.assert_called_once() # type: ignore worker_with_parameter_error._client.run_task.assert_called_once() # type: ignore -async def test_when_plan_name_error_then_task_failed_and_error_added_to_task( +async def test_when_plan_name_error_then_call_failed_and_error_added_to_task( worker_with_unknown_plan_error: QueueWorker, only_loop_once: type[Exception] ): @@ -218,13 +220,14 @@ async def test_when_plan_name_error_then_task_failed_and_error_added_to_task( with pytest.raises(only_loop_once): await worker_with_unknown_plan_error.run_loop() - assert first_task.status == Status.ERROR - assert first_task.errors == ["Unknown plan", ""] + first_call = first_task.blueapi_calls[0] + assert first_call.status == CallStatus.ERROR + assert first_call.errors == ["Unknown plan", ""] worker_with_unknown_plan_error._client.get_state.assert_called_once() # type: ignore worker_with_unknown_plan_error._client.run_task.assert_called_once() # type: ignore -async def test_when_blueapi_error_then_task_put_back_into_queue( +async def test_when_blueapi_error_then_call_put_back_into_queue( worker_with_blueapi_error: QueueWorker, only_loop_once: type[Exception] ): @@ -234,7 +237,9 @@ async def test_when_blueapi_error_then_task_put_back_into_queue( with pytest.raises(only_loop_once): await worker_with_blueapi_error.run_loop() - assert first_task.status == Status.WAITING + first_call = first_task.blueapi_calls[0] + assert first_call.status == CallStatus.WAITING + assert first_task.status == Status.QUEUED worker_with_blueapi_error._client.get_state.assert_called_once() # type: ignore worker_with_blueapi_error._client.run_task.assert_called_once() # type: ignore assert not await worker_with_blueapi_error._queue.get_history() @@ -250,8 +255,9 @@ async def test_when_plan_error_then_task_failed_and_errors_added( with pytest.raises(only_loop_once): await worker_with_plan_error.run_loop() - assert first_task.status == Status.ERROR - assert first_task.errors == [ + first_call = first_task.blueapi_calls[0] + assert first_call.status == CallStatus.ERROR + assert first_call.errors == [ TaskError(outcome="error", type="ValueError", message="Error during plan") ] worker_with_plan_error._client.get_state.assert_called_once() # type: ignore @@ -285,7 +291,7 @@ async def test__wait_for_next_task_waits_for_queue_ready_to_give_task( worker: QueueWorker, ): queue = worker._queue - queue._tasks[queue._queue[0]].status = Status.IN_PROGRESS + queue._tasks[queue._queue[0]].blueapi_calls[0].status = CallStatus.IN_PROGRESS with pytest.raises(asyncio.TimeoutError): await asyncio.wait_for(worker._wait_for_next_task(), timeout=0.05) From 677bd8b8bd01096921c3ecfcea1c4eeb45d5257e Mon Sep 17 00:00:00 2001 From: Jacob Williamson Date: Mon, 11 May 2026 17:02:57 +0100 Subject: [PATCH 07/14] Add tests --- tests/unit_tests/test_blueapi_call.py | 113 ++++++++++++++++++++++++++ 1 file changed, 113 insertions(+) create mode 100644 tests/unit_tests/test_blueapi_call.py diff --git a/tests/unit_tests/test_blueapi_call.py b/tests/unit_tests/test_blueapi_call.py new file mode 100644 index 0000000..f7116f5 --- /dev/null +++ b/tests/unit_tests/test_blueapi_call.py @@ -0,0 +1,113 @@ +import pytest +from blueapi.service.model import TaskRequest +from blueapi.worker.event import TaskResult + +from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall, Status + + +@pytest.mark.parametrize( + "old_status, new_status", + [ + [Status.WAITING, Status.WAITING], + [Status.WAITING, Status.IN_PROGRESS], + [Status.WAITING, Status.SUCCESS], + [Status.WAITING, Status.ERROR], + [Status.CLAIMED, Status.CLAIMED], + [Status.CLAIMED, Status.SUCCESS], + [Status.IN_PROGRESS, Status.WAITING], + [Status.IN_PROGRESS, Status.CLAIMED], + [Status.IN_PROGRESS, Status.IN_PROGRESS], + [Status.SUCCESS, Status.WAITING], + [Status.SUCCESS, Status.CLAIMED], + [Status.SUCCESS, Status.IN_PROGRESS], + [Status.SUCCESS, Status.SUCCESS], + [Status.SUCCESS, Status.ERROR], + [Status.ERROR, Status.WAITING], + [Status.ERROR, Status.CLAIMED], + [Status.ERROR, Status.IN_PROGRESS], + [Status.ERROR, Status.SUCCESS], + [Status.ERROR, Status.ERROR], + ], +) +def test__update_status_raises_error_when_transitioned_to_wrong_status( + old_status: Status, new_status: Status +): + call = BlueapiCall( + status=old_status, + task_request=TaskRequest(name="", params={}, instrument_session=""), + ) + with pytest.raises(ValueError): + call._update_status(new_status) + + +@pytest.mark.parametrize( + "old_status, new_status", + [ + [Status.WAITING, Status.CLAIMED], + [Status.CLAIMED, Status.WAITING], + [Status.CLAIMED, Status.IN_PROGRESS], + [Status.CLAIMED, Status.ERROR], + [Status.IN_PROGRESS, Status.SUCCESS], + [Status.IN_PROGRESS, Status.ERROR], + ], +) +def test__update_status_changes_status_when_correct_new_status_given( + old_status: Status, new_status: Status +): + call = BlueapiCall( + status=old_status, + task_request=TaskRequest(name="", params={}, instrument_session=""), + ) + call._update_status(new_status) + assert call.status == new_status + + +def test_wait_updates_status_to_waiting(): + call = BlueapiCall( + status=Status.CLAIMED, + task_request=TaskRequest(name="", params={}, instrument_session=""), + ) + call.wait() + assert call.status == Status.WAITING + + +def test_claim_updates_status_to_claimed(): + call = BlueapiCall( + status=Status.WAITING, + task_request=TaskRequest(name="", params={}, instrument_session=""), + ) + call.claim() + assert call.status == Status.CLAIMED + + +def test_put_in_progress_updates_status_to_in_progress_and_adds_fields(): + call = BlueapiCall( + status=Status.CLAIMED, + task_request=TaskRequest(name="", params={}, instrument_session=""), + ) + call.blueapi_id = "blueapi_id" + call.put_in_progress() + assert call.status == Status.IN_PROGRESS + assert call.time_started is not None + assert call.blueapi_id == "blueapi_id" + + +def test_succeed_updates_status_to_success_and_adds_time_completed(): + call = BlueapiCall( + status=Status.IN_PROGRESS, + task_request=TaskRequest(name="", params={}, instrument_session=""), + ) + call.succeed(TaskResult(result=None, type="NoneType")) + assert call.status == Status.SUCCESS + assert call.time_completed is not None + + +def test_fail_updates_status_to_error_and_adds_time_completed_and_errors(): + call = BlueapiCall( + status=Status.IN_PROGRESS, + task_request=TaskRequest(name="", params={}, instrument_session=""), + ) + call.fail(["errors", "more_errors"]) + assert call.status == Status.ERROR + assert call.time_completed is not None + assert call.errors == ["errors", "more_errors"] From 9cf6e167101c5eaefa0b148e41e000b1f7ae9301 Mon Sep 17 00:00:00 2001 From: Jacob Williamson Date: Mon, 11 May 2026 17:37:42 +0100 Subject: [PATCH 08/14] Fix tests --- src/daq_queuing_service/api/errors.py | 2 +- src/daq_queuing_service/task.py | 3 +- tests/unit_tests/test_api.py | 297 ++++++++++++++++++-------- tests/unit_tests/test_blueapi_call.py | 76 +++---- tests/unit_tests/test_queue.py | 66 +++--- 5 files changed, 280 insertions(+), 164 deletions(-) diff --git a/src/daq_queuing_service/api/errors.py b/src/daq_queuing_service/api/errors.py index 40a107f..359a51d 100644 --- a/src/daq_queuing_service/api/errors.py +++ b/src/daq_queuing_service/api/errors.py @@ -24,7 +24,7 @@ async def service_unavailable_error( request: Request, exception: ServiceUnavailableError ): return JSONResponse( - status_code=409, + status_code=404, content={"error": "blueapi_unavailable", "message": str(exception)}, ) diff --git a/src/daq_queuing_service/task.py b/src/daq_queuing_service/task.py index 331b99c..ea78763 100644 --- a/src/daq_queuing_service/task.py +++ b/src/daq_queuing_service/task.py @@ -3,7 +3,7 @@ from uuid import uuid4 from blueapi.service.model import StrEnum -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, computed_field from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall, CallStatus @@ -37,6 +37,7 @@ class Task(BaseModel): def cancel(self): self._cancelled = True + @computed_field @property def status(self) -> Status: if self._cancelled: diff --git a/tests/unit_tests/test_api.py b/tests/unit_tests/test_api.py index 63493b5..b785aff 100644 --- a/tests/unit_tests/test_api.py +++ b/tests/unit_tests/test_api.py @@ -10,6 +10,7 @@ UnknownPlanError, ) from blueapi.service.model import ( + TaskRequest, TaskResponse, ) from fastapi import FastAPI @@ -23,6 +24,7 @@ create_api_router, ) from daq_queuing_service.api.errors import register_exception_handlers +from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall, CallStatus from daq_queuing_service.task import ( ExperimentDefinition, Status, @@ -101,11 +103,22 @@ def test_get_queued_tasks_returns_queued_task(test_client: TestClient): }, "id": "2", "status": "In progress", - "time_started": "2026-04-17T15:02:00.000000", - "time_completed": None, - "errors": [], - "result": None, - "blueapi_id": "blueapi_id_2", + "blueapi_calls": [ + { + "task_request": { + "name": "test", + "params": {}, + "instrument_session": "", + }, + "parent_task_id": "2", + "status": "In progress", + "time_started": "2026-04-17T15:02:00.000000", + "time_completed": None, + "result": None, + "errors": [], + "blueapi_id": None, + } + ], "position": 0, }, { @@ -116,12 +129,23 @@ def test_get_queued_tasks_returns_queued_task(test_client: TestClient): "instrument_session": "", }, "id": "3", - "status": "Waiting", - "time_started": None, - "time_completed": None, - "errors": [], - "result": None, - "blueapi_id": None, + "status": "Queued", + "blueapi_calls": [ + { + "task_request": { + "name": "test", + "params": {}, + "instrument_session": "", + }, + "parent_task_id": "3", + "status": "Waiting", + "time_started": None, + "time_completed": None, + "result": None, + "errors": [], + "blueapi_id": None, + } + ], "position": 1, }, { @@ -132,12 +156,23 @@ def test_get_queued_tasks_returns_queued_task(test_client: TestClient): "instrument_session": "", }, "id": "4", - "status": "Waiting", - "time_started": None, - "time_completed": None, - "errors": [], - "result": None, - "blueapi_id": None, + "status": "Queued", + "blueapi_calls": [ + { + "task_request": { + "name": "test", + "params": {}, + "instrument_session": "", + }, + "parent_task_id": "4", + "status": "Waiting", + "time_started": None, + "time_completed": None, + "result": None, + "errors": [], + "blueapi_id": None, + } + ], "position": 2, }, ] @@ -146,7 +181,6 @@ def test_get_queued_tasks_returns_queued_task(test_client: TestClient): def test_get_queued_tasks_can_filter_by_task_status(test_client: TestClient): response = test_client.get("/queue", params={"status": Status.IN_PROGRESS}) assert response.status_code == 200 - print(response.json()) assert response.json() == [ { "experiment_definition": { @@ -156,13 +190,24 @@ def test_get_queued_tasks_can_filter_by_task_status(test_client: TestClient): "instrument_session": "", }, "id": "2", - "status": "In progress", - "time_started": "2026-04-17T15:02:00.000000", - "time_completed": None, - "errors": [], - "result": None, - "blueapi_id": "blueapi_id_2", + "blueapi_calls": [ + { + "task_request": { + "name": "test", + "params": {}, + "instrument_session": "", + }, + "parent_task_id": "2", + "status": "In progress", + "time_started": "2026-04-17T15:02:00.000000", + "time_completed": None, + "result": None, + "errors": [], + "blueapi_id": None, + } + ], "position": 0, + "status": "In progress", } ] @@ -178,9 +223,42 @@ async def test_get_all_tasks_returns_all_tasks( async def test_get_all_tasks_can_filter_by_task_status(test_client: TestClient): - response = test_client.get("/tasks", params={"status": Status.SUCCESS}) + response = test_client.get("/tasks", params={"status": Status.COMPLETE}) assert response.status_code == 200 assert response.json() == [ + { + "experiment_definition": { + "plan_name": "test", + "sample_id": "0", + "params": {}, + "instrument_session": "", + }, + "id": "0", + "status": "Complete", + "blueapi_calls": [ + { + "task_request": { + "name": "test", + "params": {}, + "instrument_session": "", + }, + "parent_task_id": "0", + "status": "Error", + "time_started": "2026-04-17T15:00:00.000000", + "time_completed": "2026-04-17T15:00:59.000000", + "result": None, + "errors": [ + { + "outcome": "error", + "type": "ValueError", + "message": "Error during plan", + } + ], + "blueapi_id": None, + } + ], + "position": None, + }, { "experiment_definition": { "plan_name": "test", @@ -189,14 +267,29 @@ async def test_get_all_tasks_can_filter_by_task_status(test_client: TestClient): "instrument_session": "", }, "id": "1", - "status": "Success", - "time_started": "2026-04-17T15:01:00.000000", - "time_completed": "2026-04-17T15:01:59.000000", - "errors": [], - "result": {"outcome": "success", "result": None, "type": "NoneType"}, - "blueapi_id": "blueapi_id_1", + "status": "Complete", + "blueapi_calls": [ + { + "task_request": { + "name": "test", + "params": {}, + "instrument_session": "", + }, + "parent_task_id": "1", + "status": "Success", + "time_started": "2026-04-17T15:01:00.000000", + "time_completed": "2026-04-17T15:01:59.000000", + "result": { + "outcome": "success", + "result": None, + "type": "NoneType", + }, + "errors": [], + "blueapi_id": None, + } + ], "position": None, - } + }, ] @@ -210,36 +303,6 @@ async def test_get_completed_tasks_returns_completed_tasks( ) -async def test_get_completed_tasks_can_filter_by_status(test_client: TestClient): - - response = test_client.get("/history", params={"status": Status.ERROR}) - assert response.status_code == 200 - assert response.json() == [ - { - "experiment_definition": { - "plan_name": "test", - "sample_id": "0", - "params": {}, - "instrument_session": "", - }, - "id": "0", - "status": "Error", - "time_started": "2026-04-17T15:00:00.000000", - "time_completed": "2026-04-17T15:00:59.000000", - "errors": [ - { - "outcome": "error", - "type": "ValueError", - "message": "Error during plan", - } - ], - "result": None, - "blueapi_id": "blueapi_id_0", - "position": None, - } - ] - - async def test_add_tasks_to_queue_validates_and_adds_to_queue_and_and_returns_task_ids( test_client: TestClient, task_queue_with_history: TaskQueue ): @@ -268,13 +331,21 @@ async def test_add_tasks_to_queue_validates_and_adds_to_queue_and_and_returns_ta params={"time": 10}, instrument_session="abc", ), - id=task_ids[0], - status=Status.WAITING, - time_started=None, - time_completed=None, - errors=[], - result=None, - blueapi_id=None, + id=task_ids[-1], + blueapi_calls=[ + BlueapiCall( + task_request=TaskRequest( + name="add_tasks", params={"time": 10}, instrument_session="abc" + ), + parent_task_id=task_ids[-1], + status=CallStatus.WAITING, + time_started=None, + time_completed=None, + result=None, + errors=[], + blueapi_id=None, + ) + ], position=3, ) @@ -511,11 +582,22 @@ async def test_cancel_tasks_removes_task_from_queue_and_returns_tasks( }, "id": "3", "status": "Cancelled", - "time_started": None, - "time_completed": None, - "errors": [], - "result": None, - "blueapi_id": None, + "blueapi_calls": [ + { + "task_request": { + "name": "test", + "params": {}, + "instrument_session": "", + }, + "parent_task_id": "3", + "status": "Waiting", + "time_started": None, + "time_completed": None, + "result": None, + "errors": [], + "blueapi_id": None, + } + ], }, { "experiment_definition": { @@ -526,11 +608,22 @@ async def test_cancel_tasks_removes_task_from_queue_and_returns_tasks( }, "id": "4", "status": "Cancelled", - "time_started": None, - "time_completed": None, - "errors": [], - "result": None, - "blueapi_id": None, + "blueapi_calls": [ + { + "task_request": { + "name": "test", + "params": {}, + "instrument_session": "", + }, + "parent_task_id": "4", + "status": "Waiting", + "time_started": None, + "time_completed": None, + "result": None, + "errors": [], + "blueapi_id": None, + } + ], }, ] @@ -603,12 +696,23 @@ def test_get_task_by_position_returns_expected_task(test_client: TestClient): "instrument_session": "", }, "id": "3", - "status": "Waiting", - "time_started": None, - "time_completed": None, - "errors": [], - "result": None, - "blueapi_id": None, + "status": "Queued", + "blueapi_calls": [ + { + "task_request": { + "name": "test", + "params": {}, + "instrument_session": "", + }, + "parent_task_id": "3", + "status": "Waiting", + "time_started": None, + "time_completed": None, + "result": None, + "errors": [], + "blueapi_id": None, + } + ], "position": 1, } @@ -624,12 +728,23 @@ def test_get_task_by_id_returns_expected_task(test_client: TestClient): "instrument_session": "", }, "id": "3", - "status": "Waiting", - "time_started": None, - "time_completed": None, - "errors": [], - "result": None, - "blueapi_id": None, + "status": "Queued", + "blueapi_calls": [ + { + "task_request": { + "name": "test", + "params": {}, + "instrument_session": "", + }, + "parent_task_id": "3", + "status": "Waiting", + "time_started": None, + "time_completed": None, + "result": None, + "errors": [], + "blueapi_id": None, + } + ], "position": 1, } diff --git a/tests/unit_tests/test_blueapi_call.py b/tests/unit_tests/test_blueapi_call.py index f7116f5..28ab74d 100644 --- a/tests/unit_tests/test_blueapi_call.py +++ b/tests/unit_tests/test_blueapi_call.py @@ -2,35 +2,35 @@ from blueapi.service.model import TaskRequest from blueapi.worker.event import TaskResult -from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall, Status +from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall, CallStatus @pytest.mark.parametrize( "old_status, new_status", [ - [Status.WAITING, Status.WAITING], - [Status.WAITING, Status.IN_PROGRESS], - [Status.WAITING, Status.SUCCESS], - [Status.WAITING, Status.ERROR], - [Status.CLAIMED, Status.CLAIMED], - [Status.CLAIMED, Status.SUCCESS], - [Status.IN_PROGRESS, Status.WAITING], - [Status.IN_PROGRESS, Status.CLAIMED], - [Status.IN_PROGRESS, Status.IN_PROGRESS], - [Status.SUCCESS, Status.WAITING], - [Status.SUCCESS, Status.CLAIMED], - [Status.SUCCESS, Status.IN_PROGRESS], - [Status.SUCCESS, Status.SUCCESS], - [Status.SUCCESS, Status.ERROR], - [Status.ERROR, Status.WAITING], - [Status.ERROR, Status.CLAIMED], - [Status.ERROR, Status.IN_PROGRESS], - [Status.ERROR, Status.SUCCESS], - [Status.ERROR, Status.ERROR], + [CallStatus.WAITING, CallStatus.WAITING], + [CallStatus.WAITING, CallStatus.IN_PROGRESS], + [CallStatus.WAITING, CallStatus.SUCCESS], + [CallStatus.WAITING, CallStatus.ERROR], + [CallStatus.CLAIMED, CallStatus.CLAIMED], + [CallStatus.CLAIMED, CallStatus.SUCCESS], + [CallStatus.IN_PROGRESS, CallStatus.WAITING], + [CallStatus.IN_PROGRESS, CallStatus.CLAIMED], + [CallStatus.IN_PROGRESS, CallStatus.IN_PROGRESS], + [CallStatus.SUCCESS, CallStatus.WAITING], + [CallStatus.SUCCESS, CallStatus.CLAIMED], + [CallStatus.SUCCESS, CallStatus.IN_PROGRESS], + [CallStatus.SUCCESS, CallStatus.SUCCESS], + [CallStatus.SUCCESS, CallStatus.ERROR], + [CallStatus.ERROR, CallStatus.WAITING], + [CallStatus.ERROR, CallStatus.CLAIMED], + [CallStatus.ERROR, CallStatus.IN_PROGRESS], + [CallStatus.ERROR, CallStatus.SUCCESS], + [CallStatus.ERROR, CallStatus.ERROR], ], ) def test__update_status_raises_error_when_transitioned_to_wrong_status( - old_status: Status, new_status: Status + old_status: CallStatus, new_status: CallStatus ): call = BlueapiCall( status=old_status, @@ -43,16 +43,16 @@ def test__update_status_raises_error_when_transitioned_to_wrong_status( @pytest.mark.parametrize( "old_status, new_status", [ - [Status.WAITING, Status.CLAIMED], - [Status.CLAIMED, Status.WAITING], - [Status.CLAIMED, Status.IN_PROGRESS], - [Status.CLAIMED, Status.ERROR], - [Status.IN_PROGRESS, Status.SUCCESS], - [Status.IN_PROGRESS, Status.ERROR], + [CallStatus.WAITING, CallStatus.CLAIMED], + [CallStatus.CLAIMED, CallStatus.WAITING], + [CallStatus.CLAIMED, CallStatus.IN_PROGRESS], + [CallStatus.CLAIMED, CallStatus.ERROR], + [CallStatus.IN_PROGRESS, CallStatus.SUCCESS], + [CallStatus.IN_PROGRESS, CallStatus.ERROR], ], ) def test__update_status_changes_status_when_correct_new_status_given( - old_status: Status, new_status: Status + old_status: CallStatus, new_status: CallStatus ): call = BlueapiCall( status=old_status, @@ -64,50 +64,50 @@ def test__update_status_changes_status_when_correct_new_status_given( def test_wait_updates_status_to_waiting(): call = BlueapiCall( - status=Status.CLAIMED, + status=CallStatus.CLAIMED, task_request=TaskRequest(name="", params={}, instrument_session=""), ) call.wait() - assert call.status == Status.WAITING + assert call.status == CallStatus.WAITING def test_claim_updates_status_to_claimed(): call = BlueapiCall( - status=Status.WAITING, + status=CallStatus.WAITING, task_request=TaskRequest(name="", params={}, instrument_session=""), ) call.claim() - assert call.status == Status.CLAIMED + assert call.status == CallStatus.CLAIMED def test_put_in_progress_updates_status_to_in_progress_and_adds_fields(): call = BlueapiCall( - status=Status.CLAIMED, + status=CallStatus.CLAIMED, task_request=TaskRequest(name="", params={}, instrument_session=""), ) call.blueapi_id = "blueapi_id" call.put_in_progress() - assert call.status == Status.IN_PROGRESS + assert call.status == CallStatus.IN_PROGRESS assert call.time_started is not None assert call.blueapi_id == "blueapi_id" def test_succeed_updates_status_to_success_and_adds_time_completed(): call = BlueapiCall( - status=Status.IN_PROGRESS, + status=CallStatus.IN_PROGRESS, task_request=TaskRequest(name="", params={}, instrument_session=""), ) call.succeed(TaskResult(result=None, type="NoneType")) - assert call.status == Status.SUCCESS + assert call.status == CallStatus.SUCCESS assert call.time_completed is not None def test_fail_updates_status_to_error_and_adds_time_completed_and_errors(): call = BlueapiCall( - status=Status.IN_PROGRESS, + status=CallStatus.IN_PROGRESS, task_request=TaskRequest(name="", params={}, instrument_session=""), ) call.fail(["errors", "more_errors"]) - assert call.status == Status.ERROR + assert call.status == CallStatus.ERROR assert call.time_completed is not None assert call.errors == ["errors", "more_errors"] diff --git a/tests/unit_tests/test_queue.py b/tests/unit_tests/test_queue.py index 203e65a..44dc671 100644 --- a/tests/unit_tests/test_queue.py +++ b/tests/unit_tests/test_queue.py @@ -5,11 +5,11 @@ from blueapi.service.model import TaskRequest from blueapi.worker.event import TaskError, TaskResult -from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall +from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall, CallStatus from daq_queuing_service.plugins.construct_task_request import ( construct_blueapi_call_list, ) -from daq_queuing_service.task import ExperimentDefinition, Status, Task, TaskStatus +from daq_queuing_service.task import ExperimentDefinition, Status, Task from daq_queuing_service.task_queue.queue import ( TaskQueue, TaskWithPosition, @@ -77,7 +77,7 @@ async def test_add_task_to_position_0_adds_to_position_1_if_first_task_in_progre ): new_tasks = [make_new_task("new"), make_new_task("new_2")] first_task = await task_queue_in_progress.get_task_by_position(0) - assert first_task and first_task.status == TaskStatus.IN_PROGRESS + assert first_task and first_task.status == Status.IN_PROGRESS await task_queue_in_progress.add_tasks(new_tasks, 0) @@ -98,7 +98,7 @@ async def test_add_task_to_position_0_adds_to_position_0_if_first_task_waiting( ): new_tasks = new_tasks = [make_new_task("new"), make_new_task("new_2")] first_task = await task_queue.get_task_by_position(0) - assert first_task and first_task.status == TaskStatus.QUEUED + assert first_task and first_task.status == Status.QUEUED await task_queue.add_tasks(new_tasks, 0) @@ -175,7 +175,7 @@ async def test_move_task_to_position_0_moves_to_position_0_if_first_task_waiting task_queue: TaskQueue, ): task = await task_queue.get_task_by_position(0) - assert task and task.status == TaskStatus.QUEUED + assert task and task.status == Status.QUEUED await task_queue.move_task("4", 0) assert task_queue._queue == ["4", "0", "1", "2", "3"] @@ -185,7 +185,7 @@ async def test_move_task_does_not_move_task_that_is_in_progress_and_raises_error task_queue_in_progress: TaskQueue, ): task = await task_queue_in_progress.get_task_by_position(0) - assert task and task.status == TaskStatus.IN_PROGRESS + assert task and task.status == Status.IN_PROGRESS with pytest.raises(TaskInProgressError): await task_queue_in_progress.move_task("0", 3) @@ -198,7 +198,7 @@ async def test_move_task_raises_error_if_wrong_task_id_given( task_queue_in_progress: TaskQueue, ): task = await task_queue_in_progress.get_task_by_position(0) - assert task and task.status == TaskStatus.IN_PROGRESS + assert task and task.status == Status.IN_PROGRESS with pytest.raises(TaskNotFoundError): await task_queue_in_progress.move_task("10", 3) @@ -215,7 +215,7 @@ async def test_remove_tasks_does_not_remove_task_that_is_in_progress_and_raises_ task_queue_in_progress: TaskQueue, ): task = await task_queue_in_progress.get_task_by_position(0) - assert task and task.status == TaskStatus.IN_PROGRESS + assert task and task.status == Status.IN_PROGRESS with pytest.raises(TaskInProgressError): await task_queue_in_progress.cancel_tasks(["0", "2"]) @@ -257,7 +257,7 @@ async def test_get_queue_only_returns_tasks_in_queue( BlueapiCall( task_request=TaskRequest(name="test", instrument_session=""), parent_task_id="2", - status=Status.IN_PROGRESS, + status=CallStatus.IN_PROGRESS, time_started="2026-04-17T15:02:00.000000", time_completed=None, errors=[], @@ -274,7 +274,7 @@ async def test_get_queue_only_returns_tasks_in_queue( BlueapiCall( task_request=TaskRequest(name="test", instrument_session=""), parent_task_id="3", - status=Status.WAITING, + status=CallStatus.WAITING, time_started=None, time_completed=None, errors=[], @@ -291,7 +291,7 @@ async def test_get_queue_only_returns_tasks_in_queue( BlueapiCall( task_request=TaskRequest(name="test", instrument_session=""), parent_task_id="4", - status=Status.WAITING, + status=CallStatus.WAITING, time_started=None, time_completed=None, errors=[], @@ -317,7 +317,7 @@ async def test_get_history_only_returns_tasks_in_history( BlueapiCall( task_request=TaskRequest(name="test", instrument_session=""), parent_task_id="0", - status=Status.ERROR, + status=CallStatus.ERROR, time_started="2026-04-17T15:00:00.000000", time_completed="2026-04-17T15:00:59.000000", errors=[ @@ -341,7 +341,7 @@ async def test_get_history_only_returns_tasks_in_history( BlueapiCall( task_request=TaskRequest(name="test", instrument_session=""), parent_task_id="1", - status=Status.SUCCESS, + status=CallStatus.SUCCESS, time_started="2026-04-17T15:01:00.000000", time_completed="2026-04-17T15:01:59.000000", errors=[], @@ -369,7 +369,7 @@ async def test_get_tasks_returns_tasks_in_queue_and_history( BlueapiCall( task_request=TaskRequest(name="test", instrument_session=""), parent_task_id="0", - status=Status.ERROR, + status=CallStatus.ERROR, time_started="2026-04-17T15:00:00.000000", time_completed="2026-04-17T15:00:59.000000", errors=[ @@ -393,7 +393,7 @@ async def test_get_tasks_returns_tasks_in_queue_and_history( BlueapiCall( task_request=TaskRequest(name="test", instrument_session=""), parent_task_id="1", - status=Status.SUCCESS, + status=CallStatus.SUCCESS, time_started="2026-04-17T15:01:00.000000", time_completed="2026-04-17T15:01:59.000000", errors=[], @@ -411,7 +411,7 @@ async def test_get_tasks_returns_tasks_in_queue_and_history( BlueapiCall( task_request=TaskRequest(name="test", instrument_session=""), parent_task_id="2", - status=Status.IN_PROGRESS, + status=CallStatus.IN_PROGRESS, time_started="2026-04-17T15:02:00.000000", time_completed=None, errors=[], @@ -428,7 +428,7 @@ async def test_get_tasks_returns_tasks_in_queue_and_history( BlueapiCall( task_request=TaskRequest(name="test", instrument_session=""), parent_task_id="3", - status=Status.WAITING, + status=CallStatus.WAITING, time_started=None, time_completed=None, errors=[], @@ -445,7 +445,7 @@ async def test_get_tasks_returns_tasks_in_queue_and_history( BlueapiCall( task_request=TaskRequest(name="test", instrument_session=""), parent_task_id="4", - status=Status.WAITING, + status=CallStatus.WAITING, time_started=None, time_completed=None, errors=[], @@ -534,19 +534,19 @@ async def test_claim_next_call_once_available_claims_task_and_returns( ): next_task = task_queue._tasks[task_queue._queue[0]] next_call = next_task.blueapi_calls[0] - assert next_task and next_task.status == TaskStatus.QUEUED - assert next_call.status == Status.WAITING + assert next_task and next_task.status == Status.QUEUED + assert next_call.status == CallStatus.WAITING claimed_call = await task_queue.get_next_call_once_available() assert claimed_call.parent_task_id == next_call.parent_task_id - assert claimed_call.status == Status.CLAIMED + assert claimed_call.status == CallStatus.CLAIMED async def test_claim_next_call_once_available_waits_if_next_task_is_already_claimed( task_queue: TaskQueue, ): claimed_call = await task_queue.get_next_call_once_available() - assert claimed_call and claimed_call.status == Status.CLAIMED + assert claimed_call and claimed_call.status == CallStatus.CLAIMED with pytest.raises(asyncio.TimeoutError): await asyncio.wait_for(task_queue.get_next_call_once_available(), timeout=0.05) @@ -556,7 +556,7 @@ async def test_claim_next_call_once_available_waits_if_next_task_is_already_in_p ): claimed_call = await task_queue.get_next_call_once_available() claimed_call.put_in_progress() - assert claimed_call and claimed_call.status == Status.IN_PROGRESS + assert claimed_call and claimed_call.status == CallStatus.IN_PROGRESS with pytest.raises(asyncio.TimeoutError): await asyncio.wait_for(task_queue.get_next_call_once_available(), timeout=0.05) @@ -565,7 +565,7 @@ async def test_wait_until_call_available_waits_if_next_task_is_claimed( task_queue: TaskQueue, ): claimed_call = await task_queue.get_next_call_once_available() - assert claimed_call and claimed_call.status == Status.CLAIMED + assert claimed_call and claimed_call.status == CallStatus.CLAIMED with pytest.raises(asyncio.TimeoutError): await asyncio.wait_for(task_queue.wait_until_call_available(), timeout=0.05) @@ -575,7 +575,7 @@ async def test_wait_until_call_available_waits_if_next_task_is_in_progress( ): claimed_call = await task_queue.get_next_call_once_available() claimed_call.put_in_progress() - assert claimed_call and claimed_call.status == Status.IN_PROGRESS + assert claimed_call and claimed_call.status == CallStatus.IN_PROGRESS with pytest.raises(asyncio.TimeoutError): await asyncio.wait_for(task_queue.wait_until_call_available(), timeout=0.05) @@ -605,13 +605,13 @@ async def test_complete_call_puts_call_in_history_and_updates_status_to_complete ): call = await task_queue.get_next_call_once_available() call.put_in_progress() - assert call.status == Status.IN_PROGRESS + assert call.status == CallStatus.IN_PROGRESS await task_queue.complete_call(call, TaskResult(result=None, type="NoneType")) assert call.parent_task_id not in task_queue._queue assert call.parent_task_id in task_queue._history assert call not in task_queue._call_queue assert call in task_queue._call_history - assert call.status == Status.SUCCESS + assert call.status == CallStatus.SUCCESS async def test_complete_call_must_receive_exact_same_object_as_was_claimed( @@ -634,13 +634,13 @@ async def test_fail_call_puts_task_in_history_and_updates_status_to_complete( task_queue: TaskQueue, ): call = await task_queue.get_next_call_once_available() - assert call.status == Status.CLAIMED + assert call.status == CallStatus.CLAIMED await task_queue.fail_call(call) assert call.parent_task_id not in task_queue._queue assert call.parent_task_id in task_queue._history assert call not in task_queue._call_queue assert call in task_queue._call_history - assert call.status == Status.ERROR + assert call.status == CallStatus.ERROR async def test_fail_call_must_receive_exact_same_object_as_was_claimed( @@ -661,7 +661,7 @@ async def test_fail_call_with_errors_adds_errors_to_call( call = await task_queue.get_next_call_once_available() error = "This task failed" await task_queue.fail_call(call, [str(error)]) - assert call.status == Status.ERROR + assert call.status == CallStatus.ERROR assert call.errors == ["This task failed"] @@ -669,15 +669,15 @@ async def test_return_call_to_queue_changes_task_status_to_waiting( task_queue: TaskQueue, ): call = await task_queue.get_next_call_once_available() - assert call.status == Status.CLAIMED + assert call.status == CallStatus.CLAIMED await task_queue.return_call_to_queue(call) - assert call.status == Status.WAITING + assert call.status == CallStatus.WAITING async def test_return_task_to_queue_raises_error_if_task_has_not_been_claimed( task_queue: TaskQueue, ): call = await task_queue.get_next_call_once_available() - call.status = Status.SUCCESS + call.status = CallStatus.SUCCESS with pytest.raises(TaskNotClaimedError): await task_queue.return_call_to_queue(call) From 75585811f9deb6d34d248076e7201ba3393613d9 Mon Sep 17 00:00:00 2001 From: Jacob Williamson Date: Mon, 11 May 2026 17:42:43 +0100 Subject: [PATCH 09/14] Add some comments --- src/daq_queuing_service/task_queue/queue.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/daq_queuing_service/task_queue/queue.py b/src/daq_queuing_service/task_queue/queue.py index 49da306..8cc57ed 100644 --- a/src/daq_queuing_service/task_queue/queue.py +++ b/src/daq_queuing_service/task_queue/queue.py @@ -63,18 +63,22 @@ def __init__( self._queue_history: list[BlueapiCall] = [] self._state: QueueState = QueueState(paused=True) self._convert = convert - self._modifying = Modifying(on_exit=self._update_call_queue) + self._modifying = Modifying(on_exit=self._sync) - def _update_call_queue(self): + def _sync(self): for task_id in self._queue: task = self._tasks[task_id] if task.status == Status.COMPLETE: + # Move task from queue to history if all blueapi calls complete self._queue.remove(task_id) self._history.append(task_id) elif task.status != Status.IN_PROGRESS: + # If task is not in progress calls will be re-calculated task.blueapi_calls = [] self._call_queue = [ + # Persist calls who's parent tasks are in progress + # More work needed to allow for interleaved calls from different tasks call for call in self._call_queue if call.parent_task_id @@ -89,6 +93,7 @@ def _update_call_queue(self): ) for call in new_queue: + # Add children to parent tasks if call.parent_task_id: self._tasks[call.parent_task_id].blueapi_calls.append(call) @@ -107,7 +112,7 @@ async def get_next_call_once_available(self) -> BlueapiCall: async with self._modifying: while not self._task_available(): await self._modifying.wait() - self._update_call_queue() + self._sync() call = self._call_queue[0] call.claim() LOGGER.info(f"Plan {call} has been claimed") From 33a54a3d678451c178b31958710e102db372ceae Mon Sep 17 00:00:00 2001 From: Jacob Williamson Date: Mon, 11 May 2026 17:44:27 +0100 Subject: [PATCH 10/14] Small change --- src/daq_queuing_service/worker/worker.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/daq_queuing_service/worker/worker.py b/src/daq_queuing_service/worker/worker.py index bb486e4..f2849c6 100644 --- a/src/daq_queuing_service/worker/worker.py +++ b/src/daq_queuing_service/worker/worker.py @@ -59,11 +59,10 @@ async def _wait_for_next_task(self) -> BlueapiCall: return await self._queue.get_next_call_once_available() async def _process_call(self, call: BlueapiCall): - task_request = call.task_request LOGGER.info(f"Sending call {call} to BlueAPI") result = await self._client.run_task( - task_request, on_event=partial(self._on_blueapi_event, call=call) + call.task_request, on_event=partial(self._on_blueapi_event, call=call) ) if result.error: From e1086b68f1b75b8b9bf6b17a0daaaecaa44ac1cb Mon Sep 17 00:00:00 2001 From: Jacob Williamson Date: Tue, 12 May 2026 11:02:14 +0100 Subject: [PATCH 11/14] Add more comments --- src/daq_queuing_service/task_queue/queue.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/src/daq_queuing_service/task_queue/queue.py b/src/daq_queuing_service/task_queue/queue.py index 8cc57ed..2efcba4 100644 --- a/src/daq_queuing_service/task_queue/queue.py +++ b/src/daq_queuing_service/task_queue/queue.py @@ -66,6 +66,10 @@ def __init__( self._modifying = Modifying(on_exit=self._sync) def _sync(self): + """Syncs the task queue with the call queue, applying a conversion from queue of + tasks to a queue of blueapi calls. This is called every time the queue is + modified, and also right before a call is popped off the front of the queue. + """ for task_id in self._queue: task = self._tasks[task_id] if task.status == Status.COMPLETE: @@ -78,7 +82,9 @@ def _sync(self): self._call_queue = [ # Persist calls who's parent tasks are in progress - # More work needed to allow for interleaved calls from different tasks + # Once a task is in progress it is not provided to the converter + # More work needed to allow for interleaved calls from different tasks, + # and for in progress tasks to inform conversion call for call in self._call_queue if call.parent_task_id @@ -112,7 +118,7 @@ async def get_next_call_once_available(self) -> BlueapiCall: async with self._modifying: while not self._task_available(): await self._modifying.wait() - self._sync() + self._sync() # Do conversion here to ensure conditions are up to date call = self._call_queue[0] call.claim() LOGGER.info(f"Plan {call} has been claimed") @@ -134,7 +140,7 @@ async def return_call_to_queue(self, call: BlueapiCall): task (Task): The task to return Raises: - TaskNotClaimedError: Raised if the task's status is not have a 'Claimed' + TaskNotClaimedError: Raised if the task's status is not 'Claimed' """ self._check_call_valid_to_be_returned(call) async with self._modifying: @@ -158,9 +164,6 @@ async def complete_call(self, call: BlueapiCall, result: TaskResult): """ async with self._modifying: self._check_call_valid_to_be_returned(call) - assert call == self._call_queue[0], ( - f"This call is not at the front of the queue: {call}" - ) call.succeed(result) self._call_history.append(call) LOGGER.info(f"Plan {call} has been completed successfully: {result}") @@ -168,7 +171,7 @@ async def complete_call(self, call: BlueapiCall, result: TaskResult): async def fail_call( self, call: BlueapiCall, errors: list[str | TaskError] | None = None ): - """Sets a task to failed, removes it from the queue and adds it to history + """Sets a task to failed, removes it from the call queue and adds it to history. Args: task (Task): The task to fail From e1f638d342081954b8c1a2a5e83cf89b3ebe2757 Mon Sep 17 00:00:00 2001 From: Jacob Williamson Date: Tue, 12 May 2026 12:58:20 +0100 Subject: [PATCH 12/14] Add endpoints to get call queue and history --- src/daq_queuing_service/api/api.py | 8 ++++++++ src/daq_queuing_service/task_queue/queue.py | 8 ++++++++ 2 files changed, 16 insertions(+) diff --git a/src/daq_queuing_service/api/api.py b/src/daq_queuing_service/api/api.py index 39fb4ff..18daad7 100644 --- a/src/daq_queuing_service/api/api.py +++ b/src/daq_queuing_service/api/api.py @@ -142,4 +142,12 @@ async def get_completed_tasks( async def clear_history(): return await queue.clear_history() + @router.get("/call_queue") + async def get_call_queue(): + return await queue.get_call_queue() + + @router.get("/call_history") + async def get_call_history(): + return await queue.get_call_history() + return router diff --git a/src/daq_queuing_service/task_queue/queue.py b/src/daq_queuing_service/task_queue/queue.py index 2efcba4..32c2cc8 100644 --- a/src/daq_queuing_service/task_queue/queue.py +++ b/src/daq_queuing_service/task_queue/queue.py @@ -451,3 +451,11 @@ def _get_history(self) -> list[TaskWithPosition]: TaskWithPosition.from_task(self._tasks[task_id]) for task_id in self._history ] + + async def get_call_queue(self) -> list[BlueapiCall]: + async with self._modifying: + return list(self._call_queue) + + async def get_call_history(self) -> list[BlueapiCall]: + async with self._modifying: + return list(self._call_history) From 60cf0352d2ef389456bf7d0f1a5a0bbf7cd45af7 Mon Sep 17 00:00:00 2001 From: Jacob Williamson Date: Tue, 12 May 2026 13:16:44 +0100 Subject: [PATCH 13/14] Minor tweaks --- src/daq_queuing_service/task_queue/queue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/daq_queuing_service/task_queue/queue.py b/src/daq_queuing_service/task_queue/queue.py index 32c2cc8..26de0b5 100644 --- a/src/daq_queuing_service/task_queue/queue.py +++ b/src/daq_queuing_service/task_queue/queue.py @@ -70,7 +70,7 @@ def _sync(self): tasks to a queue of blueapi calls. This is called every time the queue is modified, and also right before a call is popped off the front of the queue. """ - for task_id in self._queue: + for task_id in list(self._queue): task = self._tasks[task_id] if task.status == Status.COMPLETE: # Move task from queue to history if all blueapi calls complete @@ -81,7 +81,7 @@ def _sync(self): task.blueapi_calls = [] self._call_queue = [ - # Persist calls who's parent tasks are in progress + # Persist calls who's parent task is in progress # Once a task is in progress it is not provided to the converter # More work needed to allow for interleaved calls from different tasks, # and for in progress tasks to inform conversion From f4b20853a27ffc750e18d48d336703f84a8fbeae Mon Sep 17 00:00:00 2001 From: Jacob Williamson Date: Tue, 12 May 2026 18:51:21 +0100 Subject: [PATCH 14/14] Add tests --- tests/unit_tests/test_api.py | 37 ++++++++++++++++++ tests/unit_tests/test_queue.py | 70 ++++++++++++++++++++++++++++++++++ 2 files changed, 107 insertions(+) diff --git a/tests/unit_tests/test_api.py b/tests/unit_tests/test_api.py index b785aff..e2934de 100644 --- a/tests/unit_tests/test_api.py +++ b/tests/unit_tests/test_api.py @@ -7,6 +7,7 @@ BlueapiRestClient, InvalidParametersError, ParameterError, + ServiceUnavailableError, UnknownPlanError, ) from blueapi.service.model import ( @@ -758,6 +759,22 @@ def test_get_task_by_id_gives_error_if_task_id_does_not_exist(test_client: TestC } +def test_get_call_queue_returns_calls_in_call_queue( + test_client: TestClient, task_queue_with_history: TaskQueue +): + response = test_client.get("/call_queue") + assert response.status_code == 200 + assert response.json() == jsonable_encoder(task_queue_with_history._call_queue) + + +def test_get_call_history_returns_calls_in_call_queue( + test_client: TestClient, task_queue_with_history: TaskQueue +): + response = test_client.get("/call_history") + assert response.status_code == 200 + assert response.json() == jsonable_encoder(task_queue_with_history._call_history) + + async def test_clear_history_deletes_history( test_client: TestClient, task_queue_with_history: TaskQueue ): @@ -787,6 +804,26 @@ def test_queue_error_caught_by_error_handler(test_client: TestClient): assert response.status_code == 409 +def test_blueapi_connection_error_caught_by_error_handler(test_client: TestClient): + with patch( + "daq_queuing_service.api.api._validate_tasks_with_blueapi", + side_effect=ServiceUnavailableError("Can't connect to blueapi"), + ): + response = test_client.post( + "/queue", + json=[ + { + "plan_name": "add_tasks", + "sample_id": "1", + "params": {"time": 10}, + "instrument_session": "abc", + } + ], + ) + + assert response.status_code == 404 + + def test__validate_tasks_with_blueapi_calls_create_task_and_then_removes_task( blueapi_client: BlueapiRestClient, tasks: list[Task] ): diff --git a/tests/unit_tests/test_queue.py b/tests/unit_tests/test_queue.py index 44dc671..7387269 100644 --- a/tests/unit_tests/test_queue.py +++ b/tests/unit_tests/test_queue.py @@ -681,3 +681,73 @@ async def test_return_task_to_queue_raises_error_if_task_has_not_been_claimed( call.status = CallStatus.SUCCESS with pytest.raises(TaskNotClaimedError): await task_queue.return_call_to_queue(call) + + +async def test_get_call_queue_returns_calls_in_call_queue( + task_queue_with_history: TaskQueue, +): + result = await task_queue_with_history.get_call_queue() + assert result == [ + BlueapiCall( + task_request=TaskRequest(name="test", params={}, instrument_session=""), + parent_task_id="2", + status=CallStatus.IN_PROGRESS, + time_started="2026-04-17T15:02:00.000000", + time_completed=None, + result=None, + errors=[], + blueapi_id=None, + ), + BlueapiCall( + task_request=TaskRequest(name="test", params={}, instrument_session=""), + parent_task_id="3", + status=CallStatus.WAITING, + time_started=None, + time_completed=None, + result=None, + errors=[], + blueapi_id=None, + ), + BlueapiCall( + task_request=TaskRequest(name="test", params={}, instrument_session=""), + parent_task_id="4", + status=CallStatus.WAITING, + time_started=None, + time_completed=None, + result=None, + errors=[], + blueapi_id=None, + ), + ] + + +async def test_get_call_history_returns_calls_in_call_history( + task_queue_with_history: TaskQueue, +): + result = await task_queue_with_history.get_call_history() + assert result == [ + BlueapiCall( + task_request=TaskRequest(name="test", params={}, instrument_session=""), + parent_task_id="0", + status=CallStatus.ERROR, + time_started="2026-04-17T15:00:00.000000", + time_completed="2026-04-17T15:00:59.000000", + result=None, + errors=[ + TaskError( + outcome="error", type="ValueError", message="Error during plan" + ) + ], + blueapi_id=None, + ), + BlueapiCall( + task_request=TaskRequest(name="test", params={}, instrument_session=""), + parent_task_id="1", + status=CallStatus.SUCCESS, + time_started="2026-04-17T15:01:00.000000", + time_completed="2026-04-17T15:01:59.000000", + result=TaskResult(outcome="success", result=None, type="NoneType"), + errors=[], + blueapi_id=None, + ), + ]