Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The following emojis are used to highlight certain changes:

### Added

- `retrieval`: added `State.Snapshot`, `State.Apply`, and `State.Notify` so consumers can stream `State` across a process boundary, e.g. to drive a live progress bar in Kubo's `cat`, `get`, or `dag export`. [#1153](https://github.com/ipfs/boxo/pull/1153)
- 🛠 `pinning/pinner`: added `Pinner.Close() error`. Close cancels every in-flight operation's context, including streaming goroutines from `RecursiveKeys`, `DirectKeys`, and `InternalPins`, and waits for them to return. A scalar method that observes the cancellation may return `context.Canceled`; a stream interrupted by Close may surface `ErrClosed` on the channel before it closes. After Close returns, every other method returns the new `ErrClosed` sentinel; streaming methods deliver it as `StreamedPin.Err` on a single entry, then close the channel. Close is idempotent and goroutine-safe. **Action required:** downstream `Pinner` implementations must add `Close`. [#1150](https://github.com/ipfs/boxo/pull/1150)
- `pinning/pinner/dspinner`: implements `Close`. Close cancels the contexts of in-flight operations, so snapshot iteration in `RecursiveKeys`/`DirectKeys` and DAG fetches in `Pin` bail out promptly instead of draining to completion. Close returns as soon as those operations honor their ctx. Hosts owning the datastore should call `Close` on the pinner before closing the datastore to avoid the use-after-close panic path in stores such as pebble. [#1150](https://github.com/ipfs/boxo/pull/1150)

Expand All @@ -40,6 +41,8 @@ The following emojis are used to highlight certain changes:

See [ipfs/kubo#11254](https://github.com/ipfs/kubo/pull/11254) for a worked example of the call-site update. [#1128](https://github.com/ipfs/boxo/pull/1128)

- `path/resolver`: `ResolveToLastNode`, `ResolvePath`, and `ResolvePathComponents` now populate `retrieval.State` on the request context when one is attached. They advance the state to `PhasePathResolution`, record the root CID from the input path, and record the terminal CID once resolution completes. Until now only the gateway backends populated these fields, leaving non-gateway callers (CLIs, custom tools) without phase or CID diagnostics on retrieval errors. The new calls are idempotent with the existing gateway-side ones, so behavior on the gateway path is unchanged.

### Removed

### Fixed
Expand Down
37 changes: 36 additions & 1 deletion path/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ipfs/boxo/fetcher"
fetcherhelpers "github.com/ipfs/boxo/fetcher/helpers"
"github.com/ipfs/boxo/path"
"github.com/ipfs/boxo/retrieval"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
Expand Down Expand Up @@ -81,8 +82,10 @@ func (r *basicResolver) ResolveToLastNode(ctx context.Context, fpath path.Immuta
defer span.End()

c, remainder := fpath.RootCid(), fpath.Segments()[2:]
enterPathResolution(ctx, c)

if len(remainder) == 0 {
setTerminalCid(ctx, c)
return c, nil, nil
}

Expand Down Expand Up @@ -121,6 +124,7 @@ func (r *basicResolver) ResolveToLastNode(ctx context.Context, fpath path.Immuta

// if last node is not a link, just return it's cid, add path to remainder and return
if nd.Kind() != ipld.Kind_Link {
setTerminalCid(ctx, lastCid)
// return the cid and the remainder of the path
return lastCid, remainder[len(remainder)-depth-1:], nil
}
Expand All @@ -135,6 +139,7 @@ func (r *basicResolver) ResolveToLastNode(ctx context.Context, fpath path.Immuta
return cid.Cid{}, nil, fmt.Errorf("path %v resolves to a link that is not a cid link: %v", fpath, lnk)
}

setTerminalCid(ctx, clnk.Cid)
return clnk.Cid, []string{}, nil
}

Expand All @@ -147,6 +152,7 @@ func (r *basicResolver) ResolvePath(ctx context.Context, fpath path.ImmutablePat
defer span.End()

c, remainder := fpath.RootCid(), fpath.Segments()[2:]
enterPathResolution(ctx, c)

// create a selector to traverse all path segments but only match the last
pathSelector := pathLeafSelector(remainder)
Expand All @@ -158,6 +164,7 @@ func (r *basicResolver) ResolvePath(ctx context.Context, fpath path.ImmutablePat
if len(nodes) < 1 {
return nil, nil, fmt.Errorf("path %v did not resolve to a node", fpath)
}
setTerminalCid(ctx, c)
return nodes[len(nodes)-1], cidlink.Link{Cid: c}, nil
}

Expand All @@ -172,11 +179,15 @@ func (r *basicResolver) ResolvePathComponents(ctx context.Context, fpath path.Im
defer log.Debugw("resolvePathComponents", "fpath", fpath, "error", err)

c, remainder := fpath.RootCid(), fpath.Segments()[2:]
enterPathResolution(ctx, c)

// create a selector to traverse and match all path segments
pathSelector := pathAllSelector(remainder)

nodes, _, _, err = r.resolveNodes(ctx, c, pathSelector)
nodes, terminal, _, err := r.resolveNodes(ctx, c, pathSelector)
if err == nil && terminal.Defined() {
setTerminalCid(ctx, terminal)
}
return nodes, err
}

Expand Down Expand Up @@ -246,3 +257,27 @@ func pathSelector(path []string, ssb builder.SelectorSpecBuilder, reduce func(st
func startSpan(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
return otel.Tracer("boxo/path/resolver").Start(ctx, "Path."+name, opts...)
}

// enterPathResolution advances retrieval state into PhasePathResolution and
// records the root CID, when a [retrieval.State] is attached to ctx. It is a
// no-op otherwise. Calls are idempotent: SetPhase is monotonic, and SetRootCID
// is last-write-wins under a mutex.
func enterPathResolution(ctx context.Context, root cid.Cid) {
if rs := retrieval.StateFromContext(ctx); rs != nil {
rs.SetPhase(retrieval.PhasePathResolution)
if root.Defined() {
rs.SetRootCID(root)
}
}
}

// setTerminalCid records the CID of the terminating DAG entity on the resolved
// path, when a [retrieval.State] is attached to ctx. Otherwise it is a no-op.
func setTerminalCid(ctx context.Context, terminal cid.Cid) {
if !terminal.Defined() {
return
}
if rs := retrieval.StateFromContext(ctx); rs != nil {
rs.SetTerminalCID(terminal)
}
}
73 changes: 73 additions & 0 deletions path/resolver/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
dagmock "github.com/ipfs/boxo/ipld/merkledag/test"
"github.com/ipfs/boxo/path"
"github.com/ipfs/boxo/path/resolver"
"github.com/ipfs/boxo/retrieval"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-unixfsnode"
Expand Down Expand Up @@ -265,3 +266,75 @@ func TestResolveToLastNode_MixedSegmentTypes(t *testing.T) {
require.Equal(t, 0, len(remainder))
require.True(t, cid.Equals(a.Cid()))
}

// TestRetrievalStatePropagation verifies that the resolver advances
// retrieval.State into PhasePathResolution and records both the root and
// terminal CIDs when a State is attached to the request context. This is what
// lets non-gateway callers (like kubo's CLI) get phase + CID diagnostics for
// free, without each call site having to hand-set them.
func TestRetrievalStatePropagation(t *testing.T) {
bsrv := dagmock.Bserv()

root := randNode()
mid := randNode()
leaf := randNode()
require.NoError(t, mid.AddNodeLink("grandchild", leaf))
require.NoError(t, root.AddNodeLink("child", mid))
for _, n := range []*merkledag.ProtoNode{root, mid, leaf} {
require.NoError(t, bsrv.AddBlock(t.Context(), n))
}

fetcherFactory := bsfetcher.NewFetcherConfig(bsrv)
fetcherFactory.NodeReifier = unixfsnode.Reify
fetcherFactory.PrototypeChooser = dagpb.AddSupportToChooser(func(lnk ipld.Link, lnkCtx ipld.LinkContext) (ipld.NodePrototype, error) {
if tlnkNd, ok := lnkCtx.LinkNode.(schema.TypedLinkNode); ok {
return tlnkNd.LinkTargetNodePrototype(), nil
}
return basicnode.Prototype.Any, nil
})
r := resolver.NewBasicResolver(fetcherFactory)

p, err := path.Join(path.FromCid(root.Cid()), "child", "grandchild")
require.NoError(t, err)
imPath, err := path.NewImmutablePath(p)
require.NoError(t, err)

t.Run("ResolveToLastNode populates state", func(t *testing.T) {
ctx, rs := retrieval.ContextWithState(t.Context())
require.Equal(t, retrieval.PhaseInitializing, rs.GetPhase())

_, _, err := r.ResolveToLastNode(ctx, imPath)
require.NoError(t, err)

require.GreaterOrEqual(t, int(rs.GetPhase()), int(retrieval.PhasePathResolution))
require.True(t, rs.GetRootCID().Equals(root.Cid()), "root CID should match path root")
require.True(t, rs.GetTerminalCID().Equals(leaf.Cid()), "terminal CID should match resolved leaf")
})

t.Run("ResolvePath populates state", func(t *testing.T) {
ctx, rs := retrieval.ContextWithState(t.Context())

_, _, err := r.ResolvePath(ctx, imPath)
require.NoError(t, err)

require.GreaterOrEqual(t, int(rs.GetPhase()), int(retrieval.PhasePathResolution))
require.True(t, rs.GetRootCID().Equals(root.Cid()))
require.True(t, rs.GetTerminalCID().Equals(leaf.Cid()))
})

t.Run("CID-only path sets terminal to root", func(t *testing.T) {
ctx, rs := retrieval.ContextWithState(t.Context())

_, _, err := r.ResolveToLastNode(ctx, path.FromCid(root.Cid()))
require.NoError(t, err)

require.True(t, rs.GetRootCID().Equals(root.Cid()))
require.True(t, rs.GetTerminalCID().Equals(root.Cid()),
"for /ipfs/<cid> with no path, root and terminal should match")
})

t.Run("no state on context is a no-op", func(t *testing.T) {
_, _, err := r.ResolveToLastNode(t.Context(), imPath)
require.NoError(t, err)
})
}
Loading
Loading