diff --git a/README.md b/README.md index b8fa306..8e4e0d6 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,23 @@ able to arm the DAQ. If you run with the dispatcher you additionally need a collection called **detector_control**, which will supercede control as the top-level user interface. The **control** collection will still be used by the dispatcher to control the readout nodes but the user should not use it in this case. If there is just one readout node you may want to skip the dispatcher and just directly control that node for simplicity. +## Corrupt data handling (formatter) + +The formatter can now handle malformed/corrupt datapackets gracefully and continue readout while marking affected periods with artificial deadtime fragments. + +These options are read from the run/options document: + +- `graceful_corruption_handling` (int/bool, default `1`) + - `1`: catch packet/event parsing exceptions, log, skip bad packet, continue + - `0`: fail-fast behavior (exceptions propagate as before) +- `inject_deadtime_on_corrupt` (int/bool, default `1`) + - `1`: insert artificial deadtime marker when corruption/fail is handled + - `0`: skip marker insertion + +Notes: +- This mitigation does not fix underlying hardware/driver transport issues. +- Artificial deadtime markers are intended for downstream bookkeeping (e.g. veto/deadtime accounting in straxen). + ## First steps: from nothing to starting a run Install all prerequisites, a mongodb database, and the redax software as described above. If you have XENON wiki access there are some build notes on the DAQ page [here](https://xe1t-wiki.lngs.infn.it/doku.php?id=xenon:xenonnt:dsg:daq#reader wiki), however if you don't have access don't worry too much since everything is straight off google searches. diff --git a/StraxFormatter.cc b/StraxFormatter.cc index 21ee1dc..f669221 100644 --- a/StraxFormatter.cc +++ b/StraxFormatter.cc @@ -58,14 +58,18 @@ const std::map&, st }; StraxFormatter::StraxFormatter(std::shared_ptr& opts, std::shared_ptr& log){ + fOptions = opts; + fLog = log; fActive = true; fChunkNameLength=6; fStraxHeaderSize=24; fBytesProcessed = 0; + fCorruptPackets = 0; fInputBufferSize = 0; fOutputBufferSize = 0; + fGracefulCorruptHandling = fOptions->GetInt("graceful_corruption_handling", 1) != 0; + fInjectDeadtimeOnCorrupt = fOptions->GetInt("inject_deadtime_on_corrupt", 1) != 0; fProcTimeDP = fProcTimeEv = fProcTimeCh = fCompTime = 0.; - fOptions = opts; fChunkLength = long(fOptions->GetDouble("strax_chunk_length", 5)*1e9); // default 5s fChunkOverlap = long(fOptions->GetDouble("strax_chunk_overlap", 0.5)*1e9); // default 0.5s fFragmentBytes = fOptions->GetInt("strax_fragment_payload_bytes", 110*2); @@ -89,8 +93,6 @@ StraxFormatter::StraxFormatter(std::shared_ptr& opts, std::shared_ptrGetInt("strax_buffer_num_chunks", 2); fWarnIfChunkOlderThan = fOptions->GetInt("strax_chunk_phase_limit", 2); fMutexWaitTime.reserve(1<<20); @@ -152,6 +154,15 @@ void StraxFormatter::GenerateArtificialDeadtime(int64_t timestamp, const std::sh return; } +void StraxFormatter::HandleCorruptPacket(const std::unique_ptr& dp, const std::string& reason) { + fCorruptPackets++; + int bid = (dp && dp->digi) ? dp->digi->bid() : -1; + fLog->Entry(MongoLog::Error, "Corrupt packet on board %i: %s", bid, reason.c_str()); + if (fInjectDeadtimeOnCorrupt && dp && dp->digi) { + GenerateArtificialDeadtime(((dp->clock_counter<<31) + dp->header_time), dp->digi); + } +} + void StraxFormatter::ProcessDatapacket(std::unique_ptr dp){ // Take a buffer and break it up into one document per channel auto it = dp->buff.begin(); @@ -162,9 +173,31 @@ void StraxFormatter::ProcessDatapacket(std::unique_ptr dp){ if((*it)>>28 == 0xA){ missed = true; // it works out words = (*it)&0xFFFFFFF; + if (words < event_header_words) { + if (fGracefulCorruptHandling) { + HandleCorruptPacket(dp, "Event header word-count < minimum"); + break; + } + throw std::runtime_error("Corrupt event: word-count < minimum"); + } + if ((size_t)words > (size_t)std::distance(it, dp->buff.end())) { + if (fGracefulCorruptHandling) { + HandleCorruptPacket(dp, "Event word-count exceeds remaining datapacket size"); + break; + } + throw std::runtime_error("Corrupt event: word-count exceeds remaining datapacket size"); + } std::u32string_view sv(dp->buff.data() + std::distance(dp->buff.begin(), it), words); // std::u32string_view sv(it, it+words); //c++20 :( - ProcessEvent(sv, dp, dpc); + try { + ProcessEvent(sv, dp, dpc); + } catch (const std::exception& e) { + if (fGracefulCorruptHandling) { + HandleCorruptPacket(dp, std::string("Exception while processing event: ") + e.what()); + break; + } + throw; + } evs_this_dp++; it += words; } else { @@ -195,11 +228,22 @@ int StraxFormatter::ProcessEvent(std::u32string_view buff, const std::unique_ptr& dp, std::map& dpc) { // buff = start of event + if (buff.size() < event_header_words) { + throw std::runtime_error("Event shorter than header"); + } // returns {words this event, channel mask, board fail, header timestamp} auto [words, channel_mask, fail, event_time] = dp->digi->UnpackEventHeader(buff); + if (words < event_header_words) { + throw std::runtime_error("Invalid event word-count in header"); + } + if ((size_t)words > buff.size()) { + throw std::runtime_error("Event header word-count exceeds event view size"); + } if(fail){ // board fail - //GenerateArtificialDeadtime(((dp->clock_counter<<31) + dp->header_time), dp->digi); + if (fInjectDeadtimeOnCorrupt) { + GenerateArtificialDeadtime(((dp->clock_counter<<31) + dp->header_time), dp->digi); + } dp->digi->CheckFail(true); fFailCounter[dp->digi->bid()]++; return event_header_words; @@ -213,6 +257,9 @@ int StraxFormatter::ProcessEvent(std::u32string_view buff, for(unsigned ch=0; ch buff.size()) { + throw std::runtime_error("Invalid channel size while processing event"); + } buff.remove_prefix(ret); } } @@ -229,6 +276,9 @@ int StraxFormatter::ProcessChannel(std::u32string_view buff, int words_in_event, // returns {timestamp (ns), words this channel, baseline, waveform} auto [timestamp, channel_words, baseline_ch, wf] = dp->digi->UnpackChannelHeader( buff, dp->clock_counter, dp->header_time, event_time, words_in_event, n_channels, channel); + if (channel_words < 2 || (size_t)channel_words > buff.size()) { + throw std::runtime_error("Corrupt channel header/size"); + } uint32_t samples_in_pulse = wf.size()*sizeof(char32_t)/sizeof(uint16_t); uint16_t sw = dp->digi->SampleWidth(); @@ -336,7 +386,12 @@ void StraxFormatter::Process() { dp = std::move(fBuffer.front()); fBuffer.pop_front(); lk.unlock(); - ProcessDatapacket(std::move(dp)); + try { + ProcessDatapacket(std::move(dp)); + } catch (const std::exception& e) { + fLog->Entry(MongoLog::Error, "Fatal formatter exception in thread %lx: %s", fThreadId, e.what()); + if (!fGracefulCorruptHandling) throw; + } if (fActive == true) WriteOutChunks(); } else { lk.unlock(); @@ -345,6 +400,9 @@ void StraxFormatter::Process() { if (fBytesProcessed > 0) End(); if (fMutexWaitTime.size() > 0) std::sort(fMutexWaitTime.begin(), fMutexWaitTime.end()); + if (fCorruptPackets > 0) { + fLog->Entry(MongoLog::Warning, "Thread %lx handled %i corrupt datapacket(s) gracefully", fThreadId, fCorruptPackets); + } } void StraxFormatter::WriteOutChunk(int chunk_i){ @@ -486,4 +544,3 @@ std::vector StraxFormatter::GetChunkNames(int chunk) { GetStringFormat(chunk+1)+"_pre"}}; return ret; } - diff --git a/StraxFormatter.hh b/StraxFormatter.hh index 9db9645..a87e52e 100644 --- a/StraxFormatter.hh +++ b/StraxFormatter.hh @@ -72,6 +72,7 @@ private: void WriteOutChunks(); void End(); void GenerateArtificialDeadtime(int64_t, const std::shared_ptr&); + void HandleCorruptPacket(const std::unique_ptr&, const std::string&); void AddFragmentToBuffer(std::string, uint32_t, int); std::vector GetChunkNames(int); @@ -105,6 +106,9 @@ private: std::map fBytesPerChunk; std::atomic_int fInputBufferSize, fOutputBufferSize; long fBytesProcessed; + bool fGracefulCorruptHandling; + bool fInjectDeadtimeOnCorrupt; + long fCorruptPackets; double fProcTimeDP, fProcTimeEv, fProcTimeCh, fCompTime; std::thread::id fThreadId;