1616
1717package com .google .cloud .executor .spanner ;
1818
19- import static com .google .cloud .spanner .TransactionRunner .TransactionCallable ;
20-
2119import com .google .api .gax .core .FixedCredentialsProvider ;
2220import com .google .api .gax .longrunning .OperationFuture ;
2321import com .google .api .gax .paging .Page ;
5553import com .google .cloud .spanner .Mutation .WriteBuilder ;
5654import com .google .cloud .spanner .Options ;
5755import com .google .cloud .spanner .Options .RpcPriority ;
56+ import com .google .cloud .spanner .Options .TransactionOption ;
5857import com .google .cloud .spanner .Partition ;
5958import com .google .cloud .spanner .PartitionOptions ;
6059import com .google .cloud .spanner .ReadContext ;
156155import com .google .spanner .executor .v1 .UpdateCloudDatabaseDdlAction ;
157156import com .google .spanner .executor .v1 .UpdateCloudInstanceAction ;
158157import com .google .spanner .v1 .StructType ;
158+ import com .google .spanner .v1 .TransactionOptions .IsolationLevel ;
159+ import com .google .spanner .v1 .TransactionOptions .ReadWrite .ReadLockMode ;
159160import com .google .spanner .v1 .TypeAnnotationCode ;
160161import com .google .spanner .v1 .TypeCode ;
161162import io .grpc .Status ;
174175import java .time .Duration ;
175176import java .time .LocalDate ;
176177import java .util .ArrayList ;
178+ import java .util .HashMap ;
177179import java .util .List ;
178180import java .util .Map ;
179181import java .util .Objects ;
@@ -248,40 +250,60 @@ public static String unexpectedExceptionResponse(Exception e) {
248250 * again.
249251 */
250252 private static class ReadWriteTransaction {
253+
251254 private final DatabaseClient dbClient ;
252255 private TransactionRunner runner ;
253256 private TransactionContext txnContext ;
254257 private com .google .protobuf .Timestamp timestamp ;
255258 private Mode finishMode ;
259+ private SpannerException abortedException ;
256260 private SpannerException error ;
257261 private final String transactionSeed ;
258262 private final boolean optimistic ;
263+ private final boolean repeatableRead ;
259264 // Set to true when the transaction runner completed, one of these three could happen: runner
260265 // committed, abandoned or threw an error.
261266 private boolean runnerCompleted ;
262267
263268 public ReadWriteTransaction (
264- DatabaseClient dbClient , String transactionSeed , boolean optimistic ) {
269+ DatabaseClient dbClient ,
270+ String transactionSeed ,
271+ boolean optimistic ,
272+ boolean repeatableRead ) {
265273 this .dbClient = dbClient ;
266274 this .transactionSeed = transactionSeed ;
267275 this .optimistic = optimistic ;
276+ this .repeatableRead = repeatableRead ;
268277 this .runnerCompleted = false ;
269278 }
270279
271280 /** Set context to be used for executing actions. */
272281 private synchronized void setContext (TransactionContext transaction ) {
273282 finishMode = null ;
283+ abortedException = null ;
274284 txnContext = transaction ;
275285 Preconditions .checkNotNull (txnContext );
276286 LOGGER .log (Level .INFO , "Transaction callable created, setting context %s\n " , transactionSeed );
277287 notifyAll ();
278288 }
279289
290+ private synchronized void setAborted (SpannerException abortedException ) {
291+ LOGGER .log (Level .INFO , "Got aborted exception %s\n " , abortedException .toString ());
292+ this .abortedException = abortedException ;
293+ notifyAll ();
294+ }
295+
280296 /** Wait for finishAction to be executed and return the requested finish mode. */
281- private synchronized Mode waitForFinishAction () throws Exception {
282- while (finishMode == null ) {
297+ private synchronized Mode waitForFinishActionOrAbort () throws Exception {
298+ while (finishMode == null && abortedException == null ) {
283299 wait ();
284300 }
301+ // If a read aborted, throw the exception to the TransactionRunner callable to
302+ // restart the transaction.
303+ if (abortedException != null ) {
304+ LOGGER .log (Level .INFO , "Throw aborted exception %s\n " , abortedException .toString ());
305+ throw abortedException ;
306+ }
285307 return finishMode ;
286308 }
287309
@@ -320,9 +342,27 @@ public synchronized com.google.protobuf.Timestamp getTimestamp() {
320342 return timestamp ;
321343 }
322344
323- /** Return the transactionContext to run actions. Must be called after start action . */
345+ /** Return the transactionContext to run actions, waiting until it is set . */
324346 public synchronized TransactionContext getContext () {
325- Preconditions .checkState (txnContext != null );
347+ while (txnContext == null || abortedException != null ) {
348+ // If the transaction was aborted by a read action, the abortedException will
349+ // be thrown to the TransactionRunner callable to restart the transaction.
350+ // The restarted callable will call setContext() to set the new transaction context
351+ // and clear abortedException.
352+ if (abortedException != null ) {
353+ LOGGER .log (Level .INFO , "Waiting for new RW transaction context after abort\n " );
354+ } else {
355+ LOGGER .log (Level .INFO , "Waiting for RW transaction context." );
356+ }
357+ try {
358+ wait ();
359+ } catch (InterruptedException e ) {
360+ LOGGER .log (Level .INFO , "Interrupted while waiting for RW transaction context." );
361+ Thread .currentThread ().interrupt ();
362+ throw SpannerExceptionFactory .newSpannerException (
363+ ErrorCode .CANCELLED , "Interrupted while waiting for transaction context" , e );
364+ }
365+ }
326366 return txnContext ;
327367 }
328368
@@ -339,7 +379,7 @@ public void startRWTransaction() throws Exception {
339379 String .format (
340380 "Transaction context set, executing and waiting for finish %s\n " ,
341381 transactionSeed ));
342- Mode mode = waitForFinishAction ();
382+ Mode mode = waitForFinishActionOrAbort ();
343383 if (mode == Mode .ABANDON ) {
344384 throw new Exception (TRANSACTION_ABANDONED );
345385 }
@@ -351,10 +391,21 @@ public void startRWTransaction() throws Exception {
351391 context .wrap (
352392 () -> {
353393 try {
394+ List <TransactionOption > transactionOptions = new ArrayList <>();
395+ if (repeatableRead ) {
396+ transactionOptions .add (Options .isolationLevel (IsolationLevel .REPEATABLE_READ ));
397+ } else {
398+ transactionOptions .add (Options .isolationLevel (IsolationLevel .SERIALIZABLE ));
399+ }
400+ if (optimistic ) {
401+ transactionOptions .add (Options .readLockMode (ReadLockMode .OPTIMISTIC ));
402+ } else {
403+ transactionOptions .add (Options .readLockMode (ReadLockMode .PESSIMISTIC ));
404+ }
354405 runner =
355- optimistic
356- ? dbClient . readWriteTransaction ( Options . optimisticLock ())
357- : dbClient . readWriteTransaction ( );
406+ dbClient . readWriteTransaction (
407+ transactionOptions . toArray (
408+ new TransactionOption [ transactionOptions . size ()]) );
358409 LOGGER .log (
359410 Level .INFO , String .format ("Ready to run callable %s\n " , transactionSeed ));
360411 runner .run (callable );
@@ -397,7 +448,7 @@ public synchronized boolean finish(Mode finishMode) throws Exception {
397448 "TxnContext cleared, sending finishMode to finish transaction %s\n " ,
398449 transactionSeed ));
399450 notifyAll ();
400- // Wait for the transaction to finish or restart
451+ // Wait for the transaction to finish or restart due to an abort on COMMIT.
401452 while (txnContext == null && !runnerCompleted ) {
402453 wait ();
403454 }
@@ -434,6 +485,7 @@ public synchronized boolean finish(Mode finishMode) throws Exception {
434485 * initialized.
435486 */
436487 class ExecutionFlowContext {
488+
437489 // Database path from previous action
438490 private String prevDbPath ;
439491 // Current read-write transaction
@@ -448,9 +500,6 @@ class ExecutionFlowContext {
448500 private Metadata metadata ;
449501 // Number of pending read/query actions.
450502 private int numPendingReads ;
451- // Indicate whether there's a read/query action got aborted and the transaction need to be
452- // reset.
453- private boolean readAborted ;
454503 // Log the workid and op pair for tracing the thread.
455504 private String transactionSeed ;
456505 // Outgoing stream.
@@ -588,7 +637,11 @@ public synchronized void startReadWriteTxn(
588637 String .format (
589638 "There's no active transaction, safe to create rwTxn: %s\n " , getTransactionSeed ()));
590639 this .metadata = metadata ;
591- rwTxn = new ReadWriteTransaction (dbClient , transactionSeed , options .getOptimistic ());
640+ boolean optimistic =
641+ options .getSerializableOptimistic () || options .getSnapshotIsolationOptimistic ();
642+ boolean repeatableRead =
643+ options .getSnapshotIsolationOptimistic () || options .getSnapshotIsolationPessimistic ();
644+ rwTxn = new ReadWriteTransaction (dbClient , transactionSeed , optimistic , repeatableRead );
592645 LOGGER .log (
593646 Level .INFO ,
594647 String .format (
@@ -644,20 +697,17 @@ public synchronized void startRead() {
644697 * Decrease the read count when a read/query is finished, if status is aborted and there's no
645698 * pending read/query, reset the transaction for retry.
646699 */
647- public synchronized void finishRead (Status status ) {
700+ public synchronized void finishRead (Status status , SpannerException e ) {
648701 if (status .getCode () == Status .ABORTED .getCode ()) {
649- readAborted = true ;
702+ if (rwTxn != null ) {
703+ rwTxn .setAborted (e );
704+ }
650705 }
651706 --numPendingReads ;
652- if (readAborted && numPendingReads <= 0 ) {
653- LOGGER .log (Level .FINE , "Transaction reset due to read/query abort" );
654- readAborted = false ;
655- }
656707 }
657708
658709 /** Initialize the read count and aborted status when transaction started. */
659710 public synchronized void initReadState () {
660- readAborted = false ;
661711 numPendingReads = 0 ;
662712 }
663713
@@ -724,6 +774,12 @@ public synchronized Status finish(Mode finishMode, OutcomeSender sender) {
724774 if (rwTxn .getTimestamp () != null ) {
725775 outcomeBuilder .setCommitTime (rwTxn .getTimestamp ());
726776 }
777+ if (finishMode == Mode .COMMIT
778+ && rwTxn .runner .getCommitResponse ().getSnapshotTimestamp () != null ) {
779+ outcomeBuilder .setSnapshotIsolationTxnReadTimestamp (
780+ Timestamps .toMicros (
781+ rwTxn .runner .getCommitResponse ().getSnapshotTimestamp ().toProto ()));
782+ }
727783 clear ();
728784 }
729785 }
@@ -761,7 +817,7 @@ public synchronized void closeBatchTxn() throws SpannerException {
761817 }
762818
763819 private Spanner client ;
764- private Spanner clientWithTimeout ;
820+ private Map < Long , Spanner > clientWithTimeoutMap = new HashMap <>() ;
765821
766822 private static final String TRANSACTION_ABANDONED = "Fake error to abandon transaction" ;
767823
@@ -782,23 +838,25 @@ public synchronized void closeBatchTxn() throws SpannerException {
782838
783839 private synchronized Spanner getClientWithTimeout (
784840 long timeoutSeconds , boolean useMultiplexedSession ) throws IOException {
785- if (clientWithTimeout != null ) {
786- return clientWithTimeout ;
841+ if (clientWithTimeoutMap . containsKey ( timeoutSeconds ) ) {
842+ return clientWithTimeoutMap . get ( timeoutSeconds ) ;
787843 }
788- clientWithTimeout = getClient (timeoutSeconds , useMultiplexedSession );
789- return clientWithTimeout ;
844+ clientWithTimeoutMap .put (
845+ timeoutSeconds , initializeClient (timeoutSeconds , useMultiplexedSession ));
846+ return clientWithTimeoutMap .get (timeoutSeconds );
790847 }
791848
792849 private synchronized Spanner getClient (boolean useMultiplexedSession ) throws IOException {
793850 if (client != null ) {
794851 return client ;
795852 }
796- client = getClient (/* timeoutSeconds= */ 0 , useMultiplexedSession );
853+ client = initializeClient (/* timeoutSeconds= */ 0 , useMultiplexedSession );
797854 return client ;
798855 }
799856
800- // Return the spanner client, create one if not exists.
801- private synchronized Spanner getClient (long timeoutSeconds , boolean useMultiplexedSession )
857+ // Initializes a newly created spanner client. NEVER CALL THIS METHOD DIRECTLY.
858+ // ALWAYS CALL getClientWithTimeout() or getClient() INSTEAD.
859+ private synchronized Spanner initializeClient (long timeoutSeconds , boolean useMultiplexedSession )
802860 throws IOException {
803861 // Create a cloud spanner client
804862 Credentials credentials ;
@@ -2807,7 +2865,7 @@ private Status processResults(
28072865 Level .INFO ,
28082866 String .format (
28092867 "Successfully processed result: %s\n " , executionContext .getTransactionSeed ()));
2810- executionContext .finishRead (Status .OK );
2868+ executionContext .finishRead (Status .OK , null );
28112869 return sender .finishWithOK ();
28122870 } catch (SpannerException e ) {
28132871 LOGGER .log (Level .WARNING , "Encountered exception: " , e );
@@ -2817,7 +2875,7 @@ private Status processResults(
28172875 String .format (
28182876 "Encountered exception: %s %s\n " ,
28192877 status .getDescription (), executionContext .getTransactionSeed ()));
2820- executionContext .finishRead (status );
2878+ executionContext .finishRead (status , e );
28212879 if (status .getCode () == Status .ABORTED .getCode ()) {
28222880 return sender .finishWithTransactionRestarted ();
28232881 } else {
0 commit comments