@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.geoparquet

 import scala.util.control.NonFatal
 
+import org.apache.spark.sql.sedona_sql.UDT.Box2DUDT
 import org.apache.spark.sql.types.{DoubleType, FloatType, StructType}
 import org.datasyslab.proj4sedona.core.Proj
 import org.datasyslab.proj4sedona.parser.CRSSerializer
@@ -236,6 +237,18 @@ object GeoParquetMetaData {
         schema(coveringColumnIndex).dataType match {
           case coveringColumnType: StructType =>
             coveringColumnTypeToCovering(coveringColumnName, coveringColumnType)
+          case udt: Box2DUDT =>
+            // Box2DUDT exposes a struct<xmin, ymin, xmax, ymax: double> sqlType, which is the exact
+            // shape required by GeoParquet 1.1 bbox covering columns. Treat the underlying struct as
+            // the covering struct so users can write a Box2D column and have it referenced as a
+            // covering column in GeoParquet metadata without any manual struct construction.
+            udt.sqlType match {
+              case structType: StructType =>
+                coveringColumnTypeToCovering(coveringColumnName, structType)
+              case other =>
+                throw new IllegalStateException(
+                  s"Box2DUDT.sqlType is expected to be a StructType, got $other")
+            }
           case _ =>
Review comment on lines 237 to 252 (Member Author):
Done in 776c5b9: bound the case to udt: Box2DUDT and matched udt.sqlType on StructType (no allocation, no asInstanceOf, and a clear failure mode if the sqlType shape ever changes).

On the second suggestion (generalize to any UserDefinedType whose sqlType is a StructType): I would push back on that. Other UDTs may have struct-shaped sqlTypes that are not valid bbox covering columns. For example, a future ImageUDT or RasterMetadataUDT could expose a struct of metadata fields, and the call to validateField("xmin") inside coveringColumnTypeToCovering would throw a confusing error instead of the clear "not a struct type" one. Keeping the match specific to Box2DUDT makes the contract explicit; we can add other UDTs case by case as they appear.
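
For concreteness, this is roughly the shape the suggestion would take if we did generalize (a sketch only; RasterMetadataUDT is hypothetical, and Covering / coveringColumnTypeToCovering are the existing helpers in this file):

import org.apache.spark.sql.types.{DataType, StructType, UserDefinedType}

// Sketch of the rejected generalization: accept any UDT whose sqlType
// happens to be a StructType.
def coveringFromAnyStructUdt(coveringColumnName: String, dataType: DataType): Covering =
  dataType match {
    case udt: UserDefinedType[_] =>
      udt.sqlType match {
        case structType: StructType =>
          // A hypothetical RasterMetadataUDT with sqlType
          // struct<width: int, height: int, bands: int> would reach this branch,
          // and coveringColumnTypeToCovering would then fail on its internal
          // validateField("xmin") lookup with a confusing field-not-found error
          // rather than the clear message below.
          coveringColumnTypeToCovering(coveringColumnName, structType)
        case other =>
          throw new IllegalStateException(s"UDT sqlType is not a StructType: $other")
      }
    case _ =>
      throw new IllegalArgumentException(
        s"Covering column $coveringColumnName is not a struct type")
  }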

             throw new IllegalArgumentException(
               s"Covering column $coveringColumnName is not a struct type")
@@ -1021,6 +1021,53 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
     }
   }
 
it("GeoParquet supports writing covering metadata from a Box2D column") {
// User-provided Box2D column referenced via the geoparquet.covering option.
val df = sparkSession
.range(0, 100)
.toDF("id")
.withColumn("id", expr("CAST(id AS DOUBLE)"))
.withColumn("geometry", expr("ST_Point(id, id + 1)"))
.withColumn("test_cov", expr("ST_Box2D(geometry)"))
val geoParquetSavePath = geoparquetoutputlocation + "/gp_with_box2d_covering.parquet"
df.write
.format("geoparquet")
.option("geoparquet.covering.geometry", "test_cov")
.mode("overwrite")
.save(geoParquetSavePath)
Review comment on lines +1032 to +1037 (Member Author):
The hard-coded path under geoparquetoutputlocation is the existing convention for the GeoParquet covering tests in this file (line 992 onward, gp_with_covering_metadata.parquet etc.). Using a different pattern just for the new tests would be inconsistent. If we want to move the suite to per-test unique dirs, that should be a focused refactor across all of these tests rather than a one-off here.
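
If that refactor ever happens, the per-test pattern could look something like this (a sketch only; the helper name is made up, and the CollectionConverters import assumes Scala 2.13, with scala.collection.JavaConverters as the 2.12 equivalent):

import java.nio.file.{Files, Path}
import java.util.Comparator
import scala.jdk.CollectionConverters._

// Hypothetical helper: give each test a fresh directory and clean it up
// afterwards, deleting deepest entries first.
def withTempGeoParquetDir[T](body: String => T): T = {
  val dir: Path = Files.createTempDirectory("geoparquet-covering-test-")
  try body(dir.toString)
  finally {
    Files.walk(dir).sorted(Comparator.reverseOrder[Path]()).iterator().asScala
      .foreach(p => Files.deleteIfExists(p))
  }
}

Each test would then pass its writer output path through withTempGeoParquetDir instead of sharing geoparquetoutputlocation.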

+    validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+      implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+      val coveringJsValue = geo \ "columns" \ "geometry" \ "covering"
+      val covering = coveringJsValue.extract[Covering]
+      assert(covering.bbox.xmin == Seq("test_cov", "xmin"))
+      assert(covering.bbox.ymin == Seq("test_cov", "ymin"))
+      assert(covering.bbox.xmax == Seq("test_cov", "xmax"))
+      assert(covering.bbox.ymax == Seq("test_cov", "ymax"))
+    }
+  }
+
it("GeoParquet auto populates covering metadata for a Box2D <geom>_bbox column") {
// Auto-detect path: when a column named <geom>_bbox is a Box2D, reuse it as the
// covering column instead of synthesizing a separate float64 struct.
val df = sparkSession
.range(0, 100)
.toDF("id")
.withColumn("id", expr("CAST(id AS DOUBLE)"))
.withColumn("geometry", expr("ST_Point(id, id + 1)"))
.withColumn("geometry_bbox", expr("ST_Box2D(geometry)"))
val geoParquetSavePath = geoparquetoutputlocation + "/gp_box2d_auto_covering.parquet"
df.write.format("geoparquet").mode("overwrite").save(geoParquetSavePath)
Review comment on lines +1058 to +1059 (Member Author):
Same as the previous comment: sticking with the existing hard-coded geoparquetoutputlocation convention used by the other covering tests in this file. Refactoring to per-test unique dirs would be a separate suite-wide change.

+    validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+      implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+      val coveringJsValue = geo \ "columns" \ "geometry" \ "covering"
+      val covering = coveringJsValue.extract[Covering]
+      assert(covering.bbox.xmin == Seq("geometry_bbox", "xmin"))
+      assert(covering.bbox.ymin == Seq("geometry_bbox", "ymin"))
+      assert(covering.bbox.xmax == Seq("geometry_bbox", "xmax"))
+      assert(covering.bbox.ymax == Seq("geometry_bbox", "ymax"))
+    }
+  }
+
it("GeoParquet auto populates covering metadata for single geometry column") {
val df = sparkSession
.range(0, 100)