[VQueues] Introduce sys_vqueues datafusion table#4635
AhmedSoliman wants to merge 1 commit into main
tillrohrmann left a comment
Great work @AhmedSoliman. Being able to introspect all this information using SQL is awesome :-) LGTM. +1 for merging.
fn scan_vqueue_inbox_stage<'store, K, F>(
I tripped over the fact that we have different inbox stages where one stage is called inbox.
/// The entry processing status. Examples are `new`, `scheduled`, `running`,
/// `backing_off`, `yielded`, `suspended`, `failed`, and `succeeded`.
I think this list is not up to date anymore.
has_lock: DataType::Boolean,

/// The entry will be eligible to run after this timestamp. Only present for entries
/// that are in the waiting inbox.
The waiting inbox is Stage::Inbox, right? Should we unify the names?
// The callback can own types with blocking cleanup logic (e.g. BatchSender).
// Drop it off the async runtime worker thread.
let _ = std::thread::spawn(move || drop(callback)).join();
Wouldn't the `join` block the worker thread in the same way as running `drop(callback)` inline?
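To make the question concrete, here is a minimal sketch using only the standard library. `SlowDrop`, `drop_and_join`, and `drop_detached` are hypothetical names for illustration and do not appear in the PR; the sleep stands in for whatever blocking cleanup the real callback might perform.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for a callback whose destructor blocks.
struct SlowDrop(Duration);

impl Drop for SlowDrop {
    fn drop(&mut self) {
        // Simulates blocking cleanup logic.
        thread::sleep(self.0);
    }
}

/// Mirrors the pattern in the diff: spawn a thread to drop the value,
/// then join it. Returns how long the *calling* thread was occupied.
fn drop_and_join(value: SlowDrop) -> Duration {
    let start = Instant::now();
    let _ = thread::spawn(move || drop(value)).join();
    start.elapsed()
}

/// Alternative: spawn the thread but do not join it right away.
/// Returns how long the calling thread was occupied, plus the handle.
fn drop_detached(value: SlowDrop) -> (Duration, thread::JoinHandle<()>) {
    let start = Instant::now();
    let handle = thread::spawn(move || drop(value));
    (start.elapsed(), handle)
}
```

In this sketch, `drop_and_join` keeps the caller waiting for at least the full cleanup time, because `join` returns only after the spawned thread has finished. `drop_detached` returns as soon as the thread has been spawned. Whether detaching is acceptable in the actual code depends on constraints not visible in this conversation.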
// entry_id can be an invocation id or state mutation id
.with_invocation_id("entry_id")
.with_partitioned_resource_id::<StateMutationId>("entry_id"),
If `entry_id` can contain an `inv_` as well as an `sm_` ID, then neither extractor should fail when parsing the value fails. Otherwise we won't try the other extractor.
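The fallback behaviour suggested here could look roughly like the following sketch. It is not the implementation of `with_invocation_id` or `with_partitioned_resource_id`; `EntryId`, `Extractor`, and the parsing functions are hypothetical, and the only assumption taken from the comment is that the two ID kinds are distinguished by the `inv_` and `sm_` prefixes.

```rust
/// Hypothetical result of a successful extraction.
#[derive(Debug, PartialEq, Eq)]
enum EntryId {
    Invocation(String),
    StateMutation(String),
}

/// Hypothetical extractor signature: `None` means "not my kind of ID",
/// which is a soft miss rather than a hard error.
type Extractor = fn(&str) -> Option<EntryId>;

fn parse_invocation_id(raw: &str) -> Option<EntryId> {
    raw.strip_prefix("inv_")
        .filter(|rest| !rest.is_empty())
        .map(|rest| EntryId::Invocation(rest.to_owned()))
}

fn parse_state_mutation_id(raw: &str) -> Option<EntryId> {
    raw.strip_prefix("sm_")
        .filter(|rest| !rest.is_empty())
        .map(|rest| EntryId::StateMutation(rest.to_owned()))
}

/// Tries each extractor in order; a miss in one falls through to the next
/// instead of aborting the whole chain.
fn extract_entry_id(raw: &str) -> Option<EntryId> {
    const EXTRACTORS: [Extractor; 2] = [parse_invocation_id, parse_state_mutation_id];
    EXTRACTORS.iter().find_map(|extract| extract(raw))
}
```

The design point is that a parse failure is reported as `None` rather than as an error, so a value such as `sm_...` still reaches the second extractor after the first one declines it.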
predicate: &Arc<dyn PhysicalExpr>,
) -> Option<BTreeSet<Stage>> {
// OR-chain recursion depth; enough for `stage = a OR stage = b OR ...` over all stages.
let in_list = InList::parse(predicate, 5)?;
The depth limit needs to be 6 to cover a disjunction over all available stages. Maybe we could use `Stage::COUNT + 1` after deriving `strum::EnumCount`?
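A rough sketch of why deriving the limit from the variant count is more robust than a literal. Everything here is a simplified stand-in: `Expr` and `collect_stages` are hypothetical and do not reproduce `InList::parse`, the `Stage` variants other than `Inbox` are placeholders, and `COUNT` is written by hand where `strum::EnumCount` would generate it. How the real parser counts depth, and therefore the exact off-by-one, is not something this sketch can confirm.

```rust
use std::collections::BTreeSet;

/// Placeholder stage enum; only `Inbox` is named in the conversation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Stage {
    Inbox,
    Second,
    Third,
    Fourth,
    Fifth,
}

impl Stage {
    const ALL: [Stage; 5] = [
        Stage::Inbox,
        Stage::Second,
        Stage::Third,
        Stage::Fourth,
        Stage::Fifth,
    ];
    /// Hand-written here; a derive could keep this in sync automatically.
    const COUNT: usize = Self::ALL.len();
}

/// Hypothetical, heavily simplified predicate tree.
enum Expr {
    StageEq(Stage),
    Or(Box<Expr>, Box<Expr>),
}

/// Collects the stages named in an OR-chain, giving up (returning `None`)
/// once the recursion budget is exhausted.
fn collect_stages(expr: &Expr, depth_budget: usize) -> Option<BTreeSet<Stage>> {
    if depth_budget == 0 {
        return None;
    }
    match expr {
        Expr::StageEq(stage) => Some(BTreeSet::from([*stage])),
        Expr::Or(left, right) => {
            let mut stages = collect_stages(left, depth_budget - 1)?;
            stages.extend(collect_stages(right, depth_budget - 1)?);
            Some(stages)
        }
    }
}

/// Builds a left-deep chain: `((a OR b) OR c) OR ...`.
fn or_chain(stages: &[Stage]) -> Expr {
    let mut iter = stages.iter().copied();
    let first = Expr::StageEq(iter.next().expect("at least one stage"));
    iter.fold(first, |acc, stage| {
        Expr::Or(Box::new(acc), Box::new(Expr::StageEq(stage)))
    })
}
```

With a budget tied to `Stage::COUNT`, adding a variant raises the limit automatically. With a hard-coded literal, a chain over all stages can silently stop being recognised, and the scan would then presumably fall back to reading every stage.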
Expose vqueue entries as a single DataFusion table with stage-aware scanning. When the query filters `stage`, only matching stage key kinds are scanned; without a stage filter, all inbox stages are scanned and merged. Also project the latest entry metadata for observability (status plus EntryStatistics timestamps and counters), and add targeted tests for stage predicate extraction and sys_vqueues stage filtering behavior.
Stack created with Sapling. Best reviewed with ReviewStack.