Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
import org.apache.cassandra.db.virtual.VirtualTable;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.sstable.SSTableReadsListener;
Expand Down Expand Up @@ -121,7 +122,8 @@ protected SinglePartitionReadCommand(Epoch serializedAtEpoch,
DataRange dataRange)
{
super(serializedAtEpoch, Kind.SINGLE_PARTITION, isDigest, digestVersion, acceptsTransient, potentialTxnConflicts, metadata, nowInSec, columnFilter, rowFilter, limits, indexQueryPlan, trackWarnings, dataRange);
assert partitionKey.getPartitioner() == metadata.partitioner;
assert IPartitioner.equivalent(partitionKey.getPartitioner(), metadata.partitioner) : String.format("Mismatching partitioners for key (%s) and table metadata (%s)",
partitionKey.getPartitioner(), metadata.partitioner);
this.partitionKey = partitionKey;
this.clusteringIndexFilter = clusteringIndexFilter;
}
Expand Down
9 changes: 9 additions & 0 deletions src/java/org/apache/cassandra/dht/IPartitioner.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ static IPartitioner global()
return DatabaseDescriptor.getPartitioner();
}

static boolean equivalent(IPartitioner p1, IPartitioner p2)
{
if (p1 == p2) return true;
if (p1.getClass() != p2.getClass()) return false;
if (p1.getClass() == LocalPartitioner.class)
return ((LocalPartitioner)p1).comparator == ((LocalPartitioner)p2).comparator;
return true;
}

/**
* @return a new instance of a reusable key
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.distributed.test.tcm;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.util.concurrent.Uninterruptibles;

import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bind.annotation.SuperCall;

import org.awaitility.Awaitility;
import org.junit.Test;

import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.shared.ClusterUtils;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.index.internal.CassandraIndexSearcher;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.tcm.Epoch;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.junit.Assert.assertEquals;

public class IndexTablePartitionerAfterForceSnapshotTest extends TestBaseImpl
{
@Test
public void indexQueryAfterSnapshotTest() throws IOException, InterruptedException
{
try (Cluster cluster = Cluster.build(3)
.withInstanceInitializer(BBInstaller::install)
.withConfig(config -> config.with(GOSSIP, NETWORK))
.start())
{
init(cluster);
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
cluster.schemaChange(withKeyspace("create index iii2 on %s.tbl (v)"));
for (int i = 0; i < 10000; i++)
cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (pk, ck, v) values (?, ?, ?)"), ConsistencyLevel.ALL, 1, i, i % 5);

// drop metadata change replication messages from node 1 to nodes 2 & 3
cluster.filters().verbs(Verb.TCM_REPLICATION.id,
Verb.TCM_FETCH_CMS_LOG_RSP.id,
Verb.TCM_FETCH_PEER_LOG_RSP.id,
Verb.TCM_CURRENT_EPOCH_REQ.id)
.from(1)
.drop()
.on();

// node1 makes some metadata changes interspersed with snapshots. When the message filters are dropped, this
// will cause nodes 2 & 3 to try and catchup. We want multiple snapshots to be taken to ensure that the
// catchup responses contain one, rather than just a list of entries.
cluster.get(1).nodetoolResult("cms", "snapshot").asserts().success();
cluster.get(1).nodetoolResult("cms", "snapshot").asserts().success();
final Epoch epoch = ClusterUtils.getCurrentEpoch(cluster.get(1));
// start executing
AtomicBoolean stop = new AtomicBoolean();
AtomicInteger queryFailures = new AtomicInteger();
Thread t = new Thread(() -> {
while (!stop.get())
{
try
{
cluster.coordinator(1).execute(withKeyspace("select * from %s.tbl where pk=1 and v=4"), ConsistencyLevel.ALL);
}
catch (Throwable e)
{
queryFailures.incrementAndGet();
}
}
});
t.start();
TimeUnit.SECONDS.sleep(1);
// drop the filters and wait for nodes 2 & 3 to catch up
cluster.filters().reset();
Awaitility.waitAtMost(30, TimeUnit.SECONDS)
.pollInterval(1, TimeUnit.SECONDS)
.until(() -> ClusterUtils.getCurrentEpoch(cluster.get(2)).isEqualOrAfter(epoch) &&
ClusterUtils.getCurrentEpoch(cluster.get(3)).isEqualOrAfter(epoch));

// give it another second of querying
TimeUnit.SECONDS.sleep(1);
stop.set(true);
t.join();
assertEquals(0, queryFailures.get());
}
}

public static class BBInstaller
{
public static void install(ClassLoader classLoader, int inst)
{
if (inst == 1)
return;
new ByteBuddy().rebase(CassandraIndexSearcher.class)
.method(named("queryIndex"))
.intercept(MethodDelegation.to(BBInterceptor.class))
.make()
.load(classLoader, ClassLoadingStrategy.Default.INJECTION);
}
}

public static class BBInterceptor
{
public static UnfilteredRowIterator queryIndex(DecoratedKey indexKey, ReadCommand command, ReadExecutionController executionController, @SuperCall Callable<UnfilteredRowIterator> zuper) throws Exception
{
// this makes it more likely that we decorate the key with one partitioner and execute the SinglePartitionReadCommand with another
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
return zuper.call();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.distributed.upgrade;

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import com.google.common.util.concurrent.Uninterruptibles;

import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bind.annotation.SuperCall;

import org.junit.Test;

import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.distributed.UpgradeableCluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.index.internal.CassandraIndexSearcher;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.junit.Assert.assertEquals;

public class ClusterMetadata2iInitializeTest extends UpgradeTestBase
{
@Test
public void initializeCMSWithConcurrentIndexReadsTest() throws Throwable
{
Consumer<UpgradeableCluster.Builder > builderUpdater = builder -> builder.withInstanceInitializer(BBInstaller::install);
new TestCase()
.nodes(3)
.withConfig((cfg) -> cfg.with(Feature.GOSSIP))
.withBuilder(builderUpdater)
.upgradesToCurrentFrom(v41)
.setup((cluster) -> {
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
cluster.schemaChange(withKeyspace("create index iii2 on %s.tbl (v)"));
for (int i = 0; i < 10000; i++)
cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (pk, ck, v) values (?, ?, ?)"), ConsistencyLevel.ALL, 1, i, i%5);
})
.runAfterClusterUpgrade((cluster) -> {
AtomicBoolean stop = new AtomicBoolean();
AtomicInteger queryFailures = new AtomicInteger();
Thread t = new Thread(() -> {
while (!stop.get())
{
try
{
cluster.coordinator(1).execute(withKeyspace("select * from %s.tbl where pk=1 and v=4"), ConsistencyLevel.ALL);
}
catch (Throwable e)
{
queryFailures.incrementAndGet();
}
}
});
t.start();
cluster.get(1).nodetoolResult("cms", "initialize").asserts().success();
stop.set(true);
t.join();
assertEquals(0, queryFailures.get());
}).run();
}

public static class BBInstaller
{
public static void install(ClassLoader classLoader, int inst)
{
if (inst == 1)
return;
new ByteBuddy().rebase(CassandraIndexSearcher.class)
.method(named("queryIndex"))
.intercept(MethodDelegation.to(BBInterceptor.class))
.make()
.load(classLoader, ClassLoadingStrategy.Default.INJECTION);
}
}

public static class BBInterceptor
{
public static UnfilteredRowIterator queryIndex(DecoratedKey indexKey, ReadCommand command, ReadExecutionController executionController, @SuperCall Callable<UnfilteredRowIterator> zuper) throws Exception
{
// this makes it more likely that we decorate the key with one partitioner and execute the SinglePartitionReadCommand with another
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
return zuper.call();
}
}
}