Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions doc/modules/cassandra/pages/architecture/cql-on-accord.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -549,11 +549,10 @@ replay only makes a single attempt to replay before converting the batch
contents to hints. If part of the batch was routed to Accord then there
is no node to hint so there is a fake node that a hint is written to and
when that hint is dispatched it will be split and then executed
appropriately. In https://issues.apache.org/jira/browse/CASSANDRA-20588[CASSANDRA-20588] this needs to be simplified to writing the
entire batch through Accord if any part of it should be written through
Accord because it also addresses an atomicity issue with single token
batches which can be torn when part is applied through Accord and part
is applied through Cassandra.
appropriately. Single token batches that span both Accord and non-Accord tables
are written to the batch log to preserve all-or-nothing application.
This incurs an additional performance cost because normally these
single token batches would not be written to the batch log.

Hints can be for multiple tables some of which may be Accord and some
non-Accord so splitting occurs. It's also possible a hint will be for an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.TimestampSource;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.Dispatcher;
import org.apache.cassandra.transport.messages.ResultMessage;
Expand All @@ -87,6 +88,7 @@

import static java.util.function.Predicate.isEqual;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
import static org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.isSingleTokenStatementSpanningAccordAndNonAccordTables;

/**
* A <code>BATCH</code> statement parsed from a CQL query.
Expand Down Expand Up @@ -541,7 +543,10 @@ private void executeWithoutConditions(List<? extends IMutation> mutations, Consi

updatePartitionsPerBatchMetrics(mutations.size());

boolean mutateAtomic = (isLogged() && mutations.size() > 1);
// We special case single token batch statements that span both Accord and non-Accord
// tables to go through the batch log in order to preserve all or nothing application
// see CASSANDRA-20588 for more details
boolean mutateAtomic = (isLogged() && mutations.size() > 1) || (isSingleTokenStatementSpanningAccordAndNonAccordTables(ClusterMetadata.current(), mutations.get(0)));
StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic, requestTime, preserveTimestamp);
ClientRequestSizeMetrics.recordRowAndColumnCountMetrics(mutations);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,29 @@ public static <T extends IMutation> void splitMutationsIntoAccordAndNormal(Clust
}
}

/**
 * Returns {@code true} when the given single-token mutation writes to a mix of Accord-managed
 * and non-Accord tables. Such batches must go through the batch log to preserve
 * all-or-nothing application (see CASSANDRA-20588).
 *
 * @param cm       current cluster metadata used to resolve each table's write routing
 * @param mutation the mutation to inspect; all of its updates share a single partition key
 * @return {@code true} if at least one table routes writes through Accord and at least one does not
 */
public static <T extends IMutation> boolean isSingleTokenStatementSpanningAccordAndNonAccordTables(ClusterMetadata cm, T mutation)
{
    // A mutation that tolerates txn conflicts, or that touches only one table,
    // can never straddle the Accord/non-Accord boundary.
    if (mutation.potentialTxnConflicts().allowed || mutation.getTableIds().size() == 1)
        return false;

    Token token = mutation.key().getToken();

    boolean sawAccordTable = false;
    boolean sawNormalTable = false;

    for (TableId tableId : mutation.getTableIds())
    {
        if (tokenShouldBeWrittenThroughAccord(cm, tableId, token, TransactionalMode::nonSerialWritesThroughAccord, TransactionalMigrationFromMode::nonSerialWritesThroughAccord))
            sawAccordTable = true;
        else
            sawNormalTable = true;

        // Stop as soon as both routing kinds have been observed.
        if (sawAccordTable && sawNormalTable)
            return true;
    }

    return false;
}

/**
* Result of splitting a mutation across Accord and non-transactional boundaries
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@
import accord.topology.TopologyException;

import org.apache.cassandra.config.Config.PaxosVariant;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.ast.AssignmentOperator;
import org.apache.cassandra.cql3.ast.Literal;
import org.apache.cassandra.cql3.ast.Mutation;
Expand All @@ -64,6 +67,8 @@
import org.apache.cassandra.cql3.ast.Txn;
import org.apache.cassandra.cql3.functions.types.utils.Bytes;
import org.apache.cassandra.cql3.statements.TransactionStatement;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.ListType;
import org.apache.cassandra.db.marshal.MapType;
Expand All @@ -79,6 +84,9 @@
import org.apache.cassandra.distributed.test.sai.SAIUtil;
import org.apache.cassandra.distributed.util.QueryResultUtil;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.service.accord.AccordService;
import org.apache.cassandra.service.accord.AccordTestUtils;
import org.apache.cassandra.service.consensus.TransactionalMode;
Expand Down Expand Up @@ -330,6 +338,79 @@ public void testPartitionMultiRowReturn() throws Exception
});
}

@Test
public void testSinglePartitionKeyBatch() throws Throwable
{
    // Schema: one Accord-managed table and one regular table, same partition key type.
    List<String> schema = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';',
                                        "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}",
                                        "CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH " + transactionalMode.asCqlParam(),
                                        "CREATE TABLE " + qualifiedRegularTableName + " (k int PRIMARY KEY, v int)");

    test(schema, cluster -> {
        // A single-token batch spanning both tables should apply to both atomically.
        String batch = "BEGIN BATCH\n" +
                       "INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (1, 2);\n" +
                       "INSERT INTO " + qualifiedRegularTableName + " (k, v) VALUES (1, 3);\n" +
                       "APPLY BATCH;";
        cluster.coordinator(1).execute(batch, ConsistencyLevel.ONE);

        SimpleQueryResult accordRows = cluster.coordinator(1).executeWithResult("SELECT * FROM " + qualifiedAccordTableName + " WHERE k = 1", ConsistencyLevel.ONE);
        SimpleQueryResult regularRows = cluster.coordinator(1).executeWithResult("SELECT * FROM " + qualifiedRegularTableName + " WHERE k = 1", ConsistencyLevel.ONE);

        // Exactly one row must land in each table.
        assertEquals(1, accordRows.toObjectArrays().length);
        assertEquals(1, regularRows.toObjectArrays().length);
    });
}

@Test
public void testSinglePartitionKeyBatchWrittenToBatchLog() throws Throwable
{
    DatabaseDescriptor.daemonInitialization();
    List<String> ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';',
                                      "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}",
                                      "CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH " + transactionalMode.asCqlParam(),
                                      "CREATE TABLE " + qualifiedRegularTableName + " (k int PRIMARY KEY, v int)");

    test(ddls, cluster -> {
        // Block mutation delivery so the batch times out, leaving its entry in the batch log.
        pauseHints();
        blockMutationAndPreAccept(cluster);
        try
        {
            cluster.coordinator(1).execute("BEGIN BATCH\n" +
                                           "INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (1, 2);\n" +
                                           "INSERT INTO " + qualifiedRegularTableName + " (k, v) VALUES (1, 3);\n" +
                                           "APPLY BATCH;", ConsistencyLevel.ALL);
            fail("Should have thrown WTE");
        }
        catch (Throwable t)
        {
            // Compare by class name because the exception crosses the dtest classloader boundary.
            // Fix: assertEquals takes (expected, actual) — the original had the arguments reversed,
            // which produced misleading failure messages.
            assertEquals(WriteTimeoutException.class.getName(), t.getClass().getName());
        }

        if (transactionalMode.nonSerialWritesThroughAccord)
            cluster.get(1).runOnInstance(() -> {
                String query = String.format("SELECT id, mutations, version FROM %s.%s",
                                             SchemaConstants.SYSTEM_KEYSPACE_NAME,
                                             SystemKeyspace.BATCHES);

                Iterator<UntypedResultSet.Row> r = QueryProcessor.executeInternal(query).iterator();
                // Fix: a bare Java `assert` is a no-op unless -ea is enabled; fail explicitly instead.
                if (!r.hasNext())
                    throw new AssertionError("Expected at least one row in the batch log");
                UntypedResultSet.Row row = r.next();

                int version = row.getInt("version");
                List<ByteBuffer> serializedMutations = row.getList("mutations", BytesType.instance);
                assertEquals(1, serializedMutations.size());

                try (DataInputBuffer in = new DataInputBuffer(serializedMutations.get(0), true))
                {
                    // The single logged mutation must carry both partition updates
                    // (the Accord table's and the regular table's).
                    assertEquals(2, org.apache.cassandra.db.Mutation.serializer.deserialize(in, version).getPartitionUpdates().size());
                }
                catch (Exception e)
                {
                    // Fix: the original only logged here, silently skipping the key assertion
                    // and letting the test pass vacuously; propagate as a test failure instead.
                    throw new AssertionError("Failed to deserialize batch log mutation", e);
                }
            });
    });
}

@Test
public void testSaiMultiRowReturn() throws Exception
{
Expand Down