From 2a64438a7c0395ba1850ac509c97685f048c5470 Mon Sep 17 00:00:00 2001 From: Sam Tunnicliffe Date: Thu, 19 Mar 2026 13:07:08 +0000 Subject: [PATCH] Apply changes to gossip state for the local node synchronously, remote peers asynchronously Patch by Sam Tunnicliffe; reviewed by XXX for CASSANDRA-21239 --- .../org/apache/cassandra/config/Config.java | 1 + .../cassandra/config/DatabaseDescriptor.java | 20 +++ .../service/accord/AccordService.java | 2 +- .../apache/cassandra/tcm/CMSOperations.java | 12 ++ .../cassandra/tcm/CMSOperationsMBean.java | 3 + .../tcm/listeners/ChangeListener.java | 2 - .../tcm/listeners/LegacyStateListener.java | 164 ++++++++++++------ .../listeners/UpgradeMigrationListener.java | 33 +++- .../apache/cassandra/tcm/log/LocalLog.java | 21 +-- .../cms/PrepareCMSReconfiguration.java | 1 + .../InProgressSequenceCoordinationTest.java | 27 +++ 11 files changed, 207 insertions(+), 79 deletions(-) diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 0ac9c691a0b4..a27df7d184c0 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -1500,6 +1500,7 @@ public static void log(Config config) public volatile DurationSpec.LongMillisecondsBound progress_barrier_backoff = new DurationSpec.LongMillisecondsBound("1000ms"); public volatile DurationSpec.LongSecondsBound discovery_timeout = new DurationSpec.LongSecondsBound("30s"); public boolean unsafe_tcm_mode = false; + public boolean legacy_state_listener_sync_local_updates = true; public enum TriggersPolicy { diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 59ffb9f02afa..7789d09885ed 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -6162,6 +6162,26 @@ public static boolean getUnsafeTCMMode() return 
conf.unsafe_tcm_mode; } + public static boolean getLegacyStateListenerSyncLocalUpdates() + { + return conf.legacy_state_listener_sync_local_updates; + } + + public static void setLegacyStateListenerSyncLocalUpdates(boolean sync) + { + if (sync != conf.legacy_state_listener_sync_local_updates) + { + logger.info("Changing processing mode of state updates to the local node in LegacyStateListener from {} to {}", + sync ? "async" : "sync", sync ? "sync" : "async"); + conf.legacy_state_listener_sync_local_updates = sync; + } + else + { + logger.info("Not changing processing mode of state updates to the local node in LegacyStateListener, already set to {}", + sync ? "sync" : "async"); + } + } + public static int getSaiSSTableIndexesPerQueryWarnThreshold() { return conf.sai_sstable_indexes_per_query_warn_threshold; diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index 2c5a12ccfc82..f7b64f4f9825 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -161,7 +161,7 @@ public class AccordService implements IAccordService, Shutdownable { - public static class MetadataChangeListener implements ChangeListener.Async + public static class MetadataChangeListener implements ChangeListener { // Listener is initialized before Accord is initialized public static MetadataChangeListener instance = new MetadataChangeListener(); diff --git a/src/java/org/apache/cassandra/tcm/CMSOperations.java b/src/java/org/apache/cassandra/tcm/CMSOperations.java index d4b496e1acb7..aa49a8ac3bf8 100644 --- a/src/java/org/apache/cassandra/tcm/CMSOperations.java +++ b/src/java/org/apache/cassandra/tcm/CMSOperations.java @@ -280,6 +280,18 @@ public Map> dumpLog(long startEpoch, long endEpoch) return convertToStringValues(log); } + @Override + public boolean getLegacyStateListenerSyncLocalUpdates() + { + return 
DatabaseDescriptor.getLegacyStateListenerSyncLocalUpdates(); + } + + @Override + public void setLegacyStateListenerSyncLocalUpdates(boolean sync) + { + DatabaseDescriptor.setLegacyStateListenerSyncLocalUpdates(sync); + } + private Map> convertToStringValues(Map> log) { Map> res = new LinkedHashMap<>(); diff --git a/src/java/org/apache/cassandra/tcm/CMSOperationsMBean.java b/src/java/org/apache/cassandra/tcm/CMSOperationsMBean.java index 6656634a6417..efdf28267e87 100644 --- a/src/java/org/apache/cassandra/tcm/CMSOperationsMBean.java +++ b/src/java/org/apache/cassandra/tcm/CMSOperationsMBean.java @@ -50,4 +50,7 @@ public interface CMSOperationsMBean public Map> dumpLog(long startEpoch, long endEpoch); public void resumeDropAccordTable(String tableId); + + public boolean getLegacyStateListenerSyncLocalUpdates(); + public void setLegacyStateListenerSyncLocalUpdates(boolean sync); } diff --git a/src/java/org/apache/cassandra/tcm/listeners/ChangeListener.java b/src/java/org/apache/cassandra/tcm/listeners/ChangeListener.java index 3f06d1353182..0cffab1450fa 100644 --- a/src/java/org/apache/cassandra/tcm/listeners/ChangeListener.java +++ b/src/java/org/apache/cassandra/tcm/listeners/ChangeListener.java @@ -37,6 +37,4 @@ default void notifyPreCommit(ClusterMetadata prev, ClusterMetadata next, boolean */ default void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean fromSnapshot) {} - interface Async extends ChangeListener {} - } diff --git a/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java b/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java index 708094a361ff..54f43cf2a142 100644 --- a/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java +++ b/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java @@ -29,6 +29,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.config.DatabaseDescriptor; 
import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.virtual.PeersTable; @@ -54,7 +56,7 @@ import static org.apache.cassandra.tcm.membership.NodeState.MOVING; import static org.apache.cassandra.tcm.membership.NodeState.REGISTERED; -public class LegacyStateListener implements ChangeListener.Async +public class LegacyStateListener implements ChangeListener { private static final Logger logger = LoggerFactory.getLogger(LegacyStateListener.class); @@ -75,52 +77,96 @@ public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean changed.add(node); } - for (InetAddressAndPort remove : removedAddr) + // next.myNodeId() can be null during replay (before we have registered) but if it is present and + // there is a relevant change to the state of the local node, process that synchronously. + if (next.myNodeId() != null && changed.contains(next.myNodeId())) { - GossipHelper.removeFromGossip(remove); - GossipHelper.evictFromMembership(remove); - PeersTable.removeFromSystemPeersTables(remove); + // Default is to process updates for the local node synchronously, overridable via config/hotprop + if (DatabaseDescriptor.getLegacyStateListenerSyncLocalUpdates()) + processChangesToLocalState(prev, next, next.myNodeId()); + else + ScheduledExecutors.optionalTasks.submit(() -> processChangesToLocalState(prev, next, next.myNodeId())); + + changed.remove(next.myNodeId()); } - for (NodeId change : changed) + // Schedule async processing of changes to peers and removing unregistered nodes (potentially including the + // local node). 
+ ScheduledExecutors.optionalTasks.submit(() -> { + processRemovedNodes(removedAddr); + processChangesToRemotePeers(prev, next, changed); + }); + } + + private void processChangesToLocalState(ClusterMetadata prev, ClusterMetadata next, NodeId localId) + { + logger.info("Processing changes to local node state {} for epoch {}->{}", localId, prev.epoch.getEpoch(), next.epoch.getEpoch()); + Collection tokensForGossip = next.tokenMap.tokens(localId); + NodeState state = next.directory.peerState(localId); + switch (state) { - // next.myNodeId() can be null during replay (before we have registered) - if (next.myNodeId() != null && next.myNodeId().equals(change)) - { - switch (next.directory.peerState(change)) + case BOOTSTRAPPING: + case BOOT_REPLACING: + // For compatibility with clients, ensure we set TOKENS for bootstrapping nodes in gossip. + // As these are not yet added to the token map they must be extracted from the in progress sequence. + tokensForGossip = GossipHelper.getTokensFromOperation(localId, next); + if (state == BOOTSTRAPPING && prev.directory.peerState(localId) != BOOTSTRAPPING) { - case BOOTSTRAPPING: - if (prev.directory.peerState(change) != BOOTSTRAPPING) - { - // legacy log messages for tests - logger.info("JOINING: Starting to bootstrap"); - logger.info("JOINING: calculation complete, ready to bootstrap"); - } - break; - case BOOT_REPLACING: - case REGISTERED: - break; - case JOINED: - SystemKeyspace.updateTokens(next.directory.endpoint(change), next.tokenMap.tokens(change)); - // needed if we miss the REGISTERED above; Does nothing if we are already in epStateMap: - Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration()); - StreamSupport.stream(ColumnFamilyStore.all().spliterator(), false) - .filter(cfs -> Schema.instance.getUserKeyspaces().names().contains(cfs.keyspace.getName())) - .forEach(cfs -> cfs.indexManager.executePreJoinTasksBlocking(true)); - if (prev.directory.peerState(change) == MOVING) - 
logger.info("Node {} state jump to NORMAL", next.directory.endpoint(change)); - break; + // legacy log messages for tests + logger.info("JOINING: Starting to bootstrap"); + logger.info("JOINING: calculation complete, ready to bootstrap"); } - // Maybe intitialise local epstate whatever the node state because we could be processing after a - // replay and so may have not seen any previous local states, making this the first mutation of gossip - // state for the local node. - Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration()); - Gossiper.instance.addLocalApplicationState(SCHEMA, StorageService.instance.valueFactory.schema(next.schema.getVersion())); - // if the local node's location has changed, update system.local. - if (!next.directory.location(change).equals(prev.directory.location(change))) - SystemKeyspace.updateLocation(next.directory.location(change)); - } + break; + case JOINED: + tokensForGossip = next.tokenMap.tokens(localId); + SystemKeyspace.updateTokens(next.directory.endpoint(localId), tokensForGossip); + StreamSupport.stream(ColumnFamilyStore.all().spliterator(), false) + .filter(cfs -> Schema.instance.getUserKeyspaces().names().contains(cfs.keyspace.getName())) + .forEach(cfs -> cfs.indexManager.executePreJoinTasksBlocking(true)); + NodeState previousState = prev.directory.peerState(localId); + if (previousState == MOVING) + { + logger.info("Node {} state jump to NORMAL", next.directory.endpoint(localId)); + } + else if (previousState == BOOT_REPLACING) + { + // legacy log message for compatibility (& tests) + MultiStepOperation sequence = prev.inProgressSequences.get(localId); + if (sequence != null && sequence.kind() == MultiStepOperation.Kind.REPLACE) + { + logCompletedReplacement(prev.directory, (BootstrapAndReplace) sequence); + tokensForGossip = GossipHelper.getTokensFromOperation(sequence); + } + } + break; + case MOVING: + logger.debug("Node {} state MOVING, tokens {}", next.directory.endpoint(localId), 
prev.tokenMap.tokens(localId)); + tokensForGossip = next.tokenMap.tokens(localId); + break; + case LEFT: + tokensForGossip = prev.tokenMap.tokens(localId); + break; + } + // Maybe initialise local epstate whatever the node state because we could be processing after a + // replay and so may have not seen any previous local states, making this the first mutation of gossip + // state for the local node. + Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration()); + Gossiper.instance.addLocalApplicationState(SCHEMA, StorageService.instance.valueFactory.schema(next.schema.getVersion())); + // Pull node properties from cluster metadata into gossip, except if the node is only in the REGISTERED state + // as that has no equivalent gossip STATUS + if (state != REGISTERED) + Gossiper.instance.mergeNodeToGossip(localId, next, tokensForGossip); + // if the local node's location has changed, update system.local. + if (!next.directory.location(localId).equals(prev.directory.location(localId))) + SystemKeyspace.updateLocation(next.directory.location(localId)); + } + + private void processChangesToRemotePeers(ClusterMetadata prev, ClusterMetadata next, Set changed) + { + for (NodeId change : changed) + { + logger.info("Processing changes to peer {} for epoch {}->{}", change, prev.epoch.getEpoch(), next.epoch.getEpoch()); if (next.directory.peerState(change) == REGISTERED) { // Re-establish any connections made prior to this node registering @@ -155,21 +201,11 @@ else if (NodeState.isBootstrap(next.directory.peerState(change))) } else if (prev.directory.peerState(change) == BOOT_REPLACING) { - // legacy log message for compatibility (& tests) MultiStepOperation sequence = prev.inProgressSequences.get(change); if (sequence != null && sequence.kind() == MultiStepOperation.Kind.REPLACE) { - BootstrapAndReplace replace = (BootstrapAndReplace) sequence; - InetAddressAndPort replaced = prev.directory.endpoint(replace.startReplace.replaced()); - 
InetAddressAndPort replacement = prev.directory.endpoint(change); - Collection tokens = GossipHelper.getTokensFromOperation(replace); - logger.info("Node {} will complete replacement of {} for tokens {}", replacement, replaced, tokens); - if (!replacement.equals(replaced)) - { - for (Token token : tokens) - logger.warn("Token {} changing ownership from {} to {}", token, replaced, replacement); - } - Gossiper.instance.mergeNodeToGossip(change, next, tokens); + logCompletedReplacement(prev.directory, (BootstrapAndReplace) sequence); + Gossiper.instance.mergeNodeToGossip(change, next, GossipHelper.getTokensFromOperation(sequence)); PeersTable.updateLegacyPeerTable(change, prev, next); } } @@ -181,6 +217,30 @@ else if (prev.directory.peerState(change) == BOOT_REPLACING) } } + private void processRemovedNodes(Set removed) + { + for (InetAddressAndPort remove : removed) + { + GossipHelper.removeFromGossip(remove); + GossipHelper.evictFromMembership(remove); + PeersTable.removeFromSystemPeersTables(remove); + } + } + + private void logCompletedReplacement(Directory directory, BootstrapAndReplace sequence) + { + // legacy log message for compatibility (& tests) + InetAddressAndPort replaced = directory.endpoint(sequence.startReplace.replaced()); + InetAddressAndPort replacement = directory.endpoint(sequence.startReplace.replacement()); + Collection tokens = GossipHelper.getTokensFromOperation(sequence); + logger.info("Node {} will complete replacement of {} for tokens {}", replacement, replaced, tokens); + if (!replacement.equals(replaced)) + { + for (Token token : tokens) + logger.warn("Token {} changing ownership from {} to {}", token, replaced, replacement); + } + } + private boolean directoryEntryChangedFor(NodeId nodeId, Directory prev, Directory next) { return prev.peerState(nodeId) != next.peerState(nodeId) || diff --git a/src/java/org/apache/cassandra/tcm/listeners/UpgradeMigrationListener.java 
b/src/java/org/apache/cassandra/tcm/listeners/UpgradeMigrationListener.java index 67931af1779c..e66e36a4d2ad 100644 --- a/src/java/org/apache/cassandra/tcm/listeners/UpgradeMigrationListener.java +++ b/src/java/org/apache/cassandra/tcm/listeners/UpgradeMigrationListener.java @@ -24,18 +24,39 @@ import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.utils.CassandraVersion; -public class UpgradeMigrationListener implements ChangeListener.Async +/** + * For handling changes in Cassandra version. + * One use case is to react to the initial migration from Gossip based metadata in Cassandra 5.0 and earlier. When + * a node first transitions to using ClusterMetadataService, this listener will update its gossip state with the new + * nodeId based hostId and ensure that is propagated. + * + * Another use is for evolving distributed system tables, this listener can identify when a new Cassandra version has + * been deployed across the cluster and provides a hook to take actions such as creating new internal tables etc. 
+ */ +public class UpgradeMigrationListener implements ChangeListener { private static final Logger logger = LoggerFactory.getLogger(UpgradeMigrationListener.class); public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean fromSnapshot) { - if (!prev.epoch.equals(Epoch.UPGRADE_GOSSIP)) + if (prev.epoch.equals(Epoch.UPGRADE_GOSSIP)) + { + logger.info("Detected upgrade from gossip mode, updating my host id in gossip to {}", next.myNodeId()); + Gossiper.instance.mergeNodeToGossip(next.myNodeId(), next); + if (Gossiper.instance.getQuarantineDisabled()) + Gossiper.instance.clearQuarantinedEndpoints(); return; + } - logger.info("Detected upgrade from gossip mode, updating my host id in gossip to {}", next.myNodeId()); - Gossiper.instance.mergeNodeToGossip(next.myNodeId(), next); - if (Gossiper.instance.getQuarantineDisabled()) - Gossiper.instance.clearQuarantinedEndpoints(); + CassandraVersion prevMinVersion = prev.directory.clusterMinVersion.cassandraVersion; + CassandraVersion minVersion = next.directory.clusterMinVersion.cassandraVersion; + if (prevMinVersion.compareTo(minVersion) == 0 || (prev.epoch.is(Epoch.EMPTY) && fromSnapshot)) + { + // nothing to do if the min version in the cluster has not changed + // likewise, we don't need to trigger if applying a snapshot to a previously empty cluster metadata for e.g. 
+ // when replaying at startup + logger.debug("Cluster min version has not changed, nothing to do"); + } } } diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java b/src/java/org/apache/cassandra/tcm/log/LocalLog.java index 0da8e6715b6d..4b7760e002cd 100644 --- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java +++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java @@ -121,7 +121,6 @@ public static class LogSpec private final Set listeners = new HashSet<>(); private final Set changeListeners = new HashSet<>(); - private final Set asyncChangeListeners = new HashSet<>(); private LogSpec() { @@ -157,7 +156,7 @@ public LogSpec loadSSTables(boolean loadSSTables) public LogSpec withDefaultListeners(boolean withDefaultListeners) { if (withDefaultListeners && - !(listeners.isEmpty() && changeListeners.isEmpty() && asyncChangeListeners.isEmpty())) + !(listeners.isEmpty() && changeListeners.isEmpty())) { throw new IllegalStateException("LogSpec can only require all listeners OR specific listeners"); } @@ -178,10 +177,7 @@ public LogSpec withListener(ChangeListener listener) { if (defaultListeners) throw new IllegalStateException("LogSpec can only require all listeners OR specific listeners"); - if (listener instanceof ChangeListener.Async) - asyncChangeListeners.add((ChangeListener.Async) listener); - else - changeListeners.add(listener); + changeListeners.add(listener); return this; } @@ -257,7 +253,6 @@ public final LocalLog createLog() protected final LogStorage storage; protected final Set listeners; protected final Set changeListeners; - protected final Set asyncChangeListeners; protected final LogSpec spec; // for testing - used to inject filters which cause entries to be dropped before appending @@ -277,7 +272,6 @@ private LocalLog(LogSpec logSpec) this.storage = logSpec.storage; listeners = Sets.newConcurrentHashSet(); changeListeners = Sets.newConcurrentHashSet(); - asyncChangeListeners = Sets.newConcurrentHashSet(); entryFilters = 
Lists.newCopyOnWriteArrayList(); } @@ -596,10 +590,7 @@ public void addListener(LogListener listener) public void addListener(ChangeListener listener) { - if (listener instanceof ChangeListener.Async) - this.asyncChangeListeners.add((ChangeListener.Async) listener); - else - this.changeListeners.add(listener); + this.changeListeners.add(listener); } public void removeListener(ChangeListener listener) @@ -619,16 +610,12 @@ private void notifyPreCommit(ClusterMetadata before, ClusterMetadata after, bool { for (ChangeListener listener : changeListeners) listener.notifyPreCommit(before, after, fromSnapshot); - for (ChangeListener.Async listener : asyncChangeListeners) - ScheduledExecutors.optionalTasks.submit(() -> listener.notifyPreCommit(before, after, fromSnapshot)); } private void notifyPostCommit(ClusterMetadata before, ClusterMetadata after, boolean fromSnapshot) { for (ChangeListener listener : changeListeners) listener.notifyPostCommit(before, after, fromSnapshot); - for (ChangeListener.Async listener : asyncChangeListeners) - ScheduledExecutors.optionalTasks.submit(() -> listener.notifyPostCommit(before, after, fromSnapshot)); } public void addFilter(Predicate filter) @@ -680,7 +667,6 @@ public ClusterMetadata ready() throws StartupException logger.info("Adding specified listeners to LocalLog"); spec.listeners.forEach(this::addListener); spec.changeListeners.forEach(this::addListener); - spec.asyncChangeListeners.forEach(this::addListener); } logger.info("Notifying all registered listeners of both pre and post commit event"); @@ -917,7 +903,6 @@ protected void addListeners() { listeners.clear(); changeListeners.clear(); - asyncChangeListeners.clear(); addListener(snapshotListener()); addListener(new InitializationListener()); diff --git a/src/java/org/apache/cassandra/tcm/transformations/cms/PrepareCMSReconfiguration.java b/src/java/org/apache/cassandra/tcm/transformations/cms/PrepareCMSReconfiguration.java index e94292831e4b..18c7a23471f2 100644 --- 
a/src/java/org/apache/cassandra/tcm/transformations/cms/PrepareCMSReconfiguration.java +++ b/src/java/org/apache/cassandra/tcm/transformations/cms/PrepareCMSReconfiguration.java @@ -81,6 +81,7 @@ protected Transformation.Result executeInternal(ClusterMetadata prev, Function cms = prev.fullCMSMembers().stream().map(prev.directory::peerId).collect(Collectors.toSet()); Set tmp = new HashSet<>(cms); diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/InProgressSequenceCoordinationTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/InProgressSequenceCoordinationTest.java index 94558ac66efc..a365d1bc361b 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/InProgressSequenceCoordinationTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/InProgressSequenceCoordinationTest.java @@ -22,7 +22,9 @@ import java.util.concurrent.Callable; import java.util.function.BiFunction; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.apache.cassandra.distributed.Cluster; @@ -53,6 +55,7 @@ import org.apache.cassandra.utils.concurrent.Condition; import static org.apache.cassandra.config.CassandraRelevantProperties.REPLACE_ADDRESS_FIRST_BOOT; +import static org.apache.cassandra.config.CassandraRelevantProperties.TCM_SKIP_CMS_RECONFIGURATION_AFTER_TOPOLOGY_CHANGE; import static org.apache.cassandra.distributed.Constants.KEY_DTEST_API_STARTUP_FAILURE_AS_SHUTDOWN; import static org.apache.cassandra.distributed.Constants.KEY_DTEST_FULL_STARTUP; import static org.apache.cassandra.distributed.shared.ClusterUtils.addInstance; @@ -65,6 +68,30 @@ public class InProgressSequenceCoordinationTest extends FuzzTestBase { + private boolean skipReconfiguation; + + @Before + public void setup() + { + // Skip the automatic, speculative CMS reconfiguration after join/replace operations. 
+ // These can cause tests to run long or hang as peers start to shut down while they are + // in flight. If the node initiating the reconfiguration sees the current CMS node as + // DOWN, a reconfiguration is triggered which cannot be completed in a timely fashion. + // Previously, this was hidden because LegacyStateListener would queue up gossip tasks + // on a separate executor and the backlog of these prevented the failure detector from + // marking any nodes DOWN. Since making LegacyStateListener run synchronously on the + // log follower thread, the joining node does see the CMS node as DOWN and so tries to + // perform a reconfiguration. + skipReconfiguation = TCM_SKIP_CMS_RECONFIGURATION_AFTER_TOPOLOGY_CHANGE.getBoolean(); + TCM_SKIP_CMS_RECONFIGURATION_AFTER_TOPOLOGY_CHANGE.setBoolean(true); + } + + @After + public void tearDown() + { + TCM_SKIP_CMS_RECONFIGURATION_AFTER_TOPOLOGY_CHANGE.setBoolean(skipReconfiguation); + } + @Test public void bootstrapProgressTest() throws Throwable {