diff --git a/doc/modules/cassandra/pages/architecture/cql-on-accord.adoc b/doc/modules/cassandra/pages/architecture/cql-on-accord.adoc
index 4ba46c1e79af..6b100a8229e3 100644
--- a/doc/modules/cassandra/pages/architecture/cql-on-accord.adoc
+++ b/doc/modules/cassandra/pages/architecture/cql-on-accord.adoc
@@ -549,11 +549,10 @@ replay only makes a single attempt to replay before converting the batch
contents to hints. If part of the batch was routed to Accord then there
is no node to hint so there is a fake node that a hint is written to and
when that hint is dispatched it will be split and then executed
-appropriately. In https://issues.apache.org/jira/browse/CASSANDRA-20588[CASSANDRA-20588] this needs to be simplified to writing the
-entire batch through Accord if any part of it should be written through
-Accord because it also addresses an atomicity issue with single token
-batches which can be torn when part is applied through Accord and part
-is applied through Cassandra.
+appropriately. Single token batches that span both Accord and non-Accord tables
+are written to the batch log to preserve all-or-nothing application.
+This incurs an additional performance cost because normally these
+single token batches would not be written to the batch log.
Hints can be for multiple tables some of which may be Accord and some
non-Accord so splitting occurs. It's also possible a hint will be for an
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 8a38387d8882..302d923436c5 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -78,6 +78,7 @@
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.TimestampSource;
+import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.Dispatcher;
import org.apache.cassandra.transport.messages.ResultMessage;
@@ -87,6 +88,7 @@
import static java.util.function.Predicate.isEqual;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.isSingleTokenStatementSpanningAccordAndNonAccordTables;
/**
* A BATCH statement parsed from a CQL query.
@@ -541,7 +543,10 @@ private void executeWithoutConditions(List<? extends IMutation> mutations, Consi
updatePartitionsPerBatchMetrics(mutations.size());
- boolean mutateAtomic = (isLogged() && mutations.size() > 1);
+ // We special case single token batch statements that span both Accord and non-Accord
+ // tables to go through the batch log in order to preserve all or nothing application
+ // see CASSANDRA-20588 for more details
+ boolean mutateAtomic = (isLogged() && mutations.size() > 1) || (isSingleTokenStatementSpanningAccordAndNonAccordTables(ClusterMetadata.current(), mutations.get(0)));
StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic, requestTime, preserveTimestamp);
ClientRequestSizeMetrics.recordRowAndColumnCountMetrics(mutations);
}
diff --git a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
index 9fc8f6ba284a..c8118139357b 100644
--- a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
+++ b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
@@ -204,6 +204,29 @@ public static void splitMutationsIntoAccordAndNormal(Clust
}
}
+    /**
+     * Returns true if the given single-token mutation writes to both an Accord-managed table
+     * and a non-Accord table. Such mutations must be applied atomically via the batch log,
+     * because part of the mutation is applied through Accord and part through the normal
+     * write path, and a failure in between could otherwise tear the batch.
+     * See CASSANDRA-20588 for details.
+     *
+     * @param cm       current cluster metadata used to resolve per-table write routing
+     * @param mutation a single-token mutation (all updates share one partition key token)
+     */
+    public static <T extends IMutation> boolean isSingleTokenStatementSpanningAccordAndNonAccordTables(ClusterMetadata cm, T mutation)
+    {
+        // Mutations that allow potential txn conflicts don't need batch log protection,
+        // and a mutation touching a single table can never span both systems
+        if (mutation.potentialTxnConflicts().allowed || mutation.getTableIds().size() == 1)
+            return false;
+
+        Token token = mutation.key().getToken();
+
+        boolean containsAccordMutation = false;
+        boolean containsNormalMutation = false;
+
+        for (TableId tableId : mutation.getTableIds())
+        {
+            boolean writesThroughAccord = tokenShouldBeWrittenThroughAccord(cm, tableId, token, TransactionalMode::nonSerialWritesThroughAccord, TransactionalMigrationFromMode::nonSerialWritesThroughAccord);
+            containsAccordMutation |= writesThroughAccord;
+            containsNormalMutation |= !writesThroughAccord;
+
+            // Early exit as soon as we've seen both kinds of table
+            if (containsAccordMutation && containsNormalMutation)
+                return true;
+        }
+
+        return false;
+    }
+
/**
* Result of splitting a mutation across Accord and non-transactional boundaries
*/
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
index dbf6d6553f64..62bc252c3c0e 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
@@ -53,7 +53,10 @@
import accord.topology.TopologyException;
import org.apache.cassandra.config.Config.PaxosVariant;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.ast.AssignmentOperator;
import org.apache.cassandra.cql3.ast.Literal;
import org.apache.cassandra.cql3.ast.Mutation;
@@ -64,6 +67,8 @@
import org.apache.cassandra.cql3.ast.Txn;
import org.apache.cassandra.cql3.functions.types.utils.Bytes;
import org.apache.cassandra.cql3.statements.TransactionStatement;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.ListType;
import org.apache.cassandra.db.marshal.MapType;
@@ -79,6 +84,9 @@
import org.apache.cassandra.distributed.test.sai.SAIUtil;
import org.apache.cassandra.distributed.util.QueryResultUtil;
import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.service.accord.AccordService;
import org.apache.cassandra.service.accord.AccordTestUtils;
import org.apache.cassandra.service.consensus.TransactionalMode;
@@ -330,6 +338,79 @@ public void testPartitionMultiRowReturn() throws Exception
});
}
+    @Test
+    public void testSinglePartitionKeyBatch() throws Throwable
+    {
+        List<String> ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';',
+                                          "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}",
+                                          "CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH " + transactionalMode.asCqlParam(),
+                                          "CREATE TABLE " + qualifiedRegularTableName + " (k int PRIMARY KEY, v int)");
+
+        test(ddls, cluster -> {
+            // An unlogged single partition key batch spanning an Accord and a non-Accord
+            // table should apply the insert to both tables
+            cluster.coordinator(1).execute("BEGIN BATCH\n" +
+                                           "INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (1, 2);\n" +
+                                           "INSERT INTO " + qualifiedRegularTableName + " (k, v) VALUES (1, 3);\n" +
+                                           "APPLY BATCH;", ConsistencyLevel.ONE);
+
+            SimpleQueryResult r1 = cluster.coordinator(1).executeWithResult("SELECT * FROM " + qualifiedAccordTableName + " WHERE k = 1", ConsistencyLevel.ONE);
+            SimpleQueryResult r2 = cluster.coordinator(1).executeWithResult("SELECT * FROM " + qualifiedRegularTableName + " WHERE k = 1", ConsistencyLevel.ONE);
+
+            assertEquals(1, r1.toObjectArrays().length);
+            assertEquals(1, r2.toObjectArrays().length);
+        });
+    }
+
+    @Test
+    public void testSinglePartitionKeyBatchWrittenToBatchLog() throws Throwable
+    {
+        DatabaseDescriptor.daemonInitialization();
+        List<String> ddls = Arrays.asList("DROP KEYSPACE IF EXISTS " + KEYSPACE + ';',
+                                          "CREATE KEYSPACE " + KEYSPACE + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}",
+                                          "CREATE TABLE " + qualifiedAccordTableName + " (k int PRIMARY KEY, v int) WITH " + transactionalMode.asCqlParam(),
+                                          "CREATE TABLE " + qualifiedRegularTableName + " (k int PRIMARY KEY, v int)");
+
+        test(ddls, cluster -> {
+            // Block the write path so the batch times out after it has been written to
+            // the batch log but before it can be applied
+            pauseHints();
+            blockMutationAndPreAccept(cluster);
+            try
+            {
+                cluster.coordinator(1).execute("BEGIN BATCH\n" +
+                                               "INSERT INTO " + qualifiedAccordTableName + " (k, v) VALUES (1, 2);\n" +
+                                               "INSERT INTO " + qualifiedRegularTableName + " (k, v) VALUES (1, 3);\n" +
+                                               "APPLY BATCH;", ConsistencyLevel.ALL);
+                fail("Should have thrown WTE");
+            }
+            catch (Throwable t)
+            {
+                // Don't let fail()'s AssertionError be misreported as a wrong exception type
+                if (t instanceof AssertionError)
+                    throw (AssertionError) t;
+                // Compare by class name because the exception crosses the dtest
+                // instance classloader boundary
+                assertEquals(WriteTimeoutException.class.getName(), t.getClass().getName());
+            }
+
+            if (transactionalMode.nonSerialWritesThroughAccord)
+                cluster.get(1).runOnInstance(() -> {
+                    String query = String.format("SELECT id, mutations, version FROM %s.%s",
+                                                 SchemaConstants.SYSTEM_KEYSPACE_NAME,
+                                                 SystemKeyspace.BATCHES);
+
+                    Iterator<UntypedResultSet.Row> r = QueryProcessor.executeInternal(query).iterator();
+                    assertTrue(r.hasNext());
+                    UntypedResultSet.Row row = r.next();
+
+                    int version = row.getInt("version");
+                    List<ByteBuffer> serializedMutations = row.getList("mutations", BytesType.instance);
+                    assertEquals(1, serializedMutations.size());
+
+                    try (DataInputBuffer in = new DataInputBuffer(serializedMutations.get(0), true))
+                    {
+                        // The single batch log entry must contain both partition updates so
+                        // replay can apply the batch atomically across Accord and non-Accord
+                        assertEquals(2, org.apache.cassandra.db.Mutation.serializer.deserialize(in, version).getPartitionUpdates().size());
+                    }
+                    catch (Exception e)
+                    {
+                        // Fail loudly instead of silently skipping the core assertion
+                        throw new AssertionError("Failed to deserialize batch log mutation", e);
+                    }
+                });
+        });
+    }
+
+
@Test
public void testSaiMultiRowReturn() throws Exception
{