Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1500,6 +1500,7 @@ public static void log(Config config)
public volatile DurationSpec.LongMillisecondsBound progress_barrier_backoff = new DurationSpec.LongMillisecondsBound("1000ms");
public volatile DurationSpec.LongSecondsBound discovery_timeout = new DurationSpec.LongSecondsBound("30s");
public boolean unsafe_tcm_mode = false;
// When true (the default), LegacyStateListener processes changes to the local node's state synchronously;
// when false, that processing is submitted to an executor instead.
// Declared volatile because the value can be changed at runtime via
// DatabaseDescriptor.setLegacyStateListenerSyncLocalUpdates and may be read on a different thread
// than the one performing the update.
public volatile boolean legacy_state_listener_sync_local_updates = true;

public enum TriggersPolicy
{
Expand Down
20 changes: 20 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -6162,6 +6162,26 @@ public static boolean getUnsafeTCMMode()
return conf.unsafe_tcm_mode;
}

/**
 * Returns the current value of the {@code legacy_state_listener_sync_local_updates} setting.
 *
 * @return {@code true} if state updates to the local node are to be processed synchronously in
 *         LegacyStateListener, {@code false} if they are to be processed asynchronously
 */
public static boolean getLegacyStateListenerSyncLocalUpdates()
{
return conf.legacy_state_listener_sync_local_updates;
}

/**
 * Switches the processing mode used by LegacyStateListener for state updates to the local node.
 * The outcome is always logged at INFO, whether or not the value actually changes.
 *
 * @param sync {@code true} to process local updates synchronously, {@code false} for asynchronous processing
 */
public static void setLegacyStateListenerSyncLocalUpdates(boolean sync)
{
    String requestedMode = sync ? "sync" : "async";

    // Nothing to change: report the mode already in effect and leave the setting untouched.
    if (sync == conf.legacy_state_listener_sync_local_updates)
    {
        logger.info("Not changing processing mode of state updates to the local node in LegacyStateListener, already set to {}",
                    requestedMode);
        return;
    }

    // The value differs from the requested one, so the mode currently in effect is the opposite of the request.
    String currentMode = sync ? "async" : "sync";
    logger.info("Changing processing mode of state updates to the local node in LegacyStateListener from {} to {}",
                currentMode, requestedMode);
    conf.legacy_state_listener_sync_local_updates = sync;
}

public static int getSaiSSTableIndexesPerQueryWarnThreshold()
{
return conf.sai_sstable_indexes_per_query_warn_threshold;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@

public class AccordService implements IAccordService, Shutdownable
{
public static class MetadataChangeListener implements ChangeListener.Async
public static class MetadataChangeListener implements ChangeListener
{
// Listener is initialized before Accord is initialized
public static MetadataChangeListener instance = new MetadataChangeListener();
Expand Down
12 changes: 12 additions & 0 deletions src/java/org/apache/cassandra/tcm/CMSOperations.java
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,18 @@ public Map<Long, Map<String, String>> dumpLog(long startEpoch, long endEpoch)
return convertToStringValues(log);
}

/**
 * Exposes the current LegacyStateListener local-update processing mode by delegating to
 * {@link DatabaseDescriptor#getLegacyStateListenerSyncLocalUpdates()}.
 *
 * @return the value reported by DatabaseDescriptor
 */
@Override
public boolean getLegacyStateListenerSyncLocalUpdates()
{
return DatabaseDescriptor.getLegacyStateListenerSyncLocalUpdates();
}

/**
 * Changes the LegacyStateListener local-update processing mode by delegating to
 * {@link DatabaseDescriptor#setLegacyStateListenerSyncLocalUpdates(boolean)}.
 *
 * @param sync the new value passed through to DatabaseDescriptor
 */
@Override
public void setLegacyStateListenerSyncLocalUpdates(boolean sync)
{
DatabaseDescriptor.setLegacyStateListenerSyncLocalUpdates(sync);
}

private Map<Long, Map<String, String>> convertToStringValues(Map<Long, Map<String, Object>> log)
{
Map<Long, Map<String, String>> res = new LinkedHashMap<>();
Expand Down
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/tcm/CMSOperationsMBean.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,7 @@ public interface CMSOperationsMBean
public Map<Long, Map<String, String>> dumpLog(long startEpoch, long endEpoch);

public void resumeDropAccordTable(String tableId);

/**
 * @return whether state updates to the local node are processed synchronously in LegacyStateListener
 */
public boolean getLegacyStateListenerSyncLocalUpdates();
/**
 * @param sync true to request synchronous processing of state updates to the local node in
 *             LegacyStateListener, false to request asynchronous processing
 */
public void setLegacyStateListenerSyncLocalUpdates(boolean sync);
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,4 @@ default void notifyPreCommit(ClusterMetadata prev, ClusterMetadata next, boolean
*/
default void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean fromSnapshot) {}

interface Async extends ChangeListener {}

}
164 changes: 112 additions & 52 deletions src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.virtual.PeersTable;
Expand All @@ -54,7 +56,7 @@
import static org.apache.cassandra.tcm.membership.NodeState.MOVING;
import static org.apache.cassandra.tcm.membership.NodeState.REGISTERED;

public class LegacyStateListener implements ChangeListener.Async
public class LegacyStateListener implements ChangeListener
{
private static final Logger logger = LoggerFactory.getLogger(LegacyStateListener.class);

Expand All @@ -75,52 +77,96 @@ public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean
changed.add(node);
}

for (InetAddressAndPort remove : removedAddr)
// next.myNodeId() can be null during replay (before we have registered) but if it is present and
// there is a relevant change to the state of the local node, process that synchronously.
if (next.myNodeId() != null && changed.contains(next.myNodeId()))
{
GossipHelper.removeFromGossip(remove);
GossipHelper.evictFromMembership(remove);
PeersTable.removeFromSystemPeersTables(remove);
// Default is to process updates for the local node synchronously, overridable via config/hotprop
if (DatabaseDescriptor.getLegacyStateListenerSyncLocalUpdates())
processChangesToLocalState(prev, next, next.myNodeId());
else
ScheduledExecutors.optionalTasks.submit(() -> processChangesToLocalState(prev, next, next.myNodeId()));

changed.remove(next.myNodeId());
}

for (NodeId change : changed)
// Schedule async processing of changes to peers and removing unregistered nodes (potentially including the
// local node).
ScheduledExecutors.optionalTasks.submit(() -> {
processRemovedNodes(removedAddr);
processChangesToRemotePeers(prev, next, changed);
});
}

private void processChangesToLocalState(ClusterMetadata prev, ClusterMetadata next, NodeId localId)
{
logger.info("Processing changes to local node state {} for epoch {}->{}", localId, prev.epoch.getEpoch(), next.epoch.getEpoch());
Collection<Token> tokensForGossip = next.tokenMap.tokens(localId);
NodeState state = next.directory.peerState(localId);
switch (state)
{
// next.myNodeId() can be null during replay (before we have registered)
if (next.myNodeId() != null && next.myNodeId().equals(change))
{
switch (next.directory.peerState(change))
case BOOTSTRAPPING:
case BOOT_REPLACING:
// For compatibility with clients, ensure we set TOKENS for bootstrapping nodes in gossip.
// As these are not yet added to the token map they must be extracted from the in progress sequence.
tokensForGossip = GossipHelper.getTokensFromOperation(localId, next);
if (state == BOOTSTRAPPING && prev.directory.peerState(localId) != BOOTSTRAPPING)
{
case BOOTSTRAPPING:
if (prev.directory.peerState(change) != BOOTSTRAPPING)
{
// legacy log messages for tests
logger.info("JOINING: Starting to bootstrap");
logger.info("JOINING: calculation complete, ready to bootstrap");
}
break;
case BOOT_REPLACING:
case REGISTERED:
break;
case JOINED:
SystemKeyspace.updateTokens(next.directory.endpoint(change), next.tokenMap.tokens(change));
// needed if we miss the REGISTERED above; Does nothing if we are already in epStateMap:
Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration());
StreamSupport.stream(ColumnFamilyStore.all().spliterator(), false)
.filter(cfs -> Schema.instance.getUserKeyspaces().names().contains(cfs.keyspace.getName()))
.forEach(cfs -> cfs.indexManager.executePreJoinTasksBlocking(true));
if (prev.directory.peerState(change) == MOVING)
logger.info("Node {} state jump to NORMAL", next.directory.endpoint(change));
break;
// legacy log messages for tests
logger.info("JOINING: Starting to bootstrap");
logger.info("JOINING: calculation complete, ready to bootstrap");
}
// Maybe initialise local epstate whatever the node state because we could be processing after a
// replay and so may have not seen any previous local states, making this the first mutation of gossip
// state for the local node.
Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration());
Gossiper.instance.addLocalApplicationState(SCHEMA, StorageService.instance.valueFactory.schema(next.schema.getVersion()));
// if the local node's location has changed, update system.local.
if (!next.directory.location(change).equals(prev.directory.location(change)))
SystemKeyspace.updateLocation(next.directory.location(change));
}
break;
case JOINED:
tokensForGossip = next.tokenMap.tokens(localId);
SystemKeyspace.updateTokens(next.directory.endpoint(localId), tokensForGossip);
StreamSupport.stream(ColumnFamilyStore.all().spliterator(), false)
.filter(cfs -> Schema.instance.getUserKeyspaces().names().contains(cfs.keyspace.getName()))
.forEach(cfs -> cfs.indexManager.executePreJoinTasksBlocking(true));
NodeState previousState = prev.directory.peerState(localId);
if (previousState == MOVING)
{
logger.info("Node {} state jump to NORMAL", next.directory.endpoint(localId));
}
else if (previousState == BOOT_REPLACING)
{
// legacy log message for compatibility (& tests)
MultiStepOperation<?> sequence = prev.inProgressSequences.get(localId);
if (sequence != null && sequence.kind() == MultiStepOperation.Kind.REPLACE)
{
logCompletedReplacement(prev.directory, (BootstrapAndReplace) sequence);
tokensForGossip = GossipHelper.getTokensFromOperation(sequence);
}
}
break;
case MOVING:
logger.debug("Node {} state MOVING, tokens {}", next.directory.endpoint(localId), prev.tokenMap.tokens(localId));
tokensForGossip = next.tokenMap.tokens(localId);
break;
case LEFT:
tokensForGossip = prev.tokenMap.tokens(localId);
break;
}

// Maybe initialise local epstate whatever the node state because we could be processing after a
// replay and so may have not seen any previous local states, making this the first mutation of gossip
// state for the local node.
Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration());
Gossiper.instance.addLocalApplicationState(SCHEMA, StorageService.instance.valueFactory.schema(next.schema.getVersion()));
// Pull node properties from cluster metadata into gossip, except if the node is only in the REGISTERED state
// as that has no equivalent gossip STATUS
if (state != REGISTERED)
Gossiper.instance.mergeNodeToGossip(localId, next, tokensForGossip);
// if the local node's location has changed, update system.local.
if (!next.directory.location(localId).equals(prev.directory.location(localId)))
SystemKeyspace.updateLocation(next.directory.location(localId));
}

private void processChangesToRemotePeers(ClusterMetadata prev, ClusterMetadata next, Set<NodeId> changed)
{
for (NodeId change : changed)
{
logger.info("Processing changes to peer {} for epoch {}->{}", change, prev.epoch.getEpoch(), next.epoch.getEpoch());
if (next.directory.peerState(change) == REGISTERED)
{
// Re-establish any connections made prior to this node registering
Expand Down Expand Up @@ -155,21 +201,11 @@ else if (NodeState.isBootstrap(next.directory.peerState(change)))
}
else if (prev.directory.peerState(change) == BOOT_REPLACING)
{
// legacy log message for compatibility (& tests)
MultiStepOperation<?> sequence = prev.inProgressSequences.get(change);
if (sequence != null && sequence.kind() == MultiStepOperation.Kind.REPLACE)
{
BootstrapAndReplace replace = (BootstrapAndReplace) sequence;
InetAddressAndPort replaced = prev.directory.endpoint(replace.startReplace.replaced());
InetAddressAndPort replacement = prev.directory.endpoint(change);
Collection<Token> tokens = GossipHelper.getTokensFromOperation(replace);
logger.info("Node {} will complete replacement of {} for tokens {}", replacement, replaced, tokens);
if (!replacement.equals(replaced))
{
for (Token token : tokens)
logger.warn("Token {} changing ownership from {} to {}", token, replaced, replacement);
}
Gossiper.instance.mergeNodeToGossip(change, next, tokens);
logCompletedReplacement(prev.directory, (BootstrapAndReplace) sequence);
Gossiper.instance.mergeNodeToGossip(change, next, GossipHelper.getTokensFromOperation(sequence));
PeersTable.updateLegacyPeerTable(change, prev, next);
}
}
Expand All @@ -181,6 +217,30 @@ else if (prev.directory.peerState(change) == BOOT_REPLACING)
}
}

/**
 * Cleans up after every endpoint in the supplied set. For each one, in order, this calls
 * GossipHelper.removeFromGossip, GossipHelper.evictFromMembership and
 * PeersTable.removeFromSystemPeersTables.
 *
 * @param removed addresses of the nodes to clean up; an empty set results in no calls
 */
private void processRemovedNodes(Set<InetAddressAndPort> removed)
{
    removed.forEach(endpoint -> {
        GossipHelper.removeFromGossip(endpoint);
        GossipHelper.evictFromMembership(endpoint);
        PeersTable.removeFromSystemPeersTables(endpoint);
    });
}

/**
 * Emits the legacy log messages (kept for compatibility and relied on by tests) that announce the
 * completion of a node replacement.
 *
 * @param directory directory used to resolve the endpoints of the replaced and replacement nodes
 * @param sequence  the replacement operation whose participants and tokens are logged
 */
private void logCompletedReplacement(Directory directory, BootstrapAndReplace sequence)
{
    InetAddressAndPort oldOwner = directory.endpoint(sequence.startReplace.replaced());
    InetAddressAndPort newOwner = directory.endpoint(sequence.startReplace.replacement());
    Collection<Token> movedTokens = GossipHelper.getTokensFromOperation(sequence);

    logger.info("Node {} will complete replacement of {} for tokens {}", newOwner, oldOwner, movedTokens);

    // When the replacement has the same address as the node it replaces, there are no
    // per-token ownership changes to report.
    if (newOwner.equals(oldOwner))
        return;

    movedTokens.forEach(token -> logger.warn("Token {} changing ownership from {} to {}", token, oldOwner, newOwner));
}

private boolean directoryEntryChangedFor(NodeId nodeId, Directory prev, Directory next)
{
return prev.peerState(nodeId) != next.peerState(nodeId) ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,39 @@
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.utils.CassandraVersion;

public class UpgradeMigrationListener implements ChangeListener.Async
/**
* For handling changes in Cassandra version.
* One use case is to react to the initial migration from Gossip based metadata in Cassandra 5.0 and earlier. When
* a node first transitions to using ClusterMetadataService, this listener will update its gossip state with the new
* nodeId based hostId and ensure that is propagated.
*
* Another use is for evolving distributed system tables: this listener can identify when a new Cassandra version has
* been deployed across the cluster and provides a hook to take actions such as creating new internal tables.
*/
public class UpgradeMigrationListener implements ChangeListener
{
private static final Logger logger = LoggerFactory.getLogger(UpgradeMigrationListener.class);
public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean fromSnapshot)
{
if (!prev.epoch.equals(Epoch.UPGRADE_GOSSIP))
if (prev.epoch.equals(Epoch.UPGRADE_GOSSIP))
{
logger.info("Detected upgrade from gossip mode, updating my host id in gossip to {}", next.myNodeId());
Gossiper.instance.mergeNodeToGossip(next.myNodeId(), next);
if (Gossiper.instance.getQuarantineDisabled())
Gossiper.instance.clearQuarantinedEndpoints();
return;
}

logger.info("Detected upgrade from gossip mode, updating my host id in gossip to {}", next.myNodeId());
Gossiper.instance.mergeNodeToGossip(next.myNodeId(), next);
if (Gossiper.instance.getQuarantineDisabled())
Gossiper.instance.clearQuarantinedEndpoints();
CassandraVersion prevMinVersion = prev.directory.clusterMinVersion.cassandraVersion;
CassandraVersion minVersion = next.directory.clusterMinVersion.cassandraVersion;
if (prevMinVersion.compareTo(minVersion) == 0 || (prev.epoch.is(Epoch.EMPTY) && fromSnapshot))
{
// Nothing to do if the min version in the cluster has not changed.
// Likewise, we don't need to trigger when applying a snapshot to previously empty cluster metadata, e.g.
// when replaying at startup.
logger.debug("Cluster min version has not changed, nothing to do");
}
}
}
Loading