Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 102 additions & 6 deletions src/java/org/apache/cassandra/db/compaction/CompactionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import accord.primitives.AbstractRanges;
import accord.primitives.Ranges;

import org.apache.cassandra.cache.AutoSavingCache;
import org.apache.cassandra.concurrent.ExecutorFactory;
import org.apache.cassandra.concurrent.WrappedExecutorPlus;
Expand Down Expand Up @@ -114,9 +117,13 @@
import org.apache.cassandra.repair.NoSuchRepairSessionException;
import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.accord.AccordService;
import org.apache.cassandra.service.accord.TokenRange;
import org.apache.cassandra.service.accord.api.TokenKey;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ownership.DataPlacement;
Expand Down Expand Up @@ -612,7 +619,12 @@ public Object call() throws Exception
FBUtilities.waitOnFutures(futures);
assert compacting.originals().isEmpty();
logger.info("Finished {} for {}.{} successfully", operationType, keyspace, table);
return AllSSTableOpStatus.SUCCESSFUL;

// Some SSTables were not fully cleaned up because Accord is still using those ranges
if (operation.incompleteOperation())
return AllSSTableOpStatus.INCOMPLETE;
else
return AllSSTableOpStatus.SUCCESSFUL;
}
finally
{
Expand All @@ -634,15 +646,18 @@ public Object call() throws Exception

private static interface OneSSTableOperation
{

Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction);
void execute(LifecycleTransaction input) throws IOException;
boolean incompleteOperation();
}

public enum AllSSTableOpStatus
{
SUCCESSFUL(0),
ABORTED(1),
UNABLE_TO_CANCEL(2);
UNABLE_TO_CANCEL(2),
INCOMPLETE(3);

public final int statusCode;

Expand All @@ -667,6 +682,12 @@ public void execute(LifecycleTransaction input)
{
scrubOne(cfs, input, options, active);
}

@Override
public boolean incompleteOperation()
{
return false;
}
}, jobs, OperationType.SCRUB);
}

Expand Down Expand Up @@ -695,6 +716,12 @@ public void execute(LifecycleTransaction input)
{
verifyOne(cfs, input.onlyOne(), options, active);
}

@Override
public boolean incompleteOperation()
{
return false;
}
}, 0, OperationType.VERIFY);
}

Expand Down Expand Up @@ -759,6 +786,12 @@ public void execute(LifecycleTransaction txn)
task.setCompactionType(OperationType.UPGRADE_SSTABLES);
task.execute(active);
}

@Override
public boolean incompleteOperation()
{
return false;
}
}, jobs, OperationType.UPGRADE_SSTABLES);
}

Expand Down Expand Up @@ -795,22 +828,37 @@ public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore, int jo

return parallelAllSSTableOperation(cfStore, new OneSSTableOperation()
{
boolean incompleteOperation = false;

@Override
public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
{
List<SSTableReader> sortedSSTables = Lists.newArrayList(transaction.originals());
Iterator<SSTableReader> sstableIter = sortedSSTables.iterator();
int totalSSTables = 0;
int skippedSStables = 0;
int sstablesInUseByAccord = 0;
while (sstableIter.hasNext())
{
SSTableReader sstable = sstableIter.next();
boolean needsCleanupFull = needsCleanup(sstable, fullRanges);
boolean needsCleanupTransient = !transientRanges.isEmpty() && sstable.isRepaired() && needsCleanup(sstable, transientRanges);
totalSSTables++;

if (sstableContainsRangesNeededByAccord(cfStore.getTableId(), sstable))
{
logger.debug("Skipping {} ([{}, {}]) for cleanup; as Accord still needs the ranges.",
sstable,
sstable.getFirst().getToken(),
sstable.getLast().getToken());
sstableIter.remove();
transaction.cancel(sstable);
sstablesInUseByAccord++;
incompleteOperation = true;
}
//If there are no ranges for which the table needs cleanup either due to lack of intersection or lack
//of the table being repaired.
totalSSTables++;
if (!needsCleanupFull && !needsCleanupTransient)
else if (!needsCleanupFull && !needsCleanupTransient)
{
logger.debug("Skipping {} ([{}, {}]) for cleanup; all rows should be kept. Needs cleanup full ranges: {} Needs cleanup transient ranges: {} Repaired: {}",
sstable,
Expand All @@ -824,8 +872,13 @@ public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
skippedSStables++;
}
}
logger.info("Skipping cleanup for {}/{} sstables for {}.{} since they are fully contained in owned ranges (full ranges: {}, transient ranges: {})",
skippedSStables, totalSSTables, cfStore.getKeyspaceName(), cfStore.getTableName(), fullRanges, transientRanges);

logger.info("Skipping cleanup for {} sstables for {}.{} since they are fully contained in owned ranges (full ranges: {}, transient ranges: {})",
skippedSStables, cfStore.getKeyspaceName(), cfStore.getTableName(), fullRanges, transientRanges);
logger.info("Skipping cleanup for {} sstables for {}.{} since they are still being used by Accord",
sstablesInUseByAccord, cfStore.getKeyspaceName(), cfStore.getTableName());
logger.info("Cleaned up {} sstables for {}.{}", totalSSTables, cfStore.getKeyspaceName(), cfStore.getTableName());

sortedSSTables.sort(SSTableReader.sizeComparator);
return sortedSSTables;
}
Expand All @@ -836,6 +889,12 @@ public void execute(LifecycleTransaction txn) throws IOException
CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, allRanges, transientRanges, txn.onlyOne().isRepaired(), FBUtilities.nowInSeconds());
doCleanupOne(cfStore, txn, cleanupStrategy, allRanges, hasIndexes);
}

@Override
public boolean incompleteOperation()
{
return incompleteOperation;
}
}, jobs, OperationType.CLEANUP);
}

Expand Down Expand Up @@ -903,6 +962,12 @@ protected int getLevel()
task.setCompactionType(OperationType.GARBAGE_COLLECT);
task.execute(active);
}

@Override
public boolean incompleteOperation()
{
return false;
}
}, jobs, OperationType.GARBAGE_COLLECT);
}

Expand Down Expand Up @@ -971,6 +1036,12 @@ public void execute(LifecycleTransaction txn)
task.setCompactionType(OperationType.RELOCATE);
task.execute(active);
}

@Override
public boolean incompleteOperation()
{
return false;
}
}, jobs, OperationType.RELOCATE);
}

Expand Down Expand Up @@ -1599,6 +1670,31 @@ public static boolean needsCleanup(SSTableReader sstable, Collection<Range<Token
return false;
}

/**
 * Returns {@code true} if the given sstable covers any token range that Accord still
 * has in use for {@code tableId}, in which case cleanup must not drop its data yet.
 * <p>
 * Errs on the side of caution: if the in-use ranges cannot be fetched (failure or
 * interruption), the sstable is reported as still needed so it is retained.
 *
 * @param tableId the table the sstable belongs to
 * @param sstable the sstable whose token span is checked against Accord's in-use ranges
 */
public static boolean sstableContainsRangesNeededByAccord(TableId tableId, SSTableReader sstable)
{
    // If Accord was never initialised on this node, no ranges can be in use.
    if (!AccordService.isSetup())
        return false;

    Future<List<Ranges>> futureAccordOwnedRanges = AccordService.toFuture(AccordService.instance().node().commandStores().getInUseRanges());
    logger.info("Waiting for Accord to be ready");

    try
    {
        // Merge the per-command-store range sets into one normalised Ranges.
        Ranges accordOwnedRanges = futureAccordOwnedRanges.get().stream().reduce(Ranges.EMPTY, (acc, r) -> acc.union(AbstractRanges.UnionMode.MERGE_ADJACENT, r));

        // Single-key sstable: a point-key intersection test avoids building a range.
        if (sstable.getFirst().equals(sstable.getLast()))
            return accordOwnedRanges.intersects(new TokenKey(tableId, sstable.getFirst().getToken()));

        Ranges sstableRanges = Ranges.of(TokenRange.create(tableId, sstable.getFirst().getToken(), sstable.getLast().getToken()));
        return accordOwnedRanges.intersects(sstableRanges);
    }
    catch (InterruptedException e)
    {
        // Restore the interrupt status for callers, and conservatively keep the sstable.
        Thread.currentThread().interrupt();
        logger.error("Interrupted while waiting for Accord in use ranges", e);
        return true;
    }
    catch (Throwable e)
    {
        // Conservative fallback: if we cannot determine Accord's needs, keep the sstable.
        logger.error("Error while waiting for Accord in use ranges", e);
        return true;
    }
}

/**
* This function goes over a file and removes the keys that the node is not responsible for
* and only keeps keys that this node is responsible for.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.distributed.test.accord;

import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.service.StorageService;

import static com.google.common.collect.Iterables.getOnlyElement;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.Test;

public class AccordNodetoolCleanupTest extends TestBaseImpl
{
@Test
public void accordNodetoolCleanupTest() throws Throwable
{
String tableName = "tbl0";
String qualifiedTableName = KEYSPACE + '.' + tableName;
try (Cluster cluster = init(builder().withNodes(2).withoutVNodes().withConfig((config) ->
config
.set("accord.shard_durability_target_splits", "1")
.set("accord.shard_durability_cycle", "20s")
.with(Feature.NETWORK, Feature.GOSSIP)).start()))
{
cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}");
cluster.schemaChange("CREATE TABLE " + qualifiedTableName + " (k int PRIMARY KEY, v int) WITH transactional_mode='full'");

cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2);

SimpleQueryResult result = cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + qualifiedTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL);

cluster.get(1).flush(withKeyspace("%s"));

String originalToken = cluster.get(1).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens()));

long token = (Long) result.toObjectArrays()[0][0];

assertTrue(token < Long.parseLong(originalToken));

assertEquals(1, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size()));

cluster.get(1).runOnInstance(() -> {
StorageService.instance.move(Long.toString(token - 1000));
});

// Wait until Accord retires range
try
{
Thread.sleep(20000);
}
catch (InterruptedException e)
{
fail();
}

cluster.get(1).nodetool("cleanup", KEYSPACE, tableName);

assertEquals(0, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size()));
}
}

@Test
public void accordNodetoolCleanupRangeInUseTest() throws Throwable
{
String tableName = "tbl0";
String qualifiedTableName = KEYSPACE + '.' + tableName;
try (Cluster cluster = init(builder().withNodes(2).withoutVNodes().withConfig((config) ->
config
.set("accord.shard_durability_target_splits", "1")
.set("accord.shard_durability_cycle", "20s")
.with(Feature.NETWORK, Feature.GOSSIP)).start())) {

cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}");
cluster.schemaChange("CREATE TABLE " + qualifiedTableName + " (k int PRIMARY KEY, v int) WITH transactional_mode='full'");

cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2);

SimpleQueryResult result = cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + qualifiedTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL);

cluster.get(1).flush(withKeyspace("%s"));

String originalToken = cluster.get(1).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens()));

long token = (Long) result.toObjectArrays()[0][0];

assertTrue(token < Long.parseLong(originalToken));

assertEquals(1, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size()));

cluster.get(1).runOnInstance(() -> StorageService.instance.move(Long.toString(token - 1000)));

cluster.get(1).nodetoolResult("cleanup", KEYSPACE, tableName);

assertEquals(1, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size()));
}
}

@Test
public void nodetoolCleanupForNonAccordTableTest() throws Throwable
{
String tableName = "tbl0";
String qualifiedTableName = KEYSPACE + '.' + tableName;
try (Cluster cluster = init(builder().withNodes(2).withoutVNodes().withConfig((config) ->
config
.set("accord.shard_durability_target_splits", "1")
.set("accord.shard_durability_cycle", "20s")
.with(Feature.NETWORK, Feature.GOSSIP)).start()))
{

cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}");
cluster.schemaChange("CREATE TABLE " + qualifiedTableName + " (k int PRIMARY KEY, v int)");

cluster.coordinator(1).execute("INSERT INTO " + qualifiedTableName + " (k, v) VALUES (?, ?)", ConsistencyLevel.ALL, 1, 2);

SimpleQueryResult result = cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + qualifiedTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL);

cluster.get(1).flush(withKeyspace("%s"));

String originalToken = cluster.get(1).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens()));

long token = (Long) result.toObjectArrays()[0][0];

assertTrue(token < Long.parseLong(originalToken));

assertEquals(1, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size()));

cluster.get(1).runOnInstance(() -> {
StorageService.instance.move(Long.toString(token - 1000));
});

cluster.get(1).nodetool("cleanup", KEYSPACE, tableName);

assertEquals(0, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size()));
}
}
}