From 3452e8be948bbb4428b4d73e122b68462a4fe36f Mon Sep 17 00:00:00 2001 From: Jia Yu Date: Wed, 6 May 2026 00:15:36 -0700 Subject: [PATCH 1/2] [GH-2888] Flink bindings for Box2D Phase 1 surface Mirrors the Phase 1 SQL surface in Flink: - Box2DTypeSerializer (RAW Flink TypeSerializer for Box2D) - ST_Box2D scalar in Functions - ST_MakeBox2D, ST_GeomFromBox2D scalars in Constructors - ST_Extent aggregate in Aggregators - Box2D eval overloads on ST_XMin, ST_XMax, ST_YMin, ST_YMax, ST_AsText - Catalog registration for the four new top-level functions Tests in FunctionTest, ConstructorTest, AggregatorTest cover the happy path, NULL/empty propagation, and the dimensionality dispatch in ST_GeomFromBox2D. Closes #2888. --- .../sedona/flink/Box2DTypeSerializer.java | 167 ++++++++++++++++++ .../java/org/apache/sedona/flink/Catalog.java | 4 + .../sedona/flink/expressions/Aggregators.java | 60 +++++++ .../flink/expressions/Constructors.java | 35 ++++ .../sedona/flink/expressions/Functions.java | 65 +++++++ .../apache/sedona/flink/AggregatorTest.java | 29 +++ .../apache/sedona/flink/ConstructorTest.java | 32 ++++ .../org/apache/sedona/flink/FunctionTest.java | 37 ++++ 8 files changed, 429 insertions(+) create mode 100644 flink/src/main/java/org/apache/sedona/flink/Box2DTypeSerializer.java diff --git a/flink/src/main/java/org/apache/sedona/flink/Box2DTypeSerializer.java b/flink/src/main/java/org/apache/sedona/flink/Box2DTypeSerializer.java new file mode 100644 index 00000000000..da6524628e2 --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/Box2DTypeSerializer.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink; + +import java.io.IOException; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.sedona.common.geometryObjects.Box2D; + +/** + * Flink {@link TypeSerializer} for {@link Box2D}. Serializes as a presence byte followed by four + * doubles (xmin, ymin, xmax, ymax). Total payload for a non-null value is 33 bytes. 
+ */
+public class Box2DTypeSerializer extends TypeSerializer<Box2D> {
+
+  private static final long serialVersionUID = 1L;
+
+  public static final Box2DTypeSerializer INSTANCE = new Box2DTypeSerializer();
+
+  public Box2DTypeSerializer() {}
+
+  @Override
+  public boolean isImmutableType() {
+    return true;
+  }
+
+  @Override
+  public TypeSerializer<Box2D> duplicate() {
+    return this;
+  }
+
+  @Override
+  public Box2D createInstance() {
+    return new Box2D(0.0, 0.0, 0.0, 0.0);
+  }
+
+  @Override
+  public Box2D copy(Box2D from) {
+    if (from == null) {
+      return null;
+    }
+    return new Box2D(from.getXMin(), from.getYMin(), from.getXMax(), from.getYMax());
+  }
+
+  @Override
+  public Box2D copy(Box2D from, Box2D reuse) {
+    return copy(from);
+  }
+
+  @Override
+  public int getLength() {
+    return -1;
+  }
+
+  @Override
+  public void serialize(Box2D record, DataOutputView target) throws IOException {
+    if (record == null) {
+      target.writeByte(0);
+    } else {
+      target.writeByte(1);
+      target.writeDouble(record.getXMin());
+      target.writeDouble(record.getYMin());
+      target.writeDouble(record.getXMax());
+      target.writeDouble(record.getYMax());
+    }
+  }
+
+  @Override
+  public Box2D deserialize(DataInputView source) throws IOException {
+    byte present = source.readByte();
+    if (present == 0) {
+      return null;
+    }
+    double xmin = source.readDouble();
+    double ymin = source.readDouble();
+    double xmax = source.readDouble();
+    double ymax = source.readDouble();
+    return new Box2D(xmin, ymin, xmax, ymax);
+  }
+
+  @Override
+  public Box2D deserialize(Box2D reuse, DataInputView source) throws IOException {
+    return deserialize(source);
+  }
+
+  @Override
+  public void copy(DataInputView source, DataOutputView target) throws IOException {
+    byte present = source.readByte();
+    target.writeByte(present);
+    if (present != 0) {
+      target.write(source, 32);
+    }
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return obj instanceof Box2DTypeSerializer;
+  }
+
+  @Override
+  public int hashCode() {
+    return getClass().hashCode();
+  }
+
+  @Override
+  public TypeSerializerSnapshot<Box2D> snapshotConfiguration() {
+    return new Box2DSerializerSnapshot();
+  }
+
+  public static final class Box2DSerializerSnapshot implements TypeSerializerSnapshot<Box2D> {
+    private static final int CURRENT_VERSION = 1;
+
+    @Override
+    public int getCurrentVersion() {
+      return CURRENT_VERSION;
+    }
+
+    @Override
+    public void writeSnapshot(DataOutputView out) throws IOException {}
+
+    @Override
+    public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
+        throws IOException {
+      if (readVersion != CURRENT_VERSION) {
+        throw new IOException(
+            "Cannot read snapshot: Incompatible version "
+                + readVersion
+                + ". Expected version "
+                + CURRENT_VERSION);
+      }
+    }
+
+    @Override
+    public TypeSerializer<Box2D> restoreSerializer() {
+      return Box2DTypeSerializer.INSTANCE;
+    }
+
+    @Override
+    public TypeSerializerSchemaCompatibility<Box2D> resolveSchemaCompatibility(
+        TypeSerializer<Box2D> newSerializer) {
+      if (newSerializer instanceof Box2DTypeSerializer) {
+        return TypeSerializerSchemaCompatibility.compatibleAsIs();
+      } else {
+        return TypeSerializerSchemaCompatibility.incompatible();
+      }
+    }
+  }
+}
diff --git a/flink/src/main/java/org/apache/sedona/flink/Catalog.java b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
index 3ef00ce247e..ba813a3d3b2 100644
--- a/flink/src/main/java/org/apache/sedona/flink/Catalog.java
+++ b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
@@ -25,6 +25,7 @@ public class Catalog {
   public static UserDefinedFunction[] getFuncs() {
     return new UserDefinedFunction[] {
       new Aggregators.ST_Envelope_Aggr(),
+      new Aggregators.ST_Extent(),
       new Aggregators.ST_Intersection_Aggr(),
       new Aggregators.ST_Union_Aggr(),
       // Aliases for *_Aggr functions with *_Agg suffix
@@ -40,6 +41,8 @@ public static UserDefinedFunction[] getFuncs() {
       new Constructors.ST_PointFromWKB(),
       new Constructors.ST_LineFromWKB(),
       new Constructors.ST_LinestringFromWKB(),
+      new
Constructors.ST_GeomFromBox2D(), + new Constructors.ST_MakeBox2D(), new Constructors.ST_MakeEnvelope(), new Constructors.ST_MakePoint(), new Constructors.ST_MakePointM(), @@ -78,6 +81,7 @@ public static UserDefinedFunction[] getFuncs() { new Functions.ST_ConvexHull(), new Functions.ST_CrossesDateLine(), new Functions.ST_Expand(), + new Functions.ST_Box2D(), new Functions.ST_Envelope(), new Functions.ST_OrientedEnvelope(), new Functions.ST_Difference(), diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Aggregators.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Aggregators.java index 947507d16a4..1e15c04f377 100644 --- a/flink/src/main/java/org/apache/sedona/flink/expressions/Aggregators.java +++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Aggregators.java @@ -20,6 +20,8 @@ import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.functions.AggregateFunction; +import org.apache.sedona.common.geometryObjects.Box2D; +import org.apache.sedona.flink.Box2DTypeSerializer; import org.apache.sedona.flink.GeometryTypeSerializer; import org.locationtech.jts.geom.Coordinate; import org.locationtech.jts.geom.Envelope; @@ -108,6 +110,64 @@ public void resetAccumulator(Accumulators.Envelope acc) { } } + // Aggregate the bounding box of all input geometries as a Box2D. Mirrors PostGIS ST_Extent. + // Returns null when there are no rows or all inputs are null/empty. 
+  @DataTypeHint(value = "RAW", rawSerializer = Box2DTypeSerializer.class, bridgedTo = Box2D.class)
+  public static class ST_Extent extends AggregateFunction<Box2D, Accumulators.Envelope> {
+
+    @Override
+    public Accumulators.Envelope createAccumulator() {
+      return new Accumulators.Envelope();
+    }
+
+    @Override
+    @DataTypeHint(value = "RAW", rawSerializer = Box2DTypeSerializer.class, bridgedTo = Box2D.class)
+    public Box2D getValue(Accumulators.Envelope acc) {
+      if (acc.minX > acc.maxX) return null;
+      return new Box2D(acc.minX, acc.minY, acc.maxX, acc.maxY);
+    }
+
+    public void accumulate(
+        Accumulators.Envelope acc,
+        @DataTypeHint(
+                value = "RAW",
+                rawSerializer = GeometryTypeSerializer.class,
+                bridgedTo = Geometry.class)
+            Object o) {
+      if (o == null) return;
+      Geometry geometry = (Geometry) o;
+      if (geometry.isEmpty()) return;
+      Envelope envelope = geometry.getEnvelopeInternal();
+      acc.minX = Math.min(acc.minX, envelope.getMinX());
+      acc.minY = Math.min(acc.minY, envelope.getMinY());
+      acc.maxX = Math.max(acc.maxX, envelope.getMaxX());
+      acc.maxY = Math.max(acc.maxY, envelope.getMaxY());
+    }
+
+    public void retract(
+        Accumulators.Envelope acc,
+        @DataTypeHint(
+                value = "RAW",
+                rawSerializer = GeometryTypeSerializer.class,
+                bridgedTo = Geometry.class)
+            Object o) {
+      assert (false);
+    }
+
+    public void merge(Accumulators.Envelope acc, Iterable<Accumulators.Envelope> it) {
+      for (Accumulators.Envelope a : it) {
+        acc.minX = Math.min(acc.minX, a.minX);
+        acc.minY = Math.min(acc.minY, a.minY);
+        acc.maxX = Math.max(acc.maxX, a.maxX);
+        acc.maxY = Math.max(acc.maxY, a.maxY);
+      }
+    }
+
+    public void resetAccumulator(Accumulators.Envelope acc) {
+      acc.reset();
+    }
+  }
+
   // Compute the Union boundary of numbers of geometries
   //
   @DataTypeHint(
diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
index 68c3d90621f..9f7c4fc0f6c 100644
---
a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java +++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java @@ -23,8 +23,10 @@ import org.apache.sedona.common.Functions; import org.apache.sedona.common.enums.FileDataSplitter; import org.apache.sedona.common.enums.GeometryType; +import org.apache.sedona.common.geometryObjects.Box2D; import org.apache.sedona.common.utils.FormatUtils; import org.apache.sedona.common.utils.GeoHashDecoder; +import org.apache.sedona.flink.Box2DTypeSerializer; import org.apache.sedona.flink.GeometryTypeSerializer; import org.locationtech.jts.geom.*; import org.locationtech.jts.io.ParseException; @@ -297,6 +299,39 @@ public Geometry eval( } } + public static class ST_MakeBox2D extends ScalarFunction { + @DataTypeHint(value = "RAW", rawSerializer = Box2DTypeSerializer.class, bridgedTo = Box2D.class) + public Box2D eval( + @DataTypeHint( + value = "RAW", + rawSerializer = GeometryTypeSerializer.class, + bridgedTo = Geometry.class) + Object lowerLeft, + @DataTypeHint( + value = "RAW", + rawSerializer = GeometryTypeSerializer.class, + bridgedTo = Geometry.class) + Object upperRight) { + return org.apache.sedona.common.Constructors.makeBox2D( + (Geometry) lowerLeft, (Geometry) upperRight); + } + } + + public static class ST_GeomFromBox2D extends ScalarFunction { + @DataTypeHint( + value = "RAW", + rawSerializer = GeometryTypeSerializer.class, + bridgedTo = Geometry.class) + public Geometry eval( + @DataTypeHint( + value = "RAW", + rawSerializer = Box2DTypeSerializer.class, + bridgedTo = Box2D.class) + Box2D box) { + return org.apache.sedona.common.Constructors.geomFromBox2D(box); + } + } + public static class ST_PolygonFromEnvelope extends ScalarFunction { @DataTypeHint( value = "RAW", diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java index 71029be3a44..2f0103a639a 100644 --- 
a/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java +++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java @@ -23,6 +23,8 @@ import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.InputGroup; import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.common.geometryObjects.Box2D; +import org.apache.sedona.flink.Box2DTypeSerializer; import org.apache.sedona.flink.GeometryArrayTypeSerializer; import org.apache.sedona.flink.GeometryTypeSerializer; import org.geotools.api.referencing.FactoryException; @@ -502,6 +504,19 @@ public Geometry eval( } } + public static class ST_Box2D extends ScalarFunction { + @DataTypeHint(value = "RAW", rawSerializer = Box2DTypeSerializer.class, bridgedTo = Box2D.class) + public Box2D eval( + @DataTypeHint( + value = "RAW", + rawSerializer = GeometryTypeSerializer.class, + bridgedTo = Geometry.class) + Object o) { + Geometry geom = (Geometry) o; + return org.apache.sedona.common.Functions.box2D(geom); + } + } + public static class ST_OrientedEnvelope extends ScalarFunction { @DataTypeHint( value = "RAW", @@ -921,6 +936,16 @@ public Double eval( Geometry geom = (Geometry) o; return org.apache.sedona.common.Functions.yMin(geom); } + + @DataTypeHint("Double") + public Double eval( + @DataTypeHint( + value = "RAW", + rawSerializer = Box2DTypeSerializer.class, + bridgedTo = Box2D.class) + Box2D box) { + return org.apache.sedona.common.Functions.yMin(box); + } } public static class ST_YMax extends ScalarFunction { @@ -934,6 +959,16 @@ public Double eval( Geometry geom = (Geometry) o; return org.apache.sedona.common.Functions.yMax(geom); } + + @DataTypeHint("Double") + public Double eval( + @DataTypeHint( + value = "RAW", + rawSerializer = Box2DTypeSerializer.class, + bridgedTo = Box2D.class) + Box2D box) { + return org.apache.sedona.common.Functions.yMax(box); + } } public static class ST_ZMax extends ScalarFunction { @@ -1287,6 +1322,16 @@ 
public String eval( Geometry geom = (Geometry) o; return org.apache.sedona.common.Functions.asWKT(geom); } + + @DataTypeHint("String") + public String eval( + @DataTypeHint( + value = "RAW", + rawSerializer = Box2DTypeSerializer.class, + bridgedTo = Box2D.class) + Box2D box) { + return org.apache.sedona.common.Functions.box2dAsText(box); + } } public static class ST_AsEWKB extends ScalarFunction { @@ -1499,6 +1544,16 @@ public Double eval( Geometry geom = (Geometry) o; return org.apache.sedona.common.Functions.xMax(geom); } + + @DataTypeHint("Double") + public Double eval( + @DataTypeHint( + value = "RAW", + rawSerializer = Box2DTypeSerializer.class, + bridgedTo = Box2D.class) + Box2D box) { + return org.apache.sedona.common.Functions.xMax(box); + } } public static class ST_XMin extends ScalarFunction { @@ -1512,6 +1567,16 @@ public Double eval( Geometry geom = (Geometry) o; return org.apache.sedona.common.Functions.xMin(geom); } + + @DataTypeHint("Double") + public Double eval( + @DataTypeHint( + value = "RAW", + rawSerializer = Box2DTypeSerializer.class, + bridgedTo = Box2D.class) + Box2D box) { + return org.apache.sedona.common.Functions.xMin(box); + } } public static class ST_BuildArea extends ScalarFunction { diff --git a/flink/src/test/java/org/apache/sedona/flink/AggregatorTest.java b/flink/src/test/java/org/apache/sedona/flink/AggregatorTest.java index 4511788dc3b..e1af138fdc1 100644 --- a/flink/src/test/java/org/apache/sedona/flink/AggregatorTest.java +++ b/flink/src/test/java/org/apache/sedona/flink/AggregatorTest.java @@ -24,6 +24,7 @@ import org.apache.flink.table.api.*; import org.apache.flink.types.Row; +import org.apache.sedona.common.geometryObjects.Box2D; import org.apache.sedona.flink.expressions.Functions; import org.junit.BeforeClass; import org.junit.Test; @@ -47,6 +48,34 @@ public void testEnvelop_Aggr() { last.getField(0).toString()); } + @Test + public void testExtent() { + tableEnv.executeSql( + "CREATE OR REPLACE TEMPORARY VIEW extent_view 
AS " + + "SELECT ST_GeomFromWKT(wkt) as geom FROM (" + + "VALUES ('POINT (1 2)'), ('POINT (4 5)'), ('LINESTRING (-3 0, 0 0)')" + + ") AS t(wkt)"); + Table result = tableEnv.sqlQuery("SELECT ST_Extent(geom) FROM extent_view"); + Row last = last(result); + Box2D bbox = (Box2D) last.getField(0); + assertEquals(-3.0, bbox.getXMin(), 0.0); + assertEquals(0.0, bbox.getYMin(), 0.0); + assertEquals(4.0, bbox.getXMax(), 0.0); + assertEquals(5.0, bbox.getYMax(), 0.0); + } + + @Test + public void testExtent_EmptyAndNullGeometries() { + tableEnv.executeSql( + "CREATE OR REPLACE TEMPORARY VIEW null_extent_view AS " + + "SELECT ST_GeomFromWKT(wkt) as geom FROM (" + + "VALUES (CAST(NULL AS STRING)), ('POINT EMPTY'), ('POLYGON EMPTY')" + + ") AS t(wkt)"); + Table result = tableEnv.sqlQuery("SELECT ST_Extent(geom) FROM null_extent_view"); + Row last = last(result); + assertNull(last.getField(0)); + } + @Test public void testEnvelope_Aggr_EmptyGeometries() { tableEnv.executeSql( diff --git a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java index 9732e4ba8e0..4a48adc564c 100644 --- a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java +++ b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java @@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.table.api.Table; import org.apache.flink.types.Row; +import org.apache.sedona.common.geometryObjects.Box2D; import org.apache.sedona.common.jts2geojson.GeoJSONReader; import org.apache.sedona.flink.expressions.Constructors; import org.apache.sedona.flink.expressions.Functions; @@ -367,6 +368,37 @@ public void testGeometryFromText() { assertEquals(4326, actual); } + @Test + public void testMakeBox2D() { + Box2D bbox = + (Box2D) + last(tableEnv.sqlQuery("SELECT ST_MakeBox2D(ST_Point(1.0, 2.0), ST_Point(4.0, 5.0))")) + .getField(0); + assertEquals(1.0, bbox.getXMin(), 0.0); + 
assertEquals(2.0, bbox.getYMin(), 0.0); + assertEquals(4.0, bbox.getXMax(), 0.0); + assertEquals(5.0, bbox.getYMax(), 0.0); + } + + @Test + public void testGeomFromBox2D() { + String wkt = + last(tableEnv.sqlQuery( + "SELECT ST_AsText(ST_GeomFromBox2D(ST_MakeBox2D(" + + "ST_Point(1.0, 2.0), ST_Point(4.0, 5.0))))")) + .getField(0) + .toString(); + assertEquals("POLYGON ((1 2, 1 5, 4 5, 4 2, 1 2))", wkt); + + String pointWkt = + last(tableEnv.sqlQuery( + "SELECT ST_AsText(ST_GeomFromBox2D(ST_MakeBox2D(" + + "ST_Point(3.0, 3.0), ST_Point(3.0, 3.0))))")) + .getField(0) + .toString(); + assertEquals("POINT (3 3)", pointWkt); + } + @Test public void testMakeEnvelope() { Double minX = 1.0; diff --git a/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java b/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java index 2171ecc78ea..15fa3ae8868 100644 --- a/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java +++ b/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java @@ -30,6 +30,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.flink.table.api.Table; import org.apache.flink.types.Row; +import org.apache.sedona.common.geometryObjects.Box2D; import org.apache.sedona.flink.expressions.Functions; import org.apache.sedona.flink.expressions.FunctionsProj4; import org.datasyslab.proj4sedona.core.Proj; @@ -438,6 +439,42 @@ public void testEndPoint() { assertEquals("POINT (3 3)", result.toString()); } + @Test + public void testBox2D() { + Table t = + tableEnv.sqlQuery( + "SELECT ST_Box2D(ST_GeomFromText('POLYGON((1 2, 1 5, 4 5, 4 2, 1 2))')) AS bbox"); + Box2D bbox = (Box2D) first(t).getField(0); + assertEquals(1.0, bbox.getXMin(), 0.0); + assertEquals(2.0, bbox.getYMin(), 0.0); + assertEquals(4.0, bbox.getXMax(), 0.0); + assertEquals(5.0, bbox.getYMax(), 0.0); + + // null and empty inputs propagate to NULL. 
+ Table tNull = + tableEnv.sqlQuery( + "SELECT ST_Box2D(ST_GeomFromText('POINT EMPTY')) AS empty_bbox," + + " ST_Box2D(ST_GeomFromText(CAST(NULL AS STRING))) AS null_bbox"); + Row row = first(tNull); + assertNull(row.getField(0)); + assertNull(row.getField(1)); + } + + @Test + public void testBox2DAsTextAndAccessors() { + Table t = + tableEnv.sqlQuery( + "WITH bx AS (" + + " SELECT ST_Box2D(ST_GeomFromText('POLYGON((1 2, 1 5, 4 5, 4 2, 1 2))')) AS b)" + + " SELECT ST_AsText(b), ST_XMin(b), ST_YMin(b), ST_XMax(b), ST_YMax(b) FROM bx"); + Row row = first(t); + assertEquals("BOX(1.0 2.0, 4.0 5.0)", row.getField(0)); + assertEquals(1.0, row.getField(1)); + assertEquals(2.0, row.getField(2)); + assertEquals(4.0, row.getField(3)); + assertEquals(5.0, row.getField(4)); + } + @Test public void testEnvelope() { Table linestringTable = createLineStringTable(1); From 0aa7ca15d12bf2b5f33125067aa7750623a5c7bf Mon Sep 17 00:00:00 2001 From: Jia Yu Date: Wed, 6 May 2026 17:26:49 -0700 Subject: [PATCH 2/2] Box2DTypeSerializer.createInstance() returns null instead of (0,0,0,0) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit (0, 0) is Null Island — equator × prime meridian — a real point. A zero-zero Box2D used as an aggregation seed or initial value would silently include the origin in every result. Box2D has no in-band empty marker; absence is represented by SQL NULL. Returning null from createInstance() is the honest default and matches our type contract. The ST_Extent aggregator was never affected (it uses Accumulators.Envelope with +inf/-inf, not Box2D), but the precedent in createInstance() should not be a footgun. 
--- .../java/org/apache/sedona/flink/Box2DTypeSerializer.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/flink/src/main/java/org/apache/sedona/flink/Box2DTypeSerializer.java b/flink/src/main/java/org/apache/sedona/flink/Box2DTypeSerializer.java index da6524628e2..0fd5e51fa21 100644 --- a/flink/src/main/java/org/apache/sedona/flink/Box2DTypeSerializer.java +++ b/flink/src/main/java/org/apache/sedona/flink/Box2DTypeSerializer.java @@ -50,7 +50,11 @@ public TypeSerializer<Box2D> duplicate() { @Override public Box2D createInstance() { - return new Box2D(0.0, 0.0, 0.0, 0.0); + // Box2D has no in-band "empty" sentinel — absence is represented by SQL NULL. + // Returning null here is the honest default; (0, 0, 0, 0) would alias to a real point at + // the equator/prime-meridian and could silently corrupt any caller that treats the + // createInstance() result as a usable bbox. + return null; } @Override