From c544dda7ab74d762763ab4af3e6f00ac2bd365bc Mon Sep 17 00:00:00 2001 From: MRiganSUSX Date: Tue, 11 Nov 2025 17:06:17 +0100 Subject: [PATCH 1/6] moved mounts to json conf, added data location mounts --- src/drunc/data/process_manager/k8s.json | 14 +++++++- .../process_manager/k8s_process_manager.py | 34 ++++++++++++------- src/drunc/process_manager/oks_parser.py | 12 +++++++ .../process_manager/process_manager_driver.py | 10 +++++- 4 files changed, 55 insertions(+), 15 deletions(-) diff --git a/src/drunc/data/process_manager/k8s.json b/src/drunc/data/process_manager/k8s.json index 2fedaf06c..17a89b4ee 100644 --- a/src/drunc/data/process_manager/k8s.json +++ b/src/drunc/data/process_manager/k8s.json @@ -45,6 +45,18 @@ "host_cache_expiry": 300, "grpc_startup_timeout": 30, "socket_retry_timeout": 1.0 - } + }, + "volumes": [ + { + "name": "nfs", + "mount_path": "/nfs", + "host_path": "/nfs" + }, + { + "name": "cvmfs", + "mount_path": "/cvmfs", + "host_path": "/cvmfs" + } + ] } } diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index ee1feb1bb..e95848a86 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -182,6 +182,9 @@ def __init__(self, configuration, **kwargs) -> None: self.kill_timeout = pod_management.get("kill_timeout", 20) self.pod_ready_timeout = pod_management.get("pod_ready_timeout", 60) + # Volume mounts + self.volume_configs = settings.get("volumes", []) + # Cleanup cleanup = settings.get("cleanup", {}) self.restart_cleanup_time = cleanup.get("restart_cleanup_time", 10.0) @@ -562,6 +565,12 @@ def _build_pod_main_container( f"'{podname}' identified as a Python app, no preStop hook needed." ) + # Prepare mounts + container_volume_mounts = [ + client.V1VolumeMount(name=vc["name"], mount_path=vc["mount_path"]) + for vc in self.volume_configs + ] + main_container = client.V1Container( name=podname, image=pod_image, @@ -573,10 +582,7 @@ def _build_pod_main_container( ], lifecycle=lifecycle_hook, ports=container_ports, - volume_mounts=[ - client.V1VolumeMount(name="nfs", mount_path="/nfs"), - client.V1VolumeMount(name="cvmfs", mount_path="/cvmfs"), - ], + volume_mounts=container_volume_mounts, working_dir=boot_request.process_description.process_execution_directory, security_context=client.V1SecurityContext( run_as_user=os.getuid(), run_as_group=os.getgid() @@ -646,6 +652,16 @@ def _build_pod_manifest( host_aliases: list[client.V1HostAlias] | None, ) -> client.V1Pod: """Assembles the final V1Pod object.""" + + # Prepare mounts + pod_volumes = [ + client.V1Volume( + name=vc["name"], + host_path=client.V1HostPathVolumeSource(path=vc["host_path"]), + ) + for vc in self.volume_configs + ] + return client.V1Pod( api_version="v1", kind="Pod", @@ -663,15 +679,7 @@ def _build_pod_manifest( restart_policy="Never", containers=[main_container], host_aliases=host_aliases if host_aliases else None, - volumes=[ - client.V1Volume( - name="nfs", host_path=client.V1HostPathVolumeSource(path="/nfs") - ), - client.V1Volume( - name="cvmfs", - host_path=client.V1HostPathVolumeSource(path="/cvmfs"), - ), - ], + volumes=pod_volumes, ), ) diff --git a/src/drunc/process_manager/oks_parser.py b/src/drunc/process_manager/oks_parser.py index 302d9b982..68b101d03 100644 --- a/src/drunc/process_manager/oks_parser.py +++ b/src/drunc/process_manager/oks_parser.py @@ -151,6 +151,17 @@ def collect_apps( ) log.debug(f"Collecting app {app.id} with args {args}") + data_path = None + if "DFApplication" in app.oksTypes(): + try: + # DFApplication -> data_writers -> data_store_params -> directory_path + data_path = app.data_writers[0].data_store_params.directory_path + except (AttributeError, IndexError): + log.debug( + f"DFApplication {app.id} is missing its data path configuration." + ) + pass + apps.append( { "name": app.id, @@ -161,6 +172,7 @@ def collect_apps( "env": app_env, "tree_id": app_tree_id_str, "log_path": app.log_path, + "data_path": data_path, } ) app_index += 1 diff --git a/src/drunc/process_manager/process_manager_driver.py b/src/drunc/process_manager/process_manager_driver.py index 4feadbc30..b295df832 100644 --- a/src/drunc/process_manager/process_manager_driver.py +++ b/src/drunc/process_manager/process_manager_driver.py @@ -237,6 +237,7 @@ def _build_boot_request( args = app["args"] env = app["env"] app_log_path = app["log_path"] + data_path = app.get("data_path") env["DUNE_DAQ_BASE_RELEASE"] = os.getenv("DUNE_DAQ_BASE_RELEASE") env["SPACK_RELEASES_DIR"] = os.getenv("SPACK_RELEASES_DIR") tree_id = app["tree_id"] @@ -261,6 +262,13 @@ def _build_boot_request( if host_is_local(host) and not os.path.exists(os.path.dirname(log_path)): raise DruncShellException(f"Log path {log_path} does not exist.") + process_restriction = ProcessRestriction(allowed_hosts=[host]) + if data_path: + self.log.debug( + f"Attaching data_path '{data_path}' to the boot request for '{name}'" + ) + process_restriction.data_mount = data_path + self.log.debug(f"{name}'s env:\n{env}") breq = BootRequest( token=copy_token(self.token), @@ -277,7 +285,7 @@ def _build_boot_request( process_execution_directory=pwd, process_logs_path=log_path, ), - process_restriction=ProcessRestriction(allowed_hosts=[host]), + process_restriction=process_restriction, ) self.log.debug(f"{breq=}\n\n") return breq From 79376873f1affbf35c7143ea42a3a3546ef436d1 Mon Sep 17 00:00:00 2001 From: MRiganSUSX Date: Tue, 11 Nov 2025 19:52:08 +0100 Subject: [PATCH 2/6] mounts --- src/drunc/data/process_manager/k8s.json | 9 ++++++--- src/drunc/process_manager/k8s_process_manager.py | 13 ++++++++++--- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/src/drunc/data/process_manager/k8s.json b/src/drunc/data/process_manager/k8s.json index 17a89b4ee..62a66a322 100644 --- a/src/drunc/data/process_manager/k8s.json +++ b/src/drunc/data/process_manager/k8s.json @@ -32,7 +32,8 @@ "name": "local-connection-server" }, "pod_management": { - "kill_timeout": 20, + "kill_timeout": 60, + "total_shutdown_timeout": 120, "pod_ready_timeout": 60 }, "cleanup": { @@ -50,12 +51,14 @@ { "name": "nfs", "mount_path": "/nfs", - "host_path": "/nfs" + "host_path": "/nfs", + "read_only": false }, { "name": "cvmfs", "mount_path": "/cvmfs", - "host_path": "/cvmfs" + "host_path": "/cvmfs", + "read_only": true } ] } diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index e95848a86..545c46caa 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -179,8 +179,9 @@ def __init__(self, configuration, **kwargs) -> None: # Pod management pod_management = settings.get("pod_management", {}) - self.kill_timeout = pod_management.get("kill_timeout", 20) + self.kill_timeout = pod_management.get("kill_timeout", 30) self.pod_ready_timeout = pod_management.get("pod_ready_timeout", 60) + self.total_shutdown_timeout = pod_management.get("total_shutdown_timeout", 60) # Volume mounts self.volume_configs = settings.get("volumes", []) @@ -567,7 +568,11 @@ def _build_pod_main_container( # Prepare mounts container_volume_mounts = [ - client.V1VolumeMount(name=vc["name"], mount_path=vc["mount_path"]) + client.V1VolumeMount( + name=vc["name"], + mount_path=vc["mount_path"], + read_only=vc.get("read_only", True), + ) for vc in self.volume_configs ] @@ -657,7 +662,9 @@ def _build_pod_manifest( pod_volumes = [ client.V1Volume( name=vc["name"], - host_path=client.V1HostPathVolumeSource(path=vc["host_path"]), + host_path=client.V1HostPathVolumeSource( + path=vc["host_path"], type="Directory" + ), ) for vc in self.volume_configs ] From 61c4bdac6e71a195627bf05e2a24016205f53c57 Mon Sep 17 00:00:00 2001 From: MRiganSUSX Date: Wed, 12 Nov 2025 12:44:00 +0100 Subject: [PATCH 3/6] rework of mounting procedure --- .../process_manager/k8s_process_manager.py | 175 ++++++++++++++---- 1 file changed, 134 insertions(+), 41 deletions(-) diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index 545c46caa..3fa7d2b71 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -516,8 +516,109 @@ def _create_nodeport_service(self, podname, session, pod_uid) -> None: self.log.error(error_message) raise DruncK8sException(error_message) from e + def _get_pod_volumes_and_mounts( + self, boot_request: BootRequest + ) -> tuple[list[client.V1Volume], list[client.V1VolumeMount]]: + """ + Prepares all pod volumes and container mounts, including static + configs and the dynamic data_mount. + """ + pod_volumes = [] + container_volume_mounts = [] + + # Volumes from json configuration + for vc in self.volume_configs: + pod_volumes.append( + client.V1Volume( + name=vc["name"], + host_path=client.V1HostPathVolumeSource( + path=vc["host_path"], type="Directory" + ), + ) + ) + container_volume_mounts.append( + client.V1VolumeMount( + name=vc["name"], + mount_path=vc["mount_path"], + read_only=vc.get("read_only", True), + ) + ) + + # Add log_mount from process_logs_path + log_dir = None + log_file_path = boot_request.process_description.process_logs_path + if log_file_path: + log_dir = os.path.dirname(log_file_path) + self.log.info(f"Adding 'log-mount' for directory: '{log_dir}'") + + pod_volumes.append( + client.V1Volume( + name="log-mount", + host_path=client.V1HostPathVolumeSource( + path=log_dir, + type="DirectoryOrCreate", + ), + ) + ) + container_volume_mounts.append( + client.V1VolumeMount( + name="log-mount", + mount_path=log_dir, + read_only=False, + ) + ) + + # Add dynamic data_mount if present in the boot request + data_mount_path = None + if boot_request.process_restriction.data_mount: + self.log.info( + f"Found data_mount request: '{boot_request.process_restriction.data_mount}'" + ) + if boot_request.process_restriction.data_mount == ".": + data_mount_path = ( + boot_request.process_description.process_execution_directory + ) + self.log.info( + f"Resolving '.' data_mount to process_execution_directory: '{data_mount_path}'" + ) + else: + data_mount_path = boot_request.process_restriction.data_mount + self.log.info(f"Using provided data_mount path: '{data_mount_path}'") + + if data_mount_path: + if data_mount_path == log_dir: + self.log.info( + f"Skipping 'data-mount' as its path '{data_mount_path}' is already covered by 'log-mount'." + ) + else: + self.log.info( + f"Adding 'data-mount' for directory: '{data_mount_path}'" + ) + pod_volumes.append( + client.V1Volume( + name="data-mount", + host_path=client.V1HostPathVolumeSource( + path=data_mount_path, + type="Directory", + ), + ) + ) + container_volume_mounts.append( + client.V1VolumeMount( + name="data-mount", + mount_path=data_mount_path, + read_only=False, + ) + ) + + return pod_volumes, container_volume_mounts + def _build_pod_main_container( - self, podname: str, boot_request: BootRequest, lcs_port: int | None + self, + podname: str, + boot_request: BootRequest, + lcs_port: int | None, + container_volume_mounts: list[client.V1VolumeMount], ) -> client.V1Container: """Builds the primary V1Container manifest, including command and preStop hook.""" @@ -539,7 +640,7 @@ def _build_pod_main_container( prefix = "exec " command_parts.append(prefix + " ".join([e_and_a.exec] + list(e_and_a.args))) - main_command_str = " && ".join(command_parts) + main_command_chain = " && ".join(command_parts) container_ports = [] if podname == self.connection_server_name and lcs_port is not None: @@ -547,40 +648,32 @@ def _build_pod_main_container( client.V1ContainerPort(container_port=lcs_port, name="http-port") ) - # Only add preStop hook for C++ applications (non-controllers) - lifecycle_hook = None - if "controller" not in podname and podname != self.connection_server_name: - self.log.debug( - f"'{podname}' identified as a C++ app, adding preStop hook with SIGQUIT." - ) - shutdown_command = "kill -QUIT 1" - lifecycle_hook = client.V1Lifecycle( - pre_stop=client.V1LifecycleHandler( - _exec=client.V1ExecAction( - command=["/bin/sh", "-c", shutdown_command] - ) - ) - ) - else: - self.log.debug( - f"'{podname}' identified as a Python app, no preStop hook needed." + # Add preStop hook + self.log.debug( + f"'{podname}' identified as a C++ app, adding preStop hook with SIGQUIT." + ) + shutdown_command = "kill -QUIT 1" + lifecycle_hook = client.V1Lifecycle( + pre_stop=client.V1LifecycleHandler( + _exec=client.V1ExecAction(command=["/bin/sh", "-c", shutdown_command]) ) + ) - # Prepare mounts - container_volume_mounts = [ - client.V1VolumeMount( - name=vc["name"], - mount_path=vc["mount_path"], - read_only=vc.get("read_only", True), - ) - for vc in self.volume_configs - ] + # Redirect logs + log_file_path = boot_request.process_description.process_logs_path + final_command_args: str + + if log_file_path: + self.log.info(f"Redirecting pod stdout/stderr to '{log_file_path}'") + final_command_args = f"exec > {log_file_path} 2>&1; {main_command_chain}" + else: + final_command_args = main_command_chain main_container = client.V1Container( name=podname, image=pod_image, command=["/bin/sh", "-c"], - args=[main_command_str], + args=[final_command_args], env=[ client.V1EnvVar(name=k, value=v) for k, v in boot_request.process_description.env.items() @@ -655,20 +748,10 @@ def _build_pod_manifest( main_container: client.V1Container, node_selector: dict, host_aliases: list[client.V1HostAlias] | None, + pod_volumes: list[client.V1Volume], ) -> client.V1Pod: """Assembles the final V1Pod object.""" - # Prepare mounts - pod_volumes = [ - client.V1Volume( - name=vc["name"], - host_path=client.V1HostPathVolumeSource( - path=vc["host_path"], type="Directory" - ), - ) - for vc in self.volume_configs - ] - return client.V1Pod( api_version="v1", kind="Pod", @@ -775,9 +858,18 @@ def _create_pod(self, podname, session, boot_request: BootRequest) -> None: f"Could not extract port for LCS '{podname}'." ) + # Prepare volume mounts + ( + pod_volumes, + container_volume_mounts, + ) = self._get_pod_volumes_and_mounts(boot_request) + # Build the main container manifest main_container = self._build_pod_main_container( - podname, boot_request, lcs_port + podname, + boot_request, + lcs_port, + container_volume_mounts, ) # Node_selector, host_aliases, pod_manifest @@ -791,6 +883,7 @@ def _create_pod(self, podname, session, boot_request: BootRequest) -> None: main_container, node_selector, host_aliases, + pod_volumes, ) # Execute the pod creation API call From e82f715097c3abf6c5a8e678b5050436ceb781da Mon Sep 17 00:00:00 2001 From: MRiganSUSX Date: Wed, 12 Nov 2025 14:29:16 +0100 Subject: [PATCH 4/6] finish on mount, changed prehook, reworked kill process --- src/drunc/data/process_manager/k8s.json | 4 +- .../process_manager/k8s_process_manager.py | 168 ++++++++++++++---- 2 files changed, 139 insertions(+), 33 deletions(-) diff --git a/src/drunc/data/process_manager/k8s.json b/src/drunc/data/process_manager/k8s.json index 62a66a322..1718ffe16 100644 --- a/src/drunc/data/process_manager/k8s.json +++ b/src/drunc/data/process_manager/k8s.json @@ -32,8 +32,8 @@ "name": "local-connection-server" }, "pod_management": { - "kill_timeout": 60, - "total_shutdown_timeout": 120, + "kill_timeout": 30, + "total_shutdown_timeout": 60, "pod_ready_timeout": 60 }, "cleanup": { diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index 3fa7d2b71..14721e539 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -613,6 +613,35 @@ def _get_pod_volumes_and_mounts( return pod_volumes, container_volume_mounts + def _get_tree_labels(self, tree_id: str, podname: str) -> dict[str, str]: + """ + Determines the role of a pod based on its tree_id, + and returns a dictionary of labels to be applied. + """ + role = "unknown" + + labels = {f"tree-id.{self.drunc_label}": tree_id} + + if not tree_id: + role = "unknown" + elif tree_id == "0": + role = "root-controller" + elif tree_id == "1": + role = "local-connection-server" + else: + # Count the depth + depth = tree_id.count(".") + if depth == 1: + role = "segment-controller" + elif depth == 2: + role = "application" + + labels[f"role.{self.drunc_label}"] = role + self.log.info( + f"Assigning labels for '{podname}': role={role}, tree-id={tree_id}" + ) + return labels + def _build_pod_main_container( self, podname: str, @@ -630,12 +659,11 @@ def _build_pod_main_container( for i, e_and_a in enumerate(exec_and_args_list): is_last_command = i == len(exec_and_args_list) - 1 prefix = "" - # Only add 'exec' to the C++ apps (non-controllers) + if ( - "controller" not in podname - and podname != self.connection_server_name - and is_last_command + is_last_command and e_and_a.exec != "source" + and podname != self.connection_server_name ): prefix = "exec " @@ -648,16 +676,24 @@ def _build_pod_main_container( client.V1ContainerPort(container_port=lcs_port, name="http-port") ) - # Add preStop hook - self.log.debug( - f"'{podname}' identified as a C++ app, adding preStop hook with SIGQUIT." - ) - shutdown_command = "kill -QUIT 1" - lifecycle_hook = client.V1Lifecycle( - pre_stop=client.V1LifecycleHandler( - _exec=client.V1ExecAction(command=["/bin/sh", "-c", shutdown_command]) + # Only add preStop hook for C++ applications (non-controllers) + lifecycle_hook = None + if "controller" not in podname and podname != self.connection_server_name: + self.log.debug( + f"'{podname}' identified as a C++ app, adding preStop hook with SIGQUIT." + ) + shutdown_command = "kill -QUIT 1" + lifecycle_hook = client.V1Lifecycle( + pre_stop=client.V1LifecycleHandler( + _exec=client.V1ExecAction( + command=["/bin/sh", "-c", shutdown_command] + ) + ) + ) + else: + self.log.debug( + f"'{podname}' identified as a Python app, no preStop hook needed." ) - ) # Redirect logs log_file_path = boot_request.process_description.process_logs_path @@ -665,9 +701,21 @@ def _build_pod_main_container( if log_file_path: self.log.info(f"Redirecting pod stdout/stderr to '{log_file_path}'") - final_command_args = f"exec > {log_file_path} 2>&1; {main_command_chain}" + log_redirect_cmd = f"exec > {log_file_path} 2>&1;" + else: + log_redirect_cmd = "" + + if podname == self.connection_server_name: + # LCS (gunicorn) needs a shell trap to handle SIGTERM grace + final_command_args = ( + f"{log_redirect_cmd} " + f"trap 'kill -KILL $child; wait $child; exit 0' TERM QUIT; " + f"{main_command_chain} & " + f"child=$!; " + f"wait $child" + ) else: - final_command_args = main_command_chain + final_command_args = f"{log_redirect_cmd} {main_command_chain}" main_container = client.V1Container( name=podname, @@ -749,19 +797,25 @@ def _build_pod_manifest( node_selector: dict, host_aliases: list[client.V1HostAlias] | None, pod_volumes: list[client.V1Volume], + extra_labels: dict[str, str] | None = None, ) -> client.V1Pod: """Assembles the final V1Pod object.""" + # Get pod labels + pod_labels = { + "app": podname, + f"creator.{self.drunc_label}": self.__class__.__name__, + } + if extra_labels: + pod_labels.update(extra_labels) + return client.V1Pod( api_version="v1", kind="Pod", metadata=self._meta_v1_api( name=podname, namespace=session, - labels={ - "app": podname, - f"creator.{self.drunc_label}": self.__class__.__name__, - }, + labels=pod_labels, ), spec=self._pod_spec_v1_api( node_selector=node_selector, @@ -846,6 +900,7 @@ def _create_pod(self, podname, session, boot_request: BootRequest) -> None: """Constructs and creates a Kubernetes Pod manifest and its associated service.""" try: lcs_port = None + tree_id = boot_request.process_description.metadata.tree_id # Early Port Extraction and Class Variable Setup for LCS if podname == self.connection_server_name: @@ -858,6 +913,9 @@ def _create_pod(self, podname, session, boot_request: BootRequest) -> None: f"Could not extract port for LCS '{podname}'." ) + # Get correct label using tree_id + tree_labels = self._get_tree_labels(tree_id, podname) + # Prepare volume mounts ( pod_volumes, @@ -884,6 +942,7 @@ def _create_pod(self, podname, session, boot_request: BootRequest) -> None: node_selector, host_aliases, pod_volumes, + extra_labels=tree_labels, ) # Execute the pod creation API call @@ -1423,20 +1482,30 @@ def _kill_pod(self, podname, session, grace_period_seconds=None) -> None: ) def _kill_impl(self, query: ProcessQuery) -> ProcessInstanceList: - """Handles the 'kill' command.""" - uuids_to_kill = self._get_process_uid(query, order_by="leaf_first") - if not uuids_to_kill: - return ProcessInstanceList(values=[]) + """ + Handles the 'kill' command with staged, role-based shutdown + by querying pod labels. + """ - self.log.info(f"Starting termination of {len(uuids_to_kill)} pods...") + # Get all UUIDs + targeted_uuids = set(self._get_process_uid(query)) + if not targeted_uuids: + return ProcessInstanceList(values=[]) - apps = [] - for uuid_str in uuids_to_kill: - if uuid_str not in self.boot_request: - continue + self.log.info( + f"Starting staged termination for {len(targeted_uuids)} pod(s)..." + ) - apps.append(uuid_str) + # Define the shutdown order + shutdown_order = [ + "unknown", + "application", + "segment-controller", + "root-controller", + "local-connection-server", + ] + # Define the blocking kill_and_wait helper def kill_and_wait(uuids, grace_period=None) -> None: if not uuids: return @@ -1471,10 +1540,47 @@ def kill_and_wait(uuids, grace_period=None) -> None: self.uuids_pending_deletion.clear() - kill_and_wait(apps) + # Execute staged shutdown + all_pods = [] + try: + pod_list = self._core_v1_api.list_pod_for_all_namespaces( + label_selector=self._get_creator_label_selector() + ) + all_pods = pod_list.items + except self._api_error_v1_api as e: + self.log.error(f"Could not list pods for kill operation: {e}") + + # Map pods by their role label + pods_by_role = { + "unknown": [], + "application": [], + "segment-controller": [], + "root-controller": [], + "local-connection-server": [], + } + + uuid_label_key = f"uuid.{self.drunc_label}" + role_label_key = f"role.{self.drunc_label}" + + for pod in all_pods: + uuid = pod.metadata.labels.get(uuid_label_key) + if uuid and uuid in targeted_uuids: + role = pod.metadata.labels.get(role_label_key, "unknown") + pods_by_role[role].append(uuid) + + # Kill in stages using our sorted lists + for role in shutdown_order: + uuids_in_step = pods_by_role[role] + if uuids_in_step: + self.log.info( + f"--- Termination Step: Shutting down role '{role}' ({len(uuids_in_step)} pod(s)) ---" + ) + kill_and_wait(uuids_in_step) # This call is blocking + self.log.info(f"--- Termination Step: Role '{role}' complete ---") + # Finalize and clean up final_ret = [] - for proc_uuid in uuids_to_kill: + for proc_uuid in targeted_uuids: if proc_uuid in self.boot_request: pi = ProcessInstance( process_description=self.boot_request[ From 747c44e332cab9bf87481a29ae03ee2f902400a7 Mon Sep 17 00:00:00 2001 From: MRiganSUSX Date: Wed, 12 Nov 2025 15:30:37 +0100 Subject: [PATCH 5/6] better handling for no k8s config file --- .../process_manager/k8s_process_manager.py | 12 ++++++++- src/drunc/unified_shell/shell.py | 25 ++++++++++++++++--- 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index 14721e539..bec6f1dac 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -27,6 +27,7 @@ # Third-Party Imports from kubernetes import client, config, watch from kubernetes.client.rest import ApiException +from kubernetes.config.config_exception import ConfigException from drunc.k8s_exceptions import ( DruncK8sException, @@ -141,7 +142,16 @@ def __init__(self, configuration, **kwargs) -> None: super().__init__(configuration=configuration, session=self.session, **kwargs) self.log = get_logger("process_manager.k8s-process-manager") - config.load_kube_config() + try: + config.load_kube_config() + except ConfigException as e: + self.log.critical("--- 🚨 KUBERNETES CONFIGURATION ERROR ---") + self.log.critical(f"Failed to load kube-config: {e}") + self.log.critical( + "Please ensure 'kubectl' is configured correctly or the KUBECONFIG environment variable is set." + ) + self.log.critical("----------------------------------------------") + raise self._k8s_client = client self._core_v1_api = client.CoreV1Api() diff --git a/src/drunc/unified_shell/shell.py b/src/drunc/unified_shell/shell.py index 738c7c8b6..9e4128acf 100644 --- a/src/drunc/unified_shell/shell.py +++ b/src/drunc/unified_shell/shell.py @@ -225,12 +225,31 @@ def unified_shell( unified_shell_log.debug("[green]Process manager[/green] started") # Check if the process manager started correctly - for _ in range(100): + process_started = False + for _ in range(100): # 10s timeout if ready_event.is_set(): + process_started = True break + + if not ctx.obj.pm_process.is_alive(): + exit_code = ctx.obj.pm_process.exitcode + unified_shell_log.error( + f"[red]Process manager process died unexpectedly with exit code {exit_code}." + ) + unified_shell_log.error( + "[red]This is likely a configuration error (e.g., bad kube-config)." + ) + unified_shell_log.error( + "[red]Please check the full traceback in the terminal above this message.[/red]" + ) + sys.exit(exit_code if exit_code else 1) sleep(0.1) - if not ready_event.is_set(): - raise DruncSetupException("[red]Process manager didn't start in time[/red]") + + if not process_started: + # This message will only show if the process is *alive* but never sent the "ready" signal + raise DruncSetupException( + "[red]Process manager timed out starting. Check logs for details.[/red]" + ) # Setup the process manager address process_manager_address = resolve_localhost_and_127_ip_to_network_ip( From dbae8b0b9f942af3dc0efc0dd62849b544a63835 Mon Sep 17 00:00:00 2001 From: MRiganSUSX Date: Wed, 12 Nov 2025 15:53:24 +0100 Subject: [PATCH 6/6] updating k8s jsons --- src/drunc/data/process_manager/k8s-CERN.json | 38 ++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/drunc/data/process_manager/k8s-CERN.json b/src/drunc/data/process_manager/k8s-CERN.json index 7b9806005..cf01c415a 100644 --- a/src/drunc/data/process_manager/k8s-CERN.json +++ b/src/drunc/data/process_manager/k8s-CERN.json @@ -23,5 +23,43 @@ "opmon_conf":{ "level": "info", "interval_s" : 10.0 + }, + "settings": { + "labels": { + "drunc_label": "drunc.daq" + }, + "connection_server": { + "name": "local-connection-server" + }, + "pod_management": { + "kill_timeout": 30, + "total_shutdown_timeout": 60, + "pod_ready_timeout": 60 + }, + "cleanup": { + "restart_cleanup_time": 10, + "restart_cleanup_polling": 0.5 + }, + "checking": { + "watcher_retry_sleep": 5, + "pod_status_check_sleep": 1, + "host_cache_expiry": 300, + "grpc_startup_timeout": 30, + "socket_retry_timeout": 1.0 + }, + "volumes": [ + { + "name": "nfs", + "mount_path": "/nfs", + "host_path": "/nfs", + "read_only": false + }, + { + "name": "cvmfs", + "mount_path": "/cvmfs", + "host_path": "/cvmfs", + "read_only": true + } + ] } }