diff --git a/Cargo.lock b/Cargo.lock index 5055b83..650025e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -50,6 +50,12 @@ version = "3.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "793db76d6187cd04dff33004d8e6c9cc4e05cd330500379d2394209271b4aeee" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "cast" version = "0.3.0" @@ -123,6 +129,17 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675" +[[package]] +name = "cpulist" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3f1007d3c5970349212dad70267b354992aded1147e56920241c5002ebb865d" +dependencies = [ + "itertools 0.14.0", + "new_zealand", + "thiserror", +] + [[package]] name = "criterion" version = "0.6.0" @@ -207,12 +224,45 @@ dependencies = [ "syn", ] +[[package]] +name = "derive_more" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "093242cf7570c207c83073cf82f79706fe7b8317e98620a47d5be7c3d8497678" +dependencies = [ + "derive_more-impl", +] + +[[package]] +name = "derive_more-impl" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "unicode-xid", +] + [[package]] name = "either" version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + 
+[[package]] +name = "folo_ffi" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51ee31b8da62ea0ade71954c1b9ec66e91ac75e8d5450cf5f6b5f57a8559c0e7" + [[package]] name = "generator" version = "0.8.5" @@ -258,6 +308,25 @@ dependencies = [ "loom", ] +[[package]] +name = "hash32" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47d60b12902ba28e2730cd37e95b8c9223af2808df9e902d4df49588d1470606" +dependencies = [ + "byteorder", +] + +[[package]] +name = "heapless" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfb9eb618601c89945a70e254898da93b13be0388091d42117462b265bb3fad" +dependencies = [ + "hash32", + "stable_deref_trait", +] + [[package]] name = "itertools" version = "0.10.5" @@ -276,6 +345,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.15" @@ -309,6 +387,7 @@ dependencies = [ "haphazard", "lockfree", "loom", + "many_cpus_benchmarking", "portable-atomic", "rand", ] @@ -347,6 +426,43 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "many_cpus" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "682c64e29156f32d36d61251ba80e62265a465927e06e64ba1e11be8b9a3470b" +dependencies = [ + "cpulist", + "derive_more", + "foldhash", + "folo_ffi", + "heapless", + "itertools 0.14.0", + "libc", + "negative-impl", + "new_zealand", + "nonempty", + "rand", + "smallvec", + "windows", +] + +[[package]] +name = "many_cpus_benchmarking" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397ad52b967a8908b34be27554e2c2cc8471af0c848e06878fc3fef0b00f820e" +dependencies = [ + 
"cpulist", + "criterion", + "derive_more", + "itertools 0.14.0", + "many_cpus", + "new_zealand", + "nonempty", + "rand", +] + [[package]] name = "matchers" version = "0.1.0" @@ -362,6 +478,29 @@ version = "2.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" +[[package]] +name = "negative-impl" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67670e6848468ee39b823ed93add8bd2adb66a132b0c25fce6e60af0b81d930a" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "new_zealand" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dfe8e684b400ea7fdfc0ffed692072a21b8d2813fabfa287ed68ab7817bc981" + +[[package]] +name = "nonempty" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "549e471b99ccaf2f89101bec68f4d244457d5a95a9c3d0672e9564124397741d" + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -651,6 +790,12 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "syn" version = "2.0.104" @@ -662,6 +807,26 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "thiserror" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thread_local" version = "1.1.9" @@ -736,6 +901,12 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "valuable" version = "0.1.1" diff --git a/Cargo.toml b/Cargo.toml index 597e0ad..35bd1ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,9 +17,9 @@ arbitrary = { version = "1.4.1", features = ["derive"] } unexpected_cfgs = { level = "warn", check-cfg = ['cfg(loom)', 'cfg(fuzzing)'] } [features] -default = [ "std" ] -std = [ "haphazard" ] -portable-atomic = [ "dep:portable-atomic" ] +default = ["std"] +std = ["haphazard"] +portable-atomic = ["dep:portable-atomic"] [dependencies] crossbeam-utils = { version = "0.8.21", default-features = false } @@ -31,10 +31,15 @@ arbitrary = { version = "1.4.1", features = ["derive"] } criterion = "0.6.0" crossbeam-queue = "0.3.12" lockfree = "0.5.1" +many_cpus_benchmarking = "0.1.18" rand = "0.9.1" [[bench]] name = "syncqueue" harness = false -required-features = ["std"] # this is the new property +required-features = ["std"] +[[bench]] +name = "syncqueue_many_cpus" +harness = false +required-features = ["std"] diff --git a/benches/syncqueue_many_cpus.rs b/benches/syncqueue_many_cpus.rs new file mode 100644 index 0000000..2bda8f8 --- /dev/null +++ b/benches/syncqueue_many_cpus.rs @@ -0,0 +1,555 @@ +//! Many-CPU benchmarks for queue implementations using the many_cpus_benchmarking framework. +//! +//! This benchmark suite provides a many-CPU variant of the original syncqueue.rs benchmarks, +//! 
designed to measure how queue performance is affected by memory locality and processor +//! distribution in multi-memory-region systems when queues are used for their intended purpose: +//! **producer-consumer communication**. +//! +//! ## Design Philosophy +//! +//! Queues are fundamentally about inter-thread communication between producers and consumers. +//! This benchmark implements a true producer-consumer pattern where: +//! +//! 1. **Producer worker** - Enqueues items into the shared queue +//! 2. **Consumer worker** - Dequeues items from the shared queue +//! 3. **Shared queue instance** - The communication channel between the two workers +//! +//! This follows the "different actions" pattern from the many_cpus_benchmarking framework, +//! allowing us to measure how memory locality affects inter-thread queue communication +//! performance. **Importantly, we exclude "self" distribution modes** (PinnedSelf, UnpinnedSelf, +//! UnpinnedPerMemoryRegionSelf) because these would prevent true concurrent producer-consumer +//! operation, leading to deadlock scenarios where the consumer waits indefinitely for items +//! that the producer cannot produce due to lack of concurrent execution. +//! +//! ## Memory Locality Effects +//! +//! The benchmark measures several important scenarios: +//! - **Same memory region**: Producer and consumer on processors sharing the same memory +//! - **Different memory regions**: Producer and consumer on processors in different memory regions +//! - **Payload exchange modes**: How performance changes when the queue is allocated in the +//! producer's vs consumer's memory region +//! +//! ## Queue Configuration +//! +//! Queue sizes are set to handle the communication load while matching original benchmark intentions: +//! - AllocBoundedQueue: 2048 (sufficient buffer for producer-consumer communication) +//! - ConstBoundedQueue: 256 (larger than original to handle async producer-consumer pattern) +//!
- ArrayQueue: 256 (sufficient buffer for communication) +//! - UnboundedQueue: 1024 segment size (matches original) +//! +//! ## Operation Pattern +//! +//! Uses realistic producer-consumer communication with sufficient operations to measure +//! memory locality effects clearly, with a moderate multiplier to amortize framework overhead. +//! +//! ## Key Insights This Benchmark Measures +//! +//! 1. **Queue allocation locality**: How performance changes when the shared queue is allocated +//! in the producer's vs consumer's memory region (via payload exchange modes) +//! 2. **Cross-memory-region communication**: Performance differences when producer and consumer +//! are in the same vs different memory regions +//! 3. **True queue contention**: Real producer-consumer contention patterns rather than artificial +//! single-threaded enqueue-all/dequeue-all sequences +//! 4. **Implementation comparison**: How different queue implementations handle the memory locality +//! challenges of multi-memory-region systems +//! 5. **Concurrent operation requirement**: Excludes "self" distribution modes that would prevent +//! 
simultaneous producer-consumer operation, focusing only on truly concurrent scenarios + +use criterion::{Criterion, criterion_group, criterion_main}; +use crossbeam_queue::ArrayQueue; +use lfqueue::{AllocBoundedQueue, ConstBoundedQueue, UnboundedQueue, const_queue}; +use many_cpus_benchmarking::{Payload, WorkDistribution, execute_runs}; +use std::collections::VecDeque; +use std::hint::black_box; +use std::sync::{Arc, Mutex}; + +/// Number of operations each worker performs in the benchmark +/// Producer sends this many items, consumer receives this many items +const OPERATIONS_PER_WORKER: usize = 5000; + +/// Payload for AllocBoundedQueue benchmarking - Producer-Consumer pattern +struct AllocBoundedQueuePayload { + queue: Arc>, + operations: usize, + is_producer: bool, +} + +impl Payload for AllocBoundedQueuePayload { + fn new_pair() -> (Self, Self) { + // Shared queue instance for producer-consumer communication + let shared_queue = Arc::new(AllocBoundedQueue::new(2048)); + + let producer = Self { + queue: shared_queue.clone(), + operations: OPERATIONS_PER_WORKER, + is_producer: true, + }; + + let consumer = Self { + queue: shared_queue, + operations: OPERATIONS_PER_WORKER, + is_producer: false, + }; + + (producer, consumer) + } + + fn prepare(&mut self) { + // Pre-populate the queue to establish steady state and avoid initial blocking + if self.is_producer { + for i in 0..1000 { + let _ = self.queue.enqueue(i); + } + } + } + + fn process(&mut self) { + let queue = &self.queue; + + if self.is_producer { + // Producer: continuously enqueue items + for i in 0..self.operations { + // Keep trying until successful (bounded queue may be full) + while queue.enqueue(i).is_err() { + std::hint::spin_loop(); + } + } + } else { + // Consumer: continuously dequeue items + for _ in 0..self.operations { + // Keep trying until successful (queue may be empty) + loop { + if let Some(value) = queue.dequeue() { + black_box(value); + break; + } + std::hint::spin_loop(); + } + } + } + 
} +} + +/// Payload for UnboundedQueue benchmarking - Producer-Consumer pattern +struct UnboundedQueuePayload { + queue: Arc>, + operations: usize, + is_producer: bool, +} + +impl Payload for UnboundedQueuePayload { + fn new_pair() -> (Self, Self) { + // Shared queue instance for producer-consumer communication + let shared_queue = Arc::new(UnboundedQueue::with_segment_size(1024)); + + let producer = Self { + queue: shared_queue.clone(), + operations: OPERATIONS_PER_WORKER, + is_producer: true, + }; + + let consumer = Self { + queue: shared_queue, + operations: OPERATIONS_PER_WORKER, + is_producer: false, + }; + + (producer, consumer) + } + + fn prepare(&mut self) { + // Pre-populate the queue to establish steady state + if self.is_producer { + let mut handle = self.queue.full_handle(); + for i in 0..1000 { + handle.enqueue(i); + } + } + } + + fn process(&mut self) { + let queue = &self.queue; + + if self.is_producer { + // Producer: continuously enqueue items + let mut full_handle = queue.full_handle(); + for i in 0..self.operations { + full_handle.enqueue(i); + } + } else { + // Consumer: continuously dequeue items + let mut full_handle = queue.full_handle(); + for _ in 0..self.operations { + // Keep trying until successful (queue may be empty) + loop { + if let Some(value) = full_handle.dequeue() { + black_box(value); + break; + } + std::hint::spin_loop(); + } + } + } + } +} + +/// Payload for ConstBoundedQueue benchmarking - Producer-Consumer pattern +struct ConstBoundedQueuePayload { + queue: Arc>, // Larger size for producer-consumer pattern + operations: usize, + is_producer: bool, +} + +impl Payload for ConstBoundedQueuePayload { + fn new_pair() -> (Self, Self) { + // Shared queue instance for producer-consumer communication + let shared_queue = Arc::new(const_queue!(usize; 256)); // Larger for async communication + + let producer = Self { + queue: shared_queue.clone(), + operations: OPERATIONS_PER_WORKER, + is_producer: true, + }; + + let consumer = Self { 
+ queue: shared_queue, + operations: OPERATIONS_PER_WORKER, + is_producer: false, + }; + + (producer, consumer) + } + + fn prepare(&mut self) { + // Pre-populate the queue to establish steady state + if self.is_producer { + for i in 0..100 { + let _ = self.queue.enqueue(i); + } + } + } + + fn process(&mut self) { + let queue = &self.queue; + + if self.is_producer { + // Producer: continuously enqueue items + for i in 0..self.operations { + // Keep trying until successful (bounded queue may be full) + while queue.enqueue(i).is_err() { + std::hint::spin_loop(); + } + } + } else { + // Consumer: continuously dequeue items + for _ in 0..self.operations { + // Keep trying until successful (queue may be empty) + loop { + if let Some(value) = queue.dequeue() { + black_box(value); + break; + } + std::hint::spin_loop(); + } + } + } + } +} + +/// Payload for Mutex benchmarking - Producer-Consumer pattern +struct MutexQueuePayload { + queue: Arc>>, + operations: usize, + is_producer: bool, +} + +impl Payload for MutexQueuePayload { + fn new_pair() -> (Self, Self) { + // Shared mutex-protected queue for producer-consumer communication + let shared_queue = Arc::new(Mutex::new(VecDeque::new())); + + let producer = Self { + queue: shared_queue.clone(), + operations: OPERATIONS_PER_WORKER, + is_producer: true, + }; + + let consumer = Self { + queue: shared_queue, + operations: OPERATIONS_PER_WORKER, + is_producer: false, + }; + + (producer, consumer) + } + + fn prepare(&mut self) { + // Pre-populate the queue to establish steady state + if self.is_producer { + let mut queue = self.queue.lock().unwrap(); + for i in 0..1000 { + queue.push_back(i); + } + } + } + + fn process(&mut self) { + if self.is_producer { + // Producer: continuously enqueue items + for i in 0..self.operations { + self.queue.lock().unwrap().push_back(i); + } + } else { + // Consumer: continuously dequeue items + for _ in 0..self.operations { + // Keep trying until successful (queue may be empty) + loop { + let 
mut queue = self.queue.lock().unwrap(); + if let Some(value) = queue.pop_front() { + drop(queue); // Release lock before black_box + black_box(value); + break; + } + drop(queue); // Release lock before spin + std::hint::spin_loop(); + } + } + } + } +} + +/// Payload for lockfree::queue::Queue benchmarking - Producer-Consumer pattern +struct LockfreeQueuePayload { + queue: Arc>, + operations: usize, + is_producer: bool, +} + +impl Payload for LockfreeQueuePayload { + fn new_pair() -> (Self, Self) { + // Shared lockfree queue for producer-consumer communication + let shared_queue = Arc::new(lockfree::queue::Queue::new()); + + let producer = Self { + queue: shared_queue.clone(), + operations: OPERATIONS_PER_WORKER, + is_producer: true, + }; + + let consumer = Self { + queue: shared_queue, + operations: OPERATIONS_PER_WORKER, + is_producer: false, + }; + + (producer, consumer) + } + + fn prepare(&mut self) { + // Pre-populate the queue to establish steady state + if self.is_producer { + for i in 0..1000 { + self.queue.push(i); + } + } + } + + fn process(&mut self) { + let queue = &self.queue; + + if self.is_producer { + // Producer: continuously enqueue items + for i in 0..self.operations { + queue.push(i); + } + } else { + // Consumer: continuously dequeue items + for _ in 0..self.operations { + // Keep trying until successful (queue may be empty) + loop { + if let Some(value) = queue.pop() { + black_box(value); + break; + } + std::hint::spin_loop(); + } + } + } + } +} + +/// Payload for crossbeam SegQueue benchmarking - Producer-Consumer pattern +struct CrossbeamSegQueuePayload { + queue: Arc>, + operations: usize, + is_producer: bool, +} + +impl Payload for CrossbeamSegQueuePayload { + fn new_pair() -> (Self, Self) { + // Shared crossbeam SegQueue for producer-consumer communication + let shared_queue = Arc::new(crossbeam_queue::SegQueue::new()); + + let producer = Self { + queue: shared_queue.clone(), + operations: OPERATIONS_PER_WORKER, + is_producer: true, + }; + 
+ let consumer = Self { + queue: shared_queue, + operations: OPERATIONS_PER_WORKER, + is_producer: false, + }; + + (producer, consumer) + } + + fn prepare(&mut self) { + // Pre-populate the queue to establish steady state + if self.is_producer { + for i in 0..1000 { + self.queue.push(i); + } + } + } + + fn process(&mut self) { + let queue = &self.queue; + + if self.is_producer { + // Producer: continuously enqueue items + for i in 0..self.operations { + queue.push(i); + } + } else { + // Consumer: continuously dequeue items + for _ in 0..self.operations { + // Keep trying until successful (queue may be empty) + loop { + if let Some(value) = queue.pop() { + black_box(value); + break; + } + std::hint::spin_loop(); + } + } + } + } +} + +/// Payload for crossbeam ArrayQueue benchmarking - Producer-Consumer pattern +struct CrossbeamArrayQueuePayload { + queue: Arc>, + operations: usize, + is_producer: bool, +} + +impl Payload for CrossbeamArrayQueuePayload { + fn new_pair() -> (Self, Self) { + // Shared crossbeam ArrayQueue for producer-consumer communication + let shared_queue = Arc::new(ArrayQueue::new(256)); // Larger for async communication + + let producer = Self { + queue: shared_queue.clone(), + operations: OPERATIONS_PER_WORKER, + is_producer: true, + }; + + let consumer = Self { + queue: shared_queue, + operations: OPERATIONS_PER_WORKER, + is_producer: false, + }; + + (producer, consumer) + } + + fn prepare(&mut self) { + // Pre-populate the queue to establish steady state + if self.is_producer { + for i in 0..100 { + let _ = self.queue.push(i); + } + } + } + + fn process(&mut self) { + let queue = &self.queue; + + if self.is_producer { + // Producer: continuously enqueue items + for i in 0..self.operations { + // Keep trying until successful (bounded queue may be full) + while queue.push(i).is_err() { + std::hint::spin_loop(); + } + } + } else { + // Consumer: continuously dequeue items + for _ in 0..self.operations { + // Keep trying until successful (queue 
may be empty) + loop { + if let Some(value) = queue.pop() { + black_box(value); + break; + } + std::hint::spin_loop(); + } + } + } + } +} + +// Benchmark functions implementing producer-consumer patterns + +/// Producer-Consumer benchmark for AllocBoundedQueue +fn bench_alloc_bounded_queue_many_cpus(c: &mut Criterion) { + // Different actions (producer vs consumer), but exclude "self" modes to avoid deadlock + // Self modes would prevent concurrent producer-consumer operation leading to deadlock + execute_runs::(c, WorkDistribution::all_with_unique_processors_without_self()); +} + +/// Producer-Consumer benchmark for UnboundedQueue +fn bench_unbounded_queue_many_cpus(c: &mut Criterion) { + execute_runs::(c, WorkDistribution::all_with_unique_processors_without_self()); +} + +/// Producer-Consumer benchmark for ConstBoundedQueue +fn bench_const_bounded_queue_many_cpus(c: &mut Criterion) { + execute_runs::(c, WorkDistribution::all_with_unique_processors_without_self()); +} + +/// Producer-Consumer benchmark for Mutex +fn bench_mutex_queue_many_cpus(c: &mut Criterion) { + execute_runs::(c, WorkDistribution::all_with_unique_processors_without_self()); +} + +/// Producer-Consumer benchmark for lockfree::queue::Queue +fn bench_lockfree_queue_many_cpus(c: &mut Criterion) { + execute_runs::(c, WorkDistribution::all_with_unique_processors_without_self()); +} + +/// Producer-Consumer benchmark for crossbeam SegQueue +fn bench_crossbeam_seg_queue_many_cpus(c: &mut Criterion) { + execute_runs::(c, WorkDistribution::all_with_unique_processors_without_self()); +} + +/// Producer-Consumer benchmark for crossbeam ArrayQueue +fn bench_crossbeam_array_queue_many_cpus(c: &mut Criterion) { + execute_runs::(c, WorkDistribution::all_with_unique_processors_without_self()); +} + +criterion_group!( + syncqueue_many_cpus_benchmarks, + // Producer-Consumer benchmarks for all queue implementations + bench_alloc_bounded_queue_many_cpus, + bench_const_bounded_queue_many_cpus, + 
bench_unbounded_queue_many_cpus, + bench_mutex_queue_many_cpus, + bench_lockfree_queue_many_cpus, + bench_crossbeam_array_queue_many_cpus, + bench_crossbeam_seg_queue_many_cpus, +); + +criterion_main!(syncqueue_many_cpus_benchmarks);