diff --git a/.env.example b/.env.example index 63da271..4842182 100644 --- a/.env.example +++ b/.env.example @@ -16,6 +16,10 @@ FEEDBACK_QUEUE=plagiarism_feedback # Dead Letter Queue (optional - leave empty to disable) DEAD_LETTER_QUEUE=plagiarism_failed_submissions +#GCP CONFIGURATION +GCP_ENABLED=true +GCP_KEY_PATH=/app/credentials/gcp_service_account.json + # POSTGRESQL CONFIGURATION POSTGRES_HOST=postgres POSTGRES_PORT=5432 diff --git a/.gitignore b/.gitignore index 3e1e6d7..cdbb287 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,8 @@ + # Credentials + app/credentials/* + +scratch* + # Python __pycache__/ *.py[cod] @@ -14,6 +19,7 @@ build/ # Environment variables .env .env.local +.env.prod # Logs logs/ diff --git a/Dockerfile b/Dockerfile index 5426394..37475e8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,6 +5,7 @@ WORKDIR /app # Install build dependencies in a single layer RUN apt-get update && apt-get install -y --no-install-recommends \ build-essential \ + vim \ gcc \ g++ \ git \ @@ -18,10 +19,11 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ # Copy only requirements first for better caching COPY requirements.txt . -# Use pip cache and install in parallel -RUN --mount=type=cache,target=/root/.cache/pip \ - python -m pip install --upgrade pip setuptools wheel && \ - pip install -r requirements.txt --user --no-warn-script-location +# Install to explicit location +RUN python -m pip install --no-cache-dir --prefix=/install --upgrade pip setuptools wheel && \ + pip install --no-cache-dir --no-deps --prefix=/install -r requirements.txt + +RUN pip install --prefix=/install -r requirements.txt --no-cache-dir # ============================================ # Final stage - minimal runtime image @@ -37,18 +39,15 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ && rm -rf /var/lib/apt/lists/* # Copy installed packages from builder -COPY --from=builder /root/.local /root/.local - -ENV PATH=/root/.local/bin:$PATH +COPY --from=builder /install /usr/local # Create necessary directories RUN mkdir -p /app/data /app/logs /root/.cache/clip +RUN ls + # Copy application code (do this last for better caching) COPY . . 
-# Lightweight healthcheck -HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \ - CMD python -c "import sys; sys.exit(0)" || exit 1 -CMD ["python", "app.py"] +CMD ["python", "app.py"] \ No newline at end of file diff --git a/Dockerfile.api b/Dockerfile.api index 01fcc66..d0b5c03 100644 --- a/Dockerfile.api +++ b/Dockerfile.api @@ -1,4 +1,4 @@ -FROM python:3.13-slim +FROM python:3.13-slim as builder WORKDIR /app @@ -7,9 +7,18 @@ RUN apt-get update && apt-get install -y --no-install-recommends curl && rm -rf # Copy requirements and install dependencies COPY api/requirements.txt /app/api/requirements.txt -RUN --mount=type=cache,target=/root/.cache/pip \ - python -m pip install --upgrade pip && \ - pip install -r /app/api/requirements.txt +RUN python -m pip install --prefix=/install -r --upgrade pip && \ + pip install --prefix=/install -r /app/api/requirements.txt --no-cache-dir + +# ============================================ +# Final stage - minimal runtime image +# ============================================ +FROM python:3.13-slim + +WORKDIR /app + +# Copy installed packages from builder +COPY --from=builder /install /usr/local # Copy application code COPY api/ /app/api/ @@ -18,9 +27,6 @@ COPY utils/ /app/utils/ # Expose API port EXPOSE 8000 -# Healthcheck (check if uvicorn is responding) -HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \ - CMD curl -f http://localhost:8000/ || exit 1 # Run the API CMD ["uvicorn", "api.api:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/api/api.py b/api/api.py index 0d42d81..35e8d50 100644 --- a/api/api.py +++ b/api/api.py @@ -8,31 +8,49 @@ from fastapi import FastAPI, HTTPException, status from fastapi.responses import JSONResponse from pydantic import BaseModel, Field, validator -import aio_pika -import json -import uuid - -from dotenv import load_dotenv -import os - -sys.path.insert(0, str(Path(__file__).parent.parent)) -from utils.security import safe_hash_student_id - -# load_dotenv() - -# Configure logging -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" -) -logger = logging.getLogger(__name__) - -# RabbitMQ Configuration -RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost") +import aio_pika +import json +import uuid +from dotenv import load_dotenv +import os + +sys.path.insert(0, str(Path(__file__).parent.parent)) +from utils.security import safe_hash_student_id + +load_dotenv() + +DEFAULT_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + + +def get_log_format() -> str: + log_format = os.getenv("LOG_FORMAT", DEFAULT_LOG_FORMAT) + if log_format.lower() == "json": + return DEFAULT_LOG_FORMAT + return log_format + + +# Configure logging +logging.basicConfig( + level=getattr(logging, os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO), + format=get_log_format(), +) +logger = logging.getLogger(__name__) + +# RabbitMQ Configuration +RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost") RABBITMQ_PORT = os.getenv("RABBITMQ_PORT", "5672") RABBITMQ_VHOST = os.getenv("RABBITMQ_VHOST", "/") RABBITMQ_USER = os.getenv("RABBITMQ_USER", "admin") RABBITMQ_PASS = os.getenv("RABBITMQ_PASS", "admin123") +#PRINT THE RABBITMQ CONFIG FOR DEBUGGING +# logger.info("###################") +# logger.info(f"RABBITMQ_HOST={RABBITMQ_HOST}") +# logger.info(f"RABBITMQ_PORT={RABBITMQ_PORT}") +# logger.info(f"RABBITMQ_VHOST={RABBITMQ_VHOST}") +# logger.info(f"RABBITMQ_USER={RABBITMQ_USER}") +# logger.info("###################") + 
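Both api.py and app.py now read their logging setup from the environment. A minimal, self-contained sketch of how those variables are interpreted — unrecognised LOG_LEVEL values fall back to INFO, and LOG_FORMAT=json currently falls back to the plain default format rather than emitting structured JSON (the env values below are made-up examples):

```python
import logging
import os

# Hypothetical values for illustration; in the services these come from .env
os.environ.setdefault("LOG_LEVEL", "debug")   # case-insensitive
os.environ.setdefault("LOG_FORMAT", "json")   # JSON output is not implemented yet

DEFAULT_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

log_format = os.getenv("LOG_FORMAT", DEFAULT_LOG_FORMAT)
if log_format.lower() == "json":
    log_format = DEFAULT_LOG_FORMAT  # fall back to the plain format

logging.basicConfig(
    # getattr() returns logging.INFO when the level name is unknown
    level=getattr(logging, os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO),
    format=log_format,
)
logging.getLogger(__name__).debug("logging configured from environment")
```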
SUBMISSION_QUEUE = os.getenv("SUBMISSION_QUEUE", "plagiarism_submissions") FEEDBACK_QUEUE = os.getenv("FEEDBACK_QUEUE", "plagiarism_feedback") @@ -53,15 +71,42 @@ class SubmissionRequest(BaseModel): """Request model for submission creation""" student_id: str = Field(..., description="Student identifier", min_length=1) - image_url: str = Field(..., description="URL of the submitted image", min_length=1) + submission_type: str = Field(..., description="Type of submission: text, audio, video, or image") + submission_url: Optional[str] = Field( + None, description="URL of the submitted resource (required for image submissions)" + ) + submission_text: Optional[str] = Field( + None, description="Text content of the submission (required for text submissions)" + ) + submitted_at: Optional[str] = Field( + None, + description="Original submission timestamp in ISO format. If provided, this value is preserved in the processed result.", + ) assignment_id: Optional[str] = Field(None, description="Assignment identifier") - @validator("image_url") - def validate_url(cls, v): - """Validate that image_url is not empty""" - if not v or not v.strip(): - raise ValueError("image_url cannot be empty") - return v.strip() + @validator("submission_type") + def validate_submission_type(cls, v): + """Validate that submission_type is one of the supported values""" + allowed = {"text", "audio", "video", "image"} + if v not in allowed: + raise ValueError(f"submission_type must be one of {sorted(allowed)}") + return v + + @validator("submission_url", always=True) + def validate_submission_url(cls, v, values): + if values.get("submission_type") == "image": + if not v or not v.strip(): + raise ValueError("submission_url is required for image submissions") + return v.strip() + return v + + @validator("submission_text", always=True) + def validate_submission_text(cls, v, values): + if values.get("submission_type") == "text": + if not v or not v.strip(): + raise ValueError("submission_text is required for text submissions") + return v.strip() + return v @validator("student_id") def validate_student_id(cls, v): @@ -199,13 +244,13 @@ async def send_to_rabbitmq(message: dict, queue_name: str): ) async def create_submission(request: SubmissionRequest): """ - Submit an image for plagiarism detection + Submit a new user submission for plagiarism detection - Creates a new plagiarism check submission by sending the image URL - and metadata to the processing queue. + Creates a new plagiarism check submission by sending submission metadata + to the processing queue. 
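The reworked SubmissionRequest above accepts typed submissions. A short sketch of how its validators behave, assuming the model is importable as api.api (the student, assignment, and URL values are invented): image submissions must carry submission_url, text submissions must carry submission_text, and anything else is rejected.

```python
from pydantic import ValidationError
from api.api import SubmissionRequest  # assumed import path

# Image submission: submission_url is mandatory
image_sub = SubmissionRequest(
    student_id="stu-42",
    submission_type="image",
    submission_url="https://storage.googleapis.com/example-bucket/homework.png",
    assignment_id="ASSIGN-7",
)

# Text submission: submission_text is mandatory, submission_url may be omitted
text_sub = SubmissionRequest(
    student_id="stu-42",
    submission_type="text",
    submission_text="A short essay body.",
)

# Missing submission_url for an image submission fails validation
try:
    SubmissionRequest(student_id="stu-42", submission_type="image")
except ValidationError as exc:
    print(exc)  # reports that submission_url is required for image submissions
```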
Args: - request: Submission request containing student_id, image_url, and optional assignment_id + request: Submission request containing student_id, submission_type, submission_url, and optional assignment_id Returns: SubmissionResponse with submission details and unique ID @@ -237,9 +282,11 @@ async def create_submission(request: SubmissionRequest): payload = { "student_id": hashed_student_id, "submission_id": submission_id, - "img_url": request.image_url, + "submission_type": request.submission_type, + "submission_url": request.submission_url, + "submission_text": request.submission_text, + "submitted_at": request.submitted_at, "assign_id": assignment_id, - "submitted_at": datetime.datetime.utcnow().isoformat(), } # Send to RabbitMQ diff --git a/app.py b/app.py index b54e76d..4b9cb7d 100644 --- a/app.py +++ b/app.py @@ -1,20 +1,32 @@ import asyncio import logging -import signal -import sys -import os -from mq.rmq_client import RabbitMQClient -from plag_checker.submissions_checker import SubmissionChecker -from dotenv import load_dotenv - -__version__ = "1.0.0" - -# Configure logging -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" -) -logger = logging.getLogger(__name__) -load_dotenv() +import signal +import sys +import os +from dotenv import load_dotenv +from mq.rmq_client import RabbitMQClient +from plag_checker.submissions_checker import SubmissionChecker + +__version__ = "1.0.0" + +load_dotenv() + +DEFAULT_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + + +def get_log_format() -> str: + log_format = os.getenv("LOG_FORMAT", DEFAULT_LOG_FORMAT) + if log_format.lower() == "json": + return DEFAULT_LOG_FORMAT + return log_format + + +# Configure logging +logging.basicConfig( + level=getattr(logging, os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO), + format=get_log_format(), +) +logger = logging.getLogger(__name__) def validate_configuration(): @@ -30,6 +42,10 @@ def validate_configuration(): ] missing = [var for var in required_env_vars if not os.getenv(var)] + #print the required env vars and their values for debugging + # for var in required_env_vars: + # logger.info("###################") + # logger.info(f"{var}={os.getenv(var)}") if missing: logger.error(f"Missing required environment variables: {missing}") raise ValueError(f"Missing required environment variables: {missing}") diff --git a/config/config.py b/config/config.py index a9d9572..a805c93 100644 --- a/config/config.py +++ b/config/config.py @@ -93,7 +93,7 @@ class DetectionConfig(BaseSettings): exact_dup_threshold: float = Field(default=0.95, env="EXACT_DUPLICATE_THRESHOLD") near_dup_threshold: float = Field(default=0.90, env="NEAR_DUPLICATE_THRESHOLD") - semantic_threshold: float = Field(default=0.80, env="SEMANTIC_MATCH_THRESHOLD") + semantic_threshold: float = Field(default=0.70, env="SEMANTIC_MATCH_THRESHOLD") # Hash matching thresholds (Hamming distance, 0-64 bits) hash_threshold: int = Field(default=8, env="HASH_MATCH_THRESHOLD") @@ -215,6 +215,19 @@ class Config: case_sensitive = False +class GCPConfig(BaseSettings): + """GCP Cloud Storage configuration for authenticated image downloads.""" + + gcp_enabled: bool = Field(default=True, env="GCP_ENABLED") + gcp_key_path: str = Field( + default="", env="GCP_KEY_PATH", description="Path to GCP service account JSON key file" + ) + + class Config: + env_file = ".env" + case_sensitive = False + + class LoggingConfig(BaseSettings): """Logging configuration.""" @@ -239,6 +252,7 @@ class 
AppConfig(BaseSettings): detection: DetectionConfig = DetectionConfig() vector_search: VectorSearchConfig = VectorSearchConfig() image_processing: ImageProcessingConfig = ImageProcessingConfig() + gcp: GCPConfig = GCPConfig() logging: LoggingConfig = LoggingConfig() app_name: str = Field(default="MentorMe Plagiarism Detection", env="APP_NAME") diff --git a/database/db_manager.py b/database/db_manager.py index 9263f3d..a962f8c 100644 --- a/database/db_manager.py +++ b/database/db_manager.py @@ -27,7 +27,7 @@ class DatabaseManager: await db.init_pool() try: # Use database operations - await db.insert_submission_if_not_exists(data, image_url) + await db.insert_submission_if_not_exists(data, submission_url) finally: await db.close() """ @@ -67,6 +67,9 @@ async def init_pool(self): db_name = os.getenv("POSTGRES_DB") or os.getenv("DB_NAME") db_host = os.getenv("POSTGRES_HOST") or os.getenv("DB_HOST", "localhost") db_port = int(os.getenv("POSTGRES_PORT") or os.getenv("DB_PORT", "5432")) + # db_port = 5435 # TEMP OVERRIDE FOR TESTING + + if not all([db_user, db_password, db_name]): raise ValueError("Missing required database environment variables") @@ -162,7 +165,7 @@ def _normalize_vector(self, embedding_list: Optional[Any]) -> str: return str(normalized) async def insert_submission_if_not_exists( - self, submission_data: dict, image_url: str, status: int + self, submission_data: dict, submission_url: Optional[str], status: int ): """ Insert a new submission if it doesn't already exist. @@ -170,8 +173,8 @@ async def insert_submission_if_not_exists( Uses INSERT ... ON CONFLICT for atomic upsert operation to prevent race conditions. Args: - submission_data: Dict containing submission_id, student_id, assign_id, img_url - image_url: URL of the submitted image + submission_data: Dict containing submission_id, student_id, assign_id, submission_url, submission_type, submission_text + submission_url: URL of the submitted content status: Initial submission status Returns: @@ -190,8 +193,8 @@ async def insert_submission_if_not_exists( try: insert_sql = """ - INSERT INTO submissions (submission_id, student_id, assign_id, image_url, status) - VALUES ($1, $2, $3, $4, $5) + INSERT INTO submissions (submission_id, student_id, assign_id, submission_url, submission_type, submission_text, status) + VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (submission_id) DO NOTHING RETURNING id; """ @@ -203,7 +206,9 @@ async def insert_submission_if_not_exists( submission_id, submission_data.get("student_id"), submission_data.get("assign_id"), - submission_data.get("img_url") or image_url, + submission_data.get("submission_url") or submission_url, + submission_data.get("submission_type"), + submission_data.get("submission_text"), status, ) @@ -248,8 +253,10 @@ async def update_result(self, submission_id: str, result: dict, status_val: int) await self._execute( """ UPDATE submissions - SET result = $1, status = $2, updated_at = NOW() - WHERE submission_id = $3; + SET result = $1, + status = $2, + updated_at = NOW() + WHERE submission_id = $3; """, json.dumps(result), status_val, @@ -268,13 +275,13 @@ async def update_status( message_value: str, ): """ - Update submission status. - - Valid status values: RECEIVED, PROCESSING, PROCESSED, PENDING_FEEDBACK_PUSH, COMPLETED, FAILED + Update submission status and retry metadata. 
Args: submission_id: Unique submission identifier - status_value: New status value + status_value: New status code + retry_count_value: Retry counter value + message_value: Human-readable status message Raises: asyncpg.PostgresError: If update fails @@ -287,13 +294,15 @@ async def update_status( result = await self._execute( """ UPDATE submissions - SET status = $1, message = $2, retry_count = $3, updated_at = $4 - WHERE submission_id = $5; + SET status = $1, + retry_count = $2, + message = $3, + updated_at = NOW() + WHERE submission_id = $4; """, status_value, - message_value, retry_count_value, - datetime.utcnow(), + message_value, submission_id, ) if result == "UPDATE 0": @@ -511,10 +520,10 @@ async def fetch_reference_images_by_id(self, reference_id): raise RuntimeError("Database pool not initialized") try: - image_path = await self._fetch( + image_path = await self._fetchval( """ SELECT image_path - FROM reference_images where id = $1; + FROM reference_images where reference_id = $1; """, reference_id, ) @@ -565,7 +574,7 @@ async def fetch_peer_submissions( # This prevents flagging the original submitter as a plagiarist records = await self._fetch( """ - SELECT id, student_id, assign_id,image_url, phash, dhash, ahash, created_at + SELECT id, student_id, assign_id, submission_url, phash, dhash, ahash, created_at FROM submissions WHERE student_id != $1 AND created_at > $2 @@ -580,7 +589,7 @@ async def fetch_peer_submissions( else: records = await self._fetch( """ - SELECT id, student_id, assign_id,image_url, phash, dhash, ahash, created_at + SELECT id, student_id, assign_id, submission_url, phash, dhash, ahash, created_at FROM submissions WHERE student_id != $1 AND phash IS NOT NULL @@ -612,7 +621,7 @@ async def fetch_self_submissions(self, current_student_id: str): try: records = await self._fetch( """ - SELECT id, created_at, phash, dhash, ahash, assign_id, image_url + SELECT id, created_at, phash, dhash, ahash, assign_id, submission_url FROM submissions WHERE student_id = $1 AND phash IS NOT NULL @@ -692,11 +701,11 @@ async def pgvector_search_peer_submissions( vec = self._normalize_vector(embedding_list) results = await self._fetch( """ - SELECT + SELECT submission_id, student_id, assign_id, - image_url, + submission_url, created_at, (clip_embedding <#> $1::vector) * -1 as similarity FROM submissions @@ -741,11 +750,11 @@ async def pgvector_search_self_submissions( vec = self._normalize_vector(embedding_list) results = await self._fetch( """ - SELECT + SELECT submission_id, student_id, assign_id, - image_url, + submission_url, created_at, (clip_embedding <#> $1::vector) * -1 as similarity FROM submissions diff --git a/database/init.sql b/database/init.sql index 0448456..55997a5 100644 --- a/database/init.sql +++ b/database/init.sql @@ -1,7 +1,5 @@ -- UUID support CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; - --- pgvector extension for vector similarity search (only needed if USE_PGVECTOR=true) CREATE EXTENSION IF NOT EXISTS vector; -- Drop existing tables if doing a clean restore (commented by default) @@ -16,7 +14,9 @@ CREATE TABLE IF NOT EXISTS submissions ( submission_id VARCHAR(100) UNIQUE NOT NULL, assign_id VARCHAR(200), student_id VARCHAR(64) NOT NULL, -- SHA-256 hashed - image_url TEXT, + submission_url TEXT, + submission_type VARCHAR(20), + submission_text TEXT, status integer DEFAULT 0, retry_count integer DEFAULT 0, result jsonb, @@ -72,12 +72,12 @@ USING hnsw (clip_embedding vector_ip_ops); -- Reference images corpus CREATE TABLE IF NOT EXISTS reference_images ( id UUID 
PRIMARY KEY DEFAULT uuid_generate_v4(), - reference_id VARCHAR(100) UNIQUE NOT NULL, + reference_id VARCHAR(200) UNIQUE NOT NULL, -- will be used as assignment id when fetching references from assignments image_path TEXT NOT NULL, phash VARCHAR(64) NOT NULL, dhash VARCHAR(64) NOT NULL, ahash VARCHAR(64) NOT NULL, - category VARCHAR(100), + category VARCHAR(200), description TEXT, source VARCHAR(200), faiss_index_position INTEGER, diff --git a/database/migrations/002_rename_and_add_fields.sql b/database/migrations/002_rename_and_add_fields.sql new file mode 100644 index 0000000..ac4e7dd --- /dev/null +++ b/database/migrations/002_rename_and_add_fields.sql @@ -0,0 +1,4 @@ +-- Migration: Rename image_url to submission_url and add submission_type, submission_text +ALTER TABLE submissions RENAME COLUMN image_url TO submission_url; +ALTER TABLE submissions ADD COLUMN submission_type VARCHAR(20); +ALTER TABLE submissions ADD COLUMN submission_text TEXT; \ No newline at end of file diff --git a/docker-compose-prod.yml b/docker-compose-prod.yml index 89a21cb..0f28bd6 100644 --- a/docker-compose-prod.yml +++ b/docker-compose-prod.yml @@ -1,6 +1,42 @@ version: '3.8' services: + # =================================== + # POSTGRESQL - Database + # =================================== + postgres: + image: pgvector/pgvector:pg16 + container_name: plg-postgres + ports: + - "5432:5432" + environment: + POSTGRES_DB: ${POSTGRES_DB} + POSTGRES_USER: ${POSTGRES_USER} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} + POSTGRES_INITDB_ARGS: "-E UTF8" + POSTGRES_MAX_CONNECTIONS: 20 + PGDATA: /var/lib/postgresql/data/pgdata + volumes: + - postgres_data:/var/lib/postgresql/data + - ./database/init.sql:/docker-entrypoint-initdb.d/init.sql + healthcheck: + test: ["CMD-SHELL", "pg_isready -U $$POSTGRES_USER -d $$POSTGRES_DB"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 40s + deploy: + resources: + limits: + cpus: '0.5' + memory: 512M + reservations: + cpus: '0.25' + memory: 256M + networks: + - plg-network + restart: unless-stopped + # =================================== # PLAGIARISM CHECKER SERVICE # =================================== @@ -8,32 +44,66 @@ services: build: context: . dockerfile: Dockerfile - container_name: mentorme-plagiarism-checker + container_name: plg-checker env_file: - .env volumes: - ./data:/app/data - ./logs:/app/logs depends_on: - rabbitmq: + postgres: condition: service_healthy + deploy: + resources: + limits: + cpus: '8.0' # Increased from 4.0 - allows up to 8 CPU cores + memory: 8G + reservations: + cpus: '2' # Increased from 1 - guarantees 2 cores minimum + memory: 2G + restart: unless-stopped + networks: + - plg-network + + # =================================== + # API SERVICE + # =================================== + api: + build: + context: . 
+ dockerfile: Dockerfile.api + container_name: plg-api + env_file: + - .env + volumes: + - ./data:/app/data + - ./logs:/app/logs + depends_on: postgres: condition: service_healthy deploy: resources: limits: - cpus: '1.0' - memory: 4G + cpus: '8.0' # Increased from 4.0 - allows up to 8 CPU cores + memory: 8G reservations: - cpus: '0.5' + cpus: '2' # Increased from 1 - guarantees 2 cores minimum memory: 2G restart: unless-stopped networks: - - mentorme-plagiarism-network + - plg-network + + +# =================================== +# VOLUMES +# =================================== +volumes: + postgres_data: + driver: local # =================================== # NETWORKS # =================================== networks: - mentorme-plagiarism-network: + plg-network: driver: bridge diff --git a/docker-postgres.yml b/docker-postgres.yml new file mode 100644 index 0000000..28f0cab --- /dev/null +++ b/docker-postgres.yml @@ -0,0 +1,45 @@ +version: '3.8' + +services: + # =================================== + # POSTGRESQL - Database + # =================================== + postgres: + image: docker.io/pgvector/pgvector:pg16 + container_name: plg-postgresdb + ports: + - "5432:5432" + environment: + POSTGRES_DB: plagiarism_db + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_INITDB_ARGS: "-E UTF8" + POSTGRES_MAX_CONNECTIONS: 20 + PGDATA: /var/lib/postgresql/data/pgdata + volumes: + - postgres_data:/var/lib/postgresql/data + - ./database/init.sql:/docker-entrypoint-initdb.d/init.sql + healthcheck: + test: ["CMD-SHELL", "pg_isready -U $$POSTGRES_USER -d $$POSTGRES_DB"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 40s + deploy: + resources: + limits: + cpus: '0.5' + memory: 512M + reservations: + cpus: '0.25' + memory: 256M + restart: always + + +# =================================== +# VOLUMES +# =================================== +volumes: + postgres_data: + driver: local + diff --git a/docs/DOCUMENTATION.md b/docs/DOCUMENTATION.md index 9bae9d4..9747212 100644 --- a/docs/DOCUMENTATION.md +++ b/docs/DOCUMENTATION.md @@ -776,11 +776,11 @@ asyncio.run(test()) #### Build Docker Image ```bash # Standard build (model downloaded on first run) -docker build -t mentorme-plagiarism:latest . +docker build -t plg:latest . # With HuggingFace token for model prefetch during build (optional) # This pre-downloads the CLIP model into the Docker image -docker build -t mentorme-plagiarism:latest \ +docker build -t plg:latest \ --build-arg HUGGINGFACE_HUB_TOKEN=your_token_here . 
# Note: HuggingFace token is optional - public models can be downloaded without authentication @@ -804,7 +804,7 @@ docker-compose down # docker-compose.yml snippet services: worker: - image: mentorme-plagiarism:latest + image: plg:latest environment: - POSTGRES_HOST=postgres - RABBITMQ_HOST=rabbitmq diff --git a/image_worker/assigment_ref_images.py b/image_worker/assigment_ref_images.py new file mode 100644 index 0000000..10617f4 --- /dev/null +++ b/image_worker/assigment_ref_images.py @@ -0,0 +1,347 @@ +import base64 +import io +import os +from PIL import Image +import requests +from typing import List, Dict, Optional +from dotenv import load_dotenv +import asyncpg +from datetime import datetime +import logging + +load_dotenv() + +logger = logging.getLogger(__name__) + +# Environment variables +ASSIGNMENT_CACHE_DAYS = int(os.getenv("ASSIGNMENT_CACHE_DAYS", "2")) +ENABLE_CACHE = os.getenv("ENABLE_CACHE", "true").lower() == "true" +PURGE_CACHE = os.getenv("PURGE_CACHE", "false").lower() == "true" + +# Database configuration +DB_CONFIG = { + "host": os.getenv("POSTGRES_HOST", "localhost"), + "port": int(os.getenv("POSTGRES_PORT", 5432)), + "database": os.getenv("POSTGRES_DB", "plagiarism_db"), + "user": os.getenv("POSTGRES_USER", "postgres"), + "password": os.getenv("POSTGRES_PASSWORD", "postgres"), +} + + +async def get_db_connection(): + """Create async database connection.""" + conn_string = ( + f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}" + f"@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}" + ) + return await asyncpg.connect(conn_string) + + +async def cleanup_cache(): + """Delete cached assignment reference images older than ASSIGNMENT_CACHE_DAYS.""" + try: + conn = await get_db_connection() + try: + # cutoff_date = datetime.utcnow() - timedelta(days=ASSIGNMENT_CACHE_DAYS) + + # Delete old assignment caches (reference_ids starting with "ASSIGN-") + result = await conn.execute( + """ + DELETE FROM reference_images + WHERE reference_id LIKE 'ASSIGN-%' + """ + ) + + deleted_count = int(result.split()[-1]) if result else 0 + if deleted_count > 0: + logger.info(f"Cleaned up {deleted_count} cached assignment reference images") + + finally: + await conn.close() + + except Exception as e: + logger.error(f"Cache cleanup failed: {e}") + + +async def get_cached_assignment(assignment_id: str) -> Optional[List[Dict]]: + """ + Retrieve cached reference images for an assignment from database. 
+ + Args: + assignment_id: Assignment identifier + + Returns: + List of reference image dicts with precomputed hashes (no embeddings), or None if not cached + """ + try: + conn = await get_db_connection() + try: + # Query for cached images with this assignment_id (no clip_embedding in SELECT) + rows = await conn.fetch( + """ + SELECT reference_id, image_path, phash, dhash, ahash, created_at + FROM reference_images + WHERE reference_id LIKE $1 + ORDER BY reference_id + """, + f"ASSIGN-{assignment_id}-%" + ) + + if not rows: + logger.info(f"No cached reference images found for assignment: {assignment_id}") + return None + + # Check if cache is still valid + oldest_created = min(row['created_at'] for row in rows) + age_days = (datetime.utcnow() - oldest_created).days + + if age_days > ASSIGNMENT_CACHE_DAYS: + logger.info(f"Cache expired for assignment {assignment_id} (age: {age_days} days)") + return None + + logger.info(f"Retrieved {len(rows)} cached reference images for assignment: {assignment_id}") + + # Return just the hashes and name (no embeddings) + images = [] + for row in rows: + images.append({ + "name": row['reference_id'] + row['image_path'], # Image name stored in image_path + "phash": row['phash'], + "dhash": row['dhash'], + "ahash": row['ahash'] + }) + + return images + + finally: + await conn.close() + + except Exception as e: + logger.error(f"Failed to retrieve cached assignment: {e}") + return None + + +async def save_to_cache(assignment_id: str, images: List[Dict]): + """ + Save assignment reference images (hashes + embeddings) to database cache. + + Args: + assignment_id: Assignment identifier + images: List of image dicts with "name", "phash", "dhash", "ahash", and optionally "embedding" + """ + try: + conn = await get_db_connection() + try: + cached_count = 0 + + for idx, img_data in enumerate(images): + try: + # Create unique reference_id for this assignment's reference image + reference_id = f"ASSIGN-{assignment_id}-{idx:03d}" + image_name = img_data.get("name", f"ref_{idx}") + + # Check if hashes are precomputed + if "phash" not in img_data or "dhash" not in img_data or "ahash" not in img_data: + logger.warning(f"Hashes not precomputed for image {idx}, skipping cache") + continue + + # Extract embedding if present + embedding = img_data.get("embedding") + + if embedding is not None: + # Convert numpy array to pgvector format + embedding_str = '[' + ','.join(map(str, embedding.tolist())) + ']' + clip_generated = True + else: + embedding_str = None + clip_generated = False + + # Insert into database with embedding + await conn.execute( + """ + INSERT INTO reference_images + (reference_id, image_path, phash, dhash, ahash, category, description, source, + clip_embedding_generated, clip_embedding) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10::vector) + ON CONFLICT (reference_id) DO UPDATE SET + image_path = EXCLUDED.image_path, + phash = EXCLUDED.phash, + dhash = EXCLUDED.dhash, + ahash = EXCLUDED.ahash, + clip_embedding_generated = EXCLUDED.clip_embedding_generated, + clip_embedding = EXCLUDED.clip_embedding, + updated_at = NOW() + """, + reference_id, + image_name, + img_data["phash"], + img_data["dhash"], + img_data["ahash"], + "assignment_cache", + f"Reference image from {image_name}", + f"assignment_{assignment_id}", + clip_generated, + embedding_str + ) + + cached_count += 1 + + except Exception as img_err: + logger.error(f"Failed to cache image {idx} for assignment {assignment_id}: {img_err}") + continue + + logger.info(f"Cached {cached_count}/{len(images)} 
reference images for assignment: {assignment_id}") + + finally: + await conn.close() + + except Exception as e: + logger.error(f"Failed to save to cache: {e}") + + +async def get_reference_images( + assignment_id: str, + clip_handler=None, + hash_handler=None +) -> Optional[List[Dict]]: + """ + Fetch reference images for an assignment from TAP LMS API with caching support. + + Args: + assignment_id: Assignment identifier + clip_handler: CLIPHandler instance from worker (optional) + hash_handler: HashHandler instance from worker (optional) + + Returns: + List of reference image dictionaries with precomputed hashes (no embeddings in return) + """ + try: + # Cleanup old cache if enabled + if PURGE_CACHE: + await cleanup_cache() + + # Check cache first if enabled + if ENABLE_CACHE: + cached_images = await get_cached_assignment(assignment_id) + if cached_images is not None: + logger.info(f"Using cached reference images for assignment: {assignment_id}") + return cached_images + + # Fetch from API + logger.info(f"Fetching reference images from API for assignment: {assignment_id}") + images = await fetch_from_api(assignment_id, clip_handler, hash_handler) + + # Save to cache if enabled (embeddings will be saved to DB but not returned) + if ENABLE_CACHE and images: + await save_to_cache(assignment_id, images) + + # Remove embeddings from return object + if images: + for img in images: + img.pop("embedding", None) + + return images + + except Exception as e: + logger.error(f"Error fetching reference images: {e}") + return None + + +async def fetch_from_api( + assignment_id: str, + clip_handler=None, + hash_handler=None +) -> Optional[List[Dict]]: + """ + Fetch reference images for an assignment from TAP LMS API and compute hashes + embeddings. + + Args: + assignment_id: Assignment identifier + clip_handler: CLIPHandler instance from worker (optional) + hash_handler: HashHandler instance from worker (optional) + + Returns: + List of reference image dictionaries with precomputed hashes and embeddings + """ + api_key = os.getenv("FRAPPE_API_KEY") + api_secret = os.getenv("FRAPPE_API_SECRET") + base_url = os.getenv("FRAPPE_API_BASE_URL") + + if not all([api_key, api_secret, base_url]): + logger.error("Missing API configuration: FRAPPE_API_KEY, FRAPPE_API_SECRET, or FRAPPE_API_BASE_URL") + return None + + if hash_handler is None: + logger.error("No hash_handler provided, cannot compute hashes") + return None + + assignment_context_endpoint = "api/method/tap_lms.imgana.submission.get_assignment_context" + + headers = { + "Content-Type": "application/json", + "Authorization": f"token {api_key}:{api_secret}" + } + + api_url = f"{base_url.rstrip('/')}/{assignment_context_endpoint.lstrip('/')}" + + try: + # Use synchronous requests in async context (consider aiohttp for true async) + response = requests.post( + api_url, + headers=headers, + json={"assignment_id": assignment_id}, + timeout=30 + ) + + response.raise_for_status() + data = response.json() + + reference_images = data.get("message", {}).get("assignment", {}).get("reference_images", []) + + if not reference_images: + logger.warning(f"No reference images found for assignment: {assignment_id}") + return [] + + # Process images: decode, compute hashes and embeddings, then discard PIL objects + processed_images = [] + for image in reference_images: + try: + decoded_bytes = base64.b64decode(image["content"]) + image_obj = Image.open(io.BytesIO(decoded_bytes)) + + # Compute hashes using passed handler + hashes = hash_handler.compute_hashes(image_obj) + + 
# Generate CLIP embedding if handler provided + embedding = None + if clip_handler is not None: + try: + embedding = clip_handler.generate_embedding(image_obj) + except Exception as embed_err: + logger.error(f"Failed to generate embedding for {image.get('name', 'unknown')}: {embed_err}") + + # Close PIL Image - we don't need it anymore + image_obj.close() + + # Store hashes and embedding (embedding will be saved to DB but removed before return) + processed_images.append({ + "name": image.get("name", f"ref_{len(processed_images)}"), + "phash": hashes["phash"], + "dhash": hashes["dhash"], + "ahash": hashes["ahash"], + "embedding": embedding # Temporary, for save_to_cache + }) + + except Exception as img_err: + logger.error(f"Failed to process image {image.get('name', 'unknown')}: {img_err}") + continue + + logger.info(f"Fetched and processed {len(processed_images)} reference images from API for assignment: {assignment_id}") + return processed_images + + except requests.exceptions.RequestException as e: + logger.error(f"API request failed for assignment {assignment_id}: {e}") + return None + except Exception as e: + logger.error(f"Failed to fetch from API: {e}") + return None \ No newline at end of file diff --git a/image_worker/gcs_client.py b/image_worker/gcs_client.py new file mode 100644 index 0000000..29bda27 --- /dev/null +++ b/image_worker/gcs_client.py @@ -0,0 +1,261 @@ +""" +GCP Cloud Storage (GCS) client helper functions for authenticated image downloads. + +Provides async-compatible methods to download images from GCS buckets using +service account authentication via JSON key files. +""" + +import logging +import asyncio +import json +from io import BytesIO +from pathlib import Path +from typing import Optional +from urllib.parse import urlparse +from PIL import Image +from google.cloud import storage +from google.oauth2 import service_account +from google.api_core.exceptions import NotFound + +logger = logging.getLogger(__name__) + + +def resolve_gcp_key_path(key_path: str) -> Path: + """Resolve the configured GCP key path for container and local runs.""" + configured_path = Path(key_path).expanduser() + + if configured_path.exists(): + return configured_path + + project_root = Path(__file__).resolve().parent.parent + + if configured_path.is_absolute(): + try: + local_path = project_root / "app" / configured_path.relative_to("/app") + except ValueError: + local_path = configured_path + else: + local_path = project_root / configured_path + + return local_path + + +def load_gcp_credentials(key_path: str) -> Optional[service_account.Credentials]: + """ + Load GCP service account credentials from JSON key file. 
+ + Args: + key_path: Path to the GCP service account JSON key file + + Returns: + service_account.Credentials object, or None if loading fails + + Raises: + FileNotFoundError: If key file does not exist + ValueError: If key file is invalid JSON or missing required fields + """ + try: + key_file = resolve_gcp_key_path(key_path) + if not key_file.exists(): + raise FileNotFoundError(f"GCP key file not found: {key_file}") + + with open(key_file, "r") as f: + key_data = json.load(f) + + credentials = service_account.Credentials.from_service_account_info(key_data) + logger.debug(f"GCP credentials loaded successfully from: {key_file}") + return credentials + + except FileNotFoundError as e: + logger.error(f"GCP key file error: {e}") + raise + except json.JSONDecodeError as e: + logger.error(f"Invalid JSON in GCP key file: {e}") + raise ValueError(f"Invalid JSON in GCP key file: {e}") + except Exception as e: + logger.error(f"Failed to load GCP credentials: {e}") + raise + + +def parse_gcs_url(gcs_url: str) -> tuple[str, str]: + """ + Parse a GCS URL into bucket name and blob path. + + Args: + gcs_url: GCS URL in format gs://bucket-name/path/to/object + + Returns: + Tuple of (bucket_name, blob_path) + + Raises: + ValueError: If URL is not a valid GCS URL + """ + if gcs_url.startswith("gs://"): + path = gcs_url[5:] + elif gcs_url.startswith("https://storage.googleapis.com/"): + # support public and authenticated GCS object URLs + parsed = urlparse(gcs_url) + path = parsed.path.lstrip("/") + elif gcs_url.startswith("http://storage.googleapis.com/"): + parsed = urlparse(gcs_url) + path = parsed.path.lstrip("/") + else: + raise ValueError(f"Invalid GCS URL: {gcs_url}. Must start with 'gs://' or 'https://storage.googleapis.com/'") + + # Split bucket name and blob path + parts = path.split("/", 1) + if len(parts) < 2: + raise ValueError(f"Invalid GCS URL: {gcs_url}. Must include bucket and blob path") + + bucket_name = parts[0] + blob_path = parts[1] + + if not bucket_name or not blob_path: + raise ValueError(f"Invalid GCS URL: {gcs_url}. Bucket and blob path cannot be empty") + + return bucket_name, blob_path + + +def create_gcs_client(credentials: service_account.Credentials) -> storage.Client: + """ + Create a GCP Storage client with provided credentials. + + Args: + credentials: GCP service account credentials + + Returns: + google.cloud.storage.Client instance + + Raises: + Exception: If client creation fails + """ + try: + client = storage.Client(credentials=credentials) + logger.debug("GCS client created successfully") + return client + except Exception as e: + logger.error(f"Failed to create GCS client: {e}") + raise + + +async def download_from_gcs( + gcs_url: str, + credentials: service_account.Credentials, + timeout: int = 30, +) -> Image.Image: + """ + Download an image from GCS bucket asynchronously. 
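A small usage sketch for the URL helpers above (bucket and object names are hypothetical). parse_gcs_url accepts both gs:// and storage.googleapis.com forms and returns the bucket/blob pair without touching the network:

```python
from image_worker.gcs_client import parse_gcs_url  # assumed import path

bucket, blob = parse_gcs_url("gs://student-uploads/2024/sub-123.png")
assert (bucket, blob) == ("student-uploads", "2024/sub-123.png")

bucket, blob = parse_gcs_url(
    "https://storage.googleapis.com/student-uploads/2024/sub-123.png"
)
assert (bucket, blob) == ("student-uploads", "2024/sub-123.png")

# A URL without an object path is rejected
try:
    parse_gcs_url("gs://student-uploads")
except ValueError as exc:
    print(exc)  # must include bucket and blob path
```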
+ + Args: + gcs_url: GCS URL in format gs://bucket-name/path/to/image + credentials: GCP service account credentials + timeout: Download timeout in seconds + + Returns: + PIL Image object + + Raises: + ValueError: If URL is invalid or credentials missing + FileNotFoundError: If bucket or blob not found + Exception: For other GCS operation failures + """ + try: + bucket_name, blob_path = parse_gcs_url(gcs_url) + logger.debug( + f"Downloading from GCS: bucket={bucket_name}, blob={blob_path}, timeout={timeout}s" + ) + + # Run blocking GCS operations in executor to avoid blocking event loop + loop = asyncio.get_event_loop() + image = await loop.run_in_executor( + None, + _download_gcs_blob_sync, + gcs_url, + credentials, + bucket_name, + blob_path, + timeout, + ) + + logger.info(f"Successfully downloaded image from GCS: {gcs_url}") + return image + + except Exception as e: + logger.error(f"GCS download failed: url={gcs_url}, error={e}", exc_info=True) + raise + + +def _download_gcs_blob_sync( + gcs_url: str, + credentials: service_account.Credentials, + bucket_name: str, + blob_path: str, + timeout: int, +) -> Image.Image: + """ + Synchronous helper to download blob from GCS (runs in executor). + + Args: + gcs_url: Full GCS URL for logging + credentials: GCP service account credentials + bucket_name: Name of the GCS bucket + blob_path: Path to the blob within the bucket + timeout: Download timeout in seconds + + Returns: + PIL Image object + + Raises: + FileNotFoundError: If bucket or blob not found + Exception: For other GCS operation failures + """ + try: + client = create_gcs_client(credentials) + bucket = client.bucket(bucket_name) + blob = bucket.blob(blob_path) + + try: + blob_content = blob.download_as_bytes(timeout=timeout) + except NotFound: + raise FileNotFoundError( + f"GCS blob not found: gs://{bucket_name}/{blob_path}" + ) + + # Parse image + try: + image = Image.open(BytesIO(blob_content)) + logger.debug( + f"Image parsed successfully: format={image.format}, size={image.size}" + ) + return image + except Exception as img_error: + raise ValueError( + f"Invalid or corrupted image format from GCS: {str(img_error)}" + ) + + except FileNotFoundError: + raise + except Exception as e: + logger.error( + f"GCS blob download error: bucket={bucket_name}, blob={blob_path}, error={e}" + ) + raise + + +def is_gcs_url(url: str) -> bool: + """ + Check if a URL is a GCS URL. 
+ + Args: + url: URL to check + + Returns: + True if URL is a GCS URL using 'gs://' or 'storage.googleapis.com', False otherwise + """ + if not url: + return False + return ( + url.startswith("gs://") + or url.startswith("https://storage.googleapis.com/") + or url.startswith("http://storage.googleapis.com/") + ) diff --git a/image_worker/image_validator.py b/image_worker/image_validator.py index 43a7b25..e4d1666 100644 --- a/image_worker/image_validator.py +++ b/image_worker/image_validator.py @@ -53,6 +53,26 @@ class ImageValidator: "vecteezy", ] + + IMAGE_EXTENSIONS = { + "jpg", + "jpeg", + "png", + "gif", + "webp", + "bmp", + "tiff", + "heic", + } + VIDEO_EXTENSIONS = { + "mp4", + "mov", + "webm", + "mkv", + "avi", + "mpeg", + "mpg", + } def __init__( self, min_variance_threshold: float = 5.0, @@ -71,6 +91,24 @@ def __init__( self.min_unique_colors = min_unique_colors self.max_solid_color_ratio = max_solid_color_ratio + + def detect_media_type(self, submission_url: str) -> str: + """Detect media type based on URL/extension.""" + if not submission_url: + return "image" + + url_without_query = submission_url.split("?", 1)[0].lower() + if "." in url_without_query: + ext = url_without_query.rsplit(".", 1)[-1] + if ext in self.IMAGE_EXTENSIONS: + return "image" + if ext in self.VIDEO_EXTENSIONS: + return "video" + + if "video" in url_without_query: + return "video" + return "image" + def check_stock_image_url(self, image_url: str) -> Tuple[bool, Optional[str]]: """ Check if URL is from a known stock image website. diff --git a/image_worker/worker.py b/image_worker/worker.py index 0798bef..477def2 100644 --- a/image_worker/worker.py +++ b/image_worker/worker.py @@ -1,16 +1,14 @@ import json from datetime import datetime import logging -import aiohttp -import ssl from PIL import Image -from io import BytesIO import time import numpy as np import asyncio from typing import Dict, Optional, Tuple, Any -from urllib.parse import urlparse from dotenv import load_dotenv +from image_worker.assigment_ref_images import get_reference_images +from image_worker.gcs_client import is_gcs_url, download_from_gcs, load_gcp_credentials from config.config import config from database.db_manager import DatabaseManager @@ -23,10 +21,8 @@ from utils.exceptions import ( WorkerNotInitializedError, ValidationError, - ImageDownloadError, - NetworkTimeoutError, InvalidImageURLError, - InvalidImageFormatError, + GCSDownloadError, ) load_dotenv() @@ -58,7 +54,6 @@ def __init__(self, db_manager=None): config.image_processing.max_image_height, ) self.DOWNLOAD_TIMEOUT = config.image_processing.download_timeout - self.DOWNLOAD_RETRIES = config.image_processing.download_retries self.use_pgvector = config.vector_search.use_pgvector self.faiss_top_k = config.vector_search.faiss_top_k @@ -86,6 +81,20 @@ def __init__(self, db_manager=None): ) self.vector_handler = None + self.gcp_credentials = None + + # Initialize GCP credentials if enabled + if config.gcp.gcp_enabled: + try: + self.gcp_credentials = load_gcp_credentials(config.gcp.gcp_key_path) + logger.info(f"GCP credentials loaded from: {config.gcp.gcp_key_path}") + except Exception as e: + logger.error(f"Failed to load GCP credentials: {e}") + if config.environment == "production": + raise GCSDownloadError( + f"GCP authentication failed: {e}", + details={"key_path": config.gcp.gcp_key_path}, + ) if not self.use_pgvector: self.vector_handler = FAISSHandler( @@ -120,99 +129,56 @@ async def close(self): async def download_image(self, image_url: str) -> Image.Image: """ - Download 
image from URL with timeout and validation. + Download image from a Google Cloud Storage URL. + + Supports gs:// and storage.googleapis.com URLs via configured GCP + credentials. Args: - image_url: HTTP/HTTPS URL of the image + image_url: GCS URL (gs://bucket-name/path/to/image or + https://storage.googleapis.com/bucket-name/path/to/image) Returns: PIL Image object Raises: InvalidImageURLError: If URL is malformed or invalid - NetworkTimeoutError: If download times out - InvalidImageFormatError: If file is not a valid image - ImageDownloadError: For other download failures + GCSDownloadError: For GCS authentication, bucket, blob, or download failures """ - max_retries = self.DOWNLOAD_RETRIES - for attempt in range(max_retries): - try: - parsed = urlparse(image_url) - if not parsed.scheme or not parsed.netloc: - raise InvalidImageURLError( - f"Invalid URL: {image_url}", - details={"url": image_url, "parsed": str(parsed)}, - ) - - logger.debug( - f"Downloading image: url={image_url[:80]}..., attempt={attempt + 1}/{max_retries}" - ) - - # Configure SSL context based on config - if config.image_processing.disable_ssl_verify: - ssl_context = ssl.create_default_context() - ssl_context.check_hostname = False - ssl_context.verify_mode = ssl.CERT_NONE - logger.debug("SSL verification disabled for image download") - connector = aiohttp.TCPConnector(ssl=ssl_context) - else: - connector = aiohttp.TCPConnector() - - async with aiohttp.ClientSession(connector=connector) as session: - async with session.get( - image_url, - timeout=aiohttp.ClientTimeout(total=self.DOWNLOAD_TIMEOUT), - ) as response: - response.raise_for_status() - content = await response.read() - - try: - image = Image.open(BytesIO(content)) - return image - except ValidationError: - raise - except Exception as img_error: - raise InvalidImageFormatError( - f"Invalid or corrupted image format: {str(img_error)}", - details={"url": image_url, "error": str(img_error)}, - ) - - except asyncio.TimeoutError: - logger.warning( - f"Download timeout: attempt={attempt + 1}/{max_retries}, " - f"timeout={self.DOWNLOAD_TIMEOUT}s" - ) - if attempt == max_retries - 1: - raise NetworkTimeoutError( - f"Download timed out after {max_retries} attempts", - details={ - "url": image_url, - "timeout": self.DOWNLOAD_TIMEOUT, - "retries": max_retries, - }, - ) - await asyncio.sleep(1) + if not is_gcs_url(image_url): + raise InvalidImageURLError( + f"Invalid GCS URL: {image_url}", + details={"url": image_url}, + ) - except aiohttp.ClientError as client_error: - logger.warning( - f"Download failed: attempt={attempt + 1}/{max_retries}, " - f"error={str(client_error)}" - ) - if attempt == max_retries - 1: - raise ImageDownloadError( - f"Failed to download image: {str(client_error)}", - details={ - "url": image_url, - "error": str(client_error), - "retries": max_retries, - }, - ) - await asyncio.sleep(1) + if not self.gcp_credentials: + raise GCSDownloadError( + "GCP credentials not initialized. 
Set GCP_ENABLED=true and GCP_KEY_PATH in .env", + details={"url": image_url}, + ) - raise ImageDownloadError( - f"Failed to download image after {max_retries} attempts", - details={"url": image_url, "retries": max_retries}, - ) + logger.debug(f"Downloading image from GCS: url={image_url}") + try: + return await download_from_gcs( + image_url, + self.gcp_credentials, + timeout=self.DOWNLOAD_TIMEOUT, + ) + except FileNotFoundError as e: + raise GCSDownloadError( + f"GCS bucket or blob not found: {str(e)}", + details={"url": image_url}, + ) + except ValueError as e: + raise GCSDownloadError( + f"Invalid or corrupted image from GCS: {str(e)}", + details={"url": image_url}, + ) + except Exception as e: + raise GCSDownloadError( + f"GCS download failed: {str(e)}", + details={"url": image_url, "error": str(e)}, + ) def _validate_input(self, data: Dict[str, Any]) -> Tuple[str, str, str, str, str]: """ @@ -227,7 +193,7 @@ def _validate_input(self, data: Dict[str, Any]) -> Tuple[str, str, str, str, str Raises: ValidationError: If required fields are missing or invalid """ - required_fields = ["submission_id", "student_id", "img_url", "db_record_id"] + required_fields = ["submission_id", "student_id", "submission_url", "db_record_id"] for field in required_fields: if field not in data or not data[field]: raise ValidationError( @@ -238,7 +204,7 @@ def _validate_input(self, data: Dict[str, Any]) -> Tuple[str, str, str, str, str submission_id = data["submission_id"] student_id = data["student_id"] assign_id = data.get("assign_id", "N/A") - image_url = data["img_url"] + image_url = data["submission_url"] db_record_id = data["db_record_id"] return submission_id, student_id, assign_id, image_url, db_record_id @@ -291,7 +257,71 @@ async def _async_compare_self(self, hashes, prev): ) return comparison, prev - async def check_hash_match( + async def check_assignment_reference_hash_match( + self, hashes: dict, assignment_id: str + ) -> Tuple[bool, Optional[str], Optional[float], Optional[str]]: + """ + Check if submission matches any reference image via perceptual hash comparison. + + Uses three hash types (pHash, dHash, aHash) for robust duplicate detection. 
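As context for the scoring below, a tiny sketch of how a 0-1 similarity is derived from 64-bit perceptual hashes; the hex values are invented. The worker keeps the candidate with the lowest avg_distance reported by the comparison helper and converts it with 1 - distance/64, so the configured hash_threshold of 8 bits corresponds to roughly 0.875 similarity:

```python
def hamming_distance(hex_a: str, hex_b: str) -> int:
    """Number of differing bits between two 64-bit hex hash strings."""
    return bin(int(hex_a, 16) ^ int(hex_b, 16)).count("1")

# Hypothetical pHash values, one bit apart
submission_phash = "d1c4e4b0f0e1a2c3"
reference_phash = "d1c4e4b0f0e1a2c7"

distance = hamming_distance(submission_phash, reference_phash)
similarity = 1 - (distance / 64.0)
print(distance, similarity)  # 1 0.984375
```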
+ + Args: + hashes: Dict containing 'phash', 'dhash', 'ahash' hex strings + + Returns: + Tuple of (is_match, reference_id, similarity_score, image_url) + - is_match: True if hash match found + - reference_id: UUID of matched reference (or None) + - similarity_score: 0.0-1.0 similarity score (or None) + - image_url: URL of matched reference image (or None) + + Raises: + Exception: If database query fails + """ + try: + + references = await get_reference_images(assignment_id, self.clip_handler, self.hash_handler) + if not references: + return False, None, None, None + + # for ref_image in references: + # if ref_image["content"] is not None: + # hashes = self.hash_handler.compute_hashes(ref_image["content"]) + # ref_image['phash'] = hashes['phash'] + # ref_image['dhash'] = hashes['dhash'] + # ref_image['ahash'] = hashes['ahash'] + + tasks = [self._async_compare_ref(hashes, ref) for ref in references] + results = await asyncio.gather(*tasks) + + + best_match = None + best_score = 999 + best_comparison = None + for comparison, ref in results: + if comparison["is_match"] and comparison["avg_distance"] < best_score: + best_score = comparison["avg_distance"] + best_match = ref + best_comparison = comparison + + if best_match and best_comparison: + logger.info("Assignment reference match found") + similarity = 1 - (best_score / 64.0) + return ( + True, + str(best_match["name"]), + similarity, + str(best_match["name"]), + ) + else: + logger.info("No assignment reference match found") + return False, None, None, None + + except Exception as e: + logger.error(f"Hash check failed: {e}", exc_info=True) + raise + + async def check_db_reference_hash_match( self, hashes: dict ) -> Tuple[bool, Optional[str], Optional[float], Optional[str]]: """ @@ -375,6 +405,11 @@ async def check_clip_match( if not results: return None, 0.0, None + + # print("#"*70) + # for ref_id, sim, meta in results: + # print(f" Ref ID: {ref_id}, Similarity: {sim:.4f}, Meta: {meta}") + # print("#"*70) matches = [ (ref_id, sim, meta) @@ -479,10 +514,12 @@ async def process_submission(self, data: Dict[str, Any]) -> Optional[str]: try: extracted = self._validate_input(data) - submission_id, student_id, assign_id, image_url, db_record_id = extracted + submission_id, student_id, assign_id, submission_url, db_record_id = extracted logger.info(f"Processing submission: {submission_id}") + image_url = submission_url + # Check for stock image URLs before downloading is_stock, stock_site = self.image_validator.check_stock_image_url(image_url) if is_stock and stock_site: @@ -540,7 +577,8 @@ async def process_submission(self, data: Dict[str, Any]) -> Optional[str]: student_id, self_result.get("first_submission_date_for_image", None), ) - hash_check_result = await self.check_hash_match(hashes) + # hash_check_result = await self.check_db_reference_hash_match(hashes) + hash_check_result = await self.check_assignment_reference_hash_match(hashes,assign_id) ( hash_match, @@ -1020,6 +1058,9 @@ def _create_stock_image_result( "student_id": student_id, "assignment_id": assign_id, "image_url": image_url, + "is_ai_generated": False, + "ai_detection_source": "None", + "ai_confidence": 0.0, "is_plagiarized": True, "similarity_score": 1.0, "match_type": "stock_image", @@ -1290,8 +1331,9 @@ def format_results( ) message["ai_confidence"] = plagiarism_status.get("ai_confidence", 0.0) - payload_preview = json.dumps(message, indent=2)[:2000] - logger.info(f"Result payload preview (2000 chars):\n{payload_preview}...") + # payload_preview = json.dumps(message, 
indent=2)[:2000] + # logger.info(f"Result payload preview (2000 chars):\n{payload_preview}...") + return json.dumps(message) diff --git a/mq/rmq_client.py b/mq/rmq_client.py index f99877f..7848ca0 100644 --- a/mq/rmq_client.py +++ b/mq/rmq_client.py @@ -91,10 +91,8 @@ async def connect(self): ) self.channel = await self.connection.channel() - # Set prefetch count to 1 to avoid multiple slow CLIP inferences in parallel - # This prevents heartbeat timeouts from multiple long-running tasks - prefetch = int(os.getenv("RABBITMQ_PREFETCH_COUNT", "1")) - await self.channel.set_qos(prefetch_count=prefetch) + # Keep consumer concurrency bounded for slow CLIP inference workloads. + await self.channel.set_qos(prefetch_count=self.PREFETCH_COUNT) # Declare Dead Letter Queue first if configured if self.DEAD_LETTER_QUEUE: @@ -103,24 +101,29 @@ async def connect(self): ) logger.info(f"Dead Letter Queue declared: {self.DEAD_LETTER_QUEUE}") - # Declare main submission queue self.submission_queue = await self.channel.declare_queue( - self.SUBMISSION_QUEUE, durable=True + self.SUBMISSION_QUEUE, + durable=True, ) logger.info(f"Submission queue declared: {self.SUBMISSION_QUEUE}") - # Declare feedback queue for publishing results self.feedback_queue = await self.channel.declare_queue( - self.FEEDBACK_QUEUE, durable=True + self.FEEDBACK_QUEUE, + durable=True, ) logger.info(f"Feedback queue declared: {self.FEEDBACK_QUEUE}") + logger.info( f"Connected to RabbitMQ with prefetch_count={self.PREFETCH_COUNT}, all queues declared" ) return except Exception as e: logger.error(f"RabbitMQ connection failed: {e}") + if self.connection: + await self.connection.close() + self.connection = None + self.channel = None attempt += 1 if attempt > self.STARTUP_RETRY_LIMIT: logger.error( @@ -133,12 +136,17 @@ async def publish_message(self, message_body): """Publish to feedback queue and handle failures.""" try: await self.channel.default_exchange.publish( - aio_pika.Message(body=json.dumps(message_body).encode()), + aio_pika.Message( + body=json.dumps(message_body).encode(), + delivery_mode=aio_pika.DeliveryMode.PERSISTENT, + ), routing_key=self.FEEDBACK_QUEUE, ) logger.info( f"Published submission {message_body.get('submission_id')} for user {message_body.get('student_id')}" ) + logger.info(f"Published message body:") + logger.info(json.dumps(message_body, indent=2)) except asyncio.CancelledError as e: logger.warning("publish_message CancelledError") raise Exception("publish_message CancelledError") from e @@ -212,7 +220,7 @@ async def publish_to_dlq(self, message_body, reason: str = "Max retries exceeded async def start_consumer(self, callback): """Start consuming messages from the submission queue.""" await self.connect() - await self.submission_queue.consume(lambda msg: callback(msg)) + await self.submission_queue.consume(lambda msg: callback(msg), no_ack=False) async def close(self): """Close RabbitMQ connection and cancel pending tasks.""" diff --git a/pg_setup.sh b/pg_setup.sh new file mode 100755 index 0000000..7172f5a --- /dev/null +++ b/pg_setup.sh @@ -0,0 +1,118 @@ +COMPOSE_FILE="docker-postgres.yml" +COMPOSE_CMD="podman-compose" + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +CYAN='\033[0;36m' +GRAY='\033[0;37m' +NC='\033[0m' + + +# Load configuration from .env if it exists +if [ -f ".env" ]; then + set -a + source <(grep -v '^#' .env | grep -v '^$' | sed 's/\r$//') + set +a +fi + +# Configuration (with defaults) +POSTGRES_CONTAINER="plg-postgres" +POSTGRES_PORT="${POSTGRES_PORT:-5432}" 
+POSTGRES_PASSWORD="${POSTGRES_PASSWORD:-postgres}" +POSTGRES_DB="${POSTGRES_DB:-plagiarism_db}" +POSTGRES_USER="${POSTGRES_USER:-postgres}" + +$COMPOSE_CMD -f $COMPOSE_FILE up -d + +wait_for_postgres() { + echo -e "${YELLOW}Waiting for PostgreSQL...${NC}" + + local max_attempts=30 + local attempt=0 + + while [ $attempt -lt $max_attempts ]; do + attempt=$((attempt + 1)) + + if $CONTAINER_CMD exec $POSTGRES_CONTAINER pg_isready -U $POSTGRES_USER &>/dev/null; then + echo -e "${GREEN}[OK] PostgreSQL ready${NC}" + return 0 + fi + + sleep 2 + done + + echo -e "${RED}ERROR: PostgreSQL timeout${NC}" + exit 1 +} + +initialize_database() { + if [ ! -f "database/init.sql" ]; then + echo -e "${YELLOW}WARNING: database/init.sql not found${NC}" + return + fi + + echo -e "${CYAN}Initializing database...${NC}" + + # Run init.sql + if $CONTAINER_CMD exec -i $POSTGRES_CONTAINER psql -U $POSTGRES_USER -d $POSTGRES_DB < database/init.sql 2>/dev/null; then + echo -e "${GREEN}[OK] Database schema initialized${NC}" + else + echo -e "${GRAY}Database schema already exists${NC}" + fi + + # Create migrations tracking table if it doesn't exist + $CONTAINER_CMD exec $POSTGRES_CONTAINER psql -U $POSTGRES_USER -d $POSTGRES_DB -c " + CREATE TABLE IF NOT EXISTS schema_migrations ( + id SERIAL PRIMARY KEY, + migration_name VARCHAR(255) UNIQUE NOT NULL, + applied_at TIMESTAMP DEFAULT NOW() + ); + " 2>/dev/null + + # Run migration scripts + if [ -d "database/migrations" ]; then + local migration_count=0 + for migration_file in database/migrations/*.sql; do + if [ -f "$migration_file" ]; then + local migration_name=$(basename "$migration_file") + + # Check if migration already applied + local already_applied=$($CONTAINER_CMD exec $POSTGRES_CONTAINER psql -U $POSTGRES_USER -d $POSTGRES_DB -t -c " + SELECT COUNT(*) FROM schema_migrations WHERE migration_name = '$migration_name'; + " 2>/dev/null | tr -d '[:space:]') + + if [ "$already_applied" = "0" ]; then + echo -e "${CYAN}Applying migration: $migration_name${NC}" + + if $CONTAINER_CMD exec -i $POSTGRES_CONTAINER psql -U $POSTGRES_USER -d $POSTGRES_DB < "$migration_file" 2>/dev/null; then + # Record migration as applied + $CONTAINER_CMD exec $POSTGRES_CONTAINER psql -U $POSTGRES_USER -d $POSTGRES_DB -c " + INSERT INTO schema_migrations (migration_name) VALUES ('$migration_name'); + " 2>/dev/null + echo -e "${GREEN}[OK] Applied: $migration_name${NC}" + migration_count=$((migration_count + 1)) + else + echo -e "${YELLOW}WARNING: Failed to apply $migration_name${NC}" + fi + fi + fi + done + + if [ $migration_count -eq 0 ]; then + echo -e "${GRAY}All migrations already applied${NC}" + else + echo -e "${GREEN}[OK] Applied $migration_count migration(s)${NC}" + fi + fi +} + +main() { + wait_for_postgres + initialize_database + echo -e "\n${GRAY}Containers running in background. 
Press Ctrl+C to exit this script.${NC}" +} + +main +echo "Setup Complete" diff --git a/plag_checker/submissions_checker.py b/plag_checker/submissions_checker.py index 0787003..9f04ee8 100644 --- a/plag_checker/submissions_checker.py +++ b/plag_checker/submissions_checker.py @@ -6,7 +6,6 @@ from mq.mq_client import MQClient from database.db_manager import DatabaseManager from processors.image_processor import ImageProcessor -from processors.text_processor import TextProcessor from typing import TYPE_CHECKING @@ -33,16 +32,14 @@ async def __aenter__(self): async def __aexit__(self, exc_type, exc_val, exc_tb): """Ensure message is acknowledged exactly once on exit.""" if not self.acked: - # Default behavior: reject without requeue on unhandled errors - logger.warning( - "Message not explicitly acknowledged, rejecting (requeue=False)" - ) + # Default behavior: requeue unhandled messages to avoid message loss. + logger.warning("Message not explicitly acknowledged, requeueing") try: - await self.message.reject(requeue=False) + await self.message.nack(requeue=True) self.acked = True - self.action = "reject(cleanup)" + self.action = "nack(cleanup, requeue=True)" except Exception as e: - logger.critical(f"CRITICAL: Failed to reject message in cleanup: {e}") + logger.critical(f"CRITICAL: Failed to nack message in cleanup: {e}") # DO NOT set self.acked = True here - let finally block handle it return False # Don't suppress exceptions @@ -92,7 +89,6 @@ def __init__( self._owns_image_worker = image_worker is None self.image_processor = None # Will be set after image_worker initialization - self.text_processor = TextProcessor() self.STARTUP_RETRY_DELAY = startup_retry_delay self.MAX_RETRIES = int(os.getenv("MAX_RETRIES", "3")) @@ -119,13 +115,15 @@ async def initialize(self): if self.image_worker is None: raise RuntimeError("ImageWorker failed to initialize") + logger.info("Submission Checker initialized successfully") + await self.start_consumer() async def process_submission(self, submission): if self._shutdown: - logger.warning("Shutdown in progress, rejecting message") + logger.warning("Shutdown in progress, requeueing message") try: - await submission.nack(requeue=False) + await submission.nack(requeue=True) except Exception as e: logger.error(f"Failed to nack message during shutdown: {e}") return @@ -157,7 +155,13 @@ async def process_submission(self, submission): redelivered = getattr(submission, "redelivered", False) delivery_count = 0 - if hasattr(submission, "headers") and submission.headers: + from collections.abc import Mapping + + if ( + hasattr(submission, "headers") + and isinstance(submission.headers, Mapping) + and submission.headers + ): delivery_count = submission.headers.get("x-delivery-count", 0) if redelivered and delivery_count == 0: @@ -181,31 +185,35 @@ async def process_submission(self, submission): ) if not dlq_success: logger.error( - f"Failed to send poison message to DLQ for submission {submission_id}" + f"Failed to send poison message to DLQ for submission {submission_id}; requeueing" ) + await submission.nack(requeue=True, submission_id=submission_id) + message_acked = True + return - await submission.nack( - submission, requeue=False, submission_id=submission_id - ) + await submission.nack(requeue=False, submission_id=submission_id) message_acked = True return - image_url = data.get("img_url") or data.get("image_url") + submission_url = data.get("submission_url") + submission_type = data.get("submission_type") record_id = await self.db.insert_submission_if_not_exists( - 
data, image_url or "", status + data, submission_url or "", status ) - if not image_url: - logger.error(f"No image URL in submission {submission_id}") + if submission_type != "image": + logger.info( + f"Skipping plagiarism processing for non-image submission {submission_id}" + ) + elif not submission_url: + logger.error(f"No submission_url in image submission {submission_id}") await self.db.update_status( submission_id, SubmissionStatus.FAILED, 0, - "No image URL in submission", - ) - await submission.nack( - submission, requeue=False, submission_id=submission_id + "No submission_url in image submission", ) + await submission.nack(requeue=False, submission_id=submission_id) message_acked = True return @@ -215,41 +223,33 @@ async def process_submission(self, submission): data["db_record_id"] = str(record_id) # Convert UUID to string - # Get processor and validate initialization - try: - processor = self.get_processor(data) - except RuntimeError as init_error: - # Processor not initialized - this is a system error, should retry - logger.error(f"System not ready: {init_error}") - await self.db.update_status( - submission_id, - SubmissionStatus.FAILED, - 0, - f"System initialization error: {str(init_error)}", - ) - await submission.nack( - submission, requeue=True, submission_id=submission_id - ) # Retry - system might be initializing - message_acked = True - return - except ValueError as validation_error: - # Invalid message format - should not retry - logger.error( - f"Invalid message format for submission {submission_id}: {validation_error}" - ) - await self.db.update_status( - submission_id, - SubmissionStatus.FAILED, - 0, - f"Invalid message format: {str(validation_error)}", - ) - await submission.nack( - submission, requeue=False, submission_id=submission_id - ) # Don't retry invalid format - message_acked = True - return - - result_text = await processor.process(data) + submission_type = data.get("submission_type") + if submission_type == "image": + try: + processor = self.get_processor(data) + except RuntimeError as init_error: + logger.error(f"System not ready: {init_error}") + await self.db.update_status( + submission_id, + SubmissionStatus.FAILED, + 0, + f"System initialization error: {str(init_error)}", + ) + await submission.nack(requeue=True, submission_id=submission_id) + message_acked = True + return + result_text = await processor.process(data) + else: + result_text = { + "similar_sources": [], + "similarity_score": 0.0, + "is_plagiarized": False, + "match_type": "original", + "is_ai_generated": False, + "ai_detection_source": "", + "ai_confidence": 0.0, + "plagiarism_source": "", + } try: if isinstance(result_text, (str, bytes, bytearray)): @@ -268,20 +268,17 @@ async def process_submission(self, submission): logger.warning( f"Invalid result for submission {submission_id}: {parse_error}" ) - await self.db.update_status( - submission_id or "unknown", - SubmissionStatus.FAILED, - int(retry_count or 0), - f"Invalid processing result: {str(parse_error)}", - ) - await submission.nack(requeue=False) - message_acked = True - return + raise RuntimeError(f"Invalid processing result: {parse_error}") from parse_error data["similar_sources"] = result_text.get("similar_sources") data["similarity_score"] = result_text.get("similarity_score") data["is_plagiarized"] = result_text.get("is_plagiarized") data["match_type"] = result_text.get("match_type") + data["assignment_id"] = data.pop("assign_id") + data["is_ai_generated"] = result_text.get("is_ai_generated", False) + 
data["ai_detection_source"] = result_text.get("ai_detection_source", "") + data["ai_confidence"] = result_text.get("ai_confidence", 0.0) + data["plagiarism_source"] = result_text.get("plagiarism_source", "") publish_data = {k: v for k, v in data.items() if k != "db_record_id"} @@ -301,9 +298,7 @@ async def process_submission(self, submission): except json.JSONDecodeError as e: logger.error(f"Invalid JSON in message: {e}") - await submission.nack( - submission, requeue=False, submission_id=submission_id - ) + await submission.nack(requeue=False, submission_id=submission_id) message_acked = True except RuntimeError as e: @@ -313,7 +308,7 @@ async def process_submission(self, submission): f"rejecting message (shutdown={self._shutdown})" ) await submission.nack( - submission, requeue=False, submission_id=submission_id + requeue=True, submission_id=submission_id ) message_acked = True else: @@ -334,9 +329,7 @@ async def process_submission(self, submission): rc + 1, f"Retry {rc + 1}/{self.MAX_RETRIES}: {str(e)[:200]}", ) - await submission.nack( - submission, requeue=True, submission_id=submission_id - ) + await submission.nack(requeue=True, submission_id=submission_id) message_acked = True else: await self.db.update_status( @@ -352,12 +345,13 @@ async def process_submission(self, submission): ) if not dlq_success: logger.error( - f"Failed to send max-retry message to DLQ for submission {submission_id}" + f"Failed to send max-retry message to DLQ for submission {submission_id}; requeueing" ) + await submission.nack(requeue=True, submission_id=submission_id) + message_acked = True + return - await submission.nack( - submission, requeue=False, submission_id=submission_id - ) + await submission.nack(requeue=False, submission_id=submission_id) message_acked = True logger.warning( f"Message discarded after {rc} retries and sent to DLQ" @@ -372,9 +366,9 @@ async def process_submission(self, submission): # Final safety net: ensure message is acknowledged if not message_acked: logger.warning( - f"Emergency reject for un-acknowledged message {submission_id}" + f"Emergency requeue for un-acknowledged message {submission_id}" ) - await submission.reject() + await submission.nack(requeue=True, submission_id=submission_id) message_acked = True def get_processor(self, data: dict): @@ -385,18 +379,18 @@ def get_processor(self, data: dict): RuntimeError: If processor is not initialized ValueError: If data format is invalid """ - if data.get("img_url"): + submission_type = data.get("submission_type") + if submission_type == "image": if self.image_processor is None: raise RuntimeError( "ImageProcessor not initialized - initialize() must be called before processing messages" ) return self.image_processor - elif data.get("text"): - return self.text_processor - else: - raise ValueError( - "Invalid submission format: missing both 'img_url' and 'text' fields" - ) + if submission_type is None: + raise ValueError("Invalid submission format: missing submission_type") + raise ValueError( + f"Unsupported submission_type for processing: {data.get('submission_type')}" + ) async def start_consumer(self): await self.client.start_consumer(self.process_submission) diff --git a/processors/__init__.py b/processors/__init__.py index fa8cdbb..960feed 100644 --- a/processors/__init__.py +++ b/processors/__init__.py @@ -1,17 +1,13 @@ -from .text_processor import TextProcessor from .image_processor import ImageProcessor async def get_processor( - data: dict, db_manager=None, image_processor=None, text_processor=None + data: dict, 
db_manager=None, image_processor=None ): """ Returns appropriate processor based on content type. - Priority: if 'img_url' present → ImageProcessor - otherwise → TextProcessor + Priority: if submission_type is 'image' → ImageProcessor """ - if data.get("img_url"): + if data.get("submission_type") == "image": return image_processor - elif data.get("text"): - return text_processor return None diff --git a/processors/image_processor.py b/processors/image_processor.py index 653a66c..89fc6fc 100644 --- a/processors/image_processor.py +++ b/processors/image_processor.py @@ -29,7 +29,7 @@ async def process(self, data: dict) -> dict: Args: data: dict containing: - - img_url or image_url: URL of the image to process + - submission_url: URL of the image to process - submission_id: unique submission identifier - student_id: hashed student identifier - assign_id: assignment identifier @@ -38,11 +38,11 @@ async def process(self, data: dict) -> dict: Returns: dict with plagiarism detection results or error message """ - image_url = data.get("img_url") or data.get("image_url") + submission_url = data.get("submission_url") - if not image_url: - logger.error("No image URL provided in submission data") - return {"error": "No image URL provided"} + if not submission_url: + logger.error("No submission_url provided in submission data") + return {"error": "No submission_url provided"} try: logger.info(f"Processing image submission: {data.get('submission_id')}") @@ -65,6 +65,9 @@ async def process(self, data: dict) -> dict: ) return {"error": "Invalid JSON response from worker"} + logger.debug("Result payload ") + logger.debug(json.dumps(result, indent=2)) + logger.info( f"Successfully processed submission: {data.get('submission_id')}" ) diff --git a/processors/text_processor.py b/processors/text_processor.py deleted file mode 100644 index b8a678a..0000000 --- a/processors/text_processor.py +++ /dev/null @@ -1,10 +0,0 @@ -from .base_processor import BaseProcessor - -class TextProcessor(BaseProcessor): - """Processor for text-based submissions.""" - - async def process(self, data: dict): - # Example: process text for plagiarism - text = data.get("payload", "") - # Dummy similarity result - return {"similarity_score": 78, "matched_sources": ["Source A", "Source B"]} diff --git a/requirements.txt b/requirements.txt index c2db3ae..d860689 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,31 +1,116 @@ -# Message Queue - RabbitMQ (latest stable) -aio-pika==9.5.7 -aiormq==6.9.2 - -# Database - PostgreSQL (async) -asyncpg==0.30.0 -pgvector==0.4.1 - -# HTTP Client -aiohttp==3.13.2 -requests==2.32.3 - -# Configuration Management (Pydantic v2 compatible) -pydantic<2.0.0,>=1.10.0 -python-dotenv==1.2.1 - -# Image Processing -pillow==12.0.0 -imagehash==4.3.2 - -# Machine Learning - CLIP & FAISS -torch==2.9.0 -torchvision==0.24.0 -open-clip-torch==3.2.0 -faiss-cpu==1.12.0 -numpy==2.3.4 - -# Utilities -tqdm==4.67.1 - -podman-compose==1.5.0 \ No newline at end of file +# Message Queue - RabbitMQ +aio-pika==9.5.7 +aiormq==6.9.2 +pamqp==3.3.0 + +# Database - PostgreSQL +asyncpg==0.30.0 +pgvector==0.4.1 + +# HTTP Client +aiohttp==3.13.2 +aiohappyeyeballs==2.6.1 +aiosignal==1.4.0 +async-timeout==5.0.1 +requests==2.32.3 +httpcore==1.0.9 +httpx==0.28.1 + +# GCP Cloud Storage +google-api-core==2.30.3 +google-auth==2.52.0 +google-cloud-core==2.6.0 +google-cloud-storage==3.10.1 +google-crc32c==1.8.0 +google-resumable-media==2.9.0 +googleapis-common-protos==1.75.0 +proto-plus==1.28.0 +protobuf==7.34.1 +pyasn1==0.6.3 
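+# pyasn1 and pyasn1_modules are assumed to be transitive dependencies of google-auth
+# rather than packages imported directly by this project.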
+pyasn1_modules==0.4.2 + +# Configuration Management +pydantic==1.10.24 +python-dotenv==1.2.1 + +# Image Processing +pillow==12.0.0 +ImageHash==4.3.2 +PyWavelets==1.8.0 + +# Machine Learning - CLIP & FAISS +torch==2.9.0 +torchvision==0.24.0 +open_clip_torch==3.2.0 +faiss-cpu==1.12.0 +numpy==2.2.6 +timm==1.0.22 +safetensors==0.7.0 +huggingface_hub==1.2.1 +hf-xet==1.2.0 +filelock==3.20.0 +fsspec==2025.12.0 +ftfy==6.3.1 +regex==2025.11.3 +scipy==1.15.3 +mpmath==1.3.0 +sympy==1.14.0 +networkx==3.4.2 + +# NVIDIA CUDA runtime dependencies installed with torch +nvidia-cublas-cu12==12.8.4.1 +nvidia-cuda-cupti-cu12==12.8.90 +nvidia-cuda-nvrtc-cu12==12.8.93 +nvidia-cuda-runtime-cu12==12.8.90 +nvidia-cudnn-cu12==9.10.2.21 +nvidia-cufft-cu12==11.3.3.83 +nvidia-cufile-cu12==1.13.1.3 +nvidia-curand-cu12==10.3.9.90 +nvidia-cusolver-cu12==11.7.3.90 +nvidia-cusparse-cu12==12.5.8.93 +nvidia-cusparselt-cu12==0.7.1 +nvidia-nccl-cu12==2.27.5 +nvidia-nvjitlink-cu12==12.8.93 +nvidia-nvshmem-cu12==3.3.20 +nvidia-nvtx-cu12==12.8.90 +triton==3.5.0 + +# Web Framework - FastAPI +fastapi==0.115.0 +uvicorn==0.34.0 +starlette==0.38.6 +anyio==4.12.0 +h11==0.16.0 +httptools==0.7.1 +uvloop==0.22.1 +watchfiles==1.1.1 +websockets==15.0.1 +click==8.3.1 +typer-slim==0.20.0 +shellingham==1.5.4 + +# Utilities +annotated-doc==0.0.4 +attrs==25.4.0 +certifi==2025.11.12 +cffi==2.0.0 +charset-normalizer==3.4.4 +cryptography==48.0.0 +exceptiongroup==1.3.1 +frozenlist==1.8.0 +idna==3.11 +Jinja2==3.1.6 +MarkupSafe==3.0.3 +multidict==6.7.0 +packaging==25.0 +podman-compose==1.5.0 +propcache==0.4.1 +pycparser==3.0 +PyYAML==6.0.3 +setuptools==80.9.0 +tqdm==4.67.1 +typing_extensions==4.15.0 +urllib3==2.6.0 +wcwidth==0.2.14 +wheel==0.45.1 +yarl==1.22.0 diff --git a/scripts/docker-postgres.yml b/scripts/docker-postgres.yml new file mode 100644 index 0000000..f6dac63 --- /dev/null +++ b/scripts/docker-postgres.yml @@ -0,0 +1,53 @@ +version: '3.8' + +services: + # =================================== + # POSTGRESQL - Database + # =================================== + postgres: + image: pgvector/pgvector:pg16 + container_name: plg-postgres + ports: + - "5432:5432" + environment: + POSTGRES_DB: plagiarism_db + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_INITDB_ARGS: "-E UTF8" + POSTGRES_MAX_CONNECTIONS: 20 + PGDATA: /var/lib/postgresql/data/pgdata + volumes: + - postgres_data:/var/lib/postgresql/data + - ./database/init.sql:/docker-entrypoint-initdb.d/init.sql + healthcheck: + test: ["CMD-SHELL", "pg_isready -U $$POSTGRES_USER -d $$POSTGRES_DB"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 40s + deploy: + resources: + limits: + cpus: '0.5' + memory: 512M + reservations: + cpus: '0.25' + memory: 256M + networks: + - plg-network + restart: always + + +# =================================== +# VOLUMES +# =================================== +volumes: + postgres_data: + driver: local + +# =================================== +# NETWORKS +# =================================== +networks: + plg-network: + driver: bridge diff --git a/scripts/postgres_setup.sh b/scripts/postgres_setup.sh new file mode 100644 index 0000000..92396b9 --- /dev/null +++ b/scripts/postgres_setup.sh @@ -0,0 +1,118 @@ +COMPOSE_FILE="docker-postgres.yml" +COMPOSE_CMD="podman-compose" + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +CYAN='\033[0;36m' +GRAY='\033[0;37m' +NC='\033[0m' + + +# Load configuration from .env if it exists +if [ -f ".env" ]; then + set -a + source <(grep -v '^#' .env | grep -v '^$' | sed 
's/\r$//') + set +a +fi + +# Configuration (with defaults) +POSTGRES_CONTAINER="plg-postgres" +POSTGRES_PORT="${POSTGRES_PORT:-5432}" +POSTGRES_PASSWORD="${POSTGRES_PASSWORD:-postgres}" +POSTGRES_DB="${POSTGRES_DB:-plagiarism_db}" +POSTGRES_USER="${POSTGRES_USER:-postgres}" + +$COMPOSE_CMD -f $COMPOSE_FILE up -d + +wait_for_postgres() { + echo -e "${YELLOW}Waiting for PostgreSQL...${NC}" + + local max_attempts=30 + local attempt=0 + + while [ $attempt -lt $max_attempts ]; do + attempt=$((attempt + 1)) + + if $CONTAINER_CMD exec $POSTGRES_CONTAINER pg_isready -U $POSTGRES_USER &>/dev/null; then + echo -e "${GREEN}[OK] PostgreSQL ready${NC}" + return 0 + fi + + sleep 2 + done + + echo -e "${RED}ERROR: PostgreSQL timeout${NC}" + exit 1 +} + +initialize_database() { + if [ ! -f "database/init.sql" ]; then + echo -e "${YELLOW}WARNING: database/init.sql not found${NC}" + return + fi + + echo -e "${CYAN}Initializing database...${NC}" + + # Run init.sql + if $CONTAINER_CMD exec -i $POSTGRES_CONTAINER psql -U $POSTGRES_USER -d $POSTGRES_DB < database/init.sql 2>/dev/null; then + echo -e "${GREEN}[OK] Database schema initialized${NC}" + else + echo -e "${GRAY}Database schema already exists${NC}" + fi + + # Create migrations tracking table if it doesn't exist + $CONTAINER_CMD exec $POSTGRES_CONTAINER psql -U $POSTGRES_USER -d $POSTGRES_DB -c " + CREATE TABLE IF NOT EXISTS schema_migrations ( + id SERIAL PRIMARY KEY, + migration_name VARCHAR(255) UNIQUE NOT NULL, + applied_at TIMESTAMP DEFAULT NOW() + ); + " 2>/dev/null + + # Run migration scripts + if [ -d "database/migrations" ]; then + local migration_count=0 + for migration_file in database/migrations/*.sql; do + if [ -f "$migration_file" ]; then + local migration_name=$(basename "$migration_file") + + # Check if migration already applied + local already_applied=$($CONTAINER_CMD exec $POSTGRES_CONTAINER psql -U $POSTGRES_USER -d $POSTGRES_DB -t -c " + SELECT COUNT(*) FROM schema_migrations WHERE migration_name = '$migration_name'; + " 2>/dev/null | tr -d '[:space:]') + + if [ "$already_applied" = "0" ]; then + echo -e "${CYAN}Applying migration: $migration_name${NC}" + + if $CONTAINER_CMD exec -i $POSTGRES_CONTAINER psql -U $POSTGRES_USER -d $POSTGRES_DB < "$migration_file" 2>/dev/null; then + # Record migration as applied + $CONTAINER_CMD exec $POSTGRES_CONTAINER psql -U $POSTGRES_USER -d $POSTGRES_DB -c " + INSERT INTO schema_migrations (migration_name) VALUES ('$migration_name'); + " 2>/dev/null + echo -e "${GREEN}[OK] Applied: $migration_name${NC}" + migration_count=$((migration_count + 1)) + else + echo -e "${YELLOW}WARNING: Failed to apply $migration_name${NC}" + fi + fi + fi + done + + if [ $migration_count -eq 0 ]; then + echo -e "${GRAY}All migrations already applied${NC}" + else + echo -e "${GREEN}[OK] Applied $migration_count migration(s)${NC}" + fi + fi +} + +main() { + wait_for_postgres + initialize_database + echo -e "\n${GRAY}Containers running in background. Press Ctrl+C to exit this script.${NC}" +} + +main +echo -e "${YELLOW}PostgreSQL done...${NC}" \ No newline at end of file diff --git a/scripts/restart_app.sh b/scripts/restart_app.sh new file mode 100755 index 0000000..c95bfde --- /dev/null +++ b/scripts/restart_app.sh @@ -0,0 +1,46 @@ +# SKIP_GIT_PULL=1 sh scripts/restart_app.sh + +#!/bin/sh +set -eu + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +APP_DIR="$(cd "$SCRIPT_DIR/.." 
&& pwd)" +BRANCH="${BRANCH:-plg_integration}" +APP_SERVICE="${APP_SERVICE:-plg_app.service}" +POSTGRES_SERVICE="${POSTGRES_SERVICE:-plg_postgres.service}" +SKIP_GIT_PULL="${SKIP_GIT_PULL:-0}" + +log() { + printf '\n[%s] %s\n' "$(date '+%Y-%m-%d %H:%M:%S')" "$*" +} + +start_postgres_service_if_present() { + if systemctl --user cat "$POSTGRES_SERVICE" >/dev/null 2>&1; then + log "Ensuring PostgreSQL service is running" + systemctl --user start "$POSTGRES_SERVICE" + else + log "PostgreSQL service $POSTGRES_SERVICE not found; migrations will start PostgreSQL if needed" + fi +} + +cd "$APP_DIR" + +if [ "$SKIP_GIT_PULL" != "1" ]; then + log "Pulling latest code for $BRANCH" + git fetch origin "$BRANCH" + git checkout "$BRANCH" + git pull --ff-only origin "$BRANCH" +else + log "Skipping git pull because SKIP_GIT_PULL=$SKIP_GIT_PULL" +fi + +start_postgres_service_if_present + +log "Applying database migrations" +./scripts/run_db_migrations.sh + +log "Restarting application service" +systemctl --user restart "$APP_SERVICE" +systemctl --user --no-pager --full status "$APP_SERVICE" + +log "Restart complete" diff --git a/scripts/run_db_migrations.sh b/scripts/run_db_migrations.sh new file mode 100755 index 0000000..4ed5891 --- /dev/null +++ b/scripts/run_db_migrations.sh @@ -0,0 +1,172 @@ +#!/bin/sh +set -eu + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +APP_DIR="$(cd "$SCRIPT_DIR/.." && pwd)" +COMPOSE_FILE="${COMPOSE_FILE:-docker-postgres.yml}" +POSTGRES_CONTAINER="${POSTGRES_CONTAINER:-plg-postgresdb}" +INIT_SQL="${INIT_SQL:-database/init.sql}" +MIGRATIONS_DIR="${MIGRATIONS_DIR:-database/migrations}" +BASELINE_EXISTING_MIGRATIONS="${BASELINE_EXISTING_MIGRATIONS:-0}" +INIT_CHECK_SQL="${INIT_CHECK_SQL:-SELECT to_regclass('public.submissions') IS NOT NULL;}" + +cd "$APP_DIR" + +if [ -f ".env" ]; then + ENV_FILE="$(mktemp)" + trap 'rm -f "$ENV_FILE"' EXIT + sed 's/\r$//' .env >"$ENV_FILE" + + set -a + # shellcheck disable=SC1091 + . 
"$ENV_FILE" + set +a +else + echo "ERROR: .env is required in $APP_DIR" >&2 + exit 1 +fi + +: "${POSTGRES_DB:?POSTGRES_DB must be set in .env}" +: "${POSTGRES_USER:?POSTGRES_USER must be set in .env}" + +if command -v podman >/dev/null 2>&1; then + CONTAINER_CMD="${CONTAINER_CMD:-podman}" +elif command -v docker >/dev/null 2>&1; then + CONTAINER_CMD="${CONTAINER_CMD:-docker}" +else + echo "ERROR: podman or docker is required" >&2 + exit 1 +fi + +if [ -x "$APP_DIR/venv/bin/podman-compose" ]; then + COMPOSE_CMD="${COMPOSE_CMD:-$APP_DIR/venv/bin/podman-compose}" +elif command -v podman-compose >/dev/null 2>&1; then + COMPOSE_CMD="${COMPOSE_CMD:-podman-compose}" +elif command -v docker-compose >/dev/null 2>&1; then + COMPOSE_CMD="${COMPOSE_CMD:-docker-compose}" +elif docker compose version >/dev/null 2>&1; then + COMPOSE_CMD="${COMPOSE_CMD:-docker compose}" +else + COMPOSE_CMD="" +fi + +find_postgres_container() { + if "$CONTAINER_CMD" container exists "$POSTGRES_CONTAINER" >/dev/null 2>&1; then + echo "$POSTGRES_CONTAINER" + return + fi + + for candidate in plg-postgresdb plg-postgres; do + if "$CONTAINER_CMD" container exists "$candidate" >/dev/null 2>&1; then + echo "$candidate" + return + fi + done + + echo "$POSTGRES_CONTAINER" +} + +start_postgres_if_needed() { + if [ -n "$COMPOSE_CMD" ] && [ -f "$COMPOSE_FILE" ]; then + $COMPOSE_CMD -f "$COMPOSE_FILE" up -d postgres + else + "$CONTAINER_CMD" start "$(find_postgres_container)" >/dev/null + fi +} + +wait_for_postgres() { + local container="$1" + local attempt=1 + + while [ "$attempt" -le 60 ]; do + if "$CONTAINER_CMD" exec "$container" pg_isready -U "$POSTGRES_USER" -d "$POSTGRES_DB" >/dev/null 2>&1; then + return 0 + fi + sleep 2 + attempt=$((attempt + 1)) + done + + echo "ERROR: PostgreSQL did not become ready in time" >&2 + exit 1 +} + +psql_exec() { + local container="$1" + shift + "$CONTAINER_CMD" exec "$container" psql -v ON_ERROR_STOP=1 -U "$POSTGRES_USER" -d "$POSTGRES_DB" "$@" +} + +psql_file() { + local container="$1" + local file="$2" + "$CONTAINER_CMD" exec -i "$container" psql -v ON_ERROR_STOP=1 -U "$POSTGRES_USER" -d "$POSTGRES_DB" <"$file" +} + +record_migration() { + local container="$1" + local migration_name="$2" + + psql_exec "$container" -c "INSERT INTO schema_migrations (migration_name) VALUES ('$migration_name') ON CONFLICT (migration_name) DO NOTHING;" +} + +main() { + start_postgres_if_needed + + local container + container="$(find_postgres_container)" + + wait_for_postgres "$container" + + local initialized_from_init=0 + local init_check_result + init_check_result="$( + psql_exec "$container" -tAc "$INIT_CHECK_SQL" + )" + + if [ "$init_check_result" != "t" ] && [ -f "$INIT_SQL" ]; then + echo "Applying $INIT_SQL" + psql_file "$container" "$INIT_SQL" + initialized_from_init=1 + fi + + psql_exec "$container" -c " +CREATE TABLE IF NOT EXISTS schema_migrations ( + id SERIAL PRIMARY KEY, + migration_name VARCHAR(255) UNIQUE NOT NULL, + applied_at TIMESTAMP DEFAULT NOW() +);" + + if [ -d "$MIGRATIONS_DIR" ]; then + for migration_file in "$MIGRATIONS_DIR"/*.sql; do + [ -f "$migration_file" ] || continue + + migration_name="$(basename "$migration_file")" + + if [ "$initialized_from_init" = "1" ]; then + echo "Recording migration already included by $INIT_SQL: $migration_name" + record_migration "$container" "$migration_name" + continue + fi + + if [ "$BASELINE_EXISTING_MIGRATIONS" = "1" ]; then + echo "Recording existing migration without applying: $migration_name" + record_migration "$container" "$migration_name" + 
continue + fi + + already_applied="$( + psql_exec "$container" -tAc "SELECT COUNT(*) FROM schema_migrations WHERE migration_name = '$migration_name';" + )" + + if [ "$already_applied" = "0" ]; then + echo "Applying migration: $migration_name" + psql_file "$container" "$migration_file" + record_migration "$container" "$migration_name" + else + echo "Skipping already-applied migration: $migration_name" + fi + done + fi +} + +main "$@" diff --git a/scripts/setup_vm.sh b/scripts/setup_vm.sh new file mode 100755 index 0000000..1f1db57 --- /dev/null +++ b/scripts/setup_vm.sh @@ -0,0 +1,171 @@ +#!/usr/bin/env bash +set -euo pipefail + +APP_USER="${APP_USER:-$(id -un)}" +APP_HOME="${APP_HOME:-$HOME}" +APP_DIR="${APP_DIR:-$APP_HOME/tap_plg}" +REPO_URL="${REPO_URL:-https://github.com/theapprenticeproject/tap_plg.git}" +BRANCH="${BRANCH:-plg_integration}" +PYTHON_BIN="${PYTHON_BIN:-python3}" +POSTGRES_SERVICE="${POSTGRES_SERVICE:-plg_postgres.service}" +APP_SERVICE="${APP_SERVICE:-plg_app.service}" +DOWNLOAD_CLIP_MODEL="${DOWNLOAD_CLIP_MODEL:-1}" +ENV_FILE="${ENV_FILE:-}" + +log() { + printf '\n[%s] %s\n' "$(date '+%Y-%m-%d %H:%M:%S')" "$*" +} + +run_sudo() { + if [ "$(id -u)" -eq 0 ]; then + "$@" + else + sudo "$@" + fi +} + +clone_or_update_repo() { + if [ -d "$APP_DIR/.git" ]; then + log "Updating existing repository in $APP_DIR" + git -C "$APP_DIR" fetch origin "$BRANCH" + git -C "$APP_DIR" checkout "$BRANCH" + git -C "$APP_DIR" pull --ff-only origin "$BRANCH" + else + log "Cloning $REPO_URL into $APP_DIR" + mkdir -p "$(dirname "$APP_DIR")" + git clone --branch "$BRANCH" "$REPO_URL" "$APP_DIR" + fi +} + +install_env_file() { + cd "$APP_DIR" + + if [ -f ".env" ]; then + log ".env already exists" + return + fi + + if [ -n "$ENV_FILE" ]; then + if [ ! -f "$ENV_FILE" ]; then + echo "ERROR: ENV_FILE does not exist: $ENV_FILE" >&2 + exit 1 + fi + + log "Creating .env from $ENV_FILE" + cp "$ENV_FILE" .env + return + fi + + echo "ERROR: .env is required in $APP_DIR, or pass ENV_FILE=/path/to/envfile when running this script." 
>&2 + exit 1 +} + +setup_python_environment() { + cd "$APP_DIR" + + log "Creating/updating Python virtual environment" + "$PYTHON_BIN" -m venv venv + ./venv/bin/python -m pip install podman-compose + + log "Starting production environment" + ./start-prod-env.sh --with-api + + if [ "$DOWNLOAD_CLIP_MODEL" = "1" ]; then + log "Downloading CLIP model if it is not already present" + ./venv/bin/python scripts/download_clip_model.py --model ViT-L-14 + else + log "Skipping CLIP model download because DOWNLOAD_CLIP_MODEL=$DOWNLOAD_CLIP_MODEL" + fi +} + +install_system_packages() { + log "Installing OS packages" + run_sudo apt update + run_sudo apt install -y podman vim git python3.10-venv + + log "Configuring systemd delegation for rootless Podman" + run_sudo mkdir -p /etc/systemd/system/user@.service.d + run_sudo tee /etc/systemd/system/user@.service.d/delegate.conf >/dev/null <<'EOF' +[Service] +Delegate=cpu cpuset io memory pids +EOF + run_sudo systemctl daemon-reload +} + +install_user_services() { + cd "$APP_DIR" + + local systemd_dir="$HOME/.config/systemd/user" + mkdir -p "$systemd_dir" + + log "Writing user systemd services" + cat >"$systemd_dir/$POSTGRES_SERVICE" <"$systemd_dir/$APP_SERVICE" </dev/null || true) -case "${UNAME_OUT}" in - MINGW*|MSYS*|CYGWIN*) OS_TYPE="windows-msys" ;; - Darwin) OS_TYPE="macos" ;; - Linux) OS_TYPE="linux" ;; - *) OS_TYPE="unix" ;; -esac - -# Detect Python executable -PY="" -if [ "${OS_TYPE}" = "windows-msys" ]; then - PY_CANDIDATES=("py" "python3" "python" "python.exe") -else - PY_CANDIDATES=("python3" "python" "py" "python.exe") -fi - -for candidate in "${PY_CANDIDATES[@]}"; do - if [ "$candidate" = "py" ]; then - if command -v py >/dev/null 2>&1; then - if py -3 -c "import sys; sys.stdout.write('Python found!!\n')" 2>/dev/null; then - PY='py -3' - break - fi - fi - continue - fi - - candidate_path=$(command -v "$candidate" 2>/dev/null || true) - if [ -n "$candidate_path" ]; then - case "$candidate_path" in - *WindowsApps*|*windowsapps*) continue ;; - esac - - if "$candidate" -c "import sys; sys.stdout.write('ok')" 2>/dev/null; then - PY="$candidate" - break - fi - fi -done - -if [ -z "$PY" ]; then - echo "ERROR: No Python 3.10+ found. Install Python or ensure it's in PATH." >&2 - exit 1 -fi - -# Detect container runtime (Podman or Docker) -CONTAINER_CMD="" -COMPOSE_CMD="" -if command -v podman &> /dev/null; then - CONTAINER_CMD="podman" - if command -v podman-compose &> /dev/null; then - COMPOSE_CMD="podman-compose" - else - echo "ERROR: podman-compose not found. Install it:" >&2 - echo " pip install podman-compose" >&2 - exit 1 - fi -elif command -v docker &> /dev/null; then - CONTAINER_CMD="docker" - if command -v docker-compose &> /dev/null; then - COMPOSE_CMD="docker-compose" - elif docker compose version &> /dev/null; then - COMPOSE_CMD="docker compose" - else - echo "ERROR: docker-compose not found. Install it:" >&2 - echo " https://docs.docker.com/compose/install/" >&2 - exit 1 - fi -else - echo "ERROR: Neither Podman nor Docker found. 
Install one of them:" >&2 - echo " Podman: https://podman.io/getting-started/installation" >&2 - echo " Docker: https://docs.docker.com/get-docker/" >&2 - exit 1 -fi - -# Colors for output -RED='\033[0;31m' -GREEN='\033[0;32m' -YELLOW='\033[1;33m' -CYAN='\033[0;36m' -GRAY='\033[0;37m' -NC='\033[0m' - -echo -e "${CYAN}==================================================================" -echo -e " MentorMe Plagiarism Checker - Local Development Setup" -echo -e " Container Runtime: ${CONTAINER_CMD}" -echo -e " Compose File: ${COMPOSE_FILE}" -echo -e "==================================================================${NC}" -echo "" - -# Validate compose file exists -if [ ! -f "$COMPOSE_FILE" ]; then - echo -e "${RED}ERROR: $COMPOSE_FILE not found${NC}" - exit 1 -fi - -# Load configuration from .env if it exists -if [ -f ".env" ]; then - set -a - source <(grep -v '^#' .env | grep -v '^$' | sed 's/\r$//') - set +a -fi - -# Configuration (with defaults) -POSTGRES_CONTAINER="mentorme-plagiarism-postgres" -RABBITMQ_CONTAINER="mentorme-plagiarism-rabbitmq" -POSTGRES_PORT="${POSTGRES_PORT:-5432}" -RABBITMQ_PORT="${RABBITMQ_PORT:-5672}" -RABBITMQ_MGMT_PORT="${RABBITMQ_MANAGEMENT_PORT:-15672}" -POSTGRES_PASSWORD="${POSTGRES_PASSWORD:-postgres}" -POSTGRES_DB="${POSTGRES_DB:-plagiarism_db}" -POSTGRES_USER="${POSTGRES_USER:-postgres}" -RABBITMQ_USER="${RABBITMQ_USER:-admin}" -RABBITMQ_PASS="${RABBITMQ_PASS:-admin123}" -CLIP_MODEL_URL="${CLIP_MODEL_URL:-https://huggingface.co/laion/CLIP-ViT-L-14-laion2B-s32B-b82K/resolve/main/open_clip_pytorch_model.bin}" - -stop_existing_containers() { - local containers_exist=false - - # Check if compose stack is running - if $COMPOSE_CMD -f $COMPOSE_FILE ps 2>/dev/null | grep -q "Up\|running"; then - containers_exist=true - fi - - if [ "$containers_exist" = true ]; then - echo -e "${YELLOW}Existing containers found:${NC}" - $COMPOSE_CMD -f $COMPOSE_FILE ps - echo "" - echo -e "${YELLOW}This will stop and remove existing containers.${NC}" - read -p "Continue? (y/N): " -n 1 -r - echo - - if [[ ! $REPLY =~ ^[Yy]$ ]]; then - echo -e "${RED}Aborted by user${NC}" - # exit 0 - else - echo -e "${CYAN}Stopping existing containers...${NC}" - $COMPOSE_CMD -f $COMPOSE_FILE down - fi - fi - - echo -e "${GREEN}[OK] Ready to start containers${NC}" -} - -start_containers() { - echo -e "${CYAN}Starting containers with $COMPOSE_FILE...${NC}" - - # Determine which services to start - #local services="postgres rabbitmq pgadmin plagiarism-checker" - local services="postgres rabbitmq plagiarism-checker" - - if [ "$START_API" -eq 1 ]; then - services="$services api" - echo -e "${CYAN}Including API service${NC}" - - # Force rebuild API container to ensure it uses Dockerfile.api (not Dockerfile) - echo -e "${YELLOW}Rebuilding API container with Dockerfile.api...${NC}" - $COMPOSE_CMD -f $COMPOSE_FILE build --no-cache api - - if [ $? -ne 0 ]; then - echo -e "${RED}ERROR: Failed to build API container${NC}" - exit 1 - fi - echo -e "${GREEN}[OK] API container rebuilt${NC}" - fi - - $COMPOSE_CMD -f $COMPOSE_FILE up -d $services - - if [ $? 
-ne 0 ]; then - echo -e "${RED}ERROR: Failed to start containers${NC}" - exit 1 - fi - - echo -e "${GREEN}[OK] Containers started${NC}" -} - -wait_for_postgres() { - echo -e "${YELLOW}Waiting for PostgreSQL...${NC}" - - local max_attempts=30 - local attempt=0 - - while [ $attempt -lt $max_attempts ]; do - attempt=$((attempt + 1)) - - if $CONTAINER_CMD exec $POSTGRES_CONTAINER pg_isready -U $POSTGRES_USER &>/dev/null; then - echo -e "${GREEN}[OK] PostgreSQL ready${NC}" - return 0 - fi - - sleep 2 - done - - echo -e "${RED}ERROR: PostgreSQL timeout${NC}" - exit 1 -} - -wait_for_rabbitmq() { - echo -e "${YELLOW}Waiting for RabbitMQ...${NC}" - - local max_attempts=30 - local attempt=0 - - while [ $attempt -lt $max_attempts ]; do - attempt=$((attempt + 1)) - - if curl -s -f http://localhost:$RABBITMQ_MGMT_PORT &>/dev/null; then - echo -e "${GREEN}[OK] RabbitMQ ready${NC}" - return 0 - fi - - sleep 2 - done - - echo -e "${RED}ERROR: RabbitMQ timeout${NC}" - exit 1 -} - -wait_for_api() { - if [ "$START_API" -ne 1 ]; then - return 0 - fi - - echo -e "${YELLOW}Waiting for API service...${NC}" - - local max_attempts=30 - local attempt=0 - - while [ $attempt -lt $max_attempts ]; do - attempt=$((attempt + 1)) - - if curl -s -f http://localhost:8000/health &>/dev/null; then - echo -e "${GREEN}[OK] API service ready${NC}" - return 0 - fi - - sleep 2 - done - - echo -e "${RED}ERROR: API service timeout${NC}" - echo -e "${YELLOW}Checking API container logs:${NC}" - $COMPOSE_CMD -f $COMPOSE_FILE logs --tail=50 api - exit 1 -} - -initialize_database() { - if [ ! -f "database/init.sql" ]; then - echo -e "${YELLOW}WARNING: database/init.sql not found${NC}" - return - fi - - echo -e "${CYAN}Initializing database...${NC}" - - # Run init.sql - if $CONTAINER_CMD exec -i $POSTGRES_CONTAINER psql -U $POSTGRES_USER -d $POSTGRES_DB < database/init.sql 2>/dev/null; then - echo -e "${GREEN}[OK] Database schema initialized${NC}" - else - echo -e "${GRAY}Database schema already exists${NC}" - fi - - # Create migrations tracking table if it doesn't exist - $CONTAINER_CMD exec $POSTGRES_CONTAINER psql -U $POSTGRES_USER -d $POSTGRES_DB -c " - CREATE TABLE IF NOT EXISTS schema_migrations ( - id SERIAL PRIMARY KEY, - migration_name VARCHAR(255) UNIQUE NOT NULL, - applied_at TIMESTAMP DEFAULT NOW() - ); - " 2>/dev/null - - # Run migration scripts - if [ -d "database/migrations" ]; then - local migration_count=0 - for migration_file in database/migrations/*.sql; do - if [ -f "$migration_file" ]; then - local migration_name=$(basename "$migration_file") - - # Check if migration already applied - local already_applied=$($CONTAINER_CMD exec $POSTGRES_CONTAINER psql -U $POSTGRES_USER -d $POSTGRES_DB -t -c " - SELECT COUNT(*) FROM schema_migrations WHERE migration_name = '$migration_name'; - " 2>/dev/null | tr -d '[:space:]') - - if [ "$already_applied" = "0" ]; then - echo -e "${CYAN}Applying migration: $migration_name${NC}" - - if $CONTAINER_CMD exec -i $POSTGRES_CONTAINER psql -U $POSTGRES_USER -d $POSTGRES_DB < "$migration_file" 2>/dev/null; then - # Record migration as applied - $CONTAINER_CMD exec $POSTGRES_CONTAINER psql -U $POSTGRES_USER -d $POSTGRES_DB -c " - INSERT INTO schema_migrations (migration_name) VALUES ('$migration_name'); - " 2>/dev/null - echo -e "${GREEN}[OK] Applied: $migration_name${NC}" - migration_count=$((migration_count + 1)) - else - echo -e "${YELLOW}WARNING: Failed to apply $migration_name${NC}" - fi - fi - fi - done - - if [ $migration_count -eq 0 ]; then - echo -e "${GRAY}All migrations already 
applied${NC}" - else - echo -e "${GREEN}[OK] Applied $migration_count migration(s)${NC}" - fi - fi -} - -create_env_file() { - if [ -f ".env" ]; then - echo -e "${GREEN}[OK] .env exists${NC}" - return - fi - - if [ ! -f ".env.example" ]; then - echo -e "${RED}ERROR: .env.example not found${NC}" - exit 1 - fi - - echo -e "${CYAN}Creating .env from template...${NC}" - cp .env.example .env - - # Keep service names for docker-compose (containers communicate via service names) - # No transformation needed - .env.example already has correct service names - - echo -e "${GREEN}[OK] .env created${NC}" -} - -show_summary() { - echo -e "\n${CYAN}==================================================================" - echo -e " Environment Ready!" - echo -e "==================================================================${NC}" - echo "" - echo -e "${GREEN}✓ Services Running:${NC}" - echo -e " PostgreSQL: localhost:$POSTGRES_PORT (with pgvector)" - echo -e " RabbitMQ: localhost:$RABBITMQ_PORT" - echo -e " RabbitMQ UI: http://localhost:$RABBITMQ_MGMT_PORT ($RABBITMQ_USER/$RABBITMQ_PASS)" - - if [ "$START_API" -eq 1 ]; then - echo -e " API: http://localhost:8000" - echo -e " API Docs: http://localhost:8000/docs" - fi - - echo "" - echo -e "${CYAN}💡 Quick Start:${NC}" - echo -e " ${YELLOW}./start-dev-env.sh --full-setup${NC} ${GRAY}# Development mode (default)${NC}" - echo -e " ${YELLOW}./start-dev-env.sh --with-api${NC} ${GRAY}# Include API container${NC}" - echo -e " ${YELLOW}./start-dev-env.sh --prod${NC} ${GRAY}# Production mode${NC}" - echo "" - echo -e "${CYAN}📋 Manual Setup:${NC}" - echo -e " 1. ${YELLOW}${PY} -m venv venv${NC}" - - case "${OS_TYPE}" in - windows-msys) echo -e " ${YELLOW}source venv/Scripts/activate${NC}" ;; - *) echo -e " ${YELLOW}source venv/bin/activate${NC}" ;; - esac - - echo -e " 2. ${YELLOW}${PY} -m pip install -r requirements.txt${NC}" - echo -e " 3. ${YELLOW}${PY} app.py${NC}" - echo "" - echo -e "${CYAN}🔧 Container Commands:${NC}" - echo -e " ${GRAY}Logs: $COMPOSE_CMD -f $COMPOSE_FILE logs -f${NC}" - echo -e " ${GRAY}Stop: $COMPOSE_CMD -f $COMPOSE_FILE stop${NC}" - echo -e " ${GRAY}Remove: $COMPOSE_CMD -f $COMPOSE_FILE down${NC}" - echo "" - echo -e "${CYAN}==================================================================${NC}" -} - -main() { - create_env_file - stop_existing_containers - start_containers - wait_for_postgres - wait_for_rabbitmq - wait_for_api - initialize_database - show_summary - echo -e "\n${GRAY}Containers running in background. 
Press Ctrl+C to exit this script.${NC}" -} - -setup_python_environment() { - echo -e "\n${CYAN}==================================================================" - echo -e " Full Setup: Python Environment" - echo -e "==================================================================${NC}" - - if [ -d "venv" ]; then - echo -e "${GRAY}Virtual environment exists${NC}" - else - echo -e "${CYAN}Creating virtual environment...${NC}" - $PY -m venv venv - echo -e "${GREEN}[OK] venv created${NC}" - fi - - echo -e "${CYAN}Activating virtual environment...${NC}" - if [ "${OS_TYPE}" = "windows-msys" ]; then - source venv/Scripts/activate - else - source venv/bin/activate - fi - - echo -e "${CYAN}Installing dependencies (5-10 minutes)...${NC}" - python -m pip install --upgrade pip setuptools wheel - python -m pip install -r requirements.txt - echo -e "${GREEN}[OK] Dependencies installed${NC}" - - echo -e "${CYAN}Creating directories...${NC}" - mkdir -p data/reference_images data/models/clip logs - echo -e "${GREEN}[OK] Directories created${NC}" - - # Download CLIP model using curl - echo -e "${CYAN}Checking CLIP model...${NC}" - if [ ! -f "data/models/clip/open_clip_pytorch_model.bin" ]; then - echo -e "${YELLOW}Downloading CLIP model (this may take a while)...${NC}" - curl -L -o data/models/clip/open_clip_pytorch_model.bin \ - ${CLIP_MODEL_URL} || { - echo -e "${YELLOW}WARNING: CLIP model download failed, will download on first run${NC}" - } - - if [ -f "data/models/clip/open_clip_pytorch_model.bin" ]; then - echo -e "${GREEN}[OK] CLIP model downloaded${NC}" - fi - else - echo -e "${GRAY}CLIP model already exists${NC}" - fi - - echo -e "${CYAN}Verifying environment...${NC}" - python -c " -import open_clip, asyncpg, aio_pika, PIL, imagehash -print('✓ All imports successful') -" || { - echo -e "${RED}ERROR: Environment verification failed${NC}" - exit 1 - } - - echo -e "\n${CYAN}==================================================================" - echo -e " Setup Complete!" 
- echo -e "==================================================================${NC}" - echo -e "\n${GREEN}✓ Next Steps:${NC}" - echo -e "\n${CYAN}Terminal 1 - Worker:${NC}" - [ "${OS_TYPE}" = "windows-msys" ] && echo -e " ${YELLOW}source venv/Scripts/activate${NC}" || echo -e " ${YELLOW}source venv/bin/activate${NC}" - echo -e " ${YELLOW}python app.py${NC}" - echo -e "\n${CYAN}Terminal 2 - API:${NC}" - [ "${OS_TYPE}" = "windows-msys" ] && echo -e " ${YELLOW}source venv/Scripts/activate${NC}" || echo -e " ${YELLOW}source venv/bin/activate${NC}" - echo -e " ${YELLOW}cd api && uvicorn api:app --reload --host 0.0.0.0 --port 8000${NC}" - echo -e "\n${CYAN}API Docs:${NC} ${YELLOW}http://localhost:8000/docs${NC}" - echo "" -} - -main - -if [ "$FULL_SETUP" -eq 1 ]; then - setup_python_environment -fi +#!/usr/bin/env bash +# Local Development Startup Script +# Starts PostgreSQL and RabbitMQ containers using docker-compose + +set -e +FULL_SETUP=0 +START_API=0 +COMPOSE_FILE="docker-compose-dev.yml" + +while [[ "$#" -gt 0 ]]; do + case "$1" in + --full-setup) FULL_SETUP=1; shift ;; + --with-api) START_API=1; shift ;; + --prod) COMPOSE_FILE="docker-compose-prod.yml"; shift ;; + --dev) COMPOSE_FILE="docker-compose-dev.yml"; shift ;; + *) break ;; + esac +done + +# Detect platform +OS_TYPE="unknown" +UNAME_OUT=$(uname -s 2>/dev/null || true) +case "${UNAME_OUT}" in + MINGW*|MSYS*|CYGWIN*) OS_TYPE="windows-msys" ;; + Darwin) OS_TYPE="macos" ;; + Linux) OS_TYPE="linux" ;; + *) OS_TYPE="unix" ;; +esac + +# Detect Python executable +PY="" +if [ "${OS_TYPE}" = "windows-msys" ]; then + PY_CANDIDATES=("py" "python3" "python" "python.exe") +else + PY_CANDIDATES=("python3" "python" "py" "python.exe") +fi + +for candidate in "${PY_CANDIDATES[@]}"; do + if [ "$candidate" = "py" ]; then + if command -v py >/dev/null 2>&1; then + if py -3 -c "import sys; sys.stdout.write('Python found!!\n')" 2>/dev/null; then + PY='py -3' + break + fi + fi + continue + fi + + candidate_path=$(command -v "$candidate" 2>/dev/null || true) + if [ -n "$candidate_path" ]; then + case "$candidate_path" in + *WindowsApps*|*windowsapps*) continue ;; + esac + + if "$candidate" -c "import sys; sys.stdout.write('ok')" 2>/dev/null; then + PY="$candidate" + break + fi + fi +done + +if [ -z "$PY" ]; then + echo "ERROR: No Python 3.10+ found. Install Python or ensure it's in PATH." >&2 + exit 1 +fi + +# Detect container runtime (Podman or Docker) +CONTAINER_CMD="" +COMPOSE_CMD="" +if command -v podman &> /dev/null; then + CONTAINER_CMD="podman" + if command -v podman-compose &> /dev/null; then + COMPOSE_CMD="podman-compose" + else + echo "ERROR: podman-compose not found. Install it:" >&2 + echo " pip install podman-compose" >&2 + exit 1 + fi +elif command -v docker &> /dev/null; then + CONTAINER_CMD="docker" + if command -v docker-compose &> /dev/null; then + COMPOSE_CMD="docker-compose" + elif docker compose version &> /dev/null; then + COMPOSE_CMD="docker compose" + else + echo "ERROR: docker-compose not found. Install it:" >&2 + echo " https://docs.docker.com/compose/install/" >&2 + exit 1 + fi +else + echo "ERROR: Neither Podman nor Docker found. 
Install one of them:" >&2 + echo " Podman: https://podman.io/getting-started/installation" >&2 + echo " Docker: https://docs.docker.com/get-docker/" >&2 + exit 1 +fi + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +CYAN='\033[0;36m' +GRAY='\033[0;37m' +NC='\033[0m' + +echo -e "${CYAN}==================================================================" +echo -e " MentorMe Plagiarism Checker - Local Development Setup" +echo -e " Container Runtime: ${CONTAINER_CMD}" +echo -e " Compose File: ${COMPOSE_FILE}" +echo -e "==================================================================${NC}" +echo "" + +# Validate compose file exists +if [ ! -f "$COMPOSE_FILE" ]; then + echo -e "${RED}ERROR: $COMPOSE_FILE not found${NC}" + exit 1 +fi + +# Load configuration from .env if it exists +if [ -f ".env" ]; then + set -a + source <(grep -v '^#' .env | grep -v '^$' | sed 's/\r$//') + set +a +fi + +# Configuration (with defaults) +POSTGRES_CONTAINER="plg-postgres" +RABBITMQ_CONTAINER="plg-rabbitmq" +POSTGRES_PORT="${POSTGRES_PORT:-5432}" +RABBITMQ_PORT="${RABBITMQ_PORT:-5672}" +RABBITMQ_MGMT_PORT="${RABBITMQ_MANAGEMENT_PORT:-15672}" +POSTGRES_PASSWORD="${POSTGRES_PASSWORD:-postgres}" +POSTGRES_DB="${POSTGRES_DB:-plagiarism_db}" +POSTGRES_USER="${POSTGRES_USER:-postgres}" +RABBITMQ_USER="${RABBITMQ_USER:-admin}" +RABBITMQ_PASS="${RABBITMQ_PASS:-admin123}" +CLIP_MODEL_URL="${CLIP_MODEL_URL:-https://huggingface.co/laion/CLIP-ViT-L-14-laion2B-s32B-b82K/resolve/main/open_clip_pytorch_model.bin}" + +stop_existing_containers() { + local containers_exist=false + + # Check if compose stack is running + if $COMPOSE_CMD -f $COMPOSE_FILE ps 2>/dev/null | grep -q "Up\|running"; then + containers_exist=true + fi + + if [ "$containers_exist" = true ]; then + echo -e "${YELLOW}Existing containers found:${NC}" + $COMPOSE_CMD -f $COMPOSE_FILE ps + echo "" + echo -e "${YELLOW}This will stop and remove existing containers.${NC}" + read -p "Continue? (y/N): " -n 1 -r + echo + + if [[ ! $REPLY =~ ^[Yy]$ ]]; then + echo -e "${RED}Aborted by user${NC}" + # exit 0 + else + echo -e "${CYAN}Stopping existing containers...${NC}" + $COMPOSE_CMD -f $COMPOSE_FILE down + fi + fi + + echo -e "${GREEN}[OK] Ready to start containers${NC}" +} + +start_containers() { + echo -e "${CYAN}Starting containers with $COMPOSE_FILE...${NC}" + + # Determine which services to start + #local services="postgres rabbitmq pgadmin plagiarism-checker" + local services="postgres rabbitmq plagiarism-checker" + + if [ "$START_API" -eq 1 ]; then + services="$services api" + echo -e "${CYAN}Including API service${NC}" + + # Force rebuild API container to ensure it uses Dockerfile.api (not Dockerfile) + echo -e "${YELLOW}Rebuilding API container with Dockerfile.api...${NC}" + $COMPOSE_CMD -f $COMPOSE_FILE build --no-cache api + + if [ $? -ne 0 ]; then + echo -e "${RED}ERROR: Failed to build API container${NC}" + exit 1 + fi + echo -e "${GREEN}[OK] API container rebuilt${NC}" + fi + + $COMPOSE_CMD -f $COMPOSE_FILE up -d $services + + if [ $? 
-ne 0 ]; then + echo -e "${RED}ERROR: Failed to start containers${NC}" + exit 1 + fi + + echo -e "${GREEN}[OK] Containers started${NC}" +} + +wait_for_postgres() { + echo -e "${YELLOW}Waiting for PostgreSQL...${NC}" + + local max_attempts=30 + local attempt=0 + + while [ $attempt -lt $max_attempts ]; do + attempt=$((attempt + 1)) + + if $CONTAINER_CMD exec $POSTGRES_CONTAINER pg_isready -U $POSTGRES_USER &>/dev/null; then + echo -e "${GREEN}[OK] PostgreSQL ready${NC}" + return 0 + fi + + sleep 2 + done + + echo -e "${RED}ERROR: PostgreSQL timeout${NC}" + exit 1 +} + +wait_for_rabbitmq() { + echo -e "${YELLOW}Waiting for RabbitMQ...${NC}" + + local max_attempts=30 + local attempt=0 + + while [ $attempt -lt $max_attempts ]; do + attempt=$((attempt + 1)) + + if curl -s -f http://localhost:$RABBITMQ_MGMT_PORT &>/dev/null; then + echo -e "${GREEN}[OK] RabbitMQ ready${NC}" + return 0 + fi + + sleep 2 + done + + echo -e "${RED}ERROR: RabbitMQ timeout${NC}" + exit 1 +} + +wait_for_api() { + if [ "$START_API" -ne 1 ]; then + return 0 + fi + + echo -e "${YELLOW}Waiting for API service...${NC}" + + local max_attempts=30 + local attempt=0 + + while [ $attempt -lt $max_attempts ]; do + attempt=$((attempt + 1)) + + if curl -s -f http://localhost:8000/health &>/dev/null; then + echo -e "${GREEN}[OK] API service ready${NC}" + return 0 + fi + + sleep 2 + done + + echo -e "${RED}ERROR: API service timeout${NC}" + echo -e "${YELLOW}Checking API container logs:${NC}" + $COMPOSE_CMD -f $COMPOSE_FILE logs --tail=50 api + exit 1 +} + +initialize_database() { + if [ ! -f "database/init.sql" ]; then + echo -e "${YELLOW}WARNING: database/init.sql not found${NC}" + return + fi + + echo -e "${CYAN}Initializing database...${NC}" + + # Run init.sql + if $CONTAINER_CMD exec -i $POSTGRES_CONTAINER psql -U $POSTGRES_USER -d $POSTGRES_DB < database/init.sql 2>/dev/null; then + echo -e "${GREEN}[OK] Database schema initialized${NC}" + else + echo -e "${GRAY}Database schema already exists${NC}" + fi + + # Create migrations tracking table if it doesn't exist + $CONTAINER_CMD exec $POSTGRES_CONTAINER psql -U $POSTGRES_USER -d $POSTGRES_DB -c " + CREATE TABLE IF NOT EXISTS schema_migrations ( + id SERIAL PRIMARY KEY, + migration_name VARCHAR(255) UNIQUE NOT NULL, + applied_at TIMESTAMP DEFAULT NOW() + ); + " 2>/dev/null + + # Run migration scripts + if [ -d "database/migrations" ]; then + local migration_count=0 + for migration_file in database/migrations/*.sql; do + if [ -f "$migration_file" ]; then + local migration_name=$(basename "$migration_file") + + # Check if migration already applied + local already_applied=$($CONTAINER_CMD exec $POSTGRES_CONTAINER psql -U $POSTGRES_USER -d $POSTGRES_DB -t -c " + SELECT COUNT(*) FROM schema_migrations WHERE migration_name = '$migration_name'; + " 2>/dev/null | tr -d '[:space:]') + + if [ "$already_applied" = "0" ]; then + echo -e "${CYAN}Applying migration: $migration_name${NC}" + + if $CONTAINER_CMD exec -i $POSTGRES_CONTAINER psql -U $POSTGRES_USER -d $POSTGRES_DB < "$migration_file" 2>/dev/null; then + # Record migration as applied + $CONTAINER_CMD exec $POSTGRES_CONTAINER psql -U $POSTGRES_USER -d $POSTGRES_DB -c " + INSERT INTO schema_migrations (migration_name) VALUES ('$migration_name'); + " 2>/dev/null + echo -e "${GREEN}[OK] Applied: $migration_name${NC}" + migration_count=$((migration_count + 1)) + else + echo -e "${YELLOW}WARNING: Failed to apply $migration_name${NC}" + fi + fi + fi + done + + if [ $migration_count -eq 0 ]; then + echo -e "${GRAY}All migrations already 
applied${NC}" + else + echo -e "${GREEN}[OK] Applied $migration_count migration(s)${NC}" + fi + fi +} + +create_env_file() { + if [ -f ".env" ]; then + echo -e "${GREEN}[OK] .env exists${NC}" + return + fi + + if [ ! -f ".env.example" ]; then + echo -e "${RED}ERROR: .env.example not found${NC}" + exit 1 + fi + + echo -e "${CYAN}Creating .env from template...${NC}" + cp .env.example .env + + # Keep service names for docker-compose (containers communicate via service names) + # No transformation needed - .env.example already has correct service names + + echo -e "${GREEN}[OK] .env created${NC}" +} + +show_summary() { + echo -e "\n${CYAN}==================================================================" + echo -e " Environment Ready!" + echo -e "==================================================================${NC}" + echo "" + echo -e "${GREEN}✓ Services Running:${NC}" + echo -e " PostgreSQL: localhost:$POSTGRES_PORT (with pgvector)" + echo -e " RabbitMQ: localhost:$RABBITMQ_PORT" + echo -e " RabbitMQ UI: http://localhost:$RABBITMQ_MGMT_PORT ($RABBITMQ_USER/$RABBITMQ_PASS)" + + if [ "$START_API" -eq 1 ]; then + echo -e " API: http://localhost:8000" + echo -e " API Docs: http://localhost:8000/docs" + fi + + echo "" + echo -e "${CYAN}💡 Quick Start:${NC}" + echo -e " ${YELLOW}./start-dev-env.sh --full-setup${NC} ${GRAY}# Development mode (default)${NC}" + echo -e " ${YELLOW}./start-dev-env.sh --with-api${NC} ${GRAY}# Include API container${NC}" + echo -e " ${YELLOW}./start-dev-env.sh --prod${NC} ${GRAY}# Production mode${NC}" + echo "" + echo -e "${CYAN}📋 Manual Setup:${NC}" + echo -e " 1. ${YELLOW}${PY} -m venv venv${NC}" + + case "${OS_TYPE}" in + windows-msys) echo -e " ${YELLOW}source venv/Scripts/activate${NC}" ;; + *) echo -e " ${YELLOW}source venv/bin/activate${NC}" ;; + esac + + echo -e " 2. ${YELLOW}${PY} -m pip install -r requirements.txt${NC}" + echo -e " 3. ${YELLOW}${PY} app.py${NC}" + echo "" + echo -e "${CYAN}🔧 Container Commands:${NC}" + echo -e " ${GRAY}Logs: $COMPOSE_CMD -f $COMPOSE_FILE logs -f${NC}" + echo -e " ${GRAY}Stop: $COMPOSE_CMD -f $COMPOSE_FILE stop${NC}" + echo -e " ${GRAY}Remove: $COMPOSE_CMD -f $COMPOSE_FILE down${NC}" + echo "" + echo -e "${CYAN}==================================================================${NC}" +} + +main() { + create_env_file + stop_existing_containers + start_containers + wait_for_postgres + wait_for_rabbitmq + wait_for_api + initialize_database + show_summary + echo -e "\n${GRAY}Containers running in background. 
Press Ctrl+C to exit this script.${NC}" +} + +setup_python_environment() { + echo -e "\n${CYAN}==================================================================" + echo -e " Full Setup: Python Environment" + echo -e "==================================================================${NC}" + + if [ -d "venv" ]; then + echo -e "${GRAY}Virtual environment exists${NC}" + else + echo -e "${CYAN}Creating virtual environment...${NC}" + $PY -m venv venv + echo -e "${GREEN}[OK] venv created${NC}" + fi + + echo -e "${CYAN}Activating virtual environment...${NC}" + if [ "${OS_TYPE}" = "windows-msys" ]; then + source venv/Scripts/activate + else + source venv/bin/activate + fi + + echo -e "${CYAN}Installing dependencies (5-10 minutes)...${NC}" + python -m pip install --upgrade pip setuptools wheel + python -m pip install -r requirements.txt + echo -e "${GREEN}[OK] Dependencies installed${NC}" + + echo -e "${CYAN}Creating directories...${NC}" + mkdir -p data/reference_images data/models/clip logs + echo -e "${GREEN}[OK] Directories created${NC}" + + # Download CLIP model using curl + echo -e "${CYAN}Checking CLIP model...${NC}" + if [ ! -f "data/models/clip/open_clip_pytorch_model.bin" ]; then + echo -e "${YELLOW}Downloading CLIP model (this may take a while)...${NC}" + curl -L -o data/models/clip/open_clip_pytorch_model.bin \ + ${CLIP_MODEL_URL} || { + echo -e "${YELLOW}WARNING: CLIP model download failed, will download on first run${NC}" + } + + if [ -f "data/models/clip/open_clip_pytorch_model.bin" ]; then + echo -e "${GREEN}[OK] CLIP model downloaded${NC}" + fi + else + echo -e "${GRAY}CLIP model already exists${NC}" + fi + + echo -e "${CYAN}Verifying environment...${NC}" + python -c " +import open_clip, asyncpg, aio_pika, PIL, imagehash +print('✓ All imports successful') +" || { + echo -e "${RED}ERROR: Environment verification failed${NC}" + exit 1 + } + + echo -e "\n${CYAN}==================================================================" + echo -e " Setup Complete!" 
+ echo -e "==================================================================${NC}" + echo -e "\n${GREEN}✓ Next Steps:${NC}" + echo -e "\n${CYAN}Terminal 1 - Worker:${NC}" + [ "${OS_TYPE}" = "windows-msys" ] && echo -e " ${YELLOW}source venv/Scripts/activate${NC}" || echo -e " ${YELLOW}source venv/bin/activate${NC}" + echo -e " ${YELLOW}python app.py${NC}" + echo -e "\n${CYAN}Terminal 2 - API:${NC}" + [ "${OS_TYPE}" = "windows-msys" ] && echo -e " ${YELLOW}source venv/Scripts/activate${NC}" || echo -e " ${YELLOW}source venv/bin/activate${NC}" + echo -e " ${YELLOW}cd api && uvicorn api:app --reload --host 0.0.0.0 --port 8000${NC}" + echo -e "\n${CYAN}API Docs:${NC} ${YELLOW}http://localhost:8000/docs${NC}" + echo "" +} + +main + +if [ "$FULL_SETUP" -eq 1 ]; then + setup_python_environment +fi diff --git a/start-prod-env.sh b/start-prod-env.sh new file mode 100755 index 0000000..b9b8e0c --- /dev/null +++ b/start-prod-env.sh @@ -0,0 +1,444 @@ +#!/usr/bin/env bash +# Local Development Startup Script +# Starts PostgreSQL and RabbitMQ containers using docker-compose + +set -e +FULL_SETUP=0 +START_API=1 +COMPOSE_FILE="docker-compose-prod.yml" + +while [[ "$#" -gt 0 ]]; do + case "$1" in + --full-setup) FULpodmanL_SETUP=1; shift ;; + --with-api) START_API=1; shift ;; + --prod) COMPOSE_FILE="docker-compose-prod.yml"; shift ;; + --dev) COMPOSE_FILE="docker-compose-dev.yml"; shift ;; + *) break ;; + esac +done + +# Detect platform +OS_TYPE="unknown" +UNAME_OUT=$(uname -s 2>/dev/null || true) +case "${UNAME_OUT}" in + MINGW*|MSYS*|CYGWIN*) OS_TYPE="windows-msys" ;; + Darwin) OS_TYPE="macos" ;; + Linux) OS_TYPE="linux" ;; + *) OS_TYPE="unix" ;; +esac + +# Detect Python executable +PY="" +if [ "${OS_TYPE}" = "windows-msys" ]; then + PY_CANDIDATES=("py" "python3" "python" "python.exe") +else + PY_CANDIDATES=("python3" "python" "py" "python.exe") +fi + +for candidate in "${PY_CANDIDATES[@]}"; do + if [ "$candidate" = "py" ]; then + if command -v py >/dev/null 2>&1; then + if py -3 -c "import sys; sys.stdout.write('Python found!!\n')" 2>/dev/null; then + PY='py -3' + break + fi + fi + continue + fi + + candidate_path=$(command -v "$candidate" 2>/dev/null || true) + if [ -n "$candidate_path" ]; then + case "$candidate_path" in + *WindowsApps*|*windowsapps*) continue ;; + esac + + if "$candidate" -c "import sys; sys.stdout.write('ok')" 2>/dev/null; then + PY="$candidate" + break + fi + fi +done + +if [ -z "$PY" ]; then + echo "ERROR: No Python 3.10+ found. Install Python or ensure it's in PATH." >&2 + exit 1 +fi + +# Detect container runtime (Podman or Docker) +CONTAINER_CMD="" +COMPOSE_CMD="" +if command -v podman &> /dev/null; then + CONTAINER_CMD="podman" + if command -v podman-compose &> /dev/null; then + COMPOSE_CMD="podman-compose" + else + echo "ERROR: podman-compose not found. Install it:" >&2 + echo " pip install podman-compose" >&2 + exit 1 + fi +elif command -v docker &> /dev/null; then + CONTAINER_CMD="docker" + if command -v docker-compose &> /dev/null; then + COMPOSE_CMD="docker-compose" + elif docker compose version &> /dev/null; then + COMPOSE_CMD="docker compose" + else + echo "ERROR: docker-compose not found. Install it:" >&2 + echo " https://docs.docker.com/compose/install/" >&2 + exit 1 + fi +else + echo "ERROR: Neither Podman nor Docker found. 
Install one of them:" >&2 + echo " Podman: https://podman.io/getting-started/installation" >&2 + echo " Docker: https://docs.docker.com/get-docker/" >&2 + exit 1 +fi + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +CYAN='\033[0;36m' +GRAY='\033[0;37m' +NC='\033[0m' + +echo -e "${CYAN}==================================================================" +echo -e " Plagiarism Checker - Production Setup" +echo -e " Container Runtime: ${CONTAINER_CMD}" +echo -e " Compose File: ${COMPOSE_FILE}" +echo -e "==================================================================${NC}" +echo "" + +# Validate compose file exists +if [ ! -f "$COMPOSE_FILE" ]; then + echo -e "${RED}ERROR: $COMPOSE_FILE not found${NC}" + exit 1 +fi + +# Load configuration from .env if it exists +if [ -f ".env" ]; then + set -a + source <(grep -v '^#' .env | grep -v '^$' | sed 's/\r$//') + set +a +fi + +# Configuration (with defaults) +POSTGRES_CONTAINER="plg-postgres" +POSTGRES_PORT="${POSTGRES_PORT:-5432}" +POSTGRES_PASSWORD="${POSTGRES_PASSWORD:-postgres}" +POSTGRES_DB="${POSTGRES_DB:-plagiarism_db}" +POSTGRES_USER="${POSTGRES_USER:-postgres}" +CLIP_MODEL_URL="${CLIP_MODEL_URL:-https://huggingface.co/laion/CLIP-ViT-L-14-laion2B-s32B-b82K/resolve/main/open_clip_pytorch_model.bin}" + +stop_existing_containers() { + local containers_exist=false + + # Check if compose stack is running + if $COMPOSE_CMD -f $COMPOSE_FILE ps 2>/dev/null | grep -q "Up\|running"; then + containers_exist=true + fi + + if [ "$containers_exist" = true ]; then + echo -e "${YELLOW}Existing containers found:${NC}" + $COMPOSE_CMD -f $COMPOSE_FILE ps + echo "" + echo -e "${YELLOW}This will stop and remove existing containers.${NC}" + read -p "Continue? (y/N): " -n 1 -r + echo + + if [[ ! $REPLY =~ ^[Yy]$ ]]; then + echo -e "${RED}Aborted by user${NC}" + exit 0 + else + echo -e "${CYAN}Stopping existing containers...${NC}" + $COMPOSE_CMD -f $COMPOSE_FILE down + fi + fi + + echo -e "${GREEN}[OK] Ready to start containers${NC}" +} + +start_containers() { + echo -e "${CYAN}Starting containers with $COMPOSE_FILE...${NC}" + + # Determine which services to start + #local services="postgres rabbitmq pgadmin plagiarism-checker" + local services="postgres plagiarism-checker" + + if [ "$START_API" -eq 1 ]; then + services="$services api" + echo -e "${CYAN}Including API service${NC}" + + # Force rebuild API container to ensure it uses Dockerfile.api (not Dockerfile) + echo -e "${YELLOW}Rebuilding API container with Dockerfile.api...${NC}" + $COMPOSE_CMD -f $COMPOSE_FILE build --no-cache api + + if [ $? -ne 0 ]; then + echo -e "${RED}ERROR: Failed to build API container${NC}" + exit 1 + fi + echo -e "${GREEN}[OK] API container rebuilt${NC}" + fi + + $COMPOSE_CMD -f $COMPOSE_FILE up -d $services + + if [ $? 
-ne 0 ]; then + echo -e "${RED}ERROR: Failed to start containers${NC}" + exit 1 + fi + + echo -e "${GREEN}[OK] Containers started${NC}" +} + +wait_for_postgres() { + echo -e "${YELLOW}Waiting for PostgreSQL...${NC}" + + local max_attempts=30 + local attempt=0 + + while [ $attempt -lt $max_attempts ]; do + attempt=$((attempt + 1)) + + if $CONTAINER_CMD exec $POSTGRES_CONTAINER pg_isready -U $POSTGRES_USER &>/dev/null; then + echo -e "${GREEN}[OK] PostgreSQL ready${NC}" + return 0 + fi + + sleep 2 + done + + echo -e "${RED}ERROR: PostgreSQL timeout${NC}" + exit 1 +} + + +wait_for_api() { + if [ "$START_API" -ne 1 ]; then + return 0 + fi + + echo -e "${YELLOW}Waiting for API service...${NC}" + + local max_attempts=30 + local attempt=0 + + while [ $attempt -lt $max_attempts ]; do + attempt=$((attempt + 1)) + + if curl -s -f http://localhost:8000/health &>/dev/null; then + echo -e "${GREEN}[OK] API service ready${NC}" + return 0 + fi + + sleep 2 + done + + echo -e "${RED}ERROR: API service timeout${NC}" + echo -e "${YELLOW}Checking API container logs:${NC}" + $COMPOSE_CMD -f $COMPOSE_FILE logs --tail=50 api + exit 1 +} + +initialize_database() { + if [ ! -f "database/init.sql" ]; then + echo -e "${YELLOW}WARNING: database/init.sql not found${NC}" + return + fi + + echo -e "${CYAN}Initializing database...${NC}" + + # Run init.sql + if $CONTAINER_CMD exec -i $POSTGRES_CONTAINER psql -U $POSTGRES_USER -d $POSTGRES_DB < database/init.sql 2>/dev/null; then + echo -e "${GREEN}[OK] Database schema initialized${NC}" + else + echo -e "${GRAY}Database schema already exists${NC}" + fi + + # Create migrations tracking table if it doesn't exist + $CONTAINER_CMD exec $POSTGRES_CONTAINER psql -U $POSTGRES_USER -d $POSTGRES_DB -c " + CREATE TABLE IF NOT EXISTS schema_migrations ( + id SERIAL PRIMARY KEY, + migration_name VARCHAR(255) UNIQUE NOT NULL, + applied_at TIMESTAMP DEFAULT NOW() + ); + " 2>/dev/null + + # Run migration scripts + if [ -d "database/migrations" ]; then + local migration_count=0 + for migration_file in database/migrations/*.sql; do + if [ -f "$migration_file" ]; then + local migration_name=$(basename "$migration_file") + + # Check if migration already applied + local already_applied=$($CONTAINER_CMD exec $POSTGRES_CONTAINER psql -U $POSTGRES_USER -d $POSTGRES_DB -t -c " + SELECT COUNT(*) FROM schema_migrations WHERE migration_name = '$migration_name'; + " 2>/dev/null | tr -d '[:space:]') + + if [ "$already_applied" = "0" ]; then + echo -e "${CYAN}Applying migration: $migration_name${NC}" + + if $CONTAINER_CMD exec -i $POSTGRES_CONTAINER psql -U $POSTGRES_USER -d $POSTGRES_DB < "$migration_file" 2>/dev/null; then + # Record migration as applied + $CONTAINER_CMD exec $POSTGRES_CONTAINER psql -U $POSTGRES_USER -d $POSTGRES_DB -c " + INSERT INTO schema_migrations (migration_name) VALUES ('$migration_name'); + " 2>/dev/null + echo -e "${GREEN}[OK] Applied: $migration_name${NC}" + migration_count=$((migration_count + 1)) + else + echo -e "${YELLOW}WARNING: Failed to apply $migration_name${NC}" + fi + fi + fi + done + + if [ $migration_count -eq 0 ]; then + echo -e "${GRAY}All migrations already applied${NC}" + else + echo -e "${GREEN}[OK] Applied $migration_count migration(s)${NC}" + fi + fi +} + +create_env_file() { + if [ -f ".env" ]; then + echo -e "${GREEN}[OK] .env exists${NC}" + return + fi + + if [ ! 
-f ".env.example" ]; then + echo -e "${RED}ERROR: .env.example not found${NC}" + exit 1 + fi + + echo -e "${CYAN}Creating .env from template...${NC}" + cp .env.example .env + + # Keep service names for docker-compose (containers communicate via service names) + # No transformation needed - .env.example already has correct service names + + echo -e "${GREEN}[OK] .env created${NC}" +} + +show_summary() { + echo -e "\n${CYAN}==================================================================" + echo -e " Environment Ready!" + echo -e "==================================================================${NC}" + echo "" + echo -e "${GREEN}✓ Services Running:${NC}" + echo -e " PostgreSQL: localhost:$POSTGRES_PORT (with pgvector)" + + if [ "$START_API" -eq 1 ]; then + echo -e " API: http://localhost:8000" + echo -e " API Docs: http://localhost:8000/docs" + fi + + echo "" + echo -e "${CYAN}💡 Quick Start:${NC}" + echo -e " ${YELLOW}./start-dev-env.sh --full-setup${NC} ${GRAY}# Development mode (default)${NC}" + echo -e " ${YELLOW}./start-dev-env.sh --with-api${NC} ${GRAY}# Include API container${NC}" + echo -e " ${YELLOW}./start-dev-env.sh --prod${NC} ${GRAY}# Production mode${NC}" + echo "" + echo -e "${CYAN}📋 Manual Setup:${NC}" + echo -e " 1. ${YELLOW}${PY} -m venv venv${NC}" + + case "${OS_TYPE}" in + windows-msys) echo -e " ${YELLOW}source venv/Scripts/activate${NC}" ;; + *) echo -e " ${YELLOW}source venv/bin/activate${NC}" ;; + esac + + echo -e " 2. ${YELLOW}${PY} -m pip install -r requirements.txt${NC}" + echo -e " 3. ${YELLOW}${PY} app.py${NC}" + echo "" + echo -e "${CYAN}🔧 Container Commands:${NC}" + echo -e " ${GRAY}Logs: $COMPOSE_CMD -f $COMPOSE_FILE logs -f${NC}" + echo -e " ${GRAY}Stop: $COMPOSE_CMD -f $COMPOSE_FILE stop${NC}" + echo -e " ${GRAY}Remove: $COMPOSE_CMD -f $COMPOSE_FILE down${NC}" + echo "" + echo -e "${CYAN}==================================================================${NC}" +} + +main() { + create_env_file + stop_existing_containers + start_containers + wait_for_postgres + wait_for_api + initialize_database + show_summary + echo -e "\n${GRAY}Containers running in background. Press Ctrl+C to exit this script.${NC}" +} + +setup_python_environment() { + echo -e "\n${CYAN}==================================================================" + echo -e " Full Setup: Python Environment" + echo -e "==================================================================${NC}" + + if [ -d "venv" ]; then + echo -e "${GRAY}Virtual environment exists${NC}" + else + echo -e "${CYAN}Creating virtual environment...${NC}" + $PY -m venv venv + echo -e "${GREEN}[OK] venv created${NC}" + fi + + echo -e "${CYAN}Activating virtual environment...${NC}" + if [ "${OS_TYPE}" = "windows-msys" ]; then + source venv/Scripts/activate + else + source venv/bin/activate + fi + + echo -e "${CYAN}Installing dependencies (5-10 minutes)...${NC}" + python -m pip install --upgrade pip setuptools wheel + python -m pip install -r requirements.txt + echo -e "${GREEN}[OK] Dependencies installed${NC}" + + echo -e "${CYAN}Creating directories...${NC}" + mkdir -p data/reference_images data/models/clip logs + echo -e "${GREEN}[OK] Directories created${NC}" + + # Download CLIP model using curl + echo -e "${CYAN}Checking CLIP model...${NC}" + if [ ! 
-f "data/models/clip/open_clip_pytorch_model.bin" ]; then + echo -e "${YELLOW}Downloading CLIP model (this may take a while)...${NC}" + curl -L -o data/models/clip/open_clip_pytorch_model.bin \ + ${CLIP_MODEL_URL} || { + echo -e "${YELLOW}WARNING: CLIP model download failed, will download on first run${NC}" + } + + if [ -f "data/models/clip/open_clip_pytorch_model.bin" ]; then + echo -e "${GREEN}[OK] CLIP model downloaded${NC}" + fi + else + echo -e "${GRAY}CLIP model already exists${NC}" + fi + + echo -e "${CYAN}Verifying environment...${NC}" + python -c " +import open_clip, asyncpg, aio_pika, PIL, imagehash +print('✓ All imports successful') +" || { + echo -e "${RED}ERROR: Environment verification failed${NC}" + exit 1 + } + + echo -e "\n${CYAN}==================================================================" + echo -e " Setup Complete!" + echo -e "==================================================================${NC}" + echo -e "\n${GREEN}✓ Next Steps:${NC}" + echo -e "\n${CYAN}Terminal 1 - Worker:${NC}" + [ "${OS_TYPE}" = "windows-msys" ] && echo -e " ${YELLOW}source venv/Scripts/activate${NC}" || echo -e " ${YELLOW}source venv/bin/activate${NC}" + echo -e " ${YELLOW}python app.py${NC}" + echo -e "\n${CYAN}Terminal 2 - API:${NC}" + [ "${OS_TYPE}" = "windows-msys" ] && echo -e " ${YELLOW}source venv/Scripts/activate${NC}" || echo -e " ${YELLOW}source venv/bin/activate${NC}" + echo -e " ${YELLOW}cd api && uvicorn api:app --reload --host 0.0.0.0 --port 8000${NC}" + echo -e "\n${CYAN}API Docs:${NC} ${YELLOW}http://localhost:8000/docs${NC}" + echo "" +} + +main + +if [ "$FULL_SETUP" -eq 1 ]; then + setup_python_environment +fi diff --git a/tests/conftest.py b/tests/conftest.py index d41114e..26dfec9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -112,7 +112,7 @@ def sample_submission_data(sample_hashes, sample_clip_embedding): "submission_id": "SUB-001", "student_id": "ST001", "assign_id": "A001", - "img_url": "https://example.com/image.jpg", + "submission_url": "https://example.com/image.jpg", "db_record_id": "123e4567-e89b-12d3-a456-426614174000", "hashes": sample_hashes, "clip_embedding": sample_clip_embedding, @@ -194,7 +194,7 @@ def mock_fetchrow_side_effect(*args, **kwargs): "submission_id": "SUB-001", "student_id": "ST001", "assign_id": "A001", - "img_url": "https://example.com/image.jpg", + "submission_url": "https://example.com/image.jpg", "is_plagiarized": False, "plag_source": None, "plag_confidence": None, diff --git a/tests/test_db_manager.py b/tests/test_db_manager.py index cedcc89..1f56453 100644 --- a/tests/test_db_manager.py +++ b/tests/test_db_manager.py @@ -178,7 +178,9 @@ async def test_insert_submission_success(self): submission_id="SUB-001", student_id="ST001", assign_id="A001", - img_url="http://example.com/image.jpg", + submission_url="http://example.com/image.jpg", + submission_type="image", + submission_text=None, hashes={"phash": "abc", "dhash": "def", "ahash": "ghi"}, clip_embedding=[0.1] * 512, ) diff --git a/tests/test_processors.py b/tests/test_processors.py index 49c9b00..4ba6271 100644 --- a/tests/test_processors.py +++ b/tests/test_processors.py @@ -1,5 +1,5 @@ """ -Unit tests for image_processor and text_processor. +Unit tests for ImageProcessor. 
""" import pytest @@ -7,7 +7,6 @@ import json from processors.image_processor import ImageProcessor -from processors.text_processor import TextProcessor class TestImageProcessor: @@ -60,15 +59,15 @@ async def test_process_no_image_url(self, image_processor): result = await image_processor.process(data) - assert result == {"error": "No image URL provided"} + assert result == {"error": "No submission_url provided"} @pytest.mark.asyncio - async def test_process_with_img_url(self, image_processor, mock_image_worker): - """Test processing submission with img_url.""" + async def test_process_with_submission_url(self, image_processor, mock_image_worker): + """Test processing submission with submission_url.""" data = { "submission_id": "SUB-001", "student_id": "ST001", - "img_url": "https://example.com/image.jpg", + "submission_url": "https://example.com/image.jpg", } expected_result = { @@ -89,7 +88,7 @@ async def test_process_with_image_url(self, image_processor, mock_image_worker): data = { "submission_id": "SUB-002", "student_id": "ST002", - "image_url": "https://example.com/photo.png", + "submission_url": "https://example.com/photo.png", } expected_result = { @@ -111,7 +110,7 @@ async def test_process_worker_returns_none( """Test when worker returns None.""" data = { "submission_id": "SUB-003", - "img_url": "https://example.com/image.jpg", + "submission_url": "https://example.com/image.jpg", } mock_image_worker.process_submission.return_value = None @@ -127,7 +126,7 @@ async def test_process_worker_returns_json_string( """Test when worker returns JSON string.""" data = { "submission_id": "SUB-004", - "img_url": "https://example.com/image.jpg", + "submission_url": "https://example.com/image.jpg", } expected_result = { @@ -148,7 +147,7 @@ async def test_process_worker_returns_invalid_json( """Test when worker returns invalid JSON string.""" data = { "submission_id": "SUB-005", - "img_url": "https://example.com/image.jpg", + "submission_url": "https://example.com/image.jpg", } mock_image_worker.process_submission.return_value = "not valid json {{" @@ -165,7 +164,7 @@ async def test_process_worker_raises_exception( """Test when worker raises exception.""" data = { "submission_id": "SUB-006", - "img_url": "https://example.com/image.jpg", + "submission_url": "https://example.com/image.jpg", } mock_image_worker.process_submission.side_effect = RuntimeError( @@ -186,7 +185,7 @@ async def test_process_multiple_submissions( submissions = [ { "submission_id": f"SUB-{i}", - "img_url": f"https://example.com/image{i}.jpg", + "submission_url": f"https://example.com/image{i}.jpg", } for i in range(3) ] @@ -205,68 +204,3 @@ async def test_process_multiple_submissions( assert results[1]["similarity_score"] == 0.6 assert results[2]["similarity_score"] == 0.7 - -class TestTextProcessor: - """Test cases for TextProcessor.""" - - @pytest.fixture - def text_processor(self): - """Create TextProcessor instance.""" - return TextProcessor() - - @pytest.mark.asyncio - async def test_process_text_submission(self, text_processor): - """Test processing text submission.""" - data = { - "submission_id": "TEXT-001", - "student_id": "ST001", - "payload": "This is some text to check for plagiarism.", - } - - result = await text_processor.process(data) - - assert "similarity_score" in result - assert "matched_sources" in result - assert isinstance(result["similarity_score"], (int, float)) - assert isinstance(result["matched_sources"], list) - - @pytest.mark.asyncio - async def test_process_empty_text(self, text_processor): - 
"""Test processing empty text.""" - data = {"submission_id": "TEXT-002", "payload": ""} - - result = await text_processor.process(data) - - assert "similarity_score" in result - assert "matched_sources" in result - - @pytest.mark.asyncio - async def test_process_missing_payload(self, text_processor): - """Test processing submission without payload field.""" - data = {"submission_id": "TEXT-003"} - - result = await text_processor.process(data) - - # Should handle missing payload gracefully - assert "similarity_score" in result - assert "matched_sources" in result - - @pytest.mark.asyncio - async def test_process_returns_expected_format(self, text_processor): - """Test that result has expected format.""" - data = {"payload": "Sample text"} - - result = await text_processor.process(data) - - # Verify structure - assert isinstance(result, dict) - assert "similarity_score" in result - assert "matched_sources" in result - - # Verify types - assert isinstance(result["similarity_score"], (int, float)) - assert isinstance(result["matched_sources"], list) - - # Verify reasonable values (based on dummy implementation) - assert 0 <= result["similarity_score"] <= 100 - assert all(isinstance(source, str) for source in result["matched_sources"]) diff --git a/tests/test_rmq_client.py b/tests/test_rmq_client.py index b97f8e5..e7b515e 100644 --- a/tests/test_rmq_client.py +++ b/tests/test_rmq_client.py @@ -22,6 +22,7 @@ def rmq_client(self): { "RABBITMQ_HOST": "test-host", "RABBITMQ_PORT": "5672", + "RABBITMQ_VHOST": "/", "RABBITMQ_USER": "testuser", "RABBITMQ_PASS": "testpass", "SUBMISSION_QUEUE": "test_submissions", @@ -35,7 +36,19 @@ def rmq_client(self): def test_init_default_values(self): """Test initialization with default values from config.""" - client = RabbitMQClient() + with patch.dict( + "os.environ", + { + "RABBITMQ_HOST": "localhost", + "RABBITMQ_PORT": "5672", + "RABBITMQ_VHOST": "/", + "SUBMISSION_QUEUE": "plagiarism_submissions", + "FEEDBACK_QUEUE": "plagiarism_feedback", + "RABBITMQ_PREFETCH_COUNT": "5", + "MAX_RETRIES": "3", + }, + ): + client = RabbitMQClient() # Values come from config, not hardcoded defaults assert client.RABBITMQ_HOST in ["localhost", "rabbitmq"] @@ -80,13 +93,16 @@ async def test_connect_success(self, rmq_client): # Verify connection was established mock_aio_pika.connect_robust.assert_called_once_with( - rmq_client.RABBITMQ_URL + rmq_client.RABBITMQ_URL, heartbeat=600, timeout=30 ) mock_connection.channel.assert_called_once() mock_channel.set_qos.assert_called_once_with(prefetch_count=10) # Verify queues were declared (DLQ + submission + feedback) assert mock_channel.declare_queue.call_count == 3 + for call in mock_channel.declare_queue.call_args_list: + assert call.kwargs["durable"] is True + assert "passive" not in call.kwargs @pytest.mark.asyncio async def test_connect_retry_on_failure(self, rmq_client): @@ -154,6 +170,7 @@ async def test_publish_message_success(self, rmq_client): call_args = mock_aio_pika.Message.call_args assert call_args[1]["body"] == json.dumps(message_body).encode() + assert call_args[1]["delivery_mode"] == mock_aio_pika.DeliveryMode.PERSISTENT publish_call_args = mock_exchange.publish.call_args assert publish_call_args[1]["routing_key"] == "test_feedback" @@ -248,6 +265,7 @@ async def test_start_consumer(self, rmq_client): # Verify consumer was started mock_queue.consume.assert_called_once() + assert mock_queue.consume.call_args[1]["no_ack"] is False @pytest.mark.asyncio async def test_close(self, rmq_client): diff --git 
a/tests/test_submission_checker.py b/tests/test_submission_checker.py index 062400a..6176cd0 100644 --- a/tests/test_submission_checker.py +++ b/tests/test_submission_checker.py @@ -55,13 +55,13 @@ async def test_reject_message(self, mock_message): mock_message.reject.assert_called_once_with(requeue=False) @pytest.mark.asyncio - async def test_auto_reject_on_exit(self, mock_message): - """Test that message is auto-rejected if not explicitly acked.""" + async def test_auto_requeue_on_exit(self, mock_message): + """Test that message is auto-requeued if not explicitly acked.""" async with MessageAckManager(mock_message): pass # Don't ack - # Should auto-reject with requeue=False - mock_message.reject.assert_called_once_with(requeue=False) + # Should auto-nack with requeue=True + mock_message.nack.assert_called_once_with(requeue=True) @pytest.mark.asyncio async def test_no_double_ack(self, mock_message): @@ -163,7 +163,7 @@ async def test_initialize(self, submission_checker, mock_db_manager): async def test_get_processor_for_image(self, submission_checker): """Test getting processor for image submission.""" submission_checker.image_processor = MagicMock() - data = {"img_url": "https://example.com/image.jpg"} + data = {"submission_type": "image", "submission_url": "https://example.com/image.jpg"} processor = submission_checker.get_processor(data) @@ -172,16 +172,15 @@ async def test_get_processor_for_image(self, submission_checker): @pytest.mark.asyncio async def test_get_processor_for_text(self, submission_checker): """Test getting processor for text submission.""" - data = {"text": "Some text content"} + data = {"submission_type": "text", "submission_text": "Some text content"} - processor = submission_checker.get_processor(data) - - assert processor is submission_checker.text_processor + with pytest.raises(ValueError, match="Unsupported submission_type"): + submission_checker.get_processor(data) @pytest.mark.asyncio async def test_get_processor_invalid_format(self, submission_checker): """Test getting processor with invalid format raises error.""" - data = {"submission_id": "SUB-001"} # No img_url or text + data = {"submission_id": "SUB-001"} # No submission_type or submission_url with pytest.raises(ValueError, match="Invalid submission format"): submission_checker.get_processor(data) @@ -190,7 +189,7 @@ async def test_get_processor_invalid_format(self, submission_checker): async def test_get_processor_not_initialized(self, submission_checker): """Test getting processor when not initialized raises error.""" submission_checker.image_processor = None - data = {"img_url": "https://example.com/image.jpg"} + data = {"submission_type": "image", "submission_url": "https://example.com/image.jpg"} with pytest.raises(RuntimeError, match="not initialized"): submission_checker.get_processor(data) @@ -216,7 +215,8 @@ async def test_process_submission_success( "submission_id": "SUB-001", "student_id": "ST001", "assign_id": "A001", - "img_url": "https://example.com/image.jpg", + "submission_type": "image", + "submission_url": "https://example.com/image.jpg", } ).encode() mock_message.ack = AsyncMock() @@ -233,23 +233,57 @@ async def test_process_submission_success( submission_checker.client.publish_message.assert_called_once() @pytest.mark.asyncio - async def test_process_submission_no_image_url( + async def test_process_submission_non_image_skips_plagiarism( self, submission_checker, mock_db_manager ): - """Test processing submission without image URL.""" + """Test non-image submissions skip plagiarism 
and are published as original.""" + submission_checker.image_processor = MagicMock() + mock_message = MagicMock() mock_message.body = json.dumps( - {"submission_id": "SUB-002", "student_id": "ST002"} + { + "submission_id": "SUB-006", + "student_id": "ST006", + "assign_id": "A006", + "submission_type": "text", + "submission_text": "Hello WHO ARE YOU?", + } ).encode() + mock_message.ack = AsyncMock() + mock_message.redelivered = False + mock_message.headers = {"x-delivery-count": 0} + + await submission_checker.process_submission(mock_message) + + submission_checker.image_processor.process.assert_not_called() + mock_db_manager.update_result.assert_called_once() + submission_checker.client.publish_message.assert_called_once() + mock_message.ack.assert_called_once() + + @pytest.mark.asyncio + async def test_process_submission_text_submission_without_url( + self, submission_checker, mock_db_manager + ): + """Test processing a text submission without submission_url.""" + mock_message = MagicMock() + mock_message.body = json.dumps( + { + "submission_id": "SUB-002", + "student_id": "ST002", + "assign_id": "A002", + "submission_type": "text", + "submission_text": "Hello", + } + ).encode() + mock_message.ack = AsyncMock() mock_message.nack = AsyncMock() mock_message.redelivered = False mock_message.headers = {} await submission_checker.process_submission(mock_message) - mock_db_manager.update_status.assert_called() - - mock_message.nack.assert_called_once_with(requeue=False) + mock_db_manager.update_result.assert_called_once() + mock_message.ack.assert_called_once() @pytest.mark.asyncio async def test_process_submission_invalid_json(self, submission_checker): @@ -257,11 +291,13 @@ async def test_process_submission_invalid_json(self, submission_checker): mock_message = MagicMock() mock_message.body = b"invalid json {{" mock_message.nack = AsyncMock() + mock_message.redelivered = False + mock_message.headers = {} await submission_checker.process_submission(mock_message) # Should nack without requeue - mock_message.nack.assert_called_once_with(requeue=False) + mock_message.nack.assert_called_once_with(requeue=False, submission_id="unknown") @pytest.mark.asyncio async def test_process_submission_retry_logic( @@ -280,16 +316,18 @@ async def test_process_submission_retry_logic( mock_message.body = json.dumps( { "submission_id": "SUB-003", - "img_url": "https://example.com/image.jpg", + "submission_type": "image", + "submission_url": "https://example.com/image.jpg", } ).encode() mock_message.nack = AsyncMock() mock_message.redelivered = False + mock_message.headers = {} await submission_checker.process_submission(mock_message) # Should nack with requeue (retry) - mock_message.nack.assert_called_once_with(requeue=True) + mock_message.nack.assert_called_once_with(requeue=True, submission_id="SUB-003") # Should update status with retry count mock_db_manager.update_status.assert_called() @@ -311,7 +349,8 @@ async def test_process_submission_max_retries( mock_message.body = json.dumps( { "submission_id": "SUB-004", - "img_url": "https://example.com/image.jpg", + "submission_type": "image", + "submission_url": "https://example.com/image.jpg", } ).encode() mock_message.nack = AsyncMock() @@ -322,7 +361,7 @@ async def test_process_submission_max_retries( submission_checker.client.publish_to_dlq.assert_called_once() - mock_message.nack.assert_called_once_with(requeue=False) + mock_message.nack.assert_called_once_with(requeue=False, submission_id="SUB-004") @pytest.mark.asyncio async def 
test_process_submission_poison_message( @@ -333,7 +372,7 @@ async def test_process_submission_poison_message( mock_message = MagicMock() mock_message.body = json.dumps( - {"submission_id": "SUB-005", "img_url": "http://example.com"} + {"submission_id": "SUB-005", "submission_type": "image", "submission_url": "http://example.com"} ).encode() mock_message.headers = {"x-delivery-count": 5} mock_message.redelivered = True @@ -343,7 +382,7 @@ async def test_process_submission_poison_message( # Should detect poison message and publish to DLQ submission_checker.client.publish_to_dlq.assert_called_once() - mock_message.nack.assert_called_once_with(requeue=False) + mock_message.nack.assert_called_once_with(requeue=False, submission_id="SUB-005") @pytest.mark.asyncio async def test_close_owns_resources( @@ -388,5 +427,5 @@ async def test_shutdown_flag(self, submission_checker): await submission_checker.process_submission(mock_message) - # Should reject message immediately - mock_message.nack.assert_called_once_with(requeue=False) + # Should requeue message immediately + mock_message.nack.assert_called_once_with(requeue=True) diff --git a/tests/test_worker.py b/tests/test_worker.py index 431c105..37de665 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -45,7 +45,7 @@ def sample_submission_data(): "submission_id": "SUB-TEST-001", "student_id": "ST-TEST-001", "assign_id": "A-TEST-001", - "img_url": "https://example.com/test.png", + "submission_url": "https://example.com/test.png", "db_record_id": 1, } @@ -187,6 +187,39 @@ async def test_download_image_invalid_url(self, mocked_worker): with pytest.raises(Exception): # Should raise InvalidImageURLError or similar await worker.download_image("not-a-url") + @pytest.mark.asyncio + async def test_download_image_storage_google_public_url(self, mocked_worker): + """Test public storage.googleapis.com image download via HTTPS fallback.""" + worker = mocked_worker + worker.gcp_credentials = None + + pil_image = await worker.download_image( + "https://storage.googleapis.com/assignment_submission/submissions/SUB-IMSUB.png" + ) + + assert pil_image is not None + assert isinstance(pil_image, Image.Image) + + @pytest.mark.asyncio + async def test_download_image_storage_google_authenticated_url(self, mocked_worker): + """Test authenticated storage.googleapis.com URL downloads via GCS helper when credentials are available.""" + worker = mocked_worker + worker.gcp_credentials = MagicMock() + + sample_image = Image.open(BytesIO(worker._test_mocks["session"].get().read.return_value)) + + with patch( + "image_worker.worker.download_from_gcs", + AsyncMock(return_value=sample_image), + ) as mock_download: + pil_image = await worker.download_image( + "https://storage.googleapis.com/assignment_submission/submissions/SUB-IMSUB.png" + ) + + assert pil_image is not None + assert isinstance(pil_image, Image.Image) + mock_download.assert_awaited_once() + class TestWorkerSubmissionProcessing: """Test submission processing workflow.""" @@ -222,7 +255,7 @@ async def test_process_submission_with_peer_check(self, mocked_worker, mock_db): "submission_id": "SUB-002", "student_id": "ST002", "assign_id": "A001", - "img_url": "https://example.com/test.png", + "submission_url": "https://example.com/test.png", "db_record_id": 2, } @@ -251,7 +284,7 @@ async def test_process_submission_with_self_check(self, mocked_worker, mock_db): "submission_id": "SUB-003", "student_id": "ST003", "assign_id": "A001", - "img_url": "https://example.com/test.png", + "submission_url": 
"https://example.com/test.png", "db_record_id": 3, } @@ -274,7 +307,7 @@ async def test_hash_computation(self, mocked_worker): "submission_id": "SUB-004", "student_id": "ST004", "assign_id": "A001", - "img_url": "https://example.com/test.png", + "submission_url": "https://example.com/test.png", "db_record_id": 4, } @@ -298,7 +331,7 @@ async def test_clip_embedding_generation(self, mocked_worker): "submission_id": "SUB-005", "student_id": "ST005", "assign_id": "A001", - "img_url": "https://example.com/test.png", + "submission_url": "https://example.com/test.png", "db_record_id": 5, } @@ -321,7 +354,7 @@ async def test_ai_detection_called(self, mocked_worker): "submission_id": "SUB-006", "student_id": "ST006", "assign_id": "A001", - "img_url": "https://example.com/test.png", + "submission_url": "https://example.com/test.png", "db_record_id": 6, } @@ -344,7 +377,7 @@ async def test_image_validation_called(self, mocked_worker): "submission_id": "SUB-007", "student_id": "ST007", "assign_id": "A001", - "img_url": "https://example.com/test.png", + "submission_url": "https://example.com/test.png", "db_record_id": 7, } diff --git a/utils/exceptions.py b/utils/exceptions.py index df0a713..a0b4406 100644 --- a/utils/exceptions.py +++ b/utils/exceptions.py @@ -48,6 +48,12 @@ class InvalidImageFormatError(ImageProcessingError): pass +class GCSDownloadError(ImageDownloadError): + """Failed to download image from GCS bucket (includes auth, bucket, and download errors).""" + + pass + + class ImageTooLargeError(ImageProcessingError): """Image exceeds maximum allowed size."""