This document explains every aspect of the library and summarizes practical recommendations for production use.
- Solve the dual-write problem using a transactional outbox.
- Stay polling-based without sacrificing throughput on MySQL 8.0+.
- Provide clean, SOLID abstractions for future adapters (Postgres, SQL Server, etc).
outbox.Entry describes a new outbox row to be persisted inside your business transaction.
Required fields:
AggregateType(logical stream name, e.g.order)EventType(event name, e.g.order.created)Payload(valid JSON, stored in MySQL JSON column)
Optional fields:
AggregateID(stream instance id)Headers(JSON metadata)ID(if zero, UUID v7 is generated)
outbox.Record is the stored representation returned by polling. It includes:
ID,AggregateType,AggregateID,EventTypePayload,HeadersCreatedAt,Attempts
Records move through these states:
StatusPending(0): ready for processingStatusProcessed(1): processed successfullyStatusDead(-1): exceeded retry attempts and moved to DLQ
- Start a business transaction.
- Write domain data and call
Store.Enqueuein the same transaction. - Commit the transaction.
- A
Relaypolls, locks, processes, and updates outbox rows.
At-least-once delivery means handlers must be idempotent.
Handler processes a single record:
err := handler.Handle(ctx, record)Errors trigger retry accounting. You can register an optional FailureHandler for logging/metrics.
Consumer fetches a locked batch and returns a Batch handle. The batch is responsible for:
Ack(mark processed)Fail(increment attempts, possibly dead-letter)Commit/Rollback
mysql.Store implements Consumer.
Contract note: Fetch must return ErrNoRecords when there is no work and must not return an empty batch. An empty batch is treated as ErrEmptyBatch.
Relay coordinates polling and processing:
Runruns worker goroutines that fetch batches and dispatch to the handler.ProcessOnceprocesses a single batch.
Useful options:
WithHandlerTimeoutto cap per-record processing time.WithFailureClassifierto decide retry vs dead-letter on errors.WithLogger/WithMetricsto plug in observability.WithPendingIntervalto enable pending sampling and set the minimum interval.
The adapter uses a single optimized query:
SELECT id, aggregate_type, aggregate_id, event_type, payload, headers
FROM outbox
WHERE status = 0
AND created_ts >= ? -- optional for pruning
ORDER BY id ASC
LIMIT ?
FOR UPDATE SKIP LOCKED;Key properties:
READ COMMITTEDavoids gap locks and blocking inserts.SKIP LOCKEDallows multiple workers to progress without waiting.ORDER BY idbenefits from UUID v7 ordering.LIMITkeeps transactions short.
Store.Enqueue uses the provided executor (typically *sql.Tx) so you can ensure atomicity with your domain write.
JSON validation is enabled by default; disable it for high-throughput scenarios with mysql.WithValidateJSON(false) or use mysql.WithValidatePayload / mysql.WithValidateHeaders for granular control.
If you do not want JSON payloads, use the binary schema helpers and store raw bytes:
schema, err := mysql.SchemaBinary("outbox")Partitioned variant:
schema, err := mysql.PartitionedSchemaBinary("outbox", partitions)When using binary payloads, disable payload validation:
store, err := mysql.NewStore(db, mysql.WithValidatePayload(false))Each Fail call:
- increments
attempt_count - sets
last_error(truncated to 1024 chars) - keeps
status = pendinguntilattempt_countreachesMaxAttempts, then setsstatus = dead
Relay can classify failures with FailureClassifier. When it returns:
FailureRetry:Failis called and attempts are incremented.FailureDead:Deadis called (if the batch supportsDeadBatch), otherwise it falls back toFail.
To inspect dead-lettered rows:
SELECT * FROM outbox WHERE status = -1 ORDER BY created_at DESC;- UUID v7 is time ordered (48-bit Unix milliseconds prefix).
BINARY(16)halves index size vsCHAR(36)and improves cache locality.ORDER BY idis chronological for UUID v7.
The schema includes a created_ts column derived from UUID v7:
created_ts BIGINT GENERATED ALWAYS AS (CONV(SUBSTR(HEX(id), 1, 12), 16, 10) DIV 1000) STORED- It stores Unix seconds and supports partition pruning.
- MySQL requires
STOREDwhen the column participates in the primary key. - The expression is evaluated on insert. For very high ingest rates, budget CPU accordingly.
INDEX idx_status_id (status, id) matches the polling query and avoids filesort.
Use range partitions on created_ts and drop old partitions instead of deleting rows:
ALTER TABLE outbox DROP PARTITION p202501;This is instant and avoids expensive delete/undo work.
Always keep a tail partition pmax VALUES LESS THAN (MAXVALUE). It prevents INSERT failures if a new partition was not created in time and lets the maintainer split it safely.
Pre-create future partitions and drop expired ones on a schedule. When you keep pmax, you must split it with REORGANIZE to insert new ranges:
-- Add tomorrow's partition (UTC) by splitting pmax.
ALTER TABLE outbox REORGANIZE PARTITION pmax INTO (
PARTITION p20250303 VALUES LESS THAN (UNIX_TIMESTAMP('2025-03-04')),
PARTITION pmax VALUES LESS THAN (MAXVALUE)
);
-- Drop partitions outside your retention window.
ALTER TABLE outbox DROP PARTITION p20250201;Cron example (pseudo):
0 2 * * * mysql -u app -p*** appdb -e "ALTER TABLE outbox REORGANIZE PARTITION pmax INTO (PARTITION p$(date -u +%Y%m%d) VALUES LESS THAN (UNIX_TIMESTAMP(DATE_ADD(CURDATE(), INTERVAL 1 DAY))), PARTITION pmax VALUES LESS THAN (MAXVALUE));"
mysql.PartitionMaintainer keeps partitions ahead of time and optionally drops old ones. It uses GET_LOCK on a single session, reads information_schema.PARTITIONS, and splits pmax via REORGANIZE PARTITION so missing ranges can be inserted before MAXVALUE.
Partition names are generated as:
- daily:
pYYYYMMDD - monthly:
pYYYYMM
Use this when:
- your service runs 24/7,
- the DB user can run
ALTER TABLE, - you want the simplest deployment with no extra infrastructure.
Permissions required:
ALTERon the outbox table.SELECToninformation_schema.PARTITIONS.- ability to call
GET_LOCK/RELEASE_LOCK(built-in MySQL functions).
Operational note: ALTER TABLE can take a metadata lock, so keep CheckEvery reasonably large (hourly/daily) and batch changes.
Operational note: DDL operations are logged at Info level via the maintainer logger.
Example:
maintainer, err := mysql.NewPartitionMaintainer(db, mysql.PartitionMaintainerConfig{
Table: "outbox",
Period: mysql.PartitionDay,
Lookahead: 30 * 24 * time.Hour,
CheckEvery: time.Hour,
Retention: 7 * 24 * time.Hour, // optional
})
if err != nil {
return err
}
go func() {
_ = maintainer.Run(ctx)
}()Use the CLI when ALTER privileges cannot be granted to the app or when you want a dedicated ops job:
cd cmd/outbox-partitions
go run . \
-dsn "user:pass@tcp(localhost:3306)/app?parseTime=true" \
-table outbox \
-period day \
-lookahead 720h \
-retention 168h \
-onceIf you do not use partitioning, cleanup requires periodic batched deletes. This is less efficient than dropping partitions, but it keeps tables bounded.
Use Store.Cleanup when you want to trigger cleanup manually:
result, err := store.Cleanup(ctx, mysql.CleanupOptions{
Before: time.Now().Add(-7 * 24 * time.Hour),
Limit: 10000,
IncludeDead: true,
})
if err != nil {
return err
}
_ = resultFor automated cleanup in the application, use mysql.CleanupMaintainer:
maintainer, err := mysql.NewCleanupMaintainer(db, mysql.CleanupMaintainerConfig{
Table: "outbox",
Retention: 7 * 24 * time.Hour,
CheckEvery: time.Hour,
Limit: 10000,
IncludeDead: true,
})
if err != nil {
return err
}
go func() {
_ = maintainer.Run(ctx)
}()Standalone CLI (cron / CronJob):
cd cmd/outbox-cleanup
go run . \
-dsn "user:pass@tcp(localhost:3306)/app?parseTime=true" \
-table outbox \
-retention 168h \
-limit 10000 \
-include-dead \
-onceOperational note: batched deletes can still create I/O and undo pressure. For large tables, prefer partitioning or schedule cleanup during off-peak hours. If cleanup becomes slow, consider adding composite indexes for the cutoff columns (for example, (status, processed_at) or (status, updated_at)).
The CLI reuses the same maintainer logic, so behavior is identical; it just runs under a different user and schedule.
- Start with batch size 50 and scale to 100 based on handler latency.
- Set worker count to the number of CPU cores or slightly above.
- Keep handler work short; avoid network retries inside the DB transaction.
- Set
WithHandlerTimeoutto avoid stuck handlers. Default0means no timeout.
- Use
Relay.WithPartitionWindow(e.g. 1h) to limit scans to hot partitions. - Set partitions that match operational retention (daily or hourly partitions).
Recommended baseline for write-heavy outbox tables:
innodb_buffer_pool_size: 70-80% of RAMinnodb_io_capacity: SSD tuning (2000-5000+)innodb_flush_log_at_trx_commit: use 1 for strict durabilityinnodb_log_file_size: large enough for peak write throughputmax_allowed_packet: increase if JSON payloads can be large
- Provide a
FailureHandlerto capture metrics and logs. - Route
status = -1rows to a manual DLQ workflow.
- Track counts for
pending,processed,dead. - Measure
attempt_countdistribution and handler latency. - Plug in your logger/metrics via
WithLoggerandWithMetrics. - Pending sampling is disabled by default.
- If the consumer implements
PendingCounter(MySQL does),Relaysamples pending counts when you enableWithPendingIntervaland reports them viaMetrics.SetPending.
Example stub:
type relayMetrics struct{}
func (relayMetrics) ObserveBatchDuration(time.Duration) {}
func (relayMetrics) AddProcessed(int) {}
func (relayMetrics) AddErrors(int) {}
func (relayMetrics) AddRetries(int) {}
func (relayMetrics) AddDead(int) {}
func (relayMetrics) SetPending(int) {}
relay := outbox.NewRelay(store, handler, outbox.WithMetrics(relayMetrics{}), outbox.WithPendingInterval(5*time.Second))Implement these interfaces in a new package (for example postgres):
ConsumerandBatchfor polling + lockingEnqueueusing a transaction executor
Keep the same contract:
Fetchreturns locked rows in a transaction (orErrNoRecords)AckandFailupdate records inside the same transactionCommitorRollbackfinalizes the batch
- Unit tests:
go test ./... - Integration tests (Docker):
go test -tags=integration ./... - Benchmarks:
go test -bench=. ./...