feat(arrow-avro): HeaderInfo to expose OCF header#9548
Conversation
Add HeaderInfo to expose OCF header information such as the writer schema and sync marker. Add read_header_info function to the reader module, and its async counterpart to the reader::async_reader module, to read the header from the file reader and return HeaderInfo. Add build_with_header method to async reader builder to enable reuse of the header with multiple readers.
HeaderInfo to expose OCF header
|
I have not added a method to the sync reader builder yet, because I have some doubts about the API. The implementation uses the sequential When some seeking capabilities are added to the sync reader to work with ranges, this may need to be revisited. |
|
@jecsand838 can you help review this PR? |
jecsand838
left a comment
There was a problem hiding this comment.
@mzabaluev LMGTM!
Just left a few comments regarding the doc comments and some duplicated code.
| let (header, header_len) = | ||
| read_header(&mut self.reader, self.file_size, self.header_size_hint).await?; | ||
| self.build_internal(&header, header_len) | ||
| } | ||
|
|
||
| /// Build the asynchronous Avro reader with the provided header. | ||
| /// | ||
| /// This allows initializing the reader with pre-parsed header information. | ||
| /// Note that this method is not async because it does not need to perform any I/O operations. | ||
| pub fn build_with_header( | ||
| self, | ||
| header_info: HeaderInfo, | ||
| ) -> Result<AsyncAvroFileReader<R>, AvroError> { | ||
| self.build_internal(header_info.header(), header_info.header_len()) | ||
| } | ||
|
|
||
| fn build_internal( | ||
| self, | ||
| header: &Header, | ||
| header_len: u64, | ||
| ) -> Result<AsyncAvroFileReader<R>, AvroError> { | ||
| let writer_schema = { | ||
| let raw = header.get(SCHEMA_METADATA_KEY).ok_or_else(|| { | ||
| AvroError::ParseError("No Avro schema present in file header".to_string()) |
There was a problem hiding this comment.
You maybe able to get rid of the duplicated writer_schema code by doing something like this:
| let (header, header_len) = | |
| read_header(&mut self.reader, self.file_size, self.header_size_hint).await?; | |
| self.build_internal(&header, header_len) | |
| } | |
| /// Build the asynchronous Avro reader with the provided header. | |
| /// | |
| /// This allows initializing the reader with pre-parsed header information. | |
| /// Note that this method is not async because it does not need to perform any I/O operations. | |
| pub fn build_with_header( | |
| self, | |
| header_info: HeaderInfo, | |
| ) -> Result<AsyncAvroFileReader<R>, AvroError> { | |
| self.build_internal(header_info.header(), header_info.header_len()) | |
| } | |
| fn build_internal( | |
| self, | |
| header: &Header, | |
| header_len: u64, | |
| ) -> Result<AsyncAvroFileReader<R>, AvroError> { | |
| let writer_schema = { | |
| let raw = header.get(SCHEMA_METADATA_KEY).ok_or_else(|| { | |
| AvroError::ParseError("No Avro schema present in file header".to_string()) | |
| let header_info = | |
| read_header_info(&mut self.reader, self.file_size, self.header_size_hint).await?; | |
| self.build_internal(&header_info) | |
| } | |
| /// Build the asynchronous Avro reader with the provided header. | |
| /// | |
| /// This allows initializing the reader with pre-parsed header information. | |
| /// Note that this method is not async because it does not need to perform any I/O operations. | |
| pub fn build_with_header( | |
| self, | |
| header_info: HeaderInfo, | |
| ) -> Result<AsyncAvroFileReader<R>, AvroError> { | |
| self.build_internal(&header_info) | |
| } | |
| fn build_internal( | |
| self, | |
| header: &HeaderInfo, | |
| ) -> Result<AsyncAvroFileReader<R>, AvroError> { | |
| let writer_schema = header.writer_schema()?; |
I know it adds the Arc allocation in HeaderInfo::new when the caller doesn't need to share the header, but that seems like a negligible cost for a once-per-file operation. Otherwise you could abstract the writer_schema() logic onto Header for re-use.
There was a problem hiding this comment.
OK, this will simplify things for sure.
| ) -> Result<AsyncAvroFileReader<R>, AvroError> { | ||
| self.build_internal(header_info.header(), header_info.header_len()) |
There was a problem hiding this comment.
Also, do you think we should add the empty file check here as well?
| ) -> Result<AsyncAvroFileReader<R>, AvroError> { | |
| self.build_internal(header_info.header(), header_info.header_len()) | |
| ) -> Result<AsyncAvroFileReader<R>, AvroError> { | |
| if self.file_size == 0 { | |
| return Err(AvroError::InvalidArgument("File size cannot be 0".into())); | |
| } | |
| self.build_internal(header_info.header(), header_info.header_len()) |
There was a problem hiding this comment.
I thought it guards a degenerate case in try_build where we'd attempt to read the header even though the file size is 0.
If we receive the header, we trust the length anyway and if the actual length is 0, the reading will fail elsewhere.
| @@ -1273,7 +1275,7 @@ impl ReaderBuilder { | |||
| /// the discovered writer (and optional reader) schema, and prepares to iterate blocks, | |||
| /// decompressing if necessary. | |||
| pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, ArrowError> { | |||
There was a problem hiding this comment.
I do think there's value in adding the HeaderInfo to the sync Reader, but that can be added in the future.
Co-authored-by: Connor Sanders <170039284+jecsand838@users.noreply.github.com>
Specifically, use the read_header_info function and reuse the writer_schema method in the async reader's builder, sacrificing some negligible performance loss on allocating and constructing a temporary Arc in the HeaderInfo.
alamb
left a comment
There was a problem hiding this comment.
Thank you @mzabaluev and @jecsand838
Which issue does this PR close?
Rationale for this change
Rework of #9462 along the lines proposed in #9462 (comment).
What changes are included in this PR?
Add
HeaderInfoas a cheaply cloneable value to expose header information parsed from an Avro OCF file.Add
read_header_infofunction to thereadermodule, and its async counterpart to thereader::async_readermodule, to read the header from the file reader and returnHeaderInfo.Add
build_with_headermethod to async reader builder to enable reuse of the header with multiple readers.Are these changes tested?
Added a test for the async reader.
Are there any user-facing changes?
New API in arrow-avro:
reader::HeaderInforeader::read_header_infoandreader::async_reader::read_header_infobuild_with_headermethod ofAvroAsyncFileReader's builder.