From fbc586e7ff7f561027d414472cd5915f8869c790 Mon Sep 17 00:00:00 2001
From: Ahmed Farghal
Date: Wed, 6 May 2026 11:05:32 +0100
Subject: [PATCH 1/2] [VQueues] Introducing inbox caching and batched reads

Replaces the single-head cache with a sorted 24-entry per-queue cache.
Refills run via tokio::task::spawn_blocking when data isn't in the block
cache; in-flight notify_enqueued / notify_removed events are buffered as
an overlay (Add / Tombstone) and merged on completion.

On overlay overflow we set a horizon (exclusive upper bound) instead of
back-evicting: a popped tombstone could otherwise re-admit a deleted row.
Storage rows at or above the horizon are dropped from the merge and
rediscovered on the next refill.

Drops the tailing iterator and its workarounds. Splits VQueueCursor into
an inbox cursor (returns CursorError; WouldBlock under non-blocking opts)
and a synchronous VQueueRunningCursor.
---
 Cargo.lock                                    |    2 +
 .../src/tests/vqueue_table_test/mod.rs        |  665 +-------
 .../src/vqueue_table/reader.rs                |    6 +-
 .../src/vqueue_table/running_reader.rs        |   46 +-
 .../src/vqueue_table/waiting_reader.rs        |   86 +-
 crates/rocksdb/src/background.rs              |    4 +-
 crates/storage-api/src/vqueue_table/store.rs  |   46 +-
 crates/vqueues/Cargo.toml                     |    2 +
 crates/vqueues/src/scheduler/drr.rs           |    6 +-
 crates/vqueues/src/scheduler/eligible.rs      |   26 +-
 crates/vqueues/src/scheduler/queue.rs         | 1327 +++++++++--------
 crates/vqueues/src/scheduler/queue_test.rs    |  403 +++++
 crates/vqueues/src/scheduler/vqueue_state.rs  |   33 +-
 13 files changed, 1329 insertions(+), 1323 deletions(-)
 create mode 100644 crates/vqueues/src/scheduler/queue_test.rs

diff --git a/Cargo.lock b/Cargo.lock
index 4a084bee5f..07668d8e11 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8619,9 +8619,11 @@ dependencies = [
  "restate-limiter",
  "restate-memory",
  "restate-partition-store",
+ "restate-platform",
  "restate-rocksdb",
  "restate-serde-util",
  "restate-storage-api",
+ "restate-time-util",
  "restate-types",
  "restate-util-string",
  "restate-worker-api",
diff --git a/crates/partition-store/src/tests/vqueue_table_test/mod.rs b/crates/partition-store/src/tests/vqueue_table_test/mod.rs
index be7d060657..1830213153 100644
--- a/crates/partition-store/src/tests/vqueue_table_test/mod.rs
+++ b/crates/partition-store/src/tests/vqueue_table_test/mod.rs
@@ -20,14 +20,16 @@
 //! Invariants:
 //! - The waiting cursor must not cross the boundary to adjacent vqueue ids.
 //! - The waiting cursor must not cross partition-key boundaries either.
-//! - When the waiting cursor is "seeked" to to first after a higher-order item has been inserted
-//!   (i.e. item with has_lock=true, or with older run_at), the cursor must show this added item.
+//! - The waiting cursor uses snapshot semantics: it captures a consistent
+//!   view of storage at creation time. Writes/deletes that happen after the
+//!   cursor is created are NOT visible to that cursor — callers must create
+//!   a fresh cursor to observe them.
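+//!
+//! A minimal sketch of the snapshot contract, using this module's
+//! `inbox_reader` helper (the write step in the middle stands for any
+//! committed transaction; it is pseudocode, not a real API):
+//!
+//! ```ignore
+//! let mut old = inbox_reader(db, &qid);   // snapshot taken here
+//! // ... commit a new inbox entry in a transaction ...
+//! old.seek_to_first();                    // still shows the pre-write view
+//! let mut fresh = inbox_reader(db, &qid); // new snapshot: sees the write
+//! ```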
 use restate_clock::time::MillisSinceEpoch;
 use restate_storage_api::Transaction;
 use restate_storage_api::vqueue_table::{
-    EntryKey, EntryMetadata, EntryValue, Stage, Status, VQueueCursor, VQueueStore,
-    WriteVQueueTable, stats::EntryStatistics,
+    EntryKey, EntryMetadata, EntryValue, Options, Stage, Status, VQueueCursor, VQueueRunningCursor,
+    VQueueStore, WriteVQueueTable, stats::EntryStatistics,
 };
 use restate_types::clock::UniqueTimestamp;
 use restate_types::identifiers::PartitionKey;
@@ -90,7 +92,16 @@ fn default_entry(id: u8) -> (EntryKey, EntryValue) {
     entry(id, false, 0, id as u64)
 }
 
-/// Collects all items from a cursor into a Vec
+fn inbox_reader(db: &crate::PartitionDb, qid: &VQueueId) -> impl VQueueCursor + use<> {
+    db.new_inbox_reader(
+        qid,
+        Options {
+            allow_blocking_io: true,
+        },
+    )
+}
+
+/// Collects all items from an inbox cursor.
 fn collect_cursor<C: VQueueCursor>(cursor: &mut C) -> Vec<(EntryKey, EntryValue)> {
     let mut items = Vec::new();
     cursor.seek_to_first();
@@ -101,6 +112,17 @@ fn collect_cursor<C: VQueueCursor>(cursor: &mut C) -> Vec<(EntryKey, EntryValue)
     items
 }
 
+/// Collects all items from a running cursor (different error type than inbox).
+fn collect_running<C: VQueueRunningCursor>(cursor: &mut C) -> Vec<(EntryKey, EntryValue)> {
+    let mut items = Vec::new();
+    cursor.seek_to_first();
+    while let Ok(Some(item)) = cursor.peek() {
+        items.push(item);
+        cursor.advance();
+    }
+    items
+}
+
 fn collect_ids(items: &[(EntryKey, EntryValue)]) -> Vec<EntryId> {
     items.iter().map(|(key, _)| *key.entry_id()).collect()
 }
@@ -125,7 +147,7 @@ async fn key_ordering_by_has_lock_run_at_seq<W: WriteVQueueTable>(txn: &mut W) {
 
 fn verify_key_ordering_by_has_lock_run_at_seq(db: &crate::PartitionDb) {
     let qid = test_qid();
-    let mut reader = db.new_inbox_reader(&qid);
+    let mut reader = inbox_reader(db, &qid);
     let items = collect_cursor(&mut reader);
 
     assert_eq!(items.len(), 5, "Expected 5 items in inbox");
@@ -165,7 +187,7 @@ async fn ordering_within_same_lock_domain<W: WriteVQueueTable>(txn: &mut W) {
 
 fn verify_ordering_within_same_lock_domain(db: &crate::PartitionDb) {
     let qid = VQueueId::custom(2000, "1");
-    let mut reader = db.new_inbox_reader(&qid);
+    let mut reader = inbox_reader(db, &qid);
     let items = collect_cursor(&mut reader);
 
     assert_eq!(items.len(), 4, "Expected 4 items");
@@ -199,12 +221,12 @@ fn verify_running_and_inbox_are_separate(db: &crate::PartitionDb) {
 
     // Running reader should only see the Run stage entry
     let mut run_reader = db.new_run_reader(&qid);
-    let run_items = collect_cursor(&mut run_reader);
+    let run_items = collect_running(&mut run_reader);
     assert_eq!(run_items.len(), 1, "Running reader should see 1 item");
     assert_eq!(*run_items[0].0.entry_id(), entry_id(10));
 
     // Inbox reader should only see the Inbox stage entries
-    let mut inbox_reader = db.new_inbox_reader(&qid);
+    let mut inbox_reader = inbox_reader(db, &qid);
     let inbox_items = collect_cursor(&mut inbox_reader);
     assert_eq!(inbox_items.len(), 2, "Inbox reader should see 2 items");
     assert_eq!(collect_ids(&inbox_items), vec![entry_id(20), entry_id(21)]);
@@ -226,10 +248,10 @@ fn verify_seek_after_works(db: &crate::PartitionDb) {
 
     let entries: Vec<_> = (1..=5).map(default_entry).collect();
 
-    let mut reader = db.new_inbox_reader(&qid);
+    let mut reader = inbox_reader(db, &qid);
 
     // Seek after the 3rd entry (id=3)
-    reader.seek_after(&qid, &entries[2].0);
+    reader.seek_after(&entries[2].0);
     let item = reader.peek().unwrap();
     assert!(item.is_some(), "Should have items after seek_after");
     // Next item should be entry 4 (strictly after entry 3)
@@ -251,7 +273,7 @@ fn verify_empty_queue_returns_none(db: &crate::PartitionDb) {
         "Empty running queue should return None"
     );
 
-    let mut inbox_reader = db.new_inbox_reader(&qid);
+    let mut inbox_reader = inbox_reader(db, &qid);
     inbox_reader.seek_to_first();
     assert!(
         inbox_reader.peek().unwrap().is_none(),
@@ -283,17 +305,17 @@ fn verify_vqueue_isolation(db: &crate::PartitionDb) {
     let qid3 = VQueueId::custom(pkey, "3");
 
     // Each queue should only see its own entry
-    let mut reader1 = db.new_inbox_reader(&qid1);
+    let mut reader1 = inbox_reader(db, &qid1);
     let items1 = collect_cursor(&mut reader1);
     assert_eq!(items1.len(), 1);
     assert_eq!(*items1[0].0.entry_id(), entry_id(1));
 
-    let mut reader2 = db.new_inbox_reader(&qid2);
+    let mut reader2 = inbox_reader(db, &qid2);
     let items2 = collect_cursor(&mut reader2);
     assert_eq!(items2.len(), 1);
     assert_eq!(*items2[0].0.entry_id(), entry_id(2));
 
-    let mut reader3 = db.new_inbox_reader(&qid3);
+    let mut reader3 = inbox_reader(db, &qid3);
     let items3 = collect_cursor(&mut reader3);
     assert_eq!(items3.len(), 1);
     assert_eq!(*items3[0].0.entry_id(), entry_id(3));
@@ -320,7 +342,7 @@ fn verify_waiting_cursor_boundary_is_respected(db: &crate::PartitionDb) {
     let qid_b = VQueueId::custom(pkey, "b");
     let a2 = entry(12, false, 10, 2);
 
-    let mut reader_a = db.new_inbox_reader(&qid_a);
+    let mut reader_a = inbox_reader(db, &qid_a);
     reader_a.seek_to_first();
 
     assert_eq!(
@@ -339,13 +361,13 @@ fn verify_waiting_cursor_boundary_is_respected(db: &crate::PartitionDb) {
         "Reader for qid_a must stop before qid_b entries"
     );
 
-    reader_a.seek_after(&qid_a, &a2.0);
+    reader_a.seek_after(&a2.0);
     assert!(
         reader_a.peek().unwrap().is_none(),
         "seek_after(last_item) must not cross into the next vqueue"
     );
 
-    let mut reader_b = db.new_inbox_reader(&qid_b);
+    let mut reader_b = inbox_reader(db, &qid_b);
     let items_b = collect_cursor(&mut reader_b);
     assert_eq!(collect_ids(&items_b), vec![entry_id(21)]);
 }
@@ -376,7 +398,7 @@ fn verify_waiting_cursor_partition_prefix_boundary_is_respected(db: &crate::Part
     let qid_next_partition = VQueueId::custom(PartitionKey::from(5_201u64), "shared-boundary");
     let target2 = entry(42, false, 10, 2);
 
-    let mut reader_target = db.new_inbox_reader(&qid_target_partition);
+    let mut reader_target = inbox_reader(db, &qid_target_partition);
     reader_target.seek_to_first();
 
     assert_eq!(
@@ -403,33 +425,33 @@ fn verify_waiting_cursor_partition_prefix_boundary_is_respected(db: &crate::Part
         "Reader for target qid must stop before adjacent partition keys"
     );
 
-    reader_target.seek_after(&qid_target_partition, &target2.0);
+    reader_target.seek_after(&target2.0);
     assert!(
         reader_target.peek().unwrap().is_none(),
         "seek_after(last_item) must not cross into adjacent partition-key prefixes"
     );
 
-    let mut reader_prev = db.new_inbox_reader(&qid_prev_partition);
+    let mut reader_prev = inbox_reader(db, &qid_prev_partition);
     assert_eq!(
         collect_ids(&collect_cursor(&mut reader_prev)),
         vec![entry_id(31)]
     );
 
-    let mut reader_next = db.new_inbox_reader(&qid_next_partition);
+    let mut reader_next = inbox_reader(db, &qid_next_partition);
     assert_eq!(
         collect_ids(&collect_cursor(&mut reader_next)),
         vec![entry_id(51)]
     );
 }
 
-/// Test: Tailing iterator sees newly enqueued items after seek_to_first.
+/// Test: a freshly created reader sees the current state of storage,
+/// including writes that landed after a previous reader was created.
 ///
-/// This verifies that the inbox reader (which uses a tailing iterator) can see
-/// items that were added after the reader was created, when re-seeking.
-async fn tailing_iterator_sees_new_items_on_reseek(rocksdb: &mut PartitionStore) { +/// This is the snapshot-semantics counterpart to the previous tailing-iterator +/// tests: callers must construct a new reader to observe post-creation writes. +async fn fresh_reader_sees_current_state(rocksdb: &mut PartitionStore) { let qid = VQueueId::custom(6000, "1"); - // Insert initial entries let entry1 = default_entry(1); let entry2 = default_entry(2); { @@ -439,586 +461,60 @@ async fn tailing_iterator_sees_new_items_on_reseek(rocksdb: &mut PartitionStore) txn.commit().await.expect("commit should succeed"); } - // Create reader and verify initial state - let db = rocksdb.partition_db(); - let mut reader = db.new_inbox_reader(&qid); - reader.seek_to_first(); - - let item = reader.peek().unwrap(); - assert_eq!(item.as_ref().map(|e| *e.0.entry_id()), Some(entry_id(1))); - reader.advance(); - - let item = reader.peek().unwrap(); - assert_eq!(item.as_ref().map(|e| *e.0.entry_id()), Some(entry_id(2))); - reader.advance(); - - // Should be empty now - assert!(reader.peek().unwrap().is_none()); - - // Now add a new entry while the reader is still open - let entry3 = default_entry(3); + // Create the original reader and drain it. { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry3.0, &entry3.1); - txn.commit().await.expect("commit should succeed"); - } - - // Re-seek to first - should now see all 3 entries - reader.seek_to_first(); - let items = { - let mut items = Vec::new(); - while let Ok(Some(item)) = reader.peek() { - items.push(item); - reader.advance(); - } - items - }; - - assert_eq!(items.len(), 3, "Should see all 3 items after reseek"); - assert_eq!( - collect_ids(&items), - vec![entry_id(1), entry_id(2), entry_id(3)] - ); -} - -/// Test: Re-seek to first sees newly inserted higher-order items. 
-/// -/// This covers both invariants mentioned in the module docs: -/// - new item with older `run_at` appears first after re-seek -/// - new item with `has_lock=true` appears first after re-seek -async fn reseek_shows_new_higher_order_items(rocksdb: &mut PartitionStore) { - let qid = VQueueId::custom(6_500, "1"); - - let base1 = entry(1, false, 100, 1); - let base2 = entry(2, false, 200, 2); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &base1.0, &base1.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &base2.0, &base2.1); - txn.commit().await.expect("commit should succeed"); - } - - let db = rocksdb.partition_db(); - let mut reader = db.new_inbox_reader(&qid); - reader.seek_to_first(); - assert_eq!(*reader.peek().unwrap().unwrap().0.entry_id(), entry_id(1)); - - // Add an item with older run_at (higher order among unlocked entries) - let older_run_at = entry(3, false, 50, 3); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &older_run_at.0, &older_run_at.1); - txn.commit().await.expect("commit should succeed"); - } - - reader.seek_to_first(); - assert_eq!(*reader.peek().unwrap().unwrap().0.entry_id(), entry_id(3)); - - // Add an item that has a lock (always higher order than unlocked entries) - let locked = entry(4, true, 300, 4); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &locked.0, &locked.1); - txn.commit().await.expect("commit should succeed"); - } - - reader.seek_to_first(); - let items = { - let mut items = Vec::new(); - while let Ok(Some(item)) = reader.peek() { - items.push(item); - reader.advance(); - } - items - }; - - assert_eq!( - collect_ids(&items), - vec![entry_id(4), entry_id(3), entry_id(1), entry_id(2)] - ); -} - -/// Test: Tailing iterator sees newly enqueued items via seek_after. -/// -/// This verifies that when using seek_after to resume iteration, newly added -/// items that sort after the seek position are visible. -async fn tailing_iterator_sees_new_items_on_seek_after(rocksdb: &mut PartitionStore) { - let qid = VQueueId::custom(7000, "1"); - - // Insert initial entries with different key prefixes - let entry_first = entry(1, true, 10, 1); - let entry_second = entry(2, false, 10, 2); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry_first.0, &entry_first.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry_second.0, &entry_second.1); - txn.commit().await.expect("commit should succeed"); - } - - // Create reader and read the first item - let db = rocksdb.partition_db(); - let mut reader = db.new_inbox_reader(&qid); - reader.seek_to_first(); - - let first = reader.peek().unwrap().unwrap(); - assert_eq!(*first.0.entry_id(), entry_id(1)); - - // Now add a new entry that sorts after the seek position - let entry_third = entry(3, false, 10, 3); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry_third.0, &entry_third.1); - txn.commit().await.expect("commit should succeed"); - } - - // Seek after the first item - should see entry_second and entry_third - reader.seek_after(&qid, &first.0); - - let mut remaining = Vec::new(); - while let Ok(Some(item)) = reader.peek() { - remaining.push(item); - reader.advance(); - } - - assert_eq!(remaining.len(), 2, "Should see 2 items after seek_after"); - assert_eq!(collect_ids(&remaining), vec![entry_id(2), entry_id(3)]); -} - -/// Test: Tailing iterator sees items inserted ahead of current position without re-seek. 
-/// -/// Scenario (1): after the cursor has advanced at least once, if a new item is inserted -/// at a key greater than the current position, it should become visible by continuing to -/// call `advance()`/`peek()` without any seek. -async fn tailing_iterator_sees_inserted_ahead_without_reseek(rocksdb: &mut PartitionStore) { - let qid = VQueueId::custom(7_200, "1"); - - let entry1 = entry(1, false, 10, 1); - let entry3 = entry(3, false, 30, 3); - let entry5 = entry(5, false, 50, 5); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry1.0, &entry1.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry3.0, &entry3.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry5.0, &entry5.1); - txn.commit().await.expect("commit should succeed"); - } - - let db = rocksdb.partition_db(); - let mut reader = db.new_inbox_reader(&qid); - reader.seek_to_first(); - - assert_eq!(*reader.peek().unwrap().unwrap().0.entry_id(), entry_id(1)); - reader.advance(); - assert_eq!(*reader.peek().unwrap().unwrap().0.entry_id(), entry_id(3)); - - let entry4 = entry(4, false, 40, 4); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry4.0, &entry4.1); - txn.commit().await.expect("commit should succeed"); - } - - // Continue from current position without seek. - reader.advance(); - let maybe_inserted = reader.peek().unwrap().as_ref().map(|e| *e.0.entry_id()); - assert_eq!( - maybe_inserted, - Some(entry_id(4)), - "Inserted item ahead of current position should be visible without re-seek" - ); - reader.advance(); - assert_eq!(*reader.peek().unwrap().unwrap().0.entry_id(), entry_id(5)); -} - -/// Test: Tailing iterator sees flushed insertions while mid-iteration without re-seek. -/// -/// Scenario: the cursor advances a couple of times and still has items to read. -/// New entries are inserted ahead of the current position, but before the next -/// existing item (splicing), and memtables are flushed. -/// Continuing with `advance()`/`peek()` (without seek) should surface the flushed items. -async fn tailing_iterator_sees_flushed_insertions_mid_iteration(rocksdb: &mut PartitionStore) { - let qid = VQueueId::custom(7_250, "1"); - - let entry1 = entry(1, false, 10, 1); - let entry3 = entry(3, false, 30, 3); - let entry6 = entry(6, false, 60, 6); - let entry9 = entry(9, false, 90, 9); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry1.0, &entry1.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry3.0, &entry3.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry6.0, &entry6.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry9.0, &entry9.1); - txn.commit().await.expect("commit should succeed"); - } - - let db = rocksdb.partition_db(); - let mut reader = db.new_inbox_reader(&qid); - reader.seek_to_first(); - - assert_eq!(*reader.peek().unwrap().unwrap().0.entry_id(), entry_id(1)); - reader.advance(); - assert_eq!(*reader.peek().unwrap().unwrap().0.entry_id(), entry_id(3)); - - // Splice entries between current position (3) and next existing item (6). 
- let entry4 = entry(4, false, 40, 4); - let entry5 = entry(5, false, 50, 5); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry4.0, &entry4.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry5.0, &entry5.1); - txn.commit().await.expect("commit should succeed"); + let db = rocksdb.partition_db(); + let mut reader = inbox_reader(db, &qid); + let items = collect_cursor(&mut reader); + assert_eq!(collect_ids(&items), vec![entry_id(1), entry_id(2)]); } - rocksdb - .partition_db() - .flush_memtables(true) - .await - .expect("flush memtables should succeed"); - - // Continue from current position without seek. - reader.advance(); - let mut remaining_ids = Vec::new(); - while let Ok(Some(item)) = reader.peek() { - remaining_ids.push(*item.0.entry_id()); - reader.advance(); - } - - assert_eq!( - remaining_ids, - vec![entry_id(4), entry_id(5), entry_id(6), entry_id(9)], - "Spliced flushed items should be visible and not skipped without re-seek" - ); -} - -/// Test: Seeked tailing iterator sees spliced insertions after a pre-insert flush. -/// -/// Scenario: perform one seek to position the cursor on the item immediately -/// before the splice point, flush memtables, then insert a new item in the middle. -/// Advancing from that seeked position should return the new spliced item first. -async fn seeked_tailing_iterator_sees_spliced_insertions_after_preflush( - rocksdb: &mut PartitionStore, -) { - let qid = VQueueId::custom(7_255, "1"); - - let entry1 = entry(1, false, 10, 1); - let entry3 = entry(3, false, 30, 3); - let entry6 = entry(6, false, 60, 6); - let entry9 = entry(9, false, 90, 9); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry1.0, &entry1.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry3.0, &entry3.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry6.0, &entry6.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry9.0, &entry9.1); - txn.commit().await.expect("commit should succeed"); - } - - let db = rocksdb.partition_db(); - let mut reader = db.new_inbox_reader(&qid); - - rocksdb - .partition_db() - .flush_memtables(true) - .await - .expect("flush memtables should succeed"); - - // Single seek before inserting new items: land on entry3. - reader.seek_after(&qid, &entry1.0); - assert_eq!(*reader.peek().unwrap().unwrap().0.entry_id(), entry_id(3)); - - // Splice one entry between current position (3) and next existing item (6). - let entry4 = entry(4, false, 40, 4); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry4.0, &entry4.1); - txn.commit().await.expect("commit should succeed"); - } - - // Continue from the seeked position without another seek. - reader.advance(); - - // This is the proof that we need to re-seek because the iterator will be blind - // to the newly added item if the seek happened after the mutable memtable flush. - assert_ne!( - *reader.peek().unwrap().unwrap().0.entry_id(), - entry_id(4), - "advance from seeked predecessor should surface the spliced item first" - ); - - // Re-seeking should make it visible again. 
- reader.seek_after(&qid, &entry1.0); - reader.advance(); - assert_eq!( - *reader.peek().unwrap().unwrap().0.entry_id(), - entry_id(4), - "advance from seeked predecessor should surface the spliced item first" - ); - - reader.advance(); - let mut tail_ids = Vec::new(); - while let Ok(Some(item)) = reader.peek() { - tail_ids.push(*item.0.entry_id()); - reader.advance(); - } - - assert_eq!( - tail_ids, - vec![entry_id(6), entry_id(9)], - "Remaining tail after the spliced item should keep original order" - ); -} - -/// Test: Seeked tailing iterator sees appended insertions after a pre-insert flush. -/// -/// Scenario: perform one seek, flush memtables, then insert new items that are strictly -/// after the existing tail. Continuing with `advance()`/`peek()` from that seeked -/// position should eventually surface the appended items. -async fn seeked_tailing_iterator_sees_appended_insertions_after_preflush( - rocksdb: &mut PartitionStore, -) { - let qid = VQueueId::custom(7_256, "1"); - - let entry1 = entry(1, false, 10, 1); - let entry3 = entry(3, false, 30, 3); - let entry6 = entry(6, false, 60, 6); - let entry9 = entry(9, false, 90, 9); + // Append a new item. + let entry3 = default_entry(3); { let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry1.0, &entry1.1); txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry3.0, &entry3.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry6.0, &entry6.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry9.0, &entry9.1); txn.commit().await.expect("commit should succeed"); } + // A fresh reader sees all three items. let db = rocksdb.partition_db(); - let mut reader = db.new_inbox_reader(&qid); - - // Single seek before appending new tail items: land on entry3. - reader.seek_after(&qid, &entry1.0); - assert_eq!(*reader.peek().unwrap().unwrap().0.entry_id(), entry_id(3)); - - rocksdb - .partition_db() - .flush_memtables(true) - .await - .expect("flush memtables should succeed"); - - let entry10 = entry(10, false, 100, 10); - let entry11 = entry(11, false, 110, 11); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry10.0, &entry10.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry11.0, &entry11.1); - txn.commit().await.expect("commit should succeed"); - } - - // Continue from the seeked position without another seek. - reader.advance(); - let mut remaining_ids = Vec::new(); - while let Ok(Some(item)) = reader.peek() { - remaining_ids.push(*item.0.entry_id()); - reader.advance(); - } - + let mut reader = inbox_reader(db, &qid); + let items = collect_cursor(&mut reader); assert_eq!( - remaining_ids, - vec![entry_id(6), entry_id(9), entry_id(10), entry_id(11)], - "Appended flushed items should be visible after traversing existing tail" - ); -} - -/// Test: Tailing iterator sees items inserted after reaching the end without re-seek. -/// -/// Scenario (2): after the cursor reaches end-of-iteration, if a new item is inserted, -/// continuing with `advance()`/`peek()` (without seek) should surface the new item. 
-async fn tailing_iterator_sees_item_added_after_end_without_reseek(rocksdb: &mut PartitionStore) { - let qid = VQueueId::custom(7_300, "1"); - - let entry1 = entry(1, false, 10, 1); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry1.0, &entry1.1); - txn.commit().await.expect("commit should succeed"); - } - - let db = rocksdb.partition_db(); - let mut reader = db.new_inbox_reader(&qid); - reader.seek_to_first(); - - assert_eq!(*reader.peek().unwrap().unwrap().0.entry_id(), entry_id(1)); - reader.advance(); - assert!(reader.peek().unwrap().is_none(), "Reader should be at end"); - - let entry2 = entry(2, false, 20, 2); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry2.0, &entry2.1); - txn.commit().await.expect("commit should succeed"); - } - - // Stay in the same iterator path, no re-seek. - reader.advance(); - assert!( - reader.peek().unwrap().is_none(), - "Inserted item after end is not visible without re-seek" + collect_ids(&items), + vec![entry_id(1), entry_id(2), entry_id(3)] ); - - // Re-seeking makes the newly inserted item visible. - reader.seek_after(&qid, &entry1.0); - assert_eq!(*reader.peek().unwrap().unwrap().0.entry_id(), entry_id(2)); } -/// Test: Deleted items don't appear after reseek. -/// -/// This verifies that when an item is deleted while the reader is open, -/// re-seeking will not return the deleted item. -async fn deleted_items_not_visible_after_reseek(rocksdb: &mut PartitionStore) { - let qid = VQueueId::custom(8000, "1"); +/// Test: an existing reader holds a snapshot — writes after creation are not +/// observable, even after `seek_to_first` or `seek_after`. +async fn existing_reader_does_not_see_post_snapshot_writes(rocksdb: &mut PartitionStore) { + let qid = VQueueId::custom(6_100, "1"); - // Insert initial entries let entry1 = default_entry(1); let entry2 = default_entry(2); - let entry3 = default_entry(3); { let mut txn = rocksdb.transaction(); txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry1.0, &entry1.1); txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry2.0, &entry2.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry3.0, &entry3.1); txn.commit().await.expect("commit should succeed"); } - // Create reader and verify we see all 3 let db = rocksdb.partition_db(); - let mut reader = db.new_inbox_reader(&qid); - let items = collect_cursor(&mut reader); - assert_eq!(items.len(), 3); - - // Delete the middle entry while the reader is still open - { - let mut txn = rocksdb.transaction(); - assert!( - txn.get_vqueue_inbox(&qid, Stage::Inbox, &entry2.0) - .unwrap() - .is_some() - ); - txn.delete_vqueue_inbox(&qid, Stage::Inbox, &entry2.0); - txn.commit().await.expect("commit should succeed"); - } - - // Re-seek and verify we only see entries 1 and 3 + let mut reader = inbox_reader(db, &qid); reader.seek_to_first(); - let items = { - let mut items = Vec::new(); - while let Ok(Some(item)) = reader.peek() { - items.push(item); - reader.advance(); - } - items - }; - - assert_eq!(items.len(), 2, "Should only see 2 items after deletion"); - assert_eq!(collect_ids(&items), vec![entry_id(1), entry_id(3)]); -} - -/// Test: Deleted items don't appear after seek_after. -/// -/// This verifies that deleted items are not returned when using seek_after -/// to resume iteration past a certain point. 
-async fn deleted_items_not_visible_after_seek_after(rocksdb: &mut PartitionStore) { - let qid = VQueueId::custom(8500, "1"); + assert_eq!(*reader.peek().unwrap().unwrap().0.entry_id(), entry_id(1)); - // Insert entries - let entry1 = default_entry(1); - let entry2 = default_entry(2); + // Insert a new item after the reader has taken its snapshot. let entry3 = default_entry(3); { let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry1.0, &entry1.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry2.0, &entry2.1); txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry3.0, &entry3.1); txn.commit().await.expect("commit should succeed"); } - // Create reader - let db = rocksdb.partition_db(); - let mut reader = db.new_inbox_reader(&qid); - reader.seek_to_first(); - - // Read first item - let first = reader.peek().unwrap().unwrap(); - assert_eq!(*first.0.entry_id(), entry_id(1)); - - // Delete entries 2 and 3 while reader is open - { - let mut txn = rocksdb.transaction(); - assert!( - txn.get_vqueue_inbox(&qid, Stage::Inbox, &entry2.0) - .unwrap() - .is_some() - ); - txn.delete_vqueue_inbox(&qid, Stage::Inbox, &entry2.0); - assert!( - txn.get_vqueue_inbox(&qid, Stage::Inbox, &entry3.0) - .unwrap() - .is_some() - ); - txn.delete_vqueue_inbox(&qid, Stage::Inbox, &entry3.0); - txn.commit().await.expect("commit should succeed"); - } - - // Seek after first - should see nothing since 2 and 3 are deleted - reader.seek_after(&qid, &first.0); - assert!( - reader.peek().unwrap().is_none(), - "Should see no items after seek_after when remaining items are deleted" - ); -} - -/// Test: Concurrent enqueue and delete operations are handled correctly. -/// -/// This tests a more complex scenario where items are both added and removed -/// while the reader is open. -async fn concurrent_enqueue_and_delete(rocksdb: &mut PartitionStore) { - let qid = VQueueId::custom(9000, "1"); - - // Insert initial entries in deterministic key order - let entry_high = entry(10, true, 10, 10); - let entry_mid = entry(20, false, 20, 20); - let entry_low = entry(30, false, 30, 30); - { - let mut txn = rocksdb.transaction(); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry_high.0, &entry_high.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry_mid.0, &entry_mid.1); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry_low.0, &entry_low.1); - txn.commit().await.expect("commit should succeed"); - } - - // Create reader and read first item - let db = rocksdb.partition_db(); - let mut reader = db.new_inbox_reader(&qid); - reader.seek_to_first(); - - let first = reader.peek().unwrap().unwrap(); - assert_eq!(*first.0.entry_id(), entry_id(10)); - - // Simultaneously: delete entry_mid, add a new entry that sorts first - let entry_new_first = entry(5, true, 5, 5); - { - let mut txn = rocksdb.transaction(); - assert!( - txn.get_vqueue_inbox(&qid, Stage::Inbox, &entry_mid.0) - .unwrap() - .is_some() - ); - txn.delete_vqueue_inbox(&qid, Stage::Inbox, &entry_mid.0); - txn.put_vqueue_inbox(&qid, Stage::Inbox, &entry_new_first.0, &entry_new_first.1); - txn.commit().await.expect("commit should succeed"); - } - - // Re-seek from start - should see: id=5, id=10, id=30 - // (id=20 is deleted, id=5 is added) + // Re-seeking the same reader still only shows the snapshot's two items. 
reader.seek_to_first(); let items = { let mut items = Vec::new(); @@ -1028,12 +524,7 @@ async fn concurrent_enqueue_and_delete(rocksdb: &mut PartitionStore) { } items }; - - assert_eq!(items.len(), 3, "Should see 3 items"); - assert_eq!( - collect_ids(&items), - vec![entry_id(5), entry_id(10), entry_id(30)] - ); + assert_eq!(collect_ids(&items), vec![entry_id(1), entry_id(2)]); } pub(crate) async fn run_tests(mut rocksdb: PartitionStore) { @@ -1062,16 +553,8 @@ pub(crate) async fn run_tests(mut rocksdb: PartitionStore) { verify_waiting_cursor_boundary_is_respected(db); verify_waiting_cursor_partition_prefix_boundary_is_respected(db); - // Tailing iterator tests - these need mutable access to rocksdb for writes - tailing_iterator_sees_new_items_on_reseek(&mut rocksdb).await; - reseek_shows_new_higher_order_items(&mut rocksdb).await; - tailing_iterator_sees_new_items_on_seek_after(&mut rocksdb).await; - tailing_iterator_sees_inserted_ahead_without_reseek(&mut rocksdb).await; - tailing_iterator_sees_flushed_insertions_mid_iteration(&mut rocksdb).await; - seeked_tailing_iterator_sees_spliced_insertions_after_preflush(&mut rocksdb).await; - seeked_tailing_iterator_sees_appended_insertions_after_preflush(&mut rocksdb).await; - tailing_iterator_sees_item_added_after_end_without_reseek(&mut rocksdb).await; - deleted_items_not_visible_after_reseek(&mut rocksdb).await; - deleted_items_not_visible_after_seek_after(&mut rocksdb).await; - concurrent_enqueue_and_delete(&mut rocksdb).await; + // Snapshot-iterator tests — exercise the contract that a fresh reader + // sees current storage and that an existing reader holds a fixed view. + fresh_reader_sees_current_state(&mut rocksdb).await; + existing_reader_does_not_see_post_snapshot_writes(&mut rocksdb).await; } diff --git a/crates/partition-store/src/vqueue_table/reader.rs b/crates/partition-store/src/vqueue_table/reader.rs index 99a1870f0a..ab8bfface0 100644 --- a/crates/partition-store/src/vqueue_table/reader.rs +++ b/crates/partition-store/src/vqueue_table/reader.rs @@ -8,7 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
-use restate_storage_api::vqueue_table::VQueueStore;
+use restate_storage_api::vqueue_table::{Options, VQueueStore};
 use restate_types::vqueues::VQueueId;
 
 use crate::PartitionDb;
@@ -24,7 +24,7 @@ impl VQueueStore for PartitionDb {
         VQueueRunningReader::new(self, qid)
     }
 
-    fn new_inbox_reader(&self, qid: &VQueueId) -> Self::InboxReader {
-        VQueueWaitingReader::new(self, qid)
+    fn new_inbox_reader(&self, qid: &VQueueId, opts: Options) -> Self::InboxReader {
+        VQueueWaitingReader::new(self, qid, opts)
     }
 }
diff --git a/crates/partition-store/src/vqueue_table/running_reader.rs b/crates/partition-store/src/vqueue_table/running_reader.rs
index cc21bfba67..f8d44aa5a5 100644
--- a/crates/partition-store/src/vqueue_table/running_reader.rs
+++ b/crates/partition-store/src/vqueue_table/running_reader.rs
@@ -13,7 +13,7 @@ use bilrost::OwnedMessage;
 use rocksdb::DBRawIteratorWithThreadMode;
 
 use restate_storage_api::StorageError;
-use restate_storage_api::vqueue_table::{EntryKey, EntryValue, VQueueCursor};
+use restate_storage_api::vqueue_table::{EntryKey, EntryValue, VQueueRunningCursor};
 use restate_types::vqueues::VQueueId;
 
 use crate::PartitionDb;
@@ -60,57 +60,15 @@ impl VQueueRunningReader {
     }
 }
 
-impl VQueueCursor for VQueueRunningReader {
+impl VQueueRunningCursor for VQueueRunningReader {
     fn seek_to_first(&mut self) {
         self.it.seek_to_first();
     }
 
-    fn seek_after(&mut self, _qid: &VQueueId, _key: &EntryKey) {
-        panic!("seek_after is not supported for running snapshot reader");
-    }
-
     fn advance(&mut self) {
         self.it.next();
     }
 
-    /// Returns the current key under cursor
-    fn current_key(&mut self) -> Result<Option<EntryKey>, StorageError> {
-        if let Some(key) = self.it.key() {
-            debug_assert_eq!(key.len(), RunningKey::serialized_length_fixed());
-
-            // The portion we are interested in is everything that represents the EntryKey
-            let entry_key =
-                <EntryKey as OwnedMessage>::decode(&mut &key[RunningKey::offset_of_entry_key()..])?;
-            Ok(Some(entry_key))
-        } else {
-            // we reached the end (or an error). We cannot recover from this without seek.
-            // todo: add support for iterator refresh().
-            self.it
-                .status()
-                .context("peek into vqueue snapshot iterator")
-                .map_err(StorageError::Generic)?;
-            // iterator is beyond the end, we can't peek
-            Ok(None)
-        }
-    }
-
-    /// Returns the current value under cursor
-    fn current_value(&mut self) -> Result<Option<EntryValue>, StorageError> {
-        if let Some(mut value) = self.it.value() {
-            let value = EntryValue::decode(&mut value)?;
-            Ok(Some(value))
-        } else {
-            // we reached the end (or an error). We cannot recover from this without seek.
-            // todo: add support for iterator refresh().
-            self.it
-                .status()
-                .context("peek into vqueue snapshot iterator")
-                .map_err(StorageError::Generic)?;
-            // iterator is beyond the end, we can't peek
-            Ok(None)
-        }
-    }
-
     fn peek(&mut self) -> Result<Option<(EntryKey, EntryValue)>, StorageError> {
         if let Some((key, mut value)) = self.it.item() {
             debug_assert_eq!(key.len(), RunningKey::serialized_length_fixed());
diff --git a/crates/partition-store/src/vqueue_table/waiting_reader.rs b/crates/partition-store/src/vqueue_table/waiting_reader.rs
index 48a91b4a35..0f81d9f268 100644
--- a/crates/partition-store/src/vqueue_table/waiting_reader.rs
+++ b/crates/partition-store/src/vqueue_table/waiting_reader.rs
@@ -8,12 +8,16 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
-use anyhow::Context;
+use std::sync::Arc;
+
 use bilrost::OwnedMessage;
 use rocksdb::DBRawIteratorWithThreadMode;
 
+use restate_rocksdb::RocksDb;
 use restate_storage_api::StorageError;
-use restate_storage_api::vqueue_table::{EntryKey, EntryValue, Stage, VQueueCursor};
+use restate_storage_api::vqueue_table::{
+    CursorError, EntryKey, EntryValue, Options, Stage, VQueueCursor,
+};
 use restate_types::vqueues::VQueueId;
 
 use crate::PartitionDb;
@@ -22,20 +26,34 @@ use crate::vqueue_table::InboxKey;
 use crate::vqueue_table::inbox::InboxKeyRef;
 
 pub struct VQueueWaitingReader {
+    qid: VQueueId,
     it: DBRawIteratorWithThreadMode<'static, rocksdb::DB>,
+    // Safety: This must be dropped last since the iterator references memory allocated by it.
+    // This is only set to Some if the iterator is configured to run with blocking IO. The
+    // assumption is that blocking iterators will run in background threads, so we need to pin
+    // the database until the iterator is dropped.
+    _db: Option<Arc<RocksDb>>,
 }
 
 impl VQueueWaitingReader {
-    pub(crate) fn new(db: &PartitionDb, qid: &VQueueId) -> Self {
+    pub(crate) fn new(db: &PartitionDb, qid: &VQueueId, opts: Options) -> Self {
        let mut readopts = rocksdb::ReadOptions::default();
        readopts.set_async_io(true);
        // this is not the place to be concerned about corruption, we favor speed
        // over safety for this particular use-case.
        readopts.set_verify_checksums(false);
-        readopts.set_tailing(true);
+        // We re-create this reader on every refill, so a fresh snapshot is what
+        // we want. A tailing iterator would see new writes but is unsafe across
+        // memtable flushes.
        // use prefix extractors for efficient filtering.
        readopts.set_prefix_same_as_start(true);
+        if opts.allow_blocking_io {
+            readopts.set_read_tier(rocksdb::ReadTier::All);
+        } else {
+            readopts.set_read_tier(rocksdb::ReadTier::BlockCache);
+        }
+
        // we know how big the prefix is
        let mut key_buf = [0u8; InboxKey::by_qid_prefix_len()];
        InboxKeyRef::builder()
@@ -54,10 +72,14 @@ impl VQueueWaitingReader {
             .raw_iterator_cf_opt(db.cf_handle(), readopts);
 
         Self {
+            qid: qid.clone(),
             // Safety:
             // The iterator is guaranteed to be dropped before the database is dropped, we hold to the
             // PartitionDb in the scheduler which pins the database and the column family.
+            //
+            // We also pin the database if blocking IO is configured.
             it: unsafe { super::ignore_iterator_lifetime(it) },
+            _db: opts.allow_blocking_io.then(|| db.rocksdb().clone()),
         }
     }
 }
@@ -67,9 +89,8 @@ impl VQueueCursor for VQueueWaitingReader {
         self.it.seek_to_first();
     }
 
-    fn seek_after(&mut self, qid: &VQueueId, key: &EntryKey) {
-        tracing::trace!("Seeking after {key:?}");
-        let mut key_buf = super::inbox::encode_stage_key(Stage::Inbox, qid, key);
+    fn seek_after(&mut self, key: &EntryKey) {
+        let mut key_buf = super::inbox::encode_stage_key(Stage::Inbox, &self.qid, key);
         let success = crate::convert_to_upper_bound(&mut key_buf);
         debug_assert!(success);
         self.it.seek(key_buf);
@@ -79,57 +100,20 @@ impl VQueueCursor for VQueueWaitingReader {
         self.it.next();
     }
 
-    /// Returns the current key under cursor
-    fn current_key(&mut self) -> Result<Option<EntryKey>, StorageError> {
-        if let Some(key) = self.it.key() {
-            debug_assert_eq!(key.len(), InboxKey::serialized_length_fixed());
-
-            // The portion we are interested in is everything that represents the EntryKey
-            let entry_key =
-                <EntryKey as OwnedMessage>::decode(&mut &key[InboxKey::offset_of_entry_key()..])?;
-            Ok(Some(entry_key))
-        } else {
-            // we reached the end (or an error). We cannot recover from this without seek.
-            // todo: add support for iterator refresh().
-            self.it
-                .status()
-                .context("peek into vqueue snapshot iterator")
-                .map_err(StorageError::Generic)?;
-            // iterator is beyond the end, we can't peek
-            Ok(None)
-        }
-    }
-
-    /// Returns the current value under cursor
-    fn current_value(&mut self) -> Result<Option<EntryValue>, StorageError> {
-        if let Some(mut value) = self.it.value() {
-            let value = EntryValue::decode(&mut value)?;
-            Ok(Some(value))
-        } else {
-            // we reached the end (or an error). We cannot recover from this without seek.
-            // todo: add support for iterator refresh().
-            self.it
-                .status()
-                .context("peek into vqueue snapshot iterator")
-                .map_err(StorageError::Generic)?;
-            // iterator is beyond the end, we can't peek
-            Ok(None)
-        }
-    }
-
-    fn peek(&mut self) -> Result<Option<(EntryKey, EntryValue)>, StorageError> {
+    fn peek(&mut self) -> Result<Option<(EntryKey, EntryValue)>, CursorError> {
         if let Some((key, mut value)) = self.it.item() {
             debug_assert_eq!(key.len(), InboxKey::serialized_length_fixed());
 
             let entry_key =
                 <EntryKey as OwnedMessage>::decode(&mut &key[InboxKey::offset_of_entry_key()..])?;
-            let value = EntryValue::decode(&mut value)?;
+
+            let value = EntryValue::decode(&mut value).map_err(StorageError::BilrostDecode)?;
             Ok(Some((entry_key, value)))
         } else {
-            // We reached the end (or an error). We cannot recover from this without seek.
-            self.it
-                .status()
-                .context("peek into vqueue iterator")
-                .map_err(StorageError::Generic)?;
+            self.it.status().map_err(|err| match err.kind() {
+                rocksdb::ErrorKind::Incomplete => CursorError::WouldBlock,
+                _ => CursorError::Other(StorageError::Generic(err.into())),
+            })?;
             // iterator is beyond the end, we can't peek
             Ok(None)
         }
diff --git a/crates/rocksdb/src/background.rs b/crates/rocksdb/src/background.rs
index 5252543a07..6584d70fa0 100644
--- a/crates/rocksdb/src/background.rs
+++ b/crates/rocksdb/src/background.rs
@@ -64,7 +64,7 @@ where
         )
         .increment(1);
 
-        let span = tracing::Span::current().clone();
+        let span = tracing::Span::current();
 
         move || span.in_scope(|| self.run())
     }
@@ -75,7 +75,7 @@ where
             OP_TYPE => self.kind.as_static_str(),
         )
         .increment(1);
-        let span = tracing::Span::current().clone();
+        let span = tracing::Span::current();
 
         move || {
             span.in_scope(|| {
diff --git a/crates/storage-api/src/vqueue_table/store.rs b/crates/storage-api/src/vqueue_table/store.rs
index d882e6103f..eca01a30dd 100644
--- a/crates/storage-api/src/vqueue_table/store.rs
+++ b/crates/storage-api/src/vqueue_table/store.rs
@@ -10,30 +10,54 @@
 
 use restate_types::vqueues::VQueueId;
 
+use crate::StorageError;
+
 use super::{EntryKey, EntryValue};
-use crate::Result;
+
+pub struct Options {
+    /// Allows blocking IO when we need to read from storage; operations
+    /// that block should be performed on IO threads.
+    ///
+    /// When set to false, seek and read operations may return `WouldBlock` if
+    /// the operation cannot be performed without blocking.
+    pub allow_blocking_io: bool,
+}
 
 /// Storage for vqueue heads (e.g., RocksDB `ready_idx`).
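+///
+/// A minimal usage sketch of the non-blocking contract introduced below
+/// (hypothetical call site; `store` is any implementation of this trait):
+///
+/// ```ignore
+/// let mut cursor = store.new_inbox_reader(&qid, Options { allow_blocking_io: false });
+/// cursor.seek_to_first();
+/// match cursor.peek() {
+///     Ok(Some((key, value))) => { /* head served from the block cache */ }
+///     Ok(None) => { /* inbox is empty in this snapshot */ }
+///     Err(CursorError::WouldBlock) => { /* retry with allow_blocking_io: true on an IO thread */ }
+///     Err(CursorError::Other(err)) => { /* storage error */ }
+/// }
+/// ```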
-pub trait VQueueStore {
-    type RunningReader: VQueueCursor + Send + Unpin;
-    type InboxReader: VQueueCursor + Send + Unpin;
+pub trait VQueueStore {
+    type RunningReader: VQueueRunningCursor + Send + Unpin;
+    type InboxReader: VQueueCursor + Send + Unpin + 'static;
 
     fn new_run_reader(&self, qid: &VQueueId) -> Self::RunningReader;
-    fn new_inbox_reader(&self, qid: &VQueueId) -> Self::InboxReader;
+    fn new_inbox_reader(&self, qid: &VQueueId, opts: Options) -> Self::InboxReader;
 }
 
-/// Iterator over vqueue entries
+/// Iterator over "waiting inbox" vqueue entries
 pub trait VQueueCursor {
     /// Moves the cursor to the beginning (min key) of the vqueue
     fn seek_to_first(&mut self);
     /// Moves the cursor to point strictly after `item`
-    fn seek_after(&mut self, qid: &VQueueId, item: &EntryKey);
-    /// Returns the current key under cursor
-    fn current_key(&mut self) -> Result<Option<EntryKey>>;
-    /// Returns the current value under cursor
-    fn current_value(&mut self) -> Result<Option<EntryValue>>;
+    fn seek_after(&mut self, item: &EntryKey);
+    /// Advances the cursor.
+    fn advance(&mut self);
     /// Peek item without advancing the cursor
-    fn peek(&mut self) -> Result<Option<(EntryKey, EntryValue)>>;
+    fn peek(&mut self) -> Result<Option<(EntryKey, EntryValue)>, CursorError>;
+}
+
+/// Iterator over already running vqueue entries
+pub trait VQueueRunningCursor {
+    /// Moves the cursor to the beginning (min key) of the vqueue
+    fn seek_to_first(&mut self);
+    /// Peek item without advancing the cursor
+    fn peek(&mut self) -> Result<Option<(EntryKey, EntryValue)>, StorageError>;
     /// Advancing the cursor. If this fails, the error is returned on the next call to peek()
     fn advance(&mut self);
 }
+
+#[derive(Debug, thiserror::Error)]
+pub enum CursorError {
+    #[error("operation cannot be completed without blocking this thread")]
+    WouldBlock,
+    #[error(transparent)]
+    Other(#[from] StorageError),
+}
diff --git a/crates/vqueues/Cargo.toml b/crates/vqueues/Cargo.toml
index 6ea4b67b02..7394eb4291 100644
--- a/crates/vqueues/Cargo.toml
+++ b/crates/vqueues/Cargo.toml
@@ -18,9 +18,11 @@ restate-clock = { workspace = true }
 restate-futures-util = { workspace = true }
 restate-limiter = { workspace = true }
 restate-memory = { workspace = true }
+restate-platform = { workspace = true }
 restate-rocksdb = { workspace = true }
 restate-serde-util = { workspace = true }
 restate-storage-api = { workspace = true }
+restate-time-util = { workspace = true }
 restate-types = { workspace = true }
 restate-util-string = { workspace = true }
 restate-worker-api = { workspace = true }
diff --git a/crates/vqueues/src/scheduler/drr.rs b/crates/vqueues/src/scheduler/drr.rs
index 2d7d50ba4c..c846a0f048 100644
--- a/crates/vqueues/src/scheduler/drr.rs
+++ b/crates/vqueues/src/scheduler/drr.rs
@@ -68,7 +68,6 @@ pub struct DRRScheduler {
 }
 
 impl DRRScheduler {
-    #[allow(clippy::too_many_arguments)]
     pub fn new(
         limit_qid_per_poll: NonZeroU16,
         max_items_per_decision: NonZeroU16,
@@ -92,6 +91,7 @@ impl DRRScheduler {
                 qid.clone(),
                 handle,
                 VQueueMetaLite::new(meta),
+                &storage,
                 meta.num_running(),
             )
         });
@@ -156,13 +156,13 @@ impl DRRScheduler {
                 break;
             }
 
-            let Some(handle) = this.eligible.next_eligible(this.storage, this.q)? else {
+            let Some(handle) = this.eligible.next_eligible(cx, this.storage, this.q)? else {
                 break;
             };
 
             let qstate = this.q.get_mut(handle).unwrap();
 
-            match qstate.try_pop(cx, this.storage, this.resource_manager)? {
+            match qstate.try_pop(cx, this.resource_manager)? {
                 Pop::NeedsCredit => {
                     this.eligible.rotate_one();
                 }
diff --git a/crates/vqueues/src/scheduler/eligible.rs b/crates/vqueues/src/scheduler/eligible.rs
index dc4d281212..2b3e2ad947 100644
--- a/crates/vqueues/src/scheduler/eligible.rs
+++ b/crates/vqueues/src/scheduler/eligible.rs
@@ -110,10 +110,13 @@ impl EligibilityTracker {
 
     pub fn next_eligible(
         &mut self,
+        cx: &mut std::task::Context<'_>,
         storage: &S,
         vqueues: &mut SlotMap<VQueueHandle, VQueueState<S>>,
     ) -> Result<Option<VQueueHandle>, StorageError> {
-        loop {
+        let n = self.ready_ring.len();
+        // avoid rescanning the ready ring for multiple rounds
+        for _ in 0..n {
             // what is my current status
             let Some(handle) = self.ready_ring.front().copied() else {
                 return Ok(None);
@@ -137,12 +140,12 @@ impl EligibilityTracker {
             match current_state {
                 State::NeedsPoll => {
                     // update the state based on eligibility.
-                    match qstate.poll_eligibility(storage)?.as_compact() {
-                        Eligibility::Eligible => {
+                    match qstate.poll_eligibility(cx, storage) {
+                        Poll::Ready(Ok(Eligibility::Eligible)) => {
                             *current_state = State::Ready;
                             return Ok(Some(handle));
                         }
-                        Eligibility::EligibleAt(ts) => {
+                        Poll::Ready(Ok(Eligibility::EligibleAt(ts))) => {
                             let ts = ts.as_unix_millis();
                             let duration = ts.duration_since(SchedulerClock.now_millis());
                             let timer_key = self.delayed_eligibility.insert(handle, duration);
@@ -152,24 +155,29 @@ impl EligibilityTracker {
                             self.ready_ring.pop_front();
                             continue;
                         }
-                        Eligibility::NotEligible => {
+                        Poll::Ready(Ok(Eligibility::NotEligible)) => {
                             self.ready_ring.pop_front();
                             self.remove(handle);
                             continue;
                         }
+                        Poll::Ready(Err(err)) => {
+                            return Err(err);
+                        }
+                        Poll::Pending => {
+                            continue;
+                        }
                     }
                 }
-                State::BlockedOn(_) => {
+                State::BlockedOn(_) | State::Scheduled { .. } => {
                     self.ready_ring.pop_front();
                 }
                 State::Ready => {
                     return Ok(Some(handle));
                 }
-                State::Scheduled { .. } => {
-                    self.ready_ring.pop_front();
-                }
             }
         }
+
+        Ok(None)
     }
 
     pub fn remove(&mut self, handle: VQueueHandle) {
diff --git a/crates/vqueues/src/scheduler/queue.rs b/crates/vqueues/src/scheduler/queue.rs
index f381f3f5ac..53c60184ed 100644
--- a/crates/vqueues/src/scheduler/queue.rs
+++ b/crates/vqueues/src/scheduler/queue.rs
@@ -8,20 +8,197 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use restate_storage_api::StorageError;
-use restate_storage_api::vqueue_table::{EntryKey, EntryValue, VQueueCursor, VQueueStore};
+use std::collections::VecDeque;
+use std::num::NonZeroU32;
+use std::pin::Pin;
+use std::task::{Poll, ready};
+
+use itertools::{EitherOrBoth, Itertools as _};
+use tokio::time::Instant;
+use tracing::trace;
+
+use restate_storage_api::vqueue_table::CursorError;
+use restate_storage_api::vqueue_table::{
+    EntryKey, EntryValue, VQueueCursor, VQueueRunningCursor, VQueueStore,
+};
+use restate_storage_api::{StorageError, vqueue_table};
+use restate_time_util::DurationExt;
 use restate_types::vqueues::VQueueId;
 
 use super::UnconfirmedAssignments;
 
-#[derive(Debug)]
-enum Head {
-    /// We need a seek+read to know the head.
-    Unknown,
-    /// The current cursor's head
-    Known { key: EntryKey, value: EntryValue },
-    /// We know that we've reached the end of the vqueue
-    Empty,
-}
+/// The number of entries we are willing to keep in cache
+const INBOX_CACHE_CAPACITY: usize = 24;
+
+enum Overlay {
+    Add(EntryValue),
+    Tombstone,
+}
+
+/// In-flight async refill, plus an overlay of `notify_enqueued` /
+/// `notify_removed` events that arrived while the task was running.
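+///
+/// A sketch of the completion merge (assumed shape; `merge_join_by` comes
+/// from the `itertools` import at the top of this file, the rest are this
+/// file's types):
+///
+/// ```ignore
+/// let merged = storage_rows // sorted snapshot scan, rows >= horizon dropped
+///     .into_iter()
+///     .merge_join_by(overlay, |(k, _), (ok, _)| k.cmp(ok))
+///     .filter_map(|pair| match pair {
+///         EitherOrBoth::Left(row) => Some(row),                       // untouched storage row
+///         EitherOrBoth::Right((k, Overlay::Add(v))) => Some((k, v)),  // enqueued in flight
+///         EitherOrBoth::Right((_, Overlay::Tombstone)) => None,       // delete the snapshot never saw
+///         EitherOrBoth::Both(row, (_, Overlay::Add(_))) => Some(row), // write committed pre-snapshot
+///         EitherOrBoth::Both(_, (_, Overlay::Tombstone)) => None,     // deleted while in flight
+///     });
+/// ```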
+///
+/// # Storage-write-before-notify invariant
+///
+/// The partition processor commits inbox writes/deletes to RocksDB *before*
+/// dispatching `EnqueuedToInbox` / `RemovedFromInbox` events to the
+/// scheduler. This ordering rule underpins the following overlay reasoning:
+///
+/// 1. **Why tombstones are needed at all.** The refill task's RocksDB
+///    snapshot is fixed at task-start time. A `notify_removed` that arrives
+///    while the task is in flight may correspond to a delete that committed
+///    *before* the task started its scan (snapshot still sees the row) or
+///    *after* (snapshot doesn't). We can't tell from the notification
+///    alone, so we record a tombstone and let `merge_join_by` suppress the
+///    row if it shows up in the result.
+///
+/// 2. **Why `push_added_item` panics on a key it already tombstoned.** A
+///    later add of a key we previously tombstoned would imply the
+///    notifications arrived out-of-order with respect to storage commits,
+///    which the invariant rules out.
+///
+/// 3. **Why cancelling an in-flight task discards the overlay safely** (see
+///    [`RefillState::update_anchor`]). When cancellation kicks in, every
+///    pending notification has already had its storage write committed.
+///    The next fresh refill takes a new snapshot and sees post-commit
+///    state directly, so the overlay would be redundant.
+struct RefillTask {
+    started_at: Instant,
+    refill_anchor: Option<EntryKey>,
+    /// Items that are known to be added or removed while the refill was
+    /// in-flight. Sorted ascending by EntryKey.
+    overlay: VecDeque<(EntryKey, Overlay)>,
+    handle: tokio::task::JoinHandle<Result<Vec<(EntryKey, EntryValue)>, StorageError>>,
+    /// Exclusive upper bound on the overlay's coverage. `None` until the
+    /// first at-capacity event; once set, only shrinks.
+    ///
+    /// On overflow we set this (exclusive upper bound) instead of
+    /// evicting from the overlay. Overlay events and merge-time storage rows
+    /// with `key >= horizon` are then dropped; the next refill rediscovers
+    /// them via `seek_after(cache.back)` (always below `horizon`). Avoids
+    /// the silent-drop hazard of back-eviction — a popped tombstone could
+    /// re-admit a deleted row — at the cost of re-fetching some
+    /// already-read storage rows.
+    horizon: Option<EntryKey>,
+}
+
+impl RefillTask {
+    /// Capacity/horizon handling shared by the `push_*` methods. Caller resolves
+    /// duplicates first; `pos` is the insertion position.
+    ///
+    /// Returns `Some(pos)` to insert there, or `None` if the event was dropped.
+    fn prepare_overlay_insert(&mut self, key: EntryKey, pos: usize) -> Option<usize> {
+        if self.horizon.is_some_and(|h| key >= h) {
+            return None;
+        }
+        if self.overlay.len() < INBOX_CACHE_CAPACITY {
+            return Some(pos);
+        }
+        // At capacity: shrink the horizon to seal the affected range.
+        if pos == self.overlay.len() {
+            // New key sorts past every overlay entry — it becomes the
+            // horizon itself and is not tracked.
+            self.horizon = Some(self.horizon.map_or(key, |h| h.min(key)));
+            None
+        } else {
+            // New key displaces the back; the back's key becomes the
+            // horizon (anything at-or-above it is now uncertain).
+            let back_key = self
+                .overlay
+                .back()
+                .expect("overlay at capacity must have a back")
+                .0;
+            self.horizon = Some(self.horizon.map_or(back_key, |h| h.min(back_key)));
+            self.overlay.pop_back();
+            Some(pos)
+        }
+    }
+
+    fn push_tombstone(&mut self, key_to_remove: &EntryKey) {
+        let pos = match self
+            .overlay
+            .binary_search_by_key(key_to_remove, |&(k, _)| k)
+        {
+            Ok(pos) => {
+                // Existing overlay entry → upgrade to Tombstone.
+                self.overlay.get_mut(pos).unwrap().1 = Overlay::Tombstone;
+                return;
+            }
+            Err(pos) => pos,
+        };
+        if let Some(pos) = self.prepare_overlay_insert(*key_to_remove, pos) {
+            self.overlay
+                .insert(pos, (*key_to_remove, Overlay::Tombstone));
+        }
+    }
+
+    fn push_added_item(&mut self, key_to_add: &EntryKey, value: &EntryValue) {
+        let pos = match self.overlay.binary_search_by_key(key_to_add, |&(k, _)| k) {
+            Ok(_) => panic!(
+                "Out-of-order notifications: we should not see a duplicate enqueue, or an enqueue after a removal, of the same key: {key_to_add:?}"
+            ),
+            Err(pos) => pos,
+        };
+        if let Some(pos) = self.prepare_overlay_insert(*key_to_add, pos) {
+            self.overlay
+                .insert(pos, (*key_to_add, Overlay::Add(value.clone())));
+        }
+    }
+}
+
+#[derive(derive_more::Debug)]
+enum RefillState {
+    #[debug("standby")]
+    Standby {
+        /// `seek_after` anchor for the next refill, and the upper bound used to
+        /// decide whether to accept a `notify_enqueued`. Items currently in the
+        /// cache, items previously consumed from the cache, and items in the
+        /// caller's skip set together account for every inbox key
+        /// `<= refill_anchor`. Keys `> refill_anchor` are undiscovered and will
+        /// appear on the next refill via `seek_after(refill_anchor)`.
+        refill_anchor: Option<EntryKey>,
+    },
+    #[debug("in-flight (age: {})", _0.started_at.elapsed().friendly())]
+    InFlight(Box<RefillTask>),
+}
+
+impl Default for RefillState {
+    fn default() -> Self {
+        Self::Standby {
+            refill_anchor: None,
+        }
+    }
+}
+
+impl RefillState {
+    /// Updates the standby anchor, or cancels an in-flight task and resets
+    /// to standby with the given anchor.
+    ///
+    /// Dropping an in-flight `RefillTask` here also drops its overlay
+    /// (pending `Add` / `Tombstone` records). This is safe per the
+    /// storage-write-before-notify invariant documented on [`RefillTask`]:
+    /// every notification we have observed (and thus every overlay entry)
+    /// has its storage write already committed, so the next fresh refill's
+    /// snapshot reflects them directly without needing the overlay.
+    fn update_anchor(&mut self, new_anchor: Option<EntryKey>) {
+        match self {
+            RefillState::Standby { refill_anchor } => {
+                *refill_anchor = new_anchor;
+            }
+            RefillState::InFlight(_) => {
+                trace!("Refill task was in-flight but no longer needed, cancelling it");
+                // Drop the task; the receiver going away signals the worker
+                // thread to bail. See `RefillTask` doc for why discarding
+                // the overlay is safe here.
+                *self = RefillState::Standby {
+                    refill_anchor: new_anchor,
+                }
+            }
+        }
+    }
+
+    fn is_in_flight(&self) -> bool {
+        matches!(self, Self::InFlight(_))
+    }
+}
 
 #[derive(Debug)]
@@ -38,683 +215,639 @@ pub enum QueueItem<'a> {
 }
 
 #[derive(derive_more::Debug)]
-pub(crate) enum Reader<S: VQueueStore> {
-    /// Reader was never opened and might need to scan running items
-    New { already_running: u32 },
+enum Stage<S: VQueueStore> {
+    /// Brand-new queue; running items still need to be drained first.
+    New {
+        already_running: NonZeroU32,
+        reader: Option<S::RunningReader>,
+    },
+    /// In running stage. Single-item head, single-shot reader.
     #[debug("Running")]
     Running {
-        remaining: u32,
+        head: (EntryKey, EntryValue),
         reader: S::RunningReader,
+        remaining: u32,
     },
-    #[debug("Inbox")]
-    Inbox(S::InboxReader),
-    // We can transition back to Reader::Inbox if new items have been added to the inbox
-    // but we should never return to `Running`.
-    #[debug("Closed")]
-    Closed,
+    /// In inbox stage. The queue's `items` cache is the source of truth
+    /// between refills.
+    Inbox,
+    /// Inbox is fully drained.
+    Empty,
 }

 pub(crate) struct Queue<S: VQueueStore> {
-    head: Head,
-    reader: Reader,
+    stage: Stage<S>,
+    /// Backing cache for the inbox stage.
+    /// Meaningful only when `stage` is `Inbox` or `Empty`; for
+    /// other stages it must be empty (invariant maintained by `advance` /
+    /// `remove` / `enqueue`).
+    ///
+    /// Sorted ascending by EntryKey. Front = current head.
+    items: VecDeque<(EntryKey, EntryValue)>,
+    refill_state: RefillState,
 }

 impl<S: VQueueStore> Queue<S> {
     /// Creates a new queue that must first go through the given number of running items
     /// before it switches to reading the waiting inbox.
-    pub fn new(num_running: u32) -> Self {
+    pub fn new(num_running: u32, storage: &S, qid: &VQueueId) -> Self {
+        let stage = if num_running > 0 {
+            Stage::New {
+                already_running: NonZeroU32::new(num_running).unwrap(),
+                reader: Some(storage.new_run_reader(qid)),
+            }
+        } else {
+            Stage::Inbox
+        };
+
         Self {
-            head: Head::Unknown,
-            reader: Reader::New {
-                already_running: num_running,
-            },
+            stage,
+            items: VecDeque::with_capacity(INBOX_CACHE_CAPACITY),
+            refill_state: Default::default(),
         }
     }

     /// Creates an empty queue
     pub fn new_closed() -> Self {
         Self {
-            head: Head::Empty,
-            reader: Reader::Closed,
+            stage: Stage::Empty,
+            items: VecDeque::with_capacity(INBOX_CACHE_CAPACITY),
+            refill_state: Default::default(),
         }
     }

     /// If the queue is known to be empty (no more items to dequeue)
     pub fn is_empty(&self) -> bool {
-        matches!(self.head, Head::Empty)
+        matches!(self.stage, Stage::Empty)
     }

+    /// Returns `true` iff the removed key was the head of the queue.
+    ///
+    /// While the queue is still in the running stage, this is a no-op: the
+    /// scheduler may still yield a running item after the state machine has
+    /// declared it removed, and the state machine must ignore that yield.
     pub fn remove(&mut self, key_to_remove: &EntryKey) -> bool {
-        // Can this be the known head?
-        // Yes. Perhaps it expired/ended externally.
-        // We do not do anything if the reader is still at the running stage,
-        //
-        // This means that the scheduler might still yield the "running" item after
-        // the state machine has declared it as completed/removed. The state machine
-        // must be able to handle this case and "ignore" the yield command of this item.
-        if matches!(self.reader, Reader::Closed | Reader::Inbox(..))
-            && let Head::Known { ref key, .. } = self.head
-            && key == key_to_remove
-        {
-            self.head = Head::Unknown;
-            // Ensure that next advance would re-seek to the newly added item
-            self.reader = Reader::Closed;
-            true
-        } else {
-            false
+        if matches!(
+            self.stage,
+            Stage::New { .. } | Stage::Running { .. } | Stage::Empty
+        ) {
+            return false;
         }
+
+        // We are in the Inbox/Waiting stage.
+        let Ok(pos) = self.items.binary_search_by_key(key_to_remove, |&(k, _)| k) else {
+            // This branch handles a removal for a key we don't have in cache.
+            // Such a key falls into one of these buckets:
+            //
+            // - At or below the refill anchor but not cached: it was already
+            //   consumed, is an unconfirmed assignment that has been shipped,
+            //   or was never in the inbox to begin with. Ignore it.
+            // - Above the refill anchor with no refill in flight: the next
+            //   refill takes a fresh snapshot and will simply not see the
+            //   deleted row. Ignore it.
+            // - Above the refill anchor with a refill in flight: the task's
+            //   snapshot may or may not contain the row, so we record a
+            //   tombstone in the task's overlay.
+            match self.refill_state {
+                RefillState::Standby { .. } => {}
+                RefillState::InFlight(ref mut task)
+                    if task
+                        .refill_anchor
+                        .is_none_or(|ref refill| key_to_remove > refill) =>
+                {
+                    // The refill task is in flight. We cannot determine whether this
+                    // key will impact its result, so we keep at most
+                    // INBOX_CACHE_CAPACITY tombstones to use as an overlay when the
+                    // refill completes.
+                    task.push_tombstone(key_to_remove);
+                }
+                RefillState::InFlight(_) => {
+                    // Ignore it. This item has either been shipped (unconfirmed assignment)
+                    // or was never in the inbox to start with.
+                }
+            }
+
+            return false;
+        };
+        self.items.remove(pos);
+        // The anchor stays put — even if we removed the back, the entry was
+        // also removed from storage, so `seek_after(anchor)` will skip it
+        // naturally on the next refill.
+        pos == 0
     }

-    /// Returns true if the head was changed
+    /// Returns `true` iff the new item became the head of the queue.
     pub fn enqueue(&mut self, key: &EntryKey, value: &EntryValue) -> bool {
-        match (&self.head, &self.reader) {
-            // we are only unknown if we are new and didn't read the running list yet,
-            // we might also be in a limbo state if advance() failed.
-            (_, Reader::New { .. } | Reader::Running { .. }) => { /* do nothing */ }
-            (Head::Unknown, _) => { /* do nothing */ }
-            (Head::Empty, _) => {
-                self.reader = Reader::Closed;
-                self.head = Head::Known {
-                    key: *key,
-                    value: value.clone(),
-                };
+        match self.stage {
+            Stage::New { .. } | Stage::Running { .. } => return false,
+            Stage::Empty => {
+                // The cache is already allocated and empty (kept around
+                // across the previous `Inbox -> Empty` transition). Just
+                // re-seed it and flip the marker.
+                assert!(self.items.is_empty());
+                assert!(!self.refill_state.is_in_flight());
+                self.items.push_back((*key, value.clone()));
+                self.refill_state.update_anchor(Some(*key));
+                self.stage = Stage::Inbox;
                 return true;
             }
-            (
-                Head::Known {
-                    key: current_key, ..
-                },
-                Reader::Inbox(_) | Reader::Closed,
-            ) => {
-                if key < current_key {
-                    self.head = Head::Known {
-                        key: *key,
-                        value: value.clone(),
-                    };
-                    // Ensure that next advance would re-seek to the newly added item
-                    self.reader = Reader::Closed;
-                    return true;
-                } else {
-                    // This is a temporary fix to ensure that we perform a re-seek
-                    // to fix the issue where the iterator wouldn't see the newly added
-                    // items if the memtable was flushed prior the seek.
-                    self.reader = Reader::Closed;
-                }
+            Stage::Inbox => { /* fall-through */ }
+        }
+
+        // Insert `(key, value)` into the sorted cache.
+        //
+        // Returns `true` iff the item became the new front (head). Returns
+        // `false` if the item was rejected because its key falls strictly above
+        // the cache's coverage zone (`> refill_anchor`); in that case the item
+        // will be discovered on the next refill.

+        // Priority-queue rule: ignore items strictly above our coverage zone.
+        // The bound is `refill_anchor`, NOT `cache.back`. After consuming the
+        // back of the cache, `cache.back` can be lower than `refill_anchor`,
+        // and items in `(cache.back, refill_anchor]` would be lost (the next
+        // refill's `seek_after(refill_anchor)` only returns keys strictly
+        // greater than the anchor).
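+        // Worked example (illustrative keys): cache = [K2, K5] with
+        // anchor = K9 because K7 and K9 were consumed or skipped earlier.
+        // An enqueue of K8 must be accepted (a later `seek_after(K9)` would
+        // never return it), while an enqueue of K12 is rejected and left for
+        // the next refill to discover.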
+        match self.refill_state {
+            RefillState::Standby { refill_anchor, .. }
+                if refill_anchor.is_none_or(|ref refill| key > refill) =>
+            {
+                // We cannot accept items beyond our coverage zone.
+                return false;
+            }
+            RefillState::Standby { .. } => { /* in coverage zone */ }
+            // This enqueue may or may not appear in the in-flight operation and
+            // there is no way to determine that. So, we stage it on the in-flight
+            // task's overlay until we receive the result.
+            RefillState::InFlight(ref mut task)
+                if task.refill_anchor.is_none_or(|ref refill| key > refill) =>
+            {
+                task.push_added_item(key, value);
+                return false;
+            }
+            RefillState::InFlight(ref task) => {
+                // The new item sorts below what the refill task is interested
+                // in, so we add it right now.
+                assert!(task.refill_anchor.is_some_and(|ref refill| key <= refill));
+            }
         }
-        false
-    }

-    /// Returns the head if known, or None if the queue needs advancing
-    pub fn head(&self) -> Option> {
-        match (&self.head, &self.reader) {
-            (Head::Unknown, _) => None,
-            (_, Reader::New { .. }) => None,
-            (Head::Known { key, value }, Reader::Running { .. }) => {
-                Some(QueueItem::Running { key, value })
+        let pos = match self.items.binary_search_by_key(key, |&(k, _)| k) {
+            Ok(_) => {
+                // We already know about this key, which means that the head has not
+                // changed as a result of this enqueue.
+                return false;
             }
-            (Head::Known { key, value }, Reader::Inbox(_) | Reader::Closed) => {
-                Some(QueueItem::Inbox { key, value })
+            Err(pos) => pos,
+        };
+
+        // We need to be careful about moving the refill anchor if there is an
+        // in-flight refill task.
+        //
+        // Say a refill task is in flight and we are enqueueing a key smaller
+        // than its anchor point: we are confident that the task will never
+        // return this item, but the enqueue may push us over the cache
+        // capacity.
+        //
+        // In that case we must cancel the in-flight refill and reset our
+        // refill anchor to point to the back of the cache so that the next
+        // refill can re-discover the evicted key. This is an acceptable
+        // trade-off: we only get here because the cache is well populated
+        // with recently enqueued items, so we wouldn't need the refill
+        // immediately anyway.

+        // At-cap fast path: avoid the VecDeque grow/shrink that would happen
+        // if we inserted first and evicted afterwards.
+        if self.items.len() >= INBOX_CACHE_CAPACITY {
+            if pos == self.items.len() {
+                // The new key would land at the back and be evicted on the
+                // very next step. Skip the insert, but still lower the anchor
+                // to the current back so the next refill can re-discover `key`
+                // via `seek_after(anchor)`.
+                //
+                // Cancels the in-flight task if one exists.
+                self.refill_state
+                    .update_anchor(self.items.back().map(|(k, _)| *k));
+                return false;
             }
-            (Head::Empty, _) => Some(QueueItem::None),
+            // Make room first; `pos` is unaffected because pos < old_len in
+            // this branch (we shift elements right of `pos` regardless).
+            debug_assert!(pos < self.items.len());
+            self.items.pop_back();
+            self.items.insert(pos, (*key, value.clone()));
+
+            // Cancels the in-flight task if one exists.
+            self.refill_state
+                .update_anchor(self.items.back().map(|(k, _)| *k));
+            return pos == 0;
         }
+
+        // Normal path: cache below capacity.
+        self.items.insert(pos, (*key, value.clone()));
+        // Anchor maintenance: if it was None (very first insert), set it to
+        // this key. Otherwise the precondition above guarantees
+        // `key <= anchor`, so the anchor stays put.
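+        // E.g. with anchor = K9 and cache = [K2, K5], inserting K4 yields
+        // cache = [K2, K4, K5] and the anchor remains K9 (illustrative keys).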
+        if let RefillState::Standby { refill_anchor } = &mut self.refill_state
+            && refill_anchor.is_none()
+        {
+            *refill_anchor = Some(*key);
+        }
+        pos == 0
     }

-    pub fn advance_if_needed(
+    /// Returns the head if known, or `None` if the queue needs advancing.
+    pub fn head(&self) -> Option<QueueItem<'_>> {
+        match &self.stage {
+            Stage::New { .. } => None,
+            Stage::Running { head: (k, v), .. } => Some(QueueItem::Running { key: k, value: v }),
+            Stage::Inbox => self
+                .items
+                .front()
+                .map(|(k, v)| QueueItem::Inbox { key: k, value: v }),
+            Stage::Empty => Some(QueueItem::None),
+        }
+    }
+
+    pub fn poll_advance_if_needed(
         &mut self,
+        cx: &mut std::task::Context<'_>,
         storage: &S,
         skip: &UnconfirmedAssignments,
         qid: &VQueueId,
-    ) -> Result, StorageError> {
-        // Keep advancing until the head is known
-        while matches!(self.head, Head::Unknown) {
-            self.advance(storage, skip, qid)?;
-        }
-
-        match (&self.head, &self.reader) {
-            (Head::Unknown, _) => unreachable!("head must be known"),
-            (_, Reader::New { .. }) => unreachable!("reader cannot be new after poll"),
-            (Head::Known { key, value }, Reader::Running { .. }) => {
-                Ok(QueueItem::Running { key, value })
+        effectively_empty: bool,
+        allow_blocking_io: bool,
+    ) -> Poll<Result<QueueItem<'_>, StorageError>> {
+        loop {
+            let needs_advance = match self.stage {
+                Stage::New { .. } => true,
+                Stage::Inbox => self.items.is_empty(),
+                _ => false,
+            };
+            if !needs_advance {
+                break;
             }
-            (Head::Known { key, value }, Reader::Inbox(_) | Reader::Closed) => {
-                Ok(QueueItem::Inbox { key, value })
+            if !self.try_advance()? {
+                // We cannot advance without a refill
+                if self.refill_state.is_in_flight() {
+                    ready!(self.poll_refill_task(cx, skip));
+                    // If the cache is still empty and we can't yet determine
+                    // whether the queue is exhausted, another refill task is
+                    // started and we return Pending. This happens
+                    // automatically on the next loop iteration.
+                } else {
+                    // Do we need to refill?
+                    // We don't need to refill if we are at the Inbox stage and we know
+                    // no more inbox entries are available.
+                    if effectively_empty && matches!(self.stage, Stage::Inbox) {
+                        self.stage = Stage::Empty;
+                        break;
+                    }
+                    // A) try an immediate refill if allowed
+                    match self.try_refill(storage, qid, skip, allow_blocking_io) {
+                        Ok(_) => {}
+                        Err(CursorError::WouldBlock) => {
+                            // B) start an async refill task
+                            self.start_refill_task(storage, qid);
+                        }
+                        Err(CursorError::Other(e)) => {
+                            // C) fail miserably
+                            return Poll::Ready(Err(e));
+                        }
+                    }
+                }
             }
-            (Head::Empty, _) => Ok(QueueItem::None),
         }
+
+        Poll::Ready(Ok(match &self.stage {
+            Stage::New { .. } => unreachable!("head must be resolved after advance_if_needed"),
+            Stage::Running { head: (k, v), .. } => QueueItem::Running { key: k, value: v },
+            Stage::Inbox => {
+                let (k, v) = self
+                    .items
+                    .front()
+                    .expect("inbox cache must have a head after advance_if_needed");
+                QueueItem::Inbox { key: k, value: v }
+            }
+            Stage::Empty => QueueItem::None,
+        }))
     }

     /// Advances the queue to the next item.
     ///
-    /// The queue reader will skip over items in `skip` when reading the inbox stage. When reading
-    /// the running stage, the `skip` set is ignored.
-    pub fn advance(
-        &mut self,
-        storage: &S,
-        skip: &UnconfirmedAssignments,
-        qid: &VQueueId,
-    ) -> Result<(), StorageError> {
+    /// In the inbox stage this consumes the current head (if any) and exposes
+    /// the next cached item; the cache is refilled from storage in batches of
+    /// up to [`INBOX_CACHE_CAPACITY`] when it empties. The `skip` set is
+    /// consulted only when reading the inbox stage; it is ignored when
+    /// reading the running stage.
+    ///
+    /// Returns `false` if advancing was not possible and we need to perform
+    /// a refill.
+    pub fn try_advance(&mut self) -> Result<bool, StorageError> {
+        // Split into disjoint borrows so the `Stage::Inbox` arm below can
+        // mutate both `stage` and `items` without fighting the
+        // borrow checker.
+        let Self { stage, items, .. } = self;
         loop {
-            match self.reader {
-                Reader::New { already_running } if already_running > 0 => {
-                    let mut reader = storage.new_run_reader(qid);
+            match stage {
+                Stage::New {
+                    already_running,
+                    reader,
+                } => {
+                    let mut reader = reader.take().unwrap();
                     reader.seek_to_first();
-                    let item = reader.peek()?;
-                    if let Some((key, value)) = item {
-                        self.head = Head::Known { key, value };
-                        self.reader = Reader::Running {
-                            remaining: already_running,
+                    if let Some((key, value)) = reader.peek()? {
+                        *stage = Stage::Running {
+                            head: (key, value),
                             reader,
+                            remaining: already_running.get(),
                         };
-                        break;
-                    } else {
-                        assert!(
-                            already_running > 0,
-                            "vqueue {qid:?} has no running items but its metadata says that it has {already_running} running items",
-                        );
-                        // move to inbox reading
-                        self.head = Head::Unknown;
-                        self.reader = Reader::Closed;
+                        return Ok(true);
                     }
+                    debug_assert!(
+                        false,
+                        "vqueue has no running items but its metadata says it has {}",
+                        already_running.get(),
+                    );
+                    *stage = Stage::Inbox;
                 }
-                Reader::New { .. } => {
-                    // create new inbox reader
-                    self.reader = Reader::Closed;
-                }
-                Reader::Running {
-                    ref mut reader,
-                    ref mut remaining,
+                Stage::Running {
+                    reader,
+                    remaining,
+                    head,
                 } => {
                     reader.advance();
                     *remaining = remaining.saturating_sub(1);
-                    let item = reader.peek()?;
-                    if let Some((key, value)) = item {
-                        debug_assert!(*remaining > 0);
-                        self.head = Head::Known { key, value };
-                        break;
-                    } else {
-                        debug_assert_eq!(0, *remaining);
-                        // move to inbox reading
-                        self.head = Head::Unknown;
-                        self.reader = Reader::Closed;
-                    }
-                }
-                Reader::Inbox(ref mut reader) => {
-                    reader.advance();
-                    let key = reader.current_key()?;
-                    if let Some(key) = key {
-                        if skip.contains_key(&key) {
-                            continue;
-                        }
-                        self.head = Head::Known {
-                            key,
-                            value: reader.current_value()?.unwrap(),
-                        };
-                        break;
-                    } else {
-                        // we are done reading inbox
-                        self.head = Head::Empty;
-                        self.reader = Reader::Closed;
-                        break;
-                    }
-                }
-                Reader::Closed => {
-                    match self.head {
-                        Head::Unknown => {
-                            let mut reader = storage.new_inbox_reader(qid);
-                            reader.seek_to_first();
-                            let key = reader.current_key()?;
-                            if let Some(key) = key {
-                                if skip.contains_key(&key) {
-                                    self.reader = Reader::Inbox(reader);
-                                    continue;
-                                }
-                                self.head = Head::Known {
-                                    key,
-                                    value: reader.current_value()?.unwrap(),
-                                };
-                                self.reader = Reader::Inbox(reader);
-                                break;
-                            } else {
-                                self.head = Head::Empty;
-                                self.reader = Reader::Closed;
-                            }
-                        }
-                        Head::Known { ref key, .. } => {
-                            // seek to known head first, then advance.
-                            let mut reader = storage.new_inbox_reader(qid);
-                            reader.seek_after(qid, key);
-                            let next_key = reader.current_key()?;
-                            if let Some(next_key) = next_key {
-                                if skip.contains_key(&next_key) {
-                                    self.reader = Reader::Inbox(reader);
-                                    continue;
-                                }
-                                self.head = Head::Known {
-                                    key: next_key,
-                                    value: reader.current_value()?.unwrap(),
-                                };
-                                self.reader = Reader::Inbox(reader);
-                                break;
-                            } else {
-                                self.head = Head::Empty;
-                                self.reader = Reader::Closed;
-                            }
+                    match reader.peek()? {
+                        Some(next) => {
+                            debug_assert!(*remaining > 0);
+                            *head = next;
+                            return Ok(true);
                         }
-                        Head::Empty => {
-                            // do nothing.
-                            return Ok(());
+                        None => {
+                            debug_assert_eq!(0, *remaining);
+                            *stage = Stage::Inbox;
                         }
                     }
                 }
+                Stage::Inbox => return Ok(items.pop_front().is_some()),
+                Stage::Empty => return Ok(true),
             }
         }
-        Ok(())
     }

-    pub(crate) fn remaining_in_running_stage(&self) -> u32 {
-        match self.reader {
-            Reader::New { already_running } => already_running,
-            Reader::Running { remaining, .. } => remaining,
-            Reader::Inbox(..) => 0,
-            Reader::Closed => 0,
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    use restate_clock::RoughTimestamp;
-    use restate_clock::time::MillisSinceEpoch;
-    use restate_core::TaskCenter;
-    use restate_partition_store::{PartitionDb, PartitionStore, PartitionStoreManager};
-    use restate_rocksdb::RocksDbManager;
-    use restate_storage_api::Transaction;
-    use restate_storage_api::vqueue_table::{
-        EntryId, EntryKey, EntryKind, EntryMetadata, EntryValue, Stage, Status, WriteVQueueTable,
-        stats::EntryStatistics,
-    };
-    use restate_types::clock::UniqueTimestamp;
-    use restate_types::identifiers::{PartitionId, PartitionKey};
-    use restate_types::partitions::Partition;
-    use restate_types::sharding::KeyRange;
-    use restate_types::vqueues::VQueueId;
-
-    const BASE_RUN_AT_MS: u64 = 1_744_000_000_000;
-
-    fn test_entry_at(id: u8, has_lock: bool, run_at: MillisSinceEpoch) -> (EntryKey, EntryValue) {
-        let run_at = RoughTimestamp::from_unix_millis_clamped(run_at);
-        let created_at = UniqueTimestamp::try_from(1_000u64 + id as u64).unwrap();
-        let entry_id = EntryId::new(EntryKind::Invocation, [id; EntryId::REMAINDER_LEN]);
-        let key = EntryKey::new(has_lock, run_at, id as u64, entry_id);
-        let stats = EntryStatistics::new(created_at, run_at);
-        let value = EntryValue {
-            status: if stats.first_runnable_at > created_at.to_unix_millis() {
-                Status::Scheduled
-            } else {
-                Status::New
-            },
-            metadata: EntryMetadata::default(),
-            stats,
+    /// Refills the cache with up to [`INBOX_CACHE_CAPACITY`] items from
+    /// storage if it's possible to do so without blocking the thread,
+    /// starting at the refill anchor (or the very first key if no anchor is
+    /// set yet). Items in the `skip` set are not added to the cache, but the
+    /// anchor advances past them so they are not reconsidered on the next
+    /// refill.
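+    ///
+    /// For illustration (hypothetical keys): with the anchor at K9 and
+    /// storage holding K10..K40, a refill seeks past K9, caches the next
+    /// [`INBOX_CACHE_CAPACITY`] keys (minus any in `skip`), and leaves the
+    /// anchor at the highest key visited so the next refill resumes there.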
+ fn try_refill( + &mut self, + storage: &S, + qid: &VQueueId, + skip: &UnconfirmedAssignments, + allow_blocking_io: bool, + ) -> Result<(), CursorError> { + let start = Instant::now(); + let mut reader = storage.new_inbox_reader(qid, vqueue_table::Options { allow_blocking_io }); + let RefillState::Standby { refill_anchor } = &mut self.refill_state else { + panic!("refill state must be standby"); }; - (key, value) - } - - fn test_entry(id: u8, has_lock: bool, run_at_ms: u64) -> (EntryKey, EntryValue) { - test_entry_at( - id, - has_lock, - MillisSinceEpoch::new(BASE_RUN_AT_MS + run_at_ms), - ) - } - - fn default_entry(id: u8) -> (EntryKey, EntryValue) { - test_entry(id, false, 0) - } - - fn test_qid(partition_key: u64) -> VQueueId { - VQueueId::custom(partition_key, "1") - } - - async fn storage_test_environment() -> PartitionStore { - let rocksdb_manager = RocksDbManager::init(); - TaskCenter::set_on_shutdown(Box::pin(async { - rocksdb_manager.shutdown().await; - })); - - let manager = PartitionStoreManager::create() - .await - .expect("DB storage creation succeeds"); - manager - .open( - &Partition::new(PartitionId::MIN, KeyRange::new(0, PartitionKey::MAX - 1)), - None, - ) - .await - .expect("DB storage creation succeeds") - } - - async fn insert_entries( - rocksdb: &mut PartitionStore, - qid: &VQueueId, - stage: Stage, - entries: &[(EntryKey, EntryValue)], - ) { - let mut txn = rocksdb.transaction(); - for (key, value) in entries { - txn.put_vqueue_inbox(qid, stage, key, value); + match refill_anchor { + Some(anchor) => reader.seek_after(anchor), + None => reader.seek_to_first(), } - txn.commit().await.expect("commit should succeed"); - } - #[restate_core::test] - async fn test_queue_running_to_inbox_to_empty() { - let mut rocksdb = storage_test_environment().await; - - let qid = test_qid(1000); - let running_entry = default_entry(1); - let inbox_entry = default_entry(2); - - insert_entries( - &mut rocksdb, - &qid, - Stage::Running, - std::slice::from_ref(&running_entry), - ) - .await; - insert_entries( - &mut rocksdb, - &qid, - Stage::Inbox, - std::slice::from_ref(&inbox_entry), - ) - .await; - - let db = rocksdb.partition_db(); - let mut queue: Queue = Queue::new(1); - let mut skip = UnconfirmedAssignments::new(); - - assert!(!queue.is_empty()); - - let head = queue.advance_if_needed(db, &skip, &qid).unwrap(); - assert!(matches!(head, QueueItem::Running { key, .. } if *key == running_entry.0)); - assert!( - matches!(queue.head(), Some(QueueItem::Running { key, .. }) if *key == running_entry.0) - ); + while self.items.len() < INBOX_CACHE_CAPACITY { + let Some((key, value)) = reader.peek()? else { + if self.items.is_empty() { + self.stage = Stage::Empty; + } + break; + }; - queue.advance(db, &skip, &qid).unwrap(); - assert!( - matches!(queue.head(), Some(QueueItem::Inbox { key, .. }) if *key == inbox_entry.0) + if !skip.contains_key(&key) { + self.items.push_back((key, value)); + } + *refill_anchor = Some(refill_anchor.map_or(key, |a| a.max(key))); + reader.advance(); + } + tracing::debug!( + "non-blocking refill finished in {}, cache has {} items", + start.elapsed().friendly(), + self.items.len() ); - let Some(QueueItem::Inbox { key, .. 
}) = queue.head() else { - panic!("expected inbox head"); - }; - skip.insert(*key, Default::default()); - assert!(!queue.is_empty()); - - queue.advance(db, &skip, &qid).unwrap(); - assert!(queue.is_empty()); - - let higher = default_entry(0); - assert!(queue.enqueue(&higher.0, &higher.1)); - let head = queue.advance_if_needed(db, &skip, &qid).unwrap(); - assert!(matches!(head, QueueItem::Inbox { key, .. } if *key == higher.0)); + Ok(()) } - #[restate_core::test] - async fn test_entry_key_ordering() { - let mut rocksdb = storage_test_environment().await; - - let qid = test_qid(2000); - let low = test_entry(1, false, 3_000); - let high = test_entry(2, false, 2_000); - let highest = test_entry(3, true, 9_000); - - insert_entries( - &mut rocksdb, - &qid, - Stage::Inbox, - &[low.clone(), high.clone(), highest.clone()], - ) - .await; - - let db = rocksdb.partition_db(); - let mut queue: Queue = Queue::new(0); - let skip = UnconfirmedAssignments::new(); - - queue.advance(db, &skip, &qid).unwrap(); - assert!(matches!(queue.head(), Some(QueueItem::Inbox { key, .. }) if *key == highest.0)); - - queue.advance(db, &skip, &qid).unwrap(); - assert!(matches!(queue.head(), Some(QueueItem::Inbox { key, .. }) if *key == high.0)); - - queue.advance(db, &skip, &qid).unwrap(); - assert!(matches!(queue.head(), Some(QueueItem::Inbox { key, .. }) if *key == low.0)); - } + fn start_refill_task(&mut self, storage: &S, qid: &VQueueId) { + assert!(!self.refill_state.is_in_flight()); + let RefillState::Standby { refill_anchor } = self.refill_state else { + panic!("refill state must be standby"); + }; - #[restate_core::test] - async fn test_run_at_below_now_bumps_entry_higher_in_inbox() { - let mut rocksdb = storage_test_environment().await; - - let qid = test_qid(2_500); - let now = MillisSinceEpoch::now().as_u64(); - let future_entry = - test_entry_at(1, false, MillisSinceEpoch::new(now.saturating_add(60_000))); - let overdue_entry = - test_entry_at(2, false, MillisSinceEpoch::new(now.saturating_sub(1_000))); - - insert_entries( - &mut rocksdb, - &qid, - Stage::Inbox, - &[future_entry.clone(), overdue_entry.clone()], - ) - .await; - - let db = rocksdb.partition_db(); - let mut queue: Queue = Queue::new(0); - let skip = UnconfirmedAssignments::new(); - - queue.advance(db, &skip, &qid).unwrap(); - assert!( - matches!(queue.head(), Some(QueueItem::Inbox { key, .. }) if *key == overdue_entry.0) + let mut reader = storage.new_inbox_reader( + qid, + vqueue_table::Options { + allow_blocking_io: true, + }, ); - queue.advance(db, &skip, &qid).unwrap(); - assert!( - matches!(queue.head(), Some(QueueItem::Inbox { key, .. }) if *key == future_entry.0) - ); - } + let handle = tokio::task::spawn_blocking(move || { + // collect and send the results at the end + let mut results = Vec::with_capacity(INBOX_CACHE_CAPACITY); - #[restate_core::test] - async fn test_enqueue_replaces_head_on_smaller_key() { - let mut rocksdb = storage_test_environment().await; - - let qid = test_qid(3000); - let initial = test_entry(1, false, 3_000); - - insert_entries( - &mut rocksdb, - &qid, - Stage::Inbox, - std::slice::from_ref(&initial), - ) - .await; - - let db = rocksdb.partition_db(); - let mut queue: Queue = Queue::new(0); - let skip = UnconfirmedAssignments::new(); - - queue.advance(db, &skip, &qid).unwrap(); - assert!(matches!(queue.head(), Some(QueueItem::Inbox { key, .. 
}) if *key == initial.0));
-
-        let higher = test_entry(2, false, 2_000);
-        assert!(queue.enqueue(&higher.0, &higher.1));
-        assert!(matches!(queue.head(), Some(QueueItem::Inbox { key, .. }) if *key == higher.0));
-        assert!(
-            matches!(queue.advance_if_needed(db, &skip, &qid).unwrap(), QueueItem::Inbox { key, .. } if *key == higher.0)
-        );
+    fn poll_refill_task(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+        skip: &UnconfirmedAssignments,
+    ) -> Poll<()> {
+        let (items, overlay, mut refill_anchor, horizon) = match self.refill_state {
+            RefillState::Standby { .. } => return Poll::Ready(()),
+            RefillState::InFlight(ref mut task) => {
+                match ready!(Pin::new(&mut task.handle).poll(cx)) {
+                    Err(join_err) => {
+                        tracing::error!("refill task panicked: {join_err}");
+                        self.refill_state = RefillState::Standby {
+                            refill_anchor: task.refill_anchor.take(),
+                        };
+                        return Poll::Ready(());
+                    }
+                    Ok(Err(err)) => {
+                        tracing::error!("refill task failed: {err}");
+                        self.refill_state = RefillState::Standby {
+                            refill_anchor: task.refill_anchor.take(),
+                        };
+                        return Poll::Ready(());
+                    }
+                    Ok(Ok(result)) => {
+                        tracing::debug!(
+                            "refill task finished with {} items in {}",
+                            result.len(),
+                            task.started_at.elapsed().friendly()
+                        );
+                        (
+                            result,
+                            std::mem::take(&mut task.overlay),
+                            task.refill_anchor.take(),
+                            task.horizon.take(),
+                        )
+                    }
+                }
+            }
+        };

-        let lower = test_entry(3, false, 4_000);
-        assert!(!queue.enqueue(&lower.0, &lower.1));
-        assert!(matches!(queue.head(), Some(QueueItem::Inbox { key, .. }) if *key == higher.0));
-    }
+        // If we got fewer items than we asked for, then this must have been the end
+        // of the queue at the time of the refill.
+        let end_of_queue = items.len() < INBOX_CACHE_CAPACITY;

-    #[restate_core::test]
-    async fn test_remove() {
-        let mut rocksdb = storage_test_environment().await;
-
-        let qid = test_qid(4000);
-        let entry = default_entry(1);
-
-        insert_entries(
-            &mut rocksdb,
-            &qid,
-            Stage::Inbox,
-            std::slice::from_ref(&entry),
-        )
-        .await;
-
-        let db = rocksdb.partition_db();
-        let mut queue: Queue = Queue::new(0);
-        let mut skip = UnconfirmedAssignments::new();
-
-        queue.advance(db, &skip, &qid).unwrap();
-        let head_key = match queue.head() {
-            Some(QueueItem::Inbox { key, .. }) => *key,
-            _ => panic!("expected inbox head"),
-        };
-
-        assert!(!queue.remove(&default_entry(99).0));
-        assert!(queue.remove(&head_key));
-        assert!(queue.head().is_none());
-        assert!(
-            matches!(queue.advance_if_needed(db, &skip, &qid).unwrap(), QueueItem::Inbox { key, .. } if *key == entry.0)
-        );
+        // We now need to merge the items we received, apply the overlays, and deduplicate
+        // with existing entries in cache.
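+        // Worked example (illustrative keys): storage result = [A, B, D],
+        // overlay = [Tombstone(B), Add(C)]. The merge walks both sides in
+        // key order and yields A (storage only), B (both sides, so the
+        // tombstone wins and the row is dropped), C (overlay add), and
+        // D (storage only), producing a cache of [A, C, D].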
+        // At the end, we figure out the next anchor to use for the next refill.
+        //
+        // The strategy here is more complex than the blocking/inline version
+        // because concurrency is hard, who knew! We need to do all of this
+        // while keeping the cache capacity in check; we don't want to
+        // re-allocate/resize the cache.
+        //
+        // We navigate the overlay and the storage items together in
+        // semi-lockstep. Technically, this is an LSM-style read/compaction
+        // algorithm.
+        for either in items
+            .into_iter()
+            .merge_join_by(overlay, |(key, _), (overlay_key, _)| key.cmp(overlay_key))
+        {
+            // First key at-or-above the horizon ends the merge: overlay
+            // entries are all below it by construction, and the merge
+            // walks ascending, so the rest is storage that the next
+            // refill will rediscover via `seek_after`.
+            let key = match &either {
+                EitherOrBoth::Left((k, _))
+                | EitherOrBoth::Right((k, _))
+                | EitherOrBoth::Both((k, _), _) => *k,
+            };
+            if horizon.is_some_and(|h| key >= h) {
+                break;
+            }
+            // Left is the item from db, right is the overlay
+            match either {
+                // Somewhat similar to a normal enqueue
+                EitherOrBoth::Left((key, value))
+                | EitherOrBoth::Right((key, Overlay::Add(value)))
+                // overlay always wins.
+                | EitherOrBoth::Both((key, _), (_, Overlay::Add(value))) => {
+                    if skip.contains_key(&key) {
+                        // The key was already dispatched, skip it.
+                        refill_anchor = Some(refill_anchor.map_or(key, |a| a.max(key)));
+                        continue;
+                    }
+                    // Insert sorted into the cache, ignoring it if we already have it.
+                    // If this item pushes us over the cache capacity, then we ignore it
+                    // and reset the refill anchor.
+                    let pos = match self.items.binary_search_by_key(&key, |&(k, _)| k) {
+                        Ok(_) => continue,
+                        Err(pos) => pos,
+                    };
+                    if self.items.len() >= INBOX_CACHE_CAPACITY {
+                        if pos == self.items.len() {
+                            // beyond capacity, ignore the item. Reset the anchor.
+                            refill_anchor = self.items.back().map(|(k, _)| *k);
+                            break;
+                        }
+                        // Make room first;
+                        self.items.pop_back();
+                        self.items.insert(pos, (key, value));
+                        refill_anchor = self.items.back().map(|(k, _)| *k);
+                        break;
+                    }

-        skip.insert(entry.0, Default::default());
-        assert!(queue.remove(&head_key));
-        assert!(queue.head().is_none());
-        assert!(matches!(
-            queue.advance_if_needed(db, &skip, &qid).unwrap(),
-            QueueItem::None
-        ));
-    }
+                    // Normal path: cache below capacity.
+                    self.items.insert(pos, (key, value));
+                    refill_anchor = Some(key);
+                }
+                EitherOrBoth::Right((key, Overlay::Tombstone))
+                // overlay always wins.
+                | EitherOrBoth::Both((key, _), (_, Overlay::Tombstone)) => {
+                    // In theory, we should never see a tombstone that impacts the existing
+                    // cache.
+                    debug_assert!(self.items.binary_search_by_key(&key, |&(k, _)| k).is_err());
+                    // Push the anchor to this item.
+                    refill_anchor = Some(key);
+                }
+            }
+        }

-    #[restate_core::test]
-    async fn test_skip_set() {
-        let mut rocksdb = storage_test_environment().await;
-
-        let qid = test_qid(5000);
-        let entry1 = default_entry(1);
-        let entry2 = default_entry(2);
-        let entry3 = default_entry(3);
-
-        insert_entries(
-            &mut rocksdb,
-            &qid,
-            Stage::Inbox,
-            &[entry1.clone(), entry2.clone(), entry3.clone()],
-        )
-        .await;
-
-        let db = rocksdb.partition_db();
-        let mut queue: Queue = Queue::new(0);
-        let mut skip = UnconfirmedAssignments::new();
-        skip.insert(entry1.0, Default::default());
-        skip.insert(entry2.0, Default::default());
-
-        queue.advance(db, &skip, &qid).unwrap();
-        assert!(matches!(queue.head(), Some(QueueItem::Inbox { key, .. }) if *key == entry3.0));
-    }

+        // It's very important that we reset the task to standby.
+        self.refill_state = RefillState::Standby {
+            refill_anchor: refill_anchor.or(self.items.back().map(|(k, _)| *k)),
+        };

-    #[restate_core::test]
-    async fn test_running_before_inbox_regardless_of_key() {
-        let mut rocksdb = storage_test_environment().await;
-
-        let qid = test_qid(6000);
-        let running_entry = default_entry(1);
-        let inbox_entry = test_entry(2, true, 0);
-
-        insert_entries(
-            &mut rocksdb,
-            &qid,
-            Stage::Running,
-            std::slice::from_ref(&running_entry),
-        )
-        .await;
-        insert_entries(
-            &mut rocksdb,
-            &qid,
-            Stage::Inbox,
-            std::slice::from_ref(&inbox_entry),
-        )
-        .await;
-
-        let db = rocksdb.partition_db();
-        let mut queue: Queue = Queue::new(1);
-        let skip = UnconfirmedAssignments::new();
-
-        queue.advance(db, &skip, &qid).unwrap();
-        assert!(matches!(queue.head(), Some(QueueItem::Running { .. })));
-
-        queue.advance(db, &skip, &qid).unwrap();
-        assert!(matches!(queue.head(), Some(QueueItem::Inbox { .. })));
-    }
+        // At the end, if the cache is empty and end_of_queue is true, then we must
+        // have exhausted the inbox.
+        if self.items.is_empty() && end_of_queue {
+            self.stage = Stage::Empty;
+        }

-    #[restate_core::test]
-    async fn test_enqueue_and_remove_ignored_during_running_stage() {
-        let mut rocksdb = storage_test_environment().await;
-
-        let qid = test_qid(7000);
-        let running1 = default_entry(1);
-        let running2 = default_entry(2);
-        let inbox_entry = test_entry(10, true, 0);
-
-        insert_entries(
-            &mut rocksdb,
-            &qid,
-            Stage::Running,
-            &[running1.clone(), running2.clone()],
-        )
-        .await;
-        insert_entries(
-            &mut rocksdb,
-            &qid,
-            Stage::Inbox,
-            std::slice::from_ref(&inbox_entry),
-        )
-        .await;
-
-        let db = rocksdb.partition_db();
-        let mut queue: Queue = Queue::new(2);
-        let skip = UnconfirmedAssignments::new();
-
-        queue.advance(db, &skip, &qid).unwrap();
-        assert!(matches!(queue.head(), Some(QueueItem::Running { key, .. }) if *key == running1.0));
-
-        let even_higher = test_entry(11, true, 0);
-        assert!(!queue.enqueue(&even_higher.0, &even_higher.1));
-        assert!(matches!(queue.head(), Some(QueueItem::Running { key, .. }) if *key == running1.0));
-
-        assert!(!queue.remove(&running2.0));
-        assert!(matches!(queue.head(), Some(QueueItem::Running { key, .. }) if *key == running1.0));
-
-        assert!(!queue.remove(&running1.0));
-        assert!(matches!(queue.head(), Some(QueueItem::Running { key, .. }) if *key == running1.0));
-
-        queue.advance(db, &skip, &qid).unwrap();
-        assert!(matches!(queue.head(), Some(QueueItem::Running { key, .. }) if *key == running2.0));
-
-        queue.advance(db, &skip, &qid).unwrap();
-        assert!(
-            matches!(queue.head(), Some(QueueItem::Inbox { key, .. }) if *key == inbox_entry.0)
-        );
+        Poll::Ready(())
+    }

-        assert!(queue.remove(&inbox_entry.0));
-        assert!(queue.head().is_none());
+    pub(crate) fn remaining_in_running_stage(&self) -> u32 {
+        match &self.stage {
+            Stage::New {
+                already_running, ..
+            } => already_running.get(),
+            Stage::Running { remaining, .. } => *remaining,
+            Stage::Inbox | Stage::Empty => 0,
+        }
+    }
 }
+
+#[cfg(test)]
+#[path = "queue_test.rs"]
+mod queue_test;
diff --git a/crates/vqueues/src/scheduler/queue_test.rs b/crates/vqueues/src/scheduler/queue_test.rs
new file mode 100644
index 0000000000..38551902a7
--- /dev/null
+++ b/crates/vqueues/src/scheduler/queue_test.rs
@@ -0,0 +1,403 @@
+// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+//! Test harness for the vqueue refill machinery.
+//!
+//! These tests use a fake [`VQueueStore`] (`GatedStore`) that lets the
+//! caller freeze the snapshot a refill task sees and interleave
+//! `notify_enqueued` / `notify_removed` events deterministically against
+//! the in-flight refill thread. That's what makes overlay and cancellation
+//! behaviour testable without a real RocksDB.
+//!
+//! Each [`GatedReader`] is created with `Options::allow_blocking_io` set
+//! by the queue itself:
+//! - The first attempt (`try_refill`) passes `allow_blocking_io: false`;
+//!   the fake responds with [`CursorError::WouldBlock`] which drives the
+//!   queue down the async refill path.
+//! - The async path then constructs a second reader with
+//!   `allow_blocking_io: true`; that reader parks on its first `peek`
+//!   (signalling the test that the snapshot is frozen) and resumes once
+//!   the test releases the latch.
+
+use std::sync::{Arc, Condvar, Mutex};
+use std::task::{Context, Poll, Waker};
+
+use restate_clock::RoughTimestamp;
+use restate_clock::time::MillisSinceEpoch;
+use restate_storage_api::StorageError;
+use restate_storage_api::vqueue_table::stats::EntryStatistics;
+use restate_storage_api::vqueue_table::{
+    CursorError, EntryId, EntryKey, EntryKind, EntryMetadata, EntryValue, Options, Status,
+    VQueueCursor, VQueueRunningCursor, VQueueStore,
+};
+use restate_types::clock::UniqueTimestamp;
+use restate_types::vqueues::VQueueId;
+
+use super::*;
+
+// ---------- helpers ----------
+
+const BASE_RUN_AT_MS: u64 = 1_744_000_000_000;
+
+/// Builds an entry whose sort position is determined entirely by `seq`
+/// (other `EntryKey` components are fixed).
+fn entry_at_seq(seq: u64) -> (EntryKey, EntryValue) {
+    let run_at = RoughTimestamp::from_unix_millis_clamped(MillisSinceEpoch::new(BASE_RUN_AT_MS));
+    let created_at = UniqueTimestamp::try_from(1_000u64 + seq).unwrap();
+    let entry_id = EntryId::new(EntryKind::Invocation, [0u8; EntryId::REMAINDER_LEN]);
+    let key = EntryKey::new(false, run_at, seq, entry_id);
+    let stats = EntryStatistics::new(created_at, run_at);
+    let value = EntryValue {
+        status: Status::New,
+        metadata: EntryMetadata::default(),
+        stats,
+    };
+    (key, value)
+}
+
+fn test_qid(partition_key: u64) -> VQueueId {
+    VQueueId::custom(partition_key, "1")
+}
+
+/// Polls the queue once and asserts it returned `Pending`. Used to kick
+/// off an in-flight refill task so the test can interleave events while
+/// the refill thread is parked at its gate.
+fn poll_once_expect_pending<S: VQueueStore>(
+    queue: &mut Queue<S>,
+    storage: &S,
+    skip: &UnconfirmedAssignments,
+    qid: &VQueueId,
+) {
+    let mut cx = Context::from_waker(Waker::noop());
+    match queue.poll_advance_if_needed(&mut cx, storage, skip, qid, false, false) {
+        Poll::Pending => {}
+        Poll::Ready(Ok(item)) => panic!("expected Pending, got Ready({item:?})"),
+        Poll::Ready(Err(e)) => panic!("expected Pending, got error: {e:?}"),
+    }
+}
+
+/// Drives `poll_advance_if_needed` until it returns `Ready`. Yields between
+/// Pending iterations so the `spawn_blocking` refill task can make progress.
+async fn drive_until_ready<S: VQueueStore>(
+    queue: &mut Queue<S>,
+    storage: &S,
+    skip: &UnconfirmedAssignments,
+    qid: &VQueueId,
+) {
+    let mut cx = Context::from_waker(Waker::noop());
+    loop {
+        match queue.poll_advance_if_needed(&mut cx, storage, skip, qid, false, false) {
+            Poll::Ready(Ok(_)) => return,
+            Poll::Ready(Err(e)) => panic!("queue error: {e:?}"),
+            Poll::Pending => tokio::time::sleep(std::time::Duration::from_millis(1)).await,
+        }
+    }
+}
+
+/// Pops everything currently in the cache (no refill, no I/O) and returns
+/// the keys in head-to-tail order.
+fn drain_cache<S: VQueueStore>(queue: &mut Queue<S>) -> Vec<EntryKey> {
+    let mut keys = vec![];
+    while let Some(QueueItem::Inbox { key, .. }) = queue.head() {
+        keys.push(*key);
+        queue.try_advance().unwrap();
+    }
+    keys
+}
+
+// ---------- fake VQueueStore ----------
+
+/// In-memory storage with hooks to gate the refill thread at a precise
+/// moment, so tests can deterministically interleave queue events against
+/// an in-flight refill.
+struct GatedStore {
+    /// Source of truth for what `new_inbox_reader` will snapshot. Sorted
+    /// ascending by `EntryKey`.
+    entries: Mutex<Vec<(EntryKey, EntryValue)>>,
+    /// Latch released by the test when it wants the parked refill thread
+    /// to proceed past its first `peek`.
+    release: Arc<(Mutex<bool>, Condvar)>,
+    /// Signalled by the reader the moment it parks. The test waits on
+    /// this to know the snapshot is frozen and queue events can now be
+    /// safely interleaved.
+    parked: Arc<(Mutex<bool>, Condvar)>,
+}
+
+impl GatedStore {
+    fn new(entries: Vec<(EntryKey, EntryValue)>) -> Self {
+        Self {
+            entries: Mutex::new(entries),
+            release: Arc::new((Mutex::new(false), Condvar::new())),
+            parked: Arc::new((Mutex::new(false), Condvar::new())),
+        }
+    }
+
+    fn release_refill_thread(&self) {
+        let (lock, cv) = &*self.release;
+        *lock.lock().unwrap() = true;
+        cv.notify_all();
+    }
+
+    fn wait_until_parked(&self) {
+        let (lock, cv) = &*self.parked;
+        let mut parked = lock.lock().unwrap();
+        while !*parked {
+            parked = cv.wait(parked).unwrap();
+        }
+    }
+}
+
+struct GatedReader {
+    /// Frozen at construction time (matches RocksDB snapshot semantics).
+    /// Sorted ascending by `EntryKey`.
+    snapshot: Vec<(EntryKey, EntryValue)>,
+    cursor: usize,
+    /// `true` for blocking readers (those constructed via the async refill
+    /// path with `allow_blocking_io: true`); they park at the gate on the
+    /// first `peek` so the test can pin the snapshot moment. Non-blocking
+    /// readers always return `WouldBlock` instead, driving the queue down
+    /// the async refill path.
+    blocking: bool,
+    release: Arc<(Mutex<bool>, Condvar)>,
+    parked: Arc<(Mutex<bool>, Condvar)>,
+    /// Whether this reader has parked yet. Each reader parks at most
+    /// once, on its first peek/key/value call.
+    has_parked: bool,
+}
+
+impl GatedReader {
+    fn block_until_released(&mut self) {
+        if self.has_parked {
+            return;
+        }
+        self.has_parked = true;
+        // Tell the test we're parked.
+        let (lock, cv) = &*self.parked;
+        *lock.lock().unwrap() = true;
+        cv.notify_all();
+        // Wait for the test to release us.
+        let (lock, cv) = &*self.release;
+        let mut released = lock.lock().unwrap();
+        while !*released {
+            released = cv.wait(released).unwrap();
+        }
+    }
+}
+
+impl VQueueCursor for GatedReader {
+    fn seek_to_first(&mut self) {
+        self.cursor = 0;
+    }
+
+    fn seek_after(&mut self, anchor: &EntryKey) {
+        self.cursor = self
+            .snapshot
+            .iter()
+            .position(|(k, _)| k > anchor)
+            .unwrap_or(self.snapshot.len());
+    }
+
+    fn advance(&mut self) {
+        if self.cursor < self.snapshot.len() {
+            self.cursor += 1;
+        }
+    }
+
+    fn peek(&mut self) -> Result<Option<(EntryKey, EntryValue)>, CursorError> {
+        if !self.blocking {
+            return Err(CursorError::WouldBlock);
+        }
+        self.block_until_released();
+        Ok(self.snapshot.get(self.cursor).cloned())
+    }
+}
+
+/// Stub for the running stage. Tests construct queues with
+/// `num_running = 0`, so this is never actually exercised; it only exists
+/// to satisfy the [`VQueueStore`] trait bound.
+struct StubRunningReader;
+
+impl VQueueRunningCursor for StubRunningReader {
+    fn seek_to_first(&mut self) {}
+    fn peek(&mut self) -> Result<Option<(EntryKey, EntryValue)>, StorageError> {
+        Ok(None)
+    }
+    fn advance(&mut self) {}
+}
+
+impl VQueueStore for GatedStore {
+    type RunningReader = StubRunningReader;
+    type InboxReader = GatedReader;
+
+    fn new_run_reader(&self, _qid: &VQueueId) -> Self::RunningReader {
+        StubRunningReader
+    }
+
+    fn new_inbox_reader(&self, _qid: &VQueueId, opts: Options) -> Self::InboxReader {
+        // Snapshot freezes here.
+        let snapshot = self.entries.lock().unwrap().clone();
+        GatedReader {
+            snapshot,
+            cursor: 0,
+            blocking: opts.allow_blocking_io,
+            release: self.release.clone(),
+            parked: self.parked.clone(),
+            has_parked: false,
+        }
+    }
+}
+
+// ---------- tests ----------
+
+/// Sanity: a single in-flight refill with no concurrent events surfaces
+/// every storage row in the cache, in `EntryKey` order.
+#[restate_core::test]
+async fn refill_without_overlay_activity() {
+    let entries: Vec<_> = (1..=5).map(entry_at_seq).collect();
+    let storage = GatedStore::new(entries.clone());
+    let qid = test_qid(1);
+    let mut queue: Queue<GatedStore> = Queue::new(0, &storage, &qid);
+    let skip = UnconfirmedAssignments::new();
+
+    poll_once_expect_pending(&mut queue, &storage, &skip, &qid);
+    storage.wait_until_parked();
+    storage.release_refill_thread();
+    drive_until_ready(&mut queue, &storage, &skip, &qid).await;
+
+    let drained = drain_cache(&mut queue);
+    assert_eq!(
+        drained,
+        entries.iter().map(|(k, _)| *k).collect::<Vec<_>>(),
+        "cache should contain every storage row in order",
+    );
+}
+
+/// Tombstones in the overlay correctly suppress the matching storage row
+/// during merge — no overflow happens here, so the overlay's information
+/// is fully preserved. This is the "happy path" for the
+/// commit-before-notify invariant.
+#[restate_core::test]
+async fn tombstone_in_overlay_suppresses_storage_row() {
+    let r_target = entry_at_seq(100);
+    let storage = GatedStore::new(vec![r_target.clone()]);
+    let qid = test_qid(2);
+    let mut queue: Queue<GatedStore> = Queue::new(0, &storage, &qid);
+    let skip = UnconfirmedAssignments::new();
+
+    poll_once_expect_pending(&mut queue, &storage, &skip, &qid);
+    storage.wait_until_parked();
+    // Storage commit (modelled here as the snapshot being frozen with
+    // r_target visible) lands first; the notify_removed lands second.
+    queue.remove(&r_target.0);
+    storage.release_refill_thread();
+    drive_until_ready(&mut queue, &storage, &skip, &qid).await;
+
+    let drained = drain_cache(&mut queue);
+    assert!(
+        !drained.contains(&r_target.0),
+        "deleted row should be suppressed by overlay tombstone, got: {drained:?}",
+    );
+}
+
+/// **Bug demonstration.** When the overlay is at capacity and the back
+/// entry is a tombstone, `push_added_item`'s `pop_back` silently drops
+/// that tombstone. The merge then lets the (already-deleted) row into
+/// the cache.
+///
+/// Layout at the moment of overflow:
+///
+/// ```text
+/// overlay[0] = Tombstone(seq=50)            // pre-tombstone
+/// overlay[1] = Tombstone(seq=100)           // pre-tombstone
+/// overlay[2..CAPACITY-1] = Add(seq=150..)   // CAPACITY-3 adds
+/// overlay[CAPACITY-1] = Tombstone(seq=500)  // *r_target*
+/// ```
+///
+/// The trigger add (seq=400) sorts at `pos = CAPACITY-1`, which is
+/// `< overlay.len() == CAPACITY`, so `push_added_item` evicts the back
+/// (`Tombstone(500)`) and inserts the trigger. After this, the overlay
+/// holds only `[T(50), T(100), Add(150..), Add(400)]` — the tombstone
+/// for `r_target` is gone.
+///
+/// Storage is `[r_target=seq500]` (a single row that's been deleted but
+/// is still in the in-flight task's snapshot, faithful to the race the
+/// invariant doc on `RefillTask` describes). The merge produces:
+///
+/// ```text
+/// [T(50), T(100), Add(150..), Add(400), Left(seq500)]
+/// ```
+///
+/// With the tombstone evicted there's nothing in the overlay to suppress
+/// `Left(seq500)`; the drain would then see `seq500`, which is the bug
+/// the horizon mechanism must prevent.
+#[restate_core::test]
+async fn tombstone_evicted_on_overlay_overflow_leaks_deleted_row() {
+    // Overlay layout requires CAPACITY >= 4 (2 front tombstones + at
+    // least one add + 1 back tombstone). Bail loudly if someone shrinks
+    // the cache below that.
+    const { assert!(INBOX_CACHE_CAPACITY >= 4) };
+
+    // Front-anchor tombstones: low seqs so they sort to the front of the
+    // overlay and don't influence the eviction we want to trigger.
+    let pre_tombstone_a = entry_at_seq(50);
+    let pre_tombstone_b = entry_at_seq(100);
+
+    // The deleted row that the in-flight task's snapshot still sees.
+    let r_target = entry_at_seq(500);
+
+    // Adds that fit between the front tombstones and r_target's
+    // tombstone. Together with the three tombstones the overlay reaches
+    // exactly CAPACITY. We use a contiguous seq range starting at 150.
+    let n_adds = INBOX_CACHE_CAPACITY - 3;
+    let pre_adds: Vec<_> = (150..150 + n_adds as u64).map(entry_at_seq).collect();
+
+    // The trigger: an add that sorts BEFORE r_target's tombstone but
+    // after every other overlay entry. With `pos < overlay.len()` and
+    // the overlay at cap, `push_added_item` will `pop_back` — evicting
+    // `Tombstone(r_target)`.
+    let trigger_add = entry_at_seq(400);
+
+    let storage = GatedStore::new(vec![r_target.clone()]);
+    let qid = test_qid(3);
+    let mut queue: Queue<GatedStore> = Queue::new(0, &storage, &qid);
+    let skip = UnconfirmedAssignments::new();
+
+    // Kick off the in-flight refill so subsequent enqueue/remove events
+    // route through the overlay rather than the cache.
+    poll_once_expect_pending(&mut queue, &storage, &skip, &qid);
+    storage.wait_until_parked();
+
+    // Three tombstones; only Tombstone(r_target) matters for the bug.
+    queue.remove(&pre_tombstone_a.0);
+    queue.remove(&pre_tombstone_b.0);
+    queue.remove(&r_target.0);
+
+    // Fill the overlay up to CAPACITY with the front-of-r_target adds.
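+    // (Each enqueue below returns false: the in-flight task's refill anchor
+    // is None, so these keys are staged on its overlay rather than being
+    // inserted into the cache.)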
+    for (k, v) in &pre_adds {
+        queue.enqueue(k, v);
+    }
+
+    // Trigger the eviction. After this, Tombstone(r_target) is gone.
+    queue.enqueue(&trigger_add.0, &trigger_add.1);
+
+    // Release the refill thread; it returns the snapshot ([r_target]).
+    storage.release_refill_thread();
+    drive_until_ready(&mut queue, &storage, &skip, &qid).await;
+
+    let drained = drain_cache(&mut queue);
+
+    // Regression guard: if the tombstone were dropped on overlay overflow,
+    // r_target would show up in the cache even though we issued
+    // `notify_removed` for it before the merge ran.
+    assert!(
+        !drained.contains(&r_target.0),
+        "deleted row leaked into cache: {drained:?}",
+    );
+}
diff --git a/crates/vqueues/src/scheduler/vqueue_state.rs b/crates/vqueues/src/scheduler/vqueue_state.rs
index b83fc73696..2264ddf0cc 100644
--- a/crates/vqueues/src/scheduler/vqueue_state.rs
+++ b/crates/vqueues/src/scheduler/vqueue_state.rs
@@ -8,14 +8,15 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.

+use std::task::{Poll, ready};
 use std::time::Duration;

 use enum_map::{Enum, EnumMap};
 use metrics::counter;
+use restate_storage_api::StorageError;
 use tokio::time::Instant;

 use restate_clock::RoughTimestamp;
-use restate_storage_api::StorageError;
 use restate_storage_api::vqueue_table::{EntryKey, EntryValue, VQueueStore, stats::WaitStats};
 use restate_types::vqueues::{EntryId, VQueueId};
 use restate_worker_api::ResourceKind;
@@ -228,9 +229,10 @@ impl VQueueState {
         qid: VQueueId,
         handle: VQueueHandle,
         meta: VQueueMetaLite,
+        storage: &S,
         num_running: u32,
     ) -> Self {
-        let queue = Queue::new(num_running);
+        let queue = Queue::new(num_running, storage, &qid);
         Self {
             handle,
             qid,
@@ -263,7 +265,6 @@ impl VQueueState {
     pub fn try_pop(
         &mut self,
         cx: &mut std::task::Context<'_>,
-        storage: &S,
         resources: &mut ResourceManager,
     ) -> Result<Pop, StorageError> {
         let (inbox_head_key, inbox_head_value, is_running) = match self.queue.head() {
@@ -291,8 +292,7 @@ impl VQueueState {
             key: *inbox_head_key,
             next_run_at: None,
         };
-        self.queue
-            .advance(storage, &self.unconfirmed_assignments, &self.qid)?;
+        self.queue.try_advance()?;

         return Ok(Pop::Yield(action));
     }
@@ -313,8 +313,7 @@
                     wait_stats: self.head_stats.finalize(),
                 };

-                self.queue
-                    .advance(storage, &self.unconfirmed_assignments, &self.qid)?;
+                self.queue.try_advance()?;

                 Ok(Pop::Run(action))
             }
             AcquireOutcome::BlockedOn(resource) => {
@@ -335,11 +334,21 @@
         && self.unconfirmed_assignments.is_empty()
     }

-    pub fn poll_eligibility(&mut self, storage: &S) -> Result {
-        self.queue
-            .advance_if_needed(storage, &self.unconfirmed_assignments, &self.qid)?;
-
-        Ok(self.check_eligibility())
+    pub fn poll_eligibility(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+        storage: &S,
+    ) -> Poll<Result<Eligibility, StorageError>> {
+        ready!(self.queue.poll_advance_if_needed(
+            cx,
+            storage,
+            &self.unconfirmed_assignments,
+            &self.qid,
+            self.num_waiting_inbox() == 0,
+            false,
+        ))?;
+
+        Poll::Ready(Ok(self.check_eligibility().as_compact()))
     }

     pub fn check_eligibility(&self) -> DetailedEligibility {

From 0a68e950b4cfb624d85e19ac276249c569837905 Mon Sep 17 00:00:00 2001
From: Ahmed Farghal
Date: Wed, 6 May 2026 11:05:32 +0100
Subject: [PATCH 2/2] [VQueues] Introduce sys_vqueues datafusion table

Expose vqueue entries as a single DataFusion table with stage-aware
scanning. When the query filters `stage`, only matching stage key kinds
are scanned; without a stage filter, all inbox stages are scanned and
merged.
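For example, an equality predicate on `stage` (or an `IN` list) prunes the
scan to exactly the named stage key kinds.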
Also project the latest entry metadata for observability (status plus EntryStatistics timestamps and counters), and add targeted tests for stage predicate extraction and sys_vqueues stage filtering behavior. --- .../src/tests/vqueue_table_test/mod.rs | 64 ++++++ .../partition-store/src/vqueue_table/mod.rs | 115 ++++++++++- crates/storage-api/src/vqueue_table/entry.rs | 2 +- crates/storage-api/src/vqueue_table/tables.rs | 17 ++ .../storage-query-datafusion/src/context.rs | 6 + crates/storage-query-datafusion/src/filter.rs | 152 +++++++++++++- crates/storage-query-datafusion/src/lib.rs | 1 + .../src/table_docs.rs | 3 +- .../src/vqueues/mod.rs | 18 ++ .../src/vqueues/row.rs | 101 +++++++++ .../src/vqueues/schema.rs | 79 +++++++ .../src/vqueues/table.rs | 159 +++++++++++++++ .../src/vqueues/tests.rs | 192 ++++++++++++++++++ 13 files changed, 905 insertions(+), 4 deletions(-) create mode 100644 crates/storage-query-datafusion/src/vqueues/mod.rs create mode 100644 crates/storage-query-datafusion/src/vqueues/row.rs create mode 100644 crates/storage-query-datafusion/src/vqueues/schema.rs create mode 100644 crates/storage-query-datafusion/src/vqueues/table.rs create mode 100644 crates/storage-query-datafusion/src/vqueues/tests.rs diff --git a/crates/partition-store/src/tests/vqueue_table_test/mod.rs b/crates/partition-store/src/tests/vqueue_table_test/mod.rs index 1830213153..67a784b8ed 100644 --- a/crates/partition-store/src/tests/vqueue_table_test/mod.rs +++ b/crates/partition-store/src/tests/vqueue_table_test/mod.rs @@ -27,12 +27,14 @@ use restate_clock::time::MillisSinceEpoch; use restate_storage_api::Transaction; +use restate_storage_api::vqueue_table::ScanVQueueInboxStages; use restate_storage_api::vqueue_table::{ EntryKey, EntryMetadata, EntryValue, Options, Stage, Status, VQueueCursor, VQueueRunningCursor, VQueueStore, WriteVQueueTable, stats::EntryStatistics, }; use restate_types::clock::UniqueTimestamp; use restate_types::identifiers::PartitionKey; +use restate_types::sharding::KeyRange; use restate_types::vqueues::{EntryId, EntryKind, VQueueId}; use crate::PartitionStore; @@ -527,6 +529,67 @@ async fn existing_reader_does_not_see_post_snapshot_writes(rocksdb: &mut Partiti assert_eq!(collect_ids(&items), vec![entry_id(1), entry_id(2)]); } +/// Test: Stage scan reads only the requested stage key kind. +/// +/// This validates the datafusion-oriented scan API and ensures stage-specific +/// scans do not leak rows from adjacent stage key kinds or partition keys. 
+async fn stage_scan_is_filtered_by_stage(rocksdb: &mut PartitionStore) { + let target_partition_key = PartitionKey::from(9_300u64); + let other_partition_key = PartitionKey::from(9_301u64); + let target_qid = VQueueId::custom(target_partition_key, "scan-target"); + let other_qid = VQueueId::custom(other_partition_key, "scan-target"); + + let stages = [ + Stage::Inbox, + Stage::Running, + Stage::Suspended, + Stage::Paused, + Stage::Finished, + ]; + + { + let mut txn = rocksdb.transaction(); + for (index, stage) in stages.into_iter().enumerate() { + let entry_id = 100 + index as u8; + let target_entry = default_entry(entry_id); + let other_entry = default_entry(entry_id + 10); + + txn.put_vqueue_inbox(&target_qid, stage, &target_entry.0, &target_entry.1); + txn.put_vqueue_inbox(&other_qid, stage, &other_entry.0, &other_entry.1); + } + txn.commit().await.expect("commit should succeed"); + } + + let range = KeyRange::from(target_partition_key..=target_partition_key); + + for (index, stage) in stages.into_iter().enumerate() { + let expected_key = default_entry(100 + index as u8).0; + let rows = std::sync::Arc::new(std::sync::Mutex::new(Vec::new())); + let rows_for_scan = rows.clone(); + + rocksdb + .for_each_vqueue_inbox_entry(range, stage, move |(qid, got_stage, key, _)| { + rows_for_scan + .lock() + .expect("stage scan lock should not be poisoned") + .push((qid.clone(), got_stage, *key)); + std::ops::ControlFlow::Continue(()) + }) + .expect("stage scan setup should succeed") + .await + .expect("stage scan should succeed"); + + let rows = rows + .lock() + .expect("stage scan lock should not be poisoned") + .clone(); + assert_eq!(rows.len(), 1, "stage {stage} should return one row"); + assert_eq!(rows[0].0, target_qid, "stage {stage} returned wrong qid"); + assert_eq!(rows[0].1, stage, "stage {stage} returned wrong stage"); + assert_eq!(rows[0].2, expected_key, "stage {stage} returned wrong key"); + } +} + pub(crate) async fn run_tests(mut rocksdb: PartitionStore) { let mut txn = rocksdb.transaction(); @@ -553,6 +616,7 @@ pub(crate) async fn run_tests(mut rocksdb: PartitionStore) { verify_waiting_cursor_boundary_is_respected(db); verify_waiting_cursor_partition_prefix_boundary_is_respected(db); + stage_scan_is_filtered_by_stage(&mut rocksdb).await; // Snapshot-iterator tests — exercise the contract that a fresh reader // sees current storage and that an existing reader holds a fixed view. 
     fresh_reader_sees_current_state(&mut rocksdb).await;
diff --git a/crates/partition-store/src/vqueue_table/mod.rs b/crates/partition-store/src/vqueue_table/mod.rs
index 57f115ac79..c1283c2368 100644
--- a/crates/partition-store/src/vqueue_table/mod.rs
+++ b/crates/partition-store/src/vqueue_table/mod.rs
@@ -18,6 +18,7 @@ mod running_reader;
 mod waiting_reader;
 
 use std::io::Cursor;
+use std::pin::Pin;
 
 pub use entry::{EntryStatusKey, EntryStatusKeyRef, StatusHeaderRaw};
 pub use inbox::InboxKey;
@@ -32,12 +33,12 @@ use tracing::error;
 
 use restate_rocksdb::Priority;
 use restate_storage_api::StorageError;
-use restate_storage_api::vqueue_table::ScanVQueueMetaTable;
 use restate_storage_api::vqueue_table::metadata::{VQueueMeta, VQueueMetaRef, VQueueMetaUpdates};
 use restate_storage_api::vqueue_table::{
     EntryKey, EntryMetadata, EntryStatusHeader, EntryValue, LazyEntryStatus, ReadVQueueTable,
     ScanVQueueTable, Stage, Status, WriteVQueueTable, stats::EntryStatistics,
 };
+use restate_storage_api::vqueue_table::{ScanVQueueInboxStages, ScanVQueueMetaTable};
 use restate_types::sharding::{KeyRange, PartitionKey};
 use restate_types::vqueues::{EntryId, Seq, VQueueId};
 
@@ -474,6 +475,118 @@ impl ScanVQueueMetaTable for PartitionStore {
     }
 }
 
+fn stage_from_key_kind(key_kind: KeyKind) -> Option<Stage> {
+    match key_kind {
+        KeyKind::VQueueInboxStage => Some(Stage::Inbox),
+        KeyKind::VQueueRunningStage => Some(Stage::Running),
+        KeyKind::VQueueSuspendedStage => Some(Stage::Suspended),
+        KeyKind::VQueuePausedStage => Some(Stage::Paused),
+        KeyKind::VQueueFinishedStage => Some(Stage::Finished),
+        _ => None,
+    }
+}
+
+fn scan_vqueue_inbox_stage<'store, K, F>(
+    partition_store: &'store PartitionStore,
+    scanner_name: &'static str,
+    range: KeyRange,
+    stage: Stage,
+    mut f: F,
+) -> Result<Pin<Box<dyn Future<Output = Result<()>> + Send + 'store>>>
+where
+    K: EncodeTableKeyPrefix + 'store,
+    F: for<'row> FnMut(
+            (&'row VQueueId, Stage, &'row EntryKey, &'row EntryValue),
+        ) -> std::ops::ControlFlow<()>
+        + Send
+        + Sync
+        + 'static,
+{
+    let future = partition_store
+        .iterator_for_each(
+            scanner_name,
+            Priority::Low,
+            TableScan::FullScanPartitionKeyRange::<K>(range),
+            move |(mut key, mut value)| {
+                let key_kind = break_on_err(KeyKind::deserialize(&mut key))?;
+                let key_stage = break_on_err(
+                    stage_from_key_kind(key_kind).ok_or(StorageError::DataIntegrityError),
+                )?;
+                if key_stage != stage {
+                    return std::ops::ControlFlow::Break(Err(StorageError::DataIntegrityError));
+                }
+
+                let qid: VQueueId = break_on_err(crate::keys::deserialize(&mut key))?;
+                let entry_key: EntryKey = break_on_err(crate::keys::deserialize(&mut key))?;
+                let entry = break_on_err(
+                    EntryValue::decode(&mut value).map_err(StorageError::BilrostDecode),
+                )?;
+
+                f((&qid, stage, &entry_key, &entry)).map_break(Ok)
+            },
+        )
+        .map_err(|_| StorageError::OperationalError)?;
+
+    Ok(Box::pin(future))
+}
+
+impl ScanVQueueInboxStages for PartitionStore {
+    fn for_each_vqueue_inbox_entry<
+        F: for<'a> FnMut(
+                (&'a VQueueId, Stage, &'a EntryKey, &'a EntryValue),
+            ) -> std::ops::ControlFlow<()>
+            + Send
+            + Sync
+            + 'static,
+    >(
+        &self,
+        range: KeyRange,
+        stage: Stage,
+        f: F,
+    ) -> Result<impl Future<Output = Result<()>> + Send> {
+        match stage {
+            Stage::Unknown => Err(StorageError::Generic(anyhow::anyhow!(
+                "Unknown stage can't be scanned"
+            ))),
+            Stage::Inbox => scan_vqueue_inbox_stage::<InboxKey, _>(
+                self,
+                "df-vqueue-inbox",
+                range,
+                stage,
+                f,
+            ),
+            Stage::Running => scan_vqueue_inbox_stage::<RunningKey, _>(
+                self,
+                "df-vqueue-running",
+                range,
+                stage,
+                f,
+            ),
+            Stage::Suspended => scan_vqueue_inbox_stage::<SuspendedKey, _>(
+                self,
+                "df-vqueue-suspended",
+                range,
+                stage,
+                f,
+            ),
+            Stage::Paused => scan_vqueue_inbox_stage::<PausedKey, _>(
+                self,
+                "df-vqueue-paused",
+                range,
+                stage,
+                f,
+            ),
+            Stage::Finished => scan_vqueue_inbox_stage::<FinishedKey, _>(
+                self,
+                "df-vqueue-finished",
+                range,
+                stage,
+                f,
+            ),
+        }
+    }
+}
+
 // ## Safety
 // The iterator is guaranteed to be dropped before the database is dropped, we hold to the
 // PartitionDb in this struct for as long as the iterator is alive.
diff --git a/crates/storage-api/src/vqueue_table/entry.rs b/crates/storage-api/src/vqueue_table/entry.rs
index 4e7990f62f..e2b40475fb 100644
--- a/crates/storage-api/src/vqueue_table/entry.rs
+++ b/crates/storage-api/src/vqueue_table/entry.rs
@@ -205,7 +205,7 @@ impl<'a> From<&'a EntryMetadata> for EntryMetadataRef<'a> {
 pub struct EntryMetadata {
     // todo: This is temporary placeholder, type and name _will_ change.
     #[bilrost(tag(1))]
-    deployment: Option<String>,
+    pub deployment: Option<String>,
 }
 
 #[cfg(test)]
diff --git a/crates/storage-api/src/vqueue_table/tables.rs b/crates/storage-api/src/vqueue_table/tables.rs
index 87142d2bfb..4984c07164 100644
--- a/crates/storage-api/src/vqueue_table/tables.rs
+++ b/crates/storage-api/src/vqueue_table/tables.rs
@@ -221,3 +221,20 @@ pub trait ScanVQueueMetaTable {
         f: F,
     ) -> Result<impl Future<Output = Result<()>> + Send>;
 }
+
+pub trait ScanVQueueInboxStages {
+    /// Used for DataFusion queries.
+    fn for_each_vqueue_inbox_entry<
+        F: for<'a> FnMut(
+                (&'a VQueueId, Stage, &'a EntryKey, &'a EntryValue),
+            ) -> std::ops::ControlFlow<()>
+            + Send
+            + Sync
+            + 'static,
+    >(
+        &self,
+        range: KeyRange,
+        stage: Stage,
+        f: F,
+    ) -> Result<impl Future<Output = Result<()>> + Send>;
+}
diff --git a/crates/storage-query-datafusion/src/context.rs b/crates/storage-query-datafusion/src/context.rs
index 162a9fff55..a85b830f30 100644
--- a/crates/storage-query-datafusion/src/context.rs
+++ b/crates/storage-query-datafusion/src/context.rs
@@ -282,6 +282,12 @@ where
             self.partition_store_manager.clone(),
             &self.remote_scanner_manager,
         )?;
+        crate::vqueues::register_self(
+            ctx,
+            self.partition_selector.clone(),
+            self.partition_store_manager.clone(),
+            &self.remote_scanner_manager,
+        )?;
 
         ctx.datafusion_context.sql(SYS_INVOCATION_VIEW).await?;
diff --git a/crates/storage-query-datafusion/src/filter.rs b/crates/storage-query-datafusion/src/filter.rs
index eb2b740172..fe65313c7e 100644
--- a/crates/storage-query-datafusion/src/filter.rs
+++ b/crates/storage-query-datafusion/src/filter.rs
@@ -22,6 +22,7 @@ use datafusion::physical_expr_common::physical_expr::snapshot_physical_expr;
 use datafusion::physical_plan::PhysicalExpr;
 use datafusion::physical_plan::expressions::{BinaryExpr, Column, InListExpr, Literal};
 
+use restate_storage_api::vqueue_table::Stage;
 use restate_types::PartitionedResourceId;
 use restate_types::identifiers::partitioner::HashPartitioner;
 use restate_types::identifiers::{
@@ -282,6 +283,78 @@ fn extract_column_literal<'a>(
     Some((col, lit))
 }
 
+#[derive(Debug, Clone)]
+pub struct VQueueFilter {
+    pub partition_keys: KeyRange,
+    pub stages: Option<BTreeSet<Stage>>,
+}
+
+impl ScanLocalPartitionFilter for VQueueFilter {
+    fn new(range: KeyRange, predicate: Option<Arc<dyn PhysicalExpr>>) -> Self {
+        let mut stages: Option<BTreeSet<Stage>> = None;
+
+        if let Some(predicate) = predicate
+            && let Ok(predicate) = snapshot_physical_expr(predicate)
+        {
+            for conjunct in split_conjunction(&predicate) {
+                let Some(conjunct_stages) = parse_vqueue_stages("stage", conjunct) else {
+                    continue;
+                };
+
+                stages = Some(match stages {
+                    Some(current) => current.intersection(&conjunct_stages).copied().collect(),
+                    None => conjunct_stages,
+                });
+            }
+        }
+
+        Self {
+            partition_keys: range,
+            stages,
+        }
+    }
+}
+
+fn parse_vqueue_stages(
+    column_name: &str,
+    predicate: &Arc<dyn PhysicalExpr>,
+) -> Option<BTreeSet<Stage>> {
+    // OR-chain recursion depth; enough for `stage = a OR stage = b OR ...` over all stages.
+    let in_list = InList::parse(predicate, 5)?;
+
+    if in_list.col.name() != column_name || in_list.negated {
+        return None;
+    }
+
+    let mut stages = BTreeSet::new();
+    for literal in in_list.list {
+        let Some(Some(stage_str)) = literal.try_as_str() else {
+            continue;
+        };
+
+        if let Some(stage) = parse_stage_literal(stage_str) {
+            stages.insert(stage);
+        }
+    }
+
+    if stages.is_empty() {
+        None
+    } else {
+        Some(stages)
+    }
+}
+
+fn parse_stage_literal(value: &str) -> Option<Stage> {
+    match value.to_ascii_lowercase().as_str() {
+        "inbox" => Some(Stage::Inbox),
+        "run" | "running" => Some(Stage::Running),
+        "suspended" => Some(Stage::Suspended),
+        "paused" => Some(Stage::Paused),
+        "finished" => Some(Stage::Finished),
+        _ => None,
+    }
+}
+
 #[derive(Debug, Clone)]
 pub struct InvocationIdFilter {
     pub partition_keys: KeyRange,
@@ -347,12 +420,13 @@ mod tests {
 
     use datafusion::physical_plan::PhysicalExpr;
     use datafusion::physical_plan::expressions::{BinaryExpr, Column, InListExpr, Literal};
+    use restate_storage_api::vqueue_table::Stage;
     use restate_types::identifiers::{InvocationId, ServiceId, StateMutationId, WithPartitionKey};
     use restate_types::invocation::{InvocationTarget, VirtualObjectHandlerType};
     use restate_types::sharding::KeyRange;
 
     use crate::filter::{
-        FirstMatchingPartitionKeyExtractor, InvocationIdFilter, PartitionKeyExtractor,
+        FirstMatchingPartitionKeyExtractor, InvocationIdFilter, PartitionKeyExtractor, VQueueFilter,
     };
     use crate::partition_store_scanner::ScanLocalPartitionFilter;
@@ -716,4 +790,80 @@ mod tests {
         let filter = InvocationIdFilter::new(FULL_RANGE, Some(predicate));
         assert!(filter.invocation_ids.is_none());
     }
+
+    #[test]
+    fn vqueue_filter_single_stage_eq() {
+        let predicate = eq(col("stage"), utf8_lit("running"));
+
+        let filter = VQueueFilter::new(FULL_RANGE, Some(predicate));
+        assert_eq!(
+            filter.stages,
+            Some(std::collections::BTreeSet::from([Stage::Running]))
+        );
+    }
+
+    #[test]
+    fn vqueue_filter_in_list() {
+        let predicate = in_list("stage", vec![utf8_lit("running"), utf8_lit("paused")]);
+
+        let filter = VQueueFilter::new(FULL_RANGE, Some(predicate));
+        assert_eq!(
+            filter.stages,
+            Some(std::collections::BTreeSet::from([
+                Stage::Running,
+                Stage::Paused,
+            ]))
+        );
+    }
+
+    #[test]
+    fn vqueue_filter_or_expression() {
+        let predicate = or(
+            eq(col("stage"), utf8_lit("finished")),
+            eq(col("stage"), utf8_lit("inbox")),
+        );
+
+        let filter = VQueueFilter::new(FULL_RANGE, Some(predicate));
+        assert_eq!(
+            filter.stages,
+            Some(std::collections::BTreeSet::from([
+                Stage::Finished,
+                Stage::Inbox,
+            ]))
+        );
+    }
+
+    #[test]
+    fn vqueue_filter_conjunction_intersection() {
+        let predicate = and(
+            in_list(
+                "stage",
+                vec![utf8_lit("running"), utf8_lit("paused"), utf8_lit("inbox")],
+            ),
+            eq(col("stage"), utf8_lit("paused")),
+        );
+
+        let filter = VQueueFilter::new(FULL_RANGE, Some(predicate));
+        assert_eq!(
+            filter.stages,
+            Some(std::collections::BTreeSet::from([Stage::Paused]))
+        );
+    }
+
+    #[test]
+    fn vqueue_filter_invalid_stage_falls_back() {
+        let predicate = eq(col("stage"), utf8_lit("not-a-stage"));
+
+        let filter = VQueueFilter::new(FULL_RANGE, Some(predicate));
+        assert!(filter.stages.is_none());
+        assert_eq!(filter.partition_keys, FULL_RANGE);
+    }
+
+    #[test]
+    fn vqueue_filter_no_predicate() {
+        let filter = VQueueFilter::new(FULL_RANGE, None);
+
+        assert!(filter.stages.is_none());
+        assert_eq!(filter.partition_keys, FULL_RANGE);
+    }
 }
diff --git a/crates/storage-query-datafusion/src/lib.rs b/crates/storage-query-datafusion/src/lib.rs
index 139185dd1f..cd0cd69124 100644
--- a/crates/storage-query-datafusion/src/lib.rs
+++ b/crates/storage-query-datafusion/src/lib.rs
@@ -41,6 +41,7 @@ mod table_macro;
 mod table_providers;
 mod user_limits;
 mod vqueue_meta;
+mod vqueues;
 
 pub use table_providers::Scan;
 pub mod table_util;
diff --git a/crates/storage-query-datafusion/src/table_docs.rs b/crates/storage-query-datafusion/src/table_docs.rs
index 12c68c198d..7468e484b6 100644
--- a/crates/storage-query-datafusion/src/table_docs.rs
+++ b/crates/storage-query-datafusion/src/table_docs.rs
@@ -10,7 +10,7 @@
 use crate::{
     deployment, inbox, invocation_state, invocation_status, journal, journal_events,
-    keyed_service_status, promise, scheduler_status, service, state, vqueue_meta,
+    keyed_service_status, promise, scheduler_status, service, state, vqueue_meta, vqueues,
 };
 
 use std::borrow::Cow;
@@ -28,6 +28,7 @@ pub const ALL_TABLE_DOCS: &[StaticTableDocs] = &[
     service::schema::TABLE_DOCS,
     state::schema::TABLE_DOCS,
     vqueue_meta::schema::TABLE_DOCS,
+    vqueues::schema::TABLE_DOCS,
 ];
 
 pub trait TableDocs {
diff --git a/crates/storage-query-datafusion/src/vqueues/mod.rs b/crates/storage-query-datafusion/src/vqueues/mod.rs
new file mode 100644
index 0000000000..9e16f0e6d3
--- /dev/null
+++ b/crates/storage-query-datafusion/src/vqueues/mod.rs
@@ -0,0 +1,18 @@
+// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+mod row;
+pub(crate) mod schema;
+mod table;
+
+pub(crate) use table::register_self;
+
+#[cfg(test)]
+mod tests;
diff --git a/crates/storage-query-datafusion/src/vqueues/row.rs b/crates/storage-query-datafusion/src/vqueues/row.rs
new file mode 100644
index 0000000000..8b357e9295
--- /dev/null
+++ b/crates/storage-query-datafusion/src/vqueues/row.rs
@@ -0,0 +1,101 @@
+// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+ +use restate_storage_api::vqueue_table::{EntryKey, EntryKind, EntryValue, Stage}; +use restate_types::vqueues::VQueueId; + +use super::schema::SysVqueuesBuilder; + +#[inline] +pub(crate) fn append_vqueues_row<'a>( + builder: &mut SysVqueuesBuilder, + qid: &'a VQueueId, + stage: Stage, + entry_key: &'a EntryKey, + entry: &'a EntryValue, +) { + let mut row = builder.row(); + + row.partition_key(qid.partition_key()); + if row.is_id_defined() { + row.fmt_id(qid); + } + if row.is_stage_defined() { + row.fmt_stage(stage); + } + if row.is_status_defined() { + row.fmt_status(entry.status); + } + + if row.is_has_lock_defined() { + row.has_lock(entry_key.has_lock()); + } + if matches!(stage, Stage::Inbox) && row.is_run_at_defined() { + row.run_at(entry_key.run_at().as_unix_millis().as_u64() as i64); + } + if row.is_sequence_number_defined() { + row.sequence_number(entry_key.seq().as_u64()); + } + + if row.is_entry_id_defined() { + row.fmt_entry_id(entry_key.entry_id().display(qid.partition_key())); + } + + if row.is_entry_kind_defined() { + row.entry_kind(match entry_key.kind() { + EntryKind::Invocation => "invocation", + EntryKind::StateMutation => "state-mutation", + EntryKind::Unknown => "unknown", + }); + } + + if row.is_created_at_defined() { + row.created_at(entry.stats.created_at.to_unix_millis().as_u64() as i64); + } + + if row.is_transitioned_at_defined() { + row.transitioned_at(entry.stats.transitioned_at.to_unix_millis().as_u64() as i64); + } + + if row.is_num_attempts_defined() { + row.num_attempts(entry.stats.num_attempts); + } + if row.is_num_pauses_defined() { + row.num_pauses(entry.stats.num_paused); + } + if row.is_num_suspensions_defined() { + row.num_suspensions(entry.stats.num_suspensions); + } + if row.is_num_yields_defined() { + row.num_yields(entry.stats.num_yields); + } + + if row.is_first_attempt_at_defined() + && let Some(first_attempt_at) = entry.stats.first_attempt_at + { + row.first_attempt_at(first_attempt_at.to_unix_millis().as_u64() as i64); + } + + if row.is_latest_attempt_at_defined() + && let Some(latest_attempt_at) = entry.stats.latest_attempt_at + { + row.latest_attempt_at(latest_attempt_at.to_unix_millis().as_u64() as i64); + } + + if row.is_first_runnable_at_defined() { + row.first_runnable_at(entry.stats.first_runnable_at.as_u64() as i64); + } + + if row.is_deployment_defined() + && let Some(deployment) = &entry.metadata.deployment + { + row.fmt_deployment(deployment); + } +} diff --git a/crates/storage-query-datafusion/src/vqueues/schema.rs b/crates/storage-query-datafusion/src/vqueues/schema.rs new file mode 100644 index 0000000000..2bb8452c5b --- /dev/null +++ b/crates/storage-query-datafusion/src/vqueues/schema.rs @@ -0,0 +1,79 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::table_macro::*; + +use datafusion::arrow::datatypes::DataType; + +// Stage scans run concurrently and merge into a shared builder, so only +// `partition_key` is monotone within a single partition's output stream. +define_sort_order!(sys_vqueues(partition_key)); + +define_table!(sys_vqueues( + /// Internal column that is used for partitioning. Can be ignored. + partition_key: DataType::UInt64, + + /// The VQueue Identifier (vq_...). 
+ id: DataType::Utf8, + + /// The stage this entry currently belongs to. Choices are 'inbox', 'running', 'paused', + /// 'suspended', and 'finished'. + stage: DataType::Utf8, + + /// The entry processing status. Examples are `new`, `scheduled`, `running`, + /// `backing_off`, `yielded`, `suspended`, `failed`, and `succeeded`. + status: DataType::Utf8, + + /// Whether this entry currently holds a lock. + has_lock: DataType::Boolean, + + /// The entry will be eligible to run after this timestamp. Only present for entries + /// that are in the waiting inbox. + run_at: TimestampMillisecond, + + /// Sequence number encoded in the queue ordering key. + sequence_number: DataType::UInt64, + + /// Identifier of the entry. + entry_id: DataType::Utf8, + + /// Entry kind (`invocation` or `state-mutation`). + entry_kind: DataType::Utf8, + + /// Creation timestamp of the entry. + created_at: TimestampMillisecond, + + /// Timestamp of the latest stage transition. + transitioned_at: TimestampMillisecond, + + /// Number of times this entry has been moved to the run queue. + num_attempts: DataType::UInt32, + + /// Number of times this entry has been moved to the paused stage. + num_pauses: DataType::UInt32, + + /// Number of times this entry has been moved to the suspended stage. + num_suspensions: DataType::UInt32, + + /// Number of times this entry has yielded execution. + num_yields: DataType::UInt32, + + /// Timestamp of the first attempt to run this entry. + first_attempt_at: TimestampMillisecond, + + /// Timestamp of the latest attempt to run this entry. + latest_attempt_at: TimestampMillisecond, + + /// The realistic earliest time at which this entry can run its first attempt. + first_runnable_at: TimestampMillisecond, + + /// If set, the entry's pinned deployment identifier. + deployment: DataType::Utf8, +)); diff --git a/crates/storage-query-datafusion/src/vqueues/table.rs b/crates/storage-query-datafusion/src/vqueues/table.rs new file mode 100644 index 0000000000..a23177a1d4 --- /dev/null +++ b/crates/storage-query-datafusion/src/vqueues/table.rs @@ -0,0 +1,159 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+
+use std::fmt::Debug;
+use std::ops::ControlFlow;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+
+use futures::future::try_join_all;
+use parking_lot::Mutex;
+
+use restate_partition_store::{PartitionStore, PartitionStoreManager};
+use restate_storage_api::StorageError;
+use restate_storage_api::vqueue_table::{EntryKey, EntryValue, ScanVQueueInboxStages, Stage};
+use restate_types::identifiers::StateMutationId;
+use restate_types::vqueues::VQueueId;
+
+use crate::context::{QueryContext, SelectPartitions};
+use crate::filter::{FirstMatchingPartitionKeyExtractor, VQueueFilter};
+use crate::partition_store_scanner::{LocalPartitionsScanner, ScanLocalPartition};
+use crate::remote_query_scanner_manager::RemoteScannerManager;
+use crate::table_providers::{PartitionedTableProvider, ScanPartition};
+use crate::vqueues::row::append_vqueues_row;
+use crate::vqueues::schema::{SysVqueuesBuilder, sys_vqueues_sort_order};
+
+const NAME: &str = "sys_vqueues";
+
+pub(crate) fn register_self(
+    ctx: &QueryContext,
+    partition_selector: impl SelectPartitions,
+    partition_store_manager: Arc<PartitionStoreManager>,
+    remote_scanner_manager: &RemoteScannerManager,
+) -> datafusion::common::Result<()> {
+    let local_scanner = Arc::new(LocalPartitionsScanner::new(
+        partition_store_manager,
+        VQueuesScanner,
+    )) as Arc<dyn ScanPartition>;
+
+    let table = PartitionedTableProvider::new(
+        partition_selector,
+        SysVqueuesBuilder::schema(),
+        sys_vqueues_sort_order(),
+        remote_scanner_manager.create_distributed_scanner(NAME, local_scanner),
+        FirstMatchingPartitionKeyExtractor::default()
+            .with_partitioned_resource_id::<VQueueId>("id")
+            // entry_id can be an invocation id or state mutation id
+            .with_invocation_id("entry_id")
+            .with_partitioned_resource_id::<StateMutationId>("entry_id"),
+    );
+
+    ctx.register_partitioned_table(NAME, Arc::new(table))
+}
+
+#[derive(Debug, Clone)]
+struct VQueuesScanner;
+
+const SCAN_STAGE_ORDER: [Stage; 5] = [
+    Stage::Paused,
+    Stage::Suspended,
+    Stage::Running,
+    Stage::Inbox,
+    Stage::Finished,
+];
+
+/// Bound on in-flight stage scans per partition scan. Keeps RocksDB iterator
+/// pressure and the shared builder's `Mutex` contention in check while still
+/// overlapping I/O across stages.
+const MAX_CONCURRENT_STAGE_SCANS: usize = 2;
+
+impl ScanLocalPartition for VQueuesScanner {
+    type Builder = SysVqueuesBuilder;
+    type Item<'a> = (&'a VQueueId, Stage, &'a EntryKey, &'a EntryValue);
+    type ConversionError = std::convert::Infallible;
+    type Filter = VQueueFilter;
+
+    fn for_each_row<
+        F: for<'a> FnMut(Self::Item<'a>) -> ControlFlow<Result<(), Self::ConversionError>>
+            + Send
+            + Sync
+            + 'static,
+    >(
+        partition_store: &PartitionStore,
+        filter: VQueueFilter,
+        f: F,
+    ) -> Result<impl Future<Output = Result<(), StorageError>> + Send, StorageError> {
+        let range = filter.partition_keys;
+        let stages: Vec<_> = if let Some(requested_stages) = filter.stages {
+            SCAN_STAGE_ORDER
+                .into_iter()
+                .filter(|stage| requested_stages.contains(stage))
+                .collect()
+        } else {
+            SCAN_STAGE_ORDER.to_vec()
+        };
+
+        let callback = Arc::new(Mutex::new(f));
+        let should_stop = Arc::new(AtomicBool::new(false));
+
+        Ok(async move {
+            let scan_result = async {
+                for stage_batch in stages.chunks(MAX_CONCURRENT_STAGE_SCANS) {
+                    if should_stop.load(Ordering::Relaxed) {
+                        break;
+                    }
+
+                    let mut scans = Vec::with_capacity(stage_batch.len());
+                    for stage in stage_batch {
+                        let callback = callback.clone();
+                        let should_stop = should_stop.clone();
+
+                        scans.push(partition_store.for_each_vqueue_inbox_entry(
+                            range,
+                            *stage,
+                            move |item| {
+                                if should_stop.load(Ordering::Relaxed) {
+                                    return ControlFlow::Break(());
+                                }
+
+                                let mut callback = callback.lock();
+                                let control_flow = (*callback)(item);
+                                if control_flow.is_break() {
+                                    should_stop.store(true, Ordering::Relaxed);
+                                }
+                                control_flow.map_break(Result::unwrap)
+                            },
+                        )?);
+                    }
+
+                    try_join_all(scans).await?;
+                }
+
+                Ok(())
+            }
+            .await;
+
+            // The callback can own types with blocking cleanup logic (e.g. BatchSender).
+            // Drop it off the async runtime worker thread.
+            let _ = std::thread::spawn(move || drop(callback)).join();
+
+            scan_result
+        })
+    }
+
+    fn append_row<'a>(
+        row_builder: &mut Self::Builder,
+        value: Self::Item<'a>,
+    ) -> Result<(), Self::ConversionError> {
+        let (qid, stage, entry_key, entry) = value;
+        append_vqueues_row(row_builder, qid, stage, entry_key, entry);
+        Ok(())
+    }
+}
diff --git a/crates/storage-query-datafusion/src/vqueues/tests.rs b/crates/storage-query-datafusion/src/vqueues/tests.rs
new file mode 100644
index 0000000000..4192937860
--- /dev/null
+++ b/crates/storage-query-datafusion/src/vqueues/tests.rs
@@ -0,0 +1,192 @@
+// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+ +use crate::mocks::*; +use crate::row; + +use datafusion::arrow::array::{StringArray, TimestampMillisecondArray, UInt32Array}; +use datafusion::arrow::record_batch::RecordBatch; +use futures::StreamExt; +use googletest::all; +use googletest::prelude::{assert_that, eq}; + +use restate_storage_api::Transaction; +use restate_storage_api::vqueue_table::{ + EntryId, EntryKey, EntryKind, EntryMetadata, EntryValue, Stage, Status, WriteVQueueTable, + stats::EntryStatistics, +}; +use restate_types::clock::UniqueTimestamp; +use restate_types::time::MillisSinceEpoch; +use restate_types::vqueues::VQueueId; + +#[restate_core::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_vqueue_entry_value_fields() { + let mut engine = MockQueryEngine::create().await; + + let qid = VQueueId::custom(1337, "df-vqueue"); + let key = EntryKey::new( + true, + MillisSinceEpoch::new(1_744_000_001_000), + 7, + EntryId::new(EntryKind::Invocation, [7; 16]), + ); + + let created_at = + UniqueTimestamp::try_from_unix_millis(MillisSinceEpoch::new(1_744_000_000_100)).unwrap(); + let transitioned_at = + UniqueTimestamp::try_from_unix_millis(MillisSinceEpoch::new(1_744_000_000_300)).unwrap(); + let first_attempt_at = + UniqueTimestamp::try_from_unix_millis(MillisSinceEpoch::new(1_744_000_000_400)).unwrap(); + let latest_attempt_at = + UniqueTimestamp::try_from_unix_millis(MillisSinceEpoch::new(1_744_000_000_500)).unwrap(); + + let mut stats = EntryStatistics::new(created_at, key.run_at().to_owned()); + stats.transitioned_at = transitioned_at; + stats.num_attempts = 3; + stats.num_paused = 2; + stats.num_suspensions = 4; + stats.num_yields = 5; + stats.first_attempt_at = Some(first_attempt_at); + stats.latest_attempt_at = Some(latest_attempt_at); + + let value = EntryValue { + status: Status::BackingOff, + metadata: EntryMetadata { + deployment: Some("dp_123".to_string()), + }, + stats, + }; + + // This row should be returned. + let mut tx = engine.partition_store().transaction(); + tx.put_vqueue_inbox(&qid, Stage::Inbox, &key, &value); + + // This row should only be returned when stage filtering selects it. 
+    tx.put_vqueue_inbox(&qid, Stage::Running, &key, &value);
+    tx.commit().await.unwrap();
+
+    let records = engine
+        .execute(
+            "SELECT stage, status, num_attempts, num_pauses, num_suspensions, num_yields, \
+             created_at, transitioned_at, first_attempt_at, latest_attempt_at, first_runnable_at, \
+             run_at, deployment FROM sys_vqueues WHERE stage = 'inbox' ORDER BY sequence_number",
+        )
+        .await
+        .unwrap()
+        .stream
+        .collect::<Vec<Result<RecordBatch, _>>>()
+        .await
+        .remove(0)
+        .unwrap();
+
+    assert_eq!(records.num_rows(), 1);
+
+    assert_that!(
+        records,
+        all!(row!(
+            0,
+            {
+                "stage" => StringArray: eq("inbox"),
+                "status" => StringArray: eq("backing_off"),
+                "num_attempts" => UInt32Array: eq(3),
+                "num_pauses" => UInt32Array: eq(2),
+                "num_suspensions" => UInt32Array: eq(4),
+                "num_yields" => UInt32Array: eq(5),
+                "created_at" => TimestampMillisecondArray: eq(created_at.to_unix_millis().as_u64() as i64),
+                "transitioned_at" => TimestampMillisecondArray: eq(transitioned_at.to_unix_millis().as_u64() as i64),
+                "first_attempt_at" => TimestampMillisecondArray: eq(first_attempt_at.to_unix_millis().as_u64() as i64),
+                "latest_attempt_at" => TimestampMillisecondArray: eq(latest_attempt_at.to_unix_millis().as_u64() as i64),
+                "first_runnable_at" => TimestampMillisecondArray: eq(value.stats.first_runnable_at.as_u64() as i64),
+                "run_at" => TimestampMillisecondArray: eq(key.run_at().as_unix_millis().as_u64() as i64),
+                "deployment" => StringArray: eq("dp_123"),
+            }
+        ))
+    );
+}
+
+#[restate_core::test(flavor = "multi_thread", worker_threads = 2)]
+async fn vqueue_stage_filter_and_unfiltered_scan() {
+    let mut engine = MockQueryEngine::create().await;
+
+    let qid = VQueueId::custom(2337, "df-vqueue-stages");
+
+    let mut tx = engine.partition_store().transaction();
+    let stages = [
+        Stage::Inbox,
+        Stage::Running,
+        Stage::Suspended,
+        Stage::Paused,
+        Stage::Finished,
+    ];
+    for (index, stage) in stages.into_iter().enumerate() {
+        let key = EntryKey::new(
+            false,
+            MillisSinceEpoch::new(1_744_001_000_000 + index as u64),
+            (index + 1) as u64,
+            EntryId::new(EntryKind::Invocation, [index as u8 + 1; 16]),
+        );
+        let value = EntryValue {
+            status: Status::Started,
+            metadata: EntryMetadata::default(),
+            stats: EntryStatistics::new(
+                UniqueTimestamp::try_from_unix_millis(MillisSinceEpoch::new(
+                    1_744_001_000_100 + index as u64,
+                ))
+                .unwrap(),
+                key.run_at().to_owned(),
+            ),
+        };
+        tx.put_vqueue_inbox(&qid, stage, &key, &value);
+    }
+    tx.commit().await.unwrap();
+
+    let all_stages = engine
+        .execute("SELECT stage FROM sys_vqueues ORDER BY stage")
+        .await
+        .unwrap()
+        .stream
+        .collect::<Vec<Result<RecordBatch, _>>>()
+        .await
+        .remove(0)
+        .unwrap();
+
+    assert_eq!(all_stages.num_rows(), 5);
+    assert_that!(
+        all_stages,
+        all!(
+            row!(0, { "stage" => StringArray: eq("finished") }),
+            row!(1, { "stage" => StringArray: eq("inbox") }),
+            row!(2, { "stage" => StringArray: eq("paused") }),
+            row!(3, { "stage" => StringArray: eq("running") }),
+            row!(4, { "stage" => StringArray: eq("suspended") })
+        )
+    );
+
+    let filtered_stages = engine
+        .execute(
+            "SELECT stage FROM sys_vqueues WHERE stage IN ('running', 'paused') ORDER BY stage",
+        )
+        .await
+        .unwrap()
+        .stream
+        .collect::<Vec<Result<RecordBatch, _>>>()
+        .await
+        .remove(0)
+        .unwrap();
+
+    assert_eq!(filtered_stages.num_rows(), 2);
+    assert_that!(
+        filtered_stages,
+        all!(
+            row!(0, { "stage" => StringArray: eq("paused") }),
+            row!(1, { "stage" => StringArray: eq("running") })
+        )
+    );
+}
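
Usage sketch (illustrative, under the assumptions stated in the comments): the
snippet below shows how the new sys_vqueues table composes end to end through
the existing test harness. MockQueryEngine and `execute` come from crate::mocks
exactly as used in vqueues/tests.rs above; the query text and the selected
columns are examples only.

    // Minimal sketch, assuming the crate::mocks test harness from tests.rs.
    use datafusion::arrow::record_batch::RecordBatch;
    use futures::StreamExt;

    let mut engine = MockQueryEngine::create().await;

    // A pushed-down stage predicate is extracted by VQueueFilter, so only the
    // 'running' and 'paused' stage key kinds are scanned instead of all five.
    let batches = engine
        .execute(
            "SELECT id, entry_id, status, num_attempts \
             FROM sys_vqueues \
             WHERE stage IN ('running', 'paused')",
        )
        .await
        .unwrap()
        .stream
        .collect::<Vec<Result<RecordBatch, _>>>()
        .await;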