Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<groupId>com.jeffjirsa.cassandra.db.compaction</groupId>
<artifactId>TimeWindowCompactionStrategy</artifactId>
<packaging>jar</packaging>
<version>3.7</version>
<version>3.11</version>
<name>TimeWindowCompactionStrategy</name>
<url>https://github.com/jeffjirsa/twcs</url>
<properties>
Expand All @@ -21,7 +21,7 @@
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>3.7</version>
<version>3.11.3</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,27 @@
*/
package com.jeffjirsa.cassandra.db.compaction;

import java.util.*;
import java.util.Map.Entry;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.compaction.*;
import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
import org.apache.cassandra.db.compaction.AbstractCompactionTask;
import org.apache.cassandra.db.compaction.CompactionTask;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
import org.apache.cassandra.db.compaction.writers.SplittingSizeTieredCompactionWriter;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.Map.Entry;

import static com.google.common.collect.Iterables.filter;

Expand Down Expand Up @@ -66,7 +69,8 @@ private long avgSize(List<SSTableReader> sstables)

protected SizeTieredCompactionStrategyOptions sizeTieredOptions;
protected volatile int estimatedRemainingTasks;
private final Set<SSTableReader> sstables = new HashSet<>();
@VisibleForTesting
protected final Set<SSTableReader> sstables = new HashSet<>();

public SizeTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
{
Expand All @@ -75,7 +79,7 @@ public SizeTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> o
this.sizeTieredOptions = new SizeTieredCompactionStrategyOptions(options);
}

private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
{
// make local copies so they can't be changed out from under us mid-method
int minThreshold = cfs.getMinimumCompactionThreshold();
Expand All @@ -85,7 +89,8 @@ private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)

List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(candidates), sizeTieredOptions.bucketHigh, sizeTieredOptions.bucketLow, sizeTieredOptions.minSSTableSize);
logger.trace("Compaction buckets are {}", buckets);
updateEstimatedCompactionsByTasks(buckets);
estimatedRemainingTasks = getEstimatedCompactionsByTasks(cfs, buckets);
cfs.getCompactionStrategyManager().compactionLogger.pending(this, estimatedRemainingTasks);
List<SSTableReader> mostInteresting = mostInterestingBucket(buckets, minThreshold, maxThreshold);
if (!mostInteresting.isEmpty())
return mostInteresting;
Expand All @@ -101,8 +106,7 @@ private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
if (sstablesWithTombstones.isEmpty())
return Collections.emptyList();

Collections.sort(sstablesWithTombstones, new SSTableReader.SizeComparator());
return Collections.singletonList(sstablesWithTombstones.get(0));
return Collections.singletonList(Collections.max(sstablesWithTombstones, SSTableReader.sizeComparator));
}


Expand Down Expand Up @@ -175,18 +179,30 @@ private static double hotness(SSTableReader sstr)
}

@SuppressWarnings("resource")
public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
List<SSTableReader> previousCandidate = null;
while (true)
{
List<SSTableReader> hottestBucket = getNextBackgroundSSTables(gcBefore);

if (hottestBucket.isEmpty())
return null;

// Already tried acquiring references without success. It means there is a race with
// the tracker but candidate SSTables were not yet replaced in the compaction strategy manager
if (hottestBucket.equals(previousCandidate))
{
logger.warn("Could not acquire references for compacting SSTables {} which is not a problem per se," +
"unless it happens frequently, in which case it must be reported. Will retry later.",
hottestBucket);
return null;
}

LifecycleTransaction transaction = cfs.getTracker().tryModify(hottestBucket, OperationType.COMPACTION);
if (transaction != null)
return new CompactionTask(cfs, transaction, gcBefore);
previousCandidate = hottestBucket;
}
}

Expand Down Expand Up @@ -283,15 +299,15 @@ public int compare(Pair<T, Long> p1, Pair<T, Long> p2)
return new ArrayList<List<T>>(buckets.values());
}

private void updateEstimatedCompactionsByTasks(List<List<SSTableReader>> tasks)
public static int getEstimatedCompactionsByTasks(ColumnFamilyStore cfs, List<List<SSTableReader>> tasks)
{
int n = 0;
for (List<SSTableReader> bucket: tasks)
for (List<SSTableReader> bucket : tasks)
{
if (bucket.size() >= cfs.getMinimumCompactionThreshold())
n += Math.ceil((double)bucket.size() / cfs.getMaximumCompactionThreshold());
}
estimatedRemainingTasks = n;
return n;
}

public long getMaxSSTableBytes()
Expand Down Expand Up @@ -328,6 +344,11 @@ public void removeSSTable(SSTableReader sstable)
sstables.remove(sstable);
}

/**
 * Returns a snapshot of the SSTables currently held in this strategy's {@code sstables} set.
 *
 * @return an immutable copy of the set as it is at the time of the call
 */
protected Set<SSTableReader> getSSTables()
{
    final Set<SSTableReader> snapshot = ImmutableSet.copyOf(sstables);
    return snapshot;
}

public String toString()
{
return String.format("SizeTieredCompactionStrategy[%s/%s]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,31 @@ public TimeWindowCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> o
}

@Override
public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
@SuppressWarnings("resource") // transaction is closed by AbstractCompactionTask::execute
public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
List<SSTableReader> previousCandidate = null;
while (true)
{
List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore);

if (latestBucket.isEmpty())
return null;

// Already tried acquiring references without success. It means there is a race with
// the tracker but candidate SSTables were not yet replaced in the compaction strategy manager
if (latestBucket.equals(previousCandidate))
{
logger.warn("Could not acquire references for compacting SSTables {} which is not a problem per se," +
"unless it happens frequently, in which case it must be reported. Will retry later.",
latestBucket);
return null;
}

LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
if (modifier != null && !options.enableCleanup)
return new CompactionTask(cfs, modifier, gcBefore);
else if(modifier != null && options.enableCleanup)
return new CleaningTimeWindowCompactionTask(cfs, modifier, gcBefore);
if (modifier != null)
return new TimeWindowCompactionTask(cfs, modifier, gcBefore, options.ignoreOverlaps);
previousCandidate = latestBucket;
}
}

Expand All @@ -94,7 +105,7 @@ else if(modifier != null && options.enableCleanup)
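* @param gcBefore passed to the fully-expired SSTable check
* @return the SSTables chosen for the next background compaction, or an empty list when there are none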
*/
private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
{
if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
return Collections.emptyList();
Expand All @@ -107,7 +118,8 @@ private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
if (System.currentTimeMillis() - lastExpiredCheck > options.expiredSSTableCheckFrequency)
{
logger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables");
expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting, cfs.getOverlappingSSTables(SSTableSet.CANONICAL, uncompacting), gcBefore);
expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting, options.ignoreOverlaps ? Collections.emptySet() : cfs.getOverlappingLiveSSTables(uncompacting),
gcBefore, options.ignoreOverlaps);
lastExpiredCheck = System.currentTimeMillis();
}
else
Expand Down Expand Up @@ -147,7 +159,7 @@ private List<SSTableReader> getNextNonExpiredSSTables(Iterable<SSTableReader> no
if (sstablesWithTombstones.isEmpty())
return Collections.emptyList();

return Collections.singletonList(Collections.min(sstablesWithTombstones, new SSTableReader.SizeComparator()));
return Collections.singletonList(Collections.min(sstablesWithTombstones, SSTableReader.sizeComparator));
}

private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader> candidateSSTables)
Expand All @@ -161,8 +173,6 @@ private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader> cand
List<SSTableReader> mostInteresting = newestBucket(buckets.left,
cfs.getMinimumCompactionThreshold(),
cfs.getMaximumCompactionThreshold(),
options.sstableWindowUnit,
options.sstableWindowSize,
options.stcsOptions,
this.highestWindowSeen);
if (!mostInteresting.isEmpty())
Expand All @@ -182,6 +192,11 @@ public void removeSSTable(SSTableReader sstable)
sstables.remove(sstable);
}

/**
 * Returns a snapshot of the SSTables currently held in this strategy's {@code sstables} set.
 *
 * @return an immutable copy of the set as it is at the time of the call
 */
protected Set<SSTableReader> getSSTables()
{
    final Set<SSTableReader> snapshot = ImmutableSet.copyOf(sstables);
    return snapshot;
}

/**
* Find the lowest and highest timestamps in a given timestamp/unit pair
* Returns milliseconds, caller should adjust accordingly
Expand All @@ -190,26 +205,27 @@ public static Pair<Long,Long> getWindowBoundsInMillis(TimeUnit windowTimeUnit, i
{
long lowerTimestamp;
long upperTimestamp;
long timestampInSeconds = timestampInMillis / 1000L;
long timestampInSeconds = TimeUnit.SECONDS.convert(timestampInMillis, TimeUnit.MILLISECONDS);

switch(windowTimeUnit)
{
case MINUTES:
lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (60 * windowTimeSize));
lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (60L * windowTimeSize));
upperTimestamp = (lowerTimestamp + (60L * (windowTimeSize - 1L))) + 59L;
break;
case HOURS:
lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (3600 * windowTimeSize));
lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (3600L * windowTimeSize));
upperTimestamp = (lowerTimestamp + (3600L * (windowTimeSize - 1L))) + 3599L;
break;
case DAYS:
default:
lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (86400 * windowTimeSize));
lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (86400L * windowTimeSize));
upperTimestamp = (lowerTimestamp + (86400L * (windowTimeSize - 1L))) + 86399L;
break;
}

return Pair.create(lowerTimestamp * 1000L, upperTimestamp * 1000L);
return Pair.create(TimeUnit.MILLISECONDS.convert(lowerTimestamp, TimeUnit.SECONDS),
TimeUnit.MILLISECONDS.convert(upperTimestamp, TimeUnit.SECONDS));

}

Expand All @@ -233,24 +249,16 @@ static Pair<HashMultimap<Long, SSTableReader>, Long> getBuckets(Iterable<SSTable
// Where the bucket is the file's max timestamp rounded to the nearest window bucket
for (SSTableReader f : files)
{
long tStamp = f.getMaxTimestamp();
if (timestampResolution.equals(TimeUnit.MICROSECONDS))
tStamp = tStamp / 1000L;
else if (timestampResolution.equals(TimeUnit.NANOSECONDS))
tStamp = tStamp / 1000000L;
else if (timestampResolution.equals(TimeUnit.SECONDS))
tStamp = tStamp * 1000L;
else
assert TimeWindowCompactionStrategyOptions.validTimestampTimeUnits.contains(timestampResolution);

assert TimeWindowCompactionStrategyOptions.validTimestampTimeUnits.contains(timestampResolution);
long tStamp = TimeUnit.MILLISECONDS.convert(f.getMaxTimestamp(), timestampResolution);
Pair<Long,Long> bounds = getWindowBoundsInMillis(sstableWindowUnit, sstableWindowSize, tStamp);
buckets.put(bounds.left, f );
buckets.put(bounds.left, f);
if (bounds.left > maxTimestamp)
maxTimestamp = bounds.left;
}

logger.debug("buckets {}, max timestamp", buckets, maxTimestamp);
return Pair.< HashMultimap<Long, SSTableReader>, Long >create(buckets, maxTimestamp);
logger.trace("buckets {}, max timestamp {}", buckets, maxTimestamp);
return Pair.create(buckets, maxTimestamp);
}

private void updateEstimatedCompactionsByTasks(HashMultimap<Long, SSTableReader> tasks)
Expand All @@ -277,7 +285,7 @@ else if (key.compareTo(now) < 0 && tasks.get(key).size() >= 2)
* @return a bucket (list) of sstables to compact.
*/
@VisibleForTesting
static List<SSTableReader> newestBucket(HashMultimap<Long, SSTableReader> buckets, int minThreshold, int maxThreshold, TimeUnit sstableWindowUnit, int sstableWindowSize, SizeTieredCompactionStrategyOptions stcsOptions, long now)
static List<SSTableReader> newestBucket(HashMultimap<Long, SSTableReader> buckets, int minThreshold, int maxThreshold, SizeTieredCompactionStrategyOptions stcsOptions, long now)
{
// If the current bucket has at least minThreshold SSTables, choose that one.
// For any other bucket, at least 2 SSTables is enough.
Expand All @@ -290,7 +298,7 @@ static List<SSTableReader> newestBucket(HashMultimap<Long, SSTableReader> bucket
{
Long key = it.next();
Set<SSTableReader> bucket = buckets.get(key);
logger.debug("Key {}, now {}", key, now);
logger.trace("Key {}, now {}", key, now);
if (bucket.size() >= minThreshold && key >= now)
{
// If we're in the newest bucket, we'll use STCS to prioritize sstables
Expand Down Expand Up @@ -327,12 +335,13 @@ static List<SSTableReader> trimToThreshold(Set<SSTableReader> bucket, int maxThr
List<SSTableReader> ssTableReaders = new ArrayList<>(bucket);

// Trim the largest sstables off the end to meet the maxThreshold
Collections.sort(ssTableReaders, new SSTableReader.SizeComparator());
Collections.sort(ssTableReaders, SSTableReader.sizeComparator);

return ImmutableList.copyOf(Iterables.limit(ssTableReaders, maxThreshold));
}

@Override
@SuppressWarnings("resource") // transaction is closed by AbstractCompactionTask::execute
public synchronized Collection<AbstractCompactionTask> getMaximalTask(int gcBefore, boolean splitOutput)
{
Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables);
Expand All @@ -341,10 +350,11 @@ public synchronized Collection<AbstractCompactionTask> getMaximalTask(int gcBefo
LifecycleTransaction txn = cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION);
if (txn == null)
return null;
return Collections.<AbstractCompactionTask>singleton(new CompactionTask(cfs, txn, gcBefore));
return Collections.singleton(new TimeWindowCompactionTask(cfs, txn, gcBefore, options.ignoreOverlaps));
}

@Override
@SuppressWarnings("resource") // transaction is closed by AbstractCompactionTask::execute
public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
{
assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
Expand All @@ -356,7 +366,7 @@ public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTable
return null;
}

return new CompactionTask(cfs, modifier, gcBefore).setUserDefined(true);
return new TimeWindowCompactionTask(cfs, modifier, gcBefore, options.ignoreOverlaps).setUserDefined(true);
}

public int getEstimatedRemainingTasks()
Expand Down
Loading