diff --git a/src/daq_queuing_service/api/api.py b/src/daq_queuing_service/api/api.py index 39fb4ff..18daad7 100644 --- a/src/daq_queuing_service/api/api.py +++ b/src/daq_queuing_service/api/api.py @@ -142,4 +142,12 @@ async def get_completed_tasks( async def clear_history(): return await queue.clear_history() + @router.get("/call_queue") + async def get_call_queue(): + return await queue.get_call_queue() + + @router.get("/call_history") + async def get_call_history(): + return await queue.get_call_history() + return router diff --git a/src/daq_queuing_service/api/errors.py b/src/daq_queuing_service/api/errors.py index 40a107f..359a51d 100644 --- a/src/daq_queuing_service/api/errors.py +++ b/src/daq_queuing_service/api/errors.py @@ -24,7 +24,7 @@ async def service_unavailable_error( request: Request, exception: ServiceUnavailableError ): return JSONResponse( - status_code=409, + status_code=404, content={"error": "blueapi_unavailable", "message": str(exception)}, ) diff --git a/src/daq_queuing_service/app/app.py b/src/daq_queuing_service/app/app.py index 49df90a..514717b 100644 --- a/src/daq_queuing_service/app/app.py +++ b/src/daq_queuing_service/app/app.py @@ -9,8 +9,9 @@ from daq_queuing_service.api.api import create_api_router from daq_queuing_service.api.errors import register_exception_handlers -from daq_queuing_service.blueapi_adapter import BlueapiClientAdapter +from daq_queuing_service.blueapi_interaction.blueapi_adapter import BlueapiClientAdapter from daq_queuing_service.plugins.construct_task_request import ( + construct_blueapi_call_list, construct_blueapi_task_request, ) from daq_queuing_service.task_queue.queue import TaskQueue @@ -47,7 +48,7 @@ def log_task_exception(task: asyncio.Task[NoReturn]): config = load_config() app = FastAPI(lifespan=lifespan) - app.state.queue = TaskQueue() + app.state.queue = TaskQueue(construct_blueapi_call_list) blueapi_rest_client = BlueapiRestClient(config=config.blueapi.api) blueapi_client = BlueapiClient.from_config(config.blueapi) diff --git a/src/daq_queuing_service/blueapi_interaction/__init__.py b/src/daq_queuing_service/blueapi_interaction/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/daq_queuing_service/blueapi_adapter.py b/src/daq_queuing_service/blueapi_interaction/blueapi_adapter.py similarity index 100% rename from src/daq_queuing_service/blueapi_adapter.py rename to src/daq_queuing_service/blueapi_interaction/blueapi_adapter.py diff --git a/src/daq_queuing_service/blueapi_interaction/blueapi_call.py b/src/daq_queuing_service/blueapi_interaction/blueapi_call.py new file mode 100644 index 0000000..b2f163e --- /dev/null +++ b/src/daq_queuing_service/blueapi_interaction/blueapi_call.py @@ -0,0 +1,97 @@ +from datetime import datetime +from enum import StrEnum + +from blueapi.service.model import TaskRequest +from blueapi.worker.event import TaskError, TaskResult +from pydantic import BaseModel, Field + + +class CallStatus(StrEnum): + WAITING = "Waiting" # Waiting in the queue + CLAIMED = "Claimed" # Claimed by the worker + IN_PROGRESS = "In progress" # In progress inside BlueAPI + SUCCESS = "Success" # Completed successfully + ERROR = "Error" # Error while trying to run + + @property + def allowed_transitions(self): + allowed_transitions: dict[CallStatus, set[CallStatus]] = { # from: to + CallStatus.WAITING: {CallStatus.CLAIMED}, + CallStatus.CLAIMED: { + CallStatus.WAITING, + CallStatus.IN_PROGRESS, + CallStatus.ERROR, + }, + CallStatus.IN_PROGRESS: {CallStatus.SUCCESS, CallStatus.ERROR}, + CallStatus.SUCCESS: set(), + CallStatus.ERROR: set(), + } + return allowed_transitions[self] + + +class BlueapiCall(BaseModel): + task_request: TaskRequest + parent_task_id: str | None = None + status: CallStatus = CallStatus.WAITING + time_started: str | None = None + time_completed: str | None = None + result: TaskResult | None = None + errors: list[str | TaskError] = Field(default_factory=list[str | TaskError]) + blueapi_id: str | None = None + + def _update_status(self, new_status: CallStatus): + """Updates the status of the task, checking that the transition is valid + + Args: + new_status (Status): New status of the task + + Raises: + ValueError: Raised if the transition from the task's current status to the + new status is not permitted. + """ + allowed = self.status.allowed_transitions + if new_status not in allowed: + raise ValueError( + f"Can't go from current state '{self.status}' to '{new_status}'. " + + f"Allowed transitions from {self.status}: {allowed}." + ) + self.status = new_status + + def wait(self): + """Updates the task status to WAITING""" + self._update_status(CallStatus.WAITING) + + def claim(self): + """Updates the task status to CLAIMED""" + self._update_status(CallStatus.CLAIMED) + + def put_in_progress(self): + """Updates the task status to IN_PROGRESS and sets the time_started field to the + current time + """ + self._update_status(CallStatus.IN_PROGRESS) + self.time_started = datetime.now().isoformat() + + def succeed(self, result: TaskResult): + """Updates the task status to SUCCESS, sets the time_completed field to the + current time, and sets the result field with the result from blueapi + + Args: + result (TaskResult): The result of the task from blueapi + """ + self._update_status(CallStatus.SUCCESS) + self.result = result + self.time_completed = datetime.now().isoformat() + + def fail(self, errors: list[str | TaskError] | None = None): + """Updates the task status to ERROR, sets the time_completed field to the + current time, and adds any errors to the errors field. + + Args: + errors (list[str | TaskError] | None, optional): List of errors that + occurred when trying to run the task. Defaults to None. + """ + self._update_status(CallStatus.ERROR) + self.time_completed = datetime.now().isoformat() + if errors: + self.errors.extend(errors) diff --git a/src/daq_queuing_service/plugins/construct_task_request.py b/src/daq_queuing_service/plugins/construct_task_request.py index 75b7eec..8f3e70e 100644 --- a/src/daq_queuing_service/plugins/construct_task_request.py +++ b/src/daq_queuing_service/plugins/construct_task_request.py @@ -1,6 +1,7 @@ from blueapi.service.model import TaskRequest -from daq_queuing_service.task import ExperimentDefinition +from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall +from daq_queuing_service.task import ExperimentDefinition, TaskWithPosition def construct_blueapi_task_request( @@ -11,3 +12,19 @@ def construct_blueapi_task_request( params=experiment_definition.params, instrument_session=experiment_definition.instrument_session, ) + + +def construct_blueapi_call_list( + queue: list[TaskWithPosition], + history: list[TaskWithPosition], + call_history: list[BlueapiCall], +) -> list[BlueapiCall]: + + call_list = [ + BlueapiCall( + parent_task_id=task.id, + task_request=construct_blueapi_task_request(task.experiment_definition), + ) + for task in queue + ] + return call_list diff --git a/src/daq_queuing_service/task.py b/src/daq_queuing_service/task.py index ec6ee56..ea78763 100644 --- a/src/daq_queuing_service/task.py +++ b/src/daq_queuing_service/task.py @@ -1,36 +1,22 @@ from collections.abc import Mapping -from datetime import datetime -from enum import StrEnum from typing import Any, Self from uuid import uuid4 -from blueapi.worker.event import TaskError, TaskResult -from pydantic import BaseModel, Field +from blueapi.service.model import StrEnum +from pydantic import BaseModel, Field, computed_field +from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall, CallStatus -def _create_uuid_str() -> str: + +def create_uuid_str() -> str: return str(uuid4()) class Status(StrEnum): - WAITING = "Waiting" # Waiting in the queue - CLAIMED = "Claimed" # Claimed by the worker - IN_PROGRESS = "In progress" # In progress inside BlueAPI - SUCCESS = "Success" # Completed successfully - ERROR = "Error" # Error while trying to run - CANCELLED = "Cancelled" # Cancelled before being run - - @property - def allowed_transitions(self): - allowed_transitions: dict[Status, set[Status]] = { # from: to - Status.WAITING: {Status.CLAIMED, Status.CANCELLED}, - Status.CLAIMED: {Status.WAITING, Status.IN_PROGRESS, Status.ERROR}, - Status.IN_PROGRESS: {Status.SUCCESS, Status.ERROR}, - Status.SUCCESS: set(), - Status.ERROR: set(), - Status.CANCELLED: set(), - } - return allowed_transitions[self] + QUEUED = "Queued" + IN_PROGRESS = "In progress" + COMPLETE = "Complete" + CANCELLED = "Cancelled" class ExperimentDefinition(BaseModel): @@ -44,74 +30,35 @@ class ExperimentDefinition(BaseModel): class Task(BaseModel): experiment_definition: ExperimentDefinition - id: str = Field(default_factory=_create_uuid_str) - status: Status = Status.WAITING - time_started: str | None = None - time_completed: str | None = None - errors: list[str | TaskError] = Field(default_factory=list[str | TaskError]) - result: TaskResult | None = None - blueapi_id: str | None = None - - def _update_status(self, new_status: Status): - """Updates the status of the task, checking that the transition is valid - - Args: - new_status (Status): New status of the task - - Raises: - ValueError: Raised if the transition from the task's current status to the - new status is not permitted. - """ - allowed = self.status.allowed_transitions - if new_status not in allowed: - raise ValueError( - f"Can't go from current state '{self.status}' to '{new_status}'. " - + f"Allowed transitions from {self.status}: {allowed}." - ) - self.status = new_status - - def wait(self): - """Updates the task status to WAITING""" - self._update_status(Status.WAITING) - - def claim(self): - """Updates the task status to CLAIMED""" - self._update_status(Status.CLAIMED) - - def put_in_progress(self): - """Updates the task status to IN_PROGRESS and sets the time_started field to the - current time - """ - self._update_status(Status.IN_PROGRESS) - self.time_started = datetime.now().isoformat() - - def succeed(self, result: TaskResult): - """Updates the task status to SUCCESS, sets the time_completed field to the - current time, and sets the result field with the result from blueapi - - Args: - result (TaskResult): The result of the task from blueapi - """ - self._update_status(Status.SUCCESS) - self.result = result - self.time_completed = datetime.now().isoformat() - - def fail(self, errors: list[str | TaskError] | None = None): - """Updates the task status to ERROR, sets the time_completed field to the - current time, and adds any errors to the errors field. - - Args: - errors (list[str | TaskError] | None, optional): List of errors that - occurred when trying to run the task. Defaults to None. - """ - self._update_status(Status.ERROR) - self.time_completed = datetime.now().isoformat() - if errors: - self.errors.extend(errors) + id: str = Field(default_factory=create_uuid_str) + blueapi_calls: list[BlueapiCall] = [] + _cancelled: bool = False def cancel(self): - """Sets the task status to CANCELLED""" - self._update_status(Status.CANCELLED) + self._cancelled = True + + @computed_field + @property + def status(self) -> Status: + if self._cancelled: + return Status.CANCELLED + if self.blueapi_calls and all( + call.status in [CallStatus.SUCCESS, CallStatus.ERROR] + for call in self.blueapi_calls + ): + return Status.COMPLETE + if any( + call.status + in [ + Status.IN_PROGRESS, + CallStatus.CLAIMED, + CallStatus.SUCCESS, + CallStatus.ERROR, + ] + for call in self.blueapi_calls + ): + return Status.IN_PROGRESS + return Status.QUEUED class TaskWithPosition(Task): diff --git a/src/daq_queuing_service/task_queue/queue.py b/src/daq_queuing_service/task_queue/queue.py index f89424d..26de0b5 100644 --- a/src/daq_queuing_service/task_queue/queue.py +++ b/src/daq_queuing_service/task_queue/queue.py @@ -1,10 +1,13 @@ import asyncio import logging -from collections.abc import Sequence +from collections.abc import Callable, Sequence +from types import TracebackType +from typing import Any from blueapi.worker.event import TaskError, TaskResult from pydantic import BaseModel +from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall, CallStatus from daq_queuing_service.task import Status, Task, TaskWithPosition from daq_queuing_service.task_queue.queue_utils import ( NegativePositionError, @@ -17,6 +20,11 @@ LOGGER = logging.getLogger(__name__) +Converter = Callable[ + [list[TaskWithPosition], list[TaskWithPosition], list[BlueapiCall]], + list[BlueapiCall], +] + class TaskRegistry(dict[str, Task]): def __missing__(self, task_id: str) -> Task: @@ -27,99 +35,154 @@ class QueueState(BaseModel): paused: bool +class Modifying(asyncio.Condition): + def __init__(self, on_exit: Callable[[], Any]): + super().__init__() + self._on_exit = on_exit + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc: BaseException | None, + tb: TracebackType | None, + ): + self._on_exit() + return await super().__aexit__(exc_type, exc, tb) + + class TaskQueue: - def __init__(self): + def __init__( + self, + convert: Converter, + ): self._tasks: TaskRegistry = TaskRegistry() self._queue: list[str] = [] self._history: list[str] = [] - self._condition = asyncio.Condition() + self._call_queue: list[BlueapiCall] = [] + self._call_history: list[BlueapiCall] = [] + self._queue_history: list[BlueapiCall] = [] self._state: QueueState = QueueState(paused=True) + self._convert = convert + self._modifying = Modifying(on_exit=self._sync) - async def claim_next_task_once_available(self) -> Task: - """Waits until a task is available before returning the task. A task is - available if it's at the top of the queue, is not already in progress or + def _sync(self): + """Syncs the task queue with the call queue, applying a conversion from queue of + tasks to a queue of blueapi calls. This is called every time the queue is + modified, and also right before a call is popped off the front of the queue. + """ + for task_id in list(self._queue): + task = self._tasks[task_id] + if task.status == Status.COMPLETE: + # Move task from queue to history if all blueapi calls complete + self._queue.remove(task_id) + self._history.append(task_id) + elif task.status != Status.IN_PROGRESS: + # If task is not in progress calls will be re-calculated + task.blueapi_calls = [] + + self._call_queue = [ + # Persist calls who's parent task is in progress + # Once a task is in progress it is not provided to the converter + # More work needed to allow for interleaved calls from different tasks, + # and for in progress tasks to inform conversion + call + for call in self._call_queue + if call.parent_task_id + and call.parent_task_id in self._queue + and self._tasks[call.parent_task_id].status == Status.IN_PROGRESS + ] + + new_queue = self._convert( + [task for task in self._get_queue() if task.status == Status.QUEUED], + self._get_history(), + self._queue_history, + ) + + for call in new_queue: + # Add children to parent tasks + if call.parent_task_id: + self._tasks[call.parent_task_id].blueapi_calls.append(call) + + self._call_queue.extend(new_queue) + + self._modifying.notify_all() + + async def get_next_call_once_available(self) -> BlueapiCall: + """Waits until a call is available before returning the call. A call is + available if it's at the top of the call queue, is not already in progress or claimed, and the queue is not paused. Returns: - Task: The task at the top of the queue + BlueapiCall: The call at the top of the queue """ - async with self._condition: + async with self._modifying: while not self._task_available(): - await self._condition.wait() - task = self._tasks[self._queue[0]] - task.claim() - self._condition.notify_all() - LOGGER.info(f"Task {task.id} has been claimed") - return task - - async def wait_until_task_available(self): + await self._modifying.wait() + self._sync() # Do conversion here to ensure conditions are up to date + call = self._call_queue[0] + call.claim() + LOGGER.info(f"Plan {call} has been claimed") + return call + + async def wait_until_call_available(self): """Waits until a task is available before returning. A task is available if it's at the top of the queue, is not already in progress or claimed, and the queue is not paused. """ - async with self._condition: + async with self._modifying: while not self._task_available(): - await self._condition.wait() + await self._modifying.wait() - async def return_task_to_queue(self, task: Task): + async def return_call_to_queue(self, call: BlueapiCall): """Returns a task to the queue that had previously been claimed Args: task (Task): The task to return Raises: - TaskNotClaimedError: Raised if the task's status is not have a 'Claimed' + TaskNotClaimedError: Raised if the task's status is not 'Claimed' """ - self._check_task_valid_to_be_returned(task) - async with self._condition: - match task.status: - case Status.CLAIMED: - assert task.id == self._queue[0] - task.wait() + self._check_call_valid_to_be_returned(call) + async with self._modifying: + match call.status: + case CallStatus.CLAIMED: + assert call == self._call_queue[0] + call.wait() case _: raise TaskNotClaimedError( - f"Cannot return task {task.id}, " - + f"it's status is {task.status}." + f"Cannot return call {call}, " + + f"it's status is {call.status}." ) - self._condition.notify_all() - LOGGER.info(f"Task {task.id} has been returned to the queue") + LOGGER.info(f"Call {call} has been returned to the queue") - async def complete_task(self, task: Task, result: TaskResult): + async def complete_call(self, call: BlueapiCall, result: TaskResult): """Sets a task to complete, removes it from the queue and adds it to history Args: task (Task): Task to be completed result (TaskResult): The result of the task from blueapi """ - async with self._condition: - self._check_task_valid_to_be_returned(task) - assert self._queue[0] == task.id, ( - f"This task is not at the front of the queue: {task}" - ) - task.succeed(result) - self._queue.pop(0) - self._history.append(task.id) - self._condition.notify_all() - LOGGER.info(f"Task {task.id} has been completed successfully: {result}") + async with self._modifying: + self._check_call_valid_to_be_returned(call) + call.succeed(result) + self._call_history.append(call) + LOGGER.info(f"Plan {call} has been completed successfully: {result}") - async def fail_task(self, task: Task, errors: list[str | TaskError] | None = None): - """Sets a task to failed, removes it from the queue and adds it to history + async def fail_call( + self, call: BlueapiCall, errors: list[str | TaskError] | None = None + ): + """Sets a task to failed, removes it from the call queue and adds it to history. Args: task (Task): The task to fail errors (list[str | TaskError] | None, optional): A list of errors that occurred when trying to run the task. Defaults to None. """ - async with self._condition: - self._check_task_valid_to_be_returned(task) - assert self._queue[0] == task.id, ( - f"This task is not at the front of the queue: {task}" - ) - task.fail(errors) - self._queue.pop(0) - self._history.append(task.id) - self._condition.notify_all() - LOGGER.info(f"Task {task.id} has failed with the following errors: {errors}") + async with self._modifying: + self._check_call_valid_to_be_returned(call) + call.fail(errors) + self._call_history.append(call) + LOGGER.info(f"Call {call} has failed with the following errors: {errors}") async def get_task_by_id(self, task_id: str) -> TaskWithPosition: """Returns a task based on it's task ID @@ -134,7 +197,7 @@ async def get_task_by_id(self, task_id: str) -> TaskWithPosition: TaskNotFoundError: Raised if the no task exists with the requested task ID. """ # Returns copy so don't have to be worried about caller modifying task. - async with self._condition: + async with self._modifying: return self._get_task_by_id(task_id) def _get_task_by_id(self, task_id: str) -> TaskWithPosition: @@ -153,7 +216,7 @@ async def get_task_by_position(self, position: int) -> TaskWithPosition | None: if no task exists at the requested position. """ # Returns copy so don't have to be worried about caller modifying task. - async with self._condition: + async with self._modifying: if position < -self.length or position >= self.length: return None return self._get_task_by_id(self._queue[position]) @@ -166,7 +229,7 @@ async def get_queue(self) -> list[TaskWithPosition]: will be run in. """ # Returns copies so don't have to be worried about caller modifying tasks. - async with self._condition: + async with self._modifying: return self._get_queue() async def get_history(self) -> list[TaskWithPosition]: @@ -177,7 +240,7 @@ async def get_history(self) -> list[TaskWithPosition]: chronological order. """ # Returns copies so don't have to be worried about caller modifying tasks. - async with self._condition: + async with self._modifying: return self._get_history() async def get_tasks(self) -> list[TaskWithPosition]: @@ -188,7 +251,7 @@ async def get_tasks(self) -> list[TaskWithPosition]: with the history. """ # Returns copies so don't have to be worried about caller modifying tasks. - async with self._condition: + async with self._modifying: return self._get_history() + self._get_queue() async def add_tasks(self, tasks: list[Task], position: int | None = None) -> None: @@ -202,12 +265,11 @@ async def add_tasks(self, tasks: list[Task], position: int | None = None) -> Non tasks (list[Task]): List of tasks to add position (int | None, optional): Position of the tasks. Defaults to None. """ - async with self._condition: + async with self._modifying: self._validate_new_tasks(tasks) if position is not None: position = self._get_valid_position(position) self._add_tasks(tasks, position) - self._condition.notify_all() LOGGER.info(f"Successfully added tasks to queue: {[task.id for task in tasks]}") async def move_task(self, task_id: str, position: int) -> int: @@ -222,12 +284,11 @@ async def move_task(self, task_id: str, position: int) -> int: Returns: int: The new position of the task (may be different to what was requested) """ - async with self._condition: + async with self._modifying: self._validate_tasks_for_move_or_deletion([task_id]) position = self._get_valid_position(position) self._remove_tasks_from_queue([task_id]) self._queue[position:position] = [task_id] - self._condition.notify_all() new_position = self._queue.index(task_id) LOGGER.info(f"Succesfully moved task {task_id} to position {new_position}") return new_position @@ -243,14 +304,13 @@ async def cancel_tasks(self, task_ids: Sequence[str]) -> list[Task]: Returns: list[Task]: List of the task objects that were removed from the queue. """ - async with self._condition: + async with self._modifying: task_ids = list(task_ids) self._validate_tasks_for_move_or_deletion(task_ids) self._remove_tasks_from_queue(task_ids) tasks = self._remove_tasks_from_registry(task_ids) for task in tasks: task.cancel() - self._condition.notify_all() LOGGER.info(f"Succesfully cancelled tasks: {task_ids}") return tasks @@ -258,11 +318,10 @@ async def clear_history(self): """Clears the history list. Any task in the history list at the time will be deleted permanently and inaccessible. """ - async with self._condition: + async with self._modifying: for task_id in self._history: self._tasks.pop(task_id) self._history.clear() - self._condition.notify_all() LOGGER.info("Succesfully cleared history") async def update_state(self, paused: bool | None = None) -> QueueState: @@ -274,11 +333,10 @@ async def update_state(self, paused: bool | None = None) -> QueueState: Returns: QueueState: The new state of the queue. """ - async with self._condition: + async with self._modifying: self._state = QueueState( paused=self._state.paused if paused is None else paused ) - self._condition.notify_all() LOGGER.info(f"Succesfully updated queue state to {self._state}") return self._state @@ -298,26 +356,28 @@ def _task_available(self) -> bool: Returns: bool: Whether or not the queue has a task available. """ - if self._state.paused or not self._queue: + if self._state.paused or not self._call_queue: return False - return self._tasks[self._queue[0]].status == Status.WAITING + return self._call_queue[0].status == CallStatus.WAITING - def _check_task_valid_to_be_returned(self, task: Task): + def _check_call_valid_to_be_returned(self, call: BlueapiCall): # Check caller has actual task object not copy # This ensures the caller has claimed the task, reducing the chance a task is # returned that is actually still being run/modified by a different process. # However if the worker crashes we then lose the Task object and can't return # the task? Needs discussion with others. - assert task is self._tasks[task.id] - assert task.id in self._queue, f"This task is not in the queue: {task}" + assert call is self._call_queue[0] + assert call.parent_task_id in self._queue, ( + f"This call has no parent task: {call}" + ) def _get_valid_position(self, position: int) -> int: if position < 0: raise NegativePositionError(f"Position must be >= 0, got {position}") if ( # if position 0 requested but a task is in progress, return position 1 - self.length - and position == 0 - and self._tasks[self._queue[0]].status != Status.WAITING + position == 0 + and self.length + and self._tasks[self._queue[0]].status != Status.QUEUED ): return 1 return position @@ -375,7 +435,7 @@ def _validate_tasks_for_move_or_deletion(self, task_ids: list[str]): task = self._tasks[task_id] if task_id not in self._queue: raise TaskNotInQueueError(f"Task {task_id} isn't present in queue") - if task.status != Status.WAITING: + if task.status != Status.QUEUED: raise TaskInProgressError( f"Cannot move task '{task_id}', it is currently in progress!" ) @@ -391,3 +451,11 @@ def _get_history(self) -> list[TaskWithPosition]: TaskWithPosition.from_task(self._tasks[task_id]) for task_id in self._history ] + + async def get_call_queue(self) -> list[BlueapiCall]: + async with self._modifying: + return list(self._call_queue) + + async def get_call_history(self) -> list[BlueapiCall]: + async with self._modifying: + return list(self._call_history) diff --git a/src/daq_queuing_service/worker/worker.py b/src/daq_queuing_service/worker/worker.py index f475e18..f2849c6 100644 --- a/src/daq_queuing_service/worker/worker.py +++ b/src/daq_queuing_service/worker/worker.py @@ -15,8 +15,9 @@ from blueapi.worker import ProgressEvent, TaskStatus, WorkerEvent, WorkerState from blueapi.worker.event import TaskError, TaskResult -from daq_queuing_service.blueapi_adapter import BlueapiClientAdapter -from daq_queuing_service.task import ExperimentDefinition, Status, Task +from daq_queuing_service.blueapi_interaction.blueapi_adapter import BlueapiClientAdapter +from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall, CallStatus +from daq_queuing_service.task import ExperimentDefinition from daq_queuing_service.task_queue.queue import TaskQueue LOGGER = logging.getLogger(__name__) @@ -38,16 +39,16 @@ def __init__( async def run_loop(self): while True: - next_task = await self._wait_for_next_task() - await self._process_task(next_task) + next_call = await self._wait_for_next_task() + await self._process_call(next_call) self._at_loop_end() def _at_loop_end(self): LOGGER.info("Loop finished") - async def _wait_for_next_task(self): + async def _wait_for_next_task(self) -> BlueapiCall: while True: - await self._queue.wait_until_task_available() + await self._queue.wait_until_call_available() result = await self._client.get_state() if result.value == WorkerState.IDLE: break @@ -55,18 +56,17 @@ async def _wait_for_next_task(self): f"Waiting for BlueAPI worker to be IDLE, currently {result.value}" ) await asyncio.sleep(self.poll_time_s) - return await self._queue.claim_next_task_once_available() + return await self._queue.get_next_call_once_available() - async def _process_task(self, task: Task): - task_request = self._task_request_constructor(task.experiment_definition) - LOGGER.info(f"Sending task {task.id} to BlueAPI") + async def _process_call(self, call: BlueapiCall): + LOGGER.info(f"Sending call {call} to BlueAPI") result = await self._client.run_task( - task_request, on_event=partial(self._on_blueapi_event, task=task) + call.task_request, on_event=partial(self._on_blueapi_event, call=call) ) if result.error: - await self._handle_run_task_error(task, result.error) + await self._handle_run_task_error(call, result.error) return assert result.value @@ -75,28 +75,27 @@ async def _process_task(self, task: Task): match task_status.result: case TaskResult(): LOGGER.debug( - f"Task {task.id} completed succesfully: {task_status.result}" + f"Call {call} completed succesfully: {task_status.result}" ) - print("Task completed") - await self._queue.complete_task(task, task_status.result) + await self._queue.complete_call(call, task_status.result) case TaskError(): - LOGGER.debug(f"Task {task.id} failed: {task_status.result}") - await self._queue.fail_task(task, [task_status.result]) + LOGGER.debug(f"Call {call} failed: {task_status.result}") + await self._queue.fail_call(call, [task_status.result]) @staticmethod - def _on_blueapi_event(event: AnyEvent, task: Task): + def _on_blueapi_event(event: AnyEvent, call: BlueapiCall): match event: case WorkerEvent() as worker_event: if ( worker_event.state == WorkerState.RUNNING - and task.status != Status.IN_PROGRESS + and call.status != CallStatus.IN_PROGRESS ): assert worker_event.task_status - task.blueapi_id = worker_event.task_status.task_id + call.blueapi_id = worker_event.task_status.task_id LOGGER.info( - f"Task {task.id} is in progress, blueapi ID: " + task.blueapi_id + f"Call {call} is in progress, blueapi ID: {call.blueapi_id}" ) - task.put_in_progress() + call.put_in_progress() case ProgressEvent(): pass @@ -105,7 +104,7 @@ def _on_blueapi_event(event: AnyEvent, task: Task): async def _handle_run_task_error( self, - task: Task, + call: BlueapiCall, error: InvalidParametersError | UnknownPlanError | ServiceUnavailableError @@ -114,12 +113,11 @@ async def _handle_run_task_error( ): match error: case InvalidParametersError(): - await self._queue.fail_task( - task, - errors=[str(error) for error in error.errors], + await self._queue.fail_call( + call, errors=[str(error) for error in error.errors] ) case UnknownPlanError(): - await self._queue.fail_task(task, ["Unknown plan", str(error)]) + await self._queue.fail_call(call, ["Unknown plan", str(error)]) case BlueskyRemoteControlError() | ServiceUnavailableError(): # We get this error if the blueapi worker is busy or unavailable - await self._queue.return_task_to_queue(task) + await self._queue.return_call_to_queue(call) diff --git a/tests/unit_tests/conftest.py b/tests/unit_tests/conftest.py index 50b1bed..aa46779 100644 --- a/tests/unit_tests/conftest.py +++ b/tests/unit_tests/conftest.py @@ -1,6 +1,9 @@ import pytest from blueapi.worker.event import TaskError, TaskResult +from daq_queuing_service.plugins.construct_task_request import ( + construct_blueapi_call_list, +) from daq_queuing_service.task import ExperimentDefinition, Task from daq_queuing_service.task_queue.queue import TaskQueue @@ -20,7 +23,7 @@ def tasks() -> list[Task]: @pytest.fixture async def task_queue(tasks: list[Task]): - queue = TaskQueue() + queue = TaskQueue(convert=construct_blueapi_call_list) await queue.update_state(paused=False) await queue.add_tasks(tasks) return queue @@ -28,43 +31,37 @@ async def task_queue(tasks: list[Task]): @pytest.fixture async def task_queue_claimed(task_queue: TaskQueue): - first_task_id = task_queue._queue[0] - first_task = task_queue._tasks[first_task_id] - first_task.claim() + _ = task_queue.get_next_call_once_available() return task_queue @pytest.fixture -async def task_queue_in_progress(task_queue_claimed: TaskQueue): - first_task_id = task_queue_claimed._queue[0] - first_task = task_queue_claimed._tasks[first_task_id] - first_task.blueapi_id = "blueapi_id_0" - first_task.put_in_progress() - return task_queue_claimed +async def task_queue_in_progress(task_queue: TaskQueue): + first_call = await task_queue.get_next_call_once_available() + first_call.put_in_progress() + return task_queue @pytest.fixture async def task_queue_with_history(task_queue: TaskQueue): for i in range(2): - task = await task_queue.claim_next_task_once_available() - task.blueapi_id = f"blueapi_id_{i}" - task.put_in_progress() + call = await task_queue.get_next_call_once_available() + call.put_in_progress() if i % 2: - await task_queue.complete_task( - task, TaskResult(result=None, type="NoneType") + await task_queue.complete_call( + call, TaskResult(result=None, type="NoneType") ) else: - await task_queue.fail_task( - task, [TaskError(type="ValueError", message="Error during plan")] + await task_queue.fail_call( + call, [TaskError(type="ValueError", message="Error during plan")] ) # By this point should have 3 tasks in queue and 2 in history - for i, task_id in enumerate(task_queue._history): + for i, call in enumerate(task_queue._call_history): # Real timestamps will break tests - task_queue._tasks[task_id].time_started = f"2026-04-17T15:0{i}:00.000000" - task_queue._tasks[task_id].time_completed = f"2026-04-17T15:0{i}:59.000000" + call.time_started = f"2026-04-17T15:0{i}:00.000000" + call.time_completed = f"2026-04-17T15:0{i}:59.000000" - task = await task_queue.claim_next_task_once_available() - task.blueapi_id = f"blueapi_id_{2}" - task.put_in_progress() - task.time_started = "2026-04-17T15:02:00.000000" + call = await task_queue.get_next_call_once_available() + call.put_in_progress() + call.time_started = "2026-04-17T15:02:00.000000" return task_queue diff --git a/tests/unit_tests/test_api.py b/tests/unit_tests/test_api.py index 63493b5..e2934de 100644 --- a/tests/unit_tests/test_api.py +++ b/tests/unit_tests/test_api.py @@ -7,9 +7,11 @@ BlueapiRestClient, InvalidParametersError, ParameterError, + ServiceUnavailableError, UnknownPlanError, ) from blueapi.service.model import ( + TaskRequest, TaskResponse, ) from fastapi import FastAPI @@ -23,6 +25,7 @@ create_api_router, ) from daq_queuing_service.api.errors import register_exception_handlers +from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall, CallStatus from daq_queuing_service.task import ( ExperimentDefinition, Status, @@ -101,11 +104,22 @@ def test_get_queued_tasks_returns_queued_task(test_client: TestClient): }, "id": "2", "status": "In progress", - "time_started": "2026-04-17T15:02:00.000000", - "time_completed": None, - "errors": [], - "result": None, - "blueapi_id": "blueapi_id_2", + "blueapi_calls": [ + { + "task_request": { + "name": "test", + "params": {}, + "instrument_session": "", + }, + "parent_task_id": "2", + "status": "In progress", + "time_started": "2026-04-17T15:02:00.000000", + "time_completed": None, + "result": None, + "errors": [], + "blueapi_id": None, + } + ], "position": 0, }, { @@ -116,12 +130,23 @@ def test_get_queued_tasks_returns_queued_task(test_client: TestClient): "instrument_session": "", }, "id": "3", - "status": "Waiting", - "time_started": None, - "time_completed": None, - "errors": [], - "result": None, - "blueapi_id": None, + "status": "Queued", + "blueapi_calls": [ + { + "task_request": { + "name": "test", + "params": {}, + "instrument_session": "", + }, + "parent_task_id": "3", + "status": "Waiting", + "time_started": None, + "time_completed": None, + "result": None, + "errors": [], + "blueapi_id": None, + } + ], "position": 1, }, { @@ -132,12 +157,23 @@ def test_get_queued_tasks_returns_queued_task(test_client: TestClient): "instrument_session": "", }, "id": "4", - "status": "Waiting", - "time_started": None, - "time_completed": None, - "errors": [], - "result": None, - "blueapi_id": None, + "status": "Queued", + "blueapi_calls": [ + { + "task_request": { + "name": "test", + "params": {}, + "instrument_session": "", + }, + "parent_task_id": "4", + "status": "Waiting", + "time_started": None, + "time_completed": None, + "result": None, + "errors": [], + "blueapi_id": None, + } + ], "position": 2, }, ] @@ -146,7 +182,6 @@ def test_get_queued_tasks_returns_queued_task(test_client: TestClient): def test_get_queued_tasks_can_filter_by_task_status(test_client: TestClient): response = test_client.get("/queue", params={"status": Status.IN_PROGRESS}) assert response.status_code == 200 - print(response.json()) assert response.json() == [ { "experiment_definition": { @@ -156,13 +191,24 @@ def test_get_queued_tasks_can_filter_by_task_status(test_client: TestClient): "instrument_session": "", }, "id": "2", - "status": "In progress", - "time_started": "2026-04-17T15:02:00.000000", - "time_completed": None, - "errors": [], - "result": None, - "blueapi_id": "blueapi_id_2", + "blueapi_calls": [ + { + "task_request": { + "name": "test", + "params": {}, + "instrument_session": "", + }, + "parent_task_id": "2", + "status": "In progress", + "time_started": "2026-04-17T15:02:00.000000", + "time_completed": None, + "result": None, + "errors": [], + "blueapi_id": None, + } + ], "position": 0, + "status": "In progress", } ] @@ -178,9 +224,42 @@ async def test_get_all_tasks_returns_all_tasks( async def test_get_all_tasks_can_filter_by_task_status(test_client: TestClient): - response = test_client.get("/tasks", params={"status": Status.SUCCESS}) + response = test_client.get("/tasks", params={"status": Status.COMPLETE}) assert response.status_code == 200 assert response.json() == [ + { + "experiment_definition": { + "plan_name": "test", + "sample_id": "0", + "params": {}, + "instrument_session": "", + }, + "id": "0", + "status": "Complete", + "blueapi_calls": [ + { + "task_request": { + "name": "test", + "params": {}, + "instrument_session": "", + }, + "parent_task_id": "0", + "status": "Error", + "time_started": "2026-04-17T15:00:00.000000", + "time_completed": "2026-04-17T15:00:59.000000", + "result": None, + "errors": [ + { + "outcome": "error", + "type": "ValueError", + "message": "Error during plan", + } + ], + "blueapi_id": None, + } + ], + "position": None, + }, { "experiment_definition": { "plan_name": "test", @@ -189,14 +268,29 @@ async def test_get_all_tasks_can_filter_by_task_status(test_client: TestClient): "instrument_session": "", }, "id": "1", - "status": "Success", - "time_started": "2026-04-17T15:01:00.000000", - "time_completed": "2026-04-17T15:01:59.000000", - "errors": [], - "result": {"outcome": "success", "result": None, "type": "NoneType"}, - "blueapi_id": "blueapi_id_1", + "status": "Complete", + "blueapi_calls": [ + { + "task_request": { + "name": "test", + "params": {}, + "instrument_session": "", + }, + "parent_task_id": "1", + "status": "Success", + "time_started": "2026-04-17T15:01:00.000000", + "time_completed": "2026-04-17T15:01:59.000000", + "result": { + "outcome": "success", + "result": None, + "type": "NoneType", + }, + "errors": [], + "blueapi_id": None, + } + ], "position": None, - } + }, ] @@ -210,36 +304,6 @@ async def test_get_completed_tasks_returns_completed_tasks( ) -async def test_get_completed_tasks_can_filter_by_status(test_client: TestClient): - - response = test_client.get("/history", params={"status": Status.ERROR}) - assert response.status_code == 200 - assert response.json() == [ - { - "experiment_definition": { - "plan_name": "test", - "sample_id": "0", - "params": {}, - "instrument_session": "", - }, - "id": "0", - "status": "Error", - "time_started": "2026-04-17T15:00:00.000000", - "time_completed": "2026-04-17T15:00:59.000000", - "errors": [ - { - "outcome": "error", - "type": "ValueError", - "message": "Error during plan", - } - ], - "result": None, - "blueapi_id": "blueapi_id_0", - "position": None, - } - ] - - async def test_add_tasks_to_queue_validates_and_adds_to_queue_and_and_returns_task_ids( test_client: TestClient, task_queue_with_history: TaskQueue ): @@ -268,13 +332,21 @@ async def test_add_tasks_to_queue_validates_and_adds_to_queue_and_and_returns_ta params={"time": 10}, instrument_session="abc", ), - id=task_ids[0], - status=Status.WAITING, - time_started=None, - time_completed=None, - errors=[], - result=None, - blueapi_id=None, + id=task_ids[-1], + blueapi_calls=[ + BlueapiCall( + task_request=TaskRequest( + name="add_tasks", params={"time": 10}, instrument_session="abc" + ), + parent_task_id=task_ids[-1], + status=CallStatus.WAITING, + time_started=None, + time_completed=None, + result=None, + errors=[], + blueapi_id=None, + ) + ], position=3, ) @@ -511,11 +583,22 @@ async def test_cancel_tasks_removes_task_from_queue_and_returns_tasks( }, "id": "3", "status": "Cancelled", - "time_started": None, - "time_completed": None, - "errors": [], - "result": None, - "blueapi_id": None, + "blueapi_calls": [ + { + "task_request": { + "name": "test", + "params": {}, + "instrument_session": "", + }, + "parent_task_id": "3", + "status": "Waiting", + "time_started": None, + "time_completed": None, + "result": None, + "errors": [], + "blueapi_id": None, + } + ], }, { "experiment_definition": { @@ -526,11 +609,22 @@ async def test_cancel_tasks_removes_task_from_queue_and_returns_tasks( }, "id": "4", "status": "Cancelled", - "time_started": None, - "time_completed": None, - "errors": [], - "result": None, - "blueapi_id": None, + "blueapi_calls": [ + { + "task_request": { + "name": "test", + "params": {}, + "instrument_session": "", + }, + "parent_task_id": "4", + "status": "Waiting", + "time_started": None, + "time_completed": None, + "result": None, + "errors": [], + "blueapi_id": None, + } + ], }, ] @@ -603,12 +697,23 @@ def test_get_task_by_position_returns_expected_task(test_client: TestClient): "instrument_session": "", }, "id": "3", - "status": "Waiting", - "time_started": None, - "time_completed": None, - "errors": [], - "result": None, - "blueapi_id": None, + "status": "Queued", + "blueapi_calls": [ + { + "task_request": { + "name": "test", + "params": {}, + "instrument_session": "", + }, + "parent_task_id": "3", + "status": "Waiting", + "time_started": None, + "time_completed": None, + "result": None, + "errors": [], + "blueapi_id": None, + } + ], "position": 1, } @@ -624,12 +729,23 @@ def test_get_task_by_id_returns_expected_task(test_client: TestClient): "instrument_session": "", }, "id": "3", - "status": "Waiting", - "time_started": None, - "time_completed": None, - "errors": [], - "result": None, - "blueapi_id": None, + "status": "Queued", + "blueapi_calls": [ + { + "task_request": { + "name": "test", + "params": {}, + "instrument_session": "", + }, + "parent_task_id": "3", + "status": "Waiting", + "time_started": None, + "time_completed": None, + "result": None, + "errors": [], + "blueapi_id": None, + } + ], "position": 1, } @@ -643,6 +759,22 @@ def test_get_task_by_id_gives_error_if_task_id_does_not_exist(test_client: TestC } +def test_get_call_queue_returns_calls_in_call_queue( + test_client: TestClient, task_queue_with_history: TaskQueue +): + response = test_client.get("/call_queue") + assert response.status_code == 200 + assert response.json() == jsonable_encoder(task_queue_with_history._call_queue) + + +def test_get_call_history_returns_calls_in_call_queue( + test_client: TestClient, task_queue_with_history: TaskQueue +): + response = test_client.get("/call_history") + assert response.status_code == 200 + assert response.json() == jsonable_encoder(task_queue_with_history._call_history) + + async def test_clear_history_deletes_history( test_client: TestClient, task_queue_with_history: TaskQueue ): @@ -672,6 +804,26 @@ def test_queue_error_caught_by_error_handler(test_client: TestClient): assert response.status_code == 409 +def test_blueapi_connection_error_caught_by_error_handler(test_client: TestClient): + with patch( + "daq_queuing_service.api.api._validate_tasks_with_blueapi", + side_effect=ServiceUnavailableError("Can't connect to blueapi"), + ): + response = test_client.post( + "/queue", + json=[ + { + "plan_name": "add_tasks", + "sample_id": "1", + "params": {"time": 10}, + "instrument_session": "abc", + } + ], + ) + + assert response.status_code == 404 + + def test__validate_tasks_with_blueapi_calls_create_task_and_then_removes_task( blueapi_client: BlueapiRestClient, tasks: list[Task] ): diff --git a/tests/unit_tests/test_blueapi_adapter.py b/tests/unit_tests/test_blueapi_adapter.py index 7bb88eb..cec1dd0 100644 --- a/tests/unit_tests/test_blueapi_adapter.py +++ b/tests/unit_tests/test_blueapi_adapter.py @@ -12,7 +12,10 @@ from blueapi.worker import WorkerState from blueapi.worker.event import TaskResult, TaskStatus -from daq_queuing_service.blueapi_adapter import BlueapiClientAdapter, BlueapiResult +from daq_queuing_service.blueapi_interaction.blueapi_adapter import ( + BlueapiClientAdapter, + BlueapiResult, +) async def test_blueapi_adapter_get_state_returns_blueapi_response_with_state(): diff --git a/tests/unit_tests/test_blueapi_call.py b/tests/unit_tests/test_blueapi_call.py new file mode 100644 index 0000000..28ab74d --- /dev/null +++ b/tests/unit_tests/test_blueapi_call.py @@ -0,0 +1,113 @@ +import pytest +from blueapi.service.model import TaskRequest +from blueapi.worker.event import TaskResult + +from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall, CallStatus + + +@pytest.mark.parametrize( + "old_status, new_status", + [ + [CallStatus.WAITING, CallStatus.WAITING], + [CallStatus.WAITING, CallStatus.IN_PROGRESS], + [CallStatus.WAITING, CallStatus.SUCCESS], + [CallStatus.WAITING, CallStatus.ERROR], + [CallStatus.CLAIMED, CallStatus.CLAIMED], + [CallStatus.CLAIMED, CallStatus.SUCCESS], + [CallStatus.IN_PROGRESS, CallStatus.WAITING], + [CallStatus.IN_PROGRESS, CallStatus.CLAIMED], + [CallStatus.IN_PROGRESS, CallStatus.IN_PROGRESS], + [CallStatus.SUCCESS, CallStatus.WAITING], + [CallStatus.SUCCESS, CallStatus.CLAIMED], + [CallStatus.SUCCESS, CallStatus.IN_PROGRESS], + [CallStatus.SUCCESS, CallStatus.SUCCESS], + [CallStatus.SUCCESS, CallStatus.ERROR], + [CallStatus.ERROR, CallStatus.WAITING], + [CallStatus.ERROR, CallStatus.CLAIMED], + [CallStatus.ERROR, CallStatus.IN_PROGRESS], + [CallStatus.ERROR, CallStatus.SUCCESS], + [CallStatus.ERROR, CallStatus.ERROR], + ], +) +def test__update_status_raises_error_when_transitioned_to_wrong_status( + old_status: CallStatus, new_status: CallStatus +): + call = BlueapiCall( + status=old_status, + task_request=TaskRequest(name="", params={}, instrument_session=""), + ) + with pytest.raises(ValueError): + call._update_status(new_status) + + +@pytest.mark.parametrize( + "old_status, new_status", + [ + [CallStatus.WAITING, CallStatus.CLAIMED], + [CallStatus.CLAIMED, CallStatus.WAITING], + [CallStatus.CLAIMED, CallStatus.IN_PROGRESS], + [CallStatus.CLAIMED, CallStatus.ERROR], + [CallStatus.IN_PROGRESS, CallStatus.SUCCESS], + [CallStatus.IN_PROGRESS, CallStatus.ERROR], + ], +) +def test__update_status_changes_status_when_correct_new_status_given( + old_status: CallStatus, new_status: CallStatus +): + call = BlueapiCall( + status=old_status, + task_request=TaskRequest(name="", params={}, instrument_session=""), + ) + call._update_status(new_status) + assert call.status == new_status + + +def test_wait_updates_status_to_waiting(): + call = BlueapiCall( + status=CallStatus.CLAIMED, + task_request=TaskRequest(name="", params={}, instrument_session=""), + ) + call.wait() + assert call.status == CallStatus.WAITING + + +def test_claim_updates_status_to_claimed(): + call = BlueapiCall( + status=CallStatus.WAITING, + task_request=TaskRequest(name="", params={}, instrument_session=""), + ) + call.claim() + assert call.status == CallStatus.CLAIMED + + +def test_put_in_progress_updates_status_to_in_progress_and_adds_fields(): + call = BlueapiCall( + status=CallStatus.CLAIMED, + task_request=TaskRequest(name="", params={}, instrument_session=""), + ) + call.blueapi_id = "blueapi_id" + call.put_in_progress() + assert call.status == CallStatus.IN_PROGRESS + assert call.time_started is not None + assert call.blueapi_id == "blueapi_id" + + +def test_succeed_updates_status_to_success_and_adds_time_completed(): + call = BlueapiCall( + status=CallStatus.IN_PROGRESS, + task_request=TaskRequest(name="", params={}, instrument_session=""), + ) + call.succeed(TaskResult(result=None, type="NoneType")) + assert call.status == CallStatus.SUCCESS + assert call.time_completed is not None + + +def test_fail_updates_status_to_error_and_adds_time_completed_and_errors(): + call = BlueapiCall( + status=CallStatus.IN_PROGRESS, + task_request=TaskRequest(name="", params={}, instrument_session=""), + ) + call.fail(["errors", "more_errors"]) + assert call.status == CallStatus.ERROR + assert call.time_completed is not None + assert call.errors == ["errors", "more_errors"] diff --git a/tests/unit_tests/test_queue.py b/tests/unit_tests/test_queue.py index 4cc6b01..7387269 100644 --- a/tests/unit_tests/test_queue.py +++ b/tests/unit_tests/test_queue.py @@ -2,8 +2,13 @@ import copy import pytest +from blueapi.service.model import TaskRequest from blueapi.worker.event import TaskError, TaskResult +from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall, CallStatus +from daq_queuing_service.plugins.construct_task_request import ( + construct_blueapi_call_list, +) from daq_queuing_service.task import ExperimentDefinition, Status, Task from daq_queuing_service.task_queue.queue import ( TaskQueue, @@ -36,6 +41,21 @@ async def test_add_tasks_adds_to_end_when_no_position_given(task_queue: TaskQueu assert set(task_queue._tasks.keys()) == {"0", "1", "2", "3", "4", "new"} +async def test_add_tasks_adds_to_call_queue(): + task_queue = TaskQueue(convert=construct_blueapi_call_list) + await task_queue.add_tasks([make_new_task("new"), make_new_task("new_2")]) + assert task_queue._call_queue == [ + BlueapiCall( + task_request=TaskRequest(name="test", params={}, instrument_session=""), + parent_task_id="new", + ), + BlueapiCall( + task_request=TaskRequest(name="test", params={}, instrument_session=""), + parent_task_id="new_2", + ), + ] + + async def test_add_tasks_with_position_works_as_expected(task_queue: TaskQueue): new_task = make_new_task("new") await task_queue.add_tasks([new_task], 2) @@ -78,7 +98,7 @@ async def test_add_task_to_position_0_adds_to_position_0_if_first_task_waiting( ): new_tasks = new_tasks = [make_new_task("new"), make_new_task("new_2")] first_task = await task_queue.get_task_by_position(0) - assert first_task and first_task.status == Status.WAITING + assert first_task and first_task.status == Status.QUEUED await task_queue.add_tasks(new_tasks, 0) @@ -124,7 +144,7 @@ async def test_move_task_works_as_expected_and_returns_new_position( expected_order: list[int], expected_return_value: int, ): - queue = TaskQueue() + queue = TaskQueue(convert=construct_blueapi_call_list) tasks = [make_new_task(str(i)) for i in range(10)] await queue.add_tasks(tasks) task = str(task_to_move) @@ -144,32 +164,23 @@ async def test_move_task_to_position_0_moves_to_position_1_if_first_task_in_prog new_position = await task_queue_in_progress.move_task("4", 0) assert new_position == 1 - assert task_queue_in_progress._queue == ["0", "4", "1", "2", "3"] + expected_order = ["0", "4", "1", "2", "3"] + assert task_queue_in_progress._queue == expected_order + assert [ + call.parent_task_id for call in task_queue_in_progress._call_queue + ] == expected_order async def test_move_task_to_position_0_moves_to_position_0_if_first_task_waiting( task_queue: TaskQueue, ): task = await task_queue.get_task_by_position(0) - assert task and task.status == Status.WAITING + assert task and task.status == Status.QUEUED await task_queue.move_task("4", 0) assert task_queue._queue == ["4", "0", "1", "2", "3"] -async def test_move_task_does_not_move_task_that_is_claimed_and_raises_error( - task_queue_claimed: TaskQueue, -): - task = await task_queue_claimed.get_task_by_position(0) - assert task and task.status == Status.CLAIMED - - with pytest.raises(TaskInProgressError): - await task_queue_claimed.move_task("0", 3) - - assert task_queue_claimed._queue == ["0", "1", "2", "3", "4"] - assert set(task_queue_claimed._tasks.keys()) == {"0", "1", "2", "3", "4"} - - async def test_move_task_does_not_move_task_that_is_in_progress_and_raises_error( task_queue_in_progress: TaskQueue, ): @@ -200,19 +211,6 @@ async def test_remove_tasks_works_as_expected(task_queue: TaskQueue): assert task_queue._queue == ["0", "1", "3"] -async def test_remove_tasks_does_not_remove_task_that_is_claimed_and_raises_error( - task_queue_claimed: TaskQueue, -): - task = await task_queue_claimed.get_task_by_position(0) - assert task and task.status == Status.CLAIMED - - with pytest.raises(TaskInProgressError): - await task_queue_claimed.cancel_tasks(["0", "2"]) - - assert task_queue_claimed._queue == ["0", "1", "2", "3", "4"] - assert set(task_queue_claimed._tasks.keys()) == {"0", "1", "2", "3", "4"} - - async def test_remove_tasks_does_not_remove_task_that_is_in_progress_and_raises_error( task_queue_in_progress: TaskQueue, ): @@ -255,22 +253,33 @@ async def test_get_queue_only_returns_tasks_in_queue( plan_name="test", sample_id="2", instrument_session="" ), id="2", - status=Status.IN_PROGRESS, - time_started="2026-04-17T15:02:00.000000", - time_completed=None, - errors=[], + blueapi_calls=[ + BlueapiCall( + task_request=TaskRequest(name="test", instrument_session=""), + parent_task_id="2", + status=CallStatus.IN_PROGRESS, + time_started="2026-04-17T15:02:00.000000", + time_completed=None, + errors=[], + ) + ], position=0, - blueapi_id="blueapi_id_2", ), TaskWithPosition( experiment_definition=ExperimentDefinition( plan_name="test", sample_id="3", instrument_session="" ), id="3", - status=Status.WAITING, - time_started=None, - time_completed=None, - errors=[], + blueapi_calls=[ + BlueapiCall( + task_request=TaskRequest(name="test", instrument_session=""), + parent_task_id="3", + status=CallStatus.WAITING, + time_started=None, + time_completed=None, + errors=[], + ) + ], position=1, ), TaskWithPosition( @@ -278,10 +287,16 @@ async def test_get_queue_only_returns_tasks_in_queue( plan_name="test", sample_id="4", instrument_session="" ), id="4", - status=Status.WAITING, - time_started=None, - time_completed=None, - errors=[], + blueapi_calls=[ + BlueapiCall( + task_request=TaskRequest(name="test", instrument_session=""), + parent_task_id="4", + status=CallStatus.WAITING, + time_started=None, + time_completed=None, + errors=[], + ) + ], position=2, ), ] @@ -298,30 +313,42 @@ async def test_get_history_only_returns_tasks_in_history( plan_name="test", sample_id="0", instrument_session="" ), id="0", - status=Status.ERROR, - time_started="2026-04-17T15:00:00.000000", - time_completed="2026-04-17T15:00:59.000000", - errors=[ - TaskError( - outcome="error", type="ValueError", message="Error during plan" + blueapi_calls=[ + BlueapiCall( + task_request=TaskRequest(name="test", instrument_session=""), + parent_task_id="0", + status=CallStatus.ERROR, + time_started="2026-04-17T15:00:00.000000", + time_completed="2026-04-17T15:00:59.000000", + errors=[ + TaskError( + outcome="error", + type="ValueError", + message="Error during plan", + ) + ], + result=None, ) ], position=None, - blueapi_id="blueapi_id_0", - result=None, ), TaskWithPosition( experiment_definition=ExperimentDefinition( plan_name="test", sample_id="1", instrument_session="" ), id="1", - status=Status.SUCCESS, - time_started="2026-04-17T15:01:00.000000", - time_completed="2026-04-17T15:01:59.000000", - errors=[], + blueapi_calls=[ + BlueapiCall( + task_request=TaskRequest(name="test", instrument_session=""), + parent_task_id="1", + status=CallStatus.SUCCESS, + time_started="2026-04-17T15:01:00.000000", + time_completed="2026-04-17T15:01:59.000000", + errors=[], + result=TaskResult(result=None, type="NoneType"), + ) + ], position=None, - blueapi_id="blueapi_id_1", - result=TaskResult(result=None, type="NoneType"), ), ] @@ -338,66 +365,93 @@ async def test_get_tasks_returns_tasks_in_queue_and_history( plan_name="test", sample_id="0", instrument_session="" ), id="0", - status=Status.ERROR, - time_started="2026-04-17T15:00:00.000000", - time_completed="2026-04-17T15:00:59.000000", - errors=[ - TaskError( - outcome="error", type="ValueError", message="Error during plan" + blueapi_calls=[ + BlueapiCall( + task_request=TaskRequest(name="test", instrument_session=""), + parent_task_id="0", + status=CallStatus.ERROR, + time_started="2026-04-17T15:00:00.000000", + time_completed="2026-04-17T15:00:59.000000", + errors=[ + TaskError( + outcome="error", + type="ValueError", + message="Error during plan", + ) + ], + result=None, ) ], position=None, - blueapi_id="blueapi_id_0", - result=None, ), TaskWithPosition( experiment_definition=ExperimentDefinition( plan_name="test", sample_id="1", instrument_session="" ), id="1", - status=Status.SUCCESS, - time_started="2026-04-17T15:01:00.000000", - time_completed="2026-04-17T15:01:59.000000", - errors=[], + blueapi_calls=[ + BlueapiCall( + task_request=TaskRequest(name="test", instrument_session=""), + parent_task_id="1", + status=CallStatus.SUCCESS, + time_started="2026-04-17T15:01:00.000000", + time_completed="2026-04-17T15:01:59.000000", + errors=[], + result=TaskResult(result=None, type="NoneType"), + ) + ], position=None, - blueapi_id="blueapi_id_1", - result=TaskResult(result=None, type="NoneType"), ), TaskWithPosition( experiment_definition=ExperimentDefinition( plan_name="test", sample_id="2", instrument_session="" ), id="2", - status=Status.IN_PROGRESS, - time_started="2026-04-17T15:02:00.000000", - time_completed=None, - errors=[], + blueapi_calls=[ + BlueapiCall( + task_request=TaskRequest(name="test", instrument_session=""), + parent_task_id="2", + status=CallStatus.IN_PROGRESS, + time_started="2026-04-17T15:02:00.000000", + time_completed=None, + errors=[], + ) + ], position=0, - blueapi_id="blueapi_id_2", ), TaskWithPosition( experiment_definition=ExperimentDefinition( plan_name="test", sample_id="3", instrument_session="" ), id="3", - status=Status.WAITING, - time_started=None, - time_completed=None, - errors=[], + blueapi_calls=[ + BlueapiCall( + task_request=TaskRequest(name="test", instrument_session=""), + parent_task_id="3", + status=CallStatus.WAITING, + time_started=None, + time_completed=None, + errors=[], + ) + ], position=1, - blueapi_id=None, ), TaskWithPosition( experiment_definition=ExperimentDefinition( plan_name="test", sample_id="4", instrument_session="" ), id="4", - status=Status.WAITING, - time_started=None, - time_completed=None, - errors=[], + blueapi_calls=[ + BlueapiCall( + task_request=TaskRequest(name="test", instrument_session=""), + parent_task_id="4", + status=CallStatus.WAITING, + time_started=None, + time_completed=None, + errors=[], + ) + ], position=2, - blueapi_id=None, ), ] @@ -466,169 +520,234 @@ async def test_pausing_queue_prevents_task_from_being_claimed(task_queue: TaskQu await task_queue.update_state(paused=True) assert task_queue.state.paused with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for( - task_queue.claim_next_task_once_available(), timeout=0.05 - ) + await asyncio.wait_for(task_queue.get_next_call_once_available(), timeout=0.05) async def test_unpausing_queue_allows_tasks_to_being_claimed(task_queue: TaskQueue): await task_queue.update_state(paused=False) assert not task_queue.state.paused - await task_queue.claim_next_task_once_available() + await task_queue.get_next_call_once_available() -async def test_claim_next_task_once_available_claims_task_and_returns( +async def test_claim_next_call_once_available_claims_task_and_returns( task_queue: TaskQueue, ): next_task = task_queue._tasks[task_queue._queue[0]] - assert next_task and next_task.status == Status.WAITING + next_call = next_task.blueapi_calls[0] + assert next_task and next_task.status == Status.QUEUED + assert next_call.status == CallStatus.WAITING - claimed_task = await task_queue.claim_next_task_once_available() - assert claimed_task is next_task - assert claimed_task.status == Status.CLAIMED + claimed_call = await task_queue.get_next_call_once_available() + assert claimed_call.parent_task_id == next_call.parent_task_id + assert claimed_call.status == CallStatus.CLAIMED -async def test_claim_next_task_once_available_waits_if_next_task_is_already_claimed( +async def test_claim_next_call_once_available_waits_if_next_task_is_already_claimed( task_queue: TaskQueue, ): - claimed_task = await task_queue.claim_next_task_once_available() - assert claimed_task and claimed_task.status == Status.CLAIMED + claimed_call = await task_queue.get_next_call_once_available() + assert claimed_call and claimed_call.status == CallStatus.CLAIMED with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for( - task_queue.claim_next_task_once_available(), timeout=0.05 - ) + await asyncio.wait_for(task_queue.get_next_call_once_available(), timeout=0.05) -async def test_claim_next_task_once_available_waits_if_next_task_is_already_in_progress( +async def test_claim_next_call_once_available_waits_if_next_task_is_already_in_progress( task_queue: TaskQueue, ): - claimed_task = await task_queue.claim_next_task_once_available() - claimed_task.blueapi_id = "blueapi_id" - claimed_task.put_in_progress() - assert claimed_task and claimed_task.status == Status.IN_PROGRESS + claimed_call = await task_queue.get_next_call_once_available() + claimed_call.put_in_progress() + assert claimed_call and claimed_call.status == CallStatus.IN_PROGRESS with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for( - task_queue.claim_next_task_once_available(), timeout=0.05 - ) + await asyncio.wait_for(task_queue.get_next_call_once_available(), timeout=0.05) -async def test_wait_until_task_available_waits_if_next_task_is_claimed( +async def test_wait_until_call_available_waits_if_next_task_is_claimed( task_queue: TaskQueue, ): - claimed_task = await task_queue.claim_next_task_once_available() - assert claimed_task and claimed_task.status == Status.CLAIMED + claimed_call = await task_queue.get_next_call_once_available() + assert claimed_call and claimed_call.status == CallStatus.CLAIMED with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(task_queue.wait_until_task_available(), timeout=0.05) + await asyncio.wait_for(task_queue.wait_until_call_available(), timeout=0.05) -async def test_wait_until_task_available_waits_if_next_task_is_in_progress( +async def test_wait_until_call_available_waits_if_next_task_is_in_progress( task_queue: TaskQueue, ): - claimed_task = await task_queue.claim_next_task_once_available() - claimed_task.blueapi_id = "blueapi_id" - claimed_task.put_in_progress() - assert claimed_task and claimed_task.status == Status.IN_PROGRESS + claimed_call = await task_queue.get_next_call_once_available() + claimed_call.put_in_progress() + assert claimed_call and claimed_call.status == CallStatus.IN_PROGRESS with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(task_queue.wait_until_task_available(), timeout=0.05) + await asyncio.wait_for(task_queue.wait_until_call_available(), timeout=0.05) -async def test_wait_until_task_available_waits_if_queue_paused( +async def test_wait_until_call_available_waits_if_queue_paused( task_queue: TaskQueue, ): await task_queue.update_state(paused=True) with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(task_queue.wait_until_task_available(), timeout=0.05) + await asyncio.wait_for(task_queue.wait_until_call_available(), timeout=0.05) -async def test_wait_until_task_available_waits_if_queue_empty(): - task_queue = TaskQueue() +async def test_wait_until_call_available_waits_if_queue_empty(): + task_queue = TaskQueue(construct_blueapi_call_list) with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(task_queue.wait_until_task_available(), timeout=0.05) + await asyncio.wait_for(task_queue.wait_until_call_available(), timeout=0.05) -async def test_wait_until_task_available_does_not_wait_if_conditions_met( +async def test_wait_until_call_available_does_not_wait_if_conditions_met( task_queue: TaskQueue, ): - await asyncio.wait_for(task_queue.wait_until_task_available(), timeout=0.05) + await asyncio.wait_for(task_queue.wait_until_call_available(), timeout=0.05) -async def test_complete_task_puts_task_in_history_and_updates_status_to_complete( +async def test_complete_call_puts_call_in_history_and_updates_status_to_complete( task_queue: TaskQueue, ): - task = await task_queue.claim_next_task_once_available() - task.blueapi_id = "blueapi_id" - task.put_in_progress() - assert task.status == Status.IN_PROGRESS - await task_queue.complete_task(task, TaskResult(result=None, type="NoneType")) - assert task.id not in task_queue._queue - assert task.id in task_queue._history - assert task.status == Status.SUCCESS + call = await task_queue.get_next_call_once_available() + call.put_in_progress() + assert call.status == CallStatus.IN_PROGRESS + await task_queue.complete_call(call, TaskResult(result=None, type="NoneType")) + assert call.parent_task_id not in task_queue._queue + assert call.parent_task_id in task_queue._history + assert call not in task_queue._call_queue + assert call in task_queue._call_history + assert call.status == CallStatus.SUCCESS -async def test_complete_task_must_receive_exact_same_object_as_was_claimed( +async def test_complete_call_must_receive_exact_same_object_as_was_claimed( task_queue: TaskQueue, ): - task = await task_queue.claim_next_task_once_available() - similar_task = task.model_copy() - another_similar_task = copy.copy(task) + call = await task_queue.get_next_call_once_available() + similar_call = call.model_copy() + another_similar_call = copy.copy(call) with pytest.raises(AssertionError): - await task_queue.complete_task( - similar_task, TaskResult(result=None, type="NoneType") + await task_queue.complete_call( + similar_call, TaskResult(result=None, type="NoneType") ) with pytest.raises(AssertionError): - await task_queue.complete_task( - another_similar_task, TaskResult(result=None, type="NoneType") + await task_queue.complete_call( + another_similar_call, TaskResult(result=None, type="NoneType") ) -async def test_fail_task_puts_task_in_history_and_updates_status_to_complete( +async def test_fail_call_puts_task_in_history_and_updates_status_to_complete( task_queue: TaskQueue, ): - task = await task_queue.claim_next_task_once_available() - assert task.status == Status.CLAIMED - await task_queue.fail_task(task) - assert task.id not in task_queue._queue - assert task.id in task_queue._history - assert task.status == Status.ERROR + call = await task_queue.get_next_call_once_available() + assert call.status == CallStatus.CLAIMED + await task_queue.fail_call(call) + assert call.parent_task_id not in task_queue._queue + assert call.parent_task_id in task_queue._history + assert call not in task_queue._call_queue + assert call in task_queue._call_history + assert call.status == CallStatus.ERROR -async def test_fail_task_must_receive_exact_same_object_as_was_claimed( +async def test_fail_call_must_receive_exact_same_object_as_was_claimed( task_queue: TaskQueue, ): - task = await task_queue.claim_next_task_once_available() - similar_task = task.model_copy() - another_similar_task = copy.copy(task) + call = await task_queue.get_next_call_once_available() + similar_call = call.model_copy() + another_similar_call = copy.copy(call) with pytest.raises(AssertionError): - await task_queue.fail_task(similar_task) + await task_queue.fail_call(similar_call) with pytest.raises(AssertionError): - await task_queue.fail_task(another_similar_task) + await task_queue.fail_call(another_similar_call) -async def test_fail_task_with_errors_adds_errors_to_task( +async def test_fail_call_with_errors_adds_errors_to_call( task_queue: TaskQueue, ): - task = await task_queue.claim_next_task_once_available() + call = await task_queue.get_next_call_once_available() error = "This task failed" - await task_queue.fail_task(task, [str(error)]) - assert task.status == Status.ERROR - assert task.errors == ["This task failed"] - assert task.id in task_queue._history - assert task.id not in task_queue._queue + await task_queue.fail_call(call, [str(error)]) + assert call.status == CallStatus.ERROR + assert call.errors == ["This task failed"] -async def test_return_task_to_queue_changes_task_status_to_waiting( +async def test_return_call_to_queue_changes_task_status_to_waiting( task_queue: TaskQueue, ): - task = await task_queue.claim_next_task_once_available() - assert task.status == Status.CLAIMED - await task_queue.return_task_to_queue(task) - assert task.status == Status.WAITING + call = await task_queue.get_next_call_once_available() + assert call.status == CallStatus.CLAIMED + await task_queue.return_call_to_queue(call) + assert call.status == CallStatus.WAITING async def test_return_task_to_queue_raises_error_if_task_has_not_been_claimed( task_queue: TaskQueue, ): - task = await task_queue.claim_next_task_once_available() - task.status = Status.SUCCESS + call = await task_queue.get_next_call_once_available() + call.status = CallStatus.SUCCESS with pytest.raises(TaskNotClaimedError): - await task_queue.return_task_to_queue(task) + await task_queue.return_call_to_queue(call) + + +async def test_get_call_queue_returns_calls_in_call_queue( + task_queue_with_history: TaskQueue, +): + result = await task_queue_with_history.get_call_queue() + assert result == [ + BlueapiCall( + task_request=TaskRequest(name="test", params={}, instrument_session=""), + parent_task_id="2", + status=CallStatus.IN_PROGRESS, + time_started="2026-04-17T15:02:00.000000", + time_completed=None, + result=None, + errors=[], + blueapi_id=None, + ), + BlueapiCall( + task_request=TaskRequest(name="test", params={}, instrument_session=""), + parent_task_id="3", + status=CallStatus.WAITING, + time_started=None, + time_completed=None, + result=None, + errors=[], + blueapi_id=None, + ), + BlueapiCall( + task_request=TaskRequest(name="test", params={}, instrument_session=""), + parent_task_id="4", + status=CallStatus.WAITING, + time_started=None, + time_completed=None, + result=None, + errors=[], + blueapi_id=None, + ), + ] + + +async def test_get_call_history_returns_calls_in_call_history( + task_queue_with_history: TaskQueue, +): + result = await task_queue_with_history.get_call_history() + assert result == [ + BlueapiCall( + task_request=TaskRequest(name="test", params={}, instrument_session=""), + parent_task_id="0", + status=CallStatus.ERROR, + time_started="2026-04-17T15:00:00.000000", + time_completed="2026-04-17T15:00:59.000000", + result=None, + errors=[ + TaskError( + outcome="error", type="ValueError", message="Error during plan" + ) + ], + blueapi_id=None, + ), + BlueapiCall( + task_request=TaskRequest(name="test", params={}, instrument_session=""), + parent_task_id="1", + status=CallStatus.SUCCESS, + time_started="2026-04-17T15:01:00.000000", + time_completed="2026-04-17T15:01:59.000000", + result=TaskResult(outcome="success", result=None, type="NoneType"), + errors=[], + blueapi_id=None, + ), + ] diff --git a/tests/unit_tests/test_task.py b/tests/unit_tests/test_task.py index 4c7c70c..a55db12 100644 --- a/tests/unit_tests/test_task.py +++ b/tests/unit_tests/test_task.py @@ -1,148 +1,42 @@ import pytest -from blueapi.worker.event import TaskResult +from blueapi.service.model import TaskRequest +from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall, CallStatus from daq_queuing_service.task import ExperimentDefinition, Status, Task @pytest.mark.parametrize( - "old_status, new_status", + "blueapi_call_statuses, expected_task_status", [ - [Status.WAITING, Status.WAITING], - [Status.WAITING, Status.IN_PROGRESS], - [Status.WAITING, Status.SUCCESS], - [Status.WAITING, Status.ERROR], - [Status.CLAIMED, Status.CLAIMED], - [Status.CLAIMED, Status.SUCCESS], - [Status.CLAIMED, Status.CANCELLED], - [Status.IN_PROGRESS, Status.WAITING], - [Status.IN_PROGRESS, Status.CLAIMED], - [Status.IN_PROGRESS, Status.IN_PROGRESS], - [Status.IN_PROGRESS, Status.CANCELLED], - [Status.SUCCESS, Status.WAITING], - [Status.SUCCESS, Status.CLAIMED], - [Status.SUCCESS, Status.IN_PROGRESS], - [Status.SUCCESS, Status.SUCCESS], - [Status.SUCCESS, Status.ERROR], - [Status.SUCCESS, Status.CANCELLED], - [Status.ERROR, Status.WAITING], - [Status.ERROR, Status.CLAIMED], - [Status.ERROR, Status.IN_PROGRESS], - [Status.ERROR, Status.SUCCESS], - [Status.ERROR, Status.ERROR], - [Status.ERROR, Status.CANCELLED], - [Status.CANCELLED, Status.WAITING], - [Status.CANCELLED, Status.CLAIMED], - [Status.CANCELLED, Status.IN_PROGRESS], - [Status.CANCELLED, Status.SUCCESS], - [Status.CANCELLED, Status.ERROR], - [Status.CANCELLED, Status.CANCELLED], + ([CallStatus.WAITING], Status.QUEUED), + ([CallStatus.CLAIMED], Status.IN_PROGRESS), + ([CallStatus.IN_PROGRESS], Status.IN_PROGRESS), + ([CallStatus.SUCCESS], Status.COMPLETE), + ([CallStatus.ERROR], Status.COMPLETE), + ([CallStatus.WAITING, CallStatus.WAITING], Status.QUEUED), + ([CallStatus.WAITING, CallStatus.CLAIMED], Status.IN_PROGRESS), + ([CallStatus.WAITING, CallStatus.IN_PROGRESS], Status.IN_PROGRESS), + ([CallStatus.WAITING, CallStatus.SUCCESS], Status.IN_PROGRESS), + ([CallStatus.WAITING, CallStatus.ERROR], Status.IN_PROGRESS), + ([CallStatus.CLAIMED, CallStatus.SUCCESS], Status.IN_PROGRESS), + ([CallStatus.CLAIMED, CallStatus.ERROR], Status.IN_PROGRESS), + ([CallStatus.IN_PROGRESS, CallStatus.SUCCESS], Status.IN_PROGRESS), + ([CallStatus.IN_PROGRESS, CallStatus.ERROR], Status.IN_PROGRESS), + ([CallStatus.SUCCESS, CallStatus.ERROR], Status.COMPLETE), ], ) -def test__update_status_raises_error_when_transitioned_to_wrong_status( - old_status: Status, new_status: Status +def test_task_status_derived_correctly_from_call_statuses( + blueapi_call_statuses: list[CallStatus], expected_task_status: Status ): task = Task( + blueapi_calls=[ + BlueapiCall( + status=status, task_request=TaskRequest(name="", instrument_session="") + ) + for status in blueapi_call_statuses + ], experiment_definition=ExperimentDefinition( - plan_name="test", sample_id="sample", instrument_session="" + plan_name="", sample_id="", instrument_session="" ), - status=old_status, ) - with pytest.raises(ValueError): - task._update_status(new_status) - - -@pytest.mark.parametrize( - "old_status, new_status", - [ - [Status.WAITING, Status.CLAIMED], - [Status.WAITING, Status.CANCELLED], - [Status.CLAIMED, Status.WAITING], - [Status.CLAIMED, Status.IN_PROGRESS], - [Status.CLAIMED, Status.ERROR], - [Status.IN_PROGRESS, Status.SUCCESS], - [Status.IN_PROGRESS, Status.ERROR], - ], -) -def test__update_status_changes_status_when_correct_new_status_given( - old_status: Status, new_status: Status -): - task = Task( - experiment_definition=ExperimentDefinition( - plan_name="test", sample_id="sample", instrument_session="" - ), - status=old_status, - ) - task._update_status(new_status) - assert task.status == new_status - - -def test_wait_updates_status_to_waiting(): - task = Task( - experiment_definition=ExperimentDefinition( - plan_name="test", sample_id="sample", instrument_session="" - ), - status=Status.CLAIMED, - ) - task.wait() - assert task.status == Status.WAITING - - -def test_claim_updates_status_to_claimed(): - task = Task( - experiment_definition=ExperimentDefinition( - plan_name="test", sample_id="sample", instrument_session="" - ), - status=Status.WAITING, - ) - task.claim() - assert task.status == Status.CLAIMED - - -def test_put_in_progress_updates_status_to_in_progress_and_adds_fields(): - task = Task( - experiment_definition=ExperimentDefinition( - plan_name="test", sample_id="sample", instrument_session="" - ), - status=Status.CLAIMED, - ) - task.blueapi_id = "blueapi_id" - task.put_in_progress() - assert task.status == Status.IN_PROGRESS - assert task.time_started is not None - assert task.blueapi_id == "blueapi_id" - - -def test_succeed_updates_status_to_success_and_adds_time_completed(): - task = Task( - experiment_definition=ExperimentDefinition( - plan_name="test", sample_id="sample", instrument_session="" - ), - status=Status.IN_PROGRESS, - ) - task.succeed(TaskResult(result=None, type="NoneType")) - assert task.status == Status.SUCCESS - assert task.time_completed is not None - - -def test_fail_updates_status_to_error_and_adds_time_completed_and_errors(): - task = Task( - experiment_definition=ExperimentDefinition( - plan_name="test", sample_id="sample", instrument_session="" - ), - status=Status.IN_PROGRESS, - ) - task.fail(["errors", "more_errors"]) - assert task.status == Status.ERROR - assert task.time_completed is not None - assert task.errors == ["errors", "more_errors"] - - -def test_cancel_updates_status_to_cancelled(): - task = Task( - experiment_definition=ExperimentDefinition( - plan_name="test", sample_id="sample", instrument_session="" - ), - status=Status.WAITING, - ) - task.cancel() - assert task.status == Status.CANCELLED + assert task.status == expected_task_status diff --git a/tests/unit_tests/test_worker.py b/tests/unit_tests/test_worker.py index c72d147..b8ba3e9 100644 --- a/tests/unit_tests/test_worker.py +++ b/tests/unit_tests/test_worker.py @@ -16,7 +16,11 @@ from blueapi.worker import ProgressEvent, TaskStatus, WorkerEvent, WorkerState from pytest import LogCaptureFixture -from daq_queuing_service.blueapi_adapter import BlueapiClientAdapter, BlueapiResult +from daq_queuing_service.blueapi_interaction.blueapi_adapter import ( + BlueapiClientAdapter, + BlueapiResult, +) +from daq_queuing_service.blueapi_interaction.blueapi_call import CallStatus from daq_queuing_service.task import ExperimentDefinition, Status from daq_queuing_service.task_queue.queue import TaskError, TaskQueue, TaskResult from daq_queuing_service.worker.worker import QueueWorker @@ -184,12 +188,12 @@ async def test_worker_run_loop_cycle( with pytest.raises(only_loop_once): await worker.run_loop() - assert first_task.status == Status.SUCCESS + assert first_task.status == Status.COMPLETE worker._client.get_state.assert_called_once() # type: ignore worker._client.run_task.assert_called_once() # type: ignore -async def test_when_parameter_error_then_task_failed_and_error_added_to_task( +async def test_when_parameter_error_then_call_failed_and_error_added_to_call( worker_with_parameter_error: QueueWorker, only_loop_once: type[Exception] ): @@ -199,13 +203,14 @@ async def test_when_parameter_error_then_task_failed_and_error_added_to_task( with pytest.raises(only_loop_once): await worker_with_parameter_error.run_loop() - assert first_task.status == Status.ERROR - assert first_task.errors == ["Unexpected field 'bad_param'"] + first_call = first_task.blueapi_calls[0] + assert first_call.status == CallStatus.ERROR + assert first_call.errors == ["Unexpected field 'bad_param'"] worker_with_parameter_error._client.get_state.assert_called_once() # type: ignore worker_with_parameter_error._client.run_task.assert_called_once() # type: ignore -async def test_when_plan_name_error_then_task_failed_and_error_added_to_task( +async def test_when_plan_name_error_then_call_failed_and_error_added_to_task( worker_with_unknown_plan_error: QueueWorker, only_loop_once: type[Exception] ): @@ -215,13 +220,14 @@ async def test_when_plan_name_error_then_task_failed_and_error_added_to_task( with pytest.raises(only_loop_once): await worker_with_unknown_plan_error.run_loop() - assert first_task.status == Status.ERROR - assert first_task.errors == ["Unknown plan", ""] + first_call = first_task.blueapi_calls[0] + assert first_call.status == CallStatus.ERROR + assert first_call.errors == ["Unknown plan", ""] worker_with_unknown_plan_error._client.get_state.assert_called_once() # type: ignore worker_with_unknown_plan_error._client.run_task.assert_called_once() # type: ignore -async def test_when_blueapi_error_then_task_put_back_into_queue( +async def test_when_blueapi_error_then_call_put_back_into_queue( worker_with_blueapi_error: QueueWorker, only_loop_once: type[Exception] ): @@ -231,7 +237,9 @@ async def test_when_blueapi_error_then_task_put_back_into_queue( with pytest.raises(only_loop_once): await worker_with_blueapi_error.run_loop() - assert first_task.status == Status.WAITING + first_call = first_task.blueapi_calls[0] + assert first_call.status == CallStatus.WAITING + assert first_task.status == Status.QUEUED worker_with_blueapi_error._client.get_state.assert_called_once() # type: ignore worker_with_blueapi_error._client.run_task.assert_called_once() # type: ignore assert not await worker_with_blueapi_error._queue.get_history() @@ -247,8 +255,9 @@ async def test_when_plan_error_then_task_failed_and_errors_added( with pytest.raises(only_loop_once): await worker_with_plan_error.run_loop() - assert first_task.status == Status.ERROR - assert first_task.errors == [ + first_call = first_task.blueapi_calls[0] + assert first_call.status == CallStatus.ERROR + assert first_call.errors == [ TaskError(outcome="error", type="ValueError", message="Error during plan") ] worker_with_plan_error._client.get_state.assert_called_once() # type: ignore @@ -282,7 +291,7 @@ async def test__wait_for_next_task_waits_for_queue_ready_to_give_task( worker: QueueWorker, ): queue = worker._queue - queue._tasks[queue._queue[0]].status = Status.IN_PROGRESS + queue._tasks[queue._queue[0]].blueapi_calls[0].status = CallStatus.IN_PROGRESS with pytest.raises(asyncio.TimeoutError): await asyncio.wait_for(worker._wait_for_next_task(), timeout=0.05)