Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 63 additions & 29 deletions src/drunc/process_manager/ssh_process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from drunc.exceptions import DruncCommandException
from drunc.process_manager.process_manager import ProcessManager
from drunc.processes.exit_status import ExitStatus
from drunc.processes.ssh_process_lifetime_manager import ProcessLifetimeManager


Expand Down Expand Up @@ -55,8 +56,8 @@ def __init__(
logger=self.log,
on_process_exit=self._on_ssh_process_exit,
)
# stores the exit codes for all dead processes by uuid
self.archived_exit_codes: dict[str, int] = {}
# stores the exit statuses for all dead processes by uuid
self.archived_exit_statuses: dict[str, ExitStatus] = {}

def _build_process_instance(
self,
Expand Down Expand Up @@ -105,7 +106,10 @@ def _get_process_timeouts(self, uuids: List[str]) -> dict[str, float]:
return process_timeouts

def _on_ssh_process_exit(
self, uuid: str, exit_code: Optional[int], exception: Optional[Exception]
self,
uuid: str,
exit_status: Optional[ExitStatus],
exception: Optional[Exception],
) -> None:
if uuid not in self.boot_request:
return
Expand All @@ -115,16 +119,23 @@ def _on_ssh_process_exit(
f"Process with UUID {uuid} threw an exception when we tried to kill it: {exception!s}"
)

if exit_code is None:
if exit_status is None:
self.log.error(
f"Process with UUID {uuid} is still running but on_ssh_process_exit was called."
)
return
else:
self.log.debug(f"Process with UUID {uuid} exited with code {exit_code}.")
self.log.debug(
f"Process with UUID {uuid} exited with status {exit_status!r} triggering on_ssh_process_exit."
)

if uuid not in self.archived_exit_codes:
self.archived_exit_codes[uuid] = exit_code
# Processes killed cleanly via the kill endpoint will already
# have their exit code and dead status recorded, so there is no benefit from
# overwriting it asynchronously here. This is only used to
# record exit codes for processes that were killed unexpectedly
# (e.g. due to a crash or external kill signal)
if uuid not in self.archived_exit_statuses:
self.archived_exit_statuses[uuid] = exit_status
if uuid not in self.expected_dead_applications:
self.add_process_to_expected_dead_processes(uuid)

Expand All @@ -133,7 +144,12 @@ def _on_ssh_process_exit(
session = boot_req.process_description.metadata.session
user = boot_req.process_description.metadata.user

self.notify_join(name=name, session=session, user=user, exit_code=exit_code)
self.notify_join(
name=name,
session=session,
user=user,
exit_status=self.archived_exit_statuses[uuid],
)

def kill_processes(self, uuids: list) -> ProcessInstanceList:
"""
Expand All @@ -148,21 +164,27 @@ def kill_processes(self, uuids: list) -> ProcessInstanceList:
Returns:
ProcessInstanceList containing status of terminated processes
"""
# Delegate shutdown to lifetime manager and retrieve exit codes
exit_codes = self.ssh_lifetime_manager.kill_processes(
# Delegate shutdown to lifetime manager and retrieve exit statuses
exit_statuses = self.ssh_lifetime_manager.kill_processes(
uuids, self._get_process_timeouts(uuids)
)

for proc_uuid in uuids:
self.add_process_to_expected_dead_processes(proc_uuid)
self.archived_exit_codes.update(exit_codes)
for proc_uuid, exit_status in exit_statuses.items():
if exit_status is not None:
self.archived_exit_statuses[proc_uuid] = exit_status

# Build ProcessInstance objects from termination results
ret = [
self._build_process_instance(
uuid=uuid,
status_code=ProcessInstance.StatusCode.DEAD,
return_code=exit_codes.get(uuid),
return_code=(
exit_statuses[uuid].get_reported_exit_code()
if exit_statuses.get(uuid) is not None
else None
),
)
for uuid in uuids
]
Expand Down Expand Up @@ -296,9 +318,9 @@ def _logs_impl(self, log_request: LogRequest) -> LogLines:
flag=ResponseFlag.UNHANDLED_EXCEPTION_THROWN,
)

def notify_join(self, name, session, user, exit_code):
def notify_join(self, name, session, user, exit_status: ExitStatus):
self.log.debug(f"{self.name} sending broadcast after ssh process exit")
end_str = f"Process '{name}' (session: '{session}', user: '{user}') process exited with exit code {exit_code}"
end_str = exit_status.get_process_manager_log_message(name, session, user)
self.log.info(end_str)
self.broadcast(end_str, BroadcastType.SUBPROCESS_STATUS_UPDATE)

Expand Down Expand Up @@ -370,12 +392,14 @@ def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance:

# Query current process status
alive = self.ssh_lifetime_manager.is_process_alive(uuid)
return_code = self.ssh_lifetime_manager.pop_early_exit_code(uuid)
return_status = self.ssh_lifetime_manager.pop_early_exit_status(uuid)

# Archive exit code if process exited early
if return_code is not None:
self.log.debug(f"Process {uuid} exited early with exit code: {return_code}")
self.archived_exit_codes[uuid] = return_code
if return_status is not None:
self.log.debug(
f"Process {uuid} exited early with exit status: {return_status!r}"
)
self.archived_exit_statuses[uuid] = return_status

# Determine status code based on liveness
status_code = (
Expand All @@ -388,7 +412,11 @@ def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance:
pi = self._build_process_instance(
uuid=uuid,
status_code=status_code,
return_code=return_code,
return_code=(
return_status.get_reported_exit_code()
if return_status is not None
else None
),
)

return pi
Expand Down Expand Up @@ -443,13 +471,13 @@ def _ps_impl(self, query: ProcessQuery) -> ProcessInstanceList:
ret += [pi]
continue

exit_code = self.archived_exit_codes.get(proc_uuid, None)
exit_status = self.archived_exit_statuses.get(proc_uuid, None)

if exit_code is not None:
if exit_status is not None:
pi = self._build_process_instance(
uuid=proc_uuid,
status_code=ProcessInstance.StatusCode.DEAD,
return_code=exit_code,
return_code=exit_status.get_reported_exit_code(),
)
else:
pi = self._build_process_instance(
Expand Down Expand Up @@ -501,9 +529,11 @@ def _restart_impl(self, query: ProcessQuery) -> ProcessInstanceList:
# reported as unexpectedly dead
self.add_process_to_expected_dead_processes(uuid)

self.archived_exit_codes[uuid] = self.ssh_lifetime_manager.kill_process(
exit_status = self.ssh_lifetime_manager.kill_process(
uuid, self.configuration.data.kill_timeout
)
if exit_status is not None:
self.archived_exit_statuses[uuid] = exit_status

del self.boot_request[uuid]

Expand All @@ -512,7 +542,7 @@ def _restart_impl(self, query: ProcessQuery) -> ProcessInstanceList:
# Remove the application from the list of dead applications
self.remove_process_from_expected_dead_processes(uuid)

del self.archived_exit_codes[uuid]
self.archived_exit_statuses.pop(uuid, None)
del uuid
del same_uuid_br
del same_uuid
Expand Down Expand Up @@ -582,7 +612,7 @@ def _crash_processes(self, uuids: list) -> ProcessInstanceList:
self.log.info(
f"Simulating crash of process {this_uuid} (sending SIGKILL, no cleanup)."
)
self.ssh_lifetime_manager.crash_process(this_uuid)
self.ssh_lifetime_manager.crash_process(this_uuid, signal="KILL")

ret = [
self._build_process_instance(
Expand All @@ -605,7 +635,7 @@ def _flush_impl(self, query: ProcessQuery) -> ProcessInstanceList:

Matches processes against the query, checks each for liveness via the
SSH lifetime manager, and removes any dead ones from boot_request and
archived_exit_codes. Only dead processes are affected by this command.
archived_exit_statuses. Only dead processes are affected by this command.

Args:
query: ProcessQuery specifying which processes to consider for flushing.
Expand Down Expand Up @@ -649,12 +679,16 @@ def _flush_impl(self, query: ProcessQuery) -> ProcessInstanceList:
)
continue

return_code = self.archived_exit_codes.pop(proc_uuid, None)
exit_status = self.archived_exit_statuses.pop(proc_uuid, None)

pi = self._build_process_instance(
uuid=proc_uuid,
status_code=ProcessInstance.StatusCode.DEAD,
return_code=return_code,
return_code=(
exit_status.get_reported_exit_code()
if exit_status is not None
else None
),
)

del self.boot_request[proc_uuid]
Expand All @@ -666,7 +700,7 @@ def _flush_impl(self, query: ProcessQuery) -> ProcessInstanceList:
self.log.info(
f"Flushed dead process {proc_uuid} "
f"(name: {pi.process_description.metadata.name}, "
f"exit code: {return_code})."
f"exit code: {pi.return_code})."
)
flushed.append(pi)

Expand Down
125 changes: 125 additions & 0 deletions src/drunc/processes/exit_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from enum import Enum
from typing import Optional


class ExitStatusSource(Enum):
CLIENT_MONITORING = "client_monitoring"
REMOTE_MONITORING = "remote_monitoring"
MANUAL_KILL_THROUGH_SSH_CLIENT = "manual_kill_through_ssh_client"
MANUAL_KILL_THROUGH_REMOTE_PID = "manual_kill_through_remote_pid"


class ExitStatus:
def __init__(
self,
source: ExitStatusSource,
raw_exit_code: Optional[int],
) -> None:
self._source = source
self._raw_exit_code = raw_exit_code
self._reported_exit_code, self._message_fragment = self._interpret()

def _interpret(self) -> tuple[None | int, str]:
if self._raw_exit_code is None:
return None, "exit state could not be determined"

if self._source is ExitStatusSource.CLIENT_MONITORING:
return self._interpret_client_monitoring()

if self._source is ExitStatusSource.REMOTE_MONITORING:
return self._interpret_remote_monitoring()

if self._source is ExitStatusSource.MANUAL_KILL_THROUGH_SSH_CLIENT:
return self._interpret_manual_kill_through_ssh_client()

if self._source is ExitStatusSource.MANUAL_KILL_THROUGH_REMOTE_PID:
return self._interpret_manual_kill_through_remote_pid()

return None, "Unrecognised exit status"

def _interpret_client_monitoring(self) -> tuple[None | int, str]:
if self._raw_exit_code == 255:
return (
None,
"was terminated unexpectedly with SIGQUIT on the SSH client (SIGHUP on the server)",
)

if self._raw_exit_code == -9:
return (
None,
"was terminated unexpectedly by a SIGKILL to the SSH client (SIGHUP on the server)",
)

return (
self._raw_exit_code,
f"was terminated unexpectedly with unusual SSH client exit code of {self._raw_exit_code}",
)

def _interpret_remote_monitoring(self) -> tuple[Optional[int], str]:
if self._raw_exit_code == -9:
return (
None,
"was terminated unexpectedly through the remote pid by a SIGKILL (error code -9)",
)

if self._raw_exit_code == 137:
return (
None,
"was terminated unexpectedly through the remote pid by a SIGKILL (error code 128 + 9)",
)

return (
self._raw_exit_code,
"was terminated unexpectedly through the remote pid",
)

def _interpret_manual_kill_through_ssh_client(self) -> tuple[Optional[int], str]:
if self._raw_exit_code == 255 or self._raw_exit_code == 0:
return (
None,
"was terminated by the process manager through the SSH client",
)

if self._raw_exit_code == -9:
return (
None,
"was terminated with a SIGKILL by the process manager through the SSH client",
)

return (
self._raw_exit_code,
f"was terminated by the process manager through the SSH client with an unusual exit code of: {self._raw_exit_code}",
)

def _interpret_manual_kill_through_remote_pid(self) -> tuple[Optional[int], str]:
return (
self._raw_exit_code,
"was terminated by the process manager through the remote pid",
)

def get_source(self) -> ExitStatusSource:
return self._source

def get_reported_exit_code(self) -> Optional[int]:
return self._reported_exit_code

def get_process_manager_log_message(
self,
process_name: str,
session: str,
user: str,
) -> str:
return (
f"Process '{process_name}' (session: '{session}', user: '{user}') "
f"{self._message_fragment}. "
f"Reported exit code: {self._reported_exit_code if self._reported_exit_code is not None else 'N/A (the nature of the shutdown prevented exit code collection)'}."
)

def __repr__(self) -> str:
return (
"ExitStatus("
f"source={self._source.value!r}, "
f"raw_exit_code={self._raw_exit_code!r}, "
f"reported_exit_code={self._reported_exit_code!r}"
")"
)
Loading
Loading