diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index d8da63d03eaa..49067333aebc 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -169,6 +169,8 @@ public static Set splitCommaDelimited(String src) @Replaces(oldName = "repair_request_timeout_in_ms", converter = Converters.MILLIS_DURATION_LONG, deprecated = true) public volatile DurationSpec.LongMillisecondsBound repair_request_timeout = new DurationSpec.LongMillisecondsBound("120000ms"); + public volatile DurationSpec.LongMillisecondsBound mutation_tracking_sync_timeout = new DurationSpec.LongMillisecondsBound("2m"); + public Integer streaming_connections_per_host = 1; @Replaces(oldName = "streaming_keep_alive_period_in_secs", converter = Converters.SECONDS_DURATION, deprecated = true) public DurationSpec.IntSecondsBound streaming_keep_alive_period = new DurationSpec.IntSecondsBound("300s"); diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 1e06a402e0a5..cc93f4532b56 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -2592,6 +2592,16 @@ public static void setRepairRpcTimeout(Long timeOutInMillis) conf.repair_request_timeout = new DurationSpec.LongMillisecondsBound(timeOutInMillis); } + public static long getMutationTrackingSyncTimeout(TimeUnit unit) + { + return conf.mutation_tracking_sync_timeout.to(unit); + } + + public static void setMutationTrackingSyncTimeout(long timeoutInMillis) + { + conf.mutation_tracking_sync_timeout = new DurationSpec.LongMillisecondsBound(timeoutInMillis); + } + public static boolean hasCrossNodeTimeout() { return conf.internode_timeout; diff --git a/src/java/org/apache/cassandra/db/AbstractMutationVerbHandler.java 
b/src/java/org/apache/cassandra/db/AbstractMutationVerbHandler.java index cb7fe9d8c358..e35b4c4dde96 100644 --- a/src/java/org/apache/cassandra/db/AbstractMutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/AbstractMutationVerbHandler.java @@ -202,6 +202,13 @@ else if (message.epoch().isBefore(metadata.schema.lastModified())) */ protected ClusterMetadata checkReplicationMigration(ClusterMetadata metadata, Message message, InetAddressAndPort respondTo) { + // Read repair mutations always bypass mutation tracking and use the untracked + // write path, so skip the replication migration routing check. The isReadRepair + // flag on the mutation hasn't been set yet at this point — it's set later in + // applyMutation() — so we check the handler type instead. + if (this instanceof ReadRepairVerbHandler) + return metadata; + IMutation mutation = message.payload; MutationRouting expected = mutation.id().isNone() ? MutationRouting.UNTRACKED : MutationRouting.TRACKED; if (expected == MigrationRouter.getMutationRouting(metadata, mutation)) diff --git a/src/java/org/apache/cassandra/db/CassandraKeyspaceWriteHandler.java b/src/java/org/apache/cassandra/db/CassandraKeyspaceWriteHandler.java index 9b68acc708e1..0648233ba34a 100644 --- a/src/java/org/apache/cassandra/db/CassandraKeyspaceWriteHandler.java +++ b/src/java/org/apache/cassandra/db/CassandraKeyspaceWriteHandler.java @@ -47,7 +47,8 @@ public WriteContext beginWrite(Mutation mutation, boolean makeDurable) throws Re { group = Keyspace.writeOrder.start(); - MigrationRouter.validateUntrackedMutation(mutation); + if (!mutation.isReadRepair()) + MigrationRouter.validateUntrackedMutation(mutation); // write the mutation to the commitlog and memtables CommitLogPosition position = null; if (makeDurable) diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index 5e5f412a9b46..92e633bb1bca 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ 
b/src/java/org/apache/cassandra/db/Keyspace.java @@ -437,7 +437,7 @@ public void apply(final Mutation mutation, boolean updateIndexes, boolean isDroppable) { - if (MigrationRouter.isFullyTracked(mutation)) + if (MigrationRouter.isFullyTracked(mutation) && !mutation.isReadRepair()) applyInternalTracked(mutation, null); else applyInternal(mutation, makeDurable, updateIndexes, isDroppable, false, null); @@ -460,7 +460,7 @@ private Future applyInternal(final Mutation mutation, boolean isDeferrable, Promise future) { - Preconditions.checkState(!getMetadata().useMutationTracking() && mutation.id().isNone()); + Preconditions.checkState((!getMetadata().useMutationTracking() || mutation.isReadRepair()) && mutation.id().isNone()); if (TEST_FAIL_WRITES && getMetadata().name.equals(TEST_FAIL_WRITES_KS)) throw new RuntimeException("Testing write failures"); diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java index 1b2850bd58c1..4aa39c19a91d 100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@ -108,6 +108,21 @@ public class Mutation implements IMutation, Supplier, Commitable // because it is being applied by one or in a context where transaction conflicts don't occur private PotentialTxnConflicts potentialTxnConflicts; + // Transient: not serialized on the wire. Set by ReadRepairVerbHandler on the + // receiving side so downstream code (Keyspace.apply, write handlers) can route + // read repair mutations through the untracked write path during migration. 
+ private transient boolean isReadRepair; + + public void setReadRepair(boolean readRepair) + { + this.isReadRepair = readRepair; + } + + public boolean isReadRepair() + { + return isReadRepair; + } + public Mutation(MutationId id, PartitionUpdate update) { this(id, update.metadata().keyspace, update.partitionKey(), ImmutableMap.of(update.metadata().id, update), approxTime.now(), update.metadata().params.cdc, PotentialTxnConflicts.DISALLOW); @@ -148,7 +163,9 @@ public MutationId id() @Override public Mutation withMutationId(MutationId mutationId) { - return new Mutation(mutationId, keyspaceName, key, modifications, approxCreatedAtNanos, cdcEnabled, potentialTxnConflicts); + Mutation m = new Mutation(mutationId, keyspaceName, key, modifications, approxCreatedAtNanos, cdcEnabled, potentialTxnConflicts); + m.isReadRepair = this.isReadRepair; + return m; } private static boolean cdcEnabled(Iterable modifications) @@ -182,7 +199,9 @@ private static boolean cdcEnabled(Iterable modifications) Map updates = builder.build(); checkState(!updates.isEmpty(), "Updates should not be empty"); - return new Mutation(id, keyspaceName, key, builder.build(), approxCreatedAtNanos, potentialTxnConflicts); + Mutation result = new Mutation(id, keyspaceName, key, builder.build(), approxCreatedAtNanos, potentialTxnConflicts); + result.isReadRepair = this.isReadRepair; + return result; } public @Nullable Mutation without(TableId tableId) diff --git a/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java b/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java index d40359c14472..fc2287a96731 100644 --- a/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java +++ b/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java @@ -27,6 +27,7 @@ public class ReadRepairVerbHandler extends AbstractMutationVerbHandler public void applyMutation(Mutation mutation) { + mutation.setReadRepair(true); mutation.apply(); } diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java 
b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java index a5eedea941da..3ac129d7659f 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java +++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java @@ -63,6 +63,7 @@ import org.apache.cassandra.notifications.TablePreScrubNotification; import org.apache.cassandra.notifications.TruncationNotification; import org.apache.cassandra.replication.ImmutableCoordinatorLogOffsets; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.TimeUUID; @@ -272,7 +273,10 @@ public void updateInitialSSTableSize(Iterable sstables) public void addSSTables(Collection sstables) { - Preconditions.checkState(!cfstore.metadata().replicationType().isTracked()); + // Tracked tables may legitimately use this path during migration from untracked to tracked, + // when incremental repair streams SSTables that were written before tracking was enabled. 
+ Preconditions.checkState(!cfstore.metadata().replicationType().isTracked() + || ClusterMetadata.current().mutationTrackingMigrationState.isMigrating(cfstore.metadata().keyspace)); addSSTablesInternal(sstables, false, true, true); } diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java index 362bf385c639..f0e43054a5fb 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java @@ -53,6 +53,7 @@ import org.apache.cassandra.service.accord.IAccordService; import org.apache.cassandra.service.accord.TimeOnlyRequestBookkeeping.LatencyRequestBookkeeping; import org.apache.cassandra.replication.MutationTrackingService; +import org.apache.cassandra.service.replication.migration.KeyspaceMigrationInfo; import org.apache.cassandra.replication.PendingLocalTransfer; import org.apache.cassandra.streaming.IncomingStream; import org.apache.cassandra.streaming.StreamReceiver; @@ -104,6 +105,19 @@ public CassandraStreamReceiver(ColumnFamilyStore cfs, StreamSession session, Lis this.requiresWritePath = requiresWritePath(cfs); } + /** + * Whether this stream should use the tracked transfer path (pending until activation). + * Returns false during mutation tracking migration for ranges that are still pending, + * since migration repair uses the untracked streaming path for those ranges. 
+ */ + private boolean useTrackedTransferPath() + { + if (!cfs.metadata().replicationType().isTracked() || !session.streamOperation().isTrackable()) + return false; + + return KeyspaceMigrationInfo.shouldUseTrackedTransfers(ClusterMetadata.current(), cfs.getKeyspaceName(), cfs.metadata().id, ranges); + } + public static CassandraStreamReceiver fromReceiver(StreamReceiver receiver) { Preconditions.checkArgument(receiver instanceof CassandraStreamReceiver); @@ -135,7 +149,7 @@ public synchronized void received(IncomingStream stream) sstables.addAll(finished); receivedEntireSSTable = file.isEntireSSTable(); - if (cfs.metadata().replicationType().isTracked() && session.streamOperation().isTrackable()) + if (useTrackedTransferPath()) { PendingLocalTransfer transfer = new PendingLocalTransfer(cfs.metadata().id, session.planId(), sstables); MutationTrackingService.instance().received(transfer); @@ -266,7 +280,7 @@ public void finished() logger.debug("[Stream #{}] Received {} sstables from {} ({})", session.planId(), readers.size(), session.peer, readers); // SSTables involved in a coordinated transfer become live when the transfer is activated - if (cfs.metadata().replicationType().isTracked() && session.streamOperation().isTrackable()) + if (useTrackedTransferPath()) return; if (session.streamOperation() == StreamOperation.BOOTSTRAP) diff --git a/src/java/org/apache/cassandra/dht/NormalizedRanges.java b/src/java/org/apache/cassandra/dht/NormalizedRanges.java index c9a040a6585b..dc91466ba7e8 100644 --- a/src/java/org/apache/cassandra/dht/NormalizedRanges.java +++ b/src/java/org/apache/cassandra/dht/NormalizedRanges.java @@ -134,6 +134,19 @@ public boolean intersects(T token) return isIn; } + /** + * Check if any of these ranges intersect with the given range. 
+ */ + public boolean intersects(Range range) + { + for (Range r : this) + { + if (r.intersects(range)) + return true; + } + return false; + } + public NormalizedRanges subtract(NormalizedRanges b) { if (b.isEmpty()) diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java index 4c64b0f31c7a..dd833965796a 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java @@ -62,6 +62,8 @@ import org.apache.cassandra.replication.ImmutableCoordinatorLogOffsets; import org.apache.cassandra.replication.MutationTrackingService; import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.service.replication.migration.KeyspaceMigrationInfo; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.TimeUUID; @@ -337,16 +339,28 @@ public final void abort() protected Map finalizeMetadata() { - // Reconciliation should not occur before activation for coordinated transfer streams for tracked keyspaces. + // Reconciliation should not occur before activation for coordinated transfer streams for tracked keyspaces. boolean reconcile = txn.opType() != OperationType.STREAM; + // During migration, incremental repair handles repair status for ranges still pending migration. + // Only apply mutation tracking reconciliation for ranges NOT in the migration pending set. + // For SSTables whose range falls within pending migration ranges, IR sets pendingRepair/repairedAt. 
if (metadata().replicationType().isTracked() && repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE && reconcile) { - Preconditions.checkState(Objects.equals(pendingRepair, ActiveRepairService.NO_PENDING_REPAIR)); - if (MutationTrackingService.instance().isDurablyReconciled(coordinatorLogOffsets)) + KeyspaceMigrationInfo migrationInfo = ClusterMetadata.current().mutationTrackingMigrationState.getKeyspaceInfo(metadata().keyspace); + boolean inMigrationPendingRange = migrationInfo != null + && migrationInfo.isRangeInPendingMigration(metadata().id, + first.getToken(), + last.getToken()); + + if (!inMigrationPendingRange) { - repairedAt = Clock.Global.currentTimeMillis(); - logger.debug("Marking SSTable {} as reconciled with repairedAt {}", descriptor, repairedAt); + Preconditions.checkState(Objects.equals(pendingRepair, ActiveRepairService.NO_PENDING_REPAIR)); + if (MutationTrackingService.instance().isDurablyReconciled(coordinatorLogOffsets)) + { + repairedAt = Clock.Global.currentTimeMillis(); + logger.debug("Marking SSTable {} as reconciled with repairedAt {}", descriptor, repairedAt); + } } } diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java index 93c0a68e7390..11b1c2fd7ece 100644 --- a/src/java/org/apache/cassandra/net/Verb.java +++ b/src/java/org/apache/cassandra/net/Verb.java @@ -64,6 +64,8 @@ import org.apache.cassandra.repair.messages.FinalizeCommit; import org.apache.cassandra.repair.messages.FinalizePromise; import org.apache.cassandra.repair.messages.FinalizePropose; +import org.apache.cassandra.repair.messages.MutationTrackingSyncRequest; +import org.apache.cassandra.repair.messages.MutationTrackingSyncResponse; import org.apache.cassandra.repair.messages.PrepareConsistentRequest; import org.apache.cassandra.repair.messages.PrepareConsistentResponse; import org.apache.cassandra.repair.messages.PrepareMessage; @@ -287,8 +289,10 @@ public enum Verb FINALIZE_PROMISE_MSG (111, P1, repairWithBackoffTimeout, 
ANTI_ENTROPY, () -> FinalizePromise.serializer, () -> RepairMessageVerbHandler.instance(), REPAIR_RSP ), FINALIZE_COMMIT_MSG (112, P1, repairWithBackoffTimeout, ANTI_ENTROPY, () -> FinalizeCommit.serializer, () -> RepairMessageVerbHandler.instance(), REPAIR_RSP ), FAILED_SESSION_MSG (113, P1, repairWithBackoffTimeout, ANTI_ENTROPY, () -> FailSession.serializer, () -> RepairMessageVerbHandler.instance(), REPAIR_RSP ), - STATUS_RSP (115, P1, repairTimeout, ANTI_ENTROPY, () -> StatusResponse.serializer, () -> RepairMessageVerbHandler.instance(), REPAIR_RSP ), - STATUS_REQ (114, P1, repairTimeout, ANTI_ENTROPY, () -> StatusRequest.serializer, () -> RepairMessageVerbHandler.instance(), REPAIR_RSP ), + STATUS_RSP (115, P1, repairTimeout, ANTI_ENTROPY, () -> StatusResponse.serializer, () -> RepairMessageVerbHandler.instance(), REPAIR_RSP ), + STATUS_REQ (114, P1, repairTimeout, ANTI_ENTROPY, () -> StatusRequest.serializer, () -> RepairMessageVerbHandler.instance(), REPAIR_RSP ), + MT_SYNC_RSP (117, P1, repairWithBackoffTimeout, REQUEST_RESPONSE, () -> MutationTrackingSyncResponse.serializer, RESPONSE_HANDLER ), + MT_SYNC_REQ (116, P1, repairWithBackoffTimeout, ANTI_ENTROPY, () -> MutationTrackingSyncRequest.serializer, () -> RepairMessageVerbHandler.instance(), MT_SYNC_RSP ), REPLICATION_DONE_RSP (82, P0, rpcTimeout, MISC, () -> NoPayload.serializer, RESPONSE_HANDLER ), REPLICATION_DONE_REQ (22, P0, rpcTimeout, MISC, () -> NoPayload.serializer, () -> ReplicationDoneVerbHandler.instance, REPLICATION_DONE_RSP), diff --git a/src/java/org/apache/cassandra/repair/MutationTrackingIncrementalRepairTask.java b/src/java/org/apache/cassandra/repair/MutationTrackingIncrementalRepairTask.java new file mode 100644 index 000000000000..8e75ca1f2575 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/MutationTrackingIncrementalRepairTask.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.cassandra.concurrent.ExecutorPlus; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.replication.MutationTrackingSyncCoordinator; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.utils.Throwables; +import org.apache.cassandra.utils.TimeUUID; +import org.apache.cassandra.utils.concurrent.AsyncPromise; +import org.apache.cassandra.utils.concurrent.Future; + +import static com.google.common.base.Preconditions.checkState; + +/** Repair task that syncs mutation tracking offsets across replicas */ +public class MutationTrackingIncrementalRepairTask extends AbstractRepairTask +{ + + private final TimeUUID parentSession; + private final RepairCoordinator.NeighborsAndRanges neighborsAndRanges; + private final String[] cfnames; + private final ClusterMetadata metadata; + + protected MutationTrackingIncrementalRepairTask(RepairCoordinator coordinator, + TimeUUID 
parentSession, + RepairCoordinator.NeighborsAndRanges neighborsAndRanges, + String[] cfnames) + { + super(coordinator); + this.parentSession = parentSession; + this.neighborsAndRanges = neighborsAndRanges; + this.cfnames = cfnames; + this.metadata = coordinator.metadata; + } + + @Override + public String name() + { + return "MutationTrackingRepair"; + } + + @Override + public Future performUnsafe(ExecutorPlus executor, Scheduler validationScheduler) + { + List allRanges = neighborsAndRanges.filterCommonRanges(keyspace, cfnames); + checkState(!allRanges.isEmpty(), "No ranges to repair"); + + List syncCoordinators = new ArrayList<>(); + List>> rangeCollections = new ArrayList<>(); + + for (CommonRange commonRange : allRanges) + { + for (Range range : commonRange.ranges) + { + RepairJobDesc desc = new RepairJobDesc(parentSession, TimeUUID.Generator.nextTimeUUID(), + keyspace, "Mutation Tracking Sync", List.of(range)); + MutationTrackingSyncCoordinator syncCoordinator = + new MutationTrackingSyncCoordinator(coordinator.ctx, desc, commonRange.endpoints, metadata); + syncCoordinator.start(); + syncCoordinators.add(syncCoordinator); + rangeCollections.add(List.of(range)); + + logger.info("Started mutation tracking sync for range {}", range); + } + } + + coordinator.notifyProgress("Started mutation tracking sync for " + syncCoordinators.size() + " ranges"); + + AsyncPromise resultPromise = new AsyncPromise<>(); + + executor.execute(() -> { + try + { + waitForSyncCompletion(syncCoordinators, rangeCollections, resultPromise); + } + catch (InterruptedException e) + { + try + { + resultPromise.tryFailure(new RuntimeException("Interrupted waiting for Mutation Tracking sync coordinators to finish", e)); + } + finally + { + Thread.currentThread().interrupt(); + } + } + catch (Exception e) + { + logger.error("Error during mutation tracking repair", e); + resultPromise.tryFailure(e); + } + }); + + return resultPromise; + } + + private void waitForSyncCompletion(List 
syncCoordinators, + List>> rangeCollections, + AsyncPromise resultPromise) throws Exception + { + long deadlineNanos = coordinator.ctx.clock().nanoTime() + TimeUnit.MILLISECONDS.toNanos( + DatabaseDescriptor.getMutationTrackingSyncTimeout(TimeUnit.MILLISECONDS)); + Exception error = null; + for (MutationTrackingSyncCoordinator syncCoordinator : syncCoordinators) + { + long remainingNanos = deadlineNanos - coordinator.ctx.clock().nanoTime(); + try + { + syncCoordinator.awaitCompletion(remainingNanos, TimeUnit.NANOSECONDS); + } + catch (InterruptedException e) + { + try + { + syncCoordinators.forEach(MutationTrackingSyncCoordinator::cancel); + } + finally + { + throw e; + } + } + catch (TimeoutException e) + { + error = Throwables.merge(error, new RuntimeException("Mutation tracking sync timed out", e)); + } + catch (Exception e) + { + error = Throwables.merge(error, e); + } + } + + if (error != null) + { + logger.warn("Mutation tracking sync failed for keyspace {}", keyspace, error); + throw error; + } + + coordinator.notifyProgress("Mutation tracking sync completed for all ranges"); + + List results = new ArrayList<>(); + for (int i = 0; i < rangeCollections.size(); i++) + { + Collection> ranges = rangeCollections.get(i); + results.add(new RepairSessionResult(parentSession, keyspace, ranges, List.of(), false)); + } + resultPromise.trySuccess(CoordinatedRepairResult.create(rangeCollections, results)); + } + + /** + * Determines if this keyspace should use mutation tracking incremental repair. 
+ * Returns true only if: + * - the keyspace currently uses mutation tracking replication (this also covers migration TO tracking, since the schema flag is already flipped by then) + * Returns false during tracked→untracked migration; that direction uses regular incremental repair instead + * + * @param metadata the snapshotted cluster metadata to evaluate against + * @param keyspace the keyspace name to check + */ + public static boolean shouldUseMutationTrackingRepair(ClusterMetadata metadata, String keyspace) + { + KeyspaceMetadata ksm = metadata.schema.maybeGetKeyspaceMetadata(keyspace).orElse(null); + if (ksm == null) + return false; + + // Check if keyspace uses mutation tracking + if (ksm.useMutationTracking()) + return true; + + // For tracked→untracked migration (keyspace is currently untracked but migration is in progress), + // use regular incremental repair instead of MT repair. The MT sync step can't complete for this + // direction because streaming doesn't update mutation tracking offsets, and the keyspace is moving + // away from tracking. Regular incremental repair will sync the data and the RepairJob callback + // handler will still advance the migration state. + // TODO (desired): This is an oversimplification in that depending on which ranges are migrated we might be able to just run MT sync but running IR should also be fine + return false; + } + + /** + * Determines if a mutation tracking migration is in progress for this keyspace. 
+ * Returns true during migration: + * - Migrating TO mutation tracking: need traditional repair to sync pre-migration data + * - Migrating FROM mutation tracking: need traditional repair for post-migration consistency + * + * @param metadata the snapshotted cluster metadata to evaluate against + * @param keyspace the keyspace name to check + */ + public static boolean isMutationTrackingMigrationInProgress(ClusterMetadata metadata, String keyspace) + { + return metadata.mutationTrackingMigrationState.isMigrating(keyspace); + } +} diff --git a/src/java/org/apache/cassandra/repair/RepairCoordinator.java b/src/java/org/apache/cassandra/repair/RepairCoordinator.java index b511d081c984..3035d2a913f1 100644 --- a/src/java/org/apache/cassandra/repair/RepairCoordinator.java +++ b/src/java/org/apache/cassandra/repair/RepairCoordinator.java @@ -53,6 +53,7 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.NormalizedRanges; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.RepairException; @@ -67,12 +68,14 @@ import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.repair.state.CoordinatorState; import org.apache.cassandra.repair.state.ParticipateState; +import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.SystemDistributedKeyspace; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus; import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.replication.migration.KeyspaceMigrationInfo; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.tcm.ClusterMetadata; import 
org.apache.cassandra.tcm.Epoch; @@ -86,6 +89,7 @@ import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.TimeUUID; import org.apache.cassandra.utils.WrappedRunnable; +import org.apache.cassandra.utils.concurrent.AsyncPromise; import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.progress.ProgressEvent; import org.apache.cassandra.utils.progress.ProgressEventNotifier; @@ -113,20 +117,70 @@ public class RepairCoordinator implements Runnable, ProgressEventNotifier, Repai final SharedContext ctx; final Scheduler validationScheduler; + // Mutation tracking decision, snapshotted once at creation time from TCM + final boolean useMutationTracking; + final boolean mutationTrackingMigrationInProgress; + final ClusterMetadata metadata; + private TraceState traceState; - public RepairCoordinator(StorageService storageService, int cmd, RepairOption options, String keyspace, Epoch minEpoch) + /** + * Creates a RepairCoordinator, snapshotting TCM state to decide whether mutation tracking + * should be used. If mutation tracking is active for the keyspace (and no migration is in progress), + * the incremental flag in RepairOption is flipped to false to prevent anti-compaction. 
+ */ + public static RepairCoordinator create(StorageService storageService, int cmd, RepairOption options, String keyspace, Epoch minEpoch) { - this(SharedContext.Global.instance, - (ks, tables) -> storageService.getValidColumnFamilies(false, false, ks, tables), - storageService::getLocalReplicas, - cmd, options, keyspace, minEpoch); + ClusterMetadata metadata = ClusterMetadata.current(); + boolean useMT = options.isIncremental() + && MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(metadata, keyspace); + boolean mtMigration = useMT + && MutationTrackingIncrementalRepairTask.isMutationTrackingMigrationInProgress(metadata, keyspace); + + // If using mutation tracking without migration, flip incremental to false + // to prevent anti-compaction since mutation tracking manages marking tables repaired itself + if (useMT && !mtMigration) + { + logger.info("Keyspace {} uses mutation tracking; disabling incremental repair to skip anti-compaction", keyspace); + options = options.withIncremental(false); + } + + // During migration, validate that repair ranges don't partially overlap with pending migration ranges. + // Ranges must be entirely inside or entirely outside the pending set so that a compatible repair + // behavior (MT vs normal IR) can be selected. + if (mtMigration) + { + KeyspaceMigrationInfo migrationInfo = metadata.mutationTrackingMigrationState.getKeyspaceInfo(keyspace); + if (migrationInfo != null) + { + NormalizedRanges repairRanges = NormalizedRanges.normalizedRanges(options.getRanges()); + KeyspaceMetadata ksm = metadata.schema.getKeyspaceMetadata(keyspace); + Collection cfs = options.getColumnFamilies(); + migrationInfo.assertRangesNotMixedMigration(repairRanges, ksm, cfs.isEmpty() ? 
null : cfs); + } + } + + return new RepairCoordinator(SharedContext.Global.instance, + (ks, tables) -> storageService.getValidColumnFamilies(false, false, ks, tables), + storageService::getLocalReplicas, + cmd, options, keyspace, minEpoch, + useMT, mtMigration, metadata); } RepairCoordinator(SharedContext ctx, BiFunction> validColumnFamilies, Function getLocalReplicas, int cmd, RepairOption options, String keyspace, Epoch minEpoch) + { + this(ctx, validColumnFamilies, getLocalReplicas, cmd, options, keyspace, minEpoch, false, false, null); + } + + RepairCoordinator(SharedContext ctx, + BiFunction> validColumnFamilies, + Function getLocalReplicas, + int cmd, RepairOption options, String keyspace, Epoch minEpoch, + boolean useMutationTracking, boolean mutationTrackingMigrationInProgress, + ClusterMetadata metadata) { this.ctx = ctx; this.minEpoch = minEpoch; @@ -135,6 +189,9 @@ public RepairCoordinator(StorageService storageService, int cmd, RepairOption op this.tag = "repair:" + cmd; this.validColumnFamilies = validColumnFamilies; this.getLocalReplicas = getLocalReplicas; + this.useMutationTracking = useMutationTracking; + this.mutationTrackingMigrationInProgress = mutationTrackingMigrationInProgress; + this.metadata = metadata; ctx.repair().register(state); } @@ -496,24 +553,57 @@ private Future prepare(List columnFamilies, Set>> repair(String[] cfnames, NeighborsAndRanges neighborsAndRanges) { - RepairTask task; + ExecutorPlus executor = createExecutor(); + state.phase.repairSubmitted(); + if (state.options.isPreview()) { - task = new PreviewRepairTask(this, state.id, neighborsAndRanges.filterCommonRanges(state.keyspace, cfnames), neighborsAndRanges.shouldExcludeDeadParticipants, cfnames); + RepairTask task = new PreviewRepairTask(this, state.id, neighborsAndRanges.filterCommonRanges(state.keyspace, cfnames), neighborsAndRanges.shouldExcludeDeadParticipants, cfnames); + return submitRepairTask(task, executor); + } + else if (useMutationTracking) + { + RepairTask 
mtTask = new MutationTrackingIncrementalRepairTask(this, state.id, neighborsAndRanges, cfnames); + if (mutationTrackingMigrationInProgress) + { + // During migration, run incremental repair first, then mutation tracking sync. + // Propagate the IR result on success since it drives migration advancement. + RepairTask incrementalTask = new IncrementalRepairTask(this, state.id, neighborsAndRanges, cfnames); + AsyncPromise>> result = new AsyncPromise<>(); + logger.info("Migration to mutation tracking in progress for {}; running incremental repair before MT sync", state.keyspace); + incrementalTask.perform(executor, validationScheduler).addCallback( + irResult -> { + logger.info("Incremental repair completed for migration keyspace {}: hasFailed={}", state.keyspace, irResult.hasFailed()); + Pair> irPair = Pair.create(irResult, incrementalTask::successMessage); + mtTask.perform(executor, validationScheduler) + .addCallback( + mtResult -> result.trySuccess(irPair), + result::tryFailure); + }, + failure -> { + logger.warn("Incremental repair FAILED for migration keyspace {}", state.keyspace, failure); + result.tryFailure(failure); + } + ); + return result.addCallback((s, f) -> executor.shutdown()); + } + return submitRepairTask(mtTask, executor); } else if (state.options.isIncremental()) { - task = new IncrementalRepairTask(this, state.id, neighborsAndRanges, cfnames); + RepairTask task = new IncrementalRepairTask(this, state.id, neighborsAndRanges, cfnames); + return submitRepairTask(task, executor); } else { - task = new NormalRepairTask(this, state.id, neighborsAndRanges.filterCommonRanges(state.keyspace, cfnames), neighborsAndRanges.shouldExcludeDeadParticipants, cfnames); + RepairTask task = new NormalRepairTask(this, state.id, neighborsAndRanges.filterCommonRanges(state.keyspace, cfnames), neighborsAndRanges.shouldExcludeDeadParticipants, cfnames); + return submitRepairTask(task, executor); } + } - ExecutorPlus executor = createExecutor(); - 
state.phase.repairSubmitted(); + private Future>> submitRepairTask(RepairTask task, ExecutorPlus executor) + { return task.perform(executor, validationScheduler) - // after adding the callback java could no longer infer the type... .>>map(r -> Pair.create(r, task::successMessage)) .addCallback((s, f) -> executor.shutdown()); } diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index 0a0411a9824c..96ed70442efe 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -57,6 +57,7 @@ import org.apache.cassandra.service.accord.repair.AccordRepair; import org.apache.cassandra.service.accord.repair.AccordRepair.AccordRepairResult; import org.apache.cassandra.service.consensus.migration.ConsensusMigrationRepairResult; +import org.apache.cassandra.service.replication.migration.KeyspaceMigrationInfo; import org.apache.cassandra.service.replication.migration.MutationTrackingMigrationRepairResult; import org.apache.cassandra.service.paxos.cleanup.PaxosCleanup; import org.apache.cassandra.service.paxos.cleanup.PaxosUpdateLowBallot; @@ -261,7 +262,7 @@ protected void runRepair() .flatMap(this::executeTasks, taskExecutor); // For tracked keyspaces, we need to ensure sync'd data is present in the log - boolean isTracked = cfs.metadata().replicationType().isTracked(); + boolean isTracked = useTrackedTransfers(); if (isTracked) syncResults = TransferTrackingService.instance().onRepairSyncCompletion(this, syncResults, taskExecutor); } @@ -347,7 +348,7 @@ private Future createSyncTasks(Future accordRepai else syncTasks = createStandardSyncTasks(trees); - return ks.getMetadata().params.replicationType.isTracked() + return useTrackedTransfers() ? 
SyncTasks.tracked(ks, syncTasks) : SyncTasks.untracked(syncTasks); }, taskExecutor); @@ -369,6 +370,23 @@ private boolean isMetadataKeyspace() return desc.keyspace.equals(METADATA_KEYSPACE_NAME); } + /** + * Whether tracked repair transfers should be used for this repair job. + * Returns true only when the keyspace uses tracked replication AND the repair ranges + * for this table have completed migration (or no migration is in progress). + * During migration, ranges still in the pending set use the traditional untracked + * streaming path because: + * - The data being streamed is pre-migration data without mutation tracking offsets + * - TrackedRepairTransfer does not support --force (dead node exclusion) + */ + private boolean useTrackedTransfers() + { + if (!cfs.metadata().replicationType().isTracked()) + return false; + + return KeyspaceMigrationInfo.shouldUseTrackedTransfers(ClusterMetadata.current(), desc.keyspace, cfs.metadata().id, desc.ranges); + } + private boolean isTransient(InetAddressAndPort ep) { return session.state.commonRange.transEndpoints.contains(ep); @@ -465,7 +483,7 @@ Future> executeTasks(SyncTasks tasks) if (!tasks.isEmpty()) state.phase.streamSubmitted(); - if (cfs.metadata().replicationType().isTracked()) + if (useTrackedTransfers()) TransferTrackingService.instance().onRepairSyncExecution(tasks); for (SyncTask task : tasks) diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index a0ed59d97913..effdaf4655ef 100644 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeoutException; import java.util.function.BiFunction; import java.util.function.Function; @@ -29,11 +30,19 @@ import org.apache.cassandra.db.ColumnFamilyStore; import 
org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; + +import org.apache.cassandra.replication.CoordinatorLogId; +import org.apache.cassandra.replication.MutationTrackingService; +import org.apache.cassandra.replication.Offsets; import org.apache.cassandra.repair.messages.CleanupMessage; import org.apache.cassandra.repair.messages.FailSession; +import org.apache.cassandra.repair.messages.MutationTrackingSyncRequest; +import org.apache.cassandra.repair.messages.MutationTrackingSyncResponse; import org.apache.cassandra.repair.messages.PrepareMessage; import org.apache.cassandra.repair.messages.RepairMessage; import org.apache.cassandra.repair.messages.StatusRequest; @@ -381,6 +390,10 @@ public void doVerb(final Message message) ctx.repair().consistent.local.handleStatusResponse(message.from(), (StatusResponse) message.payload); break; + case MT_SYNC_REQ: + handleMutationTrackingSyncRequest(message); + break; + default: ctx.repair().handleMessage(message); break; @@ -489,4 +502,29 @@ private static boolean acceptMessage(final ValidationRequest validationRequest, "validation request", from); } + + @SuppressWarnings("unchecked") + private void handleMutationTrackingSyncRequest(Message message) + { + MutationTrackingSyncRequest request = (MutationTrackingSyncRequest) message.payload; + RepairJobDesc desc = request.desc; + logger.debug("Handling mutation tracking sync request {}", desc); + + try + { + Map, Map> offsets = + MutationTrackingService.instance().collectWitnessedOffsetsForRanges(desc.keyspace, desc.ranges, request.liveHostIds); + + MutationTrackingSyncResponse response = new MutationTrackingSyncResponse( + desc, + offsets); + + ctx.messaging().send(message.responseWith(response), message.from()); + } + catch (Exception e) + { + logger.error("Failed to handle 
mutation tracking sync request {}", desc, e); + sendFailureResponse(message); + } + } } diff --git a/src/java/org/apache/cassandra/repair/RepairResult.java b/src/java/org/apache/cassandra/repair/RepairResult.java index d3a8a71d40f0..ce5964dc1ee7 100644 --- a/src/java/org/apache/cassandra/repair/RepairResult.java +++ b/src/java/org/apache/cassandra/repair/RepairResult.java @@ -22,6 +22,8 @@ import org.apache.cassandra.service.consensus.migration.ConsensusMigrationRepairResult; import org.apache.cassandra.service.replication.migration.MutationTrackingMigrationRepairResult; +import javax.annotation.Nullable; + /** * RepairJob's result */ @@ -29,7 +31,9 @@ public class RepairResult { public final RepairJobDesc desc; public final List stats; + @Nullable public final ConsensusMigrationRepairResult consensusMigrationRepairResult; + @Nullable public final MutationTrackingMigrationRepairResult mutationTrackingMigrationRepairResult; public RepairResult(RepairJobDesc desc, List stats, ConsensusMigrationRepairResult consensusMigrationRepairResult, MutationTrackingMigrationRepairResult mutationTrackingMigrationRepairResult) diff --git a/src/java/org/apache/cassandra/repair/autorepair/AutoRepairState.java b/src/java/org/apache/cassandra/repair/autorepair/AutoRepairState.java index a5c4fdc2c16f..e89a3a40d88e 100644 --- a/src/java/org/apache/cassandra/repair/autorepair/AutoRepairState.java +++ b/src/java/org/apache/cassandra/repair/autorepair/AutoRepairState.java @@ -98,8 +98,8 @@ protected AutoRepairState(RepairType repairType) protected RepairCoordinator getRepairRunnable(String keyspace, RepairOption options) { - return new RepairCoordinator(StorageService.instance, StorageService.nextRepairCommand.incrementAndGet(), - options, keyspace, ClusterMetadata.current().epoch); + return RepairCoordinator.create(StorageService.instance, StorageService.nextRepairCommand.incrementAndGet(), + options, keyspace, ClusterMetadata.current().epoch); } public long getLastRepairTime() diff --git 
a/src/java/org/apache/cassandra/repair/messages/MutationTrackingSyncRequest.java b/src/java/org/apache/cassandra/repair/messages/MutationTrackingSyncRequest.java new file mode 100644 index 000000000000..b6c2bca86bcb --- /dev/null +++ b/src/java/org/apache/cassandra/repair/messages/MutationTrackingSyncRequest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair.messages; + +import java.io.IOException; +import java.util.Set; + +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.repair.RepairJobDesc; + +import static org.apache.cassandra.utils.CollectionSerializers.nullableIntSetSerializer; + +/** + * Request sent from the mutation tracking repair coordinator to each participant to collect + * their current witnessed offsets. This establishes a happens-before relationship: the + * participant's response contains offsets captured after receiving this request, which is + * sent after the repair starts. + *

+ * The liveHostIds set tells the responder which hosts are participating in this repair, + * so that the response only includes offsets witnessed by those hosts. This prevents the + * coordinator from setting sync targets that include offsets only known to down nodes. + */ +public class MutationTrackingSyncRequest extends RepairMessage +{ + /** The set of host IDs participating in this repair. Null means all replicas. */ + public final Set liveHostIds; + + public MutationTrackingSyncRequest(RepairJobDesc desc, Set liveHostIds) + { + super(desc); + this.liveHostIds = liveHostIds; + } + + @Override + public String toString() + { + return "MutationTrackingSyncRequest{" + + "desc=" + desc + + ", liveHostIds=" + liveHostIds + + '}'; + } + + public static final IVersionedSerializer serializer = new IVersionedSerializer<>() + { + public void serialize(MutationTrackingSyncRequest request, DataOutputPlus out, int version) throws IOException + { + RepairJobDesc.serializer.serialize(request.desc, out, version); + nullableIntSetSerializer.serialize(request.liveHostIds, out); + } + + public MutationTrackingSyncRequest deserialize(DataInputPlus in, int version) throws IOException + { + RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version); + Set liveHostIds = nullableIntSetSerializer.deserialize(in); + return new MutationTrackingSyncRequest(desc, liveHostIds); + } + + public long serializedSize(MutationTrackingSyncRequest request, int version) + { + return RepairJobDesc.serializer.serializedSize(request.desc, version) + + nullableIntSetSerializer.serializedSize(request.liveHostIds); + } + }; +} diff --git a/src/java/org/apache/cassandra/repair/messages/MutationTrackingSyncResponse.java b/src/java/org/apache/cassandra/repair/messages/MutationTrackingSyncResponse.java new file mode 100644 index 000000000000..d0cc9b277e85 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/messages/MutationTrackingSyncResponse.java @@ -0,0 +1,110 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair.messages; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.replication.CoordinatorLogId; +import org.apache.cassandra.replication.Offsets; +import org.apache.cassandra.repair.RepairJobDesc; +import org.apache.cassandra.utils.CollectionSerializers; + +/** + * Response from a participant to a {@link MutationTrackingSyncRequest}. Contains the + * participant's current witnessed offsets for each shard overlapping the requested ranges. + * These offsets are captured after the request is received, establishing a happens-before + * relationship with the repair start. 
+ */ +public class MutationTrackingSyncResponse extends RepairMessage +{ + /** Per-shard witnessed offsets: shard range -> (logId -> offsets) */ + public final Map, Map> offsetsByShard; + + public MutationTrackingSyncResponse(RepairJobDesc desc, + Map, Map> offsetsByShard) + { + super(desc); + Objects.requireNonNull(offsetsByShard); + this.offsetsByShard = offsetsByShard; + } + + @Override + public boolean equals(Object o) + { + if (!(o instanceof MutationTrackingSyncResponse)) + return false; + MutationTrackingSyncResponse other = (MutationTrackingSyncResponse) o; + return Objects.equals(desc, other.desc) + && Objects.equals(offsetsByShard, other.offsetsByShard); + } + + @Override + public int hashCode() + { + return Objects.hash(desc, offsetsByShard); + } + + @Override + public String toString() + { + return "MutationTrackingSyncResponse{" + + "desc=" + desc + + ", shardCount=" + offsetsByShard.size() + + '}'; + } + + private static final IVersionedSerializer> offsetsMapSerializer = + CollectionSerializers.newMapSerializer(CoordinatorLogId.serializer, Offsets.serializer); + + @SuppressWarnings("unchecked") + public static final IVersionedSerializer serializer = new IVersionedSerializer<>() + { + public void serialize(MutationTrackingSyncResponse response, DataOutputPlus out, int version) throws IOException + { + RepairJobDesc.serializer.serialize(response.desc, out, version); + CollectionSerializers.serializeMap((Map, Map>) (Map) response.offsetsByShard, + out, version, Range.tokenSerializer, offsetsMapSerializer); + } + + public MutationTrackingSyncResponse deserialize(DataInputPlus in, int version) throws IOException + { + RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version); + Map, Map> raw = + CollectionSerializers.deserializeMap(in, version, Range.tokenSerializer, offsetsMapSerializer); + Map, Map> offsetsByShard = + (Map, Map>) (Map) raw; + return new MutationTrackingSyncResponse(desc, offsetsByShard); + } + + public long 
serializedSize(MutationTrackingSyncResponse response, int version) + { + long size = RepairJobDesc.serializer.serializedSize(response.desc, version); + size += CollectionSerializers.serializedMapSize((Map, Map>) (Map) response.offsetsByShard, + version, Range.tokenSerializer, offsetsMapSerializer); + return size; + } + }; +} diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java index c615bf9f52f3..59dbb9ba5bd5 100644 --- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java +++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java @@ -93,6 +93,8 @@ public void onFailure(InetAddressAndPort from, RequestFailure failure) map.put(Verb.FINALIZE_PROMISE_MSG, SUPPORTS_RETRY); map.put(Verb.FINALIZE_COMMIT_MSG, SUPPORTS_RETRY); map.put(Verb.FAILED_SESSION_MSG, SUPPORTS_RETRY); + // Mutation tracking messages + map.put(Verb.MT_SYNC_REQ, SUPPORTS_RETRY); VERB_TIMEOUT_VERSIONS = Collections.unmodifiableMap(map); EnumSet allowsRetry = EnumSet.noneOf(Verb.class); @@ -110,6 +112,8 @@ public void onFailure(InetAddressAndPort from, RequestFailure failure) allowsRetry.add(Verb.FINALIZE_PROMISE_MSG); allowsRetry.add(Verb.FINALIZE_COMMIT_MSG); allowsRetry.add(Verb.FAILED_SESSION_MSG); + // Mutation tracking messages + allowsRetry.add(Verb.MT_SYNC_REQ); ALLOWS_RETRY = Collections.unmodifiableSet(allowsRetry); } diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java index 3c8a260164d1..fa77d3ce7fcb 100644 --- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java +++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java @@ -494,6 +494,25 @@ public String toString() ')'; } + /** + * Returns a new RepairOption with the incremental flag set to the given value. + * All other options are copied from this instance. 
+ */ + public RepairOption withIncremental(boolean incremental) + { + if (this.incremental == incremental) + return this; + + RepairOption copy = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, + ranges, pullRepair, forceRepair, previewKind, optimiseStreams, + ignoreUnreplicatedKeyspaces, repairData, repairPaxos, + dontPurgeTombstones, repairAccord); + copy.getColumnFamilies().addAll(columnFamilies); + copy.getDataCenters().addAll(dataCenters); + copy.getHosts().addAll(hosts); + return copy; + } + public Map asMap() { Map options = new HashMap<>(); diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java b/src/java/org/apache/cassandra/replication/CoordinatorLog.java index 7db405929601..4ca4949620b1 100644 --- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java +++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java @@ -284,6 +284,76 @@ Offsets.Immutable collectReconciledOffsets() } } + /** + * Returns the UNION of all witnessed offsets from all participants. + * This represents all offsets that ANY replica has witnessed. + */ + Offsets.Immutable collectUnionOfWitnessedOffsets() + { + lock.readLock().lock(); + try + { + return Offsets.Immutable.copy(witnessedOffsets.union()); + } + finally + { + lock.readLock().unlock(); + } + } + + /** + * Returns the UNION of witnessed offsets scoped to only the specified host IDs. + */ + Offsets.Immutable collectUnionOfWitnessedOffsets(Set liveHostIds) + { + Offsets.Mutable union = new Offsets.Mutable(logId); + lock.readLock().lock(); + try + { + for (int hostId : liveHostIds) + { + if (!participants.contains(hostId)) + continue; + + Offsets.Mutable nodeOffsets = witnessedOffsets.get(hostId); + union.addAll(nodeOffsets); + } + } + finally + { + lock.readLock().unlock(); + } + return Offsets.Immutable.copy(union); + } + + /** + * Returns the intersection of witnessed offsets scoped to only the specified host IDs. 
+ */ + Offsets.Immutable collectReconciledOffsets(Set liveHostIds) + { + lock.readLock().lock(); + try + { + Offsets.Mutable intersection = null; + for (int hostId : liveHostIds) + { + if (!participants.contains(hostId)) + continue; + + Offsets.Mutable nodeOffsets = witnessedOffsets.get(hostId); + if (intersection == null) + intersection = Offsets.Mutable.copy(nodeOffsets); + else + intersection = Offsets.Mutable.intersection(intersection, nodeOffsets); + } + return intersection == null ? new Offsets.Immutable(logId) : Offsets.Immutable.copy(intersection); + } + finally + { + lock.readLock().unlock(); + } + } + public long getUnreconciledCount() { lock.readLock().lock(); diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingService.java b/src/java/org/apache/cassandra/replication/MutationTrackingService.java index e890b9c0345b..372a09806a64 100644 --- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java +++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java @@ -192,6 +192,8 @@ public static void shutdown() throws InterruptedException private final IncomingMutations incomingMutations = new IncomingMutations(); private final OutgoingMutations outgoingMutations = new OutgoingMutations(); + private final Map> syncCoordinatorsByKeyspace = new ConcurrentHashMap<>(); + private volatile boolean started = false; private MutationTrackingService() @@ -400,6 +402,19 @@ public void updateReplicatedOffsets(String keyspace, Range range, List coordinators = syncCoordinatorsByKeyspace.get(keyspace); + if (coordinators != null) + { + for (MutationTrackingSyncCoordinator coordinator : coordinators) + { + if (range.intersects(coordinator.getRange())) + { + coordinator.onOffsetsReceived(); + } + } + } } public void recordFullyReconciledOffsets(ReconciledLogSnapshot reconciledSnapshot) @@ -457,6 +472,30 @@ public boolean registerMutationCallback(ShortMutationId mutationId, IncomingMuta return 
incomingMutations.subscribe(mutationId, callback); } + /** + * Register a sync coordinator to be notified when offset updates arrive. + */ + public void registerSyncCoordinator(MutationTrackingSyncCoordinator coordinator) + { + syncCoordinatorsByKeyspace.computeIfAbsent(coordinator.getKeyspace(), k -> ConcurrentHashMap.newKeySet()) + .add(coordinator); + } + + /** + * Unregister a sync coordinator. + */ + public void unregisterSyncCoordinator(MutationTrackingSyncCoordinator coordinator) + { + Set coordinators = syncCoordinatorsByKeyspace.get(coordinator.getKeyspace()); + if (coordinators != null) + { + coordinators.remove(coordinator); + + if (coordinators.isEmpty()) + syncCoordinatorsByKeyspace.remove(coordinator.getKeyspace(), coordinators); + } + } + public void executeTransfers(String keyspace, Set sstables, ConsistencyLevel cl) { shardLock.readLock().lock(); @@ -614,6 +653,58 @@ public Iterable getShards() return shards; } + public void forEachShardInKeyspace(String keyspace, Consumer consumer) + { + shardLock.readLock().lock(); + try + { + KeyspaceShards ksShards = keyspaceShards.get(keyspace); + if (ksShards != null) + ksShards.forEachShard(consumer); + } + finally + { + shardLock.readLock().unlock(); + } + } + + /** + * Collects the union of witnessed offsets for all shards in the given keyspace that overlap + * with the specified ranges. Used by the mutation tracking repair protocol to establish + * a happens-before relationship. 
+ * + * @param keyspace the keyspace to collect offsets for + * @param ranges the token ranges to find overlapping shards for + * @return a map from shard range to the union of witnessed offsets per coordinator log + */ + public Map, Map> collectWitnessedOffsetsForRanges(String keyspace, Collection> ranges, Set liveHostIds) + { + Map, Map> result = new HashMap<>(); + shardLock.readLock().lock(); + try + { + KeyspaceShards ksShards = keyspaceShards.get(keyspace); + if (ksShards != null) + { + ksShards.forEachShard(shard -> { + for (Range range : ranges) + { + if (shard.range.intersects(range)) + { + result.put(shard.range, shard.collectUnionOfWitnessedOffsetsPerLog(liveHostIds)); + break; + } + } + }); + } + } + finally + { + shardLock.readLock().unlock(); + } + return result; + } + public void collectLocallyMissingMutations(MutationSummary remoteSummary, Log2OffsetsMap.Mutable into) { shardLock.readLock().lock(); @@ -983,7 +1074,7 @@ static UpdateDecision decisionForTopologyChange(String keyspace, ClusterMetadata } if (prevKsm == null) - return nextKsm.useMutationTracking() ? UpdateDecision.CREATE : UpdateDecision.NONE; + return nextKsm.useMutationTracking() ? (hasExisting ? UpdateDecision.REPLICA_GROUP : UpdateDecision.CREATE) : UpdateDecision.NONE; if (nextKsm == null) return prevKsm.useMutationTracking() ? 
UpdateDecision.DROP : UpdateDecision.NONE; @@ -1058,7 +1149,7 @@ private static Set> splitRange(Range range) static KeyspaceShards make(KeyspaceMetadata keyspace, ClusterMetadata cluster, LongSupplier logIdProvider, BiConsumer onNewLog) { - Preconditions.checkArgument(keyspace.params.replicationType.isTracked() || cluster.mutationTrackingMigrationState.getKeyspaceInfo(keyspace.name) != null); + Preconditions.checkArgument(keyspace.params.replicationType.isTracked() || cluster.mutationTrackingMigrationState.isMigrating(keyspace.name)); Map, Shard> shards = new HashMap<>(); Map, VersionedEndpoints.ForRange> groups = new HashMap<>(); diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java b/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java new file mode 100644 index 000000000000..94bf30da0d6b --- /dev/null +++ b/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.replication; + +import java.util.ArrayList; +import java.util.List; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.RequestFailure; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.RequestCallback; +import org.apache.cassandra.net.Verb; +import org.apache.cassandra.repair.RepairJobDesc; +import org.apache.cassandra.repair.SharedContext; +import org.apache.cassandra.repair.messages.MutationTrackingSyncRequest; +import org.apache.cassandra.repair.messages.MutationTrackingSyncResponse; +import org.apache.cassandra.repair.messages.RepairMessage; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.utils.concurrent.AsyncPromise; + +import static com.google.common.base.Preconditions.checkState; + +public class MutationTrackingSyncCoordinator +{ + private static final Logger logger = LoggerFactory.getLogger(MutationTrackingSyncCoordinator.class); + + private final SharedContext ctx; + private final RepairJobDesc desc; + private final String keyspace; + private final Range range; + private final Set participants; + private final AsyncPromise completionFuture = new AsyncPromise<>(); + + // Per-shard state: tracks what each node has reported for that shard + private final Map, ShardSyncState> shardStates = new HashMap<>(); + + // Host IDs of participants for scoped offset collection/completion. + // Null means all shard participants (no filtering). 
+ private final Set liveHostIds; + + private final AtomicBoolean started = new AtomicBoolean(false); + + // Remote participants we are waiting for sync responses from. Completion is + // not possible until all responses have been received, since remote nodes may + // report targets that the local node doesn't know about yet. + private final Set pendingSyncResponses = ConcurrentHashMap.newKeySet(); + + /** + * @param ctx shared context + * @param desc repair job descriptor + * @param participants the set of remote endpoints that should participate in this sync, + * as determined by the repair options (force, specific hosts). + * Only these endpoints will receive sync requests. If null, + * all remote replicas for overlapping shards will participate. + * @param metadata the snapshotted cluster metadata used to resolve endpoint-to-host-ID mappings + */ + public MutationTrackingSyncCoordinator(SharedContext ctx, RepairJobDesc desc, Set participants, ClusterMetadata metadata) + { + this.ctx = ctx; + this.desc = desc; + this.keyspace = desc.keyspace; + this.range = Iterables.getOnlyElement(desc.ranges); + this.participants = participants; + + // Convert participant endpoints to host IDs for scoped completion checks. + // If participants is null (no filtering), all shard participants are live. 
+ if (participants != null) + { + ImmutableSet.Builder builder = ImmutableSet.builder(); + for (InetAddressAndPort ep : participants) + { + builder.add(metadata.directory.peerId(ep).id()); + } + // Always include the local node + builder.add(metadata.directory.peerId(ctx.broadcastAddressAndPort()).id()); + liveHostIds = builder.build(); + } + else + { + liveHostIds = null; + } + } + + public void start() + { + if (!started.compareAndSet(false, true)) + throw new IllegalStateException("Sync coordinator already started"); + + List overlappingShards = new ArrayList<>(); + MutationTrackingService.instance().forEachShardInKeyspace(keyspace, shard -> { + if (shard.range.intersects(range)) + overlappingShards.add(shard); + }); + + checkState(!overlappingShards.isEmpty(), "No intersecting shards found for keyspace {} range {}", keyspace, range); + + for (Shard shard : overlappingShards) + { + ShardSyncState state = new ShardSyncState(shard, liveHostIds); + shardStates.put(shard.range, state); + } + + // Register to receive offset updates + MutationTrackingService.instance().registerSyncCoordinator(this); + + // Capture local targets + captureTargets(); + + logger.info("Sync coordinator started for keyspace {} range {}, tracking {} shards", + keyspace, range, overlappingShards.size()); + + // Send sync requests to all remote participants + sendSyncRequests(); + + // Check if already complete (e.g. single node, no targets) + checkIfReadyToComplete(); + } + + private void complete() + { + if (completionFuture.trySuccess(null)) + MutationTrackingService.instance().unregisterSyncCoordinator(this); + } + + private void sendSyncRequests() + { + MutationTrackingSyncRequest request = new MutationTrackingSyncRequest(desc, liveHostIds); + // Collect remote replicas, filtering to only allowed participants if specified. + // This respects --force (which excludes dead nodes) and --hosts (which + // restricts to specific nodes). 
+ Set remoteParticipants = ConcurrentHashMap.newKeySet(); + for (ShardSyncState state : shardStates.values()) + remoteParticipants.addAll(state.shard.remoteReplicas()); + + if (participants != null) + remoteParticipants.retainAll(participants); + + pendingSyncResponses.addAll(remoteParticipants); + + for (InetAddressAndPort participant : remoteParticipants) + { + logger.debug("Sending mutation tracking sync request to {} for {}", participant, desc); + + RepairMessage.sendMessageWithRetries(ctx, + RepairMessage.notDone(completionFuture), + request, + Verb.MT_SYNC_REQ, + participant, + new RequestCallback() + { + @Override + public void onResponse(Message msg) + { + onSyncResponse(msg.from(), msg.payload); + } + + @Override + public void onFailure(InetAddressAndPort from, RequestFailure failure) + { + fail(new RuntimeException( + String.format("Mutation tracking sync failed: participant %s returned failure %s", from, failure.reason))); + } + + @Override + public boolean invokeOnFailure() + { + return true; + } + }); + } + } + + private void captureTargets() + { + checkState(!completionFuture.isDone()); + checkForTopologyChange(); + + for (ShardSyncState state : shardStates.values()) + { + state.captureTargets(); + } + } + + /** + * Checks if any of the shards we're tracking have changed due to topology updates. + * If a change is detected, fails the repair via {@link #fail(Throwable)}. 
+ */ + private void checkForTopologyChange() + { + for (ShardSyncState state : shardStates.values()) + { + Shard currentShard = getCurrentShard(state.shard.range); + if (currentShard != state.shard) + { + fail(new RuntimeException("Repair failed: topology changed during sync")); + return; + } + } + } + + private Shard getCurrentShard(Range shardRange) + { + Shard[] result = new Shard[1]; + MutationTrackingService.instance().forEachShardInKeyspace(keyspace, shard -> { + if (shard.range.equals(shardRange)) + result[0] = shard; + }); + return result[0]; + } + + private void fail(Throwable cause) + { + if (completionFuture.tryFailure(cause)) + { + logger.warn("Sync coordinator for keyspace {} range {} failed: {}", + keyspace, range, cause.getMessage()); + MutationTrackingService.instance().unregisterSyncCoordinator(this); + } + } + + /** + * Check if all targets are reconciled across all shards. + */ + private void checkIfReadyToComplete() + { + if (completionFuture.isDone()) + return; + checkForTopologyChange(); + + if (checkIfComplete()) + { + logger.info("Sync coordinator completed for keyspace {} range {}", keyspace, range); + complete(); + } + } + + private boolean checkIfComplete() + { + if (completionFuture.isDone()) + return true; + + if (!pendingSyncResponses.isEmpty()) + return false; + + for (ShardSyncState state : shardStates.values()) + { + if (!state.isComplete()) + return false; + } + return true; + } + + /** + * Called when offset updates are received from a participant. + */ + public void onOffsetsReceived() + { + if (completionFuture.isDone()) + return; + + checkIfReadyToComplete(); + } + + /** + * Called when a sync response is received from a participant in response to a + * MutationTrackingSyncRequest. Updates the shard targets with the offsets from the + * response, establishing a happens-before relationship with the repair start. 
+ *
+ * @param from the participant that sent the response
+ * @param response the sync response from a participant
+ */
+ public void onSyncResponse(InetAddressAndPort from, MutationTrackingSyncResponse response)
+ {
+ if (completionFuture.isDone())
+ return;
+
+ // Deduplicate: retries of MT_SYNC_REQ can produce multiple responses from the
+ // same participant. Only process the first one.
+ if (!pendingSyncResponses.remove(from))
+ return;
+
+ // Update shard targets with the offsets received from the participant
+ for (Map.Entry, Map> entry : response.offsetsByShard.entrySet())
+ {
+ Range shardRange = entry.getKey();
+ ShardSyncState state = shardStates.get(shardRange);
+ if (state != null)
+ {
+ state.targets.putAll(entry.getValue());
+ }
+ }
+
+ logger.trace("Sync coordinator received sync response from {}", from);
+
+ checkIfReadyToComplete();
+ }
+
+ public String getKeyspace()
+ {
+ return keyspace;
+ }
+
+ public Range getRange()
+ {
+ return range;
+ }
+
+ /**
+ * Blocks until sync completes or timeout is reached.
+ *
+ * @param timeout Maximum time to wait
+ * @param unit Time unit
+ * @throws Exception if the sync fails, is cancelled, or the timeout elapses before completion
+ */
+ public void awaitCompletion(long timeout, TimeUnit unit) throws Exception
+ {
+ completionFuture.get(timeout, unit);
+ }
+
+ public void cancel()
+ {
+ if (completionFuture.tryFailure(new RuntimeException("Sync cancelled")))
+ MutationTrackingService.instance().unregisterSyncCoordinator(this);
+ }
+
+ /**
+ * Tracks sync state for a single shard.
+ * Completion is scoped to only the live participant host IDs when provided,
+ * so that dead/excluded nodes don't block sync completion.
+ */
+ private static class ShardSyncState
+ {
+ private final Shard shard;
+
+ // If non-null, only these host IDs are considered for union/intersection.
+ // If null, all shard participants are used (equivalent to no filtering).
+ private final Set liveHostIds; + + // Target offsets: LogId -> the offsets we're waiting for live nodes to have + private final Map targets = new ConcurrentHashMap<>(); + + ShardSyncState(Shard shard, Set liveHostIds) + { + this.shard = shard; + this.liveHostIds = liveHostIds; + } + + void captureTargets() + { + Map unionOffsets = shard.collectUnionOfWitnessedOffsetsPerLog(liveHostIds); + targets.putAll(unionOffsets); + } + + boolean isComplete() + { + Map currentReconciled = shard.collectReconciledOffsetsPerLog(liveHostIds); + + for (Map.Entry entry : targets.entrySet()) + { + CoordinatorLogId logId = entry.getKey(); + Offsets.Immutable target = entry.getValue(); + + Offsets.Immutable reconciled = currentReconciled.get(logId); + if (reconciled == null) + return false; + + if (!containsAll(reconciled, target)) + return false; + } + return true; + } + + private boolean containsAll(Offsets reconciled, Offsets target) + { + for (ShortMutationId id : target) + { + if (!reconciled.contains(id.offset())) + return false; + } + return true; + } + } +} diff --git a/src/java/org/apache/cassandra/replication/Node2OffsetsMap.java b/src/java/org/apache/cassandra/replication/Node2OffsetsMap.java index ac6fcc0dafae..1f396a513b7d 100644 --- a/src/java/org/apache/cassandra/replication/Node2OffsetsMap.java +++ b/src/java/org/apache/cassandra/replication/Node2OffsetsMap.java @@ -73,6 +73,22 @@ Offsets.Mutable intersection() return intersection; } + Offsets.Mutable union() + { + if (offsetsMap.isEmpty()) + throw new IllegalStateException("Cannot compute union of empty offsets map"); + + Iterator iter = offsetsMap.values().iterator(); + if (offsetsMap.size() == 1) + return Offsets.Mutable.copy(iter.next()); + + Offsets.Mutable union = Offsets.Mutable.copy(iter.next()); + while (iter.hasNext()) + union.addAll(iter.next()); + + return union; + } + public void add(int node, Offsets offsets) { Offsets.Mutable current = offsetsMap.get(node); diff --git 
a/src/java/org/apache/cassandra/replication/Shard.java b/src/java/org/apache/cassandra/replication/Shard.java index f282edf4262c..ca133160f522 100644 --- a/src/java/org/apache/cassandra/replication/Shard.java +++ b/src/java/org/apache/cassandra/replication/Shard.java @@ -20,7 +20,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -407,6 +409,76 @@ void collectShardReconciledOffsetsToBuilder(ReconciledKeyspaceOffsets.Builder ke logs.values().forEach(log -> keyspaceBuilder.put(log.logId, log.collectReconciledOffsets(), range)); } + /** + * Returns the reconciled offsets for each coordinator log in this shard. + * Reconciled offsets are the intersection of what all participants have. + */ + public Map collectReconciledOffsetsPerLog() + { + Map result = new HashMap<>(); + for (CoordinatorLog log : logs.values()) + { + Offsets.Immutable reconciled = log.collectReconciledOffsets(); + if (!reconciled.isEmpty()) + result.put(log.logId, reconciled); + } + return result; + } + + /** + * Returns the intersection of witnessed offsets scoped to only the specified participant host IDs. + * If liveHostIds is null, behaves the same as {@link #collectReconciledOffsetsPerLog()}. + */ + public Map collectReconciledOffsetsPerLog(Set liveHostIds) + { + if (liveHostIds == null) + return collectReconciledOffsetsPerLog(); + + Map result = new HashMap<>(); + for (CoordinatorLog log : logs.values()) + { + Offsets.Immutable reconciled = log.collectReconciledOffsets(liveHostIds); + if (!reconciled.isEmpty()) + result.put(log.logId, reconciled); + } + return result; + } + + /** + * Returns the UNION of witnessed offsets from all participants for each coordinator log. + * Union = all offsets that ANY replica has witnessed. 
+ */ + public Map collectUnionOfWitnessedOffsetsPerLog() + { + Map result = new HashMap<>(); + for (CoordinatorLog log : logs.values()) + { + Offsets.Immutable union = log.collectUnionOfWitnessedOffsets(); + if (!union.isEmpty()) + result.put(log.logId, union); + } + return result; + } + + /** + * Returns the UNION of witnessed offsets scoped to only the specified participant host IDs. + * If liveHostIds is null, behaves the same as {@link #collectUnionOfWitnessedOffsetsPerLog()}. + */ + public Map collectUnionOfWitnessedOffsetsPerLog(Set liveHostIds) + { + if (liveHostIds == null) + return collectUnionOfWitnessedOffsetsPerLog(); + + Map result = new HashMap<>(); + for (CoordinatorLog log : logs.values()) + { + Offsets.Immutable union = log.collectUnionOfWitnessedOffsets(liveHostIds); + if (!union.isEmpty()) + result.put(log.logId, union); + } + return result; + } + @Override public String toString() { diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 47687551cfd7..7872d207da88 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -780,7 +780,7 @@ public void onFailure(InetAddressAndPort from, RequestFailure failure) if (failure.reason == RequestFailureReason.TIMEOUT) { pending.set(-1); - promise.setFailure(failRepairException(parentRepairSession, "Did not get replies from all endpoints.")); + promise.tryFailure(failRepairException(parentRepairSession, "Did not get replies from all endpoints.")); } else { @@ -798,7 +798,7 @@ private void ack() } else { - promise.setFailure(failRepairException(parentRepairSession, "Got negative replies from endpoints " + failedNodes)); + promise.tryFailure(failRepairException(parentRepairSession, "Got negative replies from endpoints " + failedNodes)); } } } diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java index eea21afe8194..2fe8578aca05 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -3139,7 +3139,7 @@ private FutureTask createRepairTask(final int cmd, final String keyspace throw new IllegalArgumentException("data center(s) " + datacenters.toString() + " not found"); } - RepairCoordinator task = new RepairCoordinator(this, cmd, options, keyspace, ClusterMetadata.current().epoch); + RepairCoordinator task = RepairCoordinator.create(this, cmd, options, keyspace, ClusterMetadata.current().epoch); task.addProgressListener(progressSupport); for (ProgressListener listener : listeners) task.addProgressListener(listener); @@ -5464,6 +5464,19 @@ public void setRepairRpcTimeout(Long timeoutInMillis) DatabaseDescriptor.setRepairRpcTimeout(timeoutInMillis); logger.info("RepairRpcTimeout set to {}ms via JMX", timeoutInMillis); } + + public long getMutationTrackingSyncTimeout() + { + return DatabaseDescriptor.getMutationTrackingSyncTimeout(MILLISECONDS); + } + + public void setMutationTrackingSyncTimeout(long timeoutInMillis) + { + checkArgument(timeoutInMillis > 0); + DatabaseDescriptor.setMutationTrackingSyncTimeout(timeoutInMillis); + logger.info("MutationTrackingSyncTimeout set to {}ms via JMX", timeoutInMillis); + } + public void evictHungRepairs() { logger.info("StorageService#clearPaxosRateLimiters called via jmx"); diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 5d4781c54651..31935467e400 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -1334,6 +1334,9 @@ public void enableAuditLog(String loggerName, String includedKeyspaces, String e public Long getRepairRpcTimeout(); public void setRepairRpcTimeout(Long timeoutInMillis); + 
public long getMutationTrackingSyncTimeout(); + public void setMutationTrackingSyncTimeout(long timeoutInMillis); + public void evictHungRepairs(); public void clearPaxosRepairs(); public void setSkipPaxosRepairCompatibilityCheck(boolean v); diff --git a/src/java/org/apache/cassandra/service/replication/migration/KeyspaceMigrationInfo.java b/src/java/org/apache/cassandra/service/replication/migration/KeyspaceMigrationInfo.java index ed193d61c293..38f0f00e75eb 100644 --- a/src/java/org/apache/cassandra/service/replication/migration/KeyspaceMigrationInfo.java +++ b/src/java/org/apache/cassandra/service/replication/migration/KeyspaceMigrationInfo.java @@ -19,6 +19,7 @@ package org.apache.cassandra.service.replication.migration; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -26,6 +27,7 @@ import java.util.Objects; import java.util.Set; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; @@ -36,11 +38,15 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.serialization.MetadataSerializer; import org.apache.cassandra.tcm.serialization.Version; +import static com.google.common.base.Preconditions.checkArgument; import static org.apache.cassandra.db.TypeSizes.sizeof; import static org.apache.cassandra.utils.CollectionSerializers.deserializeList; import static org.apache.cassandra.utils.CollectionSerializers.deserializeMap; @@ -226,6 +232,26 @@ public boolean isTokenInPendingRange(TableId tableId, Token token) return 
tableRanges.intersects(token); } + /** + * Check if a range intersects with any pending migration range for the given table. + */ + public boolean isRangeInPendingMigration(TableId tableId, Range range) + { + NormalizedRanges tableRanges = pendingRangesPerTable.get(tableId); + if (tableRanges == null) + return false; + return tableRanges.intersects(range); + } + + /** + * Check if the range defined by startToken and endToken intersects with any pending migration range + * for the given table. + */ + public boolean isRangeInPendingMigration(TableId tableId, Token startToken, Token endToken) + { + return isRangeInPendingMigration(tableId, new Range<>(startToken, endToken)); + } + /** * Determine if read operations on a token should use tracked replication during migration. * @@ -246,6 +272,107 @@ public boolean shouldUseTrackedForWrites(boolean isTracked, TableId tableId, Tok return isTracked || isTokenInPendingRange(tableId, token); } + /** + * Asserts that the given ranges are either entirely inside or entirely outside the pending + * migration set for each specified table. During migration, reads for pending ranges continue + * to use the untracked read path with blocking read repair, so it is safe to exclude mutation + * tracking streaming for those ranges. However, partial overlap is not supported — repair must + * operate on ranges that are fully migrated or fully pending. 
+ * + * @param ranges the normalized ranges to check + * @param tables the tables to check against + * @throws IllegalStateException if ranges partially overlap with pending ranges for any table + */ + public void assertRangesNotMixedMigration(@Nonnull NormalizedRanges ranges, + @Nonnull Iterable tables) + { + for (TableMetadata table : tables) + { + NormalizedRanges pendingRanges = getPendingRangesForTable(table.id); + if (pendingRanges.isEmpty()) + continue; + + NormalizedRanges overlap = pendingRanges.intersection(ranges); + if (overlap.isEmpty()) + continue; + + // Some ranges overlap with pending — verify ALL ranges are pending for this table + NormalizedRanges outside = ranges.subtract(pendingRanges); + if (!outside.isEmpty()) + throw new IllegalStateException(String.format( + "Ranges for keyspace %s partially overlap with migration pending ranges for table %s. " + + "Ranges must be entirely inside or entirely outside the pending set.", + keyspace, table.name)); + } + } + + /** + * Convenience overload that resolves column family names to table metadata before checking. 
+ *
+ * @param ranges the normalized ranges to check
+ * @param ksm the keyspace metadata for resolving table names
+ * @param columnFamilies specific table names to check, or null for all tables
+ * @throws IllegalStateException if ranges partially overlap with pending ranges for any table
+ */
+ public void assertRangesNotMixedMigration(@Nonnull NormalizedRanges ranges,
+ @Nonnull KeyspaceMetadata ksm,
+ @Nullable Collection columnFamilies)
+ {
+ checkArgument(columnFamilies == null || !columnFamilies.isEmpty(), "columnFamilies must not be empty");
+ Iterable tables;
+ if (columnFamilies != null)
+ {
+ List tableList = new ArrayList<>(columnFamilies.size());
+ for (String cf : columnFamilies)
+ {
+ TableMetadata table = ksm.tables.getNullable(cf);
+ if (table != null)
+ tableList.add(table);
+ }
+ tables = tableList;
+ }
+ else
+ {
+ tables = ksm.tables;
+ }
+ assertRangesNotMixedMigration(ranges, tables);
+ }
+
+ /**
+ * Determines whether the given ranges for a table should use the tracked transfer path
+ * (coordinated activation via TrackedRepairTransfer) or the untracked streaming path.
+ *

+ * Returns true (use tracked) when: + * - No migration is in progress for this keyspace, OR + * - The ranges don't overlap with pending migration ranges for this table + *

+ * Returns false (use untracked) when: + * - A migration is in progress AND the ranges overlap with pending ranges + * + * @param metadata cluster metadata snapshot + * @param keyspace keyspace name + * @param tableId table to check + * @param ranges the ranges being repaired or streamed + * @return true if tracked transfers should be used + */ + public static boolean shouldUseTrackedTransfers(@Nonnull ClusterMetadata metadata, + @Nonnull String keyspace, + @Nonnull TableId tableId, + @Nonnull Collection> ranges) + { + KeyspaceMigrationInfo migrationInfo = metadata.mutationTrackingMigrationState.getKeyspaceInfo(keyspace); + if (migrationInfo == null) + return true; + + NormalizedRanges pendingRanges = migrationInfo.getPendingRangesForTable(tableId); + if (pendingRanges.isEmpty()) + return true; + + NormalizedRanges normalizedRanges = NormalizedRanges.normalizedRanges(ranges); + NormalizedRanges overlap = pendingRanges.intersection(normalizedRanges); + return overlap.isEmpty(); + } + @Override public String toString() { diff --git a/src/java/org/apache/cassandra/service/replication/migration/MutationTrackingMigrationState.java b/src/java/org/apache/cassandra/service/replication/migration/MutationTrackingMigrationState.java index 6b7850dad9a9..15fa8a6b39b1 100644 --- a/src/java/org/apache/cassandra/service/replication/migration/MutationTrackingMigrationState.java +++ b/src/java/org/apache/cassandra/service/replication/migration/MutationTrackingMigrationState.java @@ -256,6 +256,11 @@ public KeyspaceMigrationInfo getKeyspaceInfo(String keyspace) return keyspaceInfo.get(keyspace); } + public boolean isMigrating(String keyspace) + { + return keyspaceInfo.containsKey(keyspace); + } + public boolean hasMigratingKeyspaces() { return !keyspaceInfo.isEmpty(); diff --git a/src/java/org/apache/cassandra/utils/CollectionSerializers.java b/src/java/org/apache/cassandra/utils/CollectionSerializers.java index bac7b2450c08..42b48d289770 100644 --- 
a/src/java/org/apache/cassandra/utils/CollectionSerializers.java +++ b/src/java/org/apache/cassandra/utils/CollectionSerializers.java @@ -77,6 +77,9 @@ public long serializedSize(String str, int version) } }; + public static final UnversionedSerializer> intSetSerializer = newSetSerializer(Int32Serializer.serializer); + public static final UnversionedSerializer> nullableIntSetSerializer = NullableSerializer.wrap(intSetSerializer); + public static void serializeCollection(Collection values, DataOutputPlus out, UnversionedSerializer valueSerializer) throws IOException { out.writeUnsignedVInt32(values.size()); @@ -642,6 +645,30 @@ private static > C deserializeCollection(Data return result; } + public static UnversionedSerializer> newSetSerializer(UnversionedSerializer itemSerializer) + { + return new UnversionedSerializer>() + { + @Override + public void serialize(Set set, DataOutputPlus out) throws IOException + { + serializeCollection(set, out, itemSerializer); + } + + @Override + public Set deserialize(DataInputPlus in) throws IOException + { + return deserializeSet(in, itemSerializer); + } + + @Override + public long serializedSize(Set t) + { + return serializedCollectionSize(t, itemSerializer); + } + }; + } + public static UnversionedSerializer> newListSerializer(UnversionedSerializer itemSerializer) { return new UnversionedSerializer>() @@ -666,6 +693,30 @@ public long serializedSize(List t) }; } + public static IVersionedSerializer> newMapSerializer(IVersionedSerializer keySerializer, IVersionedSerializer valueSerializer) + { + return new IVersionedSerializer>() + { + @Override + public void serialize(Map map, DataOutputPlus out, int version) throws IOException + { + serializeMap(map, out, version, keySerializer, valueSerializer); + } + + @Override + public Map deserialize(DataInputPlus in, int version) throws IOException + { + return deserializeMap(in, version, keySerializer, valueSerializer); + } + + @Override + public long serializedSize(Map map, int 
version) + { + return serializedMapSize(map, version, keySerializer, valueSerializer); + } + }; + } + public static IVersionedSerializer> newListSerializer(IVersionedSerializer itemSerializer) { return new IVersionedSerializer>() diff --git a/test/distributed/org/apache/cassandra/distributed/test/MutationTrackingMigrationTest.java b/test/distributed/org/apache/cassandra/distributed/test/MutationTrackingMigrationTest.java index 51344e893d57..9b1b6f3c0226 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/MutationTrackingMigrationTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/MutationTrackingMigrationTest.java @@ -32,7 +32,6 @@ import org.apache.cassandra.distributed.api.ICoordinator; import org.apache.cassandra.replication.MutationJournal; import org.apache.cassandra.schema.KeyspaceMetadata; -import org.apache.cassandra.service.replication.migration.KeyspaceMigrationInfo; import org.apache.cassandra.service.replication.migration.MutationTrackingMigrationState; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; @@ -41,8 +40,7 @@ import static java.lang.String.format; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; /** @@ -136,31 +134,31 @@ private void verifyKeyspaceState(String keyspace, ExpectedKeyspaceState expected ClusterMetadata metadata = ClusterMetadata.current(); KeyspaceMetadata ksm = expectedState != ExpectedKeyspaceState.DROPPED ? 
metadata.schema.getKeyspaceMetadata(keyspace) : null; MutationTrackingMigrationState migrationState = metadata.mutationTrackingMigrationState; - KeyspaceMigrationInfo migrationInfo = migrationState.getKeyspaceInfo(keyspace); + boolean migrating = migrationState.isMigrating(keyspace); switch (expectedState) { case UNTRACKED: assertTrue(!ksm.params.replicationType.isTracked()); - assertNull(migrationInfo); + assertFalse(migrating); break; case MIGRATING_TO_TRACKED: assertTrue(ksm.params.replicationType.isTracked()); - assertNotNull(migrationInfo); + assertTrue(migrating); break; case MIGRATING_TO_UNTRACKED: assertTrue(!ksm.params.replicationType.isTracked()); - assertNotNull(migrationInfo); + assertTrue(migrating); break; case TRACKED: assertTrue(ksm.params.replicationType.isTracked()); - assertNull(migrationInfo); + assertFalse(migrating); break; case DROPPED: - assertNull(migrationInfo); + assertFalse(migrating); break; default: throw new AssertionError("Unexpected state: " + expectedState); diff --git a/test/distributed/org/apache/cassandra/distributed/test/repair/MutationTrackingIncrementalRepairTaskTest.java b/test/distributed/org/apache/cassandra/distributed/test/repair/MutationTrackingIncrementalRepairTaskTest.java new file mode 100644 index 000000000000..b93481f04a75 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/repair/MutationTrackingIncrementalRepairTaskTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.distributed.test.repair; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.repair.MutationTrackingIncrementalRepairTask; +import org.apache.cassandra.tcm.ClusterMetadata; + +import static org.junit.Assert.*; + +/** + * Tests for MutationTrackingIncrementalRepairTask. + * Tests the decision logic for when to use mutation tracking repair. + * + * Uses a shared cluster across all tests to minimize overhead. 
+ */ +public class MutationTrackingIncrementalRepairTaskTest extends TestBaseImpl +{ + private static Cluster CLUSTER; + private static final AtomicInteger ksCounter = new AtomicInteger(); + + @BeforeClass + public static void setupCluster() throws IOException + { + CLUSTER = Cluster.build() + .withNodes(3) + .withConfig(cfg -> cfg.with(Feature.NETWORK, Feature.GOSSIP)) + .start(); + } + + @AfterClass + public static void teardownCluster() + { + if (CLUSTER != null) + CLUSTER.close(); + } + + private static String nextKsName() + { + return "mtirt_ks" + ksCounter.incrementAndGet(); + } + + @Test + public void testShouldUseMutationTrackingRepairForTrackedKeyspace() throws Throwable + { + String ksName = nextKsName(); + CLUSTER.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='tracked'"); + + Boolean shouldUse = CLUSTER.get(1).callOnInstance(() -> { + ClusterMetadata metadata = ClusterMetadata.current(); + return MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(metadata, ksName); + }); + + assertTrue("Tracked keyspace should use mutation tracking repair", shouldUse); + } + + @Test + public void testShouldNotUseMutationTrackingRepairForUntrackedKeyspace() throws Throwable + { + String ksName = nextKsName(); + CLUSTER.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='untracked'"); + + Boolean shouldUse = CLUSTER.get(1).callOnInstance(() -> { + ClusterMetadata metadata = ClusterMetadata.current(); + return MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(metadata, ksName); + }); + + assertFalse("Untracked keyspace should not use mutation tracking repair", shouldUse); + } + + @Test + public void testRequiresTraditionalRepairReturnsFalseForNonMigratingKeyspace() throws Throwable + { + String ksName = nextKsName(); + 
CLUSTER.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='tracked'"); + + Boolean migrationInProgress = CLUSTER.get(1).callOnInstance(() -> { + ClusterMetadata metadata = ClusterMetadata.current(); + return MutationTrackingIncrementalRepairTask.isMutationTrackingMigrationInProgress(metadata, ksName); + }); + + assertFalse("Non-migrating keyspace should not have migration in progress", migrationInProgress); + } + + @Test + public void testShouldUseMutationTrackingRepairForNonexistentKeyspace() throws Throwable + { + Boolean shouldUse = CLUSTER.get(1).callOnInstance(() -> { + ClusterMetadata metadata = ClusterMetadata.current(); + return MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(metadata, "nonexistent_ks_xyz"); + }); + + assertFalse("Nonexistent keyspace should return false", shouldUse); + } + + @Test + public void testMigrationFromUntrackedToTracked() throws Throwable + { + String ksName = nextKsName(); + CLUSTER.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='untracked'"); + CLUSTER.schemaChange("CREATE TABLE " + ksName + ".tbl (k int PRIMARY KEY, v int)"); + + // Verify initial state + Boolean shouldUseBefore = CLUSTER.get(1).callOnInstance(() -> { + ClusterMetadata metadata = ClusterMetadata.current(); + return MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(metadata, ksName); + }); + assertFalse("Untracked keyspace should not use mutation tracking repair", shouldUseBefore); + + Boolean migrationBefore = CLUSTER.get(1).callOnInstance(() -> { + ClusterMetadata metadata = ClusterMetadata.current(); + return MutationTrackingIncrementalRepairTask.isMutationTrackingMigrationInProgress(metadata, ksName); + }); + assertFalse("Non-migrating keyspace should not have migration in progress", migrationBefore); + + // 
Trigger migration by altering to tracked + CLUSTER.schemaChange("ALTER KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='tracked'"); + + // Verify migration state - both methods should now return true + Boolean shouldUseAfter = CLUSTER.get(1).callOnInstance(() -> { + ClusterMetadata metadata = ClusterMetadata.current(); + return MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(metadata, ksName); + }); + assertTrue("Migrating keyspace should use mutation tracking repair", shouldUseAfter); + + Boolean migrationAfter = CLUSTER.get(1).callOnInstance(() -> { + ClusterMetadata metadata = ClusterMetadata.current(); + return MutationTrackingIncrementalRepairTask.isMutationTrackingMigrationInProgress(metadata, ksName); + }); + assertTrue("Migrating keyspace should have migration in progress", migrationAfter); + } + + @Test + public void testMigrationFromTrackedToUntracked() throws Throwable + { + String ksName = nextKsName(); + CLUSTER.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='tracked'"); + CLUSTER.schemaChange("CREATE TABLE " + ksName + ".tbl (k int PRIMARY KEY, v int)"); + + // Verify initial state + Boolean shouldUseBefore = CLUSTER.get(1).callOnInstance(() -> { + ClusterMetadata metadata = ClusterMetadata.current(); + return MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(metadata, ksName); + }); + assertTrue("Tracked keyspace should use mutation tracking repair", shouldUseBefore); + + Boolean migrationBefore = CLUSTER.get(1).callOnInstance(() -> { + ClusterMetadata metadata = ClusterMetadata.current(); + return MutationTrackingIncrementalRepairTask.isMutationTrackingMigrationInProgress(metadata, ksName); + }); + assertFalse("Non-migrating tracked keyspace should not have migration in progress", migrationBefore); + + // Migrate back to 
untracked + CLUSTER.schemaChange("ALTER KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='untracked'"); + + // During reverse migration, both should still apply + Boolean shouldUseAfter = CLUSTER.get(1).callOnInstance(() -> { + ClusterMetadata metadata = ClusterMetadata.current(); + return MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(metadata, ksName); + }); + assertFalse("Keyspace migrating from tracked should not use mutation tracking repair", shouldUseAfter); + + Boolean migrationAfter = CLUSTER.get(1).callOnInstance(() -> { + ClusterMetadata metadata = ClusterMetadata.current(); + return MutationTrackingIncrementalRepairTask.isMutationTrackingMigrationInProgress(metadata, ksName); + }); + assertTrue("Keyspace migrating from tracked should have migration in progress", migrationAfter); + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/repair/MutationTrackingRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/repair/MutationTrackingRepairTest.java new file mode 100644 index 000000000000..4fbce7c16a06 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/repair/MutationTrackingRepairTest.java @@ -0,0 +1,917 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.distributed.test.repair; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiPredicate; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; + +import org.apache.cassandra.distributed.api.IMessageFilters; +import org.apache.cassandra.distributed.api.NodeToolResult; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.gms.EndpointState; +import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.Verb; +import org.apache.cassandra.repair.MutationTrackingIncrementalRepairTask; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.replication.migration.KeyspaceMigrationInfo; +import 
org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +import static java.util.function.Predicate.not; +import static org.apache.cassandra.distributed.api.IMessageFilters.Matcher.of; +import static org.junit.Assert.*; + +/** + * End-to-end tests for mutation tracking repair. + * + * Each test creates a unique keyspace. Keyspaces are not dropped between tests because + * dropping a tracked keyspace while background offset broadcasts are in flight causes + * NoSuchElementException in MutationTrackingService.getOrCreateShards (the broadcast + * tries to look up the dropped keyspace's metadata). The keyspaces are cleaned up when + * the cluster is closed at the end of the test class. + */ +public class MutationTrackingRepairTest extends TestBaseImpl +{ + private static final int NUM_NODES = 3; + private static final List ALL_NODES = List.of(1, 2, 3); + + private static Cluster CLUSTER; + private static ExecutorService executor; + private static final AtomicInteger ksCounter = new AtomicInteger(); + + private String ksName; + + @BeforeClass + public static void setupCluster() throws IOException + { + executor = Executors.newCachedThreadPool(); + CLUSTER = Cluster.build() + .withNodes(NUM_NODES) + .withConfig(cfg -> cfg.set("mutation_tracking_sync_timeout", "10s") + .set("request_timeout", "1000ms") + .set("repair.retries.max_attempts", 10) + .set("repair.retries.base_sleep_time", "100ms") + .set("repair.retries.max_sleep_time", "500ms") + .with(Feature.GOSSIP, Feature.NETWORK)) + .start(); + } + + @AfterClass + public static void teardownCluster() + { + executor.shutdownNow(); + if (CLUSTER != null) + CLUSTER.close(); + } + + @Before + public void setUp() + { + ksName = "mt_repair_" + ksCounter.incrementAndGet(); + CLUSTER.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND 
replication_type='tracked'"); + CLUSTER.schemaChange("CREATE TABLE " + ksName + ".tbl (k int PRIMARY KEY, v int)"); + } + + @After + public void tearDown() + { + CLUSTER.filters().reset(); + for (int i = 1; i <= CLUSTER.size(); i++) + { + CLUSTER.get(i).runOnInstance(() -> { + Gossiper.runInGossipStageBlocking(() -> { + for (var entry : Gossiper.instance.endpointStateMap.entrySet()) + { + InetAddressAndPort ep = entry.getKey(); + EndpointState state = entry.getValue(); + if (!ep.equals(FBUtilities.getBroadcastAddressAndPort()) && !state.isAlive()) + { + FailureDetector.instance.report(ep); + Gossiper.instance.realMarkAlive(ep, state); + } + } + }); + }); + } + } + + private void setupUntracked() + { + ksName = "mt_repair_" + ksCounter.incrementAndGet(); + CLUSTER.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='untracked'"); + CLUSTER.schemaChange("CREATE TABLE " + ksName + ".tbl (k int PRIMARY KEY, v int)"); + } + + private void createTable(String tableName) + { + CLUSTER.schemaChange("CREATE TABLE " + ksName + '.' + tableName + " (k int PRIMARY KEY, v int)"); + } + + private void alterKeyspaceToTracked() + { + CLUSTER.schemaChange("ALTER KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='tracked'"); + } + + private void alterKeyspaceToUntracked() + { + CLUSTER.schemaChange("ALTER KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='untracked'"); + } + + private void insertData(String tableName, int start, int count) + { + for (int i = start; i < start + count; i++) + { + CLUSTER.coordinator(1).execute( + "INSERT INTO " + ksName + '.' 
+ tableName + " (k, v) VALUES (?, ?)", + ConsistencyLevel.ALL, i, i); + } + } + + private void insertDataWithInconsistency(String tableName, int start, int count) + { + insertDataWithInconsistency(2, tableName, start, count); + } + + private void insertDataWithInconsistency(int isolatedNode, String tableName, int start, int count) + { + // Isolate a node so background reconciliation has some work to do + CLUSTER.filters().allVerbs().to(isolatedNode).drop(); + CLUSTER.filters().allVerbs().from(isolatedNode).drop(); + + for (int i = start; i < start + count; i++) + { + CLUSTER.coordinator(1).execute( + "INSERT INTO " + ksName + '.' + tableName + " (k, v) VALUES (?, ?)", + ConsistencyLevel.QUORUM, i, i); + } + + // Verify the isolated node is actually missing the data we just wrote + Object[][] results = CLUSTER.get(isolatedNode).executeInternal( + "SELECT k FROM " + ksName + '.' + tableName + " WHERE k >= ? AND k < ? ALLOW FILTERING", + start, start + count); + assertEquals("Node " + isolatedNode + " should not have data written while isolated", + 0, results.length); + CLUSTER.filters().reset(); + } + + private void assertDataOnAllNodes(String tableName, List keys) + { + for (int node = 1; node <= CLUSTER.size(); node++) + { + for (int key : keys) + { + Object[][] results = CLUSTER.get(node).executeInternal( + "SELECT k, v FROM " + ksName + '.' + tableName + " WHERE k = ?", key); + assertEquals("Node " + node + " missing row k=" + key, 1, results.length); + assertEquals(key, results[0][0]); + assertEquals(key, results[0][1]); + } + } + } + + private void assertDataOnAllNodes(String tableName, int start, int count) + { + List keys = new ArrayList<>(count); + for (int i = start; i < start + count; i++) + keys.add(i); + assertDataOnAllNodes(tableName, keys); + } + + private NodeToolResult nodetoolRepair(int node, String... 
args) + { + String[] cmd = new String[args.length + 1]; + cmd[0] = "repair"; + System.arraycopy(args, 0, cmd, 1, args.length); + return CLUSTER.get(node).nodetoolResult(cmd); + } + + private List repairConcurrently(List nodes, String... args) + { + List> futures = new ArrayList<>(); + for (int node : nodes) + { + int n = node; + futures.add(executor.submit(() -> nodetoolRepair(n, args))); + } + List results = new ArrayList<>(); + for (Future f : futures) + { + try + { + results.add(f.get(60, TimeUnit.SECONDS)); + } + catch (Exception e) + { + throw new RuntimeException("Repair future failed", e); + } + } + return results; + } + + private void assertAllSuccess(List results) + { + for (NodeToolResult r : results) + r.asserts().success(); + } + + private void assertAllFailure(List results) + { + for (NodeToolResult r : results) + r.asserts().failure(); + } + + private String[] withPR(String... args) + { + String[] result = new String[args.length + 1]; + System.arraycopy(args, 0, result, 0, args.length); + result[args.length] = "-pr"; + return result; + } + + private void repairResolvingInconsistency(String... args) throws Exception + { + repairResolvingInconsistency(2, ALL_NODES, withPR(args)); + } + + private void repairResolvingInconsistency(int isolatedNode, List nodes, String... 
args) throws Exception + { + // Dropping messages is to check that repair retries messages if needed + CLUSTER.filters().allVerbs().to(isolatedNode).drop(); + CLUSTER.filters().allVerbs().from(isolatedNode).drop(); + + List> futures = new ArrayList<>(); + for (int node : nodes) + { + int n = node; + futures.add(executor.submit(() -> nodetoolRepair(n, args))); + } + + Thread.sleep(2000); + assertTrue("Repair should be blocked while node " + isolatedNode + " is isolated", + futures.stream().allMatch(not(Future::isDone))); + + CLUSTER.filters().reset(); + + List results = new ArrayList<>(); + for (Future f : futures) + results.add(f.get(30, TimeUnit.SECONDS)); + assertAllSuccess(results); + + // Run a second time to make sure repair can be run multiple times without failing + assertAllSuccess(repairConcurrently(nodes, args)); + } + + private void repairFromNodesSuccess(List nodes, String... args) + { + String[] prArgs = withPR(args); + assertAllSuccess(repairConcurrently(nodes, prArgs)); + assertAllSuccess(repairConcurrently(nodes, prArgs)); + } + + private boolean isMigrationInProgress() + { + String ks = ksName; + return CLUSTER.get(1).callOnInstance(() -> { + ClusterMetadata metadata = ClusterMetadata.current(); + return MutationTrackingIncrementalRepairTask.isMutationTrackingMigrationInProgress(metadata, ks); + }); + } + + private boolean isMigrationComplete() + { + String ks = ksName; + return CLUSTER.get(1).callOnInstance(() -> { + ClusterMetadata metadata = ClusterMetadata.current(); + return !metadata.mutationTrackingMigrationState.isMigrating(ks); + }); + } + + /** + * Get the primary token range for a node as [start, end] token values. + * With SimpleStrategy RF=3 and 3 nodes, each node has exactly one primary range. 
+ */ + private long[] getPrimaryRangeTokens(int node) + { + String ks = ksName; + return CLUSTER.get(node).callOnInstance(() -> { + var ranges = StorageService.instance.getPrimaryRanges(ks); + assertEquals(1, ranges.size()); + Range range = ranges.iterator().next(); + return new long[]{ + ((Murmur3Partitioner.LongToken) range.left).token, + ((Murmur3Partitioner.LongToken) range.right).token + }; + }); + } + + /** + * Compute which integer keys from [start, start+count) hash into the given token range. + */ + private List keysInTokenRange(int start, int count, long rangeStart, long rangeEnd) + { + Range range = new Range<>(new Murmur3Partitioner.LongToken(rangeStart), + new Murmur3Partitioner.LongToken(rangeEnd)); + List keys = new ArrayList<>(); + for (int i = start; i < start + count; i++) + { + Token token = Murmur3Partitioner.instance.getToken(ByteBufferUtil.bytes(i)); + if (range.contains(token)) + keys.add(i); + } + return keys; + } + + private String getBroadcastAddress(int node) + { + return CLUSTER.get(node).callOnInstance(() -> FBUtilities.getBroadcastAddressAndPort().getHostAddressAndPort()); + } + + private void isolateNode(int nodeToIsolate, int... 
observerNodes) + { + CLUSTER.filters().allVerbs().from(nodeToIsolate).drop(); + CLUSTER.filters().allVerbs().to(nodeToIsolate).drop(); + + String isolatedAddress = CLUSTER.get(nodeToIsolate).callOnInstance( + () -> FBUtilities.getBroadcastAddressAndPort().getHostAddressAndPort()); + for (int observer : observerNodes) + { + CLUSTER.get(observer).runOnInstance(() -> { + try + { + InetAddressAndPort neighbor = InetAddressAndPort.getByName(isolatedAddress); + FailureDetector.instance.forceConviction(neighbor); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + }); + } + } + + @Test + public void testBasicRepairHappyPath() throws Exception + { + insertDataWithInconsistency("tbl", 0, 100); + + repairResolvingInconsistency(ksName); + + assertDataOnAllNodes("tbl", 0, 100); + } + + @Test + public void testRepairSpecificTable() throws Exception + { + createTable("tbl1"); + createTable("tbl2"); + + // Repair only tbl1 + insertDataWithInconsistency("tbl1", 0, 50); + repairResolvingInconsistency(ksName, "tbl1"); + assertDataOnAllNodes("tbl1", 0, 50); + + // Repair only tbl2 while tbl1 already has repaired data + insertDataWithInconsistency("tbl2", 0, 50); + repairResolvingInconsistency(ksName, "tbl2"); + assertDataOnAllNodes("tbl2", 0, 50); + + // Repair both tables together + insertDataWithInconsistency("tbl1", 50, 50); + insertDataWithInconsistency("tbl2", 50, 50); + repairResolvingInconsistency(ksName, "tbl1", "tbl2"); + assertDataOnAllNodes("tbl1", 0, 100); + assertDataOnAllNodes("tbl2", 0, 100); + } + + @Test + public void testRepairAllTables() throws Exception + { + createTable("tbl1"); + createTable("tbl2"); + createTable("tbl3"); + + insertDataWithInconsistency("tbl1", 0, 30); + insertDataWithInconsistency("tbl2", 100, 30); + insertDataWithInconsistency("tbl3", 200, 30); + + repairResolvingInconsistency(ksName); + + assertDataOnAllNodes("tbl1", 0, 30); + assertDataOnAllNodes("tbl2", 100, 30); + assertDataOnAllNodes("tbl3", 200, 30); + } + + 
@Test + public void testForceRepairWithNodeDown() + { + insertDataWithInconsistency(3, "tbl", 0, 50); + + isolateNode(2, 1, 3); + + List liveNodes = List.of(1, 3); + assertAllFailure(repairConcurrently(liveNodes, withPR(ksName))); + + repairFromNodesSuccess(liveNodes, ksName, "--force"); + + for (int node : liveNodes) + { + for (int i = 0; i < 50; i++) + { + Object[][] results = CLUSTER.get(node).executeInternal( + "SELECT k, v FROM " + ksName + ".tbl WHERE k = ?", i); + assertEquals("Node " + node + " missing row k=" + i, 1, results.length); + } + } + } + + @Test + public void testForceRepairWithAllNodesUp() throws Exception + { + insertDataWithInconsistency("tbl", 0, 50); + + repairResolvingInconsistency(ksName, "--force"); + + assertDataOnAllNodes("tbl", 0, 50); + } + + @Test + public void testRepairWithSpecificHosts() + { + String addr1 = getBroadcastAddress(1); + String addr3 = getBroadcastAddress(3); + + insertDataWithInconsistency(3, "tbl", 0, 50); + + // Node 2 is down, so normal repair should fail + isolateNode(2, 1, 3); + + List liveNodes = List.of(1, 3); + assertAllFailure(repairConcurrently(liveNodes, withPR(ksName))); + + // Repair with --in-hosts scoped to only the live nodes should succeed + // Note: --in-hosts cannot be combined with -pr + String[] args = new String[]{ksName, "--in-hosts", addr1 + ',' + addr3}; + assertAllSuccess(repairConcurrently(liveNodes, args)); + assertAllSuccess(repairConcurrently(liveNodes, args)); + + for (int node : liveNodes) + { + for (int i = 0; i < 50; i++) + { + Object[][] results = CLUSTER.get(node).executeInternal( + "SELECT k, v FROM " + ksName + ".tbl WHERE k = ?", i); + assertEquals("Node " + node + " missing row k=" + i, 1, results.length); + } + } + } + + @Test + public void testMigrationUntrackedToTrackedCompletesViaRepair() throws Exception + { + setupUntracked(); + insertDataWithInconsistency("tbl", 0, 100); + + alterKeyspaceToTracked(); + assertTrue("Migration should be in progress after ALTER", 
isMigrationInProgress()); + + repairResolvingInconsistency(ksName); + assertTrue("Migration should complete after repair", isMigrationComplete()); + + assertDataOnAllNodes("tbl", 0, 100); + } + + @Test + public void testDataAccessibleDuringMigrationToTracked() throws Exception + { + setupUntracked(); + dataAccessibleDuringMigration(() -> alterKeyspaceToTracked()); + } + + @Test + public void testDataAccessibleDuringMigrationToUntracked() throws Exception + { + dataAccessibleDuringMigration(() -> alterKeyspaceToUntracked()); + } + + private void dataAccessibleDuringMigration(Runnable alterKeyspace) throws Exception + { + insertDataWithInconsistency("tbl", 0, 50); + + alterKeyspace.run(); + + Object[][] results = CLUSTER.coordinator(1).execute( + "SELECT k, v FROM " + ksName + ".tbl", ConsistencyLevel.ALL); + assertEquals("Pre-migration data should be readable", 50, results.length); + + insertData("tbl", 50, 50); + + results = CLUSTER.coordinator(1).execute( + "SELECT k, v FROM " + ksName + ".tbl", ConsistencyLevel.ALL); + assertEquals("All data should be readable during migration", 100, results.length); + + repairResolvingInconsistency(ksName); + assertTrue("Migration should complete after repair", isMigrationComplete()); + + results = CLUSTER.coordinator(1).execute( + "SELECT k, v FROM " + ksName + ".tbl", ConsistencyLevel.ALL); + assertEquals("All data should be readable after migration", 100, results.length); + + insertData("tbl", 100, 50); + results = CLUSTER.coordinator(1).execute( + "SELECT k, v FROM " + ksName + ".tbl", ConsistencyLevel.ALL); + assertEquals("All data including post-migration should be readable", 150, results.length); + + assertDataOnAllNodes("tbl", 0, 150); + } + + @Test + public void testMigrationTrackedToUntrackedCompletesViaRepair() throws Exception + { + insertDataWithInconsistency("tbl", 0, 100); + + alterKeyspaceToUntracked(); + assertTrue("Migration should be in progress after ALTER", isMigrationInProgress()); + + 
repairResolvingInconsistency(ksName); + assertTrue("Migration should complete after repair", isMigrationComplete()); + + assertDataOnAllNodes("tbl", 0, 100); + } + + @Test + public void testForceRepairWithDeadNodeDoesNotAdvanceMigration() + { + repairWithDeadNodeDoesNotAdvanceMigration(withPR(ksName, "--force")); + } + + @Test + public void testInHostsRepairWithDeadNodeDoesNotAdvanceMigration() + { + String addr1 = getBroadcastAddress(1); + String addr3 = getBroadcastAddress(3); + repairWithDeadNodeDoesNotAdvanceMigration(ksName, "--in-hosts", addr1 + ',' + addr3); + } + + private void repairWithDeadNodeDoesNotAdvanceMigration(String... repairArgs) + { + setupUntracked(); + insertDataWithInconsistency(3, "tbl", 0, 50); + + alterKeyspaceToTracked(); + assertTrue("Migration should be in progress", isMigrationInProgress()); + + isolateNode(2, 1, 3); + + List liveNodes = List.of(1, 3); + assertAllSuccess(repairConcurrently(liveNodes, repairArgs)); + + String ks = ksName; + assertTrue("Migration should not advance with dead nodes excluded", + CLUSTER.get(1).callOnInstance(() -> { + ClusterMetadata metadata = ClusterMetadata.current(); + return metadata.mutationTrackingMigrationState.isMigrating(ks); + })); + } + + @Test + public void testInHostsRepairSucceedsWhenSpecifiedHostIsNetworkBlocked() + { + String addr1 = getBroadcastAddress(1); + String addr3 = getBroadcastAddress(3); + + insertDataWithInconsistency(3, "tbl", 0, 50); + + // Block network to node 2 but do NOT mark it down in gossip + CLUSTER.filters().allVerbs().from(2).drop(); + CLUSTER.filters().allVerbs().to(2).drop(); + + // Repair specifying only live hosts should succeed despite node 2 being blocked + List liveNodes = List.of(1, 3); + String[] args = new String[]{ksName, "--in-hosts", addr1 + ',' + addr3}; + assertAllSuccess(repairConcurrently(liveNodes, args)); + } + + @Test + public void testPreviewRepairDoesNotAdvanceMigration() throws Exception + { + setupUntracked(); + 
insertDataWithInconsistency("tbl", 0, 50); + + alterKeyspaceToTracked(); + assertTrue("Migration should be in progress", isMigrationInProgress()); + + repairResolvingInconsistency(ksName, "--preview"); + + assertTrue("Migration should not advance with preview repair", isMigrationInProgress()); + } + + @Test + public void testSubrangeRepair() throws Exception + { + long[] primaryRange = getPrimaryRangeTokens(1); + String st = Long.toString(primaryRange[0]); + String et = Long.toString(primaryRange[1]); + + insertDataWithInconsistency("tbl", 0, 100); + + repairResolvingInconsistency(2, ALL_NODES, ksName, "-st", st, "-et", et); + + List keysInRange = keysInTokenRange(0, 100, primaryRange[0], primaryRange[1]); + assertFalse("Should have keys hashing into node 1's primary range", keysInRange.isEmpty()); + + assertDataOnAllNodes("tbl", keysInRange); + } + + @Test + public void testSubrangeRepairAdvancesMigrationOnlyForSpecifiedRange() throws Exception + { + setupUntracked(); + long[] primaryRange = getPrimaryRangeTokens(1); + String st = Long.toString(primaryRange[0]); + String et = Long.toString(primaryRange[1]); + + insertDataWithInconsistency("tbl", 0, 100); + + alterKeyspaceToTracked(); + assertTrue("Full ring should be pending", isMigrationInProgress()); + + // During migration, subrange repair uses incremental repair. Running from all nodes + // on the same subrange causes anti-compaction conflicts, so repair from a single node. 
+ repairResolvingInconsistency(2, List.of(1), ksName, "-st", st, "-et", et); + + assertTrue("Migration should not be complete after subrange repair", + isMigrationInProgress()); + + // Verify the repaired range is no longer pending but other ranges still are + String ks = ksName; + long rangeStart = primaryRange[0]; + long rangeEnd = primaryRange[1]; + CLUSTER.get(1).runOnInstance(() -> { + ClusterMetadata metadata = ClusterMetadata.current(); + KeyspaceMigrationInfo info = metadata.mutationTrackingMigrationState.getKeyspaceInfo(ks); + assertNotNull("Migration info should still exist", info); + + Range repairedRange = new Range<>(new Murmur3Partitioner.LongToken(rangeStart), + new Murmur3Partitioner.LongToken(rangeEnd)); + for (var entry : info.pendingRangesPerTable.entrySet()) + { + for (Range pending : entry.getValue()) + { + assertFalse("Repaired range should not overlap with pending ranges for table " + entry.getKey(), + repairedRange.intersects(pending)); + } + } + }); + + // Verify all keys in the repaired range are present on all nodes + List keysInRange = keysInTokenRange(0, 100, primaryRange[0], primaryRange[1]); + assertFalse("Should have keys hashing into node 1's primary range", keysInRange.isEmpty()); + + assertDataOnAllNodes("tbl", keysInRange); + } + + @Test + public void testRepairRejectsMixedMigratedAndPendingRanges() + { + setupUntracked(); + insertData("tbl", 0, 50); + + alterKeyspaceToTracked(); + assertTrue("Migration should be in progress after ALTER", isMigrationInProgress()); + + long[] primaryRange = getPrimaryRangeTokens(1); + String st = Long.toString(primaryRange[0]); + String et = Long.toString(primaryRange[1]); + + // Repair node 1's primary range to advance migration for that subrange only. + // Run from single node to avoid anti-compaction conflicts during migration IR. + nodetoolRepair(1, ksName, "-st", st, "-et", et).asserts().success(); + + // Now attempt a repair with a range that straddles the migrated/pending boundary. 
+ // Node 1's primary range has been repaired (no longer pending), but the range + // immediately after is still pending. A range spanning both should be rejected. + String straddleSt = Long.toString(primaryRange[1] - 1); + String straddleEt = Long.toString(primaryRange[1] + 1000); + NodeToolResult result = nodetoolRepair(1, ksName, "-st", straddleSt, "-et", straddleEt); + result.asserts().failure(); + assertTrue("Expected partial overlap error but got: " + result.getStderr(), + result.getStderr().contains("partially overlap with migration pending ranges")); + } + + @Test + public void testRepairTimeout() + { + insertData("tbl", 0, 50); + + CLUSTER.filters().verbs(Verb.MT_SYNC_REQ.id).to(2).drop(); + CLUSTER.filters().verbs(Verb.MT_SYNC_REQ.id).from(2).drop(); + + List results = repairConcurrently(ALL_NODES, withPR(ksName)); + assertAllFailure(results); + for (NodeToolResult r : results) + assertTrue("Expected timeout error but got: " + r.getStderr(), + r.getStderr().contains("Mutation tracking sync timed out")); + } + + /** + * Exercises the onFailure callback in MutationTrackingSyncCoordinator.sendSyncRequests(). + * Unlike testRepairTimeout (which drops MT_SYNC_REQ entirely so the request times out), + * this test makes the remote handler throw an exception, which sends a FAILURE_RSP back + * to the coordinator, triggering the onFailure -> fail() path. + */ + @Test + public void testSyncFailureResponse() + { + insertData("tbl", 0, 50); + + // The matcher throwing causes uncaught exceptions on the receiving nodes' stage threads. + // These are expected, so filter them out to avoid failing at cluster close. 
+ CLUSTER.setUncaughtExceptionsFilter((nodeNum, throwable) -> + throwable.getMessage() != null && throwable.getMessage().contains("sync failure injected")); + try + { + CLUSTER.verbs(Verb.MT_SYNC_REQ).messagesMatching(of(m -> { + throw new RuntimeException("sync failure injected"); + })).drop(); + + List results = repairConcurrently(ALL_NODES, withPR(ksName)); + assertAllFailure(results); + for (NodeToolResult r : results) + assertTrue("Expected sync failure error but got: " + r.getStderr(), + r.getStderr().contains("Mutation tracking sync failed")); + } + finally + { + CLUSTER.setUncaughtExceptionsFilter((BiPredicate) null); + } + } + + /** + * During migration from untracked to tracked, incremental repair runs anti-compaction + * on SSTables that were written before tracking was enabled. When an SSTable partially + * overlaps the repair range, anti-compaction must split it by rewriting through + * SSTableWriter. The "inside repair range" writer gets pendingRepair set to the session ID. + * + * SSTableWriter.finalizeMetadata() must tolerate pendingRepair being set on a tracked + * table during migration. This test uses a narrow subrange to force anti-compaction to + * split SSTables (rather than just mutating fully-contained ones in place). + */ + @Test + public void testMigrationSubrangeRepairAntiCompactionSplitsSSTables() throws Exception + { + setupUntracked(); + + // Write data and flush so SSTables span the full token ring on each node. + insertData("tbl", 0, 500); + for (int i = 1; i <= NUM_NODES; i++) + CLUSTER.get(i).flush(ksName); + + alterKeyspaceToTracked(); + assertTrue("Migration should be in progress", isMigrationInProgress()); + + // Use a subrange that's well within one local range but wide enough to contain + // data. With 3 nodes, node 2's primary range is approximately + // (-3074457345618258603, 3074457345618258602]. 
A range of (0, 3000000000000000000] + // is fully contained in that range and covers ~16% of the ring, so ~80 of our 500 + // rows should hash into it. SSTables from the flush span the entire ring, so they + // will NOT be fully contained in this narrow range. Anti-compaction must split them + // via SSTableWriter, exercising the pendingRepair code path in finalizeMetadata(). + String st = "0"; + String et = "3000000000000000000"; + + // Run repair from a single node to avoid anti-compaction conflicts. + // This should succeed: anti-compaction splits SSTables and the repair completes. + NodeToolResult result = nodetoolRepair(1, ksName, "-st", st, "-et", et); + result.asserts().success(); + } + + @Test + public void testRepairSyncTimeout() + { + insertDataWithInconsistency("tbl", 0, 50); + + // Drop only offset broadcasts so MT_SYNC_REQ/RSP can succeed but + // reconciliation never completes, triggering mutation_tracking_sync_timeout + CLUSTER.filters().verbs(Verb.BROADCAST_LOG_OFFSETS.id).drop(); + + List results = repairConcurrently(ALL_NODES, withPR(ksName)); + assertAllFailure(results); + for (NodeToolResult r : results) + assertTrue("Expected sync timeout error but got: " + r.getStderr(), + r.getStderr().contains("Mutation tracking sync timed out")); + } + + /** + * Verifies that a topology change during an active mutation tracking sync causes + * the repair to fail with "topology changed during sync". + * + * The strategy: + * 1. Insert data so sync has work to do + * 2. Drop BROADCAST_LOG_OFFSETS so the sync coordinator stays alive waiting for + * offset reconciliation + * 3. Start repair in a background thread + * 4. Wait until the sync request has been sent (confirming sync is active) + * 5. ALTER KEYSPACE to change RF (3 -> 2), which triggers REPLICA_GROUP -> + * withUpdatedMetadata -> new Shard instances + * 6. 
Turn off the BROADCAST_LOG_OFFSETS filter so offset broadcasts resume, + * triggering onOffsetsReceived -> recaptureTargets -> checkForTopologyChange + * which detects the identity mismatch and fails the repair + * 7. Assert the repair failed with the expected topology change message + */ + @Test + public void testRepairFailsOnTopologyChange() throws Exception + { + insertData("tbl", 0, 50); + + // Block offset broadcasts so the sync coordinator stays alive waiting + IMessageFilters.Filter offsetFilter = CLUSTER.filters().verbs(Verb.BROADCAST_LOG_OFFSETS.id).drop(); + + // Use a latch to detect when the sync request has been sent, meaning + // the sync coordinator is active and tracking shard references + CountDownLatch syncStarted = new CountDownLatch(1); + IMessageFilters.Filter syncObserver = CLUSTER.verbs(Verb.MT_SYNC_REQ).messagesMatching( + (from, to, msg) -> { + syncStarted.countDown(); + return false; // don't drop the message + }).drop(); + + // Start repair in background + Future repairFuture = executor.submit(() -> nodetoolRepair(1, withPR(ksName))); + + // Wait until sync is active. The latch fires when MT_SYNC_REQ is sent, which + // happens after shardStates is fully populated in start(), so no additional + // delay is needed. + assertTrue("Timed out waiting for sync to start", + syncStarted.await(30, TimeUnit.SECONDS)); + + // ALTER KEYSPACE to change RF from 3 to 2 — this changes the participants for + // every range, triggering REPLICA_GROUP -> withUpdatedMetadata -> new Shard instances. + // The sync coordinator's shardStates still holds references to the old Shard objects. + CLUSTER.schemaChange("ALTER KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 2} " + + "AND replication_type='tracked'"); + + // Remove the sync observer since it's no longer needed + syncObserver.off(); + + // Turn off the offset broadcast filter so broadcasts resume. 
When an offset + // broadcast arrives, it calls onOffsetsReceived -> recaptureTargets -> + // checkForTopologyChange, which will detect that the current Shard instances + // (new objects from withUpdatedMetadata) differ from the ones stored in + // shardStates (reference equality check), and fail the repair. + offsetFilter.off(); + + NodeToolResult result = repairFuture.get(30, TimeUnit.SECONDS); + result.asserts().failure(); + assertTrue("Expected topology change error but got: " + result.getStderr(), + result.getStderr().contains("topology changed during sync")); + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java b/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java new file mode 100644 index 000000000000..ef2581d300f2 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.distributed.test.replication; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.cassandra.tcm.ClusterMetadata; +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.repair.RepairJobDesc; +import org.apache.cassandra.repair.SharedContext; +import org.apache.cassandra.replication.MutationTrackingService; +import org.apache.cassandra.replication.MutationTrackingSyncCoordinator; +import org.apache.cassandra.utils.TimeUUID; +import org.awaitility.Awaitility; + +import static org.junit.Assert.*; + +/** + * Distributed tests for MutationTrackingSyncCoordinator. + * + * Tests that the sync coordinator correctly waits for offset convergence + * across all nodes in a cluster. + */ +public class MutationTrackingSyncCoordinatorTest extends TestBaseImpl +{ + private static final String KS_NAME = "sync_test_ks"; + private static final String TBL_NAME = "sync_test_tbl"; + + private void createTrackedKeyspace(Cluster cluster, String keyspaceSuffix) + { + String ksName = KS_NAME + keyspaceSuffix; + cluster.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='tracked'"); + cluster.schemaChange("CREATE TABLE " + ksName + '.' + TBL_NAME + " (k int PRIMARY KEY, v int)"); + } + + private String tableName(String suffix) + { + return KS_NAME + suffix + '.' 
+ TBL_NAME; + } + + private void pauseOffsetBroadcasts(Cluster cluster, boolean pause) + { + for (int i = 1; i <= cluster.size(); i++) + cluster.get(i).runOnInstance(() -> MutationTrackingService.instance().pauseOffsetBroadcast(pause)); + } + + private static Range fullTokenRange() + { + return new Range<>( + new Murmur3Partitioner.LongToken(Long.MIN_VALUE), + new Murmur3Partitioner.LongToken(Long.MAX_VALUE) + ); + } + + @Test + public void testSyncCoordinatorCompletesWhenNoShards() throws Throwable + { + try (Cluster cluster = builder().withNodes(3).start()) + { + createTrackedKeyspace(cluster, ""); + + // Create a sync coordinator for a range that has no data + // It should complete immediately since there are no offsets to sync + Boolean completed = cluster.get(1).callOnInstance(() -> { + Range range = fullTokenRange(); + RepairJobDesc desc = new RepairJobDesc(TimeUUID.Generator.nextTimeUUID(), + TimeUUID.Generator.nextTimeUUID(), + KS_NAME, "", java.util.List.of(range)); + MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator( + SharedContext.Global.instance, desc, null, ClusterMetadata.current()); + coordinator.start(); + + try + { + coordinator.awaitCompletion(5, TimeUnit.SECONDS); + return true; + } + catch (Exception e) + { + return false; + } + }); + + assertTrue("Sync coordinator should complete when there are no pending offsets", completed); + } + } + + @Test + public void testSyncCoordinatorWaitsForAllReplicasMutations() throws Throwable + { + // Enable repair retries with a short request timeout so that the dropped MT_SYNC_RSP + // from node 1 causes a quick timeout and retry rather than a 2-minute wait and failure. + // After the message filter is reset, the retried MT_SYNC_REQ will get a response. 
+ try (Cluster cluster = builder().withNodes(3) + .withConfig(config -> config.set("request_timeout", "1000ms") + .set("repair.retries.max_attempts", 10) + .set("repair.retries.base_sleep_time", "100ms") + .set("repair.retries.max_sleep_time", "500ms")) + .start()) + { + createTrackedKeyspace(cluster, "3"); + + // Block all messages FROM node 1 to prevent write replication + // and also to drop MT_SYNC_RSP from node 1 back to the coordinator. + // This ensures that write only succeeds locally on node 1 and the + // sync coordinator can't get node 1's sync response. + cluster.filters().allVerbs().from(1).drop(); + + cluster.coordinator(1).execute( + "INSERT INTO " + tableName("3") + " (k, v) VALUES (1, 1)", + ConsistencyLevel.ONE + ); + + // Start MutationTrackingSyncCoordinator on node 2 in a separate thread. + // It should wait for offsets to sync since node 1's sync response is being dropped. + // The coordinator sends MT_SYNC_REQ to nodes 1 and 3. Node 3's response comes back + // but node 1's response is dropped. The coordinator stays blocked because + // pendingSyncResponses still contains node 1. After the filter is reset, the + // retried MT_SYNC_REQ will succeed and the coordinator can proceed. 
+ CompletableFuture coordinatorFuture = CompletableFuture.supplyAsync(() -> cluster.get(2).callOnInstance(() -> { + Range range = fullTokenRange(); + RepairJobDesc desc = new RepairJobDesc(TimeUUID.Generator.nextTimeUUID(), + TimeUUID.Generator.nextTimeUUID(), + KS_NAME + '3', "", java.util.List.of(range)); + MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator( + SharedContext.Global.instance, desc, null, ClusterMetadata.current()); + coordinator.start(); + + try + { + coordinator.awaitCompletion(30, TimeUnit.SECONDS); + return true; + } + catch (Exception e) + { + return false; + } + })); + + // Wait until node 1 has the data + Awaitility.await() + .atMost(Duration.ofSeconds(5)) + .pollInterval(Duration.ofMillis(100)) + .untilAsserted(() -> { + Object[][] results = cluster.get(1).executeInternal( + "SELECT k, v FROM " + tableName("3") + " WHERE k = 1"); + assertEquals("Node 1 should have the data", 1, results.length); + }); + + // Verify other nodes shouldn't have the data yet since we have blocked messages + for (int i = 2; i <= 3; i++) + { + Object[][] results = cluster.get(i).executeInternal( + "SELECT k, v FROM " + tableName("3") + " WHERE k = 1" + ); + assertEquals("Node " + i + " should not have data yet", 0, results.length); + } + + // Verify coordinator stays blocked for at least 2 seconds while node 1's + // sync response is being dropped. The coordinator can't complete because + // pendingSyncResponses still contains node 1. + Awaitility.await() + .during(Duration.ofSeconds(2)) + .atMost(Duration.ofSeconds(3)) + .until(() -> !coordinatorFuture.isDone()); + + // Reset filter so that retried MT_SYNC_REQ to node 1 can get a response, + // and offset broadcasts from node 1 can reach other nodes. + cluster.filters().reset(); + + // Force offset broadcasts on all nodes to drive reconciliation. 
+ // After the sync response from node 1 establishes targets, the coordinator + // needs to see that all replicas have caught up via offset broadcasts. + for (int i = 1; i <= 3; i++) + cluster.get(i).runOnInstance(() -> MutationTrackingService.instance().broadcastOffsetsForTesting()); + + // Wait for coordinator to complete + Awaitility.await() + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofMillis(200)) + .until(coordinatorFuture::isDone); + + assertTrue("Coordinator should complete successfully", coordinatorFuture.get()); + + // Verify data propagated to all replicas + for (int i = 1; i <= 3; i++) + { + final int nodeId = i; + Awaitility.await() + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofMillis(100)) + .untilAsserted(() -> { + Object[][] results = cluster.get(nodeId).executeInternal( + "SELECT k, v FROM " + tableName("3") + " WHERE k = 1"); + assertEquals("Node " + nodeId + " should have the data", 1, results.length); + assertEquals(1, results[0][0]); + assertEquals(1, results[0][1]); + }); + } + } + } + + @Test + public void testSyncCoordinatorCancel() throws Throwable + { + try (Cluster cluster = builder().withNodes(3).start()) + { + createTrackedKeyspace(cluster, "4"); + + // Pause offset broadcasts on all nodes to prevent sync from completing + pauseOffsetBroadcasts(cluster, true); + + for (int i = 0; i < 100; i++) + { + cluster.coordinator(1).execute( + "INSERT INTO " + tableName("4") + " (k, v) VALUES (?, ?)", + ConsistencyLevel.ONE, i, i); + } + + // Start coordinator - it will be stuck waiting for offsets + Boolean wasCancelled = cluster.get(1).callOnInstance(() -> { + Range range = fullTokenRange(); + RepairJobDesc desc = new RepairJobDesc(TimeUUID.Generator.nextTimeUUID(), + TimeUUID.Generator.nextTimeUUID(), + KS_NAME + '4', "", java.util.List.of(range)); + MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator( + SharedContext.Global.instance, desc, null, ClusterMetadata.current()); + 
coordinator.start(); + + try + { + Thread.sleep(100); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + return false; + } + + coordinator.cancel(); // Cancel it + + // Verify it was cancelled + try + { + coordinator.awaitCompletion(1, TimeUnit.SECONDS); + return false; // Should have thrown + } + catch (Exception e) + { + Throwable cause = e.getCause() != null ? e.getCause() : e; + return cause.getMessage() != null && cause.getMessage().contains("cancelled"); + } + }); + assertTrue("Sync coordinator should be cancelled", wasCancelled); + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/sai/PartialUpdateHandlingTest.java b/test/distributed/org/apache/cassandra/distributed/test/sai/PartialUpdateHandlingTest.java index 2b38ba8e26bc..fa932943b07a 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/sai/PartialUpdateHandlingTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/sai/PartialUpdateHandlingTest.java @@ -236,7 +236,11 @@ public void writeRepairedRows() CLUSTER.coordinator(1).execute(insert.toString(), ConsistencyLevel.ALL); } - CLUSTER.get(1).nodetoolResult("repair", specification.keyspaceName()).asserts().success(); + // Background reconciliation doesn't exist/work so incremental repair just hangs waiting for reconciliation that never occurs + if (specification.replicationType.isTracked()) + CLUSTER.get(1).nodetoolResult("repair", "-full", specification.keyspaceName()).asserts().success(); + else + CLUSTER.get(1).nodetoolResult("repair", specification.keyspaceName()).asserts().success(); } public void writeUnrepairedRows() diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounceTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounceTest.java index 935cd3db6563..8d270d51c2cf 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounceTest.java +++ 
b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounceTest.java @@ -31,6 +31,7 @@ import org.apache.cassandra.harry.gen.Generator; import org.apache.cassandra.harry.gen.SchemaGenerators; import org.apache.cassandra.replication.MutationJournal; +import org.junit.Ignore; import org.junit.Test; @@ -45,48 +46,76 @@ public void bounceTest() throws Throwable { try (Cluster cluster = builder().withNodes(1).start()) { - int tables = 10; - int writesPerKey = 2; - int pks = 100; - withRandom(rng -> { - cluster.schemaChange(String.format("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1} " + - "AND replication_type='tracked'", - KEYSPACE)); - - List builders = new ArrayList<>(); - for (int i = 0; i < tables; i++) - { - Generator schemaGen = SchemaGenerators.trivialSchema(KEYSPACE, () -> "mutation_tracking_bounce_" + (builders.size() + 1), POPULATION, - SchemaSpec.optionsBuilder()); - - SchemaSpec schema = schemaGen.generate(rng); - cluster.schemaChange(schema.compile()); - builders.add(new ReplayingHistoryBuilder(schema.valueGenerators, - hb -> InJvmDTestVisitExecutor.builder() - .consistencyLevel(ConsistencyLevel.QUORUM) - .build(schema, hb, cluster))); - } + bounceTest(cluster, 1, 1); + } + } + + @Test + public void bounceTestMultiNode() throws Throwable + { + try (Cluster cluster = builder().withNodes(3).start()) + { + bounceTest(cluster, 3, 1); + } + } + + @Ignore("https://issues.apache.org/jira/browse/CASSANDRA-21256") + @Test + public void doubleBounceTestMultiNode() throws Throwable + { + try (Cluster cluster = builder().withNodes(3).start()) + { + bounceTest(cluster, 3, 2); + } + } - int counter = 0; - for (int pk = 0; pk < pks; pk++) { - for (HistoryBuilder history : builders) - for (int i = 0; i < writesPerKey; i++) - history.insert(pk); + private void bounceTest(Cluster cluster, int rf, int bounces) throws Throwable + { + int tables = 10; + int writesPerKey = 2; + int pks = 100; + 
withRandom(rng -> { + cluster.schemaChange(String.format("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': %d} " + + "AND replication_type='tracked'", + KEYSPACE, rf)); + + List builders = new ArrayList<>(); + for (int i = 0; i < tables; i++) + { + Generator schemaGen = SchemaGenerators.trivialSchema(KEYSPACE, () -> "mutation_tracking_bounce_" + (builders.size() + 1), POPULATION, + SchemaSpec.optionsBuilder()); + + SchemaSpec schema = schemaGen.generate(rng); + cluster.schemaChange(schema.compile()); + builders.add(new ReplayingHistoryBuilder(schema.valueGenerators, + hb -> InJvmDTestVisitExecutor.builder() + .consistencyLevel(ConsistencyLevel.QUORUM) + .build(schema, hb, cluster))); + } + + int counter = 0; + for (int pk = 0; pk < pks; pk++) + { + for (HistoryBuilder history : builders) + for (int i = 0; i < writesPerKey; i++) + history.insert(pk); if (++counter % 10 == 0) cluster.get(1).runOnInstance(() -> MutationJournal.instance().closeCurrentSegmentForTestingIfNonEmpty()); } + for (int bounce = 0; bounce < bounces; bounce++) + { ClusterUtils.stopUnchecked(cluster.get(1)); cluster.get(1).startup(); + } - for (int pk = 0; pk < pks; pk++) - for (HistoryBuilder history : builders) - for (int i = 0; i < 10; i++) - history.selectPartition(pk); + for (int pk = 0; pk < pks; pk++) + for (HistoryBuilder history : builders) + for (int i = 0; i < 10; i++) + history.selectPartition(pk); - cluster.get(1).runOnInstance(new MutationTrackingBounce_ValidateRunnable(tables * pks * writesPerKey)); - }); - } + cluster.get(1).runOnInstance(new MutationTrackingBounce_ValidateRunnable(tables * pks * writesPerKey)); + }); } } \ No newline at end of file diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java index 32fdd2505562..c2b99267bdee 100644 --- 
a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java @@ -418,19 +418,22 @@ public void testFailedMutationRedelivery() throws Throwable Assert.assertEquals(0, summary.get(logId).reconciled.offsetCount()); }); - // resume the reconciler + // resume the reconciler and spin until reconciliation completes. + // The reconciler retries with PUSH_MUTATION_REQ whose response inherits the + // request's expiry (write_request_timeout). Under load the response can arrive + // after that expiry and be silently dropped by InboundMessageHandler, requiring + // a retry cycle. Spinning accommodates multiple retry rounds. cluster.get(1).runOnInstance(() -> MutationTrackingService.instance().resumeActiveReconciler()); - Thread.sleep(1000); // wait for reconiciler to do its job - cluster.get(1).runOnInstance(() -> - { - TableMetadata table = Schema.instance.getTableMetadata(keyspaceName, "tbl"); - DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1)); - MutationSummary summary = MutationTrackingService.instance().createSummaryForKey(dk, table.id, false); - CoordinatorLogId logId = getOnlyLogId(summary); - Assert.assertEquals(0, summary.get(logId).unreconciled.offsetCount()); - Assert.assertEquals(1, summary.get(logId).reconciled.offsetCount()); - }); + Util.spinUntilTrue(() -> + cluster.get(1).callOnInstance(() -> { + TableMetadata table = Schema.instance.getTableMetadata(keyspaceName, "tbl"); + DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1)); + MutationSummary summary = MutationTrackingService.instance().createSummaryForKey(dk, table.id, false); + CoordinatorLogId logId = getOnlyLogId(summary); + return summary.get(logId).unreconciled.offsetCount() == 0 + && summary.get(logId).reconciled.offsetCount() == 1; + }), 10); } } } diff --git 
a/test/unit/org/apache/cassandra/db/MutationTrackingMutationVerbHandlerMigrationTest.java b/test/unit/org/apache/cassandra/db/MutationTrackingMutationVerbHandlerMigrationTest.java index 47d9e81fb49d..05b19292ea1c 100644 --- a/test/unit/org/apache/cassandra/db/MutationTrackingMutationVerbHandlerMigrationTest.java +++ b/test/unit/org/apache/cassandra/db/MutationTrackingMutationVerbHandlerMigrationTest.java @@ -24,7 +24,6 @@ import org.junit.Test; import org.apache.cassandra.ServerTestUtils; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ReadCommand.PotentialTxnConflicts; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.partitions.PartitionUpdate; diff --git a/test/unit/org/apache/cassandra/service/replication/migration/MutationTrackingMigrationStateTest.java b/test/unit/org/apache/cassandra/service/replication/migration/MutationTrackingMigrationStateTest.java index dc27a3792c6d..36d77d46ac89 100644 --- a/test/unit/org/apache/cassandra/service/replication/migration/MutationTrackingMigrationStateTest.java +++ b/test/unit/org/apache/cassandra/service/replication/migration/MutationTrackingMigrationStateTest.java @@ -153,7 +153,7 @@ public void testWithMigrationsCompleted() assertNotSame(state, completed); assertFalse(completed.hasMigratingKeyspaces()); - assertNull(completed.getKeyspaceInfo("test_ks")); + assertFalse(completed.isMigrating("test_ks")); } @Test @@ -169,7 +169,7 @@ public void testWithMigrationsRemoved() MutationTrackingMigrationState removed = state.dropKeyspaces(epoch, Collections.singleton("test_ks")); assertFalse(removed.hasMigratingKeyspaces()); - assertNull(removed.getKeyspaceInfo("test_ks")); + assertFalse(removed.isMigrating("test_ks")); } @Test @@ -253,7 +253,7 @@ public void testMultipleKeyspaces() state = state.withRangesRepairedForTable("ks1", testTableId, Collections.singleton(fullRing), epoch); assertEquals(1, state.keyspaceInfo.size()); - 
assertNull(state.getKeyspaceInfo("ks1")); + assertFalse(state.isMigrating("ks1")); // ks2 should still have full ring pending KeyspaceMigrationInfo expectedKs2AfterKs1Complete = createExpectedKeyspaceMigrationInfo( diff --git a/test/unit/org/apache/cassandra/tcm/transformations/AdvanceMutationTrackingMigrationTest.java b/test/unit/org/apache/cassandra/tcm/transformations/AdvanceMutationTrackingMigrationTest.java index c0289e8fc27d..bd6a1db4c2cf 100644 --- a/test/unit/org/apache/cassandra/tcm/transformations/AdvanceMutationTrackingMigrationTest.java +++ b/test/unit/org/apache/cassandra/tcm/transformations/AdvanceMutationTrackingMigrationTest.java @@ -139,7 +139,7 @@ public void testAdvanceRangesCompleteMigration() // Verify migration was auto-completed (keyspace removed from state) assertFalse(updated.mutationTrackingMigrationState.hasMigratingKeyspaces()); - assertNull(updated.mutationTrackingMigrationState.getKeyspaceInfo("test_ks")); + assertFalse(updated.mutationTrackingMigrationState.isMigrating("test_ks")); } @Test @@ -240,7 +240,7 @@ public void testAdvancePartialThenComplete() assertTrue(result2.isSuccess()); ClusterMetadata afterComplete = result2.success().metadata; - assertNull(afterComplete.mutationTrackingMigrationState.getKeyspaceInfo("test_ks")); + assertFalse(afterComplete.mutationTrackingMigrationState.isMigrating("test_ks")); assertFalse(afterComplete.mutationTrackingMigrationState.hasMigratingKeyspaces()); } @@ -285,7 +285,7 @@ public void testAdvanceMultipleTables() assertTrue(result2.isSuccess()); ClusterMetadata afterTable2 = result2.success().metadata; - assertNull(afterTable2.mutationTrackingMigrationState.getKeyspaceInfo("test_ks")); + assertFalse(afterTable2.mutationTrackingMigrationState.isMigrating("test_ks")); assertFalse(afterTable2.mutationTrackingMigrationState.hasMigratingKeyspaces()); } diff --git a/test/unit/org/apache/cassandra/tcm/transformations/AlterSchemaMutationTrackingTest.java 
b/test/unit/org/apache/cassandra/tcm/transformations/AlterSchemaMutationTrackingTest.java index a82b516d7714..95e8b5371906 100644 --- a/test/unit/org/apache/cassandra/tcm/transformations/AlterSchemaMutationTrackingTest.java +++ b/test/unit/org/apache/cassandra/tcm/transformations/AlterSchemaMutationTrackingTest.java @@ -42,8 +42,7 @@ import static org.apache.cassandra.cql3.CQLTester.schemaChange; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; /** * Tests for AlterSchema auto-starting mutation tracking migration when replication type changes. @@ -80,7 +79,7 @@ public void testAutoStartToTrackedMigration() throws Throwable schemaChange(String.format("CREATE TABLE %s.tbl (pk int PRIMARY KEY, val int)", ksName)); ClusterMetadata metadata = ClusterMetadata.current(); - assertNull(metadata.mutationTrackingMigrationState.getKeyspaceInfo(ksName)); + assertFalse(metadata.mutationTrackingMigrationState.isMigrating(ksName)); // Alter tracked replication schemaChange(String.format("ALTER KEYSPACE %s WITH replication_type = 'tracked'", ksName)); @@ -116,7 +115,7 @@ public void testAutoStartToUntrackedMigration() throws Throwable schemaChange(String.format("CREATE TABLE %s.tbl (pk int PRIMARY KEY, val int)", ksName)); ClusterMetadata metadata = ClusterMetadata.current(); - assertNull(metadata.mutationTrackingMigrationState.getKeyspaceInfo(ksName)); + assertFalse(metadata.mutationTrackingMigrationState.isMigrating(ksName)); // Alter keyspace to untracked schemaChange(String.format("ALTER KEYSPACE %s WITH replication_type = 'untracked'", ksName)); @@ -149,7 +148,7 @@ public void testNoMigrationWhenReplicationTypeUnchanged() throws Throwable "AND replication_type = 'untracked'" ); schemaChange(String.format("CREATE TABLE %s.tbl (pk int PRIMARY KEY, val int)", ksName)); - 
assertNull(ClusterMetadata.current().mutationTrackingMigrationState.getKeyspaceInfo(ksName)); + assertFalse(ClusterMetadata.current().mutationTrackingMigrationState.isMigrating(ksName)); // Alter keyspace without changing replication type schemaChange(String.format( @@ -158,7 +157,7 @@ public void testNoMigrationWhenReplicationTypeUnchanged() throws Throwable )); // confirm no migrations were started - assertNull(ClusterMetadata.current().mutationTrackingMigrationState.getKeyspaceInfo(ksName)); + assertFalse(ClusterMetadata.current().mutationTrackingMigrationState.isMigrating(ksName)); } @Test @@ -221,8 +220,8 @@ public void testReverseMigrationDirection() throws Throwable schemaChange(String.format("CREATE TABLE %s.tbl (pk int PRIMARY KEY, val int)", ksName)); ClusterMetadata metadata = ClusterMetadata.current(); - assertNull("Should have no migration before first alter for " + ksName, - metadata.mutationTrackingMigrationState.getKeyspaceInfo(ksName)); + assertFalse("Should have no migration before first alter for " + ksName, + metadata.mutationTrackingMigrationState.isMigrating(ksName)); // Alter to tracked (untracked → tracked) schemaChange(String.format("ALTER KEYSPACE %s WITH replication_type = 'tracked'", ksName)); @@ -249,7 +248,7 @@ public void testReverseMigrationDirection() throws Throwable // this should auto-complete the migration, since none of the ranges from the initial alter completed migration metadata = ClusterMetadata.current(); - assertNull(metadata.mutationTrackingMigrationState.getKeyspaceInfo(ksName)); + assertFalse(metadata.mutationTrackingMigrationState.isMigrating(ksName)); // Alter back to tracked again schemaChange(String.format("ALTER KEYSPACE %s WITH replication_type = 'tracked'", ksName)); @@ -283,13 +282,13 @@ public void testDropKeyspaceDuringMigration() throws Throwable schemaChange(String.format("ALTER KEYSPACE %s WITH replication_type = 'tracked'", ksName)); ClusterMetadata metadata = ClusterMetadata.current(); - 
assertNotNull(metadata.mutationTrackingMigrationState.getKeyspaceInfo(ksName)); + assertTrue(metadata.mutationTrackingMigrationState.isMigrating(ksName)); // Drop the keyspace & confirm migration is also removed schemaChange(String.format("DROP KEYSPACE %s", ksName)); ClusterMetadata afterDrop = ClusterMetadata.current(); - assertNull(afterDrop.mutationTrackingMigrationState.getKeyspaceInfo(ksName)); + assertFalse(afterDrop.mutationTrackingMigrationState.isMigrating(ksName)); } @Test