diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index ea370b4601fb..336f07533d49 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -65,6 +65,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.primitives.AbstractRanges; +import accord.primitives.Ranges; + import org.apache.cassandra.cache.AutoSavingCache; import org.apache.cassandra.concurrent.ExecutorFactory; import org.apache.cassandra.concurrent.WrappedExecutorPlus; @@ -114,9 +117,13 @@ import org.apache.cassandra.repair.NoSuchRepairSessionException; import org.apache.cassandra.schema.CompactionParams.TombstoneOption; import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.accord.AccordService; +import org.apache.cassandra.service.accord.TokenRange; +import org.apache.cassandra.service.accord.api.TokenKey; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ownership.DataPlacement; @@ -612,7 +619,12 @@ public Object call() throws Exception FBUtilities.waitOnFutures(futures); assert compacting.originals().isEmpty(); logger.info("Finished {} for {}.{} successfully", operationType, keyspace, table); - return AllSSTableOpStatus.SUCCESSFUL; + + // Some SSTables were not fully cleaned up because Accord is still using those ranges + if (operation.incompleteOperation()) + return AllSSTableOpStatus.INCOMPLETE; + else + return AllSSTableOpStatus.SUCCESSFUL; } finally { @@ -634,15 +646,18 @@ public Object call() throws Exception private static interface OneSSTableOperation { + Iterable filterSSTables(LifecycleTransaction transaction); void execute(LifecycleTransaction input) throws IOException; + boolean incompleteOperation(); } public enum AllSSTableOpStatus { SUCCESSFUL(0), ABORTED(1), - UNABLE_TO_CANCEL(2); + UNABLE_TO_CANCEL(2), + INCOMPLETE(3); public final int statusCode; @@ -667,6 +682,12 @@ public void execute(LifecycleTransaction input) { scrubOne(cfs, input, options, active); } + + @Override + public boolean incompleteOperation() + { + return false; + } }, jobs, OperationType.SCRUB); } @@ -695,6 +716,12 @@ public void execute(LifecycleTransaction input) { verifyOne(cfs, input.onlyOne(), options, active); } + + @Override + public boolean incompleteOperation() + { + return false; + } }, 0, OperationType.VERIFY); } @@ -759,6 +786,12 @@ public void execute(LifecycleTransaction txn) task.setCompactionType(OperationType.UPGRADE_SSTABLES); task.execute(active); } + + @Override + public boolean incompleteOperation() + { + return false; + } }, jobs, OperationType.UPGRADE_SSTABLES); } @@ -795,6 +828,8 @@ public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore, int jo return parallelAllSSTableOperation(cfStore, new OneSSTableOperation() { + boolean incompleteOperation = false; + @Override public Iterable filterSSTables(LifecycleTransaction transaction) { @@ -802,15 +837,28 @@ public Iterable filterSSTables(LifecycleTransaction transaction) Iterator sstableIter = sortedSSTables.iterator(); int totalSSTables = 0; int skippedSStables = 0; + int sstablesInUseByAccord = 0; while (sstableIter.hasNext()) { SSTableReader sstable = sstableIter.next(); boolean needsCleanupFull = needsCleanup(sstable, fullRanges); boolean needsCleanupTransient = !transientRanges.isEmpty() && sstable.isRepaired() && needsCleanup(sstable, transientRanges); + totalSSTables++; + + if (sstableContainsRangesNeededByAccord(cfStore.getTableId(), sstable)) + { + logger.debug("Skipping {} ([{}, {}]) for cleanup; as Accord still needs the ranges.", + sstable, + sstable.getFirst().getToken(), + sstable.getLast().getToken()); + sstableIter.remove(); + transaction.cancel(sstable); + sstablesInUseByAccord++; + incompleteOperation = true; + } //If there are no ranges for which the table needs cleanup either due to lack of intersection or lack //of the table being repaired. - totalSSTables++; - if (!needsCleanupFull && !needsCleanupTransient) + else if (!needsCleanupFull && !needsCleanupTransient) { logger.debug("Skipping {} ([{}, {}]) for cleanup; all rows should be kept. Needs cleanup full ranges: {} Needs cleanup transient ranges: {} Repaired: {}", sstable, @@ -824,8 +872,13 @@ public Iterable filterSSTables(LifecycleTransaction transaction) skippedSStables++; } } - logger.info("Skipping cleanup for {}/{} sstables for {}.{} since they are fully contained in owned ranges (full ranges: {}, transient ranges: {})", - skippedSStables, totalSSTables, cfStore.getKeyspaceName(), cfStore.getTableName(), fullRanges, transientRanges); + + logger.info("Skipping cleanup for {} sstables for {}.{} since they are fully contained in owned ranges (full ranges: {}, transient ranges: {})", + skippedSStables, cfStore.getKeyspaceName(), cfStore.getTableName(), fullRanges, transientRanges); + logger.info("Skipping cleanup for {} sstables for {}.{} since they are still being used by Accord", + sstablesInUseByAccord, cfStore.getKeyspaceName(), cfStore.getTableName()); + logger.info("Cleaned up {} sstables for {}.{}", totalSSTables, cfStore.getKeyspaceName(), cfStore.getTableName()); + sortedSSTables.sort(SSTableReader.sizeComparator); return sortedSSTables; } @@ -836,6 +889,12 @@ public void execute(LifecycleTransaction txn) throws IOException CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, allRanges, transientRanges, txn.onlyOne().isRepaired(), FBUtilities.nowInSeconds()); doCleanupOne(cfStore, txn, cleanupStrategy, allRanges, hasIndexes); } + + @Override + public boolean incompleteOperation() + { + return incompleteOperation; + } }, jobs, OperationType.CLEANUP); } @@ -903,6 +962,12 @@ protected int getLevel() task.setCompactionType(OperationType.GARBAGE_COLLECT); task.execute(active); } + + @Override + public boolean incompleteOperation() + { + return false; + } }, jobs, OperationType.GARBAGE_COLLECT); } @@ -971,6 +1036,12 @@ public void execute(LifecycleTransaction txn) task.setCompactionType(OperationType.RELOCATE); task.execute(active); } + + @Override + public boolean incompleteOperation() + { + return false; + } }, jobs, OperationType.RELOCATE); } @@ -1599,6 +1670,31 @@ public static boolean needsCleanup(SSTableReader sstable, Collection> futureAccordOwnedRanges = AccordService.toFuture(AccordService.instance().node().commandStores().getInUseRanges()); + logger.info("Waiting for Accord to be ready"); + + try + { + Ranges accordOwnedRanges = futureAccordOwnedRanges.get().stream().reduce(Ranges.EMPTY, (acc, r) -> acc.union(AbstractRanges.UnionMode.MERGE_ADJACENT, r)); + + if (sstable.getFirst().equals(sstable.getLast())) + return accordOwnedRanges.intersects(new TokenKey(tableId, sstable.getFirst().getToken())); + + Ranges sstableRanges = Ranges.of(TokenRange.create(tableId, sstable.getFirst().getToken(), sstable.getLast().getToken())); + return accordOwnedRanges.intersects(sstableRanges); + } + catch (Throwable e) + { + logger.error("Error while waiting for Accord in use ranges", e); + return true; + } + } + /** * This function goes over a file and removes the keys that the node is not responsible for * and only keeps keys that this node is responsible for. diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolCleanupTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolCleanupTest.java new file mode 100644 index 000000000000..3e8e7e18448e --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolCleanupTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.accord; + +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.service.StorageService; + +import static com.google.common.collect.Iterables.getOnlyElement; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.Test; + +public class AccordNodetoolCleanupTest extends TestBaseImpl +{ + @Test + public void accordNodetoolCleanupTest() throws Throwable + { + String tableName = "tbl0"; + String qualifiedTableName = KEYSPACE + '.' + tableName; + try (Cluster cluster = init(builder().withNodes(2).withoutVNodes().withConfig((config) -> + config + .set("accord.shard_durability_target_splits", "1") + .set("accord.shard_durability_cycle", "20s") + .with(Feature.NETWORK, Feature.GOSSIP)).start())) + { + cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE); + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}"); + cluster.schemaChange("CREATE TABLE " + qualifiedTableName + " (k int PRIMARY KEY, v int) WITH transactional_mode='full'"); + + cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2); + + SimpleQueryResult result = cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + qualifiedTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL); + + cluster.get(1).flush(withKeyspace("%s")); + + String originalToken = cluster.get(1).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens())); + + long token = (Long) result.toObjectArrays()[0][0]; + + assertTrue(token < Long.parseLong(originalToken)); + + assertEquals(1, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); + + cluster.get(1).runOnInstance(() -> { + StorageService.instance.move(Long.toString(token - 1000)); + }); + + // Wait until Accord retires range + try + { + Thread.sleep(20000); + } + catch (InterruptedException e) + { + fail(); + } + + cluster.get(1).nodetool("cleanup", KEYSPACE, tableName); + + assertEquals(0, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); + } + } + + @Test + public void accordNodetoolCleanupRangeInUseTest() throws Throwable + { + String tableName = "tbl0"; + String qualifiedTableName = KEYSPACE + '.' + tableName; + try (Cluster cluster = init(builder().withNodes(2).withoutVNodes().withConfig((config) -> + config + .set("accord.shard_durability_target_splits", "1") + .set("accord.shard_durability_cycle", "20s") + .with(Feature.NETWORK, Feature.GOSSIP)).start())) { + + cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE); + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}"); + cluster.schemaChange("CREATE TABLE " + qualifiedTableName + " (k int PRIMARY KEY, v int) WITH transactional_mode='full'"); + + cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2); + + SimpleQueryResult result = cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + qualifiedTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL); + + cluster.get(1).flush(withKeyspace("%s")); + + String originalToken = cluster.get(1).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens())); + + long token = (Long) result.toObjectArrays()[0][0]; + + assertTrue(token < Long.parseLong(originalToken)); + + assertEquals(1, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); + + cluster.get(1).runOnInstance(() -> StorageService.instance.move(Long.toString(token - 1000))); + + cluster.get(1).nodetoolResult("cleanup", KEYSPACE, tableName); + + assertEquals(1, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); + } + } + + @Test + public void nodetoolCleanupForNonAccordTableTest() throws Throwable + { + String tableName = "tbl0"; + String qualifiedTableName = KEYSPACE + '.' + tableName; + try (Cluster cluster = init(builder().withNodes(2).withoutVNodes().withConfig((config) -> + config + .set("accord.shard_durability_target_splits", "1") + .set("accord.shard_durability_cycle", "20s") + .with(Feature.NETWORK, Feature.GOSSIP)).start())) + { + + cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE); + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}"); + cluster.schemaChange("CREATE TABLE " + qualifiedTableName + " (k int PRIMARY KEY, v int)"); + + cluster.coordinator(1).execute("INSERT INTO " + qualifiedTableName + " (k, v) VALUES (?, ?)", ConsistencyLevel.ALL, 1, 2); + + SimpleQueryResult result = cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + qualifiedTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL); + + cluster.get(1).flush(withKeyspace("%s")); + + String originalToken = cluster.get(1).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens())); + + long token = (Long) result.toObjectArrays()[0][0]; + + assertTrue(token < Long.parseLong(originalToken)); + + assertEquals(1, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); + + cluster.get(1).runOnInstance(() -> { + StorageService.instance.move(Long.toString(token - 1000)); + }); + + cluster.get(1).nodetool("cleanup", KEYSPACE, tableName); + + assertEquals(0, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); + } + } +} +