Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions admin_taskmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"time"

"github.com/google/uuid"
"github.com/hyp3rd/cron/v4"
"github.com/hyp3rd/ewrap"
"github.com/robfig/cron/v3"
)

func (tm *TaskManager) adminBackend() (AdminBackend, error) {
Expand Down Expand Up @@ -124,7 +124,11 @@ func (tm *TaskManager) AdminSchedules(ctx context.Context) ([]AdminSchedule, err

results := make([]AdminSchedule, 0, len(tm.cronEntries))
for _, entry := range tm.cron.Entries() {
name := nameByID[entry.ID]
name := entry.Name
if name == "" {
name = nameByID[entry.ID]
}

if name == "" {
continue
}
Expand Down Expand Up @@ -474,7 +478,7 @@ func (tm *TaskManager) AdminCreateSchedule(ctx context.Context, spec AdminSchedu
tm.cron.Remove(existing)
}

entryID := tm.cron.Schedule(schedule, cron.FuncJob(tm.cronJob(name)))
entryID := tm.scheduleCronEntry(name, schedule)
tm.cronEntries[name] = entryID
tm.cronSpecs[name] = cronSpec{Spec: specValue, Durable: factory.Durable}

Expand Down
58 changes: 34 additions & 24 deletions cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package worker

import (
"context"
"errors"
"log/slog"
"strings"
"time"

"github.com/google/uuid"
"github.com/hyp3rd/cron/v4"
"github.com/hyp3rd/ewrap"
"github.com/robfig/cron/v3"
)

const errParseCronSchedule = "parse cron schedule"
Expand Down Expand Up @@ -66,7 +67,7 @@ func (tm *TaskManager) RegisterCronTask(
Origin: cronFactoryOriginUser,
}

entryID := tm.cron.Schedule(schedule, cron.FuncJob(tm.cronJob(normalized)))
entryID := tm.scheduleCronEntry(normalized, schedule)

tm.cronEntries[normalized] = entryID
tm.cronSpecs[normalized] = cronSpec{Spec: strings.TrimSpace(spec), Durable: false}
Expand Down Expand Up @@ -103,32 +104,34 @@ func (tm *TaskManager) RegisterDurableCronTask(
Origin: cronFactoryOriginUser,
}

entryID := tm.cron.Schedule(schedule, cron.FuncJob(tm.cronJob(normalized)))
entryID := tm.scheduleCronEntry(normalized, schedule)

tm.cronEntries[normalized] = entryID
tm.cronSpecs[normalized] = cronSpec{Spec: strings.TrimSpace(spec), Durable: true}

return nil
}

func (tm *TaskManager) cronJob(name string) func() {
return func() {
if tm.skipCronTick() {
return
func (tm *TaskManager) cronJob(name string) func(context.Context) error {
return func(ctx context.Context) error {
if tm.skipCronTick(ctx) {
return nil
}

spec, factory, ok := tm.cronSpecAndFactory(name)
if !ok {
return
return nil
}

if factory.Durable {
tm.runDurableCron(name, spec, factory)
tm.runDurableCron(ctx, name, spec, factory)

return
return nil
}

tm.runInMemoryCron(name, spec, factory)
tm.runInMemoryCron(ctx, name, spec, factory)

return nil
}
}

Expand All @@ -145,8 +148,8 @@ func (tm *TaskManager) cronSpecAndFactory(name string) (cronSpec, cronFactory, b
return spec, factory, true
}

func (tm *TaskManager) runDurableCron(name string, spec cronSpec, factory cronFactory) {
task, err := factory.DurableFactory(tm.ctx)
func (tm *TaskManager) runDurableCron(ctx context.Context, name string, spec cronSpec, factory cronFactory) {
task, err := factory.DurableFactory(ctx)
if err != nil {
cronLogError("cron durable task factory", name, err)

Expand Down Expand Up @@ -187,15 +190,15 @@ func (tm *TaskManager) runDurableCron(name string, spec cronSpec, factory cronFa

tm.noteCronRun(runInfo)

err = tm.RegisterDurableTask(tm.ctx, task)
err = tm.RegisterDurableTask(ctx, task)
if err != nil {
tm.dropCronRun(task.ID)
cronLogError("cron register durable task", name, err)
}
}

func (tm *TaskManager) runInMemoryCron(name string, spec cronSpec, factory cronFactory) {
task, err := factory.TaskFactory(tm.ctx)
func (tm *TaskManager) runInMemoryCron(ctx context.Context, name string, spec cronSpec, factory cronFactory) {
task, err := factory.TaskFactory(ctx)
if err != nil {
cronLogError("cron task factory", name, err)

Expand All @@ -215,7 +218,7 @@ func (tm *TaskManager) runInMemoryCron(name string, spec cronSpec, factory cronF
runInfo := cronRunInfoFromTask(name, spec, task, tm.defaultQueue)
tm.noteCronRun(runInfo)

err = tm.RegisterTask(tm.ctx, task)
err = tm.RegisterTask(ctx, task)
if err != nil {
tm.dropCronRun(task.ID)
cronLogError("cron register task", name, err)
Expand Down Expand Up @@ -258,8 +261,12 @@ func (tm *TaskManager) prepareCronRegistration(
return name, schedule, nil
}

func (tm *TaskManager) skipCronTick() bool {
return tm.ctx.Err() != nil || !tm.accepting.Load()
func (tm *TaskManager) scheduleCronEntry(name string, schedule cron.Schedule) cron.EntryID {
return tm.cron.ScheduleNamed(name, schedule, cron.FuncJob(tm.cronJob(name)))
}

func (tm *TaskManager) skipCronTick(ctx context.Context) bool {
return ctx.Err() != nil || tm.ctx.Err() != nil || !tm.accepting.Load()
}

// UnregisterCronTask removes a cron job by name.
Expand Down Expand Up @@ -295,7 +302,7 @@ func (tm *TaskManager) initCron() {
tm.cronLoc = location
}

parser := cron.NewParser(
parser := cron.NewSpecParser(
cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
)
tm.cron = cron.New(cron.WithLocation(location), cron.WithParser(parser))
Expand All @@ -306,7 +313,7 @@ func (tm *TaskManager) startCron() {
defer tm.cronMu.Unlock()

if tm.cron != nil {
tm.cron.Start()
tm.cron.Start(tm.ctx)
}
}

Expand All @@ -315,7 +322,10 @@ func (tm *TaskManager) stopCron() {
defer tm.cronMu.Unlock()

if tm.cron != nil {
tm.cron.Stop()
err := tm.cron.Stop(tm.ctx)
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
cronLogError("cron stop", "scheduler", err)
}
}
}

Expand Down Expand Up @@ -358,13 +368,13 @@ func parseCronSpec(spec string, location *time.Location) (cron.Schedule, error)
}

func cronParserStandard(_ *time.Location) cron.Parser {
return cron.NewParser(
return cron.NewSpecParser(
cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
)
}

func cronParserSeconds(_ *time.Location) cron.Parser {
return cron.NewParser(
return cron.NewSpecParser(
cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ go 1.26.2
require (
github.com/goccy/go-json v0.10.6
github.com/google/uuid v1.6.0
github.com/hyp3rd/cron/v4 v4.0.0
github.com/hyp3rd/ewrap v1.3.9
github.com/hyp3rd/sectools v1.2.4
github.com/redis/rueidis v1.0.73
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/cobra v1.10.2
go.opentelemetry.io/otel/metric v1.43.0
go.opentelemetry.io/otel/sdk/metric v1.43.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hyp3rd/cron/v4 v4.0.0 h1:S5cLICFWyxnsZCn/96KGd65I1poqg3lUn7hpRCx9ZLA=
github.com/hyp3rd/cron/v4 v4.0.0/go.mod h1:4TQSY8fjNp05ajGxS2jFNdsHxBLAFk91snCRjW7z1v8=
github.com/hyp3rd/ewrap v1.3.9 h1:4vtnxji/aJdnyR2dfl93R/uYcGrNdi93EbV/r5BYalk=
github.com/hyp3rd/ewrap v1.3.9/go.mod h1:2AgfjKPZjfBxvlTrbdWrNZzxV3jqmcOHg38aKyXvxpQ=
github.com/hyp3rd/hyperlogger v0.1.7 h1:v2ffVH/I/jqGTuPsvxoUjVk4Hn1vfhdDParw/ua+VQQ=
Expand All @@ -34,8 +36,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/rueidis v1.0.73 h1:0Enrg0VuMdaYyNDDj0lLIheWY0uybCeQOh+jTp2GG3M=
github.com/redis/rueidis v1.0.73/go.mod h1:lfdcZzJ1oKGKL37vh9fO3ymwt+0TdjkkUCJxbgpmcgQ=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
Expand Down
2 changes: 1 addition & 1 deletion worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"time"

"github.com/google/uuid"
"github.com/hyp3rd/cron/v4"
"github.com/hyp3rd/ewrap"
"github.com/robfig/cron/v3"
"golang.org/x/time/rate"
)

Expand Down
Loading