From 6ef119fa70f6d1867ee005ea406f74fad5a5c999 Mon Sep 17 00:00:00 2001 From: MRiganSUSX Date: Tue, 28 Oct 2025 14:22:48 +0100 Subject: [PATCH 01/14] fixes after k8s update --- .../process_manager/k8s_process_manager.py | 170 +++++++++++++++--- .../process_manager/process_manager_driver.py | 133 ++++++++++++-- 2 files changed, 261 insertions(+), 42 deletions(-) diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index 584796beb..028105261 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -4,6 +4,8 @@ import re import signal import threading +import urllib.error +import urllib.request import uuid from time import sleep, time @@ -461,6 +463,7 @@ def _create_nodeport_service(self, podname, session, pod_uid) -> None: ), spec=client.V1ServiceSpec( type="NodePort", + external_traffic_policy="Local", selector={"app": podname}, ports=[ client.V1ServicePort( @@ -655,6 +658,22 @@ def _create_pod(self, podname, session, boot_request: BootRequest) -> None: if podname == self.connection_server_name: self._create_nodeport_service(podname, session, pod_uid) + elif "root-controller" in podname: + self.log.info( + f"'{podname}' is the root controller, creating isolated NodePort service." + ) + port = self._extract_port_from_cmd(boot_request) + if port: + self.log.info(f"Extracted port {port} for '{podname}' NodePort.") + self.connection_server_port = port + self.connection_server_node_port = port + self._create_nodeport_service(podname, session, pod_uid) + else: + self.log.warning( + f"Could not extract port for '{podname}', falling back to headless." + ) + self._create_headless_service(podname, session, pod_uid) + else: self._create_headless_service(podname, session, pod_uid) @@ -682,19 +701,57 @@ def _get_connection_server_cluster_ip(self, session) -> str: return None def _extract_port_from_cmd(self, boot_request) -> int | None: - # Find the gunicorn port argument from exec_and_args_list + """ + Parses the boot request's command arguments to find a port. + """ + # Check all command parts for a port argument for e_and_a in boot_request.process_description.executable_and_arguments: - if "gunicorn" in e_and_a.exec or ( - "gunicorn" in " ".join(list(e_and_a.args)) - ): - all_args = [e_and_a.exec] + list(e_and_a.args) - arg_str = " ".join(all_args) + all_args = [e_and_a.exec] + list(e_and_a.args) + arg_str = " ".join(all_args) + + # 1. Check for gunicorn bind syntax (for local-connection-server) + # e.g., gunicorn --bind=0.0.0.0:30005 + if "gunicorn" in arg_str: match = re.search(r"-b\s+[\w\.]+:(\d+)", arg_str) if not match: - # Try to match '--bind' match = re.search(r"--bind[\s=]+[\w\.]+:(\d+)", arg_str) if match: - return int(match.group(1)) + port = int(match.group(1)) + if port != 0: + self.log.info(f"Extracted gunicorn port {port} from command.") + return port + + # 2. Check for drunc-controller --port syntax + # e.g., drunc-controller --port 12345 + if "controller" in arg_str: + # Matches --port 12345 or --port=12345 + match = re.search(r"--port[\s=]+(\d+)", arg_str) + if match: + port = int(match.group(1)) + if port != 0: + self.log.info( + f"Extracted drunc-controller port {port} from command." + ) + return port + + # 3. NEW: Check for drunc-controller -c grpc://... syntax + # e.g., drunc-controller -c grpc://localhost:12345 or -c grpc://localhost:0 + if "controller" in arg_str: + # Matches -c grpc://...:12345 or -c "grpc://...:12345" + match = re.search(r"-c\s+[\"\']?grpc:\/\/[^:]+:(\d+)[\"\']?", arg_str) + if match: + port = int(match.group(1)) + if port != 0: + self.log.info( + f"Extracted drunc-controller gRPC port {port} from command." + ) + return port + else: + self.log.warning( + "Controller gRPC port is 0, cannot create NodePort." + ) + + # If no non-zero port is found, return None return None def _get_process_uid(self, query: ProcessQuery, order_by: str = None) -> list[str]: @@ -765,6 +822,7 @@ def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance: - For the connection server: Wait for it to be ready and check the NodePort service - For all other pods: Boot is NON-BLOCKING. """ + print(boot_request) session = boot_request.process_description.metadata.session podname = boot_request.process_description.metadata.name @@ -842,10 +900,15 @@ def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance: # Special handling only for the connection server if podname == self.connection_server_name: - self.log.info(f"Waiting for '{podname}' to become ready...") - + node_name = None + pod_ready = False start_time = time() - while time() - start_time < self.pod_ready_timeout: + + # --- STAGE 1: Wait for Pod to be Running/Ready in K8s API --- + self.log.info( + f"Stage 1: Waiting for '{podname}' pod to be Running and Ready..." + ) + while not pod_ready and (time() - start_time < self.pod_ready_timeout): try: pod_status = self._core_v1_api.read_namespaced_pod_status( podname, session @@ -854,33 +917,82 @@ def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance: pod_status.status.phase == "Running" and pod_status.status.pod_ip ): - self.log.info( - f"'{podname}' is ready with IP {pod_status.status.pod_ip}." - ) - self.local_connection_server_is_booted = True - - # Log connection information using the NodePort service - node_name = pod_status.spec.node_name - self.log.info(f"Connection server '{podname}' is ready.") - self.log.info( - f" -> For internal cluster access: 'http://localhost:{self.connection_server_port}'" - ) - self.log.info( - f" -> For external access, use NodePort {self.connection_server_node_port} on any cluster node IP (e.g., http://{node_name}:{self.connection_server_node_port})" - ) + # Check readiness condition + is_ready = False + if pod_status.status.conditions: + for condition in pod_status.status.conditions: + if ( + condition.type == "Ready" + and condition.status == "True" + ): + is_ready = True + break + + if is_ready: + self.log.info( + f"Stage 1: Pod '{podname}' is API Ready with IP {pod_status.status.pod_ip}." + ) + node_name = pod_status.spec.node_name + pod_ready = True # Exit this loop and go to Stage 2 - break except self._api_error_v1_api as e: if e.status == 404: - pass + pass # Pod not yet created/visible, keep polling else: raise e sleep(self.pod_status_check_sleep) - else: + + if not pod_ready: raise DruncK8sException( - f"'{podname}' did not become ready in {self.pod_ready_timeout} seconds." + f"'{podname}' pod did not become API Ready in {self.pod_ready_timeout} seconds." ) + # --- STAGE 2: Wait for NodePort to be externally reachable --- + self.log.info( + f"Stage 2: Waiting for NodePort {node_name}:{self.connection_server_node_port} to be reachable..." + ) + nodeport_ready = False + url = f"http://{node_name}:{self.connection_server_node_port}" + + # Use the *remaining* time for this check + remaining_time = self.pod_ready_timeout - (time() - start_time) + nodeport_start_time = time() + + while not nodeport_ready and ( + time() - nodeport_start_time < remaining_time + ): + try: + # We don't care about the response, just that it doesn't error + # Timeout set to 1s for a quick check + urllib.request.urlopen(url, timeout=1) + nodeport_ready = True + self.log.info(f"Stage 2: NodePort {url} is now active.") + except ( + urllib.error.URLError, + ConnectionRefusedError, + TimeoutError, + OSError, + ) as e: + # Keep polling until timeout + self.log.debug(f"NodePort not ready yet ({e}), retrying...") + sleep(self.pod_status_check_sleep) + + if not nodeport_ready: + raise DruncK8sException( + f"NodePort {url} did not become reachable in {self.pod_ready_timeout} seconds." + ) + + # --- All clear --- + self.local_connection_server_is_booted = True + + self.log.info(f"Connection server '{podname}' is fully ready.") + self.log.info( + f" -> For internal cluster access: 'http://localhost:{self.connection_server_port}'" + ) + self.log.info( + f" -> For external access, use NodePort {self.connection_server_node_port} on any cluster node IP (e.g., http://{node_name}:{self.connection_server_node_port})" + ) + pd, pr, pu = ProcessDescription(), ProcessRestriction(), ProcessUUID(uuid=uuid) pd.CopyFrom(self.boot_request[uuid].process_description) pr.CopyFrom(self.boot_request[uuid].process_restriction) diff --git a/src/drunc/process_manager/process_manager_driver.py b/src/drunc/process_manager/process_manager_driver.py index 489f2b191..27075aaff 100644 --- a/src/drunc/process_manager/process_manager_driver.py +++ b/src/drunc/process_manager/process_manager_driver.py @@ -25,8 +25,6 @@ from druncschema.token_pb2 import Token from drunc.connectivity_service.client import ConnectivityServiceClient -from drunc.connectivity_service.exceptions import ApplicationLookupUnsuccessful -from drunc.controller.utils import get_segment_lookup_timeout from drunc.exceptions import DruncSetupException, DruncShellException from drunc.process_manager.utils import get_log_path, get_rte_script from drunc.utils.grpc_utils import ( @@ -35,7 +33,6 @@ handle_grpc_error, ) from drunc.utils.utils import ( - get_control_type_and_uri_from_connectivity_service, get_logger, host_is_local, resolve_localhost_and_127_ip_to_network_ip, @@ -50,7 +47,8 @@ def __init__(self, address: str, token: Token): self.log = get_logger("controller.ProcessManagerDriver") self.address = address options = [ - ("grpc.keepalive_time_ms", 60000) # pings the server every 60 seconds + ("grpc.keepalive_time_ms", 60000), # pings the server every 60 seconds + ("grpc.enable_retries", 1), ] self.channel = grpc.insecure_channel(self.address, options=options) self.stub = ProcessManagerStub(self.channel) @@ -350,6 +348,14 @@ def _connect_to_service( if session_dal.connectivity_service: connection_server = session_dal.connectivity_service.host connection_port = session_dal.connectivity_service.service.port + + if connection_server == "localhost": + resolved_server = resolve_localhost_to_hostname(connection_server) + self.log.debug( + f"Resolved connection server 'localhost' to '{resolved_server}' to avoid K8s hairpinning." + ) + connection_server = resolved_server + client = ConnectivityServiceClient( session_name, f"{connection_server}:{connection_port}" ) @@ -374,6 +380,8 @@ def get_controller_address(session_dal, session_name): env = {} collect_variables(session_dal.environment, env) + + """ if csc: try: timeout = ( @@ -400,23 +408,122 @@ def get_controller_address(session_dal, session_name): return return uri.replace("grpc://", "") + """ - service_id = top_controller_name + "_control" port_number = None protocol = None + service_found = None + top_controller_name = None # Initialize here + + print("ARE WE HERE NOW") + + try: + top_controller_name = session_dal.segment.controller.id + self.log.info( + f"Top controller name from OKS config: '{top_controller_name}'" + ) + + # Check if exposes_service relationship exists and is populated + if ( + not hasattr(session_dal.segment.controller, "exposes_service") + or not session_dal.segment.controller.exposes_service + ): + self.log.error( + f"Controller '{top_controller_name}' in OKS config has no 'exposes_service' relationship defined or it's empty." + ) + return None + + self.log.debug( + f"Controller '{top_controller_name}' exposes services: {[s.id for s in session_dal.segment.controller.exposes_service]}" + ) + + # Get the first (and presumably only) control service linked + # Using next(iter(...)) provides a slightly cleaner way to get the first item or None + service_found = next( + iter(session_dal.segment.controller.exposes_service), None + ) - for service in session_dal.segment.controller.exposes_service: - if service.id == service_id: - port_number = service.port - protocol = service.protocol - break + if service_found: + self.log.info( + f"Found linked control service object with ID: '{service_found.id}'" + ) + # Check if the service object actually has the port and protocol attributes + if ( + hasattr(service_found, "port") + and service_found.port is not None + ): + port_number = service_found.port + self.log.info( + f"Extracted port from service '{service_found.id}': {port_number}" + ) + else: + self.log.error( + f"Service object '{service_found.id}' is missing the 'port' attribute or it's null." + ) + + if hasattr(service_found, "protocol") and service_found.protocol: + protocol = service_found.protocol + self.log.info( + f"Extracted protocol from service '{service_found.id}': {protocol}" + ) + else: + self.log.error( + f"Service object '{service_found.id}' is missing the 'protocol' attribute or it's empty." + ) + + else: + # This case should ideally not be reached due to the check above, but good for robustness + self.log.error( + f"Could not retrieve the first service object from 'exposes_service' for controller '{top_controller_name}'." + ) + return None # Exit if service object itself couldn't be retrieved + + except AttributeError as e: + self.log.error( + f"Error accessing OKS configuration attributes: {e}. Check structure around session_dal.segment.controller." + ) + return None + except Exception as e: + self.log.error( + f"Unexpected error during service discovery from OKS: {e}" + ) + return None + + # Check if we successfully got a port and protocol if port_number is None or protocol is None: + self.log.error( + f"Failed to extract valid port ({port_number}) or protocol ({protocol}) for service '{service_found.id if service_found else 'N/A'}'. Cannot determine controller address." + ) + return None # Exit if service definition is incomplete + + # Resolve the IP address of the host where the controller runs + try: + host_id = session_dal.segment.controller.runs_on.runs_on.id + self.log.info(f"Controller runs on host ID: '{host_id}'") + ip = resolve_localhost_and_127_ip_to_network_ip(host_id) + self.log.info(f"Resolved host ID '{host_id}' to IP: {ip}") + except AttributeError as e: + self.log.error( + f"Error accessing OKS configuration attributes for host resolution: {e}. Check structure around session_dal.segment.controller.runs_on." + ) + return None + except Exception as e: + self.log.error(f"Unexpected error during host IP resolution: {e}") + return None + + if not ip: + self.log.error( + f"Host ID '{host_id}' resolved to an empty or invalid IP address." + ) return None - ip = resolve_localhost_and_127_ip_to_network_ip( - session_dal.segment.controller.runs_on.runs_on.id + # If all checks passed, return the address + final_address = f"{ip}:{port_number}" + self.log.info( + f"Successfully resolved controller address from OKS config: {final_address}" ) - return f"{ip}:{port_number}" + return final_address + # --- END CORRECTED LOGIC WITH ADDED LOGGING --- def keyboard_interrupt_on_sigint(signal, frame): self.log.warning("Interrupted") From 0a1f2b00b0fdc8da9fc18a5efc45e90f31e9fb2b Mon Sep 17 00:00:00 2001 From: MRiganSUSX Date: Mon, 3 Nov 2025 17:16:57 +0100 Subject: [PATCH 02/14] temp branch for fix --- src/drunc/controller/controller_driver.py | 37 +++- src/drunc/controller/interface/controller.py | 76 ++++++++- src/drunc/controller/interface/shell_utils.py | 159 +++++++++++++++--- .../process_manager/k8s_process_manager.py | 91 ++++++++++ src/drunc/utils/grpc_utils.py | 23 ++- 5 files changed, 353 insertions(+), 33 deletions(-) diff --git a/src/drunc/controller/controller_driver.py b/src/drunc/controller/controller_driver.py index e31b0a544..3b5f55930 100644 --- a/src/drunc/controller/controller_driver.py +++ b/src/drunc/controller/controller_driver.py @@ -1,3 +1,4 @@ +import os from functools import wraps import grpc @@ -33,10 +34,10 @@ class ControllerDriver: def __init__(self, address: str, token: Token): self.log = get_logger("controller.ControllerDriver") self.address = address - options = [ - ("grpc.keepalive_time_ms", 60000) # pings the server every 60 seconds - ] - self.channel = grpc.insecure_channel(self.address, options=options) + options = [] + target_address = f"ipv4:{self.address}" + self.log.info(f"Creating ControllerDriver for '{self.address}', connecting to gRPC target '{target_address}'") + self.channel = grpc.insecure_channel(target_address, options=options) self.stub = ControllerStub(self.channel) self.token = Token() self.token.CopyFrom(token) @@ -101,8 +102,36 @@ def describe( ) request.token.CopyFrom(self.token) + # --- NEW DIAGNOSTIC BLOCK --- + print("\n--- INSIDE ControllerDriver.describe() ---", flush=True) + print(f"Connecting to self.address: {self.address}", flush=True) + print(f"Request token user: {request.token.user_name}", flush=True) + print(f"Request target: '{request.target}'", flush=True) + print(f"Timeout: {timeout}s", flush=True) + + # Check environment variables *at the moment of the call* + print(f"ENV http_proxy: {os.environ.get('http_proxy')}", flush=True) + print(f"ENV https_proxy: {os.environ.get('https_proxy')}", flush=True) + print(f"ENV no_proxy: {os.environ.get('no_proxy')}", flush=True) + print(f"ENV GRPC_ENABLE_FORK_SUPPORT: {os.environ.get('GRPC_ENABLE_FORK_SUPPORT')}", flush=True) + print(f"gRPC Channel Target (from self.channel): {self.channel._channel.target().decode('utf-8')}", flush=True) + print("--- CALLING self.stub.describe() NOW (Line 102) ---", flush=True) + # --- END DIAGNOSTIC BLOCK --- + try: + # This is the line that fails (line 102) + print("RIGHT HERE KID") response = self.stub.describe(request, timeout=timeout) + print("RIGHT AFTER") + print(response) + + except socket.herror as e: + # Also add a specific catch to be 100% sure + print(f"\n--- CAUGHT socket.herror AT THE SOURCE ---", flush=True) + print(f"ERROR: {e}", flush=True) + print(f"gRPC Channel Target was: {self.channel._channel.target().decode('utf-8')}", flush=True) + raise e # Re-raise the error + except grpc.RpcError as e: handle_grpc_error(e) diff --git a/src/drunc/controller/interface/controller.py b/src/drunc/controller/interface/controller.py index 15d72874d..fd5b1cc19 100644 --- a/src/drunc/controller/interface/controller.py +++ b/src/drunc/controller/interface/controller.py @@ -106,14 +106,51 @@ def controller_cli( token=token, ) - def serve(listen_addr: str) -> None: +# In src/drunc/controller/interface/controller.py + + def serve(listen_addr: str) -> tuple: server = grpc.server(concurrent.futures.ThreadPoolExecutor(max_workers=10)) add_ControllerServicer_to_server(ctrlr, server) - port = server.add_insecure_port(listen_addr) + + # --- MODIFIED BINDING LOGIC --- + bind_addr = listen_addr # Default fallback + actual_port = 0 + try: + # Extract port, removing potential grpc:// prefix and host part + port_str = listen_addr.split(':')[-1] + port = int(port_str) + + # We MUST bind to '[::]' (all interfaces) for HostPort to work + bind_addr = f'[::]:{port}' + + log.info(f"Original listen address '{listen_addr}', binding server to '{bind_addr}'.") + actual_port = server.add_insecure_port(bind_addr) # Bind here + + if actual_port == 0 and port != 0: # Check if binding failed (port=0 is OK) + raise RuntimeError(f"Failed to bind server to {bind_addr}, port came back as 0.") + elif port == 0 and actual_port != 0: + log.info(f"OS assigned port {actual_port} for binding '[::]:0'") + port = actual_port # Update port to the one assigned + elif actual_port != port: + log.warning(f"Requested port {port} but bound to {actual_port}. Check for conflicts.") + + except (ValueError, IndexError, RuntimeError) as e: + log.critical(f"CRITICAL: Failed to parse port or bind server: {e}. Attempting fallback...") + # Fallback: Try binding to the original address directly + try: + actual_port = server.add_insecure_port(listen_addr) + if actual_port == 0: + raise RuntimeError(f"Fallback bind to '{listen_addr}' also failed (port 0).") + log.warning(f"Bound to fallback address '{listen_addr}' on port {actual_port}. HostPort might not work correctly.") + bind_addr = listen_addr # Update bind_addr for logging + except Exception as fallback_e: + log.critical(f"CRITICAL: Fallback server bind failed: {fallback_e}") + sys.exit(1) # Exit pod on total bind failure + # --- END MODIFIED BINDING LOGIC --- server.start() - log.debug(f"'{ctrlr.name}' was started on '{port}'") - return server, port + log.info(f"'{ctrlr.name}' gRPC server started, listening internally on '{bind_addr}' (reported port: {actual_port})") + return server, actual_port # Return the port it *actually* bound to def controller_shutdown(): log.info("Requested termination") @@ -125,7 +162,8 @@ def kill_me(sig, frame): l = get_logger("controller.kill_me") l.info("Sending SIGKILL") if ctrlr.top_segment_controller: - ctrlr.connectivity_service.retract_partition(fail_quickly=True) + if hasattr(ctrlr, "connectivity_service") and ctrlr.connectivity_service: + ctrlr.connectivity_service.retract_partition(fail_quickly=True) pgrp = os.getpgid(os.getpid()) os.killpg(pgrp, signal.SIGKILL) @@ -138,9 +176,27 @@ def shutdown(sig, frame): kill_me(sig, frame) try: - server, port = serve(commandfacility) - server_name = commandfacility.split(":")[0] - ctrlr.advertise_control_address(f"grpc://{server_name}:{port}") + # Pass commandfacility (which becomes listen_addr inside serve) + server, actual_port_bound = serve(commandfacility) + + # --- MODIFIED ADVERTISE LOGIC --- + # We need to advertise the EXTERNAL address, not the internal '[::]' + advertise_host = commandfacility + if advertise_host.startswith('grpc://'): + advertise_host = advertise_host[len('grpc://'):] + advertise_host = advertise_host.split(':')[0] # Get 'localhost' or IP from -c arg + + # Resolve 'localhost' to the node's real, external IP for advertising + advertise_host_resolved = resolve_localhost_and_127_ip_to_network_ip(advertise_host) + + # Use the actual port the server bound to + advertise_address = f"grpc://{advertise_host_resolved}:{actual_port_bound}" + + log.info(f"Advertising controller address as: {advertise_address}") + # Update the controller's internal URI, which is used by describe() + ctrlr.advertise_control_address(advertise_address) + # --- END MODIFIED ADVERTISE LOGIC --- + ctrlr.init_controller() # Add signal handling for gRPC server @@ -167,3 +223,7 @@ def signal_handler(signum, frame): except Exception as e: log.exception(e) + log.critical("Controller_cli failed to start, exiting.") + controller_shutdown() # Try to clean up + sys.exit(1) # Exit with error + diff --git a/src/drunc/controller/interface/shell_utils.py b/src/drunc/controller/interface/shell_utils.py index 4a453812e..8d23cda3c 100644 --- a/src/drunc/controller/interface/shell_utils.py +++ b/src/drunc/controller/interface/shell_utils.py @@ -10,6 +10,13 @@ from functools import partial from urllib.parse import urlparse +import subprocess +import json +from druncschema.controller_pb2_grpc import ControllerStub +from druncschema.controller_pb2 import AddressedCommand +from druncschema.token_pb2 import Token +from drunc.utils.grpc_utils import grpc_proto_to_dict, dict_to_grpc_proto + import click import grpc from druncschema.controller_pb2 import ( @@ -45,6 +52,56 @@ ) from drunc.utils.utils import format_name_for_cli, get_logger +_GRPC_HELPER_SCRIPT = """ +import grpc +import sys +import json +from druncschema.controller_pb2_grpc import ControllerStub +from druncschema.controller_pb2 import AddressedCommand +from druncschema.token_pb2 import Token +from druncschema.request_response_pb2 import Request +from drunc.utils.grpc_utils import grpc_proto_to_dict, dict_to_grpc_proto + +# This function mimics the successful test_grpc_client.py +def run_describe(address, token_dict): + try: + # Re-create the token object from the dictionary + token = dict_to_grpc_proto(token_dict, Token()) + + options = [] # Use minimal, clean options, just like the test script + channel = grpc.insecure_channel(address, options=options) + stub = ControllerStub(channel) + + request = AddressedCommand( + token=token, + command_name="describe", + target="", + execute_along_path=True, + ) + + # Use a short, reasonable timeout + response = stub.describe(request, timeout=10.0) + + # Print the successful response as JSON to stdout + print(json.dumps(grpc_proto_to_dict(response))) + sys.exit(0) + + except Exception as e: + # Print any errors to stderr + print(f"gRPC helper script failed: {e}", file=sys.stderr) + import traceback + traceback.print_exc(file=sys.stderr) + sys.exit(1) + +if __name__ == "__main__": + if len(sys.argv) != 3: + print(f"Usage: {sys.argv[0]}
", file=sys.stderr) + sys.exit(1) + + address = sys.argv[1] + token_dict = json.loads(sys.argv[2]) + run_describe(address, token_dict) +""" @dataclass(slots=True) class StatusDescriptionPair: @@ -68,8 +125,11 @@ def match_children( def get_status_table( status_response: StatusResponse, describe_response: DescribeResponse ): + print("GST 1") status = status_response.status + print("GST 2") description = describe_response.description + print("GST 3") t = Table( title=( @@ -86,17 +146,24 @@ def get_status_table( t.add_column("Included") t.add_column("Endpoint") + print("GST 4") + def add_status_to_table( table: Table, status_response: StatusResponse, describe_response: DescribeResponse, prefix: str, ): + print("GST 41") status = status_response.status + print("GST 42") description = describe_response.description + print("GST 43") if status is None or description is None: return + print("GST 44") + def update_endpoint(endpoint: str) -> str: """ Parses endpoint to a human readable hostname @@ -110,11 +177,18 @@ def update_endpoint(endpoint: str) -> str: if not endpoint: return "" + print("GST 45") + ip_address = urlparse(endpoint).hostname + print("GST 46") if not ip_address: return "" - hostname, _, _ = socket.gethostbyaddr(ip_address) - return endpoint.replace(ip_address, hostname) + print("GST 47") + print(ip_address) + #hostname, _, _ = socket.gethostbyaddr(ip_address) + print("GST 48") + #return endpoint.replace(ip_address, hostname) + return endpoint table.add_row( prefix + status_response.name, @@ -126,14 +200,19 @@ def update_endpoint(endpoint: str) -> str: update_endpoint(description.endpoint), ) + print("GST 49.1") children = match_children(status_response.children, describe_response.children) + print("GST 49.2") children_list = sorted(list(children.keys())) + print("GST 49.3") + for child in children_list: child_status = getattr(children[child], "status", None) if not child_status: continue child_describe = children[child].description + print("GST 49.6") if child_status is None or child_describe is None: raise DruncShellException( f"No matching status and description for child '{child}'" @@ -142,6 +221,8 @@ def update_endpoint(endpoint: str) -> str: add_status_to_table(t, status_response, describe_response, "") + print("GST 5") + def add_runinfo_to_table(table: Table, status: Status): table.add_row("Run number", str(status.run_info.run_number)) table.add_row("Run type", status.run_info.run_type) @@ -162,6 +243,8 @@ def add_runinfo_to_table(table: Table, status: Status): table.add_row("Config file", status.run_info.run_config_file) table.add_row("Config ID", status.run_info.run_config_name) + print("GST 6") + if status.HasField("run_info"): runinfo_table = Table( title="Run Info", @@ -172,6 +255,8 @@ def add_runinfo_to_table(table: Table, status: Status): add_runinfo_to_table(runinfo_table, status) return Group(t, runinfo_table) + print("GST 7") + return t @@ -183,8 +268,11 @@ def __init__(self, ctx, refresh_per_second=2, *args, **kwargs) -> None: def update_table(self): statuses = self.ctx.get_driver("controller").status() + print("MAYBE") descriptions = self.ctx.get_driver("controller").describe() + print("MAYBE2") self.table = get_status_table(statuses, descriptions) + print("MAYBE3") def get_renderable(self) -> ConsoleRenderable | RichCast | str: renderable = Group(self.table, *self.get_renderables()) @@ -223,7 +311,6 @@ def controller_cleanup(): return controller_cleanup - def controller_setup(ctx, controller_address): log = logging.getLogger("controller.shell_utils") if not hasattr(ctx, "took_control"): @@ -232,8 +319,7 @@ def controller_setup(ctx, controller_address): ) desc = Description() - - timeout = 60 + timeout = 60 # Total timeout for the connection loop with Progress( SpinnerColumn(), @@ -248,29 +334,67 @@ def controller_setup(ctx, controller_address): ) stored_exception = None + response_json_str = None # Store the stdout from the helper start_time = time.time() while time.time() - start_time < timeout: progress.update(waiting, completed=time.time() - start_time) try: - desc = ctx.get_driver("controller").describe().description - stored_exception = None - break - except ServerUnreachable as e: + # --- NEW SUBPROCESS CONNECTION LOGIC --- + + python_exe = sys.executable + token_dict = grpc_proto_to_dict(ctx.get_token()) # This was line 316 + env = os.environ.copy() + + process = subprocess.Popen( + [python_exe, "-c", _GRPC_HELPER_SCRIPT, controller_address, json.dumps(token_dict)], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + env=env + ) + + stdout, stderr = process.communicate(timeout=15) + + if process.returncode == 0: + response_json_str = stdout + stored_exception = None + log.debug("gRPC helper script connected successfully.") + break + else: + raise ServerUnreachable(f"gRPC helper script failed (rc={process.returncode}): {stderr}") + + # --- END NEW SUBPROCESS LOGIC --- + + except (ServerUnreachable, subprocess.TimeoutExpired, json.JSONDecodeError) as e: stored_exception = e - time.sleep(1) + log.debug(f"Connection attempt failed: {e}. Retrying in 1s...") + time.sleep(1) # Wait 1s before retrying except Exception as e: - ctx.critical("Could not get the controller's status") - ctx.critical(e) - ctx.critical("Exiting.") + # --- FIX IS HERE --- + ctx.log.critical("Could not get the controller's status") # Use ctx.log.critical + ctx.log.critical(e) + ctx.log.critical("Exiting.") + # --- END FIX --- ctx.terminate() raise e if stored_exception is not None: + log.error("Failed to connect to controller after timeout.") raise stored_exception + try: + response_dict = json.loads(response_json_str) + if 'description' not in response_dict: + raise DruncSetupException(f"Invalid 'describe' response from helper script: 'description' field missing. Response: {response_json_str}") + desc = dict_to_grpc_proto(response_dict['description'], Description()) + except Exception as e: + log.error(f"Failed to parse gRPC helper JSON response: {e}") + log.error(f"Response was: {response_json_str}") + raise DruncSetupException(f"Failed to parse gRPC response: {e}") + log.info( f"{controller_address} is '{desc.name}.{desc.session}' (name.session), starting listening..." ) @@ -279,20 +403,18 @@ def controller_setup(ctx, controller_address): ctx.start_listening_controller(desc.broadcast) log.debug("Connected to the controller") - - # 60s for everyone to show up on the connectivity service, and 10s to come out of initialising state + + # --- This section remains the same, checking status and taking control --- timeout = 60 + 10 - time_start = time.time() state = ctx.get_driver("controller").status().status.state.lower() with StatusTableUpdater(ctx) as updater: task = updater.add_task("Waiting on tree initialisation...", total=timeout) while time.time() - time_start < timeout and state == "initialising": state = ctx.get_driver("controller").status().status.state.lower() - updater.update(task, completed=time.time() - time_start) + updater.update(task, completed=time.time() - start_time) updater.update_table() time.sleep(0.5) - updater.update_table() if state == "initialising": @@ -318,7 +440,6 @@ def controller_setup(ctx, controller_address): return desc - def search_fsm_command(command_name: str, command_list: list[FSMCommand]): for command in command_list: if command_name == command.name: diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index 028105261..f1f7477a2 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -3,6 +3,7 @@ import os import re import signal +import socket import threading import urllib.error import urllib.request @@ -993,6 +994,96 @@ def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance: f" -> For external access, use NodePort {self.connection_server_node_port} on any cluster node IP (e.g., http://{node_name}:{self.connection_server_node_port})" ) + elif "root-controller" in podname: + self.log.info(f"Waiting for '{podname}' (HostPort) to become ready...") + node_name = None + pod_ready = False + controller_port = self._extract_port_from_cmd(boot_request) + + if not controller_port or controller_port == 0: + raise DruncK8sException(f"Cannot wait for '{podname}', port is 0 or missing.") + + # --- STAGE 1: Wait for Pod to be Running/Ready in K8s API --- + self.log.info(f"Stage 1: Waiting for '{podname}' pod to be Running and Ready...") + start_time = time() + api_ready_timeout = self.pod_ready_timeout # Use standard pod timeout + + while not pod_ready and (time() - start_time < api_ready_timeout): + try: + pod_status = self._core_v1_api.read_namespaced_pod_status( + podname, session + ) + if ( + pod_status.status.phase == "Running" + and pod_status.status.pod_ip + ): + is_ready = False + if pod_status.status.conditions: + for condition in pod_status.status.conditions: + if condition.type == "Ready" and condition.status == "True": + is_ready = True + break + if is_ready: + self.log.info( + f"Stage 1: Pod '{podname}' is API Ready with IP {pod_status.status.pod_ip}." + ) + node_name = pod_status.spec.node_name + pod_ready = True # Exit this loop and go to Stage 2 + + except self._api_error_v1_api as e: + if e.status == 404: pass # Pod not yet created/visible + else: raise e + sleep(self.pod_status_check_sleep) + + if not pod_ready: + raise DruncK8sException( + f"'{podname}' pod did not become API Ready in {api_ready_timeout} seconds." + ) + + # --- STAGE 2: Wait for HostPort to be externally reachable (using TCP socket) --- + self.log.info(f"Stage 2: Waiting for HostPort {node_name}:{controller_port} to be reachable...") + hostport_ready = False + + grpc_startup_timeout = 120 + hostport_start_time = time() + + while not hostport_ready and (time() - hostport_start_time < grpc_startup_timeout): + # We will try to open a simple TCP socket instead of using HTTP + sock = None # Initialize sock to None + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(1.0) # 1 second timeout + # Try to connect + result = sock.connect_ex((node_name, controller_port)) + + if result == 0: + # 0 means the connection was successful + hostport_ready = True + self.log.info(f"Stage 2: HostPort {node_name}:{controller_port} is active (TCP connect success).") + else: + # Connection failed (e.g., connection refused, no route) + self.log.debug(f"HostPort {node_name}:{controller_port} not ready yet (socket error {result}), retrying...") + sleep(self.pod_status_check_sleep) # Wait before retrying + + except socket.gaierror as e: + # Handle DNS name resolution error (e.g., node_name not found) + self.log.warning(f"Failed to resolve hostname '{node_name}': {e}. Retrying...") + sleep(self.pod_status_check_sleep) + except Exception as e: + # Catch any other socket errors + self.log.debug(f"HostPort not ready yet (Socket error: {e}), retrying...") + sleep(self.pod_status_check_sleep) + finally: + if sock: + sock.close() # Always close the socket + + if not hostport_ready: + raise DruncK8sException( + f"HostPort {node_name}:{controller_port} did not become reachable in {grpc_startup_timeout} seconds." + ) + + self.log.info(f"Controller '{podname}' is fully ready.") + pd, pr, pu = ProcessDescription(), ProcessRestriction(), ProcessUUID(uuid=uuid) pd.CopyFrom(self.boot_request[uuid].process_description) pr.CopyFrom(self.boot_request[uuid].process_restriction) diff --git a/src/drunc/utils/grpc_utils.py b/src/drunc/utils/grpc_utils.py index 972a91e2a..092f5f4e4 100644 --- a/src/drunc/utils/grpc_utils.py +++ b/src/drunc/utils/grpc_utils.py @@ -5,7 +5,7 @@ from druncschema.generic_pb2 import PlainText from druncschema.request_response_pb2 import Response, ResponseFlag from druncschema.token_pb2 import Token -from google.protobuf import any_pb2 +from google.protobuf import any_pb2, json_format from google.protobuf.descriptor import FieldDescriptor from google.protobuf.message import Message from google.rpc import code_pb2, error_details_pb2 @@ -170,7 +170,6 @@ def copy_token(token: Token) -> Token: token_copy.CopyFrom(token) return token_copy - @dataclass class GrpcErrorDetails: """ @@ -310,4 +309,24 @@ def extract_grpc_rich_error(grpc_error: grpc.RpcError) -> GrpcErrorDetails: return GrpcErrorDetails( code=code, message=status.message or "No message", details=error_details + +def grpc_proto_to_dict(proto_message: Message) -> dict: + """ + Converts a gRPC Protobuf message object to a Python dictionary. + """ + return json_format.MessageToDict( + proto_message, + preserving_proto_field_name=True + # Removed: including_default_value_fields=True + ) + +def dict_to_grpc_proto(data: dict, proto_class_instance: Message) -> Message: + """ + Converts a Python dictionary into an instance of a gRPC Protobuf message. + 'proto_class_instance' should be an empty instance, e.g., Token() + """ + return json_format.ParseDict( + data, + proto_class_instance, + ignore_unknown_fields=True ) From c1c05ea62babfce66d2c66d421361a51487d35af Mon Sep 17 00:00:00 2001 From: MRiganSUSX Date: Mon, 3 Nov 2025 23:59:08 +0100 Subject: [PATCH 03/14] collected changes for k8s pm update --- src/drunc/controller/controller_driver.py | 34 +- src/drunc/controller/interface/controller.py | 76 +- src/drunc/controller/interface/shell_utils.py | 233 +++--- .../process_manager/k8s_process_manager.py | 689 ++++++++++-------- .../process_manager/process_manager_driver.py | 78 +- src/drunc/utils/grpc_utils.py | 2 + 6 files changed, 554 insertions(+), 558 deletions(-) diff --git a/src/drunc/controller/controller_driver.py b/src/drunc/controller/controller_driver.py index 3b5f55930..75aaa29a2 100644 --- a/src/drunc/controller/controller_driver.py +++ b/src/drunc/controller/controller_driver.py @@ -1,4 +1,3 @@ -import os from functools import wraps import grpc @@ -34,9 +33,10 @@ class ControllerDriver: def __init__(self, address: str, token: Token): self.log = get_logger("controller.ControllerDriver") self.address = address - options = [] + options = [ + ("grpc.keepalive_time_ms", 60000) # pings the server every 60 seconds + ] target_address = f"ipv4:{self.address}" - self.log.info(f"Creating ControllerDriver for '{self.address}', connecting to gRPC target '{target_address}'") self.channel = grpc.insecure_channel(target_address, options=options) self.stub = ControllerStub(self.channel) self.token = Token() @@ -102,36 +102,8 @@ def describe( ) request.token.CopyFrom(self.token) - # --- NEW DIAGNOSTIC BLOCK --- - print("\n--- INSIDE ControllerDriver.describe() ---", flush=True) - print(f"Connecting to self.address: {self.address}", flush=True) - print(f"Request token user: {request.token.user_name}", flush=True) - print(f"Request target: '{request.target}'", flush=True) - print(f"Timeout: {timeout}s", flush=True) - - # Check environment variables *at the moment of the call* - print(f"ENV http_proxy: {os.environ.get('http_proxy')}", flush=True) - print(f"ENV https_proxy: {os.environ.get('https_proxy')}", flush=True) - print(f"ENV no_proxy: {os.environ.get('no_proxy')}", flush=True) - print(f"ENV GRPC_ENABLE_FORK_SUPPORT: {os.environ.get('GRPC_ENABLE_FORK_SUPPORT')}", flush=True) - print(f"gRPC Channel Target (from self.channel): {self.channel._channel.target().decode('utf-8')}", flush=True) - print("--- CALLING self.stub.describe() NOW (Line 102) ---", flush=True) - # --- END DIAGNOSTIC BLOCK --- - try: - # This is the line that fails (line 102) - print("RIGHT HERE KID") response = self.stub.describe(request, timeout=timeout) - print("RIGHT AFTER") - print(response) - - except socket.herror as e: - # Also add a specific catch to be 100% sure - print(f"\n--- CAUGHT socket.herror AT THE SOURCE ---", flush=True) - print(f"ERROR: {e}", flush=True) - print(f"gRPC Channel Target was: {self.channel._channel.target().decode('utf-8')}", flush=True) - raise e # Re-raise the error - except grpc.RpcError as e: handle_grpc_error(e) diff --git a/src/drunc/controller/interface/controller.py b/src/drunc/controller/interface/controller.py index fd5b1cc19..15d72874d 100644 --- a/src/drunc/controller/interface/controller.py +++ b/src/drunc/controller/interface/controller.py @@ -106,51 +106,14 @@ def controller_cli( token=token, ) -# In src/drunc/controller/interface/controller.py - - def serve(listen_addr: str) -> tuple: + def serve(listen_addr: str) -> None: server = grpc.server(concurrent.futures.ThreadPoolExecutor(max_workers=10)) add_ControllerServicer_to_server(ctrlr, server) - - # --- MODIFIED BINDING LOGIC --- - bind_addr = listen_addr # Default fallback - actual_port = 0 - try: - # Extract port, removing potential grpc:// prefix and host part - port_str = listen_addr.split(':')[-1] - port = int(port_str) - - # We MUST bind to '[::]' (all interfaces) for HostPort to work - bind_addr = f'[::]:{port}' - - log.info(f"Original listen address '{listen_addr}', binding server to '{bind_addr}'.") - actual_port = server.add_insecure_port(bind_addr) # Bind here - - if actual_port == 0 and port != 0: # Check if binding failed (port=0 is OK) - raise RuntimeError(f"Failed to bind server to {bind_addr}, port came back as 0.") - elif port == 0 and actual_port != 0: - log.info(f"OS assigned port {actual_port} for binding '[::]:0'") - port = actual_port # Update port to the one assigned - elif actual_port != port: - log.warning(f"Requested port {port} but bound to {actual_port}. Check for conflicts.") - - except (ValueError, IndexError, RuntimeError) as e: - log.critical(f"CRITICAL: Failed to parse port or bind server: {e}. Attempting fallback...") - # Fallback: Try binding to the original address directly - try: - actual_port = server.add_insecure_port(listen_addr) - if actual_port == 0: - raise RuntimeError(f"Fallback bind to '{listen_addr}' also failed (port 0).") - log.warning(f"Bound to fallback address '{listen_addr}' on port {actual_port}. HostPort might not work correctly.") - bind_addr = listen_addr # Update bind_addr for logging - except Exception as fallback_e: - log.critical(f"CRITICAL: Fallback server bind failed: {fallback_e}") - sys.exit(1) # Exit pod on total bind failure - # --- END MODIFIED BINDING LOGIC --- + port = server.add_insecure_port(listen_addr) server.start() - log.info(f"'{ctrlr.name}' gRPC server started, listening internally on '{bind_addr}' (reported port: {actual_port})") - return server, actual_port # Return the port it *actually* bound to + log.debug(f"'{ctrlr.name}' was started on '{port}'") + return server, port def controller_shutdown(): log.info("Requested termination") @@ -162,8 +125,7 @@ def kill_me(sig, frame): l = get_logger("controller.kill_me") l.info("Sending SIGKILL") if ctrlr.top_segment_controller: - if hasattr(ctrlr, "connectivity_service") and ctrlr.connectivity_service: - ctrlr.connectivity_service.retract_partition(fail_quickly=True) + ctrlr.connectivity_service.retract_partition(fail_quickly=True) pgrp = os.getpgid(os.getpid()) os.killpg(pgrp, signal.SIGKILL) @@ -176,27 +138,9 @@ def shutdown(sig, frame): kill_me(sig, frame) try: - # Pass commandfacility (which becomes listen_addr inside serve) - server, actual_port_bound = serve(commandfacility) - - # --- MODIFIED ADVERTISE LOGIC --- - # We need to advertise the EXTERNAL address, not the internal '[::]' - advertise_host = commandfacility - if advertise_host.startswith('grpc://'): - advertise_host = advertise_host[len('grpc://'):] - advertise_host = advertise_host.split(':')[0] # Get 'localhost' or IP from -c arg - - # Resolve 'localhost' to the node's real, external IP for advertising - advertise_host_resolved = resolve_localhost_and_127_ip_to_network_ip(advertise_host) - - # Use the actual port the server bound to - advertise_address = f"grpc://{advertise_host_resolved}:{actual_port_bound}" - - log.info(f"Advertising controller address as: {advertise_address}") - # Update the controller's internal URI, which is used by describe() - ctrlr.advertise_control_address(advertise_address) - # --- END MODIFIED ADVERTISE LOGIC --- - + server, port = serve(commandfacility) + server_name = commandfacility.split(":")[0] + ctrlr.advertise_control_address(f"grpc://{server_name}:{port}") ctrlr.init_controller() # Add signal handling for gRPC server @@ -223,7 +167,3 @@ def signal_handler(signum, frame): except Exception as e: log.exception(e) - log.critical("Controller_cli failed to start, exiting.") - controller_shutdown() # Try to clean up - sys.exit(1) # Exit with error - diff --git a/src/drunc/controller/interface/shell_utils.py b/src/drunc/controller/interface/shell_utils.py index 8d23cda3c..4917f1c7c 100644 --- a/src/drunc/controller/interface/shell_utils.py +++ b/src/drunc/controller/interface/shell_utils.py @@ -1,4 +1,5 @@ import datetime +import ipaddress import logging import os import socket @@ -7,16 +8,9 @@ from collections.abc import Sequence from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass -from functools import partial +from functools import lru_cache, partial from urllib.parse import urlparse -import subprocess -import json -from druncschema.controller_pb2_grpc import ControllerStub -from druncschema.controller_pb2 import AddressedCommand -from druncschema.token_pb2 import Token -from drunc.utils.grpc_utils import grpc_proto_to_dict, dict_to_grpc_proto - import click import grpc from druncschema.controller_pb2 import ( @@ -52,56 +46,6 @@ ) from drunc.utils.utils import format_name_for_cli, get_logger -_GRPC_HELPER_SCRIPT = """ -import grpc -import sys -import json -from druncschema.controller_pb2_grpc import ControllerStub -from druncschema.controller_pb2 import AddressedCommand -from druncschema.token_pb2 import Token -from druncschema.request_response_pb2 import Request -from drunc.utils.grpc_utils import grpc_proto_to_dict, dict_to_grpc_proto - -# This function mimics the successful test_grpc_client.py -def run_describe(address, token_dict): - try: - # Re-create the token object from the dictionary - token = dict_to_grpc_proto(token_dict, Token()) - - options = [] # Use minimal, clean options, just like the test script - channel = grpc.insecure_channel(address, options=options) - stub = ControllerStub(channel) - - request = AddressedCommand( - token=token, - command_name="describe", - target="", - execute_along_path=True, - ) - - # Use a short, reasonable timeout - response = stub.describe(request, timeout=10.0) - - # Print the successful response as JSON to stdout - print(json.dumps(grpc_proto_to_dict(response))) - sys.exit(0) - - except Exception as e: - # Print any errors to stderr - print(f"gRPC helper script failed: {e}", file=sys.stderr) - import traceback - traceback.print_exc(file=sys.stderr) - sys.exit(1) - -if __name__ == "__main__": - if len(sys.argv) != 3: - print(f"Usage: {sys.argv[0]}
", file=sys.stderr) - sys.exit(1) - - address = sys.argv[1] - token_dict = json.loads(sys.argv[2]) - run_describe(address, token_dict) -""" @dataclass(slots=True) class StatusDescriptionPair: @@ -125,11 +69,8 @@ def match_children( def get_status_table( status_response: StatusResponse, describe_response: DescribeResponse ): - print("GST 1") status = status_response.status - print("GST 2") description = describe_response.description - print("GST 3") t = Table( title=( @@ -146,24 +87,17 @@ def get_status_table( t.add_column("Included") t.add_column("Endpoint") - print("GST 4") - def add_status_to_table( table: Table, status_response: StatusResponse, describe_response: DescribeResponse, prefix: str, ): - print("GST 41") status = status_response.status - print("GST 42") description = describe_response.description - print("GST 43") if status is None or description is None: return - print("GST 44") - def update_endpoint(endpoint: str) -> str: """ Parses endpoint to a human readable hostname @@ -177,18 +111,11 @@ def update_endpoint(endpoint: str) -> str: if not endpoint: return "" - print("GST 45") - ip_address = urlparse(endpoint).hostname - print("GST 46") if not ip_address: return "" - print("GST 47") - print(ip_address) - #hostname, _, _ = socket.gethostbyaddr(ip_address) - print("GST 48") - #return endpoint.replace(ip_address, hostname) - return endpoint + resolved_host = get_hostname_smart(ip_address) + return endpoint.replace(ip_address, resolved_host) table.add_row( prefix + status_response.name, @@ -200,19 +127,14 @@ def update_endpoint(endpoint: str) -> str: update_endpoint(description.endpoint), ) - print("GST 49.1") children = match_children(status_response.children, describe_response.children) - print("GST 49.2") children_list = sorted(list(children.keys())) - print("GST 49.3") - for child in children_list: child_status = getattr(children[child], "status", None) if not child_status: continue child_describe = children[child].description - print("GST 49.6") if child_status is None or child_describe is None: raise DruncShellException( f"No matching status and description for child '{child}'" @@ -221,8 +143,6 @@ def update_endpoint(endpoint: str) -> str: add_status_to_table(t, status_response, describe_response, "") - print("GST 5") - def add_runinfo_to_table(table: Table, status: Status): table.add_row("Run number", str(status.run_info.run_number)) table.add_row("Run type", status.run_info.run_type) @@ -243,8 +163,6 @@ def add_runinfo_to_table(table: Table, status: Status): table.add_row("Config file", status.run_info.run_config_file) table.add_row("Config ID", status.run_info.run_config_name) - print("GST 6") - if status.HasField("run_info"): runinfo_table = Table( title="Run Info", @@ -255,8 +173,6 @@ def add_runinfo_to_table(table: Table, status: Status): add_runinfo_to_table(runinfo_table, status) return Group(t, runinfo_table) - print("GST 7") - return t @@ -268,11 +184,8 @@ def __init__(self, ctx, refresh_per_second=2, *args, **kwargs) -> None: def update_table(self): statuses = self.ctx.get_driver("controller").status() - print("MAYBE") descriptions = self.ctx.get_driver("controller").describe() - print("MAYBE2") self.table = get_status_table(statuses, descriptions) - print("MAYBE3") def get_renderable(self) -> ConsoleRenderable | RichCast | str: renderable = Group(self.table, *self.get_renderables()) @@ -311,6 +224,7 @@ def controller_cleanup(): return controller_cleanup + def controller_setup(ctx, controller_address): log = logging.getLogger("controller.shell_utils") if not hasattr(ctx, "took_control"): @@ -319,7 +233,8 @@ def controller_setup(ctx, controller_address): ) desc = Description() - timeout = 60 # Total timeout for the connection loop + + timeout = 60 with Progress( SpinnerColumn(), @@ -334,67 +249,29 @@ def controller_setup(ctx, controller_address): ) stored_exception = None - response_json_str = None # Store the stdout from the helper start_time = time.time() while time.time() - start_time < timeout: progress.update(waiting, completed=time.time() - start_time) try: - # --- NEW SUBPROCESS CONNECTION LOGIC --- - - python_exe = sys.executable - token_dict = grpc_proto_to_dict(ctx.get_token()) # This was line 316 - env = os.environ.copy() - - process = subprocess.Popen( - [python_exe, "-c", _GRPC_HELPER_SCRIPT, controller_address, json.dumps(token_dict)], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - env=env - ) - - stdout, stderr = process.communicate(timeout=15) - - if process.returncode == 0: - response_json_str = stdout - stored_exception = None - log.debug("gRPC helper script connected successfully.") - break - else: - raise ServerUnreachable(f"gRPC helper script failed (rc={process.returncode}): {stderr}") - - # --- END NEW SUBPROCESS LOGIC --- - - except (ServerUnreachable, subprocess.TimeoutExpired, json.JSONDecodeError) as e: + desc = ctx.get_driver("controller").describe().description + stored_exception = None + break + except ServerUnreachable as e: stored_exception = e - log.debug(f"Connection attempt failed: {e}. Retrying in 1s...") - time.sleep(1) # Wait 1s before retrying + time.sleep(1) except Exception as e: - # --- FIX IS HERE --- - ctx.log.critical("Could not get the controller's status") # Use ctx.log.critical - ctx.log.critical(e) - ctx.log.critical("Exiting.") - # --- END FIX --- + ctx.critical("Could not get the controller's status") + ctx.critical(e) + ctx.critical("Exiting.") ctx.terminate() raise e if stored_exception is not None: - log.error("Failed to connect to controller after timeout.") raise stored_exception - try: - response_dict = json.loads(response_json_str) - if 'description' not in response_dict: - raise DruncSetupException(f"Invalid 'describe' response from helper script: 'description' field missing. Response: {response_json_str}") - desc = dict_to_grpc_proto(response_dict['description'], Description()) - except Exception as e: - log.error(f"Failed to parse gRPC helper JSON response: {e}") - log.error(f"Response was: {response_json_str}") - raise DruncSetupException(f"Failed to parse gRPC response: {e}") - log.info( f"{controller_address} is '{desc.name}.{desc.session}' (name.session), starting listening..." ) @@ -403,18 +280,20 @@ def controller_setup(ctx, controller_address): ctx.start_listening_controller(desc.broadcast) log.debug("Connected to the controller") - - # --- This section remains the same, checking status and taking control --- + + # 60s for everyone to show up on the connectivity service, and 10s to come out of initialising state timeout = 60 + 10 + time_start = time.time() state = ctx.get_driver("controller").status().status.state.lower() with StatusTableUpdater(ctx) as updater: task = updater.add_task("Waiting on tree initialisation...", total=timeout) while time.time() - time_start < timeout and state == "initialising": state = ctx.get_driver("controller").status().status.state.lower() - updater.update(task, completed=time.time() - start_time) + updater.update(task, completed=time.time() - time_start) updater.update_table() time.sleep(0.5) + updater.update_table() if state == "initialising": @@ -440,6 +319,7 @@ def controller_setup(ctx, controller_address): return desc + def search_fsm_command(command_name: str, command_list: list[FSMCommand]): for command in command_list: if command_name == command.name: @@ -830,3 +710,74 @@ def grab_default_value_from_env(argument_name): )(cmd) return cmd, cmd_name + + +# --- Helper function to check for internal IPs --- +# We cache this check too, though it's already very fast. +@lru_cache(maxsize=1024) +def is_private_ip(ip_str: str) -> bool: + """ + Checks if an IP address is private (RFC 1918), loopback, or link-local. + These IPs will almost never have a public reverse DNS record. + """ + if not ip_str: + return True + try: + ip_obj = ipaddress.ip_address(ip_str) + # .is_private = 10.x, 172.16-31.x, 192.168.x + # .is_loopback = 127.x.x.x + # .is_link_local = 169.254.x.x + return ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_link_local + except ValueError: + # Not a valid IP address, treat as "private" to skip lookup + return True + + +# --- HELPER 2: Checks if a string is an IP vs. a domain name --- +@lru_cache(maxsize=1024) +def is_valid_ip(hostname: str) -> bool: + """Checks if a string is a valid IPv4 or IPv6 address.""" + if not hostname: + return False + try: + ipaddress.ip_address(hostname) + return True + except ValueError: + return False + + +# --- Your new, improved function --- +@lru_cache(maxsize=4096) +def get_hostname_smart(ip_address: str, timeout_seconds: float = 0.2) -> str: + """ + Resolves an IP to a hostname, with optimizations: + 1. Caches all results. + 2. Immediately skips private/internal IPs (like K8s). + 3. Uses a short timeout for public IPs. + """ + + # 1. The Kubernetes/Internal IP check: + # If it's a private IP, don't even try to resolve it. + # This is the optimization that fixes your K8s problem. + if is_private_ip(ip_address): + return ip_address + + # 2. It's a public IP, so let's try to resolve it. + # We must use a try/finally to ensure we reset the global timeout. + original_timeout = socket.getdefaulttimeout() + try: + # Set a short global timeout *just* for this operation + socket.setdefaulttimeout(timeout_seconds) + + hostname, _, _ = socket.gethostbyaddr(ip_address) + return hostname + + except (socket.herror, socket.gaierror, socket.timeout): + # This catches "host not found", "name or service not known", + # or our own 0.2-second timeout. + # We cache the failure by returning the IP. + return ip_address + + finally: + # 3. CRITICAL: Always reset the global timeout + socket.setdefaulttimeout(original_timeout) diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index f1f7477a2..3dd5cace2 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -488,8 +488,11 @@ def _create_nodeport_service(self, podname, session, pod_uid) -> None: if e.status != 409: self.log.error(f"Failed to create NodePort service for {podname}: {e}") - def _create_pod(self, podname, session, boot_request: BootRequest) -> None: - """Constructs and creates a Kubernetes Pod manifest.""" + def _build_pod_main_container( + self, podname: str, boot_request: BootRequest, lcs_port: int | None + ) -> client.V1Container: + """Builds the primary V1Container manifest, including command and preStop hook.""" + pod_image = self.configuration.data.image exec_and_args_list = boot_request.process_description.executable_and_arguments @@ -529,18 +532,24 @@ def _create_pod(self, podname, session, boot_request: BootRequest) -> None: f"'{podname}' identified as a Python app, no preStop hook needed." ) - # Create container with conditional lifecycle hook - container_kwargs = { - "name": podname, - "image": pod_image, - "command": ["/bin/sh", "-c"], - "args": [main_command_str], - "env": [ + container_ports = [] + if podname == self.connection_server_name and lcs_port is not None: + container_ports.append( + client.V1ContainerPort(container_port=lcs_port, name="http-port") + ) + + main_container = client.V1Container( + name=podname, + image=pod_image, + command=["/bin/sh", "-c"], + args=[main_command_str], + env=[ client.V1EnvVar(name=k, value=v) for k, v in boot_request.process_description.env.items() ], - "ports": [], - "volume_mounts": [ + lifecycle=lifecycle_hook, + ports=container_ports, + volume_mounts=[ client.V1VolumeMount(name="nfs", mount_path="/nfs"), client.V1VolumeMount(name="cvmfs", mount_path="/cvmfs"), ], @@ -548,40 +557,45 @@ def _create_pod(self, podname, session, boot_request: BootRequest) -> None: "security_context": client.V1SecurityContext( run_as_user=os.getuid(), run_as_group=os.getgid() ), - } - # Only add lifecycle hook for C++ applications if lifecycle_hook is not None: container_kwargs["lifecycle"] = lifecycle_hook main_container = client.V1Container(**container_kwargs) + ) + return main_container - all_containers = [main_container] - + def _get_pod_node_selector( + self, podname: str, restriction: ProcessRestriction + ) -> dict: + """Verifies the target host and returns the Kubernetes node selector.""" node_selector = {} - if boot_request.process_restriction.allowed_hosts: - target_host = boot_request.process_restriction.allowed_hosts[0] - # Resolve localhost to actual hostname for Kubernetes node selection + if restriction.allowed_hosts: + target_host = restriction.allowed_hosts[0] + if target_host == "localhost": target_host = resolve_localhost_to_hostname(target_host) self.log.info( f"Resolved localhost to '{target_host}' for node selection" ) - # Verify the target host is available in the cluster before scheduling self._verify_host_in_cluster(target_host) node_selector = {"kubernetes.io/hostname": target_host} self.log.info( f"Pod '{podname}' will be scheduled on node '{target_host}' (from boot request)" ) + return node_selector - host_aliases = [] + def _get_pod_host_aliases( + self, podname: str, session: str + ) -> list[client.V1HostAlias] | None: + """Gets the ClusterIP of the connection server and prepares host aliases.""" + host_aliases = None if ( podname != self.connection_server_name and self.local_connection_server_is_booted ): - # Wait for service to get ClusterIP connection_server_ip = None retry_count = 0 max_retries = 10 @@ -602,8 +616,18 @@ def _create_pod(self, podname, session, boot_request: BootRequest) -> None: self.log.warning( f"Could not get connection server ClusterIP for pod '{podname}'" ) - - pod_manifest = client.V1Pod( + return host_aliases + + def _build_pod_manifest( + self, + podname: str, + session: str, + main_container: client.V1Container, + node_selector: dict, + host_aliases: list[client.V1HostAlias] | None, + ) -> client.V1Pod: + """Assembles the final V1Pod object.""" + return client.V1Pod( api_version="v1", kind="Pod", metadata=self._meta_v1_api( @@ -618,7 +642,7 @@ def _create_pod(self, podname, session, boot_request: BootRequest) -> None: node_selector=node_selector, termination_grace_period_seconds=self.kill_timeout, restart_policy="Never", - containers=all_containers, + containers=[main_container], host_aliases=host_aliases if host_aliases else None, volumes=[ client.V1Volume( @@ -632,53 +656,119 @@ def _create_pod(self, podname, session, boot_request: BootRequest) -> None: ), ) - try: - start_time = time() - pod_uid = None + def _execute_pod_creation_api( + self, session: str, podname: str, pod_manifest: client.V1Pod + ) -> str: + """Executes the API call to create the pod, handling 409 conflict during restarts.""" + start_time = time() + pod_uid = None - while True: - try: - created_pod = self._core_v1_api.create_namespaced_pod( - session, pod_manifest - ) - self.log.info(f'Creating pod "{session}.{podname}"') - pod_uid = created_pod.metadata.uid - break + while True: + try: + created_pod = self._core_v1_api.create_namespaced_pod( + session, pod_manifest + ) + self.log.info(f'Creating pod "{session}.{podname}"') + pod_uid = created_pod.metadata.uid + break - # this covers restart where we need to wait for cleanup - except self._api_error_v1_api as e: - is_409_conflict = e.status == 409 + except self._api_error_v1_api as e: + is_409_conflict = e.status == 409 - if ( - is_409_conflict - and time() - start_time < self.restart_cleanup_time - ): - sleep(self.restart_cleanup_polling) - continue - raise e + if is_409_conflict and time() - start_time < self.restart_cleanup_time: + sleep(self.restart_cleanup_polling) + continue + raise e + return pod_uid + + def _create_associated_service( + self, + podname: str, + session: str, + pod_uid: str, + boot_request: BootRequest, + lcs_port: int | None, + ) -> None: + """Calls the appropriate service creation method based on pod type.""" + if podname == self.connection_server_name: + if lcs_port is None: + # Should not happen after the check in _create_pod, but for safety: + raise DruncK8sException( + "LCS service creation failed: port was not extracted." + ) - if podname == self.connection_server_name: + # If LCS, call nodeport service creation + self._create_nodeport_service( + podname, session, pod_uid + ) # Note: _create_nodeport_service must rely on self.connection_server_port, which was set in _create_pod + + elif "root-controller" in podname: + self.log.info( + f"'{podname}' is the root controller, checking for NodePort service." + ) + port = self._extract_port_from_cmd(boot_request) + if port: + self.log.info(f"Extracted port {port} for '{podname}' NodePort.") + self.connection_server_port = port + self.connection_server_node_port = port self._create_nodeport_service(podname, session, pod_uid) - elif "root-controller" in podname: - self.log.info( - f"'{podname}' is the root controller, creating isolated NodePort service." + else: + self.log.warning( + f"Could not extract port for '{podname}', falling back to headless." ) - port = self._extract_port_from_cmd(boot_request) - if port: - self.log.info(f"Extracted port {port} for '{podname}' NodePort.") - self.connection_server_port = port - self.connection_server_node_port = port - self._create_nodeport_service(podname, session, pod_uid) + self._create_headless_service(podname, session, pod_uid) + + else: + self._create_headless_service(podname, session, pod_uid) + + def _create_pod(self, podname, session, boot_request: BootRequest) -> None: + """Constructs and creates a Kubernetes Pod manifest and its associated service.""" + try: + lcs_port = None + + # A. Early Port Extraction and Class Variable Setup for LCS + if podname == self.connection_server_name: + lcs_port = self._extract_port_from_cmd(boot_request) + if lcs_port: + # CRITICAL FIXES: Set both port variables + self.connection_server_port = lcs_port + self.connection_server_node_port = ( + lcs_port # <--- THIS LINE WAS MISSING + ) else: - self.log.warning( - f"Could not extract port for '{podname}', falling back to headless." + raise DruncK8sException( + f"Could not extract port for LCS '{podname}'." ) - self._create_headless_service(podname, session, pod_uid) - else: - self._create_headless_service(podname, session, pod_uid) + # B. Build the main container manifest (Signature takes 4 args) + main_container = self._build_pod_main_container( + podname, boot_request, lcs_port + ) + # C/D/E. [Unchanged code for node_selector, host_aliases, pod_manifest] + node_selector = self._get_pod_node_selector( + podname, boot_request.process_restriction + ) + host_aliases = self._get_pod_host_aliases(podname, session) + pod_manifest = self._build_pod_manifest( + podname, + session, + main_container, + node_selector, + host_aliases, + ) + + # F. Execute the pod creation API call + pod_uid = self._execute_pod_creation_api(session, podname, pod_manifest) + + # G. Create associated service (Signature takes 6 args) + self._create_associated_service( + podname, session, pod_uid, boot_request, lcs_port + ) + + # H. Error Handling except self._api_error_v1_api as e: + start_time = time() error_message = f'Couldn\'t create resources for pod "{session}.{podname}". Reason: {e.reason}. Kubernetes API Error: ({e.status})' if e.status == 409 and time() - start_time >= self.restart_cleanup_time: @@ -689,6 +779,12 @@ def _create_pod(self, podname, session, boot_request: BootRequest) -> None: self.log.error(error_message) raise DruncK8sException(error_message) from e + except DruncK8sException: + raise + except Exception as e: + raise DruncK8sException( + f"Failed to create pod '{session}.{podname}': {e}" + ) from e def _get_connection_server_cluster_ip(self, session) -> str: """Gets the ClusterIP of the connection server service.""" @@ -704,6 +800,7 @@ def _get_connection_server_cluster_ip(self, session) -> str: def _extract_port_from_cmd(self, boot_request) -> int | None: """ Parses the boot request's command arguments to find a port. + It must cover Gunicorn (hardcoded and env var) and drunc-controller. """ # Check all command parts for a port argument for e_and_a in boot_request.process_description.executable_and_arguments: @@ -711,21 +808,46 @@ def _extract_port_from_cmd(self, boot_request) -> int | None: arg_str = " ".join(all_args) # 1. Check for gunicorn bind syntax (for local-connection-server) - # e.g., gunicorn --bind=0.0.0.0:30005 if "gunicorn" in arg_str: - match = re.search(r"-b\s+[\w\.]+:(\d+)", arg_str) - if not match: - match = re.search(r"--bind[\s=]+[\w\.]+:(\d+)", arg_str) - if match: - port = int(match.group(1)) + # Regex 1: Match hardcoded port: e.g., --bind=0.0.0.0:30005 + match_hardcoded = re.search(r"(-b|--bind)[\s=]+[\w\.]+:(\d+)", arg_str) + + if match_hardcoded: + port = int(match_hardcoded.group(2)) if port != 0: - self.log.info(f"Extracted gunicorn port {port} from command.") + self.log.info( + f"Extracted hardcoded gunicorn port {port} from command." + ) return port - # 2. Check for drunc-controller --port syntax - # e.g., drunc-controller --port 12345 + # Regex 2: Match environment variable port: e.g., --bind=0.0.0.0:${CONNECTION_PORT} + # Group 2 captures the variable name (e.g., 'CONNECTION_PORT') + match_var = re.search(r"(-b|--bind)[\s=]+[\w\.]+:\$\{(\w+)\}", arg_str) + + if match_var: + var_name = match_var.group(2) + # Look up the value in the environment variables + port_val = boot_request.process_description.env.get(var_name) + + if port_val is not None: + try: + port = int(port_val) + if port != 0: + self.log.info( + f"Extracted gunicorn port {port} from environment variable '{var_name}'." + ) + return port + except ValueError: + self.log.error( + f"Environment variable '{var_name}' ('{port_val}') is not an integer port." + ) + else: + self.log.warning( + f"Extracted port variable '{var_name}' but it was not found in environment map." + ) + + # 2. Check for drunc-controller --port syntax (unchanged) if "controller" in arg_str: - # Matches --port 12345 or --port=12345 match = re.search(r"--port[\s=]+(\d+)", arg_str) if match: port = int(match.group(1)) @@ -735,10 +857,8 @@ def _extract_port_from_cmd(self, boot_request) -> int | None: ) return port - # 3. NEW: Check for drunc-controller -c grpc://... syntax - # e.g., drunc-controller -c grpc://localhost:12345 or -c grpc://localhost:0 + # 3. Check for drunc-controller -c grpc://... syntax (unchanged) if "controller" in arg_str: - # Matches -c grpc://...:12345 or -c "grpc://...:12345" match = re.search(r"-c\s+[\"\']?grpc:\/\/[^:]+:(\d+)[\"\']?", arg_str) if match: port = int(match.group(1)) @@ -752,7 +872,6 @@ def _extract_port_from_cmd(self, boot_request) -> int | None: "Controller gRPC port is 0, cannot create NodePort." ) - # If no non-zero port is found, return None return None def _get_process_uid(self, query: ProcessQuery, order_by: str = None) -> list[str]: @@ -817,63 +936,18 @@ def _boot_impl(self, boot_request: BootRequest) -> ProcessInstanceList: process = self.__boot(boot_request, this_uuid) return ProcessInstanceList(values=[process]) - def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance: - """ - Internal boot method. Handles pod creation and special logic for the connection server. - - For the connection server: Wait for it to be ready and check the NodePort service - - For all other pods: Boot is NON-BLOCKING. - """ - print(boot_request) - session = boot_request.process_description.metadata.session - podname = boot_request.process_description.metadata.name - + def _run_pre_boot_checks( + self, session: str, podname: str, boot_request: BootRequest + ) -> None: + """Performs initial validation and checks for NodePort collision.""" if not validate_k8s_session_name(session): raise DruncK8sNamespaceException( f'Invalid session/namespace name "{session}". Must match RFC1123 label: ' "lowercase alphanumeric or '-', start/end with alphanumeric, max 63 chars." ) - if boot_request.process_restriction.allowed_hosts: - hostname = boot_request.process_restriction.allowed_hosts[0] - boot_request.process_description.metadata.hostname = hostname - - if uuid in self.boot_request: - raise DruncK8sPodException(f'"{session}.{podname}":{uuid} already exists!') - - # Extract ports for LCS - if podname == self.connection_server_name: - self.log.info(f"Waiting for '{podname}' to become ready...") - - port = None - env_vars = boot_request.process_description.env - - if "CONNECTION_PORT" in env_vars: - port_str = env_vars["CONNECTION_PORT"] - try: - port = int(port_str) - self.log.info( - f"Using port {port} from 'CONNECTION_PORT' environment variable." - ) - except (ValueError, TypeError): - raise DruncK8sException( - f"The provided CONNECTION_PORT '{port_str}' is not a valid integer." - ) - - if port is None: - self.log.info( - "CONNECTION_PORT not found in env, falling back to parsing gunicorn command." - ) - port = self._extract_port_from_cmd(boot_request) - - if port: - self.connection_server_port = port - self.connection_server_node_port = port - else: - raise DruncK8sException( - "Could not determine connection server port from 'CONNECTION_PORT' env var or gunicorn command." - ) - - # Check for NodePort collision + # Check for NodePort collision (only necessary if connection server is being started) + if podname == self.connection_server_name and self.connection_server_node_port: api = self._core_v1_api all_services = api.list_service_for_all_namespaces() for svc in all_services.items: @@ -890,208 +964,223 @@ def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance: "Cannot start another local connection server with the same port." ) - self._create_namespace(session) + def _wait_for_lcs_readiness(self, podname: str, session: str) -> None: + """Blocking two-stage wait for the Local Connection Server (NodePort) to be fully ready.""" + node_name = None + pod_ready = False + start_time = time() + api_ready_timeout = self.pod_ready_timeout - self.boot_request[uuid] = BootRequest() - self.boot_request[uuid].CopyFrom(boot_request) + # --- STAGE 1: Wait for Pod to be Running/Ready in K8s API --- + self.log.info( + f"Stage 1: Waiting for '{podname}' pod to be Running and Ready..." + ) + while not pod_ready and (time() - start_time < api_ready_timeout): + try: + pod_status = self._core_v1_api.read_namespaced_pod_status( + podname, session + ) + if pod_status.status.phase == "Running": + # Check readiness condition + is_ready = False + if pod_status.status.conditions: + for condition in pod_status.status.conditions: + if condition.type == "Ready" and condition.status == "True": + is_ready = True + break + + if is_ready: + # Success for Stage 1 + pod_ready = True + node_name = pod_status.spec.node_name + self.log.info(f"Stage 1: Pod '{podname}' is API Ready.") - self._create_pod(podname, session, boot_request) - self._add_label(podname, "pod", "uuid", uuid, session=session) - self.log.info(f'"{session}.{podname}":{uuid} boot request sent.') + except self._api_error_v1_api as e: + if e.status == 404: + pass + else: + raise e - # Special handling only for the connection server - if podname == self.connection_server_name: - node_name = None - pod_ready = False - start_time = time() + sleep(self.pod_status_check_sleep) - # --- STAGE 1: Wait for Pod to be Running/Ready in K8s API --- - self.log.info( - f"Stage 1: Waiting for '{podname}' pod to be Running and Ready..." + if not pod_ready: + raise DruncK8sException( + f"'{podname}' pod did not become API Ready in {api_ready_timeout} seconds." ) - while not pod_ready and (time() - start_time < self.pod_ready_timeout): - try: - pod_status = self._core_v1_api.read_namespaced_pod_status( - podname, session - ) - if ( - pod_status.status.phase == "Running" - and pod_status.status.pod_ip - ): - # Check readiness condition - is_ready = False - if pod_status.status.conditions: - for condition in pod_status.status.conditions: - if ( - condition.type == "Ready" - and condition.status == "True" - ): - is_ready = True - break - - if is_ready: - self.log.info( - f"Stage 1: Pod '{podname}' is API Ready with IP {pod_status.status.pod_ip}." - ) - node_name = pod_status.spec.node_name - pod_ready = True # Exit this loop and go to Stage 2 - except self._api_error_v1_api as e: - if e.status == 404: - pass # Pod not yet created/visible, keep polling - else: - raise e - sleep(self.pod_status_check_sleep) + # --- STAGE 2: Wait for NodePort to be externally reachable (using HTTP urllib) --- + url = f"http://{node_name}:{self.connection_server_node_port}" + self.log.info(f"Stage 2: Waiting for NodePort {url} to be reachable...") + nodeport_ready = False - if not pod_ready: - raise DruncK8sException( - f"'{podname}' pod did not become API Ready in {self.pod_ready_timeout} seconds." - ) + # Use the *remaining* time for this check + remaining_time = self.pod_ready_timeout - (time() - start_time) + nodeport_start_time = time() - # --- STAGE 2: Wait for NodePort to be externally reachable --- - self.log.info( - f"Stage 2: Waiting for NodePort {node_name}:{self.connection_server_node_port} to be reachable..." + while not nodeport_ready and (time() - nodeport_start_time < remaining_time): + try: + urllib.request.urlopen(url, timeout=1) + nodeport_ready = True + self.log.info(f"Stage 2: NodePort {url} is now active.") + except ( + urllib.error.URLError, + ConnectionRefusedError, + TimeoutError, + OSError, + ) as e: + self.log.debug(f"NodePort not ready yet ({e}), retrying...") + sleep(self.pod_status_check_sleep) + + if not nodeport_ready: + raise DruncK8sException( + f"NodePort {url} did not become reachable in {self.pod_ready_timeout} seconds." ) - nodeport_ready = False - url = f"http://{node_name}:{self.connection_server_node_port}" - # Use the *remaining* time for this check - remaining_time = self.pod_ready_timeout - (time() - start_time) - nodeport_start_time = time() + self.local_connection_server_is_booted = True - while not nodeport_ready and ( - time() - nodeport_start_time < remaining_time - ): - try: - # We don't care about the response, just that it doesn't error - # Timeout set to 1s for a quick check - urllib.request.urlopen(url, timeout=1) - nodeport_ready = True - self.log.info(f"Stage 2: NodePort {url} is now active.") - except ( - urllib.error.URLError, - ConnectionRefusedError, - TimeoutError, - OSError, - ) as e: - # Keep polling until timeout - self.log.debug(f"NodePort not ready yet ({e}), retrying...") - sleep(self.pod_status_check_sleep) + self.log.info(f"Connection server '{podname}' is fully ready.") - if not nodeport_ready: - raise DruncK8sException( - f"NodePort {url} did not become reachable in {self.pod_ready_timeout} seconds." - ) + def _wait_for_controller_readiness( + self, podname: str, session: str, boot_request: BootRequest + ) -> None: + """Blocking two-stage wait for Drunc Controller (HostPort) to be fully ready.""" - # --- All clear --- - self.local_connection_server_is_booted = True + self.log.info(f"Waiting for '{podname}' (HostPort) to become ready...") + node_name = None + pod_ready = False + controller_port = self._extract_port_from_cmd(boot_request) + start_time = time() + api_ready_timeout = self.pod_ready_timeout + grpc_startup_timeout = 120 - self.log.info(f"Connection server '{podname}' is fully ready.") - self.log.info( - f" -> For internal cluster access: 'http://localhost:{self.connection_server_port}'" + if not controller_port or controller_port == 0: + raise DruncK8sException( + f"Cannot wait for '{podname}', port is 0 or missing." ) - self.log.info( - f" -> For external access, use NodePort {self.connection_server_node_port} on any cluster node IP (e.g., http://{node_name}:{self.connection_server_node_port})" + + # --- STAGE 1: Wait for Pod to be Running/Ready in K8s API --- + self.log.info("Stage 1: Waiting for K8s API Readiness...") + while not pod_ready and (time() - start_time < api_ready_timeout): + try: + pod_status = self._core_v1_api.read_namespaced_pod_status( + podname, session + ) + if pod_status.status.phase == "Running": + # Check readiness condition + is_ready = False + if pod_status.status.conditions: + for condition in pod_status.status.conditions: + if condition.type == "Ready" and condition.status == "True": + is_ready = True + break + + if is_ready: + pod_ready = True + node_name = pod_status.spec.node_name + self.log.info( + f"Stage 1: Pod '{podname}' is API Ready on node {node_name}." + ) + break + + except self._api_error_v1_api as e: + if e.status == 404: + pass + else: + raise e + sleep(self.pod_status_check_sleep) + + if not pod_ready: + raise DruncK8sException( + f"'{podname}' pod did not become API Ready in {api_ready_timeout} seconds." ) - elif "root-controller" in podname: - self.log.info(f"Waiting for '{podname}' (HostPort) to become ready...") - node_name = None - pod_ready = False - controller_port = self._extract_port_from_cmd(boot_request) - - if not controller_port or controller_port == 0: - raise DruncK8sException(f"Cannot wait for '{podname}', port is 0 or missing.") - - # --- STAGE 1: Wait for Pod to be Running/Ready in K8s API --- - self.log.info(f"Stage 1: Waiting for '{podname}' pod to be Running and Ready...") - start_time = time() - api_ready_timeout = self.pod_ready_timeout # Use standard pod timeout - - while not pod_ready and (time() - start_time < api_ready_timeout): - try: - pod_status = self._core_v1_api.read_namespaced_pod_status( - podname, session + # --- STAGE 2: Wait for HostPort to be externally reachable (using TCP socket) --- + self.log.info( + f"Stage 2: Waiting for HostPort {node_name}:{controller_port} to be reachable..." + ) + hostport_ready = False + hostport_start_time = time() + + while not hostport_ready and ( + time() - hostport_start_time < grpc_startup_timeout + ): + sock = None + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(1.0) + result = sock.connect_ex((node_name, controller_port)) + + if result == 0: + hostport_ready = True + self.log.info( + f"Stage 2: HostPort {node_name}:{controller_port} is active (TCP connect success)." ) - if ( - pod_status.status.phase == "Running" - and pod_status.status.pod_ip - ): - is_ready = False - if pod_status.status.conditions: - for condition in pod_status.status.conditions: - if condition.type == "Ready" and condition.status == "True": - is_ready = True - break - if is_ready: - self.log.info( - f"Stage 1: Pod '{podname}' is API Ready with IP {pod_status.status.pod_ip}." - ) - node_name = pod_status.spec.node_name - pod_ready = True # Exit this loop and go to Stage 2 - - except self._api_error_v1_api as e: - if e.status == 404: pass # Pod not yet created/visible - else: raise e + else: + self.log.debug( + f"HostPort {node_name}:{controller_port} not ready yet (socket error {result}), retrying..." + ) + sleep(self.pod_status_check_sleep) + + except socket.gaierror as e: + self.log.warning( + f"Failed to resolve hostname '{node_name}': {e}. Retrying..." + ) sleep(self.pod_status_check_sleep) - - if not pod_ready: - raise DruncK8sException( - f"'{podname}' pod did not become API Ready in {api_ready_timeout} seconds." + except Exception as e: + self.log.debug( + f"HostPort not ready yet (Socket error: {e}), retrying..." ) + sleep(self.pod_status_check_sleep) + finally: + if sock: + sock.close() - # --- STAGE 2: Wait for HostPort to be externally reachable (using TCP socket) --- - self.log.info(f"Stage 2: Waiting for HostPort {node_name}:{controller_port} to be reachable...") - hostport_ready = False - - grpc_startup_timeout = 120 - hostport_start_time = time() - - while not hostport_ready and (time() - hostport_start_time < grpc_startup_timeout): - # We will try to open a simple TCP socket instead of using HTTP - sock = None # Initialize sock to None - try: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(1.0) # 1 second timeout - # Try to connect - result = sock.connect_ex((node_name, controller_port)) - - if result == 0: - # 0 means the connection was successful - hostport_ready = True - self.log.info(f"Stage 2: HostPort {node_name}:{controller_port} is active (TCP connect success).") - else: - # Connection failed (e.g., connection refused, no route) - self.log.debug(f"HostPort {node_name}:{controller_port} not ready yet (socket error {result}), retrying...") - sleep(self.pod_status_check_sleep) # Wait before retrying - - except socket.gaierror as e: - # Handle DNS name resolution error (e.g., node_name not found) - self.log.warning(f"Failed to resolve hostname '{node_name}': {e}. Retrying...") - sleep(self.pod_status_check_sleep) - except Exception as e: - # Catch any other socket errors - self.log.debug(f"HostPort not ready yet (Socket error: {e}), retrying...") - sleep(self.pod_status_check_sleep) - finally: - if sock: - sock.close() # Always close the socket - - if not hostport_ready: - raise DruncK8sException( - f"HostPort {node_name}:{controller_port} did not become reachable in {grpc_startup_timeout} seconds." - ) - - self.log.info(f"Controller '{podname}' is fully ready.") + if not hostport_ready: + raise DruncK8sException( + f"HostPort {node_name}:{controller_port} did not become reachable in {grpc_startup_timeout} seconds." + ) + + self.log.info(f"Drunc controller '{podname}' is fully ready.") + + def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance: + """ + Internal boot method. Handles pre-checks, pod creation, and blocking wait for critical services. + """ + session = boot_request.process_description.metadata.session + podname = boot_request.process_description.metadata.name + + # A. Pre-checks (Session validation, NodePort collision) + self._run_pre_boot_checks(session, podname, boot_request) - pd, pr, pu = ProcessDescription(), ProcessRestriction(), ProcessUUID(uuid=uuid) - pd.CopyFrom(self.boot_request[uuid].process_description) - pr.CopyFrom(self.boot_request[uuid].process_restriction) + # B. Resource Creation (Namespace, Pod, Labels) + self._create_namespace(session) + self.boot_request[uuid] = BootRequest() + self.boot_request[uuid].CopyFrom(boot_request) + + self._create_pod(podname, session, boot_request) + self._add_label(podname, "pod", "uuid", uuid, session=session) + self.log.info(f'"{session}.{podname}":{uuid} boot request sent.') + + # C/D. Special handling and blocking wait for critical processes + if podname == self.connection_server_name: + self._wait_for_lcs_readiness(podname, session) + elif "root-controller" in podname: + self._wait_for_controller_readiness(podname, session, boot_request) + + # E. Post-Process (Return the ProcessInstance) + pd, pr, pu = ( + ProcessDescription(), + ProcessRestriction(), + ProcessUUID(uuid=uuid), + ) + pd.CopyFrom(boot_request.process_description) + pr.CopyFrom(boot_request.process_restriction) return ProcessInstance( process_description=pd, process_restriction=pr, status_code=ProcessInstance.StatusCode.RUNNING, + return_code=0, uuid=pu, ) diff --git a/src/drunc/process_manager/process_manager_driver.py b/src/drunc/process_manager/process_manager_driver.py index 27075aaff..07e3a91f8 100644 --- a/src/drunc/process_manager/process_manager_driver.py +++ b/src/drunc/process_manager/process_manager_driver.py @@ -25,14 +25,20 @@ from druncschema.token_pb2 import Token from drunc.connectivity_service.client import ConnectivityServiceClient +from drunc.connectivity_service.exceptions import ApplicationLookupUnsuccessful +from drunc.controller.utils import get_segment_lookup_timeout from drunc.exceptions import DruncSetupException, DruncShellException -from drunc.process_manager.utils import get_log_path, get_rte_script +from drunc.process_manager.utils import ( + get_log_path, + get_rte_script, +) from drunc.utils.grpc_utils import ( copy_token, extract_grpc_rich_error, handle_grpc_error, ) from drunc.utils.utils import ( + get_control_type_and_uri_from_connectivity_service, get_logger, host_is_local, resolve_localhost_and_127_ip_to_network_ip, @@ -47,8 +53,7 @@ def __init__(self, address: str, token: Token): self.log = get_logger("controller.ProcessManagerDriver") self.address = address options = [ - ("grpc.keepalive_time_ms", 60000), # pings the server every 60 seconds - ("grpc.enable_retries", 1), + ("grpc.keepalive_time_ms", 60000) # pings the server every 60 seconds ] self.channel = grpc.insecure_channel(self.address, options=options) self.stub = ProcessManagerStub(self.channel) @@ -372,21 +377,34 @@ def _discover_controller( ): """ Attempts to discover the controller address after booting applications. + Tries dynamic lookup via connectivity service first, then falls back + to static OKS configuration. """ - top_controller_name = session_dal.segment.controller.id + # Define controller name ONCE in the outer scope + try: + top_controller_name = session_dal.segment.controller.id + except AttributeError as e: + self.log.error(f"Could not determine controller name from OKS: {e}") + top_controller_name = "Unknown-Controller" # Set a default def get_controller_address(session_dal, session_name): from drunc.process_manager.oks_parser import collect_variables env = {} collect_variables(session_dal.environment, env) + # Note: 'env' variable is not currently used in this function. - """ + # --- + # Strategy 1: Try dynamic lookup via Connectivity Service (if available) + # --- if csc: + self.log.info( + f"Attempting to discover controller '{top_controller_name}' via connectivity service at {connection_server}:{connection_port}" + ) try: timeout = ( get_segment_lookup_timeout(session_dal.segment, 60) + 60 - ) # root-controller timout to find all its children + 60s for the root controller to start itself + ) # root-controller timeout to find all its children + 60s for the root controller to start itself self.log.debug( f"Using a timeout of {timeout}s to find the [green]{top_controller_name}[/] on the connectivity service" ) @@ -398,27 +416,54 @@ def get_controller_address(session_dal, session_name): progress_bar=True, title=f"Looking for [green]{top_controller_name}[/] on the connectivity service...", ) + + address = uri.replace("grpc://", "") + self.log.info( + f"Successfully discovered controller '{top_controller_name}' via connectivity service: {address}" + ) + return address # SUCCESS: Return dynamically found address + except ApplicationLookupUnsuccessful: + self.log.warning( + f"Connectivity service lookup failed: Application '{top_controller_name}' not found." + ) + # Log the original failure details self._log_controller_lookup_failure( session_name, top_controller_name, connection_server, connection_port, ) - return + self.log.warning( + "Falling back to static OKS configuration for address resolution." + ) + # DO NOT return. Fall through to Strategy 2. - return uri.replace("grpc://", "") - """ + except Exception as e: + self.log.error( + f"An unexpected error occurred during connectivity service lookup: {e}. " + "Falling back to static OKS configuration." + ) + # DO NOT return. Fall through to Strategy 2. + else: + self.log.info( + "Connectivity service client (csc) is not available. Using static OKS configuration only." + ) + + # --- + # Strategy 2: Fallback to static OKS configuration + # --- + self.log.info( + "Attempting to resolve controller address from static OKS configuration." + ) port_number = None protocol = None service_found = None - top_controller_name = None # Initialize here - - print("ARE WE HERE NOW") + # top_controller_name is already available from the outer scope try: - top_controller_name = session_dal.segment.controller.id + # We already have top_controller_name from outer scope self.log.info( f"Top controller name from OKS config: '{top_controller_name}'" ) @@ -438,7 +483,6 @@ def get_controller_address(session_dal, session_name): ) # Get the first (and presumably only) control service linked - # Using next(iter(...)) provides a slightly cleaner way to get the first item or None service_found = next( iter(session_dal.segment.controller.exposes_service), None ) @@ -447,7 +491,6 @@ def get_controller_address(session_dal, session_name): self.log.info( f"Found linked control service object with ID: '{service_found.id}'" ) - # Check if the service object actually has the port and protocol attributes if ( hasattr(service_found, "port") and service_found.port is not None @@ -472,7 +515,6 @@ def get_controller_address(session_dal, session_name): ) else: - # This case should ideally not be reached due to the check above, but good for robustness self.log.error( f"Could not retrieve the first service object from 'exposes_service' for controller '{top_controller_name}'." ) @@ -523,7 +565,7 @@ def get_controller_address(session_dal, session_name): f"Successfully resolved controller address from OKS config: {final_address}" ) return final_address - # --- END CORRECTED LOGIC WITH ADDED LOGGING --- + # --- END OKS FALLBACK LOGIC --- def keyboard_interrupt_on_sigint(signal, frame): self.log.warning("Interrupted") @@ -538,7 +580,7 @@ def keyboard_interrupt_on_sigint(signal, frame): connection_server = session_dal.connectivity_service.host connection_port = session_dal.connectivity_service.service.port self._log_controller_interrupt( - self, top_controller_name, connection_server, connection_port + top_controller_name, connection_server, connection_port ) else: self.log.warning( diff --git a/src/drunc/utils/grpc_utils.py b/src/drunc/utils/grpc_utils.py index 092f5f4e4..11b1ccaee 100644 --- a/src/drunc/utils/grpc_utils.py +++ b/src/drunc/utils/grpc_utils.py @@ -170,6 +170,7 @@ def copy_token(token: Token) -> Token: token_copy.CopyFrom(token) return token_copy + @dataclass class GrpcErrorDetails: """ @@ -330,3 +331,4 @@ def dict_to_grpc_proto(data: dict, proto_class_instance: Message) -> Message: proto_class_instance, ignore_unknown_fields=True ) + From a8823b1d74a142e03a83d48de65e472a65bc3a3f Mon Sep 17 00:00:00 2001 From: MRiganSUSX Date: Tue, 4 Nov 2025 16:23:29 +0100 Subject: [PATCH 04/14] improvement logging and comments --- src/drunc/controller/interface/shell_utils.py | 18 ++------ .../process_manager/k8s_process_manager.py | 43 +++++++----------- .../process_manager/process_manager_driver.py | 45 +++++++------------ 3 files changed, 35 insertions(+), 71 deletions(-) diff --git a/src/drunc/controller/interface/shell_utils.py b/src/drunc/controller/interface/shell_utils.py index 4917f1c7c..7fe8b8d50 100644 --- a/src/drunc/controller/interface/shell_utils.py +++ b/src/drunc/controller/interface/shell_utils.py @@ -712,8 +712,6 @@ def grab_default_value_from_env(argument_name): return cmd, cmd_name -# --- Helper function to check for internal IPs --- -# We cache this check too, though it's already very fast. @lru_cache(maxsize=1024) def is_private_ip(ip_str: str) -> bool: """ @@ -729,11 +727,10 @@ def is_private_ip(ip_str: str) -> bool: # .is_link_local = 169.254.x.x return ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_link_local except ValueError: - # Not a valid IP address, treat as "private" to skip lookup + # Not 'valid' IP address -> treat as private return True -# --- HELPER 2: Checks if a string is an IP vs. a domain name --- @lru_cache(maxsize=1024) def is_valid_ip(hostname: str) -> bool: """Checks if a string is a valid IPv4 or IPv6 address.""" @@ -746,7 +743,6 @@ def is_valid_ip(hostname: str) -> bool: return False -# --- Your new, improved function --- @lru_cache(maxsize=4096) def get_hostname_smart(ip_address: str, timeout_seconds: float = 0.2) -> str: """ @@ -756,28 +752,20 @@ def get_hostname_smart(ip_address: str, timeout_seconds: float = 0.2) -> str: 3. Uses a short timeout for public IPs. """ - # 1. The Kubernetes/Internal IP check: - # If it's a private IP, don't even try to resolve it. - # This is the optimization that fixes your K8s problem. + # If private IP (k8s), don't try to resolve it if is_private_ip(ip_address): return ip_address - # 2. It's a public IP, so let's try to resolve it. - # We must use a try/finally to ensure we reset the global timeout. + # If public IP, try to resolve it. original_timeout = socket.getdefaulttimeout() try: - # Set a short global timeout *just* for this operation socket.setdefaulttimeout(timeout_seconds) hostname, _, _ = socket.gethostbyaddr(ip_address) return hostname except (socket.herror, socket.gaierror, socket.timeout): - # This catches "host not found", "name or service not known", - # or our own 0.2-second timeout. - # We cache the failure by returning the IP. return ip_address finally: - # 3. CRITICAL: Always reset the global timeout socket.setdefaulttimeout(original_timeout) diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index 3dd5cace2..99f08d280 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -692,15 +692,12 @@ def _create_associated_service( """Calls the appropriate service creation method based on pod type.""" if podname == self.connection_server_name: if lcs_port is None: - # Should not happen after the check in _create_pod, but for safety: raise DruncK8sException( "LCS service creation failed: port was not extracted." ) # If LCS, call nodeport service creation - self._create_nodeport_service( - podname, session, pod_uid - ) # Note: _create_nodeport_service must rely on self.connection_server_port, which was set in _create_pod + self._create_nodeport_service(podname, session, pod_uid) elif "root-controller" in podname: self.log.info( @@ -726,26 +723,23 @@ def _create_pod(self, podname, session, boot_request: BootRequest) -> None: try: lcs_port = None - # A. Early Port Extraction and Class Variable Setup for LCS + # Early Port Extraction and Class Variable Setup for LCS if podname == self.connection_server_name: lcs_port = self._extract_port_from_cmd(boot_request) if lcs_port: - # CRITICAL FIXES: Set both port variables self.connection_server_port = lcs_port - self.connection_server_node_port = ( - lcs_port # <--- THIS LINE WAS MISSING - ) + self.connection_server_node_port = lcs_port else: raise DruncK8sException( f"Could not extract port for LCS '{podname}'." ) - # B. Build the main container manifest (Signature takes 4 args) + # Build the main container manifest main_container = self._build_pod_main_container( podname, boot_request, lcs_port ) - # C/D/E. [Unchanged code for node_selector, host_aliases, pod_manifest] + # Node_selector, host_aliases, pod_manifest node_selector = self._get_pod_node_selector( podname, boot_request.process_restriction ) @@ -758,15 +752,14 @@ def _create_pod(self, podname, session, boot_request: BootRequest) -> None: host_aliases, ) - # F. Execute the pod creation API call + # Execute the pod creation API call pod_uid = self._execute_pod_creation_api(session, podname, pod_manifest) - # G. Create associated service (Signature takes 6 args) + # Create associated service (Signature takes 6 args) self._create_associated_service( podname, session, pod_uid, boot_request, lcs_port ) - # H. Error Handling except self._api_error_v1_api as e: start_time = time() error_message = f'Couldn\'t create resources for pod "{session}.{podname}". Reason: {e.reason}. Kubernetes API Error: ({e.status})' @@ -807,9 +800,8 @@ def _extract_port_from_cmd(self, boot_request) -> int | None: all_args = [e_and_a.exec] + list(e_and_a.args) arg_str = " ".join(all_args) - # 1. Check for gunicorn bind syntax (for local-connection-server) + # Check for gunicorn bind syntax (for local-connection-server) if "gunicorn" in arg_str: - # Regex 1: Match hardcoded port: e.g., --bind=0.0.0.0:30005 match_hardcoded = re.search(r"(-b|--bind)[\s=]+[\w\.]+:(\d+)", arg_str) if match_hardcoded: @@ -820,8 +812,7 @@ def _extract_port_from_cmd(self, boot_request) -> int | None: ) return port - # Regex 2: Match environment variable port: e.g., --bind=0.0.0.0:${CONNECTION_PORT} - # Group 2 captures the variable name (e.g., 'CONNECTION_PORT') + # Match environment variable port: e.g., --bind=0.0.0.0:${CONNECTION_PORT} match_var = re.search(r"(-b|--bind)[\s=]+[\w\.]+:\$\{(\w+)\}", arg_str) if match_var: @@ -846,7 +837,7 @@ def _extract_port_from_cmd(self, boot_request) -> int | None: f"Extracted port variable '{var_name}' but it was not found in environment map." ) - # 2. Check for drunc-controller --port syntax (unchanged) + # Check for drunc-controller --port syntax (unchanged) if "controller" in arg_str: match = re.search(r"--port[\s=]+(\d+)", arg_str) if match: @@ -857,7 +848,7 @@ def _extract_port_from_cmd(self, boot_request) -> int | None: ) return port - # 3. Check for drunc-controller -c grpc://... syntax (unchanged) + # Check for drunc-controller -c grpc://... syntax (unchanged) if "controller" in arg_str: match = re.search(r"-c\s+[\"\']?grpc:\/\/[^:]+:(\d+)[\"\']?", arg_str) if match: @@ -981,7 +972,6 @@ def _wait_for_lcs_readiness(self, podname: str, session: str) -> None: podname, session ) if pod_status.status.phase == "Running": - # Check readiness condition is_ready = False if pod_status.status.conditions: for condition in pod_status.status.conditions: @@ -990,7 +980,6 @@ def _wait_for_lcs_readiness(self, podname: str, session: str) -> None: break if is_ready: - # Success for Stage 1 pod_ready = True node_name = pod_status.spec.node_name self.log.info(f"Stage 1: Pod '{podname}' is API Ready.") @@ -1013,7 +1002,6 @@ def _wait_for_lcs_readiness(self, podname: str, session: str) -> None: self.log.info(f"Stage 2: Waiting for NodePort {url} to be reachable...") nodeport_ready = False - # Use the *remaining* time for this check remaining_time = self.pod_ready_timeout - (time() - start_time) nodeport_start_time = time() @@ -1066,7 +1054,6 @@ def _wait_for_controller_readiness( podname, session ) if pod_status.status.phase == "Running": - # Check readiness condition is_ready = False if pod_status.status.conditions: for condition in pod_status.status.conditions: @@ -1149,10 +1136,10 @@ def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance: session = boot_request.process_description.metadata.session podname = boot_request.process_description.metadata.name - # A. Pre-checks (Session validation, NodePort collision) + # Pre-checks (Session validation, NodePort collision) self._run_pre_boot_checks(session, podname, boot_request) - # B. Resource Creation (Namespace, Pod, Labels) + # Resource Creation (Namespace, Pod, Labels) self._create_namespace(session) self.boot_request[uuid] = BootRequest() self.boot_request[uuid].CopyFrom(boot_request) @@ -1161,13 +1148,13 @@ def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance: self._add_label(podname, "pod", "uuid", uuid, session=session) self.log.info(f'"{session}.{podname}":{uuid} boot request sent.') - # C/D. Special handling and blocking wait for critical processes + # Special handling and blocking wait for critical processes if podname == self.connection_server_name: self._wait_for_lcs_readiness(podname, session) elif "root-controller" in podname: self._wait_for_controller_readiness(podname, session, boot_request) - # E. Post-Process (Return the ProcessInstance) + # Post-Process pd, pr, pu = ( ProcessDescription(), ProcessRestriction(), diff --git a/src/drunc/process_manager/process_manager_driver.py b/src/drunc/process_manager/process_manager_driver.py index 07e3a91f8..4feadbc30 100644 --- a/src/drunc/process_manager/process_manager_driver.py +++ b/src/drunc/process_manager/process_manager_driver.py @@ -380,7 +380,6 @@ def _discover_controller( Tries dynamic lookup via connectivity service first, then falls back to static OKS configuration. """ - # Define controller name ONCE in the outer scope try: top_controller_name = session_dal.segment.controller.id except AttributeError as e: @@ -392,13 +391,10 @@ def get_controller_address(session_dal, session_name): env = {} collect_variables(session_dal.environment, env) - # Note: 'env' variable is not currently used in this function. - # --- - # Strategy 1: Try dynamic lookup via Connectivity Service (if available) - # --- + # 1: Try dynamic lookup via Connectivity Service if csc: - self.log.info( + self.log.debug( f"Attempting to discover controller '{top_controller_name}' via connectivity service at {connection_server}:{connection_port}" ) try: @@ -418,10 +414,10 @@ def get_controller_address(session_dal, session_name): ) address = uri.replace("grpc://", "") - self.log.info( + self.log.debug( f"Successfully discovered controller '{top_controller_name}' via connectivity service: {address}" ) - return address # SUCCESS: Return dynamically found address + return address except ApplicationLookupUnsuccessful: self.log.warning( @@ -437,38 +433,32 @@ def get_controller_address(session_dal, session_name): self.log.warning( "Falling back to static OKS configuration for address resolution." ) - # DO NOT return. Fall through to Strategy 2. except Exception as e: self.log.error( f"An unexpected error occurred during connectivity service lookup: {e}. " "Falling back to static OKS configuration." ) - # DO NOT return. Fall through to Strategy 2. + else: - self.log.info( + self.log.warning( "Connectivity service client (csc) is not available. Using static OKS configuration only." ) - # --- - # Strategy 2: Fallback to static OKS configuration - # --- - self.log.info( + # 2: Fallback to static OKS configuration + self.log.debug( "Attempting to resolve controller address from static OKS configuration." ) port_number = None protocol = None service_found = None - # top_controller_name is already available from the outer scope try: - # We already have top_controller_name from outer scope - self.log.info( + self.log.debug( f"Top controller name from OKS config: '{top_controller_name}'" ) - # Check if exposes_service relationship exists and is populated if ( not hasattr(session_dal.segment.controller, "exposes_service") or not session_dal.segment.controller.exposes_service @@ -488,7 +478,7 @@ def get_controller_address(session_dal, session_name): ) if service_found: - self.log.info( + self.log.debug( f"Found linked control service object with ID: '{service_found.id}'" ) if ( @@ -496,7 +486,7 @@ def get_controller_address(session_dal, session_name): and service_found.port is not None ): port_number = service_found.port - self.log.info( + self.log.debug( f"Extracted port from service '{service_found.id}': {port_number}" ) else: @@ -506,7 +496,7 @@ def get_controller_address(session_dal, session_name): if hasattr(service_found, "protocol") and service_found.protocol: protocol = service_found.protocol - self.log.info( + self.log.debug( f"Extracted protocol from service '{service_found.id}': {protocol}" ) else: @@ -518,7 +508,7 @@ def get_controller_address(session_dal, session_name): self.log.error( f"Could not retrieve the first service object from 'exposes_service' for controller '{top_controller_name}'." ) - return None # Exit if service object itself couldn't be retrieved + return None except AttributeError as e: self.log.error( @@ -536,14 +526,14 @@ def get_controller_address(session_dal, session_name): self.log.error( f"Failed to extract valid port ({port_number}) or protocol ({protocol}) for service '{service_found.id if service_found else 'N/A'}'. Cannot determine controller address." ) - return None # Exit if service definition is incomplete + return None # Resolve the IP address of the host where the controller runs try: host_id = session_dal.segment.controller.runs_on.runs_on.id - self.log.info(f"Controller runs on host ID: '{host_id}'") + self.log.debug(f"Controller runs on host ID: '{host_id}'") ip = resolve_localhost_and_127_ip_to_network_ip(host_id) - self.log.info(f"Resolved host ID '{host_id}' to IP: {ip}") + self.log.debug(f"Resolved host ID '{host_id}' to IP: {ip}") except AttributeError as e: self.log.error( f"Error accessing OKS configuration attributes for host resolution: {e}. Check structure around session_dal.segment.controller.runs_on." @@ -561,11 +551,10 @@ def get_controller_address(session_dal, session_name): # If all checks passed, return the address final_address = f"{ip}:{port_number}" - self.log.info( + self.log.debug( f"Successfully resolved controller address from OKS config: {final_address}" ) return final_address - # --- END OKS FALLBACK LOGIC --- def keyboard_interrupt_on_sigint(signal, frame): self.log.warning("Interrupted") From b9106b8713a34f31ad69f99c306bdb436accf620 Mon Sep 17 00:00:00 2001 From: MRiganSUSX Date: Wed, 5 Nov 2025 14:28:05 +0100 Subject: [PATCH 05/14] addressing review suggestions --- src/drunc/controller/controller_driver.py | 1 + src/drunc/controller/interface/shell_utils.py | 12 ------------ src/drunc/process_manager/k8s_process_manager.py | 1 - 3 files changed, 1 insertion(+), 13 deletions(-) diff --git a/src/drunc/controller/controller_driver.py b/src/drunc/controller/controller_driver.py index 75aaa29a2..187783399 100644 --- a/src/drunc/controller/controller_driver.py +++ b/src/drunc/controller/controller_driver.py @@ -36,6 +36,7 @@ def __init__(self, address: str, token: Token): options = [ ("grpc.keepalive_time_ms", 60000) # pings the server every 60 seconds ] + # The 'ipv4:' prefix forces IPv4 resolution, which helps avoid Kubernetes hairpinning issues target_address = f"ipv4:{self.address}" self.channel = grpc.insecure_channel(target_address, options=options) self.stub = ControllerStub(self.channel) diff --git a/src/drunc/controller/interface/shell_utils.py b/src/drunc/controller/interface/shell_utils.py index 7fe8b8d50..dc7e2fd81 100644 --- a/src/drunc/controller/interface/shell_utils.py +++ b/src/drunc/controller/interface/shell_utils.py @@ -731,18 +731,6 @@ def is_private_ip(ip_str: str) -> bool: return True -@lru_cache(maxsize=1024) -def is_valid_ip(hostname: str) -> bool: - """Checks if a string is a valid IPv4 or IPv6 address.""" - if not hostname: - return False - try: - ipaddress.ip_address(hostname) - return True - except ValueError: - return False - - @lru_cache(maxsize=4096) def get_hostname_smart(ip_address: str, timeout_seconds: float = 0.2) -> str: """ diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index 99f08d280..eac9d3705 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -1167,7 +1167,6 @@ def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance: process_description=pd, process_restriction=pr, status_code=ProcessInstance.StatusCode.RUNNING, - return_code=0, uuid=pu, ) From c78d2c7afc67ec9eef32323ba6f4700d7847abee Mon Sep 17 00:00:00 2001 From: PawelPlesniak Date: Fri, 7 Nov 2025 16:13:17 +0100 Subject: [PATCH 06/14] Adding missing connectivity service port --- config/tests/one-controller-config.data.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/config/tests/one-controller-config.data.xml b/config/tests/one-controller-config.data.xml index 6193886c5..7b3ee77c0 100644 --- a/config/tests/one-controller-config.data.xml +++ b/config/tests/one-controller-config.data.xml @@ -116,6 +116,7 @@ + From 544410c18ab434cf23ce92ed6a7f6a77b2786cd4 Mon Sep 17 00:00:00 2001 From: PawelPlesniak Date: Fri, 7 Nov 2025 16:23:48 +0100 Subject: [PATCH 07/14] Adding remaining missing connectivity server addresseszx --- config/tests/deep-segments-config.data.xml | 1 + config/tests/nestedConfig.data.xml | 1 + tests/controller/test_controller_driver.py | 8 ++++++++ 3 files changed, 10 insertions(+) diff --git a/config/tests/deep-segments-config.data.xml b/config/tests/deep-segments-config.data.xml index f8833549a..796e32b0f 100644 --- a/config/tests/deep-segments-config.data.xml +++ b/config/tests/deep-segments-config.data.xml @@ -218,6 +218,7 @@ + diff --git a/config/tests/nestedConfig.data.xml b/config/tests/nestedConfig.data.xml index 4761605e9..23592ceb8 100644 --- a/config/tests/nestedConfig.data.xml +++ b/config/tests/nestedConfig.data.xml @@ -209,6 +209,7 @@ + diff --git a/tests/controller/test_controller_driver.py b/tests/controller/test_controller_driver.py index 571b49b55..9be7b2abf 100644 --- a/tests/controller/test_controller_driver.py +++ b/tests/controller/test_controller_driver.py @@ -6,10 +6,18 @@ from drunc.controller.controller_driver import ControllerDriver from drunc.exceptions import DruncException from drunc.utils.shell_utils import create_dummy_token_from_uname +import json +import pprint def setup_controller_driver(processes_and_logs, dal, session_name) -> ControllerDriver: connectivity_service_port = dal.connectivity_service.service.port + try: + print(json.dumps(processes_and_logs, indent=2, sort_keys=True, default=str)) + except Exception: + pprint.pprint(processes_and_logs, width=120, sort_dicts=True) + print(f"{dal=}") + print(f"{session_name=}") csc = ConnectivityServiceClient( session_name, From f85623e0fd79d18ee4a1585babe9f793efcee7a5 Mon Sep 17 00:00:00 2001 From: PawelPlesniak Date: Fri, 7 Nov 2025 16:35:40 +0100 Subject: [PATCH 08/14] updating tests for service connection lookup from 'localhost' to physcial host name --- .../process_manager/test_process_manager_driver.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/tests/process_manager/test_process_manager_driver.py b/tests/process_manager/test_process_manager_driver.py index 6357efbbc..77c712dd5 100644 --- a/tests/process_manager/test_process_manager_driver.py +++ b/tests/process_manager/test_process_manager_driver.py @@ -9,6 +9,7 @@ or if a bug was introduced. """ +import socket from unittest.mock import MagicMock, patch import grpc @@ -485,15 +486,22 @@ def test_connect_to_service_success(mock_client_class, mock_driver): mock_client_instance = MagicMock() mock_client_class.return_value = mock_client_instance + pytest_hostname = socket.gethostname() + mock_session_dal = MagicMock() mock_session_dal.connectivity_service.host = "localhost" mock_session_dal.connectivity_service.service.port = 1234 - result = mock_driver._connect_to_service(mock_session_dal, "session1") + result_localhost = mock_driver._connect_to_service(mock_session_dal, "session1") + mock_client_class.assert_called_once_with("session1", f"{pytest_hostname}:1234") + assert result_localhost == (mock_client_instance, pytest_hostname, 1234) - mock_client_class.assert_called_once_with("session1", "localhost:1234") + mock_session_dal.connectivity_service.host = pytest_hostname + result_pytest_hostname = mock_driver._connect_to_service(mock_session_dal, "session2") + mock_client_class.assert_called_with("session2", f"{pytest_hostname}:1234") + assert result_pytest_hostname == (mock_client_instance, pytest_hostname, 1234) - assert result == (mock_client_instance, "localhost", 1234) + mock_client_class.assert_called_with("session2", f"{pytest_hostname}:1234") def test_connect_to_service_none(mock_driver): From 6d7d930d4a9177823bd7df2553ae275c2104a053 Mon Sep 17 00:00:00 2001 From: MRiganSUSX Date: Mon, 10 Nov 2025 17:30:26 +0100 Subject: [PATCH 09/14] moving grpc timeout to conf json --- src/drunc/data/process_manager/k8s.json | 3 ++- src/drunc/process_manager/k8s_process_manager.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/drunc/data/process_manager/k8s.json b/src/drunc/data/process_manager/k8s.json index d37750c5d..9247ac38c 100644 --- a/src/drunc/data/process_manager/k8s.json +++ b/src/drunc/data/process_manager/k8s.json @@ -42,7 +42,8 @@ "checking": { "watcher_retry_sleep": 5, "pod_status_check_sleep": 1, - "host_cache_expiry": 300 + "host_cache_expiry": 300, + "grpc_startup_timeout": 30 } } } diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index eac9d3705..0409865a6 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -192,6 +192,7 @@ def __init__(self, configuration, **kwargs) -> None: self.watcher_retry_sleep = checking.get("watcher_retry_sleep", 5) self.pod_status_check_sleep = checking.get("pod_status_check_sleep", 1) self._host_cache_expiry = checking.get("host_cache_expiry", 300) + self.grpc_startup_timeout = checking.get("grpc_startup_timeout", 30) self.log.debug(f"Using kill_timeout of {self.kill_timeout} seconds.") @@ -1039,7 +1040,7 @@ def _wait_for_controller_readiness( controller_port = self._extract_port_from_cmd(boot_request) start_time = time() api_ready_timeout = self.pod_ready_timeout - grpc_startup_timeout = 120 + grpc_startup_timeout = self.grpc_startup_timeout if not controller_port or controller_port == 0: raise DruncK8sException( From 34fac12cd7a6c9dd6ef561e29eb30435a8f02ebc Mon Sep 17 00:00:00 2001 From: MRiganSUSX Date: Mon, 10 Nov 2025 17:52:01 +0100 Subject: [PATCH 10/14] improving wait functions, factoring out similar steps --- src/drunc/data/process_manager/k8s.json | 3 +- .../process_manager/k8s_process_manager.py | 204 +++++++++--------- 2 files changed, 104 insertions(+), 103 deletions(-) diff --git a/src/drunc/data/process_manager/k8s.json b/src/drunc/data/process_manager/k8s.json index 9247ac38c..2fedaf06c 100644 --- a/src/drunc/data/process_manager/k8s.json +++ b/src/drunc/data/process_manager/k8s.json @@ -43,7 +43,8 @@ "watcher_retry_sleep": 5, "pod_status_check_sleep": 1, "host_cache_expiry": 300, - "grpc_startup_timeout": 30 + "grpc_startup_timeout": 30, + "socket_retry_timeout": 1.0 } } } diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index 0409865a6..a49967307 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -193,6 +193,7 @@ def __init__(self, configuration, **kwargs) -> None: self.pod_status_check_sleep = checking.get("pod_status_check_sleep", 1) self._host_cache_expiry = checking.get("host_cache_expiry", 300) self.grpc_startup_timeout = checking.get("grpc_startup_timeout", 30) + self.socket_retry_timeout = checking.get("socket_retry_timeout", 1.0) self.log.debug(f"Using kill_timeout of {self.kill_timeout} seconds.") @@ -956,18 +957,21 @@ def _run_pre_boot_checks( "Cannot start another local connection server with the same port." ) - def _wait_for_lcs_readiness(self, podname: str, session: str) -> None: - """Blocking two-stage wait for the Local Connection Server (NodePort) to be fully ready.""" - node_name = None - pod_ready = False - start_time = time() - api_ready_timeout = self.pod_ready_timeout - - # --- STAGE 1: Wait for Pod to be Running/Ready in K8s API --- + def _wait_for_pod_api_ready( + self, podname: str, session: str, timeout: float + ) -> str: + """ + [HELPER] Blocking wait for a pod to be 'Running' and 'Ready' + in the K8s API. + Returns the node_name on success. + Raises DruncK8sException on timeout. + """ self.log.info( f"Stage 1: Waiting for '{podname}' pod to be Running and Ready..." ) - while not pod_ready and (time() - start_time < api_ready_timeout): + start_time = time() + + while time() - start_time < timeout: try: pod_status = self._core_v1_api.read_namespaced_pod_status( podname, session @@ -981,36 +985,40 @@ def _wait_for_lcs_readiness(self, podname: str, session: str) -> None: break if is_ready: - pod_ready = True node_name = pod_status.spec.node_name - self.log.info(f"Stage 1: Pod '{podname}' is API Ready.") + self.log.info( + f"Stage 1: Pod '{podname}' is API Ready on node {node_name}." + ) + return node_name # Success! except self._api_error_v1_api as e: if e.status == 404: + # Pod not created yet, this is expected, continue loop pass else: + # Re-raise other K8s API errors raise e sleep(self.pod_status_check_sleep) - if not pod_ready: - raise DruncK8sException( - f"'{podname}' pod did not become API Ready in {api_ready_timeout} seconds." - ) + # If we exit the loop, it's a timeout + raise DruncK8sException( + f"'{podname}' pod did not become API Ready in {timeout} seconds." + ) - # --- STAGE 2: Wait for NodePort to be externally reachable (using HTTP urllib) --- - url = f"http://{node_name}:{self.connection_server_node_port}" + def _wait_for_nodeport_http_ready(self, url: str, timeout: float) -> None: + """ + [HELPER] Blocking wait for a NodePort URL to be reachable via HTTP. + Raises DruncK8sException on timeout. + """ self.log.info(f"Stage 2: Waiting for NodePort {url} to be reachable...") - nodeport_ready = False - - remaining_time = self.pod_ready_timeout - (time() - start_time) - nodeport_start_time = time() + start_time = time() - while not nodeport_ready and (time() - nodeport_start_time < remaining_time): + while time() - start_time < timeout: try: urllib.request.urlopen(url, timeout=1) - nodeport_ready = True self.log.info(f"Stage 2: NodePort {url} is now active.") + return # Success! except ( urllib.error.URLError, ConnectionRefusedError, @@ -1020,114 +1028,106 @@ def _wait_for_lcs_readiness(self, podname: str, session: str) -> None: self.log.debug(f"NodePort not ready yet ({e}), retrying...") sleep(self.pod_status_check_sleep) - if not nodeport_ready: - raise DruncK8sException( - f"NodePort {url} did not become reachable in {self.pod_ready_timeout} seconds." - ) - - self.local_connection_server_is_booted = True - - self.log.info(f"Connection server '{podname}' is fully ready.") + raise DruncK8sException( + f"NodePort {url} did not become reachable in {timeout} seconds." + ) - def _wait_for_controller_readiness( - self, podname: str, session: str, boot_request: BootRequest + def _wait_for_nodeport_tcp_ready( + self, node_name: str, port: int, timeout: float ) -> None: - """Blocking two-stage wait for Drunc Controller (HostPort) to be fully ready.""" - - self.log.info(f"Waiting for '{podname}' (HostPort) to become ready...") - node_name = None - pod_ready = False - controller_port = self._extract_port_from_cmd(boot_request) - start_time = time() - api_ready_timeout = self.pod_ready_timeout - grpc_startup_timeout = self.grpc_startup_timeout - - if not controller_port or controller_port == 0: - raise DruncK8sException( - f"Cannot wait for '{podname}', port is 0 or missing." - ) - - # --- STAGE 1: Wait for Pod to be Running/Ready in K8s API --- - self.log.info("Stage 1: Waiting for K8s API Readiness...") - while not pod_ready and (time() - start_time < api_ready_timeout): - try: - pod_status = self._core_v1_api.read_namespaced_pod_status( - podname, session - ) - if pod_status.status.phase == "Running": - is_ready = False - if pod_status.status.conditions: - for condition in pod_status.status.conditions: - if condition.type == "Ready" and condition.status == "True": - is_ready = True - break - - if is_ready: - pod_ready = True - node_name = pod_status.spec.node_name - self.log.info( - f"Stage 1: Pod '{podname}' is API Ready on node {node_name}." - ) - break - - except self._api_error_v1_api as e: - if e.status == 404: - pass - else: - raise e - sleep(self.pod_status_check_sleep) - - if not pod_ready: - raise DruncK8sException( - f"'{podname}' pod did not become API Ready in {api_ready_timeout} seconds." - ) - - # --- STAGE 2: Wait for HostPort to be externally reachable (using TCP socket) --- + """ + [HELPER] Blocking wait for a NodePort to be reachable via TCP socket. + Raises DruncK8sException on timeout. + """ self.log.info( - f"Stage 2: Waiting for HostPort {node_name}:{controller_port} to be reachable..." + f"Stage 2: Waiting for NodePort {node_name}:{port} to be reachable..." ) - hostport_ready = False - hostport_start_time = time() + start_time = time() - while not hostport_ready and ( - time() - hostport_start_time < grpc_startup_timeout - ): + while time() - start_time < timeout: sock = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(1.0) - result = sock.connect_ex((node_name, controller_port)) + sock.settimeout(self.socket_retry_timeout) + result = sock.connect_ex((node_name, port)) if result == 0: - hostport_ready = True self.log.info( - f"Stage 2: HostPort {node_name}:{controller_port} is active (TCP connect success)." + f"Stage 2: NodePort {node_name}:{port} is active (TCP connect success)." ) + return # Success! else: self.log.debug( - f"HostPort {node_name}:{controller_port} not ready yet (socket error {result}), retrying..." + f"NodePort {node_name}:{port} not ready yet (socket error {result}), retrying..." ) - sleep(self.pod_status_check_sleep) - except socket.gaierror as e: self.log.warning( f"Failed to resolve hostname '{node_name}': {e}. Retrying..." ) - sleep(self.pod_status_check_sleep) except Exception as e: self.log.debug( - f"HostPort not ready yet (Socket error: {e}), retrying..." + f"NodePort not ready yet (Socket error: {e}), retrying..." ) - sleep(self.pod_status_check_sleep) finally: if sock: sock.close() - if not hostport_ready: + sleep(self.pod_status_check_sleep) + + raise DruncK8sException( + f"NodePort {node_name}:{port} did not become reachable in {timeout} seconds." + ) + + def _wait_for_lcs_readiness(self, podname: str, session: str) -> None: + """Blocking two-stage wait for the Local Connection Server (NodePort) to be fully ready.""" + self.log.info(f"Waiting for LCS '{podname}' to be fully ready...") + start_time = time() + total_timeout = self.pod_ready_timeout + + # --- STAGE 1: Wait for Pod to be Running/Ready in K8s API --- + node_name = self._wait_for_pod_api_ready(podname, session, total_timeout) + + # --- STAGE 2: Wait for NodePort to be externally reachable (using HTTP urllib) --- + url = f"http://{node_name}:{self.connection_server_node_port}" + + # Calculate remaining time for stage 2, preserving original logic + elapsed_stage1 = time() - start_time + remaining_time = total_timeout - elapsed_stage1 + + if remaining_time <= 0: raise DruncK8sException( - f"HostPort {node_name}:{controller_port} did not become reachable in {grpc_startup_timeout} seconds." + f"NodePort {url} check failed: No time left after API readiness." ) + self._wait_for_nodeport_http_ready(url, remaining_time) + + self.local_connection_server_is_booted = True + self.log.info(f"Connection server '{podname}' is fully ready.") + + def _wait_for_controller_readiness( + self, podname: str, session: str, boot_request: BootRequest + ) -> None: + """Blocking two-stage wait for Drunc Controller (NodePort) to be fully ready.""" + self.log.info( + f"Waiting for controller '{podname}' (NodePort) to become ready..." + ) + + controller_port = self._extract_port_from_cmd(boot_request) + if not controller_port or controller_port == 0: + raise DruncK8sException( + f"Cannot wait for '{podname}', port is 0 or missing." + ) + + # --- STAGE 1: Wait for Pod to be Running/Ready in K8s API --- + node_name = self._wait_for_pod_api_ready( + podname, session, self.pod_ready_timeout + ) + + # --- STAGE 2: Wait for NodePort to be externally reachable (using TCP socket) --- + self._wait_for_nodeport_tcp_ready( + node_name, controller_port, self.grpc_startup_timeout + ) + self.log.info(f"Drunc controller '{podname}' is fully ready.") def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance: From 2b441ae680df5172348e88fa6095bcd12ea47754 Mon Sep 17 00:00:00 2001 From: MRiganSUSX Date: Mon, 10 Nov 2025 18:36:58 +0100 Subject: [PATCH 11/14] better error handling / timeouts for pod creation --- .../process_manager/k8s_process_manager.py | 32 +++++++++++-------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index a49967307..a081d39a1 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -663,7 +663,6 @@ def _execute_pod_creation_api( ) -> str: """Executes the API call to create the pod, handling 409 conflict during restarts.""" start_time = time() - pod_uid = None while True: try: @@ -671,17 +670,25 @@ def _execute_pod_creation_api( session, pod_manifest ) self.log.info(f'Creating pod "{session}.{podname}"') - pod_uid = created_pod.metadata.uid - break + return created_pod.metadata.uid except self._api_error_v1_api as e: is_409_conflict = e.status == 409 + elapsed_time = time() - start_time - if is_409_conflict and time() - start_time < self.restart_cleanup_time: + if is_409_conflict and elapsed_time < self.restart_cleanup_time: sleep(self.restart_cleanup_polling) continue + + if is_409_conflict: + error_message = ( + f"Timeout (>{self.restart_cleanup_time}s) waiting for old pod object " + f'"{session}/{podname}" to be fully deleted. Could not restart pod.' + ) + self.log.error(error_message) + raise DruncK8sException(error_message) from e + raise e - return pod_uid def _create_associated_service( self, @@ -757,26 +764,23 @@ def _create_pod(self, podname, session, boot_request: BootRequest) -> None: # Execute the pod creation API call pod_uid = self._execute_pod_creation_api(session, podname, pod_manifest) - # Create associated service (Signature takes 6 args) + # Create associated service self._create_associated_service( podname, session, pod_uid, boot_request, lcs_port ) except self._api_error_v1_api as e: - start_time = time() + # *other* K8s errors (e.g., 400, 403, 500) error_message = f'Couldn\'t create resources for pod "{session}.{podname}". Reason: {e.reason}. Kubernetes API Error: ({e.status})' - - if e.status == 409 and time() - start_time >= self.restart_cleanup_time: - error_message = ( - f"Timeout (>{self.restart_cleanup_time}s) waiting for old pod object " - f'"{session}/{podname}" to be fully deleted. Could not restart pod.' - ) - self.log.error(error_message) raise DruncK8sException(error_message) from e + except DruncK8sException: + # any other DruncK8sException raise + except Exception as e: + # generic catch-all raise DruncK8sException( f"Failed to create pod '{session}.{podname}': {e}" ) from e From e9e047669c3941b7ad340001d444ef2702b69404 Mon Sep 17 00:00:00 2001 From: MRiganSUSX Date: Mon, 10 Nov 2025 18:51:05 +0100 Subject: [PATCH 12/14] more efficient port check for nodeport --- .../process_manager/k8s_process_manager.py | 46 ++++++++++--------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index a081d39a1..d5773a167 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -486,9 +486,31 @@ def _create_nodeport_service(self, podname, session, pod_uid) -> None: f'Created NodePort service "{session}.{podname}" on port {self.connection_server_port} ' f"(NodePort: {self.connection_server_node_port} for external access)" ) + except self._api_error_v1_api as e: - if e.status != 409: - self.log.error(f"Failed to create NodePort service for {podname}: {e}") + is_port_conflict = False + + # Check for 422="Unprocessable Entity" or 409="Conflict" status + if e.status == 422 or e.status == 409: + if e.body and ( + "provided nodeport is already allocated" in e.body.lower() + or "port is already in use" in e.body.lower() + ): + is_port_conflict = True + + if is_port_conflict: + port = self.connection_server_node_port + error_message = ( + f"NodePort {port} is already in use by another service. " + f"Cannot start '{podname}'." + ) + self.log.error(error_message) + raise DruncK8sException(error_message) from e + else: + # other K8s API error + error_message = f"Failed to create NodePort service for {podname}: {e.reason} ({e.status})" + self.log.error(error_message) + raise DruncK8sException(error_message) from e def _build_pod_main_container( self, podname: str, boot_request: BootRequest, lcs_port: int | None @@ -936,31 +958,13 @@ def _boot_impl(self, boot_request: BootRequest) -> ProcessInstanceList: def _run_pre_boot_checks( self, session: str, podname: str, boot_request: BootRequest ) -> None: - """Performs initial validation and checks for NodePort collision.""" + """Performs initial validation.""" if not validate_k8s_session_name(session): raise DruncK8sNamespaceException( f'Invalid session/namespace name "{session}". Must match RFC1123 label: ' "lowercase alphanumeric or '-', start/end with alphanumeric, max 63 chars." ) - # Check for NodePort collision (only necessary if connection server is being started) - if podname == self.connection_server_name and self.connection_server_node_port: - api = self._core_v1_api - all_services = api.list_service_for_all_namespaces() - for svc in all_services.items: - if not svc.spec.type == "NodePort": - continue - for p in svc.spec.ports: - if p.node_port == self.connection_server_node_port and ( - svc.metadata.namespace != session - or svc.metadata.name != podname - ): - raise DruncK8sException( - f"NodePort {self.connection_server_node_port} is already in use by service " - f"{svc.metadata.name} in namespace {svc.metadata.namespace}. " - "Cannot start another local connection server with the same port." - ) - def _wait_for_pod_api_ready( self, podname: str, session: str, timeout: float ) -> str: From 9b7fb899173d7a2a4f5ccaced4c4a0a61f312fa8 Mon Sep 17 00:00:00 2001 From: MRiganSUSX Date: Mon, 10 Nov 2025 19:08:45 +0100 Subject: [PATCH 13/14] better socket creation handling --- .../process_manager/k8s_process_manager.py | 29 +++++++++---------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index d5773a167..f49b35794 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -1053,21 +1053,21 @@ def _wait_for_nodeport_tcp_ready( start_time = time() while time() - start_time < timeout: - sock = None try: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(self.socket_retry_timeout) - result = sock.connect_ex((node_name, port)) + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.settimeout(self.socket_retry_timeout) + result = sock.connect_ex((node_name, port)) + + if result == 0: + self.log.info( + f"Stage 2: NodePort {node_name}:{port} is active (TCP connect success)." + ) + return + else: + self.log.debug( + f"NodePort {node_name}:{port} not ready yet (socket error {result}), retrying..." + ) - if result == 0: - self.log.info( - f"Stage 2: NodePort {node_name}:{port} is active (TCP connect success)." - ) - return # Success! - else: - self.log.debug( - f"NodePort {node_name}:{port} not ready yet (socket error {result}), retrying..." - ) except socket.gaierror as e: self.log.warning( f"Failed to resolve hostname '{node_name}': {e}. Retrying..." @@ -1076,9 +1076,6 @@ def _wait_for_nodeport_tcp_ready( self.log.debug( f"NodePort not ready yet (Socket error: {e}), retrying..." ) - finally: - if sock: - sock.close() sleep(self.pod_status_check_sleep) From 8d081c7762194471b9faa058e8302e1c87384e9a Mon Sep 17 00:00:00 2001 From: MRiganSUSX Date: Mon, 10 Nov 2025 20:16:29 +0100 Subject: [PATCH 14/14] rebasing due to many reverts on dev... --- .../process_manager/k8s_process_manager.py | 21 +++++++------------ src/drunc/utils/grpc_utils.py | 12 +++++------ 2 files changed, 13 insertions(+), 20 deletions(-) diff --git a/src/drunc/process_manager/k8s_process_manager.py b/src/drunc/process_manager/k8s_process_manager.py index f49b35794..ee1feb1bb 100644 --- a/src/drunc/process_manager/k8s_process_manager.py +++ b/src/drunc/process_manager/k8s_process_manager.py @@ -537,6 +537,12 @@ def _build_pod_main_container( command_parts.append(prefix + " ".join([e_and_a.exec] + list(e_and_a.args))) main_command_str = " && ".join(command_parts) + container_ports = [] + if podname == self.connection_server_name and lcs_port is not None: + container_ports.append( + client.V1ContainerPort(container_port=lcs_port, name="http-port") + ) + # Only add preStop hook for C++ applications (non-controllers) lifecycle_hook = None if "controller" not in podname and podname != self.connection_server_name: @@ -556,12 +562,6 @@ def _build_pod_main_container( f"'{podname}' identified as a Python app, no preStop hook needed." ) - container_ports = [] - if podname == self.connection_server_name and lcs_port is not None: - container_ports.append( - client.V1ContainerPort(container_port=lcs_port, name="http-port") - ) - main_container = client.V1Container( name=podname, image=pod_image, @@ -577,15 +577,10 @@ def _build_pod_main_container( client.V1VolumeMount(name="nfs", mount_path="/nfs"), client.V1VolumeMount(name="cvmfs", mount_path="/cvmfs"), ], - "working_dir": boot_request.process_description.process_execution_directory, - "security_context": client.V1SecurityContext( + working_dir=boot_request.process_description.process_execution_directory, + security_context=client.V1SecurityContext( run_as_user=os.getuid(), run_as_group=os.getgid() ), - - if lifecycle_hook is not None: - container_kwargs["lifecycle"] = lifecycle_hook - - main_container = client.V1Container(**container_kwargs) ) return main_container diff --git a/src/drunc/utils/grpc_utils.py b/src/drunc/utils/grpc_utils.py index 11b1ccaee..67a447280 100644 --- a/src/drunc/utils/grpc_utils.py +++ b/src/drunc/utils/grpc_utils.py @@ -310,6 +310,8 @@ def extract_grpc_rich_error(grpc_error: grpc.RpcError) -> GrpcErrorDetails: return GrpcErrorDetails( code=code, message=status.message or "No message", details=error_details + ) + def grpc_proto_to_dict(proto_message: Message) -> dict: """ @@ -317,18 +319,14 @@ def grpc_proto_to_dict(proto_message: Message) -> dict: """ return json_format.MessageToDict( proto_message, - preserving_proto_field_name=True + preserving_proto_field_name=True, # Removed: including_default_value_fields=True ) + def dict_to_grpc_proto(data: dict, proto_class_instance: Message) -> Message: """ Converts a Python dictionary into an instance of a gRPC Protobuf message. 'proto_class_instance' should be an empty instance, e.g., Token() """ - return json_format.ParseDict( - data, - proto_class_instance, - ignore_unknown_fields=True - ) - + return json_format.ParseDict(data, proto_class_instance, ignore_unknown_fields=True)