diff --git a/apps/evm/cmd/run.go b/apps/evm/cmd/run.go index 60730eac52..0d3fc25f14 100644 --- a/apps/evm/cmd/run.go +++ b/apps/evm/cmd/run.go @@ -60,7 +60,7 @@ var RunCmd = &cobra.Command{ return err } - blobClient, err := blobrpc.NewClient(context.Background(), nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "") + blobClient, err := blobrpc.NewWSClient(context.Background(), nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "") if err != nil { return fmt.Errorf("failed to create blob client: %w", err) } diff --git a/apps/evm/server/force_inclusion_test.go b/apps/evm/server/force_inclusion_test.go index d04a356622..6be6e6ca76 100644 --- a/apps/evm/server/force_inclusion_test.go +++ b/apps/evm/server/force_inclusion_test.go @@ -50,6 +50,13 @@ func (m *mockDA) Get(ctx context.Context, ids []da.ID, namespace []byte) ([]da.B return nil, nil } +func (m *mockDA) Subscribe(_ context.Context, _ []byte) (<-chan da.SubscriptionEvent, error) { + // Not needed in these tests; return a closed channel. 
+ ch := make(chan da.SubscriptionEvent) + close(ch) + return ch, nil +} + func (m *mockDA) Validate(ctx context.Context, ids []da.ID, proofs []da.Proof, namespace []byte) ([]bool, error) { return nil, nil } diff --git a/apps/grpc/cmd/run.go b/apps/grpc/cmd/run.go index 41278999cb..6db5a409da 100644 --- a/apps/grpc/cmd/run.go +++ b/apps/grpc/cmd/run.go @@ -108,7 +108,7 @@ func createSequencer( genesis genesis.Genesis, executor execution.Executor, ) (coresequencer.Sequencer, error) { - blobClient, err := blobrpc.NewClient(ctx, nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "") + blobClient, err := blobrpc.NewWSClient(ctx, nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "") if err != nil { return nil, fmt.Errorf("failed to create blob client: %w", err) } diff --git a/apps/testapp/cmd/run.go b/apps/testapp/cmd/run.go index fe026cdfcc..fff3a9e064 100644 --- a/apps/testapp/cmd/run.go +++ b/apps/testapp/cmd/run.go @@ -111,7 +111,7 @@ func createSequencer( genesis genesis.Genesis, executor execution.Executor, ) (coresequencer.Sequencer, error) { - blobClient, err := blobrpc.NewClient(ctx, nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "") + blobClient, err := blobrpc.NewWSClient(ctx, nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "") if err != nil { return nil, fmt.Errorf("failed to create blob client: %w", err) } diff --git a/block/internal/da/client.go b/block/internal/da/client.go index a92a9eef28..bb0f118436 100644 --- a/block/internal/da/client.go +++ b/block/internal/da/client.go @@ -350,6 +350,71 @@ func (c *client) HasForcedInclusionNamespace() bool { return c.hasForcedNamespace } +// Subscribe subscribes to blobs in the given namespace via the celestia-node +// Subscribe API. It returns a channel that emits a SubscriptionEvent for every +// DA block containing a matching blob. The channel is closed when ctx is +// cancelled. The caller must drain the channel after cancellation to avoid +// goroutine leaks. 
+func (c *client) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) { + ns, err := share.NewNamespaceFromBytes(namespace) + if err != nil { + return nil, fmt.Errorf("invalid namespace: %w", err) + } + + rawCh, err := c.blobAPI.Subscribe(ctx, ns) + if err != nil { + return nil, fmt.Errorf("blob subscribe: %w", err) + } + + out := make(chan datypes.SubscriptionEvent, 16) + go func() { + defer close(out) + for { + select { + case <-ctx.Done(): + return + case resp, ok := <-rawCh: + if !ok { + return + } + if resp == nil { + continue + } + select { + case out <- datypes.SubscriptionEvent{ + Height: resp.Height, + Blobs: extractBlobData(resp), + }: + case <-ctx.Done(): + return + } + } + } + }() + + return out, nil +} + +// extractBlobData extracts raw byte slices from a subscription response, +// filtering out nil blobs, empty data, and blobs exceeding DefaultMaxBlobSize. +func extractBlobData(resp *blobrpc.SubscriptionResponse) [][]byte { + if resp == nil || len(resp.Blobs) == 0 { + return nil + } + blobs := make([][]byte, 0, len(resp.Blobs)) + for _, blob := range resp.Blobs { + if blob == nil { + continue + } + data := blob.Data() + if len(data) == 0 || len(data) > common.DefaultMaxBlobSize { + continue + } + blobs = append(blobs, data) + } + return blobs +} + // Get fetches blobs by their IDs. Used for visualization and fetching specific blobs. func (c *client) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) { if len(ids) == 0 { diff --git a/block/internal/da/interface.go b/block/internal/da/interface.go index dd7a15d8f9..3cc0677580 100644 --- a/block/internal/da/interface.go +++ b/block/internal/da/interface.go @@ -17,6 +17,11 @@ type Client interface { // Get retrieves blobs by their IDs. Used for visualization and fetching specific blobs. 
Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) + // Subscribe returns a channel that emits one SubscriptionEvent per DA block + // that contains a blob in the given namespace. The channel is closed when ctx + // is cancelled. Callers MUST drain the channel after cancellation. + Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) + // GetLatestDAHeight returns the latest height available on the DA layer. GetLatestDAHeight(ctx context.Context) (uint64, error) diff --git a/block/internal/da/tracing.go b/block/internal/da/tracing.go index 4d946a8b74..161293c2c3 100644 --- a/block/internal/da/tracing.go +++ b/block/internal/da/tracing.go @@ -145,6 +145,9 @@ func (t *tracedClient) GetForcedInclusionNamespace() []byte { func (t *tracedClient) HasForcedInclusionNamespace() bool { return t.inner.HasForcedInclusionNamespace() } +func (t *tracedClient) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) { + return t.inner.Subscribe(ctx, namespace) +} type submitError struct{ msg string } diff --git a/block/internal/da/tracing_test.go b/block/internal/da/tracing_test.go index de32532a31..e20bf3b84b 100644 --- a/block/internal/da/tracing_test.go +++ b/block/internal/da/tracing_test.go @@ -22,6 +22,14 @@ type mockFullClient struct { getFn func(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) getProofsFn func(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Proof, error) validateFn func(ctx context.Context, ids []datypes.ID, proofs []datypes.Proof, namespace []byte) ([]bool, error) + subscribeFn func(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) +} + +func (m *mockFullClient) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) { + if m.subscribeFn == nil { + panic("not expected to be called") + } + return m.subscribeFn(ctx, namespace) } func 
(m *mockFullClient) Submit(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) datypes.ResultSubmit { diff --git a/block/internal/syncing/da_follower.go b/block/internal/syncing/da_follower.go new file mode 100644 index 0000000000..e92d5be1ab --- /dev/null +++ b/block/internal/syncing/da_follower.go @@ -0,0 +1,437 @@ +package syncing + +import ( + "bytes" + "context" + "errors" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/rs/zerolog" + + "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/block/internal/da" + datypes "github.com/evstack/ev-node/pkg/da/types" +) + +// DAFollower subscribes to DA blob events and drives sequential catchup. +// +// Architecture: +// - followLoop listens on the subscription channel. When caught up, it processes +// subscription blobs inline (fast path, no DA re-fetch). Otherwise, it updates +// highestSeenDAHeight and signals the catchup loop. +// - catchupLoop sequentially retrieves from localDAHeight → highestSeenDAHeight, +// piping events to the Syncer's heightInCh. +// +// The two goroutines share only atomic state; no mutexes needed. +type DAFollower interface { + // Start begins the follow and catchup loops. + Start(ctx context.Context) error + // Stop cancels the context and waits for goroutines. + Stop() + // HasReachedHead returns true once the catchup loop has processed the + // DA head at least once. Once true, it stays true. + HasReachedHead() bool +} + +// daFollower is the concrete implementation of DAFollower. +type daFollower struct { + client da.Client + retriever DARetriever + logger zerolog.Logger + + // pipeEvent sends a DA height event to the syncer's processLoop. + pipeEvent func(ctx context.Context, event common.DAHeightEvent) error + + // Namespace to subscribe on (header namespace). + namespace []byte + // dataNamespace is the data namespace (may equal namespace when header+data + // share the same namespace). 
When different, we subscribe to both and merge. + dataNamespace []byte + + // localDAHeight is only written by catchupLoop and read by followLoop + // to determine whether a catchup is needed. + localDAHeight atomic.Uint64 + + // highestSeenDAHeight is written by followLoop and read by catchupLoop. + highestSeenDAHeight atomic.Uint64 + + // headReached tracks whether the follower has caught up to DA head. + headReached atomic.Bool + + // catchupSignal is sent by followLoop to wake catchupLoop when a new + // height is seen that is above localDAHeight. + catchupSignal chan struct{} + + // daBlockTime is used as a backoff before retrying after errors. + daBlockTime time.Duration + + // lifecycle + cancel context.CancelFunc + wg sync.WaitGroup +} + +// DAFollowerConfig holds configuration for creating a DAFollower. +type DAFollowerConfig struct { + Client da.Client + Retriever DARetriever + Logger zerolog.Logger + PipeEvent func(ctx context.Context, event common.DAHeightEvent) error + Namespace []byte + DataNamespace []byte // may be nil or equal to Namespace + StartDAHeight uint64 + DABlockTime time.Duration +} + +// NewDAFollower creates a new daFollower. +func NewDAFollower(cfg DAFollowerConfig) DAFollower { + dataNs := cfg.DataNamespace + if len(dataNs) == 0 { + dataNs = cfg.Namespace + } + f := &daFollower{ + client: cfg.Client, + retriever: cfg.Retriever, + logger: cfg.Logger.With().Str("component", "da_follower").Logger(), + pipeEvent: cfg.PipeEvent, + namespace: cfg.Namespace, + dataNamespace: dataNs, + catchupSignal: make(chan struct{}, 1), + daBlockTime: cfg.DABlockTime, + } + f.localDAHeight.Store(cfg.StartDAHeight) + return f +} + +// Start begins the follow and catchup goroutines. +func (f *daFollower) Start(ctx context.Context) error { + ctx, f.cancel = context.WithCancel(ctx) + + f.wg.Add(2) + go f.followLoop(ctx) + go f.catchupLoop(ctx) + + f.logger.Info(). + Uint64("start_da_height", f.localDAHeight.Load()). 
+ Msg("DA follower started") + return nil +} + +// Stop cancels and waits. +func (f *daFollower) Stop() { + if f.cancel != nil { + f.cancel() + } + f.wg.Wait() +} + +// HasReachedHead returns whether the DA head has been reached at least once. +func (f *daFollower) HasReachedHead() bool { + return f.headReached.Load() +} + +// signalCatchup sends a non-blocking signal to wake catchupLoop. +func (f *daFollower) signalCatchup() { + select { + case f.catchupSignal <- struct{}{}: + default: + // Already signaled, catchupLoop will pick up the new highestSeen. + } +} + +// followLoop subscribes to DA blob events and keeps highestSeenDAHeight up to date. +// When a new height appears above localDAHeight, it wakes the catchup loop. +func (f *daFollower) followLoop(ctx context.Context) { + defer f.wg.Done() + + f.logger.Info().Msg("starting follow loop") + defer f.logger.Info().Msg("follow loop stopped") + + for { + if err := f.runSubscription(ctx); err != nil { + if ctx.Err() != nil { + return + } + f.logger.Warn().Err(err).Msg("DA subscription failed, reconnecting") + select { + case <-ctx.Done(): + return + case <-time.After(f.backoff()): + } + } + } +} + +// runSubscription opens subscriptions on both header and data namespaces (if +// different) and processes events until a channel is closed or an error occurs. +// A watchdog timer triggers if no events arrive within watchdogTimeout(), +// causing a reconnect. +func (f *daFollower) runSubscription(ctx context.Context) error { + // Sub-context ensures the merge goroutine is cancelled when this function returns. + subCtx, subCancel := context.WithCancel(ctx) + defer subCancel() + + headerCh, err := f.client.Subscribe(subCtx, f.namespace) + if err != nil { + return fmt.Errorf("subscribe header namespace: %w", err) + } + + // If namespaces differ, subscribe to the data namespace too and fan-in. 
+ ch := headerCh + if !bytes.Equal(f.namespace, f.dataNamespace) { + dataCh, err := f.client.Subscribe(subCtx, f.dataNamespace) + if err != nil { + return fmt.Errorf("subscribe data namespace: %w", err) + } + ch = f.mergeSubscriptions(subCtx, headerCh, dataCh) + } + + watchdogTimeout := f.watchdogTimeout() + watchdog := time.NewTimer(watchdogTimeout) + defer watchdog.Stop() + + for { + select { + case <-subCtx.Done(): + return subCtx.Err() + case ev, ok := <-ch: + if !ok { + return errors.New("subscription channel closed") + } + f.handleSubscriptionEvent(ctx, ev) + watchdog.Reset(watchdogTimeout) + case <-watchdog.C: + return errors.New("subscription watchdog: no events received, reconnecting") + } + } +} + +// mergeSubscriptions fans two subscription channels into one. +func (f *daFollower) mergeSubscriptions( + ctx context.Context, + headerCh, dataCh <-chan datypes.SubscriptionEvent, +) <-chan datypes.SubscriptionEvent { + out := make(chan datypes.SubscriptionEvent, 16) + go func() { + defer close(out) + for headerCh != nil || dataCh != nil { + var ev datypes.SubscriptionEvent + var ok bool + select { + case <-ctx.Done(): + return + case ev, ok = <-headerCh: + if !ok { + headerCh = nil + continue + } + case ev, ok = <-dataCh: + if !ok { + dataCh = nil + continue + } + } + select { + case out <- ev: + case <-ctx.Done(): + return + } + } + }() + return out +} + +// handleSubscriptionEvent processes a subscription event. When the follower is +// caught up (ev.Height == localDAHeight) and blobs are available, it processes +// them inline — avoiding a DA re-fetch round trip. Otherwise, it just updates +// highestSeenDAHeight and lets catchupLoop handle retrieval. +// +// Uses CAS on localDAHeight to claim exclusive access to processBlobs, +// preventing concurrent map access with catchupLoop. +func (f *daFollower) handleSubscriptionEvent(ctx context.Context, ev datypes.SubscriptionEvent) { + // Always record the highest height we've seen from the subscription. 
+ f.updateHighest(ev.Height) + + // Fast path: try to claim this height for inline processing. + // CAS(N, N+1) ensures only one goroutine (followLoop or catchupLoop) + // can enter processBlobs for height N. + if len(ev.Blobs) > 0 && f.localDAHeight.CompareAndSwap(ev.Height, ev.Height+1) { + events := f.retriever.ProcessBlobs(ctx, ev.Blobs, ev.Height) + for _, event := range events { + if err := f.pipeEvent(ctx, event); err != nil { + // Roll back so catchupLoop can retry this height. + f.localDAHeight.Store(ev.Height) + f.logger.Warn().Err(err).Uint64("da_height", ev.Height). + Msg("failed to pipe inline event, catchup will retry") + return + } + } + if len(events) != 0 { + f.headReached.Store(true) + f.logger.Debug().Uint64("da_height", ev.Height).Int("events", len(events)). + Msg("processed subscription blobs inline (fast path)") + } else { + // No complete events (split namespace, waiting for other half). + f.localDAHeight.Store(ev.Height) + } + return + } + + // Slow path: behind, no blobs, or catchupLoop claimed this height. +} + +// updateHighest atomically bumps highestSeenDAHeight and signals catchup if needed. +func (f *daFollower) updateHighest(height uint64) { + for { + cur := f.highestSeenDAHeight.Load() + if height <= cur { + return + } + if f.highestSeenDAHeight.CompareAndSwap(cur, height) { + f.signalCatchup() + return + } + } +} + +// catchupLoop waits for signals and sequentially retrieves DA heights +// from localDAHeight up to highestSeenDAHeight. +func (f *daFollower) catchupLoop(ctx context.Context) { + defer f.wg.Done() + + f.logger.Info().Msg("starting catchup loop") + defer f.logger.Info().Msg("catchup loop stopped") + + for { + select { + case <-ctx.Done(): + return + case <-f.catchupSignal: + f.runCatchup(ctx) + } + } +} + +// runCatchup sequentially retrieves from localDAHeight to highestSeenDAHeight. +// It handles priority heights first, then sequential heights. 
+func (f *daFollower) runCatchup(ctx context.Context) {
+	for {
+		if ctx.Err() != nil {
+			return
+		}
+
+		// Check for priority heights from P2P hints first.
+		if priorityHeight := f.retriever.PopPriorityHeight(); priorityHeight > 0 {
+			currentHeight := f.localDAHeight.Load()
+			if priorityHeight < currentHeight {
+				continue
+			}
+			f.logger.Debug().
+				Uint64("da_height", priorityHeight).
+				Msg("fetching priority DA height from P2P hint")
+			if err := f.fetchAndPipeHeight(ctx, priorityHeight); err != nil {
+				if !f.waitOnCatchupError(ctx, err, priorityHeight) {
+					return
+				}
+			}
+			continue
+		}
+
+		// Sequential catchup.
+		local := f.localDAHeight.Load()
+		highest := f.highestSeenDAHeight.Load()
+
+		if local > highest {
+			// Caught up.
+			f.headReached.Store(true)
+			return
+		}
+
+		// Claiming this height with a CAS prevents followLoop from inline-processing it.
+		if !f.localDAHeight.CompareAndSwap(local, local+1) {
+			// followLoop already advanced past this height via inline processing.
+			continue
+		}
+
+		if err := f.fetchAndPipeHeight(ctx, local); err != nil {
+			// Roll back so we can retry after backoff.
+			f.localDAHeight.Store(local)
+			if !f.waitOnCatchupError(ctx, err, local) {
+				return
+			}
+			continue
+		}
+	}
+}
+
+// fetchAndPipeHeight retrieves events at a single DA height and pipes them
+// to the syncer.
+func (f *daFollower) fetchAndPipeHeight(ctx context.Context, daHeight uint64) error {
+	events, err := f.retriever.RetrieveFromDA(ctx, daHeight)
+	if err != nil {
+		switch {
+		case errors.Is(err, datypes.ErrBlobNotFound):
+			// No blobs at this height — not an error, just skip.
+			return nil
+		case errors.Is(err, datypes.ErrHeightFromFuture):
+			// DA hasn't produced this height yet — mark head reached
+			// but return the error to trigger a backoff retry.
+			f.headReached.Store(true)
+			return err
+		default:
+			return err
+		}
+	}
+
+	for _, event := range events {
+		if err := f.pipeEvent(ctx, event); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// errCaughtUp is a sentinel used to signal that the catchup loop has reached DA head.
+// NOTE(review): no code path currently returns errCaughtUp — fetchAndPipeHeight
+// propagates datypes.ErrHeightFromFuture directly — so the errors.Is(err, errCaughtUp)
+// branch in waitOnCatchupError appears dead; confirm intent (wrap the error, or drop the sentinel).
+var errCaughtUp = errors.New("caught up with DA head")
+
+// waitOnCatchupError logs the error and backs off before retrying.
+// It returns true if the caller should continue (retry), or false if the
+// catchup loop should exit (context cancelled or caught-up sentinel).
+func (f *daFollower) waitOnCatchupError(ctx context.Context, err error, daHeight uint64) bool {
+	if errors.Is(err, errCaughtUp) {
+		f.logger.Debug().Uint64("da_height", daHeight).Msg("DA catchup reached head, waiting for subscription signal")
+		return false
+	}
+	if ctx.Err() != nil {
+		return false
+	}
+	f.logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("catchup error, backing off")
+	select {
+	case <-ctx.Done():
+		return false
+	case <-time.After(f.backoff()):
+		return true
+	}
+}
+
+// backoff returns the configured DA block time or a sane default.
+func (f *daFollower) backoff() time.Duration {
+	if f.daBlockTime > 0 {
+		return f.daBlockTime
+	}
+	return 2 * time.Second
+}
+
+// watchdogTimeout returns how long to wait for a subscription event before
+// assuming the subscription is stalled. Defaults to 3× the DA block time.
+const watchdogMultiplier = 3 + +func (f *daFollower) watchdogTimeout() time.Duration { + if f.daBlockTime > 0 { + return f.daBlockTime * watchdogMultiplier + } + return 30 * time.Second +} diff --git a/block/internal/syncing/da_retriever.go b/block/internal/syncing/da_retriever.go index a6c9d43c7c..782fa9c318 100644 --- a/block/internal/syncing/da_retriever.go +++ b/block/internal/syncing/da_retriever.go @@ -24,6 +24,9 @@ import ( type DARetriever interface { // RetrieveFromDA retrieves blocks from the specified DA height and returns height events RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) + // ProcessBlobs parses raw blob bytes at a given DA height into height events. + // Used by the DAFollower to process subscription blobs inline without re-fetching. + ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent // QueuePriorityHeight queues a DA height for priority retrieval (from P2P hints). // These heights take precedence over sequential fetching. QueuePriorityHeight(daHeight uint64) @@ -191,6 +194,15 @@ func (r *daRetriever) validateBlobResponse(res datypes.ResultRetrieve, daHeight } } +// ProcessBlobs processes raw blob bytes to extract headers and data and returns height events. +// This is the public interface used by the DAFollower for inline subscription processing. +// +// NOT thread-safe: the caller (DAFollower) must ensure exclusive access via CAS +// on localDAHeight before calling this method. 
+func (r *daRetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent { + return r.processBlobs(ctx, blobs, daHeight) +} + // processBlobs processes retrieved blobs to extract headers and data and returns height events func (r *daRetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent { // Decode all blobs diff --git a/block/internal/syncing/da_retriever_mock.go b/block/internal/syncing/da_retriever_mock.go index 2e191c8851..dc6926485e 100644 --- a/block/internal/syncing/da_retriever_mock.go +++ b/block/internal/syncing/da_retriever_mock.go @@ -189,3 +189,68 @@ func (_c *MockDARetriever_RetrieveFromDA_Call) RunAndReturn(run func(ctx context _c.Call.Return(run) return _c } + +// ProcessBlobs provides a mock function for the type MockDARetriever +func (_mock *MockDARetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent { + ret := _mock.Called(ctx, blobs, daHeight) + + if len(ret) == 0 { + panic("no return value specified for ProcessBlobs") + } + + var r0 []common.DAHeightEvent + if returnFunc, ok := ret.Get(0).(func(context.Context, [][]byte, uint64) []common.DAHeightEvent); ok { + r0 = returnFunc(ctx, blobs, daHeight) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]common.DAHeightEvent) + } + } + return r0 +} + +// MockDARetriever_ProcessBlobs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ProcessBlobs' +type MockDARetriever_ProcessBlobs_Call struct { + *mock.Call +} + +// ProcessBlobs is a helper method to define mock.On call +// - ctx context.Context +// - blobs [][]byte +// - daHeight uint64 +func (_e *MockDARetriever_Expecter) ProcessBlobs(ctx interface{}, blobs interface{}, daHeight interface{}) *MockDARetriever_ProcessBlobs_Call { + return &MockDARetriever_ProcessBlobs_Call{Call: _e.mock.On("ProcessBlobs", ctx, blobs, daHeight)} +} + +func (_c 
*MockDARetriever_ProcessBlobs_Call) Run(run func(ctx context.Context, blobs [][]byte, daHeight uint64)) *MockDARetriever_ProcessBlobs_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 [][]byte + if args[1] != nil { + arg1 = args[1].([][]byte) + } + var arg2 uint64 + if args[2] != nil { + arg2 = args[2].(uint64) + } + run( + arg0, + arg1, + arg2, + ) + }) + return _c +} + +func (_c *MockDARetriever_ProcessBlobs_Call) Return(dAHeightEvents []common.DAHeightEvent) *MockDARetriever_ProcessBlobs_Call { + _c.Call.Return(dAHeightEvents) + return _c +} + +func (_c *MockDARetriever_ProcessBlobs_Call) RunAndReturn(run func(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent) *MockDARetriever_ProcessBlobs_Call { + _c.Call.Return(run) + return _c +} diff --git a/block/internal/syncing/da_retriever_tracing.go b/block/internal/syncing/da_retriever_tracing.go index 2bc7a4094d..1e1a9ea7c0 100644 --- a/block/internal/syncing/da_retriever_tracing.go +++ b/block/internal/syncing/da_retriever_tracing.go @@ -63,3 +63,7 @@ func (t *tracedDARetriever) QueuePriorityHeight(daHeight uint64) { func (t *tracedDARetriever) PopPriorityHeight() uint64 { return t.inner.PopPriorityHeight() } + +func (t *tracedDARetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent { + return t.inner.ProcessBlobs(ctx, blobs, daHeight) +} diff --git a/block/internal/syncing/da_retriever_tracing_test.go b/block/internal/syncing/da_retriever_tracing_test.go index 99ce1eb639..930fc43617 100644 --- a/block/internal/syncing/da_retriever_tracing_test.go +++ b/block/internal/syncing/da_retriever_tracing_test.go @@ -31,6 +31,10 @@ func (m *mockDARetriever) QueuePriorityHeight(daHeight uint64) {} func (m *mockDARetriever) PopPriorityHeight() uint64 { return 0 } +func (m *mockDARetriever) ProcessBlobs(_ context.Context, blobs [][]byte, daHeight uint64) 
[]common.DAHeightEvent { + return nil +} + func setupDARetrieverTrace(t *testing.T, inner DARetriever) (DARetriever, *tracetest.SpanRecorder) { t.Helper() sr := tracetest.NewSpanRecorder() diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 848ae9825a..b355bcd87e 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -21,7 +21,6 @@ import ( "github.com/evstack/ev-node/block/internal/da" coreexecutor "github.com/evstack/ev-node/core/execution" "github.com/evstack/ev-node/pkg/config" - datypes "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/pkg/raft" "github.com/evstack/ev-node/pkg/store" @@ -80,6 +79,9 @@ type Syncer struct { p2pHandler p2pHandler raftRetriever *raftRetriever + // DA follower (replaces the old polling daWorkerLoop) + daFollower DAFollower + // Forced inclusion tracking forcedInclusionMu sync.RWMutex seenBlockTxs map[string]struct{} // SHA-256 hex of every tx seen in a DA-sourced block @@ -96,9 +98,6 @@ type Syncer struct { // P2P wait coordination p2pWaitState atomic.Value // stores p2pWaitState - // DA head-reached signal for recovery mode (stays true once DA head is seen) - daHeadReached atomic.Bool - // blockSyncer is the interface used for block sync operations. // defaults to self, but can be wrapped with tracing. 
blockSyncer BlockSyncer @@ -198,7 +197,21 @@ func (s *Syncer) Start(ctx context.Context) error { // Start main processing loop s.wg.Go(func() { s.processLoop(ctx) }) - // Start dedicated workers for DA, and pending processing + // Start the DA follower (subscribe + catchup) and other workers + s.daFollower = NewDAFollower(DAFollowerConfig{ + Client: s.daClient, + Retriever: s.daRetriever, + Logger: s.logger, + PipeEvent: s.pipeEvent, + Namespace: s.daClient.GetHeaderNamespace(), + DataNamespace: s.daClient.GetDataNamespace(), + StartDAHeight: s.daRetrieverHeight.Load(), + DABlockTime: s.config.DA.BlockTime.Duration, + }) + if err := s.daFollower.Start(ctx); err != nil { + return fmt.Errorf("failed to start DA follower: %w", err) + } + s.startSyncWorkers(ctx) s.logger.Info().Msg("syncer started") @@ -213,6 +226,12 @@ func (s *Syncer) Stop() error { s.cancel() s.cancelP2PWait(0) + + // Stop the DA follower first (it owns its own goroutines). + if s.daFollower != nil { + s.daFollower.Stop() + } + s.wg.Wait() // Skip draining if we're shutting down due to a critical error (e.g. execution @@ -360,113 +379,23 @@ func (s *Syncer) processLoop(ctx context.Context) { } func (s *Syncer) startSyncWorkers(ctx context.Context) { - s.wg.Add(3) - go s.daWorkerLoop(ctx) + // DA follower is already started in Start(). + s.wg.Add(2) go s.pendingWorkerLoop(ctx) go s.p2pWorkerLoop(ctx) } -func (s *Syncer) daWorkerLoop(ctx context.Context) { - defer s.wg.Done() - - s.logger.Info().Msg("starting DA worker") - defer s.logger.Info().Msg("DA worker stopped") - - for { - err := s.fetchDAUntilCaughtUp(ctx) - - var backoff time.Duration - if err == nil { - // No error, means we are caught up. - s.daHeadReached.Store(true) - backoff = s.config.DA.BlockTime.Duration - } else { - // Error, back off for a shorter duration. 
- backoff = s.config.DA.BlockTime.Duration - if backoff <= 0 { - backoff = 2 * time.Second - } - } - - select { - case <-ctx.Done(): - return - case <-time.After(backoff): - } - } -} - -// HasReachedDAHead returns true once the DA worker has reached the DA head. +// HasReachedDAHead returns true once the DA follower has caught up to the DA head. // Once set, it stays true. func (s *Syncer) HasReachedDAHead() bool { - return s.daHeadReached.Load() -} - -func (s *Syncer) fetchDAUntilCaughtUp(ctx context.Context) error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - // Check for priority heights from P2P hints first - var daHeight uint64 - if priorityHeight := s.daRetriever.PopPriorityHeight(); priorityHeight > 0 { - // Skip if we've already fetched past this height - currentHeight := s.daRetrieverHeight.Load() - if priorityHeight < currentHeight { - continue - } - daHeight = priorityHeight - s.logger.Debug().Uint64("da_height", daHeight).Msg("fetching priority DA height from P2P hint") - } else { - daHeight = max(s.daRetrieverHeight.Load(), s.cache.DaHeight()) - } - - events, err := s.daRetriever.RetrieveFromDA(ctx, daHeight) - if err != nil { - switch { - case errors.Is(err, datypes.ErrBlobNotFound): - s.daRetrieverHeight.Store(daHeight + 1) - continue // Fetch next height immediately - case errors.Is(err, datypes.ErrHeightFromFuture): - s.logger.Debug().Err(err).Uint64("da_height", daHeight).Msg("DA is ahead of local target; backing off future height requests") - return nil // Caught up - default: - s.logger.Error().Err(err).Uint64("da_height", daHeight).Msg("failed to retrieve from DA; backing off DA requests") - return err // Other errors - } - } - - if len(events) == 0 { - // This can happen if RetrieveFromDA returns no events and no error. 
- s.logger.Debug().Uint64("da_height", daHeight).Msg("no events returned from DA, but no error either.") - } - - // Process DA events - for _, event := range events { - if err := s.pipeEvent(ctx, event); err != nil { - return err - } - } - - // Update DA retrieval height on successful retrieval - // For priority fetches, only update if the priority height is ahead of current - // For sequential fetches, always increment - newHeight := daHeight + 1 - for { - current := s.daRetrieverHeight.Load() - if newHeight <= current { - break // Already at or past this height - } - if s.daRetrieverHeight.CompareAndSwap(current, newHeight) { - break - } - } + if s.daFollower != nil { + return s.daFollower.HasReachedHead() } + return false } +// fetchDAUntilCaughtUp was removed — the DAFollower handles this concern. + func (s *Syncer) pendingWorkerLoop(ctx context.Context) { defer s.wg.Done() diff --git a/block/internal/syncing/syncer_backoff_test.go b/block/internal/syncing/syncer_backoff_test.go index c2bed9385d..bd2d6000fe 100644 --- a/block/internal/syncing/syncer_backoff_test.go +++ b/block/internal/syncing/syncer_backoff_test.go @@ -7,27 +7,20 @@ import ( "testing/synctest" "time" - "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" - "github.com/evstack/ev-node/core/execution" - "github.com/evstack/ev-node/pkg/config" datypes "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/pkg/genesis" - "github.com/evstack/ev-node/pkg/store" - extmocks "github.com/evstack/ev-node/test/mocks/external" "github.com/evstack/ev-node/types" ) -// TestSyncer_BackoffOnDAError verifies that the syncer implements proper backoff -// behavior when encountering different types of DA layer errors. 
-func TestSyncer_BackoffOnDAError(t *testing.T) { +// TestDAFollower_BackoffOnCatchupError verifies that the DAFollower implements +// proper backoff behavior when encountering different types of DA layer errors. +func TestDAFollower_BackoffOnCatchupError(t *testing.T) { tests := map[string]struct { daBlockTime time.Duration error error @@ -66,27 +59,22 @@ func TestSyncer_BackoffOnDAError(t *testing.T) { ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) defer cancel() - // Setup syncer - syncer := setupTestSyncer(t, tc.daBlockTime) - syncer.ctx = ctx - - // Setup mocks daRetriever := NewMockDARetriever(t) - p2pHandler := newMockp2pHandler(t) - p2pHandler.On("ProcessHeight", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() - syncer.daRetriever = daRetriever - syncer.p2pHandler = p2pHandler - p2pHandler.On("SetProcessedHeight", mock.Anything).Return().Maybe() - - // Mock PopPriorityHeight to always return 0 (no priority heights) daRetriever.On("PopPriorityHeight").Return(uint64(0)).Maybe() - // Create mock stores for P2P - mockHeaderStore := extmocks.NewMockStore[*types.SignedHeader](t) - mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() + pipeEvent := func(_ context.Context, _ common.DAHeightEvent) error { return nil } - mockDataStore := extmocks.NewMockStore[*types.Data](t) - mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() + follower := NewDAFollower(DAFollowerConfig{ + Retriever: daRetriever, + Logger: zerolog.Nop(), + PipeEvent: pipeEvent, + Namespace: []byte("ns"), + StartDAHeight: 100, + DABlockTime: tc.daBlockTime, + }).(*daFollower) + + ctx, follower.cancel = context.WithCancel(ctx) + follower.highestSeenDAHeight.Store(102) var callTimes []time.Time callCount := 0 @@ -100,17 +88,14 @@ func TestSyncer_BackoffOnDAError(t *testing.T) { Return(nil, tc.error).Once() if tc.expectsBackoff { - // Second call should be delayed due to backoff daRetriever.On("RetrieveFromDA", mock.Anything, uint64(100)). 
Run(func(args mock.Arguments) { callTimes = append(callTimes, time.Now()) callCount++ - // Cancel to end test cancel() }). Return(nil, datypes.ErrBlobNotFound).Once() } else { - // For ErrBlobNotFound, DA height should increment daRetriever.On("RetrieveFromDA", mock.Anything, uint64(101)). Run(func(args mock.Arguments) { callTimes = append(callTimes, time.Now()) @@ -120,12 +105,9 @@ func TestSyncer_BackoffOnDAError(t *testing.T) { Return(nil, datypes.ErrBlobNotFound).Once() } - // Run sync loop - syncer.startSyncWorkers(ctx) + go follower.runCatchup(ctx) <-ctx.Done() - syncer.wg.Wait() - // Verify behavior if tc.expectsBackoff { require.Len(t, callTimes, 2, "should make exactly 2 calls with backoff") @@ -151,35 +133,32 @@ func TestSyncer_BackoffOnDAError(t *testing.T) { } } -// TestSyncer_BackoffResetOnSuccess verifies that backoff is properly reset -// after a successful DA retrieval, allowing the syncer to continue at normal speed. -func TestSyncer_BackoffResetOnSuccess(t *testing.T) { +// TestDAFollower_BackoffResetOnSuccess verifies that backoff is properly reset +// after a successful DA retrieval. 
+func TestDAFollower_BackoffResetOnSuccess(t *testing.T) { synctest.Test(t, func(t *testing.T) { - ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + ctx, cancel := context.WithTimeout(t.Context(), 15*time.Second) defer cancel() - syncer := setupTestSyncer(t, 1*time.Second) - syncer.ctx = ctx - addr, pub, signer := buildSyncTestSigner(t) - gen := syncer.genesis + gen := backoffTestGenesis(addr) daRetriever := NewMockDARetriever(t) - p2pHandler := newMockp2pHandler(t) - p2pHandler.On("ProcessHeight", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() - syncer.daRetriever = daRetriever - syncer.p2pHandler = p2pHandler - p2pHandler.On("SetProcessedHeight", mock.Anything).Return().Maybe() - - // Mock PopPriorityHeight to always return 0 (no priority heights) daRetriever.On("PopPriorityHeight").Return(uint64(0)).Maybe() - // Create mock stores for P2P - mockHeaderStore := extmocks.NewMockStore[*types.SignedHeader](t) - mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() + pipeEvent := func(_ context.Context, _ common.DAHeightEvent) error { return nil } + + follower := NewDAFollower(DAFollowerConfig{ + Retriever: daRetriever, + Logger: zerolog.Nop(), + PipeEvent: pipeEvent, + Namespace: []byte("ns"), + StartDAHeight: 100, + DABlockTime: 1 * time.Second, + }).(*daFollower) - mockDataStore := extmocks.NewMockStore[*types.Data](t) - mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() + ctx, follower.cancel = context.WithCancel(ctx) + follower.highestSeenDAHeight.Store(105) var callTimes []time.Time @@ -190,7 +169,7 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) { }). 
Return(nil, errors.New("temporary failure")).Once() - // Second call - success (should reset backoff and increment DA height) + // Second call - success _, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil, nil) data := &types.Data{ Metadata: &types.Metadata{ @@ -211,7 +190,7 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) { }). Return([]common.DAHeightEvent{event}, nil).Once() - // Third call - should happen immediately after success (DA height incremented to 101) + // Third call - should happen immediately after success (DA height incremented) daRetriever.On("RetrieveFromDA", mock.Anything, uint64(101)). Run(func(args mock.Arguments) { callTimes = append(callTimes, time.Now()) @@ -219,137 +198,170 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) { }). Return(nil, datypes.ErrBlobNotFound).Once() - // Start process loop to handle events - go syncer.processLoop(ctx) - - // Run workers - syncer.startSyncWorkers(ctx) + go follower.runCatchup(ctx) <-ctx.Done() - syncer.wg.Wait() require.Len(t, callTimes, 3, "should make exactly 3 calls") - // Verify backoff between first and second call delay1to2 := callTimes[1].Sub(callTimes[0]) assert.GreaterOrEqual(t, delay1to2, 1*time.Second, "should have backed off between error and success (got %v)", delay1to2) - // Verify no backoff between second and third call (backoff reset) delay2to3 := callTimes[2].Sub(callTimes[1]) assert.Less(t, delay2to3, 100*time.Millisecond, "should continue immediately after success (got %v)", delay2to3) }) } -// TestSyncer_BackoffBehaviorIntegration tests the complete backoff flow: -// error -> backoff delay -> recovery -> normal operation. -func TestSyncer_BackoffBehaviorIntegration(t *testing.T) { - // Test simpler backoff behavior: error -> backoff -> success -> continue +// TestDAFollower_CatchupThenReachHead verifies the catchup flow: +// sequential fetch from local → highest → mark head reached. 
+func TestDAFollower_CatchupThenReachHead(t *testing.T) { synctest.Test(t, func(t *testing.T) { ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) defer cancel() - syncer := setupTestSyncer(t, 500*time.Millisecond) - syncer.ctx = ctx - daRetriever := NewMockDARetriever(t) - p2pHandler := newMockp2pHandler(t) - p2pHandler.On("ProcessHeight", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() - syncer.daRetriever = daRetriever - syncer.p2pHandler = p2pHandler - - // Mock PopPriorityHeight to always return 0 (no priority heights) daRetriever.On("PopPriorityHeight").Return(uint64(0)).Maybe() - // Create mock stores for P2P - mockHeaderStore := extmocks.NewMockStore[*types.SignedHeader](t) - mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() + pipeEvent := func(_ context.Context, _ common.DAHeightEvent) error { return nil } - mockDataStore := extmocks.NewMockStore[*types.Data](t) - mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() + follower := NewDAFollower(DAFollowerConfig{ + Retriever: daRetriever, + Logger: zerolog.Nop(), + PipeEvent: pipeEvent, + Namespace: []byte("ns"), + StartDAHeight: 3, + DABlockTime: 500 * time.Millisecond, + }).(*daFollower) - var callTimes []time.Time - p2pHandler.On("SetProcessedHeight", mock.Anything).Return().Maybe() + follower.highestSeenDAHeight.Store(5) - // First call - error (triggers backoff) - daRetriever.On("RetrieveFromDA", mock.Anything, uint64(100)). - Run(func(args mock.Arguments) { - callTimes = append(callTimes, time.Now()) - }). - Return(nil, errors.New("network error")).Once() + var fetchedHeights []uint64 - // Second call - should be delayed due to backoff - daRetriever.On("RetrieveFromDA", mock.Anything, uint64(100)). - Run(func(args mock.Arguments) { - callTimes = append(callTimes, time.Now()) - }). - Return(nil, datypes.ErrBlobNotFound).Once() + for h := uint64(3); h <= 5; h++ { + daRetriever.On("RetrieveFromDA", mock.Anything, h). 
+ Run(func(args mock.Arguments) { + fetchedHeights = append(fetchedHeights, h) + }). + Return(nil, datypes.ErrBlobNotFound).Once() + } - // Third call - should continue without delay (DA height incremented) - daRetriever.On("RetrieveFromDA", mock.Anything, uint64(101)). - Run(func(args mock.Arguments) { - callTimes = append(callTimes, time.Now()) - cancel() - }). - Return(nil, datypes.ErrBlobNotFound).Once() + follower.runCatchup(ctx) - go syncer.processLoop(ctx) - syncer.startSyncWorkers(ctx) - <-ctx.Done() - syncer.wg.Wait() + assert.True(t, follower.HasReachedHead(), "should have reached DA head") + // Heights 3, 4, 5 processed; local now at 6 which > highest (5) → caught up + assert.Equal(t, []uint64{3, 4, 5}, fetchedHeights, "should have fetched heights sequentially") + }) +} - require.Len(t, callTimes, 3, "should make exactly 3 calls") +// TestDAFollower_InlineProcessing verifies the fast path: when the subscription +// delivers blobs at the current localDAHeight, handleSubscriptionEvent processes +// them inline via ProcessBlobs (not RetrieveFromDA). 
+func TestDAFollower_InlineProcessing(t *testing.T) { + t.Run("processes_blobs_inline_when_caught_up", func(t *testing.T) { + daRetriever := NewMockDARetriever(t) - // First to second call should be delayed (backoff) - delay1to2 := callTimes[1].Sub(callTimes[0]) - assert.GreaterOrEqual(t, delay1to2, 500*time.Millisecond, - "should have backoff delay between first and second call (got %v)", delay1to2) + var pipedEvents []common.DAHeightEvent + pipeEvent := func(_ context.Context, ev common.DAHeightEvent) error { + pipedEvents = append(pipedEvents, ev) + return nil + } - // Second to third call should be immediate (no backoff after ErrBlobNotFound) - delay2to3 := callTimes[2].Sub(callTimes[1]) - assert.Less(t, delay2to3, 100*time.Millisecond, - "should continue immediately after ErrBlobNotFound (got %v)", delay2to3) + follower := NewDAFollower(DAFollowerConfig{ + Retriever: daRetriever, + Logger: zerolog.Nop(), + PipeEvent: pipeEvent, + Namespace: []byte("ns"), + StartDAHeight: 10, + DABlockTime: 500 * time.Millisecond, + }).(*daFollower) + + blobs := [][]byte{[]byte("header-blob"), []byte("data-blob")} + expectedEvents := []common.DAHeightEvent{ + {DaHeight: 10, Source: common.SourceDA}, + } + + // ProcessBlobs should be called (not RetrieveFromDA) + daRetriever.On("ProcessBlobs", mock.Anything, blobs, uint64(10)). 
+ Return(expectedEvents).Once() + + // Simulate subscription event at the current localDAHeight + follower.handleSubscriptionEvent(t.Context(), datypes.SubscriptionEvent{ + Height: 10, + Blobs: blobs, + }) + + // Verify: ProcessBlobs was called, events were piped, height advanced + require.Len(t, pipedEvents, 1, "should pipe 1 event from inline processing") + assert.Equal(t, uint64(10), pipedEvents[0].DaHeight) + assert.Equal(t, uint64(11), follower.localDAHeight.Load(), "localDAHeight should advance past processed height") + assert.True(t, follower.HasReachedHead(), "should mark head as reached after inline processing") }) -} -func setupTestSyncer(t *testing.T, daBlockTime time.Duration) *Syncer { - t.Helper() + t.Run("falls_through_to_catchup_when_behind", func(t *testing.T) { + daRetriever := NewMockDARetriever(t) + + pipeEvent := func(_ context.Context, _ common.DAHeightEvent) error { return nil } + + follower := NewDAFollower(DAFollowerConfig{ + Retriever: daRetriever, + Logger: zerolog.Nop(), + PipeEvent: pipeEvent, + Namespace: []byte("ns"), + StartDAHeight: 10, + DABlockTime: 500 * time.Millisecond, + }).(*daFollower) + + ctx := t.Context() + ctx, follower.cancel = context.WithCancel(ctx) + defer follower.cancel() + + // Subscription reports height 15 but local is at 10 — should NOT process inline + follower.handleSubscriptionEvent(ctx, datypes.SubscriptionEvent{ + Height: 15, + Blobs: [][]byte{[]byte("blob")}, + }) - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - st := store.New(ds) + // ProcessBlobs should NOT have been called + daRetriever.AssertNotCalled(t, "ProcessBlobs", mock.Anything, mock.Anything, mock.Anything) + assert.Equal(t, uint64(10), follower.localDAHeight.Load(), "localDAHeight should not change") + assert.Equal(t, uint64(15), follower.highestSeenDAHeight.Load(), "highestSeen should be updated") + }) - cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) - require.NoError(t, err) + 
t.Run("falls_through_when_no_blobs", func(t *testing.T) { + daRetriever := NewMockDARetriever(t) - addr, _, _ := buildSyncTestSigner(t) + pipeEvent := func(_ context.Context, _ common.DAHeightEvent) error { return nil } + + follower := NewDAFollower(DAFollowerConfig{ + Retriever: daRetriever, + Logger: zerolog.Nop(), + PipeEvent: pipeEvent, + Namespace: []byte("ns"), + StartDAHeight: 10, + DABlockTime: 500 * time.Millisecond, + }).(*daFollower) + + // Subscription at current height but no blobs — should fall through + follower.handleSubscriptionEvent(t.Context(), datypes.SubscriptionEvent{ + Height: 10, + Blobs: nil, + }) - cfg := config.DefaultConfig() - cfg.DA.BlockTime.Duration = daBlockTime + // ProcessBlobs should NOT have been called + daRetriever.AssertNotCalled(t, "ProcessBlobs", mock.Anything, mock.Anything, mock.Anything) + assert.Equal(t, uint64(10), follower.localDAHeight.Load(), "localDAHeight should not change") + assert.Equal(t, uint64(10), follower.highestSeenDAHeight.Load(), "highestSeen should be updated") + }) +} - gen := genesis.Genesis{ +// backoffTestGenesis creates a test genesis for the backoff tests. 
+func backoffTestGenesis(addr []byte) genesis.Genesis { + return genesis.Genesis{ ChainID: "test-chain", InitialHeight: 1, - StartTime: time.Now().Add(-time.Hour), // Start in past + StartTime: time.Now().Add(-time.Hour), ProposerAddress: addr, DAStartHeight: 100, } - - syncer := NewSyncer( - st, - execution.NewDummyExecutor(), - nil, - cm, - common.NopMetrics(), - cfg, - gen, - extmocks.NewMockStore[*types.P2PSignedHeader](t), - extmocks.NewMockStore[*types.P2PData](t), - zerolog.Nop(), - common.DefaultBlockOptions(), - make(chan error, 1), - nil, - ) - - require.NoError(t, syncer.initializeState()) - return syncer } diff --git a/block/internal/syncing/syncer_benchmark_test.go b/block/internal/syncing/syncer_benchmark_test.go index 6ca482c05b..a69da71507 100644 --- a/block/internal/syncing/syncer_benchmark_test.go +++ b/block/internal/syncing/syncer_benchmark_test.go @@ -43,11 +43,25 @@ func BenchmarkSyncerIO(b *testing.B) { fixt := newBenchFixture(b, spec.heights, spec.shuffledTx, spec.daDelay, spec.execDelay, true) // run both loops - go fixt.s.processLoop(fixt.s.ctx) - fixt.s.startSyncWorkers(fixt.s.ctx) + ctx := b.Context() + go fixt.s.processLoop(ctx) + + // Create a DAFollower to drive DA retrieval. 
+ follower := NewDAFollower(DAFollowerConfig{ + Retriever: fixt.s.daRetriever, + Logger: zerolog.Nop(), + PipeEvent: fixt.s.pipeEvent, + Namespace: []byte("ns"), + StartDAHeight: fixt.s.daRetrieverHeight.Load(), + DABlockTime: 0, + }).(*daFollower) + follower.highestSeenDAHeight.Store(spec.heights + daHeightOffset) + go follower.runCatchup(ctx) + + fixt.s.startSyncWorkers(ctx) require.Eventually(b, func() bool { - processedHeight, _ := fixt.s.store.Height(b.Context()) + processedHeight, _ := fixt.s.store.Height(ctx) return processedHeight == spec.heights }, 5*time.Second, 50*time.Microsecond) fixt.s.cancel() @@ -61,7 +75,7 @@ func BenchmarkSyncerIO(b *testing.B) { require.Len(b, fixt.s.heightInCh, 0) assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight) - gotStoreHeight, err := fixt.s.store.Height(b.Context()) + gotStoreHeight, err := fixt.s.store.Height(ctx) require.NoError(b, err) assert.Equal(b, spec.heights, gotStoreHeight) } diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index cb15c108e6..64ccea439d 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -417,12 +417,26 @@ func TestSyncLoopPersistState(t *testing.T) { Return(nil, datypes.ErrHeightFromFuture) go syncerInst1.processLoop(ctx) + + // Create and start a DAFollower so DA retrieval actually happens. + follower1 := NewDAFollower(DAFollowerConfig{ + Retriever: daRtrMock, + Logger: zerolog.Nop(), + PipeEvent: syncerInst1.pipeEvent, + Namespace: []byte("ns"), + StartDAHeight: syncerInst1.daRetrieverHeight.Load(), + DABlockTime: cfg.DA.BlockTime.Duration, + }).(*daFollower) + ctx, follower1.cancel = context.WithCancel(ctx) + // Set highest so catchup runs through all mocked heights. 
+ follower1.highestSeenDAHeight.Store(myFutureDAHeight) + go follower1.runCatchup(ctx) syncerInst1.startSyncWorkers(ctx) syncerInst1.wg.Wait() requireEmptyChan(t, errorCh) t.Log("sync workers on instance1 completed") - require.Equal(t, myFutureDAHeight, syncerInst1.daRetrieverHeight.Load()) + require.Equal(t, myFutureDAHeight, follower1.localDAHeight.Load()) // wait for all events consumed require.NoError(t, cm.SaveToStore()) @@ -480,6 +494,19 @@ func TestSyncLoopPersistState(t *testing.T) { // when it starts, it should fetch from the last height it stopped at t.Log("sync workers on instance2 started") + + // Create a follower for instance 2. + follower2 := NewDAFollower(DAFollowerConfig{ + Retriever: daRtrMock, + Logger: zerolog.Nop(), + PipeEvent: syncerInst2.pipeEvent, + Namespace: []byte("ns"), + StartDAHeight: syncerInst2.daRetrieverHeight.Load(), + DABlockTime: cfg.DA.BlockTime.Duration, + }).(*daFollower) + ctx, follower2.cancel = context.WithCancel(ctx) + follower2.highestSeenDAHeight.Store(syncerInst2.daRetrieverHeight.Load() + 1) + go follower2.runCatchup(ctx) syncerInst2.startSyncWorkers(ctx) syncerInst2.wg.Wait() diff --git a/pkg/cmd/run_node.go b/pkg/cmd/run_node.go index 33b1eba006..4a5c7577c6 100644 --- a/pkg/cmd/run_node.go +++ b/pkg/cmd/run_node.go @@ -107,14 +107,8 @@ func StartNode( }() } - blobClient, err := blobrpc.NewClient(ctx, nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "") - if err != nil { - return fmt.Errorf("failed to create blob client: %w", err) - } - defer blobClient.Close() - daClient := block.NewDAClient(blobClient, nodeConfig, logger) - - // create a new remote signer + // Validate and load signer first (before attempting DA connection, which may fail + // eagerly over WebSocket if no DA server is running). 
var signer signer.Signer if nodeConfig.Signer.SignerType == "file" && (nodeConfig.Node.Aggregator && !nodeConfig.Node.BasedSequencer) { // Get passphrase file path @@ -152,6 +146,13 @@ func StartNode( return fmt.Errorf("unknown signer type: %s", nodeConfig.Signer.SignerType) } + blobClient, err := blobrpc.NewWSClient(ctx, nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "") + if err != nil { + return fmt.Errorf("failed to create blob client: %w", err) + } + defer blobClient.Close() + daClient := block.NewDAClient(blobClient, nodeConfig, logger) + // sanity check for based sequencer if nodeConfig.Node.BasedSequencer && genesis.DAStartHeight == 0 { return fmt.Errorf("based sequencing requires DAStartHeight to be set in genesis. This value should be identical for all nodes of the chain") diff --git a/pkg/da/jsonrpc/client.go b/pkg/da/jsonrpc/client.go index f1a31a9738..c856cdd7b9 100644 --- a/pkg/da/jsonrpc/client.go +++ b/pkg/da/jsonrpc/client.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "strings" libshare "github.com/celestiaorg/go-square/v3/share" "github.com/filecoin-project/go-jsonrpc" @@ -23,7 +24,17 @@ func (c *Client) Close() { } } -// NewClient connects to the celestia-node RPC endpoint +// httpToWS converts an HTTP(S) URL to a WebSocket URL. +func httpToWS(addr string) string { + addr = strings.Replace(addr, "https://", "wss://", 1) + addr = strings.Replace(addr, "http://", "ws://", 1) + return addr +} + +// NewClient connects to the DA RPC endpoint using the address as-is. +// Uses HTTP by default (lazy connection — only connects on first RPC call). +// Does NOT support channel-based subscriptions (e.g. Subscribe). +// For subscription support, use NewWSClient instead. 
func NewClient(ctx context.Context, addr, token string, authHeaderName string) (*Client, error) { var httpHeader http.Header if token != "" { @@ -57,6 +68,15 @@ func NewClient(ctx context.Context, addr, token string, authHeaderName string) ( return &cl, nil } +// NewWSClient connects to the DA RPC endpoint over WebSocket. +// Automatically converts http:// to ws:// (and https:// to wss://). +// Supports channel-based subscriptions (e.g. Subscribe). +// Note: WebSocket connections are eager — they connect at creation time +// and will fail immediately if the server is unavailable. +func NewWSClient(ctx context.Context, addr, token string, authHeaderName string) (*Client, error) { + return NewClient(ctx, httpToWS(addr), token, authHeaderName) +} + // BlobAPI mirrors celestia-node's blob module (nodebuilder/blob/blob.go). // jsonrpc.NewClient wires Internal.* to RPC stubs. type BlobAPI struct { diff --git a/pkg/da/types/types.go b/pkg/da/types/types.go index b2f2e7bc30..8c11ce5cb5 100644 --- a/pkg/da/types/types.go +++ b/pkg/da/types/types.go @@ -82,3 +82,13 @@ func SplitID(id []byte) (uint64, []byte, error) { commitment := id[8:] return binary.LittleEndian.Uint64(id[:8]), commitment, nil } + +// SubscriptionEvent is a namespace-agnostic signal that a blob was finalized at +// Height on the DA layer. Produced by Subscribe and consumed by DA followers. +type SubscriptionEvent struct { + // Height is the DA layer height at which the blob was finalized. + Height uint64 + // Blobs contains the raw blob data from the subscription response. + // When non-nil, followers can process blobs inline without re-fetching from DA. 
+ Blobs [][]byte +} diff --git a/test/e2e/evm_force_inclusion_e2e_test.go b/test/e2e/evm_force_inclusion_e2e_test.go index f201f7f363..c6c25270c5 100644 --- a/test/e2e/evm_force_inclusion_e2e_test.go +++ b/test/e2e/evm_force_inclusion_e2e_test.go @@ -238,8 +238,10 @@ func TestEvmFullNodeForceInclusionE2E(t *testing.T) { // --- End Sequencer Setup --- // --- Start Full Node Setup --- + // Get sequencer's full P2P address (including peer ID) for the full node to connect to + sequencerP2PAddress := getNodeP2PAddress(t, sut, sequencerHome, env.Endpoints.RollkitRPCPort) // Reuse setupFullNode helper which handles genesis copying and node startup - setupFullNode(t, sut, fullNodeHome, sequencerHome, env.FullNodeJWT, env.GenesisHash, env.Endpoints.GetRollkitP2PAddress(), env.Endpoints) + setupFullNode(t, sut, fullNodeHome, sequencerHome, env.FullNodeJWT, env.GenesisHash, sequencerP2PAddress, env.Endpoints) t.Log("Full node is up") // --- End Full Node Setup --- diff --git a/test/e2e/evm_test_common.go b/test/e2e/evm_test_common.go index 7c089e762b..c5bfb05c23 100644 --- a/test/e2e/evm_test_common.go +++ b/test/e2e/evm_test_common.go @@ -449,15 +449,15 @@ func setupFullNode(t testing.TB, sut *SystemUnderTest, fullNodeHome, sequencerHo "--home", fullNodeHome, "--evm.jwt-secret-file", fullNodeJwtSecretFile, "--evm.genesis-hash", genesisHash, - "--rollkit.p2p.peers", sequencerP2PAddress, + "--evnode.p2p.peers", sequencerP2PAddress, "--evm.engine-url", endpoints.GetFullNodeEngineURL(), "--evm.eth-url", endpoints.GetFullNodeEthURL(), - "--rollkit.da.block_time", DefaultDABlockTime, - "--rollkit.da.address", endpoints.GetDAAddress(), - "--rollkit.da.namespace", DefaultDANamespace, - "--rollkit.da.batching_strategy", "immediate", - "--rollkit.rpc.address", endpoints.GetFullNodeRPCListen(), - "--rollkit.p2p.listen_address", endpoints.GetFullNodeP2PAddress(), + "--evnode.da.block_time", DefaultDABlockTime, + "--evnode.da.address", endpoints.GetDAAddress(), + "--evnode.da.namespace", 
DefaultDANamespace, + "--evnode.da.batching_strategy", "immediate", + "--evnode.rpc.address", endpoints.GetFullNodeRPCListen(), + "--evnode.p2p.listen_address", endpoints.GetFullNodeP2PAddress(), } sut.ExecCmd(evmSingleBinaryPath, args...) // Use AwaitNodeLive instead of AwaitNodeUp because in lazy mode scenarios, @@ -932,4 +932,3 @@ func PrintTraceReport(t testing.TB, label string, spans []TraceSpan) { t.Logf("%-40s %5.1f%% %s", name, pct, bar) } } - diff --git a/test/mocks/da.go b/test/mocks/da.go index f5293d907a..27ce9badf3 100644 --- a/test/mocks/da.go +++ b/test/mocks/da.go @@ -492,6 +492,74 @@ func (_c *MockClient_Submit_Call) RunAndReturn(run func(ctx context.Context, dat return _c } +// Subscribe provides a mock function for the type MockClient +func (_mock *MockClient) Subscribe(ctx context.Context, namespace []byte) (<-chan da.SubscriptionEvent, error) { + ret := _mock.Called(ctx, namespace) + + if len(ret) == 0 { + panic("no return value specified for Subscribe") + } + + var r0 <-chan da.SubscriptionEvent + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context, []byte) (<-chan da.SubscriptionEvent, error)); ok { + return returnFunc(ctx, namespace) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, []byte) <-chan da.SubscriptionEvent); ok { + r0 = returnFunc(ctx, namespace) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan da.SubscriptionEvent) + } + } + if returnFunc, ok := ret.Get(1).(func(context.Context, []byte) error); ok { + r1 = returnFunc(ctx, namespace) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockClient_Subscribe_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Subscribe' +type MockClient_Subscribe_Call struct { + *mock.Call +} + +// Subscribe is a helper method to define mock.On call +// - ctx context.Context +// - namespace []byte +func (_e *MockClient_Expecter) Subscribe(ctx interface{}, namespace interface{}) *MockClient_Subscribe_Call { + 
return &MockClient_Subscribe_Call{Call: _e.mock.On("Subscribe", ctx, namespace)} +} + +func (_c *MockClient_Subscribe_Call) Run(run func(ctx context.Context, namespace []byte)) *MockClient_Subscribe_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 []byte + if args[1] != nil { + arg1 = args[1].([]byte) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockClient_Subscribe_Call) Return(subscriptionEventCh <-chan da.SubscriptionEvent, err error) *MockClient_Subscribe_Call { + _c.Call.Return(subscriptionEventCh, err) + return _c +} + +func (_c *MockClient_Subscribe_Call) RunAndReturn(run func(ctx context.Context, namespace []byte) (<-chan da.SubscriptionEvent, error)) *MockClient_Subscribe_Call { + _c.Call.Return(run) + return _c +} + // NewMockVerifier creates a new instance of MockVerifier. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockVerifier(t interface { diff --git a/test/testda/dummy.go b/test/testda/dummy.go index 648021b76a..7f29140b28 100644 --- a/test/testda/dummy.go +++ b/test/testda/dummy.go @@ -28,6 +28,12 @@ func (h *Header) Time() time.Time { return h.Timestamp } +// subscriber holds a channel and the context for a single Subscribe caller. +type subscriber struct { + ch chan datypes.SubscriptionEvent + ctx context.Context +} + // DummyDA is a test implementation of the DA client interface. // It supports blob storage, height simulation, failure injection, and header retrieval. type DummyDA struct { @@ -38,10 +44,52 @@ type DummyDA struct { headers map[uint64]*Header // height -> header (with timestamp) failSubmit atomic.Bool + // subscribers tracks active Subscribe callers. 
+ subscribers []*subscriber + tickerMu sync.Mutex tickerStop chan struct{} } +// Subscribe returns a channel that emits a SubscriptionEvent for every new DA +// height produced by Submit or StartHeightTicker. The channel is closed when +// ctx is cancelled or Reset is called. +func (d *DummyDA) Subscribe(ctx context.Context, _ []byte) (<-chan datypes.SubscriptionEvent, error) { + ch := make(chan datypes.SubscriptionEvent, 64) + sub := &subscriber{ch: ch, ctx: ctx} + + d.mu.Lock() + d.subscribers = append(d.subscribers, sub) + d.mu.Unlock() + + // Remove subscriber and close channel when ctx is cancelled. + go func() { + <-ctx.Done() + d.mu.Lock() + defer d.mu.Unlock() + for i, s := range d.subscribers { + if s == sub { + d.subscribers = append(d.subscribers[:i], d.subscribers[i+1:]...) + break + } + } + close(ch) + }() + + return ch, nil +} + +// notifySubscribers sends an event to all active subscribers. Must be called +// with d.mu held. +func (d *DummyDA) notifySubscribers(ev datypes.SubscriptionEvent) { + for _, sub := range d.subscribers { + select { + case sub.ch <- ev: + case <-sub.ctx.Done(): + } + } +} + // Option configures a DummyDA instance. type Option func(*DummyDA) @@ -111,6 +159,10 @@ func (d *DummyDA) Submit(_ context.Context, data [][]byte, _ float64, namespace Timestamp: now, } } + d.notifySubscribers(datypes.SubscriptionEvent{ + Height: height, + Blobs: data, + }) d.mu.Unlock() return datypes.ResultSubmit{ @@ -253,6 +305,9 @@ func (d *DummyDA) StartHeightTicker(interval time.Duration) func() { Timestamp: now, } } + d.notifySubscribers(datypes.SubscriptionEvent{ + Height: height, + }) d.mu.Unlock() case <-stopCh: return @@ -277,6 +332,11 @@ func (d *DummyDA) Reset() { d.blobs = make(map[uint64]map[string][][]byte) d.headers = make(map[uint64]*Header) d.failSubmit.Store(false) + // Close all subscriber channels. 
+ for _, sub := range d.subscribers { + close(sub.ch) + } + d.subscribers = nil d.mu.Unlock() d.tickerMu.Lock() diff --git a/tools/local-da/local.go b/tools/local-da/local.go index ef519f17ce..07d6876f9d 100644 --- a/tools/local-da/local.go +++ b/tools/local-da/local.go @@ -13,6 +13,7 @@ import ( "sync" "time" + libshare "github.com/celestiaorg/go-square/v3/share" "github.com/rs/zerolog" blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" @@ -27,6 +28,18 @@ const ( DefaultBlockTime = 1 * time.Second ) +// subscriber holds a registered subscription's channel and namespace filter. +type subscriber struct { + ch chan subscriptionEvent + ns libshare.Namespace +} + +// subscriptionEvent is sent to subscribers when a new DA block is produced. +type subscriptionEvent struct { + height uint64 + blobs []*blobrpc.Blob +} + // LocalDA is a simple implementation of in-memory DA. Not production ready! Intended only for testing! // // Data is stored in a map, where key is a serialized sequence number. This key is returned as ID. 
@@ -43,6 +56,10 @@ type LocalDA struct { blockTime time.Duration lastTime time.Time // tracks last timestamp to ensure monotonicity + // Subscriber registry (protected by mu) + subscribers map[int]*subscriber + nextSubID int + logger zerolog.Logger } @@ -57,6 +74,7 @@ func NewLocalDA(logger zerolog.Logger, opts ...func(*LocalDA) *LocalDA) *LocalDA data: make(map[uint64][]kvp), timestamps: make(map[uint64]time.Time), blobData: make(map[uint64][]*blobrpc.Blob), + subscribers: make(map[int]*subscriber), maxBlobSize: DefaultMaxBlobSize, blockTime: DefaultBlockTime, lastTime: time.Now(), @@ -209,6 +227,7 @@ func (d *LocalDA) SubmitWithOptions(ctx context.Context, blobs []datypes.Blob, g d.data[d.height] = append(d.data[d.height], kvp{ids[i], blob}) } + d.notifySubscribers(d.height) d.logger.Info().Uint64("newHeight", d.height).Int("count", len(ids)).Msg("SubmitWithOptions successful") return ids, nil } @@ -239,6 +258,7 @@ func (d *LocalDA) Submit(ctx context.Context, blobs []datypes.Blob, gasPrice flo d.data[d.height] = append(d.data[d.height], kvp{ids[i], blob}) } + d.notifySubscribers(d.height) d.logger.Info().Uint64("newHeight", d.height).Int("count", len(ids)).Msg("Submit successful") return ids, nil } @@ -335,5 +355,68 @@ func (d *LocalDA) produceEmptyBlock() { defer d.mu.Unlock() d.height++ d.timestamps[d.height] = d.monotonicTime() + d.notifySubscribers(d.height) d.logger.Debug().Uint64("height", d.height).Msg("produced empty block") } + +// subscribe registers a new subscriber for blobs matching the given namespace. +// Returns a read-only channel and a subscription ID for later unsubscription. +// Must NOT be called with d.mu held. 
+func (d *LocalDA) subscribe(ns libshare.Namespace) (<-chan subscriptionEvent, int) { + d.mu.Lock() + defer d.mu.Unlock() + + id := d.nextSubID + d.nextSubID++ + ch := make(chan subscriptionEvent, 64) + d.subscribers[id] = &subscriber{ch: ch, ns: ns} + d.logger.Info().Int("subID", id).Str("namespace", hex.EncodeToString(ns.Bytes())).Msg("subscriber registered") + return ch, id +} + +// unsubscribe removes a subscriber and closes its channel. +// Must NOT be called with d.mu held. +func (d *LocalDA) unsubscribe(id int) { + d.mu.Lock() + defer d.mu.Unlock() + + if sub, ok := d.subscribers[id]; ok { + close(sub.ch) + delete(d.subscribers, id) + d.logger.Info().Int("subID", id).Msg("subscriber unregistered") + } +} + +// notifySubscribers sends a subscriptionEvent to all registered subscribers. +// For each subscriber, only blobs matching the subscriber's namespace are included. +// Slow consumers (full channel) are dropped to avoid blocking block production. +// MUST be called with d.mu held. 
+func (d *LocalDA) notifySubscribers(height uint64) { + if len(d.subscribers) == 0 { + return + } + + allBlobs := d.blobData[height] // may be nil for empty blocks + + for id, sub := range d.subscribers { + // Filter blobs matching subscriber namespace + var matched []*blobrpc.Blob + for _, b := range allBlobs { + if b != nil && b.Namespace().Equals(sub.ns) { + matched = append(matched, b) + } + } + + evt := subscriptionEvent{ + height: height, + blobs: matched, + } + + select { + case sub.ch <- evt: + default: + // Slow consumer — drop to avoid blocking block production + d.logger.Warn().Int("subID", id).Uint64("height", height).Msg("dropping event for slow subscriber") + } + } +} diff --git a/tools/local-da/rpc.go b/tools/local-da/rpc.go index c8a3c97bee..024910c6bd 100644 --- a/tools/local-da/rpc.go +++ b/tools/local-da/rpc.go @@ -31,6 +31,7 @@ func (s *blobServer) Submit(_ context.Context, blobs []*jsonrpc.Blob, _ *jsonrpc if len(blobs) == 0 { s.da.timestamps[height] = time.Now() + s.da.notifySubscribers(height) return height, nil } @@ -48,6 +49,7 @@ func (s *blobServer) Submit(_ context.Context, blobs []*jsonrpc.Blob, _ *jsonrpc s.da.blobData[height] = append(s.da.blobData[height], b) } s.da.timestamps[height] = time.Now() + s.da.notifySubscribers(height) return height, nil } @@ -127,11 +129,39 @@ func (s *blobServer) GetCommitmentProof(_ context.Context, _ uint64, _ libshare. return &jsonrpc.CommitmentProof{}, nil } -// Subscribe returns a closed channel; LocalDA does not push live updates. -func (s *blobServer) Subscribe(_ context.Context, _ libshare.Namespace) (<-chan *jsonrpc.SubscriptionResponse, error) { - ch := make(chan *jsonrpc.SubscriptionResponse) - close(ch) - return ch, nil +// Subscribe streams blobs as they are included for the given namespace. +// The returned channel emits a SubscriptionResponse for every new DA block. +// The channel is closed when ctx is cancelled. 
+func (s *blobServer) Subscribe(ctx context.Context, namespace libshare.Namespace) (<-chan *jsonrpc.SubscriptionResponse, error) { + eventCh, subID := s.da.subscribe(namespace) + + out := make(chan *jsonrpc.SubscriptionResponse, 64) + go func() { + defer close(out) + defer s.da.unsubscribe(subID) + + for { + select { + case <-ctx.Done(): + return + case evt, ok := <-eventCh: + if !ok { + return + } + resp := &jsonrpc.SubscriptionResponse{ + Height: evt.height, + Blobs: evt.blobs, + } + select { + case out <- resp: + case <-ctx.Done(): + return + } + } + } + }() + + return out, nil } // startBlobServer starts an HTTP JSON-RPC server on addr serving the blob namespace.