add replication monitoring endpoint to unix sock #1057
mrkurt wants to merge 2 commits into benbjohnson:main
Add a monitoring endpoint to the Unix socket IPC server that streams replication status via NDJSON when LTX files are uploaded. This enables real-time monitoring of replication progress.

Changes:
- Add OnSync callback to Replica that fires when LTX files are uploaded
- Add StatusMonitor for coordinating event streaming to subscribers
- Add GET /monitor endpoint to server with NDJSON streaming
- Add litestream monitor CLI command to consume the stream
- Add comprehensive tests for all new functionality

Usage:

    litestream monitor -socket /var/run/litestream.sock
benbjohnson
left a comment
Mostly looks good. A couple tweaks needed.
    // OnSync is called after LTX files are uploaded to the replica.
    // Only fires when actual replication occurs (TXID advances).
    // May be nil.
    OnSync func(pos ltx.Pos)
Can you rename this to AfterSync? I could see it being useful to have a BeforeSync as well.
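For illustration, a paired-hook layout might look roughly like the sketch below. Everything here is an assumption made for the example: `Pos` stands in for `ltx.Pos`, `Replica` is reduced to a position plus the two hooks, and the `BeforeSync` signature and firing point are guesses, since the thread only says such a hook could be useful.

```go
package main

import "fmt"

// Pos is a hypothetical stand-in for ltx.Pos; only the TXID matters here.
type Pos struct {
	TXID uint64
}

// Replica is a simplified stand-in for the PR's Replica type.
type Replica struct {
	pos Pos

	// BeforeSync is a hypothetical hook called before an upload is attempted.
	// May be nil.
	BeforeSync func(pos Pos)

	// AfterSync is called after an upload, only when the TXID advances.
	// May be nil.
	AfterSync func(pos Pos)
}

// Sync simulates replicating up to target and fires the hooks around it.
func (r *Replica) Sync(target Pos) {
	if r.BeforeSync != nil {
		r.BeforeSync(r.pos)
	}
	if target.TXID <= r.pos.TXID {
		return // nothing new was replicated, so AfterSync stays silent
	}
	r.pos = target
	if r.AfterSync != nil {
		r.AfterSync(r.pos)
	}
}

func main() {
	r := &Replica{}
	r.BeforeSync = func(pos Pos) { fmt.Println("before sync at TXID", pos.TXID) }
	r.AfterSync = func(pos Pos) { fmt.Println("after sync at TXID", pos.TXID) }
	r.Sync(Pos{TXID: 1})
	r.Sync(Pos{TXID: 1}) // no advance: only BeforeSync fires
}
```

Naming the existing callback AfterSync leaves the Before/After pair symmetric if the second hook is ever added.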
    // Send initial full status
    event := &StatusEvent{
        Type:      "full",
        Timestamp: time.Now(),
        Databases: s.StatusMonitor.GetFullStatus(),
    }
    if err := json.NewEncoder(w).Encode(event); err != nil {
        return
    }
    flusher.Flush()

    // Subscribe to updates
    ch := s.StatusMonitor.Subscribe()
    defer s.StatusMonitor.Unsubscribe(ch)
You should subscribe before you send the full status so you don't lose any incremental statuses in between.
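A minimal sketch of the suggested ordering, under simplifying assumptions: `Monitor` is a hypothetical stand-in for StatusMonitor, events are reduced to a bare TXID, and the `between` callback simulates a sync that lands in the window between subscribing and taking the snapshot. Subscribing first turns a possible gap into a possible duplicate, which the consumer can filter by comparing against the snapshot.

```go
package main

import (
	"fmt"
	"sync"
)

// Monitor is a simplified, hypothetical stand-in for StatusMonitor.
// Events are reduced to a bare TXID to keep the sketch short.
type Monitor struct {
	mu   sync.Mutex
	txid uint64
	subs map[chan uint64]struct{}
}

func NewMonitor() *Monitor {
	return &Monitor{subs: make(map[chan uint64]struct{})}
}

// Subscribe registers a buffered channel for incremental events.
func (m *Monitor) Subscribe() chan uint64 {
	m.mu.Lock()
	defer m.mu.Unlock()
	ch := make(chan uint64, 16)
	m.subs[ch] = struct{}{}
	return ch
}

// Snapshot returns the current TXID, standing in for the "full" status.
func (m *Monitor) Snapshot() uint64 {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.txid
}

// Advance simulates a sync and notifies every subscriber.
func (m *Monitor) Advance() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.txid++
	for ch := range m.subs {
		ch <- m.txid // buffered; slow-subscriber handling is out of scope here
	}
}

// Attach subscribes first and snapshots second. The between callback
// stands in for a sync that lands in the window between the two steps.
func Attach(m *Monitor, between func()) (uint64, chan uint64) {
	ch := m.Subscribe()
	between()
	return m.Snapshot(), ch
}

// Drain returns the buffered events that are newer than the snapshot,
// skipping any event the snapshot already covers.
func Drain(ch chan uint64, snapshot uint64) []uint64 {
	var out []uint64
	for {
		select {
		case txid := <-ch:
			if txid > snapshot {
				out = append(out, txid)
			}
		default:
			return out
		}
	}
}

func main() {
	m := NewMonitor()
	m.Advance()                          // a sync before anyone is listening
	snapshot, ch := Attach(m, m.Advance) // a sync inside the window
	m.Advance()                          // a sync after the snapshot
	fmt.Println("snapshot:", snapshot, "later events:", Drain(ch, snapshot))
}
```

With the original order (snapshot, then subscribe), a sync inside the window would appear in neither the snapshot nor the channel.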
    store *Store

    mu          sync.RWMutex
    subscribers map[chan *StatusEvent]struct{}
It'd be a little more flexible to have these be a type like *StatusSubscriber and then have a Ch field or method that has the channel. Or we could implement that later if we need to expand the functionality.
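One possible shape for that wrapper, as a rough sketch only: `StatusSubscriber`, its `Ch` method, and the trimmed-down `StatusEvent` and `StatusMonitor` below are illustrative and not taken from the PR.

```go
package main

import (
	"fmt"
	"sync"
)

// StatusEvent is trimmed to a single field for this sketch.
type StatusEvent struct {
	Type string
}

// StatusSubscriber wraps the channel so that fields can be added later
// without changing the Subscribe and Unsubscribe signatures.
type StatusSubscriber struct {
	ch chan *StatusEvent
}

// Ch returns the receive-only side of the subscriber's channel.
func (s *StatusSubscriber) Ch() <-chan *StatusEvent { return s.ch }

// StatusMonitor is a simplified stand-in for the type in the PR.
type StatusMonitor struct {
	mu          sync.RWMutex
	subscribers map[*StatusSubscriber]struct{}
}

func NewStatusMonitor() *StatusMonitor {
	return &StatusMonitor{subscribers: make(map[*StatusSubscriber]struct{})}
}

// Subscribe registers and returns a new subscriber.
func (m *StatusMonitor) Subscribe() *StatusSubscriber {
	m.mu.Lock()
	defer m.mu.Unlock()
	sub := &StatusSubscriber{ch: make(chan *StatusEvent, 16)}
	m.subscribers[sub] = struct{}{}
	return sub
}

// Unsubscribe removes the subscriber; later events are not delivered to it.
func (m *StatusMonitor) Unsubscribe(sub *StatusSubscriber) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.subscribers, sub)
}

// Publish delivers the event to every registered subscriber.
func (m *StatusMonitor) Publish(ev *StatusEvent) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	for sub := range m.subscribers {
		select {
		case sub.ch <- ev:
		default:
			// What to do with a slow subscriber is a separate question
			// and is deliberately left open in this sketch.
		}
	}
}

func main() {
	m := NewStatusMonitor()
	sub := m.Subscribe()
	defer m.Unsubscribe(sub)
	m.Publish(&StatusEvent{Type: "sync"})
	fmt.Println("received event of type", (<-sub.Ch()).Type)
}
```

Callers only ever see `Ch()`, so the monitor keeps sole ownership of the send side of the channel.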
    select {
    case ch <- event:
    default:
        // Drop if subscriber is slow
You should close the subscriber's channel and remove it rather than dropping the event. Otherwise we won't know if we're missing events in a stream under load.
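A sketch of the evict-on-overflow approach, assuming a plain channel-keyed subscriber map like the one in the diff. `Monitor`, `Event`, and the buffer-size parameter on `Subscribe` are hypothetical and exist only to make the overflow easy to trigger.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a hypothetical minimal event carrying a sequence number.
type Event struct {
	Seq int
}

// Monitor is a simplified stand-in for StatusMonitor.
type Monitor struct {
	mu   sync.Mutex
	subs map[chan Event]struct{}
}

func NewMonitor() *Monitor {
	return &Monitor{subs: make(map[chan Event]struct{})}
}

// Subscribe registers a channel with the given buffer size.
func (m *Monitor) Subscribe(buf int) chan Event {
	m.mu.Lock()
	defer m.mu.Unlock()
	ch := make(chan Event, buf)
	m.subs[ch] = struct{}{}
	return ch
}

// Unsubscribe is safe to call after the monitor has already evicted ch,
// because the channel is closed only while it is still registered.
func (m *Monitor) Unsubscribe(ch chan Event) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.subs[ch]; ok {
		delete(m.subs, ch)
		close(ch)
	}
}

// Publish sends to every subscriber. A subscriber whose buffer is full
// is removed and its channel closed, so the reader sees the stream end
// instead of silently missing events.
func (m *Monitor) Publish(ev Event) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for ch := range m.subs {
		select {
		case ch <- ev:
		default:
			delete(m.subs, ch) // deleting during range is allowed in Go
			close(ch)
		}
	}
}

func main() {
	m := NewMonitor()
	ch := m.Subscribe(1)
	defer m.Unsubscribe(ch)

	m.Publish(Event{Seq: 1}) // fills the buffer
	m.Publish(Event{Seq: 2}) // buffer full: subscriber is evicted

	for ev := range ch {
		fmt.Println("received", ev.Seq)
	}
	fmt.Println("stream closed; reconnect for a fresh full status")
}
```

In the HTTP handler, a closed channel would then be the signal to end the response, which lets the client detect the gap and reconnect.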
    // StatusEvent represents a single NDJSON event sent to monitoring clients.
    type StatusEvent struct {
        Type      string           `json:"type"` // "full" or "sync"
        Timestamp time.Time        `json:"timestamp"`
        Database  *DatabaseStatus  `json:"database,omitempty"`  // for "sync" events
        Databases []DatabaseStatus `json:"databases,omitempty"` // for "full" events
    }
Do you think it makes more sense to have each StatusEvent be for a single database and just issue one for each database when we do our initial "full" broadcast?
yeah that makes sense to me.
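As a sketch of what the single-database shape could look like: the `DatabaseStatus` fields (`Path`, `TXID`) and the helper names `FullStatusLines` and `DecodeEvent` are assumptions for this example, since the real struct is not shown in the thread.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// DatabaseStatus fields are hypothetical; the PR's struct is not shown here.
type DatabaseStatus struct {
	Path string `json:"path"`
	TXID uint64 `json:"txid"`
}

// StatusEvent always describes exactly one database in this variant.
type StatusEvent struct {
	Type      string         `json:"type"` // "full" or "sync"
	Timestamp time.Time      `json:"timestamp"`
	Database  DatabaseStatus `json:"database"`
}

// FullStatusLines builds the initial broadcast: one NDJSON line per database.
func FullStatusLines(dbs []DatabaseStatus) ([]string, error) {
	now := time.Now().UTC()
	lines := make([]string, 0, len(dbs))
	for _, db := range dbs {
		b, err := json.Marshal(StatusEvent{Type: "full", Timestamp: now, Database: db})
		if err != nil {
			return nil, err
		}
		lines = append(lines, string(b))
	}
	return lines, nil
}

// DecodeEvent parses one NDJSON line, as a monitoring client would.
func DecodeEvent(line string) (StatusEvent, error) {
	var ev StatusEvent
	err := json.Unmarshal([]byte(line), &ev)
	return ev, err
}

func main() {
	lines, err := FullStatusLines([]DatabaseStatus{
		{Path: "/data/a.db", TXID: 12},
		{Path: "/data/b.db", TXID: 7},
	})
	if err != nil {
		panic(err)
	}
	for _, line := range lines {
		fmt.Println(line)
	}
}
```

A client then needs only one decoding path, because "full" and "sync" events differ in the `type` value and not in structure.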
Hey @mrkurt! Thanks so much for putting this together. The replication monitoring endpoint is a great addition and we really appreciate the effort you've put into it. Just wanted to check in and see if you're still interested in continuing with the requested changes? No pressure at all: if you're busy or have moved on, we'd be happy to pick it up from here and take it across the finish line. Either way, thanks again for the contribution!