queue gives your services one queue API with Redis, SQL, NATS, SQS, RabbitMQ, and in-process drivers.
go get github.com/goforj/queue

import (
"context"
"fmt"
"github.com/goforj/queue"
)
// main wires up an in-process worker-pool queue, registers a handler
// for the "emails:send" job type, starts workers, and dispatches one job.
func main() {
	// NOTE(review): errors from NewWorkerpool/StartWorkers/Dispatch are
	// deliberately ignored for example brevity; real code should check them.
	q, _ := queue.NewWorkerpool()
	q.Register("emails:send", func(ctx context.Context, j queue.Context) error {
		var payload struct {
			To string `json:"to"`
		}
		// Bind decodes the job's JSON payload into the struct.
		_ = j.Bind(&payload)
		fmt.Println("send to", payload.To)
		return nil
	})
	_ = q.StartWorkers(context.Background())
	// Drain workers and release resources when main returns.
	defer q.Shutdown(context.Background())
	// Enqueue the job onto the "default" queue.
	_, _ = q.Dispatch(
		context.Background(),
		queue.NewJob("emails:send").
			Payload(map[string]any{"to": "user@example.com"}).
			OnQueue("default"),
	)
}

import (
"context"
"github.com/goforj/queue"
)
type EmailPayload struct {
ID int `json:"id"`
}
// main demonstrates a sequential chain workflow:
// generate -> upload -> notify, where each step runs only after the
// previous one succeeds.
func main() {
	// NOTE(review): errors are ignored here for example brevity.
	q, _ := queue.NewWorkerpool()
	q.Register("reports:generate", func(ctx context.Context, j queue.Context) error {
		return nil
	})
	q.Register("reports:upload", func(ctx context.Context, j queue.Context) error {
		// Decode this step's payload; a Bind error fails the step.
		var payload EmailPayload
		if err := j.Bind(&payload); err != nil {
			return err
		}
		return nil
	})
	q.Register("users:notify_report_ready", func(ctx context.Context, j queue.Context) error {
		return nil
	})
	// Run with two concurrent workers.
	_ = q.Workers(2).StartWorkers(context.Background())
	defer q.Shutdown(context.Background())
	// Chain dispatches the jobs in order on the "critical" queue.
	chainID, _ := q.Chain(
		// 1) generate report data
		queue.NewJob("reports:generate").Payload(map[string]any{"report_id": "rpt_123"}),
		// 2) upload report artifact after generate succeeds
		queue.NewJob("reports:upload").Payload(EmailPayload{ID: 123}),
		// 3) notify user only after upload succeeds
		queue.NewJob("users:notify_report_ready").Payload(map[string]any{"user_id": 123}),
	).OnQueue("critical").Dispatch(context.Background())
	// chainID can later be passed to q.FindChain to inspect chain state.
	_ = chainID
}

// Define a struct for your job payload.
type EmailPayload struct {
ID int `json:"id"`
To string `json:"to"`
}
// Fluent builder pattern for job options.
job := queue.NewJob("emails:send").
// Payload can be bytes, structs, maps, or JSON-marshalable values.
// Default payload is empty.
Payload(EmailPayload{ID: 123, To: "user@example.com"}).
// OnQueue sets the queue name.
// Default is empty; broker-style drivers expect an explicit queue.
OnQueue("default").
// Timeout sets per-job execution timeout.
// Default is unset; some drivers may apply driver/runtime defaults.
Timeout(20 * time.Second).
// Retry sets max retries.
// Default is 0, which means one total attempt.
Retry(3).
// Backoff sets retry delay.
// Default is unset; Redis dispatch returns ErrBackoffUnsupported.
Backoff(500 * time.Millisecond).
// Delay schedules first execution in the future.
// Default is 0 (run immediately).
Delay(2 * time.Second).
// UniqueFor deduplicates Type+Payload for a TTL window.
// Default is 0 (no dedupe).
UniqueFor(45 * time.Second)
// Dispatch the job to the queue.
_ = q.Dispatch(job)
// In handlers, use Bind to decode payload into a struct.
q.Register("emails:send", func(ctx context.Context, job queue.Job) error {
var payload EmailPayload
if err := job.Bind(&payload); err != nil {
return err
}
return nil
})

Use queue.WithMiddleware(...) to apply cross-cutting workflow behavior (logging, filtering, error policy) to chains/batches/dispatch orchestration.
audit := queue.MiddlewareFunc(func(ctx context.Context, j queue.Context, next queue.Next) error {
return next(ctx, j)
})
q, _ := queue.New(
queue.Config{Driver: queue.DriverWorkerpool},
queue.WithMiddleware(audit),
)
_ = q

| Concept | Purpose | Primary API |
|---|---|---|
| Job | Typed work unit for app handlers | queue.NewJob, Dispatch |
| Chain | Ordered workflow (A then B then C) | Chain(...).Dispatch(...) |
| Batch | Parallel workflow with callbacks | Batch(...).Then/Catch/Finally |
| Middleware | Cross-cutting execution policy | Queue middleware (queue.WithMiddleware) |
| Events | Lifecycle hooks and observability | queue runtime events (queue.Observer) + workflow events (advanced plumbing) |
| Backends | Driver/runtime transport | queue.New(...) / queue.NewQueue(...) |
Use queue constructors/config to choose transport/runtime. Queue composes these backends for workflow features.
Use queue.NewQueue(...) only when you need the advanced low-level QueueRuntime API.
| Backend | Constructor |
|---|---|
| In-process sync | queue.NewSync() |
| In-process worker pool | queue.NewWorkerpool() |
| SQL durable queue | queue.NewDatabase(driver, dsn) |
| Redis/Asynq | queue.NewRedis(addr) |
| NATS | queue.NewNATS(url) |
| SQS | queue.NewSQS(region) |
| RabbitMQ | queue.NewRabbitMQ(url) |
| Drop-only (disabled mode) | queue.NewNull() |
Use queue.Observer implementations to capture normalized runtime events across drivers.
collector := queue.NewStatsCollector()
observer := queue.MultiObserver(
collector,
queue.ObserverFunc(func(event queue.Event) {
_ = event.Kind
}),
)
q, _ := queue.New(queue.Config{
Driver: queue.DriverWorkerpool,
Observer: observer,
})
_ = q

StatsCollector counters are process-local and event-driven.
- In multi-process deployments, aggregate metrics externally (OTel/Prometheus/etc.).
- Prefer backend-native stats when available.
- queue.SupportsNativeStats(q) indicates native driver snapshot support.
- queue.Snapshot(ctx, q, collector) merges native + collector where possible.
events := make(chan queue.Event, 100)
collector := queue.NewStatsCollector()
observer := queue.MultiObserver(
collector,
queue.ChannelObserver{
Events: events,
DropIfFull: true,
},
queue.ObserverFunc(func(e queue.Event) {
_ = e
}),
)
q, _ := queue.New(queue.Config{
Driver: queue.DriverWorkerpool,
Observer: observer,
})
_ = q

Runnable example: examples/observeall/main.go
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
runtimeObserver := queue.ObserverFunc(func(event queue.Event) {
attemptInfo := fmt.Sprintf("attempt=%d/%d", event.Attempt, event.MaxRetry+1)
jobInfo := fmt.Sprintf("job=%s key=%s queue=%s driver=%s", event.JobType, event.JobKey, event.Queue, event.Driver)
switch event.Kind {
case queue.EventEnqueueAccepted:
logger.Info("Accepted dispatch", "msg", fmt.Sprintf("Accepted %s", jobInfo), "scheduled", event.Scheduled, "at", event.Time.Format(time.RFC3339Nano))
case queue.EventEnqueueRejected:
logger.Error("Dispatch failed", "msg", fmt.Sprintf("Rejected %s", jobInfo), "error", event.Err)
case queue.EventEnqueueDuplicate:
logger.Warn("Skipped duplicate job", "msg", fmt.Sprintf("Duplicate %s", jobInfo))
case queue.EventEnqueueCanceled:
logger.Warn("Canceled dispatch", "msg", fmt.Sprintf("Canceled %s", jobInfo), "error", event.Err)
case queue.EventProcessStarted:
logger.Info("Started processing job", "msg", fmt.Sprintf("Started %s (%s)", jobInfo, attemptInfo), "at", event.Time.Format(time.RFC3339Nano))
case queue.EventProcessSucceeded:
logger.Info("Processed job", "msg", fmt.Sprintf("Processed %s in %s (%s)", jobInfo, event.Duration, attemptInfo))
case queue.EventProcessFailed:
logger.Error("Processing failed", "msg", fmt.Sprintf("Failed %s after %s (%s)", jobInfo, event.Duration, attemptInfo), "error", event.Err)
case queue.EventProcessRetried:
logger.Warn("Retrying job", "msg", fmt.Sprintf("Retry scheduled for %s (%s)", jobInfo, attemptInfo), "error", event.Err)
case queue.EventProcessArchived:
logger.Error("Archived failed job", "msg", fmt.Sprintf("Archived %s after final failure (%s)", jobInfo, attemptInfo), "error", event.Err)
case queue.EventQueuePaused:
logger.Info("Paused queue", "msg", fmt.Sprintf("Paused queue=%s driver=%s", event.Queue, event.Driver))
case queue.EventQueueResumed:
logger.Info("Resumed queue", "msg", fmt.Sprintf("Resumed queue=%s driver=%s", event.Queue, event.Driver))
default:
logger.Info("Queue event", "msg", fmt.Sprintf("kind=%s %s", event.Kind, jobInfo))
}
})
workflowObserver := queue.WorkflowObserverFunc(func(event queue.WorkflowEvent) {
logger.Info("workflow event",
"kind", event.Kind,
"dispatch_id", event.DispatchID,
"job_id", event.JobID,
"chain_id", event.ChainID,
"batch_id", event.BatchID,
"job_type", event.JobType,
"queue", event.Queue,
"attempt", event.Attempt,
"duration", event.Duration,
"err", event.Err,
)
})
q, _ := queue.New(
queue.Config{
Driver: queue.DriverRedis,
RedisAddr: "127.0.0.1:6379",
Observer: runtimeObserver,
},
queue.WithObserver(workflowObserver),
)
_ = q

| Driver | Native Stats | Pause/Resume |
|---|---|---|
| null | - | - |
| sync | - | - |
| workerpool | - | - |
| database | âś“ | - |
| redis | âś“ | âś“ |
| nats | - | - |
| sqs | - | - |
| rabbitmq | - | - |
| Type | EventKind | Meaning |
|---|---|---|
| queue | enqueue_accepted | Job accepted by driver for enqueue. |
| queue | enqueue_rejected | Job enqueue failed. |
| queue | enqueue_duplicate | Duplicate job rejected due to uniqueness key. |
| queue | enqueue_canceled | Context cancellation prevented enqueue. |
| queue | process_started | Worker began processing job. |
| queue | process_succeeded | Handler returned success. |
| queue | process_failed | Handler returned error. |
| queue | process_retried | Driver scheduled retry attempt. |
| queue | process_archived | Job moved to terminal failure state. |
| queue | queue_paused | Queue was paused (driver supports pause). |
| queue | queue_resumed | Queue was resumed. |
| workflow | dispatch_started | Workflow runtime accepted a dispatch request and created a dispatch record. |
| workflow | dispatch_succeeded | Dispatch was successfully enqueued to the underlying queue runtime. |
| workflow | dispatch_failed | Dispatch failed before job execution could start. |
| workflow | job_started | A workflow job handler started execution. |
| workflow | job_succeeded | A workflow job handler completed successfully. |
| workflow | job_failed | A workflow job handler returned an error. |
| workflow | chain_started | A chain workflow was created and started. |
| workflow | chain_advanced | Chain progressed from one node to the next node. |
| workflow | chain_completed | Chain reached terminal success. |
| workflow | chain_failed | Chain reached terminal failure. |
| workflow | batch_started | A batch workflow was created and started. |
| workflow | batch_progressed | Batch state changed as jobs completed/failed. |
| workflow | batch_completed | Batch reached terminal success (or allowed-failure completion). |
| workflow | batch_failed | Batch reached terminal failure. |
| workflow | batch_cancelled | Batch was cancelled before normal completion. |
| workflow | callback_started | Chain/batch callback execution started. |
| workflow | callback_succeeded | Chain/batch callback completed successfully. |
| workflow | callback_failed | Chain/batch callback returned an error. |
Use queue.NewFake() to assert dispatch behavior in application tests.
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
fake.AssertDispatched(nil, "emails:send")

Use the same queue.NewFake() helper when testing queue/job-level dispatch semantics.
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
fake.AssertDispatched(nil, "emails:send")

If you do not need workflow orchestration, use the lower-level queue.QueueRuntime via queue.NewQueue(...).
Most application code should prefer queue.New(...).
q, _ := queue.NewQueue(queue.Config{Driver: queue.DriverWorkerpool})
q.Register("emails:send", func(ctx context.Context, job queue.Job) error {
return nil
})
_ = q.Workers(2).StartWorkers(context.Background())
defer q.Shutdown(context.Background())
_ = q.Dispatch(queue.NewJob("emails:send").OnQueue("default"))

Matrix status and backend integration notes are tracked in docs/integration-scenarios.md.
The API section below is autogenerated; do not edit between the markers.
New creates the high-level Queue API based on Config.Driver.
q, err := queue.New(queue.Config{Driver: queue.DriverWorkerpool})
if err != nil {
return
}
type EmailPayload struct {
ID int `json:"id"`
}
q.Register("emails:send", func(ctx context.Context, j queue.Context) error {
var payload EmailPayload
if err := j.Bind(&payload); err != nil {
return err
}
return nil
})
defer q.Shutdown(context.Background())
_, _ = q.Dispatch(
context.Background(),
queue.NewJob("emails:send").
Payload(EmailPayload{ID: 1}).
OnQueue("default"),
)NewDatabase creates a Queue on the SQL backend.
q, err := queue.NewDatabase("sqlite", "file:queue.db?_busy_timeout=5000")
if err != nil {
return
}NewNATS creates a Queue on the NATS backend.
q, err := queue.NewNATS("nats://127.0.0.1:4222")
if err != nil {
return
}NewNull creates a Queue on the null backend.
q, err := queue.NewNull()
if err != nil {
return
}NewQueue creates the low-level queue runtime (driver-facing API) based on Config.Driver. Use this only for driver-focused/advanced runtime access; application code should prefer New.
q, err := queue.NewQueue(queue.Config{
Driver: queue.DriverSync,
DefaultQueue: "critical",
})
if err != nil {
return
}
type EmailPayload struct {
ID int `json:"id"`
}
q.Register("emails:send", func(ctx context.Context, job queue.Job) error {
var payload EmailPayload
if err := job.Bind(&payload); err != nil {
return err
}
return nil
})
defer q.Shutdown(context.Background())NewRabbitMQ creates a Queue on the RabbitMQ backend.
q, err := queue.NewRabbitMQ("amqp://guest:guest@127.0.0.1:5672/")
if err != nil {
return
}NewRedis creates a Queue on the Redis backend.
q, err := queue.NewRedis("127.0.0.1:6379")
if err != nil {
return
}NewSQS creates a Queue on the SQS backend.
q, err := queue.NewSQS("us-east-1")
if err != nil {
return
}NewStatsCollector creates an event collector for queue counters.
collector := queue.NewStatsCollector()NewSync creates a Queue on the synchronous in-process backend.
q, err := queue.NewSync()
if err != nil {
return
}NewWorkerpool creates a Queue on the in-process workerpool backend.
q, err := queue.NewWorkerpool()
if err != nil {
return
}Backoff sets delay between retries.
job := queue.NewJob("emails:send").Backoff(500 * time.Millisecond)Bind unmarshals job payload JSON into dst.
type EmailPayload struct {
ID int `json:"id"`
To string `json:"to"`
}
job := queue.NewJob("emails:send").Payload(EmailPayload{
ID: 1,
To: "user@example.com",
})
var payload EmailPayload
if err := job.Bind(&payload); err != nil {
return
}Delay defers execution by duration.
job := queue.NewJob("emails:send").Delay(300 * time.Millisecond)NewJob creates a job value with a required job type.
job := queue.NewJob("emails:send")OnQueue sets the target queue name.
job := queue.NewJob("emails:send").OnQueue("critical")Payload sets job payload from common value types.
Example: payload bytes
jobBytes := queue.NewJob("emails:send").Payload([]byte(`{"id":1}`))Example: payload struct
type Meta struct {
Nested bool `json:"nested"`
}
type EmailPayload struct {
ID int `json:"id"`
To string `json:"to"`
Meta Meta `json:"meta"`
}
jobStruct := queue.NewJob("emails:send").Payload(EmailPayload{
ID: 1,
To: "user@example.com",
Meta: Meta{Nested: true},
})Example: payload map
jobMap := queue.NewJob("emails:send").Payload(map[string]any{
"id": 1,
"to": "user@example.com",
"meta": map[string]any{"nested": true},
})PayloadBytes returns a copy of job payload bytes.
job := queue.NewJob("emails:send").Payload([]byte(`{"id":1}`))
payload := job.PayloadBytes()PayloadJSON marshals payload as JSON.
job := queue.NewJob("emails:send").PayloadJSON(map[string]int{"id": 1})Retry sets max retry attempts.
job := queue.NewJob("emails:send").Retry(4)Timeout sets per-job execution timeout.
job := queue.NewJob("emails:send").Timeout(10 * time.Second)UniqueFor enables uniqueness dedupe within the given TTL.
job := queue.NewJob("emails:send").UniqueFor(45 * time.Second)Active returns active count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Active: 2},
},
}
fmt.Println(snapshot.Active("default"))
// Output: 2Archived returns archived count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Archived: 7},
},
}
fmt.Println(snapshot.Archived("default"))
// Output: 7Failed returns failed count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Failed: 2},
},
}
fmt.Println(snapshot.Failed("default"))
// Output: 2MultiObserver fans out events to multiple observers.
events := make(chan queue.Event, 2)
observer := queue.MultiObserver(
queue.ChannelObserver{Events: events},
queue.ObserverFunc(func(queue.Event) {}),
)
observer.Observe(queue.Event{Kind: queue.EventEnqueueAccepted})
fmt.Println(len(events))
// Output: 1Observe forwards an event to the configured channel.
ch := make(chan queue.Event, 1)
observer := queue.ChannelObserver{Events: ch}
observer.Observe(queue.Event{Kind: queue.EventProcessStarted, Queue: "default"})
event := <-chObserve handles a queue runtime event.
var observer queue.Observer
observer.Observe(queue.Event{
Kind: queue.EventEnqueueAccepted,
Driver: queue.DriverSync,
Queue: "default",
})Observe calls the wrapped function.
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
observer := queue.ObserverFunc(func(event queue.Event) {
logger.Info("queue event",
"kind", event.Kind,
"driver", event.Driver,
"queue", event.Queue,
"job_type", event.JobType,
"attempt", event.Attempt,
"max_retry", event.MaxRetry,
"duration", event.Duration,
"err", event.Err,
)
})
observer.Observe(queue.Event{
Kind: queue.EventProcessSucceeded,
Driver: queue.DriverSync,
Queue: "default",
JobType: "emails:send",
})Observe records an event and updates normalized counters.
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventEnqueueAccepted,
Driver: queue.DriverSync,
Queue: "default",
Time: time.Now(),
})Pause pauses queue consumption for drivers that support it.
q, _ := queue.NewSync()
// Pause the queue so the snapshot reflects one paused queue.
_ = q.Pause(context.Background(), "default")
snapshot, _ := queue.Snapshot(context.Background(), q, nil)
fmt.Println(snapshot.Paused("default"))
// Output: 1

Paused returns paused count for a queue.
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventQueuePaused,
Driver: queue.DriverSync,
Queue: "default",
Time: time.Now(),
})
snapshot := collector.Snapshot()
fmt.Println(snapshot.Paused("default"))
// Output: 1Pending returns pending count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Pending: 3},
},
}
fmt.Println(snapshot.Pending("default"))
// Output: 3Processed returns processed count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Processed: 11},
},
}
fmt.Println(snapshot.Processed("default"))
// Output: 11Queue returns queue counters for a queue name.
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventEnqueueAccepted,
Driver: queue.DriverSync,
Queue: "default",
Time: time.Now(),
})
snapshot := collector.Snapshot()
counters, ok := snapshot.Queue("default")
fmt.Println(ok, counters.Pending)
// Output: true 1Queues returns sorted queue names present in the snapshot.
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventEnqueueAccepted,
Driver: queue.DriverSync,
Queue: "critical",
Time: time.Now(),
})
snapshot := collector.Snapshot()
names := snapshot.Queues()
fmt.Println(len(names), names[0])
// Output: 1 criticalResume resumes queue consumption for drivers that support it.
q, _ := queue.NewSync()
snapshot, _ := queue.Snapshot(context.Background(), q, nil)
fmt.Println(snapshot.Paused("default"))
// Output: 0RetryCount returns retry count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Retry: 1},
},
}
fmt.Println(snapshot.RetryCount("default"))
// Output: 1Scheduled returns scheduled count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Scheduled: 4},
},
}
fmt.Println(snapshot.Scheduled("default"))
// Output: 4Snapshot returns driver-native stats, falling back to collector data.
q, _ := queue.NewSync()
snapshot, _ := q.Stats(context.Background())
_, ok := snapshot.Queue("default")
fmt.Println(ok)
// Output: trueSnapshot returns a copy of collected counters.
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventEnqueueAccepted,
Driver: queue.DriverSync,
Queue: "default",
Time: time.Now(),
})
collector.Observe(queue.Event{
Kind: queue.EventProcessStarted,
Driver: queue.DriverSync,
Queue: "default",
JobKey: "job-1",
Time: time.Now(),
})
collector.Observe(queue.Event{
Kind: queue.EventProcessSucceeded,
Driver: queue.DriverSync,
Queue: "default",
JobKey: "job-1",
Duration: 12 * time.Millisecond,
Time: time.Now(),
})
snapshot := collector.Snapshot()
counters, _ := snapshot.Queue("default")
throughput, _ := snapshot.Throughput("default")
fmt.Printf("queues=%v\n", snapshot.Queues())
fmt.Printf("counters=%+v\n", counters)
fmt.Printf("hour=%+v\n", throughput.Hour)
// Output:
// queues=[default]
// counters={Pending:0 Active:0 Scheduled:0 Retry:0 Archived:0 Processed:1 Failed:0 Paused:0 AvgWait:0s AvgRun:12ms}
// hour={Processed:1 Failed:0}SupportsNativeStats reports whether a queue runtime exposes native stats snapshots.
q, _ := queue.NewSync()
fmt.Println(queue.SupportsNativeStats(q))
// Output: trueSupportsPause reports whether a queue runtime supports Pause/Resume.
q, _ := queue.NewSync()
fmt.Println(queue.SupportsPause(q))
// Output: trueThroughput returns rolling throughput windows for a queue name.
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventProcessSucceeded,
Driver: queue.DriverSync,
Queue: "default",
Time: time.Now(),
})
snapshot := collector.Snapshot()
throughput, ok := snapshot.Throughput("default")
fmt.Printf("ok=%v hour=%+v day=%+v week=%+v\n", ok, throughput.Hour, throughput.Day, throughput.Week)
// Output: ok=true hour={Processed:1 Failed:0} day={Processed:1 Failed:0} week={Processed:1 Failed:0}Batch creates a batch builder for fan-out workflow execution.
q, err := queue.NewSync()
if err != nil {
return
}
q.Register("emails:send", func(ctx context.Context, j queue.Context) error { return nil })
_, _ = q.Batch(
queue.NewJob("emails:send").Payload(map[string]any{"id": 1}),
queue.NewJob("emails:send").Payload(map[string]any{"id": 2}),
).Name("send-emails").OnQueue("default").Dispatch(context.Background())Chain creates a chain builder for sequential workflow execution.
q, err := queue.NewSync()
if err != nil {
return
}
q.Register("first", func(ctx context.Context, j queue.Context) error { return nil })
q.Register("second", func(ctx context.Context, j queue.Context) error { return nil })
_, _ = q.Chain(
queue.NewJob("first"),
queue.NewJob("second"),
).OnQueue("default").Dispatch(context.Background())Dispatch enqueues a high-level job.
q, err := queue.NewSync()
if err != nil {
return
}
q.Register("emails:send", func(ctx context.Context, j queue.Context) error { return nil })
job := queue.NewJob("emails:send").Payload(map[string]any{"id": 1}).OnQueue("default")
_, _ = q.Dispatch(context.Background(), job)Driver reports the configured backend driver for the underlying queue runtime.
q, err := queue.NewSync()
if err != nil {
return
}
fmt.Println(q.Driver())
// Output: syncFindBatch returns current batch state by ID.
q, err := queue.NewSync()
if err != nil {
return
}
q.Register("emails:send", func(ctx context.Context, j queue.Context) error { return nil })
batchID, err := q.Batch(queue.NewJob("emails:send")).Dispatch(context.Background())
if err != nil {
return
}
_, _ = q.FindBatch(context.Background(), batchID)FindChain returns current chain state by ID.
q, err := queue.NewSync()
if err != nil {
return
}
q.Register("first", func(ctx context.Context, j queue.Context) error { return nil })
chainID, err := q.Chain(queue.NewJob("first")).Dispatch(context.Background())
if err != nil {
return
}
_, _ = q.FindChain(context.Background(), chainID)Pause pauses consumption for a queue when supported by the underlying driver.
q, err := queue.NewSync()
if err != nil {
return
}
if queue.SupportsPause(q) {
}Prune deletes old workflow state records.
q, err := queue.NewSync()
if err != nil {
return
}Register binds a handler for a high-level job type.
q, err := queue.NewSync()
if err != nil {
return
}
type EmailPayload struct {
ID int `json:"id"`
}
q.Register("emails:send", func(ctx context.Context, j queue.Context) error {
var payload EmailPayload
if err := j.Bind(&payload); err != nil {
return err
}
return nil
})Resume resumes consumption for a queue when supported by the underlying driver.
q, err := queue.NewSync()
if err != nil {
return
}
if queue.SupportsPause(q) {
}Shutdown drains workers and closes underlying resources.
q, err := queue.NewWorkerpool()
if err != nil {
return
}StartWorkers starts worker processing.
q, err := queue.NewWorkerpool()
if err != nil {
return
}Stats returns a normalized snapshot when supported by the underlying driver.
q, err := queue.NewSync()
if err != nil {
return
}
if queue.SupportsNativeStats(q) {
_, _ = q.Stats(context.Background())
}WithClock overrides the workflow runtime clock.
q, err := queue.New(
queue.Config{Driver: queue.DriverSync},
queue.WithClock(func() time.Time { return time.Unix(0, 0) }),
)
if err != nil {
return
}WithMiddleware appends queue workflow middleware.
mw := queue.MiddlewareFunc(func(ctx context.Context, j queue.Context, next queue.Next) error {
return next(ctx, j)
})
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithMiddleware(mw))
if err != nil {
return
}WithObserver installs a workflow lifecycle observer.
observer := queue.WorkflowObserverFunc(func(event queue.WorkflowEvent) {
})
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithObserver(observer))
if err != nil {
return
}WithStore overrides the workflow orchestration store.
var store queue.WorkflowStore
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithStore(store))
if err != nil {
return
}Workers sets desired worker concurrency before StartWorkers.
q, err := queue.NewWorkerpool()
if err != nil {
return
}
q.Workers(4)Dispatch submits a typed job payload using the default queue.
var q queue.QueueRuntime
err := q.Dispatch(
queue.NewJob("emails:send").
Payload(map[string]any{"id": 1}).
OnQueue("default"),
)DispatchCtx submits a typed job payload using the provided context.
var q queue.QueueRuntime
err := q.DispatchCtx(
context.Background(),
queue.NewJob("emails:send").OnQueue("default"),
)Driver returns the active queue driver.
var q queue.QueueRuntime
driver := q.Driver()Register associates a handler with a job type.
var q queue.QueueRuntime
q.Register("emails:send", func(context.Context, queue.Job) error { return nil })Shutdown drains running work and releases resources.
var q queue.QueueRuntime
err := q.Shutdown(context.Background())StartWorkers starts worker execution.
var q queue.QueueRuntime
err := q.StartWorkers(context.Background())Workers sets desired worker concurrency before StartWorkers.
var q queue.QueueRuntime
q = q.Workers(4)AssertCount fails when dispatch count is not expected.
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertCount(nil, 1)
fake := queue.NewFake()
fake.AssertDispatched(nil, "emails:send")AssertDispatchedOn fails when jobType was not dispatched on queueName.
fake := queue.NewFake()
_ = fake.Dispatch(
	queue.NewJob("emails:send").
		OnQueue("critical"),
)
fake.AssertDispatchedOn(nil, "critical", "emails:send")

AssertDispatchedTimes fails when jobType dispatch count does not match expected.
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertDispatchedTimes(nil, "emails:send", 2)
fake := queue.NewFake()
fake.AssertNotDispatched(nil, "emails:cancel")AssertNothingDispatched fails when any dispatch was recorded.
fake := queue.NewFake()
fake.AssertNothingDispatched(nil)Dispatch records a typed job payload in-memory using the fake default queue.
fake := queue.NewFake()
err := fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))DispatchCtx submits a typed job payload using the provided context.
fake := queue.NewFake()
ctx := context.Background()
err := fake.DispatchCtx(ctx, queue.NewJob("emails:send").OnQueue("default"))
fmt.Println(err == nil)
// Output: trueDriver returns the active queue driver.
fake := queue.NewFake()
driver := fake.Driver()NewFake creates a queue fake that records dispatches and provides assertions.
fake := queue.NewFake()
_ = fake.Dispatch(
	queue.NewJob("emails:send").
		Payload(map[string]any{"id": 1}).
		OnQueue("critical"),
)
records := fake.Records()
fmt.Println(len(records), records[0].Queue, records[0].Job.Type)
// Output: 1 critical emails:sendRecords returns a copy of all dispatch records.
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
records := fake.Records()
fmt.Println(len(records), records[0].Job.Type)
// Output: 1 emails:sendRegister associates a handler with a job type.
fake := queue.NewFake()
fake.Register("emails:send", func(context.Context, queue.Job) error { return nil })Reset clears all recorded dispatches.
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
fmt.Println(len(fake.Records()))
fake.Reset()
fmt.Println(len(fake.Records()))
// Output:
// 1
// 0Shutdown drains running work and releases resources.
fake := queue.NewFake()
err := fake.Shutdown(context.Background())StartWorkers starts worker execution.
fake := queue.NewFake()
err := fake.StartWorkers(context.Background())Workers sets desired worker concurrency before StartWorkers.
fake := queue.NewFake()
q := fake.Workers(4)
fmt.Println(q != nil)
// Output: true