From 9766f9aa54260e9984318e1b6b48c6471bb10cec Mon Sep 17 00:00:00 2001 From: Mia Patrikios Date: Thu, 29 Jan 2026 11:40:17 -0500 Subject: [PATCH 1/3] Refactor logging messages for clarity, update datetime handling to use timezone-aware methods, and add Google Cloud Storage utility functions --- .firebase/hosting.ZnJvbnRlbmQ.cache | 10 +- backend/app/api/direct_upload.py | 126 ++++--- backend/app/api/download.py | 380 +++++++++++----------- backend/app/api/process_tasks.py | 195 ++++++----- backend/app/api/upload.py | 64 ++-- backend/app/core/config.py | 4 +- backend/app/core/gcs.py | 67 ++++ backend/app/models/processing.py | 13 +- backend/app/models/usage.py | 14 +- backend/app/services/detector.py | 107 +++--- backend/entrypoint.sh | 12 +- backend/main.py | 59 ++-- frontend/index.html | 4 +- frontend/static/js/analytics-dashboard.js | 44 +-- frontend/static/js/photo-processor.js | 6 +- frontend/static/js/state-manager.js | 15 +- 16 files changed, 544 insertions(+), 576 deletions(-) create mode 100644 backend/app/core/gcs.py diff --git a/.firebase/hosting.ZnJvbnRlbmQ.cache b/.firebase/hosting.ZnJvbnRlbmQ.cache index 35d360b..2a120df 100644 --- a/.firebase/hosting.ZnJvbnRlbmQ.cache +++ b/.firebase/hosting.ZnJvbnRlbmQ.cache @@ -1,15 +1,14 @@ +index.html,1769607006930,13a2777ea38d55eae096e7be4fe8ef948e4ed9fb7ea6b2a2cd9902418edf3038 404.html,1766671073811,05cbc6f94d7a69ce2e29646eab13be2c884e61ba93e3094df5028866876d18b3 -static/js/state-manager.js,1768928200351,567bcef4c1959bec4dce544d241e5e6f2e419deab1b6b4a68c7ef74043ab44a0 static/js/script.js,1768928200351,07e174130e2a2415c57d17b4048ac70d866c63f08990a40dbaf9bc40b821fe19 -static/js/photo-processor.js,1768928200350,95be2df099eb9793da9778ee8cd87283627bf80e52133719092c423e0757d355 static/js/config.js,1767980752113,78a50979181fc33d8cd9d3632de0ff48baa9472ca57ddd99e309b45e3d9561c7 static/js/batch-operations.js,1765904443892,150c9a4eaf48f392a41a00ac69d3c7e36730146565c4bed007998132f5294170 -static/js/analytics-dashboard.js,1768583277770,ccfac25ed7a067c948ff0c272c3d1c3c87a4125985dea2857f154a211af3ea1b static/js/components/atoms/Button.js,1766671073815,857f21eae50e3bf582dd64a5d928e638dc8b7901b50956f0f7916220d47c22e8 static/images/milind.jpeg,1768928200350,3ab0c056b70237858b14628beb76f4030f3ab8b61c77a5407ff4ce74c0abc3dd static/images/miapatrikios.jpeg,1755189202858,33154529bc40f43447874a8dbadab1146a7579a8acef292530e0af48b5f32b4c static/images/jonahfreedman.jpeg,1755189265752,379959114a796a7c9fc80be8ed4a87f1901b28a02999814fb8c5ba409e64481a static/images/bridgerjones.jpeg,1768928200347,e796c96c90df3149a67c6ee2e92dcb733e38b00e0865ed6ee7b55130edcb13fa +static/images/TS_Logo.png,1768934071539,4e1da35a1b5fc8ba15bc71cba56e09dc86bba260df8686d211e2873cc096846a static/css/photo-viewer.css,1767673385113,eb3a5ee1584eced2b5166212162ee397015d761cda66f81d40df25cafb055e7a static/css/global.css,1768928200340,edf2b29c9a411c43f1fccba9a1552dd46cb17a0faf2b574753e30dd1751592b4 static/css/core.css,1767673385113,b7b687a0b9841fdb45b1573751f17447080915933142be87a9401d7707eabfb1 @@ -21,5 +20,6 @@ components/tier-display.js,1767980752111,4844e8c8a28b8d077e4a3ae0630db3a51331f50 components/pricing-cards-manager.js,1768928200339,7ec111302220cebdb92d1f4210acd35027a463de25636b7c0794cdafb129dbe8 components/pricing-card.js,1768928200339,7d959c824881f01d0a55b6b5866359d326431f2a96a07743d42b2b6507a65f62 components/payment-form.js,1768928200338,5b40dd9510f4869f2f49fd8673e14e4e47dc96160903ff01abb9bdd76cf6ae27 -static/images/TS_Logo.png,1768934071539,4e1da35a1b5fc8ba15bc71cba56e09dc86bba260df8686d211e2873cc096846a -index.html,1768934071631,b7b2388753d9b176822fa515f799072956540975de051fc8048e2438c7e27b4e +static/js/state-manager.js,1769628397905,653a2de3641186a2613587318b58dbadb20ab63299dd9c17883bb7e7b743ac1a +static/js/analytics-dashboard.js,1769628362925,124395bb129edb4cad576f27f2db7f50616d52d62de0dc394d9902b8edb88015 +static/js/photo-processor.js,1769628409934,8d5e2b6274a680a41bfbcfd46478773613d9085bae696feea4154112d6179a81 diff --git a/backend/app/api/direct_upload.py b/backend/app/api/direct_upload.py index 81ce3f7..f55a555 100644 --- a/backend/app/api/direct_upload.py +++ b/backend/app/api/direct_upload.py @@ -10,15 +10,16 @@ from typing import List from fastapi import APIRouter, Depends, HTTPException, status -from google.cloud import storage from sqlalchemy.orm import Session from app.api.auth import get_current_user from app.core.config import settings +from app.core.gcs import get_gcs_bucket +from app.core.security_config import ALLOWED_EXTENSIONS from app.models.processing import PhotoDB, ProcessingStatus from app.models.schemas import ( SignedUploadRequest, - SignedUploadResponse, + SignedUploadResponse, SignedUrlInfo, UploadCompletionRequest, UploadCompletionResponse, @@ -31,29 +32,11 @@ router = APIRouter() logger = logging.getLogger(__name__) - -# GCS Configuration -BUCKET_NAME = settings.bucket_name -ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".tiff", ".bmp"} MAX_FILE_SIZE = settings.get_max_file_size_bytes() -# Initialize GCS Client -try: - if BUCKET_NAME: - storage_client = storage.Client() - bucket = storage_client.bucket(BUCKET_NAME) - logger.info(f"โœ… Direct upload: Connected to GCS bucket: {BUCKET_NAME}") - else: - logger.error("โŒ No bucket configured for direct uploads") - bucket = None -except Exception as e: - logger.error(f"โŒ Could not connect to GCS for direct uploads: {e}") - bucket = None - # Utility functions - def get_file_extension(filename: str) -> str: return os.path.splitext(filename.lower())[1] @@ -72,34 +55,35 @@ async def get_signed_upload_urls( Step 1: Generate signed URLs for direct browser-to-GCS uploads. This replaces the slow server proxy method with fast direct uploads. """ - + try: - logger.info(f"๐Ÿ”— User {current_user.id} requesting signed URLs for {len(request.files)} files") - + logger.info(f"User {current_user.id} requesting signed URLs for {len(request.files)} files") + + bucket = get_gcs_bucket() if not bucket: - logger.error("โŒ GCS bucket not configured") + logger.error("GCS bucket not configured") raise HTTPException( - status_code=500, + status_code=500, detail="Google Cloud Storage not configured" ) - + if not request.files: raise HTTPException(status_code=400, detail="No files specified") - + # Check quota before generating URLs photo_count = len(request.files) can_upload, quota_message = usage_tracker.check_user_quota( db, current_user.id, ActionType.UPLOAD, photo_count ) - + if not can_upload: raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, + status_code=status.HTTP_403_FORBIDDEN, detail=quota_message ) - + signed_urls = [] - + for file_info in request.files: # Validate file if not is_allowed_file(file_info.filename): @@ -108,37 +92,36 @@ async def get_signed_upload_urls( detail=f"File {file_info.filename} has invalid extension. " f"Allowed: {', '.join(ALLOWED_EXTENSIONS)}", ) - + if file_info.size > MAX_FILE_SIZE: raise HTTPException( status_code=400, detail=f"File {file_info.filename} exceeds maximum size " f"of {settings.max_file_size_mb}MB", ) - + # Generate unique photo ID and GCS filename photo_id = str(uuid.uuid4()) file_extension = get_file_extension(file_info.filename) gcs_filename = f"{photo_id}{file_extension}" blob_path = f"{current_user.id}/{gcs_filename}" - + try: # Create blob reference blob = bucket.blob(blob_path) - + # Generate signed URL for PUT operation (valid for 15 minutes) - # Use service_account_email=None for Cloud Run ADC compatibility from google.auth import default from google.auth.transport import requests as google_requests - + # Get the default credentials and service account email credentials, project_id = default() auth_request = google_requests.Request() credentials.refresh(auth_request) - + # For Cloud Run, use the default service account service_account_email = credentials.service_account_email if hasattr(credentials, 'service_account_email') else None - + signed_url = blob.generate_signed_url( version="v4", expiration=timedelta(minutes=15), @@ -147,7 +130,7 @@ async def get_signed_upload_urls( service_account_email=service_account_email, access_token=credentials.token if hasattr(credentials, 'token') else None, ) - + signed_urls.append(SignedUrlInfo( photo_id=photo_id, filename=file_info.filename, @@ -158,28 +141,28 @@ async def get_signed_upload_urls( content_type=file_info.content_type, size=file_info.size )) - - logger.info(f"โœ… Generated signed URL for {file_info.filename} -> {photo_id}") - + + logger.info(f"Generated signed URL for {file_info.filename} -> {photo_id}") + except Exception as e: - logger.error(f"โŒ Failed to generate signed URL for {file_info.filename}: {e}") + logger.error(f"Failed to generate signed URL for {file_info.filename}: {e}") raise HTTPException( status_code=500, detail=f"Failed to generate signed URL for {file_info.filename}" ) - + return SignedUploadResponse( signed_urls=signed_urls, expires_in_minutes=15, - bucket_name=BUCKET_NAME, + bucket_name=settings.bucket_name, message=f"Generated {len(signed_urls)} signed URLs for direct upload" ) - + except HTTPException: # Re-raise HTTP exceptions as-is raise except Exception as e: - logger.error(f"โŒ Unexpected error in signed URL generation: {e}") + logger.error(f"Unexpected error in signed URL generation: {e}") raise HTTPException( status_code=500, detail="Internal server error during signed URL generation" @@ -196,29 +179,30 @@ async def complete_upload( Step 3: Record successful direct uploads in the database. Called by frontend after files have been uploaded directly to GCS. """ - + if not request.completed_uploads: raise HTTPException(status_code=400, detail="No completed uploads provided") - + + bucket = get_gcs_bucket() successful_photos = [] failed_photos = [] total_file_size_mb = 0.0 - + for upload in request.completed_uploads: try: # Optional: Verify the file actually exists in GCS blob_path = f"{current_user.id}/{upload.gcs_filename}" - + if bucket: blob = bucket.blob(blob_path) if not blob.exists(): - logger.warning(f"โš ๏ธ File not found in GCS: {blob_path}") + logger.warning(f"File not found in GCS: {blob_path}") failed_photos.append({ "photo_id": upload.photo_id, "error": "File not found in Google Cloud Storage" }) continue - + # Create database record using our optimized PhotoDB model photo_db = PhotoDB( photo_id=upload.photo_id, @@ -226,37 +210,37 @@ async def complete_upload( original_filename=upload.original_filename, file_path=blob_path, file_size_bytes=upload.size, - file_extension=upload.file_extension, # โœ… This fixes the O(N) -> O(1) issue! + file_extension=upload.file_extension, processing_status=ProcessingStatus.PENDING ) - + db.add(photo_db) - + # Calculate stats file_size_mb = upload.size / (1024 * 1024) total_file_size_mb += file_size_mb - + successful_photos.append(upload.photo_id) - - logger.info(f"โœ… Recorded direct upload: {upload.photo_id}") - + + logger.info(f"Recorded direct upload: {upload.photo_id}") + except Exception as e: - logger.error(f"โŒ Failed to record upload for {upload.photo_id}: {e}") + logger.error(f"Failed to record upload for {upload.photo_id}: {e}") failed_photos.append({ "photo_id": upload.photo_id, "error": str(e) }) - + # Commit successful uploads try: db.commit() - + # Update usage tracking if successful_photos: usage_tracker.use_quota( db, current_user.id, ActionType.UPLOAD, len(successful_photos) ) - + usage_tracker.log_action( db=db, user_id=current_user.id, @@ -265,18 +249,18 @@ async def complete_upload( file_size_mb=total_file_size_mb, success=True, ) - + current_user.increment_photos_uploaded(len(successful_photos)) db.commit() - + except Exception as e: - logger.error(f"โŒ Database commit failed: {e}") + logger.error(f"Database commit failed: {e}") db.rollback() raise HTTPException(status_code=500, detail="Failed to record uploads") - + # Get updated quota info updated_quota = usage_tracker.get_or_create_user_quota(db, current_user.id) - + return UploadCompletionResponse( successful_uploads=len(successful_photos), failed_uploads=len(failed_photos), @@ -294,4 +278,4 @@ async def complete_upload( }, message=f"Successfully recorded {len(successful_photos)} uploads" + (f", {len(failed_photos)} failed" if failed_photos else "") - ) \ No newline at end of file + ) diff --git a/backend/app/api/download.py b/backend/app/api/download.py index a83ee79..0277b5a 100644 --- a/backend/app/api/download.py +++ b/backend/app/api/download.py @@ -3,19 +3,18 @@ import uuid import zipfile from collections import defaultdict -from datetime import timedelta -from typing import Dict, List, Optional +from datetime import datetime +from typing import Dict, List from fastapi import APIRouter, Depends, HTTPException -from google.cloud import storage from sqlalchemy.orm import Session from app.api.auth import get_current_user -from app.api.process_tasks import detector as process_detector from app.core.config import settings +from app.core.gcs import get_gcs_bucket, generate_signed_url +from app.models.processing import ExportDB, ExportStatus, PhotoDB from app.models.schemas import ExportRequest from app.models.user import User -from app.services.detector import NumberDetector from database import get_db logger = logging.getLogger(__name__) @@ -23,48 +22,6 @@ router = APIRouter() EXPORT_DIR = "exports" -detector = NumberDetector() - -# Store export metadata with user association -export_metadata: Dict[str, dict] = {} - -# GCS Configuration (reusing pattern from upload.py) -BUCKET_NAME = settings.bucket_name - -# Initialize GCS Client (reusing pattern from upload.py) -try: - if BUCKET_NAME: - storage_client = storage.Client() - bucket = storage_client.bucket(BUCKET_NAME) - else: - bucket = None -except Exception as e: - bucket = None - - -def generate_signed_download_url(blob, expires_minutes=15): - """ - Reuse existing signed URL logic from upload.py:99-106 - """ - try: - from google.auth import default - from google.auth.transport import requests as google_requests - - credentials, _ = default() - auth_request = google_requests.Request() - credentials.refresh(auth_request) - - signed_url = blob.generate_signed_url( - version="v4", - expiration=timedelta(minutes=expires_minutes), - method="GET", - service_account_email=getattr(credentials, 'service_account_email', None), - access_token=getattr(credentials, 'token', None), - ) - return signed_url - except Exception as e: - logger.error(f"Failed to generate signed URL: {e}") - return None @router.post("/export") @@ -76,37 +33,43 @@ async def create_export( if not request.photo_ids: raise HTTPException(status_code=400, detail="No photo IDs provided") - export_id = str(uuid.uuid4()) + bucket = get_gcs_bucket() + if not bucket: + raise HTTPException(status_code=500, detail="Google Cloud Storage not configured") + export_id = str(uuid.uuid4()) zip_filename = f"tag_photos_{export_id}.zip" - - # GCS storage path gcs_blob_path = f"{current_user.id}/exports/{zip_filename}" - + # Temporary local path for ZIP creation before GCS upload temp_dir = os.path.join(EXPORT_DIR, "temp") os.makedirs(temp_dir, exist_ok=True) temp_zip_path = os.path.join(temp_dir, f"{export_id}_{zip_filename}") - # Store metadata for access control - export_metadata[export_id] = { - "user_id": current_user.id, - "filename": zip_filename, - "gcs_blob_path": gcs_blob_path, - } + # Create ExportDB record with CREATING status + export_record = ExportDB( + export_id=export_id, + user_id=current_user.id, + filename=zip_filename, + file_path=gcs_blob_path, + photo_count=len(request.photo_ids), + status=ExportStatus.CREATING + ) + db.add(export_record) + db.commit() try: logger.info(f"Creating export {export_id} for user {current_user.id} with {len(request.photo_ids)} photos") - + # Group photos by bib number with hybrid organization - grouped_photos = await organize_photos_by_bib(request.photo_ids, current_user.id, db) - + grouped_photos, photos_missing = await organize_photos_by_bib(request.photo_ids, current_user.id, db) + logger.info(f"Grouped photos into {len(grouped_photos)} groups: {list(grouped_photos.keys())}") - + files_added = 0 total_size = 0 - # Create temporary ZIP file for GCS upload + # Create temporary ZIP file, downloading photos from GCS with zipfile.ZipFile(temp_zip_path, "w", zipfile.ZIP_DEFLATED) as zipf: for bib_number, photos in grouped_photos.items(): folder_name = ( @@ -114,10 +77,18 @@ async def create_export( ) logger.info(f"Processing folder '{folder_name}' with {len(photos)} photos") - for i, (photo_id, photo_path) in enumerate(photos, 1): - if photo_path and os.path.exists(photo_path): - # Get original filename and extension - original_filename = os.path.basename(photo_path) + for i, (photo_id, blob_path, original_filename) in enumerate(photos, 1): + try: + # Download file from GCS + blob = bucket.blob(blob_path) + if not blob.exists(): + logger.warning(f"Skipping photo {photo_id}: blob not found at {blob_path}") + continue + + file_data = blob.download_as_bytes() + file_size = len(file_data) + + # Get filename parts name_part, ext = os.path.splitext(original_filename) # Create hybrid filename: bibNumber_originalName_sequence.ext @@ -134,53 +105,68 @@ async def create_export( # Add to ZIP with folder structure arcname = f"{folder_name}/{new_filename}" - file_size = os.path.getsize(photo_path) - - logger.debug(f"Adding file: {photo_path} -> {arcname} ({file_size} bytes)") - zipf.write(photo_path, arcname) - + + logger.debug(f"Adding file: {blob_path} -> {arcname} ({file_size} bytes)") + zipf.writestr(arcname, file_data) + files_added += 1 total_size += file_size - else: - logger.warning(f"Skipping missing file: {photo_path} for photo_id: {photo_id}") + except Exception as e: + logger.warning(f"Failed to download photo {photo_id} from {blob_path}: {e}") logger.info(f"Export {export_id} completed: {files_added} files added, total size: {total_size} bytes") - + zip_size = os.path.getsize(temp_zip_path) if os.path.exists(temp_zip_path) else 0 logger.info(f"Temporary ZIP file created: {temp_zip_path}, size: {zip_size}") - # Upload ZIP to GCS (required - no fallback) - if not bucket: - raise HTTPException(status_code=500, detail="Google Cloud Storage not configured") - if not os.path.exists(temp_zip_path): + export_record.status = ExportStatus.FAILED + export_record.error_message = "Failed to create export file" + db.commit() raise HTTPException(status_code=500, detail="Failed to create export file") - + try: blob = bucket.blob(gcs_blob_path) with open(temp_zip_path, 'rb') as zip_file: blob.upload_from_file(zip_file, content_type='application/zip') - - logger.info(f"โœ… ZIP uploaded to GCS: {gcs_blob_path}") - + + logger.info(f"ZIP uploaded to GCS: {gcs_blob_path}") + + # Update ExportDB record with success status + export_record.status = ExportStatus.READY + export_record.file_size_bytes = total_size + export_record.completed_at = datetime.utcnow() + export_record.set_expiration(days=7) + db.commit() + except Exception as e: - logger.error(f"โŒ Failed to upload ZIP to GCS: {e}") + logger.error(f"Failed to upload ZIP to GCS: {e}") + export_record.status = ExportStatus.FAILED + export_record.error_message = str(e) + db.commit() raise HTTPException(status_code=500, detail=f"Failed to upload export: {str(e)}") - + finally: # Always clean up temporary file if os.path.exists(temp_zip_path): os.remove(temp_zip_path) - logger.info(f"๐Ÿงน Cleaned up temporary ZIP file: {temp_zip_path}") - + logger.info(f"Cleaned up temporary ZIP file: {temp_zip_path}") + return { "export_id": export_id, "download_url": f"/api/download/file/{export_id}", "files_included": files_added, + "photos_missing": photos_missing, "total_size_bytes": total_size, } + except HTTPException: + raise except Exception as e: + # Update export record with failure status + export_record.status = ExportStatus.FAILED + export_record.error_message = str(e) + db.commit() raise HTTPException( status_code=500, detail=f"Failed to create export: {str(e)}" ) @@ -188,130 +174,128 @@ async def create_export( @router.get("/file/{export_id}") async def download_export( - export_id: str, current_user: User = Depends(get_current_user) + export_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db) ): """ Return signed URL for ZIP download instead of streaming the file. - Reuses existing signed URL logic from upload.py. + Uses database for multi-instance Cloud Run compatibility. """ - - # Check if export exists and user has access - if export_id not in export_metadata: + + # Query database for export record with user ownership check + export_record = db.query(ExportDB).filter( + ExportDB.export_id == export_id, + ExportDB.user_id == current_user.id + ).first() + + if not export_record: raise HTTPException(status_code=404, detail="Export not found") - - export_info = export_metadata[export_id] - if export_info["user_id"] != current_user.id: - raise HTTPException(status_code=403, detail="Access denied") - - # GCS signed URL (production method) - if bucket and "gcs_blob_path" in export_info: - gcs_blob_path = export_info["gcs_blob_path"] - blob = bucket.blob(gcs_blob_path) - - if blob.exists(): - # Generate signed URL using existing logic from upload.py - signed_url = generate_signed_download_url(blob, expires_minutes=15) - if signed_url: - logger.info(f"โœ… Generated signed URL for export {export_id}") - return { - "signed_url": signed_url, - "filename": export_info["filename"], - "expires_in_minutes": 15, - } - - # If we reach here, the file is not available - logger.error(f"โŒ Export {export_id} not found in GCS or failed to generate signed URL") - raise HTTPException(status_code=404, detail="Export file not found or expired") - - -def find_photo_path(photo_id: str, user_id: int = None) -> Optional[str]: - """Find photo path, checking user-specific directories first.""" - extensions = [".jpg", ".jpeg", ".png", ".tiff", ".bmp"] - - # Check user-specific directories first (if user_id provided) - if user_id: - for directory in ["processed", "uploads"]: - user_dir = os.path.join(directory, str(user_id)) - for ext in extensions: - path = os.path.join(user_dir, f"{photo_id}{ext}") - logger.debug(f"Checking user-specific path: {path}") - if os.path.exists(path): - logger.info(f"Found photo at user-specific path: {path}") - return path - - # Fallback to global directories (legacy support) - for directory in ["processed", "uploads"]: - for ext in extensions: - path = os.path.join(directory, f"{photo_id}{ext}") - logger.debug(f"Checking global path: {path}") - if os.path.exists(path): - logger.info(f"Found photo at global path: {path}") - return path - - logger.warning(f"Photo not found: {photo_id} (user_id: {user_id})") - return None - - -async def organize_photos_by_bib(photo_ids: List[str], user_id: int, db: Session = None) -> Dict[str, List[tuple]]: - """Organize photos by bib number with numerical sorting""" + + # Check if export is ready + if export_record.status != ExportStatus.READY: + if export_record.status == ExportStatus.CREATING: + raise HTTPException(status_code=202, detail="Export is still being created") + elif export_record.status == ExportStatus.EXPIRED: + raise HTTPException(status_code=410, detail="Export has expired") + else: + raise HTTPException(status_code=500, detail=f"Export failed: {export_record.error_message}") + + # Check if export has expired + if export_record.is_expired(): + export_record.status = ExportStatus.EXPIRED + db.commit() + raise HTTPException(status_code=410, detail="Export has expired") + + bucket = get_gcs_bucket() + if not bucket: + raise HTTPException(status_code=500, detail="Google Cloud Storage not configured") + + blob = bucket.blob(export_record.file_path) + + if not blob.exists(): + logger.error(f"Export {export_id} blob not found at {export_record.file_path}") + raise HTTPException(status_code=404, detail="Export file not found") + + # Generate signed URL with Content-Disposition to force download + signed_url = generate_signed_url( + blob, method="GET", expires_minutes=15, download_filename=export_record.filename + ) + if not signed_url: + raise HTTPException(status_code=500, detail="Failed to generate download URL") + + # Track download + export_record.mark_downloaded() + db.commit() + + logger.info(f"Generated signed URL for export {export_id}") + return { + "signed_url": signed_url, + "filename": export_record.filename, + "expires_in_minutes": 15, + } + + +async def organize_photos_by_bib(photo_ids: List[str], user_id: int, db: Session) -> tuple[Dict[str, List[tuple]], int]: + """ + Organize photos by bib number with numerical sorting. + + Returns: + Tuple of (grouped_photos, missing_count) where grouped_photos is a Dict + with bib numbers as keys and list of (photo_id, photo_path) tuples as values. + """ grouped = defaultdict(list) - + missing_photos = [] + logger.info(f"Organizing {len(photo_ids)} photos for user {user_id}") - - # Debug: Check what directories exist - for directory in ["processed", "uploads"]: - user_dir = os.path.join(directory, str(user_id)) - if os.path.exists(user_dir): - files = os.listdir(user_dir) - logger.info(f"User directory {user_dir} exists with {len(files)} files") - else: - logger.warning(f"User directory {user_dir} does not exist") - - if os.path.exists(directory): - files = os.listdir(directory) - logger.info(f"Global directory {directory} exists with {len(files)} files") + + # Batch query all photos at once (efficient) + photo_records = db.query(PhotoDB).filter( + PhotoDB.photo_id.in_(photo_ids), + PhotoDB.user_id == user_id + ).all() + + # Create lookup dict + photo_map = {p.photo_id: p for p in photo_records} + logger.info(f"Database query returned {len(photo_records)} photos for {len(photo_ids)} requested IDs") for photo_id in photo_ids: - photo_path = find_photo_path(photo_id, user_id) - if not photo_path: - logger.warning(f"Photo path not found for photo_id: {photo_id}") + photo_record = photo_map.get(photo_id) + + if not photo_record: + logger.warning(f"Photo {photo_id}: Not found in database for user {user_id}") + missing_photos.append(photo_id) + continue + + # Use stored file_path from database (this is a GCS blob path) + blob_path = photo_record.file_path + + if not blob_path: + logger.warning(f"Photo {photo_id}: No file_path stored in database") + missing_photos.append(photo_id) continue - # Get detection result from database instead of memory - bib_number = "unknown" - - if db: - from app.models.processing import PhotoDB - - # Query database for photo detection result - photo_record = db.query(PhotoDB).filter( - PhotoDB.photo_id == photo_id, - PhotoDB.user_id == user_id - ).first() - - if photo_record: - # Check manual label first, then detected number - if photo_record.manual_label: - bib_number = photo_record.manual_label - logger.debug(f"Photo {photo_id}: Using manual label {bib_number}") - elif photo_record.detected_number: - bib_number = photo_record.detected_number - logger.debug(f"Photo {photo_id}: Using detected number {bib_number}") - else: - logger.debug(f"Photo {photo_id}: No label found in database, using 'unknown'") - else: - logger.warning(f"Photo {photo_id}: Not found in database for user {user_id}") + # Get bib number: manual label takes priority over detected + if photo_record.manual_label: + bib_number = photo_record.manual_label + logger.debug(f"Photo {photo_id}: Using manual label {bib_number}") + elif photo_record.detected_number: + bib_number = photo_record.detected_number + logger.debug(f"Photo {photo_id}: Using detected number {bib_number}") else: - # Fallback to memory (old behavior) - detection_result = process_detector.results.get(photo_id) - if detection_result and detection_result.bib_number: - bib_number = detection_result.bib_number - logger.debug(f"Photo {photo_id}: Found bib number {bib_number} from memory") - else: - logger.debug(f"Photo {photo_id}: No detection result in memory, using 'unknown'") + bib_number = "unknown" + logger.debug(f"Photo {photo_id}: No label found, using 'unknown'") + + grouped[bib_number].append((photo_id, blob_path, photo_record.original_filename)) + logger.debug(f"Photo {photo_id}: Added to group '{bib_number}' with blob_path {blob_path}") - grouped[bib_number].append((photo_id, photo_path)) - logger.debug(f"Photo {photo_id}: Added to group '{bib_number}' with path {photo_path}") + # Log summary + found_count = sum(len(v) for v in grouped.values()) + if missing_photos: + logger.warning(f"Export grouping: {len(photo_ids)} requested, {found_count} found, {len(missing_photos)} missing") + logger.warning(f"Missing photo IDs (first 10): {missing_photos[:10]}") + else: + logger.info(f"Export grouping: {len(photo_ids)} requested, {found_count} found, 0 missing") # Sort bib numbers numerically (not alphabetically) sorted_grouped = {} @@ -337,4 +321,4 @@ async def organize_photos_by_bib(photo_ids: List[str], user_id: int, db: Session for bib in sorted_bib_order: sorted_grouped[bib] = grouped[bib] - return sorted_grouped + return sorted_grouped, len(missing_photos) diff --git a/backend/app/api/process_tasks.py b/backend/app/api/process_tasks.py index ad07bce..dcae65b 100644 --- a/backend/app/api/process_tasks.py +++ b/backend/app/api/process_tasks.py @@ -9,7 +9,6 @@ from fastapi import APIRouter, Depends, HTTPException, Query, Request, status from pydantic import BaseModel from sqlalchemy.orm import Session -from sqlalchemy import update # --- FIX 1: Top-level imports to prevent NameError in Worker --- from app.models.schemas import DetectionResult, ProcessingJob, ProcessingStatus @@ -52,23 +51,23 @@ class AddBatchRequest(BaseModel): from google.cloud import tasks_v2 task_client = tasks_v2.CloudTasksClient() CLOUD_TASKS_AVAILABLE = True - logger.info("โœ… Cloud Tasks client initialized") + logger.info("Cloud Tasks client initialized") except Exception as e: CLOUD_TASKS_AVAILABLE = False task_client = None - logger.error(f"โŒ Cloud Tasks initialization failed: {e}") + logger.error(f"Cloud Tasks initialization failed: {e}") # In-memory job store jobs: Dict[str, dict] = {} # Final diagnostic summary -logger.info(f"๐Ÿ” DIAGNOSTIC SUMMARY:") -logger.info(f"๐Ÿ” - CLOUD_TASKS_AVAILABLE: {CLOUD_TASKS_AVAILABLE}") -logger.info(f"๐Ÿ” - task_client initialized: {task_client is not None}") -logger.info(f"๐Ÿ” - PROJECT: {PROJECT}") -logger.info(f"๐Ÿ” - LOCATION: {LOCATION}") -logger.info(f"๐Ÿ” - QUEUE: {QUEUE}") -logger.info(f"๐Ÿ” - SERVICE_URL: {SERVICE_URL}") +logger.info(f"DIAGNOSTIC SUMMARY:") +logger.info(f" - CLOUD_TASKS_AVAILABLE: {CLOUD_TASKS_AVAILABLE}") +logger.info(f" - task_client initialized: {task_client is not None}") +logger.info(f" - PROJECT: {PROJECT}") +logger.info(f" - LOCATION: {LOCATION}") +logger.info(f" - QUEUE: {QUEUE}") +logger.info(f" - SERVICE_URL: {SERVICE_URL}") def queue_batch_tasks( @@ -79,7 +78,7 @@ def queue_batch_tasks( Returns the number of tasks successfully created. """ if not task_client: - logger.warning(f"๐Ÿšซ Cloud Tasks not available, cannot queue tasks") + logger.warning(f"Cloud Tasks not available, cannot queue tasks") return 0 queue_path = task_client.queue_path(PROJECT, LOCATION, QUEUE) @@ -90,7 +89,7 @@ def queue_batch_tasks( BATCH_SIZE = 20 photo_batches = [photo_ids[i:i + BATCH_SIZE] for i in range(0, len(photo_ids), BATCH_SIZE)] - logger.info(f"๐Ÿ”„ Queuing {len(photo_batches)} batch tasks for job {job_id[:8]}...") + logger.info(f"Queuing {len(photo_batches)} batch tasks for job {job_id[:8]}...") tasks_created = 0 for batch_idx, photo_batch in enumerate(photo_batches): @@ -117,9 +116,9 @@ def queue_batch_tasks( } task_client.create_task(request={"parent": queue_path, "task": task}) tasks_created += 1 - logger.info(f"โœ… Created task {batch_idx + 1}/{len(photo_batches)}: {len(photo_batch)} photos") + logger.info(f"Created task {batch_idx + 1}/{len(photo_batches)}: {len(photo_batch)} photos") except Exception as task_error: - logger.error(f"โŒ Failed to create task {batch_idx + 1}: {task_error}") + logger.error(f"Failed to create task {batch_idx + 1}: {task_error}") return tasks_created @@ -149,7 +148,7 @@ async def start_processing_with_tasks( status=ProcessingStatus.PENDING, total_photos=total_photos, debug_mode=debug, - ) + ) # 1. Store in-memory jobs[job_id] = {"job": job, "user_id": current_user.id} @@ -162,15 +161,12 @@ async def start_processing_with_tasks( # 3. CRITICAL: Link existing PhotoDB records to the processing job # This allows the worker to find them when updating progress - from app.models.processing import PhotoDB - from app.models.usage import ProcessingJob as ProcessingJobDB - - # Get the INTEGER primary key for this job_id + # Get the INTEGER primary key for this job_id processing_job_record = db.query(ProcessingJobDB).filter( ProcessingJobDB.job_id == job_id, ProcessingJobDB.user_id == current_user.id ).first() - + if processing_job_record: # Use the INTEGER primary key instead of UUID string db.query(PhotoDB).filter( @@ -187,10 +183,10 @@ async def start_processing_with_tasks( tasks_created = queue_batch_tasks(photo_ids, job_id, current_user.id, debug) if tasks_created == 0: - logger.warning(f"๐Ÿšซ No Cloud Tasks created, using fallback async processing") + logger.warning(f"No Cloud Tasks created, using fallback async processing") asyncio.create_task(process_photos_async_fallback(job_id, photo_ids, current_user.id, debug)) else: - logger.info(f"๐ŸŽ‰ {tasks_created} tasks queued successfully for job {job_id[:8]}...") + logger.info(f"{tasks_created} tasks queued successfully for job {job_id[:8]}...") return job @@ -223,7 +219,7 @@ async def add_batch_to_job( raise HTTPException(status_code=404, detail="Job not found") # Restore to memory - logger.info(f"๐Ÿ”„ Restoring job from database for add-batch: {job_id[:8]}...") + logger.info(f"Restoring job from database for add-batch: {job_id[:8]}...") job = ProcessingJob( job_id=db_job.job_id, photo_ids=[], @@ -255,7 +251,7 @@ async def add_batch_to_job( # Queue Cloud Tasks for new batch tasks_created = queue_batch_tasks(photo_ids, job_id, current_user.id, debug_mode=False) - logger.info(f"๐Ÿ“ฅ ADD-BATCH: Added {len(photo_ids)} photos to job {job_id[:8]}..., queued {tasks_created} tasks") + logger.info(f"ADD-BATCH: Added {len(photo_ids)} photos to job {job_id[:8]}..., queued {tasks_created} tasks") return {"status": "ok", "added": len(photo_ids), "tasks_created": tasks_created} @@ -282,37 +278,37 @@ async def process_batch_photos_worker(request: Request): if not all([photo_ids, job_id, user_id]) or not isinstance(photo_ids, list): return {"status": "error", "message": "Missing or invalid payload fields"} - logger.info(f"๐Ÿ”„ BATCH WORKER START: {batch_index}/{total_batches} processing {len(photo_ids)} photos") + logger.info(f"BATCH WORKER START: {batch_index}/{total_batches} processing {len(photo_ids)} photos") - # โฑ๏ธ TIMING: Gemini detection + # TIMING: Gemini detection detection_start = time.time() batch_results = await detector.process_photo_batch( photo_ids, debug_mode=debug_mode, user_id=user_id ) detection_time = (time.time() - detection_start) * 1000 - logger.info(f"โฑ๏ธ BATCH {batch_index}: Gemini detection took {detection_time:.0f}ms for {len(photo_ids)} photos") + logger.debug(f"BATCH {batch_index}: Gemini detection took {detection_time:.0f}ms for {len(photo_ids)} photos") if not batch_results: - logger.error(f"โŒ BATCH FAILED: No results returned from detector") + logger.error(f"BATCH FAILED: No results returned from detector") return {"status": "error", "message": "Batch processing failed"} - # โฑ๏ธ TIMING: Database save + # TIMING: Database save db_save_start = time.time() await save_batch_results_to_database(batch_results, user_id, job_id) db_save_time = (time.time() - db_save_start) * 1000 - logger.info(f"โฑ๏ธ BATCH {batch_index}: DB save took {db_save_time:.0f}ms") + logger.debug(f"BATCH {batch_index}: DB save took {db_save_time:.0f}ms") - # โฑ๏ธ TIMING: Job progress update + # TIMING: Job progress update progress_start = time.time() await update_job_progress(job_id, db) progress_time = (time.time() - progress_start) * 1000 - logger.info(f"โฑ๏ธ BATCH {batch_index}: Progress update took {progress_time:.0f}ms") + logger.debug(f"BATCH {batch_index}: Progress update took {progress_time:.0f}ms") successful_count = len([r for r in batch_results.values() if r.bib_number not in ["unknown", "error"]]) total_batch_time = (time.time() - batch_start_time) * 1000 - logger.info(f"โฑ๏ธ BATCH {batch_index} TOTAL: {total_batch_time:.0f}ms (Detection: {detection_time:.0f}ms, DB: {db_save_time:.0f}ms, Progress: {progress_time:.0f}ms)") - logger.info(f"โœ… Batch {batch_index}/{total_batches}: {successful_count}/{len(photo_ids)} photos detected") + logger.debug(f"BATCH {batch_index} TOTAL: {total_batch_time:.0f}ms (Detection: {detection_time:.0f}ms, DB: {db_save_time:.0f}ms, Progress: {progress_time:.0f}ms)") + logger.info(f"Batch {batch_index}/{total_batches}: {successful_count}/{len(photo_ids)} photos detected") return { "status": "success", @@ -320,9 +316,9 @@ async def process_batch_photos_worker(request: Request): "processed_count": len(batch_results), "successful_count": successful_count } - + except Exception as e: - logger.error(f"๐Ÿ”ฅ Batch worker failed for batch {batch_index if 'batch_index' in locals() else 'unknown'}: {e}") + logger.error(f"Batch worker failed for batch {batch_index if 'batch_index' in locals() else 'unknown'}: {e}") # Raising 500 tells Cloud Tasks to RETRY this batch automatically raise HTTPException(status_code=500, detail=str(e)) finally: @@ -418,11 +414,11 @@ async def save_batch_results_to_database(batch_results: Dict[str, DetectionResul }, synchronize_session=False) db_session.commit() - logger.info(f"โœ… BULK SAVED: {len(successful_updates)} detected + {len(unknown_updates)} unknown = {len(batch_results)} photos (job_pk={processing_job_pk})") + logger.info(f"BULK SAVED: {len(successful_updates)} detected + {len(unknown_updates)} unknown = {len(batch_results)} photos (job_pk={processing_job_pk})") except Exception as e: db_session.rollback() - logger.error(f"โŒ DB Save Failed: {e}") + logger.error(f"DB Save Failed: {e}") raise finally: db_session.close() @@ -437,7 +433,7 @@ async def update_job_progress(job_id: str, db: Session): # Get job data from memory (has expected total from /start) job_data = jobs.get(job_id) if not job_data: - logger.warning(f"โŒ Job not found in memory: {job_id[:8]}...") + logger.warning(f"Job not found in memory: {job_id[:8]}...") return # Use expected_total from job (set at /start), not current linked count @@ -461,7 +457,7 @@ async def update_job_progress(job_id: str, db: Session): PhotoDB.processing_status == ProcessingStatus.COMPLETED ).count() - logger.info(f"๐Ÿ”ข PROGRESS COUNT: {job_id[:8]}... {completed_photos}/{expected_total} photos completed") + logger.debug(f"PROGRESS COUNT: {job_id[:8]}... {completed_photos}/{expected_total} photos completed") if expected_total > 0: progress = int((completed_photos / expected_total) * 100) @@ -472,7 +468,7 @@ async def update_job_progress(job_id: str, db: Session): job_data["job"].progress = progress job_data["job"].completed_photos = completed_photos - logger.info(f"๐Ÿ“Š UPDATING: {job_id[:8]}... progress {old_progress}โ†’{progress}") + logger.debug(f"UPDATING: {job_id[:8]}... progress {old_progress}->{progress}") # Check if job is complete using expected_total (not linked count) if completed_photos >= expected_total: @@ -491,10 +487,13 @@ async def update_job_progress(job_id: str, db: Session): progress=100, completed_at=completed_at, total_processing_time_seconds=processing_time, ) - logger.info(f"๐ŸŽ‰ JOB COMPLETED: {job_id[:8]}... {completed_photos}/{expected_total} in {processing_time:.1f}s" if processing_time else f"๐ŸŽ‰ JOB COMPLETED: {job_id[:8]}... {completed_photos}/{expected_total}") + if processing_time: + logger.info(f"JOB COMPLETED: {job_id[:8]}... {completed_photos}/{expected_total} in {processing_time:.1f}s") + else: + logger.info(f"JOB COMPLETED: {job_id[:8]}... {completed_photos}/{expected_total}") else: usage_tracker.update_processing_job(db=db, job_id=job_id, progress=progress) - logger.info(f"๐Ÿ“ˆ PROGRESS: {job_id[:8]}... {progress}% ({completed_photos}/{expected_total})") + logger.info(f"PROGRESS: {job_id[:8]}... {progress}% ({completed_photos}/{expected_total})") db.commit() @@ -506,8 +505,8 @@ async def update_job_progress(job_id: str, db: Session): @router.get("/status/{job_id}") async def get_processing_status(job_id: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): """Get the current status of a processing job with real-time database check""" - logger.info(f"๐Ÿ” STATUS REQUEST: {job_id[:8]}... from user {current_user.id}") - + logger.debug(f"STATUS REQUEST: {job_id[:8]}... from user {current_user.id}") + job_data = jobs.get(job_id) if not job_data: # Fallback: check database (handles multi-instance Cloud Run) @@ -516,11 +515,11 @@ async def get_processing_status(job_id: str, current_user: User = Depends(get_cu ProcessingJobDB.user_id == current_user.id ).first() if not db_job: - logger.warning(f"โŒ Job not found in memory or database: {job_id[:8]}...") + logger.warning(f"Job not found in memory or database: {job_id[:8]}...") raise HTTPException(status_code=404, detail="Job not found") # Restore to memory for faster subsequent polls - logger.info(f"๐Ÿ”„ Restoring job from database: {job_id[:8]}...") + logger.info(f"Restoring job from database: {job_id[:8]}...") job = ProcessingJob( job_id=db_job.job_id, photo_ids=[], @@ -533,56 +532,56 @@ async def get_processing_status(job_id: str, current_user: User = Depends(get_cu # SECURITY: Verify job belongs to current user if job_data["user_id"] != current_user.id: - logger.warning(f"โŒ Access denied to job {job_id[:8]}... for user {current_user.id}") + logger.warning(f"Access denied to job {job_id[:8]}... for user {current_user.id}") raise HTTPException(status_code=403, detail="Access denied to job") - + # Log current in-memory state current_status = job_data["job"].status current_progress = job_data["job"].progress - logger.info(f"๐Ÿ“Š CURRENT STATE: {job_id[:8]}... status={current_status}, progress={current_progress}") - + logger.debug(f"CURRENT STATE: {job_id[:8]}... status={current_status}, progress={current_progress}") + # Real-time check: Update job status from database if still processing if current_status == ProcessingStatus.PROCESSING: - logger.info(f"๐Ÿ”„ Job still processing, checking database for updates: {job_id[:8]}...") + logger.debug(f"Job still processing, checking database for updates: {job_id[:8]}...") try: # Force update job progress from database await update_job_progress(job_id, db) - + # Check if status changed after update new_status = job_data["job"].status new_progress = job_data["job"].progress if new_status != current_status or new_progress != current_progress: - logger.info(f"๐Ÿ“ˆ STATUS CHANGE: {job_id[:8]}... {current_status}โ†’{new_status}, {current_progress}โ†’{new_progress}") + logger.debug(f"STATUS CHANGE: {job_id[:8]}... {current_status}->{new_status}, {current_progress}->{new_progress}") else: - logger.info(f"๐Ÿ“Š NO CHANGE: {job_id[:8]}... still {current_status} at {current_progress}%") - + logger.debug(f"NO CHANGE: {job_id[:8]}... still {current_status} at {current_progress}%") + # Timeout protection: If job has been processing for more than 10 minutes, mark as failed from datetime import datetime, timedelta if hasattr(job_data["job"], 'created_at'): time_elapsed = datetime.utcnow() - job_data["job"].created_at if time_elapsed > timedelta(minutes=10): - logger.warning(f"โฐ TIMEOUT: {job_id[:8]}... after {time_elapsed.total_seconds():.1f}s") + logger.warning(f"TIMEOUT: {job_id[:8]}... after {time_elapsed.total_seconds():.1f}s") job_data["job"].status = ProcessingStatus.FAILED job_data["job"].progress = 0 - + except Exception as e: - logger.error(f"โŒ Failed to update job progress for {job_id[:8]}...: {e}") + logger.error(f"Failed to update job progress for {job_id[:8]}...: {e}") else: - logger.info(f"โœ… Job already completed: {job_id[:8]}... status={current_status}") - + logger.debug(f"Job already completed: {job_id[:8]}... status={current_status}") + # Ensure proper JSON serialization by converting to dict if it's a Pydantic model job_response = job_data["job"] if hasattr(job_response, 'dict'): job_response = job_response.dict() - + final_status = job_response.get('status', 'unknown') final_progress = job_response.get('progress', 0) - logger.info(f"๐Ÿ“ค RESPONSE: {job_id[:8]}... returning status={final_status}, progress={final_progress}") - + logger.debug(f"RESPONSE: {job_id[:8]}... returning status={final_status}, progress={final_progress}") + return job_response -@router.get("/results/{job_id}") +@router.get("/results/{job_id}") async def get_processing_results(job_id: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): """Get the results of a completed processing job""" try: @@ -598,7 +597,7 @@ async def get_processing_results(job_id: str, current_user: User = Depends(get_c raise HTTPException(status_code=404, detail="Job not found") # Restore to memory - logger.info(f"๐Ÿ”„ Restoring job from database for results: {job_id[:8]}...") + logger.info(f"Restoring job from database for results: {job_id[:8]}...") job = ProcessingJob( job_id=db_job.job_id, photo_ids=[], @@ -615,39 +614,39 @@ async def get_processing_results(job_id: str, current_user: User = Depends(get_c # Get all photos for this job from the database from app.models.processing import PhotoDB - - # Get the INTEGER primary key for this job_id + + # Get the INTEGER primary key for this job_id processing_job_record = db.query(ProcessingJobDB).filter( ProcessingJobDB.job_id == job_id, ProcessingJobDB.user_id == current_user.id ).first() - + if not processing_job_record: logger.warning(f"Processing job not found: {job_id}") return {"unknown": []} - + processing_job_pk = processing_job_record.id - + photos = db.query(PhotoDB).filter( PhotoDB.processing_job_id == processing_job_pk, PhotoDB.user_id == current_user.id ).all() - + if not photos: logger.warning(f"No photos found for job {job_id}, user {current_user.id}") return {"unknown": []} - + # Group photos by bib number grouped_photos = {} - + for photo in photos: # Get effective bib number (manual label takes precedence) bib_number = photo.manual_label or photo.detected_number or 'unknown' - + # Initialize group if not exists if bib_number not in grouped_photos: grouped_photos[bib_number] = [] - + # Frontend will generate image URL using getImageUrl() method with JWT token photo_data = { "id": photo.photo_id, # Frontend uses this with getImageUrl() for secure access @@ -661,21 +660,21 @@ async def get_processing_results(job_id: str, current_user: User = Depends(get_c "created_at": photo.created_at.isoformat() if photo.created_at else None, "processed_at": photo.processed_at.isoformat() if photo.processed_at else None } - + # Add bounding box if available if photo.bbox_x is not None and photo.bbox_y is not None: photo_data["bbox"] = { "x": photo.bbox_x, - "y": photo.bbox_y, + "y": photo.bbox_y, "width": photo.bbox_width, "height": photo.bbox_height } - + grouped_photos[bib_number].append(photo_data) - + # Sort groups: numbered bibs first (sorted numerically), then unknown sorted_grouped = {} - + # Add numbered bibs first, sorted numerically numbered_bibs = [] for bib in grouped_photos.keys(): @@ -686,27 +685,27 @@ async def get_processing_results(job_id: str, current_user: User = Depends(get_c except ValueError: # Non-numeric bib, add as string numbered_bibs.append((float('inf'), bib)) - + # Sort by numeric value, then by string numbered_bibs.sort(key=lambda x: (x[0], x[1])) - + # Add sorted numbered groups for _, bib in numbered_bibs: sorted_grouped[bib] = grouped_photos[bib] - + # Add unknown group last if 'unknown' in grouped_photos: sorted_grouped['unknown'] = grouped_photos['unknown'] - - logger.info(f"โœ… Retrieved {len(photos)} photos in {len(sorted_grouped)} groups for job {job_id}") - + + logger.info(f"Retrieved {len(photos)} photos in {len(sorted_grouped)} groups for job {job_id}") + return sorted_grouped - + except HTTPException: # Re-raise HTTP exceptions raise except Exception as e: - logger.error(f"โŒ Failed to get results for job {job_id}: {e}") + logger.error(f"Failed to get results for job {job_id}: {e}") raise HTTPException(status_code=500, detail=f"Failed to retrieve results: {str(e)}") @@ -715,7 +714,7 @@ async def process_photos_async_fallback(job_id: str, photo_ids: List[str], user_ Fallback async processing when Cloud Tasks is not available. Uses concurrent batch processing for speed. """ - logger.info(f"๐Ÿ”„ Starting fallback async processing for {len(photo_ids)} photos") + logger.info(f"Starting fallback async processing for {len(photo_ids)} photos") try: from database import SessionLocal @@ -760,19 +759,22 @@ async def process_photos_async_fallback(job_id: str, photo_ids: List[str], user_ ) db_session.commit() - logger.info(f"๐ŸŽ‰ Fallback processing completed: {completed_count}/{len(photo_ids)} photos detected in {processing_time:.1f}s" if processing_time else f"๐ŸŽ‰ Fallback processing completed: {completed_count}/{len(photo_ids)} photos detected") + if processing_time: + logger.info(f"Fallback processing completed: {completed_count}/{len(photo_ids)} photos detected in {processing_time:.1f}s") + else: + logger.info(f"Fallback processing completed: {completed_count}/{len(photo_ids)} photos detected") finally: db_session.close() except Exception as e: - logger.error(f"๐Ÿ”ฅ Fallback processing failed: {e}") + logger.error(f"Fallback processing failed: {e}") # Mark job as failed job_data = jobs.get(job_id) if job_data: job_data["job"].status = ProcessingStatus.FAILED - + def sync_jobs_from_database(): """Load active jobs from DB into memory on startup.""" db = SessionLocal() @@ -790,11 +792,6 @@ def sync_jobs_from_database(): progress=db_job.progress ) jobs[db_job.job_id] = {"job": job, "user_id": db_job.user_id} - logger.info(f"๐Ÿ”„ Synced {len(active_jobs)} active jobs from database") + logger.info(f"Synced {len(active_jobs)} active jobs from database") finally: db.close() - -def cleanup_old_jobs(): - """Optional: Clear very old jobs from memory.""" - # (Implementation can be simple or empty for now) - pass \ No newline at end of file diff --git a/backend/app/api/upload.py b/backend/app/api/upload.py index bc4bffe..c421464 100644 --- a/backend/app/api/upload.py +++ b/backend/app/api/upload.py @@ -1,16 +1,16 @@ import logging import os import io -from datetime import timedelta from typing import Optional from fastapi import APIRouter, Depends, HTTPException, status from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse from sqlalchemy.orm import Session -from google.cloud import storage from app.api.auth import get_current_user from app.core.config import settings +from app.core.gcs import get_gcs_bucket, generate_signed_url +from app.core.security_config import ALLOWED_EXTENSIONS from app.models.schemas import PhotoInfo, ProcessingStatus from app.models.user import User from database import get_db @@ -18,19 +18,6 @@ router = APIRouter() logger = logging.getLogger(__name__) -# CONFIGURATION -BUCKET_NAME = settings.bucket_name -ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".tiff", ".bmp"} - -# Initialize GCS Client -try: - if BUCKET_NAME: - storage_client = storage.Client() - bucket = storage_client.bucket(BUCKET_NAME) - else: - bucket = None -except Exception as e: - bucket = None # --- HELPERS --- @@ -41,27 +28,28 @@ def find_photo_by_id(db: Session, user_id: int, photo_id: str): def find_photo_in_storage(user_id: int, photo_db): if not photo_db or not photo_db.file_extension: return None, None, None, None - + filename = f"{photo_db.photo_id}{photo_db.file_extension}" blob_path = f"{user_id}/{filename}" - + + bucket = get_gcs_bucket() if bucket: blob = bucket.blob(blob_path) if blob.exists(): return 'gcs', blob_path, filename, blob - + local_path = os.path.join(settings.upload_dir, str(user_id), filename) if os.path.exists(local_path): return 'local', local_path, filename, None - + return None, None, None, None # --- ROUTES --- @router.get("/serve/{photo_id}/view") async def serve_photo_with_token( - photo_id: str, - token: str, + photo_id: str, + token: str, db: Session = Depends(get_db) ): """ @@ -79,37 +67,23 @@ async def serve_photo_with_token( photo_db = find_photo_by_id(db, user.id, photo_id) if not photo_db: raise HTTPException(status_code=404, detail="Photo record not found") - + # 3. Find in storage (GCS or Local) storage_type, path, filename, blob = find_photo_in_storage(user.id, photo_db) - + if not filename: raise HTTPException(status_code=404, detail="Photo file not found in storage") - + # 4. Handle GCS (Signed URL) if storage_type == 'gcs' and blob: - try: - from google.auth import default - from google.auth.transport import requests as google_requests - - credentials, _ = default() - auth_request = google_requests.Request() - credentials.refresh(auth_request) - - signed_url = blob.generate_signed_url( - version="v4", - expiration=timedelta(minutes=15), - method="GET", - service_account_email=getattr(credentials, 'service_account_email', None), - access_token=getattr(credentials, 'token', None), - ) + signed_url = generate_signed_url(blob, method="GET", expires_minutes=15) + if signed_url: return RedirectResponse(url=signed_url) - except Exception as e: - # Fallback to streaming if signing fails - return StreamingResponse(io.BytesIO(blob.download_as_bytes()), media_type="image/jpeg") - + # Fallback to streaming if signing fails + return StreamingResponse(io.BytesIO(blob.download_as_bytes()), media_type="image/jpeg") + # 5. Handle Local fallback elif storage_type == 'local': return FileResponse(path, media_type="image/jpeg") - - raise HTTPException(status_code=404) \ No newline at end of file + + raise HTTPException(status_code=404) diff --git a/backend/app/core/config.py b/backend/app/core/config.py index e094876..9cde73a 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -266,9 +266,9 @@ def print_startup_info(self): logger.info(f"Rate Limit: {self.rate_limit_per_minute} requests/minute") if self.gemini_api_key: - logger.info("Gemini Flash API: Configured โœ…") + logger.info("Gemini Flash API: Configured") else: - logger.error("Gemini Flash API: Not configured โŒ") + logger.error("Gemini Flash API: Not configured") if self.get_smtp_config(): logger.info("Email: Configured") diff --git a/backend/app/core/gcs.py b/backend/app/core/gcs.py new file mode 100644 index 0000000..cf6d9be --- /dev/null +++ b/backend/app/core/gcs.py @@ -0,0 +1,67 @@ +""" +Shared Google Cloud Storage utilities. +Singleton pattern to avoid 150-200ms overhead per client initialization. +""" + +import logging +from datetime import timedelta +from functools import lru_cache +from typing import Optional + +from google.cloud import storage + +from app.core.config import settings + +logger = logging.getLogger(__name__) + + +@lru_cache(maxsize=1) +def get_gcs_client() -> storage.Client: + """Singleton GCS client to avoid 150-200ms overhead per initialization.""" + client = storage.Client() + logger.info("GCS client initialized") + return client + + +@lru_cache(maxsize=1) +def get_gcs_bucket() -> Optional[storage.Bucket]: + """Get configured GCS bucket, or None if not configured.""" + if not settings.bucket_name: + return None + return get_gcs_client().bucket(settings.bucket_name) + + +def generate_signed_url( + blob: storage.Blob, + method: str = "GET", + expires_minutes: int = 15, + content_type: Optional[str] = None, + download_filename: Optional[str] = None +) -> Optional[str]: + """Generate a signed URL for GCS blob access.""" + try: + from google.auth import default + from google.auth.transport import requests as google_requests + + credentials, _ = default() + auth_request = google_requests.Request() + credentials.refresh(auth_request) + + kwargs = { + "version": "v4", + "expiration": timedelta(minutes=expires_minutes), + "method": method, + "service_account_email": getattr(credentials, 'service_account_email', None), + "access_token": getattr(credentials, 'token', None), + } + if content_type: + kwargs["content_type"] = content_type + + # Force browser to download instead of displaying inline + if download_filename: + kwargs["response_disposition"] = f'attachment; filename="{download_filename}"' + + return blob.generate_signed_url(**kwargs) + except Exception as e: + logger.error(f"Failed to generate signed URL: {e}") + return None diff --git a/backend/app/models/processing.py b/backend/app/models/processing.py index 79212e8..b0ccb78 100644 --- a/backend/app/models/processing.py +++ b/backend/app/models/processing.py @@ -2,7 +2,7 @@ Database models for photo processing, jobs, and exports. """ -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from enum import Enum from typing import Any, Dict, List, Optional @@ -168,15 +168,20 @@ def is_expired(self) -> bool: """Check if the export has expired.""" if not self.expires_at: return False - return datetime.utcnow() > self.expires_at + now = datetime.now(timezone.utc) + # Handle both timezone-aware and naive datetimes + expires = self.expires_at + if expires.tzinfo is None: + expires = expires.replace(tzinfo=timezone.utc) + return now > expires def set_expiration(self, days: int = 7): """Set export expiration time.""" - self.expires_at = datetime.utcnow() + timedelta(days=days) + self.expires_at = datetime.now(timezone.utc) + timedelta(days=days) def mark_downloaded(self): """Mark export as downloaded.""" - self.last_downloaded_at = datetime.utcnow() + self.last_downloaded_at = datetime.now(timezone.utc) self.download_count += 1 diff --git a/backend/app/models/usage.py b/backend/app/models/usage.py index 60647af..ac4243c 100644 --- a/backend/app/models/usage.py +++ b/backend/app/models/usage.py @@ -1,5 +1,5 @@ import enum -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from typing import Optional from sqlalchemy import ( @@ -140,7 +140,7 @@ def start_processing(self) -> None: Mark the job as started. """ self.status = "processing" - self.started_at = datetime.utcnow() + self.started_at = datetime.now(timezone.utc) def complete_processing( self, success: bool = True, error_message: Optional[str] = None @@ -149,7 +149,7 @@ def complete_processing( Mark the job as completed or failed. """ self.status = "completed" if success else "failed" - self.completed_at = datetime.utcnow() + self.completed_at = datetime.now(timezone.utc) if error_message: self.error_message = error_message @@ -193,11 +193,15 @@ def is_expired(self) -> bool: """Check if the job has expired.""" if not self.expires_at: return False - return datetime.utcnow() > self.expires_at + now = datetime.now(timezone.utc) + expires = self.expires_at + if expires.tzinfo is None: + expires = expires.replace(tzinfo=timezone.utc) + return now > expires def set_expiration(self, days: int = 30): """Set job expiration time in days.""" - self.expires_at = datetime.utcnow() + timedelta(days=days) + self.expires_at = datetime.now(timezone.utc) + timedelta(days=days) def to_schema(self): """Convert to ProcessingJob schema for API responses.""" diff --git a/backend/app/services/detector.py b/backend/app/services/detector.py index 6d55a23..caba7f8 100644 --- a/backend/app/services/detector.py +++ b/backend/app/services/detector.py @@ -1,11 +1,12 @@ import asyncio import io +import json import logging import os +import random import re import sys import time -import json from typing import Dict, List, Optional, Tuple from PIL import Image @@ -18,22 +19,11 @@ PhotoInfo, ProcessingStatus, ) +from app.core.gcs import get_gcs_client # Configure logger for this module logger = logging.getLogger(__name__) -# Singleton GCS client for connection reuse (avoids 150-200ms overhead per download) -_gcs_client = None - -def get_gcs_client(): - """Lazy-initialize and return singleton GCS client.""" - global _gcs_client - if _gcs_client is None: - from google.cloud import storage - _gcs_client = storage.Client() - logger.info("โœ… GCS client singleton initialized") - return _gcs_client - class NumberDetector: def __init__(self): @@ -49,19 +39,19 @@ def _initialize_gemini_client(self): try: # Get Gemini API key from settings configuration from app.core.config import settings - + api_key = settings.gemini_api_key if api_key: self.gemini_client = genai.Client(api_key=api_key) self.use_gemini = True - logger.info("โœ… Gemini 2.0 Flash API initialized successfully") + logger.info("Gemini 2.0 Flash API initialized successfully") else: # No API key available self.gemini_client = None self.use_gemini = False - logger.error("โŒ No Gemini API key available - classification will fail") + logger.error("No Gemini API key available - classification will fail") except Exception as e: - logger.error(f"โŒ Gemini API initialization failed: {e}") + logger.error(f"Gemini API initialization failed: {e}") self.gemini_client = None self.use_gemini = False @@ -84,14 +74,14 @@ async def process_photo_batch( return {photo_id: DetectionResult(bib_number="unknown", confidence=0.0, bbox=None) for photo_id in photo_ids} - logger.info(f"๐Ÿš€ CONCURRENT PROCESSING: Starting {len(photo_ids)} photos") + logger.info(f"CONCURRENT PROCESSING: Starting {len(photo_ids)} photos") # PREFETCH: Download all images in parallel before Gemini calls prefetch_start = time.time() image_cache = await self._prefetch_all_images(photo_ids, user_id, debug_mode) prefetch_time = time.time() - prefetch_start cached_count = len([v for v in image_cache.values() if v[0] is not None]) - logger.info(f"๐Ÿ“ฅ PREFETCH COMPLETE: {cached_count}/{len(photo_ids)} images cached in {prefetch_time:.2f}s") + logger.info(f"PREFETCH COMPLETE: {cached_count}/{len(photo_ids)} images cached in {prefetch_time:.2f}s") # Concurrency limit to respect Gemini rate limits # Increased from 20 to 50 to better utilize available quota (2,000 RPM) @@ -113,7 +103,7 @@ async def process_with_semaphore(photo_id: str, index: int) -> Tuple[str, Detect results = {} for item in results_list: if isinstance(item, Exception): - logger.error(f"โŒ Task exception: {item}") + logger.error(f"Task exception: {item}") continue photo_id, detection_result = item results[photo_id] = detection_result @@ -126,7 +116,7 @@ async def process_with_semaphore(photo_id: str, index: int) -> Tuple[str, Detect success_rate = (successful_count / len(photo_ids)) * 100 if photo_ids else 0 avg_time = total_time / len(photo_ids) if photo_ids else 0 - logger.info(f"๐ŸŽฏ CONCURRENT COMPLETE: {successful_count}/{len(photo_ids)} detected ({success_rate:.1f}% success) in {total_time:.2f}s ({avg_time:.2f}s/photo effective)") + logger.info(f"CONCURRENT COMPLETE: {successful_count}/{len(photo_ids)} detected ({success_rate:.1f}% success) in {total_time:.2f}s ({avg_time:.2f}s/photo effective)") return results @@ -134,13 +124,13 @@ async def _prefetch_all_images( self, photo_ids: List[str], user_id: Optional[int], debug_mode: bool ) -> Dict[str, Tuple[Optional[bytes], Tuple[int, int]]]: """Download all images from GCS in parallel before processing.""" - logger.info(f"๐Ÿ“ฅ PREFETCH: Downloading {len(photo_ids)} images in parallel...") + logger.info(f"PREFETCH: Downloading {len(photo_ids)} images in parallel...") async def fetch_one(photo_id: str) -> Tuple[str, Tuple[Optional[bytes], Tuple[int, int]]]: try: photo_path = await asyncio.to_thread(self._find_photo_path, photo_id, user_id) if not photo_path: - logger.warning(f"โŒ [{photo_id[:8]}] Photo not found during prefetch") + logger.warning(f"[{photo_id[:8]}] Photo not found during prefetch") return photo_id, (None, (1, 1)) image_data, img_shape = await asyncio.to_thread( @@ -148,7 +138,7 @@ async def fetch_one(photo_id: str) -> Tuple[str, Tuple[Optional[bytes], Tuple[in ) return photo_id, (image_data, img_shape) except Exception as e: - logger.error(f"โŒ [{photo_id[:8]}] Prefetch error: {e}") + logger.error(f"[{photo_id[:8]}] Prefetch error: {e}") return photo_id, (None, (1, 1)) tasks = [fetch_one(pid) for pid in photo_ids] @@ -163,18 +153,18 @@ async def _process_single_photo_cached( photo_start_time = time.time() try: - logger.info(f"๐Ÿ“ธ [{index+1}/{total}] Processing {photo_id[:8]}... (cached)") + logger.debug(f"[{index+1}/{total}] Processing {photo_id[:8]}... (cached)") - # โฑ๏ธ TIMING: Cache retrieval + # TIMING: Cache retrieval cache_start = time.time() image_data, img_shape = image_cache.get(photo_id, (None, (1, 1))) cache_time = (time.time() - cache_start) * 1000 if not image_data: - logger.warning(f"โŒ [{photo_id[:8]}] No cached image data") + logger.warning(f"[{photo_id[:8]}] No cached image data") return DetectionResult(bib_number="unknown", confidence=0.0, bbox=None) - logger.info(f"โฑ๏ธ [{photo_id[:8]}] Cache retrieval: {cache_time:.1f}ms, Image size: {len(image_data)/1024:.0f}KB") + logger.debug(f"[{photo_id[:8]}] Cache retrieval: {cache_time:.1f}ms, Image size: {len(image_data)/1024:.0f}KB") # REFINED PROMPT: Focus on Digit Integrity over Count single_prompt = """Act as an elite sports photography OCR specialist. @@ -198,8 +188,7 @@ async def _process_single_photo_cached( single_prompt ] - # โฑ๏ธ TIMING: Gemini API call with exponential backoff retry - import random + # TIMING: Gemini API call with exponential backoff retry api_start = time.time() response = None max_retries = 3 @@ -218,9 +207,9 @@ async def _process_single_photo_cached( ) if response and response.text: break # Success - logger.warning(f"โš ๏ธ [{photo_id[:8]}] Empty response, attempt {attempt+1}/{max_retries}") + logger.warning(f"[{photo_id[:8]}] Empty response, attempt {attempt+1}/{max_retries}") except asyncio.TimeoutError: - logger.warning(f"โš ๏ธ [{photo_id[:8]}] Timeout after 30s, attempt {attempt+1}/{max_retries}") + logger.warning(f"[{photo_id[:8]}] Timeout after 30s, attempt {attempt+1}/{max_retries}") if attempt < max_retries - 1: await asyncio.sleep(base_delay * (2 ** attempt)) continue @@ -229,31 +218,31 @@ async def _process_single_photo_cached( if "429" in error_str or "rate" in error_str or "quota" in error_str or "resource_exhausted" in error_str: # Exponential backoff with jitter: 0.5-1s, 1-1.5s, 2-2.5s delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5) - logger.warning(f"โš ๏ธ [{photo_id[:8]}] Rate limited, retry {attempt+1}/{max_retries} after {delay:.1f}s") + logger.warning(f"[{photo_id[:8]}] Rate limited, retry {attempt+1}/{max_retries} after {delay:.1f}s") await asyncio.sleep(delay) else: - logger.error(f"โŒ [{photo_id[:8]}] Gemini error: {e}") + logger.error(f"[{photo_id[:8]}] Gemini error: {e}") raise api_time = (time.time() - api_start) * 1000 - logger.info(f"โฑ๏ธ [{photo_id[:8]}] Gemini API call: {api_time:.0f}ms") + logger.debug(f"[{photo_id[:8]}] Gemini API call: {api_time:.0f}ms") if not response or not response.text: - logger.error(f"โŒ [{photo_id[:8]}] Empty Gemini response after {max_retries} attempts") + logger.error(f"[{photo_id[:8]}] Empty Gemini response after {max_retries} attempts") return DetectionResult(bib_number="unknown", confidence=0.0, bbox=None) - # โฑ๏ธ TIMING: Parse response + # TIMING: Parse response parse_start = time.time() result = self._parse_gemini_response(response.text, photo_id, img_shape, photo_start_time) parse_time = (time.time() - parse_start) * 1000 total_time = (time.time() - photo_start_time) * 1000 - logger.info(f"โฑ๏ธ [{photo_id[:8]}] TOTAL: {total_time:.0f}ms (API: {api_time:.0f}ms, Parse: {parse_time:.1f}ms)") + logger.debug(f"[{photo_id[:8]}] TOTAL: {total_time:.0f}ms (API: {api_time:.0f}ms, Parse: {parse_time:.1f}ms)") return result except Exception as e: - logger.error(f"โŒ PROCESSING ERROR [{photo_id[:8]}]: {e}") + logger.error(f"PROCESSING ERROR [{photo_id[:8]}]: {e}") return DetectionResult(bib_number="unknown", confidence=0.0, bbox=None) def _parse_gemini_response( @@ -272,11 +261,11 @@ def _parse_gemini_response( # Handle both array and object responses from Gemini if isinstance(res_json, list): if len(res_json) == 0: - logger.error(f"โŒ EMPTY ARRAY [{photo_id[:8]}]: Gemini returned empty array") + logger.error(f"EMPTY ARRAY [{photo_id[:8]}]: Gemini returned empty array") return DetectionResult(bib_number="unknown", confidence=0.0, bbox=None) res_json = res_json[0] elif not isinstance(res_json, dict): - logger.error(f"โŒ INVALID FORMAT [{photo_id[:8]}]: Expected dict or list, got {type(res_json)}") + logger.error(f"INVALID FORMAT [{photo_id[:8]}]: Expected dict or list, got {type(res_json)}") return DetectionResult(bib_number="unknown", confidence=0.0, bbox=None) # Extract and validate fields @@ -286,10 +275,10 @@ def _parse_gemini_response( # Validate detected bib number if not detected_bib or detected_bib.upper() in ["NONE", "NULL", "UNKNOWN", ""]: - logger.info(f"โŒ EMPTY [{photo_id[:8]}]: No number detected - {reasoning}") + logger.debug(f"EMPTY [{photo_id[:8]}]: No number detected - {reasoning}") return DetectionResult(bib_number="unknown", confidence=0.0, bbox=None) elif not self._is_valid_bib_number(detected_bib): - logger.info(f"โŒ INVALID [{photo_id[:8]}]: '{detected_bib}' failed validation") + logger.debug(f"INVALID [{photo_id[:8]}]: '{detected_bib}' failed validation") return DetectionResult(bib_number="unknown", confidence=0.0, bbox=None) # Convert text confidence to numeric @@ -303,7 +292,7 @@ def _parse_gemini_response( ] photo_time = time.time() - start_time - logger.info(f"โœ… SUCCESS [{photo_id[:8]}] ({confidence_text}): '{detected_bib}' in {photo_time:.2f}s") + logger.info(f"SUCCESS [{photo_id[:8]}] ({confidence_text}): '{detected_bib}' in {photo_time:.2f}s") return DetectionResult( bib_number=detected_bib, @@ -312,10 +301,10 @@ def _parse_gemini_response( ) except json.JSONDecodeError as e: - logger.error(f"โŒ JSON ERROR [{photo_id[:8]}]: {e}") + logger.error(f"JSON ERROR [{photo_id[:8]}]: {e}") return DetectionResult(bib_number="unknown", confidence=0.0, bbox=None) except KeyError as e: - logger.error(f"โŒ MISSING FIELD [{photo_id[:8]}]: {e}") + logger.error(f"MISSING FIELD [{photo_id[:8]}]: {e}") return DetectionResult(bib_number="unknown", confidence=0.0, bbox=None) @@ -330,30 +319,30 @@ def _resize_image(self, image_bytes: bytes, max_size: int = 1024) -> bytes: # Convert to RGB to handle PNGs/CMYK if img.mode != 'RGB': img = img.convert('RGB') - + original_size = img.size - - # Crop bottom 5% to remove watermarks while preserving handlebar plates + + # Crop bottom 5% to remove watermarks while preserving handlebar plates width, height = img.size crop_height = int(height * 0.95) # Keep top 95% img = img.crop((0, 0, width, crop_height)) - + # Only resize if larger than max_size if max(img.size) > max_size: img.thumbnail((max_size, max_size), Image.Resampling.LANCZOS) - + if logger.isEnabledFor(logging.DEBUG): new_size = img.size reduction = ((original_size[0] * original_size[1]) - (new_size[0] * new_size[1])) / (original_size[0] * original_size[1]) * 100 - logger.debug(f"๐ŸŽ๏ธ OCR Resize: {original_size} โ†’ {new_size} ({reduction:.0f}% smaller, watermark cropped)") - + logger.debug(f"OCR Resize: {original_size} -> {new_size} ({reduction:.0f}% smaller, watermark cropped)") + # OCR-optimized compression settings buffer = io.BytesIO() img.save(buffer, format="JPEG", quality=85, optimize=True) return buffer.getvalue() - + except Exception as e: - logger.warning(f"โš ๏ธ PIL resize failed: {e}, using original") + logger.warning(f"PIL resize failed: {e}, using original") return image_bytes # Fallback to original def _optimize_image_for_gemini( @@ -373,7 +362,7 @@ def _optimize_image_for_gemini( resized_data = self._resize_image(original_data, max_size=1024) if debug_mode: - logger.info(f"๐Ÿ“ท IMAGE: {original_width}x{original_height} ({len(original_data)/1024:.0f}KB) โ†’ resized ({len(resized_data)/1024:.0f}KB)") + logger.debug(f"IMAGE: {original_width}x{original_height} ({len(original_data)/1024:.0f}KB) -> resized ({len(resized_data)/1024:.0f}KB)") return resized_data, (original_height, original_width) @@ -424,7 +413,7 @@ def _find_photo_path( if settings.bucket_name: storage_client = get_gcs_client() # Singleton - avoids 150ms overhead per call bucket = storage_client.bucket(settings.bucket_name) - + for ext in extensions: filename = f"{photo_id}{ext}" blob_path = f"{user_id}/{filename}" @@ -435,7 +424,7 @@ def _find_photo_path( os.makedirs(user_upload_dir, exist_ok=True) local_path = os.path.join(user_upload_dir, filename) blob.download_to_filename(local_path) - logger.info(f"Downloaded {photo_id} from GCS to local storage") + logger.debug(f"Downloaded {photo_id} from GCS to local storage") return local_path except Exception: continue # Try next extension @@ -506,5 +495,5 @@ def update_manual_label(self, photo_id: str, bib_number: str) -> bool: return True except Exception as e: - logger.error(f"โŒ Failed to manually label photo {photo_id}: {e}") + logger.error(f"Failed to manually label photo {photo_id}: {e}") return False diff --git a/backend/entrypoint.sh b/backend/entrypoint.sh index d192806..64ab679 100644 --- a/backend/entrypoint.sh +++ b/backend/entrypoint.sh @@ -1,21 +1,21 @@ #!/bin/bash set -e # Exit on any error -echo "๐Ÿš€ Starting TagSort API deployment..." +echo "Starting TagSort API deployment..." # 1. Run database migrations -echo "๐Ÿ“Š Running database migrations..." +echo "Running database migrations..." cd /app/backend alembic upgrade head if [ $? -eq 0 ]; then - echo "โœ… Database migrations completed successfully" + echo "Database migrations completed successfully" else - echo "โŒ Database migrations failed" + echo "Database migrations failed" exit 1 fi -# 2. Start the FastAPI application -echo "๐ŸŒ Starting FastAPI application..." +# 2. Start the FastAPI application +echo "Starting FastAPI application..." cd /app exec uvicorn backend.main:app --host 0.0.0.0 --port ${PORT:-8080} \ No newline at end of file diff --git a/backend/main.py b/backend/main.py index 6412595..516a65c 100644 --- a/backend/main.py +++ b/backend/main.py @@ -37,9 +37,9 @@ # Security check for JWT if settings.is_production(): - logger.info("โœ… Running in PRODUCTION mode with secure JWT configuration") + logger.info("Running in PRODUCTION mode with secure JWT configuration") else: - logger.warning("โš ๏ธ Running in DEVELOPMENT mode") + logger.warning("Running in DEVELOPMENT mode") app = FastAPI( title="TagSort API", @@ -108,31 +108,29 @@ async def startup_event(): """Initialize database tables and configuration on startup.""" # Enable SQL query logging for debugging logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO) - logger.info("๐Ÿ” SQL query logging enabled for analytics debugging") - + logger.info("SQL query logging enabled for analytics debugging") + # Print configuration info settings.print_startup_info() - logger.info("๐Ÿ”„ Database connection ready...") - + logger.info("Database connection ready...") + db_info = get_db_info() - logger.info(f"โœ… Database connected: {db_info['database_path']}") - logger.info(f"๐Ÿ“Š Database size: {db_info['database_size_mb']} MB") + logger.info(f"Database connected: {db_info['database_path']}") + logger.info(f"Database size: {db_info['database_size_mb']} MB") # Test AuthService singleton and JWT functionality from app.services.auth_service import auth_service - logger.info("๐Ÿงช Testing AuthService functionality...") + logger.info("Testing AuthService functionality...") - # Test token creation and verification + # Test token creation and verification (without logging the token) test_token = auth_service.create_access_token(999) # Test user ID - logger.debug(f"๐Ÿงช Test token created: {test_token[:50]}...") - test_result = auth_service.verify_token(test_token) if test_result: - logger.info("โœ… AuthService test PASSED - tokens work correctly") + logger.info("AuthService test PASSED - tokens work correctly") else: - logger.error("โŒ AuthService test FAILED - JWT not working properly") + logger.error("AuthService test FAILED - JWT not working properly") # Clean up expired sessions on startup from app.services.job_service import job_service @@ -144,42 +142,41 @@ async def startup_event(): try: from database import Base, engine Base.metadata.create_all(bind=engine) - logger.info("โœ… Database tables verified/created") + logger.info("Database tables verified/created") except Exception as e: - logger.warning(f"โš ๏ธ Table creation verification failed: {e}") + logger.warning(f"Table creation verification failed: {e}") # Clean up expired sessions try: cleaned_sessions = auth_service.cleanup_expired_sessions(db) if cleaned_sessions > 0: - logger.info(f"๐Ÿงน Cleaned up {cleaned_sessions} expired sessions") + logger.info(f"Cleaned up {cleaned_sessions} expired sessions") except Exception as e: - logger.warning(f"โš ๏ธ Session cleanup failed: {e}") + logger.warning(f"Session cleanup failed: {e}") # Recover stalled processing jobs (gracefully handle missing tables) try: recovered_jobs = job_service.recover_jobs_on_startup(db) if recovered_jobs > 0: - logger.info(f"๐Ÿ”„ Recovered {recovered_jobs} processing jobs") + logger.info(f"Recovered {recovered_jobs} processing jobs") except Exception as e: - logger.warning(f"โš ๏ธ Job recovery skipped (tables may not exist yet): {e}") + logger.warning(f"Job recovery skipped (tables may not exist yet): {e}") # Load active processing jobs into memory (gracefully handle missing tables) try: - from app.api.process_tasks import cleanup_old_jobs, sync_jobs_from_database + from app.api.process_tasks import sync_jobs_from_database sync_jobs_from_database() - cleanup_old_jobs() - logger.info("๐Ÿ”„ Synced active jobs from database") + logger.info("Synced active jobs from database") except Exception as e: - logger.warning(f"โš ๏ธ Job sync skipped (tables may not exist yet): {e}") + logger.warning(f"Job sync skipped (tables may not exist yet): {e}") # Clean up expired jobs and exports (gracefully handle missing tables) try: cleaned_jobs = job_service.cleanup_expired_jobs(db) if cleaned_jobs > 0: - logger.info(f"๐Ÿงน Cleaned up {cleaned_jobs} expired jobs") + logger.info(f"Cleaned up {cleaned_jobs} expired jobs") except Exception as e: - logger.warning(f"โš ๏ธ Job cleanup skipped (tables may not exist yet): {e}") + logger.warning(f"Job cleanup skipped (tables may not exist yet): {e}") finally: db.close() @@ -195,7 +192,7 @@ async def startup_event(): app.add_middleware( CORSMiddleware, - allow_origins=allowed_origins, + allow_origins=allowed_origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], @@ -204,7 +201,7 @@ async def startup_event(): # Create required directories with proper permissions settings.create_directories() logger.info( - f"โœ… Created directories: {settings.upload_dir}, {settings.export_dir}, {settings.temp_dir}" + f"Created directories: {settings.upload_dir}, {settings.export_dir}, {settings.temp_dir}" ) @@ -283,7 +280,7 @@ async def database_status(): ) app.mount("/", StaticFiles(directory=frontend_path, html=True), name="frontend") -logger.info("โœ… API routers registered:") +logger.info("API routers registered:") logger.info(" - /api/auth") logger.info(" - /api/users") logger.info(" - /api/upload") @@ -299,6 +296,6 @@ async def database_status(): import os # Get the PORT from Cloud Run (defaults to 8080) port = int(os.environ.get("PORT", 8080)) - - # โœ… CORRECT FastAPI startup + + # CORRECT FastAPI startup uvicorn.run("main:app", host="0.0.0.0", port=port, reload=False) diff --git a/frontend/index.html b/frontend/index.html index f3049d7..7d65efc 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -36,7 +36,7 @@
TagSort - AI-powered race photo sorting + AI-powered photo sorting