[AURON #2015] Add Native Scan Support for Apache Iceberg Copy-On-Write Tables. #2016
slfan1989 wants to merge 6 commits into apache:master
Conversation
f954ce5 to 41e9318
dev/reformat
```shell
# Check or format all code, including third-party code, with spark-3.4
sparkver=spark-3.5
```
Should the comment be spark-3.5?
Pull request overview
This PR adds native scan support for Apache Iceberg Copy-On-Write (COW) tables to the Auron execution engine, enabling direct reads of Iceberg data files through Auron's native path for improved performance. The implementation follows the established SPI (Service Provider Interface) pattern used by other data source integrations like Paimon, with automatic fallback to Spark's execution path for unsupported scenarios.
Changes:
- Adds IcebergConvertProvider SPI extension to detect and convert Iceberg BatchScanExec nodes to native execution
- Implements validation logic to determine COW table eligibility (no delete files, no metadata columns, supported data types)
- Creates NativeIcebergTableScanExec to execute native Iceberg scans with Parquet/ORC format support
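The provider/fallback flow described above can be sketched roughly as follows. This is an illustrative stand-in, not the actual Auron SPI: the trait and object names (`ConvertProvider`, `Converters`) are hypothetical, and the real `AuronConvertProvider` interface differs in shape.

```scala
// Hypothetical sketch of an SPI-style convert-provider chain with
// automatic fallback: each registered provider is asked whether it
// supports the plan; if none converts, Spark's original plan is kept.
trait ConvertProvider[P] {
  def isSupported(plan: P): Boolean
  def tryConvert(plan: P): Option[P]
}

object Converters {
  def convert[P](plan: P, providers: Seq[ConvertProvider[P]]): P =
    providers.collectFirst {
      case p if p.isSupported(plan) => p.tryConvert(plan)
    }.flatten.getOrElse(plan) // unsupported: fall back to Spark's plan
}
```

The key property, mirrored from the PR description, is that an unsupported plan is returned unchanged rather than causing a failure.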
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala | SPI provider that checks version compatibility and delegates to IcebergScanSupport |
| thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala | Core validation logic to determine native scan eligibility and extract FileScanTask metadata via reflection |
| thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala | Native execution node that converts Iceberg tasks to FilePartitions and generates protobuf scan plans |
| thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala | Integration tests covering COW tables, projections, partitioning, ORC format, and fallback scenarios |
| thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/BaseAuronIcebergSuite.scala | Test base configuration with Auron and Iceberg extensions enabled |
| thirdparty/auron-iceberg/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider | SPI registration file for IcebergConvertProvider |
| thirdparty/auron-iceberg/pom.xml | Maven enforcer rules to validate Iceberg version (1.10.1) and Spark version (3.4-4.0) compatibility |
| spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java | Adds ENABLE_ICEBERG_SCAN configuration option |
| spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala | Adds default value handling for shuffle manager configuration |
| spark-extension/pom.xml | Adds arrow-memory-core and arrow-memory-netty dependencies |
| pom.xml | Adds Iceberg version properties and enforcer rules for all Spark version profiles |
| auron-build.sh | Updates Iceberg version support to 1.10.1 and Spark version range to 3.4-4.0 |
| dev/reformat | Updates formatting script to include Iceberg module with version 1.10.1 |
| .github/workflows/iceberg.yml | CI workflow for testing Iceberg integration across Spark 3.4, 3.5, 4.0 with multiple Java versions |
…n-Write Tables. Signed-off-by: slfan1989 <slfan1989@apache.org>
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 6 comments.
```scala
val raw = inputPartitionsMethod
  .orElse(partitionsMethod)
  .map(_.invoke(exec))
  .getOrElse(Seq.empty)
```
The reflective call _.invoke(exec) can throw (e.g., InvocationTargetException, IllegalAccessException) and currently isn’t caught. Because isSupported() is called outside tryConvert, any exception here can fail query planning rather than falling back. Wrap the reflective invocation + normalization in a try/catch (e.g., NonFatal) and return Seq.empty/failure so the provider can safely fall back.
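A minimal sketch of the suggested hardening, under stated assumptions: `ReflectiveInvokeSketch` and `FakeExec` are hypothetical stand-ins for the real scan exec node, not Auron code. The point is only that `NonFatal` around the reflective invocation turns any failure into an empty result, so planning can fall back.

```scala
import scala.util.control.NonFatal

// Hypothetical sketch: guard a reflective invocation with NonFatal so
// query planning falls back instead of failing. FakeExec stands in for
// the real BatchScanExec-like node.
object ReflectiveInvokeSketch {
  class FakeExec {
    // Simulates a reflective target that blows up at invocation time.
    def inputPartitions(): Seq[Int] = throw new IllegalStateException("boom")
  }

  def partitionsOf(exec: FakeExec): Seq[Int] = {
    val method = exec.getClass.getMethod("inputPartitions")
    try {
      method.invoke(exec).asInstanceOf[Seq[Int]]
    } catch {
      // InvocationTargetException, IllegalAccessException, etc. all land
      // here; an empty result signals "not supported, fall back".
      case NonFatal(_) => Seq.empty
    }
  }
}
```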
```scala
try {
  // SparkInputPartition is package-private; use reflection to read its task group.
  val taskGroupField = partition.getClass.getDeclaredField("taskGroup")
  taskGroupField.setAccessible(true)
  val taskGroup = taskGroupField.get(partition)

  // Extract tasks and keep only file scan tasks.
  val tasksMethod = taskGroup.getClass.getDeclaredMethod("tasks")
  tasksMethod.setAccessible(true)
  val tasks = tasksMethod.invoke(taskGroup).asInstanceOf[java.util.Collection[_]].asScala
  val fileTasks = tasks.collect { case task: FileScanTask => task }.toSeq

  // If any task is not a FileScanTask, fallback.
  if (fileTasks.size != tasks.size) {
    return None
  }

  Some(IcebergPartitionView(fileTasks))
} catch {
  case _: ReflectiveOperationException => None
}
```
icebergPartition only catches ReflectiveOperationException, but setAccessible(true) can throw runtime exceptions (e.g., InaccessibleObjectException on newer JDK/module settings or SecurityException). Since this runs in isSupported(), uncaught runtime exceptions can fail planning. Catch NonFatal (optionally with a debug log) and return None to ensure a safe fallback.
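The broader catch suggested above can be sketched as follows. This is hypothetical illustration, not the PR's code: `Carrier` stands in for the package-private `SparkInputPartition`, and `readField` for the reflective read. `NonFatal` covers `NoSuchFieldException`, `InaccessibleObjectException`, and `SecurityException` alike, so every failure mode collapses to `None`.

```scala
import scala.util.control.NonFatal

// Hypothetical sketch: setAccessible and field access can throw runtime
// exceptions beyond ReflectiveOperationException, so catch NonFatal and
// return None for a safe planning-time fallback.
object PartitionReflectSketch {
  class Carrier {
    private val taskGroup: String = "tasks"
    override def toString: String = taskGroup // keep the field referenced
  }

  def readField(obj: AnyRef, name: String): Option[AnyRef] =
    try {
      val field = obj.getClass.getDeclaredField(name)
      field.setAccessible(true)
      Option(field.get(obj))
    } catch {
      case NonFatal(_) => None // fall back instead of failing planning
    }
}
```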
```scala
writer.write(delete)
writer.close()
```
writer.close() is not in a finally, so an exception during writer.write(delete) would leave the writer unclosed (and may keep the output file open). Consider using try/finally (or Using.resource) to guarantee the writer is closed even on failure.
Suggested change:

```scala
try {
  writer.write(delete)
} finally {
  writer.close()
}
```
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…rg/AuronIcebergIntegrationSuite.scala Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…uron/iceberg/IcebergScanSupport.scala Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Which issue does this PR close?
Closes #2015
Rationale for this change
This PR adds native scan support for Apache Iceberg Copy-On-Write (COW) tables to improve query performance. Auron currently lacks direct Iceberg integration, so all Iceberg queries run through Spark's execution path and miss out on native-engine acceleration.
Key Motivations:
What changes are included in this PR?
Core Implementation:
Build & Configuration:
- Updates `pom.xml` with Iceberg version management and Maven enforcer rules
- Updates `auron-build.sh` to support Iceberg build parameters
- Adds the `spark.auron.enable.iceberg.scan` configuration option (default: true)

Supported Features:
Version Support:
Are there any user-facing changes?
No Breaking Changes: Existing functionality remains unchanged. Iceberg support is additive and automatically falls back to Spark's execution path in unsupported scenarios.
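For users who want to opt out explicitly, the `spark.auron.enable.iceberg.scan` flag added by this PR can be toggled at runtime. This is a usage sketch, not tested code, and it assumes an active `SparkSession` named `spark`:

```scala
// Disable Auron's native Iceberg scan for the session;
// Iceberg reads then take Spark's regular execution path.
spark.conf.set("spark.auron.enable.iceberg.scan", "false")
```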
How was this patch tested?
Unit & Integration Tests:
Test Environment: