From 793d640142135b977e761b73a160588ab750b9b4 Mon Sep 17 00:00:00 2001 From: Jia Yu Date: Sun, 10 May 2026 20:28:50 -0700 Subject: [PATCH 1/5] [GH-2938] Push down ST_BoxIntersects/ST_BoxContains to GeoParquet bbox metadata Adds Box2DLeafFilter, a new GeoParquetSpatialFilter variant that prunes files using the geometry column whose covering metadata references the Box2D column in the predicate. Both ST_BoxIntersects and ST_BoxContains push down to a file-level INTERSECTS check: per-row containment implies per-row intersection, which is only possible if the file's union envelope intersects the query box. Pushdown is sound (never produces false negatives) and well-targeted when the Box2D column is the registered GeoParquet 1.1 covering for a geometry column (auto-detected _bbox columns and explicit geoparquet.covering options both fit). Tests in GeoParquetSpatialFilterPushDownSuite cover: - ST_BoxIntersects with a Box2D query against a quadrant-partitioned dataset; correct region pruning for Q1-only, left-half, and fully-disjoint windows. - ST_BoxContains with a tiny query box inside Q1; pushes down as INTERSECTS at the file level. Pairs naturally with the deferred GeoParquet reader auto-materialization of bbox covering columns as Box2D (#2877). Closes #2938. 
--- .../geoparquet/GeoParquetSpatialFilter.scala | 54 +++++++++++++++ .../SpatialFilterPushDownForGeoParquet.scala | 27 +++++++- ...GeoParquetSpatialFilterPushDownSuite.scala | 68 +++++++++++++++++++ 3 files changed, 147 insertions(+), 2 deletions(-) diff --git a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSpatialFilter.scala b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSpatialFilter.scala index cacc64d94b4..ce1d90c386b 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSpatialFilter.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSpatialFilter.scala @@ -18,6 +18,7 @@ */ package org.apache.spark.sql.execution.datasources.geoparquet +import org.apache.sedona.common.geometryObjects.Box2D import org.apache.sedona.core.spatialOperator.SpatialPredicate import org.locationtech.jts.geom.Envelope import org.locationtech.jts.geom.Geometry @@ -88,4 +89,57 @@ object GeoParquetSpatialFilter { } override def simpleString: String = s"$columnName ${predicateType.name} $queryWindow" } + + /** + * Pushdown filter for predicates that operate on a Box2D-typed column (e.g. + * `ST_BoxIntersects(box_col, lit_box)` or `ST_BoxContains(box_col, lit_box)`). + * + * Per-file evaluation: walks the file's GeoParquet column metadata to find the geometry column + * whose covering metadata points at `box2dColumnName`, then prunes using that geometry column's + * recorded bbox. + * + * Both intersects and contains map to a file-level INTERSECTS check: per-row containment + * implies per-row intersection, which implies the file's union envelope must intersect the + * query box for any row to match. If no geometry column references this Box2D column as its + * covering, the file is kept (cannot prune safely). 
+ * + * @param box2dColumnName + * the Box2D column referenced by the predicate + * @param queryBox + * the literal Box2D from the predicate's RHS + */ + case class Box2DLeafFilter(box2dColumnName: String, queryBox: Box2D) + extends GeoParquetSpatialFilter { + + override def evaluate(columns: Map[String, GeometryFieldMetaData]): Boolean = { + // Find the geometry column whose covering metadata points at this Box2D column. + val matchingGeomEntry = columns.find { case (_, field) => + field.covering.exists(_.bbox.xmin.headOption.contains(box2dColumnName)) + } + + matchingGeomEntry match { + case Some((_, field)) => + // Use the geometry column's recorded bbox to prune. The union of per-row Box2D values + // is a superset of the geometry column's bbox (covering boxes are at least as wide as + // their geometries), so if the geom-column bbox does not intersect the query box, no + // row's Box2D can intersect either. May leave some files unpruned when Box2D values + // are conservatively wider than geometries, but never produces false negatives. + val bbox = field.bbox.getOrElse(return true) + if (bbox.isEmpty) return true + val fileXMin = bbox(0) + val fileYMin = bbox(1) + val fileXMax = bbox(2) + val fileYMax = bbox(3) + !(fileXMax < queryBox.getXMin || fileXMin > queryBox.getXMax + || fileYMax < queryBox.getYMin || fileYMin > queryBox.getYMax) + case None => + // No geometry column references this Box2D column as covering — cannot prune safely. 
+ true + } + } + + override def simpleString: String = + s"$box2dColumnName INTERSECTS BOX(${queryBox.getXMin} ${queryBox.getYMin}, " + + s"${queryBox.getXMax} ${queryBox.getYMax})" + } } diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala index 7c8fefca473..fed1e72a6e4 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala @@ -18,6 +18,7 @@ */ package org.apache.spark.sql.sedona_sql.optimization +import org.apache.sedona.common.geometryObjects.Box2D import org.apache.sedona.common.sphere.Haversine import org.apache.sedona.core.spatialOperator.SpatialPredicate import org.apache.sedona.sql.utils.GeometrySerializer @@ -42,10 +43,12 @@ import org.apache.spark.sql.execution.datasources.PushableColumnBase import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetFileFormatBase import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter.AndFilter +import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter.Box2DLeafFilter import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter.LeafFilter import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter.OrFilter -import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT -import org.apache.spark.sql.sedona_sql.expressions.{ST_AsEWKT, ST_Buffer, ST_Contains, ST_CoveredBy, ST_Covers, ST_Crosses, ST_DWithin, ST_Distance, ST_DistanceSphere, ST_DistanceSpheroid, ST_Equals, ST_Intersects, ST_OrderingEquals, ST_Overlaps, ST_Touches, ST_Within} +import 
org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.sedona_sql.UDT.{Box2DUDT, GeometryUDT} +import org.apache.spark.sql.sedona_sql.expressions.{ST_AsEWKT, ST_BoxContains, ST_BoxIntersects, ST_Buffer, ST_Contains, ST_CoveredBy, ST_Covers, ST_Crosses, ST_DWithin, ST_Distance, ST_DistanceSphere, ST_DistanceSpheroid, ST_Equals, ST_Intersects, ST_OrderingEquals, ST_Overlaps, ST_Touches, ST_Within} import org.apache.spark.sql.sedona_sql.optimization.ExpressionUtils.splitConjunctivePredicates import org.apache.spark.sql.types.DoubleType import org.locationtech.jts.geom.Geometry @@ -144,6 +147,16 @@ class SpatialFilterPushDownForGeoParquet(sparkSession: SparkSession) extends Rul SpatialPredicate.INTERSECTS, GeometryUDT.deserialize(value)) + // Box2D predicates push down to an INTERSECTS check on the geometry column whose covering + // metadata references this Box2D column. Both BoxIntersects and BoxContains use INTERSECTS + // semantics at the file level: per-row containment implies per-row intersection, which is + // only possible if the geometry column's file-level bbox intersects the query box. + case ST_BoxIntersects(_) | ST_BoxContains(_) => + for { + (name, value) <- resolveNameAndLiteral(predicate.children, pushableColumn) + queryBox <- extractBox2DLiteral(value) + } yield Box2DLeafFilter(unquote(name), queryBox) + case LessThan(ST_Distance(distArgs), Literal(d, DoubleType)) => for ((name, value) <- resolveNameAndLiteral(distArgs, pushableColumn)) yield distanceFilter(name, GeometryUDT.deserialize(value), d.asInstanceOf[Double]) @@ -256,6 +269,16 @@ class SpatialFilterPushDownForGeoParquet(sparkSession: SparkSession) extends Rul parseColumnPath(name).mkString(".") } + /** + * Extract a [[Box2D]] from a Catalyst literal value. Box2DUDT serializes to an InternalRow of + * four doubles; if the value is something else, the predicate is not pushable. 
+ */ + private def extractBox2DLiteral(value: Any): Option[Box2D] = value match { + case row: InternalRow if row.numFields == 4 => + Some(new Box2D(row.getDouble(0), row.getDouble(1), row.getDouble(2), row.getDouble(3))) + case _ => None + } + private def resolveNameAndLiteral( expressions: Seq[Expression], pushableColumn: PushableColumnBase): Option[(String, Any)] = { diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala b/spark/common/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala index 2f37488d1a5..f8ee61f4784 100644 --- a/spark/common/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala +++ b/spark/common/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.SimpleMode import org.apache.spark.sql.execution.datasources.geoparquet.{GeoParquetFileFormat, GeoParquetMetaData, GeoParquetSpatialFilter} +import org.apache.spark.sql.functions.expr import org.locationtech.jts.geom.Coordinate import org.locationtech.jts.geom.Geometry import org.locationtech.jts.geom.GeometryFactory @@ -317,6 +318,73 @@ class GeoParquetSpatialFilterPushDownSuite extends TestBaseScala with TableDrive assert(getPushedDownSpatialFilter(dfFiltered).isEmpty) } } + + it("Push down ST_BoxIntersects against a Box2D covering column") { + val (box2dDf, box2dDir, box2dMetaMap) = setupBox2DCoveringFixture() + try { + // Q1 region only (region 1, center +10/+10) + val q1Filter = + "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(5.0, 5.0), ST_Point(15.0, 15.0)))" + verifyBox2DFilter(box2dDf, box2dMetaMap, q1Filter, Seq(1)) + + // Window covering Q2 and Q4 (negative X) — should preserve regions 0 and 2 + val leftHalfFilter = + "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(-20.0, -20.0), ST_Point(-1.0, 20.0)))" 
+ verifyBox2DFilter(box2dDf, box2dMetaMap, leftHalfFilter, Seq(0, 2)) + + // Disjoint window prunes everything + val disjointFilter = + "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(100.0, 100.0), ST_Point(200.0, 200.0)))" + verifyBox2DFilter(box2dDf, box2dMetaMap, disjointFilter, Seq.empty) + } finally { + FileUtils.deleteDirectory(new File(box2dDir).getParentFile) + } + } + + it("Push down ST_BoxContains against a Box2D covering column") { + val (box2dDf, box2dDir, box2dMetaMap) = setupBox2DCoveringFixture() + try { + // ST_BoxContains(box_col, lit_box) pushes down as INTERSECTS at the file level. A tiny + // query box inside Q1 prunes everything except region 1. + val containsFilter = + "ST_BoxContains(geom_bbox, ST_MakeBox2D(ST_Point(9.0, 9.0), ST_Point(10.0, 10.0)))" + verifyBox2DFilter(box2dDf, box2dMetaMap, containsFilter, Seq(1)) + } finally { + FileUtils.deleteDirectory(new File(box2dDir).getParentFile) + } + } + } + + private def setupBox2DCoveringFixture() + : (DataFrame, String, Map[Int, Seq[GeoParquetMetaData]]) = { + val box2dParent = + Files.createTempDirectory("sedona_geoparquet_box2d_").toFile.getAbsolutePath + val box2dDir = box2dParent + "/data" + val withBox = df.withColumn("geom_bbox", expr("ST_Box2D(geom)")) + withBox.coalesce(1).write.partitionBy("region").format("geoparquet").save(box2dDir) + val box2dDf = sparkSession.read.format("geoparquet").load(box2dDir) + val box2dMetaMap = readGeoParquetMetaDataMap(box2dDir) + (box2dDf, box2dDir, box2dMetaMap) + } + + private def verifyBox2DFilter( + box2dDf: DataFrame, + box2dMetaMap: Map[Int, Seq[GeoParquetMetaData]], + condition: String, + expectedPreservedRegions: Seq[Int]): Unit = { + val dfFiltered = box2dDf.where(condition) + val pushed = getPushedDownSpatialFilter(dfFiltered) + assert(pushed.isDefined, s"Expected filter push-down for: $condition") + val preserved = box2dMetaMap + .filter { case (_, metaDataList) => + metaDataList.exists(metadata => pushed.get.evaluate(metadata.columns)) + 
} + .keys + .toSeq + .sorted + assert( + expectedPreservedRegions.sorted == preserved, + s"Expected $expectedPreservedRegions, got $preserved for: $condition") } /** From 23829ab4ccd05eb28d690bed7668cf1de422344d Mon Sep 17 00:00:00 2001 From: Jia Yu Date: Sun, 10 May 2026 23:24:40 -0700 Subject: [PATCH 2/5] Address review on GH-2938 Box2D filter pushdown 1. Soundness caveat. The pushdown uses the file's geom-column bbox to prune, which is sound only when per-row Box2D = per-row geometry envelope. The GeoParquet spec permits conservatively wider covering bboxes (sedona-db does this via next_after Float32 rounding), in which case pruning can drop matching files. Documented the limitation, added an opt-out conf (spark.sedona.geoparquet.box2dFilterPushDown, default on), and flagged Parquet-column-statistics-based proper pruning as the follow-up. Sedona's own writer produces exact envelopes via ST_Box2D(geom), so the common path is sound. 2. Deterministic matching. columns.find returned an arbitrary match when multiple geometry columns referenced the same covering. Now collects all matches and requires exactly one; falls back to keep- file otherwise. 3. Removed unused Box2DUDT import in the pushdown rule. 4. Test correctness. verifyBox2DFilter now compares the pushed-down result against the same query with pushdown disabled, catching cases where pruning is over-aggressive. 
--- .../geoparquet/GeoParquetSpatialFilter.scala | 43 ++++++++++++------- .../SpatialFilterPushDownForGeoParquet.scala | 13 +++++- ...GeoParquetSpatialFilterPushDownSuite.scala | 14 ++++++ 3 files changed, 52 insertions(+), 18 deletions(-) diff --git a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSpatialFilter.scala b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSpatialFilter.scala index ce1d90c386b..b9ac4e0ce77 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSpatialFilter.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSpatialFilter.scala @@ -99,9 +99,21 @@ object GeoParquetSpatialFilter { * recorded bbox. * * Both intersects and contains map to a file-level INTERSECTS check: per-row containment - * implies per-row intersection, which implies the file's union envelope must intersect the - * query box for any row to match. If no geometry column references this Box2D column as its - * covering, the file is kept (cannot prune safely). + * implies per-row intersection, so the file's recorded geom bbox must intersect the query box + * for any row to match. + * + * '''Soundness caveat.''' The GeoParquet 1.1 spec allows covering bboxes to be conservatively + * wider than per-row geometry envelopes (e.g. sedona-db's Float32 writer rounds outward via + * `next_after`). When that happens, the union of per-row Box2D values is a strict superset of + * the file's geom bbox, and pruning using the geom bbox can produce false negatives. Pushdown + * is sound when the Box2D column is exactly the per-row geometry envelope — which is the case + * for Sedona's own writer (`ST_Box2D(geom)` produces exact envelopes). 
A proper Parquet column + * statistics-based pruning that operates on the Box2D column's own xmin/ymin/xmax/ymax bounds + * is tracked as a follow-up; until then this filter is opt-out via + * `spark.sedona.geoparquet.box2dFilterPushDown`. + * + * Ambiguity: if multiple geometry columns reference the same Box2D column as their covering + * (unusual), the file is kept rather than picking an arbitrary one. * * @param box2dColumnName * the Box2D column referenced by the predicate @@ -112,18 +124,16 @@ object GeoParquetSpatialFilter { extends GeoParquetSpatialFilter { override def evaluate(columns: Map[String, GeometryFieldMetaData]): Boolean = { - // Find the geometry column whose covering metadata points at this Box2D column. - val matchingGeomEntry = columns.find { case (_, field) => - field.covering.exists(_.bbox.xmin.headOption.contains(box2dColumnName)) - } + // Find all geometry columns whose covering metadata points at this Box2D column. Require + // exactly one match — multiple matches are ambiguous and we fall back to keep-file. + val matchingGeomFields = columns.collect { + case (_, field) + if field.covering.exists(_.bbox.xmin.headOption.contains(box2dColumnName)) => + field + }.toSeq - matchingGeomEntry match { - case Some((_, field)) => - // Use the geometry column's recorded bbox to prune. The union of per-row Box2D values - // is a superset of the geometry column's bbox (covering boxes are at least as wide as - // their geometries), so if the geom-column bbox does not intersect the query box, no - // row's Box2D can intersect either. May leave some files unpruned when Box2D values - // are conservatively wider than geometries, but never produces false negatives. 
+ matchingGeomFields match { + case Seq(field) => val bbox = field.bbox.getOrElse(return true) if (bbox.isEmpty) return true val fileXMin = bbox(0) @@ -132,8 +142,9 @@ object GeoParquetSpatialFilter { val fileYMax = bbox(3) !(fileXMax < queryBox.getXMin || fileXMin > queryBox.getXMax || fileYMax < queryBox.getYMin || fileYMin > queryBox.getYMax) - case None => - // No geometry column references this Box2D column as covering — cannot prune safely. + case _ => + // Zero matches: no covering registered for this column. Multiple matches: ambiguous. + // Either way, cannot prune safely. true } } diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala index fed1e72a6e4..d78f8f3b1f7 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala @@ -47,7 +47,7 @@ import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFi import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter.LeafFilter import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter.OrFilter import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.sedona_sql.UDT.{Box2DUDT, GeometryUDT} +import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT import org.apache.spark.sql.sedona_sql.expressions.{ST_AsEWKT, ST_BoxContains, ST_BoxIntersects, ST_Buffer, ST_Contains, ST_CoveredBy, ST_Covers, ST_Crosses, ST_DWithin, ST_Distance, ST_DistanceSphere, ST_DistanceSpheroid, ST_Equals, ST_Intersects, ST_OrderingEquals, ST_Overlaps, ST_Touches, ST_Within} import org.apache.spark.sql.sedona_sql.optimization.ExpressionUtils.splitConjunctivePredicates import 
org.apache.spark.sql.types.DoubleType @@ -56,6 +56,11 @@ import org.locationtech.jts.geom.Point class SpatialFilterPushDownForGeoParquet(sparkSession: SparkSession) extends Rule[LogicalPlan] { + private def enableBox2DFilterPushDown: Boolean = + sparkSession.conf + .get("spark.sedona.geoparquet.box2dFilterPushDown", "true") + .toBoolean + override def apply(plan: LogicalPlan): LogicalPlan = { val enableSpatialFilterPushDown = sparkSession.conf.get("spark.sedona.geoparquet.spatialFilterPushDown", "true").toBoolean @@ -151,7 +156,11 @@ class SpatialFilterPushDownForGeoParquet(sparkSession: SparkSession) extends Rul // metadata references this Box2D column. Both BoxIntersects and BoxContains use INTERSECTS // semantics at the file level: per-row containment implies per-row intersection, which is // only possible if the geometry column's file-level bbox intersects the query box. - case ST_BoxIntersects(_) | ST_BoxContains(_) => + // Soundness assumes the Box2D column is exactly the per-row geometry envelope (true for + // ST_Box2D(geom)-derived columns including the auto-generated _bbox path). When the + // covering column is conservatively wider than the geometry, pushdown can drop matching + // files; the box2dFilterPushDown conf lets users opt out in that case. 
+ case ST_BoxIntersects(_) | ST_BoxContains(_) if enableBox2DFilterPushDown => for { (name, value) <- resolveNameAndLiteral(predicate.children, pushableColumn) queryBox <- extractBox2DLiteral(value) diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala b/spark/common/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala index f8ee61f4784..8e837f91418 100644 --- a/spark/common/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala +++ b/spark/common/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala @@ -385,6 +385,20 @@ class GeoParquetSpatialFilterPushDownSuite extends TestBaseScala with TableDrive assert( expectedPreservedRegions.sorted == preserved, s"Expected $expectedPreservedRegions, got $preserved for: $condition") + + // Verify the pushed-down result matches the result computed without push-down (i.e. that + // pruning did not drop any rows we should have kept). + val expectedResult = withConf(Map("spark.sedona.geoparquet.box2dFilterPushDown" -> "false")) { + box2dDf + .where(condition) + .orderBy("region", "id") + .select("region", "id") + .collect() + .toSeq + } + val actualResult = + dfFiltered.orderBy("region", "id").select("region", "id").collect().toSeq + assert(expectedResult == actualResult, s"Result mismatch under push-down for: $condition") } /** From 3c2366609e175305f548f15671159822b8191e76 Mon Sep 17 00:00:00 2001 From: Jia Yu Date: Mon, 11 May 2026 21:50:18 -0700 Subject: [PATCH 3/5] Make Box2D filter pushdown opt-in (default false) Round-2 review pointed out that default-on can silently produce wrong results for spec-compliant GeoParquet files with conservatively-wider covering columns. Flipping the default keeps correctness as the non-surprising baseline; users with Sedona-written files (where covering = exact envelope) opt in via spark.sedona.geoparquet.box2dFilterPushDown=true. 
Default flips back to true once the proper Parquet-column-statistics-based pruning lands in #2949. Tests: - Existing ST_BoxIntersects / ST_BoxContains tests now wrap their pushdown assertions in withConf(box2dFilterPushDown=true). - Added reverse-arg-order coverage: ST_BoxIntersects(lit_box, col) and ST_BoxContains(lit_box, col) should produce identical pruning to the col-first orientation. - Added a 'disabled by default' test asserting no pushdown is emitted without the opt-in conf. --- .../SpatialFilterPushDownForGeoParquet.scala | 18 +++-- ...GeoParquetSpatialFilterPushDownSuite.scala | 68 +++++++++++++------ 2 files changed, 63 insertions(+), 23 deletions(-) diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala index d78f8f3b1f7..a1d339a195a 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala @@ -56,9 +56,19 @@ import org.locationtech.jts.geom.Point class SpatialFilterPushDownForGeoParquet(sparkSession: SparkSession) extends Rule[LogicalPlan] { + /** + * Box2D filter pushdown is opt-in (default `false`) because pruning via the geometry column's + * recorded bbox is only sound when per-row Box2D values equal per-row geometry envelopes. + * Sedona's own writer satisfies this (`ST_Box2D(geom)` produces exact envelopes), but the + * GeoParquet 1.1 spec permits conservatively wider coverings (e.g., sedona-db's Float32 writer + * rounds outward via `next_after`), which can produce false negatives. Users who know their + * files were written with exact coverings can enable this via + * `spark.sedona.geoparquet.box2dFilterPushDown=true`. 
The proper Parquet + * column-statistics-based pruning that removes this caveat is tracked in #2949. + */ private def enableBox2DFilterPushDown: Boolean = sparkSession.conf - .get("spark.sedona.geoparquet.box2dFilterPushDown", "true") + .get("spark.sedona.geoparquet.box2dFilterPushDown", "false") .toBoolean override def apply(plan: LogicalPlan): LogicalPlan = { @@ -156,10 +166,10 @@ class SpatialFilterPushDownForGeoParquet(sparkSession: SparkSession) extends Rul // metadata references this Box2D column. Both BoxIntersects and BoxContains use INTERSECTS // semantics at the file level: per-row containment implies per-row intersection, which is // only possible if the geometry column's file-level bbox intersects the query box. - // Soundness assumes the Box2D column is exactly the per-row geometry envelope (true for + // Soundness requires the Box2D column to be exactly the per-row geometry envelope (true for // ST_Box2D(geom)-derived columns including the auto-generated _bbox path). When the - // covering column is conservatively wider than the geometry, pushdown can drop matching - // files; the box2dFilterPushDown conf lets users opt out in that case. + // covering column is conservatively wider than the geometry (spec-permitted), pushdown can + // drop matching files — see #2949. The box2dFilterPushDown conf is opt-in by default. 
case ST_BoxIntersects(_) | ST_BoxContains(_) if enableBox2DFilterPushDown => for { (name, value) <- resolveNameAndLiteral(predicate.children, pushableColumn) diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala b/spark/common/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala index 8e837f91418..11881bd4f3d 100644 --- a/spark/common/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala +++ b/spark/common/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala @@ -322,20 +322,28 @@ class GeoParquetSpatialFilterPushDownSuite extends TestBaseScala with TableDrive it("Push down ST_BoxIntersects against a Box2D covering column") { val (box2dDf, box2dDir, box2dMetaMap) = setupBox2DCoveringFixture() try { - // Q1 region only (region 1, center +10/+10) - val q1Filter = - "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(5.0, 5.0), ST_Point(15.0, 15.0)))" - verifyBox2DFilter(box2dDf, box2dMetaMap, q1Filter, Seq(1)) - - // Window covering Q2 and Q4 (negative X) — should preserve regions 0 and 2 - val leftHalfFilter = - "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(-20.0, -20.0), ST_Point(-1.0, 20.0)))" - verifyBox2DFilter(box2dDf, box2dMetaMap, leftHalfFilter, Seq(0, 2)) - - // Disjoint window prunes everything - val disjointFilter = - "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(100.0, 100.0), ST_Point(200.0, 200.0)))" - verifyBox2DFilter(box2dDf, box2dMetaMap, disjointFilter, Seq.empty) + withConf(Map("spark.sedona.geoparquet.box2dFilterPushDown" -> "true")) { + // Q1 region only (region 1, center +10/+10) + val q1Filter = + "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(5.0, 5.0), ST_Point(15.0, 15.0)))" + verifyBox2DFilter(box2dDf, box2dMetaMap, q1Filter, Seq(1)) + + // Window covering Q2 and Q4 (negative X) — should preserve regions 0 and 2 + val leftHalfFilter = + "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(-20.0, 
-20.0), ST_Point(-1.0, 20.0)))" + verifyBox2DFilter(box2dDf, box2dMetaMap, leftHalfFilter, Seq(0, 2)) + + // Disjoint window prunes everything + val disjointFilter = + "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(100.0, 100.0), ST_Point(200.0, 200.0)))" + verifyBox2DFilter(box2dDf, box2dMetaMap, disjointFilter, Seq.empty) + + // Reverse argument order: ST_BoxIntersects(lit, col) is symmetric and should produce + // the same pruning as ST_BoxIntersects(col, lit). + val reversedFilter = + "ST_BoxIntersects(ST_MakeBox2D(ST_Point(5.0, 5.0), ST_Point(15.0, 15.0)), geom_bbox)" + verifyBox2DFilter(box2dDf, box2dMetaMap, reversedFilter, Seq(1)) + } } finally { FileUtils.deleteDirectory(new File(box2dDir).getParentFile) } @@ -344,11 +352,33 @@ class GeoParquetSpatialFilterPushDownSuite extends TestBaseScala with TableDrive it("Push down ST_BoxContains against a Box2D covering column") { val (box2dDf, box2dDir, box2dMetaMap) = setupBox2DCoveringFixture() try { - // ST_BoxContains(box_col, lit_box) pushes down as INTERSECTS at the file level. A tiny - // query box inside Q1 prunes everything except region 1. - val containsFilter = - "ST_BoxContains(geom_bbox, ST_MakeBox2D(ST_Point(9.0, 9.0), ST_Point(10.0, 10.0)))" - verifyBox2DFilter(box2dDf, box2dMetaMap, containsFilter, Seq(1)) + withConf(Map("spark.sedona.geoparquet.box2dFilterPushDown" -> "true")) { + // ST_BoxContains(box_col, lit_box) pushes down as INTERSECTS at the file level. A tiny + // query box inside Q1 prunes everything except region 1. + val containsFilter = + "ST_BoxContains(geom_bbox, ST_MakeBox2D(ST_Point(9.0, 9.0), ST_Point(10.0, 10.0)))" + verifyBox2DFilter(box2dDf, box2dMetaMap, containsFilter, Seq(1)) + + // Reverse argument order: ST_BoxContains(lit_box, col) — at the file level both + // orderings devolve to INTERSECTS pruning. 
+ val reversedFilter = + "ST_BoxContains(ST_MakeBox2D(ST_Point(9.0, 9.0), ST_Point(10.0, 10.0)), geom_bbox)" + verifyBox2DFilter(box2dDf, box2dMetaMap, reversedFilter, Seq(1)) + } + } finally { + FileUtils.deleteDirectory(new File(box2dDir).getParentFile) + } + } + + it("Box2D filter pushdown disabled by default") { + val (box2dDf, box2dDir, _) = setupBox2DCoveringFixture() + try { + val dfFiltered = box2dDf.where( + "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(5.0, 5.0), ST_Point(15.0, 15.0)))") + assert( + getPushedDownSpatialFilter(dfFiltered).isEmpty, + "Box2D filter pushdown should be off by default until #2949 lands proper Parquet " + + "column-statistics-based pruning") } finally { FileUtils.deleteDirectory(new File(box2dDir).getParentFile) } From d2d5644795429f139d7853a86f3e349dfe7c3a75 Mon Sep 17 00:00:00 2001 From: Jia Yu Date: Mon, 11 May 2026 22:25:29 -0700 Subject: [PATCH 4/5] Push down Box2D predicates via Parquet row-group statistics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the file-metadata-based Box2DLeafFilter (which pruned using the geometry column's bbox — unsound when GeoParquet 1.1 coverings are conservatively wider than per-row envelopes) with a translation to a Parquet FilterPredicate over the Box2D column's own (xmin, ymin, xmax, ymax) leaf statistics. - GeoParquetSpatialFilter gains a toParquetFilter hook. AndFilter/OrFilter combine children's predicates; LeafFilter returns None. - Box2DLeafFilter carries a Box2DPredicateKind (Intersects / ColumnContainsLiteral / LiteralContainsColumn) and emits the matching conjunction of four double-column inequalities. evaluate() returns true because pruning now happens at the Parquet layer. - GeoParquetFileFormat ANDs the spatial Parquet predicate into the existing pushed FilterPredicate so Parquet's stats-based skipping prunes row groups whose bounds disprove the predicate. 
- Pushdown is sound for any writer, so the opt-in spark.sedona.geoparquet.box2dFilterPushDown conf is removed. --- .../geoparquet/GeoParquetFileFormat.scala | 19 ++- .../geoparquet/GeoParquetSpatialFilter.scala | 148 ++++++++++++------ .../SpatialFilterPushDownForGeoParquet.scala | 40 ++--- ...GeoParquetSpatialFilterPushDownSuite.scala | 139 +++++++--------- 4 files changed, 185 insertions(+), 161 deletions(-) diff --git a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetFileFormat.scala b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetFileFormat.scala index 8a5c2c19a0f..72ac543e52d 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetFileFormat.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetFileFormat.scala @@ -273,6 +273,15 @@ class GeoParquetFileFormat(val spatialFilter: Option[GeoParquetSpatialFilter]) None } + // Spatial filters that translate to Parquet row-group predicates (e.g. Box2D bounds + // comparisons on a Box2D-typed column) are AND'd into the pushed-down filter so Parquet + // can skip row groups whose column statistics disprove them. + val combinedPushed = spatialFilter.flatMap(_.toParquetFilter) match { + case Some(spatialPredicate) => + Some(pushed.fold(spatialPredicate)(p => FilterApi.and(p, spatialPredicate))) + case None => pushed + } + // Prune file scans using pushed down spatial filters and per-column bboxes in geoparquet metadata val shouldScanFile = GeoParquetMetaData.parseKeyValueMetaData(footerFileMetaData.getKeyValueMetaData).forall { @@ -304,8 +313,10 @@ class GeoParquetFileFormat(val spatialFilter: Option[GeoParquetSpatialFilter]) // Try to push down filters when filter push-down is enabled. // Notice: This push-down is RowGroups level, not individual records. 
- if (pushed.isDefined) { - ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) + if (combinedPushed.isDefined) { + ParquetInputFormat.setFilterPredicate( + hadoopAttemptContext.getConfiguration, + combinedPushed.get) } if (enableVectorizedReader) { logWarning( @@ -319,8 +330,8 @@ class GeoParquetFileFormat(val spatialFilter: Option[GeoParquetSpatialFilter]) datetimeRebaseSpec, int96RebaseSpec, options) - val reader = if (pushed.isDefined && enableRecordFilter) { - val parquetFilter = FilterCompat.get(pushed.get, null) + val reader = if (combinedPushed.isDefined && enableRecordFilter) { + val parquetFilter = FilterCompat.get(combinedPushed.get, null) new ParquetRecordReader[InternalRow](readSupport, parquetFilter) } else { new ParquetRecordReader[InternalRow](readSupport) diff --git a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSpatialFilter.scala b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSpatialFilter.scala index b9ac4e0ce77..65e4437b8a8 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSpatialFilter.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSpatialFilter.scala @@ -18,6 +18,7 @@ */ package org.apache.spark.sql.execution.datasources.geoparquet +import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate} import org.apache.sedona.common.geometryObjects.Box2D import org.apache.sedona.core.spatialOperator.SpatialPredicate import org.locationtech.jts.geom.Envelope @@ -29,7 +30,21 @@ import org.locationtech.jts.geom.Geometry * [[org.apache.spark.sql.sedona_sql.optimization.SpatialFilterPushDownForGeoParquet]]. */ trait GeoParquetSpatialFilter { + + /** + * File-level evaluation against GeoParquet column metadata. Used for cheap whole-file pruning + * before reading row-group statistics. 
Filters that cannot soundly prune at the file metadata + * level should return `true` here and emit their pruning predicate via [[toParquetFilter]]. + */ def evaluate(columns: Map[String, GeometryFieldMetaData]): Boolean + + /** + * Translate this spatial filter into a Parquet [[FilterPredicate]] that the Parquet reader can + * evaluate against row-group statistics. Returns `None` if the filter cannot be expressed as a + * Parquet predicate (e.g. arbitrary JTS predicates on a geometry column). + */ + def toParquetFilter: Option[FilterPredicate] = None + def simpleString: String } @@ -41,6 +56,14 @@ object GeoParquetSpatialFilter { left.evaluate(columns) && right.evaluate(columns) } + override def toParquetFilter: Option[FilterPredicate] = + (left.toParquetFilter, right.toParquetFilter) match { + case (Some(l), Some(r)) => Some(FilterApi.and(l, r)) + case (Some(l), None) => Some(l) + case (None, Some(r)) => Some(r) + case _ => None + } + override def simpleString: String = s"(${left.simpleString}) AND (${right.simpleString})" } @@ -48,6 +71,14 @@ object GeoParquetSpatialFilter { extends GeoParquetSpatialFilter { override def evaluate(columns: Map[String, GeometryFieldMetaData]): Boolean = left.evaluate(columns) || right.evaluate(columns) + + // OR pushdown to Parquet requires both sides translate; otherwise we'd drop matching rows. + override def toParquetFilter: Option[FilterPredicate] = + for { + l <- left.toParquetFilter + r <- right.toParquetFilter + } yield FilterApi.or(l, r) + override def simpleString: String = s"(${left.simpleString}) OR (${right.simpleString})" } @@ -90,67 +121,86 @@ object GeoParquetSpatialFilter { override def simpleString: String = s"$columnName ${predicateType.name} $queryWindow" } + /** + * Semantic kind of a Box2D leaf predicate. Determines which inequality system is emitted as a + * Parquet filter against the four (xmin, ymin, xmax, ymax) leaf columns of a Box2D-typed + * column. 
+ */ + sealed trait Box2DPredicateKind { + def simpleName: String + } + object Box2DPredicateKind { + + /** `ST_BoxIntersects(box_col, lit)` — symmetric, same regardless of argument order. */ + case object Intersects extends Box2DPredicateKind { + override def simpleName: String = "INTERSECTS" + } + + /** `ST_BoxContains(box_col, lit)` — the column box must contain the literal box. */ + case object ColumnContainsLiteral extends Box2DPredicateKind { + override def simpleName: String = "CONTAINS" + } + + /** `ST_BoxContains(lit, box_col)` — the literal box must contain the column box. */ + case object LiteralContainsColumn extends Box2DPredicateKind { + override def simpleName: String = "CONTAINED_BY" + } + } + /** * Pushdown filter for predicates that operate on a Box2D-typed column (e.g. * `ST_BoxIntersects(box_col, lit_box)` or `ST_BoxContains(box_col, lit_box)`). * - * Per-file evaluation: walks the file's GeoParquet column metadata to find the geometry column - * whose covering metadata points at `box2dColumnName`, then prunes using that geometry column's - * recorded bbox. - * - * Both intersects and contains map to a file-level INTERSECTS check: per-row containment - * implies per-row intersection, so the file's recorded geom bbox must intersect the query box - * for any row to match. - * - * '''Soundness caveat.''' The GeoParquet 1.1 spec allows covering bboxes to be conservatively - * wider than per-row geometry envelopes (e.g. sedona-db's Float32 writer rounds outward via - * `next_after`). When that happens, the union of per-row Box2D values is a strict superset of - * the file's geom bbox, and pruning using the geom bbox can produce false negatives. Pushdown - * is sound when the Box2D column is exactly the per-row geometry envelope — which is the case - * for Sedona's own writer (`ST_Box2D(geom)` produces exact envelopes). 
A proper Parquet column - * statistics-based pruning that operates on the Box2D column's own xmin/ymin/xmax/ymax bounds - * is tracked as a follow-up; until then this filter is opt-out via - * `spark.sedona.geoparquet.box2dFilterPushDown`. - * - * Ambiguity: if multiple geometry columns reference the same Box2D column as their covering - * (unusual), the file is kept rather than picking an arbitrary one. + * Pruning is performed by translating the predicate into per-leaf inequalities on the Box2D + * column's four `Double` fields (`xmin`, `ymin`, `xmax`, `ymax`) and pushing the result down as + * a Parquet [[FilterPredicate]]. Parquet's row-group statistics machinery then skips row groups + * whose per-column min/max bounds disprove the predicate. * - * @param box2dColumnName - * the Box2D column referenced by the predicate - * @param queryBox - * the literal Box2D from the predicate's RHS + * File-metadata evaluation returns `true` (i.e. don't prune at the GeoParquet metadata layer) + * because that path relied on the geometry column's bbox and is unsound when the GeoParquet 1.1 + * spec permits coverings to be conservatively wider than per-row envelopes. The Parquet-stats + * path uses the Box2D column's actual recorded min/max, so it is sound for any writer. */ - case class Box2DLeafFilter(box2dColumnName: String, queryBox: Box2D) + case class Box2DLeafFilter( + box2dColumnName: String, + predicateKind: Box2DPredicateKind, + queryBox: Box2D) extends GeoParquetSpatialFilter { - override def evaluate(columns: Map[String, GeometryFieldMetaData]): Boolean = { - // Find all geometry columns whose covering metadata points at this Box2D column. Require - // exactly one match — multiple matches are ambiguous and we fall back to keep-file. 
- val matchingGeomFields = columns.collect { - case (_, field) - if field.covering.exists(_.bbox.xmin.headOption.contains(box2dColumnName)) => - field - }.toSeq - - matchingGeomFields match { - case Seq(field) => - val bbox = field.bbox.getOrElse(return true) - if (bbox.isEmpty) return true - val fileXMin = bbox(0) - val fileYMin = bbox(1) - val fileXMax = bbox(2) - val fileYMax = bbox(3) - !(fileXMax < queryBox.getXMin || fileXMin > queryBox.getXMax - || fileYMax < queryBox.getYMin || fileYMin > queryBox.getYMax) - case _ => - // Zero matches: no covering registered for this column. Multiple matches: ambiguous. - // Either way, cannot prune safely. - true + override def evaluate(columns: Map[String, GeometryFieldMetaData]): Boolean = true + + override def toParquetFilter: Option[FilterPredicate] = { + val xmin = FilterApi.doubleColumn(s"$box2dColumnName.xmin") + val ymin = FilterApi.doubleColumn(s"$box2dColumnName.ymin") + val xmax = FilterApi.doubleColumn(s"$box2dColumnName.xmax") + val ymax = FilterApi.doubleColumn(s"$box2dColumnName.ymax") + val qxMin = java.lang.Double.valueOf(queryBox.getXMin) + val qyMin = java.lang.Double.valueOf(queryBox.getYMin) + val qxMax = java.lang.Double.valueOf(queryBox.getXMax) + val qyMax = java.lang.Double.valueOf(queryBox.getYMax) + + val predicate = predicateKind match { + case Box2DPredicateKind.Intersects => + // Intersection: row's xmax >= lit.xmin && xmin <= lit.xmax && ymax >= lit.ymin && ymin <= lit.ymax + FilterApi.and( + FilterApi.and(FilterApi.gtEq(xmax, qxMin), FilterApi.ltEq(xmin, qxMax)), + FilterApi.and(FilterApi.gtEq(ymax, qyMin), FilterApi.ltEq(ymin, qyMax))) + case Box2DPredicateKind.ColumnContainsLiteral => + // Column contains literal: row's xmin <= lit.xmin && xmax >= lit.xmax && ymin <= lit.ymin && ymax >= lit.ymax + FilterApi.and( + FilterApi.and(FilterApi.ltEq(xmin, qxMin), FilterApi.gtEq(xmax, qxMax)), + FilterApi.and(FilterApi.ltEq(ymin, qyMin), FilterApi.gtEq(ymax, qyMax))) + case 
Box2DPredicateKind.LiteralContainsColumn => + // Literal contains column: row's xmin >= lit.xmin && xmax <= lit.xmax && ymin >= lit.ymin && ymax <= lit.ymax + FilterApi.and( + FilterApi.and(FilterApi.gtEq(xmin, qxMin), FilterApi.ltEq(xmax, qxMax)), + FilterApi.and(FilterApi.gtEq(ymin, qyMin), FilterApi.ltEq(ymax, qyMax))) } + Some(predicate) } override def simpleString: String = - s"$box2dColumnName INTERSECTS BOX(${queryBox.getXMin} ${queryBox.getYMin}, " + + s"$box2dColumnName ${predicateKind.simpleName} BOX(${queryBox.getXMin} ${queryBox.getYMin}, " + s"${queryBox.getXMax} ${queryBox.getYMax})" } } diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala index a1d339a195a..8a7b64741cf 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala @@ -44,6 +44,7 @@ import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetFileForma import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter.AndFilter import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter.Box2DLeafFilter +import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter.Box2DPredicateKind import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter.LeafFilter import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter.OrFilter import org.apache.spark.sql.catalyst.InternalRow @@ -56,21 +57,6 @@ import org.locationtech.jts.geom.Point class SpatialFilterPushDownForGeoParquet(sparkSession: SparkSession) extends 
Rule[LogicalPlan] { - /** - * Box2D filter pushdown is opt-in (default `false`) because pruning via the geometry column's - * recorded bbox is only sound when per-row Box2D values equal per-row geometry envelopes. - * Sedona's own writer satisfies this (`ST_Box2D(geom)` produces exact envelopes), but the - * GeoParquet 1.1 spec permits conservatively wider coverings (e.g., sedona-db's Float32 writer - * rounds outward via `next_after`), which can produce false negatives. Users who know their - * files were written with exact coverings can enable this via - * `spark.sedona.geoparquet.box2dFilterPushDown=true`. The proper Parquet - * column-statistics-based pruning that removes this caveat is tracked in #2949. - */ - private def enableBox2DFilterPushDown: Boolean = - sparkSession.conf - .get("spark.sedona.geoparquet.box2dFilterPushDown", "false") - .toBoolean - override def apply(plan: LogicalPlan): LogicalPlan = { val enableSpatialFilterPushDown = sparkSession.conf.get("spark.sedona.geoparquet.spatialFilterPushDown", "true").toBoolean @@ -162,19 +148,23 @@ class SpatialFilterPushDownForGeoParquet(sparkSession: SparkSession) extends Rul SpatialPredicate.INTERSECTS, GeometryUDT.deserialize(value)) - // Box2D predicates push down to an INTERSECTS check on the geometry column whose covering - // metadata references this Box2D column. Both BoxIntersects and BoxContains use INTERSECTS - // semantics at the file level: per-row containment implies per-row intersection, which is - // only possible if the geometry column's file-level bbox intersects the query box. - // Soundness requires the Box2D column to be exactly the per-row geometry envelope (true for - // ST_Box2D(geom)-derived columns including the auto-generated _bbox path). When the - // covering column is conservatively wider than the geometry (spec-permitted), pushdown can - // drop matching files — see #2949. The box2dFilterPushDown conf is opt-in by default. 
- case ST_BoxIntersects(_) | ST_BoxContains(_) if enableBox2DFilterPushDown => + // Box2D predicates push down as Parquet row-group filters on the Box2D column's underlying + // (xmin, ymin, xmax, ymax) double leaves. Pruning is done by Parquet's stats-based skipping + // against the column's recorded min/max, which is sound regardless of how the writer chose + // the per-row Box2D values. + case ST_BoxIntersects(_) => + // Intersects is symmetric — both argument orders produce the same predicate. for { (name, value) <- resolveNameAndLiteral(predicate.children, pushableColumn) queryBox <- extractBox2DLiteral(value) - } yield Box2DLeafFilter(unquote(name), queryBox) + } yield Box2DLeafFilter(unquote(name), Box2DPredicateKind.Intersects, queryBox) + + case ST_BoxContains(Seq(pushableColumn(name), Literal(v, _))) => + extractBox2DLiteral(v).map(qb => + Box2DLeafFilter(unquote(name), Box2DPredicateKind.ColumnContainsLiteral, qb)) + case ST_BoxContains(Seq(Literal(v, _), pushableColumn(name))) => + extractBox2DLiteral(v).map(qb => + Box2DLeafFilter(unquote(name), Box2DPredicateKind.LiteralContainsColumn, qb)) case LessThan(ST_Distance(distArgs), Literal(d, DoubleType)) => for ((name, value) <- resolveNameAndLiteral(distArgs, pushableColumn)) diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala b/spark/common/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala index 11881bd4f3d..75ce42a1735 100644 --- a/spark/common/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala +++ b/spark/common/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala @@ -319,113 +319,86 @@ class GeoParquetSpatialFilterPushDownSuite extends TestBaseScala with TableDrive } } - it("Push down ST_BoxIntersects against a Box2D covering column") { - val (box2dDf, box2dDir, box2dMetaMap) = setupBox2DCoveringFixture() + it("Push down ST_BoxIntersects against a Box2D column") 
{
+    val (box2dDf, box2dDir) = setupBox2DCoveringFixture()
     try {
-      withConf(Map("spark.sedona.geoparquet.box2dFilterPushDown" -> "true")) {
-        // Q1 region only (region 1, center +10/+10)
-        val q1Filter =
-          "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(5.0, 5.0), ST_Point(15.0, 15.0)))"
-        verifyBox2DFilter(box2dDf, box2dMetaMap, q1Filter, Seq(1))
-
-        // Window covering Q2 and Q4 (negative X) — should preserve regions 0 and 2
-        val leftHalfFilter =
-          "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(-20.0, -20.0), ST_Point(-1.0, 20.0)))"
-        verifyBox2DFilter(box2dDf, box2dMetaMap, leftHalfFilter, Seq(0, 2))
-
-        // Disjoint window prunes everything
-        val disjointFilter =
-          "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(100.0, 100.0), ST_Point(200.0, 200.0)))"
-        verifyBox2DFilter(box2dDf, box2dMetaMap, disjointFilter, Seq.empty)
-
-        // Reverse argument order: ST_BoxIntersects(lit, col) is symmetric and should produce
-        // the same pruning as ST_BoxIntersects(col, lit).
-        val reversedFilter =
-          "ST_BoxIntersects(ST_MakeBox2D(ST_Point(5.0, 5.0), ST_Point(15.0, 15.0)), geom_bbox)"
-        verifyBox2DFilter(box2dDf, box2dMetaMap, reversedFilter, Seq(1))
-      }
+      // Q1 region only (region 1, center +10/+10)
+      val q1Filter =
+        "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(5.0, 5.0), ST_Point(15.0, 15.0)))"
+      verifyBox2DFilter(box2dDf, q1Filter)
+
+      // Window covering Q2 and Q3 (negative X)
+      val leftHalfFilter =
+        "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(-20.0, -20.0), ST_Point(-1.0, 20.0)))"
+      verifyBox2DFilter(box2dDf, leftHalfFilter)
+
+      // Disjoint window prunes everything
+      val disjointFilter =
+        "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(100.0, 100.0), ST_Point(200.0, 200.0)))"
+      verifyBox2DFilter(box2dDf, disjointFilter)
+
+      // Reverse argument order: ST_BoxIntersects(lit, col) is symmetric.
+ val reversedFilter = + "ST_BoxIntersects(ST_MakeBox2D(ST_Point(5.0, 5.0), ST_Point(15.0, 15.0)), geom_bbox)" + verifyBox2DFilter(box2dDf, reversedFilter) } finally { FileUtils.deleteDirectory(new File(box2dDir).getParentFile) } } - it("Push down ST_BoxContains against a Box2D covering column") { - val (box2dDf, box2dDir, box2dMetaMap) = setupBox2DCoveringFixture() + it("Push down ST_BoxContains against a Box2D column") { + val (box2dDf, box2dDir) = setupBox2DCoveringFixture() try { - withConf(Map("spark.sedona.geoparquet.box2dFilterPushDown" -> "true")) { - // ST_BoxContains(box_col, lit_box) pushes down as INTERSECTS at the file level. A tiny - // query box inside Q1 prunes everything except region 1. - val containsFilter = - "ST_BoxContains(geom_bbox, ST_MakeBox2D(ST_Point(9.0, 9.0), ST_Point(10.0, 10.0)))" - verifyBox2DFilter(box2dDf, box2dMetaMap, containsFilter, Seq(1)) - - // Reverse argument order: ST_BoxContains(lit_box, col) — at the file level both - // orderings devolve to INTERSECTS pruning. - val reversedFilter = - "ST_BoxContains(ST_MakeBox2D(ST_Point(9.0, 9.0), ST_Point(10.0, 10.0)), geom_bbox)" - verifyBox2DFilter(box2dDf, box2dMetaMap, reversedFilter, Seq(1)) - } - } finally { - FileUtils.deleteDirectory(new File(box2dDir).getParentFile) - } - } - - it("Box2D filter pushdown disabled by default") { - val (box2dDf, box2dDir, _) = setupBox2DCoveringFixture() - try { - val dfFiltered = box2dDf.where( - "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(5.0, 5.0), ST_Point(15.0, 15.0)))") - assert( - getPushedDownSpatialFilter(dfFiltered).isEmpty, - "Box2D filter pushdown should be off by default until #2949 lands proper Parquet " + - "column-statistics-based pruning") + // ST_BoxContains(box_col, lit_box) — the column box must contain the literal box. A tiny + // query inside Q1 is contained only by rows from region 1. 
+      val containsFilter =
+        "ST_BoxContains(geom_bbox, ST_MakeBox2D(ST_Point(9.0, 9.0), ST_Point(10.0, 10.0)))"
+      verifyBox2DFilter(box2dDf, containsFilter)
+
+      // Reverse argument order: ST_BoxContains(lit_box, col) — the literal box must contain the
+      // column box. With the 12x12 window (4,4)-(16,16) in Q1, only rows whose per-row
+      // envelopes lie entirely inside the window satisfy the predicate; envelopes that only
+      // partially overlap the window are rejected.
+      val reversedFilter =
+        "ST_BoxContains(ST_MakeBox2D(ST_Point(4.0, 4.0), ST_Point(16.0, 16.0)), geom_bbox)"
+      verifyBox2DFilter(box2dDf, reversedFilter)
     } finally {
       FileUtils.deleteDirectory(new File(box2dDir).getParentFile)
     }
   }
 }

-  private def setupBox2DCoveringFixture()
-      : (DataFrame, String, Map[Int, Seq[GeoParquetMetaData]]) = {
+  private def setupBox2DCoveringFixture(): (DataFrame, String) = {
     val box2dParent = Files.createTempDirectory("sedona_geoparquet_box2d_").toFile.getAbsolutePath
     val box2dDir = box2dParent + "/data"
     val withBox = df.withColumn("geom_bbox", expr("ST_Box2D(geom)"))
     withBox.coalesce(1).write.partitionBy("region").format("geoparquet").save(box2dDir)
     val box2dDf = sparkSession.read.format("geoparquet").load(box2dDir)
-    val box2dMetaMap = readGeoParquetMetaDataMap(box2dDir)
-    (box2dDf, box2dDir, box2dMetaMap)
+    (box2dDf, box2dDir)
   }

-  private def verifyBox2DFilter(
-      box2dDf: DataFrame,
-      box2dMetaMap: Map[Int, Seq[GeoParquetMetaData]],
-      condition: String,
-      expectedPreservedRegions: Seq[Int]): Unit = {
+  private def verifyBox2DFilter(box2dDf: DataFrame, condition: String): Unit = {
     val dfFiltered = box2dDf.where(condition)
+
+    // Pushdown is attached and translates to a Parquet row-group filter.
val pushed = getPushedDownSpatialFilter(dfFiltered) - assert(pushed.isDefined, s"Expected filter push-down for: $condition") - val preserved = box2dMetaMap - .filter { case (_, metaDataList) => - metaDataList.exists(metadata => pushed.get.evaluate(metadata.columns)) - } - .keys - .toSeq - .sorted + assert(pushed.isDefined, s"Expected spatial filter push-down for: $condition") assert( - expectedPreservedRegions.sorted == preserved, - s"Expected $expectedPreservedRegions, got $preserved for: $condition") - - // Verify the pushed-down result matches the result computed without push-down (i.e. that - // pruning did not drop any rows we should have kept). - val expectedResult = withConf(Map("spark.sedona.geoparquet.box2dFilterPushDown" -> "false")) { - box2dDf - .where(condition) - .orderBy("region", "id") - .select("region", "id") - .collect() - .toSeq - } + pushed.get.toParquetFilter.isDefined, + s"Expected a Parquet FilterPredicate for: $condition") + + // Correctness: pushdown must not drop any matching rows. Compare against a run with the + // spatial filter rule disabled (so no Parquet predicate is injected from Sedona). + val expectedResult = + withConf(Map("spark.sedona.geoparquet.spatialFilterPushDown" -> "false")) { + box2dDf + .where(condition) + .orderBy("region", "id") + .select("region", "id") + .collect() + .toSeq + } val actualResult = dfFiltered.orderBy("region", "id").select("region", "id").collect().toSeq assert(expectedResult == actualResult, s"Result mismatch under push-down for: $condition") From 0d69d507681aba83b7b515f4286ff305ecd14be5 Mon Sep 17 00:00:00 2001 From: Jia Yu Date: Mon, 11 May 2026 22:46:51 -0700 Subject: [PATCH 5/5] Address Copilot review on Box2D row-group pushdown - Gate spatial Parquet predicate injection on `enableParquetFilterPushDown` so disabling `spark.sql.parquet.filterPushdown` also disables Sedona- injected row-group predicates (otherwise the spatial path bypassed the user's intent to turn parquet pushdown off). 
- Reject inverted-bound Box2D literals in extractBox2DLiteral so the predicate falls back to runtime evaluation and surfaces the expected IllegalArgumentException; otherwise Parquet's row-group filter could prune all matches and hide the throw. - Add a test locking in the inverted-bound fallback: no pushdown attached, and collect() surfaces an IllegalArgumentException in the cause chain. --- .../geoparquet/GeoParquetFileFormat.scala | 16 +++++++++---- .../SpatialFilterPushDownForGeoParquet.scala | 12 ++++++++-- ...GeoParquetSpatialFilterPushDownSuite.scala | 24 +++++++++++++++++++ 3 files changed, 45 insertions(+), 7 deletions(-) diff --git a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetFileFormat.scala b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetFileFormat.scala index 72ac543e52d..db0f1dd8649 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetFileFormat.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetFileFormat.scala @@ -275,11 +275,17 @@ class GeoParquetFileFormat(val spatialFilter: Option[GeoParquetSpatialFilter]) // Spatial filters that translate to Parquet row-group predicates (e.g. Box2D bounds // comparisons on a Box2D-typed column) are AND'd into the pushed-down filter so Parquet - // can skip row groups whose column statistics disprove them. - val combinedPushed = spatialFilter.flatMap(_.toParquetFilter) match { - case Some(spatialPredicate) => - Some(pushed.fold(spatialPredicate)(p => FilterApi.and(p, spatialPredicate))) - case None => pushed + // can skip row groups whose column statistics disprove them. Gated on the same Spark + // SQL flag as ordinary Parquet pushdown so disabling `spark.sql.parquet.filterPushdown` + // also disables Sedona-injected row-group predicates. 
+ val combinedPushed = if (enableParquetFilterPushDown) { + spatialFilter.flatMap(_.toParquetFilter) match { + case Some(spatialPredicate) => + Some(pushed.fold(spatialPredicate)(p => FilterApi.and(p, spatialPredicate))) + case None => pushed + } + } else { + pushed } // Prune file scans using pushed down spatial filters and per-column bboxes in geoparquet metadata diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala index 8a7b64741cf..31ccea3de6d 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala @@ -280,11 +280,19 @@ class SpatialFilterPushDownForGeoParquet(sparkSession: SparkSession) extends Rul /** * Extract a [[Box2D]] from a Catalyst literal value. Box2DUDT serializes to an InternalRow of - * four doubles; if the value is something else, the predicate is not pushable. + * four doubles; if the value is something else, the predicate is not pushable. Inverted bounds + * (xmin>xmax or ymin>ymax) are rejected here so the predicate falls back to runtime evaluation + * and surfaces the expected IllegalArgumentException — pushing them through Parquet would + * silently prune all matching rows before the throw fires. 
*/ private def extractBox2DLiteral(value: Any): Option[Box2D] = value match { case row: InternalRow if row.numFields == 4 => - Some(new Box2D(row.getDouble(0), row.getDouble(1), row.getDouble(2), row.getDouble(3))) + val xmin = row.getDouble(0) + val ymin = row.getDouble(1) + val xmax = row.getDouble(2) + val ymax = row.getDouble(3) + if (xmin > xmax || ymin > ymax) None + else Some(new Box2D(xmin, ymin, xmax, ymax)) case _ => None } diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala b/spark/common/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala index 75ce42a1735..9eec5b7070f 100644 --- a/spark/common/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala +++ b/spark/common/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala @@ -346,6 +346,30 @@ class GeoParquetSpatialFilterPushDownSuite extends TestBaseScala with TableDrive } } + it("ST_BoxIntersects with inverted-bound literal falls back to runtime throw") { + val (box2dDf, box2dDir) = setupBox2DCoveringFixture() + try { + // xmin > xmax: must not push down — otherwise Parquet's row-group filter could prune all + // matches and hide the expected IllegalArgumentException from the predicate's runtime + // evaluation. 
+ val invertedFilter = + "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(20.0, -20.0), ST_Point(-20.0, 20.0)))" + val dfFiltered = box2dDf.where(invertedFilter) + assert( + getPushedDownSpatialFilter(dfFiltered).isEmpty, + "Inverted-bound Box2D literal must not be pushed down") + val ex = intercept[Exception](dfFiltered.collect()) + assert( + Iterator + .iterate(ex: Throwable)(_.getCause) + .takeWhile(_ != null) + .exists(_.isInstanceOf[IllegalArgumentException]), + s"Expected IllegalArgumentException in cause chain, got: $ex") + } finally { + FileUtils.deleteDirectory(new File(box2dDir).getParentFile) + } + } + it("Push down ST_BoxContains against a Box2D column") { val (box2dDf, box2dDir) = setupBox2DCoveringFixture() try {