From 4c33f884ed86d8017cb15906b2ca4458116df362 Mon Sep 17 00:00:00 2001 From: Matt Byrd Date: Thu, 10 Aug 2023 16:25:52 +0100 Subject: [PATCH 1/2] Uncompressed size is being used for compressed tables in maintenance operations use compression ratio to scale the amount of remaining disk usage CompactionInfo::estimatedRemainingWriteBytes is uncompressed, causing compaction to be rejected due to size when it has space to complete patch by Matt Byrd; reviewed by for CASSANDRA-21245 Co-authored-by: Jon Meredith --- .../cassandra/cache/AutoSavingCache.java | 1 + .../db/compaction/ActiveCompactions.java | 4 +- .../db/compaction/CompactionInfo.java | 32 +++++++----- .../db/compaction/CompactionIterator.java | 8 ++- .../db/compaction/CompactionTask.java | 2 +- .../cassandra/db/view/ViewBuilderTask.java | 4 +- .../internal/CollatedViewIndexBuilder.java | 4 +- .../sai/StorageAttachedIndexBuilder.java | 1 + .../index/sasi/SASIIndexBuilder.java | 1 + .../sstable/format/SortedTableScrubber.java | 1 + .../sstable/format/SortedTableVerifier.java | 1 + .../IndexSummaryRedistribution.java | 2 +- .../cassandra/streaming/StreamSession.java | 2 +- .../test/CompactionDiskSpaceTest.java | 4 +- .../test/SecondaryIndexCompactionTest.java | 4 +- .../test/StreamsDiskSpaceTest.java | 4 +- .../db/compaction/CompactionInfoTest.java | 4 +- .../db/compaction/CompactionsCQLTest.java | 49 ++++++++++++++++--- .../db/repair/PendingAntiCompactionTest.java | 6 +-- .../db/virtual/SSTableTasksTableTest.java | 3 +- .../indexsummary/IndexSummaryManagerTest.java | 2 +- .../tools/nodetool/CompactionStatsTest.java | 16 +++--- 22 files changed, 108 insertions(+), 47 deletions(-) diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java index 3c3d9a8cdd59..3f2cf718603c 100644 --- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java +++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java @@ -330,6 +330,7 @@ else if (cacheType == CacheService.CacheType.COUNTER_CACHE) type, 0, keysEstimate, + keysEstimate, Unit.KEYS, nextTimeUUID(), getCacheDataPath(CURRENT_VERSION).toPath().toString()); diff --git a/src/java/org/apache/cassandra/db/compaction/ActiveCompactions.java b/src/java/org/apache/cassandra/db/compaction/ActiveCompactions.java index 4e238ad95d46..dba1959285f3 100644 --- a/src/java/org/apache/cassandra/db/compaction/ActiveCompactions.java +++ b/src/java/org/apache/cassandra/db/compaction/ActiveCompactions.java @@ -55,7 +55,7 @@ public void finishCompaction(CompactionInfo.Holder ci) /** * Get the estimated number of bytes remaining to write per sstable directory */ - public Map estimatedRemainingWriteBytes() + public Map estimatedRemainingWriteToDiskBytes() { synchronized (compactions) { @@ -66,7 +66,7 @@ public Map estimatedRemainingWriteBytes() List directories = compactionInfo.getTargetDirectories(); if (directories == null || directories.isEmpty()) continue; - long remainingWriteBytesPerDataDir = compactionInfo.estimatedRemainingWriteBytes() / directories.size(); + long remainingWriteBytesPerDataDir = compactionInfo.estimatedRemainingWriteToDiskBytes() / directories.size(); for (File directory : directories) writeBytesPerSSTableDir.merge(directory, remainingWriteBytesPerDataDir, Long::sum); } diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java index 0bfc925a7d0d..e87c4e0e109b 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java @@ -52,16 +52,18 @@ public final class CompactionInfo private final OperationType tasktype; private final long completed; private final long total; + private final long totalCompressed; private final Unit unit; private final TimeUUID compactionId; private final ImmutableSet sstables; private final String targetDirectory; - public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, Unit unit, TimeUUID compactionId, Collection sstables, String targetDirectory) + public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, long totalCompressed, Unit unit, TimeUUID compactionId, Collection sstables, String targetDirectory) { this.tasktype = tasktype; this.completed = completed; this.total = total; + this.totalCompressed = totalCompressed; this.metadata = metadata; this.unit = unit; this.compactionId = compactionId; @@ -69,38 +71,38 @@ public CompactionInfo(TableMetadata metadata, OperationType tasktype, long compl this.targetDirectory = targetDirectory; } - public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, TimeUUID compactionId, Collection sstables, String targetDirectory) + public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, long totalCompressed, TimeUUID compactionId, Collection sstables, String targetDirectory) { - this(metadata, tasktype, completed, total, Unit.BYTES, compactionId, sstables, targetDirectory); + this(metadata, tasktype, completed, total, totalCompressed, Unit.BYTES, compactionId, sstables, targetDirectory); } - public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, TimeUUID compactionId, Collection sstables) + public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, long totalCompressed, TimeUUID compactionId, Collection sstables) { - this(metadata, tasktype, completed, total, Unit.BYTES, compactionId, sstables, null); + this(metadata, tasktype, completed, total, totalCompressed, Unit.BYTES, compactionId, sstables, null); } /** * Special compaction info where we always need to cancel the compaction - for example ViewBuilderTask where we don't know * the sstables at construction */ - public static CompactionInfo withoutSSTables(TableMetadata metadata, OperationType tasktype, long completed, long total, Unit unit, TimeUUID compactionId) + public static CompactionInfo withoutSSTables(TableMetadata metadata, OperationType tasktype, long completed, long total, long totalCompressed, Unit unit, TimeUUID compactionId) { - return withoutSSTables(metadata, tasktype, completed, total, unit, compactionId, null); + return withoutSSTables(metadata, tasktype, completed, total, totalCompressed, unit, compactionId, null); } /** * Special compaction info where we always need to cancel the compaction - for example AutoSavingCache where we don't know * the sstables at construction */ - public static CompactionInfo withoutSSTables(TableMetadata metadata, OperationType tasktype, long completed, long total, Unit unit, TimeUUID compactionId, String targetDirectory) + public static CompactionInfo withoutSSTables(TableMetadata metadata, OperationType tasktype, long completed, long total, long totalCompressed, Unit unit, TimeUUID compactionId, String targetDirectory) { - return new CompactionInfo(metadata, tasktype, completed, total, unit, compactionId, ImmutableSet.of(), targetDirectory); + return new CompactionInfo(metadata, tasktype, completed, total, totalCompressed, unit, compactionId, ImmutableSet.of(), targetDirectory); } /** @return A copy of this CompactionInfo with updated progress. */ public CompactionInfo forProgress(long complete, long total) { - return new CompactionInfo(metadata, tasktype, complete, total, unit, compactionId, sstables, targetDirectory); + return new CompactionInfo(metadata, tasktype, complete, total, totalCompressed, unit, compactionId, sstables, targetDirectory); } public Optional getKeyspace() @@ -183,12 +185,16 @@ public String targetDirectory() /** * Note that this estimate is based on the amount of data we have left to read - it assumes input * size == output size for a compaction, which is not really true, but should most often provide a worst case - * remaining write size. + * remaining write size. We also scale by the effective compression ratio since total/completed are for the uncompressed size. */ - public long estimatedRemainingWriteBytes() + public long estimatedRemainingWriteToDiskBytes() { if (unit == Unit.BYTES && tasktype.writesData) - return getTotal() - getCompleted(); + { + final long total = getTotal(); + double compressionRatio = total == 0 ? 1 : ((double) totalCompressed / (double)total); + return (long)(compressionRatio * (total - getCompleted())); + } return 0; } diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java index 7f97dc0663cc..0ea91185ad26 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@ -154,6 +154,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte private final long nowInSec; private final TimeUUID compactionId; private final long totalBytes; + private final long totalCompressedBytes; private long bytesRead; private long totalSourceCQLRows; @@ -227,9 +228,13 @@ public CompactionIterator(OperationType type, this.bytesRead = 0; long bytes = 0; - for (ISSTableScanner scanner : scanners) + long compressedBytes = 0; + for (ISSTableScanner scanner : scanners) { bytes += scanner.getLengthInBytes(); + compressedBytes += scanner.getCompressedLengthInBytes(); + } this.totalBytes = bytes; + this.totalCompressedBytes = compressedBytes; this.mergeCounters = new long[scanners.size()]; // note that we leak `this` from the constructor when calling beginCompaction below, this means we have to get the sstables before // calling that to avoid a NPE. @@ -277,6 +282,7 @@ public CompactionInfo getCompactionInfo() type, bytesRead, totalBytes, + totalCompressedBytes, compactionId, sstables, targetDirectory); diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 7336c4543a8a..bf62db23f593 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -489,7 +489,7 @@ protected boolean buildCompactionCandidatesForAvailableDiskSpace(final Set expectedWriteSize = CompactionManager.instance.active.estimatedRemainingWriteBytes(); + Map expectedWriteSize = CompactionManager.instance.active.estimatedRemainingWriteToDiskBytes(); // todo: abort streams if they block compactions if (cfs.getDirectories().hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSize, expectedWriteSize)) diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java b/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java index daf08794d539..548751667f76 100644 --- a/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java +++ b/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java @@ -205,13 +205,13 @@ public CompactionInfo getCompactionInfo() if (range.left.getPartitioner().splitter().isPresent()) { long progress = prevToken == null ? 0 : Math.round(prevToken.getPartitioner().splitter().get().positionInRange(prevToken, range) * 1000); - return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, progress, 1000, Unit.RANGES, compactionId); + return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, progress, 1000, 1000, Unit.RANGES, compactionId); } // When there is no splitter, estimate based on number of total keys but // take the max with keysBuilt + 1 to avoid having more completed than total long keysTotal = Math.max(keysBuilt + 1, baseCfs.estimatedKeysForRange(range)); - return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, keysBuilt, keysTotal, Unit.KEYS, compactionId); + return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, keysBuilt, keysTotal, keysTotal, Unit.KEYS, compactionId); } @Override diff --git a/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java b/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java index 07bdc420ca07..92c48cc7ca40 100644 --- a/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java @@ -61,8 +61,10 @@ public CompactionInfo getCompactionInfo() OperationType.INDEX_BUILD, iter.getBytesRead(), iter.getTotalBytes(), + iter.getTotalBytes(), compactionId, - sstables); + sstables + ); } public void build() diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java index a2022f350143..4abd4e21bf15 100644 --- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java @@ -250,6 +250,7 @@ public CompactionInfo getCompactionInfo() OperationType.INDEX_BUILD, bytesProcessed, totalSizeInBytes, + totalSizeInBytes, compactionId, sstables.keySet()); } diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java index 555bce1b9add..4e71538e41f7 100644 --- a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java @@ -134,6 +134,7 @@ public CompactionInfo getCompactionInfo() OperationType.INDEX_BUILD, bytesProcessed, totalBytesToProcess, + totalBytesToProcess, compactionId, sstables.keySet(), targetDirectory); diff --git a/src/java/org/apache/cassandra/io/sstable/format/SortedTableScrubber.java b/src/java/org/apache/cassandra/io/sstable/format/SortedTableScrubber.java index 07fc965673da..66effe8e455f 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SortedTableScrubber.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SortedTableScrubber.java @@ -384,6 +384,7 @@ public CompactionInfo getCompactionInfo() OperationType.SCRUB, dataFile.getFilePointer(), dataFile.length(), + sstable.onDiskLength(), scrubCompactionId, ImmutableSet.of(sstable), File.getPath(sstable.getFilename()).getParent().toString()); diff --git a/src/java/org/apache/cassandra/io/sstable/format/SortedTableVerifier.java b/src/java/org/apache/cassandra/io/sstable/format/SortedTableVerifier.java index 305c337a475a..b586c1b9f6ab 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SortedTableVerifier.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SortedTableVerifier.java @@ -499,6 +499,7 @@ public CompactionInfo getCompactionInfo() OperationType.VERIFY, dataFile.getFilePointer(), dataFile.length(), + sstable.onDiskLength(), verificationCompactionId, ImmutableSet.of(sstable)); } diff --git a/src/java/org/apache/cassandra/io/sstable/indexsummary/IndexSummaryRedistribution.java b/src/java/org/apache/cassandra/io/sstable/indexsummary/IndexSummaryRedistribution.java index 335a659354ec..e37014c5507a 100644 --- a/src/java/org/apache/cassandra/io/sstable/indexsummary/IndexSummaryRedistribution.java +++ b/src/java/org/apache/cassandra/io/sstable/indexsummary/IndexSummaryRedistribution.java @@ -363,7 +363,7 @@ static > Pair, List perTableIdIncomingBytes, for (FileStore fs : allWriteableFileStores) newStreamBytesToWritePerFileStore.merge(fs, totalBytesInPerFileStore, Long::sum); } - Map totalCompactionWriteRemaining = Directories.perFileStore(CompactionManager.instance.active.estimatedRemainingWriteBytes(), + Map totalCompactionWriteRemaining = Directories.perFileStore(CompactionManager.instance.active.estimatedRemainingWriteToDiskBytes(), fileStoreMapper); long totalStreamRemaining = StreamManager.instance.getTotalRemainingOngoingBytes(); long totalBytesStreamRemainingPerFileStore = totalStreamRemaining / Math.max(1, allFileStores.size()); diff --git a/test/distributed/org/apache/cassandra/distributed/test/CompactionDiskSpaceTest.java b/test/distributed/org/apache/cassandra/distributed/test/CompactionDiskSpaceTest.java index 36c5163e534a..7ee7c9cc4a5c 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/CompactionDiskSpaceTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/CompactionDiskSpaceTest.java @@ -115,7 +115,7 @@ public static class BB public static void install(ClassLoader cl, Integer node) { new ByteBuddy().rebase(ActiveCompactions.class) - .method(named("estimatedRemainingWriteBytes")) + .method(named("estimatedRemainingWriteToDiskBytes")) .intercept(MethodDelegation.to(BB.class)) .make() .load(cl, ClassLoadingStrategy.Default.INJECTION); @@ -127,7 +127,7 @@ public static void install(ClassLoader cl, Integer node) .load(cl, ClassLoadingStrategy.Default.INJECTION); } - public static Map estimatedRemainingWriteBytes() + public static Map estimatedRemainingWriteToDiskBytes() { if (sstableDir != null) return ImmutableMap.of(sstableDir, estimatedRemaining.get()); diff --git a/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexCompactionTest.java b/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexCompactionTest.java index 9d168145c55b..eea28b01b1d1 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexCompactionTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexCompactionTest.java @@ -60,7 +60,7 @@ public void test2iCompaction() throws IOException // emulate ongoing index compaction: CompactionInfo.Holder h = new MockHolder(i.getIndexCfs().metadata(), idxSSTables); CompactionManager.instance.active.beginCompaction(h); - CompactionManager.instance.active.estimatedRemainingWriteBytes(); + CompactionManager.instance.active.estimatedRemainingWriteToDiskBytes(); CompactionManager.instance.active.finishCompaction(h); }); } @@ -79,7 +79,7 @@ public MockHolder(TableMetadata metadata, Set sstables) @Override public CompactionInfo getCompactionInfo() { - return new CompactionInfo(metadata, OperationType.COMPACTION, 0, 1000, nextTimeUUID(), sstables); + return new CompactionInfo(metadata, OperationType.COMPACTION, 0, 1000, 0, nextTimeUUID(), sstables); } @Override diff --git a/test/distributed/org/apache/cassandra/distributed/test/StreamsDiskSpaceTest.java b/test/distributed/org/apache/cassandra/distributed/test/StreamsDiskSpaceTest.java index f2a20fdf8e58..9f7336444fa7 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/StreamsDiskSpaceTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/StreamsDiskSpaceTest.java @@ -73,7 +73,7 @@ public void testAbortStreamsWhenOngoingCompactionsLeaveInsufficientSpace() throw .withConfig(config -> config.set("hinted_handoff_enabled", false) .with(GOSSIP) .with(NETWORK)) - .withInstanceInitializer((cl, id) -> BB.doInstall(cl, id, ActiveCompactions.class, "estimatedRemainingWriteBytes")) + .withInstanceInitializer((cl, id) -> BB.doInstall(cl, id, ActiveCompactions.class, "estimatedRemainingWriteToDiskBytes")) .start())) { cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int) with compaction={'class': 'SizeTieredCompactionStrategy'}"); @@ -139,7 +139,7 @@ public static long getTotalRemainingOngoingBytes() return ongoing.get(); } - public static Map estimatedRemainingWriteBytes() + public static Map estimatedRemainingWriteToDiskBytes() { Map ret = new HashMap<>(); if (datadir != null) diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionInfoTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionInfoTest.java index c4ae274809d8..2fd20d25aa3a 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionInfoTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionInfoTest.java @@ -39,7 +39,7 @@ public void testCompactionInfoToStringContainsTaskId() { ColumnFamilyStore cfs = MockSchema.newCFS(); TimeUUID expectedTaskId = nextTimeUUID(); - CompactionInfo compactionInfo = new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, 0, 1000, expectedTaskId, new ArrayList<>()); + CompactionInfo compactionInfo = new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, 0, 1000, 1000, expectedTaskId, new ArrayList<>()); Assertions.assertThat(compactionInfo.toString()) .contains(expectedTaskId.toString()); } @@ -50,7 +50,7 @@ public void testCompactionInfoToStringFormat() UUID tableId = UUID.randomUUID(); TimeUUID taskId = nextTimeUUID(); ColumnFamilyStore cfs = MockSchema.newCFS(builder -> builder.id(TableId.fromUUID(tableId))); - CompactionInfo compactionInfo = new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, 0, 1000, taskId, new ArrayList<>()); + CompactionInfo compactionInfo = new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, 0, 1000, 300, taskId, new ArrayList<>()); Assertions.assertThat(compactionInfo.toString()) .isEqualTo("Compaction(%s, 0 / 1000 bytes)@%s(mockks, mockcf1)", taskId, tableId); } diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java index 57113b4feb84..006b8146e5a5 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java @@ -58,6 +58,7 @@ import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.LegacySSTableTest; +import org.apache.cassandra.io.sstable.format.ForwardingSSTableReader; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.util.File; @@ -85,7 +86,7 @@ public class CompactionsCQLTest extends CQLTester public void before() throws IOException { strategy = DatabaseDescriptor.getCorruptedTombstoneStrategy(); - + CommitLog.instance.resetUnsafe(true); } @@ -877,7 +878,9 @@ public void testNoDiskspace() throws Throwable execute("insert into %s (id, i) values (?,?)", i, i); getCurrentColumnFamilyStore().forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS); } - CompactionInfo.Holder holder = holder(OperationType.COMPACTION); + // When we have an existing an compaction with sstables of total size more than double the available space, + // we should not be able to then run a major compaction + CompactionInfo.Holder holder = holder(OperationType.COMPACTION, 2); CompactionManager.instance.active.beginCompaction(holder); try { @@ -893,7 +896,7 @@ public void testNoDiskspace() throws Throwable CompactionManager.instance.active.finishCompaction(holder); } // don't block compactions if there is a huge validation - holder = holder(OperationType.VALIDATION); + holder = holder(OperationType.VALIDATION, 2); CompactionManager.instance.active.beginCompaction(holder); try { @@ -903,9 +906,36 @@ public void testNoDiskspace() throws Throwable { CompactionManager.instance.active.finishCompaction(holder); } + + // Should be able to run when the sstables in question are 90% of the total available space + holder = holder(OperationType.COMPACTION, 0.9); + CompactionManager.instance.active.beginCompaction(holder); + try + { + getCurrentColumnFamilyStore().forceMajorCompaction(); + } + finally + { + CompactionManager.instance.active.finishCompaction(holder); + } + } + + private static final class OnDiskLengthOverrideSSTableReader extends ForwardingSSTableReader + { + private final long onDiskLengthOverride; + public OnDiskLengthOverrideSSTableReader(SSTableReader delegate, long onDiskLengthOverride) { + super(delegate); + this.onDiskLengthOverride = onDiskLengthOverride; + } + + @Override + public long onDiskLength() + { + return onDiskLengthOverride; + } } - private CompactionInfo.Holder holder(OperationType opType) + private CompactionInfo.Holder holder(OperationType opType, double availableSpaceMultiplier) { CompactionInfo.Holder holder = new CompactionInfo.Holder() { @@ -915,12 +945,19 @@ public CompactionInfo getCompactionInfo() for (File f : getCurrentColumnFamilyStore().getDirectories().getCFDirectories()) availableSpace += PathUtils.tryGetSpace(f.toPath(), FileStore::getUsableSpace); + Set liveSSTables = getCurrentColumnFamilyStore().getLiveSSTables(); + long totalDiskUsage = (long)(availableSpace * availableSpaceMultiplier); + long sstableSize = totalDiskUsage / liveSSTables.size(); + final Set overridenSStables = liveSSTables.stream().map(s -> new OnDiskLengthOverrideSSTableReader(s, sstableSize)).collect(Collectors.toSet()); + // Arbitrary compression ratio of 3.4 + long totalUncompressedSize = (long) ((double) totalDiskUsage * 3.4); return new CompactionInfo(getCurrentColumnFamilyStore().metadata(), opType, +0, - +availableSpace * 2, + totalUncompressedSize, + totalDiskUsage, nextTimeUUID(), - getCurrentColumnFamilyStore().getLiveSSTables()); + overridenSStables); } public boolean isGlobal() diff --git a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java index f39eeaf1bec9..387cd4f1b42f 100644 --- a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java @@ -609,7 +609,7 @@ private void tryPredicate(ColumnFamilyStore cfs, List compacting, { public CompactionInfo getCompactionInfo() { - return new CompactionInfo(cfs.metadata(), OperationType.ANTICOMPACTION, 0, 1000, nextTimeUUID(), compacting); + return new CompactionInfo(cfs.metadata(), OperationType.ANTICOMPACTION, 0, 1000, 1000, nextTimeUUID(), compacting); } public boolean isGlobal() @@ -650,7 +650,7 @@ public void testRetries() throws InterruptedException, ExecutionException { public CompactionInfo getCompactionInfo() { - return new CompactionInfo(cfs.metadata(), OperationType.ANTICOMPACTION, 0, 0, nextTimeUUID(), cfs.getLiveSSTables()); + return new CompactionInfo(cfs.metadata(), OperationType.ANTICOMPACTION, 0, 0, 0, nextTimeUUID(), cfs.getLiveSSTables()); } public boolean isGlobal() @@ -703,7 +703,7 @@ public void testRetriesTimeout() throws InterruptedException, ExecutionException { public CompactionInfo getCompactionInfo() { - return new CompactionInfo(cfs.metadata(), OperationType.ANTICOMPACTION, 0, 0, nextTimeUUID(), cfs.getLiveSSTables()); + return new CompactionInfo(cfs.metadata(), OperationType.ANTICOMPACTION, 0, 0, 0, nextTimeUUID(), cfs.getLiveSSTables()); } public boolean isGlobal() diff --git a/test/unit/org/apache/cassandra/db/virtual/SSTableTasksTableTest.java b/test/unit/org/apache/cassandra/db/virtual/SSTableTasksTableTest.java index 3a7ec8389483..5934810318b6 100644 --- a/test/unit/org/apache/cassandra/db/virtual/SSTableTasksTableTest.java +++ b/test/unit/org/apache/cassandra/db/virtual/SSTableTasksTableTest.java @@ -70,6 +70,7 @@ public void testSelectAll() throws Throwable long bytesCompacted = 123; long bytesTotal = 123456; + long totalCompressedBytes = 112233; TimeUUID compactionId = nextTimeUUID(); List sstables = IntStream.range(0, 10) .mapToObj(i -> MockSchema.sstable(i, i * 10L, i * 10L + 9, cfs)) @@ -81,7 +82,7 @@ public void testSelectAll() throws Throwable { public CompactionInfo getCompactionInfo() { - return new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, bytesCompacted, bytesTotal, compactionId, sstables, directory); + return new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, bytesCompacted, bytesTotal, totalCompressedBytes, compactionId, sstables, directory); } public boolean isGlobal() diff --git a/test/unit/org/apache/cassandra/io/sstable/indexsummary/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/indexsummary/IndexSummaryManagerTest.java index b7596ae07b63..366f4d20ad12 100644 --- a/test/unit/org/apache/cassandra/io/sstable/indexsummary/IndexSummaryManagerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/indexsummary/IndexSummaryManagerTest.java @@ -655,7 +655,7 @@ public void testCancelIndexHelper(Consumer cancelFunction) th { public CompactionInfo getCompactionInfo() { - return new CompactionInfo(cfs.metadata(), OperationType.UNKNOWN, 0, 0, nextTimeUUID(), compacting); + return new CompactionInfo(cfs.metadata(), OperationType.UNKNOWN, 0, 0, 0, nextTimeUUID(), compacting); } public boolean isGlobal() diff --git a/test/unit/org/apache/cassandra/tools/nodetool/CompactionStatsTest.java b/test/unit/org/apache/cassandra/tools/nodetool/CompactionStatsTest.java index 64ab8fe34bb3..b0bf059e036d 100644 --- a/test/unit/org/apache/cassandra/tools/nodetool/CompactionStatsTest.java +++ b/test/unit/org/apache/cassandra/tools/nodetool/CompactionStatsTest.java @@ -57,6 +57,7 @@ public void testCompactionStats() long bytesCompacted = 123; long bytesTotal = 123456; + long totalCompressedBytes = 112233; TimeUUID compactionId = nextTimeUUID(); List sstables = IntStream.range(0, 10) .mapToObj(i -> MockSchema.sstable(i, i * 10L, i * 10L + 9, cfs)) @@ -65,7 +66,7 @@ public void testCompactionStats() { public CompactionInfo getCompactionInfo() { - return new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, bytesCompacted, bytesTotal, compactionId, sstables); + return new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, bytesCompacted, bytesTotal, totalCompressedBytes, compactionId, sstables); } public boolean isGlobal() @@ -109,6 +110,7 @@ public void testCompactionStatsVtable() long bytesCompacted = 123; long bytesTotal = 123456; + long totalCompressedBytes = 112233; TimeUUID compactionId = nextTimeUUID(); List sstables = IntStream.range(0, 10) .mapToObj(i -> MockSchema.sstable(i, i * 10L, i * 10L + 9, cfs)) @@ -118,7 +120,7 @@ public void testCompactionStatsVtable() { public CompactionInfo getCompactionInfo() { - return new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, bytesCompacted, bytesTotal, compactionId, sstables, targetDirectory); + return new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, bytesCompacted, bytesTotal, totalCompressedBytes, compactionId, sstables, targetDirectory); } public boolean isGlobal() @@ -131,7 +133,7 @@ public boolean isGlobal() { public CompactionInfo getCompactionInfo() { - return new CompactionInfo(cfs.metadata(), OperationType.CLEANUP, bytesCompacted, bytesTotal, compactionId, sstables); + return new CompactionInfo(cfs.metadata(), OperationType.CLEANUP, bytesCompacted, bytesTotal, totalCompressedBytes, compactionId, sstables); } public boolean isGlobal() @@ -168,6 +170,7 @@ public void testCompactionStatsHumanReadable() long bytesCompacted = 123; long bytesTotal = 123456; + long totalCompressedBytes = 112233; TimeUUID compactionId = nextTimeUUID(); List sstables = IntStream.range(0, 10) .mapToObj(i -> MockSchema.sstable(i, i * 10L, i * 10L + 9, cfs)) @@ -176,7 +179,7 @@ public void testCompactionStatsHumanReadable() { public CompactionInfo getCompactionInfo() { - return new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, bytesCompacted, bytesTotal, compactionId, sstables); + return new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, bytesCompacted, bytesTotal, totalCompressedBytes, compactionId, sstables); } public boolean isGlobal() @@ -205,6 +208,7 @@ public void testCompactionStatsVtableHumanReadable() long bytesCompacted = 123; long bytesTotal = 123456; + long totalCompressedBytes = 112233; TimeUUID compactionId = nextTimeUUID(); List sstables = IntStream.range(0, 10) .mapToObj(i -> MockSchema.sstable(i, i * 10L, i * 10L + 9, cfs)) @@ -214,7 +218,7 @@ public void testCompactionStatsVtableHumanReadable() { public CompactionInfo getCompactionInfo() { - return new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, bytesCompacted, bytesTotal, compactionId, sstables, targetDirectory); + return new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, bytesCompacted, bytesTotal, totalCompressedBytes, compactionId, sstables, targetDirectory); } public boolean isGlobal() @@ -227,7 +231,7 @@ public boolean isGlobal() { public CompactionInfo getCompactionInfo() { - return new CompactionInfo(cfs.metadata(), OperationType.CLEANUP, bytesCompacted, bytesTotal, compactionId, sstables); + return new CompactionInfo(cfs.metadata(), OperationType.CLEANUP, bytesCompacted, bytesTotal, totalCompressedBytes, compactionId, sstables); } public boolean isGlobal() From 95283d2a87ebd1ab8ce3e75633e9d02fd17969f3 Mon Sep 17 00:00:00 2001 From: Matt Byrd Date: Sat, 4 Apr 2026 00:24:10 +0100 Subject: [PATCH 2/2] fix compilation issues from cherry-picking from older version, fix typo and add CHANGES.txt entry --- CHANGES.txt | 1 + .../cassandra/db/compaction/CursorCompactor.java | 9 ++++++++- .../index/accord/RouteSecondaryIndexBuilder.java | 12 +++++++++++- .../cassandra/db/compaction/CompactionsCQLTest.java | 3 ++- 4 files changed, 22 insertions(+), 3 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 85d2a3e330d7..7b7f6c6d78ea 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 6.0-alpha2 + * Uncompressed size is being used for compressed tables in maintenance operations (CASSANDRA-21245) * Fix a removed TTLed row re-appearance in a materialized view after a cursor compaction (CASSANDRA-21152) * Rework ZSTD dictionary compression logic to create a trainer per training (CASSANDRA-21209) Merged from 5.0: diff --git a/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java b/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java index 7d528b5e2d9d..83b50d3d79a6 100644 --- a/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java +++ b/src/java/org/apache/cassandra/db/compaction/CursorCompactor.java @@ -207,6 +207,7 @@ private static void logDebugReason(TableMetadata metadata, String reason) private final long nowInSec; private final TimeUUID compactionId; private final long totalInputBytes; + private final long totalCompressedInputBytes; private final StatefulCursor[] sstableCursors; private final boolean[] sstableCursorsEqualsNext; private final boolean hasStaticColumns; @@ -265,9 +266,14 @@ private CursorCompactor(OperationType type, this.compactionId = compactionId; long inputBytes = 0; + long compressedInputBytes = 0; for (ISSTableScanner scanner : scanners) + { inputBytes += scanner.getLengthInBytes(); + compressedInputBytes += scanner.getCompressedLengthInBytes(); + } this.totalInputBytes = inputBytes; + this.totalCompressedInputBytes = compressedInputBytes; this.partitionMergeCounters = new long[scanners.size()]; this.staticRowMergeCounters = new long[partitionMergeCounters.length]; this.rowMergeCounters = new long[partitionMergeCounters.length]; @@ -1458,6 +1464,7 @@ public CompactionInfo getCompactionInfo() type, getBytesRead(), totalInputBytes, + totalCompressedInputBytes, compactionId, sstables, targetDirectory); @@ -1663,4 +1670,4 @@ else if (cmp == 0) { } preSortedArray[insertInto] = newElement; } -} \ No newline at end of file +} diff --git a/src/java/org/apache/cassandra/index/accord/RouteSecondaryIndexBuilder.java b/src/java/org/apache/cassandra/index/accord/RouteSecondaryIndexBuilder.java index 428ed774c04e..c8c7ac4ee8bc 100644 --- a/src/java/org/apache/cassandra/index/accord/RouteSecondaryIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/accord/RouteSecondaryIndexBuilder.java @@ -57,6 +57,7 @@ public class RouteSecondaryIndexBuilder extends SecondaryIndexBuilder private final boolean isFullRebuild; private final boolean isInitialBuild; private final long totalSizeInBytes; + private final long totalCompressedSizeInBytes; private long bytesProcessed = 0; public RouteSecondaryIndexBuilder(RouteJournalIndex index, @@ -72,7 +73,15 @@ public RouteSecondaryIndexBuilder(RouteJournalIndex index, this.sstables = sstables; this.isFullRebuild = isFullRebuild; this.isInitialBuild = isInitialBuild; - this.totalSizeInBytes = sstables.stream().mapToLong(SSTableReader::uncompressedLength).sum(); + long uncompressedSum = 0L; + long compressedSum = 0L; + for (SSTableReader sstable : sstables) + { + uncompressedSum += sstable.uncompressedLength(); + compressedSum += sstable.onDiskLength(); + } + this.totalSizeInBytes = uncompressedSum; + this.totalCompressedSizeInBytes = compressedSum; } @Override @@ -82,6 +91,7 @@ public CompactionInfo getCompactionInfo() OperationType.INDEX_BUILD, bytesProcessed, totalSizeInBytes, + totalCompressedSizeInBytes, compactionId, sstables); } diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java index 006b8146e5a5..6d0ef66f9670 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.stream.Collectors; import com.google.common.collect.Iterables; @@ -878,7 +879,7 @@ public void testNoDiskspace() throws Throwable execute("insert into %s (id, i) values (?,?)", i, i); getCurrentColumnFamilyStore().forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS); } - // When we have an existing an compaction with sstables of total size more than double the available space, + // When we have an existing compaction with sstables of total size more than double the available space, // we should not be able to then run a major compaction CompactionInfo.Holder holder = holder(OperationType.COMPACTION, 2); CompactionManager.instance.active.beginCompaction(holder);