Switch to new data passing API#109
Conversation
doesn't touch DefaultTableClient yet
method is unimplemented! in default client for now
bit ugly on the expression stuff atm
ryan-johnson-databricks
left a comment
There was a problem hiding this comment.
Nice! We're sooo close to banishing the last bits of arrow from core kernel code. Just that one spot in data skipping that messes with selection vectors.
(bunch of unresolved comments from before, plus several new ones)
data-skipping still uses arrow for now, we can change that after #83 merges
It merged.
| if metadata.is_none() { | ||
| return Ok(Box::new(std::iter::empty())); | ||
| pub fn schema(&self) -> DeltaResult<StructType> { | ||
| Ok(serde_json::from_str(&self.schema_string)?) |
There was a problem hiding this comment.
Do we eventually need to use the engine's json parser for this?
There was a problem hiding this comment.
Well, it's internal so we don't have to I don't think
There was a problem hiding this comment.
Internal and a single value that won't hog MB/GB memory. Fair.
There was a problem hiding this comment.
I do remember we discussed some related topic. In general the main issue here I think is that we cannot tell the engine what schema we are expecting which right now is required. IIRC the question was mainly trying to avoid serde_json as a dependency, personally I would vote for accepting as a dependency and doing it as implemented here.
Another place whas trying to parse the _last_cehckpoint file, which may also contain the schema of the checkpoint files - I think. At that time we would also have no way of telling eninge the schema of the data we want. In this case though I belive that using that schema was not eneven doen internally at databricks?
| let schema = ArrowSchema::new(vec![ArrowField::new("output", arrow_type, true)]); | ||
| RecordBatch::try_new(Arc::new(schema), vec![array_ref])? |
There was a problem hiding this comment.
Out of curiosity -- do you know what code currently triggers this non-struct path?
There was a problem hiding this comment.
You can tell from DataSkippingFilter::new. For instance the STATS_EXPR is a column expression, so it'll eval to just an Array, so the select_stats_evaluator will trigger this path (I think, haven't tested exactly what does but I do know the unit tests hit both paths)
|
|
||
| impl EngineInterface for SimpleClient { | ||
| fn get_expression_handler(&self) -> Arc<dyn ExpressionHandler> { | ||
| unimplemented!(); |
There was a problem hiding this comment.
This just needs us to factor out the expression handler that the default client uses?
Or is there some bigger blocker?
There was a problem hiding this comment.
Nope, that's all it would take yeah. Just would prefer to do that as a follow-up
|
|
||
| pub(crate) fn treemap_to_bools(treemap: RoaringTreemap) -> Vec<bool> { | ||
| fn combine(high_bits: u32, low_bits: u32) -> usize { | ||
| ((u64::from(high_bits) << 32) | u64::from(low_bits)) as usize |
There was a problem hiding this comment.
Rescuing #109 (comment) from github oblivion...
this would silently lose information if sizeof(usize) < sizeof(u64); we should add a static assertion to make it fail at compile time instead.
(even if we move it from kernel to engine, the same issue would remain in our default clients)
There was a problem hiding this comment.
Yep, I've filed #124 since we'd need to add a dependency
| /// Any type that an engine wants to return as "data" needs to implement this trait. The bulk of the | ||
| /// work is in the [`EngineData::extract`] method. See the docs for that method for more details. | ||
| /// ```rust | ||
| /// # use std::any::Any; |
There was a problem hiding this comment.
what does the # mean?
There was a problem hiding this comment.
There was a problem hiding this comment.
TIL, thanks!
| if metadata.is_none() { | ||
| return Ok(Box::new(std::iter::empty())); | ||
| pub fn schema(&self) -> DeltaResult<StructType> { | ||
| Ok(serde_json::from_str(&self.schema_string)?) |
There was a problem hiding this comment.
Internal and a single value that won't hog MB/GB memory. Fair.
| return Ok(Box::new(SimpleData::new(RecordBatch::new_empty( | ||
| output_schema, | ||
| )))); |
There was a problem hiding this comment.
Agree that's not better.
(I'm still building intuition for when type inference works vs. doesn't, thanks for your patience)
Conflicts are resolved now, correct? |
Yep, removed from the description. Just waiting for one more review now, I've pinged people |
zachschuermann
left a comment
There was a problem hiding this comment.
awesome stuff nick LGTM I just left a few random nits/questions/comments
| use crate::{DeltaResult, Error, FileSystemClient}; | ||
|
|
||
| #[derive(Debug, Clone, PartialEq, Eq)] | ||
| pub struct DeletionVectorDescriptor { |
There was a problem hiding this comment.
do we want this to be pub? Could it just be pub(crate)?
There was a problem hiding this comment.
I think we do want actions to be pub, since we expect external systems might use them. at the very least, all of the action stuff is pub for now, so I'll leave it for a follow-up discussion :)
|
|
||
| use super::DeletionVectorDescriptor; | ||
|
|
||
| fn dv_relateive() -> DeletionVectorDescriptor { |
There was a problem hiding this comment.
nit
| fn dv_relateive() -> DeletionVectorDescriptor { | |
| fn dv_relative() -> DeletionVectorDescriptor { |
| Ok(Box::new(zipped.flatten().map(Action::Add))) | ||
| } | ||
| #[derive(Debug, Clone, PartialEq, Eq)] | ||
| pub struct Add { |
There was a problem hiding this comment.
at risk of being repetitive: do we want actions to be pub? I was hoping we could hide more of the implementation details instead of exposing a large API with internal details like standalone did..
There was a problem hiding this comment.
oh yea just noticed below - Remove is pub(crate)
There was a problem hiding this comment.
Yeah, it's a good question which I'll leave for a later discussion :)
There was a problem hiding this comment.
maybe we could hide it behind the visibility crate as well?
I think right now there is a files list method exposed externally, but my hope was anyhow to make that in terms of data...
| handler.parse_json(json_strings, output_schema).unwrap() | ||
| impl Add { | ||
| /// Since we always want to parse multiple adds from data, we return a `Vec<Add>` | ||
| pub fn parse_from_data(data: &dyn EngineData) -> DeltaResult<Vec<Add>> { |
There was a problem hiding this comment.
awesome how these turned out :)
| .collect::<Vec<_>>(); | ||
| println!("{:?}", actions) | ||
| impl Remove { | ||
| // _try_new_from_data for now, to avoid warning, probably will need at some point |
There was a problem hiding this comment.
Yeah, I think we'll want this at some point, so I'm just going to leave it so it's less work then. If it hangs out for a while and we end up not needing it, I will remove it.
| } | ||
|
|
||
| #[derive(Default)] | ||
| struct AddRemoveVisitor { |
There was a problem hiding this comment.
is this just for visiting a mix of add/removes for replay?
There was a problem hiding this comment.
yep, it just handles visiting one or the other so we only have to traverse the data once.
| } | ||
|
|
||
| #[test] | ||
| #[test_log::test] |
There was a problem hiding this comment.
should we have many other tests using test_log?
There was a problem hiding this comment.
we probably should, this was just a really useful one for debugging so I left it in. i think we should actually just always have it, which is maybe just a matter of always importing it or something, but again, future work :)
| pub mod arrow_conversion; | ||
|
|
||
| #[cfg(feature = "simple-client")] | ||
| pub mod simple_client; |
There was a problem hiding this comment.
wow awesome example/implementation of a client
Co-authored-by: Zach Schuermann <zachary.zvs@gmail.com>
roeap
left a comment
There was a problem hiding this comment.
Very sorry I did not get to it last friday - dropping my pending comments in case someone is interested :).
| Err(err) => Either::Right(std::iter::once(Err(err))), | ||
| } | ||
| } | ||
| Err(err) => Either::Right(std::iter::once(Err(err))), |
There was a problem hiding this comment.
the main reason for this is that rust wants us to return exactly one specific kind of iterator for the impl Iterator<Item = DeltaResult<Add>>, but Vec::Iter and the Once iterator are different things. Eiher helps us to conveniently normalise this. Without collecting the all actions internally, I see no clear path to avoid this. This does not mean there is none though :)
| /// The time when this metadata action is created, in milliseconds since the Unix epoch | ||
| pub created_time: Option<i64>, | ||
| /// Configuration options for the metadata action | ||
| pub configuration: HashMap<String, Option<String>>, |
There was a problem hiding this comment.
This has been the situation as I found it when first joining with delta-rs, and I remember this bothering me a bunch of times, but I don't recall exactly what the reason was we could not get rid of it.
I'll try to remember, but if we can move to just String, this will allow he consuming code to avoid a bunch of None checks...
| if metadata.is_none() { | ||
| return Ok(Box::new(std::iter::empty())); | ||
| pub fn schema(&self) -> DeltaResult<StructType> { | ||
| Ok(serde_json::from_str(&self.schema_string)?) |
There was a problem hiding this comment.
I do remember we discussed some related topic. In general the main issue here I think is that we cannot tell the engine what schema we are expecting which right now is required. IIRC the question was mainly trying to avoid serde_json as a dependency, personally I would vote for accepting as a dependency and doing it as implemented here.
Another place whas trying to parse the _last_cehckpoint file, which may also contain the schema of the checkpoint files - I think. At that time we would also have no way of telling eninge the schema of the data we want. In this case though I belive that using that schema was not eneven doen internally at databricks?
| Ok(Box::new(zipped.flatten().map(Action::Add))) | ||
| } | ||
| #[derive(Debug, Clone, PartialEq, Eq)] | ||
| pub struct Add { |
There was a problem hiding this comment.
maybe we could hide it behind the visibility crate as well?
I think right now there is a files list method exposed externally, but my hope was anyhow to make that in terms of data...
| let output_schema = Arc::new(log_schema().clone()); | ||
| handler.parse_json(json_strings, output_schema).unwrap() | ||
| impl Add { | ||
| /// Since we always want to parse multiple adds from data, we return a `Vec<Add>` |
There was a problem hiding this comment.
nit this feels more like a comment (//) then a docstring (///)?

WIP of the "big bang" switch to the new data passing API. Note this is still based on when I named things "EngineClient" and not "EngineInterface" so that term is used throughout. I will rebase to
mainand rename everything once we're agreed on the major pieces.This implements:
libto use itSimpleClientwhich is a simpleEngineClientthat uses arrow but isn't asyncscandata_skipping.rs#126Potential things still to do:
SimpleDatatoArrowDataand not in thesimple_clientmod since it can be used by both Default and Simple client