diff --git a/docs/usecases/04-flood-snapshot.ipynb b/docs/usecases/04-flood-snapshot.ipynb new file mode 100644 index 00000000000..f37abd7a46a --- /dev/null +++ b/docs/usecases/04-flood-snapshot.ipynb @@ -0,0 +1,368 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": "\n\n# Buildings inside a flood extent \u2014 and which are critical infrastructure\n\nAn end-to-end disaster-response workflow. We answer:\n\n> **A flood happened over this AOI. Which buildings are inside the flood extent, and how many of them are hospitals, schools, or fire stations?**\n\n**Pipeline:**\n1. Synthesize a Sentinel-1-VV-style backscatter raster (low values where water sits flat \u2014 that's the flood signature SAR sees).\n2. Threshold with `RS_MapAlgebra` \u2192 binary water mask raster.\n3. Vectorize the mask: `RS_PixelAsPolygons` + `explode` + filter + `ST_Union_Agg` \u2192 a single dissolved flood polygon.\n4. `ST_Buffer` slightly to absorb SAR speckle.\n5. Synthesize building footprints labelled with infrastructure type (`residential` / `hospital` / `school` / `fire_station`).\n6. `ST_Intersects` spatial join to find affected buildings; flag any affected building whose `type IN ('hospital', 'school', 'fire_station')` as critical.\n7. Write the affected-buildings table as **GeoParquet 1.1** (auto covering-bbox + projjson CRS), read back, plot.\n\nAll inputs are synthesized in the notebook so it runs offline. To replace the synthesis cell with real Sentinel-1 imagery in production, point Sedona's [STAC reader](../tutorial/files/stac-sedona-spark.md) at an `earth-search` collection and feed the chosen scene's VV-polarization asset URL into the same `sedona.read.format(\"raster\")` call below.\n\n**Requires Sedona \u2265 1.9.0** for the auto-tiling raster reader (GH-2672) and the GeoParquet 1.1 writer's auto-populated covering-bbox + projjson CRS metadata (GH-2646, GH-2664)." + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 1. Connect to Spark through SedonaContext" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sedona.spark import SedonaContext\n", + "\n", + "config = SedonaContext.builder().master(\"spark://localhost:7077\").getOrCreate()\n", + "sedona = SedonaContext.create(config)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2. Synthesize a Sentinel-1-VV-style backscatter raster\n", + "\n", + "Sentinel-1 SAR over flat water returns very low backscatter (water specularly reflects radar away from the satellite); over land the surface scatters energy back. We simulate that contrast with a single-band `float32` raster: high values everywhere except a circular flood patch in the south-west quadrant which we drop to low values. Written as a tiled GeoTIFF so the Sedona raster reader accepts it." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "\n", + "import numpy as np\n", + "import rasterio\n", + "from rasterio.transform import from_bounds\n", + "\n", + "WORK = \"/tmp/flood-snapshot\"\n", + "os.makedirs(WORK, exist_ok=True)\n", + "\n", + "AOI = (-91.10, 41.50, -91.00, 41.60)\n", + "W = H = 128\n", + "transform = from_bounds(*AOI, W, H)\n", + "rng = np.random.default_rng(11)\n", + "\n", + "ys, xs = np.mgrid[0:H, 0:W]\n", + "flood_mask = ((xs - 32) ** 2 + (ys - 96) ** 2) < 28**2 # circle in SW corner\n", + "backscatter = 0.5 + 0.05 * rng.standard_normal((H, W)) # land\n", + "backscatter = np.where(\n", + " flood_mask, 0.05 + 0.02 * rng.standard_normal((H, W)), backscatter\n", + ")\n", + "backscatter = backscatter.clip(0, 1).astype(\"float32\")\n", + "\n", + "scene_path = f\"{WORK}/sentinel1_vv.tif\"\n", + "with rasterio.open(\n", + " scene_path,\n", + " \"w\",\n", + " driver=\"GTiff\",\n", + " tiled=True,\n", + " blockxsize=128,\n", + " blockysize=128,\n", + " height=H,\n", + " width=W,\n", + " count=1,\n", + " dtype=\"float32\",\n", + " crs=\"EPSG:4326\",\n", + " transform=transform,\n", + ") as dst:\n", + " dst.write(backscatter, 1)\n", + " dst.set_band_description(1, \"vv\")\n", + "\n", + "print(\n", + " f\"backscatter min={backscatter.min():.2f} max={backscatter.max():.2f} \"\n", + " f\"flood pixels={int(flood_mask.sum())}/{H*W}\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3. Threshold the backscatter into a water-mask raster\n", + "\n", + "Single-raster `RS_MapAlgebra` with a comparison expression: every pixel whose VV backscatter is below `0.2` becomes 1 (water), every other pixel becomes 0 (dry). The result is a `uint8` mask raster the same size and CRS as the input." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "scene = sedona.read.format(\"raster\").load(scene_path)\n", + "scene.createOrReplaceTempView(\"scene\")\n", + "\n", + "mask = sedona.sql(\"\"\"\n", + " SELECT x, y,\n", + " RS_MapAlgebra(\n", + " rast, 'B',\n", + " 'out[0] = rast[0] < 0.2 ? 1 : 0;'\n", + " ) AS rast\n", + " FROM scene\n", + "\"\"\")\n", + "mask.cache()\n", + "mask.createOrReplaceTempView(\"mask\")\n", + "mask.selectExpr(\n", + " \"x\",\n", + " \"y\",\n", + " \"RS_BandPixelType(rast, 1) as dtype\",\n", + " \"RS_Width(rast) as w\",\n", + " \"RS_Height(rast) as h\",\n", + ").show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 4. Vectorize the mask into a single dissolved flood polygon\n", + "\n", + "`RS_PixelAsPolygons(raster, band)` returns an array of `{geom, val, x, y}` structs \u2014 one per pixel. We `explode` that into rows, keep only the wet pixels (`val = 1`), and dissolve them into a single `MultiPolygon` via `ST_Union_Agg`. A small `ST_Buffer` (0.0005\u00b0, ~50 m) absorbs SAR speckle along the edge of the flood." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": "wet_pixels = sedona.sql(\"\"\"\n SELECT explode(RS_PixelAsPolygons(rast, 1)) AS pixel\n FROM mask\n\"\"\").selectExpr(\"pixel.geom AS geom\", \"pixel.value AS value\").where(\"value = 1\")\nwet_pixels.createOrReplaceTempView(\"wet_pixels\")\n\nflood = sedona.sql(\"\"\"\n SELECT ST_Buffer(ST_Union_Agg(geom), 0.0005) AS geom\n FROM wet_pixels\n\"\"\")\nflood_geom = flood.first()[\"geom\"]\nprint(\n f\"flood polygon: type={flood_geom.geom_type}, \"\n f\"area={flood_geom.area:.6f} deg\u00b2, bounds={tuple(round(b, 4) for b in flood_geom.bounds)}\"\n)\n\nflood.createOrReplaceTempView(\"flood\")" + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": "## 4. Vectorize the mask into a single dissolved flood polygon\n\n`RS_PixelAsPolygons(raster, band)` returns an array of `{geom, value, x, y}` structs \u2014 one per pixel. We `explode` that into rows, keep only the wet pixels (`value = 1`), and dissolve them into a single `MultiPolygon` via `ST_Union_Agg`. A small `ST_Buffer` (0.0005\u00b0, ~50 m) absorbs SAR speckle along the edge of the flood." + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.sql import Row\n", + "\n", + "xmin, ymin, xmax, ymax = AOI\n", + "step_x = (xmax - xmin) / 4\n", + "step_y = (ymax - ymin) / 4\n", + "half = 0.00125\n", + "types_grid = [\n", + " [\"residential\", \"hospital\", \"residential\", \"school\"],\n", + " [\"residential\", \"residential\", \"fire_station\", \"residential\"],\n", + " [\"school\", \"residential\", \"residential\", \"hospital\"],\n", + " [\"residential\", \"fire_station\", \"residential\", \"residential\"],\n", + "]\n", + "\n", + "building_rows = []\n", + "for ix in range(4):\n", + " for iy in range(4):\n", + " cx = xmin + (ix + 0.5) * step_x\n", + " cy = ymin + (iy + 0.5) * step_y\n", + " wkt = (\n", + " f\"POLYGON(({cx-half} {cy-half}, {cx+half} {cy-half}, \"\n", + " f\"{cx+half} {cy+half}, {cx-half} {cy+half}, {cx-half} {cy-half}))\"\n", + " )\n", + " building_rows.append(Row(bid=f\"B{ix}{iy}\", type=types_grid[iy][ix], wkt=wkt))\n", + "\n", + "buildings = sedona.createDataFrame(building_rows).selectExpr(\n", + " \"bid\", \"type\", \"ST_SetSRID(ST_GeomFromText(wkt), 4326) as geom\"\n", + ")\n", + "buildings.createOrReplaceTempView(\"buildings\")\n", + "buildings.show(16)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 6. Find affected buildings + flag critical infrastructure\n", + "\n", + "`ST_Intersects(building, flood)` is the canonical \"affected by\" test. We add a `is_critical` flag for any affected building whose type is hospital / school / fire_station \u2014 those are the rows an emergency-management dashboard would surface first." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "affected = sedona.sql(\"\"\"\n", + " SELECT b.bid,\n", + " b.type,\n", + " b.type IN ('hospital', 'school', 'fire_station') AS is_critical,\n", + " b.geom\n", + " FROM buildings b, flood f\n", + " WHERE ST_Intersects(b.geom, f.geom)\n", + " ORDER BY is_critical DESC, b.bid\n", + "\"\"\")\n", + "affected.cache()\n", + "affected.createOrReplaceTempView(\"affected\")\n", + "affected.select(\"bid\", \"type\", \"is_critical\").show()\n", + "\n", + "summary = sedona.sql(\"\"\"\n", + " SELECT COUNT(*) AS affected_buildings,\n", + " SUM(CAST(is_critical AS INT)) AS affected_critical\n", + " FROM affected\n", + "\"\"\")\n", + "summary.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 7. Persist the affected buildings as GeoParquet 1.1\n", + "\n", + "Auto-populated covering-bbox metadata + projjson CRS (derived from SRID 4326) make the result file ready for downstream tools to filter on bbox without scanning every row. Round-trip read-back confirms the metadata lands." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import shutil\n", + "\n", + "out_path = f\"{WORK}/affected_buildings.parquet\"\n", + "if os.path.exists(out_path):\n", + " shutil.rmtree(out_path)\n", + "affected.coalesce(1).write.format(\"geoparquet\").save(out_path)\n", + "\n", + "sedona.read.format(\"geoparquet\").load(out_path).select(\n", + " \"bid\", \"type\", \"is_critical\"\n", + ").show(truncate=False)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 8. Visualize: backscatter + flood polygon + buildings color-coded\n", + "\n", + "Single matplotlib axes: VV backscatter as the basemap, the dissolved flood polygon overlaid in blue, building footprints filled by status \u2014 red for affected critical, orange for affected residential, white for unaffected." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import matplotlib.patches as mpatches\n", + "import matplotlib.pyplot as plt\n", + "\n", + "buildings_pdf = buildings.selectExpr(\"bid\", \"type\", \"ST_AsText(geom) as wkt\").toPandas()\n", + "affected_ids = set(r[\"bid\"] for r in affected.select(\"bid\").collect())\n", + "critical_affected_ids = set(\n", + " r[\"bid\"] for r in affected.where(\"is_critical\").select(\"bid\").collect()\n", + ")\n", + "\n", + "flood_wkt = sedona.sql(\"SELECT ST_AsText(geom) as wkt FROM flood\").first()[\"wkt\"]\n", + "\n", + "fig, ax = plt.subplots(1, 1, figsize=(7, 7))\n", + "extent = (AOI[0], AOI[2], AOI[1], AOI[3])\n", + "ax.imshow(backscatter, vmin=0, vmax=1, cmap=\"gray\", extent=extent)\n", + "\n", + "from shapely import wkt as shapely_wkt\n", + "\n", + "flood_shapely = shapely_wkt.loads(flood_wkt)\n", + "for poly in (\n", + " flood_shapely.geoms if hasattr(flood_shapely, \"geoms\") else [flood_shapely]\n", + "):\n", + " xs_p, ys_p = poly.exterior.xy\n", + " ax.fill(\n", + " xs_p, ys_p, facecolor=\"#268bd2\", edgecolor=\"#073642\", linewidth=1, alpha=0.5\n", + " )\n", + "\n", + "for _, row in buildings_pdf.iterrows():\n", + " bbox = row[\"wkt\"].split(\"((\")[1].split(\"))\")[0]\n", + " pts = [tuple(float(x) for x in p.strip().split()) for p in bbox.split(\",\")]\n", + " xs_b = [p[0] for p in pts]\n", + " ys_b = [p[1] for p in pts]\n", + " if row[\"bid\"] in critical_affected_ids:\n", + " face = \"#dc322f\" # red\n", + " elif row[\"bid\"] in affected_ids:\n", + " face = \"#cb4b16\" # orange\n", + " else:\n", + " face = \"#fdf6e3\" # cream\n", + " ax.fill(xs_b, ys_b, facecolor=face, edgecolor=\"black\", linewidth=0.6)\n", + " ax.text(\n", + " sum(xs_b[:-1]) / 4,\n", + " sum(ys_b[:-1]) / 4,\n", + " f\"{row['bid']}\\n{row['type'][:4]}\",\n", + " ha=\"center\",\n", + " va=\"center\",\n", + " fontsize=6,\n", + " )\n", + "\n", + "ax.set_title(\n", + " f\"Flood snapshot: {len(affected_ids)} buildings affected, \"\n", + " f\"{len(critical_affected_ids)} critical\"\n", + ")\n", + "ax.set_xlabel(\"longitude\")\n", + "ax.set_ylabel(\"latitude\")\n", + "ax.legend(\n", + " handles=[\n", + " mpatches.Patch(facecolor=\"#268bd2\", alpha=0.5, label=\"flood extent\"),\n", + " mpatches.Patch(facecolor=\"#dc322f\", label=\"affected critical\"),\n", + " mpatches.Patch(facecolor=\"#cb4b16\", label=\"affected residential\"),\n", + " mpatches.Patch(facecolor=\"#fdf6e3\", edgecolor=\"black\", label=\"unaffected\"),\n", + " ],\n", + " loc=\"upper right\",\n", + " fontsize=8,\n", + ")\n", + "fig.tight_layout()\n", + "fig" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## What just happened?\n", + "\n", + "An end-to-end disaster-response pipeline in a handful of SQL passes:\n", + "\n", + "1. Synthesized a SAR-VV-style backscatter raster.\n", + "2. Thresholded with single-raster `RS_MapAlgebra` to derive a binary water mask.\n", + "3. Vectorized the mask via `RS_PixelAsPolygons` + `explode` + `ST_Union_Agg` and absorbed speckle with `ST_Buffer` \u2014 the canonical raster\u2192vector dissolve in Sedona.\n", + "4. Built a Spark DataFrame of building footprints labelled with infrastructure type.\n", + "5. `ST_Intersects` spatial join surfaced the buildings inside the flood; a `type IN (...)` flag marked the critical ones.\n", + "6. Persisted to GeoParquet 1.1 (auto covering-bbox + projjson CRS), read back, plotted everything on a single matplotlib axes.\n", + "\n", + "Replace the synthesis cell in section 2 with `sedona.read.format(\"raster\").load(\"\")` and the rest of the pipeline runs unchanged on real Copernicus data." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}