diff --git a/.gitignore b/.gitignore index 28d716cf..7a2ff941 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,6 @@ __pycache__ # clang formatting temporary files .clang-format + +# ctest generated artifacts +Testing/Temporary/ diff --git a/CMakeLists.txt b/CMakeLists.txt index 2bb3cea4..9744bcc3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,15 +23,15 @@ daq_protobuf_codegen( opmon/*.proto ) ############################################################################## daq_add_library( TriggerInhibitAgent.cpp TriggerRecordBuilderData.cpp TPBundleHandler.cpp - LINK_LIBRARIES + LINK_LIBRARIES opmonlib::opmonlib ers::ers HighFive appfwk::appfwk logging::logging stdc++fs dfmessages::dfmessages utilities::utilities trigger::trigger detdataformats::detdataformats trgdataformats::trgdataformats) - + daq_add_plugin( HDF5DataStore duneDataStore LINK_LIBRARIES dfmodules hdf5libs::hdf5libs stdc++fs) daq_add_plugin( FragmentAggregatorModule duneDAQModule LINK_LIBRARIES dfmodules iomanager::iomanager ) daq_add_plugin( DataWriterModule duneDAQModule LINK_LIBRARIES dfmodules hdf5libs::hdf5libs iomanager::iomanager ) -daq_add_plugin( DFOModule duneDAQModule LINK_LIBRARIES dfmodules iomanager::iomanager ) +daq_add_plugin( DFOModule duneDAQModule LINK_LIBRARIES dfmodules iomanager::iomanager ) daq_add_plugin( TRBModule duneDAQModule LINK_LIBRARIES dfmodules iomanager::iomanager ) daq_add_plugin( TRMonRequestorModule duneDAQModule LINK_LIBRARIES dfmodules iomanager::iomanager ) daq_add_plugin( FakeDataProdModule duneDAQModule LINK_LIBRARIES dfmodules iomanager::iomanager) diff --git a/include/dfmodules/DFODecision.hpp b/include/dfmodules/DFODecision.hpp new file mode 100644 index 00000000..48f8918f --- /dev/null +++ b/include/dfmodules/DFODecision.hpp @@ -0,0 +1,63 @@ +/** + * @file DFODecision.hpp DFODecision message used by DFOConsensusModule. 
+ * + * A DFODecision is broadcast by the responsible DFOConsensusModule to all + * peer DFOs after each TRBModule state change (trigger assignment or + * completion). This allows every DFO in the ensemble to maintain an accurate + * per-TRBModule slot-usage view and issue correct TriggerInhibit messages to + * the MLT. + * + * Fields + * ------ + * run_number – run in which this decision was made + * trigger_number – the TriggerDecision that triggered the state change + * trb_connection_name– connection-ID of the TRBModule whose slot count changed + * trb_slot_count – absolute slot count for that TRB *after* the change + * source_dfo_name – name() of the DFO that generated this message + * is_completion – false = new assignment; true = TRB completed a trigger + * + * Serialisation + * ------------- + * nlohmann/json (to_json / from_json) is provided via + * NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE. For in-process Queue transport this is + * not required; for network (ZMQ / kSendRecv) connections production + * deployments should additionally register a msgpack serialiser in dfmessages. + * + * This is part of the DUNE DAQ Software Suite, copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. 
+ */ + +#ifndef DFMODULES_INCLUDE_DFMODULES_DFODECISION_HPP_ +#define DFMODULES_INCLUDE_DFMODULES_DFODECISION_HPP_ + +#include "daqdataformats/Types.hpp" +#include "nlohmann/json.hpp" + +#include + +namespace dunedaq { +namespace dfmodules { + +struct DFODecision +{ + daqdataformats::run_number_t run_number{ 0 }; + daqdataformats::trigger_number_t trigger_number{ 0 }; + std::string trb_connection_name; ///< Connection-ID of the TRBModule involved + size_t trb_slot_count{ 0 }; ///< TRB slot count after this event + std::string source_dfo_name; ///< Name of the DFO that generated this message + bool is_completion{ false }; ///< true = completion; false = new assignment +}; + +NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(DFODecision, + run_number, + trigger_number, + trb_connection_name, + trb_slot_count, + source_dfo_name, + is_completion) + +} // namespace dfmodules +} // namespace dunedaq + +#endif // DFMODULES_INCLUDE_DFMODULES_DFODECISION_HPP_ diff --git a/plugins/DFOModule.cpp b/plugins/DFOModule.cpp index 6c66e37e..7c42ea6e 100644 --- a/plugins/DFOModule.cpp +++ b/plugins/DFOModule.cpp @@ -16,6 +16,7 @@ #include "iomanager/IOManager.hpp" #include "logging/Logging.hpp" +#include #include #include #include @@ -23,8 +24,10 @@ #include #include #include +#include #include #include +#include #include #include @@ -35,8 +38,12 @@ enum { TLVL_ENTER_EXIT_METHODS = 5, + TLVL_PEER_ANNOUNCE = 6, TLVL_CONFIG = 7, TLVL_WORK_STEPS = 10, + TLVL_TD_FILTER = 11, + TLVL_DFO_DECISION = 12, + TLVL_WATCHDOG = 13, TLVL_TRIGDEC_RECEIVED = 21, TLVL_NOTIFY_TRIGGER = 22, TLVL_DISPATCH_TO_TRB = 23, @@ -47,8 +54,6 @@ namespace dunedaq::dfmodules { DFOModule::DFOModule(const std::string& name) : dunedaq::appfwk::DAQModule(name) - , m_queue_timeout(100) - , m_run_number(0) { register_command("conf", &DFOModule::do_conf); register_command("start", &DFOModule::do_start); @@ -74,6 +79,10 @@ DFOModule::init(std::shared_ptr mcfg) if (con->get_data_type() == datatype_to_string()) { m_td_connection = 
con->UID(); } + if (con->get_data_type() == datatype_to_string()) { + m_dfo_decision_input_connection = con->UID(); + TLOG_DEBUG(TLVL_DFO_DECISION) << get_name() << ": Found DFODecision input connection: " << con->UID(); + } } for (auto con : mdal->get_outputs()) { if (con->get_data_type() == datatype_to_string()) { @@ -82,24 +91,36 @@ DFOModule::init(std::shared_ptr mcfg) if (con->get_data_type() == datatype_to_string()) { m_trb_conn_ids.push_back(con->UID()); } + if (con->get_data_type() == datatype_to_string()) { + m_dfo_decision_output_connections.push_back(con->UID()); + TLOG_DEBUG(TLVL_DFO_DECISION) << get_name() << ": Found DFODecision output connection: " << con->UID(); + } } - if (m_token_connection == "") { + if (m_token_connection.empty()) { throw appfwk::MissingConnection( ERS_HERE, get_name(), datatype_to_string(), "input"); } - if (m_td_connection == "") { + if (m_td_connection.empty()) { throw appfwk::MissingConnection(ERS_HERE, get_name(), datatype_to_string(), "input"); } if (m_busy_sender == nullptr) { throw appfwk::MissingConnection(ERS_HERE, get_name(), datatype_to_string(), "output"); - } m_dfo_conf = mdal->get_configuration(); + m_consensus_enabled = m_dfo_conf->get_consensus_enabled(); + m_expected_peers = m_consensus_enabled ? m_dfo_decision_output_connections.size() : 0; + // these are just tests to check if the connections are ok iom->get_receiver(m_token_connection); iom->get_receiver(m_td_connection); + if (m_consensus_enabled && !m_dfo_decision_input_connection.empty()) { + iom->get_receiver(m_dfo_decision_input_connection); + } + + TLOG() << get_name() << ": DFOModule initialized in " << (m_consensus_enabled ? 
"consensus" : "standalone") + << " mode with " << m_expected_peers << " expected peer DFO(s)"; TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method"; } @@ -113,9 +134,12 @@ DFOModule::do_conf(const CommandData_t&) m_stop_timeout = std::chrono::milliseconds(m_dfo_conf->get_stop_timeout_ms()); m_busy_threshold = m_dfo_conf->get_busy_threshold(); m_free_threshold = m_dfo_conf->get_free_threshold(); - m_td_send_retries = m_dfo_conf->get_td_send_retries(); + m_dfo_decision_timeout = std::chrono::milliseconds(m_dfo_conf->get_dfo_decision_timeout_ms()); + m_peer_announce_timeout = std::chrono::milliseconds(m_dfo_conf->get_peer_announce_timeout_ms()); + m_watchdog_interval = std::chrono::milliseconds(m_dfo_conf->get_watchdog_interval_ms()); + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_conf() method, there are " << m_dataflow_availability.size() << " TRB apps defined"; } @@ -126,12 +150,34 @@ DFOModule::do_start(const CommandData_t& payload) TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method"; m_received_tokens = 0; + m_sent_decisions = 0; + m_received_decisions = 0; + m_waiting_for_decision = 0; + m_deciding_destination = 0; + m_forwarding_decision = 0; + m_waiting_for_token = 0; + m_processing_token = 0; + + { + std::lock_guard guard(m_peers_mutex); + m_registered_peers.clear(); + } + { + std::lock_guard guard(m_remote_slots_mutex); + m_remote_slot_counts.clear(); + } + { + std::lock_guard guard(m_pending_tds_mutex); + m_pending_tds.clear(); + } + m_own_index.store(0); + m_num_dfos.store(1); + m_run_number = payload.value("run", 0); m_running_status.store(true); m_last_notified_busy.store(false); - m_last_assignement_it = m_dataflow_availability.end(); - + m_last_assignment_it = m_dataflow_availability.end(); m_last_token_received = m_last_td_received = std::chrono::steady_clock::now(); // 19-Dec-2024, KAB: check that TriggerDecision senders are ready to send. 
This is done @@ -145,18 +191,47 @@ DFOModule::do_start(const CommandData_t& payload) bool is_ready = m_busy_sender->is_ready_for_sending(std::chrono::milliseconds(100)); TLOG_DEBUG(0) << "The sender for TriggerInhibit messages " << (is_ready ? "is" : "is not") << " ready."; } - for (auto trb_conn : m_trb_conn_ids) { + for (const auto& trb_conn : m_trb_conn_ids) { auto sender = iom->get_sender(trb_conn); if (sender != nullptr) { bool is_ready = sender->is_ready_for_sending(std::chrono::milliseconds(100)); - TLOG_DEBUG(0) << "The TriggerDecision sender for " << trb_conn << " " << (is_ready ? "is" : "is not") << " ready."; + TLOG_DEBUG(0) << "The TriggerDecision sender for " << trb_conn << " " << (is_ready ? "is" : "is not") + << " ready."; } } + iom->add_callback( - m_token_connection, std::bind(&DFOModule::receive_trigger_complete_token, this, std::placeholders::_1)); + m_token_connection, std::bind(&DFOModule::on_token, this, std::placeholders::_1)); iom->add_callback( - m_td_connection, std::bind(&DFOModule::receive_trigger_decision, this, std::placeholders::_1)); + m_td_connection, std::bind(&DFOModule::on_trigger_decision, this, std::placeholders::_1)); + + if (m_consensus_enabled && !m_dfo_decision_input_connection.empty()) { + iom->add_callback( + m_dfo_decision_input_connection, + std::bind(&DFOModule::on_dfo_decision, this, std::placeholders::_1)); + } + + if (m_consensus_enabled) { + send_peer_announcement(); + + if (m_expected_peers > 0) { + std::unique_lock lock(m_peers_mutex); + bool all_peers_ready = m_peers_cv.wait_for(lock, m_peer_announce_timeout, [this] { + return m_registered_peers.size() >= m_expected_peers; + }); + if (!all_peers_ready) { + ers::warning(DFOConsensusPeerTimeout( + ERS_HERE, get_name(), m_expected_peers, m_registered_peers.size())); + } + } + + compute_partition(); + ers::info(DFOConsensusPartitionInfo(ERS_HERE, get_name(), m_own_index.load(), m_num_dfos.load())); + + m_watchdog_running.store(true); + m_watchdog_thread = 
std::thread(&DFOModule::watchdog_thread_func, this); + } TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method"; } @@ -166,6 +241,11 @@ DFOModule::do_stop(const CommandData_t& /*args*/) { TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method"; + m_watchdog_running.store(false); + if (m_watchdog_thread.joinable()) { + m_watchdog_thread.join(); + } + m_running_status.store(false); auto iom = iomanager::IOManager::get(); @@ -175,12 +255,15 @@ DFOModule::do_stop(const CommandData_t& /*args*/) auto step_timeout = m_stop_timeout / wait_steps; int step_counter = 0; while (!is_empty() && step_counter < wait_steps) { - TLOG() << get_name() << ": stop delayed while waiting for " << used_slots() << " TDs to completed"; + TLOG() << get_name() << ": stop delayed while waiting for " << used_slots() << " TDs to complete"; std::this_thread::sleep_for(step_timeout); ++step_counter; } iom->remove_callback(m_token_connection); + if (m_consensus_enabled && !m_dfo_decision_input_connection.empty()) { + iom->remove_callback(m_dfo_decision_input_connection); + } std::list> remnants; for (auto& app : m_dataflow_availability) { @@ -194,9 +277,20 @@ DFOModule::do_stop(const CommandData_t& /*args*/) ers::error(IncompleteTriggerDecision(ERS_HERE, r->decision.trigger_number, m_run_number)); } + { + std::lock_guard guard(m_pending_tds_mutex); + m_pending_tds.clear(); + } + { + std::lock_guard guard(m_remote_slots_mutex); + m_remote_slot_counts.clear(); + } + m_own_index.store(0); + m_num_dfos.store(1); + std::lock_guard guard(m_trigger_counters_mutex); m_trigger_counters.clear(); - + TLOG() << get_name() << " successfully stopped"; TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method"; } @@ -207,11 +301,152 @@ DFOModule::do_scrap(const CommandData_t& /*args*/) TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_scrap() method"; m_dataflow_availability.clear(); + { + std::lock_guard 
guard(m_peers_mutex); + m_registered_peers.clear(); + } + { + std::lock_guard guard(m_remote_slots_mutex); + m_remote_slot_counts.clear(); + } + { + std::lock_guard guard(m_pending_tds_mutex); + m_pending_tds.clear(); + } TLOG() << get_name() << " successfully scrapped"; TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_scrap() method"; } +void +DFOModule::on_token(const dfmessages::TriggerDecisionToken& token) +{ + receive_trigger_complete_token(token); +} + +void +DFOModule::on_trigger_decision(const dfmessages::TriggerDecision& decision) +{ + if (!m_consensus_enabled) { + receive_trigger_decision(decision); + return; + } + + { + std::lock_guard guard(m_pending_tds_mutex); + m_pending_tds[decision.trigger_number] = { decision, std::chrono::steady_clock::now() }; + } + + size_t num_dfos = m_num_dfos.load(); + size_t own_index = m_own_index.load(); + + if (num_dfos > 1 && (decision.trigger_number % num_dfos) != own_index) { + TLOG_DEBUG(TLVL_TD_FILTER) << get_name() << ": Buffered trigger_number " << decision.trigger_number + << " awaiting DFODecision from partition " + << (decision.trigger_number % num_dfos); + return; + } + + receive_trigger_decision(decision); +} + +void +DFOModule::on_dfo_decision(const DFODecision& msg) +{ + if (!m_consensus_enabled) { + return; + } + + TLOG_DEBUG(TLVL_DFO_DECISION) << get_name() << ": Received DFODecision from " << msg.source_dfo_name + << " trigger=" << msg.trigger_number << " trb=" << msg.trb_connection_name + << " slots=" << msg.trb_slot_count + << " completion=" << std::boolalpha << msg.is_completion; + + if (msg.run_number == 0 && msg.trigger_number == s_peer_announce_magic) { + TLOG_DEBUG(TLVL_PEER_ANNOUNCE) << get_name() << ": Received peer announcement from " << msg.source_dfo_name; + bool newly_registered = false; + { + std::lock_guard guard(m_peers_mutex); + auto [it, inserted] = m_registered_peers.insert(msg.source_dfo_name); + newly_registered = inserted; + } + m_peers_cv.notify_all(); + + if 
(newly_registered) { + compute_partition(); + ers::info(DFOConsensusPartitionInfo(ERS_HERE, get_name(), m_own_index.load(), m_num_dfos.load())); + } + return; + } + + if (msg.run_number != m_run_number) { + return; + } + + { + std::lock_guard guard(m_remote_slots_mutex); + m_remote_slot_counts[msg.source_dfo_name][msg.trb_connection_name] = msg.trb_slot_count; + } + + if (!msg.is_completion) { + std::lock_guard guard(m_pending_tds_mutex); + m_pending_tds.erase(msg.trigger_number); + } + + notify_trigger_if_needed(); +} + +void +DFOModule::send_peer_announcement() +{ + if (!m_consensus_enabled || m_dfo_decision_output_connections.empty()) { + return; + } + + DFODecision announcement; + announcement.run_number = 0; + announcement.trigger_number = s_peer_announce_magic; + announcement.trb_connection_name = ""; + announcement.trb_slot_count = 0; + announcement.source_dfo_name = get_name(); + announcement.is_completion = true; + + auto iom = iomanager::IOManager::get(); + for (const auto& conn : m_dfo_decision_output_connections) { + try { + auto announcement_copy = announcement; + iom->get_sender(conn)->send(std::move(announcement_copy), m_queue_timeout); + TLOG_DEBUG(TLVL_PEER_ANNOUNCE) << get_name() << ": Sent peer announcement to " << conn; + } catch (const ers::Issue& excpt) { + ers::warning(excpt); + } + } +} + +void +DFOModule::compute_partition() +{ + std::vector ensemble; + { + std::lock_guard guard(m_peers_mutex); + ensemble.push_back(get_name()); + for (const auto& peer : m_registered_peers) { + ensemble.push_back(peer); + } + } + + std::sort(ensemble.begin(), ensemble.end()); + + auto it = std::find(ensemble.begin(), ensemble.end(), get_name()); + size_t own_index = (it != ensemble.end()) ? 
static_cast(std::distance(ensemble.begin(), it)) : 0; + + m_own_index.store(own_index); + m_num_dfos.store(ensemble.size()); + + TLOG_DEBUG(TLVL_TD_FILTER) << get_name() << ": Partition computed: index=" << own_index << " of " + << ensemble.size() << " DFO(s)"; +} + void DFOModule::receive_trigger_decision(const dfmessages::TriggerDecision& decision) { @@ -227,10 +462,10 @@ DFOModule::receive_trigger_decision(const dfmessages::TriggerDecision& decision) auto decision_received = std::chrono::steady_clock::now(); ++m_received_decisions; auto trigger_types = unpack_types(decision.trigger_type); - for ( const auto t : trigger_types ) { + for (const auto t : trigger_types) { ++get_trigger_counter(t).received; } - + std::chrono::steady_clock::time_point decision_assigned; do { @@ -291,7 +526,7 @@ DFOModule::find_slot(const dfmessages::TriggerDecision& decision) size_t minimum = std::numeric_limits::max(); unsigned int counter = 0; - auto candidate_it = m_last_assignement_it; + auto candidate_it = m_last_assignment_it; if (candidate_it == m_dataflow_availability.end()) candidate_it = m_dataflow_availability.begin(); @@ -318,7 +553,7 @@ DFOModule::find_slot(const dfmessages::TriggerDecision& decision) continue; output = candidate_it->second->make_assignment(decision); - m_last_assignement_it = candidate_it; + m_last_assignment_it = candidate_it; } if (!output) { @@ -327,7 +562,7 @@ DFOModule::find_slot(const dfmessages::TriggerDecision& decision) // number of assignments if (minimum_occupied != m_dataflow_availability.end()) { output = minimum_occupied->second->make_assignment(decision); - m_last_assignement_it = minimum_occupied; + m_last_assignment_it = minimum_occupied; ers::warning(AssignedToBusyApp(ERS_HERE, decision.trigger_number, minimum_occupied->first, minimum)); } } @@ -340,11 +575,11 @@ DFOModule::find_slot(const dfmessages::TriggerDecision& decision) } void -DFOModule::generate_opmon_data() +DFOModule::generate_opmon_data() { opmon::DFOInfo info; - 
info.set_tokens_received( m_received_tokens.exchange(0) ); + info.set_tokens_received(m_received_tokens.exchange(0)); info.set_decisions_sent(m_sent_decisions.exchange(0)); info.set_decisions_received(m_received_decisions.exchange(0)); info.set_waiting_for_decision(m_waiting_for_decision.exchange(0)); @@ -352,16 +587,16 @@ DFOModule::generate_opmon_data() info.set_forwarding_decision(m_forwarding_decision.exchange(0)); info.set_waiting_for_token(m_waiting_for_token.exchange(0)); info.set_processing_token(m_processing_token.exchange(0)); - publish( std::move(info) ); + publish(std::move(info)); - std::lock_guard guard(m_trigger_counters_mutex); - for ( auto & [type, counts] : m_trigger_counters ) { + std::lock_guard guard(m_trigger_counters_mutex); + for (auto& [type, counts] : m_trigger_counters) { opmon::TriggerInfo ti; ti.set_received(counts.received.exchange(0)); ti.set_completed(counts.completed.exchange(0)); auto name = dunedaq::trgdataformats::get_trigger_candidate_type_names()[type]; - publish( std::move(ti), {{"type", name}} ); - } + publish(std::move(ti), { { "type", name } }); + } } void @@ -406,7 +641,12 @@ DFOModule::receive_trigger_complete_token(const dfmessages::TriggerDecisionToken try { auto dec_ptr = app_it->second->complete_assignment(token.trigger_number, m_metadata_function); auto trigger_types = unpack_types(dec_ptr->decision.trigger_type); - for ( const auto t : trigger_types ) ++ get_trigger_counter(t).completed; + for (const auto t : trigger_types) + ++get_trigger_counter(t).completed; + + if (m_consensus_enabled) { + broadcast_dfo_decision(token.trigger_number, token.decision_destination, app_it->second->used_slots(), true); + } } catch (AssignedTriggerDecisionNotFound const& err) { ers::error(err); } @@ -428,9 +668,44 @@ DFOModule::receive_trigger_complete_token(const dfmessages::TriggerDecisionToken bool DFOModule::is_busy() const { - for (auto& dfapp : m_dataflow_availability) { - if (!dfapp.second->is_busy()) + if 
(!m_consensus_enabled) { + for (auto& dfapp : m_dataflow_availability) { + if (!dfapp.second->is_busy()) + return false; + } + return true; + } + + return is_globally_busy(); +} + +bool +DFOModule::is_globally_busy() const +{ + if (m_dataflow_availability.empty()) { + return true; + } + + std::map peer_totals; + { + std::lock_guard guard(m_remote_slots_mutex); + for (const auto& peer_slots : m_remote_slot_counts) { + for (const auto& [trb_conn, count] : peer_slots.second) { + peer_totals[trb_conn] += count; + } + } + } + + for (const auto& [trb_conn, trb_data] : m_dataflow_availability) { + size_t total_slots = trb_data->used_slots(); + auto peer_it = peer_totals.find(trb_conn); + if (peer_it != peer_totals.end()) { + total_slots += peer_it->second; + } + + if (total_slots < m_busy_threshold) { return false; + } } return true; } @@ -497,26 +772,29 @@ DFOModule::dispatch(const std::shared_ptr& assignment) << assignment->connection_name; bool wasSentSuccessfully = false; - int retries = m_td_send_retries; + int retries = m_td_send_retries > static_cast(std::numeric_limits::max()) + ? 
std::numeric_limits::max() + : static_cast(m_td_send_retries); auto iom = iomanager::IOManager::get(); do { try { auto decision_copy = dfmessages::TriggerDecision(assignment->decision); - iom->get_sender(assignment->connection_name) - ->send(std::move(decision_copy), m_queue_timeout); + iom->get_sender(assignment->connection_name)->send(std::move(decision_copy), + m_queue_timeout); wasSentSuccessfully = true; ++m_sent_decisions; TLOG_DEBUG(TLVL_DISPATCH_TO_TRB) << get_name() << " Sent TriggerDecision for trigger_number " - << decision_copy.trigger_number << " to TRB at connection " - << assignment->connection_name << " for run number " << decision_copy.run_number; + << assignment->decision.trigger_number << " to TRB at connection " + << assignment->connection_name << " for run number " + << assignment->decision.run_number; } catch (const ers::Issue& excpt) { std::ostringstream oss_warn; oss_warn << "Send to connection \"" << assignment->connection_name << "\" failed"; ers::warning(iomanager::OperationFailed(ERS_HERE, oss_warn.str(), excpt)); } - retries--; + --retries; } while (!wasSentSuccessfully && m_running_status.load() && retries > 0); @@ -524,10 +802,128 @@ DFOModule::dispatch(const std::shared_ptr& assignment) return wasSentSuccessfully; } +void +DFOModule::broadcast_dfo_decision(daqdataformats::trigger_number_t trigger_number, + const std::string& trb_conn, + size_t trb_slot_count, + bool is_completion) +{ + if (!m_consensus_enabled || m_dfo_decision_output_connections.empty()) + return; + + DFODecision msg; + msg.run_number = m_run_number; + msg.trigger_number = trigger_number; + msg.trb_connection_name = trb_conn; + msg.trb_slot_count = trb_slot_count; + msg.source_dfo_name = get_name(); + msg.is_completion = is_completion; + + auto iom = iomanager::IOManager::get(); + for (const auto& conn : m_dfo_decision_output_connections) { + try { + auto msg_copy = msg; + iom->get_sender(conn)->send(std::move(msg_copy), m_queue_timeout); + 
TLOG_DEBUG(TLVL_DFO_DECISION) << get_name() << ": Sent DFODecision to " << conn + << " trigger=" << trigger_number << " trb=" << trb_conn + << " slots=" << trb_slot_count + << " completion=" << std::boolalpha << is_completion; + } catch (const ers::Issue& excpt) { + ers::warning(excpt); + } + } +} + void DFOModule::assign_trigger_decision(const std::shared_ptr& assignment) { - m_dataflow_availability[assignment->connection_name]->add_assignment(assignment); + auto& trb_data = m_dataflow_availability[assignment->connection_name]; + trb_data->add_assignment(assignment); + + if (m_consensus_enabled) { + broadcast_dfo_decision( + assignment->decision.trigger_number, assignment->connection_name, trb_data->used_slots(), false); + + std::lock_guard guard(m_pending_tds_mutex); + m_pending_tds.erase(assignment->decision.trigger_number); + } +} + +void +DFOModule::watchdog_thread_func() +{ + while (m_watchdog_running.load() && m_consensus_enabled) { + std::this_thread::sleep_for(m_watchdog_interval); + + if (!m_running_status.load()) { + continue; + } + + std::vector> timed_out; + auto now = std::chrono::steady_clock::now(); + + { + std::lock_guard guard(m_pending_tds_mutex); + for (const auto& entry : m_pending_tds) { + auto age = std::chrono::duration_cast(now - entry.second.received_at); + if (age > m_dfo_decision_timeout) { + timed_out.emplace_back(entry.first, entry.second); + } + } + } + + for (const auto& [trigger_number, pending] : timed_out) { + size_t failed_index = m_num_dfos.load() > 0 ? 
(trigger_number % m_num_dfos.load()) : 0; + handle_peer_failure(failed_index, trigger_number); + + std::lock_guard guard(m_pending_tds_mutex); + auto it = m_pending_tds.find(trigger_number); + if (it != m_pending_tds.end()) { + TLOG_DEBUG(TLVL_WATCHDOG) << get_name() << ": Reprocessing trigger_number " << trigger_number + << " after failover"; + auto decision_copy = it->second.decision; + m_pending_tds.erase(it); + receive_trigger_decision(decision_copy); + } + } + } +} + +void +DFOModule::handle_peer_failure(size_t failed_index, daqdataformats::trigger_number_t trigger_number) +{ + std::string failed_peer_name; + { + std::lock_guard guard(m_peers_mutex); + + std::vector ensemble; + ensemble.push_back(get_name()); + for (const auto& peer : m_registered_peers) { + ensemble.push_back(peer); + } + std::sort(ensemble.begin(), ensemble.end()); + + if (failed_index < ensemble.size()) { + failed_peer_name = ensemble[failed_index]; + if (!failed_peer_name.empty() && failed_peer_name != get_name()) { + m_registered_peers.erase(failed_peer_name); + } + } + } + + if (!failed_peer_name.empty() && failed_peer_name != get_name()) { + ers::warning(DFOConsensusFailover(ERS_HERE, get_name(), failed_peer_name, trigger_number)); + } + + { + std::lock_guard guard(m_remote_slots_mutex); + if (!failed_peer_name.empty()) { + m_remote_slot_counts.erase(failed_peer_name); + } + } + + compute_partition(); + ers::info(DFOConsensusPartitionInfo(ERS_HERE, get_name(), m_own_index.load(), m_num_dfos.load())); } } // namespace dunedaq::dfmodules diff --git a/plugins/DFOModule.hpp b/plugins/DFOModule.hpp index ad280b6c..8486b32a 100644 --- a/plugins/DFOModule.hpp +++ b/plugins/DFOModule.hpp @@ -9,6 +9,7 @@ #ifndef DFMODULES_PLUGINS_DATAFLOWORCHESTRATOR_HPP_ #define DFMODULES_PLUGINS_DATAFLOWORCHESTRATOR_HPP_ +#include "dfmodules/DFODecision.hpp" #include "dfmodules/TriggerRecordBuilderData.hpp" #include "appmodel/DFOConf.hpp" @@ -25,12 +26,19 @@ #include "appfwk/DAQModule.hpp" #include 
"logging/Logging.hpp" // NOTE: if ISSUES ARE DECLARED BEFORE include logging/Logging.hpp, TLOG_DEBUG< +#include +#include +#include +#include #include #include +#include +#include #include +#include #include #include -#include namespace dunedaq { @@ -63,6 +71,26 @@ ERS_DECLARE_ISSUE(dfmodules, "TriggerDecision " << trigger_number << " was assigned to DF app " << app << " that was busy with " << used_slots << " TDs", ((uint32_t)trigger_number)((std::string)app)((size_t)used_slots)) // NOLINT(build/unsigned) + +ERS_DECLARE_ISSUE(dfmodules, + DFOConsensusPeerTimeout, + "DFOModule " << module_name << ": Timed out waiting for " << expected_peers + << " peer(s) to announce; received " << received_peers + << ". Continuing with the peers that responded.", + ((std::string)module_name)((size_t)expected_peers)((size_t)received_peers)) + +ERS_DECLARE_ISSUE(dfmodules, + DFOConsensusPartitionInfo, + "DFOModule " << module_name << ": Partition index " << own_index << " of " << num_dfos + << " DFO(s) in the ensemble.", + ((std::string)module_name)((size_t)own_index)((size_t)num_dfos)) + +ERS_DECLARE_ISSUE(dfmodules, + DFOConsensusFailover, + "DFOModule " << module_name << ": DFO peer " << failed_dfo + << " timed out for trigger_number " << trigger_number + << ". 
Removing from ensemble and reassigning.", + ((std::string)module_name)((std::string)failed_dfo)((uint32_t)trigger_number)) // Re-enable coverage checking LCOV_EXCL_STOP namespace dfmodules { @@ -88,6 +116,9 @@ class DFOModule : public dunedaq::appfwk::DAQModule void init(std::shared_ptr mcfg) override; + static constexpr daqdataformats::trigger_number_t s_peer_announce_magic = + std::numeric_limits::max(); + protected: virtual std::shared_ptr find_slot(const dfmessages::TriggerDecision& decision); // find_slot operates on a round-robin logic @@ -95,7 +126,7 @@ class DFOModule : public dunedaq::appfwk::DAQModule using trbd_ptr_t = std::shared_ptr; using data_structure_t = std::map; data_structure_t m_dataflow_availability; - data_structure_t::iterator m_last_assignement_it; + data_structure_t::iterator m_last_assignment_it; std::function m_metadata_function; private: @@ -107,28 +138,46 @@ class DFOModule : public dunedaq::appfwk::DAQModule void generate_opmon_data() override; - virtual void receive_trigger_complete_token(const dfmessages::TriggerDecisionToken&); + void send_peer_announcement(); + void compute_partition(); + + void on_token(const dfmessages::TriggerDecisionToken& token); + void on_trigger_decision(const dfmessages::TriggerDecision& decision); + void on_dfo_decision(const DFODecision& msg); + + void receive_trigger_complete_token(const dfmessages::TriggerDecisionToken&); void receive_trigger_decision(const dfmessages::TriggerDecision&); + virtual bool is_busy() const; + bool is_globally_busy() const; bool is_empty() const; size_t used_slots() const; void notify_trigger_if_needed() const; bool dispatch(const std::shared_ptr& assignment); + + void broadcast_dfo_decision(daqdataformats::trigger_number_t trigger_number, + const std::string& trb_conn, + size_t trb_slot_count, + bool is_completion); virtual void assign_trigger_decision(const std::shared_ptr& assignment); + void watchdog_thread_func(); + void handle_peer_failure(size_t failed_index, + 
daqdataformats::trigger_number_t trigger_number); + // Configuration - const appmodel::DFOConf* m_dfo_conf; - std::chrono::milliseconds m_queue_timeout; - std::chrono::microseconds m_stop_timeout; - dunedaq::daqdataformats::run_number_t m_run_number; + const appmodel::DFOConf* m_dfo_conf{ nullptr }; + std::chrono::milliseconds m_queue_timeout{ 100 }; + std::chrono::microseconds m_stop_timeout{ 0 }; + dunedaq::daqdataformats::run_number_t m_run_number{ 0 }; // Connections std::shared_ptr> m_busy_sender; std::string m_token_connection; std::string m_td_connection; - size_t m_td_send_retries; - size_t m_busy_threshold; - size_t m_free_threshold; + size_t m_td_send_retries{ 0 }; + size_t m_busy_threshold{ 0 }; + size_t m_free_threshold{ 0 }; std::vector m_trb_conn_ids; // Coordination @@ -138,23 +187,53 @@ class DFOModule : public dunedaq::appfwk::DAQModule std::chrono::steady_clock::time_point m_last_td_received; mutable std::mutex m_notify_trigger_mutex; + // Consensus state + bool m_consensus_enabled{ false }; + size_t m_expected_peers{ 0 }; + std::vector m_dfo_decision_output_connections; + std::string m_dfo_decision_input_connection; + std::chrono::milliseconds m_dfo_decision_timeout{ 2000 }; + std::chrono::milliseconds m_peer_announce_timeout{ 500 }; + std::chrono::milliseconds m_watchdog_interval{ 100 }; + std::set m_registered_peers; + mutable std::mutex m_peers_mutex; + std::condition_variable m_peers_cv; + std::atomic m_own_index{ 0 }; + std::atomic m_num_dfos{ 1 }; + std::map> m_remote_slot_counts; + mutable std::mutex m_remote_slots_mutex; + + struct PendingTD + { + dfmessages::TriggerDecision decision; + std::chrono::steady_clock::time_point received_at; + }; + std::map m_pending_tds; + mutable std::mutex m_pending_tds_mutex; + + std::thread m_watchdog_thread; + std::atomic m_watchdog_running{ false }; + // Struct for statistic - struct TriggerData { - std::atomic received{0}; - std::atomic completed{0}; + struct TriggerData + { + std::atomic received{ 0 
}; + std::atomic completed{ 0 }; }; static std::set - unpack_types( decltype(dfmessages::TriggerDecision::trigger_type) t) { + unpack_types(decltype(dfmessages::TriggerDecision::trigger_type) t) + { std::set results; if (t == dfmessages::TypeDefaults::s_invalid_trigger_type) return results; const std::bitset<64> bits(t); - for( size_t i = 0; i < bits.size(); ++i ) { - if ( bits[i] ) results.insert((trgdataformats::TriggerCandidateData::Type)i); + for (size_t i = 0; i < bits.size(); ++i) { + if (bits[i]) + results.insert(static_cast(i)); } return results; } - + // Statistics std::atomic m_received_tokens{ 0 }; // NOLINT (build/unsigned) std::atomic m_sent_decisions{ 0 }; // NOLINT (build/unsigned) @@ -165,15 +244,16 @@ class DFOModule : public dunedaq::appfwk::DAQModule std::atomic m_waiting_for_token{ 0 }; // NOLINT (build/unsigned) std::atomic m_processing_token{ 0 }; // NOLINT (build/unsigned) std::map m_trigger_counters; - std::mutex m_trigger_counters_mutex; // used to safely handle the map above - TriggerData & get_trigger_counter(trgdataformats::TriggerCandidateData::Type type) { + std::mutex m_trigger_counters_mutex; // used to safely handle the map above. NOTE(review): get_trigger_counter() below calls find() on the map before taking this mutex; only the inserting operator[] path is locked. Confirm the unlocked lookup can never run concurrently with an insert from another thread. + TriggerData& get_trigger_counter(trgdataformats::TriggerCandidateData::Type type) + { auto it = m_trigger_counters.find(type); - if (it != m_trigger_counters.end()) return it->second; - + if (it != m_trigger_counters.end()) + return it->second; + std::lock_guard guard(m_trigger_counters_mutex); return m_trigger_counters[type]; } - }; } // namespace dfmodules } // namespace dunedaq diff --git a/test/config/datafloworchestrator_test.data.xml b/test/config/datafloworchestrator_test.data.xml index be3b19b1..5d7caeb2 100755 --- a/test/config/datafloworchestrator_test.data.xml +++ b/test/config/datafloworchestrator_test.data.xml @@ -92,7 +92,19 @@ - + + + + + + + + + + + + + @@ -103,6 +115,20 @@ + + + + + + + + + + + + + + @@ -110,6 +136,7 @@ + @@ -329,7 +356,40 @@ - + + + + + + + + + + + + + + + + + + + + + +
+ + + + + + + + + + + + diff --git a/unittest/DFOModule_test.cxx b/unittest/DFOModule_test.cxx index 690ca08a..fa12a4a0 100644 --- a/unittest/DFOModule_test.cxx +++ b/unittest/DFOModule_test.cxx @@ -9,9 +9,12 @@ #include "DFOModule.hpp" +#include "appmodel/DFOModule.hpp" +#include "appmodel/DFOConf.hpp" #include "dfmessages/TriggerDecisionToken.hpp" #include "dfmessages/TriggerInhibit.hpp" #include "dfmodules/CommonIssues.hpp" +#include "dfmodules/DFODecision.hpp" #include "dfmodules/opmon/DFOModule.pb.h" #include "iomanager/IOManager.hpp" #include "iomanager/Sender.hpp" @@ -243,6 +246,10 @@ BOOST_AUTO_TEST_CASE(SendTrigDecFailed) dfo->execute_command("conf", null_data); + auto iom = iomanager::IOManager::get(); + auto inh_recv = iom->get_receiver("triginh"); + inh_recv->add_callback([](const dfmessages::TriggerInhibit&) {}); + dfo->execute_command("start", start_data); send_init_token("invalid_connection"); @@ -265,8 +272,234 @@ BOOST_AUTO_TEST_CASE(SendTrigDecFailed) send_token(1000); std::this_thread::sleep_for(std::chrono::milliseconds(50)); + BOOST_TEST_MESSAGE("Draining dataflow and scrapping DFO"); + dfo->execute_command("drain_dataflow", null_data); + dfo->execute_command("scrap", null_data); + + inh_recv->remove_callback(); +} + +// --------------------------------------------------------------------------- +// Peer-announcement magic value +// Verify that s_peer_announce_magic does not equal 0 and is max for the type. 
+// --------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(PeerAnnounceMagicValue) +{ + BOOST_REQUIRE_NE(DFOModule::s_peer_announce_magic, static_cast(0)); + BOOST_REQUIRE_EQUAL(DFOModule::s_peer_announce_magic, std::numeric_limits::max()); +} + +// --------------------------------------------------------------------------- +// Partition-filter logic (unit test without networking) +// Instantiate the DFOModule and manually exercise the filter by +// sending trigger decisions at various trigger_numbers. With two DFOs +// (indices 0 and 1), decisions with even trigger_numbers go to the DFO +// at index 0 and odd ones to index 1. +// Here we simulate the DFO that has own_index=0 and num_dfos=2 by +// starting it and injecting a synthetic peer announcement so the partition +// settles before the first trigger decision arrives. +// --------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(PartitionFilter) +{ + auto dfo = appfwk::make_module("DFOModule", "test_consensus"); + opmgr.register_node("dfo", dfo); + dfo->init(cfgMgr); + + appfwk::DAQModule::CommandData_t null_data; + appfwk::DAQModule::CommandData_t start_data; + start_data.emplace("run", 1); + + dfo->execute_command("conf", null_data); + + auto iom = iomanager::IOManager::get(); + + // Count how many trigger decisions actually reach the TRB. + std::atomic received_count{ 0 }; + auto dec_recv = iom->get_receiver("trigdec_0"); + dec_recv->add_callback([&received_count](const dfmessages::TriggerDecision& td) { + ++received_count; + // Send a completion token back.
+ dfmessages::TriggerDecisionToken token; + token.run_number = td.run_number; + token.trigger_number = td.trigger_number; + token.decision_destination = "trigdec_0"; + get_iom_sender("token")->send(std::move(token), iomanager::Sender::s_block); + }); + + auto inh_recv = iom->get_receiver("triginh"); + inh_recv->add_callback(recv_triginh); + + dfo->execute_command("start", start_data); + send_init_token(); // Register TRB app with the DFO. + + // Inject a synthetic peer announcement that makes the module believe a + // second DFO "zzz_peer" is also in the ensemble. "test_consensus" < "zzz_peer" + // alphabetically, so "test_consensus" gets index 0 and processes even trigger_numbers. + { + DFODecision peer_ann; + peer_ann.run_number = 0; + peer_ann.trigger_number = DFOModule::s_peer_announce_magic; + peer_ann.trb_connection_name = ""; + peer_ann.trb_slot_count = 0; + peer_ann.source_dfo_name = "zzz_peer"; + peer_ann.is_completion = true; + get_iom_sender("dfo_decision_input")->send(std::move(peer_ann), iomanager::Sender::s_block); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + // Trigger numbers 1–4: only even ones (2, 4) should reach the TRB because + // this DFO has own_index==0 (trigger_number % 2 == 0). + for (dfmessages::trigger_number_t tn = 1; tn <= 4; ++tn) { + send_trigdec(tn); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + + // Exactly 2 decisions (trigger_numbers 2 and 4) should have reached the TRB. + BOOST_REQUIRE_EQUAL(received_count.load(), 2u); + + dfo->execute_command("drain_dataflow", null_data); + dfo->execute_command("scrap", null_data); + + dec_recv->remove_callback(); + inh_recv->remove_callback(); +} + +// --------------------------------------------------------------------------- +// DFODecision broadcast on assignment +// In standalone mode (no peer DFO connections) no DFODecision output +// connections are configured, so the broadcast is a no-op.
Verify that +// the module still correctly processes the TD and the callback path does +// not throw. +// --------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(DFODecisionBroadcastStandaloneNoOp) +{ + auto dfo = appfwk::make_module("DFOModule", "test_consensus"); + opmgr.register_node("dfo", dfo); + dfo->init(cfgMgr); + + appfwk::DAQModule::CommandData_t null_data; + appfwk::DAQModule::CommandData_t start_data; + start_data.emplace("run", 1); + + dfo->execute_command("conf", null_data); + + auto iom = iomanager::IOManager::get(); + auto dec_recv = iom->get_receiver("trigdec_0"); + std::atomic received{ 0 }; + dec_recv->add_callback([&](const dfmessages::TriggerDecision& td) { + ++received; + dfmessages::TriggerDecisionToken token; + token.run_number = td.run_number; + token.trigger_number = td.trigger_number; + token.decision_destination = "trigdec_0"; + get_iom_sender("token")->send(std::move(token), iomanager::Sender::s_block); + }); + + auto inh_recv = iom->get_receiver("triginh"); + inh_recv->add_callback([](const dfmessages::TriggerInhibit&) {}); + + dfo->execute_command("start", start_data); + send_init_token(); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + send_trigdec(1); + send_trigdec(2); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + + // At least one TD must have reached the TRB callback without error (the check below only requires >= 1, not both TDs). + BOOST_REQUIRE_GE(received.load(), 1); + dfo->execute_command("drain_dataflow", null_data); dfo->execute_command("scrap", null_data); + + dec_recv->remove_callback(); + inh_recv->remove_callback(); +} + +// --------------------------------------------------------------------------- +// Watchdog failover +// Simulate a two-DFO scenario where the responsible peer (zzz_peer) for +// odd trigger_numbers never sends a DFODecision.
The watchdog should +// detect the timeout, remove zzz_peer from the ensemble, recompute the +// partition (this DFO now owns ALL triggers), and reassign the pending TD. +// +// We read dfo_decision_timeout_ms from the module configuration to bound the wait, then verify the module +// eventually dispatches the previously-orphaned TD. +// --------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(WatchdogFailover) +{ + auto dfo = appfwk::make_module("DFOModule", "test_consensus"); + opmgr.register_node("dfo", dfo); + dfo->init(cfgMgr); + + appfwk::DAQModule::CommandData_t null_data; + appfwk::DAQModule::CommandData_t start_data; + start_data.emplace("run", 1); + + dfo->execute_command("conf", null_data); + + auto iom = iomanager::IOManager::get(); + + std::atomic received_count{ 0 }; + auto dec_recv = iom->get_receiver("trigdec_0"); + dec_recv->add_callback([&](const dfmessages::TriggerDecision& td) { + ++received_count; + dfmessages::TriggerDecisionToken token; + token.run_number = td.run_number; + token.trigger_number = td.trigger_number; + token.decision_destination = "trigdec_0"; + get_iom_sender("token")->send(std::move(token), iomanager::Sender::s_block); + }); + + auto inh_recv = iom->get_receiver("triginh"); + inh_recv->add_callback([](const dfmessages::TriggerInhibit&) {}); + + dfo->execute_command("start", start_data); + send_init_token(); + + // Inject a synthetic peer announcement for "zzz_peer" so this DFO gets + // own_index=0 (even trigger_numbers) and zzz_peer gets index=1 (odd).
+ { + DFODecision peer_ann; + peer_ann.run_number = 0; + peer_ann.trigger_number = DFOModule::s_peer_announce_magic; + peer_ann.trb_connection_name = ""; + peer_ann.trb_slot_count = 0; + peer_ann.source_dfo_name = "zzz_peer"; + peer_ann.is_completion = true; + get_iom_sender("dfo_decision_input")->send(std::move(peer_ann), iomanager::Sender::s_block); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + // Send trigger_number 1 (odd → would normally go to zzz_peer) and + // trigger_number 2 (even → processed immediately by this DFO). + send_trigdec(1); // buffered; waiting for zzz_peer's DFODecision + send_trigdec(2); // processed immediately + + // Wait for this DFO to handle trigger 2. + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + BOOST_REQUIRE_GE(received_count.load(), 1u); // trigger 2 processed + + // Wait for the watchdog to fire (timeout + extra time for the watchdog + // interval and processing). + auto mdal = cfgMgr->get_dal("test_consensus"); + auto dfo_timeout_ms = std::chrono::milliseconds(mdal->get_configuration()->get_dfo_decision_timeout_ms()); + + auto watchdog_wait = dfo_timeout_ms + std::chrono::milliseconds(500); + std::this_thread::sleep_for(watchdog_wait); + + // After failover, trigger 1 should also have been dispatched. + BOOST_REQUIRE_EQUAL(received_count.load(), 2u); + + dfo->execute_command("drain_dataflow", null_data); + dfo->execute_command("scrap", null_data); + + dec_recv->remove_callback(); + inh_recv->remove_callback(); } BOOST_AUTO_TEST_SUITE_END()