diff --git a/pom.xml b/pom.xml index 9d95527e976..100e86ba1ec 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 org.tikv tikv-client-java - 3.3.4-SNAPSHOT + 3.4.0 jar TiKV Java Client A Java Client for TiKV diff --git a/src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java b/src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java index 72422736e76..2a281d3e92a 100644 --- a/src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java +++ b/src/main/java/org/tikv/common/operation/iterator/ConcreteScanIterator.java @@ -76,7 +76,7 @@ TiRegion loadCurrentRegionToCache() throws GrpcException { try (RegionStoreClient client = builder.build(startKey)) { client.setTimeout(conf.getScanTimeout()); BackOffer backOffer = ConcreteBackOffer.newScannerNextMaxBackOff(); - currentCache = client.scan(backOffer, startKey, version); + currentCache = client.scan(backOffer, startKey, rangeEndKey, version, keyOnly); // If we get region before scan, we will use region from cache which // may have wrong end key. This may miss some regions that split from old region. // Client will get the newest region during scan. So we need to diff --git a/src/main/java/org/tikv/common/operation/iterator/ScanIterator.java b/src/main/java/org/tikv/common/operation/iterator/ScanIterator.java index 69fd0217fd5..f5ef54384cc 100644 --- a/src/main/java/org/tikv/common/operation/iterator/ScanIterator.java +++ b/src/main/java/org/tikv/common/operation/iterator/ScanIterator.java @@ -35,6 +35,7 @@ public abstract class ScanIterator implements Iterator { protected final RegionStoreClientBuilder builder; protected List currentCache; protected ByteString startKey; + protected ByteString rangeEndKey; protected int index = -1; protected int limit; protected boolean keyOnly; @@ -52,7 +53,8 @@ public abstract class ScanIterator implements Iterator { int limit, boolean keyOnly) { this.startKey = requireNonNull(startKey, "start key is null"); - this.endKey = Key.toRawKey(requireNonNull(endKey, "end key is null")); + this.rangeEndKey = requireNonNull(endKey, "end key is null"); + this.endKey = Key.toRawKey(this.rangeEndKey); this.hasEndKey = !endKey.isEmpty(); this.limit = limit; this.keyOnly = keyOnly; diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index 9a4ed807503..499001b918e 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -337,19 +337,35 @@ private List handleBatchGetResponse( public List scan( BackOffer backOffer, ByteString startKey, long version, boolean keyOnly) { + return scan(backOffer, startKey, ByteString.EMPTY, version, keyOnly); + } + + public List scan( + BackOffer backOffer, ByteString startKey, ByteString endKey, long version, boolean keyOnly) { boolean forWrite = false; while (true) { Supplier request = - () -> - ScanRequest.newBuilder() - .setContext( - makeContext( - getResolvedLocks(version), this.storeType, backOffer.getSlowLog())) - .setStartKey(codec.encodeKey(startKey)) - .setVersion(version) - .setKeyOnly(keyOnly) - .setLimit(getConf().getScanBatchSize()) - .build(); + () -> { + ScanRequest.Builder b = + ScanRequest.newBuilder() + .setContext( + makeContext( + getResolvedLocks(version), this.storeType, backOffer.getSlowLog())) + .setVersion(version) + .setKeyOnly(keyOnly) + .setLimit(getConf().getScanBatchSize()); + + // API version matters here: v2 transactional keys are keyspace-prefixed; encodeRange + // applies that encoding to both bounds. v1 passes raw keys and only sets start on the + // RPC in this client path + if (getConf().getApiVersion().isV2()) { + Pair range = codec.encodeRange(startKey, endKey); + b.setStartKey(range.first).setEndKey(range.second); + } else { + b.setStartKey(codec.encodeKey(startKey)); + } + return b.build(); + }; KVErrorHandler handler = new KVErrorHandler<>( @@ -367,7 +383,9 @@ public List scan( region = regionManager.getRegionByKey(startKey, backOffer); if (handleScanResponse(backOffer, resp, version, forWrite)) { - return resp.getPairsList(); + // Return logical user-space keys (same as batchGet) so scan iterators, prefix bounds, and + // get() stay consistent in API v2; wire format includes the keyspace prefix. + return codec.decodeKvPairs(resp.getPairsList()); } } } diff --git a/src/test/java/org/tikv/common/KVMockServer.java b/src/test/java/org/tikv/common/KVMockServer.java index ea09270cfc7..41c7a53d3ed 100644 --- a/src/test/java/org/tikv/common/KVMockServer.java +++ b/src/test/java/org/tikv/common/KVMockServer.java @@ -43,6 +43,7 @@ import java.util.TreeMap; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tikv.common.key.Key; @@ -362,9 +363,13 @@ public void kvScan( } } else { SortedMap kvs = dataMap.tailMap(key); + Stream> entryStream = kvs.entrySet().stream(); + if (!request.getEndKey().isEmpty()) { + Key rangeEnd = toRawKey(request.getEndKey()); + entryStream = entryStream.filter(entry -> entry.getKey().compareTo(rangeEnd) < 0); + } builder.addAllPairs( - kvs.entrySet() - .stream() + entryStream .map( kv -> { Kvrpcpb.KvPair.Builder kvBuilder =