[VQueues] Introduce sys_vqueues datafusion table#4635

Open
AhmedSoliman wants to merge 1 commit into main from pr4635

AhmedSoliman commented Apr 22, 2026

Expose vqueue entries as a single DataFusion table with stage-aware scanning.
When the query filters stage, only matching stage key kinds are scanned;
without a stage filter, all inbox stages are scanned and merged.

Also project the latest entry metadata for observability (status plus
EntryStatistics timestamps and counters), and add targeted tests for stage
predicate extraction and sys_vqueues stage filtering behavior.
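
A minimal sketch of the stage-aware scan selection described above, not the code from this PR. The `Stage` enum here is a hypothetical stand-in: only `Inbox` is named in this conversation, and the other variants, `ALL_STAGES`, and `stages_to_scan` are placeholders for illustration.

```rust
use std::collections::BTreeSet;

// Hypothetical stand-in for the real `Stage` enum. Only `Inbox` appears in
// this PR's discussion; the remaining variants are placeholders.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Stage {
    Inbox,
    PlaceholderB,
    PlaceholderC,
    PlaceholderD,
    PlaceholderE,
}

const ALL_STAGES: [Stage; 5] = [
    Stage::Inbox,
    Stage::PlaceholderB,
    Stage::PlaceholderC,
    Stage::PlaceholderD,
    Stage::PlaceholderE,
];

/// Returns the stages a scan should visit: the filtered subset when the
/// query constrains `stage`, otherwise every stage.
fn stages_to_scan(filter: Option<&BTreeSet<Stage>>) -> BTreeSet<Stage> {
    match filter {
        Some(wanted) => wanted.clone(),
        None => ALL_STAGES.iter().copied().collect(),
    }
}
```

With a filter such as `stage = 'inbox'`, the returned set would hold a single stage, so only that stage's keys need to be read; with no filter, all stages are returned and their results merged.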


Stack created with Sapling. Best reviewed with ReviewStack.

github-actions Bot commented Apr 22, 2026

Test Results

  8 files  ±0    8 suites  ±0   2m 24s ⏱️ +22s
 50 tests ±0   50 ✅ ±0  0 💤 ±0  0 ❌ ±0 
218 runs  ±0  218 ✅ ±0  0 💤 ±0  0 ❌ ±0 

Results for commit 98cf860. ± Comparison against base commit 1286c07.

♻️ This comment has been updated with latest results.

AhmedSoliman force-pushed the pr4635 branch 2 times, most recently from 3e88b62 to 00d5a02, April 24, 2026 12:15
AhmedSoliman force-pushed the pr4635 branch 2 times, most recently from 504675c to 50068d1, April 28, 2026 08:12
AhmedSoliman force-pushed the pr4635 branch 2 times, most recently from 2e7ade5 to 50068d1, April 28, 2026 12:29
AhmedSoliman removed the request for review from tillrohrmann, April 28, 2026 13:02

tillrohrmann left a comment

Great work @AhmedSoliman. Being able to introspect all this information using SQL is awesome :-) LGTM. +1 for merging.

}
}

fn scan_vqueue_inbox_stage<'store, K, F>(

I tripped over the fact that we have different inbox stages where one stage is called inbox.

Comment on lines +30 to +31
/// The entry processing status. Examples are `new`, `scheduled`, `running`,
/// `backing_off`, `yielded`, `suspended`, `failed`, and `succeeded`.

I think this list is not up to date anymore.

has_lock: DataType::Boolean,

/// The entry will be eligible to run after this timestamp. Only present for entries
/// that are in the waiting inbox.

The waiting inbox is Stage::Inbox, right? Should we unify the names?


// The callback can own types with blocking cleanup logic (e.g. BatchSender).
// Drop it off the async runtime worker thread.
let _ = std::thread::spawn(move || drop(callback)).join();

Wouldn't the join block the worker thread just as running drop(callback) inline would?
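
A small sketch of the concern raised here, assuming a callback whose destructor blocks. `JoinHandle::join` waits for the spawned thread to finish, so the calling thread stays blocked for the whole drop. `SlowDrop`, `drop_and_join`, and `drop_detached` are hypothetical names for illustration, not code from this PR, and the detached variant is shown only for contrast.

```rust
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical callback whose destructor blocks, standing in for a type
// like the `BatchSender` mentioned in the code comment above.
struct SlowDrop(Duration);

impl Drop for SlowDrop {
    fn drop(&mut self) {
        // Simulate blocking cleanup logic.
        thread::sleep(self.0);
    }
}

/// Drops `callback` on a helper thread and waits for it to finish.
/// Returns how long the calling thread was blocked.
fn drop_and_join(callback: SlowDrop) -> Duration {
    let start = Instant::now();
    let _ = thread::spawn(move || drop(callback)).join();
    start.elapsed()
}

/// Drops `callback` on a helper thread without waiting for it.
/// Returns how long the calling thread was blocked.
fn drop_detached(callback: SlowDrop) -> Duration {
    let start = Instant::now();
    // Discarding the JoinHandle detaches the thread; the drop completes later.
    let _ = thread::spawn(move || drop(callback));
    start.elapsed()
}
```

In this sketch the joined variant keeps the caller waiting at least as long as the destructor runs, while the detached variant returns as soon as the thread is spawned, at the cost of no longer knowing when cleanup has completed.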

Comment on lines +53 to +55
// entry_id can be an invocation id or state mutation id
.with_invocation_id("entry_id")
.with_partitioned_resource_id::<StateMutationId>("entry_id"),

If entry_id can contain an inv_ as well as sm_ id, then both extractors should probably not fail if parsing of the value fails. Otherwise we won't try the other extractor.
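
One way to read this suggestion, sketched under simplified assumptions: each extractor returns `None` on a miss instead of an error, so the next extractor still gets a chance. The `inv_` and `sm_` prefixes come from the comment above; `EntryId`, the parse functions, and the prefix-only parsing are hypothetical and much simpler than the real ID formats.

```rust
// Hypothetical ID representation; the real invocation and state mutation
// IDs have their own types and stricter parsing rules.
#[derive(Debug, PartialEq)]
enum EntryId {
    Invocation(String),
    StateMutation(String),
}

/// Matches values with the `inv_` prefix; returns `None` on a miss.
fn parse_invocation_id(s: &str) -> Option<EntryId> {
    s.strip_prefix("inv_")
        .map(|rest| EntryId::Invocation(rest.to_owned()))
}

/// Matches values with the `sm_` prefix; returns `None` on a miss.
fn parse_state_mutation_id(s: &str) -> Option<EntryId> {
    s.strip_prefix("sm_")
        .map(|rest| EntryId::StateMutation(rest.to_owned()))
}

/// Tries each extractor in turn. A miss from one extractor is not an
/// error; it hands the value to the next one.
fn extract_entry_id(s: &str) -> Option<EntryId> {
    parse_invocation_id(s).or_else(|| parse_state_mutation_id(s))
}
```

If the first extractor instead returned a hard error on an `sm_` value, the second extractor would never run, which is the failure mode the comment describes.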

predicate: &Arc<dyn PhysicalExpr>,
) -> Option<BTreeSet<Stage>> {
// OR-chain recursion depth; enough for `stage = a OR stage = b OR ...` over all stages.
let in_list = InList::parse(predicate, 5)?;

The depth limit needs to be 6 to cover a disjunction over all available Stages. Maybe we use Stage::COUNT + 1 after adding strum::EnumCount?
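
A sketch of deriving the limit from the number of stages rather than hard-coding it. It uses a hand-written `COUNT` constant so the block needs no external crate; with strum, `#[derive(EnumCount)]` would provide `Stage::COUNT` through the `strum::EnumCount` trait. The enum below is a hypothetical stand-in: only `Inbox` is named in this thread, and five variants are assumed because the comment says a limit of 6 is needed. `STAGE_OR_DEPTH_LIMIT` is likewise an illustrative name.

```rust
// Hypothetical stand-in for `Stage`; variants other than `Inbox` are
// placeholders, and the count of five is an assumption.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Stage {
    Inbox,
    PlaceholderB,
    PlaceholderC,
    PlaceholderD,
    PlaceholderE,
}

impl Stage {
    const ALL: [Stage; 5] = [
        Stage::Inbox,
        Stage::PlaceholderB,
        Stage::PlaceholderC,
        Stage::PlaceholderD,
        Stage::PlaceholderE,
    ];

    // Hand-written equivalent of the constant `strum::EnumCount` derives.
    const COUNT: usize = Self::ALL.len();
}

/// OR-chain recursion depth derived from the number of stages, so it
/// keeps up automatically if a stage is added later.
const STAGE_OR_DEPTH_LIMIT: usize = Stage::COUNT + 1;
```

The call site would then pass `STAGE_OR_DEPTH_LIMIT` in place of the literal `5`.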
