Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,24 @@
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.annotations.VisibleForTesting;

import com.google.common.base.Preconditions;
import org.apache.cassandra.cdc.TypeCache;
import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter;
import org.apache.cassandra.spark.utils.Throwing;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public final class CdcBridgeFactory extends BaseCassandraBridgeFactory
{
private static final AtomicBoolean SHUTDOWN_HOOK_REGISTERED = new AtomicBoolean(false);

// maps Cassandra version-specific jar name (e.g. 'four-zero') to matching CassandraBridge and SparkSqlTypeConverter
private static final Map<String, VersionSpecificBridge> CASSANDRA_BRIDGES =
new ConcurrentHashMap<>(CassandraVersion.values().length);
new ConcurrentHashMap<>(CassandraVersion.values().length);

public static class VersionSpecificBridge
{
Expand Down Expand Up @@ -77,11 +82,9 @@ public static CassandraBridge get(@NotNull CassandraVersionFeatures features)

private static CdcBridgeFactory.VersionSpecificBridge getVersionSpecificBridge(@NotNull CassandraVersion version)
{
maybeRegisterShutdownHook();
String jarBaseName = version.jarBaseName();
if (jarBaseName == null)
{
throw new NullPointerException("Cassandra version " + version + " is not supported");
}
Preconditions.checkNotNull(jarBaseName, "Cassandra version " + version + " is not supported");
return CASSANDRA_BRIDGES.computeIfAbsent(jarBaseName, CdcBridgeFactory::create);
}

Expand Down Expand Up @@ -165,6 +168,51 @@ private static VersionSpecificBridge create(@NotNull String label)
}
}

/**
* Returns whether the shutdown hook has been registered.
*/
@VisibleForTesting
static boolean isShutdownHookRegistered()
{
return SHUTDOWN_HOOK_REGISTERED.get();
}

/**
* Registers a JVM shutdown hook that calls {@link #close()} to release classloader resources.
* Clears the {@link TypeCache} first so type references are released before classloaders close.
* Uses CAS to ensure the hook is registered at most once.
*/
@VisibleForTesting
static void maybeRegisterShutdownHook()
{
if (SHUTDOWN_HOOK_REGISTERED.compareAndSet(false, true))
{
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try
{
TypeCache.clear();
close();
}
catch (Exception e)
{
// best-effort cleanup during JVM shutdown
}
}, CdcBridgeFactory.class.getSimpleName() + "-shutdown"));
}
}

/**
* Closes all cached bridge classloaders and clears the cache. Call during application shutdown
* to release classloader resources and delete temp JAR files.
*
* Do not use this method outside testing; rely on the shutdown hook logic to clean things up in prod.
*/
@VisibleForTesting
public static void close()
{
closeBridges(CASSANDRA_BRIDGES, bridge -> bridge.classLoader);
}

@VisibleForTesting
public static <T> T executeActionOnBridgeClassLoader(@NotNull CassandraVersion version, Throwing.Function<ClassLoader, T> action)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,11 @@ protected void refreshSchema()

// Closable

/**
* Stops the CDC consumer. Bridge resources (classloaders, temp files, type caches) are owned
* by the static bridge factories and should be cleaned up at application shutdown via
* {@link TypeCache#clear} and {@link CdcBridgeFactory#close}, not per-instance.
*/
@Override
public void close()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
/**
* Caches Cassandra CqlField.CqlType objects, so they don't need to be re-created everytime. Keyed on keyspace and type to permit per keyspace UDT definitions.
*/
@SuppressWarnings("UnstableApiUsage")
public class TypeCache
{
private static final Logger LOGGER = LoggerFactory.getLogger(TypeCache.class);
Expand All @@ -47,19 +48,25 @@ public class TypeCache

private final Supplier<CassandraTypes> cassandraTypesSupplier;

/** We have contention between instance level cache access and the global {@link #clear} method; we go ahead and
* lock at the class level since the clearing operation is expected to happen very infrequently leaving cache
* access uncontended and as predominantly a very fast null check.
*/
private static final Object creationLock = new Object();

protected TypeCache(Supplier<CassandraTypes> cassandraTypesSupplier)
{
this.cassandraTypesSupplier = cassandraTypesSupplier;
}

public CqlField.CqlType getType(String keyspace, String typeString)
{
maybeInit();
Cache<KeyspaceTypeKey, CqlField.CqlType> localCache = maybeInit();
CqlField.CqlType result;
KeyspaceTypeKey key = KeyspaceTypeKey.of(keyspace, typeString);
try
{
result = cqlTypeCache.get(key, () -> getTypes().parseType(keyspace, typeString));
result = localCache.get(key, () -> getTypes().parseType(keyspace, typeString));
}
catch (CacheLoader.InvalidCacheLoadException | ExecutionException e)
{
Expand All @@ -69,7 +76,7 @@ public CqlField.CqlType getType(String keyspace, String typeString)
{
throw new RuntimeException("Unable to parse type: " + typeString);
}
cqlTypeCache.put(key, result);
localCache.put(key, result);
}
return result;
}
Expand All @@ -82,36 +89,67 @@ public CassandraTypes getTypes()
public static TypeCache get(CassandraVersion version)
{
return VERSION_TYPE_CACHE
.compute(version,
(key, previous) ->
new TypeCache(() -> CdcBridgeFactory.get(key).cassandraTypes())
.computeIfAbsent(version,
key ->
new TypeCache(() -> CdcBridgeFactory.get(key).cassandraTypes())
);
}

private void maybeInit()
/**
* Globally clears all cached TypeCache instances. Each entry's Guava cache is invalidated and
* nullified to release references to CQL types (and transitively, bridge classloaders).
*
* This is a "do it once" kind of shutdown action; while _in theory_ it should be unnecessary (i.e. we let JVM
* shutdown just nuke everything), in practice better to be deliberate about this, especially in test environments
* where things are spun up and spun down and global resources hanging around can be problematic.
*/
public static void clear()
{
if (cqlTypeCache != null)
synchronized (creationLock)
{
return;
VERSION_TYPE_CACHE.forEach((version, typeCache) -> {
if (typeCache.cqlTypeCache != null)
{
typeCache.cqlTypeCache.invalidateAll();
typeCache.cqlTypeCache = null;
}
});
VERSION_TYPE_CACHE.clear();
}
}

synchronized (this)
/**
* We use a manual cache since the parser is version dependent
* see {@link org.apache.cassandra.cdc.test.TestCdcBridgeProvider#getCassandraBridge}
*
* Since there's a real risk of race between this and {@link #clear}, we lock on {@link #creationLock} to enforce
* clear boundaries between the two.
*
* Since most calls here will jump right into the "cqlTypeCache != null" path, the risk of contention on a simple
* variable null check should be very _very_ low. We can't rely on {@link #cqlTypeCache} being volatile since we
* have the "check for null then return" pattern and we could race with teardown leading to NPE.
*
* @return Reference to the cache so subsequent clear calls don't NPE us.
*/
private Cache<KeyspaceTypeKey, CqlField.CqlType> maybeInit()
{
synchronized (creationLock)
{
if (cqlTypeCache != null)
{
return;
return cqlTypeCache;
}

// using a manual cache since the parser is version dependent, see getCassandraBridge()
cqlTypeCache = CacheBuilder.newBuilder()
.initialCapacity(CACHE_CAPACITY / 2)
.maximumSize(CACHE_CAPACITY)
.removalListener(notification -> {
// Log at the info level: the event is not expected, but could happen.
// With the logs, we can check how frequent it happens.
LOGGER.info("Type is evicted from cache. type='{}'", notification.getKey());
})
.build();
.initialCapacity(CACHE_CAPACITY / 2)
.maximumSize(CACHE_CAPACITY)
.removalListener(notification -> {
// Log at the info level: the event is not expected, but could happen.
// With the logs, we can check how frequent it happens.
LOGGER.info("Type is evicted from cache. type='{}'", notification.getKey());
})
.build();
return cqlTypeCache;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.cassandra.bridge;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

/**
* Tests for {@link CdcBridgeFactory} shutdown hook registration.
* Relies on {@code forkEvery = 1} so each test class gets a fresh JVM with clean static state.
*/
public class CdcBridgeFactoryShutdownHookTest
{
@Test
public void testShutdownHookNotRegisteredInitially()
{
assertThat(CdcBridgeFactory.isShutdownHookRegistered()).isFalse();
}

@Test
public void testShutdownHookRegisteredOnce()
{
CdcBridgeFactory.maybeRegisterShutdownHook();
assertThat(CdcBridgeFactory.isShutdownHookRegistered()).isTrue();

// Second call is idempotent — the flag stays true and no exception is thrown
CdcBridgeFactory.maybeRegisterShutdownHook();
assertThat(CdcBridgeFactory.isShutdownHookRegistered()).isTrue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import java.nio.file.Path;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.ExtendWith;

import org.apache.cassandra.bridge.CassandraBridge;
Expand All @@ -44,13 +46,28 @@ public abstract class CdcTestBase
protected CommitLogInstance commitLog;
protected Path commitLogDir;

@BeforeAll
static void beforeAll()
{
TestCdcBridgeProvider.setup();
}

/**
* Releases all cached bridges, temp directories, and classloaders after all tests in the class complete.
*/
@AfterAll
static void afterAll()
{
TestCdcBridgeProvider.tearDown();
}

void setup(CassandraVersion version)
{
this.cdcOptions = CdcBridgeProvider.getCdcOptions(version);
this.bridge = CdcBridgeProvider.getCassandraBridge(version);
this.cdcBridge = CdcBridgeProvider.getTestCdcBridge(version);
this.commitLogDir = CdcBridgeProvider.getCommitLogDir(version);
this.cdcOptions = TestCdcBridgeProvider.getCdcOptions(version);
this.bridge = TestCdcBridgeProvider.getCassandraBridge(version);
this.cdcBridge = TestCdcBridgeProvider.getTestCdcBridge(version);
this.commitLogDir = TestCdcBridgeProvider.getCommitLogDir(version);
this.commitLog = cdcBridge.createCommitLogInstance(commitLogDir);
this.messageConverter = CdcBridgeProvider.getMessageConverter(version);
this.messageConverter = TestCdcBridgeProvider.getMessageConverter(version);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public CdcTester build()
CdcOptions options = cdcOptions;
if (options == null)
{
options = CdcBridgeProvider.getCdcOptions(bridge.getVersion());
options = TestCdcBridgeProvider.getCdcOptions(bridge.getVersion());
}
return new CdcTester(bridge, cdcBridge, schemaBuilder.build(), testDir, writers, numRows, expectedNumRows,
addLastModificationTime, eventChecker, shouldCdcEventWriterFailOnProcessing,
Expand Down
Loading
Loading