diff --git a/src/drunc/controller/children_interface/child_node.py b/src/drunc/controller/children_interface/child_node.py index 552f86417..51431adc9 100644 --- a/src/drunc/controller/children_interface/child_node.py +++ b/src/drunc/controller/children_interface/child_node.py @@ -1,4 +1,4 @@ -import os +from abc import ABC, abstractmethod from druncschema.controller_pb2 import ( AddressedCommand, @@ -7,21 +7,15 @@ ExecuteExpertCommandResponse, ExecuteFSMCommandResponse, FSMCommand, - Status, + IncludeExcludeResponse, StatusResponse, ) -from druncschema.description_pb2 import Description -from druncschema.request_response_pb2 import Response, ResponseFlag +from druncschema.request_response_pb2 import Response from druncschema.token_pb2 import Token -from drunc.connectivity_service.exceptions import ApplicationLookupUnsuccessful -from drunc.controller.utils import get_detector_name from drunc.exceptions import DruncSetupException -from drunc.utils.configuration import ConfTypes from drunc.utils.utils import ( ControlType, - get_control_type_and_uri_from_cli, - get_control_type_and_uri_from_connectivity_service, get_logger, ) @@ -31,109 +25,53 @@ def __init__(self, t, name): super().__init__(f"The type {t} is not supported for the ChildNode {name}") -class ChildNode: - def __init__( - self, - name: str, - configuration, - node_type: ControlType, - **kwargs, - ) -> None: - self.node_type = node_type +class ChildNode(ABC): + def __init__(self, name: str, node_type: ControlType): self.log = get_logger(f"controller.{name}-child-node") self.name = name - self.configuration = configuration + self.node_type = node_type self.included = True + @abstractmethod def __str__(self) -> str: - return f"'{self.name}' (type {self.node_type})" + pass + @abstractmethod def get_endpoint(self) -> str: - return "" + pass - def terminate(self): - self.log.info(f"Terminating {self.name}") + @abstractmethod + def terminate(self) -> None: pass + @abstractmethod def propagate_command( self, command: str, request: AddressedCommand, token: Token | None, ) -> Response: - return Response( - name=self.name, - token=token, - flag=ResponseFlag.NOT_EXECUTED_NOT_READY, - ) + pass + @abstractmethod def status( self, target: str = "", execute_along_path: bool = True, execute_on_all_subsequent_children_in_path: bool = True, ) -> StatusResponse: - status = Status( - state="unknown", - sub_state="unknown", - in_error=False, - included=True, - ) - - response = StatusResponse( - token=None, - name=self.name, - status=status, - flag=ResponseFlag.NOT_EXECUTED_NOT_READY, - ) - - return response + pass + @abstractmethod def describe( self, target: str = "", execute_along_path: bool = True, execute_on_all_subsequent_children_in_path: bool = True, ) -> DescribeResponse: - descriptionType = None - descriptionName = None - - if self.configuration is not None: - if hasattr( - self.configuration.data, "application_name" - ): # Get the application name and type - descriptionType = self.configuration.data.application_name - descriptionName = self.configuration.data.id - elif hasattr(self.configuration.data, "controller") and hasattr( - self.configuration.data.controller, "application_name" - ): # Get the controller name and type - descriptionType = self.configuration.data.controller.application_name - descriptionName = self.configuration.data.controller.id - - description = Description( - type=descriptionType, - name=descriptionName, - endpoint=self.get_endpoint(), - info=( - get_detector_name(self.configuration) - if self.configuration is not None - else None - ), - session=os.getenv("DUNEDAQ_SESSION"), - commands=None, - broadcast=None, - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - ) - - response = DescribeResponse( - token=None, - name=self.name, - description=description, - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - ) - - return response + pass + @abstractmethod def describe_fsm( self, target: str = "", @@ -141,14 +79,9 @@ def describe_fsm( execute_on_all_subsequent_children_in_path: bool = True, key: str = "", ) -> DescribeFSMResponse: - response = DescribeFSMResponse( - token=None, - name=self.name, - flag=ResponseFlag.NOT_EXECUTED_NOT_READY, - ) - - return response + pass + @abstractmethod def execute_fsm_command( self, command: FSMCommand, @@ -156,15 +89,9 @@ def execute_fsm_command( execute_along_path: bool = True, execute_on_all_subsequent_children_in_path: bool = True, ) -> ExecuteFSMCommandResponse: - response = ExecuteFSMCommandResponse( - token=None, - name=self.name, - command_name=command.command_name, - flag=ResponseFlag.NOT_EXECUTED_NOT_READY, - ) - - return response + pass + @abstractmethod def execute_expert_command( self, json_string: str, @@ -172,122 +99,31 @@ def execute_expert_command( execute_along_path: bool = True, execute_on_all_subsequent_children_in_path: bool = True, ) -> ExecuteExpertCommandResponse: - response = ExecuteExpertCommandResponse( - token=None, - name=self.name, - flag=ResponseFlag.NOT_EXECUTED_NOT_READY, - ) + pass + + @abstractmethod + def include( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> IncludeExcludeResponse: + pass - return response + @abstractmethod + def exclude( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> IncludeExcludeResponse: + pass + @abstractmethod def recompute_status( self, target: str = "", execute_along_path: bool = True, execute_on_all_subsequent_children_in_path: bool = True, ) -> StatusResponse: - return self.status( - target=target, - execute_along_path=execute_along_path, - execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, - ) - - # TODO: needs reimplementation - @staticmethod - def get_child( - name: str, - cli, - configuration, - init_token=None, - connectivity_service=None, - timeout=60, - **kwargs, - ): - log = get_logger("controller.child_node") - ctype = ControlType.Unknown - uri = None - node_in_error = False - - if connectivity_service: - try: - ctype, uri = get_control_type_and_uri_from_connectivity_service( - connectivity_service, name, timeout=timeout - ) - except ApplicationLookupUnsuccessful as alu: - log.error( - f"Could not find the application '{name}' in the connectivity service: {alu}" - ) - - if ctype == ControlType.Unknown: - try: - ctype, uri = get_control_type_and_uri_from_cli(cli) - except DruncSetupException as e: - log.error( - f"Could not understand how to talk to the application '{name}' from its CLI: {e}" - ) - - address = None - port = 0 - if uri is not None: - try: - address, port = uri.split(":") - port = int(port) - except ValueError as e: - log.debug(f"Could not split the URI {uri} into address and port: {e}") - - if ctype == ControlType.Unknown or address is None or port == 0: - log.error(f"Could not understand how to talk to '{name}'") - node_in_error = True - ctype = ControlType.Direct - - log.info(f"Child {name} is of type {ctype} and has the URI {uri}") - - match ctype: - case ControlType.gRPC: - from drunc.controller.children_interface.grpc_child import ( - gRCPChildConfHandler, - gRPCChildNode, - ) - - return gRPCChildNode( - configuration=gRCPChildConfHandler( - configuration, ConfTypes.PyObject - ), - init_token=init_token, - name=name, - uri=uri, - connectivity_service=connectivity_service, - **kwargs, - ) - - case ControlType.REST_API: - from drunc.controller.children_interface.rest_api_child import ( - RESTAPIChildNode, - RESTAPIChildNodeConfHandler, - ) - - return RESTAPIChildNode( - configuration=RESTAPIChildNodeConfHandler( - configuration, ConfTypes.PyObject - ), - name=name, - uri=uri, - # init_token = init_token, # No authentication for RESTAPI - **kwargs, - ) - - case ControlType.Direct: - from drunc.controller.children_interface.client_side_child import ( - ClientSideChild, - ) - - node = ClientSideChild( - name=name, - **kwargs, - ) - if node_in_error: - node.state.to_error() - return node - - case _: - raise ChildInterfaceTechnologyUnknown(ctype, name) + pass diff --git a/src/drunc/controller/children_interface/client_side_child.py b/src/drunc/controller/children_interface/client_side_child.py index b4a433e02..8311e40e9 100644 --- a/src/drunc/controller/children_interface/client_side_child.py +++ b/src/drunc/controller/children_interface/client_side_child.py @@ -1,15 +1,9 @@ from threading import Lock -from druncschema.controller_pb2 import AddressedCommand -from druncschema.generic_pb2 import PlainText -from druncschema.request_response_pb2 import Response, ResponseFlag -from druncschema.token_pb2 import Token - from drunc.controller.children_interface.child_node import ChildNode from drunc.fsm.configuration import FSMConfHandler from drunc.fsm.core import FSM -from drunc.utils.grpc_utils import pack_to_any -from drunc.utils.utils import ControlType, get_logger +from drunc.utils.utils import ControlType class ClientSideState: @@ -73,44 +67,14 @@ def in_error(self): class ClientSideChild(ChildNode): def __init__( self, - name, - node_type: ControlType = ControlType.Direct, - fsm_configuration: FSMConfHandler = None, - configuration=None, + name: str, + node_type: ControlType, + fsm_configuration: FSMConfHandler, ): - super().__init__(name=name, node_type=node_type, configuration=configuration) - self.log = get_logger(f"controller.{name}-client-side") + super().__init__(name, node_type) + self.state = ClientSideState() self.fsm_configuration = fsm_configuration if fsm_configuration: fsmch = FSMConfHandler(fsm_configuration) self.fsm = FSM(conf=fsmch) - - def propagate_command( - self, - command: str, - request: AddressedCommand, - token: Token | None, - ) -> Response: - if command == "exclude": - self.state.exclude() - return Response( - name=self.name, - data=pack_to_any(PlainText(text=f"'{self.name}' excluded")), - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - ) - - if command == "include": - self.state.include() - return Response( - name=self.name, - data=pack_to_any(PlainText(text=f"'{self.name}' included")), - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - ) - - # If we get here, we don't run the command. - self.log.info(f"Ignoring command '{command}' sent to '{self.name}'") - return Response( - name=self.name, - flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, - ) diff --git a/src/drunc/controller/children_interface/grpc_child.py b/src/drunc/controller/children_interface/grpc_child.py index 7de703afa..cd53d8332 100644 --- a/src/drunc/controller/children_interface/grpc_child.py +++ b/src/drunc/controller/children_interface/grpc_child.py @@ -12,6 +12,8 @@ ExecuteFSMCommandRequest, ExecuteFSMCommandResponse, FSMCommand, + IncludeExcludeRequest, + IncludeExcludeResponse, StatusResponse, ) from druncschema.controller_pb2_grpc import ControllerStub @@ -30,14 +32,12 @@ from drunc.utils.configuration import ConfHandler, ConfTypes from drunc.utils.grpc_utils import ( ServerUnreachable, - copy_token, rethrow_if_unreachable_server, unpack_any, ) from drunc.utils.utils import ( ControlType, get_control_type_and_uri_from_connectivity_service, - get_logger, ) @@ -54,19 +54,15 @@ def get_uri(self): class gRPCChildNode(ChildNode): def __init__( self, - name, + name: str, configuration: gRCPChildConfHandler, - init_token, - uri, - connectivity_service=None, + uri: str, + connectivity_service, + init_token: Token | None = None, ): - super().__init__( - name=name, - node_type=ControlType.gRPC, - configuration=configuration, - ) + super().__init__(name, ControlType.gRPC) - self.log = get_logger(f"controller.{self.name}-grpc-child") + self.configuration = configuration self.connectivity_service = connectivity_service self._lock = threading.Lock() self.init_token = init_token @@ -94,7 +90,7 @@ def _setup_connection(self): self.stub = ControllerStub(self.channel) request = AddressedCommand( - token=copy_token(self.init_token), + token=None, command_name="describe", command_data=None, target="", @@ -110,18 +106,15 @@ def _setup_connection(self): response = self.stub.describe(request) except grpc.RpcError as error: - try: - self.handle_child_grpc_error(error) - except ServerUnreachable as server_unreachable_error: - if tries_remaining == 0: - raise server_unreachable_error - self.log.info( - ( - f"Could not connect to the controller ({self.uri}). " - f"Trying {tries_remaining} more times..." - ) + if tries_remaining == 0: + raise error + self.log.info( + ( + f"Could not connect to the controller ({self.uri}). " + f"Trying {tries_remaining} more times..." ) - time.sleep(5) + ) + time.sleep(5) else: self.log.info(f"Connected to the controller ({self.uri})!") @@ -173,15 +166,7 @@ def __str__(self) -> str: def get_endpoint(self) -> str: return self.uri - def start_listening(self, bdesc): - self.broadcast = BroadcastHandler( - BroadcastClientConfHandler( - data=bdesc, - type=ConfTypes.ProtobufAny, - ) - ) - - def terminate(self): + def terminate(self) -> None: if self.channel: self.channel.close() del self.channel @@ -191,6 +176,14 @@ def terminate(self): self.channel = None self.broadcast.stop() + def start_listening(self, bdesc): + self.broadcast = BroadcastHandler( + BroadcastClientConfHandler( + data=bdesc, + type=ConfTypes.ProtobufAny, + ) + ) + def propagate_command( self, command: str, @@ -362,6 +355,64 @@ def execute_expert_command( return response + def include( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> IncludeExcludeResponse: + request = IncludeExcludeRequest( + token=None, + target=target, + execute_along_path=execute_along_path, + execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, + ) + self.included = True + + try: + response = self.stub.include(request) + except grpc.RpcError as e: + try: + self.handle_child_grpc_error(e) + except ServerUnreachable: + self.log.warning( + f"Connection to {self.name} at {self.uri} failed during include, attempting to reconnect..." + ) + response = self._attempt_reconnection( + lambda: self.stub.include(request) + ) + + return response + + def exclude( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> IncludeExcludeResponse: + request = IncludeExcludeRequest( + token=None, + target=target, + execute_along_path=execute_along_path, + execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, + ) + self.included = False + + try: + response = self.stub.exclude(request) + except grpc.RpcError as e: + try: + self.handle_child_grpc_error(e) + except ServerUnreachable: + self.log.warning( + f"Connection to {self.name} at {self.uri} failed during exclude, attempting to reconnect..." + ) + response = self._attempt_reconnection( + lambda: self.stub.exclude(request) + ) + + return response + def recompute_status( self, target: str = "", diff --git a/src/drunc/controller/children_interface/rest_api_child.py b/src/drunc/controller/children_interface/rest_api_child.py index 46ce43b14..fdf749219 100644 --- a/src/drunc/controller/children_interface/rest_api_child.py +++ b/src/drunc/controller/children_interface/rest_api_child.py @@ -1,5 +1,6 @@ import json import multiprocessing +import os import queue import socket import threading @@ -9,19 +10,26 @@ import requests import socks from druncschema.controller_pb2 import ( + AddressedCommand, + DescribeFSMResponse, + DescribeResponse, ExecuteExpertCommandResponse, ExecuteFSMCommandResponse, FSMCommand, FSMResponseFlag, + IncludeExcludeResponse, Status, StatusResponse, ) -from druncschema.request_response_pb2 import ResponseFlag +from druncschema.description_pb2 import Description +from druncschema.request_response_pb2 import Response, ResponseFlag +from druncschema.token_pb2 import Token from flask import Flask, request from flask_restful import Api from drunc.controller.children_interface.client_side_child import ClientSideChild from drunc.controller.exceptions import ChildError, ExpertCommandException +from drunc.controller.utils import get_detector_name from drunc.exceptions import DruncException, DruncSetupException from drunc.fsm.configuration import FSMConfHandler from drunc.fsm.core import FSM @@ -355,20 +363,14 @@ def get_host_port(self): class RESTAPIChildNode(ClientSideChild): def __init__( self, - name, + name: str, configuration: RESTAPIChildNodeConfHandler, + uri: str, fsm_configuration: FSMConfHandler, - uri, ): - super().__init__( - name=name, - node_type=ControlType.REST_API, - configuration=configuration, - fsm_configuration=fsm_configuration, - ) - - self.log = get_logger(f"controller.{name}_rest_api_child") + super().__init__(name, ControlType.REST_API, fsm_configuration) + self.configuration = configuration self.response_listener = ResponseListener.get() if fsm_configuration: @@ -406,6 +408,21 @@ def __str__(self) -> str: def get_endpoint(self) -> str: return f"rest://{self.app_host}:{self.app_port}" + def terminate(self) -> None: + pass + + def propagate_command( + self, + command: str, + request: AddressedCommand, + token: Token | None, + ) -> Response: + self.log.info(f"Ignoring command '{command}' sent to '{self.name}'") + return Response( + name=self.name, + flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, + ) + def status( self, target: str = "", @@ -430,6 +447,125 @@ def status( return response + def describe( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> DescribeResponse: + response = DescribeResponse( + token=None, + name=self.name, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) + + description = Description( + endpoint=self.get_endpoint(), + session=os.getenv("DUNEDAQ_SESSION"), + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) + + if self.configuration is not None: + if detector_name := get_detector_name(self.configuration): + description.info = detector_name + if hasattr( + self.configuration.data, "application_name" + ): # Application nodes. + description.type = self.configuration.data.application_name + description.name = self.configuration.data.id + elif hasattr(self.configuration.data, "controller") and hasattr( + self.configuration.data.controller, "application_name" + ): # Controller nodes. + description.type = self.configuration.data.controller.application_name + description.name = self.configuration.data.controller.id + + response.description.CopyFrom(description) + + return response + + def describe_fsm( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + key: str = "", + ) -> DescribeFSMResponse: + return DescribeFSMResponse( + token=None, + name=self.name, + flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, + ) + + def execute_fsm_command( + self, + command: FSMCommand, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> ExecuteFSMCommandResponse: + command_name = command.command_name + response = ExecuteFSMCommandResponse( + token=None, + name=self.name, + command_name=command_name, + fsm_flag=FSMResponseFlag.FSM_EXECUTED_SUCCESSFULLY, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) + + # Don't execute command if we are excluded. + if self.state.excluded(): + response.fsm_flag = FSMResponseFlag.FSM_NOT_EXECUTED_EXCLUDED + response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY + return response + + try: + module_data = json.loads(command.data if command.data else "{}") + except JSONDecodeError as e: + self.log.error(f"Error parsing JSON command data: {e}") + response.fsm_flag = FSMResponseFlag.FSM_FAILED + response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT + return response + + cmd_data = {"modules": [{"data": module_data, "match": ""}]} + entry_state = self.state.get_operational_state() + transition = self.fsm.get_transition(command_name) + exit_state = self.fsm.get_destination_state(entry_state, transition) + self.state.executing_command_mark() + self.log.info(f"Sending '{command_name}' to '{self.name}'") + + try: + self.commander.send_app_command( + cmd_id=command_name, + module_data=cmd_data, + entry_state=entry_state.upper(), + exit_state=exit_state.upper(), + ) + self.log.debug(f"Sent '{command_name}' to '{self.name}'") + + r = self.commander.check_response(150) + self.log.debug(f"Got response from '{command_name}' to '{self.name}'") + + response.data = json.dumps(r) + + if not r["success"]: + # The RPC was successful, but the FSM command was not. + self.log.error(r["result"]) + self.state.to_error() + response.fsm_flag = FSMResponseFlag.FSM_FAILED + return response + + except Exception as e: + self.log.error(f"Got error from '{command_name}' to '{self.name}': {e!s}") + self.state.to_error() + response.fsm_flag = FSMResponseFlag.FSM_FAILED + response.flag = ResponseFlag.UNHANDLED_EXCEPTION_THROWN + return response + + self.state.end_command_execution_mark() + self.state.new_operational_state(exit_state) + + return response + def execute_expert_command( self, json_string: str, @@ -520,73 +656,44 @@ def execute_expert_command( return response - def execute_fsm_command( + def include( self, - command: FSMCommand, target: str = "", execute_along_path: bool = True, execute_on_all_subsequent_children_in_path: bool = True, - ) -> ExecuteFSMCommandResponse: - command_name = command.command_name - - response = ExecuteFSMCommandResponse( + ) -> IncludeExcludeResponse: + self.state.include() + self.included = True + return IncludeExcludeResponse( token=None, name=self.name, - command_name=command_name, - fsm_flag=FSMResponseFlag.FSM_EXECUTED_SUCCESSFULLY, + text=f"'{self.name}' included", flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) - # Don't execute command if we are excluded. - if self.state.excluded(): - response.fsm_flag = FSMResponseFlag.FSM_NOT_EXECUTED_EXCLUDED - response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY - return response - - try: - module_data = json.loads(command.data if command.data else "{}") - except JSONDecodeError as e: - self.log.error(f"Error parsing JSON command data: {e}") - response.fsm_flag = FSMResponseFlag.FSM_FAILED - response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT - return response - - cmd_data = {"modules": [{"data": module_data, "match": ""}]} - entry_state = self.state.get_operational_state() - transition = self.fsm.get_transition(command_name) - exit_state = self.fsm.get_destination_state(entry_state, transition) - self.state.executing_command_mark() - self.log.info(f"Sending '{command_name}' to '{self.name}'") - - try: - self.commander.send_app_command( - cmd_id=command_name, - module_data=cmd_data, - entry_state=entry_state.upper(), - exit_state=exit_state.upper(), - ) - self.log.debug(f"Sent '{command_name}' to '{self.name}'") - - r = self.commander.check_response(150) - self.log.debug(f"Got response from '{command_name}' to '{self.name}'") - - response.data = json.dumps(r) - - if not r["success"]: - # The RPC was successful, but the FSM command was not. - self.log.error(r["result"]) - self.state.to_error() - response.fsm_flag = FSMResponseFlag.FSM_FAILED - return response - - except Exception as e: - self.log.error(f"Got error from '{command_name}' to '{self.name}': {e!s}") - self.state.to_error() - response.fsm_flag = FSMResponseFlag.FSM_FAILED - response.flag = ResponseFlag.UNHANDLED_EXCEPTION_THROWN - return response - - self.state.end_command_execution_mark() - self.state.new_operational_state(exit_state) + def exclude( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> IncludeExcludeResponse: + self.state.exclude() + self.included = False + return IncludeExcludeResponse( + token=None, + name=self.name, + text=f"'{self.name}' excluded", + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) - return response + def recompute_status( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> StatusResponse: + return self.status( + target=target, + execute_along_path=execute_along_path, + execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, + ) diff --git a/src/drunc/controller/configuration.py b/src/drunc/controller/configuration.py index 5f0fa9fcd..3285ea0d3 100644 --- a/src/drunc/controller/configuration.py +++ b/src/drunc/controller/configuration.py @@ -1,20 +1,32 @@ import socket import threading +import confmodel_dal +from druncschema.token_pb2 import Token from kafkaopmon.OpMonPublisher import OpMonPublisher as KafkaOpMonPublisher from opmonlib.publisher import OpMonPublisher from opmonlib.utils import parse_opmon_conf +from drunc.connectivity_service.client import ConnectivityServiceClient +from drunc.connectivity_service.exceptions import ApplicationLookupUnsuccessful from drunc.controller.children_interface.child_node import ChildNode +from drunc.controller.children_interface.grpc_child import ( + gRCPChildConfHandler, + gRPCChildNode, +) from drunc.controller.children_interface.rest_api_child import ( + RESTAPIChildNode, RESTAPIChildNodeConfHandler, ) from drunc.exceptions import DruncCommandException, DruncSetupException from drunc.process_manager.configuration import get_commandline_parameters from drunc.utils.configuration import ConfHandler, ConfTypes -from drunc.utils.utils import ControlType - -import confmodel_dal # isort: skip +from drunc.utils.utils import ( + ControlType, + get_control_type_and_uri_from_cli, + get_control_type_and_uri_from_connectivity_service, + get_logger, +) class ControllerConfData: # the bastardised OKS @@ -56,7 +68,6 @@ def _grab_segment_conf_from_controller(self, configuration): def _post_process_oks(self, *args, **kwargs): self.authoriser = None - self.children = [] self.data = self._grab_segment_conf_from_controller(self.data) self.this_host = self.data.controller.runs_on.runs_on.id @@ -105,123 +116,73 @@ def _post_process_oks(self, *args, **kwargs): raise DruncCommandException("Failed to initialize OpMonPublisher.") return - def get_dummy_children(self): - ret = [] - session = self.db.get_dal(class_name="Session", uid=self.oks_key.session) - - for seg in self.data.segments: - if confmodel_dal.component_disabled(self.db._obj, session.id, seg.id): - continue - ret.append( - ChildNode( - name=seg.controller.id, - configuration=RESTAPIChildNodeConfHandler(seg, ConfTypes.PyObject), - node_type=ControlType.Unknown, - ) - ) - for app in self.data.applications: - if confmodel_dal.component_disabled(self.db._obj, session.id, app.id): - continue - ret.append( - ChildNode( - name=app.id, - configuration=RESTAPIChildNodeConfHandler(app, ConfTypes.PyObject), - node_type=ControlType.Unknown, - ) - ) - return ret - - def update_children( + def init_children( self, - children, - init_token, - without_excluded=False, - connectivity_service=None, - session_name=None, - ): - enabled_only = not without_excluded - timeout = 60 # 60s for each application to start and show up on the connectivity service - - self.log.debug(f"get_children: connectivity service lookup timeout={timeout}") + session_name: str, + init_token: Token, + connectivity_service: ConnectivityServiceClient | None = None, + enabled_only: bool = True, + ) -> list[ChildNode]: + child_nodes: list[ChildNode] = [] - session = None - self.children = [] + # 60s for applications to show on the connectivity service. + timeout = 60 + self.log.debug(f"init_children: connectivity service timeout: {timeout}") try: session = self.db.get_dal(class_name="Session", uid=self.oks_key.session) - except ImportError: + session = None if enabled_only: self.log.error( "OKS was not set up, so configuration does not know about include/exclude. All the children nodes will be returned" ) - enabled_only = True - - self.log.debug(f"looping over children\n{self.data.segments}") + enabled_only = False def process_segment(segment): - if enabled_only: - if confmodel_dal.component_disabled( - self.db._obj, session.id, segment.id - ): - return - - new_node = ChildNode.get_child( - cli=get_commandline_parameters( - db=self.db, - config_filename=self.initial_data, - session_id=session.id, - session_name=session_name, - obj=segment.controller, - ), + if enabled_only and confmodel_dal.component_disabled( + self.db._obj, session.id, segment.id + ): + return # Ignore disabled segments. + + cmd_args = get_commandline_parameters( + db=self.db, + config_filename=self.initial_data, + session_id=session.id, + session_name=session_name, + obj=segment.controller, + ) + node = self.child_node_factory( + cmd_args=cmd_args, init_token=init_token, name=segment.controller.id, configuration=segment, connectivity_service=connectivity_service, timeout=timeout, ) - if new_node: - got_child = False - - for idx, child in enumerate(children): - if child.name == new_node.name: - children[idx] = new_node - got_child = True - break - if not got_child: - self.children.append(new_node) + child_nodes.append(node) def process_application(app): - if enabled_only: - if confmodel_dal.component_disabled(self.db._obj, session.id, app.id): - return + if enabled_only and confmodel_dal.component_disabled( + self.db._obj, session.id, app.id + ): + return # Ignore disabled applications. - commandline_parameters = get_commandline_parameters( + cmd_args = get_commandline_parameters( db=self.db, config_filename=self.initial_data, session_id=session.id, session_name=session_name, obj=app, ) - - new_node = ChildNode.get_child( - cli=commandline_parameters, + node = self.child_node_factory( + cmd_args=cmd_args, name=app.id, configuration=app, - fsm_configuration=self.data.controller.fsm, connectivity_service=connectivity_service, - timeout=60, + timeout=timeout, ) - if new_node: - got_child = False - - for idx, child in enumerate(children): - if child.name == new_node.name: - children[idx] = new_node - got_child = True - break - if not got_child: - self.children.append(new_node) + child_nodes.append(node) # threading the children look up threads = [] @@ -241,4 +202,54 @@ def process_application(app): for t in threads: t.join() - return self.children + return child_nodes + + def child_node_factory( + self, + name: str, + cmd_args: list[str], + configuration, + init_token: Token | None = None, + connectivity_service: ConnectivityServiceClient | None = None, + timeout: int = 60, + ) -> ChildNode: + log = get_logger("controller.child_node_factory") + + if connectivity_service is not None: + try: + # Query the connectivity service. + ctype, uri = get_control_type_and_uri_from_connectivity_service( + connectivity_service, name, timeout=timeout + ) + except ApplicationLookupUnsuccessful as e: + log.error(f"Could not find '{name}' in the connectivity service: {e}") + raise e + else: + try: + # Fall back to the command line arguments. + ctype, uri = get_control_type_and_uri_from_cli(cmd_args) + except DruncSetupException as e: + log.error(f"Could not get '{name}' protocol from CLI: {e}") + raise e + + log.info(f"Child '{name}' is of type '{ctype}' and has the URI '{uri}'") + + match ctype: + case ControlType.gRPC: + conf_handler = gRCPChildConfHandler(configuration, ConfTypes.PyObject) + return gRPCChildNode( + name, conf_handler, uri, connectivity_service, init_token + ) + + case ControlType.REST_API: + conf_handler = RESTAPIChildNodeConfHandler( + configuration, ConfTypes.PyObject + ) + return RESTAPIChildNode( + name, conf_handler, uri, self.data.controller.fsm + ) + + case _: + error_message = f"Unknown protocol '{ctype}' for child '{name}'" + log.error(error_message) + raise DruncSetupException(error_message) diff --git a/src/drunc/controller/controller.py b/src/drunc/controller/controller.py index b2fe56e78..0f34637b9 100644 --- a/src/drunc/controller/controller.py +++ b/src/drunc/controller/controller.py @@ -19,7 +19,8 @@ ExecuteFSMCommandResponse, FSMCommand, FSMResponseFlag, - RecomputeStatusResponse, + IncludeExcludeRequest, + IncludeExcludeResponse, StatusResponse, ) from druncschema.controller_pb2_grpc import ControllerServicer @@ -284,17 +285,16 @@ def __init__(self, configuration, name: str, session: str, token: Token): address=f"{connection_server}:{connection_port}", ) - self.children_nodes = self.configuration.get_dummy_children() - def init_controller(self) -> None: log_init_controller = get_logger("controller.init_controller") log_init_controller.info("Finishing initialisation of controller") - self.configuration.update_children( - self.children_nodes, + + self.children_nodes = self.configuration.init_children( + session_name=self.session, init_token=self.actor.get_token(), connectivity_service=self.connectivity_service, - session_name=self.session, ) + # At this point, we already waited for 60s for the children applications to # start and show up on the connectivity service # We now wait for each application to get from "initialising" to "ready" @@ -312,12 +312,10 @@ def init_controller(self) -> None: time.time() - time_start < timeout and self.stateful_node.node_is_in_error() == False ): - - def child_command_fn(child: ChildNode, target: str) -> StatusResponse: - return child.status(target) - child_list = self.address_all() - child_responses = self.propagate_concurrently(child_command_fn, child_list) + child_responses = self.propagate_concurrently( + lambda child, target: child.status(target), child_list + ) children_states = {} for response in child_responses: @@ -752,13 +750,13 @@ def address_all( @staticmethod def propagate_concurrently( - child_command_fn: Callable[[ChildNode, str], T], + child_callable: Callable[[ChildNode, str], T], child_list: list[tuple[ChildNode, str]], ) -> list[T]: """Propagate commands concurrently to a list of children. Args: - child_command_fn: Callable to be executed for each child, with + child_callable: Callable to be executed for each child, with arguments (child, target). child_list: List of (node, target) for each addressed child. @@ -767,7 +765,7 @@ def propagate_concurrently( """ with ThreadPoolExecutor() as executor: futures = [ - executor.submit(child_command_fn, child_node, child_target) + executor.submit(child_callable, child_node, child_target) for child_node, child_target in child_list ] return [f.result() for f in as_completed(futures)] @@ -782,34 +780,40 @@ def propagate_concurrently( def status( self, request: AddressedCommand, context: ServicerContext ) -> StatusResponse: - request.target = self.parse_target_string(request.target) response = StatusResponse( token=None, name=self.name, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) + try: + # Parse and validate target. + request.target = self.parse_target_string(request.target) + except ValueError: + response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT + return response + # This node. if request.target == self.name or request.execute_along_path: status = get_status_message(self) response.status.CopyFrom(status) - # Children nodes. - def child_command_fn(child: ChildNode, target: str) -> StatusResponse: - return child.status( - target, - request.execute_along_path, - request.execute_on_all_subsequent_children_in_path, - ) - + # Children nodes (ignore exclusion). child_list = self.address_target_path( request.target, request.execute_on_all_subsequent_children_in_path, + ignore_exclusion=True, + ) + child_responses = self.propagate_concurrently( + lambda child, target: child.status( + target, + request.execute_along_path, + request.execute_on_all_subsequent_children_in_path, + ), + child_list, ) - child_responses = self.propagate_concurrently(child_command_fn, child_list) response.children.extend(child_responses) - response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY - return response @broadcasted @@ -818,12 +822,19 @@ def child_command_fn(child: ChildNode, target: str) -> StatusResponse: def describe( self, request: AddressedCommand, context: ServicerContext ) -> DescribeResponse: - request.target = self.parse_target_string(request.target) response = DescribeResponse( token=None, name=self.name, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) + try: + # Parse and validate target. + request.target = self.parse_target_string(request.target) + except ValueError: + response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT + return response + # This node. if request.target == self.name or request.execute_along_path: description = Description( @@ -838,23 +849,22 @@ def describe( description.broadcast.Pack(broadcast_description) response.description.CopyFrom(description) - # Children nodes. - def child_command_fn(child: ChildNode, target: str) -> DescribeResponse: - return child.describe( - target, - request.execute_along_path, - request.execute_on_all_subsequent_children_in_path, - ) - + # Children nodes (ignore exclusion). child_list = self.address_target_path( request.target, request.execute_on_all_subsequent_children_in_path, + ignore_exclusion=True, + ) + child_responses = self.propagate_concurrently( + lambda child, target: child.describe( + target, + request.execute_along_path, + request.execute_on_all_subsequent_children_in_path, + ), + child_list, ) - child_responses = self.propagate_concurrently(child_command_fn, child_list) response.children.extend(child_responses) - response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY - return response @broadcasted @@ -863,12 +873,19 @@ def child_command_fn(child: ChildNode, target: str) -> DescribeResponse: def describe_fsm( self, request: AddressedCommand, context: ServicerContext ) -> DescribeFSMResponse: - request.target = self.parse_target_string(request.target) response = DescribeFSMResponse( token=None, name=self.name, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) + try: + # Parse and validate target. + request.target = self.parse_target_string(request.target) + except ValueError: + response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT + return response + # What transitions to describe. key = unpack_any(request.command_data, PlainText).text @@ -898,24 +915,23 @@ def describe_fsm( description.sequences.extend(self.stateful_node.get_fsm_sequences()) response.description.CopyFrom(description) - # Children nodes. - def child_command_fn(child: ChildNode, target: str) -> DescribeFSMResponse: - return child.describe_fsm( + # Children nodes (ignore exclusion). + child_list = self.address_target_path( + request.target, + request.execute_on_all_subsequent_children_in_path, + ignore_exclusion=True, + ) + child_responses = self.propagate_concurrently( + lambda child, target: child.describe_fsm( target, request.execute_along_path, request.execute_on_all_subsequent_children_in_path, key, - ) - - child_list = self.address_target_path( - request.target, - request.execute_on_all_subsequent_children_in_path, + ), + child_list, ) - child_responses = self.propagate_concurrently(child_command_fn, child_list) response.children.extend(child_responses) - response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY - return response ######################################## @@ -931,7 +947,20 @@ def execute_fsm_command( request: ExecuteFSMCommandRequest, context: ServicerContext, ) -> ExecuteFSMCommandResponse: - request.target = self.parse_target_string(request.target) + response = ExecuteFSMCommandResponse( + token=None, + name=self.name, + command_name=request.command.command_name, + fsm_flag=FSMResponseFlag.FSM_EXECUTED_SUCCESSFULLY, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) + + try: + # Parse and validate target. + request.target = self.parse_target_string(request.target) + except ValueError: + response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT + return response command = request.command command_name = command.command_name @@ -939,17 +968,12 @@ def execute_fsm_command( transition = self.stateful_node.get_fsm_transition(command_name) self.log.debug(f"FSM transition: {transition}") - response = ExecuteFSMCommandResponse( - token=None, - name=self.name, - command_name=command_name, - ) - # Check controller readiness. if not self.stateful_node.get_ready_state(): self.log.error( f"Command '{command_name}' not executed: controller is not ready." ) + response.fsm_flag = FSMResponseFlag.FSM_FAILED response.flag = ResponseFlag.NOT_EXECUTED_NOT_READY return response @@ -957,14 +981,12 @@ def execute_fsm_command( if self.stateful_node.node_is_in_error(): self.log.error(f"Command '{command_name}' not executed: node is in error.") response.fsm_flag = FSMResponseFlag.FSM_NOT_EXECUTED_IN_ERROR - response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY return response # Check if node is excluded. if not self.stateful_node.node_is_included(): self.log.error(f"Command '{command_name}' not executed: node is excluded.") response.fsm_flag = FSMResponseFlag.FSM_NOT_EXECUTED_EXCLUDED - response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY return response # Check if transition is possible from current state. @@ -974,7 +996,6 @@ def execute_fsm_command( f"Command '{command_name}' not executed: not possible from state '{state}'." ) response.fsm_flag = FSMResponseFlag.FSM_INVALID_TRANSITION - response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY return response # This node. @@ -1022,21 +1043,19 @@ def execute_fsm_command( child_command.CopyFrom(command) child_command.data = fsm_data - def child_command_fn( - child: ChildNode, target: str - ) -> ExecuteFSMCommandResponse: - return child.execute_fsm_command( + child_list = self.address_target_path( + request.target, + request.execute_on_all_subsequent_children_in_path, + ) + child_responses = self.propagate_concurrently( + lambda child, target: child.execute_fsm_command( child_command, target, request.execute_along_path, request.execute_on_all_subsequent_children_in_path, - ) - - child_list = self.address_target_path( - request.target, - request.execute_on_all_subsequent_children_in_path, + ), + child_list, ) - child_responses = self.propagate_concurrently(child_command_fn, child_list) response.children.extend(child_responses) # Finish propagating FSM transition to children. @@ -1056,7 +1075,6 @@ def child_command_fn( ) # Set FSM error flag based on child responses. - response.fsm_flag = FSMResponseFlag.FSM_EXECUTED_SUCCESSFULLY for child_response in child_responses: if child_response.flag not in [ ResponseFlag.EXECUTED_SUCCESSFULLY, @@ -1070,26 +1088,21 @@ def child_command_fn( # Children nodes. else: - - def child_command_fn( - child: ChildNode, target: str - ) -> ExecuteFSMCommandResponse: - return child.execute_fsm_command( + child_list = self.address_target_path( + request.target, + request.execute_on_all_subsequent_children_in_path, + ) + child_responses = self.propagate_concurrently( + lambda child, target: child.execute_fsm_command( command, target, request.execute_along_path, request.execute_on_all_subsequent_children_in_path, - ) - - child_list = self.address_target_path( - request.target, - request.execute_on_all_subsequent_children_in_path, + ), + child_list, ) - child_responses = self.propagate_concurrently(child_command_fn, child_list) response.children.extend(child_responses) - response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY - return response @broadcasted @@ -1101,34 +1114,136 @@ def execute_expert_command( request: ExecuteExpertCommandRequest, context: ServicerContext, ) -> ExecuteExpertCommandResponse: - request.target = self.parse_target_string(request.target) response = ExecuteExpertCommandResponse( token=None, name=self.name, + fsm_flag=FSMResponseFlag.FSM_EXECUTED_SUCCESSFULLY, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) + try: + # Parse and validate target. + request.target = self.parse_target_string(request.target) + except ValueError: + response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT + return response + # This node. response.data = f"'{self.name}' propagated expert command" # Children nodes. - def child_command_fn( - child: ChildNode, target: str - ) -> ExecuteExpertCommandResponse: - return child.execute_expert_command( + child_list = self.address_target_path( + request.target, + request.execute_on_all_subsequent_children_in_path, + ) + child_responses = self.propagate_concurrently( + lambda child, target: child.execute_expert_command( request.json_string, target, request.execute_along_path, request.execute_on_all_subsequent_children_in_path, - ) + ), + child_list, + ) + response.children.extend(child_responses) + + return response + + @broadcasted + @authentified_and_authorised(action=ActionType.UPDATE, system=SystemType.CONTROLLER) + @in_control + @publish_command_time + def include( + self, + request: IncludeExcludeRequest, + context: ServicerContext, + ) -> IncludeExcludeResponse: + response = IncludeExcludeResponse( + token=None, + name=self.name, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) + try: + # Parse and validate target. + request.target = self.parse_target_string(request.target) + except ValueError: + response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT + return response + + # This node. + if request.target == self.name or request.execute_along_path: + try: + self.stateful_node.include_node() + except CannotInclude: + response.text = f"'{self.name}' is already included" + else: + response.text = f"'{self.name}' included" + + # Children nodes (ignore exclusion). child_list = self.address_target_path( request.target, request.execute_on_all_subsequent_children_in_path, + ignore_exclusion=True, + ) + child_responses = self.propagate_concurrently( + lambda child, target: child.include( + target, + request.execute_along_path, + request.execute_on_all_subsequent_children_in_path, + ), + child_list, ) - child_responses = self.propagate_concurrently(child_command_fn, child_list) response.children.extend(child_responses) - response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY + return response + + @broadcasted + @authentified_and_authorised(action=ActionType.UPDATE, system=SystemType.CONTROLLER) + @in_control + @publish_command_time + def exclude( + self, + request: IncludeExcludeRequest, + context: ServicerContext, + ) -> IncludeExcludeResponse: + response = IncludeExcludeResponse( + token=None, + name=self.name, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) + + try: + # Parse and validate target. + request.target = self.parse_target_string(request.target) + except ValueError: + response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT + return response + + # This node. + if request.target == self.name or request.execute_along_path: + try: + self.stateful_node.exclude_node() + except CannotExclude: + response.text = f"'{self.name}' is already excluded" + else: + response.text = f"'{self.name}' excluded" + + # Children nodes (ignore exclusion). + child_list = self.address_target_path( + request.target, + request.execute_on_all_subsequent_children_in_path, + ignore_exclusion=True, + ) + child_responses = self.propagate_concurrently( + lambda child, target: child.exclude( + target, + request.execute_along_path, + request.execute_on_all_subsequent_children_in_path, + ), + child_list, + ) + response.children.extend(child_responses) return response @@ -1138,25 +1253,31 @@ def child_command_fn( @publish_command_time def recompute_status( self, request: AddressedCommand, context: ServicerContext - ) -> RecomputeStatusResponse: - request.target = self.parse_target_string(request.target) - response = RecomputeStatusResponse( + ) -> StatusResponse: + response = StatusResponse( token=None, name=self.name, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) + try: + # Parse and validate target. + request.target = self.parse_target_string(request.target) + except ValueError: + response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT + return response + # This node. if request.target == self.name or request.execute_along_path: - - def child_command_fn(child: ChildNode, target: str) -> StatusResponse: - return child.recompute_status( + child_list = self.address_all() + child_responses = self.propagate_concurrently( + lambda child, target: child.recompute_status( target, request.execute_along_path, request.execute_on_all_subsequent_children_in_path, - ) - - child_list = self.address_all() - child_responses = self.propagate_concurrently(child_command_fn, child_list) + ), + child_list, + ) self_should_go_to_error = False children_states = set() @@ -1216,124 +1337,35 @@ def child_command_fn(child: ChildNode, target: str) -> StatusResponse: status = get_status_message(self) response.status.CopyFrom(status) - def child_command_fn(child: ChildNode, target: str) -> StatusResponse: - return child.status( + child_list = self.address_all(ignore_exclusion=True) + child_responses = self.propagate_concurrently( + lambda child, target: child.status( target, request.execute_along_path, request.execute_on_all_subsequent_children_in_path, - ) - - child_list = self.address_all(ignore_exclusion=True) - child_responses = self.propagate_concurrently(child_command_fn, child_list) + ), + child_list, + ) response.children.extend(child_responses) # Children nodes. else: - - def child_command_fn(child: ChildNode, target: str) -> StatusResponse: - return child.recompute_status( - target, - request.execute_along_path, - request.execute_on_all_subsequent_children_in_path, - ) - child_list = self.address_target_path( request.target, request.execute_on_all_subsequent_children_in_path, ) - child_responses = self.propagate_concurrently(child_command_fn, child_list) + child_responses = self.propagate_concurrently( + lambda child, target: child.recompute_status( + target, + request.execute_along_path, + request.execute_on_all_subsequent_children_in_path, + ), + child_list, + ) response.children.extend(child_responses) - response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY - return response - # ORDER MATTERS! - @broadcasted # outer most wrapper 1st step - @authentified_and_authorised( - action=ActionType.UPDATE, system=SystemType.CONTROLLER - ) # 2nd step - @in_control # 3rd step - @OLD_unpack_addressed_command_to() # 4th step - @publish_command_time - def include( - self, - addressed_commands: dict[str, AddressedCommand], - execute_on_self: bool, - token: Token, - ) -> PlainText: - resp = None - if execute_on_self: - try: - self.stateful_node.include_node() - except CannotInclude: - resp = PlainText(text=f"{self.name} is already included") - else: - resp = PlainText(text=f"{self.name} included") - - # Now we snoop into the addressed_commands and see if we can find a target that is a children, and include it - for child_name, addressed_command in addressed_commands.items(): - for n in self.children_nodes: - if n.name == addressed_command.target: - n.included = True - - response_children = self.OLD_propagate_to_children( - "include", - addressed_commands, - token, - ) - - return Response( - name=self.name, - token=token, - data=pack_to_any(resp) if resp else None, - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - children=response_children, - ) - - # ORDER MATTERS! - @broadcasted # outer most wrapper 1st step - @authentified_and_authorised( - action=ActionType.UPDATE, system=SystemType.CONTROLLER - ) # 2nd step - @in_control - @OLD_unpack_addressed_command_to() # 3rd step - @publish_command_time - def exclude( - self, - addressed_commands: dict[str, AddressedCommand], - execute_on_self: bool, - token: Token, - ) -> PlainText: - resp = None - if execute_on_self: - try: - self.stateful_node.exclude_node() - except CannotExclude: - resp = PlainText(text=f"{self.name} is already excluded") - else: - resp = PlainText(text=f"{self.name} excluded") - - # Now we snoop into the addressed_commands and see if we can find a target that is a children, and exclude it - for child_name, addressed_command in addressed_commands.items(): - for n in self.children_nodes: - if n.name == addressed_command.target: - n.included = False - - response_children = self.OLD_propagate_to_children( - "exclude", - addressed_commands, - token, - ) - - return Response( - name=self.name, - token=token, - data=pack_to_any(resp) if resp else None, - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - children=response_children, - ) - ########################################## ############# Actor commands ############# ########################################## diff --git a/src/drunc/controller/controller_driver.py b/src/drunc/controller/controller_driver.py index 187783399..59bd86a99 100644 --- a/src/drunc/controller/controller_driver.py +++ b/src/drunc/controller/controller_driver.py @@ -10,7 +10,8 @@ ExecuteFSMCommandRequest, ExecuteFSMCommandResponse, FSMCommand, - RecomputeStatusResponse, + IncludeExcludeRequest, + IncludeExcludeResponse, StatusResponse, ) from druncschema.controller_pb2_grpc import ControllerStub @@ -180,13 +181,55 @@ def execute_expert_command( return response + def include( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + timeout: int | float = 60, + ) -> IncludeExcludeResponse: + request = IncludeExcludeRequest( + target=target, + execute_along_path=execute_along_path, + execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, + ) + request.token.CopyFrom(self.token) + + try: + response = self.stub.include(request, timeout=timeout) + except grpc.RpcError as e: + handle_grpc_error(e) + + return response + + def exclude( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + timeout: int | float = 60, + ) -> IncludeExcludeResponse: + request = IncludeExcludeRequest( + target=target, + execute_along_path=execute_along_path, + execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, + ) + request.token.CopyFrom(self.token) + + try: + response = self.stub.exclude(request, timeout=timeout) + except grpc.RpcError as e: + handle_grpc_error(e) + + return response + def recompute_status( self, target: str = "", execute_along_path: bool = True, execute_on_all_subsequent_children_in_path: bool = True, timeout: int | float = 60, - ) -> RecomputeStatusResponse: + ) -> StatusResponse: request = AddressedCommand( command_name="recompute_status", target=target, @@ -232,22 +275,6 @@ def surrender_control( timeout=timeout, ) - @OLD_pack_empty_addressed_command - def include( - self, addressed_command: AddressedCommand, timeout: int | float = 60 - ) -> DecodedResponse: - return self.OLD_send_command( - "include", data=addressed_command, outformat=PlainText, timeout=timeout - ) - - @OLD_pack_empty_addressed_command - def exclude( - self, addressed_command: AddressedCommand, timeout: int | float = 60 - ) -> DecodedResponse: - return self.OLD_send_command( - "exclude", data=addressed_command, outformat=PlainText, timeout=timeout - ) - @OLD_pack_empty_addressed_command def to_error( self, addressed_command: AddressedCommand, timeout: int | float = 60 diff --git a/src/drunc/controller/interface/commands.py b/src/drunc/controller/interface/commands.py index bbedd55f3..b3869bb84 100644 --- a/src/drunc/controller/interface/commands.py +++ b/src/drunc/controller/interface/commands.py @@ -302,16 +302,12 @@ def include( obj: ControllerContext, target: str, ) -> None: - result = ( - obj.get_driver("controller") - .include( - target=target, - execute_along_path=False, - execute_on_all_subsequent_children_in_path=True, - ) - .data + result = obj.get_driver("controller").include( + target=target, + execute_along_path=False, + execute_on_all_subsequent_children_in_path=True, ) - if not result: + if not result or not result.text: return log = get_logger(**logger_params) log.info(result.text) @@ -324,16 +320,12 @@ def exclude( obj: ControllerContext, target: str, ) -> None: - result = ( - obj.get_driver("controller") - .exclude( - target=target, - execute_along_path=False, - execute_on_all_subsequent_children_in_path=True, - ) - .data + result = obj.get_driver("controller").exclude( + target=target, + execute_along_path=False, + execute_on_all_subsequent_children_in_path=True, ) - if not result: + if not result or not result.text: return log = get_logger(**logger_params) log.info(result.text) diff --git a/src/drunc/utils/utils.py b/src/drunc/utils/utils.py index 2c7712cc8..615e19118 100644 --- a/src/drunc/utils/utils.py +++ b/src/drunc/utils/utils.py @@ -402,19 +402,17 @@ class ControlType(Enum): Direct = 3 -def get_control_type_and_uri_from_cli(CLAs: list[str]) -> ControlType: - for CLA in CLAs: - if CLA.startswith("rest://"): - return ControlType.REST_API, resolve_localhost_and_127_ip_to_network_ip( - CLA.replace("rest://", "") - ) - elif CLA.startswith("grpc://"): - return ControlType.gRPC, resolve_localhost_and_127_ip_to_network_ip( - CLA.replace("grpc://", "") - ) - raise DruncSetupException( - "Could not find if the child was controlled by gRPC or a REST API" - ) +def get_control_type_and_uri_from_cli(cli_args: list[str]) -> tuple[ControlType, str]: + for arg in cli_args: + if arg.startswith("rest://"): + uri = arg.replace("rest://", "") + uri = resolve_localhost_and_127_ip_to_network_ip(uri) + return ControlType.REST_API, uri + elif arg.startswith("grpc://"): + uri = arg.replace("grpc://", "") + uri = resolve_localhost_and_127_ip_to_network_ip(uri) + return ControlType.gRPC, uri + raise DruncSetupException("Protocol must be 'grpc://' or 'rest://'") def get_control_type_and_uri_from_connectivity_service(