Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials;
import com.google.bigtable.v2.InstanceName;
import com.google.cloud.bigtable.data.v2.internal.JwtCredentialsWithAudience;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableDirectAccessMetricsRecorder;
import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants;
import com.google.cloud.bigtable.data.v2.stub.metrics.ChannelPoolMetricsTracer;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.CustomOpenTelemetryMetricsProvider;
import com.google.cloud.bigtable.data.v2.stub.metrics.DirectAccessMetricsRecorder;
import com.google.cloud.bigtable.data.v2.stub.metrics.Util;
import com.google.cloud.bigtable.gaxx.grpc.BigtableTransportChannelProvider;
import com.google.cloud.bigtable.gaxx.grpc.ChannelPrimer;
Expand Down Expand Up @@ -167,12 +169,19 @@ public static BigtableClientContext create(
builder.getHeaderProvider().getHeaders());
}

DirectAccessMetricsRecorder directAccessMetricsRecorder =
DirectAccessMetricsRecorder.NOOP_DIRECT_ACCESS_METRIC_RECORDER;
if (builtinOtel != null) {
directAccessMetricsRecorder = new BigtableDirectAccessMetricsRecorder(builtinOtel);
}

BigtableTransportChannelProvider btTransportProvider =
BigtableTransportChannelProvider.create(
transportProvider.build(),
channelPrimer,
channelPoolMetricsTracer,
backgroundExecutor);
backgroundExecutor,
directAccessMetricsRecorder);

builder.setTransportChannelProvider(btTransportProvider);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,25 @@ public boolean areInternalMetricsEnabled() {
return areInternalMetricsEnabled;
}

/** Applies common pool, message size, and keep-alive settings to the provided builder. */
private static InstantiatingGrpcChannelProvider.Builder commonTraits(
InstantiatingGrpcChannelProvider.Builder builder) {
return builder
.setChannelPoolSettings(
ChannelPoolSettings.builder()
.setInitialChannelCount(10)
.setMinRpcsPerChannel(1)
// Keep it conservative as we scale the channel size every 1min
// and delta is 2 channels.
.setMaxRpcsPerChannel(25)
.setPreemptiveRefreshEnabled(true)
.build())
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
.setKeepAliveTime(Duration.ofSeconds(30)) // sends ping in this interval
.setKeepAliveTimeout(
Duration.ofSeconds(10)); // wait this long before considering the connection dead
}

/** Returns a builder for the default ChannelProvider for this service. */
public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() {
InstantiatingGrpcChannelProvider.Builder grpcTransportProviderBuilder =
Expand All @@ -256,20 +275,25 @@ public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProvi
Collections.singletonList(InstantiatingGrpcChannelProvider.HardBoundTokenTypes.ALTS));
}
}
return grpcTransportProviderBuilder
.setChannelPoolSettings(
ChannelPoolSettings.builder()
.setInitialChannelCount(10)
.setMinRpcsPerChannel(1)
// Keep it conservative as we scale the channel size every 1min
// and delta is 2 channels.
.setMaxRpcsPerChannel(25)
.setPreemptiveRefreshEnabled(true)
.build())
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
.setKeepAliveTime(Duration.ofSeconds(30)) // sends ping in this interval
.setKeepAliveTimeout(
Duration.ofSeconds(10)); // wait this long before considering the connection dead
return commonTraits(grpcTransportProviderBuilder);
}

/** Returns a builder for the Direct Access Transport Provider. */
/** Applies Direct Access traits (DirectPath & ALTS) to an existing builder. */
public static InstantiatingGrpcChannelProvider.Builder applyDirectAccessTraits(
InstantiatingGrpcChannelProvider.Builder builder) {

builder
.setAttemptDirectPathXds()
.setAttemptDirectPath(true)
.setAllowNonDefaultServiceAccount(true);

if (!DIRECT_PATH_BOUND_TOKEN_DISABLED) {
builder.setAllowHardBoundTokenTypes(
Collections.singletonList(InstantiatingGrpcChannelProvider.HardBoundTokenTypes.ALTS));
}

return builder;
}

@SuppressWarnings("WeakerAccess")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.bigtable.data.v2.stub.metrics;

import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.DIRECT_ACCESS_COMPATIBLE_NAME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.METER_NAME;

import com.google.api.core.InternalApi;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.LongGauge;
import io.opentelemetry.api.metrics.Meter;

@InternalApi
public class BigtableDirectAccessMetricsRecorder implements DirectAccessMetricsRecorder {
private final LongGauge compatibleGauge;

public BigtableDirectAccessMetricsRecorder(OpenTelemetry openTelemetry) {
Meter meter = openTelemetry.getMeter(METER_NAME);
this.compatibleGauge =
meter
.gaugeBuilder(DIRECT_ACCESS_COMPATIBLE_NAME)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK, gauges cannot be aggregated. Have you tried running a few clients and run a query to graph the number of eligible clients? Also, how can we know the number of not eligible clients if this reports 0?
Maybe worth converting to UpDownCounter with a label signifying eligibility.

.ofLongs()
.setDescription(
"Reports 1 if the environment is eligible for DirectPath, 0 otherwise. Based on an attempt at startup.")
.setUnit("1")
.build();
}

@Override
public void recordEligibility(boolean isEligible) {
if (compatibleGauge != null) {
// Record 1 if eligible, 0 otherwise
compatibleGauge.set(isEligible ? 1L : 0L);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public class BuiltinMetricsConstants {
static final String BATCH_WRITE_FLOW_CONTROL_TARGET_QPS_NAME =
"batch_write_flow_control_target_qps";
static final String BATCH_WRITE_FLOW_CONTROL_FACTOR_NAME = "batch_write_flow_control_factor";
static final String DIRECT_ACCESS_COMPATIBLE_NAME = "direct_access/compatible";

// Start allow list of metrics that will be exported as internal
public static final Map<String, Set<String>> GRPC_METRICS =
Expand Down Expand Up @@ -168,7 +169,11 @@ public class BuiltinMetricsConstants {
.build();

public static final Set<String> INTERNAL_METRICS =
ImmutableSet.of(PER_CONNECTION_ERROR_COUNT_NAME, OUTSTANDING_RPCS_PER_CHANNEL_NAME).stream()
ImmutableSet.of(
PER_CONNECTION_ERROR_COUNT_NAME,
OUTSTANDING_RPCS_PER_CHANNEL_NAME,
DIRECT_ACCESS_COMPATIBLE_NAME)
.stream()
.map(m -> METER_NAME + m)
.collect(ImmutableSet.toImmutableSet());
// End allow list of metrics that will be exported
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.core.InternalApi;

@InternalApi
public interface DirectAccessMetricsRecorder {
/** Records whether the underlying transport was eligible for Direct Access. */
void recordEligibility(boolean isEligible);

DirectAccessMetricsRecorder NOOP_DIRECT_ACCESS_METRIC_RECORDER = isEligible -> {};
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,11 @@ public static BigtableChannelPool create(
BigtableChannelPoolSettings settings,
ChannelFactory channelFactory,
ChannelPrimer channelPrimer,
ScheduledExecutorService backgroundExecutor)
ScheduledExecutorService backgroundExecutor,
@Nullable ManagedChannel preCreatedChannel)
throws IOException {
return new BigtableChannelPool(settings, channelFactory, channelPrimer, backgroundExecutor);
return new BigtableChannelPool(
settings, channelFactory, channelPrimer, backgroundExecutor, preCreatedChannel);
}

/**
Expand All @@ -99,7 +101,8 @@ public static BigtableChannelPool create(
BigtableChannelPoolSettings settings,
ChannelFactory channelFactory,
ChannelPrimer channelPrimer,
ScheduledExecutorService executor)
ScheduledExecutorService executor,
@Nullable ManagedChannel preCreatedChannel)
throws IOException {
this.settings = settings;
this.channelFactory = channelFactory;
Expand All @@ -110,8 +113,12 @@ public static BigtableChannelPool create(
this.channelPoolHealthChecker.start();

ImmutableList.Builder<Entry> initialListBuilder = ImmutableList.builder();

for (int i = 0; i < settings.getInitialChannelCount(); i++) {
int channelsToCreate = settings.getInitialChannelCount();
if (preCreatedChannel != null) {
initialListBuilder.add(new Entry(preCreatedChannel));
channelsToCreate--;
}
for (int i = 0; i < channelsToCreate; i++) {
ManagedChannel newChannel = channelFactory.createSingleChannel();
channelPrimer.primeChannel(newChannel);
initialListBuilder.add(new Entry(newChannel));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.bigtable.gaxx.grpc;

import com.google.api.core.InternalApi;
import com.google.cloud.bigtable.data.v2.stub.BigtableChannelPrimer;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.alts.AltsContextUtil;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

@InternalApi
public class BigtableDirectAccessChecker implements DirectAccessChecker {
private static final Logger LOG = Logger.getLogger(BigtableDirectAccessChecker.class.getName());
private final ChannelPrimer channelPrimer;

public BigtableDirectAccessChecker(ChannelPrimer channelPrimer) {
this.channelPrimer = channelPrimer;
}

/// Performs a request on the provided channel to check for Direct Access eligibility.
@Override
public boolean check(ManagedChannel channel) {
// Return false in case channelPrime is not an instance, rare
if (!(channelPrimer instanceof BigtableChannelPrimer)) {
return false;
}

BigtableChannelPrimer primer = (BigtableChannelPrimer) channelPrimer;
final AtomicBoolean isDirectAccessEligible = new AtomicBoolean(false);

// Create the interceptor to check the headers
ClientInterceptor altsInterceptor =
new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor,
CallOptions callOptions,
Channel next) {

// Capture the actual ClientCall to access its attributes later
final ClientCall<ReqT, RespT> thisCall = next.newCall(methodDescriptor, callOptions);

return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(thisCall) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {

// Wrap the listener to intercept the response headers
Listener<RespT> forwardingListener =
new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
responseListener) {
@Override
public void onHeaders(Metadata responseHeaders) {
boolean altsCheckPassed = false;
try {
// Verify ALTS context is present
if (AltsContextUtil.check(thisCall.getAttributes())) {
altsCheckPassed = true;
}
} catch (Exception e) {
LOG.warning("direct access check failed: " + e.getMessage());
}
if (altsCheckPassed) {
isDirectAccessEligible.set(true);
}

super.onHeaders(responseHeaders);
}
};

super.start(forwardingListener, headers);
}
};
}
};

try {
// Wrap the channel with our custom ALTS interceptor
Channel interceptedChannel = ClientInterceptors.intercept(channel, altsInterceptor);

ManagedChannel wrappedManagedChannel =
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just FYI: welp, this is awkward -- ChannelPrimer should operate on Channel and not ManagedChannel, but I'm not sure if we can change this now. If we could change it, we wouldn't need this wrapper.

new ManagedChannel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
// Delegate RPCs to the intercepted channel!
return interceptedChannel.newCall(methodDescriptor, callOptions);
}

@Override
public String authority() {
return channel.authority();
}

// Delegate all lifecycle methods to the original ManagedChannel
@Override
public ManagedChannel shutdown() {
return channel.shutdown();
}

@Override
public boolean isShutdown() {
return channel.isShutdown();
}

@Override
public boolean isTerminated() {
return channel.isTerminated();
}

@Override
public ManagedChannel shutdownNow() {
return channel.shutdownNow();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return channel.awaitTermination(timeout, unit);
}
};

// Delegate the actual RPC execution to the primer.
// This synchronously sends the PingAndWarm request and triggers the onHeaders callback.
primer.primeChannel(wrappedManagedChannel);

} catch (Exception e) {
LOG.warning("The direct access probe failed to execute: " + e.getMessage());
}

return isDirectAccessEligible.get();
}
}
Loading