From 49e98838fc5acf00aad1aa402270835e321816b0 Mon Sep 17 00:00:00 2001 From: PawelPlesniak Date: Wed, 22 Apr 2026 12:47:11 +0200 Subject: [PATCH 01/31] WIP --- src/drunc/unified_shell/shell.py | 35 ++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/src/drunc/unified_shell/shell.py b/src/drunc/unified_shell/shell.py index b80d8e33d..6fb57d516 100644 --- a/src/drunc/unified_shell/shell.py +++ b/src/drunc/unified_shell/shell.py @@ -38,7 +38,6 @@ from drunc.fsm.configuration import FSMConfHandler from drunc.fsm.utils import convert_fsm_transition from drunc.process_manager.configuration import ( - ProcessManagerTypes, get_process_manager_configuration, validate_pm_config, ) @@ -51,7 +50,6 @@ terminate, ) from drunc.process_manager.interface.process_manager import run_pm -from drunc.process_manager.utils import get_pm_type_from_name, validate_k8s_session_name from drunc.unified_shell.commands import boot, start_shell from drunc.unified_shell.context import UnifiedShellMode from drunc.unified_shell.shell_utils import generate_fsm_sequence_command @@ -151,22 +149,29 @@ def unified_shell( unified_shell_log.debug("Setting up the [green]unified_shell[/green] logger") # Parse the process manager argument to determine if it's a config or an address + unified_shell_log.critical( + f"Parsing the process manager argument: {process_manager}" + ) process_manager_url: ParseResult = urlparse(process_manager) internal_pm: bool = True if process_manager_url.scheme == "grpc": # i.e. if it's an address internal_pm = False + unified_shell_log.critical( + f"Process manager argument parsed, internal_pm set to {internal_pm}" + ) - # If using a k8s process manager, validate the session name before proceeding - if get_pm_type_from_name( - process_manager - ) == ProcessManagerTypes.K8s and not validate_k8s_session_name(session_name): - unified_shell_log.error( - f"[red]Invalid session/namespace name [bold]({session_name})[/bold][/red]. " - "Must match RFC1123 label: lowercase alphanumeric or '-', start/end with " - "alphanumeric, max 63 chars." - ) - sys.exit(1) - + # # If using a k8s process manager, validate the session name before proceeding + # if get_pm_type_from_name( + # process_manager + # ) == ProcessManagerTypes.K8s and not validate_k8s_session_name(session_name): + # unified_shell_log.error( + # f"[red]Invalid session/namespace name [bold]({session_name})[/bold][/red]. " + # "Must match RFC1123 label: lowercase alphanumeric or '-', start/end with " + # "alphanumeric, max 63 chars." + # ) + # sys.exit(1) + + unified_shell_log.critical("TEST") # Setup configuration related context variables ctx.obj.configuration_file = f"oksconflibs:{configuration_file}" ctx.obj.configuration_id = configuration_id @@ -255,6 +260,9 @@ def unified_shell( ) else: # Connect to an existing process manager at the provided address + unified_shell_log.critical( + "Connecting to an existing process manager at the provided address" + ) process_manager_address = process_manager.replace( "grpc://", "" ) # remove the grpc scheme @@ -271,6 +279,7 @@ def unified_shell( # Run a simple command (describe) to check the connection with the process manager desc: Description | None = None + unified_shell_log.critical("Getting driver") try: desc = ctx.obj.get_driver().describe() except Exception as e: From 68f5f1a24407f38bd2e4a421565b3067353b1018 Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Wed, 22 Apr 2026 12:48:49 +0200 Subject: [PATCH 02/31] Add testing for logger --- src/drunc/processes/ssh_process_lifetime_manager_shell.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/drunc/processes/ssh_process_lifetime_manager_shell.py b/src/drunc/processes/ssh_process_lifetime_manager_shell.py index f9d28e506..f596dc3dd 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager_shell.py +++ b/src/drunc/processes/ssh_process_lifetime_manager_shell.py @@ -232,7 +232,7 @@ def __init__( """ self.disable_host_key_check = disable_host_key_check self.disable_localhost_host_key_check = disable_localhost_host_key_check - self.log = logger if logger else get_logger(__name__) + self.log = get_logger("PM_LMS_TEST", rich_handler=True) self.on_process_exit = on_process_exit # Create SSH command wrapper From 211a75395b842e17c7e33c32d1fd01c1c7a4bf62 Mon Sep 17 00:00:00 2001 From: PawelPlesniak Date: Wed, 22 Apr 2026 16:43:09 +0200 Subject: [PATCH 03/31] More hacks --- .../process_manager/ssh_process_manager.py | 3 ++ .../ssh_process_lifetime_manager_shell.py | 53 +++++++++++++------ 2 files changed, 39 insertions(+), 17 deletions(-) diff --git a/src/drunc/process_manager/ssh_process_manager.py b/src/drunc/process_manager/ssh_process_manager.py index e80d421b7..7792f89ab 100644 --- a/src/drunc/process_manager/ssh_process_manager.py +++ b/src/drunc/process_manager/ssh_process_manager.py @@ -345,6 +345,9 @@ def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance: # Update hostname in boot request for this attempt self.boot_request[uuid].process_description.metadata.hostname = host + self.log.critical( + f"Attempting to start process {uuid} on host {host} via SSH lifetime manager" + ) # Start the process via SSH manager self.ssh_lifetime_manager.start_process( uuid=uuid, boot_request=self.boot_request[uuid] diff --git a/src/drunc/processes/ssh_process_lifetime_manager_shell.py b/src/drunc/processes/ssh_process_lifetime_manager_shell.py index f596dc3dd..9d0e560d3 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager_shell.py +++ b/src/drunc/processes/ssh_process_lifetime_manager_shell.py @@ -313,6 +313,9 @@ def start_process(self, uuid: str, boot_request: BootRequest) -> None: hostname = boot_request.process_description.metadata.hostname user = boot_request.process_description.metadata.user log_file = boot_request.process_description.process_logs_path + self.log.critical( + f"Starting process {uuid} on {hostname} as {user} with log file {log_file}" + ) # Extract environment variables from boot request env_vars = ( @@ -333,6 +336,8 @@ def start_process(self, uuid: str, boot_request: BootRequest) -> None: if cmd.endswith(";"): cmd = cmd[:-1] + self.log.critical(f"Built command for {uuid}: {cmd}: {boot_request}") + # Execute the command via SSH self._execute_bootrequest_via_ssh( uuid=uuid, @@ -705,25 +710,26 @@ def _build_ssh_arguments(self, hostname: str, user_host: str) -> List[str]: Returns: List of SSH command arguments """ - disable_host_key_check = self.disable_host_key_check or ( - self.disable_localhost_host_key_check - and hostname in ("localhost", "127.0.0.1", "::1") - ) + # disable_host_key_check = self.disable_host_key_check or ( + # self.disable_localhost_host_key_check + # and hostname in ("localhost", "127.0.0.1", "::1") + # ) arguments = [user_host, "-tt", "-o", "StrictHostKeyChecking=no"] - - if disable_host_key_check: - arguments.extend( - [ - "-o", - "LogLevel=error", - "-o", - "GlobalKnownHostsFile=/dev/null", - "-o", - "UserKnownHostsFile=/dev/null", - ] - ) - + # "-F /nfs/home/{user_host.split('@')[0]}/.ssh/config", + + # if disable_host_key_check: + arguments.extend( + [ + "-o", + "LogLevel=debug", + "-o", + "GlobalKnownHostsFile=/dev/null", + "-o", + "UserKnownHostsFile=/dev/null", + ] + ) + self.log.critical(f"SSH arguments for {user_host}: {arguments}") return arguments def read_log_file( @@ -840,8 +846,15 @@ def read_process_metadata( arguments.append(remote_command) # Execute SSH command to wait for and read file (single round-trip) + self.log.critical( + f"Attempting to read metadata for {uuid} from {hostname} with timeout {timeout}s" + ) result = self.ssh(*arguments) + self.log.critical( + f"DEBUG - Raw metadata content for {uuid} from {hostname}: {result}" + ) json_content = str(result).strip() + self.log.critical("Attempt successful?|???") # Parse JSON content and instantiate metadata object metadata = ProcessMetadata.from_json(json_content) @@ -881,6 +894,9 @@ def _execute_bootrequest_via_ssh( try: platform = os.uname().sysname.lower() is_macos = "darwin" in platform + hostname_for_gssapi = hostname + if hostname_for_gssapi == "localhost": + hostname_for_gssapi = os.uname().nodename user_host = f"{user}@{hostname}" # Build remote command with metadata file writing @@ -896,6 +912,9 @@ def _execute_bootrequest_via_ssh( remote_cmd += f"cd {boot_request.process_description.process_execution_directory} ; " metadata_file = SSHProcessLifetimeManagerShell.get_metadata_file_path(uuid) + self.log.critical( + f"Metadata file for {uuid} will be written to {metadata_file} on remote host" + ) tree_id = boot_request.process_description.metadata.tree_id name = boot_request.process_description.metadata.name role = ProcessMetadata.compute_role_from_tree_id(tree_id) From d33fedee3a28fe55f83c4896fd2d7dccc60e450d Mon Sep 17 00:00:00 2001 From: PawelPlesniak Date: Fri, 24 Apr 2026 12:57:23 +0200 Subject: [PATCH 04/31] Multi user support enabled --- .../ssh_process_lifetime_manager_shell.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/src/drunc/processes/ssh_process_lifetime_manager_shell.py b/src/drunc/processes/ssh_process_lifetime_manager_shell.py index 9d0e560d3..ae6e75430 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager_shell.py +++ b/src/drunc/processes/ssh_process_lifetime_manager_shell.py @@ -329,7 +329,10 @@ def start_process(self, uuid: str, boot_request: BootRequest) -> None: for exe_arg in boot_request.process_description.executable_and_arguments: cmd += exe_arg.exec for arg in exe_arg.args: - cmd += f" {arg}" + if arg.endswith("daq_app_rte.sh"): + cmd += f" {os.getenv('DBT_AREA_ROOT')}/install/daq_app_rte.sh" + else: + cmd += f" {arg}" cmd += ";" # Remove trailing semicolon if present @@ -714,15 +717,21 @@ def _build_ssh_arguments(self, hostname: str, user_host: str) -> List[str]: # self.disable_localhost_host_key_check # and hostname in ("localhost", "127.0.0.1", "::1") # ) + superuser_host = getpass.getuser() + "@" + user_host.split("@")[1] + # self.log.critical(f"Building SSH arguments for {user_host} with superuser host {superuser_host}") + arguments = [superuser_host, "-tt", "-o", "StrictHostKeyChecking=no"] + # self.log.critical(f"SSH arguments after adding StrictHostKeyChecking for {user_host}%s", arguments) + # self.log.critical(f"{arguments=}") + # self.log.critical(f"Test list: {test_list_print}") + # self.log.critical(f"Test list 2: %s", test_list_print) - arguments = [user_host, "-tt", "-o", "StrictHostKeyChecking=no"] # "-F /nfs/home/{user_host.split('@')[0]}/.ssh/config", # if disable_host_key_check: arguments.extend( [ "-o", - "LogLevel=debug", + "LogLevel=info", "-o", "GlobalKnownHostsFile=/dev/null", "-o", @@ -730,6 +739,10 @@ def _build_ssh_arguments(self, hostname: str, user_host: str) -> List[str]: ] ) self.log.critical(f"SSH arguments for {user_host}: {arguments}") + self.log.critical( + f"PP: {getpass.getuser()} is running on {os.uname().nodename} with disable_host_key_check={self.disable_host_key_check} and disable_localhost_host_key_check={self.disable_localhost_host_key_check}" + ) + return arguments def read_log_file( From 6c2d752c262b3df6df3aa3f23ffaf1413a5f8d04 Mon Sep 17 00:00:00 2001 From: PawelPlesniak Date: Fri, 24 Apr 2026 16:30:06 +0200 Subject: [PATCH 05/31] WIP --- .../process_manager/ssh_process_manager.py | 19 ++++++- src/drunc/unified_shell/commands.py | 50 ++++++++++++++++--- src/drunc/unified_shell/shell.py | 20 +++++--- 3 files changed, 73 insertions(+), 16 deletions(-) diff --git a/src/drunc/process_manager/ssh_process_manager.py b/src/drunc/process_manager/ssh_process_manager.py index 7792f89ab..729a9e846 100644 --- a/src/drunc/process_manager/ssh_process_manager.py +++ b/src/drunc/process_manager/ssh_process_manager.py @@ -420,6 +420,10 @@ def _ps_impl(self, query: ProcessQuery) -> ProcessInstanceList: boot_request_dict=self.boot_request, order_by="random", ) + if query.session: + self.log.warning( + f"{self.name} found {len(process_uuids)} processes matching {query} for ps" + ) # Iterate through all processes matching the query for proc_uuid in process_uuids: @@ -467,13 +471,24 @@ def _ps_impl(self, query: ProcessQuery) -> ProcessInstanceList: else: pi.remote_pid = remote_pid_result.reason ret += [pi] - - return ProcessInstanceList( + ret_fmt = ProcessInstanceList( name=self.name, token=None, values=ret, flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) + if query.session: + self.log.critical( + f"{self.name} returning {len(ret)} processes from ps query {query}" + ) + self.log.critical(ret_fmt) + self.log.critical(f"TEST: {ret_fmt=}") + else: + self.log.warning( + f"{self.name} returning {len(ret)} processes from ps query {query}" + ) + self.log.warning(ret) + return ret_fmt def _boot_impl(self, boot_request: BootRequest) -> ProcessInstanceList: self.log.debug(f"{self.name} running boot command") diff --git a/src/drunc/unified_shell/commands.py b/src/drunc/unified_shell/commands.py index c5dfefc24..d8ee13cda 100644 --- a/src/drunc/unified_shell/commands.py +++ b/src/drunc/unified_shell/commands.py @@ -7,6 +7,7 @@ from drunc.controller.interface.shell_utils import controller_setup from drunc.exceptions import DruncSetupException from drunc.process_manager.interface.context import ProcessManagerContext +from drunc.process_manager.utils import tabulate_process_instance_list from drunc.unified_shell.context import UnifiedShellMode from drunc.utils.shell_utils import InterruptedCommand from drunc.utils.utils import get_logger @@ -42,12 +43,12 @@ def boot( override_logs_boot = obj.override_logs else: override_logs_boot = override_logs - if len(processes.values) > 0: - log.error( - f"Cannot boot: session {session_name} already has {len(processes.values)} processes running. " - "Please terminate the existing session first." - ) - return + # if len(processes.values) > 0: + # log.error( + # f"Cannot boot: session {session_name} already has {len(processes.values)} processes running. " + # "Please terminate the existing session first." + # ) + # return try: results = obj.get_driver("process_manager").boot( @@ -101,6 +102,43 @@ def boot( sys.exit(1) +@click.command("terminate") +@click.pass_obj +@click.pass_context +def terminate(ctx, obj): + """ + Execute the process manager terminate command, but only do this for the current + session + """ + + log = get_logger("unified_shell.terminate") + session_query = ProcessQuery(session=ctx.obj.session_name) + log.info(f"Terminating session [green]{ctx.obj.session_name}[/]") + obj.get_driver("process_manager").kill(session_query) + + +@click.command("ps") +@click.pass_obj +@click.pass_context +def ps(ctx, obj): + """ + Execute the process manager terminate command, but only do this for the current + session + """ + + log = get_logger("unified_shell.ps") + session_query = ProcessQuery(session=ctx.obj.session_name) + log.info(f"Listing session [green]{ctx.obj.session_name}[/]") + results = obj.get_driver("process_manager").ps(session_query) + obj.print( + tabulate_process_instance_list( + results, title=f"Processes running in session {ctx.obj.session_name}" + ), + overflow="fold", + soft_wrap=True, + ) + + @click.command("start-shell") @click.pass_obj @click.pass_context diff --git a/src/drunc/unified_shell/shell.py b/src/drunc/unified_shell/shell.py index 0bddc5849..a52c4a8bf 100644 --- a/src/drunc/unified_shell/shell.py +++ b/src/drunc/unified_shell/shell.py @@ -46,12 +46,12 @@ flush, kill, logs, - ps, + # ps, restart, - terminate, + # terminate, ) from drunc.process_manager.interface.process_manager import run_pm -from drunc.unified_shell.commands import boot, start_shell +from drunc.unified_shell.commands import boot, ps, start_shell, terminate from drunc.unified_shell.context import UnifiedShellMode from drunc.unified_shell.shell_utils import generate_fsm_sequence_command from drunc.utils.configuration import ConfTypes, OKSKey @@ -307,6 +307,7 @@ def unified_shell( ctx.obj.pm_process.join() sys.exit(1) + unified_shell_log.critical("Process manager described successfully") # Broadcasting configuration if requested if desc.HasField("broadcast"): @@ -317,18 +318,20 @@ def unified_shell( # Add the unified shell Click commands to the CLI unified_shell_log.debug("Adding [green]unified_shell[/green] commands") - ctx.command.add_command(boot, "boot") - ctx.obj.dynamic_commands.add("boot") + unified_shell_commands = [boot, ps, terminate] + for cmd in unified_shell_commands: + ctx.command.add_command(cmd, format_name_for_cli(cmd.name)) + ctx.obj.dynamic_commands.add(format_name_for_cli(cmd.name)) # Add the process manager Click commands to the CLI unified_shell_log.debug("Adding [green]process_manager[/green] commands") process_manager_commands: list[click.Command] = [ kill, - terminate, + # terminate, flush, logs, restart, - ps, + # ps, ] for cmd in process_manager_commands: ctx.command.add_command(cmd, format_name_for_cli(cmd.name)) @@ -466,7 +469,8 @@ def cleanup(): # Terminate any residual processes if ctx.obj.get_driver("process_manager"): - ctx.obj.get_driver("process_manager").terminate() + session_processes = ProcessQuery(session=ctx.obj.session_name) + ctx.obj.get_driver("process_manager").kill(session_processes) # Check if any processes are still running if ( From 7cbb5117bbe2b509d7bc7a6ebe652a09143f2fc0 Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Tue, 5 May 2026 11:42:51 +0200 Subject: [PATCH 06/31] fix db path resolver --- src/drunc/process_manager/oks_parser.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/drunc/process_manager/oks_parser.py b/src/drunc/process_manager/oks_parser.py index f679e7f1a..2615afc7e 100644 --- a/src/drunc/process_manager/oks_parser.py +++ b/src/drunc/process_manager/oks_parser.py @@ -5,7 +5,7 @@ from drunc.exceptions import DruncException, DruncSetupException from drunc.process_manager.configuration import get_commandline_parameters -from drunc.utils.utils import get_logger +from drunc.utils.utils import file_is_read_only, get_logger if TYPE_CHECKING: import conffwk @@ -76,7 +76,19 @@ def get_full_db_path(db_path: str) -> str: raise DruncSetupException(err_str) # If multiple matches are found, take the first instance that matches. + #! This is a horrible way of choosing it... it could have gone into the cvmfs thing + #! we should put in the file_is_read_only config + #! and loop across until you find one where its not read only + + #! Also clean up this logic right here + + # Prefer the first writable match; if every match is read-only, fall back to the first one. resolved_path = unique_matched_files[0] + for matched_file in unique_matched_files: + if not file_is_read_only(matched_file): + resolved_path = matched_file + break + log.debug(f"Path {db_path} resolved to {resolved_path}") return resolved_path From b19aca773cbb0546a18a12ef93255d44ca664911 Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Wed, 6 May 2026 18:04:38 +0200 Subject: [PATCH 07/31] Commenting out a bunch of debug logs --- .../process_manager/ssh_process_manager.py | 39 ++++++++++--------- .../ssh_process_lifetime_manager_shell.py | 36 ++++++++--------- src/drunc/unified_shell/shell.py | 16 +++++--- 3 files changed, 49 insertions(+), 42 deletions(-) diff --git a/src/drunc/process_manager/ssh_process_manager.py b/src/drunc/process_manager/ssh_process_manager.py index 729a9e846..e0b1ab8e0 100644 --- a/src/drunc/process_manager/ssh_process_manager.py +++ b/src/drunc/process_manager/ssh_process_manager.py @@ -22,6 +22,9 @@ from drunc.processes.ssh_process_lifetime_manager import ProcessLifetimeManager +from drunc.utils.utils import get_logger + + class SSHProcessManager(ProcessManager): def __init__( self, configuration, LifetimeManagerClass: ProcessLifetimeManager, **kwargs @@ -345,9 +348,9 @@ def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance: # Update hostname in boot request for this attempt self.boot_request[uuid].process_description.metadata.hostname = host - self.log.critical( - f"Attempting to start process {uuid} on host {host} via SSH lifetime manager" - ) + # self.log.critical( + # f"Attempting to start process {uuid} on host {host} via SSH lifetime manager" + # ) # Start the process via SSH manager self.ssh_lifetime_manager.start_process( uuid=uuid, boot_request=self.boot_request[uuid] @@ -420,10 +423,10 @@ def _ps_impl(self, query: ProcessQuery) -> ProcessInstanceList: boot_request_dict=self.boot_request, order_by="random", ) - if query.session: - self.log.warning( - f"{self.name} found {len(process_uuids)} processes matching {query} for ps" - ) + # if query.session: + # self.log.warning( + # f"{self.name} found {len(process_uuids)} processes matching {query} for ps" + # ) # Iterate through all processes matching the query for proc_uuid in process_uuids: @@ -477,17 +480,17 @@ def _ps_impl(self, query: ProcessQuery) -> ProcessInstanceList: values=ret, flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) - if query.session: - self.log.critical( - f"{self.name} returning {len(ret)} processes from ps query {query}" - ) - self.log.critical(ret_fmt) - self.log.critical(f"TEST: {ret_fmt=}") - else: - self.log.warning( - f"{self.name} returning {len(ret)} processes from ps query {query}" - ) - self.log.warning(ret) + # if query.session: + # self.log.critical( + # f"{self.name} returning {len(ret)} processes from ps query {query}" + # ) + # self.log.critical(ret_fmt) + # self.log.critical(f"TEST: {ret_fmt=}") + # else: + # self.log.warning( + # f"{self.name} returning {len(ret)} processes from ps query {query}" + # ) + # self.log.warning(ret) return ret_fmt def _boot_impl(self, boot_request: BootRequest) -> ProcessInstanceList: diff --git a/src/drunc/processes/ssh_process_lifetime_manager_shell.py b/src/drunc/processes/ssh_process_lifetime_manager_shell.py index ae6e75430..aacbc5ed9 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager_shell.py +++ b/src/drunc/processes/ssh_process_lifetime_manager_shell.py @@ -313,9 +313,9 @@ def start_process(self, uuid: str, boot_request: BootRequest) -> None: hostname = boot_request.process_description.metadata.hostname user = boot_request.process_description.metadata.user log_file = boot_request.process_description.process_logs_path - self.log.critical( - f"Starting process {uuid} on {hostname} as {user} with log file {log_file}" - ) + # self.log.critical( + # f"Starting process {uuid} on {hostname} as {user} with log file {log_file}" + # ) # Extract environment variables from boot request env_vars = ( @@ -339,7 +339,7 @@ def start_process(self, uuid: str, boot_request: BootRequest) -> None: if cmd.endswith(";"): cmd = cmd[:-1] - self.log.critical(f"Built command for {uuid}: {cmd}: {boot_request}") + # self.log.critical(f"Built command for {uuid}: {cmd}: {boot_request}") # Execute the command via SSH self._execute_bootrequest_via_ssh( @@ -738,10 +738,10 @@ def _build_ssh_arguments(self, hostname: str, user_host: str) -> List[str]: "UserKnownHostsFile=/dev/null", ] ) - self.log.critical(f"SSH arguments for {user_host}: {arguments}") - self.log.critical( - f"PP: {getpass.getuser()} is running on {os.uname().nodename} with disable_host_key_check={self.disable_host_key_check} and disable_localhost_host_key_check={self.disable_localhost_host_key_check}" - ) + # self.log.critical(f"SSH arguments for {user_host}: {arguments}") + # self.log.critical( + # f"PP: {getpass.getuser()} is running on {os.uname().nodename} with disable_host_key_check={self.disable_host_key_check} and disable_localhost_host_key_check={self.disable_localhost_host_key_check}" + # ) return arguments @@ -859,15 +859,15 @@ def read_process_metadata( arguments.append(remote_command) # Execute SSH command to wait for and read file (single round-trip) - self.log.critical( - f"Attempting to read metadata for {uuid} from {hostname} with timeout {timeout}s" - ) + # self.log.critical( + # f"Attempting to read metadata for {uuid} from {hostname} with timeout {timeout}s" + # ) result = self.ssh(*arguments) - self.log.critical( - f"DEBUG - Raw metadata content for {uuid} from {hostname}: {result}" - ) + # self.log.critical( + # f"DEBUG - Raw metadata content for {uuid} from {hostname}: {result}" + # ) json_content = str(result).strip() - self.log.critical("Attempt successful?|???") + # self.log.critical("Attempt successful?|???") # Parse JSON content and instantiate metadata object metadata = ProcessMetadata.from_json(json_content) @@ -925,9 +925,9 @@ def _execute_bootrequest_via_ssh( remote_cmd += f"cd {boot_request.process_description.process_execution_directory} ; " metadata_file = SSHProcessLifetimeManagerShell.get_metadata_file_path(uuid) - self.log.critical( - f"Metadata file for {uuid} will be written to {metadata_file} on remote host" - ) + # self.log.critical( + # f"Metadata file for {uuid} will be written to {metadata_file} on remote host" + # ) tree_id = boot_request.process_description.metadata.tree_id name = boot_request.process_description.metadata.name role = ProcessMetadata.compute_role_from_tree_id(tree_id) diff --git a/src/drunc/unified_shell/shell.py b/src/drunc/unified_shell/shell.py index a52c4a8bf..4f55e6b20 100644 --- a/src/drunc/unified_shell/shell.py +++ b/src/drunc/unified_shell/shell.py @@ -150,9 +150,9 @@ def unified_shell( unified_shell_log.debug("Setting up the [green]unified_shell[/green] logger") # Parse the process manager argument to determine if it's a config or an address - unified_shell_log.critical( - f"Parsing the process manager argument: {process_manager}" - ) + # unified_shell_log.critical( + # f"Parsing the process manager argument: {process_manager}" + # ) process_manager_url: ParseResult = urlparse(process_manager) internal_pm: bool = True if process_manager_url.scheme == "grpc": # i.e. if it's an address @@ -172,7 +172,7 @@ def unified_shell( # ) # sys.exit(1) - unified_shell_log.critical("TEST") + # unified_shell_log.critical("TEST") # Setup configuration related context variables ctx.obj.configuration_file = f"oksconflibs:{configuration_file}" ctx.obj.configuration_id = configuration_id @@ -276,11 +276,15 @@ def unified_shell( f"[green]process_manager[/green] started, communicating through address [green]" f"{process_manager_address}[/green]" ) + + #! This is where the context objects connection detail ctx.obj.reset(address_pm=process_manager_address) # Run a simple command (describe) to check the connection with the process manager desc: Description | None = None - unified_shell_log.critical("Getting driver") + # unified_shell_log.critical("Getting driver") + + #! Here we're getting the driver! try: desc = ctx.obj.get_driver().describe() except Exception as e: @@ -307,7 +311,7 @@ def unified_shell( ctx.obj.pm_process.join() sys.exit(1) - unified_shell_log.critical("Process manager described successfully") + # unified_shell_log.critical("Process manager described successfully") # Broadcasting configuration if requested if desc.HasField("broadcast"): From fc5576925acbc7977c416ac3c9d42bbf01dc4c3d Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Wed, 6 May 2026 18:13:21 +0200 Subject: [PATCH 08/31] Add send_random test --- src/drunc/process_manager/process_manager.py | 30 +++++++++++++++++++ .../process_manager/process_manager_driver.py | 20 +++++++++++++ .../process_manager/ssh_process_manager.py | 6 +++- src/drunc/unified_shell/shell.py | 5 ++++ 4 files changed, 60 insertions(+), 1 deletion(-) diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index 8fc32e1d8..f470eac18 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -7,6 +7,7 @@ from daqpytools.logging import LogHandlerConf, exceptions, setup_daq_ers_logger from druncschema.authoriser_pb2 import ActionType, SystemType from druncschema.broadcast_pb2 import BroadcastType +from google.protobuf.empty_pb2 import Empty from druncschema.description_pb2 import CommandDescription, Description from druncschema.opmon.process_manager_pb2 import ProcessStatus from druncschema.process_manager_pb2 import ( @@ -560,6 +561,35 @@ def logs(self, request: LogRequest, context: ServicerContext) -> LogLines: return response + @abc.abstractmethod + def _send_random_impl(self) -> Empty: + raise NotImplementedError + + @broadcasted + @authentified_and_authorised( + action=ActionType.READ, system=SystemType.PROCESS_MANAGER + ) + def send_random(self, request: Request, context: ServicerContext) -> Empty: + self.log.debug(f"{self.name} running send_random") + + try: + response = self._send_random_impl() + except NotImplementedError: + raise DruncNotImplementedException( + message="Implementation missing", + domain="ProcessManager.send_random", + ) + except Exception as e: + context_msg = f"Unhandled exception in ProcessManager.send_random: {e}" + self.log.exception(context_msg) + + raise DruncCommandException( + message=context_msg, + domain="ProcessManager.send_random", + ) + + return response + def _ensure_one_process( self, uuids: list[str], in_boot_request: bool = False ) -> str: diff --git a/src/drunc/process_manager/process_manager_driver.py b/src/drunc/process_manager/process_manager_driver.py index cd7169f99..72ea73079 100644 --- a/src/drunc/process_manager/process_manager_driver.py +++ b/src/drunc/process_manager/process_manager_driver.py @@ -87,6 +87,26 @@ def close(self) -> None: self.log.error(f"Error closing gRPC channel: {e}", exc_info=True) # ----- Boot workflow ----- + + def send_random(self): + request = Request(token=copy_token(self.token)) + timeout = 10 + + try: + self.stub.send_random(request, timeout=timeout) + except grpc.RpcError as e: + try: + error_details = extract_grpc_rich_error(e) + self.log.error(error_details) + except Exception as extraction_error: + self.log.debug( + f"Could not extract rich error details from gRPC error: {extraction_error}", + exc_info=True, + ) + handle_grpc_error(e) + + self.log.critical("Hello, world!") + def boot( self, conf_file: str, diff --git a/src/drunc/process_manager/ssh_process_manager.py b/src/drunc/process_manager/ssh_process_manager.py index e0b1ab8e0..779fbc44d 100644 --- a/src/drunc/process_manager/ssh_process_manager.py +++ b/src/drunc/process_manager/ssh_process_manager.py @@ -16,7 +16,7 @@ ProcessUUID, ) from druncschema.request_response_pb2 import ResponseFlag - +from google.protobuf.empty_pb2 import Empty from drunc.exceptions import DruncCommandException from drunc.process_manager.process_manager import ProcessManager from drunc.processes.ssh_process_lifetime_manager import ProcessLifetimeManager @@ -493,6 +493,10 @@ def _ps_impl(self, query: ProcessQuery) -> ProcessInstanceList: # self.log.warning(ret) return ret_fmt + def _send_random_impl(self) -> Empty: + self.log.critical("Hello, worldsssear!") + return Empty() + def _boot_impl(self, boot_request: BootRequest) -> ProcessInstanceList: self.log.debug(f"{self.name} running boot command") this_uuid = str(uuid.uuid4()) diff --git a/src/drunc/unified_shell/shell.py b/src/drunc/unified_shell/shell.py index 4f55e6b20..c958817ed 100644 --- a/src/drunc/unified_shell/shell.py +++ b/src/drunc/unified_shell/shell.py @@ -313,6 +313,11 @@ def unified_shell( sys.exit(1) # unified_shell_log.critical("Process manager described successfully") + #! So now we have a working get_driver object that can communicate with the pm. + + # lets try sending a random command.. + ctx.obj.get_driver("process_manager").send_random() + # Broadcasting configuration if requested if desc.HasField("broadcast"): unified_shell_log.debug("Broadcasting") From e03689b63494d805f27221df16a9dd2acce18370 Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Wed, 6 May 2026 17:54:55 +0200 Subject: [PATCH 09/31] Fix #709 --- src/drunc/process_manager/process_manager_driver.py | 3 +++ src/drunc/unified_shell/shell.py | 8 +++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/drunc/process_manager/process_manager_driver.py b/src/drunc/process_manager/process_manager_driver.py index 72ea73079..e8e7619b4 100644 --- a/src/drunc/process_manager/process_manager_driver.py +++ b/src/drunc/process_manager/process_manager_driver.py @@ -123,6 +123,9 @@ def boot( ) -> Iterator[ProcessInstanceList] | None: self.log.info(f"Booting session [green]{session_name}[/green]") + # Assume oksconflibs if no framework is defined + conf_file = f"oksconflibs:{conf_file}" if ":" not in conf_file else conf_file + # Step 1 - consolidate configuration self._consolidate_config(session_name, conf_file) diff --git a/src/drunc/unified_shell/shell.py b/src/drunc/unified_shell/shell.py index c958817ed..9bddb4272 100644 --- a/src/drunc/unified_shell/shell.py +++ b/src/drunc/unified_shell/shell.py @@ -174,7 +174,13 @@ def unified_shell( # unified_shell_log.critical("TEST") # Setup configuration related context variables - ctx.obj.configuration_file = f"oksconflibs:{configuration_file}" + # Assume oksconflibs if no framework is defined + ctx.obj.configuration_file = ( + lambda path: ( + path if path.startswith("oksconflibs:") else f"oksconflibs:{path}" + ) + )(configuration_file) + ctx.obj.configuration_id = configuration_id ctx.obj.session_name = session_name From 1e9eec0cf5a1bdf28fd8160743f3c8c55839e084 Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Fri, 8 May 2026 10:42:38 +0200 Subject: [PATCH 10/31] fix logic bug in query matching --- src/drunc/process_manager/process_manager.py | 24 +++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index f470eac18..ec04302f4 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -7,7 +7,6 @@ from daqpytools.logging import LogHandlerConf, exceptions, setup_daq_ers_logger from druncschema.authoriser_pb2 import ActionType, SystemType from druncschema.broadcast_pb2 import BroadcastType -from google.protobuf.empty_pb2 import Empty from druncschema.description_pb2 import CommandDescription, Description from druncschema.opmon.process_manager_pb2 import ProcessStatus from druncschema.process_manager_pb2 import ( @@ -23,6 +22,7 @@ Request, ResponseFlag, ) +from google.protobuf.empty_pb2 import Empty from google.rpc import code_pb2 from grpc import ServicerContext @@ -650,25 +650,27 @@ def _match_processes_against_query( # Filter processes based on query criteria processes = [] for uuid in available_uuids: - accepted = False + accepted = True meta = boot_request_dict[uuid].process_description.metadata # Check UUID match - if uuid in uuid_selector: - accepted = True + if uuid_selector and uuid not in uuid_selector: + accepted = False # Check name pattern match (regex) - for name_reg in name_selector: - if re.search(name_reg, meta.name): - accepted = True + + if name_selector and not any( + re.search(reg, meta.name) for reg in name_selector + ): + accepted = False # Check session match - if session_selector == meta.session: - accepted = True + if session_selector and session_selector != meta.session: + accepted = False # Check user match - if user_selector == meta.user: - accepted = True + if user_selector and user_selector != meta.user: + accepted = False if accepted: processes.append(uuid) From e0f1a4ea14f51e68cdbdb832987d69c74a6263ae Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Fri, 8 May 2026 10:50:38 +0200 Subject: [PATCH 11/31] [messy] Add working logging command --- .../process_manager/interface/commands.py | 21 ++-- .../process_manager/ssh_process_manager.py | 3 - src/drunc/unified_shell/commands.py | 101 ++++++++++++++++++ src/drunc/unified_shell/shell.py | 15 ++- 4 files changed, 124 insertions(+), 16 deletions(-) diff --git a/src/drunc/process_manager/interface/commands.py b/src/drunc/process_manager/interface/commands.py index 129f3ea78..753958c7a 100644 --- a/src/drunc/process_manager/interface/commands.py +++ b/src/drunc/process_manager/interface/commands.py @@ -223,22 +223,21 @@ def flush( ) # rich tables require console printing +def logs_decorators(f): + # order matters: apply options from outermost to innermost + f = click.pass_obj(f) + f = click.option("--grep", type=str, default=None)(f) + f = click.option("--how-far", type=int, show_default=True, default=100, help="How many lines one wants")(f) + f = add_query_options(at_least_one=True)(f) + return f + @click.command("logs") -@add_query_options(at_least_one=True) -@click.option( - "--how-far", - type=int, - show_default=True, - default=100, - help="How many lines one wants", -) -@click.option("--grep", type=str, default=None) -@click.pass_obj +@logs_decorators def logs( obj: ProcessManagerContext, how_far: int, grep: str, query: ProcessQuery ) -> None: log = get_logger("process_manager.shell") - log.debug(f"Running logs with query {query}") + log.info(f"Running logs with query {query}") log_req = LogRequest( how_far=how_far, query=query, diff --git a/src/drunc/process_manager/ssh_process_manager.py b/src/drunc/process_manager/ssh_process_manager.py index 779fbc44d..4783341fe 100644 --- a/src/drunc/process_manager/ssh_process_manager.py +++ b/src/drunc/process_manager/ssh_process_manager.py @@ -22,9 +22,6 @@ from drunc.processes.ssh_process_lifetime_manager import ProcessLifetimeManager -from drunc.utils.utils import get_logger - - class SSHProcessManager(ProcessManager): def __init__( self, configuration, LifetimeManagerClass: ProcessLifetimeManager, **kwargs diff --git a/src/drunc/unified_shell/commands.py b/src/drunc/unified_shell/commands.py index d8ee13cda..1af6eeac5 100644 --- a/src/drunc/unified_shell/commands.py +++ b/src/drunc/unified_shell/commands.py @@ -139,6 +139,105 @@ def ps(ctx, obj): ) +# Logs + +from drunc.process_manager.interface.commands import logs as pm_logs_cmd +from drunc.process_manager.interface.commands import logs_decorators as pm_logs_deco + + +@click.command("logs") +@pm_logs_deco +@click.pass_context +def logs(ctx, obj, how_far: int, grep: str, query: ProcessQuery) -> None: + # enforce session + + #! this kinda works! we still need to figure out how to deal with the session tho + # mainly because we want a strict default here but we have to worry about the + # decorator adding the sessino + + log = get_logger("unified_shell.logs") + + query.session = ctx.obj.session_name + + log.info("getting logs") + # delegate to the process_manager implementation: unwrap decorators + cb = pm_logs_cmd.callback + # unwrap any decorator layers to reach the original function + while hasattr(cb, "__wrapped__"): + cb = cb.__wrapped__ + + # call the original function which expects (obj, how_far, grep, query) + + # this kinda works but now i need to undertstand how the process query works because i cannot add different queries together for some reason + return cb(obj, how_far, grep, query) + + + + + + + + + + +#### DO NOT COMMIT + +# # Restart +# @click.command("restart") +# @click.pass_obj +# @click.pass_context +# def restart(ctx, obj): +# """ +# Execute the process manager restart command, but only do this for the current +# session +# """ +# log = get_logger("unified_shell.restart") +# session_query = ProcessQuery(session=ctx.obj.session_name) +# log.info(f"Restarting session [green]{ctx.obj.session_name}[/]") +# obj.get_driver("process_manager").restart(session_query) + + +# # Flush + + +# @click.command("flush") +# @click.pass_obj +# @click.pass_context +# def flush(ctx, obj): +# """ +# Execute the process manager flush command, but only do this for the current +# session +# """ +# log = get_logger("unified_shell.flush") +# session_query = ProcessQuery(session=ctx.obj.session_name) +# log.info(f"Flushing session [green]{ctx.obj.session_name}[/]") +# obj.get_driver("process_manager").flush(session_query) + + +# # Kill + + +# @click.command("kill") +# @click.pass_obj +# @click.pass_context +# def kill(ctx, obj): +# """ +# Execute the process manager kill command, but only do this for the current +# session +# """ +# log = get_logger("unified_shell.kill") +# session_query = ProcessQuery(session=ctx.obj.session_name) +# log.info(f"Killing processes in session [green]{ctx.obj.session_name}[/]") +# obj.get_driver("process_manager").kill(session_query) + + +# # Wait #also put it in the PM shell + +#! Note: i cant seem to do a start-run from no boot in the unified shell.. might have to check if this is related somehow +#! I also cannot do ps? +#### /DO NOT COMMIT + + @click.command("start-shell") @click.pass_obj @click.pass_context @@ -153,3 +252,5 @@ def start_shell(ctx, obj): obj.running_mode = UnifiedShellMode.SEMIBATCH log.info("Switching to interactive mode...") + + diff --git a/src/drunc/unified_shell/shell.py b/src/drunc/unified_shell/shell.py index 9bddb4272..23809cb66 100644 --- a/src/drunc/unified_shell/shell.py +++ b/src/drunc/unified_shell/shell.py @@ -42,16 +42,27 @@ get_process_manager_configuration, validate_pm_config, ) + from drunc.process_manager.interface.commands import ( flush, kill, - logs, + # logs, # ps, restart, # terminate, ) + from drunc.process_manager.interface.process_manager import run_pm -from drunc.unified_shell.commands import boot, ps, start_shell, terminate +from drunc.unified_shell.commands import ( + boot, + # flush, + # kill, + logs, + ps, + # restart, + start_shell, + terminate, +) from drunc.unified_shell.context import UnifiedShellMode from drunc.unified_shell.shell_utils import generate_fsm_sequence_command from drunc.utils.configuration import ConfTypes, OKSKey From aaadd1a204620ea73ebb2dfabe3917e64e60d72a Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Fri, 8 May 2026 18:27:24 +0200 Subject: [PATCH 12/31] A clean logging implementation --- .../process_manager/interface/cli_argument.py | 27 +++++++---- .../process_manager/interface/commands.py | 17 +++++-- .../process_manager/ssh_process_manager.py | 1 + src/drunc/unified_shell/commands.py | 48 ++++++------------- src/drunc/unified_shell/shell.py | 2 - 5 files changed, 48 insertions(+), 47 deletions(-) diff --git a/src/drunc/process_manager/interface/cli_argument.py b/src/drunc/process_manager/interface/cli_argument.py index 27e84872f..e50210f16 100644 --- a/src/drunc/process_manager/interface/cli_argument.py +++ b/src/drunc/process_manager/interface/cli_argument.py @@ -7,15 +7,10 @@ def validate_conf_string(ctx, param, boot_configuration): return boot_configuration -def add_query_options(at_least_one: bool, all_processes_by_default: bool = False): - def wrapper(f0): - f1 = click.option( - "-s", - "--session", - type=str, - default=None, - help="Select the processes on a particular session", - )(f0) +def add_query_options_no_session( + at_least_one: bool, all_processes_by_default: bool = False +): + def wrapper(f1): # -> _Wrapped[Callable[..., Any], Any, Callable[..., Any], Any]: f2 = click.option( "-n", "--name", @@ -41,3 +36,17 @@ def wrapper(f0): return generate_process_query(f4, at_least_one, all_processes_by_default) return wrapper + + +def add_query_options(at_least_one: bool, all_processes_by_default: bool = False): + def wrapper(f0): + f1 = click.option( + "-s", + "--session", + type=str, + default=None, + help="Select the processes on a particular session", + )(f0) + return add_query_options_no_session(at_least_one, all_processes_by_default)(f1) + + return wrapper diff --git a/src/drunc/process_manager/interface/commands.py b/src/drunc/process_manager/interface/commands.py index 753958c7a..5012164fa 100644 --- a/src/drunc/process_manager/interface/commands.py +++ b/src/drunc/process_manager/interface/commands.py @@ -227,13 +227,24 @@ def logs_decorators(f): # order matters: apply options from outermost to innermost f = click.pass_obj(f) f = click.option("--grep", type=str, default=None)(f) - f = click.option("--how-far", type=int, show_default=True, default=100, help="How many lines one wants")(f) - f = add_query_options(at_least_one=True)(f) + f = click.option( + "--how-far", + type=int, + show_default=True, + default=100, + help="How many lines one wants", + )(f) return f + @click.command("logs") +@add_query_options(at_least_one=True) @logs_decorators -def logs( +def logs(obj, how_far, grep, query): + return logs_impl(obj, how_far, grep, query) + + +def logs_impl( obj: ProcessManagerContext, how_far: int, grep: str, query: ProcessQuery ) -> None: log = get_logger("process_manager.shell") diff --git a/src/drunc/process_manager/ssh_process_manager.py b/src/drunc/process_manager/ssh_process_manager.py index 4783341fe..c6dc5ec2a 100644 --- a/src/drunc/process_manager/ssh_process_manager.py +++ b/src/drunc/process_manager/ssh_process_manager.py @@ -17,6 +17,7 @@ ) from druncschema.request_response_pb2 import ResponseFlag from google.protobuf.empty_pb2 import Empty + from drunc.exceptions import DruncCommandException from drunc.process_manager.process_manager import ProcessManager from drunc.processes.ssh_process_lifetime_manager import ProcessLifetimeManager diff --git a/src/drunc/unified_shell/commands.py b/src/drunc/unified_shell/commands.py index 1af6eeac5..eae1c1d1b 100644 --- a/src/drunc/unified_shell/commands.py +++ b/src/drunc/unified_shell/commands.py @@ -1,11 +1,14 @@ import getpass import sys +from functools import update_wrapper import click from druncschema.process_manager_pb2 import ProcessQuery from drunc.controller.interface.shell_utils import controller_setup from drunc.exceptions import DruncSetupException +from drunc.process_manager.interface.cli_argument import add_query_options_no_session +from drunc.process_manager.interface.commands import logs_decorators, logs_impl from drunc.process_manager.interface.context import ProcessManagerContext from drunc.process_manager.utils import tabulate_process_instance_list from drunc.unified_shell.context import UnifiedShellMode @@ -139,45 +142,26 @@ def ps(ctx, obj): ) -# Logs +def session_injector(f): + @click.pass_context + def wrapper(ctx, *args, **kwargs): + kwargs["session"] = ctx.obj.session_name + return ctx.invoke(f, *args, **kwargs) -from drunc.process_manager.interface.commands import logs as pm_logs_cmd -from drunc.process_manager.interface.commands import logs_decorators as pm_logs_deco + return update_wrapper(wrapper, f) +# Logs @click.command("logs") -@pm_logs_deco +@session_injector +@add_query_options_no_session(at_least_one=True) +@logs_decorators @click.pass_context -def logs(ctx, obj, how_far: int, grep: str, query: ProcessQuery) -> None: - # enforce session - - #! this kinda works! we still need to figure out how to deal with the session tho - # mainly because we want a strict default here but we have to worry about the - # decorator adding the sessino - +def logs(ctx, obj, how_far, grep, query): log = get_logger("unified_shell.logs") - - query.session = ctx.obj.session_name - log.info("getting logs") - # delegate to the process_manager implementation: unwrap decorators - cb = pm_logs_cmd.callback - # unwrap any decorator layers to reach the original function - while hasattr(cb, "__wrapped__"): - cb = cb.__wrapped__ - - # call the original function which expects (obj, how_far, grep, query) - - # this kinda works but now i need to undertstand how the process query works because i cannot add different queries together for some reason - return cb(obj, how_far, grep, query) - - - - - - - + return logs_impl(obj, how_far, grep, query) #### DO NOT COMMIT @@ -252,5 +236,3 @@ def start_shell(ctx, obj): obj.running_mode = UnifiedShellMode.SEMIBATCH log.info("Switching to interactive mode...") - - diff --git a/src/drunc/unified_shell/shell.py b/src/drunc/unified_shell/shell.py index 23809cb66..a7183a461 100644 --- a/src/drunc/unified_shell/shell.py +++ b/src/drunc/unified_shell/shell.py @@ -42,7 +42,6 @@ get_process_manager_configuration, validate_pm_config, ) - from drunc.process_manager.interface.commands import ( flush, kill, @@ -51,7 +50,6 @@ restart, # terminate, ) - from drunc.process_manager.interface.process_manager import run_pm from drunc.unified_shell.commands import ( boot, From 3f4e34af64cd5f6b5d2d5a359b9d6b1528fe26e6 Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Mon, 11 May 2026 10:17:02 +0200 Subject: [PATCH 13/31] Port flush --- .../process_manager/interface/commands.py | 27 +++++++++----- src/drunc/unified_shell/commands.py | 36 +++++++++---------- src/drunc/unified_shell/shell.py | 16 ++++++--- 3 files changed, 46 insertions(+), 33 deletions(-) diff --git a/src/drunc/process_manager/interface/commands.py b/src/drunc/process_manager/interface/commands.py index 5012164fa..9dc92d918 100644 --- a/src/drunc/process_manager/interface/commands.py +++ b/src/drunc/process_manager/interface/commands.py @@ -198,17 +198,26 @@ def kill(obj: ProcessManagerContext, query: ProcessQuery, width: int | None) -> ) # rich tables require console printing +def flush_decorators(f): + f = click.pass_obj(f) + f = click.option( + "-w", + "--width", + type=int, + default=None, + help="Table width. Default is automatically calculated", + )(f) + return f + + @click.command("flush") -@click.option( - "-w", - "--width", - type=int, - default=None, - help="Table width. Default is automatically calculated", -) @add_query_options(at_least_one=False, all_processes_by_default=True) -@click.pass_obj -def flush( +@flush_decorators +def flush(obj, query, width): + return flush_impl(obj, query, width) + + +def flush_impl( obj: ProcessManagerContext, query: ProcessQuery, width: int | None, diff --git a/src/drunc/unified_shell/commands.py b/src/drunc/unified_shell/commands.py index eae1c1d1b..c3c8d5769 100644 --- a/src/drunc/unified_shell/commands.py +++ b/src/drunc/unified_shell/commands.py @@ -8,7 +8,12 @@ from drunc.controller.interface.shell_utils import controller_setup from drunc.exceptions import DruncSetupException from drunc.process_manager.interface.cli_argument import add_query_options_no_session -from drunc.process_manager.interface.commands import logs_decorators, logs_impl +from drunc.process_manager.interface.commands import ( + flush_decorators, + flush_impl, + logs_decorators, + logs_impl, +) from drunc.process_manager.interface.context import ProcessManagerContext from drunc.process_manager.utils import tabulate_process_instance_list from drunc.unified_shell.context import UnifiedShellMode @@ -156,14 +161,22 @@ def wrapper(ctx, *args, **kwargs): @session_injector @add_query_options_no_session(at_least_one=True) @logs_decorators -@click.pass_context -def logs(ctx, obj, how_far, grep, query): +def logs(obj, how_far, grep, query): log = get_logger("unified_shell.logs") log.info("getting logs") return logs_impl(obj, how_far, grep, query) +# # Flush +@click.command("flush") +@session_injector +@add_query_options_no_session(at_least_one=True) +@flush_decorators +def flush(obj, query, width): + return flush_impl(obj, query, width) + + #### DO NOT COMMIT # # Restart @@ -181,23 +194,6 @@ def logs(ctx, obj, how_far, grep, query): # obj.get_driver("process_manager").restart(session_query) -# # Flush - - -# @click.command("flush") -# @click.pass_obj -# @click.pass_context -# def flush(ctx, obj): -# """ -# Execute the process manager flush command, but only do this for the current -# session -# """ -# log = get_logger("unified_shell.flush") -# session_query = ProcessQuery(session=ctx.obj.session_name) -# log.info(f"Flushing session [green]{ctx.obj.session_name}[/]") -# obj.get_driver("process_manager").flush(session_query) - - # # Kill diff --git a/src/drunc/unified_shell/shell.py b/src/drunc/unified_shell/shell.py index a7183a461..df2aafe4e 100644 --- a/src/drunc/unified_shell/shell.py +++ b/src/drunc/unified_shell/shell.py @@ -43,7 +43,7 @@ validate_pm_config, ) from drunc.process_manager.interface.commands import ( - flush, + # flush, kill, # logs, # ps, @@ -52,15 +52,23 @@ ) from drunc.process_manager.interface.process_manager import run_pm from drunc.unified_shell.commands import ( - boot, - # flush, + boot, # TODO: double check. I bet you.. + flush, # kill, logs, - ps, + ps, # TODO: double check # restart, start_shell, terminate, + # FINISH THE REST NOW, IT SOULD BE RELATIVELYT STRAIGHTFORWARD ) + +#! Note with boot +# When you run boot in the process manager with nothing else running, it works just fine +# however when you have a running session already, boot asks 'are you sure you want to do this? +# i bet you this is causing the behaviour that we are seeing with the unified shell +# when you log into an empty PM with the US, and you start-run, it works just fine +# but when you start-run from scratch with a running session in the PM, it doesn't work from drunc.unified_shell.context import UnifiedShellMode from drunc.unified_shell.shell_utils import generate_fsm_sequence_command from drunc.utils.configuration import ConfTypes, OKSKey From 0dba1e2810646c055fc95d25b3c469c6a1f8fdb5 Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Mon, 11 May 2026 14:47:30 +0200 Subject: [PATCH 14/31] Finish porting pm commands --- .../process_manager/interface/commands.py | 45 ++++++++++++------- src/drunc/unified_shell/commands.py | 23 ++++++++++ src/drunc/unified_shell/shell.py | 23 +++++----- 3 files changed, 66 insertions(+), 25 deletions(-) diff --git a/src/drunc/process_manager/interface/commands.py b/src/drunc/process_manager/interface/commands.py index 9dc92d918..708f9a017 100644 --- a/src/drunc/process_manager/interface/commands.py +++ b/src/drunc/process_manager/interface/commands.py @@ -171,23 +171,34 @@ def terminate(obj: ProcessManagerContext, width: int | None) -> None: obj.delete_driver("controller") +def kill_decorators(f): + f = click.pass_obj(f) + f = click.option( + "-w", + "--width", + type=int, + default=None, + help="Table width. Default is automatically calculated", + )(f) + f = click.option( + "--crash", + is_flag=True, + default=False, + help="Simulate a crash: send SIGKILL without any cleanup, leaving the process manager in an unexpected-death state.", + )(f) + return f + + @click.command("kill") -@click.option( - "-w", - "--width", - type=int, - default=None, - help="Table width. Default is automatically calculated", -) @add_query_options(at_least_one=True) -@click.option( - "--crash", - is_flag=True, - default=False, - help="Simulate a crash: send SIGKILL without any cleanup, leaving the process manager in an unexpected-death state.", -) -@click.pass_obj -def kill(obj: ProcessManagerContext, query: ProcessQuery, width: int | None) -> None: +@kill_decorators +def kill(obj, query, width): + return kill_impl(obj, query, width) + + +def kill_impl( + obj: ProcessManagerContext, query: ProcessQuery, width: int | None +) -> None: log = get_logger("process_manager.shell") log.debug(f"Killing with query {query}") result = obj.get_driver("process_manager").kill(query) @@ -295,6 +306,10 @@ def logs_impl( @add_query_options(at_least_one=True) @click.pass_obj def restart(obj: ProcessManagerContext, query: ProcessQuery) -> None: + return restart_impl(obj, query) + + +def restart_impl(obj: ProcessManagerContext, query: ProcessQuery) -> None: log = get_logger("process_manager.shell") log.debug(f"Restarting with query {query}") obj.get_driver("process_manager").restart(query) diff --git a/src/drunc/unified_shell/commands.py b/src/drunc/unified_shell/commands.py index c3c8d5769..06032a528 100644 --- a/src/drunc/unified_shell/commands.py +++ b/src/drunc/unified_shell/commands.py @@ -11,8 +11,11 @@ from drunc.process_manager.interface.commands import ( flush_decorators, flush_impl, + kill_decorators, + kill_impl, logs_decorators, logs_impl, + restart_impl, ) from drunc.process_manager.interface.context import ProcessManagerContext from drunc.process_manager.utils import tabulate_process_instance_list @@ -168,6 +171,18 @@ def logs(obj, how_far, grep, query): return logs_impl(obj, how_far, grep, query) +# kill +@click.command("kill") +@session_injector +@add_query_options_no_session(at_least_one=True) +@kill_decorators +def kill(obj, query, width): + log = get_logger("unified_shell.logs") + log.info("getting logs") + + return kill_impl(obj, query, width) + + # # Flush @click.command("flush") @session_injector @@ -177,6 +192,14 @@ def flush(obj, query, width): return flush_impl(obj, query, width) +@click.command("restart") +@session_injector +@add_query_options_no_session(at_least_one=True) +@click.pass_obj +def restart(obj, query): + return restart_impl(obj, query) + + #### DO NOT COMMIT # # Restart diff --git a/src/drunc/unified_shell/shell.py b/src/drunc/unified_shell/shell.py index df2aafe4e..815075753 100644 --- a/src/drunc/unified_shell/shell.py +++ b/src/drunc/unified_shell/shell.py @@ -42,22 +42,23 @@ get_process_manager_configuration, validate_pm_config, ) -from drunc.process_manager.interface.commands import ( - # flush, - kill, - # logs, - # ps, - restart, - # terminate, -) + +# from drunc.process_manager.interface.commands import ( +# flush, +# kill, +# logs, +# ps, +# restart, +# terminate, +# ) from drunc.process_manager.interface.process_manager import run_pm from drunc.unified_shell.commands import ( boot, # TODO: double check. I bet you.. flush, - # kill, + kill, logs, ps, # TODO: double check - # restart, + restart, start_shell, terminate, # FINISH THE REST NOW, IT SOULD BE RELATIVELYT STRAIGHTFORWARD @@ -69,6 +70,8 @@ # i bet you this is causing the behaviour that we are seeing with the unified shell # when you log into an empty PM with the US, and you start-run, it works just fine # but when you start-run from scratch with a running session in the PM, it doesn't work +# Theres also a whole thing about things not flushing correctly.. +# Aha! in the pm, my terminate doesn't flush properly! from drunc.unified_shell.context import UnifiedShellMode from drunc.unified_shell.shell_utils import generate_fsm_sequence_command from drunc.utils.configuration import ConfTypes, OKSKey From 658c82885e19c0237441126f01fab61f833f8429 Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Mon, 11 May 2026 15:23:35 +0200 Subject: [PATCH 15/31] Add wait to PM cuz why not --- src/drunc/process_manager/interface/commands.py | 11 +++++++++++ src/drunc/process_manager/interface/shell.py | 2 ++ 2 files changed, 13 insertions(+) diff --git a/src/drunc/process_manager/interface/commands.py b/src/drunc/process_manager/interface/commands.py index 708f9a017..22865308f 100644 --- a/src/drunc/process_manager/interface/commands.py +++ b/src/drunc/process_manager/interface/commands.py @@ -1,4 +1,5 @@ import getpass +from time import sleep import click from druncschema.process_manager_pb2 import LogRequest, ProcessQuery @@ -150,6 +151,16 @@ def dummy_boot( return +@click.command("wait") +@click.argument("sleep_time", type=int, default=1) +@click.pass_obj +def wait(obj: ProcessManagerContext, sleep_time: int) -> None: + log = get_logger("process_manager.wait") + log.info(f"Command [green]wait[/green] running for {sleep_time} seconds.") + sleep(sleep_time) # seconds + log.info(f"Command [green]wait[/green] ran for {sleep_time} seconds.") + + @click.command("terminate") @click.option( "-w", diff --git a/src/drunc/process_manager/interface/shell.py b/src/drunc/process_manager/interface/shell.py index 5da7434c5..931f4f1e3 100644 --- a/src/drunc/process_manager/interface/shell.py +++ b/src/drunc/process_manager/interface/shell.py @@ -14,6 +14,7 @@ ps, restart, terminate, + wait, ) from drunc.utils.grpc_utils import ServerUnreachable from drunc.utils.utils import ( @@ -83,6 +84,7 @@ def cleanup(): ctx.call_on_close(cleanup) ctx.command.add_command(boot, "boot") + ctx.command.add_command(wait, "wait") ctx.command.add_command(terminate, "terminate") ctx.command.add_command(kill, "kill") ctx.command.add_command(flush, "flush") From c048f473369d8004977a9f66ae4f3483ed72e3f7 Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Mon, 11 May 2026 15:23:47 +0200 Subject: [PATCH 16/31] commment cleanup --- src/drunc/unified_shell/commands.py | 35 ----------------------------- 1 file changed, 35 deletions(-) diff --git a/src/drunc/unified_shell/commands.py b/src/drunc/unified_shell/commands.py index 06032a528..919ce3a97 100644 --- a/src/drunc/unified_shell/commands.py +++ b/src/drunc/unified_shell/commands.py @@ -200,45 +200,10 @@ def restart(obj, query): return restart_impl(obj, query) -#### DO NOT COMMIT - -# # Restart -# @click.command("restart") -# @click.pass_obj -# @click.pass_context -# def restart(ctx, obj): -# """ -# Execute the process manager restart command, but only do this for the current -# session -# """ -# log = get_logger("unified_shell.restart") -# session_query = ProcessQuery(session=ctx.obj.session_name) -# log.info(f"Restarting session [green]{ctx.obj.session_name}[/]") -# obj.get_driver("process_manager").restart(session_query) - - -# # Kill - - -# @click.command("kill") -# @click.pass_obj -# @click.pass_context -# def kill(ctx, obj): -# """ -# Execute the process manager kill command, but only do this for the current -# session -# """ -# log = get_logger("unified_shell.kill") -# session_query = ProcessQuery(session=ctx.obj.session_name) -# log.info(f"Killing processes in session [green]{ctx.obj.session_name}[/]") -# obj.get_driver("process_manager").kill(session_query) - - # # Wait #also put it in the PM shell #! Note: i cant seem to do a start-run from no boot in the unified shell.. might have to check if this is related somehow #! I also cannot do ps? -#### /DO NOT COMMIT @click.command("start-shell") From 435d3a022cd772160d3351e87a4b30713059451d Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Mon, 11 May 2026 17:45:38 +0200 Subject: [PATCH 17/31] First prototype of sending message over --- src/drunc/process_manager/process_manager.py | 41 +++++++++++++++---- .../process_manager/process_manager_driver.py | 24 +++++++++-- .../process_manager/ssh_process_manager.py | 17 ++++++-- src/drunc/unified_shell/shell.py | 2 +- .../process_manager_mock_impls.py | 8 ++++ 5 files changed, 76 insertions(+), 16 deletions(-) diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index ec04302f4..3cb73ed42 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -16,13 +16,17 @@ ProcessInstance, ProcessInstanceList, ProcessQuery, + GenericNotificationMessage, + PMResponseFlag, + PMmsg, ) from druncschema.process_manager_pb2_grpc import ProcessManagerServicer from druncschema.request_response_pb2 import ( Request, ResponseFlag, ) -from google.protobuf.empty_pb2 import Empty + +# Note: send_msg now returns PMResponseFlag (defined in process_manager.proto) from google.rpc import code_pb2 from grpc import ServicerContext @@ -562,32 +566,53 @@ def logs(self, request: LogRequest, context: ServicerContext) -> LogLines: return response @abc.abstractmethod - def _send_random_impl(self) -> Empty: + def _send_msg_impl(self, msg: str | None = None) -> PMmsg: raise NotImplementedError @broadcasted @authentified_and_authorised( action=ActionType.READ, system=SystemType.PROCESS_MANAGER ) - def send_random(self, request: Request, context: ServicerContext) -> Empty: - self.log.debug(f"{self.name} running send_random") + def send_msg(self, request: Request, context: ServicerContext) -> PMmsg: + self.log.debug(f"{self.name} running send_msg") + # Try to extract an optional GenericNotificationMessage from request.data + msg_value = None + try: + if ( + request is not None + and hasattr(request, "data") + and request.data is not None + ): + gm = GenericNotificationMessage() + try: + request.data.Unpack(gm) + msg_value = gm.message + except Exception: + # If unpacking fails, ignore and proceed with None + msg_value = None + + except Exception as e: + self.log.debug( + f"Error while extracting send_msg payload: {e}", exc_info=True + ) try: - response = self._send_random_impl() + response = self._send_msg_impl(msg_value) except NotImplementedError: raise DruncNotImplementedException( message="Implementation missing", - domain="ProcessManager.send_random", + domain="ProcessManager.send_msg", ) except Exception as e: - context_msg = f"Unhandled exception in ProcessManager.send_random: {e}" + context_msg = f"Unhandled exception in ProcessManager.send_msg: {e}" self.log.exception(context_msg) raise DruncCommandException( message=context_msg, - domain="ProcessManager.send_random", + domain="ProcessManager.send_msg", ) + # Expect a PMResponseFlag enum instance return response def _ensure_one_process( diff --git a/src/drunc/process_manager/process_manager_driver.py b/src/drunc/process_manager/process_manager_driver.py index e8e7619b4..20d6431d7 100644 --- a/src/drunc/process_manager/process_manager_driver.py +++ b/src/drunc/process_manager/process_manager_driver.py @@ -24,6 +24,7 @@ ProcessQuery, ProcessRestriction, ) +from druncschema.process_manager_pb2 import GenericNotificationMessage from druncschema.process_manager_pb2_grpc import ProcessManagerStub from druncschema.request_response_pb2 import Request, ResponseFlag from druncschema.token_pb2 import Token @@ -88,12 +89,20 @@ def close(self) -> None: # ----- Boot workflow ----- - def send_random(self): + def send_msg(self, msg): request = Request(token=copy_token(self.token)) + # Pack provided message into the Any `data` field when present + if msg is not None: + try: + gm = GenericNotificationMessage(message=str(msg)) + request.data.Pack(gm) + except Exception: + self.log.debug("Failed to pack send_msg payload", exc_info=True) + timeout = 10 try: - self.stub.send_random(request, timeout=timeout) + response = self.stub.send_msg(request, timeout=timeout) except grpc.RpcError as e: try: error_details = extract_grpc_rich_error(e) @@ -104,8 +113,17 @@ def send_random(self): exc_info=True, ) handle_grpc_error(e) + # Log the message that was sent (fall back to previous text) + try: + self.log.critical(str(msg) if msg is not None else "Hello, world!") + except Exception: + self.log.critical("Hello, world!") - self.log.critical("Hello, world!") + # Return the PMResponseFlag from the server (if any) + try: + return response + except UnboundLocalError: + return None def boot( self, diff --git a/src/drunc/process_manager/ssh_process_manager.py b/src/drunc/process_manager/ssh_process_manager.py index c6dc5ec2a..0460b6dce 100644 --- a/src/drunc/process_manager/ssh_process_manager.py +++ b/src/drunc/process_manager/ssh_process_manager.py @@ -16,7 +16,7 @@ ProcessUUID, ) from druncschema.request_response_pb2 import ResponseFlag -from google.protobuf.empty_pb2 import Empty +from druncschema.process_manager_pb2 import PMResponseFlag, PMmsg from drunc.exceptions import DruncCommandException from drunc.process_manager.process_manager import ProcessManager @@ -491,9 +491,18 @@ def _ps_impl(self, query: ProcessQuery) -> ProcessInstanceList: # self.log.warning(ret) return ret_fmt - def _send_random_impl(self) -> Empty: - self.log.critical("Hello, worldsssear!") - return Empty() + def _send_msg_impl(self, msg: str | None = None) -> PMmsg: + # If a custom message was provided, log it. Otherwise keep legacy text. + try: + if msg: + self.log.critical(msg) + else: + self.log.critical("Hello, worldsssear!") + except Exception: + self.log.critical("Omigosh an exception!") + return PMmsg(flag=PMResponseFlag.FAIL) + + return PMmsg(flag=PMResponseFlag.SUCCESS) def _boot_impl(self, boot_request: BootRequest) -> ProcessInstanceList: self.log.debug(f"{self.name} running boot command") diff --git a/src/drunc/unified_shell/shell.py b/src/drunc/unified_shell/shell.py index 815075753..a37df72f1 100644 --- a/src/drunc/unified_shell/shell.py +++ b/src/drunc/unified_shell/shell.py @@ -342,7 +342,7 @@ def unified_shell( #! So now we have a working get_driver object that can communicate with the pm. # lets try sending a random command.. - ctx.obj.get_driver("process_manager").send_random() + ctx.obj.get_driver("process_manager").send_msg("Yolo") # Broadcasting configuration if requested if desc.HasField("broadcast"): diff --git a/tests/process_manager/process_manager_mock_impls.py b/tests/process_manager/process_manager_mock_impls.py index b9b84cf23..2950d42e9 100644 --- a/tests/process_manager/process_manager_mock_impls.py +++ b/tests/process_manager/process_manager_mock_impls.py @@ -16,6 +16,7 @@ ProcessInstanceList, ProcessQuery, ) +from druncschema.process_manager_pb2 import PMResponseFlag, PMmsg from drunc.process_manager.configuration import ( ProcessManagerConfHandler, @@ -102,3 +103,10 @@ def _logs_impl(self, log_request: LogRequest) -> LogLines: def _flush_impl(self, query: ProcessQuery) -> ProcessInstanceList: return self._not_implemented_response() + + def _send_msg_impl(self, msg: str | None = None) -> PMmsg: + """ + Returns an empty response to indicate communication is working. + Accepts an optional message parameter for compatibility with new API. + """ + return PMmsg(flag=PMResponseFlag.SUCCESS) From 2508d9949cefcaf8fa43304e8c23d4d92c4e6658 Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Tue, 12 May 2026 11:14:49 +0200 Subject: [PATCH 18/31] Working peer model? --- src/drunc/process_manager/process_manager.py | 18 +++++++++++++----- .../process_manager/process_manager_driver.py | 13 ++++++++----- .../process_manager/ssh_process_manager.py | 11 ++++++++--- src/drunc/unified_shell/shell.py | 2 +- .../process_manager_mock_impls.py | 5 +++-- 5 files changed, 33 insertions(+), 16 deletions(-) diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index 3cb73ed42..f39abdddc 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -11,14 +11,13 @@ from druncschema.opmon.process_manager_pb2 import ProcessStatus from druncschema.process_manager_pb2 import ( BootRequest, + GenericNotificationMessage, LogLines, LogRequest, + PMmsg, ProcessInstance, ProcessInstanceList, ProcessQuery, - GenericNotificationMessage, - PMResponseFlag, - PMmsg, ) from druncschema.process_manager_pb2_grpc import ProcessManagerServicer from druncschema.request_response_pb2 import ( @@ -566,7 +565,7 @@ def logs(self, request: LogRequest, context: ServicerContext) -> LogLines: return response @abc.abstractmethod - def _send_msg_impl(self, msg: str | None = None) -> PMmsg: + def _send_msg_impl(self, msg: str | None = None, peer: str | None = None) -> PMmsg: raise NotImplementedError @broadcasted @@ -575,6 +574,15 @@ def _send_msg_impl(self, msg: str | None = None) -> PMmsg: ) def send_msg(self, request: Request, context: ServicerContext) -> PMmsg: self.log.debug(f"{self.name} running send_msg") + peer = None + try: + peer = context.peer() + except Exception: + self.log.debug("Could not determine caller peer", exc_info=True) + + if peer: + self.log.info(f"{self.name} send_msg called from {peer}") + # Try to extract an optional GenericNotificationMessage from request.data msg_value = None try: @@ -597,7 +605,7 @@ def send_msg(self, request: Request, context: ServicerContext) -> PMmsg: ) try: - response = self._send_msg_impl(msg_value) + response = self._send_msg_impl(msg_value, peer) except NotImplementedError: raise DruncNotImplementedException( message="Implementation missing", diff --git a/src/drunc/process_manager/process_manager_driver.py b/src/drunc/process_manager/process_manager_driver.py index 20d6431d7..4e8c06ffe 100644 --- a/src/drunc/process_manager/process_manager_driver.py +++ b/src/drunc/process_manager/process_manager_driver.py @@ -16,6 +16,7 @@ from druncschema.description_pb2 import Description from druncschema.process_manager_pb2 import ( BootRequest, + GenericNotificationMessage, LogLines, LogRequest, ProcessDescription, @@ -24,7 +25,6 @@ ProcessQuery, ProcessRestriction, ) -from druncschema.process_manager_pb2 import GenericNotificationMessage from druncschema.process_manager_pb2_grpc import ProcessManagerStub from druncschema.request_response_pb2 import Request, ResponseFlag from druncschema.token_pb2 import Token @@ -87,8 +87,6 @@ def close(self) -> None: except Exception as e: self.log.error(f"Error closing gRPC channel: {e}", exc_info=True) - # ----- Boot workflow ----- - def send_msg(self, msg): request = Request(token=copy_token(self.token)) # Pack provided message into the Any `data` field when present @@ -113,11 +111,14 @@ def send_msg(self, msg): exc_info=True, ) handle_grpc_error(e) + # Log the message that was sent (fall back to previous text) try: - self.log.critical(str(msg) if msg is not None else "Hello, world!") + self.log.critical( + f"MSG: {msg}" + ) # TODO: Remove this if you dont want this here except Exception: - self.log.critical("Hello, world!") + self.log.critical("AN EXCEPTION! GASP") # Return the PMResponseFlag from the server (if any) try: @@ -125,6 +126,8 @@ def send_msg(self, msg): except UnboundLocalError: return None + # ----- Boot workflow ----- + def boot( self, conf_file: str, diff --git a/src/drunc/process_manager/ssh_process_manager.py b/src/drunc/process_manager/ssh_process_manager.py index 0460b6dce..282691a86 100644 --- a/src/drunc/process_manager/ssh_process_manager.py +++ b/src/drunc/process_manager/ssh_process_manager.py @@ -8,6 +8,8 @@ BootRequest, LogLines, LogRequest, + PMmsg, + PMResponseFlag, ProcessDescription, ProcessInstance, ProcessInstanceList, @@ -16,7 +18,6 @@ ProcessUUID, ) from druncschema.request_response_pb2 import ResponseFlag -from druncschema.process_manager_pb2 import PMResponseFlag, PMmsg from drunc.exceptions import DruncCommandException from drunc.process_manager.process_manager import ProcessManager @@ -491,11 +492,15 @@ def _ps_impl(self, query: ProcessQuery) -> ProcessInstanceList: # self.log.warning(ret) return ret_fmt - def _send_msg_impl(self, msg: str | None = None) -> PMmsg: + def _send_msg_impl(self, msg: str | None = None, peer: str | None = None) -> PMmsg: # If a custom message was provided, log it. Otherwise keep legacy text. + + # TODO: don't forget to do _something_ for the k8s instance as well try: + # if peer: + # self.log.info(f"send_msg originated from {peer}") if msg: - self.log.critical(msg) + self.log.critical(f"msg: {msg}, from:{peer}") else: self.log.critical("Hello, worldsssear!") except Exception: diff --git a/src/drunc/unified_shell/shell.py b/src/drunc/unified_shell/shell.py index a37df72f1..aaff843cf 100644 --- a/src/drunc/unified_shell/shell.py +++ b/src/drunc/unified_shell/shell.py @@ -342,7 +342,7 @@ def unified_shell( #! So now we have a working get_driver object that can communicate with the pm. # lets try sending a random command.. - ctx.obj.get_driver("process_manager").send_msg("Yolo") + ctx.obj.get_driver("process_manager").send_msg(f"{getpass.getuser()} connected!") # Broadcasting configuration if requested if desc.HasField("broadcast"): diff --git a/tests/process_manager/process_manager_mock_impls.py b/tests/process_manager/process_manager_mock_impls.py index 2950d42e9..2151806c1 100644 --- a/tests/process_manager/process_manager_mock_impls.py +++ b/tests/process_manager/process_manager_mock_impls.py @@ -13,10 +13,11 @@ BootRequest, LogLines, LogRequest, + PMmsg, + PMResponseFlag, ProcessInstanceList, ProcessQuery, ) -from druncschema.process_manager_pb2 import PMResponseFlag, PMmsg from drunc.process_manager.configuration import ( ProcessManagerConfHandler, @@ -104,7 +105,7 @@ def _logs_impl(self, log_request: LogRequest) -> LogLines: def _flush_impl(self, query: ProcessQuery) -> ProcessInstanceList: return self._not_implemented_response() - def _send_msg_impl(self, msg: str | None = None) -> PMmsg: + def _send_msg_impl(self, msg: str | None = None, peer: str | None = None) -> PMmsg: """ Returns an empty response to indicate communication is working. Accepts an optional message parameter for compatibility with new API. From d138c328cc3f1f6999dc36a2f1a2c552e3a56a77 Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Tue, 12 May 2026 12:40:15 +0200 Subject: [PATCH 19/31] Clean up grpc vibes --- src/drunc/process_manager/process_manager.py | 8 +++++--- .../process_manager/ssh_process_manager.py | 17 +++++------------ .../process_manager_mock_impls.py | 9 +++++---- 3 files changed, 15 insertions(+), 19 deletions(-) diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index f39abdddc..754bd8042 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -8,13 +8,13 @@ from druncschema.authoriser_pb2 import ActionType, SystemType from druncschema.broadcast_pb2 import BroadcastType from druncschema.description_pb2 import CommandDescription, Description +from druncschema.generic_pb2 import OutcomeStatus from druncschema.opmon.process_manager_pb2 import ProcessStatus from druncschema.process_manager_pb2 import ( BootRequest, GenericNotificationMessage, LogLines, LogRequest, - PMmsg, ProcessInstance, ProcessInstanceList, ProcessQuery, @@ -565,14 +565,16 @@ def logs(self, request: LogRequest, context: ServicerContext) -> LogLines: return response @abc.abstractmethod - def _send_msg_impl(self, msg: str | None = None, peer: str | None = None) -> PMmsg: + def _send_msg_impl( + self, msg: str | None = None, peer: str | None = None + ) -> OutcomeStatus: raise NotImplementedError @broadcasted @authentified_and_authorised( action=ActionType.READ, system=SystemType.PROCESS_MANAGER ) - def send_msg(self, request: Request, context: ServicerContext) -> PMmsg: + def send_msg(self, request: Request, context: ServicerContext) -> OutcomeStatus: self.log.debug(f"{self.name} running send_msg") peer = None try: diff --git a/src/drunc/process_manager/ssh_process_manager.py b/src/drunc/process_manager/ssh_process_manager.py index 282691a86..f7f138c6f 100644 --- a/src/drunc/process_manager/ssh_process_manager.py +++ b/src/drunc/process_manager/ssh_process_manager.py @@ -4,12 +4,11 @@ from typing import List, Optional from druncschema.broadcast_pb2 import BroadcastType +from druncschema.generic_pb2 import OutcomeFlag, OutcomeStatus from druncschema.process_manager_pb2 import ( BootRequest, LogLines, LogRequest, - PMmsg, - PMResponseFlag, ProcessDescription, ProcessInstance, ProcessInstanceList, @@ -492,22 +491,16 @@ def _ps_impl(self, query: ProcessQuery) -> ProcessInstanceList: # self.log.warning(ret) return ret_fmt - def _send_msg_impl(self, msg: str | None = None, peer: str | None = None) -> PMmsg: + def _send_msg_impl(self, msg: str, peer: str) -> OutcomeStatus: # If a custom message was provided, log it. Otherwise keep legacy text. - # TODO: don't forget to do _something_ for the k8s instance as well try: - # if peer: - # self.log.info(f"send_msg originated from {peer}") - if msg: - self.log.critical(f"msg: {msg}, from:{peer}") - else: - self.log.critical("Hello, worldsssear!") + self.log.critical(f"{msg}; from {peer}") except Exception: self.log.critical("Omigosh an exception!") - return PMmsg(flag=PMResponseFlag.FAIL) + return OutcomeStatus(flag=OutcomeFlag.FAIL) - return PMmsg(flag=PMResponseFlag.SUCCESS) + return OutcomeStatus(flag=OutcomeFlag.SUCCESS) def _boot_impl(self, boot_request: BootRequest) -> ProcessInstanceList: self.log.debug(f"{self.name} running boot command") diff --git a/tests/process_manager/process_manager_mock_impls.py b/tests/process_manager/process_manager_mock_impls.py index 2151806c1..4b4aad7a6 100644 --- a/tests/process_manager/process_manager_mock_impls.py +++ b/tests/process_manager/process_manager_mock_impls.py @@ -9,12 +9,11 @@ from typing import Optional from unittest.mock import Mock +from druncschema.generic_pb2 import OutcomeFlag, OutcomeStatus from druncschema.process_manager_pb2 import ( BootRequest, LogLines, LogRequest, - PMmsg, - PMResponseFlag, ProcessInstanceList, ProcessQuery, ) @@ -105,9 +104,11 @@ def _logs_impl(self, log_request: LogRequest) -> LogLines: def _flush_impl(self, query: ProcessQuery) -> ProcessInstanceList: return self._not_implemented_response() - def _send_msg_impl(self, msg: str | None = None, peer: str | None = None) -> PMmsg: + def _send_msg_impl( + self, msg: str | None = None, peer: str | None = None + ) -> OutcomeStatus: """ Returns an empty response to indicate communication is working. Accepts an optional message parameter for compatibility with new API. """ - return PMmsg(flag=PMResponseFlag.SUCCESS) + return OutcomeStatus(flag=OutcomeFlag.SUCCESS) From ce1d81480ba364650163f5f1fd7a14768358d814 Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Tue, 12 May 2026 12:42:58 +0200 Subject: [PATCH 20/31] [untested] k8s impl --- src/drunc/process_manager/k8s_process_manager.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index 165f760b6..9c52ec1a5 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -13,6 +13,7 @@ # Local Application Imports from druncschema.broadcast_pb2 import BroadcastType +from druncschema.generic_pb2 import OutcomeFlag, OutcomeStatus from druncschema.process_manager_pb2 import ( BootRequest, LogLines, @@ -1910,6 +1911,17 @@ def _logs_impl(self, log_request: LogRequest) -> LogLines: lines=[f"Could not retrieve logs: {e.reason}"], ) + def _send_msg_impl(self, msg: str, peer: str) -> OutcomeStatus: + # If a custom message was provided, log it. Otherwise keep legacy text. + # TODO: don't forget to do _something_ for the k8s instance as well + try: + self.log.critical(f"{msg}; from {peer}") + except Exception: + self.log.critical("Omigosh an exception!") + return OutcomeStatus(flag=OutcomeFlag.FAIL) + + return OutcomeStatus(flag=OutcomeFlag.SUCCESS) + def _boot_impl(self, boot_request: BootRequest) -> ProcessInstanceList: """ Handles the 'boot' command from the gRPC interface. @@ -2516,7 +2528,9 @@ def kill_and_wait(uuids, grace_period=None) -> None: pods_by_role[role].append(uuid) if role == "segment-controller": tree_id = pod.metadata.labels.get(tree_id_label_key, "") - segment_controller_depths[uuid] = tree_id.count(".") if tree_id else 0 + segment_controller_depths[uuid] = ( + tree_id.count(".") if tree_id else 0 + ) # Kill in stages using our sorted lists for role in PROCESS_SHUTDOWN_ORDERING: From ce5d688c0c85220eba536eab270b466acd3c13c8 Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Tue, 12 May 2026 12:48:29 +0200 Subject: [PATCH 21/31] Add disconnect message --- src/drunc/unified_shell/shell.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/drunc/unified_shell/shell.py b/src/drunc/unified_shell/shell.py index aaff843cf..90a69c5e4 100644 --- a/src/drunc/unified_shell/shell.py +++ b/src/drunc/unified_shell/shell.py @@ -546,6 +546,9 @@ def cleanup(): ) # Remove the connection to the process manager + ctx.obj.get_driver("process_manager").send_msg( + f"{getpass.getuser()} disconnected!" + ) ctx.obj.get_driver("process_manager").close() ctx.obj.delete_driver("process_manager") From 5e083bfd92600a5eae9231e7704687a520db634c Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Tue, 12 May 2026 15:18:52 +0200 Subject: [PATCH 22/31] Fix #904 --- src/drunc/unified_shell/context.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/drunc/unified_shell/context.py b/src/drunc/unified_shell/context.py index 318983739..b7a682888 100644 --- a/src/drunc/unified_shell/context.py +++ b/src/drunc/unified_shell/context.py @@ -4,6 +4,7 @@ from druncschema.token_pb2 import Token from drunc.utils.shell_utils import ShellContext +from drunc.utils.utils import resolve_localhost_to_hostname class UnifiedShellMode(Enum): @@ -28,7 +29,7 @@ def __init__(self): super(UnifiedShellContext, self).__init__() def reset(self, address_pm: str = ""): - self.address_pm = address_pm + self.address_pm = resolve_localhost_to_hostname(address_pm) super(UnifiedShellContext, self)._reset(name="unified_shell") def create_drivers(self, **kwargs) -> Mapping[str, object]: From cc89b87a9d6f3402ef77457347cc7934554a89d1 Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Wed, 13 May 2026 12:59:23 +0200 Subject: [PATCH 23/31] add connect/disconnect in pm shell and say where from --- src/drunc/process_manager/interface/shell.py | 7 +++++++ src/drunc/unified_shell/shell.py | 6 ++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/drunc/process_manager/interface/shell.py b/src/drunc/process_manager/interface/shell.py index 931f4f1e3..6525113d9 100644 --- a/src/drunc/process_manager/interface/shell.py +++ b/src/drunc/process_manager/interface/shell.py @@ -60,6 +60,10 @@ def process_manager_shell(ctx, process_manager_address: str, log_level: str) -> # process_manager_shell_log.error(e.message) # TODO: Keep this for production branch, remove this from dev branch exit(1) + ctx.obj.get_driver("process_manager").send_msg( + f"{getpass.getuser()} connected from PM shell" + ) + # Manually add file handler to process manager log # Not possible to initialise logger immediately as it requires # knowledge of the log path @@ -76,6 +80,9 @@ def process_manager_shell(ctx, process_manager_address: str, log_level: str) -> ctx.obj.start_listening(desc.broadcast) def cleanup(): + ctx.obj.get_driver("process_manager").send_msg( + f"{getpass.getuser()} disconnected from PM shell" + ) ctx.obj.terminate() process_manager_log.warning( f"[green]{getpass.getuser()}[/green] disconnected from the process manager through a [green]drunc-process-manager-shell[/green]" diff --git a/src/drunc/unified_shell/shell.py b/src/drunc/unified_shell/shell.py index 90a69c5e4..fc3278f35 100644 --- a/src/drunc/unified_shell/shell.py +++ b/src/drunc/unified_shell/shell.py @@ -342,7 +342,9 @@ def unified_shell( #! So now we have a working get_driver object that can communicate with the pm. # lets try sending a random command.. - ctx.obj.get_driver("process_manager").send_msg(f"{getpass.getuser()} connected!") + ctx.obj.get_driver("process_manager").send_msg( + f"{getpass.getuser()} connected from unified shell" + ) # Broadcasting configuration if requested if desc.HasField("broadcast"): @@ -547,7 +549,7 @@ def cleanup(): # Remove the connection to the process manager ctx.obj.get_driver("process_manager").send_msg( - f"{getpass.getuser()} disconnected!" + f"{getpass.getuser()} disconnected from unified shell" ) ctx.obj.get_driver("process_manager").close() ctx.obj.delete_driver("process_manager") From 12e730254a4fc328c98048f952a3caef07452525 Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Wed, 13 May 2026 13:00:16 +0200 Subject: [PATCH 24/31] resolve hostname --- src/drunc/process_manager/process_manager.py | 46 +++++++++++++++++++- 1 file changed, 44 insertions(+), 2 deletions(-) diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index 754bd8042..6013fe830 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -1,5 +1,7 @@ import abc +import ipaddress import re +import socket import sys import threading import time @@ -583,7 +585,47 @@ def send_msg(self, request: Request, context: ServicerContext) -> OutcomeStatus: self.log.debug("Could not determine caller peer", exc_info=True) if peer: - self.log.info(f"{self.name} send_msg called from {peer}") + peer_display = peer + peer_match = re.match(r"^(?P[^:]+):(?P
.+)$", peer) + if peer_match: + # transport = peer_match.group("transport") + address = peer_match.group("address") + host = None + port = None + + # Handle both ipv4:host:port and ipv6:[host]:port formats. + bracket_match = re.match( + r"^\[(?P[^\]]+)\]:(?P\d+)$", address + ) + if bracket_match: + host = bracket_match.group("host") + port = bracket_match.group("port") + else: + host_port = address.rsplit(":", 1) + if len(host_port) == 2 and host_port[1].isdigit(): + host, port = host_port + + if host: + resolved_host = host + try: + ip_obj = ipaddress.ip_address(host) + try: + resolved_host, _, _ = socket.gethostbyaddr(str(ip_obj)) + except ( + socket.herror, + socket.gaierror, + socket.timeout, + OSError, + ): + resolved_host = host + except ValueError: + resolved_host = host + + if ":" in resolved_host and not resolved_host.startswith("["): + resolved_host = f"[{resolved_host}]" + peer_display = f"{resolved_host}:{port}" + + # self.log.info(f"{self.name} send_msg called from {peer_display}") # Try to extract an optional GenericNotificationMessage from request.data msg_value = None @@ -607,7 +649,7 @@ def send_msg(self, request: Request, context: ServicerContext) -> OutcomeStatus: ) try: - response = self._send_msg_impl(msg_value, peer) + response = self._send_msg_impl(msg_value, peer_display) except NotImplementedError: raise DruncNotImplementedException( message="Implementation missing", From a663093bfb1105cc91b0b67dd17b35db4d76569f Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Wed, 13 May 2026 17:02:46 +0200 Subject: [PATCH 25/31] Add basic ability to send over logs --- src/drunc/controller/interface/context.py | 2 ++ .../process_manager/interface/commands.py | 18 +++++++++++++++++- src/drunc/process_manager/interface/context.py | 2 ++ src/drunc/unified_shell/commands.py | 17 ++++++++++++++++- src/drunc/unified_shell/context.py | 2 ++ src/drunc/utils/shell_utils.py | 12 ++++++++++++ 6 files changed, 51 insertions(+), 2 deletions(-) diff --git a/src/drunc/controller/interface/context.py b/src/drunc/controller/interface/context.py index 082c42a00..c114756de 100644 --- a/src/drunc/controller/interface/context.py +++ b/src/drunc/controller/interface/context.py @@ -14,6 +14,8 @@ class ControllerContext(ShellContext): # boilerplatefest + shell_id = "controller_shell" + def __init__(self): self.status_receiver = None self.took_control = False diff --git a/src/drunc/process_manager/interface/commands.py b/src/drunc/process_manager/interface/commands.py index 22865308f..1e6c6e092 100644 --- a/src/drunc/process_manager/interface/commands.py +++ b/src/drunc/process_manager/interface/commands.py @@ -12,10 +12,17 @@ ) from drunc.process_manager.interface.context import ProcessManagerContext from drunc.process_manager.utils import tabulate_process_instance_list -from drunc.utils.shell_utils import InterruptedCommand +from drunc.utils.shell_utils import InterruptedCommand, send_cmd_msg_pm from drunc.utils.utils import get_logger +def log_cmd(obj: ProcessManagerContext): + ctx_test = click.get_current_context(silent=True) + cmd_name_test = ctx_test.command.name if ctx_test else None + send_cmd_msg_pm(obj, cmd_name_test) + #! TODO: Missing the queries!!! + + @click.command("boot") @click.option( "-u", @@ -44,6 +51,8 @@ def boot( override_logs: bool, ) -> None: log = get_logger("process_manager.shell") + log_cmd(obj) + processes = obj.get_driver("process_manager").ps(ProcessQuery(user=user)) if len(processes.values) > 0: @@ -130,6 +139,7 @@ def dummy_boot( session_name: str, ) -> None: log = get_logger("process_manager.shell") + log_cmd(obj) log.debug( f"Running dummy_boot with {n_processes} processes for {sleep} seconds {n_sleeps} times, requested by user {user}" ) @@ -172,6 +182,7 @@ def wait(obj: ProcessManagerContext, sleep_time: int) -> None: @click.pass_obj def terminate(obj: ProcessManagerContext, width: int | None) -> None: log = get_logger("process_manager.shell") + log_cmd(obj) log.debug("Terminating") result = obj.get_driver("process_manager").terminate() if not result: @@ -204,6 +215,7 @@ def kill_decorators(f): @add_query_options(at_least_one=True) @kill_decorators def kill(obj, query, width): + log_cmd(obj) return kill_impl(obj, query, width) @@ -236,6 +248,7 @@ def flush_decorators(f): @add_query_options(at_least_one=False, all_processes_by_default=True) @flush_decorators def flush(obj, query, width): + log_cmd(obj) return flush_impl(obj, query, width) @@ -272,6 +285,7 @@ def logs_decorators(f): @add_query_options(at_least_one=True) @logs_decorators def logs(obj, how_far, grep, query): + log_cmd(obj) return logs_impl(obj, how_far, grep, query) @@ -317,6 +331,7 @@ def logs_impl( @add_query_options(at_least_one=True) @click.pass_obj def restart(obj: ProcessManagerContext, query: ProcessQuery) -> None: + log_cmd(obj) return restart_impl(obj, query) @@ -351,6 +366,7 @@ def ps( width: int | None, ) -> None: log = get_logger("process_manager.shell") + log_cmd(obj) log.debug(f"Running ps with query {query}") results = obj.get_driver("process_manager").ps(query) if not results: diff --git a/src/drunc/process_manager/interface/context.py b/src/drunc/process_manager/interface/context.py index a81396130..a00400c51 100644 --- a/src/drunc/process_manager/interface/context.py +++ b/src/drunc/process_manager/interface/context.py @@ -14,6 +14,8 @@ class ProcessManagerContext(ShellContext): # boilerplatefest + shell_id = "process_manager_shell" + def __init__(self, *args, **kwargs): self.status_receiver = None super(ProcessManagerContext, self).__init__(*args, **kwargs) diff --git a/src/drunc/unified_shell/commands.py b/src/drunc/unified_shell/commands.py index 919ce3a97..6c58f24d8 100644 --- a/src/drunc/unified_shell/commands.py +++ b/src/drunc/unified_shell/commands.py @@ -20,10 +20,17 @@ from drunc.process_manager.interface.context import ProcessManagerContext from drunc.process_manager.utils import tabulate_process_instance_list from drunc.unified_shell.context import UnifiedShellMode -from drunc.utils.shell_utils import InterruptedCommand +from drunc.utils.shell_utils import InterruptedCommand, send_cmd_msg_pm from drunc.utils.utils import get_logger +def log_cmd(obj: ProcessManagerContext): + ctx_test = click.get_current_context(silent=True) + cmd_name_test = ctx_test.command.name if ctx_test else None + send_cmd_msg_pm(obj, cmd_name_test) + #! TODO: Missing the queries!!! + + @click.command("boot") @click.option( "-o/-no", @@ -44,6 +51,7 @@ def boot( sleep_between_app_boot: int | float = 0, ) -> None: log = get_logger("unified_shell.boot") + log_cmd(obj) session_name = obj.session_name user = getpass.getuser() processes = obj.get_driver("process_manager").ps( @@ -123,6 +131,7 @@ def terminate(ctx, obj): """ log = get_logger("unified_shell.terminate") + log_cmd(obj) session_query = ProcessQuery(session=ctx.obj.session_name) log.info(f"Terminating session [green]{ctx.obj.session_name}[/]") obj.get_driver("process_manager").kill(session_query) @@ -138,6 +147,7 @@ def ps(ctx, obj): """ log = get_logger("unified_shell.ps") + log_cmd(obj) session_query = ProcessQuery(session=ctx.obj.session_name) log.info(f"Listing session [green]{ctx.obj.session_name}[/]") results = obj.get_driver("process_manager").ps(session_query) @@ -167,6 +177,7 @@ def wrapper(ctx, *args, **kwargs): def logs(obj, how_far, grep, query): log = get_logger("unified_shell.logs") log.info("getting logs") + log_cmd(obj) return logs_impl(obj, how_far, grep, query) @@ -179,6 +190,7 @@ def logs(obj, how_far, grep, query): def kill(obj, query, width): log = get_logger("unified_shell.logs") log.info("getting logs") + log_cmd(obj) return kill_impl(obj, query, width) @@ -189,6 +201,7 @@ def kill(obj, query, width): @add_query_options_no_session(at_least_one=True) @flush_decorators def flush(obj, query, width): + log_cmd(obj) return flush_impl(obj, query, width) @@ -197,6 +210,7 @@ def flush(obj, query, width): @add_query_options_no_session(at_least_one=True) @click.pass_obj def restart(obj, query): + log_cmd(obj) return restart_impl(obj, query) @@ -217,6 +231,7 @@ def start_shell(ctx, obj): allowing you to execute commands interactively. """ log = get_logger("unified_shell.start_shell") + log_cmd(obj) obj.running_mode = UnifiedShellMode.SEMIBATCH log.info("Switching to interactive mode...") diff --git a/src/drunc/unified_shell/context.py b/src/drunc/unified_shell/context.py index b7a682888..9997326d0 100644 --- a/src/drunc/unified_shell/context.py +++ b/src/drunc/unified_shell/context.py @@ -14,6 +14,8 @@ class UnifiedShellMode(Enum): class UnifiedShellContext(ShellContext): # boilerplatefest + shell_id = "unified_shell" + def __init__(self): self.status_receiver_pm = None self.status_receiver_controller = None diff --git a/src/drunc/utils/shell_utils.py b/src/drunc/utils/shell_utils.py index d1443968a..5cea82c27 100644 --- a/src/drunc/utils/shell_utils.py +++ b/src/drunc/utils/shell_utils.py @@ -72,6 +72,11 @@ def __str__(self): class ShellContext: + shell_id = None + + def get_shell_id(self): + return self.shell_id + def _reset(self, name: str, token_args: dict = {}, driver_args: dict = {}): self._console = Console() self._token = self.create_token(**token_args) @@ -165,3 +170,10 @@ def print_status_summary(self) -> None: log.info( f"Current FSM status is [green]{current_state}[/green]. Available transitions are [green]{'[/green], [green]'.join(available_actions)}[/green]. Available sequence commands are [green]{'[/green], [green]'.join(available_sequences)}[/green]." ) + + +def send_cmd_msg_pm(sc: ShellContext, cmd: str): + # Keep it pm for now, easy to generalise though + sc.get_driver("process_manager").send_msg( + f" {getpass.getuser()} sent {cmd} from {sc.get_shell_id()}" + ) From d020839af05bc14520c80875fd3a08e0cb500f72 Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Wed, 13 May 2026 17:45:19 +0200 Subject: [PATCH 26/31] help fix #326 --- src/drunc/process_manager/configuration.py | 4 ++++ src/drunc/process_manager/interface/shell.py | 5 +++++ .../process_manager/k8s_process_manager.py | 3 +++ src/drunc/process_manager/process_manager.py | 11 +++++++++-- .../process_manager/ssh_process_manager.py | 5 +++++ src/drunc/unified_shell/shell.py | 17 ++++++++++++----- 6 files changed, 38 insertions(+), 7 deletions(-) diff --git a/src/drunc/process_manager/configuration.py b/src/drunc/process_manager/configuration.py index da9198722..8ae9b9c22 100644 --- a/src/drunc/process_manager/configuration.py +++ b/src/drunc/process_manager/configuration.py @@ -38,6 +38,10 @@ class ProcessManagerTypes(Enum): SSH_PARAMIKO = 3 +# we'll probably want to leveare these types +# and then we'll unpack to name + + class ProcessManagerConfData: def __init__(self): self.broadcaster = None diff --git a/src/drunc/process_manager/interface/shell.py b/src/drunc/process_manager/interface/shell.py index 6525113d9..e755b39f7 100644 --- a/src/drunc/process_manager/interface/shell.py +++ b/src/drunc/process_manager/interface/shell.py @@ -48,9 +48,12 @@ def process_manager_shell(ctx, process_manager_address: str, log_level: str) -> ) process_manager_shell_log = get_logger("process_manager.shell") + process_manager_shell_log.warning(f"{process_manager_address=}") + ctx.obj.reset(address=process_manager_address) try: + process_manager_log.critical("About to run describe") desc = ctx.obj.get_driver("process_manager").describe() except ServerUnreachable as e: process_manager_shell_log.critical("Could not connect to the process manager") @@ -76,6 +79,8 @@ def process_manager_shell(ctx, process_manager_address: str, log_level: str) -> process_manager_shell_log.info( f"Connected to {process_manager_address}, running '{desc.name}.{desc.session}' (name.session), starting listening..." ) + + process_manager_shell_log.error(f"This is a test {desc.name}, {desc.type}, {desc}") if desc.HasField("broadcast"): ctx.obj.start_listening(desc.broadcast) diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index 9c52ec1a5..c2456343f 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -40,6 +40,7 @@ from drunc.process_manager.configuration import ( PROCESS_SHUTDOWN_ORDERING, ProcessManagerConfHandler, + ProcessManagerTypes, ) from drunc.process_manager.process_manager import ProcessManager from drunc.process_manager.utils import on_parent_exit, validate_k8s_session_name @@ -153,6 +154,8 @@ def run(self) -> None: class K8sProcessManager(ProcessManager): + pm_type = ProcessManagerTypes.K8s + def __init__(self, configuration: ProcessManagerConfHandler, **kwargs) -> None: """ Manages processes as Kubernetes Pods. diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index 6013fe830..56ba36613 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -55,6 +55,8 @@ def __init__(self, txt): class ProcessManager(abc.ABC, ProcessManagerServicer): + pm_type = ProcessManagerTypes.Unknown + def __init__( self, configuration: ProcessManagerConfHandler, @@ -502,10 +504,15 @@ def flush( action=ActionType.READ, system=SystemType.PROCESS_MANAGER ) # 2nd step def describe(self, request: Request, context: ServicerContext) -> Description: - self.log.debug(f"{self.name} running describe") + self.log.warning(f"{self.name} running describe") + + # response = self._describe_impl() response = Description( - type="process_manager", + type=self.pm_type.name, # change the type based on what it reads as. ITS AN ENUM! + # try to get the superclass class name + # hook into the configuration, so you get the class name from the enum + # so we want this to be a string (has to be string) name=self.name, info=self.get_log_path(), session="no_session" if not self.session else self.session, diff --git a/src/drunc/process_manager/ssh_process_manager.py b/src/drunc/process_manager/ssh_process_manager.py index f7f138c6f..127256926 100644 --- a/src/drunc/process_manager/ssh_process_manager.py +++ b/src/drunc/process_manager/ssh_process_manager.py @@ -19,11 +19,14 @@ from druncschema.request_response_pb2 import ResponseFlag from drunc.exceptions import DruncCommandException +from drunc.process_manager.configuration import ProcessManagerTypes from drunc.process_manager.process_manager import ProcessManager from drunc.processes.ssh_process_lifetime_manager import ProcessLifetimeManager class SSHProcessManager(ProcessManager): + pm_type = ProcessManagerTypes.SSH_SHELL + def __init__( self, configuration, LifetimeManagerClass: ProcessLifetimeManager, **kwargs ): @@ -226,6 +229,8 @@ def _terminate_impl(self) -> ProcessInstanceList: flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) + # def _describe_impl() + def _logs_impl(self, log_request: LogRequest) -> LogLines: """ Retrieve log output from a remote process. diff --git a/src/drunc/unified_shell/shell.py b/src/drunc/unified_shell/shell.py index fc3278f35..6ecb99cb3 100644 --- a/src/drunc/unified_shell/shell.py +++ b/src/drunc/unified_shell/shell.py @@ -293,10 +293,6 @@ def unified_shell( process_manager_address = process_manager.replace( "grpc://", "" ) # remove the grpc scheme - unified_shell_log.info( - f"[green]unified_shell[/green] connected to the [green]process_manager" - f"[/green] at address [green]{process_manager_address}[/green]" - ) unified_shell_log.debug( f"[green]process_manager[/green] started, communicating through address [green]" @@ -304,6 +300,8 @@ def unified_shell( ) #! This is where the context objects connection detail + + unified_shell_log.warning(f"{process_manager_address=}") ctx.obj.reset(address_pm=process_manager_address) # Run a simple command (describe) to check the connection with the process manager @@ -312,13 +310,16 @@ def unified_shell( #! Here we're getting the driver! try: + unified_shell_log.critical("About to run describe") desc = ctx.obj.get_driver().describe() except Exception as e: unified_shell_log.error( f"[red]Could not connect to the process manager at the address: [/red]" f"[green]{process_manager_address}[/green]" ) - unified_shell_log.debug(f"Reason: {e}") + unified_shell_log.critical(f"Reason: {e}") + + #! Basically also parse out the type and log it around here as well if type(e) == ServerUnreachable: unified_shell_log.error( @@ -342,6 +343,12 @@ def unified_shell( #! So now we have a working get_driver object that can communicate with the pm. # lets try sending a random command.. + + unified_shell_log.info( + f"[green]unified_shell[/green] connected to the [green]process_manager" # this is bad + f"[/green] at address [green]{process_manager_address}[/green]" + ) + ctx.obj.get_driver("process_manager").send_msg( f"{getpass.getuser()} connected from unified shell" ) From aa8b514f4bbc005e35e9d3fd77fa97234f349070 Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Thu, 14 May 2026 10:35:04 +0200 Subject: [PATCH 27/31] Add queries --- .../process_manager/interface/commands.py | 25 ++++++---------- src/drunc/unified_shell/commands.py | 29 +++++++++---------- src/drunc/utils/shell_utils.py | 18 ++++++++---- 3 files changed, 36 insertions(+), 36 deletions(-) diff --git a/src/drunc/process_manager/interface/commands.py b/src/drunc/process_manager/interface/commands.py index 1e6c6e092..6708f15bc 100644 --- a/src/drunc/process_manager/interface/commands.py +++ b/src/drunc/process_manager/interface/commands.py @@ -12,17 +12,10 @@ ) from drunc.process_manager.interface.context import ProcessManagerContext from drunc.process_manager.utils import tabulate_process_instance_list -from drunc.utils.shell_utils import InterruptedCommand, send_cmd_msg_pm +from drunc.utils.shell_utils import InterruptedCommand, log_pm_cmd from drunc.utils.utils import get_logger -def log_cmd(obj: ProcessManagerContext): - ctx_test = click.get_current_context(silent=True) - cmd_name_test = ctx_test.command.name if ctx_test else None - send_cmd_msg_pm(obj, cmd_name_test) - #! TODO: Missing the queries!!! - - @click.command("boot") @click.option( "-u", @@ -51,7 +44,7 @@ def boot( override_logs: bool, ) -> None: log = get_logger("process_manager.shell") - log_cmd(obj) + log_pm_cmd(obj) processes = obj.get_driver("process_manager").ps(ProcessQuery(user=user)) @@ -139,7 +132,7 @@ def dummy_boot( session_name: str, ) -> None: log = get_logger("process_manager.shell") - log_cmd(obj) + log_pm_cmd(obj) log.debug( f"Running dummy_boot with {n_processes} processes for {sleep} seconds {n_sleeps} times, requested by user {user}" ) @@ -182,7 +175,7 @@ def wait(obj: ProcessManagerContext, sleep_time: int) -> None: @click.pass_obj def terminate(obj: ProcessManagerContext, width: int | None) -> None: log = get_logger("process_manager.shell") - log_cmd(obj) + log_pm_cmd(obj) log.debug("Terminating") result = obj.get_driver("process_manager").terminate() if not result: @@ -215,7 +208,7 @@ def kill_decorators(f): @add_query_options(at_least_one=True) @kill_decorators def kill(obj, query, width): - log_cmd(obj) + log_pm_cmd(obj) return kill_impl(obj, query, width) @@ -248,7 +241,7 @@ def flush_decorators(f): @add_query_options(at_least_one=False, all_processes_by_default=True) @flush_decorators def flush(obj, query, width): - log_cmd(obj) + log_pm_cmd(obj) return flush_impl(obj, query, width) @@ -285,7 +278,7 @@ def logs_decorators(f): @add_query_options(at_least_one=True) @logs_decorators def logs(obj, how_far, grep, query): - log_cmd(obj) + log_pm_cmd(obj) return logs_impl(obj, how_far, grep, query) @@ -331,7 +324,7 @@ def logs_impl( @add_query_options(at_least_one=True) @click.pass_obj def restart(obj: ProcessManagerContext, query: ProcessQuery) -> None: - log_cmd(obj) + log_pm_cmd(obj) return restart_impl(obj, query) @@ -366,7 +359,7 @@ def ps( width: int | None, ) -> None: log = get_logger("process_manager.shell") - log_cmd(obj) + log_pm_cmd(obj) log.debug(f"Running ps with query {query}") results = obj.get_driver("process_manager").ps(query) if not results: diff --git a/src/drunc/unified_shell/commands.py b/src/drunc/unified_shell/commands.py index 6c58f24d8..d72c28aea 100644 --- a/src/drunc/unified_shell/commands.py +++ b/src/drunc/unified_shell/commands.py @@ -20,15 +20,14 @@ from drunc.process_manager.interface.context import ProcessManagerContext from drunc.process_manager.utils import tabulate_process_instance_list from drunc.unified_shell.context import UnifiedShellMode -from drunc.utils.shell_utils import InterruptedCommand, send_cmd_msg_pm +from drunc.utils.shell_utils import InterruptedCommand, log_pm_cmd from drunc.utils.utils import get_logger - -def log_cmd(obj: ProcessManagerContext): - ctx_test = click.get_current_context(silent=True) - cmd_name_test = ctx_test.command.name if ctx_test else None - send_cmd_msg_pm(obj, cmd_name_test) - #! TODO: Missing the queries!!! +# def log_pm_cmd(obj: ProcessManagerContext): +# ctx_test = click.get_current_context(silent=True) +# cmd_name_test = ctx_test.command.name if ctx_test else None +# send_cmd_msg_pm(obj, cmd_name_test) +# #! TODO: Missing the queries!!! @click.command("boot") @@ -51,7 +50,7 @@ def boot( sleep_between_app_boot: int | float = 0, ) -> None: log = get_logger("unified_shell.boot") - log_cmd(obj) + log_pm_cmd(obj) session_name = obj.session_name user = getpass.getuser() processes = obj.get_driver("process_manager").ps( @@ -131,7 +130,7 @@ def terminate(ctx, obj): """ log = get_logger("unified_shell.terminate") - log_cmd(obj) + log_pm_cmd(obj) session_query = ProcessQuery(session=ctx.obj.session_name) log.info(f"Terminating session [green]{ctx.obj.session_name}[/]") obj.get_driver("process_manager").kill(session_query) @@ -147,7 +146,7 @@ def ps(ctx, obj): """ log = get_logger("unified_shell.ps") - log_cmd(obj) + log_pm_cmd(obj) session_query = ProcessQuery(session=ctx.obj.session_name) log.info(f"Listing session [green]{ctx.obj.session_name}[/]") results = obj.get_driver("process_manager").ps(session_query) @@ -177,7 +176,7 @@ def wrapper(ctx, *args, **kwargs): def logs(obj, how_far, grep, query): log = get_logger("unified_shell.logs") log.info("getting logs") - log_cmd(obj) + log_pm_cmd(obj) return logs_impl(obj, how_far, grep, query) @@ -190,7 +189,7 @@ def logs(obj, how_far, grep, query): def kill(obj, query, width): log = get_logger("unified_shell.logs") log.info("getting logs") - log_cmd(obj) + log_pm_cmd(obj) return kill_impl(obj, query, width) @@ -201,7 +200,7 @@ def kill(obj, query, width): @add_query_options_no_session(at_least_one=True) @flush_decorators def flush(obj, query, width): - log_cmd(obj) + log_pm_cmd(obj) return flush_impl(obj, query, width) @@ -210,7 +209,7 @@ def flush(obj, query, width): @add_query_options_no_session(at_least_one=True) @click.pass_obj def restart(obj, query): - log_cmd(obj) + log_pm_cmd(obj) return restart_impl(obj, query) @@ -231,7 +230,7 @@ def start_shell(ctx, obj): allowing you to execute commands interactively. """ log = get_logger("unified_shell.start_shell") - log_cmd(obj) + log_pm_cmd(obj) obj.running_mode = UnifiedShellMode.SEMIBATCH log.info("Switching to interactive mode...") diff --git a/src/drunc/utils/shell_utils.py b/src/drunc/utils/shell_utils.py index 5cea82c27..d737c312c 100644 --- a/src/drunc/utils/shell_utils.py +++ b/src/drunc/utils/shell_utils.py @@ -172,8 +172,16 @@ def print_status_summary(self) -> None: ) -def send_cmd_msg_pm(sc: ShellContext, cmd: str): - # Keep it pm for now, easy to generalise though - sc.get_driver("process_manager").send_msg( - f" {getpass.getuser()} sent {cmd} from {sc.get_shell_id()}" - ) +def log_pm_cmd(obj: ShellContext): + ctx_cmd = click.get_current_context(silent=True) + cmd_name = ctx_cmd.command.name if ctx_cmd else None + parms_dict = {} + for param in ctx_cmd.command.params: + name = param.name + if ctx_cmd.get_parameter_source(name) == click.core.ParameterSource.COMMANDLINE: + parms_dict[name] = f"{ctx_cmd.params[name]!r}" + + args = f" with arguments {parms_dict}" if parms_dict else "" + session = f" for session {obj.session_name}" if hasattr(obj, "session_name") else "" + msg = f"{getpass.getuser()} sent {cmd_name}{args}{session} via {obj.get_shell_id()}" + obj.get_driver("process_manager").send_msg(msg) From 424737817466c5c2c5fd2822992686c037c6ecef Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Thu, 14 May 2026 11:11:01 +0200 Subject: [PATCH 28/31] Clean up send_msg logic --- src/drunc/process_manager/process_manager.py | 78 +++---------------- src/drunc/utils/utils.py | 81 ++++++++++++++++++++ 2 files changed, 92 insertions(+), 67 deletions(-) diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index 56ba36613..ee7d6d712 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -1,7 +1,5 @@ import abc -import ipaddress import re -import socket import sys import threading import time @@ -26,8 +24,6 @@ Request, ResponseFlag, ) - -# Note: send_msg now returns PMResponseFlag (defined in process_manager.proto) from google.rpc import code_pb2 from grpc import ServicerContext @@ -46,7 +42,7 @@ ProcessManagerTypes, ) from drunc.utils.configuration import ConfTypes -from drunc.utils.utils import get_logger, pid_info_str +from drunc.utils.utils import get_logger, pid_info_str, resolve_context_peer class BadQuery(DruncCommandException): @@ -55,7 +51,7 @@ def __init__(self, txt): class ProcessManager(abc.ABC, ProcessManagerServicer): - pm_type = ProcessManagerTypes.Unknown + pm_type = ProcessManagerTypes.Unknown # Used for describe (and possibly others) def __init__( self, @@ -504,15 +500,10 @@ def flush( action=ActionType.READ, system=SystemType.PROCESS_MANAGER ) # 2nd step def describe(self, request: Request, context: ServicerContext) -> Description: - self.log.warning(f"{self.name} running describe") - - # response = self._describe_impl() + self.log.debug(f"{self.name} running describe") response = Description( - type=self.pm_type.name, # change the type based on what it reads as. ITS AN ENUM! - # try to get the superclass class name - # hook into the configuration, so you get the class name from the enum - # so we want this to be a string (has to be string) + type=self.pm_type.name, name=self.name, info=self.get_log_path(), session="no_session" if not self.session else self.session, @@ -585,57 +576,15 @@ def _send_msg_impl( ) def send_msg(self, request: Request, context: ServicerContext) -> OutcomeStatus: self.log.debug(f"{self.name} running send_msg") - peer = None + try: peer = context.peer() + peer_display = resolve_context_peer(peer) except Exception: - self.log.debug("Could not determine caller peer", exc_info=True) - - if peer: - peer_display = peer - peer_match = re.match(r"^(?P[^:]+):(?P
.+)$", peer) - if peer_match: - # transport = peer_match.group("transport") - address = peer_match.group("address") - host = None - port = None - - # Handle both ipv4:host:port and ipv6:[host]:port formats. - bracket_match = re.match( - r"^\[(?P[^\]]+)\]:(?P\d+)$", address - ) - if bracket_match: - host = bracket_match.group("host") - port = bracket_match.group("port") - else: - host_port = address.rsplit(":", 1) - if len(host_port) == 2 and host_port[1].isdigit(): - host, port = host_port - - if host: - resolved_host = host - try: - ip_obj = ipaddress.ip_address(host) - try: - resolved_host, _, _ = socket.gethostbyaddr(str(ip_obj)) - except ( - socket.herror, - socket.gaierror, - socket.timeout, - OSError, - ): - resolved_host = host - except ValueError: - resolved_host = host - - if ":" in resolved_host and not resolved_host.startswith("["): - resolved_host = f"[{resolved_host}]" - peer_display = f"{resolved_host}:{port}" - - # self.log.info(f"{self.name} send_msg called from {peer_display}") + self.log.warning("Could not determine caller peer", exc_info=True) + peer_display = "unknown" # Try to extract an optional GenericNotificationMessage from request.data - msg_value = None try: if ( request is not None @@ -643,17 +592,13 @@ def send_msg(self, request: Request, context: ServicerContext) -> OutcomeStatus: and request.data is not None ): gm = GenericNotificationMessage() - try: - request.data.Unpack(gm) - msg_value = gm.message - except Exception: - # If unpacking fails, ignore and proceed with None - msg_value = None - + request.data.Unpack(gm) + msg_value = gm.message except Exception as e: self.log.debug( f"Error while extracting send_msg payload: {e}", exc_info=True ) + msg_value = "unknown payload" try: response = self._send_msg_impl(msg_value, peer_display) @@ -671,7 +616,6 @@ def send_msg(self, request: Request, context: ServicerContext) -> OutcomeStatus: domain="ProcessManager.send_msg", ) - # Expect a PMResponseFlag enum instance return response def _ensure_one_process( diff --git a/src/drunc/utils/utils.py b/src/drunc/utils/utils.py index 08d085670..5ef464639 100644 --- a/src/drunc/utils/utils.py +++ b/src/drunc/utils/utils.py @@ -1,4 +1,5 @@ import ctypes +import ipaddress import logging import os import random @@ -463,6 +464,86 @@ def resolve_target_ip(host: str) -> str | None: return None +def resolve_context_peer(peer: str) -> str: + """Resolve a transport-qualified peer string to a display-friendly address. + + The input is expected to look like ``transport:address``. If the address contains + an IP literal, it is reverse-resolved where possible and IPv6 addresses are + re-wrapped in brackets. + + Example: + ``ipv4:10.73.136.70:41750`` -> ``np04-srv-028.cern.ch:41750`` + + Args: + peer (str): Transport-qualified peer string. + + Returns: + str: The original peer string, or a resolved ``host:port`` representation. + """ + + match = re.match(r"^(?P[^:]+):(?P
.+)$", peer) + if not match: + return peer + + parsed = _parse_host_port(match.group("address")) + if parsed is None: + return peer + + host, port = parsed + resolved_host = _resolve_host(host) + return f"{resolved_host}:{port}" + + +def _parse_host_port(address: str) -> tuple[str, str] | None: + """Extract a host and port from a peer address string. + + Supports bracketed IPv6 addresses such as ``[::1]:1234`` and unbracketed + ``host:port`` or ``ipv4:port`` forms. + + Args: + address (str): Address portion of a transport-qualified peer string. + + Returns: + tuple[str, str] | None: ``(host, port)`` when parsing succeeds, otherwise + ``None``. + """ + + bracket_match = re.match(r"^\[(?P[^\]]+)\]:(?P\d+)$", address) + if bracket_match: + return bracket_match.group("host"), bracket_match.group("port") + + host, sep, port = address.rpartition(":") + if sep and port.isdigit(): + return host, port + + return None + + +def _resolve_host(host: str) -> str: + """Reverse-resolve an IP host and keep IPv6 output bracketed. + + Args: + host (str): Hostname or IP literal to resolve. + + Returns: + str: The resolved hostname, or the original host if resolution fails. + """ + + try: + ip_obj = ipaddress.ip_address(host) + except ValueError: + return host + + try: + resolved_host, _, _ = socket.gethostbyaddr(str(ip_obj)) + except (socket.herror, socket.gaierror, socket.timeout, OSError): + resolved_host = host + + if ":" in resolved_host and not resolved_host.startswith("["): + return f"[{resolved_host}]" + return resolved_host + + def is_port_available(host: str, port: int, timeout: int = 2) -> bool: """ Check if the given port number on a specified host is available. From 2edf27adb42e6650b42fa2e9eefa9a3c2cdae81f Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Thu, 14 May 2026 11:18:35 +0200 Subject: [PATCH 29/31] Major cleanup --- src/drunc/process_manager/configuration.py | 4 --- .../process_manager/interface/cli_argument.py | 2 +- .../process_manager/interface/commands.py | 2 -- src/drunc/process_manager/interface/shell.py | 9 ++---- .../process_manager/k8s_process_manager.py | 10 +++---- src/drunc/process_manager/oks_parser.py | 7 ----- .../process_manager/ssh_process_manager.py | 10 +++---- src/drunc/process_manager/utils.py | 3 ++ src/drunc/unified_shell/commands.py | 21 ------------- src/drunc/unified_shell/shell.py | 30 +++---------------- src/drunc/utils/shell_utils.py | 16 +++++++++- 11 files changed, 34 insertions(+), 80 deletions(-) diff --git a/src/drunc/process_manager/configuration.py b/src/drunc/process_manager/configuration.py index 8ae9b9c22..da9198722 100644 --- a/src/drunc/process_manager/configuration.py +++ b/src/drunc/process_manager/configuration.py @@ -38,10 +38,6 @@ class ProcessManagerTypes(Enum): SSH_PARAMIKO = 3 -# we'll probably want to leveare these types -# and then we'll unpack to name - - class ProcessManagerConfData: def __init__(self): self.broadcaster = None diff --git a/src/drunc/process_manager/interface/cli_argument.py b/src/drunc/process_manager/interface/cli_argument.py index e50210f16..abefd002c 100644 --- a/src/drunc/process_manager/interface/cli_argument.py +++ b/src/drunc/process_manager/interface/cli_argument.py @@ -10,7 +10,7 @@ def validate_conf_string(ctx, param, boot_configuration): def add_query_options_no_session( at_least_one: bool, all_processes_by_default: bool = False ): - def wrapper(f1): # -> _Wrapped[Callable[..., Any], Any, Callable[..., Any], Any]: + def wrapper(f1): f2 = click.option( "-n", "--name", diff --git a/src/drunc/process_manager/interface/commands.py b/src/drunc/process_manager/interface/commands.py index 6708f15bc..a6b9cb684 100644 --- a/src/drunc/process_manager/interface/commands.py +++ b/src/drunc/process_manager/interface/commands.py @@ -45,7 +45,6 @@ def boot( ) -> None: log = get_logger("process_manager.shell") log_pm_cmd(obj) - processes = obj.get_driver("process_manager").ps(ProcessQuery(user=user)) if len(processes.values) > 0: @@ -261,7 +260,6 @@ def flush_impl( def logs_decorators(f): - # order matters: apply options from outermost to innermost f = click.pass_obj(f) f = click.option("--grep", type=str, default=None)(f) f = click.option( diff --git a/src/drunc/process_manager/interface/shell.py b/src/drunc/process_manager/interface/shell.py index e755b39f7..a10350569 100644 --- a/src/drunc/process_manager/interface/shell.py +++ b/src/drunc/process_manager/interface/shell.py @@ -48,12 +48,9 @@ def process_manager_shell(ctx, process_manager_address: str, log_level: str) -> ) process_manager_shell_log = get_logger("process_manager.shell") - process_manager_shell_log.warning(f"{process_manager_address=}") - ctx.obj.reset(address=process_manager_address) try: - process_manager_log.critical("About to run describe") desc = ctx.obj.get_driver("process_manager").describe() except ServerUnreachable as e: process_manager_shell_log.critical("Could not connect to the process manager") @@ -64,7 +61,7 @@ def process_manager_shell(ctx, process_manager_address: str, log_level: str) -> exit(1) ctx.obj.get_driver("process_manager").send_msg( - f"{getpass.getuser()} connected from PM shell" + f"{getpass.getuser()} connected from {ctx.obj.shell_id}" ) # Manually add file handler to process manager log @@ -79,14 +76,12 @@ def process_manager_shell(ctx, process_manager_address: str, log_level: str) -> process_manager_shell_log.info( f"Connected to {process_manager_address}, running '{desc.name}.{desc.session}' (name.session), starting listening..." ) - - process_manager_shell_log.error(f"This is a test {desc.name}, {desc.type}, {desc}") if desc.HasField("broadcast"): ctx.obj.start_listening(desc.broadcast) def cleanup(): ctx.obj.get_driver("process_manager").send_msg( - f"{getpass.getuser()} disconnected from PM shell" + f"{getpass.getuser()} disconnected from {ctx.obj.shell_id}" ) ctx.obj.terminate() process_manager_log.warning( diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index c2456343f..00db084ee 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -1915,12 +1915,12 @@ def _logs_impl(self, log_request: LogRequest) -> LogLines: ) def _send_msg_impl(self, msg: str, peer: str) -> OutcomeStatus: - # If a custom message was provided, log it. Otherwise keep legacy text. - # TODO: don't forget to do _something_ for the k8s instance as well + # Note: currently exact same implementation as ssh manager + # Although there is room here to change as necessary try: - self.log.critical(f"{msg}; from {peer}") - except Exception: - self.log.critical("Omigosh an exception!") + self.log.info(f"{msg}; from {peer}") + except Exception as e: + self.log.critical(f"Failed to receive message with exception {e}") return OutcomeStatus(flag=OutcomeFlag.FAIL) return OutcomeStatus(flag=OutcomeFlag.SUCCESS) diff --git a/src/drunc/process_manager/oks_parser.py b/src/drunc/process_manager/oks_parser.py index 2615afc7e..5feb6ba50 100644 --- a/src/drunc/process_manager/oks_parser.py +++ b/src/drunc/process_manager/oks_parser.py @@ -75,13 +75,6 @@ def get_full_db_path(db_path: str) -> str: err_str = f"No files found in DUNEDAQ_DB_PATH matching {db_path}." raise DruncSetupException(err_str) - # If multiple matches are found, take the first instance that matches. - #! This is a horrible way of choosing it... it could have gone into the cvmfs thing - #! we should put in the file_is_read_only config - #! and loop across until you find one where its not read only - - #! Also clean up this logic right here - # Prefer the first writable match; if every match is read-only, fall back to the first one. resolved_path = unique_matched_files[0] for matched_file in unique_matched_files: diff --git a/src/drunc/process_manager/ssh_process_manager.py b/src/drunc/process_manager/ssh_process_manager.py index 127256926..a151f5172 100644 --- a/src/drunc/process_manager/ssh_process_manager.py +++ b/src/drunc/process_manager/ssh_process_manager.py @@ -229,8 +229,6 @@ def _terminate_impl(self) -> ProcessInstanceList: flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) - # def _describe_impl() - def _logs_impl(self, log_request: LogRequest) -> LogLines: """ Retrieve log output from a remote process. @@ -497,12 +495,12 @@ def _ps_impl(self, query: ProcessQuery) -> ProcessInstanceList: return ret_fmt def _send_msg_impl(self, msg: str, peer: str) -> OutcomeStatus: - # If a custom message was provided, log it. Otherwise keep legacy text. - # TODO: don't forget to do _something_ for the k8s instance as well try: + # TODO: THIS IS CURRENTLY CRITICAL FOR EASIER TESTING + # DO _NOT_ MERGE UNTIL THIS IS BACK TO INFO! self.log.critical(f"{msg}; from {peer}") - except Exception: - self.log.critical("Omigosh an exception!") + except Exception as e: + self.log.critical(f"Failed to receive message with exception {e}") return OutcomeStatus(flag=OutcomeFlag.FAIL) return OutcomeStatus(flag=OutcomeFlag.SUCCESS) diff --git a/src/drunc/process_manager/utils.py b/src/drunc/process_manager/utils.py index 095d70df1..bf4979895 100644 --- a/src/drunc/process_manager/utils.py +++ b/src/drunc/process_manager/utils.py @@ -303,6 +303,9 @@ def validate_k8s_session_name(session: str) -> bool: return True +#! Note for future developers +# This can probably be removed since we've added the +# pm_type attribute in each of the process managers def get_pm_type_from_name(pm_name: str) -> ProcessManagerTypes: """ Get the ProcessManagerTypes enum value from a string name. diff --git a/src/drunc/unified_shell/commands.py b/src/drunc/unified_shell/commands.py index d72c28aea..32fd03fdd 100644 --- a/src/drunc/unified_shell/commands.py +++ b/src/drunc/unified_shell/commands.py @@ -23,12 +23,6 @@ from drunc.utils.shell_utils import InterruptedCommand, log_pm_cmd from drunc.utils.utils import get_logger -# def log_pm_cmd(obj: ProcessManagerContext): -# ctx_test = click.get_current_context(silent=True) -# cmd_name_test = ctx_test.command.name if ctx_test else None -# send_cmd_msg_pm(obj, cmd_name_test) -# #! TODO: Missing the queries!!! - @click.command("boot") @click.option( @@ -168,33 +162,24 @@ def wrapper(ctx, *args, **kwargs): return update_wrapper(wrapper, f) -# Logs @click.command("logs") @session_injector @add_query_options_no_session(at_least_one=True) @logs_decorators def logs(obj, how_far, grep, query): - log = get_logger("unified_shell.logs") - log.info("getting logs") log_pm_cmd(obj) - return logs_impl(obj, how_far, grep, query) -# kill @click.command("kill") @session_injector @add_query_options_no_session(at_least_one=True) @kill_decorators def kill(obj, query, width): - log = get_logger("unified_shell.logs") - log.info("getting logs") log_pm_cmd(obj) - return kill_impl(obj, query, width) -# # Flush @click.command("flush") @session_injector @add_query_options_no_session(at_least_one=True) @@ -213,12 +198,6 @@ def restart(obj, query): return restart_impl(obj, query) -# # Wait #also put it in the PM shell - -#! Note: i cant seem to do a start-run from no boot in the unified shell.. might have to check if this is related somehow -#! I also cannot do ps? - - @click.command("start-shell") @click.pass_obj @click.pass_context diff --git a/src/drunc/unified_shell/shell.py b/src/drunc/unified_shell/shell.py index 6ecb99cb3..c2a52e691 100644 --- a/src/drunc/unified_shell/shell.py +++ b/src/drunc/unified_shell/shell.py @@ -42,29 +42,19 @@ get_process_manager_configuration, validate_pm_config, ) - -# from drunc.process_manager.interface.commands import ( -# flush, -# kill, -# logs, -# ps, -# restart, -# terminate, -# ) from drunc.process_manager.interface.process_manager import run_pm from drunc.unified_shell.commands import ( - boot, # TODO: double check. I bet you.. + boot, flush, kill, logs, - ps, # TODO: double check + ps, restart, start_shell, terminate, - # FINISH THE REST NOW, IT SOULD BE RELATIVELYT STRAIGHTFORWARD ) -#! Note with boot +#! Note with boot. We should discuss this # When you run boot in the process manager with nothing else running, it works just fine # however when you have a running session already, boot asks 'are you sure you want to do this? # i bet you this is causing the behaviour that we are seeing with the unified shell @@ -298,19 +288,13 @@ def unified_shell( f"[green]process_manager[/green] started, communicating through address [green]" f"{process_manager_address}[/green]" ) - - #! This is where the context objects connection detail - - unified_shell_log.warning(f"{process_manager_address=}") ctx.obj.reset(address_pm=process_manager_address) # Run a simple command (describe) to check the connection with the process manager desc: Description | None = None # unified_shell_log.critical("Getting driver") - #! Here we're getting the driver! try: - unified_shell_log.critical("About to run describe") desc = ctx.obj.get_driver().describe() except Exception as e: unified_shell_log.error( @@ -319,8 +303,6 @@ def unified_shell( ) unified_shell_log.critical(f"Reason: {e}") - #! Basically also parse out the type and log it around here as well - if type(e) == ServerUnreachable: unified_shell_log.error( "[red]This can happen if you have the webproxy enabled at CERN. Ensure " @@ -340,12 +322,8 @@ def unified_shell( sys.exit(1) # unified_shell_log.critical("Process manager described successfully") - #! So now we have a working get_driver object that can communicate with the pm. - - # lets try sending a random command.. - unified_shell_log.info( - f"[green]unified_shell[/green] connected to the [green]process_manager" # this is bad + f"[green]unified_shell[/green] connected to the [green]process_manager" f"[/green] at address [green]{process_manager_address}[/green]" ) diff --git a/src/drunc/utils/shell_utils.py b/src/drunc/utils/shell_utils.py index d737c312c..5ef5ab09b 100644 --- a/src/drunc/utils/shell_utils.py +++ b/src/drunc/utils/shell_utils.py @@ -72,7 +72,7 @@ def __str__(self): class ShellContext: - shell_id = None + shell_id = None # used for logging if its a PM shell or Unified shell etc def get_shell_id(self): return self.shell_id @@ -173,6 +173,20 @@ def print_status_summary(self) -> None: def log_pm_cmd(obj: ShellContext): + """Log a process-manager shell command with only explicitly provided arguments. + + The current Click command context is inspected and only parameters whose source is + ``COMMANDLINE`` are included in the log message. This keeps defaulted values out + of the message while still recording the command name, optional session name, and + shell identity. + + These are sent over via send_msg so that it can be displayed in the process manager + shell + + Args: + obj (ShellContext): Active shell context used to send the log message. + """ + ctx_cmd = click.get_current_context(silent=True) cmd_name = ctx_cmd.command.name if ctx_cmd else None parms_dict = {} From 5595e47ced3012fbe01bda02bad6a144b349beb0 Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Thu, 14 May 2026 11:31:40 +0200 Subject: [PATCH 30/31] Remove final log messages --- .../process_manager/process_manager_driver.py | 20 ++++--------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/src/drunc/process_manager/process_manager_driver.py b/src/drunc/process_manager/process_manager_driver.py index 4e8c06ffe..eebb2faf3 100644 --- a/src/drunc/process_manager/process_manager_driver.py +++ b/src/drunc/process_manager/process_manager_driver.py @@ -89,13 +89,13 @@ def close(self) -> None: def send_msg(self, msg): request = Request(token=copy_token(self.token)) - # Pack provided message into the Any `data` field when present + if msg is not None: try: gm = GenericNotificationMessage(message=str(msg)) request.data.Pack(gm) except Exception: - self.log.debug("Failed to pack send_msg payload", exc_info=True) + self.log.critical("Failed to pack send_msg payload", exc_info=True) timeout = 10 @@ -106,25 +106,13 @@ def send_msg(self, msg): error_details = extract_grpc_rich_error(e) self.log.error(error_details) except Exception as extraction_error: - self.log.debug( + self.log.critical( f"Could not extract rich error details from gRPC error: {extraction_error}", exc_info=True, ) handle_grpc_error(e) - # Log the message that was sent (fall back to previous text) - try: - self.log.critical( - f"MSG: {msg}" - ) # TODO: Remove this if you dont want this here - except Exception: - self.log.critical("AN EXCEPTION! GASP") - - # Return the PMResponseFlag from the server (if any) - try: - return response - except UnboundLocalError: - return None + return response # ----- Boot workflow ----- From 3a760f52c64392782a62b472363b90dd7d3b6c75 Mon Sep 17 00:00:00 2001 From: Emir Muhammad Date: Thu, 14 May 2026 12:58:42 +0200 Subject: [PATCH 31/31] Fix pytest --- src/drunc/process_manager/interface/commands.py | 4 +++- tests/process_manager/interface/test_commands.py | 7 +++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/drunc/process_manager/interface/commands.py b/src/drunc/process_manager/interface/commands.py index a6b9cb684..cbfde6250 100644 --- a/src/drunc/process_manager/interface/commands.py +++ b/src/drunc/process_manager/interface/commands.py @@ -284,7 +284,9 @@ def logs_impl( obj: ProcessManagerContext, how_far: int, grep: str, query: ProcessQuery ) -> None: log = get_logger("process_manager.shell") - log.info(f"Running logs with query {query}") + # TODO: MOVE BACK TO DEBUG BEFORE MERGE + # THIS IS USEFUL FOR TESTING THOUGH + log.error(f"Running logs with query {query}") log_req = LogRequest( how_far=how_far, query=query, diff --git a/tests/process_manager/interface/test_commands.py b/tests/process_manager/interface/test_commands.py index 3571690e2..6ba1e10ad 100644 --- a/tests/process_manager/interface/test_commands.py +++ b/tests/process_manager/interface/test_commands.py @@ -102,6 +102,10 @@ def logs(self, log_request): mock_result.lines = [] return mock_result + def send_msg(self, msg: str) -> None: + # simulate sending a message; tests don't assert on this, so store it + self._last_sent_msg = msg + class MockContext: """ @@ -115,6 +119,9 @@ def __init__(self, driver=None): def get_driver(self, name): return self.driver + def get_shell_id(self): + return "mock-shell" + def print(self, msg, justify=None, overflow=None, soft_wrap=None): self.output.append(str(msg))