add replication monitoring endpoint to unix sock #1057

Open
mrkurt wants to merge 2 commits into benbjohnson:main from mrkurt:mrkurt/replication-monitor

Conversation

Contributor

@mrkurt mrkurt commented Jan 27, 2026

Add a monitoring endpoint to the unix socket server that streams replication status when LTX files are uploaded. This enables real-time monitoring of replication progress.

Changes:

  • Add OnSync callback to Replica that fires when LTX files are uploaded
  • Add StatusMonitor for coordinating event streaming to subscribers
  • Add GET /monitor endpoint to server with NDJSON streaming
  • Add litestream monitor CLI command to consume the stream
  • Add comprehensive tests for all new functionality

Usage:
litestream monitor -socket /var/run/litestream.sock

Owner

@benbjohnson benbjohnson left a comment

Mostly looks good. A couple tweaks needed.

Comment thread replica.go Outdated
Comment on lines +57 to +60
// OnSync is called after LTX files are uploaded to the replica.
// Only fires when actual replication occurs (TXID advances).
// May be nil.
OnSync func(pos ltx.Pos)
Owner

Can you rename this to AfterSync? I could see it being useful to have a BeforeSync as well.

Comment thread server.go Outdated
Comment on lines +230 to +243
// Send initial full status
event := &StatusEvent{
	Type:      "full",
	Timestamp: time.Now(),
	Databases: s.StatusMonitor.GetFullStatus(),
}
if err := json.NewEncoder(w).Encode(event); err != nil {
	return
}
flusher.Flush()

// Subscribe to updates
ch := s.StatusMonitor.Subscribe()
defer s.StatusMonitor.Unsubscribe(ch)
Owner

You should subscribe before you send the full status so you don't lose any incremental statuses in between.
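
To make the ordering issue concrete, here is a small sketch. It is not the PR's code: `monitor`, `handle`, and the use of a plain `int` in place of a replication position are all stand-ins. A sync that lands while the handler is setting up is lost when the snapshot is taken first, and delivered when the subscription is registered first.

```go
package main

import (
	"fmt"
	"sync"
)

// monitor is a hypothetical stand-in for the PR's StatusMonitor.
type monitor struct {
	mu   sync.Mutex
	txid int // latest replicated position, simplified to an int
	subs map[chan int]struct{}
}

func newMonitor() *monitor { return &monitor{subs: make(map[chan int]struct{})} }

func (m *monitor) Subscribe() chan int {
	ch := make(chan int, 16)
	m.mu.Lock()
	m.subs[ch] = struct{}{}
	m.mu.Unlock()
	return ch
}

func (m *monitor) Unsubscribe(ch chan int) {
	m.mu.Lock()
	delete(m.subs, ch)
	m.mu.Unlock()
}

// Snapshot plays the role of the initial "full" status.
func (m *monitor) Snapshot() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.txid
}

// Publish records a new position and fans it out to current subscribers.
// Slow-subscriber handling is out of scope for this sketch.
func (m *monitor) Publish(txid int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.txid = txid
	for ch := range m.subs {
		select {
		case ch <- txid:
		default:
		}
	}
}

// handle mimics the handler setup. syncDuringSetup simulates a sync that
// completes right after the snapshot is taken.
func handle(m *monitor, subscribeFirst bool, syncDuringSetup func()) (full int, updates []int) {
	var ch chan int
	if subscribeFirst {
		ch = m.Subscribe()
		full = m.Snapshot()
		syncDuringSetup()
	} else {
		full = m.Snapshot()
		syncDuringSetup()
		ch = m.Subscribe()
	}
	defer m.Unsubscribe(ch)
	// Drain whatever was queued for this subscriber.
	for {
		select {
		case v := <-ch:
			updates = append(updates, v)
		default:
			return
		}
	}
}

func main() {
	m := newMonitor()
	m.Publish(1)
	full, updates := handle(m, false, func() { m.Publish(2) })
	fmt.Println("snapshot first: ", full, updates)

	m = newMonitor()
	m.Publish(1)
	full, updates = handle(m, true, func() { m.Publish(2) })
	fmt.Println("subscribe first:", full, updates)
}
```

With the subscription registered first, the worst case is a client seeing the same position in both the full status and the first update, which is harmless compared to missing it.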

Comment thread status.go Outdated
store *Store

mu sync.RWMutex
subscribers map[chan *StatusEvent]struct{}
Owner

It'd be a little more flexible to have these be a type like *StatusSubscriber and then have a Ch field or method that has the channel. Or we could implement that later if we need to expand the functionality.
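
One possible shape for that, as a sketch only. The review names `*StatusSubscriber` and a `Ch` field or method; everything else here (the unexported channel, the buffer size, `Notify`, `NewStatusMonitor`) is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// StatusEvent is reduced to a single field for this sketch.
type StatusEvent struct{ Type string }

// StatusSubscriber wraps one subscription. Keeping the channel unexported
// leaves room to add fields later without changing callers.
type StatusSubscriber struct {
	ch chan *StatusEvent
}

// Ch returns a receive-only view, so callers cannot send on or close it.
func (s *StatusSubscriber) Ch() <-chan *StatusEvent { return s.ch }

type StatusMonitor struct {
	mu          sync.RWMutex
	subscribers map[*StatusSubscriber]struct{}
}

func NewStatusMonitor() *StatusMonitor {
	return &StatusMonitor{subscribers: make(map[*StatusSubscriber]struct{})}
}

func (m *StatusMonitor) Subscribe() *StatusSubscriber {
	sub := &StatusSubscriber{ch: make(chan *StatusEvent, 16)}
	m.mu.Lock()
	m.subscribers[sub] = struct{}{}
	m.mu.Unlock()
	return sub
}

// Unsubscribe removes sub and closes its channel. Calling it twice is safe
// because the channel is only closed while sub is still registered.
func (m *StatusMonitor) Unsubscribe(sub *StatusSubscriber) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.subscribers[sub]; ok {
		delete(m.subscribers, sub)
		close(sub.ch)
	}
}

// Notify sends ev to every subscriber with buffer space. What to do with a
// full buffer is a separate policy question and is skipped here.
func (m *StatusMonitor) Notify(ev *StatusEvent) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	for sub := range m.subscribers {
		select {
		case sub.ch <- ev:
		default:
		}
	}
}

func main() {
	m := NewStatusMonitor()
	sub := m.Subscribe()
	m.Notify(&StatusEvent{Type: "sync"})
	fmt.Println((<-sub.Ch()).Type)

	m.Unsubscribe(sub)
	_, ok := <-sub.Ch()
	fmt.Println("open after unsubscribe:", ok)
}
```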

Comment thread status.go Outdated
select {
case ch <- event:
default:
	// Drop if subscriber is slow
Owner

You should close the subscriber's channel and remove it rather than dropping the event. Otherwise we won't know if we're missing events in a stream under load.
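
A minimal sketch of that policy, assuming the map-of-channels layout from the snippet above. `Publish` and the `buf` parameter on `Subscribe` are hypothetical names. When a send would block, the subscriber is removed and its channel closed, so a reader ranging over the channel sees the stream end instead of silently skipping events.

```go
package main

import (
	"fmt"
	"sync"
)

// StatusEvent is reduced to a single field for this sketch.
type StatusEvent struct{ Type string }

type StatusMonitor struct {
	mu          sync.Mutex // Publish mutates the map, so a plain mutex is used
	subscribers map[chan *StatusEvent]struct{}
}

func NewStatusMonitor() *StatusMonitor {
	return &StatusMonitor{subscribers: make(map[chan *StatusEvent]struct{})}
}

// Subscribe registers a channel with the given buffer size (hypothetical knob).
func (m *StatusMonitor) Subscribe(buf int) chan *StatusEvent {
	ch := make(chan *StatusEvent, buf)
	m.mu.Lock()
	m.subscribers[ch] = struct{}{}
	m.mu.Unlock()
	return ch
}

// Unsubscribe closes ch only if it is still registered, so it is safe to
// call after Publish has already disconnected the subscriber.
func (m *StatusMonitor) Unsubscribe(ch chan *StatusEvent) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.subscribers[ch]; ok {
		delete(m.subscribers, ch)
		close(ch)
	}
}

// Publish delivers event to every subscriber. A subscriber whose buffer is
// full is disconnected rather than having the event dropped.
func (m *StatusMonitor) Publish(event *StatusEvent) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for ch := range m.subscribers {
		select {
		case ch <- event:
		default:
			delete(m.subscribers, ch) // deleting during range is allowed in Go
			close(ch)
		}
	}
}

func main() {
	m := NewStatusMonitor()
	ch := m.Subscribe(1)
	defer m.Unsubscribe(ch)

	m.Publish(&StatusEvent{Type: "a"}) // fills the buffer
	m.Publish(&StatusEvent{Type: "b"}) // would block: subscriber is closed

	// The buffered event is still readable, then the range ends on close.
	for ev := range ch {
		fmt.Println("received", ev.Type)
	}
	fmt.Println("stream closed; client should reconnect for a fresh full status")
}
```

On the server side the handler would return when the channel closes, which ends the HTTP response and gives the client an unambiguous signal to reconnect.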

Comment thread status.go
Comment on lines +137 to +143
// StatusEvent represents a single NDJSON event sent to monitoring clients.
type StatusEvent struct {
	Type      string           `json:"type"` // "full" or "sync"
	Timestamp time.Time        `json:"timestamp"`
	Database  *DatabaseStatus  `json:"database,omitempty"`  // for "sync" events
	Databases []DatabaseStatus `json:"databases,omitempty"` // for "full" events
}
Owner

Do you think it makes more sense to have each StatusEvent be for a single database and just issue one for each database when we do our initial "full" broadcast?

Contributor Author

yeah that makes sense to me.

@benbjohnson benbjohnson marked this pull request as ready for review January 29, 2026 21:41
@corylanou
Collaborator

Hey @mrkurt! Thanks so much for putting this together — the replication monitoring endpoint is a great addition and we really appreciate the effort you've put into it.

Just wanted to check in and see if you're still interested in continuing with the requested changes? No pressure at all — if you're busy or have moved on, we'd be happy to pick it up from here and take it across the finish line.

Either way, thanks again for the contribution!
