From 24139bc2f627114df5df44037566e17a98960bf4 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Tue, 3 Mar 2026 10:15:31 +0100 Subject: [PATCH 1/6] feat: Replace the Syncer's polling DA worker with an event-driven DAFollower and introduce DA client subscription. --- apps/evm/server/force_inclusion_test.go | 7 + block/internal/da/client.go | 42 +++ block/internal/da/interface.go | 5 + block/internal/da/tracing.go | 3 + block/internal/da/tracing_test.go | 8 + block/internal/syncing/da_follower.go | 317 ++++++++++++++++++ block/internal/syncing/syncer.go | 134 ++------ block/internal/syncing/syncer_backoff_test.go | 233 ++++--------- .../internal/syncing/syncer_benchmark_test.go | 15 +- block/internal/syncing/syncer_test.go | 29 +- pkg/da/types/types.go | 7 + test/mocks/da.go | 68 ++++ test/testda/dummy.go | 5 + 13 files changed, 608 insertions(+), 265 deletions(-) create mode 100644 block/internal/syncing/da_follower.go diff --git a/apps/evm/server/force_inclusion_test.go b/apps/evm/server/force_inclusion_test.go index d04a356622..6be6e6ca76 100644 --- a/apps/evm/server/force_inclusion_test.go +++ b/apps/evm/server/force_inclusion_test.go @@ -50,6 +50,13 @@ func (m *mockDA) Get(ctx context.Context, ids []da.ID, namespace []byte) ([]da.B return nil, nil } +func (m *mockDA) Subscribe(_ context.Context, _ []byte) (<-chan da.SubscriptionEvent, error) { + // Not needed in these tests; return a closed channel. + ch := make(chan da.SubscriptionEvent) + close(ch) + return ch, nil +} + func (m *mockDA) Validate(ctx context.Context, ids []da.ID, proofs []da.Proof, namespace []byte) ([]bool, error) { return nil, nil } diff --git a/block/internal/da/client.go b/block/internal/da/client.go index a92a9eef28..35f604acab 100644 --- a/block/internal/da/client.go +++ b/block/internal/da/client.go @@ -350,6 +350,48 @@ func (c *client) HasForcedInclusionNamespace() bool { return c.hasForcedNamespace } +// Subscribe subscribes to blobs in the given namespace via the celestia-node +// Subscribe API. It returns a channel that emits a SubscriptionEvent for every +// DA block containing a matching blob. The channel is closed when ctx is +// cancelled. The caller must drain the channel after cancellation to avoid +// goroutine leaks. +func (c *client) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) { + ns, err := share.NewNamespaceFromBytes(namespace) + if err != nil { + return nil, fmt.Errorf("invalid namespace: %w", err) + } + + rawCh, err := c.blobAPI.Subscribe(ctx, ns) + if err != nil { + return nil, fmt.Errorf("blob subscribe: %w", err) + } + + out := make(chan datypes.SubscriptionEvent, 16) + go func() { + defer close(out) + for { + select { + case <-ctx.Done(): + return + case resp, ok := <-rawCh: + if !ok { + return + } + if resp == nil { + continue + } + select { + case out <- datypes.SubscriptionEvent{Height: resp.Height}: + case <-ctx.Done(): + return + } + } + } + }() + + return out, nil +} + // Get fetches blobs by their IDs. Used for visualization and fetching specific blobs. func (c *client) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) { if len(ids) == 0 { diff --git a/block/internal/da/interface.go b/block/internal/da/interface.go index dd7a15d8f9..3cc0677580 100644 --- a/block/internal/da/interface.go +++ b/block/internal/da/interface.go @@ -17,6 +17,11 @@ type Client interface { // Get retrieves blobs by their IDs. Used for visualization and fetching specific blobs. Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) + // Subscribe returns a channel that emits one SubscriptionEvent per DA block + // that contains a blob in the given namespace. The channel is closed when ctx + // is cancelled. Callers MUST drain the channel after cancellation. + Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) + // GetLatestDAHeight returns the latest height available on the DA layer. GetLatestDAHeight(ctx context.Context) (uint64, error) diff --git a/block/internal/da/tracing.go b/block/internal/da/tracing.go index 4d946a8b74..161293c2c3 100644 --- a/block/internal/da/tracing.go +++ b/block/internal/da/tracing.go @@ -145,6 +145,9 @@ func (t *tracedClient) GetForcedInclusionNamespace() []byte { func (t *tracedClient) HasForcedInclusionNamespace() bool { return t.inner.HasForcedInclusionNamespace() } +func (t *tracedClient) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) { + return t.inner.Subscribe(ctx, namespace) +} type submitError struct{ msg string } diff --git a/block/internal/da/tracing_test.go b/block/internal/da/tracing_test.go index de32532a31..e20bf3b84b 100644 --- a/block/internal/da/tracing_test.go +++ b/block/internal/da/tracing_test.go @@ -22,6 +22,14 @@ type mockFullClient struct { getFn func(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) getProofsFn func(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Proof, error) validateFn func(ctx context.Context, ids []datypes.ID, proofs []datypes.Proof, namespace []byte) ([]bool, error) + subscribeFn func(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) +} + +func (m *mockFullClient) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) { + if m.subscribeFn == nil { + panic("not expected to be called") + } + return m.subscribeFn(ctx, namespace) } func (m *mockFullClient) Submit(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) datypes.ResultSubmit { diff --git a/block/internal/syncing/da_follower.go b/block/internal/syncing/da_follower.go new file mode 100644 index 0000000000..27ec33b6fc --- /dev/null +++ b/block/internal/syncing/da_follower.go @@ -0,0 +1,317 @@ +package syncing + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "time" + + "github.com/rs/zerolog" + + "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/block/internal/da" + datypes "github.com/evstack/ev-node/pkg/da/types" +) + +// DAFollower subscribes to DA blob events and drives sequential catchup. +// +// Architecture: +// - followLoop listens on the subscription channel and atomically updates +// highestSeenDAHeight. +// - catchupLoop sequentially retrieves from localDAHeight → highestSeenDAHeight, +// piping events to the Syncer's heightInCh. +// +// The two goroutines share only atomic state; no mutexes needed. +type DAFollower interface { + // Start begins the follow and catchup loops. + Start(ctx context.Context) error + // Stop cancels the context and waits for goroutines. + Stop() + // HasReachedHead returns true once the catchup loop has processed the + // DA head at least once. Once true, it stays true. + HasReachedHead() bool +} + +// daFollower is the concrete implementation of DAFollower. +type daFollower struct { + client da.Client + retriever DARetriever + logger zerolog.Logger + + // pipeEvent sends a DA height event to the syncer's processLoop. + pipeEvent func(ctx context.Context, event common.DAHeightEvent) error + + // Namespace to subscribe on (header namespace). + namespace []byte + + // localDAHeight is only written by catchupLoop and read by followLoop + // to determine whether a catchup is needed. + localDAHeight atomic.Uint64 + + // highestSeenDAHeight is written by followLoop and read by catchupLoop. + highestSeenDAHeight atomic.Uint64 + + // headReached tracks whether the follower has caught up to DA head. + headReached atomic.Bool + + // catchupSignal is sent by followLoop to wake catchupLoop when a new + // height is seen that is above localDAHeight. + catchupSignal chan struct{} + + // daBlockTime is used as a backoff before retrying after errors. + daBlockTime time.Duration + + // lifecycle + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup +} + +// DAFollowerConfig holds configuration for creating a DAFollower. +type DAFollowerConfig struct { + Client da.Client + Retriever DARetriever + Logger zerolog.Logger + PipeEvent func(ctx context.Context, event common.DAHeightEvent) error + Namespace []byte + StartDAHeight uint64 + DABlockTime time.Duration +} + +// NewDAFollower creates a new daFollower. +func NewDAFollower(cfg DAFollowerConfig) DAFollower { + f := &daFollower{ + client: cfg.Client, + retriever: cfg.Retriever, + logger: cfg.Logger.With().Str("component", "da_follower").Logger(), + pipeEvent: cfg.PipeEvent, + namespace: cfg.Namespace, + catchupSignal: make(chan struct{}, 1), + daBlockTime: cfg.DABlockTime, + } + f.localDAHeight.Store(cfg.StartDAHeight) + return f +} + +// Start begins the follow and catchup goroutines. +func (f *daFollower) Start(ctx context.Context) error { + f.ctx, f.cancel = context.WithCancel(ctx) + + f.wg.Add(2) + go f.followLoop() + go f.catchupLoop() + + f.logger.Info(). + Uint64("start_da_height", f.localDAHeight.Load()). + Msg("DA follower started") + return nil +} + +// Stop cancels and waits. +func (f *daFollower) Stop() { + if f.cancel != nil { + f.cancel() + } + f.wg.Wait() +} + +// HasReachedHead returns whether the DA head has been reached at least once. +func (f *daFollower) HasReachedHead() bool { + return f.headReached.Load() +} + +// signalCatchup sends a non-blocking signal to wake catchupLoop. +func (f *daFollower) signalCatchup() { + select { + case f.catchupSignal <- struct{}{}: + default: + // Already signaled, catchupLoop will pick up the new highestSeen. + } +} + +// followLoop subscribes to DA blob events and keeps highestSeenDAHeight up to date. +// When a new height appears above localDAHeight, it wakes the catchup loop. +func (f *daFollower) followLoop() { + defer f.wg.Done() + + f.logger.Info().Msg("starting follow loop") + defer f.logger.Info().Msg("follow loop stopped") + + for { + if err := f.runSubscription(); err != nil { + if f.ctx.Err() != nil { + return + } + f.logger.Warn().Err(err).Msg("DA subscription failed, reconnecting") + select { + case <-f.ctx.Done(): + return + case <-time.After(f.backoff()): + } + } + } +} + +// runSubscription opens a single subscription and processes events until the +// channel is closed or an error occurs. +func (f *daFollower) runSubscription() error { + ch, err := f.client.Subscribe(f.ctx, f.namespace) + if err != nil { + return err + } + + for { + select { + case <-f.ctx.Done(): + return f.ctx.Err() + case ev, ok := <-ch: + if !ok { + return errors.New("subscription channel closed") + } + f.updateHighest(ev.Height) + } + } +} + +// updateHighest atomically bumps highestSeenDAHeight and signals catchup if needed. +func (f *daFollower) updateHighest(height uint64) { + for { + cur := f.highestSeenDAHeight.Load() + if height <= cur { + return + } + if f.highestSeenDAHeight.CompareAndSwap(cur, height) { + f.logger.Debug(). + Uint64("da_height", height). + Msg("new highest DA height seen from subscription") + f.signalCatchup() + return + } + } +} + +// catchupLoop waits for signals and sequentially retrieves DA heights +// from localDAHeight up to highestSeenDAHeight. +func (f *daFollower) catchupLoop() { + defer f.wg.Done() + + f.logger.Info().Msg("starting catchup loop") + defer f.logger.Info().Msg("catchup loop stopped") + + for { + select { + case <-f.ctx.Done(): + return + case <-f.catchupSignal: + f.runCatchup() + } + } +} + +// runCatchup sequentially retrieves from localDAHeight to highestSeenDAHeight. +// It handles priority heights first, then sequential heights. +func (f *daFollower) runCatchup() { + for { + if f.ctx.Err() != nil { + return + } + + // Check for priority heights from P2P hints first. + if priorityHeight := f.retriever.PopPriorityHeight(); priorityHeight > 0 { + currentHeight := f.localDAHeight.Load() + if priorityHeight < currentHeight { + continue + } + f.logger.Debug(). + Uint64("da_height", priorityHeight). + Msg("fetching priority DA height from P2P hint") + if err := f.fetchAndPipeHeight(priorityHeight); err != nil { + if !f.waitOnCatchupError(err, priorityHeight) { + return + } + } + continue + } + + // Sequential catchup. + local := f.localDAHeight.Load() + highest := f.highestSeenDAHeight.Load() + + if local > highest { + // Caught up. + f.headReached.Store(true) + return + } + + if err := f.fetchAndPipeHeight(local); err != nil { + if !f.waitOnCatchupError(err, local) { + return + } + // Retry the same height after backoff. + continue + } + + // Advance local height. + f.localDAHeight.Store(local + 1) + } +} + +// fetchAndPipeHeight retrieves events at a single DA height and pipes them +// to the syncer. +func (f *daFollower) fetchAndPipeHeight(daHeight uint64) error { + events, err := f.retriever.RetrieveFromDA(f.ctx, daHeight) + if err != nil { + switch { + case errors.Is(err, datypes.ErrBlobNotFound): + // No blobs at this height — not an error, just skip. + return nil + case errors.Is(err, datypes.ErrHeightFromFuture): + // DA hasn't produced this height yet — mark head reached + // but return the error to trigger a backoff retry. + f.headReached.Store(true) + return err + default: + return err + } + } + + for _, event := range events { + if err := f.pipeEvent(f.ctx, event); err != nil { + return err + } + } + + return nil +} + +// errCaughtUp is a sentinel used to signal that the catchup loop has reached DA head. +var errCaughtUp = errors.New("caught up with DA head") + +// waitOnCatchupError logs the error and backs off before retrying. +// It returns true if the caller should continue (retry), or false if the +// catchup loop should exit (context cancelled or caught-up sentinel). +func (f *daFollower) waitOnCatchupError(err error, daHeight uint64) bool { + if errors.Is(err, errCaughtUp) { + f.logger.Debug().Uint64("da_height", daHeight).Msg("DA catchup reached head, waiting for subscription signal") + return false + } + if f.ctx.Err() != nil { + return false + } + f.logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("catchup error, backing off") + select { + case <-f.ctx.Done(): + return false + case <-time.After(f.backoff()): + return true + } +} + +// backoff returns the configured DA block time or a sane default. +func (f *daFollower) backoff() time.Duration { + if f.daBlockTime > 0 { + return f.daBlockTime + } + return 2 * time.Second +} diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index ce96c23ef2..154cd927ec 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -21,7 +21,6 @@ import ( "github.com/evstack/ev-node/block/internal/da" coreexecutor "github.com/evstack/ev-node/core/execution" "github.com/evstack/ev-node/pkg/config" - datypes "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/pkg/raft" "github.com/evstack/ev-node/pkg/store" @@ -80,6 +79,9 @@ type Syncer struct { p2pHandler p2pHandler raftRetriever *raftRetriever + // DA follower (replaces the old polling daWorkerLoop) + daFollower DAFollower + // Forced inclusion tracking forcedInclusionMu sync.RWMutex seenBlockTxs map[string]struct{} // SHA-256 hex of every tx seen in a DA-sourced block @@ -96,9 +98,6 @@ type Syncer struct { // P2P wait coordination p2pWaitState atomic.Value // stores p2pWaitState - // DA head-reached signal for recovery mode (stays true once DA head is seen) - daHeadReached atomic.Bool - // blockSyncer is the interface used for block sync operations. // defaults to self, but can be wrapped with tracing. blockSyncer BlockSyncer @@ -197,7 +196,20 @@ func (s *Syncer) Start(ctx context.Context) error { // Start main processing loop s.wg.Go(s.processLoop) - // Start dedicated workers for DA, and pending processing + // Start the DA follower (subscribe + catchup) and other workers + s.daFollower = NewDAFollower(DAFollowerConfig{ + Client: s.daClient, + Retriever: s.daRetriever, + Logger: s.logger, + PipeEvent: s.pipeEvent, + Namespace: s.daClient.GetHeaderNamespace(), + StartDAHeight: s.daRetrieverHeight.Load(), + DABlockTime: s.config.DA.BlockTime.Duration, + }) + if err := s.daFollower.Start(s.ctx); err != nil { + return fmt.Errorf("failed to start DA follower: %w", err) + } + s.startSyncWorkers() s.logger.Info().Msg("syncer started") @@ -212,6 +224,12 @@ func (s *Syncer) Stop() error { s.cancel() s.cancelP2PWait(0) + + // Stop the DA follower first (it owns its own goroutines). + if s.daFollower != nil { + s.daFollower.Stop() + } + s.wg.Wait() // Skip draining if we're shutting down due to a critical error (e.g. execution @@ -359,113 +377,23 @@ func (s *Syncer) processLoop() { } func (s *Syncer) startSyncWorkers() { - s.wg.Add(3) - go s.daWorkerLoop() + // DA follower is already started in Start(). + s.wg.Add(2) go s.pendingWorkerLoop() go s.p2pWorkerLoop() } -func (s *Syncer) daWorkerLoop() { - defer s.wg.Done() - - s.logger.Info().Msg("starting DA worker") - defer s.logger.Info().Msg("DA worker stopped") - - for { - err := s.fetchDAUntilCaughtUp() - - var backoff time.Duration - if err == nil { - // No error, means we are caught up. - s.daHeadReached.Store(true) - backoff = s.config.DA.BlockTime.Duration - } else { - // Error, back off for a shorter duration. - backoff = s.config.DA.BlockTime.Duration - if backoff <= 0 { - backoff = 2 * time.Second - } - } - - select { - case <-s.ctx.Done(): - return - case <-time.After(backoff): - } - } -} - -// HasReachedDAHead returns true once the DA worker has reached the DA head. +// HasReachedDAHead returns true once the DA follower has caught up to the DA head. // Once set, it stays true. func (s *Syncer) HasReachedDAHead() bool { - return s.daHeadReached.Load() -} - -func (s *Syncer) fetchDAUntilCaughtUp() error { - for { - select { - case <-s.ctx.Done(): - return s.ctx.Err() - default: - } - - // Check for priority heights from P2P hints first - var daHeight uint64 - if priorityHeight := s.daRetriever.PopPriorityHeight(); priorityHeight > 0 { - // Skip if we've already fetched past this height - currentHeight := s.daRetrieverHeight.Load() - if priorityHeight < currentHeight { - continue - } - daHeight = priorityHeight - s.logger.Debug().Uint64("da_height", daHeight).Msg("fetching priority DA height from P2P hint") - } else { - daHeight = max(s.daRetrieverHeight.Load(), s.cache.DaHeight()) - } - - events, err := s.daRetriever.RetrieveFromDA(s.ctx, daHeight) - if err != nil { - switch { - case errors.Is(err, datypes.ErrBlobNotFound): - s.daRetrieverHeight.Store(daHeight + 1) - continue // Fetch next height immediately - case errors.Is(err, datypes.ErrHeightFromFuture): - s.logger.Debug().Err(err).Uint64("da_height", daHeight).Msg("DA is ahead of local target; backing off future height requests") - return nil // Caught up - default: - s.logger.Error().Err(err).Uint64("da_height", daHeight).Msg("failed to retrieve from DA; backing off DA requests") - return err // Other errors - } - } - - if len(events) == 0 { - // This can happen if RetrieveFromDA returns no events and no error. - s.logger.Debug().Uint64("da_height", daHeight).Msg("no events returned from DA, but no error either.") - } - - // Process DA events - for _, event := range events { - if err := s.pipeEvent(s.ctx, event); err != nil { - return err - } - } - - // Update DA retrieval height on successful retrieval - // For priority fetches, only update if the priority height is ahead of current - // For sequential fetches, always increment - newHeight := daHeight + 1 - for { - current := s.daRetrieverHeight.Load() - if newHeight <= current { - break // Already at or past this height - } - if s.daRetrieverHeight.CompareAndSwap(current, newHeight) { - break - } - } + if s.daFollower != nil { + return s.daFollower.HasReachedHead() } + return false } +// fetchDAUntilCaughtUp was removed — the DAFollower handles this concern. + func (s *Syncer) pendingWorkerLoop() { defer s.wg.Done() diff --git a/block/internal/syncing/syncer_backoff_test.go b/block/internal/syncing/syncer_backoff_test.go index d6c6689f29..10564b4b28 100644 --- a/block/internal/syncing/syncer_backoff_test.go +++ b/block/internal/syncing/syncer_backoff_test.go @@ -7,27 +7,20 @@ import ( "testing/synctest" "time" - "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" - "github.com/evstack/ev-node/core/execution" - "github.com/evstack/ev-node/pkg/config" datypes "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/pkg/genesis" - "github.com/evstack/ev-node/pkg/store" - extmocks "github.com/evstack/ev-node/test/mocks/external" "github.com/evstack/ev-node/types" ) -// TestSyncer_BackoffOnDAError verifies that the syncer implements proper backoff -// behavior when encountering different types of DA layer errors. -func TestSyncer_BackoffOnDAError(t *testing.T) { +// TestDAFollower_BackoffOnCatchupError verifies that the DAFollower implements +// proper backoff behavior when encountering different types of DA layer errors. +func TestDAFollower_BackoffOnCatchupError(t *testing.T) { tests := map[string]struct { daBlockTime time.Duration error error @@ -63,30 +56,25 @@ func TestSyncer_BackoffOnDAError(t *testing.T) { for name, tc := range tests { t.Run(name, func(t *testing.T) { synctest.Test(t, func(t *testing.T) { - ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + ctx, cancel := context.WithTimeout(t.Context(), 15*time.Second) defer cancel() - // Setup syncer - syncer := setupTestSyncer(t, tc.daBlockTime) - syncer.ctx = ctx - - // Setup mocks daRetriever := NewMockDARetriever(t) - p2pHandler := newMockp2pHandler(t) - p2pHandler.On("ProcessHeight", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() - syncer.daRetriever = daRetriever - syncer.p2pHandler = p2pHandler - p2pHandler.On("SetProcessedHeight", mock.Anything).Return().Maybe() - - // Mock PopPriorityHeight to always return 0 (no priority heights) daRetriever.On("PopPriorityHeight").Return(uint64(0)).Maybe() - // Create mock stores for P2P - mockHeaderStore := extmocks.NewMockStore[*types.SignedHeader](t) - mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() + pipeEvent := func(_ context.Context, _ common.DAHeightEvent) error { return nil } + + follower := NewDAFollower(DAFollowerConfig{ + Retriever: daRetriever, + Logger: zerolog.Nop(), + PipeEvent: pipeEvent, + Namespace: []byte("ns"), + StartDAHeight: 100, + DABlockTime: tc.daBlockTime, + }).(*daFollower) - mockDataStore := extmocks.NewMockStore[*types.Data](t) - mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() + follower.ctx, follower.cancel = context.WithCancel(ctx) + follower.highestSeenDAHeight.Store(102) var callTimes []time.Time callCount := 0 @@ -100,17 +88,14 @@ func TestSyncer_BackoffOnDAError(t *testing.T) { Return(nil, tc.error).Once() if tc.expectsBackoff { - // Second call should be delayed due to backoff daRetriever.On("RetrieveFromDA", mock.Anything, uint64(100)). Run(func(args mock.Arguments) { callTimes = append(callTimes, time.Now()) callCount++ - // Cancel to end test cancel() }). Return(nil, datypes.ErrBlobNotFound).Once() } else { - // For ErrBlobNotFound, DA height should increment daRetriever.On("RetrieveFromDA", mock.Anything, uint64(101)). Run(func(args mock.Arguments) { callTimes = append(callTimes, time.Now()) @@ -120,12 +105,9 @@ func TestSyncer_BackoffOnDAError(t *testing.T) { Return(nil, datypes.ErrBlobNotFound).Once() } - // Run sync loop - syncer.startSyncWorkers() + go follower.runCatchup() <-ctx.Done() - syncer.wg.Wait() - // Verify behavior if tc.expectsBackoff { require.Len(t, callTimes, 2, "should make exactly 2 calls with backoff") @@ -151,35 +133,32 @@ func TestSyncer_BackoffOnDAError(t *testing.T) { } } -// TestSyncer_BackoffResetOnSuccess verifies that backoff is properly reset -// after a successful DA retrieval, allowing the syncer to continue at normal speed. -func TestSyncer_BackoffResetOnSuccess(t *testing.T) { +// TestDAFollower_BackoffResetOnSuccess verifies that backoff is properly reset +// after a successful DA retrieval. +func TestDAFollower_BackoffResetOnSuccess(t *testing.T) { synctest.Test(t, func(t *testing.T) { - ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + ctx, cancel := context.WithTimeout(t.Context(), 15*time.Second) defer cancel() - syncer := setupTestSyncer(t, 1*time.Second) - syncer.ctx = ctx - addr, pub, signer := buildSyncTestSigner(t) - gen := syncer.genesis + gen := backoffTestGenesis(addr) daRetriever := NewMockDARetriever(t) - p2pHandler := newMockp2pHandler(t) - p2pHandler.On("ProcessHeight", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() - syncer.daRetriever = daRetriever - syncer.p2pHandler = p2pHandler - p2pHandler.On("SetProcessedHeight", mock.Anything).Return().Maybe() - - // Mock PopPriorityHeight to always return 0 (no priority heights) daRetriever.On("PopPriorityHeight").Return(uint64(0)).Maybe() - // Create mock stores for P2P - mockHeaderStore := extmocks.NewMockStore[*types.SignedHeader](t) - mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() + pipeEvent := func(_ context.Context, _ common.DAHeightEvent) error { return nil } + + follower := NewDAFollower(DAFollowerConfig{ + Retriever: daRetriever, + Logger: zerolog.Nop(), + PipeEvent: pipeEvent, + Namespace: []byte("ns"), + StartDAHeight: 100, + DABlockTime: 1 * time.Second, + }).(*daFollower) - mockDataStore := extmocks.NewMockStore[*types.Data](t) - mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() + follower.ctx, follower.cancel = context.WithCancel(ctx) + follower.highestSeenDAHeight.Store(105) var callTimes []time.Time @@ -190,7 +169,7 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) { }). Return(nil, errors.New("temporary failure")).Once() - // Second call - success (should reset backoff and increment DA height) + // Second call - success _, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil, nil) data := &types.Data{ Metadata: &types.Metadata{ @@ -211,7 +190,7 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) { }). Return([]common.DAHeightEvent{event}, nil).Once() - // Third call - should happen immediately after success (DA height incremented to 101) + // Third call - should happen immediately after success (DA height incremented) daRetriever.On("RetrieveFromDA", mock.Anything, uint64(101)). Run(func(args mock.Arguments) { callTimes = append(callTimes, time.Now()) @@ -219,137 +198,71 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) { }). Return(nil, datypes.ErrBlobNotFound).Once() - // Start process loop to handle events - go syncer.processLoop() - - // Run workers - syncer.startSyncWorkers() + go follower.runCatchup() <-ctx.Done() - syncer.wg.Wait() require.Len(t, callTimes, 3, "should make exactly 3 calls") - // Verify backoff between first and second call delay1to2 := callTimes[1].Sub(callTimes[0]) assert.GreaterOrEqual(t, delay1to2, 1*time.Second, "should have backed off between error and success (got %v)", delay1to2) - // Verify no backoff between second and third call (backoff reset) delay2to3 := callTimes[2].Sub(callTimes[1]) assert.Less(t, delay2to3, 100*time.Millisecond, "should continue immediately after success (got %v)", delay2to3) }) } -// TestSyncer_BackoffBehaviorIntegration tests the complete backoff flow: -// error -> backoff delay -> recovery -> normal operation. -func TestSyncer_BackoffBehaviorIntegration(t *testing.T) { - // Test simpler backoff behavior: error -> backoff -> success -> continue +// TestDAFollower_CatchupThenReachHead verifies the catchup flow: +// sequential fetch from local → highest → mark head reached. +func TestDAFollower_CatchupThenReachHead(t *testing.T) { synctest.Test(t, func(t *testing.T) { - ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + ctx, cancel := context.WithTimeout(t.Context(), 15*time.Second) defer cancel() - syncer := setupTestSyncer(t, 500*time.Millisecond) - syncer.ctx = ctx - daRetriever := NewMockDARetriever(t) - p2pHandler := newMockp2pHandler(t) - p2pHandler.On("ProcessHeight", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() - syncer.daRetriever = daRetriever - syncer.p2pHandler = p2pHandler - - // Mock PopPriorityHeight to always return 0 (no priority heights) daRetriever.On("PopPriorityHeight").Return(uint64(0)).Maybe() - // Create mock stores for P2P - mockHeaderStore := extmocks.NewMockStore[*types.SignedHeader](t) - mockHeaderStore.EXPECT().Height().Return(uint64(0)).Maybe() - - mockDataStore := extmocks.NewMockStore[*types.Data](t) - mockDataStore.EXPECT().Height().Return(uint64(0)).Maybe() - - var callTimes []time.Time - p2pHandler.On("SetProcessedHeight", mock.Anything).Return().Maybe() - - // First call - error (triggers backoff) - daRetriever.On("RetrieveFromDA", mock.Anything, uint64(100)). - Run(func(args mock.Arguments) { - callTimes = append(callTimes, time.Now()) - }). - Return(nil, errors.New("network error")).Once() - - // Second call - should be delayed due to backoff - daRetriever.On("RetrieveFromDA", mock.Anything, uint64(100)). - Run(func(args mock.Arguments) { - callTimes = append(callTimes, time.Now()) - }). - Return(nil, datypes.ErrBlobNotFound).Once() - - // Third call - should continue without delay (DA height incremented) - daRetriever.On("RetrieveFromDA", mock.Anything, uint64(101)). - Run(func(args mock.Arguments) { - callTimes = append(callTimes, time.Now()) - cancel() - }). - Return(nil, datypes.ErrBlobNotFound).Once() - - go syncer.processLoop() - syncer.startSyncWorkers() - <-ctx.Done() - syncer.wg.Wait() - - require.Len(t, callTimes, 3, "should make exactly 3 calls") + pipeEvent := func(_ context.Context, _ common.DAHeightEvent) error { return nil } + + follower := NewDAFollower(DAFollowerConfig{ + Retriever: daRetriever, + Logger: zerolog.Nop(), + PipeEvent: pipeEvent, + Namespace: []byte("ns"), + StartDAHeight: 3, + DABlockTime: 500 * time.Millisecond, + }).(*daFollower) + + follower.ctx, follower.cancel = context.WithCancel(ctx) + follower.highestSeenDAHeight.Store(5) + + var fetchedHeights []uint64 + + for h := uint64(3); h <= 5; h++ { + h := h + daRetriever.On("RetrieveFromDA", mock.Anything, h). + Run(func(args mock.Arguments) { + fetchedHeights = append(fetchedHeights, h) + }). + Return(nil, datypes.ErrBlobNotFound).Once() + } - // First to second call should be delayed (backoff) - delay1to2 := callTimes[1].Sub(callTimes[0]) - assert.GreaterOrEqual(t, delay1to2, 500*time.Millisecond, - "should have backoff delay between first and second call (got %v)", delay1to2) + follower.runCatchup() - // Second to third call should be immediate (no backoff after ErrBlobNotFound) - delay2to3 := callTimes[2].Sub(callTimes[1]) - assert.Less(t, delay2to3, 100*time.Millisecond, - "should continue immediately after ErrBlobNotFound (got %v)", delay2to3) + assert.True(t, follower.HasReachedHead(), "should have reached DA head") + // Heights 3, 4, 5 processed; local now at 6 which > highest (5) → caught up + assert.Equal(t, []uint64{3, 4, 5}, fetchedHeights, "should have fetched heights sequentially") }) } -func setupTestSyncer(t *testing.T, daBlockTime time.Duration) *Syncer { - t.Helper() - - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - st := store.New(ds) - - cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) - require.NoError(t, err) - - addr, _, _ := buildSyncTestSigner(t) - - cfg := config.DefaultConfig() - cfg.DA.BlockTime.Duration = daBlockTime - - gen := genesis.Genesis{ +// backoffTestGenesis creates a test genesis for the backoff tests. +func backoffTestGenesis(addr []byte) genesis.Genesis { + return genesis.Genesis{ ChainID: "test-chain", InitialHeight: 1, - StartTime: time.Now().Add(-time.Hour), // Start in past + StartTime: time.Now().Add(-time.Hour), ProposerAddress: addr, DAStartHeight: 100, } - - syncer := NewSyncer( - st, - execution.NewDummyExecutor(), - nil, - cm, - common.NopMetrics(), - cfg, - gen, - extmocks.NewMockStore[*types.P2PSignedHeader](t), - extmocks.NewMockStore[*types.P2PData](t), - zerolog.Nop(), - common.DefaultBlockOptions(), - make(chan error, 1), - nil, - ) - - require.NoError(t, syncer.initializeState()) - return syncer } diff --git a/block/internal/syncing/syncer_benchmark_test.go b/block/internal/syncing/syncer_benchmark_test.go index 330afc2e53..063afd3c64 100644 --- a/block/internal/syncing/syncer_benchmark_test.go +++ b/block/internal/syncing/syncer_benchmark_test.go @@ -44,6 +44,20 @@ func BenchmarkSyncerIO(b *testing.B) { // run both loops go fixt.s.processLoop() + + // Create a DAFollower to drive DA retrieval. + follower := NewDAFollower(DAFollowerConfig{ + Retriever: fixt.s.daRetriever, + Logger: zerolog.Nop(), + PipeEvent: fixt.s.pipeEvent, + Namespace: []byte("ns"), + StartDAHeight: fixt.s.daRetrieverHeight.Load(), + DABlockTime: 0, + }).(*daFollower) + follower.ctx, follower.cancel = context.WithCancel(fixt.s.ctx) + follower.highestSeenDAHeight.Store(spec.heights + daHeightOffset) + go follower.runCatchup() + fixt.s.startSyncWorkers() require.Eventually(b, func() bool { @@ -60,7 +74,6 @@ func BenchmarkSyncerIO(b *testing.B) { } require.Len(b, fixt.s.heightInCh, 0) - assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight) gotStoreHeight, err := fixt.s.store.Height(b.Context()) require.NoError(b, err) assert.Equal(b, spec.heights, gotStoreHeight) diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 5ffd607d5b..892dd1050a 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -416,12 +416,26 @@ func TestSyncLoopPersistState(t *testing.T) { Return(nil, datypes.ErrHeightFromFuture) go syncerInst1.processLoop() + + // Create and start a DAFollower so DA retrieval actually happens. + follower1 := NewDAFollower(DAFollowerConfig{ + Retriever: daRtrMock, + Logger: zerolog.Nop(), + PipeEvent: syncerInst1.pipeEvent, + Namespace: []byte("ns"), + StartDAHeight: syncerInst1.daRetrieverHeight.Load(), + DABlockTime: cfg.DA.BlockTime.Duration, + }).(*daFollower) + follower1.ctx, follower1.cancel = context.WithCancel(ctx) + // Set highest so catchup runs through all mocked heights. + follower1.highestSeenDAHeight.Store(myFutureDAHeight) + go follower1.runCatchup() syncerInst1.startSyncWorkers() syncerInst1.wg.Wait() requireEmptyChan(t, errorCh) t.Log("sync workers on instance1 completed") - require.Equal(t, myFutureDAHeight, syncerInst1.daRetrieverHeight.Load()) + require.Equal(t, myFutureDAHeight, follower1.localDAHeight.Load()) // wait for all events consumed require.NoError(t, cm.SaveToStore()) @@ -479,6 +493,19 @@ func TestSyncLoopPersistState(t *testing.T) { // when it starts, it should fetch from the last height it stopped at t.Log("sync workers on instance2 started") + + // Create a follower for instance 2. + follower2 := NewDAFollower(DAFollowerConfig{ + Retriever: daRtrMock, + Logger: zerolog.Nop(), + PipeEvent: syncerInst2.pipeEvent, + Namespace: []byte("ns"), + StartDAHeight: syncerInst2.daRetrieverHeight.Load(), + DABlockTime: cfg.DA.BlockTime.Duration, + }).(*daFollower) + follower2.ctx, follower2.cancel = context.WithCancel(ctx) + follower2.highestSeenDAHeight.Store(syncerInst2.daRetrieverHeight.Load() + 1) + go follower2.runCatchup() syncerInst2.startSyncWorkers() syncerInst2.wg.Wait() diff --git a/pkg/da/types/types.go b/pkg/da/types/types.go index b2f2e7bc30..7f48fc43ef 100644 --- a/pkg/da/types/types.go +++ b/pkg/da/types/types.go @@ -82,3 +82,10 @@ func SplitID(id []byte) (uint64, []byte, error) { commitment := id[8:] return binary.LittleEndian.Uint64(id[:8]), commitment, nil } + +// SubscriptionEvent is a namespace-agnostic signal that a blob was finalized at +// Height on the DA layer. Produced by Subscribe and consumed by DA followers. +type SubscriptionEvent struct { + // Height is the DA layer height at which the blob was finalized. + Height uint64 +} diff --git a/test/mocks/da.go b/test/mocks/da.go index f5293d907a..27ce9badf3 100644 --- a/test/mocks/da.go +++ b/test/mocks/da.go @@ -492,6 +492,74 @@ func (_c *MockClient_Submit_Call) RunAndReturn(run func(ctx context.Context, dat return _c } +// Subscribe provides a mock function for the type MockClient +func (_mock *MockClient) Subscribe(ctx context.Context, namespace []byte) (<-chan da.SubscriptionEvent, error) { + ret := _mock.Called(ctx, namespace) + + if len(ret) == 0 { + panic("no return value specified for Subscribe") + } + + var r0 <-chan da.SubscriptionEvent + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context, []byte) (<-chan da.SubscriptionEvent, error)); ok { + return returnFunc(ctx, namespace) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, []byte) <-chan da.SubscriptionEvent); ok { + r0 = returnFunc(ctx, namespace) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan da.SubscriptionEvent) + } + } + if returnFunc, ok := ret.Get(1).(func(context.Context, []byte) error); ok { + r1 = returnFunc(ctx, namespace) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockClient_Subscribe_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Subscribe' +type MockClient_Subscribe_Call struct { + *mock.Call +} + +// Subscribe is a helper method to define mock.On call +// - ctx context.Context +// - namespace []byte +func (_e *MockClient_Expecter) Subscribe(ctx interface{}, namespace interface{}) *MockClient_Subscribe_Call { + return &MockClient_Subscribe_Call{Call: _e.mock.On("Subscribe", ctx, namespace)} +} + +func (_c *MockClient_Subscribe_Call) Run(run func(ctx context.Context, namespace []byte)) *MockClient_Subscribe_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 []byte + if args[1] != nil { + arg1 = args[1].([]byte) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockClient_Subscribe_Call) Return(subscriptionEventCh <-chan da.SubscriptionEvent, err error) *MockClient_Subscribe_Call { + _c.Call.Return(subscriptionEventCh, err) + return _c +} + +func (_c *MockClient_Subscribe_Call) RunAndReturn(run func(ctx context.Context, namespace []byte) (<-chan da.SubscriptionEvent, error)) *MockClient_Subscribe_Call { + _c.Call.Return(run) + return _c +} + // NewMockVerifier creates a new instance of MockVerifier. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockVerifier(t interface { diff --git a/test/testda/dummy.go b/test/testda/dummy.go index 648021b76a..360bbf686f 100644 --- a/test/testda/dummy.go +++ b/test/testda/dummy.go @@ -42,6 +42,11 @@ type DummyDA struct { tickerStop chan struct{} } +func (d *DummyDA) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) { + //TODO implement me + panic("implement me") +} + // Option configures a DummyDA instance. type Option func(*DummyDA) From 2ae32fbfc6320cb7d86f1b96f507c729bc1aced5 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Tue, 3 Mar 2026 13:29:49 +0100 Subject: [PATCH 2/6] feat: add inline blob processing to DAFollower for zero-latency follow mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the DA subscription delivers blobs at the current local DA height, the followLoop now processes them inline via ProcessBlobs — avoiding a round-trip re-fetch from the DA layer. Architecture: - followLoop: processes subscription blobs inline when caught up (fast path), falls through to catchupLoop when behind (slow path). - catchupLoop: unchanged — sequential RetrieveFromDA() for bulk sync. Changes: - Add Blobs field to SubscriptionEvent for carrying raw blob data - Add extractBlobData() to DA client Subscribe adapter - Export ProcessBlobs on DARetriever interface - Add handleSubscriptionEvent() to DAFollower with inline fast path - Add TestDAFollower_InlineProcessing with 3 sub-tests --- block/internal/da/client.go | 25 ++++- block/internal/syncing/da_follower.go | 37 +++++- block/internal/syncing/da_retriever.go | 9 ++ block/internal/syncing/da_retriever_mock.go | 65 +++++++++++ .../internal/syncing/da_retriever_tracing.go | 4 + .../syncing/da_retriever_tracing_test.go | 4 + block/internal/syncing/syncer_backoff_test.go | 106 ++++++++++++++++++ pkg/da/types/types.go | 3 + 8 files changed, 249 insertions(+), 4 deletions(-) diff --git a/block/internal/da/client.go b/block/internal/da/client.go index 35f604acab..aa2a5be6ae 100644 --- a/block/internal/da/client.go +++ b/block/internal/da/client.go @@ -381,7 +381,10 @@ func (c *client) Subscribe(ctx context.Context, namespace []byte) (<-chan datype continue } select { - case out <- datypes.SubscriptionEvent{Height: resp.Height}: + case out <- datypes.SubscriptionEvent{ + Height: resp.Height, + Blobs: extractBlobData(resp), + }: case <-ctx.Done(): return } @@ -392,6 +395,26 @@ func (c *client) Subscribe(ctx context.Context, namespace []byte) (<-chan datype return out, nil } +// extractBlobData extracts raw byte slices from a subscription response, +// filtering out nil blobs and empty data. +func extractBlobData(resp *blobrpc.SubscriptionResponse) [][]byte { + if resp == nil || len(resp.Blobs) == 0 { + return nil + } + blobs := make([][]byte, 0, len(resp.Blobs)) + for _, blob := range resp.Blobs { + if blob == nil { + continue + } + data := blob.Data() + if len(data) == 0 { + continue + } + blobs = append(blobs, data) + } + return blobs +} + // Get fetches blobs by their IDs. Used for visualization and fetching specific blobs. func (c *client) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) { if len(ids) == 0 { diff --git a/block/internal/syncing/da_follower.go b/block/internal/syncing/da_follower.go index 27ec33b6fc..e9f95a1666 100644 --- a/block/internal/syncing/da_follower.go +++ b/block/internal/syncing/da_follower.go @@ -17,8 +17,9 @@ import ( // DAFollower subscribes to DA blob events and drives sequential catchup. // // Architecture: -// - followLoop listens on the subscription channel and atomically updates -// highestSeenDAHeight. +// - followLoop listens on the subscription channel. When caught up, it processes +// subscription blobs inline (fast path, no DA re-fetch). Otherwise, it updates +// highestSeenDAHeight and signals the catchup loop. // - catchupLoop sequentially retrieves from localDAHeight → highestSeenDAHeight, // piping events to the Syncer's heightInCh. // @@ -169,11 +170,41 @@ func (f *daFollower) runSubscription() error { if !ok { return errors.New("subscription channel closed") } - f.updateHighest(ev.Height) + f.handleSubscriptionEvent(ev) } } } +// handleSubscriptionEvent processes a subscription event. When the follower is +// caught up (ev.Height == localDAHeight) and blobs are available, it processes +// them inline — avoiding a DA re-fetch round trip. Otherwise, it just updates +// highestSeenDAHeight and lets catchupLoop handle retrieval. +func (f *daFollower) handleSubscriptionEvent(ev datypes.SubscriptionEvent) { + // Always record the highest height we've seen from the subscription. + f.updateHighest(ev.Height) + + // Fast path: process blobs inline when caught up. + // Only fire when ev.Height == localDAHeight to avoid out-of-order processing. + if len(ev.Blobs) > 0 && ev.Height == f.localDAHeight.Load() { + events := f.retriever.ProcessBlobs(f.ctx, ev.Blobs, ev.Height) + for _, event := range events { + if err := f.pipeEvent(f.ctx, event); err != nil { + f.logger.Warn().Err(err).Uint64("da_height", ev.Height). + Msg("failed to pipe inline event, catchup will retry") + return // catchupLoop already signaled via updateHighest + } + } + // Advance local height — we processed this height inline. + f.localDAHeight.Store(ev.Height + 1) + f.headReached.Store(true) + f.logger.Debug().Uint64("da_height", ev.Height).Int("events", len(events)). + Msg("processed subscription blobs inline (fast path)") + return + } + + // Slow path: behind or no blobs — catchupLoop will handle via signal from updateHighest. +} + // updateHighest atomically bumps highestSeenDAHeight and signals catchup if needed. func (f *daFollower) updateHighest(height uint64) { for { diff --git a/block/internal/syncing/da_retriever.go b/block/internal/syncing/da_retriever.go index a6c9d43c7c..177ff98628 100644 --- a/block/internal/syncing/da_retriever.go +++ b/block/internal/syncing/da_retriever.go @@ -24,6 +24,9 @@ import ( type DARetriever interface { // RetrieveFromDA retrieves blocks from the specified DA height and returns height events RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) + // ProcessBlobs parses raw blob bytes at a given DA height into height events. + // Used by the DAFollower to process subscription blobs inline without re-fetching. + ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent // QueuePriorityHeight queues a DA height for priority retrieval (from P2P hints). // These heights take precedence over sequential fetching. QueuePriorityHeight(daHeight uint64) @@ -191,6 +194,12 @@ func (r *daRetriever) validateBlobResponse(res datypes.ResultRetrieve, daHeight } } +// ProcessBlobs processes raw blob bytes to extract headers and data and returns height events. +// This is the public interface used by the DAFollower for inline subscription processing. +func (r *daRetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent { + return r.processBlobs(ctx, blobs, daHeight) +} + // processBlobs processes retrieved blobs to extract headers and data and returns height events func (r *daRetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent { // Decode all blobs diff --git a/block/internal/syncing/da_retriever_mock.go b/block/internal/syncing/da_retriever_mock.go index 2e191c8851..dc6926485e 100644 --- a/block/internal/syncing/da_retriever_mock.go +++ b/block/internal/syncing/da_retriever_mock.go @@ -189,3 +189,68 @@ func (_c *MockDARetriever_RetrieveFromDA_Call) RunAndReturn(run func(ctx context _c.Call.Return(run) return _c } + +// ProcessBlobs provides a mock function for the type MockDARetriever +func (_mock *MockDARetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent { + ret := _mock.Called(ctx, blobs, daHeight) + + if len(ret) == 0 { + panic("no return value specified for ProcessBlobs") + } + + var r0 []common.DAHeightEvent + if returnFunc, ok := ret.Get(0).(func(context.Context, [][]byte, uint64) []common.DAHeightEvent); ok { + r0 = returnFunc(ctx, blobs, daHeight) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]common.DAHeightEvent) + } + } + return r0 +} + +// MockDARetriever_ProcessBlobs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ProcessBlobs' +type MockDARetriever_ProcessBlobs_Call struct { + *mock.Call +} + +// ProcessBlobs is a helper method to define mock.On call +// - ctx context.Context +// - blobs [][]byte +// - daHeight uint64 +func (_e *MockDARetriever_Expecter) ProcessBlobs(ctx interface{}, blobs interface{}, daHeight interface{}) *MockDARetriever_ProcessBlobs_Call { + return &MockDARetriever_ProcessBlobs_Call{Call: _e.mock.On("ProcessBlobs", ctx, blobs, daHeight)} +} + +func (_c *MockDARetriever_ProcessBlobs_Call) Run(run func(ctx context.Context, blobs [][]byte, daHeight uint64)) *MockDARetriever_ProcessBlobs_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 [][]byte + if args[1] != nil { + arg1 = args[1].([][]byte) + } + var arg2 uint64 + if args[2] != nil { + arg2 = args[2].(uint64) + } + run( + arg0, + arg1, + arg2, + ) + }) + return _c +} + +func (_c *MockDARetriever_ProcessBlobs_Call) Return(dAHeightEvents []common.DAHeightEvent) *MockDARetriever_ProcessBlobs_Call { + _c.Call.Return(dAHeightEvents) + return _c +} + +func (_c *MockDARetriever_ProcessBlobs_Call) RunAndReturn(run func(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent) *MockDARetriever_ProcessBlobs_Call { + _c.Call.Return(run) + return _c +} diff --git a/block/internal/syncing/da_retriever_tracing.go b/block/internal/syncing/da_retriever_tracing.go index 2bc7a4094d..1e1a9ea7c0 100644 --- a/block/internal/syncing/da_retriever_tracing.go +++ b/block/internal/syncing/da_retriever_tracing.go @@ -63,3 +63,7 @@ func (t *tracedDARetriever) QueuePriorityHeight(daHeight uint64) { func (t *tracedDARetriever) PopPriorityHeight() uint64 { return t.inner.PopPriorityHeight() } + +func (t *tracedDARetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent { + return t.inner.ProcessBlobs(ctx, blobs, daHeight) +} diff --git a/block/internal/syncing/da_retriever_tracing_test.go b/block/internal/syncing/da_retriever_tracing_test.go index 99ce1eb639..930fc43617 100644 --- a/block/internal/syncing/da_retriever_tracing_test.go +++ b/block/internal/syncing/da_retriever_tracing_test.go @@ -31,6 +31,10 @@ func (m *mockDARetriever) QueuePriorityHeight(daHeight uint64) {} func (m *mockDARetriever) PopPriorityHeight() uint64 { return 0 } +func (m *mockDARetriever) ProcessBlobs(_ context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent { + return nil +} + func setupDARetrieverTrace(t *testing.T, inner DARetriever) (DARetriever, *tracetest.SpanRecorder) { t.Helper() sr := tracetest.NewSpanRecorder() diff --git a/block/internal/syncing/syncer_backoff_test.go b/block/internal/syncing/syncer_backoff_test.go index 10564b4b28..58327c0819 100644 --- a/block/internal/syncing/syncer_backoff_test.go +++ b/block/internal/syncing/syncer_backoff_test.go @@ -256,6 +256,112 @@ func TestDAFollower_CatchupThenReachHead(t *testing.T) { }) } +// TestDAFollower_InlineProcessing verifies the fast path: when the subscription +// delivers blobs at the current localDAHeight, handleSubscriptionEvent processes +// them inline via ProcessBlobs (not RetrieveFromDA). +func TestDAFollower_InlineProcessing(t *testing.T) { + t.Run("processes_blobs_inline_when_caught_up", func(t *testing.T) { + daRetriever := NewMockDARetriever(t) + + var pipedEvents []common.DAHeightEvent + pipeEvent := func(_ context.Context, ev common.DAHeightEvent) error { + pipedEvents = append(pipedEvents, ev) + return nil + } + + follower := NewDAFollower(DAFollowerConfig{ + Retriever: daRetriever, + Logger: zerolog.Nop(), + PipeEvent: pipeEvent, + Namespace: []byte("ns"), + StartDAHeight: 10, + DABlockTime: 500 * time.Millisecond, + }).(*daFollower) + + follower.ctx, follower.cancel = context.WithCancel(t.Context()) + defer follower.cancel() + + blobs := [][]byte{[]byte("header-blob"), []byte("data-blob")} + expectedEvents := []common.DAHeightEvent{ + {DaHeight: 10, Source: common.SourceDA}, + } + + // ProcessBlobs should be called (not RetrieveFromDA) + daRetriever.On("ProcessBlobs", mock.Anything, blobs, uint64(10)). + Return(expectedEvents).Once() + + // Simulate subscription event at the current localDAHeight + follower.handleSubscriptionEvent(datypes.SubscriptionEvent{ + Height: 10, + Blobs: blobs, + }) + + // Verify: ProcessBlobs was called, events were piped, height advanced + require.Len(t, pipedEvents, 1, "should pipe 1 event from inline processing") + assert.Equal(t, uint64(10), pipedEvents[0].DaHeight) + assert.Equal(t, uint64(11), follower.localDAHeight.Load(), "localDAHeight should advance past processed height") + assert.True(t, follower.HasReachedHead(), "should mark head as reached after inline processing") + }) + + t.Run("falls_through_to_catchup_when_behind", func(t *testing.T) { + daRetriever := NewMockDARetriever(t) + + pipeEvent := func(_ context.Context, _ common.DAHeightEvent) error { return nil } + + follower := NewDAFollower(DAFollowerConfig{ + Retriever: daRetriever, + Logger: zerolog.Nop(), + PipeEvent: pipeEvent, + Namespace: []byte("ns"), + StartDAHeight: 10, + DABlockTime: 500 * time.Millisecond, + }).(*daFollower) + + follower.ctx, follower.cancel = context.WithCancel(t.Context()) + defer follower.cancel() + + // Subscription reports height 15 but local is at 10 — should NOT process inline + follower.handleSubscriptionEvent(datypes.SubscriptionEvent{ + Height: 15, + Blobs: [][]byte{[]byte("blob")}, + }) + + // ProcessBlobs should NOT have been called + daRetriever.AssertNotCalled(t, "ProcessBlobs", mock.Anything, mock.Anything, mock.Anything) + assert.Equal(t, uint64(10), follower.localDAHeight.Load(), "localDAHeight should not change") + assert.Equal(t, uint64(15), follower.highestSeenDAHeight.Load(), "highestSeen should be updated") + }) + + t.Run("falls_through_when_no_blobs", func(t *testing.T) { + daRetriever := NewMockDARetriever(t) + + pipeEvent := func(_ context.Context, _ common.DAHeightEvent) error { return nil } + + follower := NewDAFollower(DAFollowerConfig{ + Retriever: daRetriever, + Logger: zerolog.Nop(), + PipeEvent: pipeEvent, + Namespace: []byte("ns"), + StartDAHeight: 10, + DABlockTime: 500 * time.Millisecond, + }).(*daFollower) + + follower.ctx, follower.cancel = context.WithCancel(t.Context()) + defer follower.cancel() + + // Subscription at current height but no blobs — should fall through + follower.handleSubscriptionEvent(datypes.SubscriptionEvent{ + Height: 10, + Blobs: nil, + }) + + // ProcessBlobs should NOT have been called + daRetriever.AssertNotCalled(t, "ProcessBlobs", mock.Anything, mock.Anything, mock.Anything) + assert.Equal(t, uint64(10), follower.localDAHeight.Load(), "localDAHeight should not change") + assert.Equal(t, uint64(10), follower.highestSeenDAHeight.Load(), "highestSeen should be updated") + }) +} + // backoffTestGenesis creates a test genesis for the backoff tests. func backoffTestGenesis(addr []byte) genesis.Genesis { return genesis.Genesis{ diff --git a/pkg/da/types/types.go b/pkg/da/types/types.go index 7f48fc43ef..8c11ce5cb5 100644 --- a/pkg/da/types/types.go +++ b/pkg/da/types/types.go @@ -88,4 +88,7 @@ func SplitID(id []byte) (uint64, []byte, error) { type SubscriptionEvent struct { // Height is the DA layer height at which the blob was finalized. Height uint64 + // Blobs contains the raw blob data from the subscription response. + // When non-nil, followers can process blobs inline without re-fetching from DA. + Blobs [][]byte } From 72abe51c9b8746c67f4f4ec3494ddd2de6563bc3 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Tue, 3 Mar 2026 13:55:14 +0100 Subject: [PATCH 3/6] feat: subscribe to both header and data namespaces for inline processing When header and data use different DA namespaces, the DAFollower now subscribes to both and merges events via a fan-in goroutine. This ensures inline blob processing works correctly for split-namespace configurations. Changes: - Add DataNamespace to DAFollowerConfig and daFollower - Subscribe to both namespaces in runSubscription with mergeSubscriptions fan-in - Guard handleSubscriptionEvent to only advance localDAHeight when ProcessBlobs returns at least one complete event (header+data matched) - Pass DataNamespace from syncer.go - Implement Subscribe on DummyDA test helper with subscriber notification --- block/internal/syncing/da_follower.go | 78 +++++++++++++++++++++++---- block/internal/syncing/syncer.go | 1 + test/testda/dummy.go | 61 +++++++++++++++++++-- 3 files changed, 128 insertions(+), 12 deletions(-) diff --git a/block/internal/syncing/da_follower.go b/block/internal/syncing/da_follower.go index e9f95a1666..cfadf36cb8 100644 --- a/block/internal/syncing/da_follower.go +++ b/block/internal/syncing/da_follower.go @@ -1,8 +1,10 @@ package syncing import ( + "bytes" "context" "errors" + "fmt" "sync" "sync/atomic" "time" @@ -45,6 +47,9 @@ type daFollower struct { // Namespace to subscribe on (header namespace). namespace []byte + // dataNamespace is the data namespace (may equal namespace when header+data + // share the same namespace). When different, we subscribe to both and merge. + dataNamespace []byte // localDAHeight is only written by catchupLoop and read by followLoop // to determine whether a catchup is needed. @@ -76,18 +81,24 @@ type DAFollowerConfig struct { Logger zerolog.Logger PipeEvent func(ctx context.Context, event common.DAHeightEvent) error Namespace []byte + DataNamespace []byte // may be nil or equal to Namespace StartDAHeight uint64 DABlockTime time.Duration } // NewDAFollower creates a new daFollower. func NewDAFollower(cfg DAFollowerConfig) DAFollower { + dataNs := cfg.DataNamespace + if len(dataNs) == 0 { + dataNs = cfg.Namespace + } f := &daFollower{ client: cfg.Client, retriever: cfg.Retriever, logger: cfg.Logger.With().Str("component", "da_follower").Logger(), pipeEvent: cfg.PipeEvent, namespace: cfg.Namespace, + dataNamespace: dataNs, catchupSignal: make(chan struct{}, 1), daBlockTime: cfg.DABlockTime, } @@ -154,12 +165,22 @@ func (f *daFollower) followLoop() { } } -// runSubscription opens a single subscription and processes events until the -// channel is closed or an error occurs. +// runSubscription opens subscriptions on both header and data namespaces (if +// different) and processes events until a channel is closed or an error occurs. func (f *daFollower) runSubscription() error { - ch, err := f.client.Subscribe(f.ctx, f.namespace) + headerCh, err := f.client.Subscribe(f.ctx, f.namespace) if err != nil { - return err + return fmt.Errorf("subscribe header namespace: %w", err) + } + + // If namespaces differ, subscribe to the data namespace too and fan-in. + ch := headerCh + if !bytes.Equal(f.namespace, f.dataNamespace) { + dataCh, err := f.client.Subscribe(f.ctx, f.dataNamespace) + if err != nil { + return fmt.Errorf("subscribe data namespace: %w", err) + } + ch = f.mergeSubscriptions(headerCh, dataCh) } for { @@ -175,6 +196,41 @@ func (f *daFollower) runSubscription() error { } } +// mergeSubscriptions fans two subscription channels into one, concatenating +// blobs from both namespaces when events arrive at the same DA height. +func (f *daFollower) mergeSubscriptions( + headerCh, dataCh <-chan datypes.SubscriptionEvent, +) <-chan datypes.SubscriptionEvent { + out := make(chan datypes.SubscriptionEvent, 16) + go func() { + defer close(out) + for headerCh != nil || dataCh != nil { + var ev datypes.SubscriptionEvent + var ok bool + select { + case <-f.ctx.Done(): + return + case ev, ok = <-headerCh: + if !ok { + headerCh = nil + continue + } + case ev, ok = <-dataCh: + if !ok { + dataCh = nil + continue + } + } + select { + case out <- ev: + case <-f.ctx.Done(): + return + } + } + }() + return out +} + // handleSubscriptionEvent processes a subscription event. When the follower is // caught up (ev.Height == localDAHeight) and blobs are available, it processes // them inline — avoiding a DA re-fetch round trip. Otherwise, it just updates @@ -194,11 +250,15 @@ func (f *daFollower) handleSubscriptionEvent(ev datypes.SubscriptionEvent) { return // catchupLoop already signaled via updateHighest } } - // Advance local height — we processed this height inline. - f.localDAHeight.Store(ev.Height + 1) - f.headReached.Store(true) - f.logger.Debug().Uint64("da_height", ev.Height).Int("events", len(events)). - Msg("processed subscription blobs inline (fast path)") + // Only advance if we produced at least one complete event. + // With split namespaces, the first namespace's blobs may not produce + // events until the second namespace's blobs arrive at the same height. + if len(events) != 0 { + f.localDAHeight.Store(ev.Height + 1) + f.headReached.Store(true) + f.logger.Debug().Uint64("da_height", ev.Height).Int("events", len(events)). + Msg("processed subscription blobs inline (fast path)") + } return } diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 154cd927ec..b5f429090a 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -203,6 +203,7 @@ func (s *Syncer) Start(ctx context.Context) error { Logger: s.logger, PipeEvent: s.pipeEvent, Namespace: s.daClient.GetHeaderNamespace(), + DataNamespace: s.daClient.GetDataNamespace(), StartDAHeight: s.daRetrieverHeight.Load(), DABlockTime: s.config.DA.BlockTime.Duration, }) diff --git a/test/testda/dummy.go b/test/testda/dummy.go index 360bbf686f..7f29140b28 100644 --- a/test/testda/dummy.go +++ b/test/testda/dummy.go @@ -28,6 +28,12 @@ func (h *Header) Time() time.Time { return h.Timestamp } +// subscriber holds a channel and the context for a single Subscribe caller. +type subscriber struct { + ch chan datypes.SubscriptionEvent + ctx context.Context +} + // DummyDA is a test implementation of the DA client interface. // It supports blob storage, height simulation, failure injection, and header retrieval. type DummyDA struct { @@ -38,13 +44,50 @@ type DummyDA struct { headers map[uint64]*Header // height -> header (with timestamp) failSubmit atomic.Bool + // subscribers tracks active Subscribe callers. + subscribers []*subscriber + tickerMu sync.Mutex tickerStop chan struct{} } -func (d *DummyDA) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) { - //TODO implement me - panic("implement me") +// Subscribe returns a channel that emits a SubscriptionEvent for every new DA +// height produced by Submit or StartHeightTicker. The channel is closed when +// ctx is cancelled or Reset is called. +func (d *DummyDA) Subscribe(ctx context.Context, _ []byte) (<-chan datypes.SubscriptionEvent, error) { + ch := make(chan datypes.SubscriptionEvent, 64) + sub := &subscriber{ch: ch, ctx: ctx} + + d.mu.Lock() + d.subscribers = append(d.subscribers, sub) + d.mu.Unlock() + + // Remove subscriber and close channel when ctx is cancelled. + go func() { + <-ctx.Done() + d.mu.Lock() + defer d.mu.Unlock() + for i, s := range d.subscribers { + if s == sub { + d.subscribers = append(d.subscribers[:i], d.subscribers[i+1:]...) + break + } + } + close(ch) + }() + + return ch, nil +} + +// notifySubscribers sends an event to all active subscribers. Must be called +// with d.mu held. +func (d *DummyDA) notifySubscribers(ev datypes.SubscriptionEvent) { + for _, sub := range d.subscribers { + select { + case sub.ch <- ev: + case <-sub.ctx.Done(): + } + } } // Option configures a DummyDA instance. @@ -116,6 +159,10 @@ func (d *DummyDA) Submit(_ context.Context, data [][]byte, _ float64, namespace Timestamp: now, } } + d.notifySubscribers(datypes.SubscriptionEvent{ + Height: height, + Blobs: data, + }) d.mu.Unlock() return datypes.ResultSubmit{ @@ -258,6 +305,9 @@ func (d *DummyDA) StartHeightTicker(interval time.Duration) func() { Timestamp: now, } } + d.notifySubscribers(datypes.SubscriptionEvent{ + Height: height, + }) d.mu.Unlock() case <-stopCh: return @@ -282,6 +332,11 @@ func (d *DummyDA) Reset() { d.blobs = make(map[uint64]map[string][][]byte) d.headers = make(map[uint64]*Header) d.failSubmit.Store(false) + // Close all subscriber channels. + for _, sub := range d.subscribers { + close(sub.ch) + } + d.subscribers = nil d.mu.Unlock() d.tickerMu.Lock() From 24fe982c540fb6e9471e295876efe3c0f20d307e Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Tue, 3 Mar 2026 14:45:25 +0100 Subject: [PATCH 4/6] feat: add subscription watchdog to detect stalled DA subscriptions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If no subscription events arrive within 3× the DA block time (default 30s), the watchdog triggers and returns an error. The followLoop then reconnects the subscription with the standard backoff. This prevents the node from silently stopping sync when the DA subscription stalls (e.g., network partition, DA node freeze). --- block/internal/syncing/da_follower.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/block/internal/syncing/da_follower.go b/block/internal/syncing/da_follower.go index cfadf36cb8..5b97acfb5b 100644 --- a/block/internal/syncing/da_follower.go +++ b/block/internal/syncing/da_follower.go @@ -167,6 +167,8 @@ func (f *daFollower) followLoop() { // runSubscription opens subscriptions on both header and data namespaces (if // different) and processes events until a channel is closed or an error occurs. +// A watchdog timer triggers if no events arrive within watchdogTimeout(), +// causing a reconnect. func (f *daFollower) runSubscription() error { headerCh, err := f.client.Subscribe(f.ctx, f.namespace) if err != nil { @@ -183,6 +185,10 @@ func (f *daFollower) runSubscription() error { ch = f.mergeSubscriptions(headerCh, dataCh) } + watchdogTimeout := f.watchdogTimeout() + watchdog := time.NewTimer(watchdogTimeout) + defer watchdog.Stop() + for { select { case <-f.ctx.Done(): @@ -192,6 +198,9 @@ func (f *daFollower) runSubscription() error { return errors.New("subscription channel closed") } f.handleSubscriptionEvent(ev) + watchdog.Reset(watchdogTimeout) + case <-watchdog.C: + return errors.New("subscription watchdog: no events received, reconnecting") } } } @@ -406,3 +415,14 @@ func (f *daFollower) backoff() time.Duration { } return 2 * time.Second } + +// watchdogTimeout returns how long to wait for a subscription event before +// assuming the subscription is stalled. Defaults to 3× the DA block time. +const watchdogMultiplier = 3 + +func (f *daFollower) watchdogTimeout() time.Duration { + if f.daBlockTime > 0 { + return f.daBlockTime * watchdogMultiplier + } + return 30 * time.Second +} From f4b9f2feda861f6ebaa5974114232e733159ac34 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Tue, 3 Mar 2026 15:24:32 +0100 Subject: [PATCH 5/6] fix: security hardening for DA subscription path --- block/internal/da/client.go | 4 +- block/internal/syncing/da_follower.go | 60 +++++++++++++++----------- block/internal/syncing/da_retriever.go | 3 ++ 3 files changed, 40 insertions(+), 27 deletions(-) diff --git a/block/internal/da/client.go b/block/internal/da/client.go index aa2a5be6ae..bb0f118436 100644 --- a/block/internal/da/client.go +++ b/block/internal/da/client.go @@ -396,7 +396,7 @@ func (c *client) Subscribe(ctx context.Context, namespace []byte) (<-chan datype } // extractBlobData extracts raw byte slices from a subscription response, -// filtering out nil blobs and empty data. +// filtering out nil blobs, empty data, and blobs exceeding DefaultMaxBlobSize. func extractBlobData(resp *blobrpc.SubscriptionResponse) [][]byte { if resp == nil || len(resp.Blobs) == 0 { return nil @@ -407,7 +407,7 @@ func extractBlobData(resp *blobrpc.SubscriptionResponse) [][]byte { continue } data := blob.Data() - if len(data) == 0 { + if len(data) == 0 || len(data) > common.DefaultMaxBlobSize { continue } blobs = append(blobs, data) diff --git a/block/internal/syncing/da_follower.go b/block/internal/syncing/da_follower.go index 5b97acfb5b..785bf859cc 100644 --- a/block/internal/syncing/da_follower.go +++ b/block/internal/syncing/da_follower.go @@ -170,7 +170,11 @@ func (f *daFollower) followLoop() { // A watchdog timer triggers if no events arrive within watchdogTimeout(), // causing a reconnect. func (f *daFollower) runSubscription() error { - headerCh, err := f.client.Subscribe(f.ctx, f.namespace) + // Sub-context ensures the merge goroutine is cancelled when this function returns. + subCtx, subCancel := context.WithCancel(f.ctx) + defer subCancel() + + headerCh, err := f.client.Subscribe(subCtx, f.namespace) if err != nil { return fmt.Errorf("subscribe header namespace: %w", err) } @@ -178,11 +182,11 @@ func (f *daFollower) runSubscription() error { // If namespaces differ, subscribe to the data namespace too and fan-in. ch := headerCh if !bytes.Equal(f.namespace, f.dataNamespace) { - dataCh, err := f.client.Subscribe(f.ctx, f.dataNamespace) + dataCh, err := f.client.Subscribe(subCtx, f.dataNamespace) if err != nil { return fmt.Errorf("subscribe data namespace: %w", err) } - ch = f.mergeSubscriptions(headerCh, dataCh) + ch = f.mergeSubscriptions(subCtx, headerCh, dataCh) } watchdogTimeout := f.watchdogTimeout() @@ -191,8 +195,8 @@ func (f *daFollower) runSubscription() error { for { select { - case <-f.ctx.Done(): - return f.ctx.Err() + case <-subCtx.Done(): + return subCtx.Err() case ev, ok := <-ch: if !ok { return errors.New("subscription channel closed") @@ -205,9 +209,9 @@ func (f *daFollower) runSubscription() error { } } -// mergeSubscriptions fans two subscription channels into one, concatenating -// blobs from both namespaces when events arrive at the same DA height. +// mergeSubscriptions fans two subscription channels into one. func (f *daFollower) mergeSubscriptions( + ctx context.Context, headerCh, dataCh <-chan datypes.SubscriptionEvent, ) <-chan datypes.SubscriptionEvent { out := make(chan datypes.SubscriptionEvent, 16) @@ -217,7 +221,7 @@ func (f *daFollower) mergeSubscriptions( var ev datypes.SubscriptionEvent var ok bool select { - case <-f.ctx.Done(): + case <-ctx.Done(): return case ev, ok = <-headerCh: if !ok { @@ -232,7 +236,7 @@ func (f *daFollower) mergeSubscriptions( } select { case out <- ev: - case <-f.ctx.Done(): + case <-ctx.Done(): return } } @@ -244,34 +248,39 @@ func (f *daFollower) mergeSubscriptions( // caught up (ev.Height == localDAHeight) and blobs are available, it processes // them inline — avoiding a DA re-fetch round trip. Otherwise, it just updates // highestSeenDAHeight and lets catchupLoop handle retrieval. +// +// Uses CAS on localDAHeight to claim exclusive access to processBlobs, +// preventing concurrent map access with catchupLoop. func (f *daFollower) handleSubscriptionEvent(ev datypes.SubscriptionEvent) { // Always record the highest height we've seen from the subscription. f.updateHighest(ev.Height) - // Fast path: process blobs inline when caught up. - // Only fire when ev.Height == localDAHeight to avoid out-of-order processing. - if len(ev.Blobs) > 0 && ev.Height == f.localDAHeight.Load() { + // Fast path: try to claim this height for inline processing. + // CAS(N, N+1) ensures only one goroutine (followLoop or catchupLoop) + // can enter processBlobs for height N. + if len(ev.Blobs) > 0 && f.localDAHeight.CompareAndSwap(ev.Height, ev.Height+1) { events := f.retriever.ProcessBlobs(f.ctx, ev.Blobs, ev.Height) for _, event := range events { if err := f.pipeEvent(f.ctx, event); err != nil { + // Roll back so catchupLoop can retry this height. + f.localDAHeight.Store(ev.Height) f.logger.Warn().Err(err).Uint64("da_height", ev.Height). Msg("failed to pipe inline event, catchup will retry") - return // catchupLoop already signaled via updateHighest + return } } - // Only advance if we produced at least one complete event. - // With split namespaces, the first namespace's blobs may not produce - // events until the second namespace's blobs arrive at the same height. if len(events) != 0 { - f.localDAHeight.Store(ev.Height + 1) f.headReached.Store(true) f.logger.Debug().Uint64("da_height", ev.Height).Int("events", len(events)). Msg("processed subscription blobs inline (fast path)") + } else { + // No complete events (split namespace, waiting for other half). + f.localDAHeight.Store(ev.Height) } return } - // Slow path: behind or no blobs — catchupLoop will handle via signal from updateHighest. + // Slow path: behind, no blobs, or catchupLoop claimed this height. } // updateHighest atomically bumps highestSeenDAHeight and signals catchup if needed. @@ -282,9 +291,6 @@ func (f *daFollower) updateHighest(height uint64) { return } if f.highestSeenDAHeight.CompareAndSwap(cur, height) { - f.logger.Debug(). - Uint64("da_height", height). - Msg("new highest DA height seen from subscription") f.signalCatchup() return } @@ -344,16 +350,20 @@ func (f *daFollower) runCatchup() { return } + // CAS claims this height — prevents followLoop from inline-processing + if !f.localDAHeight.CompareAndSwap(local, local+1) { + // followLoop already advanced past this height via inline processing. + continue + } + if err := f.fetchAndPipeHeight(local); err != nil { + // Roll back so we can retry after backoff. + f.localDAHeight.Store(local) if !f.waitOnCatchupError(err, local) { return } - // Retry the same height after backoff. continue } - - // Advance local height. - f.localDAHeight.Store(local + 1) } } diff --git a/block/internal/syncing/da_retriever.go b/block/internal/syncing/da_retriever.go index 177ff98628..782fa9c318 100644 --- a/block/internal/syncing/da_retriever.go +++ b/block/internal/syncing/da_retriever.go @@ -196,6 +196,9 @@ func (r *daRetriever) validateBlobResponse(res datypes.ResultRetrieve, daHeight // ProcessBlobs processes raw blob bytes to extract headers and data and returns height events. // This is the public interface used by the DAFollower for inline subscription processing. +// +// NOT thread-safe: the caller (DAFollower) must ensure exclusive access via CAS +// on localDAHeight before calling this method. func (r *daRetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent { return r.processBlobs(ctx, blobs, daHeight) } From c21c211e27e8927850aa053c2012cfca371ad902 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Wed, 4 Mar 2026 12:29:57 +0100 Subject: [PATCH 6/6] feat: Implement blob subscription for local DA and update JSON-RPC client to use WebSockets, along with E2E test updates for new `evnode` flags and P2P address retrieval. --- pkg/da/jsonrpc/client.go | 17 ++++- test/e2e/evm_force_inclusion_e2e_test.go | 4 +- test/e2e/evm_test_common.go | 15 ++--- tools/local-da/local.go | 83 ++++++++++++++++++++++++ tools/local-da/rpc.go | 40 ++++++++++-- 5 files changed, 143 insertions(+), 16 deletions(-) diff --git a/pkg/da/jsonrpc/client.go b/pkg/da/jsonrpc/client.go index f1a31a9738..3f7c0dd381 100644 --- a/pkg/da/jsonrpc/client.go +++ b/pkg/da/jsonrpc/client.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "strings" libshare "github.com/celestiaorg/go-square/v3/share" "github.com/filecoin-project/go-jsonrpc" @@ -23,6 +24,15 @@ func (c *Client) Close() { } } +// httpToWS converts an HTTP(S) URL to a WebSocket URL. +// go-jsonrpc requires WebSocket for channel-based subscriptions (e.g. Subscribe). +// WebSocket connections also support regular RPC calls, so this is backward-compatible. +func httpToWS(addr string) string { + addr = strings.Replace(addr, "https://", "wss://", 1) + addr = strings.Replace(addr, "http://", "ws://", 1) + return addr +} + // NewClient connects to the celestia-node RPC endpoint func NewClient(ctx context.Context, addr, token string, authHeaderName string) (*Client, error) { var httpHeader http.Header @@ -33,16 +43,19 @@ func NewClient(ctx context.Context, addr, token string, authHeaderName string) ( httpHeader = http.Header{authHeaderName: []string{fmt.Sprintf("Bearer %s", token)}} } + // Use WebSocket so that channel-based subscriptions (blob.Subscribe) work. + wsAddr := httpToWS(addr) + var cl Client // Connect to the blob namespace - blobCloser, err := jsonrpc.NewClient(ctx, addr, "blob", &cl.Blob.Internal, httpHeader) + blobCloser, err := jsonrpc.NewClient(ctx, wsAddr, "blob", &cl.Blob.Internal, httpHeader) if err != nil { return nil, fmt.Errorf("failed to connect to blob namespace: %w", err) } // Connect to the header namespace - headerCloser, err := jsonrpc.NewClient(ctx, addr, "header", &cl.Header.Internal, httpHeader) + headerCloser, err := jsonrpc.NewClient(ctx, wsAddr, "header", &cl.Header.Internal, httpHeader) if err != nil { blobCloser() return nil, fmt.Errorf("failed to connect to header namespace: %w", err) diff --git a/test/e2e/evm_force_inclusion_e2e_test.go b/test/e2e/evm_force_inclusion_e2e_test.go index f201f7f363..c6c25270c5 100644 --- a/test/e2e/evm_force_inclusion_e2e_test.go +++ b/test/e2e/evm_force_inclusion_e2e_test.go @@ -238,8 +238,10 @@ func TestEvmFullNodeForceInclusionE2E(t *testing.T) { // --- End Sequencer Setup --- // --- Start Full Node Setup --- + // Get sequencer's full P2P address (including peer ID) for the full node to connect to + sequencerP2PAddress := getNodeP2PAddress(t, sut, sequencerHome, env.Endpoints.RollkitRPCPort) // Reuse setupFullNode helper which handles genesis copying and node startup - setupFullNode(t, sut, fullNodeHome, sequencerHome, env.FullNodeJWT, env.GenesisHash, env.Endpoints.GetRollkitP2PAddress(), env.Endpoints) + setupFullNode(t, sut, fullNodeHome, sequencerHome, env.FullNodeJWT, env.GenesisHash, sequencerP2PAddress, env.Endpoints) t.Log("Full node is up") // --- End Full Node Setup --- diff --git a/test/e2e/evm_test_common.go b/test/e2e/evm_test_common.go index 7c089e762b..c5bfb05c23 100644 --- a/test/e2e/evm_test_common.go +++ b/test/e2e/evm_test_common.go @@ -449,15 +449,15 @@ func setupFullNode(t testing.TB, sut *SystemUnderTest, fullNodeHome, sequencerHo "--home", fullNodeHome, "--evm.jwt-secret-file", fullNodeJwtSecretFile, "--evm.genesis-hash", genesisHash, - "--rollkit.p2p.peers", sequencerP2PAddress, + "--evnode.p2p.peers", sequencerP2PAddress, "--evm.engine-url", endpoints.GetFullNodeEngineURL(), "--evm.eth-url", endpoints.GetFullNodeEthURL(), - "--rollkit.da.block_time", DefaultDABlockTime, - "--rollkit.da.address", endpoints.GetDAAddress(), - "--rollkit.da.namespace", DefaultDANamespace, - "--rollkit.da.batching_strategy", "immediate", - "--rollkit.rpc.address", endpoints.GetFullNodeRPCListen(), - "--rollkit.p2p.listen_address", endpoints.GetFullNodeP2PAddress(), + "--evnode.da.block_time", DefaultDABlockTime, + "--evnode.da.address", endpoints.GetDAAddress(), + "--evnode.da.namespace", DefaultDANamespace, + "--evnode.da.batching_strategy", "immediate", + "--evnode.rpc.address", endpoints.GetFullNodeRPCListen(), + "--evnode.p2p.listen_address", endpoints.GetFullNodeP2PAddress(), } sut.ExecCmd(evmSingleBinaryPath, args...) // Use AwaitNodeLive instead of AwaitNodeUp because in lazy mode scenarios, @@ -932,4 +932,3 @@ func PrintTraceReport(t testing.TB, label string, spans []TraceSpan) { t.Logf("%-40s %5.1f%% %s", name, pct, bar) } } - diff --git a/tools/local-da/local.go b/tools/local-da/local.go index ef519f17ce..07d6876f9d 100644 --- a/tools/local-da/local.go +++ b/tools/local-da/local.go @@ -13,6 +13,7 @@ import ( "sync" "time" + libshare "github.com/celestiaorg/go-square/v3/share" "github.com/rs/zerolog" blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" @@ -27,6 +28,18 @@ const ( DefaultBlockTime = 1 * time.Second ) +// subscriber holds a registered subscription's channel and namespace filter. +type subscriber struct { + ch chan subscriptionEvent + ns libshare.Namespace +} + +// subscriptionEvent is sent to subscribers when a new DA block is produced. +type subscriptionEvent struct { + height uint64 + blobs []*blobrpc.Blob +} + // LocalDA is a simple implementation of in-memory DA. Not production ready! Intended only for testing! // // Data is stored in a map, where key is a serialized sequence number. This key is returned as ID. @@ -43,6 +56,10 @@ type LocalDA struct { blockTime time.Duration lastTime time.Time // tracks last timestamp to ensure monotonicity + // Subscriber registry (protected by mu) + subscribers map[int]*subscriber + nextSubID int + logger zerolog.Logger } @@ -57,6 +74,7 @@ func NewLocalDA(logger zerolog.Logger, opts ...func(*LocalDA) *LocalDA) *LocalDA data: make(map[uint64][]kvp), timestamps: make(map[uint64]time.Time), blobData: make(map[uint64][]*blobrpc.Blob), + subscribers: make(map[int]*subscriber), maxBlobSize: DefaultMaxBlobSize, blockTime: DefaultBlockTime, lastTime: time.Now(), @@ -209,6 +227,7 @@ func (d *LocalDA) SubmitWithOptions(ctx context.Context, blobs []datypes.Blob, g d.data[d.height] = append(d.data[d.height], kvp{ids[i], blob}) } + d.notifySubscribers(d.height) d.logger.Info().Uint64("newHeight", d.height).Int("count", len(ids)).Msg("SubmitWithOptions successful") return ids, nil } @@ -239,6 +258,7 @@ func (d *LocalDA) Submit(ctx context.Context, blobs []datypes.Blob, gasPrice flo d.data[d.height] = append(d.data[d.height], kvp{ids[i], blob}) } + d.notifySubscribers(d.height) d.logger.Info().Uint64("newHeight", d.height).Int("count", len(ids)).Msg("Submit successful") return ids, nil } @@ -335,5 +355,68 @@ func (d *LocalDA) produceEmptyBlock() { defer d.mu.Unlock() d.height++ d.timestamps[d.height] = d.monotonicTime() + d.notifySubscribers(d.height) d.logger.Debug().Uint64("height", d.height).Msg("produced empty block") } + +// subscribe registers a new subscriber for blobs matching the given namespace. +// Returns a read-only channel and a subscription ID for later unsubscription. +// Must NOT be called with d.mu held. +func (d *LocalDA) subscribe(ns libshare.Namespace) (<-chan subscriptionEvent, int) { + d.mu.Lock() + defer d.mu.Unlock() + + id := d.nextSubID + d.nextSubID++ + ch := make(chan subscriptionEvent, 64) + d.subscribers[id] = &subscriber{ch: ch, ns: ns} + d.logger.Info().Int("subID", id).Str("namespace", hex.EncodeToString(ns.Bytes())).Msg("subscriber registered") + return ch, id +} + +// unsubscribe removes a subscriber and closes its channel. +// Must NOT be called with d.mu held. +func (d *LocalDA) unsubscribe(id int) { + d.mu.Lock() + defer d.mu.Unlock() + + if sub, ok := d.subscribers[id]; ok { + close(sub.ch) + delete(d.subscribers, id) + d.logger.Info().Int("subID", id).Msg("subscriber unregistered") + } +} + +// notifySubscribers sends a subscriptionEvent to all registered subscribers. +// For each subscriber, only blobs matching the subscriber's namespace are included. +// Slow consumers (full channel) are dropped to avoid blocking block production. +// MUST be called with d.mu held. +func (d *LocalDA) notifySubscribers(height uint64) { + if len(d.subscribers) == 0 { + return + } + + allBlobs := d.blobData[height] // may be nil for empty blocks + + for id, sub := range d.subscribers { + // Filter blobs matching subscriber namespace + var matched []*blobrpc.Blob + for _, b := range allBlobs { + if b != nil && b.Namespace().Equals(sub.ns) { + matched = append(matched, b) + } + } + + evt := subscriptionEvent{ + height: height, + blobs: matched, + } + + select { + case sub.ch <- evt: + default: + // Slow consumer — drop to avoid blocking block production + d.logger.Warn().Int("subID", id).Uint64("height", height).Msg("dropping event for slow subscriber") + } + } +} diff --git a/tools/local-da/rpc.go b/tools/local-da/rpc.go index 6f681d6538..de8e0da070 100644 --- a/tools/local-da/rpc.go +++ b/tools/local-da/rpc.go @@ -31,6 +31,7 @@ func (s *blobServer) Submit(_ context.Context, blobs []*jsonrpc.Blob, _ *jsonrpc if len(blobs) == 0 { s.da.timestamps[height] = time.Now() + s.da.notifySubscribers(height) return height, nil } @@ -48,6 +49,7 @@ func (s *blobServer) Submit(_ context.Context, blobs []*jsonrpc.Blob, _ *jsonrpc s.da.blobData[height] = append(s.da.blobData[height], b) } s.da.timestamps[height] = time.Now() + s.da.notifySubscribers(height) return height, nil } @@ -127,11 +129,39 @@ func (s *blobServer) GetCommitmentProof(_ context.Context, _ uint64, _ libshare. return &jsonrpc.CommitmentProof{}, nil } -// Subscribe returns a closed channel; LocalDA does not push live updates. -func (s *blobServer) Subscribe(_ context.Context, _ libshare.Namespace) (<-chan *jsonrpc.SubscriptionResponse, error) { - ch := make(chan *jsonrpc.SubscriptionResponse) - close(ch) - return ch, nil +// Subscribe streams blobs as they are included for the given namespace. +// The returned channel emits a SubscriptionResponse for every new DA block. +// The channel is closed when ctx is cancelled. +func (s *blobServer) Subscribe(ctx context.Context, namespace libshare.Namespace) (<-chan *jsonrpc.SubscriptionResponse, error) { + eventCh, subID := s.da.subscribe(namespace) + + out := make(chan *jsonrpc.SubscriptionResponse, 64) + go func() { + defer close(out) + defer s.da.unsubscribe(subID) + + for { + select { + case <-ctx.Done(): + return + case evt, ok := <-eventCh: + if !ok { + return + } + resp := &jsonrpc.SubscriptionResponse{ + Height: evt.height, + Blobs: evt.blobs, + } + select { + case out <- resp: + case <-ctx.Done(): + return + } + } + } + }() + + return out, nil } // startBlobServer starts an HTTP JSON-RPC server on addr serving the blob namespace.