@@ -2,13 +2,14 @@ package worker
22
33import (
44 "context"
5+ "errors"
56 "log/slog"
67 "strings"
78 "time"
89
910 "github.com/google/uuid"
11+ "github.com/hyp3rd/cron/v4"
1012 "github.com/hyp3rd/ewrap"
11- "github.com/robfig/cron/v3"
1213)
1314
1415const errParseCronSchedule = "parse cron schedule"
@@ -66,7 +67,7 @@ func (tm *TaskManager) RegisterCronTask(
6667 Origin : cronFactoryOriginUser ,
6768 }
6869
69- entryID := tm .cron . Schedule ( schedule , cron . FuncJob ( tm . cronJob ( normalized )) )
70+ entryID := tm .scheduleCronEntry ( normalized , schedule )
7071
7172 tm .cronEntries [normalized ] = entryID
7273 tm .cronSpecs [normalized ] = cronSpec {Spec : strings .TrimSpace (spec ), Durable : false }
@@ -103,32 +104,34 @@ func (tm *TaskManager) RegisterDurableCronTask(
103104 Origin : cronFactoryOriginUser ,
104105 }
105106
106- entryID := tm .cron . Schedule ( schedule , cron . FuncJob ( tm . cronJob ( normalized )) )
107+ entryID := tm .scheduleCronEntry ( normalized , schedule )
107108
108109 tm .cronEntries [normalized ] = entryID
109110 tm .cronSpecs [normalized ] = cronSpec {Spec : strings .TrimSpace (spec ), Durable : true }
110111
111112 return nil
112113}
113114
114- func (tm * TaskManager ) cronJob (name string ) func () {
115- return func () {
116- if tm .skipCronTick () {
117- return
115+ func (tm * TaskManager ) cronJob (name string ) func (context. Context ) error {
116+ return func (ctx context. Context ) error {
117+ if tm .skipCronTick (ctx ) {
118+ return nil
118119 }
119120
120121 spec , factory , ok := tm .cronSpecAndFactory (name )
121122 if ! ok {
122- return
123+ return nil
123124 }
124125
125126 if factory .Durable {
126- tm .runDurableCron (name , spec , factory )
127+ tm .runDurableCron (ctx , name , spec , factory )
127128
128- return
129+ return nil
129130 }
130131
131- tm .runInMemoryCron (name , spec , factory )
132+ tm .runInMemoryCron (ctx , name , spec , factory )
133+
134+ return nil
132135 }
133136}
134137
@@ -145,8 +148,8 @@ func (tm *TaskManager) cronSpecAndFactory(name string) (cronSpec, cronFactory, b
145148 return spec , factory , true
146149}
147150
148- func (tm * TaskManager ) runDurableCron (name string , spec cronSpec , factory cronFactory ) {
149- task , err := factory .DurableFactory (tm . ctx )
151+ func (tm * TaskManager ) runDurableCron (ctx context. Context , name string , spec cronSpec , factory cronFactory ) {
152+ task , err := factory .DurableFactory (ctx )
150153 if err != nil {
151154 cronLogError ("cron durable task factory" , name , err )
152155
@@ -187,15 +190,15 @@ func (tm *TaskManager) runDurableCron(name string, spec cronSpec, factory cronFa
187190
188191 tm .noteCronRun (runInfo )
189192
190- err = tm .RegisterDurableTask (tm . ctx , task )
193+ err = tm .RegisterDurableTask (ctx , task )
191194 if err != nil {
192195 tm .dropCronRun (task .ID )
193196 cronLogError ("cron register durable task" , name , err )
194197 }
195198}
196199
197- func (tm * TaskManager ) runInMemoryCron (name string , spec cronSpec , factory cronFactory ) {
198- task , err := factory .TaskFactory (tm . ctx )
200+ func (tm * TaskManager ) runInMemoryCron (ctx context. Context , name string , spec cronSpec , factory cronFactory ) {
201+ task , err := factory .TaskFactory (ctx )
199202 if err != nil {
200203 cronLogError ("cron task factory" , name , err )
201204
@@ -215,7 +218,7 @@ func (tm *TaskManager) runInMemoryCron(name string, spec cronSpec, factory cronF
215218 runInfo := cronRunInfoFromTask (name , spec , task , tm .defaultQueue )
216219 tm .noteCronRun (runInfo )
217220
218- err = tm .RegisterTask (tm . ctx , task )
221+ err = tm .RegisterTask (ctx , task )
219222 if err != nil {
220223 tm .dropCronRun (task .ID )
221224 cronLogError ("cron register task" , name , err )
@@ -258,8 +261,12 @@ func (tm *TaskManager) prepareCronRegistration(
258261 return name , schedule , nil
259262}
260263
261- func (tm * TaskManager ) skipCronTick () bool {
262- return tm .ctx .Err () != nil || ! tm .accepting .Load ()
264+ func (tm * TaskManager ) scheduleCronEntry (name string , schedule cron.Schedule ) cron.EntryID {
265+ return tm .cron .ScheduleNamed (name , schedule , cron .FuncJob (tm .cronJob (name )))
266+ }
267+
268+ func (tm * TaskManager ) skipCronTick (ctx context.Context ) bool {
269+ return ctx .Err () != nil || tm .ctx .Err () != nil || ! tm .accepting .Load ()
263270}
264271
265272// UnregisterCronTask removes a cron job by name.
@@ -295,7 +302,7 @@ func (tm *TaskManager) initCron() {
295302 tm .cronLoc = location
296303 }
297304
298- parser := cron .NewParser (
305+ parser := cron .NewSpecParser (
299306 cron .Second | cron .Minute | cron .Hour | cron .Dom | cron .Month | cron .Dow | cron .Descriptor ,
300307 )
301308 tm .cron = cron .New (cron .WithLocation (location ), cron .WithParser (parser ))
@@ -306,7 +313,7 @@ func (tm *TaskManager) startCron() {
306313 defer tm .cronMu .Unlock ()
307314
308315 if tm .cron != nil {
309- tm .cron .Start ()
316+ tm .cron .Start (tm . ctx )
310317 }
311318}
312319
@@ -315,7 +322,10 @@ func (tm *TaskManager) stopCron() {
315322 defer tm .cronMu .Unlock ()
316323
317324 if tm .cron != nil {
318- tm .cron .Stop ()
325+ err := tm .cron .Stop (tm .ctx )
326+ if err != nil && ! errors .Is (err , context .Canceled ) && ! errors .Is (err , context .DeadlineExceeded ) {
327+ cronLogError ("cron stop" , "scheduler" , err )
328+ }
319329 }
320330}
321331
@@ -358,13 +368,13 @@ func parseCronSpec(spec string, location *time.Location) (cron.Schedule, error)
358368}
359369
360370func cronParserStandard (_ * time.Location ) cron.Parser {
361- return cron .NewParser (
371+ return cron .NewSpecParser (
362372 cron .Minute | cron .Hour | cron .Dom | cron .Month | cron .Dow | cron .Descriptor ,
363373 )
364374}
365375
366376func cronParserSeconds (_ * time.Location ) cron.Parser {
367- return cron .NewParser (
377+ return cron .NewSpecParser (
368378 cron .Second | cron .Minute | cron .Hour | cron .Dom | cron .Month | cron .Dow | cron .Descriptor ,
369379 )
370380}
0 commit comments