diff --git a/include/boost/lockfree/detail/freelist.hpp b/include/boost/lockfree/detail/freelist.hpp index 23776b7..9024c6c 100644 --- a/include/boost/lockfree/detail/freelist.hpp +++ b/include/boost/lockfree/detail/freelist.hpp @@ -236,7 +236,7 @@ class alignas( cacheline_bytes ) freelist_stack : Alloc freelist_node* new_pool_ptr = reinterpret_cast< freelist_node* >( node ); for ( ;; ) { - tagged_node_ptr new_pool( new_pool_ptr, old_pool.get_tag() ); + tagged_node_ptr new_pool( new_pool_ptr, old_pool.get_next_tag() ); new_pool->next.set_ptr( old_pool.get_ptr() ); if ( pool_.compare_exchange_weak( old_pool, new_pool ) ) @@ -250,7 +250,7 @@ class alignas( cacheline_bytes ) freelist_stack : Alloc tagged_node_ptr old_pool = pool_.load( memory_order_relaxed ); freelist_node* new_pool_ptr = reinterpret_cast< freelist_node* >( node ); - tagged_node_ptr new_pool( new_pool_ptr, old_pool.get_tag() ); + tagged_node_ptr new_pool( new_pool_ptr, old_pool.get_next_tag() ); new_pool->next.set_ptr( old_pool.get_ptr() ); pool_.store( new_pool, memory_order_relaxed ); @@ -589,7 +589,7 @@ class fixed_size_freelist : NodeStorage tagged_index old_pool = pool_.load( memory_order_acquire ); for ( ;; ) { - tagged_index new_pool( index, old_pool.get_tag() ); + tagged_index new_pool( index, old_pool.get_next_tag() ); new_pool_node->next.set_index( old_pool.get_index() ); if ( pool_.compare_exchange_weak( old_pool, new_pool ) ) @@ -602,7 +602,7 @@ class fixed_size_freelist : NodeStorage freelist_node* new_pool_node = reinterpret_cast< freelist_node* >( NodeStorage::nodes() + index ); tagged_index old_pool = pool_.load( memory_order_acquire ); - tagged_index new_pool( index, old_pool.get_tag() ); + tagged_index new_pool( index, old_pool.get_next_tag() ); new_pool_node->next.set_index( old_pool.get_index() ); pool_.store( new_pool ); diff --git a/include/boost/lockfree/stack.hpp b/include/boost/lockfree/stack.hpp index d5696d5..b60b121 100644 --- a/include/boost/lockfree/stack.hpp +++ b/include/boost/lockfree/stack.hpp @@ -301,7 +301,7 @@ class stack { tagged_node_handle old_tos = tos.load( detail::memory_order_relaxed ); for ( ;; ) { - tagged_node_handle new_tos( pool.get_handle( new_top_node ), old_tos.get_tag() ); + tagged_node_handle new_tos( pool.get_handle( new_top_node ), old_tos.get_next_tag() ); end_node->next = pool.get_handle( old_tos ); if ( tos.compare_exchange_weak( old_tos, new_tos ) ) @@ -313,7 +313,7 @@ class stack { tagged_node_handle old_tos = tos.load( detail::memory_order_relaxed ); - tagged_node_handle new_tos( pool.get_handle( new_top_node ), old_tos.get_tag() ); + tagged_node_handle new_tos( pool.get_handle( new_top_node ), old_tos.get_next_tag() ); end_node->next = pool.get_handle( old_tos ); tos.store( new_tos, memory_order_relaxed ); diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index c7dd5be..728fa2f 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -23,6 +23,7 @@ set(Tests destructor_test freelist_test queue_bounded_stress_test + queue_comprehensive_stress_test queue_fixedsize_stress_test queue_interprocess_test queue_test @@ -30,6 +31,7 @@ set(Tests spsc_queue_stress_test spsc_queue_test stack_bounded_stress_test + stack_comprehensive_stress_test stack_fixedsize_stress_test stack_interprocess_test stack_test diff --git a/test/queue_comprehensive_stress_test.cpp b/test/queue_comprehensive_stress_test.cpp new file mode 100644 index 0000000..3338b6d --- /dev/null +++ b/test/queue_comprehensive_stress_test.cpp @@ -0,0 +1,37 @@ +// Copyright (C) 2026 Tim Blechmann +// +// Distributed under the Boost Software License, Version 1.0. (See +// accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include + +#define BOOST_TEST_MAIN +#ifdef BOOST_LOCKFREE_INCLUDE_TESTS +# include +#else +# include +#endif + +#include "test_common.hpp" + +namespace { + +using comprehensive_stress_tester_1c = comprehensive_stress_tester< 10, 1 >; +using comprehensive_stress_tester_4c = comprehensive_stress_tester< 10, 4 >; + +} // namespace + +BOOST_AUTO_TEST_CASE( queue_comprehensive_stress_unbounded_1_consumer ) +{ + comprehensive_stress_tester_1c tester; + boost::lockfree::queue< int > q( 128 ); + tester.run( q ); +} + +BOOST_AUTO_TEST_CASE( queue_comprehensive_stress_unbounded_4_consumers ) +{ + comprehensive_stress_tester_4c tester; + boost::lockfree::queue< int > q( 128 ); + tester.run( q ); +} diff --git a/test/stack_comprehensive_stress_test.cpp b/test/stack_comprehensive_stress_test.cpp new file mode 100644 index 0000000..f76d1f9 --- /dev/null +++ b/test/stack_comprehensive_stress_test.cpp @@ -0,0 +1,37 @@ +// Copyright (C) 2026 Tim Blechmann +// +// Distributed under the Boost Software License, Version 1.0. (See +// accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include + +#define BOOST_TEST_MAIN +#ifdef BOOST_LOCKFREE_INCLUDE_TESTS +# include +#else +# include +#endif + +#include "test_common.hpp" + +namespace { + +using comprehensive_stack_stress_tester_1c = comprehensive_stack_stress_tester< 10, 1 >; +using comprehensive_stack_stress_tester_4c = comprehensive_stack_stress_tester< 10, 4 >; + +} // namespace + +BOOST_AUTO_TEST_CASE( stack_comprehensive_stress_unbounded_1_consumer ) +{ + comprehensive_stack_stress_tester_1c tester; + boost::lockfree::stack< int > s( 128 ); + tester.run( s ); +} + +BOOST_AUTO_TEST_CASE( stack_comprehensive_stress_unbounded_4_consumers ) +{ + comprehensive_stack_stress_tester_4c tester; + boost::lockfree::stack< int > s( 128 ); + tester.run( s ); +} diff --git a/test/test_common.hpp b/test/test_common.hpp index c449eaa..48061e5 100644 --- a/test/test_common.hpp +++ b/test/test_common.hpp @@ -337,3 +337,290 @@ struct stack_consume_all_atomic_stress_tester } // namespace impl using impl::stack_consume_all_atomic_stress_tester; + +namespace impl { + +// Comprehensive stress tester with per-producer value ranges and per-consumer monotonicity tracking +// - 10 producer threads, each producing 900,000 sequential values +// - Producer i produces: 1000000*i, 1000000*i+1, ..., 1000000*i+899999 +// - Multiple consumer threads, each tracking last-received value per producer +// - Validates strict monotonicity per producer and per-consumer consistency +template < int NumProducers = 10, int NumConsumers = 1 > +struct comprehensive_stress_tester +{ + enum + { + values_per_producer = 900000, + base_multiplier = 1000000, + }; + + std::atomic< int > producers_finished { 0 }; + std::atomic< int > validation_errors { 0 }; + std::atomic< long > total_values_consumed { 0 }; + + // Per-consumer validation state + struct consumer_state + { + std::array< int, NumProducers > last_received {}; + }; + std::array< consumer_state, NumConsumers > consumers; + + template < typename DataStructure > + bool validate_value( int consumer_id, int value ) + { + if ( consumer_id < 0 || consumer_id >= NumConsumers ) + return false; + + int producer_id = value / base_multiplier; + int offset = value % base_multiplier; + + // Check range + if ( producer_id < 0 || producer_id >= NumProducers ) + return false; + if ( offset < 0 || offset >= values_per_producer ) + return false; + + // Check strict monotonicity per producer for this consumer + int& last = consumers[ consumer_id ].last_received[ producer_id ]; + if ( last != 0 && value <= last ) + return false; + + last = value; + return true; + } + + template < typename DataStructure > + void produce_items( DataStructure& ds, int producer_id ) + { + int base = base_multiplier * producer_id; + for ( long i = 0; i < values_per_producer; ++i ) { + int value = base + i; + while ( !ds.push( value ) ) { +#ifdef __VXWORKS__ + std::this_thread::yield(); +#endif + } + } + producers_finished.fetch_add( 1 ); + } + + template < typename DataStructure > + void consume_items( DataStructure& ds, int consumer_id ) + { + while ( true ) { + int value; + if ( !ds.pop( value ) ) { + if ( producers_finished.load() == NumProducers ) + break; +#ifdef __VXWORKS__ + std::this_thread::yield(); +#endif + continue; + } + + if ( !validate_value< DataStructure >( consumer_id, value ) ) + validation_errors.fetch_add( 1 ); + total_values_consumed.fetch_add( 1 ); + } + + // Drain remaining items + int value; + while ( ds.pop( value ) ) { + if ( !validate_value< DataStructure >( consumer_id, value ) ) + validation_errors.fetch_add( 1 ); + total_values_consumed.fetch_add( 1 ); + } + } + + template < typename DataStructure > + void run( DataStructure& ds ) + { + BOOST_WARN( ds.is_lock_free() ); + producers_finished.store( 0 ); + validation_errors.store( 0 ); + total_values_consumed.store( 0 ); + + BOOST_TEST_REQUIRE( ds.empty() ); + + boost::thread_group producers; + boost::thread_group consumers_group; + + // Spawn producer threads + for ( int i = 0; i < NumProducers; ++i ) + producers.create_thread( [ this, &ds, i ] { + produce_items( ds, i ); + } ); + + // Spawn consumer threads + for ( int i = 0; i < NumConsumers; ++i ) + consumers_group.create_thread( [ this, &ds, i ] { + consume_items( ds, i ); + } ); + + std::cout << "comprehensive stress test: " << NumProducers << " producers, " << NumConsumers + << " consumers created" << std::endl; + + producers.join_all(); + std::cout << "producers finished" << std::endl; + + consumers_group.join_all(); + std::cout << "consumers finished" << std::endl; + + BOOST_TEST_REQUIRE( ds.empty() ); + BOOST_TEST_REQUIRE( validation_errors.load() == 0 ); + BOOST_TEST_REQUIRE( total_values_consumed.load() == NumProducers * values_per_producer ); + } +}; + +} // namespace impl + +using impl::comprehensive_stress_tester; + +namespace impl { + +// Comprehensive stress tester for stacks - counts elements per producer range +// (stacks are LIFO; with concurrent multi-producer/multi-consumer, we can't enforce +// ordering, but we can validate that each element consumed belongs to a valid producer +// range and that the total count per producer matches across all consumers) +template < int NumProducers = 10, int NumConsumers = 1 > +struct comprehensive_stack_stress_tester +{ + enum + { + values_per_producer = 900000, + base_multiplier = 1000000, + }; + + std::atomic< int > producers_finished { 0 }; + std::atomic< int > validation_errors { 0 }; + std::atomic< long > total_values_consumed { 0 }; + + // Per-consumer count of elements from each producer range + struct consumer_state + { + std::array< long, NumProducers > producer_counts {}; + }; + std::array< consumer_state, NumConsumers > consumers; + + template < typename DataStructure > + bool validate_value( int consumer_id, int value ) + { + if ( consumer_id < 0 || consumer_id >= NumConsumers ) + return false; + + int producer_id = value / base_multiplier; + int offset = value % base_multiplier; + + // Check range + if ( producer_id < 0 || producer_id >= NumProducers ) + return false; + if ( offset < 0 || offset >= values_per_producer ) + return false; + + // Increment count for this producer + consumers[ consumer_id ].producer_counts[ producer_id ]++; + return true; + } + + template < typename DataStructure > + void produce_items( DataStructure& ds, int producer_id ) + { + int base = base_multiplier * producer_id; + for ( long i = 0; i < values_per_producer; ++i ) { + int value = base + i; + while ( !ds.push( value ) ) { +#ifdef __VXWORKS__ + std::this_thread::yield(); +#endif + } + } + producers_finished.fetch_add( 1 ); + } + + template < typename DataStructure > + void consume_items( DataStructure& ds, int consumer_id ) + { + while ( true ) { + int value; + if ( !ds.pop( value ) ) { + if ( producers_finished.load() == NumProducers ) + break; +#ifdef __VXWORKS__ + std::this_thread::yield(); +#endif + continue; + } + + if ( !validate_value< DataStructure >( consumer_id, value ) ) + validation_errors.fetch_add( 1 ); + total_values_consumed.fetch_add( 1 ); + } + + // Drain remaining items + int value; + while ( ds.pop( value ) ) { + if ( !validate_value< DataStructure >( consumer_id, value ) ) + validation_errors.fetch_add( 1 ); + total_values_consumed.fetch_add( 1 ); + } + } + + template < typename DataStructure > + void run( DataStructure& ds ) + { + BOOST_WARN( ds.is_lock_free() ); + producers_finished.store( 0 ); + validation_errors.store( 0 ); + total_values_consumed.store( 0 ); + + // Reset consumer counts + for ( int i = 0; i < NumConsumers; ++i ) { + for ( int j = 0; j < NumProducers; ++j ) { + consumers[ i ].producer_counts[ j ] = 0; + } + } + + BOOST_TEST_REQUIRE( ds.empty() ); + + boost::thread_group producers; + boost::thread_group consumers_group; + + // Spawn producer threads + for ( int i = 0; i < NumProducers; ++i ) + producers.create_thread( [ this, &ds, i ] { + produce_items( ds, i ); + } ); + + // Spawn consumer threads + for ( int i = 0; i < NumConsumers; ++i ) + consumers_group.create_thread( [ this, &ds, i ] { + consume_items( ds, i ); + } ); + + std::cout << "comprehensive stack stress test: " << NumProducers << " producers, " << NumConsumers + << " consumers created" << std::endl; + + producers.join_all(); + std::cout << "producers finished" << std::endl; + + consumers_group.join_all(); + std::cout << "consumers finished" << std::endl; + + BOOST_TEST_REQUIRE( ds.empty() ); + BOOST_TEST_REQUIRE( validation_errors.load() == 0 ); + BOOST_TEST_REQUIRE( total_values_consumed.load() == NumProducers * values_per_producer ); + + // Validate that sum of counts for each producer across all consumers equals values_per_producer + for ( int producer_id = 0; producer_id < NumProducers; ++producer_id ) { + long total_for_producer = 0; + for ( int consumer_id = 0; consumer_id < NumConsumers; ++consumer_id ) { + total_for_producer += consumers[ consumer_id ].producer_counts[ producer_id ]; + } + BOOST_TEST_REQUIRE( total_for_producer == values_per_producer ); + } + } +}; + +} // namespace impl + +using impl::comprehensive_stack_stress_tester;