Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
b33d818
feat: add async support for RAB to ServiceAccountCredentials and impl…
nbayati May 7, 2026
80529c6
Add unit tests for the async RAB implementation.
nbayati May 7, 2026
73617c5
fix async unit tests
nbayati May 7, 2026
9e43ecc
Update unit tests to accept both mtls and standard allowedLocations e…
nbayati May 7, 2026
4d5787b
test: verify iam endpoint constant resolution in mTLS environments
nbayati May 7, 2026
e3f8e90
refactor: introduce _after_refresh hook in Credentials base class to …
nbayati May 7, 2026
95af8e5
add __setstate__ to the base RAB class for backward compatibility
nbayati May 8, 2026
1b4270b
Implement RAB support for jwt credentials
nbayati May 8, 2026
0478330
fix lint errors
nbayati May 11, 2026
9650ac7
fix: preserve refresh manager type when copying RAB manager
nbayati May 11, 2026
5b6e715
refactor(auth): optimize RAB manager copy logic to only share boundar…
nbayati May 14, 2026
2729e70
fix(auth): enhance client lookup robustness with defensive checks and…
nbayati May 14, 2026
e1ee432
refactor(auth): centralize async RAB lifecycle via _after_refresh bas…
nbayati May 14, 2026
39fbe10
feat: add pickling support for _AsyncRegionalAccessBoundaryRefreshMan…
nbayati May 14, 2026
d20d7ba
revert changes to the _token_endpoint_request_no_throw to keep PR foc…
nbayati May 14, 2026
c1b946e
fix(auth): align async client with AIO transport spec and add unit tests
nbayati May 14, 2026
75b3231
test(auth): assert closed session safety in async RAB refresh and fix…
nbayati May 15, 2026
a952527
docs(auth): clarify async RAB transport requirements in docstrings
nbayati May 15, 2026
be67ab6
feat(auth): support async blocking RAB lookups and add support to asy…
nbayati May 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions packages/google-auth/google/auth/_credentials_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import abc
import inspect

from google.auth import _regional_access_boundary_utils
from google.auth import credentials


Expand Down Expand Up @@ -64,8 +65,28 @@ async def before_request(self, request, method, url, headers):
await self.refresh(request)
else:
self.refresh(request)

if inspect.iscoroutinefunction(self._after_refresh):
await self._after_refresh(request, method, url, headers)
else:
self._after_refresh(request, method, url, headers)

self.apply(headers)

def _after_refresh(self, request, method, url, headers):
"""Hook for subclasses to perform actions after refresh but before
applying credentials to headers.

Args:
request (google.auth.transport.Request): The object used to make
HTTP requests.
method (str): The request's HTTP method or the RPC method being
invoked.
url (str): The request's URI or the RPC service's URI.
headers (Mapping[str, str]): The request's headers.
"""
pass


class CredentialsWithQuotaProject(credentials.CredentialsWithQuotaProject):
"""Abstract base for credentials supporting ``with_quota_project`` factory"""
Expand Down Expand Up @@ -169,3 +190,74 @@ def with_scopes_if_required(credentials, scopes):

class Signing(credentials.Signing, metaclass=abc.ABCMeta):
"""Interface for credentials that can cryptographically sign messages."""


class CredentialsWithRegionalAccessBoundary(
Credentials, credentials.CredentialsWithRegionalAccessBoundary
):
"""Async base for credentials supporting regional access boundary configuration."""

def __init__(self):
super().__init__()
self._rab_manager.refresh_manager = (
_regional_access_boundary_utils._AsyncRegionalAccessBoundaryRefreshManager()
)

def __setstate__(self, state):
super().__setstate__(state)
self._rab_manager.refresh_manager = (
_regional_access_boundary_utils._AsyncRegionalAccessBoundaryRefreshManager()
)

async def _after_refresh(self, request, method, url, headers):
"""Triggers the Regional Access Boundary lookup asynchronously if necessary."""
await self._maybe_start_regional_access_boundary_refresh_async(request, url)

async def _maybe_start_regional_access_boundary_refresh_async(self, request, url):
"""Starts a background refresh or performs a blocking refresh asynchronously.

Args:
request (google.auth.aio.transport.Request): The object used to make
HTTP requests.
url (str): The URL of the request.
"""
# Do not perform a lookup if the request is for a regional endpoint.
if self._is_regional_endpoint(url):
return

# A refresh is only needed if the feature is enabled.
if not self._is_regional_access_boundary_lookup_required():
return

# Trigger background or blocking refresh if needed.
await self._rab_manager.maybe_start_refresh_async(self, request)

async def _lookup_regional_access_boundary(self, request, fail_fast=False):
"""Calls the Regional Access Boundary lookup API asynchronously.

Args:
request (google.auth.aio.transport.Request): The object used to make
HTTP requests.
fail_fast (bool): Whether the lookup should fail fast (short timeout, no retries).

Returns:
Optional[Dict[str, str]]: The Regional Access Boundary information
returned by the lookup API, or None if the lookup failed.
"""
url_builder = self._build_regional_access_boundary_lookup_url
if inspect.iscoroutinefunction(url_builder):
url = await url_builder(request=request)
else:
url = url_builder(request=request)

if not url:
return None

headers = {}
self._apply(headers)

from google.oauth2 import _client_async

return await _client_async._lookup_regional_access_boundary(
request, url, headers=headers, fail_fast=fail_fast
)
13 changes: 12 additions & 1 deletion packages/google-auth/google/auth/_jwt_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ def decode(token, certs=None, verify=True, audience=None):


class Credentials(
jwt.Credentials, _credentials_async.Signing, _credentials_async.Credentials
jwt.Credentials,
_credentials_async.Signing,
_credentials_async.CredentialsWithRegionalAccessBoundary,
):
"""Credentials that use a JWT as the bearer token.

Expand Down Expand Up @@ -142,6 +144,15 @@ class Credentials(
new_credentials = credentials.with_claims(audience=new_audience)
"""

def __setstate__(self, state):
"""Restores the credential state and ensures the async refresh manager is attached."""
super().__setstate__(state)
from google.auth import _regional_access_boundary_utils

self._rab_manager.refresh_manager = (
_regional_access_boundary_utils._AsyncRegionalAccessBoundaryRefreshManager()
)


class OnDemandCredentials(
jwt.OnDemandCredentials, _credentials_async.Signing, _credentials_async.Credentials
Expand Down
129 changes: 123 additions & 6 deletions packages/google-auth/google/auth/_regional_access_boundary_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

"""Utilities for Regional Access Boundary management."""

import asyncio
import copy
import datetime
import functools
Expand Down Expand Up @@ -170,12 +171,11 @@ def apply_headers(self, headers):
else:
headers.pop(_REGIONAL_ACCESS_BOUNDARY_HEADER, None)

def maybe_start_refresh(self, credentials, request):
"""Starts a background thread to refresh the Regional Access Boundary if needed.
def _should_refresh(self):
"""Checks if the Regional Access Boundary data needs a refresh and is not in cooldown.

Args:
credentials (google.auth.credentials.Credentials): The credentials to refresh.
request (google.auth.transport.Request): The object used to make HTTP requests.
Returns:
bool: True if a refresh is required, False otherwise.
"""
rab_data = self._data

Expand All @@ -186,10 +186,22 @@ def maybe_start_refresh(self, credentials, request):
and _helpers.utcnow()
< (rab_data.expiry - REGIONAL_ACCESS_BOUNDARY_REFRESH_THRESHOLD)
):
return
return False

# Don't start a new refresh if the cooldown is still in effect.
if rab_data.cooldown_expiry and _helpers.utcnow() < rab_data.cooldown_expiry:
return False

return True

def maybe_start_refresh(self, credentials, request):
"""Starts a background thread to refresh the Regional Access Boundary if needed.

Args:
credentials (google.auth.credentials.Credentials): The credentials to refresh.
request (google.auth.transport.Request): The object used to make HTTP requests.
"""
if not self._should_refresh():
return

# If all checks pass, start the background refresh.
Expand All @@ -198,6 +210,22 @@ def maybe_start_refresh(self, credentials, request):
else:
self.refresh_manager.start_refresh(credentials, request, self)

async def maybe_start_refresh_async(self, credentials, request):
"""Starts a background refresh or performs a blocking refresh asynchronously.

Args:
credentials (google.auth.credentials.Credentials): The credentials to refresh.
request (google.auth.aio.transport.Request): The object used to make HTTP requests.
"""
if not self._should_refresh():
return

# If all checks pass, start the refresh.
if self._use_blocking_regional_access_boundary_lookup:
await self.start_blocking_refresh_async(credentials, request)
else:
self.refresh_manager.start_refresh(credentials, request, self)

def start_blocking_refresh(self, credentials, request):
"""Initiates a blocking lookup of the Regional Access Boundary.

Expand Down Expand Up @@ -227,6 +255,37 @@ def start_blocking_refresh(self, credentials, request):

self.process_regional_access_boundary_info(regional_access_boundary_info)

async def start_blocking_refresh_async(self, credentials, request):
"""Initiates a blocking lookup of the Regional Access Boundary asynchronously.

If the lookup raises an exception, it is caught and logged as a warning,
and the lookup is treated as a failure (entering cooldown). Exceptions
are not propagated to the caller.

Args:
credentials (google.auth.credentials.Credentials): The credentials to refresh.
request (google.auth.aio.transport.Request): The object used to make HTTP requests.
"""
try:
# The fail_fast parameter is set to True to ensure we don't block the calling
# thread for too long. This will do two things: 1) set a timeout to 3s
# instead of the default 120s and 2) ensure we do not retry at all
regional_access_boundary_info = (
await credentials._lookup_regional_access_boundary(
request, fail_fast=True
)
)
except Exception as e:
if _helpers.is_logging_enabled(_LOGGER):
_LOGGER.warning(
"Regional Access Boundary lookup raised an exception: %s",
e,
exc_info=True,
)
regional_access_boundary_info = None

self.process_regional_access_boundary_info(regional_access_boundary_info)

def process_regional_access_boundary_info(self, regional_access_boundary_info):
"""Processes the regional access boundary info and updates the state.

Expand Down Expand Up @@ -384,3 +443,61 @@ def start_refresh(self, credentials, request, rab_manager):
credentials, copied_request, rab_manager
)
self._worker.start()


class _AsyncRegionalAccessBoundaryRefreshManager(object):
"""Manages a task for background refreshing of the Regional Access Boundary in async flows."""

def __init__(self):
self._lock = threading.Lock()
self._worker_task = None

def __getstate__(self):
"""Pickle helper that excludes the un-picklable _lock and _worker_task attributes from serialization."""
state = self.__dict__.copy()
state["_lock"] = None
state["_worker_task"] = None
return state

def __setstate__(self, state):
"""Pickle helper that restores state and re-initializes the _lock and _worker_task attributes."""
self.__dict__.update(state)
self._lock = threading.Lock()
self._worker_task = None

def start_refresh(self, credentials, request, rab_manager):
"""
Starts a background task to refresh the Regional Access Boundary if one is not already running.

Args:
credentials (CredentialsWithRegionalAccessBoundary): The credentials
to refresh.
request (google.auth.aio.transport.Request): The object used to make
HTTP requests.
rab_manager (_RegionalAccessBoundaryManager): The manager container to update.
"""
with self._lock:
if self._worker_task and not self._worker_task.done():
# A refresh is already in progress.
return

async def _worker():
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unlike the synchronous refresh manager which safely deepcopies the transport (copied_request = copy.deepcopy(request)), the async manager passes the exact same request instance directly into the background coroutine task. Because start_refresh is invoked inside before_request, the main application coroutine immediately proceeds to make its actual service API HTTP call using the exact same request transport while the background task is concurrently using it, risking HTTP state corruption or interleaved headers.

Additionally, spawning asyncio.create_task(_worker()) without tracking cancellation hooks upon client session closure can potentially cause dangling tasks that raise RuntimeError: Session is closed when executing against closed client sessions.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both concerns are valid, but safe under the hood:

  1. Deepcopying the Transport is impossible and not needed in async
    The async transport object (e.g., aiohttp_requests.Request) contains an active aiohttp.ClientSession with open TCP sockets. Attempting to copy.deepcopy(request) will instantly raise a TypeError: cannot pickle 'ClientSession' object and crash the application at runtime. Unlike synchronous transports running on separate OS threads, async HTTP clients (like aiohttp or httpx) are natively designed to handle concurrent requests sharing the same connection session. All request-specific states (headers, payloads) are stored in localized coroutine call stack frames, preventing any HTTP state corruption or interleaving.

  2. Session closure and dangling tasks are handled safely
    The background worker is a single-shot asyncio task that executes exactly one lookup request and then immediately terminates and gets garbage-collected.
    If the user's application closes the underlying client session while a background task is still running, the resulting RuntimeError: Session is closed is safely caught by the worker's generic except Exception as e: block. It logs a warning, fails open cleanly, and does not raise an unhandled exception or crash the application.

I think no code changes are required here, as the current design is fully protected.

try:
# credentials._lookup_regional_access_boundary should be async in the async creds class
regional_access_boundary_info = (
await credentials._lookup_regional_access_boundary(request)
)
except Exception as e:
if _helpers.is_logging_enabled(_LOGGER):
_LOGGER.warning(
"Asynchronous Regional Access Boundary lookup raised an exception: %s",
e,
exc_info=True,
)
regional_access_boundary_info = None

rab_manager.process_regional_access_boundary_info(
regional_access_boundary_info
)

self._worker_task = asyncio.create_task(_worker())
Loading
Loading