Dhara is a Go-based distributed task queue built on PostgreSQL.
It is an active work in progress, but the current codebase already covers the core queue lifecycle:
- creating tasks via HTTP
- listing and fetching tasks
- canceling tasks
- retrying dead tasks
- claiming tasks from PostgreSQL
- executing tasks in worker pools
- heartbeating running tasks
- reaping stale running tasks
- recording task status changes and logs
- exposing operational health and metrics endpoints
- running schema migrations
In active development
Package names, structure, and startup flow may still change as the project matures.
Prereqs: Go 1.26+, Docker, and running PostgreSQL instance.
-
Start Postgres via Docker:
-
Set the required environment variable: export DHARA_DATABASE_URL="postgres://dhara:dhara@localhost:5432/dhara?sslmode=disable"
If you prefer a file, copy
.env.exampleto.envand load it in your shell. -
Run the server (auto-migrates by default): go run ./cmd/server
-
Create a task: curl -X POST http://localhost:8080/api/v1/tasks
-H "Content-Type: application/json"
-d '{"type":"echo","payload":{"message":"hello"}}' -
List tasks: curl "http://localhost:8080/api/v1/tasks?limit=20"
-
To register custom task types, see Adding custom task types.
Required:
DHARA_DATABASE_URL— PostgreSQL connection string.
Optional (defaults shown in .env.example):
AUTO_MIGRATE(defaulttrue)MIGRATIONS_DIR(defaultinternal/db/migrations)PORT(default8080)WORKER_COUNT(default5)HANDLER_TIMEOUT(default5m)SHUTDOWN_TIMEOUT(default30s)LOG_LEVEL(defaultinfo)LOG_FORMAT(defaulttext)
- PostgreSQL-backed task storage
- Task creation API
- Task listing API
- Task retrieval API
- Task cancellation API
- Manual retry of dead tasks
- Worker pool that polls for pending tasks
- Heartbeats for running tasks
- Reaper for stale running tasks
- Task handlers for demo workloads
- Automatic migrations on startup (configurable)
/api/v1/livez,/api/v1/readyz,api/v1/healthhealth endpoints- Prometheus-style
/api/v1/metricsendpoint - Structured logging with
slog
Dhara uses PostgreSQL as the source of truth for task state.
-
HTTP server Exposes the task API, health endpoints, and metrics endpoint.
-
Task store Handles database operations for creating, claiming, updating, retrying, canceling, and reading tasks.
-
Worker pool Continuously polls for pending tasks and executes handlers concurrently.
-
Reaper Detects stale running tasks and either requeues them or marks them dead.
-
Task handlers Business logic for task types such as
echo,send_email,always_fail, andslow_task. -
Migrations Sets up and evolves the database schema.
-
Metrics Exposes queue health and lifecycle counters for operational visibility.
- A client submits a task through the HTTP API.
- The task is stored in PostgreSQL with
PENDINGstatus. - Worker goroutines poll for available tasks.
- A worker atomically claims a pending task.
- The matching handler executes the task payload.
- The task is marked
COMPLETED, retried with backoff, or markedDEAD. - Heartbeats keep running tasks alive while they are executing.
- The reaper requeues stale running tasks or moves them to dead-letter state.
- Task status changes are recorded in
task_logs.
The service exposes health endpoints to help with orchestration and monitoring.
-
GET /api/v1/livezLiveness probe. Returns200when the process is running. -
GET /api/v1/readyzReadiness probe. Typically checks database connectivity and startup gates. -
GET /api/v1/healthAlias of readiness in the current API layout.
GET /metrics
Returns metrics in Prometheus text exposition format.
The endpoint includes:
- task lifecycle counters
- current queue state from PostgreSQL
- worker counts
- inflight execution counts
Example metric families:
tasks_enqueued_totaltasks_completed_totaltasks_attempt_failures_totaltasks_retried_totaltasks_dead_totaltasks_canceled_totaltasks_reaped_totaltasks_by_status{status="PENDING"}tasks_pending_breakdown{status="ready"}workers_totalworkers_inflight
The API is still evolving, but currently includes:
POST /api/v1/tasksGET /api/v1/tasks/{id}GET /api/v1/tasksDELETE /api/v1/tasks/{id}POST /api/v1/tasks/{id}/retry
More endpoints may be added later, including:
- richer filtering and pagination options
- task introspection/debug endpoints
- per-task execution history
- operational admin endpoints
Dhara currently supports the following task states:
PENDINGRUNNINGCOMPLETEDCANCELEDDEAD
PENDINGtasks are eligible for workers to claim.RUNNINGtasks are being processed by a worker.COMPLETEDtasks finished successfully.CANCELEDtasks were canceled before execution.DEADtasks exhausted retries or were explicitly marked unrecoverable.
Dhara includes basic failure recovery mechanics:
- failed tasks can be retried with exponential backoff
- running tasks send periodic heartbeats
- stale running tasks can be reaped and requeued
- tasks that exceed retry limits can be marked dead
This is intentionally simple today, but it mirrors the core mechanics used by production queue systems.
By default, the server runs migrations on startup when AUTO_MIGRATE=true.
To run them manually (or if AUTO_MIGRATE=false):
go run ./cmd/migrateThe MIGRATIONS_DIR environment variable controls the directory (default internal/db/migrations).
go run ./cmd/serverMake sure DHARA_DATABASE_URL is set.
The server will:
- open the database pool
- run migrations when
AUTO_MIGRATE=true - initialize task storage
- start the worker pool
- serve HTTP on
:8080
The current demo handlers include:
echosend_emailalways_failslow_task
These are intentionally simple and are meant to exercise the worker and recovery flow.
Dhara uses a registry that maps task type strings to handler function. To add your own:
-
Implement a handler with signature
func(ctx context.Context, payload json.RawMessage) error.Example handler (e.g.,
internal/tasks/custom_handlers.go):import ( "context" "encoding/json" "fmt" "github.com/md-talim/dhara/internal/ctxlog" ) func WelcomeEmail(ctx context.Context, payload json.RawMessage) error { var p struct { To string `json:"to"` Subject string `json:"subject"` } if err := json.Unmarshal(payload, &p); err != nil { return fmt.Errorf("invalid payload: %w", err) } ctxlog.From(ctx).Info("sending welcome email", "to", p.To, "subject", p.Subject) return nil }
-
Register your handler in
cmd/server/main.go:registry := tasks.NewRegistry(map[string]tasks.HandlerFunc{ "echo": tasks.Echo, "send_email": tasks.SendEmail, "welcome_email": tasks.WelcomeEmail, }) application, err := app.NewApplication(start, cfg, logger, registry)
-
Submit tasks with
"type": "welcome_email"in the API request.
See internal/tasks/demo_handlers.go`` for more examples.
Dhara is being built with the following goals in mind:
- correctness first
- PostgreSQL as the durable queue backend
- clear task state transitions
- observable worker behavior
- minimal dependencies
- no HTTP framework, just Go’s standard library
- production-style operational visibility without external hosted services
This repository is not finished yet.
Package names, structure, and APIs may be refactored as development continues.
- stronger retry semantics with jitter
- improved dead-letter handling
- task execution histograms
- better queue latency metrics
- more complete health/readiness gates
- richer operational dashboards
- more robust cancellation semantics
- stronger validation and test coverage
- clearer startup and wiring structure
- refactoring around application bootstrap and lifecycle management
TBD
