diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index eee779685e87..ac7ccf8d0a6f 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -59,7 +59,10 @@ import java.util.Collections; import java.util.EnumMap; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -321,6 +324,8 @@ static class Builder extends AbstractReadContext.Builder transactionIdFuture; + private final AtomicInteger pendingStarts = new AtomicInteger(0); + private static final long WAIT_FOR_INLINE_BEGIN_TIMEOUT_MILLIS = 60_000L; + private final Map channelHint; + private final Options.BeginTransactionOption beginTransactionOption; MultiUseReadOnlyTransaction(Builder builder) { super(builder); @@ -386,6 +402,7 @@ static Builder newBuilder() { session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE), session.getSpanner().getOptions().isGrpcGcpExtensionEnabled()); + this.beginTransactionOption = builder.beginTransactionOption; } @Override @@ -398,21 +415,68 @@ protected boolean isRouteToLeader() { return false; } + private boolean shouldUseInlinedBegin() { + return beginTransactionOption == Options.BeginTransactionOption.INLINE; + } + @Override void beforeReadOrQuery() { super.beforeReadOrQuery(); - initTransaction(); + if (shouldUseInlinedBegin()) { + // Keep the same nested transaction guard as the explicit BeginTransaction path. This checks + // TransactionRunner's thread-local pending state, not the session's active transaction. + SessionImpl.throwIfTransactionsPending(); + } else { + initTransaction(); + } } @Override @Nullable TransactionSelector getTransactionSelector() { - // No need for synchronization: super.readInternal() is always preceded by a check of - // "transactionId" that provides a happens-before from initialization, and the value is never - // changed afterwards. - @SuppressWarnings("GuardedByChecker") - TransactionSelector selector = TransactionSelector.newBuilder().setId(transactionId).build(); - return selector; + if (!shouldUseInlinedBegin()) { + // No need for synchronization: super.readInternal() is always preceded by a check of + // "transactionId" that provides a happens-before from initialization, and the value is + // never changed afterwards. + @SuppressWarnings("GuardedByChecker") + TransactionSelector selector = + TransactionSelector.newBuilder().setId(transactionId).build(); + return selector; + } + + ApiFuture futureToWaitFor = null; + txnLock.lock(); + try { + if (transactionId != null) { + return TransactionSelector.newBuilder().setId(transactionId).build(); + } + if (transactionIdFuture == null) { + transactionIdFuture = SettableApiFuture.create(); + return TransactionSelector.newBuilder() + .setBegin(createReadOnlyTransactionOptions()) + .build(); + } + futureToWaitFor = transactionIdFuture; + } finally { + txnLock.unlock(); + } + + try { + return TransactionSelector.newBuilder() + .setId(futureToWaitFor.get(WAIT_FOR_INLINE_BEGIN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) + .build(); + } catch (ExecutionException e) { + throw SpannerExceptionFactory.asSpannerException(e.getCause()); + } catch (TimeoutException e) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.DEADLINE_EXCEEDED, + "Timeout while waiting for an inlined read-only transaction to be returned by another" + + " statement.", + e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw SpannerExceptionFactory.newSpannerExceptionForCancellation(null, e); + } } private void decrementPendingStartsAndSignal() { @@ -503,6 +567,80 @@ public ListenableAsyncResultSet executeQueryAsync(Statement statement, QueryOpti bufferRows); } + @Override + public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) { + Timestamp readTimestamp = null; + if (transaction.hasReadTimestamp()) { + try { + readTimestamp = Timestamp.fromProto(transaction.getReadTimestamp()); + } catch (IllegalArgumentException e) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.INTERNAL, "Bad value in transaction.read_timestamp metadata field", e); + } + } + if (shouldIncludeId && transaction.getId().isEmpty()) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.FAILED_PRECONDITION, NO_TRANSACTION_RETURNED_MSG); + } + txnLock.lock(); + try { + if (timestamp == null) { + if (readTimestamp == null) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field"); + } + timestamp = readTimestamp; + } + if (shouldIncludeId && transactionId == null) { + transactionId = transaction.getId(); + if (transactionIdFuture != null && !transactionIdFuture.isDone()) { + transactionIdFuture.set(transactionId); + } + } + } finally { + txnLock.unlock(); + } + } + + @Override + public SpannerException onError( + SpannerException e, boolean withBeginTransaction, boolean lastStatement) { + e = super.onError(e, withBeginTransaction, lastStatement); + if (withBeginTransaction) { + failTransactionIdFuture(e); + } + return e; + } + + @Override + public void onDone(boolean withBeginTransaction) { + if (withBeginTransaction) { + failTransactionIdFuture( + SpannerExceptionFactory.newSpannerException( + ErrorCode.FAILED_PRECONDITION, + "ResultSet was closed before a read-only transaction id was returned")); + } + super.onDone(withBeginTransaction); + } + + @Override + void onStartFailed(boolean withBeginTransaction, Throwable t) { + if (withBeginTransaction) { + failTransactionIdFuture(t); + } + } + + private void failTransactionIdFuture(Throwable t) { + txnLock.lock(); + try { + if (transactionIdFuture != null && !transactionIdFuture.isDone()) { + transactionIdFuture.setException(t); + } + } finally { + txnLock.unlock(); + } + } + @Override public Timestamp getReadTimestamp() { txnLock.lock(); @@ -544,6 +682,19 @@ public void close() { super.close(); } + private TransactionOptions createReadOnlyTransactionOptions() { + TransactionOptions.Builder options = TransactionOptions.newBuilder(); + if (timestamp != null) { + options + .getReadOnlyBuilder() + .setReadTimestamp(timestamp.toProto()) + .setReturnReadTimestamp(true); + } else { + bound.applyToBuilder(options.getReadOnlyBuilder()).setReturnReadTimestamp(true); + } + return options.build(); + } + /** * Initializes the transaction with the timestamp specified within MultiUseReadOnlyTransaction. * This is used only for fallback of PartitionQueryRequest and PartitionReadRequest with @@ -553,19 +704,10 @@ void initFallbackTransaction() { txnLock.lock(); try { span.addAnnotation("Creating Transaction"); - TransactionOptions.Builder options = TransactionOptions.newBuilder(); - if (timestamp != null) { - options - .getReadOnlyBuilder() - .setReadTimestamp(timestamp.toProto()) - .setReturnReadTimestamp(true); - } else { - bound.applyToBuilder(options.getReadOnlyBuilder()).setReturnReadTimestamp(true); - } final BeginTransactionRequest request = BeginTransactionRequest.newBuilder() .setSession(session.getName()) - .setOptions(options) + .setOptions(createReadOnlyTransactionOptions()) .build(); initTransactionInternal(request); } finally { @@ -589,12 +731,10 @@ void initTransaction() { return; } span.addAnnotation("Creating Transaction"); - TransactionOptions.Builder options = TransactionOptions.newBuilder(); - bound.applyToBuilder(options.getReadOnlyBuilder()).setReturnReadTimestamp(true); final BeginTransactionRequest request = BeginTransactionRequest.newBuilder() .setSession(session.getName()) - .setOptions(options) + .setOptions(createReadOnlyTransactionOptions()) .build(); initTransactionInternal(request); } finally { @@ -976,15 +1116,22 @@ CloseableIterator startStream( if (selector != null) { request.setTransaction(selector); } - SpannerRpc.StreamingCall call = - rpc.executeQuery( - request.build(), - stream.consumer(), - getTransactionChannelHint(), - requestId, - isRouteToLeader()); + boolean withBeginTransaction = request.getTransaction().hasBegin(); + SpannerRpc.StreamingCall call; + try { + call = + rpc.executeQuery( + request.build(), + stream.consumer(), + getTransactionChannelHint(), + requestId, + isRouteToLeader()); + } catch (RuntimeException | Error t) { + onStartFailed(withBeginTransaction, t); + throw t; + } session.markUsed(clock.instant()); - stream.setCall(call, request.getTransaction().hasBegin()); + stream.setCall(call, withBeginTransaction); return stream; } @@ -1100,6 +1247,8 @@ public void onDone(boolean withBeginTransaction) { this.session.onReadDone(); } + void onStartFailed(boolean withBeginTransaction, Throwable t) {} + /** * For transactions other than read-write, the MultiplexedSessionPrecommitToken will not be * present in the RPC response. In such cases, this method will be a no-op. @@ -1199,15 +1348,22 @@ CloseableIterator startStream( builder.setTransaction(selector); } builder.setRequestOptions(buildRequestOptions(readOptions)); - SpannerRpc.StreamingCall call = - rpc.read( - builder.build(), - stream.consumer(), - getTransactionChannelHint(), - requestId, - isRouteToLeader()); + boolean withBeginTransaction = builder.getTransaction().hasBegin(); + SpannerRpc.StreamingCall call; + try { + call = + rpc.read( + builder.build(), + stream.consumer(), + getTransactionChannelHint(), + requestId, + isRouteToLeader()); + } catch (RuntimeException | Error t) { + onStartFailed(withBeginTransaction, t); + throw t; + } session.markUsed(clock.instant()); - stream.setCall(call, /* withBeginTransaction= */ builder.getTransaction().hasBegin()); + stream.setCall(call, withBeginTransaction); return stream; } diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java index 8b2dcc31786a..0293c7248d79 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java @@ -18,6 +18,7 @@ import com.google.api.gax.rpc.ServerStream; import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Options.ReadOnlyTransactionOption; import com.google.cloud.spanner.Options.RpcPriority; import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; @@ -351,6 +352,33 @@ ServerStream batchWriteAtLeastOnce( */ ReadOnlyTransaction readOnlyTransaction(); + /** + * Returns a read-only transaction context in which multiple reads and/or queries can be performed + * using {@link TimestampBound#strong()} concurrency and the given read-only transaction options. + * + *

Options can include: + * + *

    + *
  • {@link Options#beginTransactionOption(Options.BeginTransactionOption)}: Controls whether + * the transaction is started by an explicit BeginTransaction RPC or by inlining + * BeginTransaction on the first read/query. Inlining can avoid one round trip for + * latency-sensitive transactions that start with a small or fast read/query. The default + * explicit BeginTransaction RPC can be preferable when the first read/query is expected to + * be large or slow. Avoid inlining when starting multiple reads/queries concurrently in the + * same transaction, as statements that do not include the inlined BeginTransaction must + * wait for the first statement to return the transaction id and can time out if that + * statement is slow. Also note that {@link ReadOnlyTransaction#getReadTimestamp()} is not + * available for an inlined transaction until the first read/query has returned transaction + * metadata, which normally happens when the first result is returned or the stream + * completes. + *
+ * + * @param options options for starting the read-only transaction + */ + default ReadOnlyTransaction readOnlyTransaction(ReadOnlyTransactionOption... options) { + return readOnlyTransaction(TimestampBound.strong(), options); + } + /** * Returns a read-only transaction context in which a multiple reads and/or queries can be * performed at the given timestamp bound. All reads/queries will use the same timestamp, and the @@ -384,6 +412,41 @@ ServerStream batchWriteAtLeastOnce( */ ReadOnlyTransaction readOnlyTransaction(TimestampBound bound); + /** + * Returns a read-only transaction context in which multiple reads and/or queries can be performed + * at the given timestamp bound and with the given read-only transaction options. + * + *

Options can include: + * + *

    + *
  • {@link Options#beginTransactionOption(Options.BeginTransactionOption)}: Controls whether + * the transaction is started by an explicit BeginTransaction RPC or by inlining + * BeginTransaction on the first read/query. Inlining can avoid one round trip for + * latency-sensitive transactions that start with a small or fast read/query. The default + * explicit BeginTransaction RPC can be preferable when the first read/query is expected to + * be large or slow. Avoid inlining when starting multiple reads/queries concurrently in the + * same transaction, as statements that do not include the inlined BeginTransaction must + * wait for the first statement to return the transaction id and can time out if that + * statement is slow. Also note that {@link ReadOnlyTransaction#getReadTimestamp()} is not + * available for an inlined transaction until the first read/query has returned transaction + * metadata, which normally happens when the first result is returned or the stream + * completes. + *
+ * + * @param bound the timestamp bound at which to perform the read + * @param options options for starting the read-only transaction + */ + default ReadOnlyTransaction readOnlyTransaction( + TimestampBound bound, ReadOnlyTransactionOption... options) { + Options readOnlyTransactionOptions = Options.fromReadOnlyTransactionOptions(options); + if (readOnlyTransactionOptions.beginTransactionOption() + == Options.BeginTransactionOption.EXPLICIT) { + return readOnlyTransaction(bound); + } + throw new UnsupportedOperationException( + "This DatabaseClient implementation does not support read-only transaction options"); + } + /** * Returns a transaction runner for executing a single logical transaction with retries. The * returned runner can only be used once. diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index bae8067e33dd..ecd82500a9c4 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -18,6 +18,7 @@ import com.google.api.gax.rpc.ServerStream; import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Options.ReadOnlyTransactionOption; import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SpannerImpl.ClosedException; @@ -218,21 +219,25 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) { @Override public ReadOnlyTransaction readOnlyTransaction() { - ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, databaseAttributes); - try (IScope s = tracer.withSpan(span)) { - return getMultiplexedSession().readOnlyTransaction(); - } catch (RuntimeException e) { - span.setStatus(e); - span.end(); - throw e; - } + return readOnlyTransaction(TimestampBound.strong()); + } + + @Override + public ReadOnlyTransaction readOnlyTransaction(ReadOnlyTransactionOption... options) { + return readOnlyTransaction(TimestampBound.strong(), options); } @Override public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { + return readOnlyTransaction(bound, new ReadOnlyTransactionOption[0]); + } + + @Override + public ReadOnlyTransaction readOnlyTransaction( + TimestampBound bound, ReadOnlyTransactionOption... options) { ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, databaseAttributes); try (IScope s = tracer.withSpan(span)) { - return getMultiplexedSession().readOnlyTransaction(bound); + return getMultiplexedSession().readOnlyTransaction(bound, options); } catch (RuntimeException e) { span.setStatus(e); span.end(); diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java index 81e29cfda487..a691d5969dfc 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java @@ -24,6 +24,7 @@ import com.google.cloud.Timestamp; import com.google.cloud.spanner.DelayedReadContext.DelayedReadOnlyTransaction; import com.google.cloud.spanner.MultiplexedSessionDatabaseClient.MultiplexedSessionTransaction; +import com.google.cloud.spanner.Options.ReadOnlyTransactionOption; import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.common.util.concurrent.MoreExecutors; @@ -109,25 +110,30 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) { @Override public ReadOnlyTransaction readOnlyTransaction() { - return new DelayedReadOnlyTransaction( - ApiFutures.transform( - this.sessionFuture, - sessionReference -> - new MultiplexedSessionTransaction( - client, span, sessionReference, NO_CHANNEL_HINT, /* singleUse= */ false) - .readOnlyTransaction(), - MoreExecutors.directExecutor())); + return readOnlyTransaction(TimestampBound.strong()); + } + + @Override + public ReadOnlyTransaction readOnlyTransaction(ReadOnlyTransactionOption... options) { + return readOnlyTransaction(TimestampBound.strong(), options); } @Override public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { + return readOnlyTransaction( + bound, Options.beginTransactionOption(Options.BeginTransactionOption.EXPLICIT)); + } + + @Override + public ReadOnlyTransaction readOnlyTransaction( + TimestampBound bound, ReadOnlyTransactionOption... options) { return new DelayedReadOnlyTransaction( ApiFutures.transform( this.sessionFuture, sessionReference -> new MultiplexedSessionTransaction( client, span, sessionReference, NO_CHANNEL_HINT, /* singleUse= */ false) - .readOnlyTransaction(bound), + .readOnlyTransaction(bound, options), MoreExecutors.directExecutor())); } diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index 0f1388c36f9c..ece6d862f87b 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -23,6 +23,7 @@ import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.ServerStream; import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Options.ReadOnlyTransactionOption; import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionClient.SessionConsumer; @@ -560,12 +561,25 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) { @Override public ReadOnlyTransaction readOnlyTransaction() { - return createMultiplexedSessionTransaction(/* singleUse= */ false).readOnlyTransaction(); + return readOnlyTransaction(TimestampBound.strong()); + } + + @Override + public ReadOnlyTransaction readOnlyTransaction(ReadOnlyTransactionOption... options) { + return readOnlyTransaction(TimestampBound.strong(), options); } @Override public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { - return createMultiplexedSessionTransaction(/* singleUse= */ false).readOnlyTransaction(bound); + return readOnlyTransaction( + bound, Options.beginTransactionOption(Options.BeginTransactionOption.EXPLICIT)); + } + + @Override + public ReadOnlyTransaction readOnlyTransaction( + TimestampBound bound, ReadOnlyTransactionOption... options) { + return createMultiplexedSessionTransaction(/* singleUse= */ false) + .readOnlyTransaction(bound, options); } @Override diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java index 116e1aa4fc5d..20cf3c015a4d 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java @@ -79,6 +79,22 @@ public static RpcOrderBy fromProto(OrderBy proto) { } } + /** Controls how a multi-use read-only transaction is started. */ + public enum BeginTransactionOption { + /** Execute a separate BeginTransaction RPC before any reads or queries. */ + EXPLICIT, + + /** + * Include BeginTransaction on the first read or query. This saves one round trip when the first + * operation is not slow and the transaction starts serially. It can be less performant than + * {@link #EXPLICIT} when multiple reads or queries are started concurrently, because only one + * operation can carry the BeginTransaction option and the others must wait until that operation + * returns transaction metadata. {@link ReadOnlyTransaction#getReadTimestamp()} is also not + * available until the first operation has returned transaction metadata. + */ + INLINE + } + public enum RpcLockHint { UNSPECIFIED(LockHint.LOCK_HINT_UNSPECIFIED), SHARED(LockHint.LOCK_HINT_SHARED), @@ -136,6 +152,9 @@ public interface QueryOption {} /** Marker interface to mark options applicable to write operations */ public interface TransactionOption {} + /** Marker interface to mark options applicable to multi-use read-only transactions. */ + public interface ReadOnlyTransactionOption {} + /** Marker interface to mark options applicable to update operation. */ public interface UpdateOption {} @@ -297,6 +316,14 @@ RequestOptions toRequestOptionsProto(boolean isTransactionOption) { return builder.build(); } + /** + * Specifies how a multi-use read-only transaction should be started. The default is {@link + * BeginTransactionOption#EXPLICIT}. + */ + public static ReadOnlyTransactionOption beginTransactionOption(BeginTransactionOption option) { + return new BeginTransactionOptionOption(Preconditions.checkNotNull(option)); + } + public static TransactionOption maxCommitDelay(Duration maxCommitDelay) { Preconditions.checkArgument(!maxCommitDelay.isNegative(), "maxCommitDelay should be positive"); return new MaxCommitDelayOption(maxCommitDelay); @@ -633,6 +660,7 @@ void appendToOptions(Options options) { private IsolationLevel isolationLevel; private XGoogSpannerRequestId reqId; private ReadLockMode readLockMode; + private BeginTransactionOption beginTransactionOption; // Construction is via factory methods below. private Options() {} @@ -805,6 +833,12 @@ ReadLockMode readLockMode() { return readLockMode; } + BeginTransactionOption beginTransactionOption() { + return beginTransactionOption == null + ? BeginTransactionOption.EXPLICIT + : beginTransactionOption; + } + @Override public String toString() { StringBuilder b = new StringBuilder(); @@ -1038,6 +1072,16 @@ static Options fromTransactionOptions(TransactionOption... options) { return transactionOptions; } + static Options fromReadOnlyTransactionOptions(ReadOnlyTransactionOption... options) { + Options transactionOptions = new Options(); + for (ReadOnlyTransactionOption option : options) { + if (option instanceof InternalOption) { + ((InternalOption) option).appendToOptions(transactionOptions); + } + } + return transactionOptions; + } + static Options fromListOptions(ListOption... options) { Options listOptions = new Options(); for (ListOption option : options) { @@ -1062,6 +1106,20 @@ private abstract static class InternalOption { abstract void appendToOptions(Options options); } + static final class BeginTransactionOptionOption extends InternalOption + implements ReadOnlyTransactionOption { + private final BeginTransactionOption beginTransactionOption; + + BeginTransactionOptionOption(BeginTransactionOption beginTransactionOption) { + this.beginTransactionOption = beginTransactionOption; + } + + @Override + void appendToOptions(Options options) { + options.beginTransactionOption = beginTransactionOption; + } + } + static class LimitOption extends InternalOption implements ReadOption { private final long limit; diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index e14d18fa8bf7..55e43a7f342d 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -27,6 +27,7 @@ import com.google.cloud.spanner.AbstractReadContext.SingleReadContext; import com.google.cloud.spanner.AbstractReadContext.SingleUseReadOnlyTransaction; import com.google.cloud.spanner.ErrorHandler.DefaultErrorHandler; +import com.google.cloud.spanner.Options.ReadOnlyTransactionOption; import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionClient.SessionOption; @@ -418,12 +419,25 @@ public ReadOnlyTransaction readOnlyTransaction() { return readOnlyTransaction(TimestampBound.strong()); } + @Override + public ReadOnlyTransaction readOnlyTransaction(ReadOnlyTransactionOption... options) { + return readOnlyTransaction(TimestampBound.strong(), options); + } + @Override public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { + return readOnlyTransaction(bound, new ReadOnlyTransactionOption[0]); + } + + @Override + public ReadOnlyTransaction readOnlyTransaction( + TimestampBound bound, ReadOnlyTransactionOption... options) { + Options readOnlyTransactionOptions = Options.fromReadOnlyTransactionOptions(options); return setActive( MultiUseReadOnlyTransaction.newBuilder() .setSession(this) .setTimestampBound(bound) + .setBeginTransactionOption(readOnlyTransactionOptions.beginTransactionOption()) .setRpc(spanner.getRpc()) .setDefaultQueryOptions(spanner.getDefaultQueryOptions(getDatabaseId())) .setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks()) diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java index 95e81cf7e6e5..65c9c9cadaad 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java @@ -45,8 +45,10 @@ import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.CommitResponse; +import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.Mutation.Write; import com.google.spanner.v1.PartialResultSet; +import com.google.spanner.v1.ReadRequest; import com.google.spanner.v1.RequestOptions; import com.google.spanner.v1.ResultSetMetadata; import com.google.spanner.v1.RollbackRequest; @@ -57,12 +59,19 @@ import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; import java.text.ParseException; +import java.util.ArrayList; import java.util.Calendar; import java.util.Collections; import java.util.GregorianCalendar; +import java.util.List; import java.util.Map; import java.util.TimeZone; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; import org.junit.Before; import org.junit.BeforeClass; @@ -97,6 +106,8 @@ public void setUp() { when(spannerOptions.getNumChannels()).thenReturn(4); when(spannerOptions.getDefaultTransactionOptions()) .thenReturn(TransactionOptions.getDefaultInstance()); + when(spannerOptions.getDefaultQueryOptions(Mockito.any(DatabaseId.class))) + .thenReturn(ExecuteSqlRequest.QueryOptions.getDefaultInstance()); when(spannerOptions.getPrefetchChunks()).thenReturn(1); when(spannerOptions.getDatabaseRole()).thenReturn("role"); when(spannerOptions.getRetrySettings()).thenReturn(RetrySettings.newBuilder().build()); @@ -628,6 +639,248 @@ private void mockRead(final PartialResultSet myResultSet) { }); } + private static PartialResultSet inlineBeginResultSet(String transactionId) throws ParseException { + com.google.protobuf.Timestamp timestamp = Timestamps.parse("2015-10-01T10:54:20.021Z"); + return PartialResultSet.newBuilder() + .setMetadata( + newMetadata(Type.struct(Type.StructField.of("C", Type.string()))).toBuilder() + .setTransaction( + Transaction.newBuilder() + .setId(ByteString.copyFromUtf8(transactionId)) + .setReadTimestamp(timestamp))) + .build(); + } + + private static PartialResultSet resultSetWithoutTransaction() { + return PartialResultSet.newBuilder() + .setMetadata(newMetadata(Type.struct(Type.StructField.of("C", Type.string())))) + .build(); + } + + private ReadOnlyTransaction inlineReadOnlyTransaction() { + return session.readOnlyTransaction( + TimestampBound.strong(), + Options.beginTransactionOption(Options.BeginTransactionOption.INLINE)); + } + + @Test + public void multiUseReadOnlyTransactionCanUseInlineBegin() throws ParseException { + PartialResultSet resultSet = inlineBeginResultSet("inline-tx"); + final ArgumentCaptor consumer = + ArgumentCaptor.forClass(SpannerRpc.ResultStreamConsumer.class); + final ArgumentCaptor request = ArgumentCaptor.forClass(ReadRequest.class); + Mockito.when(rpc.read(request.capture(), consumer.capture(), anyMap(), any(), eq(false))) + .then( + invocation -> { + consumer.getValue().onPartialResultSet(resultSet); + consumer.getValue().onCompleted(); + return new NoOpStreamingCall(); + }); + + try (ReadOnlyTransaction txn = inlineReadOnlyTransaction()) { + txn.readRow("Dummy", Key.of(), Collections.singletonList("C")); + txn.readRow("Dummy", Key.of(), Collections.singletonList("C")); + assertEquals( + Timestamp.fromProto(Timestamps.parse("2015-10-01T10:54:20.021Z")), + txn.getReadTimestamp()); + } + + Mockito.verify(rpc, Mockito.never()).beginTransaction(Mockito.any(), anyMap(), eq(false)); + assertEquals(2, request.getAllValues().size()); + assertThat(request.getAllValues().get(0).getTransaction().hasBegin()).isTrue(); + assertEquals( + ByteString.copyFromUtf8("inline-tx"), + request.getAllValues().get(1).getTransaction().getId()); + } + + @Test + public void multiUseReadOnlyTransactionCanUseInlineBeginForQuery() throws ParseException { + PartialResultSet resultSet = inlineBeginResultSet("inline-query-tx"); + final ArgumentCaptor consumer = + ArgumentCaptor.forClass(SpannerRpc.ResultStreamConsumer.class); + final ArgumentCaptor request = + ArgumentCaptor.forClass(ExecuteSqlRequest.class); + Mockito.when( + rpc.executeQuery(request.capture(), consumer.capture(), anyMap(), any(), eq(false))) + .then( + invocation -> { + consumer.getValue().onPartialResultSet(resultSet); + consumer.getValue().onCompleted(); + return new NoOpStreamingCall(); + }); + + try (ReadOnlyTransaction txn = inlineReadOnlyTransaction()) { + try (ResultSet rs = txn.executeQuery(Statement.of("SELECT 1"))) { + while (rs.next()) {} + } + try (ResultSet rs = txn.executeQuery(Statement.of("SELECT 1"))) { + while (rs.next()) {} + } + } + + Mockito.verify(rpc, Mockito.never()).beginTransaction(Mockito.any(), anyMap(), eq(false)); + assertEquals(2, request.getAllValues().size()); + assertThat(request.getAllValues().get(0).getTransaction().hasBegin()).isTrue(); + assertEquals( + ByteString.copyFromUtf8("inline-query-tx"), + request.getAllValues().get(1).getTransaction().getId()); + } + + @Test + public void multiUseReadOnlyTransactionInlineBeginFirstQueryErrorPropagates() { + SpannerException error = + SpannerExceptionFactory.newSpannerException(ErrorCode.INVALID_ARGUMENT, "bad query"); + final ArgumentCaptor consumer = + ArgumentCaptor.forClass(SpannerRpc.ResultStreamConsumer.class); + final ArgumentCaptor request = + ArgumentCaptor.forClass(ExecuteSqlRequest.class); + Mockito.when( + rpc.executeQuery(request.capture(), consumer.capture(), anyMap(), any(), eq(false))) + .then( + invocation -> { + consumer.getValue().onError(error); + return new NoOpStreamingCall(); + }); + + try (ReadOnlyTransaction txn = inlineReadOnlyTransaction()) { + try (ResultSet rs = txn.executeQuery(Statement.of("SELECT BAD"))) { + SpannerException e = assertThrows(SpannerException.class, () -> rs.next()); + assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode()); + } + SpannerException e = + assertThrows( + SpannerException.class, + () -> txn.readRow("Dummy", Key.of(), Collections.singletonList("C"))); + assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode()); + } + + Mockito.verify(rpc, Mockito.never()).beginTransaction(Mockito.any(), anyMap(), eq(false)); + assertEquals(1, request.getAllValues().size()); + assertThat(request.getAllValues().get(0).getTransaction().hasBegin()).isTrue(); + Mockito.verify(rpc, Mockito.never()) + .read(Mockito.any(), Mockito.any(), anyMap(), any(), eq(false)); + } + + @Test + public void multiUseReadOnlyTransactionInlineBeginConcurrentStartWaitsForTransactionId() + throws Exception { + PartialResultSet beginResultSet = inlineBeginResultSet("concurrent-inline-tx"); + PartialResultSet secondResultSet = resultSetWithoutTransaction(); + final List requests = Collections.synchronizedList(new ArrayList<>()); + final List consumers = + Collections.synchronizedList(new ArrayList<>()); + final AtomicInteger callCount = new AtomicInteger(); + final CountDownLatch firstRpcStarted = new CountDownLatch(1); + Mockito.when(rpc.read(Mockito.any(), Mockito.any(), anyMap(), any(), eq(false))) + .then( + invocation -> { + int call = callCount.incrementAndGet(); + ReadRequest readRequest = invocation.getArgument(0); + SpannerRpc.ResultStreamConsumer consumer = invocation.getArgument(1); + requests.add(readRequest); + consumers.add(consumer); + if (call == 1) { + firstRpcStarted.countDown(); + } else { + consumer.onPartialResultSet(secondResultSet); + consumer.onCompleted(); + } + return new NoOpStreamingCall(); + }); + + ExecutorService executor = Executors.newFixedThreadPool(2); + try (ReadOnlyTransaction txn = inlineReadOnlyTransaction()) { + Future first = + executor.submit(() -> txn.readRow("Dummy", Key.of(), Collections.singletonList("C"))); + assertThat(firstRpcStarted.await(5, TimeUnit.SECONDS)).isTrue(); + + Future second = + executor.submit(() -> txn.readRow("Dummy", Key.of(), Collections.singletonList("C"))); + Thread.sleep(100L); + assertThat(callCount.get()).isEqualTo(1); + assertThat(second.isDone()).isFalse(); + + consumers.get(0).onPartialResultSet(beginResultSet); + consumers.get(0).onCompleted(); + + assertThat(first.get(5, TimeUnit.SECONDS)).isNull(); + assertThat(second.get(5, TimeUnit.SECONDS)).isNull(); + } finally { + executor.shutdownNow(); + } + + Mockito.verify(rpc, Mockito.never()).beginTransaction(Mockito.any(), anyMap(), eq(false)); + assertEquals(2, requests.size()); + assertThat(requests.get(0).getTransaction().hasBegin()).isTrue(); + assertEquals( + ByteString.copyFromUtf8("concurrent-inline-tx"), requests.get(1).getTransaction().getId()); + } + + @Test + public void multiUseReadOnlyTransactionCanUseInlineBeginForReadAsync() throws Exception { + PartialResultSet resultSet = inlineBeginResultSet("async-inline-tx"); + final ArgumentCaptor consumer = + ArgumentCaptor.forClass(SpannerRpc.ResultStreamConsumer.class); + final ArgumentCaptor request = ArgumentCaptor.forClass(ReadRequest.class); + Mockito.when(rpc.read(request.capture(), consumer.capture(), anyMap(), any(), eq(false))) + .then( + invocation -> { + consumer.getValue().onPartialResultSet(resultSet); + consumer.getValue().onCompleted(); + return new NoOpStreamingCall(); + }); + + try (ReadOnlyTransaction txn = inlineReadOnlyTransaction()) { + try (AsyncResultSet rs = + txn.readAsync("Dummy", KeySet.all(), Collections.singletonList("C"))) { + rs.setCallback( + Runnable::run, + asyncResultSet -> { + while (asyncResultSet.tryNext() == AsyncResultSet.CursorState.OK) {} + return AsyncResultSet.CallbackResponse.CONTINUE; + }) + .get(5, TimeUnit.SECONDS); + } + assertEquals( + Timestamp.fromProto(Timestamps.parse("2015-10-01T10:54:20.021Z")), + txn.getReadTimestamp()); + } + + Mockito.verify(rpc, Mockito.never()).beginTransaction(Mockito.any(), anyMap(), eq(false)); + assertEquals(1, request.getAllValues().size()); + assertThat(request.getValue().getTransaction().hasBegin()).isTrue(); + } + + @Test + public void multiUseReadOnlyTransactionInlineBeginQueryCloseWithoutReadingDoesNotStartRpc() + throws ParseException { + PartialResultSet resultSet = inlineBeginResultSet("close-inline-tx"); + final ArgumentCaptor consumer = + ArgumentCaptor.forClass(SpannerRpc.ResultStreamConsumer.class); + final ArgumentCaptor request = ArgumentCaptor.forClass(ReadRequest.class); + Mockito.when(rpc.read(request.capture(), consumer.capture(), anyMap(), any(), eq(false))) + .then( + invocation -> { + consumer.getValue().onPartialResultSet(resultSet); + consumer.getValue().onCompleted(); + return new NoOpStreamingCall(); + }); + + try (ReadOnlyTransaction txn = inlineReadOnlyTransaction()) { + ResultSet rs = txn.executeQuery(Statement.of("SELECT 1")); + rs.close(); + Mockito.verify(rpc, Mockito.never()) + .executeQuery( + Mockito.any(ExecuteSqlRequest.class), Mockito.any(), anyMap(), any(), eq(false)); + + txn.readRow("Dummy", Key.of(), Collections.singletonList("C")); + } + + Mockito.verify(rpc, Mockito.never()).beginTransaction(Mockito.any(), anyMap(), eq(false)); + assertEquals(1, request.getAllValues().size()); + assertThat(request.getValue().getTransaction().hasBegin()).isTrue(); + } + @Test public void multiUseReadOnlyTransactionReturnsEmptyTransactionMetadata() { Transaction txnMetadata = Transaction.newBuilder().setId(ByteString.copyFromUtf8("x")).build();