Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 171 additions & 0 deletions flink/src/main/java/org/apache/sedona/flink/Box2DTypeSerializer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sedona.flink;

import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.sedona.common.geometryObjects.Box2D;

/**
 * Flink {@link TypeSerializer} for {@link Box2D} values.
 *
 * <p>Wire format: a single presence byte ({@code 0} for null, {@code 1} for a value), followed —
 * only when a value is present — by four doubles in the order xmin, ymin, xmax, ymax. A non-null
 * value therefore occupies 33 bytes and a null occupies 1 byte, which is why {@link #getLength()}
 * reports a variable length.
 */
public class Box2DTypeSerializer extends TypeSerializer<Box2D> {

  private static final long serialVersionUID = 1L;

  /** Presence byte written for a null record. */
  private static final int NULL_FLAG = 0;

  /** Presence byte written ahead of the coordinates of a non-null record. */
  private static final int PRESENT_FLAG = 1;

  /** Number of bytes taken by the four doubles that follow the presence byte. */
  private static final int COORDINATES_BYTES = 4 * Double.BYTES;

  public static final Box2DTypeSerializer INSTANCE = new Box2DTypeSerializer();

  public Box2DTypeSerializer() {}

  /** Declares {@link Box2D} to Flink as an immutable type. */
  @Override
  public boolean isImmutableType() {
    return true;
  }

  /** Returns this same instance; the serializer has no fields that would need duplicating. */
  @Override
  public TypeSerializer<Box2D> duplicate() {
    return this;
  }

  /**
   * Always returns {@code null}.
   *
   * <p>Box2D has no in-band "empty" sentinel — absence is represented by SQL NULL. A default of
   * (0, 0, 0, 0) would alias to a real point at the equator/prime-meridian and could silently
   * corrupt any caller that treats the result as a usable bbox, so null is the honest default.
   */
  @Override
  public Box2D createInstance() {
    return null;
  }

  /**
   * Creates a new {@link Box2D} carrying the same four bounds as {@code from}.
   *
   * @param from the box to copy; may be null
   * @return a fresh box with identical bounds, or null when {@code from} is null
   */
  @Override
  public Box2D copy(Box2D from) {
    return from == null
        ? null
        : new Box2D(from.getXMin(), from.getYMin(), from.getXMax(), from.getYMax());
  }

  /** Ignores {@code reuse} and delegates to {@link #copy(Box2D)}. */
  @Override
  public Box2D copy(Box2D from, Box2D reuse) {
    return copy(from);
  }

  /** Returns {@code -1}: the encoded size differs between null and non-null records. */
  @Override
  public int getLength() {
    return -1;
  }

  /**
   * Writes the presence byte and, for a non-null record, its four bounds.
   *
   * @param record the box to write; may be null
   * @param target the output to write to
   * @throws IOException if the underlying output fails
   */
  @Override
  public void serialize(Box2D record, DataOutputView target) throws IOException {
    if (record == null) {
      target.writeByte(NULL_FLAG);
      return;
    }
    target.writeByte(PRESENT_FLAG);
    target.writeDouble(record.getXMin());
    target.writeDouble(record.getYMin());
    target.writeDouble(record.getXMax());
    target.writeDouble(record.getYMax());
  }

  /**
   * Reads one record written by {@link #serialize(Box2D, DataOutputView)}.
   *
   * @param source the input to read from
   * @return the decoded box, or null when the presence byte is zero
   * @throws IOException if the underlying input fails
   */
  @Override
  public Box2D deserialize(DataInputView source) throws IOException {
    // Any non-zero presence byte is treated as "value follows".
    if (source.readByte() == NULL_FLAG) {
      return null;
    }
    // Arguments are evaluated left to right, so the reads happen in wire order:
    // xmin, ymin, xmax, ymax.
    return new Box2D(
        source.readDouble(), source.readDouble(), source.readDouble(), source.readDouble());
  }

  /** Ignores {@code reuse} and delegates to {@link #deserialize(DataInputView)}. */
  @Override
  public Box2D deserialize(Box2D reuse, DataInputView source) throws IOException {
    return deserialize(source);
  }

  /**
   * Copies one encoded record from {@code source} to {@code target} without materializing a
   * {@link Box2D}.
   *
   * @param source the input holding an encoded record
   * @param target the output receiving the same bytes
   * @throws IOException if reading or writing fails
   */
  @Override
  public void copy(DataInputView source, DataOutputView target) throws IOException {
    byte presence = source.readByte();
    target.writeByte(presence);
    if (presence == NULL_FLAG) {
      return;
    }
    // Forward the four coordinate doubles as raw bytes.
    target.write(source, COORDINATES_BYTES);
  }

  /** Any two {@code Box2DTypeSerializer} instances are considered equal. */
  @Override
  public boolean equals(Object obj) {
    return obj instanceof Box2DTypeSerializer;
  }

  @Override
  public int hashCode() {
    return getClass().hashCode();
  }

  @Override
  public TypeSerializerSnapshot<Box2D> snapshotConfiguration() {
    return new Box2DSerializerSnapshot();
  }

  /**
   * Snapshot for {@link Box2DTypeSerializer}. It writes no payload; only the snapshot version is
   * checked on restore.
   */
  public static final class Box2DSerializerSnapshot implements TypeSerializerSnapshot<Box2D> {
    private static final int CURRENT_VERSION = 1;

    @Override
    public int getCurrentVersion() {
      return CURRENT_VERSION;
    }

    /** Writes nothing: the snapshot has no payload. */
    @Override
    public void writeSnapshot(DataOutputView out) throws IOException {}

    /**
     * Accepts only snapshots written with {@link #CURRENT_VERSION}; reads no bytes from {@code in}.
     *
     * @throws IOException if {@code readVersion} differs from the current version
     */
    @Override
    public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
        throws IOException {
      if (readVersion == CURRENT_VERSION) {
        return;
      }
      throw new IOException(
          "Cannot read snapshot: Incompatible version "
              + readVersion
              + ". Expected version "
              + CURRENT_VERSION);
    }

    /** Returns the shared {@link Box2DTypeSerializer#INSTANCE}. */
    @Override
    public TypeSerializer<Box2D> restoreSerializer() {
      return Box2DTypeSerializer.INSTANCE;
    }

    /**
     * Reports compatible-as-is when the new serializer is a {@link Box2DTypeSerializer}, and
     * incompatible otherwise.
     */
    @Override
    public TypeSerializerSchemaCompatibility<Box2D> resolveSchemaCompatibility(
        TypeSerializer<Box2D> newSerializer) {
      if (!(newSerializer instanceof Box2DTypeSerializer)) {
        return TypeSerializerSchemaCompatibility.incompatible();
      }
      return TypeSerializerSchemaCompatibility.compatibleAsIs();
    }
  }
}
4 changes: 4 additions & 0 deletions flink/src/main/java/org/apache/sedona/flink/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class Catalog {
public static UserDefinedFunction[] getFuncs() {
return new UserDefinedFunction[] {
new Aggregators.ST_Envelope_Aggr(),
new Aggregators.ST_Extent(),
new Aggregators.ST_Intersection_Aggr(),
new Aggregators.ST_Union_Aggr(),
// Aliases for *_Aggr functions with *_Agg suffix
Expand All @@ -40,6 +41,8 @@ public static UserDefinedFunction[] getFuncs() {
new Constructors.ST_PointFromWKB(),
new Constructors.ST_LineFromWKB(),
new Constructors.ST_LinestringFromWKB(),
new Constructors.ST_GeomFromBox2D(),
new Constructors.ST_MakeBox2D(),
new Constructors.ST_MakeEnvelope(),
new Constructors.ST_MakePoint(),
new Constructors.ST_MakePointM(),
Expand Down Expand Up @@ -78,6 +81,7 @@ public static UserDefinedFunction[] getFuncs() {
new Functions.ST_ConvexHull(),
new Functions.ST_CrossesDateLine(),
new Functions.ST_Expand(),
new Functions.ST_Box2D(),
new Functions.ST_Envelope(),
new Functions.ST_OrientedEnvelope(),
new Functions.ST_Difference(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.sedona.common.geometryObjects.Box2D;
import org.apache.sedona.flink.Box2DTypeSerializer;
import org.apache.sedona.flink.GeometryTypeSerializer;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.Envelope;
Expand Down Expand Up @@ -108,6 +110,64 @@ public void resetAccumulator(Accumulators.Envelope acc) {
}
}

/**
 * Aggregates the bounding box of all input geometries as a {@link Box2D}. Mirrors PostGIS
 * ST_Extent. Returns null when there are no rows or all inputs are null/empty.
 *
 * <p>Retraction is not supported: the accumulator keeps only running minima and maxima, so the
 * contribution of a single row cannot be removed from it.
 */
@DataTypeHint(value = "RAW", rawSerializer = Box2DTypeSerializer.class, bridgedTo = Box2D.class)
public static class ST_Extent extends AggregateFunction<Box2D, Accumulators.Envelope> {

  @Override
  public Accumulators.Envelope createAccumulator() {
    return new Accumulators.Envelope();
  }

  /**
   * Converts the accumulated bounds into a {@link Box2D}.
   *
   * @param acc the accumulator holding the running bounds
   * @return the extent, or null when {@code acc.minX > acc.maxX}
   */
  @Override
  @DataTypeHint(value = "RAW", rawSerializer = Box2DTypeSerializer.class, bridgedTo = Box2D.class)
  public Box2D getValue(Accumulators.Envelope acc) {
    // An inverted x-range is taken to mean nothing was accumulated.
    // NOTE(review): presumably this is the state of a fresh or reset Accumulators.Envelope —
    // confirm against that class.
    if (acc.minX > acc.maxX) {
      return null;
    }
    return new Box2D(acc.minX, acc.minY, acc.maxX, acc.maxY);
  }

  /**
   * Widens the accumulated bounds to include the envelope of one geometry. Null and empty
   * geometries are skipped.
   *
   * @param acc the accumulator to update
   * @param o the input geometry; may be null
   */
  public void accumulate(
      Accumulators.Envelope acc,
      @DataTypeHint(
              value = "RAW",
              rawSerializer = GeometryTypeSerializer.class,
              bridgedTo = Geometry.class)
          Object o) {
    if (o == null) {
      return;
    }
    Geometry geometry = (Geometry) o;
    if (geometry.isEmpty()) {
      return;
    }
    Envelope envelope = geometry.getEnvelopeInternal();
    acc.minX = Math.min(acc.minX, envelope.getMinX());
    acc.minY = Math.min(acc.minY, envelope.getMinY());
    acc.maxX = Math.max(acc.maxX, envelope.getMaxX());
    acc.maxY = Math.max(acc.maxY, envelope.getMaxY());
  }

  /**
   * Always fails, because retraction is not supported by this aggregate.
   *
   * @throws UnsupportedOperationException always
   */
  public void retract(
      Accumulators.Envelope acc,
      @DataTypeHint(
              value = "RAW",
              rawSerializer = GeometryTypeSerializer.class,
              bridgedTo = Geometry.class)
          Object o) {
    // An assert would be skipped when JVM assertions are disabled (the default), which would
    // silently ignore the retraction and leave a stale extent. Fail loudly instead.
    throw new UnsupportedOperationException("ST_Extent does not support retraction");
  }

  /**
   * Folds the bounds of every accumulator in {@code it} into {@code acc}.
   *
   * @param acc the accumulator receiving the merged bounds
   * @param it the accumulators to merge in
   */
  public void merge(Accumulators.Envelope acc, Iterable<Accumulators.Envelope> it) {
    for (Accumulators.Envelope a : it) {
      acc.minX = Math.min(acc.minX, a.minX);
      acc.minY = Math.min(acc.minY, a.minY);
      acc.maxX = Math.max(acc.maxX, a.maxX);
      acc.maxY = Math.max(acc.maxY, a.maxY);
    }
  }

  /** Resets the accumulator by delegating to {@code Accumulators.Envelope#reset()}. */
  public void resetAccumulator(Accumulators.Envelope acc) {
    acc.reset();
  }
}

// Compute the Union boundary of numbers of geometries
//
@DataTypeHint(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import org.apache.sedona.common.Functions;
import org.apache.sedona.common.enums.FileDataSplitter;
import org.apache.sedona.common.enums.GeometryType;
import org.apache.sedona.common.geometryObjects.Box2D;
import org.apache.sedona.common.utils.FormatUtils;
import org.apache.sedona.common.utils.GeoHashDecoder;
import org.apache.sedona.flink.Box2DTypeSerializer;
import org.apache.sedona.flink.GeometryTypeSerializer;
import org.locationtech.jts.geom.*;
import org.locationtech.jts.io.ParseException;
Expand Down Expand Up @@ -297,6 +299,39 @@ public Geometry eval(
}
}

/**
 * Scalar function that builds a {@link Box2D} from two corner geometries by delegating to {@code
 * org.apache.sedona.common.Constructors.makeBox2D}.
 */
public static class ST_MakeBox2D extends ScalarFunction {

  /**
   * @param lowerLeft first corner, cast to {@link Geometry} before delegation
   * @param upperRight second corner, cast to {@link Geometry} before delegation
   * @return whatever the common {@code makeBox2D} constructor returns for the two corners
   */
  @DataTypeHint(value = "RAW", rawSerializer = Box2DTypeSerializer.class, bridgedTo = Box2D.class)
  public Box2D eval(
      @DataTypeHint(
              value = "RAW",
              rawSerializer = GeometryTypeSerializer.class,
              bridgedTo = Geometry.class)
          Object lowerLeft,
      @DataTypeHint(
              value = "RAW",
              rawSerializer = GeometryTypeSerializer.class,
              bridgedTo = Geometry.class)
          Object upperRight) {
    // Cast in declaration order so a wrongly-typed first argument is reported first.
    Geometry lowerLeftGeometry = (Geometry) lowerLeft;
    Geometry upperRightGeometry = (Geometry) upperRight;
    return org.apache.sedona.common.Constructors.makeBox2D(lowerLeftGeometry, upperRightGeometry);
  }
}

/**
 * Scalar function that converts a {@link Box2D} into a {@link Geometry} by delegating to {@code
 * org.apache.sedona.common.Constructors.geomFromBox2D}.
 */
public static class ST_GeomFromBox2D extends ScalarFunction {

  /**
   * @param box the box to convert; passed through unchanged to the common constructor
   * @return the geometry produced by the common {@code geomFromBox2D} constructor
   */
  @DataTypeHint(
      value = "RAW",
      rawSerializer = GeometryTypeSerializer.class,
      bridgedTo = Geometry.class)
  public Geometry eval(
      @DataTypeHint(
              value = "RAW",
              rawSerializer = Box2DTypeSerializer.class,
              bridgedTo = Box2D.class)
          Box2D box) {
    Geometry geometry = org.apache.sedona.common.Constructors.geomFromBox2D(box);
    return geometry;
  }
}

public static class ST_PolygonFromEnvelope extends ScalarFunction {
@DataTypeHint(
value = "RAW",
Expand Down
Loading
Loading