diff --git a/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java b/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java index 35884cb74319..59814363548a 100644 --- a/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java +++ b/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java @@ -115,6 +115,9 @@ private enum PoolState { @GuardedBy("this") private int consecutiveFailures = 0; + @GuardedBy("this") + private boolean isDraining = false; + /** * When the client fallback to a non-session AFE session creation will return unimplemented * errors. In which case the requests should fallback to classic client instead of waiting for an @@ -525,17 +528,25 @@ private void onSessionClose( @GuardedBy("this") private void tryDrainPendingRpcs() { - while (!pendingRpcs.isEmpty()) { - if (pendingRpcs.peek().isCancelled) { - pendingRpcs.pop(); - continue; - } - Optional handle = picker.pickSession(); - if (!handle.isPresent()) { - break; + if (isDraining) { + return; + } + isDraining = true; + try { + while (!pendingRpcs.isEmpty()) { + if (pendingRpcs.peek().isCancelled) { + pendingRpcs.pop(); + continue; + } + Optional handle = picker.pickSession(); + if (!handle.isPresent()) { + break; + } + PendingVRpc rpc = pendingRpcs.removeFirst(); + rpc.drainTo(handle.get()); } - PendingVRpc rpc = pendingRpcs.removeFirst(); - rpc.drainTo(handle.get()); + } finally { + isDraining = false; } }