Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,40 @@ public class GcpManagedChannel extends ManagedChannel {
public static final CallOptions.Key<Integer> CHANNEL_ID_KEY =
CallOptions.Key.create("GcpChannelId");

/**
 * CallOptions key for sticky channel routing without affinity-key map state.
 *
 * <p>When a call's options carry a {@link ChannelAffinityRef} under this key, {@code newCall}
 * picks the channel through {@code getChannelRefByAffinityRef}, using the state stored in that
 * reference.
 */
public static final CallOptions.Key<ChannelAffinityRef> CHANNEL_AFFINITY_REF_KEY =
CallOptions.Key.create("GcpChannelAffinityRef");

/**
 * Caller-owned, opaque handle for sticky channel routing, so that callers do not have to depend
 * on {@link ChannelRef}.
 *
 * <p>The whole state lives in one {@link AtomicInteger}: the remembered channel id and a one-shot
 * "use a different channel" flag are packed into the same 32-bit value.
 */
public static final class ChannelAffinityRef {
  // Bit 31 only.
  private static final int USE_DIFFERENT_CHANNEL_ON_NEXT_CALL_MASK = Integer.MIN_VALUE;
  // Bits 0..30, i.e. everything except the flag bit.
  private static final int CHANNEL_ID_MASK = Integer.MAX_VALUE;
  private static final int NO_CHANNEL_ID = -1;

  // Packed state, kept in a single atomic so the hot path needs one allocation:
  //   bit 31     : use a different active channel on the next call.
  //   bits 0..30 : channel id + 1; zero means no channel id is stored.
  private final AtomicInteger state = new AtomicInteger();

  /** Forces the next RPC to prefer a different active channel if one is available. */
  public void useDifferentChannelOnNextCall() {
    // Atomically OR the flag bit in; the stored channel id bits are left as they are.
    state.getAndAccumulate(
        USE_DIFFERENT_CHANNEL_ON_NEXT_CALL_MASK, (current, flagBit) -> current | flagBit);
  }

  /** Decodes the channel id from a packed state, or {@code NO_CHANNEL_ID} when none is stored. */
  private static int channelIdFromState(int state) {
    int idPlusOne = state & CHANNEL_ID_MASK;
    if (idPlusOne == 0) {
      return NO_CHANNEL_ID;
    }
    return idPlusOne - 1;
  }

  /** Tells whether the "use a different channel" flag bit is set in a packed state. */
  private static boolean useDifferentChannelOnNextCallFromState(int state) {
    int flagBit = state & USE_DIFFERENT_CHANNEL_ON_NEXT_CALL_MASK;
    return flagBit != 0;
  }

  /** Encodes a channel id as a packed state with the flag bit cleared. */
  private static int stateFromChannelId(int channelId) {
    int idPlusOne = channelId + 1;
    return idPlusOne & CHANNEL_ID_MASK;
  }
}

@GuardedBy("this")
private Integer bindingIndex = -1;

Expand Down Expand Up @@ -140,6 +174,7 @@ public class GcpManagedChannel extends ManagedChannel {

// The channel pool.
@VisibleForTesting final List<ChannelRef> channelRefs = new CopyOnWriteArrayList<>();
// Lookup of channels by channel id, used by getChannelRefByAffinityRef. Entries are put when a
// channel is created or reused (createNewChannel) and removed when a removed channel is shut
// down (checkScaleDown).
private final Map<Integer, ChannelRef> channelIdToChannelRef = new ConcurrentHashMap<>();
// A set of channels that we removed from the pool and wait for their RPCs to be completed before
// we can shut them down.
final Set<ChannelRef> removedChannelRefs = new HashSet<>();
Expand Down Expand Up @@ -352,6 +387,7 @@ private synchronized void checkScaleDown() {
channelRef.getChannel().shutdown();
// Remove channel from broken channels map.
fallbackMap.remove(channelRef.getId());
channelIdToChannelRef.remove(channelRef.getId());
}
}

Expand All @@ -372,6 +408,7 @@ private void removeOldestChannels(int num) {

for (ChannelRef channelRef : channelsToRemove) {
channelRef.resetAffinityCount();
channelRef.deactivate();
if (channelRef.getState() == ConnectivityState.READY) {
decReadyChannels(false);
}
Expand Down Expand Up @@ -1678,6 +1715,58 @@ protected ChannelRef getChannelRef(@Nullable String key) {
return mappedChannel;
}

/**
* Pick a {@link ChannelRef} using a caller-owned reference instead of grpc-gcp's affinity map.
*/
protected ChannelRef getChannelRefByAffinityRef(ChannelAffinityRef affinityRef) {
maybeDynamicUpscale();
while (true) {
int state = affinityRef.state.get();
int channelId = ChannelAffinityRef.channelIdFromState(state);
boolean useDifferentChannel =
ChannelAffinityRef.useDifferentChannelOnNextCallFromState(state);
ChannelRef channelRef =
channelId == ChannelAffinityRef.NO_CHANNEL_ID
? null
: channelIdToChannelRef.get(channelId);
if (!useDifferentChannel && channelRef != null && channelRef.isActive()) {
return channelRef;
}

ChannelRef selectedChannelRef =
useDifferentChannel
? pickLeastBusyChannelDifferentFrom(channelRef)
: pickLeastBusyChannel(/* forFallback= */ false);
if (affinityRef.state.compareAndSet(
state, ChannelAffinityRef.stateFromChannelId(selectedChannelRef.getId()))) {
return selectedChannelRef;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

If selectedChannelRef is null, this method can return null. The caller, newCall, passes this to the GcpClientCall constructor, which will cause a NullPointerException as it does not expect a null ChannelRef. This method should throw an exception if no channel is available instead of returning null to maintain the expected invariant.

References
  1. Avoid adding defensive null checks for values that are guaranteed to be non-null by design, as this can hide invariant breaks. Instead, ensure the method returns a valid object or throws an exception.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neither pickLeastBusyChannel nor pickLeastBusyChannelDifferentFrom can return null — empty-pool case throws on channelRefs.get(0) long before this point, so the NPE path doesn't exist.

}
}
Comment on lines +1723 to +1744
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The useDifferentChannelOnNextCall.getAndSet(false) call is located inside the while (true) loop. If the compareAndSet operation fails due to contention, the useDifferentChannel flag will be false in the next iteration, potentially losing the intent to pick a different channel. Moving the getAndSet call outside the loop ensures the flag is correctly handled even if retries occur. Additionally, once the reference has changed (causing the CAS to fail), the "different channel" requirement is effectively satisfied for the next iteration.

    boolean useDifferentChannel = affinityRef.useDifferentChannelOnNextCall.getAndSet(false);
    while (true) {
      ChannelRef channelRef = affinityRef.channelRef.get();
      if (!useDifferentChannel && channelRef != null && channelRef.isActive()) {
        return channelRef;
      }

      ChannelRef selectedChannelRef =
          useDifferentChannel
              ? pickLeastBusyChannelDifferentFrom(channelRef)
              : pickLeastBusyChannel(/* forFallback= */ false);
      if (affinityRef.channelRef.compareAndSet(channelRef, selectedChannelRef)) {
        return selectedChannelRef;
      }
      useDifferentChannel = false;
    }
References
  1. In performance-sensitive code, prefer using explicit locks or atomic operations over the 'synchronized' keyword to protect shared state while ensuring thread safety and visibility.
  2. Ensure that state flags in channel management logic are thread-safe, using constructs like AtomicBoolean or AtomicInteger.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both versions (current and suggested) consume the flag once on the first iteration and drop it afterwards — the suggested rewrite has the same behavior, just with extra lines.

}

private ChannelRef pickLeastBusyChannelDifferentFrom(@Nullable ChannelRef excludedChannelRef) {
ChannelRef channelRef = pickLeastBusyChannel(/* forFallback= */ false);
if (excludedChannelRef == null || channelRefs.size() <= 1) {
return channelRef;
}
if (channelRef != excludedChannelRef && channelRef.isActive()) {
return channelRef;
}
ChannelRef leastBusyChannelRef = null;
int leastBusyStreams = Integer.MAX_VALUE;
for (ChannelRef candidate : channelRefs) {
if (candidate == excludedChannelRef || !candidate.isActive()) {
continue;
}
int streams = candidate.getActiveStreamsCount();
if (leastBusyChannelRef == null || streams < leastBusyStreams) {
leastBusyChannelRef = candidate;
leastBusyStreams = streams;
}
}
Comment on lines +1757 to +1766
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The fallback loop in pickLeastBusyChannelDifferentFrom returns the first candidate that is not the excluded one, which might not be the least busy channel. Additionally, it should verify that the candidate is still active using candidate.isActive(), as channels might be marked inactive before being removed from the channelRefs list.

Suggested change
for (ChannelRef candidate : channelRefs) {
if (candidate != excludedChannelRef) {
return candidate;
}
}
for (ChannelRef candidate : channelRefs) {
if (candidate != excludedChannelRef && candidate.isActive()) {
return candidate;
}
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stale review

return leastBusyChannelRef == null ? channelRef : leastBusyChannelRef;
}

// Create a new channel and add it to channelRefs.
// If we have a ready channel not in the pool that we wait for completing its RPCs,
// then re-use that channel instead.
Expand All @@ -1688,6 +1777,8 @@ ChannelRef createNewChannel() {
ChannelRef chRef = reusedChannelRef.get();
channelRefs.add(chRef);
removedChannelRefs.remove(chRef);
channelIdToChannelRef.put(chRef.getId(), chRef);
chRef.activate();
logger.finer(log("Channel %d reused.", chRef.getId()));
incReadyChannels(false);
maxChannels.accumulateAndGet(getNumberOfChannels(), Math::max);
Expand All @@ -1696,6 +1787,7 @@ ChannelRef createNewChannel() {

ChannelRef channelRef = new ChannelRef(delegateChannelBuilder.build());
channelRefs.add(channelRef);
channelIdToChannelRef.put(channelRef.getId(), channelRef);
logger.finer(log("Channel %d created.", channelRef.getId()));
maxChannels.accumulateAndGet(getNumberOfChannels(), Math::max);
return channelRef;
Expand Down Expand Up @@ -1961,6 +2053,12 @@ public String authority() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions) {
ChannelAffinityRef channelAffinityRef = callOptions.getOption(CHANNEL_AFFINITY_REF_KEY);
if (channelAffinityRef != null) {
return new GcpClientCall.SimpleGcpClientCall<>(
this, getChannelRefByAffinityRef(channelAffinityRef), methodDescriptor, callOptions);
Comment on lines +2058 to +2059
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

getChannelRefByAffinityRef can return null if no channels are available (e.g., if pickLeastBusyChannel returns null). This would cause a NullPointerException inside the SimpleGcpClientCall constructor, which likely doesn't expect a null ChannelRef.

The existing code for handling affinity by key (getChannelRef(affinityKey)) checks for a null ChannelRef and returns a NoopGcpClientCall in that case. A similar null check should be added here to gracefully handle the case where no channel is available.

      ChannelRef channelRef = getChannelRefByAffinityRef(channelAffinityRef);
      if (channelRef == null) {
        logger.log(Level.WARNING, log("No channel available for affinity ref, this should not happen"));
        return new GcpClientCall.NoopGcpClientCall<>(
            Status.UNAVAILABLE.withDescription(
                "No channel available for affinity ref, this should not happen"));
      }
      return new GcpClientCall.SimpleGcpClientCall<>(this, channelRef, methodDescriptor, callOptions);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed:
pickLeastBusyChannel never returns null and NoopGcpClientCall doesn't exist — same null-handling (none) as the sibling getChannelRef(key) path, so no change needed.

}

if (callOptions.getOption(DISABLE_AFFINITY_KEY)
|| DISABLE_AFFINITY_CTX_KEY.get(Context.current())) {
if (logger.isLoggable(Level.FINEST)) {
Expand Down Expand Up @@ -2314,6 +2412,7 @@ protected class ChannelRef {
private final AtomicLong okCalls = new AtomicLong();
private final AtomicLong errCalls = new AtomicLong();
private final ChannelStateMonitor channelStateMonitor;
private volatile boolean active = true;

protected ChannelRef(ManagedChannel channel) {
this(channel, 0, 0);
Expand Down Expand Up @@ -2343,6 +2442,18 @@ protected int getId() {
return channelId;
}

/** Returns the current value of this channel's {@code active} flag. */
protected boolean isActive() {
  return this.active;
}

/** Sets the {@code active} flag, e.g. when a previously removed channel is reused. */
private void activate() {
  this.active = true;
}

/** Clears the {@code active} flag, e.g. when the channel is picked for removal from the pool. */
private void deactivate() {
  this.active = false;
}

protected void affinityCountIncr() {
int count = affinityCount.incrementAndGet();
maxAffinity.accumulateAndGet(count, Math::max);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.core.ExecutorProvider;
import com.google.cloud.Timestamp;
import com.google.cloud.grpc.GcpManagedChannel.ChannelAffinityRef;
import com.google.cloud.spanner.AbstractResultSet.CloseableIterator;
import com.google.cloud.spanner.AsyncResultSet.CallbackResponse;
import com.google.cloud.spanner.AsyncResultSet.ReadyCallback;
Expand Down Expand Up @@ -56,8 +57,6 @@
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TransactionOptions;
import com.google.spanner.v1.TransactionSelector;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -209,19 +208,11 @@ private SingleReadContext(Builder builder) {
// of a channel hint. GAX will automatically choose a hint when used
// with a multiplexed session to perform a round-robin channel selection. We are
// passing a hint here to prefer random channel selection instead of doing GAX round-robin.
// Also signal unbind so the grpc-gcp affinity map entry is cleaned up once the call
// completes. The unbind flag is preserved on retries via prepareRetryOnDifferentGrpcChannel.
this.channelHint =
getChannelHintOptions(
session.getOptions(),
ThreadLocalRandom.current().nextLong(Long.MAX_VALUE),
session.getSpanner().getOptions().isGrpcGcpExtensionEnabled());
if (this.channelHint != null) {
Map<SpannerRpc.Option, Object> mutable = new EnumMap<>(SpannerRpc.Option.class);
mutable.putAll(this.channelHint);
mutable.put(SpannerRpc.Option.UNBIND_CHANNEL_HINT, Boolean.TRUE);
this.channelHint = Collections.unmodifiableMap(mutable);
}
}

@Override
Expand Down Expand Up @@ -256,12 +247,10 @@ TransactionSelector getTransactionSelector() {

@Override
boolean prepareRetryOnDifferentGrpcChannel() {
if (session.getIsMultiplexed() && channelHint.get(Option.CHANNEL_HINT) != null) {
long channelHintForTransaction = Option.CHANNEL_HINT.getLong(channelHint) + 1L;
channelHint =
optionMap(
SessionOption.channelHint(channelHintForTransaction),
SessionOption.unbindChannelHint());
ChannelAffinityRef channelAffinityRef =
Option.CHANNEL_ID_AFFINITY.getChannelAffinityRef(channelHint);
if (session.getIsMultiplexed() && channelAffinityRef != null) {
channelAffinityRef.useDifferentChannelOnNextCall();
return true;
}
return super.prepareRetryOnDifferentGrpcChannel();
Expand Down Expand Up @@ -537,10 +526,6 @@ public void close() {
} finally {
txnLock.unlock();
}
ByteString id = getTransactionId();
if (id != null && !id.isEmpty()) {
rpc.clearTransactionAndChannelAffinity(id, Option.CHANNEL_HINT.getLong(channelHint));
}
super.close();
}

Expand Down Expand Up @@ -1006,11 +991,12 @@ boolean prepareIteratorForRetryOnDifferentGrpcChannel() {
static Map<SpannerRpc.Option, ?> getChannelHintOptions(
Map<SpannerRpc.Option, ?> channelHintForSession,
Long channelHintForTransaction,
boolean useTransactionHint) {
boolean grpcGcpEnabled) {
// grpc-gcp uses a per-operation/per-transaction random hint instead of reusing the session
// hint so requests distribute independently from session affinity.
if (useTransactionHint && channelHintForTransaction != null) {
return optionMap(SessionOption.channelHint(channelHintForTransaction));
// hint so requests distribute independently from session affinity. Use direct channel-ref
// affinity so grpc-gcp does not need affinity-key map entries for Spanner operations.
if (grpcGcpEnabled && channelHintForTransaction != null) {
return optionMap(SessionOption.channelAffinityRef(new ChannelAffinityRef()));
}
if (channelHintForSession != null) {
return channelHintForSession;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ public class PartitionedDmlTransaction implements SessionImpl.SessionTransaction
this.isRetryableInternalErrorPredicate = new IsRetryableInternalError();
this.channelHintOptions =
getChannelHintOptions(
session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE));
session.getOptions(),
ThreadLocalRandom.current().nextLong(Long.MAX_VALUE),
session.getSpanner().getOptions().isGrpcGcpExtensionEnabled());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.pathtemplate.PathTemplate;
import com.google.cloud.grpc.GcpManagedChannel.ChannelAffinityRef;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
Expand Down Expand Up @@ -89,8 +90,8 @@ static SessionOption channelHint(long hint) {
return new SessionOption(SpannerRpc.Option.CHANNEL_HINT, hint);
}

static SessionOption unbindChannelHint() {
return new SessionOption(SpannerRpc.Option.UNBIND_CHANNEL_HINT, Boolean.TRUE);
static SessionOption channelAffinityRef(ChannelAffinityRef channelAffinityRef) {
return new SessionOption(SpannerRpc.Option.CHANNEL_ID_AFFINITY, channelAffinityRef);
}

SpannerRpc.Option rpcOption() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.Timestamp;
import com.google.cloud.grpc.GcpManagedChannel.ChannelAffinityRef;
import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction;
import com.google.cloud.spanner.AbstractReadContext.SingleReadContext;
import com.google.cloud.spanner.AbstractReadContext.SingleUseReadOnlyTransaction;
Expand Down Expand Up @@ -51,7 +52,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -332,8 +332,7 @@ private RequestOptions getRequestOptions(TransactionOption... transactionOptions
if (!spanner.getOptions().isGrpcGcpExtensionEnabled()) {
return getOptions();
}
return optionMap(
SessionOption.channelHint(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)));
return optionMap(SessionOption.channelAffinityRef(new ChannelAffinityRef()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

/** Options for the Cloud Spanner service. */
public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
Expand Down Expand Up @@ -316,8 +315,7 @@ enum TracingFramework {

private static final Object lock = new Object();

@GuardedBy("lock")
private static TracingFramework activeTracingFramework;
private static volatile TracingFramework activeTracingFramework;

/** Interface that can be used to provide {@link CallCredentials} to {@link SpannerOptions}. */
public interface CallCredentialsProvider {
Expand Down Expand Up @@ -2239,12 +2237,8 @@ static void resetActiveTracingFramework() {
}

public static TracingFramework getActiveTracingFramework() {
synchronized (lock) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: this change is unrelated, but since it is a small perf change I included it here. This was on the critical request path and was showing up in the benchmark mutex profile.

if (activeTracingFramework == null) {
return TracingFramework.OPEN_CENSUS;
}
return activeTracingFramework;
}
TracingFramework framework = activeTracingFramework;
return framework == null ? TracingFramework.OPEN_CENSUS : framework;
}

/** Disables OpenCensus metrics. Disable OpenCensus metrics before creating Spanner client. */
Expand Down
Loading
Loading