Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cloudsmith_cli/core/api/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ def _try_oidc_credential(config):
context = CredentialContext(
api_host=config.host,
debug=config.debug,
proxy=config.proxy,
ssl_verify=config.verify_ssl,
user_agent=config.user_agent,
headers=config.headers.copy() if config.headers else None,
)

provider = OidcProvider()
Expand Down
5 changes: 5 additions & 0 deletions cloudsmith_cli/core/credentials/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ class CredentialContext:
debug: bool = False
# Pre-resolved values from CLI flags (highest priority)
cli_api_key: str | None = None
# API networking configuration
proxy: str | None = None
ssl_verify: bool = True
user_agent: str | None = None
headers: dict | None = None


@dataclass
Expand Down
28 changes: 25 additions & 3 deletions cloudsmith_cli/core/credentials/oidc/detectors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,32 @@
]


def detect_environment(debug: bool = False) -> EnvironmentDetector | None:
"""Try each detector in order, returning the first that matches."""
def detect_environment(
debug: bool = False,
proxy: str | None = None,
ssl_verify: bool = True,
user_agent: str | None = None,
headers: dict | None = None,
) -> EnvironmentDetector | None:
"""Try each detector in order, returning the first that matches.

Args:
debug: Enable debug logging.
proxy: HTTP/HTTPS proxy URL (optional).
ssl_verify: Whether to verify SSL certificates (default: True).
user_agent: Custom user-agent string (optional).
headers: Additional headers to include (optional).

Returns:
The first matching detector instance, or None.
"""
for detector_cls in _DETECTORS:
detector = detector_cls()
detector = detector_cls(
proxy=proxy,
ssl_verify=ssl_verify,
user_agent=user_agent,
headers=headers,
)
try:
if detector.detect():
if debug:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ def get_token(self) -> str:
access_token = os.environ["SYSTEM_ACCESSTOKEN"]
audience = get_oidc_audience()

response = requests.post(
session = self._create_session()
response = session.post(
request_uri,
json={"audience": audience},
headers={
Expand Down
43 changes: 43 additions & 0 deletions cloudsmith_cli/core/credentials/oidc/detectors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import os

import requests

DEFAULT_OIDC_AUDIENCE = "cloudsmith"
OIDC_AUDIENCE_ENV_VAR = "CLOUDSMITH_OIDC_AUDIENCE"

Expand All @@ -18,10 +20,51 @@ class EnvironmentDetector:

name: str = "base"

def __init__(
self,
proxy: str | None = None,
ssl_verify: bool = True,
user_agent: str | None = None,
headers: dict | None = None,
):
"""Initialize detector with optional networking configuration.

Args:
proxy: HTTP/HTTPS proxy URL (optional).
ssl_verify: Whether to verify SSL certificates (default: True).
user_agent: Custom user-agent string (optional).
headers: Additional headers to include (optional).
"""
self.proxy = proxy
self.ssl_verify = ssl_verify
self.user_agent = user_agent
self.headers = headers

def detect(self) -> bool:
"""Return True if running in this CI/CD environment."""
raise NotImplementedError

def get_token(self) -> str:
"""Retrieve the OIDC JWT from this environment. Raises on failure."""
raise NotImplementedError

def _create_session(self) -> requests.Session:
"""Create a requests session configured with networking settings.

Returns:
Configured requests.Session instance.
"""
session = requests.Session()

if self.proxy:
session.proxies = {"http": self.proxy, "https": self.proxy}

session.verify = self.ssl_verify

if self.user_agent:
session.headers.update({"User-Agent": self.user_agent})

if self.headers:
session.headers.update(self.headers)

return session
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ def get_token(self) -> str:
separator = "&" if "?" in request_url else "?"
url = f"{request_url}{separator}audience={quote(audience, safe='')}"

response = requests.get(
session = self._create_session()
response = session.get(
url,
headers={
"Authorization": f"Bearer {request_token}",
Expand Down
51 changes: 50 additions & 1 deletion cloudsmith_cli/core/credentials/oidc/exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,48 @@
MAX_RETRIES = 3


def create_exchange_session(
proxy: str | None = None,
ssl_verify: bool = True,
user_agent: str | None = None,
headers: dict | None = None,
) -> requests.Session:
"""Create a requests session configured with networking settings.

Args:
proxy: HTTP/HTTPS proxy URL.
ssl_verify: Whether to verify SSL certificates.
user_agent: Custom user-agent string.
headers: Additional headers to include.

Returns:
Configured requests.Session instance.
"""
session = requests.Session()

if proxy:
session.proxies = {"http": proxy, "https": proxy}

session.verify = ssl_verify

if user_agent:
session.headers.update({"User-Agent": user_agent})

if headers:
session.headers.update(headers)

return session


def exchange_oidc_token(
api_host: str,
org: str,
service_slug: str,
oidc_token: str,
proxy: str | None = None,
ssl_verify: bool = True,
user_agent: str | None = None,
headers: dict | None = None,
) -> str:
"""Exchange a vendor OIDC JWT for a Cloudsmith API token.

Expand All @@ -34,6 +71,10 @@ def exchange_oidc_token(
org: The Cloudsmith organization slug.
service_slug: The Cloudsmith service account slug.
oidc_token: The vendor OIDC JWT to exchange.
proxy: HTTP/HTTPS proxy URL (optional).
ssl_verify: Whether to verify SSL certificates (default: True).
user_agent: Custom user-agent string (optional).
headers: Additional headers to include (optional).

Returns:
The short-lived Cloudsmith API token.
Expand All @@ -52,10 +93,18 @@ def exchange_oidc_token(
"service_slug": service_slug,
}

# Create configured session for the exchange
session = create_exchange_session(
proxy=proxy,
ssl_verify=ssl_verify,
user_agent=user_agent,
headers=headers,
)

last_error = None
for attempt in range(1, MAX_RETRIES + 1):
try:
response = requests.post(
response = session.post(
url,
json=payload,
headers={"Content-Type": "application/json"},
Expand Down
12 changes: 11 additions & 1 deletion cloudsmith_cli/core/credentials/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,13 @@ def resolve( # pylint: disable=too-many-return-statements
from .oidc.exchange import exchange_oidc_token

# Detect CI/CD environment and get vendor JWT
detector = detect_environment(debug=context.debug)
detector = detect_environment(
debug=context.debug,
proxy=context.proxy,
ssl_verify=context.ssl_verify,
user_agent=context.user_agent,
headers=context.headers,
)
if detector is None:
if context.debug:
logger.debug("OidcProvider: No CI/CD environment detected, skipping")
Expand Down Expand Up @@ -205,6 +211,10 @@ def resolve( # pylint: disable=too-many-return-statements
org=org,
service_slug=service_slug,
oidc_token=vendor_token,
proxy=context.proxy,
ssl_verify=context.ssl_verify,
user_agent=context.user_agent,
headers=context.headers,
)
except Exception: # pylint: disable=broad-exception-caught
# Exchange can fail for various reasons (network, API errors, auth issues)
Expand Down
Loading