From a5c92368655c79aa1a47062ee8ee31d87f7ecb46 Mon Sep 17 00:00:00 2001 From: Sander Saares Date: Sun, 29 Jun 2025 08:15:30 +0300 Subject: [PATCH 1/7] Initial many_cpus benchmark --- Cargo.lock | 171 ++++++++++++ Cargo.toml | 13 +- benches/syncqueue_many_cpus.rs | 495 +++++++++++++++++++++++++++++++++ 3 files changed, 675 insertions(+), 4 deletions(-) create mode 100644 benches/syncqueue_many_cpus.rs diff --git a/Cargo.lock b/Cargo.lock index 5055b83..1e46d7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -50,6 +50,12 @@ version = "3.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "793db76d6187cd04dff33004d8e6c9cc4e05cd330500379d2394209271b4aeee" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "cast" version = "0.3.0" @@ -123,6 +129,17 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675" +[[package]] +name = "cpulist" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3f1007d3c5970349212dad70267b354992aded1147e56920241c5002ebb865d" +dependencies = [ + "itertools 0.14.0", + "new_zealand", + "thiserror", +] + [[package]] name = "criterion" version = "0.6.0" @@ -207,12 +224,45 @@ dependencies = [ "syn", ] +[[package]] +name = "derive_more" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "093242cf7570c207c83073cf82f79706fe7b8317e98620a47d5be7c3d8497678" +dependencies = [ + "derive_more-impl", +] + +[[package]] +name = "derive_more-impl" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "unicode-xid", +] + [[package]] name = "either" version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + +[[package]] +name = "folo_ffi" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51ee31b8da62ea0ade71954c1b9ec66e91ac75e8d5450cf5f6b5f57a8559c0e7" + [[package]] name = "generator" version = "0.8.5" @@ -258,6 +308,25 @@ dependencies = [ "loom", ] +[[package]] +name = "hash32" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47d60b12902ba28e2730cd37e95b8c9223af2808df9e902d4df49588d1470606" +dependencies = [ + "byteorder", +] + +[[package]] +name = "heapless" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfb9eb618601c89945a70e254898da93b13be0388091d42117462b265bb3fad" +dependencies = [ + "hash32", + "stable_deref_trait", +] + [[package]] name = "itertools" version = "0.10.5" @@ -276,6 +345,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.15" @@ -309,6 +387,7 @@ dependencies = [ "haphazard", "lockfree", "loom", + "many_cpus_benchmarking", "portable-atomic", "rand", ] @@ -347,6 +426,43 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "many_cpus" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9449764c390da2db0a251a82c4834637dd51127c108e0016bcf4fc1baa2183ce" +dependencies = [ + "cpulist", + "derive_more", + "foldhash", + "folo_ffi", + "heapless", + "itertools 0.14.0", + "libc", + "negative-impl", + "new_zealand", + "nonempty", + "rand", + "smallvec", + "windows", +] + +[[package]] +name = "many_cpus_benchmarking" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fe6628db6a085039ce36b80decd2c2b270ebe36d9b64f00291afe5c0f5df407" +dependencies = [ + "cpulist", + "criterion", + "derive_more", + "itertools 0.14.0", + "many_cpus", + "new_zealand", + "nonempty", + "rand", +] + [[package]] name = "matchers" version = "0.1.0" @@ -362,6 +478,29 @@ version = "2.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" +[[package]] +name = "negative-impl" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67670e6848468ee39b823ed93add8bd2adb66a132b0c25fce6e60af0b81d930a" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "new_zealand" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dfe8e684b400ea7fdfc0ffed692072a21b8d2813fabfa287ed68ab7817bc981" + +[[package]] +name = "nonempty" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "549e471b99ccaf2f89101bec68f4d244457d5a95a9c3d0672e9564124397741d" + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -651,6 +790,12 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "syn" version = "2.0.104" @@ -662,6 +807,26 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "thiserror" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thread_local" version = "1.1.9" @@ -736,6 +901,12 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "valuable" version = "0.1.1" diff --git a/Cargo.toml b/Cargo.toml index 597e0ad..492eab5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,9 +17,9 @@ arbitrary = { version = "1.4.1", features = ["derive"] } unexpected_cfgs = { level = "warn", check-cfg = ['cfg(loom)', 'cfg(fuzzing)'] } [features] -default = [ "std" ] -std = [ "haphazard" ] -portable-atomic = [ "dep:portable-atomic" ] +default = ["std"] +std = ["haphazard"] +portable-atomic = ["dep:portable-atomic"] [dependencies] crossbeam-utils = { version = "0.8.21", default-features = false } @@ -31,10 +31,15 @@ arbitrary = { version = "1.4.1", features = ["derive"] } criterion = "0.6.0" crossbeam-queue = "0.3.12" lockfree = "0.5.1" +many_cpus_benchmarking = "0.1" rand = "0.9.1" [[bench]] name = "syncqueue" harness = false -required-features = ["std"] # this is the new property +required-features = ["std"] +[[bench]] +name = "syncqueue_many_cpus" +harness = false +required-features = ["std"] diff --git a/benches/syncqueue_many_cpus.rs b/benches/syncqueue_many_cpus.rs new file mode 100644 index 0000000..59e7613 --- /dev/null +++ b/benches/syncqueue_many_cpus.rs @@ -0,0 +1,495 @@ +use criterion::{Criterion, criterion_group, criterion_main}; +use crossbeam_queue::ArrayQueue; +use lfqueue::{AllocBoundedQueue, ConstBoundedQueue, UnboundedQueue, const_queue}; +use many_cpus_benchmarking::{execute_runs, Payload, WorkDistribution}; +use std::collections::VecDeque; +use std::hint::black_box; +use std::sync::{Arc, Mutex}; + +// Configuration patterns matching the original PARAM_CONFIGS +// We scale down the operations significantly for the many_cpus_benchmarking framework +const SMALL_CONFIG: (usize, usize) = (1, 10); // Original: (1, 100) +const MEDIUM_CONFIG: (usize, usize) = (10, 10); // Original: (10, 100) +const LARGE_CONFIG: (usize, usize) = (100, 10); // Original: (100, 100) +const XLARGE_CONFIG: (usize, usize) = (100, 100); // Original: (100, 10000) + +/// Generic payload for benchmarking different queue types +/// Each worker creates its own queue and performs enqueue-dequeue cycles +/// This pattern matches the original syncqueue.rs behavior where each thread +/// does enqueue operations followed by dequeue operations +#[derive(Debug)] +struct QueuePayload { + queue: Option>, + operations: usize, + queue_factory: fn() -> Q, +} + +impl QueuePayload { + fn new_with_factory(operations: usize, factory: fn() -> Q) -> (Self, Self) { + ( + Self { + queue: None, + operations, + queue_factory: factory, + }, + Self { + queue: None, + operations, + queue_factory: factory, + }, + ) + } +} + +/// Payload for AllocBoundedQueue benchmarking +struct AllocBoundedQueuePayload { + queue: Option>>, + operations: usize, + queue_size: usize, +} + +impl Payload for AllocBoundedQueuePayload { + fn new_pair() -> (Self, Self) { + ( + Self { + queue: None, + operations: 10, // Default small operations + queue_size: 1024, + }, + Self { + queue: None, + operations: 10, + queue_size: 1024, + }, + ) + } + + fn prepare(&mut self) { + // Each worker creates its own queue in its memory region + self.queue = Some(Arc::new(AllocBoundedQueue::new(self.queue_size))); + } + + fn process(&mut self) { + let queue = self.queue.as_ref().expect("Queue should be initialized"); + + // Pattern from original: enqueue all items, then dequeue all items + // Enqueue phase + for i in 0..self.operations { + let mut attempts = 0; + while queue.enqueue(i).is_err() && attempts < 50 { + std::thread::yield_now(); + attempts += 1; + } + } + + // Dequeue phase + for _ in 0..self.operations { + let mut attempts = 0; + while queue.dequeue().is_none() && attempts < 50 { + std::thread::yield_now(); + attempts += 1; + } + } + } +} + +/// Payload for UnboundedQueue benchmarking (matches bench_lscq_queue) +struct UnboundedQueuePayload { + queue: Option>>, + operations: usize, + segment_size: usize, +} + +impl Payload for UnboundedQueuePayload { + fn new_pair() -> (Self, Self) { + ( + Self { + queue: None, + operations: 10, + segment_size: 1024, + }, + Self { + queue: None, + operations: 10, + segment_size: 1024, + }, + ) + } + + fn prepare(&mut self) { + self.queue = Some(Arc::new(UnboundedQueue::with_segment_size(self.segment_size))); + } + + fn process(&mut self) { + let queue = self.queue.as_ref().expect("Queue should be initialized"); + let mut full_handle = queue.full_handle(); + + // Pattern from original run_benchmark_lscq: enqueue all, then dequeue all + for i in 0..self.operations { + full_handle.enqueue(i); + } + + for _ in 0..self.operations { + black_box(full_handle.dequeue()); + } + } +} + +/// Payload for ConstBoundedQueue benchmarking +struct ConstBoundedQueuePayload { + queue: Option>>, // Using 64 to match const_queue!(usize; 32) -> 64 size + operations: usize, +} + +impl Payload for ConstBoundedQueuePayload { + fn new_pair() -> (Self, Self) { + ( + Self { + queue: None, + operations: 5, // Smaller for const queue + }, + Self { + queue: None, + operations: 5, + }, + ) + } + + fn prepare(&mut self) { + self.queue = Some(Arc::new(const_queue!(usize; 32))); + } + + fn process(&mut self) { + let queue = self.queue.as_ref().expect("Queue should be initialized"); + + // Enqueue phase + for i in 0..self.operations { + let mut attempts = 0; + while queue.enqueue(i).is_err() && attempts < 20 { + std::thread::yield_now(); + attempts += 1; + } + } + + // Dequeue phase + for _ in 0..self.operations { + let mut attempts = 0; + while queue.dequeue().is_none() && attempts < 20 { + std::thread::yield_now(); + attempts += 1; + } + } + } +} + +/// Payload for Mutex benchmarking +struct MutexQueuePayload { + queue: Option>>>, + operations: usize, +} + +impl Payload for MutexQueuePayload { + fn new_pair() -> (Self, Self) { + ( + Self { + queue: None, + operations: 10, + }, + Self { + queue: None, + operations: 10, + }, + ) + } + + fn prepare(&mut self) { + self.queue = Some(Arc::new(Mutex::new(VecDeque::new()))); + } + + fn process(&mut self) { + let queue = self.queue.as_ref().expect("Queue should be initialized"); + + // Enqueue phase + for i in 0..self.operations { + queue.lock().unwrap().push_back(i); + } + + // Dequeue phase + for _ in 0..self.operations { + black_box(queue.lock().unwrap().pop_front()); + } + } +} + +/// Payload for lockfree::queue::Queue benchmarking +struct LockfreeQueuePayload { + queue: Option>>, + operations: usize, +} + +impl Payload for LockfreeQueuePayload { + fn new_pair() -> (Self, Self) { + ( + Self { + queue: None, + operations: 10, + }, + Self { + queue: None, + operations: 10, + }, + ) + } + + fn prepare(&mut self) { + self.queue = Some(Arc::new(lockfree::queue::Queue::new())); + } + + fn process(&mut self) { + let queue = self.queue.as_ref().expect("Queue should be initialized"); + + // Enqueue phase + for i in 0..self.operations { + queue.push(i); + } + + // Dequeue phase + for _ in 0..self.operations { + black_box(queue.pop()); + } + } +} + +/// Payload for crossbeam SegQueue benchmarking +struct CrossbeamSegQueuePayload { + queue: Option>>, + operations: usize, +} + +impl Payload for CrossbeamSegQueuePayload { + fn new_pair() -> (Self, Self) { + ( + Self { + queue: None, + operations: 10, + }, + Self { + queue: None, + operations: 10, + }, + ) + } + + fn prepare(&mut self) { + self.queue = Some(Arc::new(crossbeam_queue::SegQueue::new())); + } + + fn process(&mut self) { + let queue = self.queue.as_ref().expect("Queue should be initialized"); + + // Enqueue phase + for i in 0..self.operations { + queue.push(i); + } + + // Dequeue phase + for _ in 0..self.operations { + black_box(queue.pop()); + } + } +} + +/// Payload for crossbeam ArrayQueue benchmarking +struct CrossbeamArrayQueuePayload { + queue: Option>>, + operations: usize, +} + +impl Payload for CrossbeamArrayQueuePayload { + fn new_pair() -> (Self, Self) { + ( + Self { + queue: None, + operations: 5, // Smaller for bounded array queue + }, + Self { + queue: None, + operations: 5, + }, + ) + } + + fn prepare(&mut self) { + self.queue = Some(Arc::new(ArrayQueue::new(32))); + } + + fn process(&mut self) { + let queue = self.queue.as_ref().expect("Queue should be initialized"); + + // Enqueue phase + for i in 0..self.operations { + let mut attempts = 0; + while queue.push(i).is_err() && attempts < 20 { + std::thread::yield_now(); + attempts += 1; + } + } + + // Dequeue phase + for _ in 0..self.operations { + let mut attempts = 0; + while queue.pop().is_none() && attempts < 20 { + std::thread::yield_now(); + attempts += 1; + } + } + } +} + +// Helper function to create different operation counts for different configurations +impl AllocBoundedQueuePayload { + fn with_config(config: (usize, usize)) -> (Self, Self) { + let (_, ops) = config; + let scaled_ops = std::cmp::max(1, ops / 10); // Scale down operations + + ( + Self { + queue: None, + operations: scaled_ops, + queue_size: 1024, + }, + Self { + queue: None, + operations: scaled_ops, + queue_size: 1024, + }, + ) + } +} + +// Similar helper implementations for other payload types +impl UnboundedQueuePayload { + fn with_config(config: (usize, usize)) -> (Self, Self) { + let (_, ops) = config; + let scaled_ops = std::cmp::max(1, ops / 10); + + ( + Self { + queue: None, + operations: scaled_ops, + segment_size: 1024, + }, + Self { + queue: None, + operations: scaled_ops, + segment_size: 1024, + }, + ) + } +} + +// Benchmark functions equivalent to the original syncqueue.rs + +/// Equivalent to bench_alloc_bounded_queue +fn bench_alloc_bounded_queue_many_cpus(c: &mut Criterion) { + execute_runs::( + c, + WorkDistribution::all_with_unique_processors_without_self() + ); +} + +/// Equivalent to bench_lscq_queue (UnboundedQueue) +fn bench_unbounded_queue_many_cpus(c: &mut Criterion) { + execute_runs::( + c, + WorkDistribution::all_with_unique_processors_without_self() + ); +} + +/// Equivalent to bench_const_bounded_queue +fn bench_const_bounded_queue_many_cpus(c: &mut Criterion) { + execute_runs::( + c, + WorkDistribution::all_with_unique_processors_without_self() + ); +} + +/// Equivalent to bench_mutex_queue +fn bench_mutex_queue_many_cpus(c: &mut Criterion) { + execute_runs::( + c, + WorkDistribution::all_with_unique_processors_without_self() + ); +} + +/// Equivalent to bench_lockfree_queue +fn bench_lockfree_queue_many_cpus(c: &mut Criterion) { + execute_runs::( + c, + WorkDistribution::all_with_unique_processors_without_self() + ); +} + +/// Equivalent to bench_crossbeam_seg_queue +fn bench_crossbeam_seg_queue_many_cpus(c: &mut Criterion) { + execute_runs::( + c, + WorkDistribution::all_with_unique_processors_without_self() + ); +} + +/// Equivalent to bench_crossbeam_array_queue +fn bench_crossbeam_array_queue_many_cpus(c: &mut Criterion) { + execute_runs::( + c, + WorkDistribution::all_with_unique_processors_without_self() + ); +} + +/// Memory region focused benchmarks - tests the most important memory effects +fn bench_memory_region_comparison(c: &mut Criterion) { + // Compare key memory region effects for the main queue types + execute_runs::(c, &[ + WorkDistribution::PinnedMemoryRegionPairs, // Cross-memory-region + WorkDistribution::PinnedSameMemoryRegion, // Same memory region + ]); + + execute_runs::(c, &[ + WorkDistribution::PinnedMemoryRegionPairs, + WorkDistribution::PinnedSameMemoryRegion, + ]); +} + +/// High-throughput tests with payload multipliers +fn bench_high_throughput_many_cpus(c: &mut Criterion) { + // Test with higher payload multipliers to reduce harness overhead + execute_runs::( + c, + &[WorkDistribution::PinnedMemoryRegionPairs] + ); + + execute_runs::( + c, + &[WorkDistribution::PinnedMemoryRegionPairs] + ); +} + +criterion_group!( + syncqueue_many_cpus_benchmarks, + // Main queue benchmarks (equivalent to the active ones in original) + bench_alloc_bounded_queue_many_cpus, + + // All the commented-out benchmarks from original (now uncommented) + bench_const_bounded_queue_many_cpus, + bench_unbounded_queue_many_cpus, + bench_mutex_queue_many_cpus, + bench_lockfree_queue_many_cpus, + bench_crossbeam_array_queue_many_cpus, + bench_crossbeam_seg_queue_many_cpus, + + // Additional memory-region focused tests + bench_memory_region_comparison, + bench_high_throughput_many_cpus, +); + +criterion_main!(syncqueue_many_cpus_benchmarks); From 275f9885d29e846385a8b44ea80e6b15b30c44ef Mon Sep 17 00:00:00 2001 From: Sander Saares Date: Sun, 29 Jun 2025 09:04:52 +0300 Subject: [PATCH 2/7] Less wrong --- benches/syncqueue_many_cpus.rs | 163 +++++++++------------------------ 1 file changed, 44 insertions(+), 119 deletions(-) diff --git a/benches/syncqueue_many_cpus.rs b/benches/syncqueue_many_cpus.rs index 59e7613..a5ea4c2 100644 --- a/benches/syncqueue_many_cpus.rs +++ b/benches/syncqueue_many_cpus.rs @@ -6,71 +6,33 @@ use std::collections::VecDeque; use std::hint::black_box; use std::sync::{Arc, Mutex}; -// Configuration patterns matching the original PARAM_CONFIGS -// We scale down the operations significantly for the many_cpus_benchmarking framework -const SMALL_CONFIG: (usize, usize) = (1, 10); // Original: (1, 100) -const MEDIUM_CONFIG: (usize, usize) = (10, 10); // Original: (10, 100) -const LARGE_CONFIG: (usize, usize) = (100, 10); // Original: (100, 100) -const XLARGE_CONFIG: (usize, usize) = (100, 100); // Original: (100, 10000) - -/// Generic payload for benchmarking different queue types -/// Each worker creates its own queue and performs enqueue-dequeue cycles -/// This pattern matches the original syncqueue.rs behavior where each thread -/// does enqueue operations followed by dequeue operations -#[derive(Debug)] -struct QueuePayload { - queue: Option>, - operations: usize, - queue_factory: fn() -> Q, -} - -impl QueuePayload { - fn new_with_factory(operations: usize, factory: fn() -> Q) -> (Self, Self) { - ( - Self { - queue: None, - operations, - queue_factory: factory, - }, - Self { - queue: None, - operations, - queue_factory: factory, - }, - ) - } -} - /// Payload for AllocBoundedQueue benchmarking struct AllocBoundedQueuePayload { - queue: Option>>, + queue: Arc>, operations: usize, - queue_size: usize, } impl Payload for AllocBoundedQueuePayload { fn new_pair() -> (Self, Self) { + let shared_queue = Arc::new(AllocBoundedQueue::new(1024)); ( Self { - queue: None, + queue: shared_queue.clone(), operations: 10, // Default small operations - queue_size: 1024, }, Self { - queue: None, + queue: shared_queue, operations: 10, - queue_size: 1024, }, ) } fn prepare(&mut self) { - // Each worker creates its own queue in its memory region - self.queue = Some(Arc::new(AllocBoundedQueue::new(self.queue_size))); + // Queue is already created and shared between workers } fn process(&mut self) { - let queue = self.queue.as_ref().expect("Queue should be initialized"); + let queue = &self.queue; // Pattern from original: enqueue all items, then dequeue all items // Enqueue phase @@ -95,33 +57,31 @@ impl Payload for AllocBoundedQueuePayload { /// Payload for UnboundedQueue benchmarking (matches bench_lscq_queue) struct UnboundedQueuePayload { - queue: Option>>, + queue: Arc>, operations: usize, - segment_size: usize, } impl Payload for UnboundedQueuePayload { fn new_pair() -> (Self, Self) { + let shared_queue = Arc::new(UnboundedQueue::with_segment_size(1024)); ( Self { - queue: None, + queue: shared_queue.clone(), operations: 10, - segment_size: 1024, }, Self { - queue: None, + queue: shared_queue, operations: 10, - segment_size: 1024, }, ) } fn prepare(&mut self) { - self.queue = Some(Arc::new(UnboundedQueue::with_segment_size(self.segment_size))); + // Queue is already created and shared between workers } fn process(&mut self) { - let queue = self.queue.as_ref().expect("Queue should be initialized"); + let queue = &self.queue; let mut full_handle = queue.full_handle(); // Pattern from original run_benchmark_lscq: enqueue all, then dequeue all @@ -137,30 +97,33 @@ impl Payload for UnboundedQueuePayload { /// Payload for ConstBoundedQueue benchmarking struct ConstBoundedQueuePayload { - queue: Option>>, // Using 64 to match const_queue!(usize; 32) -> 64 size + queue: Arc>, // Using 64 to match const_queue!(usize; 32) -> 64 size operations: usize, } impl Payload for ConstBoundedQueuePayload { fn new_pair() -> (Self, Self) { + #[allow(clippy::unused_unit)] + let shared_queue = Arc::new(const_queue!(usize; 32)); + ( Self { - queue: None, + queue: shared_queue.clone(), operations: 5, // Smaller for const queue }, Self { - queue: None, + queue: shared_queue, operations: 5, }, ) } fn prepare(&mut self) { - self.queue = Some(Arc::new(const_queue!(usize; 32))); + // Queue is already created and shared between workers } fn process(&mut self) { - let queue = self.queue.as_ref().expect("Queue should be initialized"); + let queue = &self.queue; // Enqueue phase for i in 0..self.operations { @@ -184,30 +147,31 @@ impl Payload for ConstBoundedQueuePayload { /// Payload for Mutex benchmarking struct MutexQueuePayload { - queue: Option>>>, + queue: Arc>>, operations: usize, } impl Payload for MutexQueuePayload { fn new_pair() -> (Self, Self) { + let shared_queue = Arc::new(Mutex::new(VecDeque::new())); ( Self { - queue: None, + queue: shared_queue.clone(), operations: 10, }, Self { - queue: None, + queue: shared_queue, operations: 10, }, ) } fn prepare(&mut self) { - self.queue = Some(Arc::new(Mutex::new(VecDeque::new()))); + // Queue is already created and shared between workers } fn process(&mut self) { - let queue = self.queue.as_ref().expect("Queue should be initialized"); + let queue = &self.queue; // Enqueue phase for i in 0..self.operations { @@ -223,30 +187,31 @@ impl Payload for MutexQueuePayload { /// Payload for lockfree::queue::Queue benchmarking struct LockfreeQueuePayload { - queue: Option>>, + queue: Arc>, operations: usize, } impl Payload for LockfreeQueuePayload { fn new_pair() -> (Self, Self) { + let shared_queue = Arc::new(lockfree::queue::Queue::new()); ( Self { - queue: None, + queue: shared_queue.clone(), operations: 10, }, Self { - queue: None, + queue: shared_queue, operations: 10, }, ) } fn prepare(&mut self) { - self.queue = Some(Arc::new(lockfree::queue::Queue::new())); + // Queue is already created and shared between workers } fn process(&mut self) { - let queue = self.queue.as_ref().expect("Queue should be initialized"); + let queue = &self.queue; // Enqueue phase for i in 0..self.operations { @@ -262,30 +227,31 @@ impl Payload for LockfreeQueuePayload { /// Payload for crossbeam SegQueue benchmarking struct CrossbeamSegQueuePayload { - queue: Option>>, + queue: Arc>, operations: usize, } impl Payload for CrossbeamSegQueuePayload { fn new_pair() -> (Self, Self) { + let shared_queue = Arc::new(crossbeam_queue::SegQueue::new()); ( Self { - queue: None, + queue: shared_queue.clone(), operations: 10, }, Self { - queue: None, + queue: shared_queue, operations: 10, }, ) } fn prepare(&mut self) { - self.queue = Some(Arc::new(crossbeam_queue::SegQueue::new())); + // Queue is already created and shared between workers } fn process(&mut self) { - let queue = self.queue.as_ref().expect("Queue should be initialized"); + let queue = &self.queue; // Enqueue phase for i in 0..self.operations { @@ -301,30 +267,31 @@ impl Payload for CrossbeamSegQueuePayload { /// Payload for crossbeam ArrayQueue benchmarking struct CrossbeamArrayQueuePayload { - queue: Option>>, + queue: Arc>, operations: usize, } impl Payload for CrossbeamArrayQueuePayload { fn new_pair() -> (Self, Self) { + let shared_queue = Arc::new(ArrayQueue::new(32)); ( Self { - queue: None, + queue: shared_queue.clone(), operations: 5, // Smaller for bounded array queue }, Self { - queue: None, + queue: shared_queue, operations: 5, }, ) } fn prepare(&mut self) { - self.queue = Some(Arc::new(ArrayQueue::new(32))); + // Queue is already created and shared between workers } fn process(&mut self) { - let queue = self.queue.as_ref().expect("Queue should be initialized"); + let queue = &self.queue; // Enqueue phase for i in 0..self.operations { @@ -346,48 +313,6 @@ impl Payload for CrossbeamArrayQueuePayload { } } -// Helper function to create different operation counts for different configurations -impl AllocBoundedQueuePayload { - fn with_config(config: (usize, usize)) -> (Self, Self) { - let (_, ops) = config; - let scaled_ops = std::cmp::max(1, ops / 10); // Scale down operations - - ( - Self { - queue: None, - operations: scaled_ops, - queue_size: 1024, - }, - Self { - queue: None, - operations: scaled_ops, - queue_size: 1024, - }, - ) - } -} - -// Similar helper implementations for other payload types -impl UnboundedQueuePayload { - fn with_config(config: (usize, usize)) -> (Self, Self) { - let (_, ops) = config; - let scaled_ops = std::cmp::max(1, ops / 10); - - ( - Self { - queue: None, - operations: scaled_ops, - segment_size: 1024, - }, - Self { - queue: None, - operations: scaled_ops, - segment_size: 1024, - }, - ) - } -} - // Benchmark functions equivalent to the original syncqueue.rs /// Equivalent to bench_alloc_bounded_queue From d906dc36229bda16aaa936a44a3b5858a7c24b2c Mon Sep 17 00:00:00 2001 From: Sander Saares Date: Sun, 29 Jun 2025 09:10:16 +0300 Subject: [PATCH 3/7] better --- benches/syncqueue_many_cpus.rs | 102 ++++++++------------------------- 1 file changed, 24 insertions(+), 78 deletions(-) diff --git a/benches/syncqueue_many_cpus.rs b/benches/syncqueue_many_cpus.rs index a5ea4c2..8cfd8fe 100644 --- a/benches/syncqueue_many_cpus.rs +++ b/benches/syncqueue_many_cpus.rs @@ -1,7 +1,7 @@ use criterion::{Criterion, criterion_group, criterion_main}; use crossbeam_queue::ArrayQueue; use lfqueue::{AllocBoundedQueue, ConstBoundedQueue, UnboundedQueue, const_queue}; -use many_cpus_benchmarking::{execute_runs, Payload, WorkDistribution}; +use many_cpus_benchmarking::{Payload, WorkDistribution, execute_runs}; use std::collections::VecDeque; use std::hint::black_box; use std::sync::{Arc, Mutex}; @@ -33,7 +33,7 @@ impl Payload for AllocBoundedQueuePayload { fn process(&mut self) { let queue = &self.queue; - + // Pattern from original: enqueue all items, then dequeue all items // Enqueue phase for i in 0..self.operations { @@ -43,7 +43,7 @@ impl Payload for AllocBoundedQueuePayload { attempts += 1; } } - + // Dequeue phase for _ in 0..self.operations { let mut attempts = 0; @@ -83,12 +83,12 @@ impl Payload for UnboundedQueuePayload { fn process(&mut self) { let queue = &self.queue; let mut full_handle = queue.full_handle(); - + // Pattern from original run_benchmark_lscq: enqueue all, then dequeue all for i in 0..self.operations { full_handle.enqueue(i); } - + for _ in 0..self.operations { black_box(full_handle.dequeue()); } @@ -105,7 +105,7 @@ impl Payload for ConstBoundedQueuePayload { fn new_pair() -> (Self, Self) { #[allow(clippy::unused_unit)] let shared_queue = Arc::new(const_queue!(usize; 32)); - + ( Self { queue: shared_queue.clone(), @@ -124,7 +124,7 @@ impl Payload for ConstBoundedQueuePayload { fn process(&mut self) { let queue = &self.queue; - + // Enqueue phase for i in 0..self.operations { let mut attempts = 0; @@ -133,7 +133,7 @@ impl Payload for ConstBoundedQueuePayload { attempts += 1; } } - + // Dequeue phase for _ in 0..self.operations { let mut attempts = 0; @@ -172,13 +172,13 @@ impl Payload for MutexQueuePayload { fn process(&mut self) { let queue = &self.queue; - + // Enqueue phase for i in 0..self.operations { queue.lock().unwrap().push_back(i); } - - // Dequeue phase + + // Dequeue phase for _ in 0..self.operations { black_box(queue.lock().unwrap().pop_front()); } @@ -212,12 +212,12 @@ impl Payload for LockfreeQueuePayload { fn process(&mut self) { let queue = &self.queue; - + // Enqueue phase for i in 0..self.operations { queue.push(i); } - + // Dequeue phase for _ in 0..self.operations { black_box(queue.pop()); @@ -252,12 +252,12 @@ impl Payload for CrossbeamSegQueuePayload { fn process(&mut self) { let queue = &self.queue; - + // Enqueue phase for i in 0..self.operations { queue.push(i); } - + // Dequeue phase for _ in 0..self.operations { black_box(queue.pop()); @@ -292,7 +292,7 @@ impl Payload for CrossbeamArrayQueuePayload { fn process(&mut self) { let queue = &self.queue; - + // Enqueue phase for i in 0..self.operations { let mut attempts = 0; @@ -301,7 +301,7 @@ impl Payload for CrossbeamArrayQueuePayload { attempts += 1; } } - + // Dequeue phase for _ in 0..self.operations { let mut attempts = 0; @@ -317,93 +317,43 @@ impl Payload for CrossbeamArrayQueuePayload { /// Equivalent to bench_alloc_bounded_queue fn bench_alloc_bounded_queue_many_cpus(c: &mut Criterion) { - execute_runs::( - c, - WorkDistribution::all_with_unique_processors_without_self() - ); + execute_runs::(c, WorkDistribution::all()); } /// Equivalent to bench_lscq_queue (UnboundedQueue) fn bench_unbounded_queue_many_cpus(c: &mut Criterion) { - execute_runs::( - c, - WorkDistribution::all_with_unique_processors_without_self() - ); + execute_runs::(c, WorkDistribution::all()); } /// Equivalent to bench_const_bounded_queue fn bench_const_bounded_queue_many_cpus(c: &mut Criterion) { - execute_runs::( - c, - WorkDistribution::all_with_unique_processors_without_self() - ); + execute_runs::(c, WorkDistribution::all()); } /// Equivalent to bench_mutex_queue fn bench_mutex_queue_many_cpus(c: &mut Criterion) { - execute_runs::( - c, - WorkDistribution::all_with_unique_processors_without_self() - ); + execute_runs::(c, WorkDistribution::all()); } /// Equivalent to bench_lockfree_queue fn bench_lockfree_queue_many_cpus(c: &mut Criterion) { - execute_runs::( - c, - WorkDistribution::all_with_unique_processors_without_self() - ); + execute_runs::(c, WorkDistribution::all()); } /// Equivalent to bench_crossbeam_seg_queue fn bench_crossbeam_seg_queue_many_cpus(c: &mut Criterion) { - execute_runs::( - c, - WorkDistribution::all_with_unique_processors_without_self() - ); + execute_runs::(c, WorkDistribution::all()); } /// Equivalent to bench_crossbeam_array_queue fn bench_crossbeam_array_queue_many_cpus(c: &mut Criterion) { - execute_runs::( - c, - WorkDistribution::all_with_unique_processors_without_self() - ); -} - -/// Memory region focused benchmarks - tests the most important memory effects -fn bench_memory_region_comparison(c: &mut Criterion) { - // Compare key memory region effects for the main queue types - execute_runs::(c, &[ - WorkDistribution::PinnedMemoryRegionPairs, // Cross-memory-region - WorkDistribution::PinnedSameMemoryRegion, // Same memory region - ]); - - execute_runs::(c, &[ - WorkDistribution::PinnedMemoryRegionPairs, - WorkDistribution::PinnedSameMemoryRegion, - ]); -} - -/// High-throughput tests with payload multipliers -fn bench_high_throughput_many_cpus(c: &mut Criterion) { - // Test with higher payload multipliers to reduce harness overhead - execute_runs::( - c, - &[WorkDistribution::PinnedMemoryRegionPairs] - ); - - execute_runs::( - c, - &[WorkDistribution::PinnedMemoryRegionPairs] - ); + execute_runs::(c, WorkDistribution::all()); } criterion_group!( syncqueue_many_cpus_benchmarks, // Main queue benchmarks (equivalent to the active ones in original) bench_alloc_bounded_queue_many_cpus, - // All the commented-out benchmarks from original (now uncommented) bench_const_bounded_queue_many_cpus, bench_unbounded_queue_many_cpus, @@ -411,10 +361,6 @@ criterion_group!( bench_lockfree_queue_many_cpus, bench_crossbeam_array_queue_many_cpus, bench_crossbeam_seg_queue_many_cpus, - - // Additional memory-region focused tests - bench_memory_region_comparison, - bench_high_throughput_many_cpus, ); criterion_main!(syncqueue_many_cpus_benchmarks); From d8171846dbf6b4b2600a56e9f5ea0d047e9ddad9 Mon Sep 17 00:00:00 2001 From: Sander Saares Date: Sun, 29 Jun 2025 09:40:22 +0300 Subject: [PATCH 4/7] MAtch original better --- benches/syncqueue_many_cpus.rs | 67 ++++++++++------------------------ 1 file changed, 20 insertions(+), 47 deletions(-) diff --git a/benches/syncqueue_many_cpus.rs b/benches/syncqueue_many_cpus.rs index 8cfd8fe..c9fadea 100644 --- a/benches/syncqueue_many_cpus.rs +++ b/benches/syncqueue_many_cpus.rs @@ -18,11 +18,11 @@ impl Payload for AllocBoundedQueuePayload { ( Self { queue: shared_queue.clone(), - operations: 10, // Default small operations + operations: 100, // Match original syncqueue.rs scale }, Self { queue: shared_queue, - operations: 10, + operations: 100, }, ) } @@ -30,27 +30,18 @@ impl Payload for AllocBoundedQueuePayload { fn prepare(&mut self) { // Queue is already created and shared between workers } - fn process(&mut self) { let queue = &self.queue; // Pattern from original: enqueue all items, then dequeue all items // Enqueue phase for i in 0..self.operations { - let mut attempts = 0; - while queue.enqueue(i).is_err() && attempts < 50 { - std::thread::yield_now(); - attempts += 1; - } + let _ = queue.enqueue(i); } // Dequeue phase for _ in 0..self.operations { - let mut attempts = 0; - while queue.dequeue().is_none() && attempts < 50 { - std::thread::yield_now(); - attempts += 1; - } + black_box(queue.dequeue()); } } } @@ -67,11 +58,11 @@ impl Payload for UnboundedQueuePayload { ( Self { queue: shared_queue.clone(), - operations: 10, + operations: 100, }, Self { queue: shared_queue, - operations: 10, + operations: 100, }, ) } @@ -109,11 +100,11 @@ impl Payload for ConstBoundedQueuePayload { ( Self { queue: shared_queue.clone(), - operations: 5, // Smaller for const queue + operations: 32, // Match array queue size for const queue }, Self { queue: shared_queue, - operations: 5, + operations: 32, }, ) } @@ -121,26 +112,17 @@ impl Payload for ConstBoundedQueuePayload { fn prepare(&mut self) { // Queue is already created and shared between workers } - fn process(&mut self) { let queue = &self.queue; // Enqueue phase for i in 0..self.operations { - let mut attempts = 0; - while queue.enqueue(i).is_err() && attempts < 20 { - std::thread::yield_now(); - attempts += 1; - } + let _ = queue.enqueue(i); } // Dequeue phase for _ in 0..self.operations { - let mut attempts = 0; - while queue.dequeue().is_none() && attempts < 20 { - std::thread::yield_now(); - attempts += 1; - } + black_box(queue.dequeue()); } } } @@ -157,11 +139,11 @@ impl Payload for MutexQueuePayload { ( Self { queue: shared_queue.clone(), - operations: 10, + operations: 100, }, Self { queue: shared_queue, - operations: 10, + operations: 100, }, ) } @@ -197,11 +179,11 @@ impl Payload for LockfreeQueuePayload { ( Self { queue: shared_queue.clone(), - operations: 10, + operations: 100, }, Self { queue: shared_queue, - operations: 10, + operations: 100, }, ) } @@ -237,11 +219,11 @@ impl Payload for CrossbeamSegQueuePayload { ( Self { queue: shared_queue.clone(), - operations: 10, + operations: 100, }, Self { queue: shared_queue, - operations: 10, + operations: 100, }, ) } @@ -249,7 +231,6 @@ impl Payload for CrossbeamSegQueuePayload { fn prepare(&mut self) { // Queue is already created and shared between workers } - fn process(&mut self) { let queue = &self.queue; @@ -277,11 +258,11 @@ impl Payload for CrossbeamArrayQueuePayload { ( Self { queue: shared_queue.clone(), - operations: 5, // Smaller for bounded array queue + operations: 32, // Match array queue size }, Self { queue: shared_queue, - operations: 5, + operations: 32, }, ) } @@ -295,20 +276,12 @@ impl Payload for CrossbeamArrayQueuePayload { // Enqueue phase for i in 0..self.operations { - let mut attempts = 0; - while queue.push(i).is_err() && attempts < 20 { - std::thread::yield_now(); - attempts += 1; - } + let _ = queue.push(i); } // Dequeue phase for _ in 0..self.operations { - let mut attempts = 0; - while queue.pop().is_none() && attempts < 20 { - std::thread::yield_now(); - attempts += 1; - } + black_box(queue.pop()); } } } From 26f62ed83ab2cfbbc732c2463fdb48376f6026a8 Mon Sep 17 00:00:00 2001 From: Sander Saares Date: Sun, 29 Jun 2025 10:52:50 +0300 Subject: [PATCH 5/7] many_cpus_benchmarking = "0.1.18" --- Cargo.lock | 8 ++++---- Cargo.toml | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1e46d7a..650025e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -428,9 +428,9 @@ dependencies = [ [[package]] name = "many_cpus" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9449764c390da2db0a251a82c4834637dd51127c108e0016bcf4fc1baa2183ce" +checksum = "682c64e29156f32d36d61251ba80e62265a465927e06e64ba1e11be8b9a3470b" dependencies = [ "cpulist", "derive_more", @@ -449,9 +449,9 @@ dependencies = [ [[package]] name = "many_cpus_benchmarking" -version = "0.1.17" +version = "0.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fe6628db6a085039ce36b80decd2c2b270ebe36d9b64f00291afe5c0f5df407" +checksum = "397ad52b967a8908b34be27554e2c2cc8471af0c848e06878fc3fef0b00f820e" dependencies = [ "cpulist", "criterion", diff --git a/Cargo.toml b/Cargo.toml index 492eab5..35bd1ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,7 @@ arbitrary = { version = "1.4.1", features = ["derive"] } criterion = "0.6.0" crossbeam-queue = "0.3.12" lockfree = "0.5.1" -many_cpus_benchmarking = "0.1" +many_cpus_benchmarking = "0.1.18" rand = "0.9.1" [[bench]] From 53dc4da06babc664401a3740e83bff900383a546 Mon Sep 17 00:00:00 2001 From: Sander Saares Date: Sun, 29 Jun 2025 13:31:22 +0300 Subject: [PATCH 6/7] more const --- benches/syncqueue_many_cpus.rs | 44 +++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/benches/syncqueue_many_cpus.rs b/benches/syncqueue_many_cpus.rs index c9fadea..fb4097b 100644 --- a/benches/syncqueue_many_cpus.rs +++ b/benches/syncqueue_many_cpus.rs @@ -6,6 +6,9 @@ use std::collections::VecDeque; use std::hint::black_box; use std::sync::{Arc, Mutex}; +/// Number of operations each worker performs in the benchmark +const OPERATIONS_PER_WORKER: usize = 100; + /// Payload for AllocBoundedQueue benchmarking struct AllocBoundedQueuePayload { queue: Arc>, @@ -14,15 +17,16 @@ struct AllocBoundedQueuePayload { impl Payload for AllocBoundedQueuePayload { fn new_pair() -> (Self, Self) { - let shared_queue = Arc::new(AllocBoundedQueue::new(1024)); + // Increase queue size to accommodate more operations from multiple workers + let shared_queue = Arc::new(AllocBoundedQueue::new(2048)); ( Self { queue: shared_queue.clone(), - operations: 100, // Match original syncqueue.rs scale + operations: OPERATIONS_PER_WORKER, }, Self { queue: shared_queue, - operations: 100, + operations: OPERATIONS_PER_WORKER, }, ) } @@ -58,11 +62,11 @@ impl Payload for UnboundedQueuePayload { ( Self { queue: shared_queue.clone(), - operations: 100, + operations: OPERATIONS_PER_WORKER, }, Self { queue: shared_queue, - operations: 100, + operations: OPERATIONS_PER_WORKER, }, ) } @@ -88,23 +92,24 @@ impl Payload for UnboundedQueuePayload { /// Payload for ConstBoundedQueue benchmarking struct ConstBoundedQueuePayload { - queue: Arc>, // Using 64 to match const_queue!(usize; 32) -> 64 size + queue: Arc>, // Updated to match const_queue!(usize; 256) -> 512 size operations: usize, } impl Payload for ConstBoundedQueuePayload { fn new_pair() -> (Self, Self) { #[allow(clippy::unused_unit)] - let shared_queue = Arc::new(const_queue!(usize; 32)); - + // Increase const queue size to accommodate more operations + let shared_queue = Arc::new(const_queue!(usize; 256)); + ( Self { queue: shared_queue.clone(), - operations: 32, // Match array queue size for const queue + operations: OPERATIONS_PER_WORKER, }, Self { queue: shared_queue, - operations: 32, + operations: OPERATIONS_PER_WORKER, }, ) } @@ -139,11 +144,11 @@ impl Payload for MutexQueuePayload { ( Self { queue: shared_queue.clone(), - operations: 100, + operations: OPERATIONS_PER_WORKER, }, Self { queue: shared_queue, - operations: 100, + operations: OPERATIONS_PER_WORKER, }, ) } @@ -179,11 +184,11 @@ impl Payload for LockfreeQueuePayload { ( Self { queue: shared_queue.clone(), - operations: 100, + operations: OPERATIONS_PER_WORKER, }, Self { queue: shared_queue, - operations: 100, + operations: OPERATIONS_PER_WORKER, }, ) } @@ -219,11 +224,11 @@ impl Payload for CrossbeamSegQueuePayload { ( Self { queue: shared_queue.clone(), - operations: 100, + operations: OPERATIONS_PER_WORKER, }, Self { queue: shared_queue, - operations: 100, + operations: OPERATIONS_PER_WORKER, }, ) } @@ -254,15 +259,16 @@ struct CrossbeamArrayQueuePayload { impl Payload for CrossbeamArrayQueuePayload { fn new_pair() -> (Self, Self) { - let shared_queue = Arc::new(ArrayQueue::new(32)); + // Increase array queue size to accommodate more operations + let shared_queue = Arc::new(ArrayQueue::new(256)); ( Self { queue: shared_queue.clone(), - operations: 32, // Match array queue size + operations: OPERATIONS_PER_WORKER, }, Self { queue: shared_queue, - operations: 32, + operations: OPERATIONS_PER_WORKER, }, ) } From 349160a0cc0040a41daa1756e4a6254ee0c25b25 Mon Sep 17 00:00:00 2001 From: Sander Saares Date: Sun, 29 Jun 2025 17:12:32 +0300 Subject: [PATCH 7/7] Improve benchmark design --- benches/syncqueue_many_cpus.rs | 546 +++++++++++++++++++++++---------- 1 file changed, 378 insertions(+), 168 deletions(-) diff --git a/benches/syncqueue_many_cpus.rs b/benches/syncqueue_many_cpus.rs index fb4097b..2bda8f8 100644 --- a/benches/syncqueue_many_cpus.rs +++ b/benches/syncqueue_many_cpus.rs @@ -1,3 +1,60 @@ +//! Many-CPU benchmarks for queue implementations using the many_cpus_benchmarking framework. +//! +//! This benchmark suite provides a many-CPU variant of the original syncqueue.rs benchmarks, +//! designed to measure how queue performance is affected by memory locality and processor +//! distribution in multi-memory-region systems when used for their intended purpose: +//! **producer-consumer communication**. +//! +//! ## Design Philosophy +//! +//! Queues are fundamentally about inter-thread communication between producers and consumers. +//! This benchmark implements a true producer-consumer pattern where: +//! +//! 1. **Producer worker** - Enqueues items into the shared queue +//! 2. **Consumer worker** - Dequeues items from the shared queue +//! 3. **Shared queue instance** - The communication channel between the two workers +//! +//! This follows the "different actions" pattern from the many_cpus_benchmarking framework, +//! allowing us to measure how memory locality affects inter-thread queue communication +//! performance. **Importantly, we exclude "self" distribution modes** (PinnedSelf, UnpinnedSelf, +//! UnpinnedPerMemoryRegionSelf) because these would prevent true concurrent producer-consumer +//! operation, leading to deadlock scenarios where the consumer waits indefinitely for items +//! that the producer cannot produce due to lack of concurrent execution. +//! +//! ## Memory Locality Effects +//! +//! The benchmark measures several important scenarios: +//! - **Same memory region**: Producer and consumer on processors sharing the same memory +//! - **Different memory regions**: Producer and consumer on processors in different memory regions +//! - **Payload exchange modes**: How performance changes when the queue is allocated in the +//! producer's vs consumer's memory region +//! +//! ## Queue Configuration +//! +//! Queue sizes are set to handle the communication load while matching original benchmark intentions: +//! - AllocBoundedQueue: 2048 (sufficient buffer for producer-consumer communication) +//! - ConstBoundedQueue: 256 (larger than original to handle async producer-consumer pattern) +//! - ArrayQueue: 256 (sufficient buffer for communication) +//! - UnboundedQueue: 1024 segment size (matches original) +//! +//! ## Operation Pattern +//! +//! Uses realistic producer-consumer communication with sufficient operations to measure +//! memory locality effects clearly, with a moderate multiplier to amortize framework overhead. +//! +//! ## Key Insights This Benchmark Measures +//! +//! 1. **Queue allocation locality**: How performance changes when the shared queue is allocated +//! in the producer's vs consumer's memory region (via payload exchange modes) +//! 2. **Cross-memory-region communication**: Performance differences when producer and consumer +//! are in the same vs different memory regions +//! 3. **True queue contention**: Real producer-consumer contention patterns rather than artificial +//! single-threaded enqueue-all/dequeue-all sequences +//! 4. **Implementation comparison**: How different queue implementations handle the memory locality +//! challenges of multi-memory-region systems +//! 5. **Concurrent operation requirement**: Excludes "self" distribution modes that would prevent +//! simultaneous producer-consumer operation, focusing only on truly concurrent scenarios + use criterion::{Criterion, criterion_group, criterion_main}; use crossbeam_queue::ArrayQueue; use lfqueue::{AllocBoundedQueue, ConstBoundedQueue, UnboundedQueue, const_queue}; @@ -7,333 +64,486 @@ use std::hint::black_box; use std::sync::{Arc, Mutex}; /// Number of operations each worker performs in the benchmark -const OPERATIONS_PER_WORKER: usize = 100; +/// Producer sends this many items, consumer receives this many items +const OPERATIONS_PER_WORKER: usize = 5000; -/// Payload for AllocBoundedQueue benchmarking +/// Payload for AllocBoundedQueue benchmarking - Producer-Consumer pattern struct AllocBoundedQueuePayload { queue: Arc>, operations: usize, + is_producer: bool, } impl Payload for AllocBoundedQueuePayload { fn new_pair() -> (Self, Self) { - // Increase queue size to accommodate more operations from multiple workers + // Shared queue instance for producer-consumer communication let shared_queue = Arc::new(AllocBoundedQueue::new(2048)); - ( - Self { - queue: shared_queue.clone(), - operations: OPERATIONS_PER_WORKER, - }, - Self { - queue: shared_queue, - operations: OPERATIONS_PER_WORKER, - }, - ) + + let producer = Self { + queue: shared_queue.clone(), + operations: OPERATIONS_PER_WORKER, + is_producer: true, + }; + + let consumer = Self { + queue: shared_queue, + operations: OPERATIONS_PER_WORKER, + is_producer: false, + }; + + (producer, consumer) } fn prepare(&mut self) { - // Queue is already created and shared between workers + // Pre-populate the queue to establish steady state and avoid initial blocking + if self.is_producer { + for i in 0..1000 { + let _ = self.queue.enqueue(i); + } + } } + fn process(&mut self) { let queue = &self.queue; - // Pattern from original: enqueue all items, then dequeue all items - // Enqueue phase - for i in 0..self.operations { - let _ = queue.enqueue(i); - } - - // Dequeue phase - for _ in 0..self.operations { - black_box(queue.dequeue()); + if self.is_producer { + // Producer: continuously enqueue items + for i in 0..self.operations { + // Keep trying until successful (bounded queue may be full) + while queue.enqueue(i).is_err() { + std::hint::spin_loop(); + } + } + } else { + // Consumer: continuously dequeue items + for _ in 0..self.operations { + // Keep trying until successful (queue may be empty) + loop { + if let Some(value) = queue.dequeue() { + black_box(value); + break; + } + std::hint::spin_loop(); + } + } } } } -/// Payload for UnboundedQueue benchmarking (matches bench_lscq_queue) +/// Payload for UnboundedQueue benchmarking - Producer-Consumer pattern struct UnboundedQueuePayload { queue: Arc>, operations: usize, + is_producer: bool, } impl Payload for UnboundedQueuePayload { fn new_pair() -> (Self, Self) { + // Shared queue instance for producer-consumer communication let shared_queue = Arc::new(UnboundedQueue::with_segment_size(1024)); - ( - Self { - queue: shared_queue.clone(), - operations: OPERATIONS_PER_WORKER, - }, - Self { - queue: shared_queue, - operations: OPERATIONS_PER_WORKER, - }, - ) + + let producer = Self { + queue: shared_queue.clone(), + operations: OPERATIONS_PER_WORKER, + is_producer: true, + }; + + let consumer = Self { + queue: shared_queue, + operations: OPERATIONS_PER_WORKER, + is_producer: false, + }; + + (producer, consumer) } fn prepare(&mut self) { - // Queue is already created and shared between workers + // Pre-populate the queue to establish steady state + if self.is_producer { + let mut handle = self.queue.full_handle(); + for i in 0..1000 { + handle.enqueue(i); + } + } } fn process(&mut self) { let queue = &self.queue; - let mut full_handle = queue.full_handle(); - // Pattern from original run_benchmark_lscq: enqueue all, then dequeue all - for i in 0..self.operations { - full_handle.enqueue(i); - } - - for _ in 0..self.operations { - black_box(full_handle.dequeue()); + if self.is_producer { + // Producer: continuously enqueue items + let mut full_handle = queue.full_handle(); + for i in 0..self.operations { + full_handle.enqueue(i); + } + } else { + // Consumer: continuously dequeue items + let mut full_handle = queue.full_handle(); + for _ in 0..self.operations { + // Keep trying until successful (queue may be empty) + loop { + if let Some(value) = full_handle.dequeue() { + black_box(value); + break; + } + std::hint::spin_loop(); + } + } } } } -/// Payload for ConstBoundedQueue benchmarking +/// Payload for ConstBoundedQueue benchmarking - Producer-Consumer pattern struct ConstBoundedQueuePayload { - queue: Arc>, // Updated to match const_queue!(usize; 256) -> 512 size + queue: Arc>, // Larger size for producer-consumer pattern operations: usize, + is_producer: bool, } impl Payload for ConstBoundedQueuePayload { fn new_pair() -> (Self, Self) { - #[allow(clippy::unused_unit)] - // Increase const queue size to accommodate more operations - let shared_queue = Arc::new(const_queue!(usize; 256)); + // Shared queue instance for producer-consumer communication + let shared_queue = Arc::new(const_queue!(usize; 256)); // Larger for async communication + + let producer = Self { + queue: shared_queue.clone(), + operations: OPERATIONS_PER_WORKER, + is_producer: true, + }; - ( - Self { - queue: shared_queue.clone(), - operations: OPERATIONS_PER_WORKER, - }, - Self { - queue: shared_queue, - operations: OPERATIONS_PER_WORKER, - }, - ) + let consumer = Self { + queue: shared_queue, + operations: OPERATIONS_PER_WORKER, + is_producer: false, + }; + + (producer, consumer) } fn prepare(&mut self) { - // Queue is already created and shared between workers + // Pre-populate the queue to establish steady state + if self.is_producer { + for i in 0..100 { + let _ = self.queue.enqueue(i); + } + } } + fn process(&mut self) { let queue = &self.queue; - // Enqueue phase - for i in 0..self.operations { - let _ = queue.enqueue(i); - } - - // Dequeue phase - for _ in 0..self.operations { - black_box(queue.dequeue()); + if self.is_producer { + // Producer: continuously enqueue items + for i in 0..self.operations { + // Keep trying until successful (bounded queue may be full) + while queue.enqueue(i).is_err() { + std::hint::spin_loop(); + } + } + } else { + // Consumer: continuously dequeue items + for _ in 0..self.operations { + // Keep trying until successful (queue may be empty) + loop { + if let Some(value) = queue.dequeue() { + black_box(value); + break; + } + std::hint::spin_loop(); + } + } } } } -/// Payload for Mutex benchmarking +/// Payload for Mutex benchmarking - Producer-Consumer pattern struct MutexQueuePayload { queue: Arc>>, operations: usize, + is_producer: bool, } impl Payload for MutexQueuePayload { fn new_pair() -> (Self, Self) { + // Shared mutex-protected queue for producer-consumer communication let shared_queue = Arc::new(Mutex::new(VecDeque::new())); - ( - Self { - queue: shared_queue.clone(), - operations: OPERATIONS_PER_WORKER, - }, - Self { - queue: shared_queue, - operations: OPERATIONS_PER_WORKER, - }, - ) + + let producer = Self { + queue: shared_queue.clone(), + operations: OPERATIONS_PER_WORKER, + is_producer: true, + }; + + let consumer = Self { + queue: shared_queue, + operations: OPERATIONS_PER_WORKER, + is_producer: false, + }; + + (producer, consumer) } fn prepare(&mut self) { - // Queue is already created and shared between workers + // Pre-populate the queue to establish steady state + if self.is_producer { + let mut queue = self.queue.lock().unwrap(); + for i in 0..1000 { + queue.push_back(i); + } + } } fn process(&mut self) { - let queue = &self.queue; - - // Enqueue phase - for i in 0..self.operations { - queue.lock().unwrap().push_back(i); - } - - // Dequeue phase - for _ in 0..self.operations { - black_box(queue.lock().unwrap().pop_front()); + if self.is_producer { + // Producer: continuously enqueue items + for i in 0..self.operations { + self.queue.lock().unwrap().push_back(i); + } + } else { + // Consumer: continuously dequeue items + for _ in 0..self.operations { + // Keep trying until successful (queue may be empty) + loop { + let mut queue = self.queue.lock().unwrap(); + if let Some(value) = queue.pop_front() { + drop(queue); // Release lock before black_box + black_box(value); + break; + } + drop(queue); // Release lock before spin + std::hint::spin_loop(); + } + } } } } -/// Payload for lockfree::queue::Queue benchmarking +/// Payload for lockfree::queue::Queue benchmarking - Producer-Consumer pattern struct LockfreeQueuePayload { queue: Arc>, operations: usize, + is_producer: bool, } impl Payload for LockfreeQueuePayload { fn new_pair() -> (Self, Self) { + // Shared lockfree queue for producer-consumer communication let shared_queue = Arc::new(lockfree::queue::Queue::new()); - ( - Self { - queue: shared_queue.clone(), - operations: OPERATIONS_PER_WORKER, - }, - Self { - queue: shared_queue, - operations: OPERATIONS_PER_WORKER, - }, - ) + + let producer = Self { + queue: shared_queue.clone(), + operations: OPERATIONS_PER_WORKER, + is_producer: true, + }; + + let consumer = Self { + queue: shared_queue, + operations: OPERATIONS_PER_WORKER, + is_producer: false, + }; + + (producer, consumer) } fn prepare(&mut self) { - // Queue is already created and shared between workers + // Pre-populate the queue to establish steady state + if self.is_producer { + for i in 0..1000 { + self.queue.push(i); + } + } } fn process(&mut self) { let queue = &self.queue; - // Enqueue phase - for i in 0..self.operations { - queue.push(i); - } - - // Dequeue phase - for _ in 0..self.operations { - black_box(queue.pop()); + if self.is_producer { + // Producer: continuously enqueue items + for i in 0..self.operations { + queue.push(i); + } + } else { + // Consumer: continuously dequeue items + for _ in 0..self.operations { + // Keep trying until successful (queue may be empty) + loop { + if let Some(value) = queue.pop() { + black_box(value); + break; + } + std::hint::spin_loop(); + } + } } } } -/// Payload for crossbeam SegQueue benchmarking +/// Payload for crossbeam SegQueue benchmarking - Producer-Consumer pattern struct CrossbeamSegQueuePayload { queue: Arc>, operations: usize, + is_producer: bool, } impl Payload for CrossbeamSegQueuePayload { fn new_pair() -> (Self, Self) { + // Shared crossbeam SegQueue for producer-consumer communication let shared_queue = Arc::new(crossbeam_queue::SegQueue::new()); - ( - Self { - queue: shared_queue.clone(), - operations: OPERATIONS_PER_WORKER, - }, - Self { - queue: shared_queue, - operations: OPERATIONS_PER_WORKER, - }, - ) + + let producer = Self { + queue: shared_queue.clone(), + operations: OPERATIONS_PER_WORKER, + is_producer: true, + }; + + let consumer = Self { + queue: shared_queue, + operations: OPERATIONS_PER_WORKER, + is_producer: false, + }; + + (producer, consumer) } fn prepare(&mut self) { - // Queue is already created and shared between workers + // Pre-populate the queue to establish steady state + if self.is_producer { + for i in 0..1000 { + self.queue.push(i); + } + } } + fn process(&mut self) { let queue = &self.queue; - // Enqueue phase - for i in 0..self.operations { - queue.push(i); - } - - // Dequeue phase - for _ in 0..self.operations { - black_box(queue.pop()); + if self.is_producer { + // Producer: continuously enqueue items + for i in 0..self.operations { + queue.push(i); + } + } else { + // Consumer: continuously dequeue items + for _ in 0..self.operations { + // Keep trying until successful (queue may be empty) + loop { + if let Some(value) = queue.pop() { + black_box(value); + break; + } + std::hint::spin_loop(); + } + } } } } -/// Payload for crossbeam ArrayQueue benchmarking +/// Payload for crossbeam ArrayQueue benchmarking - Producer-Consumer pattern struct CrossbeamArrayQueuePayload { queue: Arc>, operations: usize, + is_producer: bool, } impl Payload for CrossbeamArrayQueuePayload { fn new_pair() -> (Self, Self) { - // Increase array queue size to accommodate more operations - let shared_queue = Arc::new(ArrayQueue::new(256)); - ( - Self { - queue: shared_queue.clone(), - operations: OPERATIONS_PER_WORKER, - }, - Self { - queue: shared_queue, - operations: OPERATIONS_PER_WORKER, - }, - ) + // Shared crossbeam ArrayQueue for producer-consumer communication + let shared_queue = Arc::new(ArrayQueue::new(256)); // Larger for async communication + + let producer = Self { + queue: shared_queue.clone(), + operations: OPERATIONS_PER_WORKER, + is_producer: true, + }; + + let consumer = Self { + queue: shared_queue, + operations: OPERATIONS_PER_WORKER, + is_producer: false, + }; + + (producer, consumer) } fn prepare(&mut self) { - // Queue is already created and shared between workers + // Pre-populate the queue to establish steady state + if self.is_producer { + for i in 0..100 { + let _ = self.queue.push(i); + } + } } fn process(&mut self) { let queue = &self.queue; - // Enqueue phase - for i in 0..self.operations { - let _ = queue.push(i); - } - - // Dequeue phase - for _ in 0..self.operations { - black_box(queue.pop()); + if self.is_producer { + // Producer: continuously enqueue items + for i in 0..self.operations { + // Keep trying until successful (bounded queue may be full) + while queue.push(i).is_err() { + std::hint::spin_loop(); + } + } + } else { + // Consumer: continuously dequeue items + for _ in 0..self.operations { + // Keep trying until successful (queue may be empty) + loop { + if let Some(value) = queue.pop() { + black_box(value); + break; + } + std::hint::spin_loop(); + } + } } } } -// Benchmark functions equivalent to the original syncqueue.rs +// Benchmark functions implementing producer-consumer patterns -/// Equivalent to bench_alloc_bounded_queue +/// Producer-Consumer benchmark for AllocBoundedQueue fn bench_alloc_bounded_queue_many_cpus(c: &mut Criterion) { - execute_runs::(c, WorkDistribution::all()); + // Different actions (producer vs consumer), but exclude "self" modes to avoid deadlock + // Self modes would prevent concurrent producer-consumer operation leading to deadlock + execute_runs::(c, WorkDistribution::all_with_unique_processors_without_self()); } -/// Equivalent to bench_lscq_queue (UnboundedQueue) +/// Producer-Consumer benchmark for UnboundedQueue fn bench_unbounded_queue_many_cpus(c: &mut Criterion) { - execute_runs::(c, WorkDistribution::all()); + execute_runs::(c, WorkDistribution::all_with_unique_processors_without_self()); } -/// Equivalent to bench_const_bounded_queue +/// Producer-Consumer benchmark for ConstBoundedQueue fn bench_const_bounded_queue_many_cpus(c: &mut Criterion) { - execute_runs::(c, WorkDistribution::all()); + execute_runs::(c, WorkDistribution::all_with_unique_processors_without_self()); } -/// Equivalent to bench_mutex_queue +/// Producer-Consumer benchmark for Mutex fn bench_mutex_queue_many_cpus(c: &mut Criterion) { - execute_runs::(c, WorkDistribution::all()); + execute_runs::(c, WorkDistribution::all_with_unique_processors_without_self()); } -/// Equivalent to bench_lockfree_queue +/// Producer-Consumer benchmark for lockfree::queue::Queue fn bench_lockfree_queue_many_cpus(c: &mut Criterion) { - execute_runs::(c, WorkDistribution::all()); + execute_runs::(c, WorkDistribution::all_with_unique_processors_without_self()); } -/// Equivalent to bench_crossbeam_seg_queue +/// Producer-Consumer benchmark for crossbeam SegQueue fn bench_crossbeam_seg_queue_many_cpus(c: &mut Criterion) { - execute_runs::(c, WorkDistribution::all()); + execute_runs::(c, WorkDistribution::all_with_unique_processors_without_self()); } -/// Equivalent to bench_crossbeam_array_queue +/// Producer-Consumer benchmark for crossbeam ArrayQueue fn bench_crossbeam_array_queue_many_cpus(c: &mut Criterion) { - execute_runs::(c, WorkDistribution::all()); + execute_runs::(c, WorkDistribution::all_with_unique_processors_without_self()); } criterion_group!( syncqueue_many_cpus_benchmarks, - // Main queue benchmarks (equivalent to the active ones in original) + // Producer-Consumer benchmarks for all queue implementations bench_alloc_bounded_queue_many_cpus, - // All the commented-out benchmarks from original (now uncommented) bench_const_bounded_queue_many_cpus, bench_unbounded_queue_many_cpus, bench_mutex_queue_many_cpus,