From 94ae0f8f38fd81c0f77490cebb82a1a5f38ac5b9 Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Wed, 5 Nov 2025 17:37:01 +0000 Subject: [PATCH 01/14] include/exclude msg: controller driver changes. --- src/drunc/controller/controller_driver.py | 60 ++++++++++++++++------ src/drunc/controller/interface/commands.py | 24 +++------ 2 files changed, 52 insertions(+), 32 deletions(-) diff --git a/src/drunc/controller/controller_driver.py b/src/drunc/controller/controller_driver.py index e31b0a544..568c42fea 100644 --- a/src/drunc/controller/controller_driver.py +++ b/src/drunc/controller/controller_driver.py @@ -10,6 +10,8 @@ ExecuteFSMCommandRequest, ExecuteFSMCommandResponse, FSMCommand, + IncludeExcludeRequest, + IncludeExcludeResponse, RecomputeStatusResponse, StatusResponse, ) @@ -178,6 +180,48 @@ def execute_expert_command( return response + def include( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + timeout: int | float = 60, + ) -> IncludeExcludeResponse: + request = IncludeExcludeRequest( + target=target, + execute_along_path=execute_along_path, + execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, + ) + request.token.CopyFrom(self.token) + + try: + response = self.stub.include(request, timeout=timeout) + except grpc.RpcError as e: + handle_grpc_error(e) + + return response + + def exclude( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + timeout: int | float = 60, + ) -> IncludeExcludeResponse: + request = IncludeExcludeRequest( + target=target, + execute_along_path=execute_along_path, + execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, + ) + request.token.CopyFrom(self.token) + + try: + response = self.stub.exclude(request, timeout=timeout) + except grpc.RpcError as e: + handle_grpc_error(e) + + return response + def recompute_status( 
self, target: str = "", @@ -230,22 +274,6 @@ def surrender_control( timeout=timeout, ) - @OLD_pack_empty_addressed_command - def include( - self, addressed_command: AddressedCommand, timeout: int | float = 60 - ) -> DecodedResponse: - return self.OLD_send_command( - "include", data=addressed_command, outformat=PlainText, timeout=timeout - ) - - @OLD_pack_empty_addressed_command - def exclude( - self, addressed_command: AddressedCommand, timeout: int | float = 60 - ) -> DecodedResponse: - return self.OLD_send_command( - "exclude", data=addressed_command, outformat=PlainText, timeout=timeout - ) - @OLD_pack_empty_addressed_command def to_error( self, addressed_command: AddressedCommand, timeout: int | float = 60 diff --git a/src/drunc/controller/interface/commands.py b/src/drunc/controller/interface/commands.py index bbedd55f3..c5ec58066 100644 --- a/src/drunc/controller/interface/commands.py +++ b/src/drunc/controller/interface/commands.py @@ -302,14 +302,10 @@ def include( obj: ControllerContext, target: str, ) -> None: - result = ( - obj.get_driver("controller") - .include( - target=target, - execute_along_path=False, - execute_on_all_subsequent_children_in_path=True, - ) - .data + result = obj.get_driver("controller").include( + target=target, + execute_along_path=False, + execute_on_all_subsequent_children_in_path=True, ) if not result: return @@ -324,14 +320,10 @@ def exclude( obj: ControllerContext, target: str, ) -> None: - result = ( - obj.get_driver("controller") - .exclude( - target=target, - execute_along_path=False, - execute_on_all_subsequent_children_in_path=True, - ) - .data + result = obj.get_driver("controller").exclude( + target=target, + execute_along_path=False, + execute_on_all_subsequent_children_in_path=True, ) if not result: return From a65256e6daf08f275f1bc4e1dc914f4b2a294f5d Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Thu, 6 Nov 2025 14:30:07 +0000 Subject: [PATCH 02/14] include/exclude controller fns. 
Also use lambdas for propagate_concurrently. --- src/drunc/controller/controller.py | 341 +++++++++++++++-------------- 1 file changed, 173 insertions(+), 168 deletions(-) diff --git a/src/drunc/controller/controller.py b/src/drunc/controller/controller.py index deb147495..728ba5d95 100644 --- a/src/drunc/controller/controller.py +++ b/src/drunc/controller/controller.py @@ -19,6 +19,8 @@ ExecuteFSMCommandResponse, FSMCommand, FSMResponseFlag, + IncludeExcludeRequest, + IncludeExcludeResponse, RecomputeStatusResponse, StatusResponse, ) @@ -312,12 +314,10 @@ def init_controller(self) -> None: time.time() - time_start < timeout and self.stateful_node.node_is_in_error() == False ): - - def child_command_fn(child: ChildNode, target: str) -> StatusResponse: - return child.status(target) - child_list = self.address_all() - child_responses = self.propagate_concurrently(child_command_fn, child_list) + child_responses = self.propagate_concurrently( + lambda child, target: child.status(target), child_list + ) children_states = {} for response in child_responses: @@ -741,13 +741,13 @@ def address_all( @staticmethod def propagate_concurrently( - child_command_fn: Callable[[ChildNode, str], T], + child_callable: Callable[[ChildNode, str], T], child_list: list[tuple[ChildNode, str]], ) -> list[T]: """Propagate commands concurrently to a list of children. Args: - child_command_fn: Callable to be executed for each child, with + child_callable: Callable to be executed for each child, with arguments (child, target). child_list: List of (node, target) for each addressed child. @@ -756,7 +756,7 @@ def propagate_concurrently( """ with ThreadPoolExecutor() as executor: futures = [ - executor.submit(child_command_fn, child_node, child_target) + executor.submit(child_callable, child_node, child_target) for child_node, child_target in child_list ] return [f.result() for f in as_completed(futures)] @@ -783,18 +783,18 @@ def status( response.status.CopyFrom(status) # Children nodes. 
- def child_command_fn(child: ChildNode, target: str) -> StatusResponse: - return child.status( - target, - request.execute_along_path, - request.execute_on_all_subsequent_children_in_path, - ) - child_list = self.address_target_path( request.target, request.execute_on_all_subsequent_children_in_path, ) - child_responses = self.propagate_concurrently(child_command_fn, child_list) + child_responses = self.propagate_concurrently( + lambda child, target: child.status( + target, + request.execute_along_path, + request.execute_on_all_subsequent_children_in_path, + ), + child_list, + ) response.children.extend(child_responses) response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY @@ -828,18 +828,18 @@ def describe( response.description.CopyFrom(description) # Children nodes. - def child_command_fn(child: ChildNode, target: str) -> DescribeResponse: - return child.describe( - target, - request.execute_along_path, - request.execute_on_all_subsequent_children_in_path, - ) - child_list = self.address_target_path( request.target, request.execute_on_all_subsequent_children_in_path, ) - child_responses = self.propagate_concurrently(child_command_fn, child_list) + child_responses = self.propagate_concurrently( + lambda child, target: child.describe( + target, + request.execute_along_path, + request.execute_on_all_subsequent_children_in_path, + ), + child_list, + ) response.children.extend(child_responses) response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY @@ -888,19 +888,19 @@ def describe_fsm( response.description.CopyFrom(description) # Children nodes. 
- def child_command_fn(child: ChildNode, target: str) -> DescribeFSMResponse: - return child.describe_fsm( + child_list = self.address_target_path( + request.target, + request.execute_on_all_subsequent_children_in_path, + ) + child_responses = self.propagate_concurrently( + lambda child, target: child.describe_fsm( target, request.execute_along_path, request.execute_on_all_subsequent_children_in_path, key, - ) - - child_list = self.address_target_path( - request.target, - request.execute_on_all_subsequent_children_in_path, + ), + child_list, ) - child_responses = self.propagate_concurrently(child_command_fn, child_list) response.children.extend(child_responses) response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY @@ -1011,21 +1011,19 @@ def execute_fsm_command( child_command.CopyFrom(command) child_command.data = fsm_data - def child_command_fn( - child: ChildNode, target: str - ) -> ExecuteFSMCommandResponse: - return child.execute_fsm_command( + child_list = self.address_target_path( + request.target, + request.execute_on_all_subsequent_children_in_path, + ) + child_responses = self.propagate_concurrently( + lambda child, target: child.execute_fsm_command( child_command, target, request.execute_along_path, request.execute_on_all_subsequent_children_in_path, - ) - - child_list = self.address_target_path( - request.target, - request.execute_on_all_subsequent_children_in_path, + ), + child_list, ) - child_responses = self.propagate_concurrently(child_command_fn, child_list) response.children.extend(child_responses) # Finish propagating FSM transition to children. @@ -1059,22 +1057,19 @@ def child_command_fn( # Children nodes. 
else: - - def child_command_fn( - child: ChildNode, target: str - ) -> ExecuteFSMCommandResponse: - return child.execute_fsm_command( + child_list = self.address_target_path( + request.target, + request.execute_on_all_subsequent_children_in_path, + ) + child_responses = self.propagate_concurrently( + lambda child, target: child.execute_fsm_command( command, target, request.execute_along_path, request.execute_on_all_subsequent_children_in_path, - ) - - child_list = self.address_target_path( - request.target, - request.execute_on_all_subsequent_children_in_path, + ), + child_list, ) - child_responses = self.propagate_concurrently(child_command_fn, child_list) response.children.extend(child_responses) response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY @@ -1100,24 +1095,122 @@ def execute_expert_command( response.data = f"'{self.name}' propagated expert command" # Children nodes. - def child_command_fn( - child: ChildNode, target: str - ) -> ExecuteExpertCommandResponse: - return child.execute_expert_command( + child_list = self.address_target_path( + request.target, + request.execute_on_all_subsequent_children_in_path, + ) + child_responses = self.propagate_concurrently( + lambda child, target: child.execute_expert_command( request.json_string, target, request.execute_along_path, request.execute_on_all_subsequent_children_in_path, - ) + ), + child_list, + ) + response.children.extend(child_responses) + + response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY + + return response + + @broadcasted + @authentified_and_authorised(action=ActionType.UPDATE, system=SystemType.CONTROLLER) + @in_control + @publish_command_time + def include( + self, + request: IncludeExcludeRequest, + context: ServicerContext, + ) -> IncludeExcludeResponse: + request.target = self.parse_target_string(request.target) + response = IncludeExcludeResponse( + token=None, + name=self.name, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) + # This node. 
+ if request.target == self.name or request.execute_along_path: + try: + self.stateful_node.include_node() + except CannotInclude: + response.text = f"{self.name} is already included" + else: + response.text = f"{self.name} included" + + # Children nodes (ignore exclusion). child_list = self.address_target_path( request.target, request.execute_on_all_subsequent_children_in_path, + ignore_exclusion=True, + ) + child_responses = self.propagate_concurrently( + lambda child, target: child.include( + target, + request.execute_along_path, + request.execute_on_all_subsequent_children_in_path, + ), + child_list, ) - child_responses = self.propagate_concurrently(child_command_fn, child_list) response.children.extend(child_responses) - response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY + # # TODO: DO BELOW IN THE CHILDREN NODES INSTEAD OF HERE + # # Now we snoop into the addressed_commands and see if we can find a target that is a children, and include it + # for child_name, addressed_command in addressed_commands.items(): + # for n in self.children_nodes: + # if n.name == addressed_command.target: + # n.included = True + + return response + + @broadcasted + @authentified_and_authorised(action=ActionType.UPDATE, system=SystemType.CONTROLLER) + @in_control + @publish_command_time + def exclude( + self, + request: IncludeExcludeRequest, + context: ServicerContext, + ) -> IncludeExcludeResponse: + request.target = self.parse_target_string(request.target) + response = IncludeExcludeResponse( + token=None, + name=self.name, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) + + # This node. + if request.target == self.name or request.execute_along_path: + try: + self.stateful_node.exclude_node() + except CannotExclude: + response.text = f"{self.name} is already excluded" + else: + response.text = f"{self.name} excluded" + + # Children nodes (ignore exclusion). 
+ child_list = self.address_target_path( + request.target, + request.execute_on_all_subsequent_children_in_path, + ignore_exclusion=True, + ) + child_responses = self.propagate_concurrently( + lambda child, target: child.exclude( + target, + request.execute_along_path, + request.execute_on_all_subsequent_children_in_path, + ), + child_list, + ) + response.children.extend(child_responses) + + # # TODO: DO BELOW IN THE CHILDREN NODES INSTEAD OF HERE + # # Now we snoop into the addressed_commands and see if we can find a target that is a children, and exclude it + # for child_name, addressed_command in addressed_commands.items(): + # for n in self.children_nodes: + # if n.name == addressed_command.target: + # n.included = False return response @@ -1136,16 +1229,15 @@ def recompute_status( # This node. if request.target == self.name or request.execute_along_path: - - def child_command_fn(child: ChildNode, target: str) -> StatusResponse: - return child.recompute_status( + child_list = self.address_all() + child_responses = self.propagate_concurrently( + lambda child, target: child.recompute_status( target, request.execute_along_path, request.execute_on_all_subsequent_children_in_path, - ) - - child_list = self.address_all() - child_responses = self.propagate_concurrently(child_command_fn, child_list) + ), + child_list, + ) self_should_go_to_error = False children_states = set() @@ -1205,124 +1297,37 @@ def child_command_fn(child: ChildNode, target: str) -> StatusResponse: status = get_status_message(self) response.status.CopyFrom(status) - def child_command_fn(child: ChildNode, target: str) -> StatusResponse: - return child.status( + child_list = self.address_all(ignore_exclusion=True) + child_responses = self.propagate_concurrently( + lambda child, target: child.status( target, request.execute_along_path, request.execute_on_all_subsequent_children_in_path, - ) - - child_list = self.address_all(ignore_exclusion=True) - child_responses = 
self.propagate_concurrently(child_command_fn, child_list) + ), + child_list, + ) response.children.extend(child_responses) # Children nodes. else: - - def child_command_fn(child: ChildNode, target: str) -> StatusResponse: - return child.recompute_status( - target, - request.execute_along_path, - request.execute_on_all_subsequent_children_in_path, - ) - child_list = self.address_target_path( request.target, request.execute_on_all_subsequent_children_in_path, ) - child_responses = self.propagate_concurrently(child_command_fn, child_list) + child_responses = self.propagate_concurrently( + lambda child, target: child.recompute_status( + target, + request.execute_along_path, + request.execute_on_all_subsequent_children_in_path, + ), + child_list, + ) response.children.extend(child_responses) response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY return response - # ORDER MATTERS! - @broadcasted # outer most wrapper 1st step - @authentified_and_authorised( - action=ActionType.UPDATE, system=SystemType.CONTROLLER - ) # 2nd step - @in_control # 3rd step - @OLD_unpack_addressed_command_to() # 4th step - @publish_command_time - def include( - self, - addressed_commands: dict[str, AddressedCommand], - execute_on_self: bool, - token: Token, - ) -> PlainText: - resp = None - if execute_on_self: - try: - self.stateful_node.include_node() - except CannotInclude: - resp = PlainText(text=f"{self.name} is already included") - else: - resp = PlainText(text=f"{self.name} included") - - # Now we snoop into the addressed_commands and see if we can find a target that is a children, and include it - for child_name, addressed_command in addressed_commands.items(): - for n in self.children_nodes: - if n.name == addressed_command.target: - n.included = True - - response_children = self.OLD_propagate_to_children( - "include", - addressed_commands, - token, - ) - - return Response( - name=self.name, - token=token, - data=pack_to_any(resp) if resp else None, - 
flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - children=response_children, - ) - - # ORDER MATTERS! - @broadcasted # outer most wrapper 1st step - @authentified_and_authorised( - action=ActionType.UPDATE, system=SystemType.CONTROLLER - ) # 2nd step - @in_control - @OLD_unpack_addressed_command_to() # 3rd step - @publish_command_time - def exclude( - self, - addressed_commands: dict[str, AddressedCommand], - execute_on_self: bool, - token: Token, - ) -> PlainText: - resp = None - if execute_on_self: - try: - self.stateful_node.exclude_node() - except CannotExclude: - resp = PlainText(text=f"{self.name} is already excluded") - else: - resp = PlainText(text=f"{self.name} excluded") - - # Now we snoop into the addressed_commands and see if we can find a target that is a children, and exclude it - for child_name, addressed_command in addressed_commands.items(): - for n in self.children_nodes: - if n.name == addressed_command.target: - n.included = False - - response_children = self.OLD_propagate_to_children( - "exclude", - addressed_commands, - token, - ) - - return Response( - name=self.name, - token=token, - data=pack_to_any(resp) if resp else None, - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - children=response_children, - ) - ########################################## ############# Actor commands ############# ########################################## From f2e4941aae578fb85a2638d42082a4689727a15e Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Thu, 6 Nov 2025 14:32:01 +0000 Subject: [PATCH 03/14] gRPC child: simplify setup check. 
--- .../children_interface/grpc_child.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/src/drunc/controller/children_interface/grpc_child.py b/src/drunc/controller/children_interface/grpc_child.py index dc98f4d24..dcbc92eef 100644 --- a/src/drunc/controller/children_interface/grpc_child.py +++ b/src/drunc/controller/children_interface/grpc_child.py @@ -110,18 +110,15 @@ def _setup_connection(self): response = self.stub.describe(request) except grpc.RpcError as error: - try: - self.handle_child_grpc_error(error) - except ServerUnreachable as server_unreachable_error: - if tries_remaining == 0: - raise server_unreachable_error - self.log.info( - ( - f"Could not connect to the controller ({self.uri}). " - f"Trying {tries_remaining} more times..." - ) + if tries_remaining == 0: + raise error + self.log.info( + ( + f"Could not connect to the controller ({self.uri}). " + f"Trying {tries_remaining} more times..." ) - time.sleep(5) + ) + time.sleep(5) else: self.log.info(f"Connected to the controller ({self.uri})!") From 72d2480062dc13e2053d34b64f138489791929d2 Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Thu, 6 Nov 2025 14:55:04 +0000 Subject: [PATCH 04/14] controller: simplify response flag setting. --- src/drunc/controller/controller.py | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/src/drunc/controller/controller.py b/src/drunc/controller/controller.py index 728ba5d95..aacfae96e 100644 --- a/src/drunc/controller/controller.py +++ b/src/drunc/controller/controller.py @@ -775,6 +775,7 @@ def status( response = StatusResponse( token=None, name=self.name, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) # This node. 
@@ -797,8 +798,6 @@ def status( ) response.children.extend(child_responses) - response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY - return response @broadcasted @@ -811,6 +810,7 @@ def describe( response = DescribeResponse( token=None, name=self.name, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) # This node. @@ -842,8 +842,6 @@ def describe( ) response.children.extend(child_responses) - response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY - return response @broadcasted @@ -856,6 +854,7 @@ def describe_fsm( response = DescribeFSMResponse( token=None, name=self.name, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) # What transitions to describe. @@ -903,8 +902,6 @@ def describe_fsm( ) response.children.extend(child_responses) - response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY - return response ######################################## @@ -932,6 +929,8 @@ def execute_fsm_command( token=None, name=self.name, command_name=command_name, + fsm_flag=FSMResponseFlag.FSM_EXECUTED_SUCCESSFULLY, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) # Check controller readiness. @@ -939,6 +938,7 @@ def execute_fsm_command( self.log.error( f"Command '{command_name}' not executed: controller is not ready." ) + response.fsm_flag = FSMResponseFlag.FSM_FAILED response.flag = ResponseFlag.NOT_EXECUTED_NOT_READY return response @@ -946,14 +946,12 @@ def execute_fsm_command( if self.stateful_node.node_is_in_error(): self.log.error(f"Command '{command_name}' not executed: node is in error.") response.fsm_flag = FSMResponseFlag.FSM_NOT_EXECUTED_IN_ERROR - response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY return response # Check if node is excluded. if not self.stateful_node.node_is_included(): self.log.error(f"Command '{command_name}' not executed: node is excluded.") response.fsm_flag = FSMResponseFlag.FSM_NOT_EXECUTED_EXCLUDED - response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY return response # Check if transition is possible from current state. 
@@ -963,7 +961,6 @@ def execute_fsm_command( f"Command '{command_name}' not executed: not possible from state '{state}'." ) response.fsm_flag = FSMResponseFlag.FSM_INVALID_TRANSITION - response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY return response # This node. @@ -1043,7 +1040,6 @@ def execute_fsm_command( ) # Set FSM error flag based on child responses. - response.fsm_flag = FSMResponseFlag.FSM_EXECUTED_SUCCESSFULLY for child_response in child_responses: if child_response.flag not in [ ResponseFlag.EXECUTED_SUCCESSFULLY, @@ -1072,8 +1068,6 @@ def execute_fsm_command( ) response.children.extend(child_responses) - response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY - return response @broadcasted @@ -1089,6 +1083,7 @@ def execute_expert_command( response = ExecuteExpertCommandResponse( token=None, name=self.name, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) # This node. @@ -1110,8 +1105,6 @@ def execute_expert_command( ) response.children.extend(child_responses) - response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY - return response @broadcasted @@ -1225,6 +1218,7 @@ def recompute_status( response = RecomputeStatusResponse( token=None, name=self.name, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) # This node. @@ -1324,8 +1318,6 @@ def recompute_status( ) response.children.extend(child_responses) - response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY - return response ########################################## From 94cb3b442ec9e307c23737236ec52f4e6b961a51 Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Fri, 7 Nov 2025 13:35:06 +0000 Subject: [PATCH 05/14] Move remaining propagate_command contents into REST child. 
--- .../children_interface/client_side_child.py | 35 ------------------- .../children_interface/rest_api_child.py | 35 ++++++++++++++++++- 2 files changed, 34 insertions(+), 36 deletions(-) diff --git a/src/drunc/controller/children_interface/client_side_child.py b/src/drunc/controller/children_interface/client_side_child.py index b4a433e02..973dee0c9 100644 --- a/src/drunc/controller/children_interface/client_side_child.py +++ b/src/drunc/controller/children_interface/client_side_child.py @@ -1,14 +1,8 @@ from threading import Lock -from druncschema.controller_pb2 import AddressedCommand -from druncschema.generic_pb2 import PlainText -from druncschema.request_response_pb2 import Response, ResponseFlag -from druncschema.token_pb2 import Token - from drunc.controller.children_interface.child_node import ChildNode from drunc.fsm.configuration import FSMConfHandler from drunc.fsm.core import FSM -from drunc.utils.grpc_utils import pack_to_any from drunc.utils.utils import ControlType, get_logger @@ -85,32 +79,3 @@ def __init__( if fsm_configuration: fsmch = FSMConfHandler(fsm_configuration) self.fsm = FSM(conf=fsmch) - - def propagate_command( - self, - command: str, - request: AddressedCommand, - token: Token | None, - ) -> Response: - if command == "exclude": - self.state.exclude() - return Response( - name=self.name, - data=pack_to_any(PlainText(text=f"'{self.name}' excluded")), - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - ) - - if command == "include": - self.state.include() - return Response( - name=self.name, - data=pack_to_any(PlainText(text=f"'{self.name}' included")), - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - ) - - # If we get here, we don't run the command. 
- self.log.info(f"Ignoring command '{command}' sent to '{self.name}'") - return Response( - name=self.name, - flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, - ) diff --git a/src/drunc/controller/children_interface/rest_api_child.py b/src/drunc/controller/children_interface/rest_api_child.py index 46ce43b14..d6f5c5fc1 100644 --- a/src/drunc/controller/children_interface/rest_api_child.py +++ b/src/drunc/controller/children_interface/rest_api_child.py @@ -9,6 +9,7 @@ import requests import socks from druncschema.controller_pb2 import ( + AddressedCommand, ExecuteExpertCommandResponse, ExecuteFSMCommandResponse, FSMCommand, @@ -16,7 +17,9 @@ Status, StatusResponse, ) -from druncschema.request_response_pb2 import ResponseFlag +from druncschema.generic_pb2 import PlainText +from druncschema.request_response_pb2 import Response, ResponseFlag +from druncschema.token_pb2 import Token from flask import Flask, request from flask_restful import Api @@ -27,6 +30,7 @@ from drunc.fsm.core import FSM from drunc.utils.configuration import ConfHandler from drunc.utils.flask_manager import FlaskManager +from drunc.utils.grpc_utils import pack_to_any from drunc.utils.utils import ControlType, get_logger, get_new_port @@ -406,6 +410,35 @@ def __str__(self) -> str: def get_endpoint(self) -> str: return f"rest://{self.app_host}:{self.app_port}" + def propagate_command( + self, + command: str, + request: AddressedCommand, + token: Token | None, + ) -> Response: + if command == "exclude": + self.state.exclude() + return Response( + name=self.name, + data=pack_to_any(PlainText(text=f"'{self.name}' excluded")), + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) + + if command == "include": + self.state.include() + return Response( + name=self.name, + data=pack_to_any(PlainText(text=f"'{self.name}' included")), + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) + + # If we get here, we don't run the command. 
+ self.log.info(f"Ignoring command '{command}' sent to '{self.name}'") + return Response( + name=self.name, + flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, + ) + def status( self, target: str = "", From d83c7265915bb53c30c77c42e2031f1a0f238949 Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Fri, 7 Nov 2025 15:23:14 +0000 Subject: [PATCH 06/14] Proper abstract class pattern for child nodes. --- .../children_interface/child_node.py | 119 +++-------- .../children_interface/rest_api_child.py | 194 ++++++++++++------ 2 files changed, 153 insertions(+), 160 deletions(-) diff --git a/src/drunc/controller/children_interface/child_node.py b/src/drunc/controller/children_interface/child_node.py index 2eeaa0a6a..b935b6a71 100644 --- a/src/drunc/controller/children_interface/child_node.py +++ b/src/drunc/controller/children_interface/child_node.py @@ -1,4 +1,4 @@ -import os +from abc import ABC, abstractmethod from druncschema.controller_pb2 import ( AddressedCommand, @@ -7,15 +7,12 @@ ExecuteExpertCommandResponse, ExecuteFSMCommandResponse, FSMCommand, - Status, StatusResponse, ) -from druncschema.description_pb2 import Description -from druncschema.request_response_pb2 import Response, ResponseFlag +from druncschema.request_response_pb2 import Response from druncschema.token_pb2 import Token from drunc.connectivity_service.exceptions import ApplicationLookupUnsuccessful -from drunc.controller.utils import get_detector_name from drunc.exceptions import DruncSetupException from drunc.utils.configuration import ConfTypes from drunc.utils.utils import ( @@ -31,7 +28,7 @@ def __init__(self, t, name): super().__init__(f"The type {t} is not supported for the ChildNode {name}") -class ChildNode: +class ChildNode(ABC): def __init__( self, name: str, @@ -45,94 +42,46 @@ def __init__( self.configuration = configuration self.included = True + # TODO: terminate abstraction + def terminate(self): + pass + + @abstractmethod def __str__(self) -> str: - return f"'{self.name}' (type 
{self.node_type})" + pass + @abstractmethod def get_endpoint(self) -> str: - return "" - - def terminate(self): pass + @abstractmethod def propagate_command( self, command: str, request: AddressedCommand, token: Token | None, ) -> Response: - return Response( - name=self.name, - token=token, - flag=ResponseFlag.NOT_EXECUTED_NOT_READY, - ) + pass + @abstractmethod def status( self, target: str = "", execute_along_path: bool = True, execute_on_all_subsequent_children_in_path: bool = True, ) -> StatusResponse: - status = Status( - state="unknown", - sub_state="unknown", - in_error=False, - included=True, - ) - - response = StatusResponse( - token=None, - name=self.name, - status=status, - flag=ResponseFlag.NOT_EXECUTED_NOT_READY, - ) - - return response + pass + @abstractmethod def describe( self, target: str = "", execute_along_path: bool = True, execute_on_all_subsequent_children_in_path: bool = True, ) -> DescribeResponse: - descriptionType = None - descriptionName = None - - if self.configuration is not None: - if hasattr( - self.configuration.data, "application_name" - ): # Get the application name and type - descriptionType = self.configuration.data.application_name - descriptionName = self.configuration.data.id - elif hasattr(self.configuration.data, "controller") and hasattr( - self.configuration.data.controller, "application_name" - ): # Get the controller name and type - descriptionType = self.configuration.data.controller.application_name - descriptionName = self.configuration.data.controller.id - - description = Description( - type=descriptionType, - name=descriptionName, - endpoint=self.get_endpoint(), - info=( - get_detector_name(self.configuration) - if self.configuration is not None - else None - ), - session=os.getenv("DUNEDAQ_SESSION"), - commands=None, - broadcast=None, - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - ) - - response = DescribeResponse( - token=None, - name=self.name, - description=description, - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, 
- ) - - return response + pass + @abstractmethod def describe_fsm( self, target: str = "", @@ -140,14 +89,9 @@ def describe_fsm( execute_on_all_subsequent_children_in_path: bool = True, key: str = "", ) -> DescribeFSMResponse: - response = DescribeFSMResponse( - token=None, - name=self.name, - flag=ResponseFlag.NOT_EXECUTED_NOT_READY, - ) - - return response + pass + @abstractmethod def execute_fsm_command( self, command: FSMCommand, @@ -155,15 +99,9 @@ def execute_fsm_command( execute_along_path: bool = True, execute_on_all_subsequent_children_in_path: bool = True, ) -> ExecuteFSMCommandResponse: - response = ExecuteFSMCommandResponse( - token=None, - name=self.name, - command_name=command.command_name, - flag=ResponseFlag.NOT_EXECUTED_NOT_READY, - ) - - return response + pass + @abstractmethod def execute_expert_command( self, json_string: str, @@ -171,25 +109,16 @@ def execute_expert_command( execute_along_path: bool = True, execute_on_all_subsequent_children_in_path: bool = True, ) -> ExecuteExpertCommandResponse: - response = ExecuteExpertCommandResponse( - token=None, - name=self.name, - flag=ResponseFlag.NOT_EXECUTED_NOT_READY, - ) - - return response + pass + @abstractmethod def recompute_status( self, target: str = "", execute_along_path: bool = True, execute_on_all_subsequent_children_in_path: bool = True, ) -> StatusResponse: - return self.status( - target=target, - execute_along_path=execute_along_path, - execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, - ) + pass # TODO: needs reimplementation @staticmethod diff --git a/src/drunc/controller/children_interface/rest_api_child.py b/src/drunc/controller/children_interface/rest_api_child.py index d6f5c5fc1..44bbb9061 100644 --- a/src/drunc/controller/children_interface/rest_api_child.py +++ b/src/drunc/controller/children_interface/rest_api_child.py @@ -1,5 +1,6 @@ import json import multiprocessing +import os import queue import socket import threading @@ -10,6 +11,8 
@@ import socks from druncschema.controller_pb2 import ( AddressedCommand, + DescribeFSMResponse, + DescribeResponse, ExecuteExpertCommandResponse, ExecuteFSMCommandResponse, FSMCommand, @@ -17,6 +20,7 @@ Status, StatusResponse, ) +from druncschema.description_pb2 import Description from druncschema.generic_pb2 import PlainText from druncschema.request_response_pb2 import Response, ResponseFlag from druncschema.token_pb2 import Token @@ -25,6 +29,7 @@ from drunc.controller.children_interface.client_side_child import ClientSideChild from drunc.controller.exceptions import ChildError, ExpertCommandException +from drunc.controller.utils import get_detector_name from drunc.exceptions import DruncException, DruncSetupException from drunc.fsm.configuration import FSMConfHandler from drunc.fsm.core import FSM @@ -463,6 +468,124 @@ def status( return response + def describe( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> DescribeResponse: + response = DescribeResponse( + token=None, + name=self.name, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) + + description = Description( + endpoint=self.get_endpoint(), + session=os.getenv("DUNEDAQ_SESSION"), + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) + + if self.configuration is not None: + description.info = get_detector_name(self.configuration) + if hasattr( + self.configuration.data, "application_name" + ): # Application nodes. + description.type = self.configuration.data.application_name + description.name = self.configuration.data.id + elif hasattr(self.configuration.data, "controller") and hasattr( + self.configuration.data.controller, "application_name" + ): # Controller nodes. 
+ description.type = self.configuration.data.controller.application_name + description.name = self.configuration.data.controller.id + + response.description.CopyFrom(description) + + return response + + def describe_fsm( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + key: str = "", + ) -> DescribeFSMResponse: + return DescribeFSMResponse( + token=None, + name=self.name, + flag=ResponseFlag.NOT_EXECUTED_NOT_IMPLEMENTED, + ) + + def execute_fsm_command( + self, + command: FSMCommand, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> ExecuteFSMCommandResponse: + command_name = command.command_name + response = ExecuteFSMCommandResponse( + token=None, + name=self.name, + command_name=command_name, + fsm_flag=FSMResponseFlag.FSM_EXECUTED_SUCCESSFULLY, + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) + + # Don't execute command if we are excluded. + if self.state.excluded(): + response.fsm_flag = FSMResponseFlag.FSM_NOT_EXECUTED_EXCLUDED + response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY + return response + + try: + module_data = json.loads(command.data if command.data else "{}") + except JSONDecodeError as e: + self.log.error(f"Error parsing JSON command data: {e}") + response.fsm_flag = FSMResponseFlag.FSM_FAILED + response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT + return response + + cmd_data = {"modules": [{"data": module_data, "match": ""}]} + entry_state = self.state.get_operational_state() + transition = self.fsm.get_transition(command_name) + exit_state = self.fsm.get_destination_state(entry_state, transition) + self.state.executing_command_mark() + self.log.info(f"Sending '{command_name}' to '{self.name}'") + + try: + self.commander.send_app_command( + cmd_id=command_name, + module_data=cmd_data, + entry_state=entry_state.upper(), + exit_state=exit_state.upper(), + ) + self.log.debug(f"Sent 
'{command_name}' to '{self.name}'") + + r = self.commander.check_response(150) + self.log.debug(f"Got response from '{command_name}' to '{self.name}'") + + response.data = json.dumps(r) + + if not r["success"]: + # The RPC was successful, but the FSM command was not. + self.log.error(r["result"]) + self.state.to_error() + response.fsm_flag = FSMResponseFlag.FSM_FAILED + return response + + except Exception as e: + self.log.error(f"Got error from '{command_name}' to '{self.name}': {e!s}") + self.state.to_error() + response.fsm_flag = FSMResponseFlag.FSM_FAILED + response.flag = ResponseFlag.UNHANDLED_EXCEPTION_THROWN + return response + + self.state.end_command_execution_mark() + self.state.new_operational_state(exit_state) + + return response + def execute_expert_command( self, json_string: str, @@ -553,73 +676,14 @@ def execute_expert_command( return response - def execute_fsm_command( + def recompute_status( self, - command: FSMCommand, target: str = "", execute_along_path: bool = True, execute_on_all_subsequent_children_in_path: bool = True, - ) -> ExecuteFSMCommandResponse: - command_name = command.command_name - - response = ExecuteFSMCommandResponse( - token=None, - name=self.name, - command_name=command_name, - fsm_flag=FSMResponseFlag.FSM_EXECUTED_SUCCESSFULLY, - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) -> StatusResponse: + return self.status( + target=target, + execute_along_path=execute_along_path, + execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, ) - - # Don't execute command if we are excluded. 
- if self.state.excluded(): - response.fsm_flag = FSMResponseFlag.FSM_NOT_EXECUTED_EXCLUDED - response.flag = ResponseFlag.EXECUTED_SUCCESSFULLY - return response - - try: - module_data = json.loads(command.data if command.data else "{}") - except JSONDecodeError as e: - self.log.error(f"Error parsing JSON command data: {e}") - response.fsm_flag = FSMResponseFlag.FSM_FAILED - response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT - return response - - cmd_data = {"modules": [{"data": module_data, "match": ""}]} - entry_state = self.state.get_operational_state() - transition = self.fsm.get_transition(command_name) - exit_state = self.fsm.get_destination_state(entry_state, transition) - self.state.executing_command_mark() - self.log.info(f"Sending '{command_name}' to '{self.name}'") - - try: - self.commander.send_app_command( - cmd_id=command_name, - module_data=cmd_data, - entry_state=entry_state.upper(), - exit_state=exit_state.upper(), - ) - self.log.debug(f"Sent '{command_name}' to '{self.name}'") - - r = self.commander.check_response(150) - self.log.debug(f"Got response from '{command_name}' to '{self.name}'") - - response.data = json.dumps(r) - - if not r["success"]: - # The RPC was successful, but the FSM command was not. - self.log.error(r["result"]) - self.state.to_error() - response.fsm_flag = FSMResponseFlag.FSM_FAILED - return response - - except Exception as e: - self.log.error(f"Got error from '{command_name}' to '{self.name}': {e!s}") - self.state.to_error() - response.fsm_flag = FSMResponseFlag.FSM_FAILED - response.flag = ResponseFlag.UNHANDLED_EXCEPTION_THROWN - return response - - self.state.end_command_execution_mark() - self.state.new_operational_state(exit_state) - - return response From 14d0b23f29d6f5e7e8372c5e6e19eb8c3ee8bb7d Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Fri, 7 Nov 2025 16:02:59 +0000 Subject: [PATCH 07/14] include/exclude for all child node types. 
--- .../children_interface/child_node.py | 28 ++++++- .../children_interface/grpc_child.py | 78 ++++++++++++++++--- .../children_interface/rest_api_child.py | 53 ++++++++----- src/drunc/controller/controller.py | 22 +----- 4 files changed, 131 insertions(+), 50 deletions(-) diff --git a/src/drunc/controller/children_interface/child_node.py b/src/drunc/controller/children_interface/child_node.py index b935b6a71..728b31d5e 100644 --- a/src/drunc/controller/children_interface/child_node.py +++ b/src/drunc/controller/children_interface/child_node.py @@ -7,6 +7,7 @@ ExecuteExpertCommandResponse, ExecuteFSMCommandResponse, FSMCommand, + IncludeExcludeResponse, StatusResponse, ) from druncschema.request_response_pb2 import Response @@ -29,6 +30,7 @@ def __init__(self, t, name): class ChildNode(ABC): + # TODO: __init__ abstraction def __init__( self, name: str, @@ -42,10 +44,6 @@ def __init__( self.configuration = configuration self.included = True - # TODO: terminate abstraction - def terminate(self): - pass - @abstractmethod def __str__(self) -> str: pass @@ -54,6 +52,10 @@ def __str__(self) -> str: def get_endpoint(self) -> str: pass + @abstractmethod + def terminate(self) -> None: + pass + @abstractmethod def propagate_command( self, @@ -111,6 +113,24 @@ def execute_expert_command( ) -> ExecuteExpertCommandResponse: pass + @abstractmethod + def include( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> IncludeExcludeResponse: + pass + + @abstractmethod + def exclude( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> IncludeExcludeResponse: + pass + @abstractmethod def recompute_status( self, diff --git a/src/drunc/controller/children_interface/grpc_child.py b/src/drunc/controller/children_interface/grpc_child.py index dcbc92eef..165f52ef5 100644 --- a/src/drunc/controller/children_interface/grpc_child.py +++ 
b/src/drunc/controller/children_interface/grpc_child.py @@ -12,6 +12,8 @@ ExecuteFSMCommandRequest, ExecuteFSMCommandResponse, FSMCommand, + IncludeExcludeRequest, + IncludeExcludeResponse, StatusResponse, ) from druncschema.controller_pb2_grpc import ControllerStub @@ -170,15 +172,7 @@ def __str__(self) -> str: def get_endpoint(self) -> str: return self.uri - def start_listening(self, bdesc): - self.broadcast = BroadcastHandler( - BroadcastClientConfHandler( - data=bdesc, - type=ConfTypes.ProtobufAny, - ) - ) - - def terminate(self): + def terminate(self) -> None: if self.channel: self.channel.close() del self.channel @@ -188,6 +182,14 @@ def terminate(self): self.channel = None self.broadcast.stop() + def start_listening(self, bdesc): + self.broadcast = BroadcastHandler( + BroadcastClientConfHandler( + data=bdesc, + type=ConfTypes.ProtobufAny, + ) + ) + def propagate_command( self, command: str, @@ -359,6 +361,64 @@ def execute_expert_command( return response + def include( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> IncludeExcludeResponse: + request = IncludeExcludeRequest( + token=None, + target=target, + execute_along_path=execute_along_path, + execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, + ) + self.included = True + + try: + response = self.stub.include(request) + except grpc.RpcError as e: + try: + self.handle_child_grpc_error(e) + except ServerUnreachable: + self.log.warning( + f"Connection to {self.name} at {self.uri} failed during include, attempting to reconnect..." 
+ ) + response = self._attempt_reconnection( + lambda: self.stub.include(request) + ) + + return response + + def exclude( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> IncludeExcludeResponse: + request = IncludeExcludeRequest( + token=None, + target=target, + execute_along_path=execute_along_path, + execute_on_all_subsequent_children_in_path=execute_on_all_subsequent_children_in_path, + ) + self.included = False + + try: + response = self.stub.exclude(request) + except grpc.RpcError as e: + try: + self.handle_child_grpc_error(e) + except ServerUnreachable: + self.log.warning( + f"Connection to {self.name} at {self.uri} failed during exclude, attempting to reconnect..." + ) + response = self._attempt_reconnection( + lambda: self.stub.exclude(request) + ) + + return response + def recompute_status( self, target: str = "", diff --git a/src/drunc/controller/children_interface/rest_api_child.py b/src/drunc/controller/children_interface/rest_api_child.py index 44bbb9061..52ed06506 100644 --- a/src/drunc/controller/children_interface/rest_api_child.py +++ b/src/drunc/controller/children_interface/rest_api_child.py @@ -17,11 +17,11 @@ ExecuteFSMCommandResponse, FSMCommand, FSMResponseFlag, + IncludeExcludeResponse, Status, StatusResponse, ) from druncschema.description_pb2 import Description -from druncschema.generic_pb2 import PlainText from druncschema.request_response_pb2 import Response, ResponseFlag from druncschema.token_pb2 import Token from flask import Flask, request @@ -35,7 +35,6 @@ from drunc.fsm.core import FSM from drunc.utils.configuration import ConfHandler from drunc.utils.flask_manager import FlaskManager -from drunc.utils.grpc_utils import pack_to_any from drunc.utils.utils import ControlType, get_logger, get_new_port @@ -415,29 +414,15 @@ def __str__(self) -> str: def get_endpoint(self) -> str: return f"rest://{self.app_host}:{self.app_port}" + def terminate(self) -> 
None: + pass + def propagate_command( self, command: str, request: AddressedCommand, token: Token | None, ) -> Response: - if command == "exclude": - self.state.exclude() - return Response( - name=self.name, - data=pack_to_any(PlainText(text=f"'{self.name}' excluded")), - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - ) - - if command == "include": - self.state.include() - return Response( - name=self.name, - data=pack_to_any(PlainText(text=f"'{self.name}' included")), - flag=ResponseFlag.EXECUTED_SUCCESSFULLY, - ) - - # If we get here, we don't run the command. self.log.info(f"Ignoring command '{command}' sent to '{self.name}'") return Response( name=self.name, @@ -676,6 +661,36 @@ def execute_expert_command( return response + def include( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> IncludeExcludeResponse: + self.state.include() + self.included = True + return IncludeExcludeResponse( + token=None, + name=self.name, + text=f"'{self.name}' included", + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) + + def exclude( + self, + target: str = "", + execute_along_path: bool = True, + execute_on_all_subsequent_children_in_path: bool = True, + ) -> IncludeExcludeResponse: + self.state.exclude() + self.included = False + return IncludeExcludeResponse( + token=None, + name=self.name, + text=f"'{self.name}' excluded", + flag=ResponseFlag.EXECUTED_SUCCESSFULLY, + ) + def recompute_status( self, target: str = "", diff --git a/src/drunc/controller/controller.py b/src/drunc/controller/controller.py index aacfae96e..2367762c5 100644 --- a/src/drunc/controller/controller.py +++ b/src/drunc/controller/controller.py @@ -1128,9 +1128,9 @@ def include( try: self.stateful_node.include_node() except CannotInclude: - response.text = f"{self.name} is already included" + response.text = f"'{self.name}' is already included" else: - response.text = f"{self.name} included" + response.text = f"'{self.name}' included" 
# Children nodes (ignore exclusion). child_list = self.address_target_path( @@ -1148,13 +1148,6 @@ def include( ) response.children.extend(child_responses) - # # TODO: DO BELOW IN THE CHILDREN NODES INSTEAD OF HERE - # # Now we snoop into the addressed_commands and see if we can find a target that is a children, and include it - # for child_name, addressed_command in addressed_commands.items(): - # for n in self.children_nodes: - # if n.name == addressed_command.target: - # n.included = True - return response @broadcasted @@ -1178,9 +1171,9 @@ def exclude( try: self.stateful_node.exclude_node() except CannotExclude: - response.text = f"{self.name} is already excluded" + response.text = f"'{self.name}' is already excluded" else: - response.text = f"{self.name} excluded" + response.text = f"'{self.name}' excluded" # Children nodes (ignore exclusion). child_list = self.address_target_path( @@ -1198,13 +1191,6 @@ def exclude( ) response.children.extend(child_responses) - # # TODO: DO BELOW IN THE CHILDREN NODES INSTEAD OF HERE - # # Now we snoop into the addressed_commands and see if we can find a target that is a children, and exclude it - # for child_name, addressed_command in addressed_commands.items(): - # for n in self.children_nodes: - # if n.name == addressed_command.target: - # n.included = False - return response @broadcasted From 2f28819bfa4bad9592523d67907551987a28a561 Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Mon, 10 Nov 2025 13:57:26 +0000 Subject: [PATCH 08/14] RecomputeStatusResponse -> StatusResponse. 
--- src/drunc/controller/controller.py | 5 ++--- src/drunc/controller/controller_driver.py | 3 +-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/drunc/controller/controller.py b/src/drunc/controller/controller.py index 2367762c5..a5d01df54 100644 --- a/src/drunc/controller/controller.py +++ b/src/drunc/controller/controller.py @@ -21,7 +21,6 @@ FSMResponseFlag, IncludeExcludeRequest, IncludeExcludeResponse, - RecomputeStatusResponse, StatusResponse, ) from druncschema.controller_pb2_grpc import ControllerServicer @@ -1199,9 +1198,9 @@ def exclude( @publish_command_time def recompute_status( self, request: AddressedCommand, context: ServicerContext - ) -> RecomputeStatusResponse: + ) -> StatusResponse: request.target = self.parse_target_string(request.target) - response = RecomputeStatusResponse( + response = StatusResponse( token=None, name=self.name, flag=ResponseFlag.EXECUTED_SUCCESSFULLY, diff --git a/src/drunc/controller/controller_driver.py b/src/drunc/controller/controller_driver.py index 568c42fea..240f1495b 100644 --- a/src/drunc/controller/controller_driver.py +++ b/src/drunc/controller/controller_driver.py @@ -12,7 +12,6 @@ FSMCommand, IncludeExcludeRequest, IncludeExcludeResponse, - RecomputeStatusResponse, StatusResponse, ) from druncschema.controller_pb2_grpc import ControllerStub @@ -228,7 +227,7 @@ def recompute_status( execute_along_path: bool = True, execute_on_all_subsequent_children_in_path: bool = True, timeout: int | float = 60, - ) -> RecomputeStatusResponse: + ) -> StatusResponse: request = AddressedCommand( command_name="recompute_status", target=target, From c5050b4d6b4af7a240b5d4f80380a445e63cc12c Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Mon, 10 Nov 2025 16:10:24 +0000 Subject: [PATCH 09/14] Type hints. 
--- src/drunc/controller/configuration.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/drunc/controller/configuration.py b/src/drunc/controller/configuration.py index 5f0fa9fcd..eb35819a5 100644 --- a/src/drunc/controller/configuration.py +++ b/src/drunc/controller/configuration.py @@ -1,6 +1,7 @@ import socket import threading +from druncschema.token_pb2 import Token from kafkaopmon.OpMonPublisher import OpMonPublisher as KafkaOpMonPublisher from opmonlib.publisher import OpMonPublisher from opmonlib.utils import parse_opmon_conf @@ -133,19 +134,19 @@ def get_dummy_children(self): def update_children( self, - children, - init_token, - without_excluded=False, + children: list[ChildNode], + init_token: Token, + without_excluded: bool = False, connectivity_service=None, session_name=None, - ): + ) -> list[ChildNode]: enabled_only = not without_excluded timeout = 60 # 60s for each application to start and show up on the connectivity service self.log.debug(f"get_children: connectivity service lookup timeout={timeout}") session = None - self.children = [] + self.children: list[ChildNode] = [] try: session = self.db.get_dal(class_name="Session", uid=self.oks_key.session) @@ -189,6 +190,7 @@ def process_segment(segment): got_child = True break if not got_child: + # TODO: is this *EVER* hit? self.children.append(new_node) def process_application(app): @@ -221,6 +223,7 @@ def process_application(app): got_child = True break if not got_child: + # TODO: is this *EVER* hit? self.children.append(new_node) # threading the children look up From 29a9ef34965bd230f73b697481d653dacf44f7a6 Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Mon, 10 Nov 2025 22:47:54 +0000 Subject: [PATCH 10/14] Don't use dummy child controller nodes. 
--- .../children_interface/child_node.py | 104 ---------- .../children_interface/grpc_child.py | 4 +- .../children_interface/rest_api_child.py | 5 +- src/drunc/controller/configuration.py | 181 +++++++++--------- src/drunc/controller/controller.py | 9 +- src/drunc/utils/utils.py | 24 ++- 6 files changed, 114 insertions(+), 213 deletions(-) diff --git a/src/drunc/controller/children_interface/child_node.py b/src/drunc/controller/children_interface/child_node.py index 728b31d5e..1bda903fe 100644 --- a/src/drunc/controller/children_interface/child_node.py +++ b/src/drunc/controller/children_interface/child_node.py @@ -13,13 +13,9 @@ from druncschema.request_response_pb2 import Response from druncschema.token_pb2 import Token -from drunc.connectivity_service.exceptions import ApplicationLookupUnsuccessful from drunc.exceptions import DruncSetupException -from drunc.utils.configuration import ConfTypes from drunc.utils.utils import ( ControlType, - get_control_type_and_uri_from_cli, - get_control_type_and_uri_from_connectivity_service, get_logger, ) @@ -139,103 +135,3 @@ def recompute_status( execute_on_all_subsequent_children_in_path: bool = True, ) -> StatusResponse: pass - - # TODO: needs reimplementation - @staticmethod - def get_child( - name: str, - cli, - configuration, - init_token=None, - connectivity_service=None, - timeout=60, - **kwargs, - ): - log = get_logger("controller.child_node") - ctype = ControlType.Unknown - uri = None - node_in_error = False - - if connectivity_service: - try: - ctype, uri = get_control_type_and_uri_from_connectivity_service( - connectivity_service, name, timeout=timeout - ) - except ApplicationLookupUnsuccessful as alu: - log.error( - f"Could not find the application '{name}' in the connectivity service: {alu}" - ) - - if ctype == ControlType.Unknown: - try: - ctype, uri = get_control_type_and_uri_from_cli(cli) - except DruncSetupException as e: - log.error( - f"Could not understand how to talk to the application '{name}' from its 
CLI: {e}" - ) - - address = None - port = 0 - if uri is not None: - try: - address, port = uri.split(":") - port = int(port) - except ValueError as e: - log.debug(f"Could not split the URI {uri} into address and port: {e}") - - if ctype == ControlType.Unknown or address is None or port == 0: - log.error(f"Could not understand how to talk to '{name}'") - node_in_error = True - ctype = ControlType.Direct - - log.info(f"Child {name} is of type {ctype} and has the URI {uri}") - - match ctype: - case ControlType.gRPC: - from drunc.controller.children_interface.grpc_child import ( - gRCPChildConfHandler, - gRPCChildNode, - ) - - return gRPCChildNode( - configuration=gRCPChildConfHandler( - configuration, ConfTypes.PyObject - ), - init_token=init_token, - name=name, - uri=uri, - connectivity_service=connectivity_service, - **kwargs, - ) - - case ControlType.REST_API: - from drunc.controller.children_interface.rest_api_child import ( - RESTAPIChildNode, - RESTAPIChildNodeConfHandler, - ) - - return RESTAPIChildNode( - configuration=RESTAPIChildNodeConfHandler( - configuration, ConfTypes.PyObject - ), - name=name, - uri=uri, - # init_token = init_token, # No authentication for RESTAPI - **kwargs, - ) - - case ControlType.Direct: - from drunc.controller.children_interface.client_side_child import ( - ClientSideChild, - ) - - node = ClientSideChild( - name=name, - **kwargs, - ) - if node_in_error: - node.state.to_error() - return node - - case _: - raise ChildInterfaceTechnologyUnknown(ctype, name) diff --git a/src/drunc/controller/children_interface/grpc_child.py b/src/drunc/controller/children_interface/grpc_child.py index 165f52ef5..d87df6582 100644 --- a/src/drunc/controller/children_interface/grpc_child.py +++ b/src/drunc/controller/children_interface/grpc_child.py @@ -57,10 +57,10 @@ class gRPCChildNode(ChildNode): def __init__( self, name, + uri, configuration: gRCPChildConfHandler, + connectivity_service, init_token, - uri, - connectivity_service=None, ): 
super().__init__( name=name, diff --git a/src/drunc/controller/children_interface/rest_api_child.py b/src/drunc/controller/children_interface/rest_api_child.py index 52ed06506..acd72ed2b 100644 --- a/src/drunc/controller/children_interface/rest_api_child.py +++ b/src/drunc/controller/children_interface/rest_api_child.py @@ -364,9 +364,9 @@ class RESTAPIChildNode(ClientSideChild): def __init__( self, name, + uri, configuration: RESTAPIChildNodeConfHandler, fsm_configuration: FSMConfHandler, - uri, ): super().__init__( name=name, @@ -472,7 +472,8 @@ def describe( ) if self.configuration is not None: - description.info = get_detector_name(self.configuration) + if detector_name := get_detector_name(self.configuration): + description.info = detector_name if hasattr( self.configuration.data, "application_name" ): # Application nodes. diff --git a/src/drunc/controller/configuration.py b/src/drunc/controller/configuration.py index eb35819a5..88503223e 100644 --- a/src/drunc/controller/configuration.py +++ b/src/drunc/controller/configuration.py @@ -1,21 +1,31 @@ import socket import threading +import confmodel_dal from druncschema.token_pb2 import Token from kafkaopmon.OpMonPublisher import OpMonPublisher as KafkaOpMonPublisher from opmonlib.publisher import OpMonPublisher from opmonlib.utils import parse_opmon_conf +from drunc.connectivity_service.exceptions import ApplicationLookupUnsuccessful from drunc.controller.children_interface.child_node import ChildNode +from drunc.controller.children_interface.grpc_child import ( + gRCPChildConfHandler, + gRPCChildNode, +) from drunc.controller.children_interface.rest_api_child import ( + RESTAPIChildNode, RESTAPIChildNodeConfHandler, ) from drunc.exceptions import DruncCommandException, DruncSetupException from drunc.process_manager.configuration import get_commandline_parameters from drunc.utils.configuration import ConfHandler, ConfTypes -from drunc.utils.utils import ControlType - -import confmodel_dal # isort: skip +from 
drunc.utils.utils import ( + ControlType, + get_control_type_and_uri_from_cli, + get_control_type_and_uri_from_connectivity_service, + get_logger, +) class ControllerConfData: # the bastardised OKS @@ -57,7 +67,6 @@ def _grab_segment_conf_from_controller(self, configuration): def _post_process_oks(self, *args, **kwargs): self.authoriser = None - self.children = [] self.data = self._grab_segment_conf_from_controller(self.data) self.this_host = self.data.controller.runs_on.runs_on.id @@ -106,97 +115,57 @@ def _post_process_oks(self, *args, **kwargs): raise DruncCommandException("Failed to initialize OpMonPublisher.") return - def get_dummy_children(self): - ret = [] - session = self.db.get_dal(class_name="Session", uid=self.oks_key.session) - - for seg in self.data.segments: - if confmodel_dal.component_disabled(self.db._obj, session.id, seg.id): - continue - ret.append( - ChildNode( - name=seg.controller.id, - configuration=RESTAPIChildNodeConfHandler(seg, ConfTypes.PyObject), - node_type=ControlType.Unknown, - ) - ) - for app in self.data.applications: - if confmodel_dal.component_disabled(self.db._obj, session.id, app.id): - continue - ret.append( - ChildNode( - name=app.id, - configuration=RESTAPIChildNodeConfHandler(app, ConfTypes.PyObject), - node_type=ControlType.Unknown, - ) - ) - return ret - - def update_children( + def init_children( self, - children: list[ChildNode], + session_name, init_token: Token, - without_excluded: bool = False, connectivity_service=None, - session_name=None, + enabled_only: bool = True, ) -> list[ChildNode]: - enabled_only = not without_excluded - timeout = 60 # 60s for each application to start and show up on the connectivity service - - self.log.debug(f"get_children: connectivity service lookup timeout={timeout}") + child_nodes: list[ChildNode] = [] - session = None - self.children: list[ChildNode] = [] + # 60s for applications to show on the connectivity service. 
+ timeout = 60 + self.log.debug(f"init_children: connectivity service timeout: {timeout}") try: session = self.db.get_dal(class_name="Session", uid=self.oks_key.session) - except ImportError: + session = None if enabled_only: self.log.error( "OKS was not set up, so configuration does not know about include/exclude. All the children nodes will be returned" ) - enabled_only = True - - self.log.debug(f"looping over children\n{self.data.segments}") + enabled_only = False def process_segment(segment): - if enabled_only: - if confmodel_dal.component_disabled( - self.db._obj, session.id, segment.id - ): - return - - new_node = ChildNode.get_child( - cli=get_commandline_parameters( - db=self.db, - config_filename=self.initial_data, - session_id=session.id, - session_name=session_name, - obj=segment.controller, - ), + if enabled_only and confmodel_dal.component_disabled( + self.db._obj, session.id, segment.id + ): + return # Ignore disabled segments. + + commandline_parameters = get_commandline_parameters( + db=self.db, + config_filename=self.initial_data, + session_id=session.id, + session_name=session_name, + obj=segment.controller, + ) + node = self.child_node_factory( + cli=commandline_parameters, init_token=init_token, name=segment.controller.id, configuration=segment, connectivity_service=connectivity_service, timeout=timeout, ) - if new_node: - got_child = False - - for idx, child in enumerate(children): - if child.name == new_node.name: - children[idx] = new_node - got_child = True - break - if not got_child: - # TODO: is this *EVER* hit? - self.children.append(new_node) + child_nodes.append(node) def process_application(app): - if enabled_only: - if confmodel_dal.component_disabled(self.db._obj, session.id, app.id): - return + if enabled_only and confmodel_dal.component_disabled( + self.db._obj, session.id, app.id + ): + return # Ignore disabled applications. 
commandline_parameters = get_commandline_parameters( db=self.db, @@ -205,26 +174,15 @@ def process_application(app): session_name=session_name, obj=app, ) - - new_node = ChildNode.get_child( + node = self.child_node_factory( cli=commandline_parameters, name=app.id, configuration=app, fsm_configuration=self.data.controller.fsm, connectivity_service=connectivity_service, - timeout=60, + timeout=timeout, ) - if new_node: - got_child = False - - for idx, child in enumerate(children): - if child.name == new_node.name: - children[idx] = new_node - got_child = True - break - if not got_child: - # TODO: is this *EVER* hit? - self.children.append(new_node) + child_nodes.append(node) # threading the children look up threads = [] @@ -244,4 +202,53 @@ def process_application(app): for t in threads: t.join() - return self.children + return child_nodes + + @staticmethod + def child_node_factory( + name: str, + cli, + configuration, + init_token=None, + connectivity_service=None, + timeout=60, + **kwargs, + ): + log = get_logger("controller.child_node") + + if connectivity_service: + try: + # Query the connectivity service. + ctype, uri = get_control_type_and_uri_from_connectivity_service( + connectivity_service, name, timeout=timeout + ) + except ApplicationLookupUnsuccessful as e: + log.error(f"Could not find '{name}' in the connectivity service: {e}") + raise e + else: + try: + # Fall back to the CLI arguments. 
+ ctype, uri = get_control_type_and_uri_from_cli(cli) + except DruncSetupException as e: + log.error(f"Could not get '{name}' protocol from CLI: {e}") + raise e + + log.info(f"Child '{name}' is of type '{ctype}' and has the URI '{uri}'") + + match ctype: + case ControlType.gRPC: + conf_handler = gRCPChildConfHandler(configuration, ConfTypes.PyObject) + return gRPCChildNode( + name, uri, conf_handler, connectivity_service, init_token, **kwargs + ) + + case ControlType.REST_API: + conf_handler = RESTAPIChildNodeConfHandler( + configuration, ConfTypes.PyObject + ) + return RESTAPIChildNode(name, uri, conf_handler, **kwargs) + + case _: + error_message = f"Unknown protocol '{ctype}' for child '{name}'" + log.error(error_message) + raise DruncSetupException(error_message) diff --git a/src/drunc/controller/controller.py b/src/drunc/controller/controller.py index a5d01df54..00141f6bc 100644 --- a/src/drunc/controller/controller.py +++ b/src/drunc/controller/controller.py @@ -285,17 +285,16 @@ def __init__(self, configuration, name: str, session: str, token: Token): address=f"{connection_server}:{connection_port}", ) - self.children_nodes = self.configuration.get_dummy_children() - def init_controller(self) -> None: log_init_controller = get_logger("controller.init_controller") log_init_controller.info("Finishing initialisation of controller") - self.configuration.update_children( - self.children_nodes, + + self.children_nodes = self.configuration.init_children( + session_name=self.session, init_token=self.actor.get_token(), connectivity_service=self.connectivity_service, - session_name=self.session, ) + # At this point, we already waited for 60s for the children applications to # start and show up on the connectivity service # We now wait for each application to get from "initialising" to "ready" diff --git a/src/drunc/utils/utils.py b/src/drunc/utils/utils.py index 2c7712cc8..615e19118 100644 --- a/src/drunc/utils/utils.py +++ b/src/drunc/utils/utils.py @@ -402,19 
+402,17 @@ class ControlType(Enum): Direct = 3 -def get_control_type_and_uri_from_cli(CLAs: list[str]) -> ControlType: - for CLA in CLAs: - if CLA.startswith("rest://"): - return ControlType.REST_API, resolve_localhost_and_127_ip_to_network_ip( - CLA.replace("rest://", "") - ) - elif CLA.startswith("grpc://"): - return ControlType.gRPC, resolve_localhost_and_127_ip_to_network_ip( - CLA.replace("grpc://", "") - ) - raise DruncSetupException( - "Could not find if the child was controlled by gRPC or a REST API" - ) +def get_control_type_and_uri_from_cli(cli_args: list[str]) -> tuple[ControlType, str]: + for arg in cli_args: + if arg.startswith("rest://"): + uri = arg.replace("rest://", "") + uri = resolve_localhost_and_127_ip_to_network_ip(uri) + return ControlType.REST_API, uri + elif arg.startswith("grpc://"): + uri = arg.replace("grpc://", "") + uri = resolve_localhost_and_127_ip_to_network_ip(uri) + return ControlType.gRPC, uri + raise DruncSetupException("Protocol must be 'grpc://' or 'rest://'") def get_control_type_and_uri_from_connectivity_service( From 7486ab3204ceccb26bfdcf1d3a49c4ba9bbb1e3d Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Tue, 11 Nov 2025 15:14:13 +0000 Subject: [PATCH 11/14] Abstract child node __init__ and final tidy. 
--- .../children_interface/child_node.py | 12 +----- .../children_interface/client_side_child.py | 13 +++--- .../children_interface/grpc_child.py | 18 +++----- .../children_interface/rest_api_child.py | 14 ++---- src/drunc/controller/configuration.py | 43 ++++++++++--------- 5 files changed, 40 insertions(+), 60 deletions(-) diff --git a/src/drunc/controller/children_interface/child_node.py b/src/drunc/controller/children_interface/child_node.py index 1bda903fe..51431adc9 100644 --- a/src/drunc/controller/children_interface/child_node.py +++ b/src/drunc/controller/children_interface/child_node.py @@ -26,18 +26,10 @@ def __init__(self, t, name): class ChildNode(ABC): - # TODO: __init__ abstraction - def __init__( - self, - name: str, - configuration, - node_type: ControlType, - **kwargs, - ) -> None: - self.node_type = node_type + def __init__(self, name: str, node_type: ControlType): self.log = get_logger(f"controller.{name}-child-node") self.name = name - self.configuration = configuration + self.node_type = node_type self.included = True @abstractmethod diff --git a/src/drunc/controller/children_interface/client_side_child.py b/src/drunc/controller/children_interface/client_side_child.py index 973dee0c9..8311e40e9 100644 --- a/src/drunc/controller/children_interface/client_side_child.py +++ b/src/drunc/controller/children_interface/client_side_child.py @@ -3,7 +3,7 @@ from drunc.controller.children_interface.child_node import ChildNode from drunc.fsm.configuration import FSMConfHandler from drunc.fsm.core import FSM -from drunc.utils.utils import ControlType, get_logger +from drunc.utils.utils import ControlType class ClientSideState: @@ -67,13 +67,12 @@ def in_error(self): class ClientSideChild(ChildNode): def __init__( self, - name, - node_type: ControlType = ControlType.Direct, - fsm_configuration: FSMConfHandler = None, - configuration=None, + name: str, + node_type: ControlType, + fsm_configuration: FSMConfHandler, ): - super().__init__(name=name, 
node_type=node_type, configuration=configuration) - self.log = get_logger(f"controller.{name}-client-side") + super().__init__(name, node_type) + self.state = ClientSideState() self.fsm_configuration = fsm_configuration if fsm_configuration: diff --git a/src/drunc/controller/children_interface/grpc_child.py b/src/drunc/controller/children_interface/grpc_child.py index 410fa1537..cd53d8332 100644 --- a/src/drunc/controller/children_interface/grpc_child.py +++ b/src/drunc/controller/children_interface/grpc_child.py @@ -32,14 +32,12 @@ from drunc.utils.configuration import ConfHandler, ConfTypes from drunc.utils.grpc_utils import ( ServerUnreachable, - copy_token, rethrow_if_unreachable_server, unpack_any, ) from drunc.utils.utils import ( ControlType, get_control_type_and_uri_from_connectivity_service, - get_logger, ) @@ -56,19 +54,15 @@ def get_uri(self): class gRPCChildNode(ChildNode): def __init__( self, - name, - uri, + name: str, configuration: gRCPChildConfHandler, + uri: str, connectivity_service, - init_token, + init_token: Token | None = None, ): - super().__init__( - name=name, - node_type=ControlType.gRPC, - configuration=configuration, - ) + super().__init__(name, ControlType.gRPC) - self.log = get_logger(f"controller.{self.name}-grpc-child") + self.configuration = configuration self.connectivity_service = connectivity_service self._lock = threading.Lock() self.init_token = init_token @@ -96,7 +90,7 @@ def _setup_connection(self): self.stub = ControllerStub(self.channel) request = AddressedCommand( - token=copy_token(self.init_token), + token=None, command_name="describe", command_data=None, target="", diff --git a/src/drunc/controller/children_interface/rest_api_child.py b/src/drunc/controller/children_interface/rest_api_child.py index acd72ed2b..fdf749219 100644 --- a/src/drunc/controller/children_interface/rest_api_child.py +++ b/src/drunc/controller/children_interface/rest_api_child.py @@ -363,20 +363,14 @@ def get_host_port(self): class 
RESTAPIChildNode(ClientSideChild): def __init__( self, - name, - uri, + name: str, configuration: RESTAPIChildNodeConfHandler, + uri: str, fsm_configuration: FSMConfHandler, ): - super().__init__( - name=name, - node_type=ControlType.REST_API, - configuration=configuration, - fsm_configuration=fsm_configuration, - ) - - self.log = get_logger(f"controller.{name}_rest_api_child") + super().__init__(name, ControlType.REST_API, fsm_configuration) + self.configuration = configuration self.response_listener = ResponseListener.get() if fsm_configuration: diff --git a/src/drunc/controller/configuration.py b/src/drunc/controller/configuration.py index 88503223e..3285ea0d3 100644 --- a/src/drunc/controller/configuration.py +++ b/src/drunc/controller/configuration.py @@ -7,6 +7,7 @@ from opmonlib.publisher import OpMonPublisher from opmonlib.utils import parse_opmon_conf +from drunc.connectivity_service.client import ConnectivityServiceClient from drunc.connectivity_service.exceptions import ApplicationLookupUnsuccessful from drunc.controller.children_interface.child_node import ChildNode from drunc.controller.children_interface.grpc_child import ( @@ -117,9 +118,9 @@ def _post_process_oks(self, *args, **kwargs): def init_children( self, - session_name, + session_name: str, init_token: Token, - connectivity_service=None, + connectivity_service: ConnectivityServiceClient | None = None, enabled_only: bool = True, ) -> list[ChildNode]: child_nodes: list[ChildNode] = [] @@ -144,7 +145,7 @@ def process_segment(segment): ): return # Ignore disabled segments. 
- commandline_parameters = get_commandline_parameters( + cmd_args = get_commandline_parameters( db=self.db, config_filename=self.initial_data, session_id=session.id, @@ -152,7 +153,7 @@ def process_segment(segment): obj=segment.controller, ) node = self.child_node_factory( - cli=commandline_parameters, + cmd_args=cmd_args, init_token=init_token, name=segment.controller.id, configuration=segment, @@ -167,7 +168,7 @@ def process_application(app): ): return # Ignore disabled applications. - commandline_parameters = get_commandline_parameters( + cmd_args = get_commandline_parameters( db=self.db, config_filename=self.initial_data, session_id=session.id, @@ -175,10 +176,9 @@ def process_application(app): obj=app, ) node = self.child_node_factory( - cli=commandline_parameters, + cmd_args=cmd_args, name=app.id, configuration=app, - fsm_configuration=self.data.controller.fsm, connectivity_service=connectivity_service, timeout=timeout, ) @@ -204,19 +204,18 @@ def process_application(app): return child_nodes - @staticmethod def child_node_factory( + self, name: str, - cli, + cmd_args: list[str], configuration, - init_token=None, - connectivity_service=None, - timeout=60, - **kwargs, - ): - log = get_logger("controller.child_node") - - if connectivity_service: + init_token: Token | None = None, + connectivity_service: ConnectivityServiceClient | None = None, + timeout: int = 60, + ) -> ChildNode: + log = get_logger("controller.child_node_factory") + + if connectivity_service is not None: try: # Query the connectivity service. ctype, uri = get_control_type_and_uri_from_connectivity_service( @@ -227,8 +226,8 @@ def child_node_factory( raise e else: try: - # Fall back to the CLI arguments. - ctype, uri = get_control_type_and_uri_from_cli(cli) + # Fall back to the command line arguments. 
+ ctype, uri = get_control_type_and_uri_from_cli(cmd_args) except DruncSetupException as e: log.error(f"Could not get '{name}' protocol from CLI: {e}") raise e @@ -239,14 +238,16 @@ def child_node_factory( case ControlType.gRPC: conf_handler = gRCPChildConfHandler(configuration, ConfTypes.PyObject) return gRPCChildNode( - name, uri, conf_handler, connectivity_service, init_token, **kwargs + name, conf_handler, uri, connectivity_service, init_token ) case ControlType.REST_API: conf_handler = RESTAPIChildNodeConfHandler( configuration, ConfTypes.PyObject ) - return RESTAPIChildNode(name, uri, conf_handler, **kwargs) + return RESTAPIChildNode( + name, conf_handler, uri, self.data.controller.fsm + ) case _: error_message = f"Unknown protocol '{ctype}' for child '{name}'" From ef3b0b460ad219f92d629daf8a7344812e090dc6 Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Wed, 12 Nov 2025 17:38:11 +0000 Subject: [PATCH 12/14] Check target properly. --- src/drunc/controller/controller.py | 80 ++++++++++++++++++++++++------ 1 file changed, 64 insertions(+), 16 deletions(-) diff --git a/src/drunc/controller/controller.py b/src/drunc/controller/controller.py index 8154c8875..dd6dac236 100644 --- a/src/drunc/controller/controller.py +++ b/src/drunc/controller/controller.py @@ -780,13 +780,19 @@ def propagate_concurrently( def status( self, request: AddressedCommand, context: ServicerContext ) -> StatusResponse: - request.target = self.parse_target_string(request.target) response = StatusResponse( token=None, name=self.name, flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) + try: + # Parse and validate target. + request.target = self.parse_target_string(request.target) + except ValueError: + response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT + return response + # This node. 
if request.target == self.name or request.execute_along_path: status = get_status_message(self) @@ -815,13 +821,19 @@ def status( def describe( self, request: AddressedCommand, context: ServicerContext ) -> DescribeResponse: - request.target = self.parse_target_string(request.target) response = DescribeResponse( token=None, name=self.name, flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) + try: + # Parse and validate target. + request.target = self.parse_target_string(request.target) + except ValueError: + response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT + return response + # This node. if request.target == self.name or request.execute_along_path: description = Description( @@ -859,13 +871,19 @@ def describe( def describe_fsm( self, request: AddressedCommand, context: ServicerContext ) -> DescribeFSMResponse: - request.target = self.parse_target_string(request.target) response = DescribeFSMResponse( token=None, name=self.name, flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) + try: + # Parse and validate target. + request.target = self.parse_target_string(request.target) + except ValueError: + response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT + return response + # What transitions to describe. key = unpack_any(request.command_data, PlainText).text @@ -926,22 +944,27 @@ def execute_fsm_command( request: ExecuteFSMCommandRequest, context: ServicerContext, ) -> ExecuteFSMCommandResponse: - request.target = self.parse_target_string(request.target) - - command = request.command - command_name = command.command_name - self.log.debug(f"FSM command: {command_name}") - transition = self.stateful_node.get_fsm_transition(command_name) - self.log.debug(f"FSM transition: {transition}") - response = ExecuteFSMCommandResponse( token=None, name=self.name, - command_name=command_name, + command_name=request.command.command_name, fsm_flag=FSMResponseFlag.FSM_EXECUTED_SUCCESSFULLY, flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) + try: + # Parse and validate target. 
+ request.target = self.parse_target_string(request.target) + except ValueError: + response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT + return response + + command = request.command + command_name = command.command_name + self.log.debug(f"FSM command: {command_name}") + transition = self.stateful_node.get_fsm_transition(command_name) + self.log.debug(f"FSM transition: {transition}") + # Check controller readiness. if not self.stateful_node.get_ready_state(): self.log.error( @@ -1088,13 +1111,20 @@ def execute_expert_command( request: ExecuteExpertCommandRequest, context: ServicerContext, ) -> ExecuteExpertCommandResponse: - request.target = self.parse_target_string(request.target) response = ExecuteExpertCommandResponse( token=None, name=self.name, + fsm_flag=FSMResponseFlag.FSM_EXECUTED_SUCCESSFULLY, flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) + try: + # Parse and validate target. + request.target = self.parse_target_string(request.target) + except ValueError: + response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT + return response + # This node. response.data = f"'{self.name}' propagated expert command" @@ -1125,13 +1155,19 @@ def include( request: IncludeExcludeRequest, context: ServicerContext, ) -> IncludeExcludeResponse: - request.target = self.parse_target_string(request.target) response = IncludeExcludeResponse( token=None, name=self.name, flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) + try: + # Parse and validate target. + request.target = self.parse_target_string(request.target) + except ValueError: + response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT + return response + # This node. 
if request.target == self.name or request.execute_along_path: try: @@ -1168,13 +1204,19 @@ def exclude( request: IncludeExcludeRequest, context: ServicerContext, ) -> IncludeExcludeResponse: - request.target = self.parse_target_string(request.target) response = IncludeExcludeResponse( token=None, name=self.name, flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) + try: + # Parse and validate target. + request.target = self.parse_target_string(request.target) + except ValueError: + response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT + return response + # This node. if request.target == self.name or request.execute_along_path: try: @@ -1209,13 +1251,19 @@ def exclude( def recompute_status( self, request: AddressedCommand, context: ServicerContext ) -> StatusResponse: - request.target = self.parse_target_string(request.target) response = StatusResponse( token=None, name=self.name, flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) + try: + # Parse and validate target. + request.target = self.parse_target_string(request.target) + except ValueError: + response.flag = ResponseFlag.NOT_EXECUTED_BAD_REQUEST_FORMAT + return response + # This node. if request.target == self.name or request.execute_along_path: child_list = self.address_all() From a72a0d85210a967c807c5382598f41da074b83ac Mon Sep 17 00:00:00 2001 From: James Paul Turner Date: Thu, 13 Nov 2025 11:23:36 +0000 Subject: [PATCH 13/14] Ignore exclusion in RPCs which do not alter FSM state. --- src/drunc/controller/controller.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/drunc/controller/controller.py b/src/drunc/controller/controller.py index dd6dac236..0f34637b9 100644 --- a/src/drunc/controller/controller.py +++ b/src/drunc/controller/controller.py @@ -798,10 +798,11 @@ def status( status = get_status_message(self) response.status.CopyFrom(status) - # Children nodes. + # Children nodes (ignore exclusion). 
child_list = self.address_target_path( request.target, request.execute_on_all_subsequent_children_in_path, + ignore_exclusion=True, ) child_responses = self.propagate_concurrently( lambda child, target: child.status( @@ -848,10 +849,11 @@ def describe( description.broadcast.Pack(broadcast_description) response.description.CopyFrom(description) - # Children nodes. + # Children nodes (ignore exclusion). child_list = self.address_target_path( request.target, request.execute_on_all_subsequent_children_in_path, + ignore_exclusion=True, ) child_responses = self.propagate_concurrently( lambda child, target: child.describe( @@ -913,10 +915,11 @@ def describe_fsm( description.sequences.extend(self.stateful_node.get_fsm_sequences()) response.description.CopyFrom(description) - # Children nodes. + # Children nodes (ignore exclusion). child_list = self.address_target_path( request.target, request.execute_on_all_subsequent_children_in_path, + ignore_exclusion=True, ) child_responses = self.propagate_concurrently( lambda child, target: child.describe_fsm( From 30168320fbf844e5b7fde862c29d33c12dd468ce Mon Sep 17 00:00:00 2001 From: PawelPlesniak Date: Thu, 13 Nov 2025 20:03:43 +0100 Subject: [PATCH 14/14] Removing empty log --- src/drunc/controller/interface/commands.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/drunc/controller/interface/commands.py b/src/drunc/controller/interface/commands.py index c5ec58066..b3869bb84 100644 --- a/src/drunc/controller/interface/commands.py +++ b/src/drunc/controller/interface/commands.py @@ -307,7 +307,7 @@ def include( execute_along_path=False, execute_on_all_subsequent_children_in_path=True, ) - if not result: + if not result or not result.text: return log = get_logger(**logger_params) log.info(result.text) @@ -325,7 +325,7 @@ def exclude( execute_along_path=False, execute_on_all_subsequent_children_in_path=True, ) - if not result: + if not result or not result.text: return log = get_logger(**logger_params) 
log.info(result.text)