SingleBlockProviderFulu with soft and hard limit for sampled blocks #10533
gfukushima wants to merge 20 commits into Consensys:master from
Conversation
Signed-off-by: Gabriel Fukushima <gabrielfukushima@gmail.com>
…t we could get with the default config. TL;DR: DEFAULT_MAX_RECENTLY_SAMPLED_BLOCKS = DEFAULT_FORWARD_SYNC_BATCH_SIZE * DEFAULT_FORWARD_SYNC_MAX_PENDING_BATCHES; // 125
Signed-off-by: Gabriel Fukushima <gabrielfukushima@gmail.com>
zilm13
left a comment
I'm not sure all these changes are required, especially with concurrency safety traded away for synchronization. Maybe just adjust makeNewRoomForTrackers in the old code according to the new requirements? I guess we don't care much about blocks removed in removeAllForBlock because those are imported/rejected.
// We already have this block, waiting for blobs
  return;
}
if (dasBasicSampler.containsBlock(blockRoot)) {
Shouldn't we put containsBlock in the BlockEventsListener interface and check it for both dataColumnSidecars and blobSidecars in the router? Maybe add an extra interface for clarity, or rename this one, but it looks like we are doing the same job here, aren't we?
this.recentChainData = recentChainData;
this.halfColumnsSamplingCompletionEnabled = halfColumnsSamplingCompletionEnabled;
this.maxRecentlySampledBlocks = maxRecentlySampledBlocks;
this.recentlySampledColumnsByRoot = new LinkedHashMap<>(maxRecentlySampledBlocks);
A LinkedHashMap was chosen so we can make use of its FIFO ordering during eviction; that's why I added some synchronization when this map is modified.
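The FIFO-eviction idea mentioned here can be sketched with a plain `LinkedHashMap` (illustrative class and names only, not Teku's actual implementation): in insertion order the map behaves as a FIFO, and overriding `removeEldestEntry` gives a hard size cap that evicts the oldest entry first.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal sketch of a FIFO-bounded tracker cache. Insertion-order
// LinkedHashMap iteration goes oldest-first, so the "eldest" entry is
// the oldest inserted tracker.
class FifoTrackerCache<K, V> extends LinkedHashMap<K, V> {
  private final int maxEntries;

  FifoTrackerCache(final int maxEntries) {
    // accessOrder=false keeps insertion (FIFO) order rather than LRU order.
    super(maxEntries, 0.75f, false);
    this.maxEntries = maxEntries;
  }

  @Override
  protected boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
    // Called by put(): evict the oldest entry once the cap is exceeded.
    return size() > maxEntries;
  }
}
```

Note that `LinkedHashMap` itself is not thread-safe, which is why concurrent modification would need external synchronization or a different structure.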
| "das_recently_sampled_blocks_size", | ||
| "DAS recently sampled blocks size", | ||
| () -> { | ||
| synchronized (this) { |
You are adding synchronization to this method. Is it really necessary? We try to avoid adding synchronization whenever possible in new classes, and to update old code that has it, because synchronization is slow and can cause locks/stalling. It's a step backwards.
Well, in this case we want to make sure that the LinkedHashMap is handled atomically. For the metrics I don't think this would be extremely necessary, but we also want metrics that provide accurate data, don't we?
I mean in general. It's just the first synchronization block of several. If we go with a concurrency-safe setup without locks, we could (rarely) get a false "not contains" and we will just redownload the block again. That's not a big trade-off compared to locking everything here, I think.
Maybe also check recentChainData at the end of the checks in RecentBlocksFetchService; maybe it's already there? Does it make sense? It should be fast to check there.
Signed-off-by: Gabriel Fukushima <gabrielfukushima@gmail.com>
# Conflicts: # services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java
Signed-off-by: Gabriel Fukushima <gabrielfukushima@gmail.com>
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Reviewed by Cursor Bugbot for commit 0af9b80.
zilm13
left a comment
Much better keeping it without synchronized!
I've left some comments we need to address before merging this.
final ExecutionPayloadProvider executionPayloadProvider =
    createExecutionPayloadProvider(storageQueryChannel);

final SingleBlockProvider singleBlockProviderResolver =
Could you please add a comment clarifying why you need these sources for the block, and why it's not recentChainData or something like that?
| names = {"--Xp2p-max-recently-sampled-blocks"}, | ||
| paramLabel = "<NUMBER>", | ||
| showDefaultValue = Visibility.ALWAYS, | ||
| description = "Maximum number recent blocks we should sample when syncing.", |
The description is a bit confusing, and the same goes for the variable name. Maybe max-concurrently-sampled-blocks or something like that, and update the description + var name.
private final DataStructureUtil dataStructureUtil = new DataStructureUtil(0, SPEC);

- private DasSamplerBasic sampler;
+ private DasSamplerBasicImpl sampler;
The new pruning logic is very complex and we don't have tests for it. At the same time, the size is a class input, so we could easily pass something like 2 and write some good tests. We definitely need to test the new behavior.
  AtomicBoolean fullySampled,
- Optional<Integer> earlyCompletionRequirementCount) {
+ Optional<Integer> earlyCompletionRequirementCount,
+ AtomicReference<Optional<SignedBeaconBlock>> block,
As we don't need the block itself, we could skip storing a reference to it and just keep an AtomicBoolean blockSeen flag, and leave the function blockSeen(final Bytes32 blockRoot). Something like this.
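The suggestion above can be sketched as follows (class and method names are illustrative, not Teku's actual API): the tracker only needs a flag recording that the block was observed, rather than a reference to the block.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical tracker sketch: instead of holding an
// AtomicReference<Optional<SignedBeaconBlock>> that is never read,
// record only whether the block has been seen.
class SamplingTrackerSketch {
  private final AtomicBoolean blockSeen = new AtomicBoolean(false);

  // Called when the block for this tracker's root arrives.
  void blockSeen() {
    blockSeen.set(true);
  }

  boolean hasSeenBlock() {
    return blockSeen.get();
  }
}
```

An `AtomicBoolean` keeps the flag safe to set and read from concurrent threads without any synchronized block.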
}
final boolean[] created = {false};
final DataColumnSamplingTracker tracker =
    recentlySampledColumnsByRoot.computeIfAbsent(
Two steps still don't give you any guarantees against a race. To get guarantees you could go this way after the existing check; == checks that you got the same object back, skipping equals():
final DataColumnSamplingTracker dataColumnSamplingTracker = DataColumnSamplingTracker.create(
slot,
blockRoot,
custodyGroupCountManager,
halfColumnsSamplingCompletionEnabled
? Optional.of(
SpecConfigFulu.required(spec.atSlot(slot).getConfig())
.getNumberOfColumns()
/ 2)
: Optional.empty());
final DataColumnSamplingTracker trackerInserted =
recentlySampledColumnsByRoot.computeIfAbsent(
blockRoot,
__ -> dataColumnSamplingTracker);
if (dataColumnSamplingTracker == trackerInserted) {
makeRoomForNewTracker();
onFirstSeen(slot, blockRoot, dataColumnSamplingTracker);
}
return trackerInserted;
spec.computeStartSlotAtEpoch(recentChainData.getFinalizedEpoch()).increment();
recentlySampledColumnsByRoot
    .values()
    .removeIf(
If you want to make this cache useful, you'd probably need to remove this cleanup, wouldn't you?
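The identity-check pattern from the code suggestion above can be demonstrated standalone (the class and helper here are hypothetical, not Teku code): build the candidate first, then compare the value `computeIfAbsent` returns against it by reference to learn whether this thread's candidate actually won the race and should run the first-seen side effects.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo of the computeIfAbsent + '==' identity-check pattern.
class ComputeIfAbsentIdentityDemo {
  private final ConcurrentHashMap<String, Object> trackers = new ConcurrentHashMap<>();

  /** Returns true iff this call inserted the tracker (i.e. the root was first seen). */
  boolean register(final String blockRoot) {
    final Object candidate = new Object();
    // computeIfAbsent is atomic: exactly one candidate ends up in the map.
    final Object inserted = trackers.computeIfAbsent(blockRoot, __ -> candidate);
    // '==' deliberately skips equals(): only the winning thread gets its
    // own candidate back, so only it runs first-seen logic.
    return inserted == candidate;
  }
}
```

The cost of this pattern is that losing threads construct a candidate object that is thrown away, which is usually cheaper than locking around a check-then-insert.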

PR Description
Fixed Issue(s)
Part of #10208
Documentation
Apply the doc-change-required label to this PR if updates are required.
Changelog
Note
Medium Risk
Touches sync and Fulu data-availability sampling paths, adding new eviction logic and new block-source fallback that could affect block fetching/import under load if limits are mis-tuned or eviction happens too early.
Overview
Adds a bounded, metrics-instrumented implementation of DasSamplerBasic (DasSamplerBasicImpl) that tracks sampled blocks, exposes getBlock/containsBlock, and applies soft/hard eviction limits to prevent unbounded tracker growth.

Wires the DAS sampler into sync and storage lookup: RecentBlocksFetchService now skips fetching blocks already present in the DAS sampler, BeaconChainController provides a SingleBlockProviderResolver that falls back between blob-sidecar trackers and the DAS sampler for StorageBackedRecentChainData, and DefaultSyncServiceFactory plumbs DasSamplerBasic through.

Introduces a new sync config/CLI knob --Xp2p-max-recently-sampled-blocks (default 128) to tune the sampler's cache size, and updates unit/reference tests accordingly.

Reviewed by Cursor Bugbot for commit 5f5701d. Bugbot is set up for automated code reviews on this repo.