
Spark: implement SupportsPushDownVariantExtractions for shredded variant column pruning (plan rewrite) #16448

@qlong


Feature Request / Improvement

Iceberg can write shredded variant columns to Parquet (#14297). On the read path,
SparkScanBuilder does not implement Spark 4.1's SupportsPushDownVariantExtractions,
so Spark never rewrites variant_get(...) into struct field accesses and never prunes the
scan output schema to the requested shredded fields.
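To make the missing handshake concrete, here is a minimal, hypothetical Java model of the idea. Spark proposes the variant_get(...) extractions it found in the query, the scan builder accepts the ones it can serve, and in this model anything rejected stays as a variant_get call in a Filter above the scan. The Extraction/Decision records and the push() method are invented for illustration and do not reproduce the actual Spark 4.1 SupportsPushDownVariantExtractions signatures. The acceptance rule (a VARIANT column plus a simple object-field path such as $.size) is also an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model of a variant-extraction pushdown handshake.
// None of these types are Spark or Iceberg classes.
public class VariantPushdownHandshake {

  // One variant_get(column, path, type) call found in the query (hypothetical).
  public record Extraction(String column, String path, String targetType) {}

  // Which proposed extractions the scan builder took over, and which it left to Spark.
  public record Decision(List<Extraction> accepted, List<Extraction> rejected) {}

  // Assumption: only dotted object-field paths like "$.size" or "$.a.b" are accepted.
  private static final String SIMPLE_PATH = "\\$(\\.[A-Za-z_][A-Za-z0-9_]*)+";

  public static Decision push(List<Extraction> proposed, Set<String> variantColumns) {
    List<Extraction> accepted = new ArrayList<>();
    List<Extraction> rejected = new ArrayList<>();
    for (Extraction e : proposed) {
      boolean isVariantColumn = variantColumns.contains(e.column());
      boolean simplePath = e.path().matches(SIMPLE_PATH);
      if (isVariantColumn && simplePath) {
        accepted.add(e); // would become a struct field access on the scan output
      } else {
        rejected.add(e); // would stay as variant_get(...) evaluated after the scan
      }
    }
    return new Decision(accepted, rejected);
  }

  public static void main(String[] args) {
    List<Extraction> proposed = List.of(
        new Extraction("payload", "$.size", "long"),
        new Extraction("payload", "$.commits[0].sha", "string"));
    Decision d = push(proposed, Set.of("payload"));
    System.out.println("accepted: " + d.accepted());
    System.out.println("rejected: " + d.rejected());
  }
}
```

Rejecting array-index paths here only keeps the sketch small; what the real integration accepts may differ.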

As a result, even when a query needs only one or two paths (e.g.
variant_get(payload, '$.size', 'long')), the optimized plan still evaluates
variant_get in a Spark Filter above BatchScan, and the scan output still
types payload as VariantType. The reader therefore loads the full shredded
Parquet layout (in practice, hundreds of typed_value.* columns), materializes a
full VARIANT value per row, and applies path extraction only after the scan. That reconstruction
is expensive: on a shredded Iceberg table built from one day of GitHub Activities data (GHA), a simple
filter/count query was ~14× slower than the same workload on a plain JSON string column
(~63s vs ~4.4s), even though the shredded table uses only ~1.6× more storage.

(Predicate pushdown of variant_get into IcebergScan filters for manifest
planning is handled separately in #15385; it does not rewrite the plan or avoid reading
all shredded columns per file.)

This issue covers only the DSv2 contract (the plan rewrite). A follow-on change will wire the
annotated readSchema() into the Parquet reader so that it avoids full-variant reconstruction (the I/O reduction).
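For the follow-on I/O reduction, the gain comes from reading only the Parquet leaf columns a query needs. The sketch below is a hypothetical helper, not Iceberg code. It assumes the Parquet variant shredding layout (metadata, value, and typed_value.<field>.{value,typed_value} for each shredded object field) and keeps only the columns relevant to one object-field path such as $.size. It keeps the top-level value column conservatively; whether a real reader can skip it is not decided here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: prune the leaf columns of one shredded variant column
// down to what extracting a single top-level object field would need.
// Assumed layout: <col>.metadata, <col>.value, <col>.typed_value.<field>.{value,typed_value}.
public class ShreddedColumnPruning {

  public static List<String> prune(String column, List<String> leafColumns, String fieldName) {
    // Trailing dot so "size" does not also match a sibling such as "size_bytes".
    String fieldPrefix = column + ".typed_value." + fieldName + ".";
    List<String> keep = new ArrayList<>();
    for (String leaf : leafColumns) {
      boolean metadata = leaf.equals(column + ".metadata"); // dictionary for decoding binary values
      boolean residual = leaf.equals(column + ".value");    // kept conservatively (see lead-in)
      boolean requested = leaf.startsWith(fieldPrefix);     // the shredded subtree for the field
      if (metadata || residual || requested) {
        keep.add(leaf);
      }
    }
    return keep;
  }

  public static void main(String[] args) {
    List<String> leaves = List.of(
        "payload.metadata",
        "payload.value",
        "payload.typed_value.push_id.value",
        "payload.typed_value.push_id.typed_value",
        "payload.typed_value.ref.value",
        "payload.typed_value.ref.typed_value",
        "payload.typed_value.size.value",
        "payload.typed_value.size.typed_value");
    // Reads 4 of the 8 leaf columns instead of all of them.
    System.out.println(prune("payload", leaves, "size"));
  }
}
```

With hundreds of typed_value.* columns, the same rule would skip every sibling subtree that the query never touches.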

Plan rewrite example
For this query:

CREATE TABLE events (
  id INT,
  type STRING,
  payload VARIANT
) USING iceberg
TBLPROPERTIES ('format-version' = '3'); 

SELECT count(*) AS large_events
FROM events
WHERE type = 'PushEvent'
  AND variant_get(payload, '$.size', 'long') > 5;

Before (today, without SupportsPushDownVariantExtractions)


Aggregate [count(1)]
+- Filter (... type = PushEvent)
          AND (variant_get(payload#22, $.size, LongType, ...) > 5))    ← still a function call
   +- RelationV2[type#19, payload#22]


Filter (variant_get(payload#22, $.size, ...) > 5)     ← runs per row AFTER scan
  +- BatchScan [type#15, payload#18]
       IcebergScan(..., filters=type IS NOT NULL, payload IS NOT NULL, type = 'PushEvent')
       ReadSchema: full payload variant / all shredded columns

After (DSv2 plan rewrite)

Aggregate [count(1)]
+- Filter (... type = PushEvent)
          AND (payload#25.0 > 5))                    ← struct field access, not variant_get
   +- RelationV2[type#24, payload#25]


Filter (isnotnull(payload#25) AND (payload#25.0 > 5))    
  +- BatchScan [type#24, payload#25]
       readSchema(): struct<payload: struct<0: bigint, ...>>
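The rewrite in the After plan can be illustrated with a small, hypothetical Java sketch. Each requested (path, type) pair on payload is assigned an ordinal field name, variant_get(...) becomes a field access such as payload.0, and the read schema replaces VARIANT with a struct containing just those fields. The class and method names are invented; the schema string mirrors the notation used above, and whatever annotations the real readSchema() carries are omitted. Mapping repeated extractions to one ordinal is a design choice of this sketch, not confirmed Spark behavior.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: turn variant_get(column, path, type) calls into ordinal
// struct field accesses and build the pruned read schema for the column.
public class VariantExtractionRewrite {

  // Spark SQL type names for a few variant_get target types (assumption: only these).
  static String sqlTypeName(String targetType) {
    switch (targetType) {
      case "long": return "bigint";
      case "int": return "int";
      case "double": return "double";
      case "string": return "string";
      case "boolean": return "boolean";
      default: throw new IllegalArgumentException("unsupported type: " + targetType);
    }
  }

  private final String column;
  private final Map<String, Integer> ordinals = new HashMap<>(); // "path|type" -> field ordinal
  private final List<String> fieldTypes = new ArrayList<>();     // SQL type of field i

  public VariantExtractionRewrite(String column) {
    this.column = column;
  }

  // Returns the field access that replaces variant_get(column, path, targetType).
  public String rewrite(String path, String targetType) {
    String key = path + "|" + targetType;
    Integer ordinal = ordinals.get(key);
    if (ordinal == null) {
      ordinal = fieldTypes.size(); // next unused ordinal
      ordinals.put(key, ordinal);
      fieldTypes.add(sqlTypeName(targetType));
    }
    return column + "." + ordinal;
  }

  // Read schema for the pruned column, e.g. struct<payload: struct<0: bigint>>.
  public String readSchema() {
    StringBuilder sb = new StringBuilder("struct<" + column + ": struct<");
    for (int i = 0; i < fieldTypes.size(); i++) {
      if (i > 0) {
        sb.append(", ");
      }
      sb.append(i).append(": ").append(fieldTypes.get(i));
    }
    return sb.append(">>").toString();
  }

  public static void main(String[] args) {
    VariantExtractionRewrite r = new VariantExtractionRewrite("payload");
    System.out.println(r.rewrite("$.size", "long"));   // payload.0
    System.out.println(r.rewrite("$.ref", "string"));  // payload.1
    System.out.println(r.rewrite("$.size", "long"));   // payload.0 again
    System.out.println(r.readSchema());                // struct<payload: struct<0: bigint, 1: string>>
  }
}
```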

Note

This issue changes the logical/physical plan shape and the readSchema() contract. Parquet may still read all shredded columns until the follow-on change wires readSchema() into the reader.

Query engine

Spark


Metadata


Assignees

No one assigned

    Labels

    improvement (PR that improves existing functionality)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
