From 09b2eb102be7776bcd040353e61e1f7aef3e89cf Mon Sep 17 00:00:00 2001
From: Damian Wysokinski
Date: Mon, 1 Dec 2025 22:26:23 +0100
Subject: [PATCH 1/8] add devcontainer functionality for api

---
 .devcontainer/api/Dockerfile        | 62 +++++++++++++++++++
 .devcontainer/api/README.md         | 95 +++++++++++++++++++++++++++++
 .devcontainer/api/devcontainer.json | 92 ++++++++++++++++++++++++++++
 .devcontainer/api/launch.json       | 51 ++++++++++++++++
 .devcontainer/api/tasks.json        | 36 +++++++++++
 5 files changed, 336 insertions(+)
 create mode 100644 .devcontainer/api/Dockerfile
 create mode 100644 .devcontainer/api/README.md
 create mode 100644 .devcontainer/api/devcontainer.json
 create mode 100644 .devcontainer/api/launch.json
 create mode 100644 .devcontainer/api/tasks.json

diff --git a/.devcontainer/api/Dockerfile b/.devcontainer/api/Dockerfile
new file mode 100644
index 0000000..edfee93
--- /dev/null
+++ b/.devcontainer/api/Dockerfile
@@ -0,0 +1,62 @@
+# Use Python base image for development - using Debian Bookworm (stable)
+FROM mcr.microsoft.com/devcontainers/python:3.10-bookworm
+
+# Set timezone
+ENV TZ="Europe/Helsinki"
+ENV PYTHONUNBUFFERED=1
+
+# Install system dependencies including GDAL for geopandas
+RUN apt-get update && apt-get install -y \
+    postgresql-client \
+    curl \
+    wget \
+    git \
+    gdal-bin \
+    libgdal-dev \
+    libgeos-dev \
+    libproj-dev \
+    libspatialindex-dev \
+    build-essential \
+    && apt-get clean -y \
+    && rm -rf /var/lib/apt/lists/*
+
+# Install Azure Functions Core Tools (only on AMD64, skip on ARM64/Apple Silicon)
+RUN ARCH=$(dpkg --print-architecture) && \
+    if [ "$ARCH" = "amd64" ]; then \
+        curl https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor > microsoft.gpg && \
+        mv microsoft.gpg /etc/apt/trusted.gpg.d/microsoft.gpg && \
+        sh -c 'echo "deb [arch=amd64] https://packages.microsoft.com/debian/12/prod bookworm main" > /etc/apt/sources.list.d/dotnetdev.list' && \
+        apt-get update && \
+        apt-get install -y azure-functions-core-tools-4 && \
+        apt-get clean -y && \
+        rm -rf /var/lib/apt/lists/*; \
+    else \
+        echo "Azure Functions Core Tools not available for $ARCH architecture. Skipping installation."; \
+    fi
+
+# Set working directory
+WORKDIR /workspace
+
+# Install Python dependencies
+COPY python/requirements.txt /tmp/requirements.txt
+
+# Set GDAL environment variables for building Python packages
+ENV GDAL_CONFIG=/usr/bin/gdal-config
+ENV CPLUS_INCLUDE_PATH=/usr/include/gdal
+ENV C_INCLUDE_PATH=/usr/include/gdal
+
+RUN pip install --upgrade pip \
+    && pip install debugpy pytest pytest-asyncio httpx \
+    && pip install -r /tmp/requirements.txt
+
+# Create .vscode-server directories with correct permissions
+RUN mkdir -p /home/vscode/.vscode-server/bin \
+    && mkdir -p /home/vscode/.vscode-server/data/Machine \
+    && mkdir -p /home/vscode/.vscode-server/extensions \
+    && mkdir -p /home/vscode/.vscode-server-insiders/bin \
+    && mkdir -p /home/vscode/.vscode-server-insiders/extensions \
+    && chown -R vscode:vscode /home/vscode
+
+# The vscode user already exists in the base image
+# Switch to non-root user
+USER vscode
diff --git a/.devcontainer/api/README.md b/.devcontainer/api/README.md
new file mode 100644
index 0000000..ee2aab9
--- /dev/null
+++ b/.devcontainer/api/README.md
@@ -0,0 +1,95 @@
+# HFP Analytics API Development Container
+
+This devcontainer provides a complete development environment for the HFP Analytics FastAPI application.
+ +## What's Included + +- **Python 3.10** with all project dependencies +- **Azure Functions Core Tools** for Azure development (AMD64 only) +- **PostgreSQL client** for database operations +- **TimescaleDB** database instance +- **Azurite** for local Azure Storage emulation +- **VS Code extensions** for Python, Azure, Docker, and more +- **ARM64 (Apple Silicon) support** - works on M1/M2/M3 Macs + +## Getting Started + +1. **Open in Dev Container** + - Open this folder in VS Code + - Press `F1` and select "Dev Containers: Reopen in Container" + - Wait for the container to build and start + +2. **Run the FastAPI App** + ```bash + cd python + uvicorn api.main:app --host 0.0.0.0 --port 8000 --reload + ``` + + Or use the VS Code task: `Terminal > Run Task > Run FastAPI Development Server` + +3. **Access the Application** + - FastAPI: http://localhost:8000 + - API Docs: http://localhost:8000/docs + - PostgreSQL: localhost:5433 (user: postgres, password: postgres) + - Azurite Blob: http://localhost:10100 + - Azurite Queue: http://localhost:10101 + - Azurite Table: http://localhost:10102 + +## Debugging + +Use the built-in debugger: +1. Set breakpoints in your code +2. Press `F5` or go to "Run and Debug" +3. Select "Python: FastAPI" configuration + +## Database Setup + +Initialize the database schema: +```bash +psql -h localhost -U postgres -d analytics -f db/sql/100_create_global_objects.sql +``` + +Or use the VS Code task: `Terminal > Run Task > Database: Run Migrations` + +## Running Tests + +```bash +cd python +pytest +``` + +Or use the VS Code task: `Terminal > Run Task > Run Tests` + +## Tips + +- All changes to Python files trigger auto-reload +- Use `.env` file for environment variables +- The workspace is mounted at `/workspace` +- Extensions and settings are pre-configured + +## Ports + +| Service | Port | Description | +|---------|------|-------------| +| FastAPI | 8000 | Main API server | +| PostgreSQL | 5433 | TimescaleDB database (remapped to avoid conflicts) | +| Azurite Blob | 10100 | Azure Blob Storage emulator (remapped) | +| Azurite Queue | 10101 | Azure Queue Storage emulator (remapped) | +| Azurite Table | 10102 | Azure Table Storage emulator (remapped) | + +## Troubleshooting + +**Container won't start:** +- Make sure Docker is running +- Check if ports 5433, 8000, 10100, 10101, 10102 are available +- Port 5433 is used instead of 5432 to avoid conflicts with local PostgreSQL +- Ports 10100-10102 are used for Azurite to avoid conflicts with main docker-compose +- Try rebuilding: `F1 > Dev Containers: Rebuild Container` + +**Database connection issues:** +- Wait for the database health check to pass (about 10-15 seconds) +- Check connection string in `.env` file + +**Import errors:** +- Make sure PYTHONPATH is set correctly: `export PYTHONPATH=/workspace/python` +- Reinstall dependencies: `pip install -r python/requirements.txt` diff --git a/.devcontainer/api/devcontainer.json b/.devcontainer/api/devcontainer.json new file mode 100644 index 0000000..014000a --- /dev/null +++ b/.devcontainer/api/devcontainer.json @@ -0,0 +1,92 @@ +{ + "name": "HFP Analytics API (FastAPI)", + "dockerComposeFile": [ + "../docker-compose.yml" + ], + "service": "api", + "workspaceFolder": "/workspace", + "shutdownAction": "stopCompose", + + // Forward ports for the API, database, and Azurite + "forwardPorts": [ + 8000, // FastAPI + 5433, // PostgreSQL (remapped to avoid conflict with host) + 10100, // Azurite Blob (remapped to avoid conflict) + 10101, // Azurite Queue (remapped to avoid conflict) 
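// docker-compose publishes Azurite's native ports 10000-10002 as 10100-10102 on the host
+    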
+ 10102 // Azurite Table (remapped to avoid conflict) + ], + + "portsAttributes": { + "8000": { + "label": "FastAPI", + "onAutoForward": "notify" + }, + "5433": { + "label": "PostgreSQL" + }, + "10100": { + "label": "Azurite Blob" + }, + "10101": { + "label": "Azurite Queue" + }, + "10102": { + "label": "Azurite Table" + } + }, + + "customizations": { + "vscode": { + "extensions": [ + "ms-python.python", + "ms-python.vscode-pylance", + "ms-python.black-formatter", + "ms-python.isort", + "ms-python.debugpy", + "ms-azuretools.vscode-azurefunctions", + "ms-azuretools.vscode-docker", + "mtxr.sqltools", + "mtxr.sqltools-driver-pg", + "GitHub.copilot", + "esbenp.prettier-vscode" + ], + "settings": { + "python.defaultInterpreterPath": "/usr/local/bin/python", + "python.linting.enabled": true, + "python.linting.pylintEnabled": false, + "python.formatting.provider": "black", + "python.testing.pytestEnabled": true, + "python.testing.unittestEnabled": false, + "editor.formatOnSave": true, + "editor.codeActionsOnSave": { + "source.organizeImports": "explicit" + }, + "[python]": { + "editor.defaultFormatter": "ms-python.python" + } + } + } + }, + + // Commands to run after container is created + "postCreateCommand": "pip install -r python/requirements.txt && pip install debugpy pytest pytest-asyncio httpx", + + // Commands to run when attaching to existing container + "postAttachCommand": "echo 'Container attached. Run: cd python && uvicorn api.main:app --host 0.0.0.0 --port 8000 --reload'", + + // Features to add to the container + "features": { + "ghcr.io/devcontainers/features/git:1": {}, + "ghcr.io/devcontainers/features/github-cli:1": {} + }, + + // Set environment variables + "containerEnv": { + "PYTHONPATH": "/workspace/python", + "PYTHONUNBUFFERED": "1", + "TZ": "Europe/Helsinki" + }, + + // Run as non-root user + "remoteUser": "vscode" +} \ No newline at end of file diff --git a/.devcontainer/api/launch.json b/.devcontainer/api/launch.json new file mode 100644 index 0000000..74444f3 --- /dev/null +++ b/.devcontainer/api/launch.json @@ -0,0 +1,51 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Python: FastAPI", + "type": "debugpy", + "request": "launch", + "module": "uvicorn", + "args": [ + "api.main:app", + "--host", + "0.0.0.0", + "--port", + "8000", + "--reload" + ], + "cwd": "${workspaceFolder}/python", + "env": { + "PYTHONPATH": "${workspaceFolder}/python" + }, + "jinja": true, + "justMyCode": false + }, + { + "name": "Python: Current File", + "type": "debugpy", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal", + "cwd": "${workspaceFolder}/python", + "env": { + "PYTHONPATH": "${workspaceFolder}/python" + } + }, + { + "name": "Python: Pytest", + "type": "debugpy", + "request": "launch", + "module": "pytest", + "args": [ + "-v", + "${file}" + ], + "console": "integratedTerminal", + "cwd": "${workspaceFolder}/python", + "env": { + "PYTHONPATH": "${workspaceFolder}/python" + } + } + ] +} diff --git a/.devcontainer/api/tasks.json b/.devcontainer/api/tasks.json new file mode 100644 index 0000000..934352d --- /dev/null +++ b/.devcontainer/api/tasks.json @@ -0,0 +1,36 @@ +{ + "version": "2.0.0", + "tasks": [ + { + "label": "Run FastAPI Development Server", + "type": "shell", + "command": "cd python && uvicorn api.main:app --host 0.0.0.0 --port 8000 --reload", + "problemMatcher": [], + "presentation": { + "reveal": "always", + "panel": "new" + }, + "isBackground": true, + "group": { + "kind": "build", + "isDefault": true + } + }, + { + "label": "Run 
Tests", + "type": "shell", + "command": "cd python && pytest", + "problemMatcher": [], + "presentation": { + "reveal": "always", + "panel": "shared" + } + }, + { + "label": "Database: Run Migrations", + "type": "shell", + "command": "psql -h localhost -U postgres -d analytics -f db/sql/100_create_global_objects.sql", + "problemMatcher": [] + } + ] +} From ac1c3f0fa6a96393054949ebafb9c875907cfef3 Mon Sep 17 00:00:00 2001 From: Damian Wysokinski Date: Tue, 2 Dec 2025 09:37:29 +0100 Subject: [PATCH 2/8] added docker-compose for devcontainer --- .devcontainer/docker-compose.yml | 56 ++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 .devcontainer/docker-compose.yml diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml new file mode 100644 index 0000000..7b500ec --- /dev/null +++ b/.devcontainer/docker-compose.yml @@ -0,0 +1,56 @@ +services: + api: + build: + context: .. + dockerfile: .devcontainer/api/Dockerfile + volumes: + # Mount the workspace + - ..:/workspace:cached + # Mount vscode server extensions for persistence + - vscode-server-extensions:/home/vscode/.vscode-server/extensions + - vscode-server-insiders-extensions:/home/vscode/.vscode-server-insiders/extensions + command: sleep infinity + network_mode: service:db + depends_on: + - db + - azurite + environment: + - PYTHONUNBUFFERED=1 + - TZ=Europe/Helsinki + env_file: + - ../.env + + db: + image: timescale/timescaledb:latest-pg16 + restart: unless-stopped + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_DB: analytics + ports: + - "127.0.0.1:5433:5432" + volumes: + - postgres-data:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres"] + interval: 5s + start_period: 5s + timeout: 20s + retries: 3 + + azurite: + image: mcr.microsoft.com/azure-storage/azurite + restart: unless-stopped + ports: + - "127.0.0.1:10100:10000" + - "127.0.0.1:10101:10001" + - "127.0.0.1:10102:10002" + volumes: + - azurite-data:/data + command: azurite --blobHost 0.0.0.0 --queueHost 0.0.0.0 --tableHost 0.0.0.0 + +volumes: + postgres-data: + azurite-data: + vscode-server-extensions: + vscode-server-insiders-extensions: From f61dba84229abd0e5dde5e588bece9ee3eaa51e5 Mon Sep 17 00:00:00 2001 From: Damian Wysokinski Date: Tue, 2 Dec 2025 13:14:35 +0100 Subject: [PATCH 3/8] added orchestrator config --- .devcontainer/api/devcontainer.json | 10 +- .devcontainer/docker-compose.yml | 25 ++- .devcontainer/importer/Dockerfile | 71 ++++++++ .devcontainer/importer/README.md | 208 +++++++++++++++++++++++ .devcontainer/importer/devcontainer.json | 105 ++++++++++++ .devcontainer/importer/launch.json | 99 +++++++++++ .devcontainer/importer/tasks.json | 86 ++++++++++ 7 files changed, 602 insertions(+), 2 deletions(-) create mode 100644 .devcontainer/importer/Dockerfile create mode 100644 .devcontainer/importer/README.md create mode 100644 .devcontainer/importer/devcontainer.json create mode 100644 .devcontainer/importer/launch.json create mode 100644 .devcontainer/importer/tasks.json diff --git a/.devcontainer/api/devcontainer.json b/.devcontainer/api/devcontainer.json index 014000a..ec8ef54 100644 --- a/.devcontainer/api/devcontainer.json +++ b/.devcontainer/api/devcontainer.json @@ -63,13 +63,21 @@ }, "[python]": { "editor.defaultFormatter": "ms-python.python" + }, + "launch": { + "configurations": [], + "compounds": [] + }, + "tasks": { + "version": "2.0.0", + "tasks": [] } } } }, // Commands to run after container is created - "postCreateCommand": 
"pip install -r python/requirements.txt && pip install debugpy pytest pytest-asyncio httpx", + "postCreateCommand": "pip install -r python/requirements.txt && pip install debugpy pytest pytest-asyncio httpx && mkdir -p /workspace/.vscode && cp /workspace/.devcontainer/shared/launch.json /workspace/.vscode/launch.json && cp /workspace/.devcontainer/shared/tasks.json /workspace/.vscode/tasks.json", // Commands to run when attaching to existing container "postAttachCommand": "echo 'Container attached. Run: cd python && uvicorn api.main:app --host 0.0.0.0 --port 8000 --reload'", diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml index 7b500ec..57b8b3d 100644 --- a/.devcontainer/docker-compose.yml +++ b/.devcontainer/docker-compose.yml @@ -10,7 +10,6 @@ services: - vscode-server-extensions:/home/vscode/.vscode-server/extensions - vscode-server-insiders-extensions:/home/vscode/.vscode-server-insiders/extensions command: sleep infinity - network_mode: service:db depends_on: - db - azurite @@ -20,6 +19,28 @@ services: env_file: - ../.env + importer: + build: + context: .. + dockerfile: .devcontainer/importer/Dockerfile + volumes: + # Mount the workspace + - ..:/workspace:cached + # Mount vscode server extensions for persistence + - vscode-server-extensions-importer:/home/azurefunc/.vscode-server/extensions + - vscode-server-insiders-extensions-importer:/home/azurefunc/.vscode-server-insiders/extensions + command: sleep infinity + depends_on: + - db + - azurite + environment: + - PYTHONUNBUFFERED=1 + - TZ=Europe/Helsinki + - AzureWebJobsScriptRoot=/home/site/wwwroot + - WEBSITE_HOSTNAME=localhost:7072 + env_file: + - ../.env + db: image: timescale/timescaledb:latest-pg16 restart: unless-stopped @@ -54,3 +75,5 @@ volumes: azurite-data: vscode-server-extensions: vscode-server-insiders-extensions: + vscode-server-extensions-importer: + vscode-server-insiders-extensions-importer: diff --git a/.devcontainer/importer/Dockerfile b/.devcontainer/importer/Dockerfile new file mode 100644 index 0000000..ece17eb --- /dev/null +++ b/.devcontainer/importer/Dockerfile @@ -0,0 +1,71 @@ +# Use Python base image for development - using Debian Bookworm (stable) +FROM mcr.microsoft.com/devcontainers/python:3.10-bookworm + +# Set timezone +ENV TZ="Europe/Helsinki" +ENV PYTHONUNBUFFERED=1 + +# Install system dependencies including GDAL for geopandas +RUN apt-get update && apt-get install -y \ + postgresql-client \ + curl \ + wget \ + git \ + gdal-bin \ + libgdal-dev \ + libgeos-dev \ + libproj-dev \ + libspatialindex-dev \ + build-essential \ + && apt-get clean -y \ + && rm -rf /var/lib/apt/lists/* + +# Install Node.js (needed for Azure Functions Core Tools on ARM64) +RUN curl -fsSL https://deb.nodesource.com/setup_20.x | bash - && \ + apt-get install -y nodejs && \ + apt-get clean -y && \ + rm -rf /var/lib/apt/lists/* + +# Install Azure Functions Core Tools +# - AMD64: Install from Microsoft repository +# - ARM64: Install via npm (official method for ARM/Apple Silicon) +RUN ARCH=$(dpkg --print-architecture) && \ + if [ "$ARCH" = "amd64" ]; then \ + curl https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor > microsoft.gpg && \ + mv microsoft.gpg /etc/apt/trusted.gpg.d/microsoft.gpg && \ + sh -c 'echo "deb [arch=amd64] https://packages.microsoft.com/debian/12/prod bookworm main" > /etc/apt/sources.list.d/dotnetdev.list' && \ + apt-get update && \ + apt-get install -y azure-functions-core-tools-4 && \ + apt-get clean -y && \ + rm -rf /var/lib/apt/lists/*; \ + else \ + echo 
"Installing Azure Functions Core Tools via npm for $ARCH architecture..."; \ + npm install -g azure-functions-core-tools@4 --unsafe-perm true; \ + fi + +# Set working directory +WORKDIR /workspace + +# Install Python dependencies +COPY python/requirements.txt /tmp/requirements.txt + +# Set GDAL environment variables for building Python packages +ENV GDAL_CONFIG=/usr/bin/gdal-config +ENV CPLUS_INCLUDE_PATH=/usr/include/gdal +ENV C_INCLUDE_PATH=/usr/include/gdal + +RUN pip install --upgrade pip \ + && pip install debugpy pytest pytest-asyncio httpx \ + && pip install -r /tmp/requirements.txt + +# Create .vscode-server directories with correct permissions +RUN mkdir -p /home/vscode/.vscode-server/bin \ + && mkdir -p /home/vscode/.vscode-server/data/Machine \ + && mkdir -p /home/vscode/.vscode-server/extensions \ + && mkdir -p /home/vscode/.vscode-server-insiders/bin \ + && mkdir -p /home/vscode/.vscode-server-insiders/extensions \ + && chown -R vscode:vscode /home/vscode + +# The vscode user already exists in the base image +# Switch to non-root user +USER vscode diff --git a/.devcontainer/importer/README.md b/.devcontainer/importer/README.md new file mode 100644 index 0000000..c6c36e5 --- /dev/null +++ b/.devcontainer/importer/README.md @@ -0,0 +1,208 @@ +# HFP Analytics Importer Development Container + +This devcontainer provides a complete development environment for the HFP Analytics Azure Functions (Importer) application. + +## What's Included + +- **Python 3.10** with all project dependencies +- **Azure Functions Core Tools v4** for local Azure Functions development +- **PostgreSQL client** for database operations +- **TimescaleDB** database instance +- **Azurite** for local Azure Storage emulation +- **VS Code extensions** for Python, Azure Functions, Docker, and more +- **ARM64 (Apple Silicon) support** - works on M1/M2/M3 Macs + +## Getting Started + +1. **Open in Dev Container** + - Open the `.devcontainer/importer` folder in VS Code + - Press `F1` and select "Dev Containers: Reopen in Container" + - Wait for the container to build and start + +2. **Start Azure Functions** + + **Method 1: Using VS Code Task (Recommended)** + - Press `Ctrl+Shift+P` (or `Cmd+Shift+P` on Mac) + - Select `Tasks: Run Task` + - Choose `Start Azure Functions` + + **Method 2: Using Terminal** + ```bash + cd python + func start --port 7071 + ``` + + **Method 3: Using the built-in command** + ```bash + cd python + func host start --port 7071 + ``` + +3. 
**Access the Application** + - Azure Functions: http://localhost:7071 + - Function Admin: http://localhost:7071/admin/functions + - PostgreSQL: localhost:5433 (user: postgres, password: postgres) - shared with API + - Azurite Blob: http://localhost:10100 - shared with API + - Azurite Queue: http://localhost:10101 - shared with API + - Azurite Table: http://localhost:10102 - shared with API + +## Available Functions + +The importer includes these Azure Functions: +- **httpPreprocess** - HTTP-triggered preprocessing function +- **importer** - Main import function +- **analyzer** - Analysis function +- **preprocess** - Preprocessing function +- **httpStart** (Durable) - Durable orchestration starter +- **orchestrator** (Durable) - Durable orchestrator +- **reclusterAnalysisActivity** (Durable) - Recluster analysis activity +- **setStatusActivity** (Durable) - Status setter activity +- **getStatusActivity** (Durable) - Status getter activity + +## Testing Functions + +### Test HTTP Function +```bash +# Test httpPreprocess +curl -X POST http://localhost:7071/httpPreprocess + +# Or use VS Code task: +# Ctrl+Shift+P -> Tasks: Run Task -> Trigger HTTP Function (httpPreprocess) +``` + +### View Function Logs +Azure Functions logs appear in the terminal where you ran `func start`. + +## Debugging + +### Debug Azure Functions: +1. Start Azure Functions with the task or terminal command +2. Set breakpoints in your Python code +3. Use `F5` or go to "Run and Debug" +4. Select "Attach to Python Functions" +5. Trigger your function via HTTP, timer, or queue + +### Debug Current File: +1. Open a Python file +2. Set breakpoints +3. Press `F5` +4. Select "Python: Current File" + +## Database Setup + +Initialize the database schema: +```bash +psql -h localhost -U postgres -d analytics -f db/sql/100_create_global_objects.sql +``` + +Or use the VS Code task: `Terminal > Run Task > Database: Run Migrations` + +## Running Tests + +```bash +cd python +pytest +``` + +Or use the VS Code task: `Terminal > Run Task > Run Tests` + +## Azure Functions Development Tips + +1. **Hot Reload**: Azure Functions automatically reload when you save Python files + +2. **View Function List**: + ```bash + cd python + func list + ``` + +3. **Check Function Status**: + ```bash + curl http://localhost:7071/admin/functions + ``` + +4. **Environment Variables**: Configure in `.env` file at workspace root + +5. **Function Configuration**: Each function has a `function.json` in its folder + +6. 
**Host Configuration**: Global settings in `python/host.json`

## Ports

| Service | Port | Description |
|---------|------|-------------|
| Azure Functions | 7071 | Main Functions host |
| PostgreSQL | 5433 | TimescaleDB database (shared with API) |
| Azurite Blob | 10100 | Azure Blob Storage emulator (shared with API) |
| Azurite Queue | 10101 | Azure Queue Storage emulator (shared with API) |
| Azurite Table | 10102 | Azure Table Storage emulator (shared with API) |

## Troubleshooting

**Container won't start:**
- Make sure Docker is running
- Check if ports 5433, 7071, 10100, 10101, 10102 are available
- Database and Azurite are shared with the API container
- Ports are remapped to avoid conflicts with main docker-compose
- Try rebuilding: `F1 > Dev Containers: Rebuild Container`

**Functions won't start:**
- Verify you're in the `python` directory: `cd python`
- Check `host.json` exists in `python/` folder
- Ensure all function folders have `function.json`
- Check Azure Functions Core Tools: `func --version`

**Database connection issues:**
- Wait for the database health check to pass (about 10-15 seconds)
- Check connection string in `.env` file
- Use port 5433 (not 5432) from the host machine

**Import errors:**
- Make sure PYTHONPATH is set: `export PYTHONPATH=/workspace/python`
- Reinstall dependencies: `pip install -r python/requirements.txt`

**Storage connection errors:**
- Check Azurite is running: `docker ps | grep azurite`
- Verify storage connection string in `.env` uses correct ports (10100-10102)
- Default connection string: `DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10100/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10101/devstoreaccount1;TableEndpoint=http://127.0.0.1:10102/devstoreaccount1;`

## Project Structure

```
python/
├── host.json              # Azure Functions host configuration
├── requirements.txt       # Python dependencies
├── importer/              # Importer function
│   ├── function.json
│   └── __init__.py
├── analyzer/              # Analyzer function
├── preprocess/            # Preprocess function
├── httpPreprocess/        # HTTP preprocess function
├── durable/               # Durable functions
│   ├── httpStart/
│   ├── orchestrator/
│   └── *Activity/
└── common/                # Shared code
```

## Useful Commands

```bash
# List all functions
cd python && func list

# Start with verbose logging
cd python && func start --port 7071 --verbose

# Install new dependency
pip install <package-name> && pip freeze > python/requirements.txt

# Run specific test file
cd python && pytest tests/test_importer.py -v

# Check Azure Functions version
func --version

# View function templates
func templates list
```
diff --git a/.devcontainer/importer/devcontainer.json b/.devcontainer/importer/devcontainer.json
new file mode 100644
index 0000000..8985bdb
--- /dev/null
+++ b/.devcontainer/importer/devcontainer.json
@@ -0,0 +1,105 @@
+{
+  "name": "HFP Analytics Importer (Azure Functions)",
+  "dockerComposeFile": [
+    "../docker-compose.yml"
+  ],
+  "service": "importer",
+  "workspaceFolder": "/workspace",
+  "shutdownAction": "stopCompose",
+
+  // Forward ports for Azure Functions, database, and Azurite
+  // Note: Database and Azurite are shared with API container
+  "forwardPorts": [
+    7071,  // Azure Functions
+    5433,  // PostgreSQL (shared with API)
+    10100, // Azurite Blob (shared with API)
+    10101, // Azurite Queue (shared with API)
+    10102  // Azurite 
Table (shared with API) + ], + + "portsAttributes": { + "7071": { + "label": "Azure Functions", + "onAutoForward": "notify" + }, + "5433": { + "label": "PostgreSQL" + }, + "10100": { + "label": "Azurite Blob" + }, + "10101": { + "label": "Azurite Queue" + }, + "10102": { + "label": "Azurite Table" + } + }, + + "customizations": { + "vscode": { + "extensions": [ + "ms-python.python", + "ms-python.vscode-pylance", + "ms-python.black-formatter", + "ms-python.isort", + "ms-python.debugpy", + "ms-azuretools.vscode-azurefunctions", + "ms-azuretools.vscode-docker", + "mtxr.sqltools", + "mtxr.sqltools-driver-pg", + "GitHub.copilot", + "esbenp.prettier-vscode" + ], + "settings": { + "python.defaultInterpreterPath": "/usr/local/bin/python", + "python.linting.enabled": true, + "python.linting.pylintEnabled": false, + "python.formatting.provider": "black", + "python.testing.pytestEnabled": true, + "python.testing.unittestEnabled": false, + "editor.formatOnSave": true, + "editor.codeActionsOnSave": { + "source.organizeImports": "explicit" + }, + "[python]": { + "editor.defaultFormatter": "ms-python.python" + }, + "azureFunctions.deploySubpath": "python", + "azureFunctions.projectRuntime": "~4", + "azureFunctions.projectLanguage": "Python", + "launch": { + "configurations": [], + "compounds": [] + }, + "tasks": { + "version": "2.0.0", + "tasks": [] + } + } + } + }, + + // Commands to run after container is created + "postCreateCommand": "pip install -r python/requirements.txt && pip install debugpy pytest pytest-asyncio httpx && mkdir -p /workspace/.vscode && cp /workspace/.devcontainer/shared/launch.json /workspace/.vscode/launch.json && cp /workspace/.devcontainer/shared/tasks.json /workspace/.vscode/tasks.json && cd /workspace/python && ln -sf durable/httpStart httpStart && ln -sf durable/orchestrator orchestrator && ln -sf durable/getStatusActivity getStatusActivity && ln -sf durable/setStatusActivity setStatusActivity && ln -sf durable/reclusterAnalysisActivity reclusterAnalysisActivity", + + // Commands to run when attaching to existing container + "postAttachCommand": "echo 'Container attached. 
To start Azure Functions:\n cd python\n func start --port 7071'", + + // Features to add to the container + "features": { + "ghcr.io/devcontainers/features/git:1": {}, + "ghcr.io/devcontainers/features/github-cli:1": {}, + "ghcr.io/devcontainers/features/azure-cli:1": {} + }, + + // Set environment variables + "containerEnv": { + "PYTHONPATH": "/workspace/python", + "PYTHONUNBUFFERED": "1", + "TZ": "Europe/Helsinki" + }, + + // Run as non-root user + "remoteUser": "vscode" +} diff --git a/.devcontainer/importer/launch.json b/.devcontainer/importer/launch.json new file mode 100644 index 0000000..4eafcd7 --- /dev/null +++ b/.devcontainer/importer/launch.json @@ -0,0 +1,99 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Azure Functions: All Functions (Start All)", + "type": "debugpy", + "request": "attach", + "connect": { + "host": "localhost", + "port": 9091 + }, + "preLaunchTask": "Start Azure Functions with Debugger", + "pathMappings": [ + { + "localRoot": "${workspaceFolder}/python", + "remoteRoot": "/home/site/wwwroot" + } + ], + "justMyCode": false + }, + { + "name": "Azure Functions: Debug", + "type": "debugpy", + "request": "attach", + "connect": { + "host": "localhost", + "port": 9091 + }, + "preLaunchTask": "Start Azure Functions with Debugger", + "pathMappings": [ + { + "localRoot": "${workspaceFolder}/python", + "remoteRoot": "/home/site/wwwroot" + } + ] + }, + { + "name": "Attach to Python Functions", + "type": "debugpy", + "request": "attach", + "connect": { + "host": "localhost", + "port": 9091 + }, + "pathMappings": [ + { + "localRoot": "${workspaceFolder}/python", + "remoteRoot": "/home/site/wwwroot" + } + ] + }, + { + "name": "Azure Functions: Python (Local)", + "type": "debugpy", + "request": "launch", + "module": "azure_functions_worker", + "args": [ + "--host", + "127.0.0.1", + "--port", + "7071", + "--worker-id", + "test-worker" + ], + "cwd": "${workspaceFolder}/python", + "env": { + "PYTHONPATH": "${workspaceFolder}/python", + "AzureWebJobsScriptRoot": "${workspaceFolder}/python" + }, + "console": "integratedTerminal" + }, + { + "name": "Python: Current File", + "type": "debugpy", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal", + "cwd": "${workspaceFolder}/python", + "env": { + "PYTHONPATH": "${workspaceFolder}/python" + } + }, + { + "name": "Python: Pytest", + "type": "debugpy", + "request": "launch", + "module": "pytest", + "args": [ + "-v", + "${file}" + ], + "console": "integratedTerminal", + "cwd": "${workspaceFolder}/python", + "env": { + "PYTHONPATH": "${workspaceFolder}/python" + } + } + ] +} diff --git a/.devcontainer/importer/tasks.json b/.devcontainer/importer/tasks.json new file mode 100644 index 0000000..ea5ea3c --- /dev/null +++ b/.devcontainer/importer/tasks.json @@ -0,0 +1,86 @@ +{ + "version": "2.0.0", + "tasks": [ + { + "label": "Start Azure Functions", + "type": "shell", + "command": "cd python && func start --port 7071", + "problemMatcher": [], + "presentation": { + "reveal": "always", + "panel": "new" + }, + "isBackground": true, + "group": "none" + }, + { + "label": "Start Azure Functions (Debug Mode)", + "type": "shell", + "command": "cd python && func start --port 7071 --verbose", + "problemMatcher": [], + "presentation": { + "reveal": "always", + "panel": "new" + }, + "isBackground": true + }, + { + "label": "Start Azure Functions with Debugger", + "type": "shell", + "command": "cd python && func start --port 7071 --python-debug-port 9091", + "problemMatcher": [ + { + "pattern": [ + { + 
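// Placeholder pattern (VS Code requires one); readiness is signaled by the background begins/ends patterns below
+              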
"regexp": ".", + "file": 1, + "location": 2, + "message": 3 + } + ], + "background": { + "activeOnStart": true, + "beginsPattern": ".*Azure Functions Core Tools.*", + "endsPattern": ".*(Worker process started and initialized|Host started|Job host started).*" + } + } + ], + "presentation": { + "reveal": "always", + "panel": "dedicated" + }, + "isBackground": true, + "runOptions": { + "instanceLimit": 1 + } + }, + { + "label": "Install Python Dependencies", + "type": "shell", + "command": "pip install -r python/requirements.txt", + "problemMatcher": [] + }, + { + "label": "Run Tests", + "type": "shell", + "command": "cd python && pytest", + "problemMatcher": [], + "presentation": { + "reveal": "always", + "panel": "shared" + } + }, + { + "label": "Database: Run Migrations", + "type": "shell", + "command": "psql -h localhost -U postgres -d analytics -f db/sql/100_create_global_objects.sql", + "problemMatcher": [] + }, + { + "label": "Trigger HTTP Function (httpPreprocess)", + "type": "shell", + "command": "curl -X POST http://localhost:7071/httpPreprocess", + "problemMatcher": [] + } + ] +} From 137998fe9062450abf865411db667935ed0398f7 Mon Sep 17 00:00:00 2001 From: Damian Wysokinski Date: Thu, 4 Dec 2025 10:58:03 +0200 Subject: [PATCH 4/8] added debugger for importer --- .devcontainer/importer/launch.json | 10 ++++++++++ .devcontainer/importer/tasks.json | 14 ++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/.devcontainer/importer/launch.json b/.devcontainer/importer/launch.json index 4eafcd7..4c4b3a7 100644 --- a/.devcontainer/importer/launch.json +++ b/.devcontainer/importer/launch.json @@ -94,6 +94,16 @@ "env": { "PYTHONPATH": "${workspaceFolder}/python" } + }, + { + "name": "Attach to Python Functions dwys", + "type": "debugpy", + "request": "attach", + "connect": { + "host": "localhost", + "port": 9091 + }, + "preLaunchTask": "func: host start" } ] } diff --git a/.devcontainer/importer/tasks.json b/.devcontainer/importer/tasks.json index ea5ea3c..af7248a 100644 --- a/.devcontainer/importer/tasks.json +++ b/.devcontainer/importer/tasks.json @@ -81,6 +81,20 @@ "type": "shell", "command": "curl -X POST http://localhost:7071/httpPreprocess", "problemMatcher": [] + }, + { + "type": "func", + "label": "func: host start", + "command": "start", + "problemMatcher": "$func-python-watch", + "isBackground": true, + // "dependsOn": "pip install (functions)", + "options": { + "cwd": "${workspaceFolder}/python", + "env": { + "languageWorkers__python__arguments": "-m debugpy --listen 127.0.0.1:9091" + } + } } ] } From cf5dd9e00e9dd1f3f5ce33067531105c52fc77bb Mon Sep 17 00:00:00 2001 From: Damian Wysokinski Date: Thu, 4 Dec 2025 11:40:40 +0200 Subject: [PATCH 5/8] added shared launches --- .devcontainer/shared/launch.json | 129 +++++++++++++++++++++++++++++++ .devcontainer/shared/tasks.json | 115 +++++++++++++++++++++++++++ 2 files changed, 244 insertions(+) create mode 100644 .devcontainer/shared/launch.json create mode 100644 .devcontainer/shared/tasks.json diff --git a/.devcontainer/shared/launch.json b/.devcontainer/shared/launch.json new file mode 100644 index 0000000..310be36 --- /dev/null +++ b/.devcontainer/shared/launch.json @@ -0,0 +1,129 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Python: FastAPI", + "type": "debugpy", + "request": "launch", + "module": "uvicorn", + "args": [ + "api.main:app", + "--host", + "0.0.0.0", + "--port", + "8000", + "--reload" + ], + "cwd": "${workspaceFolder}/python", + "env": { + "PYTHONPATH": "${workspaceFolder}/python" + }, + 
"jinja": true, + "justMyCode": false + }, + { + "name": "Azure Functions: All Functions (Start All)", + "type": "debugpy", + "request": "attach", + "connect": { + "host": "localhost", + "port": 9091 + }, + "preLaunchTask": "Start Azure Functions with Debugger", + "pathMappings": [ + { + "localRoot": "${workspaceFolder}/python", + "remoteRoot": "/home/site/wwwroot" + } + ], + "justMyCode": false + }, + { + "name": "Azure Functions: Debug", + "type": "debugpy", + "request": "attach", + "connect": { + "host": "localhost", + "port": 9091 + }, + "preLaunchTask": "Start Azure Functions with Debugger", + "pathMappings": [ + { + "localRoot": "${workspaceFolder}/python", + "remoteRoot": "/home/site/wwwroot" + } + ] + }, + { + "name": "Attach to Python Functions", + "type": "debugpy", + "request": "attach", + "connect": { + "host": "localhost", + "port": 9091 + }, + "pathMappings": [ + { + "localRoot": "${workspaceFolder}/python", + "remoteRoot": "/home/site/wwwroot" + } + ] + }, + { + "name": "Azure Functions: Python (Local)", + "type": "debugpy", + "request": "launch", + "module": "azure_functions_worker", + "args": [ + "--host", + "127.0.0.1", + "--port", + "7071", + "--worker-id", + "test-worker" + ], + "cwd": "${workspaceFolder}/python", + "env": { + "PYTHONPATH": "${workspaceFolder}/python", + "AzureWebJobsScriptRoot": "${workspaceFolder}/python" + }, + "console": "integratedTerminal" + }, + { + "name": "Python: Current File", + "type": "debugpy", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal", + "cwd": "${workspaceFolder}/python", + "env": { + "PYTHONPATH": "${workspaceFolder}/python" + } + }, + { + "name": "Python: Pytest", + "type": "debugpy", + "request": "launch", + "module": "pytest", + "args": [ + "-v", + "${file}" + ], + "console": "integratedTerminal", + "cwd": "${workspaceFolder}/python", + "env": { + "PYTHONPATH": "${workspaceFolder}/python" + } + }, + { + "name": "Attach to Python Functions dwys", + "type": "debugpy", + "request": "attach", + "connect": { + "host": "localhost", + "port": 9091 + }, + "preLaunchTask": "func: host start" + } + ] +} diff --git a/.devcontainer/shared/tasks.json b/.devcontainer/shared/tasks.json new file mode 100644 index 0000000..2044278 --- /dev/null +++ b/.devcontainer/shared/tasks.json @@ -0,0 +1,115 @@ +{ + "version": "2.0.0", + "tasks": [ + { + "label": "Run FastAPI Development Server", + "type": "shell", + "command": "cd python && uvicorn api.main:app --host 0.0.0.0 --port 8000 --reload", + "problemMatcher": [], + "presentation": { + "reveal": "always", + "panel": "new" + }, + "isBackground": true, + "group": { + "kind": "build", + "isDefault": true + } + }, + { + "label": "Start Azure Functions", + "type": "shell", + "command": "cd python && func start --port 7071", + "problemMatcher": [], + "presentation": { + "reveal": "always", + "panel": "new" + }, + "isBackground": true, + "group": "none" + }, + { + "label": "Start Azure Functions (Debug Mode)", + "type": "shell", + "command": "cd python && func start --port 7071 --verbose", + "problemMatcher": [], + "presentation": { + "reveal": "always", + "panel": "new" + }, + "isBackground": true + }, + { + "label": "Start Azure Functions with Debugger", + "type": "shell", + "command": "cd python && func start --port 7071 --python-debug-port 9091", + "problemMatcher": [ + { + "pattern": [ + { + "regexp": ".", + "file": 1, + "location": 2, + "message": 3 + } + ], + "background": { + "activeOnStart": true, + "beginsPattern": ".*Azure Functions Core Tools.*", + 
"endsPattern": ".*(Worker process started and initialized|Host started|Job host started).*" + } + } + ], + "presentation": { + "reveal": "always", + "panel": "dedicated" + }, + "isBackground": true, + "runOptions": { + "instanceLimit": 1 + } + }, + { + "label": "Install Python Dependencies", + "type": "shell", + "command": "pip install -r python/requirements.txt", + "problemMatcher": [] + }, + { + "label": "Run Tests", + "type": "shell", + "command": "cd python && pytest", + "problemMatcher": [], + "presentation": { + "reveal": "always", + "panel": "shared" + } + }, + { + "label": "Database: Run Migrations", + "type": "shell", + "command": "psql -h localhost -U postgres -d analytics -f db/sql/100_create_global_objects.sql", + "problemMatcher": [] + }, + { + "label": "Trigger HTTP Function (httpPreprocess)", + "type": "shell", + "command": "curl -X POST http://localhost:7071/httpPreprocess", + "problemMatcher": [] + }, + { + "type": "func", + "label": "func: host start", + "command": "start", + "problemMatcher": "$func-python-watch", + "isBackground": true, + // "dependsOn": "pip install (functions)", + "options": { + "cwd": "${workspaceFolder}/python", + "env": { + "languageWorkers__python__arguments": "-m debugpy --listen 127.0.0.1:9091" + } + } + } + ] +} From 4dbe9bd4b872e6441f4297d4326152ef1b96f0a3 Mon Sep 17 00:00:00 2001 From: Damian Wysokinski Date: Thu, 29 Jan 2026 13:20:40 +0200 Subject: [PATCH 6/8] added recluster tst_median change. And devcontainer for imported --- .devcontainer/importer/devcontainer.json | 5 ++- .devcontainer/importer/launch.json | 51 ------------------------ .devcontainer/shared/launch.json | 51 ------------------------ python/common/recluster.py | 12 +++--- 4 files changed, 10 insertions(+), 109 deletions(-) diff --git a/.devcontainer/importer/devcontainer.json b/.devcontainer/importer/devcontainer.json index 8985bdb..85b9c4f 100644 --- a/.devcontainer/importer/devcontainer.json +++ b/.devcontainer/importer/devcontainer.json @@ -44,7 +44,7 @@ "ms-python.black-formatter", "ms-python.isort", "ms-python.debugpy", - "ms-azuretools.vscode-azurefunctions", + "ms-azuretools.vscode-azurefunctions@1.19.0", "ms-azuretools.vscode-docker", "mtxr.sqltools", "mtxr.sqltools-driver-pg", @@ -90,7 +90,8 @@ "features": { "ghcr.io/devcontainers/features/git:1": {}, "ghcr.io/devcontainers/features/github-cli:1": {}, - "ghcr.io/devcontainers/features/azure-cli:1": {} + "ghcr.io/devcontainers/features/azure-cli:1": {}, + //vscode azure function extentions must be 1.19.0 }, // Set environment variables diff --git a/.devcontainer/importer/launch.json b/.devcontainer/importer/launch.json index 4c4b3a7..00d9aac 100644 --- a/.devcontainer/importer/launch.json +++ b/.devcontainer/importer/launch.json @@ -18,57 +18,6 @@ ], "justMyCode": false }, - { - "name": "Azure Functions: Debug", - "type": "debugpy", - "request": "attach", - "connect": { - "host": "localhost", - "port": 9091 - }, - "preLaunchTask": "Start Azure Functions with Debugger", - "pathMappings": [ - { - "localRoot": "${workspaceFolder}/python", - "remoteRoot": "/home/site/wwwroot" - } - ] - }, - { - "name": "Attach to Python Functions", - "type": "debugpy", - "request": "attach", - "connect": { - "host": "localhost", - "port": 9091 - }, - "pathMappings": [ - { - "localRoot": "${workspaceFolder}/python", - "remoteRoot": "/home/site/wwwroot" - } - ] - }, - { - "name": "Azure Functions: Python (Local)", - "type": "debugpy", - "request": "launch", - "module": "azure_functions_worker", - "args": [ - "--host", - "127.0.0.1", - 
"--port", - "7071", - "--worker-id", - "test-worker" - ], - "cwd": "${workspaceFolder}/python", - "env": { - "PYTHONPATH": "${workspaceFolder}/python", - "AzureWebJobsScriptRoot": "${workspaceFolder}/python" - }, - "console": "integratedTerminal" - }, { "name": "Python: Current File", "type": "debugpy", diff --git a/.devcontainer/shared/launch.json b/.devcontainer/shared/launch.json index 310be36..6573a63 100644 --- a/.devcontainer/shared/launch.json +++ b/.devcontainer/shared/launch.json @@ -38,57 +38,6 @@ ], "justMyCode": false }, - { - "name": "Azure Functions: Debug", - "type": "debugpy", - "request": "attach", - "connect": { - "host": "localhost", - "port": 9091 - }, - "preLaunchTask": "Start Azure Functions with Debugger", - "pathMappings": [ - { - "localRoot": "${workspaceFolder}/python", - "remoteRoot": "/home/site/wwwroot" - } - ] - }, - { - "name": "Attach to Python Functions", - "type": "debugpy", - "request": "attach", - "connect": { - "host": "localhost", - "port": 9091 - }, - "pathMappings": [ - { - "localRoot": "${workspaceFolder}/python", - "remoteRoot": "/home/site/wwwroot" - } - ] - }, - { - "name": "Azure Functions: Python (Local)", - "type": "debugpy", - "request": "launch", - "module": "azure_functions_worker", - "args": [ - "--host", - "127.0.0.1", - "--port", - "7071", - "--worker-id", - "test-worker" - ], - "cwd": "${workspaceFolder}/python", - "env": { - "PYTHONPATH": "${workspaceFolder}/python", - "AzureWebJobsScriptRoot": "${workspaceFolder}/python" - }, - "console": "integratedTerminal" - }, { "name": "Python: Current File", "type": "debugpy", diff --git a/python/common/recluster.py b/python/common/recluster.py index ae997ea..26f7708 100644 --- a/python/common/recluster.py +++ b/python/common/recluster.py @@ -12,13 +12,12 @@ import numpy as np import pandas as pd import zstandard as zstd -from sklearn.cluster import DBSCAN - from common.container_client import FlowAnalyticsContainerClient from common.database import pool from common.enums import ReclusterStatus from common.logger_util import CustomDbLogHandler from common.utils import get_season +from sklearn.cluster import DBSCAN logger = logging.getLogger("api") @@ -109,13 +108,16 @@ async def load_preprocess_files( dfs = [] decompressor = zstd.ZstdDecompressor() + + for r in results: compressed_data = r[0] decompressed_csv = decompressor.decompress(compressed_data) df = pd.read_csv(io.BytesIO(decompressed_csv), sep=";") - df["tst_median"] = pd.to_datetime(df["tst_median"], format="ISO8601").dt.tz_convert( - "UTC" - ) + if "tst_median" in df.columns: + df["tst_median"] = pd.to_datetime(df["tst_median"], format="ISO8601").dt.tz_convert( + "UTC" + ) dfs.append(df) if not dfs: From cc6b99b292bef4ac7a251f15abec0ef871d667c6 Mon Sep 17 00:00:00 2001 From: Hanna Kitti Date: Wed, 8 Apr 2026 09:33:53 +0300 Subject: [PATCH 7/8] edited endpoint documentation --- python/api/routers/hfp.py | 107 +++++++++++++++++++++++++------------- 1 file changed, 70 insertions(+), 37 deletions(-) diff --git a/python/api/routers/hfp.py b/python/api/routers/hfp.py index d4a76ab..f80a418 100644 --- a/python/api/routers/hfp.py +++ b/python/api/routers/hfp.py @@ -40,6 +40,7 @@ route_id_pattern = re.compile(r"^[A-Za-z0-9]+$") CHUNK_SIZE = 10000 # Adjust to optimize if needed + class GzippedFileResponse(Response): media_type = "application/gzip" @@ -359,19 +360,26 @@ async def get_tlp_raw_data( ) return response + @router.get( "/delay_analytics", summary="Get delay analytics data.", - description="Returns delay analytics as packaged zip file. 
Initial request will start the analysis. Following requests will return the status of the analysis or the data.",
+    description=(
+        "Returns delay analytics as a packaged zip file. Initial request will start the analysis. "
+        "Following requests will return the status of the analysis or the data. "
+        "Re-execute the analysis to refresh the status of the analysis if needed."
+    ),
     responses={
         200: {
             "description": "The data is returned as an attachment in the response.",
-            "content": {"application/gzip": {"schema": None, "example": None}}
+            "content": {"application/gzip": {"schema": None, "example": None}},
+        },
+        202: {
+            "description": "Status message returned. Analysis queued, running or created, check again later."
         },
-        202: {"description": "Status message returned. Analysis queued, running or created, check again later."},
         204: {"description": "Query returned no data with the given parameters."},
-        422: {"description": "Query had invalid parameters."}
-    }
+        422: {"description": "Query had invalid parameters."},
+    },
 )
 async def get_delay_analytics_data_durable(
     route_id: Optional[str] = Query(
@@ -384,30 +392,35 @@ async def get_delay_analytics_data_durable(
         default=None,
         title="From oday (YYYY-MM-DD)",
         description=(
-            "The oday from which the preprocessed clusters and departures will be used.",
-            "If same oday is used for from_oday and to_oday the analysis for that day will be returned.",
-            "If no date given the default value will be used (five days prior)."
+            "The oday from which the preprocessed clusters and departures will be used. "
+            "If the same oday is used for from_oday and to_oday, the analysis for that day will be returned. "
+            "If no date is given, the default value will be used (five days prior). "
+            "The database contains data from 1 May 2025 onwards. "
+            "Format `yyyy-MM-dd`."
         ),
-        example="2025-02-10"
+        example="2025-04-01",
     ),
     to_oday: Optional[date] = Query(
         default=None,
         title="To oday (YYYY-MM-DD)",
         description=(
-            "The oday to which the preprocessed clusters and departures will be used.",
-            "If same oday is used for from_oday and to_oday the analysis for that day will be returned.",
-            "If no date given the default value will be used (yesterday)."
+            "The oday to which the preprocessed clusters and departures will be used. "
+            "If the same oday is used for from_oday and to_oday, the analysis for that day will be returned. "
+            "If no date is given, the default value will be used (yesterday). "
+            "The database contains data from 1 May 2025 onwards. "
+            "Format `yyyy-MM-dd`."
         ),
-        example="2025-02-10"
+        example="2025-04-07",
     ),
     exclude_dates: Optional[str] = Query(
         default=None,
         title="Days to exclude (YYYY-MM-DD)",
         description=(
-            "The days to be excluded from the analysis."
-            "Provide valid date or dates separated with a comma."
+            "The days to be excluded from the analysis. "
+            "Provide valid date or dates separated with a comma. "
+            "Format `yyyy-MM-dd`. 
" ), - example="2025-02-10,2025-02-11" + example="2025-04-02,2025-04-03", ), ) -> Response: """ @@ -418,7 +431,7 @@ async def get_delay_analytics_data_durable( """ default_from_oday = get_target_oday(15) - default_to_oday = get_target_oday() + default_to_oday = get_target_oday() if not from_oday: from_oday = default_from_oday if not to_oday: @@ -428,8 +441,10 @@ async def get_delay_analytics_data_durable( from_oday=from_oday, to_oday=to_oday ) if not is_date_range_valid_: - raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=date_range_validity_message) - + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=date_range_validity_message, + ) if route_id is None or not route_id.strip(): route_ids = [] @@ -447,10 +462,9 @@ async def get_delay_analytics_data_durable( "msg": f"Invalid route ID: {rid}. Only letters and digits allowed.", "input": rid, } - ] + ], ) - if exclude_dates is not None: raw_dates = [r.strip() for r in exclude_dates.split(",") if r.strip()] valid_dates = [] @@ -467,7 +481,7 @@ async def get_delay_analytics_data_durable( "msg": f"Invalid date: {d}. Expected format is YYYY-MM-DD.", "input": d, } - ] + ], ) valid_dates.sort() exclude_dates = valid_dates @@ -478,7 +492,7 @@ async def get_delay_analytics_data_durable( "route_ids": route_ids, "from_oday": str(from_oday), "to_oday": str(to_oday), - "days_excluded": exclude_dates + "days_excluded": exclude_dates, } try: @@ -489,7 +503,7 @@ async def get_delay_analytics_data_durable( return Response( status_code=status.HTTP_202_ACCEPTED, content=resp.content, - media_type=resp.headers.get("Content-Type", "application/json") + media_type=resp.headers.get("Content-Type", "application/json"), ) resp.raise_for_status() try: @@ -501,10 +515,13 @@ async def get_delay_analytics_data_durable( media_type=resp.headers.get("Content-Type", "application/zip"), headers={ "Content-Disposition": 'attachment; filename="clusters.zip"' - } + }, ) except Exception as e: - raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Could not start Durable function: {e}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Could not start Durable function: {e}", + ) @router.post( @@ -522,21 +539,37 @@ async def add_preprocess_data_from_blob_to_db( preprocess_type: Literal["clusters", "departures"], response: Response ) -> dict[str, List[str]]: with CustomDbLogHandler("api"): - client = FlowAnalyticsContainerClient() - - blob_data = await client.get_existing_blob_data_from_previous_2_months(preprocess_type=preprocess_type) - logger.debug(f'Found {len(blob_data)} blobs in blob storage') + client = FlowAnalyticsContainerClient() + + blob_data = await client.get_existing_blob_data_from_previous_2_months( + preprocess_type=preprocess_type + ) + logger.debug(f"Found {len(blob_data)} blobs in blob storage") - db_data = await get_existing_date_and_route_id_from_preprocess_table(preprocess_type=preprocess_type) + db_data = await get_existing_date_and_route_id_from_preprocess_table( + preprocess_type=preprocess_type + ) logger.debug(f"Found {len(db_data)} rows in database") - - missing_data = await find_missing_preprocess_data_in_db_compared_to_blob_storage(db_data=db_data, blobs_data=blob_data) - logger.debug(f"Found { len(missing_data)} blobs which are not in the database yet.") + + missing_data = ( + await find_missing_preprocess_data_in_db_compared_to_blob_storage( + db_data=db_data, blobs_data=blob_data + ) + ) + logger.debug( + f"Found 
{len(missing_data)} blobs which are not in the database yet." + ) if len(missing_data) > 0: - await upload_missing_preprocess_data_to_db(client=client, missing_blobs=missing_data, preprocess_type=preprocess_type) - logger.debug(f"Successfully imported {len(missing_data)} blobs from blob storage to database") + await upload_missing_preprocess_data_to_db( + client=client, + missing_blobs=missing_data, + preprocess_type=preprocess_type, + ) + logger.debug( + f"Successfully imported {len(missing_data)} blobs from blob storage to database" + ) else: logger.debug("There are 0 blobs to be added from blob storage to database") response.status_code = status.HTTP_201_CREATED - return {'imported data': [blob.blob_path for blob in missing_data]} + return {"imported data": [blob.blob_path for blob in missing_data]} From fb9d1370e47ef34021c0c64b19e2b078318837af Mon Sep 17 00:00:00 2001 From: Hanna Kitti Date: Wed, 8 Apr 2026 13:09:41 +0300 Subject: [PATCH 8/8] fixed col tst_median --- python/common/recluster.py | 445 +++++++++++++++++++++++++------------ 1 file changed, 307 insertions(+), 138 deletions(-) diff --git a/python/common/recluster.py b/python/common/recluster.py index a9ecd85..7907bbc 100644 --- a/python/common/recluster.py +++ b/python/common/recluster.py @@ -1,4 +1,3 @@ - # TODO: clean up imports import asyncio import functools @@ -31,7 +30,7 @@ HDG_DIFF_UPPER_LIMIT = 190 MIN_DELAY_EVENTS = 5 MIN_WEIGHTED_SAMPLES = 60 -SPEEDS_IN_DELAY = ['DELAY', 'SLOW'] +SPEEDS_IN_DELAY = ["DELAY", "SLOW"] SPEED_CLASSES = { "DELAY": { "MIN": 0.27, @@ -50,16 +49,17 @@ "pass": "Ohitus", "arr": "Saapuminen", "dep": "Poistuminen", - "stop": "Pysakki" + "stop": "Pysakki", } SEASON_MONTHS = { - "WINTER": [12, 1, 2], - "SPRING": [3, 4, 5], - "SUMMER": [6, 7, 8], - "AUTUMN": [9, 10, 11] + "WINTER": [12, 1, 2], + "SPRING": [3, 4, 5], + "SUMMER": [6, 7, 8], + "AUTUMN": [9, 10, 11], } + def get_routes_condition(column: str, values: list[str]) -> tuple[str, dict]: placeholders = [] params = {} @@ -70,13 +70,14 @@ def get_routes_condition(column: str, values: list[str]) -> tuple[str, dict]: condition = f"{column} IN ({', '.join(placeholders)})" return condition, params + # Refactor with load_compressed_departures_csv async def load_preprocess_files( route_ids: Optional[List[str]], from_oday: date, to_oday: date, exclude_dates: Optional[List[date]], - table: str + table: str, ) -> bytes: base_query = f"SELECT zst FROM delay.{table}" conditions = [] @@ -104,24 +105,23 @@ async def load_preprocess_files( results = await cur.fetchall() if not results: - return None + return None dfs = [] decompressor = zstd.ZstdDecompressor() - - + for r in results: - compressed_data = r[0] + compressed_data = r[0] decompressed_csv = decompressor.decompress(compressed_data) df = pd.read_csv(io.BytesIO(decompressed_csv), sep=";") if "tst_median" in df.columns: - df["tst_median"] = pd.to_datetime(df["tst_median"], format="ISO8601").dt.tz_convert( - "UTC" - ) + df["tst_median"] = pd.to_datetime( + df["tst_median"], format="ISO8601" + ).dt.tz_convert("UTC") dfs.append(df) if not dfs: - return None + return None combined_df = pd.concat(dfs, ignore_index=True) @@ -134,7 +134,13 @@ async def load_preprocess_files( return buffer.getvalue() -async def get_recluster_status(table: str, from_oday: date, to_oday: date, route_id: list = [], exclude_dates: list[date] = []) -> Dict[str, Optional[Any]]: +async def get_recluster_status( + table: str, + from_oday: date, + to_oday: date, + route_id: list = [], + exclude_dates: list[date] = [], +) -> Dict[str, 
Optional[Any]]: table_name = f"delay.{table}" query = f""" SELECT status, createdAt, progress @@ -145,19 +151,24 @@ async def get_recluster_status(table: str, from_oday: date, to_oday: date, route cur = await conn.execute( query, { - "route_id": route_id, - "from_oday": from_oday, - "to_oday": to_oday, - "exclude_dates": exclude_dates - } + "route_id": route_id, + "from_oday": from_oday, + "to_oday": to_oday, + "exclude_dates": exclude_dates, + }, ) row = await cur.fetchone() if row is None: return {"status": None, "createdAt": None, "progress": None} - status, created_at, progress = row - return {"status": ReclusterStatus[status], "createdAt": created_at, "progress": progress} + status, created_at, progress = row + return { + "status": ReclusterStatus[status], + "createdAt": created_at, + "progress": progress, + } + async def set_recluster_status( table: str, @@ -165,7 +176,7 @@ async def set_recluster_status( to_oday: date, route_id: list, days_excluded: list[date], - status: ReclusterStatus = ReclusterStatus.RUNNING + status: ReclusterStatus = ReclusterStatus.RUNNING, ) -> None: table_name = f"delay.{table}" query = f""" @@ -185,15 +196,12 @@ async def set_recluster_status( "to_oday": to_oday, "days_excluded": days_excluded, "status": status.value, - } + }, ) + async def update_recluster_progress( - route_id, - from_oday: date, - to_oday: date, - days_excluded: list[date], - progress: str + route_id, from_oday: date, to_oday: date, days_excluded: list[date], progress: str ) -> None: query = """ INSERT INTO delay.recluster_routes (route_id, from_oday, to_oday, days_excluded, progress) @@ -206,15 +214,22 @@ async def update_recluster_progress( await conn.execute( query, { - "progress": progress, + "progress": progress, "route_id": route_id, "from_oday": from_oday, "to_oday": to_oday, "days_excluded": days_excluded, - } + }, ) -async def load_recluster_geojson(table: str, from_oday: date, to_oday: date, days_excluded: list[date], route_id: list = [],) -> bytes: + +async def load_recluster_geojson( + table: str, + from_oday: date, + to_oday: date, + days_excluded: list[date], + route_id: list = [], +) -> bytes: table_name = f"delay.{table}" query = f""" SELECT zst @@ -229,11 +244,11 @@ async def load_recluster_geojson(table: str, from_oday: date, to_oday: date, day "from_oday": from_oday, "to_oday": to_oday, "days_excluded": days_excluded, - } + }, ) result = await row.fetchone() if not result or not result[0]: - return None + return None compressed_data = result[0] @@ -241,7 +256,14 @@ async def load_recluster_geojson(table: str, from_oday: date, to_oday: date, day decompressed_geojson = dctx.decompress(compressed_data) return decompressed_geojson -async def load_recluster_csv(table: str, from_oday: date, to_oday: date, days_excluded, route_id: list = [],) -> bytes: + +async def load_recluster_csv( + table: str, + from_oday: date, + to_oday: date, + days_excluded, + route_id: list = [], +) -> bytes: table_name = f"delay.{table}" query = f""" SELECT csv_zst @@ -256,11 +278,11 @@ async def load_recluster_csv(table: str, from_oday: date, to_oday: date, days_ex "from_oday": from_oday, "to_oday": to_oday, "days_excluded": days_excluded, - } + }, ) result = await row.fetchone() if not result or not result[0]: - return None + return None compressed_data = result[0] @@ -324,9 +346,9 @@ async def store_compressed_geojson( "days_excluded": days_excluded, "zst": compressed_data, "csv_zst": compressed_csv_data, - } + }, ) - + recluster_type = table.split("_")[1] await 
flow_analytics_container_client.save_cluster_data(
@@ -334,13 +356,16 @@
         compressed_data=compressed_data,
         from_oday=from_oday.strftime("%Y-%m-%d"),
         to_oday=to_oday.strftime("%Y-%m-%d"),
-        route_id=','.join(route_id) if isinstance(route_id, list) else route_id,
-    )
+        route_id=",".join(route_id) if isinstance(route_id, list) else route_id,
+    )
 
     del compressed_data, compressed_csv_data
     gc.collect()
 
-def make_geo_df_WGS84(df: pd.DataFrame, lat_col: str, lon_col: str, crs: str = "EPSG:4326") -> gpd.GeoDataFrame:
+
+def make_geo_df_WGS84(
+    df: pd.DataFrame, lat_col: str, lon_col: str, crs: str = "EPSG:4326"
+) -> gpd.GeoDataFrame:
     """Make a geodf from df. Note that the function does not convert CRS
     but your input df needs to be WGS84, i.e. EPSG:4326.
@@ -353,17 +378,31 @@ def make_geo_df_WGS84(df: pd.DataFrame, lat_col: str, lon_col: str, crs: str = "
     Returns:
         gpd.GeoDataFrame: _description_
     """
-    gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df[lon_col], df[lat_col]), crs=crs)
+    gdf = gpd.GeoDataFrame(
+        df, geometry=gpd.points_from_xy(df[lon_col], df[lat_col]), crs=crs
+    )
     return gdf
 
+
 # Currently not used since only doing reclustering for routes
 def recluster(
     clusters: pd.DataFrame,
     distance: int,
     radius: int,
     min_weighted_samples: int,
-    vars_to_group_level_one_clusters_by=['route_id', 'direction_cluster_id', 'time_group', 'dclass'],
-    cluster_id_vars_on_2nd_level=['route_id', 'direction_id', 'time_group', 'dclass', 'cluster_on_reclustered_level']
+    vars_to_group_level_one_clusters_by=[
+        "route_id",
+        "direction_cluster_id",
+        "time_group",
+        "dclass",
+    ],
+    cluster_id_vars_on_2nd_level=[
+        "route_id",
+        "direction_id",
+        "time_group",
+        "dclass",
+        "cluster_on_reclustered_level",
+    ],
 ) -> pd.DataFrame:
 
     g = clusters.groupby(vars_to_group_level_one_clusters_by)
@@ -371,7 +410,9 @@ def recluster(
     departure_clusters = []
     reclustered_clusters = []
     EPSILON = distance / radius
-    logger.debug(f"Data to be processed with DBSCAN. Rows: {clusters.shape[0]}, groups: {g.ngroups}")
+    logger.debug(
+        f"Data to be processed with DBSCAN. Rows: {clusters.shape[0]}, groups: {g.ngroups}"
+    )
     for i, (group_key, sub) in enumerate(g, start=1):
         sub = sub.rename(columns={"cluster": "cluster_on_departure_level"})
         X = np.radians(sub[["lat_median", "long_median"]])
@@ -382,7 +423,9 @@ def recluster(
             metric="haversine",
         )
 
-        sub["cluster_on_reclustered_level"] = clusterer.fit_predict(X, sample_weight=sub["weight"])
+        sub["cluster_on_reclustered_level"] = clusterer.fit_predict(
+            X, sample_weight=sub["weight"]
+        )
         sub = sub[sub["cluster_on_reclustered_level"] != -1]
         if sub.empty:
             continue
@@ -403,7 +446,9 @@ def recluster(
     return reclustered_clusters, departure_clusters
 
 
-def calculate_cluster_features(df: pd.DataFrame, cluster_id_vars_on_2nd_level: list) -> pd.DataFrame:
+def calculate_cluster_features(
+    df: pd.DataFrame, cluster_id_vars_on_2nd_level: list
+) -> pd.DataFrame:
     """Calculate additional features for the identified clusters: medians for location and time and
     descriptive values for the deviation of the delay.
 
     Note:
@@ -411,8 +456,8 @@ def calculate_cluster_features(df: pd.DataFrame, cluster_id_vars_on_2nd_level: l
     - weight is the weighted value of delay seconds
 
     Args:
-        df pd.DataFrame: _description_
-        cluster_id_vars_on_2nd_level (list, optional): _description_. Defaults to ['route_id','direction_id','time_group','dclass','cluster_on_reclustered_level'].
+        df (pd.DataFrame): Input dataframe containing cluster data.
+ cluster_id_vars_on_2nd_level (list, optional): List of columns to group by for the second level of clustering. Defaults to ['route_id','direction_id','time_group','dclass','cluster_on_reclustered_level']. Returns: pd.DataFrame: clusters with descriptive variables @@ -425,12 +470,11 @@ def calculate_cluster_features(df: pd.DataFrame, cluster_id_vars_on_2nd_level: l df[col] = pd.to_numeric(df[col], errors="coerce") if "tst_median" in df.columns: - df["tst_median"] = pd.to_datetime(df["tst_median"], errors="coerce", utc=True) - df["tst_median_ns"] = df["tst_median"].astype("int64") + df["tst_median"] = pd.to_datetime(df["oday"], errors="coerce", utc=True) + df["tst_median_ns"] = df["tst_median"].astype("int64") else: df["tst_median_ns"] = pd.Series(index=df.index, dtype="float64") - clust_counts = df.drop_duplicates( subset=[ "route_id", @@ -440,20 +484,36 @@ def calculate_cluster_features(df: pd.DataFrame, cluster_id_vars_on_2nd_level: l "cluster_on_reclustered_level", ] ) - clust_counts = clust_counts.groupby(cluster_id_vars_on_2nd_level, observed=False).size().reset_index(name="n_departures") + clust_counts = ( + clust_counts.groupby(cluster_id_vars_on_2nd_level, observed=False) + .size() + .reset_index(name="n_departures") + ) - clust_delay_feats = df.groupby(cluster_id_vars_on_2nd_level, observed=False)["weight"].quantile([0.10, 0.25, 0.5, 0.75, 0.90]).unstack() + clust_delay_feats = ( + df.groupby(cluster_id_vars_on_2nd_level, observed=False)["weight"] + .quantile([0.10, 0.25, 0.5, 0.75, 0.90]) + .unstack() + ) clust_delay_feats.columns = [(int(x * 100)) for x in clust_delay_feats.columns] clust_delay_feats = clust_delay_feats.add_prefix("q_").reset_index() median_cols = ["lat_median", "long_median", "hdg_median", "tst_median_ns"] existing_median_cols = [c for c in median_cols if c in df.columns] - median_vars = (df.groupby(cluster_id_vars_on_2nd_level, observed=False)[existing_median_cols].median().reset_index()) + median_vars = ( + df.groupby(cluster_id_vars_on_2nd_level, observed=False)[existing_median_cols] + .median() + .reset_index() + ) if "tst_median_ns" in median_vars.columns: - median_vars["tst_median"] = pd.to_datetime(median_vars["tst_median_ns"], utc=True) - median_vars["tst_median"] = median_vars["tst_median"].dt.tz_convert("Europe/Helsinki") + median_vars["tst_median"] = pd.to_datetime( + median_vars["tst_median_ns"], utc=True + ) + median_vars["tst_median"] = median_vars["tst_median"].dt.tz_convert( + "Europe/Helsinki" + ) median_vars = median_vars.drop(columns=["tst_median_ns"]) res = median_vars.merge(clust_counts, on=cluster_id_vars_on_2nd_level, how="outer") @@ -465,7 +525,9 @@ def calculate_cluster_features(df: pd.DataFrame, cluster_id_vars_on_2nd_level: l return res -def ui_related_var_modifications(df: pd.DataFrame, seasons_and_months: dict, DEPARTURE_THRESHOLD: int) -> pd.DataFrame: +def ui_related_var_modifications( + df: pd.DataFrame, seasons_and_months: dict, DEPARTURE_THRESHOLD: int +) -> pd.DataFrame: """All UI specific stuff here. 
Args:
        df: output data to modify
@@ -475,85 +537,106 @@ def ui_related_var_modifications(df: pd.DataFrame, seasons_and_months: dict, DEP
     Returns:
         pd.DataFrame: clusters with ui related variables
     """
-    df["tst_median"] = pd.to_datetime(df["tst_median"], format="%Y-%m-%d %H:%M:%S", errors="coerce")
+    df["tst_median"] = pd.to_datetime(
+        df["tst_median"], format="%Y-%m-%d %H:%M:%S", errors="coerce"
+    )
     df["year"] = df["tst_median"].dt.year
-    df["season"] = df["tst_median"].dt.month.map(lambda x: get_season(x, seasons_and_months))
+    df["season"] = df["tst_median"].dt.month.map(
+        lambda x: get_season(x, seasons_and_months)
+    )
 
     for k, v in DCLASS_NAMES.items():
         df["dclass"] = df["dclass"].replace(k, v)
 
     # median classes with an open upper bound
     # TODO: try a single np.where with a list of conditions
     df["q_50_category"] = np.where(df["q_50"] <= 30, "0_15_30", ">75")
-    df["q_50_category"] = np.where((df["q_50"] > 30) & (df["q_50"] <= 45), "1_30_45", df["q_50_category"])
-    df["q_50_category"] = np.where((df["q_50"] > 45) & (df["q_50"] <= 60), "2_45_60", df["q_50_category"])
-    df["q_50_category"] = np.where((df["q_50"] > 60) & (df["q_50"] <= 75), "3_60_74", df["q_50_category"])
+    df["q_50_category"] = np.where(
+        (df["q_50"] > 30) & (df["q_50"] <= 45), "1_30_45", df["q_50_category"]
+    )
+    df["q_50_category"] = np.where(
+        (df["q_50"] > 45) & (df["q_50"] <= 60), "2_45_60", df["q_50_category"]
+    )
+    df["q_50_category"] = np.where(
+        (df["q_50"] > 60) & (df["q_50"] <= 75), "3_60_74", df["q_50_category"]
+    )
 
     # departure count classes with an open upper bound
-    df["n_departures_category"] = np.where(df["n_departures"] <= DEPARTURE_THRESHOLD, "<=" + str(DEPARTURE_THRESHOLD), ">" + str(DEPARTURE_THRESHOLD))
+    df["n_departures_category"] = np.where(
+        df["n_departures"] <= DEPARTURE_THRESHOLD,
+        "<=" + str(DEPARTURE_THRESHOLD),
+        ">" + str(DEPARTURE_THRESHOLD),
+    )
 
     df = df.rename(columns={"lat_median": "latitude", "long_median": "longitude"})
 
     return df
 
 
-async def get_preprocessed_departures(route_ids: [str], from_oday: date, to_oday: date, days_to_exclude: list[date]):
-    departures_data = await load_preprocess_files(route_ids, from_oday, to_oday, days_to_exclude, "preprocess_departures")
+async def get_preprocessed_departures(
+    route_ids: [str], from_oday: date, to_oday: date, days_to_exclude: list[date]
+):
+    departures_data = await load_preprocess_files(
+        route_ids, from_oday, to_oday, days_to_exclude, "preprocess_departures"
+    )
     if not departures_data:
         logger.debug(f"No preprocessed departures ZST found for route_id={route_ids}")
         return None
 
     departure_types = {
-        "event_type": "category",
-        "route_id": "object",
-        "direction_id": "int8",
-        "operator_id": "int16",
-        "oper": "int8",
-        "vehicle_number": "int16",
-        "transport_mode": "object",
-        "time_group": "object",
-        "oday": "object",
-        "start": "object",
-        "tst": "object"
+        "event_type": "category",
+        "route_id": "object",
+        "direction_id": "int8",
+        "operator_id": "int16",
+        "oper": "int8",
+        "vehicle_number": "int16",
+        "transport_mode": "object",
+        "time_group": "object",
+        "oday": "object",
+        "start": "object",
+        "tst": "object",
     }
 
     preprocessed_departures = pd.read_csv(
-        io.BytesIO(departures_data),
-        sep=";",
-        dtype=departure_types
+        io.BytesIO(departures_data), sep=";", dtype=departure_types
     )
 
     week_days_df = preprocessed_departures[
-        ~preprocessed_departures["time_group"].str.contains("weekend", case=False, na=False)
+        ~preprocessed_departures["time_group"].str.contains(
+            "weekend", case=False, na=False
+        )
    ].copy()
    week_days_df["time_group"] = "0_weekday_all"
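For reference, the zst round-trip that `load_preprocess_files` and the `get_preprocessed_*` loaders rely on reduces to the sketch below. This is a minimal illustration, assuming each stored blob is a single zstd-compressed, semicolon-separated CSV; `compress_csv` and `decompress_csv` are illustrative names, not functions from this module.

```python
import io

import pandas as pd
import zstandard as zstd


def compress_csv(df: pd.DataFrame) -> bytes:
    # Serialize the way the preprocess tables store payloads:
    # semicolon-separated CSV bytes, zstd-compressed.
    raw = df.to_csv(sep=";", index=False).encode("utf-8")
    return zstd.ZstdCompressor().compress(raw)


def decompress_csv(payload: bytes) -> pd.DataFrame:
    # Mirror of the loader: decompress, parse, normalize tst_median to UTC.
    raw = zstd.ZstdDecompressor().decompress(payload)
    df = pd.read_csv(io.BytesIO(raw), sep=";")
    if "tst_median" in df.columns:
        df["tst_median"] = pd.to_datetime(
            df["tst_median"], format="ISO8601"
        ).dt.tz_convert("UTC")
    return df


if __name__ == "__main__":
    sample = pd.DataFrame(
        {"route_id": ["1001"], "tst_median": ["2026-04-08T10:00:00+03:00"]}
    )
    roundtrip = decompress_csv(compress_csv(sample))
    assert roundtrip["tst_median"].dt.tz is not None  # parsed back as tz-aware
```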
- preprocessed_departures = pd.concat([preprocessed_departures, week_days_df], axis=0).reset_index(drop=True) + preprocessed_departures = pd.concat( + [preprocessed_departures, week_days_df], axis=0 + ).reset_index(drop=True) return preprocessed_departures -async def get_preprocessed_clusters(route_ids: [str], from_oday: date, to_oday: date, days_to_exclude: list[date]): - cluster_data = await load_preprocess_files(route_ids, from_oday, to_oday, days_to_exclude, "preprocess_clusters") + +async def get_preprocessed_clusters( + route_ids: [str], from_oday: date, to_oday: date, days_to_exclude: list[date] +): + cluster_data = await load_preprocess_files( + route_ids, from_oday, to_oday, days_to_exclude, "preprocess_clusters" + ) if not cluster_data: logger.debug(f"No preprocessed cluster ZST found for route_id={route_ids}") return None clusters_dtypes = { - "route_id": "object", + "route_id": "object", "direction_id": "int8", - "hdg_median": "float32", - "dclass": "object", - "weight": "int32", - "time_group": "object", - "lat_median": "float32", - "long_median": "float32", - "oday": "object", - "start": "object", - "tst_median": "object" + "hdg_median": "float32", + "dclass": "object", + "weight": "int32", + "time_group": "object", + "lat_median": "float32", + "long_median": "float32", + "oday": "object", + "start": "object", + "tst_median": "object", } - clusters = pd.read_csv( - io.BytesIO(cluster_data), - sep=";", - dtype=clusters_dtypes - ) + clusters = pd.read_csv(io.BytesIO(cluster_data), sep=";", dtype=clusters_dtypes) week_days_df = clusters[ ~clusters["time_group"].str.contains("weekend", case=False, na=False) @@ -565,52 +648,82 @@ async def get_preprocessed_clusters(route_ids: [str], from_oday: date, to_oday: def run_asyncio_task(coro_fn, *args, **kwargs): - return asyncio.run(coro_fn(*args, **kwargs)) + return asyncio.run(coro_fn(*args, **kwargs)) + async def run_analysis_and_set_status( table: str, route_ids: list[str], from_oday: date, to_oday: date, - days_excluded: list[date] + days_excluded: list[date], ): with CustomDbLogHandler("api"): try: - logger.debug("Start asyncio task to run recluster analysis") - await asyncio.to_thread(functools.partial(run_asyncio_task, recluster_analysis, route_ids, from_oday, to_oday, days_excluded)) + logger.debug("Start asyncio task to run recluster analysis") + await asyncio.to_thread( + functools.partial( + run_asyncio_task, + recluster_analysis, + route_ids, + from_oday, + to_oday, + days_excluded, + ) + ) except Exception: logger.debug("Something went wrong. 
Setting status as FAILED") await set_recluster_status( - table=table, - from_oday=from_oday, - to_oday=to_oday, - route_id=route_ids, - days_excluded=days_excluded, - status=ReclusterStatus.FAILED + table=table, + from_oday=from_oday, + to_oday=to_oday, + route_id=route_ids, + days_excluded=days_excluded, + status=ReclusterStatus.FAILED, ) raise finally: gc.collect() -async def recluster_analysis(route_ids: list[str], from_oday: date, to_oday: date, days_to_exclude: list[date]): +async def recluster_analysis( + route_ids: list[str], from_oday: date, to_oday: date, days_to_exclude: list[date] +): with CustomDbLogHandler("api"): - logger.debug("Fetch data for recluster") start_time = datetime.now() - clusters = await get_preprocessed_clusters(route_ids, from_oday, to_oday, days_to_exclude) - preprocessed_departures = await get_preprocessed_departures(route_ids, from_oday, to_oday, days_to_exclude) + clusters = await get_preprocessed_clusters( + route_ids, from_oday, to_oday, days_to_exclude + ) + preprocessed_departures = await get_preprocessed_departures( + route_ids, from_oday, to_oday, days_to_exclude + ) end_time = datetime.now() - logger.debug(f"Data fetched for recluster {route_ids}, {from_oday}, {to_oday} in {end_time - start_time}") + logger.debug( + f"Data fetched for recluster {route_ids}, {from_oday}, {to_oday} in {end_time - start_time}" + ) if clusters is None or preprocessed_departures is None: - raise RuntimeError("Missing clusters or departures zst for recluster_analysis") + raise RuntimeError( + "Missing clusters or departures zst for recluster_analysis" + ) start_time = datetime.now() logger.debug("Start recluster for routes") - vars_to_group_level_one_clusters_by=['route_id', 'direction_id', 'time_group', 'dclass'] - cluster_id_vars_on_2nd_level=['route_id', 'direction_id', 'time_group', 'dclass', 'cluster_on_reclustered_level'] + vars_to_group_level_one_clusters_by = [ + "route_id", + "direction_id", + "time_group", + "dclass", + ] + cluster_id_vars_on_2nd_level = [ + "route_id", + "direction_id", + "time_group", + "dclass", + "cluster_on_reclustered_level", + ] # This section same as in recluster(). Consider removing recluster() if not used in future # Start of recluster() @@ -621,7 +734,9 @@ async def recluster_analysis(route_ids: list[str], from_oday: date, to_oday: dat EPSILON = EPS_DISTANCE_2 / EARHT_RADIUS_KM min_weighted_samples = MIN_WEIGHTED_SAMPLES group_count = g.ngroups - logger.debug(f"Data to be processed with DBSCAN. Rows: {clusters.shape[0]}, groups: {group_count}") + logger.debug( + f"Data to be processed with DBSCAN. 
Rows: {clusters.shape[0]}, groups: {group_count}" + ) for i, (group_key, sub) in enumerate(g, start=1): sub = sub.rename(columns={"cluster": "cluster_on_departure_level"}) X = np.radians(sub[["lat_median", "long_median"]]) @@ -632,7 +747,9 @@ async def recluster_analysis(route_ids: list[str], from_oday: date, to_oday: dat metric="haversine", ) - sub["cluster_on_reclustered_level"] = clusterer.fit_predict(X, sample_weight=sub["weight"]) + sub["cluster_on_reclustered_level"] = clusterer.fit_predict( + X, sample_weight=sub["weight"] + ) sub = sub[sub["cluster_on_reclustered_level"] != -1] if sub.empty: continue @@ -644,49 +761,101 @@ async def recluster_analysis(route_ids: list[str], from_oday: date, to_oday: dat if i % 1000 == 0: del sub gc.collect() - await update_recluster_progress(route_ids, from_oday, to_oday, days_to_exclude, f"{i}/{group_count}") + await update_recluster_progress( + route_ids, from_oday, to_oday, days_to_exclude, f"{i}/{group_count}" + ) logger.debug(f"DBSCAN processed {i}/{group_count} groups") - await update_recluster_progress(route_ids, from_oday, to_oday, days_to_exclude, f"{group_count}/{group_count}") + await update_recluster_progress( + route_ids, + from_oday, + to_oday, + days_to_exclude, + f"{group_count}/{group_count}", + ) departure_clusters = pd.concat(dep_clusters) route_clusters = pd.concat(reclustered_clusters) # End of recluster() - n_departures_analyzed = preprocessed_departures.groupby(["route_id", "direction_id", "time_group"], observed=False).size().to_frame().reset_index().rename(columns={0: "n_departures_analyzed"}) - route_clusters = route_clusters[route_clusters["q_50"] >= MIN_MEDIAN_DELAY_IN_CLUSTER] - route_clusters = route_clusters.merge(n_departures_analyzed, how="left", on=["route_id", "direction_id", "time_group"]) - route_clusters["share_of_departures"] = route_clusters["n_departures"] / route_clusters["n_departures_analyzed"] * 100 + n_departures_analyzed = ( + preprocessed_departures.groupby( + ["route_id", "direction_id", "time_group"], observed=False + ) + .size() + .to_frame() + .reset_index() + .rename(columns={0: "n_departures_analyzed"}) + ) + route_clusters = route_clusters[ + route_clusters["q_50"] >= MIN_MEDIAN_DELAY_IN_CLUSTER + ] + route_clusters = route_clusters.merge( + n_departures_analyzed, + how="left", + on=["route_id", "direction_id", "time_group"], + ) + route_clusters["share_of_departures"] = ( + route_clusters["n_departures"] + / route_clusters["n_departures_analyzed"] + * 100 + ) - departure_clusters = route_clusters[["route_id", "direction_id", "time_group", "dclass", "cluster_on_reclustered_level"]].merge( - departure_clusters, on=["route_id", "direction_id", "time_group", "dclass", "cluster_on_reclustered_level"], how="inner" + departure_clusters = route_clusters[ + [ + "route_id", + "direction_id", + "time_group", + "dclass", + "cluster_on_reclustered_level", + ] + ].merge( + departure_clusters, + on=[ + "route_id", + "direction_id", + "time_group", + "dclass", + "cluster_on_reclustered_level", + ], + how="inner", ) - route_clusters = ui_related_var_modifications(route_clusters, SEASON_MONTHS, DEPARTURE_THRESHOLD) + route_clusters = ui_related_var_modifications( + route_clusters, SEASON_MONTHS, DEPARTURE_THRESHOLD + ) - route_clusters["route_dir"] = route_clusters["route_id"].astype(str) + " S" + route_clusters["direction_id"].astype(str) + route_clusters["route_dir"] = ( + route_clusters["route_id"].astype(str) + + " S" + + route_clusters["direction_id"].astype(str) + ) bins = list(range(0, 101, 20)) labs = [] 
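The per-group DBSCAN step above, isolated as a standalone sketch: with `metric="haversine"` the input must be `[lat, lon]` in radians, so the `eps` that corresponds to a ground distance is that distance divided by the Earth radius (the module's own constant is spelled `EARHT_RADIUS_KM`), and `sample_weight` lets one row count as `weight` observations so heavier delays dominate cluster formation. `cluster_points` and `EARTH_RADIUS_KM` are illustrative names:

```python
import numpy as np
import pandas as pd
from sklearn.cluster import DBSCAN

EARTH_RADIUS_KM = 6371.0  # mean Earth radius, used to turn km into radians


def cluster_points(
    sub: pd.DataFrame, distance_km: float, min_weighted_samples: int
) -> pd.DataFrame:
    # Haversine distances are computed on the unit sphere, so coordinates
    # go in as radians and eps is an angular distance.
    X = np.radians(sub[["lat_median", "long_median"]])
    clusterer = DBSCAN(
        eps=distance_km / EARTH_RADIUS_KM,
        min_samples=min_weighted_samples,
        metric="haversine",
    )
    # sample_weight makes each row count as `weight` observations.
    labels = clusterer.fit_predict(X, sample_weight=sub["weight"])
    sub = sub.assign(cluster_on_reclustered_level=labels)
    # DBSCAN labels noise points -1; the pipeline drops them.
    return sub[sub["cluster_on_reclustered_level"] != -1]
```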
for i in range(len(bins) - 1): label = str(bins[i]) + "_" + str(bins[i + 1]) labs.append(label) - + route_clusters["shares_category"] = pd.cut( route_clusters["share_of_departures"], bins=bins, labels=labs, include_lowest=True, ) - route_clusters["share_of_departures"] = round(route_clusters["share_of_departures"], 1) + route_clusters["share_of_departures"] = round( + route_clusters["share_of_departures"], 1 + ) route_clusters = route_clusters.drop("cluster_on_reclustered_level", axis=1) - route_clusters = make_geo_df_WGS84(route_clusters, lat_col="latitude", lon_col="longitude", crs="EPSG:4326") - + route_clusters = make_geo_df_WGS84( + route_clusters, lat_col="latitude", lon_col="longitude", crs="EPSG:4326" + ) + db_route_id = route_ids if not db_route_id: db_route_id = [] flow_analytics_container_client = FlowAnalyticsContainerClient() - + end_time = datetime.now() logger.debug(f"Recluster analysis for routes done in {end_time - start_time}") @@ -764,4 +933,4 @@ async def recluster_analysis(route_ids: list[str], from_oday: date, to_oday: dat flow_analytics_container_client=flow_analytics_container_client, ) end_time = datetime.now() - logger.debug(f"Recluster modes stored to db {end_time - start_time}.")""" \ No newline at end of file + logger.debug(f"Recluster modes stored to db {end_time - start_time}.")"""
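A closing note on the `tst_median` handling this patch touches: tz-aware timestamps cannot be aggregated with a plain group `median()`, so `calculate_cluster_features` casts them to int64 epoch nanoseconds, takes the group median, and converts the result back to Europe/Helsinki time. A minimal, self-contained illustration of that round-trip, using hypothetical sample data:

```python
import pandas as pd

df = pd.DataFrame(
    {
        "cluster": [0, 0, 1],
        "tst_median": pd.to_datetime(
            ["2026-04-08 06:00", "2026-04-08 08:00", "2026-04-08 09:00"], utc=True
        ),
    }
)

# tz-aware datetimes -> epoch nanoseconds, which median() can aggregate
df["tst_median_ns"] = df["tst_median"].astype("int64")
medians = df.groupby("cluster")["tst_median_ns"].median()

# back to tz-aware timestamps in local time
result = pd.to_datetime(medians, utc=True).dt.tz_convert("Europe/Helsinki")
print(result)  # cluster 0 -> 2026-04-08 10:00+03:00 (median of 06:00 and 08:00 UTC)
```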