Skip to content
/ queue Public

A multi-driver job queue for Go with retries, delays, uniqueness, and fluent task builders.

Notifications You must be signed in to change notification settings

goforj/queue

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

142 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

queue logo

queue gives your services one queue API with Redis, SQL, NATS, SQS, RabbitMQ, and in-process drivers.

Go Reference License: MIT Go Test Go version Latest tag Go Report Card Tests

Installation

go get github.com/goforj/queue

Quick Start

import (
	"context"
	"fmt"

	"github.com/goforj/queue"
)

func main() {
	q, _ := queue.NewWorkerpool()

	q.Register("emails:send", func(ctx context.Context, j queue.Context) error {
		var payload struct {
			To string `json:"to"`
		}
		_ = j.Bind(&payload)
		fmt.Println("send to", payload.To)
		return nil
	})

	_ = q.StartWorkers(context.Background())
	defer q.Shutdown(context.Background())

	_, _ = q.Dispatch(
		context.Background(),
		queue.NewJob("emails:send").
			Payload(map[string]any{"to": "user@example.com"}).
			OnQueue("default"),
	)
}

Quick Start (Advanced: Workflows)

import (
	"context"

	"github.com/goforj/queue"
)

type EmailPayload struct {
	ID int `json:"id"`
}

func main() {
	q, _ := queue.NewWorkerpool()

	q.Register("reports:generate", func(ctx context.Context, j queue.Context) error {
		return nil
	})
	q.Register("reports:upload", func(ctx context.Context, j queue.Context) error {
		var payload EmailPayload
		if err := j.Bind(&payload); err != nil {
			return err
		}
		return nil
	})
	q.Register("users:notify_report_ready", func(ctx context.Context, j queue.Context) error {
		return nil
	})

	_ = q.Workers(2).StartWorkers(context.Background())
	defer q.Shutdown(context.Background())

	chainID, _ := q.Chain(
		// 1) generate report data
		queue.NewJob("reports:generate").Payload(map[string]any{"report_id": "rpt_123"}),
		// 2) upload report artifact after generate succeeds
		queue.NewJob("reports:upload").Payload(EmailPayload{ID: 123}),
		// 3) notify user only after upload succeeds
		queue.NewJob("users:notify_report_ready").Payload(map[string]any{"user_id": 123}),
	).OnQueue("critical").Dispatch(context.Background())
	_ = chainID
}

Job builder options

// Define a struct for your job payload.
type EmailPayload struct {
	ID int `json:"id"`
	To string `json:"to"`
}

// Fluent builder pattern for job options.
job := queue.NewJob("emails:send").
	// Payload can be bytes, structs, maps, or JSON-marshalable values.
	// Default payload is empty.
	Payload(EmailPayload{ID: 123, To: "user@example.com"}).
	// OnQueue sets the queue name.
	// Default is empty; broker-style drivers expect an explicit queue.
	OnQueue("default").
	// Timeout sets per-job execution timeout.
	// Default is unset; some drivers may apply driver/runtime defaults.
	Timeout(20 * time.Second).
	// Retry sets max retries.
	// Default is 0, which means one total attempt.
	Retry(3).
	// Backoff sets retry delay.
	// Default is unset; Redis dispatch returns ErrBackoffUnsupported.
	Backoff(500 * time.Millisecond).
	// Delay schedules first execution in the future.
	// Default is 0 (run immediately).
	Delay(2 * time.Second).
	// UniqueFor deduplicates Type+Payload for a TTL window.
	// Default is 0 (no dedupe).
	UniqueFor(45 * time.Second)

// Dispatch the job to the queue.
_ = q.Dispatch(job)

// In handlers, use Bind to decode payload into a struct.
q.Register("emails:send", func(ctx context.Context, job queue.Job) error {
	var payload EmailPayload
	if err := job.Bind(&payload); err != nil {
		return err
	}
	return nil
})

Drivers

Driver / Backend Mode Notes Durable Async Delay Unique Backoff Timeout
Null Drop-only Discards dispatched jobs; useful for disabled queue modes and smoke tests. - - - - - -
Sync Inline (caller) Deterministic local execution with no external infra. - - - âś“ - âś“
Workerpool In-process pool Local async behavior without external broker/database. - âś“ âś“ âś“ âś“ âś“
Database SQL (pg/mysql/sqlite) Durable queue with SQL storage. âś“ âś“ âś“ âś“ âś“ âś“
Redis Redis/Asynq Production Redis backend (Asynq semantics). âś“ âś“ âś“ âś“ - âś“
NATS Broker target NATS transport with queue-subject routing. - âś“ âś“ âś“ âś“ âś“
SQS Broker target AWS SQS transport with endpoint overrides for localstack/testing. - âś“ âś“ âś“ âś“ âś“
RabbitMQ Broker target RabbitMQ transport and worker consumption. - âś“ âś“ âś“ âś“ âś“

Middleware

Use queue.WithMiddleware(...) to apply cross-cutting workflow behavior (logging, filtering, error policy) to chains/batches/dispatch orchestration.

audit := queue.MiddlewareFunc(func(ctx context.Context, j queue.Context, next queue.Next) error {
    return next(ctx, j)
})

q, _ := queue.New(
    queue.Config{Driver: queue.DriverWorkerpool},
    queue.WithMiddleware(audit),
)
_ = q

Core Concepts

Concept Purpose Primary API
Job Typed work unit for app handlers queue.NewJob, Dispatch
Chain Ordered workflow (A then B then C) Chain(...).Dispatch(...)
Batch Parallel workflow with callbacks Batch(...).Then/Catch/Finally
Middleware Cross-cutting execution policy Queue middleware (queue.WithMiddleware)
Events Lifecycle hooks and observability queue runtime events (queue.Observer) + workflow events (advanced plumbing)
Backends Driver/runtime transport queue.New(...) / queue.NewQueue(...)

Queue Backends

Use queue constructors/config to choose transport/runtime. Queue composes these backends for workflow features. Use queue.NewQueue(...) only when you need the advanced low-level QueueRuntime API.

Backend Constructor
In-process sync queue.NewSync()
In-process worker pool queue.NewWorkerpool()
SQL durable queue queue.NewDatabase(driver, dsn)
Redis/Asynq queue.NewRedis(addr)
NATS queue.NewNATS(url)
SQS queue.NewSQS(region)
RabbitMQ queue.NewRabbitMQ(url)
Drop-only (disabled mode) queue.NewNull()

Observability

Use queue.Observer implementations to capture normalized runtime events across drivers.

collector := queue.NewStatsCollector()
observer := queue.MultiObserver(
    collector,
    queue.ObserverFunc(func(event queue.Event) {
        _ = event.Kind
    }),
)

q, _ := queue.New(queue.Config{
    Driver:   queue.DriverWorkerpool,
    Observer: observer,
})
_ = q

Distributed counters and source of truth

  • StatsCollector counters are process-local and event-driven.
  • In multi-process deployments, aggregate metrics externally (OTel/Prometheus/etc.).
  • Prefer backend-native stats when available.
  • queue.SupportsNativeStats(q) indicates native driver snapshot support.
  • queue.Snapshot(ctx, q, collector) merges native + collector where possible.

Compose observers

events := make(chan queue.Event, 100)
collector := queue.NewStatsCollector()
observer := queue.MultiObserver(
    collector,
    queue.ChannelObserver{
        Events:     events,
        DropIfFull: true,
    },
    queue.ObserverFunc(func(e queue.Event) {
        _ = e
    }),
)

q, _ := queue.New(queue.Config{
    Driver:   queue.DriverWorkerpool,
    Observer: observer,
})
_ = q

Kitchen sink event logging (runtime + workflow)

Runnable example: examples/observeall/main.go

logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
runtimeObserver := queue.ObserverFunc(func(event queue.Event) {
	attemptInfo := fmt.Sprintf("attempt=%d/%d", event.Attempt, event.MaxRetry+1)
	jobInfo := fmt.Sprintf("job=%s key=%s queue=%s driver=%s", event.JobType, event.JobKey, event.Queue, event.Driver)

	switch event.Kind {
	case queue.EventEnqueueAccepted:
		logger.Info("Accepted dispatch", "msg", fmt.Sprintf("Accepted %s", jobInfo), "scheduled", event.Scheduled, "at", event.Time.Format(time.RFC3339Nano))
	case queue.EventEnqueueRejected:
		logger.Error("Dispatch failed", "msg", fmt.Sprintf("Rejected %s", jobInfo), "error", event.Err)
	case queue.EventEnqueueDuplicate:
		logger.Warn("Skipped duplicate job", "msg", fmt.Sprintf("Duplicate %s", jobInfo))
	case queue.EventEnqueueCanceled:
		logger.Warn("Canceled dispatch", "msg", fmt.Sprintf("Canceled %s", jobInfo), "error", event.Err)
	case queue.EventProcessStarted:
		logger.Info("Started processing job", "msg", fmt.Sprintf("Started %s (%s)", jobInfo, attemptInfo), "at", event.Time.Format(time.RFC3339Nano))
	case queue.EventProcessSucceeded:
		logger.Info("Processed job", "msg", fmt.Sprintf("Processed %s in %s (%s)", jobInfo, event.Duration, attemptInfo))
	case queue.EventProcessFailed:
		logger.Error("Processing failed", "msg", fmt.Sprintf("Failed %s after %s (%s)", jobInfo, event.Duration, attemptInfo), "error", event.Err)
	case queue.EventProcessRetried:
		logger.Warn("Retrying job", "msg", fmt.Sprintf("Retry scheduled for %s (%s)", jobInfo, attemptInfo), "error", event.Err)
	case queue.EventProcessArchived:
		logger.Error("Archived failed job", "msg", fmt.Sprintf("Archived %s after final failure (%s)", jobInfo, attemptInfo), "error", event.Err)
	case queue.EventQueuePaused:
		logger.Info("Paused queue", "msg", fmt.Sprintf("Paused queue=%s driver=%s", event.Queue, event.Driver))
	case queue.EventQueueResumed:
		logger.Info("Resumed queue", "msg", fmt.Sprintf("Resumed queue=%s driver=%s", event.Queue, event.Driver))
	default:
		logger.Info("Queue event", "msg", fmt.Sprintf("kind=%s %s", event.Kind, jobInfo))
	}
})
workflowObserver := queue.WorkflowObserverFunc(func(event queue.WorkflowEvent) {
	logger.Info("workflow event",
		"kind", event.Kind,
		"dispatch_id", event.DispatchID,
		"job_id", event.JobID,
		"chain_id", event.ChainID,
		"batch_id", event.BatchID,
		"job_type", event.JobType,
		"queue", event.Queue,
		"attempt", event.Attempt,
		"duration", event.Duration,
		"err", event.Err,
	)
})

q, _ := queue.New(
	queue.Config{
		Driver:    queue.DriverRedis,
		RedisAddr: "127.0.0.1:6379",
		Observer:  runtimeObserver,
	},
	queue.WithObserver(workflowObserver),
)
_ = q

Observability capabilities by driver

Driver Native Stats Pause/Resume
null - -
sync - -
workerpool - -
database âś“ -
redis âś“ âś“
nats - -
sqs - -
rabbitmq - -

Events reference

Type EventKind Meaning
queue enqueue_accepted Job accepted by driver for enqueue.
queue enqueue_rejected Job enqueue failed.
queue enqueue_duplicate Duplicate job rejected due to uniqueness key.
queue enqueue_canceled Context cancellation prevented enqueue.
queue process_started Worker began processing job.
queue process_succeeded Handler returned success.
queue process_failed Handler returned error.
queue process_retried Driver scheduled retry attempt.
queue process_archived Job moved to terminal failure state.
queue queue_paused Queue was paused (driver supports pause).
queue queue_resumed Queue was resumed.
workflow dispatch_started Workflow runtime accepted a dispatch request and created a dispatch record.
workflow dispatch_succeeded Dispatch was successfully enqueued to the underlying queue runtime.
workflow dispatch_failed Dispatch failed before job execution could start.
workflow job_started A workflow job handler started execution.
workflow job_succeeded A workflow job handler completed successfully.
workflow job_failed A workflow job handler returned an error.
workflow chain_started A chain workflow was created and started.
workflow chain_advanced Chain progressed from one node to the next node.
workflow chain_completed Chain reached terminal success.
workflow chain_failed Chain reached terminal failure.
workflow batch_started A batch workflow was created and started.
workflow batch_progressed Batch state changed as jobs completed/failed.
workflow batch_completed Batch reached terminal success (or allowed-failure completion).
workflow batch_failed Batch reached terminal failure.
workflow batch_cancelled Batch was cancelled before normal completion.
workflow callback_started Chain/batch callback execution started.
workflow callback_succeeded Chain/batch callback completed successfully.
workflow callback_failed Chain/batch callback returned an error.

Testing By Audience

Application tests

Use queue.NewFake() to assert dispatch behavior in application tests.

fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
fake.AssertDispatched(nil, "emails:send")

Runtime/driver tests

Use the same queue.NewFake() helper when testing queue/job-level dispatch semantics.

fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
fake.AssertDispatched(nil, "emails:send")

Queue Runtime (Lower Level)

If you do not need workflow orchestration, use the lower-level queue.QueueRuntime via queue.NewQueue(...). Most application code should prefer queue.New(...).

q, _ := queue.NewQueue(queue.Config{Driver: queue.DriverWorkerpool})
q.Register("emails:send", func(ctx context.Context, job queue.Job) error {
    return nil
})
_ = q.Workers(2).StartWorkers(context.Background())
defer q.Shutdown(context.Background())
_ = q.Dispatch(queue.NewJob("emails:send").OnQueue("default"))

Matrix status and backend integration notes are tracked in docs/integration-scenarios.md.

API reference

The API section below is autogenerated; do not edit between the markers.

API Index

Group Functions
Constructors New NewDatabase NewNATS NewNull NewQueue NewRabbitMQ NewRedis NewSQS NewStatsCollector NewSync NewWorkerpool
Job Backoff Bind Delay NewJob OnQueue Payload PayloadBytes PayloadJSON Retry Timeout UniqueFor
Observability Active Archived Failed MultiObserver ChannelObserver.Observe Observer.Observe ObserverFunc.Observe StatsCollector.Observe Pause Paused Pending Processed Queue Queues Resume RetryCount Scheduled Snapshot StatsCollector.Snapshot SupportsNativeStats SupportsPause Throughput
Queue Batch Chain Dispatch Driver FindBatch FindChain Pause Prune Register Resume Shutdown StartWorkers Stats WithClock WithMiddleware WithObserver WithStore Workers
Queue Runtime Dispatch DispatchCtx Driver Register Shutdown StartWorkers Workers

API

Constructors

New

New creates the high-level Queue API based on Config.Driver.

q, err := queue.New(queue.Config{Driver: queue.DriverWorkerpool})
if err != nil {
	return
}
type EmailPayload struct {
	ID int `json:"id"`
}
q.Register("emails:send", func(ctx context.Context, j queue.Context) error {
	var payload EmailPayload
	if err := j.Bind(&payload); err != nil {
		return err
	}
	return nil
})
defer q.Shutdown(context.Background())
_, _ = q.Dispatch(
	context.Background(),
	queue.NewJob("emails:send").
		Payload(EmailPayload{ID: 1}).
		OnQueue("default"),
)

NewDatabase

NewDatabase creates a Queue on the SQL backend.

q, err := queue.NewDatabase("sqlite", "file:queue.db?_busy_timeout=5000")
if err != nil {
	return
}

NewNATS

NewNATS creates a Queue on the NATS backend.

q, err := queue.NewNATS("nats://127.0.0.1:4222")
if err != nil {
	return
}

NewNull

NewNull creates a Queue on the null backend.

q, err := queue.NewNull()
if err != nil {
	return
}

NewQueue

NewQueue creates the low-level queue runtime (driver-facing API) based on Config.Driver. Use this only for driver-focused/advanced runtime access; application code should prefer New.

q, err := queue.NewQueue(queue.Config{
	Driver:       queue.DriverSync,
	DefaultQueue: "critical",
})
if err != nil {
	return
}
type EmailPayload struct {
	ID int `json:"id"`
}
q.Register("emails:send", func(ctx context.Context, job queue.Job) error {
	var payload EmailPayload
	if err := job.Bind(&payload); err != nil {
		return err
	}
	return nil
})
defer q.Shutdown(context.Background())

NewRabbitMQ

NewRabbitMQ creates a Queue on the RabbitMQ backend.

q, err := queue.NewRabbitMQ("amqp://guest:guest@127.0.0.1:5672/")
if err != nil {
	return
}

NewRedis

NewRedis creates a Queue on the Redis backend.

q, err := queue.NewRedis("127.0.0.1:6379")
if err != nil {
	return
}

NewSQS

NewSQS creates a Queue on the SQS backend.

q, err := queue.NewSQS("us-east-1")
if err != nil {
	return
}

NewStatsCollector

NewStatsCollector creates an event collector for queue counters.

collector := queue.NewStatsCollector()

NewSync

NewSync creates a Queue on the synchronous in-process backend.

q, err := queue.NewSync()
if err != nil {
	return
}

NewWorkerpool

NewWorkerpool creates a Queue on the in-process workerpool backend.

q, err := queue.NewWorkerpool()
if err != nil {
	return
}

Job

Backoff

Backoff sets delay between retries.

job := queue.NewJob("emails:send").Backoff(500 * time.Millisecond)

Bind

Bind unmarshals job payload JSON into dst.

type EmailPayload struct {
	ID int    `json:"id"`
	To string `json:"to"`
}
job := queue.NewJob("emails:send").Payload(EmailPayload{
	ID: 1,
	To: "user@example.com",
})
var payload EmailPayload
if err := job.Bind(&payload); err != nil {
	return
}

Delay

Delay defers execution by duration.

job := queue.NewJob("emails:send").Delay(300 * time.Millisecond)

NewJob

NewJob creates a job value with a required job type.

job := queue.NewJob("emails:send")

OnQueue

OnQueue sets the target queue name.

job := queue.NewJob("emails:send").OnQueue("critical")

Payload

Payload sets job payload from common value types.

Example: payload bytes

jobBytes := queue.NewJob("emails:send").Payload([]byte(`{"id":1}`))

Example: payload struct

type Meta struct {
	Nested bool `json:"nested"`
}
type EmailPayload struct {
	ID   int    `json:"id"`
	To   string `json:"to"`
	Meta Meta   `json:"meta"`
}
jobStruct := queue.NewJob("emails:send").Payload(EmailPayload{
	ID:   1,
	To:   "user@example.com",
	Meta: Meta{Nested: true},
})

Example: payload map

jobMap := queue.NewJob("emails:send").Payload(map[string]any{
	"id":  1,
	"to":  "user@example.com",
	"meta": map[string]any{"nested": true},
})

PayloadBytes

PayloadBytes returns a copy of job payload bytes.

job := queue.NewJob("emails:send").Payload([]byte(`{"id":1}`))
payload := job.PayloadBytes()

PayloadJSON

PayloadJSON marshals payload as JSON.

job := queue.NewJob("emails:send").PayloadJSON(map[string]int{"id": 1})

Retry

Retry sets max retry attempts.

job := queue.NewJob("emails:send").Retry(4)

Timeout

Timeout sets per-job execution timeout.

job := queue.NewJob("emails:send").Timeout(10 * time.Second)

UniqueFor

UniqueFor enables uniqueness dedupe within the given TTL.

job := queue.NewJob("emails:send").UniqueFor(45 * time.Second)

Observability

Active

Active returns active count for a queue.

snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Active: 2},
	},
}
fmt.Println(snapshot.Active("default"))
// Output: 2

Archived

Archived returns archived count for a queue.

snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Archived: 7},
	},
}
fmt.Println(snapshot.Archived("default"))
// Output: 7

Failed

Failed returns failed count for a queue.

snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Failed: 2},
	},
}
fmt.Println(snapshot.Failed("default"))
// Output: 2

MultiObserver

MultiObserver fans out events to multiple observers.

events := make(chan queue.Event, 2)
observer := queue.MultiObserver(
	queue.ChannelObserver{Events: events},
	queue.ObserverFunc(func(queue.Event) {}),
)
observer.Observe(queue.Event{Kind: queue.EventEnqueueAccepted})
fmt.Println(len(events))
// Output: 1

ChannelObserver.Observe

Observe forwards an event to the configured channel.

ch := make(chan queue.Event, 1)
observer := queue.ChannelObserver{Events: ch}
observer.Observe(queue.Event{Kind: queue.EventProcessStarted, Queue: "default"})
event := <-ch

Observer.Observe

Observe handles a queue runtime event.

var observer queue.Observer
observer.Observe(queue.Event{
	Kind:   queue.EventEnqueueAccepted,
	Driver: queue.DriverSync,
	Queue:  "default",
})

ObserverFunc.Observe

Observe calls the wrapped function.

logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
observer := queue.ObserverFunc(func(event queue.Event) {
	logger.Info("queue event",
		"kind", event.Kind,
		"driver", event.Driver,
		"queue", event.Queue,
		"job_type", event.JobType,
		"attempt", event.Attempt,
		"max_retry", event.MaxRetry,
		"duration", event.Duration,
		"err", event.Err,
	)
})
observer.Observe(queue.Event{
	Kind:     queue.EventProcessSucceeded,
	Driver:   queue.DriverSync,
	Queue:    "default",
	JobType: "emails:send",
})

StatsCollector.Observe

Observe records an event and updates normalized counters.

collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
	Kind:   queue.EventEnqueueAccepted,
	Driver: queue.DriverSync,
	Queue:  "default",
	Time:   time.Now(),
})

Pause

Pause pauses queue consumption for drivers that support it.

q, _ := queue.NewSync()
snapshot, _ := queue.Snapshot(context.Background(), q, nil)
fmt.Println(snapshot.Paused("default"))
// Output: 1

Paused

Paused returns paused count for a queue.

collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
	Kind:   queue.EventQueuePaused,
	Driver: queue.DriverSync,
	Queue:  "default",
	Time:   time.Now(),
})
snapshot := collector.Snapshot()
fmt.Println(snapshot.Paused("default"))
// Output: 1

Pending

Pending returns pending count for a queue.

snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Pending: 3},
	},
}
fmt.Println(snapshot.Pending("default"))
// Output: 3

Processed

Processed returns processed count for a queue.

snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Processed: 11},
	},
}
fmt.Println(snapshot.Processed("default"))
// Output: 11

Queue

Queue returns queue counters for a queue name.

collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
	Kind:   queue.EventEnqueueAccepted,
	Driver: queue.DriverSync,
	Queue:  "default",
	Time:   time.Now(),
})
snapshot := collector.Snapshot()
counters, ok := snapshot.Queue("default")
fmt.Println(ok, counters.Pending)
// Output: true 1

Queues

Queues returns sorted queue names present in the snapshot.

collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
	Kind:   queue.EventEnqueueAccepted,
	Driver: queue.DriverSync,
	Queue:  "critical",
	Time:   time.Now(),
})
snapshot := collector.Snapshot()
names := snapshot.Queues()
fmt.Println(len(names), names[0])
// Output: 1 critical

Resume

Resume resumes queue consumption for drivers that support it.

q, _ := queue.NewSync()
snapshot, _ := queue.Snapshot(context.Background(), q, nil)
fmt.Println(snapshot.Paused("default"))
// Output: 0

RetryCount

RetryCount returns retry count for a queue.

snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Retry: 1},
	},
}
fmt.Println(snapshot.RetryCount("default"))
// Output: 1

Scheduled

Scheduled returns scheduled count for a queue.

snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Scheduled: 4},
	},
}
fmt.Println(snapshot.Scheduled("default"))
// Output: 4

Snapshot

Snapshot returns driver-native stats, falling back to collector data.

q, _ := queue.NewSync()
snapshot, _ := q.Stats(context.Background())
_, ok := snapshot.Queue("default")
fmt.Println(ok)
// Output: true

StatsCollector.Snapshot

Snapshot returns a copy of collected counters.

collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
	Kind:   queue.EventEnqueueAccepted,
	Driver: queue.DriverSync,
	Queue:  "default",
	Time:   time.Now(),
})
collector.Observe(queue.Event{
	Kind:   queue.EventProcessStarted,
	Driver: queue.DriverSync,
	Queue:  "default",
	JobKey: "job-1",
	Time:   time.Now(),
})
collector.Observe(queue.Event{
	Kind:     queue.EventProcessSucceeded,
	Driver:   queue.DriverSync,
	Queue:    "default",
	JobKey:  "job-1",
	Duration: 12 * time.Millisecond,
	Time:     time.Now(),
})
snapshot := collector.Snapshot()
counters, _ := snapshot.Queue("default")
throughput, _ := snapshot.Throughput("default")
fmt.Printf("queues=%v\n", snapshot.Queues())
fmt.Printf("counters=%+v\n", counters)
fmt.Printf("hour=%+v\n", throughput.Hour)
// Output:
// queues=[default]
// counters={Pending:0 Active:0 Scheduled:0 Retry:0 Archived:0 Processed:1 Failed:0 Paused:0 AvgWait:0s AvgRun:12ms}
// hour={Processed:1 Failed:0}

SupportsNativeStats

SupportsNativeStats reports whether a queue runtime exposes native stats snapshots.

q, _ := queue.NewSync()
fmt.Println(queue.SupportsNativeStats(q))
// Output: true

SupportsPause

SupportsPause reports whether a queue runtime supports Pause/Resume.

q, _ := queue.NewSync()
fmt.Println(queue.SupportsPause(q))
// Output: true

Throughput

Throughput returns rolling throughput windows for a queue name.

collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
	Kind:   queue.EventProcessSucceeded,
	Driver: queue.DriverSync,
	Queue:  "default",
	Time:   time.Now(),
})
snapshot := collector.Snapshot()
throughput, ok := snapshot.Throughput("default")
fmt.Printf("ok=%v hour=%+v day=%+v week=%+v\n", ok, throughput.Hour, throughput.Day, throughput.Week)
// Output: ok=true hour={Processed:1 Failed:0} day={Processed:1 Failed:0} week={Processed:1 Failed:0}

Queue

Batch

Batch creates a batch builder for fan-out workflow execution.

q, err := queue.NewSync()
if err != nil {
	return
}
q.Register("emails:send", func(ctx context.Context, j queue.Context) error { return nil })
_, _ = q.Batch(
	queue.NewJob("emails:send").Payload(map[string]any{"id": 1}),
	queue.NewJob("emails:send").Payload(map[string]any{"id": 2}),
).Name("send-emails").OnQueue("default").Dispatch(context.Background())

Chain

Chain creates a chain builder for sequential workflow execution.

q, err := queue.NewSync()
if err != nil {
	return
}
q.Register("first", func(ctx context.Context, j queue.Context) error { return nil })
q.Register("second", func(ctx context.Context, j queue.Context) error { return nil })
_, _ = q.Chain(
	queue.NewJob("first"),
	queue.NewJob("second"),
).OnQueue("default").Dispatch(context.Background())

Queue.Dispatch

Dispatch enqueues a high-level job.

q, err := queue.NewSync()
if err != nil {
	return
}
q.Register("emails:send", func(ctx context.Context, j queue.Context) error { return nil })
job := queue.NewJob("emails:send").Payload(map[string]any{"id": 1}).OnQueue("default")
_, _ = q.Dispatch(context.Background(), job)

Queue.Driver

Driver reports the configured backend driver for the underlying queue runtime.

q, err := queue.NewSync()
if err != nil {
	return
}
fmt.Println(q.Driver())
// Output: sync

FindBatch

FindBatch returns current batch state by ID.

q, err := queue.NewSync()
if err != nil {
	return
}
q.Register("emails:send", func(ctx context.Context, j queue.Context) error { return nil })
batchID, err := q.Batch(queue.NewJob("emails:send")).Dispatch(context.Background())
if err != nil {
	return
}
_, _ = q.FindBatch(context.Background(), batchID)

FindChain

FindChain returns current chain state by ID.

q, err := queue.NewSync()
if err != nil {
	return
}
q.Register("first", func(ctx context.Context, j queue.Context) error { return nil })
chainID, err := q.Chain(queue.NewJob("first")).Dispatch(context.Background())
if err != nil {
	return
}
_, _ = q.FindChain(context.Background(), chainID)

Queue.Pause

Pause pauses consumption for a queue when supported by the underlying driver.

q, err := queue.NewSync()
if err != nil {
	return
}
if queue.SupportsPause(q) {
}

Prune

Prune deletes old workflow state records.

q, err := queue.NewSync()
if err != nil {
	return
}

Queue.Register

Register binds a handler for a high-level job type.

q, err := queue.NewSync()
if err != nil {
	return
}
type EmailPayload struct {
	ID int `json:"id"`
}
q.Register("emails:send", func(ctx context.Context, j queue.Context) error {
	var payload EmailPayload
	if err := j.Bind(&payload); err != nil {
		return err
	}
	return nil
})

Queue.Resume

Resume resumes consumption for a queue when supported by the underlying driver.

q, err := queue.NewSync()
if err != nil {
	return
}
if queue.SupportsPause(q) {
}

Queue.Shutdown

Shutdown drains workers and closes underlying resources.

q, err := queue.NewWorkerpool()
if err != nil {
	return
}

Queue.StartWorkers

StartWorkers starts worker processing.

q, err := queue.NewWorkerpool()
if err != nil {
	return
}

Stats

Stats returns a normalized snapshot when supported by the underlying driver.

q, err := queue.NewSync()
if err != nil {
	return
}
if queue.SupportsNativeStats(q) {
	_, _ = q.Stats(context.Background())
}

WithClock

WithClock overrides the workflow runtime clock.

q, err := queue.New(
	queue.Config{Driver: queue.DriverSync},
	queue.WithClock(func() time.Time { return time.Unix(0, 0) }),
)
if err != nil {
	return
}

WithMiddleware

WithMiddleware appends queue workflow middleware.

mw := queue.MiddlewareFunc(func(ctx context.Context, j queue.Context, next queue.Next) error {
	return next(ctx, j)
})
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithMiddleware(mw))
if err != nil {
	return
}

WithObserver

WithObserver installs a workflow lifecycle observer.

observer := queue.WorkflowObserverFunc(func(event queue.WorkflowEvent) {
})
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithObserver(observer))
if err != nil {
	return
}

WithStore

WithStore overrides the workflow orchestration store.

var store queue.WorkflowStore
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithStore(store))
if err != nil {
	return
}

Queue.Workers

Workers sets desired worker concurrency before StartWorkers.

q, err := queue.NewWorkerpool()
if err != nil {
	return
}
q.Workers(4)

Queue Runtime

QueueRuntime.Dispatch

Dispatch submits a typed job payload using the default queue.

var q queue.QueueRuntime
err := q.Dispatch(
	queue.NewJob("emails:send").
		Payload(map[string]any{"id": 1}).
		OnQueue("default"),
)

QueueRuntime.DispatchCtx

DispatchCtx submits a typed job payload using the provided context.

var q queue.QueueRuntime
err := q.DispatchCtx(
	context.Background(),
	queue.NewJob("emails:send").OnQueue("default"),
)

QueueRuntime.Driver

Driver returns the active queue driver.

var q queue.QueueRuntime
driver := q.Driver()

QueueRuntime.Register

Register associates a handler with a job type.

var q queue.QueueRuntime
q.Register("emails:send", func(context.Context, queue.Job) error { return nil })

QueueRuntime.Shutdown

Shutdown drains running work and releases resources.

var q queue.QueueRuntime
err := q.Shutdown(context.Background())

QueueRuntime.StartWorkers

StartWorkers starts worker execution.

var q queue.QueueRuntime
err := q.StartWorkers(context.Background())

QueueRuntime.Workers

Workers sets desired worker concurrency before StartWorkers.

var q queue.QueueRuntime
q = q.Workers(4)

Testing

AssertCount

AssertCount fails when dispatch count is not expected.

fake := queue.NewFake()
fake.AssertCount(nil, 1)

AssertDispatched

AssertDispatched fails when jobType was not dispatched.

fake := queue.NewFake()
fake.AssertDispatched(nil, "emails:send")

AssertDispatchedOn

AssertDispatchedOn fails when jobType was not dispatched on queueName.

fake := queue.NewFake()
	queue.NewJob("emails:send").
		OnQueue("critical"),
)
fake.AssertDispatchedOn(nil, "critical", "emails:send")

AssertDispatchedTimes

AssertDispatchedTimes fails when jobType dispatch count does not match expected.

fake := queue.NewFake()
fake.AssertDispatchedTimes(nil, "emails:send", 2)

AssertNotDispatched

AssertNotDispatched fails when jobType was dispatched.

fake := queue.NewFake()
fake.AssertNotDispatched(nil, "emails:cancel")

AssertNothingDispatched

AssertNothingDispatched fails when any dispatch was recorded.

fake := queue.NewFake()
fake.AssertNothingDispatched(nil)

FakeQueue.Dispatch

Dispatch records a typed job payload in-memory using the fake default queue.

fake := queue.NewFake()
err := fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))

FakeQueue.DispatchCtx

DispatchCtx submits a typed job payload using the provided context.

fake := queue.NewFake()
ctx := context.Background()
err := fake.DispatchCtx(ctx, queue.NewJob("emails:send").OnQueue("default"))
fmt.Println(err == nil)
// Output: true

FakeQueue.Driver

Driver returns the active queue driver.

fake := queue.NewFake()
driver := fake.Driver()

NewFake

NewFake creates a queue fake that records dispatches and provides assertions.

fake := queue.NewFake()
	queue.NewJob("emails:send").
		Payload(map[string]any{"id": 1}).
		OnQueue("critical"),
)
records := fake.Records()
fmt.Println(len(records), records[0].Queue, records[0].Job.Type)
// Output: 1 critical emails:send

Records

Records returns a copy of all dispatch records.

fake := queue.NewFake()
records := fake.Records()
fmt.Println(len(records), records[0].Job.Type)
// Output: 1 emails:send

FakeQueue.Register

Register associates a handler with a job type.

fake := queue.NewFake()
fake.Register("emails:send", func(context.Context, queue.Job) error { return nil })

Reset

Reset clears all recorded dispatches.

fake := queue.NewFake()
fmt.Println(len(fake.Records()))
fake.Reset()
fmt.Println(len(fake.Records()))
// Output:
// 1
// 0

FakeQueue.Shutdown

Shutdown drains running work and releases resources.

fake := queue.NewFake()
err := fake.Shutdown(context.Background())

FakeQueue.StartWorkers

StartWorkers starts worker execution.

fake := queue.NewFake()
err := fake.StartWorkers(context.Background())

FakeQueue.Workers

Workers sets desired worker concurrency before StartWorkers.

fake := queue.NewFake()
q := fake.Workers(4)
fmt.Println(q != nil)
// Output: true

About

A multi-driver job queue for Go with retries, delays, uniqueness, and fluent task builders.

Topics

Resources

Contributing

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published