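"""Background job worker.

Polls the job queue and currently handles one job type,
'validate_sequences': it runs sequence checks for each isolate in a
submission, records the results in the database, mirrors the updated
rows into Elasticsearch, and sets the submission's final status.
"""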
import time
import signal
import sys
import json
import asyncio
from jobs import get_next_job, mark_job_done, mark_job_failed
from database import get_db_cursor
from helpers import check_for_sequence_data, bulk_send_to_elastic
from logging import getLogger
logger = getLogger(__name__)
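
# Contracts assumed for the imported helpers, inferred from how they are
# called below (see jobs.py, database.py and helpers.py for the actual code):
#   get_next_job() -> job dict with 'id', 'job_type' and 'payload', or None
#   mark_job_failed(job_id, error_msg) -> dict with 'permanently_failed',
#       'payload' and 'error_msg'
#   check_for_sequence_data(isolate, split_on_fasta_headers=...) -> (success, result)
#   bulk_send_to_elastic(isolates) -> bool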
print("Starting job worker...")
# Handle shutdown
def shutdown(signum, frame):
print("Worker shutting down...")
sys.exit(0)
signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)
async def process_sequence_validation(job):
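    """Run the sequence checks for one 'validate_sequences' job.

    Each isolate is either kept 'validated' (with its object_id stored) or
    moved to 'error'; updated rows are mirrored to Elasticsearch, and the
    submission's final status is set all-or-nothing.
    """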
    payload = json.loads(job['payload']) if isinstance(job['payload'], str) else job['payload']
    job_data = payload['data']
    submission_id = job_data['submission_id']
    isolate_ids = job_data['isolate_ids']
    split_on_fasta_headers = job_data.get('split_on_fasta_headers', True)
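
    # Example payload shape, inferred from the fields read above (illustrative
    # values only):
    # {"data": {"submission_id": "<uuid>", "isolate_ids": ["<uuid>", ...],
    #           "split_on_fasta_headers": true}}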
print(f"Processing sequence validation for submission {submission_id} with {len(isolate_ids)} isolates")
logger.info(f"Processing sequence validation for submission {submission_id} with {len(isolate_ids)} isolates")
# Get isolates to process
with get_db_cursor() as cursor:
# Get isolates that are still validated (haven't been touched since job was queued)
cursor.execute("""
SELECT * FROM isolates
WHERE id = ANY(%s::uuid[]) AND status = 'validated'
ORDER BY id
FOR UPDATE SKIP LOCKED
""", (isolate_ids,))
validated_isolates = cursor.fetchall()
print(f"Found {len(validated_isolates)} isolates still validated and ready for sequence checking")
logger.info(f"Found {len(validated_isolates)} isolates still validated and ready for sequence checking")
    # Process each isolate with progress logging
    total_isolates = len(validated_isolates)
    isolates_to_update = []
    for i, isolate in enumerate(validated_isolates, 1):
        print(f"Processing isolate {i}/{total_isolates}: {isolate['id']}")
        logger.info(f"Processing isolate {i}/{total_isolates}: {isolate['id']}")
        try:
            # Add a timeout wrapper for the async operation
            success, result = await asyncio.wait_for(
                check_for_sequence_data(isolate, split_on_fasta_headers=split_on_fasta_headers),
                timeout=120  # 2-minute timeout per isolate
            )
        except asyncio.TimeoutError:
            print(f"Timeout processing isolate {isolate['id']} - marking as error")
            logger.error(f"Timeout processing isolate {isolate['id']} - marking as error")
            success = False
            result = "Processing timeout - sequence check took too long"
        except Exception as e:
            print(f"Exception processing isolate {isolate['id']}: {str(e)}")
            logger.error(f"Exception processing isolate {isolate['id']}: {str(e)}")
            success = False
            result = f"Processing error: {str(e)}"
        with get_db_cursor() as cursor:
            if success:
                # Success - store the returned object_id, keep status 'validated'
                cursor.execute("""
                    UPDATE isolates
                    SET object_id = %s, status = 'validated', updated_at = NOW()
                    WHERE id = %s AND status = 'validated'
                """, (result, isolate['id']))
                print(f"Sequence saved for isolate {isolate['id']}: {result}")
                logger.info(f"Sequence saved for isolate {isolate['id']}: {result}")
            else:
                # Error - record the seq_error and set status to 'error'
                seq_error_data = {
                    "row": isolate["tsv_row"],
                    "seq_error": result
                }
                cursor.execute("""
                    UPDATE isolates
                    SET seq_error = %s, status = 'error', updated_at = NOW()
                    WHERE id = %s AND status = 'validated'
                """, (json.dumps(seq_error_data), isolate['id']))
                print(f"Sequence error for isolate {isolate['id']}: {result}")
                logger.error(f"Sequence error for isolate {isolate['id']}: {result}")

            # Get the updated isolate data for the bulk ES update
            cursor.execute("""
                SELECT i.*, s.project_id, p.pathogen_id, p.privacy as visibility
                FROM isolates i
                LEFT JOIN submissions s ON i.submission_id = s.id
                LEFT JOIN projects p ON s.project_id = p.id
                WHERE i.id = %s
            """, (isolate['id'],))
            updated_isolate = cursor.fetchone()
            if updated_isolate:
                isolates_to_update.append(updated_isolate)

    # Bulk update all processed isolates in Elasticsearch
    if isolates_to_update:
        print(f"Bulk updating {len(isolates_to_update)} isolates in Elasticsearch...")
        logger.info(f"Bulk updating {len(isolates_to_update)} isolates in Elasticsearch...")
        elastic_success = bulk_send_to_elastic(isolates_to_update)
        if elastic_success:
            print("Bulk update successful.")
            logger.info("Bulk update successful.")
        else:
            print("Bulk update failed.")
            logger.error("Bulk update failed.")

    # After processing all isolates, check the final status and update the submission
    with get_db_cursor() as cursor:
        cursor.execute("""
            SELECT COUNT(*) as total,
                   COUNT(*) FILTER (WHERE status = 'error') as validation_errors,
                   COUNT(*) FILTER (WHERE status = 'validated') as validated_isolates
            FROM isolates
            WHERE submission_id = %s
        """, (submission_id,))
        counts = cursor.fetchone()

        # Only set to 'validated' if ALL isolates are validated (all-or-nothing approach)
        if counts['validated_isolates'] == counts['total'] and counts['total'] > 0:
            final_status = 'validated'
        else:
            final_status = 'error'

        cursor.execute("""
            UPDATE submissions
            SET status = %s, updated_at = NOW()
            WHERE id = %s
        """, (final_status, submission_id))

    print(f"Submission {submission_id} final status: {final_status}")
    logger.info(f"Submission {submission_id} final status: {final_status}")
    print(f"Total isolates: {counts['total']}, Validation errors: {counts['validation_errors']}, Validated: {counts['validated_isolates']}")
    logger.info(f"Total isolates: {counts['total']}, Validation errors: {counts['validation_errors']}, Validated: {counts['validated_isolates']}")

async def run_async_job(job):
"""Handle asynchronous jobs"""
if job['job_type'] == 'validate_sequences':
await process_sequence_validation(job)
else:
logger.exception(f"Unknown async job type: {job['job_type']}")
return f"Unknown async job type: {job['job_type']}"
# Main worker loop
while True:
    try:
        # Get the next queued job (None when the queue is empty)
        job = get_next_job()
        if job:
            print(f"Got job {job['id']} ({job['job_type']}): {job['payload']}")
            try:
                # Handle different job types with an overall timeout
                if job['job_type'] == 'validate_sequences':
                    # Async job - needs an event loop
                    loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(loop)
                    try:
                        # Add an overall job timeout (10 minutes)
                        task = loop.create_task(run_async_job(job))
                        loop.run_until_complete(asyncio.wait_for(task, timeout=600))
                    except asyncio.TimeoutError:
                        print(f"Job {job['id']} timed out after 10 minutes")
                        logger.exception(f"Job {job['id']} timed out after 10 minutes")
                        raise Exception("Job timed out after 10 minutes")
                    finally:
                        loop.close()
                # Mark as done
                mark_job_done(job['id'])
                print(f"Job {job['id']} completed successfully!")
            except Exception as e:
                print(f"Job {job['id']} failed: {e}")
                logger.exception(f"Job {job['id']} failed: {e}")
                failure_info = mark_job_failed(job['id'], str(e))
                # If the job failed permanently, set its submission status to 'error'
                if failure_info['permanently_failed']:
                    try:
                        # Extract the submission_id from the job payload
                        payload = json.loads(failure_info['payload']) if isinstance(failure_info['payload'], str) else failure_info['payload']
                        job_data = payload.get('data', {})
                        submission_id = job_data.get('submission_id')
                        if submission_id:
                            print(f"Job permanently failed. Updating submission {submission_id} to 'error' status")
                            with get_db_cursor() as cursor:
                                cursor.execute("""
                                    UPDATE submissions
                                    SET status = 'error',
                                        error = %s,
                                        updated_at = NOW()
                                    WHERE id = %s AND status = 'validating'
                                """, (json.dumps({"job_error": failure_info['error_msg']}), submission_id))
                            print(f"Submission {submission_id} marked as 'error' due to permanent job failure")
                            logger.error(f"Submission {submission_id} marked as 'error' due to permanent job failure")
                        else:
                            print(f"Warning: Could not extract submission_id from permanently failed job {job['id']}")
                            logger.warning(f"Could not extract submission_id from permanently failed job {job['id']}")
                    except Exception as update_error:
                        print(f"Error updating submission status after job failure: {update_error}")
                        logger.error(f"Error updating submission status after job failure: {update_error}")
        else:
            print("Waiting for jobs...")
            time.sleep(1)
    except KeyboardInterrupt:
        print("Worker interrupted")
        logger.info("Worker interrupted")
        break
    except Exception as e:
        print(f"Worker error: {e}")
        logger.error(f"Worker error: {e}")
        time.sleep(5)
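
# The polling loop above runs at module import, so this file is presumably
# executed directly as a script (e.g. `python worker.py`).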