feat(auth): implement regional access boundary support for standalone JWT and async service accounts#17025
feat(auth): implement regional access boundary support for standalone JWT and async service accounts#17025nbayati wants to merge 19 commits into
Conversation
…ement async refresh manager
…support Regional Access Boundary logic
There was a problem hiding this comment.
Code Review
This pull request implements asynchronous support for Regional Access Boundary (RAB) management, including background refresh tasks and mTLS endpoint support. Key changes include the addition of _AsyncRegionalAccessBoundaryRefreshManager, updates to JWT and service account credentials to handle RAB state during cloning and serialization, and comprehensive test coverage for these new flows. However, a critical issue was identified where the refresh method in google/auth/jwt.py was renamed to _perform_refresh_token, which will break token updates as the base class expects a refresh method. Additionally, a typo was found in a test assertion URL.
| except asyncio.TimeoutError: | ||
| return False, {}, False | ||
|
|
||
| response_body1 = await response.content() |
There was a problem hiding this comment.
This is an async_generator, not an awaitable method. I think this crashes as written. Should call await response.read() instead.
Let's also add a test that will catch this, and cover in try/catch block
There was a problem hiding this comment.
I actually was able to run the code as-is and verify that it works. We have the same pattern at the top of this file in _token_endpoint_request_no_throw method.

Though I did dig a bit to see why it's working and this is my current understanding:
in google-auth the async Request transport adapter wraps the raw response in a custom _CombinedResponse class (defined in google/auth/transport/_aiohttp_requests.py).
The _CombinedResponse class explicitly implements content as an asynchronous method to mirror the synchronous requests interface. Because it is a coroutine method returning the decoded byte body, await response.content() works as properly.
I renamed the variable to response_bytes though, to make it a bit more intuitive.
There was a problem hiding this comment.
Ty for validating. If this is only intended to support the legacy _aiohttp_requests transport then great, but we do then need to update docstrings (currently says google.auth.aio.transport.Request)
There was a problem hiding this comment.
Done. Updated docstrings to document support for both standard (read()) and legacy (content()) async responses.
| try: | ||
| if timeout: | ||
| response = await asyncio.wait_for( | ||
| request(method="GET", url=url, headers=headers), timeout=timeout |
There was a problem hiding this comment.
Should timeout be passed to the request here?
There was a problem hiding this comment.
Good catch! I've added the timeout to the request.
| ) | ||
| else: | ||
| response = await request(method="GET", url=url, headers=headers) | ||
| except asyncio.TimeoutError: |
There was a problem hiding this comment.
What happens if the request fails? I think we may need to catch additional errors here (timeout error, transport error..)
There was a problem hiding this comment.
Good catch! I have updated the code to catch google.auth.exceptions.TransportError along with asyncio.TimeoutError and return (False, {}, False) on failure.
If a request fails due to a connection issue or timeout, the method now returns a non-retryable failure, which triggers the background refresh manager to enter the cooldown period. Any other unexpected exceptions will be caught and logged as warnings by the background worker task.
| @@ -369,6 +401,8 @@ def _copy_regional_access_boundary_manager(self, target): | |||
| # but share the immutable data reference to avoid unnecessary initial lookups. | |||
| new_manager = _regional_access_boundary_utils._RegionalAccessBoundaryManager() | |||
| new_manager._data = self._rab_manager._data | |||
| # Preserve the type of refresh manager (sync or async) | |||
| new_manager.refresh_manager = self._rab_manager.refresh_manager.__class__() | |||
There was a problem hiding this comment.
I don't think this is safe across sync/async creds.
- Using the source refresh-manager means we can have an async refresh manager on a sync cred which can later call asyncio.create_task() from a sync before_request.
- _use_blocking_regional_access_boundary_lookup is not kept, so a credential configured for blocking RAB can become non-blocking after with_scopes(), with_quota_project(), etc (breaking gcloud)
I think the cred should keep its initialized manager type and we should only copy the RAB data/config.
There was a problem hiding this comment.
You're right, I had missed that the initialization of the credential would take care of the manager and didn't have to copy/create it here! Updated this method to only copy over the RAB data, and also _use_blocking_regional_access_boundary_lookup.
| except ValueError: | ||
| response_data = response_body | ||
|
|
||
| if response.status == http_client.OK: |
There was a problem hiding this comment.
In google.auth.aio.transport.Response, the HTTP status code is exposed via the status_code property, not status. Passing a compliant google.auth.aio transport callable raises AttributeError: 'Response' object has no attribute 'status'. Please update the async lookup and grant methods to check .status_code.
There was a problem hiding this comment.
Actually decided to only apply the fix to the RAB lookup, and instead open a bug to bring the grant methods up to date separately. This was can keep the blast radius of this PR smaller and limit it to only RAB changes without touching any token fetching flows.
There was a problem hiding this comment.
Created #17139 to track the necessary changes for the token endpoints.
| @@ -288,3 +289,145 @@ async def refresh_grant( | |||
| request, token_uri, body, can_retry=can_retry | |||
| ) | |||
| return client._handle_refresh_grant_response(response_data, refresh_token) | |||
|
|
|||
|
|
|||
There was a problem hiding this comment.
is _jwt_async.py out of scope?
| await credentials_async.Credentials.before_request( | ||
| self, request, method, url, headers | ||
| ) | ||
| self._maybe_start_regional_access_boundary_refresh(request, url) | ||
| self._rab_manager.apply_headers(headers) |
There was a problem hiding this comment.
This may be redundant, why is this needed?
There was a problem hiding this comment.
Thanks for flagging this. I actually realized we no longer need the before_request override, if we follow what we did with the sync RAB flow and using the _after_refresh.
With this hook in place, the base class now automatically orchestrates the token refresh, RAB lookup, and header application in the correct order.
| # A refresh is already in progress. | ||
| return | ||
|
|
||
| async def _worker(): |
There was a problem hiding this comment.
Unlike the synchronous refresh manager which safely deepcopies the transport (copied_request = copy.deepcopy(request)), the async manager passes the exact same request instance directly into the background coroutine task. Because start_refresh is invoked inside before_request, the main application coroutine immediately proceeds to make its actual service API HTTP call using the exact same request transport while the background task is concurrently using it, risking HTTP state corruption or interleaved headers.
Additionally, spawning asyncio.create_task(_worker()) without tracking cancellation hooks upon client session closure can potentially cause dangling tasks that raise RuntimeError: Session is closed when executing against closed client sessions.
There was a problem hiding this comment.
Both concerns are valid, but safe under the hood:
-
Deepcopying the Transport is impossible and not needed in async
The async transport object (e.g., aiohttp_requests.Request) contains an active aiohttp.ClientSession with open TCP sockets. Attempting to copy.deepcopy(request) will instantly raise a TypeError: cannot pickle 'ClientSession' object and crash the application at runtime. Unlike synchronous transports running on separate OS threads, async HTTP clients (like aiohttp or httpx) are natively designed to handle concurrent requests sharing the same connection session. All request-specific states (headers, payloads) are stored in localized coroutine call stack frames, preventing any HTTP state corruption or interleaving. -
Session closure and dangling tasks are handled safely
The background worker is a single-shot asyncio task that executes exactly one lookup request and then immediately terminates and gets garbage-collected.
If the user's application closes the underlying client session while a background task is still running, the resulting RuntimeError: Session is closed is safely caught by the worker's generic except Exception as e: block. It logs a warning, fails open cleanly, and does not raise an unhandled exception or crash the application.
I think no code changes are required here, as the current design is fully protected.
| @@ -66,6 +67,12 @@ class Credentials( | |||
| credentials = credentials.with_quota_project('myproject-123') | |||
| """ | |||
|
|
|||
| def __init__(self, *args, **kwargs): | |||
There was a problem hiding this comment.
_service_account_async.Credentials lacks a __setstate__ override. When older pickled async credentials unpickle, they fall back to CredentialsWithRegionalAccessBoundary.__setstate__, which attaches a synchronous _RegionalAccessBoundaryRefreshManager. If a background RAB lookup triggers on this unpickled credential, a synchronous background thread will invoke async def _lookup_regional_access_boundary synchronously without awaiting it, causing a fatal thread crash (AttributeError: 'coroutine' object has no attribute 'get'). Please implement __setstate__ to ensure self._rab_manager.refresh_manager is always restored as an _AsyncRegionalAccessBoundaryRefreshManager().
There was a problem hiding this comment.
Good catch. I've added __setstate__ and a unit test.
…y data and config
…ager in service account credentials
| new_manager._data = self._rab_manager._data | ||
| target._rab_manager = new_manager | ||
| """Copies the regional access boundary manager state to another instance.""" | ||
| target._rab_manager._data = self._rab_manager._data |
There was a problem hiding this comment.
Tests are failing:
FAILED tests/test_external_account.py::TestCredentials::test_with_scopes_full_options_propagated - AttributeError: 'CredentialsImpl' object has no attribute '_rab_manager'
FAILED tests/test_external_account.py::TestCredentials::test_with_quota_project_full_options_propagated - AttributeError: 'CredentialsImpl' object has no attribute '_rab_manager'
Probably tests are configured incorrectly? I don't think it's possible for there to not be a rab manager?
| if hasattr(response, "read"): | ||
| response_bytes = await response.read() | ||
| else: | ||
| response_bytes = await response.content() |
There was a problem hiding this comment.
This can raise an error too, but currently not being caught.
| else response_bytes | ||
| ) | ||
| response_data = json.loads(response_body) | ||
| except (UnicodeDecodeError, ValueError): |
There was a problem hiding this comment.
As written, a retryable error response without a JSON body would not be retried because we return here before checking the status.
| expected_url_standard = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}/allowedLocations".format( | ||
| self.SERVICE_ACCOUNT_EMAIL | ||
| ) | ||
| expected_url_mtls = "https://iamcredentials.mtls.googleapis.com/v1/projects/-/serviceAccounts/{}/allowedLocations".format( |
There was a problem hiding this comment.
Can you remind me what prompted the mTLS addition?
We should have test(s) that set that signal and makes sure the right endpoint is called on refresh etc.
This PR implements the following changes: