Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions infra/scripts/post_deployment.ps1
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Stop script on any error
$ErrorActionPreference = "Stop"

Write-Host "🔍 Fetching container app info from azd environment..."
Write-Host "[Search] Fetching container app info from azd environment..."

# Load values from azd env
$CONTAINER_WEB_APP_NAME = azd env get-value CONTAINER_WEB_APP_NAME
Expand All @@ -24,33 +24,33 @@ $WORKFLOW_APP_PORTAL_URL = "https://portal.azure.com/#resource/subscriptions/$SU
# Get the current script's directory
$ScriptDir = $PSScriptRoot

# Navigate from infra/scripts root src/api/data/data.sh
# Navigate from infra/scripts -> repo root -> src/ContentProcessorAPI/samples/schemas
$DataScriptPath = Join-Path $ScriptDir "..\..\src\ContentProcessorAPI\samples\schemas"

# Resolve to an absolute path
$FullPath = Resolve-Path $DataScriptPath

# Output
Write-Host ""
Write-Host "🧭 Web App Details:"
Write-Host " Name: $CONTAINER_WEB_APP_NAME"
Write-Host " 🌐 Endpoint: $CONTAINER_WEB_APP_FQDN"
Write-Host " 🔗 Portal URL: $WEB_APP_PORTAL_URL"
Write-Host "[Info] Web App Details:"
Write-Host " [OK] Name: $CONTAINER_WEB_APP_NAME"
Write-Host " [URL] Endpoint: $CONTAINER_WEB_APP_FQDN"
Write-Host " [Link] Portal URL: $WEB_APP_PORTAL_URL"

Write-Host ""
Write-Host "🧭 API App Details:"
Write-Host " Name: $CONTAINER_API_APP_NAME"
Write-Host " 🌐 Endpoint: $CONTAINER_API_APP_FQDN"
Write-Host " 🔗 Portal URL: $API_APP_PORTAL_URL"
Write-Host "[Info] API App Details:"
Write-Host " [OK] Name: $CONTAINER_API_APP_NAME"
Write-Host " [URL] Endpoint: $CONTAINER_API_APP_FQDN"
Write-Host " [Link] Portal URL: $API_APP_PORTAL_URL"

Write-Host ""
Write-Host "🧭 Workflow App Details:"
Write-Host " Name: $CONTAINER_WORKFLOW_APP_NAME"
Write-Host " 🔗 Portal URL: $WORKFLOW_APP_PORTAL_URL"
Write-Host "[Info] Workflow App Details:"
Write-Host " [OK] Name: $CONTAINER_WORKFLOW_APP_NAME"
Write-Host " [Link] Portal URL: $WORKFLOW_APP_PORTAL_URL"

Write-Host ""
Write-Host "📦 Registering schemas and creating schema set..."
Write-Host " Waiting for API to be ready..."
Write-Host "[Package] Registering schemas and creating schema set..."
Write-Host " [Wait] Waiting for API to be ready..."

$MaxRetries = 10
$RetryInterval = 15
Expand All @@ -61,14 +61,14 @@ for ($i = 1; $i -le $MaxRetries; $i++) {
try {
$response = Invoke-WebRequest -Uri "$ApiBaseUrl/schemavault/" -Method GET -UseBasicParsing -TimeoutSec 10 -ErrorAction Stop
if ($response.StatusCode -eq 200) {
Write-Host " API is ready."
Write-Host " [OK] API is ready."
$ApiReady = $true
break
}
} catch {
# Ignore API not ready yet
# Ignore - API not ready yet
}
Write-Host " Attempt $i/$MaxRetries API not ready, retrying in ${RetryInterval}s..."
Write-Host " Attempt $i/$MaxRetries - API not ready, retrying in ${RetryInterval}s..."
Start-Sleep -Seconds $RetryInterval
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,12 @@ class Configuration(_configuration_base):
app_cosmos_container_process: str = Field(
default="Processes", alias="APP_COSMOS_CONTAINER_PROCESS"
)
app_storage_blob_url: str = Field(default="", alias="APP_STORAGE_BLOB_URL")
app_storage_queue_url: str = Field(default="", alias="APP_STORAGE_QUEUE_URL")
app_storage_blob_url: str = Field(
default="", alias="APP_STORAGE_BLOB_URL"
)
app_storage_queue_url: str = Field(
default="", alias="APP_STORAGE_QUEUE_URL"
)
app_message_queue_extract: str = Field(
default="content-pipeline-extract-queue", alias="APP_MESSAGE_QUEUE_EXTRACT"
)
Expand Down
13 changes: 6 additions & 7 deletions src/ContentProcessorWorkflow/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
"""

import asyncio
import logging
import os

from sas.storage.blob.async_helper import AsyncStorageBlobHelper
Expand All @@ -26,8 +25,6 @@
from services.content_process_service import ContentProcessService
from steps.claim_processor import ClaimProcessor

logger = logging.getLogger(__name__)


class Application(ApplicationBase):
"""Local-development application that runs a single claim workflow.
Expand All @@ -44,7 +41,10 @@ def __init__(self):

def initialize(self):
"""Bootstrap the application context and register services."""
logger.info("Application initialized with configuration (secrets redacted)")
print(
"Application initialized with configuration:",
self.application_context.configuration,
)

self.register_services()

Expand All @@ -58,9 +58,8 @@ def register_services(self):
)

(
self.application_context.add_singleton(
DebuggingMiddleware, DebuggingMiddleware
)
self.application_context
.add_singleton(DebuggingMiddleware, DebuggingMiddleware)
.add_singleton(LoggingFunctionMiddleware, LoggingFunctionMiddleware)
.add_singleton(InputObserverMiddleware, InputObserverMiddleware)
.add_singleton(Mem0AsyncMemoryManager, Mem0AsyncMemoryManager)
Expand Down
10 changes: 6 additions & 4 deletions src/ContentProcessorWorkflow/src/main_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ def initialize(self):
Populates the DI container with agent-framework helpers, middlewares,
repository services, and the queue-processing service.
"""
logger.info("Application initialized with configuration (secrets redacted)")
print(
"Application initialized with configuration:",
self.application_context.configuration,
)
self.register_services()

def register_services(self):
Expand All @@ -114,9 +117,8 @@ def register_services(self):
)

(
self.application_context.add_singleton(
DebuggingMiddleware, DebuggingMiddleware
)
self.application_context
.add_singleton(DebuggingMiddleware, DebuggingMiddleware)
.add_singleton(LoggingFunctionMiddleware, LoggingFunctionMiddleware)
.add_singleton(InputObserverMiddleware, InputObserverMiddleware)
.add_singleton(Mem0AsyncMemoryManager, Mem0AsyncMemoryManager)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import json
import logging
import uuid
from collections.abc import Awaitable, Callable
from datetime import datetime, timezone

from azure.identity import DefaultAzureCredential
Expand Down Expand Up @@ -86,6 +85,9 @@ def _get_blob_helper(self) -> StorageBlobHelper:
account_name=self._config.app_storage_account_name,
credential=self._credential,
)
# Ensure the processes container exists (sas-storage does not
# auto-create containers on upload, unlike the API's helper).
self._blob_helper.create_container(self._config.app_cps_processes)
return self._blob_helper

def _get_queue_client(self) -> QueueClient:
Expand All @@ -98,6 +100,9 @@ def _get_queue_client(self) -> QueueClient:
)
return self._queue_client

# ------------------------------------------------------------------ #
# submit — replaces POST /contentprocessor/submit
# ------------------------------------------------------------------ #
async def submit(
self,
file_bytes: bytes,
Expand All @@ -115,29 +120,13 @@ async def submit(
# 1. Upload file to blob: {cps-processes}/{process_id}/{filename}
container_name = self._config.app_cps_processes
blob_helper = self._get_blob_helper()
await asyncio.to_thread(
blob_helper.upload_blob,
blob_helper.upload_blob(
container_name=container_name,
blob_name=f"{process_id}/{filename}",
data=file_bytes,
)

# 2. Insert Cosmos record BEFORE enqueuing — the external
# ContentProcessor does find_document({"process_id": ...}) and
# only $set-updates the existing doc. If the doc doesn't exist
# yet, it inserts a duplicate without the "id" field, causing
# get_status (which queries by "id") to always see "processing".
record = ContentProcessRecord(
id=process_id,
process_id=process_id,
processed_file_name=filename,
processed_file_mime_type=mime_type,
status="processing",
imported_time=datetime.now(timezone.utc),
)
await self._process_repo.add_async(record)

# 3. Enqueue processing message (after Cosmos record exists)
# 2. Enqueue processing message
message = ContentProcessMessage(
process_id=process_id,
files=[
Expand Down Expand Up @@ -171,14 +160,25 @@ async def submit(
completed_steps=[],
),
)
await asyncio.to_thread(
self._get_queue_client().send_message,
message.model_dump_json(),
self._get_queue_client().send_message(message.model_dump_json())

# 3. Insert initial Cosmos record via sas-cosmosdb
record = ContentProcessRecord(
id=process_id,
process_id=process_id,
processed_file_name=filename,
processed_file_mime_type=mime_type,
status="processing",
imported_time=datetime.now(timezone.utc),
)
await self._process_repo.add_async(record)

logger.info("Submitted process %s for file %s", process_id, filename)
return process_id

# ------------------------------------------------------------------ #
# get_status — replaces GET /contentprocessor/status/{id}
# ------------------------------------------------------------------ #
async def get_status(self, process_id: str) -> dict | None:
"""Query Cosmos for process status.

Expand All @@ -194,6 +194,9 @@ async def get_status(self, process_id: str) -> dict | None:
"file_name": getattr(record, "processed_file_name", "") or "",
}

# ------------------------------------------------------------------ #
# get_processed — replaces GET /contentprocessor/processed/{id}
# ------------------------------------------------------------------ #
async def get_processed(self, process_id: str) -> dict | None:
"""Query Cosmos for the full processed content result.

Expand All @@ -204,6 +207,9 @@ async def get_processed(self, process_id: str) -> dict | None:
return None
return record.model_dump()

# ------------------------------------------------------------------ #
# get_steps — replaces GET /contentprocessor/processed/{id}/steps
# ------------------------------------------------------------------ #
def get_steps(self, process_id: str) -> list | None:
"""Download step_outputs.json from blob storage.

Expand All @@ -219,28 +225,25 @@ def get_steps(self, process_id: str) -> list | None:
)
return json.loads(data.decode("utf-8"))
except Exception:
logger.debug("step_outputs.json not found for process %s", process_id)
logger.debug(
"step_outputs.json not found for process %s", process_id
)
return None

# ------------------------------------------------------------------ #
# poll_status — replaces the HTTP polling loop
# ------------------------------------------------------------------ #
async def poll_status(
self,
process_id: str,
poll_interval_seconds: float = 5.0,
timeout_seconds: float = 600.0,
on_status_change: Callable[[str, dict], Awaitable[None]] | None = None,
) -> dict:
"""Poll Cosmos for status until terminal state or timeout.

Args:
on_status_change: Optional async callback invoked whenever the
status value changes between polls. Receives
``(new_status, result_dict)``.

Returns the final status dict with keys: status, process_id, file_name.
"""
elapsed = 0.0
last_status: str | None = None
result: dict | None = None
while elapsed < timeout_seconds:
result = await self.get_status(process_id)
if result is None:
Expand All @@ -252,18 +255,6 @@ async def poll_status(
}

status = result.get("status", "processing")

if status != last_status:
logger.info(
"Poll status change: process_id=%s %s -> %s",
process_id,
last_status,
status,
)
last_status = status
if on_status_change is not None:
await on_status_change(status, result)

if status in ("Completed", "Error"):
result["terminal"] = True
return result
Expand All @@ -282,6 +273,3 @@ async def poll_status(
def close(self):
    """Release connections.

    Drops the cached blob helper and, when a queue client has been
    created, closes it.  The cached queue-client reference is cleared
    even if its ``close()`` call raises, so this object never keeps
    pointing at a client that was only partially shut down.  Any
    exception raised by the client's ``close()`` still propagates to
    the caller.
    """
    self._blob_helper = None

    queue_client = self._queue_client
    if queue_client is None:
        # Nothing was ever opened (or close() already ran).
        return

    try:
        queue_client.close()
    finally:
        # Always forget the client so a later lookup builds a fresh one
        # instead of reusing a closed/broken instance.
        self._queue_client = None
Loading
Loading