From d0f49808e713499b69b45c0a1410d6e6ddb23e08 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 15 May 2026 09:01:14 +0000 Subject: [PATCH] feat(storage): implement App-centric Observability (ACO) OpenTelemetry tracing and lockless bucket metadata caching --- .../cloud/storage/_bucket_metadata_cache.py | 115 +++++++++++++ .../google/cloud/storage/_helpers.py | 49 ++++++ .../google/cloud/storage/_lru_cache.py | 89 ++++++++++ .../google/cloud/storage/blob.py | 49 +++--- .../google/cloud/storage/bucket.py | 45 +++-- .../google/cloud/storage/client.py | 15 ++ .../tests/unit/test__bucket_metadata_cache.py | 159 ++++++++++++++++++ .../tests/unit/test__lru_cache.py | 101 +++++++++++ 8 files changed, 574 insertions(+), 48 deletions(-) create mode 100644 packages/google-cloud-storage/google/cloud/storage/_bucket_metadata_cache.py create mode 100644 packages/google-cloud-storage/google/cloud/storage/_lru_cache.py create mode 100644 packages/google-cloud-storage/tests/unit/test__bucket_metadata_cache.py create mode 100644 packages/google-cloud-storage/tests/unit/test__lru_cache.py diff --git a/packages/google-cloud-storage/google/cloud/storage/_bucket_metadata_cache.py b/packages/google-cloud-storage/google/cloud/storage/_bucket_metadata_cache.py new file mode 100644 index 000000000000..72069445ee9b --- /dev/null +++ b/packages/google-cloud-storage/google/cloud/storage/_bucket_metadata_cache.py @@ -0,0 +1,115 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
logger = logging.getLogger(__name__)


class BucketMetadataCache:
    """Thread-safe LRU cache of bucket metadata for App-centric Observability.

    Each entry maps a bucket name to a ``(destination_id, location)`` tuple,
    where ``destination_id`` has the form
    ``projects/<project number or "_">/buckets/<bucket name>``.

    Lookups never block on the network: the first miss for a bucket starts a
    daemon thread that fetches the bucket through the client, and further
    misses for the same bucket while that fetch is running do not start
    another one (single-flight).

    :type client: object
    :param client: Object exposing ``get_bucket(bucket_name, timeout=...)``;
        used by the background fetch.

    :type max_size: int
    :param max_size: Maximum number of buckets kept in the cache.

    :type fetch_timeout: float
    :param fetch_timeout: Timeout, in seconds, passed to ``get_bucket`` by the
        background fetch.
    """

    def __init__(self, client, max_size=10000, fetch_timeout=10.0):
        self._client = client
        self._cache = LRUCache(max_size)
        # Guards both ``_cache`` and ``_inflight_fetches``.
        self._lock = threading.Lock()
        # Names of buckets that currently have a background fetch running.
        self._inflight_fetches = set()
        self._fetch_timeout = fetch_timeout

    def get_or_queue_fetch(self, bucket_name):
        """Return cached metadata, or queue a background fetch on a miss.

        :type bucket_name: str
        :param bucket_name: Name of the bucket to look up.

        :rtype: tuple or None
        :returns: The cached ``(destination_id, location)`` tuple, or ``None``
            on a cache miss. A miss returns immediately so the caller does not
            block.
        """
        with self._lock:
            if bucket_name in self._cache:
                return self._cache.get(bucket_name)
            if bucket_name in self._inflight_fetches:
                # Thundering herd: another caller already missed on this
                # bucket and its fetch is still running, so the remaining
                # callers must not issue the same request again.
                return None

            self._inflight_fetches.add(bucket_name)
            try:
                self._spawn_fetch(bucket_name)
            except RuntimeError as exc:
                # The thread could not be started, so nothing will ever remove
                # the in-flight marker. Roll it back, otherwise this bucket
                # could never be fetched again.
                self._inflight_fetches.discard(bucket_name)
                logger.debug(
                    "Could not start background fetch for bucket metadata "
                    "for %s: %s",
                    bucket_name,
                    exc,
                )
            return None

    def _spawn_fetch(self, bucket_name):
        """Start the daemon thread that runs ``_fetch_background``."""
        threading.Thread(
            target=self._fetch_background, args=(bucket_name,), daemon=True
        ).start()

    def _fetch_background(self, bucket_name):
        """Fetch bucket metadata through the client and update the cache.

        Outcomes:

        * success: the entry is stored via :meth:`update_from_bucket`;
        * not found: any existing entry is evicted;
        * forbidden: placeholder values are stored so that later lookups hit
          the cache instead of triggering another request;
        * any other error: logged at debug level and nothing is cached.

        The in-flight marker for ``bucket_name`` is removed in every case.
        """
        try:
            bucket = self._client.get_bucket(bucket_name, timeout=self._fetch_timeout)
            self.update_from_bucket(bucket)
        except (NotFound, api_exceptions.NotFound):
            self.evict(bucket_name)
        except api_exceptions.Forbidden:
            # On 403 (Forbidden), cache fallback values to avoid retry storms.
            self.update_cache(
                bucket_name, f"projects/_/buckets/{bucket_name}", "global"
            )
        except Exception as exc:
            # Best effort: tracing metadata must never break the caller.
            logger.debug(
                "Background fetch for bucket metadata failed for %s: %s",
                bucket_name,
                exc,
            )
        finally:
            with self._lock:
                self._inflight_fetches.discard(bucket_name)

    def update_from_bucket(self, bucket):
        """Update the cache from a bucket-like object.

        Reads ``name``, ``project_number``, ``location`` and ``location_type``
        from ``bucket``. The location is lower-cased, and is replaced by
        ``"global"`` when it is missing or when the location type is
        ``multi-region`` or ``dual-region``. A missing project number is
        written as ``_`` in the destination id.

        :param bucket: Object carrying the attributes above. Nothing is stored
            when it is falsy or has no name.
        """
        if not bucket or not bucket.name:
            return

        project_number = getattr(bucket, "project_number", None)
        location = getattr(bucket, "location", None) or "global"
        location = location.lower()
        location_type = getattr(bucket, "location_type", None) or "region"
        location_type = location_type.lower()

        if location_type in ("multi-region", "dual-region"):
            location = "global"

        if project_number:
            destination_id = f"projects/{project_number}/buckets/{bucket.name}"
        else:
            destination_id = f"projects/_/buckets/{bucket.name}"

        self.update_cache(bucket.name, destination_id, location)

    def update_cache(self, bucket_name, destination_id, location):
        """Insert or replace the entry for ``bucket_name`` under the lock.

        The underlying LRU cache keeps the number of entries bounded.
        """
        with self._lock:
            self._cache.put(bucket_name, (destination_id, location))

    def evict(self, bucket_name):
        """Remove a bucket from the cache (e.g., on 404)."""
        with self._lock:
            self._cache.delete(bucket_name)

    def clear(self):
        """Drop all cached metadata and all in-flight markers.

        Background fetches that are already running are not cancelled; one
        that finishes after this call still writes its result to the cache.
        """
        with self._lock:
            self._cache.clear()
            self._inflight_fetches.clear()
K = TypeVar("K")
V = TypeVar("V")


class LRUCache(Generic[K, V]):
    """Bounded mapping that discards its least recently used entry when full.

    Recency is tracked by position in an ``OrderedDict``: the front holds the
    least recently used key and the back the most recently used one.

    :type capacity: int
    :param capacity: The maximum number of items the cache can hold.

    :raises ValueError: if ``capacity`` is not greater than 0.
    """

    def __init__(self, capacity: int) -> None:
        if capacity <= 0:
            raise ValueError("Capacity must be greater than 0")
        self._cache: OrderedDict[K, V] = OrderedDict()
        self._capacity = capacity

    @property
    def capacity(self) -> int:
        """Maximum number of items the cache can hold."""
        return self._capacity

    def get(self, key: K, default: Optional[V] = None) -> Optional[V]:
        """Look up ``key`` and mark it as most recently used.

        :type key: Any
        :param key: The key to look up in the cache.

        :type default: Any
        :param default: Value returned when ``key`` is not cached.

        :returns: The cached value, or ``default`` on a miss.
        """
        try:
            # Refreshing the position doubles as the membership test.
            self._cache.move_to_end(key)
        except KeyError:
            return default
        return self._cache[key]

    def put(self, key: K, value: V) -> None:
        """Store ``value`` under ``key`` as the most recently used entry.

        An existing key is overwritten. When the insertion pushes the cache
        over capacity, the least recently used entry (at the front) is
        evicted.

        :type key: Any
        :param key: The key to store.

        :type value: Any
        :param value: The value to store.
        """
        entries = self._cache
        entries[key] = value
        entries.move_to_end(key)
        if len(entries) > self._capacity:
            entries.popitem(last=False)

    def __len__(self) -> int:
        return len(self._cache)

    def __contains__(self, key: K) -> bool:
        return key in self._cache

    def clear(self) -> None:
        """Remove every entry from the cache."""
        self._cache.clear()

    def delete(self, key: K) -> None:
        """Remove ``key`` from the cache; do nothing when it is absent."""
        if key in self._cache:
            del self._cache[key]
""" - with create_trace_span(name="Storage.Blob.exists"): + with self._create_trace_span(name="Storage.Blob.exists"): client = self._require_client(client) # We only need the status code (200 or not) so we seek to # minimize the returned payload. @@ -847,7 +846,7 @@ def delete( (propagated from :meth:`google.cloud.storage.bucket.Bucket.delete_blob`). """ - with create_trace_span(name="Storage.Blob.delete"): + with self._create_trace_span(name="Storage.Blob.delete"): self.bucket.delete_blob( self.name, client=client, @@ -1086,7 +1085,7 @@ def _do_download( # not supported for chunked downloads. single_shot_download=single_shot_download, ) - with create_trace_span( + with self._create_trace_span( name=f"Storage.{download_class}/consume", attributes=extra_attributes, api_request=args, @@ -1115,7 +1114,7 @@ def _do_download( retry=retry, ) - with create_trace_span( + with self._create_trace_span( name=f"Storage.{download_class}/consumeNextChunk", attributes=extra_attributes, api_request=args, @@ -1243,7 +1242,7 @@ def download_to_file( :raises: :class:`google.cloud.exceptions.NotFound` """ - with create_trace_span(name="Storage.Blob.downloadToFile"): + with self._create_trace_span(name="Storage.Blob.downloadToFile"): self._prep_and_do_download( file_obj, client=client, @@ -1399,7 +1398,7 @@ def download_to_filename( :raises: :class:`google.cloud.exceptions.NotFound` """ - with create_trace_span(name="Storage.Blob.downloadToFilename"): + with self._create_trace_span(name="Storage.Blob.downloadToFilename"): self._handle_filename_and_download( filename, client=client, @@ -1524,7 +1523,7 @@ def download_as_bytes( :raises: :class:`google.cloud.exceptions.NotFound` """ - with create_trace_span(name="Storage.Blob.downloadAsBytes"): + with self._create_trace_span(name="Storage.Blob.downloadAsBytes"): string_buffer = BytesIO() self._prep_and_do_download( @@ -1647,7 +1646,7 @@ def download_as_string( PendingDeprecationWarning, stacklevel=2, ) - with 
create_trace_span(name="Storage.Blob.downloadAsString"): + with self._create_trace_span(name="Storage.Blob.downloadAsString"): return self.download_as_bytes( client=client, start=start, @@ -1761,7 +1760,7 @@ def download_as_text( :rtype: text :returns: The data stored in this blob, decoded to text. """ - with create_trace_span(name="Storage.Blob.downloadAsText"): + with self._create_trace_span(name="Storage.Blob.downloadAsText"): data = self.download_as_bytes( client=client, start=start, @@ -2052,7 +2051,7 @@ def _do_multipart_upload( extra_attributes = _get_opentelemetry_attributes_from_url(upload_url) extra_attributes["upload.checksum"] = f"{checksum}" args = {"timeout": timeout} - with create_trace_span( + with self._create_trace_span( name="Storage.MultipartUpload/transmit", attributes=extra_attributes, client=client, @@ -2452,7 +2451,7 @@ def _do_resumable_upload( extra_attributes["upload.checksum"] = f"{checksum}" args = {"timeout": timeout} - with create_trace_span( + with self._create_trace_span( name="Storage.ResumableUpload/transmitNextChunk", attributes=extra_attributes, client=client, @@ -3011,7 +3010,7 @@ def upload_from_file( :raises: :class:`~google.cloud.exceptions.GoogleCloudError` if the upload response returns an error status. 
""" - with create_trace_span(name="Storage.Blob.uploadFromFile"): + with self._create_trace_span(name="Storage.Blob.uploadFromFile"): self._prep_and_do_upload( file_obj, rewind=rewind, @@ -3194,7 +3193,7 @@ def upload_from_filename( https://datatracker.ietf.org/doc/html/rfc4960#appendix-B and base64: https://datatracker.ietf.org/doc/html/rfc4648#section-4 """ - with create_trace_span(name="Storage.Blob.uploadFromFilename"): + with self._create_trace_span(name="Storage.Blob.uploadFromFilename"): self._handle_filename_and_upload( filename, content_type=content_type, @@ -3343,7 +3342,7 @@ def upload_from_string( https://datatracker.ietf.org/doc/html/rfc4960#appendix-B and base64: https://datatracker.ietf.org/doc/html/rfc4648#section-4 """ - with create_trace_span(name="Storage.Blob.uploadFromString"): + with self._create_trace_span(name="Storage.Blob.uploadFromString"): data = _to_bytes(data, encoding="utf-8") string_buffer = BytesIO(data) self.upload_from_file( @@ -3495,7 +3494,7 @@ def create_resumable_upload_session( :raises: :class:`google.cloud.exceptions.GoogleCloudError` if the session creation response returns an error status. """ - with create_trace_span(name="Storage.Blob.createResumableUploadSession"): + with self._create_trace_span(name="Storage.Blob.createResumableUploadSession"): # Handle ConditionalRetryPolicy. if isinstance(retry, ConditionalRetryPolicy): # Conditional retries are designed for non-media calls, which change @@ -3591,7 +3590,7 @@ def get_iam_policy( :returns: the policy instance, based on the resource returned from the ``getIamPolicy`` API request. """ - with create_trace_span(name="Storage.Blob.getIamPolicy"): + with self._create_trace_span(name="Storage.Blob.getIamPolicy"): client = self._require_client(client) query_params = {} @@ -3652,7 +3651,7 @@ def set_iam_policy( :returns: the policy instance, based on the resource returned from the ``setIamPolicy`` API request. 
""" - with create_trace_span(name="Storage.Blob.setIamPolicy"): + with self._create_trace_span(name="Storage.Blob.setIamPolicy"): client = self._require_client(client) query_params = {} @@ -3714,7 +3713,7 @@ def test_iam_permissions( :returns: the permissions returned by the ``testIamPermissions`` API request. """ - with create_trace_span(name="Storage.Blob.testIamPermissions"): + with self._create_trace_span(name="Storage.Blob.testIamPermissions"): client = self._require_client(client) query_params = {"permissions": permissions} @@ -3774,7 +3773,7 @@ def make_public( :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` """ - with create_trace_span(name="Storage.Blob.makePublic"): + with self._create_trace_span(name="Storage.Blob.makePublic"): self.acl.all().grant_read() self.acl.save( client=client, @@ -3828,7 +3827,7 @@ def make_private( :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` """ - with create_trace_span(name="Storage.Blob.makePrivate"): + with self._create_trace_span(name="Storage.Blob.makePrivate"): self.acl.all().revoke_read() self.acl.save( client=client, @@ -3909,7 +3908,7 @@ def compose( to enable retries regardless of generation precondition setting. See [Configuring Retries](https://cloud.google.com/python/docs/reference/storage/latest/retry_timeout). """ - with create_trace_span(name="Storage.Blob.compose"): + with self._create_trace_span(name="Storage.Blob.compose"): sources_len = len(sources) client = self._require_client(client) query_params = {} @@ -4088,7 +4087,7 @@ def rewrite( and ``total_bytes`` is the total number of bytes to be rewritten. 
""" - with create_trace_span(name="Storage.Blob.rewrite"): + with self._create_trace_span(name="Storage.Blob.rewrite"): client = self._require_client(client) headers = _get_encryption_headers(self._encryption_key) headers.update(_get_encryption_headers(source._encryption_key, source=True)) @@ -4248,7 +4247,7 @@ def update_storage_class( to enable retries regardless of generation precondition setting. See [Configuring Retries](https://cloud.google.com/python/docs/reference/storage/latest/retry_timeout). """ - with create_trace_span(name="Storage.Blob.updateStorageClass"): + with self._create_trace_span(name="Storage.Blob.updateStorageClass"): # Update current blob's storage class prior to rewrite self._patch_property("storageClass", new_class) @@ -4392,7 +4391,7 @@ def open( 'google.cloud.storage.fileio', or an 'io.TextIOWrapper' around one of those classes, depending on the 'mode' argument. """ - with create_trace_span(name="Storage.Blob.open"): + with self._create_trace_span(name="Storage.Blob.open"): if mode == "rb": if encoding or errors or newline: raise ValueError( diff --git a/packages/google-cloud-storage/google/cloud/storage/bucket.py b/packages/google-cloud-storage/google/cloud/storage/bucket.py index 6fd690cf38b2..4abcbf09444a 100644 --- a/packages/google-cloud-storage/google/cloud/storage/bucket.py +++ b/packages/google-cloud-storage/google/cloud/storage/bucket.py @@ -38,7 +38,6 @@ _validate_name, _virtual_hosted_style_base_url, ) -from google.cloud.storage._opentelemetry_tracing import create_trace_span from google.cloud.storage._signing import generate_signed_url_v2, generate_signed_url_v4 from google.cloud.storage.acl import BucketACL, DefaultObjectACL from google.cloud.storage.blob import Blob, _quote @@ -972,7 +971,7 @@ def exists( :rtype: bool :returns: True if the bucket exists in Cloud Storage. 
""" - with create_trace_span(name="Storage.Bucket.exists"): + with self._create_trace_span(name="Storage.Bucket.exists"): client = self._require_client(client) # We only need the status code (200 or not) so we seek to # minimize the returned payload. @@ -1073,7 +1072,7 @@ def create( :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` """ - with create_trace_span(name="Storage.Bucket.create"): + with self._create_trace_span(name="Storage.Bucket.create"): client = self._require_client(client) client.create_bucket( bucket_or_name=self, @@ -1123,7 +1122,7 @@ def update( :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` """ - with create_trace_span(name="Storage.Bucket.update"): + with self._create_trace_span(name="Storage.Bucket.update"): super(Bucket, self).update( client=client, timeout=timeout, @@ -1190,7 +1189,7 @@ def reload( set if ``soft_deleted`` is set to True. See: https://cloud.google.com/storage/docs/soft-delete """ - with create_trace_span(name="Storage.Bucket.reload"): + with self._create_trace_span(name="Storage.Bucket.reload"): super(Bucket, self).reload( client=client, projection=projection, @@ -1239,7 +1238,7 @@ def patch( :param retry: (Optional) How to retry the RPC. See: :ref:`configuring_retries` """ - with create_trace_span(name="Storage.Bucket.patch"): + with self._create_trace_span(name="Storage.Bucket.patch"): # Special case: For buckets, it is possible that labels are being # removed; this requires special handling. if self._label_removals: @@ -1375,7 +1374,7 @@ def get_blob( :rtype: :class:`google.cloud.storage.blob.Blob` or None :returns: The blob object if it exists, otherwise None. 
""" - with create_trace_span(name="Storage.Bucket.getBlob"): + with self._create_trace_span(name="Storage.Bucket.getBlob"): blob = Blob( bucket=self, name=blob_name, @@ -1525,7 +1524,7 @@ def list_blobs( :returns: Iterator of all :class:`~google.cloud.storage.blob.Blob` in this bucket matching the arguments. """ - with create_trace_span(name="Storage.Bucket.listBlobs"): + with self._create_trace_span(name="Storage.Bucket.listBlobs"): client = self._require_client(client) return client.list_blobs( self, @@ -1573,7 +1572,7 @@ def list_notifications( :rtype: list of :class:`.BucketNotification` :returns: notification instances """ - with create_trace_span(name="Storage.Bucket.listNotifications"): + with self._create_trace_span(name="Storage.Bucket.listNotifications"): client = self._require_client(client) path = self.path + "/notificationConfigs" iterator = client._list_resource( @@ -1618,7 +1617,7 @@ def get_notification( :rtype: :class:`.BucketNotification` :returns: notification instance. """ - with create_trace_span(name="Storage.Bucket.getNotification"): + with self._create_trace_span(name="Storage.Bucket.getNotification"): notification = self.notification(notification_id=notification_id) notification.reload(client=client, timeout=timeout, retry=retry) return notification @@ -1678,7 +1677,7 @@ def delete( :raises: :class:`ValueError` if ``force`` is ``True`` and the bucket contains more than 256 objects / blobs. """ - with create_trace_span(name="Storage.Bucket.delete"): + with self._create_trace_span(name="Storage.Bucket.delete"): client = self._require_client(client) query_params = {} @@ -1801,7 +1800,7 @@ def delete_blob( the exception, use :meth:`delete_blobs` by passing a no-op ``on_error`` callback. 
""" - with create_trace_span(name="Storage.Bucket.deleteBlob"): + with self._create_trace_span(name="Storage.Bucket.deleteBlob"): client = self._require_client(client) blob = Blob(blob_name, bucket=self, generation=generation) @@ -1914,7 +1913,7 @@ def delete_blobs( :raises: :class:`~google.cloud.exceptions.NotFound` (if `on_error` is not passed). """ - with create_trace_span(name="Storage.Bucket.deleteBlobs"): + with self._create_trace_span(name="Storage.Bucket.deleteBlobs"): _raise_if_len_differs( len(blobs), if_generation_match=if_generation_match, @@ -2068,7 +2067,7 @@ def copy_blob( :rtype: :class:`google.cloud.storage.blob.Blob` :returns: The new Blob. """ - with create_trace_span(name="Storage.Bucket.copyBlob"): + with self._create_trace_span(name="Storage.Bucket.copyBlob"): client = self._require_client(client) query_params = {} @@ -2222,7 +2221,7 @@ def rename_blob( :rtype: :class:`Blob` :returns: The newly-renamed blob. """ - with create_trace_span(name="Storage.Bucket.renameBlob"): + with self._create_trace_span(name="Storage.Bucket.renameBlob"): same_name = blob.name == new_name new_blob = self.copy_blob( @@ -2342,7 +2341,7 @@ def move_blob( :rtype: :class:`Blob` :returns: The newly-moved blob. """ - with create_trace_span(name="Storage.Bucket.moveBlob"): + with self._create_trace_span(name="Storage.Bucket.moveBlob"): client = self._require_client(client) query_params = {} @@ -2451,7 +2450,7 @@ def restore_blob( :rtype: :class:`google.cloud.storage.blob.Blob` :returns: The restored Blob. """ - with create_trace_span(name="Storage.Bucket.restore_blob"): + with self._create_trace_span(name="Storage.Bucket.restore_blob"): client = self._require_client(client) query_params = {} @@ -3346,7 +3345,7 @@ def get_iam_policy( :returns: the policy instance, based on the resource returned from the ``getIamPolicy`` API request. 
""" - with create_trace_span(name="Storage.Bucket.getIamPolicy"): + with self._create_trace_span(name="Storage.Bucket.getIamPolicy"): client = self._require_client(client) query_params = {} @@ -3400,7 +3399,7 @@ def set_iam_policy( :returns: the policy instance, based on the resource returned from the ``setIamPolicy`` API request. """ - with create_trace_span(name="Storage.Bucket.setIamPolicy"): + with self._create_trace_span(name="Storage.Bucket.setIamPolicy"): client = self._require_client(client) query_params = {} @@ -3457,7 +3456,7 @@ def test_iam_permissions( :returns: the permissions returned by the ``testIamPermissions`` API request. """ - with create_trace_span(name="Storage.Bucket.testIamPermissions"): + with self._create_trace_span(name="Storage.Bucket.testIamPermissions"): client = self._require_client(client) query_params = {"permissions": permissions} @@ -3523,7 +3522,7 @@ def make_public( :meth:`~google.cloud.storage.blob.Blob.make_public` for each blob. """ - with create_trace_span(name="Storage.Bucket.makePublic"): + with self._create_trace_span(name="Storage.Bucket.makePublic"): self.acl.all().grant_read() self.acl.save( client=client, @@ -3620,7 +3619,7 @@ def make_private( :meth:`~google.cloud.storage.blob.Blob.make_private` for each blob. """ - with create_trace_span(name="Storage.Bucket.makePrivate"): + with self._create_trace_span(name="Storage.Bucket.makePrivate"): self.acl.all().revoke_read() self.acl.save( client=client, @@ -3744,7 +3743,7 @@ def lock_retention_policy( if the bucket has no retention policy assigned; if the bucket's retention policy is already locked. """ - with create_trace_span(name="Storage.Bucket.lockRetentionPolicy"): + with self._create_trace_span(name="Storage.Bucket.lockRetentionPolicy"): if "metageneration" not in self._properties: raise ValueError( "Bucket has no retention policy assigned: try 'reload'?" 
def close(self):
    """Close the client: drop cached bucket metadata and close the transport.

    The transport is only closed when one has already been set up; closing a
    client that never made a request does not create one.
    """
    cache = getattr(self, "_bucket_metadata_cache", None)
    if cache is not None:
        cache.clear()

    # NOTE(review): ``_http_internal`` is presumably the attribute that backs
    # the ``_http`` property of the google-cloud-core base client. Reading
    # ``self._http`` here instead would build a new authorized session just
    # to close it -- confirm against the base ``Client``.
    http = getattr(self, "_http_internal", None)
    if http is not None and hasattr(http, "close"):
        http.close()


def __enter__(self):
    """Return the client itself so it can be used in a ``with`` statement."""
    return self


def __exit__(self, exc_type, exc_val, exc_tb):
    """Close the client when the ``with`` block exits."""
    self.close()
class TestBucketMetadataCache(unittest.TestCase):
    """Unit tests for ``BucketMetadataCache`` using a mocked client."""

    @mock.patch("threading.Thread")
    def test_lru_eviction(self, mock_thread):
        """Inserting a fourth entry into a size-3 cache evicts the oldest one.

        ``threading.Thread`` is patched so the miss on ``b1`` does not start a
        real background fetch.
        """
        client = mock.Mock()
        cache = BucketMetadataCache(client, max_size=3)

        cache.update_cache("b1", "dest1", "loc1")
        cache.update_cache("b2", "dest2", "loc2")
        cache.update_cache("b3", "dest3", "loc3")
        cache.update_cache("b4", "dest4", "loc4")  # Evicts b1 (oldest)

        self.assertIsNone(cache.get_or_queue_fetch("b1"))
        self.assertEqual(cache.get_or_queue_fetch("b2"), ("dest2", "loc2"))
        self.assertEqual(cache.get_or_queue_fetch("b3"), ("dest3", "loc3"))
        self.assertEqual(cache.get_or_queue_fetch("b4"), ("dest4", "loc4"))

    def test_update_from_bucket(self):
        """``update_from_bucket`` derives the destination id and location.

        Covers multi-region and dual-region buckets (location becomes
        ``global``), a regional bucket (location is lower-cased) and a bucket
        without a project number (``_`` placeholder in the destination id).
        """
        client = mock.Mock()
        cache = BucketMetadataCache(client)

        # Multi-region -> global
        b1 = mock.Mock()
        b1.name = "b1"
        b1.location = "US"
        b1.location_type = "multi-region"
        b1.project_number = 123
        cache.update_from_bucket(b1)
        self.assertEqual(
            cache.get_or_queue_fetch("b1"), ("projects/123/buckets/b1", "global")
        )

        # Dual-region -> global
        b2 = mock.Mock()
        b2.name = "b2"
        b2.location = "NAM4"
        b2.location_type = "dual-region"
        b2.project_number = 456
        cache.update_from_bucket(b2)
        self.assertEqual(
            cache.get_or_queue_fetch("b2"), ("projects/456/buckets/b2", "global")
        )

        # Region -> us-east1
        b3 = mock.Mock()
        b3.name = "b3"
        b3.location = "US-EAST1"
        b3.location_type = "region"
        b3.project_number = 789
        cache.update_from_bucket(b3)
        self.assertEqual(
            cache.get_or_queue_fetch("b3"), ("projects/789/buckets/b3", "us-east1")
        )

        # Missing project number -> _
        b4 = mock.Mock()
        b4.name = "b4"
        b4.location = "eu-west1"
        b4.location_type = "region"
        b4.project_number = None
        cache.update_from_bucket(b4)
        self.assertEqual(
            cache.get_or_queue_fetch("b4"), ("projects/_/buckets/b4", "eu-west1")
        )

    @mock.patch("threading.Thread")
    def test_get_or_queue_fetch(self, mock_thread):
        """A miss returns ``None`` and starts exactly one fetch per bucket."""
        client = mock.Mock()
        cache = BucketMetadataCache(client)

        # Cache miss -> returns None immediately and spawns thread
        result = cache.get_or_queue_fetch("my-bucket")
        self.assertIsNone(result)
        mock_thread.assert_called_once()

        # Second immediate lookup -> returns None, does not spawn another thread (singleflight)
        mock_thread.reset_mock()
        result2 = cache.get_or_queue_fetch("my-bucket")
        self.assertIsNone(result2)
        mock_thread.assert_not_called()

    def test_fetch_background_success(self):
        """A successful fetch stores the entry and clears the in-flight marker.

        ``_fetch_background`` is called directly (synchronously) instead of
        through a thread.
        """
        client = mock.Mock()
        b1 = mock.Mock()
        b1.name = "b1"
        b1.location = "US-WEST1"
        b1.location_type = "region"
        b1.project_number = 999
        client.get_bucket.return_value = b1

        cache = BucketMetadataCache(client)
        cache._inflight_fetches.add("b1")

        cache._fetch_background("b1")

        self.assertEqual(
            cache.get_or_queue_fetch("b1"), ("projects/999/buckets/b1", "us-west1")
        )
        self.assertNotIn("b1", cache._inflight_fetches)

    def test_fetch_background_not_found(self):
        """A ``NotFound`` from the client evicts the existing cache entry."""
        client = mock.Mock()
        client.get_bucket.side_effect = NotFound("Bucket not found")
        cache = BucketMetadataCache(client)
        cache.update_cache("b1", "projects/_/buckets/b1", "global")
        cache._inflight_fetches.add("b1")

        cache._fetch_background("b1")

        self.assertNotIn("b1", cache._cache)
        self.assertNotIn("b1", cache._inflight_fetches)

    def test_fetch_background_forbidden(self):
        """A ``Forbidden`` from the client caches the fallback values."""
        client = mock.Mock()
        client.get_bucket.side_effect = api_exceptions.Forbidden("403")
        cache = BucketMetadataCache(client)
        cache._inflight_fetches.add("b1")

        cache._fetch_background("b1")

        self.assertEqual(
            cache.get_or_queue_fetch("b1"), ("projects/_/buckets/b1", "global")
        )
        self.assertNotIn("b1", cache._inflight_fetches)

    @mock.patch("threading.Thread")
    def test_clear_and_evict(self, mock_thread):
        """``evict`` removes a single entry and ``clear`` removes all of them."""
        client = mock.Mock()
        cache = BucketMetadataCache(client)

        cache.update_cache("b1", "dest1", "loc1")
        cache.evict("b1")
        self.assertNotIn("b1", cache._cache)

        cache.update_cache("b2", "dest2", "loc2")
        cache.clear()
        self.assertNotIn("b2", cache._cache)
+
+import pytest
+
+from google.cloud.storage._lru_cache import LRUCache
+
+
+def _cache_with(*entries, capacity=2):
+    """Build an LRUCache and insert ``entries`` (key, value pairs) in order."""
+    lru = LRUCache(capacity=capacity)
+    for key, value in entries:
+        lru.put(key, value)
+    return lru
+
+
+def test_lru_cache_capacity():
+    """A positive capacity is kept; zero and negative ones raise ValueError."""
+    lru = LRUCache(capacity=3)
+    assert lru.capacity == 3
+
+    for invalid in (0, -1):
+        with pytest.raises(ValueError):
+            LRUCache(capacity=invalid)
+
+
+def test_lru_cache_put_and_get():
+    """Missing keys yield the default; stored keys are returned and counted."""
+    lru = LRUCache(capacity=2)
+    assert lru.get("a") is None
+    assert lru.get("a", default="default") == "default"
+
+    # Insert one entry at a time, checking its value, the size and membership.
+    for size, (key, value) in enumerate((("a", 1), ("b", 2)), start=1):
+        lru.put(key, value)
+        assert lru.get(key) == value
+        assert len(lru) == size
+        assert key in lru
+
+
+def test_lru_cache_eviction():
+    """Reading an entry protects it; the unread entry is evicted on overflow."""
+    lru = _cache_with(("a", 1), ("b", 2))
+
+    # Reading "a" refreshes it, leaving "b" as the least recently used entry.
+    assert lru.get("a") == 1
+
+    # A third key overflows the two-slot cache and should push "b" out.
+    lru.put("c", 3)
+
+    assert "b" not in lru
+    assert lru.get("b") is None
+    assert lru.get("a") == 1
+    assert lru.get("c") == 3
+    assert len(lru) == 2
+
+
+def test_lru_cache_update():
+    """Overwriting a key stores the new value and refreshes its recency."""
+    lru = _cache_with(("a", 1), ("b", 2))
+
+    # Re-putting "a" makes it the most recently used entry.
+    lru.put("a", 10)
+
+    # Adding "c" must therefore evict "b" rather than "a".
+    lru.put("c", 3)
+
+    assert "b" not in lru
+    assert lru.get("a") == 10
+    assert lru.get("c") == 3
+
+
+def test_lru_cache_clear():
+    """clear() empties the cache."""
+    lru = _cache_with(("a", 1), ("b", 2))
+    lru.clear()
+    assert len(lru) == 0
+    assert "a" not in lru
+    assert "b" not in lru
+
+
+def test_lru_cache_delete():
+    """delete() removes only the named key."""
+    lru = _cache_with(("a", 1), ("b", 2))
+
+    lru.delete("a")
+    assert len(lru) == 1
+    assert "a" not in lru
+    assert lru.get("a") is None
+    assert lru.get("b") == 2