SingleBlockProviderFulu with soft and hard limit for sampled blocks #10533

Open

gfukushima wants to merge 20 commits into Consensys:master from gfukushima:single-block-provider-fulu
Conversation

@gfukushima (Contributor) commented Mar 26, 2026

PR Description

Fixed Issue(s)

Part of #10208

Documentation

  • I thought about documentation and added the doc-change-required label to this PR if updates are required.

Changelog

  • I thought about adding a changelog entry, and added one if I deemed necessary.

Note

Medium Risk
Touches sync and Fulu data-availability sampling paths, adding new eviction logic and new block-source fallback that could affect block fetching/import under load if limits are mis-tuned or eviction happens too early.

Overview
Adds a bounded, metrics-instrumented implementation of DasSamplerBasic (DasSamplerBasicImpl) that tracks sampled blocks, exposes getBlock/containsBlock, and applies soft/hard eviction limits to prevent unbounded tracker growth.
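The soft/hard eviction idea can be sketched with a small insertion-ordered map. This is a minimal illustration only: the class name, the use of String roots, and the exact policy (the soft limit evicts the oldest fully-sampled trackers first; the hard limit evicts the oldest entries unconditionally) are assumptions for the sketch, not the actual behavior of Teku's DasSamplerBasicImpl.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of soft/hard eviction over insertion-ordered trackers.
// Names and eviction details are illustrative, not Teku's implementation.
public class BoundedTrackerMap {
  private final int softLimit; // above this, evict oldest *completed* trackers
  private final int hardLimit; // above this, evict oldest trackers regardless
  // value = whether the tracker is fully sampled
  private final LinkedHashMap<String, Boolean> trackers = new LinkedHashMap<>();

  public BoundedTrackerMap(final int softLimit, final int hardLimit) {
    this.softLimit = softLimit;
    this.hardLimit = hardLimit;
  }

  public void add(final String blockRoot, final boolean fullySampled) {
    trackers.put(blockRoot, fullySampled);
    makeRoom();
  }

  private void makeRoom() {
    // Soft pass: while over the soft limit, drop the oldest completed trackers.
    Iterator<Map.Entry<String, Boolean>> it = trackers.entrySet().iterator();
    while (trackers.size() > softLimit && it.hasNext()) {
      if (it.next().getValue()) {
        it.remove();
      }
    }
    // Hard pass: if still over the hard limit, drop the oldest unconditionally.
    it = trackers.entrySet().iterator();
    while (trackers.size() > hardLimit && it.hasNext()) {
      it.next();
      it.remove();
    }
  }

  public int size() {
    return trackers.size();
  }

  public boolean containsBlock(final String blockRoot) {
    return trackers.containsKey(blockRoot);
  }
}
```

The LinkedHashMap's insertion order is what makes the FIFO eviction cheap; a plain HashMap would need a separate queue to know which tracker is oldest.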

Wires the DAS sampler into sync and storage lookup: RecentBlocksFetchService now skips fetching blocks already present in the DAS sampler, BeaconChainController provides a SingleBlockProviderResolver that falls back between blob-sidecar trackers and the DAS sampler for StorageBackedRecentChainData, and DefaultSyncServiceFactory plumbs DasSamplerBasic through.
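The resolver's fallback between sources can be sketched as trying each lookup in order and returning the first hit. The names and the plain Optional-returning functions here are simplifications for illustration; the real SingleBlockProvider in Teku deals with SignedBeaconBlock instances rather than strings.

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch of a block-provider resolver that falls back between
// two sources; types are simplified placeholders, not Teku's actual API.
public class FallbackBlockProvider {
  private final Function<String, Optional<String>> blobSidecarTrackers;
  private final Function<String, Optional<String>> dasSampler;

  public FallbackBlockProvider(
      final Function<String, Optional<String>> blobSidecarTrackers,
      final Function<String, Optional<String>> dasSampler) {
    this.blobSidecarTrackers = blobSidecarTrackers;
    this.dasSampler = dasSampler;
  }

  // Try the blob-sidecar trackers first, then fall back to the DAS sampler;
  // empty if neither source currently has the block.
  public Optional<String> getBlock(final String blockRoot) {
    return blobSidecarTrackers.apply(blockRoot).or(() -> dasSampler.apply(blockRoot));
  }
}
```

Optional.or (Java 9+) keeps the fallback lazy: the DAS sampler is only consulted when the blob-sidecar lookup misses.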

Introduces a new sync config/CLI knob --Xp2p-max-recently-sampled-blocks (default 128) to tune the sampler’s cache size, and updates unit/reference tests accordingly.

Reviewed by Cursor Bugbot for commit 5f5701d.

Signed-off-by: Gabriel Fukushima <gabrielfukushima@gmail.com>
@gfukushima gfukushima marked this pull request as ready for review March 26, 2026 09:06
Comment thread on beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/SyncConfig.java (outdated)
…t we could get with the default config. TL;DR: DEFAULT_MAX_RECENTLY_SAMPLED_BLOCKS = DEFAULT_FORWARD_SYNC_BATCH_SIZE * DEFAULT_FORWARD_SYNC_MAX_PENDING_BATCHES; // 125

@zilm13 (Contributor) left a comment

I'm not sure all these changes are required, especially with losing concurrency safety traded for synchronization. Maybe just adjust makeNewRoomForTrackers in the old code according to the new requirements? I guess we don't care much about blocks removed in removeAllForBlock because those are imported/rejected.

// We already have this block, waiting for blobs
return;
}
if (dasBasicSampler.containsBlock(blockRoot)) {
Contributor:

Shouldn't we put containsBlock in the BlockEventsListener interface and check it for both dataColumnSidecars and blobSidecars in the router? Maybe add an extra interface for clarity, or rename this one, but it looks like we are doing the same job here, isn't it?

this.recentChainData = recentChainData;
this.halfColumnsSamplingCompletionEnabled = halfColumnsSamplingCompletionEnabled;
this.maxRecentlySampledBlocks = maxRecentlySampledBlocks;
this.recentlySampledColumnsByRoot = new LinkedHashMap<>(maxRecentlySampledBlocks);
Contributor:

It's not concurrency-safe.

Contributor (author):

Chosen so we can make use of the FIFO ordering during eviction; that's why I added some synchronization when this Map is modified.

"das_recently_sampled_blocks_size",
"DAS recently sampled blocks size",
() -> {
synchronized (this) {
Contributor:

You are adding synchronization to this method. Is it really necessary? We are trying to avoid adding synchronization whenever possible in new classes, and to update old code accordingly, because synchronization is slow and can cause locks/stalling. It's a step backwards.

Contributor (author):

Well, in this case we want to make sure that the LinkedHashMap is handled atomically. For the metrics I don't think this would be extremely necessary, but we also want metrics that provide accurate data, don't we?

Contributor:

I mean in general. It's just the first synchronization block of several. If we go with a concurrency-safe setup without locks, we could (rarely) get a false "not contains" and we will just redownload the block again. That's not a big trade-off compared to locking everything here, I think.
Maybe also check recentChainData at the end of the checks in RecentBlocksFetchService; maybe it's already there? Does that make sense? It should be fast to check there.
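The lock-free trade-off being discussed can be sketched as follows; this is a hypothetical illustration (names are not Teku's), showing only the concurrency aspect, not the FIFO eviction that the LinkedHashMap in the PR provides.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the lock-free trade-off: a read may occasionally
// miss a block that a concurrent writer is adding, in which case the fetch
// service simply re-downloads it. No synchronized blocks on the hot path.
public class LockFreeSampledBlocks {
  private final Set<String> sampledRoots = ConcurrentHashMap.newKeySet();

  public void onBlockSampled(final String blockRoot) {
    sampledRoots.add(blockRoot);
  }

  // A stale 'false' here is benign (the block is just fetched again), so a
  // lock buys accuracy that the caller does not actually need.
  public boolean containsBlock(final String blockRoot) {
    return sampledRoots.contains(blockRoot);
  }
}
```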

@gfukushima gfukushima requested a review from a team as a code owner April 17, 2026 02:31
Merge with master resolved conflicts in services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java
@cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 2 potential issues.

Reviewed by Cursor Bugbot for commit 0af9b80.

@gfukushima gfukushima requested a review from zilm13 April 23, 2026 04:07
@zilm13 (Contributor) left a comment

Much better keeping it without synchronized!
I've left some comments we need to address before merging this.

final ExecutionPayloadProvider executionPayloadProvider =
createExecutionPayloadProvider(storageQueryChannel);

final SingleBlockProvider singleBlockProviderResolver =
Contributor:

Could you please add a comment clarifying why you need these sources for the block, and why it's not recentChainData or something like this.

names = {"--Xp2p-max-recently-sampled-blocks"},
paramLabel = "<NUMBER>",
showDefaultValue = Visibility.ALWAYS,
description = "Maximum number recent blocks we should sample when syncing.",
Contributor:

The description is a bit confusing, same with the variable name. Maybe max-concurrently-sampled-blocks or something like this, and update the description and variable name.

private final DataStructureUtil dataStructureUtil = new DataStructureUtil(0, SPEC);

private DasSamplerBasic sampler;
private DasSamplerBasicImpl sampler;
Contributor:

The new pruning logic is very complex and we don't have tests for it. At the same time, the size is a class input, so we could easily pass something like 2 and have some good tests. We definitely need to test the new behavior.

AtomicBoolean fullySampled,
Optional<Integer> earlyCompletionRequirementCount) {
Optional<Integer> earlyCompletionRequirementCount,
AtomicReference<Optional<SignedBeaconBlock>> block,
Contributor:

As we don't need the block itself, we could skip storing a reference to it and just use an AtomicBoolean flag blockSeen, and keep a function blockSeen(final Bytes32 blockRoot). Something like this.

}
final boolean[] created = {false};
final DataColumnSamplingTracker tracker =
recentlySampledColumnsByRoot.computeIfAbsent(
Contributor:

Two steps still don't give you any guarantees against a race. To have guarantees you could go this way after the existing check; == will check that you got the same object, skipping equals:

    final DataColumnSamplingTracker dataColumnSamplingTracker = DataColumnSamplingTracker.create(
            slot,
            blockRoot,
            custodyGroupCountManager,
            halfColumnsSamplingCompletionEnabled
                    ? Optional.of(
                    SpecConfigFulu.required(spec.atSlot(slot).getConfig())
                            .getNumberOfColumns()
                            / 2)
                    : Optional.empty());
    final DataColumnSamplingTracker trackerInserted =
        recentlySampledColumnsByRoot.computeIfAbsent(
            blockRoot,
            __ -> dataColumnSamplingTracker);
    if (dataColumnSamplingTracker == trackerInserted) {
      makeRoomForNewTracker();
      onFirstSeen(slot, blockRoot, dataColumnSamplingTracker);
    }
    return trackerInserted;

spec.computeStartSlotAtEpoch(recentChainData.getFinalizedEpoch()).increment();
recentlySampledColumnsByRoot
.values()
.removeIf(
Contributor:

If you want to make this cache useful, you'd probably need to remove this cleanup, wouldn't you?
