From c1d2d16b2d98c2640b6416844b234510c0d2b99a Mon Sep 17 00:00:00 2001 From: Simon Corrodi Date: Wed, 17 Dec 2025 22:55:42 -0600 Subject: [PATCH 1/2] adding functionality to save run records to postgresql --- rc/control/daqinterface.py | 78 ++++++++++++++++ rc/control/save_run_record.py | 161 ++++++++++++++++++++++++++++++++-- 2 files changed, 234 insertions(+), 5 deletions(-) diff --git a/rc/control/daqinterface.py b/rc/control/daqinterface.py index 76a4f487..5ac79c71 100755 --- a/rc/control/daqinterface.py +++ b/rc/control/daqinterface.py @@ -895,6 +895,26 @@ def read_settings(self): self.spackdir = None + # Database settings for run record saving (optional) + # Initialize from environment variables first (as defaults), then settings file will overwrite + # Support both OTSDAQ_RUN_RECORD_DATABASE_* and RUN_RECORD_DATABASE_* patterns + env_enabled = os.environ.get("RUN_RECORD_DATABASE_ENABLED") + if env_enabled: + token = env_enabled.strip() + res = re.search(r"[Tt]rue", token) + self.enable_run_record_database = bool(res) + else: + self.enable_run_record_database = False + + self.run_record_database_name = os.environ.get("OTSDAQ_RUNINFO_DATABASE_NAME") or os.environ.get("RUN_RECORD_DATABASE_NAME") or "run_info" + self.run_record_database_host = os.environ.get("OTSDAQ_RUNINFO_DATABASE_HOST") or os.environ.get("RUN_RECORD_DATABASE_HOST") or "" + self.run_record_database_port = os.environ.get("OTSDAQ_RUNINFO_DATABASE_PORT") or os.environ.get("RUN_RECORD_DATABASE_PORT") or "" + self.run_record_database_user = os.environ.get("OTSDAQ_RUNINFO_DATABASE_USER") or os.environ.get("RUN_RECORD_DATABASE_USER") or "" + self.run_record_database_pwd = os.environ.get("OTSDAQ_RUNINFO_DATABASE_PWD") or os.environ.get("RUN_RECORD_DATABASE_PWD") or "" + self.run_record_database_schema = os.environ.get("OTSDAQ_RUNINFO_DATABASE_SCHEMA") or os.environ.get("RUN_RECORD_DATABASE_SCHEMA") or "test" + # Prefix only checks RUN_RECORD_DATABASE_PREFIX (no OTSDAQ_ version) + 
self.run_record_database_prefix = os.environ.get("RUN_RECORD_DATABASE_PREFIX") or "artdaq" + for line in inf.readlines(): line = expand_environment_variable_in_string(line) @@ -1126,6 +1146,52 @@ def read_settings(self): if res: self.attempt_existing_pid_kill = True + elif ( + "run_record_database_enabled" in line + or "run record database enabled" in line + ): + token = line.split()[-1].strip() + res = re.search(r"[Tt]rue", token) + if res: + self.enable_run_record_database = True + elif ( + "run_record_database_name" in line + or "run record database name" in line + ): + self.run_record_database_name = line.split()[-1].strip() + elif ( + "run_record_database_host" in line + or "run record database host" in line + ): + self.run_record_database_host = line.split()[-1].strip() + elif ( + "run_record_database_port" in line + or "run record database port" in line + ): + self.run_record_database_port = line.split()[-1].strip() + elif ( + "run_record_database_user" in line + or "run record database user" in line + ): + self.run_record_database_user = line.split()[-1].strip() + elif ( + "run_record_database_pwd" in line + or "run record database pwd" in line + or "run_record_database_password" in line + or "run record database password" in line + ): + self.run_record_database_pwd = line.split()[-1].strip() + elif ( + "run_record_database_schema" in line + or "run record database schema" in line + ): + self.run_record_database_schema = line.split()[-1].strip() + elif ( + "run_record_database_prefix" in line + or "run record database prefix" in line + ): + self.run_record_database_prefix = line.split()[-1].strip() + missing_vars = [] @@ -3754,6 +3820,18 @@ def do_start_running(self, run_number=None): try: shutil.copytree(self.tmp_run_record, run_record_directory) + + # Save run record to database now that run_number is available + try: + from rc.control.save_run_record import _save_run_record_to_database + _save_run_record_to_database(self) + except Exception: + # Don't 
fail the start transition if database save fails + self.print_log("w", traceback.format_exc()) + self.print_log( + "w", + "Failed to save run record to database, but continuing with start transition", + ) except: self.print_log("e", traceback.format_exc()) self.alert_and_recover( diff --git a/rc/control/save_run_record.py b/rc/control/save_run_record.py index af8457ec..910ba375 100644 --- a/rc/control/save_run_record.py +++ b/rc/control/save_run_record.py @@ -15,6 +15,141 @@ from rc.control.utilities import get_build_info from rc.control.utilities import expand_environment_variable_in_string +# Database support (optional) +try: + from rc.control.run_record_database import get_db_connection + from rc.control.run_record_database import get_db_schema + from rc.control.run_record_database import is_database_enabled + from rc.control.run_record_database import get_db_prefix + from rc.control.run_record_database import create_tables_if_not_exist +except ImportError: + # If module doesn't exist, define no-op function + # is_database_enabled returns False, which causes early exit in _save_run_record_to_database + def is_database_enabled(self): + return False + + +def _save_run_record_to_database(self): + """Save run record procinfo and FHiCL to PostgreSQL database. + + Store process information in {prefix}_components and the used FHiCL content to {prefix}_fcl + linked with run_number in PostgreSQL database. Tables are created automatically if they don't exist. + + Note: This function should be called from do_start_running when run_number is available. 
+ """ + # Check if database saving is enabled + if not is_database_enabled(self): + return + + # Get run_number (must be set when this is called from do_start_running) + run_number = getattr(self, 'run_number', None) + if run_number is None: + self.print_log( + "w", + "run_number not set, skipping database save", + 2, + ) + return + + # Get database connection + conn = get_db_connection(self) + if conn is None: + return + + try: + import psycopg2 + from psycopg2 import sql + except ImportError: + return + + try: + cursor = conn.cursor() + dbschema = get_db_schema(self) + prefix = get_db_prefix(self) + + # Create tables if they don't exist + create_tables_if_not_exist(cursor, dbschema, prefix) + + # Save process information (procinfo) to {prefix}_components table + if hasattr(self, 'procinfos') and self.procinfos: + components_table = sql.Identifier(dbschema, "%s_components" % prefix) + + for procinfo in self.procinfos: + # Fields: run_number, name, rank, host, port, label, subsystem, allowed_processors, target + components_query = sql.SQL( + "INSERT INTO {table} (" + "run_number, name, rank, host, port, label, subsystem, allowed_processors, target) " + "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) " + "ON CONFLICT (run_number, label) DO UPDATE SET " + "name = EXCLUDED.name, " + "rank = EXCLUDED.rank, " + "host = EXCLUDED.host, " + "port = EXCLUDED.port, " + "subsystem = EXCLUDED.subsystem, " + "allowed_processors = EXCLUDED.allowed_processors, " + "target = EXCLUDED.target" + ).format(table=components_table) + + cursor.execute( + components_query, + ( + run_number, + procinfo.name, + procinfo.rank, + procinfo.host, + procinfo.port, + procinfo.label, + procinfo.subsystem, + getattr(procinfo, 'allowed_processors', None), + getattr(procinfo, 'target', None), + ), + ) + + # Save FHiCL content to {prefix}_fcl table + # Fields: run_number, name (process type/name), label, content (fhicl_used) + if hasattr(procinfo, 'fhicl_used') and procinfo.fhicl_used: + fcl_table = 
sql.Identifier(dbschema, "%s_fcl" % prefix) + + fcl_query = sql.SQL( + "INSERT INTO {table} (" + "run_number, name, label, content) " + "VALUES (%s, %s, %s, %s) " + "ON CONFLICT (run_number, label) DO UPDATE SET " + "name = EXCLUDED.name, " + "content = EXCLUDED.content" + ).format(table=fcl_table) + + cursor.execute( + fcl_query, + ( + run_number, + procinfo.name, # process type/name + procinfo.label, + procinfo.fhicl_used, # FHiCL content + ), + ) + + conn.commit() + self.print_log( + "d", + "Saved run record to database (run_number: %s, prefix: %s)" % ( + run_number, + prefix + ), + 2, + ) + + except Exception as e: + conn.rollback() + self.print_log( + "w", + make_paragraph( + "Failed to save run record to database: %s. " + "File-based saving completed successfully." % str(e) + ), + ) + self.print_log("w", traceback.format_exc()) + def save_run_record_base(self): @@ -187,7 +322,10 @@ def save_run_record_base(self): ] # Directly assigning would make buildinfo_packages a reference, not a copy buildinfo_packages.append("artdaq-daqinterface") - package_buildinfo_dict = get_build_info(buildinfo_packages, self.daq_setup_script) + try: + package_buildinfo_dict = get_build_info(buildinfo_packages, self.daq_setup_script) + except Exception: + package_buildinfo_dict = None try: @@ -215,7 +353,10 @@ def save_run_record_base(self): % (self.package_versions["artdaq-daqinterface"]) ) - outf.write(" %s\n\n" % (package_buildinfo_dict["artdaq-daqinterface"])) + if package_buildinfo_dict: + outf.write(" %s\n\n" % (package_buildinfo_dict["artdaq-daqinterface"])) + else: + outf.write(" %s\n\n" % ("Build info not available")) package_commit_dict = {} packages_whose_versions_we_need = [] @@ -253,9 +394,10 @@ def save_run_record_base(self): self.package_versions[pkgname], ) - for pkg in sorted(package_commit_dict.keys()): - outf.write("%s" % (package_commit_dict[pkg])) - outf.write(" %s\n\n" % (package_buildinfo_dict[pkg])) + if package_buildinfo_dict: + for pkg in 
sorted(package_commit_dict.keys()): + outf.write("%s" % (package_commit_dict[pkg])) + outf.write(" %s\n\n" % (package_buildinfo_dict[pkg])) outf.write( "\nprocess management method: %s\n" @@ -312,6 +454,15 @@ def save_run_record_base(self): ), 2, ) + + # Database saving is done in do_start_running when run_number is available + self.print_log( + "w", + make_paragraph( + "Warning: Exception occurred while saving run record to database. " + "File-based saving completed successfully." + ), + ) def save_metadata_value_base(self, key, value): From 8e10a11e3df48d77e530d4af00e853748fc44df1 Mon Sep 17 00:00:00 2001 From: Mu2e Date: Thu, 18 Dec 2025 08:41:54 -0600 Subject: [PATCH 2/2] adding functionality to save run records to postgresql: adding missing file --- rc/control/run_record_database.py | 164 ++++++++++++++++++++++++++++++ 1 file changed, 164 insertions(+) create mode 100644 rc/control/run_record_database.py diff --git a/rc/control/run_record_database.py b/rc/control/run_record_database.py new file mode 100644 index 00000000..c87f2251 --- /dev/null +++ b/rc/control/run_record_database.py @@ -0,0 +1,164 @@ + +""" +Database helper functions for saving run records to PostgreSQL. + +""" + +import os + + +def get_db_connection(self): + """ + Get or create database connection. Returns None if database saving is disabled. + + Connection is stored in self._run_record_db_conn for reuse. + Database settings are read from instance variables by read_settings(). 
+
+    Returns:
+        psycopg2.connection or None: Database connection if enabled and successful, None otherwise
+    """
+    # Check if database saving is enabled (from settings file)
+    if not hasattr(self, 'enable_run_record_database') or not self.enable_run_record_database:
+        return None
+
+    # Reuse the stored connection only while it is still open (psycopg2 sets closed != 0 once it is closed or broken)
+    if getattr(self, '_run_record_db_conn', None) is not None and not self._run_record_db_conn.closed:
+        return self._run_record_db_conn
+
+    # Try to import psycopg2
+    try:
+        import psycopg2
+    except ImportError:
+        self.print_log(
+            "w",
+            "psycopg2 module not available. Database saving disabled. "
+            "Install psycopg2 to enable database saving."
+        )
+        return None
+
+    # Get database configuration from instance variables (set by read_settings())
+    conn_params = {
+        'host': getattr(self, 'run_record_database_host', ''),
+        'port': getattr(self, 'run_record_database_port', ''),
+        'dbname': getattr(self, 'run_record_database_name', 'run_info'),
+        'user': getattr(self, 'run_record_database_user', ''),
+        'password': getattr(self, 'run_record_database_pwd', ''),
+    }
+
+    # Hand only the parameters that are set (non-empty) to psycopg2 as keyword
+    # arguments instead of a hand-built "key=value" string: values containing
+    # spaces, quotes or backslashes (e.g. a password) would corrupt such a string
+    conn_kwargs = dict({k: v for k, v in conn_params.items() if v}, connect_timeout=10)
+
+    try:
+        self._run_record_db_conn = psycopg2.connect(**conn_kwargs)
+        self.print_log(
+            "d",
+            "Database connection opened successfully for run record saving",
+            2,
+        )
+        return self._run_record_db_conn
+    except Exception as e:
+        self.print_log(
+            "w",
+            "Failed to connect to database for run record saving: %s. "
+            "Database saving will be skipped." 
% str(e)
+        )
+        self._run_record_db_conn = None
+        return None
+
+
+def get_db_schema(self):
+    """Get database schema name from instance variable (set by read_settings())."""
+    return getattr(self, 'run_record_database_schema', 'test')
+
+
+def is_database_enabled(self):
+    """Check if database saving is enabled via instance variable (set by read_settings())."""
+    return hasattr(self, 'enable_run_record_database') and self.enable_run_record_database
+
+
+def get_db_prefix(self):
+    """Get database table prefix from instance variable (set by read_settings())."""
+    return getattr(self, 'run_record_database_prefix', 'artdaq')
+
+
+def create_tables_if_not_exist(cursor, dbschema, prefix="artdaq"):
+    """Create database tables if they don't exist.
+
+    Creates two tables:
+    - {prefix}_components: stores process information (procinfo) per run
+    - {prefix}_fcl: stores FHiCL content per run
+
+    Args:
+        cursor: psycopg2 cursor object
+        dbschema: Database schema name
+        prefix: Table name prefix (defaults to 'artdaq')
+    """
+    from psycopg2 import sql
+
+    # Create {prefix}_components table
+    # Uses (run_number, label) as primary key (saved during do_start_running when run_number is available)
+    components_table = sql.Identifier(dbschema, "%s_components" % prefix)
+    create_components_table = sql.SQL(
+        "CREATE TABLE IF NOT EXISTS {table} ("
+        "run_number INTEGER NOT NULL, "
+        "name VARCHAR(255) NOT NULL, "
+        "rank INTEGER NOT NULL, "
+        "host VARCHAR(255) NOT NULL, "
+        "port VARCHAR(50) NOT NULL, "
+        "label VARCHAR(255) NOT NULL, "
+        "subsystem VARCHAR(50), "
+        "allowed_processors VARCHAR(255), "
+        "target VARCHAR(255), "
+        "insertion_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, "
+        "PRIMARY KEY (run_number, label)"
+        ")"
+    ).format(table=components_table)
+
+    cursor.execute(create_components_table)
+
+    # Create index on run_number (name left unqualified: PostgreSQL rejects a schema-qualified index name and uses the table's schema)
+    components_index = sql.Identifier("%s_components_run_number_idx" % prefix)
+    create_components_index = sql.SQL(
+        "CREATE INDEX IF NOT EXISTS {index} ON {table} (run_number)"
+    ).format(index=components_index, table=components_table)
+    cursor.execute(create_components_index)
+
+    # Create composite index on (run_number, name) for efficient queries by run and process type
+    components_run_name_index = sql.Identifier("%s_components_run_name_idx" % prefix)
+    create_components_run_name_index = sql.SQL(
+        "CREATE INDEX IF NOT EXISTS {index} ON {table} (run_number, name)"
+    ).format(index=components_run_name_index, table=components_table)
+    cursor.execute(create_components_run_name_index)
+
+    # Create {prefix}_fcl table
+    # Uses (run_number, label) as primary key (saved during do_start_running when run_number is available)
+    fcl_table = sql.Identifier(dbschema, "%s_fcl" % prefix)
+    create_fcl_table = sql.SQL(
+        "CREATE TABLE IF NOT EXISTS {table} ("
+        "run_number INTEGER NOT NULL, "
+        "name VARCHAR(255) NOT NULL, "
+        "label VARCHAR(255) NOT NULL, "
+        "content TEXT NOT NULL, "
+        "insertion_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, "
+        "PRIMARY KEY (run_number, label)"
+        ")"
+    ).format(table=fcl_table)
+
+    cursor.execute(create_fcl_table)
+
+    # Create index on run_number (name left unqualified: PostgreSQL rejects a schema-qualified index name and uses the table's schema)
+    fcl_index = sql.Identifier("%s_fcl_run_number_idx" % prefix)
+    create_fcl_index = sql.SQL(
+        "CREATE INDEX IF NOT EXISTS {index} ON {table} (run_number)"
+    ).format(index=fcl_index, table=fcl_table)
+    cursor.execute(create_fcl_index)
+
+    # Create composite index on (run_number, name) for efficient queries by run and process type
+    fcl_run_name_index = sql.Identifier("%s_fcl_run_name_idx" % prefix)
+    create_fcl_run_name_index = sql.SQL(
+        "CREATE INDEX IF NOT EXISTS {index} ON {table} (run_number, name)"
+    ).format(index=fcl_run_name_index, table=fcl_table)
+    cursor.execute(create_fcl_run_name_index)
+