Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions quantum/impl/quantum_dispatcher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//#################################### IMPLEMENTATIONS #########################################
//##############################################################################################

#include <quantum/quantum_exceptions.h>
#include <quantum/util/quantum_util.h>

namespace Bloomberg {
Expand Down Expand Up @@ -366,15 +367,15 @@ bool Dispatcher::drain(std::chrono::milliseconds timeout,
{
return true; //skip draining
}

auto start = std::chrono::steady_clock::now();

//wait until all queues have completed their work
YieldingThread yield;
while (!empty())
{
yield();

//check remaining time
if (timeout != std::chrono::milliseconds(-1))
{
Expand All @@ -386,7 +387,7 @@ bool Dispatcher::drain(std::chrono::milliseconds timeout,
}
}
}

#ifdef __QUANTUM_PRINT_DEBUG
std::lock_guard<std::mutex> guard(Util::LogMutex());
std::cout << "All queues have drained." << std::endl;
Expand Down Expand Up @@ -436,7 +437,7 @@ Dispatcher::postImpl(int queueId,
using FirstArg = decltype(firstArgOf(func));
if (_drain || _terminated)
{
throw std::runtime_error("Posting is disabled");
throw DispatcherDrainingException{};
}
if (queueId < (int)IQueue::QueueId::Any)
{
Expand Down Expand Up @@ -470,7 +471,7 @@ Dispatcher::postAsyncIoImpl(int queueId,
using FirstArg = decltype(firstArgOf(func));
if (_drain || _terminated)
{
throw std::runtime_error("Posting is disabled");
throw DispatcherDrainingException{};
}
if (queueId < (int)IQueue::QueueId::Any)
{
Expand Down
42 changes: 42 additions & 0 deletions quantum/quantum_exceptions.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
** Copyright 2026 Bloomberg Finance L.P.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
*/
#ifndef BLOOMBERG_QUANTUM_EXCEPTIONS_H
#define BLOOMBERG_QUANTUM_EXCEPTIONS_H

#include <stdexcept>

namespace Bloomberg {
namespace quantum {

class DispatcherDrainingException : public std::runtime_error{
public:
/// @brief Constructor.
DispatcherDrainingException() : std::runtime_error{"Posting is disabled"}{}

using std::runtime_error::what;
};

class SequencerDrainingException : public std::runtime_error{
public:
/// @brief Constructor.
SequencerDrainingException() : std::runtime_error{"Sequencer is disabled"}{}

using std::runtime_error::what;
};

}}

#endif //BLOOMBERG_QUANTUM_EXCEPTIONS_H
6 changes: 3 additions & 3 deletions quantum/util/impl/quantum_sequencer_experimental_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::enqueueSingle(
{
if (_drain)
{
throw std::runtime_error("Sequencer is disabled");
throw SequencerDrainingException{};
}

if (queueId < (int)IQueue::QueueId::Any)
Expand Down Expand Up @@ -333,7 +333,7 @@ Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::enqueueMultiple(
{
if (_drain)
{
throw std::runtime_error("Sequencer is disabled");
throw SequencerDrainingException{};
}
if (queueId < (int)IQueue::QueueId::Any)
{
Expand Down Expand Up @@ -401,7 +401,7 @@ Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::enqueueAllImpl(
{
if (_drain)
{
throw std::runtime_error("Sequencer is disabled");
throw SequencerDrainingException{};
}
if (queueId < (int)IQueue::QueueId::Any)
{
Expand Down
12 changes: 6 additions & 6 deletions quantum/util/impl/quantum_sequencer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::enqueue(
{
if (_drain)
{
throw std::runtime_error("Sequencer is disabled");
throw SequencerDrainingException{};
}
_dispatcher.post(_controllerQueueId,
false,
Expand All @@ -86,7 +86,7 @@ Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::enqueue(
{
if (_drain)
{
throw std::runtime_error("Sequencer is disabled");
throw SequencerDrainingException{};
}
if (queueId < (int)IQueue::QueueId::Any)
{
Expand Down Expand Up @@ -114,7 +114,7 @@ Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::enqueue(
{
if (_drain)
{
throw std::runtime_error("Sequencer is disabled");
throw SequencerDrainingException{};
}
_dispatcher.post(_controllerQueueId,
false,
Expand All @@ -141,7 +141,7 @@ Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::enqueue(
{
if (_drain)
{
throw std::runtime_error("Sequencer is disabled");
throw SequencerDrainingException{};
}
if (queueId < (int)IQueue::QueueId::Any)
{
Expand All @@ -166,7 +166,7 @@ Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::enqueueAll(FUNC&& func, ARGS&
{
if (_drain)
{
throw std::runtime_error("Sequencer is disabled");
throw SequencerDrainingException{};
}
_dispatcher.post(_controllerQueueId,
false,
Expand All @@ -191,7 +191,7 @@ Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::enqueueAll(
{
if (_drain)
{
throw std::runtime_error("Sequencer is disabled");
throw SequencerDrainingException{};
}
if (queueId < (int)IQueue::QueueId::Any)
{
Expand Down
24 changes: 24 additions & 0 deletions tests/quantum_sequencer_experimental_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -716,4 +716,28 @@ TEST_P(SequencerExperimentalTest, DuplicateKeys)
EXPECT_EQ(0u, stats.getPendingTaskCount());
}

TEST_P(SequencerExperimentalTest, EnqueueAfterDrainThrows)
{
using namespace std::chrono_literals;

SequencerExperimentalTestData::TaskSequencer sequencer{ getDispatcher() };

// Drain the sequencer
EXPECT_TRUE(sequencer.drain(10ms, true));

SequencerExperimentalTestData testData;
SequencerExperimentalTestData::TaskId id = 0;
SequencerExperimentalTestData::SequenceKey sequenceKey = 0;

// Demonstrate that enqueueing throws an exception,
// and that it is both a std::runtime_error and the dedicated exception
EXPECT_THROW(sequencer.enqueue(sequenceKey, testData.makeTask(id)), std::runtime_error);
EXPECT_THROW(sequencer.enqueue(std::vector<int>{sequenceKey}, testData.makeTask(id)), std::runtime_error);
EXPECT_THROW(sequencer.enqueueAll(testData.makeTask(id)), std::runtime_error);

EXPECT_THROW(sequencer.enqueue(sequenceKey, testData.makeTask(id)), SequencerDrainingException);
EXPECT_THROW(sequencer.enqueue(std::vector<int>{sequenceKey}, testData.makeTask(id)), SequencerDrainingException);
EXPECT_THROW(sequencer.enqueueAll(testData.makeTask(id)), SequencerDrainingException);
}

#endif // BLOOMBERG_QUANTUM_SEQUENCER_SUPPORT
24 changes: 24 additions & 0 deletions tests/quantum_sequencer_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -585,3 +585,27 @@ TEST_P(SequencerTest, DuplicateKeys)
EXPECT_EQ(1u, stats.getPostedTaskCount());
EXPECT_EQ(0u, stats.getPendingTaskCount());
}

TEST_P(SequencerTest, EnqueueAfterDrainThrows)
{
using namespace std::chrono_literals;

SequencerTestData::TaskSequencer sequencer{ getDispatcher() };

// Drain the sequencer
EXPECT_TRUE(sequencer.drain(10ms, true));

SequencerTestData testData;
SequencerTestData::TaskId id = 0;
SequencerTestData::SequenceKey sequenceKey = 0;

// Demonstrate that enqueueing throws an exception,
// and that it is both a std::runtime_error and the dedicated exception
EXPECT_THROW(sequencer.enqueue(sequenceKey, testData.makeTask(id)), std::runtime_error);
EXPECT_THROW(sequencer.enqueue(std::vector<int>{sequenceKey}, testData.makeTask(id)), std::runtime_error);
EXPECT_THROW(sequencer.enqueueAll(testData.makeTask(id)), std::runtime_error);

EXPECT_THROW(sequencer.enqueue(sequenceKey, testData.makeTask(id)), SequencerDrainingException);
EXPECT_THROW(sequencer.enqueue(std::vector<int>{sequenceKey}, testData.makeTask(id)), SequencerDrainingException);
EXPECT_THROW(sequencer.enqueueAll(testData.makeTask(id)), SequencerDrainingException);
}
17 changes: 17 additions & 0 deletions tests/quantum_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
** limitations under the License.
*/
#include <gtest/gtest.h>
#include <quantum/quantum_exceptions.h>
#include <quantum_fixture.h>
#include <stdexcept>
#include <vector>
Expand Down Expand Up @@ -551,6 +552,22 @@ TEST_P(ExecutionTest, DrainAllTasks)
EXPECT_EQ((size_t)0, dispatcher.size());
}

TEST_P(ExecutionTest, PostAfterDrainThrows)
{
Dispatcher& dispatcher = getDispatcher();
dispatcher.drain(std::chrono::milliseconds(-1), true);

// Demonstrate that posting throws an exception,
// and that it is both a std::runtime_error and the dedicated exception
EXPECT_THROW(dispatcher.post(DummyCoro), std::runtime_error);
EXPECT_THROW(dispatcher.post2(DummyCoro), std::runtime_error);
EXPECT_THROW(dispatcher.postAsyncIo(DummyIoTask), std::runtime_error);

EXPECT_THROW(dispatcher.post(DummyCoro), DispatcherDrainingException);
EXPECT_THROW(dispatcher.post2(DummyCoro), DispatcherDrainingException);
EXPECT_THROW(dispatcher.postAsyncIo(DummyIoTask), DispatcherDrainingException);
}

TEST_P(ExecutionTest, YieldingBetweenTwoCoroutines)
{
//Basic test which verifies cooperation between two coroutines.
Expand Down
Loading