Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ public class ParquetProperties {

public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;

/**
* Default for the Approach 2 (micro-row-group) writer knob: 0 means the legacy writer
* path is used and each {@code ParquetWriter} record-batch flush produces exactly one
* {@code BlockMetaData}. A positive value opts in to producing K logical
* micro-row-groups per physical column chunk on every flush, sized by this value.
*/
public static final long DEFAULT_MICRO_ROW_GROUP_ROW_COUNT = 0L;

/**
* @deprecated This shared instance can cause thread safety issues when used by multiple builders concurrently.
* Use {@code new DefaultValuesWriterFactory()} instead to create individual instances.
Expand Down Expand Up @@ -135,6 +143,7 @@ public static WriterVersion fromString(String name) {
private final Map<String, String> extraMetaData;
private final ColumnProperty<Boolean> statistics;
private final ColumnProperty<Boolean> sizeStatistics;
private final long microRowGroupRowCount;

private ParquetProperties(Builder builder) {
this.pageSizeThreshold = builder.pageSize;
Expand Down Expand Up @@ -167,6 +176,7 @@ private ParquetProperties(Builder builder) {
this.extraMetaData = builder.extraMetaData;
this.statistics = builder.statistics.build();
this.sizeStatistics = builder.sizeStatistics.build();
this.microRowGroupRowCount = builder.microRowGroupRowCount;
}

public static Builder builder() {
Expand Down Expand Up @@ -322,6 +332,18 @@ public boolean getPageWriteChecksumEnabled() {
return pageWriteChecksumEnabled;
}

/**
* @return the Approach 2 (micro-row-group) target row count per logical block, or
* {@code 0} if disabled. When positive, every record-batch flush in
* {@code InternalParquetRecordWriter} produces {@code ceil(flushRowCount /
* microRowGroupRowCount)} logical {@code BlockMetaData} entries that share one
* physical column chunk; readers consume them via
* {@code ParquetFileReader.readNextRowGroup()}'s Approach 2 dispatch.
*/
public long getMicroRowGroupRowCount() {
return microRowGroupRowCount;
}

public OptionalLong getBloomFilterNDV(ColumnDescriptor column) {
Long ndv = bloomFilterNDVs.getValue(column);
return ndv == null ? OptionalLong.empty() : OptionalLong.of(ndv);
Expand Down Expand Up @@ -419,6 +441,7 @@ public static class Builder {
private Map<String, String> extraMetaData = new HashMap<>();
private final ColumnProperty.Builder<Boolean> statistics;
private final ColumnProperty.Builder<Boolean> sizeStatistics;
private long microRowGroupRowCount = DEFAULT_MICRO_ROW_GROUP_ROW_COUNT;

private Builder() {
enableDict = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_IS_DICTIONARY_ENABLED);
Expand Down Expand Up @@ -460,6 +483,7 @@ private Builder(ParquetProperties toCopy) {
this.extraMetaData = toCopy.extraMetaData;
this.statistics = ColumnProperty.builder(toCopy.statistics);
this.sizeStatistics = ColumnProperty.builder(toCopy.sizeStatistics);
this.microRowGroupRowCount = toCopy.microRowGroupRowCount;
}

/**
Expand Down Expand Up @@ -756,6 +780,30 @@ public Builder withSizeStatisticsEnabled(String columnPath, boolean enabled) {
return this;
}

/**
* Opt in to the Approach 2 (micro-row-group) writer path: each record-batch flush
* produces one physical column chunk whose pages are split into multiple logical
* {@code BlockMetaData} entries of approximately this row count. Set to {@code 0} to
* use the legacy single-block-per-flush behavior (the default).
*
* <p>Prototype limitations apply (see {@link
* org.apache.parquet.hadoop.metadata.ColumnChunkMetaData#SENTINEL_OFFSET}):
* encryption is unsupported and per-block statistics / ColumnIndex / bloom filters
* are not emitted.
*
* @param microRowGroupRowCount target row count per logical micro-row-group, or
* {@code 0} to disable. Must be non-negative.
* @return this builder for method chaining
*/
public Builder withMicroRowGroupRowCount(long microRowGroupRowCount) {
Preconditions.checkArgument(
microRowGroupRowCount >= 0,
"Invalid micro-row-group row count (negative): %s",
microRowGroupRowCount);
this.microRowGroupRowCount = microRowGroupRowCount;
return this;
}

public ParquetProperties build() {
ParquetProperties properties = new ParquetProperties(this);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ public OffsetIndex build(long shift) {
private final IntList compressedPageSizes = new IntArrayList();
private final LongList firstRowIndexes = new LongArrayList();
private final LongList unencodedDataBytes = new LongArrayList();
// Per-page row counts captured by the two-arg add(compressedPageSize, rowCount) call.
// Kept alongside firstRowIndexes so the micro-row-group writer path can slice pages
// by absolute row range without recomputing row deltas from the builder's running state.
private final LongList rowCounts = new LongArrayList();
private long previousOffset;
private int previousPageSize;
private long previousRowIndex;
Expand Down Expand Up @@ -161,6 +165,7 @@ public void add(int compressedPageSize, long rowCount, Optional<Long> unencodedD
previousRowIndex + previousRowCount,
unencodedDataBytes);
previousRowCount = rowCount;
rowCounts.add(rowCount);
}

/**
Expand Down Expand Up @@ -257,4 +262,29 @@ public OffsetIndex build(long shift) {

return offsetIndex;
}

/**
* @return the number of pages added via the two-arg {@link #add(int, long)} variant
* (used by the micro-row-group writer path). Returns 0 if pages were added via
* the explicit-offset variant only.
*/
public int getPageCount() {
return rowCounts.size();
}

/**
* @param pageIndex page ordinal in [0, {@link #getPageCount()})
* @return the compressed page size recorded for page {@code pageIndex}
*/
public int getCompressedPageSize(int pageIndex) {
return compressedPageSizes.getInt(pageIndex);
}

/**
* @param pageIndex page ordinal in [0, {@link #getPageCount()})
* @return the row count recorded for page {@code pageIndex} via {@link #add(int, long)}
*/
public long getRowCount(int pageIndex) {
return rowCounts.getLong(pageIndex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,22 @@ public static RowRanges createSingle(long rowCount) {
return new RowRanges(new Range(0L, rowCount - 1L));
}

/**
* Creates an immutable {@link RowRanges} with the single closed range {@code [from, to]}.
* Used by the Approach 2 (micro-row-group) reader path to express a logical micro-row-group's
* absolute row range against a shared physical column chunk.
*
* @param from inclusive first row index (must be non-negative)
* @param to inclusive last row index (must be {@code >= from})
* @return an immutable {@link RowRanges} representing {@code [from, to]}
*/
public static RowRanges createBetween(long from, long to) {
if (from < 0 || to < from) {
throw new IllegalArgumentException("Invalid row range [" + from + ", " + to + ']');
}
return new RowRanges(new Range(from, to));
}

/**
* Creates a mutable RowRanges object with the following ranges:
* <pre>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,38 @@ public void testIntersection() {
assertAllRowsEqual(intersection(empty, ranges2).iterator());
assertAllRowsEqual(intersection(empty, empty).iterator());
}

@Test
public void testCreateBetween() {
// Single-element range
RowRanges single = RowRanges.createBetween(42L, 42L);
assertEquals(1L, single.rowCount());
assertAllRowsEqual(single.iterator(), 42L);

// Multi-element range starting at zero (matches createSingle semantics)
RowRanges fromZero = RowRanges.createBetween(0L, 4L);
assertEquals(5L, fromZero.rowCount());
assertAllRowsEqual(fromZero.iterator(), 0L, 1L, 2L, 3L, 4L);
assertEquals(
RowRanges.createSingle(5L).getRanges().toString(),
fromZero.getRanges().toString());

// Multi-element range with non-zero (file-absolute) start, the Approach 2 use case
RowRanges absolute = RowRanges.createBetween(100_000L, 100_004L);
assertEquals(5L, absolute.rowCount());
assertAllRowsEqual(absolute.iterator(), 100_000L, 100_001L, 100_002L, 100_003L, 100_004L);
assertTrue(absolute.isOverlapping(100_002L, 100_003L));
assertFalse(absolute.isOverlapping(99_000L, 99_999L));
assertFalse(absolute.isOverlapping(100_005L, 100_010L));
}

@Test(expected = IllegalArgumentException.class)
public void testCreateBetweenRejectsNegativeFrom() {
RowRanges.createBetween(-1L, 0L);
}

@Test(expected = IllegalArgumentException.class)
public void testCreateBetweenRejectsInvertedRange() {
RowRanges.createBetween(10L, 5L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,35 @@ public long getMemSize() {
return buf.size();
}

/**
* Snapshot this page writer's accumulated state as a {@link MicroRowGroupColumnData}
* for the Approach 2 (micro-row-group) writer path. Returns the SAME {@code buf} and
* {@code offsetIndexBuilder} references this writer owns — the caller (typically
* {@link ParquetFileWriter#writeMicroRowGroups}) will write the buffer to disk once
* and reuse the builder; this writer must not be touched again after the drain.
*
* @throws IllegalStateException if the writer is configured for encrypted output —
* Approach 2 does not support encryption in this prototype.
*/
MicroRowGroupColumnData drainForMicroRowGroups() {
if (headerBlockEncryptor != null) {
throw new IllegalStateException(
"drainForMicroRowGroups (Approach 2) does not support encryption");
}
return new MicroRowGroupColumnData(
path,
compressor.getCodecName(),
dictionaryPage,
buf,
uncompressedLength,
compressedLength,
totalValueCount,
offsetIndexBuilder,
rlEncodings,
dlEncodings,
dataEncodings);
}

public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
if (null == headerBlockEncryptor) {
writer.writeColumnChunk(
Expand Down Expand Up @@ -700,4 +729,20 @@ public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
pageWriter.writeToFileWriter(writer);
}
}

/**
* Snapshot every column's accumulated pages as {@link MicroRowGroupColumnData} entries
* (in {@link #schema} column order) for the Approach 2 (micro-row-group) writer path.
* After this call, the underlying page writers must not be used to write any more pages;
* the typical caller closes this store immediately afterwards.
*
* @throws IllegalStateException if any column writer was configured for encrypted output
*/
List<MicroRowGroupColumnData> drainForMicroRowGroups() {
List<MicroRowGroupColumnData> result = new ArrayList<>(schema.getColumns().size());
for (ColumnDescriptor path : schema.getColumns()) {
result.add(writers.get(path).drainForMicroRowGroups());
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,25 @@ private void flushRowGroupToStore() throws IOException {

if (recordCount > 0) {
rowGroupOrdinal++;
parquetFileWriter.startBlock(recordCount);
columnStore.flush();
pageStore.flushToFileWriter(parquetFileWriter);
recordCount = 0;
parquetFileWriter.endBlock();
this.nextRowGroupSize = Math.min(parquetFileWriter.getNextRowGroupSize(), rowGroupSizeThreshold);
long microRowGroupRowCount = props.getMicroRowGroupRowCount();
if (microRowGroupRowCount > 0 && fileEncryptor == null && recordCount > microRowGroupRowCount) {
// Approach 2 path: flush all pages, then write one physical column chunk per
// column whose pages are sliced into K logical micro-row-groups, each marked
// with data_page_offset == SENTINEL_OFFSET. Encryption short-circuits to the
// legacy path because writeMicroRowGroups does not support encrypted columns.
columnStore.flush();
long[] microRowGroupRowCounts = splitIntoMicroRowGroupCounts(recordCount, microRowGroupRowCount);
parquetFileWriter.writeMicroRowGroups(pageStore.drainForMicroRowGroups(), microRowGroupRowCounts);
recordCount = 0;
this.nextRowGroupSize = Math.min(parquetFileWriter.getNextRowGroupSize(), rowGroupSizeThreshold);
} else {
parquetFileWriter.startBlock(recordCount);
columnStore.flush();
pageStore.flushToFileWriter(parquetFileWriter);
recordCount = 0;
parquetFileWriter.endBlock();
this.nextRowGroupSize = Math.min(parquetFileWriter.getNextRowGroupSize(), rowGroupSizeThreshold);
}
}
} finally {
AutoCloseables.uncheckedClose(columnStore, pageStore, bloomFilterWriteStore);
Expand All @@ -240,6 +253,22 @@ private void flushRowGroupToStore() throws IOException {
}
}

/**
* Split a total record count into K logical micro-row-group row counts of
* approximately {@code target} rows each, with the final entry absorbing the remainder.
*/
private static long[] splitIntoMicroRowGroupCounts(long total, long target) {
int k = Math.toIntExact((total + target - 1) / target);
long[] counts = new long[k];
long remaining = total;
for (int i = 0; i < k - 1; i++) {
counts[i] = target;
remaining -= target;
}
counts[k - 1] = remaining;
return counts;
}

long getRowGroupSizeThreshold() {
return rowGroupSizeThreshold;
}
Expand Down
Loading