diff --git a/CHANGES.txt b/CHANGES.txt index ce89887d..0d24aa57 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.4.0 ----- + * Support per-instance sidecar port resolution in CDC client (CASSANALYTICS-130) * Pass SidecarCdcClient as a constructor parameter to avoid thread/resource leaks (CASSANALYTICS-142) * Support extended deletion time in CDC for Cassandra 5.0 * Flush event consumer before persisting CDC state to prevent data loss on failure (CASSANALYTICS-126) diff --git a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java index d8aa09ef..1a4a34ff 100644 --- a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java +++ b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.Function; import java.util.stream.Collectors; import o.a.c.sidecar.client.shaded.common.utils.HttpRange; @@ -45,6 +46,7 @@ import org.apache.cassandra.spark.utils.MapUtils; import org.apache.cassandra.spark.utils.ThrowableUtils; import org.apache.cassandra.spark.utils.streaming.StreamConsumer; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_BUFFER_OVERRIDE; @@ -61,11 +63,23 @@ public class SidecarCdcClient implements AutoCloseable final ClientConfig config; final SidecarClient sidecarClient; final ICdcStats stats; + @NotNull + final Function portResolver; public SidecarCdcClient(ClientConfig clientConfig, CdcSidecarInstancesProvider instancesProvider, SecretsProvider secretsProvider, ICdcStats cdcStats) throws IOException + { + this(clientConfig, instancesProvider, secretsProvider, cdcStats, + ignored -> clientConfig.effectivePort()); + } + + public SidecarCdcClient(ClientConfig clientConfig, + CdcSidecarInstancesProvider instancesProvider, + SecretsProvider secretsProvider, + ICdcStats cdcStats, + @NotNull Function portResolver) throws IOException { this(clientConfig, Sidecar.from(new SimpleSidecarInstancesProvider(instancesProvider.instances() @@ -74,16 +88,19 @@ public SidecarCdcClient(ClientConfig clientConfig, .collect(Collectors.toList())), clientConfig.toGenericSidecarConfig(), secretsProvider), - cdcStats); + cdcStats, + portResolver); } - private SidecarCdcClient(ClientConfig config, - SidecarClient sidecarClient, - ICdcStats stats) + SidecarCdcClient(ClientConfig config, + SidecarClient sidecarClient, + ICdcStats stats, + @NotNull Function portResolver) { this.config = config; this.sidecarClient = sidecarClient; this.stats = stats; + this.portResolver = portResolver; } /** @@ -190,7 +207,7 @@ protected SidecarInstance toSidecarInstance(CassandraInstance instance) @Override public int port() { - return config.effectivePort(); + return portResolver.apply(instance.nodeName()); } @Override diff --git a/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java index b8913492..77d540dc 100644 --- a/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java +++ b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java @@ -19,13 +19,19 @@ package org.apache.cassandra.cdc.sidecar; +import java.util.Map; + +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import o.a.c.sidecar.client.shaded.client.SidecarClient; +import o.a.c.sidecar.client.shaded.client.SidecarInstance; import org.apache.cassandra.cdc.api.CdcOptions; import org.apache.cassandra.cdc.api.EventConsumer; import org.apache.cassandra.cdc.api.SchemaSupplier; import org.apache.cassandra.cdc.api.TokenRangeSupplier; import org.apache.cassandra.cdc.stats.ICdcStats; +import org.apache.cassandra.spark.data.partitioner.CassandraInstance; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; @@ -35,6 +41,16 @@ */ public class SidecarCdcTest { + private SidecarClient mockSidecarClient; + private ICdcStats cdcStats; + + @BeforeEach + public void setup() + { + mockSidecarClient = mock(SidecarClient.class); + cdcStats = mock(ICdcStats.class); + } + @Test public void testBuilderMethodCreatesValidBuilder() { @@ -46,7 +62,6 @@ public void testBuilderMethodCreatesValidBuilder() SchemaSupplier schemaSupplier = mock(SchemaSupplier.class); TokenRangeSupplier tokenRangeSupplier = mock(TokenRangeSupplier.class); SidecarCdcClient mockSidecarCdcClient = mock(SidecarCdcClient.class); - ICdcStats cdcStats = mock(ICdcStats.class); SidecarCdcBuilder builder = new SidecarCdcBuilder( jobId, @@ -65,4 +80,40 @@ public void testBuilderMethodCreatesValidBuilder() assertThat(builder.clusterConfigProvider).isEqualTo(clusterConfigProvider); assertThat(builder.sidecarCdcClient).isEqualTo(mockSidecarCdcClient); } + + @Test + public void testPerInstancePortResolution() + { + Map portMapping = Map.of("host1", 9043, "host2", 9044, "host3", 9045); + SidecarCdcClient.ClientConfig clientConfig = SidecarCdcClient.ClientConfig.create(); + + SidecarCdcClient client = new SidecarCdcClient(clientConfig, mockSidecarClient, cdcStats, + hostname -> portMapping.getOrDefault(hostname, 9043)); + + SidecarInstance si1 = client.toSidecarInstance(new CassandraInstance("0", "host1", "DC1")); + assertThat(si1.hostname()).isEqualTo("host1"); + assertThat(si1.port()).isEqualTo(9043); + + SidecarInstance si2 = client.toSidecarInstance(new CassandraInstance("100", "host2", "DC1")); + assertThat(si2.hostname()).isEqualTo("host2"); + assertThat(si2.port()).isEqualTo(9044); + + SidecarInstance si3 = client.toSidecarInstance(new CassandraInstance("200", "host3", "DC1")); + assertThat(si3.hostname()).isEqualTo("host3"); + assertThat(si3.port()).isEqualTo(9045); + } + + @Test + public void testFallbackToEffectivePortWhenHostNotFound() + { + SidecarCdcClient.ClientConfig clientConfig = SidecarCdcClient.ClientConfig.create(8888, 3, 100L); + Map portMapping = Map.of("host1", 9043); + + SidecarCdcClient client = new SidecarCdcClient(clientConfig, mockSidecarClient, cdcStats, + hostname -> portMapping.getOrDefault(hostname, + clientConfig.effectivePort())); + + assertThat(client.toSidecarInstance(new CassandraInstance("0", "host1", "DC1")).port()).isEqualTo(9043); + assertThat(client.toSidecarInstance(new CassandraInstance("100", "unknown-host", "DC1")).port()).isEqualTo(8888); + } }