Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The following emojis are used to highlight certain changes:

- 🛠 `pinning/pinner`: added `Pinner.Close() error`. Close cancels every in-flight operation's context, including streaming goroutines from `RecursiveKeys`, `DirectKeys`, and `InternalPins`, and waits for them to return. A scalar method that observes the cancellation may return `context.Canceled`; a stream interrupted by Close may surface `ErrClosed` on the channel before it closes. After Close returns, every other method returns the new `ErrClosed` sentinel; streaming methods deliver it as `StreamedPin.Err` on a single entry, then close the channel. Close is idempotent and goroutine-safe. **Action required:** downstream `Pinner` implementations must add `Close`. [#1150](https://github.com/ipfs/boxo/pull/1150)
- `pinning/pinner/dspinner`: implements `Close`. Close cancels the contexts of in-flight operations, so snapshot iteration in `RecursiveKeys`/`DirectKeys` and DAG fetches in `Pin` bail out promptly instead of draining to completion. Close returns as soon as those operations honor their ctx. Hosts owning the datastore should call `Close` on the pinner before closing the datastore to avoid the use-after-close panic path in stores such as pebble. [#1150](https://github.com/ipfs/boxo/pull/1150)
- `routing/http/types/iter`: added `Limit`, an iterator that caps another iterator at a fixed number of values.

### Changed
- upgrade to `go-libp2p-kad-dht` [v0.39.2](https://github.com/libp2p/go-libp2p-kad-dht/releases/tag/v0.39.2)
Expand All @@ -40,11 +41,14 @@ The following emojis are used to highlight certain changes:

See [ipfs/kubo#11254](https://github.com/ipfs/kubo/pull/11254) for a worked example of the call-site update. [#1128](https://github.com/ipfs/boxo/pull/1128)

- ✨ `routing/http/server`: the Delegated Routing server now calls `DelegatedRouter.FindProviders`/`FindPeers` with a limit of `0` (unbounded) and applies the configured records limit itself, after filtering. This is what lets a filtered request still return a full page of results. The server reads the result iterator lazily and closes it once it has enough records, so delegates should return results lazily and stop work when the iterator is closed. A delegate that used the limit to end its walk early will now end it on `Close` instead.

### Removed

### Fixed

- `files`: now builds under `GOOS=js GOARCH=wasm` and `GOOS=wasip1 GOARCH=wasm`. [#935](https://github.com/ipfs/boxo/pull/935)
- `routing/http/server`: filtered `/routing/v1/providers` and `/routing/v1/peers` requests no longer return fewer records than the configured limit. Before, the limit was applied before `filter-addrs`/`filter-protocols` ran, so records dropped by the filters shrank the response. The limit is now applied after filtering.

### Security

Expand Down
16 changes: 6 additions & 10 deletions routing/http/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,11 +403,9 @@ func TestClient_FindProviders(t *testing.T) {
cid := makeCID()

routerResultIter := iter.FromSlice(c.routerResult)
if c.expStreamingResponse {
router.On("FindProviders", mock.Anything, cid, 0).Return(routerResultIter, c.routerErr)
} else {
router.On("FindProviders", mock.Anything, cid, 20).Return(routerResultIter, c.routerErr)
}
// The server always passes 0 (unbounded) to the delegate; it
// enforces records limits itself, after filtering.
router.On("FindProviders", mock.Anything, cid, 0).Return(routerResultIter, c.routerErr)

resultIter, err := client.FindProviders(ctx, cid)
c.expErrContains.errContains(t, err)
Expand Down Expand Up @@ -701,11 +699,9 @@ func TestClient_FindPeers(t *testing.T) {
}

routerResultIter := iter.FromSlice(c.routerResult)
if c.expStreamingResponse {
router.On("FindPeers", mock.Anything, pid, 0).Return(routerResultIter, c.routerErr)
} else {
router.On("FindPeers", mock.Anything, pid, 20).Return(routerResultIter, c.routerErr)
}
// The server always passes 0 (unbounded) to the delegate; it
// enforces records limits itself, after filtering.
router.On("FindPeers", mock.Anything, pid, 0).Return(routerResultIter, c.routerErr)

resultIter, err := client.FindPeers(ctx, pid)
c.expErrContains.errContains(t, err)
Expand Down
65 changes: 43 additions & 22 deletions routing/http/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ type FindProvidersAsyncResponse struct {
type DelegatedRouter interface {
// FindProviders searches for peers who are able to provide the given [cid.Cid].
// Limit indicates the maximum amount of results to return; 0 means unbounded.
//
// The HTTP server in this package always calls FindProviders with a
// limit of 0 and caps the response itself, after filtering. It consumes
// the iterator lazily and Closes it once enough records are collected,
// so implementations should return results lazily and stop work on Close.
FindProviders(ctx context.Context, cid cid.Cid, limit int) (iter.ResultIter[types.Record], error)

// Deprecated: historic API from [IPIP-526], may be removed in a future version.
Expand All @@ -78,6 +83,9 @@ type DelegatedRouter interface {

// FindPeers searches for peers who have the provided [peer.ID].
// Limit indicates the maximum amount of results to return; 0 means unbounded.
//
// As with FindProviders, the HTTP server always calls FindPeers with a
// limit of 0 and caps the response itself, after filtering.
FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error)

// GetIPNS searches for an [ipns.Record] for the given [ipns.Name].
Expand Down Expand Up @@ -124,18 +132,20 @@ func WithStreamingResultsDisabled() Option {
}
}

// WithRecordsLimit sets a limit that will be passed to [ContentRouter.FindProviders]
// and [ContentRouter.FindPeers] for non-streaming requests (application/json).
// Default is [DefaultRecordsLimit].
// WithRecordsLimit caps the number of records returned for non-streaming
// requests (application/json). The server applies the cap after filtering,
// so filtered-out records do not shrink the response. The delegate
// [DelegatedRouter] is always called with a limit of 0 (unbounded).
// A limit of 0 disables the cap. Default is [DefaultRecordsLimit].
func WithRecordsLimit(limit int) Option {
return func(s *server) {
s.recordsLimit = limit
}
}

// WithStreamingRecordsLimit sets a limit that will be passed to [ContentRouter.FindProviders]
// and [ContentRouter.FindPeers] for streaming requests (application/x-ndjson).
// Default is [DefaultStreamingRecordsLimit].
// WithStreamingRecordsLimit caps the number of records returned for
// streaming requests (application/x-ndjson). See [WithRecordsLimit] for
// how the cap is applied. Default is [DefaultStreamingRecordsLimit].
func WithStreamingRecordsLimit(limit int) Option {
return func(s *server) {
s.streamingRecordsLimit = limit
Expand Down Expand Up @@ -272,7 +282,7 @@ func (s *server) findProviders(w http.ResponseWriter, httpReq *http.Request) {
}

var (
handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[types.Record], filterAddrs, filterProtocols []string)
handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[types.Record], recordsLimit int, filterAddrs, filterProtocols []string)
recordsLimit int
)

Expand All @@ -287,7 +297,13 @@ func (s *server) findProviders(w http.ResponseWriter, httpReq *http.Request) {
ctx, cancel := context.WithTimeout(httpReq.Context(), s.routingTimeout)
defer cancel()

provIter, err := s.svc.FindProviders(ctx, cid, recordsLimit)
// Pass 0 (unbounded) to the delegate and enforce recordsLimit here,
// after filtering. Passing recordsLimit would let the delegate stop
// early, before filters run, so records dropped by filters would
// shrink the response below recordsLimit. The delegate returns
// results lazily; the limiting iterator closes it once the cap is
// reached.
provIter, err := s.svc.FindProviders(ctx, cid, 0)
if err != nil {
if errors.Is(err, routing.ErrNotFound) {
// handlerFunc takes care of setting the 404 and necessary headers
Expand All @@ -298,14 +314,15 @@ func (s *server) findProviders(w http.ResponseWriter, httpReq *http.Request) {
}
}

handlerFunc(w, provIter, filterAddrs, filterProtocols)
handlerFunc(w, provIter, recordsLimit, filterAddrs, filterProtocols)
}

func (s *server) findProvidersJSON(w http.ResponseWriter, provIter iter.ResultIter[types.Record], filterAddrs, filterProtocols []string) {
func (s *server) findProvidersJSON(w http.ResponseWriter, provIter iter.ResultIter[types.Record], recordsLimit int, filterAddrs, filterProtocols []string) {
defer provIter.Close()

filteredIter := filters.ApplyFiltersToIter(provIter, filterAddrs, filterProtocols)
providers, err := iter.ReadAllResults(filteredIter)
var limitedIter iter.ResultIter[types.Record] = iter.Limit(filteredIter, recordsLimit)
providers, err := iter.ReadAllResults(limitedIter)
Comment on lines 321 to +325
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

provIter could be closed unconditionally right after iter.ReadAllResults(), which would cancel the request on the router sooner. There is no need to wait for the response to be sent.

Suggested change
defer provIter.Close()
filteredIter := filters.ApplyFiltersToIter(provIter, filterAddrs, filterProtocols)
providers, err := iter.ReadAllResults(filteredIter)
var limitedIter iter.ResultIter[types.Record] = iter.Limit(filteredIter, recordsLimit)
providers, err := iter.ReadAllResults(limitedIter)
filteredIter := filters.ApplyFiltersToIter(provIter, filterAddrs, filterProtocols)
var limitedIter iter.ResultIter[types.Record] = iter.Limit(filteredIter, recordsLimit)
providers, err := iter.ReadAllResults(limitedIter)
provIter.Close()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or, even better, close limitedIter instead?

if err != nil {
writeErr(w, "FindProviders", http.StatusInternalServerError, fmt.Errorf("delegate error: %w", err))
return
Expand All @@ -321,10 +338,10 @@ func (s *server) findProvidersJSON(w http.ResponseWriter, provIter iter.ResultIt
})
}

func (s *server) findProvidersNDJSON(w http.ResponseWriter, provIter iter.ResultIter[types.Record], filterAddrs, filterProtocols []string) {
func (s *server) findProvidersNDJSON(w http.ResponseWriter, provIter iter.ResultIter[types.Record], recordsLimit int, filterAddrs, filterProtocols []string) {
filteredIter := filters.ApplyFiltersToIter(provIter, filterAddrs, filterProtocols)

writeResultsIterNDJSON(w, filteredIter)
var limitedIter iter.ResultIter[types.Record] = iter.Limit(filteredIter, recordsLimit)
writeResultsIterNDJSON(w, limitedIter)
}

func (s *server) findPeers(w http.ResponseWriter, r *http.Request) {
Expand All @@ -346,7 +363,7 @@ func (s *server) findPeers(w http.ResponseWriter, r *http.Request) {
}

var (
handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[*types.PeerRecord], filterAddrs, filterProtocols []string)
handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[*types.PeerRecord], recordsLimit int, filterAddrs, filterProtocols []string)
recordsLimit int
)

Expand All @@ -362,7 +379,9 @@ func (s *server) findPeers(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), s.routingTimeout)
defer cancel()

provIter, err := s.svc.FindPeers(ctx, pid, recordsLimit)
// Pass 0 (unbounded) to the delegate and enforce recordsLimit here,
// after filtering. See findProviders for the rationale.
provIter, err := s.svc.FindPeers(ctx, pid, 0)
if err != nil {
if errors.Is(err, routing.ErrNotFound) {
// handlerFunc takes care of setting the 404 and necessary headers
Expand All @@ -373,7 +392,7 @@ func (s *server) findPeers(w http.ResponseWriter, r *http.Request) {
}
}

handlerFunc(w, provIter, filterAddrs, filterProtocols)
handlerFunc(w, provIter, recordsLimit, filterAddrs, filterProtocols)
}

func (s *server) provide(w http.ResponseWriter, httpReq *http.Request) {
Expand Down Expand Up @@ -438,12 +457,13 @@ func (s *server) provide(w http.ResponseWriter, httpReq *http.Request) {
writeJSONResult(w, "Provide", resp)
}

func (s *server) findPeersJSON(w http.ResponseWriter, peersIter iter.ResultIter[*types.PeerRecord], filterAddrs, filterProtocols []string) {
func (s *server) findPeersJSON(w http.ResponseWriter, peersIter iter.ResultIter[*types.PeerRecord], recordsLimit int, filterAddrs, filterProtocols []string) {
defer peersIter.Close()

peersIter = filters.ApplyFiltersToPeerRecordIter(peersIter, filterAddrs, filterProtocols)
filteredIter := filters.ApplyFiltersToPeerRecordIter(peersIter, filterAddrs, filterProtocols)
var limitedIter iter.ResultIter[*types.PeerRecord] = iter.Limit(filteredIter, recordsLimit)

peers, err := iter.ReadAllResults(peersIter)
peers, err := iter.ReadAllResults(limitedIter)
Comment on lines 461 to +466
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same reasoning as above: there is no need to keep the query running longer than required.

Suggested change
defer peersIter.Close()
peersIter = filters.ApplyFiltersToPeerRecordIter(peersIter, filterAddrs, filterProtocols)
filteredIter := filters.ApplyFiltersToPeerRecordIter(peersIter, filterAddrs, filterProtocols)
var limitedIter iter.ResultIter[*types.PeerRecord] = iter.Limit(filteredIter, recordsLimit)
peers, err := iter.ReadAllResults(peersIter)
peers, err := iter.ReadAllResults(limitedIter)
filteredIter := filters.ApplyFiltersToPeerRecordIter(peersIter, filterAddrs, filterProtocols)
var limitedIter iter.ResultIter[*types.PeerRecord] = iter.Limit(filteredIter, recordsLimit)
peers, err := iter.ReadAllResults(limitedIter)
peersIter.Close()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or, even better, close limitedIter instead?

if err != nil {
writeErr(w, "FindPeers", http.StatusInternalServerError, fmt.Errorf("delegate error: %w", err))
return
Expand All @@ -459,7 +479,7 @@ func (s *server) findPeersJSON(w http.ResponseWriter, peersIter iter.ResultIter[
})
}

func (s *server) findPeersNDJSON(w http.ResponseWriter, peersIter iter.ResultIter[*types.PeerRecord], filterAddrs, filterProtocols []string) {
func (s *server) findPeersNDJSON(w http.ResponseWriter, peersIter iter.ResultIter[*types.PeerRecord], recordsLimit int, filterAddrs, filterProtocols []string) {
// Convert PeerRecord to Record so that we can reuse the filtering logic from findProviders
mappedIter := iter.Map(peersIter, func(v iter.Result[*types.PeerRecord]) iter.Result[types.Record] {
if v.Err != nil || v.Val == nil {
Expand All @@ -471,7 +491,8 @@ func (s *server) findPeersNDJSON(w http.ResponseWriter, peersIter iter.ResultIte
})

filteredIter := filters.ApplyFiltersToIter(mappedIter, filterAddrs, filterProtocols)
writeResultsIterNDJSON(w, filteredIter)
var limitedIter iter.ResultIter[types.Record] = iter.Limit(filteredIter, recordsLimit)
writeResultsIterNDJSON(w, limitedIter)
}

func (s *server) GetIPNS(w http.ResponseWriter, r *http.Request) {
Expand Down
Loading
Loading