Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 13 additions & 34 deletions src/java/org/apache/cassandra/batchlog/BatchlogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.cassandra.locator.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -46,7 +47,6 @@
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.UntypedResultSet.Row;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.SystemKeyspace;
Expand All @@ -58,20 +58,13 @@
import org.apache.cassandra.exceptions.RetryOnDifferentSystemException;
import org.apache.cassandra.exceptions.WriteFailureException;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.hints.Hint;
import org.apache.cassandra.hints.HintsService;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.Replicas;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageFlag;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.PreserveTimestamp;
Expand Down Expand Up @@ -607,22 +600,21 @@ private static ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(fin
String ks = mutation.getKeyspaceName();
Token tk = mutation.key().getToken();
ClusterMetadata metadata = ClusterMetadata.current();
KeyspaceMetadata keyspaceMetadata = metadata.schema.getKeyspaceMetadata(ks);
Keyspace keyspace = Keyspace.open(ks);

// TODO: this logic could do with revisiting at some point, as it is unclear what its rationale is
// we perform a local write, ignoring errors and inline in this thread (potentially slowing replay down)
// effectively bumping CL for locally owned writes and also potentially stalling log replay if an error occurs
// once we decide how it should work, it can also probably be simplified, and avoid constructing a ReplicaPlan directly
ReplicaLayout.ForTokenWrite allReplias = ReplicaLayout.forTokenWriteLiveAndDown(metadata, keyspaceMetadata, tk);
ReplicaPlan.ForWrite replicaPlan = forReplayMutation(metadata, Keyspace.open(ks), tk);
ReplicaLayout.ForTokenWrite allReplicas = ReplicaLayout.forTokenWriteLiveAndDown(metadata, keyspace.getMetadata(), tk);
CoordinationPlan.ForWrite replayPlan = CoordinationPlan.forReplayMutation(metadata, keyspace, tk);

Replica selfReplica = allReplias.all().selfIfPresent();
Replica selfReplica = allReplicas.all().selfIfPresent();
if (selfReplica != null)
mutation.apply();

for (Replica replica : allReplias.all())
for (Replica replica : allReplicas.all())
{
if (replica == selfReplica || replicaPlan.liveAndDown().contains(replica))
if (replica == selfReplica || replayPlan.replicas().liveAndDown().contains(replica))
continue;

UUID hostId = metadata.directory.peerId(replica.endpoint()).toUUID();
Expand All @@ -633,26 +625,13 @@ private static ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(fin
}
}

ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(replicaPlan, mutation, Dispatcher.RequestTime.forImmediateExecution());
ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(replayPlan, () -> mutation, Dispatcher.RequestTime.forImmediateExecution());
Message<Mutation> message = Message.outWithFlag(MUTATION_REQ, mutation, MessageFlag.CALL_BACK_ON_FAILURE);
for (Replica replica : replicaPlan.liveAndDown())
for (Replica replica : replayPlan.replicas().liveAndDown())
MessagingService.instance().sendWriteWithCallback(message, replica, handler);
return handler;
}

/**
 * Builds a write plan for replaying a single batchlog mutation.
 *
 * The plan targets only live, remote replicas: the local replica is excluded
 * (the caller applies the mutation locally itself) and down replicas are
 * filtered out (they are hinted separately by the caller).
 *
 * @param metadata current cluster metadata snapshot used to resolve replicas
 * @param keyspace keyspace the mutation belongs to
 * @param token    partition token of the mutation, used to locate replicas
 * @return a ReplicaPlan at CL.ONE whose contact/candidate sets are all the
 *         live remote replicas; the plan can recompute itself on epoch change
 *         via the self-referencing lambda
 */
public static ReplicaPlan.ForWrite forReplayMutation(ClusterMetadata metadata, Keyspace keyspace, Token token)
{
// All natural + pending replicas for this token, regardless of liveness.
ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWriteLiveAndDown(metadata, keyspace.getMetadata(), token);
Replicas.temporaryAssertFull(liveAndDown.all()); // TODO in CASSANDRA-14549

// Drop ourselves (identity comparison — selfIfPresent returns the instance
// from the same layout) and any replica the failure detector considers dead.
Replica selfReplica = liveAndDown.all().selfIfPresent();
ReplicaLayout.ForTokenWrite liveRemoteOnly = liveAndDown.filter(r -> FailureDetector.isReplicaAlive.test(r) && r != selfReplica);

// CL.ONE: replay only needs a single ack. The same live-remote set is used
// for pending/candidates/contacts/still-required positions; the trailing
// lambda lets the plan rebuild itself against newer ClusterMetadata.
return new ReplicaPlan.ForWrite(keyspace, liveAndDown.replicationStrategy(),
ConsistencyLevel.ONE, liveRemoteOnly.pending(), liveRemoteOnly.all(), liveRemoteOnly.all(), liveRemoteOnly.all(),
(cm) -> forReplayMutation(cm, keyspace, token),
metadata.epoch);
}
private static int gcgs(Collection<Mutation> mutations)
{
int gcgs = Integer.MAX_VALUE;
Expand All @@ -670,16 +649,16 @@ private static class ReplayWriteResponseHandler<T> extends WriteResponseHandler<
private final Set<InetAddressAndPort> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>());

// TODO: should we be hinting here, since presumably batch log will retry? Maintaining historical behaviour for the moment.
ReplayWriteResponseHandler(ReplicaPlan.ForWrite replicaPlan, Supplier<Mutation> hintOnFailure, Dispatcher.RequestTime requestTime)
ReplayWriteResponseHandler(CoordinationPlan.ForWrite coordinationPlan, Supplier<Mutation> hintOnFailure, Dispatcher.RequestTime requestTime)
{
super(replicaPlan, null, WriteType.UNLOGGED_BATCH, hintOnFailure, requestTime);
Iterables.addAll(undelivered, replicaPlan.contacts().endpoints());
super(coordinationPlan, null, WriteType.UNLOGGED_BATCH, hintOnFailure, requestTime);
Iterables.addAll(undelivered, replicaPlan().contacts().endpoints());
}

@Override
protected int blockFor()
{
return this.replicaPlan.contacts().size();
return this.replicaPlan().contacts().size();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,11 @@ public enum CassandraRelevantProperties
*/
SAI_VECTOR_SEARCH_ORDER_CHUNK_SIZE("cassandra.sai.vector_search.order_chunk_size", "100000"),

/**
 * Enables additional correctness checks in the satellite datacenter replication strategy.
 */
SATELLITE_REPLICATION_ADDITIONAL_CHECKS("cassandra.satellite_replication_addl_checks", "true"),

SCHEMA_UPDATE_HANDLER_FACTORY_CLASS("cassandra.schema.update_handler_factory.class"),
SEARCH_CONCURRENCY_FACTOR("cassandra.search_concurrency_factor", "1"),

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.SatelliteFailoverState;
import org.apache.cassandra.locator.SatelliteReplicationStrategy;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.tcm.ClusterMetadata;

/**
 * Verb handler for PAXOS2_COMMIT_REMOTE_REQ: delegates to {@link MutationVerbHandler}
 * unless the target keyspace uses {@link SatelliteReplicationStrategy} and the token's
 * failover state is {@link SatelliteFailoverState.State#TRANSITION_ACK}, in which case
 * the mutation is rejected.
 *
 * A PAXOS2_COMMIT_REMOTE_REQ carries a plain {@link Mutation}, so the standard
 * MutationVerbHandler cannot tell it originated from a paxos commit. Rejecting here
 * prevents stale paxos commits from landing on satellite/secondary DCs mid-failover.
 */
public class PaxosCommitRemoteMutationVerbHandler implements IVerbHandler<Mutation>
{
    public static final PaxosCommitRemoteMutationVerbHandler instance = new PaxosCommitRemoteMutationVerbHandler();
    private static final Logger logger = LoggerFactory.getLogger(PaxosCommitRemoteMutationVerbHandler.class);

    @Override
    public void doVerb(Message<Mutation> message)
    {
        // Guard first: if the failover check rejects the mutation it has already
        // responded with a failure, so we must not fall through to normal handling.
        if (rejectedByFailoverState(message))
            return;

        MutationVerbHandler.instance.doVerb(message);
    }

    /**
     * Returns true (after responding with a failure) when the mutation targets a
     * SatelliteReplicationStrategy keyspace whose token is in TRANSITION_ACK.
     */
    private static boolean rejectedByFailoverState(Message<Mutation> message)
    {
        Mutation incoming = message.payload;
        AbstractReplicationStrategy replication = Keyspace.open(incoming.getKeyspaceName()).getReplicationStrategy();

        // Only satellite keyspaces participate in the failover protocol.
        if (!(replication instanceof SatelliteReplicationStrategy))
            return false;

        SatelliteReplicationStrategy satelliteStrategy = (SatelliteReplicationStrategy) replication;
        SatelliteFailoverState.FailoverInfo info =
            satelliteStrategy.getFailoverInfo(incoming.key().getToken(), ClusterMetadata.current());

        if (info.getState() != SatelliteFailoverState.State.TRANSITION_ACK)
            return false;

        logger.debug("Rejecting PAXOS2_COMMIT_REMOTE_REQ for {} during TRANSITION_ACK", incoming.getKeyspaceName());
        MessagingService.instance().respondWithFailure(RequestFailureReason.UNKNOWN, message);
        return true;
    }
}
56 changes: 38 additions & 18 deletions src/java/org/apache/cassandra/index/IndexStatusManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,28 +78,36 @@ public class IndexStatusManager
*/
public final Map<InetAddressAndPort, Map<String, Index.Status>> peerIndexStatus = new HashMap<>();

private IndexStatusManager() {}
/**
 * Returns the index status that explains why {@code endpoint} may not be queryable
 * for the given query plan: the status of the last non-queryable index if any index
 * is non-queryable, otherwise the status of the first index in the plan.
 *
 * @param keyspace       keyspace name used to look up per-peer index status
 * @param endpoint       peer whose index statuses are examined
 * @param indexQueryPlan the indexes involved in the read
 * @return a status for the endpoint, or null if the plan has no indexes
 */
public Index.Status getUnqueryableStatus(String keyspace, InetAddressAndPort endpoint, Index.QueryPlan indexQueryPlan)
{
    Index.Status chosen = null;
    for (Index candidate : indexQueryPlan.getIndexes())
    {
        Index.Status current = getIndexStatus(endpoint, keyspace, candidate.getIndexMetadata().name);
        // Seed with the very first status; after that, only a non-queryable
        // status is allowed to replace the previously recorded one.
        boolean nothingRecordedYet = chosen == null;
        boolean notQueryable = !candidate.isQueryable(current);
        if (nothingRecordedYet || notQueryable)
            chosen = current;
    }
    return chosen;
}

/**
* Remove endpoints whose indexes are not queryable for the specified {@link Index.QueryPlan}.
*
* @param liveEndpoints current live endpoints where non-queryable endpoints will be removed
* @param keyspace to be queried
* @param liveEndpoints current live endpoints where non-queryable endpoints will be removed
* @param keyspace to be queried
* @param indexQueryPlan index query plan used in the read command
* @param level consistency level of read command
*/
public <E extends Endpoints<E>> E filterForQuery(E liveEndpoints, Keyspace keyspace, Index.QueryPlan indexQueryPlan, ConsistencyLevel level)
public <E extends Endpoints<E>> E filterForQuery(E liveEndpoints, String keyspace, Index.QueryPlan indexQueryPlan, Map<InetAddressAndPort, Index.Status> indexStatusMap)
{
// UNKNOWN states are transient/rare; only a few replicas should have this state at any time. See CASSANDRA-19400
Set<Replica> queryableNonSucceeded = new HashSet<>(4);
Map<InetAddressAndPort, Index.Status> indexStatusMap = new HashMap<>();

E queryableEndpoints = liveEndpoints.filter(replica -> {

boolean allBuilt = true;
for (Index index : indexQueryPlan.getIndexes())
{
Index.Status status = getIndexStatus(replica.endpoint(), keyspace.getName(), index.getIndexMetadata().name);
Index.Status status = getIndexStatus(replica.endpoint(), keyspace, index.getIndexMetadata().name);
if (!index.isQueryable(status))
{
indexStatusMap.put(replica.endpoint(), status);
Expand All @@ -120,6 +128,28 @@ public <E extends Endpoints<E>> E filterForQuery(E liveEndpoints, Keyspace keysp
if (!queryableNonSucceeded.isEmpty() && queryableNonSucceeded.size() != queryableEndpoints.size())
queryableEndpoints = queryableEndpoints.sorted(Comparator.comparingInt(e -> queryableNonSucceeded.contains(e) ? 1 : -1));

return queryableEndpoints;
}


/**
 * Always throws a {@link ReadFailureException} describing why the removed replicas
 * could not serve an index query.
 *
 * Each removed replica is attributed a failure reason from its recorded index
 * status: INDEX_BUILD_IN_PROGRESS while a full rebuild is underway, otherwise
 * INDEX_NOT_AVAILABLE.
 *
 * @param filtered       number of endpoints that remained queryable
 * @param removed        replicas filtered out as non-queryable
 * @param indexStatusMap per-endpoint index status recorded during filtering
 * @param level          consistency level of the read
 * @param required       number of replicas the consistency level requires
 * @throws ReadFailureException always
 */
public void readFailureException(int filtered, Iterable<Replica> removed, Map<InetAddressAndPort, Index.Status> indexStatusMap, ConsistencyLevel level, int required)
{
    Map<InetAddressAndPort, RequestFailureReason> reasons = new HashMap<>();
    for (Replica excluded : removed)
    {
        InetAddressAndPort endpoint = excluded.endpoint();
        RequestFailureReason reason = indexStatusMap.get(endpoint) == Index.Status.FULL_REBUILD_STARTED
                                      ? RequestFailureReason.INDEX_BUILD_IN_PROGRESS
                                      : RequestFailureReason.INDEX_NOT_AVAILABLE;
        reasons.put(endpoint, reason);
    }
    throw new ReadFailureException(level, filtered, required, false, reasons);
}

public <E extends Endpoints<E>> E filterForQueryOrThrow(E liveEndpoints, Keyspace keyspace, Index.QueryPlan indexQueryPlan, ConsistencyLevel level)
{
Map<InetAddressAndPort, Index.Status> indexStatusMap = new HashMap<>();
E queryableEndpoints = filterForQuery(liveEndpoints, keyspace.getName(), indexQueryPlan, indexStatusMap);
int initial = liveEndpoints.size();
int filtered = queryableEndpoints.size();

Expand All @@ -130,17 +160,7 @@ public <E extends Endpoints<E>> E filterForQuery(E liveEndpoints, Keyspace keysp
int required = level.blockFor(keyspace.getReplicationStrategy());
if (required <= initial && required > filtered)
{
Map<InetAddressAndPort, RequestFailureReason> failureReasons = new HashMap<>();
liveEndpoints.without(queryableEndpoints.endpoints())
.forEach(replica -> {
Index.Status status = indexStatusMap.get(replica.endpoint());
if (status == Index.Status.FULL_REBUILD_STARTED)
failureReasons.put(replica.endpoint(), RequestFailureReason.INDEX_BUILD_IN_PROGRESS);
else
failureReasons.put(replica.endpoint(), RequestFailureReason.INDEX_NOT_AVAILABLE);
});

throw new ReadFailureException(level, filtered, required, false, failureReasons);
readFailureException(filtered, liveEndpoints.without(queryableEndpoints.endpoints()), indexStatusMap, level, level.blockFor(keyspace.getReplicationStrategy()));
}
}

Expand Down
Loading