diff --git a/otsdaq/ConfigurationInterface/ConfigurationManager.cc b/otsdaq/ConfigurationInterface/ConfigurationManager.cc index 4c266409..5f63d296 100644 --- a/otsdaq/ConfigurationInterface/ConfigurationManager.cc +++ b/otsdaq/ConfigurationInterface/ConfigurationManager.cc @@ -4312,7 +4312,6 @@ void ConfigurationManager::initializeFromFhicl(const std::string& fhiclPath) TableView* view = table->getViewP(); __GEN_COUT__ << "Activated version: " << view->getVersion() << __E__; - // view->print(); // add context record --------------------- view->addRow(); @@ -4324,7 +4323,12 @@ void ConfigurationManager::initializeFromFhicl(const std::string& fhiclPath) view->setValue("1", 0, colMap[TableViewColumnInfo::COL_NAME_STATUS]); __GEN_COUT__ << "Done adding context record..." << __E__; - view->print(); + if(TTEST(1)) + { + std::stringstream ss; + view->print(ss); + __GEN_COUTT__ << "view->print()" << ss.str() << __E__; + } } // done with context record @@ -4346,7 +4350,6 @@ void ConfigurationManager::initializeFromFhicl(const std::string& fhiclPath) TableView* view = table->getViewP(); __GEN_COUT__ << "Activated version: " << view->getVersion() << __E__; - // view->print(); // add application record --------------------- view->addRow(); @@ -4360,7 +4363,12 @@ void ConfigurationManager::initializeFromFhicl(const std::string& fhiclPath) view->setValue(__ENV__("FE_SUPERVISOR_ID"), 0, colMap["Id"]); // XDAQ LID __GEN_COUT__ << "Done adding application record..." << __E__; - view->print(); + if(TTEST(1)) + { + std::stringstream ss; + view->print(ss); + __GEN_COUTT__ << "view->print()" << ss.str() << __E__; + } } // done with app record // create FE Supervisor table and Supervisor record @@ -4380,7 +4388,6 @@ void ConfigurationManager::initializeFromFhicl(const std::string& fhiclPath) TableView* view = table->getViewP(); __GEN_COUT__ << "Activated version: " << view->getVersion() << __E__; - // view->print(); // add application record --------------------- view->addRow(); @@ -4392,7 +4399,12 @@ void ConfigurationManager::initializeFromFhicl(const std::string& fhiclPath) "MacroMakerFESupervisorInterfaces", 0, colMap["LinkToFEInterfaceGroupID"]); __GEN_COUT__ << "Done adding supervisor record..." << __E__; - view->print(); + if(TTEST(1)) + { + std::stringstream ss; + view->print(ss); + __GEN_COUTT__ << "view->print()" << ss.str() << __E__; + } } // done with app record // create FE Interface table and interface record(s) @@ -4972,6 +4984,18 @@ ConfigurationManager::getOtherSubsystemActiveTableGroups( ss << userDataPath << __E__; __SS_ONLY_THROW__; } + else if(cmdResult.find("No such file or directory") != std::string::npos) + { + __GEN_SS__ << "\n\nActive tables groups file not found at user data path " + "specified for subsystem '" + << otherSubsystemUID << "': "; + if(username != "") + ss << username << "@"; + if(hostname != "") + ss << hostname << ":"; + ss << userDataPath << __E__; + __SS_ONLY_THROW__; + } auto subsystemActiveGroupMap = StringMacros::getVectorFromString( cmdResult, {'\n'} /* delimieter*/, {' ', '\t'} /* whitespace*/); diff --git a/otsdaq/FECore/FEVInterface.cc b/otsdaq/FECore/FEVInterface.cc index 05c5b607..eb80ed12 100644 --- a/otsdaq/FECore/FEVInterface.cc +++ b/otsdaq/FECore/FEVInterface.cc @@ -924,6 +924,10 @@ void FEVInterface::registerFEMacroFunction( __FE_SS_THROW__; } + __FE_COUTT__ << "Registering FE Macro Function: '" << feMacroName + << "' with requiredUserPermissions=" << requiredUserPermissions + << " allowedCallingFEs=" << allowedCallingFEs << __E__; + mapOfFEMacroFunctions_.insert(std::pair( feMacroName, frontEndMacroStruct_t(feMacroName, @@ -933,6 +937,7 @@ void FEVInterface::registerFEMacroFunction( requiredUserPermissions, allowedCallingFEs, feMacroTooltip))); + } // end registerFEMacroFunction() //============================================================================== diff --git a/otsdaq/GatewaySupervisor/GatewaySupervisor.cc b/otsdaq/GatewaySupervisor/GatewaySupervisor.cc index 3fd2c79d..c5d74323 100644 --- a/otsdaq/GatewaySupervisor/GatewaySupervisor.cc +++ b/otsdaq/GatewaySupervisor/GatewaySupervisor.cc @@ -33,6 +33,7 @@ #include // for mkdir #include // for std::isspace #include // std::chrono::seconds +#include // for snprintf #include #include // std::this_thread::sleep_for @@ -439,12 +440,24 @@ try std::pair> availableDiskSpaceKB_map; + // Single shared "last alert" timestamp per disk per context — any window firing + // resets it, so a flood of correlated alerts is suppressed. Faster-window + // thresholds still get more chances because they use shorter silence periods. std::map - rateToLogDiskLastHourAlert_map, rateToLogDiskLastHalfHourAlert_map, - rateToLogDiskLastQuarterHourAlert_map, rateToLogDiskNowAlert_map; - std::map - rateToDataDiskLastHourAlert_map, rateToDataDiskLastHalfHourAlert_map, - rateToDataDiskLastQuarterHourAlert_map, rateToDataDiskNowAlert_map; + rateToLogDiskAlert_map, rateToDataDiskAlert_map; + // Per-disk "first time the trip condition was observed" — used to require the + // condition to be sustained for a few seconds before firing. Cleared on any + // pass that does not see a trip, so brief transients reset the clock. + std::map + firstTripLogObserved_map, firstTripDataObserved_map; + // Time-suppression for the hard-low "available disk space low" alarm so a + // disk hovering near MIN does not spam every status pass. + std::map hardLowLogAlert_map, + hardLowDataAlert_map; + // Workloop start time — used to skip the rate alarms during a warmup window + // while the historical-sample deque is dominated by the seed value (which is + // usually captured during the noisy startup burst). + const time_t workloopStartTime = time(0); int64_t availableLogSpaceKB_MIN = 0, availableDataSpaceKB_MIN = 0; @@ -467,6 +480,30 @@ try availableDataSpaceKB_MIN = 1000000; //1 GB default in KBs; } __COUTV__(availableDataSpaceKB_MIN); + + auto formatRateKBps = [](float rateKBps) -> std::string { + float absRate = rateKBps < 0 ? -rateKBps : rateKBps; + float value; + const char* unit; + if(absRate < 1024.f) + { + value = rateKBps; + unit = " KB/s"; + } + else if(absRate < 1024.f * 1024.f) + { + value = rateKBps / 1024.f; + unit = " MB/s"; + } + else + { + value = rateKBps / (1024.f * 1024.f); + unit = " GB/s"; + } + char buf[32]; + snprintf(buf, sizeof(buf), "%.1f", value); + return std::string(buf) + unit; + }; const std::string otsdaq_log_dir = __ENV__("OTSDAQ_LOG_DIR"); const std::string otsdaq_data_dir = __ENV__("OTSDAQ_DATA"); @@ -2356,18 +2393,27 @@ try //alert and record available disk space auto spaceIt = availableDiskSpaceKB_map.find(appInfo.getContextName()); + const time_t hardLowSilenceSecs = 5 * 60; //rate-limit hard-low alarms + const time_t nowForHardLow = time(0); if(availableLogSpaceKB) //if non-zero, then assume is latest valid value { if((spaceIt == availableDiskSpaceKB_map.end() || //and new value spaceIt->second.first > availableLogSpaceKB) && availableLogSpaceKB < availableLogSpaceKB_MIN) //and below threshold - { //then alert users! - theSupervisor->addSystemMessage( - "*", - "Available log disk space low (at host='" + - appInfo.getHostname() + "' and path='" + otsdaq_log_dir + - "/'): " + std::to_string(availableLogSpaceKB / 1024) + - " MB remaining."); + { + //rate-limit: do not fire if we already alerted recently for this context + auto lastIt = hardLowLogAlert_map.find(appInfo.getContextName()); + if(lastIt == hardLowLogAlert_map.end() || + nowForHardLow - lastIt->second > hardLowSilenceSecs) + { + theSupervisor->addSystemMessage( + "*", + "LOG disk space low (at host='" + appInfo.getHostname() + + "' and path='" + otsdaq_log_dir + + "/'): " + std::to_string(availableLogSpaceKB / 1024) + + " MB remaining."); + hardLowLogAlert_map[appInfo.getContextName()] = nowForHardLow; + } } availableDiskSpaceKB_map[appInfo.getContextName()].first = availableLogSpaceKB; @@ -2381,13 +2427,19 @@ try if((spaceIt == availableDiskSpaceKB_map.end() || //and new value spaceIt->second.second > availableDataSpaceKB) && availableDataSpaceKB < availableDataSpaceKB_MIN) //and below threshold - { //then alert users! - theSupervisor->addSystemMessage( - "*", - "Available data disk space low (at host='" + - appInfo.getHostname() + "' and path='" + otsdaq_data_dir + - "/'): " + std::to_string(availableDataSpaceKB / 1024) + - " MB remaining."); + { + auto lastIt = hardLowDataAlert_map.find(appInfo.getContextName()); + if(lastIt == hardLowDataAlert_map.end() || + nowForHardLow - lastIt->second > hardLowSilenceSecs) + { + theSupervisor->addSystemMessage( + "*", + "DATA disk space low (at host='" + appInfo.getHostname() + + "' and path='" + otsdaq_data_dir + + "/'): " + std::to_string(availableDataSpaceKB / 1024) + + " MB remaining."); + hardLowDataAlert_map[appInfo.getContextName()] = nowForHardLow; + } } availableDiskSpaceKB_map[appInfo.getContextName()].second = availableDataSpaceKB; @@ -2410,203 +2462,148 @@ try availableDataSpaceKB); //if no recent alert, check if rate to disk is too high ------------ - auto rateIt = rateToLogDiskLastHourAlert_map.find(appInfo.getContextName()); - time_t now = time(0); - if(rateIt == rateToLogDiskLastHourAlert_map.end() || - now - rateIt->second > 30 * 60) //alert at most every 30 minutes - { - float logUsageRateLastHourKBps = - theSupervisor->allSupervisorInfo_.getAllSupervisorInfo() - .at(appInfo.getId()) - .getLogUsageRateLastHourKBps(); - - if(availableLogSpaceKB && - availableLogSpaceKB - logUsageRateLastHourKBps * 3600 < - availableLogSpaceKB_MIN) - { - theSupervisor->addSystemMessage( - "*", - "Log disk space low ALARM (at host='" + appInfo.getHostname() + - "' and path='" + otsdaq_log_dir + - "/'): " + std::to_string(availableLogSpaceKB / 1024) + - " MB remaining and log usage rate over last hour is " + - std::to_string(logUsageRateLastHourKBps) + " KB/s."); - rateToLogDiskLastHourAlert_map[appInfo.getContextName()] = - now; //record time of this alert - } - } //end last hour log rate alert - rateIt = rateToLogDiskLastHalfHourAlert_map.find(appInfo.getContextName()); - if(rateIt == rateToLogDiskLastHalfHourAlert_map.end() || - now - rateIt->second > 15 * 60) //alert at most every 15 minutes - { - float logUsageRateLastHalfHourKBps = - theSupervisor->allSupervisorInfo_.getAllSupervisorInfo() - .at(appInfo.getId()) - .getLogUsageRateLastHalfHourKBps(); - - if(availableLogSpaceKB && - availableLogSpaceKB - logUsageRateLastHalfHourKBps * 1800 < - availableLogSpaceKB_MIN) - { - theSupervisor->addSystemMessage( - "*", - "Log disk space low ALARM (at host='" + appInfo.getHostname() + - "' and path='" + otsdaq_log_dir + - "/'): " + std::to_string(availableLogSpaceKB / 1024) + - " MB remaining and log usage rate over last half-hour is " + - std::to_string(logUsageRateLastHalfHourKBps) + " KB/s."); - rateToLogDiskLastHalfHourAlert_map[appInfo.getContextName()] = - now; //record time of this alert - } - } //end last half-hour log rate alert - rateIt = rateToLogDiskLastQuarterHourAlert_map.find(appInfo.getContextName()); - if(rateIt == rateToLogDiskLastQuarterHourAlert_map.end() || - now - rateIt->second > 15 * 30) //alert at most every 7.5 minutes - { - float logUsageRateLastQuarterHourKBps = - theSupervisor->allSupervisorInfo_.getAllSupervisorInfo() - .at(appInfo.getId()) - .getLogUsageRateLastQuarterHourKBps(); - - if(availableLogSpaceKB && - availableLogSpaceKB - logUsageRateLastQuarterHourKBps * 900 < - availableLogSpaceKB_MIN) - { - theSupervisor->addSystemMessage( - "*", - "Log disk space low ALARM (at host='" + appInfo.getHostname() + - "' and path='" + otsdaq_log_dir + - "/'): " + std::to_string(availableLogSpaceKB / 1024) + - " MB remaining and log usage rate over last quarter-hour " - "is " + - std::to_string(logUsageRateLastQuarterHourKBps) + " KB/s."); - rateToLogDiskLastQuarterHourAlert_map[appInfo.getContextName()] = - now; //record time of this alert - } - } //end last quarter-hour log rate alert - rateIt = rateToLogDiskNowAlert_map.find(appInfo.getContextName()); - if(rateIt == rateToLogDiskNowAlert_map.end() || - now - rateIt->second > 15 * 15) //alert at most every 3.75 minutes - { - float logUsageRateNowKBps = - theSupervisor->allSupervisorInfo_.getAllSupervisorInfo() - .at(appInfo.getId()) - .getLogUsageRateNowKBps(); - - if(availableLogSpaceKB && - availableLogSpaceKB - logUsageRateNowKBps * 450 < - availableLogSpaceKB_MIN) - { - theSupervisor->addSystemMessage( - "*", - "Log disk space low ALARM (at host='" + appInfo.getHostname() + - "' and path='" + otsdaq_log_dir + - "/'): " + std::to_string(availableLogSpaceKB / 1024) + - " MB remaining and log usage rate over last few minutes is " + - std::to_string(logUsageRateNowKBps) + " KB/s."); - rateToLogDiskNowAlert_map[appInfo.getContextName()] = - now; //record time of this alert - } - } //end last few minutes log rate alert - rateIt = rateToDataDiskLastHourAlert_map.find(appInfo.getContextName()); - if(rateIt == rateToDataDiskLastHourAlert_map.end() || - now - rateIt->second > 30 * 60) //alert at most every 30 minutes - { - float dataUsageRateLastHourKBps = - theSupervisor->allSupervisorInfo_.getAllSupervisorInfo() - .at(appInfo.getId()) - .getDataUsageRateLastHourKBps(); - - if(availableDataSpaceKB && - availableDataSpaceKB - dataUsageRateLastHourKBps * 3600 < - availableDataSpaceKB_MIN) + // Windows go from longest-and-quietest to shortest-and-loudest. A + // single shared timestamp per disk means the first window to fire + // silences all the others in this pass — so flooding all 4 alerts + // at once is impossible. Shorter windows still get more chances + // because their silence periods (below) are shorter. + // + // Three protections against false alarms: + // (1) WARMUP: skip all rate checks for the first 5 min after the + // workloop starts, so the seed sample (captured during the noisy + // startup burst) has time to age out of the historical deque. + // (2) MIN LOOKBACK: each window requires its historical sample to + // actually be at least N seconds old before its rate is trusted + // (otherwise a very young sample produces a misleading rate + // that gets multiplied by a much larger projection window). + // (3) SUSTAINED TRIP: a trip condition must be observed continuously + // for ~30 s before we fire, so a brief transient (a one-off log + // flush, file rotation) is filtered out. + time_t now = time(0); + const time_t warmupSecs = 5 * 60; //5-minute startup grace + const time_t sustainSecs = 30; //must trip for this long + const size_t slotForWindow[4] = {9, 7, 5, 1}; + const time_t minLookbackSecs[4] = {300, 300, 300, 60}; //5,5,5,1 min + const int windowSecs[4] = {3600, 1800, 900, 450}; + const int silenceSecs[4] = { + 30 * 60, 15 * 60, 15 * 30, 15 * 15}; //30, 15, 7.5, 3.75 minutes + const char* const windowLabels[4] = { + "last hour", "last half-hour", "last quarter-hour", "last few minutes"}; + + const bool inWarmup = (now - workloopStartTime) < warmupSecs; + + const auto& info = + theSupervisor->allSupervisorInfo_.getAllSupervisorInfo().at( + appInfo.getId()); + const float logRates[4] = {info.getLogUsageRateLastHourKBps(), + info.getLogUsageRateLastHalfHourKBps(), + info.getLogUsageRateLastQuarterHourKBps(), + info.getLogUsageRateNowKBps()}; + const float dataRates[4] = {info.getDataUsageRateLastHourKBps(), + info.getDataUsageRateLastHalfHourKBps(), + info.getDataUsageRateLastQuarterHourKBps(), + info.getDataUsageRateNowKBps()}; + const time_t logSpans[4] = { + info.getLogSpaceSampleAgeSeconds(slotForWindow[0]), + info.getLogSpaceSampleAgeSeconds(slotForWindow[1]), + info.getLogSpaceSampleAgeSeconds(slotForWindow[2]), + info.getLogSpaceSampleAgeSeconds(slotForWindow[3])}; + const time_t dataSpans[4] = { + info.getDataSpaceSampleAgeSeconds(slotForWindow[0]), + info.getDataSpaceSampleAgeSeconds(slotForWindow[1]), + info.getDataSpaceSampleAgeSeconds(slotForWindow[2]), + info.getDataSpaceSampleAgeSeconds(slotForWindow[3])}; + + auto checkDiskRateAlerts = [&](const std::string& diskTag, //"LOG" or "DATA" + const std::string& diskWord, //"log" or "data" + const std::string& diskPath, + int64_t availableSpaceKB, + int64_t availableSpaceKB_MIN, + const float(&rates)[4], + const time_t(&spans)[4], + std::map& alertMap, + std::map& firstTripMap) { + const std::string& ctx = appInfo.getContextName(); + if(inWarmup || !availableSpaceKB) { - theSupervisor->addSystemMessage( - "*", - "Data disk space low ALARM (at host='" + appInfo.getHostname() + - "' and path='" + otsdaq_data_dir + - "/'): " + std::to_string(availableDataSpaceKB / 1024) + - " MB remaining and data usage rate over last hour is " + - std::to_string(dataUsageRateLastHourKBps) + " KB/s."); - rateToDataDiskLastHourAlert_map[appInfo.getContextName()] = - now; //record time of this alert + firstTripMap.erase(ctx); + return; } - } //end last hour data rate alert - rateIt = rateToDataDiskLastHalfHourAlert_map.find(appInfo.getContextName()); - if(rateIt == rateToDataDiskLastHalfHourAlert_map.end() || - now - rateIt->second > 15 * 60) //alert at most every 15 minutes - { - float dataUsageRateLastHalfHourKBps = - theSupervisor->allSupervisorInfo_.getAllSupervisorInfo() - .at(appInfo.getId()) - .getDataUsageRateLastHalfHourKBps(); - if(availableDataSpaceKB && - availableDataSpaceKB - dataUsageRateLastHalfHourKBps * 1800 < - availableDataSpaceKB_MIN) + + //find the first (longest) window that meets criteria and is in a trip + int trippedW = -1; + for(int w = 0; w < 4; ++w) { - theSupervisor->addSystemMessage( - "*", - "Data disk space low ALARM (at host='" + appInfo.getHostname() + - "' and path='" + otsdaq_data_dir + - "/'): " + std::to_string(availableDataSpaceKB / 1024) + - " MB remaining and data usage rate over last half-hour is " + - std::to_string(dataUsageRateLastHalfHourKBps) + " KB/s."); - rateToDataDiskLastHalfHourAlert_map[appInfo.getContextName()] = - now; //record time of this alert + auto it = alertMap.find(ctx); + if(it != alertMap.end() && now - it->second <= silenceSecs[w]) + continue; //still inside this window's silence period + if(spans[w] < minLookbackSecs[w]) + continue; //not enough real lookback to trust this rate + if(availableSpaceKB - rates[w] * windowSecs[w] >= + availableSpaceKB_MIN) + continue; //projection stays above MIN — no trip + trippedW = w; + break; } - } //end last half-hour data rate alert - rateIt = - rateToDataDiskLastQuarterHourAlert_map.find(appInfo.getContextName()); - if(rateIt == rateToDataDiskLastQuarterHourAlert_map.end() || - now - rateIt->second > 15 * 30) //alert at most every 7.5 minutes - { - float dataUsageRateLastQuarterHourKBps = - theSupervisor->allSupervisorInfo_.getAllSupervisorInfo() - .at(appInfo.getId()) - .getDataUsageRateLastQuarterHourKBps(); - - if(availableDataSpaceKB && - availableDataSpaceKB - dataUsageRateLastQuarterHourKBps * 900 < - availableDataSpaceKB_MIN) + + if(trippedW < 0) { - theSupervisor->addSystemMessage( - "*", - "Data disk space low ALARM (at host='" + appInfo.getHostname() + - "' and path='" + otsdaq_data_dir + - "/'): " + std::to_string(availableDataSpaceKB / 1024) + - " MB remaining and data usage rate over last quarter-hour " - "is " + - std::to_string(dataUsageRateLastQuarterHourKBps) + " KB/s."); - rateToDataDiskLastQuarterHourAlert_map[appInfo.getContextName()] = - now; //record time of this alert + //no trip this pass — reset the sustained-trip clock + firstTripMap.erase(ctx); + return; } - } //end last quarter-hour data rate alert - rateIt = rateToDataDiskNowAlert_map.find(appInfo.getContextName()); - if(rateIt == rateToDataDiskNowAlert_map.end() || - now - rateIt->second > 15 * 15) //alert at most every 3.75 minutes - { - float dataUsageRateNowKBps = - theSupervisor->allSupervisorInfo_.getAllSupervisorInfo() - .at(appInfo.getId()) - .getDataUsageRateNowKBps(); - - if(availableDataSpaceKB && - availableDataSpaceKB - dataUsageRateNowKBps * 450 < - availableDataSpaceKB_MIN) + + //trip seen — require it to persist for sustainSecs before firing + auto firstIt = firstTripMap.find(ctx); + if(firstIt == firstTripMap.end()) { - theSupervisor->addSystemMessage( - "*", - "Data disk space low ALARM (at host='" + appInfo.getHostname() + - "' and path='" + otsdaq_data_dir + - "/'): " + std::to_string(availableDataSpaceKB / 1024) + - " MB remaining and data usage rate over last few minutes " - "is " + - std::to_string(dataUsageRateNowKBps) + " KB/s."); - rateToDataDiskNowAlert_map[appInfo.getContextName()] = - now; //record time of this alert + firstTripMap[ctx] = now; + return; } - } //end last few minutes data rate alert - } // end of app loop + if(now - firstIt->second < sustainSecs) + return; //not sustained long enough yet + + theSupervisor->addSystemMessage( + "*", + diskTag + " disk space low ALARM (at host='" + appInfo.getHostname() + + "' and path='" + diskPath + "/'): " + + std::to_string(availableSpaceKB / 1024) + " MB remaining and " + + diskWord + " usage rate over " + windowLabels[trippedW] + " is " + + formatRateKBps(rates[trippedW]) + "."); + alertMap[ctx] = now; + firstTripMap.erase(ctx); //re-arm: next trip must sustain again + }; + + checkDiskRateAlerts("LOG", + "log", + otsdaq_log_dir, + availableLogSpaceKB, + availableLogSpaceKB_MIN, + logRates, + logSpans, + rateToLogDiskAlert_map, + firstTripLogObserved_map); + + //if data disk looks identical to log disk (same free space and same + //rates across all windows), treat them as the same physical disk and + //skip the data alerts to avoid a duplicate noisy alarm. + bool dataIsSameAsLog = + (availableDataSpaceKB == availableLogSpaceKB) && + (dataRates[0] == logRates[0]) && (dataRates[1] == logRates[1]) && + (dataRates[2] == logRates[2]) && (dataRates[3] == logRates[3]); + if(!dataIsSameAsLog) + checkDiskRateAlerts("DATA", + "data", + otsdaq_data_dir, + availableDataSpaceKB, + availableDataSpaceKB_MIN, + dataRates, + dataSpans, + rateToDataDiskAlert_map, + firstTripDataObserved_map); + else + firstTripDataObserved_map.erase(appInfo.getContextName()); + } // end of app loop if(oneStatusReqHasFailed) { @@ -2806,15 +2803,18 @@ void GatewaySupervisor::SendRemoteGatewayCommand( Socket gatewayRemoteSocket(parsedFields[1], atoi(parsedFields[2].c_str())); - std::string commandResponseString = remoteGatewaySocket->sendAndReceive( - gatewayRemoteSocket, - command, - 10 /*timeoutSeconds*/, - 0, - false, - 200000 /*interPacketTimeoutUSeconds=200ms*/); + // Use retransmission-mode sendAndReceiveAll for reliable multi-packet + // config dump transfer. This replaces the old sendAndReceive + manual + // receive loop, providing automatic packet ordering, dropped packet + // detection, and retransmit requests. + std::string commandResponseString = + remoteGatewaySocket->sendAndReceiveAll(gatewayRemoteSocket, + command, + 10 /*timeoutSeconds*/, + 10 /*retransmitMaxRetries*/, + false /*verbose*/); __COUT__ << "Response from subsystem '" << remoteGatewayApp.appInfo.name - << "' received: " << commandResponseString << __E__; + << "' received: " << commandResponseString.size() << " bytes" << __E__; size_t donePos = commandResponseString.find("Done"); if(donePos != 0) //then error @@ -2869,58 +2869,26 @@ void GatewaySupervisor::SendRemoteGatewayCommand( if(commandResponseString.size() > strlen("Done") + 1) { - //make sure we received everything - const size_t MAX_RETRIES = 10; - size_t tryCnt = 0; - while(++tryCnt < 20 && - commandResponseString.size() > 10 && //must end with 'END---' - (commandResponseString[commandResponseString.size() - 1] != '-' || - commandResponseString[commandResponseString.size() - 2] != '-' || - commandResponseString[commandResponseString.size() - 3] != '-' || - commandResponseString[commandResponseString.size() - 4] != 'D' || - commandResponseString[commandResponseString.size() - 5] != 'N' || - commandResponseString[commandResponseString.size() - 6] != 'E')) - { - __COUT__ << "There must be more, try = " << tryCnt << __E__; - std::string more; - if(remoteGatewaySocket->receive(more, 1 /*timeoutSeconds*/) == - 0 /* success */) - { - commandResponseString += more; - tryCnt = 0; //reset since we received data - } - else if(tryCnt >= MAX_RETRIES) - { - __SS__ << "Timeout after " << MAX_RETRIES - << " attempts waiting for more data from Remote Gateway '" - << remoteGatewayApp.appInfo.name - << "' (URL=" << remoteGatewayApp.appInfo.url << "). "; - if(commandResponseString.empty()) - { - ss << "No data was received at all."; - } - else - { - const size_t maxPrint = 500; - ss << "Received " << commandResponseString.size() - << " bytes so far. "; - if(commandResponseString.size() <= maxPrint) - { - ss << "Full received text: [" << commandResponseString << "]"; - } - else - { - ss << "First " << maxPrint << " chars: [" - << commandResponseString.substr(0, maxPrint) - << "] ... Last " << maxPrint << " chars: [" - << commandResponseString.substr( - commandResponseString.size() - maxPrint) - << "]"; - } - } - ss << __E__; - __SS_THROW__; - } + // With retransmission mode, the full response is already assembled + // by sendAndReceiveAll(). Verify the END--- marker is present. + if(commandResponseString.size() > 10 && + !commandResponseString.ends_with("END---")) + { + __SS__ << "Config dump response from Remote Gateway '" + << remoteGatewayApp.appInfo.name + << "' is missing END--- termination marker. " + << "Received " << commandResponseString.size() << " bytes." + << __E__; + const size_t maxPrint = 500; + if(commandResponseString.size() <= maxPrint) + ss << " Full text: [" << commandResponseString << "]"; + else + ss << " Last " << maxPrint << " chars: [" + << commandResponseString.substr(commandResponseString.size() - + maxPrint) + << "]"; + ss << __E__; + __SS_THROW__; } //assume have config dump response! @@ -4459,7 +4427,10 @@ void GatewaySupervisor::StateChangerWorkLoop(GatewaySupervisor* theSupervisor) : "" //append extra done content, if any ), true /* verbose */, - extraDoneContent.size() ? 65500 : 1500 /*maxChunkSize*/); + extraDoneContent.size() ? 65500 : 1500 /*maxChunkSize*/, + 0 /*interPacketGapUSeconds*/, + extraDoneContent.size() > + 0 /*enableRetransmission - use retransmit protocol for large config dump transfers*/); } } catch(...) diff --git a/otsdaq/NetworkUtilities/TransceiverSocket.cc b/otsdaq/NetworkUtilities/TransceiverSocket.cc index 607d2720..aa38bf85 100644 --- a/otsdaq/NetworkUtilities/TransceiverSocket.cc +++ b/otsdaq/NetworkUtilities/TransceiverSocket.cc @@ -2,11 +2,16 @@ #include "otsdaq/Macros/CoutMacros.h" #include "otsdaq/MessageFacility/MessageFacility.h" +#include #include #include +#include #include +#include #include +#include #include +#include using namespace ots; @@ -28,55 +33,249 @@ TransceiverSocket::~TransceiverSocket(void) {} //============================================================================== /// returns 0 on success +/// When enableRetransmission is true, uses sendAll() to send the buffer with +/// retransmission headers, then waits for retransmit requests from the receiver. int TransceiverSocket::acknowledge(const std::string& buffer, bool verbose /* = false */, size_t maxChunkSize /* = 1500 */, - unsigned int interPacketGapUSeconds /* = 0 */) + unsigned int interPacketGapUSeconds /* = 0 */, + bool enableRetransmission /* = false */) { - // lockout other senders for the remainder of the scope - std::lock_guard lock(sendMutex_); - if(verbose) __COUTT__ << "Acknowledging on Socket Descriptor #: " << socketNumber_ << " from-port: " << ntohs(socketAddress_.sin_port) << " to-port: " << ntohs(ReceiverSocket::fromAddress_.sin_port) + << " retransmission: " << (enableRetransmission ? "ON" : "OFF") << std::endl; + if(!enableRetransmission) + { + //==================================================================== + // Original non-retransmission mode (unchanged behavior) + //==================================================================== + // lockout other senders for the remainder of this scope + std::lock_guard lock(sendMutex_); + + const size_t MAX_SEND_SIZE = + maxChunkSize > 65500u ? static_cast(65500u) : maxChunkSize; + size_t offset = 0; + int sendToSize = 1; + int sizeInBytes = 1; + + while(offset < buffer.size() && sendToSize > 0) + { + auto thisSize = sizeInBytes * (buffer.size() - offset) > MAX_SEND_SIZE + ? MAX_SEND_SIZE + : sizeInBytes * (buffer.size() - offset); + if(verbose) + __COUTTV__(thisSize); + sendToSize = sendto(socketNumber_, + &buffer[0] + offset, + thisSize, + 0, + (struct sockaddr*)&(ReceiverSocket::fromAddress_), + sizeof(sockaddr_in)); + offset += sendToSize / sizeInBytes; + if(interPacketGapUSeconds > 0 && offset < buffer.size() && sendToSize > 0) + usleep(interPacketGapUSeconds); + } + + if(sendToSize <= 0) + { + __SS__ << "Error writing buffer from port " + << ntohs(TransmitterSocket::socketAddress_.sin_port) << ": " + << strerror(errno) << std::endl; + __SS_THROW__; + } + return 0; + } + + //==================================================================== + // Retransmission mode: delegate entirely to sendAll() which handles + // packet building, initial send, and retransmit request handling. + //==================================================================== + return sendAll(buffer, verbose, maxChunkSize, interPacketGapUSeconds); +} //end acknowledge() + +//============================================================================== +/// sendAll() sends a buffer to the last receive address (fromAddress_) using the +/// retransmission protocol. Fully self-contained: +/// 1. Builds all packets with 8-byte retransmission headers +/// 2. Sends all packets +/// 3. Waits for retransmit requests from the receiver +/// 4. Resends requested packets +/// 5. Returns when receiver sends "done" or timeout expires +/// +/// Returns 0 on success. +int TransceiverSocket::sendAll(const std::string& buffer, + bool verbose /* = false */, + size_t maxChunkSize /* = 65500 */, + unsigned int interPacketGapUSeconds /* = 0 */) +{ + if(verbose) + __COUT__ << "sendAll: retransmission-mode send on Socket Descriptor #: " + << socketNumber_ << " from-port: " << ntohs(socketAddress_.sin_port) + << " to-port: " << ntohs(ReceiverSocket::fromAddress_.sin_port) + << " buffer size: " << buffer.size() << __E__; + const size_t MAX_SEND_SIZE = maxChunkSize > 65500u ? static_cast(65500u) : maxChunkSize; - size_t offset = 0; - int sendToSize = 1; - int sizeInBytes = 1; + // The payload per packet is reduced by the header size + const size_t payloadMax = MAX_SEND_SIZE > RETRANSMIT_HEADER_SIZE + ? MAX_SEND_SIZE - RETRANSMIT_HEADER_SIZE + : 1; + + // Calculate total number of packets + uint16_t totalPackets = + static_cast((buffer.size() + payloadMax - 1) / payloadMax); + if(totalPackets == 0) + totalPackets = 1; // send at least one packet even for empty buffer + + if(verbose) + __COUT__ << "sendAll: sending " << totalPackets << " packets for " + << buffer.size() << " bytes, payloadMax=" << payloadMax << __E__; - while(offset < buffer.size() && sendToSize > 0) + // Build and cache all packets (header + payload) for retransmit use + std::vector packets(totalPackets); { - auto thisSize = sizeInBytes * (buffer.size() - offset) > MAX_SEND_SIZE - ? MAX_SEND_SIZE - : sizeInBytes * (buffer.size() - offset); - if(verbose) - __COUTTV__(thisSize); - sendToSize = sendto(socketNumber_, - &buffer[0] + offset, - thisSize, - 0, - (struct sockaddr*)&(ReceiverSocket::fromAddress_), - sizeof(sockaddr_in)); - offset += sendToSize / sizeInBytes; - if(interPacketGapUSeconds > 0 && offset < buffer.size() && sendToSize > 0) - usleep(interPacketGapUSeconds); + size_t offset = 0; + for(uint16_t pi = 0; pi < totalPackets; ++pi) + { + size_t payloadSize = (buffer.size() - offset) > payloadMax + ? payloadMax + : (buffer.size() - offset); + + char header[RETRANSMIT_HEADER_SIZE]; + uint16_t netMagic = htons(RETRANSMIT_MAGIC); + uint16_t netIndex = htons(pi); + uint16_t netTotal = htons(totalPackets); + uint16_t netPaySize = htons(static_cast(payloadSize)); + std::memcpy(header + 0, &netMagic, 2); + std::memcpy(header + 2, &netIndex, 2); + std::memcpy(header + 4, &netTotal, 2); + std::memcpy(header + 6, &netPaySize, 2); + + packets[pi].assign(header, RETRANSMIT_HEADER_SIZE); + packets[pi].append(buffer, offset, payloadSize); + offset += payloadSize; + } } - if(sendToSize <= 0) + // Send all packets initially (lock sendMutex_ for the burst) { - __SS__ << "Error writing buffer from port " - << ntohs(TransmitterSocket::socketAddress_.sin_port) << ": " - << strerror(errno) << std::endl; - __SS_THROW__; //return -1; + std::lock_guard lock(sendMutex_); + for(uint16_t pi = 0; pi < totalPackets; ++pi) + { + int sendToSize = sendto(socketNumber_, + packets[pi].data(), + packets[pi].size(), + 0, + (struct sockaddr*)&(ReceiverSocket::fromAddress_), + sizeof(sockaddr_in)); + if(sendToSize <= 0) + { + __SS__ << "sendAll: error writing packet " << pi << "/" << totalPackets + << " from port " << ntohs(socketAddress_.sin_port) << ": " + << strerror(errno) << std::endl; + __SS_THROW__; + } + if(verbose) + __COUTT__ << "sendAll: sent packet " << pi << "/" << totalPackets + << " size=" << packets[pi].size() << std::endl; + + if(interPacketGapUSeconds > 0 && pi + 1 < totalPackets) + usleep(interPacketGapUSeconds); + } + } + + // Wait for retransmit requests from receiver. + // Retransmit request format: magic(2 bytes) + list of uint16 missing indices + // Done signal format: magic(2 bytes) + 0xFFFF(2 bytes) + const unsigned int retransmitTimeoutSeconds = 5; + const unsigned int maxRetransmitRounds = 20; + + for(unsigned int round = 0; round < maxRetransmitRounds; ++round) + { + std::string retransmitRequest; + int rc = receive(retransmitRequest, + retransmitTimeoutSeconds, + 0 /*timeoutUSeconds*/, + false /*verbose*/); + if(rc < 0) + { + // Timeout - assume receiver got everything (or gave up) + if(verbose) + __COUT__ << "sendAll: no retransmit request after " + << retransmitTimeoutSeconds + << "s timeout, assuming transfer complete." << __E__; + break; + } + + if(retransmitRequest.size() < 4) + continue; + + uint16_t reqMagic; + std::memcpy(&reqMagic, retransmitRequest.data(), 2); + reqMagic = ntohs(reqMagic); + if(reqMagic != RETRANSMIT_MAGIC) + continue; + + // Check for "done" signal (magic + 0xFFFF) + uint16_t firstVal; + std::memcpy(&firstVal, retransmitRequest.data() + 2, 2); + firstVal = ntohs(firstVal); + if(firstVal == 0xFFFF) + { + if(verbose) + __COUT__ << "sendAll: received 'all done' from receiver." << __E__; + break; + } + + // Parse list of missing packet indices and resend them + size_t numIndices = (retransmitRequest.size() - 2) / 2; + if(verbose) + __COUT__ << "sendAll: retransmit request for " << numIndices + << " packets (round " << round << ")." << __E__; + + // Lock sendMutex_ for the resend burst + std::lock_guard lock(sendMutex_); + for(size_t i = 0; i < numIndices; ++i) + { + uint16_t missingIdx; + std::memcpy(&missingIdx, retransmitRequest.data() + 2 + i * 2, 2); + missingIdx = ntohs(missingIdx); + + if(missingIdx < totalPackets) + { + int sendToSize = sendto(socketNumber_, + packets[missingIdx].data(), + packets[missingIdx].size(), + 0, + (struct sockaddr*)&(ReceiverSocket::fromAddress_), + sizeof(sockaddr_in)); + if(sendToSize <= 0) + { + __SS__ << "sendAll: error resending packet " << missingIdx << ": " + << strerror(errno) << std::endl; + __SS_THROW__; + } + if(verbose) + __COUTT__ << "sendAll: resent packet " << missingIdx << std::endl; + + if(interPacketGapUSeconds > 0) + usleep(interPacketGapUSeconds); + } + else + { + __COUT_WARN__ << "sendAll: retransmit request for invalid packet index " + << missingIdx << " (total=" << totalPackets << ")" << __E__; + } + } } return 0; -} //end acknowledge() +} //end sendAll() //============================================================================== /// Receives one packet with the specified timeout, then attempts to receive @@ -155,3 +354,354 @@ std::string TransceiverSocket::sendAndReceive( return receiveBuffer; } //end sendAndReceive() + +//============================================================================== +/// receiveAll() receives a multi-packet retransmission-mode response. +/// Packets are expected to have an 8-byte retransmission header: +/// [0-1] magic 0xD2C4 (network byte order) +/// [2-3] packet index (network byte order uint16) +/// [4-5] total packets (network byte order uint16) +/// [6-7] payload size (network byte order uint16) +/// +/// After all initial packets are received (or timeout), missing packets are +/// identified and a retransmit request is sent back to the sender containing +/// the magic marker followed by the list of missing packet indices. This +/// repeats up to retransmitMaxRetries times. When all packets are received, +/// a "done" signal (magic + 0xFFFF) is sent to the sender. +/// +/// Returns 0 on success (assembled buffer placed in 'buffer'), -1 on failure. +int TransceiverSocket::receiveAll(std::string& buffer, + unsigned int timeoutSeconds /* = 5 */, + unsigned int retransmitMaxRetries /* = 10 */, + bool verbose /* = false */) +{ + using clock = std::chrono::steady_clock; + auto start = clock::now(); + + // Map of packet index -> payload data + std::map receivedPackets; + uint16_t totalPackets = 0; + bool totalKnown = false; + + if(verbose) + __COUT__ << "receiveAll: waiting for retransmission-mode packets, timeout=" + << timeoutSeconds << "s" << __E__; + + // Phase 1: Receive all initial packets until timeout + // Use a per-packet timeout that is shorter than the overall timeout, + // so we can detect "no more packets arriving" vs "still waiting for first" + const unsigned int interPacketTimeoutUSeconds = 100000; // 100ms between packets + bool firstPacketReceived = false; + + while(true) + { + std::string rawPacket; + int rc = receive(rawPacket, + firstPacketReceived ? 0 : timeoutSeconds, + firstPacketReceived ? interPacketTimeoutUSeconds : 0, + false /*verbose*/); + + if(rc < 0) + { + if(!firstPacketReceived) + { + // Never received any packet at all + if(verbose) + __COUT__ << "receiveAll: timeout waiting for first packet after " + << timeoutSeconds << "s" << __E__; + return -1; + } + // Timeout between packets - move to retransmit phase + break; + } + + // Check for retransmission header + if(rawPacket.size() < RETRANSMIT_HEADER_SIZE) + { + // Too small to be a retransmission packet - might be a non-retransmit + // response; just return it as-is + if(!firstPacketReceived) + { + buffer = rawPacket; + return 0; + } + // Skip malformed packet during multi-packet receive + if(verbose) + __COUT_WARN__ << "receiveAll: skipping undersized packet (" + << rawPacket.size() << " bytes)" << __E__; + continue; + } + + // Parse header + uint16_t magic, packetIndex, pktTotal, payloadSize; + std::memcpy(&magic, rawPacket.data() + 0, 2); + std::memcpy(&packetIndex, rawPacket.data() + 2, 2); + std::memcpy(&pktTotal, rawPacket.data() + 4, 2); + std::memcpy(&payloadSize, rawPacket.data() + 6, 2); + magic = ntohs(magic); + packetIndex = ntohs(packetIndex); + pktTotal = ntohs(pktTotal); + payloadSize = ntohs(payloadSize); + + if(magic != RETRANSMIT_MAGIC) + { + // Not a retransmission packet - if first packet, return as-is + if(!firstPacketReceived) + { + buffer = rawPacket; + return 0; + } + if(verbose) + __COUT_WARN__ << "receiveAll: skipping packet with bad magic 0x" + << std::hex << magic << std::dec << __E__; + continue; + } + + firstPacketReceived = true; + totalPackets = pktTotal; + totalKnown = true; + + // Extract payload (everything after the 8-byte header, limited by payloadSize) + size_t actualPayload = rawPacket.size() - RETRANSMIT_HEADER_SIZE; + if(actualPayload > payloadSize) + actualPayload = payloadSize; + + receivedPackets[packetIndex] = + rawPacket.substr(RETRANSMIT_HEADER_SIZE, actualPayload); + + if(verbose) + __COUTT__ << "receiveAll: received packet " << packetIndex << "/" + << totalPackets << " payload=" << actualPayload + << " total_received=" << receivedPackets.size() << std::endl; + + // Check if we have all packets + if(totalKnown && receivedPackets.size() >= static_cast(totalPackets)) + break; + + // Check overall timeout + auto elapsed = + std::chrono::duration_cast(clock::now() - start); + if(elapsed.count() >= + static_cast(timeoutSeconds * (retransmitMaxRetries + 1))) + { + if(verbose) + __COUT_WARN__ << "receiveAll: overall timeout reached" << __E__; + break; + } + } + + // Phase 2: Retransmit missing packets + if(totalKnown && receivedPackets.size() < static_cast(totalPackets)) + { + for(unsigned int retry = 0; retry < retransmitMaxRetries; ++retry) + { + // Build list of missing packet indices + std::set missing; + for(uint16_t i = 0; i < totalPackets; ++i) + { + if(receivedPackets.find(i) == receivedPackets.end()) + missing.insert(i); + } + + if(missing.empty()) + break; + + if(verbose) + __COUT__ << "receiveAll: retry " << retry + 1 << "/" + << retransmitMaxRetries << ", requesting retransmit of " + << missing.size() << " packets" << __E__; + + // Build retransmit request: magic(2 bytes) + list of uint16 indices + std::string retransmitReq; + retransmitReq.resize(2 + missing.size() * 2); + uint16_t netMagic = htons(RETRANSMIT_MAGIC); + std::memcpy(&retransmitReq[0], &netMagic, 2); + size_t pos = 2; + for(uint16_t idx : missing) + { + uint16_t netIdx = htons(idx); + std::memcpy(&retransmitReq[pos], &netIdx, 2); + pos += 2; + } + + // Send retransmit request back to sender (acknowledge to last receive addr) + { + // Use sendto directly to fromAddress_ (the sender) + int sendToSize = sendto(socketNumber_, + retransmitReq.data(), + retransmitReq.size(), + 0, + (struct sockaddr*)&(ReceiverSocket::fromAddress_), + sizeof(sockaddr_in)); + if(sendToSize <= 0) + { + __COUT_WARN__ << "receiveAll: failed to send retransmit request: " + << strerror(errno) << __E__; + } + } + + // Receive retransmitted packets + while(true) + { + std::string rawPacket; + int rc = receive( + rawPacket, timeoutSeconds, 0 /*timeoutUSeconds*/, false /*verbose*/); + if(rc < 0) + break; // timeout, will retry + + if(rawPacket.size() < RETRANSMIT_HEADER_SIZE) + continue; + + uint16_t magic2, packetIndex2, pktTotal2, payloadSize2; + std::memcpy(&magic2, rawPacket.data() + 0, 2); + std::memcpy(&packetIndex2, rawPacket.data() + 2, 2); + std::memcpy(&pktTotal2, rawPacket.data() + 4, 2); + std::memcpy(&payloadSize2, rawPacket.data() + 6, 2); + magic2 = ntohs(magic2); + packetIndex2 = ntohs(packetIndex2); + pktTotal2 = ntohs(pktTotal2); + payloadSize2 = ntohs(payloadSize2); + + if(magic2 != RETRANSMIT_MAGIC) + continue; + + size_t actualPayload2 = rawPacket.size() - RETRANSMIT_HEADER_SIZE; + if(actualPayload2 > payloadSize2) + actualPayload2 = payloadSize2; + + receivedPackets[packetIndex2] = + rawPacket.substr(RETRANSMIT_HEADER_SIZE, actualPayload2); + + if(verbose) + __COUTT__ << "receiveAll: retransmit received packet " << packetIndex2 + << "/" << totalPackets + << " total_received=" << receivedPackets.size() + << std::endl; + + // Check if we now have all packets + if(receivedPackets.size() >= static_cast(totalPackets)) + break; + } + + if(receivedPackets.size() >= static_cast(totalPackets)) + break; + } + } + + // Phase 3: Send "done" acknowledgment to sender (magic + 0xFFFF) + { + std::string doneSignal(4, '\0'); + uint16_t netMagic = htons(RETRANSMIT_MAGIC); + uint16_t netDone = htons(0xFFFF); + std::memcpy(&doneSignal[0], &netMagic, 2); + std::memcpy(&doneSignal[2], &netDone, 2); + sendto(socketNumber_, + doneSignal.data(), + doneSignal.size(), + 0, + (struct sockaddr*)&(ReceiverSocket::fromAddress_), + sizeof(sockaddr_in)); + } + + // Phase 4: Assemble full buffer in order + if(!totalKnown || receivedPackets.empty()) + { + __SS__ << "receiveAll: failed to receive any retransmission-mode packets" + << __E__; + __SS_THROW__; + } + + if(receivedPackets.size() < static_cast(totalPackets)) + { + // Build list of still-missing indices for the error message + std::string missingStr; + for(uint16_t i = 0; i < totalPackets; ++i) + { + if(receivedPackets.find(i) == receivedPackets.end()) + { + if(!missingStr.empty()) + missingStr += ", "; + missingStr += std::to_string(i); + } + } + __SS__ << "receiveAll: failed to receive all packets after " + << retransmitMaxRetries << " retransmit retries. " + << "Received " << receivedPackets.size() << "/" << totalPackets + << " packets. Missing indices: [" << missingStr << "]" << __E__; + __SS_THROW__; + } + + // Assemble in order + buffer.clear(); + for(uint16_t i = 0; i < totalPackets; ++i) + buffer += receivedPackets[i]; + + if(verbose) + __COUT__ << "receiveAll: successfully assembled " << buffer.size() + << " bytes from " << totalPackets << " packets in " + << std::chrono::duration_cast(clock::now() - + start) + .count() + << " ms" << __E__; + + return 0; +} //end receiveAll() + +//============================================================================== +/// sendAndReceiveAll() sends a command then reliably receives the full +/// multi-packet response using the retransmission protocol. +/// The remote sender must use acknowledge() with enableRetransmission=true. +/// +/// This mirrors sendAndReceive() but uses receiveAll() instead of the +/// simple multi-packet loop, providing: +/// - Packet ordering via indexed headers +/// - Dropped packet detection via known total count +/// - Automatic retransmit requests for missing packets +/// - Fully assembled, ordered response buffer +/// +/// Throws on timeout or error. +std::string TransceiverSocket::sendAndReceiveAll( + Socket& toSocket, + const std::string& sendBuffer, + unsigned int timeoutSeconds /* = 5 */, + unsigned int retransmitMaxRetries /* = 10 */, + bool verbose /* = false */) +{ + using clock = std::chrono::steady_clock; + auto start = clock::now(); + + // lockout other sender and receive attempts for the remainder of the scope + std::lock_guard lock(sendAndReceiveMutex_); + + flush(); // make sure nothing to read before sending + send(toSocket, sendBuffer, verbose); + + __COUTT__ << " ----> Time sendAndReceiveAll '" << sendBuffer + << "' (socketNumber=" << socketNumber_ << ") check ==> " + << std::chrono::duration_cast(clock::now() - + start) + .count() + << " milliseconds. PID=" << getpid() + << " TID=" << std::this_thread::get_id() << std::endl; + + std::string receiveBuffer; + if(receiveAll(receiveBuffer, timeoutSeconds, retransmitMaxRetries, verbose) < 0) + { + __SS__ << "Timeout (" << timeoutSeconds + << " s) or Error receiving retransmission response from remote ip:port " + << toSocket.getIPAddress() << ":" << toSocket.getPort() + << " to this ip:port " << Socket::getIPAddress() << ":" + << Socket::getPort() << __E__; + __SS_ONLY_THROW__; + } + + __COUTT__ << " ----> Time sendAndReceiveAll complete: " << receiveBuffer.size() + << " bytes (socketNumber=" << socketNumber_ << ") ==> " + << std::chrono::duration_cast(clock::now() - + start) + .count() + << " milliseconds. PID=" << getpid() + << " TID=" << std::this_thread::get_id() << std::endl; + + return receiveBuffer; +} //end sendAndReceiveAll() diff --git a/otsdaq/NetworkUtilities/TransceiverSocket.h b/otsdaq/NetworkUtilities/TransceiverSocket.h index 2888a5c7..90f393dc 100644 --- a/otsdaq/NetworkUtilities/TransceiverSocket.h +++ b/otsdaq/NetworkUtilities/TransceiverSocket.h @@ -5,6 +5,7 @@ #include "otsdaq/NetworkUtilities/TransmitterSocket.h" #include +#include namespace ots { @@ -14,11 +15,42 @@ class TransceiverSocket : public TransmitterSocket, public ReceiverSocket TransceiverSocket(std::string IPAddress, unsigned int port = 0); virtual ~TransceiverSocket(void); - /// acknowledge() responds to last receive location + /// acknowledge() responds to last receive location. + /// When enableRetransmission is true, delegates to sendAll() for reliable + /// multi-packet transfer with retransmit handling. int acknowledge(const std::string& buffer, bool verbose = false, size_t maxChunkSize = 1500, - unsigned int interPacketGapUSeconds = 0); + unsigned int interPacketGapUSeconds = 0, + bool enableRetransmission = false); + + /// sendAll() sends a buffer to the last receive address using the + /// retransmission protocol. This is fully self-contained: it builds + /// headered packets, sends them all, then waits for retransmit requests + /// from the receiver and resends any missing packets. Only returns when + /// the transfer is complete (receiver sends "done") or timeout expires. + /// + /// Each packet is prepended with an 8-byte retransmission header: + /// [0-1] magic marker 0xD2C4 (network byte order) + /// [2-3] packet index (0-based, network byte order uint16) + /// [4-5] total packet count (network byte order uint16) + /// [6-7] payload size in this packet (network byte order uint16) + /// + /// Returns 0 on success. + int sendAll(const std::string& buffer, + bool verbose = false, + size_t maxChunkSize = 65500, + unsigned int interPacketGapUSeconds = 0); + + /// receiveAll() receives a multi-packet retransmission-mode response. + /// It assembles the full message from individually-headered packets, + /// detects missing packets by index, and requests retransmission + /// from the sender for any dropped packets. + /// Returns 0 on success (assembled buffer placed in 'buffer'), -1 on failure. + int receiveAll(std::string& buffer, + unsigned int timeoutSeconds = 5, + unsigned int retransmitMaxRetries = 10, + bool verbose = false); std::string sendAndReceive(Socket& toSocket, const std::string& sendBuffer, @@ -27,6 +59,24 @@ class TransceiverSocket : public TransmitterSocket, public ReceiverSocket bool verbose = false, unsigned int interPacketTimeoutUSeconds = 10000); + /// sendAndReceiveAll() sends a command then uses the retransmission protocol + /// to reliably receive the full multi-packet response. The sender must use + /// acknowledge() with enableRetransmission=true (or sendAll()). This method: + /// 1. Flushes and sends the request + /// 2. Receives all retransmission-headered packets + /// 3. Detects missing packets and sends retransmit requests + /// 4. Assembles and returns the complete response + /// Throws on timeout or error. + std::string sendAndReceiveAll(Socket& toSocket, + const std::string& sendBuffer, + unsigned int timeoutSeconds = 5, + unsigned int retransmitMaxRetries = 10, + bool verbose = false); + + /// Retransmission protocol constants + static constexpr uint16_t RETRANSMIT_MAGIC = 0xD2C4; + static constexpr size_t RETRANSMIT_HEADER_SIZE = 8; + protected: TransceiverSocket(void); diff --git a/otsdaq/SupervisorInfo/SupervisorInfo.h b/otsdaq/SupervisorInfo/SupervisorInfo.h index ffc25360..b7d34ba6 100755 --- a/otsdaq/SupervisorInfo/SupervisorInfo.h +++ b/otsdaq/SupervisorInfo/SupervisorInfo.h @@ -107,6 +107,12 @@ class SupervisorInfo int64_t getAvailableLogSpaceKB (void) const { return availableLogSpaceKB_.size() > 0 ? availableLogSpaceKB_.front().second : 0; } int64_t getAvailableDataSpaceKB (void) const { return availableDataSpaceKB_.size() > 0 ? availableDataSpaceKB_.front().second : 0; } + // Actual lookback span (seconds) between the newest sample and the sample at the + // given slot. Used by the disk-space alarm to refuse rates whose historical + // sample is still too young to be meaningful (e.g. just after startup). + time_t getLogSpaceSampleAgeSeconds (size_t slot) const { return availableLogSpaceKB_.size() > slot ? availableLogSpaceKB_.front().first - availableLogSpaceKB_[slot].first : 0; } + time_t getDataSpaceSampleAgeSeconds (size_t slot) const { return availableDataSpaceKB_.size() > slot ? availableDataSpaceKB_.front().first - availableDataSpaceKB_[slot].first : 0; } + float getLogUsageRateLastHourKBps (void) const { return availableLogSpaceKB_.size() > 9 ? (availableLogSpaceKB_[9].second - availableLogSpaceKB_.front().second)*1.0f/ std::max(static_cast(1), availableLogSpaceKB_.front().first - availableLogSpaceKB_[9].first) : 0; } float getLogUsageRateLastHalfHourKBps (void) const { return availableLogSpaceKB_.size() > 7 ? (availableLogSpaceKB_[7].second - availableLogSpaceKB_.front().second)*1.0f/ diff --git a/otsdaq/TableCore/TableGroupKey.cc b/otsdaq/TableCore/TableGroupKey.cc index 5d180dbc..e9351bb3 100644 --- a/otsdaq/TableCore/TableGroupKey.cc +++ b/otsdaq/TableCore/TableGroupKey.cc @@ -206,8 +206,8 @@ std::string TableGroupKey::getFullGroupString(const std::string& groupName, "Group Name must be alpha-numeric: \"" + groupName + "\"") << std::endl; - __COUT_ERR__ << ss.str(); - __SS_THROW__; + __COUT__ << ss.str(); + __SS_ONLY_THROW__; } } } diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index c620111f..dac4fedb 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -13,6 +13,7 @@ cet_make_exec(NAME otsdaq_load_json_document LIBRARIES otsdaq::ConfigurationInte cet_script(ALWAYS_COPY ots_common.sh + ots_compile_aliases.sh EclipseBuild.sh ots ots_remote_start diff --git a/tools/ots b/tools/ots index 48d34157..b38cd848 100644 --- a/tools/ots +++ b/tools/ots @@ -126,6 +126,25 @@ SOFTKILLALL=0 #only kill user data areas KILLALL=0 ISMACROMAKER=0 ISFINDLOGFILES=0 +ISFINDLOGFILES_LOCAL=0 +ISFINDLOGFILES_ARTDAQ=0 +ISDELETEOLDLOGS=0 +ISDELETEOLDLOGS_LOCAL=0 +ISDELETEOLDLOGS_ARTDAQ=0 +DELETE_LOGS_TIME_INT="" +DELETE_LOGS_TIME_UNIT="" +DELETE_LOGS_NOPROMPT=0 +DELETE_LOGS_DRYRUN=0 +DELETE_LOGS_GRAND_TOTAL=0 +DELETE_LOGS_LAST_COUNT=0 +ISLOGSCAN_ERROR=0 +ISLOGSCAN_ERROR_ARTDAQ=0 +ISLOGSCAN_ERROR_LOCAL=0 +ISLOGSCAN_WARN=0 +ISLOGSCAN_WARN_ARTDAQ=0 +ISLOGSCAN_WARN_LOCAL=0 +LOGSCAN_TIME_INT="" +LOGSCAN_TIME_UNIT="" ISCONFIG=0 QUIET=1 CHROME=0 @@ -198,6 +217,91 @@ while :; do info "DISPLAYING LOG FILE LOCATIONS!" ISFINDLOGFILES=1 ;; + --logfind-artdaq | -ll) + # Local log find + ARTDAQ hosts from latest ranks.txt. + # Used by fast_ots_setup.sh 'artdaqlogs' action. + info "DISPLAYING LOCAL + ARTDAQ HOST LOG FILE LOCATIONS!" + ISFINDLOGFILES_ARTDAQ=1 + ;; + --logfind-local | -lll) + # Local-only log find (leaf): search $USER_DATA/Logs on THIS host only (no remote SSH, no ranks.txt). + # Used by fast_ots_setup.sh 'logs' action to provide results back to a primary ots -l or ots -ll caller. + info "DISPLAYING LOCAL LOG FILE LOCATIONS (no remote search)!" + ISFINDLOGFILES_LOCAL=1 + ;; + --logdelete | -lx) + # Delete old logs older than [dryrun] (local + all remote + ARTDAQ hosts) + info "DELETE OLD LOGS (ALL HOSTS)!" + ISDELETEOLDLOGS=1 + DELETE_LOGS_TIME_INT="$2" + DELETE_LOGS_TIME_UNIT="$3" + if [[ "$4" == "dryrun" ]]; then DELETE_LOGS_DRYRUN=1; shift; fi + shift 2 + ;; + --logdelete-artdaq | -llx) + # Delete old logs older than [dryrun] (local + ARTDAQ hosts) + info "DELETE OLD LOGS (LOCAL + ARTDAQ HOSTS)!" + ISDELETEOLDLOGS_ARTDAQ=1 + DELETE_LOGS_TIME_INT="$2" + DELETE_LOGS_TIME_UNIT="$3" + if [[ "$4" == "dryrun" ]]; then DELETE_LOGS_DRYRUN=1; shift; fi + shift 2 + ;; + --logdelete-local | -lllx) + # Delete old logs older than [dryrun] (local only, leaf) + info "DELETE OLD LOGS (LOCAL ONLY)!" + ISDELETEOLDLOGS_LOCAL=1 + DELETE_LOGS_TIME_INT="$2" + DELETE_LOGS_TIME_UNIT="$3" + if [[ "$4" == "dryrun" ]]; then DELETE_LOGS_DRYRUN=1; shift; fi + shift 2 + ;; + --logscan-error | -le) + # Show ERROR lines from log files newer than (local + all remote + ARTDAQ hosts) + info "SCAN RECENT LOGS FOR ERRORS (ALL HOSTS)!" + ISLOGSCAN_ERROR=1 + if [[ "$2" =~ ^[0-9]+$ ]]; then LOGSCAN_TIME_INT="$2"; LOGSCAN_TIME_UNIT="$3"; shift 2 + else LOGSCAN_TIME_INT=30; LOGSCAN_TIME_UNIT="second"; fi + ;; + --logscan-error-artdaq | -lle) + # Show ERROR lines from log files newer than (local + ARTDAQ hosts) + info "SCAN RECENT LOGS FOR ERRORS (LOCAL + ARTDAQ HOSTS)!" + ISLOGSCAN_ERROR_ARTDAQ=1 + if [[ "$2" =~ ^[0-9]+$ ]]; then LOGSCAN_TIME_INT="$2"; LOGSCAN_TIME_UNIT="$3"; shift 2 + else LOGSCAN_TIME_INT=30; LOGSCAN_TIME_UNIT="second"; fi + ;; + --logscan-error-local | -llle) + # Show ERROR lines from log files newer than (local only, leaf) + info "SCAN RECENT LOGS FOR ERRORS (LOCAL ONLY)!" + ISLOGSCAN_ERROR_LOCAL=1 + if [[ "$2" =~ ^[0-9]+$ ]]; then LOGSCAN_TIME_INT="$2"; LOGSCAN_TIME_UNIT="$3"; shift 2 + else LOGSCAN_TIME_INT=30; LOGSCAN_TIME_UNIT="second"; fi + ;; + --logscan-warn | -lw) + # Show WARN|ERROR lines from log files newer than (local + all remote + ARTDAQ hosts) + info "SCAN RECENT LOGS FOR WARNINGS/ERRORS (ALL HOSTS)!" + ISLOGSCAN_WARN=1 + if [[ "$2" =~ ^[0-9]+$ ]]; then LOGSCAN_TIME_INT="$2"; LOGSCAN_TIME_UNIT="$3"; shift 2 + else LOGSCAN_TIME_INT=30; LOGSCAN_TIME_UNIT="second"; fi + ;; + --logscan-warn-artdaq | -llw) + # Show WARN|ERROR lines from log files newer than (local + ARTDAQ hosts) + info "SCAN RECENT LOGS FOR WARNINGS/ERRORS (LOCAL + ARTDAQ HOSTS)!" + ISLOGSCAN_WARN_ARTDAQ=1 + if [[ "$2" =~ ^[0-9]+$ ]]; then LOGSCAN_TIME_INT="$2"; LOGSCAN_TIME_UNIT="$3"; shift 2 + else LOGSCAN_TIME_INT=30; LOGSCAN_TIME_UNIT="second"; fi + ;; + --logscan-warn-local | -lllw) + # Show WARN|ERROR lines from log files newer than (local only, leaf) + info "SCAN RECENT LOGS FOR WARNINGS/ERRORS (LOCAL ONLY)!" + ISLOGSCAN_WARN_LOCAL=1 + if [[ "$2" =~ ^[0-9]+$ ]]; then LOGSCAN_TIME_INT="$2"; LOGSCAN_TIME_UNIT="$3"; shift 2 + else LOGSCAN_TIME_INT=30; LOGSCAN_TIME_UNIT="second"; fi + ;; + --logdelete-noprompt) + # Internal flag: skip the "are you sure?" prompt (used by nested calls) + DELETE_LOGS_NOPROMPT=1 + ;; --chrome | -c) # Enable launching chrome after startup info "CHROME LAUNCH ENABLED!" @@ -262,39 +366,80 @@ while :; do out "${Purple}${REV} ************* ots Usage ************** ${RstClr}" out "${Purple}${REV} ****************************************************** ${RstClr}${Reset}" out - out "To kill all otsdaq running processes, please use any of these options:" + out "To ${Bold}kill${RstClr} all running processes:" out " --killall --kill --kx -k" out " e.g.: ots --kx" out - out "To start otsdaq in 'wiz mode' please use any of these options:" + out "To ${Bold}start${RstClr} in wiz mode:" out " --configure --config --wizard --wiz -w" out " e.g.: ots --wiz" out - out "To start otsdaq with 'verbose mode' enabled, add one of these options:" + out "To ${Bold}start${RstClr} with verbose mode enabled:" out " --verbose -v" out " e.g.: ots --wiz -v or ots --verbose" out - out "To display potential log file locations based on current otsdaq configuration, add one of these options:" + out "To ${Bold}display log files${RstClr} based on current configuration:" out " --logfind -l" + out " Display log files across all hosts." out " e.g.: ots -l or ots --logfind" + out " --logfind-artdaq -ll" + out " Display log files on local + ARTDAQ hosts from latest ranks.txt." + out " e.g.: ots -ll or ots --logfind-artdaq" + out " --logfind-local -lll" + out " Display log files on local host only (leaf)." + out " e.g.: ots -lll or ots --logfind-local" out - out "To start otsdaq and launch google-chrome, add one of these options:" + out "To ${Bold}delete old log files${RstClr}, use with [dryrun]:" + out " --logdelete -lx [dryrun]" + out " Delete log files older than N units (local + all remote + ARTDAQ hosts)." + out " e.g.: ots -lx 5 minutes or ots -lx 1 week dryrun" + out " --logdelete-artdaq -llx [dryrun]" + out " Delete log files older than N units (local + ARTDAQ hosts)." + out " e.g.: ots -llx 2 hours or ots -llx 2 hours dryrun" + out " --logdelete-local -lllx [dryrun]" + out " Delete log files older than N units (local only, leaf)." + out " e.g.: ots -lllx 1 day or ots -lllx 1 day dryrun" + out " Time units: seconds, minutes, hours, days, weeks, years (singular also accepted)" + out " Add 'dryrun' to preview what would be deleted without actually deleting." + out + out "To ${Bold}scan recent log files for errors/warnings${RstClr}, use with optional (default: 30 seconds):" + out " --logscan-error -le [N unit]" + out " Show last 10 ERROR lines from log files newer than N units (local + all remote + ARTDAQ hosts)." + out " e.g.: ots -le or ots -le 5 minutes or ots -le 1 hour" + out " --logscan-error-artdaq -lle [N unit]" + out " Show last 10 ERROR lines from log files newer than N units (local + ARTDAQ hosts)." + out " e.g.: ots -lle or ots -lle 2 hours" + out " --logscan-error-local -llle [N unit]" + out " Show last 10 ERROR lines from log files newer than N units (local only, leaf)." + out " e.g.: ots -llle or ots -llle 30 minutes" + out " --logscan-warn -lw [N unit]" + out " Show last 10 WARN|ERROR lines from log files newer than N units (local + all remote + ARTDAQ hosts)." + out " e.g.: ots -lw or ots -lw 5 minutes or ots -lw 1 hour" + out " --logscan-warn-artdaq -llw [N unit]" + out " Show last 10 WARN|ERROR lines from log files newer than N units (local + ARTDAQ hosts)." + out " e.g.: ots -llw or ots -llw 2 hours" + out " --logscan-warn-local -lllw [N unit]" + out " Show last 10 WARN|ERROR lines from log files newer than N units (local only, leaf)." + out " e.g.: ots -lllw or ots -lllw 30 minutes" + out " Time units: seconds, minutes, hours, days, weeks, years (singular also accepted)" + out + out "To ${Bold}start${RstClr} and launch google-chrome:" out " --chrome -c" out " e.g.: ots --wiz -c or ots --chrome" out - out "To start otsdaq and launch firefox, add one of these options:" + out "To ${Bold}start${RstClr} and launch firefox:" out " --firefox -f" out " e.g.: ots --wiz -f or ots --firefox" out - out "To backup and not overwrite previous quiet log files, add one of these options:" + out "To ${Bold}backup${RstClr} and not overwrite previous quiet log files:" out " --backup -b" out " e.g.: ots -b or ots --backup" out - out "To disable otsdaq startup checking and relaunching, add one of these options:" + out "To ${Bold}disable startup checking${RstClr} and relaunching:" out " --relaunch -r" out " e.g.: ots -r or ots --relaunch" out - out "To disable otsdaq remote process startup, add one of these options:" + out "To ${Bold}disable remote startup${RstClr}:" out " --startremote -s" out " e.g.: ots -s or ots --startremote" out @@ -2275,8 +2420,469 @@ launchOTS() { } #end launchOTS export -f launchOTS -launchOTSLogFind() { +# _logfind_artdaq_symlink_name +# Given an ARTDAQ process subdirectory name like "DTC0Trk03-mu2e-trk-03-16100", +# extracts the process name (e.g. "DTC0Trk03") and searches ARTDAQ role directories +# (boardreader/, eventbuilder/, dispatcher/, datalogger/, routingmanager/) under +# for the latest symlink matching "run*-.log". +# If found, prints the symlink name without .log extension (e.g. "run120431-DTC0Trk03"). +# Prints nothing if no match found. +_logfind_artdaq_symlink_name() { + local _subdir_name="$1" + local _logs_root="$2" + + # Extract the ARTDAQ process name from the subdirectory name. + # Subdirectory format: -- + # e.g. "DTC0Trk03-mu2e-trk-03-16100" → "DTC0Trk03" + # "EvB0Trk03-mu2e-trk-03-16101" → "EvB0Trk03" + # The process name is everything before the first "-mu2e-" or "--" portion. + # A robust approach: the process name ends where "-" begins; host portions + # typically start with "mu2e-" or contain the hostname. Use the first match of + # "-mu2e-" as the split point; fall back to the full subdir name if not found. + local _proc_name + if [[ "$_subdir_name" == *"-mu2e-"* ]]; then + _proc_name="${_subdir_name%%-mu2e-*}" + else + # Try splitting on pattern: name-host-port where port is digits at end + # e.g. "SomeProc-somehost-12345" + _proc_name="${_subdir_name}" + fi + + [ -z "$_proc_name" ] && return 1 + + local _role_dir _latest_link _latest_run _link _linkname _run_num + _latest_link="" + _latest_run=0 + + for _role_dir in boardreader eventbuilder dispatcher datalogger routingmanager; do + [ -d "${_logs_root}/${_role_dir}" ] || continue + for _link in "${_logs_root}/${_role_dir}/"run*"-${_proc_name}.log"; do + [ -L "$_link" ] || continue + _linkname=$(basename "$_link") + # Extract run number from "run-.log" + _run_num="${_linkname#run}" + _run_num="${_run_num%%-*}" + if [ "$_run_num" -gt "$_latest_run" ] 2>/dev/null; then + _latest_run="$_run_num" + _latest_link="$_linkname" + fi + done + done + + if [ -n "$_latest_link" ]; then + # Output "run120431 DTC0Trk03" (replace first dash with space) + local _result="${_latest_link%.log}" + echo "${_result/-/ }" + return 0 + fi + return 1 +} + +# _logdelete_normalize_time_unit +# Normalizes time unit strings to a canonical singular form. +# Handles: second(s), minute(s), hour(s), day(s), week(s), year(s) +# Prints the canonical form or empty string on failure. +_logdelete_normalize_time_unit() { + local _unit="$1" + # strip trailing 's' for plural handling + _unit="${_unit%s}" + case "$_unit" in + second) echo "second" ;; + minute) echo "minute" ;; + hour) echo "hour" ;; + day) echo "day" ;; + week) echo "week" ;; + year) echo "year" ;; + *) echo "" ;; + esac +} + +# _logdelete_time_to_minutes +# Converts a time value to minutes for use with find -mmin. +# Prints the number of minutes, or empty string on failure. +_logdelete_time_to_minutes() { + local _val="$1" + local _unit="$2" + case "$_unit" in + second) echo $(( _val / 60 > 0 ? _val / 60 : 1 )) ;; + minute) echo "$_val" ;; + hour) echo $(( _val * 60 )) ;; + day) echo $(( _val * 1440 )) ;; + week) echo $(( _val * 10080 )) ;; + year) echo $(( _val * 525600 )) ;; + *) echo "" ;; + esac +} + +# _logdelete_validate_and_prompt +# Validates DELETE_LOGS_TIME_INT and DELETE_LOGS_TIME_UNIT. +# If valid and DELETE_LOGS_NOPROMPT is not set, prompts the user for confirmation. +# Returns 0 on success (proceed with deletion), 1 on failure/abort. +_logdelete_validate_and_prompt() { + # Validate time integer + if ! [[ "$DELETE_LOGS_TIME_INT" =~ ^[0-9]+$ ]] || [ "$DELETE_LOGS_TIME_INT" -le 0 ]; then + error "Invalid time value: '${DELETE_LOGS_TIME_INT}'. Must be a positive integer." + out " Usage: ots -lx [dryrun]" + out " Time units: seconds, minutes, hours, days, weeks, years (singular also accepted)" + out " Example: ots -lx 5 minutes or ots -lx 5 minutes dryrun" + return 1 + fi + + # Normalize time unit + local _norm_unit + _norm_unit=$(_logdelete_normalize_time_unit "$DELETE_LOGS_TIME_UNIT") + if [ -z "$_norm_unit" ]; then + error "Invalid time unit: '${DELETE_LOGS_TIME_UNIT}'. Must be one of: seconds, minutes, hours, days, weeks, years." + out " Usage: ots -lx [dryrun]" + out " Time units: seconds, minutes, hours, days, weeks, years (singular also accepted)" + out " Example: ots -lx 5 minutes or ots -lx 5 minutes dryrun" + return 1 + fi + DELETE_LOGS_TIME_UNIT="$_norm_unit" + + # Build human-readable label + local _label="${DELETE_LOGS_TIME_INT} ${DELETE_LOGS_TIME_UNIT}" + if [ "$DELETE_LOGS_TIME_INT" -ne 1 ]; then + _label="${_label}s" + fi + + # Prompt for confirmation (unless noprompt) + if [ "$DELETE_LOGS_NOPROMPT" -ne 1 ]; then + out + if [ "$DELETE_LOGS_DRYRUN" -eq 1 ]; then + out "${Bold}${Red}WARNING: [this is a DRYRUN otherwise...] This will delete all log files older than ${_label} under \$USER_DATA/Logs.${RstClr}" + read -p "[this is a DRYRUN] Are you sure you want to delete all logs older than ${_label}? [Y/N] " _answer + else + out "${Bold}${Red}WARNING: This will delete all log files older than ${_label} under \$USER_DATA/Logs.${RstClr}" + out " Usage: ots -lx [dryrun]" + read -p "Are you sure you want to delete (if you are not sure, add dryrun parameter) all logs older than ${_label}? [Y/N] " _answer + fi + if [[ "$_answer" != "Y" && "$_answer" != "y" ]]; then + warning "Aborting log deletion." + return 1 + fi + # Mark as prompted so nested calls don't re-prompt + DELETE_LOGS_NOPROMPT=1 + fi + + return 0 +} + +# _logdelete_get_context_hosts +# Parses the OTS XML config to extract unique context hostnames. +# Prints each unique hostname on a separate line (excluding THIS_HOST). +_logdelete_get_context_hosts() { + local _xml_file="${XDAQ_CONFIGURATION_DATA_PATH}/ots.xml" + if [ ! -f "$_xml_file" ]; then + return 1 + fi + + local _re="http(s*)://(.+):([0-9]+)" + declare -A _seen_hosts + _seen_hosts["${THIS_HOST}"]=1 + + while read -r _line; do + if [[ "$_line" == *"xc:Context"* && "$_line" == *"url"* ]]; then + if [[ "$_line" =~ $_re ]]; then + local _host="${BASH_REMATCH[2]}" + # Skip if already seen + [ -n "${_seen_hosts[$_host]+x}" ] && continue + _seen_hosts["$_host"]=1 + # Skip if it's the local host (with or without domain/data suffix) + local _h_stripped="${_host/-data/}" + _h_stripped="${_h_stripped/-ipmi/}" + if [[ "$_h_stripped" == "${THIS_HOST}" || "$_h_stripped" == "${THIS_HOST}."* ]]; then + continue + fi + echo "$_host" + fi + fi + done < "$_xml_file" +} + +# _logdelete_delete_old_logs +# Deletes log files under $USER_DATA/Logs that are older than the specified time. +# Respects DELETE_LOGS_TIME_INT and DELETE_LOGS_TIME_UNIT (must be validated first). +_logdelete_delete_old_logs() { + local _mmin + _mmin=$(_logdelete_time_to_minutes "$DELETE_LOGS_TIME_INT" "$DELETE_LOGS_TIME_UNIT") + if [ -z "$_mmin" ] || [ "$_mmin" -le 0 ]; then + error "Could not compute time threshold in minutes." + return 1 + fi + + local _label="${DELETE_LOGS_TIME_INT} ${DELETE_LOGS_TIME_UNIT}" + if [ "$DELETE_LOGS_TIME_INT" -ne 1 ]; then + _label="${_label}s" + fi + + out + out "${Bold}${Cyan}Searching for local log files older than ${_label} to delete (under \$USER_DATA/Logs)...${RstClr}" + + local _logs_dir="${USER_DATA}/Logs" + if [ ! -d "$_logs_dir" ]; then + warning "Logs directory not found: ${_logs_dir}" + return 1 + fi + + # Define local file age helper (since _logfind_file_age may have been unset) + _logdelete_file_age() { + local file_path="$1" + local file_mtime now_epoch age_secs days hours mins + file_mtime=$(stat -L -c %Y "$file_path" 2>/dev/null) + if [ -z "$file_mtime" ]; then echo ""; return; fi + now_epoch=$(date +%s) + age_secs=$(( now_epoch - file_mtime )) + days=$(( age_secs / 86400 )) + hours=$(( (age_secs % 86400) / 3600 )) + mins=$(( (age_secs % 3600) / 60 )) + if [ $days -gt 0 ]; then + echo "${days} days ago" + elif [ $hours -gt 0 ]; then + echo "${hours} hours ago" + elif [ $mins -gt 0 ]; then + echo "${mins} minutes ago" + else + echo "just now" + fi + } + + # Find files older than the threshold + local _old_files + _old_files=$(find -L "$_logs_dir" -type f -mmin +${_mmin} -not -name 'otsdaq_is_alive-*.dat' 2>/dev/null) + + if [ -z "$_old_files" ]; then + out " ${IBlack}No log files older than ${_label} found.${RstClr}" + return 0 + fi + + local _count _shown _max_show=20 + _count=$(echo "$_old_files" | wc -l) + DELETE_LOGS_LAST_COUNT=$_count + + # Header with truncation note if needed + if [ "$_count" -gt "$_max_show" ]; then + out " Here is the list of ${Bold}${_count}${RstClr} files to delete (showing first ${_max_show}):" + else + out " Here is the list of ${Bold}${_count}${RstClr} files to delete:" + fi + out + + # List files that would be deleted (truncate to _max_show) + _shown=0 + while IFS= read -r _file; do + if [ $_shown -ge $_max_show ]; then + out " ${IBlack}... and $(( _count - _max_show )) more files not shown.${RstClr}" + break + fi + local _age + _age=$(_logdelete_file_age "$_file" 2>/dev/null) + out " ${Red}DELETE${RstClr} ${_file} ${IBlack}(${_age})${RstClr}" + _shown=$(( _shown + 1 )) + done <<< "$_old_files" + + # Show count again after the list if > 5 files (easier to find in output) + if [ "$_count" -gt 5 ]; then + out + out " Total files to delete: ${Bold}${_count}${RstClr}" + fi + + out + + if [ "$DELETE_LOGS_DRYRUN" -eq 1 ]; then + out " ${Yellow}[DRY RUN] No files were deleted.${RstClr}" + else + # Delete the files + while IFS= read -r _file; do + rm -f "$_file" + done <<< "$_old_files" + out " ${Bold}Deleted ${_count} files.${RstClr}" + fi + + # Clean up empty subdirectories under Logs/ (Logs/ itself is preserved by -mindepth 1) + out + out " ${IBlack}Checking for empty subdirectories to clean up...${RstClr}" + find -L "$_logs_dir" -mindepth 1 -type d 2>/dev/null | while IFS= read -r _dir; do + local _dir_count + _dir_count=$(find -L "$_dir" -maxdepth 1 -type f 2>/dev/null | wc -l) + if [ "$_dir_count" -eq 0 ]; then + if [ "$DELETE_LOGS_DRYRUN" -eq 1 ]; then + out " ${Red}RMDIR${RstClr} ${_dir} ${IBlack}(empty, would be removed)${RstClr}" + else + out " ${Red}RMDIR${RstClr} ${_dir} ${IBlack}(empty, removing)${RstClr}" + rmdir "$_dir" 2>/dev/null + fi + else + out " ${IBlack}KEEP ${_dir} (${_dir_count} files)${RstClr}" + fi + done + + if [ "$DELETE_LOGS_DRYRUN" -eq 1 ]; then + out + out " ${Bold}${Yellow}This was a DRY RUN. No files or directories were deleted.${RstClr}" + out " ${Yellow}To actually delete, remove 'dryrun' from the command:${RstClr}" + out " ${Yellow} e.g.: ots -lllx ${DELETE_LOGS_TIME_INT} ${DELETE_LOGS_TIME_UNIT}${RstClr}" + fi + + out " ${IBlack}Done.${RstClr}" +} +export -f _logdelete_delete_old_logs + +# _logscan_validate +# Validates LOGSCAN_TIME_INT and LOGSCAN_TIME_UNIT (reuses _logdelete_ helpers). +# Returns 0 on success, 1 on failure. +_logscan_validate() { + if ! [[ "$LOGSCAN_TIME_INT" =~ ^[0-9]+$ ]] || [ "$LOGSCAN_TIME_INT" -le 0 ]; then + error "Invalid time value: '${LOGSCAN_TIME_INT}'. Must be a positive integer." + out " Usage: ots -le or ots -lw " + out " Time units: seconds, minutes, hours, days, weeks, years (singular also accepted)" + out " Example: ots -le 5 minutes or ots -lw 1 hour" + return 1 + fi + + local _norm_unit + _norm_unit=$(_logdelete_normalize_time_unit "$LOGSCAN_TIME_UNIT") + if [ -z "$_norm_unit" ]; then + error "Invalid time unit: '${LOGSCAN_TIME_UNIT}'. Must be one of: seconds, minutes, hours, days, weeks, years." + out " Usage: ots -le or ots -lw " + out " Time units: seconds, minutes, hours, days, weeks, years (singular also accepted)" + return 1 + fi + LOGSCAN_TIME_UNIT="$_norm_unit" + + return 0 +} + +# _logscan_display_local +# Finds log files under $USER_DATA/Logs newer than LOGSCAN_TIME_INT LOGSCAN_TIME_UNIT +# (-mmin -N: modified within the last N minutes, i.e. newer than the threshold). +# For each file with matches, enumerates the last 10 lines (0-9) with type label and +# count summary; each line is truncated to fit terminal width. +_logscan_display_local() { + local _pattern_label="$1" + local _grep_pattern="$2" + local _mmin + _mmin=$(_logdelete_time_to_minutes "$LOGSCAN_TIME_INT" "$LOGSCAN_TIME_UNIT") + if [ -z "$_mmin" ] || [ "$_mmin" -le 0 ]; then + error "Could not compute time threshold in minutes." + return 1 + fi + + local _label="${LOGSCAN_TIME_INT} ${LOGSCAN_TIME_UNIT}" + if [ "$LOGSCAN_TIME_INT" -ne 1 ]; then + _label="${_label}s" + fi + + # Terminal width for truncation (0 = no truncation) + local _cols + _cols=$(tput cols 2>/dev/null) + [[ "$_cols" =~ ^[0-9]+$ ]] && [ "$_cols" -gt 0 ] || _cols=0 + + out + out "${Bold}${Cyan}Scanning local log files newer than ${_label} for ${_pattern_label} (under \$USER_DATA/Logs)...${RstClr}" + + local _logs_dir="${USER_DATA}/Logs" + if [ ! -d "$_logs_dir" ]; then + warning "Logs directory not found: ${_logs_dir}" + return 1 + fi + + # -mmin -N finds files modified within the last N minutes (newer than threshold) + local _new_files + _new_files=$(find -L "$_logs_dir" -type f -mmin -${_mmin} 2>/dev/null) + + if [ -z "$_new_files" ]; then + out " ${IBlack}No log files newer than ${_label} found.${RstClr}" + out " ${IBlack}Done.${RstClr}" + return 0 + fi + + local _file_count + _file_count=$(echo "$_new_files" | wc -l) + out " ${IBlack}Found ${_file_count} file(s) newer than ${_label}. Scanning for ${_pattern_label}...${RstClr}" + while IFS= read -r _sf_f; do + [ -z "$_sf_f" ] && continue + local _sf_base _sf_age _sf_mtime _sf_now _sf_age_secs _sf_days _sf_hours _sf_mins _sf_secs + _sf_base=$(basename "$_sf_f") + _sf_mtime=$(stat -L -c %Y "$_sf_f" 2>/dev/null) + _sf_now=$(date +%s) + _sf_age_secs=$(( _sf_now - _sf_mtime )) + [ "$_sf_age_secs" -lt 0 ] && _sf_age_secs=0 + _sf_days=$(( _sf_age_secs / 86400 )) + if [ "$_sf_days" -ge 2 ]; then + _sf_age="${_sf_days} days ago" + else + _sf_hours=$(( _sf_age_secs / 3600 )) + _sf_mins=$(( (_sf_age_secs % 3600) / 60 )) + _sf_secs=$(( _sf_age_secs % 60 )) + if [ "$_sf_hours" -gt 0 ]; then + _sf_age=$(printf "%d:%02d:%02d ago" "$_sf_hours" "$_sf_mins" "$_sf_secs") + else + _sf_age=$(printf "%d:%02d ago" "$_sf_mins" "$_sf_secs") + fi + fi + out " ${Bold}${_sf_base}${RstClr} --> less ${_sf_f} ${IBlack}(${_sf_age})${RstClr}" + done <<< "$_new_files" + + local _any_match=0 + while IFS= read -r _sf_file; do + [ -z "$_sf_file" ] && continue + local _sf_matches + _sf_matches=$(grep -a -i -E "$_grep_pattern" "$_sf_file" 2>/dev/null \ + | grep -E "^.{0,42}[0-9]{2}:[0-9]{2}:[0-9]{2}" | tail -10) + if [ -n "$_sf_matches" ]; then + _any_match=1 + + # Count by type for the file header summary + local _sf_nerr _sf_nwarn + _sf_nerr=$(echo "$_sf_matches" | grep -ci "ERROR" 2>/dev/null) + _sf_nwarn=$(echo "$_sf_matches" | grep -vi "ERROR" | grep -ci "WARN" 2>/dev/null) + + local _sf_summary="" + [ "$_sf_nerr" -gt 0 ] && _sf_summary="${_sf_summary} ERROR:${_sf_nerr}" + [ "$_sf_nwarn" -gt 0 ] && _sf_summary="${_sf_summary} WARN:${_sf_nwarn}" + + out + out " ${Bold}${Yellow}${_sf_file}${RstClr} ${Bold}[${_sf_summary# }]${RstClr}" + + # Enumerate each matching line 0-9 with type label, truncated to terminal width + local _sf_idx=0 + # Prefix " N [TYPE] " is always 12 chars: 2 + 1(digit) + 1 + 1([) + 5(TYPE) + 1(]) + 1( ) + # Additional 6-char offset accounts for terminal rendering overhead + local _sf_prefix_width=18 + while IFS= read -r _sf_line; do + # Determine type label from line content (case-insensitive) + local _sf_type + if echo "$_sf_line" | grep -qi "ERROR"; then + _sf_type="ERROR" + elif echo "$_sf_line" | grep -qi "WARN"; then + _sf_type="WARN " + else + _sf_type=" " + fi + + local _sf_prefix=" ${_sf_idx} [${_sf_type}] " + local _sf_content="$_sf_line" + if [ "$_cols" -gt 0 ]; then + local _sf_max=$(( _cols - _sf_prefix_width )) + [ "$_sf_max" -lt 10 ] && _sf_max=10 + _sf_content="${_sf_line:0:${_sf_max}}" + fi + echo "${_sf_prefix}${_sf_content}" + _sf_idx=$(( _sf_idx + 1 )) + done <<< "$_sf_matches" + fi + done <<< "$_new_files" + if [ "$_any_match" -eq 0 ]; then + out " ${IBlack}No ${_pattern_label} lines found in recent log files.${RstClr}" + fi + + out " ${IBlack}Done.${RstClr}" +} +export -f _logscan_display_local + +launchOTSLogFind() { ############################################################################## XDAQ_OTS_XML="ots" #used for normal mode launch as copy location for stable script launching @@ -2408,17 +3014,589 @@ launchOTSLogFind() { done < ${XDAQ_CONFIGURATION_DATA_PATH}/${XDAQ_OTS_XML}.xml + ############################################################################## + # Display log files recently written to under $USER_DATA/Logs on all hosts + # - Always search locally on THIS_HOST (in case context config has changed). + # - Also search each remote host from the XML via: + # ssh -K host "cd && source ./Fast_ots_setup.sh logs" + # which sources the environment and runs 'ots -lll' (local-only log find leaf). + # - ots -lll outputs machine-parseable lines: LOGFIND_LOCAL:: + # - Escalating time window is handled by ots -lll on each host independently. + ############################################################################## out #blank line out #blank line + + #-- helper: format file age as human-readable string -- + # < 1 hour: MM:SS ago e.g. "30:00 ago" + # < 2 days: HH:MM:SS ago e.g. "12:32:21 ago" + # >= 2 days: N days ago e.g. "3 days ago" + _logfind_file_age() { + local file_path="$1" + local file_mtime now_epoch age_secs days hours mins secs + file_mtime=$(stat -L -c %Y "$file_path" 2>/dev/null) + if [ -z "$file_mtime" ]; then echo ""; return; fi + now_epoch=$(date +%s) + age_secs=$(( now_epoch - file_mtime )) + if [ $age_secs -lt 0 ]; then age_secs=0; fi + days=$(( age_secs / 86400 )) + if [ $days -ge 2 ]; then + echo "${days} days ago" + else + hours=$(( age_secs / 3600 )) + mins=$(( (age_secs % 3600) / 60 )) + secs=$(( age_secs % 60 )) + if [ $hours -gt 0 ]; then + printf "%d:%02d:%02d ago" $hours $mins $secs + else + printf "%02d:%02d ago" $mins $secs + fi + fi + } + + #-- helper: check if a hostname refers to the local machine -- + # Matches THIS_HOST exactly, THIS_HOST.domain, localhost, + # or THIS_HOST with -data/-ipmi suffix (private network aliases for same machine) + _logfind_is_local() { + local h="$1" + # direct matches + if [[ "$h" == "${THIS_HOST}" || "$h" == "${THIS_HOST}."* || "$h" == "localhost" ]]; then + return 0 + fi + # strip -data or -ipmi suffix and check again + local h_stripped="${h/-data/}" + h_stripped="${h_stripped/-ipmi/}" + if [[ "$h_stripped" == "${THIS_HOST}" || "$h_stripped" == "${THIS_HOST}."* ]]; then + return 0 + fi + return 1 + } + + #-- build unique host list (avoid duplicates) -- + declare -A _logfind_seen_hosts + unset _logfind_remote_hosts + _logfind_remote_hosts=() + + # always include the local host (mark as seen so it won't appear in remote list) + _logfind_seen_hosts["${THIS_HOST}"]=1 + + # add gateway host (if remote) + if [ -n "$gatewayHostname" ]; then + if [ -z "${_logfind_seen_hosts[$gatewayHostname]+x}" ]; then + if _logfind_is_local "$gatewayHostname"; then + _logfind_seen_hosts["$gatewayHostname"]=1 + else + _logfind_seen_hosts["$gatewayHostname"]=1 + _logfind_remote_hosts+=("$gatewayHostname") + fi + fi + fi + # add non-gateway hosts (if remote) + for _lf_host in "${xdaqHost[@]}"; do + if [ -z "${_logfind_seen_hosts[$_lf_host]+x}" ]; then + if _logfind_is_local "$_lf_host"; then + _logfind_seen_hosts["$_lf_host"]=1 + else + _logfind_seen_hosts["$_lf_host"]=1 + _logfind_remote_hosts+=("$_lf_host") + fi + fi + done + unset _logfind_seen_hosts + + out "${Bold}${Cyan}Searching for recently written log files (under \$USER_DATA/Logs)...${RstClr}" + out " ${IBlack}Hosts to search: ${THIS_HOST} (local)${_logfind_remote_hosts[@]:+ ${_logfind_remote_hosts[*]} (remote)}${RstClr}" + + #-- Step 1a: Local search in $USER_DATA/Logs using escalating time windows -- + out " ${IBlack}[${THIS_HOST}] Searching locally in \$USER_DATA/Logs = ${USER_DATA}/Logs ...${RstClr}" + + _logfind_local_label="" + _logfind_local_files="" + for _ll_window in "10 minutes:10" "1 hour:60" "1 day:1440" "1 week:10080" "2 weeks:20160" "1 month:43200" "1 year:525600"; do + _ll_label="${_ll_window%%:*}" + _ll_mmin="${_ll_window##*:}" + _ll_files=$(find -L "${USER_DATA}/Logs" -maxdepth 5 -type f -mmin -${_ll_mmin} 2>/dev/null) + if [ -n "$_ll_files" ]; then + _logfind_local_label="$_ll_label" + _logfind_local_files="$_ll_files" + out " ${IBlack}[${THIS_HOST}] Found log files within last ${_ll_label}.${RstClr}" + break + fi + done + if [ -z "$_logfind_local_files" ]; then + out " ${IBlack}[${THIS_HOST}] No log files found in the last year.${RstClr}" + fi + + #-- Step 1b: Search for action handler logs in ots_dir/tmp/ -- + out " ${IBlack}[${THIS_HOST}] Searching for action handler logs in ${PWD}/tmp/ ...${RstClr}" + + _logfind_action_label="" + _logfind_action_files="" + for _ll_window in "10 minutes:10" "1 hour:60" "1 day:1440" "1 week:10080" "2 weeks:20160" "1 month:43200" "1 year:525600"; do + _ll_label="${_ll_window%%:*}" + _ll_mmin="${_ll_window##*:}" + _ll_files=$(find -L "${PWD}/tmp" -maxdepth 1 -type f -name "otsActionsHandler_${OTS_SETUP_TYPE}-${THIS_HOST}*" -mmin -${_ll_mmin} 2>/dev/null) + if [ -n "$_ll_files" ]; then + _logfind_action_label="$_ll_label" + _logfind_action_files="$_ll_files" + out " ${IBlack}[${THIS_HOST}] Found action handler logs within last ${_ll_label}.${RstClr}" + break + fi + done + if [ -z "$_logfind_action_files" ]; then + out " ${IBlack}[${THIS_HOST}] No action handler logs found in the last year.${RstClr}" + fi + + #-- Step 1c: Search for ARTDAQ configuration files in $USER_DATA/ARTDAQConfigurations (excluding run_records) -- + out " ${IBlack}[${THIS_HOST}] Searching for ARTDAQ config files in \$USER_DATA/ARTDAQConfigurations ...${RstClr}" + + _logfind_artdaq_label="" + _logfind_artdaq_files="" + for _ll_window in "10 minutes:10" "1 hour:60" "1 day:1440" "1 week:10080" "2 weeks:20160" "1 month:43200" "1 year:525600"; do + _ll_label="${_ll_window%%:*}" + _ll_mmin="${_ll_window##*:}" + _ll_files=$(find -L "${USER_DATA}/ARTDAQConfigurations" -maxdepth 5 -not -path "*/run_records/*" -not -path "*/run_records" -type f -mmin -${_ll_mmin} 2>/dev/null) + if [ -n "$_ll_files" ]; then + _logfind_artdaq_label="$_ll_label" + _logfind_artdaq_files="$_ll_files" + out " ${IBlack}[${THIS_HOST}] Found ARTDAQ config files within last ${_ll_label}.${RstClr}" + break + fi + done + if [ -z "$_logfind_artdaq_files" ]; then + out " ${IBlack}[${THIS_HOST}] No ARTDAQ config files found in the last year.${RstClr}" + fi + + #-- Step 1d: Search for ARTDAQ run records in the LATEST run_records folder only -- + out " ${IBlack}[${THIS_HOST}] Searching for latest ARTDAQ run record in \$USER_DATA/ARTDAQConfigurations/run_records ...${RstClr}" + + _logfind_runrecords_label="" + _logfind_runrecords_files="" + _logfind_runrecords_dir="" + + # find the latest (highest numbered) run record directory + _logfind_latest_run_dir=$(find -L "${USER_DATA}/ARTDAQConfigurations/run_records" -maxdepth 1 -mindepth 1 -type d 2>/dev/null | sort -t/ -k$(echo "${USER_DATA}/ARTDAQConfigurations/run_records/x" | tr -cd '/' | wc -c) -n | tail -1) + + if [ -n "$_logfind_latest_run_dir" ]; then + _logfind_runrecords_dir="$_logfind_latest_run_dir" + _logfind_run_number=$(basename "$_logfind_latest_run_dir") + out " ${IBlack}[${THIS_HOST}] Latest run record directory: ${_logfind_run_number}${RstClr}" + + _logfind_runrecords_files=$(find -L "$_logfind_latest_run_dir" -maxdepth 1 -type f 2>/dev/null) + if [ -n "$_logfind_runrecords_files" ]; then + # get the age label from the directory's most recent file + _logfind_runrecords_label="" + for _ll_window in "10 minutes:10" "1 hour:60" "1 day:1440" "1 week:10080" "2 weeks:20160" "1 month:43200" "1 year:525600"; do + _ll_label="${_ll_window%%:*}" + _ll_mmin="${_ll_window##*:}" + _ll_check=$(find -L "$_logfind_latest_run_dir" -maxdepth 1 -type f -mmin -${_ll_mmin} 2>/dev/null | head -1) + if [ -n "$_ll_check" ]; then + _logfind_runrecords_label="$_ll_label" + break + fi + done + if [ -z "$_logfind_runrecords_label" ]; then + _logfind_runrecords_label="over 1 year" + fi + out " ${IBlack}[${THIS_HOST}] Found run record ${_logfind_run_number} files (last ${_logfind_runrecords_label}).${RstClr}" + fi + fi + if [ -z "$_logfind_runrecords_files" ]; then + out " ${IBlack}[${THIS_HOST}] No ARTDAQ run records found.${RstClr}" + fi + + #-- Step 2: Remote search on each remote host via ssh + fast_ots_setup.sh artdaqlogs -- + # Runs 'ots -ll' on the remote host (via fast_ots_setup.sh artdaqlogs), which + # does local log find + ARTDAQ host search on that host. We parse the standard + # output for file paths (lines containing 'less /') and the time label + # (lines containing '(local, last '). + # Skipped when LOGFIND_LOCAL_ONLY=1 (i.e. when called from ots -lll) + declare -A _logfind_remote_results # key=host, value=newline-separated file list + declare -A _logfind_remote_labels # key=host, value=time label + + if [ "${LOGFIND_LOCAL_ONLY:-0}" == "1" ]; then + out " ${IBlack}(skipping remote host search -- local-only mode)${RstClr}" + else + _lf_ssh_cmd="{ cd '${OTS_SETUP_DIR}/otsdaq-mu2e-config' && source ./Fast_ots_setup.sh '${OTS_SETUP_TYPE}' artdaqlogs; } 2>&1" + _lf_tmpdir=$(mktemp -d /tmp/ots_logfind_XXXXXX 2>/dev/null || { mkdir -p "/tmp/ots_logfind_$$" && echo "/tmp/ots_logfind_$$"; }) + + declare -A _lf_orig_pids # logical_host -> orig SSH subshell PID + declare -A _lf_alt_pids # logical_host -> alt SSH subshell PID + declare -A _lf_alt_hostmap # logical_host -> alt hostname + + # Launch all SSH jobs in parallel -- original hostname and stripped alt simultaneously + for _lf_host in "${_logfind_remote_hosts[@]}"; do + # sanitize hostname for use as a filename (replace non-alphanumeric with _) + _lf_safe_host="${_lf_host//[^a-zA-Z0-9._-]/_}" + + _lf_alt_host="" + if [[ "$_lf_host" == *"-data"* ]]; then + _lf_alt_host="${_lf_host/-data/}" + elif [[ "$_lf_host" == *"-ipmi"* ]]; then + _lf_alt_host="${_lf_host/-ipmi/}" + fi + + out " ${IBlack}[${_lf_host}] Launching parallel SSH search...${RstClr}" + + # Original hostname -- subshell writes output and rc to temp files + ( ssh -K -o ConnectTimeout=10 -o BatchMode=yes "$_lf_host" \ + "$_lf_ssh_cmd" > "${_lf_tmpdir}/${_lf_safe_host}.orig.out" 2>/dev/null + echo $? > "${_lf_tmpdir}/${_lf_safe_host}.orig.rc" ) & + _lf_orig_pids["$_lf_host"]=$! + + # Stripped alt hostname (if applicable) -- launched simultaneously + if [ -n "$_lf_alt_host" ]; then + _lf_alt_hostmap["$_lf_host"]="$_lf_alt_host" + ( ssh -K -o ConnectTimeout=10 -o BatchMode=yes "$_lf_alt_host" \ + "$_lf_ssh_cmd" > "${_lf_tmpdir}/${_lf_safe_host}.alt.out" 2>/dev/null + echo $? > "${_lf_tmpdir}/${_lf_safe_host}.alt.rc" ) & + _lf_alt_pids["$_lf_host"]=$! + fi + done + + out " ${IBlack}Waiting for ${#_logfind_remote_hosts[@]} parallel SSH searches to complete...${RstClr}" + + # Wait for all background jobs to finish (in original host order to preserve display order) + for _lf_host in "${_logfind_remote_hosts[@]}"; do + [ -n "${_lf_orig_pids[$_lf_host]+x}" ] && wait "${_lf_orig_pids[$_lf_host]}" 2>/dev/null + [ -n "${_lf_alt_pids[$_lf_host]+x}" ] && wait "${_lf_alt_pids[$_lf_host]}" 2>/dev/null + done + + # Parse results -- for each logical host, pick the first successful response + # (original wins over alt if both succeed; whichever's rc=0 is used) + for _lf_host in "${_logfind_remote_hosts[@]}"; do + _lf_safe_host="${_lf_host//[^a-zA-Z0-9._-]/_}" + _lf_alt_host="${_lf_alt_hostmap[$_lf_host]:-}" + + _lf_ssh_output="" + _lf_ssh_host_used="" + + _lf_rc_orig=$(cat "${_lf_tmpdir}/${_lf_safe_host}.orig.rc" 2>/dev/null) + if [ "${_lf_rc_orig}" = "0" ]; then + _lf_ssh_output=$(cat "${_lf_tmpdir}/${_lf_safe_host}.orig.out" 2>/dev/null) + _lf_ssh_host_used="$_lf_host" + elif [ -n "$_lf_alt_host" ]; then + _lf_rc_alt=$(cat "${_lf_tmpdir}/${_lf_safe_host}.alt.rc" 2>/dev/null) + if [ "${_lf_rc_alt}" = "0" ]; then + _lf_ssh_output=$(cat "${_lf_tmpdir}/${_lf_safe_host}.alt.out" 2>/dev/null) + _lf_ssh_host_used="$_lf_alt_host" + fi + fi + + # Clean up temp files for this host + rm -f "${_lf_tmpdir}/${_lf_safe_host}.orig.out" "${_lf_tmpdir}/${_lf_safe_host}.orig.rc" \ + "${_lf_tmpdir}/${_lf_safe_host}.alt.out" "${_lf_tmpdir}/${_lf_safe_host}.alt.rc" 2>/dev/null + + if [ -z "$_lf_ssh_host_used" ]; then + if [ -n "$_lf_alt_host" ]; then + out " ${Red}[${_lf_host}] SSH failed to both ${_lf_host} and ${_lf_alt_host}. Skipping.${RstClr}" + else + out " ${Red}[${_lf_host}] SSH failed. Skipping.${RstClr}" + fi + continue + fi + + out " ${IBlack}[${_lf_ssh_host_used}] SSH succeeded. Parsing remote ots -ll output...${RstClr}" + + # Parse the standard display output from remote ots -ll: + # Time label from line matching: (local, last