From 6063e0630b423d6c37570cc230eb6df1eac36a42 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Mon, 18 May 2026 13:27:50 +0100 Subject: [PATCH 1/2] Let spark report statistics Signed-off-by: Robert Kruszewski --- .../main/java/dev/vortex/api/DataSource.java | 51 ++++ .../java/dev/vortex/jni/NativeDataSource.java | 6 + .../dev/vortex/spark/read/VortexScan.java | 109 +++++++- .../vortex/spark/read/VortexScanBuilder.java | 19 +- .../spark/VortexDataSourceStatsTest.java | 232 ++++++++++++++++++ vortex-file/src/multi/mod.rs | 37 ++- vortex-jni/src/data_source.rs | 67 ++++- vortex-layout/src/scan/multi.rs | 111 ++++++++- 8 files changed, 603 insertions(+), 29 deletions(-) create mode 100644 java/vortex-spark/src/test/java/dev/vortex/spark/VortexDataSourceStatsTest.java diff --git a/java/vortex-jni/src/main/java/dev/vortex/api/DataSource.java b/java/vortex-jni/src/main/java/dev/vortex/api/DataSource.java index d13f7e6acc1..a586acaf1a8 100644 --- a/java/vortex-jni/src/main/java/dev/vortex/api/DataSource.java +++ b/java/vortex-jni/src/main/java/dev/vortex/api/DataSource.java @@ -128,6 +128,57 @@ public OptionalLong asOptional() { } } + /** + * Sum of the on-storage byte sizes of all files included in this data source along with the precision of that + * estimate. Mirrors the Rust {@code Option>} returned by {@code DataSource::byte_size}: + * {@link ByteSize.Unknown} when no estimate is available (for example when the filesystem listing did not return + * sizes), {@link ByteSize.Estimate} for an inexact hint (some files contribute extrapolated sizes), and + * {@link ByteSize.Exact} when every file has a known size. + */ + public ByteSize byteSize() { + long[] out = new long[2]; + NativeDataSource.byteSize(pointer, out); + return switch ((int) out[1]) { + case 1 -> new ByteSize.Estimate(out[0]); + case 2 -> new ByteSize.Exact(out[0]); + default -> ByteSize.Unknown.INSTANCE; + }; + } + + /** Precision-aware byte size. See {@link #byteSize()}. */ + public sealed interface ByteSize { + /** Returns the byte size as a long, or {@code OptionalLong.empty()} when unknown. */ + OptionalLong asOptional(); + + /** Byte size is not known. */ + final class Unknown implements ByteSize { + public static final Unknown INSTANCE = new Unknown(); + + private Unknown() {} + + @Override + public OptionalLong asOptional() { + return OptionalLong.empty(); + } + } + + /** Estimated byte size; the actual value may differ. */ + record Estimate(long value) implements ByteSize { + @Override + public OptionalLong asOptional() { + return OptionalLong.of(value); + } + } + + /** Exact byte size. */ + record Exact(long value) implements ByteSize { + @Override + public OptionalLong asOptional() { + return OptionalLong.of(value); + } + } + } + /** Submit a scan. */ public Scan scan(ScanOptions options) { Objects.requireNonNull(options, "options"); diff --git a/java/vortex-jni/src/main/java/dev/vortex/jni/NativeDataSource.java b/java/vortex-jni/src/main/java/dev/vortex/jni/NativeDataSource.java index b7e58d2dc21..cc2aa163cee 100644 --- a/java/vortex-jni/src/main/java/dev/vortex/jni/NativeDataSource.java +++ b/java/vortex-jni/src/main/java/dev/vortex/jni/NativeDataSource.java @@ -33,4 +33,10 @@ private NativeDataSource() {} * {@code 1=estimate}, {@code 2=exact}. */ public static native void rowCount(long pointer, long[] out); + + /** + * Populate {@code out} with {@code [bytes, precision]}, the sum of on-storage file sizes for the data source. + * Precision is one of {@code 0=unknown}, {@code 1=estimate}, {@code 2=exact}. + */ + public static native void byteSize(long pointer, long[] out); } diff --git a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScan.java b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScan.java index d5949b57a4d..493e981d8f1 100644 --- a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScan.java +++ b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScan.java @@ -4,37 +4,65 @@ package dev.vortex.spark.read; import java.util.Arrays; +import dev.vortex.api.DataSource; +import dev.vortex.api.Session; +import dev.vortex.jni.NativeFiles; +import dev.vortex.spark.VortexSparkSession; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.OptionalLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.spark.sql.connector.catalog.CatalogV2Util; import org.apache.spark.sql.connector.catalog.Column; import org.apache.spark.sql.connector.expressions.filter.Predicate; +import org.apache.spark.sql.connector.expressions.NamedReference; import org.apache.spark.sql.connector.read.Batch; import org.apache.spark.sql.connector.read.Scan; +import org.apache.spark.sql.connector.read.Statistics; +import org.apache.spark.sql.connector.read.SupportsReportStatistics; +import org.apache.spark.sql.connector.read.colstats.ColumnStatistics; +import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.StructType; -/** Spark V2 {@link Scan} over a table of Vortex files. */ -public final class VortexScan implements Scan { +/** + * Spark V2 {@link Scan} over a table of Vortex files. + * + *

Implements {@link SupportsReportStatistics} to surface both the row count Vortex records in each file footer and a + * Spark scan-size estimate. The byte estimate starts from the on-storage file sizes collected by + * {@code MultiFileDataSource}, then follows Spark's file scan convention by applying the SQL file-compression factor + * and scaling by the pushed read schema's default size relative to the full table schema's default size. When the + * listing did not return a size for one or more files the file-byte total is extrapolated before Spark scaling is + * applied. + */ +public final class VortexScan implements Scan, SupportsReportStatistics { private final List paths; + private final List tableColumns; private final List readColumns; private final Map formatOptions; private final Predicate[] pushedPredicates; + private volatile Statistics cachedStatistics; + /** * Creates a new VortexScan for the specified file paths and columns. The caller is responsible for passing * immutable collections; the constructor does not copy. * * @param paths the list of Vortex file paths to scan + * @param tableColumns the full table columns before projection pushdown * @param readColumns the list of columns to read from the files * @param pushedPredicates predicates pushed down by Spark; {@code null} or empty means no pushdown */ public VortexScan( List paths, + List tableColumns, List readColumns, - Map formatOptions, - Predicate[] pushedPredicates) { + Predicate[] pushedPredicates, + Map formatOptions) { this.paths = paths; + this.tableColumns = tableColumns; this.readColumns = readColumns; this.formatOptions = formatOptions; this.pushedPredicates = pushedPredicates == null ? new Predicate[0] : pushedPredicates.clone(); @@ -83,4 +111,77 @@ public Batch toBatch() { public ColumnarSupportMode columnarSupportMode() { return ColumnarSupportMode.SUPPORTED; } + + /** + * Returns statistics for this scan. + * + *

Opens the Vortex {@link DataSource} on first invocation and caches the result. The row count is taken from the + * data source (sum of file-footer row counts; extrapolated from the first opened file when other files are + * deferred). {@link Statistics#sizeInBytes()} is derived from the per-file sizes reported by the filesystem + * listing, then adjusted by Spark's compression factor and the ratio between the pushed read schema and the full + * table schema. When a listing did not return a size for some file the file-byte total is extrapolated. When no + * file size is known at all the value is left empty so Spark falls back to its default heuristic. + * + * @return statistics with row-count and Spark scan-size estimates + */ + @Override + public Statistics estimateStatistics() { + Statistics local = cachedStatistics; + if (local != null) { + return local; + } + synchronized (this) { + if (cachedStatistics == null) { + cachedStatistics = computeStatistics(); + } + return cachedStatistics; + } + } + + private Statistics computeStatistics() { + Session session = VortexSparkSession.get(formatOptions); + // Expand directory paths to concrete files the way VortexBatchExec does, so we use the + // same per-path resolution end-to-end. + List resolvedPaths = paths.stream() + .flatMap(path -> path.endsWith(".vortex") + ? Stream.of(path) + : NativeFiles.listFiles(session, path, formatOptions).stream()) + .collect(Collectors.toList()); + + if (resolvedPaths.isEmpty()) { + return new VortexStatistics(OptionalLong.empty(), OptionalLong.empty()); + } + + DataSource source = DataSource.open(session, resolvedPaths, formatOptions); + return new VortexStatistics( + source.rowCount().asOptional(), + scaleSizeInBytes(source.byteSize().asOptional())); + } + + private OptionalLong scaleSizeInBytes(OptionalLong fileBytes) { + if (fileBytes.isEmpty()) { + return OptionalLong.empty(); + } + + StructType tableSchema = CatalogV2Util.v2ColumnsToStructType(tableColumns.toArray(new Column[0])); + StructType readSchema = readSchema(); + int tableDefaultSize = tableSchema.defaultSize(); + if (tableDefaultSize <= 0) { + return fileBytes; + } + + double scaled = SQLConf.get().fileCompressionFactor() + * fileBytes.getAsLong() + / tableDefaultSize + * readSchema.defaultSize(); + return OptionalLong.of((long) scaled); + } + + private record VortexStatistics(OptionalLong numRows, OptionalLong sizeInBytes) implements Statistics { + + @Override + public Map columnStats() { + return new HashMap<>(); + } + } } diff --git a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScanBuilder.java b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScanBuilder.java index 94990432b45..384c2778b1e 100644 --- a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScanBuilder.java +++ b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScanBuilder.java @@ -31,7 +31,8 @@ public final class VortexScanBuilder implements ScanBuilder, SupportsPushDownRequiredColumns, SupportsPushDownV2Filters { private final ImmutableList.Builder paths; - private final List columns; + private final List tableColumns; + private final List readColumns; private final Map formatOptions; private final Set partitionColumnNames; private Predicate[] pushedPredicates = new Predicate[0]; @@ -48,10 +49,11 @@ public VortexScanBuilder(Map formatOptions) { */ public VortexScanBuilder(Map formatOptions, Transform[] partitionTransforms) { this.paths = ImmutableList.builder(); - this.columns = new ArrayList<>(); Map options = Maps.newHashMap(); options.put("vortex.workerThreads", "4"); options.putAll(formatOptions); + this.tableColumns = new ArrayList<>(); + this.readColumns = new ArrayList<>(); this.formatOptions = options; this.partitionColumnNames = collectPartitionColumnNames(partitionTransforms); } @@ -74,7 +76,8 @@ public VortexScanBuilder addPath(String path) { * @return this builder for method chaining */ public VortexScanBuilder addColumn(Column column) { - this.columns.add(column); + this.tableColumns.add(column); + this.readColumns.add(column); return this; } @@ -97,7 +100,7 @@ public VortexScanBuilder addAllPaths(Iterable paths) { */ public VortexScanBuilder addAllColumns(Iterable columns) { for (Column column : columns) { - this.columns.add(column); + addColumn(column); } return this; } @@ -116,7 +119,7 @@ public Scan build() { // Allow empty columns for operations like count() that don't need actual column data // If no columns are specified, we'll read the minimal schema needed - return new VortexScan(paths, List.copyOf(this.columns), this.formatOptions, pushedPredicates); + return new VortexScan(paths, List.copyOf(this.tableColumns), List.copyOf(this.readColumns), pushedPredicates, this.formatOptions); } /** @@ -129,8 +132,8 @@ public Scan build() { */ @Override public void pruneColumns(StructType requiredSchema) { - columns.clear(); - columns.addAll(Arrays.asList(CatalogV2Util.structTypeToV2Columns(requiredSchema))); + readColumns.clear(); + readColumns.addAll(Arrays.asList(CatalogV2Util.structTypeToV2Columns(requiredSchema))); } /** @@ -145,7 +148,7 @@ public void pruneColumns(StructType requiredSchema) { @Override public Predicate[] pushPredicates(Predicate[] predicates) { Map dataColumnTypes = new HashMap<>(); - for (Column column : columns) { + for (Column column : readColumns) { if (!partitionColumnNames.contains(column.name())) { dataColumnTypes.put(column.name(), column.dataType()); } diff --git a/java/vortex-spark/src/test/java/dev/vortex/spark/VortexDataSourceStatsTest.java b/java/vortex-spark/src/test/java/dev/vortex/spark/VortexDataSourceStatsTest.java new file mode 100644 index 00000000000..c28ec8b3569 --- /dev/null +++ b/java/vortex-spark/src/test/java/dev/vortex/spark/VortexDataSourceStatsTest.java @@ -0,0 +1,232 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +package dev.vortex.spark; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import dev.vortex.spark.read.VortexScan; +import dev.vortex.spark.read.VortexScanBuilder; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; +import java.util.Map; +import java.util.stream.Stream; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.catalog.Column; +import org.apache.spark.sql.connector.read.Statistics; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.io.TempDir; + +/** + * Integration tests for {@link VortexScan#estimateStatistics()}. + * + *

Verifies that the Spark V2 scan surfaces both the row count Vortex stores in each file footer and the sum of the + * on-storage file sizes reported by the filesystem listing. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public final class VortexDataSourceStatsTest { + private static final String FILE_COMPRESSION_FACTOR_KEY = "spark.sql.sources.fileCompressionFactor"; + + private SparkSession spark; + + @TempDir + Path tempDir; + + @BeforeAll + public void setUp() { + spark = SparkSession.builder() + .appName("VortexStatsTest") + .master("local[2]") + .config("spark.driver.host", "127.0.0.1") + .config("spark.sql.shuffle.partitions", "2") + .config("spark.sql.adaptive.enabled", "false") + .config("spark.ui.enabled", "false") + .getOrCreate(); + } + + @AfterAll + public void tearDown() { + if (spark != null) { + spark.stop(); + } + } + + @Test + @DisplayName("VortexScan reports exact row count for single-file scans") + public void testEstimateStatisticsReportsRowCount() throws IOException { + int numRows = 250; + Path outputPath = writeRows(numRows, "single_file"); + + VortexScan scan = buildScan(outputPath); + Statistics stats = scan.estimateStatistics(); + + assertTrue( + stats.numRows().isPresent(), + "VortexScan should report a row count for a Vortex dataset with a populated footer"); + assertEquals(numRows, stats.numRows().getAsLong(), "Row count should match the rows we wrote"); + } + + @Test + @DisplayName("VortexScan reports aggregate row count across multi-file scans") + public void testEstimateStatisticsAcrossMultipleFiles() throws IOException { + int numRows = 400; + Path outputPath = writeRows(numRows, "multi_file", 4); + + VortexScan scan = buildScan(outputPath); + Statistics stats = scan.estimateStatistics(); + + assertTrue(stats.numRows().isPresent(), "Row count should be reported for multi-file Vortex datasets"); + assertEquals(numRows, stats.numRows().getAsLong(), "Row count should sum across all files"); + } + + @Test + @DisplayName("VortexScan reports sizeInBytes equal to the sum of on-storage file sizes") + public void testEstimateStatisticsReportsSizeInBytes() throws IOException { + Path outputPath = writeRows(120, "with_size", 3); + + long expectedTotalBytes = totalVortexFileBytes(outputPath); + assertTrue(expectedTotalBytes > 0, "Test setup should produce at least one non-empty .vortex file"); + + VortexScan scan = buildScan(outputPath); + Statistics stats = scan.estimateStatistics(); + + assertTrue( + stats.sizeInBytes().isPresent(), + "VortexScan should surface a sizeInBytes when the filesystem listing reports file sizes"); + assertEquals( + expectedTotalBytes, + stats.sizeInBytes().getAsLong(), + "sizeInBytes should equal the sum of on-storage .vortex file sizes"); + } + + @Test + @DisplayName("VortexScan scales sizeInBytes by the pushed read schema") + public void testEstimateStatisticsScalesSizeInBytesForProjection() throws IOException { + Path outputPath = writeRows(120, "projected_size", 3); + long fileBytes = totalVortexFileBytes(outputPath); + + StructType fullSchema = spark.read() + .format("vortex") + .option("path", outputPath.toUri().toString()) + .load() + .schema(); + StructType idOnlySchema = new StructType(new StructField[] {fullSchema.fields()[0]}); + + String previousCompressionFactor = spark.conf().get(FILE_COMPRESSION_FACTOR_KEY); + spark.conf().set(FILE_COMPRESSION_FACTOR_KEY, "0.5"); + try { + VortexScan scan = buildScan(outputPath, idOnlySchema); + Statistics stats = scan.estimateStatistics(); + + long expectedSize = (long) (0.5 * fileBytes / fullSchema.defaultSize() * idOnlySchema.defaultSize()); + assertTrue(stats.sizeInBytes().isPresent(), "Projected scans should still surface sizeInBytes"); + assertEquals( + expectedSize, + stats.sizeInBytes().getAsLong(), + "sizeInBytes should follow Spark FileScan's compression and schema-width scaling"); + assertTrue( + stats.sizeInBytes().getAsLong() < fileBytes, + "Projected scan stats should be smaller than full file bytes"); + } finally { + spark.conf().set(FILE_COMPRESSION_FACTOR_KEY, previousCompressionFactor); + } + } + + @Test + @DisplayName("VortexScan caches statistics across repeated calls") + public void testEstimateStatisticsIsCached() throws IOException { + Path outputPath = writeRows(50, "cached", 1); + + VortexScan scan = buildScan(outputPath); + Statistics first = scan.estimateStatistics(); + Statistics second = scan.estimateStatistics(); + + // Same instance returned -- the second call hits the cached value. + assertEquals(first, second, "estimateStatistics() should return the same Statistics object on repeat calls"); + assertInstanceOf(Statistics.class, first); + } + + private VortexScan buildScan(Path outputPath) { + return buildScan(outputPath, null); + } + + private VortexScan buildScan(Path outputPath, StructType requiredSchema) { + Dataset readDf = spark.read() + .format("vortex") + .option("path", outputPath.toUri().toString()) + .load(); + StructType readSchema = readDf.schema(); + + VortexScanBuilder builder = new VortexScanBuilder(Map.of()); + builder.addPath(outputPath.toUri().toString()); + for (StructField field : readSchema.fields()) { + builder.addColumn(Column.create(field.name(), field.dataType())); + } + if (requiredSchema != null) { + builder.pruneColumns(requiredSchema); + } + return (VortexScan) builder.build(); + } + + private Path writeRows(int numRows, String name) throws IOException { + return writeRows(numRows, name, 1); + } + + private long totalVortexFileBytes(Path outputPath) throws IOException { + try (Stream paths = Files.walk(outputPath)) { + return paths.filter(Files::isRegularFile) + .filter(path -> path.getFileName().toString().endsWith(".vortex")) + .mapToLong(path -> { + try { + return Files.size(path); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .sum(); + } + } + + private Path writeRows(int numRows, String name, int partitions) throws IOException { + Path outputPath = tempDir.resolve(name); + Dataset df = spark.range(0, numRows) + .selectExpr("cast(id as int) as id", "concat('value_', cast(id as string)) as value"); + + df.repartition(partitions) + .write() + .format("vortex") + .option("path", outputPath.toUri().toString()) + .mode(SaveMode.Overwrite) + .save(); + return outputPath; + } + + @AfterEach + public void cleanupTempFiles() throws IOException { + if (tempDir != null && Files.exists(tempDir)) { + try (Stream paths = Files.walk(tempDir)) { + paths.sorted(Comparator.reverseOrder()).forEach(path -> { + try { + Files.deleteIfExists(path); + } catch (IOException e) { + System.err.println("Failed to delete: " + path); + } + }); + } + } + } +} diff --git a/vortex-file/src/multi/mod.rs b/vortex-file/src/multi/mod.rs index 78c18ea438e..1e2ba3523c3 100644 --- a/vortex-file/src/multi/mod.rs +++ b/vortex-file/src/multi/mod.rs @@ -60,6 +60,9 @@ pub struct MultiFileDataSource { /// List of (glob, optional filesystem) pairs to resolve. /// When the filesystem is None, a local filesystem will be created in build(). glob_sources: Vec<(String, Option)>, + /// Pre-resolved file listings that skip glob expansion. The caller is responsible for + /// supplying the [`FileListing::size`] when stats reporting matters. + listing_sources: Vec<(FileListing, FileSystemRef)>, open_options_fn: Arc VortexOpenOptions + Send + Sync>, } @@ -69,6 +72,7 @@ impl MultiFileDataSource { Self { session, glob_sources: Vec::new(), + listing_sources: Vec::new(), open_options_fn: Arc::new(|opts| opts), } } @@ -94,6 +98,23 @@ impl MultiFileDataSource { self } + /// Add a pre-resolved file listing. + /// + /// Use this when the caller already knows the exact file path and (optionally) its size, + /// avoiding the glob expansion done by [`Self::with_glob`]. Supplying + /// [`FileListing::size`] is required for [`DataSource::byte_size`] to surface a contribution + /// from this file; otherwise the source size remains unknown for this file and the + /// data-source-level total is extrapolated from the files that do report a size. + pub fn with_listing(mut self, listing: FileListing, fs: FileSystemRef) -> Self { + let FileListing { path, size } = listing; + let listing = FileListing { + path: path.trim_start_matches('/').to_string(), + size, + }; + self.listing_sources.push((listing, fs)); + self + } + /// Customize [`VortexOpenOptions`] applied to each file. /// /// Use this to configure segment caches, metrics registries, or other per-file options. @@ -110,8 +131,10 @@ impl MultiFileDataSource { /// Discovers files via glob, opens the first file eagerly to determine the schema, /// and creates lazy factories for the remaining files. pub async fn build(self) -> VortexResult { - if self.glob_sources.is_empty() { - vortex_bail!("MultiFileDataSource requires at least one glob pattern"); + if self.glob_sources.is_empty() && self.listing_sources.is_empty() { + vortex_bail!( + "MultiFileDataSource requires at least one glob pattern or pre-resolved listing" + ); } // Create local filesystem lazily if needed (only if any glob lacks a filesystem). @@ -139,6 +162,7 @@ impl MultiFileDataSource { all_files.push((file, Arc::clone(&fs))); } } + all_files.extend(self.listing_sources); if all_files.is_empty() { let globs: Vec<_> = self.glob_sources.iter().map(|(g, _)| g.as_str()).collect(); @@ -155,6 +179,8 @@ impl MultiFileDataSource { let first_file = open_file(first_fs, first_file_listing, &self.session, open_fn).await?; let first_reader = layout_reader_with_stats(&first_file)?; + let byte_sizes: Vec> = all_files.iter().map(|(file, _)| file.size).collect(); + let factories: Vec> = all_files[1..] .iter() .map(|(file, fs)| { @@ -167,7 +193,12 @@ impl MultiFileDataSource { }) .collect(); - let inner = MultiLayoutDataSource::new_with_first(first_reader, factories, &self.session); + let inner = MultiLayoutDataSource::new_with_first( + first_reader, + factories, + byte_sizes, + &self.session, + ); debug!(file_count, dtype = %inner.dtype(), "built MultiFileDataSource"); diff --git a/vortex-jni/src/data_source.rs b/vortex-jni/src/data_source.rs index d733c2db48f..5f8ca2427a6 100644 --- a/vortex-jni/src/data_source.rs +++ b/vortex-jni/src/data_source.rs @@ -16,6 +16,8 @@ use std::path::PathBuf; use std::path::absolute; use std::sync::Arc; +use futures::StreamExt; +use futures::stream; use jni::EnvUnowned; use jni::objects::JClass; use jni::objects::JLongArray; @@ -28,6 +30,7 @@ use vortex::error::VortexResult; use vortex::error::vortex_err; use vortex::expr::stats::Precision; use vortex::file::multi::MultiFileDataSource; +use vortex::io::filesystem::FileListing; use vortex::io::filesystem::FileSystemRef; use vortex::io::runtime::BlockingRuntime; use vortex::io::session::RuntimeSessionExt; @@ -41,6 +44,10 @@ use crate::file::extract_properties; use crate::object_store::object_store_fs; use crate::session::session_ref; +/// In-flight size lookups while resolving exact paths to file listings. Balances HEAD +/// throughput on remote stores against connection overhead. +const SIZE_LOOKUP_CONCURRENCY: usize = 16; + /// Wraps an `Arc` behind a single pointer. pub(crate) struct NativeDataSource { inner: DataSourceRef, @@ -103,14 +110,49 @@ pub extern "system" fn Java_dev_vortex_jni_NativeDataSource_open( } } - let mut builder = MultiFileDataSource::new(session.clone()); + // Split inputs into glob patterns (which fs.glob() expands via list(), capturing sizes + // automatically) and exact paths (which are resolved one-by-one with a HEAD-style size + // lookup so the data source can report total bytes for Spark-style stats). + let mut glob_inputs: Vec<(String, FileSystemRef)> = Vec::new(); + let mut exact_inputs: Vec<(String, FileSystemRef)> = Vec::new(); for glob_url in &glob_urls { let base = base_url(glob_url); let fs = fs_cache .get(&base) .cloned() .unwrap_or_else(|| unreachable!("fs cached for every base url")); - builder = builder.with_glob(glob_url.path(), Some(fs)); + let path = glob_url.path().to_string(); + if path.contains(['*', '?', '[']) { + glob_inputs.push((path, fs)); + } else { + exact_inputs.push((path, fs)); + } + } + + let resolved_listings: Vec<(FileListing, FileSystemRef)> = if exact_inputs.is_empty() { + Vec::new() + } else { + RUNTIME.block_on(async { + stream::iter(exact_inputs) + .map(|(path, fs)| async move { + let size = match fs.open_read(&path).await { + Ok(source) => source.size().await.ok(), + Err(_) => None, + }; + (FileListing { path, size }, fs) + }) + .buffer_unordered(SIZE_LOOKUP_CONCURRENCY) + .collect::>() + .await + }) + }; + + let mut builder = MultiFileDataSource::new(session.clone()); + for (glob, fs) in glob_inputs { + builder = builder.with_glob(glob, Some(fs)); + } + for (listing, fs) in resolved_listings { + builder = builder.with_listing(listing, fs); } let inner = RUNTIME @@ -211,6 +253,27 @@ pub extern "system" fn Java_dev_vortex_jni_NativeDataSource_rowCount( }); } +/// Write the byte size into the two-slot jlong pair `out`: +/// `out[0]` receives the size in bytes (0 when unknown), `out[1]` the precision (0=unknown, 1=estimate, 2=exact). +#[unsafe(no_mangle)] +pub extern "system" fn Java_dev_vortex_jni_NativeDataSource_byteSize( + mut env: EnvUnowned, + _class: JClass, + pointer: jlong, + out: JLongArray, +) { + try_or_throw(&mut env, |env| { + let ds = unsafe { NativeDataSource::from_ptr(pointer) }; + let (bytes, precision) = match ds.inner.byte_size() { + Some(Precision::Exact(b)) => (b as jlong, 2), + Some(Precision::Inexact(b)) => (b as jlong, 1), + None => (0, 0), + }; + out.set_region(env, 0, &[bytes, precision])?; + Ok(()) + }); +} + #[cfg(test)] mod tests { use super::*; diff --git a/vortex-layout/src/scan/multi.rs b/vortex-layout/src/scan/multi.rs index 9442b72cf7a..b789142235a 100644 --- a/vortex-layout/src/scan/multi.rs +++ b/vortex-layout/src/scan/multi.rs @@ -32,6 +32,7 @@ use async_trait::async_trait; use futures::FutureExt; use futures::StreamExt; use futures::stream; +use itertools::Itertools; use tracing::Instrument; use vortex_array::dtype::DType; use vortex_array::dtype::FieldPath; @@ -87,26 +88,68 @@ pub struct MultiLayoutDataSource { } pub enum MultiLayoutChild { - Opened(LayoutReaderRef), - Deferred(Arc), + Opened { + reader: LayoutReaderRef, + /// On-storage file size in bytes, if known from the listing metadata. + byte_size: Option, + }, + Deferred { + factory: Arc, + /// On-storage file size in bytes, if known from the listing metadata. + byte_size: Option, + }, +} + +impl MultiLayoutChild { + /// On-storage file size in bytes for this child, if known. + pub fn byte_size(&self) -> Option { + match self { + MultiLayoutChild::Opened { byte_size, .. } => *byte_size, + MultiLayoutChild::Deferred { byte_size, .. } => *byte_size, + } + } } impl MultiLayoutDataSource { /// Creates a multi-layout data source with the first reader pre-opened. /// /// The first reader determines the dtype. Remaining readers are opened lazily during - /// scanning via their factories. + /// scanning via their factories. `byte_sizes` carries the on-storage file size in bytes for + /// each child (first followed by remaining); pass `None` for entries where the size is + /// unknown. Must be empty or have length `1 + remaining.len()`. pub fn new_with_first( first: LayoutReaderRef, remaining: Vec>, + byte_sizes: Vec>, session: &VortexSession, ) -> Self { let dtype = first.dtype().clone(); let concurrency = get_available_parallelism().unwrap_or(DEFAULT_CONCURRENCY); - let mut children = Vec::with_capacity(1 + remaining.len()); - children.push(MultiLayoutChild::Opened(first)); - children.extend(remaining.into_iter().map(MultiLayoutChild::Deferred)); + let total = 1 + remaining.len(); + let mut sizes = byte_sizes; + if sizes.is_empty() { + sizes = vec![None; total]; + } + debug_assert_eq!( + sizes.len(), + total, + "byte_sizes length must match the number of children" + ); + + let mut children = Vec::with_capacity(total); + let mut sizes_iter = sizes.into_iter(); + let first_size = sizes_iter.next().unwrap_or(None); + children.push(MultiLayoutChild::Opened { + reader: first, + byte_size: first_size, + }); + children.extend( + remaining + .into_iter() + .zip_eq(sizes_iter) + .map(|(factory, byte_size)| MultiLayoutChild::Deferred { factory, byte_size }), + ); Self { dtype, @@ -120,20 +163,34 @@ impl MultiLayoutDataSource { /// /// The dtype must be provided externally since there is no pre-opened reader to infer it /// from. This avoids eagerly opening any file when the schema is already known (e.g. from - /// a catalog or a prior scan). + /// a catalog or a prior scan). `byte_sizes` carries the on-storage file size in bytes for + /// each factory; pass `None` for entries where the size is unknown. Must be empty or have + /// the same length as `factories`. pub fn new_deferred( dtype: DType, factories: Vec>, + byte_sizes: Vec>, session: &VortexSession, ) -> Self { let concurrency = get_available_parallelism().unwrap_or(DEFAULT_CONCURRENCY); + let mut sizes = byte_sizes; + if sizes.is_empty() { + sizes = vec![None; factories.len()]; + } + debug_assert_eq!( + sizes.len(), + factories.len(), + "byte_sizes length must match the number of factories" + ); + Self { dtype, session: session.clone(), children: factories .into_iter() - .map(MultiLayoutChild::Deferred) + .zip_eq(sizes) + .map(|(factory, byte_size)| MultiLayoutChild::Deferred { factory, byte_size }) .collect(), concurrency, } @@ -166,11 +223,11 @@ impl DataSource for MultiLayoutDataSource { for child in &self.children { match child { - MultiLayoutChild::Opened(reader) => { + MultiLayoutChild::Opened { reader, .. } => { opened_count += 1; sum = sum.saturating_add(reader.row_count()); } - MultiLayoutChild::Deferred(_) => { + MultiLayoutChild::Deferred { .. } => { deferred_count += 1; } } @@ -192,6 +249,34 @@ impl DataSource for MultiLayoutDataSource { } } + fn byte_size(&self) -> Option> { + let total_count = self.children.len() as u64; + if total_count == 0 { + return Some(Precision::exact(0u64)); + } + + let mut sum: u64 = 0; + let mut known_count: u64 = 0; + for child in &self.children { + if let Some(size) = child.byte_size() { + sum = sum.saturating_add(size); + known_count += 1; + } + } + + if known_count == 0 { + return None; + } + + if known_count == total_count { + Some(Precision::exact(sum)) + } else { + let avg = sum / known_count; + let extrapolated = avg.saturating_mul(total_count); + Some(Precision::inexact(extrapolated)) + } + } + fn deserialize_partition( &self, _data: &[u8], @@ -206,8 +291,10 @@ impl DataSource for MultiLayoutDataSource { for child in &self.children { match child { - MultiLayoutChild::Opened(reader) => ready.push_back(Arc::clone(reader)), - MultiLayoutChild::Deferred(factory) => deferred.push_back(Arc::clone(factory)), + MultiLayoutChild::Opened { reader, .. } => ready.push_back(Arc::clone(reader)), + MultiLayoutChild::Deferred { factory, .. } => { + deferred.push_back(Arc::clone(factory)) + } } } From 4d9b080ad571fa8bdaba35fb3ff78c91e34f2f35 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Tue, 26 May 2026 17:13:31 +0100 Subject: [PATCH 2/2] format Signed-off-by: Robert Kruszewski --- .../SparkPredicateToVortexExpression.java | 45 +++++++++---------- .../dev/vortex/spark/read/VortexScan.java | 6 +-- .../vortex/spark/read/VortexScanBuilder.java | 7 ++- vortex-duckdb/src/table_function.rs | 2 +- vortex-jni/src/data_source.rs | 6 +-- vortex-layout/src/scan/multi.rs | 10 ++--- 6 files changed, 38 insertions(+), 38 deletions(-) diff --git a/java/vortex-spark/src/main/java/dev/vortex/spark/read/SparkPredicateToVortexExpression.java b/java/vortex-spark/src/main/java/dev/vortex/spark/read/SparkPredicateToVortexExpression.java index 8d3fe697153..781c008c79a 100644 --- a/java/vortex-spark/src/main/java/dev/vortex/spark/read/SparkPredicateToVortexExpression.java +++ b/java/vortex-spark/src/main/java/dev/vortex/spark/read/SparkPredicateToVortexExpression.java @@ -6,6 +6,13 @@ import dev.vortex.api.Expression; import dev.vortex.api.Expression.BinaryOp; import dev.vortex.api.Expression.TimeUnit; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; import org.apache.spark.sql.connector.expressions.Literal; import org.apache.spark.sql.connector.expressions.NamedReference; import org.apache.spark.sql.connector.expressions.filter.AlwaysFalse; @@ -33,14 +40,6 @@ import org.apache.spark.sql.types.TimestampType; import org.apache.spark.unsafe.types.UTF8String; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Optional; - /** * Translates {@link Predicate Spark V2 predicates} into Vortex {@link Expression}s for predicate pushdown. * @@ -50,16 +49,15 @@ */ final class SparkPredicateToVortexExpression { - private SparkPredicateToVortexExpression() { - } + private SparkPredicateToVortexExpression() {} /** * Returns true if the given Spark predicate can be translated to a Vortex expression and every named reference * resolves to a real field path under {@code dataColumnTypes}. * *

{@code dataColumnTypes} maps each pushable top-level column name to its top-level Spark {@link DataType}; - * partition columns and columns the scan does not project should not appear in the map. For nested references - * (for example {@code info.email}) the validator walks the named reference part by part, descending into + * partition columns and columns the scan does not project should not appear in the map. For nested references (for + * example {@code info.email}) the validator walks the named reference part by part, descending into * {@link StructType} fields so that {@code info} must be a struct that contains an {@code email} field. * *

This is the cheap check used in {@code SupportsPushDownV2Filters.pushPredicates} to decide which predicates @@ -77,8 +75,7 @@ static boolean isPushable(Predicate predicate, Map dataColumnT /** * Walks {@code parts} against {@code dataColumnTypes}, descending through {@link StructType} fields for - * dot-separated nested references. Returns true only when every part resolves to an actual field in the - * schema. + * dot-separated nested references. Returns true only when every part resolves to an actual field in the schema. */ private static boolean resolveFieldPath(String[] parts, Map dataColumnTypes) { if (parts.length == 0) { @@ -102,7 +99,9 @@ private static boolean resolveFieldPath(String[] parts, Map da } private static Optional findField(StructType struct, String name) { - return Arrays.stream(struct.fields()).filter(structField -> structField.name().equals(name)).findFirst(); + return Arrays.stream(struct.fields()) + .filter(structField -> structField.name().equals(name)) + .findFirst(); } private static boolean isStructurallyPushable(Predicate predicate) { @@ -135,7 +134,7 @@ private static boolean isStructurallyPushable(Predicate predicate) { yield true; } case "STARTS_WITH", "ENDS_WITH", "CONTAINS" -> - children.length == 2 && isPushableFieldRef(children[0]) && isPushableStringLiteral(children[1]); + children.length == 2 && isPushableFieldRef(children[0]) && isPushableStringLiteral(children[1]); // `BOOLEAN_EXPRESSION` wraps a bare boolean-valued child. We only handle the case // where the child itself is a field reference (e.g. `WHERE bool_col`). case "BOOLEAN_EXPRESSION" -> children.length == 1 && isPushableFieldRef(children[0]); @@ -178,12 +177,12 @@ static Optional convert(Predicate predicate) { case "=", "<>", "!=", ">", ">=", "<", "<=" -> convertComparison(predicate.name(), children); case "IS_NULL" -> children.length == 1 ? columnOf(children[0]).map(Expression::isNull) : Optional.empty(); case "IS_NOT_NULL" -> - children.length == 1 ? columnOf(children[0]).map(Expression::isNotNull) : Optional.empty(); + children.length == 1 ? columnOf(children[0]).map(Expression::isNotNull) : Optional.empty(); case "IN" -> convertIn(children); case "STARTS_WITH" -> - convertStringMatch(children, /* leadingWildcard= */ false, /* trailingWildcard= */ true); + convertStringMatch(children, /* leadingWildcard= */ false, /* trailingWildcard= */ true); case "ENDS_WITH" -> - convertStringMatch(children, /* leadingWildcard= */ true, /* trailingWildcard= */ false); + convertStringMatch(children, /* leadingWildcard= */ true, /* trailingWildcard= */ false); case "CONTAINS" -> convertStringMatch(children, /* leadingWildcard= */ true, /* trailingWildcard= */ true); case "BOOLEAN_EXPRESSION" -> children.length == 1 ? columnOf(children[0]) : Optional.empty(); default -> Optional.empty(); @@ -327,9 +326,7 @@ private static boolean isFieldRefExpr(org.apache.spark.sql.connector.expressions return expr instanceof NamedReference; } - /** - * Returns the Vortex column expression for a Spark named reference, walking nested struct fields. - */ + /** Returns the Vortex column expression for a Spark named reference, walking nested struct fields. */ private static Optional columnOf(org.apache.spark.sql.connector.expressions.Expression expr) { if (!(expr instanceof NamedReference)) { return Optional.empty(); @@ -501,9 +498,7 @@ private static Optional convertLiteral(Object value, DataType dataTy return Optional.empty(); } - /** - * Extract the unscaled integer value of a Spark decimal literal at the supplied {@code scale}. - */ + /** Extract the unscaled integer value of a Spark decimal literal at the supplied {@code scale}. */ private static BigInteger unscaledValueOf(Object value, int scale) { BigDecimal decimal; if (value instanceof Decimal) { diff --git a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScan.java b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScan.java index 493e981d8f1..5ccea99180d 100644 --- a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScan.java +++ b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScan.java @@ -3,11 +3,11 @@ package dev.vortex.spark.read; -import java.util.Arrays; import dev.vortex.api.DataSource; import dev.vortex.api.Session; import dev.vortex.jni.NativeFiles; import dev.vortex.spark.VortexSparkSession; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -16,8 +16,8 @@ import java.util.stream.Stream; import org.apache.spark.sql.connector.catalog.CatalogV2Util; import org.apache.spark.sql.connector.catalog.Column; -import org.apache.spark.sql.connector.expressions.filter.Predicate; import org.apache.spark.sql.connector.expressions.NamedReference; +import org.apache.spark.sql.connector.expressions.filter.Predicate; import org.apache.spark.sql.connector.read.Batch; import org.apache.spark.sql.connector.read.Scan; import org.apache.spark.sql.connector.read.Statistics; @@ -59,7 +59,7 @@ public VortexScan( List paths, List tableColumns, List readColumns, - Predicate[] pushedPredicates, + Predicate[] pushedPredicates, Map formatOptions) { this.paths = paths; this.tableColumns = tableColumns; diff --git a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScanBuilder.java b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScanBuilder.java index 384c2778b1e..107d22b30f6 100644 --- a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScanBuilder.java +++ b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScanBuilder.java @@ -119,7 +119,12 @@ public Scan build() { // Allow empty columns for operations like count() that don't need actual column data // If no columns are specified, we'll read the minimal schema needed - return new VortexScan(paths, List.copyOf(this.tableColumns), List.copyOf(this.readColumns), pushedPredicates, this.formatOptions); + return new VortexScan( + paths, + List.copyOf(this.tableColumns), + List.copyOf(this.readColumns), + pushedPredicates, + this.formatOptions); } /** diff --git a/vortex-duckdb/src/table_function.rs b/vortex-duckdb/src/table_function.rs index 11c5851af27..7b34725b123 100644 --- a/vortex-duckdb/src/table_function.rs +++ b/vortex-duckdb/src/table_function.rs @@ -439,7 +439,7 @@ pub fn statistics(bind_data: &TableFunctionBind, column_index: usize) -> Option< if children.len() != 1 { return None; } - let MultiLayoutChild::Opened(reader) = &children[0] else { + let MultiLayoutChild::Opened { reader, .. } = &children[0] else { return None; }; let stats_sets = match reader.as_any().downcast_ref::() { diff --git a/vortex-jni/src/data_source.rs b/vortex-jni/src/data_source.rs index 5f8ca2427a6..81257d65162 100644 --- a/vortex-jni/src/data_source.rs +++ b/vortex-jni/src/data_source.rs @@ -265,9 +265,9 @@ pub extern "system" fn Java_dev_vortex_jni_NativeDataSource_byteSize( try_or_throw(&mut env, |env| { let ds = unsafe { NativeDataSource::from_ptr(pointer) }; let (bytes, precision) = match ds.inner.byte_size() { - Some(Precision::Exact(b)) => (b as jlong, 2), - Some(Precision::Inexact(b)) => (b as jlong, 1), - None => (0, 0), + Precision::Exact(b) => (b as jlong, 2), + Precision::Inexact(b) => (b as jlong, 1), + Precision::Absent => (0, 0), }; out.set_region(env, 0, &[bytes, precision])?; Ok(()) diff --git a/vortex-layout/src/scan/multi.rs b/vortex-layout/src/scan/multi.rs index b789142235a..35c9aa1fa9a 100644 --- a/vortex-layout/src/scan/multi.rs +++ b/vortex-layout/src/scan/multi.rs @@ -249,10 +249,10 @@ impl DataSource for MultiLayoutDataSource { } } - fn byte_size(&self) -> Option> { + fn byte_size(&self) -> Precision { let total_count = self.children.len() as u64; if total_count == 0 { - return Some(Precision::exact(0u64)); + return Precision::exact(0u64); } let mut sum: u64 = 0; @@ -265,15 +265,15 @@ impl DataSource for MultiLayoutDataSource { } if known_count == 0 { - return None; + return Precision::Absent; } if known_count == total_count { - Some(Precision::exact(sum)) + Precision::exact(sum) } else { let avg = sum / known_count; let extrapolated = avg.saturating_mul(total_count); - Some(Precision::inexact(extrapolated)) + Precision::inexact(extrapolated) } }