Skip to content
8 changes: 8 additions & 0 deletions src/daq_queuing_service/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,12 @@ async def get_completed_tasks(
async def clear_history():
return await queue.clear_history()

@router.get("/call_queue")
async def get_call_queue():
return await queue.get_call_queue()

@router.get("/call_history")
async def get_call_history():
return await queue.get_call_history()

return router
2 changes: 1 addition & 1 deletion src/daq_queuing_service/api/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async def service_unavailable_error(
request: Request, exception: ServiceUnavailableError
):
return JSONResponse(
status_code=409,
status_code=404,
content={"error": "blueapi_unavailable", "message": str(exception)},
)

Expand Down
5 changes: 3 additions & 2 deletions src/daq_queuing_service/app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@

from daq_queuing_service.api.api import create_api_router
from daq_queuing_service.api.errors import register_exception_handlers
from daq_queuing_service.blueapi_adapter import BlueapiClientAdapter
from daq_queuing_service.blueapi_interaction.blueapi_adapter import BlueapiClientAdapter
from daq_queuing_service.plugins.construct_task_request import (
construct_blueapi_call_list,
construct_blueapi_task_request,
)
from daq_queuing_service.task_queue.queue import TaskQueue
Expand Down Expand Up @@ -47,7 +48,7 @@ def log_task_exception(task: asyncio.Task[NoReturn]):
config = load_config()
app = FastAPI(lifespan=lifespan)

app.state.queue = TaskQueue()
app.state.queue = TaskQueue(construct_blueapi_call_list)

blueapi_rest_client = BlueapiRestClient(config=config.blueapi.api)
blueapi_client = BlueapiClient.from_config(config.blueapi)
Expand Down
Empty file.
97 changes: 97 additions & 0 deletions src/daq_queuing_service/blueapi_interaction/blueapi_call.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from datetime import datetime
from enum import StrEnum

from blueapi.service.model import TaskRequest
from blueapi.worker.event import TaskError, TaskResult
from pydantic import BaseModel, Field


class CallStatus(StrEnum):
WAITING = "Waiting" # Waiting in the queue
CLAIMED = "Claimed" # Claimed by the worker
IN_PROGRESS = "In progress" # In progress inside BlueAPI
SUCCESS = "Success" # Completed successfully
ERROR = "Error" # Error while trying to run

@property
def allowed_transitions(self):
allowed_transitions: dict[CallStatus, set[CallStatus]] = { # from: to
CallStatus.WAITING: {CallStatus.CLAIMED},
CallStatus.CLAIMED: {
CallStatus.WAITING,
CallStatus.IN_PROGRESS,
CallStatus.ERROR,
},
CallStatus.IN_PROGRESS: {CallStatus.SUCCESS, CallStatus.ERROR},
CallStatus.SUCCESS: set(),
CallStatus.ERROR: set(),
}
return allowed_transitions[self]


class BlueapiCall(BaseModel):
task_request: TaskRequest
parent_task_id: str | None = None
status: CallStatus = CallStatus.WAITING
time_started: str | None = None
time_completed: str | None = None
result: TaskResult | None = None
errors: list[str | TaskError] = Field(default_factory=list[str | TaskError])
blueapi_id: str | None = None

def _update_status(self, new_status: CallStatus):
"""Updates the status of the task, checking that the transition is valid

Args:
new_status (Status): New status of the task

Raises:
ValueError: Raised if the transition from the task's current status to the
new status is not permitted.
"""
allowed = self.status.allowed_transitions
if new_status not in allowed:
raise ValueError(
f"Can't go from current state '{self.status}' to '{new_status}'. "
+ f"Allowed transitions from {self.status}: {allowed}."
)
self.status = new_status

def wait(self):
"""Updates the task status to WAITING"""
self._update_status(CallStatus.WAITING)

def claim(self):
"""Updates the task status to CLAIMED"""
self._update_status(CallStatus.CLAIMED)

def put_in_progress(self):
"""Updates the task status to IN_PROGRESS and sets the time_started field to the
current time
"""
self._update_status(CallStatus.IN_PROGRESS)
self.time_started = datetime.now().isoformat()

def succeed(self, result: TaskResult):
"""Updates the task status to SUCCESS, sets the time_completed field to the
current time, and sets the result field with the result from blueapi

Args:
result (TaskResult): The result of the task from blueapi
"""
self._update_status(CallStatus.SUCCESS)
self.result = result
self.time_completed = datetime.now().isoformat()

def fail(self, errors: list[str | TaskError] | None = None):
"""Updates the task status to ERROR, sets the time_completed field to the
current time, and adds any errors to the errors field.

Args:
errors (list[str | TaskError] | None, optional): List of errors that
occurred when trying to run the task. Defaults to None.
"""
self._update_status(CallStatus.ERROR)
self.time_completed = datetime.now().isoformat()
if errors:
self.errors.extend(errors)
19 changes: 18 additions & 1 deletion src/daq_queuing_service/plugins/construct_task_request.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from blueapi.service.model import TaskRequest

from daq_queuing_service.task import ExperimentDefinition
from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall
from daq_queuing_service.task import ExperimentDefinition, TaskWithPosition


def construct_blueapi_task_request(
Expand All @@ -11,3 +12,19 @@ def construct_blueapi_task_request(
params=experiment_definition.params,
instrument_session=experiment_definition.instrument_session,
)


def construct_blueapi_call_list(
queue: list[TaskWithPosition],
history: list[TaskWithPosition],
call_history: list[BlueapiCall],
) -> list[BlueapiCall]:

call_list = [
BlueapiCall(
parent_task_id=task.id,
task_request=construct_blueapi_task_request(task.experiment_definition),
)
for task in queue
]
return call_list
125 changes: 36 additions & 89 deletions src/daq_queuing_service/task.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,22 @@
from collections.abc import Mapping
from datetime import datetime
from enum import StrEnum
from typing import Any, Self
from uuid import uuid4

from blueapi.worker.event import TaskError, TaskResult
from pydantic import BaseModel, Field
from blueapi.service.model import StrEnum
from pydantic import BaseModel, Field, computed_field

from daq_queuing_service.blueapi_interaction.blueapi_call import BlueapiCall, CallStatus

def _create_uuid_str() -> str:

def create_uuid_str() -> str:
return str(uuid4())


class Status(StrEnum):
WAITING = "Waiting" # Waiting in the queue
CLAIMED = "Claimed" # Claimed by the worker
IN_PROGRESS = "In progress" # In progress inside BlueAPI
SUCCESS = "Success" # Completed successfully
ERROR = "Error" # Error while trying to run
CANCELLED = "Cancelled" # Cancelled before being run

@property
def allowed_transitions(self):
allowed_transitions: dict[Status, set[Status]] = { # from: to
Status.WAITING: {Status.CLAIMED, Status.CANCELLED},
Status.CLAIMED: {Status.WAITING, Status.IN_PROGRESS, Status.ERROR},
Status.IN_PROGRESS: {Status.SUCCESS, Status.ERROR},
Status.SUCCESS: set(),
Status.ERROR: set(),
Status.CANCELLED: set(),
}
return allowed_transitions[self]
QUEUED = "Queued"
IN_PROGRESS = "In progress"
COMPLETE = "Complete"
CANCELLED = "Cancelled"


class ExperimentDefinition(BaseModel):
Expand All @@ -44,74 +30,35 @@ class ExperimentDefinition(BaseModel):

class Task(BaseModel):
experiment_definition: ExperimentDefinition
id: str = Field(default_factory=_create_uuid_str)
status: Status = Status.WAITING
time_started: str | None = None
time_completed: str | None = None
errors: list[str | TaskError] = Field(default_factory=list[str | TaskError])
result: TaskResult | None = None
blueapi_id: str | None = None

def _update_status(self, new_status: Status):
"""Updates the status of the task, checking that the transition is valid

Args:
new_status (Status): New status of the task

Raises:
ValueError: Raised if the transition from the task's current status to the
new status is not permitted.
"""
allowed = self.status.allowed_transitions
if new_status not in allowed:
raise ValueError(
f"Can't go from current state '{self.status}' to '{new_status}'. "
+ f"Allowed transitions from {self.status}: {allowed}."
)
self.status = new_status

def wait(self):
"""Updates the task status to WAITING"""
self._update_status(Status.WAITING)

def claim(self):
"""Updates the task status to CLAIMED"""
self._update_status(Status.CLAIMED)

def put_in_progress(self):
"""Updates the task status to IN_PROGRESS and sets the time_started field to the
current time
"""
self._update_status(Status.IN_PROGRESS)
self.time_started = datetime.now().isoformat()

def succeed(self, result: TaskResult):
"""Updates the task status to SUCCESS, sets the time_completed field to the
current time, and sets the result field with the result from blueapi

Args:
result (TaskResult): The result of the task from blueapi
"""
self._update_status(Status.SUCCESS)
self.result = result
self.time_completed = datetime.now().isoformat()

def fail(self, errors: list[str | TaskError] | None = None):
"""Updates the task status to ERROR, sets the time_completed field to the
current time, and adds any errors to the errors field.

Args:
errors (list[str | TaskError] | None, optional): List of errors that
occurred when trying to run the task. Defaults to None.
"""
self._update_status(Status.ERROR)
self.time_completed = datetime.now().isoformat()
if errors:
self.errors.extend(errors)
id: str = Field(default_factory=create_uuid_str)
blueapi_calls: list[BlueapiCall] = []
_cancelled: bool = False

def cancel(self):
"""Sets the task status to CANCELLED"""
self._update_status(Status.CANCELLED)
self._cancelled = True

@computed_field
@property
def status(self) -> Status:
if self._cancelled:
return Status.CANCELLED
if self.blueapi_calls and all(
call.status in [CallStatus.SUCCESS, CallStatus.ERROR]
for call in self.blueapi_calls
):
return Status.COMPLETE
if any(
call.status
in [
Status.IN_PROGRESS,
CallStatus.CLAIMED,
CallStatus.SUCCESS,
CallStatus.ERROR,
]
for call in self.blueapi_calls
):
return Status.IN_PROGRESS
return Status.QUEUED


class TaskWithPosition(Task):
Expand Down
Loading
Loading