Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/checks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
# NOTE: https://github.com/golangci/golangci-lint-action/releases/tag/v9.2.0
uses: golangci/golangci-lint-action@1e7e51e771db61008b38414a730f564565cf7c20
with:
version: v2.8.0
version: v2.11.3
vet:
name: Vet
permissions:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/master.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
# NOTE: https://github.com/golangci/golangci-lint-action/releases/tag/v9.2.0
uses: golangci/golangci-lint-action@1e7e51e771db61008b38414a730f564565cf7c20
with:
version: v2.8.0
version: v2.11.3
vet:
name: Vet
permissions:
Expand Down
1 change: 1 addition & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ linters:
text: unused-parameter
- linters:
- errcheck
- gosec
path: ^e2e/
- linters:
- forbidigo
Expand Down
5 changes: 3 additions & 2 deletions docs/src/pages/docs/configuration.astro
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,11 @@ profiles:

<hr />

<h2>Log Streaming Buffer</h2>
<h2>Log Streaming</h2>

<CodeEditor title="fuku.yaml" lang="yaml" code={`logs:
buffer: 1000 # Socket log streaming buffer size (default: 1000)`} />
buffer: 1000 # Broadcast channel depth for socket log streaming (default: 1000)
history: 5000 # Number of recent log messages kept in memory for replay (default: 5000)`} />

<hr />

Expand Down
26 changes: 26 additions & 0 deletions e2e/default_tier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,29 @@ func Test_DefaultTier_LogsCommand(t *testing.T) {
assert.Contains(t, output, "services:")
assert.Contains(t, output, "2 running")
}

func Test_DefaultTier_LogsReplay(t *testing.T) {
runner := NewRunner(t, "testdata/default-tier")
defer runner.Stop()

err := runner.Start("default")
require.NoError(t, err)

err = runner.WaitForRunning(15 * time.Second)
require.NoError(t, err)

logsRunner := NewLogsRunner(t, "testdata/default-tier")
defer logsRunner.Stop()

err = logsRunner.Start("default")
require.NoError(t, err)

err = logsRunner.WaitForLog("service_ready", 5*time.Second)
require.NoError(t, err)

output := logsRunner.Output()

assert.Contains(t, output, "service_ready")
assert.Contains(t, output, "profile:")
assert.Contains(t, output, "default")
}
1 change: 1 addition & 0 deletions internal/app/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ var (
ErrInvalidRetryAttempts = errors.New("retry attempts must be greater than 0")
ErrInvalidRetryBackoff = errors.New("retry backoff must not be negative")
ErrInvalidLogsBuffer = errors.New("logs buffer must be greater than 0")
ErrInvalidLogsHistory = errors.New("logs history must be greater than 0")

ErrProfileNotFound = errors.New("profile not found")
ErrUnsupportedProfileFormat = errors.New("unsupported profile format")
Expand Down
1 change: 1 addition & 0 deletions internal/app/logs/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
func Test_MessageType_Constants(t *testing.T) {
assert.Equal(t, MessageSubscribe, MessageType("subscribe"))
assert.Equal(t, MessageLog, MessageType("log"))
assert.Equal(t, MessageStatus, MessageType("status"))
}

func Test_SubscribeRequest_Marshal(t *testing.T) {
Expand Down
61 changes: 59 additions & 2 deletions internal/app/logs/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,25 +49,63 @@ func (c *ClientConn) ShouldReceive(service string) bool {
return c.Services[service]
}

// ringBuffer is a fixed-size circular buffer for log message history
type ringBuffer struct {
items []LogMessage
head int
count int
}

// newRingBuffer creates a ring buffer with the given capacity
func newRingBuffer(capacity int) *ringBuffer {
return &ringBuffer{
items: make([]LogMessage, capacity),
}
}

// push adds a message, overwriting the oldest if full
func (r *ringBuffer) push(msg LogMessage) {
r.items[r.head] = msg

r.head = (r.head + 1) % len(r.items)
if r.count < len(r.items) {
r.count++
}
}
Comment on lines +59 to +74

// forEach iterates from oldest to newest
func (r *ringBuffer) forEach(fn func(LogMessage)) {
if r.count == 0 {
return
}

start := (r.head - r.count + len(r.items)) % len(r.items)
for i := range r.count {
fn(r.items[(start+i)%len(r.items)])
}
}

// hub implements the Hub interface
type hub struct {
clients map[*ClientConn]bool
register chan *ClientConn
unregister chan *ClientConn
broadcast chan LogMessage
done chan struct{}
history *ringBuffer
log logger.Logger
dropped atomic.Int64
}

// NewHub creates a new Hub instance with the specified buffer size
func NewHub(bufferSize int, log logger.Logger) Hub {
// NewHub creates a new Hub instance with the specified buffer and history sizes
func NewHub(bufferSize int, historySize int, log logger.Logger) Hub {
return &hub{
clients: make(map[*ClientConn]bool),
register: make(chan *ClientConn),
unregister: make(chan *ClientConn),
broadcast: make(chan LogMessage, bufferSize),
done: make(chan struct{}),
history: newRingBuffer(historySize),
log: log,
}
}
Expand Down Expand Up @@ -125,12 +163,31 @@ func (h *hub) Run(ctx context.Context) {
}
case client := <-h.register:
h.clients[client] = true

replayed := 0

h.history.forEach(func(msg LogMessage) {
if !client.ShouldReceive(msg.Service) {
return
}

select {
case client.SendChan <- msg:
replayed++
default:
h.dropped.Add(1)
}
})

h.log.Debug().Msgf("Client %s registered, replayed %d messages", client.ID, replayed)
case client := <-h.unregister:
if _, ok := h.clients[client]; ok {
close(client.SendChan)
delete(h.clients, client)
}
case msg := <-h.broadcast:
h.history.push(msg)

for client := range h.clients {
if client.ShouldReceive(msg.Service) {
select {
Expand Down
Loading
Loading