Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
368 changes: 368 additions & 0 deletions docs/usecases/04-flood-snapshot.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,368 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": "<!--\n Licensed to the Apache Software Foundation (ASF) under one\n or more contributor license agreements. See the NOTICE file\n distributed with this work for additional information\n regarding copyright ownership. The ASF licenses this file\n to you under the Apache License, Version 2.0 (the\n \"License\"); you may not use this file except in compliance\n with the License. You may obtain a copy of the License at\n\n http://www.apache.org/licenses/LICENSE-2.0\n\n Unless required by applicable law or agreed to in writing,\n software distributed under the License is distributed on an\n \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n KIND, either express or implied. See the License for the\n specific language governing permissions and limitations\n under the License.\n-->\n\n# Buildings inside a flood extent \u2014 and which are critical infrastructure\n\nAn end-to-end disaster-response workflow. We answer:\n\n> **A flood happened over this AOI. Which buildings are inside the flood extent, and how many of them are hospitals, schools, or fire stations?**\n\n**Pipeline:**\n1. Synthesize a Sentinel-1-VV-style backscatter raster (low values where water sits flat \u2014 that's the flood signature SAR sees).\n2. Threshold with `RS_MapAlgebra` \u2192 binary water mask raster.\n3. Vectorize the mask: `RS_PixelAsPolygons` + `explode` + filter + `ST_Union_Agg` \u2192 a single dissolved flood polygon.\n4. `ST_Buffer` slightly to absorb SAR speckle.\n5. Synthesize building footprints labelled with infrastructure type (`residential` / `hospital` / `school` / `fire_station`).\n6. `ST_Intersects` spatial join to find affected buildings; flag any affected building whose `type IN ('hospital', 'school', 'fire_station')` as critical.\n7. Write the affected-buildings table as **GeoParquet 1.1** (auto covering-bbox + projjson CRS), read back, plot.\n\nAll inputs are synthesized in the notebook so it runs offline. To replace the synthesis cell with real Sentinel-1 imagery in production, point Sedona's [STAC reader](../tutorial/files/stac-sedona-spark.md) at an `earth-search` collection and feed the chosen scene's VV-polarization asset URL into the same `sedona.read.format(\"raster\")` call below.\n\n**Requires Sedona \u2265 1.9.0** for the auto-tiling raster reader (GH-2672) and the GeoParquet 1.1 writer's auto-populated covering-bbox + projjson CRS metadata (GH-2646, GH-2664)."
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1. Connect to Spark through SedonaContext"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from sedona.spark import SedonaContext\n",
"\n",
"config = SedonaContext.builder().master(\"spark://localhost:7077\").getOrCreate()\n",
"sedona = SedonaContext.create(config)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2. Synthesize a Sentinel-1-VV-style backscatter raster\n",
"\n",
"Sentinel-1 SAR over flat water returns very low backscatter (water specularly reflects radar away from the satellite); over land the surface scatters energy back. We simulate that contrast with a single-band `float32` raster: high values everywhere except a circular flood patch in the south-west quadrant which we drop to low values. Written as a tiled GeoTIFF so the Sedona raster reader accepts it."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"import numpy as np\n",
"import rasterio\n",
"from rasterio.transform import from_bounds\n",
"\n",
"WORK = \"/tmp/flood-snapshot\"\n",
"os.makedirs(WORK, exist_ok=True)\n",
"\n",
"AOI = (-91.10, 41.50, -91.00, 41.60)\n",
"W = H = 128\n",
"transform = from_bounds(*AOI, W, H)\n",
"rng = np.random.default_rng(11)\n",
"\n",
"ys, xs = np.mgrid[0:H, 0:W]\n",
"flood_mask = ((xs - 32) ** 2 + (ys - 96) ** 2) < 28**2 # circle in SW corner\n",
"backscatter = 0.5 + 0.05 * rng.standard_normal((H, W)) # land\n",
"backscatter = np.where(\n",
" flood_mask, 0.05 + 0.02 * rng.standard_normal((H, W)), backscatter\n",
")\n",
"backscatter = backscatter.clip(0, 1).astype(\"float32\")\n",
"\n",
"scene_path = f\"{WORK}/sentinel1_vv.tif\"\n",
"with rasterio.open(\n",
" scene_path,\n",
" \"w\",\n",
" driver=\"GTiff\",\n",
" tiled=True,\n",
" blockxsize=128,\n",
" blockysize=128,\n",
" height=H,\n",
" width=W,\n",
" count=1,\n",
" dtype=\"float32\",\n",
" crs=\"EPSG:4326\",\n",
" transform=transform,\n",
") as dst:\n",
" dst.write(backscatter, 1)\n",
" dst.set_band_description(1, \"vv\")\n",
"\n",
"print(\n",
" f\"backscatter min={backscatter.min():.2f} max={backscatter.max():.2f} \"\n",
" f\"flood pixels={int(flood_mask.sum())}/{H*W}\"\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3. Threshold the backscatter into a water-mask raster\n",
"\n",
"Single-raster `RS_MapAlgebra` with a comparison expression: every pixel whose VV backscatter is below `0.2` becomes 1 (water), every other pixel becomes 0 (dry). The result is a `uint8` mask raster the same size and CRS as the input."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"scene = sedona.read.format(\"raster\").load(scene_path)\n",
"scene.createOrReplaceTempView(\"scene\")\n",
"\n",
"mask = sedona.sql(\"\"\"\n",
" SELECT x, y,\n",
" RS_MapAlgebra(\n",
" rast, 'B',\n",
" 'out[0] = rast[0] < 0.2 ? 1 : 0;'\n",
" ) AS rast\n",
" FROM scene\n",
"\"\"\")\n",
"mask.cache()\n",
"mask.createOrReplaceTempView(\"mask\")\n",
"mask.selectExpr(\n",
" \"x\",\n",
" \"y\",\n",
" \"RS_BandPixelType(rast, 1) as dtype\",\n",
" \"RS_Width(rast) as w\",\n",
" \"RS_Height(rast) as h\",\n",
").show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 4. Vectorize the mask into a single dissolved flood polygon\n",
"\n",
"`RS_PixelAsPolygons(raster, band)` returns an array of `{geom, val, x, y}` structs \u2014 one per pixel. We `explode` that into rows, keep only the wet pixels (`val = 1`), and dissolve them into a single `MultiPolygon` via `ST_Union_Agg`. A small `ST_Buffer` (0.0005\u00b0, ~50 m) absorbs SAR speckle along the edge of the flood."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": "wet_pixels = sedona.sql(\"\"\"\n SELECT explode(RS_PixelAsPolygons(rast, 1)) AS pixel\n FROM mask\n\"\"\").selectExpr(\"pixel.geom AS geom\", \"pixel.value AS value\").where(\"value = 1\")\nwet_pixels.createOrReplaceTempView(\"wet_pixels\")\n\nflood = sedona.sql(\"\"\"\n SELECT ST_Buffer(ST_Union_Agg(geom), 0.0005) AS geom\n FROM wet_pixels\n\"\"\")\nflood_geom = flood.first()[\"geom\"]\nprint(\n f\"flood polygon: type={flood_geom.geom_type}, \"\n f\"area={flood_geom.area:.6f} deg\u00b2, bounds={tuple(round(b, 4) for b in flood_geom.bounds)}\"\n)\n\nflood.createOrReplaceTempView(\"flood\")"
},
{
"cell_type": "markdown",
"metadata": {},
"source": "## 4. Vectorize the mask into a single dissolved flood polygon\n\n`RS_PixelAsPolygons(raster, band)` returns an array of `{geom, value, x, y}` structs \u2014 one per pixel. We `explode` that into rows, keep only the wet pixels (`value = 1`), and dissolve them into a single `MultiPolygon` via `ST_Union_Agg`. A small `ST_Buffer` (0.0005\u00b0, ~50 m) absorbs SAR speckle along the edge of the flood."
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import Row\n",
"\n",
"xmin, ymin, xmax, ymax = AOI\n",
"step_x = (xmax - xmin) / 4\n",
"step_y = (ymax - ymin) / 4\n",
"half = 0.00125\n",
"types_grid = [\n",
" [\"residential\", \"hospital\", \"residential\", \"school\"],\n",
" [\"residential\", \"residential\", \"fire_station\", \"residential\"],\n",
" [\"school\", \"residential\", \"residential\", \"hospital\"],\n",
" [\"residential\", \"fire_station\", \"residential\", \"residential\"],\n",
"]\n",
"\n",
"building_rows = []\n",
"for ix in range(4):\n",
" for iy in range(4):\n",
" cx = xmin + (ix + 0.5) * step_x\n",
" cy = ymin + (iy + 0.5) * step_y\n",
" wkt = (\n",
" f\"POLYGON(({cx-half} {cy-half}, {cx+half} {cy-half}, \"\n",
" f\"{cx+half} {cy+half}, {cx-half} {cy+half}, {cx-half} {cy-half}))\"\n",
" )\n",
" building_rows.append(Row(bid=f\"B{ix}{iy}\", type=types_grid[iy][ix], wkt=wkt))\n",
"\n",
"buildings = sedona.createDataFrame(building_rows).selectExpr(\n",
" \"bid\", \"type\", \"ST_SetSRID(ST_GeomFromText(wkt), 4326) as geom\"\n",
")\n",
"buildings.createOrReplaceTempView(\"buildings\")\n",
"buildings.show(16)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 6. Find affected buildings + flag critical infrastructure\n",
"\n",
"`ST_Intersects(building, flood)` is the canonical \"affected by\" test. We add a `is_critical` flag for any affected building whose type is hospital / school / fire_station \u2014 those are the rows an emergency-management dashboard would surface first."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"affected = sedona.sql(\"\"\"\n",
" SELECT b.bid,\n",
" b.type,\n",
" b.type IN ('hospital', 'school', 'fire_station') AS is_critical,\n",
" b.geom\n",
" FROM buildings b, flood f\n",
" WHERE ST_Intersects(b.geom, f.geom)\n",
" ORDER BY is_critical DESC, b.bid\n",
"\"\"\")\n",
"affected.cache()\n",
"affected.createOrReplaceTempView(\"affected\")\n",
"affected.select(\"bid\", \"type\", \"is_critical\").show()\n",
"\n",
"summary = sedona.sql(\"\"\"\n",
" SELECT COUNT(*) AS affected_buildings,\n",
" SUM(CAST(is_critical AS INT)) AS affected_critical\n",
" FROM affected\n",
"\"\"\")\n",
"summary.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 7. Persist the affected buildings as GeoParquet 1.1\n",
"\n",
"Auto-populated covering-bbox metadata + projjson CRS (derived from SRID 4326) make the result file ready for downstream tools to filter on bbox without scanning every row. Round-trip read-back confirms the metadata lands."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import shutil\n",
"\n",
"out_path = f\"{WORK}/affected_buildings.parquet\"\n",
"if os.path.exists(out_path):\n",
" shutil.rmtree(out_path)\n",
"affected.coalesce(1).write.format(\"geoparquet\").save(out_path)\n",
"\n",
"sedona.read.format(\"geoparquet\").load(out_path).select(\n",
" \"bid\", \"type\", \"is_critical\"\n",
").show(truncate=False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 8. Visualize: backscatter + flood polygon + buildings color-coded\n",
"\n",
"Single matplotlib axes: VV backscatter as the basemap, the dissolved flood polygon overlaid in blue, building footprints filled by status \u2014 red for affected critical, orange for affected residential, white for unaffected."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import matplotlib.patches as mpatches\n",
"import matplotlib.pyplot as plt\n",
"\n",
"buildings_pdf = buildings.selectExpr(\"bid\", \"type\", \"ST_AsText(geom) as wkt\").toPandas()\n",
"affected_ids = set(r[\"bid\"] for r in affected.select(\"bid\").collect())\n",
"critical_affected_ids = set(\n",
" r[\"bid\"] for r in affected.where(\"is_critical\").select(\"bid\").collect()\n",
")\n",
"\n",
"flood_wkt = sedona.sql(\"SELECT ST_AsText(geom) as wkt FROM flood\").first()[\"wkt\"]\n",
"\n",
"fig, ax = plt.subplots(1, 1, figsize=(7, 7))\n",
"extent = (AOI[0], AOI[2], AOI[1], AOI[3])\n",
"ax.imshow(backscatter, vmin=0, vmax=1, cmap=\"gray\", extent=extent)\n",
"\n",
"from shapely import wkt as shapely_wkt\n",
"\n",
"flood_shapely = shapely_wkt.loads(flood_wkt)\n",
"for poly in (\n",
" flood_shapely.geoms if hasattr(flood_shapely, \"geoms\") else [flood_shapely]\n",
"):\n",
" xs_p, ys_p = poly.exterior.xy\n",
" ax.fill(\n",
" xs_p, ys_p, facecolor=\"#268bd2\", edgecolor=\"#073642\", linewidth=1, alpha=0.5\n",
" )\n",
"\n",
"for _, row in buildings_pdf.iterrows():\n",
" bbox = row[\"wkt\"].split(\"((\")[1].split(\"))\")[0]\n",
" pts = [tuple(float(x) for x in p.strip().split()) for p in bbox.split(\",\")]\n",
" xs_b = [p[0] for p in pts]\n",
" ys_b = [p[1] for p in pts]\n",
" if row[\"bid\"] in critical_affected_ids:\n",
" face = \"#dc322f\" # red\n",
" elif row[\"bid\"] in affected_ids:\n",
" face = \"#cb4b16\" # orange\n",
" else:\n",
" face = \"#fdf6e3\" # cream\n",
" ax.fill(xs_b, ys_b, facecolor=face, edgecolor=\"black\", linewidth=0.6)\n",
" ax.text(\n",
" sum(xs_b[:-1]) / 4,\n",
" sum(ys_b[:-1]) / 4,\n",
" f\"{row['bid']}\\n{row['type'][:4]}\",\n",
" ha=\"center\",\n",
" va=\"center\",\n",
" fontsize=6,\n",
" )\n",
"\n",
"ax.set_title(\n",
" f\"Flood snapshot: {len(affected_ids)} buildings affected, \"\n",
" f\"{len(critical_affected_ids)} critical\"\n",
")\n",
"ax.set_xlabel(\"longitude\")\n",
"ax.set_ylabel(\"latitude\")\n",
"ax.legend(\n",
" handles=[\n",
" mpatches.Patch(facecolor=\"#268bd2\", alpha=0.5, label=\"flood extent\"),\n",
" mpatches.Patch(facecolor=\"#dc322f\", label=\"affected critical\"),\n",
" mpatches.Patch(facecolor=\"#cb4b16\", label=\"affected residential\"),\n",
" mpatches.Patch(facecolor=\"#fdf6e3\", edgecolor=\"black\", label=\"unaffected\"),\n",
" ],\n",
" loc=\"upper right\",\n",
" fontsize=8,\n",
")\n",
"fig.tight_layout()\n",
"fig"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## What just happened?\n",
"\n",
"An end-to-end disaster-response pipeline in a handful of SQL passes:\n",
"\n",
"1. Synthesized a SAR-VV-style backscatter raster.\n",
"2. Thresholded with single-raster `RS_MapAlgebra` to derive a binary water mask.\n",
"3. Vectorized the mask via `RS_PixelAsPolygons` + `explode` + `ST_Union_Agg` and absorbed speckle with `ST_Buffer` \u2014 the canonical raster\u2192vector dissolve in Sedona.\n",
"4. Built a Spark DataFrame of building footprints labelled with infrastructure type.\n",
"5. `ST_Intersects` spatial join surfaced the buildings inside the flood; a `type IN (...)` flag marked the critical ones.\n",
"6. Persisted to GeoParquet 1.1 (auto covering-bbox + projjson CRS), read back, plotted everything on a single matplotlib axes.\n",
"\n",
"Replace the synthesis cell in section 2 with `sedona.read.format(\"raster\").load(\"<sentinel-1 vv asset URL discovered via sedona.read.format('stac')>\")` and the rest of the pipeline runs unchanged on real Copernicus data."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Loading