Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
a249e80
CEP-45: Incremental Repair Blocking Wait for offsets
aparnanaik0522 Jan 17, 2026
9e750e4
end--no-edit
aparnanaik0522 Jan 21, 2026
5f51ed1
Create new test to validate inc repair on ALL replicas
aparnanaik0522 Jan 22, 2026
4aced30
SyncCoordinatorTest file fix
aparnanaik0522 Jan 26, 2026
42b465e
Change shardStates from CHM -> HM
aparnanaik0522 Jan 26, 2026
30cf06f
Fix possible shard staleness
aparnanaik0522 Jan 26, 2026
92ab440
Fix for happens-before
aparnanaik0522 Jan 28, 2026
6d68ef0
Fix MutationTrackingIncrementalRepairTask file
aparnanaik0522 Jan 29, 2026
3ab3b55
Fix IR still doing anti-compaction and TCM consulted multiple times n…
aweisberg Mar 3, 2026
f0f68c3
Collect offsets via message exchange
aweisberg Mar 4, 2026
44a3bd3
Remove extra timeouts, make top level timeout configurable hot prop
aweisberg Mar 4, 2026
3c457fc
Fix timeout calculation and clean up error handling
aweisberg Mar 5, 2026
17095cd
Mutation tracking sync timeout doesn't need to be that long. If it di…
aweisberg Mar 5, 2026
65996e4
Using IncrementalRepairTask directly instead of embedding inside Muta…
aweisberg Mar 5, 2026
f99c9d5
Add support for force and with hosts
aweisberg Mar 9, 2026
4774648
Fix MutationTrackingSyncCoordinatorTest
aweisberg Mar 9, 2026
d591087
Clean up/fix result handling when pairing incremental repair with MT …
aweisberg Mar 9, 2026
42e23c2
Fix java.lang.IllegalStateException: Attempted to create a new keyspa…
aweisberg Mar 12, 2026
65a366e
Add MutationTrackingRepairTest
aweisberg Mar 12, 2026
886e4bd
checkpoint before big mess
aweisberg Mar 12, 2026
2e27603
During migration incremental repair might legitimately need to add ss…
aweisberg Mar 16, 2026
d42314e
During migration SSTableWriter.finalizeMetadata() needs to accept IR …
aweisberg Mar 16, 2026
94cf54b
Can't run MT sync during migration away from mutation tracking becaus…
aweisberg Mar 16, 2026
9cf77b9
RepairCoordinator add logging
aweisberg Mar 16, 2026
adf3638
During mutation tracking migration restrict repairs to being either e…
aweisberg Mar 17, 2026
b4b06b3
Spurious error trying to set failure on promise when it has already b…
aweisberg Mar 17, 2026
3f6f640
Allow read repair mutations to be applied during migration
aweisberg Mar 17, 2026
20c464c
Finish MutationTrackingRepairTest
aweisberg Mar 18, 2026
04c49ba
Final test and bug fixes after rebase
aweisberg Mar 19, 2026
15a98da
A bunch of self review changes/improvements
aweisberg Mar 23, 2026
b4b931d
SSTableWriter.finalizeMetadata just bypass any attempt and check for …
aweisberg Mar 24, 2026
8e9a6a7
Ignore double bounce test https://issues.apache.org/jira/browse/CASSA…
aweisberg Mar 25, 2026
c2c5f2b
No background reconciliation breaks PartialUpdateHandlingTest when it…
aweisberg Mar 26, 2026
76a6aac
Fix flaky testFailedMutationRedelivery
aweisberg Mar 27, 2026
5ac9e2f
Reset filters only after checking data is inconsistent
aweisberg Mar 27, 2026
4ab0f2b
Fix MutationTrackingRepairTest style
aweisberg Apr 1, 2026
c787044
Add MutationTrackingMigrationState.isMigrating
aweisberg Apr 2, 2026
3331b66
SSTableWriter nit to simplify
aweisberg Apr 2, 2026
0d18d4b
MutationTrackingSyncRequest javadoc formatting
aweisberg Apr 2, 2026
2ebab7e
MutationTrackingIncrementalRepairTask Convert empty ranges handling t…
aweisberg Apr 2, 2026
294c024
Clean up error handling in MutationTrackingSyncCoordinator/MutationTr…
aweisberg Apr 6, 2026
aa78ecf
Review feedback reduce duplication
aweisberg Apr 6, 2026
46ab151
Most of MutationTrackingSyncCoordinatorFeedback
aweisberg Apr 6, 2026
04d75e7
Add test case for topology changes causing failure
aweisberg Apr 6, 2026
069a449
More feedback/cleanup for MutationSyncCoordinator
aweisberg Apr 6, 2026
0739dce
Nit assertRangesNotMixedMigration handling of empty columnFamilies
aweisberg Apr 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ public static Set<String> splitCommaDelimited(String src)
@Replaces(oldName = "repair_request_timeout_in_ms", converter = Converters.MILLIS_DURATION_LONG, deprecated = true)
public volatile DurationSpec.LongMillisecondsBound repair_request_timeout = new DurationSpec.LongMillisecondsBound("120000ms");

public volatile DurationSpec.LongMillisecondsBound mutation_tracking_sync_timeout = new DurationSpec.LongMillisecondsBound("2m");

public Integer streaming_connections_per_host = 1;
@Replaces(oldName = "streaming_keep_alive_period_in_secs", converter = Converters.SECONDS_DURATION, deprecated = true)
public DurationSpec.IntSecondsBound streaming_keep_alive_period = new DurationSpec.IntSecondsBound("300s");
Expand Down
10 changes: 10 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2592,6 +2592,16 @@ public static void setRepairRpcTimeout(Long timeOutInMillis)
conf.repair_request_timeout = new DurationSpec.LongMillisecondsBound(timeOutInMillis);
}

public static long getMutationTrackingSyncTimeout(TimeUnit unit)
{
return conf.mutation_tracking_sync_timeout.to(unit);
}

public static void setMutationTrackingSyncTimeout(long timeoutInMillis)
{
conf.mutation_tracking_sync_timeout = new DurationSpec.LongMillisecondsBound(timeoutInMillis);
}

public static boolean hasCrossNodeTimeout()
{
return conf.internode_timeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,13 @@ else if (message.epoch().isBefore(metadata.schema.lastModified()))
*/
protected ClusterMetadata checkReplicationMigration(ClusterMetadata metadata, Message<T> message, InetAddressAndPort respondTo)
{
// Read repair mutations always bypass mutation tracking and use the untracked
// write path, so skip the replication migration routing check. The isReadRepair
// flag on the mutation hasn't been set yet at this point — it's set later in
// applyMutation() — so we check the handler type instead.
if (this instanceof ReadRepairVerbHandler)
return metadata;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I guess the other option would be something like a handlesReadRepair() method that only ReadRepairVerbHandler overrides, but it's literally called ReadRepairVerbHandler, and we probably won't have something else handle RR mutations.

In any case, I'm remembering blocking RR is going to be reworked for migration anyway, so ignore me :D


IMutation mutation = message.payload;
MutationRouting expected = mutation.id().isNone() ? MutationRouting.UNTRACKED : MutationRouting.TRACKED;
if (expected == MigrationRouter.getMutationRouting(metadata, mutation))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public WriteContext beginWrite(Mutation mutation, boolean makeDurable) throws Re
{
group = Keyspace.writeOrder.start();

MigrationRouter.validateUntrackedMutation(mutation);
if (!mutation.isReadRepair())
MigrationRouter.validateUntrackedMutation(mutation);
// write the mutation to the commitlog and memtables
CommitLogPosition position = null;
if (makeDurable)
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/db/Keyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ public void apply(final Mutation mutation,
boolean updateIndexes,
boolean isDroppable)
{
if (MigrationRouter.isFullyTracked(mutation))
if (MigrationRouter.isFullyTracked(mutation) && !mutation.isReadRepair())
applyInternalTracked(mutation, null);
else
applyInternal(mutation, makeDurable, updateIndexes, isDroppable, false, null);
Expand All @@ -460,7 +460,7 @@ private Future<?> applyInternal(final Mutation mutation,
boolean isDeferrable,
Promise<?> future)
{
Preconditions.checkState(!getMetadata().useMutationTracking() && mutation.id().isNone());
Preconditions.checkState((!getMetadata().useMutationTracking() || mutation.isReadRepair()) && mutation.id().isNone());

if (TEST_FAIL_WRITES && getMetadata().name.equals(TEST_FAIL_WRITES_KS))
throw new RuntimeException("Testing write failures");
Expand Down
23 changes: 21 additions & 2 deletions src/java/org/apache/cassandra/db/Mutation.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,21 @@ public class Mutation implements IMutation, Supplier<Mutation>, Commitable
// because it is being applied by one or in a context where transaction conflicts don't occur
private PotentialTxnConflicts potentialTxnConflicts;

// Transient: not serialized on the wire. Set by ReadRepairVerbHandler on the
// receiving side so downstream code (Keyspace.apply, write handlers) can route
// read repair mutations through the untracked write path during migration.
private transient boolean isReadRepair;

public void setReadRepair(boolean readRepair)
{
this.isReadRepair = readRepair;
}

public boolean isReadRepair()
{
return isReadRepair;
}

public Mutation(MutationId id, PartitionUpdate update)
{
this(id, update.metadata().keyspace, update.partitionKey(), ImmutableMap.of(update.metadata().id, update), approxTime.now(), update.metadata().params.cdc, PotentialTxnConflicts.DISALLOW);
Expand Down Expand Up @@ -148,7 +163,9 @@ public MutationId id()
@Override
public Mutation withMutationId(MutationId mutationId)
{
return new Mutation(mutationId, keyspaceName, key, modifications, approxCreatedAtNanos, cdcEnabled, potentialTxnConflicts);
Mutation m = new Mutation(mutationId, keyspaceName, key, modifications, approxCreatedAtNanos, cdcEnabled, potentialTxnConflicts);
m.isReadRepair = this.isReadRepair;
return m;
}

private static boolean cdcEnabled(Iterable<PartitionUpdate> modifications)
Expand Down Expand Up @@ -182,7 +199,9 @@ private static boolean cdcEnabled(Iterable<PartitionUpdate> modifications)

Map<TableId, PartitionUpdate> updates = builder.build();
checkState(!updates.isEmpty(), "Updates should not be empty");
return new Mutation(id, keyspaceName, key, builder.build(), approxCreatedAtNanos, potentialTxnConflicts);
Mutation result = new Mutation(id, keyspaceName, key, builder.build(), approxCreatedAtNanos, potentialTxnConflicts);
result.isReadRepair = this.isReadRepair;
return result;
}

public @Nullable Mutation without(TableId tableId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class ReadRepairVerbHandler extends AbstractMutationVerbHandler<Mutation>

public void applyMutation(Mutation mutation)
{
mutation.setReadRepair(true);
mutation.apply();
}

Expand Down
6 changes: 5 additions & 1 deletion src/java/org/apache/cassandra/db/lifecycle/Tracker.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.cassandra.notifications.TablePreScrubNotification;
import org.apache.cassandra.notifications.TruncationNotification;
import org.apache.cassandra.replication.ImmutableCoordinatorLogOffsets;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.TimeUUID;
Expand Down Expand Up @@ -272,7 +273,10 @@ public void updateInitialSSTableSize(Iterable<SSTableReader> sstables)

public void addSSTables(Collection<SSTableReader> sstables)
{
Preconditions.checkState(!cfstore.metadata().replicationType().isTracked());
// Tracked tables may legitimately use this path during migration from untracked to tracked,
// when incremental repair streams SSTables that were written before tracking was enabled.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that during migration to tracked, we'd expect these SSTables to have no coordinator log offsets then? Is that worth asserting?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't matter for imports, since the keyspace being currently tracked means we'll avoid this method.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No we will actually hit this method during migration. The sstables might actually have offsets in them since tracked writes have already started and the incremental repair starts after.

Preconditions.checkState(!cfstore.metadata().replicationType().isTracked()
|| ClusterMetadata.current().mutationTrackingMigrationState.isMigrating(cfstore.metadata().keyspace));
addSSTablesInternal(sstables, false, true, true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.cassandra.service.accord.IAccordService;
import org.apache.cassandra.service.accord.TimeOnlyRequestBookkeeping.LatencyRequestBookkeeping;
import org.apache.cassandra.replication.MutationTrackingService;
import org.apache.cassandra.service.replication.migration.KeyspaceMigrationInfo;
import org.apache.cassandra.replication.PendingLocalTransfer;
import org.apache.cassandra.streaming.IncomingStream;
import org.apache.cassandra.streaming.StreamReceiver;
Expand Down Expand Up @@ -104,6 +105,19 @@ public CassandraStreamReceiver(ColumnFamilyStore cfs, StreamSession session, Lis
this.requiresWritePath = requiresWritePath(cfs);
}

/**
* Whether this stream should use the tracked transfer path (pending until activation).
* Returns false during mutation tracking migration for ranges that are still pending,
* since migration repair uses the untracked streaming path for those ranges.
*/
private boolean useTrackedTransferPath()
{
if (!cfs.metadata().replicationType().isTracked() || !session.streamOperation().isTrackable())
return false;

return KeyspaceMigrationInfo.shouldUseTrackedTransfers(ClusterMetadata.current(), cfs.getKeyspaceName(), cfs.metadata().id, ranges);
}

public static CassandraStreamReceiver fromReceiver(StreamReceiver receiver)
{
Preconditions.checkArgument(receiver instanceof CassandraStreamReceiver);
Expand Down Expand Up @@ -135,7 +149,7 @@ public synchronized void received(IncomingStream stream)
sstables.addAll(finished);
receivedEntireSSTable = file.isEntireSSTable();

if (cfs.metadata().replicationType().isTracked() && session.streamOperation().isTrackable())
if (useTrackedTransferPath())
{
PendingLocalTransfer transfer = new PendingLocalTransfer(cfs.metadata().id, session.planId(), sstables);
MutationTrackingService.instance().received(transfer);
Expand Down Expand Up @@ -266,7 +280,7 @@ public void finished()
logger.debug("[Stream #{}] Received {} sstables from {} ({})", session.planId(), readers.size(), session.peer, readers);

// SSTables involved in a coordinated transfer become live when the transfer is activated
if (cfs.metadata().replicationType().isTracked() && session.streamOperation().isTrackable())
if (useTrackedTransferPath())
return;

if (session.streamOperation() == StreamOperation.BOOTSTRAP)
Expand Down
13 changes: 13 additions & 0 deletions src/java/org/apache/cassandra/dht/NormalizedRanges.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,19 @@ public boolean intersects(T token)
return isIn;
}

/**
* Check if any of these ranges intersect with the given range.
*/
public boolean intersects(Range<T> range)
{
for (Range<T> r : this)
{
if (r.intersects(range))
return true;
}
return false;
}

public NormalizedRanges<T> subtract(NormalizedRanges<T> b)
{
if (b.isEmpty())
Expand Down
24 changes: 19 additions & 5 deletions src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
import org.apache.cassandra.replication.ImmutableCoordinatorLogOffsets;
import org.apache.cassandra.replication.MutationTrackingService;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.replication.migration.KeyspaceMigrationInfo;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.TimeUUID;
Expand Down Expand Up @@ -337,16 +339,28 @@ public final void abort()

protected Map<MetadataType, MetadataComponent> finalizeMetadata()
{
// Reconciliation should not occur before activation for coordinated transfer streams for tracked keyspaces.
// Reconciliation should not occur before activation for coordinated transfer streams for tracked keyspaces.
boolean reconcile = txn.opType() != OperationType.STREAM;

// During migration, incremental repair handles repair status for ranges still pending migration.
// Only apply mutation tracking reconciliation for ranges NOT in the migration pending set.
// For SSTables whose range falls within pending migration ranges, IR sets pendingRepair/repairedAt.
if (metadata().replicationType().isTracked() && repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE && reconcile)
{
Preconditions.checkState(Objects.equals(pendingRepair, ActiveRepairService.NO_PENDING_REPAIR));
if (MutationTrackingService.instance().isDurablyReconciled(coordinatorLogOffsets))
KeyspaceMigrationInfo migrationInfo = ClusterMetadata.current().mutationTrackingMigrationState.getKeyspaceInfo(metadata().keyspace);
boolean inMigrationPendingRange = migrationInfo != null
&& migrationInfo.isRangeInPendingMigration(metadata().id,
first.getToken(),
last.getToken());

if (!inMigrationPendingRange)
{
repairedAt = Clock.Global.currentTimeMillis();
logger.debug("Marking SSTable {} as reconciled with repairedAt {}", descriptor, repairedAt);
Preconditions.checkState(Objects.equals(pendingRepair, ActiveRepairService.NO_PENDING_REPAIR));
if (MutationTrackingService.instance().isDurablyReconciled(coordinatorLogOffsets))
{
repairedAt = Clock.Global.currentTimeMillis();
logger.debug("Marking SSTable {} as reconciled with repairedAt {}", descriptor, repairedAt);
}
}
}

Expand Down
8 changes: 6 additions & 2 deletions src/java/org/apache/cassandra/net/Verb.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import org.apache.cassandra.repair.messages.FinalizeCommit;
import org.apache.cassandra.repair.messages.FinalizePromise;
import org.apache.cassandra.repair.messages.FinalizePropose;
import org.apache.cassandra.repair.messages.MutationTrackingSyncRequest;
import org.apache.cassandra.repair.messages.MutationTrackingSyncResponse;
import org.apache.cassandra.repair.messages.PrepareConsistentRequest;
import org.apache.cassandra.repair.messages.PrepareConsistentResponse;
import org.apache.cassandra.repair.messages.PrepareMessage;
Expand Down Expand Up @@ -287,8 +289,10 @@ public enum Verb
FINALIZE_PROMISE_MSG (111, P1, repairWithBackoffTimeout, ANTI_ENTROPY, () -> FinalizePromise.serializer, () -> RepairMessageVerbHandler.instance(), REPAIR_RSP ),
FINALIZE_COMMIT_MSG (112, P1, repairWithBackoffTimeout, ANTI_ENTROPY, () -> FinalizeCommit.serializer, () -> RepairMessageVerbHandler.instance(), REPAIR_RSP ),
FAILED_SESSION_MSG (113, P1, repairWithBackoffTimeout, ANTI_ENTROPY, () -> FailSession.serializer, () -> RepairMessageVerbHandler.instance(), REPAIR_RSP ),
STATUS_RSP (115, P1, repairTimeout, ANTI_ENTROPY, () -> StatusResponse.serializer, () -> RepairMessageVerbHandler.instance(), REPAIR_RSP ),
STATUS_REQ (114, P1, repairTimeout, ANTI_ENTROPY, () -> StatusRequest.serializer, () -> RepairMessageVerbHandler.instance(), REPAIR_RSP ),
STATUS_RSP (115, P1, repairTimeout, ANTI_ENTROPY, () -> StatusResponse.serializer, () -> RepairMessageVerbHandler.instance(), REPAIR_RSP ),
STATUS_REQ (114, P1, repairTimeout, ANTI_ENTROPY, () -> StatusRequest.serializer, () -> RepairMessageVerbHandler.instance(), REPAIR_RSP ),
MT_SYNC_RSP (117, P1, repairWithBackoffTimeout, REQUEST_RESPONSE, () -> MutationTrackingSyncResponse.serializer, RESPONSE_HANDLER ),
MT_SYNC_REQ (116, P1, repairWithBackoffTimeout, ANTI_ENTROPY, () -> MutationTrackingSyncRequest.serializer, () -> RepairMessageVerbHandler.instance(), MT_SYNC_RSP ),

REPLICATION_DONE_RSP (82, P0, rpcTimeout, MISC, () -> NoPayload.serializer, RESPONSE_HANDLER ),
REPLICATION_DONE_REQ (22, P0, rpcTimeout, MISC, () -> NoPayload.serializer, () -> ReplicationDoneVerbHandler.instance, REPLICATION_DONE_RSP),
Expand Down
Loading