Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,23 @@ able to arm the DAQ.

If you run with the dispatcher you additionally need a collection called **detector_control**, which will supersede control as the top-level user interface. The **control** collection will still be used by the dispatcher to control the readout nodes but the user should not use it in this case. If there is just one readout node you may want to skip the dispatcher and just directly control that node for simplicity.

## Corrupt data handling (formatter)

The formatter can now handle malformed/corrupt datapackets gracefully and continue readout while marking affected periods with artificial deadtime fragments.

These options are read from the run/options document:

- `graceful_corruption_handling` (int/bool, default `1`)
  - `1`: catch packet/event parsing exceptions, log them, skip the bad packet, and continue
  - `0`: fail-fast behavior (exceptions propagate as before)
- `inject_deadtime_on_corrupt` (int/bool, default `1`)
  - `1`: insert an artificial deadtime marker when a corrupt packet or board fail is handled
  - `0`: skip marker insertion

Notes:
- This mitigation does not fix underlying hardware/driver transport issues.
- Artificial deadtime markers are intended for downstream bookkeeping (e.g. veto/deadtime accounting in straxen).

## First steps: from nothing to starting a run

Install all prerequisites, a mongodb database, and the redax software as described above. If you have XENON wiki access, there are some build notes on the DAQ page [here](https://xe1t-wiki.lngs.infn.it/doku.php?id=xenon:xenonnt:dsg:daq#reader wiki). If you don't have access, don't worry too much, since everything is straight off Google searches.
Expand Down
71 changes: 64 additions & 7 deletions StraxFormatter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,18 @@ const std::map<std::string, std::function<long(std::shared_ptr<std::string>&, st
};

StraxFormatter::StraxFormatter(std::shared_ptr<Options>& opts, std::shared_ptr<MongoLog>& log){
fOptions = opts;
fLog = log;
fActive = true;
fChunkNameLength=6;
fStraxHeaderSize=24;
fBytesProcessed = 0;
fCorruptPackets = 0;
fInputBufferSize = 0;
fOutputBufferSize = 0;
fGracefulCorruptHandling = fOptions->GetInt("graceful_corruption_handling", 1) != 0;
fInjectDeadtimeOnCorrupt = fOptions->GetInt("inject_deadtime_on_corrupt", 1) != 0;
fProcTimeDP = fProcTimeEv = fProcTimeCh = fCompTime = 0.;
fOptions = opts;
fChunkLength = long(fOptions->GetDouble("strax_chunk_length", 5)*1e9); // default 5s
fChunkOverlap = long(fOptions->GetDouble("strax_chunk_overlap", 0.5)*1e9); // default 0.5s
fFragmentBytes = fOptions->GetInt("strax_fragment_payload_bytes", 110*2);
Expand All @@ -89,8 +93,6 @@ StraxFormatter::StraxFormatter(std::shared_ptr<Options>& opts, std::shared_ptr<M
}

fEmptyVerified = 0;
fLog = log;

fBufferNumChunks = fOptions->GetInt("strax_buffer_num_chunks", 2);
fWarnIfChunkOlderThan = fOptions->GetInt("strax_chunk_phase_limit", 2);
fMutexWaitTime.reserve(1<<20);
Expand Down Expand Up @@ -152,6 +154,15 @@ void StraxFormatter::GenerateArtificialDeadtime(int64_t timestamp, const std::sh
return;
}

void StraxFormatter::HandleCorruptPacket(const std::unique_ptr<data_packet>& dp, const std::string& reason) {
fCorruptPackets++;
int bid = (dp && dp->digi) ? dp->digi->bid() : -1;
fLog->Entry(MongoLog::Error, "Corrupt packet on board %i: %s", bid, reason.c_str());
if (fInjectDeadtimeOnCorrupt && dp && dp->digi) {
GenerateArtificialDeadtime(((dp->clock_counter<<31) + dp->header_time), dp->digi);
}
}

void StraxFormatter::ProcessDatapacket(std::unique_ptr<data_packet> dp){
// Take a buffer and break it up into one document per channel
auto it = dp->buff.begin();
Expand All @@ -162,9 +173,31 @@ void StraxFormatter::ProcessDatapacket(std::unique_ptr<data_packet> dp){
if((*it)>>28 == 0xA){
missed = true; // it works out
words = (*it)&0xFFFFFFF;
if (words < event_header_words) {
if (fGracefulCorruptHandling) {
HandleCorruptPacket(dp, "Event header word-count < minimum");
break;
}
throw std::runtime_error("Corrupt event: word-count < minimum");
}
if ((size_t)words > (size_t)std::distance(it, dp->buff.end())) {
if (fGracefulCorruptHandling) {
HandleCorruptPacket(dp, "Event word-count exceeds remaining datapacket size");
break;
}
throw std::runtime_error("Corrupt event: word-count exceeds remaining datapacket size");
}
std::u32string_view sv(dp->buff.data() + std::distance(dp->buff.begin(), it), words);
// std::u32string_view sv(it, it+words); //c++20 :(
ProcessEvent(sv, dp, dpc);
try {
ProcessEvent(sv, dp, dpc);
} catch (const std::exception& e) {
if (fGracefulCorruptHandling) {
HandleCorruptPacket(dp, std::string("Exception while processing event: ") + e.what());
break;
}
throw;
}
evs_this_dp++;
it += words;
} else {
Expand Down Expand Up @@ -195,11 +228,22 @@ int StraxFormatter::ProcessEvent(std::u32string_view buff,
const std::unique_ptr<data_packet>& dp, std::map<int, int>& dpc) {
// buff = start of event

if (buff.size() < event_header_words) {
throw std::runtime_error("Event shorter than header");
}
// returns {words this event, channel mask, board fail, header timestamp}
auto [words, channel_mask, fail, event_time] = dp->digi->UnpackEventHeader(buff);
if (words < event_header_words) {
throw std::runtime_error("Invalid event word-count in header");
}
if ((size_t)words > buff.size()) {
throw std::runtime_error("Event header word-count exceeds event view size");
}

if(fail){ // board fail
//GenerateArtificialDeadtime(((dp->clock_counter<<31) + dp->header_time), dp->digi);
if (fInjectDeadtimeOnCorrupt) {
GenerateArtificialDeadtime(((dp->clock_counter<<31) + dp->header_time), dp->digi);
}
dp->digi->CheckFail(true);
fFailCounter[dp->digi->bid()]++;
return event_header_words;
Expand All @@ -213,6 +257,9 @@ int StraxFormatter::ProcessEvent(std::u32string_view buff,
for(unsigned ch=0; ch<n_chan; ch++){
if (channel_mask & (1<<ch)) {
ret = ProcessChannel(buff, words, channel_mask, event_time, frags, ch, dp, dpc);
if (ret <= 0 || (size_t)ret > buff.size()) {
throw std::runtime_error("Invalid channel size while processing event");
}
buff.remove_prefix(ret);
}
}
Expand All @@ -229,6 +276,9 @@ int StraxFormatter::ProcessChannel(std::u32string_view buff, int words_in_event,
// returns {timestamp (ns), words this channel, baseline, waveform}
auto [timestamp, channel_words, baseline_ch, wf] = dp->digi->UnpackChannelHeader(
buff, dp->clock_counter, dp->header_time, event_time, words_in_event, n_channels, channel);
if (channel_words < 2 || (size_t)channel_words > buff.size()) {
throw std::runtime_error("Corrupt channel header/size");
}

uint32_t samples_in_pulse = wf.size()*sizeof(char32_t)/sizeof(uint16_t);
uint16_t sw = dp->digi->SampleWidth();
Expand Down Expand Up @@ -336,7 +386,12 @@ void StraxFormatter::Process() {
dp = std::move(fBuffer.front());
fBuffer.pop_front();
lk.unlock();
ProcessDatapacket(std::move(dp));
try {
ProcessDatapacket(std::move(dp));
} catch (const std::exception& e) {
fLog->Entry(MongoLog::Error, "Fatal formatter exception in thread %lx: %s", fThreadId, e.what());
if (!fGracefulCorruptHandling) throw;
}
if (fActive == true) WriteOutChunks();
} else {
lk.unlock();
Expand All @@ -345,6 +400,9 @@ void StraxFormatter::Process() {
if (fBytesProcessed > 0)
End();
if (fMutexWaitTime.size() > 0) std::sort(fMutexWaitTime.begin(), fMutexWaitTime.end());
if (fCorruptPackets > 0) {
fLog->Entry(MongoLog::Warning, "Thread %lx handled %i corrupt datapacket(s) gracefully", fThreadId, fCorruptPackets);
}
}

void StraxFormatter::WriteOutChunk(int chunk_i){
Expand Down Expand Up @@ -486,4 +544,3 @@ std::vector<std::string> StraxFormatter::GetChunkNames(int chunk) {
GetStringFormat(chunk+1)+"_pre"}};
return ret;
}

4 changes: 4 additions & 0 deletions StraxFormatter.hh
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ private:
void WriteOutChunks();
void End();
void GenerateArtificialDeadtime(int64_t, const std::shared_ptr<V1724>&);
void HandleCorruptPacket(const std::unique_ptr<data_packet>&, const std::string&);
void AddFragmentToBuffer(std::string, uint32_t, int);
std::vector<std::string> GetChunkNames(int);

Expand Down Expand Up @@ -105,6 +106,9 @@ private:
std::map<int, long> fBytesPerChunk;
std::atomic_int fInputBufferSize, fOutputBufferSize;
long fBytesProcessed;
bool fGracefulCorruptHandling;
bool fInjectDeadtimeOnCorrupt;
long fCorruptPackets;

double fProcTimeDP, fProcTimeEv, fProcTimeCh, fCompTime;
std::thread::id fThreadId;
Expand Down