Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions apps/evm/server/force_inclusion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ func (m *mockDA) Get(ctx context.Context, ids []da.ID, namespace []byte) ([]da.B
return nil, nil
}

func (m *mockDA) Subscribe(_ context.Context, _ []byte) (<-chan da.SubscriptionEvent, error) {
// Not needed in these tests; return a closed channel.
ch := make(chan da.SubscriptionEvent)
close(ch)
return ch, nil
}

func (m *mockDA) Validate(ctx context.Context, ids []da.ID, proofs []da.Proof, namespace []byte) ([]bool, error) {
return nil, nil
}
Expand Down
65 changes: 65 additions & 0 deletions block/internal/da/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,71 @@ func (c *client) HasForcedInclusionNamespace() bool {
return c.hasForcedNamespace
}

// Subscribe subscribes to blobs in the given namespace via the celestia-node
// Subscribe API. It returns a channel that emits a SubscriptionEvent for every
// DA block containing a matching blob. The channel is closed when ctx is
// cancelled. The caller must drain the channel after cancellation to avoid
// goroutine leaks.
func (c *client) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) {
ns, err := share.NewNamespaceFromBytes(namespace)
if err != nil {
return nil, fmt.Errorf("invalid namespace: %w", err)
}

rawCh, err := c.blobAPI.Subscribe(ctx, ns)
if err != nil {
return nil, fmt.Errorf("blob subscribe: %w", err)
}

out := make(chan datypes.SubscriptionEvent, 16)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
return
case resp, ok := <-rawCh:
if !ok {
return
}
if resp == nil {
continue
}
select {
case out <- datypes.SubscriptionEvent{
Height: resp.Height,
Blobs: extractBlobData(resp),
}:
case <-ctx.Done():
return
}
}
}
}()

return out, nil
}

// extractBlobData extracts raw byte slices from a subscription response,
// filtering out nil blobs, empty data, and blobs exceeding DefaultMaxBlobSize.
func extractBlobData(resp *blobrpc.SubscriptionResponse) [][]byte {
if resp == nil || len(resp.Blobs) == 0 {
return nil
}
blobs := make([][]byte, 0, len(resp.Blobs))
for _, blob := range resp.Blobs {
if blob == nil {
continue
}
data := blob.Data()
if len(data) == 0 || len(data) > common.DefaultMaxBlobSize {
continue
}
blobs = append(blobs, data)
}
return blobs
}

// Get fetches blobs by their IDs. Used for visualization and fetching specific blobs.
func (c *client) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) {
if len(ids) == 0 {
Expand Down
5 changes: 5 additions & 0 deletions block/internal/da/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ type Client interface {
// Get retrieves blobs by their IDs. Used for visualization and fetching specific blobs.
Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error)

// Subscribe returns a channel that emits one SubscriptionEvent per DA block
// that contains a blob in the given namespace. The channel is closed when ctx
// is cancelled. Callers MUST drain the channel after cancellation.
Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error)

// GetLatestDAHeight returns the latest height available on the DA layer.
GetLatestDAHeight(ctx context.Context) (uint64, error)

Expand Down
3 changes: 3 additions & 0 deletions block/internal/da/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ func (t *tracedClient) GetForcedInclusionNamespace() []byte {
func (t *tracedClient) HasForcedInclusionNamespace() bool {
return t.inner.HasForcedInclusionNamespace()
}
func (t *tracedClient) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) {
return t.inner.Subscribe(ctx, namespace)
}

type submitError struct{ msg string }

Expand Down
8 changes: 8 additions & 0 deletions block/internal/da/tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ type mockFullClient struct {
getFn func(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error)
getProofsFn func(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Proof, error)
validateFn func(ctx context.Context, ids []datypes.ID, proofs []datypes.Proof, namespace []byte) ([]bool, error)
subscribeFn func(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error)
}

func (m *mockFullClient) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) {
if m.subscribeFn == nil {
panic("not expected to be called")
}
return m.subscribeFn(ctx, namespace)
}

func (m *mockFullClient) Submit(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) datypes.ResultSubmit {
Expand Down
Loading
Loading