From 99bad0f48b38f5f918764c9d0d8488017bda1c81 Mon Sep 17 00:00:00 2001 From: rrivera Date: Thu, 7 May 2026 11:28:31 -0500 Subject: [PATCH 01/15] Adding retransmission mode to an acknowledge() --- otsdaq/GatewaySupervisor/GatewaySupervisor.cc | 89 +-- otsdaq/NetworkUtilities/TransceiverSocket.cc | 593 +++++++++++++++++- otsdaq/NetworkUtilities/TransceiverSocket.h | 40 +- 3 files changed, 637 insertions(+), 85 deletions(-) diff --git a/otsdaq/GatewaySupervisor/GatewaySupervisor.cc b/otsdaq/GatewaySupervisor/GatewaySupervisor.cc index daae95ca..ced59208 100644 --- a/otsdaq/GatewaySupervisor/GatewaySupervisor.cc +++ b/otsdaq/GatewaySupervisor/GatewaySupervisor.cc @@ -2797,15 +2797,18 @@ void GatewaySupervisor::SendRemoteGatewayCommand( Socket gatewayRemoteSocket(parsedFields[1], atoi(parsedFields[2].c_str())); - std::string commandResponseString = remoteGatewaySocket->sendAndReceive( + // Use retransmission-mode sendAndReceiveAll for reliable multi-packet + // config dump transfer. This replaces the old sendAndReceive + manual + // receive loop, providing automatic packet ordering, dropped packet + // detection, and retransmit requests. + std::string commandResponseString = remoteGatewaySocket->sendAndReceiveAll( gatewayRemoteSocket, command, 10 /*timeoutSeconds*/, - 0, - false, - 200000 /*interPacketTimeoutUSeconds=200ms*/); + 10 /*retransmitMaxRetries*/, + false /*verbose*/); __COUT__ << "Response from subsystem '" << remoteGatewayApp.appInfo.name - << "' received: " << commandResponseString << __E__; + << "' received: " << commandResponseString.size() << " bytes" << __E__; size_t donePos = commandResponseString.find("Done"); if(donePos != 0) //then error @@ -2860,58 +2863,26 @@ void GatewaySupervisor::SendRemoteGatewayCommand( if(commandResponseString.size() > strlen("Done") + 1) { - //make sure we received everything - const size_t MAX_RETRIES = 10; - size_t tryCnt = 0; - while(++tryCnt < 20 && - commandResponseString.size() > 10 && //must end with 'END---' - (commandResponseString[commandResponseString.size() - 1] != '-' || - commandResponseString[commandResponseString.size() - 2] != '-' || - commandResponseString[commandResponseString.size() - 3] != '-' || - commandResponseString[commandResponseString.size() - 4] != 'D' || - commandResponseString[commandResponseString.size() - 5] != 'N' || - commandResponseString[commandResponseString.size() - 6] != 'E')) - { - __COUT__ << "There must be more, try = " << tryCnt << __E__; - std::string more; - if(remoteGatewaySocket->receive(more, 1 /*timeoutSeconds*/) == - 0 /* success */) - { - commandResponseString += more; - tryCnt = 0; //reset since we received data - } - else if(tryCnt >= MAX_RETRIES) - { - __SS__ << "Timeout after " << MAX_RETRIES - << " attempts waiting for more data from Remote Gateway '" - << remoteGatewayApp.appInfo.name - << "' (URL=" << remoteGatewayApp.appInfo.url << "). "; - if(commandResponseString.empty()) - { - ss << "No data was received at all."; - } - else - { - const size_t maxPrint = 500; - ss << "Received " << commandResponseString.size() - << " bytes so far. "; - if(commandResponseString.size() <= maxPrint) - { - ss << "Full received text: [" << commandResponseString << "]"; - } - else - { - ss << "First " << maxPrint << " chars: [" - << commandResponseString.substr(0, maxPrint) - << "] ... Last " << maxPrint << " chars: [" - << commandResponseString.substr( - commandResponseString.size() - maxPrint) - << "]"; - } - } - ss << __E__; - __SS_THROW__; - } + // With retransmission mode, the full response is already assembled + // by sendAndReceiveAll(). Verify the END--- marker is present. + if(commandResponseString.size() > 10 && + !commandResponseString.ends_with("END---")) + { + __SS__ << "Config dump response from Remote Gateway '" + << remoteGatewayApp.appInfo.name + << "' is missing END--- termination marker. " + << "Received " << commandResponseString.size() + << " bytes." << __E__; + const size_t maxPrint = 500; + if(commandResponseString.size() <= maxPrint) + ss << " Full text: [" << commandResponseString << "]"; + else + ss << " Last " << maxPrint << " chars: [" + << commandResponseString.substr( + commandResponseString.size() - maxPrint) + << "]"; + ss << __E__; + __SS_THROW__; } //assume have config dump response! @@ -4450,7 +4421,9 @@ void GatewaySupervisor::StateChangerWorkLoop(GatewaySupervisor* theSupervisor) : "" //append extra done content, if any ), true /* verbose */, - extraDoneContent.size() ? 65500 : 1500 /*maxChunkSize*/); + extraDoneContent.size() ? 65500 : 1500 /*maxChunkSize*/, + 0 /*interPacketGapUSeconds*/, + extraDoneContent.size() > 0 /*enableRetransmission - use retransmit protocol for large config dump transfers*/); } } catch(...) diff --git a/otsdaq/NetworkUtilities/TransceiverSocket.cc b/otsdaq/NetworkUtilities/TransceiverSocket.cc index 607d2720..f8131040 100644 --- a/otsdaq/NetworkUtilities/TransceiverSocket.cc +++ b/otsdaq/NetworkUtilities/TransceiverSocket.cc @@ -2,11 +2,16 @@ #include "otsdaq/Macros/CoutMacros.h" #include "otsdaq/MessageFacility/MessageFacility.h" +#include #include #include +#include #include +#include #include +#include #include +#include using namespace ots; @@ -28,10 +33,15 @@ TransceiverSocket::~TransceiverSocket(void) {} //============================================================================== /// returns 0 on success -int TransceiverSocket::acknowledge(const std::string& buffer, - bool verbose /* = false */, - size_t maxChunkSize /* = 1500 */, - unsigned int interPacketGapUSeconds /* = 0 */) +/// When enableRetransmission is true, each sent packet is prepended with an +/// 8-byte retransmission header so the receiver can detect dropped packets and +/// request retransmission. +int TransceiverSocket::acknowledge( + const std::string& buffer, + bool verbose /* = false */, + size_t maxChunkSize /* = 1500 */, + unsigned int interPacketGapUSeconds /* = 0 */, + bool enableRetransmission /* = false */) { // lockout other senders for the remainder of the scope std::lock_guard lock(sendMutex_); @@ -40,39 +50,218 @@ int TransceiverSocket::acknowledge(const std::string& buffer, __COUTT__ << "Acknowledging on Socket Descriptor #: " << socketNumber_ << " from-port: " << ntohs(socketAddress_.sin_port) << " to-port: " << ntohs(ReceiverSocket::fromAddress_.sin_port) + << " retransmission: " << (enableRetransmission ? "ON" : "OFF") << std::endl; const size_t MAX_SEND_SIZE = maxChunkSize > 65500u ? static_cast(65500u) : maxChunkSize; - size_t offset = 0; - int sendToSize = 1; - int sizeInBytes = 1; + if(!enableRetransmission) + { + //==================================================================== + // Original non-retransmission mode (unchanged behavior) + //==================================================================== + size_t offset = 0; + int sendToSize = 1; + int sizeInBytes = 1; + + while(offset < buffer.size() && sendToSize > 0) + { + auto thisSize = sizeInBytes * (buffer.size() - offset) > MAX_SEND_SIZE + ? MAX_SEND_SIZE + : sizeInBytes * (buffer.size() - offset); + if(verbose) + __COUTTV__(thisSize); + sendToSize = sendto(socketNumber_, + &buffer[0] + offset, + thisSize, + 0, + (struct sockaddr*)&(ReceiverSocket::fromAddress_), + sizeof(sockaddr_in)); + offset += sendToSize / sizeInBytes; + if(interPacketGapUSeconds > 0 && offset < buffer.size() && sendToSize > 0) + usleep(interPacketGapUSeconds); + } + + if(sendToSize <= 0) + { + __SS__ << "Error writing buffer from port " + << ntohs(TransmitterSocket::socketAddress_.sin_port) << ": " + << strerror(errno) << std::endl; + __SS_THROW__; + } + return 0; + } + + //==================================================================== + // Retransmission mode: prepend 8-byte header to each packet + // [0-1] magic 0xD2C4 (network byte order) + // [2-3] packet index (network byte order uint16) + // [4-5] total packets (network byte order uint16) + // [6-7] payload size (network byte order uint16) + //==================================================================== + + // The payload per packet is reduced by the header size + const size_t payloadMax = + MAX_SEND_SIZE > RETRANSMIT_HEADER_SIZE + ? MAX_SEND_SIZE - RETRANSMIT_HEADER_SIZE + : 1; + + // Calculate total number of packets + uint16_t totalPackets = static_cast( + (buffer.size() + payloadMax - 1) / payloadMax); + if(totalPackets == 0) + totalPackets = 1; // send at least one packet even for empty buffer + + if(verbose) + __COUT__ << "Retransmission mode: sending " << totalPackets + << " packets for " << buffer.size() << " bytes, payloadMax=" + << payloadMax << __E__; + + // Build and cache all packets so they can be re-sent on retransmit request + std::vector packets(totalPackets); + size_t offset = 0; + for(uint16_t pi = 0; pi < totalPackets; ++pi) + { + size_t payloadSize = (buffer.size() - offset) > payloadMax + ? payloadMax + : (buffer.size() - offset); + + // Build the 8-byte header + char header[RETRANSMIT_HEADER_SIZE]; + uint16_t netMagic = htons(RETRANSMIT_MAGIC); + uint16_t netIndex = htons(pi); + uint16_t netTotal = htons(totalPackets); + uint16_t netPaySize = htons(static_cast(payloadSize)); + std::memcpy(header + 0, &netMagic, 2); + std::memcpy(header + 2, &netIndex, 2); + std::memcpy(header + 4, &netTotal, 2); + std::memcpy(header + 6, &netPaySize, 2); + + packets[pi].assign(header, RETRANSMIT_HEADER_SIZE); + packets[pi].append(buffer, offset, payloadSize); - while(offset < buffer.size() && sendToSize > 0) + offset += payloadSize; + } + + // Send all packets initially + for(uint16_t pi = 0; pi < totalPackets; ++pi) { - auto thisSize = sizeInBytes * (buffer.size() - offset) > MAX_SEND_SIZE - ? MAX_SEND_SIZE - : sizeInBytes * (buffer.size() - offset); + int sendToSize = sendto(socketNumber_, + packets[pi].data(), + packets[pi].size(), + 0, + (struct sockaddr*)&(ReceiverSocket::fromAddress_), + sizeof(sockaddr_in)); + if(sendToSize <= 0) + { + __SS__ << "Error writing retransmit packet " << pi << " from port " + << ntohs(TransmitterSocket::socketAddress_.sin_port) << ": " + << strerror(errno) << std::endl; + __SS_THROW__; + } if(verbose) - __COUTTV__(thisSize); - sendToSize = sendto(socketNumber_, - &buffer[0] + offset, - thisSize, - 0, - (struct sockaddr*)&(ReceiverSocket::fromAddress_), - sizeof(sockaddr_in)); - offset += sendToSize / sizeInBytes; - if(interPacketGapUSeconds > 0 && offset < buffer.size() && sendToSize > 0) + __COUTT__ << "Sent retransmit packet " << pi << "/" << totalPackets + << " size=" << packets[pi].size() << std::endl; + + if(interPacketGapUSeconds > 0 && pi + 1 < totalPackets) usleep(interPacketGapUSeconds); } - if(sendToSize <= 0) + // Now wait for potential retransmit requests from receiver. + // A retransmit request is a packet starting with the magic marker 0xD2C4 + // followed by a list of uint16_t packet indices that need to be resent. + // A "done" acknowledgment from the receiver is a packet starting with magic + // followed by 0xFFFF (meaning "all received, done"). + const unsigned int retransmitTimeoutSeconds = 5; + const unsigned int maxRetransmitRounds = 20; + + for(unsigned int round = 0; round < maxRetransmitRounds; ++round) { - __SS__ << "Error writing buffer from port " - << ntohs(TransmitterSocket::socketAddress_.sin_port) << ": " - << strerror(errno) << std::endl; - __SS_THROW__; //return -1; + std::string retransmitRequest; + int rc = receive(retransmitRequest, + retransmitTimeoutSeconds, + 0 /*timeoutUSeconds*/, + false /*verbose*/); + if(rc < 0) + { + // Timeout - assume receiver got everything (or gave up) + if(verbose) + __COUT__ << "No retransmit request received after " + << retransmitTimeoutSeconds + << "s timeout, assuming all packets received." << __E__; + break; + } + + // Check if this is a valid retransmit request (must start with magic) + if(retransmitRequest.size() >= 4) + { + uint16_t reqMagic; + std::memcpy(&reqMagic, retransmitRequest.data(), 2); + reqMagic = ntohs(reqMagic); + + if(reqMagic == RETRANSMIT_MAGIC) + { + // Check for "done" signal (magic + 0xFFFF) + if(retransmitRequest.size() >= 4) + { + uint16_t firstIndex; + std::memcpy(&firstIndex, retransmitRequest.data() + 2, 2); + firstIndex = ntohs(firstIndex); + if(firstIndex == 0xFFFF) + { + if(verbose) + __COUT__ << "Received 'all done' from receiver." << __E__; + break; + } + } + + // Parse list of missing packet indices and resend them + size_t numIndices = + (retransmitRequest.size() - 2) / 2; // skip 2-byte magic + if(verbose) + __COUT__ << "Retransmit request for " << numIndices + << " packets (round " << round << ")." << __E__; + + for(size_t i = 0; i < numIndices; ++i) + { + uint16_t missingIdx; + std::memcpy( + &missingIdx, retransmitRequest.data() + 2 + i * 2, 2); + missingIdx = ntohs(missingIdx); + + if(missingIdx < totalPackets) + { + int sendToSize = + sendto(socketNumber_, + packets[missingIdx].data(), + packets[missingIdx].size(), + 0, + (struct sockaddr*)&(ReceiverSocket::fromAddress_), + sizeof(sockaddr_in)); + if(sendToSize <= 0) + { + __SS__ << "Error resending retransmit packet " + << missingIdx << ": " << strerror(errno) + << std::endl; + __SS_THROW__; + } + if(verbose) + __COUTT__ << "Resent packet " << missingIdx << std::endl; + + if(interPacketGapUSeconds > 0) + usleep(interPacketGapUSeconds); + } + else + { + __COUT_WARN__ + << "Retransmit request for invalid packet index " + << missingIdx << " (total=" << totalPackets << ")" + << __E__; + } + } + } + } } return 0; @@ -155,3 +344,357 @@ std::string TransceiverSocket::sendAndReceive( return receiveBuffer; } //end sendAndReceive() + +//============================================================================== +/// receiveAll() receives a multi-packet retransmission-mode response. +/// Packets are expected to have an 8-byte retransmission header: +/// [0-1] magic 0xD2C4 (network byte order) +/// [2-3] packet index (network byte order uint16) +/// [4-5] total packets (network byte order uint16) +/// [6-7] payload size (network byte order uint16) +/// +/// After all initial packets are received (or timeout), missing packets are +/// identified and a retransmit request is sent back to the sender containing +/// the magic marker followed by the list of missing packet indices. This +/// repeats up to retransmitMaxRetries times. When all packets are received, +/// a "done" signal (magic + 0xFFFF) is sent to the sender. +/// +/// Returns 0 on success (assembled buffer placed in 'buffer'), -1 on failure. +int TransceiverSocket::receiveAll(std::string& buffer, + unsigned int timeoutSeconds /* = 5 */, + unsigned int retransmitMaxRetries /* = 10 */, + bool verbose /* = false */) +{ + using clock = std::chrono::steady_clock; + auto start = clock::now(); + + // Map of packet index -> payload data + std::map receivedPackets; + uint16_t totalPackets = 0; + bool totalKnown = false; + + if(verbose) + __COUT__ << "receiveAll: waiting for retransmission-mode packets, timeout=" + << timeoutSeconds << "s" << __E__; + + // Phase 1: Receive all initial packets until timeout + // Use a per-packet timeout that is shorter than the overall timeout, + // so we can detect "no more packets arriving" vs "still waiting for first" + const unsigned int interPacketTimeoutUSeconds = 100000; // 100ms between packets + bool firstPacketReceived = false; + + while(true) + { + std::string rawPacket; + int rc = receive(rawPacket, + firstPacketReceived ? 0 : timeoutSeconds, + firstPacketReceived ? interPacketTimeoutUSeconds : 0, + false /*verbose*/); + + if(rc < 0) + { + if(!firstPacketReceived) + { + // Never received any packet at all + if(verbose) + __COUT__ << "receiveAll: timeout waiting for first packet after " + << timeoutSeconds << "s" << __E__; + return -1; + } + // Timeout between packets - move to retransmit phase + break; + } + + // Check for retransmission header + if(rawPacket.size() < RETRANSMIT_HEADER_SIZE) + { + // Too small to be a retransmission packet - might be a non-retransmit + // response; just return it as-is + if(!firstPacketReceived) + { + buffer = rawPacket; + return 0; + } + // Skip malformed packet during multi-packet receive + if(verbose) + __COUT_WARN__ << "receiveAll: skipping undersized packet (" + << rawPacket.size() << " bytes)" << __E__; + continue; + } + + // Parse header + uint16_t magic, packetIndex, pktTotal, payloadSize; + std::memcpy(&magic, rawPacket.data() + 0, 2); + std::memcpy(&packetIndex, rawPacket.data() + 2, 2); + std::memcpy(&pktTotal, rawPacket.data() + 4, 2); + std::memcpy(&payloadSize, rawPacket.data() + 6, 2); + magic = ntohs(magic); + packetIndex = ntohs(packetIndex); + pktTotal = ntohs(pktTotal); + payloadSize = ntohs(payloadSize); + + if(magic != RETRANSMIT_MAGIC) + { + // Not a retransmission packet - if first packet, return as-is + if(!firstPacketReceived) + { + buffer = rawPacket; + return 0; + } + if(verbose) + __COUT_WARN__ << "receiveAll: skipping packet with bad magic 0x" + << std::hex << magic << std::dec << __E__; + continue; + } + + firstPacketReceived = true; + totalPackets = pktTotal; + totalKnown = true; + + // Extract payload (everything after the 8-byte header, limited by payloadSize) + size_t actualPayload = rawPacket.size() - RETRANSMIT_HEADER_SIZE; + if(actualPayload > payloadSize) + actualPayload = payloadSize; + + receivedPackets[packetIndex] = + rawPacket.substr(RETRANSMIT_HEADER_SIZE, actualPayload); + + if(verbose) + __COUTT__ << "receiveAll: received packet " << packetIndex << "/" + << totalPackets << " payload=" << actualPayload << " total_received=" + << receivedPackets.size() << std::endl; + + // Check if we have all packets + if(totalKnown && + receivedPackets.size() >= static_cast(totalPackets)) + break; + + // Check overall timeout + auto elapsed = std::chrono::duration_cast( + clock::now() - start); + if(elapsed.count() >= static_cast(timeoutSeconds * (retransmitMaxRetries + 1))) + { + if(verbose) + __COUT_WARN__ << "receiveAll: overall timeout reached" << __E__; + break; + } + } + + // Phase 2: Retransmit missing packets + if(totalKnown && + receivedPackets.size() < static_cast(totalPackets)) + { + for(unsigned int retry = 0; retry < retransmitMaxRetries; ++retry) + { + // Build list of missing packet indices + std::set missing; + for(uint16_t i = 0; i < totalPackets; ++i) + { + if(receivedPackets.find(i) == receivedPackets.end()) + missing.insert(i); + } + + if(missing.empty()) + break; + + if(verbose) + __COUT__ << "receiveAll: retry " << retry + 1 << "/" + << retransmitMaxRetries << ", requesting retransmit of " + << missing.size() << " packets" << __E__; + + // Build retransmit request: magic(2 bytes) + list of uint16 indices + std::string retransmitReq; + retransmitReq.resize(2 + missing.size() * 2); + uint16_t netMagic = htons(RETRANSMIT_MAGIC); + std::memcpy(&retransmitReq[0], &netMagic, 2); + size_t pos = 2; + for(uint16_t idx : missing) + { + uint16_t netIdx = htons(idx); + std::memcpy(&retransmitReq[pos], &netIdx, 2); + pos += 2; + } + + // Send retransmit request back to sender (acknowledge to last receive addr) + { + // Use sendto directly to fromAddress_ (the sender) + int sendToSize = sendto(socketNumber_, + retransmitReq.data(), + retransmitReq.size(), + 0, + (struct sockaddr*)&(ReceiverSocket::fromAddress_), + sizeof(sockaddr_in)); + if(sendToSize <= 0) + { + __COUT_WARN__ << "receiveAll: failed to send retransmit request: " + << strerror(errno) << __E__; + } + } + + // Receive retransmitted packets + while(true) + { + std::string rawPacket; + int rc = receive(rawPacket, + timeoutSeconds, + 0 /*timeoutUSeconds*/, + false /*verbose*/); + if(rc < 0) + break; // timeout, will retry + + if(rawPacket.size() < RETRANSMIT_HEADER_SIZE) + continue; + + uint16_t magic2, packetIndex2, pktTotal2, payloadSize2; + std::memcpy(&magic2, rawPacket.data() + 0, 2); + std::memcpy(&packetIndex2, rawPacket.data() + 2, 2); + std::memcpy(&pktTotal2, rawPacket.data() + 4, 2); + std::memcpy(&payloadSize2, rawPacket.data() + 6, 2); + magic2 = ntohs(magic2); + packetIndex2 = ntohs(packetIndex2); + pktTotal2 = ntohs(pktTotal2); + payloadSize2 = ntohs(payloadSize2); + + if(magic2 != RETRANSMIT_MAGIC) + continue; + + size_t actualPayload2 = rawPacket.size() - RETRANSMIT_HEADER_SIZE; + if(actualPayload2 > payloadSize2) + actualPayload2 = payloadSize2; + + receivedPackets[packetIndex2] = + rawPacket.substr(RETRANSMIT_HEADER_SIZE, actualPayload2); + + if(verbose) + __COUTT__ << "receiveAll: retransmit received packet " + << packetIndex2 << "/" << totalPackets + << " total_received=" << receivedPackets.size() + << std::endl; + + // Check if we now have all packets + if(receivedPackets.size() >= static_cast(totalPackets)) + break; + } + + if(receivedPackets.size() >= static_cast(totalPackets)) + break; + } + } + + // Phase 3: Send "done" acknowledgment to sender (magic + 0xFFFF) + { + std::string doneSignal(4, '\0'); + uint16_t netMagic = htons(RETRANSMIT_MAGIC); + uint16_t netDone = htons(0xFFFF); + std::memcpy(&doneSignal[0], &netMagic, 2); + std::memcpy(&doneSignal[2], &netDone, 2); + sendto(socketNumber_, + doneSignal.data(), + doneSignal.size(), + 0, + (struct sockaddr*)&(ReceiverSocket::fromAddress_), + sizeof(sockaddr_in)); + } + + // Phase 4: Assemble full buffer in order + if(!totalKnown || receivedPackets.empty()) + { + __SS__ << "receiveAll: failed to receive any retransmission-mode packets" + << __E__; + __SS_THROW__; + } + + if(receivedPackets.size() < static_cast(totalPackets)) + { + // Build list of still-missing indices for the error message + std::string missingStr; + for(uint16_t i = 0; i < totalPackets; ++i) + { + if(receivedPackets.find(i) == receivedPackets.end()) + { + if(!missingStr.empty()) + missingStr += ", "; + missingStr += std::to_string(i); + } + } + __SS__ << "receiveAll: failed to receive all packets after " + << retransmitMaxRetries << " retransmit retries. " + << "Received " << receivedPackets.size() << "/" << totalPackets + << " packets. Missing indices: [" << missingStr << "]" << __E__; + __SS_THROW__; + } + + // Assemble in order + buffer.clear(); + for(uint16_t i = 0; i < totalPackets; ++i) + buffer += receivedPackets[i]; + + if(verbose) + __COUT__ << "receiveAll: successfully assembled " << buffer.size() + << " bytes from " << totalPackets << " packets in " + << std::chrono::duration_cast( + clock::now() - start) + .count() + << " ms" << __E__; + + return 0; +} //end receiveAll() + +//============================================================================== +/// sendAndReceiveAll() sends a command then reliably receives the full +/// multi-packet response using the retransmission protocol. +/// The remote sender must use acknowledge() with enableRetransmission=true. +/// +/// This mirrors sendAndReceive() but uses receiveAll() instead of the +/// simple multi-packet loop, providing: +/// - Packet ordering via indexed headers +/// - Dropped packet detection via known total count +/// - Automatic retransmit requests for missing packets +/// - Fully assembled, ordered response buffer +/// +/// Throws on timeout or error. +std::string TransceiverSocket::sendAndReceiveAll( + Socket& toSocket, + const std::string& sendBuffer, + unsigned int timeoutSeconds /* = 5 */, + unsigned int retransmitMaxRetries /* = 10 */, + bool verbose /* = false */) +{ + using clock = std::chrono::steady_clock; + auto start = clock::now(); + + // lockout other sender and receive attempts for the remainder of the scope + std::lock_guard lock(sendAndReceiveMutex_); + + flush(); // make sure nothing to read before sending + send(toSocket, sendBuffer, verbose); + + __COUTT__ << " ----> Time sendAndReceiveAll '" << sendBuffer + << "' (socketNumber=" << socketNumber_ << ") check ==> " + << std::chrono::duration_cast(clock::now() - + start) + .count() + << " milliseconds. PID=" << getpid() + << " TID=" << std::this_thread::get_id() << std::endl; + + std::string receiveBuffer; + if(receiveAll(receiveBuffer, timeoutSeconds, retransmitMaxRetries, verbose) < 0) + { + __SS__ << "Timeout (" << timeoutSeconds + << " s) or Error receiving retransmission response from remote ip:port " + << toSocket.getIPAddress() << ":" << toSocket.getPort() + << " to this ip:port " << Socket::getIPAddress() << ":" + << Socket::getPort() << __E__; + __SS_ONLY_THROW__; + } + + __COUTT__ << " ----> Time sendAndReceiveAll complete: " << receiveBuffer.size() + << " bytes (socketNumber=" << socketNumber_ << ") ==> " + << std::chrono::duration_cast(clock::now() - + start) + .count() + << " milliseconds. PID=" << getpid() + << " TID=" << std::this_thread::get_id() << std::endl; + + return receiveBuffer; +} //end sendAndReceiveAll() diff --git a/otsdaq/NetworkUtilities/TransceiverSocket.h b/otsdaq/NetworkUtilities/TransceiverSocket.h index 2888a5c7..3b0b37ff 100644 --- a/otsdaq/NetworkUtilities/TransceiverSocket.h +++ b/otsdaq/NetworkUtilities/TransceiverSocket.h @@ -14,11 +14,29 @@ class TransceiverSocket : public TransmitterSocket, public ReceiverSocket TransceiverSocket(std::string IPAddress, unsigned int port = 0); virtual ~TransceiverSocket(void); - /// acknowledge() responds to last receive location + /// acknowledge() responds to last receive location. + /// When enableRetransmission is true, each sent packet is prepended with an 8-byte + /// retransmission header so the receiver can detect dropped packets and request + /// retransmission. The header format is: + /// [0-1] magic marker 0xD2C4 (network byte order) + /// [2-3] packet index (0-based, network byte order uint16) + /// [4-5] total packet count (network byte order uint16) + /// [6-7] payload size in this packet (network byte order uint16) int acknowledge(const std::string& buffer, bool verbose = false, size_t maxChunkSize = 1500, - unsigned int interPacketGapUSeconds = 0); + unsigned int interPacketGapUSeconds = 0, + bool enableRetransmission = false); + + /// receiveAll() receives a multi-packet retransmission-mode response. + /// It assembles the full message from individually-headered packets, + /// detects missing packets by index, and requests retransmission + /// from the sender for any dropped packets. + /// Returns 0 on success (assembled buffer placed in 'buffer'), -1 on failure. + int receiveAll(std::string& buffer, + unsigned int timeoutSeconds = 5, + unsigned int retransmitMaxRetries = 10, + bool verbose = false); std::string sendAndReceive(Socket& toSocket, const std::string& sendBuffer, @@ -27,6 +45,24 @@ class TransceiverSocket : public TransmitterSocket, public ReceiverSocket bool verbose = false, unsigned int interPacketTimeoutUSeconds = 10000); + /// sendAndReceiveAll() sends a command then uses the retransmission protocol + /// to reliably receive the full multi-packet response. The sender must use + /// acknowledge() with enableRetransmission=true. This method handles: + /// 1. Flushing and sending the request + /// 2. Receiving all retransmission-headered packets + /// 3. Detecting missing packets and sending retransmit requests + /// 4. Assembling and returning the complete response + /// Throws on timeout or error. + std::string sendAndReceiveAll(Socket& toSocket, + const std::string& sendBuffer, + unsigned int timeoutSeconds = 5, + unsigned int retransmitMaxRetries = 10, + bool verbose = false); + + /// Retransmission protocol constants + static constexpr uint16_t RETRANSMIT_MAGIC = 0xD2C4; + static constexpr size_t RETRANSMIT_HEADER_SIZE = 8; + protected: TransceiverSocket(void); From bcab13731de5522dcd91457fcae5d504bf6dc4f1 Mon Sep 17 00:00:00 2001 From: rrivera Date: Thu, 7 May 2026 13:10:39 -0500 Subject: [PATCH 02/15] Added sendAll() to Transceiver class as well --- otsdaq/NetworkUtilities/TransceiverSocket.cc | 269 ++++++++++--------- otsdaq/NetworkUtilities/TransceiverSocket.h | 38 ++- 2 files changed, 169 insertions(+), 138 deletions(-) diff --git a/otsdaq/NetworkUtilities/TransceiverSocket.cc b/otsdaq/NetworkUtilities/TransceiverSocket.cc index f8131040..5d71a378 100644 --- a/otsdaq/NetworkUtilities/TransceiverSocket.cc +++ b/otsdaq/NetworkUtilities/TransceiverSocket.cc @@ -33,9 +33,8 @@ TransceiverSocket::~TransceiverSocket(void) {} //============================================================================== /// returns 0 on success -/// When enableRetransmission is true, each sent packet is prepended with an -/// 8-byte retransmission header so the receiver can detect dropped packets and -/// request retransmission. +/// When enableRetransmission is true, uses sendAll() to send the buffer with +/// retransmission headers, then waits for retransmit requests from the receiver. int TransceiverSocket::acknowledge( const std::string& buffer, bool verbose /* = false */, @@ -43,9 +42,6 @@ int TransceiverSocket::acknowledge( unsigned int interPacketGapUSeconds /* = 0 */, bool enableRetransmission /* = false */) { - // lockout other senders for the remainder of the scope - std::lock_guard lock(sendMutex_); - if(verbose) __COUTT__ << "Acknowledging on Socket Descriptor #: " << socketNumber_ << " from-port: " << ntohs(socketAddress_.sin_port) @@ -53,14 +49,16 @@ int TransceiverSocket::acknowledge( << " retransmission: " << (enableRetransmission ? "ON" : "OFF") << std::endl; - const size_t MAX_SEND_SIZE = - maxChunkSize > 65500u ? static_cast(65500u) : maxChunkSize; - if(!enableRetransmission) { //==================================================================== // Original non-retransmission mode (unchanged behavior) //==================================================================== + // lockout other senders for the remainder of this scope + std::lock_guard lock(sendMutex_); + + const size_t MAX_SEND_SIZE = + maxChunkSize > 65500u ? static_cast(65500u) : maxChunkSize; size_t offset = 0; int sendToSize = 1; int sizeInBytes = 1; @@ -94,12 +92,37 @@ int TransceiverSocket::acknowledge( } //==================================================================== - // Retransmission mode: prepend 8-byte header to each packet - // [0-1] magic 0xD2C4 (network byte order) - // [2-3] packet index (network byte order uint16) - // [4-5] total packets (network byte order uint16) - // [6-7] payload size (network byte order uint16) + // Retransmission mode: delegate entirely to sendAll() which handles + // packet building, initial send, and retransmit request handling. //==================================================================== + return sendAll(buffer, verbose, maxChunkSize, interPacketGapUSeconds); +} //end acknowledge() + +//============================================================================== +/// sendAll() sends a buffer to the last receive address (fromAddress_) using the +/// retransmission protocol. Fully self-contained: +/// 1. Builds all packets with 8-byte retransmission headers +/// 2. Sends all packets +/// 3. Waits for retransmit requests from the receiver +/// 4. Resends requested packets +/// 5. Returns when receiver sends "done" or timeout expires +/// +/// Returns 0 on success. +int TransceiverSocket::sendAll( + const std::string& buffer, + bool verbose /* = false */, + size_t maxChunkSize /* = 65500 */, + unsigned int interPacketGapUSeconds /* = 0 */) +{ + if(verbose) + __COUT__ << "sendAll: retransmission-mode send on Socket Descriptor #: " + << socketNumber_ + << " from-port: " << ntohs(socketAddress_.sin_port) + << " to-port: " << ntohs(ReceiverSocket::fromAddress_.sin_port) + << " buffer size: " << buffer.size() << __E__; + + const size_t MAX_SEND_SIZE = + maxChunkSize > 65500u ? static_cast(65500u) : maxChunkSize; // The payload per packet is reduced by the header size const size_t payloadMax = @@ -114,67 +137,68 @@ int TransceiverSocket::acknowledge( totalPackets = 1; // send at least one packet even for empty buffer if(verbose) - __COUT__ << "Retransmission mode: sending " << totalPackets + __COUT__ << "sendAll: sending " << totalPackets << " packets for " << buffer.size() << " bytes, payloadMax=" << payloadMax << __E__; - // Build and cache all packets so they can be re-sent on retransmit request + // Build and cache all packets (header + payload) for retransmit use std::vector packets(totalPackets); - size_t offset = 0; - for(uint16_t pi = 0; pi < totalPackets; ++pi) { - size_t payloadSize = (buffer.size() - offset) > payloadMax - ? payloadMax - : (buffer.size() - offset); - - // Build the 8-byte header - char header[RETRANSMIT_HEADER_SIZE]; - uint16_t netMagic = htons(RETRANSMIT_MAGIC); - uint16_t netIndex = htons(pi); - uint16_t netTotal = htons(totalPackets); - uint16_t netPaySize = htons(static_cast(payloadSize)); - std::memcpy(header + 0, &netMagic, 2); - std::memcpy(header + 2, &netIndex, 2); - std::memcpy(header + 4, &netTotal, 2); - std::memcpy(header + 6, &netPaySize, 2); - - packets[pi].assign(header, RETRANSMIT_HEADER_SIZE); - packets[pi].append(buffer, offset, payloadSize); - - offset += payloadSize; + size_t offset = 0; + for(uint16_t pi = 0; pi < totalPackets; ++pi) + { + size_t payloadSize = (buffer.size() - offset) > payloadMax + ? payloadMax + : (buffer.size() - offset); + + char header[RETRANSMIT_HEADER_SIZE]; + uint16_t netMagic = htons(RETRANSMIT_MAGIC); + uint16_t netIndex = htons(pi); + uint16_t netTotal = htons(totalPackets); + uint16_t netPaySize = htons(static_cast(payloadSize)); + std::memcpy(header + 0, &netMagic, 2); + std::memcpy(header + 2, &netIndex, 2); + std::memcpy(header + 4, &netTotal, 2); + std::memcpy(header + 6, &netPaySize, 2); + + packets[pi].assign(header, RETRANSMIT_HEADER_SIZE); + packets[pi].append(buffer, offset, payloadSize); + offset += payloadSize; + } } - // Send all packets initially - for(uint16_t pi = 0; pi < totalPackets; ++pi) + // Send all packets initially (lock sendMutex_ for the burst) { - int sendToSize = sendto(socketNumber_, - packets[pi].data(), - packets[pi].size(), - 0, - (struct sockaddr*)&(ReceiverSocket::fromAddress_), - sizeof(sockaddr_in)); - if(sendToSize <= 0) + std::lock_guard lock(sendMutex_); + for(uint16_t pi = 0; pi < totalPackets; ++pi) { - __SS__ << "Error writing retransmit packet " << pi << " from port " - << ntohs(TransmitterSocket::socketAddress_.sin_port) << ": " - << strerror(errno) << std::endl; - __SS_THROW__; - } - if(verbose) - __COUTT__ << "Sent retransmit packet " << pi << "/" << totalPackets - << " size=" << packets[pi].size() << std::endl; + int sendToSize = sendto(socketNumber_, + packets[pi].data(), + packets[pi].size(), + 0, + (struct sockaddr*)&(ReceiverSocket::fromAddress_), + sizeof(sockaddr_in)); + if(sendToSize <= 0) + { + __SS__ << "sendAll: error writing packet " << pi << "/" << totalPackets + << " from port " << ntohs(socketAddress_.sin_port) << ": " + << strerror(errno) << std::endl; + __SS_THROW__; + } + if(verbose) + __COUTT__ << "sendAll: sent packet " << pi << "/" << totalPackets + << " size=" << packets[pi].size() << std::endl; - if(interPacketGapUSeconds > 0 && pi + 1 < totalPackets) - usleep(interPacketGapUSeconds); + if(interPacketGapUSeconds > 0 && pi + 1 < totalPackets) + usleep(interPacketGapUSeconds); + } } - // Now wait for potential retransmit requests from receiver. - // A retransmit request is a packet starting with the magic marker 0xD2C4 - // followed by a list of uint16_t packet indices that need to be resent. - // A "done" acknowledgment from the receiver is a packet starting with magic - // followed by 0xFFFF (meaning "all received, done"). - const unsigned int retransmitTimeoutSeconds = 5; - const unsigned int maxRetransmitRounds = 20; + // Wait for retransmit requests from receiver. + // Retransmit request format: magic(2 bytes) + list of uint16 missing indices + // Done signal format: magic(2 bytes) + 0xFFFF(2 bytes) + const unsigned int retransmitTimeoutSeconds = 5; + const unsigned int maxRetransmitRounds = 20; for(unsigned int round = 0; round < maxRetransmitRounds; ++round) { @@ -187,85 +211,78 @@ int TransceiverSocket::acknowledge( { // Timeout - assume receiver got everything (or gave up) if(verbose) - __COUT__ << "No retransmit request received after " + __COUT__ << "sendAll: no retransmit request after " << retransmitTimeoutSeconds - << "s timeout, assuming all packets received." << __E__; + << "s timeout, assuming transfer complete." << __E__; break; } - // Check if this is a valid retransmit request (must start with magic) - if(retransmitRequest.size() >= 4) + if(retransmitRequest.size() < 4) + continue; + + uint16_t reqMagic; + std::memcpy(&reqMagic, retransmitRequest.data(), 2); + reqMagic = ntohs(reqMagic); + if(reqMagic != RETRANSMIT_MAGIC) + continue; + + // Check for "done" signal (magic + 0xFFFF) + uint16_t firstVal; + std::memcpy(&firstVal, retransmitRequest.data() + 2, 2); + firstVal = ntohs(firstVal); + if(firstVal == 0xFFFF) { - uint16_t reqMagic; - std::memcpy(&reqMagic, retransmitRequest.data(), 2); - reqMagic = ntohs(reqMagic); + if(verbose) + __COUT__ << "sendAll: received 'all done' from receiver." << __E__; + break; + } - if(reqMagic == RETRANSMIT_MAGIC) + // Parse list of missing packet indices and resend them + size_t numIndices = (retransmitRequest.size() - 2) / 2; + if(verbose) + __COUT__ << "sendAll: retransmit request for " << numIndices + << " packets (round " << round << ")." << __E__; + + // Lock sendMutex_ for the resend burst + std::lock_guard lock(sendMutex_); + for(size_t i = 0; i < numIndices; ++i) + { + uint16_t missingIdx; + std::memcpy(&missingIdx, retransmitRequest.data() + 2 + i * 2, 2); + missingIdx = ntohs(missingIdx); + + if(missingIdx < totalPackets) { - // Check for "done" signal (magic + 0xFFFF) - if(retransmitRequest.size() >= 4) + int sendToSize = + sendto(socketNumber_, + packets[missingIdx].data(), + packets[missingIdx].size(), + 0, + (struct sockaddr*)&(ReceiverSocket::fromAddress_), + sizeof(sockaddr_in)); + if(sendToSize <= 0) { - uint16_t firstIndex; - std::memcpy(&firstIndex, retransmitRequest.data() + 2, 2); - firstIndex = ntohs(firstIndex); - if(firstIndex == 0xFFFF) - { - if(verbose) - __COUT__ << "Received 'all done' from receiver." << __E__; - break; - } + __SS__ << "sendAll: error resending packet " << missingIdx + << ": " << strerror(errno) << std::endl; + __SS_THROW__; } - - // Parse list of missing packet indices and resend them - size_t numIndices = - (retransmitRequest.size() - 2) / 2; // skip 2-byte magic if(verbose) - __COUT__ << "Retransmit request for " << numIndices - << " packets (round " << round << ")." << __E__; + __COUTT__ << "sendAll: resent packet " << missingIdx << std::endl; - for(size_t i = 0; i < numIndices; ++i) - { - uint16_t missingIdx; - std::memcpy( - &missingIdx, retransmitRequest.data() + 2 + i * 2, 2); - missingIdx = ntohs(missingIdx); - - if(missingIdx < totalPackets) - { - int sendToSize = - sendto(socketNumber_, - packets[missingIdx].data(), - packets[missingIdx].size(), - 0, - (struct sockaddr*)&(ReceiverSocket::fromAddress_), - sizeof(sockaddr_in)); - if(sendToSize <= 0) - { - __SS__ << "Error resending retransmit packet " - << missingIdx << ": " << strerror(errno) - << std::endl; - __SS_THROW__; - } - if(verbose) - __COUTT__ << "Resent packet " << missingIdx << std::endl; - - if(interPacketGapUSeconds > 0) - usleep(interPacketGapUSeconds); - } - else - { - __COUT_WARN__ - << "Retransmit request for invalid packet index " - << missingIdx << " (total=" << totalPackets << ")" - << __E__; - } - } + if(interPacketGapUSeconds > 0) + usleep(interPacketGapUSeconds); + } + else + { + __COUT_WARN__ + << "sendAll: retransmit request for invalid packet index " + << missingIdx << " (total=" << totalPackets << ")" << __E__; } } } return 0; -} //end acknowledge() +} //end sendAll() //============================================================================== /// Receives one packet with the specified timeout, then attempts to receive diff --git a/otsdaq/NetworkUtilities/TransceiverSocket.h b/otsdaq/NetworkUtilities/TransceiverSocket.h index 3b0b37ff..3e2c4f18 100644 --- a/otsdaq/NetworkUtilities/TransceiverSocket.h +++ b/otsdaq/NetworkUtilities/TransceiverSocket.h @@ -5,6 +5,7 @@ #include "otsdaq/NetworkUtilities/TransmitterSocket.h" #include +#include namespace ots { @@ -15,19 +16,32 @@ class TransceiverSocket : public TransmitterSocket, public ReceiverSocket virtual ~TransceiverSocket(void); /// acknowledge() responds to last receive location. - /// When enableRetransmission is true, each sent packet is prepended with an 8-byte - /// retransmission header so the receiver can detect dropped packets and request - /// retransmission. The header format is: - /// [0-1] magic marker 0xD2C4 (network byte order) - /// [2-3] packet index (0-based, network byte order uint16) - /// [4-5] total packet count (network byte order uint16) - /// [6-7] payload size in this packet (network byte order uint16) + /// When enableRetransmission is true, delegates to sendAll() for reliable + /// multi-packet transfer with retransmit handling. int acknowledge(const std::string& buffer, bool verbose = false, size_t maxChunkSize = 1500, unsigned int interPacketGapUSeconds = 0, bool enableRetransmission = false); + /// sendAll() sends a buffer to the last receive address using the + /// retransmission protocol. This is fully self-contained: it builds + /// headered packets, sends them all, then waits for retransmit requests + /// from the receiver and resends any missing packets. Only returns when + /// the transfer is complete (receiver sends "done") or timeout expires. + /// + /// Each packet is prepended with an 8-byte retransmission header: + /// [0-1] magic marker 0xD2C4 (network byte order) + /// [2-3] packet index (0-based, network byte order uint16) + /// [4-5] total packet count (network byte order uint16) + /// [6-7] payload size in this packet (network byte order uint16) + /// + /// Returns 0 on success. + int sendAll(const std::string& buffer, + bool verbose = false, + size_t maxChunkSize = 65500, + unsigned int interPacketGapUSeconds = 0); + /// receiveAll() receives a multi-packet retransmission-mode response. /// It assembles the full message from individually-headered packets, /// detects missing packets by index, and requests retransmission @@ -47,11 +61,11 @@ class TransceiverSocket : public TransmitterSocket, public ReceiverSocket /// sendAndReceiveAll() sends a command then uses the retransmission protocol /// to reliably receive the full multi-packet response. The sender must use - /// acknowledge() with enableRetransmission=true. This method handles: - /// 1. Flushing and sending the request - /// 2. Receiving all retransmission-headered packets - /// 3. Detecting missing packets and sending retransmit requests - /// 4. Assembling and returning the complete response + /// acknowledge() with enableRetransmission=true (or sendAll()). This method: + /// 1. Flushes and sends the request + /// 2. Receives all retransmission-headered packets + /// 3. Detects missing packets and sends retransmit requests + /// 4. Assembles and returns the complete response /// Throws on timeout or error. std::string sendAndReceiveAll(Socket& toSocket, const std::string& sendBuffer, From a25d861b3ce51b856c26adff469d9e9ac7b5bd0c Mon Sep 17 00:00:00 2001 From: rrivera Date: Thu, 7 May 2026 15:23:36 -0500 Subject: [PATCH 03/15] Format fix --- otsdaq/GatewaySupervisor/GatewaySupervisor.cc | 23 ++-- otsdaq/NetworkUtilities/TransceiverSocket.cc | 124 ++++++++---------- otsdaq/NetworkUtilities/TransceiverSocket.h | 12 +- 3 files changed, 75 insertions(+), 84 deletions(-) diff --git a/otsdaq/GatewaySupervisor/GatewaySupervisor.cc b/otsdaq/GatewaySupervisor/GatewaySupervisor.cc index ed3a8223..a7a23e01 100644 --- a/otsdaq/GatewaySupervisor/GatewaySupervisor.cc +++ b/otsdaq/GatewaySupervisor/GatewaySupervisor.cc @@ -2810,12 +2810,12 @@ void GatewaySupervisor::SendRemoteGatewayCommand( // config dump transfer. This replaces the old sendAndReceive + manual // receive loop, providing automatic packet ordering, dropped packet // detection, and retransmit requests. - std::string commandResponseString = remoteGatewaySocket->sendAndReceiveAll( - gatewayRemoteSocket, - command, - 10 /*timeoutSeconds*/, - 10 /*retransmitMaxRetries*/, - false /*verbose*/); + std::string commandResponseString = + remoteGatewaySocket->sendAndReceiveAll(gatewayRemoteSocket, + command, + 10 /*timeoutSeconds*/, + 10 /*retransmitMaxRetries*/, + false /*verbose*/); __COUT__ << "Response from subsystem '" << remoteGatewayApp.appInfo.name << "' received: " << commandResponseString.size() << " bytes" << __E__; @@ -2880,15 +2880,15 @@ void GatewaySupervisor::SendRemoteGatewayCommand( __SS__ << "Config dump response from Remote Gateway '" << remoteGatewayApp.appInfo.name << "' is missing END--- termination marker. " - << "Received " << commandResponseString.size() - << " bytes." << __E__; + << "Received " << commandResponseString.size() << " bytes." + << __E__; const size_t maxPrint = 500; if(commandResponseString.size() <= maxPrint) ss << " Full text: [" << commandResponseString << "]"; else ss << " Last " << maxPrint << " chars: [" - << commandResponseString.substr( - commandResponseString.size() - maxPrint) + << commandResponseString.substr(commandResponseString.size() - + maxPrint) << "]"; ss << __E__; __SS_THROW__; @@ -4432,7 +4432,8 @@ void GatewaySupervisor::StateChangerWorkLoop(GatewaySupervisor* theSupervisor) true /* verbose */, extraDoneContent.size() ? 65500 : 1500 /*maxChunkSize*/, 0 /*interPacketGapUSeconds*/, - extraDoneContent.size() > 0 /*enableRetransmission - use retransmit protocol for large config dump transfers*/); + extraDoneContent.size() > + 0 /*enableRetransmission - use retransmit protocol for large config dump transfers*/); } } catch(...) diff --git a/otsdaq/NetworkUtilities/TransceiverSocket.cc b/otsdaq/NetworkUtilities/TransceiverSocket.cc index 5d71a378..aa38bf85 100644 --- a/otsdaq/NetworkUtilities/TransceiverSocket.cc +++ b/otsdaq/NetworkUtilities/TransceiverSocket.cc @@ -35,12 +35,11 @@ TransceiverSocket::~TransceiverSocket(void) {} /// returns 0 on success /// When enableRetransmission is true, uses sendAll() to send the buffer with /// retransmission headers, then waits for retransmit requests from the receiver. -int TransceiverSocket::acknowledge( - const std::string& buffer, - bool verbose /* = false */, - size_t maxChunkSize /* = 1500 */, - unsigned int interPacketGapUSeconds /* = 0 */, - bool enableRetransmission /* = false */) +int TransceiverSocket::acknowledge(const std::string& buffer, + bool verbose /* = false */, + size_t maxChunkSize /* = 1500 */, + unsigned int interPacketGapUSeconds /* = 0 */, + bool enableRetransmission /* = false */) { if(verbose) __COUTT__ << "Acknowledging on Socket Descriptor #: " << socketNumber_ @@ -59,8 +58,8 @@ int TransceiverSocket::acknowledge( const size_t MAX_SEND_SIZE = maxChunkSize > 65500u ? static_cast(65500u) : maxChunkSize; - size_t offset = 0; - int sendToSize = 1; + size_t offset = 0; + int sendToSize = 1; int sizeInBytes = 1; while(offset < buffer.size() && sendToSize > 0) @@ -108,16 +107,14 @@ int TransceiverSocket::acknowledge( /// 5. Returns when receiver sends "done" or timeout expires /// /// Returns 0 on success. -int TransceiverSocket::sendAll( - const std::string& buffer, - bool verbose /* = false */, - size_t maxChunkSize /* = 65500 */, - unsigned int interPacketGapUSeconds /* = 0 */) +int TransceiverSocket::sendAll(const std::string& buffer, + bool verbose /* = false */, + size_t maxChunkSize /* = 65500 */, + unsigned int interPacketGapUSeconds /* = 0 */) { if(verbose) __COUT__ << "sendAll: retransmission-mode send on Socket Descriptor #: " - << socketNumber_ - << " from-port: " << ntohs(socketAddress_.sin_port) + << socketNumber_ << " from-port: " << ntohs(socketAddress_.sin_port) << " to-port: " << ntohs(ReceiverSocket::fromAddress_.sin_port) << " buffer size: " << buffer.size() << __E__; @@ -125,21 +122,19 @@ int TransceiverSocket::sendAll( maxChunkSize > 65500u ? static_cast(65500u) : maxChunkSize; // The payload per packet is reduced by the header size - const size_t payloadMax = - MAX_SEND_SIZE > RETRANSMIT_HEADER_SIZE - ? MAX_SEND_SIZE - RETRANSMIT_HEADER_SIZE - : 1; + const size_t payloadMax = MAX_SEND_SIZE > RETRANSMIT_HEADER_SIZE + ? MAX_SEND_SIZE - RETRANSMIT_HEADER_SIZE + : 1; // Calculate total number of packets - uint16_t totalPackets = static_cast( - (buffer.size() + payloadMax - 1) / payloadMax); + uint16_t totalPackets = + static_cast((buffer.size() + payloadMax - 1) / payloadMax); if(totalPackets == 0) totalPackets = 1; // send at least one packet even for empty buffer if(verbose) - __COUT__ << "sendAll: sending " << totalPackets - << " packets for " << buffer.size() << " bytes, payloadMax=" - << payloadMax << __E__; + __COUT__ << "sendAll: sending " << totalPackets << " packets for " + << buffer.size() << " bytes, payloadMax=" << payloadMax << __E__; // Build and cache all packets (header + payload) for retransmit use std::vector packets(totalPackets); @@ -156,9 +151,9 @@ int TransceiverSocket::sendAll( uint16_t netIndex = htons(pi); uint16_t netTotal = htons(totalPackets); uint16_t netPaySize = htons(static_cast(payloadSize)); - std::memcpy(header + 0, &netMagic, 2); - std::memcpy(header + 2, &netIndex, 2); - std::memcpy(header + 4, &netTotal, 2); + std::memcpy(header + 0, &netMagic, 2); + std::memcpy(header + 2, &netIndex, 2); + std::memcpy(header + 4, &netTotal, 2); std::memcpy(header + 6, &netPaySize, 2); packets[pi].assign(header, RETRANSMIT_HEADER_SIZE); @@ -204,9 +199,9 @@ int TransceiverSocket::sendAll( { std::string retransmitRequest; int rc = receive(retransmitRequest, - retransmitTimeoutSeconds, - 0 /*timeoutUSeconds*/, - false /*verbose*/); + retransmitTimeoutSeconds, + 0 /*timeoutUSeconds*/, + false /*verbose*/); if(rc < 0) { // Timeout - assume receiver got everything (or gave up) @@ -253,17 +248,16 @@ int TransceiverSocket::sendAll( if(missingIdx < totalPackets) { - int sendToSize = - sendto(socketNumber_, - packets[missingIdx].data(), - packets[missingIdx].size(), - 0, - (struct sockaddr*)&(ReceiverSocket::fromAddress_), - sizeof(sockaddr_in)); + int sendToSize = sendto(socketNumber_, + packets[missingIdx].data(), + packets[missingIdx].size(), + 0, + (struct sockaddr*)&(ReceiverSocket::fromAddress_), + sizeof(sockaddr_in)); if(sendToSize <= 0) { - __SS__ << "sendAll: error resending packet " << missingIdx - << ": " << strerror(errno) << std::endl; + __SS__ << "sendAll: error resending packet " << missingIdx << ": " + << strerror(errno) << std::endl; __SS_THROW__; } if(verbose) @@ -274,9 +268,8 @@ int TransceiverSocket::sendAll( } else { - __COUT_WARN__ - << "sendAll: retransmit request for invalid packet index " - << missingIdx << " (total=" << totalPackets << ")" << __E__; + __COUT_WARN__ << "sendAll: retransmit request for invalid packet index " + << missingIdx << " (total=" << totalPackets << ")" << __E__; } } } @@ -403,10 +396,10 @@ int TransceiverSocket::receiveAll(std::string& buffer, while(true) { std::string rawPacket; - int rc = receive(rawPacket, - firstPacketReceived ? 0 : timeoutSeconds, - firstPacketReceived ? interPacketTimeoutUSeconds : 0, - false /*verbose*/); + int rc = receive(rawPacket, + firstPacketReceived ? 0 : timeoutSeconds, + firstPacketReceived ? interPacketTimeoutUSeconds : 0, + false /*verbose*/); if(rc < 0) { @@ -441,9 +434,9 @@ int TransceiverSocket::receiveAll(std::string& buffer, // Parse header uint16_t magic, packetIndex, pktTotal, payloadSize; - std::memcpy(&magic, rawPacket.data() + 0, 2); + std::memcpy(&magic, rawPacket.data() + 0, 2); std::memcpy(&packetIndex, rawPacket.data() + 2, 2); - std::memcpy(&pktTotal, rawPacket.data() + 4, 2); + std::memcpy(&pktTotal, rawPacket.data() + 4, 2); std::memcpy(&payloadSize, rawPacket.data() + 6, 2); magic = ntohs(magic); packetIndex = ntohs(packetIndex); @@ -478,18 +471,18 @@ int TransceiverSocket::receiveAll(std::string& buffer, if(verbose) __COUTT__ << "receiveAll: received packet " << packetIndex << "/" - << totalPackets << " payload=" << actualPayload << " total_received=" - << receivedPackets.size() << std::endl; + << totalPackets << " payload=" << actualPayload + << " total_received=" << receivedPackets.size() << std::endl; // Check if we have all packets - if(totalKnown && - receivedPackets.size() >= static_cast(totalPackets)) + if(totalKnown && receivedPackets.size() >= static_cast(totalPackets)) break; // Check overall timeout - auto elapsed = std::chrono::duration_cast( - clock::now() - start); - if(elapsed.count() >= static_cast(timeoutSeconds * (retransmitMaxRetries + 1))) + auto elapsed = + std::chrono::duration_cast(clock::now() - start); + if(elapsed.count() >= + static_cast(timeoutSeconds * (retransmitMaxRetries + 1))) { if(verbose) __COUT_WARN__ << "receiveAll: overall timeout reached" << __E__; @@ -498,8 +491,7 @@ int TransceiverSocket::receiveAll(std::string& buffer, } // Phase 2: Retransmit missing packets - if(totalKnown && - receivedPackets.size() < static_cast(totalPackets)) + if(totalKnown && receivedPackets.size() < static_cast(totalPackets)) { for(unsigned int retry = 0; retry < retransmitMaxRetries; ++retry) { @@ -552,10 +544,8 @@ int TransceiverSocket::receiveAll(std::string& buffer, while(true) { std::string rawPacket; - int rc = receive(rawPacket, - timeoutSeconds, - 0 /*timeoutUSeconds*/, - false /*verbose*/); + int rc = receive( + rawPacket, timeoutSeconds, 0 /*timeoutUSeconds*/, false /*verbose*/); if(rc < 0) break; // timeout, will retry @@ -563,9 +553,9 @@ int TransceiverSocket::receiveAll(std::string& buffer, continue; uint16_t magic2, packetIndex2, pktTotal2, payloadSize2; - std::memcpy(&magic2, rawPacket.data() + 0, 2); + std::memcpy(&magic2, rawPacket.data() + 0, 2); std::memcpy(&packetIndex2, rawPacket.data() + 2, 2); - std::memcpy(&pktTotal2, rawPacket.data() + 4, 2); + std::memcpy(&pktTotal2, rawPacket.data() + 4, 2); std::memcpy(&payloadSize2, rawPacket.data() + 6, 2); magic2 = ntohs(magic2); packetIndex2 = ntohs(packetIndex2); @@ -583,8 +573,8 @@ int TransceiverSocket::receiveAll(std::string& buffer, rawPacket.substr(RETRANSMIT_HEADER_SIZE, actualPayload2); if(verbose) - __COUTT__ << "receiveAll: retransmit received packet " - << packetIndex2 << "/" << totalPackets + __COUTT__ << "receiveAll: retransmit received packet " << packetIndex2 + << "/" << totalPackets << " total_received=" << receivedPackets.size() << std::endl; @@ -649,8 +639,8 @@ int TransceiverSocket::receiveAll(std::string& buffer, if(verbose) __COUT__ << "receiveAll: successfully assembled " << buffer.size() << " bytes from " << totalPackets << " packets in " - << std::chrono::duration_cast( - clock::now() - start) + << std::chrono::duration_cast(clock::now() - + start) .count() << " ms" << __E__; diff --git a/otsdaq/NetworkUtilities/TransceiverSocket.h b/otsdaq/NetworkUtilities/TransceiverSocket.h index 3e2c4f18..90f393dc 100644 --- a/otsdaq/NetworkUtilities/TransceiverSocket.h +++ b/otsdaq/NetworkUtilities/TransceiverSocket.h @@ -49,8 +49,8 @@ class TransceiverSocket : public TransmitterSocket, public ReceiverSocket /// Returns 0 on success (assembled buffer placed in 'buffer'), -1 on failure. int receiveAll(std::string& buffer, unsigned int timeoutSeconds = 5, - unsigned int retransmitMaxRetries = 10, - bool verbose = false); + unsigned int retransmitMaxRetries = 10, + bool verbose = false); std::string sendAndReceive(Socket& toSocket, const std::string& sendBuffer, @@ -69,12 +69,12 @@ class TransceiverSocket : public TransmitterSocket, public ReceiverSocket /// Throws on timeout or error. std::string sendAndReceiveAll(Socket& toSocket, const std::string& sendBuffer, - unsigned int timeoutSeconds = 5, - unsigned int retransmitMaxRetries = 10, - bool verbose = false); + unsigned int timeoutSeconds = 5, + unsigned int retransmitMaxRetries = 10, + bool verbose = false); /// Retransmission protocol constants - static constexpr uint16_t RETRANSMIT_MAGIC = 0xD2C4; + static constexpr uint16_t RETRANSMIT_MAGIC = 0xD2C4; static constexpr size_t RETRANSMIT_HEADER_SIZE = 8; protected: From 409ec3bd5db71da6dc98d256cc7579a035a09e57 Mon Sep 17 00:00:00 2001 From: RYAN RIVERA Date: Thu, 14 May 2026 11:45:40 -0500 Subject: [PATCH 04/15] Checkpoint in log checking improvement; extraction of compile aliases as general tool --- tools/CMakeLists.txt | 1 + tools/ots | 713 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 708 insertions(+), 6 deletions(-) diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index c620111f..dac4fedb 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -13,6 +13,7 @@ cet_make_exec(NAME otsdaq_load_json_document LIBRARIES otsdaq::ConfigurationInte cet_script(ALWAYS_COPY ots_common.sh + ots_compile_aliases.sh EclipseBuild.sh ots ots_remote_start diff --git a/tools/ots b/tools/ots index 48d34157..b293513b 100644 --- a/tools/ots +++ b/tools/ots @@ -126,6 +126,8 @@ SOFTKILLALL=0 #only kill user data areas KILLALL=0 ISMACROMAKER=0 ISFINDLOGFILES=0 +ISFINDLOGFILES_LOCAL=0 +ISFINDLOGFILES_ARTDAQ=0 ISCONFIG=0 QUIET=1 CHROME=0 @@ -198,6 +200,18 @@ while :; do info "DISPLAYING LOG FILE LOCATIONS!" ISFINDLOGFILES=1 ;; + --logfind-local | -ll) + # Local-only log find: search $USER_DATA/Logs on THIS host only (no remote SSH). + # Used by fast_ots_setup.sh 'logs' action to provide results back to a primary ots -l caller. + info "DISPLAYING LOCAL LOG FILE LOCATIONS (no remote search)!" + ISFINDLOGFILES_LOCAL=1 + ;; + --logfind-artdaq | -lll) + # Local log find + ARTDAQ hosts from latest ranks.txt. + # Used by fast_ots_setup.sh 'artdaqlogs' action. + info "DISPLAYING LOCAL + ARTDAQ HOST LOG FILE LOCATIONS!" + ISFINDLOGFILES_ARTDAQ=1 + ;; --chrome | -c) # Enable launching chrome after startup info "CHROME LAUNCH ENABLED!" @@ -277,6 +291,12 @@ while :; do out "To display potential log file locations based on current otsdaq configuration, add one of these options:" out " --logfind -l" out " e.g.: ots -l or ots --logfind" + out " --logfind-local -ll" + out " Local-only log find (no remote SSH). Used by fast_ots_setup.sh logs action." + out " e.g.: ots -ll or ots --logfind-local" + out " --logfind-artdaq -lll" + out " Local log find + ARTDAQ hosts from latest ranks.txt. Used by fast_ots_setup.sh artdaqlogs action." + out " e.g.: ots -lll or ots --logfind-artdaq" out out "To start otsdaq and launch google-chrome, add one of these options:" out " --chrome -c" @@ -2408,17 +2428,478 @@ launchOTSLogFind() { done < ${XDAQ_CONFIGURATION_DATA_PATH}/${XDAQ_OTS_XML}.xml + ############################################################################## + # Display log files recently written to under $USER_DATA/Logs on all hosts + # - Always search locally on THIS_HOST (in case context config has changed). + # - Also search each remote host from the XML via: + # ssh -K host "cd && source ./fast_ots_setup.sh logs" + # which sources the environment and runs 'ots -ll' (local-only log find). + # - ots -ll outputs machine-parseable lines: LOGFIND_LOCAL:: + # - Escalating time window is handled by ots -ll on each host independently. + ############################################################################## + out #blank line + out #blank line + + #-- helper: format file age as human-readable string -- + # < 1 hour: MM:SS ago e.g. "30:00 ago" + # < 2 days: HH:MM:SS ago e.g. "12:32:21 ago" + # >= 2 days: N days ago e.g. "3 days ago" + _logfind_file_age() { + local file_path="$1" + local file_mtime now_epoch age_secs days hours mins secs + file_mtime=$(stat -L -c %Y "$file_path" 2>/dev/null) + if [ -z "$file_mtime" ]; then echo ""; return; fi + now_epoch=$(date +%s) + age_secs=$(( now_epoch - file_mtime )) + if [ $age_secs -lt 0 ]; then age_secs=0; fi + days=$(( age_secs / 86400 )) + if [ $days -ge 2 ]; then + echo "${days} days ago" + else + hours=$(( age_secs / 3600 )) + mins=$(( (age_secs % 3600) / 60 )) + secs=$(( age_secs % 60 )) + if [ $hours -gt 0 ]; then + printf "%d:%02d:%02d ago" $hours $mins $secs + else + printf "%02d:%02d ago" $mins $secs + fi + fi + } + + #-- helper: check if a hostname refers to the local machine -- + # Matches THIS_HOST exactly, THIS_HOST.domain, localhost, + # or THIS_HOST with -data/-ipmi suffix (private network aliases for same machine) + _logfind_is_local() { + local h="$1" + # direct matches + if [[ "$h" == "${THIS_HOST}" || "$h" == "${THIS_HOST}."* || "$h" == "localhost" ]]; then + return 0 + fi + # strip -data or -ipmi suffix and check again + local h_stripped="${h/-data/}" + h_stripped="${h_stripped/-ipmi/}" + if [[ "$h_stripped" == "${THIS_HOST}" || "$h_stripped" == "${THIS_HOST}."* ]]; then + return 0 + fi + return 1 + } + + #-- build unique host list (avoid duplicates) -- + declare -A _logfind_seen_hosts + unset _logfind_remote_hosts + _logfind_remote_hosts=() + + # always include the local host (mark as seen so it won't appear in remote list) + _logfind_seen_hosts["${THIS_HOST}"]=1 + + # add gateway host (if remote) + if [ -n "$gatewayHostname" ]; then + if [ -z "${_logfind_seen_hosts[$gatewayHostname]+x}" ]; then + if _logfind_is_local "$gatewayHostname"; then + _logfind_seen_hosts["$gatewayHostname"]=1 + else + _logfind_seen_hosts["$gatewayHostname"]=1 + _logfind_remote_hosts+=("$gatewayHostname") + fi + fi + fi + # add non-gateway hosts (if remote) + for _lf_host in "${xdaqHost[@]}"; do + if [ -z "${_logfind_seen_hosts[$_lf_host]+x}" ]; then + if _logfind_is_local "$_lf_host"; then + _logfind_seen_hosts["$_lf_host"]=1 + else + _logfind_seen_hosts["$_lf_host"]=1 + _logfind_remote_hosts+=("$_lf_host") + fi + fi + done + unset _logfind_seen_hosts + + out "${Bold}${Cyan}Searching for recently written log files (under \$USER_DATA/Logs)...${RstClr}" + out " ${IBlack}Hosts to search: ${THIS_HOST} (local)${_logfind_remote_hosts[@]:+ ${_logfind_remote_hosts[*]} (remote)}${RstClr}" + + #-- Step 1a: Local search in $USER_DATA/Logs using escalating time windows -- + out " ${IBlack}[${THIS_HOST}] Searching locally in \$USER_DATA/Logs = ${USER_DATA}/Logs ...${RstClr}" + + _logfind_local_label="" + _logfind_local_files="" + for _ll_window in "10 minutes:10" "1 hour:60" "1 day:1440" "1 week:10080" "1 month:43200" "1 year:525600"; do + _ll_label="${_ll_window%%:*}" + _ll_mmin="${_ll_window##*:}" + _ll_files=$(find -L "${USER_DATA}/Logs" -maxdepth 5 -type f -mmin -${_ll_mmin} 2>/dev/null) + if [ -n "$_ll_files" ]; then + _logfind_local_label="$_ll_label" + _logfind_local_files="$_ll_files" + out " ${IBlack}[${THIS_HOST}] Found log files within last ${_ll_label}.${RstClr}" + break + fi + done + if [ -z "$_logfind_local_files" ]; then + out " ${IBlack}[${THIS_HOST}] No log files found in the last year.${RstClr}" + fi + + #-- Step 1b: Search for action handler logs in ots_dir/tmp/ -- + out " ${IBlack}[${THIS_HOST}] Searching for action handler logs in ${PWD}/tmp/ ...${RstClr}" + + _logfind_action_label="" + _logfind_action_files="" + for _ll_window in "10 minutes:10" "1 hour:60" "1 day:1440" "1 week:10080" "1 month:43200" "1 year:525600"; do + _ll_label="${_ll_window%%:*}" + _ll_mmin="${_ll_window##*:}" + _ll_files=$(find -L "${PWD}/tmp" -maxdepth 1 -type f -name "otsActionsHandler*${THIS_HOST}*" -mmin -${_ll_mmin} 2>/dev/null) + if [ -n "$_ll_files" ]; then + _logfind_action_label="$_ll_label" + _logfind_action_files="$_ll_files" + out " ${IBlack}[${THIS_HOST}] Found action handler logs within last ${_ll_label}.${RstClr}" + break + fi + done + if [ -z "$_logfind_action_files" ]; then + out " ${IBlack}[${THIS_HOST}] No action handler logs found in the last year.${RstClr}" + fi + + #-- Step 1c: Search for ARTDAQ configuration files in $USER_DATA/ARTDAQConfigurations (excluding run_records) -- + out " ${IBlack}[${THIS_HOST}] Searching for ARTDAQ config files in \$USER_DATA/ARTDAQConfigurations ...${RstClr}" + + _logfind_artdaq_label="" + _logfind_artdaq_files="" + for _ll_window in "10 minutes:10" "1 hour:60" "1 day:1440" "1 week:10080" "1 month:43200" "1 year:525600"; do + _ll_label="${_ll_window%%:*}" + _ll_mmin="${_ll_window##*:}" + _ll_files=$(find -L "${USER_DATA}/ARTDAQConfigurations" -maxdepth 5 -not -path "*/run_records/*" -not -path "*/run_records" -type f -mmin -${_ll_mmin} 2>/dev/null) + if [ -n "$_ll_files" ]; then + _logfind_artdaq_label="$_ll_label" + _logfind_artdaq_files="$_ll_files" + out " ${IBlack}[${THIS_HOST}] Found ARTDAQ config files within last ${_ll_label}.${RstClr}" + break + fi + done + if [ -z "$_logfind_artdaq_files" ]; then + out " ${IBlack}[${THIS_HOST}] No ARTDAQ config files found in the last year.${RstClr}" + fi + + #-- Step 1d: Search for ARTDAQ run records in the LATEST run_records folder only -- + out " ${IBlack}[${THIS_HOST}] Searching for latest ARTDAQ run record in \$USER_DATA/ARTDAQConfigurations/run_records ...${RstClr}" + + _logfind_runrecords_label="" + _logfind_runrecords_files="" + _logfind_runrecords_dir="" + + # find the latest (highest numbered) run record directory + _logfind_latest_run_dir=$(find -L "${USER_DATA}/ARTDAQConfigurations/run_records" -maxdepth 1 -mindepth 1 -type d 2>/dev/null | sort -t/ -k$(echo "${USER_DATA}/ARTDAQConfigurations/run_records/x" | tr -cd '/' | wc -c) -n | tail -1) + + if [ -n "$_logfind_latest_run_dir" ]; then + _logfind_runrecords_dir="$_logfind_latest_run_dir" + _logfind_run_number=$(basename "$_logfind_latest_run_dir") + out " ${IBlack}[${THIS_HOST}] Latest run record directory: ${_logfind_run_number}${RstClr}" + + _logfind_runrecords_files=$(find -L "$_logfind_latest_run_dir" -maxdepth 1 -type f 2>/dev/null) + if [ -n "$_logfind_runrecords_files" ]; then + # get the age label from the directory's most recent file + _logfind_runrecords_label="" + for _ll_window in "10 minutes:10" "1 hour:60" "1 day:1440" "1 week:10080" "1 month:43200" "1 year:525600"; do + _ll_label="${_ll_window%%:*}" + _ll_mmin="${_ll_window##*:}" + _ll_check=$(find -L "$_logfind_latest_run_dir" -maxdepth 1 -type f -mmin -${_ll_mmin} 2>/dev/null | head -1) + if [ -n "$_ll_check" ]; then + _logfind_runrecords_label="$_ll_label" + break + fi + done + if [ -z "$_logfind_runrecords_label" ]; then + _logfind_runrecords_label="over 1 year" + fi + out " ${IBlack}[${THIS_HOST}] Found run record ${_logfind_run_number} files (last ${_logfind_runrecords_label}).${RstClr}" + fi + fi + if [ -z "$_logfind_runrecords_files" ]; then + out " ${IBlack}[${THIS_HOST}] No ARTDAQ run records found.${RstClr}" + fi + + #-- Step 2: Remote search on each remote host via ssh + fast_ots_setup.sh logs -- + # Runs 'ots -ll' on the remote host (via fast_ots_setup.sh logs), which produces + # the same display output as ots -l but local-only. We parse the standard output + # for file paths (lines containing '--> less -R ') and the time label + # (lines containing '(local, last '). + # Skipped when LOGFIND_LOCAL_ONLY=1 (i.e. when called from ots -ll) + declare -A _logfind_remote_results # key=host, value=newline-separated file list + declare -A _logfind_remote_labels # key=host, value=time label + + if [ "${LOGFIND_LOCAL_ONLY:-0}" == "1" ]; then + out " ${IBlack}(skipping remote host search -- local-only mode)${RstClr}" + else + for _lf_host in "${_logfind_remote_hosts[@]}"; do + _lf_ssh_cmd="cd '${PWD}' && source ./fast_ots_setup.sh '${OTS_SETUP_TYPE}' logs" + + # Try SSH to the original hostname first + out " ${IBlack}[${_lf_host}] Searching via SSH:${RstClr}" + out " ${IBlack} ssh -K ${_lf_host} \"${_lf_ssh_cmd}\"${RstClr}" + + _lf_ssh_output=$(ssh -K -o ConnectTimeout=10 -o BatchMode=yes "$_lf_host" \ + "$_lf_ssh_cmd" 2>/dev/null) + _lf_ssh_rc=$? + _lf_ssh_host_used="$_lf_host" + + # If SSH failed, try stripping -data or -ipmi suffix from hostname + # (these are often private network designators that may not be SSH-reachable) + if [ $_lf_ssh_rc -ne 0 ]; then + _lf_alt_host="" + if [[ "$_lf_host" == *"-data"* ]]; then + _lf_alt_host="${_lf_host/-data/}" + elif [[ "$_lf_host" == *"-ipmi"* ]]; then + _lf_alt_host="${_lf_host/-ipmi/}" + fi + + if [ -n "$_lf_alt_host" ]; then + out " ${Yellow}[${_lf_host}] SSH failed (exit code ${_lf_ssh_rc}). Retrying with stripped hostname: ${_lf_alt_host}${RstClr}" + out " ${IBlack} ssh -K ${_lf_alt_host} \"${_lf_ssh_cmd}\"${RstClr}" + + _lf_ssh_output=$(ssh -K -o ConnectTimeout=10 -o BatchMode=yes "$_lf_alt_host" \ + "$_lf_ssh_cmd" 2>/dev/null) + _lf_ssh_rc=$? + _lf_ssh_host_used="$_lf_alt_host" + + if [ $_lf_ssh_rc -ne 0 ]; then + out " ${Red}[${_lf_host}] SSH failed to both ${_lf_host} and ${_lf_alt_host} (exit code ${_lf_ssh_rc}). Skipping.${RstClr}" + continue + fi + else + out " ${Red}[${_lf_host}] SSH failed (exit code ${_lf_ssh_rc}). Skipping.${RstClr}" + continue + fi + fi + + out " ${IBlack}[${_lf_ssh_host_used}] SSH succeeded. Parsing remote ots -ll output...${RstClr}" + + # Parse the standard display output from remote ots -ll: + # Time label from line matching: (local, last