Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 57 additions & 24 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,6 @@ func (c *ctrlBackend) handleRequest(ctx context.Context, req any) {
}
}

func sendWorker(ctx context.Context, ch chan Resource, r Resource) {
r.SetBusy(true)
select {
case <-ctx.Done():
case ch <- r:
}
}

func sendWorkerSynchronous(ctx context.Context, ch chan synchronousRequest, r synchronousRequest) {
r.resource.SetBusy(true)
select {
Expand Down Expand Up @@ -159,26 +151,67 @@ func (c *ctrlBackend) loop(ctx context.Context, readywg, donewg *sync.WaitGroup)
readywg.Done()
defer c.traceSink.Put(ctx, "httprc controller: stopping main controller loop")
defer donewg.Done()

var pending []Resource
for {
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: waiting for request or tick (tick interval=%s)", c.tickInterval))
select {
case req := <-c.incoming:
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: got request %T", req))
c.handleRequest(ctx, req)
case t := <-c.check.C:
c.periodicCheck(ctx, t)
case <-ctx.Done():
return
if len(pending) > 0 {
// Dispatch pending items while remaining responsive to incoming
// requests. This prevents a deadlock where periodicCheck blocks
// on c.outgoing while a worker blocks on c.incoming (issue #113).

// Skip resources that were removed (or replaced) after periodicCheck
// queued them. Without this check, a stale resource could be sent to
// a worker, causing an unnecessary fetch and a subsequent
// adjustIntervalRequest for a resource that is no longer registered.
r := pending[0]
// Compare interface values directly. This is safe because all
// Resource implementations are pointer types (*ResourceBase[T]),
// so the comparison is a pointer identity check.
if cur, ok := c.items[r.URL()]; !ok || cur != r {
Comment thread
lestrrat marked this conversation as resolved.
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: skipping pending resource %q (no longer registered or replaced)", r.URL()))
r.SetBusy(false)
Comment thread
lestrrat marked this conversation as resolved.
pending = pending[1:]
continue
}

c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: dispatching pending resource %q to worker pool (%d remaining)", pending[0].URL(), len(pending)))
select {
case req := <-c.incoming:
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: got request %T (while dispatching)", req))
c.handleRequest(ctx, req)
case c.outgoing <- pending[0]:
Comment thread
lestrrat marked this conversation as resolved.
pending = pending[1:]
Comment thread
lestrrat marked this conversation as resolved.
case t := <-c.check.C:
pending = append(pending, c.periodicCheck(ctx, t)...)
case <-ctx.Done():
return
}
} else {
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: waiting for request or tick (tick interval=%s)", c.tickInterval))
select {
case req := <-c.incoming:
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: got request %T", req))
c.handleRequest(ctx, req)
case t := <-c.check.C:
pending = c.periodicCheck(ctx, t)
case <-ctx.Done():
return
Comment thread
lestrrat marked this conversation as resolved.
}
}
}
}

func (c *ctrlBackend) periodicCheck(ctx context.Context, t time.Time) {
// periodicCheck examines all registered resources and returns those that are
// due for refresh. Items are marked busy here so they won't be selected again
// on the next tick. The caller (loop) is responsible for dispatching them to
// the worker pool, interleaved with incoming request handling, to avoid the
// deadlock described in https://github.com/lestrrat-go/httprc/issues/113.
func (c *ctrlBackend) periodicCheck(ctx context.Context, t time.Time) []Resource {
c.traceSink.Put(ctx, "httprc controller: START periodic check")
defer c.traceSink.Put(ctx, "httprc controller: END periodic check")
var minNext time.Time
var dispatched int
minInterval := -1 * time.Second
var toDispatch []Resource
for _, item := range c.items {
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: checking resource %q", item.URL()))

Expand All @@ -196,14 +229,13 @@ func (c *ctrlBackend) periodicCheck(ctx context.Context, t time.Time) {
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: resource %q is busy or not ready yet, skipping", item.URL()))
continue
}
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: resource %q is ready, dispatching to worker pool", item.URL()))
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: resource %q is ready, queuing for dispatch", item.URL()))

dispatched++
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: dispatching resource %q to worker pool", item.URL()))
sendWorker(ctx, c.outgoing, item)
item.SetBusy(true)
toDispatch = append(toDispatch, item)
}

c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: dispatched %d resources", dispatched))
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: queued %d resources for dispatch", len(toDispatch)))

// Next check is always at the earliest next check + 1 second.
// The extra second makes sure that we are _past_ the actual next check time
Expand All @@ -223,6 +255,7 @@ func (c *ctrlBackend) periodicCheck(ctx context.Context, t time.Time) {
}

c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: next check in %s", c.tickInterval))
return toDispatch
}

func (c *ctrlBackend) SetTickInterval(d time.Duration) {
Expand Down
105 changes: 105 additions & 0 deletions httprc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,3 +859,108 @@ func TestIntegration_multiple_ready_calls_after_err_not_ready(t *testing.T) {
require.NoError(t, err, "should still be able to get data")
require.Equal(t, "ok", data2["status"])
}

// TestPeriodicCheckDeadlock reproduces the deadlock described in
// https://github.com/lestrrat-go/httprc/issues/113
//
// The deadlock occurs when:
// 1. A periodic check in the controller iterates ready resources and
// queues work for them by sending items on c.outgoing.
// 2. c.outgoing is a buffered channel with capacity numWorkers+1. With
// numWorkers=1, it can hold 2 items. With 1 worker and N>2 ready
// resources, the controller eventually blocks trying to send the 3rd
// item to c.outgoing once the buffer is full.
// 3. The worker that picked up the 1st item finishes and attempts to send
// a control message (such as an interval adjustment) to w.incoming
// (= c.incoming).
// 4. But c.incoming is read by the controller loop, which is currently
// blocked trying to send to c.outgoing.
// 5. Circular wait → deadlock.
//
// The test registers multiple resources with 1 worker and short refresh
// intervals, waits for a periodic check to fire, then attempts to register
// a new resource (which also sends to c.incoming). If the deadlock is
// present, the new registration will hang until the context deadline.
func TestPeriodicCheckDeadlock(t *testing.T) {
t.Parallel()

const (
numResources = 6
numWorkers = 1
minRefreshInterval = time.Second
maxRefreshInterval = 2 * time.Second
)

srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`{"status":"ok"}`))
}))
t.Cleanup(srv.Close)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

traceDst := io.Discard
if testing.Verbose() {
traceDst = os.Stderr
}

cl := httprc.NewClient(
httprc.WithWorkers(numWorkers),
httprc.WithTraceSink(tracesink.NewSlog(slog.New(slog.NewJSONHandler(traceDst, nil)))),
)
ctrl, err := cl.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() { ctrl.Shutdown(time.Second) })

// Register N resources, all with the same short refresh interval.
// Each resource uses a distinct URL path so they are treated as separate items.
for i := range numResources {
url := srv.URL + "/resource/" + strconv.Itoa(i)
r, err := httprc.NewResource[map[string]string](
url,
httprc.JSONTransformer[map[string]string](),
httprc.WithMinInterval(minRefreshInterval),
httprc.WithMaxInterval(maxRefreshInterval),
)
require.NoError(t, err, "NewResource should succeed for resource %d", i)

addCtx, addCancel := context.WithTimeout(ctx, 10*time.Second)
err = ctrl.Add(addCtx, r)
addCancel()
require.NoError(t, err, "Add should succeed for resource %d", i)
}

// Wait long enough for all resources to become due for periodic refresh.
// maxRefreshInterval + 1s gives headroom for the tick to fire.
time.Sleep(maxRefreshInterval + time.Second)

// Now attempt to register a new resource. This sends an addRequest to
// c.incoming. If the deadlock is present, periodicCheck is blocked
// sending to c.outgoing while the worker is blocked sending to
// c.incoming, so this Add will hang.
newURL := srv.URL + "/resource/new"
newR, err := httprc.NewResource[map[string]string](
newURL,
httprc.JSONTransformer[map[string]string](),
httprc.WithMinInterval(minRefreshInterval),
httprc.WithMaxInterval(maxRefreshInterval),
)
require.NoError(t, err, "NewResource should succeed for new resource")

addCtx, addCancel := context.WithTimeout(ctx, 5*time.Second)
defer addCancel()

// Use WithWaitReady(false) so that a timeout here can only be caused by
// the controller deadlock, not by a slow initial fetch under backlog.
err = ctrl.Add(addCtx, newR, httprc.WithWaitReady(false))

// Before fix: context.DeadlineExceeded (Add hangs for 5s, then times out)
// After fix: succeeds promptly
require.NoError(t, err, "Add should not deadlock; got timeout indicating periodicCheck deadlock (issue #113)")
Comment thread
lestrrat marked this conversation as resolved.

// Verify the new resource is accessible
existing, lookupErr := ctrl.Lookup(ctx, newURL)
require.NoError(t, lookupErr, "Lookup should find the newly added resource")
require.Equal(t, newURL, existing.URL())
}
1 change: 1 addition & 0 deletions proxysink/proxysink.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (p *Proxy[T]) flushloop(ctx context.Context) {
p.mu.Unlock()
return
}
p.mu.Unlock()
default:
}

Expand Down