From e0db40fb93865e1bb5ab4fe1ee485327eec41755 Mon Sep 17 00:00:00 2001 From: Josh McKenzie Date: Fri, 27 Mar 2026 16:49:10 -0400 Subject: [PATCH 1/2] CASSANALYTICS-143 Fix various leaks in CDC caching and bridge implementation Patch by Josh McKenzie; reviewed by TBD for CASSANALYTICS-143 This patch makes a lot of the lifecycle implementation in tests explicit (temp files, cleanup of resources, etc) that previously relied on the CI env Just Working. Some of it is about tidying up and making explicit our position on resource management w/regards to test class lifecycle, and some of it is about leaks preventing GC (schema graph, etc). --- .../cassandra/bridge/CdcBridgeFactory.java | 58 ++++++++++++-- .../java/org/apache/cassandra/cdc/Cdc.java | 5 ++ .../org/apache/cassandra/cdc/TypeCache.java | 80 ++++++++++++++----- .../CdcBridgeFactoryShutdownHookTest.java | 48 +++++++++++ .../cassandra/cdc/test/CdcTestBase.java | 27 +++++-- .../apache/cassandra/cdc/test/CdcTester.java | 2 +- ...ovider.java => TestCdcBridgeProvider.java} | 56 ++++++++++--- .../BufferingCommitLogReaderTests.java | 22 ++++- .../bridge/CassandraBridgeFactory.java | 78 ++++++++++++++++-- ...assandraBridgeFactoryShutdownHookTest.java | 48 +++++++++++ .../PostDelegationClassLoaderTests.java | 53 ++++++++++++ .../bridge/BaseCassandraBridgeFactory.java | 44 +++++++++- .../bridge/PostDelegationClassLoader.java | 42 +++++++++- .../AbstractCdcBridgeImplementation.java | 28 +++---- 14 files changed, 518 insertions(+), 73 deletions(-) create mode 100644 cassandra-analytics-cdc/src/test/java/org/apache/cassandra/bridge/CdcBridgeFactoryShutdownHookTest.java rename cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/{CdcBridgeProvider.java => TestCdcBridgeProvider.java} (71%) create mode 100644 cassandra-analytics-core/src/test/java/org/apache/cassandra/bridge/CassandraBridgeFactoryShutdownHookTest.java create mode 100644 
cassandra-analytics-core/src/test/java/org/apache/cassandra/bridge/PostDelegationClassLoaderTests.java diff --git a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/bridge/CdcBridgeFactory.java b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/bridge/CdcBridgeFactory.java index 0d19bb826..abb61e131 100644 --- a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/bridge/CdcBridgeFactory.java +++ b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/bridge/CdcBridgeFactory.java @@ -23,9 +23,12 @@ import java.lang.reflect.InvocationTargetException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.cassandra.cdc.TypeCache; import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter; import org.apache.cassandra.spark.utils.Throwing; import org.jetbrains.annotations.NotNull; @@ -33,9 +36,11 @@ public final class CdcBridgeFactory extends BaseCassandraBridgeFactory { + private static final AtomicBoolean SHUTDOWN_HOOK_REGISTERED = new AtomicBoolean(false); + // maps Cassandra version-specific jar name (e.g. 
'four-zero') to matching CassandraBridge and SparkSqlTypeConverter private static final Map CASSANDRA_BRIDGES = - new ConcurrentHashMap<>(CassandraVersion.values().length); + new ConcurrentHashMap<>(CassandraVersion.values().length); public static class VersionSpecificBridge { @@ -77,11 +82,9 @@ public static CassandraBridge get(@NotNull CassandraVersionFeatures features) private static CdcBridgeFactory.VersionSpecificBridge getVersionSpecificBridge(@NotNull CassandraVersion version) { + maybeRegisterShutdownHook(); String jarBaseName = version.jarBaseName(); - if (jarBaseName == null) - { - throw new NullPointerException("Cassandra version " + version + " is not supported"); - } + Preconditions.checkNotNull(jarBaseName, "Cassandra version " + version + " is not supported"); return CASSANDRA_BRIDGES.computeIfAbsent(jarBaseName, CdcBridgeFactory::create); } @@ -165,6 +168,51 @@ private static VersionSpecificBridge create(@NotNull String label) } } + /** + * Returns whether the shutdown hook has been registered. + */ + @VisibleForTesting + static boolean isShutdownHookRegistered() + { + return SHUTDOWN_HOOK_REGISTERED.get(); + } + + /** + * Registers a JVM shutdown hook that calls {@link #close()} to release classloader resources. + * Clears the {@link TypeCache} first so type references are released before classloaders close. + * Uses CAS to ensure the hook is registered at most once. + */ + @VisibleForTesting + static void maybeRegisterShutdownHook() + { + if (SHUTDOWN_HOOK_REGISTERED.compareAndSet(false, true)) + { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try + { + TypeCache.clear(); + close(); + } + catch (Exception e) + { + // best-effort cleanup during JVM shutdown + } + }, CdcBridgeFactory.class.getSimpleName() + "-shutdown")); + } + } + + /** + * Closes all cached bridge classloaders and clears the cache. Call during application shutdown + * to release classloader resources and delete temp JAR files. 
+ * + * Do not use this method outside testing; rely on the shutdown hook logic to clean things up in prod. + */ + @VisibleForTesting + public static void close() + { + closeBridges(CASSANDRA_BRIDGES, bridge -> bridge.classLoader); + } + @VisibleForTesting public static T executeActionOnBridgeClassLoader(@NotNull CassandraVersion version, Throwing.Function action) { diff --git a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/Cdc.java b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/Cdc.java index 6d3617d2e..b3270c9fa 100644 --- a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/Cdc.java +++ b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/Cdc.java @@ -450,6 +450,11 @@ protected void refreshSchema() // Closable + /** + * Stops the CDC consumer. Bridge resources (classloaders, temp files, type caches) are owned + * by the static bridge factories and should be cleaned up at application shutdown via + * {@link TypeCache#clear} and {@link CdcBridgeFactory#close}, not per-instance. + */ @Override public void close() { diff --git a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/TypeCache.java b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/TypeCache.java index 7fde0ca36..ba723c4c0 100644 --- a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/TypeCache.java +++ b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/TypeCache.java @@ -38,6 +38,7 @@ /** * Caches Cassandra CqlField.CqlType objects, so they don't need to be re-created everytime. Keyed on keyspace and type to permit per keyspace UDT definitions. 
*/ +@SuppressWarnings("UnstableApiUsage") public class TypeCache { private static final Logger LOGGER = LoggerFactory.getLogger(TypeCache.class); @@ -47,6 +48,12 @@ public class TypeCache private final Supplier cassandraTypesSupplier; + /** We have contention between instance level cache access and the global {@link #clear} method; we go ahead and + * lock at the class level since the clearing operation is expected to happen very infrequently leaving cache + * access uncontended and as predominantly a very fast null check. + */ + private static final Object creationLock = new Object(); + protected TypeCache(Supplier cassandraTypesSupplier) { this.cassandraTypesSupplier = cassandraTypesSupplier; @@ -54,12 +61,12 @@ protected TypeCache(Supplier cassandraTypesSupplier) public CqlField.CqlType getType(String keyspace, String typeString) { - maybeInit(); + Cache localCache = maybeInit(); CqlField.CqlType result; KeyspaceTypeKey key = KeyspaceTypeKey.of(keyspace, typeString); try { - result = cqlTypeCache.get(key, () -> getTypes().parseType(keyspace, typeString)); + result = localCache.get(key, () -> getTypes().parseType(keyspace, typeString)); } catch (CacheLoader.InvalidCacheLoadException | ExecutionException e) { @@ -69,7 +76,7 @@ public CqlField.CqlType getType(String keyspace, String typeString) { throw new RuntimeException("Unable to parse type: " + typeString); } - cqlTypeCache.put(key, result); + localCache.put(key, result); } return result; } @@ -82,36 +89,69 @@ public CassandraTypes getTypes() public static TypeCache get(CassandraVersion version) { return VERSION_TYPE_CACHE - .compute(version, - (key, previous) -> - new TypeCache(() -> CdcBridgeFactory.get(key).cassandraTypes()) + .computeIfAbsent(version, + key -> + new TypeCache(() -> CdcBridgeFactory.get(key).cassandraTypes()) ); } - private void maybeInit() + /** + * Globally clears all cached TypeCache instances. 
Each entry's Guava cache is invalidated and + * nullified to release references to CQL types (and transitively, bridge classloaders). + * + * This is a "do it once" kind of shutdown action; while _in theory_ it should be unnecessary (i.e. we let JVM + * shutdown just nuke everything), in practice better to be deliberate about this, especially in test environments + * where things are spun up and spun down and global resources hanging around can be problematic. + */ + public static void clear() { - if (cqlTypeCache != null) + synchronized (creationLock) { - return; + VERSION_TYPE_CACHE.forEach((version, typeCache) -> + { + if (typeCache.cqlTypeCache != null) + { + typeCache.cqlTypeCache.invalidateAll(); + typeCache.cqlTypeCache = null; + } + }); + VERSION_TYPE_CACHE.clear(); } + } - synchronized (this) + /** + * We use a manual cache since the parser is version dependent + * see {@link org.apache.cassandra.cdc.test.TestCdcBridgeProvider#getCassandraBridge} + * + * Since there's a real risk of race between this and {@link #clear}, we lock on {@link #creationLock} to enforce + * clear boundaries between the two. + * + * Since most calls here will jump right into the "cqlTypeCache != null" path, the risk of contention on a simple + * variable null check should be very _very_ low. We can't rely on {@link #cqlTypeCache} being volatile since we + * have the "check for null then return" pattern and we could race with teardown leading to NPE. + * + * @return Reference to the cache so subsequent clear calls don't NPE us. + */ + private Cache maybeInit() + { + synchronized (creationLock) { if (cqlTypeCache != null) { - return; + return cqlTypeCache; } - // using a manual cache since the parser is version dependent, see getCassandraBridge() cqlTypeCache = CacheBuilder.newBuilder() - .initialCapacity(CACHE_CAPACITY / 2) - .maximumSize(CACHE_CAPACITY) - .removalListener(notification -> { - // Log at the info level: the event is not expected, but could happen. 
- // With the logs, we can check how frequent it happens. - LOGGER.info("Type is evicted from cache. type='{}'", notification.getKey()); - }) - .build(); + .initialCapacity(CACHE_CAPACITY / 2) + .maximumSize(CACHE_CAPACITY) + .removalListener(notification -> + { + // Log at the info level: the event is not expected, but could happen. + // With the logs, we can check how frequent it happens. + LOGGER.info("Type is evicted from cache. type='{}'", notification.getKey()); + }) + .build(); + return cqlTypeCache; } } } diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/bridge/CdcBridgeFactoryShutdownHookTest.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/bridge/CdcBridgeFactoryShutdownHookTest.java new file mode 100644 index 000000000..c44529161 --- /dev/null +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/bridge/CdcBridgeFactoryShutdownHookTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.bridge; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link CdcBridgeFactory} shutdown hook registration. 
+ * Relies on {@code forkEvery = 1} so each test class gets a fresh JVM with clean static state. + */ +public class CdcBridgeFactoryShutdownHookTest +{ + @Test + public void testShutdownHookNotRegisteredInitially() + { + assertThat(CdcBridgeFactory.isShutdownHookRegistered()).isFalse(); + } + + @Test + public void testShutdownHookRegisteredOnce() + { + CdcBridgeFactory.maybeRegisterShutdownHook(); + assertThat(CdcBridgeFactory.isShutdownHookRegistered()).isTrue(); + + // Second call is idempotent — the flag stays true and no exception is thrown + CdcBridgeFactory.maybeRegisterShutdownHook(); + assertThat(CdcBridgeFactory.isShutdownHookRegistered()).isTrue(); + } +} diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcTestBase.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcTestBase.java index 0ea52357e..ec1a70b15 100644 --- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcTestBase.java +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcTestBase.java @@ -21,6 +21,8 @@ import java.nio.file.Path; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.extension.ExtendWith; import org.apache.cassandra.bridge.CassandraBridge; @@ -44,13 +46,28 @@ public abstract class CdcTestBase protected CommitLogInstance commitLog; protected Path commitLogDir; + @BeforeAll + static void beforeAll() + { + TestCdcBridgeProvider.setup(); + } + + /** + * Releases all cached bridges, temp directories, and classloaders after all tests in the class complete. 
+ */ + @AfterAll + static void afterAll() + { + TestCdcBridgeProvider.tearDown(); + } + void setup(CassandraVersion version) { - this.cdcOptions = CdcBridgeProvider.getCdcOptions(version); - this.bridge = CdcBridgeProvider.getCassandraBridge(version); - this.cdcBridge = CdcBridgeProvider.getTestCdcBridge(version); - this.commitLogDir = CdcBridgeProvider.getCommitLogDir(version); + this.cdcOptions = TestCdcBridgeProvider.getCdcOptions(version); + this.bridge = TestCdcBridgeProvider.getCassandraBridge(version); + this.cdcBridge = TestCdcBridgeProvider.getTestCdcBridge(version); + this.commitLogDir = TestCdcBridgeProvider.getCommitLogDir(version); this.commitLog = cdcBridge.createCommitLogInstance(commitLogDir); - this.messageConverter = CdcBridgeProvider.getMessageConverter(version); + this.messageConverter = TestCdcBridgeProvider.getMessageConverter(version); } } diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcTester.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcTester.java index 2c6d7ce73..f05c0ae3c 100644 --- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcTester.java +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcTester.java @@ -233,7 +233,7 @@ public CdcTester build() CdcOptions options = cdcOptions; if (options == null) { - options = CdcBridgeProvider.getCdcOptions(bridge.getVersion()); + options = TestCdcBridgeProvider.getCdcOptions(bridge.getVersion()); } return new CdcTester(bridge, cdcBridge, schemaBuilder.build(), testDir, writers, numRows, expectedNumRows, addLastModificationTime, eventChecker, shouldCdcEventWriterFailOnProcessing, diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcBridgeProvider.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/TestCdcBridgeProvider.java similarity index 71% rename from 
cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcBridgeProvider.java rename to cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/TestCdcBridgeProvider.java index 4d8e8401f..e5e1044ff 100644 --- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcBridgeProvider.java +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/TestCdcBridgeProvider.java @@ -26,6 +26,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.cassandra.bridge.BridgeInitializationParameters; import org.apache.cassandra.bridge.CassandraBridge; @@ -35,39 +36,72 @@ import org.apache.cassandra.cdc.api.CdcOptions; import org.apache.cassandra.cdc.msg.jdk.JdkMessageConverter; -public class CdcBridgeProvider +/** + * Test utility for managing CDC bridge instances per version. Call {@link #setup()} explicitly + * before using any getter methods (e.g. from a {@code @BeforeAll} hook). Call {@link #tearDown()} + * to release all cached bridges, temp directories, and classloaders. + */ +public class TestCdcBridgeProvider { private static final ConcurrentMap OPTIONS = new ConcurrentHashMap<>(); private static final ConcurrentMap COMMIT_LOG_DIRS = new ConcurrentHashMap<>(); private static final ConcurrentMap BRIDGES = new ConcurrentHashMap<>(); private static final ConcurrentMap CDC_BRIDGES = new ConcurrentHashMap<>(); private static final ConcurrentMap MESSAGE_CONVERTERS = new ConcurrentHashMap<>(); + private static final AtomicBoolean initialized = new AtomicBoolean(false); - static + private TestCdcBridgeProvider() { - setup(); + throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated"); } - private CdcBridgeProvider() + /** + * Initializes bridge instances for all test versions. 
Idempotent via {@link AtomicBoolean}; + * only the first call performs initialization. Must be called from {@code @BeforeAll} in test classes. + */ + public static void setup() { - throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated"); + if (initialized.compareAndSet(false, true)) + { + TestVersionSupplier.testVersions().forEach(v -> { + try + { + setupVersion(v); + } + catch (IOException e) + { + throw new IllegalStateException(e); + } + }); + } } - private static void setup() + /** + * Releases all cached bridges, temp commit log directories, and resets the bridge factory. + * Call from {@code @AfterAll} in test base classes when explicit cleanup is needed. + */ + public static void tearDown() { - TestVersionSupplier.testVersions().forEach(v -> { + OPTIONS.clear(); + CDC_BRIDGES.clear(); + BRIDGES.clear(); + MESSAGE_CONVERTERS.clear(); + COMMIT_LOG_DIRS.forEach((version, dir) -> { try { - setup(v); + Files.deleteIfExists(dir); } - catch (IOException e) + catch (IOException ignored) { - throw new IllegalStateException(e); + // best-effort cleanup } }); + COMMIT_LOG_DIRS.clear(); + CdcBridgeFactory.close(); + initialized.set(false); } - private static void setup(CassandraVersion version) throws IOException + private static void setupVersion(CassandraVersion version) throws IOException { OPTIONS.put(version, new CdcOptions() { diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java index 677e61ae4..2b87473ad 100644 --- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java @@ -30,13 +30,15 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import 
org.apache.cassandra.cdc.test.TestCdcBridgeProvider; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import org.apache.cassandra.bridge.CassandraBridge; import org.apache.cassandra.bridge.CassandraVersion; import org.apache.cassandra.bridge.CdcBridge; -import org.apache.cassandra.cdc.test.CdcBridgeProvider; import org.apache.cassandra.cdc.CdcTests; import org.apache.cassandra.cdc.LocalCommitLog; import org.apache.cassandra.cdc.api.CommitLog; @@ -55,13 +57,25 @@ public class BufferingCommitLogReaderTests { + @BeforeAll + static void beforeAll() + { + TestCdcBridgeProvider.setup(); + } + + @AfterAll + static void afterAll() + { + TestCdcBridgeProvider.tearDown(); + } + @ParameterizedTest @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") public void testReaderSeek(CassandraVersion version) { - CassandraBridge bridge = CdcBridgeProvider.getCassandraBridge(version); - CdcBridge cdcBridge = CdcBridgeProvider.getTestCdcBridge(version); - Path directory = CdcBridgeProvider.getCommitLogDir(version); + CassandraBridge bridge = TestCdcBridgeProvider.getCassandraBridge(version); + CdcBridge cdcBridge = TestCdcBridgeProvider.getTestCdcBridge(version); + Path directory = TestCdcBridgeProvider.getCommitLogDir(version); CommitLogInstance commitLog = cdcBridge.createCommitLogInstance(directory); TestSchema schema = TestSchema.builder(bridge) .withPartitionKey("pk", bridge.bigint()) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java index ae8b17d35..976f4ddd2 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java 
@@ -23,7 +23,9 @@ import java.lang.reflect.InvocationTargetException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.cassandra.spark.data.converter.SparkSqlTypeConverter; @@ -31,6 +33,8 @@ public final class CassandraBridgeFactory extends BaseCassandraBridgeFactory { + private static final AtomicBoolean SHUTDOWN_HOOK_REGISTERED = new AtomicBoolean(false); + // maps Cassandra version-specific jar name (e.g. 'four-zero') to matching CassandraBridge and SparkSqlTypeConverter private static final Map CASSANDRA_BRIDGES = new ConcurrentHashMap<>(CassandraVersion.values().length); @@ -39,11 +43,15 @@ public static class VersionSpecificBridge { public final CassandraBridge cassandraBridge; public final SparkSqlTypeConverter sparkSqlTypeConverter; + final ClassLoader classLoader; - public VersionSpecificBridge(CassandraBridge cassandraBridge, SparkSqlTypeConverter sparkSqlTypeConverter) + public VersionSpecificBridge(CassandraBridge cassandraBridge, + SparkSqlTypeConverter sparkSqlTypeConverter, + ClassLoader classLoader) { this.cassandraBridge = cassandraBridge; this.sparkSqlTypeConverter = sparkSqlTypeConverter; + this.classLoader = classLoader; } } @@ -65,12 +73,18 @@ public static CassandraBridge get(@NotNull CassandraVersionFeatures features) return get(getCassandraVersion(features)); } - @NotNull - public static CassandraBridge get(@NotNull CassandraVersion version) + private static VersionSpecificBridge getVersionSpecificBridge(@NotNull CassandraVersion version) { + maybeRegisterShutdownHook(); String jarBaseName = version.jarBaseName(); Preconditions.checkNotNull(jarBaseName, "Cassandra version " + version + " is not supported"); - return CASSANDRA_BRIDGES.computeIfAbsent(jarBaseName, CassandraBridgeFactory::create).cassandraBridge; + return 
CASSANDRA_BRIDGES.computeIfAbsent(jarBaseName, CassandraBridgeFactory::create); + } + + @NotNull + public static CassandraBridge get(@NotNull CassandraVersion version) + { + return getVersionSpecificBridge(version).cassandraBridge; } @NotNull @@ -88,9 +102,7 @@ public static SparkSqlTypeConverter getSparkSql(@NotNull CassandraBridge bridge) @NotNull public static SparkSqlTypeConverter getSparkSql(@NotNull CassandraVersion version) { - String jarBaseName = version.jarBaseName(); - Preconditions.checkNotNull(jarBaseName, "Cassandra version " + version + " is not supported"); - return CASSANDRA_BRIDGES.computeIfAbsent(jarBaseName, CassandraBridgeFactory::create).sparkSqlTypeConverter; + return getVersionSpecificBridge(version).sparkSqlTypeConverter; } @NotNull @@ -119,7 +131,7 @@ private static VersionSpecificBridge create(@NotNull String label) .loadClass(SparkSqlTypeConverter.IMPLEMENTATION_FQCN); Constructor typeConverterConstructor = typeConverter.getConstructor(); SparkSqlTypeConverter typeConverterInstance = typeConverterConstructor.newInstance(); - return new VersionSpecificBridge(bridgeInstance, typeConverterInstance); + return new VersionSpecificBridge(bridgeInstance, typeConverterInstance, loader); } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException exception) @@ -128,6 +140,56 @@ private static VersionSpecificBridge create(@NotNull String label) } } + /** + * Returns whether the shutdown hook has been registered. + */ + @VisibleForTesting + static boolean isShutdownHookRegistered() + { + return SHUTDOWN_HOOK_REGISTERED.get(); + } + + /** + * Registers a JVM shutdown hook that calls {@link #close} to release classloader resources. + * Uses CAS to ensure the hook is registered at most once. 
+ */ + @VisibleForTesting + static void maybeRegisterShutdownHook() + { + if (SHUTDOWN_HOOK_REGISTERED.compareAndSet(false, true)) + { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try + { + close(); + } + catch (Exception e) + { + // best-effort cleanup during JVM shutdown + } + }, CassandraBridgeFactory.class.getSimpleName() + "-shutdown")); + } + } + + /** + * Closes all cached bridge classloaders and clears the cache. Call during application shutdown + * to release classloader resources and delete temp JAR files. + */ + public static void close() + { + closeBridges(CASSANDRA_BRIDGES, bridge -> bridge.classLoader); + } + + /** + * Clears the bridge cache without closing classloaders. Intended for test cleanup where + * JVM exit handles resource release. + */ + @VisibleForTesting + public static void reset() + { + CASSANDRA_BRIDGES.clear(); + } + /*** * Returns the quoted name when the {@code quoteIdentifiers} parameter is {@code true} AND the * {@code unquotedName} needs to be quoted (i.e. it uses mixed case, or it is a Cassandra reserved word). diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/bridge/CassandraBridgeFactoryShutdownHookTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/bridge/CassandraBridgeFactoryShutdownHookTest.java new file mode 100644 index 000000000..5afc3764a --- /dev/null +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/bridge/CassandraBridgeFactoryShutdownHookTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.bridge; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link CassandraBridgeFactory} shutdown hook registration. + * Relies on {@code forkEvery = 1} so each test class gets a fresh JVM with clean static state. + */ +public class CassandraBridgeFactoryShutdownHookTest +{ + @Test + public void testShutdownHookNotRegisteredInitially() + { + assertThat(CassandraBridgeFactory.isShutdownHookRegistered()).isFalse(); + } + + @Test + public void testShutdownHookRegisteredOnce() + { + CassandraBridgeFactory.maybeRegisterShutdownHook(); + assertThat(CassandraBridgeFactory.isShutdownHookRegistered()).isTrue(); + + // Second call is idempotent — the flag stays true and no exception is thrown + CassandraBridgeFactory.maybeRegisterShutdownHook(); + assertThat(CassandraBridgeFactory.isShutdownHookRegistered()).isTrue(); + } +} diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/bridge/PostDelegationClassLoaderTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/bridge/PostDelegationClassLoaderTests.java new file mode 100644 index 000000000..2be2f85db --- /dev/null +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/bridge/PostDelegationClassLoaderTests.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.bridge; + +import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link PostDelegationClassLoader} resource cleanup. + */ +public class PostDelegationClassLoaderTests +{ + @Test + public void testTempFilesDeletedOnClose() throws IOException + { + List tempFiles = new ArrayList<>(); + for (int i = 0; i < 3; i++) + { + tempFiles.add(Files.createTempFile("test-bridge-", ".jar")); + } + tempFiles.forEach(f -> assertThat(f).exists()); + + PostDelegationClassLoader loader = new PostDelegationClassLoader(new URL[0], null, tempFiles); + loader.close(); + + tempFiles.forEach(f -> assertThat(f).doesNotExist()); + } +} diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/BaseCassandraBridgeFactory.java b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/BaseCassandraBridgeFactory.java index 66338e49b..0b522ea97 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/BaseCassandraBridgeFactory.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/BaseCassandraBridgeFactory.java @@ -29,20 +29,53 @@ import java.nio.file.Files; import java.nio.file.Path; import 
java.nio.file.StandardCopyOption; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.function.Function; import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.jetbrains.annotations.NotNull; public class BaseCassandraBridgeFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(BaseCassandraBridgeFactory.class); + protected BaseCassandraBridgeFactory() { throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated"); } + /** + * Closes all bridge classloaders in the provided cache and clears it. + * Each entry's classloader is extracted via the provided function; if it is a + * {@link PostDelegationClassLoader}, it is closed (which deletes its temp JAR files). + */ + static void closeBridges(Map bridges, Function classLoaderExtractor) + { + bridges.forEach((label, bridge) -> { + ClassLoader classLoader = classLoaderExtractor.apply(bridge); + if (classLoader instanceof PostDelegationClassLoader) + { + try + { + ((PostDelegationClassLoader) classLoader).close(); + } + catch (IOException e) + { + LOGGER.warn("Failed to close classloader for bridge: {}", label, e); + } + } + }); + bridges.clear(); + } + @NotNull public static CassandraVersion getCassandraVersion(@NotNull String version) { @@ -100,11 +133,17 @@ public static CassandraBridge loadCassandraBridge(String label) return constructor.newInstance(); } - public static ClassLoader buildClassLoader(String... resourceNames) + /** + * Builds a classloader from the given resource names. Temp files extracted from class resources + * are tracked by the returned {@link PostDelegationClassLoader} and deleted when it is closed. + */ + public static PostDelegationClassLoader buildClassLoader(String... 
resourceNames) { + List tempFiles = new ArrayList<>(resourceNames.length); URL[] urls = Arrays.stream(resourceNames) .map(BaseCassandraBridgeFactory::copyClassResourceToFile) .map(jar -> { + tempFiles.add(jar.toPath()); try { return jar.toURI().toURL(); @@ -115,7 +154,7 @@ public static ClassLoader buildClassLoader(String... resourceNames) } }).toArray(URL[]::new); - return new PostDelegationClassLoader(urls, Thread.currentThread().getContextClassLoader()); + return new PostDelegationClassLoader(urls, Thread.currentThread().getContextClassLoader(), tempFiles); } public static File copyClassResourceToFile(String resource) @@ -127,6 +166,7 @@ public static File copyClassResourceToFile(String resource) throw new NullPointerException("Could not find resource: " + resource); } Path jarPath = Files.createTempFile(null, ".jar"); + jarPath.toFile().deleteOnExit(); Files.copy(contents, jarPath, StandardCopyOption.REPLACE_EXISTING); return jarPath.toFile(); } diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/PostDelegationClassLoader.java b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/PostDelegationClassLoader.java index 63e11cfea..993a2349d 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/PostDelegationClassLoader.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/PostDelegationClassLoader.java @@ -19,8 +19,16 @@ package org.apache.cassandra.bridge; +import java.io.IOException; import java.net.URL; import java.net.URLClassLoader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -28,14 +36,18 @@ /** * This custom implementation of a {@link ClassLoader} enables deferred execution-time loading of a particular version * of class hierarchy from one of many embedded the {@code cassandra-all} 
library JARs. It first attempts to load any - * requested class from the extracted JAR, and resorts to using the parent class loader when the the class is not there. + * requested class from the extracted JAR, and resorts to using the parent class loader when the class is not there. * This behavior is opposite to the one of standard {@link URLClassLoader}, which invokes its parent class loader first. */ public class PostDelegationClassLoader extends URLClassLoader { - public PostDelegationClassLoader(@NotNull URL[] urls, @Nullable ClassLoader parent) + private static final Logger LOGGER = LoggerFactory.getLogger(PostDelegationClassLoader.class); + private final List tempFiles; + + public PostDelegationClassLoader(@NotNull URL[] urls, @Nullable ClassLoader parent, @NotNull List tempFiles) { super(urls, parent); + this.tempFiles = tempFiles; } @Override @@ -66,4 +78,30 @@ protected synchronized Class loadClass(@Nullable String name, boolean resolve } return type; } + + /** + * Closes this classloader and deletes all associated temp JAR files. 
+ */ + @Override + public void close() throws IOException + { + try + { + super.close(); + } + finally + { + for (Path tempFile : tempFiles) + { + try + { + Files.deleteIfExists(tempFile); + } + catch (IOException e) + { + LOGGER.warn("Failed to delete temp JAR file: {}", tempFile, e); + } + } + } + } } diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/AbstractCdcBridgeImplementation.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/AbstractCdcBridgeImplementation.java index cd50347c6..ec8e29f79 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/AbstractCdcBridgeImplementation.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/AbstractCdcBridgeImplementation.java @@ -77,20 +77,6 @@ public abstract class AbstractCdcBridgeImplementation extends CdcBridge { - private static final TableIdLookup INTERNAL_TABLE_ID_LOOKUP = new TableIdLookup() - { - @Nullable - public UUID lookup(String keyspace, String table) throws NoSuchElementException - { - TableMetadata tm = Schema.instance.getTableMetadata(keyspace, table); - if (tm == null) - { - throw new NoSuchElementException(); - } - return tm.id.asUUID(); - } - }; - public void log(CqlTable cqlTable, CommitLogInstance log, Row row, long timestamp) { log(TimeProvider.DEFAULT, cqlTable, log, row, timestamp); @@ -101,9 +87,21 @@ public CommitLogInstance createCommitLogInstance(Path path) return new FourZeroCommitLog(path); } + /** + * Returns a TableIdLookup that resolves table IDs via the current Schema instance. + * Creates a new lambda on each call to avoid permanently pinning Schema.instance + * to a static field, which would prevent GC of the entire schema graph. 
+ */ public TableIdLookup internalTableIdLookup() { - return INTERNAL_TABLE_ID_LOOKUP; + return (keyspace, table) -> { + TableMetadata tm = Schema.instance.getTableMetadata(keyspace, table); + if (tm == null) + { + throw new NoSuchElementException(); + } + return tm.id.asUUID(); + }; } public void updateCdcSchema(@NotNull Set cdcTables, @NotNull Partitioner partitioner, @NotNull TableIdLookup tableIdLookup) From aae3ed005ecf2f6fcf7162734e1455e3b2675af8 Mon Sep 17 00:00:00 2001 From: Josh McKenzie Date: Tue, 31 Mar 2026 12:54:54 -0400 Subject: [PATCH 2/2] checkstyle fixes --- .../src/main/java/org/apache/cassandra/cdc/TypeCache.java | 6 ++---- .../apache/cassandra/bridge/PostDelegationClassLoader.java | 1 - .../cassandra/bridge/AbstractCdcBridgeImplementation.java | 1 - 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/TypeCache.java b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/TypeCache.java index ba723c4c0..b63f101f9 100644 --- a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/TypeCache.java +++ b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/TypeCache.java @@ -107,8 +107,7 @@ public static void clear() { synchronized (creationLock) { - VERSION_TYPE_CACHE.forEach((version, typeCache) -> - { + VERSION_TYPE_CACHE.forEach((version, typeCache) -> { if (typeCache.cqlTypeCache != null) { typeCache.cqlTypeCache.invalidateAll(); @@ -144,8 +143,7 @@ private Cache maybeInit() cqlTypeCache = CacheBuilder.newBuilder() .initialCapacity(CACHE_CAPACITY / 2) .maximumSize(CACHE_CAPACITY) - .removalListener(notification -> - { + .removalListener(notification -> { // Log at the info level: the event is not expected, but could happen. // With the logs, we can check how frequent it happens. LOGGER.info("Type is evicted from cache. 
type='{}'", notification.getKey()); diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/PostDelegationClassLoader.java b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/PostDelegationClassLoader.java index 993a2349d..c0ce4505b 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/PostDelegationClassLoader.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/PostDelegationClassLoader.java @@ -24,7 +24,6 @@ import java.net.URLClassLoader; import java.nio.file.Files; import java.nio.file.Path; -import java.util.Collections; import java.util.List; import org.slf4j.Logger; diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/AbstractCdcBridgeImplementation.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/AbstractCdcBridgeImplementation.java index ec8e29f79..77f8f8814 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/AbstractCdcBridgeImplementation.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/AbstractCdcBridgeImplementation.java @@ -27,7 +27,6 @@ import java.util.Objects; import java.util.Random; import java.util.Set; -import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.function.BiConsumer; import java.util.function.Consumer;