diff --git a/.firebase/hosting.ZnJvbnRlbmQ.cache b/.firebase/hosting.ZnJvbnRlbmQ.cache index ac5e5ea..203dd00 100644 --- a/.firebase/hosting.ZnJvbnRlbmQ.cache +++ b/.firebase/hosting.ZnJvbnRlbmQ.cache @@ -1,8 +1,4 @@ 404.html,1766671073811,05cbc6f94d7a69ce2e29646eab13be2c884e61ba93e3094df5028866876d18b3 -static/js/state-manager.js,1767964327134,a195528b8a7bdf2ff8a0d98c435ce108dcd652fbdfe77a7c831093794eec4bcc -static/js/script.js,1767967530218,608ac254bed45ae0c5d9abf80d181304b039d47401a00406d3c024f0c83093c2 -static/js/photo-processor.js,1767967514620,685d7326925eff60de289197aa5b2cbf3370bce891c52bfb59eda973fbf63608 -static/js/config.js,1767715596855,78a50979181fc33d8cd9d3632de0ff48baa9472ca57ddd99e309b45e3d9561c7 static/js/batch-operations.js,1765904443892,150c9a4eaf48f392a41a00ac69d3c7e36730146565c4bed007998132f5294170 static/js/analytics-dashboard.js,1767673385114,8bd489575178657db06e2aca294a91d93758b3d8b04a14b785341faf36946de9 static/js/components/atoms/Button.js,1766671073815,857f21eae50e3bf582dd64a5d928e638dc8b7901b50956f0f7916220d47c22e8 @@ -10,14 +6,18 @@ static/images/miapatrikios.jpeg,1755189202858,33154529bc40f43447874a8dbadab1146a static/images/jonahfreedman.jpeg,1755189265752,379959114a796a7c9fc80be8ed4a87f1901b28a02999814fb8c5ba409e64481a static/images/bridgerjones.jpeg,1759738952787,a38bf4ddb7222db0307970b32d82d14580a08d2775e9f2201f629469a67dbd79 static/css/photo-viewer.css,1767673385113,eb3a5ee1584eced2b5166212162ee397015d761cda66f81d40df25cafb055e7a -static/css/global.css,1767739927505,aa786d46f4baee33fbdaad9d78389df2f9d2b51e7796f01e9394df16b958eccc static/css/core.css,1767673385113,b7b687a0b9841fdb45b1573751f17447080915933142be87a9401d7707eabfb1 static/css/components.css,1767673385113,a82e7b72b7c7258fa679cb50b75ca5eeaf1c30899a0b29182d443913240dcd04 static/css/Button.css,1766671073813,b733032a5583ea1b702c4f30df1acc60c10f0d94f234caeb9aea20a07d2b8dbd payment/success.html,1766671073813,dac776c1f255f358c965466c50a8fb8f53de0ca78765e4c8984bbbe3bd7435b7 payment/cancelled.html,1766671073813,d10648f9ec2c82ac1bb952c653b5702aa6514d2ed6bdc70d89f1148b465c64b4 -components/tier-display.js,1767964029407,4fba2b1df659b78ac4f4b1c5a4e85709b40ded61d994c1b74f121e47abd2bee2 -components/pricing-cards-manager.js,1767965277775,b8778ebf1f923d74d2f8cae4db61fdd2c901684fce9c73d867de55c1fef7c20c -components/pricing-card.js,1767965256740,9577b7f8de4ca043e893113b92a0493661b4a220bde84383041ab8173908c4e2 -components/payment-form.js,1767717450840,d019422c6f764a1242e3461a784d8f548b6b1e6a0fffe4fd8ddc0fb6c8cb8746 -index.html,1767968365276,58e69fba74c8008426ffa74453481be25eb03330614fb47f630369c96e72dc8c +static/js/config.js,1767980752113,78a50979181fc33d8cd9d3632de0ff48baa9472ca57ddd99e309b45e3d9561c7 +static/js/state-manager.js,1767980752115,8bf00a733e6646b892fe978bced4abb894a4326848fd5a9a2e67605871cb96f1 +index.html,1767980752111,5945b061da97b93bf18fc01486c4265928683d6fc26daae91e2103a1c10f85e3 +components/tier-display.js,1767980752111,4844e8c8a28b8d077e4a3ae0630db3a51331f50bd5eca725640eacf255c6a16a +components/pricing-cards-manager.js,1767980752110,8d15f4aae255b00373fde5364f903a6cbea9b1328ba25f2abeed20f7597aebdb +components/pricing-card.js,1767980752109,eaceadf45c187d17f9c5de6ddc1ca16765380366846a3ac6582d98159e3e9772 +components/payment-form.js,1767980752107,cc80e141f6274a3c95a2c4b4d20950716f5a8ed0b87355e17d28554f012c8553 +static/js/script.js,1767980752114,15f8d275cef1da8486c679c71beee85ec1e8ec218c994a550309a41252d81cef +static/css/global.css,1767980752112,aa786d46f4baee33fbdaad9d78389df2f9d2b51e7796f01e9394df16b958eccc +static/js/photo-processor.js,1767980752114,685d7326925eff60de289197aa5b2cbf3370bce891c52bfb59eda973fbf63608 diff --git a/backend/app/api/process_tasks.py b/backend/app/api/process_tasks.py index ca03ef2..9522d3d 100644 --- a/backend/app/api/process_tasks.py +++ b/backend/app/api/process_tasks.py @@ -162,7 +162,7 @@ async def start_processing_with_tasks( asyncio.create_task(process_photos_async_fallback(job_id, photo_ids, current_user.id, debug)) return job - # Group photos into batches of 1 for testing Elite OCR system + # Group photos into batches of 1 for one-call-per-photo processing BATCH_SIZE = 1 photo_batches = [] for i in range(0, len(photo_ids), BATCH_SIZE): @@ -219,52 +219,12 @@ async def start_processing_with_tasks( return job - -# COMMENTED OUT: Force batch-only processing -# @router.post("/worker") -# async def process_single_photo_worker(request: Request): -# """ -# Cloud Tasks Worker Endpoint -# Each request handles exactly ONE photo for maximum reliability. -# """ -# # Create a fresh database session for this specific photo task -# db = SessionLocal() -# try: -# payload = await request.json() -# photo_id = payload.get("photo_id") -# job_id = payload.get("job_id") -# user_id = payload.get("user_id") -# debug_mode = payload.get("debug_mode", False) -# -# if not all([photo_id, job_id, user_id]): -# return {"status": "error", "message": "Missing required payload fields"} -# -# # 1. Run detection (Gemini) -# detection_result = await detector.process_photo( -# photo_id, debug_mode=debug_mode, user_id=user_id -# ) -# -# # 2. Save result to DB (updates status to 'completed' for this photo) -# await save_detection_to_database(photo_id, user_id, detection_result, job_id) -# -# # 3. Update the overall Job Progress (calculates % based on completed photos) -# await update_job_progress(job_id, db) -# -# return {"status": "success", "photo_id": photo_id} -# -# except Exception as e: -# logger.error(f"🔥 Worker failed for photo {photo_id if 'photo_id' in locals() else 'unknown'}: {e}") -# # Raising 500 tells Cloud Tasks to RETRY this specific photo automatically -# raise HTTPException(status_code=500, detail=str(e)) -# finally: -# db.close() - - @router.post("/batch-worker") async def process_batch_photos_worker(request: Request): """ - Optimized Cloud Tasks Batch Worker Endpoint - Each request handles 3 photos in a single Gemini API call for 2-3x better performance. + Cloud Tasks Worker Endpoint for Single Photo Processing + Each request handles 1 photo with individual Gemini API calls for maximum accuracy. + Concurrency controlled by GEMINI_CONCURRENCY environment variable. """ db = SessionLocal() try: @@ -286,6 +246,13 @@ async def process_batch_photos_worker(request: Request): photo_ids, debug_mode=debug_mode, user_id=user_id ) + # DEBUG: Log detailed results info + logger.info(f"🔍 DEBUG WORKER: batch_results type={type(batch_results)}, len={len(batch_results) if batch_results else 'None'}") + if batch_results: + logger.info(f"🔍 DEBUG WORKER: keys={list(batch_results.keys())[:3]}") + for photo_id, result in list(batch_results.items())[:2]: # Log first 2 results + logger.info(f"🔍 DEBUG WORKER: {photo_id[:8]} -> bib='{result.bib_number}', conf={result.confidence}") + logger.info(f"🔄 BATCH WORKER COMPLETE: {batch_index}/{total_batches} received {len(batch_results)} results") if not batch_results: @@ -468,7 +435,10 @@ async def update_job_progress(job_id: str, db: Session): progress = int((completed_photos / total_photos) * 100) # Update in-memory job - job_data = jobs.get(job_id) + job_data = ensure_job_loaded(job_id, db) + if not job_data: + logger.warning(f"❌ Job not found in memory or DB: {job_id[:8]}...") + return if job_data: old_progress = job_data["job"].progress old_status = job_data["job"].status @@ -500,187 +470,161 @@ async def update_job_progress(job_id: str, db: Session): progress=progress, ) logger.info(f"📈 PROGRESS UPDATE: {job_id[:8]}... {progress}% ({completed_photos}/{total_photos})") - else: - logger.warning(f"❌ Job not found in memory during progress update: {job_id[:8]}...") - db.commit() except Exception as e: logger.error(f"Error updating job progress for {job_id}: {e}") -# Keep existing endpoints for compatibility @router.get("/status/{job_id}") -async def get_processing_status(job_id: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): - """Get the current status of a processing job with real-time database check""" +async def get_processing_status( + job_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """Get the current status of a processing job with real-time database check.""" logger.info(f"🔍 STATUS REQUEST: {job_id[:8]}... from user {current_user.id}") - - job_data = jobs.get(job_id) + + # Load from memory or reconstruct from DB + job_data = ensure_job_loaded(job_id, db, current_user.id) if not job_data: - logger.warning(f"❌ Job not found in memory: {job_id[:8]}...") raise HTTPException(status_code=404, detail="Job not found") - - # SECURITY: Verify job belongs to current user + + # Security: verify ownership if job_data["user_id"] != current_user.id: logger.warning(f"❌ Access denied to job {job_id[:8]}... for user {current_user.id}") raise HTTPException(status_code=403, detail="Access denied to job") - - # Log current in-memory state + current_status = job_data["job"].status current_progress = job_data["job"].progress logger.info(f"📊 CURRENT STATE: {job_id[:8]}... status={current_status}, progress={current_progress}") - - # Real-time check: Update job status from database if still processing + + # If still processing, refresh from DB if current_status == ProcessingStatus.PROCESSING: logger.info(f"🔄 Job still processing, checking database for updates: {job_id[:8]}...") try: - # Force update job progress from database await update_job_progress(job_id, db) - - # Check if status changed after update new_status = job_data["job"].status new_progress = job_data["job"].progress if new_status != current_status or new_progress != current_progress: logger.info(f"📈 STATUS CHANGE: {job_id[:8]}... {current_status}→{new_status}, {current_progress}→{new_progress}") else: logger.info(f"📊 NO CHANGE: {job_id[:8]}... still {current_status} at {current_progress}%") - - # Timeout protection: If job has been processing for more than 10 minutes, mark as failed + + # Timeout protection (optional) from datetime import datetime, timedelta - if hasattr(job_data["job"], 'created_at'): + if hasattr(job_data["job"], "created_at"): time_elapsed = datetime.utcnow() - job_data["job"].created_at if time_elapsed > timedelta(minutes=10): logger.warning(f"⏰ TIMEOUT: {job_id[:8]}... after {time_elapsed.total_seconds():.1f}s") job_data["job"].status = ProcessingStatus.FAILED job_data["job"].progress = 0 - except Exception as e: logger.error(f"❌ Failed to update job progress for {job_id[:8]}...: {e}") else: logger.info(f"✅ Job already completed: {job_id[:8]}... status={current_status}") - - # Ensure proper JSON serialization by converting to dict if it's a Pydantic model + + # Serialize response job_response = job_data["job"] - if hasattr(job_response, 'dict'): + if hasattr(job_response, "dict"): job_response = job_response.dict() - - final_status = job_response.get('status', 'unknown') - final_progress = job_response.get('progress', 0) + + final_status = job_response.get("status", "unknown") + final_progress = job_response.get("progress", 0) logger.info(f"📤 RESPONSE: {job_id[:8]}... returning status={final_status}, progress={final_progress}") - + return job_response -@router.get("/results/{job_id}") -async def get_processing_results(job_id: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)): - """Get the results of a completed processing job""" - try: - # Check if job exists and belongs to user - job_data = jobs.get(job_id) - if not job_data: - raise HTTPException(status_code=404, detail="Job not found") - - # Verify job belongs to current user - if job_data["user_id"] != current_user.id: - raise HTTPException(status_code=403, detail="Access denied to job") - - # Get all photos for this job from the database - from app.models.processing import PhotoDB - from app.models.usage import ProcessingJob as ProcessingJobDB - - # Get the INTEGER primary key for this job_id - processing_job_record = db.query(ProcessingJobDB).filter( +@router.get("/results/{job_id}") +async def get_processing_results( + job_id: str, + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """Get the results of a completed processing job.""" + # Load from memory or reconstruct from DB + job_data = ensure_job_loaded(job_id, db, current_user.id) + if not job_data: + raise HTTPException(status_code=404, detail="Job not found") + + # Verify ownership + if job_data["user_id"] != current_user.id: + raise HTTPException(status_code=403, detail="Access denied to job") + + # Get all photos for this job from the database + processing_job_record = ( + db.query(ProcessingJobDB) + .filter( ProcessingJobDB.job_id == job_id, - ProcessingJobDB.user_id == current_user.id - ).first() - - if not processing_job_record: - logger.warning(f"Processing job not found: {job_id}") - return {"unknown": []} - - processing_job_pk = processing_job_record.id - - photos = db.query(PhotoDB).filter( + ProcessingJobDB.user_id == current_user.id, + ) + .first() + ) + if not processing_job_record: + logger.warning(f"Processing job not found: {job_id}") + return {"unknown": []} + + processing_job_pk = processing_job_record.id + photos = ( + db.query(PhotoDB) + .filter( PhotoDB.processing_job_id == processing_job_pk, - PhotoDB.user_id == current_user.id - ).all() - - if not photos: - logger.warning(f"No photos found for job {job_id}, user {current_user.id}") - return {"unknown": []} - - # Group photos by bib number - grouped_photos = {} - - for photo in photos: - # Get effective bib number (manual label takes precedence) - bib_number = photo.manual_label or photo.detected_number or 'unknown' - - # Initialize group if not exists - if bib_number not in grouped_photos: - grouped_photos[bib_number] = [] - - # Frontend will generate image URL using getImageUrl() method with JWT token - photo_data = { - "id": photo.photo_id, # Frontend uses this with getImageUrl() for secure access - "filename": photo.original_filename, - "detected_number": photo.detected_number, - "manual_label": photo.manual_label, - "confidence": photo.confidence, - "detection_method": photo.detection_method, - "file_size_mb": round(photo.file_size_bytes / (1024 * 1024), 2) if photo.file_size_bytes else 0, - "processing_status": photo.processing_status.value if photo.processing_status else "pending", - "created_at": photo.created_at.isoformat() if photo.created_at else None, - "processed_at": photo.processed_at.isoformat() if photo.processed_at else None + PhotoDB.user_id == current_user.id, + ) + .all() + ) + if not photos: + logger.warning(f"No photos found for job {job_id}, user {current_user.id}") + return {"unknown": []} + + # Group photos by bib number + grouped_photos: Dict[str, List[Dict[str, Any]]] = {} + for photo in photos: + bib_number = photo.manual_label or photo.detected_number or "unknown" + grouped_photos.setdefault(bib_number, []) + photo_data = { + "id": photo.photo_id, + "filename": photo.original_filename, + "detected_number": photo.detected_number, + "manual_label": photo.manual_label, + "confidence": photo.confidence, + "detection_method": photo.detection_method, + "file_size_mb": round(photo.file_size_bytes / (1024 * 1024), 2) + if photo.file_size_bytes + else 0, + "processing_status": photo.processing_status.value + if photo.processing_status + else "pending", + "created_at": photo.created_at.isoformat() if photo.created_at else None, + "processed_at": photo.processed_at.isoformat() if photo.processed_at else None, + } + if photo.bbox_x is not None and photo.bbox_y is not None: + photo_data["bbox"] = { + "x": photo.bbox_x, + "y": photo.bbox_y, + "width": photo.bbox_width, + "height": photo.bbox_height, } - - # Add bounding box if available - if photo.bbox_x is not None and photo.bbox_y is not None: - photo_data["bbox"] = { - "x": photo.bbox_x, - "y": photo.bbox_y, - "width": photo.bbox_width, - "height": photo.bbox_height - } - - grouped_photos[bib_number].append(photo_data) - - # Sort groups: numbered bibs first (sorted numerically), then unknown - sorted_grouped = {} - - # Add numbered bibs first, sorted numerically - numbered_bibs = [] - for bib in grouped_photos.keys(): - if bib != 'unknown': - try: - # Try to convert to int for proper numeric sorting - numbered_bibs.append((int(bib), bib)) - except ValueError: - # Non-numeric bib, add as string - numbered_bibs.append((float('inf'), bib)) - - # Sort by numeric value, then by string - numbered_bibs.sort(key=lambda x: (x[0], x[1])) - - # Add sorted numbered groups - for _, bib in numbered_bibs: - sorted_grouped[bib] = grouped_photos[bib] - - # Add unknown group last - if 'unknown' in grouped_photos: - sorted_grouped['unknown'] = grouped_photos['unknown'] - - logger.info(f"✅ Retrieved {len(photos)} photos in {len(sorted_grouped)} groups for job {job_id}") - - return sorted_grouped - - except HTTPException: - # Re-raise HTTP exceptions - raise - except Exception as e: - logger.error(f"❌ Failed to get results for job {job_id}: {e}") - raise HTTPException(status_code=500, detail=f"Failed to retrieve results: {str(e)}") - + grouped_photos[bib_number].append(photo_data) + + # Sort numbered bibs first, then unknown + sorted_grouped: Dict[str, List[Dict[str, Any]]] = {} + numbered_bibs = [] + for bib in grouped_photos: + if bib != "unknown": + try: + numbered_bibs.append((int(bib), bib)) + except ValueError: + numbered_bibs.append((float("inf"), bib)) + numbered_bibs.sort(key=lambda x: (x[0], x[1])) + for _, bib in numbered_bibs: + sorted_grouped[bib] = grouped_photos[bib] + if "unknown" in grouped_photos: + sorted_grouped["unknown"] = grouped_photos["unknown"] + + logger.info(f"✅ Retrieved {len(photos)} photos in {len(sorted_grouped)} groups for job {job_id}") + return sorted_grouped async def process_photos_async_fallback(job_id: str, photo_ids: List[str], user_id: int, debug_mode: bool): """ @@ -755,24 +699,93 @@ def sync_jobs_from_database(): """Load active jobs from DB into memory on startup.""" db = SessionLocal() try: + from app.models.processing import PhotoDB, ProcessingStatus as DBProcessingStatus + active_jobs = db.query(ProcessingJobDB).filter( ProcessingJobDB.status.in_(["pending", "processing"]) ).all() + + status_map = { + "completed": ProcessingStatus.COMPLETED, + "failed": ProcessingStatus.FAILED, + "processing": ProcessingStatus.PROCESSING, + "pending": ProcessingStatus.PENDING, + } + for db_job in active_jobs: - # Reconstruct the ProcessingJob object + completed_photos = db.query(PhotoDB).filter( + PhotoDB.processing_job_id == db_job.id, + PhotoDB.processing_status == DBProcessingStatus.COMPLETED + ).count() + + # Recompute progress if needed + total_photos = db_job.total_photos or 0 + progress = 0 + if total_photos: + progress = int((completed_photos / total_photos) * 100) + + current_status = status_map.get(db_job.status, ProcessingStatus.PROCESSING) + if progress >= 100 and current_status == ProcessingStatus.PROCESSING: + current_status = ProcessingStatus.COMPLETED + job = ProcessingJob( job_id=db_job.job_id, - photo_ids=[], # We don't necessarily need the IDs for status tracking - status=ProcessingStatus(db_job.status), - total_photos=db_job.total_photos, - progress=db_job.progress + photo_ids=[], # not needed for status tracking + status=current_status, + total_photos=total_photos, + progress=progress, + completed_photos=completed_photos, ) jobs[db_job.job_id] = {"job": job, "user_id": db_job.user_id} + logger.info(f"🔄 Synced {len(active_jobs)} active jobs from database") finally: db.close() def cleanup_old_jobs(): """Optional: Clear very old jobs from memory.""" - # (Implementation can be simple or empty for now) - pass \ No newline at end of file + pass + +def ensure_job_loaded(job_id: str, db: Session, user_id: int | None = None): + job_data = jobs.get(job_id) + if job_data: + return job_data + from app.models.usage import ProcessingJob as ProcessingJobDB + from app.models.processing import PhotoDB, ProcessingStatus as DBProcessingStatus + + q = db.query(ProcessingJobDB).filter(ProcessingJobDB.job_id == job_id) + if user_id: + q = q.filter(ProcessingJobDB.user_id == user_id) + db_job = q.first() + if not db_job: + return None + + total_photos = db_job.total_photos or 0 + completed = db.query(PhotoDB).filter( + PhotoDB.processing_job_id == db_job.id, + PhotoDB.processing_status == DBProcessingStatus.COMPLETED, + ).count() + progress = int((completed / total_photos) * 100) if total_photos else 0 + + status_map = { + "completed": ProcessingStatus.COMPLETED, + "failed": ProcessingStatus.FAILED, + "processing": ProcessingStatus.PROCESSING, + "pending": ProcessingStatus.PENDING, + } + current_status = status_map.get(db_job.status, ProcessingStatus.PROCESSING) + if progress >= 100 and current_status == ProcessingStatus.PROCESSING: + current_status = ProcessingStatus.COMPLETED + + job = ProcessingJob( + job_id=db_job.job_id, + photo_ids=[], + status=current_status, + total_photos=total_photos, + progress=progress, + completed_photos=completed, + ) + job_data = {"job": job, "user_id": db_job.user_id} + jobs[job_id] = job_data + return job_data + \ No newline at end of file diff --git a/backend/app/services/detector.py b/backend/app/services/detector.py index 99e168c..3870cdf 100644 --- a/backend/app/services/detector.py +++ b/backend/app/services/detector.py @@ -27,6 +27,7 @@ def __init__(self): self.results: Dict[str, DetectionResult] = {} self.gemini_client = None self.use_gemini = None # Will be determined on first use + self._file_cache: Dict[str, str] = {} # Cache: photo_id -> local_file_path def _initialize_gemini_client(self): """Initialize Gemini client lazily when first needed""" @@ -56,9 +57,9 @@ async def process_photo_batch( self, photo_ids: List[str], debug_mode: bool = False, user_id: Optional[int] = None ) -> Dict[str, DetectionResult]: """ - Sequential Processing for Maximum Accuracy (Batch Size 1). - Processes photos individually to eliminate 'context bleeding' and digit count forcing. - Fixes the '648 vs 7' hallucination issue by focusing on digit integrity over forced counting. + Concurrent Processing for Speed with Bounded Gemini Calls. + Processes photos in parallel but with individual Gemini calls for maximum accuracy. + Uses semaphore to limit concurrent API calls and prevent rate limiting. """ if not photo_ids: return {} @@ -68,35 +69,47 @@ async def process_photo_batch( if not self.use_gemini: logger.error("Cannot process batch: Gemini is not configured.") - return {photo_id: DetectionResult(bib_number="unknown", confidence=0.0, bbox=None) + return {photo_id: DetectionResult(bib_number="unknown", bbox=None) for photo_id in photo_ids} - results = {} - logger.info(f"🔄 SEQUENTIAL PROCESSING: Starting {len(photo_ids)} photos individually") + # Tunable concurrency limit via environment variable + import asyncio + concurrency = int(os.getenv("GEMINI_CONCURRENCY", "2")) # Default: 2 for balanced throughput + semaphore = asyncio.Semaphore(concurrency) - # Process each photo INDIVIDUALLY for absolute focus - for i, photo_id in enumerate(photo_ids): - photo_start_time = time.time() - - try: - logger.info(f"📸 [{i+1}/{len(photo_ids)}] Processing {photo_id[:8]}...") - - photo_path = self._find_photo_path(photo_id, user_id) - if not photo_path: - logger.warning(f"❌ [{photo_id[:8]}] Photo file not found") - results[photo_id] = DetectionResult(bib_number="unknown", confidence=0.0, bbox=None) - continue - - # Load 3072px optimized data - image_data, img_shape = self._optimize_image_for_gemini(photo_path, debug_mode) - if not image_data: - logger.warning(f"❌ [{photo_id[:8]}] Failed to optimize image") - results[photo_id] = DetectionResult(bib_number="unknown", confidence=0.0, bbox=None) - continue + logger.info(f"🚀 CONCURRENT PROCESSING: Starting {len(photo_ids)} photos with concurrency={concurrency}") + + results = {} + + async def process_single_photo(idx: int, photo_id: str) -> None: + """Process a single photo with semaphore-controlled concurrency.""" + async with semaphore: + photo_start_time = time.time() - # REFINED PROMPT: Focus on Digit Integrity over Count - # This fixes the "648 vs 7" issue by allowing the model to reject background graphics - single_prompt = """Act as an elite sports photography OCR specialist. + try: + logger.info(f"📸 [{idx+1}/{len(photo_ids)}] Processing {photo_id[:8]}...") + + # T1: File path resolution and potential GCS download + t1_start = time.time() + photo_path = self._find_photo_path(photo_id, user_id) + t1_duration = (time.time() - t1_start) * 1000 # Convert to milliseconds + + if not photo_path: + logger.warning(f"❌ [{photo_id[:8]}] Photo file not found") + results[photo_id] = DetectionResult(bib_number="unknown", bbox=None) + return + + # T2: Image optimization and preprocessing + t2_start = time.time() + image_data, img_shape = self._optimize_image_for_gemini(photo_path, debug_mode) + t2_duration = (time.time() - t2_start) * 1000 + if not image_data: + logger.warning(f"❌ [{photo_id[:8]}] Failed to optimize image") + results[photo_id] = DetectionResult(bib_number="unknown", bbox=None) + return + + # REFINED PROMPT: Focus on Digit Integrity over Count + single_prompt = """Act as an elite sports photography OCR specialist. Extract the race number from the handlebar plate or bib. Rules for 99.9% Accuracy: @@ -110,171 +123,216 @@ async def process_photo_batch( "plate_description": "description of plate color/position", "digit_count": int, "number": "string", - "confidence": "high/medium/low", "reasoning": "Explain why shapes were identified as digits or rejected as graphics" }""" - content = [ - types.Part.from_bytes(data=image_data, mime_type="image/jpeg"), - single_prompt - ] - - # Sequential API Call - logger.info(f"🤖 [{photo_id[:8]}] Calling Gemini with Digit Integrity prompt...") - response = await self.gemini_client.aio.models.generate_content( - model="gemini-2.0-flash", - contents=content, - config=types.GenerateContentConfig(response_mime_type="application/json") - ) - - if not response or not response.text: - logger.error(f"❌ [{photo_id[:8]}] Empty Gemini response") - results[photo_id] = DetectionResult(bib_number="unknown", confidence=0.0, bbox=None) - continue - - # Enhanced JSON parsing with error handling - try: - clean_text = response.text.strip() - if clean_text.startswith("```json"): - clean_text = clean_text.split("```json")[1].split("```")[0].strip() - elif clean_text.startswith("```"): - clean_text = clean_text.split("```")[1].split("```")[0].strip() - - res_json = json.loads(clean_text) - - # Handle both array and object responses from Gemini - if isinstance(res_json, list): - if len(res_json) == 0: - logger.error(f"❌ EMPTY ARRAY [{photo_id[:8]}]: Gemini returned empty array") - results[photo_id] = DetectionResult(bib_number="unknown", confidence=0.0, bbox=None) - continue - # Take the first element from array - res_json = res_json[0] - logger.debug(f"🔧 ARRAY HANDLING [{photo_id[:8]}]: Extracted first element from array response") - elif not isinstance(res_json, dict): - logger.error(f"❌ INVALID FORMAT [{photo_id[:8]}]: Expected dict or list, got {type(res_json)} - {clean_text[:100]}...") - results[photo_id] = DetectionResult(bib_number="unknown", confidence=0.0, bbox=None) - continue - - # Extract and validate fields - detected_bib = str(res_json.get("number", "")).strip() - plate_description = res_json.get("plate_description", "No description") - digit_count = res_json.get("digit_count", 0) - confidence_text = res_json.get("confidence", "low") - reasoning = res_json.get("reasoning", "No reasoning provided") - - # Enhanced logging with reasoning - logger.info(f"🔍 PLATE [{photo_id[:8]}] ({confidence_text}): {plate_description}") - logger.info(f"🔢 REASONING [{photo_id[:8]}]: {reasoning}") - logger.info(f"📊 DIGITS [{photo_id[:8]}]: Detected '{detected_bib}' ({digit_count} digit{'s' if digit_count != 1 else ''})") - - # Validate detected bib number - if not detected_bib or detected_bib.upper() in ["NONE", "NULL", "UNKNOWN", ""]: - logger.info(f"❌ EMPTY [{photo_id[:8]}]: No number detected - {reasoning}") - results[photo_id] = DetectionResult(bib_number="unknown", confidence=0.0, bbox=None) - elif not self._is_valid_bib_number(detected_bib): - logger.info(f"❌ INVALID [{photo_id[:8]}]: '{detected_bib}' failed validation - {reasoning}") - results[photo_id] = DetectionResult(bib_number="unknown", confidence=0.0, bbox=None) - else: - # Convert text confidence to numeric - confidence_map = {"high": 0.95, "medium": 0.75, "low": 0.5} - numeric_confidence = confidence_map.get(confidence_text.lower(), 0.5) + content = [ + types.Part.from_bytes(data=image_data, mime_type="image/jpeg"), + single_prompt + ] + + # T3: Gemini API call + t3_start = time.time() + logger.info(f"🤖 [{photo_id[:8]}] Calling Gemini with Digit Integrity prompt...") + response = await self.gemini_client.aio.models.generate_content( + model="gemini-2.0-flash", + contents=content, + config=types.GenerateContentConfig(response_mime_type="application/json") + ) + t3_duration = (time.time() - t3_start) * 1000 + + if not response or not response.text: + logger.error(f"❌ [{photo_id[:8]}] Empty Gemini response") + results[photo_id] = DetectionResult(bib_number="unknown", bbox=None) + return + + # T4: JSON parsing and postprocessing + t4_start = time.time() + try: + clean_text = response.text.strip() + if clean_text.startswith("```json"): + clean_text = clean_text.split("```json")[1].split("```")[0].strip() + elif clean_text.startswith("```"): + clean_text = clean_text.split("```")[1].split("```")[0].strip() - # Create center-focused bounding box - bbox = [ - int(img_shape[1] * 0.25), int(img_shape[0] * 0.3), - int(img_shape[1] * 0.75), int(img_shape[0] * 0.7) - ] + res_json = json.loads(clean_text) - photo_time = time.time() - photo_start_time - logger.info(f"✅ SUCCESS [{photo_id[:8]}] ({confidence_text}): '{detected_bib}' in {photo_time:.2f}s") + # Handle both array and object responses from Gemini + if isinstance(res_json, list): + if len(res_json) == 0: + logger.error(f"❌ EMPTY ARRAY [{photo_id[:8]}]: Gemini returned empty array") + results[photo_id] = DetectionResult(bib_number="unknown", bbox=None) + return + # Take the first element from array + res_json = res_json[0] + logger.debug(f"🔧 ARRAY HANDLING [{photo_id[:8]}]: Extracted first element from array response") + elif not isinstance(res_json, dict): + logger.error(f"❌ INVALID FORMAT [{photo_id[:8]}]: Expected dict or list, got {type(res_json)} - {clean_text[:100]}...") + results[photo_id] = DetectionResult(bib_number="unknown", bbox=None) + return + + # Extract and validate fields + detected_bib = str(res_json.get("number", "")).strip() + plate_description = res_json.get("plate_description", "No description") + digit_count = res_json.get("digit_count", 0) + reasoning = res_json.get("reasoning", "No reasoning provided") + + # Enhanced logging with reasoning + logger.info(f"🔍 PLATE [{photo_id[:8]}]: {plate_description}") + logger.info(f"🔢 REASONING [{photo_id[:8]}]: {reasoning}") + logger.info(f"📊 DIGITS [{photo_id[:8]}]: Detected '{detected_bib}' ({digit_count} digit{'s' if digit_count != 1 else ''})") - results[photo_id] = DetectionResult( - bib_number=detected_bib, - confidence=numeric_confidence, - bbox=bbox - ) - self.results[photo_id] = results[photo_id] # Store in cache - - except json.JSONDecodeError as e: - logger.error(f"❌ JSON ERROR [{photo_id[:8]}]: {e} - Response: {response.text[:200]}...") - results[photo_id] = DetectionResult(bib_number="unknown", confidence=0.0, bbox=None) - except KeyError as e: - logger.error(f"❌ MISSING FIELD [{photo_id[:8]}]: {e} - Response: {response.text[:200]}...") - results[photo_id] = DetectionResult(bib_number="unknown", confidence=0.0, bbox=None) - - except Exception as e: - logger.error(f"❌ PROCESSING ERROR [{photo_id[:8]}]: {e}") - results[photo_id] = DetectionResult(bib_number="unknown", confidence=0.0, bbox=None) + # Validate detected bib number + if not detected_bib or detected_bib.upper() in ["NONE", "NULL", "UNKNOWN", ""]: + logger.info(f"❌ EMPTY [{photo_id[:8]}]: No number detected - {reasoning}") + results[photo_id] = DetectionResult(bib_number="unknown", bbox=None) + elif not self._is_valid_bib_number(detected_bib): + logger.info(f"❌ INVALID [{photo_id[:8]}]: '{detected_bib}' failed validation - {reasoning}") + results[photo_id] = DetectionResult(bib_number="unknown", bbox=None) + + + # Create center-focused bounding box + bbox = [ + int(img_shape[1] * 0.25), int(img_shape[0] * 0.3), + int(img_shape[1] * 0.75), int(img_shape[0] * 0.7) + ] + + results[photo_id] = DetectionResult( + bib_number=detected_bib, + bbox=bbox + ) + self.results[photo_id] = results[photo_id] # Store in cache + + # Complete T4 timing + t4_duration = (time.time() - t4_start) * 1000 + + # Log comprehensive timing breakdown + total_duration = t1_duration + t2_duration + t3_duration + t4_duration + logger.info(f"⏱️ TIMING [{photo_id[:8]}]: T1={t1_duration:.0f}ms T2={t2_duration:.0f}ms T3={t3_duration:.0f}ms T4={t4_duration:.0f}ms TOTAL={total_duration:.0f}ms") + + photo_time = time.time() - photo_start_time + if detected_bib and detected_bib not in ["unknown", "error"]: + logger.info(f"✅ SUCCESS [{photo_id[:8]}]: '{detected_bib}' in {photo_time:.2f}s") + else: + logger.info(f"❌ NO DETECTION [{photo_id[:8]}] in {photo_time:.2f}s") + + except json.JSONDecodeError as e: + t4_duration = (time.time() - t4_start) * 1000 + total_duration = t1_duration + t2_duration + t3_duration + t4_duration + logger.error(f"❌ JSON ERROR [{photo_id[:8]}]: {e} - Response: {response.text[:200]}...") + logger.info(f"⏱️ TIMING [{photo_id[:8]}]: T1={t1_duration:.0f}ms T2={t2_duration:.0f}ms T3={t3_duration:.0f}ms T4={t4_duration:.0f}ms TOTAL={total_duration:.0f}ms") + results[photo_id] = DetectionResult(bib_number="unknown", bbox=None) + except KeyError as e: + t4_duration = (time.time() - t4_start) * 1000 + total_duration = t1_duration + t2_duration + t3_duration + t4_duration + logger.error(f"❌ MISSING FIELD [{photo_id[:8]}]: {e} - Response: {response.text[:200]}...") + logger.info(f"⏱️ TIMING [{photo_id[:8]}]: T1={t1_duration:.0f}ms T2={t2_duration:.0f}ms T3={t3_duration:.0f}ms T4={t4_duration:.0f}ms TOTAL={total_duration:.0f}ms") + results[photo_id] = DetectionResult(bib_number="unknown", bbox=None) + + except Exception as e: + # Handle timing for general errors (calculate what we can) + try: + t4_duration = (time.time() - t4_start) * 1000 if 't4_start' in locals() else 0 + total_duration = t1_duration + t2_duration + t3_duration + t4_duration + logger.info(f"⏱️ TIMING [{photo_id[:8]}]: T1={t1_duration:.0f}ms T2={t2_duration:.0f}ms T3={t3_duration:.0f}ms T4={t4_duration:.0f}ms TOTAL={total_duration:.0f}ms") + except: + pass # Don't let timing errors mask the original error + + logger.error(f"❌ PROCESSING ERROR [{photo_id[:8]}]: {e}") + results[photo_id] = DetectionResult(bib_number="unknown", bbox=None) + + # Process all photos concurrently with bounded parallelism + await asyncio.gather(*(process_single_photo(i, photo_id) for i, photo_id in enumerate(photo_ids))) # Final summary total_time = time.time() - batch_start_time successful_count = len([r for r in results.values() if r.bib_number not in ["unknown", "error"]]) success_rate = (successful_count / len(photo_ids)) * 100 if photo_ids else 0 - logger.info(f"🎯 SEQUENTIAL COMPLETE: {successful_count}/{len(photo_ids)} detected ({success_rate:.1f}% success) in {total_time:.2f}s") + # DEBUG: Log detailed results before return + logger.info(f"🔍 DEBUG DETECTOR: results type={type(results)}, len={len(results)}") + if results: + logger.info(f"🔍 DEBUG DETECTOR: keys={list(results.keys())[:3]}") + for photo_id, result in list(results.items())[:2]: # Log first 2 results + logger.info(f"🔍 DEBUG DETECTOR: {photo_id[:8]} -> bib='{result.bib_number}', conf={getattr(result, 'confidence', 'NO_CONF')}") + + logger.info(f"🎯 CONCURRENT COMPLETE: {successful_count}/{len(photo_ids)} detected ({success_rate:.1f}% success) in {total_time:.2f}s") + logger.info(f"📁 CACHE STATUS: {len(self._file_cache)} files cached in memory") return results - def _resize_image(self, image_bytes: bytes, max_size: int = 1536) -> bytes: - """ - Resizes image in memory with OCR-optimized settings. - Higher quality and resolution for better text recognition. - """ + + def _optimize_image_for_gemini( + self, image_path: str, debug_mode: bool = False + ) -> Tuple[bytes, Tuple[int, int]]: + """Optimize image for Gemini with configurable compression for faster processing""" try: - with Image.open(io.BytesIO(image_bytes)) as img: - # Convert to RGB to handle PNGs/CMYK + # Environment-controlled optimization settings + max_dim = int(os.getenv("GEMINI_MAX_DIM", "1536")) # Default: 1536px max dimension + jpeg_quality = int(os.getenv("GEMINI_JPEG_QUALITY", "90")) # Default: 90% quality + enable_compression = os.getenv("GEMINI_ENABLE_COMPRESSION", "true").lower() == "true" + + with Image.open(image_path) as img: + # Convert to RGB for JPEG compatibility if img.mode != 'RGB': img = img.convert('RGB') original_size = img.size + original_width, original_height = original_size - # Crop bottom 5% to remove watermarks while preserving handlebar plates + if not enable_compression: + # Raw mode for maximum accuracy (A/B testing) + with open(image_path, "rb") as f: + original_data = f.read() + if debug_mode: + logger.info(f"📷 RAW MODE: {original_width}x{original_height} ({len(original_data)/1024:.0f}KB) - NO COMPRESSION") + return original_data, (original_height, original_width) + + # OCR-optimized preprocessing width, height = img.size + + # Crop bottom 5% to remove watermarks while preserving handlebar plates crop_height = int(height * 0.95) # Keep top 95% img = img.crop((0, 0, width, crop_height)) - # Only resize if larger than max_size - if max(img.size) > max_size: - img.thumbnail((max_size, max_size), Image.Resampling.LANCZOS) + # Smart resize: only if larger than max_dim + new_size = img.size + if max(img.size) > max_dim: + # Calculate new dimensions maintaining aspect ratio + if width > height: + new_width = max_dim + new_height = int((height * max_dim) / width) + else: + new_height = max_dim + new_width = int((width * max_dim) / height) - if logger.isEnabledFor(logging.DEBUG): - new_size = img.size - reduction = ((original_size[0] * original_size[1]) - (new_size[0] * new_size[1])) / (original_size[0] * original_size[1]) * 100 - logger.debug(f"🏎️ OCR Resize: {original_size} → {new_size} ({reduction:.0f}% smaller, watermark cropped)") + img = img.resize((new_width, new_height), Image.Resampling.LANCZOS) + new_size = (new_width, new_height) - # OCR-optimized compression settings + # High-quality JPEG compression optimized for OCR buffer = io.BytesIO() - img.save(buffer, format="JPEG", quality=95, optimize=False) - return buffer.getvalue() + img.save( + buffer, + format="JPEG", + quality=jpeg_quality, + optimize=True, # Better compression + subsampling=0 # No chroma subsampling for text clarity + ) + optimized_data = buffer.getvalue() - except Exception as e: - logger.warning(f"⚠️ PIL resize failed: {e}, using original") - return image_bytes # Fallback to original - - def _optimize_image_for_gemini( - self, image_path: str, debug_mode: bool = False - ) -> Tuple[bytes, Tuple[int, int]]: - """Send raw images to Gemini for maximum OCR accuracy""" - try: - # Read raw image data - NO COMPRESSION - with open(image_path, "rb") as f: - original_data = f.read() - - # Get original dimensions for bbox calculations (PIL returns width, height) - with Image.open(image_path) as img: - original_width, original_height = img.size - - # COMPRESSION DISABLED: Return raw image data for maximum OCR accuracy - if debug_mode: - logger.info(f"📷 RAW IMAGE: {original_width}x{original_height} ({len(original_data)/1024:.0f}KB) - NO COMPRESSION") + if debug_mode: + original_kb = len(open(image_path, "rb").read()) / 1024 + optimized_kb = len(optimized_data) / 1024 + compression_ratio = (1 - optimized_kb / original_kb) * 100 + logger.info(f"📷 OPTIMIZED: {original_size}→{new_size} | {original_kb:.0f}KB→{optimized_kb:.0f}KB ({compression_ratio:.0f}% smaller) | Q={jpeg_quality}") + + # Return optimized data with original dimensions for bbox calculations + return optimized_data, (original_height, original_width) - return original_data, (original_height, original_width) - except Exception as e: - logger.error(f"PIL optimization failed: {e}, using original") + logger.error(f"Image optimization failed: {e}, using original") # Fallback to original image with open(image_path, "rb") as f: original_data = f.read() @@ -303,7 +361,7 @@ def _find_photo_path( self, photo_id: str, user_id: Optional[int] = None ) -> Optional[str]: """ - Find photo path with strict user isolation. + Find photo path with strict user isolation and caching. Supports both local storage and Google Cloud Storage. SECURITY: Never falls back to shared directories - only user-specific paths. """ @@ -316,11 +374,25 @@ def _find_photo_path( ) return None + # Check cache first + if photo_id in self._file_cache: + cached_path = self._file_cache[photo_id] + if os.path.exists(cached_path): + logger.debug(f"📁 CACHE HIT: {photo_id[:8]} -> {os.path.basename(cached_path)}") + return cached_path + else: + # Remove stale cache entry + del self._file_cache[photo_id] + logger.debug(f"📁 CACHE MISS: {photo_id[:8]} (file deleted)") + # First try local storage user_upload_dir = os.path.join("uploads", str(user_id)) for ext in extensions: local_path = os.path.join(user_upload_dir, f"{photo_id}{ext}") if os.path.exists(local_path): + # Cache the successful local path + self._file_cache[photo_id] = local_path + logger.debug(f"📁 LOCAL FOUND: {photo_id[:8]} -> {os.path.basename(local_path)}") return local_path # If not found locally, try to download from GCS @@ -342,7 +414,10 @@ def _find_photo_path( os.makedirs(user_upload_dir, exist_ok=True) local_path = os.path.join(user_upload_dir, filename) blob.download_to_filename(local_path) - logger.info(f"Downloaded {photo_id} from GCS to local storage") + + # Cache the successful download + self._file_cache[photo_id] = local_path + logger.info(f"📥 GCS DOWNLOAD: {photo_id[:8]} -> {filename} (cached)") return local_path except Exception as e: logger.debug(f"Could not download from GCS: {e}") @@ -395,14 +470,12 @@ def update_manual_label(self, photo_id: str, bib_number: str) -> bool: # For unknown labels, keep them as unknown manual_result = DetectionResult( bib_number="unknown", - confidence=0.0, # Unknown labels get 0% confidence bbox=None, ) else: # Create a manual detection result with high confidence for valid bib numbers manual_result = DetectionResult( bib_number=bib_number, - confidence=1.0, # Manual labels get 100% confidence bbox=None, # No bounding box for manual labels ) diff --git a/backend/database.py b/backend/database.py index 7f24aa4..bad3dc7 100644 --- a/backend/database.py +++ b/backend/database.py @@ -11,11 +11,11 @@ # Configure engine based on database type if DATABASE_URL.startswith("postgresql://"): - # PostgreSQL configuration (production) + # PostgreSQL configuration (production) - Optimized for moderate concurrency engine = create_engine( DATABASE_URL, - pool_size=20, # Handle concurrent image processing requests - max_overflow=30, # Handle burst loads during batch uploads + pool_size=3, # Match max Cloud Tasks concurrent dispatches + max_overflow=0, # No overflow - strict limit to prevent exhaustion pool_timeout=30, # Connection timeout pool_recycle=1800, # Recycle connections every 30 minutes )