diff --git a/src/drunc/controller/interface/context.py b/src/drunc/controller/interface/context.py index 082c42a00..c114756de 100644 --- a/src/drunc/controller/interface/context.py +++ b/src/drunc/controller/interface/context.py @@ -14,6 +14,8 @@ class ControllerContext(ShellContext): # boilerplatefest + shell_id = "controller_shell" + def __init__(self): self.status_receiver = None self.took_control = False diff --git a/src/drunc/process_manager/interface/cli_argument.py b/src/drunc/process_manager/interface/cli_argument.py index 27e84872f..abefd002c 100644 --- a/src/drunc/process_manager/interface/cli_argument.py +++ b/src/drunc/process_manager/interface/cli_argument.py @@ -7,15 +7,10 @@ def validate_conf_string(ctx, param, boot_configuration): return boot_configuration -def add_query_options(at_least_one: bool, all_processes_by_default: bool = False): - def wrapper(f0): - f1 = click.option( - "-s", - "--session", - type=str, - default=None, - help="Select the processes on a particular session", - )(f0) +def add_query_options_no_session( + at_least_one: bool, all_processes_by_default: bool = False +): + def wrapper(f1): f2 = click.option( "-n", "--name", @@ -41,3 +36,17 @@ def wrapper(f0): return generate_process_query(f4, at_least_one, all_processes_by_default) return wrapper + + +def add_query_options(at_least_one: bool, all_processes_by_default: bool = False): + def wrapper(f0): + f1 = click.option( + "-s", + "--session", + type=str, + default=None, + help="Select the processes on a particular session", + )(f0) + return add_query_options_no_session(at_least_one, all_processes_by_default)(f1) + + return wrapper diff --git a/src/drunc/process_manager/interface/commands.py b/src/drunc/process_manager/interface/commands.py index 129f3ea78..cbfde6250 100644 --- a/src/drunc/process_manager/interface/commands.py +++ b/src/drunc/process_manager/interface/commands.py @@ -1,4 +1,5 @@ import getpass +from time import sleep import click from druncschema.process_manager_pb2 import LogRequest, ProcessQuery @@ -11,7 +12,7 @@ ) from drunc.process_manager.interface.context import ProcessManagerContext from drunc.process_manager.utils import tabulate_process_instance_list -from drunc.utils.shell_utils import InterruptedCommand +from drunc.utils.shell_utils import InterruptedCommand, log_pm_cmd from drunc.utils.utils import get_logger @@ -43,6 +44,7 @@ def boot( override_logs: bool, ) -> None: log = get_logger("process_manager.shell") + log_pm_cmd(obj) processes = obj.get_driver("process_manager").ps(ProcessQuery(user=user)) if len(processes.values) > 0: @@ -129,6 +131,7 @@ def dummy_boot( session_name: str, ) -> None: log = get_logger("process_manager.shell") + log_pm_cmd(obj) log.debug( f"Running dummy_boot with {n_processes} processes for {sleep} seconds {n_sleeps} times, requested by user {user}" ) @@ -150,6 +153,16 @@ def dummy_boot( return +@click.command("wait") +@click.argument("sleep_time", type=int, default=1) +@click.pass_obj +def wait(obj: ProcessManagerContext, sleep_time: int) -> None: + log = get_logger("process_manager.wait") + log.info(f"Command [green]wait[/green] running for {sleep_time} seconds.") + sleep(sleep_time) # seconds + log.info(f"Command [green]wait[/green] ran for {sleep_time} seconds.") + + @click.command("terminate") @click.option( "-w", @@ -161,6 +174,7 @@ def dummy_boot( @click.pass_obj def terminate(obj: ProcessManagerContext, width: int | None) -> None: log = get_logger("process_manager.shell") + log_pm_cmd(obj) log.debug("Terminating") result = obj.get_driver("process_manager").terminate() if not result: @@ -171,23 +185,35 @@ def terminate(obj: ProcessManagerContext, width: int | None) -> None: obj.delete_driver("controller") +def kill_decorators(f): + f = click.pass_obj(f) + f = click.option( + "-w", + "--width", + type=int, + default=None, + help="Table width. Default is automatically calculated", + )(f) + f = click.option( + "--crash", + is_flag=True, + default=False, + help="Simulate a crash: send SIGKILL without any cleanup, leaving the process manager in an unexpected-death state.", + )(f) + return f + + @click.command("kill") -@click.option( - "-w", - "--width", - type=int, - default=None, - help="Table width. Default is automatically calculated", -) @add_query_options(at_least_one=True) -@click.option( - "--crash", - is_flag=True, - default=False, - help="Simulate a crash: send SIGKILL without any cleanup, leaving the process manager in an unexpected-death state.", -) -@click.pass_obj -def kill(obj: ProcessManagerContext, query: ProcessQuery, width: int | None) -> None: +@kill_decorators +def kill(obj, query, width): + log_pm_cmd(obj) + return kill_impl(obj, query, width) + + +def kill_impl( + obj: ProcessManagerContext, query: ProcessQuery, width: int | None +) -> None: log = get_logger("process_manager.shell") log.debug(f"Killing with query {query}") result = obj.get_driver("process_manager").kill(query) @@ -198,17 +224,27 @@ def kill(obj: ProcessManagerContext, query: ProcessQuery, width: int | None) -> ) # rich tables require console printing +def flush_decorators(f): + f = click.pass_obj(f) + f = click.option( + "-w", + "--width", + type=int, + default=None, + help="Table width. Default is automatically calculated", + )(f) + return f + + @click.command("flush") -@click.option( - "-w", - "--width", - type=int, - default=None, - help="Table width. Default is automatically calculated", -) @add_query_options(at_least_one=False, all_processes_by_default=True) -@click.pass_obj -def flush( +@flush_decorators +def flush(obj, query, width): + log_pm_cmd(obj) + return flush_impl(obj, query, width) + + +def flush_impl( obj: ProcessManagerContext, query: ProcessQuery, width: int | None, @@ -223,22 +259,34 @@ def flush( ) # rich tables require console printing +def logs_decorators(f): + f = click.pass_obj(f) + f = click.option("--grep", type=str, default=None)(f) + f = click.option( + "--how-far", + type=int, + show_default=True, + default=100, + help="How many lines one wants", + )(f) + return f + + @click.command("logs") @add_query_options(at_least_one=True) -@click.option( - "--how-far", - type=int, - show_default=True, - default=100, - help="How many lines one wants", -) -@click.option("--grep", type=str, default=None) -@click.pass_obj -def logs( +@logs_decorators +def logs(obj, how_far, grep, query): + log_pm_cmd(obj) + return logs_impl(obj, how_far, grep, query) + + +def logs_impl( obj: ProcessManagerContext, how_far: int, grep: str, query: ProcessQuery ) -> None: log = get_logger("process_manager.shell") - log.debug(f"Running logs with query {query}") + # TODO: MOVE BACK TO DEBUG BEFORE MERGE + # THIS IS USEFUL FOR TESTING THOUGH + log.error(f"Running logs with query {query}") log_req = LogRequest( how_far=how_far, query=query, @@ -276,6 +324,11 @@ def logs( @add_query_options(at_least_one=True) @click.pass_obj def restart(obj: ProcessManagerContext, query: ProcessQuery) -> None: + log_pm_cmd(obj) + return restart_impl(obj, query) + + +def restart_impl(obj: ProcessManagerContext, query: ProcessQuery) -> None: log = get_logger("process_manager.shell") log.debug(f"Restarting with query {query}") obj.get_driver("process_manager").restart(query) @@ -306,6 +359,7 @@ def ps( width: int | None, ) -> None: log = get_logger("process_manager.shell") + log_pm_cmd(obj) log.debug(f"Running ps with query {query}") results = obj.get_driver("process_manager").ps(query) if not results: diff --git a/src/drunc/process_manager/interface/context.py b/src/drunc/process_manager/interface/context.py index a81396130..a00400c51 100644 --- a/src/drunc/process_manager/interface/context.py +++ b/src/drunc/process_manager/interface/context.py @@ -14,6 +14,8 @@ class ProcessManagerContext(ShellContext): # boilerplatefest + shell_id = "process_manager_shell" + def __init__(self, *args, **kwargs): self.status_receiver = None super(ProcessManagerContext, self).__init__(*args, **kwargs) diff --git a/src/drunc/process_manager/interface/shell.py b/src/drunc/process_manager/interface/shell.py index 5da7434c5..a10350569 100644 --- a/src/drunc/process_manager/interface/shell.py +++ b/src/drunc/process_manager/interface/shell.py @@ -14,6 +14,7 @@ ps, restart, terminate, + wait, ) from drunc.utils.grpc_utils import ServerUnreachable from drunc.utils.utils import ( @@ -59,6 +60,10 @@ def process_manager_shell(ctx, process_manager_address: str, log_level: str) -> # process_manager_shell_log.error(e.message) # TODO: Keep this for production branch, remove this from dev branch exit(1) + ctx.obj.get_driver("process_manager").send_msg( + f"{getpass.getuser()} connected from {ctx.obj.shell_id}" + ) + # Manually add file handler to process manager log # Not possible to initialise logger immediately as it requires # knowledge of the log path @@ -75,6 +80,9 @@ def process_manager_shell(ctx, process_manager_address: str, log_level: str) -> ctx.obj.start_listening(desc.broadcast) def cleanup(): + ctx.obj.get_driver("process_manager").send_msg( + f"{getpass.getuser()} disconnected from {ctx.obj.shell_id}" + ) ctx.obj.terminate() process_manager_log.warning( f"[green]{getpass.getuser()}[/green] disconnected from the process manager through a [green]drunc-process-manager-shell[/green]" @@ -83,6 +91,7 @@ def cleanup(): ctx.call_on_close(cleanup) ctx.command.add_command(boot, "boot") + ctx.command.add_command(wait, "wait") ctx.command.add_command(terminate, "terminate") ctx.command.add_command(kill, "kill") ctx.command.add_command(flush, "flush") diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index 165f760b6..00db084ee 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -13,6 +13,7 @@ # Local Application Imports from druncschema.broadcast_pb2 import BroadcastType +from druncschema.generic_pb2 import OutcomeFlag, OutcomeStatus from druncschema.process_manager_pb2 import ( BootRequest, LogLines, @@ -39,6 +40,7 @@ from drunc.process_manager.configuration import ( PROCESS_SHUTDOWN_ORDERING, ProcessManagerConfHandler, + ProcessManagerTypes, ) from drunc.process_manager.process_manager import ProcessManager from drunc.process_manager.utils import on_parent_exit, validate_k8s_session_name @@ -152,6 +154,8 @@ def run(self) -> None: class K8sProcessManager(ProcessManager): + pm_type = ProcessManagerTypes.K8s + def __init__(self, configuration: ProcessManagerConfHandler, **kwargs) -> None: """ Manages processes as Kubernetes Pods. @@ -1910,6 +1914,17 @@ def _logs_impl(self, log_request: LogRequest) -> LogLines: lines=[f"Could not retrieve logs: {e.reason}"], ) + def _send_msg_impl(self, msg: str, peer: str) -> OutcomeStatus: + # Note: currently exact same implementation as ssh manager + # Although there is room here to change as necessary + try: + self.log.info(f"{msg}; from {peer}") + except Exception as e: + self.log.critical(f"Failed to receive message with exception {e}") + return OutcomeStatus(flag=OutcomeFlag.FAIL) + + return OutcomeStatus(flag=OutcomeFlag.SUCCESS) + def _boot_impl(self, boot_request: BootRequest) -> ProcessInstanceList: """ Handles the 'boot' command from the gRPC interface. @@ -2516,7 +2531,9 @@ def kill_and_wait(uuids, grace_period=None) -> None: pods_by_role[role].append(uuid) if role == "segment-controller": tree_id = pod.metadata.labels.get(tree_id_label_key, "") - segment_controller_depths[uuid] = tree_id.count(".") if tree_id else 0 + segment_controller_depths[uuid] = ( + tree_id.count(".") if tree_id else 0 + ) # Kill in stages using our sorted lists for role in PROCESS_SHUTDOWN_ORDERING: diff --git a/src/drunc/process_manager/oks_parser.py b/src/drunc/process_manager/oks_parser.py index f679e7f1a..5feb6ba50 100644 --- a/src/drunc/process_manager/oks_parser.py +++ b/src/drunc/process_manager/oks_parser.py @@ -5,7 +5,7 @@ from drunc.exceptions import DruncException, DruncSetupException from drunc.process_manager.configuration import get_commandline_parameters -from drunc.utils.utils import get_logger +from drunc.utils.utils import file_is_read_only, get_logger if TYPE_CHECKING: import conffwk @@ -75,8 +75,13 @@ def get_full_db_path(db_path: str) -> str: err_str = f"No files found in DUNEDAQ_DB_PATH matching {db_path}." raise DruncSetupException(err_str) - # If multiple matches are found, take the first instance that matches. + # Prefer the first writable match; if every match is read-only, fall back to the first one. resolved_path = unique_matched_files[0] + for matched_file in unique_matched_files: + if not file_is_read_only(matched_file): + resolved_path = matched_file + break + log.debug(f"Path {db_path} resolved to {resolved_path}") return resolved_path diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index 8fc32e1d8..ee7d6d712 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -8,9 +8,11 @@ from druncschema.authoriser_pb2 import ActionType, SystemType from druncschema.broadcast_pb2 import BroadcastType from druncschema.description_pb2 import CommandDescription, Description +from druncschema.generic_pb2 import OutcomeStatus from druncschema.opmon.process_manager_pb2 import ProcessStatus from druncschema.process_manager_pb2 import ( BootRequest, + GenericNotificationMessage, LogLines, LogRequest, ProcessInstance, @@ -40,7 +42,7 @@ ProcessManagerTypes, ) from drunc.utils.configuration import ConfTypes -from drunc.utils.utils import get_logger, pid_info_str +from drunc.utils.utils import get_logger, pid_info_str, resolve_context_peer class BadQuery(DruncCommandException): @@ -49,6 +51,8 @@ def __init__(self, txt): class ProcessManager(abc.ABC, ProcessManagerServicer): + pm_type = ProcessManagerTypes.Unknown # Used for describe (and possibly others) + def __init__( self, configuration: ProcessManagerConfHandler, @@ -499,7 +503,7 @@ def describe(self, request: Request, context: ServicerContext) -> Description: self.log.debug(f"{self.name} running describe") response = Description( - type="process_manager", + type=self.pm_type.name, name=self.name, info=self.get_log_path(), session="no_session" if not self.session else self.session, @@ -560,6 +564,60 @@ def logs(self, request: LogRequest, context: ServicerContext) -> LogLines: return response + @abc.abstractmethod + def _send_msg_impl( + self, msg: str | None = None, peer: str | None = None + ) -> OutcomeStatus: + raise NotImplementedError + + @broadcasted + @authentified_and_authorised( + action=ActionType.READ, system=SystemType.PROCESS_MANAGER + ) + def send_msg(self, request: Request, context: ServicerContext) -> OutcomeStatus: + self.log.debug(f"{self.name} running send_msg") + + try: + peer = context.peer() + peer_display = resolve_context_peer(peer) + except Exception: + self.log.warning("Could not determine caller peer", exc_info=True) + peer_display = "unknown" + + # Try to extract an optional GenericNotificationMessage from request.data + try: + if ( + request is not None + and hasattr(request, "data") + and request.data is not None + ): + gm = GenericNotificationMessage() + request.data.Unpack(gm) + msg_value = gm.message + except Exception as e: + self.log.debug( + f"Error while extracting send_msg payload: {e}", exc_info=True + ) + msg_value = "unknown payload" + + try: + response = self._send_msg_impl(msg_value, peer_display) + except NotImplementedError: + raise DruncNotImplementedException( + message="Implementation missing", + domain="ProcessManager.send_msg", + ) + except Exception as e: + context_msg = f"Unhandled exception in ProcessManager.send_msg: {e}" + self.log.exception(context_msg) + + raise DruncCommandException( + message=context_msg, + domain="ProcessManager.send_msg", + ) + + return response + def _ensure_one_process( self, uuids: list[str], in_boot_request: bool = False ) -> str: @@ -620,25 +678,27 @@ def _match_processes_against_query( # Filter processes based on query criteria processes = [] for uuid in available_uuids: - accepted = False + accepted = True meta = boot_request_dict[uuid].process_description.metadata # Check UUID match - if uuid in uuid_selector: - accepted = True + if uuid_selector and uuid not in uuid_selector: + accepted = False # Check name pattern match (regex) - for name_reg in name_selector: - if re.search(name_reg, meta.name): - accepted = True + + if name_selector and not any( + re.search(reg, meta.name) for reg in name_selector + ): + accepted = False # Check session match - if session_selector == meta.session: - accepted = True + if session_selector and session_selector != meta.session: + accepted = False # Check user match - if user_selector == meta.user: - accepted = True + if user_selector and user_selector != meta.user: + accepted = False if accepted: processes.append(uuid) diff --git a/src/drunc/process_manager/process_manager_driver.py b/src/drunc/process_manager/process_manager_driver.py index cd7169f99..eebb2faf3 100644 --- a/src/drunc/process_manager/process_manager_driver.py +++ b/src/drunc/process_manager/process_manager_driver.py @@ -16,6 +16,7 @@ from druncschema.description_pb2 import Description from druncschema.process_manager_pb2 import ( BootRequest, + GenericNotificationMessage, LogLines, LogRequest, ProcessDescription, @@ -86,7 +87,35 @@ def close(self) -> None: except Exception as e: self.log.error(f"Error closing gRPC channel: {e}", exc_info=True) + def send_msg(self, msg): + request = Request(token=copy_token(self.token)) + + if msg is not None: + try: + gm = GenericNotificationMessage(message=str(msg)) + request.data.Pack(gm) + except Exception: + self.log.critical("Failed to pack send_msg payload", exc_info=True) + + timeout = 10 + + try: + response = self.stub.send_msg(request, timeout=timeout) + except grpc.RpcError as e: + try: + error_details = extract_grpc_rich_error(e) + self.log.error(error_details) + except Exception as extraction_error: + self.log.critical( + f"Could not extract rich error details from gRPC error: {extraction_error}", + exc_info=True, + ) + handle_grpc_error(e) + + return response + # ----- Boot workflow ----- + def boot( self, conf_file: str, @@ -103,6 +132,9 @@ def boot( ) -> Iterator[ProcessInstanceList] | None: self.log.info(f"Booting session [green]{session_name}[/green]") + # Assume oksconflibs if no framework is defined + conf_file = f"oksconflibs:{conf_file}" if ":" not in conf_file else conf_file + # Step 1 - consolidate configuration self._consolidate_config(session_name, conf_file) diff --git a/src/drunc/process_manager/ssh_process_manager.py b/src/drunc/process_manager/ssh_process_manager.py index e80d421b7..a151f5172 100644 --- a/src/drunc/process_manager/ssh_process_manager.py +++ b/src/drunc/process_manager/ssh_process_manager.py @@ -4,6 +4,7 @@ from typing import List, Optional from druncschema.broadcast_pb2 import BroadcastType +from druncschema.generic_pb2 import OutcomeFlag, OutcomeStatus from druncschema.process_manager_pb2 import ( BootRequest, LogLines, @@ -18,11 +19,14 @@ from druncschema.request_response_pb2 import ResponseFlag from drunc.exceptions import DruncCommandException +from drunc.process_manager.configuration import ProcessManagerTypes from drunc.process_manager.process_manager import ProcessManager from drunc.processes.ssh_process_lifetime_manager import ProcessLifetimeManager class SSHProcessManager(ProcessManager): + pm_type = ProcessManagerTypes.SSH_SHELL + def __init__( self, configuration, LifetimeManagerClass: ProcessLifetimeManager, **kwargs ): @@ -345,6 +349,9 @@ def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance: # Update hostname in boot request for this attempt self.boot_request[uuid].process_description.metadata.hostname = host + # self.log.critical( + # f"Attempting to start process {uuid} on host {host} via SSH lifetime manager" + # ) # Start the process via SSH manager self.ssh_lifetime_manager.start_process( uuid=uuid, boot_request=self.boot_request[uuid] @@ -417,6 +424,10 @@ def _ps_impl(self, query: ProcessQuery) -> ProcessInstanceList: boot_request_dict=self.boot_request, order_by="random", ) + # if query.session: + # self.log.warning( + # f"{self.name} found {len(process_uuids)} processes matching {query} for ps" + # ) # Iterate through all processes matching the query for proc_uuid in process_uuids: @@ -464,13 +475,35 @@ def _ps_impl(self, query: ProcessQuery) -> ProcessInstanceList: else: pi.remote_pid = remote_pid_result.reason ret += [pi] - - return ProcessInstanceList( + ret_fmt = ProcessInstanceList( name=self.name, token=None, values=ret, flag=ResponseFlag.EXECUTED_SUCCESSFULLY, ) + # if query.session: + # self.log.critical( + # f"{self.name} returning {len(ret)} processes from ps query {query}" + # ) + # self.log.critical(ret_fmt) + # self.log.critical(f"TEST: {ret_fmt=}") + # else: + # self.log.warning( + # f"{self.name} returning {len(ret)} processes from ps query {query}" + # ) + # self.log.warning(ret) + return ret_fmt + + def _send_msg_impl(self, msg: str, peer: str) -> OutcomeStatus: + try: + # TODO: THIS IS CURRENTLY CRITICAL FOR EASIER TESTING + # DO _NOT_ MERGE UNTIL THIS IS BACK TO INFO! + self.log.critical(f"{msg}; from {peer}") + except Exception as e: + self.log.critical(f"Failed to receive message with exception {e}") + return OutcomeStatus(flag=OutcomeFlag.FAIL) + + return OutcomeStatus(flag=OutcomeFlag.SUCCESS) def _boot_impl(self, boot_request: BootRequest) -> ProcessInstanceList: self.log.debug(f"{self.name} running boot command") diff --git a/src/drunc/process_manager/utils.py b/src/drunc/process_manager/utils.py index 095d70df1..bf4979895 100644 --- a/src/drunc/process_manager/utils.py +++ b/src/drunc/process_manager/utils.py @@ -303,6 +303,9 @@ def validate_k8s_session_name(session: str) -> bool: return True +#! Note for future developers +# This can probably be removed since we've added the +# pm_type attribute in each of the process managers def get_pm_type_from_name(pm_name: str) -> ProcessManagerTypes: """ Get the ProcessManagerTypes enum value from a string name. diff --git a/src/drunc/processes/ssh_process_lifetime_manager_shell.py b/src/drunc/processes/ssh_process_lifetime_manager_shell.py index f9d28e506..aacbc5ed9 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager_shell.py +++ b/src/drunc/processes/ssh_process_lifetime_manager_shell.py @@ -232,7 +232,7 @@ def __init__( """ self.disable_host_key_check = disable_host_key_check self.disable_localhost_host_key_check = disable_localhost_host_key_check - self.log = logger if logger else get_logger(__name__) + self.log = get_logger("PM_LMS_TEST", rich_handler=True) self.on_process_exit = on_process_exit # Create SSH command wrapper @@ -313,6 +313,9 @@ def start_process(self, uuid: str, boot_request: BootRequest) -> None: hostname = boot_request.process_description.metadata.hostname user = boot_request.process_description.metadata.user log_file = boot_request.process_description.process_logs_path + # self.log.critical( + # f"Starting process {uuid} on {hostname} as {user} with log file {log_file}" + # ) # Extract environment variables from boot request env_vars = ( @@ -326,13 +329,18 @@ def start_process(self, uuid: str, boot_request: BootRequest) -> None: for exe_arg in boot_request.process_description.executable_and_arguments: cmd += exe_arg.exec for arg in exe_arg.args: - cmd += f" {arg}" + if arg.endswith("daq_app_rte.sh"): + cmd += f" {os.getenv('DBT_AREA_ROOT')}/install/daq_app_rte.sh" + else: + cmd += f" {arg}" cmd += ";" # Remove trailing semicolon if present if cmd.endswith(";"): cmd = cmd[:-1] + # self.log.critical(f"Built command for {uuid}: {cmd}: {boot_request}") + # Execute the command via SSH self._execute_bootrequest_via_ssh( uuid=uuid, @@ -705,24 +713,35 @@ def _build_ssh_arguments(self, hostname: str, user_host: str) -> List[str]: Returns: List of SSH command arguments """ - disable_host_key_check = self.disable_host_key_check or ( - self.disable_localhost_host_key_check - and hostname in ("localhost", "127.0.0.1", "::1") + # disable_host_key_check = self.disable_host_key_check or ( + # self.disable_localhost_host_key_check + # and hostname in ("localhost", "127.0.0.1", "::1") + # ) + superuser_host = getpass.getuser() + "@" + user_host.split("@")[1] + # self.log.critical(f"Building SSH arguments for {user_host} with superuser host {superuser_host}") + arguments = [superuser_host, "-tt", "-o", "StrictHostKeyChecking=no"] + # self.log.critical(f"SSH arguments after adding StrictHostKeyChecking for {user_host}%s", arguments) + # self.log.critical(f"{arguments=}") + # self.log.critical(f"Test list: {test_list_print}") + # self.log.critical(f"Test list 2: %s", test_list_print) + + # "-F /nfs/home/{user_host.split('@')[0]}/.ssh/config", + + # if disable_host_key_check: + arguments.extend( + [ + "-o", + "LogLevel=info", + "-o", + "GlobalKnownHostsFile=/dev/null", + "-o", + "UserKnownHostsFile=/dev/null", + ] ) - - arguments = [user_host, "-tt", "-o", "StrictHostKeyChecking=no"] - - if disable_host_key_check: - arguments.extend( - [ - "-o", - "LogLevel=error", - "-o", - "GlobalKnownHostsFile=/dev/null", - "-o", - "UserKnownHostsFile=/dev/null", - ] - ) + # self.log.critical(f"SSH arguments for {user_host}: {arguments}") + # self.log.critical( + # f"PP: {getpass.getuser()} is running on {os.uname().nodename} with disable_host_key_check={self.disable_host_key_check} and disable_localhost_host_key_check={self.disable_localhost_host_key_check}" + # ) return arguments @@ -840,8 +859,15 @@ def read_process_metadata( arguments.append(remote_command) # Execute SSH command to wait for and read file (single round-trip) + # self.log.critical( + # f"Attempting to read metadata for {uuid} from {hostname} with timeout {timeout}s" + # ) result = self.ssh(*arguments) + # self.log.critical( + # f"DEBUG - Raw metadata content for {uuid} from {hostname}: {result}" + # ) json_content = str(result).strip() + # self.log.critical("Attempt successful?|???") # Parse JSON content and instantiate metadata object metadata = ProcessMetadata.from_json(json_content) @@ -881,6 +907,9 @@ def _execute_bootrequest_via_ssh( try: platform = os.uname().sysname.lower() is_macos = "darwin" in platform + hostname_for_gssapi = hostname + if hostname_for_gssapi == "localhost": + hostname_for_gssapi = os.uname().nodename user_host = f"{user}@{hostname}" # Build remote command with metadata file writing @@ -896,6 +925,9 @@ def _execute_bootrequest_via_ssh( remote_cmd += f"cd {boot_request.process_description.process_execution_directory} ; " metadata_file = SSHProcessLifetimeManagerShell.get_metadata_file_path(uuid) + # self.log.critical( + # f"Metadata file for {uuid} will be written to {metadata_file} on remote host" + # ) tree_id = boot_request.process_description.metadata.tree_id name = boot_request.process_description.metadata.name role = ProcessMetadata.compute_role_from_tree_id(tree_id) diff --git a/src/drunc/unified_shell/commands.py b/src/drunc/unified_shell/commands.py index c5dfefc24..32fd03fdd 100644 --- a/src/drunc/unified_shell/commands.py +++ b/src/drunc/unified_shell/commands.py @@ -1,14 +1,26 @@ import getpass import sys +from functools import update_wrapper import click from druncschema.process_manager_pb2 import ProcessQuery from drunc.controller.interface.shell_utils import controller_setup from drunc.exceptions import DruncSetupException +from drunc.process_manager.interface.cli_argument import add_query_options_no_session +from drunc.process_manager.interface.commands import ( + flush_decorators, + flush_impl, + kill_decorators, + kill_impl, + logs_decorators, + logs_impl, + restart_impl, +) from drunc.process_manager.interface.context import ProcessManagerContext +from drunc.process_manager.utils import tabulate_process_instance_list from drunc.unified_shell.context import UnifiedShellMode -from drunc.utils.shell_utils import InterruptedCommand +from drunc.utils.shell_utils import InterruptedCommand, log_pm_cmd from drunc.utils.utils import get_logger @@ -32,6 +44,7 @@ def boot( sleep_between_app_boot: int | float = 0, ) -> None: log = get_logger("unified_shell.boot") + log_pm_cmd(obj) session_name = obj.session_name user = getpass.getuser() processes = obj.get_driver("process_manager").ps( @@ -42,12 +55,12 @@ def boot( override_logs_boot = obj.override_logs else: override_logs_boot = override_logs - if len(processes.values) > 0: - log.error( - f"Cannot boot: session {session_name} already has {len(processes.values)} processes running. " - "Please terminate the existing session first." - ) - return + # if len(processes.values) > 0: + # log.error( + # f"Cannot boot: session {session_name} already has {len(processes.values)} processes running. " + # "Please terminate the existing session first." + # ) + # return try: results = obj.get_driver("process_manager").boot( @@ -101,6 +114,90 @@ def boot( sys.exit(1) +@click.command("terminate") +@click.pass_obj +@click.pass_context +def terminate(ctx, obj): + """ + Execute the process manager terminate command, but only do this for the current + session + """ + + log = get_logger("unified_shell.terminate") + log_pm_cmd(obj) + session_query = ProcessQuery(session=ctx.obj.session_name) + log.info(f"Terminating session [green]{ctx.obj.session_name}[/]") + obj.get_driver("process_manager").kill(session_query) + + +@click.command("ps") +@click.pass_obj +@click.pass_context +def ps(ctx, obj): + """ + Execute the process manager terminate command, but only do this for the current + session + """ + + log = get_logger("unified_shell.ps") + log_pm_cmd(obj) + session_query = ProcessQuery(session=ctx.obj.session_name) + log.info(f"Listing session [green]{ctx.obj.session_name}[/]") + results = obj.get_driver("process_manager").ps(session_query) + obj.print( + tabulate_process_instance_list( + results, title=f"Processes running in session {ctx.obj.session_name}" + ), + overflow="fold", + soft_wrap=True, + ) + + +def session_injector(f): + @click.pass_context + def wrapper(ctx, *args, **kwargs): + kwargs["session"] = ctx.obj.session_name + return ctx.invoke(f, *args, **kwargs) + + return update_wrapper(wrapper, f) + + +@click.command("logs") +@session_injector +@add_query_options_no_session(at_least_one=True) +@logs_decorators +def logs(obj, how_far, grep, query): + log_pm_cmd(obj) + return logs_impl(obj, how_far, grep, query) + + +@click.command("kill") +@session_injector +@add_query_options_no_session(at_least_one=True) +@kill_decorators +def kill(obj, query, width): + log_pm_cmd(obj) + return kill_impl(obj, query, width) + + +@click.command("flush") +@session_injector +@add_query_options_no_session(at_least_one=True) +@flush_decorators +def flush(obj, query, width): + log_pm_cmd(obj) + return flush_impl(obj, query, width) + + +@click.command("restart") +@session_injector +@add_query_options_no_session(at_least_one=True) +@click.pass_obj +def restart(obj, query): + log_pm_cmd(obj) + return restart_impl(obj, query) + + @click.command("start-shell") @click.pass_obj @click.pass_context @@ -112,6 +209,7 @@ def start_shell(ctx, obj): allowing you to execute commands interactively. """ log = get_logger("unified_shell.start_shell") + log_pm_cmd(obj) obj.running_mode = UnifiedShellMode.SEMIBATCH log.info("Switching to interactive mode...") diff --git a/src/drunc/unified_shell/context.py b/src/drunc/unified_shell/context.py index 318983739..9997326d0 100644 --- a/src/drunc/unified_shell/context.py +++ b/src/drunc/unified_shell/context.py @@ -4,6 +4,7 @@ from druncschema.token_pb2 import Token from drunc.utils.shell_utils import ShellContext +from drunc.utils.utils import resolve_localhost_to_hostname class UnifiedShellMode(Enum): @@ -13,6 +14,8 @@ class UnifiedShellMode(Enum): class UnifiedShellContext(ShellContext): # boilerplatefest + shell_id = "unified_shell" + def __init__(self): self.status_receiver_pm = None self.status_receiver_controller = None @@ -28,7 +31,7 @@ def __init__(self): super(UnifiedShellContext, self).__init__() def reset(self, address_pm: str = ""): - self.address_pm = address_pm + self.address_pm = resolve_localhost_to_hostname(address_pm) super(UnifiedShellContext, self)._reset(name="unified_shell") def create_drivers(self, **kwargs) -> Mapping[str, object]: diff --git a/src/drunc/unified_shell/shell.py b/src/drunc/unified_shell/shell.py index a3dfd1678..c2a52e691 100644 --- a/src/drunc/unified_shell/shell.py +++ b/src/drunc/unified_shell/shell.py @@ -39,21 +39,29 @@ from drunc.fsm.configuration import FSMConfHandler from drunc.fsm.utils import convert_fsm_transition from drunc.process_manager.configuration import ( - ProcessManagerTypes, get_process_manager_configuration, validate_pm_config, ) -from drunc.process_manager.interface.commands import ( +from drunc.process_manager.interface.process_manager import run_pm +from drunc.unified_shell.commands import ( + boot, flush, kill, logs, ps, restart, + start_shell, terminate, ) -from drunc.process_manager.interface.process_manager import run_pm -from drunc.process_manager.utils import get_pm_type_from_name, validate_k8s_session_name -from drunc.unified_shell.commands import boot, start_shell + +#! Note with boot. We should discuss this +# When you run boot in the process manager with nothing else running, it works just fine +# however when you have a running session already, boot asks 'are you sure you want to do this? +# i bet you this is causing the behaviour that we are seeing with the unified shell +# when you log into an empty PM with the US, and you start-run, it works just fine +# but when you start-run from scratch with a running session in the PM, it doesn't work +# Theres also a whole thing about things not flushing correctly.. +# Aha! in the pm, my terminate doesn't flush properly! from drunc.unified_shell.context import UnifiedShellMode from drunc.unified_shell.shell_utils import generate_fsm_sequence_command from drunc.utils.configuration import ConfTypes, OKSKey @@ -152,24 +160,37 @@ def unified_shell( unified_shell_log.debug("Setting up the [green]unified_shell[/green] logger") # Parse the process manager argument to determine if it's a config or an address + # unified_shell_log.critical( + # f"Parsing the process manager argument: {process_manager}" + # ) process_manager_url: ParseResult = urlparse(process_manager) internal_pm: bool = True if process_manager_url.scheme == "grpc": # i.e. if it's an address internal_pm = False + unified_shell_log.critical( + f"Process manager argument parsed, internal_pm set to {internal_pm}" + ) - # If using a k8s process manager, validate the session name before proceeding - if get_pm_type_from_name( - process_manager - ) == ProcessManagerTypes.K8s and not validate_k8s_session_name(session_name): - unified_shell_log.error( - f"[red]Invalid session/namespace name [bold]({session_name})[/bold][/red]. " - "Must match RFC1123 label: lowercase alphanumeric or '-', start/end with " - "alphanumeric, max 63 chars." + # # If using a k8s process manager, validate the session name before proceeding + # if get_pm_type_from_name( + # process_manager + # ) == ProcessManagerTypes.K8s and not validate_k8s_session_name(session_name): + # unified_shell_log.error( + # f"[red]Invalid session/namespace name [bold]({session_name})[/bold][/red]. " + # "Must match RFC1123 label: lowercase alphanumeric or '-', start/end with " + # "alphanumeric, max 63 chars." + # ) + # sys.exit(1) + + # unified_shell_log.critical("TEST") + # Setup configuration related context variables + # Assume oksconflibs if no framework is defined + ctx.obj.configuration_file = ( + lambda path: ( + path if path.startswith("oksconflibs:") else f"oksconflibs:{path}" ) - sys.exit(1) + )(configuration_file) - # Setup configuration related context variables - ctx.obj.configuration_file = f"oksconflibs:{configuration_file}" ctx.obj.configuration_id = configuration_id ctx.obj.session_name = session_name @@ -256,13 +277,12 @@ def unified_shell( ) else: # Connect to an existing process manager at the provided address + unified_shell_log.critical( + "Connecting to an existing process manager at the provided address" + ) process_manager_address = process_manager.replace( "grpc://", "" ) # remove the grpc scheme - unified_shell_log.info( - f"[green]unified_shell[/green] connected to the [green]process_manager" - f"[/green] at address [green]{process_manager_address}[/green]" - ) unified_shell_log.debug( f"[green]process_manager[/green] started, communicating through address [green]" @@ -272,6 +292,8 @@ def unified_shell( # Run a simple command (describe) to check the connection with the process manager desc: Description | None = None + # unified_shell_log.critical("Getting driver") + try: desc = ctx.obj.get_driver().describe() except Exception as e: @@ -279,7 +301,7 @@ def unified_shell( f"[red]Could not connect to the process manager at the address: [/red]" f"[green]{process_manager_address}[/green]" ) - unified_shell_log.debug(f"Reason: {e}") + unified_shell_log.critical(f"Reason: {e}") if type(e) == ServerUnreachable: unified_shell_log.error( @@ -298,6 +320,16 @@ def unified_shell( ctx.obj.pm_process.join() sys.exit(1) + # unified_shell_log.critical("Process manager described successfully") + + unified_shell_log.info( + f"[green]unified_shell[/green] connected to the [green]process_manager" + f"[/green] at address [green]{process_manager_address}[/green]" + ) + + ctx.obj.get_driver("process_manager").send_msg( + f"{getpass.getuser()} connected from unified shell" + ) # Broadcasting configuration if requested if desc.HasField("broadcast"): @@ -308,18 +340,20 @@ def unified_shell( # Add the unified shell Click commands to the CLI unified_shell_log.debug("Adding [green]unified_shell[/green] commands") - ctx.command.add_command(boot, "boot") - ctx.obj.dynamic_commands.add("boot") + unified_shell_commands = [boot, ps, terminate] + for cmd in unified_shell_commands: + ctx.command.add_command(cmd, format_name_for_cli(cmd.name)) + ctx.obj.dynamic_commands.add(format_name_for_cli(cmd.name)) # Add the process manager Click commands to the CLI unified_shell_log.debug("Adding [green]process_manager[/green] commands") process_manager_commands: list[click.Command] = [ kill, - terminate, + # terminate, flush, logs, restart, - ps, + # ps, ] for cmd in process_manager_commands: ctx.command.add_command(cmd, format_name_for_cli(cmd.name)) @@ -457,7 +491,8 @@ def cleanup(): # Terminate any residual processes if ctx.obj.get_driver("process_manager"): - ctx.obj.get_driver("process_manager").terminate() + session_processes = ProcessQuery(session=ctx.obj.session_name) + ctx.obj.get_driver("process_manager").kill(session_processes) # Check if any processes are still running if ( @@ -498,6 +533,9 @@ def cleanup(): ) # Remove the connection to the process manager + ctx.obj.get_driver("process_manager").send_msg( + f"{getpass.getuser()} disconnected from unified shell" + ) ctx.obj.get_driver("process_manager").close() ctx.obj.delete_driver("process_manager") diff --git a/src/drunc/utils/shell_utils.py b/src/drunc/utils/shell_utils.py index d1443968a..5ef5ab09b 100644 --- a/src/drunc/utils/shell_utils.py +++ b/src/drunc/utils/shell_utils.py @@ -72,6 +72,11 @@ def __str__(self): class ShellContext: + shell_id = None # used for logging if its a PM shell or Unified shell etc + + def get_shell_id(self): + return self.shell_id + def _reset(self, name: str, token_args: dict = {}, driver_args: dict = {}): self._console = Console() self._token = self.create_token(**token_args) @@ -165,3 +170,32 @@ def print_status_summary(self) -> None: log.info( f"Current FSM status is [green]{current_state}[/green]. Available transitions are [green]{'[/green], [green]'.join(available_actions)}[/green]. Available sequence commands are [green]{'[/green], [green]'.join(available_sequences)}[/green]." ) + + +def log_pm_cmd(obj: ShellContext): + """Log a process-manager shell command with only explicitly provided arguments. + + The current Click command context is inspected and only parameters whose source is + ``COMMANDLINE`` are included in the log message. This keeps defaulted values out + of the message while still recording the command name, optional session name, and + shell identity. + + These are sent over via send_msg so that it can be displayed in the process manager + shell + + Args: + obj (ShellContext): Active shell context used to send the log message. + """ + + ctx_cmd = click.get_current_context(silent=True) + cmd_name = ctx_cmd.command.name if ctx_cmd else None + parms_dict = {} + for param in ctx_cmd.command.params: + name = param.name + if ctx_cmd.get_parameter_source(name) == click.core.ParameterSource.COMMANDLINE: + parms_dict[name] = f"{ctx_cmd.params[name]!r}" + + args = f" with arguments {parms_dict}" if parms_dict else "" + session = f" for session {obj.session_name}" if hasattr(obj, "session_name") else "" + msg = f"{getpass.getuser()} sent {cmd_name}{args}{session} via {obj.get_shell_id()}" + obj.get_driver("process_manager").send_msg(msg) diff --git a/src/drunc/utils/utils.py b/src/drunc/utils/utils.py index 08d085670..5ef464639 100644 --- a/src/drunc/utils/utils.py +++ b/src/drunc/utils/utils.py @@ -1,4 +1,5 @@ import ctypes +import ipaddress import logging import os import random @@ -463,6 +464,86 @@ def resolve_target_ip(host: str) -> str | None: return None +def resolve_context_peer(peer: str) -> str: + """Resolve a transport-qualified peer string to a display-friendly address. + + The input is expected to look like ``transport:address``. If the address contains + an IP literal, it is reverse-resolved where possible and IPv6 addresses are + re-wrapped in brackets. + + Example: + ``ipv4:10.73.136.70:41750`` -> ``np04-srv-028.cern.ch:41750`` + + Args: + peer (str): Transport-qualified peer string. + + Returns: + str: The original peer string, or a resolved ``host:port`` representation. + """ + + match = re.match(r"^(?P[^:]+):(?P
.+)$", peer) + if not match: + return peer + + parsed = _parse_host_port(match.group("address")) + if parsed is None: + return peer + + host, port = parsed + resolved_host = _resolve_host(host) + return f"{resolved_host}:{port}" + + +def _parse_host_port(address: str) -> tuple[str, str] | None: + """Extract a host and port from a peer address string. + + Supports bracketed IPv6 addresses such as ``[::1]:1234`` and unbracketed + ``host:port`` or ``ipv4:port`` forms. + + Args: + address (str): Address portion of a transport-qualified peer string. + + Returns: + tuple[str, str] | None: ``(host, port)`` when parsing succeeds, otherwise + ``None``. + """ + + bracket_match = re.match(r"^\[(?P[^\]]+)\]:(?P\d+)$", address) + if bracket_match: + return bracket_match.group("host"), bracket_match.group("port") + + host, sep, port = address.rpartition(":") + if sep and port.isdigit(): + return host, port + + return None + + +def _resolve_host(host: str) -> str: + """Reverse-resolve an IP host and keep IPv6 output bracketed. + + Args: + host (str): Hostname or IP literal to resolve. + + Returns: + str: The resolved hostname, or the original host if resolution fails. + """ + + try: + ip_obj = ipaddress.ip_address(host) + except ValueError: + return host + + try: + resolved_host, _, _ = socket.gethostbyaddr(str(ip_obj)) + except (socket.herror, socket.gaierror, socket.timeout, OSError): + resolved_host = host + + if ":" in resolved_host and not resolved_host.startswith("["): + return f"[{resolved_host}]" + return resolved_host + + def is_port_available(host: str, port: int, timeout: int = 2) -> bool: """ Check if the given port number on a specified host is available. diff --git a/tests/process_manager/interface/test_commands.py b/tests/process_manager/interface/test_commands.py index 3571690e2..6ba1e10ad 100644 --- a/tests/process_manager/interface/test_commands.py +++ b/tests/process_manager/interface/test_commands.py @@ -102,6 +102,10 @@ def logs(self, log_request): mock_result.lines = [] return mock_result + def send_msg(self, msg: str) -> None: + # simulate sending a message; tests don't assert on this, so store it + self._last_sent_msg = msg + class MockContext: """ @@ -115,6 +119,9 @@ def __init__(self, driver=None): def get_driver(self, name): return self.driver + def get_shell_id(self): + return "mock-shell" + def print(self, msg, justify=None, overflow=None, soft_wrap=None): self.output.append(str(msg)) diff --git a/tests/process_manager/process_manager_mock_impls.py b/tests/process_manager/process_manager_mock_impls.py index b9b84cf23..4b4aad7a6 100644 --- a/tests/process_manager/process_manager_mock_impls.py +++ b/tests/process_manager/process_manager_mock_impls.py @@ -9,6 +9,7 @@ from typing import Optional from unittest.mock import Mock +from druncschema.generic_pb2 import OutcomeFlag, OutcomeStatus from druncschema.process_manager_pb2 import ( BootRequest, LogLines, @@ -102,3 +103,12 @@ def _logs_impl(self, log_request: LogRequest) -> LogLines: def _flush_impl(self, query: ProcessQuery) -> ProcessInstanceList: return self._not_implemented_response() + + def _send_msg_impl( + self, msg: str | None = None, peer: str | None = None + ) -> OutcomeStatus: + """ + Returns an empty response to indicate communication is working. + Accepts an optional message parameter for compatibility with new API. + """ + return OutcomeStatus(flag=OutcomeFlag.SUCCESS)