Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""In-memory LRU cache for bucket metadata supporting App-centric Observability (ACO)."""

import logging
import threading

from google.api_core import exceptions as api_exceptions
from google.cloud.exceptions import NotFound
from google.cloud.storage._lru_cache import LRUCache

logger = logging.getLogger(__name__)


class BucketMetadataCache:
    """Thread-safe LRU cache for GCS bucket metadata (destination id and location).

    A cache miss triggers a single background (singleflight) fetch so that
    concurrent callers neither block nor stampede the API with duplicate
    requests for the same bucket.
    """

    def __init__(self, client, max_size=10000):
        """
        :type client: google.cloud.storage.client.Client
        :param client: Client used for background ``get_bucket`` calls.

        :type max_size: int
        :param max_size: Maximum number of bucket entries retained in the cache.
        """
        self._client = client
        self._cache = LRUCache(max_size)
        self._lock = threading.Lock()
        # Bucket names with a fetch currently in flight (guarded by _lock).
        self._inflight_fetches = set()

    def get_or_queue_fetch(self, bucket_name):
        """Retrieve bucket metadata or queue a background fetch on cache miss.

        Never blocks: on a miss, ``None`` is returned immediately while a
        daemon thread populates the cache for subsequent calls.
        """
        _MISSING = object()  # sentinel: cached values are tuples, never None
        start_fetch = False
        with self._lock:
            value = self._cache.get(bucket_name, _MISSING)
            if value is not _MISSING:
                return value
            if bucket_name not in self._inflight_fetches:
                # First thread to miss claims the fetch. Later threads see the
                # in-flight marker and return None, preventing a thundering
                # herd of identical metadata requests.
                self._inflight_fetches.add(bucket_name)
                start_fetch = True
        if start_fetch:
            # Spawn outside the lock so thread creation never serializes callers.
            threading.Thread(
                target=self._fetch_background, args=(bucket_name,), daemon=True
            ).start()
        return None

    def _fetch_background(self, bucket_name):
        """Asynchronously fetch bucket metadata and update the cache."""
        try:
            bucket = self._client.get_bucket(bucket_name, timeout=10.0)
            self.update_from_bucket(bucket)
        except (NotFound, api_exceptions.NotFound):
            # Bucket no longer exists; drop any stale entry.
            self.evict(bucket_name)
        except api_exceptions.Forbidden:
            # On 403 (Forbidden), cache fallback values permanently to avoid retry storms
            self.update_cache(
                bucket_name, f"projects/_/buckets/{bucket_name}", "global"
            )
        except Exception as e:
            # Best-effort telemetry: never let a metadata fetch crash.
            # Lazy %-args so the message is only formatted when DEBUG is on.
            logger.debug(
                "Background fetch for bucket metadata failed for %s: %s",
                bucket_name,
                e,
            )
        finally:
            with self._lock:
                self._inflight_fetches.discard(bucket_name)

    def update_from_bucket(self, bucket):
        """Update cache from a Bucket instance.

        Buckets without a usable name are ignored. Multi/dual-region buckets
        are recorded with the ``global`` location.
        """
        if not bucket or not bucket.name:
            return

        project_number = getattr(bucket, "project_number", None)
        location = (getattr(bucket, "location", None) or "global").lower()
        location_type = (getattr(bucket, "location_type", None) or "region").lower()

        if location_type in ("multi-region", "dual-region"):
            # No single region applies; normalize to "global".
            location = "global"

        if project_number:
            destination_id = f"projects/{project_number}/buckets/{bucket.name}"
        else:
            # Project number unknown; use the wildcard project form.
            destination_id = f"projects/_/buckets/{bucket.name}"

        self.update_cache(bucket.name, destination_id, location)

    def update_cache(self, bucket_name, destination_id, location):
        """Thread-safely update or insert a cache entry with bounded size."""
        with self._lock:
            self._cache.put(bucket_name, (destination_id, location))

    def evict(self, bucket_name):
        """Remove a bucket from the cache (e.g., on 404)."""
        with self._lock:
            self._cache.delete(bucket_name)

    def clear(self):
        """Clear all cached metadata and in-flight markers."""
        with self._lock:
            self._cache.clear()
            self._inflight_fetches.clear()
49 changes: 49 additions & 0 deletions packages/google-cloud-storage/google/cloud/storage/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
DEFAULT_RETRY,
DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED,
)
from google.cloud.storage._opentelemetry_tracing import (
create_trace_span as _base_create_trace_span,
)

STORAGE_EMULATOR_ENV_VAR = "STORAGE_EMULATOR_HOST" # Despite name, includes scheme.
"""Environment variable defining host for Storage emulator."""
Expand Down Expand Up @@ -185,6 +188,52 @@ def _require_client(self, client):
client = self.client
return client

def _get_aco_attributes(self):
    """Best-effort lookup of ACO span attributes for this Bucket or Blob.

    Returns a dict with the destination id/location when cached bucket
    metadata is available, otherwise an empty dict. Never raises for
    cache/metadata problems; only an unexpected ``self`` type raises.
    """
    from google.cloud.storage.blob import Blob
    from google.cloud.storage.bucket import Bucket

    if isinstance(self, Bucket):
        cache = getattr(self.client, "_bucket_metadata_cache", None)
        bucket_name = self.name
    elif isinstance(self, Blob):
        bucket = getattr(self, "bucket", None)
        cache = None
        bucket_name = None
        if bucket:
            if hasattr(bucket, "client"):
                cache = getattr(bucket.client, "_bucket_metadata_cache", None)
            bucket_name = getattr(bucket, "name", None)
    else:
        raise TypeError(
            f"Unexpected type for ACO attribute retrieval: {type(self)}"
        )

    if callable(bucket_name):
        # Defensive: resolve a callable "name"; on failure fall through and
        # let the isinstance(str) guard below reject it.
        try:
            bucket_name = bucket_name()
        except Exception:
            pass

    if not (cache and bucket_name and isinstance(bucket_name, str)):
        return {}

    try:
        cached = cache.get_or_queue_fetch(bucket_name)
    except Exception:
        # Telemetry must never break the actual storage operation.
        return {}

    if cached and isinstance(cached, tuple) and len(cached) == 2:
        destination_id, location = cached
        return {
            "gcp.resource.destination.id": destination_id,
            "gcp.resource.destination.location": location,
        }
    return {}

def _create_trace_span(self, name, attributes=None, **kwargs):
    """Create a trace span, enriching attributes with ACO metadata.

    The caller-supplied ``attributes`` mapping is never mutated: ACO
    attributes are merged into a copy (ACO values win on key collisions,
    matching the previous update order).
    """
    merged = dict(attributes) if attributes else {}
    merged.update(self._get_aco_attributes())
    return _base_create_trace_span(name, attributes=merged, **kwargs)

def _encryption_headers(self):
"""Return any encryption headers needed to fetch the object.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A Least Recently Used (LRU) cache implementation."""

from collections import OrderedDict
from typing import Any, Generic, Optional, TypeVar

K = TypeVar("K")
V = TypeVar("V")


class LRUCache(Generic[K, V]):
    """A bounded mapping that evicts the least recently used entry first.

    Backed by :class:`collections.OrderedDict`; both reads and writes mark
    an entry as most recently used.

    :type capacity: int
    :param capacity: The maximum number of items the cache can hold.
    """

    def __init__(self, capacity: int) -> None:
        if capacity <= 0:
            raise ValueError("Capacity must be greater than 0")
        self._max_items = capacity
        self._entries: "OrderedDict[K, V]" = OrderedDict()

    @property
    def capacity(self) -> int:
        """Return the capacity of the cache."""
        return self._max_items

    def get(self, key: K, default: Optional[V] = None) -> Optional[V]:
        """Look up *key*, marking it most recently used on a hit.

        :type key: Any
        :param key: The key to look up in the cache.

        :type default: Any
        :param default: Value returned when *key* is absent.
        """
        try:
            # move_to_end raises KeyError for a missing key, giving a
            # single-lookup membership test plus recency update.
            self._entries.move_to_end(key)
        except KeyError:
            return default
        return self._entries[key]

    def put(self, key: K, value: V) -> None:
        """Insert or overwrite *key*, marking it most recently used.

        When the cache grows past capacity, the least recently used entry
        (at the front of the ordered dict) is discarded.

        :type key: Any
        :param key: The key to store.

        :type value: Any
        :param value: The value to store.
        """
        self._entries[key] = value
        self._entries.move_to_end(key)
        while len(self._entries) > self._max_items:
            self._entries.popitem(last=False)

    def __len__(self) -> int:
        return len(self._entries)

    def __contains__(self, key: K) -> bool:
        return key in self._entries

    def clear(self) -> None:
        """Drop every entry from the cache."""
        self._entries.clear()

    def delete(self, key: K) -> None:
        """Remove *key* if present; silently do nothing otherwise."""
        self._entries.pop(key, None)
Loading
Loading