55 ConversationHandler , CallbackContext , ContextTypes )
66
77from logger_config import logger
8- from constants import TIMEOUT_IN_SEC , STATION_SELECT_ONE_TIME , STATION_SELECT_SUBSCRIBE , ONE_TIME , SUBSCRIBE , UNSUBSCRIBE , VALID_SUMMARY_INTERVALS , JOBQUEUE_DELAY , DEFAULT_USER_ID
8+
9+ from constants import (TIMEOUT_IN_SEC , STATION_SELECT_ONE_TIME ,
10+ STATION_SELECT_SUBSCRIBE , ONE_TIME , SUBSCRIBE ,
11+ UNSUBSCRIBE , VALID_SUMMARY_INTERVALS ,
12+ BOT_JOBQUEUE_DELAY , BOT_DEFAULT_USER_ID ,
13+ BOT_MAX_RESCHEDULE_TIME )
914
1015
1116class PlotBot :
@@ -107,28 +112,27 @@ def __init__(self,
107112 self .app .add_handler (one_time_forecast_handler )
108113 self .app .add_error_handler (self ._error )
109114
115+ # schedule jobs
110116 self .app .job_queue .run_once (
111117 self ._override_basetime ,
112118 when = 0 ,
113- name = 'Override basetime' ,
119+ name = 'override basetime' ,
114120 )
115121 self .app .job_queue .run_repeating (
116122 self ._update_basetime ,
123+ first = 120 ,
117124 interval = 60 ,
118- first = 60 ,
119- name = 'Update basetime' ,
125+ name = 'update basetime' ,
120126 )
121127 self .app .job_queue .run_repeating (
122128 self ._cache_plots ,
123129 interval = 30 ,
124- first = 30 ,
125- name = 'Cache plots' ,
130+ name = 'cache plots' ,
126131 )
127132 self .app .job_queue .run_repeating (
128- self ._broadcast_from_queue ,
129- interval = 90 ,
130- first = 60 ,
131- name = 'Broadcast' ,
133+ self ._broadcast ,
134+ interval = 30 ,
135+ name = 'broadcast' ,
132136 )
133137
134138 async def _override_basetime (self , context : CallbackContext ):
@@ -138,11 +142,20 @@ async def _update_basetime(self, context: CallbackContext):
138142 self ._ecmwf .upgrade_basetime_global ()
139143 self ._ecmwf .upgrade_basetime_stations ()
140144
141- async def _send_plot_from_queue (self , context : CallbackContext ):
145+ async def _process_request (self , context : CallbackContext ):
142146 job = context .job
143147 user_id , station_name = job .data
144- plots = self ._ecmwf .download_plots ([station_name ])
145- await self ._send_plot_to_user (plots , station_name , user_id )
148+
149+ plots = self ._ecmwf .download_plots ([station_name
150+ ]).get (station_name , None )
151+
152+ # plots are available
153+ if plots and len (plots ) > 0 :
154+ await self ._send_plots_to_user (plots , station_name , user_id )
155+ job .schedule_removal ()
156+ else :
157+ logger .info (
158+ f"Plots not available for { station_name } , rescheduling job." )
146159
147160 def start (self ):
148161 logger .info ('Starting bot' )
@@ -153,7 +166,7 @@ async def _error(self, update: Update, context: CallbackContext):
153166 if update :
154167 user_id = update .message .chat_id
155168 else :
156- user_id = DEFAULT_USER_ID
169+ user_id = BOT_DEFAULT_USER_ID
157170 logger .error (f"Exception while handling an update: { context .error } " )
158171 self ._db .log_activity (
159172 activity_type = "bot-error" ,
@@ -338,10 +351,9 @@ async def _subscribe_for_station(self, update: Update,
338351 )
339352 self ._db .add_subscription (msg_text , user .id )
340353
354+ self ._schedule_process_request (f"subscription_{ msg_text } _{ user .id } " ,
355+ data = (user .id , msg_text ))
341356 logger .info (f' { user .first_name } subscribed for Station { msg_text } ' )
342- context .job_queue .run_once (self ._send_plot_from_queue ,
343- JOBQUEUE_DELAY ,
344- data = (user .id , msg_text ))
345357
346358 self ._db .log_activity (
347359 activity_type = "subscription" ,
@@ -351,6 +363,15 @@ async def _subscribe_for_station(self, update: Update,
351363
352364 return ConversationHandler .END
353365
366+ def _schedule_process_request (self , job_name , data ):
367+ self .app .job_queue .run_repeating (self ._process_request ,
368+ first = BOT_JOBQUEUE_DELAY ,
369+ interval = 60 ,
370+ last = BOT_MAX_RESCHEDULE_TIME ,
371+ name = job_name ,
372+ data = data )
373+ logger .debug (f"Scheduled job { job_name } with data { data } " )
374+
354375 async def _request_one_time_forecast_for_station (
355376 self , update : Update , context : CallbackContext ) -> int :
356377 user = update .message .from_user
@@ -361,9 +382,9 @@ async def _request_one_time_forecast_for_station(
361382 reply_markup = ReplyKeyboardRemove (),
362383 )
363384
364- context . job_queue . run_once ( self ._send_plot_from_queue ,
365- JOBQUEUE_DELAY ,
366- data = (user .id , msg_text ))
385+ self ._schedule_process_request (
386+ f"one_time_forecast_ { msg_text } _ { user . id } " ,
387+ data = (user .id , msg_text ))
367388 logger .info (
368389 f' { user .first_name } requested forecast for Station { msg_text } ' )
369390
@@ -387,22 +408,28 @@ async def _cancel(self, update: Update, context: CallbackContext) -> int:
387408 async def _cache_plots (self , context : CallbackContext ):
388409 self ._ecmwf .cache_plots ()
389410
390- async def _send_plot_to_user (self , plots , station_name , user_id ):
391- logger .debug (f'Send plot to user: { user_id } ' )
411+ async def _send_plots_to_user (self , plots , station_name , user_id ):
412+ logger .debug (f'Send plots of { station_name } to user: { user_id } ' )
413+
392414 try :
393415 await self .app .bot .send_message (chat_id = user_id , text = station_name )
394- for plot in plots [station_name ]:
416+ for plot in plots :
417+ logger .debug (f'Plot: { plot } ' )
395418 await self .app .bot .send_photo (chat_id = user_id ,
396419 photo = open (plot , 'rb' ))
397420 except Exception as e :
398- logger .error (f'Error sending plot to user { user_id } : { e } ' )
421+ logger .error (f'Error sending plots to user { user_id } : { e } ' )
399422
400- async def _broadcast_from_queue (self , context : CallbackContext ):
401- plots = self ._ecmwf .download_latest_plots (
423+ async def _broadcast (self , context : CallbackContext ):
424+ latest_plots = self ._ecmwf .download_latest_plots (
402425 self ._db .stations_with_subscribers ())
403- if plots :
404- for station_name in plots :
405- for user_id in self ._db .get_subscriptions_by_station (
406- station_name ):
407- await self ._send_plot_to_user (plots , station_name , user_id )
408- logger .info ('plots sent to all users' )
426+ if latest_plots :
427+ for station_name , plots in latest_plots .items ():
428+ if len (plots ) == 0 :
429+ continue
430+ else :
431+ for user_id in self ._db .get_subscriptions_by_station (
432+ station_name ):
433+ await self ._send_plots_to_user (plots , station_name ,
434+ user_id )
435+ logger .info (f'Broadcasted { station_name } ' )
0 commit comments