
[WIP] adding sync mode#3131

Draft
alpe wants to merge 5 commits into main from alex/2803_best_2worlds

Conversation


@alpe alpe commented Mar 3, 2026

Overview

alpe added 3 commits March 3, 2026 13:02
…ollower and introduce DA client subscription.
…w mode

When the DA subscription delivers blobs at the current local DA height,
the followLoop now processes them inline via ProcessBlobs — avoiding
a round-trip re-fetch from the DA layer.

Architecture:
- followLoop: processes subscription blobs inline when caught up (fast path),
  falls through to catchupLoop when behind (slow path).
- catchupLoop: unchanged — sequential RetrieveFromDA() for bulk sync.

Changes:
- Add Blobs field to SubscriptionEvent for carrying raw blob data
- Add extractBlobData() to DA client Subscribe adapter
- Export ProcessBlobs on DARetriever interface
- Add handleSubscriptionEvent() to DAFollower with inline fast path
- Add TestDAFollower_InlineProcessing with 3 sub-tests
When header and data use different DA namespaces, the DAFollower now
subscribes to both and merges events via a fan-in goroutine. This ensures
inline blob processing works correctly for split-namespace configurations.

Changes:
- Add DataNamespace to DAFollowerConfig and daFollower
- Subscribe to both namespaces in runSubscription with mergeSubscriptions fan-in
- Guard handleSubscriptionEvent to only advance localDAHeight when
  ProcessBlobs returns at least one complete event (header+data matched)
- Pass DataNamespace from syncer.go
- Implement Subscribe on DummyDA test helper with subscriber notification

github-actions bot commented Mar 3, 2026

The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

| Build | Format | Lint | Breaking | Updated (UTC) |
|---|---|---|---|---|
| ✅ passed | ⏩ skipped | ✅ passed | ✅ passed | Mar 3, 2026, 2:29 PM |


coderabbitai bot commented Mar 3, 2026

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.



claude bot commented Mar 3, 2026

Claude finished @alpe's task in 4m 16s.


Code Review — [WIP] Adding Sync Mode

  • Gather context and understand PR scope
  • Review da_follower.go (new file - core of sync mode)
  • Review da/client.go subscription additions
  • Review syncer.go refactoring
  • Review DA types and interfaces
  • Review test changes
  • Summarize findings

This is a substantial architectural improvement — replacing the polling DA worker with an event-driven DAFollower. The design is clever and the code is generally clean. Below are the issues I found, ordered by severity.


Critical Issues

1. Data race in processBlobs maps

The comment in da_retriever.go:201 says "NOT thread-safe: the caller must ensure exclusive access via CAS on localDAHeight", but this invariant is not maintained:

// Scenario:
// localDAHeight = 100
catchupLoop: CAS(100, 101) → succeeds → enters processBlobs(height=100)
// localDAHeight is now 101
followLoop:  CAS(101, 102) → succeeds → enters processBlobs(height=101) concurrently!

Both goroutines can be in processBlobs simultaneously at different heights, concurrently mutating the shared pendingHeaders and pendingData maps. The CAS only prevents the same height from being processed twice — it does not prevent concurrent map access across different heights. This is a data race (detectable with -race).

2. daRetrieverHeight is never updated by the new DAFollower

In syncer.go, s.daRetrieverHeight is still used in processHeightEvent (line ~559) to skip P2P DA hints:

if daHeightHint < s.daRetrieverHeight.Load() {
    continue
}

In the old polling worker, this was advanced as the DA worker progressed. The new DAFollower has its own localDAHeight atomic, but never writes back to s.daRetrieverHeight. As a result, daRetrieverHeight stays at genesis and P2P DA hints are never filtered out — every hint will be queued regardless of whether it has already been processed.

3. Priority height processing races with followLoop inline processing

In runCatchup (da_follower.go:327), priority heights are fetched without claiming them via CAS on localDAHeight:

if priorityHeight := f.retriever.PopPriorityHeight(); priorityHeight > 0 {
    // ...
    if err := f.fetchAndPipeHeight(priorityHeight); err != nil {  // No CAS!

If priorityHeight == localDAHeight, followLoop could simultaneously claim it via CAS and call ProcessBlobs. Both goroutines would then process the same height, potentially producing duplicate events in heightInCh.


Moderate Issues

4. Partial pipe rollback may produce duplicate events

da_follower.go:263-270:

for _, event := range events {
    if err := f.pipeEvent(f.ctx, event); err != nil {
        // Roll back so catchupLoop can retry this height.
        f.localDAHeight.Store(ev.Height)  // But some events already piped!
        return
    }
}

If N events exist for a DA height and event K fails, events 0..K-1 are already in heightInCh but localDAHeight is rolled back. When catchupLoop retries via RetrieveFromDA, it fetches all N events again, leading to duplicates of 0..K-1.

5. errCaughtUp is dead code

da_follower.go:399:

var errCaughtUp = errors.New("caught up with DA head")

This is defined and handled in waitOnCatchupError, but never returned by fetchAndPipeHeight or any other function. The if errors.Is(err, errCaughtUp) branch in waitOnCatchupError is unreachable. Either wire it up (e.g., return it when ErrHeightFromFuture is seen on the first sequential height) or remove it.

6. Architecture comment inaccuracy

da_follower.go:28:

// The two goroutines share only atomic state; no mutexes needed.

This claim is incorrect — both goroutines share the pendingHeaders / pendingData maps in daRetriever through the DARetriever interface. The comment should be corrected.

7. testda.Reset() may double-close subscriber channels

test/testda/dummy.go:330-347: Reset() calls close(sub.ch) on all subscribers, but if a subscriber's context was already cancelled, the cleanup goroutine (lines 65-79) also closes the same channel. Concurrent Reset() + context cancellation produces a panic on a double-close. Consider draining d.subscribers before closing channels, with synchronization to prevent the cleanup goroutine from racing.


Minor Issues

8. tracedDARetriever.ProcessBlobs is not traced

da_retriever_tracing.go:67: ProcessBlobs is not instrumented with a tracing span, unlike RetrieveFromDA.

9. Missing Subscribe span in tracing client

da/tracing.go:148: tracedClient.Subscribe passes through without a span. For consistency, it should at minimum record the subscribe call.

10. daFollower.Start is not idempotent

Calling Start() twice leaks the first pair of goroutines. Document or guard against this.

11. Worker count inconsistency

syncer.go:197 uses s.wg.Go(s.processLoop) (adds 1 to wg), while startSyncWorkers uses s.wg.Add(2) for two goroutines. The DA follower has its own wg. Consider centralising all goroutine startup for clarity.

12. Subscribe rawCh not drained on cancel (da/client.go:370)

The goroutine wrapping rawCh exits on ctx.Done() without draining rawCh. The caller contract says channels must be drained, but the internal wrapping goroutine abandons messages in rawCh. This relies on the underlying blob API cleaning up properly.


Testing

Coverage is at 31.6% for this patch (da_follower.go at 41%, da/client.go at 0%):

  • da/client.go:Subscribe has no tests
  • da_follower.go — the core concurrency paths (mergeSubscriptions, priority heights, split-namespace fast-path rollback) have no coverage
  • The data race in processBlobs should be caught with a -race enabled integration test or a focused unit test

Summary

The event-driven DAFollower architecture is the right direction and the split between followLoop/catchupLoop + inline processing is elegant. The main concern is thread safety — the CAS protocol doesn't fully protect the shared pendingHeaders/pendingData maps across different heights being processed concurrently. The daRetrieverHeight staleness is also a functional regression from the old polling path.

If no subscription events arrive within 3× the DA block time (default
30s), the watchdog triggers and returns an error. The followLoop then
reconnects the subscription with the standard backoff. This prevents
the node from silently stopping sync when the DA subscription stalls
(e.g., network partition, DA node freeze).

codecov bot commented Mar 3, 2026

Codecov Report

❌ Patch coverage is 29.64427% with 178 lines in your changes missing coverage. Please review.
✅ Project coverage is 59.99%. Comparing base (e877782) to head (f4b9f2f).
⚠️ Report is 1 commit behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| block/internal/syncing/da_follower.go | 38.34% | 112 Missing and 7 partials ⚠️ |
| block/internal/da/client.go | 0.00% | 36 Missing ⚠️ |
| block/internal/syncing/syncer.go | 5.55% | 16 Missing and 1 partial ⚠️ |
| block/internal/da/tracing.go | 0.00% | 2 Missing ⚠️ |
| block/internal/syncing/da_retriever.go | 0.00% | 2 Missing ⚠️ |
| block/internal/syncing/da_retriever_tracing.go | 0.00% | 2 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3131      +/-   ##
==========================================
- Coverage   60.76%   59.99%   -0.77%     
==========================================
  Files         113      114       +1     
  Lines       11672    11886     +214     
==========================================
+ Hits         7092     7131      +39     
- Misses       3777     3947     +170     
- Partials      803      808       +5     
| Flag | Coverage | Δ |
|---|---|---|
| combined | 59.99% <29.64%> | -0.77% ⬇️ |

Flags with carried forward coverage won't be shown.

