-
Notifications
You must be signed in to change notification settings - Fork 1.1k
chore(spanner): use channel affinity #13231
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
bee2988
7021df9
3d07803
b971db0
3dfcb7e
36329d8
4b99a65
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -107,6 +107,40 @@ public class GcpManagedChannel extends ManagedChannel { | |||||||||||||||||||||
| public static final CallOptions.Key<Integer> CHANNEL_ID_KEY = | ||||||||||||||||||||||
| CallOptions.Key.create("GcpChannelId"); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| /** CallOptions key for sticky channel routing without affinity-key map state. */ | ||||||||||||||||||||||
| public static final CallOptions.Key<ChannelAffinityRef> CHANNEL_AFFINITY_REF_KEY = | ||||||||||||||||||||||
| CallOptions.Key.create("GcpChannelAffinityRef"); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| /** Opaque sticky channel reference for callers that should not depend on {@link ChannelRef}. */ | ||||||||||||||||||||||
| public static final class ChannelAffinityRef { | ||||||||||||||||||||||
| private static final int USE_DIFFERENT_CHANNEL_ON_NEXT_CALL_MASK = 1 << 31; | ||||||||||||||||||||||
| private static final int CHANNEL_ID_MASK = ~USE_DIFFERENT_CHANNEL_ON_NEXT_CALL_MASK; | ||||||||||||||||||||||
| private static final int NO_CHANNEL_ID = -1; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Single allocation hot-path state: | ||||||||||||||||||||||
| // * lower 31 bits: channel id + 1, or 0 when unset. | ||||||||||||||||||||||
| // * high bit: use a different active channel on the next call. | ||||||||||||||||||||||
| private final AtomicInteger state = new AtomicInteger(); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| /** Forces the next RPC to prefer a different active channel if one is available. */ | ||||||||||||||||||||||
| public void useDifferentChannelOnNextCall() { | ||||||||||||||||||||||
| state.getAndUpdate(value -> value | USE_DIFFERENT_CHANNEL_ON_NEXT_CALL_MASK); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| private static int channelIdFromState(int state) { | ||||||||||||||||||||||
| int encodedChannelId = state & CHANNEL_ID_MASK; | ||||||||||||||||||||||
| return encodedChannelId == 0 ? NO_CHANNEL_ID : encodedChannelId - 1; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| private static boolean useDifferentChannelOnNextCallFromState(int state) { | ||||||||||||||||||||||
| return (state & USE_DIFFERENT_CHANNEL_ON_NEXT_CALL_MASK) != 0; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| private static int stateFromChannelId(int channelId) { | ||||||||||||||||||||||
| return (channelId + 1) & CHANNEL_ID_MASK; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| @GuardedBy("this") | ||||||||||||||||||||||
| private Integer bindingIndex = -1; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
|
@@ -140,6 +174,7 @@ public class GcpManagedChannel extends ManagedChannel { | |||||||||||||||||||||
|
|
||||||||||||||||||||||
| // The channel pool. | ||||||||||||||||||||||
| @VisibleForTesting final List<ChannelRef> channelRefs = new CopyOnWriteArrayList<>(); | ||||||||||||||||||||||
| private final Map<Integer, ChannelRef> channelIdToChannelRef = new ConcurrentHashMap<>(); | ||||||||||||||||||||||
| // A set of channels that we removed from the pool and wait for their RPCs to be completed before | ||||||||||||||||||||||
| // we can shut them down. | ||||||||||||||||||||||
| final Set<ChannelRef> removedChannelRefs = new HashSet<>(); | ||||||||||||||||||||||
|
|
@@ -352,6 +387,7 @@ private synchronized void checkScaleDown() { | |||||||||||||||||||||
| channelRef.getChannel().shutdown(); | ||||||||||||||||||||||
| // Remove channel from broken channels map. | ||||||||||||||||||||||
| fallbackMap.remove(channelRef.getId()); | ||||||||||||||||||||||
| channelIdToChannelRef.remove(channelRef.getId()); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
|
@@ -372,6 +408,7 @@ private void removeOldestChannels(int num) { | |||||||||||||||||||||
|
|
||||||||||||||||||||||
| for (ChannelRef channelRef : channelsToRemove) { | ||||||||||||||||||||||
| channelRef.resetAffinityCount(); | ||||||||||||||||||||||
| channelRef.deactivate(); | ||||||||||||||||||||||
| if (channelRef.getState() == ConnectivityState.READY) { | ||||||||||||||||||||||
| decReadyChannels(false); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
@@ -1678,6 +1715,58 @@ protected ChannelRef getChannelRef(@Nullable String key) { | |||||||||||||||||||||
| return mappedChannel; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| /** | ||||||||||||||||||||||
| * Pick a {@link ChannelRef} using a caller-owned reference instead of grpc-gcp's affinity map. | ||||||||||||||||||||||
| */ | ||||||||||||||||||||||
| protected ChannelRef getChannelRefByAffinityRef(ChannelAffinityRef affinityRef) { | ||||||||||||||||||||||
| maybeDynamicUpscale(); | ||||||||||||||||||||||
| while (true) { | ||||||||||||||||||||||
| int state = affinityRef.state.get(); | ||||||||||||||||||||||
| int channelId = ChannelAffinityRef.channelIdFromState(state); | ||||||||||||||||||||||
| boolean useDifferentChannel = | ||||||||||||||||||||||
| ChannelAffinityRef.useDifferentChannelOnNextCallFromState(state); | ||||||||||||||||||||||
| ChannelRef channelRef = | ||||||||||||||||||||||
| channelId == ChannelAffinityRef.NO_CHANNEL_ID | ||||||||||||||||||||||
| ? null | ||||||||||||||||||||||
| : channelIdToChannelRef.get(channelId); | ||||||||||||||||||||||
| if (!useDifferentChannel && channelRef != null && channelRef.isActive()) { | ||||||||||||||||||||||
| return channelRef; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| ChannelRef selectedChannelRef = | ||||||||||||||||||||||
| useDifferentChannel | ||||||||||||||||||||||
| ? pickLeastBusyChannelDifferentFrom(channelRef) | ||||||||||||||||||||||
| : pickLeastBusyChannel(/* forFallback= */ false); | ||||||||||||||||||||||
| if (affinityRef.state.compareAndSet( | ||||||||||||||||||||||
| state, ChannelAffinityRef.stateFromChannelId(selectedChannelRef.getId()))) { | ||||||||||||||||||||||
| return selectedChannelRef; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
Comment on lines
+1723
to
+1744
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The boolean useDifferentChannel = affinityRef.useDifferentChannelOnNextCall.getAndSet(false);
while (true) {
ChannelRef channelRef = affinityRef.channelRef.get();
if (!useDifferentChannel && channelRef != null && channelRef.isActive()) {
return channelRef;
}
ChannelRef selectedChannelRef =
useDifferentChannel
? pickLeastBusyChannelDifferentFrom(channelRef)
: pickLeastBusyChannel(/* forFallback= */ false);
if (affinityRef.channelRef.compareAndSet(channelRef, selectedChannelRef)) {
return selectedChannelRef;
}
useDifferentChannel = false;
}References
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Both versions current/suggested consume the flag once on the first iteration and drop it after — the suggested rewrite is the same behavior with extra lines. |
||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| private ChannelRef pickLeastBusyChannelDifferentFrom(@Nullable ChannelRef excludedChannelRef) { | ||||||||||||||||||||||
| ChannelRef channelRef = pickLeastBusyChannel(/* forFallback= */ false); | ||||||||||||||||||||||
| if (excludedChannelRef == null || channelRefs.size() <= 1) { | ||||||||||||||||||||||
| return channelRef; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| if (channelRef != excludedChannelRef && channelRef.isActive()) { | ||||||||||||||||||||||
| return channelRef; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| ChannelRef leastBusyChannelRef = null; | ||||||||||||||||||||||
| int leastBusyStreams = Integer.MAX_VALUE; | ||||||||||||||||||||||
| for (ChannelRef candidate : channelRefs) { | ||||||||||||||||||||||
| if (candidate == excludedChannelRef || !candidate.isActive()) { | ||||||||||||||||||||||
| continue; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| int streams = candidate.getActiveStreamsCount(); | ||||||||||||||||||||||
| if (leastBusyChannelRef == null || streams < leastBusyStreams) { | ||||||||||||||||||||||
| leastBusyChannelRef = candidate; | ||||||||||||||||||||||
| leastBusyStreams = streams; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
Comment on lines
+1757
to
+1766
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The fallback loop in
Suggested change
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Stale review |
||||||||||||||||||||||
| return leastBusyChannelRef == null ? channelRef : leastBusyChannelRef; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Create a new channel and add it to channelRefs. | ||||||||||||||||||||||
| // If we have a ready channel not in the pool that we wait for completing its RPCs, | ||||||||||||||||||||||
| // then re-use that channel instead. | ||||||||||||||||||||||
|
|
@@ -1688,6 +1777,8 @@ ChannelRef createNewChannel() { | |||||||||||||||||||||
| ChannelRef chRef = reusedChannelRef.get(); | ||||||||||||||||||||||
| channelRefs.add(chRef); | ||||||||||||||||||||||
| removedChannelRefs.remove(chRef); | ||||||||||||||||||||||
| channelIdToChannelRef.put(chRef.getId(), chRef); | ||||||||||||||||||||||
| chRef.activate(); | ||||||||||||||||||||||
| logger.finer(log("Channel %d reused.", chRef.getId())); | ||||||||||||||||||||||
| incReadyChannels(false); | ||||||||||||||||||||||
| maxChannels.accumulateAndGet(getNumberOfChannels(), Math::max); | ||||||||||||||||||||||
|
|
@@ -1696,6 +1787,7 @@ ChannelRef createNewChannel() { | |||||||||||||||||||||
|
|
||||||||||||||||||||||
| ChannelRef channelRef = new ChannelRef(delegateChannelBuilder.build()); | ||||||||||||||||||||||
| channelRefs.add(channelRef); | ||||||||||||||||||||||
| channelIdToChannelRef.put(channelRef.getId(), channelRef); | ||||||||||||||||||||||
| logger.finer(log("Channel %d created.", channelRef.getId())); | ||||||||||||||||||||||
| maxChannels.accumulateAndGet(getNumberOfChannels(), Math::max); | ||||||||||||||||||||||
| return channelRef; | ||||||||||||||||||||||
|
|
@@ -1961,6 +2053,12 @@ public String authority() { | |||||||||||||||||||||
| @Override | ||||||||||||||||||||||
| public <ReqT, RespT> ClientCall<ReqT, RespT> newCall( | ||||||||||||||||||||||
| MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions) { | ||||||||||||||||||||||
| ChannelAffinityRef channelAffinityRef = callOptions.getOption(CHANNEL_AFFINITY_REF_KEY); | ||||||||||||||||||||||
| if (channelAffinityRef != null) { | ||||||||||||||||||||||
| return new GcpClientCall.SimpleGcpClientCall<>( | ||||||||||||||||||||||
| this, getChannelRefByAffinityRef(channelAffinityRef), methodDescriptor, callOptions); | ||||||||||||||||||||||
|
Comment on lines
+2058
to
+2059
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The existing code for handling affinity by key ( ChannelRef channelRef = getChannelRefByAffinityRef(channelAffinityRef);
if (channelRef == null) {
logger.log(Level.WARNING, log("No channel available for affinity ref, this should not happen"));
return new GcpClientCall.NoopGcpClientCall<>(
Status.UNAVAILABLE.withDescription(
"No channel available for affinity ref, this should not happen"));
}
return new GcpClientCall.SimpleGcpClientCall<>(this, channelRef, methodDescriptor, callOptions);
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Addressed: |
||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| if (callOptions.getOption(DISABLE_AFFINITY_KEY) | ||||||||||||||||||||||
| || DISABLE_AFFINITY_CTX_KEY.get(Context.current())) { | ||||||||||||||||||||||
| if (logger.isLoggable(Level.FINEST)) { | ||||||||||||||||||||||
|
|
@@ -2314,6 +2412,7 @@ protected class ChannelRef { | |||||||||||||||||||||
| private final AtomicLong okCalls = new AtomicLong(); | ||||||||||||||||||||||
| private final AtomicLong errCalls = new AtomicLong(); | ||||||||||||||||||||||
| private final ChannelStateMonitor channelStateMonitor; | ||||||||||||||||||||||
| private volatile boolean active = true; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| protected ChannelRef(ManagedChannel channel) { | ||||||||||||||||||||||
| this(channel, 0, 0); | ||||||||||||||||||||||
|
|
@@ -2343,6 +2442,18 @@ protected int getId() { | |||||||||||||||||||||
| return channelId; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| protected boolean isActive() { | ||||||||||||||||||||||
| return active; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| private void activate() { | ||||||||||||||||||||||
| active = true; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| private void deactivate() { | ||||||||||||||||||||||
| active = false; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| protected void affinityCountIncr() { | ||||||||||||||||||||||
| int count = affinityCount.incrementAndGet(); | ||||||||||||||||||||||
| maxAffinity.accumulateAndGet(count, Math::max); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -109,7 +109,6 @@ | |
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import javax.annotation.Nonnull; | ||
| import javax.annotation.Nullable; | ||
| import javax.annotation.concurrent.GuardedBy; | ||
|
|
||
| /** Options for the Cloud Spanner service. */ | ||
| public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> { | ||
|
|
@@ -316,8 +315,7 @@ enum TracingFramework { | |
|
|
||
| private static final Object lock = new Object(); | ||
|
|
||
| @GuardedBy("lock") | ||
| private static TracingFramework activeTracingFramework; | ||
| private static volatile TracingFramework activeTracingFramework; | ||
|
|
||
| /** Interface that can be used to provide {@link CallCredentials} to {@link SpannerOptions}. */ | ||
| public interface CallCredentialsProvider { | ||
|
|
@@ -2239,12 +2237,8 @@ static void resetActiveTracingFramework() { | |
| } | ||
|
|
||
| public static TracingFramework getActiveTracingFramework() { | ||
| synchronized (lock) { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FYI: this change is unrelated, but since it was small perf change I included it here, this was on critical request path being shown up in benchmark mutex profile |
||
| if (activeTracingFramework == null) { | ||
| return TracingFramework.OPEN_CENSUS; | ||
| } | ||
| return activeTracingFramework; | ||
| } | ||
| TracingFramework framework = activeTracingFramework; | ||
| return framework == null ? TracingFramework.OPEN_CENSUS : framework; | ||
| } | ||
|
|
||
| /** Disables OpenCensus metrics. Disable OpenCensus metrics before creating Spanner client. */ | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If
selectedChannelRefisnull, this method can returnnull. The caller,newCall, passes this to theGcpClientCallconstructor, which will cause aNullPointerExceptionas it does not expect a nullChannelRef. This method should throw an exception if no channel is available instead of returningnullto maintain the expected invariant.References
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Neither pickLeastBusyChannel nor pickLeastBusyChannelDifferentFrom can return null — empty-pool case throws on channelRefs.get(0) long before this point, so the NPE path doesn't exist.