11import abc
2+ import atexit
23from enum import Enum
34import logging
45import subprocess
@@ -57,6 +58,7 @@ def __init__(self):
5758 self ._listeners : Set [ServiceListener ] = set ()
5859 self ._listeners_lock = threading .Lock ()
5960 self ._error = None
61+ self .log_directory : Optional [Path ] = None
6062
6163 def set_experiment (self , xp : "Experiment" ) -> None :
6264 """Called when the service is added to an experiment.
@@ -68,10 +70,9 @@ def set_experiment(self, xp: "Experiment") -> None:
6870 xp: The experiment this service is being added to.
6971 """
7072 self ._experiment = xp
71-
72- # Create log directory for this service
73- if self .log_directory :
74- self .log_directory .mkdir (parents = True , exist_ok = True )
73+ self .log_directory = (
74+ self ._experiment .workspace .scheduler_services_path / self .id
75+ )
7576
7677 @property
7778 def experiment_id (self ) -> str :
@@ -87,13 +88,6 @@ def run_id(self) -> str:
8788 return self ._experiment .run_id or ""
8889 return ""
8990
90- @property
91- def log_directory (self ) -> Optional [Path ]:
92- """Return the directory for service logs (None if not attached to experiment)"""
93- if self ._experiment is None :
94- return None
95- return self ._experiment .workspace .scheduler_services_path / self .id
96-
9791 @property
9892 def stdout (self ) -> Optional [Path ]:
9993 """Return path to stdout log file"""
@@ -261,6 +255,10 @@ def setup_logging(
261255 if not self .stdout or not self .stderr :
262256 return None , None
263257
258+ # Ensure log directory exists
259+ if self .log_directory :
260+ self .log_directory .mkdir (parents = True , exist_ok = True )
261+
264262 # Get logger for this service
265263 service_logger = logging .getLogger (f"xpm.service.{ self .id } " )
266264 service_logger .setLevel (logging .INFO )
@@ -622,18 +620,27 @@ def _start_process(self):
622620 """Start the service as a subprocess"""
623621 # Build command to run service
624622 cmd = self ._build_command ()
623+ logger .info (
624+ "Starting service %s (log_directory=%s): %s" ,
625+ self .id ,
626+ self .log_directory ,
627+ " " .join (cmd ),
628+ )
625629
626- # Redirect stdout/stderr to log files
630+ # Ensure log directory exists and redirect stdout/stderr to log files
631+ if self .log_directory :
632+ self .log_directory .mkdir (parents = True , exist_ok = True )
627633 stdout_file = open (self .stdout , "w" ) if self .stdout else subprocess .DEVNULL
628634 stderr_file = open (self .stderr , "w" ) if self .stderr else subprocess .DEVNULL
629635
630- # Start process
636+ # Start process (ensure it gets killed when the parent exits)
631637 self .process = subprocess .Popen (
632638 cmd ,
633639 stdout = stdout_file ,
634640 stderr = stderr_file ,
635641 cwd = str (self .log_directory ) if self .log_directory else None ,
636642 )
643+ atexit .register (self ._kill_process )
637644
638645 # Monitor process in background thread
639646 monitor_thread = threading .Thread (
@@ -656,8 +663,9 @@ def _monitor_process(self):
656663 self .process .wait ()
657664
658665 except Exception as e :
659- logger .exception (f"Service { self .id } monitoring failed: { e } " )
660- self .state = ServiceState .ERROR
666+ logger .error ("Service %s failed: %s" , self .id , e )
667+ self .set_error (str (e ))
668+
661669 finally :
662670 if running_event and not running_event .is_set ():
663671 running_event .set ()
@@ -701,6 +709,16 @@ def stop(self, timeout: float = 2.0):
701709 running_event .wait ()
702710
703711 # Terminate process
712+ self ._kill_process (timeout = timeout )
713+ atexit .unregister (self ._kill_process )
714+
715+ with self ._start_lock :
716+ self .url = None
717+ self ._running_event = None
718+ self .state = ServiceState .STOPPED
719+
720+ def _kill_process (self , timeout : float = 2.0 ):
721+ """Kill the subprocess if it is still running."""
704722 if self .process and self .process .poll () is None :
705723 logger .info (f"Terminating service { self .id } (PID { self .process .pid } )" )
706724 self .process .terminate ()
@@ -710,8 +728,3 @@ def stop(self, timeout: float = 2.0):
710728 logger .warning (f"Service { self .id } did not terminate, killing" )
711729 self .process .kill ()
712730 self .process .wait ()
713-
714- with self ._start_lock :
715- self .url = None
716- self ._running_event = None
717- self .state = ServiceState .STOPPED
0 commit comments