Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 43 additions & 207 deletions src/drunc/controller/children_interface/child_node.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import os
from abc import ABC, abstractmethod

from druncschema.controller_pb2 import (
AddressedCommand,
Expand All @@ -7,21 +7,15 @@
ExecuteExpertCommandResponse,
ExecuteFSMCommandResponse,
FSMCommand,
Status,
IncludeExcludeResponse,
StatusResponse,
)
from druncschema.description_pb2 import Description
from druncschema.request_response_pb2 import Response, ResponseFlag
from druncschema.request_response_pb2 import Response
from druncschema.token_pb2 import Token

from drunc.connectivity_service.exceptions import ApplicationLookupUnsuccessful
from drunc.controller.utils import get_detector_name
from drunc.exceptions import DruncSetupException
from drunc.utils.configuration import ConfTypes
from drunc.utils.utils import (
ControlType,
get_control_type_and_uri_from_cli,
get_control_type_and_uri_from_connectivity_service,
get_logger,
)

Expand All @@ -31,263 +25,105 @@ def __init__(self, t, name):
super().__init__(f"The type {t} is not supported for the ChildNode {name}")


class ChildNode:
def __init__(
self,
name: str,
configuration,
node_type: ControlType,
**kwargs,
) -> None:
self.node_type = node_type
class ChildNode(ABC):
def __init__(self, name: str, node_type: ControlType):
self.log = get_logger(f"controller.{name}-child-node")
self.name = name
self.configuration = configuration
self.node_type = node_type
self.included = True

@abstractmethod
def __str__(self) -> str:
return f"'{self.name}' (type {self.node_type})"
pass

@abstractmethod
def get_endpoint(self) -> str:
return ""
pass

def terminate(self):
self.log.info(f"Terminating {self.name}")
@abstractmethod
def terminate(self) -> None:
pass

@abstractmethod
def propagate_command(
self,
command: str,
request: AddressedCommand,
token: Token | None,
) -> Response:
return Response(
name=self.name,
token=token,
flag=ResponseFlag.NOT_EXECUTED_NOT_READY,
)
pass

@abstractmethod
def status(
self,
target: str = "",
execute_along_path: bool = True,
execute_on_all_subsequent_children_in_path: bool = True,
) -> StatusResponse:
status = Status(
state="unknown",
sub_state="unknown",
in_error=False,
included=True,
)

response = StatusResponse(
token=None,
name=self.name,
status=status,
flag=ResponseFlag.NOT_EXECUTED_NOT_READY,
)

return response
pass

@abstractmethod
def describe(
self,
target: str = "",
execute_along_path: bool = True,
execute_on_all_subsequent_children_in_path: bool = True,
) -> DescribeResponse:
descriptionType = None
descriptionName = None

if self.configuration is not None:
if hasattr(
self.configuration.data, "application_name"
): # Get the application name and type
descriptionType = self.configuration.data.application_name
descriptionName = self.configuration.data.id
elif hasattr(self.configuration.data, "controller") and hasattr(
self.configuration.data.controller, "application_name"
): # Get the controller name and type
descriptionType = self.configuration.data.controller.application_name
descriptionName = self.configuration.data.controller.id

description = Description(
type=descriptionType,
name=descriptionName,
endpoint=self.get_endpoint(),
info=(
get_detector_name(self.configuration)
if self.configuration is not None
else None
),
session=os.getenv("DUNEDAQ_SESSION"),
commands=None,
broadcast=None,
flag=ResponseFlag.EXECUTED_SUCCESSFULLY,
)

response = DescribeResponse(
token=None,
name=self.name,
description=description,
flag=ResponseFlag.EXECUTED_SUCCESSFULLY,
)

return response
pass

@abstractmethod
def describe_fsm(
self,
target: str = "",
execute_along_path: bool = True,
execute_on_all_subsequent_children_in_path: bool = True,
key: str = "",
) -> DescribeFSMResponse:
response = DescribeFSMResponse(
token=None,
name=self.name,
flag=ResponseFlag.NOT_EXECUTED_NOT_READY,
)

return response
pass

@abstractmethod
def execute_fsm_command(
self,
command: FSMCommand,
target: str = "",
execute_along_path: bool = True,
execute_on_all_subsequent_children_in_path: bool = True,
) -> ExecuteFSMCommandResponse:
response = ExecuteFSMCommandResponse(
token=None,
name=self.name,
command_name=command.command_name,
flag=ResponseFlag.NOT_EXECUTED_NOT_READY,
)

return response
pass

@abstractmethod
def execute_expert_command(
self,
json_string: str,
target: str = "",
execute_along_path: bool = True,
execute_on_all_subsequent_children_in_path: bool = True,
) -> ExecuteExpertCommandResponse:
response = ExecuteExpertCommandResponse(
token=None,
name=self.name,
flag=ResponseFlag.NOT_EXECUTED_NOT_READY,
)
pass

@abstractmethod
def include(
self,
target: str = "",
execute_along_path: bool = True,
execute_on_all_subsequent_children_in_path: bool = True,
) -> IncludeExcludeResponse:
pass

return response
@abstractmethod
def exclude(
self,
target: str = "",
execute_along_path: bool = True,
execute_on_all_subsequent_children_in_path: bool = True,
) -> IncludeExcludeResponse:
pass

@abstractmethod
def recompute_status(
self,
target: str = "",
execute_along_path: bool = True,
execute_on_all_subsequent_children_in_path: bool = True,
) -> StatusResponse:
return self.status(
target=target,
execute_along_path=execute_along_path,
execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path,
)

# TODO: needs reimplementation
@staticmethod
def get_child(
name: str,
cli,
configuration,
init_token=None,
connectivity_service=None,
timeout=60,
**kwargs,
):
log = get_logger("controller.child_node")
ctype = ControlType.Unknown
uri = None
node_in_error = False

if connectivity_service:
try:
ctype, uri = get_control_type_and_uri_from_connectivity_service(
connectivity_service, name, timeout=timeout
)
except ApplicationLookupUnsuccessful as alu:
log.error(
f"Could not find the application '{name}' in the connectivity service: {alu}"
)

if ctype == ControlType.Unknown:
try:
ctype, uri = get_control_type_and_uri_from_cli(cli)
except DruncSetupException as e:
log.error(
f"Could not understand how to talk to the application '{name}' from its CLI: {e}"
)

address = None
port = 0
if uri is not None:
try:
address, port = uri.split(":")
port = int(port)
except ValueError as e:
log.debug(f"Could not split the URI {uri} into address and port: {e}")

if ctype == ControlType.Unknown or address is None or port == 0:
log.error(f"Could not understand how to talk to '{name}'")
node_in_error = True
ctype = ControlType.Direct

log.info(f"Child {name} is of type {ctype} and has the URI {uri}")

match ctype:
case ControlType.gRPC:
from drunc.controller.children_interface.grpc_child import (
gRCPChildConfHandler,
gRPCChildNode,
)

return gRPCChildNode(
configuration=gRCPChildConfHandler(
configuration, ConfTypes.PyObject
),
init_token=init_token,
name=name,
uri=uri,
connectivity_service=connectivity_service,
**kwargs,
)

case ControlType.REST_API:
from drunc.controller.children_interface.rest_api_child import (
RESTAPIChildNode,
RESTAPIChildNodeConfHandler,
)

return RESTAPIChildNode(
configuration=RESTAPIChildNodeConfHandler(
configuration, ConfTypes.PyObject
),
name=name,
uri=uri,
# init_token = init_token, # No authentication for RESTAPI
**kwargs,
)

case ControlType.Direct:
from drunc.controller.children_interface.client_side_child import (
ClientSideChild,
)

node = ClientSideChild(
name=name,
**kwargs,
)
if node_in_error:
node.state.to_error()
return node

case _:
raise ChildInterfaceTechnologyUnknown(ctype, name)
pass
Loading
Loading