Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
6.0-alpha2
* Uncompressed size is being used for compressed tables in maintenance operations (CASSANDRA-21245)
* Fix a removed TTLed row re-appearance in a materialized view after a cursor compaction (CASSANDRA-21152)
* Rework ZSTD dictionary compression logic to create a trainer per training (CASSANDRA-21209)
Merged from 5.0:
Expand Down
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/cache/AutoSavingCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ else if (cacheType == CacheService.CacheType.COUNTER_CACHE)
type,
0,
keysEstimate,
keysEstimate,
Unit.KEYS,
nextTimeUUID(),
getCacheDataPath(CURRENT_VERSION).toPath().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void finishCompaction(CompactionInfo.Holder ci)
/**
* Get the estimated number of bytes remaining to write per sstable directory
*/
public Map<File, Long> estimatedRemainingWriteBytes()
public Map<File, Long> estimatedRemainingWriteToDiskBytes()
{
synchronized (compactions)
{
Expand All @@ -66,7 +66,7 @@ public Map<File, Long> estimatedRemainingWriteBytes()
List<File> directories = compactionInfo.getTargetDirectories();
if (directories == null || directories.isEmpty())
continue;
long remainingWriteBytesPerDataDir = compactionInfo.estimatedRemainingWriteBytes() / directories.size();
long remainingWriteBytesPerDataDir = compactionInfo.estimatedRemainingWriteToDiskBytes() / directories.size();
for (File directory : directories)
writeBytesPerSSTableDir.merge(directory, remainingWriteBytesPerDataDir, Long::sum);
}
Expand Down
32 changes: 19 additions & 13 deletions src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,55 +52,57 @@ public final class CompactionInfo
private final OperationType tasktype;
private final long completed;
private final long total;
private final long totalCompressed;
private final Unit unit;
private final TimeUUID compactionId;
private final ImmutableSet<SSTableReader> sstables;
private final String targetDirectory;

public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, Unit unit, TimeUUID compactionId, Collection<? extends SSTableReader> sstables, String targetDirectory)
public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, long totalCompressed, Unit unit, TimeUUID compactionId, Collection<? extends SSTableReader> sstables, String targetDirectory)
{
this.tasktype = tasktype;
this.completed = completed;
this.total = total;
this.totalCompressed = totalCompressed;
this.metadata = metadata;
this.unit = unit;
this.compactionId = compactionId;
this.sstables = ImmutableSet.copyOf(sstables);
this.targetDirectory = targetDirectory;
}

public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, TimeUUID compactionId, Collection<SSTableReader> sstables, String targetDirectory)
public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, long totalCompressed, TimeUUID compactionId, Collection<SSTableReader> sstables, String targetDirectory)
{
this(metadata, tasktype, completed, total, Unit.BYTES, compactionId, sstables, targetDirectory);
this(metadata, tasktype, completed, total, totalCompressed, Unit.BYTES, compactionId, sstables, targetDirectory);
}

public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, TimeUUID compactionId, Collection<? extends SSTableReader> sstables)
public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, long totalCompressed, TimeUUID compactionId, Collection<? extends SSTableReader> sstables)
{
this(metadata, tasktype, completed, total, Unit.BYTES, compactionId, sstables, null);
this(metadata, tasktype, completed, total, totalCompressed, Unit.BYTES, compactionId, sstables, null);
}

/**
* Special compaction info where we always need to cancel the compaction - for example ViewBuilderTask where we don't know
* the sstables at construction
*/
public static CompactionInfo withoutSSTables(TableMetadata metadata, OperationType tasktype, long completed, long total, Unit unit, TimeUUID compactionId)
public static CompactionInfo withoutSSTables(TableMetadata metadata, OperationType tasktype, long completed, long total, long totalCompressed, Unit unit, TimeUUID compactionId)
{
return withoutSSTables(metadata, tasktype, completed, total, unit, compactionId, null);
return withoutSSTables(metadata, tasktype, completed, total, totalCompressed, unit, compactionId, null);
}

/**
* Special compaction info where we always need to cancel the compaction - for example AutoSavingCache where we don't know
* the sstables at construction
*/
public static CompactionInfo withoutSSTables(TableMetadata metadata, OperationType tasktype, long completed, long total, Unit unit, TimeUUID compactionId, String targetDirectory)
public static CompactionInfo withoutSSTables(TableMetadata metadata, OperationType tasktype, long completed, long total, long totalCompressed, Unit unit, TimeUUID compactionId, String targetDirectory)
{
return new CompactionInfo(metadata, tasktype, completed, total, unit, compactionId, ImmutableSet.of(), targetDirectory);
return new CompactionInfo(metadata, tasktype, completed, total, totalCompressed, unit, compactionId, ImmutableSet.of(), targetDirectory);
}

/** @return A copy of this CompactionInfo with updated progress. */
public CompactionInfo forProgress(long complete, long total)
{
return new CompactionInfo(metadata, tasktype, complete, total, unit, compactionId, sstables, targetDirectory);
return new CompactionInfo(metadata, tasktype, complete, total, totalCompressed, unit, compactionId, sstables, targetDirectory);
}

public Optional<String> getKeyspace()
Expand Down Expand Up @@ -183,12 +185,16 @@ public String targetDirectory()
/**
* Note that this estimate is based on the amount of data we have left to read - it assumes input
* size == output size for a compaction, which is not really true, but should most often provide a worst case
* remaining write size.
* remaining write size. We also scale by the effective compression ratio since total/completed are for the uncompressed size.
*/
public long estimatedRemainingWriteBytes()
public long estimatedRemainingWriteToDiskBytes()
{
if (unit == Unit.BYTES && tasktype.writesData)
return getTotal() - getCompleted();
{
final long total = getTotal();
double compressionRatio = total == 0 ? 1 : ((double) totalCompressed / (double)total);
return (long)(compressionRatio * (total - getCompleted()));
}
return 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
private final long nowInSec;
private final TimeUUID compactionId;
private final long totalBytes;
private final long totalCompressedBytes;
private long bytesRead;
private long totalSourceCQLRows;

Expand Down Expand Up @@ -227,9 +228,13 @@ public CompactionIterator(OperationType type,
this.bytesRead = 0;

long bytes = 0;
for (ISSTableScanner scanner : scanners)
long compressedBytes = 0;
for (ISSTableScanner scanner : scanners) {
bytes += scanner.getLengthInBytes();
compressedBytes += scanner.getCompressedLengthInBytes();
}
this.totalBytes = bytes;
this.totalCompressedBytes = compressedBytes;
this.mergeCounters = new long[scanners.size()];
// note that we leak `this` from the constructor when calling beginCompaction below, this means we have to get the sstables before
// calling that to avoid a NPE.
Expand Down Expand Up @@ -277,6 +282,7 @@ public CompactionInfo getCompactionInfo()
type,
bytesRead,
totalBytes,
totalCompressedBytes,
compactionId,
sstables,
targetDirectory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ protected boolean buildCompactionCandidatesForAvailableDiskSpace(final Set<SSTab
for (File directory : newCompactionDatadirs)
expectedNewWriteSize.put(directory, writeSizePerOutputDatadir);

Map<File, Long> expectedWriteSize = CompactionManager.instance.active.estimatedRemainingWriteBytes();
Map<File, Long> expectedWriteSize = CompactionManager.instance.active.estimatedRemainingWriteToDiskBytes();

// todo: abort streams if they block compactions
if (cfs.getDirectories().hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSize, expectedWriteSize))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ private static void logDebugReason(TableMetadata metadata, String reason)
private final long nowInSec;
private final TimeUUID compactionId;
private final long totalInputBytes;
private final long totalCompressedInputBytes;
private final StatefulCursor[] sstableCursors;
private final boolean[] sstableCursorsEqualsNext;
private final boolean hasStaticColumns;
Expand Down Expand Up @@ -265,9 +266,14 @@ private CursorCompactor(OperationType type,
this.compactionId = compactionId;

long inputBytes = 0;
long compressedInputBytes = 0;
for (ISSTableScanner scanner : scanners)
{
inputBytes += scanner.getLengthInBytes();
compressedInputBytes += scanner.getCompressedLengthInBytes();
}
this.totalInputBytes = inputBytes;
this.totalCompressedInputBytes = compressedInputBytes;
this.partitionMergeCounters = new long[scanners.size()];
this.staticRowMergeCounters = new long[partitionMergeCounters.length];
this.rowMergeCounters = new long[partitionMergeCounters.length];
Expand Down Expand Up @@ -1458,6 +1464,7 @@ public CompactionInfo getCompactionInfo()
type,
getBytesRead(),
totalInputBytes,
totalCompressedInputBytes,
compactionId,
sstables,
targetDirectory);
Expand Down Expand Up @@ -1663,4 +1670,4 @@ else if (cmp == 0) {
}
preSortedArray[insertInto] = newElement;
}
}
}
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -205,13 +205,13 @@ public CompactionInfo getCompactionInfo()
if (range.left.getPartitioner().splitter().isPresent())
{
long progress = prevToken == null ? 0 : Math.round(prevToken.getPartitioner().splitter().get().positionInRange(prevToken, range) * 1000);
return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, progress, 1000, Unit.RANGES, compactionId);
return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, progress, 1000, 1000, Unit.RANGES, compactionId);
}

// When there is no splitter, estimate based on number of total keys but
// take the max with keysBuilt + 1 to avoid having more completed than total
long keysTotal = Math.max(keysBuilt + 1, baseCfs.estimatedKeysForRange(range));
return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, keysBuilt, keysTotal, Unit.KEYS, compactionId);
return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, keysBuilt, keysTotal, keysTotal, Unit.KEYS, compactionId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class RouteSecondaryIndexBuilder extends SecondaryIndexBuilder
private final boolean isFullRebuild;
private final boolean isInitialBuild;
private final long totalSizeInBytes;
private final long totalCompressedSizeInBytes;
private long bytesProcessed = 0;

public RouteSecondaryIndexBuilder(RouteJournalIndex index,
Expand All @@ -72,7 +73,15 @@ public RouteSecondaryIndexBuilder(RouteJournalIndex index,
this.sstables = sstables;
this.isFullRebuild = isFullRebuild;
this.isInitialBuild = isInitialBuild;
this.totalSizeInBytes = sstables.stream().mapToLong(SSTableReader::uncompressedLength).sum();
long uncompressedSum = 0L;
long compressedSum = 0L;
for (SSTableReader sstable : sstables)
{
uncompressedSum += sstable.uncompressedLength();
compressedSum += sstable.onDiskLength();
}
this.totalSizeInBytes = uncompressedSum;
this.totalCompressedSizeInBytes = compressedSum;
}

@Override
Expand All @@ -82,6 +91,7 @@ public CompactionInfo getCompactionInfo()
OperationType.INDEX_BUILD,
bytesProcessed,
totalSizeInBytes,
totalCompressedSizeInBytes,
compactionId,
sstables);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ public CompactionInfo getCompactionInfo()
OperationType.INDEX_BUILD,
iter.getBytesRead(),
iter.getTotalBytes(),
iter.getTotalBytes(),
compactionId,
sstables);
sstables
);
}

public void build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ public CompactionInfo getCompactionInfo()
OperationType.INDEX_BUILD,
bytesProcessed,
totalSizeInBytes,
totalSizeInBytes,
compactionId,
sstables.keySet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public CompactionInfo getCompactionInfo()
OperationType.INDEX_BUILD,
bytesProcessed,
totalBytesToProcess,
totalBytesToProcess,
compactionId,
sstables.keySet(),
targetDirectory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ public CompactionInfo getCompactionInfo()
OperationType.SCRUB,
dataFile.getFilePointer(),
dataFile.length(),
sstable.onDiskLength(),
scrubCompactionId,
ImmutableSet.of(sstable),
File.getPath(sstable.getFilename()).getParent().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ public CompactionInfo getCompactionInfo()
OperationType.VERIFY,
dataFile.getFilePointer(),
dataFile.length(),
sstable.onDiskLength(),
verificationCompactionId,
ImmutableSet.of(sstable));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ static <T extends SSTableReader & IndexSummarySupport<T>> Pair<List<T>, List<Res

public CompactionInfo getCompactionInfo()
{
return CompactionInfo.withoutSSTables(null, OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, Unit.BYTES, compactionId);
return CompactionInfo.withoutSSTables(null, OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, memoryPoolBytes, Unit.BYTES, compactionId);
}

public boolean isGlobal()
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/streaming/StreamSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,7 @@ static boolean checkDiskSpace(Map<TableId, Long> perTableIdIncomingBytes,
for (FileStore fs : allWriteableFileStores)
newStreamBytesToWritePerFileStore.merge(fs, totalBytesInPerFileStore, Long::sum);
}
Map<FileStore, Long> totalCompactionWriteRemaining = Directories.perFileStore(CompactionManager.instance.active.estimatedRemainingWriteBytes(),
Map<FileStore, Long> totalCompactionWriteRemaining = Directories.perFileStore(CompactionManager.instance.active.estimatedRemainingWriteToDiskBytes(),
fileStoreMapper);
long totalStreamRemaining = StreamManager.instance.getTotalRemainingOngoingBytes();
long totalBytesStreamRemainingPerFileStore = totalStreamRemaining / Math.max(1, allFileStores.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public static class BB
public static void install(ClassLoader cl, Integer node)
{
new ByteBuddy().rebase(ActiveCompactions.class)
.method(named("estimatedRemainingWriteBytes"))
.method(named("estimatedRemainingWriteToDiskBytes"))
.intercept(MethodDelegation.to(BB.class))
.make()
.load(cl, ClassLoadingStrategy.Default.INJECTION);
Expand All @@ -127,7 +127,7 @@ public static void install(ClassLoader cl, Integer node)
.load(cl, ClassLoadingStrategy.Default.INJECTION);
}

public static Map<File, Long> estimatedRemainingWriteBytes()
public static Map<File, Long> estimatedRemainingWriteToDiskBytes()
{
if (sstableDir != null)
return ImmutableMap.of(sstableDir, estimatedRemaining.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void test2iCompaction() throws IOException
// emulate ongoing index compaction:
CompactionInfo.Holder h = new MockHolder(i.getIndexCfs().metadata(), idxSSTables);
CompactionManager.instance.active.beginCompaction(h);
CompactionManager.instance.active.estimatedRemainingWriteBytes();
CompactionManager.instance.active.estimatedRemainingWriteToDiskBytes();
CompactionManager.instance.active.finishCompaction(h);
});
}
Expand All @@ -79,7 +79,7 @@ public MockHolder(TableMetadata metadata, Set<SSTableReader> sstables)
@Override
public CompactionInfo getCompactionInfo()
{
return new CompactionInfo(metadata, OperationType.COMPACTION, 0, 1000, nextTimeUUID(), sstables);
return new CompactionInfo(metadata, OperationType.COMPACTION, 0, 1000, 0, nextTimeUUID(), sstables);
Copy link
Copy Markdown
Contributor

@netudima netudima Apr 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: do we set compressed value as 0 intentionally here?

}

@Override
Expand Down
Loading