From 32bec7bc25978c2178ed67de9c6eb775d680eb25 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Tue, 24 Mar 2026 16:25:05 -0700 Subject: [PATCH 1/9] added check to make sure ranges that are still used by Accord are not deleted by nodetool cleanup --- .../db/compaction/CompactionManager.java | 29 +++++ .../test/accord/AccordCQLTestBase.java | 7 + .../accord/AccordNodetoolCleanupTest.java | 123 ++++++++++++++++++ 3 files changed, 159 insertions(+) create mode 100644 test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolCleanupTest.java diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index ea370b4601fb..6d67f1a98fad 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -65,6 +65,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.primitives.Ranges; + import org.apache.cassandra.cache.AutoSavingCache; import org.apache.cassandra.concurrent.ExecutorFactory; import org.apache.cassandra.concurrent.WrappedExecutorPlus; @@ -114,9 +116,12 @@ import org.apache.cassandra.repair.NoSuchRepairSessionException; import org.apache.cassandra.schema.CompactionParams.TombstoneOption; import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.accord.AccordService; +import org.apache.cassandra.service.accord.TokenRange; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ownership.DataPlacement; @@ -807,6 +812,21 @@ public Iterable filterSSTables(LifecycleTransaction transaction) SSTableReader sstable = sstableIter.next(); boolean needsCleanupFull = needsCleanup(sstable, fullRanges); boolean needsCleanupTransient = !transientRanges.isEmpty() && sstable.isRepaired() && needsCleanup(sstable, transientRanges); + boolean sstableRangesIntersectWithCommandStores = sstableContainsRangeNeededByAccord(cfStore.getTableId(), sstable); + + // If there still exists Command Stores that own this range, don't cleanup this specific range + // as Accord still needs it even though we no longer own the range + if (sstableRangesIntersectWithCommandStores) + { + logger.debug("Skipping {} ([{}, {}]) for cleanup; as Accord still needs the ranges.", + sstable, + sstable.getFirst().getToken(), + sstable.getLast().getToken()); + sstableIter.remove(); + transaction.cancel(sstable); + skippedSStables++; + } + //If there are no ranges for which the table needs cleanup either due to lack of intersection or lack //of the table being repaired. totalSSTables++; @@ -1599,6 +1619,15 @@ public static boolean needsCleanup(SSTableReader sstable, Collection builder + .withoutVNodes() + .withConfig(config -> + config + .set("accord.shard_durability_target_splits", "1") + .set("accord.shard_durability_cycle", "20s") + .with(Feature.NETWORK, Feature.GOSSIP)), 6); + } + + @Test + public void accordNodetoolCleanupTest() throws Throwable + { + List ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';', + "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}", + "CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH transactional_mode='full'"); + test(ddls, cluster -> { + cluster.coordinator(2).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2); + cluster.coordinator(2).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 2, 2); + + SimpleQueryResult result = cluster.coordinator(2).executeWithResult("SELECT token(k) FROM " + qualifiedAccordTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL); + + String keyspace = KEYSPACE; + String tableName = accordTableName; + cluster.get(2).flush(withKeyspace("%s")); + + assertEquals(1, (int) cluster.get(2).callOnInstance(() -> Keyspace.open(keyspace).getColumnFamilyStore(tableName).getLiveSSTables().size())); + + String originalToken = cluster.get(2).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens())); + + long token = (Long) result.toObjectArrays()[0][0]; + + assert(token < Long.parseLong(originalToken)); + + cluster.get(2).runOnInstance(() -> { + StorageService.instance.move(Long.toString(token - 10000)); + }); + + NodeToolResult r = cluster.get(2).nodetoolResult("cleanup", KEYSPACE, accordTableName); + + assertEquals(1, (int) cluster.get(2).callOnInstance(() -> Keyspace.open(keyspace).getColumnFamilyStore(tableName).getLiveSSTables().size())); + + // The invariant that we want to preserve is that no ranges that overlap with our command stores should be removed, in this case this so happens to be + cluster.get(2).runOnInstance(() -> { + Ranges commandStoreRanges = Ranges.EMPTY; + for (CommandStore commandStore : AccordService.instance().node().commandStores().all()) + { + Ranges commandStoreRange = getBlocking(commandStore.submit((PreLoadContext.Empty) () -> "Get ranges", safeCommandStore -> { + return safeCommandStore.ranges().all(); + })); + + commandStoreRanges = commandStoreRanges.union(AbstractRanges.UnionMode.MERGE_ADJACENT, commandStoreRange); + } + }); + }); + } +} From b072804482d2c7d02c009496add689a95ac90c70 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Tue, 24 Mar 2026 16:33:39 -0700 Subject: [PATCH 2/9] special case sstable with single key --- .../cassandra/db/compaction/CompactionManager.java | 13 ++++++++----- .../test/accord/AccordNodetoolCleanupTest.java | 8 -------- 2 files changed, 8 insertions(+), 13 deletions(-) diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 6d67f1a98fad..2f2e6dd3dde4 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -122,6 +122,7 @@ import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.accord.AccordService; import org.apache.cassandra.service.accord.TokenRange; +import org.apache.cassandra.service.accord.api.TokenKey; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ownership.DataPlacement; @@ -812,7 +813,7 @@ public Iterable filterSSTables(LifecycleTransaction transaction) SSTableReader sstable = sstableIter.next(); boolean needsCleanupFull = needsCleanup(sstable, fullRanges); boolean needsCleanupTransient = !transientRanges.isEmpty() && sstable.isRepaired() && needsCleanup(sstable, transientRanges); - boolean sstableRangesIntersectWithCommandStores = sstableContainsRangeNeededByAccord(cfStore.getTableId(), sstable); + boolean sstableRangesIntersectWithCommandStores = sstableContainsRangesNeededByAccord(cfStore.getTableId(), sstable); // If there still exists Command Stores that own this range, don't cleanup this specific range // as Accord still needs it even though we no longer own the range @@ -1619,13 +1620,15 @@ public static boolean needsCleanup(SSTableReader sstable, Collection { cluster.coordinator(2).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2); - cluster.coordinator(2).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 2, 2); SimpleQueryResult result = cluster.coordinator(2).executeWithResult("SELECT token(k) FROM " + qualifiedAccordTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL); From e3bd0f873ef55154ee55b337f785a6709ab9641f Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Tue, 24 Mar 2026 16:35:12 -0700 Subject: [PATCH 3/9] removed unused imports --- .../distributed/test/accord/AccordCQLTestBase.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java index 0ed8cb5facd6..dbf6d6553f64 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java @@ -39,7 +39,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import static com.google.common.collect.Iterables.getOnlyElement; import org.assertj.core.api.Assertions; import org.junit.BeforeClass; @@ -48,9 +47,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import accord.local.CommandStore; -import accord.local.PreLoadContext; -import accord.primitives.Ranges; import accord.primitives.Unseekables; import accord.topology.SelectShards; import accord.topology.Topologies; @@ -77,14 +73,12 @@ import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.ICoordinator; -import org.apache.cassandra.distributed.api.NodeToolResult; import org.apache.cassandra.distributed.api.QueryResults; import org.apache.cassandra.distributed.api.SimpleQueryResult; import org.apache.cassandra.distributed.shared.AssertUtils; import org.apache.cassandra.distributed.test.sai.SAIUtil; import org.apache.cassandra.distributed.util.QueryResultUtil; import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.accord.AccordService; import org.apache.cassandra.service.accord.AccordTestUtils; import org.apache.cassandra.service.consensus.TransactionalMode; @@ -105,7 +99,6 @@ import static org.apache.cassandra.distributed.api.Feature.NETWORK; import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; import static org.apache.cassandra.distributed.util.QueryResultUtil.assertThat; -import static org.apache.cassandra.service.accord.AccordService.getBlocking; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; From 3d752e3eca6ab7e4649a1c3d48e6eb08ac08f609 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Tue, 24 Mar 2026 16:42:18 -0700 Subject: [PATCH 4/9] fix up tests --- .../accord/AccordNodetoolCleanupTest.java | 38 ++++++++++++------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolCleanupTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolCleanupTest.java index cd97d090fe54..251f083407ef 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolCleanupTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolCleanupTest.java @@ -22,18 +22,23 @@ import java.util.Arrays; import java.util.List; +import accord.api.RoutingKey; import accord.local.CommandStore; import accord.local.PreLoadContext; import accord.primitives.AbstractRanges; import accord.primitives.Ranges; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.NodeToolResult; import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.accord.AccordService; +import org.apache.cassandra.service.accord.api.TokenKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +46,7 @@ import static org.apache.cassandra.service.accord.AccordService.getBlocking; import static com.google.common.collect.Iterables.getOnlyElement; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import org.junit.BeforeClass; import org.junit.Test; @@ -74,15 +80,15 @@ public void accordNodetoolCleanupTest() throws Throwable "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}", "CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH transactional_mode='full'"); test(ddls, cluster -> { + String tableName = accordTableName; + cluster.coordinator(2).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2); SimpleQueryResult result = cluster.coordinator(2).executeWithResult("SELECT token(k) FROM " + qualifiedAccordTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL); - String keyspace = KEYSPACE; - String tableName = accordTableName; cluster.get(2).flush(withKeyspace("%s")); - assertEquals(1, (int) cluster.get(2).callOnInstance(() -> Keyspace.open(keyspace).getColumnFamilyStore(tableName).getLiveSSTables().size())); + assertEquals(1, (int) cluster.get(2).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); String originalToken = cluster.get(2).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens())); @@ -91,25 +97,31 @@ public void accordNodetoolCleanupTest() throws Throwable assert(token < Long.parseLong(originalToken)); cluster.get(2).runOnInstance(() -> { - StorageService.instance.move(Long.toString(token - 10000)); - }); + TableId tid = Schema.instance.getTableMetadata(KEYSPACE, tableName).id(); + RoutingKey key = TokenKey.parse(tid, String.valueOf(token), Murmur3Partitioner.instance); - NodeToolResult r = cluster.get(2).nodetoolResult("cleanup", KEYSPACE, accordTableName); - - assertEquals(1, (int) cluster.get(2).callOnInstance(() -> Keyspace.open(keyspace).getColumnFamilyStore(tableName).getLiveSSTables().size())); - - // The invariant that we want to preserve is that no ranges that overlap with our command stores should be removed, in this case this so happens to be - cluster.get(2).runOnInstance(() -> { - Ranges commandStoreRanges = Ranges.EMPTY; + boolean tokenInCommandStore = false; for (CommandStore commandStore : AccordService.instance().node().commandStores().all()) { Ranges commandStoreRange = getBlocking(commandStore.submit((PreLoadContext.Empty) () -> "Get ranges", safeCommandStore -> { return safeCommandStore.ranges().all(); })); - commandStoreRanges = commandStoreRanges.union(AbstractRanges.UnionMode.MERGE_ADJACENT, commandStoreRange); + if (commandStoreRange.intersects(key)) + tokenInCommandStore = true; } + + assertTrue(tokenInCommandStore); + }); + + cluster.get(2).runOnInstance(() -> { + StorageService.instance.move(Long.toString(token - 1000)); }); + + cluster.get(2).nodetool("cleanup", KEYSPACE, accordTableName); + + assertEquals(1, (int) cluster.get(2).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); + }); } } From b20db8fd819e25c437ee1ca99dfd14507472d271 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Tue, 24 Mar 2026 16:53:57 -0700 Subject: [PATCH 5/9] comments --- .../cassandra/db/compaction/CompactionManager.java | 2 +- .../test/accord/AccordNodetoolCleanupTest.java | 11 +++++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 2f2e6dd3dde4..18c28ad0490e 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -1622,7 +1622,7 @@ public static boolean needsCleanup(SSTableReader sstable, Collection Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); - String originalToken = cluster.get(2).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens())); long token = (Long) result.toObjectArrays()[0][0]; assert(token < Long.parseLong(originalToken)); + assertEquals(1, (int) cluster.get(2).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); + + // SSTable overlaps with Accord CommandStores ranges, so after cleanup we should still have that + // 1 SSTable, even though we no longer own the ranges for that SSTable cluster.get(2).runOnInstance(() -> { TableId tid = Schema.instance.getTableMetadata(KEYSPACE, tableName).id(); RoutingKey key = TokenKey.parse(tid, String.valueOf(token), Murmur3Partitioner.instance); @@ -103,7 +103,7 @@ public void accordNodetoolCleanupTest() throws Throwable boolean tokenInCommandStore = false; for (CommandStore commandStore : AccordService.instance().node().commandStores().all()) { - Ranges commandStoreRange = getBlocking(commandStore.submit((PreLoadContext.Empty) () -> "Get ranges", safeCommandStore -> { + Ranges commandStoreRange = getBlocking(commandStore.submit((PreLoadContext.Empty) () -> "Get CommandStore ranges", safeCommandStore -> { return safeCommandStore.ranges().all(); })); @@ -121,7 +121,6 @@ public void accordNodetoolCleanupTest() throws Throwable cluster.get(2).nodetool("cleanup", KEYSPACE, accordTableName); assertEquals(1, (int) cluster.get(2).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); - }); } } From 5a892a48a68dcdf856118ed0f157a5c6914e4957 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Wed, 25 Mar 2026 13:38:55 -0700 Subject: [PATCH 6/9] moved test to cql test base --- .../test/accord/AccordCQLTestBase.java | 64 +++++++++ .../accord/AccordNodetoolCleanupTest.java | 126 ------------------ 2 files changed, 64 insertions(+), 126 deletions(-) delete mode 100644 test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolCleanupTest.java diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java index dbf6d6553f64..fcaae0fc1f81 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java @@ -39,6 +39,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import static com.google.common.collect.Iterables.getOnlyElement; import org.assertj.core.api.Assertions; import org.junit.BeforeClass; @@ -47,6 +48,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.api.RoutingKey; +import accord.local.CommandStore; +import accord.local.PreLoadContext; +import accord.primitives.Ranges; import accord.primitives.Unseekables; import accord.topology.SelectShards; import accord.topology.Topologies; @@ -64,6 +69,7 @@ import org.apache.cassandra.cql3.ast.Txn; import org.apache.cassandra.cql3.functions.types.utils.Bytes; import org.apache.cassandra.cql3.statements.TransactionStatement; +import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.ListType; import org.apache.cassandra.db.marshal.MapType; @@ -79,8 +85,12 @@ import org.apache.cassandra.distributed.test.sai.SAIUtil; import org.apache.cassandra.distributed.util.QueryResultUtil; import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.accord.AccordService; import org.apache.cassandra.service.accord.AccordTestUtils; +import org.apache.cassandra.service.accord.api.TokenKey; import org.apache.cassandra.service.consensus.TransactionalMode; import org.apache.cassandra.service.consensus.migration.TransactionalMigrationFromMode; import org.apache.cassandra.utils.AssertionUtils; @@ -99,6 +109,7 @@ import static org.apache.cassandra.distributed.api.Feature.NETWORK; import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; import static org.apache.cassandra.distributed.util.QueryResultUtil.assertThat; +import static org.apache.cassandra.service.accord.AccordService.getBlocking; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -3433,4 +3444,57 @@ public void userSeesInvalidRejection() throws Exception .hasMessage("Attempted to set an element on a list which is null"); }); } + + @Test + public void accordNodetoolCleanupTest() throws Throwable + { + List ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';', + "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}", + "CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH " + transactionalMode.asCqlParam()); + test(ddls, cluster -> { + String tableName = accordTableName; + + cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2); + + SimpleQueryResult result = cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + qualifiedAccordTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL); + + cluster.get(1).flush(withKeyspace("%s")); + + String originalToken = cluster.get(1).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens())); + + long token = (Long) result.toObjectArrays()[0][0]; + + assert(token < Long.parseLong(originalToken)); + + assertEquals(1, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); + + // SSTable overlaps with Accord CommandStores ranges, so after cleanup we should still have that + // 1 SSTable, even though we no longer own the ranges for that SSTable + cluster.get(1).runOnInstance(() -> { + TableId tid = Schema.instance.getTableMetadata(KEYSPACE, tableName).id(); + RoutingKey key = TokenKey.parse(tid, String.valueOf(token), Murmur3Partitioner.instance); + + boolean tokenInCommandStore = false; + for (CommandStore commandStore : AccordService.instance().node().commandStores().all()) + { + Ranges commandStoreRange = getBlocking(commandStore.submit((PreLoadContext.Empty) () -> "Get CommandStore ranges", safeCommandStore -> { + return safeCommandStore.ranges().all(); + })); + + if (commandStoreRange.intersects(key)) + tokenInCommandStore = true; + } + + assertTrue(tokenInCommandStore); + }); + + cluster.get(1).runOnInstance(() -> { + StorageService.instance.move(Long.toString(token - 1000)); + }); + + cluster.get(1).nodetool("cleanup", KEYSPACE, accordTableName); + + assertEquals(1, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); + }); + } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolCleanupTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolCleanupTest.java deleted file mode 100644 index 9fa0f9573a0b..000000000000 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolCleanupTest.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.distributed.test.accord; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; - -import accord.api.RoutingKey; -import accord.local.CommandStore; -import accord.local.PreLoadContext; -import accord.primitives.Ranges; - -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.dht.Murmur3Partitioner; -import org.apache.cassandra.distributed.api.ConsistencyLevel; -import org.apache.cassandra.distributed.api.Feature; -import org.apache.cassandra.distributed.api.SimpleQueryResult; -import org.apache.cassandra.schema.Schema; -import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.service.accord.AccordService; -import org.apache.cassandra.service.accord.api.TokenKey; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.cassandra.service.accord.AccordService.getBlocking; -import static com.google.common.collect.Iterables.getOnlyElement; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import org.junit.BeforeClass; -import org.junit.Test; - -public class AccordNodetoolCleanupTest extends AccordTestBase -{ - private static final Logger logger = LoggerFactory.getLogger(AccordNodetoolCleanupTest.class); - - @Override - protected Logger logger() - { - return logger; - } - - @BeforeClass - public static void setupClass() throws IOException - { - AccordTestBase.setupCluster(builder -> builder - .withoutVNodes() - .withConfig(config -> - config - .set("accord.shard_durability_target_splits", "1") - .set("accord.shard_durability_cycle", "20s") - .with(Feature.NETWORK, Feature.GOSSIP)), 6); - } - - @Test - public void accordNodetoolCleanupTest() throws Throwable - { - List ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';', - "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}", - "CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH transactional_mode='full'"); - test(ddls, cluster -> { - String tableName = accordTableName; - - cluster.coordinator(2).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2); - - SimpleQueryResult result = cluster.coordinator(2).executeWithResult("SELECT token(k) FROM " + qualifiedAccordTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL); - - cluster.get(2).flush(withKeyspace("%s")); - - String originalToken = cluster.get(2).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens())); - - long token = (Long) result.toObjectArrays()[0][0]; - - assert(token < Long.parseLong(originalToken)); - - assertEquals(1, (int) cluster.get(2).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); - - // SSTable overlaps with Accord CommandStores ranges, so after cleanup we should still have that - // 1 SSTable, even though we no longer own the ranges for that SSTable - cluster.get(2).runOnInstance(() -> { - TableId tid = Schema.instance.getTableMetadata(KEYSPACE, tableName).id(); - RoutingKey key = TokenKey.parse(tid, String.valueOf(token), Murmur3Partitioner.instance); - - boolean tokenInCommandStore = false; - for (CommandStore commandStore : AccordService.instance().node().commandStores().all()) - { - Ranges commandStoreRange = getBlocking(commandStore.submit((PreLoadContext.Empty) () -> "Get CommandStore ranges", safeCommandStore -> { - return safeCommandStore.ranges().all(); - })); - - if (commandStoreRange.intersects(key)) - tokenInCommandStore = true; - } - - assertTrue(tokenInCommandStore); - }); - - cluster.get(2).runOnInstance(() -> { - StorageService.instance.move(Long.toString(token - 1000)); - }); - - cluster.get(2).nodetool("cleanup", KEYSPACE, accordTableName); - - assertEquals(1, (int) cluster.get(2).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); - }); - } -} From ecbf66f1257ecb874ea527822a30b02f6dce34e7 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Wed, 25 Mar 2026 13:55:05 -0700 Subject: [PATCH 7/9] add additional test and check if accord service is on --- .../db/compaction/CompactionManager.java | 3 ++ .../test/accord/AccordCQLTestBase.java | 36 ++++++++++++++++++- 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 18c28ad0490e..de46469a1d37 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -1622,6 +1622,9 @@ public static boolean needsCleanup(SSTableReader sstable, Collection Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); @@ -3497,4 +3497,38 @@ public void accordNodetoolCleanupTest() throws Throwable assertEquals(1, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); }); } + + @Test + public void nodetoolCleanupForNonAccordTableTest() throws Throwable + { + List ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';', + "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}", + "CREATE TABLE " + qualifiedRegularTableName + " (k int PRIMARY KEY, v int)"); + test(ddls, cluster -> { + String tableName = regularTableName; + + cluster.coordinator(1).execute("INSERT INTO " + qualifiedRegularTableName + " (k, v) VALUES (?, ?)", ConsistencyLevel.ALL, 1, 2); + + SimpleQueryResult result = cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + qualifiedRegularTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL); + + cluster.get(1).flush(withKeyspace("%s")); + + String originalToken = cluster.get(1).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens())); + + long token = (Long) result.toObjectArrays()[0][0]; + + assertTrue(token < Long.parseLong(originalToken)); + + assertEquals(1, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); + + cluster.get(1).runOnInstance(() -> { + StorageService.instance.move(Long.toString(token - 1000)); + }); + + cluster.get(1).nodetool("cleanup", KEYSPACE, regularTableName); + + // SSTable should be removed as we no longer own the range and no Accord CommandStore needs these ranges + assertEquals(0, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); + }); + } } From 33ecb950babe8e0d849f93ac1a982653f83f53e9 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Wed, 1 Apr 2026 01:00:06 -0700 Subject: [PATCH 8/9] cleanup on retirement rather than on command store deletion --- .../db/compaction/CompactionManager.java | 95 ++++++++-- .../test/accord/AccordCQLTestBase.java | 98 ---------- .../accord/AccordNodetoolCleanupTest.java | 175 ++++++++++++++++++ 3 files changed, 253 insertions(+), 115 deletions(-) create mode 100644 test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolCleanupTest.java diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index de46469a1d37..336f07533d49 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -65,6 +65,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.primitives.AbstractRanges; import accord.primitives.Ranges; import org.apache.cassandra.cache.AutoSavingCache; @@ -618,7 +619,12 @@ public Object call() throws Exception FBUtilities.waitOnFutures(futures); assert compacting.originals().isEmpty(); logger.info("Finished {} for {}.{} successfully", operationType, keyspace, table); - return AllSSTableOpStatus.SUCCESSFUL; + + // Some SSTables were not fully cleaned up because Accord is still using those ranges + if (operation.incompleteOperation()) + return AllSSTableOpStatus.INCOMPLETE; + else + return AllSSTableOpStatus.SUCCESSFUL; } finally { @@ -640,15 +646,18 @@ public Object call() throws Exception private static interface OneSSTableOperation { + Iterable filterSSTables(LifecycleTransaction transaction); void execute(LifecycleTransaction input) throws IOException; + boolean incompleteOperation(); } public enum AllSSTableOpStatus { SUCCESSFUL(0), ABORTED(1), - UNABLE_TO_CANCEL(2); + UNABLE_TO_CANCEL(2), + INCOMPLETE(3); public final int statusCode; @@ -673,6 +682,12 @@ public void execute(LifecycleTransaction input) { scrubOne(cfs, input, options, active); } + + @Override + public boolean incompleteOperation() + { + return false; + } }, jobs, OperationType.SCRUB); } @@ -701,6 +716,12 @@ public void execute(LifecycleTransaction input) { verifyOne(cfs, input.onlyOne(), options, active); } + + @Override + public boolean incompleteOperation() + { + return false; + } }, 0, OperationType.VERIFY); } @@ -765,6 +786,12 @@ public void execute(LifecycleTransaction txn) task.setCompactionType(OperationType.UPGRADE_SSTABLES); task.execute(active); } + + @Override + public boolean incompleteOperation() + { + return false; + } }, jobs, OperationType.UPGRADE_SSTABLES); } @@ -801,6 +828,8 @@ public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore, int jo return parallelAllSSTableOperation(cfStore, new OneSSTableOperation() { + boolean incompleteOperation = false; + @Override public Iterable filterSSTables(LifecycleTransaction transaction) { @@ -808,16 +837,15 @@ public Iterable filterSSTables(LifecycleTransaction transaction) Iterator sstableIter = sortedSSTables.iterator(); int totalSSTables = 0; int skippedSStables = 0; + int sstablesInUseByAccord = 0; while (sstableIter.hasNext()) { SSTableReader sstable = sstableIter.next(); boolean needsCleanupFull = needsCleanup(sstable, fullRanges); boolean needsCleanupTransient = !transientRanges.isEmpty() && sstable.isRepaired() && needsCleanup(sstable, transientRanges); - boolean sstableRangesIntersectWithCommandStores = sstableContainsRangesNeededByAccord(cfStore.getTableId(), sstable); + totalSSTables++; - // If there still exists Command Stores that own this range, don't cleanup this specific range - // as Accord still needs it even though we no longer own the range - if (sstableRangesIntersectWithCommandStores) + if (sstableContainsRangesNeededByAccord(cfStore.getTableId(), sstable)) { logger.debug("Skipping {} ([{}, {}]) for cleanup; as Accord still needs the ranges.", sstable, @@ -825,13 +853,12 @@ public Iterable filterSSTables(LifecycleTransaction transaction) sstable.getLast().getToken()); sstableIter.remove(); transaction.cancel(sstable); - skippedSStables++; + sstablesInUseByAccord++; + incompleteOperation = true; } - //If there are no ranges for which the table needs cleanup either due to lack of intersection or lack //of the table being repaired. - totalSSTables++; - if (!needsCleanupFull && !needsCleanupTransient) + else if (!needsCleanupFull && !needsCleanupTransient) { logger.debug("Skipping {} ([{}, {}]) for cleanup; all rows should be kept. Needs cleanup full ranges: {} Needs cleanup transient ranges: {} Repaired: {}", sstable, @@ -845,8 +872,13 @@ public Iterable filterSSTables(LifecycleTransaction transaction) skippedSStables++; } } - logger.info("Skipping cleanup for {}/{} sstables for {}.{} since they are fully contained in owned ranges (full ranges: {}, transient ranges: {})", - skippedSStables, totalSSTables, cfStore.getKeyspaceName(), cfStore.getTableName(), fullRanges, transientRanges); + + logger.info("Skipping cleanup for {} sstables for {}.{} since they are fully contained in owned ranges (full ranges: {}, transient ranges: {})", + skippedSStables, cfStore.getKeyspaceName(), cfStore.getTableName(), fullRanges, transientRanges); + logger.info("Skipping cleanup for {} sstables for {}.{} since they are still being used by Accord", + sstablesInUseByAccord, cfStore.getKeyspaceName(), cfStore.getTableName()); + logger.info("Cleaned up {} sstables for {}.{}", totalSSTables, cfStore.getKeyspaceName(), cfStore.getTableName()); + sortedSSTables.sort(SSTableReader.sizeComparator); return sortedSSTables; } @@ -857,6 +889,12 @@ public void execute(LifecycleTransaction txn) throws IOException CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, allRanges, transientRanges, txn.onlyOne().isRepaired(), FBUtilities.nowInSeconds()); doCleanupOne(cfStore, txn, cleanupStrategy, allRanges, hasIndexes); } + + @Override + public boolean incompleteOperation() + { + return incompleteOperation; + } }, jobs, OperationType.CLEANUP); } @@ -924,6 +962,12 @@ protected int getLevel() task.setCompactionType(OperationType.GARBAGE_COLLECT); task.execute(active); } + + @Override + public boolean incompleteOperation() + { + return false; + } }, jobs, OperationType.GARBAGE_COLLECT); } @@ -992,6 +1036,12 @@ public void execute(LifecycleTransaction txn) task.setCompactionType(OperationType.RELOCATE); task.execute(active); } + + @Override + public boolean incompleteOperation() + { + return false; + } }, jobs, OperationType.RELOCATE); } @@ -1625,13 +1675,24 @@ public static boolean sstableContainsRangesNeededByAccord(TableId tableId, SSTab if (!AccordService.isSetup()) return false; - Ranges accordOwnedRanges = AccordService.instance().node().commandStores().commandStoresOwnedRanges(); + Future> futureAccordOwnedRanges = AccordService.toFuture(AccordService.instance().node().commandStores().getInUseRanges()); + logger.info("Waiting for Accord to be ready"); + + try + { + Ranges accordOwnedRanges = futureAccordOwnedRanges.get().stream().reduce(Ranges.EMPTY, (acc, r) -> acc.union(AbstractRanges.UnionMode.MERGE_ADJACENT, r)); - if (sstable.getFirst().equals(sstable.getLast())) - return accordOwnedRanges.intersects(new TokenKey(tableId, sstable.getFirst().getToken())); + if (sstable.getFirst().equals(sstable.getLast())) + return accordOwnedRanges.intersects(new TokenKey(tableId, sstable.getFirst().getToken())); - Ranges sstableRanges = Ranges.of(TokenRange.create(tableId, sstable.getFirst().getToken(), sstable.getLast().getToken())); - return accordOwnedRanges.intersects(sstableRanges); + Ranges sstableRanges = Ranges.of(TokenRange.create(tableId, sstable.getFirst().getToken(), sstable.getLast().getToken())); + return accordOwnedRanges.intersects(sstableRanges); + } + catch (Throwable e) + { + logger.error("Error while waiting for Accord in use ranges", e); + return true; + } } /** diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java index 40be8699eb04..dbf6d6553f64 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java @@ -39,7 +39,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import static com.google.common.collect.Iterables.getOnlyElement; import org.assertj.core.api.Assertions; import org.junit.BeforeClass; @@ -48,10 +47,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import accord.api.RoutingKey; -import accord.local.CommandStore; -import accord.local.PreLoadContext; -import accord.primitives.Ranges; import accord.primitives.Unseekables; import accord.topology.SelectShards; import accord.topology.Topologies; @@ -69,7 +64,6 @@ import org.apache.cassandra.cql3.ast.Txn; import org.apache.cassandra.cql3.functions.types.utils.Bytes; import org.apache.cassandra.cql3.statements.TransactionStatement; -import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.ListType; import org.apache.cassandra.db.marshal.MapType; @@ -85,12 +79,8 @@ import org.apache.cassandra.distributed.test.sai.SAIUtil; import org.apache.cassandra.distributed.util.QueryResultUtil; import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.schema.Schema; -import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.accord.AccordService; import org.apache.cassandra.service.accord.AccordTestUtils; -import org.apache.cassandra.service.accord.api.TokenKey; import org.apache.cassandra.service.consensus.TransactionalMode; import org.apache.cassandra.service.consensus.migration.TransactionalMigrationFromMode; import org.apache.cassandra.utils.AssertionUtils; @@ -109,7 +99,6 @@ import static org.apache.cassandra.distributed.api.Feature.NETWORK; import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; import static org.apache.cassandra.distributed.util.QueryResultUtil.assertThat; -import static org.apache.cassandra.service.accord.AccordService.getBlocking; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -3444,91 +3433,4 @@ public void userSeesInvalidRejection() throws Exception .hasMessage("Attempted to set an element on a list which is null"); }); } - - @Test - public void accordNodetoolCleanupTest() throws Throwable - { - List ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';', - "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}", - "CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH " + transactionalMode.asCqlParam()); - test(ddls, cluster -> { - String tableName = accordTableName; - - cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2); - - SimpleQueryResult result = cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + qualifiedAccordTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL); - - cluster.get(1).flush(withKeyspace("%s")); - - String originalToken = cluster.get(1).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens())); - - long token = (Long) result.toObjectArrays()[0][0]; - - assertTrue(token < Long.parseLong(originalToken)); - - assertEquals(1, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); - - // SSTable overlaps with Accord CommandStores ranges, so after cleanup we should still have that - // 1 SSTable, even though we no longer own the ranges for that SSTable - cluster.get(1).runOnInstance(() -> { - TableId tid = Schema.instance.getTableMetadata(KEYSPACE, tableName).id(); - RoutingKey key = TokenKey.parse(tid, String.valueOf(token), Murmur3Partitioner.instance); - - boolean tokenInCommandStore = false; - for (CommandStore commandStore : AccordService.instance().node().commandStores().all()) - { - Ranges commandStoreRange = getBlocking(commandStore.submit((PreLoadContext.Empty) () -> "Get CommandStore ranges", safeCommandStore -> { - return safeCommandStore.ranges().all(); - })); - - if (commandStoreRange.intersects(key)) - tokenInCommandStore = true; - } - - assertTrue(tokenInCommandStore); - }); - - cluster.get(1).runOnInstance(() -> { - StorageService.instance.move(Long.toString(token - 1000)); - }); - - cluster.get(1).nodetool("cleanup", KEYSPACE, accordTableName); - - assertEquals(1, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); - }); - } - - @Test - public void nodetoolCleanupForNonAccordTableTest() throws Throwable - { - List ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';', - "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}", - "CREATE TABLE " + qualifiedRegularTableName + " (k int PRIMARY KEY, v int)"); - test(ddls, cluster -> { - String tableName = regularTableName; - - cluster.coordinator(1).execute("INSERT INTO " + qualifiedRegularTableName + " (k, v) VALUES (?, ?)", ConsistencyLevel.ALL, 1, 2); - - SimpleQueryResult result = cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + qualifiedRegularTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL); - - cluster.get(1).flush(withKeyspace("%s")); - - String originalToken = cluster.get(1).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens())); - - long token = (Long) result.toObjectArrays()[0][0]; - - assertTrue(token < Long.parseLong(originalToken)); - - assertEquals(1, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); - - cluster.get(1).runOnInstance(() -> { - StorageService.instance.move(Long.toString(token - 1000)); - }); - - cluster.get(1).nodetool("cleanup", KEYSPACE, regularTableName); - - // SSTable should be removed as we no longer own the range and no Accord CommandStore needs these ranges - assertEquals(0, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); - }); - } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolCleanupTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolCleanupTest.java new file mode 100644 index 000000000000..89b7effe4512 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolCleanupTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.accord; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.service.StorageService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.google.common.collect.Iterables.getOnlyElement; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.BeforeClass; +import org.junit.Test; + +public class AccordNodetoolCleanupTest extends AccordTestBase +{ + private static final Logger logger = LoggerFactory.getLogger(AccordNodetoolCleanupTest.class); + + @Override + protected Logger logger() + { + return logger; + } + + @BeforeClass + public static void setupClass() throws IOException + { + AccordTestBase.setupCluster(builder -> builder + .withoutVNodes() + .withConfig(config -> + config + .set("accord.shard_durability_target_splits", "1") + .set("accord.shard_durability_cycle", "20s") + .with(Feature.NETWORK, Feature.GOSSIP)), 2); + } + + @Test + public void accordNodetoolCleanupTest() throws Throwable + { + List ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';', + "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}", + "CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH transactional_mode='full'"); + test(ddls, cluster -> { + + String tableName = accordTableName; + + cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2); + + SimpleQueryResult result = cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + qualifiedAccordTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL); + + cluster.get(1).flush(withKeyspace("%s")); + + String originalToken = cluster.get(1).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens())); + + long token = (Long) result.toObjectArrays()[0][0]; + + assertTrue(token < Long.parseLong(originalToken)); + + assertEquals(1, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); + + cluster.get(1).runOnInstance(() -> { + StorageService.instance.move(Long.toString(token - 1000)); + }); + + // Wait until Accord retires range + try + { + Thread.sleep(20000); + } + catch (InterruptedException e) + { + fail(); + } + + cluster.get(1).nodetool("cleanup", KEYSPACE, accordTableName); + + assertEquals(0, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); + }); + } + + @Test + public void accordNodetoolCleanupRangeInUseTest() throws Throwable + { + List ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';', + "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}", + "CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH transactional_mode='full'"); + test(ddls, cluster -> { + + String tableName = accordTableName; + + cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2); + + SimpleQueryResult result = cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + qualifiedAccordTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL); + + cluster.get(1).flush(withKeyspace("%s")); + + String originalToken = cluster.get(1).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens())); + + long token = (Long) result.toObjectArrays()[0][0]; + + assertTrue(token < Long.parseLong(originalToken)); + + assertEquals(1, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); + + cluster.get(1).runOnInstance(() -> StorageService.instance.move(Long.toString(token - 1000))); + + String accordTableName = qualifiedAccordTableName; + + cluster.get(1).nodetool("cleanup", KEYSPACE, accordTableName); + + assertEquals(1, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); + }); + } + + @Test + public void nodetoolCleanupForNonAccordTableTest() throws Throwable + { + List ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';', + "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}", + "CREATE TABLE " + qualifiedRegularTableName + " (k int PRIMARY KEY, v int)"); + test(ddls, cluster -> { + String tableName = regularTableName; + + cluster.coordinator(1).execute("INSERT INTO " + qualifiedRegularTableName + " (k, v) VALUES (?, ?)", ConsistencyLevel.ALL, 1, 2); + + SimpleQueryResult result = cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + qualifiedRegularTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL); + + cluster.get(1).flush(withKeyspace("%s")); + + String originalToken = cluster.get(1).callOnInstance(() -> getOnlyElement(StorageService.instance.getTokens())); + + long token = (Long) result.toObjectArrays()[0][0]; + + assertTrue(token < Long.parseLong(originalToken)); + + assertEquals(1, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); + + cluster.get(1).runOnInstance(() -> { + StorageService.instance.move(Long.toString(token - 1000)); + }); + + cluster.get(1).nodetool("cleanup", KEYSPACE, regularTableName); + + assertEquals(0, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); + }); + } +} + From b5934f8e176198acd311d85c9c6e2e4d7aa06509 Mon Sep 17 00:00:00 2001 From: Alan Wang Date: Wed, 1 Apr 2026 10:23:58 -0700 Subject: [PATCH 9/9] refactor tests --- .../accord/AccordNodetoolCleanupTest.java | 111 ++++++++---------- 1 file changed, 50 insertions(+), 61 deletions(-) diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolCleanupTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolCleanupTest.java index 89b7effe4512..3e8e7e18448e 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolCleanupTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolCleanupTest.java @@ -18,62 +18,41 @@ package org.apache.cassandra.distributed.test.accord; -import java.io.IOException; -import java.util.Arrays; -import java.util.List; - import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.distributed.test.TestBaseImpl; import org.apache.cassandra.service.StorageService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import static com.google.common.collect.Iterables.getOnlyElement; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.junit.jupiter.api.Assertions.assertEquals; -import org.junit.BeforeClass; import org.junit.Test; -public class AccordNodetoolCleanupTest extends AccordTestBase +public class AccordNodetoolCleanupTest extends TestBaseImpl { - private static final Logger logger = LoggerFactory.getLogger(AccordNodetoolCleanupTest.class); - - @Override - protected Logger logger() - { - return logger; - } - - @BeforeClass - public static void setupClass() throws IOException - { - AccordTestBase.setupCluster(builder -> builder - .withoutVNodes() - .withConfig(config -> - config - .set("accord.shard_durability_target_splits", "1") - .set("accord.shard_durability_cycle", "20s") - .with(Feature.NETWORK, Feature.GOSSIP)), 2); - } - @Test public void accordNodetoolCleanupTest() throws Throwable { - List ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';', - "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}", - "CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH transactional_mode='full'"); - test(ddls, cluster -> { - - String tableName = accordTableName; - - cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2); - - SimpleQueryResult result = cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + qualifiedAccordTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL); + String tableName = "tbl0"; + String qualifiedTableName = KEYSPACE + '.' + tableName; + try (Cluster cluster = init(builder().withNodes(2).withoutVNodes().withConfig((config) -> + config + .set("accord.shard_durability_target_splits", "1") + .set("accord.shard_durability_cycle", "20s") + .with(Feature.NETWORK, Feature.GOSSIP)).start())) + { + cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE); + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}"); + cluster.schemaChange("CREATE TABLE " + qualifiedTableName + " (k int PRIMARY KEY, v int) WITH transactional_mode='full'"); + + cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2); + + SimpleQueryResult result = cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + qualifiedTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL); cluster.get(1).flush(withKeyspace("%s")); @@ -99,25 +78,30 @@ public void accordNodetoolCleanupTest() throws Throwable fail(); } - cluster.get(1).nodetool("cleanup", KEYSPACE, accordTableName); + cluster.get(1).nodetool("cleanup", KEYSPACE, tableName); assertEquals(0, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); - }); + } } @Test public void accordNodetoolCleanupRangeInUseTest() throws Throwable { - List ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';', - "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}", - "CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH transactional_mode='full'"); - test(ddls, cluster -> { + String tableName = "tbl0"; + String qualifiedTableName = KEYSPACE + '.' + tableName; + try (Cluster cluster = init(builder().withNodes(2).withoutVNodes().withConfig((config) -> + config + .set("accord.shard_durability_target_splits", "1") + .set("accord.shard_durability_cycle", "20s") + .with(Feature.NETWORK, Feature.GOSSIP)).start())) { - String tableName = accordTableName; + cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE); + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}"); + cluster.schemaChange("CREATE TABLE " + qualifiedTableName + " (k int PRIMARY KEY, v int) WITH transactional_mode='full'"); - cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2); + cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedTableName + " (k, v) VALUES (?, ?)"), ConsistencyLevel.SERIAL, 1, 2); - SimpleQueryResult result = cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + qualifiedAccordTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL); + SimpleQueryResult result = cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + qualifiedTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL); cluster.get(1).flush(withKeyspace("%s")); @@ -131,26 +115,31 @@ public void accordNodetoolCleanupRangeInUseTest() throws Throwable cluster.get(1).runOnInstance(() -> StorageService.instance.move(Long.toString(token - 1000))); - String accordTableName = qualifiedAccordTableName; - - cluster.get(1).nodetool("cleanup", KEYSPACE, accordTableName); + cluster.get(1).nodetoolResult("cleanup", KEYSPACE, tableName); assertEquals(1, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); - }); + } } @Test public void nodetoolCleanupForNonAccordTableTest() throws Throwable { - List ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';', - "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}", - "CREATE TABLE " + qualifiedRegularTableName + " (k int PRIMARY KEY, v int)"); - test(ddls, cluster -> { - String tableName = regularTableName; + String tableName = "tbl0"; + String qualifiedTableName = KEYSPACE + '.' + tableName; + try (Cluster cluster = init(builder().withNodes(2).withoutVNodes().withConfig((config) -> + config + .set("accord.shard_durability_target_splits", "1") + .set("accord.shard_durability_cycle", "20s") + .with(Feature.NETWORK, Feature.GOSSIP)).start())) + { + + cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE); + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}"); + cluster.schemaChange("CREATE TABLE " + qualifiedTableName + " (k int PRIMARY KEY, v int)"); - cluster.coordinator(1).execute("INSERT INTO " + qualifiedRegularTableName + " (k, v) VALUES (?, ?)", ConsistencyLevel.ALL, 1, 2); + cluster.coordinator(1).execute("INSERT INTO " + qualifiedTableName + " (k, v) VALUES (?, ?)", ConsistencyLevel.ALL, 1, 2); - SimpleQueryResult result = cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + qualifiedRegularTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL); + SimpleQueryResult result = cluster.coordinator(1).executeWithResult("SELECT token(k) FROM " + qualifiedTableName + " WHERE k = 1 LIMIT 1", ConsistencyLevel.SERIAL); cluster.get(1).flush(withKeyspace("%s")); @@ -166,10 +155,10 @@ public void nodetoolCleanupForNonAccordTableTest() throws Throwable StorageService.instance.move(Long.toString(token - 1000)); }); - cluster.get(1).nodetool("cleanup", KEYSPACE, regularTableName); + cluster.get(1).nodetool("cleanup", KEYSPACE, tableName); assertEquals(0, (int) cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore(tableName).getLiveSSTables().size())); - }); + } } }