From 977ab6abec7fcadca929ce1521e3d1684abfba20 Mon Sep 17 00:00:00 2001 From: Aurash Karimi Date: Fri, 17 Apr 2026 08:38:42 +0100 Subject: [PATCH 01/15] add more detailed exit status --- .../process_manager/ssh_process_manager.py | 90 +++++++++----- src/drunc/processes/exit_status.py | 100 +++++++++++++++ .../processes/ssh_process_lifetime_manager.py | 24 ++-- ...ss_lifetime_manager_from_forked_process.py | 39 +++--- .../ssh_process_lifetime_manager_shell.py | 117 +++++++++++------- 5 files changed, 264 insertions(+), 106 deletions(-) create mode 100644 src/drunc/processes/exit_status.py diff --git a/src/drunc/process_manager/ssh_process_manager.py b/src/drunc/process_manager/ssh_process_manager.py index e80d421b7..9a8708322 100644 --- a/src/drunc/process_manager/ssh_process_manager.py +++ b/src/drunc/process_manager/ssh_process_manager.py @@ -19,6 +19,7 @@ from drunc.exceptions import DruncCommandException from drunc.process_manager.process_manager import ProcessManager +from drunc.processes.exit_status import ExitStatus from drunc.processes.ssh_process_lifetime_manager import ProcessLifetimeManager @@ -55,8 +56,8 @@ def __init__( logger=self.log, on_process_exit=self._on_ssh_process_exit, ) - # stores the exit codes for all dead processes by uuid - self.archived_exit_codes: dict[str, int] = {} + # stores the exit statuses for all dead processes by uuid + self.archived_exit_statuses: dict[str, ExitStatus] = {} def _build_process_instance( self, @@ -105,7 +106,10 @@ def _get_process_timeouts(self, uuids: List[str]) -> dict[str, float]: return process_timeouts def _on_ssh_process_exit( - self, uuid: str, exit_code: Optional[int], exception: Optional[Exception] + self, + uuid: str, + exit_status: Optional[ExitStatus], + exception: Optional[Exception], ) -> None: if uuid not in self.boot_request: return @@ -115,16 +119,23 @@ def _on_ssh_process_exit( f"Process with UUID {uuid} threw an exception when we tried to kill it: {exception!s}" ) - if exit_code is None: + if 
exit_status is None: self.log.error( f"Process with UUID {uuid} is still running but on_ssh_process_exit was called." ) return else: - self.log.debug(f"Process with UUID {uuid} exited with code {exit_code}.") + self.log.debug( + f"Process with UUID {uuid} exited with status {exit_status!r} triggering on_ssh_process_exit." + ) - if uuid not in self.archived_exit_codes: - self.archived_exit_codes[uuid] = exit_code + # Processes killed cleanly via the kill endpoint will already + # have their exit code and dead status recorded, so there is no benefit from + # overwriting it asynchronously here. This is only used to + # record exit codes for processes that were killed unexpectedly + # (e.g. due to a crash or external kill signal) + if uuid not in self.archived_exit_statuses: + self.archived_exit_statuses[uuid] = exit_status if uuid not in self.expected_dead_applications: self.add_process_to_expected_dead_processes(uuid) @@ -133,7 +144,12 @@ def _on_ssh_process_exit( session = boot_req.process_description.metadata.session user = boot_req.process_description.metadata.user - self.notify_join(name=name, session=session, user=user, exit_code=exit_code) + self.notify_join( + name=name, + session=session, + user=user, + exit_status=self.archived_exit_statuses[uuid], + ) def kill_processes(self, uuids: list) -> ProcessInstanceList: """ @@ -148,21 +164,27 @@ def kill_processes(self, uuids: list) -> ProcessInstanceList: Returns: ProcessInstanceList containing status of terminated processes """ - # Delegate shutdown to lifetime manager and retrieve exit codes - exit_codes = self.ssh_lifetime_manager.kill_processes( + # Delegate shutdown to lifetime manager and retrieve exit statuses + exit_statuses = self.ssh_lifetime_manager.kill_processes( uuids, self._get_process_timeouts(uuids) ) for proc_uuid in uuids: self.add_process_to_expected_dead_processes(proc_uuid) - self.archived_exit_codes.update(exit_codes) + for proc_uuid, exit_status in exit_statuses.items(): + if exit_status is 
not None: + self.archived_exit_statuses[proc_uuid] = exit_status # Build ProcessInstance objects from termination results ret = [ self._build_process_instance( uuid=uuid, status_code=ProcessInstance.StatusCode.DEAD, - return_code=exit_codes.get(uuid), + return_code=( + exit_statuses[uuid].get_reported_exit_code() + if exit_statuses.get(uuid) is not None + else None + ), ) for uuid in uuids ] @@ -296,9 +318,9 @@ def _logs_impl(self, log_request: LogRequest) -> LogLines: flag=ResponseFlag.UNHANDLED_EXCEPTION_THROWN, ) - def notify_join(self, name, session, user, exit_code): + def notify_join(self, name, session, user, exit_status: ExitStatus): self.log.debug(f"{self.name} sending broadcast after ssh process exit") - end_str = f"Process '{name}' (session: '{session}', user: '{user}') process exited with exit code {exit_code}" + end_str = exit_status.get_process_manager_log_message(name, session, user) self.log.info(end_str) self.broadcast(end_str, BroadcastType.SUBPROCESS_STATUS_UPDATE) @@ -370,12 +392,14 @@ def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance: # Query current process status alive = self.ssh_lifetime_manager.is_process_alive(uuid) - return_code = self.ssh_lifetime_manager.pop_early_exit_code(uuid) + return_status = self.ssh_lifetime_manager.pop_early_exit_status(uuid) # Archive exit code if process exited early - if return_code is not None: - self.log.debug(f"Process {uuid} exited early with exit code: {return_code}") - self.archived_exit_codes[uuid] = return_code + if return_status is not None: + self.log.debug( + f"Process {uuid} exited early with exit status: {return_status!r}" + ) + self.archived_exit_statuses[uuid] = return_status # Determine status code based on liveness status_code = ( @@ -388,7 +412,11 @@ def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance: pi = self._build_process_instance( uuid=uuid, status_code=status_code, - return_code=return_code, + return_code=( + 
return_status.get_reported_exit_code() + if return_status is not None + else None + ), ) return pi @@ -443,13 +471,13 @@ def _ps_impl(self, query: ProcessQuery) -> ProcessInstanceList: ret += [pi] continue - exit_code = self.archived_exit_codes.get(proc_uuid, None) + exit_status = self.archived_exit_statuses.get(proc_uuid, None) - if exit_code is not None: + if exit_status is not None: pi = self._build_process_instance( uuid=proc_uuid, status_code=ProcessInstance.StatusCode.DEAD, - return_code=exit_code, + return_code=exit_status.get_reported_exit_code(), ) else: pi = self._build_process_instance( @@ -501,9 +529,11 @@ def _restart_impl(self, query: ProcessQuery) -> ProcessInstanceList: # reported as unexpectedly dead self.add_process_to_expected_dead_processes(uuid) - self.archived_exit_codes[uuid] = self.ssh_lifetime_manager.kill_process( + exit_status = self.ssh_lifetime_manager.kill_process( uuid, self.configuration.data.kill_timeout ) + if exit_status is not None: + self.archived_exit_statuses[uuid] = exit_status del self.boot_request[uuid] @@ -512,7 +542,7 @@ def _restart_impl(self, query: ProcessQuery) -> ProcessInstanceList: # Remove the application from the list of dead applications self.remove_process_from_expected_dead_processes(uuid) - del self.archived_exit_codes[uuid] + self.archived_exit_statuses.pop(uuid, None) del uuid del same_uuid_br del same_uuid @@ -605,7 +635,7 @@ def _flush_impl(self, query: ProcessQuery) -> ProcessInstanceList: Matches processes against the query, checks each for liveness via the SSH lifetime manager, and removes any dead ones from boot_request and - archived_exit_codes. Only dead processes are affected by this command. + archived_exit_statuses. Only dead processes are affected by this command. Args: query: ProcessQuery specifying which processes to consider for flushing. 
@@ -649,12 +679,16 @@ def _flush_impl(self, query: ProcessQuery) -> ProcessInstanceList: ) continue - return_code = self.archived_exit_codes.pop(proc_uuid, None) + exit_status = self.archived_exit_statuses.pop(proc_uuid, None) pi = self._build_process_instance( uuid=proc_uuid, status_code=ProcessInstance.StatusCode.DEAD, - return_code=return_code, + return_code=( + exit_status.get_reported_exit_code() + if exit_status is not None + else None + ), ) del self.boot_request[proc_uuid] @@ -666,7 +700,7 @@ def _flush_impl(self, query: ProcessQuery) -> ProcessInstanceList: self.log.info( f"Flushed dead process {proc_uuid} " f"(name: {pi.process_description.metadata.name}, " - f"exit code: {return_code})." + f"exit code: {pi.return_code})." ) flushed.append(pi) diff --git a/src/drunc/processes/exit_status.py b/src/drunc/processes/exit_status.py new file mode 100644 index 000000000..7641e5a32 --- /dev/null +++ b/src/drunc/processes/exit_status.py @@ -0,0 +1,100 @@ +from enum import Enum +from typing import Optional + + +class ExitStatusSource(Enum): + CLIENT_MONITORING = "client_monitoring" + REMOTE_MONITORING = "remote_monitoring" + MANUAL_KILL = "manual_kill" + + +class ExitStatus: + def __init__( + self, + source: ExitStatusSource, + raw_exit_code: Optional[int], + ) -> None: + self._source = source + self._raw_exit_code = raw_exit_code + self._reported_exit_code, self._message_fragment = self._interpret() + + def _interpret(self) -> tuple[Optional[int], str]: + if self._raw_exit_code is None: + return None, "exit state could not be determined" + + if self._source is ExitStatusSource.CLIENT_MONITORING: + return self._interpret_client_monitoring() + + if self._source is ExitStatusSource.REMOTE_MONITORING: + return self._interpret_remote_monitoring() + + return self._interpret_manual_kill() + + def _interpret_client_monitoring(self) -> tuple[Optional[int], str]: + if self._raw_exit_code == 255: + return ( + 0, + "was terminated cleanly after SSH client monitoring ended 
with status 255 following local SIGQUIT fallback", + ) + + if self._raw_exit_code == -9: + return ( + -9, + "lost its SSH client because that client was SIGKILLed externally while client monitoring was active", + ) + + return ( + self._raw_exit_code, + f"exited while relying on SSH client monitoring with raw exit status {self._raw_exit_code}", + ) + + def _interpret_remote_monitoring(self) -> tuple[Optional[int], str]: + return ( + self._raw_exit_code, + f"exited while being monitored through the remote PID watcher with raw exit status {self._raw_exit_code}", + ) + + def _interpret_manual_kill(self) -> tuple[Optional[int], str]: + if self._raw_exit_code == 255: + return ( + 0, + "was terminated cleanly by the process manager; SSH client shutdown reported raw exit status 255 and is normalised to 0", + ) + + if self._raw_exit_code == 0: + return 0, "was terminated cleanly by the process manager" + + return ( + self._raw_exit_code, + f"was terminated by the process manager with raw exit status {self._raw_exit_code}", + ) + + def get_source(self) -> ExitStatusSource: + return self._source + + def get_raw_exit_code(self) -> Optional[int]: + return self._raw_exit_code + + def get_reported_exit_code(self) -> Optional[int]: + return self._reported_exit_code + + def get_process_manager_log_message( + self, + process_name: str, + session: str, + user: str, + ) -> str: + return ( + f"Process '{process_name}' (session: '{session}', user: '{user}') " + f"{self._message_fragment}. " + f"Reported exit code: {self._reported_exit_code}." 
+ ) + + def __repr__(self) -> str: + return ( + "ExitStatus(" + f"source={self._source.value!r}, " + f"raw_exit_code={self._raw_exit_code!r}, " + f"reported_exit_code={self._reported_exit_code!r}" + ")" + ) diff --git a/src/drunc/processes/ssh_process_lifetime_manager.py b/src/drunc/processes/ssh_process_lifetime_manager.py index 031a3d20d..d015faf56 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager.py +++ b/src/drunc/processes/ssh_process_lifetime_manager.py @@ -12,6 +12,7 @@ from druncschema.process_manager_pb2 import BootRequest from drunc.processes.connection_utils import wait_for +from drunc.processes.exit_status import ExitStatus @dataclass @@ -116,24 +117,24 @@ def is_process_alive(self, uuid: str) -> bool: pass @abstractmethod - def pop_early_exit_code(self, uuid: str) -> Optional[int]: + def pop_early_exit_status(self, uuid: str) -> Optional[ExitStatus]: """ If a process was killed before kill_process was called. This method - retrieves and removes the exit code from internal storage. Otherwise + retrieves and removes the exit status from internal storage. Otherwise it will return None. Args: uuid: Process UUID Returns: - Exit code if process is dead, None if still running or not found + ExitStatus if process is dead, None if still running or not found """ pass @abstractmethod def kill_process( self, uuid: str, timeout: float = DEFAULT_TIMEOUT_FOR_KILLING_PROCESS - ) -> Optional[int]: + ) -> Optional[ExitStatus]: """ Kill a remote process and clean up associated resources upon successful termination. Sends termination signals to the remote process and waits for it to die. @@ -145,7 +146,8 @@ def kill_process( timeout: Timeout for graceful termination in seconds Returns: - the exit code of the process if it was able to be determined (None otherwise). + the interpreted exit status of the process if it was able to be determined + (None otherwise). 
""" pass @@ -169,7 +171,7 @@ def crash_process(self, uuid: str) -> None: @abstractmethod def kill_processes( self, uuids: List[str], process_timeouts: Optional[Dict[str, float]] = None - ) -> Dict[str, Optional[int]]: + ) -> Dict[str, Optional[ExitStatus]]: """ Kill multiple processes by their UUIDs in role-based shutdown order. @@ -185,7 +187,7 @@ def kill_processes( timeout for unmapped UUIDs. Returns: - Dictionary mapping process UUIDs to their exit codes. None indicates + Dictionary mapping process UUIDs to their exit statuses. None indicates exit code could not be determined. """ pass @@ -193,7 +195,7 @@ def kill_processes( @abstractmethod def kill_all_processes( self, process_timeouts: Optional[Dict[str, float]] = None - ) -> Dict[str, Optional[int]]: + ) -> Dict[str, Optional[ExitStatus]]: """ Kill all managed processes and clean up resources. @@ -205,7 +207,7 @@ def kill_all_processes( If not specified a default timeout will be used for all processes. Returns: - Dictionary mapping process UUIDs to their exit codes (None if not determined) + Dictionary mapping process UUIDs to their exit statuses (None if not determined) """ pass @@ -215,7 +217,7 @@ def kill_processes_by_role( role: str, candidate_uuids: List[str], process_timeouts: Optional[Dict[str, float]] = None, - ) -> Dict[str, Optional[int]]: + ) -> Dict[str, Optional[ExitStatus]]: """ Kill all processes with the specified role from candidate UUID list. @@ -229,7 +231,7 @@ def kill_processes_by_role( in seconds. Uses default timeout for unmapped UUIDs. Returns: - Dictionary mapping terminated process UUIDs to their exit codes. + Dictionary mapping terminated process UUIDs to their exit statuses. Only includes processes matching the specified role. 
""" pass diff --git a/src/drunc/processes/ssh_process_lifetime_manager_from_forked_process.py b/src/drunc/processes/ssh_process_lifetime_manager_from_forked_process.py index 8bce21ff9..3af19eb3b 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager_from_forked_process.py +++ b/src/drunc/processes/ssh_process_lifetime_manager_from_forked_process.py @@ -16,6 +16,7 @@ from druncschema.process_manager_pb2 import BootRequest +from drunc.processes.exit_status import ExitStatus from drunc.processes.ssh_process_lifetime_manager import ( ProcessLifetimeManager, RemotePidResult, @@ -112,7 +113,7 @@ def _worker_process_main( (request_id, result, error) tuples onto response_queue. Process-exit callbacks are forwarded to the parent via callback_queue as - (uuid, exit_code, exception_string) tuples. + (uuid, exit_status, exception_string) tuples. A None sentinel placed on request_queue causes a clean shutdown. @@ -133,13 +134,13 @@ def _worker_process_main( def _on_process_exit( uuid: str, - exit_code: Optional[int], + exit_status: Optional[ExitStatus], exception: Optional[Exception], ) -> None: """Relay process-exit events back to the parent via the callback queue.""" try: callback_queue.put_nowait( - (uuid, exit_code, str(exception) if exception is not None else None) + (uuid, exit_status, str(exception) if exception is not None else None) ) except Exception: pass # Never raise inside a background callback @@ -208,7 +209,7 @@ def __init__( disable_localhost_host_key_check: bool = False, logger: Optional[logging.Logger] = None, on_process_exit: Optional[ - Callable[[str, Optional[int], Optional[Exception]], None] + Callable[[str, Optional[ExitStatus], Optional[Exception]], None] ] = None, ) -> None: """ @@ -224,7 +225,7 @@ def __init__( its own independent logger. on_process_exit: Optional callback invoked in the *parent* process when a managed - process exits. Signature: (uuid, exit_code, exception). + process exits. Signature: (uuid, exit_status, exception). 
The exception is reconstructed as a RuntimeError from the serialised message forwarded by the child process. """ @@ -395,7 +396,7 @@ def _run_callback_dispatcher(self) -> None: if message is None: break - uuid, exit_code, exception_string = message + uuid, exit_status, exception_string = message if self._on_process_exit is not None: try: @@ -405,7 +406,7 @@ def _run_callback_dispatcher(self) -> None: if exception_string is not None else None ) - self._on_process_exit(uuid, exit_code, exception) + self._on_process_exit(uuid, exit_status, exception) except Exception as exc: self.log.error( f"Error in on_process_exit callback for process {uuid}: {exc}" @@ -498,24 +499,24 @@ def is_process_alive(self, uuid: str) -> bool: """ return self._call("is_process_alive", uuid) - def pop_early_exit_code(self, uuid: str) -> Optional[int]: + def pop_early_exit_status(self, uuid: str) -> Optional[ExitStatus]: """ - Retrieve and remove the exit code of a process that exited unexpectedly. + Retrieve and remove the exit status of a process that exited unexpectedly. Args: uuid: Process UUID. Returns: - Exit code if the process terminated early without being explicitly killed, + ExitStatus if the process terminated early without being explicitly killed, None if still running or not found. """ - return self._call("pop_early_exit_code", uuid) + return self._call("pop_early_exit_status", uuid) def kill_process( self, uuid: str, timeout: float = ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS, - ) -> Optional[int]: + ) -> Optional[ExitStatus]: """ Kill a remote process and clean up its resources. @@ -524,7 +525,7 @@ def kill_process( timeout: Graceful termination timeout in seconds. Returns: - Exit code of the terminated process, or None if undetermined. + ExitStatus of the terminated process, or None if undetermined. 
""" return self._call("kill_process", uuid, timeout) @@ -545,7 +546,7 @@ def kill_processes( self, uuids: List[str], process_timeouts: Optional[Dict[str, float]] = None, - ) -> Dict[str, Optional[int]]: + ) -> Dict[str, Optional[ExitStatus]]: """ Kill multiple processes in role-based shutdown order. @@ -554,14 +555,14 @@ def kill_processes( process_timeouts: Optional per-UUID timeout overrides in seconds. Returns: - Dictionary mapping process UUIDs to their exit codes. + Dictionary mapping process UUIDs to their exit statuses. """ return self._call("kill_processes", uuids, process_timeouts) def kill_all_processes( self, process_timeouts: Optional[Dict[str, float]] = None, - ) -> Dict[str, Optional[int]]: + ) -> Dict[str, Optional[ExitStatus]]: """ Kill all managed processes. @@ -569,7 +570,7 @@ def kill_all_processes( process_timeouts: Optional per-UUID timeout overrides in seconds. Returns: - Dictionary mapping all process UUIDs to their exit codes. + Dictionary mapping all process UUIDs to their exit statuses. """ return self._call("kill_all_processes", process_timeouts) @@ -578,7 +579,7 @@ def kill_processes_by_role( role: str, candidate_uuids: List[str], process_timeouts: Optional[Dict[str, float]] = None, - ) -> Dict[str, Optional[int]]: + ) -> Dict[str, Optional[ExitStatus]]: """ Kill all processes with the specified role from the candidate UUID list. @@ -588,7 +589,7 @@ def kill_processes_by_role( process_timeouts: Optional per-UUID timeout overrides in seconds. Returns: - Dictionary mapping terminated process UUIDs to their exit codes. + Dictionary mapping terminated process UUIDs to their exit statuses. 
""" return self._call( "kill_processes_by_role", role, candidate_uuids, process_timeouts diff --git a/src/drunc/processes/ssh_process_lifetime_manager_shell.py b/src/drunc/processes/ssh_process_lifetime_manager_shell.py index ddcd00b1e..00e788db2 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager_shell.py +++ b/src/drunc/processes/ssh_process_lifetime_manager_shell.py @@ -18,6 +18,7 @@ from drunc.process_manager.configuration import PROCESS_SHUTDOWN_ORDERING from drunc.process_manager.utils import on_parent_exit from drunc.processes.connection_utils import wait_for +from drunc.processes.exit_status import ExitStatus, ExitStatusSource from drunc.processes.process_metadata import ProcessMetadata from drunc.processes.ssh_process_lifetime_manager import ( ProcessLifetimeManager, @@ -39,7 +40,9 @@ def __init__( hostname: str, user: str, metadata_file: str, - on_exit: Optional[Callable[[str, Optional[int], Optional[Exception]], None]], + on_exit: Optional[ + Callable[[str, Optional[ExitStatus], Optional[Exception]], None] + ], logger: logging.Logger, ): """ @@ -77,6 +80,7 @@ def run(self): self.hostname, self.user, ) + metadata = None if metadata: with self.manager.lock: self.manager.metadata[self.uuid] = metadata @@ -120,7 +124,7 @@ def _monitor_remote_process(self, remote_pid: int) -> None: Uses SSH to run a blocking command that exits when the process dies. 
""" exception = None - exit_code = None + raw_exit_code = None try: user_host = f"{self.user}@{self.hostname}" @@ -147,23 +151,25 @@ def _monitor_remote_process(self, remote_pid: int) -> None: ) self.process.wait() - exit_code = self.process.exit_code + raw_exit_code = self.process.exit_code self.logger.debug( - f"SSH client for {self.uuid} exited with code {exit_code}" + f"SSH client for {self.uuid} exited with code {raw_exit_code}" ) except sh.ErrorReturnCode as e: exception = e - exit_code = e.exit_code + raw_exit_code = e.exit_code self.logger.debug(f"Remote process {self.uuid} monitoring error: {e}") except Exception as e: exception = e self.logger.error(f"Remote process {self.uuid} watcher error: {e}") + exit_status = ExitStatus(ExitStatusSource.REMOTE_MONITORING, raw_exit_code) + # Invoke callback with results if self.on_exit: try: - self.on_exit(self.uuid, exit_code, exception) + self.on_exit(self.uuid, exit_status, exception) except Exception as callback_error: self.logger.error( f"Error in process exit callback for {self.uuid}: {callback_error}" @@ -175,27 +181,29 @@ def _monitor_ssh_client(self) -> None: fallback if the remote PID of the process is unavailable. 
""" exception = None - exit_code = None + raw_exit_code = None try: self.process.wait() - exit_code = self.process.exit_code + raw_exit_code = self.process.exit_code self.logger.debug( - f"SSH client for {self.uuid} exited with code {exit_code}" + f"SSH client for {self.uuid} exited with code {raw_exit_code}" ) except sh.ErrorReturnCode as e: exception = e - exit_code = e.exit_code + raw_exit_code = e.exit_code self.logger.debug(f"SSH client for {self.uuid} error: {e}") except Exception as e: exception = e self.logger.error(f"SSH client for {self.uuid} watcher error: {e}") + exit_status = ExitStatus(ExitStatusSource.CLIENT_MONITORING, raw_exit_code) + if self.on_exit: try: - self.on_exit(self.uuid, exit_code, exception) + self.on_exit(self.uuid, exit_status, exception) except Exception as callback_error: self.logger.error( f"Error in process exit callback for {self.uuid}: {callback_error}" @@ -224,7 +232,7 @@ def __init__( disable_localhost_host_key_check: bool = False, logger: Optional[logging.Logger] = None, on_process_exit: Optional[ - Callable[[str, Optional[int], Optional[Exception]], None] + Callable[[str, Optional[ExitStatus], Optional[Exception]], None] ] = None, ): """ @@ -234,7 +242,7 @@ def __init__( disable_host_key_check: Disable SSH host key verification for all hosts disable_localhost_host_key_check: Disable SSH host key verification for localhost logger: Logger instance for real-time output logging - on_process_exit: Optional callback function(uuid, exit_code, exception) invoked when process exits + on_process_exit: Optional callback function(uuid, exit_status, exception) invoked when process exits """ self.disable_host_key_check = disable_host_key_check self.disable_localhost_host_key_check = disable_localhost_host_key_check @@ -414,7 +422,7 @@ def is_process_alive(self, uuid: str) -> bool: ) return process.is_alive() and remote_process_alive - def pop_early_exit_code(self, uuid: str) -> Optional[int]: + def pop_early_exit_status(self, uuid: str) 
-> Optional[ExitStatus]: """ Get process exit code if process exited early without being killed. @@ -426,7 +434,7 @@ def pop_early_exit_code(self, uuid: str) -> Optional[int]: uuid: Process UUID Returns: - Exit code if process has terminated early, None if still running or not found + ExitStatus if process has terminated early, None if still running or not found """ if uuid not in self.process_store: self.log.debug(f"Process {uuid} not found in store for exit code retrieval") @@ -437,21 +445,29 @@ def pop_early_exit_code(self, uuid: str) -> Optional[int]: return None try: + process.wait() early_exit_code = process.exit_code + except sh.ErrorReturnCode as e: + early_exit_code = e.exit_code except Exception as e: self.log.debug(f"Exception thrown getting exit code for {uuid}: {e}") return None if early_exit_code is not None: + exit_status = ExitStatus( + ExitStatusSource.CLIENT_MONITORING, + early_exit_code, + ) self.log.warning( - f"Process {uuid} exited early without being killed. Exit code {early_exit_code}" + f"Process {uuid} exited early without being killed. Exit status {exit_status!r}" ) self.log.debug( - f"Cleaning up resources for process {uuid} with exit code {early_exit_code}" + f"Cleaning up resources for process {uuid} with exit status {exit_status!r}" ) self._cleanup_process_resources(uuid) + return exit_status - return early_exit_code + return None def get_process_stdout(self, uuid: str) -> Optional[str]: """ @@ -506,7 +522,7 @@ def kill_processes_by_role( role: str, candidate_uuids: List[str], process_timeouts: Optional[Dict[str, float]] = None, - ) -> Dict[str, Optional[int]]: + ) -> Dict[str, Optional[ExitStatus]]: """ Kill all processes with the specified role from candidate UUID list. @@ -520,7 +536,7 @@ def kill_processes_by_role( in seconds. Uses default timeout for unmapped UUIDs. 
Returns: - Dictionary mapping terminated process UUIDs to their exit codes + Dictionary mapping terminated process UUIDs to their exit statuses """ self.log.debug(f"process_timeouts: {process_timeouts}") if process_timeouts is None: @@ -547,7 +563,7 @@ def kill_processes_by_role( f"from {len(candidate_uuids)} candidates" ) - exit_codes: Dict[str, Optional[int]] = {} + exit_statuses: Dict[str, Optional[ExitStatus]] = {} # Terminate processes asynchronously using thread pool with ThreadPoolExecutor(max_workers=len(uuids_to_kill)) as executor: @@ -561,19 +577,18 @@ def kill_processes_by_role( for future in as_completed(future_to_uuid): uuid = future_to_uuid[future] try: - exit_code = future.result() - exit_codes[uuid] = exit_code + exit_statuses[uuid] = future.result() except Exception as e: self.log.error( f"Error during termination of process {uuid} with role '{role}': {e}" ) - exit_codes[uuid] = None + exit_statuses[uuid] = None - return exit_codes + return exit_statuses def kill_processes( self, uuids: List[str], process_timeouts: Optional[Dict[str, float]] = None - ) -> Dict[str, Optional[int]]: + ) -> Dict[str, Optional[ExitStatus]]: """ Kill multiple processes by their UUIDs in role-based shutdown order. @@ -587,7 +602,7 @@ def kill_processes( in seconds. Uses default timeout for unmapped UUIDs. 
Returns: - Dictionary mapping process UUIDs to their exit codes + Dictionary mapping process UUIDs to their exit statuses """ if not uuids: self.log.debug("No processes to kill") @@ -601,7 +616,7 @@ def kill_processes( if uuid not in process_timeouts: process_timeouts[uuid] = self.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS - all_exit_codes: Dict[str, Optional[int]] = {} + all_exit_statuses: Dict[str, Optional[ExitStatus]] = {} killed_uuids = set() # Execute role-based shutdown in stages @@ -609,13 +624,13 @@ def kill_processes( self.log.info( f"--- Shutdown stage: Terminating role '{role}' from provided UUIDs ---" ) - role_exit_codes = self.kill_processes_by_role( + role_exit_statuses = self.kill_processes_by_role( role, uuids, process_timeouts=process_timeouts ) - all_exit_codes.update(role_exit_codes) - killed_uuids.update(role_exit_codes.keys()) + all_exit_statuses.update(role_exit_statuses) + killed_uuids.update(role_exit_statuses.keys()) - if role_exit_codes: + if role_exit_statuses: self.log.info(f"--- Shutdown stage: Role '{role}' complete ---") # Identify processes not killed during role-based shutdown @@ -628,7 +643,7 @@ def kill_processes( ) # Kill remaining processes asynchronously without role ordering - fallback_exit_codes: Dict[str, Optional[int]] = {} + fallback_exit_statuses: Dict[str, Optional[ExitStatus]] = {} with ThreadPoolExecutor(max_workers=len(remaining_uuids)) as executor: # Submit kill tasks for all remaining processes @@ -643,21 +658,20 @@ def kill_processes( for future in as_completed(future_to_uuid): uuid = future_to_uuid[future] try: - exit_code = future.result() - fallback_exit_codes[uuid] = exit_code + fallback_exit_statuses[uuid] = future.result() except Exception as e: self.log.error( f"Error during fallback termination of process {uuid}: {e}" ) - fallback_exit_codes[uuid] = None + fallback_exit_statuses[uuid] = None - all_exit_codes.update(fallback_exit_codes) + all_exit_statuses.update(fallback_exit_statuses) - return all_exit_codes + 
return all_exit_statuses def kill_all_processes( self, process_timeouts: Optional[Dict[str, float]] = None - ) -> Dict[str, Optional[int]]: + ) -> Dict[str, Optional[ExitStatus]]: """ Kill all active processes in role-based shutdown order. @@ -670,7 +684,7 @@ def kill_all_processes( in seconds. Uses default timeout for unmapped UUIDs. Returns: - Dictionary mapping all process UUIDs to their exit codes + Dictionary mapping all process UUIDs to their exit statuses """ # Retrieve all active process UUIDs with self.lock: @@ -683,7 +697,7 @@ def kill_all_processes( self.log.info(f"Terminating all {len(active_uuids)} active process(es)") # Delegate to kill_processes for role-based shutdown ordering - all_exit_codes = self.kill_processes(active_uuids, process_timeouts) + all_exit_statuses = self.kill_processes(active_uuids, process_timeouts) # Wait for all watcher threads to complete with self.lock: @@ -698,7 +712,7 @@ def kill_all_processes( with self.lock: self.watchers.clear() - return all_exit_codes + return all_exit_statuses def _build_ssh_arguments( self, hostname: str, user_host: str, use_tty: bool = True @@ -974,7 +988,7 @@ def _kill_client_process(self, process_info: Dict) -> None: recieve a SIGHUP when the SSH client terminates. 
""" try: - process_info["process"].signal_group(signal.SIGKILL) + process_info["process"].signal_group(signal.SIGQUIT) except Exception as e: self.log.debug( f"Exception was raised when terminating SSH client process: {e}" @@ -1005,19 +1019,22 @@ def check_exit_status(): if got_exit: try: + process.wait() return process.exit_code + except sh.ErrorReturnCode as e: + return e.exit_code except Exception as e: self.log.debug(f"Exception getting exit code for {uuid}: {e}") return None else: - self.log.debug(f"Timeout waiting for exit code of process {uuid}") + self.log.warning(f"Timeout waiting for exit code of process {uuid}") return None def kill_process( self, uuid: str, timeout: float = ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS, - ) -> int | None: + ) -> ExitStatus | None: """ Kill a remote process and clean up all associated resources. @@ -1030,7 +1047,7 @@ def kill_process( timeout: Timeout for graceful termination in seconds Returns: - Exit code of the terminated process, or None if not found or still running + ExitStatus of the terminated process, or None if not found or still running """ if uuid not in self.process_store: return None @@ -1050,7 +1067,9 @@ def kill_process( self._kill_client_process(process_info) exit_code = self._wait_for_process_exit_code(uuid, timeout=timeout) self._cleanup_process_resources(uuid) - return exit_code + if exit_code is None: + return None + return ExitStatus(ExitStatusSource.MANUAL_KILL, exit_code) remote_pid = metadata.pid process_dead = False @@ -1101,7 +1120,9 @@ def kill_process( exit_code = self._wait_for_process_exit_code(uuid, timeout=timeout) self._cleanup_remote_file(hostname, user, metadata_file) self._cleanup_process_resources(uuid) - return exit_code + if exit_code is None: + return None + return ExitStatus(ExitStatusSource.MANUAL_KILL, exit_code) except Exception as e: self.log.error(f"Error terminating remote process {uuid}: {e}") From 9eb819c9ffc53b41848ec62192808341ab4767e1 Mon Sep 17 00:00:00 
2001 From: Aurash Karimi Date: Tue, 28 Apr 2026 16:26:25 +0100 Subject: [PATCH 02/15] clearer exit status logging --- src/drunc/processes/exit_status.py | 53 +++++++++------ .../ssh_process_lifetime_manager_shell.py | 65 ++++++++++++++++--- 2 files changed, 89 insertions(+), 29 deletions(-) diff --git a/src/drunc/processes/exit_status.py b/src/drunc/processes/exit_status.py index 7641e5a32..fc7b96820 100644 --- a/src/drunc/processes/exit_status.py +++ b/src/drunc/processes/exit_status.py @@ -5,7 +5,8 @@ class ExitStatusSource(Enum): CLIENT_MONITORING = "client_monitoring" REMOTE_MONITORING = "remote_monitoring" - MANUAL_KILL = "manual_kill" + MANUAL_KILL_THROUGH_SSH_CLIENT = "manual_kill_through_ssh_client" + MANUAL_KILL_THROUGH_REMOTE_PID = "manual_kill_through_remote_pid" class ExitStatus: @@ -18,7 +19,7 @@ def __init__( self._raw_exit_code = raw_exit_code self._reported_exit_code, self._message_fragment = self._interpret() - def _interpret(self) -> tuple[Optional[int], str]: + def _interpret(self) -> tuple[None | int, str]: if self._raw_exit_code is None: return None, "exit state could not be determined" @@ -28,53 +29,65 @@ def _interpret(self) -> tuple[Optional[int], str]: if self._source is ExitStatusSource.REMOTE_MONITORING: return self._interpret_remote_monitoring() - return self._interpret_manual_kill() + if self._source is ExitStatusSource.MANUAL_KILL_THROUGH_SSH_CLIENT: + return self._interpret_manual_kill_through_ssh_client() - def _interpret_client_monitoring(self) -> tuple[Optional[int], str]: + if self._source is ExitStatusSource.MANUAL_KILL_THROUGH_REMOTE_PID: + return self._interpret_manual_kill_through_remote_pid() + + return None, "Unrecognised exit status" + + def _interpret_client_monitoring(self) -> tuple[None | int, str]: if self._raw_exit_code == 255: return ( - 0, - "was terminated cleanly after SSH client monitoring ended with status 255 following local SIGQUIT fallback", + None, + "was terminated unexpectedly with SIGQUIT on the SSH client 
(SIGHUP on the server)", ) if self._raw_exit_code == -9: return ( - -9, - "lost its SSH client because that client was SIGKILLed externally while client monitoring was active", + None, + "was terminated unexpectedly by a SIGKILL to the SSH client (SIGHUP on the server)", ) return ( self._raw_exit_code, - f"exited while relying on SSH client monitoring with raw exit status {self._raw_exit_code}", + f"was terminated unexpectedly with unusual SSH client exit code of {self._raw_exit_code}", ) def _interpret_remote_monitoring(self) -> tuple[Optional[int], str]: return ( self._raw_exit_code, - f"exited while being monitored through the remote PID watcher with raw exit status {self._raw_exit_code}", + "was terminated unexpectedly through the remote pid", ) - def _interpret_manual_kill(self) -> tuple[Optional[int], str]: - if self._raw_exit_code == 255: + def _interpret_manual_kill_through_ssh_client(self) -> tuple[Optional[int], str]: + if self._raw_exit_code == 255 or self._raw_exit_code == 0: return ( - 0, - "was terminated cleanly by the process manager; SSH client shutdown reported raw exit status 255 and is normalised to 0", + None, + "was terminated by the process manager through the SSH client", ) - if self._raw_exit_code == 0: - return 0, "was terminated cleanly by the process manager" + if self._raw_exit_code == -9: + return ( + None, + "was terminated with a SIGKILL by the process manager through the SSH client", + ) return ( self._raw_exit_code, - f"was terminated by the process manager with raw exit status {self._raw_exit_code}", + f"was terminated by the process manager through the SSH client with an unusual exit code of: {self._raw_exit_code}", + ) + + def _interpret_manual_kill_through_remote_pid(self) -> tuple[Optional[int], str]: + return ( + self._raw_exit_code, + "was terminated by the process manager through the remote pid", ) def get_source(self) -> ExitStatusSource: return self._source - def get_raw_exit_code(self) -> Optional[int]: - return 
self._raw_exit_code - def get_reported_exit_code(self) -> Optional[int]: return self._reported_exit_code diff --git a/src/drunc/processes/ssh_process_lifetime_manager_shell.py b/src/drunc/processes/ssh_process_lifetime_manager_shell.py index 00e788db2..eb24ace75 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager_shell.py +++ b/src/drunc/processes/ssh_process_lifetime_manager_shell.py @@ -80,7 +80,6 @@ def run(self): self.hostname, self.user, ) - metadata = None if metadata: with self.manager.lock: self.manager.metadata[self.uuid] = metadata @@ -164,7 +163,12 @@ def _monitor_remote_process(self, remote_pid: int) -> None: exception = e self.logger.error(f"Remote process {self.uuid} watcher error: {e}") - exit_status = ExitStatus(ExitStatusSource.REMOTE_MONITORING, raw_exit_code) + exit_status = ExitStatus( + self.manager._consume_exit_status_source( + self.uuid, ExitStatusSource.REMOTE_MONITORING + ), + raw_exit_code, + ) # Invoke callback with results if self.on_exit: @@ -199,7 +203,12 @@ def _monitor_ssh_client(self) -> None: exception = e self.logger.error(f"SSH client for {self.uuid} watcher error: {e}") - exit_status = ExitStatus(ExitStatusSource.CLIENT_MONITORING, raw_exit_code) + exit_status = ExitStatus( + self.manager._consume_exit_status_source( + self.uuid, ExitStatusSource.CLIENT_MONITORING + ), + raw_exit_code, + ) if self.on_exit: try: @@ -264,6 +273,26 @@ def __init__( # metadata for each process self.metadata: Dict[str, ProcessMetadata] = {} + # Override watcher-reported exit sources when kill_process() initiated + # the termination path. 
+ self.manual_exit_status_sources: Dict[str, ExitStatusSource] = {} + + def _set_manual_exit_status_source( + self, uuid: str, source: ExitStatusSource + ) -> None: + with self.lock: + self.manual_exit_status_sources[uuid] = source + + def _clear_manual_exit_status_source(self, uuid: str) -> None: + with self.lock: + self.manual_exit_status_sources.pop(uuid, None) + + def _consume_exit_status_source( + self, uuid: str, default: ExitStatusSource + ) -> ExitStatusSource: + with self.lock: + return self.manual_exit_status_sources.pop(uuid, default) + @staticmethod def get_metadata_file_path(uuid: str) -> str: """ @@ -1064,12 +1093,18 @@ def kill_process( self.log.warning( f"No remote PID for {uuid}, terminating SSH client. Cannot guarantee remote process termination." ) + self._set_manual_exit_status_source( + uuid, ExitStatusSource.MANUAL_KILL_THROUGH_SSH_CLIENT + ) self._kill_client_process(process_info) exit_code = self._wait_for_process_exit_code(uuid, timeout=timeout) self._cleanup_process_resources(uuid) if exit_code is None: + self._clear_manual_exit_status_source(uuid) return None - return ExitStatus(ExitStatusSource.MANUAL_KILL, exit_code) + return ExitStatus( + ExitStatusSource.MANUAL_KILL_THROUGH_SSH_CLIENT, exit_code + ) remote_pid = metadata.pid process_dead = False @@ -1079,9 +1114,17 @@ def kill_process( self.log.info( f"Skipping killing remote process {uuid} (PID {remote_pid}). It is already dead." 
) - process_dead = True + exit_code = self._wait_for_process_exit_code(uuid, timeout=timeout) + self._cleanup_remote_file(hostname, user, metadata_file) + self._cleanup_process_resources(uuid) + if exit_code is None: + return None + return ExitStatus(ExitStatusSource.REMOTE_MONITORING, exit_code) if not process_dead: + self._set_manual_exit_status_source( + uuid, ExitStatusSource.MANUAL_KILL_THROUGH_REMOTE_PID + ) self.log.debug(f"Sending SIGQUIT to remote PID {remote_pid}") self._send_remote_signal(hostname, user, remote_pid, "QUIT") process_dead = self.wait_for_process_to_die( @@ -1093,7 +1136,7 @@ def kill_process( ) else: self.log.info( - f"Remote process {uuid} (PID {remote_pid}) did not terminate after SIGQUIT signal." + f"Remote process {uuid} (PID {remote_pid}) did not terminate within timeout of {timeout} seconds after SIGQUIT signal." ) if not process_dead: @@ -1108,23 +1151,27 @@ def kill_process( f"Remote process {uuid} (PID {remote_pid}) terminated forcibly following SIGKILL signal." ) else: - self.log.debug( - f"Remote process {uuid} (PID {remote_pid}) did not terminate after SIGKILL signal." + self.log.info( + f"Remote process {uuid} (PID {remote_pid}) did not terminate within timeout of {timeout} seconds after SIGKILL signal." ) if not process_dead: self.log.error( f"Remote process {uuid} (PID {remote_pid}) still did not terminate after SIGKILL signal." 
) + self._clear_manual_exit_status_source(uuid) else: exit_code = self._wait_for_process_exit_code(uuid, timeout=timeout) self._cleanup_remote_file(hostname, user, metadata_file) self._cleanup_process_resources(uuid) if exit_code is None: return None - return ExitStatus(ExitStatusSource.MANUAL_KILL, exit_code) + return ExitStatus( + ExitStatusSource.MANUAL_KILL_THROUGH_REMOTE_PID, exit_code + ) except Exception as e: + self._clear_manual_exit_status_source(uuid) self.log.error(f"Error terminating remote process {uuid}: {e}") return None From 1a53da005a92f5a2ed3bb7f3296884e4335a4324 Mon Sep 17 00:00:00 2001 From: Aurash Karimi Date: Tue, 28 Apr 2026 17:10:59 +0100 Subject: [PATCH 03/15] add testing that different types of process deaths are matched to the correct logging branch Co-authored-by: Copilot --- .../processes/ssh_process_lifetime_manager.py | 29 +++ ...ss_lifetime_manager_from_forked_process.py | 27 ++ .../ssh_process_lifetime_manager_shell.py | 93 +++++-- ...est_ssh_process_lifetime_manager_common.py | 246 ++++++++++++++++++ ...h_process_lifetime_manager_forked_shell.py | 11 + ...test_ssh_process_lifetime_manager_shell.py | 15 ++ 6 files changed, 400 insertions(+), 21 deletions(-) diff --git a/src/drunc/processes/ssh_process_lifetime_manager.py b/src/drunc/processes/ssh_process_lifetime_manager.py index d015faf56..677b1ada9 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager.py +++ b/src/drunc/processes/ssh_process_lifetime_manager.py @@ -152,6 +152,35 @@ def kill_process( """ pass + @abstractmethod + def kill_process_without_metadata( + self, + uuid: str, + signal_name: str = "QUIT", + as_manual_kill: bool = False, + timeout: float = DEFAULT_TIMEOUT_FOR_KILLING_PROCESS, + ) -> Optional[ExitStatus]: + """ + Terminate a process via SSH client signalling without relying on remote PID metadata. + + This method is used in two contexts: + 1) As an internal fallback from kill_process when metadata/PID is unavailable. 
+ 2) In tests to emulate SSH-client-driven termination paths (e.g. SIGQUIT/SIGKILL). + + Args: + uuid: Process UUID to terminate. + signal_name: Signal to send to the local SSH client process group + (e.g. "QUIT" or "KILL"). + as_manual_kill: If True, classify termination as + MANUAL_KILL_THROUGH_SSH_CLIENT. If False, classify as + CLIENT_MONITORING. + timeout: Maximum time to wait for process termination in seconds. + + Returns: + ExitStatus if termination state can be determined, None otherwise. + """ + pass + @abstractmethod def crash_process(self, uuid: str) -> None: """ diff --git a/src/drunc/processes/ssh_process_lifetime_manager_from_forked_process.py b/src/drunc/processes/ssh_process_lifetime_manager_from_forked_process.py index 3af19eb3b..3f718f018 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager_from_forked_process.py +++ b/src/drunc/processes/ssh_process_lifetime_manager_from_forked_process.py @@ -529,6 +529,33 @@ def kill_process( """ return self._call("kill_process", uuid, timeout) + def kill_process_without_metadata( + self, + uuid: str, + signal_name: str = "QUIT", + as_manual_kill: bool = False, + timeout: float = ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS, + ) -> Optional[ExitStatus]: + """ + Kill a process by signalling the SSH client without using remote metadata. + + Args: + uuid: Process UUID to terminate. + signal_name: Signal name to send to SSH client process group. + as_manual_kill: If True, classify as process-manager initiated kill. + timeout: Graceful termination timeout in seconds. + + Returns: + ExitStatus of the terminated process, or None if undetermined. + """ + return self._call( + "kill_process_without_metadata", + uuid, + signal_name, + as_manual_kill, + timeout, + ) + def crash_process(self, uuid: str) -> None: """ Simulate a process crash by sending SIGKILL without performing any cleanup. 
diff --git a/src/drunc/processes/ssh_process_lifetime_manager_shell.py b/src/drunc/processes/ssh_process_lifetime_manager_shell.py index eb24ace75..c23043cdc 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager_shell.py +++ b/src/drunc/processes/ssh_process_lifetime_manager_shell.py @@ -1011,18 +1011,75 @@ def _execute_bootrequest_via_ssh( del self.process_store[uuid] raise RuntimeError(f"Failed to execute SSH command for {uuid}: {e}") - def _kill_client_process(self, process_info: Dict) -> None: + def _kill_client_process( + self, process_info: Dict, signal_name: str = "QUIT" + ) -> None: """ - Kill a local SSH client process. The remote process will typically - recieve a SIGHUP when the SSH client terminates. + Send a signal to the local SSH client process group. + + The remote process will typically receive a SIGHUP when the SSH client + terminates. """ try: - process_info["process"].signal_group(signal.SIGQUIT) + signal_name = signal_name.upper() + if signal_name == "QUIT": + process_info["process"].signal_group(signal.SIGQUIT) + return + if signal_name == "KILL": + process_info["process"].signal_group(signal.SIGKILL) + return + raise ValueError(f"Unsupported signal_name '{signal_name}'") except Exception as e: self.log.debug( f"Exception was raised when terminating SSH client process: {e}" ) + def kill_process_without_metadata( + self, + uuid: str, + signal_name: str = "QUIT", + as_manual_kill: bool = False, + timeout: float = ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS, + ) -> Optional[ExitStatus]: + """ + Terminate process by signalling the local SSH client without using remote metadata. 
+ + Args: + uuid: Process UUID to terminate + signal_name: Signal to send to SSH client process group (QUIT/KILL) + as_manual_kill: Classify outcome as manual process-manager kill when True + timeout: Maximum time to wait for process termination in seconds + + Returns: + ExitStatus if termination state can be determined, None otherwise + """ + with self.lock: + process_info = self.process_store.get(uuid) + + if process_info is None: + self.log.warning( + f"kill_process_without_metadata called for unknown UUID {uuid}" + ) + return None + + source_for_return = ( + ExitStatusSource.MANUAL_KILL_THROUGH_SSH_CLIENT + if as_manual_kill + else ExitStatusSource.CLIENT_MONITORING + ) + # Ensure watcher callbacks classify this termination path as requested. + self._set_manual_exit_status_source(uuid, source_for_return) + + self._kill_client_process(process_info, signal_name=signal_name) + exit_code = self._wait_for_process_exit_code(uuid, timeout=timeout) + self._cleanup_process_resources(uuid) + + if exit_code is None: + self._clear_manual_exit_status_source(uuid) + return None + + return ExitStatus(source_for_return, exit_code) + def _wait_for_process_exit_code(self, uuid: str, timeout: float) -> Optional[int]: """ Wait for specified timeout to see if a process exit code is available. @@ -1081,31 +1138,25 @@ def kill_process( if uuid not in self.process_store: return None - process_info = self.process_store[uuid] - - hostname = process_info["hostname"] - user = process_info["user"] - metadata_file = SSHProcessLifetimeManagerShell.get_metadata_file_path(uuid) - # Read metadata to get remote PID metadata = self.metadata.get(uuid, None) if metadata is None or metadata.pid is None: self.log.warning( f"No remote PID for {uuid}, terminating SSH client. Cannot guarantee remote process termination." 
) - self._set_manual_exit_status_source( - uuid, ExitStatusSource.MANUAL_KILL_THROUGH_SSH_CLIENT - ) - self._kill_client_process(process_info) - exit_code = self._wait_for_process_exit_code(uuid, timeout=timeout) - self._cleanup_process_resources(uuid) - if exit_code is None: - self._clear_manual_exit_status_source(uuid) - return None - return ExitStatus( - ExitStatusSource.MANUAL_KILL_THROUGH_SSH_CLIENT, exit_code + return self.kill_process_without_metadata( + uuid, + signal_name="QUIT", + as_manual_kill=True, + timeout=timeout, ) + process_info = self.process_store[uuid] + + hostname = process_info["hostname"] + user = process_info["user"] + metadata_file = SSHProcessLifetimeManagerShell.get_metadata_file_path(uuid) + remote_pid = metadata.pid process_dead = False diff --git a/tests/processes/test_ssh_process_lifetime_manager_common.py b/tests/processes/test_ssh_process_lifetime_manager_common.py index 81e960107..741610215 100644 --- a/tests/processes/test_ssh_process_lifetime_manager_common.py +++ b/tests/processes/test_ssh_process_lifetime_manager_common.py @@ -4,8 +4,12 @@ import getpass import tempfile +import threading import uuid +from concurrent.futures import ThreadPoolExecutor +from dataclasses import dataclass from pathlib import Path +from typing import Optional from druncschema.process_manager_pb2 import ( BootRequest, @@ -14,6 +18,7 @@ ) from drunc.processes.connection_utils import wait_for +from drunc.processes.exit_status import ExitStatus, ExitStatusSource def create_boot_request(process_name, tree_id, log_file, test_file_path): @@ -51,6 +56,247 @@ def create_boot_request(process_name, tree_id, log_file, test_file_path): return boot_request +@dataclass(frozen=True) +class ExitMessageScenario: + name: str + kill_mode: str + expected_source: ExitStatusSource + expected_message_fragment: str + expected_reported_exit_code: Optional[int] = None + + +def boot_processes_and_verify_exit_state_messages( + ssh_manager, + test_file_path, +): + """ + Boot all 
exit-status scenarios together and validate emitted status messages. + + This helper starts one process per expected exit-status message variant, + then applies mixed kill strategies (client signal, remote crash, manager kill) + and verifies the callback-delivered ExitStatus source and + get_process_manager_log_message() output for each process. + + Args: + ssh_manager: SSH process lifetime manager implementation under test + test_file_path: Path to the test file (used to locate simple_process.py) + """ + scenarios = [ + ExitMessageScenario( + name="case_client_sigquit", + kill_mode="client_sigquit", + expected_source=ExitStatusSource.CLIENT_MONITORING, + expected_message_fragment="was terminated unexpectedly with SIGQUIT on the SSH client (SIGHUP on the server)", + expected_reported_exit_code=None, + ), + ExitMessageScenario( + name="case_client_sigkill", + kill_mode="client_sigkill", + expected_source=ExitStatusSource.CLIENT_MONITORING, + expected_message_fragment="was terminated unexpectedly by a SIGKILL to the SSH client (SIGHUP on the server)", + expected_reported_exit_code=None, + ), + ExitMessageScenario( + name="case_remote_sigkill", + kill_mode="remote_sigkill", + expected_source=ExitStatusSource.REMOTE_MONITORING, + expected_message_fragment="was terminated unexpectedly through the remote pid", + ), + ExitMessageScenario( + name="case_manual_remote_pid", + kill_mode="manual_remote_pid", + expected_source=ExitStatusSource.MANUAL_KILL_THROUGH_REMOTE_PID, + expected_message_fragment="was terminated by the process manager through the remote pid", + expected_reported_exit_code=0, + ), + ExitMessageScenario( + name="case_manual_ssh_client", + kill_mode="manual_ssh_client", + expected_source=ExitStatusSource.MANUAL_KILL_THROUGH_SSH_CLIENT, + expected_message_fragment="was terminated by the process manager through the SSH client", + expected_reported_exit_code=None, + ), + ] + + with tempfile.TemporaryDirectory() as temp_dir: + log_dir = Path(temp_dir) + + 
callbacks_lock = threading.Lock() + callback_events: dict[str, threading.Event] = {} + callback_statuses: dict[str, ExitStatus] = {} + callback_messages: dict[str, str] = {} + + process_uuids: list[str] = [] + process_info: dict[str, dict] = {} + + def on_exit(cb_uuid: str, exit_status: Optional[ExitStatus], exception): + if exit_status is None: + return + with callbacks_lock: + callback_statuses[cb_uuid] = exit_status + metadata = process_info.get(cb_uuid) + if metadata is not None: + callback_messages[cb_uuid] = ( + exit_status.get_process_manager_log_message( + metadata["name"], + metadata["session"], + metadata["user"], + ) + ) + event = callback_events.get(cb_uuid) + if event is not None: + event.set() + + # Shell manager uses `on_process_exit`; forked wrapper dispatches through + # `_on_process_exit` in the parent process. + if hasattr(ssh_manager, "on_process_exit"): + ssh_manager.on_process_exit = on_exit + if hasattr(ssh_manager, "_on_process_exit"): + ssh_manager._on_process_exit = on_exit + + print("\n=== Booting one process per exit-status scenario ===") + for scenario_config in scenarios: + process_name = scenario_config.name + process_uuid = str(uuid.uuid4()) + log_file = str(log_dir / f"{process_name}.log") + process_uuids.append(process_uuid) + callback_events[process_uuid] = threading.Event() + + boot_request = create_boot_request( + process_name=process_name, + tree_id="this.isan.application", + log_file=log_file, + test_file_path=test_file_path, + ) + + ssh_manager.start_process(uuid=process_uuid, boot_request=boot_request) + + process_info[process_uuid] = { + "name": process_name, + "session": boot_request.process_description.metadata.session, + "user": boot_request.process_description.metadata.user, + "kill_mode": scenario_config.kill_mode, + "scenario": scenario_config, + "log_file": log_file, + } + + print( + f"Executed {process_name} with UUID {process_uuid} " + f"(kill mode: {scenario_config.kill_mode})" + ) + + 
verify_all_processes_alive(ssh_manager, process_uuids, len(scenarios)) + verify_log_output(ssh_manager, process_uuids, process_info) + + # Wait until metadata is available for all scenarios that require remote PID. + for process_uuid in process_uuids: + kill_mode = process_info[process_uuid]["kill_mode"] + if kill_mode in ("client_sigquit", "client_sigkill", "manual_ssh_client"): + continue + metadata_ready = wait_for( + lambda u=process_uuid: ssh_manager.get_remote_pid(u).successful, + expected_value=True, + timeout=10.0, + poll_interval=0.2, + ) + assert metadata_ready, ( + f"Remote PID not available for process {process_uuid}" + ) + + print("\n=== Triggering all termination paths concurrently ===") + + def trigger_termination(process_uuid: str): + kill_mode = process_info[process_uuid]["kill_mode"] + + if kill_mode == "client_sigquit": + ssh_manager.kill_process_without_metadata( + process_uuid, + signal_name="QUIT", + as_manual_kill=False, + timeout=10.0, + ) + return + + if kill_mode == "client_sigkill": + ssh_manager.kill_process_without_metadata( + process_uuid, + signal_name="KILL", + as_manual_kill=False, + timeout=10.0, + ) + return + + if kill_mode == "remote_sigkill": + ssh_manager.crash_process(process_uuid) + return + + if kill_mode == "manual_remote_pid": + ssh_manager.kill_process(process_uuid, timeout=10.0) + return + + if kill_mode == "manual_ssh_client": + ssh_manager.kill_process_without_metadata( + process_uuid, + signal_name="QUIT", + as_manual_kill=True, + timeout=10.0, + ) + return + + raise RuntimeError(f"Unhandled kill mode: {kill_mode}") + + with ThreadPoolExecutor(max_workers=len(process_uuids)) as executor: + futures = [ + executor.submit(trigger_termination, process_uuid) + for process_uuid in process_uuids + ] + for future in futures: + future.result() + + print("\n=== Waiting for and validating on_process_exit messages ===") + for process_uuid in process_uuids: + callback_fired = callback_events[process_uuid].wait(timeout=15.0) + 
assert callback_fired, ( + f"Exit callback did not fire for {process_info[process_uuid]['name']}" + ) + + exit_status = callback_statuses.get(process_uuid) + assert exit_status is not None, ( + f"No ExitStatus captured for {process_info[process_uuid]['name']}" + ) + + scenario: ExitMessageScenario = process_info[process_uuid]["scenario"] + emitted_message = callback_messages[process_uuid] + + assert exit_status.get_source() is scenario.expected_source, ( + f"Unexpected source for {process_info[process_uuid]['name']}: " + f"got {exit_status.get_source()}, expected {scenario.expected_source}" + ) + + assert scenario.expected_message_fragment in emitted_message, ( + f"Unexpected emitted message for {process_info[process_uuid]['name']}: " + f"{emitted_message}" + ) + + if scenario.expected_reported_exit_code is not None: + assert ( + exit_status.get_reported_exit_code() + == scenario.expected_reported_exit_code + ), ( + f"Unexpected reported exit code for " + f"{process_info[process_uuid]['name']}: " + f"got {exit_status.get_reported_exit_code()}, expected " + f"{scenario.expected_reported_exit_code}" + ) + + # Ensure all resources are cleaned up even for paths that intentionally + # bypass normal cleanup (e.g. crash_process and direct client signals). + ssh_manager.kill_all_processes( + process_timeouts={process_uuid: 10.0 for process_uuid in process_uuids} + ) + verify_cleanup_complete(ssh_manager) + + def verify_log_output(ssh_manager, process_uuids, process_info, timeout=10.0): """ Verify that all processes have generated expected log output. 
diff --git a/tests/processes/test_ssh_process_lifetime_manager_forked_shell.py b/tests/processes/test_ssh_process_lifetime_manager_forked_shell.py index f16932954..458838eae 100644 --- a/tests/processes/test_ssh_process_lifetime_manager_forked_shell.py +++ b/tests/processes/test_ssh_process_lifetime_manager_forked_shell.py @@ -8,6 +8,7 @@ from tests.processes.test_ssh_process_lifetime_manager_common import ( boot_processes_and_kill_individually, boot_processes_and_terminate_all_same_role, + boot_processes_and_verify_exit_state_messages, create_boot_request, verify_all_processes_alive, verify_all_processes_dead, @@ -39,6 +40,16 @@ def test_ssh_terminate_all_same_role_forked(ssh_manager_forked): boot_processes_and_terminate_all_same_role(ssh_manager_forked, Path(__file__)) +def test_ssh_exit_status_messages_for_kill_paths_forked(ssh_manager_forked): + """ + Verify forked shell manager emits correct exit-status messages for all kill paths. + + Boots one process per exit-status scenario, triggers all required kill modes + concurrently, and validates callback-delivered ExitStatus source and message. + """ + boot_processes_and_verify_exit_state_messages(ssh_manager_forked, Path(__file__)) + + def boot_processes_and_terminate_all_different_role_forked(test_file_path): """ Execute SSH processes with different roles via the forked manager and verify diff --git a/tests/processes/test_ssh_process_lifetime_manager_shell.py b/tests/processes/test_ssh_process_lifetime_manager_shell.py index dac148ec2..27b091093 100644 --- a/tests/processes/test_ssh_process_lifetime_manager_shell.py +++ b/tests/processes/test_ssh_process_lifetime_manager_shell.py @@ -4,6 +4,7 @@ boot_processes_and_kill_individually, boot_processes_and_terminate_all_different_role, boot_processes_and_terminate_all_same_role, + boot_processes_and_verify_exit_state_messages, ) @@ -36,3 +37,17 @@ def test_ssh_terminate_all_different_role_shell(ssh_manager_shell): and confirms complete cleanup. 
""" boot_processes_and_terminate_all_different_role(ssh_manager_shell, Path(__file__)) + + +def test_ssh_exit_status_messages_for_kill_paths_shell(ssh_manager_shell): + """ + Verify shell manager emits correct exit-status messages for all kill paths. + + Boots one process per exit-status scenario, triggers all required kill modes + concurrently, and validates callback-delivered ExitStatus source and message. + """ + + boot_processes_and_verify_exit_state_messages( + ssh_manager=ssh_manager_shell, + test_file_path=Path(__file__), + ) From 0787789f5ecab7510182f56381cdd0d8e5a22f25 Mon Sep 17 00:00:00 2001 From: Aurash Karimi Date: Tue, 28 Apr 2026 17:25:48 +0100 Subject: [PATCH 04/15] improve logging cleanup testing code --- src/drunc/processes/exit_status.py | 2 +- .../ssh_process_lifetime_manager_shell.py | 4 +- ...est_ssh_process_lifetime_manager_common.py | 46 +++++++------------ 3 files changed, 20 insertions(+), 32 deletions(-) diff --git a/src/drunc/processes/exit_status.py b/src/drunc/processes/exit_status.py index fc7b96820..34390da96 100644 --- a/src/drunc/processes/exit_status.py +++ b/src/drunc/processes/exit_status.py @@ -100,7 +100,7 @@ def get_process_manager_log_message( return ( f"Process '{process_name}' (session: '{session}', user: '{user}') " f"{self._message_fragment}. " - f"Reported exit code: {self._reported_exit_code}." + f"Reported exit code: {self._reported_exit_code if self._reported_exit_code is not None else 'N/A (the nature of the shutdown prevented exit code collection)'}." 
) def __repr__(self) -> str: diff --git a/src/drunc/processes/ssh_process_lifetime_manager_shell.py b/src/drunc/processes/ssh_process_lifetime_manager_shell.py index c23043cdc..e9e38c102 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager_shell.py +++ b/src/drunc/processes/ssh_process_lifetime_manager_shell.py @@ -256,7 +256,7 @@ def __init__( self.disable_host_key_check = disable_host_key_check self.disable_localhost_host_key_check = disable_localhost_host_key_check self.log = logger if logger else get_logger(__name__) - self.on_process_exit = on_process_exit + self._on_process_exit = on_process_exit # Create SSH command wrapper self.ssh = sh.Command("/usr/bin/ssh") @@ -415,7 +415,7 @@ def _start_process_watcher( hostname=hostname, user=user, metadata_file=metadata_file, - on_exit=self.on_process_exit, + on_exit=self._on_process_exit, logger=self.log, ) watcher.start() diff --git a/tests/processes/test_ssh_process_lifetime_manager_common.py b/tests/processes/test_ssh_process_lifetime_manager_common.py index 741610215..ceebb4ad0 100644 --- a/tests/processes/test_ssh_process_lifetime_manager_common.py +++ b/tests/processes/test_ssh_process_lifetime_manager_common.py @@ -147,12 +147,7 @@ def on_exit(cb_uuid: str, exit_status: Optional[ExitStatus], exception): if event is not None: event.set() - # Shell manager uses `on_process_exit`; forked wrapper dispatches through - # `_on_process_exit` in the parent process. 
- if hasattr(ssh_manager, "on_process_exit"): - ssh_manager.on_process_exit = on_exit - if hasattr(ssh_manager, "_on_process_exit"): - ssh_manager._on_process_exit = on_exit + ssh_manager._on_process_exit = on_exit print("\n=== Booting one process per exit-status scenario ===") for scenario_config in scenarios: @@ -208,42 +203,35 @@ def on_exit(cb_uuid: str, exit_status: Optional[ExitStatus], exception): def trigger_termination(process_uuid: str): kill_mode = process_info[process_uuid]["kill_mode"] - if kill_mode == "client_sigquit": - ssh_manager.kill_process_without_metadata( + kill_mode_actions = { + "client_sigquit": lambda: ssh_manager.kill_process_without_metadata( process_uuid, signal_name="QUIT", as_manual_kill=False, timeout=10.0, - ) - return - - if kill_mode == "client_sigkill": - ssh_manager.kill_process_without_metadata( + ), + "client_sigkill": lambda: ssh_manager.kill_process_without_metadata( process_uuid, signal_name="KILL", as_manual_kill=False, timeout=10.0, - ) - return - - if kill_mode == "remote_sigkill": - ssh_manager.crash_process(process_uuid) - return - - if kill_mode == "manual_remote_pid": - ssh_manager.kill_process(process_uuid, timeout=10.0) - return - - if kill_mode == "manual_ssh_client": - ssh_manager.kill_process_without_metadata( + ), + "remote_sigkill": lambda: ssh_manager.crash_process(process_uuid), + "manual_remote_pid": lambda: ssh_manager.kill_process( + process_uuid, timeout=10.0 + ), + "manual_ssh_client": lambda: ssh_manager.kill_process_without_metadata( process_uuid, signal_name="QUIT", as_manual_kill=True, timeout=10.0, - ) - return + ), + } - raise RuntimeError(f"Unhandled kill mode: {kill_mode}") + action = kill_mode_actions.get(kill_mode) + if action is None: + raise RuntimeError(f"Unhandled kill mode: {kill_mode}") + action() with ThreadPoolExecutor(max_workers=len(process_uuids)) as executor: futures = [ From 764bd3bfa62f4339b2085d9a697d395369b71dd9 Mon Sep 17 00:00:00 2001 From: Aurash Karimi Date: Wed, 29 Apr 
2026 12:50:11 +0100 Subject: [PATCH 05/15] update tests with new exit status Co-authored-by: Copilot --- .../test_ssh_process_lifetime_manager_common.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/processes/test_ssh_process_lifetime_manager_common.py b/tests/processes/test_ssh_process_lifetime_manager_common.py index ceebb4ad0..885cdf9ec 100644 --- a/tests/processes/test_ssh_process_lifetime_manager_common.py +++ b/tests/processes/test_ssh_process_lifetime_manager_common.py @@ -353,7 +353,9 @@ def verify_all_processes_alive(ssh_manager, process_uuids, expected_count): print(f"āœ“ All {expected_count} processes confirmed alive") -def verify_exit_codes(exit_codes, process_uuids, process_info=None): +def verify_exit_codes( + exit_codes: dict[str, ExitStatus], process_uuids, process_info=None +): """ Verify that all processes terminated with expected exit codes. @@ -367,8 +369,9 @@ def verify_exit_codes(exit_codes, process_uuids, process_info=None): """ print("\n=== Verifying exit codes ===") for process_uuid in process_uuids: - exit_code = exit_codes.get(process_uuid) - assert exit_code is not None, f"No exit code for {process_uuid}" + exit_status = exit_codes.get(process_uuid) + assert exit_status is not None, f"No exit status for {process_uuid}" + exit_code = exit_status.get_reported_exit_code() # Exit code 0 indicates process successfully handled SIGQUIT assert exit_code == 0, f"Unexpected exit code {exit_code} for {process_uuid}" From 4c8e1ecd8b98291c7f41538001430b527648919c Mon Sep 17 00:00:00 2001 From: Aurash Karimi Date: Wed, 29 Apr 2026 13:09:27 +0100 Subject: [PATCH 06/15] improve clarity of kill_process_without_metadata parameters Co-authored-by: Copilot --- .../processes/ssh_process_lifetime_manager.py | 15 ++++++--------- ...rocess_lifetime_manager_from_forked_process.py | 7 ++++--- .../ssh_process_lifetime_manager_shell.py | 9 +++++---- .../test_ssh_process_lifetime_manager_common.py | 6 +++--- 4 files changed, 18 
insertions(+), 19 deletions(-) diff --git a/src/drunc/processes/ssh_process_lifetime_manager.py b/src/drunc/processes/ssh_process_lifetime_manager.py index 677b1ada9..a8de20554 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager.py +++ b/src/drunc/processes/ssh_process_lifetime_manager.py @@ -157,23 +157,20 @@ def kill_process_without_metadata( self, uuid: str, signal_name: str = "QUIT", - as_manual_kill: bool = False, + as_manual_pm_kill: bool = True, timeout: float = DEFAULT_TIMEOUT_FOR_KILLING_PROCESS, ) -> Optional[ExitStatus]: """ - Terminate a process via SSH client signalling without relying on remote PID metadata. - - This method is used in two contexts: - 1) As an internal fallback from kill_process when metadata/PID is unavailable. - 2) In tests to emulate SSH-client-driven termination paths (e.g. SIGQUIT/SIGKILL). + Terminate a process via the given signal without having access to metadata or remote PID + Generally this should only be used as a fallback or for testing purposes, kill_process + should be preferred. Args: uuid: Process UUID to terminate. signal_name: Signal to send to the local SSH client process group (e.g. "QUIT" or "KILL"). - as_manual_kill: If True, classify termination as - MANUAL_KILL_THROUGH_SSH_CLIENT. If False, classify as - CLIENT_MONITORING. + as_manual_pm_kill: If True, classify as process-manager initiated kill. + If False, classify as external kill i.e. outside of process manager control timeout: Maximum time to wait for process termination in seconds. 
Returns: diff --git a/src/drunc/processes/ssh_process_lifetime_manager_from_forked_process.py b/src/drunc/processes/ssh_process_lifetime_manager_from_forked_process.py index 3f718f018..5c3be7bb2 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager_from_forked_process.py +++ b/src/drunc/processes/ssh_process_lifetime_manager_from_forked_process.py @@ -533,7 +533,7 @@ def kill_process_without_metadata( self, uuid: str, signal_name: str = "QUIT", - as_manual_kill: bool = False, + as_manual_pm_kill: bool = True, timeout: float = ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS, ) -> Optional[ExitStatus]: """ @@ -542,7 +542,8 @@ def kill_process_without_metadata( Args: uuid: Process UUID to terminate. signal_name: Signal name to send to SSH client process group. - as_manual_kill: If True, classify as process-manager initiated kill. + as_manual_pm_kill: If True, classify as process-manager initiated kill. + If False, classify as external kill i.e. outside of process manager control timeout: Graceful termination timeout in seconds. 
Returns: @@ -552,7 +553,7 @@ def kill_process_without_metadata( "kill_process_without_metadata", uuid, signal_name, - as_manual_kill, + as_manual_pm_kill, timeout, ) diff --git a/src/drunc/processes/ssh_process_lifetime_manager_shell.py b/src/drunc/processes/ssh_process_lifetime_manager_shell.py index e9e38c102..3181ebe8e 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager_shell.py +++ b/src/drunc/processes/ssh_process_lifetime_manager_shell.py @@ -1038,7 +1038,7 @@ def kill_process_without_metadata( self, uuid: str, signal_name: str = "QUIT", - as_manual_kill: bool = False, + as_manual_pm_kill: bool = True, timeout: float = ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS, ) -> Optional[ExitStatus]: """ @@ -1047,7 +1047,8 @@ def kill_process_without_metadata( Args: uuid: Process UUID to terminate signal_name: Signal to send to SSH client process group (QUIT/KILL) - as_manual_kill: Classify outcome as manual process-manager kill when True + as_manual_pm_kill: If True, classify as process-manager initiated kill. + If False, classify as external kill i.e. outside of process manager control timeout: Maximum time to wait for process termination in seconds Returns: @@ -1064,7 +1065,7 @@ def kill_process_without_metadata( source_for_return = ( ExitStatusSource.MANUAL_KILL_THROUGH_SSH_CLIENT - if as_manual_kill + if as_manual_pm_kill else ExitStatusSource.CLIENT_MONITORING ) # Ensure watcher callbacks classify this termination path as requested. 
@@ -1147,7 +1148,7 @@ def kill_process( return self.kill_process_without_metadata( uuid, signal_name="QUIT", - as_manual_kill=True, + as_manual_pm_kill=True, timeout=timeout, ) diff --git a/tests/processes/test_ssh_process_lifetime_manager_common.py b/tests/processes/test_ssh_process_lifetime_manager_common.py index 885cdf9ec..5a30adde7 100644 --- a/tests/processes/test_ssh_process_lifetime_manager_common.py +++ b/tests/processes/test_ssh_process_lifetime_manager_common.py @@ -207,13 +207,13 @@ def trigger_termination(process_uuid: str): "client_sigquit": lambda: ssh_manager.kill_process_without_metadata( process_uuid, signal_name="QUIT", - as_manual_kill=False, + as_manual_pm_kill=False, timeout=10.0, ), "client_sigkill": lambda: ssh_manager.kill_process_without_metadata( process_uuid, signal_name="KILL", - as_manual_kill=False, + as_manual_pm_kill=False, timeout=10.0, ), "remote_sigkill": lambda: ssh_manager.crash_process(process_uuid), @@ -223,7 +223,7 @@ def trigger_termination(process_uuid: str): "manual_ssh_client": lambda: ssh_manager.kill_process_without_metadata( process_uuid, signal_name="QUIT", - as_manual_kill=True, + as_manual_pm_kill=True, timeout=10.0, ), } From 4c10545d53be860ba2f21a69d11b671c578b8e40 Mon Sep 17 00:00:00 2001 From: Aurash Karimi Date: Thu, 30 Apr 2026 14:28:08 +0100 Subject: [PATCH 07/15] simplify data tracking for ssh processes Co-authored-by: Copilot --- .../ssh_process_lifetime_manager_shell.py | 216 +++++++++--------- src/drunc/processes/ssh_shell_process.py | 64 ++++++ 2 files changed, 175 insertions(+), 105 deletions(-) create mode 100644 src/drunc/processes/ssh_shell_process.py diff --git a/src/drunc/processes/ssh_process_lifetime_manager_shell.py b/src/drunc/processes/ssh_process_lifetime_manager_shell.py index 3181ebe8e..cc032ffc6 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager_shell.py +++ b/src/drunc/processes/ssh_process_lifetime_manager_shell.py @@ -24,6 +24,7 @@ ProcessLifetimeManager, RemotePidResult, ) 
+from drunc.processes.ssh_shell_process import RunningSSHProcess from drunc.utils.utils import get_logger @@ -35,7 +36,7 @@ class ProcessWatcherThread(threading.Thread): def __init__( self, uuid: str, - process: sh.RunningCommand, + running_process: RunningSSHProcess, manager: "SSHProcessLifetimeManagerShell", hostname: str, user: str, @@ -50,7 +51,7 @@ def __init__( Args: uuid: Process UUID to monitor - process: sh.RunningCommand instance to monitor + running_process: Runtime model for the process being monitored manager: Parent manager instance for metadata updates hostname: Remote hostname for metadata retrieval user: Remote user for metadata retrieval @@ -60,7 +61,7 @@ def __init__( """ super().__init__(name=f"ShellWatcher-{uuid}", daemon=True) self.uuid = uuid - self.process = process + self.running_process = running_process self.manager = manager self.hostname = hostname self.user = user @@ -95,7 +96,7 @@ def run(self): ) self.logger.debug( f"To manually kill the local SSH client for '{metadata.name}' (UUID: {self.uuid}), run: " - f"kill -9 {self.process.pid}" + f"kill -9 {self.running_process.process.pid}" ) else: # If metadata could not be read, fall back to monitoring SSH client @@ -113,6 +114,14 @@ def run(self): self._monitor_ssh_client() return + if metadata.pid is None: + self.logger.warning( + f"Metadata for process {self.uuid} did not contain a PID. " + f"Falling back to SSH client monitoring." + ) + self._monitor_ssh_client() + return + # Monitor the remote process directly self._monitor_remote_process(metadata.pid) @@ -122,6 +131,9 @@ def _monitor_remote_process(self, remote_pid: int) -> None: Uses SSH to run a blocking command that exits when the process dies. 
""" + assert self.running_process is not None, ( + "running_process must be set before monitoring" + ) exception = None raw_exit_code = None @@ -136,21 +148,32 @@ def _monitor_remote_process(self, remote_pid: int) -> None: ) # Remote ssh command that will block until process exits - remote_cmd = ( - f"while kill -0 {remote_pid} 2>/dev/null; do sleep 0.1; done; exit 0" - ) + # Output the remote monitoring process PID, then run the monitoring loop + remote_cmd = f"echo $$ && while kill -0 {remote_pid} 2>/dev/null; do sleep 0.1; done; exit 0" arguments.append(remote_cmd) self.__is_monitoring_remotely = True - # This ssh command will block until the remote process exits - self.manager.ssh(*arguments) + # Start the SSH monitoring command in background to capture its PID immediately + monitoring_process = self.manager.ssh(*arguments, _bg=True, _bg_exc=False) + + # Store the local SSH client PID for the monitoring process + self.running_process.client_monitoring_pid = getattr( + monitoring_process, "pid", None + ) + + # block until the monitored process exits + try: + monitoring_process.wait() + except sh.ErrorReturnCode as _: + pass + self.__is_monitoring_remotely = False self.logger.debug( f"Remote process {self.uuid} (PID {remote_pid}) has exited" ) - self.process.wait() - raw_exit_code = self.process.exit_code + self.running_process.process.wait() + raw_exit_code = self.running_process.process.exit_code self.logger.debug( f"SSH client for {self.uuid} exited with code {raw_exit_code}" ) @@ -163,12 +186,10 @@ def _monitor_remote_process(self, remote_pid: int) -> None: exception = e self.logger.error(f"Remote process {self.uuid} watcher error: {e}") - exit_status = ExitStatus( - self.manager._consume_exit_status_source( - self.uuid, ExitStatusSource.REMOTE_MONITORING - ), - raw_exit_code, - ) + with self.manager.lock: + exit_status = self.running_process.finalise_exit( + ExitStatusSource.REMOTE_MONITORING, raw_exit_code + ) # Invoke callback with results if self.on_exit: 
@@ -188,8 +209,8 @@ def _monitor_ssh_client(self) -> None: raw_exit_code = None try: - self.process.wait() - raw_exit_code = self.process.exit_code + self.running_process.process.wait() + raw_exit_code = self.running_process.process.exit_code self.logger.debug( f"SSH client for {self.uuid} exited with code {raw_exit_code}" ) @@ -203,12 +224,10 @@ def _monitor_ssh_client(self) -> None: exception = e self.logger.error(f"SSH client for {self.uuid} watcher error: {e}") - exit_status = ExitStatus( - self.manager._consume_exit_status_source( - self.uuid, ExitStatusSource.CLIENT_MONITORING - ), - raw_exit_code, - ) + with self.manager.lock: + exit_status = self.running_process.finalise_exit( + ExitStatusSource.CLIENT_MONITORING, raw_exit_code + ) if self.on_exit: try: @@ -262,36 +281,16 @@ def __init__( self.ssh = sh.Command("/usr/bin/ssh") # Process tracking (one per UUID) - self.process_store: Dict[str, sh.RunningCommand] = {} + self.process_store: Dict[str, RunningSSHProcess] = {} # Thread tracking for monitoring - self.watchers: Dict[str, threading.Thread] = {} + self.watchers: Dict[str, ProcessWatcherThread] = {} # Thread-safe lock for process store modifications self.lock = threading.Lock() # metadata for each process - self.metadata: Dict[str, ProcessMetadata] = {} - - # Override watcher-reported exit sources when kill_process() initiated - # the termination path. 
- self.manual_exit_status_sources: Dict[str, ExitStatusSource] = {} - - def _set_manual_exit_status_source( - self, uuid: str, source: ExitStatusSource - ) -> None: - with self.lock: - self.manual_exit_status_sources[uuid] = source - - def _clear_manual_exit_status_source(self, uuid: str) -> None: - with self.lock: - self.manual_exit_status_sources.pop(uuid, None) - - def _consume_exit_status_source( - self, uuid: str, default: ExitStatusSource - ) -> ExitStatusSource: - with self.lock: - return self.manual_exit_status_sources.pop(uuid, default) + self.metadata: Dict[str, Optional[ProcessMetadata]] = {} @staticmethod def get_metadata_file_path(uuid: str) -> str: @@ -332,6 +331,9 @@ def get_remote_pid(self, uuid: str) -> RemotePidResult: been written or could not be read. """ with self.lock: + running_process = self.process_store.get(uuid) + if running_process is not None and running_process.remote_pid is not None: + return RemotePidResult(pid=running_process.remote_pid) metadata = self.metadata.get(uuid) if metadata is None or metadata.pid is None: return RemotePidResult(reason="no metadata") @@ -390,7 +392,7 @@ def start_process(self, uuid: str, boot_request: BootRequest) -> None: def _start_process_watcher( self, uuid: str, - process: sh.RunningCommand, + running_process: RunningSSHProcess, hostname: str, user: str, metadata_file: str, @@ -403,14 +405,14 @@ def _start_process_watcher( Args: uuid: Process UUID - process: sh.RunningCommand to monitor + running_process: Runtime model for process state and handles hostname: Remote hostname for metadata retrieval user: Remote user for metadata retrieval metadata_file: Path to metadata file on remote host """ watcher = ProcessWatcherThread( uuid=uuid, - process=process, + running_process=running_process, manager=self, hostname=hostname, user=user, @@ -435,8 +437,9 @@ def is_process_alive(self, uuid: str) -> bool: if uuid not in self.process_store: return False - process = self.process_store[uuid]["process"] - metadata: 
ProcessMetadata = self.metadata.get(uuid, None) + running_process = self.process_store[uuid] + process = running_process.process + metadata: Optional[ProcessMetadata] = self.metadata.get(uuid, None) if metadata is None or metadata.pid is None: self.log.debug( f"No metadata or PID found for {uuid}, relying on SSH client process status" @@ -444,8 +447,8 @@ def is_process_alive(self, uuid: str) -> bool: return process.is_alive() remote_process_alive = self._is_remote_process_alive( - self.process_store[uuid]["hostname"], - self.process_store[uuid]["user"], + running_process.hostname, + running_process.user, metadata.pid, uuid, ) @@ -469,7 +472,7 @@ def pop_early_exit_status(self, uuid: str) -> Optional[ExitStatus]: self.log.debug(f"Process {uuid} not found in store for exit code retrieval") return None - process = self.process_store[uuid]["process"] + process = self.process_store[uuid].process if process.is_alive(): return None @@ -512,7 +515,7 @@ def get_process_stdout(self, uuid: str) -> Optional[str]: return None try: - process = self.process_store[uuid]["process"] + process = self.process_store[uuid].process if hasattr(process, "stdout"): stdout_data = process.stdout if stdout_data: @@ -536,7 +539,7 @@ def get_process_stderr(self, uuid: str) -> Optional[str]: return None try: - process = self.process_store[uuid]["process"] + process = self.process_store[uuid].process if hasattr(process, "stderr"): stderr_data = process.stderr if stderr_data: @@ -910,6 +913,11 @@ def read_process_metadata( # Parse JSON content and instantiate metadata object metadata = ProcessMetadata.from_json(json_content) + with self.lock: + running_process = self.process_store.get(uuid) + if running_process is not None: + running_process.populate_from_metadata(metadata) + return metadata except Exception as e: @@ -939,7 +947,7 @@ def _execute_bootrequest_via_ssh( user: str, command: str, log_file: str, - env_vars: Dict[str, str] = None, + env_vars: Optional[Dict[str, str]] = None, ) -> None: 
"""Execute SSH command using sh library.""" try: @@ -993,17 +1001,27 @@ def _execute_bootrequest_via_ssh( _new_session=True, _preexec_fn=on_parent_exit(signal.SIGTERM) if not is_macos else None, ) + assert isinstance(process, sh.RunningCommand), ( + "Expected a RunningCommand instance from sh library" + ) # Store process info with self.lock: - self.process_store[uuid] = { - "process": process, - "hostname": hostname, - "user": user, - } + running_process = RunningSSHProcess( + process=process, + hostname=hostname, + user=user, + ) + self.process_store[uuid] = running_process # Metadata will be populated asynchronously by watcher thread self.metadata[uuid] = None - self._start_process_watcher(uuid, process, hostname, user, metadata_file) + self._start_process_watcher( + uuid, + running_process, + hostname, + user, + metadata_file, + ) self.log.debug(f"SSH command started for {uuid}") except Exception as e: with self.lock: @@ -1011,29 +1029,6 @@ def _execute_bootrequest_via_ssh( del self.process_store[uuid] raise RuntimeError(f"Failed to execute SSH command for {uuid}: {e}") - def _kill_client_process( - self, process_info: Dict, signal_name: str = "QUIT" - ) -> None: - """ - Send a signal to the local SSH client process group. - - The remote process will typically receive a SIGHUP when the SSH client - terminates. 
- """ - try: - signal_name = signal_name.upper() - if signal_name == "QUIT": - process_info["process"].signal_group(signal.SIGQUIT) - return - if signal_name == "KILL": - process_info["process"].signal_group(signal.SIGKILL) - return - raise ValueError(f"Unsupported signal_name '{signal_name}'") - except Exception as e: - self.log.debug( - f"Exception was raised when terminating SSH client process: {e}" - ) - def kill_process_without_metadata( self, uuid: str, @@ -1055,9 +1050,9 @@ def kill_process_without_metadata( ExitStatus if termination state can be determined, None otherwise """ with self.lock: - process_info = self.process_store.get(uuid) + running_process = self.process_store.get(uuid) - if process_info is None: + if running_process is None: self.log.warning( f"kill_process_without_metadata called for unknown UUID {uuid}" ) @@ -1069,14 +1064,21 @@ def kill_process_without_metadata( else ExitStatusSource.CLIENT_MONITORING ) # Ensure watcher callbacks classify this termination path as requested. 
- self._set_manual_exit_status_source(uuid, source_for_return) + with self.lock: + running_process.pending_exit_status_source = source_for_return - self._kill_client_process(process_info, signal_name=signal_name) + try: + running_process.kill_client(signal_name=signal_name) + except Exception as e: + self.log.debug( + f"Exception was raised when terminating SSH client process: {e}" + ) exit_code = self._wait_for_process_exit_code(uuid, timeout=timeout) self._cleanup_process_resources(uuid) if exit_code is None: - self._clear_manual_exit_status_source(uuid) + with self.lock: + running_process.pending_exit_status_source = None return None return ExitStatus(source_for_return, exit_code) @@ -1096,7 +1098,7 @@ def _wait_for_process_exit_code(self, uuid: str, timeout: float) -> Optional[int with self.lock: if uuid not in self.process_store: return None - process = self.process_store[uuid]["process"] + process = self.process_store[uuid].process def check_exit_status(): return not process.is_alive() @@ -1152,10 +1154,10 @@ def kill_process( timeout=timeout, ) - process_info = self.process_store[uuid] + running_process = self.process_store[uuid] - hostname = process_info["hostname"] - user = process_info["user"] + hostname = running_process.hostname + user = running_process.user metadata_file = SSHProcessLifetimeManagerShell.get_metadata_file_path(uuid) remote_pid = metadata.pid @@ -1174,9 +1176,10 @@ def kill_process( return ExitStatus(ExitStatusSource.REMOTE_MONITORING, exit_code) if not process_dead: - self._set_manual_exit_status_source( - uuid, ExitStatusSource.MANUAL_KILL_THROUGH_REMOTE_PID - ) + with self.lock: + running_process.pending_exit_status_source = ( + ExitStatusSource.MANUAL_KILL_THROUGH_REMOTE_PID + ) self.log.debug(f"Sending SIGQUIT to remote PID {remote_pid}") self._send_remote_signal(hostname, user, remote_pid, "QUIT") process_dead = self.wait_for_process_to_die( @@ -1211,7 +1214,8 @@ def kill_process( self.log.error( f"Remote process {uuid} (PID 
{remote_pid}) still did not terminate after SIGKILL signal." ) - self._clear_manual_exit_status_source(uuid) + with self.lock: + running_process.pending_exit_status_source = None else: exit_code = self._wait_for_process_exit_code(uuid, timeout=timeout) self._cleanup_remote_file(hostname, user, metadata_file) @@ -1223,7 +1227,9 @@ def kill_process( ) except Exception as e: - self._clear_manual_exit_status_source(uuid) + with self.lock: + if uuid in self.process_store: + self.process_store[uuid].pending_exit_status_source = None self.log.error(f"Error terminating remote process {uuid}: {e}") return None @@ -1246,9 +1252,9 @@ def crash_process(self, uuid: str) -> None: self.log.warning(f"crash_process called for unknown UUID {uuid}") return - process_info = self.process_store[uuid] - hostname = process_info["hostname"] - user = process_info["user"] + running_process = self.process_store[uuid] + hostname = running_process.hostname + user = running_process.user metadata = self.metadata.get(uuid, None) if metadata is None or metadata.pid is None: diff --git a/src/drunc/processes/ssh_shell_process.py b/src/drunc/processes/ssh_shell_process.py new file mode 100644 index 000000000..a468b3890 --- /dev/null +++ b/src/drunc/processes/ssh_shell_process.py @@ -0,0 +1,64 @@ +"""Runtime model for a process launched through the SSH shell lifetime manager.""" + +import signal as _signal +from typing import Optional + +import sh + +from drunc.processes.exit_status import ExitStatus, ExitStatusSource +from drunc.processes.process_metadata import ProcessMetadata + + +class RunningSSHProcess: + """Holds runtime state for a process launched via SSH shell.""" + + def __init__(self, process: sh.RunningCommand, hostname: str, user: str) -> None: + """Initialise runtime state for a managed SSH-launched process. + + Args: + process: Background SSH command handle returned by ``sh``. + hostname: Target host used to launch and monitor the remote process. 
+ user: Remote user used for SSH operations. + """ + self.process = process + self.hostname = hostname + self.user = user + self.ssh_client_pid = getattr(self.process, "pid", None) + self.remote_pid: Optional[int] = None + self.exit_status: Optional[ExitStatus] = None + self.pending_exit_status_source: Optional[ExitStatusSource] = None + self.remote_monitoring_pid: Optional[int] = ( + None # PID of watcher SSH process running remote monitoring + ) + self.client_monitoring_pid: Optional[int] = ( + None # PID of watcher SSH process in client monitoring mode + ) + + def populate_from_metadata(self, metadata: ProcessMetadata) -> None: + """Populate runtime fields from asynchronously read process metadata.""" + self.remote_pid = metadata.pid + + def consume_exit_status_source(self, default: ExitStatusSource) -> ExitStatusSource: + if self.pending_exit_status_source is None: + return default + source = self.pending_exit_status_source + self.pending_exit_status_source = None + return source + + def finalise_exit( + self, default_source: ExitStatusSource, raw_exit_code: Optional[int] + ) -> ExitStatus: + """Consume the pending exit source, build an ExitStatus, and store it.""" + source = self.consume_exit_status_source(default_source) + self.exit_status = ExitStatus(source, raw_exit_code) + return self.exit_status + + def kill_client(self, signal_name: str = "QUIT") -> None: + """Send a signal to the local SSH client process group.""" + signal_name = signal_name.upper() + if signal_name == "QUIT": + self.process.signal_group(_signal.SIGQUIT) + elif signal_name == "KILL": + self.process.signal_group(_signal.SIGKILL) + else: + raise ValueError(f"Unsupported signal_name '{signal_name}'") From 0fcf7c3820c604029406a84218eebd0faab4f8b2 Mon Sep 17 00:00:00 2001 From: Aurash Karimi Date: Thu, 7 May 2026 17:26:19 +0100 Subject: [PATCH 08/15] add more robust checks for zombie processes in tests Co-authored-by: Copilot --- ...ss_lifetime_manager_from_forked_process.py | 4 + 
.../ssh_process_lifetime_manager_shell.py | 19 +++ ...est_ssh_process_lifetime_manager_common.py | 113 +++++++++++++++++- ...h_process_lifetime_manager_forked_shell.py | 9 +- 4 files changed, 139 insertions(+), 6 deletions(-) diff --git a/src/drunc/processes/ssh_process_lifetime_manager_from_forked_process.py b/src/drunc/processes/ssh_process_lifetime_manager_from_forked_process.py index 5c3be7bb2..380b63ae3 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager_from_forked_process.py +++ b/src/drunc/processes/ssh_process_lifetime_manager_from_forked_process.py @@ -703,3 +703,7 @@ def get_remote_pid(self, uuid: str) -> RemotePidResult: why the PID is unavailable (e.g. metadata not yet written). """ return self._call("get_remote_pid", uuid) + + def get_runtime_pids(self, uuid: str) -> Dict[str, Optional[int]]: + """Return best-effort runtime PID snapshot from the child manager.""" + return self._call("get_runtime_pids", uuid) diff --git a/src/drunc/processes/ssh_process_lifetime_manager_shell.py b/src/drunc/processes/ssh_process_lifetime_manager_shell.py index cc032ffc6..8ba2b9634 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager_shell.py +++ b/src/drunc/processes/ssh_process_lifetime_manager_shell.py @@ -339,6 +339,25 @@ def get_remote_pid(self, uuid: str) -> RemotePidResult: return RemotePidResult(reason="no metadata") return RemotePidResult(pid=metadata.pid) + def get_runtime_pids(self, uuid: str) -> Dict[str, Optional[int]]: + """Return best-effort runtime PID snapshot for a managed process.""" + with self.lock: + running_process = self.process_store.get(uuid) + if running_process is None: + return { + "ssh_client_pid": None, + "remote_pid": None, + "remote_monitoring_pid": None, + "client_monitoring_pid": None, + } + + return { + "ssh_client_pid": running_process.ssh_client_pid, + "remote_pid": running_process.remote_pid, + "remote_monitoring_pid": running_process.remote_monitoring_pid, + "client_monitoring_pid": 
running_process.client_monitoring_pid, + } + def start_process(self, uuid: str, boot_request: BootRequest) -> None: """ Start a remote process via SSH using the boot request configuration. diff --git a/tests/processes/test_ssh_process_lifetime_manager_common.py b/tests/processes/test_ssh_process_lifetime_manager_common.py index 5a30adde7..12702f0f0 100644 --- a/tests/processes/test_ssh_process_lifetime_manager_common.py +++ b/tests/processes/test_ssh_process_lifetime_manager_common.py @@ -3,6 +3,7 @@ """ import getpass +import os import tempfile import threading import uuid @@ -183,6 +184,8 @@ def on_exit(cb_uuid: str, exit_status: Optional[ExitStatus], exception): verify_all_processes_alive(ssh_manager, process_uuids, len(scenarios)) verify_log_output(ssh_manager, process_uuids, process_info) + pid_snapshots = capture_process_pid_snapshots(ssh_manager, process_uuids) + # Wait until metadata is available for all scenarios that require remote PID. for process_uuid in process_uuids: kill_mode = process_info[process_uuid]["kill_mode"] @@ -282,7 +285,7 @@ def trigger_termination(process_uuid: str): ssh_manager.kill_all_processes( process_timeouts={process_uuid: 10.0 for process_uuid in process_uuids} ) - verify_cleanup_complete(ssh_manager) + verify_cleanup_complete(ssh_manager, pid_snapshots=pid_snapshots) def verify_log_output(ssh_manager, process_uuids, process_info, timeout=10.0): @@ -383,7 +386,98 @@ def verify_exit_codes( print(f"Process {process_uuid}: exit code {exit_code}") -def verify_cleanup_complete(ssh_manager): +def _pid_exists(pid: int) -> bool: + """Return True if PID exists (including zombie), False otherwise.""" + try: + os.kill(pid, 0) + return True + except ProcessLookupError: + return False + except PermissionError: + return True + + +def _pid_state(pid: int) -> Optional[str]: + """Return /proc state letter for PID, or None when unavailable.""" + try: + with open(f"/proc/{pid}/stat", "r", encoding="utf-8") as proc_stat: + fields = 
proc_stat.read().split() + if len(fields) >= 3: + return fields[2] + except FileNotFoundError: + return None + except ProcessLookupError: + return None + except PermissionError: + return None + return None + + +def capture_process_pid_snapshots( + ssh_manager, process_uuids +) -> dict[str, dict[str, int]]: + """Capture best-effort PID snapshots for processes before kill/cleanup.""" + snapshots: dict[str, dict[str, int]] = {} + get_runtime_pids = getattr(ssh_manager, "get_runtime_pids", None) + + for process_uuid in process_uuids: + pid_snapshot: dict[str, int] = {} + + if callable(get_runtime_pids): + runtime_pids = get_runtime_pids(process_uuid) or {} + for label, pid in runtime_pids.items(): + if isinstance(pid, int): + pid_snapshot[label] = pid + + remote_pid_result = ssh_manager.get_remote_pid(process_uuid) + if remote_pid_result.successful and isinstance(remote_pid_result.pid, int): + pid_snapshot["remote_pid"] = remote_pid_result.pid + + snapshots[process_uuid] = pid_snapshot + + return snapshots + + +def _verify_os_pid_cleanup( + pid_snapshots: dict[str, dict[str, int]], + timeout_per_pid: float = 10.0, +) -> None: + """Verify tracked PIDs fully disappear from the OS and are not zombies.""" + seen_pids: set[int] = set() + checked_count = 0 + + for process_uuid, snapshot in pid_snapshots.items(): + for pid_type, pid in snapshot.items(): + if pid in seen_pids: + continue + seen_pids.add(pid) + + cleaned = wait_for( + lambda p=pid: not _pid_exists(p), + expected_value=True, + timeout=timeout_per_pid, + poll_interval=0.1, + ) + if cleaned: + checked_count += 1 + continue + + state = _pid_state(pid) + if state == "Z": + raise AssertionError( + f"PID {pid} ({pid_type}, process UUID {process_uuid}) still exists as a zombie" + ) + + raise AssertionError( + f"PID {pid} ({pid_type}, process UUID {process_uuid}) still exists after cleanup " + f"with state '{state if state is not None else 'unknown'}'" + ) + + if checked_count: + print(f"āœ“ OS-level cleanup verified 
for {checked_count} tracked PID(s)") + + +def verify_cleanup_complete(ssh_manager, pid_snapshots=None): """ Verify that all process resources have been cleaned up. @@ -398,6 +492,9 @@ def verify_cleanup_complete(ssh_manager): f"Found {len(active_keys)} active processes after cleanup" ) + if pid_snapshots: + _verify_os_pid_cleanup(pid_snapshots) + def verify_all_processes_dead(ssh_manager, process_uuids, expected_count): """ @@ -469,6 +566,8 @@ def boot_processes_and_kill_individually(ssh_manager, test_file_path): verify_all_processes_alive(ssh_manager, process_uuids, num_processes) verify_log_output(ssh_manager, process_uuids, process_info) + pid_snapshots = capture_process_pid_snapshots(ssh_manager, process_uuids) + exit_codes = {} print("\n=== Terminating all processes ===") for process_uuid in process_uuids: @@ -478,7 +577,7 @@ def boot_processes_and_kill_individually(ssh_manager, test_file_path): verify_all_processes_dead(ssh_manager, process_uuids, num_processes) verify_exit_codes(exit_codes, process_uuids) - verify_cleanup_complete(ssh_manager) + verify_cleanup_complete(ssh_manager, pid_snapshots=pid_snapshots) print( "\nāœ“ Test passed: All processes executed, logged, and cleaned up successfully" @@ -532,6 +631,8 @@ def boot_processes_and_terminate_all_same_role(ssh_manager, test_file_path): verify_all_processes_alive(ssh_manager, process_uuids, num_processes) verify_log_output(ssh_manager, process_uuids, process_info) + pid_snapshots = capture_process_pid_snapshots(ssh_manager, process_uuids) + print(f"\n=== Terminating all processes with role '{role}' ===") exit_codes = ssh_manager.kill_processes( process_uuids, process_timeouts={uuid: 10.0 for uuid in process_uuids} @@ -539,7 +640,7 @@ def boot_processes_and_terminate_all_same_role(ssh_manager, test_file_path): verify_all_processes_dead(ssh_manager, process_uuids, num_processes) verify_exit_codes(exit_codes, process_uuids, process_info) - verify_cleanup_complete(ssh_manager) + 
verify_cleanup_complete(ssh_manager, pid_snapshots=pid_snapshots) print( f"\nāœ“ Test passed: All {num_processes} processes with role '{role}' " @@ -617,6 +718,8 @@ def boot_processes_and_terminate_all_different_role(ssh_manager, test_file_path) verify_all_processes_alive(ssh_manager, process_uuids, len(process_configs)) verify_log_output(ssh_manager, process_uuids, process_info) + pid_snapshots = capture_process_pid_snapshots(ssh_manager, process_uuids) + print("\n=== Terminating all processes (role-based shutdown) ===") def monitor_termination(uuid_to_monitor): @@ -700,7 +803,7 @@ def monitor_termination(uuid_to_monitor): "'application' before 'segment-controller'" ) - verify_cleanup_complete(ssh_manager) + verify_cleanup_complete(ssh_manager, pid_snapshots=pid_snapshots) print( "\nāœ“ Test passed: Processes with different roles executed, logged, " diff --git a/tests/processes/test_ssh_process_lifetime_manager_forked_shell.py b/tests/processes/test_ssh_process_lifetime_manager_forked_shell.py index 458838eae..e72f41817 100644 --- a/tests/processes/test_ssh_process_lifetime_manager_forked_shell.py +++ b/tests/processes/test_ssh_process_lifetime_manager_forked_shell.py @@ -9,6 +9,7 @@ boot_processes_and_kill_individually, boot_processes_and_terminate_all_same_role, boot_processes_and_verify_exit_state_messages, + capture_process_pid_snapshots, create_boot_request, verify_all_processes_alive, verify_all_processes_dead, @@ -148,6 +149,8 @@ def on_exit(cb_uuid: str, exit_code, exception): verify_all_processes_alive(manager, process_uuids, len(process_configs)) verify_log_output(manager, process_uuids, process_info) + pid_snapshots = capture_process_pid_snapshots(manager, process_uuids) + print("\n=== Terminating all processes (role-based shutdown, forked) ===") exit_codes = manager.kill_processes( process_uuids, @@ -191,7 +194,7 @@ def on_exit(cb_uuid: str, exit_code, exception): "'application' before 'segment-controller'" ) - verify_cleanup_complete(manager) + 
verify_cleanup_complete(manager, pid_snapshots=pid_snapshots) finally: manager.shutdown() @@ -291,6 +294,8 @@ def on_exit(cb_uuid: str, exit_code, exception): ) assert process_alive, "Process should be alive before kill" + pid_snapshots = capture_process_pid_snapshots(manager, [process_uuid]) + manager.kill_process(process_uuid, timeout=10.0) # Allow time for the exit event to propagate from child to parent. @@ -305,5 +310,7 @@ def on_exit(cb_uuid: str, exit_code, exception): assert callback_results.get("exit_code") is not None, ( "Callback should receive a non-None exit code" ) + + verify_cleanup_complete(manager, pid_snapshots=pid_snapshots) finally: manager.shutdown() From 61f0a15f6ee8bba8beeb685c002a68eb7ab2139a Mon Sep 17 00:00:00 2001 From: Aurash Karimi Date: Fri, 8 May 2026 12:54:56 +0100 Subject: [PATCH 09/15] use seperate threads for watching ssh client and the remote process for a simpler tracking model Co-authored-by: Copilot --- .../processes/ssh_process_lifetime_manager.py | 4 +- ...ss_lifetime_manager_from_forked_process.py | 2 +- .../ssh_process_lifetime_manager_shell.py | 338 ++++++++++++++---- src/drunc/processes/ssh_shell_process.py | 13 +- ...est_ssh_process_lifetime_manager_common.py | 6 +- 5 files changed, 278 insertions(+), 85 deletions(-) diff --git a/src/drunc/processes/ssh_process_lifetime_manager.py b/src/drunc/processes/ssh_process_lifetime_manager.py index a8de20554..9be5484a3 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager.py +++ b/src/drunc/processes/ssh_process_lifetime_manager.py @@ -156,7 +156,7 @@ def kill_process( def kill_process_without_metadata( self, uuid: str, - signal_name: str = "QUIT", + signal_name: str = "KILL", as_manual_pm_kill: bool = True, timeout: float = DEFAULT_TIMEOUT_FOR_KILLING_PROCESS, ) -> Optional[ExitStatus]: @@ -168,7 +168,7 @@ def kill_process_without_metadata( Args: uuid: Process UUID to terminate. signal_name: Signal to send to the local SSH client process group - (e.g. "QUIT" or "KILL"). 
+ (e.g. "KILL" or "QUIT"). Defaults to "KILL". as_manual_pm_kill: If True, classify as process-manager initiated kill. If False, classify as external kill i.e. outside of process manager control timeout: Maximum time to wait for process termination in seconds. diff --git a/src/drunc/processes/ssh_process_lifetime_manager_from_forked_process.py b/src/drunc/processes/ssh_process_lifetime_manager_from_forked_process.py index 380b63ae3..d57e899bb 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager_from_forked_process.py +++ b/src/drunc/processes/ssh_process_lifetime_manager_from_forked_process.py @@ -532,7 +532,7 @@ def kill_process( def kill_process_without_metadata( self, uuid: str, - signal_name: str = "QUIT", + signal_name: str = "KILL", as_manual_pm_kill: bool = True, timeout: float = ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS, ) -> Optional[ExitStatus]: diff --git a/src/drunc/processes/ssh_process_lifetime_manager_shell.py b/src/drunc/processes/ssh_process_lifetime_manager_shell.py index 8ba2b9634..43cd098a9 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager_shell.py +++ b/src/drunc/processes/ssh_process_lifetime_manager_shell.py @@ -99,35 +99,32 @@ def run(self): f"kill -9 {self.running_process.process.pid}" ) else: - # If metadata could not be read, fall back to monitoring SSH client self.logger.warning( f"Failed to retrieve metadata for process {self.uuid}. " - f"Falling back to SSH client monitoring." + f"Remote process monitoring will not be started." ) - self._monitor_ssh_client() return except Exception as e: self.logger.warning( f"Exception reading metadata for process {self.uuid}: {e}. " - f"Falling back to SSH client monitoring." + f"Remote process monitoring will not be started." ) - self._monitor_ssh_client() return if metadata.pid is None: self.logger.warning( f"Metadata for process {self.uuid} did not contain a PID. " - f"Falling back to SSH client monitoring." + f"Remote process monitoring will not be started." 
) - self._monitor_ssh_client() return # Monitor the remote process directly + self.manager._register_remote_process_watcher(self.uuid, self) self._monitor_remote_process(metadata.pid) def _monitor_remote_process(self, remote_pid: int) -> None: """ - Monitor remote process by polling until PID disappears. + Monitor remote process until the remote PID disappears. Uses SSH to run a blocking command that exits when the process dies. """ @@ -153,27 +150,31 @@ def _monitor_remote_process(self, remote_pid: int) -> None: arguments.append(remote_cmd) self.__is_monitoring_remotely = True - # Start the SSH monitoring command in background to capture its PID immediately monitoring_process = self.manager.ssh(*arguments, _bg=True, _bg_exc=False) + assert isinstance(monitoring_process, sh.RunningCommand), ( + "Expected remote monitoring process to be a RunningCommand instance" + ) - # Store the local SSH client PID for the monitoring process - self.running_process.client_monitoring_pid = getattr( + self.running_process.remote_monitoring_pid = getattr( monitoring_process, "pid", None ) - # block until the monitored process exits try: monitoring_process.wait() - except sh.ErrorReturnCode as _: - pass + except sh.ErrorReturnCode as remote_monitor_error: + exception = remote_monitor_error self.__is_monitoring_remotely = False self.logger.debug( f"Remote process {self.uuid} (PID {remote_pid}) has exited" ) - self.running_process.process.wait() - raw_exit_code = self.running_process.process.exit_code + raw_exit_code, client_exception = self.manager._wait_and_get_exit_code( + self.running_process.process, + f"SSH client for {self.uuid}", + ) + if client_exception is not None and exception is None: + exception = client_exception self.logger.debug( f"SSH client for {self.uuid} exited with code {raw_exit_code}" ) @@ -186,65 +187,98 @@ def _monitor_remote_process(self, remote_pid: int) -> None: exception = e self.logger.error(f"Remote process {self.uuid} watcher error: {e}") - with 
self.manager.lock: - exit_status = self.running_process.finalise_exit( - ExitStatusSource.REMOTE_MONITORING, raw_exit_code - ) - - # Invoke callback with results - if self.on_exit: - try: - self.on_exit(self.uuid, exit_status, exception) - except Exception as callback_error: - self.logger.error( - f"Error in process exit callback for {self.uuid}: {callback_error}" - ) + self.manager._emit_exit_callback_once( + self.uuid, + self.running_process, + ExitStatusSource.REMOTE_MONITORING, + raw_exit_code, + exception, + ) - def _monitor_ssh_client(self) -> None: + def is_monitoring_remotely(self) -> bool: """ - Monitor the SSH client process until it stops, this can be used as a - fallback if the remote PID of the process is unavailable. + Check if the watcher is monitoring the remote process directly. + + Returns: + True if monitoring remote process, False if monitoring SSH client """ - exception = None - raw_exit_code = None + return self.__is_monitoring_remotely - try: - self.running_process.process.wait() - raw_exit_code = self.running_process.process.exit_code - self.logger.debug( - f"SSH client for {self.uuid} exited with code {raw_exit_code}" - ) - except sh.ErrorReturnCode as e: - exception = e - raw_exit_code = e.exit_code - self.logger.debug(f"SSH client for {self.uuid} error: {e}") +class SSHClientWatcherThread(threading.Thread): + """Thread that monitors the local SSH client and classifies its exit.""" - except Exception as e: - exception = e - self.logger.error(f"SSH client for {self.uuid} watcher error: {e}") + def __init__( + self, + uuid: str, + running_process: RunningSSHProcess, + manager: "SSHProcessLifetimeManagerShell", + hostname: str, + user: str, + metadata_file: str, + logger: logging.Logger, + ): + super().__init__(name=f"ShellClientWatcher-{uuid}", daemon=True) + self.uuid = uuid + self.running_process = running_process + self.manager = manager + self.hostname = hostname + self.user = user + self.metadata_file = metadata_file + self.logger = 
logger - with self.manager.lock: - exit_status = self.running_process.finalise_exit( - ExitStatusSource.CLIENT_MONITORING, raw_exit_code - ) + def run(self) -> None: + self._monitor_ssh_client() - if self.on_exit: - try: - self.on_exit(self.uuid, exit_status, exception) - except Exception as callback_error: - self.logger.error( - f"Error in process exit callback for {self.uuid}: {callback_error}" - ) + def _monitor_ssh_client(self) -> None: + """Monitor the SSH client process until it stops.""" + exception = None + raw_exit_code = None - def is_monitoring_remotely(self) -> bool: - """ - Check if the watcher is monitoring the remote process directly. + raw_exit_code, exception = self.manager._wait_and_get_exit_code( + self.running_process.process, + f"SSH client for {self.uuid}", + ) - Returns: - True if monitoring remote process, False if monitoring SSH client - """ - return self.__is_monitoring_remotely + default_source = ExitStatusSource.CLIENT_MONITORING + remote_pid = self.running_process.remote_pid + + # External SIGQUIT hit the SSH client unexpectedly; trigger remote-PID failsafe cleanup. + if ( + self.running_process.pending_exit_status_source is None + and raw_exit_code == 255 + ): + self.manager._handle_external_client_sigquit( + self.uuid, + self.hostname, + self.user, + remote_pid, + self.metadata_file, + ) + # Client exited after the remote PID is dead, so classify this as remote-driven termination. + # We shouldn't hit this path in normal operation as the remote watcher should trigger first + elif ( + self.running_process.pending_exit_status_source is None + and remote_pid is not None + and not self.manager._is_remote_pid_alive( + self.hostname, + self.user, + remote_pid, + ) + ): + self.logger.warning( + f"Unusual Shutdown: remote process uuid={self.uuid} is dead but SSH client exited before remote watcher callback. 
" + f"The remote watcher PID is: {self.running_process.remote_monitoring_pid}" + ) + default_source = ExitStatusSource.REMOTE_MONITORING + + self.manager._emit_exit_callback_once( + self.uuid, + self.running_process, + default_source, + raw_exit_code, + exception, + ) class SSHProcessLifetimeManagerShell(ProcessLifetimeManager): @@ -284,7 +318,8 @@ def __init__( self.process_store: Dict[str, RunningSSHProcess] = {} # Thread tracking for monitoring - self.watchers: Dict[str, ProcessWatcherThread] = {} + self.client_watchers: Dict[str, SSHClientWatcherThread] = {} + self.remote_process_watchers: Dict[str, ProcessWatcherThread] = {} # Thread-safe lock for process store modifications self.lock = threading.Lock() @@ -429,6 +464,19 @@ def _start_process_watcher( user: Remote user for metadata retrieval metadata_file: Path to metadata file on remote host """ + client_watcher = SSHClientWatcherThread( + uuid=uuid, + running_process=running_process, + manager=self, + hostname=hostname, + user=user, + metadata_file=metadata_file, + logger=self.log, + ) + client_watcher.start() + with self.lock: + self.client_watchers[uuid] = client_watcher + watcher = ProcessWatcherThread( uuid=uuid, running_process=running_process, @@ -440,8 +488,40 @@ def _start_process_watcher( logger=self.log, ) watcher.start() + + def _register_remote_process_watcher( + self, + uuid: str, + watcher: ProcessWatcherThread, + ) -> None: + """Track the remote-process watcher once metadata is available.""" with self.lock: - self.watchers[uuid] = watcher + self.remote_process_watchers[uuid] = watcher + + def _emit_exit_callback_once( + self, + uuid: str, + running_process: RunningSSHProcess, + default_source: ExitStatusSource, + raw_exit_code: Optional[int], + exception: Optional[Exception], + ) -> None: + """Publish the first exit observation across concurrent watcher threads.""" + with self.lock: + exit_status, should_emit = running_process.finalise_exit_once( + default_source, + raw_exit_code, + ) + + if 
not should_emit or self._on_process_exit is None: + return + + try: + self._on_process_exit(uuid, exit_status, exception) + except Exception as callback_error: + self.log.error( + f"Error in process exit callback for {uuid}: {callback_error}" + ) def is_process_alive(self, uuid: str) -> bool: """ @@ -668,7 +748,7 @@ def kill_processes( process_timeouts[uuid] = self.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS all_exit_statuses: Dict[str, Optional[ExitStatus]] = {} - killed_uuids = set() + killed_uuids: set[str] = set() # Execute role-based shutdown in stages for role in PROCESS_SHUTDOWN_ORDERING: @@ -752,7 +832,9 @@ def kill_all_processes( # Wait for all watcher threads to complete with self.lock: - watchers_to_join = list(self.watchers.values()) + watchers_to_join = list(self.client_watchers.values()) + list( + self.remote_process_watchers.values() + ) for watcher in watchers_to_join: try: @@ -761,7 +843,8 @@ def kill_all_processes( pass with self.lock: - self.watchers.clear() + self.client_watchers.clear() + self.remote_process_watchers.clear() return all_exit_statuses @@ -943,6 +1026,69 @@ def read_process_metadata( self.log.debug(f"Failed to read metadata for {uuid}: {e}") return None + def _wait_and_get_exit_code( + self, + process: sh.RunningCommand, + process_description: str, + ) -> tuple[Optional[int], Optional[Exception]]: + """Wait for a sh process and return its exit code plus any raised exception.""" + try: + process.wait() + exit_code = process.exit_code + self.log.debug(f"{process_description} exited with code {exit_code}") + return exit_code, None + except sh.ErrorReturnCode as e: + self.log.debug(f"{process_description} error: {e}") + return e.exit_code, e + except Exception as e: + self.log.error(f"{process_description} watcher error: {e}") + return None, e + + def _handle_external_client_sigquit( + self, + uuid: str, + hostname: str, + user: str, + remote_pid: Optional[int], + metadata_file: str, + timeout: float = 
ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS, + ) -> None: + """React to an externally delivered SIGQUIT on the SSH client.""" + if remote_pid is None: + self.log.warning( + f"SSH client for {uuid} received SIGQUIT without remote PID metadata. " + f"Remote cleanup cannot be guaranteed." + ) + return + + self.log.info( + f"SSH client for {uuid} received external SIGQUIT. " + f"Enforcing remote cleanup through PID {remote_pid}." + ) + + if self._is_remote_pid_alive(hostname, user, remote_pid): + self.log.info( + f"Remote process {uuid} (PID {remote_pid}) is still alive after SSH client SIGQUIT. " + f"Sending SIGKILL via remote PID as failsafe." + ) + self._send_remote_signal(hostname, user, remote_pid, "KILL") + remote_dead = self._wait_for_remote_pid_exit( + hostname, + user, + remote_pid, + timeout=timeout, + ) + if not remote_dead: + self.log.error( + f"Remote process {uuid} (PID {remote_pid}) did not exit after remote SIGKILL failsafe." + ) + else: + self.log.info( + f"Remote process {uuid} (PID {remote_pid}) had already exited after SSH client SIGQUIT." 
+ ) + + self._cleanup_remote_file(hostname, user, metadata_file) + def _ssh_client_stderr_logger(self, chunk): """Filter the logging of an SSH client stderr to the appropriate log level @@ -1051,7 +1197,7 @@ def _execute_bootrequest_via_ssh( def kill_process_without_metadata( self, uuid: str, - signal_name: str = "QUIT", + signal_name: str = "KILL", as_manual_pm_kill: bool = True, timeout: float = ProcessLifetimeManager.DEFAULT_TIMEOUT_FOR_KILLING_PROCESS, ) -> Optional[ExitStatus]: @@ -1070,6 +1216,7 @@ def kill_process_without_metadata( """ with self.lock: running_process = self.process_store.get(uuid) + metadata = self.metadata.get(uuid) if running_process is None: self.log.warning( @@ -1077,6 +1224,8 @@ def kill_process_without_metadata( ) return None + signal_name = signal_name.upper() + source_for_return = ( ExitStatusSource.MANUAL_KILL_THROUGH_SSH_CLIENT if as_manual_pm_kill @@ -1092,14 +1241,28 @@ def kill_process_without_metadata( self.log.debug( f"Exception was raised when terminating SSH client process: {e}" ) + exit_code = self._wait_for_process_exit_code(uuid, timeout=timeout) - self._cleanup_process_resources(uuid) + + if exit_code is None and signal_name == "QUIT" and not as_manual_pm_kill: + remote_pid = metadata.pid if metadata is not None else None + self._handle_external_client_sigquit( + uuid, + running_process.hostname, + running_process.user, + remote_pid, + SSHProcessLifetimeManagerShell.get_metadata_file_path(uuid), + timeout=timeout, + ) + exit_code = self._wait_for_process_exit_code(uuid, timeout=timeout) if exit_code is None: with self.lock: running_process.pending_exit_status_source = None return None + self._cleanup_process_resources(uuid) + return ExitStatus(source_for_return, exit_code) def _wait_for_process_exit_code(self, uuid: str, timeout: float) -> Optional[int]: @@ -1168,7 +1331,6 @@ def kill_process( ) return self.kill_process_without_metadata( uuid, - signal_name="QUIT", as_manual_pm_kill=True, timeout=timeout, ) @@ -1296,8 
+1458,10 @@ def _cleanup_process_resources(self, uuid: str) -> None: del self.process_store[uuid] if uuid in self.metadata: del self.metadata[uuid] - if uuid in self.watchers: - del self.watchers[uuid] + if uuid in self.client_watchers: + del self.client_watchers[uuid] + if uuid in self.remote_process_watchers: + del self.remote_process_watchers[uuid] def _send_remote_signal( self, hostname: str, user: str, pid: int, signal_name: str @@ -1329,13 +1493,16 @@ def _is_remote_process_alive( True if process is alive, False otherwise """ with self.lock: - watcher = self.watchers.get(uuid) + watcher = self.remote_process_watchers.get(uuid) if watcher and watcher.is_alive() and watcher.is_monitoring_remotely(): # Watcher is blocking on remote process, so remote process must be alive return True - # Verify remote process via SSH (requires another connection) + return self._is_remote_pid_alive(hostname, user, pid) + + def _is_remote_pid_alive(self, hostname: str, user: str, pid: int) -> bool: + """Check whether a remote PID exists using a direct SSH probe.""" try: user_host = f"{user}@{hostname}" arguments = self._build_ssh_arguments(hostname, user_host) @@ -1345,6 +1512,23 @@ def _is_remote_process_alive( except Exception: return False + def _wait_for_remote_pid_exit( + self, + hostname: str, + user: str, + pid: int, + timeout: float, + ) -> bool: + """Wait until a remote PID disappears.""" + return bool( + wait_for( + lambda: not self._is_remote_pid_alive(hostname, user, pid), + expected_value=True, + timeout=timeout, + poll_interval=0.2, + ) + ) + def _cleanup_remote_file(self, hostname: str, user: str, remote_file: str) -> None: """Remove remote file via SSH.""" try: diff --git a/src/drunc/processes/ssh_shell_process.py b/src/drunc/processes/ssh_shell_process.py index a468b3890..bca339fd5 100644 --- a/src/drunc/processes/ssh_shell_process.py +++ b/src/drunc/processes/ssh_shell_process.py @@ -1,6 +1,7 @@ """Runtime model for a process launched through the SSH shell 
lifetime manager.""" import signal as _signal +import threading from typing import Optional import sh @@ -33,6 +34,7 @@ def __init__(self, process: sh.RunningCommand, hostname: str, user: str) -> None self.client_monitoring_pid: Optional[int] = ( None # PID of watcher SSH process in client monitoring mode ) + self._exit_status_lock = threading.Lock() def populate_from_metadata(self, metadata: ProcessMetadata) -> None: """Populate runtime fields from asynchronously read process metadata.""" @@ -53,7 +55,16 @@ def finalise_exit( self.exit_status = ExitStatus(source, raw_exit_code) return self.exit_status - def kill_client(self, signal_name: str = "QUIT") -> None: + def finalise_exit_once( + self, default_source: ExitStatusSource, raw_exit_code: Optional[int] + ) -> tuple[ExitStatus, bool]: + """Set exit status once across concurrent watcher threads.""" + with self._exit_status_lock: + if self.exit_status is not None: + return self.exit_status, False + return self.finalise_exit(default_source, raw_exit_code), True + + def kill_client(self, signal_name: str = "KILL") -> None: """Send a signal to the local SSH client process group.""" signal_name = signal_name.upper() if signal_name == "QUIT": diff --git a/tests/processes/test_ssh_process_lifetime_manager_common.py b/tests/processes/test_ssh_process_lifetime_manager_common.py index 12702f0f0..c6c0a7a20 100644 --- a/tests/processes/test_ssh_process_lifetime_manager_common.py +++ b/tests/processes/test_ssh_process_lifetime_manager_common.py @@ -114,7 +114,7 @@ def boot_processes_and_verify_exit_state_messages( name="case_manual_ssh_client", kill_mode="manual_ssh_client", expected_source=ExitStatusSource.MANUAL_KILL_THROUGH_SSH_CLIENT, - expected_message_fragment="was terminated by the process manager through the SSH client", + expected_message_fragment="was terminated with a SIGKILL by the process manager through the SSH client", expected_reported_exit_code=None, ), ] @@ -189,7 +189,7 @@ def on_exit(cb_uuid: str, 
exit_status: Optional[ExitStatus], exception): # Wait until metadata is available for all scenarios that require remote PID. for process_uuid in process_uuids: kill_mode = process_info[process_uuid]["kill_mode"] - if kill_mode in ("client_sigquit", "client_sigkill", "manual_ssh_client"): + if kill_mode in ("client_sigkill", "manual_ssh_client"): continue metadata_ready = wait_for( lambda u=process_uuid: ssh_manager.get_remote_pid(u).successful, @@ -215,7 +215,6 @@ def trigger_termination(process_uuid: str): ), "client_sigkill": lambda: ssh_manager.kill_process_without_metadata( process_uuid, - signal_name="KILL", as_manual_pm_kill=False, timeout=10.0, ), @@ -225,7 +224,6 @@ def trigger_termination(process_uuid: str): ), "manual_ssh_client": lambda: ssh_manager.kill_process_without_metadata( process_uuid, - signal_name="QUIT", as_manual_pm_kill=True, timeout=10.0, ), From 8f8f85925709345fd37be2bf8e569bcfb3f6d0c8 Mon Sep 17 00:00:00 2001 From: Aurash Karimi Date: Fri, 8 May 2026 12:59:51 +0100 Subject: [PATCH 10/15] add clarification to docstring --- src/drunc/processes/ssh_process_lifetime_manager_shell.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/drunc/processes/ssh_process_lifetime_manager_shell.py b/src/drunc/processes/ssh_process_lifetime_manager_shell.py index 43cd098a9..2e31969fc 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager_shell.py +++ b/src/drunc/processes/ssh_process_lifetime_manager_shell.py @@ -1203,6 +1203,7 @@ def kill_process_without_metadata( ) -> Optional[ExitStatus]: """ Terminate process by signalling the local SSH client without using remote metadata. + Prefer kill_process(..) 
to this method, this is mainly intended to help with testing Args: uuid: Process UUID to terminate From 67f7524d3b2e815450762811b1084513caa32fbf Mon Sep 17 00:00:00 2001 From: Aurash Karimi Date: Fri, 8 May 2026 13:02:30 +0100 Subject: [PATCH 11/15] clarify comments --- src/drunc/processes/ssh_process_lifetime_manager_shell.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/drunc/processes/ssh_process_lifetime_manager_shell.py b/src/drunc/processes/ssh_process_lifetime_manager_shell.py index 2e31969fc..9d166f207 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager_shell.py +++ b/src/drunc/processes/ssh_process_lifetime_manager_shell.py @@ -256,7 +256,6 @@ def _monitor_ssh_client(self) -> None: self.metadata_file, ) # Client exited after the remote PID is dead, so classify this as remote-driven termination. - # We shouldn't hit this path in normal operation as the remote watcher should trigger first elif ( self.running_process.pending_exit_status_source is None and remote_pid is not None @@ -266,10 +265,6 @@ def _monitor_ssh_client(self) -> None: remote_pid, ) ): - self.logger.warning( - f"Unusual Shutdown: remote process uuid={self.uuid} is dead but SSH client exited before remote watcher callback. 
" - f"The remote watcher PID is: {self.running_process.remote_monitoring_pid}" - ) default_source = ExitStatusSource.REMOTE_MONITORING self.manager._emit_exit_callback_once( From 32692b9a54975dbdb42b50f3f1e8e5ea11b96bd3 Mon Sep 17 00:00:00 2001 From: Aurash Karimi Date: Fri, 8 May 2026 13:15:40 +0100 Subject: [PATCH 12/15] add reaping trap Co-authored-by: Copilot --- .../processes/ssh_process_lifetime_manager_shell.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/drunc/processes/ssh_process_lifetime_manager_shell.py b/src/drunc/processes/ssh_process_lifetime_manager_shell.py index 9d166f207..88324a2f1 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager_shell.py +++ b/src/drunc/processes/ssh_process_lifetime_manager_shell.py @@ -1145,6 +1145,7 @@ def _execute_bootrequest_via_ssh( remote_cmd += ( f"mkdir -p ${{XDG_RUNTIME_DIR:-/tmp}}/drunc ; " f"{command} &> {log_file} & PID=$! ; " + f"trap 'kill -HUP $PID 2>/dev/null || true; wait $PID 2>/dev/null || true' HUP TERM INT QUIT ; " f"echo '{remote_metadata_json}' > {metadata_file} ; " f"wait $PID" ) @@ -1498,11 +1499,18 @@ def _is_remote_process_alive( return self._is_remote_pid_alive(hostname, user, pid) def _is_remote_pid_alive(self, hostname: str, user: str, pid: int) -> bool: - """Check whether a remote PID exists using a direct SSH probe.""" + """Check whether a remote PID is alive (not exited and not zombie).""" try: user_host = f"{user}@{hostname}" arguments = self._build_ssh_arguments(hostname, user_host) - arguments.extend([f"[ -d /proc/{pid} ]"]) + arguments.extend( + [ + ( + f"test -d /proc/{pid} && " + f'[ "$(awk \'{{print $3}}\' /proc/{pid}/stat 2>/dev/null)" != "Z" ]' + ) + ] + ) self.ssh(*arguments) return True except Exception: From 8b5d6800097925378b90954d5273938af08fbf9c Mon Sep 17 00:00:00 2001 From: Aurash Karimi Date: Wed, 13 May 2026 13:19:09 +0100 Subject: [PATCH 13/15] add another exit test case fix exit logic for unexpected process kill 
Co-authored-by: Copilot --- .../process_manager/ssh_process_manager.py | 2 +- .../processes/ssh_process_lifetime_manager.py | 5 +- ...ss_lifetime_manager_from_forked_process.py | 9 ++- .../ssh_process_lifetime_manager_shell.py | 74 +++++++++---------- ...est_ssh_process_lifetime_manager_common.py | 15 +++- 5 files changed, 57 insertions(+), 48 deletions(-) diff --git a/src/drunc/process_manager/ssh_process_manager.py b/src/drunc/process_manager/ssh_process_manager.py index 9a8708322..835697f34 100644 --- a/src/drunc/process_manager/ssh_process_manager.py +++ b/src/drunc/process_manager/ssh_process_manager.py @@ -612,7 +612,7 @@ def _crash_processes(self, uuids: list) -> ProcessInstanceList: self.log.info( f"Simulating crash of process {this_uuid} (sending SIGKILL, no cleanup)." ) - self.ssh_lifetime_manager.crash_process(this_uuid) + self.ssh_lifetime_manager.crash_process(this_uuid, signal="KILL") ret = [ self._build_process_instance( diff --git a/src/drunc/processes/ssh_process_lifetime_manager.py b/src/drunc/processes/ssh_process_lifetime_manager.py index 9be5484a3..3479ab208 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager.py +++ b/src/drunc/processes/ssh_process_lifetime_manager.py @@ -179,9 +179,9 @@ def kill_process_without_metadata( pass @abstractmethod - def crash_process(self, uuid: str) -> None: + def crash_process(self, uuid: str, signal: str = "KILL") -> None: """ - Simulate a process crash by sending SIGKILL without performing any cleanup. + Simulate a process crash by sending a signal without performing any cleanup. 
Unlike kill_process, this method only sends the kill signal to the remote process without waiting for termination or cleaning up associated resources @@ -191,6 +191,7 @@ def crash_process(self, uuid: str) -> None: Args: uuid: Process UUID to crash + signal: Signal to send to simulate crash """ pass diff --git a/src/drunc/processes/ssh_process_lifetime_manager_from_forked_process.py b/src/drunc/processes/ssh_process_lifetime_manager_from_forked_process.py index d57e899bb..1a79636ab 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager_from_forked_process.py +++ b/src/drunc/processes/ssh_process_lifetime_manager_from_forked_process.py @@ -557,18 +557,19 @@ def kill_process_without_metadata( timeout, ) - def crash_process(self, uuid: str) -> None: + def crash_process(self, uuid: str, signal: str = "KILL") -> None: """ - Simulate a process crash by sending SIGKILL without performing any cleanup. + Simulate a process crash by sending a signal without performing any cleanup. Delegates to the underlying SSHProcessLifetimeManagerShell running in - the forked worker process. Sends SIGKILL to the remote process without + the forked worker process. Sends the specified signal to the remote process without cleaning up any associated resources, simulating an unexpected crash. 
Args: uuid: Process UUID to crash + signal: Signal to send to simulate crash """ - self._call("crash_process", uuid) + self._call("crash_process", uuid, signal) def kill_processes( self, diff --git a/src/drunc/processes/ssh_process_lifetime_manager_shell.py b/src/drunc/processes/ssh_process_lifetime_manager_shell.py index 88324a2f1..bbdb661e3 100644 --- a/src/drunc/processes/ssh_process_lifetime_manager_shell.py +++ b/src/drunc/processes/ssh_process_lifetime_manager_shell.py @@ -169,12 +169,9 @@ def _monitor_remote_process(self, remote_pid: int) -> None: f"Remote process {self.uuid} (PID {remote_pid}) has exited" ) - raw_exit_code, client_exception = self.manager._wait_and_get_exit_code( - self.running_process.process, - f"SSH client for {self.uuid}", + raw_exit_code = self.manager.wait_for_process_exit_code( + self.uuid, timeout=30.0 ) - if client_exception is not None and exception is None: - exception = client_exception self.logger.debug( f"SSH client for {self.uuid} exited with code {raw_exit_code}" ) @@ -235,10 +232,28 @@ def _monitor_ssh_client(self) -> None: exception = None raw_exit_code = None - raw_exit_code, exception = self.manager._wait_and_get_exit_code( - self.running_process.process, - f"SSH client for {self.uuid}", + try: + self.running_process.process.wait() + client_exit_code = self.running_process.process.exit_code + self.logger.debug( + f"SSH client for {self.uuid} exited with code {client_exit_code}" + ) + except sh.ErrorReturnCode as e: + self.logger.debug(f"SSH client for {self.uuid} error: {e}") + exception = e + client_exit_code = e.exit_code + except Exception as e: + self.logger.error(f"SSH client for {self.uuid} watcher error: {e}") + exception = e + client_exit_code = None + + remote_exit_code = self.manager.wait_for_process_exit_code( + self.uuid, timeout=30.0 ) + if remote_exit_code is not None: + raw_exit_code = remote_exit_code + else: + raw_exit_code = client_exit_code default_source = ExitStatusSource.CLIENT_MONITORING remote_pid 
= self.running_process.remote_pid @@ -1021,24 +1036,6 @@ def read_process_metadata( self.log.debug(f"Failed to read metadata for {uuid}: {e}") return None - def _wait_and_get_exit_code( - self, - process: sh.RunningCommand, - process_description: str, - ) -> tuple[Optional[int], Optional[Exception]]: - """Wait for a sh process and return its exit code plus any raised exception.""" - try: - process.wait() - exit_code = process.exit_code - self.log.debug(f"{process_description} exited with code {exit_code}") - return exit_code, None - except sh.ErrorReturnCode as e: - self.log.debug(f"{process_description} error: {e}") - return e.exit_code, e - except Exception as e: - self.log.error(f"{process_description} watcher error: {e}") - return None, e - def _handle_external_client_sigquit( self, uuid: str, @@ -1239,7 +1236,7 @@ def kill_process_without_metadata( f"Exception was raised when terminating SSH client process: {e}" ) - exit_code = self._wait_for_process_exit_code(uuid, timeout=timeout) + exit_code = self.wait_for_process_exit_code(uuid, timeout=timeout) if exit_code is None and signal_name == "QUIT" and not as_manual_pm_kill: remote_pid = metadata.pid if metadata is not None else None @@ -1251,7 +1248,7 @@ def kill_process_without_metadata( SSHProcessLifetimeManagerShell.get_metadata_file_path(uuid), timeout=timeout, ) - exit_code = self._wait_for_process_exit_code(uuid, timeout=timeout) + exit_code = self.wait_for_process_exit_code(uuid, timeout=timeout) if exit_code is None: with self.lock: @@ -1262,7 +1259,7 @@ def kill_process_without_metadata( return ExitStatus(source_for_return, exit_code) - def _wait_for_process_exit_code(self, uuid: str, timeout: float) -> Optional[int]: + def wait_for_process_exit_code(self, uuid: str, timeout: float) -> Optional[int]: """ Wait for specified timeout to see if a process exit code is available. @@ -1346,7 +1343,7 @@ def kill_process( self.log.info( f"Skipping killing remote process {uuid} (PID {remote_pid}). 
It is already dead. """ ) - exit_code = self._wait_for_process_exit_code(uuid, timeout=timeout) + exit_code = self.wait_for_process_exit_code(uuid, timeout=timeout) self._cleanup_remote_file(hostname, user, metadata_file) self._cleanup_process_resources(uuid) if exit_code is None: with self.lock: running_process.pending_exit_status_source = None else: - exit_code = self._wait_for_process_exit_code(uuid, timeout=timeout) + exit_code = self.wait_for_process_exit_code(uuid, timeout=timeout) self._cleanup_remote_file(hostname, user, metadata_file) self._cleanup_process_resources(uuid) if exit_code is None: @@ -1413,13 +1410,10 @@ def kill_process( return None - def crash_process(self, uuid: str) -> None: + def crash_process(self, uuid: str, signal: str = "KILL") -> None: """ - Simulate a process crash by sending SIGKILL without performing any cleanup. - - Sends SIGKILL to the remote process identified by uuid but deliberately - skips all cleanup steps (metadata file removal, internal tracking cleanup, - SSH client termination). This leaves the process manager in the same state + Simulate an unexpected process crash by ending the remote process without cleanup. + This leaves the process manager in the same state as if the process had crashed unexpectedly, allowing crash-recovery logic to be exercised in tests. @@ -1437,16 +1431,16 @@ def crash_process(self, uuid: str) -> None: metadata = self.metadata.get(uuid, None) if metadata is None or metadata.pid is None: self.log.warning( - f"No remote PID for {uuid}, cannot send SIGKILL to simulate crash." + f"No remote PID for {uuid}, cannot send {signal} to simulate crash." ) return remote_pid = metadata.pid self.log.debug( f"Simulating crash of process {uuid} (PID {remote_pid}): " - f"sending SIGKILL without cleanup."
) - self._send_remote_signal(hostname, user, remote_pid, "KILL") + self._send_remote_signal(hostname, user, remote_pid, signal) def _cleanup_process_resources(self, uuid: str) -> None: """Remove all resources associated with a process UUID.""" diff --git a/tests/processes/test_ssh_process_lifetime_manager_common.py b/tests/processes/test_ssh_process_lifetime_manager_common.py index c6c0a7a20..729f10959 100644 --- a/tests/processes/test_ssh_process_lifetime_manager_common.py +++ b/tests/processes/test_ssh_process_lifetime_manager_common.py @@ -97,11 +97,19 @@ def boot_processes_and_verify_exit_state_messages( expected_message_fragment="was terminated unexpectedly by a SIGKILL to the SSH client (SIGHUP on the server)", expected_reported_exit_code=None, ), + ExitMessageScenario( + name="case_remote_sigquit", + kill_mode="remote_sigquit", + expected_source=ExitStatusSource.REMOTE_MONITORING, + expected_message_fragment="was terminated unexpectedly through the remote pid", + expected_reported_exit_code=0, + ), ExitMessageScenario( name="case_remote_sigkill", kill_mode="remote_sigkill", expected_source=ExitStatusSource.REMOTE_MONITORING, expected_message_fragment="was terminated unexpectedly through the remote pid", + expected_reported_exit_code=128 + 9, # SIGKILL ), ExitMessageScenario( name="case_manual_remote_pid", @@ -218,7 +226,12 @@ def trigger_termination(process_uuid: str): as_manual_pm_kill=False, timeout=10.0, ), - "remote_sigkill": lambda: ssh_manager.crash_process(process_uuid), + "remote_sigkill": lambda: ssh_manager.crash_process( + process_uuid, signal="KILL" + ), + "remote_sigquit": lambda: ssh_manager.crash_process( + process_uuid, signal="QUIT" + ), "manual_remote_pid": lambda: ssh_manager.kill_process( process_uuid, timeout=10.0 ), From 457c08ebcfa4e2011a486382ccb0a0b8c8a7d327 Mon Sep 17 00:00:00 2001 From: Aurash Karimi Date: Wed, 13 May 2026 13:56:43 +0100 Subject: [PATCH 14/15] make sigkill exit logging consistent Co-authored-by: Copilot --- 
...erage.aurash-VMware-Virtual-Platform.pid1778665.XQC9ty7x | 0 src/drunc/processes/exit_status.py | 6 ++++++ tests/processes/test_ssh_process_lifetime_manager_common.py | 4 ++-- 3 files changed, 8 insertions(+), 2 deletions(-) create mode 100644 .coverage.aurash-VMware-Virtual-Platform.pid1778665.XQC9ty7x diff --git a/.coverage.aurash-VMware-Virtual-Platform.pid1778665.XQC9ty7x b/.coverage.aurash-VMware-Virtual-Platform.pid1778665.XQC9ty7x new file mode 100644 index 000000000..e69de29bb diff --git a/src/drunc/processes/exit_status.py b/src/drunc/processes/exit_status.py index 34390da96..5d1ef7d8d 100644 --- a/src/drunc/processes/exit_status.py +++ b/src/drunc/processes/exit_status.py @@ -56,6 +56,12 @@ def _interpret_client_monitoring(self) -> tuple[None | int, str]: ) def _interpret_remote_monitoring(self) -> tuple[Optional[int], str]: + if self._raw_exit_code == 137: + return ( + None, + "was terminated unexpectedly through the remote pid by a SIGKILL (error code 128 + 9)", + ) + return ( self._raw_exit_code, "was terminated unexpectedly through the remote pid", diff --git a/tests/processes/test_ssh_process_lifetime_manager_common.py b/tests/processes/test_ssh_process_lifetime_manager_common.py index 729f10959..33aef9359 100644 --- a/tests/processes/test_ssh_process_lifetime_manager_common.py +++ b/tests/processes/test_ssh_process_lifetime_manager_common.py @@ -108,8 +108,8 @@ def boot_processes_and_verify_exit_state_messages( name="case_remote_sigkill", kill_mode="remote_sigkill", expected_source=ExitStatusSource.REMOTE_MONITORING, - expected_message_fragment="was terminated unexpectedly through the remote pid", - expected_reported_exit_code=128 + 9, # SIGKILL + expected_message_fragment="was terminated unexpectedly through the remote pid by a SIGKILL (error code 128 + 9)", + expected_reported_exit_code=None, ), ExitMessageScenario( name="case_manual_remote_pid", From e7d5d1248c5d9ffb18c487dc91e7bc5ab9de60f1 Mon Sep 17 00:00:00 2001 From: Aurash Karimi Date: 
Wed, 13 May 2026 13:59:02 +0100 Subject: [PATCH 15/15] add unexpected exit case --- ...erage.aurash-VMware-Virtual-Platform.pid1778665.XQC9ty7x | 0 src/drunc/processes/exit_status.py | 6 ++++++ 2 files changed, 6 insertions(+) delete mode 100644 .coverage.aurash-VMware-Virtual-Platform.pid1778665.XQC9ty7x diff --git a/.coverage.aurash-VMware-Virtual-Platform.pid1778665.XQC9ty7x b/.coverage.aurash-VMware-Virtual-Platform.pid1778665.XQC9ty7x deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/drunc/processes/exit_status.py b/src/drunc/processes/exit_status.py index 5d1ef7d8d..9d3970d3a 100644 --- a/src/drunc/processes/exit_status.py +++ b/src/drunc/processes/exit_status.py @@ -56,6 +56,12 @@ def _interpret_client_monitoring(self) -> tuple[None | int, str]: ) def _interpret_remote_monitoring(self) -> tuple[Optional[int], str]: + if self._raw_exit_code == -9: + return ( + None, + "was terminated unexpectedly through the remote pid by a SIGKILL (error code -9)", + ) + if self._raw_exit_code == 137: return ( None,