-
Notifications
You must be signed in to change notification settings - Fork 28
Expand file tree
/
Copy pathrun_arq_worker.py
More file actions
168 lines (134 loc) · 5.41 KB
/
run_arq_worker.py
File metadata and controls
168 lines (134 loc) · 5.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#!/usr/bin/env python3
"""
ARQ Worker Script
This script runs ARQ workers to process firmware dump jobs.
Uses ARQ's built-in worker management for improved performance and reliability.
Usage:
python run_arq_worker.py [worker_name]
Examples:
python run_arq_worker.py
python run_arq_worker.py worker_01
"""
import asyncio
import sys
import signal
from typing import Optional
import arq
from rich.console import Console
from telegram import Bot
from dumpyarabot.arq_config import (
WorkerSettings,
after_job_end,
on_job_start,
on_startup,
shutdown_arq,
)
from dumpyarabot.config import settings as _settings
from dumpyarabot.message_queue import message_queue
# Shared Rich console; every status message in this script is printed through it.
console = Console()
class ARQWorkerManager:
    """Manages ARQ worker lifecycle with graceful shutdown.

    The manager builds an ``arq.Worker`` from ``WorkerSettings``, creates the
    Telegram ``Bot`` and hands it to ``message_queue``, then runs the worker
    until it finishes or a SIGINT/SIGTERM sets ``shutdown_event``.
    """

    def __init__(self, worker_name: Optional[str] = None):
        """Create an idle manager; nothing is started until ``start_worker``.

        Args:
            worker_name: Label used in console output. Falls back to
                ``"arq_worker"`` when omitted or empty.
        """
        self.worker_name = worker_name or "arq_worker"
        self.worker: Optional[arq.Worker] = None
        self.bot: Optional[Bot] = None
        # Set (via the signal handlers) to request a graceful stop.
        self.shutdown_event = asyncio.Event()

    async def start_worker(self):
        """Start the ARQ worker and block until it stops.

        Returns once the worker task has finished or been cancelled, or once
        ``shutdown_event`` has been set (the worker task is then cancelled).

        Raises:
            Exception: Any error raised while creating the worker or the bot,
                or by the worker task itself, is printed and re-raised.
        """
        console.print(f"[green]Starting ARQ worker: {self.worker_name}[/green]")
        try:
            # Create worker with our settings using the correct API
            self.worker = arq.Worker(
                functions=WorkerSettings.get_functions(),
                redis_settings=WorkerSettings.redis_settings,
                max_jobs=WorkerSettings.max_jobs,
                job_timeout=WorkerSettings.job_timeout,
                keep_result=WorkerSettings.keep_result,
                max_tries=WorkerSettings.max_tries,
                health_check_interval=WorkerSettings.health_check_interval,
                allow_abort_jobs=WorkerSettings.allow_abort_jobs,
                queue_name=WorkerSettings.queue_name,
                on_startup=on_startup,
                on_job_start=on_job_start,
                after_job_end=after_job_end,
            )
            # Set up signal handlers for graceful shutdown
            self._setup_signal_handlers()

            bot_kwargs = {"token": _settings.TELEGRAM_BOT_TOKEN}
            if _settings.TELEGRAM_API_BASE_URL:
                # Custom API server: derive both endpoints from one base URL.
                base = _settings.TELEGRAM_API_BASE_URL.rstrip("/")
                bot_kwargs["base_url"] = f"{base}/bot"
                bot_kwargs["base_file_url"] = f"{base}/file/bot"
            self.bot = Bot(**bot_kwargs)
            await self.bot.initialize()
            message_queue.set_bot(self.bot)

            console.print(f"[blue]Worker {self.worker_name} started and waiting for jobs...[/blue]")
            await self._run_until_stopped()
        except Exception as e:
            console.print(f"[red]Error starting worker {self.worker_name}: {e}[/red]")
            raise

    async def _run_until_stopped(self) -> None:
        """Run the worker until it ends or ``shutdown_event`` is set."""
        worker_task = asyncio.create_task(self.worker.async_run())
        shutdown_task = asyncio.create_task(self.shutdown_event.wait())
        try:
            await asyncio.wait(
                {worker_task, shutdown_task},
                return_when=asyncio.FIRST_COMPLETED,
            )
            if not worker_task.done():
                # Only the shutdown waiter completed; the worker is cancelled
                # in the ``finally`` clause below.
                console.print(f"[yellow]Shutdown event received, stopping worker {self.worker_name}...[/yellow]")
            elif worker_task.cancelled():
                # Task.exception() raises CancelledError for a cancelled task,
                # which would escape every ``except Exception`` handler, so a
                # cancelled worker is treated as a normal stop instead.
                console.print(f"[yellow]Worker {self.worker_name} was cancelled[/yellow]")
            else:
                exc = worker_task.exception()
                if exc is not None:
                    raise exc
        finally:
            # Never leave either helper task running, whatever the outcome.
            await self._cancel_and_wait(worker_task, shutdown_task)

    @staticmethod
    async def _cancel_and_wait(*tasks: asyncio.Task) -> None:
        """Cancel every unfinished task and wait for its cancellation."""
        for task in tasks:
            if task.done():
                # Finished tasks are skipped so their result or exception is
                # not raised a second time here.
                continue
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

    def _setup_signal_handlers(self):
        """Set up signal handlers for graceful shutdown.

        SIGINT and SIGTERM both set ``shutdown_event``. The handlers are
        installed with ``signal.signal``, so they must not touch asyncio
        objects directly; the event is set through
        ``loop.call_soon_threadsafe`` instead, which also wakes the loop.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # Not called from a running loop; fall back to a direct set().
            loop = None

        def signal_handler(signum, frame):
            console.print(f"[yellow]Received signal {signum}, initiating graceful shutdown...[/yellow]")
            if loop is not None and not loop.is_closed():
                loop.call_soon_threadsafe(self.shutdown_event.set)
            else:
                self.shutdown_event.set()

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

    async def shutdown(self):
        """Gracefully shutdown the worker.

        Closes the worker, shuts down the bot (best effort) and finally calls
        ``shutdown_arq``. Each later step still runs when an earlier one
        raises; the error then propagates after the remaining cleanup.
        """
        console.print(f"[yellow]Shutting down worker {self.worker_name}...[/yellow]")
        try:
            if self.worker:
                await self.worker.close()
        finally:
            try:
                if self.bot:
                    try:
                        await self.bot.shutdown()
                    except Exception as exc:
                        # Bot shutdown stays best effort, but is no longer silent.
                        console.print(f"[yellow]Ignoring error while shutting down bot: {exc}[/yellow]")
            finally:
                await shutdown_arq()
        console.print(f"[green]Worker {self.worker_name} shutdown complete[/green]")
async def main():
    """Run one ARQ worker, named by the optional first CLI argument.

    The manager's ``shutdown`` is always awaited, whether the worker stops
    normally, is interrupted from the keyboard, or crashes. A crash is
    reported on the console and ends the process with exit status 1.
    """
    cli_args = sys.argv[1:]
    requested_name = cli_args[0] if cli_args else None
    worker_manager = ARQWorkerManager(requested_name)

    try:
        await worker_manager.start_worker()
    except KeyboardInterrupt:
        console.print("[yellow]Keyboard interrupt received[/yellow]")
    except Exception as error:
        console.print(f"[red]Worker crashed: {error}[/red]")
        raise SystemExit(1)
    finally:
        # Runs on every path, including the SystemExit raised just above.
        await worker_manager.shutdown()
if __name__ == "__main__":
    # Show startup banner
    for banner_line in (
        "\n[bold blue] ARQ Firmware Dump Worker[/bold blue]",
        "[dim]Processing firmware dumps with ARQ queue system[/dim]\n",
    ):
        console.print(banner_line)

    # Run the worker; Ctrl-C is a normal stop, any other error exits with 1.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        console.print("\n[yellow]Worker stopped by user[/yellow]")
    except Exception as fatal_error:
        console.print(f"\n[red]Fatal error: {fatal_error}[/red]")
        raise SystemExit(1)