diff --git a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/bridge/CdcBridgeFactory.java b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/bridge/CdcBridgeFactory.java index 0d19bb826..abb61e131 100644 --- a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/bridge/CdcBridgeFactory.java +++ b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/bridge/CdcBridgeFactory.java @@ -23,9 +23,12 @@ import java.lang.reflect.InvocationTargetException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.cassandra.cdc.TypeCache; import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter; import org.apache.cassandra.spark.utils.Throwing; import org.jetbrains.annotations.NotNull; @@ -33,9 +36,11 @@ public final class CdcBridgeFactory extends BaseCassandraBridgeFactory { + private static final AtomicBoolean SHUTDOWN_HOOK_REGISTERED = new AtomicBoolean(false); + // maps Cassandra version-specific jar name (e.g. 'four-zero') to matching CassandraBridge and SparkSqlTypeConverter private static final Map CASSANDRA_BRIDGES = - new ConcurrentHashMap<>(CassandraVersion.values().length); + new ConcurrentHashMap<>(CassandraVersion.values().length); public static class VersionSpecificBridge { @@ -77,11 +82,9 @@ public static CassandraBridge get(@NotNull CassandraVersionFeatures features) private static CdcBridgeFactory.VersionSpecificBridge getVersionSpecificBridge(@NotNull CassandraVersion version) { + maybeRegisterShutdownHook(); String jarBaseName = version.jarBaseName(); - if (jarBaseName == null) - { - throw new NullPointerException("Cassandra version " + version + " is not supported"); - } + Preconditions.checkNotNull(jarBaseName, "Cassandra version " + version + " is not supported"); return CASSANDRA_BRIDGES.computeIfAbsent(jarBaseName, CdcBridgeFactory::create); } @@ -165,6 +168,51 @@ private static VersionSpecificBridge create(@NotNull String label) } } + /** + * Returns whether the shutdown hook has been registered. + */ + @VisibleForTesting + static boolean isShutdownHookRegistered() + { + return SHUTDOWN_HOOK_REGISTERED.get(); + } + + /** + * Registers a JVM shutdown hook that calls {@link #close()} to release classloader resources. + * Clears the {@link TypeCache} first so type references are released before classloaders close. + * Uses CAS to ensure the hook is registered at most once. + */ + @VisibleForTesting + static void maybeRegisterShutdownHook() + { + if (SHUTDOWN_HOOK_REGISTERED.compareAndSet(false, true)) + { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try + { + TypeCache.clear(); + close(); + } + catch (Exception e) + { + // best-effort cleanup during JVM shutdown + } + }, CdcBridgeFactory.class.getSimpleName() + "-shutdown")); + } + } + + /** + * Closes all cached bridge classloaders and clears the cache. Call during application shutdown + * to release classloader resources and delete temp JAR files. + * + * Do not use this method outside testing; rely on the shutdown hook logic to clean things up in prod. + */ + @VisibleForTesting + public static void close() + { + closeBridges(CASSANDRA_BRIDGES, bridge -> bridge.classLoader); + } + @VisibleForTesting public static T executeActionOnBridgeClassLoader(@NotNull CassandraVersion version, Throwing.Function action) { diff --git a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/Cdc.java b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/Cdc.java index 6d3617d2e..b3270c9fa 100644 --- a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/Cdc.java +++ b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/Cdc.java @@ -450,6 +450,11 @@ protected void refreshSchema() // Closable + /** + * Stops the CDC consumer. Bridge resources (classloaders, temp files, type caches) are owned + * by the static bridge factories and should be cleaned up at application shutdown via + * {@link TypeCache#clear} and {@link CdcBridgeFactory#close}, not per-instance. + */ @Override public void close() { diff --git a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/TypeCache.java b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/TypeCache.java index 7fde0ca36..b63f101f9 100644 --- a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/TypeCache.java +++ b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/cdc/TypeCache.java @@ -38,6 +38,7 @@ /** * Caches Cassandra CqlField.CqlType objects, so they don't need to be re-created everytime. Keyed on keyspace and type to permit per keyspace UDT definitions. */ +@SuppressWarnings("UnstableApiUsage") public class TypeCache { private static final Logger LOGGER = LoggerFactory.getLogger(TypeCache.class); @@ -47,6 +48,12 @@ public class TypeCache private final Supplier cassandraTypesSupplier; + /** We have contention between instance level cache access and the global {@link #clear} method; we go ahead and + * lock at the class level since the clearing operation is expected to happen very infrequently leaving cache + * access uncontended and as predominantly a very fast null check. + */ + private static final Object creationLock = new Object(); + protected TypeCache(Supplier cassandraTypesSupplier) { this.cassandraTypesSupplier = cassandraTypesSupplier; @@ -54,12 +61,12 @@ protected TypeCache(Supplier cassandraTypesSupplier) public CqlField.CqlType getType(String keyspace, String typeString) { - maybeInit(); + Cache localCache = maybeInit(); CqlField.CqlType result; KeyspaceTypeKey key = KeyspaceTypeKey.of(keyspace, typeString); try { - result = cqlTypeCache.get(key, () -> getTypes().parseType(keyspace, typeString)); + result = localCache.get(key, () -> getTypes().parseType(keyspace, typeString)); } catch (CacheLoader.InvalidCacheLoadException | ExecutionException e) { @@ -69,7 +76,7 @@ public CqlField.CqlType getType(String keyspace, String typeString) { throw new RuntimeException("Unable to parse type: " + typeString); } - cqlTypeCache.put(key, result); + localCache.put(key, result); } return result; } @@ -82,36 +89,67 @@ public CassandraTypes getTypes() public static TypeCache get(CassandraVersion version) { return VERSION_TYPE_CACHE - .compute(version, - (key, previous) -> - new TypeCache(() -> CdcBridgeFactory.get(key).cassandraTypes()) + .computeIfAbsent(version, + key -> + new TypeCache(() -> CdcBridgeFactory.get(key).cassandraTypes()) ); } - private void maybeInit() + /** + * Globally clears all cached TypeCache instances. Each entry's Guava cache is invalidated and + * nullified to release references to CQL types (and transitively, bridge classloaders). + * + * This is a "do it once" kind of shutdown action; while _in theory_ it should be unnecessary (i.e. we let JVM + * shutdown just nuke everything), in practice better to be deliberate about this, especially in test environments + * where things are spun up and spun down and global resources hanging around can be problematic. + */ + public static void clear() { - if (cqlTypeCache != null) + synchronized (creationLock) { - return; + VERSION_TYPE_CACHE.forEach((version, typeCache) -> { + if (typeCache.cqlTypeCache != null) + { + typeCache.cqlTypeCache.invalidateAll(); + typeCache.cqlTypeCache = null; + } + }); + VERSION_TYPE_CACHE.clear(); } + } - synchronized (this) + /** + * We use a manual cache since the parser is version dependent + * see {@link org.apache.cassandra.cdc.test.TestCdcBridgeProvider#getCassandraBridge} + * + * Since there's a real risk of race between this and {@link #clear}, we lock on {@link #creationLock} to enforce + * clear boundaries between the two. + * + * Since most calls here will jump right into the "cqlTypeCache != null" path, the risk of contention on a simple + * variable null check should be very _very_ low. We can't rely on {@link #cqlTypeCache} being volatile since we + * have the "check for null then return" pattern and we could race with teardown leading to NPE. + * + * @return Reference to the cache so subsequent clear calls don't NPE us. + */ + private Cache maybeInit() + { + synchronized (creationLock) { if (cqlTypeCache != null) { - return; + return cqlTypeCache; } - // using a manual cache since the parser is version dependent, see getCassandraBridge() cqlTypeCache = CacheBuilder.newBuilder() - .initialCapacity(CACHE_CAPACITY / 2) - .maximumSize(CACHE_CAPACITY) - .removalListener(notification -> { - // Log at the info level: the event is not expected, but could happen. - // With the logs, we can check how frequent it happens. - LOGGER.info("Type is evicted from cache. type='{}'", notification.getKey()); - }) - .build(); + .initialCapacity(CACHE_CAPACITY / 2) + .maximumSize(CACHE_CAPACITY) + .removalListener(notification -> { + // Log at the info level: the event is not expected, but could happen. + // With the logs, we can check how frequent it happens. + LOGGER.info("Type is evicted from cache. type='{}'", notification.getKey()); + }) + .build(); + return cqlTypeCache; } } } diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/bridge/CdcBridgeFactoryShutdownHookTest.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/bridge/CdcBridgeFactoryShutdownHookTest.java new file mode 100644 index 000000000..c44529161 --- /dev/null +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/bridge/CdcBridgeFactoryShutdownHookTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.bridge; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link CdcBridgeFactory} shutdown hook registration. + * Relies on {@code forkEvery = 1} so each test class gets a fresh JVM with clean static state. + */ +public class CdcBridgeFactoryShutdownHookTest +{ + @Test + public void testShutdownHookNotRegisteredInitially() + { + assertThat(CdcBridgeFactory.isShutdownHookRegistered()).isFalse(); + } + + @Test + public void testShutdownHookRegisteredOnce() + { + CdcBridgeFactory.maybeRegisterShutdownHook(); + assertThat(CdcBridgeFactory.isShutdownHookRegistered()).isTrue(); + + // Second call is idempotent — the flag stays true and no exception is thrown + CdcBridgeFactory.maybeRegisterShutdownHook(); + assertThat(CdcBridgeFactory.isShutdownHookRegistered()).isTrue(); + } +} diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcTestBase.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcTestBase.java index 0ea52357e..ec1a70b15 100644 --- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcTestBase.java +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcTestBase.java @@ -21,6 +21,8 @@ import java.nio.file.Path; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.extension.ExtendWith; import org.apache.cassandra.bridge.CassandraBridge; @@ -44,13 +46,28 @@ public abstract class CdcTestBase protected CommitLogInstance commitLog; protected Path commitLogDir; + @BeforeAll + static void beforeAll() + { + TestCdcBridgeProvider.setup(); + } + + /** + * Releases all cached bridges, temp directories, and classloaders after all tests in the class complete. + */ + @AfterAll + static void afterAll() + { + TestCdcBridgeProvider.tearDown(); + } + void setup(CassandraVersion version) { - this.cdcOptions = CdcBridgeProvider.getCdcOptions(version); - this.bridge = CdcBridgeProvider.getCassandraBridge(version); - this.cdcBridge = CdcBridgeProvider.getTestCdcBridge(version); - this.commitLogDir = CdcBridgeProvider.getCommitLogDir(version); + this.cdcOptions = TestCdcBridgeProvider.getCdcOptions(version); + this.bridge = TestCdcBridgeProvider.getCassandraBridge(version); + this.cdcBridge = TestCdcBridgeProvider.getTestCdcBridge(version); + this.commitLogDir = TestCdcBridgeProvider.getCommitLogDir(version); this.commitLog = cdcBridge.createCommitLogInstance(commitLogDir); - this.messageConverter = CdcBridgeProvider.getMessageConverter(version); + this.messageConverter = TestCdcBridgeProvider.getMessageConverter(version); } } diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcTester.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcTester.java index 2c6d7ce73..f05c0ae3c 100644 --- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcTester.java +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcTester.java @@ -233,7 +233,7 @@ public CdcTester build() CdcOptions options = cdcOptions; if (options == null) { - options = CdcBridgeProvider.getCdcOptions(bridge.getVersion()); + options = TestCdcBridgeProvider.getCdcOptions(bridge.getVersion()); } return new CdcTester(bridge, cdcBridge, schemaBuilder.build(), testDir, writers, numRows, expectedNumRows, addLastModificationTime, eventChecker, shouldCdcEventWriterFailOnProcessing, diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcBridgeProvider.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/TestCdcBridgeProvider.java similarity index 71% rename from cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcBridgeProvider.java rename to cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/TestCdcBridgeProvider.java index 4d8e8401f..e5e1044ff 100644 --- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/CdcBridgeProvider.java +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/test/TestCdcBridgeProvider.java @@ -26,6 +26,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.cassandra.bridge.BridgeInitializationParameters; import org.apache.cassandra.bridge.CassandraBridge; @@ -35,39 +36,72 @@ import org.apache.cassandra.cdc.api.CdcOptions; import org.apache.cassandra.cdc.msg.jdk.JdkMessageConverter; -public class CdcBridgeProvider +/** + * Test utility for managing CDC bridge instances per version. Call {@link #setup()} explicitly + * before using any getter methods (e.g. from a {@code @BeforeAll} hook). Call {@link #tearDown()} + * to release all cached bridges, temp directories, and classloaders. + */ +public class TestCdcBridgeProvider { private static final ConcurrentMap OPTIONS = new ConcurrentHashMap<>(); private static final ConcurrentMap COMMIT_LOG_DIRS = new ConcurrentHashMap<>(); private static final ConcurrentMap BRIDGES = new ConcurrentHashMap<>(); private static final ConcurrentMap CDC_BRIDGES = new ConcurrentHashMap<>(); private static final ConcurrentMap MESSAGE_CONVERTERS = new ConcurrentHashMap<>(); + private static final AtomicBoolean initialized = new AtomicBoolean(false); - static + private TestCdcBridgeProvider() { - setup(); + throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated"); } - private CdcBridgeProvider() + /** + * Initializes bridge instances for all test versions. Idempotent via {@link AtomicBoolean}; + * only the first call performs initialization. Must be called from {@code @BeforeAll} in test classes. + */ + public static void setup() { - throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated"); + if (initialized.compareAndSet(false, true)) + { + TestVersionSupplier.testVersions().forEach(v -> { + try + { + setupVersion(v); + } + catch (IOException e) + { + throw new IllegalStateException(e); + } + }); + } } - private static void setup() + /** + * Releases all cached bridges, temp commit log directories, and resets the bridge factory. + * Call from {@code @AfterAll} in test base classes when explicit cleanup is needed. + */ + public static void tearDown() { - TestVersionSupplier.testVersions().forEach(v -> { + OPTIONS.clear(); + CDC_BRIDGES.clear(); + BRIDGES.clear(); + MESSAGE_CONVERTERS.clear(); + COMMIT_LOG_DIRS.forEach((version, dir) -> { try { - setup(v); + Files.deleteIfExists(dir); } - catch (IOException e) + catch (IOException ignored) { - throw new IllegalStateException(e); + // best-effort cleanup } }); + COMMIT_LOG_DIRS.clear(); + CdcBridgeFactory.close(); + initialized.set(false); } - private static void setup(CassandraVersion version) throws IOException + private static void setupVersion(CassandraVersion version) throws IOException { OPTIONS.put(version, new CdcOptions() { diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java index 677e61ae4..2b87473ad 100644 --- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java @@ -30,13 +30,15 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import org.apache.cassandra.cdc.test.TestCdcBridgeProvider; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import org.apache.cassandra.bridge.CassandraBridge; import org.apache.cassandra.bridge.CassandraVersion; import org.apache.cassandra.bridge.CdcBridge; -import org.apache.cassandra.cdc.test.CdcBridgeProvider; import org.apache.cassandra.cdc.CdcTests; import org.apache.cassandra.cdc.LocalCommitLog; import org.apache.cassandra.cdc.api.CommitLog; @@ -55,13 +57,25 @@ public class BufferingCommitLogReaderTests { + @BeforeAll + static void beforeAll() + { + TestCdcBridgeProvider.setup(); + } + + @AfterAll + static void afterAll() + { + TestCdcBridgeProvider.tearDown(); + } + @ParameterizedTest @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") public void testReaderSeek(CassandraVersion version) { - CassandraBridge bridge = CdcBridgeProvider.getCassandraBridge(version); - CdcBridge cdcBridge = CdcBridgeProvider.getTestCdcBridge(version); - Path directory = CdcBridgeProvider.getCommitLogDir(version); + CassandraBridge bridge = TestCdcBridgeProvider.getCassandraBridge(version); + CdcBridge cdcBridge = TestCdcBridgeProvider.getTestCdcBridge(version); + Path directory = TestCdcBridgeProvider.getCommitLogDir(version); CommitLogInstance commitLog = cdcBridge.createCommitLogInstance(directory); TestSchema schema = TestSchema.builder(bridge) .withPartitionKey("pk", bridge.bigint()) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java index ae8b17d35..976f4ddd2 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java @@ -23,7 +23,9 @@ import java.lang.reflect.InvocationTargetException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.cassandra.spark.data.converter.SparkSqlTypeConverter; @@ -31,6 +33,8 @@ public final class CassandraBridgeFactory extends BaseCassandraBridgeFactory { + private static final AtomicBoolean SHUTDOWN_HOOK_REGISTERED = new AtomicBoolean(false); + // maps Cassandra version-specific jar name (e.g. 'four-zero') to matching CassandraBridge and SparkSqlTypeConverter private static final Map CASSANDRA_BRIDGES = new ConcurrentHashMap<>(CassandraVersion.values().length); @@ -39,11 +43,15 @@ public static class VersionSpecificBridge { public final CassandraBridge cassandraBridge; public final SparkSqlTypeConverter sparkSqlTypeConverter; + final ClassLoader classLoader; - public VersionSpecificBridge(CassandraBridge cassandraBridge, SparkSqlTypeConverter sparkSqlTypeConverter) + public VersionSpecificBridge(CassandraBridge cassandraBridge, + SparkSqlTypeConverter sparkSqlTypeConverter, + ClassLoader classLoader) { this.cassandraBridge = cassandraBridge; this.sparkSqlTypeConverter = sparkSqlTypeConverter; + this.classLoader = classLoader; } } @@ -65,12 +73,18 @@ public static CassandraBridge get(@NotNull CassandraVersionFeatures features) return get(getCassandraVersion(features)); } - @NotNull - public static CassandraBridge get(@NotNull CassandraVersion version) + private static VersionSpecificBridge getVersionSpecificBridge(@NotNull CassandraVersion version) { + maybeRegisterShutdownHook(); String jarBaseName = version.jarBaseName(); Preconditions.checkNotNull(jarBaseName, "Cassandra version " + version + " is not supported"); - return CASSANDRA_BRIDGES.computeIfAbsent(jarBaseName, CassandraBridgeFactory::create).cassandraBridge; + return CASSANDRA_BRIDGES.computeIfAbsent(jarBaseName, CassandraBridgeFactory::create); + } + + @NotNull + public static CassandraBridge get(@NotNull CassandraVersion version) + { + return getVersionSpecificBridge(version).cassandraBridge; } @NotNull @@ -88,9 +102,7 @@ public static SparkSqlTypeConverter getSparkSql(@NotNull CassandraBridge bridge) @NotNull public static SparkSqlTypeConverter getSparkSql(@NotNull CassandraVersion version) { - String jarBaseName = version.jarBaseName(); - Preconditions.checkNotNull(jarBaseName, "Cassandra version " + version + " is not supported"); - return CASSANDRA_BRIDGES.computeIfAbsent(jarBaseName, CassandraBridgeFactory::create).sparkSqlTypeConverter; + return getVersionSpecificBridge(version).sparkSqlTypeConverter; } @NotNull @@ -119,7 +131,7 @@ private static VersionSpecificBridge create(@NotNull String label) .loadClass(SparkSqlTypeConverter.IMPLEMENTATION_FQCN); Constructor typeConverterConstructor = typeConverter.getConstructor(); SparkSqlTypeConverter typeConverterInstance = typeConverterConstructor.newInstance(); - return new VersionSpecificBridge(bridgeInstance, typeConverterInstance); + return new VersionSpecificBridge(bridgeInstance, typeConverterInstance, loader); } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException exception) @@ -128,6 +140,56 @@ private static VersionSpecificBridge create(@NotNull String label) } } + /** + * Returns whether the shutdown hook has been registered. + */ + @VisibleForTesting + static boolean isShutdownHookRegistered() + { + return SHUTDOWN_HOOK_REGISTERED.get(); + } + + /** + * Registers a JVM shutdown hook that calls {@link #close} to release classloader resources. + * Uses CAS to ensure the hook is registered at most once. + */ + @VisibleForTesting + static void maybeRegisterShutdownHook() + { + if (SHUTDOWN_HOOK_REGISTERED.compareAndSet(false, true)) + { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try + { + close(); + } + catch (Exception e) + { + // best-effort cleanup during JVM shutdown + } + }, CassandraBridgeFactory.class.getSimpleName() + "-shutdown")); + } + } + + /** + * Closes all cached bridge classloaders and clears the cache. Call during application shutdown + * to release classloader resources and delete temp JAR files. + */ + public static void close() + { + closeBridges(CASSANDRA_BRIDGES, bridge -> bridge.classLoader); + } + + /** + * Clears the bridge cache without closing classloaders. Intended for test cleanup where + * JVM exit handles resource release. + */ + @VisibleForTesting + public static void reset() + { + CASSANDRA_BRIDGES.clear(); + } + /*** * Returns the quoted name when the {@code quoteIdentifiers} parameter is {@code true} AND the * {@code unquotedName} needs to be quoted (i.e. it uses mixed case, or it is a Cassandra reserved word). diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/bridge/CassandraBridgeFactoryShutdownHookTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/bridge/CassandraBridgeFactoryShutdownHookTest.java new file mode 100644 index 000000000..5afc3764a --- /dev/null +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/bridge/CassandraBridgeFactoryShutdownHookTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.bridge; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link CassandraBridgeFactory} shutdown hook registration. + * Relies on {@code forkEvery = 1} so each test class gets a fresh JVM with clean static state. + */ +public class CassandraBridgeFactoryShutdownHookTest +{ + @Test + public void testShutdownHookNotRegisteredInitially() + { + assertThat(CassandraBridgeFactory.isShutdownHookRegistered()).isFalse(); + } + + @Test + public void testShutdownHookRegisteredOnce() + { + CassandraBridgeFactory.maybeRegisterShutdownHook(); + assertThat(CassandraBridgeFactory.isShutdownHookRegistered()).isTrue(); + + // Second call is idempotent — the flag stays true and no exception is thrown + CassandraBridgeFactory.maybeRegisterShutdownHook(); + assertThat(CassandraBridgeFactory.isShutdownHookRegistered()).isTrue(); + } +} diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/bridge/PostDelegationClassLoaderTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/bridge/PostDelegationClassLoaderTests.java new file mode 100644 index 000000000..2be2f85db --- /dev/null +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/bridge/PostDelegationClassLoaderTests.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.bridge; + +import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link PostDelegationClassLoader} resource cleanup. + */ +public class PostDelegationClassLoaderTests +{ + @Test + public void testTempFilesDeletedOnClose() throws IOException + { + List tempFiles = new ArrayList<>(); + for (int i = 0; i < 3; i++) + { + tempFiles.add(Files.createTempFile("test-bridge-", ".jar")); + } + tempFiles.forEach(f -> assertThat(f).exists()); + + PostDelegationClassLoader loader = new PostDelegationClassLoader(new URL[0], null, tempFiles); + loader.close(); + + tempFiles.forEach(f -> assertThat(f).doesNotExist()); + } +} diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/BaseCassandraBridgeFactory.java b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/BaseCassandraBridgeFactory.java index 66338e49b..0b522ea97 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/BaseCassandraBridgeFactory.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/BaseCassandraBridgeFactory.java @@ -29,20 +29,53 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.function.Function; import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.jetbrains.annotations.NotNull; public class BaseCassandraBridgeFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(BaseCassandraBridgeFactory.class); + protected BaseCassandraBridgeFactory() { throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated"); } + /** + * Closes all bridge classloaders in the provided cache and clears it. + * Each entry's classloader is extracted via the provided function; if it is a + * {@link PostDelegationClassLoader}, it is closed (which deletes its temp JAR files). + */ + static void closeBridges(Map bridges, Function classLoaderExtractor) + { + bridges.forEach((label, bridge) -> { + ClassLoader classLoader = classLoaderExtractor.apply(bridge); + if (classLoader instanceof PostDelegationClassLoader) + { + try + { + ((PostDelegationClassLoader) classLoader).close(); + } + catch (IOException e) + { + LOGGER.warn("Failed to close classloader for bridge: {}", label, e); + } + } + }); + bridges.clear(); + } + @NotNull public static CassandraVersion getCassandraVersion(@NotNull String version) { @@ -100,11 +133,17 @@ public static CassandraBridge loadCassandraBridge(String label) return constructor.newInstance(); } - public static ClassLoader buildClassLoader(String... resourceNames) + /** + * Builds a classloader from the given resource names. Temp files extracted from class resources + * are tracked by the returned {@link PostDelegationClassLoader} and deleted when it is closed. + */ + public static PostDelegationClassLoader buildClassLoader(String... resourceNames) { + List tempFiles = new ArrayList<>(resourceNames.length); URL[] urls = Arrays.stream(resourceNames) .map(BaseCassandraBridgeFactory::copyClassResourceToFile) .map(jar -> { + tempFiles.add(jar.toPath()); try { return jar.toURI().toURL(); @@ -115,7 +154,7 @@ public static ClassLoader buildClassLoader(String... resourceNames) } }).toArray(URL[]::new); - return new PostDelegationClassLoader(urls, Thread.currentThread().getContextClassLoader()); + return new PostDelegationClassLoader(urls, Thread.currentThread().getContextClassLoader(), tempFiles); } public static File copyClassResourceToFile(String resource) @@ -127,6 +166,7 @@ public static File copyClassResourceToFile(String resource) throw new NullPointerException("Could not find resource: " + resource); } Path jarPath = Files.createTempFile(null, ".jar"); + jarPath.toFile().deleteOnExit(); Files.copy(contents, jarPath, StandardCopyOption.REPLACE_EXISTING); return jarPath.toFile(); } diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/PostDelegationClassLoader.java b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/PostDelegationClassLoader.java index 63e11cfea..c0ce4505b 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/PostDelegationClassLoader.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/PostDelegationClassLoader.java @@ -19,8 +19,15 @@ package org.apache.cassandra.bridge; +import java.io.IOException; import java.net.URL; import java.net.URLClassLoader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -28,14 +35,18 @@ /** * This custom implementation of a {@link ClassLoader} enables deferred execution-time loading of a particular version * of class hierarchy from one of many embedded the {@code cassandra-all} library JARs. It first attempts to load any - * requested class from the extracted JAR, and resorts to using the parent class loader when the the class is not there. + * requested class from the extracted JAR, and resorts to using the parent class loader when the class is not there. * This behavior is opposite to the one of standard {@link URLClassLoader}, which invokes its parent class loader first. */ public class PostDelegationClassLoader extends URLClassLoader { - public PostDelegationClassLoader(@NotNull URL[] urls, @Nullable ClassLoader parent) + private static final Logger LOGGER = LoggerFactory.getLogger(PostDelegationClassLoader.class); + private final List tempFiles; + + public PostDelegationClassLoader(@NotNull URL[] urls, @Nullable ClassLoader parent, @NotNull List tempFiles) { super(urls, parent); + this.tempFiles = tempFiles; } @Override @@ -66,4 +77,30 @@ protected synchronized Class loadClass(@Nullable String name, boolean resolve } return type; } + + /** + * Closes this classloader and deletes all associated temp JAR files. + */ + @Override + public void close() throws IOException + { + try + { + super.close(); + } + finally + { + for (Path tempFile : tempFiles) + { + try + { + Files.deleteIfExists(tempFile); + } + catch (IOException e) + { + LOGGER.warn("Failed to delete temp JAR file: {}", tempFile, e); + } + } + } + } } diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/AbstractCdcBridgeImplementation.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/AbstractCdcBridgeImplementation.java index cd50347c6..77f8f8814 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/AbstractCdcBridgeImplementation.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/AbstractCdcBridgeImplementation.java @@ -27,7 +27,6 @@ import java.util.Objects; import java.util.Random; import java.util.Set; -import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -77,20 +76,6 @@ public abstract class AbstractCdcBridgeImplementation extends CdcBridge { - private static final TableIdLookup INTERNAL_TABLE_ID_LOOKUP = new TableIdLookup() - { - @Nullable - public UUID lookup(String keyspace, String table) throws NoSuchElementException - { - TableMetadata tm = Schema.instance.getTableMetadata(keyspace, table); - if (tm == null) - { - throw new NoSuchElementException(); - } - return tm.id.asUUID(); - } - }; - public void log(CqlTable cqlTable, CommitLogInstance log, Row row, long timestamp) { log(TimeProvider.DEFAULT, cqlTable, log, row, timestamp); @@ -101,9 +86,21 @@ public CommitLogInstance createCommitLogInstance(Path path) return new FourZeroCommitLog(path); } + /** + * Returns a TableIdLookup that resolves table IDs via the current Schema instance. + * Creates a new lambda on each call to avoid permanently pinning Schema.instance + * to a static field, which would prevent GC of the entire schema graph. + */ public TableIdLookup internalTableIdLookup() { - return INTERNAL_TABLE_ID_LOOKUP; + return (keyspace, table) -> { + TableMetadata tm = Schema.instance.getTableMetadata(keyspace, table); + if (tm == null) + { + throw new NoSuchElementException(); + } + return tm.id.asUUID(); + }; } public void updateCdcSchema(@NotNull Set cdcTables, @NotNull Partitioner partitioner, @NotNull TableIdLookup tableIdLookup)