Skip to content

Commit 55ab2af

Browse files
authored
Merge pull request #9 from tanzilahmed0/task-b12-celery-file-processing
Task B12: Implement Celery file processing for CSV uploads
2 parents 1d72909 + ed142f8 commit 55ab2af

4 files changed

Lines changed: 285 additions & 21 deletions

File tree

backend/api/projects.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
)
2020
from services.project_service import get_project_service
2121
from services.storage_service import storage_service
22+
from tasks.file_processing import process_csv_file
2223

2324
router = APIRouter(prefix="/projects", tags=["projects"])
2425
project_service = get_project_service()
@@ -277,6 +278,55 @@ async def get_upload_url(
277278
)
278279

279280

281+
@router.post("/{project_id}/process")
282+
async def trigger_file_processing(
283+
project_id: str, user_id: str = Depends(verify_token)
284+
) -> ApiResponse[Dict[str, str]]:
285+
"""Trigger CSV file processing for a project"""
286+
287+
try:
288+
user_uuid = uuid.UUID(user_id)
289+
project_uuid = uuid.UUID(project_id)
290+
291+
# Check if project exists and user owns it
292+
if not project_service.check_project_ownership(project_uuid, user_uuid):
293+
raise HTTPException(status_code=404, detail="Project not found")
294+
295+
# Get project to check current status
296+
project_db = project_service.get_project_by_id(project_uuid)
297+
if project_db.status == "ready":
298+
raise HTTPException(status_code=400, detail="Project already processed")
299+
300+
# Check if file exists in storage
301+
object_name = f"{user_id}/{project_id}/data.csv"
302+
if not storage_service.file_exists(object_name):
303+
raise HTTPException(
304+
status_code=400, detail="No file uploaded for processing"
305+
)
306+
307+
# Trigger Celery task
308+
task = process_csv_file.delay(project_id, user_id)
309+
310+
return ApiResponse(
311+
success=True,
312+
data={
313+
"message": "File processing started",
314+
"task_id": task.id,
315+
"project_id": project_id,
316+
},
317+
)
318+
319+
except ValueError as e:
320+
raise HTTPException(status_code=400, detail=f"Invalid project ID: {str(e)}")
321+
except HTTPException:
322+
# Re-raise HTTPExceptions without wrapping them
323+
raise
324+
except Exception as e:
325+
raise HTTPException(
326+
status_code=500, detail=f"Failed to start file processing: {str(e)}"
327+
)
328+
329+
280330
@router.get("/{project_id}/status")
281331
async def get_project_status(
282332
project_id: str, user_id: str = Depends(verify_token)

backend/services/project_service.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,20 +133,19 @@ def update_project_status(
133133
def update_project_metadata(
134134
self,
135135
project_id: uuid.UUID,
136-
csv_filename: str,
137136
row_count: int,
138137
column_count: int,
139138
columns_metadata: list,
139+
status: ProjectStatusEnum = ProjectStatusEnum.READY,
140140
) -> ProjectInDB:
141141
"""Update project metadata after file processing"""
142142
return self.update_project(
143143
project_id,
144144
ProjectUpdate(
145-
csv_filename=csv_filename,
146145
row_count=row_count,
147146
column_count=column_count,
148147
columns_metadata=columns_metadata,
149-
status=ProjectStatusEnum.READY,
148+
status=status,
150149
),
151150
)
152151

backend/tasks/file_processing.py

Lines changed: 97 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,63 +1,142 @@
11
import logging
22
import os
3+
import uuid
4+
from io import StringIO
5+
from typing import Any, Dict, List, Optional
36

7+
import pandas as pd
48
from celery import current_task
59

610
from celery_app import celery_app
11+
from services.database_service import get_db_service
12+
from services.project_service import get_project_service
13+
from services.storage_service import storage_service
714

815
logger = logging.getLogger(__name__)
916

1017

1118
@celery_app.task(bind=True)
12-
def process_csv_file(self, project_id: str, file_path: str):
19+
def process_csv_file(self, project_id: str, user_id: str):
1320
"""
14-
Process uploaded CSV file - placeholder implementation for Task B2
15-
Will be fully implemented in Task B12
21+
Process uploaded CSV file for project analysis
1622
"""
1723
try:
24+
project_uuid = uuid.UUID(project_id)
25+
user_uuid = uuid.UUID(user_id)
26+
1827
# Update task state
1928
self.update_state(
2029
state="PROGRESS",
2130
meta={"current": 10, "total": 100, "status": "Starting CSV analysis..."},
2231
)
2332

24-
logger.info(f"Processing CSV file for project {project_id}: {file_path}")
33+
logger.info(f"Processing CSV file for project {project_id}")
2534

26-
# Simulate processing steps
27-
import time
35+
# Get project service
36+
project_service = get_project_service()
37+
38+
# Update project status to processing
39+
project_service.update_project_status(project_uuid, "processing")
40+
41+
# Download file from MinIO
42+
self.update_state(
43+
state="PROGRESS",
44+
meta={"current": 20, "total": 100, "status": "Downloading file..."},
45+
)
2846

29-
time.sleep(2)
47+
object_name = f"{user_id}/{project_id}/data.csv"
48+
file_content = storage_service.download_file(object_name)
3049

50+
if not file_content:
51+
raise Exception("Failed to download file from storage")
52+
53+
# Parse CSV with pandas
3154
self.update_state(
3255
state="PROGRESS",
33-
meta={"current": 50, "total": 100, "status": "Analyzing schema..."},
56+
meta={"current": 40, "total": 100, "status": "Parsing CSV..."},
3457
)
3558

36-
time.sleep(2)
59+
try:
60+
df = pd.read_csv(StringIO(file_content.decode("utf-8")))
61+
except Exception as e:
62+
raise Exception(f"Failed to parse CSV: {str(e)}")
3763

64+
# Analyze schema
3865
self.update_state(
3966
state="PROGRESS",
40-
meta={"current": 90, "total": 100, "status": "Finalizing..."},
67+
meta={"current": 60, "total": 100, "status": "Analyzing schema..."},
68+
)
69+
70+
columns_metadata = []
71+
for column in df.columns:
72+
col_type = str(df[column].dtype)
73+
74+
# Determine data type category
75+
if "int" in col_type or "float" in col_type:
76+
data_type = "number"
77+
elif "datetime" in col_type:
78+
data_type = "datetime"
79+
elif "bool" in col_type:
80+
data_type = "boolean"
81+
else:
82+
data_type = "string"
83+
84+
# Check for null values
85+
nullable = df[column].isnull().any()
86+
87+
# Get sample values (first 5 non-null values)
88+
sample_values = df[column].dropna().head(5).tolist()
89+
90+
columns_metadata.append(
91+
{
92+
"name": column,
93+
"type": data_type,
94+
"nullable": nullable,
95+
"sample_values": sample_values,
96+
}
97+
)
98+
99+
# Update project with analysis results
100+
self.update_state(
101+
state="PROGRESS",
102+
meta={"current": 80, "total": 100, "status": "Updating project..."},
103+
)
104+
105+
project_service.update_project_metadata(
106+
project_uuid,
107+
row_count=len(df),
108+
column_count=len(df.columns),
109+
columns_metadata=columns_metadata,
110+
status="ready",
111+
)
112+
113+
# Final update
114+
self.update_state(
115+
state="PROGRESS",
116+
meta={"current": 100, "total": 100, "status": "Processing complete"},
41117
)
42118

43-
# Mock result
44119
result = {
45120
"project_id": project_id,
46121
"status": "completed",
47-
"row_count": 1000,
48-
"column_count": 10,
49-
"columns_metadata": [
50-
{"name": "id", "type": "number", "nullable": False},
51-
{"name": "name", "type": "string", "nullable": False},
52-
{"name": "email", "type": "string", "nullable": True},
53-
],
122+
"row_count": len(df),
123+
"column_count": len(df.columns),
124+
"columns_metadata": columns_metadata,
54125
}
55126

56127
logger.info(f"Successfully processed CSV for project {project_id}")
57128
return result
58129

59130
except Exception as exc:
60131
logger.error(f"Error processing CSV for project {project_id}: {str(exc)}")
132+
133+
# Update project status to error
134+
try:
135+
project_service = get_project_service()
136+
project_service.update_project_status(project_uuid, "error")
137+
except:
138+
pass
139+
61140
self.update_state(
62141
state="FAILURE", meta={"error": str(exc), "project_id": project_id}
63142
)
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
from io import StringIO
2+
from unittest.mock import Mock, patch
3+
4+
import pandas as pd
5+
import pytest
6+
7+
from tasks.file_processing import process_csv_file
8+
9+
10+
class TestFileProcessing:
11+
"""Test Celery file processing tasks"""
12+
13+
@patch("tasks.file_processing.storage_service")
14+
@patch("tasks.file_processing.get_project_service")
15+
@patch.object(process_csv_file, "update_state", autospec=True)
16+
def test_process_csv_file_success(
17+
self, mock_update_state, mock_project_service, mock_storage_service
18+
):
19+
"""Test successful CSV file processing"""
20+
# Mock project service
21+
mock_service = Mock()
22+
mock_project_service.return_value = mock_service
23+
24+
# Mock storage service
25+
csv_content = """id,name,email,age
26+
1,John Doe,john@example.com,30
27+
2,Jane Smith,jane@example.com,25
28+
3,Bob Johnson,bob@example.com,35"""
29+
30+
mock_storage_service.download_file.return_value = csv_content.encode("utf-8")
31+
32+
# Call the task function directly (not through Celery wrapper)
33+
result = process_csv_file.run(
34+
"00000000-0000-0000-0000-000000000001",
35+
"00000000-0000-0000-0000-000000000002",
36+
)
37+
38+
# Verify storage service was called
39+
mock_storage_service.download_file.assert_called_once_with(
40+
"00000000-0000-0000-0000-000000000002/00000000-0000-0000-0000-000000000001/data.csv"
41+
)
42+
43+
# Verify project service was called
44+
mock_service.update_project_status.assert_called()
45+
mock_service.update_project_metadata.assert_called()
46+
47+
# Verify result
48+
assert result["project_id"] == "00000000-0000-0000-0000-000000000001"
49+
assert result["status"] == "completed"
50+
assert result["row_count"] == 3
51+
assert result["column_count"] == 4
52+
53+
# Verify column metadata
54+
columns_metadata = result["columns_metadata"]
55+
assert len(columns_metadata) == 4
56+
57+
# Check that columns are detected
58+
column_names = [col["name"] for col in columns_metadata]
59+
assert "id" in column_names
60+
assert "name" in column_names
61+
assert "email" in column_names
62+
assert "age" in column_names
63+
64+
@patch("tasks.file_processing.storage_service")
65+
@patch("tasks.file_processing.get_project_service")
66+
@patch.object(process_csv_file, "update_state", autospec=True)
67+
def test_process_csv_file_download_failure(
68+
self, mock_update_state, mock_project_service, mock_storage_service
69+
):
70+
"""Test CSV processing when file download fails"""
71+
# Mock project service
72+
mock_service = Mock()
73+
mock_project_service.return_value = mock_service
74+
75+
# Mock storage service to return None (download failure)
76+
mock_storage_service.download_file.return_value = None
77+
78+
# Call the task and expect exception
79+
with pytest.raises(Exception, match="Failed to download file from storage"):
80+
process_csv_file.run(
81+
"00000000-0000-0000-0000-000000000001",
82+
"00000000-0000-0000-0000-000000000002",
83+
)
84+
85+
@patch("tasks.file_processing.storage_service")
86+
@patch("tasks.file_processing.get_project_service")
87+
@patch.object(process_csv_file, "update_state", autospec=True)
88+
@patch("tasks.file_processing.pd.read_csv")
89+
def test_process_csv_file_parse_failure(
90+
self,
91+
mock_pandas_read_csv,
92+
mock_update_state,
93+
mock_project_service,
94+
mock_storage_service,
95+
):
96+
"""Test CSV processing when file parsing fails"""
97+
# Mock project service
98+
mock_service = Mock()
99+
mock_project_service.return_value = mock_service
100+
101+
# Mock storage service to return valid content
102+
mock_storage_service.download_file.return_value = b"valid,csv,content"
103+
104+
# Mock pandas to raise an exception
105+
mock_pandas_read_csv.side_effect = Exception("CSV parsing failed")
106+
107+
# Call the task and expect exception
108+
with pytest.raises(Exception, match="Failed to parse CSV"):
109+
process_csv_file.run(
110+
"00000000-0000-0000-0000-000000000001",
111+
"00000000-0000-0000-0000-000000000002",
112+
)
113+
114+
@patch("tasks.file_processing.storage_service")
115+
@patch("tasks.file_processing.get_project_service")
116+
@patch.object(process_csv_file, "update_state", autospec=True)
117+
def test_process_csv_file_error_handling(
118+
self, mock_update_state, mock_project_service, mock_storage_service
119+
):
120+
"""Test error handling in CSV processing"""
121+
# Mock project service to raise exception
122+
mock_service = Mock()
123+
mock_service.update_project_status.side_effect = Exception("Database error")
124+
mock_project_service.return_value = mock_service
125+
126+
# Mock storage service
127+
csv_content = """id,name
128+
1,John Doe"""
129+
mock_storage_service.download_file.return_value = csv_content.encode("utf-8")
130+
131+
# Call the task and expect exception
132+
with pytest.raises(Exception, match="Database error"):
133+
process_csv_file.run(
134+
"00000000-0000-0000-0000-000000000001",
135+
"00000000-0000-0000-0000-000000000002",
136+
)

0 commit comments

Comments
 (0)