diff --git a/docs/Unified-shell-reference.md b/docs/Unified-shell-reference.md index dd05aa0f5..2cf1c9c6a 100644 --- a/docs/Unified-shell-reference.md +++ b/docs/Unified-shell-reference.md @@ -437,6 +437,7 @@ The `ps` command must take at least one the following options: * `-n/--name`, to select a process to flush based on its "friendly name". * `-s/--session`, to select the processes to flush based on a session name. * `--long-format/-l`, to get a long listing format. +* `-w/--width`, to fix the table width to a supplied length. By default, `ps` list all the processes. diff --git a/integtest/integ_test_utils.py b/integtest/integ_test_utils.py new file mode 100644 index 000000000..2f85ee3a9 --- /dev/null +++ b/integtest/integ_test_utils.py @@ -0,0 +1,336 @@ +"""Shared helpers for drunc integration tests. + +This module centralizes common patterns used by process-manager integration tests. +Importantly, most of these are defined to help with processing the stdout log outputs +of the integ tests. + +Common functions include: +- searching ordered log output for marker lines, +- requiring regex/string matches with informative assertion errors, +- extracting process-table rows from `ps` command output, +- asserting process presence/absence by friendly name. + +The helpers are intentionally lightweight and pytest-friendly: failures are +reported through `assert` with context-rich messages. +""" + +import re +from collections.abc import Callable + +ANSI_ESCAPE_RE = re.compile(r"\x1B\[[0-9;]*[A-Za-z]") + + +def strip_ansi(text: str) -> str: + """Remove ANSI escape codes from a text block.""" + return ANSI_ESCAPE_RE.sub("", text) + + +def find_line_index( + lines: list[str], + predicate: Callable[[str], bool], + *, + start_idx: int = 0, +) -> int | None: + """Return the first line index at or after `start_idx` matching `predicate`. + + Returns `None` when no line matches. + + Example: + >>> lines = [ + ... 
"[2026/03/17 10:48:10 UTC] INFO drunc.controller.iface Command wait running for 5 seconds.", + ... "[2026/03/17 10:48:15 UTC] INFO drunc.controller.iface Command wait ran for 5 seconds.", + ... "[2026/03/17 10:48:15 UTC] INFO drunc.echo test_recovery_post", + ... ] + >>> find_line_index(lines, lambda line: "Command wait ran" in line) + 1 + >>> find_line_index(lines, lambda line: "test_wait_done" in line) is None + True + """ + return next( + (idx for idx in range(start_idx, len(lines)) if predicate(lines[idx])), + None, + ) + + +def require_line_index( + lines: list[str], + predicate: Callable[[str], bool], + *, + error_message: str, + start_idx: int = 0, +) -> int: + """Like `find_line_index`, but assert a match exists and return its index. + + Example: + >>> lines = [ + ... "[2026/03/17 10:47:38 UTC] INFO drunc.echo test_wait", + ... "[2026/03/17 10:47:48 UTC] INFO drunc.echo test_wait_done", + ... ] + >>> require_line_index( + ... lines, + ... lambda line: "test_wait_done" in line, + ... error_message="Could not find wait completion marker", + ... ) + 1 + """ + line_idx = find_line_index(lines, predicate, start_idx=start_idx) + assert line_idx is not None, error_message + return line_idx + + +def require_line_containing( + lines: list[str], + text: str, + *, + error_message: str, + start_idx: int = 0, +) -> int: + """Assert and return index of the first line containing `text`. 
+ + Example: + [2026/03/17] WARNING drunc.process_manager_driver Bad query for logs + ────────────────────────────── root-controller logs ────────────────────────────── + [2026/03/17] INFO drunc.init_controller Taking control of trg-controller + + header_idx = require_line_containing( + lines, + "root-controller logs", + error_message="Did not find the 'root-controller logs' header line in stdout.", + ) + + + """ + return require_line_index( + lines, + lambda line: text in line, + error_message=error_message, + start_idx=start_idx, + ) + + +def require_echo_marker_index( + lines: list[str], echo_marker: str, *, start_idx: int = 0 +) -> int: + """Assert and return index of a `drunc.echo` line ending with `echo_marker`. + This is hardcoded since echo is a specific callable function with its own logger. + + Example: + >>> lines = [ + ... "[2026/03/17 10:48:15 UTC] INFO drunc.echo test_recovery_post", + ... "Processes running", + ... ] + >>> require_echo_marker_index(lines, "test_recovery_post") + 0 + """ + return require_line_index( + lines, + lambda line: "drunc.echo" in line and line.rstrip().endswith(echo_marker), + error_message=(f"Could not find drunc.echo marker '{echo_marker}' in stdout."), + start_idx=start_idx, + ) + + +def require_pattern_match_index( + lines: list[str], + pattern: re.Pattern[str], + *, + error_message: str, + start_idx: int = 0, +) -> tuple[int, re.Match[str]]: + """Assert and return `(index, match)` for first line matching `pattern`. + + Example: + >>> lines = [ + ... "[2026/03/17] INFO drunc.iface Command wait running for 10 seconds.", + ... "[2026/03/17] INFO drunc.iface Command wait ran for 10 seconds.", + ... ] + >>> pattern = re.compile(r"Command wait ran for (\\d+) seconds\\.") + >>> line_idx, match = require_pattern_match_index( + ... lines, + ... pattern, + ... error_message="Did not find wait completion log line.", + ... 
) + >>> (line_idx, match.group(1)) + (1, '10') + """ + line_idx = require_line_index( + lines, + lambda line: pattern.search(line) is not None, + error_message=error_message, + start_idx=start_idx, + ) + match = pattern.search(lines[line_idx]) + assert match is not None + return line_idx, match + + +def require_pattern_match( + text: str, + pattern: re.Pattern[str], + *, + error_message: str, +) -> re.Match[str]: + """Assert `pattern` matches `text` and return the `re.Match` object. + + Example: + >>> line = "[2026/03/17] INFO Command wait ran for 10 seconds." + >>> pattern = re.compile(r"Command wait ran for (\\d+) seconds\\.") + >>> match = require_pattern_match( + ... line, + ... pattern, + ... error_message="Did not find wait completion log line.", + ... ) + >>> match.group(1) + '10' + """ + match = pattern.search(text) + assert match is not None, error_message + return match + + +def _parse_ps_table_from_index( + lines: list[str], start_idx: int +) -> list[dict[str, str]]: + """Parse a Unicode table of processes starting after `start_idx`. + + The parser expects rows that start with `│` and stops at a line starting + with `└`. It returns dictionaries with normalized column names. + """ + table_rows: list[dict[str, str]] = [] + + for line in lines[start_idx + 1 :]: + stripped = line.strip() + + if stripped.startswith("└"): + break + + if not stripped.startswith("│"): + continue + + cells = [cell.strip() for cell in stripped.strip("│").split("│")] + if len(cells) < 7: + continue + + table_rows.append( + { + "session": cells[0], + "friendly_name": cells[1], + "user": cells[2], + "host": cells[3], + "uuid": cells[4], + "alive": cells[5], + "exit_code": cells[6], + } + ) + + return table_rows + + +def get_ps_table_after_echo(stdout: str, echo_marker: str) -> list[dict[str, str]]: + """Return parsed process-table rows found after a specific echo marker. + + If no process table is found after the marker, returns an empty list. + + Example: + >>> stdout = ( + ... 
"[2026/03/17 10:48:15 UTC] INFO drunc.echo test_recovery_post\n" + ... "Processes running\n" + ... "│ minimal │ root-controller │ emmuhamm │ localhost │ f201f9c7-b910-4100-bd78-11765a4d2ee1 │ True │ 0 │\n" + ... "└" + ... ) + >>> table = get_ps_table_after_echo(stdout, "test_recovery_post") + >>> table[0]["friendly_name"] + 'root-controller' + """ + lines = strip_ansi(stdout).splitlines() + + echo_idx = require_echo_marker_index(lines, echo_marker) + + table_start_idx = find_line_index( + lines, + lambda line: "Processes running" in line, + start_idx=echo_idx + 1, + ) + if table_start_idx is None: + return [] + + return _parse_ps_table_from_index(lines, table_start_idx) + + +def get_column_for_friendly_name( + ps_table: list[dict[str, str]], friendly_name: str, column: str +) -> str: + """Return the column for `friendly_name` from a parsed process table. + + Raises: + AssertionError: if the friendly name is absent. + """ + for row in ps_table: + if row["friendly_name"].strip() == friendly_name: + return row[column] + + available_names = ", ".join(row["friendly_name"].strip() for row in ps_table) + raise AssertionError( + f"Could not find friendly name '{friendly_name}' in ps table. " + f"Available names: {available_names}" + ) + + +def get_rows_for_friendly_name( + ps_table: list[dict[str, str]], friendly_name: str +) -> list[dict[str, str]]: + """Return all rows whose `friendly_name` matches exactly after stripping.""" + return [row for row in ps_table if row["friendly_name"].strip() == friendly_name] + + +def assert_process_presence( + ps_table: list[dict[str, str]], + friendly_name: str, + *, + context: str, + expected_present: bool = True, +) -> None: + """Assert whether a process is present/absent in a process table. + + Args: + ps_table: Parsed process rows. + friendly_name: Process name to check. + expected_present: `True` if process should exist, `False` otherwise. + context: Short phrase appended to error text (e.g. "before kill"). 
+ + Example: + >>> ps_table = [ + ... { + ... "session": "minimal", + ... "friendly_name": "root-controller", + ... "user": "daq", + ... "host": "localhost", + ... "uuid": "f201f9c7-b910-4100-bd78-11765a4d2ee1", + ... "alive": "True", + ... "exit_code": "0", + ... } + ... ] + >>> assert_process_presence( + ... ps_table, + ... "root-controller", + ... context="before restart", + ... expected_present=True, + ... ) + >>> assert_process_presence( + ... ps_table, + ... "mlt", + ... context="after restart", + ... expected_present=False, + ... ) + """ + matching_rows = get_rows_for_friendly_name(ps_table, friendly_name) + + if expected_present: + assert matching_rows, ( + f"Expected to find '{friendly_name}' in ps table {context}, but it was missing." + ) + return + + assert not matching_rows, ( + f"Expected '{friendly_name}' to be absent from ps table {context}, but it is still present." + ) diff --git a/integtest/process_manager_test.py b/integtest/process_manager_test.py new file mode 100644 index 000000000..3284bddee --- /dev/null +++ b/integtest/process_manager_test.py @@ -0,0 +1,407 @@ +import getpass +import os +import re +from datetime import datetime + +import integrationtest.data_classes as data_classes +import integrationtest.log_file_checks as log_file_checks +from integ_test_utils import ( + assert_process_presence, + get_column_for_friendly_name, + get_ps_table_after_echo, + require_echo_marker_index, + require_line_containing, + require_pattern_match, + require_pattern_match_index, + strip_ansi, +) + +pytest_plugins = "integrationtest.integrationtest_drunc" + +# Values that help determine the running conditions +number_of_data_producers = 2 +data_rate_slowdown_factor = 1 # 10 for ProtoWIB/DuneWIB +run_duration = 10 # seconds +readout_window_time_before = 1000 +readout_window_time_after = 1001 + +check_for_logfile_errors = True + +ignored_logfile_problems = { + "-controller": [ + "Worker with pid \\d+ was terminated due to signal", + "Connection '.*' not 
"""Integration test of the drunc process manager commands.

A fixed command sequence (`dunerc_command_list`) is run through dunerc; each
test below inspects the captured stdout (or the produced log files) for the
output expected from one part of that sequence. `echo <marker>` commands in the
sequence act as anchors that the tests search for.
"""

import getpass
import os
import re
from datetime import datetime

import integrationtest.data_classes as data_classes
import integrationtest.log_file_checks as log_file_checks
from integ_test_utils import (
    assert_process_presence,
    get_column_for_friendly_name,
    get_ps_table_after_echo,
    require_echo_marker_index,
    require_line_containing,
    require_pattern_match,
    require_pattern_match_index,
    strip_ansi,
)

pytest_plugins = "integrationtest.integrationtest_drunc"

# Values that help determine the running conditions
number_of_data_producers = 2
data_rate_slowdown_factor = 1  # 10 for ProtoWIB/DuneWIB
run_duration = 10  # seconds
readout_window_time_before = 1000
readout_window_time_after = 1001

check_for_logfile_errors = True

ignored_logfile_problems = {
    "-controller": [
        "Worker with pid \\d+ was terminated due to signal",
        "Connection '.*' not found on the application registry",
    ],
    "connectivity-service": [
        "errorlog: -",
    ],
}

# The next three variable declarations *must* be present as globals in the test
# file. They're read by the "fixtures" in conftest.py to determine how
# to run the config generation and nanorc

# The arguments to pass to the config generator, excluding the json
# output directory (the test framework handles that)

# CCM includes FSM, hosts; moduleconfs includes connections
object_databases = ["config/daqsystemtest/integrationtest-objects.data.xml"]

conf_dict = data_classes.drunc_config()
conf_dict.dro_map_config.n_streams = number_of_data_producers
conf_dict.op_env = "integtest"
conf_dict.session = "minimal"
conf_dict.tpg_enabled = False

# For testing, allow drunc to manage ConnectivityService (default is False, integrationtest manages Connectivity Service)
conf_dict.drunc_connsvc = True
# For testing, specify connectivity service port (default is 0, a random port is chosen for the Connectivity Service)
# conf_dict.connsvc_port = 12345

substitution = data_classes.attribute_substitution(
    obj_id="random-tc-generator",
    obj_class="RandomTCMakerConf",
    updates={"trigger_rate_hz": 1},
)
conf_dict.config_substitutions.append(
    data_classes.attribute_substitution(
        obj_class="TCReadoutMap",
        obj_id="def-random-readout",
        updates={
            "time_before": readout_window_time_before,
            "time_after": readout_window_time_after,
        },
    )
)
conf_dict.config_substitutions.append(substitution)


confgen_arguments = {"MinimalSystem": conf_dict}
# The commands to run in dunerc
# The commands mostly come from the msqt, with a few minor changes
# The entire format is a standard that is basically copied over from the
# typical msqt tests, so they bear no direct effect on the scope of this test.
# NOTE: the text is split on whitespace, so every word below becomes its own
# list element.
dunerc_command_list = f"""

echo pre_boot
ps -u {getpass.getuser()} -w 180
boot
echo post_boot
ps -u {getpass.getuser()} -w 180


echo test_logs
logs --name unknown
logs --name root-controller --how-far 5
logs --name mlt --how-far 5
echo test_logs_done

echo test_wait
wait 10
echo test_wait_done

echo pre_restart_mlt
restart -n mlt
restart -n root-controller
wait 5
echo post_restart_mlt


echo test_kill_mlt
ps -u {getpass.getuser()} -w 180
kill -n mlt
wait 2
echo test_kill_mlt_post
ps -u {getpass.getuser()} -w 180
echo test_kill_mlt_done


echo test_recovery
restart -n mlt
restart -n trg-controller
wait 5
echo test_recovery_post
ps -u {getpass.getuser()} -w 180
echo test_recovery_done


echo test_flush
ps -u {getpass.getuser()} -w 180
kill -n mlt --crash
wait 5
echo after_crash
ps -u {getpass.getuser()} -w 180
flush
echo after_flush
ps -u {getpass.getuser()} -w 180
echo test_flush_done


terminate

""".split()


UUID_RE = re.compile(
    r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"
)


def test_nanorc_success(run_dunerc) -> None:
    """Checks that the drunc integration command sequence completes successfully."""
    # print the name of the current test
    # Default to "" so the regex search below never receives None when the
    # environment variable is not set.
    current_test = os.environ.get("PYTEST_CURRENT_TEST", "")
    match_obj = re.search(r".*\[(.+)-run_.*rc.*\d].*", current_test)
    if match_obj:
        current_test = match_obj.group(1)
    banner_line = re.sub(".", "=", current_test)
    print(banner_line)
    print(current_test)
    print(banner_line)

    # Check that nanorc completed correctly
    assert run_dunerc.completed_process.returncode == 0


def test_log_files(run_dunerc) -> None:
    """Checks that expected process-manager log files exist and are free of errors."""
    # Check that at least some of the expected log files are present
    for app_name in ("df-01", "dfo", "mlt", "ru"):
        expected_fragment = f"{run_dunerc.daq_session_name}_{app_name}"
        assert any(
            expected_fragment in str(logname) for logname in run_dunerc.log_files
        ), f"Did not find a log file whose name contains '{expected_fragment}'."

    if check_for_logfile_errors:
        # Check that there are no warnings or errors in the log files
        assert log_file_checks.logs_are_error_free(
            [
                logname
                for logname in run_dunerc.log_files
                if "process_manager" in str(logname)
            ],
            True,
            True,
            ignored_logfile_problems,
        )


def test_boot(run_dunerc) -> None:
    """Checks that boot starts the managed processes and exposes UUIDs in ps."""
    stdout = run_dunerc.completed_process.stdout

    ps_pre_boot = get_ps_table_after_echo(stdout, "pre_boot")
    ps_post_boot = get_ps_table_after_echo(stdout, "post_boot")

    assert not ps_pre_boot, (
        f"Expected ps table before boot to be empty, but found {len(ps_pre_boot)} row(s): "
        + ", ".join(row["friendly_name"] for row in ps_pre_boot)
    )

    assert ps_post_boot, (
        "Expected ps table after boot to contain processes, but it was empty."
    )
    for row in ps_post_boot:
        assert UUID_RE.match(row["uuid"]), (
            f"Expected a valid UUID for process '{row['friendly_name']}', got '{row['uuid']}'"
        )


def test_unknown_log_command(run_dunerc) -> None:
    """Checks that querying logs for an unknown process reports the expected error."""
    test_str = (
        "Bad query for logs: The process corresponding to the query doesn't exist"
    )
    # ANSI codes are stripped first, as in the other stdout-based tests, so
    # colour sequences inside the message cannot break the substring match.
    assert test_str in strip_ansi(run_dunerc.completed_process.stdout)


def test_root_controller_logs(run_dunerc) -> None:
    """
    Verifies that:
    - the stdout contains a "root-controller logs" header line and a "root-controller end" footer line
    - there are exactly 5 lines between those two lines
    - among those 5 lines, the one from "drunc.controller.core.init_controller" ends with "Controller ready"
    """
    # Stripping ANSI codes does not change the number of lines, but it lets the
    # end-of-line anchored regex below match lines that end in a colour reset.
    lines = strip_ansi(run_dunerc.completed_process.stdout).splitlines()

    # 1) Find the header/footer lines
    header_idx = require_line_containing(
        lines,
        "root-controller logs",
        error_message="Did not find the 'root-controller logs' header line in stdout.",
    )
    # Only look for the footer after the header.
    footer_idx = require_line_containing(
        lines,
        "root-controller end",
        error_message="Did not find the 'root-controller end' footer line in stdout.",
        start_idx=header_idx + 1,
    )
    assert footer_idx > header_idx, "Footer appears before header in stdout."

    # 2) Check there are 5 lines between header and footer
    between = lines[header_idx + 1 : footer_idx]
    assert len(between) == 5, (
        f"Expected exactly 5 lines between header and footer, found {len(between)}.\nBetween:\n"
        + "\n".join(between)
    )

    # 3) Check one of the init_controller line ends with "Controller ready"
    # Example line:
    # [2026/03/13 08:17:47 UTC] INFO ... drunc.controller.core.init_controller ... Controller ready
    init_controller_ready_re = re.compile(
        r"drunc\.controller\.core\.init_controller.*Controller ready\s*$"
    )

    matches = [line for line in between if init_controller_ready_re.search(line)]
    assert len(matches) >= 1, (
        "Did not find an init_controller line ending with 'Controller ready' within the 5 lines.\nBetween:\n"
        + "\n".join(between)
    )


def test_wait_command_duration_from_logs(run_dunerc) -> None:
    """Checks that the wait command logs the expected duration and elapsed time."""
    lines = strip_ansi(run_dunerc.completed_process.stdout).splitlines()

    echo_idx = require_echo_marker_index(lines, "test_wait")

    running_pattern = re.compile(r"Command wait running for (\d+) seconds\.")
    ran_pattern = re.compile(r"Command wait ran for (\d+) seconds\.")
    timestamp_pattern = re.compile(r"^\[(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) UTC\]")

    running_idx, running_match = require_pattern_match_index(
        lines,
        running_pattern,
        error_message=(
            "Did not find 'Command wait running for ... seconds.' after test_wait marker."
        ),
        start_idx=echo_idx + 1,
    )

    ran_idx, ran_match = require_pattern_match_index(
        lines,
        ran_pattern,
        error_message=(
            "Did not find 'Command wait ran for ... seconds.' after wait start log."
        ),
        start_idx=running_idx + 1,
    )

    # Must agree with the `wait 10` that follows `echo test_wait` in
    # dunerc_command_list.
    expected_seconds = 10
    assert int(running_match.group(1)) == expected_seconds, (
        f"Expected wait start log to report {expected_seconds} seconds, got {running_match.group(1)}."
    )
    assert int(ran_match.group(1)) == expected_seconds, (
        f"Expected wait end log to report {expected_seconds} seconds, got {ran_match.group(1)}."
    )

    start_ts_match = require_pattern_match(
        lines[running_idx],
        timestamp_pattern,
        error_message="Could not parse timestamp in wait start log line.",
    )
    end_ts_match = require_pattern_match(
        lines[ran_idx],
        timestamp_pattern,
        error_message="Could not parse timestamp in wait end log line.",
    )

    ts_strp_pattern = "%Y/%m/%d %H:%M:%S"
    start_ts = datetime.strptime(start_ts_match.group(1), ts_strp_pattern)
    end_ts = datetime.strptime(end_ts_match.group(1), ts_strp_pattern)
    elapsed_seconds = (end_ts - start_ts).total_seconds()

    # Log timestamps have one-second resolution, so allow one second of slack.
    tolerance_seconds = 1
    assert abs(elapsed_seconds - expected_seconds) <= tolerance_seconds, (
        f"Expected wait log timestamps to differ by {expected_seconds}±{tolerance_seconds} seconds, "
        f"got {elapsed_seconds} seconds."
    )


def test_restart_mlt_logs(run_dunerc) -> None:
    """Checks that restarting mlt produces the expected restart, exit, and boot logs."""
    stdout = run_dunerc.completed_process.stdout
    lines = strip_ansi(stdout).splitlines()

    echo_idx = require_echo_marker_index(lines, "pre_restart_mlt")

    post_restart_idx = require_echo_marker_index(
        lines, "post_restart_mlt", start_idx=echo_idx + 1
    )

    # Only the output between the two echo markers is inspected. The lines are
    # re-joined so that, with re.DOTALL, a pattern can span several lines.
    restart_lines = lines[echo_idx + 1 : post_restart_idx]
    restart_text = "\n".join(restart_lines)

    require_pattern_match(
        restart_text,
        re.compile(
            r"Remote process .*?terminated gracefully following SIGQUIT signal\.",
            re.DOTALL,
        ),
        error_message="Did not find the graceful termination log line for mlt after restart request.",
    )

    require_pattern_match(
        restart_text,
        re.compile(r"Process 'mlt'.*?process exited\s+with exit code 0", re.DOTALL),
        error_message="Did not find the mlt exit-code log line after graceful termination.",
    )

    booted_match = require_pattern_match(
        restart_text,
        re.compile(r"Booted 'mlt'.*?with UUID\s+([^\s\n]+)", re.DOTALL),
        error_message="Did not find the mlt boot log line after the restart exit log.",
    )

    booted_uuid = booted_match.group(1)
    assert UUID_RE.match(booted_uuid), (
        f"Expected the mlt boot log to contain a UUID, got: {booted_uuid}"
    )


def test_kill_removes_mlt_from_ps_table(run_dunerc) -> None:
    """Checks that killing mlt removes it from the subsequent ps table."""
    stdout = run_dunerc.completed_process.stdout

    ps_before_kill = get_ps_table_after_echo(stdout, "test_kill_mlt")
    ps_after_kill = get_ps_table_after_echo(stdout, "test_kill_mlt_post")

    assert_process_presence(ps_before_kill, "mlt", context="before kill")
    assert_process_presence(
        ps_after_kill, "mlt", context="after kill", expected_present=False
    )


def test_mlt_recovers_after_kill(run_dunerc) -> None:
    """Checks that mlt is present again after the recovery restart sequence."""
    stdout = run_dunerc.completed_process.stdout
    ps_after_recovery = get_ps_table_after_echo(stdout, "test_recovery_post")
    assert_process_presence(ps_after_recovery, "mlt", context="after recovery")


def test_flush(run_dunerc) -> None:
    """Checks that flush works.

    mlt is crashed, the ps table is checked to still list it (as not alive),
    and after `flush` the ps table is checked to no longer list it.
    """
    stdout = run_dunerc.completed_process.stdout
    ps_initial = get_ps_table_after_echo(stdout, "test_flush")
    assert_process_presence(ps_initial, "mlt", context="before crash")

    ps_after_crash = get_ps_table_after_echo(stdout, "after_crash")
    mlt_alive = get_column_for_friendly_name(ps_after_crash, "mlt", "alive")
    assert mlt_alive == "False", "The mlt should have crashed"

    ps_after_flush = get_ps_table_after_echo(stdout, "after_flush")
    assert_process_presence(
        ps_after_flush, "mlt", context="after flush", expected_present=False
    )
+# The purpose of these scripts is to run a set of integration test with all of the features of drunc tested, so any introduced changes do not affect functionality of the existing infrastructure. +# Based entirely of the implementation of daqsystemtest_integtest_bundle.sh +# Original author: KAB, 10-Oct-2023 + +integtest_list=( "process_manager_test.py" ) +let last_test_index=${#integtest_list[@]}-1 + +usage() { + declare -r script_name=$(basename "$0") + echo """ +Usage: +"${script_name}" [option(s)] + +Options: + -h, --help : prints out usage information + -f + -l + -k + -n + -N + --stop-on-failure : causes the script to stop when one of the integtests reports a failure + --concise-output : suppresses run control and DAQApp messages in order to focus on test results + --tmpdir : specifies a root directory to use for test output, e.g. a directory instead of '/tmp' +""" + let counter=0 + echo "List of available tests:" + for tst in ${integtest_list[@]}; do + echo " ${counter}: $tst" + let counter=${counter}+1 + done + echo "" +} + +# 29-Dec-2025, KAB: Determine if a non-standard pytest tmpdir has been specified +# in the linux shell environment in which this script is being run. We need to know +# this value in order to direct functionality in this script to the right place. +# A user-specified command-line value for the tmpdir over-rides the value determined here. 
+tmpdir_root=`dst_get_pytest_tmpdir` + +# Removes the ANSI characters associated with formatting, including color coding and font styling +CaptureOutputNoANSI() { + tee -a >(sed -u 's/\x1b\[[0-9;]*m//g' >> "$1") +} +# Captures the output to the specified file, without changing the output +CaptureOutput() { + tee -a $1 +} + +GETOPT_TEMP=`getopt -o hs:f:l:k:n:N: --long help,stop-on-failure,concise-output,tmpdir: -- "$@"` +eval set -- "$GETOPT_TEMP" + +let first_test_index=0 +let individual_test_requested_iterations=1 +let full_set_requested_interations=1 +let stop_on_failure=0 +requested_test_names= +PYTEST_COMMAND="pytest -c /dev/null -s --tb=short" # our core pytest command, with DAQ printout included and short pytest traceback + +while true; do + case "$1" in + -h|--help) + usage + exit 0 + ;; + -f) + let first_test_index=$2 + shift 2 + ;; + -l) + let last_test_index=$2 + shift 2 + ;; + -k) + requested_test_names=$2 + shift 2 + ;; + -n) + let individual_test_requested_iterations=$2 + shift 2 + ;; + -N) + let full_set_requested_interations=$2 + shift 2 + ;; + --stop-on-failure) + let stop_on_failure=1 + PYTEST_COMMAND="${PYTEST_COMMAND} -x" # add the -x option to our pytest command to have it exit on first error + shift + ;; + --concise-output) + PYTEST_COMMAND="`echo ${PYTEST_COMMAND} | sed 's/ -s//'`" # remove the -s option to turn off messages from DAQ processes + shift + ;; + --tmpdir) + tmpdir_root=$2 + export PYTEST_DEBUG_TEMPROOT=${tmpdir_root} + shift 2 + ;; + --) + shift + break + ;; + esac +done + +# check if the numad daemon is running +numad_grep_output=`ps -ef | grep numad | grep -v grep` +if [[ "${numad_grep_output}" != "" ]]; then + echo "*********************************************************************" + echo "*** DANGER, DANGER, 'numad' appears to be running on this computer!" + echo "*** 'ps' output: ${numad_grep_output}" + echo "*** now if you want to abort this testing." 
+ echo "*********************************************************************" + sleep 3 +fi + +# other setup +INITIAL_TIMESTAMP=`date '+%Y%m%d%H%M%S'` +# 30-Dec-2025, KAB: check that the specified tmpdir exists and is writeable +if [[ ! -d ${tmpdir_root} ]]; then + echo "*** ERROR: directory \"${tmpdir_root}\" does not exist." + exit 1 +fi +if [[ ! -w ${tmpdir_root} ]]; then + echo "*** ERROR: directory \"${tmpdir_root}\" is not writeable in the current environment." + exit 1 +fi +pytest_user_dir=${tmpdir_root}/pytest-of-${USER} +mkdir -p ${pytest_user_dir} +ITGRUNNER_LOG_FILE="${pytest_user_dir}/drunc_integtest_bundle_${INITIAL_TIMESTAMP}.log" +CURRENT_PID=$$ + +let number_of_individual_tests=0 +let test_index=0 +for TEST_NAME in "${integtest_list[@]}"; do + if [[ ${test_index} -ge ${first_test_index} && ${test_index} -le ${last_test_index} ]]; then + requested_test=`echo ${TEST_NAME} | egrep -i ${requested_test_names:-${TEST_NAME}}` + if [[ "${requested_test}" != "" ]]; then + let number_of_individual_tests=${number_of_individual_tests}+1 + fi + fi + let test_index=${test_index}+1 +done +let total_number_of_tests=${number_of_individual_tests}*${individual_test_requested_iterations}*${full_set_requested_interations} + +# run the tests +let overall_test_index=0 # this is only used for user feedback +let full_set_loop_count=0 +while [[ ${full_set_loop_count} -lt ${full_set_requested_interations} ]]; do + let test_index=0 + for TEST_NAME in "${integtest_list[@]}"; do + if [[ ${test_index} -ge ${first_test_index} && ${test_index} -le ${last_test_index} ]]; then + CURRENT_TIMESTAMP=`date '+%Y%m%d%H%M%S'` + # 15-Dec-2025, KAB: added the export of the following enviromental variable. This is used + # by the integrationtest infrastructure to put a bread-crumb file in the directory where + # the test results are located. That file, in turn, allows this script to find the directory + # for the current test, and make a copy of it if the test fails. 
+ export DUNEDAQ_INTEGTEST_BUNDLE_INFO="${INITIAL_TIMESTAMP};${CURRENT_PID};${CURRENT_TIMESTAMP}" + requested_test=`echo ${TEST_NAME} | egrep -i ${requested_test_names:-${TEST_NAME}}` + if [[ "${requested_test}" != "" ]]; then + let individual_loop_count=0 + while [[ ${individual_loop_count} -lt ${individual_test_requested_iterations} ]]; do + let overall_test_index=${overall_test_index}+1 + echo "" + echo -e "\U0001F535 \033[0;34mStarting test ${overall_test_index} of ${total_number_of_tests}...\033[0m \U0001F535" | CaptureOutput ${ITGRUNNER_LOG_FILE} + + echo -e "\u2B95 \033[0;1mRunning ${TEST_NAME}\033[0m \u2B05" | CaptureOutput ${ITGRUNNER_LOG_FILE} + if [[ -e "./${TEST_NAME}" ]]; then + ${PYTEST_COMMAND} ./${TEST_NAME} | CaptureOutputNoANSI ${ITGRUNNER_LOG_FILE} + elif [[ -e "${DBT_AREA_ROOT}/pythoncode/drunc/integtest/${TEST_NAME}" ]]; then + if [[ -w "${DBT_AREA_ROOT}" ]]; then + ${PYTEST_COMMAND} ${DBT_AREA_ROOT}/pythoncode/drunc/integtest/${TEST_NAME} | CaptureOutputNoANSI ${ITGRUNNER_LOG_FILE} + else + ${PYTEST_COMMAND} -p no:cacheprovider ${DBT_AREA_ROOT}/pythoncode/drunc/integtest/${TEST_NAME} | CaptureOutputNoANSI ${ITGRUNNER_LOG_FILE} + fi + else + ${PYTEST_COMMAND} -p no:cacheprovider ${DAQSYSTEMTEST_SHARE}/integtest/${TEST_NAME} | CaptureOutputNoANSI ${ITGRUNNER_LOG_FILE} + fi + let pytest_return_code=${PIPESTATUS[0]} + + let individual_loop_count=${individual_loop_count}+1 + + # check if the test failed + if [[ ${pytest_return_code} -ne 0 ]]; then + # 15-Dec-2025, KAB: make a copy of the pytest directory. This allows + # testers to take a look at the results within a reasonable time frame. + # (If we can't find the "jq" JSON utility, we simply note that fact + # and continue.) + # This code makes use of a bread-crumb file that is created by the + # integrationtest infrastructure. 
+ if [[ "`which jq 2>/dev/null`" != "" ]]; then + current_pytest_rundir="" + mapfile -t bundle_info_files < <(find "${pytest_user_dir}" -type f -name "bundle_script_info.json" -printf '%T@ %p\n' | grep -v 'failed-' | sort -nr | awk '{print $2}') + for info_file in "${bundle_info_files[@]}"; do + script_start_time=`jq -r .bundle_script_start_time ${info_file}` + script_pid=`jq -r .bundle_script_process_id ${info_file}` + individual_test_start_time=`jq -r .individual_test_start_time ${info_file}` + if [[ ${script_start_time} -eq ${INITIAL_TIMESTAMP} ]] && \ + [[ ${script_pid} -eq ${CURRENT_PID} ]] && \ + [[ ${individual_test_start_time} -eq ${CURRENT_TIMESTAMP} ]]; then + current_pytest_rundir=$info_file + break + fi + done + + was_successfully_copied="" + if [[ "${current_pytest_rundir}" != "" ]]; then + pytest_tmpdir=`echo ${current_pytest_rundir} | xargs -r dirname | xargs -r dirname` + if [[ "${pytest_tmpdir}" != "" ]]; then + pytest_rootdir=`echo ${pytest_tmpdir} | xargs -r dirname` + pytest_basedir=`echo ${pytest_tmpdir} | xargs -r basename` + if [[ "${pytest_rootdir}" != "" ]] && [[ "${pytest_basedir}" != "" ]]; then + new_dir="${pytest_rootdir}/failed-${pytest_basedir}" + echo "" + echo -e "\U1F535 Copying the files from failed test ${pytest_tmpdir} to ${new_dir}. \U1F535" + cp -pR "${pytest_tmpdir}" "${new_dir}" + if [[ $? == 0 ]]; then + was_successfully_copied="yes" + # 18-Dec-2025, KAB: added the removal of the "current" symbolic links + # from inside the copied directory (since they get broken in the copying) + rm -f "${new_dir}/configcurrent" + rm -f "${new_dir}/runcurrent" + fi + fi + fi + fi + if [[ "${was_successfully_copied}" == "" ]]; then + echo "" + echo -e "\U1f7e1 WARNING: Unable to copy the pytest directory for this failed test (${current_pytest_rundir}). \U1f7e1" + fi + else + echo "" + echo -e "\U1f7e1 WARNING: Unable to find the 'jq' utility which is needed to help identify which pytest directory to copy for this failed test. 
\U1f7e1" + fi + + # remove stale and surplus directories from failed tests + test_dirs_to_remove=() + mapfile -t all_failed_test_dirs < <(find ${pytest_user_dir} -maxdepth 1 -type d -printf '%T@ %p\n' | sort -nr | awk '{print $2}' | grep 'failed-') + surplus_dirs=("${all_failed_test_dirs[@]:10}") + for test_dir in "${surplus_dirs[@]}"; do + test_dirs_to_remove+=(${test_dir}) + done + stale_failed_test_dirs=(`find ${pytest_user_dir} -maxdepth 1 -type d -name 'failed-*' -cmin +1560 -print`) + for test_dir in "${stale_failed_test_dirs[@]}"; do + test_dirs_to_remove+=(${test_dir}) + done + if [[ ${#test_dirs_to_remove[@]} -gt 0 ]];then + echo -e "\U1F535 Removing ${#test_dirs_to_remove[@]} old failed test directory(ies). \U1F535" + for test_dir in "${test_dirs_to_remove[@]}"; do + if [[ -e "${test_dir}" ]]; then + rm -rf "${test_dir}" + fi + done + fi + + # exit out of this script if the user has requested that we stop on a failure + if [[ ${stop_on_failure} -gt 0 ]]; then + break 3 + fi + fi + done + fi + fi + let test_index=${test_index}+1 + done + + let full_set_loop_count=${full_set_loop_count}+1 +done + +# print out summary information +echo "" | CaptureOutput ${ITGRUNNER_LOG_FILE} +echo "" | CaptureOutput ${ITGRUNNER_LOG_FILE} +echo "+++++++++++++++++++++++++++++++++++++++++++++++++" | CaptureOutput ${ITGRUNNER_LOG_FILE} +echo "++++++++++++++++++++ SUMMARY ++++++++++++++++++++" | CaptureOutput ${ITGRUNNER_LOG_FILE} +echo "+++++++++++++++++++++++++++++++++++++++++++++++++" | CaptureOutput ${ITGRUNNER_LOG_FILE} +echo "" | CaptureOutput ${ITGRUNNER_LOG_FILE} +date | CaptureOutput ${ITGRUNNER_LOG_FILE} +echo "Log file is: ${ITGRUNNER_LOG_FILE}" | CaptureOutput ${ITGRUNNER_LOG_FILE} +echo "" | CaptureOutput ${ITGRUNNER_LOG_FILE} +summary_string="`egrep $'=====|\u2B95' ${ITGRUNNER_LOG_FILE} | egrep ' in |Running'`" +colorized_summary_string="`echo \"${summary_string}\" | sed 's/passed/passed \\\\U2705/' | sed 's/failed/failed \\\\U274c/' | sed 's/skipped/skipped 
\\\\U1f7e1/'`" +echo -e "${colorized_summary_string}" | CaptureOutput ${ITGRUNNER_LOG_FILE} + +# check again if the numad daemon is running +numad_grep_output=`ps -ef | grep numad | grep -v grep` +if [[ "${numad_grep_output}" != "" ]]; then + echo "" | CaptureOutput ${ITGRUNNER_LOG_FILE} + echo "********************************************************************************" | CaptureOutput ${ITGRUNNER_LOG_FILE} + echo "*** WARNING: 'numad' appears to be running on this computer!" | CaptureOutput ${ITGRUNNER_LOG_FILE} + echo "*** 'ps' output: ${numad_grep_output}" | CaptureOutput ${ITGRUNNER_LOG_FILE} + echo "*** This daemon can adversely affect the running of these tests, especially ones" | CaptureOutput ${ITGRUNNER_LOG_FILE} + echo "*** that are resource intensive in the Readout Apps. This is because numad moves" | CaptureOutput ${ITGRUNNER_LOG_FILE} + echo "*** processes (threads?) to different cores/numa nodes periodically, and that" | CaptureOutput ${ITGRUNNER_LOG_FILE} + echo "*** context switch can disrupt the stable running of the DAQ processes." 
| CaptureOutput ${ITGRUNNER_LOG_FILE} + echo "********************************************************************************" | CaptureOutput ${ITGRUNNER_LOG_FILE} +fi \ No newline at end of file diff --git a/src/drunc/controller/interface/commands.py b/src/drunc/controller/interface/commands.py index 4d26c00f3..f054a8e29 100644 --- a/src/drunc/controller/interface/commands.py +++ b/src/drunc/controller/interface/commands.py @@ -8,6 +8,7 @@ from drunc.utils.utils import get_logger log = get_logger("controller.iface", rich_handler=True) +log_echo = get_logger("echo", rich_handler=True) @click.command("list-transitions") @@ -243,6 +244,13 @@ def who_am_i(obj: ControllerContext) -> None: log.info(obj.get_token().user_name) +@click.command("echo") +@click.argument("text", required=False) +@click.pass_obj +def echo(obj, text: str | None) -> None: + log_echo.info(text or "") + + @click.command("who-is-in-charge") @click.option("--target", type=str, help="The target to address", default="") @click.option( diff --git a/src/drunc/controller/interface/shell.py b/src/drunc/controller/interface/shell.py index ab33f9cd5..5c71bde1c 100644 --- a/src/drunc/controller/interface/shell.py +++ b/src/drunc/controller/interface/shell.py @@ -8,6 +8,7 @@ from drunc.controller.interface.commands import ( connect, disconnect, + echo, exclude, expert_command, include, @@ -90,6 +91,7 @@ def controller_shell(ctx, controller_address: str, log_level: str) -> None: ctx.command.add_command(take_control, "take-control") ctx.command.add_command(surrender_control, "surrender-control") ctx.command.add_command(who_am_i, "whoami") + ctx.command.add_command(echo, "echo") ctx.command.add_command(who_is_in_charge, "who-is-in-charge") for transition in transitions.commands: ctx.command.add_command(*generate_fsm_command(ctx.obj, transition, desc.name)) diff --git a/src/drunc/process_manager/interface/commands.py b/src/drunc/process_manager/interface/commands.py index 9c85abf2d..129f3ea78 100644 --- 
a/src/drunc/process_manager/interface/commands.py +++ b/src/drunc/process_manager/interface/commands.py @@ -151,20 +151,34 @@ def dummy_boot( @click.command("terminate") +@click.option( + "-w", + "--width", + type=int, + default=None, + help="Table width. Default is automatically calculated", +) @click.pass_obj -def terminate(obj: ProcessManagerContext) -> None: +def terminate(obj: ProcessManagerContext, width: int | None) -> None: log = get_logger("process_manager.shell") log.debug("Terminating") result = obj.get_driver("process_manager").terminate() if not result: return obj.print( - tabulate_process_instance_list(result, "Terminated process", False) + tabulate_process_instance_list(result, "Terminated process", False, width=width) ) # rich tables require console printing obj.delete_driver("controller") @click.command("kill") +@click.option( + "-w", + "--width", + type=int, + default=None, + help="Table width. Default is automatically calculated", +) @add_query_options(at_least_one=True) @click.option( "--crash", @@ -173,28 +187,39 @@ def terminate(obj: ProcessManagerContext) -> None: help="Simulate a crash: send SIGKILL without any cleanup, leaving the process manager in an unexpected-death state.", ) @click.pass_obj -def kill(obj: ProcessManagerContext, query: ProcessQuery) -> None: +def kill(obj: ProcessManagerContext, query: ProcessQuery, width: int | None) -> None: log = get_logger("process_manager.shell") log.debug(f"Killing with query {query}") result = obj.get_driver("process_manager").kill(query) if not result: return obj.print( - tabulate_process_instance_list(result, "Killed process", False) + tabulate_process_instance_list(result, "Killed process", False, width=width) ) # rich tables require console printing @click.command("flush") +@click.option( + "-w", + "--width", + type=int, + default=None, + help="Table width. 
Default is automatically calculated", +) @add_query_options(at_least_one=False, all_processes_by_default=True) @click.pass_obj -def flush(obj: ProcessManagerContext, query: ProcessQuery) -> None: +def flush( + obj: ProcessManagerContext, + query: ProcessQuery, + width: int | None, +) -> None: log = get_logger("process_manager.shell") log.debug(f"Flushing with query {query}") result = obj.get_driver("process_manager").flush(query) if not result: return obj.print( - tabulate_process_instance_list(result, "Flushed process", False) + tabulate_process_instance_list(result, "Flushed process", False, width=width) ) # rich tables require console printing @@ -242,7 +267,7 @@ def logs( if grep is not None: line = line.replace(grep, f"[u]{grep}[/]") - obj.print(line) + obj.print(line, soft_wrap=True) if result.name is not None: obj.rule(f"[yellow]{display_name}[/yellow] end") @@ -266,8 +291,20 @@ def restart(obj: ProcessManagerContext, query: ProcessQuery) -> None: default=False, help="Whether to have a long output", ) +@click.option( + "-w", + "--width", + type=int, + default=None, + help="Table width. 
Default is automatically calculated", +) @click.pass_obj -def ps(obj: ProcessManagerContext, query: ProcessQuery, long_format: bool) -> None: +def ps( + obj: ProcessManagerContext, + query: ProcessQuery, + long_format: bool, + width: int | None, +) -> None: log = get_logger("process_manager.shell") log.debug(f"Running ps with query {query}") results = obj.get_driver("process_manager").ps(query) @@ -275,6 +312,8 @@ def ps(obj: ProcessManagerContext, query: ProcessQuery, long_format: bool) -> No return obj.print( tabulate_process_instance_list( - results, title="Processes running", long=long_format - ) + results, title="Processes running", long=long_format, width=width + ), + overflow="fold", + soft_wrap=True, ) diff --git a/src/drunc/process_manager/utils.py b/src/drunc/process_manager/utils.py index 602a2dab2..095d70df1 100644 --- a/src/drunc/process_manager/utils.py +++ b/src/drunc/process_manager/utils.py @@ -118,9 +118,9 @@ def walk(tree_id): def tabulate_process_instance_list( - pil: ProcessInstanceList, title: str, long: bool = False + pil: ProcessInstanceList, title: str, long: bool = False, width: int | None = None ): - t = Table(title=title) + t = Table(title=title, width=width) t.add_column("session") t.add_column("friendly name") t.add_column("user") diff --git a/src/drunc/unified_shell/shell.py b/src/drunc/unified_shell/shell.py index b80d8e33d..d007bfece 100644 --- a/src/drunc/unified_shell/shell.py +++ b/src/drunc/unified_shell/shell.py @@ -20,6 +20,7 @@ from drunc.controller.interface.commands import ( connect, disconnect, + echo, exclude, expert_command, include, @@ -151,10 +152,14 @@ def unified_shell( unified_shell_log.debug("Setting up the [green]unified_shell[/green] logger") # Parse the process manager argument to determine if it's a config or an address + unified_shell_log.debug( + f"Parsing the process manager argument: {process_manager}" + ) process_manager_url: ParseResult = urlparse(process_manager) internal_pm: bool = True if
process_manager_url.scheme == "grpc": # i.e. if it's an address internal_pm = False + unified_shell_log.debug(f"{internal_pm=}, {process_manager_url=}") # If using a k8s process manager, validate the session name before proceeding if get_pm_type_from_name( @@ -381,6 +386,7 @@ def unified_shell( take_control, surrender_control, who_am_i, + echo, who_is_in_charge, include, exclude, diff --git a/tests/process_manager/interface/test_commands.py b/tests/process_manager/interface/test_commands.py index 6b98b1544..3571690e2 100644 --- a/tests/process_manager/interface/test_commands.py +++ b/tests/process_manager/interface/test_commands.py @@ -115,7 +115,7 @@ def __init__(self, driver=None): def get_driver(self, name): return self.driver - def print(self, msg, justify=None): + def print(self, msg, justify=None, overflow=None, soft_wrap=None): self.output.append(str(msg))