Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
49e9883
WIP
Apr 22, 2026
68f5f1a
Add testing for logger
emmuhamm Apr 22, 2026
211a753
More hacks
Apr 22, 2026
3574164
Merge branch 'develop' into PawelPlesniakEmuhammad/SplitShellFixing
Apr 24, 2026
d33fede
Multi user support enabled
Apr 24, 2026
6c2d752
WIP
Apr 24, 2026
7cbb511
fix db path resolver
emmuhamm May 5, 2026
b19aca7
Commenting out a bunch of debug logs
emmuhamm May 6, 2026
fc55769
Add send_random test
emmuhamm May 6, 2026
e03689b
Fix #709
emmuhamm May 6, 2026
1e9eec0
fix logic bug in query matching
emmuhamm May 8, 2026
e0f1a4e
[messy] Add working logging command
emmuhamm May 8, 2026
aaadd1a
A clean logging implementation
emmuhamm May 8, 2026
3f4e34a
Port flush
emmuhamm May 11, 2026
0dba1e2
Finish porting pm commands
emmuhamm May 11, 2026
658c828
Add wait to PM cuz why not
emmuhamm May 11, 2026
c048f47
commment cleanup
emmuhamm May 11, 2026
435d3a0
First prototype of sending message over
emmuhamm May 11, 2026
2508d99
Working peer model?
emmuhamm May 12, 2026
d138c32
Clean up grpc vibes
emmuhamm May 12, 2026
ce1d814
[untested] k8s impl
emmuhamm May 12, 2026
ce5d688
Add disconnect message
emmuhamm May 12, 2026
5e083bf
Fix #904
emmuhamm May 12, 2026
cc89b87
add connect/disconnect in pm shell and say where from
emmuhamm May 13, 2026
12e7302
resolve hostname
emmuhamm May 13, 2026
a663093
Add basic ability to send over logs
emmuhamm May 13, 2026
d020839
help fix #326
emmuhamm May 13, 2026
aa8b514
Add queries
emmuhamm May 14, 2026
4247378
Clean up send_msg logic
emmuhamm May 14, 2026
2edf27a
Major cleanup
emmuhamm May 14, 2026
5595e47
Remove final log messages
emmuhamm May 14, 2026
3a760f5
Fix pytest
emmuhamm May 14, 2026
66c8707
Merge pull request #912 from DUNE-DAQ/emmuhamm/split-shell-improvements
PawelPlesniak May 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/drunc/controller/interface/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@


class ControllerContext(ShellContext): # boilerplatefest
shell_id = "controller_shell"

def __init__(self):
self.status_receiver = None
self.took_control = False
Expand Down
27 changes: 18 additions & 9 deletions src/drunc/process_manager/interface/cli_argument.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,10 @@ def validate_conf_string(ctx, param, boot_configuration):
return boot_configuration


def add_query_options(at_least_one: bool, all_processes_by_default: bool = False):
def wrapper(f0):
f1 = click.option(
"-s",
"--session",
type=str,
default=None,
help="Select the processes on a particular session",
)(f0)
def add_query_options_no_session(
at_least_one: bool, all_processes_by_default: bool = False
):
def wrapper(f1):
f2 = click.option(
"-n",
"--name",
Expand All @@ -41,3 +36,17 @@ def wrapper(f0):
return generate_process_query(f4, at_least_one, all_processes_by_default)

return wrapper


def add_query_options(at_least_one: bool, all_processes_by_default: bool = False):
    """Build a decorator adding ``-s/--session`` plus the session-less query options.

    The returned decorator first attaches the ``-s/--session`` option to the
    command callback and then hands the result to
    ``add_query_options_no_session``, forwarding both arguments unchanged.

    Args:
        at_least_one: Forwarded as-is to ``add_query_options_no_session``.
        all_processes_by_default: Forwarded as-is to
            ``add_query_options_no_session``.

    Returns:
        A decorator to apply to a click command callback.
    """

    def wrapper(f0):
        # The session option is attached first, so it is the innermost of the
        # query-related options on the callback.
        with_session = click.option(
            "-s",
            "--session",
            type=str,
            default=None,
            help="Select the processes on a particular session",
        )
        add_remaining = add_query_options_no_session(
            at_least_one, all_processes_by_default
        )
        return add_remaining(with_session(f0))

    return wrapper
126 changes: 90 additions & 36 deletions src/drunc/process_manager/interface/commands.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import getpass
from time import sleep

import click
from druncschema.process_manager_pb2 import LogRequest, ProcessQuery
Expand All @@ -11,7 +12,7 @@
)
from drunc.process_manager.interface.context import ProcessManagerContext
from drunc.process_manager.utils import tabulate_process_instance_list
from drunc.utils.shell_utils import InterruptedCommand
from drunc.utils.shell_utils import InterruptedCommand, log_pm_cmd
from drunc.utils.utils import get_logger


Expand Down Expand Up @@ -43,6 +44,7 @@ def boot(
override_logs: bool,
) -> None:
log = get_logger("process_manager.shell")
log_pm_cmd(obj)
processes = obj.get_driver("process_manager").ps(ProcessQuery(user=user))

if len(processes.values) > 0:
Expand Down Expand Up @@ -129,6 +131,7 @@ def dummy_boot(
session_name: str,
) -> None:
log = get_logger("process_manager.shell")
log_pm_cmd(obj)
log.debug(
f"Running dummy_boot with {n_processes} processes for {sleep} seconds {n_sleeps} times, requested by user {user}"
)
Expand All @@ -150,6 +153,16 @@ def dummy_boot(
return


@click.command("wait")
# IntRange(min=0) rejects negative durations at argument-parsing time with a
# click usage error; a plain int would let time.sleep() raise ValueError.
@click.argument("sleep_time", type=click.IntRange(min=0), default=1)
@click.pass_obj
def wait(obj: ProcessManagerContext, sleep_time: int) -> None:
    """Pause the shell for SLEEP_TIME seconds (default: 1)."""
    # `obj` is supplied by click.pass_obj and is not used by this command.
    log = get_logger("process_manager.wait")
    log.info(f"Command [green]wait[/green] running for {sleep_time} seconds.")
    sleep(sleep_time)  # seconds
    log.info(f"Command [green]wait[/green] ran for {sleep_time} seconds.")


@click.command("terminate")
@click.option(
"-w",
Expand All @@ -161,6 +174,7 @@ def dummy_boot(
@click.pass_obj
def terminate(obj: ProcessManagerContext, width: int | None) -> None:
log = get_logger("process_manager.shell")
log_pm_cmd(obj)
log.debug("Terminating")
result = obj.get_driver("process_manager").terminate()
if not result:
Expand All @@ -171,23 +185,35 @@ def terminate(obj: ProcessManagerContext, width: int | None) -> None:
obj.delete_driver("controller")


def kill_decorators(f):
    """Attach the ``kill`` command's shared click decorators to ``f``.

    Applies, from innermost to outermost: ``click.pass_obj``, the
    ``-w/--width`` option, and the ``--crash`` flag.

    Args:
        f: The callback to decorate.

    Returns:
        The decorated callback.
    """
    width_option = click.option(
        "-w",
        "--width",
        type=int,
        default=None,
        help="Table width. Default is automatically calculated",
    )
    crash_option = click.option(
        "--crash",
        is_flag=True,
        default=False,
        help="Simulate a crash: send SIGKILL without any cleanup, leaving the process manager in an unexpected-death state.",
    )
    # Same application order as stacking @crash_option / @width_option /
    # @click.pass_obj above a function definition.
    return crash_option(width_option(click.pass_obj(f)))


@click.command("kill")
@click.option(
"-w",
"--width",
type=int,
default=None,
help="Table width. Default is automatically calculated",
)
@add_query_options(at_least_one=True)
@click.option(
"--crash",
is_flag=True,
default=False,
help="Simulate a crash: send SIGKILL without any cleanup, leaving the process manager in an unexpected-death state.",
)
@click.pass_obj
def kill(obj: ProcessManagerContext, query: ProcessQuery, width: int | None) -> None:
@kill_decorators
def kill(obj, query, width):
log_pm_cmd(obj)
return kill_impl(obj, query, width)


def kill_impl(
obj: ProcessManagerContext, query: ProcessQuery, width: int | None
) -> None:
log = get_logger("process_manager.shell")
log.debug(f"Killing with query {query}")
result = obj.get_driver("process_manager").kill(query)
Expand All @@ -198,17 +224,27 @@ def kill(obj: ProcessManagerContext, query: ProcessQuery, width: int | None) ->
) # rich tables require console printing


def flush_decorators(f):
    """Attach the ``flush`` command's shared click decorators to ``f``.

    Applies ``click.pass_obj`` first and then the ``-w/--width`` option.

    Args:
        f: The callback to decorate.

    Returns:
        The decorated callback.
    """
    width_option = click.option(
        "-w",
        "--width",
        type=int,
        default=None,
        help="Table width. Default is automatically calculated",
    )
    # pass_obj is the innermost wrapper; the option is layered on top of it.
    return width_option(click.pass_obj(f))


@click.command("flush")
@click.option(
"-w",
"--width",
type=int,
default=None,
help="Table width. Default is automatically calculated",
)
@add_query_options(at_least_one=False, all_processes_by_default=True)
@click.pass_obj
def flush(
@flush_decorators
def flush(obj, query, width):
log_pm_cmd(obj)
return flush_impl(obj, query, width)


def flush_impl(
obj: ProcessManagerContext,
query: ProcessQuery,
width: int | None,
Expand All @@ -223,22 +259,34 @@ def flush(
) # rich tables require console printing


def logs_decorators(f):
    """Attach the ``logs`` command's shared click decorators to ``f``.

    Applies, from innermost to outermost: ``click.pass_obj``, the ``--grep``
    option, and the ``--how-far`` option.

    Args:
        f: The callback to decorate.

    Returns:
        The decorated callback.
    """
    grep_option = click.option("--grep", type=str, default=None)
    how_far_option = click.option(
        "--how-far",
        type=int,
        show_default=True,
        default=100,
        help="How many lines one wants",
    )
    # Same application order as stacking @how_far_option / @grep_option /
    # @click.pass_obj above a function definition.
    return how_far_option(grep_option(click.pass_obj(f)))


@click.command("logs")
@add_query_options(at_least_one=True)
@click.option(
"--how-far",
type=int,
show_default=True,
default=100,
help="How many lines one wants",
)
@click.option("--grep", type=str, default=None)
@click.pass_obj
def logs(
@logs_decorators
def logs(obj, how_far, grep, query):
log_pm_cmd(obj)
return logs_impl(obj, how_far, grep, query)


def logs_impl(
obj: ProcessManagerContext, how_far: int, grep: str, query: ProcessQuery
) -> None:
log = get_logger("process_manager.shell")
log.debug(f"Running logs with query {query}")
# TODO: MOVE BACK TO DEBUG BEFORE MERGE
# THIS IS USEFUL FOR TESTING THOUGH
log.error(f"Running logs with query {query}")
log_req = LogRequest(
how_far=how_far,
query=query,
Expand Down Expand Up @@ -276,6 +324,11 @@ def logs(
@add_query_options(at_least_one=True)
@click.pass_obj
def restart(obj: ProcessManagerContext, query: ProcessQuery) -> None:
log_pm_cmd(obj)
return restart_impl(obj, query)


def restart_impl(obj: ProcessManagerContext, query: ProcessQuery) -> None:
log = get_logger("process_manager.shell")
log.debug(f"Restarting with query {query}")
obj.get_driver("process_manager").restart(query)
Expand Down Expand Up @@ -306,6 +359,7 @@ def ps(
width: int | None,
) -> None:
log = get_logger("process_manager.shell")
log_pm_cmd(obj)
log.debug(f"Running ps with query {query}")
results = obj.get_driver("process_manager").ps(query)
if not results:
Expand Down
2 changes: 2 additions & 0 deletions src/drunc/process_manager/interface/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@


class ProcessManagerContext(ShellContext): # boilerplatefest
shell_id = "process_manager_shell"

def __init__(self, *args, **kwargs):
self.status_receiver = None
super(ProcessManagerContext, self).__init__(*args, **kwargs)
Expand Down
9 changes: 9 additions & 0 deletions src/drunc/process_manager/interface/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
ps,
restart,
terminate,
wait,
)
from drunc.utils.grpc_utils import ServerUnreachable
from drunc.utils.utils import (
Expand Down Expand Up @@ -59,6 +60,10 @@ def process_manager_shell(ctx, process_manager_address: str, log_level: str) ->
# process_manager_shell_log.error(e.message) # TODO: Keep this for production branch, remove this from dev branch
exit(1)

ctx.obj.get_driver("process_manager").send_msg(
f"{getpass.getuser()} connected from {ctx.obj.shell_id}"
)

# Manually add file handler to process manager log
# Not possible to initialise logger immediately as it requires
# knowledge of the log path
Expand All @@ -75,6 +80,9 @@ def process_manager_shell(ctx, process_manager_address: str, log_level: str) ->
ctx.obj.start_listening(desc.broadcast)

def cleanup():
ctx.obj.get_driver("process_manager").send_msg(
f"{getpass.getuser()} disconnected from {ctx.obj.shell_id}"
)
ctx.obj.terminate()
process_manager_log.warning(
f"[green]{getpass.getuser()}[/green] disconnected from the process manager through a [green]drunc-process-manager-shell[/green]"
Expand All @@ -83,6 +91,7 @@ def cleanup():
ctx.call_on_close(cleanup)

ctx.command.add_command(boot, "boot")
ctx.command.add_command(wait, "wait")
ctx.command.add_command(terminate, "terminate")
ctx.command.add_command(kill, "kill")
ctx.command.add_command(flush, "flush")
Expand Down
19 changes: 18 additions & 1 deletion src/drunc/process_manager/k8s_process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

# Local Application Imports
from druncschema.broadcast_pb2 import BroadcastType
from druncschema.generic_pb2 import OutcomeFlag, OutcomeStatus
from druncschema.process_manager_pb2 import (
BootRequest,
LogLines,
Expand All @@ -39,6 +40,7 @@
from drunc.process_manager.configuration import (
PROCESS_SHUTDOWN_ORDERING,
ProcessManagerConfHandler,
ProcessManagerTypes,
)
from drunc.process_manager.process_manager import ProcessManager
from drunc.process_manager.utils import on_parent_exit, validate_k8s_session_name
Expand Down Expand Up @@ -152,6 +154,8 @@ def run(self) -> None:


class K8sProcessManager(ProcessManager):
pm_type = ProcessManagerTypes.K8s

def __init__(self, configuration: ProcessManagerConfHandler, **kwargs) -> None:
"""
Manages processes as Kubernetes Pods.
Expand Down Expand Up @@ -1910,6 +1914,17 @@ def _logs_impl(self, log_request: LogRequest) -> LogLines:
lines=[f"Could not retrieve logs: {e.reason}"],
)

def _send_msg_impl(self, msg: str, peer: str) -> OutcomeStatus:
    """Log a message received from a peer and report the outcome.

    Args:
        msg: The message text to record.
        peer: Identifier of the sender, appended to the log line.

    Returns:
        An ``OutcomeStatus`` flagged ``SUCCESS`` when the message was logged,
        or ``FAIL`` when logging it raised an exception.
    """
    # Per the original author's note: currently the exact same implementation
    # as the ssh manager, kept separate so it can diverge if necessary.
    try:
        self.log.info(f"{msg}; from {peer}")
    except Exception as e:
        self.log.critical(f"Failed to receive message with exception {e}")
        outcome_flag = OutcomeFlag.FAIL
    else:
        outcome_flag = OutcomeFlag.SUCCESS

    return OutcomeStatus(flag=outcome_flag)

def _boot_impl(self, boot_request: BootRequest) -> ProcessInstanceList:
"""
Handles the 'boot' command from the gRPC interface.
Expand Down Expand Up @@ -2516,7 +2531,9 @@ def kill_and_wait(uuids, grace_period=None) -> None:
pods_by_role[role].append(uuid)
if role == "segment-controller":
tree_id = pod.metadata.labels.get(tree_id_label_key, "")
segment_controller_depths[uuid] = tree_id.count(".") if tree_id else 0
segment_controller_depths[uuid] = (
tree_id.count(".") if tree_id else 0
)

# Kill in stages using our sorted lists
for role in PROCESS_SHUTDOWN_ORDERING:
Expand Down
9 changes: 7 additions & 2 deletions src/drunc/process_manager/oks_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from drunc.exceptions import DruncException, DruncSetupException
from drunc.process_manager.configuration import get_commandline_parameters
from drunc.utils.utils import get_logger
from drunc.utils.utils import file_is_read_only, get_logger

if TYPE_CHECKING:
import conffwk
Expand Down Expand Up @@ -75,8 +75,13 @@ def get_full_db_path(db_path: str) -> str:
err_str = f"No files found in DUNEDAQ_DB_PATH matching {db_path}."
raise DruncSetupException(err_str)

# If multiple matches are found, take the first instance that matches.
# Prefer the first writable match; if every match is read-only, fall back to the first one.
resolved_path = unique_matched_files[0]
for matched_file in unique_matched_files:
if not file_is_read_only(matched_file):
resolved_path = matched_file
break

log.debug(f"Path {db_path} resolved to {resolved_path}")
return resolved_path

Expand Down
Loading