Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,9 @@ Marking ranges as migrating is a lightweight operation and does not
trigger the repairs that will finish the migration.

The range to mark migrating needs to be explicitly
provided otherwise the entire ring will be marked migrating for the
specified keyspace and tables. If the entire range is marked migrating it is
only necessary to invoke `begin-migration` on one node.
provided otherwise it will default to the primary range
of the node that `nodetool` is connecting to.
This is similar to running repair with the -pr flag.

This is only needed if
`accord.default++_++transactional++_++mode=explicit` is set in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,17 @@ public static void startMigrationToConsensusProtocol(@Nullable List<String> keys

IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
Optional<List<Range<Token>>> maybeParsedRanges = maybeRangesStr.map(rangesStr -> ImmutableList.copyOf(RepairOption.parseRanges(rangesStr, partitioner)));
Token minToken = partitioner.getMinimumToken();
NormalizedRanges<Token> ranges = normalizedRanges(maybeParsedRanges.orElse(ImmutableList.of(new Range(minToken, minToken))));

NormalizedRanges<Token> ranges;
if (maybeParsedRanges.isPresent())
ranges = normalizedRanges(maybeParsedRanges.get());
else
{
List<Range<Token>> defaultRanges = new ArrayList<>();
for (String keyspaceName : keyspaceNames)
defaultRanges.addAll(StorageService.instance.getPrimaryRanges(keyspaceName));
ranges = normalizedRanges(defaultRanges);
}

ClusterMetadataService.instance().commit(new BeginConsensusMigrationForTableAndRange(ranges, tableIds));
}
Expand All @@ -216,7 +225,7 @@ public static Integer finishMigrationToConsensusProtocol(@Nonnull String keyspac
checkNotNull(target);
ClusterMetadata cm = ClusterMetadata.current();

Optional<List<Range<Token>>> localKeyspaceRanges = Optional.of(ImmutableList.copyOf(StorageService.instance.getLocalReplicas(keyspace).onlyFull().ranges()));
Optional<List<Range<Token>>> localKeyspaceRanges = Optional.of(ImmutableList.copyOf(StorageService.instance.getPrimaryRanges(keyspace)));
List<Range<Token>> ranges = maybeRangesToRanges(maybeRangesStr, localKeyspaceRanges);
Map<TableId, TableMigrationState> allTableMigrationStates = ClusterMetadata.current().consensusMigrationState.tableStates;
List<TableId> tableIds = keyspacesAndTablesToTableIds(cm, ImmutableList.of(keyspace), maybeTables, Optional.of(allTableMigrationStates::containsKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -91,6 +93,7 @@
import org.apache.cassandra.service.paxos.Commit.Proposal;
import org.apache.cassandra.service.paxos.PaxosState;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.transport.Dispatcher;
import org.apache.cassandra.utils.ByteArrayUtil;
Expand Down Expand Up @@ -569,6 +572,47 @@ public void testPaxosToAccordSerialRead() throws Exception
});
}

@Test
public void testBeginMigrationTargetsOnlyLocalRanges() throws Exception
{
List<String> ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';',
"CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}",
"CREATE TABLE " + qualifiedRegularTableName + " (k int PRIMARY KEY, v int)");
test(ddls, cluster -> {
cluster.schemaChange("ALTER TABLE " + qualifiedRegularTableName + " WITH " + transactionalMode.asCqlParam());
cluster.get(1).nodetoolResult("consensus_admin", "begin-migration");

String keyspace = KEYSPACE;
String tableName = regularTableName;
cluster.get(1).runOnInstance(() -> {
ConsensusMigrationState state = ClusterMetadataService.instance().metadata().consensusMigrationState;
Collection<Range<Token>> expectedMigratingRange = StorageService.instance.getPrimaryRanges(keyspace);
assertEquals(normalizedRanges(expectedMigratingRange), state.tableStates.get(Schema.instance.getTableMetadata(keyspace, tableName).id()).migratingRanges());
});
});
}

@Test
public void testFinishMigrationTargetsOnlyLocalRanges() throws Exception
{
List<String> ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';',
"CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}",
"CREATE TABLE " + qualifiedRegularTableName + " (k int PRIMARY KEY, v int)");
test(ddls, cluster -> {
cluster.schemaChange("ALTER TABLE " + qualifiedRegularTableName + " WITH " + transactionalMode.asCqlParam());
cluster.get(1).nodetoolResult("consensus_admin", "begin-migration");
cluster.get(1).nodetoolResult("consensus_admin", "finish-migration");

String keyspace = KEYSPACE;
String tableName = regularTableName;
cluster.get(1).runOnInstance(() -> {
ConsensusMigrationState state = ClusterMetadataService.instance().metadata().consensusMigrationState;
Collection<Range<Token>> expectedMigratingRange = StorageService.instance.getPrimaryRanges(keyspace);
assertEquals(normalizedRanges(expectedMigratingRange), state.tableStates.get(Schema.instance.getTableMetadata(keyspace, tableName).id()).migratedRanges);
});
});
}

private void assertTransactionalModes(String keyspace, String table, TransactionalMode mode, TransactionalMigrationFromMode migration)
{
forEach(() -> {
Expand Down