-
Notifications
You must be signed in to change notification settings - Fork 105
WIP: use a direct access checker in client #2802
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
sushanb
wants to merge
3
commits into
main
Choose a base branch
from
direct_access
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
50 changes: 50 additions & 0 deletions
50
...a/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableDirectAccessMetricsRecorder.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| /* | ||
| * Copyright 2026 Google LLC | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * https://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package com.google.cloud.bigtable.data.v2.stub.metrics; | ||
|
|
||
| import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.DIRECT_ACCESS_COMPATIBLE_NAME; | ||
| import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.METER_NAME; | ||
|
|
||
| import com.google.api.core.InternalApi; | ||
| import io.opentelemetry.api.OpenTelemetry; | ||
| import io.opentelemetry.api.metrics.LongGauge; | ||
| import io.opentelemetry.api.metrics.Meter; | ||
|
|
||
| @InternalApi | ||
| public class BigtableDirectAccessMetricsRecorder implements DirectAccessMetricsRecorder { | ||
| private final LongGauge compatibleGauge; | ||
|
|
||
| public BigtableDirectAccessMetricsRecorder(OpenTelemetry openTelemetry) { | ||
| Meter meter = openTelemetry.getMeter(METER_NAME); | ||
| this.compatibleGauge = | ||
| meter | ||
| .gaugeBuilder(DIRECT_ACCESS_COMPATIBLE_NAME) | ||
| .ofLongs() | ||
| .setDescription( | ||
| "Reports 1 if the environment is eligible for DirectPath, 0 otherwise. Based on an attempt at startup.") | ||
| .setUnit("1") | ||
| .build(); | ||
| } | ||
|
|
||
| @Override | ||
| public void recordEligibility(boolean isEligible) { | ||
| if (compatibleGauge != null) { | ||
| // Record 1 if eligible, 0 otherwise | ||
| compatibleGauge.set(isEligible ? 1L : 0L); | ||
| } | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
27 changes: 27 additions & 0 deletions
27
...main/java/com/google/cloud/bigtable/data/v2/stub/metrics/DirectAccessMetricsRecorder.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| /* | ||
| * Copyright 2026 Google LLC | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * https://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package com.google.cloud.bigtable.data.v2.stub.metrics; | ||
|
|
||
| import com.google.api.core.InternalApi; | ||
|
|
||
| @InternalApi | ||
| public interface DirectAccessMetricsRecorder { | ||
| /** Records whether the underlying transport was eligible for Direct Access. */ | ||
| void recordEligibility(boolean isEligible); | ||
|
|
||
| DirectAccessMetricsRecorder NOOP_DIRECT_ACCESS_METRIC_RECORDER = isEligible -> {}; | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
157 changes: 157 additions & 0 deletions
157
...gtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableDirectAccessChecker.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,157 @@ | ||
| /* | ||
| * Copyright 2026 Google LLC | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * https://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package com.google.cloud.bigtable.gaxx.grpc; | ||
|
|
||
| import com.google.api.core.InternalApi; | ||
| import com.google.cloud.bigtable.data.v2.stub.BigtableChannelPrimer; | ||
| import io.grpc.CallOptions; | ||
| import io.grpc.Channel; | ||
| import io.grpc.ClientCall; | ||
| import io.grpc.ClientInterceptor; | ||
| import io.grpc.ClientInterceptors; | ||
| import io.grpc.ForwardingClientCall; | ||
| import io.grpc.ForwardingClientCallListener; | ||
| import io.grpc.ManagedChannel; | ||
| import io.grpc.Metadata; | ||
| import io.grpc.MethodDescriptor; | ||
| import io.grpc.alts.AltsContextUtil; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.logging.Logger; | ||
|
|
||
| @InternalApi | ||
| public class BigtableDirectAccessChecker implements DirectAccessChecker { | ||
| private static final Logger LOG = Logger.getLogger(BigtableDirectAccessChecker.class.getName()); | ||
| private final ChannelPrimer channelPrimer; | ||
|
|
||
| public BigtableDirectAccessChecker(ChannelPrimer channelPrimer) { | ||
| this.channelPrimer = channelPrimer; | ||
| } | ||
|
|
||
| /// Performs a request on the provided channel to check for Direct Access eligibility. | ||
| @Override | ||
| public boolean check(ManagedChannel channel) { | ||
| // Return false in case channelPrime is not an instance, rare | ||
| if (!(channelPrimer instanceof BigtableChannelPrimer)) { | ||
| return false; | ||
| } | ||
|
|
||
| BigtableChannelPrimer primer = (BigtableChannelPrimer) channelPrimer; | ||
| final AtomicBoolean isDirectAccessEligible = new AtomicBoolean(false); | ||
|
|
||
| // Create the interceptor to check the headers | ||
| ClientInterceptor altsInterceptor = | ||
| new ClientInterceptor() { | ||
| @Override | ||
| public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( | ||
| MethodDescriptor<ReqT, RespT> methodDescriptor, | ||
| CallOptions callOptions, | ||
| Channel next) { | ||
|
|
||
| // Capture the actual ClientCall to access its attributes later | ||
| final ClientCall<ReqT, RespT> thisCall = next.newCall(methodDescriptor, callOptions); | ||
|
|
||
| return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(thisCall) { | ||
| @Override | ||
| public void start(Listener<RespT> responseListener, Metadata headers) { | ||
|
|
||
| // Wrap the listener to intercept the response headers | ||
| Listener<RespT> forwardingListener = | ||
| new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>( | ||
| responseListener) { | ||
| @Override | ||
| public void onHeaders(Metadata responseHeaders) { | ||
| boolean altsCheckPassed = false; | ||
| try { | ||
| // Verify ALTS context is present | ||
| if (AltsContextUtil.check(thisCall.getAttributes())) { | ||
| altsCheckPassed = true; | ||
| } | ||
| } catch (Exception e) { | ||
| LOG.warning("direct access check failed: " + e.getMessage()); | ||
| } | ||
| if (altsCheckPassed) { | ||
| isDirectAccessEligible.set(true); | ||
| } | ||
|
|
||
| super.onHeaders(responseHeaders); | ||
| } | ||
| }; | ||
|
|
||
| super.start(forwardingListener, headers); | ||
| } | ||
| }; | ||
| } | ||
| }; | ||
|
|
||
| try { | ||
| // Wrap the channel with our custom ALTS interceptor | ||
| Channel interceptedChannel = ClientInterceptors.intercept(channel, altsInterceptor); | ||
|
|
||
| ManagedChannel wrappedManagedChannel = | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just FYI: welp, this is awkward -- ChannelPrimer should operate on Channel and not ManagedChannel, but I'm not sure if we can change this now. If we could change it, we wouldn't need this wrapper. |
||
| new ManagedChannel() { | ||
| @Override | ||
| public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall( | ||
| MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) { | ||
| // Delegate RPCs to the intercepted channel! | ||
| return interceptedChannel.newCall(methodDescriptor, callOptions); | ||
| } | ||
|
|
||
| @Override | ||
| public String authority() { | ||
| return channel.authority(); | ||
| } | ||
|
|
||
| // Delegate all lifecycle methods to the original ManagedChannel | ||
| @Override | ||
| public ManagedChannel shutdown() { | ||
| return channel.shutdown(); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isShutdown() { | ||
| return channel.isShutdown(); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isTerminated() { | ||
| return channel.isTerminated(); | ||
| } | ||
|
|
||
| @Override | ||
| public ManagedChannel shutdownNow() { | ||
| return channel.shutdownNow(); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean awaitTermination(long timeout, TimeUnit unit) | ||
| throws InterruptedException { | ||
| return channel.awaitTermination(timeout, unit); | ||
| } | ||
| }; | ||
|
|
||
| // Delegate the actual RPC execution to the primer. | ||
| // This synchronously sends the PingAndWarm request and triggers the onHeaders callback. | ||
| primer.primeChannel(wrappedManagedChannel); | ||
|
|
||
| } catch (Exception e) { | ||
| LOG.warning("The direct access probe failed to execute: " + e.getMessage()); | ||
| } | ||
|
|
||
| return isDirectAccessEligible.get(); | ||
| } | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAIK, gauges cannot be aggregated. Have you tried running a few clients and run a query to graph the number of eligible clients? Also, how can we know the number of not eligible clients if this reports 0?
Maybe worth converting to UpDownCounter with a label signifying eligibility.