Open
Conversation
Enhance hub to store log history in ring buffer
There was a problem hiding this comment.
Pull request overview
This PR adds configurable log history replay for the log streaming socket by introducing an in-memory ring buffer in the logs hub, along with configuration, validation, and test updates to support the new behavior.
Changes:
- Add
logs.history(default5000) to config, docs, and validation. - Extend the logs hub to store recent log messages in a ring buffer and replay them to newly registered clients.
- Update server/client queue sizing and add unit + e2e coverage for replay behavior.
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| internal/config/validate_test.go | Adds validation test cases for logs.history bounds. |
| internal/config/validate.go | Enforces Logs.History > 0. |
| internal/config/loader_test.go | Ensures loader applies defaults and parses logs.history. |
| internal/config/constants.go | Introduces SocketLogsHistorySize default constant. |
| internal/config/config_test.go | Asserts default config includes Logs.History. |
| internal/config/config.go | Adds Logs.History to config schema and defaulting. |
| internal/app/logs/server_test.go | Updates server tests for new hub signature and adds replay/queue sizing tests. |
| internal/app/logs/server.go | Wires history size into server and sizes client queue to accommodate replay. |
| internal/app/logs/hub_test.go | Updates existing tests and adds ring buffer + replay behavior tests. |
| internal/app/logs/hub.go | Implements ring-buffer-backed history + replay-on-register. |
| internal/app/logs/broadcast_test.go | Extends message type constant coverage for status. |
| internal/app/errors/errors.go | Adds ErrInvalidLogsHistory. |
| e2e/suite.go | Adds gosec suppression comment for e2e helpers. |
| e2e/default_tier_test.go | Adds an e2e test asserting replay surfaces prior logs. |
| docs/src/pages/docs/configuration.astro | Documents logs.buffer and new logs.history setting. |
There was a problem hiding this comment.
Pull request overview
This PR adds log replay support by introducing an in-memory log history (ring buffer) in the logs hub and exposing/configuring it via logs.history in fuku.yaml.
Changes:
- Add
logs.historyconfiguration with defaults, validation, loader coverage, and documentation. - Extend the logs hub/server to retain recent log messages and replay them to newly connected clients.
- Expand unit/e2e coverage for replay behavior, and update CI/lint configuration accordingly.
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
internal/config/config.go |
Adds Logs.History to config schema and default value wiring. |
internal/config/constants.go |
Introduces SocketLogsHistorySize default. |
internal/config/validate.go |
Validates logs.history > 0. |
internal/app/errors/errors.go |
Adds ErrInvalidLogsHistory. |
internal/config/config_test.go |
Verifies DefaultConfig() sets history default. |
internal/config/validate_test.go |
Adds validation cases for invalid history values. |
internal/config/loader_test.go |
Ensures history loads from YAML and defaults properly. |
internal/app/logs/hub.go |
Adds ring-buffer history and replays history on client registration. |
internal/app/logs/server.go |
Sizes client send queue for replay + live logs; refactors write loop into writePump. |
internal/app/logs/hub_test.go |
Adds tests for history storage, replay behavior, and ring buffer behavior. |
internal/app/logs/server_test.go |
Updates hub construction signature and adds server-level replay tests. |
internal/app/logs/broadcast_test.go |
Expands message-type constant assertions to include status. |
e2e/default_tier_test.go |
Adds an e2e test intended to validate replay-visible log output. |
docs/src/pages/docs/configuration.astro |
Documents logs.history and clarifies logs.buffer. |
.golangci.yaml |
Excludes gosec from e2e/ (incl. non-*_test.go files). |
.github/workflows/checks.yaml |
Bumps golangci-lint version used in PR checks. |
.github/workflows/master.yaml |
Bumps golangci-lint version used on master branch runs. |
Comment on lines
+490
to
+492
| sentinel, err := net.Dial("unix", s.SocketPath()) | ||
| require.NoError(t, err) | ||
|
|
internal/app/logs/server_test.go
Outdated
Comment on lines
+553
to
+567
| s := &server{ | ||
| bufferSize: 100, | ||
| historySize: 500, | ||
| } | ||
|
|
||
| client := NewClientConn("test", s.bufferSize+s.historySize) | ||
| assert.Equal(t, 600, cap(client.SendChan)) | ||
|
|
||
| s2 := &server{ | ||
| bufferSize: cfg.Logs.Buffer, | ||
| historySize: 100, | ||
| } | ||
|
|
||
| client2 := NewClientConn("test2", s2.bufferSize+s2.historySize) | ||
| assert.Equal(t, cfg.Logs.Buffer+100, cap(client2.SendChan)) |
Comment on lines
+389
to
+392
| func Test_Broadcast_StoresInHistory(t *testing.T) { | ||
| log := testLogger() | ||
| ctx, cancel := context.WithCancel(t.Context()) | ||
| h := NewHub(100, 10, log).(*hub) |
Comment on lines
+464
to
+568
| func Test_Server_handleConnection_ReplayHistory(t *testing.T) { | ||
| ctrl := gomock.NewController(t) | ||
| defer ctrl.Finish() | ||
|
|
||
| ctx, cancel := context.WithCancel(context.Background()) | ||
|
|
||
| cfg := config.DefaultConfig() | ||
|
|
||
| mockLogger := logger.NewMockLogger(ctrl) | ||
| mockLogger.EXPECT().Info().Return(nil).AnyTimes() | ||
| mockLogger.EXPECT().Debug().Return(nil).AnyTimes() | ||
| mockLogger.EXPECT().Warn().Return(nil).AnyTimes() | ||
| mockLogger.EXPECT().Error().Return(nil).AnyTimes() | ||
|
|
||
| s := &server{ | ||
| bufferSize: cfg.Logs.Buffer, | ||
| historySize: cfg.Logs.History, | ||
| hub: NewHub(cfg.Logs.Buffer, cfg.Logs.History, mockLogger), | ||
| log: mockLogger, | ||
| } | ||
|
|
||
| err := s.Start(ctx, "test-replay", []string{"api"}) | ||
| require.NoError(t, err) | ||
|
|
||
| defer os.Remove(s.SocketPath()) | ||
|
|
||
| sentinel, err := net.Dial("unix", s.SocketPath()) | ||
| require.NoError(t, err) | ||
|
|
||
| sentinelReq := SubscribeRequest{Type: MessageSubscribe, Services: []string{"api"}} | ||
| sentinelData, err := json.Marshal(sentinelReq) | ||
| require.NoError(t, err) | ||
|
|
||
| sentinelData = append(sentinelData, '\n') | ||
|
|
||
| _, err = sentinel.Write(sentinelData) | ||
| require.NoError(t, err) | ||
|
|
||
| sentinelReader := bufio.NewReader(sentinel) | ||
|
|
||
| _, err = sentinelReader.ReadBytes('\n') | ||
| require.NoError(t, err) | ||
|
|
||
| s.Broadcast("api", "before-connect") | ||
|
|
||
| _, err = sentinelReader.ReadBytes('\n') | ||
| require.NoError(t, err) | ||
|
|
||
| conn, err := net.Dial("unix", s.SocketPath()) | ||
| require.NoError(t, err) | ||
|
|
||
| subscribeReq := SubscribeRequest{Type: MessageSubscribe, Services: []string{"api"}} | ||
| data, err := json.Marshal(subscribeReq) | ||
| require.NoError(t, err) | ||
|
|
||
| data = append(data, '\n') | ||
|
|
||
| _, err = conn.Write(data) | ||
| require.NoError(t, err) | ||
|
|
||
| reader := bufio.NewReader(conn) | ||
|
|
||
| statusLine, err := reader.ReadBytes('\n') | ||
| require.NoError(t, err) | ||
|
|
||
| var status StatusMessage | ||
|
|
||
| err = json.Unmarshal(statusLine, &status) | ||
| require.NoError(t, err) | ||
| assert.Equal(t, MessageStatus, status.Type) | ||
|
|
||
| replayLine, err := reader.ReadBytes('\n') | ||
| require.NoError(t, err) | ||
|
|
||
| var replayMsg LogMessage | ||
|
|
||
| err = json.Unmarshal(replayLine, &replayMsg) | ||
| require.NoError(t, err) | ||
| assert.Equal(t, MessageLog, replayMsg.Type) | ||
| assert.Equal(t, "before-connect", replayMsg.Message) | ||
|
|
||
| conn.Close() | ||
| cancel() | ||
| s.Stop() | ||
| } | ||
|
|
||
| func Test_Server_ClientQueueSizing(t *testing.T) { | ||
| cfg := config.DefaultConfig() | ||
|
|
||
| s := &server{ | ||
| bufferSize: 100, | ||
| historySize: 500, | ||
| } | ||
|
|
||
| client := NewClientConn("test", s.bufferSize+s.historySize) | ||
| assert.Equal(t, 600, cap(client.SendChan)) | ||
|
|
||
| s2 := &server{ | ||
| bufferSize: cfg.Logs.Buffer, | ||
| historySize: 100, | ||
| } | ||
|
|
||
| client2 := NewClientConn("test2", s2.bufferSize+s2.historySize) | ||
| assert.Equal(t, cfg.Logs.Buffer+100, cap(client2.SendChan)) | ||
| } |
Comment on lines
+59
to
+74
| // newRingBuffer creates a ring buffer with the given capacity | ||
| func newRingBuffer(capacity int) *ringBuffer { | ||
| return &ringBuffer{ | ||
| items: make([]LogMessage, capacity), | ||
| } | ||
| } | ||
|
|
||
| // push adds a message, overwriting the oldest if full | ||
| func (r *ringBuffer) push(msg LogMessage) { | ||
| r.items[r.head] = msg | ||
|
|
||
| r.head = (r.head + 1) % len(r.items) | ||
| if r.count < len(r.items) { | ||
| r.count++ | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Enhance hub to store log history in ring buffer