diff --git a/doc/modules/cassandra/pages/architecture/cql-on-accord.adoc b/doc/modules/cassandra/pages/architecture/cql-on-accord.adoc index 4ba46c1e79af..6b100a8229e3 100644 --- a/doc/modules/cassandra/pages/architecture/cql-on-accord.adoc +++ b/doc/modules/cassandra/pages/architecture/cql-on-accord.adoc @@ -549,11 +549,10 @@ replay only makes a single attempt to replay before converting the batch contents to hints. If part of the batch was routed to Accord then there is no node to hint so there is a fake node that a hint is written to and when that hint is dispatched it will be split and then executed -appropriately. In https://issues.apache.org/jira/browse/CASSANDRA-20588[CASSANDRA-20588] this needs to be simplified to writing the -entire batch through Accord if any part of it should be written through -Accord because it also addresses an atomicity issue with single token -batches which can be torn when part is applied through Accord and part -is applied through Cassandra. +appropriately. Single token batches that span both Accord and non-Accord tables +are written to the batch log to preserve all-or-nothing application. +This incurs an additional performance cost because normally these +single token batches would not be written to the batch log. Hints can be for multiple tables some of which may be Accord and some non-Accord so splitting occurs. 
It's also possible a hint will be for an diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 8a38387d8882..302d923436c5 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -78,6 +78,7 @@ import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.TimestampSource; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.Dispatcher; import org.apache.cassandra.transport.messages.ResultMessage; @@ -87,6 +88,7 @@ import static java.util.function.Predicate.isEqual; import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse; +import static org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.isSingleTokenStatementSpanningAccordAndNonAccordTables; /** * A BATCH statement parsed from a CQL query. 
@@ -541,7 +543,10 @@ private void executeWithoutConditions(List mutations, Consi updatePartitionsPerBatchMetrics(mutations.size()); - boolean mutateAtomic = (isLogged() && mutations.size() > 1); + // We special case single token batch statements that span both Accord and non-Accord + // tables to go through the batch log in order to preserve all or nothing application + // see CASSANDRA-20588 for more details + boolean mutateAtomic = (isLogged() && mutations.size() > 1) || (isSingleTokenStatementSpanningAccordAndNonAccordTables(ClusterMetadata.current(), mutations.get(0))); StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic, requestTime, preserveTimestamp); ClientRequestSizeMetrics.recordRowAndColumnCountMetrics(mutations); } diff --git a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java index 9fc8f6ba284a..c8118139357b 100644 --- a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java +++ b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java @@ -204,6 +204,29 @@ public static void splitMutationsIntoAccordAndNormal(Clust } } + public static boolean isSingleTokenStatementSpanningAccordAndNonAccordTables(ClusterMetadata cm, T mutation) + { + if (mutation.potentialTxnConflicts().allowed || mutation.getTableIds().size() == 1) + return false; + + Token token = mutation.key().getToken(); + + boolean containsAccordMutation = false; + boolean containsNormalMutation = false; + + for (TableId tableId : mutation.getTableIds()) + { + boolean test = tokenShouldBeWrittenThroughAccord(cm, tableId, token, TransactionalMode::nonSerialWritesThroughAccord, TransactionalMigrationFromMode::nonSerialWritesThroughAccord); + containsAccordMutation = containsAccordMutation || test; + containsNormalMutation = containsNormalMutation || !test; + + if 
(containsAccordMutation && containsNormalMutation) + return true; + } + + return false; + } + /** * Result of splitting a mutation across Accord and non-transactional boundaries */ diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java index dbf6d6553f64..62bc252c3c0e 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java @@ -53,7 +53,10 @@ import accord.topology.TopologyException; import org.apache.cassandra.config.Config.PaxosVariant; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.cql3.ast.AssignmentOperator; import org.apache.cassandra.cql3.ast.Literal; import org.apache.cassandra.cql3.ast.Mutation; @@ -64,6 +67,8 @@ import org.apache.cassandra.cql3.ast.Txn; import org.apache.cassandra.cql3.functions.types.utils.Bytes; import org.apache.cassandra.cql3.statements.TransactionStatement; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.ListType; import org.apache.cassandra.db.marshal.MapType; @@ -79,6 +84,9 @@ import org.apache.cassandra.distributed.test.sai.SAIUtil; import org.apache.cassandra.distributed.util.QueryResultUtil; import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.service.accord.AccordService; import org.apache.cassandra.service.accord.AccordTestUtils; import 
org.apache.cassandra.service.consensus.TransactionalMode; @@ -330,6 +338,79 @@ public void testPartitionMultiRowReturn() throws Exception }); } + @Test + public void testSinglePartitionKeyBatch() throws Throwable + { + List ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';', + "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}", + "CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH " + transactionalMode.asCqlParam(), + "CREATE TABLE " + qualifiedRegularTableName + " (k int PRIMARY KEY, v int)"); + + test(ddls, cluster -> { + cluster.coordinator(1).execute("BEGIN BATCH\n" + + "INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (1, 2);\n" + + "INSERT INTO " + qualifiedRegularTableName + " (k, v) VALUES (1, 3);\n" + + "APPLY BATCH;", ConsistencyLevel.ONE); + + SimpleQueryResult r1 = cluster.coordinator(1).executeWithResult("SELECT * FROM " + qualifiedAccordTableName + " WHERE k = 1", ConsistencyLevel.ONE); + SimpleQueryResult r2 = cluster.coordinator(1).executeWithResult("SELECT * FROM " + qualifiedRegularTableName + " WHERE k = 1", ConsistencyLevel.ONE); + + assertEquals(1, r1.toObjectArrays().length); + assertEquals(1, r2.toObjectArrays().length); + }); + } + + @Test + public void testSinglePartitionKeyBatchWrittenToBatchLog() throws Throwable + { + DatabaseDescriptor.daemonInitialization(); + List ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';', + "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}", + "CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH " + transactionalMode.asCqlParam(), + "CREATE TABLE " + qualifiedRegularTableName + " (k int PRIMARY KEY, v int)"); + + test(ddls, cluster -> { + pauseHints(); + blockMutationAndPreAccept(cluster); + try + { + cluster.coordinator(1).execute("BEGIN BATCH\n" + + "INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (1, 
2);\n" + + "INSERT INTO " + qualifiedRegularTableName + " (k, v) VALUES (1, 3);\n" + + "APPLY BATCH;", ConsistencyLevel.ALL); + fail("Should have thrown WTE"); + } + catch (Throwable t) + { + assertEquals(t.getClass().getName(), WriteTimeoutException.class.getName()); + } + + if (transactionalMode.nonSerialWritesThroughAccord) + cluster.get(1).runOnInstance(() -> { + String query = String.format("SELECT id, mutations, version FROM %s.%s", + SchemaConstants.SYSTEM_KEYSPACE_NAME, + SystemKeyspace.BATCHES); + + Iterator r = QueryProcessor.executeInternal(query).iterator(); + assert (r.hasNext()); + UntypedResultSet.Row row = r.next(); + + int version = row.getInt("version"); + List serializedMutations = row.getList("mutations", BytesType.instance); + assertEquals(1, serializedMutations.size()); + + try (DataInputBuffer in = new DataInputBuffer(serializedMutations.get(0), true)) + { + assertEquals(2, org.apache.cassandra.db.Mutation.serializer.deserialize(in, version).getPartitionUpdates().size()); + } + catch (Exception e) + { + logger.info("Deserialization failed"); + } + }); + }); + } + @Test public void testSaiMultiRowReturn() throws Exception {