From 47c484b230e9a8b423022940b0d1616bbdc95d0e Mon Sep 17 00:00:00 2001 From: "Adam M. Rosenzweig" Date: Mon, 6 Apr 2026 11:30:17 -0400 Subject: [PATCH 1/2] Use dedicated exceptions when dispatcher or sequencer are disabled Signed-off-by: Adam M. Rosenzweig --- quantum/impl/quantum_dispatcher_impl.h | 13 +++--- quantum/quantum_exceptions.h | 42 +++++++++++++++++++ .../quantum_sequencer_experimental_impl.h | 6 +-- quantum/util/impl/quantum_sequencer_impl.h | 12 +++--- 4 files changed, 58 insertions(+), 15 deletions(-) create mode 100644 quantum/quantum_exceptions.h diff --git a/quantum/impl/quantum_dispatcher_impl.h b/quantum/impl/quantum_dispatcher_impl.h index a07dbec..7114088 100644 --- a/quantum/impl/quantum_dispatcher_impl.h +++ b/quantum/impl/quantum_dispatcher_impl.h @@ -19,6 +19,7 @@ //#################################### IMPLEMENTATIONS ######################################### //############################################################################################## +#include #include namespace Bloomberg { @@ -366,15 +367,15 @@ bool Dispatcher::drain(std::chrono::milliseconds timeout, { return true; //skip draining } - + auto start = std::chrono::steady_clock::now(); - + //wait until all queues have completed their work YieldingThread yield; while (!empty()) { yield(); - + //check remaining time if (timeout != std::chrono::milliseconds(-1)) { @@ -386,7 +387,7 @@ bool Dispatcher::drain(std::chrono::milliseconds timeout, } } } - + #ifdef __QUANTUM_PRINT_DEBUG std::lock_guard guard(Util::LogMutex()); std::cout << "All queues have drained." << std::endl; @@ -436,7 +437,7 @@ Dispatcher::postImpl(int queueId, using FirstArg = decltype(firstArgOf(func)); if (_drain || _terminated) { - throw std::runtime_error("Posting is disabled"); + throw DispatcherDrainingException{}; } if (queueId < (int)IQueue::QueueId::Any) { @@ -470,7 +471,7 @@ Dispatcher::postAsyncIoImpl(int queueId, using FirstArg = decltype(firstArgOf(func)); if (_drain || _terminated) { - throw std::runtime_error("Posting is disabled"); + throw DispatcherDrainingException{}; } if (queueId < (int)IQueue::QueueId::Any) { diff --git a/quantum/quantum_exceptions.h b/quantum/quantum_exceptions.h new file mode 100644 index 0000000..154a561 --- /dev/null +++ b/quantum/quantum_exceptions.h @@ -0,0 +1,42 @@ +/* +** Copyright 2026 Bloomberg Finance L.P. +** +** Licensed under the Apache License, Version 2.0 (the "License"); +** you may not use this file except in compliance with the License. +** You may obtain a copy of the License at +** +** http://www.apache.org/licenses/LICENSE-2.0 +** +** Unless required by applicable law or agreed to in writing, software +** distributed under the License is distributed on an "AS IS" BASIS, +** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +** See the License for the specific language governing permissions and +** limitations under the License. +*/ +#ifndef BLOOMBERG_QUANTUM_EXCEPTIONS_H +#define BLOOMBERG_QUANTUM_EXCEPTIONS_H + +#include + +namespace Bloomberg { +namespace quantum { + +class DispatcherDrainingException : public std::runtime_error{ +public: + /// @brief Constructor. + DispatcherDrainingException() : std::runtime_error{"Posting is disabled"}{} + + using std::runtime_error::what; +}; + +class SequencerDrainingException : public std::runtime_error{ +public: + /// @brief Constructor. + SequencerDrainingException() : std::runtime_error{"Sequencer is disabled"}{} + + using std::runtime_error::what; +}; + +}} + +#endif //BLOOMBERG_QUANTUM_EXCEPTIONS_H diff --git a/quantum/util/impl/quantum_sequencer_experimental_impl.h b/quantum/util/impl/quantum_sequencer_experimental_impl.h index d0df577..44ea328 100644 --- a/quantum/util/impl/quantum_sequencer_experimental_impl.h +++ b/quantum/util/impl/quantum_sequencer_experimental_impl.h @@ -269,7 +269,7 @@ Sequencer::enqueueSingle( { if (_drain) { - throw std::runtime_error("Sequencer is disabled"); + throw SequencerDrainingException{}; } if (queueId < (int)IQueue::QueueId::Any) @@ -333,7 +333,7 @@ Sequencer::enqueueMultiple( { if (_drain) { - throw std::runtime_error("Sequencer is disabled"); + throw SequencerDrainingException{}; } if (queueId < (int)IQueue::QueueId::Any) { @@ -401,7 +401,7 @@ Sequencer::enqueueAllImpl( { if (_drain) { - throw std::runtime_error("Sequencer is disabled"); + throw SequencerDrainingException{}; } if (queueId < (int)IQueue::QueueId::Any) { diff --git a/quantum/util/impl/quantum_sequencer_impl.h b/quantum/util/impl/quantum_sequencer_impl.h index 1e649b2..fa9939d 100644 --- a/quantum/util/impl/quantum_sequencer_impl.h +++ b/quantum/util/impl/quantum_sequencer_impl.h @@ -59,7 +59,7 @@ Sequencer::enqueue( { if (_drain) { - throw std::runtime_error("Sequencer is disabled"); + throw SequencerDrainingException{}; } _dispatcher.post(_controllerQueueId, false, @@ -86,7 +86,7 @@ Sequencer::enqueue( { if (_drain) { - throw std::runtime_error("Sequencer is disabled"); + throw SequencerDrainingException{}; } if (queueId < (int)IQueue::QueueId::Any) { @@ -114,7 +114,7 @@ Sequencer::enqueue( { if (_drain) { - throw std::runtime_error("Sequencer is disabled"); + throw SequencerDrainingException{}; } _dispatcher.post(_controllerQueueId, false, @@ -141,7 +141,7 @@ Sequencer::enqueue( { if (_drain) { - throw std::runtime_error("Sequencer is disabled"); + throw SequencerDrainingException{}; } if (queueId < (int)IQueue::QueueId::Any) { @@ -166,7 +166,7 @@ Sequencer::enqueueAll(FUNC&& func, ARGS& { if (_drain) { - throw std::runtime_error("Sequencer is disabled"); + throw SequencerDrainingException{}; } _dispatcher.post(_controllerQueueId, false, @@ -191,7 +191,7 @@ Sequencer::enqueueAll( { if (_drain) { - throw std::runtime_error("Sequencer is disabled"); + throw SequencerDrainingException{}; } if (queueId < (int)IQueue::QueueId::Any) { From ca89a80b0b76f6c71bd3cba8e75b0649db24ce88 Mon Sep 17 00:00:00 2001 From: "Adam M. Rosenzweig" Date: Mon, 6 Apr 2026 14:40:50 -0400 Subject: [PATCH 2/2] Add unit tests on throws Signed-off-by: Adam M. Rosenzweig --- .../quantum_sequencer_experimental_tests.cpp | 24 +++++++++++++++++++ tests/quantum_sequencer_tests.cpp | 24 +++++++++++++++++++ tests/quantum_tests.cpp | 17 +++++++++++++ 3 files changed, 65 insertions(+) diff --git a/tests/quantum_sequencer_experimental_tests.cpp b/tests/quantum_sequencer_experimental_tests.cpp index b6420e4..a4be0da 100644 --- a/tests/quantum_sequencer_experimental_tests.cpp +++ b/tests/quantum_sequencer_experimental_tests.cpp @@ -716,4 +716,28 @@ TEST_P(SequencerExperimentalTest, DuplicateKeys) EXPECT_EQ(0u, stats.getPendingTaskCount()); } +TEST_P(SequencerExperimentalTest, EnqueueAfterDrainThrows) +{ + using namespace std::chrono_literals; + + SequencerExperimentalTestData::TaskSequencer sequencer{ getDispatcher() }; + + // Drain the sequencer + EXPECT_TRUE(sequencer.drain(10ms, true)); + + SequencerExperimentalTestData testData; + SequencerExperimentalTestData::TaskId id = 0; + SequencerExperimentalTestData::SequenceKey sequenceKey = 0; + + // Demonstrate that enqueueing throws an exception, + // and that it is both a std::runtime_error and the dedicated exception + EXPECT_THROW(sequencer.enqueue(sequenceKey, testData.makeTask(id)), std::runtime_error); + EXPECT_THROW(sequencer.enqueue(std::vector{sequenceKey}, testData.makeTask(id)), std::runtime_error); + EXPECT_THROW(sequencer.enqueueAll(testData.makeTask(id)), std::runtime_error); + + EXPECT_THROW(sequencer.enqueue(sequenceKey, testData.makeTask(id)), SequencerDrainingException); + EXPECT_THROW(sequencer.enqueue(std::vector{sequenceKey}, testData.makeTask(id)), SequencerDrainingException); + EXPECT_THROW(sequencer.enqueueAll(testData.makeTask(id)), SequencerDrainingException); +} + #endif // BLOOMBERG_QUANTUM_SEQUENCER_SUPPORT diff --git a/tests/quantum_sequencer_tests.cpp b/tests/quantum_sequencer_tests.cpp index af6ac7f..768872d 100644 --- a/tests/quantum_sequencer_tests.cpp +++ b/tests/quantum_sequencer_tests.cpp @@ -585,3 +585,27 @@ TEST_P(SequencerTest, DuplicateKeys) EXPECT_EQ(1u, stats.getPostedTaskCount()); EXPECT_EQ(0u, stats.getPendingTaskCount()); } + +TEST_P(SequencerTest, EnqueueAfterDrainThrows) +{ + using namespace std::chrono_literals; + + SequencerTestData::TaskSequencer sequencer{ getDispatcher() }; + + // Drain the sequencer + EXPECT_TRUE(sequencer.drain(10ms, true)); + + SequencerTestData testData; + SequencerTestData::TaskId id = 0; + SequencerTestData::SequenceKey sequenceKey = 0; + + // Demonstrate that enqueueing throws an exception, + // and that it is both a std::runtime_error and the dedicated exception + EXPECT_THROW(sequencer.enqueue(sequenceKey, testData.makeTask(id)), std::runtime_error); + EXPECT_THROW(sequencer.enqueue(std::vector{sequenceKey}, testData.makeTask(id)), std::runtime_error); + EXPECT_THROW(sequencer.enqueueAll(testData.makeTask(id)), std::runtime_error); + + EXPECT_THROW(sequencer.enqueue(sequenceKey, testData.makeTask(id)), SequencerDrainingException); + EXPECT_THROW(sequencer.enqueue(std::vector{sequenceKey}, testData.makeTask(id)), SequencerDrainingException); + EXPECT_THROW(sequencer.enqueueAll(testData.makeTask(id)), SequencerDrainingException); +} diff --git a/tests/quantum_tests.cpp b/tests/quantum_tests.cpp index 54841b5..bce3daa 100644 --- a/tests/quantum_tests.cpp +++ b/tests/quantum_tests.cpp @@ -14,6 +14,7 @@ ** limitations under the License. */ #include +#include #include #include #include @@ -551,6 +552,22 @@ TEST_P(ExecutionTest, DrainAllTasks) EXPECT_EQ((size_t)0, dispatcher.size()); } +TEST_P(ExecutionTest, PostAfterDrainThrows) +{ + Dispatcher& dispatcher = getDispatcher(); + dispatcher.drain(std::chrono::milliseconds(-1), true); + + // Demonstrate that posting throws an exception, + // and that it is both a std::runtime_error and the dedicated exception + EXPECT_THROW(dispatcher.post(DummyCoro), std::runtime_error); + EXPECT_THROW(dispatcher.post2(DummyCoro), std::runtime_error); + EXPECT_THROW(dispatcher.postAsyncIo(DummyIoTask), std::runtime_error); + + EXPECT_THROW(dispatcher.post(DummyCoro), DispatcherDrainingException); + EXPECT_THROW(dispatcher.post2(DummyCoro), DispatcherDrainingException); + EXPECT_THROW(dispatcher.postAsyncIo(DummyIoTask), DispatcherDrainingException); +} + TEST_P(ExecutionTest, YieldingBetweenTwoCoroutines) { //Basic test which verifies cooperation between two coroutines.