Skip to content

Commit 9616e48

Browse files
committed
Add scheduler package
1 parent 256851f commit 9616e48

13 files changed

Lines changed: 1466 additions & 0 deletions

File tree

docs/features.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
|---------|-------------|
5656
| `mailer` | Email delivery (SMTP, SendGrid, AWS SES) |
5757
| `pubsub` | Publish/subscribe messaging |
58+
| `scheduler` | Job scheduling with pluggable storage, concurrent workers, dynamic settings |
5859
| `telemetry` | Request counting, crash collection, settings schemas |
5960
| `testhelper` | Testing utilities |
6061
| `fake` | Test doubles (mailer, telemetry) |

scheduler/README.md

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
# scheduler
2+
3+
Job scheduler with pluggable storage backends.
4+
5+
## Features
6+
7+
- **Pluggable storage**: Bring your own `JobStore` implementation (Postgres included)
8+
- **Concurrent workers**: Configurable worker pool for parallel job execution
9+
- **Dynamic configuration**: Override settings at runtime via `SettingsProvider`
10+
- **Testable**: Fake clock, store, and logger for deterministic tests
11+
- **Schedule types**: Daily, Weekly, and Interval schedules with timezone support
12+
13+
## Usage
14+
15+
```go
16+
package main
17+
18+
import (
19+
"context"
20+
"log"
21+
"time"
22+
23+
"github.com/hatmaxkit/hatmax/scheduler"
24+
"github.com/hatmaxkit/hatmax/scheduler/postgres"
25+
)
26+
27+
func main() {
28+
db := connectDB()
29+
store := postgres.NewStore(db)
30+
31+
cfg := scheduler.Config{
32+
Enabled: true,
33+
Interval: 30 * time.Second,
34+
BatchSize: 20,
35+
Workers: 4,
36+
}
37+
38+
sched := scheduler.New(store, cfg, logger)
39+
40+
sched.Register("send-email", func(ctx context.Context, job scheduler.Job) scheduler.Result {
41+
// Process job
42+
return scheduler.Result{Output: map[string]any{"sent": true}}
43+
})
44+
45+
sched.Start(ctx)
46+
defer sched.Stop(ctx)
47+
}
48+
```
49+
50+
## Configuration
51+
52+
### Static (Config struct)
53+
54+
| Field | Default | Description |
55+
|-------|---------|-------------|
56+
| Enabled | false | Enable/disable scheduler |
57+
| Interval | 1m | Polling interval |
58+
| BatchSize | 20 | Max jobs per tick |
59+
| Workers | 1 | Concurrent workers |
60+
| RetryAttempts | 3 | Max retry attempts |
61+
| RetryBackoff | 1m | Base backoff duration |
62+
63+
### Dynamic (SettingsProvider)
64+
65+
Implement `SettingsProvider` to override at runtime:
66+
67+
```go
68+
sched.SetSettings(settingsService)
69+
```
70+
71+
Settings keys:
72+
- `scheduler.enabled` - Override Enabled
73+
- `scheduler.interval_seconds` - Override Interval
74+
- `scheduler.paused` - Pause without stopping
75+
76+
## Schedule Types
77+
78+
```go
79+
// Daily at 9:00 AM UTC
80+
daily := scheduler.Daily{Hour: 9, Minute: 0}
81+
82+
// Weekly on Friday at 5:00 PM in New York
83+
loc, _ := time.LoadLocation("America/New_York")
84+
weekly := scheduler.Weekly{Day: time.Friday, Hour: 17, Minute: 0, TZ: loc}
85+
86+
// Every 30 minutes
87+
interval := scheduler.Interval{Every: 30 * time.Minute}
88+
89+
// Get next run time
90+
next := daily.Next(time.Now())
91+
```
92+
93+
## Testing
94+
95+
Use fakes for deterministic tests:
96+
97+
```go
98+
store := scheduler.NewFakeStore()
99+
clock := scheduler.NewFakeClock(baseTime)
100+
log := &scheduler.FakeLogger{}
101+
102+
sched := scheduler.New(store, cfg, log)
103+
sched.SetClock(clock)
104+
105+
store.AddJob(scheduler.Job{
106+
ID: "test-job",
107+
TaskType: "email",
108+
ScheduledFor: clock.Now(),
109+
})
110+
111+
sched.Tick(ctx)
112+
clock.Advance(time.Hour)
113+
```
114+
115+
## Postgres Backend
116+
117+
```go
118+
import "github.com/hatmaxkit/hatmax/scheduler/postgres"
119+
120+
store := postgres.NewStore(db)
121+
122+
// Apply schema (or use migrations)
123+
db.Exec(postgres.Schema)
124+
```
125+
126+
Tables:
127+
- `scheduled_jobs` - Job definitions
128+
- `job_runs` - Execution history

scheduler/config.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package scheduler
2+
3+
import "time"
4+
5+
type Config struct {
6+
Enabled bool
7+
Interval time.Duration
8+
BatchSize int
9+
Workers int
10+
RetryAttempts int
11+
RetryBackoff time.Duration
12+
}
13+
14+
func (c Config) WithDefaults() Config {
15+
if c.Interval <= 0 {
16+
c.Interval = time.Minute
17+
}
18+
if c.BatchSize <= 0 {
19+
c.BatchSize = 20
20+
}
21+
if c.Workers <= 0 {
22+
c.Workers = 1
23+
}
24+
if c.RetryAttempts <= 0 {
25+
c.RetryAttempts = 3
26+
}
27+
if c.RetryBackoff <= 0 {
28+
c.RetryBackoff = time.Minute
29+
}
30+
return c
31+
}
32+
33+
const (
34+
SettingEnabled = "scheduler.enabled"
35+
SettingInterval = "scheduler.interval_seconds"
36+
SettingPaused = "scheduler.paused"
37+
)

scheduler/fake.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package scheduler
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
)
8+
9+
type FakeClock struct {
10+
mu sync.Mutex
11+
now time.Time
12+
}
13+
14+
func NewFakeClock(t time.Time) *FakeClock {
15+
return &FakeClock{now: t}
16+
}
17+
18+
func (c *FakeClock) Now() time.Time {
19+
c.mu.Lock()
20+
defer c.mu.Unlock()
21+
return c.now
22+
}
23+
24+
func (c *FakeClock) Set(t time.Time) {
25+
c.mu.Lock()
26+
defer c.mu.Unlock()
27+
c.now = t
28+
}
29+
30+
func (c *FakeClock) Advance(d time.Duration) {
31+
c.mu.Lock()
32+
defer c.mu.Unlock()
33+
c.now = c.now.Add(d)
34+
}
35+
36+
type FakeStore struct {
37+
mu sync.Mutex
38+
Jobs []Job
39+
Runs map[string]*FakeRun
40+
}
41+
42+
type FakeRun struct {
43+
ID string
44+
JobID string
45+
ScheduledFor time.Time
46+
StartedAt time.Time
47+
FinishedAt time.Time
48+
Status string
49+
Output []byte
50+
Error string
51+
}
52+
53+
func NewFakeStore() *FakeStore {
54+
return &FakeStore{
55+
Runs: make(map[string]*FakeRun),
56+
}
57+
}
58+
59+
func (s *FakeStore) AddJob(job Job) {
60+
s.mu.Lock()
61+
defer s.mu.Unlock()
62+
s.Jobs = append(s.Jobs, job)
63+
}
64+
65+
func (s *FakeStore) ListDue(ctx context.Context, now time.Time, limit int) ([]Job, error) {
66+
s.mu.Lock()
67+
defer s.mu.Unlock()
68+
69+
var due []Job
70+
for _, j := range s.Jobs {
71+
if !j.ScheduledFor.After(now) {
72+
due = append(due, j)
73+
if len(due) >= limit {
74+
break
75+
}
76+
}
77+
}
78+
return due, nil
79+
}
80+
81+
func (s *FakeStore) CreateRun(ctx context.Context, jobID, runID string, scheduledFor time.Time) error {
82+
s.mu.Lock()
83+
defer s.mu.Unlock()
84+
s.Runs[runID] = &FakeRun{
85+
ID: runID,
86+
JobID: jobID,
87+
ScheduledFor: scheduledFor,
88+
Status: "pending",
89+
}
90+
return nil
91+
}
92+
93+
func (s *FakeStore) MarkRunning(ctx context.Context, runID string, startedAt time.Time) error {
94+
s.mu.Lock()
95+
defer s.mu.Unlock()
96+
if run, ok := s.Runs[runID]; ok {
97+
run.Status = "running"
98+
run.StartedAt = startedAt
99+
}
100+
return nil
101+
}
102+
103+
func (s *FakeStore) MarkSuccess(ctx context.Context, runID string, finishedAt time.Time, output []byte) error {
104+
s.mu.Lock()
105+
defer s.mu.Unlock()
106+
if run, ok := s.Runs[runID]; ok {
107+
run.Status = "success"
108+
run.FinishedAt = finishedAt
109+
run.Output = output
110+
}
111+
return nil
112+
}
113+
114+
func (s *FakeStore) MarkFailed(ctx context.Context, runID string, finishedAt time.Time, errMsg string) error {
115+
s.mu.Lock()
116+
defer s.mu.Unlock()
117+
if run, ok := s.Runs[runID]; ok {
118+
run.Status = "failed"
119+
run.FinishedAt = finishedAt
120+
run.Error = errMsg
121+
}
122+
return nil
123+
}
124+
125+
func (s *FakeStore) UpdateNextRun(ctx context.Context, jobID string, lastRun, nextRun time.Time) error {
126+
s.mu.Lock()
127+
defer s.mu.Unlock()
128+
for i, j := range s.Jobs {
129+
if j.ID == jobID {
130+
s.Jobs[i].ScheduledFor = nextRun
131+
break
132+
}
133+
}
134+
return nil
135+
}
136+
137+
func (s *FakeStore) GetRun(runID string) *FakeRun {
138+
s.mu.Lock()
139+
defer s.mu.Unlock()
140+
return s.Runs[runID]
141+
}
142+
143+
func (s *FakeStore) AllRuns() []*FakeRun {
144+
s.mu.Lock()
145+
defer s.mu.Unlock()
146+
runs := make([]*FakeRun, 0, len(s.Runs))
147+
for _, r := range s.Runs {
148+
runs = append(runs, r)
149+
}
150+
return runs
151+
}
152+
153+
type FakeLogger struct {
154+
mu sync.Mutex
155+
Messages []string
156+
}
157+
158+
func (l *FakeLogger) Info(v ...any) {}
159+
func (l *FakeLogger) Infof(format string, a ...any) {}
160+
func (l *FakeLogger) Error(v ...any) {}
161+
func (l *FakeLogger) Errorf(format string, a ...any) {}

0 commit comments

Comments
 (0)