diff --git a/bitswap/network/httpnet/breaker.go b/bitswap/network/httpnet/breaker.go new file mode 100644 index 000000000..350364c0d --- /dev/null +++ b/bitswap/network/httpnet/breaker.go @@ -0,0 +1,48 @@ +package httpnet + +import ( + "sync" + "sync/atomic" +) + +// breaker holds one shared serverError counter per HTTP endpoint. +// +// Without sharing, every (peer, URL) pair has its own counter and a +// flaky host can drain the per-peer quota of every peer that maps to +// it before any breaker trips. When N peer IDs resolve to one gateway +// the host accumulates N x MaxRetries attempts before all peers +// disconnect; the shared counter trips the breaker once for the whole +// gateway. +// +// Counters are returned as pointers so callers (senderURL) can use +// atomic.Int64 directly without going through the breaker on the hot +// path. +type breaker struct { + mu sync.Mutex + counters map[string]*atomic.Int64 +} + +func newBreaker() *breaker { + return &breaker{counters: make(map[string]*atomic.Int64)} +} + +// counter returns the shared per-host serverErrors counter for key, +// allocating one on first use. +func (b *breaker) counter(key string) *atomic.Int64 { + b.mu.Lock() + defer b.mu.Unlock() + if c, ok := b.counters[key]; ok { + return c + } + c := new(atomic.Int64) + b.counters[key] = c + return c +} + +// reset drops the counter for key. Called when no peer is using the +// host so a future reconnect starts fresh. +func (b *breaker) reset(key string) { + b.mu.Lock() + delete(b.counters, key) + b.mu.Unlock() +} diff --git a/bitswap/network/httpnet/breaker_test.go b/bitswap/network/httpnet/breaker_test.go new file mode 100644 index 000000000..f14c32ca2 --- /dev/null +++ b/bitswap/network/httpnet/breaker_test.go @@ -0,0 +1,74 @@ +package httpnet + +import ( + "sync" + "testing" +) + +func TestBreaker_CounterIsSharedPerKey(t *testing.T) { + b := newBreaker() + a1 := b.counter("k") + a2 := b.counter("k") + if a1 != a2 { + t.Fatalf("counter(k) returned different pointers across calls (%p vs %p)", a1, a2) + } + a1.Add(3) + if got := a2.Load(); got != 3 { + t.Fatalf("a2 sees %d, want 3 (counters must alias)", got) + } +} + +func TestBreaker_DistinctKeysAreIndependent(t *testing.T) { + b := newBreaker() + x := b.counter("x") + y := b.counter("y") + x.Add(7) + if got := y.Load(); got != 0 { + t.Fatalf("y leaked from x: y=%d, want 0", got) + } +} + +func TestBreaker_ResetDropsCounter(t *testing.T) { + b := newBreaker() + c1 := b.counter("k") + c1.Add(5) + b.reset("k") + + // After reset, a fresh counter is allocated; it must not carry the + // old value, and it must be a different pointer than the old one. + c2 := b.counter("k") + if c2 == c1 { + t.Fatalf("reset returned the same pointer; want a fresh counter") + } + if got := c2.Load(); got != 0 { + t.Fatalf("fresh counter has value %d, want 0", got) + } + // The old pointer is still usable but orphaned: incrementing it + // must not affect the new counter. + c1.Add(1) + if got := c2.Load(); got != 0 { + t.Fatalf("new counter affected by orphaned pointer: %d, want 0", got) + } +} + +func TestBreaker_ConcurrentCounterAndReset(t *testing.T) { + // Smoke test: counter() and reset() running concurrently must not + // deadlock or race-detect. + b := newBreaker() + + var wg sync.WaitGroup + const N = 20 + for range N { + wg.Go(func() { + for range 100 { + b.counter("hot").Add(1) + } + }) + wg.Go(func() { + for range 100 { + b.reset("hot") + } + }) + } + wg.Wait() +} diff --git a/bitswap/network/httpnet/connect_test.go b/bitswap/network/httpnet/connect_test.go new file mode 100644 index 000000000..7f1205bee --- /dev/null +++ b/bitswap/network/httpnet/connect_test.go @@ -0,0 +1,121 @@ +package httpnet + +import ( + "context" + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" +) + +// countingProbeServer wraps the standard test handler and counts probe +// requests (any request to /ipfs/). +func countingProbeServer(t *testing.T) (srv *httptest.Server, probes *atomic.Int32) { + t.Helper() + probes = new(atomic.Int32) + inner := &Handler{bstore: makeBlockstore(t, 0, 0)} + wrapped := http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + if strings.HasSuffix(r.URL.Path, "/ipfs/"+pingCid) { + probes.Add(1) + } + inner.ServeHTTP(rw, r) + }) + srv = httptest.NewUnstartedServer(wrapped) + srv.EnableHTTP2 = true + srv.StartTLS() + t.Cleanup(srv.Close) + return srv, probes +} + +// TestConnectSkipsProbeForKnownEndpoint verifies that a second peer +// resolving to the same HTTP endpoint as a previously-connected peer +// avoids issuing a fresh probe. +// +// This is the expected pattern when delegated routing returns multiple +// peer IDs for one gateway. +func TestConnectSkipsProbeForKnownEndpoint(t *testing.T) { + ctx := context.Background() + + htnet, mn := mockNetwork(t, mockReceiver(t)) + peerA, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + peerB, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + + srv, probes := countingProbeServer(t) + + mustConnectToPeer(t, ctx, htnet, peerA, srv) + if got := probes.Load(); got != 1 { + t.Fatalf("after peerA connect: got %d probes, want 1", got) + } + + mustConnectToPeer(t, ctx, htnet, peerB, srv) + if got := probes.Load(); got != 1 { + t.Fatalf("after peerB connect: got %d probes, want 1 (probe should be deduped)", got) + } + + if !htnet.IsConnectedToPeer(ctx, peerB.ID()) { + t.Errorf("peerB should be marked connected after probe dedup") + } +} + +// TestConnectInheritsHEADSupport verifies that the second peer inherits +// the HEAD-support decision from the cached endpoint state. +func TestConnectInheritsHEADSupport(t *testing.T) { + ctx := context.Background() + + htnet, mn := mockNetwork(t, mockReceiver(t)) + peerA, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + peerB, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + + srv, _ := countingProbeServer(t) + mustConnectToPeer(t, ctx, htnet, peerA, srv) + if !supportsHave(htnet.host.Peerstore(), peerA.ID()) { + t.Fatal("peerA should support HEAD (the test server answers HEAD)") + } + + mustConnectToPeer(t, ctx, htnet, peerB, srv) + if !supportsHave(htnet.host.Peerstore(), peerB.ID()) { + t.Errorf("peerB should inherit HEAD support from peerA's cached probe") + } +} + +// TestConnectProbesNewEndpoint verifies that a peer pointing at an +// endpoint not yet in the cache still triggers a fresh probe. +func TestConnectProbesNewEndpoint(t *testing.T) { + ctx := context.Background() + + htnet, mn := mockNetwork(t, mockReceiver(t)) + peerA, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + peerB, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + + srvA, probesA := countingProbeServer(t) + srvB, probesB := countingProbeServer(t) + + mustConnectToPeer(t, ctx, htnet, peerA, srvA) + mustConnectToPeer(t, ctx, htnet, peerB, srvB) + + if got := probesA.Load(); got != 1 { + t.Errorf("server A: got %d probes, want 1", got) + } + if got := probesB.Load(); got != 1 { + t.Errorf("server B: got %d probes, want 1 (different endpoint, fresh probe expected)", got) + } +} diff --git a/bitswap/network/httpnet/cooldown.go b/bitswap/network/httpnet/cooldown.go index 1e9751146..5a61ea109 100644 --- a/bitswap/network/httpnet/cooldown.go +++ b/bitswap/network/httpnet/cooldown.go @@ -75,6 +75,16 @@ func (ct *cooldownTracker) remove(host string) { ct.urlsLock.Unlock() } +// onCooldown reports whether host has an active cooldown right now. +// Callers (e.g. the pinger) use this to honour Retry-After by skipping +// probes inside the window the gateway asked us to wait. +func (ct *cooldownTracker) onCooldown(host string) bool { + ct.urlsLock.RLock() + defer ct.urlsLock.RUnlock() + dl, ok := ct.urls[host] + return ok && time.Now().Before(dl) +} + func (ct *cooldownTracker) fillSenderURLs(urls []network.ParsedURL) []*senderURL { now := time.Now() surls := make([]*senderURL, len(urls)) diff --git a/bitswap/network/httpnet/cooldown_test.go b/bitswap/network/httpnet/cooldown_test.go new file mode 100644 index 000000000..d37d4a9c1 --- /dev/null +++ b/bitswap/network/httpnet/cooldown_test.go @@ -0,0 +1,46 @@ +package httpnet + +import ( + "testing" + "time" +) + +func TestCooldownTracker_OnCooldown(t *testing.T) { + ct := newCooldownTracker(time.Minute) + defer ct.stopCleaner() + + if ct.onCooldown("missing") { + t.Errorf("unset host reported on cooldown") + } + + ct.setByDuration("active", 10*time.Second) + if !ct.onCooldown("active") { + t.Errorf("active deadline not reported on cooldown") + } + + // A deadline in the past must not be reported as on cooldown. + ct.setByDate("expired", time.Now().Add(-1*time.Second)) + if ct.onCooldown("expired") { + t.Errorf("expired deadline reported on cooldown") + } + + ct.remove("active") + if ct.onCooldown("active") { + t.Errorf("removed host still reported on cooldown") + } +} + +func TestCooldownTracker_MaxBackoffCap(t *testing.T) { + ct := newCooldownTracker(2 * time.Second) + defer ct.stopCleaner() + + // Asking for 1 hour must be clamped to maxBackoff. + ct.setByDuration("clamp", time.Hour) + + ct.urlsLock.RLock() + dl := ct.urls["clamp"] + ct.urlsLock.RUnlock() + if got := time.Until(dl); got > 3*time.Second { + t.Errorf("setByDuration not clamped: %s remaining, want <=2s+slack", got) + } +} diff --git a/bitswap/network/httpnet/error_tracker.go b/bitswap/network/httpnet/error_tracker.go index 72eb6ae1c..bf767084d 100644 --- a/bitswap/network/httpnet/error_tracker.go +++ b/bitswap/network/httpnet/error_tracker.go @@ -3,46 +3,65 @@ package httpnet import ( "errors" "sync" - - "github.com/libp2p/go-libp2p/core/peer" ) -var errThresholdCrossed = errors.New("the peer crossed the error threshold") +var errThresholdCrossed = errors.New("the host crossed the error threshold") +// errorTracker counts client errors (e.g. 404) per HTTP endpoint. +// +// Counts are keyed on endpointKey, not on peer.ID. Multiple peer IDs +// that resolve to the same gateway contribute to one shared counter, +// so a misbehaving host trips the breaker once for all peers using it +// rather than after every peer drained its own quota. type errorTracker struct { - ht *Network - - mux sync.RWMutex - errors map[peer.ID]int -} - -func newErrorTracker(ht *Network) *errorTracker { - return &errorTracker{ - ht: ht, - errors: make(map[peer.ID]int), - } + mu sync.Mutex + counts map[string]int } -func (et *errorTracker) stopTracking(p peer.ID) { - et.mux.Lock() - delete(et.errors, p) - et.mux.Unlock() +func newErrorTracker() *errorTracker { + return &errorTracker{counts: make(map[string]int)} } -// logErrors adds n to the current error count for p. If the total count is above the threshold, then an error is returned. If n is 0, the the total count is reset to 0. -func (et *errorTracker) logErrors(p peer.ID, n int, threshold int) error { - et.mux.Lock() - defer et.mux.Unlock() +// logErrors attributes n client errors to every host backing urls. +// When n is zero, the counters for those hosts reset (a successful +// SendMsg signals the hosts are healthy). +// +// Returns errThresholdCrossed when any host's count exceeds threshold. +// The caller decides what to do with that signal; today it disconnects +// the peer that triggered the check. Other peers using the same host +// are caught lazily on their next SendMsg. +func (et *errorTracker) logErrors(urls []*senderURL, n, threshold int) error { + if len(urls) == 0 { + return nil + } + et.mu.Lock() + defer et.mu.Unlock() - if n == 0 { // reset error count - delete(et.errors, p) + if n == 0 { + for _, u := range urls { + delete(et.counts, endpointKey(u.URL.Scheme, u.URL.Host, u.SNI)) + } return nil } - count := et.errors[p] - total := count + n - et.errors[p] = total - if total > threshold { + + var tripped bool + for _, u := range urls { + key := endpointKey(u.URL.Scheme, u.URL.Host, u.SNI) + et.counts[key] += n + if et.counts[key] > threshold { + tripped = true + } + } + if tripped { return errThresholdCrossed } return nil } + +// reset drops the count for key. Called when no peer is using the host +// so a future reconnect starts fresh. +func (et *errorTracker) reset(key string) { + et.mu.Lock() + delete(et.counts, key) + et.mu.Unlock() +} diff --git a/bitswap/network/httpnet/error_tracker_test.go b/bitswap/network/httpnet/error_tracker_test.go index 1f4f99a10..d0b17846f 100644 --- a/bitswap/network/httpnet/error_tracker_test.go +++ b/bitswap/network/httpnet/error_tracker_test.go @@ -1,96 +1,121 @@ package httpnet -// Write tests for the errorTracker implementation found in watcher.go import ( + "net/url" "sync" + "sync/atomic" "testing" + "time" - "github.com/libp2p/go-libp2p/core/peer" + "github.com/ipfs/boxo/bitswap/network" ) -func TestErrorTracker_StopTracking(t *testing.T) { - et := newErrorTracker(&Network{}) - p := peer.ID("testpeer") - - // Stop tracking the peer - et.stopTracking(p) - - // Check if the error count is removed - if _, ok := et.errors[p]; ok { - t.Errorf("Expected peer %s to be untracked but it was still tracked", p) +// makeTestURLs returns n senderURLs each pointing at a distinct synthetic +// host. Counters and cooldowns are initialized so the senderURLs are +// safe to use against the per-host trackers. +func makeTestURLs(t *testing.T, hosts ...string) []*senderURL { + t.Helper() + out := make([]*senderURL, len(hosts)) + for i, h := range hosts { + u, err := url.Parse("https://" + h) + if err != nil { + t.Fatalf("parse %q: %v", h, err) + } + su := &senderURL{ + ParsedURL: network.ParsedURL{URL: u}, + serverErrors: new(atomic.Int64), + } + su.cooldown.Store(time.Time{}) + out[i] = su } + return out } func TestErrorTracker_LogErrors_Reset(t *testing.T) { - et := newErrorTracker(&Network{}) - p := peer.ID("testpeer") + et := newErrorTracker() + urls := makeTestURLs(t, "host-a") - // Log some errors - err := et.logErrors(p, 5, 10) - if err != nil { - t.Errorf("Expected no error when logging errors but got %v", err) + if err := et.logErrors(urls, 5, 10); err != nil { + t.Errorf("logging 5 below threshold: got err %v, want nil", err) } - // Reset error count - err = et.logErrors(p, 0, 10) - if err != nil { - t.Errorf("Expected no error when resetting error count but got %v", err) + if err := et.logErrors(urls, 0, 10); err != nil { + t.Errorf("reset: got err %v, want nil", err) } - // Check if the error count is reset to 0 - count := et.errors[p] - if count != 0 { - t.Errorf("Expected error count for peer %s to be 0 after reset but got %d", p, count) + key := endpointKey(urls[0].URL.Scheme, urls[0].URL.Host, urls[0].SNI) + if got := et.counts[key]; got != 0 { + t.Errorf("after reset: count=%d, want 0", got) } } func TestErrorTracker_LogErrors_ThresholdCrossed(t *testing.T) { - et := newErrorTracker(&Network{}) - p := peer.ID("testpeer") + et := newErrorTracker() + urls := makeTestURLs(t, "host-a") + + if err := et.logErrors(urls, 11, 10); err != errThresholdCrossed { + t.Errorf("logging above threshold: got err %v, want errThresholdCrossed", err) + } + + key := endpointKey(urls[0].URL.Scheme, urls[0].URL.Host, urls[0].SNI) + if got := et.counts[key]; got != 11 { + t.Errorf("after trip: count=%d, want 11", got) + } +} - // Log errors until threshold is crossed - err := et.logErrors(p, 11, 10) - if err != errThresholdCrossed { - t.Errorf("Expected errorThresholdCrossed when logging errors above threshold but got %v", err) +// TestErrorTracker_SharesCounterAcrossPeers verifies that two peers +// resolving to the same HTTP endpoint share one error counter, so the +// breaker trips after one combined threshold rather than after each +// peer drains its own quota. +func TestErrorTracker_SharesCounterAcrossPeers(t *testing.T) { + et := newErrorTracker() + peerA := makeTestURLs(t, "shared-host") + peerB := makeTestURLs(t, "shared-host") + + // Peer A logs 60 errors; threshold 100 not yet crossed. + if err := et.logErrors(peerA, 60, 100); err != nil { + t.Errorf("peerA: got err %v, want nil", err) + } + // Peer B logs 50 errors; combined 110 trips the breaker. + if err := et.logErrors(peerB, 50, 100); err != errThresholdCrossed { + t.Errorf("peerB: got err %v, want errThresholdCrossed", err) } +} - // Check if the error count reflects the logged errors - count, ok := et.errors[p] - if !ok { - t.Errorf("Expected peer %s to be tracked but it was not", p) +func TestErrorTracker_DistinctHostsTrackIndependently(t *testing.T) { + et := newErrorTracker() + a := makeTestURLs(t, "host-a") + b := makeTestURLs(t, "host-b") + + if err := et.logErrors(a, 11, 10); err != errThresholdCrossed { + t.Errorf("host-a: got err %v, want errThresholdCrossed", err) } - if count != 11 { - t.Errorf("Expected error count for peer %s to be 10 after logging errors above threshold but got %d", p, count) + if err := et.logErrors(b, 5, 10); err != nil { + t.Errorf("host-b: got err %v, want nil (independent of host-a)", err) } } -// Write a test that tests concurrent access to the methods +// TestErrorTracker_ConcurrentAccess verifies safety under concurrent +// goroutines incrementing the same host counter. func TestErrorTracker_ConcurrentAccess(t *testing.T) { - et := newErrorTracker(&Network{}) - p := peer.ID("testpeer") + et := newErrorTracker() + urls := makeTestURLs(t, "host-a") var wg sync.WaitGroup - numRoutines := 10 - threshold := 100 + const numRoutines = 10 + const threshold = 100 for range numRoutines { wg.Go(func() { - for j := 0; j < threshold/numRoutines; j++ { - et.logErrors(p, 1, threshold) + for range threshold / numRoutines { + et.logErrors(urls, 1, threshold) } }) } - wg.Wait() - // Check if the error count is correct - count, ok := et.errors[p] - if !ok { - t.Errorf("Expected peer %s to be tracked but it was not", p) - } - expectedCount := threshold - actualCount := count - if actualCount != expectedCount { - t.Errorf("Expected error count for peer %s to be %d after concurrent logging but got %d", p, expectedCount, actualCount) + key := endpointKey(urls[0].URL.Scheme, urls[0].URL.Host, urls[0].SNI) + if got := et.counts[key]; got != threshold { + t.Errorf("after concurrent logging: count=%d, want %d", got, threshold) } } diff --git a/bitswap/network/httpnet/httpnet.go b/bitswap/network/httpnet/httpnet.go index 7be48267d..0fffcf5ce 100644 --- a/bitswap/network/httpnet/httpnet.go +++ b/bitswap/network/httpnet/httpnet.go @@ -234,6 +234,8 @@ type Network struct { errorTracker *errorTracker requestTracker *requestTracker cooldownTracker *cooldownTracker + inflight *inflightTracker + breaker *breaker ongoingConnsLock sync.RWMutex ongoingConns map[peer.ID]struct{} @@ -302,6 +304,9 @@ func New(host host.Host, opts ...Option) network.BitSwapNetwork { cooldownTracker := newCooldownTracker(DefaultMaxBackoff) htnet.cooldownTracker = cooldownTracker + htnet.inflight = newInflightTracker() + htnet.breaker = newBreaker() + netdialer := &net.Dialer{ // Timeout for connects to complete. Timeout: htnet.dialTimeout, @@ -360,11 +365,10 @@ func New(host host.Host, opts ...Option) network.BitSwapNetwork { } htnet.client = c - pinger := newPinger(htnet, pingCid) + pinger := newPinger(htnet) htnet.pinger = pinger - et := newErrorTracker(htnet) - htnet.errorTracker = et + htnet.errorTracker = newErrorTracker() for i := 0; i < htnet.httpWorkers; i++ { go htnet.httpWorker(i) @@ -423,7 +427,11 @@ func (ht *Network) senderURLs(p peer.ID) []*senderURL { if len(urls) == 0 { return nil } - return ht.cooldownTracker.fillSenderURLs(urls) + surls := ht.cooldownTracker.fillSenderURLs(urls) + for _, su := range surls { + su.serverErrors = ht.breaker.counter(endpointKey(su.URL.Scheme, su.URL.Host, su.SNI)) + } + return surls } // IsHTTPPeer returns true if the peer is currently being pinged, which means @@ -531,22 +539,29 @@ func (ht *Network) Connect(ctx context.Context, pi peer.AddrInfo) error { urls = urls[0:ht.maxHTTPAddressesPerPeer] } - // Try to talk to the peer by making HTTP requests to its urls and - // recording which ones work. This allows re-using the connections - // that we are about to open next time with the client. We call - // peer.Connected() on success. + // Probe each URL to confirm the gateway speaks our protocol. URLs + // already proven working by another peer (delegated routing returns + // multiple peer IDs for one gateway) skip the probe entirely and + // inherit the cached HEAD-support decision. var workingAddrs []multiaddr.Multiaddr supportsHead := true for _, u := range urls { - // If head works we assume GET works too. - status, err := ht.connectToURL(ctx, pi.ID, u, "HEAD") + if method, ok := ht.pinger.endpointKnown(u); ok { + workingAddrs = append(workingAddrs, u.Multiaddress) + if method != http.MethodHead { + supportsHead = false + } + log.Debugf("skipping probe for %s: endpoint already known via another peer", u.URL) + continue + } + + // If HEAD works we assume GET works too. + status, err := ht.connectToURL(ctx, pi.ID, u, http.MethodHead) if err != nil { errs = append(errs, fmt.Errorf("%s: %s", u.Multiaddress.String(), err)) - // abort if context cancelled if ctxErr := ctx.Err(); ctxErr != nil { return errors.Join(errs...) } - if status == http.StatusTooManyRequests { continue // do not try GET, just move on. } @@ -557,8 +572,7 @@ func (ht *Network) Connect(ctx context.Context, pi peer.AddrInfo) error { // HEAD did not work. Try GET. supportsHead = false - - _, err = ht.connectToURL(ctx, pi.ID, u, "GET") + _, err = ht.connectToURL(ctx, pi.ID, u, http.MethodGet) if err != nil { errs = append(errs, fmt.Errorf("%s: %s", u.Multiaddress.String(), err)) if ctxErr := ctx.Err(); ctxErr != nil { @@ -659,11 +673,11 @@ func (ht *Network) DisconnectFrom(ctx context.Context, p peer.ID) error { log.Debugf("disconnecting from %s", p) ht.connEvtMgr.Disconnected(p) // notify everywhere that we are going offline + // stopPinging decrements per-host refcounts and, if this peer was + // the last user of a host, drops the host's breaker and + // errorTracker entries so a future reconnect starts fresh. The + // cooldownTracker has its own TTL cleaner and survives independently. ht.pinger.stopPinging(p) - ht.errorTracker.stopTracking(p) - - // coolDownTracker: we leave untouched. We want to keep - // ongoing cooldowns there in case we reconnect to this peer. return nil } @@ -750,20 +764,16 @@ func (ht *Network) httpWorker(i int) { if serr != nil { switch serr.Type { case typeRetryLater: - // This error signals that we - // should retry but if things - // keep failing we consider it - // a serverError. When - // multiple urls, retries may - // happen on a different url. + // Retry-After is the gateway's "I am + // busy, wait" signal, not a server + // failure. Drop the URL from this + // SendMsg's retry pool so the loop + // terminates, but do not charge the + // per-host breaker: throttling must + // not escalate to a disconnect. retryLaterErrors++ if retryLaterErrors%2 == 0 { - // we retried same CID 2 times. No luck. - // Increase server errors. - // Start ignoring urls. - result.err.Type = typeServer urlIgnore = append(urlIgnore, u) - u.serverErrors.Add(1) } continue // retry request again case typeClient: diff --git a/bitswap/network/httpnet/httpnet_test.go b/bitswap/network/httpnet/httpnet_test.go index 80a9650e5..d3d0e912d 100644 --- a/bitswap/network/httpnet/httpnet_test.go +++ b/bitswap/network/httpnet/httpnet_test.go @@ -8,6 +8,7 @@ import ( "net/http/httptest" "net/url" "strings" + "sync/atomic" "testing" "time" @@ -341,27 +342,12 @@ func TestBestURL(t *testing.T) { } // add some bogus urls to test the sorting now := time.Now() - surls := []*senderURL{ - { - ParsedURL: network.ParsedURL{ - URL: urls[0], - }, - }, - { - ParsedURL: network.ParsedURL{ - URL: urls[1], - }, - }, - { - ParsedURL: network.ParsedURL{ - URL: urls[2], - }, - }, - { - ParsedURL: network.ParsedURL{ - URL: urls[3], - }, - }, + surls := make([]*senderURL, len(urls)) + for i := range urls { + surls[i] = &senderURL{ + ParsedURL: network.ParsedURL{URL: urls[i]}, + serverErrors: new(atomic.Int64), + } } surls[0].cooldown.Store(now.Add(time.Second)) @@ -629,6 +615,22 @@ func TestBackOff(t *testing.T) { if len(recv.donthaves) != 2 || (len(recv.blocks)+len(recv.haves)) > 0 { t.Error("no blocks should have been received while on backoff") } + + // Retry-After is "I am busy", not "I am broken". The shared per-host + // breaker must not have been incremented; otherwise peer2 would have + // inherited a non-zero count and bestURL would have tripped the + // fatal disconnect path instead of returning DONT_HAVE. + urls := network.ExtractURLsFromPeer(htnet.host.Peerstore().PeerInfo(peer.ID())) + if len(urls) == 0 { + t.Fatal("expected at least one URL on peer") + } + hostKey := endpointKey(urls[0].URL.Scheme, urls[0].URL.Host, urls[0].SNI) + htnet.breaker.mu.Lock() + c := htnet.breaker.counters[hostKey] + htnet.breaker.mu.Unlock() + if c != nil && c.Load() != 0 { + t.Errorf("breaker counter for %s = %d after Retry-After cycle, want 0", hostKey, c.Load()) + } } // Write a TestErrorTracking function which tests that a peer is disconnected when the treshold is crossed. @@ -676,3 +678,92 @@ func TestErrorTracking(t *testing.T) { t.Fatal(err) } } + +// TestSharedBreakerDisconnectsAcrossPeers verifies that when one peer +// triggers a real server error against a host shared with other peers, +// the per-host breaker is high enough that the next SendMsg from any +// peer using the same host disconnects too. Without sharing, each peer +// would burn its own MaxRetries quota before disconnecting. +func TestSharedBreakerDisconnectsAcrossPeers(t *testing.T) { + ctx := context.Background() + recv := mockReceiver(t) + htnet, mn := mockNetwork(t, recv) + + peerA, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + peerB, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + + srv := makeServer(t, 0, 0) + mustConnectToPeer(t, ctx, htnet, peerA, srv) + if err := recv.waitConnected(1); err != nil { + t.Fatal(err) + } + mustConnectToPeer(t, ctx, htnet, peerB, srv) + if err := recv.waitConnected(1); err != nil { + t.Fatal(err) + } + + // Drive a real server error from peerA against the shared host. The + // server returns 500 with empty body which lands in the default + // (typeServer) arm of handleResponse and increments the breaker. + msg := makeWantsMessage([]cid.Cid{errorCid}) + if err := htnet.SendMessage(ctx, peerA.ID(), msg); err != nil { + t.Fatal(err) + } + if err := recv.waitDisconnected(2); err != nil { + t.Fatalf("peerA should disconnect after typeServer trips MaxRetries: %v", err) + } + + // Counter is shared. peerB's senderURL inherits the same pointer at + // MessageSender construction time; bestURL trips fatal before any + // HTTP request even goes out. + wl := makeCids(t, 0, 1) + if err := htnet.SendMessage(ctx, peerB.ID(), makeWantsMessage(wl)); err != nil { + t.Fatal(err) + } + if err := recv.waitDisconnected(2); err != nil { + t.Fatalf("peerB should disconnect via the per-host shared breaker: %v", err) + } +} + +// TestTryURL_CooldownAutoExpiry verifies that a senderURL with a stale +// (already-elapsed) cooldown deadline does not block tryURL forever. +// fillSenderURLs only checks the deadline at construction time, so a +// long-lived senderURL needs tryURL itself to ignore expired deadlines. +func TestTryURL_CooldownAutoExpiry(t *testing.T) { + ctx := context.Background() + recv := mockReceiver(t) + htnet, mn := mockNetwork(t, recv) + + p, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + srv := makeServer(t, 0, 1) + mustConnectToPeer(t, ctx, htnet, p, srv) + + nms, err := htnet.NewMessageSender(ctx, p.ID(), nil) + if err != nil { + t.Fatal(err) + } + ms := nms.(*httpMsgSender) + + // Stamp an already-elapsed cooldown on the senderURL. + ms.urls[0].cooldown.Store(time.Now().Add(-1 * time.Hour)) + + wantBlocks := makeCids(t, 0, 1) + msg := makeWantsMessage(wantBlocks) + if err := nms.SendMsg(ctx, msg); err != nil { + t.Fatal(err) + } + recv.wait(1) + if len(recv.blocks) != 1 { + t.Errorf("expected the block through despite stale cooldown; got blocks=%d donthaves=%d", + len(recv.blocks), len(recv.donthaves)) + } +} diff --git a/bitswap/network/httpnet/inflight.go b/bitswap/network/httpnet/inflight.go new file mode 100644 index 000000000..827c2d0e9 --- /dev/null +++ b/bitswap/network/httpnet/inflight.go @@ -0,0 +1,109 @@ +package httpnet + +import ( + "strings" + "sync" +) + +// inflightTracker coalesces concurrent identical HTTP requests so that +// multiple peer IDs sharing one HTTP endpoint produce one round trip, +// not one per peer ID. +// +// Delegated routing routinely returns multiple peer IDs for one HTTP +// gateway. Bitswap creates one MessageQueue per peer ID, so a single +// want-have broadcast for one CID becomes N identical HEAD requests +// against the same upstream. Want-have probes that bitswap sends to +// non-best peers fan out the same way. The pattern compounds as HTTP +// retrieval grows. +// +// Coalescing applies only while a request is in flight. Once the leader +// returns, subsequent callers issue their own requests; we do not cache +// results. +type inflightTracker struct { + mu sync.Mutex + calls map[string]*inflightCall +} + +// inflightResult carries the response data shared from leader to waiters. +// +// On a successful round trip, statusCode, body, retryAfter, and +// requestURL are populated and err is nil. On failure, err and errType +// are populated; statusCode and body may also be set if the failure +// happened after the response arrived. +type inflightResult struct { + statusCode int + body []byte + retryAfter string + requestURL string + err error + errType senderErrorType // valid only when err != nil +} + +type inflightCall struct { + done chan struct{} + result *inflightResult +} + +func newInflightTracker() *inflightTracker { + return &inflightTracker{calls: make(map[string]*inflightCall)} +} + +// do runs fn once for the leader and returns the same result to every +// concurrent waiter on key. shared reports whether this caller waited on +// another leader. +// +// The key must distinguish requests that differ in any field that would +// change the upstream response: scheme, host, SNI, method, and CID. Two +// peers that agree on all five send identical bytes on the wire and may +// share the result. +func (t *inflightTracker) do(key string, fn func() *inflightResult) (result *inflightResult, shared bool) { + t.mu.Lock() + if c, ok := t.calls[key]; ok { + t.mu.Unlock() + <-c.done + return c.result, true + } + c := &inflightCall{done: make(chan struct{})} + t.calls[key] = c + t.mu.Unlock() + + c.result = fn() + + t.mu.Lock() + delete(t.calls, key) + t.mu.Unlock() + close(c.done) + + return c.result, false +} + +// endpointKey identifies a unique HTTP endpoint. SNI is part of the key +// because providers can advertise the same host with different SNI +// expectations and the upstream response may differ. +func endpointKey(scheme, host, sni string) string { + var sb strings.Builder + sb.Grow(len(scheme) + len(host) + len(sni) + 4) + sb.WriteString(scheme) + sb.WriteString("://") + sb.WriteString(host) + sb.WriteByte('|') + sb.WriteString(sni) + return sb.String() +} + +// inflightKey extends endpointKey with method and CID, identifying a +// unique HTTP request that can be coalesced across callers. +func inflightKey(scheme, host, sni, method, cid string) string { + var sb strings.Builder + sb.Grow(len(scheme) + len(host) + len(sni) + len(method) + len(cid) + 7) + sb.WriteString(scheme) + sb.WriteString("://") + sb.WriteString(host) + sb.WriteByte('|') + sb.WriteString(sni) + sb.WriteByte('|') + sb.WriteString(method) + sb.WriteByte('|') + sb.WriteString(cid) + return sb.String() +} diff --git a/bitswap/network/httpnet/inflight_test.go b/bitswap/network/httpnet/inflight_test.go new file mode 100644 index 000000000..0f675b088 --- /dev/null +++ b/bitswap/network/httpnet/inflight_test.go @@ -0,0 +1,159 @@ +package httpnet + +import ( + "errors" + "net/http" + "sync" + "sync/atomic" + "testing" + "testing/synctest" +) + +// TestInflightCoalescesConcurrentCallers verifies that concurrent callers +// on the same key share one underlying call and observe the same result. +func TestInflightCoalescesConcurrentCallers(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + tracker := newInflightTracker() + + var calls atomic.Int32 + gate := make(chan struct{}) + want := &inflightResult{statusCode: 200, body: []byte("hello")} + + fn := func() *inflightResult { + calls.Add(1) + <-gate + return want + } + + const N = 10 + results := make([]*inflightResult, N) + shareds := make([]bool, N) + + var wg sync.WaitGroup + wg.Add(N) + for i := range N { + go func(i int) { + defer wg.Done() + results[i], shareds[i] = tracker.do("k", fn) + }(i) + } + + // Wait until the leader is blocked on gate and every waiter + // is blocked on the inflightCall's done channel. + synctest.Wait() + close(gate) + wg.Wait() + + if got := calls.Load(); got != 1 { + t.Fatalf("fn called %d times, want 1", got) + } + leaders := 0 + for i, r := range results { + if r != want { + t.Fatalf("caller %d got result %v, want %v", i, r, want) + } + if !shareds[i] { + leaders++ + } + } + if leaders != 1 { + t.Fatalf("got %d leaders, want exactly 1", leaders) + } + }) +} + +// TestInflightDistinctKeysRunIndependently verifies that callers on +// different keys do not block each other and each runs fn. +func TestInflightDistinctKeysRunIndependently(t *testing.T) { + tracker := newInflightTracker() + + var calls atomic.Int32 + fn := func() *inflightResult { + calls.Add(1) + return &inflightResult{statusCode: 200} + } + + for _, key := range []string{"a", "b", "c"} { + _, shared := tracker.do(key, fn) + if shared { + t.Fatalf("key %q reported shared=true on first call", key) + } + } + if got := calls.Load(); got != 3 { + t.Fatalf("fn called %d times, want 3", got) + } +} + +// TestInflightReleasesAfterCompletion verifies that the tracker drops +// completed entries so that subsequent identical calls run fn again +// rather than returning the stale prior result. +func TestInflightReleasesAfterCompletion(t *testing.T) { + tracker := newInflightTracker() + + first := &inflightResult{statusCode: 200, body: []byte("a")} + second := &inflightResult{statusCode: 404, body: []byte("b")} + + got1, shared1 := tracker.do("k", func() *inflightResult { return first }) + if shared1 || got1 != first { + t.Fatalf("first call: shared=%v got=%v want first=%v", shared1, got1, first) + } + + got2, shared2 := tracker.do("k", func() *inflightResult { return second }) + if shared2 || got2 != second { + t.Fatalf("second call: shared=%v got=%v want second=%v", shared2, got2, second) + } +} + +// TestInflightPropagatesErrors verifies that a leader failure surfaces +// to all waiters identically. +func TestInflightPropagatesErrors(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + tracker := newInflightTracker() + + gate := make(chan struct{}) + wantErr := errors.New("boom") + fn := func() *inflightResult { + <-gate + return &inflightResult{err: wantErr, errType: typeServer} + } + + const N = 4 + results := make([]*inflightResult, N) + var wg sync.WaitGroup + wg.Add(N) + for i := range N { + go func(i int) { + defer wg.Done() + results[i], _ = tracker.do("k", fn) + }(i) + } + synctest.Wait() + close(gate) + wg.Wait() + + for i, r := range results { + if r == nil || !errors.Is(r.err, wantErr) { + t.Fatalf("caller %d got %v, want err=%v", i, r, wantErr) + } + } + }) +} + +// TestInflightKeyDistinguishesFields verifies that the key separates +// requests that differ in any field that influences the upstream +// response. +func TestInflightKeyDistinguishesFields(t *testing.T) { + base := inflightKey("https", "example.com", "sni", http.MethodGet, "cid") + cases := map[string]string{ + "scheme": inflightKey("http", "example.com", "sni", http.MethodGet, "cid"), + "host": inflightKey("https", "other.com", "sni", http.MethodGet, "cid"), + "sni": inflightKey("https", "example.com", "other-sni", http.MethodGet, "cid"), + "method": inflightKey("https", "example.com", "sni", http.MethodHead, "cid"), + "cid": inflightKey("https", "example.com", "sni", http.MethodGet, "other-cid"), + } + for name, k := range cases { + if k == base { + t.Errorf("differing %s produced identical key", name) + } + } +} diff --git a/bitswap/network/httpnet/msg_sender.go b/bitswap/network/httpnet/msg_sender.go index 1a6734e13..c7d711d97 100644 --- a/bitswap/network/httpnet/msg_sender.go +++ b/bitswap/network/httpnet/msg_sender.go @@ -60,10 +60,16 @@ func setSenderOpts(opts *network.MessageSenderOpts) network.MessageSenderOpts { } // senderURL wraps url with information about cooldowns and errors. +// +// serverErrors points at a per-host counter shared across every +// senderURL that resolves to the same (scheme, host, sni). When one +// peer trips the breaker on an endpoint, every other peer sharing it +// sees the elevated count immediately. The pointer is supplied by +// breaker.counter and must be non-nil before the senderURL is used. type senderURL struct { network.ParsedURL cooldown atomic.Value - serverErrors atomic.Int64 + serverErrors *atomic.Int64 } // httpMsgSender implements a network.MessageSender. @@ -174,10 +180,14 @@ func (err senderError) Error() string { return err.Err.Error() } -// tryURL attempts to make a request to the given URL using the given entry. -// Blocks, Haves etc. are recorded in the given response. cancellations are -// processed. tryURL returns an error so that it can be decided what to do next: -// i.e. retry, or move to next item in wantlist, or abort completely. +// tryURL makes one HTTP request for entry against u. It returns a +// senderError describing what the caller should do next: retry the same +// URL, move on to the next URL, skip the entry, or abort. +// +// Concurrent identical requests against the same HTTP endpoint share one +// round trip via the inflight tracker. See inflight.go for the rationale. +// Each caller still updates its own per-URL cooldown from the shared +// response. func (sender *httpMsgSender) tryURL(ctx context.Context, u *senderURL, entry bsmsg.Entry) (blocks.Block, *senderError) { var method string @@ -190,35 +200,50 @@ func (sender *httpMsgSender) tryURL(ctx context.Context, u *senderURL, entry bsm panic("unknown bitswap entry type") } - if dl := u.cooldown.Load().(time.Time); !dl.IsZero() { + // Skip the request while the host's cooldown is still active. We + // auto-expire the deadline here so a long-lived senderURL doesn't + // stay stuck on a stale cooldown (fillSenderURLs only checks the + // deadline at construction time). + if dl := u.cooldown.Load().(time.Time); !dl.IsZero() && time.Now().Before(dl) { err := fmt.Errorf("cooldown (%s): %s %q ", dl, method, u.URL) log.Debug(err) - return nil, &senderError{ - Type: typeRetryLater, - Err: err, - } + return nil, &senderError{Type: typeRetryLater, Err: err} } - // We do not abort ongoing requests. This is known to cause "http2: - // server sent GOAWAY and closed the connection" Losing a connection - // is worse than downloading some extra bytes. We do abort if the - // context WAS already cancelled before making the request. + // Abort only if the parent context is already cancelled. Cancelling + // an in-flight request triggers "http2: server sent GOAWAY and + // closed the connection"; the lost connection costs more than the + // extra bytes. if err := ctx.Err(); err != nil { log.Debugf("aborted before sending: %s %q", method, u.URL) - return nil, &senderError{ - Type: typeContext, - Err: err, - } + return nil, &senderError{Type: typeContext, Err: err} } + cidStr := entry.Cid.String() + key := inflightKey(u.URL.Scheme, u.URL.Host, u.SNI, method, cidStr) + res, shared := sender.ht.inflight.do(key, func() *inflightResult { + return sender.executeRequest(u, method, cidStr) + }) + if shared { + log.Debugf("piggybacked on inflight request: %s %q", method, u.URL) + } + + return sender.handleResponse(u, entry, method, res) +} + +// executeRequest runs the HTTP round trip and records wire-level metrics. +// It runs once per coalesced request; waiters reuse the result via +// inflightTracker.do. +func (sender *httpMsgSender) executeRequest(u *senderURL, method, cidStr string) *inflightResult { + // Detached context with the configured send timeout: the request must + // outlive any single caller's context so that waiters always get a + // usable result. ctx, cancel := context.WithTimeout(context.Background(), sender.opts.SendTimeout) defer cancel() - req, err := buildRequest(ctx, u.ParsedURL, method, entry.Cid.String(), sender.ht.userAgent) + + req, err := buildRequest(ctx, u.ParsedURL, method, cidStr, sender.ht.userAgent) if err != nil { - return nil, &senderError{ - Type: typeFatal, - Err: err, - } + return &inflightResult{err: err, errType: typeFatal} } log.Debugf("%d/%d %s %q", u.serverErrors.Load(), sender.opts.MaxRetries, method, req.URL) @@ -226,78 +251,82 @@ func (sender *httpMsgSender) tryURL(ctx context.Context, u *senderURL, entry bsm sender.ht.metrics.RequestsInFlight.Inc() resp, err := sender.ht.client.Do(req) if err != nil { - err = fmt.Errorf("error making request to %q: %w", req.URL, err) + wrapped := fmt.Errorf("error making request to %q: %w", req.URL, err) sender.ht.metrics.RequestsFailure.Inc() sender.ht.metrics.RequestsInFlight.Dec() - log.Debug(err) - // Something prevents us from making a request. We cannot - // dial, or setup the connection perhaps. This counts as - // server error (unless context cancellation). This means we - // allow ourselves to hit this a maximum of MaxRetries per url. - // and Disconnect() the peer when no urls work. - serr := &senderError{ - Type: typeServer, - Err: err, - } - + log.Debug(wrapped) + errType := typeServer if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - serr.Type = typeContext // cont. with next block. + errType = typeContext + } + return &inflightResult{ + err: wrapped, + errType: errType, + requestURL: req.URL.String(), } - - return nil, serr } defer resp.Body.Close() - // Record request size var buf bytes.Buffer req.Write(&buf) - sender.ht.metrics.RequestsSentBytes.Add(float64((&buf).Len())) - - // Handle responses - limReader := &io.LimitedReader{ - R: resp.Body, - N: sender.ht.maxBlockSize, - } + sender.ht.metrics.RequestsSentBytes.Add(float64(buf.Len())) + limReader := &io.LimitedReader{R: resp.Body, N: sender.ht.maxBlockSize} body, err := io.ReadAll(limReader) if err != nil { - // treat this as server error - err = fmt.Errorf("error reading body from %q: %w", req.URL, err) + wrapped := fmt.Errorf("error reading body from %q: %w", req.URL, err) sender.ht.metrics.RequestsBodyFailure.Inc() sender.ht.metrics.RequestsInFlight.Dec() - log.Debug(err) - return nil, &senderError{ - Type: typeServer, - Err: err, + log.Debug(wrapped) + return &inflightResult{ + err: wrapped, + errType: typeServer, + statusCode: resp.StatusCode, + requestURL: req.URL.String(), } } - // special cases in response handling. Happens here to simplify - // metrics/handling below. statusCode := resp.StatusCode - // 1) Observed that some gateway implementation returns 500 instead of - // 404. - if statusCode != 200 && isKnownNotFoundError(string(body)) { - statusCode = 404 + // Some gateway implementations return 500 with an IPLD-shaped error + // body when they cannot find the content. Treat those as 404. + if statusCode != http.StatusOK && isKnownNotFoundError(string(body)) { + statusCode = http.StatusNotFound log.Debugf("treating as 404: %q -> %d: %q", req.URL, resp.StatusCode, string(body)) } - // Calculate full response size with headers and everything. - // So this is comparable to bitswap message response sizes. + // Record the full response size including headers so it stays + // comparable to bitswap message response sizes. resp.Body = nil var respBuf bytes.Buffer resp.Write(&respBuf) - respLen := (&respBuf).Len() + len(body) + respLen := respBuf.Len() + len(body) sender.ht.metrics.ResponseSizes.Observe(float64(respLen)) sender.ht.metrics.RequestsInFlight.Dec() - host := u.URL.Hostname() - // updateStatusCounter - sender.ht.metrics.updateStatusCounter(req.Method, statusCode, host) + sender.ht.metrics.updateStatusCounter(req.Method, statusCode, u.URL.Hostname()) + + return &inflightResult{ + statusCode: statusCode, + body: body, + retryAfter: resp.Header.Get("Retry-After"), + requestURL: req.URL.String(), + } +} + +// handleResponse classifies the shared HTTP result for the calling sender +// and updates its per-URL cooldown. Each waiter on a coalesced request +// runs this independently so that bitswap's per-peer accounting matches +// what it would see if every peer had made its own request. +func (sender *httpMsgSender) handleResponse(u *senderURL, entry bsmsg.Entry, method string, res *inflightResult) (blocks.Block, *senderError) { + if res.err != nil { + return nil, &senderError{Type: res.errType, Err: res.err} + } + + statusCode := res.statusCode + body := res.body switch statusCode { - // Valid responses signaling unavailability of the - // content. + // Valid responses that signal the content is unavailable. case http.StatusNotFound, http.StatusGone, http.StatusForbidden, @@ -308,39 +337,24 @@ func (sender *httpMsgSender) tryURL(ctx context.Context, u *senderURL, entry bsm http.StatusTemporaryRedirect, http.StatusPermanentRedirect: - err := fmt.Errorf("%s %q -> %d: %q", req.Method, req.URL, statusCode, string(body)) + err := fmt.Errorf("%s %q -> %d: %q", method, res.requestURL, statusCode, string(body)) log.Debug(err) - // clear cooldowns since we got a proper reply - if !u.cooldown.Load().(time.Time).IsZero() { - sender.ht.cooldownTracker.remove(req.URL.Host) - u.cooldown.Store(time.Time{}) - } + sender.clearCooldown(u) + return nil, &senderError{Type: typeClient, Err: err} - return nil, &senderError{ - Type: typeClient, - Err: err, - } - case http.StatusOK: // \(^°^)/ - // clear cooldowns since we got a proper reply - if !u.cooldown.Load().(time.Time).IsZero() { - sender.ht.cooldownTracker.remove(req.URL.Host) - u.cooldown.Store(time.Time{}) - } - log.Debugf("%s %q -> %d (%d bytes)", req.Method, req.URL, statusCode, len(body)) + case http.StatusOK: + sender.clearCooldown(u) + log.Debugf("%s %q -> %d (%d bytes)", method, res.requestURL, statusCode, len(body)) - if req.Method == http.MethodHead { + if method == http.MethodHead { return nil, nil } - // GET b, err := bsmsg.NewWantlistBlock(body, entry.Cid, entry.Cid.Prefix()) if err != nil { log.Debugf("error making wantlist block for %s: %s", entry.Cid, err) - // avoid entertaining servers that send us wrong data - // too much. - return nil, &senderError{ - Type: typeServer, - Err: err, - } + // Server returned wrong data; treat as a server error so + // repeat offenders trip the breaker. + return nil, &senderError{Type: typeServer, Err: err} } atomic.AddUint64(&sender.ht.stats.MessagesRecvd, 1) return b, nil @@ -349,55 +363,63 @@ func (sender *httpMsgSender) tryURL(ctx context.Context, u *senderURL, entry bsm http.StatusServiceUnavailable, http.StatusBadGateway, http.StatusGatewayTimeout: - // See path-gateway spec. All these codes SHOULD return - // Retry-After. They are used to signal that a block cannot - // be fetched too, not only fatal server issues, which poses a - // difficult overlap. Current approach treats these errors as - // non fatal if they don't happen repeatedly: - // - By default we disconnect on server errors: MaxRetries = 1. - // - First try errors. We add default backoff if non specified. - // - Retry same CID. If it fails again, count that as server - // error and avoid retrying on that url. - // - If we have no more urls to try, will move to next cid. - // - If we hit the MaxRetries for all urls, abort all. - - // In practice, our wantlists should be 1/3 elements. It - // doesn't make sense to tolerate 5 server errors for 3 - // requests as we will repeatedly hit broken servers that way. - // It is always better if endpoints keep these errors for - // server issues, and simply return 404 when they cannot find - // the content but everything else is fine. - err := fmt.Errorf("%s %q -> %d: %q", req.Method, req.URL, statusCode, string(body)) + // Per the path-gateway spec these codes SHOULD carry + // Retry-After. They cover both fatal server issues and "block + // not available right now", which overlap. We treat them as + // non-fatal until they repeat: + // - MaxRetries defaults to 1, so we disconnect on the second + // consecutive server error. + // - First failure: cooldown using Retry-After if present, + // else the configured backoff. + // - Retry the same CID. If it fails again, count it as a + // server error and avoid retrying that URL. + // - When all URLs hit MaxRetries, abort. + // + // Wantlists are typically 1-3 items; tolerating many server + // errors per cycle just means hammering broken servers. We + // prefer that endpoints reserve these codes for genuine + // server issues and return 404 for missing content. + err := fmt.Errorf("%s %q -> %d: %q", method, res.requestURL, statusCode, string(body)) log.Warn(err) - retryAfter := resp.Header.Get("Retry-After") - cooldownUntil, ok := parseRetryAfter(retryAfter) - if ok { // it means we should retry, so we will retry. - sender.ht.cooldownTracker.setByDate(req.URL.Host, cooldownUntil) - u.cooldown.Store(cooldownUntil) - } else { - sender.ht.cooldownTracker.setByDuration(req.URL.Host, sender.opts.SendErrorBackoff) - u.cooldown.Store(time.Now().Add(sender.opts.SendErrorBackoff)) - } + sender.applyBackoff(u, res.retryAfter) + return nil, &senderError{Type: typeRetryLater, Err: err} - return nil, &senderError{ - Type: typeRetryLater, - Err: err, - } - - // For any other code, we assume we must temporally - // backoff from the URL per the options. - // Tolerance for server errors per url is low. If after waiting etc. - // it fails MaxRetries, we will fully disconnect. + // For any other code we back off from the URL per the options. + // Tolerance for server errors per URL is low: after MaxRetries we + // disconnect from the peer. default: - err := fmt.Errorf("%q -> %d: %q", req.URL, statusCode, string(body)) + err := fmt.Errorf("%q -> %d: %q", res.requestURL, statusCode, string(body)) log.Warn(err) - sender.ht.cooldownTracker.setByDuration(req.URL.Host, sender.opts.SendErrorBackoff) - u.cooldown.Store(time.Now().Add(sender.opts.SendErrorBackoff)) - return nil, &senderError{ - Type: typeServer, - Err: err, - } + sender.applyBackoff(u, "") + return nil, &senderError{Type: typeServer, Err: err} + } +} + +// clearCooldown removes any active cooldown on u after a definitive +// response and resets the host's serverErrors counter. A 200 or 404 +// proves the host is healthy, so we forgive prior breaker accruals +// and let every peer sharing the host start fresh. +func (sender *httpMsgSender) clearCooldown(u *senderURL) { + if !u.cooldown.Load().(time.Time).IsZero() { + sender.ht.cooldownTracker.remove(u.URL.Host) + u.cooldown.Store(time.Time{}) + } + if u.serverErrors.Load() > 0 { + u.serverErrors.Store(0) + } +} + +// applyBackoff sets a cooldown on u. If retryAfter parses as a date or +// seconds, that wins; otherwise we fall back to the configured +// SendErrorBackoff. +func (sender *httpMsgSender) applyBackoff(u *senderURL, retryAfter string) { + if t, ok := parseRetryAfter(retryAfter); ok { + sender.ht.cooldownTracker.setByDate(u.URL.Host, t) + u.cooldown.Store(t) + return } + sender.ht.cooldownTracker.setByDuration(u.URL.Host, sender.opts.SendErrorBackoff) + u.cooldown.Store(time.Now().Add(sender.opts.SendErrorBackoff)) } // isKnownNotFoundError checks if the response body contains a known IPLD-specific @@ -568,7 +590,7 @@ WANTLIST_LOOP: } // if totalClientErrors == 0, count is reset. - if err := sender.ht.errorTracker.logErrors(sender.peer, totalClientErrors, sender.ht.maxDontHaveErrors); err != nil { + if err := sender.ht.errorTracker.logErrors(sender.urls, totalClientErrors, sender.ht.maxDontHaveErrors); err != nil { log.Debugf("too many client errors. Disconnecting from %s", sender.peer) sender.ht.DisconnectFrom(ctx, sender.peer) } diff --git a/bitswap/network/httpnet/pinger.go b/bitswap/network/httpnet/pinger.go index 6a161eca7..642272193 100644 --- a/bitswap/network/httpnet/pinger.go +++ b/bitswap/network/httpnet/pinger.go @@ -3,6 +3,7 @@ package httpnet import ( "context" "errors" + "net/http" "sync" "time" @@ -11,46 +12,94 @@ import ( "github.com/libp2p/go-libp2p/p2p/protocol/ping" ) -// pinger pings connected hosts on regular intervals -// and tracks their latency. +// pingInterval is how often the per-host ticker probes each unique HTTP +// endpoint to refresh its latency reading. +const pingInterval = 5 * time.Second + +// ewmaSmoothing is the smoothing factor for the per-host latency EWMA. +// Matches the value used by the libp2p peerstore's LatencyEWMA. +const ewmaSmoothing = 0.1 + +// errCooldownActive is reported by ping() when a URL is skipped because +// the host is in its Retry-After window. +var errCooldownActive = errors.New("host is on cooldown, skipping probe") + +// pinger probes HTTP endpoints periodically and tracks their latency. +// +// State is keyed by HTTP endpoint, not by peer ID. Multiple peer IDs that +// resolve to the same endpoint share one ticker and one latency reading. +// With per-peer pinging, N peer IDs idle on one gateway would produce +// N times the probe traffic (HEAD /ipfs/ every 5 s) for no new +// information. The gateway is a single physical resource, so we ping +// it once. type pinger struct { ht *Network - latenciesLock sync.RWMutex - latencies map[peer.ID]time.Duration + mu sync.Mutex + // hosts holds one entry per unique HTTP endpoint currently being + // pinged. Keyed by endpointKey. + hosts map[string]*hostPing + // peerHosts maps each registered peer to the host keys it depends + // on, so stopPinging can decrement the right refcounts. + peerHosts map[peer.ID]map[string]struct{} +} + +// hostPing tracks the periodic probe for one HTTP endpoint plus its +// latency reading. refcount counts the peer IDs registered against it; +// the ticker runs while refcount > 0. +type hostPing struct { + url network.ParsedURL + method string + // peerID is a representative peer.ID kept for connectToURL logging. + // It is captured at first registration; later peers piggyback on + // this one's probes. + peerID peer.ID + + refcount int + cancel context.CancelFunc - pingsLock sync.RWMutex - pings map[peer.ID]context.CancelFunc + latencyMu sync.Mutex + latency time.Duration // EWMA, zero until the first sample } -func newPinger(ht *Network, pingCid string) *pinger { +func newPinger(ht *Network) *pinger { return &pinger{ ht: ht, - latencies: make(map[peer.ID]time.Duration), - pings: make(map[peer.ID]context.CancelFunc), + hosts: make(map[string]*hostPing), + peerHosts: make(map[peer.ID]map[string]struct{}), } } -// ping sends a ping packet to the first known url of the given peer and -// returns the result with the latency for this peer. The result is also -// recorded. +// ping issues immediate probes against every URL of p in parallel and +// returns the average RTT. It also records the per-host latency so that +// subsequent latency() calls reflect the fresh reading. +// +// ping is intended for callers that need a fresh sample (Network.Ping); +// the periodic ticker handles the steady-state case independently. func (pngr *pinger) ping(ctx context.Context, p peer.ID) ping.Result { pi := pngr.ht.host.Peerstore().PeerInfo(p) urls := network.ExtractURLsFromPeer(pi) if len(urls) == 0 { - return ping.Result{ - Error: ErrNoHTTPAddresses, - } + return ping.Result{Error: ErrNoHTTPAddresses} } - method := "GET" + method := http.MethodGet if supportsHave(pngr.ht.host.Peerstore(), p) { - method = "HEAD" + method = http.MethodHead } results := make(chan ping.Result, len(urls)) for _, u := range urls { go func(u network.ParsedURL) { + // Respect Retry-After: skip probes against hosts that + // are currently on cooldown. Caller (dontHaveTimeoutMgr) + // handles the resulting error by falling back to its + // default timeout, which message latency refines once + // real traffic flows. + if pngr.ht.cooldownTracker.onCooldown(u.URL.Host) { + results <- ping.Result{Error: errCooldownActive} + return + } start := time.Now() _, err := pngr.ht.connectToURL(ctx, p, u, method) if err != nil { @@ -58,9 +107,9 @@ func (pngr *pinger) ping(ctx context.Context, p peer.ID) ping.Result { results <- ping.Result{Error: err} return } - results <- ping.Result{ - RTT: time.Since(start), - } + rtt := time.Since(start) + pngr.recordHostLatency(u, rtt) + results <- ping.Result{RTT: rtt} }(u) } @@ -76,99 +125,199 @@ func (pngr *pinger) ping(ctx context.Context, p peer.ID) ping.Result { } close(results) - lenErrors := len(errs) - // if all urls failed return that, otherwise ignore. - if lenErrors == len(urls) { - return ping.Result{ - Error: errors.Join(errs...), - } + if len(errs) == len(urls) { + return ping.Result{Error: errors.Join(errs...)} } - result.RTT = result.RTT / time.Duration(len(urls)-lenErrors) - - // log.Debugf("ping latency %s %s", p, result.RTT) - pngr.recordLatency(p, result.RTT) + result.RTT = result.RTT / time.Duration(len(urls)-len(errs)) return result } -// latency returns the recorded latency for the given peer. -func (pngr *pinger) latency(p peer.ID) time.Duration { - var lat time.Duration - pngr.latenciesLock.RLock() - { - lat = pngr.latencies[p] +// recordHostLatency updates the EWMA for the host that u points to. The +// host entry must already exist (created by startPinging). +func (pngr *pinger) recordHostLatency(u network.ParsedURL, next time.Duration) { + key := endpointKey(u.URL.Scheme, u.URL.Host, u.SNI) + pngr.mu.Lock() + hp := pngr.hosts[key] + pngr.mu.Unlock() + if hp == nil { + return + } + hp.recordLatency(next) +} + +// endpointKnown reports whether the given URL points at an endpoint that +// is already registered (proven working by a previous Connect). When ok +// is true, method is the probe method captured at first registration +// (http.MethodHead or http.MethodGet). Callers can skip a fresh probe +// and inherit the supportsHead decision from method. +func (pngr *pinger) endpointKnown(u network.ParsedURL) (method string, ok bool) { + pngr.mu.Lock() + defer pngr.mu.Unlock() + hp, ok := pngr.hosts[endpointKey(u.URL.Scheme, u.URL.Host, u.SNI)] + if !ok { + return "", false } - pngr.latenciesLock.RUnlock() - return lat + return hp.method, true } -// recordLatency stores a new latency measurement for the given peer using an -// Exponetially Weighted Moving Average similar to LatencyEWMA from the -// peerstore. -func (pngr *pinger) recordLatency(p peer.ID, next time.Duration) { - nextf := float64(next) - s := 0.1 - pngr.latenciesLock.Lock() - { - ewma, found := pngr.latencies[p] - ewmaf := float64(ewma) - if !found { - pngr.latencies[p] = next // when no data, just take it as the mean. - } else { - nextf = ((1.0 - s) * ewmaf) + (s * nextf) - pngr.latencies[p] = time.Duration(nextf) +// latency returns the average EWMA latency across all hosts that p is +// registered against. Returns zero when p has no registered hosts or no +// host has produced a sample yet. +func (pngr *pinger) latency(p peer.ID) time.Duration { + pngr.mu.Lock() + keys := pngr.peerHosts[p] + hosts := make([]*hostPing, 0, len(keys)) + for k := range keys { + if hp, ok := pngr.hosts[k]; ok { + hosts = append(hosts, hp) + } + } + pngr.mu.Unlock() + + var total time.Duration + var samples int + for _, hp := range hosts { + if l := hp.currentLatency(); l > 0 { + total += l + samples++ } } - pngr.latenciesLock.Unlock() + if samples == 0 { + return 0 + } + return total / time.Duration(samples) } +// startPinging registers p with all of its known HTTP endpoints. New +// endpoints get a ticker; endpoints already pinged (because another peer +// shares them) just bump their refcount. func (pngr *pinger) startPinging(p peer.ID) { - pngr.pingsLock.Lock() - defer pngr.pingsLock.Unlock() - - _, ok := pngr.pings[p] - if ok { - log.Debugf("already pinging %s", p) + pi := pngr.ht.host.Peerstore().PeerInfo(p) + urls := network.ExtractURLsFromPeer(pi) + if len(urls) == 0 { + log.Debugf("startPinging: no HTTP URLs for %s", p) return } - ctx, cancel := context.WithCancel(context.Background()) - pngr.pings[p] = cancel + method := http.MethodGet + if supportsHave(pngr.ht.host.Peerstore(), p) { + method = http.MethodHead + } - log.Debugf("starting pings to %s", p) + pngr.mu.Lock() + defer pngr.mu.Unlock() - go func(ctx context.Context, p peer.ID) { - ticker := time.NewTicker(5 * time.Second) - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - pngr.ping(ctx, p) - } + if _, ok := pngr.peerHosts[p]; ok { + log.Debugf("already pinging %s", p) + return + } + keys := make(map[string]struct{}, len(urls)) + for _, u := range urls { + key := endpointKey(u.URL.Scheme, u.URL.Host, u.SNI) + if _, dup := keys[key]; dup { + continue // peer advertised the same endpoint twice. + } + keys[key] = struct{}{} + + if hp, ok := pngr.hosts[key]; ok { + hp.refcount++ + continue } - }(ctx, p) + ctx, cancel := context.WithCancel(context.Background()) + hp := &hostPing{ + url: u, + method: method, + peerID: p, + refcount: 1, + cancel: cancel, + } + pngr.hosts[key] = hp + go pngr.tickerLoop(ctx, hp) + } + pngr.peerHosts[p] = keys + log.Debugf("starting pings for %s on %d hosts", p, len(keys)) } +// stopPinging unregisters p. Hosts whose refcount drops to zero have +// their ticker cancelled and their host-shared state (pinger, breaker, +// errorTracker) removed so a future reconnect starts fresh. func (pngr *pinger) stopPinging(p peer.ID) { - log.Debugf("stopping pings to %s", p) - pngr.pingsLock.Lock() - { - cancel, ok := pngr.pings[p] - if ok { - cancel() + pngr.mu.Lock() + defer pngr.mu.Unlock() + + keys, ok := pngr.peerHosts[p] + if !ok { + return + } + delete(pngr.peerHosts, p) + log.Debugf("stopping pings for %s", p) + + for key := range keys { + hp := pngr.hosts[key] + if hp == nil { + continue + } + hp.refcount-- + if hp.refcount > 0 { + continue } - delete(pngr.pings, p) + hp.cancel() + delete(pngr.hosts, key) + pngr.ht.breaker.reset(key) + pngr.ht.errorTracker.reset(key) } - pngr.pingsLock.Unlock() - pngr.latenciesLock.Lock() - delete(pngr.latencies, p) - pngr.latenciesLock.Unlock() } +// isPinging reports whether p has any host registrations. func (pngr *pinger) isPinging(p peer.ID) bool { - pngr.pingsLock.RLock() - defer pngr.pingsLock.RUnlock() - - _, ok := pngr.pings[p] + pngr.mu.Lock() + defer pngr.mu.Unlock() + _, ok := pngr.peerHosts[p] return ok } + +// tickerLoop probes hp on pingInterval until ctx is cancelled. +func (pngr *pinger) tickerLoop(ctx context.Context, hp *hostPing) { + ticker := time.NewTicker(pingInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + pngr.tickOnce(ctx, hp) + } + } +} + +func (pngr *pinger) tickOnce(ctx context.Context, hp *hostPing) { + // Respect Retry-After: while the host is in cooldown, the gateway + // has explicitly told us to wait. Skip the probe entirely. + if pngr.ht.cooldownTracker.onCooldown(hp.url.URL.Host) { + return + } + start := time.Now() + if _, err := pngr.ht.connectToURL(ctx, hp.peerID, hp.url, hp.method); err != nil { + log.Debug(err) + return + } + hp.recordLatency(time.Since(start)) +} + +// recordLatency updates the host's EWMA latency. The first sample is +// stored as-is; subsequent samples are smoothed. +func (hp *hostPing) recordLatency(next time.Duration) { + hp.latencyMu.Lock() + defer hp.latencyMu.Unlock() + if hp.latency == 0 { + hp.latency = next + return + } + hp.latency = time.Duration((1.0-ewmaSmoothing)*float64(hp.latency) + ewmaSmoothing*float64(next)) +} + +func (hp *hostPing) currentLatency() time.Duration { + hp.latencyMu.Lock() + defer hp.latencyMu.Unlock() + return hp.latency +} diff --git a/bitswap/network/httpnet/pinger_test.go b/bitswap/network/httpnet/pinger_test.go new file mode 100644 index 000000000..e58eb7d95 --- /dev/null +++ b/bitswap/network/httpnet/pinger_test.go @@ -0,0 +1,218 @@ +package httpnet + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/ipfs/boxo/bitswap/network" +) + +// TestPingerSharesHostsAcrossPeers verifies that two peer IDs which +// resolve to the same HTTP endpoint share one host entry (refcount 2) +// rather than producing two independent tickers. +func TestPingerSharesHostsAcrossPeers(t *testing.T) { + ctx := context.Background() + + htnet, mn := mockNetwork(t, mockReceiver(t)) + peerA, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + peerB, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + + // One server, two peers pointing at it. + srv := makeServer(t, 0, 0) + mustConnectToPeer(t, ctx, htnet, peerA, srv) + mustConnectToPeer(t, ctx, htnet, peerB, srv) + + htnet.pinger.mu.Lock() + hosts := len(htnet.pinger.hosts) + var refcount int + for _, hp := range htnet.pinger.hosts { + refcount = hp.refcount + } + htnet.pinger.mu.Unlock() + + if hosts != 1 { + t.Fatalf("got %d host entries, want 1", hosts) + } + if refcount != 2 { + t.Fatalf("got refcount %d, want 2", refcount) + } + if !htnet.pinger.isPinging(peerA.ID()) { + t.Errorf("peerA should be pinging") + } + if !htnet.pinger.isPinging(peerB.ID()) { + t.Errorf("peerB should be pinging") + } +} + +// TestPingerStopReleasesRefcount verifies that disconnecting one of two +// peers sharing a host leaves the host entry alive (refcount 1) and +// that disconnecting the second drops it. +func TestPingerStopReleasesRefcount(t *testing.T) { + ctx := context.Background() + + htnet, mn := mockNetwork(t, mockReceiver(t)) + peerA, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + peerB, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + + srv := makeServer(t, 0, 0) + mustConnectToPeer(t, ctx, htnet, peerA, srv) + mustConnectToPeer(t, ctx, htnet, peerB, srv) + + if err := htnet.DisconnectFrom(ctx, peerA.ID()); err != nil { + t.Fatal(err) + } + + htnet.pinger.mu.Lock() + hostsAfterA := len(htnet.pinger.hosts) + var refcountAfterA int + for _, hp := range htnet.pinger.hosts { + refcountAfterA = hp.refcount + } + htnet.pinger.mu.Unlock() + + if hostsAfterA != 1 { + t.Fatalf("after disconnecting peerA: got %d host entries, want 1", hostsAfterA) + } + if refcountAfterA != 1 { + t.Fatalf("after disconnecting peerA: got refcount %d, want 1", refcountAfterA) + } + + if err := htnet.DisconnectFrom(ctx, peerB.ID()); err != nil { + t.Fatal(err) + } + + htnet.pinger.mu.Lock() + hostsAfterB := len(htnet.pinger.hosts) + htnet.pinger.mu.Unlock() + + if hostsAfterB != 0 { + t.Fatalf("after disconnecting peerB: got %d host entries, want 0", hostsAfterB) + } +} + +// TestPingerCleansSharedStateOnLastDisconnect verifies that when the +// last peer using a host disconnects, the host-shared breaker and +// errorTracker entries are dropped so a future reconnect starts fresh. +func TestPingerCleansSharedStateOnLastDisconnect(t *testing.T) { + ctx := context.Background() + + htnet, mn := mockNetwork(t, mockReceiver(t)) + peerA, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + + srv := makeServer(t, 0, 0) + mustConnectToPeer(t, ctx, htnet, peerA, srv) + + // Seed breaker and errorTracker state for the host. + urls := htnet.senderURLs(peerA.ID()) + if len(urls) != 1 { + t.Fatalf("expected 1 senderURL, got %d", len(urls)) + } + urls[0].serverErrors.Store(7) + if err := htnet.errorTracker.logErrors(urls, 5, 100); err != nil { + t.Fatalf("errorTracker.logErrors: %v", err) + } + + if err := htnet.DisconnectFrom(ctx, peerA.ID()); err != nil { + t.Fatal(err) + } + + // Last peer is gone; both shared structures should have dropped + // their entries. + htnet.breaker.mu.Lock() + breakerEntries := len(htnet.breaker.counters) + htnet.breaker.mu.Unlock() + if breakerEntries != 0 { + t.Errorf("breaker still has %d entries after last disconnect", breakerEntries) + } + + htnet.errorTracker.mu.Lock() + errorEntries := len(htnet.errorTracker.counts) + htnet.errorTracker.mu.Unlock() + if errorEntries != 0 { + t.Errorf("errorTracker still has %d entries after last disconnect", errorEntries) + } +} + +// TestPingerSkipsProbeOnCooldown verifies that ping() honours +// Retry-After: when the cooldownTracker shows the host is in its +// wait window, no HTTP probe is issued and the per-URL goroutine +// returns errCooldownActive. +func TestPingerSkipsProbeOnCooldown(t *testing.T) { + ctx := context.Background() + + htnet, mn := mockNetwork(t, mockReceiver(t)) + peerA, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + + srv, probes := countingProbeServer(t) + mustConnectToPeer(t, ctx, htnet, peerA, srv) + + // Connect probed once; baseline. + if got := probes.Load(); got != 1 { + t.Fatalf("after Connect: got %d probes, want 1", got) + } + + // Put the host on cooldown directly. ping() should skip the probe. + host := htnet.host.Peerstore().Addrs(peerA.ID())[0] + purl, err := network.ExtractHTTPAddress(host) + if err != nil { + t.Fatal(err) + } + htnet.cooldownTracker.setByDuration(purl.URL.Host, 30*time.Second) + + res := htnet.pinger.ping(ctx, peerA.ID()) + if res.Error == nil || !errors.Is(res.Error, errCooldownActive) { + t.Fatalf("ping during cooldown: got err %v, want errCooldownActive", res.Error) + } + if got := probes.Load(); got != 1 { + t.Errorf("after Ping during cooldown: got %d probes, want 1 (no new probe)", got) + } +} + +// TestPingerDistinctHostsRunIndependently verifies that two peers with +// different HTTP endpoints each get their own host entry. +func TestPingerDistinctHostsRunIndependently(t *testing.T) { + ctx := context.Background() + + htnet, mn := mockNetwork(t, mockReceiver(t)) + peerA, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + peerB, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + + srvA := makeServer(t, 0, 0) + srvB := makeServer(t, 0, 0) + mustConnectToPeer(t, ctx, htnet, peerA, srvA) + mustConnectToPeer(t, ctx, htnet, peerB, srvB) + + htnet.pinger.mu.Lock() + hosts := len(htnet.pinger.hosts) + htnet.pinger.mu.Unlock() + + if hosts != 2 { + t.Fatalf("got %d host entries, want 2", hosts) + } +}