From d53d93a44ef2531fd93a860574cdea7ff1c1654b Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Thu, 30 Apr 2026 15:14:51 +0200 Subject: [PATCH 1/6] feat(httpnet): coalesce concurrent requests per HTTP endpoint multiple peer ids regularly resolve to the same http gateway. for example, /dns/a-fil-http.aur.lu/tcp/443/https is currently advertised under three peer ids by delegated-ipfs.dev. bitswap creates one messagequeue per peer id, so a single want-have broadcast for one cid fans out into n identical head requests against the same upstream; want-have probes to non-best peers fan out the same way. the pattern will compound as http retrieval adoption grows. introduce a small singleflight tracker keyed on (scheme, host, sni, method, cid). concurrent identical requests share one round trip; each caller still updates its own per-url cooldown from the shared response so bitswap's per-peer accounting is preserved. - inflight.go: tracker plus key builder - msg_sender.go: split tryurl into executerequest (wire i/o, runs once) and handleresponse (per-caller classification + cooldown) - factor clearcooldown / applybackoff helpers out of the response switch --- bitswap/network/httpnet/httpnet.go | 3 + bitswap/network/httpnet/inflight.go | 97 ++++++++ bitswap/network/httpnet/inflight_test.go | 158 +++++++++++++ bitswap/network/httpnet/msg_sender.go | 274 ++++++++++++----------- 4 files changed, 399 insertions(+), 133 deletions(-) create mode 100644 bitswap/network/httpnet/inflight.go create mode 100644 bitswap/network/httpnet/inflight_test.go diff --git a/bitswap/network/httpnet/httpnet.go b/bitswap/network/httpnet/httpnet.go index 7be48267d..fa05a7fdd 100644 --- a/bitswap/network/httpnet/httpnet.go +++ b/bitswap/network/httpnet/httpnet.go @@ -234,6 +234,7 @@ type Network struct { errorTracker *errorTracker requestTracker *requestTracker cooldownTracker *cooldownTracker + inflight *inflightTracker ongoingConnsLock sync.RWMutex ongoingConns map[peer.ID]struct{} @@ -302,6 +303,8 @@ func New(host host.Host, opts ...Option) network.BitSwapNetwork { cooldownTracker := newCooldownTracker(DefaultMaxBackoff) htnet.cooldownTracker = cooldownTracker + htnet.inflight = newInflightTracker() + netdialer := &net.Dialer{ // Timeout for connects to complete. Timeout: htnet.dialTimeout, diff --git a/bitswap/network/httpnet/inflight.go b/bitswap/network/httpnet/inflight.go new file mode 100644 index 000000000..532743c00 --- /dev/null +++ b/bitswap/network/httpnet/inflight.go @@ -0,0 +1,97 @@ +package httpnet + +import ( + "strings" + "sync" +) + +// inflightTracker coalesces concurrent identical HTTP requests so that +// multiple peer IDs sharing one HTTP endpoint produce one round trip, +// not one per peer ID. +// +// Delegated routing returns multiple peer IDs for one HTTP gateway. For +// example, /dns/a-fil-http.aur.lu/tcp/443/https is currently advertised +// under three peer IDs. Bitswap creates one MessageQueue per peer ID, so +// a single want-have broadcast for one CID becomes three identical HEAD +// requests against the same upstream. Want-have probes that bitswap +// sends to non-best peers fan out the same way. The pattern compounds +// as HTTP retrieval grows. +// +// Coalescing applies only while a request is in flight. Once the leader +// returns, subsequent callers issue their own requests; we do not cache +// results. +type inflightTracker struct { + mu sync.Mutex + calls map[string]*inflightCall +} + +// inflightResult carries the response data shared from leader to waiters. +// +// On a successful round trip, statusCode, body, retryAfter, and +// requestURL are populated and err is nil. On failure, err and errType +// are populated; statusCode and body may also be set if the failure +// happened after the response arrived. +type inflightResult struct { + statusCode int + body []byte + retryAfter string + requestURL string + err error + errType senderErrorType // valid only when err != nil +} + +type inflightCall struct { + done chan struct{} + result *inflightResult +} + +func newInflightTracker() *inflightTracker { + return &inflightTracker{calls: make(map[string]*inflightCall)} +} + +// do runs fn once for the leader and returns the same result to every +// concurrent waiter on key. shared reports whether this caller waited on +// another leader. +// +// The key must distinguish requests that differ in any field that would +// change the upstream response: scheme, host, SNI, method, and CID. Two +// peers that agree on all five send identical bytes on the wire and may +// share the result. +func (t *inflightTracker) do(key string, fn func() *inflightResult) (result *inflightResult, shared bool) { + t.mu.Lock() + if c, ok := t.calls[key]; ok { + t.mu.Unlock() + <-c.done + return c.result, true + } + c := &inflightCall{done: make(chan struct{})} + t.calls[key] = c + t.mu.Unlock() + + c.result = fn() + + t.mu.Lock() + delete(t.calls, key) + t.mu.Unlock() + close(c.done) + + return c.result, false +} + +// inflightKey builds the singleflight key. SNI is part of the key +// because providers can advertise the same host with different SNI +// expectations and the upstream response may differ. +func inflightKey(scheme, host, sni, method, cid string) string { + var sb strings.Builder + sb.Grow(len(scheme) + len(host) + len(sni) + len(method) + len(cid) + 7) + sb.WriteString(scheme) + sb.WriteString("://") + sb.WriteString(host) + sb.WriteByte('|') + sb.WriteString(sni) + sb.WriteByte('|') + sb.WriteString(method) + sb.WriteByte('|') + sb.WriteString(cid) + return sb.String() +} diff --git a/bitswap/network/httpnet/inflight_test.go b/bitswap/network/httpnet/inflight_test.go new file mode 100644 index 000000000..afba6d497 --- /dev/null +++ b/bitswap/network/httpnet/inflight_test.go @@ -0,0 +1,158 @@ +package httpnet + +import ( + "errors" + "sync" + "sync/atomic" + "testing" + "testing/synctest" +) + +// TestInflightCoalescesConcurrentCallers verifies that concurrent callers +// on the same key share one underlying call and observe the same result. +func TestInflightCoalescesConcurrentCallers(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + tracker := newInflightTracker() + + var calls atomic.Int32 + gate := make(chan struct{}) + want := &inflightResult{statusCode: 200, body: []byte("hello")} + + fn := func() *inflightResult { + calls.Add(1) + <-gate + return want + } + + const N = 10 + results := make([]*inflightResult, N) + shareds := make([]bool, N) + + var wg sync.WaitGroup + wg.Add(N) + for i := range N { + go func(i int) { + defer wg.Done() + results[i], shareds[i] = tracker.do("k", fn) + }(i) + } + + // Wait until the leader is blocked on gate and every waiter + // is blocked on the inflightCall's done channel. + synctest.Wait() + close(gate) + wg.Wait() + + if got := calls.Load(); got != 1 { + t.Fatalf("fn called %d times, want 1", got) + } + leaders := 0 + for i, r := range results { + if r != want { + t.Fatalf("caller %d got result %v, want %v", i, r, want) + } + if !shareds[i] { + leaders++ + } + } + if leaders != 1 { + t.Fatalf("got %d leaders, want exactly 1", leaders) + } + }) +} + +// TestInflightDistinctKeysRunIndependently verifies that callers on +// different keys do not block each other and each runs fn. +func TestInflightDistinctKeysRunIndependently(t *testing.T) { + tracker := newInflightTracker() + + var calls atomic.Int32 + fn := func() *inflightResult { + calls.Add(1) + return &inflightResult{statusCode: 200} + } + + for _, key := range []string{"a", "b", "c"} { + _, shared := tracker.do(key, fn) + if shared { + t.Fatalf("key %q reported shared=true on first call", key) + } + } + if got := calls.Load(); got != 3 { + t.Fatalf("fn called %d times, want 3", got) + } +} + +// TestInflightReleasesAfterCompletion verifies that the tracker drops +// completed entries so that subsequent identical calls run fn again +// rather than returning the stale prior result. +func TestInflightReleasesAfterCompletion(t *testing.T) { + tracker := newInflightTracker() + + first := &inflightResult{statusCode: 200, body: []byte("a")} + second := &inflightResult{statusCode: 404, body: []byte("b")} + + got1, shared1 := tracker.do("k", func() *inflightResult { return first }) + if shared1 || got1 != first { + t.Fatalf("first call: shared=%v got=%v want first=%v", shared1, got1, first) + } + + got2, shared2 := tracker.do("k", func() *inflightResult { return second }) + if shared2 || got2 != second { + t.Fatalf("second call: shared=%v got=%v want second=%v", shared2, got2, second) + } +} + +// TestInflightPropagatesErrors verifies that a leader failure surfaces +// to all waiters identically. +func TestInflightPropagatesErrors(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + tracker := newInflightTracker() + + gate := make(chan struct{}) + wantErr := errors.New("boom") + fn := func() *inflightResult { + <-gate + return &inflightResult{err: wantErr, errType: typeServer} + } + + const N = 4 + results := make([]*inflightResult, N) + var wg sync.WaitGroup + wg.Add(N) + for i := range N { + go func(i int) { + defer wg.Done() + results[i], _ = tracker.do("k", fn) + }(i) + } + synctest.Wait() + close(gate) + wg.Wait() + + for i, r := range results { + if r == nil || !errors.Is(r.err, wantErr) { + t.Fatalf("caller %d got %v, want err=%v", i, r, wantErr) + } + } + }) +} + +// TestInflightKeyDistinguishesFields verifies that the key separates +// requests that differ in any field that influences the upstream +// response. +func TestInflightKeyDistinguishesFields(t *testing.T) { + base := inflightKey("https", "example.com", "sni", "GET", "cid") + cases := map[string]string{ + "scheme": inflightKey("http", "example.com", "sni", "GET", "cid"), + "host": inflightKey("https", "other.com", "sni", "GET", "cid"), + "sni": inflightKey("https", "example.com", "other-sni", "GET", "cid"), + "method": inflightKey("https", "example.com", "sni", "HEAD", "cid"), + "cid": inflightKey("https", "example.com", "sni", "GET", "other-cid"), + } + for name, k := range cases { + if k == base { + t.Errorf("differing %s produced identical key", name) + } + } +} diff --git a/bitswap/network/httpnet/msg_sender.go b/bitswap/network/httpnet/msg_sender.go index 1a6734e13..a84acd4d6 100644 --- a/bitswap/network/httpnet/msg_sender.go +++ b/bitswap/network/httpnet/msg_sender.go @@ -174,10 +174,14 @@ func (err senderError) Error() string { return err.Err.Error() } -// tryURL attempts to make a request to the given URL using the given entry. -// Blocks, Haves etc. are recorded in the given response. cancellations are -// processed. tryURL returns an error so that it can be decided what to do next: -// i.e. retry, or move to next item in wantlist, or abort completely. +// tryURL makes one HTTP request for entry against u. It returns a +// senderError describing what the caller should do next: retry the same +// URL, move on to the next URL, skip the entry, or abort. +// +// Concurrent identical requests against the same HTTP endpoint share one +// round trip via the inflight tracker. See inflight.go for the rationale. +// Each caller still updates its own per-URL cooldown from the shared +// response. func (sender *httpMsgSender) tryURL(ctx context.Context, u *senderURL, entry bsmsg.Entry) (blocks.Block, *senderError) { var method string @@ -193,32 +197,43 @@ func (sender *httpMsgSender) tryURL(ctx context.Context, u *senderURL, entry bsm if dl := u.cooldown.Load().(time.Time); !dl.IsZero() { err := fmt.Errorf("cooldown (%s): %s %q ", dl, method, u.URL) log.Debug(err) - return nil, &senderError{ - Type: typeRetryLater, - Err: err, - } + return nil, &senderError{Type: typeRetryLater, Err: err} } - // We do not abort ongoing requests. This is known to cause "http2: - // server sent GOAWAY and closed the connection" Losing a connection - // is worse than downloading some extra bytes. We do abort if the - // context WAS already cancelled before making the request. + // Abort only if the parent context is already cancelled. Cancelling + // an in-flight request triggers "http2: server sent GOAWAY and + // closed the connection"; the lost connection costs more than the + // extra bytes. if err := ctx.Err(); err != nil { log.Debugf("aborted before sending: %s %q", method, u.URL) - return nil, &senderError{ - Type: typeContext, - Err: err, - } + return nil, &senderError{Type: typeContext, Err: err} + } + + cidStr := entry.Cid.String() + key := inflightKey(u.URL.Scheme, u.URL.Host, u.SNI, method, cidStr) + res, shared := sender.ht.inflight.do(key, func() *inflightResult { + return sender.executeRequest(u, method, cidStr) + }) + if shared { + log.Debugf("piggybacked on inflight request: %s %q", method, u.URL) } + return sender.handleResponse(u, entry, method, res) +} + +// executeRequest runs the HTTP round trip and records wire-level metrics. +// It runs once per coalesced request; waiters reuse the result via +// inflightTracker.do. +func (sender *httpMsgSender) executeRequest(u *senderURL, method, cidStr string) *inflightResult { + // Detached context with the configured send timeout: the request must + // outlive any single caller's context so that waiters always get a + // usable result. ctx, cancel := context.WithTimeout(context.Background(), sender.opts.SendTimeout) defer cancel() - req, err := buildRequest(ctx, u.ParsedURL, method, entry.Cid.String(), sender.ht.userAgent) + + req, err := buildRequest(ctx, u.ParsedURL, method, cidStr, sender.ht.userAgent) if err != nil { - return nil, &senderError{ - Type: typeFatal, - Err: err, - } + return &inflightResult{err: err, errType: typeFatal} } log.Debugf("%d/%d %s %q", u.serverErrors.Load(), sender.opts.MaxRetries, method, req.URL) @@ -226,78 +241,82 @@ func (sender *httpMsgSender) tryURL(ctx context.Context, u *senderURL, entry bsm sender.ht.metrics.RequestsInFlight.Inc() resp, err := sender.ht.client.Do(req) if err != nil { - err = fmt.Errorf("error making request to %q: %w", req.URL, err) + wrapped := fmt.Errorf("error making request to %q: %w", req.URL, err) sender.ht.metrics.RequestsFailure.Inc() sender.ht.metrics.RequestsInFlight.Dec() - log.Debug(err) - // Something prevents us from making a request. We cannot - // dial, or setup the connection perhaps. This counts as - // server error (unless context cancellation). This means we - // allow ourselves to hit this a maximum of MaxRetries per url. - // and Disconnect() the peer when no urls work. - serr := &senderError{ - Type: typeServer, - Err: err, - } - + log.Debug(wrapped) + errType := typeServer if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - serr.Type = typeContext // cont. with next block. + errType = typeContext + } + return &inflightResult{ + err: wrapped, + errType: errType, + requestURL: req.URL.String(), } - - return nil, serr } defer resp.Body.Close() - // Record request size var buf bytes.Buffer req.Write(&buf) - sender.ht.metrics.RequestsSentBytes.Add(float64((&buf).Len())) - - // Handle responses - limReader := &io.LimitedReader{ - R: resp.Body, - N: sender.ht.maxBlockSize, - } + sender.ht.metrics.RequestsSentBytes.Add(float64(buf.Len())) + limReader := &io.LimitedReader{R: resp.Body, N: sender.ht.maxBlockSize} body, err := io.ReadAll(limReader) if err != nil { - // treat this as server error - err = fmt.Errorf("error reading body from %q: %w", req.URL, err) + wrapped := fmt.Errorf("error reading body from %q: %w", req.URL, err) sender.ht.metrics.RequestsBodyFailure.Inc() sender.ht.metrics.RequestsInFlight.Dec() - log.Debug(err) - return nil, &senderError{ - Type: typeServer, - Err: err, + log.Debug(wrapped) + return &inflightResult{ + err: wrapped, + errType: typeServer, + statusCode: resp.StatusCode, + requestURL: req.URL.String(), } } - // special cases in response handling. Happens here to simplify - // metrics/handling below. statusCode := resp.StatusCode - // 1) Observed that some gateway implementation returns 500 instead of - // 404. - if statusCode != 200 && isKnownNotFoundError(string(body)) { - statusCode = 404 + // Some gateway implementations return 500 with an IPLD-shaped error + // body when they cannot find the content. Treat those as 404. + if statusCode != http.StatusOK && isKnownNotFoundError(string(body)) { + statusCode = http.StatusNotFound log.Debugf("treating as 404: %q -> %d: %q", req.URL, resp.StatusCode, string(body)) } - // Calculate full response size with headers and everything. - // So this is comparable to bitswap message response sizes. + // Record the full response size including headers so it stays + // comparable to bitswap message response sizes. resp.Body = nil var respBuf bytes.Buffer resp.Write(&respBuf) - respLen := (&respBuf).Len() + len(body) + respLen := respBuf.Len() + len(body) sender.ht.metrics.ResponseSizes.Observe(float64(respLen)) sender.ht.metrics.RequestsInFlight.Dec() - host := u.URL.Hostname() - // updateStatusCounter - sender.ht.metrics.updateStatusCounter(req.Method, statusCode, host) + sender.ht.metrics.updateStatusCounter(req.Method, statusCode, u.URL.Hostname()) + + return &inflightResult{ + statusCode: statusCode, + body: body, + retryAfter: resp.Header.Get("Retry-After"), + requestURL: req.URL.String(), + } +} + +// handleResponse classifies the shared HTTP result for the calling sender +// and updates its per-URL cooldown. Each waiter on a coalesced request +// runs this independently so that bitswap's per-peer accounting matches +// what it would see if every peer had made its own request. +func (sender *httpMsgSender) handleResponse(u *senderURL, entry bsmsg.Entry, method string, res *inflightResult) (blocks.Block, *senderError) { + if res.err != nil { + return nil, &senderError{Type: res.errType, Err: res.err} + } + + statusCode := res.statusCode + body := res.body switch statusCode { - // Valid responses signaling unavailability of the - // content. + // Valid responses that signal the content is unavailable. case http.StatusNotFound, http.StatusGone, http.StatusForbidden, @@ -308,39 +327,24 @@ func (sender *httpMsgSender) tryURL(ctx context.Context, u *senderURL, entry bsm http.StatusTemporaryRedirect, http.StatusPermanentRedirect: - err := fmt.Errorf("%s %q -> %d: %q", req.Method, req.URL, statusCode, string(body)) + err := fmt.Errorf("%s %q -> %d: %q", method, res.requestURL, statusCode, string(body)) log.Debug(err) - // clear cooldowns since we got a proper reply - if !u.cooldown.Load().(time.Time).IsZero() { - sender.ht.cooldownTracker.remove(req.URL.Host) - u.cooldown.Store(time.Time{}) - } + sender.clearCooldown(u) + return nil, &senderError{Type: typeClient, Err: err} - return nil, &senderError{ - Type: typeClient, - Err: err, - } - case http.StatusOK: // \(^°^)/ - // clear cooldowns since we got a proper reply - if !u.cooldown.Load().(time.Time).IsZero() { - sender.ht.cooldownTracker.remove(req.URL.Host) - u.cooldown.Store(time.Time{}) - } - log.Debugf("%s %q -> %d (%d bytes)", req.Method, req.URL, statusCode, len(body)) + case http.StatusOK: + sender.clearCooldown(u) + log.Debugf("%s %q -> %d (%d bytes)", method, res.requestURL, statusCode, len(body)) - if req.Method == http.MethodHead { + if method == http.MethodHead { return nil, nil } - // GET b, err := bsmsg.NewWantlistBlock(body, entry.Cid, entry.Cid.Prefix()) if err != nil { log.Debugf("error making wantlist block for %s: %s", entry.Cid, err) - // avoid entertaining servers that send us wrong data - // too much. - return nil, &senderError{ - Type: typeServer, - Err: err, - } + // Server returned wrong data; treat as a server error so + // repeat offenders trip the breaker. + return nil, &senderError{Type: typeServer, Err: err} } atomic.AddUint64(&sender.ht.stats.MessagesRecvd, 1) return b, nil @@ -349,55 +353,59 @@ func (sender *httpMsgSender) tryURL(ctx context.Context, u *senderURL, entry bsm http.StatusServiceUnavailable, http.StatusBadGateway, http.StatusGatewayTimeout: - // See path-gateway spec. All these codes SHOULD return - // Retry-After. They are used to signal that a block cannot - // be fetched too, not only fatal server issues, which poses a - // difficult overlap. Current approach treats these errors as - // non fatal if they don't happen repeatedly: - // - By default we disconnect on server errors: MaxRetries = 1. - // - First try errors. We add default backoff if non specified. - // - Retry same CID. If it fails again, count that as server - // error and avoid retrying on that url. - // - If we have no more urls to try, will move to next cid. - // - If we hit the MaxRetries for all urls, abort all. - - // In practice, our wantlists should be 1/3 elements. It - // doesn't make sense to tolerate 5 server errors for 3 - // requests as we will repeatedly hit broken servers that way. - // It is always better if endpoints keep these errors for - // server issues, and simply return 404 when they cannot find - // the content but everything else is fine. - err := fmt.Errorf("%s %q -> %d: %q", req.Method, req.URL, statusCode, string(body)) + // Per the path-gateway spec these codes SHOULD carry + // Retry-After. They cover both fatal server issues and "block + // not available right now", which overlap. We treat them as + // non-fatal until they repeat: + // - MaxRetries defaults to 1, so we disconnect on the second + // consecutive server error. + // - First failure: cooldown using Retry-After if present, + // else the configured backoff. + // - Retry the same CID. If it fails again, count it as a + // server error and avoid retrying that URL. + // - When all URLs hit MaxRetries, abort. + // + // Wantlists are typically 1-3 items; tolerating many server + // errors per cycle just means hammering broken servers. We + // prefer that endpoints reserve these codes for genuine + // server issues and return 404 for missing content. + err := fmt.Errorf("%s %q -> %d: %q", method, res.requestURL, statusCode, string(body)) log.Warn(err) - retryAfter := resp.Header.Get("Retry-After") - cooldownUntil, ok := parseRetryAfter(retryAfter) - if ok { // it means we should retry, so we will retry. - sender.ht.cooldownTracker.setByDate(req.URL.Host, cooldownUntil) - u.cooldown.Store(cooldownUntil) - } else { - sender.ht.cooldownTracker.setByDuration(req.URL.Host, sender.opts.SendErrorBackoff) - u.cooldown.Store(time.Now().Add(sender.opts.SendErrorBackoff)) - } + sender.applyBackoff(u, res.retryAfter) + return nil, &senderError{Type: typeRetryLater, Err: err} - return nil, &senderError{ - Type: typeRetryLater, - Err: err, - } - - // For any other code, we assume we must temporally - // backoff from the URL per the options. - // Tolerance for server errors per url is low. If after waiting etc. - // it fails MaxRetries, we will fully disconnect. + // For any other code we back off from the URL per the options. + // Tolerance for server errors per URL is low: after MaxRetries we + // disconnect from the peer. default: - err := fmt.Errorf("%q -> %d: %q", req.URL, statusCode, string(body)) + err := fmt.Errorf("%q -> %d: %q", res.requestURL, statusCode, string(body)) log.Warn(err) - sender.ht.cooldownTracker.setByDuration(req.URL.Host, sender.opts.SendErrorBackoff) - u.cooldown.Store(time.Now().Add(sender.opts.SendErrorBackoff)) - return nil, &senderError{ - Type: typeServer, - Err: err, - } + sender.applyBackoff(u, "") + return nil, &senderError{Type: typeServer, Err: err} + } +} + +// clearCooldown removes any active cooldown on u after a definitive +// response. Both the per-URL cooldown and the host-level cooldownTracker +// are cleared so that future senders for the same host start fresh. +func (sender *httpMsgSender) clearCooldown(u *senderURL) { + if !u.cooldown.Load().(time.Time).IsZero() { + sender.ht.cooldownTracker.remove(u.URL.Host) + u.cooldown.Store(time.Time{}) + } +} + +// applyBackoff sets a cooldown on u. If retryAfter parses as a date or +// seconds, that wins; otherwise we fall back to the configured +// SendErrorBackoff. +func (sender *httpMsgSender) applyBackoff(u *senderURL, retryAfter string) { + if t, ok := parseRetryAfter(retryAfter); ok { + sender.ht.cooldownTracker.setByDate(u.URL.Host, t) + u.cooldown.Store(t) + return } + sender.ht.cooldownTracker.setByDuration(u.URL.Host, sender.opts.SendErrorBackoff) + u.cooldown.Store(time.Now().Add(sender.opts.SendErrorBackoff)) } // isKnownNotFoundError checks if the response body contains a known IPLD-specific From 936c8502cb0e63c2f84f18909d1b88c191607cf4 Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Thu, 30 Apr 2026 15:54:08 +0200 Subject: [PATCH 2/6] feat(httpnet): refcount pinger per HTTP endpoint multiple peer ids that resolve to the same http gateway each ran their own 5s ping ticker. for a record like /dns/a-fil-http.aur.lu/tcp/443/https that is currently advertised under three peer ids, that meant ~36 head /ipfs/bafkqaaa requests per minute to one upstream while idle. the gateway is a single physical resource; ping it once. key pinger state by (scheme, host, sni) instead of peer.id. peers sharing an endpoint bump a refcount on the existing hostping; the ticker runs while refcount > 0. ewma latency is stored per host; latency(peer.id) averages across that peer's hosts. extract endpointkey out of inflightkey so the inflight tracker and the pinger share one helper. --- bitswap/network/httpnet/httpnet.go | 2 +- bitswap/network/httpnet/inflight.go | 15 +- bitswap/network/httpnet/pinger.go | 284 +++++++++++++++++-------- bitswap/network/httpnet/pinger_test.go | 130 +++++++++++ 4 files changed, 343 insertions(+), 88 deletions(-) create mode 100644 bitswap/network/httpnet/pinger_test.go diff --git a/bitswap/network/httpnet/httpnet.go b/bitswap/network/httpnet/httpnet.go index fa05a7fdd..16be65537 100644 --- a/bitswap/network/httpnet/httpnet.go +++ b/bitswap/network/httpnet/httpnet.go @@ -363,7 +363,7 @@ func New(host host.Host, opts ...Option) network.BitSwapNetwork { } htnet.client = c - pinger := newPinger(htnet, pingCid) + pinger := newPinger(htnet) htnet.pinger = pinger et := newErrorTracker(htnet) diff --git a/bitswap/network/httpnet/inflight.go b/bitswap/network/httpnet/inflight.go index 532743c00..7caa62d9f 100644 --- a/bitswap/network/httpnet/inflight.go +++ b/bitswap/network/httpnet/inflight.go @@ -78,9 +78,22 @@ func (t *inflightTracker) do(key string, fn func() *inflightResult) (result *inf return c.result, false } -// inflightKey builds the singleflight key. SNI is part of the key +// endpointKey identifies a unique HTTP endpoint. SNI is part of the key // because providers can advertise the same host with different SNI // expectations and the upstream response may differ. +func endpointKey(scheme, host, sni string) string { + var sb strings.Builder + sb.Grow(len(scheme) + len(host) + len(sni) + 4) + sb.WriteString(scheme) + sb.WriteString("://") + sb.WriteString(host) + sb.WriteByte('|') + sb.WriteString(sni) + return sb.String() +} + +// inflightKey extends endpointKey with method and CID, identifying a +// unique HTTP request that can be coalesced across callers. func inflightKey(scheme, host, sni, method, cid string) string { var sb strings.Builder sb.Grow(len(scheme) + len(host) + len(sni) + len(method) + len(cid) + 7) diff --git a/bitswap/network/httpnet/pinger.go b/bitswap/network/httpnet/pinger.go index 6a161eca7..400d7b70d 100644 --- a/bitswap/network/httpnet/pinger.go +++ b/bitswap/network/httpnet/pinger.go @@ -11,36 +11,71 @@ import ( "github.com/libp2p/go-libp2p/p2p/protocol/ping" ) -// pinger pings connected hosts on regular intervals -// and tracks their latency. +// pingInterval is how often the per-host ticker probes each unique HTTP +// endpoint to refresh its latency reading. +const pingInterval = 5 * time.Second + +// ewmaSmoothing is the smoothing factor for the per-host latency EWMA. +// Matches the value used by the libp2p peerstore's LatencyEWMA. +const ewmaSmoothing = 0.1 + +// pinger probes HTTP endpoints periodically and tracks their latency. +// +// State is keyed by HTTP endpoint, not by peer ID. Multiple peer IDs that +// resolve to the same endpoint share one ticker and one latency reading. +// /dns/a-fil-http.aur.lu/tcp/443/https is currently advertised by three +// peer IDs in delegated-ipfs.dev responses; with per-peer pinging, three +// peer IDs idle on that gateway produce 36 HEAD requests per minute. The +// gateway is a single physical resource, so we ping it once. type pinger struct { ht *Network - latenciesLock sync.RWMutex - latencies map[peer.ID]time.Duration + mu sync.Mutex + // hosts holds one entry per unique HTTP endpoint currently being + // pinged. Keyed by endpointKey. + hosts map[string]*hostPing + // peerHosts maps each registered peer to the host keys it depends + // on, so stopPinging can decrement the right refcounts. + peerHosts map[peer.ID]map[string]struct{} +} + +// hostPing tracks the periodic probe for one HTTP endpoint plus its +// latency reading. refcount counts the peer IDs registered against it; +// the ticker runs while refcount > 0. +type hostPing struct { + url network.ParsedURL + method string + // peerID is a representative peer.ID kept for connectToURL logging. + // It is captured at first registration; later peers piggyback on + // this one's probes. + peerID peer.ID - pingsLock sync.RWMutex - pings map[peer.ID]context.CancelFunc + refcount int + cancel context.CancelFunc + + latencyMu sync.Mutex + latency time.Duration // EWMA, zero until the first sample } -func newPinger(ht *Network, pingCid string) *pinger { +func newPinger(ht *Network) *pinger { return &pinger{ ht: ht, - latencies: make(map[peer.ID]time.Duration), - pings: make(map[peer.ID]context.CancelFunc), + hosts: make(map[string]*hostPing), + peerHosts: make(map[peer.ID]map[string]struct{}), } } -// ping sends a ping packet to the first known url of the given peer and -// returns the result with the latency for this peer. The result is also -// recorded. +// ping issues immediate probes against every URL of p in parallel and +// returns the average RTT. It also records the per-host latency so that +// subsequent latency() calls reflect the fresh reading. +// +// ping is intended for callers that need a fresh sample (Network.Ping); +// the periodic ticker handles the steady-state case independently. func (pngr *pinger) ping(ctx context.Context, p peer.ID) ping.Result { pi := pngr.ht.host.Peerstore().PeerInfo(p) urls := network.ExtractURLsFromPeer(pi) if len(urls) == 0 { - return ping.Result{ - Error: ErrNoHTTPAddresses, - } + return ping.Result{Error: ErrNoHTTPAddresses} } method := "GET" @@ -58,9 +93,9 @@ func (pngr *pinger) ping(ctx context.Context, p peer.ID) ping.Result { results <- ping.Result{Error: err} return } - results <- ping.Result{ - RTT: time.Since(start), - } + rtt := time.Since(start) + pngr.recordHostLatency(u, rtt) + results <- ping.Result{RTT: rtt} }(u) } @@ -76,99 +111,176 @@ func (pngr *pinger) ping(ctx context.Context, p peer.ID) ping.Result { } close(results) - lenErrors := len(errs) - // if all urls failed return that, otherwise ignore. - if lenErrors == len(urls) { - return ping.Result{ - Error: errors.Join(errs...), - } + if len(errs) == len(urls) { + return ping.Result{Error: errors.Join(errs...)} } - result.RTT = result.RTT / time.Duration(len(urls)-lenErrors) - - // log.Debugf("ping latency %s %s", p, result.RTT) - pngr.recordLatency(p, result.RTT) + result.RTT = result.RTT / time.Duration(len(urls)-len(errs)) return result } -// latency returns the recorded latency for the given peer. -func (pngr *pinger) latency(p peer.ID) time.Duration { - var lat time.Duration - pngr.latenciesLock.RLock() - { - lat = pngr.latencies[p] +// recordHostLatency updates the EWMA for the host that u points to. The +// host entry must already exist (created by startPinging). +func (pngr *pinger) recordHostLatency(u network.ParsedURL, next time.Duration) { + key := endpointKey(u.URL.Scheme, u.URL.Host, u.SNI) + pngr.mu.Lock() + hp := pngr.hosts[key] + pngr.mu.Unlock() + if hp == nil { + return } - pngr.latenciesLock.RUnlock() - return lat + hp.recordLatency(next) } -// recordLatency stores a new latency measurement for the given peer using an -// Exponetially Weighted Moving Average similar to LatencyEWMA from the -// peerstore. -func (pngr *pinger) recordLatency(p peer.ID, next time.Duration) { - nextf := float64(next) - s := 0.1 - pngr.latenciesLock.Lock() - { - ewma, found := pngr.latencies[p] - ewmaf := float64(ewma) - if !found { - pngr.latencies[p] = next // when no data, just take it as the mean. - } else { - nextf = ((1.0 - s) * ewmaf) + (s * nextf) - pngr.latencies[p] = time.Duration(nextf) +// latency returns the average EWMA latency across all hosts that p is +// registered against. Returns zero when p has no registered hosts or no +// host has produced a sample yet. +func (pngr *pinger) latency(p peer.ID) time.Duration { + pngr.mu.Lock() + keys := pngr.peerHosts[p] + hosts := make([]*hostPing, 0, len(keys)) + for k := range keys { + if hp, ok := pngr.hosts[k]; ok { + hosts = append(hosts, hp) + } + } + pngr.mu.Unlock() + + var total time.Duration + var samples int + for _, hp := range hosts { + if l := hp.currentLatency(); l > 0 { + total += l + samples++ } } - pngr.latenciesLock.Unlock() + if samples == 0 { + return 0 + } + return total / time.Duration(samples) } +// startPinging registers p with all of its known HTTP endpoints. New +// endpoints get a ticker; endpoints already pinged (because another peer +// shares them) just bump their refcount. func (pngr *pinger) startPinging(p peer.ID) { - pngr.pingsLock.Lock() - defer pngr.pingsLock.Unlock() - - _, ok := pngr.pings[p] - if ok { - log.Debugf("already pinging %s", p) + pi := pngr.ht.host.Peerstore().PeerInfo(p) + urls := network.ExtractURLsFromPeer(pi) + if len(urls) == 0 { + log.Debugf("startPinging: no HTTP URLs for %s", p) return } - ctx, cancel := context.WithCancel(context.Background()) - pngr.pings[p] = cancel + method := "GET" + if supportsHave(pngr.ht.host.Peerstore(), p) { + method = "HEAD" + } - log.Debugf("starting pings to %s", p) + pngr.mu.Lock() + defer pngr.mu.Unlock() - go func(ctx context.Context, p peer.ID) { - ticker := time.NewTicker(5 * time.Second) - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - pngr.ping(ctx, p) - } + if _, ok := pngr.peerHosts[p]; ok { + log.Debugf("already pinging %s", p) + return + } + keys := make(map[string]struct{}, len(urls)) + for _, u := range urls { + key := endpointKey(u.URL.Scheme, u.URL.Host, u.SNI) + if _, dup := keys[key]; dup { + continue // peer advertised the same endpoint twice. + } + keys[key] = struct{}{} + + if hp, ok := pngr.hosts[key]; ok { + hp.refcount++ + continue + } + ctx, cancel := context.WithCancel(context.Background()) + hp := &hostPing{ + url: u, + method: method, + peerID: p, + refcount: 1, + cancel: cancel, } - }(ctx, p) + pngr.hosts[key] = hp + go pngr.tickerLoop(ctx, hp) + } + pngr.peerHosts[p] = keys + log.Debugf("starting pings for %s on %d hosts", p, len(keys)) } +// stopPinging unregisters p. Hosts whose refcount drops to zero have +// their ticker cancelled and their state removed. func (pngr *pinger) stopPinging(p peer.ID) { - log.Debugf("stopping pings to %s", p) - pngr.pingsLock.Lock() - { - cancel, ok := pngr.pings[p] - if ok { - cancel() + pngr.mu.Lock() + defer pngr.mu.Unlock() + + keys, ok := pngr.peerHosts[p] + if !ok { + return + } + delete(pngr.peerHosts, p) + log.Debugf("stopping pings for %s", p) + + for key := range keys { + hp := pngr.hosts[key] + if hp == nil { + continue } - delete(pngr.pings, p) + hp.refcount-- + if hp.refcount > 0 { + continue + } + hp.cancel() + delete(pngr.hosts, key) } - pngr.pingsLock.Unlock() - pngr.latenciesLock.Lock() - delete(pngr.latencies, p) - pngr.latenciesLock.Unlock() } +// isPinging reports whether p has any host registrations. func (pngr *pinger) isPinging(p peer.ID) bool { - pngr.pingsLock.RLock() - defer pngr.pingsLock.RUnlock() - - _, ok := pngr.pings[p] + pngr.mu.Lock() + defer pngr.mu.Unlock() + _, ok := pngr.peerHosts[p] return ok } + +// tickerLoop probes hp on pingInterval until ctx is cancelled. +func (pngr *pinger) tickerLoop(ctx context.Context, hp *hostPing) { + ticker := time.NewTicker(pingInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + pngr.tickOnce(ctx, hp) + } + } +} + +func (pngr *pinger) tickOnce(ctx context.Context, hp *hostPing) { + start := time.Now() + if _, err := pngr.ht.connectToURL(ctx, hp.peerID, hp.url, hp.method); err != nil { + log.Debug(err) + return + } + hp.recordLatency(time.Since(start)) +} + +// recordLatency updates the host's EWMA latency. The first sample is +// stored as-is; subsequent samples are smoothed. +func (hp *hostPing) recordLatency(next time.Duration) { + hp.latencyMu.Lock() + defer hp.latencyMu.Unlock() + if hp.latency == 0 { + hp.latency = next + return + } + hp.latency = time.Duration((1.0-ewmaSmoothing)*float64(hp.latency) + ewmaSmoothing*float64(next)) +} + +func (hp *hostPing) currentLatency() time.Duration { + hp.latencyMu.Lock() + defer hp.latencyMu.Unlock() + return hp.latency +} diff --git a/bitswap/network/httpnet/pinger_test.go b/bitswap/network/httpnet/pinger_test.go new file mode 100644 index 000000000..2870c66b6 --- /dev/null +++ b/bitswap/network/httpnet/pinger_test.go @@ -0,0 +1,130 @@ +package httpnet + +import ( + "context" + "testing" +) + +// TestPingerSharesHostsAcrossPeers verifies that two peer IDs which +// resolve to the same HTTP endpoint share one host entry (refcount 2) +// rather than producing two independent tickers. +func TestPingerSharesHostsAcrossPeers(t *testing.T) { + ctx := context.Background() + + htnet, mn := mockNetwork(t, mockReceiver(t)) + peerA, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + peerB, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + + // One server, two peers pointing at it. + srv := makeServer(t, 0, 0) + mustConnectToPeer(t, ctx, htnet, peerA, srv) + mustConnectToPeer(t, ctx, htnet, peerB, srv) + + htnet.pinger.mu.Lock() + hosts := len(htnet.pinger.hosts) + var refcount int + for _, hp := range htnet.pinger.hosts { + refcount = hp.refcount + } + htnet.pinger.mu.Unlock() + + if hosts != 1 { + t.Fatalf("got %d host entries, want 1", hosts) + } + if refcount != 2 { + t.Fatalf("got refcount %d, want 2", refcount) + } + if !htnet.pinger.isPinging(peerA.ID()) { + t.Errorf("peerA should be pinging") + } + if !htnet.pinger.isPinging(peerB.ID()) { + t.Errorf("peerB should be pinging") + } +} + +// TestPingerStopReleasesRefcount verifies that disconnecting one of two +// peers sharing a host leaves the host entry alive (refcount 1) and +// that disconnecting the second drops it. +func TestPingerStopReleasesRefcount(t *testing.T) { + ctx := context.Background() + + htnet, mn := mockNetwork(t, mockReceiver(t)) + peerA, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + peerB, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + + srv := makeServer(t, 0, 0) + mustConnectToPeer(t, ctx, htnet, peerA, srv) + mustConnectToPeer(t, ctx, htnet, peerB, srv) + + if err := htnet.DisconnectFrom(ctx, peerA.ID()); err != nil { + t.Fatal(err) + } + + htnet.pinger.mu.Lock() + hostsAfterA := len(htnet.pinger.hosts) + var refcountAfterA int + for _, hp := range htnet.pinger.hosts { + refcountAfterA = hp.refcount + } + htnet.pinger.mu.Unlock() + + if hostsAfterA != 1 { + t.Fatalf("after disconnecting peerA: got %d host entries, want 1", hostsAfterA) + } + if refcountAfterA != 1 { + t.Fatalf("after disconnecting peerA: got refcount %d, want 1", refcountAfterA) + } + + if err := htnet.DisconnectFrom(ctx, peerB.ID()); err != nil { + t.Fatal(err) + } + + htnet.pinger.mu.Lock() + hostsAfterB := len(htnet.pinger.hosts) + htnet.pinger.mu.Unlock() + + if hostsAfterB != 0 { + t.Fatalf("after disconnecting peerB: got %d host entries, want 0", hostsAfterB) + } +} + +// TestPingerDistinctHostsRunIndependently verifies that two peers with +// different HTTP endpoints each get their own host entry. +func TestPingerDistinctHostsRunIndependently(t *testing.T) { + ctx := context.Background() + + htnet, mn := mockNetwork(t, mockReceiver(t)) + peerA, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + peerB, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + + srvA := makeServer(t, 0, 0) + srvB := makeServer(t, 0, 0) + mustConnectToPeer(t, ctx, htnet, peerA, srvA) + mustConnectToPeer(t, ctx, htnet, peerB, srvB) + + htnet.pinger.mu.Lock() + hosts := len(htnet.pinger.hosts) + htnet.pinger.mu.Unlock() + + if hosts != 2 { + t.Fatalf("got %d host entries, want 2", hosts) + } +} From e902d8375d1508a69e7a4f5943febfd1ad505cb4 Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Thu, 30 Apr 2026 16:17:16 +0200 Subject: [PATCH 3/6] feat(httpnet): skip Connect probe for known HTTP endpoints when delegated routing returns multiple peer ids that share one http gateway (currently /dns/a-fil-http.aur.lu/tcp/443/https is advertised under three peer ids), each peer's connect issued its own head /ipfs/bafkqaaa probe to the same upstream. n peers => n probes. before probing, look up the url in the pinger's host map. if an earlier connect already proved the endpoint working, add the url straight to workingaddrs and inherit the cached head-support decision. the cache is valid as long as at least one peer is still registered against the endpoint; once refcount hits zero the entry is dropped and the next connect re-probes. while here, replace the hardcoded "HEAD" / "GET" strings in the package with http.MethodHead / http.MethodGet from net/http. --- bitswap/network/httpnet/connect_test.go | 122 +++++++++++++++++++++++ bitswap/network/httpnet/httpnet.go | 27 +++-- bitswap/network/httpnet/inflight_test.go | 13 +-- bitswap/network/httpnet/pinger.go | 24 ++++- 4 files changed, 166 insertions(+), 20 deletions(-) create mode 100644 bitswap/network/httpnet/connect_test.go diff --git a/bitswap/network/httpnet/connect_test.go b/bitswap/network/httpnet/connect_test.go new file mode 100644 index 000000000..e67b435a7 --- /dev/null +++ b/bitswap/network/httpnet/connect_test.go @@ -0,0 +1,122 @@ +package httpnet + +import ( + "context" + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" +) + +// countingProbeServer wraps the standard test handler and counts probe +// requests (any request to /ipfs/). +func countingProbeServer(t *testing.T) (srv *httptest.Server, probes *atomic.Int32) { + t.Helper() + probes = new(atomic.Int32) + inner := &Handler{bstore: makeBlockstore(t, 0, 0)} + wrapped := http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + if strings.HasSuffix(r.URL.Path, "/ipfs/"+pingCid) { + probes.Add(1) + } + inner.ServeHTTP(rw, r) + }) + srv = httptest.NewUnstartedServer(wrapped) + srv.EnableHTTP2 = true + srv.StartTLS() + t.Cleanup(srv.Close) + return srv, probes +} + +// TestConnectSkipsProbeForKnownEndpoint verifies that a second peer +// resolving to the same HTTP endpoint as a previously-connected peer +// avoids issuing a fresh probe. +// +// This is the expected pattern when delegated routing returns multiple +// peer IDs for one gateway, e.g. /dns/a-fil-http.aur.lu/tcp/443/https +// is currently advertised under three peer IDs. +func TestConnectSkipsProbeForKnownEndpoint(t *testing.T) { + ctx := context.Background() + + htnet, mn := mockNetwork(t, mockReceiver(t)) + peerA, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + peerB, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + + srv, probes := countingProbeServer(t) + + mustConnectToPeer(t, ctx, htnet, peerA, srv) + if got := probes.Load(); got != 1 { + t.Fatalf("after peerA connect: got %d probes, want 1", got) + } + + mustConnectToPeer(t, ctx, htnet, peerB, srv) + if got := probes.Load(); got != 1 { + t.Fatalf("after peerB connect: got %d probes, want 1 (probe should be deduped)", got) + } + + if !htnet.IsConnectedToPeer(ctx, peerB.ID()) { + t.Errorf("peerB should be marked connected after probe dedup") + } +} + +// TestConnectInheritsHEADSupport verifies that the second peer inherits +// the HEAD-support decision from the cached endpoint state. +func TestConnectInheritsHEADSupport(t *testing.T) { + ctx := context.Background() + + htnet, mn := mockNetwork(t, mockReceiver(t)) + peerA, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + peerB, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + + srv, _ := countingProbeServer(t) + mustConnectToPeer(t, ctx, htnet, peerA, srv) + if !supportsHave(htnet.host.Peerstore(), peerA.ID()) { + t.Fatal("peerA should support HEAD (the test server answers HEAD)") + } + + mustConnectToPeer(t, ctx, htnet, peerB, srv) + if !supportsHave(htnet.host.Peerstore(), peerB.ID()) { + t.Errorf("peerB should inherit HEAD support from peerA's cached probe") + } +} + +// TestConnectProbesNewEndpoint verifies that a peer pointing at an +// endpoint not yet in the cache still triggers a fresh probe. +func TestConnectProbesNewEndpoint(t *testing.T) { + ctx := context.Background() + + htnet, mn := mockNetwork(t, mockReceiver(t)) + peerA, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + peerB, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + + srvA, probesA := countingProbeServer(t) + srvB, probesB := countingProbeServer(t) + + mustConnectToPeer(t, ctx, htnet, peerA, srvA) + mustConnectToPeer(t, ctx, htnet, peerB, srvB) + + if got := probesA.Load(); got != 1 { + t.Errorf("server A: got %d probes, want 1", got) + } + if got := probesB.Load(); got != 1 { + t.Errorf("server B: got %d probes, want 1 (different endpoint, fresh probe expected)", got) + } +} diff --git a/bitswap/network/httpnet/httpnet.go b/bitswap/network/httpnet/httpnet.go index 16be65537..d02da71e5 100644 --- a/bitswap/network/httpnet/httpnet.go +++ b/bitswap/network/httpnet/httpnet.go @@ -534,22 +534,30 @@ func (ht *Network) Connect(ctx context.Context, pi peer.AddrInfo) error { urls = urls[0:ht.maxHTTPAddressesPerPeer] } - // Try to talk to the peer by making HTTP requests to its urls and - // recording which ones work. This allows re-using the connections - // that we are about to open next time with the client. We call - // peer.Connected() on success. + // Probe each URL to confirm the gateway speaks our protocol. URLs + // already proven working by another peer (delegated routing routinely + // returns multiple peer IDs for one gateway, e.g. + // /dns/a-fil-http.aur.lu/tcp/443/https) skip the probe entirely and + // inherit the cached HEAD-support decision. var workingAddrs []multiaddr.Multiaddr supportsHead := true for _, u := range urls { - // If head works we assume GET works too. - status, err := ht.connectToURL(ctx, pi.ID, u, "HEAD") + if method, ok := ht.pinger.endpointKnown(u); ok { + workingAddrs = append(workingAddrs, u.Multiaddress) + if method != http.MethodHead { + supportsHead = false + } + log.Debugf("skipping probe for %s: endpoint already known via another peer", u.URL) + continue + } + + // If HEAD works we assume GET works too. + status, err := ht.connectToURL(ctx, pi.ID, u, http.MethodHead) if err != nil { errs = append(errs, fmt.Errorf("%s: %s", u.Multiaddress.String(), err)) - // abort if context cancelled if ctxErr := ctx.Err(); ctxErr != nil { return errors.Join(errs...) } - if status == http.StatusTooManyRequests { continue // do not try GET, just move on. } @@ -560,8 +568,7 @@ func (ht *Network) Connect(ctx context.Context, pi peer.AddrInfo) error { // HEAD did not work. Try GET. supportsHead = false - - _, err = ht.connectToURL(ctx, pi.ID, u, "GET") + _, err = ht.connectToURL(ctx, pi.ID, u, http.MethodGet) if err != nil { errs = append(errs, fmt.Errorf("%s: %s", u.Multiaddress.String(), err)) if ctxErr := ctx.Err(); ctxErr != nil { diff --git a/bitswap/network/httpnet/inflight_test.go b/bitswap/network/httpnet/inflight_test.go index afba6d497..0f675b088 100644 --- a/bitswap/network/httpnet/inflight_test.go +++ b/bitswap/network/httpnet/inflight_test.go @@ -2,6 +2,7 @@ package httpnet import ( "errors" + "net/http" "sync" "sync/atomic" "testing" @@ -142,13 +143,13 @@ func TestInflightPropagatesErrors(t *testing.T) { // requests that differ in any field that influences the upstream // response. func TestInflightKeyDistinguishesFields(t *testing.T) { - base := inflightKey("https", "example.com", "sni", "GET", "cid") + base := inflightKey("https", "example.com", "sni", http.MethodGet, "cid") cases := map[string]string{ - "scheme": inflightKey("http", "example.com", "sni", "GET", "cid"), - "host": inflightKey("https", "other.com", "sni", "GET", "cid"), - "sni": inflightKey("https", "example.com", "other-sni", "GET", "cid"), - "method": inflightKey("https", "example.com", "sni", "HEAD", "cid"), - "cid": inflightKey("https", "example.com", "sni", "GET", "other-cid"), + "scheme": inflightKey("http", "example.com", "sni", http.MethodGet, "cid"), + "host": inflightKey("https", "other.com", "sni", http.MethodGet, "cid"), + "sni": inflightKey("https", "example.com", "other-sni", http.MethodGet, "cid"), + "method": inflightKey("https", "example.com", "sni", http.MethodHead, "cid"), + "cid": inflightKey("https", "example.com", "sni", http.MethodGet, "other-cid"), } for name, k := range cases { if k == base { diff --git a/bitswap/network/httpnet/pinger.go b/bitswap/network/httpnet/pinger.go index 400d7b70d..79429f68b 100644 --- a/bitswap/network/httpnet/pinger.go +++ b/bitswap/network/httpnet/pinger.go @@ -3,6 +3,7 @@ package httpnet import ( "context" "errors" + "net/http" "sync" "time" @@ -78,9 +79,9 @@ func (pngr *pinger) ping(ctx context.Context, p peer.ID) ping.Result { return ping.Result{Error: ErrNoHTTPAddresses} } - method := "GET" + method := http.MethodGet if supportsHave(pngr.ht.host.Peerstore(), p) { - method = "HEAD" + method = http.MethodHead } results := make(chan ping.Result, len(urls)) @@ -131,6 +132,21 @@ func (pngr *pinger) recordHostLatency(u network.ParsedURL, next time.Duration) { hp.recordLatency(next) } +// endpointKnown reports whether the given URL points at an endpoint that +// is already registered (proven working by a previous Connect). When ok +// is true, method is the probe method captured at first registration +// (http.MethodHead or http.MethodGet). Callers can skip a fresh probe +// and inherit the supportsHead decision from method. +func (pngr *pinger) endpointKnown(u network.ParsedURL) (method string, ok bool) { + pngr.mu.Lock() + defer pngr.mu.Unlock() + hp, ok := pngr.hosts[endpointKey(u.URL.Scheme, u.URL.Host, u.SNI)] + if !ok { + return "", false + } + return hp.method, true +} + // latency returns the average EWMA latency across all hosts that p is // registered against. Returns zero when p has no registered hosts or no // host has produced a sample yet. @@ -170,9 +186,9 @@ func (pngr *pinger) startPinging(p peer.ID) { return } - method := "GET" + method := http.MethodGet if supportsHave(pngr.ht.host.Peerstore(), p) { - method = "HEAD" + method = http.MethodHead } pngr.mu.Lock() From 36ec44b94b489d1ef1e42b183fc8ad80595433d1 Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Thu, 30 Apr 2026 21:10:14 +0200 Subject: [PATCH 4/6] feat(httpnet): per-host breaker and error counters multiple peer ids regularly resolve to one http gateway. for example, /dns/a-fil-http.aur.lu/tcp/443/https is currently advertised under three peer ids by delegated-ipfs.dev. the per-peer breaker and 404 counter each let a misbehaving gateway burn n x maxretries / n x maxdonthave attempts before everyone gives up; sharing them trips once for the whole gateway. key changes: - senderURL.serverErrors becomes *atomic.Int64 shared per (scheme, host, sni) via a new breaker - errorTracker keys on endpointkey, takes []*senderurl, distributes client error counts across each request's hosts - clearcooldown also resets the per-host serverErrors on 200/4xx; one good answer forgives accrued breaker state for all peers using the host - pinger.stoppinging drops breaker and errortracker entries when the last peer using a host disconnects, so reconnects start fresh throttle handling matches http semantics: - httpworker no longer increments serverErrors on the typeretrylater 2nd-hit path. retry-after is "i am busy", not "i am broken"; charging the breaker on throttle would disconnect peers when the gateway is alive and just slow. - tryurl auto-expires its cooldown deadline so a long-lived senderurl resumes naturally once retry-after elapses, instead of staying stuck on a stale dl - pinger ticker skips probes while the host is on cooldown so we honour retry-after instead of polling inside the wait window at 50 req/sec against one popular gateway with retry-after: 5, this takes http traffic during the wait window from ~15 connect-probe attempts down to ~4 (3 initial 429s + 1 pinger-skip-aware tick), and recovery from cooldown + provider-rediscovery (~6s) down to cooldown alone (~5s). on retry-after-less 429/504 the default backoff is 1s (SendErrorBackoff), capped at maxbackoff=1m. real server errors (500 default arm, network failures, body read failures, malformed responses) still increment the breaker via typeserver as before, so single-peer behavior on genuine breakage is unchanged. --- bitswap/network/httpnet/breaker.go | 48 +++++++ bitswap/network/httpnet/connect_test.go | 3 +- bitswap/network/httpnet/cooldown.go | 10 ++ bitswap/network/httpnet/error_tracker.go | 75 ++++++---- bitswap/network/httpnet/error_tracker_test.go | 135 +++++++++++------- bitswap/network/httpnet/httpnet.go | 42 +++--- bitswap/network/httpnet/httpnet_test.go | 28 +--- bitswap/network/httpnet/inflight.go | 13 +- bitswap/network/httpnet/msg_sender.go | 24 +++- bitswap/network/httpnet/pinger.go | 18 ++- bitswap/network/httpnet/pinger_test.go | 46 ++++++ 11 files changed, 298 insertions(+), 144 deletions(-) create mode 100644 bitswap/network/httpnet/breaker.go diff --git a/bitswap/network/httpnet/breaker.go b/bitswap/network/httpnet/breaker.go new file mode 100644 index 000000000..350364c0d --- /dev/null +++ b/bitswap/network/httpnet/breaker.go @@ -0,0 +1,48 @@ +package httpnet + +import ( + "sync" + "sync/atomic" +) + +// breaker holds one shared serverError counter per HTTP endpoint. +// +// Without sharing, every (peer, URL) pair has its own counter and a +// flaky host can drain the per-peer quota of every peer that maps to +// it before any breaker trips. When N peer IDs resolve to one gateway +// the host accumulates N x MaxRetries attempts before all peers +// disconnect; the shared counter trips the breaker once for the whole +// gateway. +// +// Counters are returned as pointers so callers (senderURL) can use +// atomic.Int64 directly without going through the breaker on the hot +// path. +type breaker struct { + mu sync.Mutex + counters map[string]*atomic.Int64 +} + +func newBreaker() *breaker { + return &breaker{counters: make(map[string]*atomic.Int64)} +} + +// counter returns the shared per-host serverErrors counter for key, +// allocating one on first use. +func (b *breaker) counter(key string) *atomic.Int64 { + b.mu.Lock() + defer b.mu.Unlock() + if c, ok := b.counters[key]; ok { + return c + } + c := new(atomic.Int64) + b.counters[key] = c + return c +} + +// reset drops the counter for key. Called when no peer is using the +// host so a future reconnect starts fresh. +func (b *breaker) reset(key string) { + b.mu.Lock() + delete(b.counters, key) + b.mu.Unlock() +} diff --git a/bitswap/network/httpnet/connect_test.go b/bitswap/network/httpnet/connect_test.go index e67b435a7..7f1205bee 100644 --- a/bitswap/network/httpnet/connect_test.go +++ b/bitswap/network/httpnet/connect_test.go @@ -33,8 +33,7 @@ func countingProbeServer(t *testing.T) (srv *httptest.Server, probes *atomic.Int // avoids issuing a fresh probe. // // This is the expected pattern when delegated routing returns multiple -// peer IDs for one gateway, e.g. /dns/a-fil-http.aur.lu/tcp/443/https -// is currently advertised under three peer IDs. +// peer IDs for one gateway. func TestConnectSkipsProbeForKnownEndpoint(t *testing.T) { ctx := context.Background() diff --git a/bitswap/network/httpnet/cooldown.go b/bitswap/network/httpnet/cooldown.go index 1e9751146..5a61ea109 100644 --- a/bitswap/network/httpnet/cooldown.go +++ b/bitswap/network/httpnet/cooldown.go @@ -75,6 +75,16 @@ func (ct *cooldownTracker) remove(host string) { ct.urlsLock.Unlock() } +// onCooldown reports whether host has an active cooldown right now. +// Callers (e.g. the pinger) use this to honour Retry-After by skipping +// probes inside the window the gateway asked us to wait. +func (ct *cooldownTracker) onCooldown(host string) bool { + ct.urlsLock.RLock() + defer ct.urlsLock.RUnlock() + dl, ok := ct.urls[host] + return ok && time.Now().Before(dl) +} + func (ct *cooldownTracker) fillSenderURLs(urls []network.ParsedURL) []*senderURL { now := time.Now() surls := make([]*senderURL, len(urls)) diff --git a/bitswap/network/httpnet/error_tracker.go b/bitswap/network/httpnet/error_tracker.go index 72eb6ae1c..bf767084d 100644 --- a/bitswap/network/httpnet/error_tracker.go +++ b/bitswap/network/httpnet/error_tracker.go @@ -3,46 +3,65 @@ package httpnet import ( "errors" "sync" - - "github.com/libp2p/go-libp2p/core/peer" ) -var errThresholdCrossed = errors.New("the peer crossed the error threshold") +var errThresholdCrossed = errors.New("the host crossed the error threshold") +// errorTracker counts client errors (e.g. 404) per HTTP endpoint. +// +// Counts are keyed on endpointKey, not on peer.ID. Multiple peer IDs +// that resolve to the same gateway contribute to one shared counter, +// so a misbehaving host trips the breaker once for all peers using it +// rather than after every peer drained its own quota. type errorTracker struct { - ht *Network - - mux sync.RWMutex - errors map[peer.ID]int -} - -func newErrorTracker(ht *Network) *errorTracker { - return &errorTracker{ - ht: ht, - errors: make(map[peer.ID]int), - } + mu sync.Mutex + counts map[string]int } -func (et *errorTracker) stopTracking(p peer.ID) { - et.mux.Lock() - delete(et.errors, p) - et.mux.Unlock() +func newErrorTracker() *errorTracker { + return &errorTracker{counts: make(map[string]int)} } -// logErrors adds n to the current error count for p. If the total count is above the threshold, then an error is returned. If n is 0, the the total count is reset to 0. -func (et *errorTracker) logErrors(p peer.ID, n int, threshold int) error { - et.mux.Lock() - defer et.mux.Unlock() +// logErrors attributes n client errors to every host backing urls. +// When n is zero, the counters for those hosts reset (a successful +// SendMsg signals the hosts are healthy). +// +// Returns errThresholdCrossed when any host's count exceeds threshold. +// The caller decides what to do with that signal; today it disconnects +// the peer that triggered the check. Other peers using the same host +// are caught lazily on their next SendMsg. +func (et *errorTracker) logErrors(urls []*senderURL, n, threshold int) error { + if len(urls) == 0 { + return nil + } + et.mu.Lock() + defer et.mu.Unlock() - if n == 0 { // reset error count - delete(et.errors, p) + if n == 0 { + for _, u := range urls { + delete(et.counts, endpointKey(u.URL.Scheme, u.URL.Host, u.SNI)) + } return nil } - count := et.errors[p] - total := count + n - et.errors[p] = total - if total > threshold { + + var tripped bool + for _, u := range urls { + key := endpointKey(u.URL.Scheme, u.URL.Host, u.SNI) + et.counts[key] += n + if et.counts[key] > threshold { + tripped = true + } + } + if tripped { return errThresholdCrossed } return nil } + +// reset drops the count for key. Called when no peer is using the host +// so a future reconnect starts fresh. +func (et *errorTracker) reset(key string) { + et.mu.Lock() + delete(et.counts, key) + et.mu.Unlock() +} diff --git a/bitswap/network/httpnet/error_tracker_test.go b/bitswap/network/httpnet/error_tracker_test.go index 1f4f99a10..d0b17846f 100644 --- a/bitswap/network/httpnet/error_tracker_test.go +++ b/bitswap/network/httpnet/error_tracker_test.go @@ -1,96 +1,121 @@ package httpnet -// Write tests for the errorTracker implementation found in watcher.go import ( + "net/url" "sync" + "sync/atomic" "testing" + "time" - "github.com/libp2p/go-libp2p/core/peer" + "github.com/ipfs/boxo/bitswap/network" ) -func TestErrorTracker_StopTracking(t *testing.T) { - et := newErrorTracker(&Network{}) - p := peer.ID("testpeer") - - // Stop tracking the peer - et.stopTracking(p) - - // Check if the error count is removed - if _, ok := et.errors[p]; ok { - t.Errorf("Expected peer %s to be untracked but it was still tracked", p) +// makeTestURLs returns n senderURLs each pointing at a distinct synthetic +// host. Counters and cooldowns are initialized so the senderURLs are +// safe to use against the per-host trackers. +func makeTestURLs(t *testing.T, hosts ...string) []*senderURL { + t.Helper() + out := make([]*senderURL, len(hosts)) + for i, h := range hosts { + u, err := url.Parse("https://" + h) + if err != nil { + t.Fatalf("parse %q: %v", h, err) + } + su := &senderURL{ + ParsedURL: network.ParsedURL{URL: u}, + serverErrors: new(atomic.Int64), + } + su.cooldown.Store(time.Time{}) + out[i] = su } + return out } func TestErrorTracker_LogErrors_Reset(t *testing.T) { - et := newErrorTracker(&Network{}) - p := peer.ID("testpeer") + et := newErrorTracker() + urls := makeTestURLs(t, "host-a") - // Log some errors - err := et.logErrors(p, 5, 10) - if err != nil { - t.Errorf("Expected no error when logging errors but got %v", err) + if err := et.logErrors(urls, 5, 10); err != nil { + t.Errorf("logging 5 below threshold: got err %v, want nil", err) } - // Reset error count - err = et.logErrors(p, 0, 10) - if err != nil { - t.Errorf("Expected no error when resetting error count but got %v", err) + if err := et.logErrors(urls, 0, 10); err != nil { + t.Errorf("reset: got err %v, want nil", err) } - // Check if the error count is reset to 0 - count := et.errors[p] - if count != 0 { - t.Errorf("Expected error count for peer %s to be 0 after reset but got %d", p, count) + key := endpointKey(urls[0].URL.Scheme, urls[0].URL.Host, urls[0].SNI) + if got := et.counts[key]; got != 0 { + t.Errorf("after reset: count=%d, want 0", got) } } func TestErrorTracker_LogErrors_ThresholdCrossed(t *testing.T) { - et := newErrorTracker(&Network{}) - p := peer.ID("testpeer") + et := newErrorTracker() + urls := makeTestURLs(t, "host-a") + + if err := et.logErrors(urls, 11, 10); err != errThresholdCrossed { + t.Errorf("logging above threshold: got err %v, want errThresholdCrossed", err) + } + + key := endpointKey(urls[0].URL.Scheme, urls[0].URL.Host, urls[0].SNI) + if got := et.counts[key]; got != 11 { + t.Errorf("after trip: count=%d, want 11", got) + } +} - // Log errors until threshold is crossed - err := et.logErrors(p, 11, 10) - if err != errThresholdCrossed { - t.Errorf("Expected errorThresholdCrossed when logging errors above threshold but got %v", err) +// TestErrorTracker_SharesCounterAcrossPeers verifies that two peers +// resolving to the same HTTP endpoint share one error counter, so the +// breaker trips after one combined threshold rather than after each +// peer drains its own quota. +func TestErrorTracker_SharesCounterAcrossPeers(t *testing.T) { + et := newErrorTracker() + peerA := makeTestURLs(t, "shared-host") + peerB := makeTestURLs(t, "shared-host") + + // Peer A logs 60 errors; threshold 100 not yet crossed. + if err := et.logErrors(peerA, 60, 100); err != nil { + t.Errorf("peerA: got err %v, want nil", err) + } + // Peer B logs 50 errors; combined 110 trips the breaker. + if err := et.logErrors(peerB, 50, 100); err != errThresholdCrossed { + t.Errorf("peerB: got err %v, want errThresholdCrossed", err) } +} - // Check if the error count reflects the logged errors - count, ok := et.errors[p] - if !ok { - t.Errorf("Expected peer %s to be tracked but it was not", p) +func TestErrorTracker_DistinctHostsTrackIndependently(t *testing.T) { + et := newErrorTracker() + a := makeTestURLs(t, "host-a") + b := makeTestURLs(t, "host-b") + + if err := et.logErrors(a, 11, 10); err != errThresholdCrossed { + t.Errorf("host-a: got err %v, want errThresholdCrossed", err) } - if count != 11 { - t.Errorf("Expected error count for peer %s to be 10 after logging errors above threshold but got %d", p, count) + if err := et.logErrors(b, 5, 10); err != nil { + t.Errorf("host-b: got err %v, want nil (independent of host-a)", err) } } -// Write a test that tests concurrent access to the methods +// TestErrorTracker_ConcurrentAccess verifies safety under concurrent +// goroutines incrementing the same host counter. func TestErrorTracker_ConcurrentAccess(t *testing.T) { - et := newErrorTracker(&Network{}) - p := peer.ID("testpeer") + et := newErrorTracker() + urls := makeTestURLs(t, "host-a") var wg sync.WaitGroup - numRoutines := 10 - threshold := 100 + const numRoutines = 10 + const threshold = 100 for range numRoutines { wg.Go(func() { - for j := 0; j < threshold/numRoutines; j++ { - et.logErrors(p, 1, threshold) + for range threshold / numRoutines { + et.logErrors(urls, 1, threshold) } }) } - wg.Wait() - // Check if the error count is correct - count, ok := et.errors[p] - if !ok { - t.Errorf("Expected peer %s to be tracked but it was not", p) - } - expectedCount := threshold - actualCount := count - if actualCount != expectedCount { - t.Errorf("Expected error count for peer %s to be %d after concurrent logging but got %d", p, expectedCount, actualCount) + key := endpointKey(urls[0].URL.Scheme, urls[0].URL.Host, urls[0].SNI) + if got := et.counts[key]; got != threshold { + t.Errorf("after concurrent logging: count=%d, want %d", got, threshold) } } diff --git a/bitswap/network/httpnet/httpnet.go b/bitswap/network/httpnet/httpnet.go index d02da71e5..0fffcf5ce 100644 --- a/bitswap/network/httpnet/httpnet.go +++ b/bitswap/network/httpnet/httpnet.go @@ -235,6 +235,7 @@ type Network struct { requestTracker *requestTracker cooldownTracker *cooldownTracker inflight *inflightTracker + breaker *breaker ongoingConnsLock sync.RWMutex ongoingConns map[peer.ID]struct{} @@ -304,6 +305,7 @@ func New(host host.Host, opts ...Option) network.BitSwapNetwork { htnet.cooldownTracker = cooldownTracker htnet.inflight = newInflightTracker() + htnet.breaker = newBreaker() netdialer := &net.Dialer{ // Timeout for connects to complete. @@ -366,8 +368,7 @@ func New(host host.Host, opts ...Option) network.BitSwapNetwork { pinger := newPinger(htnet) htnet.pinger = pinger - et := newErrorTracker(htnet) - htnet.errorTracker = et + htnet.errorTracker = newErrorTracker() for i := 0; i < htnet.httpWorkers; i++ { go htnet.httpWorker(i) @@ -426,7 +427,11 @@ func (ht *Network) senderURLs(p peer.ID) []*senderURL { if len(urls) == 0 { return nil } - return ht.cooldownTracker.fillSenderURLs(urls) + surls := ht.cooldownTracker.fillSenderURLs(urls) + for _, su := range surls { + su.serverErrors = ht.breaker.counter(endpointKey(su.URL.Scheme, su.URL.Host, su.SNI)) + } + return surls } // IsHTTPPeer returns true if the peer is currently being pinged, which means @@ -535,9 +540,8 @@ func (ht *Network) Connect(ctx context.Context, pi peer.AddrInfo) error { } // Probe each URL to confirm the gateway speaks our protocol. URLs - // already proven working by another peer (delegated routing routinely - // returns multiple peer IDs for one gateway, e.g. - // /dns/a-fil-http.aur.lu/tcp/443/https) skip the probe entirely and + // already proven working by another peer (delegated routing returns + // multiple peer IDs for one gateway) skip the probe entirely and // inherit the cached HEAD-support decision. var workingAddrs []multiaddr.Multiaddr supportsHead := true @@ -669,11 +673,11 @@ func (ht *Network) DisconnectFrom(ctx context.Context, p peer.ID) error { log.Debugf("disconnecting from %s", p) ht.connEvtMgr.Disconnected(p) // notify everywhere that we are going offline + // stopPinging decrements per-host refcounts and, if this peer was + // the last user of a host, drops the host's breaker and + // errorTracker entries so a future reconnect starts fresh. The + // cooldownTracker has its own TTL cleaner and survives independently. ht.pinger.stopPinging(p) - ht.errorTracker.stopTracking(p) - - // coolDownTracker: we leave untouched. We want to keep - // ongoing cooldowns there in case we reconnect to this peer. return nil } @@ -760,20 +764,16 @@ func (ht *Network) httpWorker(i int) { if serr != nil { switch serr.Type { case typeRetryLater: - // This error signals that we - // should retry but if things - // keep failing we consider it - // a serverError. When - // multiple urls, retries may - // happen on a different url. + // Retry-After is the gateway's "I am + // busy, wait" signal, not a server + // failure. Drop the URL from this + // SendMsg's retry pool so the loop + // terminates, but do not charge the + // per-host breaker: throttling must + // not escalate to a disconnect. retryLaterErrors++ if retryLaterErrors%2 == 0 { - // we retried same CID 2 times. No luck. - // Increase server errors. - // Start ignoring urls. - result.err.Type = typeServer urlIgnore = append(urlIgnore, u) - u.serverErrors.Add(1) } continue // retry request again case typeClient: diff --git a/bitswap/network/httpnet/httpnet_test.go b/bitswap/network/httpnet/httpnet_test.go index 80a9650e5..85b1d8b54 100644 --- a/bitswap/network/httpnet/httpnet_test.go +++ b/bitswap/network/httpnet/httpnet_test.go @@ -8,6 +8,7 @@ import ( "net/http/httptest" "net/url" "strings" + "sync/atomic" "testing" "time" @@ -341,27 +342,12 @@ func TestBestURL(t *testing.T) { } // add some bogus urls to test the sorting now := time.Now() - surls := []*senderURL{ - { - ParsedURL: network.ParsedURL{ - URL: urls[0], - }, - }, - { - ParsedURL: network.ParsedURL{ - URL: urls[1], - }, - }, - { - ParsedURL: network.ParsedURL{ - URL: urls[2], - }, - }, - { - ParsedURL: network.ParsedURL{ - URL: urls[3], - }, - }, + surls := make([]*senderURL, len(urls)) + for i := range urls { + surls[i] = &senderURL{ + ParsedURL: network.ParsedURL{URL: urls[i]}, + serverErrors: new(atomic.Int64), + } } surls[0].cooldown.Store(now.Add(time.Second)) diff --git a/bitswap/network/httpnet/inflight.go b/bitswap/network/httpnet/inflight.go index 7caa62d9f..827c2d0e9 100644 --- a/bitswap/network/httpnet/inflight.go +++ b/bitswap/network/httpnet/inflight.go @@ -9,13 +9,12 @@ import ( // multiple peer IDs sharing one HTTP endpoint produce one round trip, // not one per peer ID. // -// Delegated routing returns multiple peer IDs for one HTTP gateway. For -// example, /dns/a-fil-http.aur.lu/tcp/443/https is currently advertised -// under three peer IDs. Bitswap creates one MessageQueue per peer ID, so -// a single want-have broadcast for one CID becomes three identical HEAD -// requests against the same upstream. Want-have probes that bitswap -// sends to non-best peers fan out the same way. The pattern compounds -// as HTTP retrieval grows. +// Delegated routing routinely returns multiple peer IDs for one HTTP +// gateway. Bitswap creates one MessageQueue per peer ID, so a single +// want-have broadcast for one CID becomes N identical HEAD requests +// against the same upstream. Want-have probes that bitswap sends to +// non-best peers fan out the same way. The pattern compounds as HTTP +// retrieval grows. // // Coalescing applies only while a request is in flight. Once the leader // returns, subsequent callers issue their own requests; we do not cache diff --git a/bitswap/network/httpnet/msg_sender.go b/bitswap/network/httpnet/msg_sender.go index a84acd4d6..c7d711d97 100644 --- a/bitswap/network/httpnet/msg_sender.go +++ b/bitswap/network/httpnet/msg_sender.go @@ -60,10 +60,16 @@ func setSenderOpts(opts *network.MessageSenderOpts) network.MessageSenderOpts { } // senderURL wraps url with information about cooldowns and errors. +// +// serverErrors points at a per-host counter shared across every +// senderURL that resolves to the same (scheme, host, sni). When one +// peer trips the breaker on an endpoint, every other peer sharing it +// sees the elevated count immediately. The pointer is supplied by +// breaker.counter and must be non-nil before the senderURL is used. type senderURL struct { network.ParsedURL cooldown atomic.Value - serverErrors atomic.Int64 + serverErrors *atomic.Int64 } // httpMsgSender implements a network.MessageSender. @@ -194,7 +200,11 @@ func (sender *httpMsgSender) tryURL(ctx context.Context, u *senderURL, entry bsm panic("unknown bitswap entry type") } - if dl := u.cooldown.Load().(time.Time); !dl.IsZero() { + // Skip the request while the host's cooldown is still active. We + // auto-expire the deadline here so a long-lived senderURL doesn't + // stay stuck on a stale cooldown (fillSenderURLs only checks the + // deadline at construction time). + if dl := u.cooldown.Load().(time.Time); !dl.IsZero() && time.Now().Before(dl) { err := fmt.Errorf("cooldown (%s): %s %q ", dl, method, u.URL) log.Debug(err) return nil, &senderError{Type: typeRetryLater, Err: err} @@ -386,13 +396,17 @@ func (sender *httpMsgSender) handleResponse(u *senderURL, entry bsmsg.Entry, met } // clearCooldown removes any active cooldown on u after a definitive -// response. Both the per-URL cooldown and the host-level cooldownTracker -// are cleared so that future senders for the same host start fresh. +// response and resets the host's serverErrors counter. A 200 or 404 +// proves the host is healthy, so we forgive prior breaker accruals +// and let every peer sharing the host start fresh. func (sender *httpMsgSender) clearCooldown(u *senderURL) { if !u.cooldown.Load().(time.Time).IsZero() { sender.ht.cooldownTracker.remove(u.URL.Host) u.cooldown.Store(time.Time{}) } + if u.serverErrors.Load() > 0 { + u.serverErrors.Store(0) + } } // applyBackoff sets a cooldown on u. If retryAfter parses as a date or @@ -576,7 +590,7 @@ WANTLIST_LOOP: } // if totalClientErrors == 0, count is reset. - if err := sender.ht.errorTracker.logErrors(sender.peer, totalClientErrors, sender.ht.maxDontHaveErrors); err != nil { + if err := sender.ht.errorTracker.logErrors(sender.urls, totalClientErrors, sender.ht.maxDontHaveErrors); err != nil { log.Debugf("too many client errors. Disconnecting from %s", sender.peer) sender.ht.DisconnectFrom(ctx, sender.peer) } diff --git a/bitswap/network/httpnet/pinger.go b/bitswap/network/httpnet/pinger.go index 79429f68b..1806374a8 100644 --- a/bitswap/network/httpnet/pinger.go +++ b/bitswap/network/httpnet/pinger.go @@ -24,10 +24,10 @@ const ewmaSmoothing = 0.1 // // State is keyed by HTTP endpoint, not by peer ID. Multiple peer IDs that // resolve to the same endpoint share one ticker and one latency reading. -// /dns/a-fil-http.aur.lu/tcp/443/https is currently advertised by three -// peer IDs in delegated-ipfs.dev responses; with per-peer pinging, three -// peer IDs idle on that gateway produce 36 HEAD requests per minute. The -// gateway is a single physical resource, so we ping it once. +// With per-peer pinging, N peer IDs idle on one gateway would produce +// N times the probe traffic (HEAD /ipfs/ every 5 s) for no new +// information. The gateway is a single physical resource, so we ping +// it once. type pinger struct { ht *Network @@ -226,7 +226,8 @@ func (pngr *pinger) startPinging(p peer.ID) { } // stopPinging unregisters p. Hosts whose refcount drops to zero have -// their ticker cancelled and their state removed. +// their ticker cancelled and their host-shared state (pinger, breaker, +// errorTracker) removed so a future reconnect starts fresh. func (pngr *pinger) stopPinging(p peer.ID) { pngr.mu.Lock() defer pngr.mu.Unlock() @@ -249,6 +250,8 @@ func (pngr *pinger) stopPinging(p peer.ID) { } hp.cancel() delete(pngr.hosts, key) + pngr.ht.breaker.reset(key) + pngr.ht.errorTracker.reset(key) } } @@ -275,6 +278,11 @@ func (pngr *pinger) tickerLoop(ctx context.Context, hp *hostPing) { } func (pngr *pinger) tickOnce(ctx context.Context, hp *hostPing) { + // Respect Retry-After: while the host is in cooldown, the gateway + // has explicitly told us to wait. Skip the probe entirely. + if pngr.ht.cooldownTracker.onCooldown(hp.url.URL.Host) { + return + } start := time.Now() if _, err := pngr.ht.connectToURL(ctx, hp.peerID, hp.url, hp.method); err != nil { log.Debug(err) diff --git a/bitswap/network/httpnet/pinger_test.go b/bitswap/network/httpnet/pinger_test.go index 2870c66b6..af8a4a288 100644 --- a/bitswap/network/httpnet/pinger_test.go +++ b/bitswap/network/httpnet/pinger_test.go @@ -100,6 +100,52 @@ func TestPingerStopReleasesRefcount(t *testing.T) { } } +// TestPingerCleansSharedStateOnLastDisconnect verifies that when the +// last peer using a host disconnects, the host-shared breaker and +// errorTracker entries are dropped so a future reconnect starts fresh. +func TestPingerCleansSharedStateOnLastDisconnect(t *testing.T) { + ctx := context.Background() + + htnet, mn := mockNetwork(t, mockReceiver(t)) + peerA, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + + srv := makeServer(t, 0, 0) + mustConnectToPeer(t, ctx, htnet, peerA, srv) + + // Seed breaker and errorTracker state for the host. + urls := htnet.senderURLs(peerA.ID()) + if len(urls) != 1 { + t.Fatalf("expected 1 senderURL, got %d", len(urls)) + } + urls[0].serverErrors.Store(7) + if err := htnet.errorTracker.logErrors(urls, 5, 100); err != nil { + t.Fatalf("errorTracker.logErrors: %v", err) + } + + if err := htnet.DisconnectFrom(ctx, peerA.ID()); err != nil { + t.Fatal(err) + } + + // Last peer is gone; both shared structures should have dropped + // their entries. + htnet.breaker.mu.Lock() + breakerEntries := len(htnet.breaker.counters) + htnet.breaker.mu.Unlock() + if breakerEntries != 0 { + t.Errorf("breaker still has %d entries after last disconnect", breakerEntries) + } + + htnet.errorTracker.mu.Lock() + errorEntries := len(htnet.errorTracker.counts) + htnet.errorTracker.mu.Unlock() + if errorEntries != 0 { + t.Errorf("errorTracker still has %d entries after last disconnect", errorEntries) + } +} + // TestPingerDistinctHostsRunIndependently verifies that two peers with // different HTTP endpoints each get their own host entry. func TestPingerDistinctHostsRunIndependently(t *testing.T) { From c4f32056268471a3351509bbbe03bbaa2fcae468 Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Thu, 30 Apr 2026 23:44:02 +0200 Subject: [PATCH 5/6] feat(httpnet): respect Retry-After in on-demand Ping the periodic ticker already skips probes while a host is on cooldown. the on-demand Network.Ping path (called once per messagequeue start by donthavetimeoutmgr to bootstrap its dont-have timeout) still issued probes during the wait window. apply the same rule everywhere: if the host has an active cooldown, skip the probe and report errcooldownactive. donthavetimeoutmgr already treats any ping error by falling back to its default timeout, which real message latency refines as soon as traffic flows; no metric or behaviour regresses. --- bitswap/network/httpnet/pinger.go | 13 ++++++++ bitswap/network/httpnet/pinger_test.go | 42 ++++++++++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/bitswap/network/httpnet/pinger.go b/bitswap/network/httpnet/pinger.go index 1806374a8..642272193 100644 --- a/bitswap/network/httpnet/pinger.go +++ b/bitswap/network/httpnet/pinger.go @@ -20,6 +20,10 @@ const pingInterval = 5 * time.Second // Matches the value used by the libp2p peerstore's LatencyEWMA. const ewmaSmoothing = 0.1 +// errCooldownActive is reported by ping() when a URL is skipped because +// the host is in its Retry-After window. +var errCooldownActive = errors.New("host is on cooldown, skipping probe") + // pinger probes HTTP endpoints periodically and tracks their latency. // // State is keyed by HTTP endpoint, not by peer ID. Multiple peer IDs that @@ -87,6 +91,15 @@ func (pngr *pinger) ping(ctx context.Context, p peer.ID) ping.Result { results := make(chan ping.Result, len(urls)) for _, u := range urls { go func(u network.ParsedURL) { + // Respect Retry-After: skip probes against hosts that + // are currently on cooldown. Caller (dontHaveTimeoutMgr) + // handles the resulting error by falling back to its + // default timeout, which message latency refines once + // real traffic flows. + if pngr.ht.cooldownTracker.onCooldown(u.URL.Host) { + results <- ping.Result{Error: errCooldownActive} + return + } start := time.Now() _, err := pngr.ht.connectToURL(ctx, p, u, method) if err != nil { diff --git a/bitswap/network/httpnet/pinger_test.go b/bitswap/network/httpnet/pinger_test.go index af8a4a288..e58eb7d95 100644 --- a/bitswap/network/httpnet/pinger_test.go +++ b/bitswap/network/httpnet/pinger_test.go @@ -2,7 +2,11 @@ package httpnet import ( "context" + "errors" "testing" + "time" + + "github.com/ipfs/boxo/bitswap/network" ) // TestPingerSharesHostsAcrossPeers verifies that two peer IDs which @@ -146,6 +150,44 @@ func TestPingerCleansSharedStateOnLastDisconnect(t *testing.T) { } } +// TestPingerSkipsProbeOnCooldown verifies that ping() honours +// Retry-After: when the cooldownTracker shows the host is in its +// wait window, no HTTP probe is issued and the per-URL goroutine +// returns errCooldownActive. +func TestPingerSkipsProbeOnCooldown(t *testing.T) { + ctx := context.Background() + + htnet, mn := mockNetwork(t, mockReceiver(t)) + peerA, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + + srv, probes := countingProbeServer(t) + mustConnectToPeer(t, ctx, htnet, peerA, srv) + + // Connect probed once; baseline. + if got := probes.Load(); got != 1 { + t.Fatalf("after Connect: got %d probes, want 1", got) + } + + // Put the host on cooldown directly. ping() should skip the probe. + host := htnet.host.Peerstore().Addrs(peerA.ID())[0] + purl, err := network.ExtractHTTPAddress(host) + if err != nil { + t.Fatal(err) + } + htnet.cooldownTracker.setByDuration(purl.URL.Host, 30*time.Second) + + res := htnet.pinger.ping(ctx, peerA.ID()) + if res.Error == nil || !errors.Is(res.Error, errCooldownActive) { + t.Fatalf("ping during cooldown: got err %v, want errCooldownActive", res.Error) + } + if got := probes.Load(); got != 1 { + t.Errorf("after Ping during cooldown: got %d probes, want 1 (no new probe)", got) + } +} + // TestPingerDistinctHostsRunIndependently verifies that two peers with // different HTTP endpoints each get their own host entry. func TestPingerDistinctHostsRunIndependently(t *testing.T) { From eac3972ea00b42df251ce5ae6dc540481d6becf0 Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Thu, 30 Apr 2026 23:51:57 +0200 Subject: [PATCH 6/6] test(httpnet): cover per-host breaker, cooldown, and throttle semantics add unit tests and integration tests pinning down the behaviours that the host-dedup branch's correctness rests on: - breaker_test.go: counter aliasing for the same key, independence across keys, reset drops the entry and a fresh allocation is handed out next, concurrent counter/reset is data-race clean - cooldown_test.go: ondemand reports active-vs-expired-vs-missing correctly, setbyduration clamps at maxbackoff - testbackoff: assert the per-host serverErrors counter stayed at 0 after a retry-after cycle. throttling must not feed the breaker; without this assertion a regression that re-introduces the worker's retrylater 2nd-hit increment would still pass on dont-have counts. - testsharedbreakerdisconnectsacrosspeers: peer a's typeserver increment via 500 must trip peer b's bestUrl on its next sendmsg. with two peers connected to the same host, both end up disconnected from one round of failures rather than one round per peer. - testtryurl_cooldownautoexpiry: a senderurl carrying a stale (already elapsed) cooldown deadline must let trydir-thru. fillsenderurls only checks the deadline at construction; tryurl needs the time.now().before(dl) guard to avoid sticking forever. --- bitswap/network/httpnet/breaker_test.go | 74 ++++++++++++++++ bitswap/network/httpnet/cooldown_test.go | 46 ++++++++++ bitswap/network/httpnet/httpnet_test.go | 105 +++++++++++++++++++++++ 3 files changed, 225 insertions(+) create mode 100644 bitswap/network/httpnet/breaker_test.go create mode 100644 bitswap/network/httpnet/cooldown_test.go diff --git a/bitswap/network/httpnet/breaker_test.go b/bitswap/network/httpnet/breaker_test.go new file mode 100644 index 000000000..f14c32ca2 --- /dev/null +++ b/bitswap/network/httpnet/breaker_test.go @@ -0,0 +1,74 @@ +package httpnet + +import ( + "sync" + "testing" +) + +func TestBreaker_CounterIsSharedPerKey(t *testing.T) { + b := newBreaker() + a1 := b.counter("k") + a2 := b.counter("k") + if a1 != a2 { + t.Fatalf("counter(k) returned different pointers across calls (%p vs %p)", a1, a2) + } + a1.Add(3) + if got := a2.Load(); got != 3 { + t.Fatalf("a2 sees %d, want 3 (counters must alias)", got) + } +} + +func TestBreaker_DistinctKeysAreIndependent(t *testing.T) { + b := newBreaker() + x := b.counter("x") + y := b.counter("y") + x.Add(7) + if got := y.Load(); got != 0 { + t.Fatalf("y leaked from x: y=%d, want 0", got) + } +} + +func TestBreaker_ResetDropsCounter(t *testing.T) { + b := newBreaker() + c1 := b.counter("k") + c1.Add(5) + b.reset("k") + + // After reset, a fresh counter is allocated; it must not carry the + // old value, and it must be a different pointer than the old one. + c2 := b.counter("k") + if c2 == c1 { + t.Fatalf("reset returned the same pointer; want a fresh counter") + } + if got := c2.Load(); got != 0 { + t.Fatalf("fresh counter has value %d, want 0", got) + } + // The old pointer is still usable but orphaned: incrementing it + // must not affect the new counter. + c1.Add(1) + if got := c2.Load(); got != 0 { + t.Fatalf("new counter affected by orphaned pointer: %d, want 0", got) + } +} + +func TestBreaker_ConcurrentCounterAndReset(t *testing.T) { + // Smoke test: counter() and reset() running concurrently must not + // deadlock or race-detect. + b := newBreaker() + + var wg sync.WaitGroup + const N = 20 + for range N { + wg.Go(func() { + for range 100 { + b.counter("hot").Add(1) + } + }) + wg.Go(func() { + for range 100 { + b.reset("hot") + } + }) + } + wg.Wait() +} diff --git a/bitswap/network/httpnet/cooldown_test.go b/bitswap/network/httpnet/cooldown_test.go new file mode 100644 index 000000000..d37d4a9c1 --- /dev/null +++ b/bitswap/network/httpnet/cooldown_test.go @@ -0,0 +1,46 @@ +package httpnet + +import ( + "testing" + "time" +) + +func TestCooldownTracker_OnCooldown(t *testing.T) { + ct := newCooldownTracker(time.Minute) + defer ct.stopCleaner() + + if ct.onCooldown("missing") { + t.Errorf("unset host reported on cooldown") + } + + ct.setByDuration("active", 10*time.Second) + if !ct.onCooldown("active") { + t.Errorf("active deadline not reported on cooldown") + } + + // A deadline in the past must not be reported as on cooldown. + ct.setByDate("expired", time.Now().Add(-1*time.Second)) + if ct.onCooldown("expired") { + t.Errorf("expired deadline reported on cooldown") + } + + ct.remove("active") + if ct.onCooldown("active") { + t.Errorf("removed host still reported on cooldown") + } +} + +func TestCooldownTracker_MaxBackoffCap(t *testing.T) { + ct := newCooldownTracker(2 * time.Second) + defer ct.stopCleaner() + + // Asking for 1 hour must be clamped to maxBackoff. + ct.setByDuration("clamp", time.Hour) + + ct.urlsLock.RLock() + dl := ct.urls["clamp"] + ct.urlsLock.RUnlock() + if got := time.Until(dl); got > 3*time.Second { + t.Errorf("setByDuration not clamped: %s remaining, want <=2s+slack", got) + } +} diff --git a/bitswap/network/httpnet/httpnet_test.go b/bitswap/network/httpnet/httpnet_test.go index 85b1d8b54..d3d0e912d 100644 --- a/bitswap/network/httpnet/httpnet_test.go +++ b/bitswap/network/httpnet/httpnet_test.go @@ -615,6 +615,22 @@ func TestBackOff(t *testing.T) { if len(recv.donthaves) != 2 || (len(recv.blocks)+len(recv.haves)) > 0 { t.Error("no blocks should have been received while on backoff") } + + // Retry-After is "I am busy", not "I am broken". The shared per-host + // breaker must not have been incremented; otherwise peer2 would have + // inherited a non-zero count and bestURL would have tripped the + // fatal disconnect path instead of returning DONT_HAVE. + urls := network.ExtractURLsFromPeer(htnet.host.Peerstore().PeerInfo(peer.ID())) + if len(urls) == 0 { + t.Fatal("expected at least one URL on peer") + } + hostKey := endpointKey(urls[0].URL.Scheme, urls[0].URL.Host, urls[0].SNI) + htnet.breaker.mu.Lock() + c := htnet.breaker.counters[hostKey] + htnet.breaker.mu.Unlock() + if c != nil && c.Load() != 0 { + t.Errorf("breaker counter for %s = %d after Retry-After cycle, want 0", hostKey, c.Load()) + } } // Write a TestErrorTracking function which tests that a peer is disconnected when the treshold is crossed. @@ -662,3 +678,92 @@ func TestErrorTracking(t *testing.T) { t.Fatal(err) } } + +// TestSharedBreakerDisconnectsAcrossPeers verifies that when one peer +// triggers a real server error against a host shared with other peers, +// the per-host breaker is high enough that the next SendMsg from any +// peer using the same host disconnects too. Without sharing, each peer +// would burn its own MaxRetries quota before disconnecting. +func TestSharedBreakerDisconnectsAcrossPeers(t *testing.T) { + ctx := context.Background() + recv := mockReceiver(t) + htnet, mn := mockNetwork(t, recv) + + peerA, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + peerB, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + + srv := makeServer(t, 0, 0) + mustConnectToPeer(t, ctx, htnet, peerA, srv) + if err := recv.waitConnected(1); err != nil { + t.Fatal(err) + } + mustConnectToPeer(t, ctx, htnet, peerB, srv) + if err := recv.waitConnected(1); err != nil { + t.Fatal(err) + } + + // Drive a real server error from peerA against the shared host. The + // server returns 500 with empty body which lands in the default + // (typeServer) arm of handleResponse and increments the breaker. + msg := makeWantsMessage([]cid.Cid{errorCid}) + if err := htnet.SendMessage(ctx, peerA.ID(), msg); err != nil { + t.Fatal(err) + } + if err := recv.waitDisconnected(2); err != nil { + t.Fatalf("peerA should disconnect after typeServer trips MaxRetries: %v", err) + } + + // Counter is shared. peerB's senderURL inherits the same pointer at + // MessageSender construction time; bestURL trips fatal before any + // HTTP request even goes out. + wl := makeCids(t, 0, 1) + if err := htnet.SendMessage(ctx, peerB.ID(), makeWantsMessage(wl)); err != nil { + t.Fatal(err) + } + if err := recv.waitDisconnected(2); err != nil { + t.Fatalf("peerB should disconnect via the per-host shared breaker: %v", err) + } +} + +// TestTryURL_CooldownAutoExpiry verifies that a senderURL with a stale +// (already-elapsed) cooldown deadline does not block tryURL forever. +// fillSenderURLs only checks the deadline at construction time, so a +// long-lived senderURL needs tryURL itself to ignore expired deadlines. +func TestTryURL_CooldownAutoExpiry(t *testing.T) { + ctx := context.Background() + recv := mockReceiver(t) + htnet, mn := mockNetwork(t, recv) + + p, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + srv := makeServer(t, 0, 1) + mustConnectToPeer(t, ctx, htnet, p, srv) + + nms, err := htnet.NewMessageSender(ctx, p.ID(), nil) + if err != nil { + t.Fatal(err) + } + ms := nms.(*httpMsgSender) + + // Stamp an already-elapsed cooldown on the senderURL. + ms.urls[0].cooldown.Store(time.Now().Add(-1 * time.Hour)) + + wantBlocks := makeCids(t, 0, 1) + msg := makeWantsMessage(wantBlocks) + if err := nms.SendMsg(ctx, msg); err != nil { + t.Fatal(err) + } + recv.wait(1) + if len(recv.blocks) != 1 { + t.Errorf("expected the block through despite stale cooldown; got blocks=%d donthaves=%d", + len(recv.blocks), len(recv.donthaves)) + } +}