diff --git a/README.md b/README.md index 81028cc9..a1de6353 100644 --- a/README.md +++ b/README.md @@ -134,6 +134,30 @@ Or you can get the latest pre-release version from Cloudsmith: pip install --upgrade cloudsmith-cli --extra-index-url=https://dl.cloudsmith.io/public/cloudsmith/cli/python/index/ ``` +### Optional Dependencies + +The CLI supports optional extras for additional functionality: + +#### AWS OIDC Support + +For AWS environments (ECS, EKS, EC2), install with `aws` extra to enable automatic credential discovery: + +``` +pip install cloudsmith-cli[aws] +``` + +This installs `boto3[crt]` for AWS credential chain support, STS token generation, and AWS SSO compatibility. + +#### All Optional Features + +To install all optional dependencies: + +``` +pip install cloudsmith-cli[all] +``` + +**Note:** If you don't install the AWS extra, the AWS OIDC detector will gracefully skip itself with no errors. All other CI/CD platforms (GitHub Actions, GitLab CI, CircleCI, Azure DevOps, Bitbucket Pipelines, Jenkins) work without any extras. + ## Configuration There are two configuration files used by the CLI: diff --git a/cloudsmith_cli/cli/commands/__init__.py b/cloudsmith_cli/cli/commands/__init__.py index 0ee11133..8f629729 100644 --- a/cloudsmith_cli/cli/commands/__init__.py +++ b/cloudsmith_cli/cli/commands/__init__.py @@ -17,6 +17,7 @@ metrics, move, policy, + print_token, push, quarantine, quota, diff --git a/cloudsmith_cli/cli/commands/login.py b/cloudsmith_cli/cli/commands/login.py index 43840dc2..0ec69c1d 100644 --- a/cloudsmith_cli/cli/commands/login.py +++ b/cloudsmith_cli/cli/commands/login.py @@ -94,6 +94,12 @@ def login(ctx, opts, login, password): # pylint: disable=redefined-outer-name "Your API key/token is: %(token)s" % {"token": click.style(api_key, fg="magenta")} ) + click.echo() + click.echo( + "💡 Tip: Use " + + click.style("cloudsmith print-token", fg="cyan") + + " to retrieve this token later" + ) create, has_errors = create_config_files(ctx, opts, api_key=api_key) new_config_messaging(has_errors, opts, create, api_key=api_key) diff --git a/cloudsmith_cli/cli/commands/print_token.py b/cloudsmith_cli/cli/commands/print_token.py new file mode 100644 index 00000000..9e0d10fd --- /dev/null +++ b/cloudsmith_cli/cli/commands/print_token.py @@ -0,0 +1,104 @@ +"""CLI/Commands - Print the active authentication token.""" + +import click + +from .. import decorators, utils +from .main import main + + +def _extract_token(api_config): + """Extract the active token from the API configuration. + + Returns (token, token_type) where token_type is 'bearer' or 'api_key'. + """ + headers = getattr(api_config, "headers", {}) or {} + auth_header = headers.get("Authorization", "") + + if auth_header.startswith("Bearer "): + return auth_header[len("Bearer ") :], "bearer" + + api_keys = getattr(api_config, "api_key", {}) or {} + api_key = api_keys.get("X-Api-Key") + if api_key: + return api_key, "api_key" + + return None, None + + +@main.command(name="print-token") +@decorators.common_cli_config_options +@decorators.common_cli_output_options +@decorators.common_api_auth_options +@decorators.initialise_api +@click.pass_context +def token(ctx, opts): + """Print the active authentication token. + + Outputs the token currently used to authenticate with the Cloudsmith API. + This is useful for passing to other tools like curl, docker, pip, etc. + + Note: This prints your CURRENT/ACTIVE token. To LOGIN and get a NEW token + interactively, use 'cloudsmith login' (or its alias 'cloudsmith token'). + + ⚠️ WARNING: This command prints sensitive credentials to stdout. + Avoid running this command in logged/recorded terminal sessions. + The token will appear in your shell history if stored in a variable. + + For safer usage, pipe directly to another command without storing + in variables. + + \b + Examples: + # Use with curl (pipe directly) + cloudsmith print-token | xargs -I{} curl -H "X-Api-Key: {}" https://api.cloudsmith.io/v1/user/self/ + + # Use with docker login (pipe to stdin) + cloudsmith print-token | docker login docker.cloudsmith.io -u token --password-stdin + + # Avoid: storing in shell variable (appears in history) + # export CS_TOKEN=$(cloudsmith print-token) # NOT RECOMMENDED + + \b + See also: + cloudsmith login Interactive login to get a NEW token + cloudsmith whoami Show current authentication status + """ + active_token, token_type = _extract_token(opts.api_config) + + if not active_token: + click.secho("Error: No authentication token available", fg="red", err=True) + click.echo(err=True) + click.echo("Try one of these commands to authenticate:", err=True) + click.echo( + " " + + click.style("cloudsmith login", fg="cyan") + + " # Interactive login", + err=True, + ) + click.echo( + " " + + click.style("cloudsmith authenticate", fg="cyan") + + " # SAML/SSO login", + err=True, + ) + click.echo( + " " + + click.style("export CLOUDSMITH_API_KEY=...", fg="cyan") + + " # Set via environment variable", + err=True, + ) + click.echo(err=True) + click.echo( + "For OIDC auto-discovery, set CLOUDSMITH_ORG and CLOUDSMITH_SERVICE_SLUG", + err=True, + ) + ctx.exit(1) + + if utils.maybe_print_as_json( + opts, + {"token": active_token, "type": token_type}, + ): + return + + # Print bare token to stdout (stderr used for any messages) + click.echo(active_token) diff --git a/cloudsmith_cli/cli/commands/whoami.py b/cloudsmith_cli/cli/commands/whoami.py index ffbf9842..e9b0224b 100644 --- a/cloudsmith_cli/cli/commands/whoami.py +++ b/cloudsmith_cli/cli/commands/whoami.py @@ -27,27 +27,74 @@ def _get_api_key_source(opts): """Determine where the API key was loaded from. Checks in priority order matching actual resolution: - CLI --api-key flag > CLOUDSMITH_API_KEY env var > credentials.ini. + CLI --api-key flag > CLOUDSMITH_API_KEY env var > credentials.ini > OIDC. """ - if not opts.api_key: + # Check if ANY API key is configured (from any source) + api_key_configured = opts.api_key or ( + hasattr(opts, "api_config") and opts.api_config and opts.api_config.api_key + ) + + if not api_key_configured: return {"configured": False, "source": None, "source_key": None} env_key = os.environ.get("CLOUDSMITH_API_KEY") - # If env var is set but differs from the resolved key, CLI flag won - if env_key and opts.api_key != env_key: - source, key = "CLI --api-key flag", "cli_flag" + # If CLI --api-key flag was explicitly passed + if opts.api_key: + # If env var is set but differs from the CLI flag, CLI flag won + if env_key and opts.api_key != env_key: + source, key = "CLI --api-key flag", "cli_flag" + # If env var is set and matches, it's actually from env var + elif env_key and opts.api_key == env_key: + suffix = env_key[-4:] + source, key = ( + f"CLOUDSMITH_API_KEY env var (ends with ...{suffix})", + "env_var", + ) + # CLI flag was set explicitly (not from env) + else: + source, key = "CLI --api-key flag", "cli_flag" + # No CLI flag, check other sources elif env_key: suffix = env_key[-4:] source, key = f"CLOUDSMITH_API_KEY env var (ends with ...{suffix})", "env_var" elif creds := CredentialsReader.find_existing_files(): source, key = f"credentials.ini ({creds[0]})", "credentials_file" + elif _is_oidc_configured(): + org = os.environ.get("CLOUDSMITH_ORG", "") + detector_name = _get_oidc_detector_name() + if detector_name: + source = f"OIDC auto-discovery: {detector_name} (org: {org})" + else: + source = f"OIDC auto-discovery (org: {org})" + key = "oidc" else: - source, key = "CLI --api-key flag", "cli_flag" + source, key = "Unknown source", "unknown" return {"configured": True, "source": source, "source_key": key} +def _get_oidc_detector_name(): + """Get the name of the OIDC detector that would be used.""" + try: + from cloudsmith_cli.core.credentials.oidc.detectors import detect_environment + + detector = detect_environment(debug=False) + if detector: + return detector.name + except Exception: # pylint: disable=broad-exception-caught + # Gracefully handle any detection failures - this is for display only + pass + return None + + +def _is_oidc_configured(): + """Check if OIDC environment variables are set.""" + return bool( + os.environ.get("CLOUDSMITH_ORG") and os.environ.get("CLOUDSMITH_SERVICE_SLUG") + ) + + def _get_sso_status(api_host): """Return SSO token status from the system keyring.""" enabled = keyring.should_use_keyring() @@ -120,7 +167,12 @@ def _print_verbose_text(data): click.echo(f" Source: {ak['source']}") click.echo(" Note: SSO token is being used instead") elif active == "api_key": - click.secho("Authentication Method: API Key", fg="cyan", bold=True) + if ak.get("source_key") == "oidc": + click.secho( + "Authentication Method: OIDC Auto-Discovery", fg="cyan", bold=True + ) + else: + click.secho("Authentication Method: API Key", fg="cyan", bold=True) for label, field in [ ("Source", "source"), ("Token Slug", "slug"), @@ -128,6 +180,10 @@ def _print_verbose_text(data): ]: if ak.get(field): click.echo(f" {label}: {ak[field]}") + click.echo() + click.echo( + "💡 Export this token: " + click.style("cloudsmith print-token", fg="cyan") + ) else: click.secho("Authentication Method: None (anonymous)", fg="yellow", bold=True) diff --git a/cloudsmith_cli/core/api/init.py b/cloudsmith_cli/core/api/init.py index b6a856bc..b7f1290f 100644 --- a/cloudsmith_cli/core/api/init.py +++ b/cloudsmith_cli/core/api/init.py @@ -1,6 +1,7 @@ """Cloudsmith API - Initialisation.""" import base64 +import logging from typing import Type, TypeVar import click @@ -11,6 +12,37 @@ from ..rest import RestClient from .exceptions import ApiException +logger = logging.getLogger(__name__) + + +def _try_oidc_credential(config): + """Attempt OIDC auto-discovery as a last-resort credential provider. + + Only activates when CLOUDSMITH_ORG and CLOUDSMITH_SERVICE_SLUG are set. + """ + from ..credentials import CredentialContext + from ..credentials.providers import OidcProvider + + context = CredentialContext( + api_host=config.host, + debug=config.debug, + proxy=config.proxy, + ssl_verify=config.verify_ssl, + user_agent=config.user_agent, + headers=config.headers.copy() if config.headers else None, + ) + + provider = OidcProvider() + result = provider.resolve(context) + + if result is not None: + config.api_key["X-Api-Key"] = result.api_key + + if config.debug: + click.echo(f"OIDC credential resolved: {result.source_detail}") + elif config.debug: + logger.debug("OIDC auto-discovery did not resolve credentials") + def initialise_api( debug=False, @@ -104,6 +136,9 @@ def initialise_api( if config.debug: click.echo("User API key config value set") + else: + # No access token and no API key provided — try OIDC auto-discovery + _try_oidc_credential(config) auth_header = headers and config.headers.get("Authorization") if auth_header and " " in auth_header: diff --git a/cloudsmith_cli/core/credentials/__init__.py b/cloudsmith_cli/core/credentials/__init__.py new file mode 100644 index 00000000..9e3c7831 --- /dev/null +++ b/cloudsmith_cli/core/credentials/__init__.py @@ -0,0 +1,85 @@ +"""Credential Provider Chain for Cloudsmith CLI. + +Implements an AWS SDK-style credential resolution chain that evaluates +credential sources sequentially and returns the first valid result. +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from typing import Optional + +logger = logging.getLogger(__name__) + + +@dataclass +class CredentialContext: + """Context passed to credential providers during resolution.""" + + api_host: str = "https://api.cloudsmith.io" + config_file_path: str | None = None + creds_file_path: str | None = None + profile: str | None = None + debug: bool = False + # Pre-resolved values from CLI flags (highest priority) + cli_api_key: str | None = None + # API networking configuration + proxy: str | None = None + ssl_verify: bool = True + user_agent: str | None = None + headers: dict | None = None + + +@dataclass +class CredentialResult: + """Result from a successful credential resolution.""" + + api_key: str + source_name: str + source_detail: str | None = None + + +class CredentialProvider: + """Base class for credential providers.""" + + name: str = "base" + + def resolve(self, context: CredentialContext) -> CredentialResult | None: + """Attempt to resolve credentials. Return CredentialResult or None.""" + raise NotImplementedError + + +class CredentialProviderChain: + """Evaluates credential providers in order, returning the first valid result.""" + + def __init__(self, providers: list[CredentialProvider]): + self.providers = providers + + def resolve(self, context: CredentialContext) -> CredentialResult | None: + """Evaluate each provider in order. Return the first successful result.""" + for provider in self.providers: + try: + result = provider.resolve(context) + if result is not None: + if context.debug: + logger.debug( + "Credentials resolved by %s: %s", + provider.name, + result.source_detail or result.source_name, + ) + return result + if context.debug: + logger.debug( + "Provider %s did not resolve credentials, trying next", + provider.name, + ) + except Exception: # pylint: disable=broad-exception-caught + # Intentionally broad - one provider failing shouldn't stop others + logger.debug( + "Provider %s raised an exception, skipping", + provider.name, + exc_info=True, + ) + continue + return None diff --git a/cloudsmith_cli/core/credentials/oidc/__init__.py b/cloudsmith_cli/core/credentials/oidc/__init__.py new file mode 100644 index 00000000..e3c414ab --- /dev/null +++ b/cloudsmith_cli/core/credentials/oidc/__init__.py @@ -0,0 +1,6 @@ +"""OIDC support for the Cloudsmith CLI credential chain. + +References: + https://help.cloudsmith.io/docs/openid-connect + https://cloudsmith.com/blog/securely-connect-cloudsmith-to-your-cicd-using-oidc-authentication +""" diff --git a/cloudsmith_cli/core/credentials/oidc/cache.py b/cloudsmith_cli/core/credentials/oidc/cache.py new file mode 100644 index 00000000..2e71f6de --- /dev/null +++ b/cloudsmith_cli/core/credentials/oidc/cache.py @@ -0,0 +1,261 @@ +"""OIDC token cache. + +Caches Cloudsmith API tokens obtained via OIDC exchange to avoid unnecessary +re-exchanges. Uses system keyring when available (respecting CLOUDSMITH_NO_KEYRING), +with automatic fallback to filesystem storage when keyring is unavailable. + +Storage behavior: +- If keyring available: Stores in system keyring only (encrypted, secure) +- If keyring unavailable: Falls back to ~/.cloudsmith/oidc_token_cache/ (0o600) + +Tokens are cached until they expire (with a safety margin). +""" + +from __future__ import annotations + +import hashlib +import json +import logging +import os +import time +from base64 import urlsafe_b64decode + +logger = logging.getLogger(__name__) + +# Re-exchange when token has less than this many seconds remaining +EXPIRY_MARGIN_SECONDS = 60 + +_CACHE_DIR_NAME = "oidc_token_cache" + + +def _get_cache_dir() -> str: + """Return the cache directory path, creating it if needed.""" + from ....cli.config import get_default_config_path + + base = get_default_config_path() + cache_dir = os.path.join(base, _CACHE_DIR_NAME) + if not os.path.isdir(cache_dir): + os.makedirs(cache_dir, mode=0o700, exist_ok=True) + return cache_dir + + +def _cache_key(api_host: str, org: str, service_slug: str) -> str: + """Compute a deterministic cache filename from the exchange parameters.""" + raw = f"{api_host}|{org}|{service_slug}" + digest = hashlib.sha256(raw.encode()).hexdigest()[:32] + return f"oidc_{digest}.json" + + +def _decode_jwt_exp(token: str) -> float | None: + """Decode the exp claim from a JWT without verification. + + We only need the expiry for cache management; the server validates + the token on each request anyway. + """ + try: + parts = token.split(".") + if len(parts) != 3: + return None + payload_b64 = parts[1] + # Fix padding + padding = 4 - len(payload_b64) % 4 + if padding != 4: + payload_b64 += "=" * padding + payload = json.loads(urlsafe_b64decode(payload_b64)) + exp = payload.get("exp") + if exp is not None: + return float(exp) + except Exception: # pylint: disable=broad-exception-caught + # JWT could be malformed in various ways + logger.debug("Failed to decode JWT expiry", exc_info=True) + return None + + +def get_cached_token(api_host: str, org: str, service_slug: str) -> str | None: + """Return a cached token if it exists and is not expired. + + Checks keyring first (if enabled), then falls back to filesystem cache. + + Returns the token string, or None if no valid cached token is available. + """ + # Try keyring first + token = _get_from_keyring(api_host, org, service_slug) + if token: + return token + + # Fall back to filesystem cache + return _get_from_disk(api_host, org, service_slug) + + +def _get_from_keyring(api_host: str, org: str, service_slug: str) -> str | None: + """Try to get token from keyring.""" + try: + from ...keyring import get_oidc_token + + token_data = get_oidc_token(api_host, org, service_slug) + if not token_data: + return None + + data = json.loads(token_data) + token = data.get("token") + expires_at = data.get("expires_at") + + if not token: + return None + + # Check expiry + if expires_at is not None: + remaining = expires_at - time.time() + if remaining < EXPIRY_MARGIN_SECONDS: + logger.debug( + "Keyring OIDC token expired or expiring soon " + "(%.0fs remaining, margin=%ds)", + remaining, + EXPIRY_MARGIN_SECONDS, + ) + # Clean up expired token from keyring + from ...keyring import delete_oidc_token + + delete_oidc_token(api_host, org, service_slug) + return None + logger.debug("Using keyring OIDC token (expires in %.0fs)", remaining) + else: + logger.debug("Using keyring OIDC token (no expiry information)") + + return token + + except Exception: # pylint: disable=broad-exception-caught + # Keyring errors can vary by platform and backend + logger.debug("Failed to read OIDC token from keyring", exc_info=True) + return None + + +def _get_from_disk(api_host: str, org: str, service_slug: str) -> str | None: + """Try to get token from disk cache.""" + cache_dir = _get_cache_dir() + cache_file = os.path.join(cache_dir, _cache_key(api_host, org, service_slug)) + + if not os.path.isfile(cache_file): + return None + + try: + with open(cache_file) as f: + data = json.load(f) + + token = data.get("token") + expires_at = data.get("expires_at") + + if not token: + return None + + # Check expiry + if expires_at is not None: + remaining = expires_at - time.time() + if remaining < EXPIRY_MARGIN_SECONDS: + logger.debug( + "Disk cached OIDC token expired or expiring soon " + "(%.0fs remaining, margin=%ds)", + remaining, + EXPIRY_MARGIN_SECONDS, + ) + _remove_cache_file(cache_file) + return None + logger.debug("Using disk cached OIDC token (expires in %.0fs)", remaining) + else: + logger.debug("Using disk cached OIDC token (no expiry information)") + + return token + + except (json.JSONDecodeError, OSError, KeyError): + logger.debug("Failed to read OIDC token from disk cache", exc_info=True) + _remove_cache_file(cache_file) + return None + + +def store_cached_token(api_host: str, org: str, service_slug: str, token: str) -> None: + """Cache a token in keyring (if available) or filesystem. + + Tries keyring first for secure system storage. Only falls back to + filesystem if keyring is unavailable or fails. + + The expiry is extracted from the JWT exp claim. If the token has no + exp claim, it is still cached but will be used until the process + explicitly invalidates it. + """ + expires_at = _decode_jwt_exp(token) + + data = { + "token": token, + "expires_at": expires_at, + "api_host": api_host, + "org": org, + "service_slug": service_slug, + "cached_at": time.time(), + } + + # Try to store in keyring first + if _store_in_keyring(api_host, org, service_slug, data): + # Success! No need for disk fallback + return + + # Keyring unavailable/failed, fall back to disk + _store_on_disk(api_host, org, service_slug, data) + + +def _store_in_keyring(api_host: str, org: str, service_slug: str, data: dict) -> bool: + """Try to store token in keyring.""" + try: + from ...keyring import store_oidc_token + + token_data = json.dumps(data) + success = store_oidc_token(api_host, org, service_slug, token_data) + if success: + logger.debug( + "Stored OIDC token in keyring (expires_at=%s)", data.get("expires_at") + ) + return success + except Exception: # pylint: disable=broad-exception-caught + # Keyring errors can vary by platform and backend + logger.debug("Failed to store OIDC token in keyring", exc_info=True) + return False + + +def _store_on_disk(api_host: str, org: str, service_slug: str, data: dict) -> None: + """Store token on disk.""" + cache_dir = _get_cache_dir() + cache_file = os.path.join(cache_dir, _cache_key(api_host, org, service_slug)) + + try: + fd = os.open(cache_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) + with os.fdopen(fd, "w") as f: + json.dump(data, f) + logger.debug( + "Stored OIDC token on disk (expires_at=%s)", data.get("expires_at") + ) + except OSError: + logger.debug("Failed to write OIDC token to disk cache", exc_info=True) + + +def invalidate_cached_token(api_host: str, org: str, service_slug: str) -> None: + """Remove a cached token from both keyring and disk.""" + # Remove from keyring + try: + from ...keyring import delete_oidc_token + + delete_oidc_token(api_host, org, service_slug) + except Exception: # pylint: disable=broad-exception-caught + # Keyring errors can vary by platform and backend + logger.debug("Failed to delete OIDC token from keyring", exc_info=True) + + # Remove from disk + cache_dir = _get_cache_dir() + cache_file = os.path.join(cache_dir, _cache_key(api_host, org, service_slug)) + _remove_cache_file(cache_file) + + +def _remove_cache_file(path: str) -> None: + """Safely remove a cache file.""" + try: + os.unlink(path) + except OSError: + pass diff --git a/cloudsmith_cli/core/credentials/oidc/detectors/__init__.py b/cloudsmith_cli/core/credentials/oidc/detectors/__init__.py new file mode 100644 index 00000000..dfee1aca --- /dev/null +++ b/cloudsmith_cli/core/credentials/oidc/detectors/__init__.py @@ -0,0 +1,77 @@ +"""CI/CD environment detectors for OIDC token retrieval.""" + +from __future__ import annotations + +import logging + +from .aws import AWSDetector +from .azure_devops import AzureDevOpsDetector +from .base import ( + DEFAULT_OIDC_AUDIENCE, + OIDC_AUDIENCE_ENV_VAR, + EnvironmentDetector, + get_oidc_audience, +) +from .bitbucket_pipelines import BitbucketPipelinesDetector +from .circleci import CircleCIDetector +from .generic import GenericDetector +from .github_actions import GitHubActionsDetector +from .gitlab_ci import GitLabCIDetector + +logger = logging.getLogger(__name__) + +# Ordered list of detectors to try +_DETECTORS: list[type[EnvironmentDetector]] = [ + GitHubActionsDetector, + GitLabCIDetector, + CircleCIDetector, + AzureDevOpsDetector, + BitbucketPipelinesDetector, + AWSDetector, + GenericDetector, +] + + +def detect_environment( + debug: bool = False, + proxy: str | None = None, + ssl_verify: bool = True, + user_agent: str | None = None, + headers: dict | None = None, +) -> EnvironmentDetector | None: + """Try each detector in order, returning the first that matches. + + Args: + debug: Enable debug logging. + proxy: HTTP/HTTPS proxy URL (optional). + ssl_verify: Whether to verify SSL certificates (default: True). + user_agent: Custom user-agent string (optional). + headers: Additional headers to include (optional). + + Returns: + The first matching detector instance, or None. + """ + for detector_cls in _DETECTORS: + detector = detector_cls( + proxy=proxy, + ssl_verify=ssl_verify, + user_agent=user_agent, + headers=headers, + ) + try: + if detector.detect(): + if debug: + logger.debug("Detected CI/CD environment: %s", detector.name) + return detector + except Exception: # pylint: disable=broad-exception-caught + # Intentionally broad - one detector failing shouldn't stop others + logger.debug( + "Detector %s raised an exception during detection", + detector.name, + exc_info=True, + ) + continue + + if debug: + logger.debug("No CI/CD environment detected for OIDC") + return None diff --git a/cloudsmith_cli/core/credentials/oidc/detectors/aws.py b/cloudsmith_cli/core/credentials/oidc/detectors/aws.py new file mode 100644 index 00000000..ecc93e8e --- /dev/null +++ b/cloudsmith_cli/core/credentials/oidc/detectors/aws.py @@ -0,0 +1,87 @@ +"""AWS OIDC detector. + +Uses boto3 to auto-discover AWS credentials (via any mechanism: env vars, +config files, IAM instance profiles, IRSA, etc.) and then calls +STS GetWebIdentityToken to obtain a signed JWT for Cloudsmith. + +**Note:** Requires boto3 (optional dependency). +Install with: pip install cloudsmith-cli[aws] + +**For AWS SSO users:** Requires the CRT extension, which is automatically included +in the [aws] extra. If you install boto3 separately, use: pip install boto3[crt] + +References: + https://cloudsmith.com/blog/authenticate-to-cloudsmith-with-your-aws-identity + https://aws.amazon.com/blogs/aws/simplify-access-to-external-services-using-aws-iam-outbound-identity-federation/ +""" + +from __future__ import annotations + +import logging + +from .base import EnvironmentDetector, get_oidc_audience + +logger = logging.getLogger(__name__) + + +class AWSDetector(EnvironmentDetector): + """Detects AWS environments and obtains a JWT via STS GetWebIdentityToken. + + Requires boto3 (optional dependency): pip install cloudsmith-cli[aws] + """ + + name = "AWS" + + def detect(self) -> bool: + try: + import boto3 + from botocore.exceptions import ( + BotoCoreError, + ClientError, + MissingDependencyException, + NoCredentialsError, + ) + except ImportError: + logger.debug("AWSDetector: boto3 not installed, skipping") + return False + + try: + session = boto3.Session() + credentials = session.get_credentials() + if credentials is None: + return False + # Resolve to verify they are actually usable + credentials = credentials.get_frozen_credentials() + return bool(credentials.access_key) + except MissingDependencyException as e: + logger.debug( + "AWSDetector: Missing boto3 dependency for SSO credentials: %s. " + "Install with: pip install 'botocore[crt]' or 'boto3[crt]'", + e, + ) + return False + except (BotoCoreError, NoCredentialsError, ClientError): + return False + except Exception: # pylint: disable=broad-exception-caught + # Catch-all for unexpected boto3 errors + logger.debug( + "AWSDetector: unexpected error during detection", exc_info=True + ) + return False + + def get_token(self) -> str: + import boto3 # pylint: disable=import-error + + audience = get_oidc_audience() + sts = boto3.client("sts") + response = sts.get_web_identity_token( + Audience=[audience], + SigningAlgorithm="RS256", + ) + + token = response.get("WebIdentityToken") + if not token: + raise ValueError( + "AWS STS GetWebIdentityToken did not return a WebIdentityToken" + ) + return token diff --git a/cloudsmith_cli/core/credentials/oidc/detectors/azure_devops.py b/cloudsmith_cli/core/credentials/oidc/detectors/azure_devops.py new file mode 100644 index 00000000..c7738c7a --- /dev/null +++ b/cloudsmith_cli/core/credentials/oidc/detectors/azure_devops.py @@ -0,0 +1,50 @@ +"""Azure DevOps OIDC detector. + +Fetches OIDC token via the ``SYSTEM_OIDCREQUESTURI`` HTTP endpoint using +the pipeline's ``SYSTEM_ACCESSTOKEN`` for authorization. + +References: + https://learn.microsoft.com/en-us/azure/devops/release-notes/2024/sprint-240-update#pipelines-and-tasks-populate-variables-to-customize-workload-identity-federation-authentication + https://docs.cloudsmith.com/integrations/integrating-with-azure-devops + https://cloudsmith.com/changelog/native-oidc-authentication-for-azure-devops +""" + +from __future__ import annotations + +import os + +from .base import EnvironmentDetector, get_oidc_audience + + +class AzureDevOpsDetector(EnvironmentDetector): + """Detects Azure DevOps and fetches OIDC token via HTTP POST.""" + + name = "Azure DevOps" + + def detect(self) -> bool: + return bool(os.environ.get("SYSTEM_OIDCREQUESTURI")) and bool( + os.environ.get("SYSTEM_ACCESSTOKEN") + ) + + def get_token(self) -> str: + request_uri = os.environ["SYSTEM_OIDCREQUESTURI"] + access_token = os.environ["SYSTEM_ACCESSTOKEN"] + audience = get_oidc_audience() + + session = self._create_session() + response = session.post( + request_uri, + json={"audience": audience}, + headers={ + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/json", + }, + timeout=30, + ) + response.raise_for_status() + + data = response.json() + token = data.get("oidcToken") + if not token: + raise ValueError("Azure DevOps OIDC response did not contain an oidcToken") + return token diff --git a/cloudsmith_cli/core/credentials/oidc/detectors/base.py b/cloudsmith_cli/core/credentials/oidc/detectors/base.py new file mode 100644 index 00000000..1b4c3747 --- /dev/null +++ b/cloudsmith_cli/core/credentials/oidc/detectors/base.py @@ -0,0 +1,70 @@ +"""Base class and utilities for CI/CD environment OIDC detectors.""" + +from __future__ import annotations + +import os + +import requests + +DEFAULT_OIDC_AUDIENCE = "cloudsmith" +OIDC_AUDIENCE_ENV_VAR = "CLOUDSMITH_OIDC_AUDIENCE" + + +def get_oidc_audience() -> str: + """Return the OIDC audience to request, allowing override via env var.""" + return os.environ.get(OIDC_AUDIENCE_ENV_VAR, "").strip() or DEFAULT_OIDC_AUDIENCE + + +class EnvironmentDetector: + """Base class for CI/CD environment detectors.""" + + name: str = "base" + + def __init__( + self, + proxy: str | None = None, + ssl_verify: bool = True, + user_agent: str | None = None, + headers: dict | None = None, + ): + """Initialize detector with optional networking configuration. + + Args: + proxy: HTTP/HTTPS proxy URL (optional). + ssl_verify: Whether to verify SSL certificates (default: True). + user_agent: Custom user-agent string (optional). + headers: Additional headers to include (optional). + """ + self.proxy = proxy + self.ssl_verify = ssl_verify + self.user_agent = user_agent + self.headers = headers + + def detect(self) -> bool: + """Return True if running in this CI/CD environment.""" + raise NotImplementedError + + def get_token(self) -> str: + """Retrieve the OIDC JWT from this environment. Raises on failure.""" + raise NotImplementedError + + def _create_session(self) -> requests.Session: + """Create a requests session configured with networking settings. + + Returns: + Configured requests.Session instance. + """ + session = requests.Session() + + if self.proxy: + session.proxies = {"http": self.proxy, "https": self.proxy} + + session.verify = self.ssl_verify + + if self.user_agent: + session.headers.update({"User-Agent": self.user_agent}) + + if self.headers: + session.headers.update(self.headers) + + return session diff --git a/cloudsmith_cli/core/credentials/oidc/detectors/bitbucket_pipelines.py b/cloudsmith_cli/core/credentials/oidc/detectors/bitbucket_pipelines.py new file mode 100644 index 00000000..3d30f82b --- /dev/null +++ b/cloudsmith_cli/core/credentials/oidc/detectors/bitbucket_pipelines.py @@ -0,0 +1,29 @@ +"""Bitbucket Pipelines OIDC detector. + +Reads OIDC token from the ``BITBUCKET_STEP_OIDC_TOKEN`` environment variable +set when OIDC is enabled for a pipeline step. + +References: + https://support.atlassian.com/bitbucket-cloud/docs/variables-and-secrets/ +""" + +from __future__ import annotations + +import os + +from .base import EnvironmentDetector + + +class BitbucketPipelinesDetector(EnvironmentDetector): + """Detects Bitbucket Pipelines and reads OIDC token from environment.""" + + name = "Bitbucket Pipelines" + + def detect(self) -> bool: + return bool(os.environ.get("BITBUCKET_STEP_OIDC_TOKEN")) + + def get_token(self) -> str: + token = os.environ.get("BITBUCKET_STEP_OIDC_TOKEN") + if not token: + raise ValueError("BITBUCKET_STEP_OIDC_TOKEN not set") + return token diff --git a/cloudsmith_cli/core/credentials/oidc/detectors/circleci.py b/cloudsmith_cli/core/credentials/oidc/detectors/circleci.py new file mode 100644 index 00000000..48a8482c --- /dev/null +++ b/cloudsmith_cli/core/credentials/oidc/detectors/circleci.py @@ -0,0 +1,35 @@ +"""CircleCI OIDC detector. + +Reads OIDC token from the ``CIRCLE_OIDC_TOKEN_V2`` or ``CIRCLE_OIDC_TOKEN`` +environment variables set by CircleCI's OIDC support. + +References: + https://circleci.com/docs/guides/permissions-authentication/openid-connect-tokens/ + https://docs.cloudsmith.com/integrations/integrating-with-circleci +""" + +from __future__ import annotations + +import os + +from .base import EnvironmentDetector + + +class CircleCIDetector(EnvironmentDetector): + """Detects CircleCI and reads OIDC token from environment variable.""" + + name = "CircleCI" + + def detect(self) -> bool: + return os.environ.get("CIRCLECI") == "true" and bool( + os.environ.get("CIRCLE_OIDC_TOKEN_V2") + or os.environ.get("CIRCLE_OIDC_TOKEN") + ) + + def get_token(self) -> str: + token = os.environ.get("CIRCLE_OIDC_TOKEN_V2") or os.environ.get( + "CIRCLE_OIDC_TOKEN" + ) + if not token: + raise ValueError("CircleCI detected but CIRCLE_OIDC_TOKEN not set") + return token diff --git a/cloudsmith_cli/core/credentials/oidc/detectors/generic.py b/cloudsmith_cli/core/credentials/oidc/detectors/generic.py new file mode 100644 index 00000000..ccb43e57 --- /dev/null +++ b/cloudsmith_cli/core/credentials/oidc/detectors/generic.py @@ -0,0 +1,35 @@ +"""Generic fallback OIDC detector. + +Reads an OIDC token from the ``CLOUDSMITH_OIDC_TOKEN`` environment variable. +Works for Jenkins (with credentials binding plugin), or any custom CI/CD +system that can inject an OIDC token via environment variable. + +References: + https://docs.cloudsmith.com/authentication/setup-jenkins-to-authenticate-to-cloudsmith-using-oidc + https://plugins.jenkins.io/credentials-binding/ +""" + +from __future__ import annotations + +import os + +from .base import EnvironmentDetector + + +class GenericDetector(EnvironmentDetector): + """Generic fallback: reads OIDC token from CLOUDSMITH_OIDC_TOKEN env var. + + Works for Jenkins (with credentials binding plugin), or any custom + CI/CD system that can inject an OIDC token via environment variable. + """ + + name = "Generic (CLOUDSMITH_OIDC_TOKEN)" + + def detect(self) -> bool: + return bool(os.environ.get("CLOUDSMITH_OIDC_TOKEN")) + + def get_token(self) -> str: + token = os.environ.get("CLOUDSMITH_OIDC_TOKEN") + if not token: + raise ValueError("CLOUDSMITH_OIDC_TOKEN not set") + return token diff --git a/cloudsmith_cli/core/credentials/oidc/detectors/github_actions.py b/cloudsmith_cli/core/credentials/oidc/detectors/github_actions.py new file mode 100644 index 00000000..30458980 --- /dev/null +++ b/cloudsmith_cli/core/credentials/oidc/detectors/github_actions.py @@ -0,0 +1,54 @@ +"""GitHub Actions OIDC detector. + +Fetches OIDC token via the Actions runtime HTTP endpoint. + +References: + https://docs.github.com/en/actions/reference/security/oidc + https://docs.cloudsmith.com/authentication/setup-cloudsmith-to-authenticate-with-oidc-in-github-actions +""" + +from __future__ import annotations + +import os + +from .base import EnvironmentDetector, get_oidc_audience + + +class GitHubActionsDetector(EnvironmentDetector): + """Detects GitHub Actions and fetches OIDC token via HTTP request.""" + + name = "GitHub Actions" + + def detect(self) -> bool: + return ( + os.environ.get("GITHUB_ACTIONS") == "true" + and bool(os.environ.get("ACTIONS_ID_TOKEN_REQUEST_URL")) + and bool(os.environ.get("ACTIONS_ID_TOKEN_REQUEST_TOKEN")) + ) + + def get_token(self) -> str: + from urllib.parse import quote + + request_url = os.environ["ACTIONS_ID_TOKEN_REQUEST_URL"] + request_token = os.environ["ACTIONS_ID_TOKEN_REQUEST_TOKEN"] + + audience = get_oidc_audience() + separator = "&" if "?" in request_url else "?" + url = f"{request_url}{separator}audience={quote(audience, safe='')}" + + session = self._create_session() + response = session.get( + url, + headers={ + "Authorization": f"Bearer {request_token}", + "Accept": "application/json; api-version=2.0", + }, + timeout=30, + ) + response.raise_for_status() + + data = response.json() + token = data.get("value") + if not token: + raise ValueError("GitHub Actions OIDC response did not contain a token") + return token diff --git a/cloudsmith_cli/core/credentials/oidc/detectors/gitlab_ci.py b/cloudsmith_cli/core/credentials/oidc/detectors/gitlab_ci.py new file mode 100644 index 00000000..fb78ecfe --- /dev/null +++ b/cloudsmith_cli/core/credentials/oidc/detectors/gitlab_ci.py @@ -0,0 +1,44 @@ +"""GitLab CI OIDC detector. + +Reads OIDC token from environment variables populated by GitLab's +``id_tokens`` configuration in ``.gitlab-ci.yml``. + +References: + https://docs.gitlab.com/ci/cloud_services/ + https://docs.cloudsmith.com/integrations/integrating-with-gitlab-cicd +""" + +from __future__ import annotations + +import os + +from .base import EnvironmentDetector + + +class GitLabCIDetector(EnvironmentDetector): + """Detects GitLab CI and reads OIDC token from environment variable. + + GitLab requires users to configure ``id_tokens`` in .gitlab-ci.yml. + The token is exposed as an environment variable with a user-chosen name. + We check common conventions: CLOUDSMITH_OIDC_TOKEN, CI_JOB_JWT_V2, CI_JOB_JWT. + """ + + name = "GitLab CI" + + TOKEN_ENV_VARS = ["CLOUDSMITH_OIDC_TOKEN", "CI_JOB_JWT_V2", "CI_JOB_JWT"] + + def detect(self) -> bool: + if os.environ.get("GITLAB_CI") != "true": + return False + return any(os.environ.get(var) for var in self.TOKEN_ENV_VARS) + + def get_token(self) -> str: + for var in self.TOKEN_ENV_VARS: + token = os.environ.get(var) + if token: + return token + raise ValueError( + "GitLab CI detected but no OIDC token found. " + "Configure id_tokens in .gitlab-ci.yml and set one of: " + + ", ".join(self.TOKEN_ENV_VARS) + ) diff --git a/cloudsmith_cli/core/credentials/oidc/exchange.py b/cloudsmith_cli/core/credentials/oidc/exchange.py new file mode 100644 index 00000000..2e2b8b2f --- /dev/null +++ b/cloudsmith_cli/core/credentials/oidc/exchange.py @@ -0,0 +1,174 @@ +"""Cloudsmith OIDC token exchange. + +Exchanges a vendor CI/CD OIDC JWT for a short-lived Cloudsmith API token +via the POST /openid/{org}/ endpoint. + +References: + https://help.cloudsmith.io/docs/openid-connect +""" + +from __future__ import annotations + +import logging +import random +import time + +import requests + +logger = logging.getLogger(__name__) + +DEFAULT_TIMEOUT = 30 +MAX_RETRIES = 3 + + +def create_exchange_session( + proxy: str | None = None, + ssl_verify: bool = True, + user_agent: str | None = None, + headers: dict | None = None, +) -> requests.Session: + """Create a requests session configured with networking settings. + + Args: + proxy: HTTP/HTTPS proxy URL. + ssl_verify: Whether to verify SSL certificates. + user_agent: Custom user-agent string. + headers: Additional headers to include. + + Returns: + Configured requests.Session instance. + """ + session = requests.Session() + + if proxy: + session.proxies = {"http": proxy, "https": proxy} + + session.verify = ssl_verify + + if user_agent: + session.headers.update({"User-Agent": user_agent}) + + if headers: + session.headers.update(headers) + + return session + + +def exchange_oidc_token( + api_host: str, + org: str, + service_slug: str, + oidc_token: str, + proxy: str | None = None, + ssl_verify: bool = True, + user_agent: str | None = None, + headers: dict | None = None, +) -> str: + """Exchange a vendor OIDC JWT for a Cloudsmith API token. + + Args: + api_host: The Cloudsmith API host (e.g. "https://api.cloudsmith.io"). + org: The Cloudsmith organization slug. + service_slug: The Cloudsmith service account slug. + oidc_token: The vendor OIDC JWT to exchange. + proxy: HTTP/HTTPS proxy URL (optional). + ssl_verify: Whether to verify SSL certificates (default: True). + user_agent: Custom user-agent string (optional). + headers: Additional headers to include (optional). + + Returns: + The short-lived Cloudsmith API token. + + Raises: + OidcExchangeError: If the exchange fails after retries. + """ + # Normalize host + host = api_host.rstrip("/") + if not host.startswith("http"): + host = f"https://{host}" + + url = f"{host}/openid/{org}/" + payload = { + "oidc_token": oidc_token, + "service_slug": service_slug, + } + + # Create configured session for the exchange + session = create_exchange_session( + proxy=proxy, + ssl_verify=ssl_verify, + user_agent=user_agent, + headers=headers, + ) + + last_error = None + for attempt in range(1, MAX_RETRIES + 1): + try: + response = session.post( + url, + json=payload, + headers={"Content-Type": "application/json"}, + timeout=DEFAULT_TIMEOUT, + ) + except requests.exceptions.RequestException as exc: + last_error = OidcExchangeError(f"OIDC token exchange request failed: {exc}") + logger.debug( + "OIDC exchange attempt %d/%d failed with error: %s", + attempt, + MAX_RETRIES, + exc, + ) + if attempt < MAX_RETRIES: + backoff = min(30, (2**attempt) + random.uniform(0, 1)) + logger.debug("Retrying in %.1fs...", backoff) + time.sleep(backoff) + continue + + if response.status_code in (200, 201): + data = response.json() + token = data.get("token") + if not token or not isinstance(token, str) or not token.strip(): + raise OidcExchangeError( + "Cloudsmith OIDC exchange returned an empty or invalid token" + ) + return token + + # 4xx errors are not retryable + if 400 <= response.status_code < 500: + error_detail = "" + try: + error_detail = response.json() + except Exception: # pylint: disable=broad-exception-caught + # Intentionally broad - response could be malformed in various ways + error_detail = response.text[:1000] # Truncate long responses + + logger.debug( + "OIDC exchange 4xx error: %s - %s", response.status_code, error_detail + ) + raise OidcExchangeError( + f"OIDC token exchange failed with {response.status_code}: " + f"{error_detail}" + ) + + # 5xx errors are retryable + last_error = OidcExchangeError( + f"OIDC token exchange failed with {response.status_code} " + f"(attempt {attempt}/{MAX_RETRIES})" + ) + logger.debug( + "OIDC exchange attempt %d/%d failed with status %d", + attempt, + MAX_RETRIES, + response.status_code, + ) + + if attempt < MAX_RETRIES: + backoff = min(30, (2**attempt) + random.uniform(0, 1)) + logger.debug("Retrying in %.1fs...", backoff) + time.sleep(backoff) + + raise last_error + + +class OidcExchangeError(Exception): + """Raised when the OIDC token exchange with Cloudsmith fails.""" diff --git a/cloudsmith_cli/core/credentials/providers.py b/cloudsmith_cli/core/credentials/providers.py new file mode 100644 index 00000000..6d9f907a --- /dev/null +++ b/cloudsmith_cli/core/credentials/providers.py @@ -0,0 +1,234 @@ +"""Credential providers for the Cloudsmith CLI.""" + +from __future__ import annotations + +import logging +import os + +from . import CredentialContext, CredentialProvider, CredentialResult + +logger = logging.getLogger(__name__) + + +class EnvironmentVariableProvider(CredentialProvider): + """Resolves credentials from the CLOUDSMITH_API_KEY environment variable.""" + + name = "environment_variable" + + def resolve(self, context: CredentialContext) -> CredentialResult | None: + api_key = os.environ.get("CLOUDSMITH_API_KEY") + if api_key and api_key.strip(): + suffix = api_key.strip()[-4:] + return CredentialResult( + api_key=api_key.strip(), + source_name="environment_variable", + source_detail=f"CLOUDSMITH_API_KEY env var (ends with ...{suffix})", + ) + return None + + +class ConfigFileProvider(CredentialProvider): + """Resolves credentials from the credentials.ini config file.""" + + name = "config_file" + + def resolve(self, context: CredentialContext) -> CredentialResult | None: + from ...cli.config import CredentialsReader + + reader = CredentialsReader + path = context.creds_file_path + + try: + config = {} + if path and os.path.exists(path): + if os.path.isdir(path): + reader.config_searchpath.insert(0, path) + else: + reader.config_files.insert(0, path) + + raw_config = reader.read_config() + values = raw_config.get("default", {}) + config.update(values) + + if context.profile and context.profile != "default": + profile_values = raw_config.get(f"profile:{context.profile}", {}) + config.update(profile_values) + + api_key = config.get("api_key") + if api_key and isinstance(api_key, str): + api_key = api_key.strip().strip("'\"") + if api_key: + source_files = reader.find_existing_files() + source = source_files[0] if source_files else "credentials.ini" + return CredentialResult( + api_key=api_key, + source_name="config_file", + source_detail=f"credentials.ini ({source})", + ) + except Exception: # pylint: disable=broad-exception-caught + # Config file errors can be varied (permissions, parse errors, etc.) + logger.debug("ConfigFileProvider failed to read config", exc_info=True) + + return None + + +class KeyringProvider(CredentialProvider): + """Resolves credentials from SSO tokens stored in the system keyring.""" + + name = "keyring" + + def resolve(self, context: CredentialContext) -> CredentialResult | None: + from ...cli import saml + from ...core import keyring + from ...core.api.exceptions import ApiException + + if not keyring.should_use_keyring(): + return None + + api_host = context.api_host + access_token = keyring.get_access_token(api_host) + + if not access_token: + return None + + # Attempt refresh if needed + try: + if keyring.should_refresh_access_token(api_host): + refresh_token = keyring.get_refresh_token(api_host) + import requests + + session = requests.Session() + new_access_token, new_refresh_token = saml.refresh_access_token( + api_host, + access_token, + refresh_token, + session=session, + ) + keyring.store_sso_tokens(api_host, new_access_token, new_refresh_token) + access_token = new_access_token + except (ApiException, Exception): # pylint: disable=broad-exception-caught + # SSO refresh can fail in various ways (network, API errors, keyring issues) + keyring.update_refresh_attempted_at(api_host) + logger.debug("Failed to refresh SSO token", exc_info=True) + + return CredentialResult( + api_key=access_token, + source_name="keyring", + source_detail="SSO token from system keyring", + ) + + @staticmethod + def is_bearer_token() -> bool: + """Keyring tokens are Bearer tokens, not API keys.""" + return True + + +class OidcProvider(CredentialProvider): + """Resolves credentials via OIDC auto-discovery in CI/CD environments. + + Requires CLOUDSMITH_ORG and CLOUDSMITH_SERVICE_SLUG environment variables. + Auto-detects the CI/CD environment, fetches the vendor OIDC JWT, and + exchanges it for a short-lived Cloudsmith API token. + """ + + name = "oidc" + + def resolve( # pylint: disable=too-many-return-statements + self, context: CredentialContext + ) -> CredentialResult | None: + import re + + org = os.environ.get("CLOUDSMITH_ORG", "").strip() + service_slug = os.environ.get("CLOUDSMITH_SERVICE_SLUG", "").strip() + + if not org or not service_slug: + if context.debug: + logger.debug( + "OidcProvider: CLOUDSMITH_ORG and/or CLOUDSMITH_SERVICE_SLUG " + "not set, skipping OIDC auto-discovery" + ) + return None + + # Validate slug format (alphanumeric, hyphens, underscores) + if not re.match(r"^[a-zA-Z0-9_-]+$", org): + logger.warning("OidcProvider: Invalid CLOUDSMITH_ORG format: %s", org) + return None + if not re.match(r"^[a-zA-Z0-9_-]+$", service_slug): + logger.warning( + "OidcProvider: Invalid CLOUDSMITH_SERVICE_SLUG format: %s", service_slug + ) + return None + + from .oidc.detectors import detect_environment + from .oidc.exchange import exchange_oidc_token + + # Detect CI/CD environment and get vendor JWT + detector = detect_environment( + debug=context.debug, + proxy=context.proxy, + ssl_verify=context.ssl_verify, + user_agent=context.user_agent, + headers=context.headers, + ) + if detector is None: + if context.debug: + logger.debug("OidcProvider: No CI/CD environment detected, skipping") + return None + + try: + vendor_token = detector.get_token() + except Exception: # pylint: disable=broad-exception-caught + # Detector token retrieval can fail in various ways + logger.debug( + "OidcProvider: Failed to retrieve OIDC token from %s", + detector.name, + exc_info=True, + ) + return None + + if not vendor_token: + logger.debug( + "OidcProvider: %s detector returned empty token", detector.name + ) + return None + + # Check cache before doing a full exchange + from .oidc.cache import get_cached_token, store_cached_token + + cached = get_cached_token(context.api_host, org, service_slug) + if cached: + logger.debug("OidcProvider: Using cached OIDC token") + return CredentialResult( + api_key=cached, + source_name="oidc", + source_detail=f"OIDC via {detector.name} [cached] (org: {org}, service: {service_slug})", + ) + + # Exchange vendor JWT for Cloudsmith token + try: + cloudsmith_token = exchange_oidc_token( + api_host=context.api_host, + org=org, + service_slug=service_slug, + oidc_token=vendor_token, + proxy=context.proxy, + ssl_verify=context.ssl_verify, + user_agent=context.user_agent, + headers=context.headers, + ) + except Exception: # pylint: disable=broad-exception-caught + # Exchange can fail for various reasons (network, API errors, auth issues) + logger.debug("OidcProvider: OIDC token exchange failed", exc_info=True) + return None + + if not cloudsmith_token: + return None + + # Cache the token for future use + store_cached_token(context.api_host, org, service_slug, cloudsmith_token) + + return CredentialResult( + api_key=cloudsmith_token, + source_name="oidc", + source_detail=f"OIDC via {detector.name} (org: {org}, service: {service_slug})", + ) diff --git a/cloudsmith_cli/core/keyring.py b/cloudsmith_cli/core/keyring.py index 974598dd..9168780a 100644 --- a/cloudsmith_cli/core/keyring.py +++ b/cloudsmith_cli/core/keyring.py @@ -138,3 +138,63 @@ def delete_sso_tokens(api_host): """Delete all SSO tokens from the keyring for the given host.""" results = [_delete_value(key) for key in _sso_keys(api_host)] return any(results) + + +# OIDC token storage +OIDC_TOKEN_KEY = "cloudsmith_cli-oidc_token-{api_host}-{org}-{service_slug}" + + +def store_oidc_token(api_host, org, service_slug, token_data): + """Store OIDC token in keyring if enabled. + + Args: + api_host: API host (e.g., "api.cloudsmith.io") + org: Organization slug + service_slug: Service slug + token_data: JSON string containing token and expiry + + Returns: + True if stored in keyring, False if keyring disabled + """ + if not should_use_keyring(): + return False + + key = OIDC_TOKEN_KEY.format(api_host=api_host, org=org, service_slug=service_slug) + try: + _set_value(key, token_data) + return True + except KeyringError: + return False + + +def get_oidc_token(api_host, org, service_slug): + """Retrieve OIDC token from keyring. + + Args: + api_host: API host (e.g., "api.cloudsmith.io") + org: Organization slug + service_slug: Service slug + + Returns: + JSON string containing token and expiry, or None if not found + """ + if not should_use_keyring(): + return None + + key = OIDC_TOKEN_KEY.format(api_host=api_host, org=org, service_slug=service_slug) + return _get_value(key) + + +def delete_oidc_token(api_host, org, service_slug): + """Delete OIDC token from keyring. + + Args: + api_host: API host (e.g., "api.cloudsmith.io") + org: Organization slug + service_slug: Service slug + + Returns: + True if deleted, False otherwise + """ + key = OIDC_TOKEN_KEY.format(api_host=api_host, org=org, service_slug=service_slug) + return _delete_value(key) diff --git a/cloudsmith_cli/core/tests/test_credentials.py b/cloudsmith_cli/core/tests/test_credentials.py new file mode 100644 index 00000000..3834ff8b --- /dev/null +++ b/cloudsmith_cli/core/tests/test_credentials.py @@ -0,0 +1,1011 @@ +"""Tests for the credential provider chain and OIDC auto-discovery.""" + +# pylint: disable=too-many-lines + +import json +import os +import stat +import time +from unittest.mock import MagicMock, patch + +import pytest + +from cloudsmith_cli.core.credentials import ( + CredentialContext, + CredentialProvider, + CredentialProviderChain, + CredentialResult, +) +from cloudsmith_cli.core.credentials.oidc.detectors import ( + AWSDetector, + AzureDevOpsDetector, + BitbucketPipelinesDetector, + CircleCIDetector, + GenericDetector, + GitHubActionsDetector, + GitLabCIDetector, + detect_environment, +) +from cloudsmith_cli.core.credentials.oidc.exchange import ( + OidcExchangeError, + exchange_oidc_token, +) +from cloudsmith_cli.core.credentials.providers import ( + EnvironmentVariableProvider, + OidcProvider, +) + +# --- CredentialProviderChain Tests --- + + +class DummyProvider(CredentialProvider): + """Test provider that returns a configurable result.""" + + def __init__(self, name, result=None, should_raise=False): + self.name = name + self._result = result + self._should_raise = should_raise + + def resolve(self, context): + if self._should_raise: + raise RuntimeError("Provider error") + return self._result + + +class TestCredentialProviderChain: + def test_first_provider_wins(self): + result1 = CredentialResult(api_key="key1", source_name="first") + result2 = CredentialResult(api_key="key2", source_name="second") + chain = CredentialProviderChain( + [ + DummyProvider("p1", result=result1), + DummyProvider("p2", result=result2), + ] + ) + result = chain.resolve(CredentialContext()) + assert result.api_key == "key1" + assert result.source_name == "first" + + def test_falls_through_to_second(self): + result2 = CredentialResult(api_key="key2", source_name="second") + chain = CredentialProviderChain( + [ + DummyProvider("p1", result=None), + DummyProvider("p2", result=result2), + ] + ) + result = chain.resolve(CredentialContext()) + assert result.api_key == "key2" + + def test_returns_none_when_all_fail(self): + chain = CredentialProviderChain( + [ + DummyProvider("p1", result=None), + DummyProvider("p2", result=None), + ] + ) + result = chain.resolve(CredentialContext()) + assert result is None + + def test_skips_erroring_provider(self): + result2 = CredentialResult(api_key="key2", source_name="second") + chain = CredentialProviderChain( + [ + DummyProvider("p1", should_raise=True), + DummyProvider("p2", result=result2), + ] + ) + result = chain.resolve(CredentialContext()) + assert result.api_key == "key2" + + def test_empty_chain(self): + chain = CredentialProviderChain([]) + result = chain.resolve(CredentialContext()) + assert result is None + + +# --- EnvironmentVariableProvider Tests --- + + +class TestEnvironmentVariableProvider: + def test_resolves_from_env(self): + provider = EnvironmentVariableProvider() + with patch.dict(os.environ, {"CLOUDSMITH_API_KEY": "test-key-1234"}): + result = provider.resolve(CredentialContext()) + assert result is not None + assert result.api_key == "test-key-1234" + assert "1234" in result.source_detail + + def test_returns_none_when_not_set(self): + provider = EnvironmentVariableProvider() + env = os.environ.copy() + env.pop("CLOUDSMITH_API_KEY", None) + with patch.dict(os.environ, env, clear=True): + result = provider.resolve(CredentialContext()) + assert result is None + + def test_returns_none_for_empty_value(self): + provider = EnvironmentVariableProvider() + with patch.dict(os.environ, {"CLOUDSMITH_API_KEY": " "}): + result = provider.resolve(CredentialContext()) + assert result is None + + +# --- CI/CD Detector Tests --- + + +class TestGitHubActionsDetector: + def test_detects_github_actions(self): + detector = GitHubActionsDetector() + env = { + "GITHUB_ACTIONS": "true", + "ACTIONS_ID_TOKEN_REQUEST_URL": "https://token.example.com", + "ACTIONS_ID_TOKEN_REQUEST_TOKEN": "gha-token", + } + with patch.dict(os.environ, env, clear=True): + assert detector.detect() is True + + def test_not_detected_without_env(self): + detector = GitHubActionsDetector() + with patch.dict(os.environ, {}, clear=True): + assert detector.detect() is False + + def test_not_detected_without_request_url(self): + detector = GitHubActionsDetector() + env = { + "GITHUB_ACTIONS": "true", + "ACTIONS_ID_TOKEN_REQUEST_TOKEN": "gha-token", + } + with patch.dict(os.environ, env, clear=True): + assert detector.detect() is False + + @patch("cloudsmith_cli.core.credentials.oidc.detectors.base.requests.Session") + def test_get_token(self, mock_session_cls): + mock_response = MagicMock() + mock_response.json.return_value = {"value": "jwt-token-123"} + mock_response.raise_for_status.return_value = None + + mock_session = MagicMock() + mock_session.get.return_value = mock_response + mock_session_cls.return_value = mock_session + + detector = GitHubActionsDetector() + env = { + "ACTIONS_ID_TOKEN_REQUEST_URL": "https://token.example.com", + "ACTIONS_ID_TOKEN_REQUEST_TOKEN": "gha-token", + } + with patch.dict(os.environ, env): + token = detector.get_token() + assert token == "jwt-token-123" + + mock_session.get.assert_called_once() + call_args = mock_session.get.call_args + assert "Bearer gha-token" in call_args.kwargs["headers"]["Authorization"] + assert "audience=cloudsmith" in call_args.args[0] + + @patch("cloudsmith_cli.core.credentials.oidc.detectors.base.requests.Session") + def test_get_token_custom_audience(self, mock_session_cls): + mock_response = MagicMock() + mock_response.json.return_value = {"value": "jwt-token-123"} + mock_response.raise_for_status.return_value = None + + mock_session = MagicMock() + mock_session.get.return_value = mock_response + mock_session_cls.return_value = mock_session + + detector = GitHubActionsDetector() + env = { + "ACTIONS_ID_TOKEN_REQUEST_URL": "https://token.example.com", + "ACTIONS_ID_TOKEN_REQUEST_TOKEN": "gha-token", + "CLOUDSMITH_OIDC_AUDIENCE": "custom-aud", + } + with patch.dict(os.environ, env, clear=True): + token = detector.get_token() + assert token == "jwt-token-123" + + assert "audience=custom-aud" in mock_session.get.call_args.args[0] + + +class TestGitLabCIDetector: + def test_detects_gitlab_ci(self): + detector = GitLabCIDetector() + env = {"GITLAB_CI": "true", "CI_JOB_JWT_V2": "gitlab-jwt"} + with patch.dict(os.environ, env, clear=True): + assert detector.detect() is True + + def test_not_detected_without_gitlab_ci(self): + detector = GitLabCIDetector() + with patch.dict(os.environ, {"CI_JOB_JWT_V2": "jwt"}, clear=True): + assert detector.detect() is False + + def test_prefers_cloudsmith_oidc_token(self): + detector = GitLabCIDetector() + env = { + "GITLAB_CI": "true", + "CLOUDSMITH_OIDC_TOKEN": "preferred-jwt", + "CI_JOB_JWT_V2": "fallback-jwt", + } + with patch.dict(os.environ, env, clear=True): + token = detector.get_token() + assert token == "preferred-jwt" + + +class TestCircleCIDetector: + def test_detects_circleci(self): + detector = CircleCIDetector() + env = {"CIRCLECI": "true", "CIRCLE_OIDC_TOKEN_V2": "circle-jwt"} + with patch.dict(os.environ, env, clear=True): + assert detector.detect() is True + + def test_not_detected_without_token(self): + detector = CircleCIDetector() + env = {"CIRCLECI": "true"} + with patch.dict(os.environ, env, clear=True): + assert detector.detect() is False + + def test_get_token_v2(self): + detector = CircleCIDetector() + env = { + "CIRCLECI": "true", + "CIRCLE_OIDC_TOKEN_V2": "v2-jwt", + } + with patch.dict(os.environ, env, clear=True): + assert detector.get_token() == "v2-jwt" + + +class TestAzureDevOpsDetector: + def test_detects_azure(self): + detector = AzureDevOpsDetector() + env = { + "SYSTEM_OIDCREQUESTURI": "https://oidc.example.com", + "SYSTEM_ACCESSTOKEN": "ado-token", + } + with patch.dict(os.environ, env, clear=True): + assert detector.detect() is True + + @patch("cloudsmith_cli.core.credentials.oidc.detectors.base.requests.Session") + def test_get_token(self, mock_session_cls): + mock_response = MagicMock() + mock_response.json.return_value = {"oidcToken": "ado-jwt-123"} + mock_response.raise_for_status.return_value = None + + mock_session = MagicMock() + mock_session.post.return_value = mock_response + mock_session_cls.return_value = mock_session + + detector = AzureDevOpsDetector() + env = { + "SYSTEM_OIDCREQUESTURI": "https://oidc.example.com", + "SYSTEM_ACCESSTOKEN": "ado-token", + } + with patch.dict(os.environ, env): + token = detector.get_token() + assert token == "ado-jwt-123" + + mock_session.post.assert_called_once() + call_args = mock_session.post.call_args + assert call_args.kwargs["json"]["audience"] == "cloudsmith" + + +class TestBitbucketPipelinesDetector: + def test_detects_bitbucket(self): + detector = BitbucketPipelinesDetector() + env = {"BITBUCKET_STEP_OIDC_TOKEN": "bb-jwt"} + with patch.dict(os.environ, env, clear=True): + assert detector.detect() is True + + def test_get_token(self): + detector = BitbucketPipelinesDetector() + env = {"BITBUCKET_STEP_OIDC_TOKEN": "bb-jwt-123"} + with patch.dict(os.environ, env): + assert detector.get_token() == "bb-jwt-123" + + +class TestAWSDetector: + def test_detects_aws_with_credentials(self): + mock_creds = MagicMock() + mock_creds.access_key = "AKIAIOSFODNN7EXAMPLE" + mock_frozen = MagicMock() + mock_frozen.access_key = "AKIAIOSFODNN7EXAMPLE" + mock_creds.get_frozen_credentials.return_value = mock_frozen + mock_session = MagicMock() + mock_session.get_credentials.return_value = mock_creds + mock_boto3 = MagicMock() + mock_boto3.Session.return_value = mock_session + + mock_botocore_exc = MagicMock() + mock_botocore_exc.BotoCoreError = Exception + mock_botocore_exc.ClientError = Exception + mock_botocore_exc.NoCredentialsError = Exception + + detector = AWSDetector() + with patch.dict( + "sys.modules", + { + "boto3": mock_boto3, + "botocore": MagicMock(), + "botocore.exceptions": mock_botocore_exc, + }, + ): + assert detector.detect() is True + + def test_not_detected_without_credentials(self): + mock_session = MagicMock() + mock_session.get_credentials.return_value = None + mock_boto3 = MagicMock() + mock_boto3.Session.return_value = mock_session + + mock_botocore_exc = MagicMock() + mock_botocore_exc.BotoCoreError = Exception + mock_botocore_exc.ClientError = Exception + mock_botocore_exc.NoCredentialsError = Exception + + detector = AWSDetector() + with patch.dict( + "sys.modules", + { + "boto3": mock_boto3, + "botocore": MagicMock(), + "botocore.exceptions": mock_botocore_exc, + }, + ): + assert detector.detect() is False + + def test_not_detected_without_boto3(self): + detector = AWSDetector() + with patch.dict("sys.modules", {"boto3": None}): + assert detector.detect() is False + + def test_get_token(self): + mock_sts = MagicMock() + mock_sts.get_web_identity_token.return_value = { + "WebIdentityToken": "aws-jwt-456", + } + mock_boto3 = MagicMock() + mock_boto3.client.return_value = mock_sts + + detector = AWSDetector() + with patch.dict("sys.modules", {"boto3": mock_boto3}): + assert detector.get_token() == "aws-jwt-456" + + mock_boto3.client.assert_called_once_with("sts") + call_kwargs = mock_sts.get_web_identity_token.call_args.kwargs + assert call_kwargs["Audience"] == ["cloudsmith"] + assert call_kwargs["SigningAlgorithm"] == "RS256" + + +class TestGenericDetector: + def test_detects_generic(self): + detector = GenericDetector() + env = {"CLOUDSMITH_OIDC_TOKEN": "generic-jwt"} + with patch.dict(os.environ, env, clear=True): + assert detector.detect() is True + + def test_get_token(self): + detector = GenericDetector() + env = {"CLOUDSMITH_OIDC_TOKEN": "generic-jwt-789"} + with patch.dict(os.environ, env): + assert detector.get_token() == "generic-jwt-789" + + +class TestDetectEnvironment: + def test_returns_none_in_empty_env(self): + with patch.dict(os.environ, {}, clear=True): + assert detect_environment() is None + + def test_github_takes_priority(self): + env = { + "GITHUB_ACTIONS": "true", + "ACTIONS_ID_TOKEN_REQUEST_URL": "https://token.example.com", + "ACTIONS_ID_TOKEN_REQUEST_TOKEN": "gha-token", + "CLOUDSMITH_OIDC_TOKEN": "generic-jwt", + } + with patch.dict(os.environ, env, clear=True): + detector = detect_environment() + assert detector is not None + assert detector.name == "GitHub Actions" + + +# --- OIDC Exchange Tests --- + + +class TestOidcExchange: + @patch("cloudsmith_cli.core.credentials.oidc.exchange.requests.Session") + def test_successful_exchange(self, mock_session_cls): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"token": "cloudsmith-jwt-abc"} + + mock_session = MagicMock() + mock_session.post.return_value = mock_response + mock_session_cls.return_value = mock_session + + token = exchange_oidc_token( + api_host="https://api.cloudsmith.io", + org="test-org", + service_slug="test-service", + oidc_token="vendor-jwt", + ) + assert token == "cloudsmith-jwt-abc" + + mock_session.post.assert_called_once() + call_args = mock_session.post.call_args + assert call_args.args[0] == "https://api.cloudsmith.io/openid/test-org/" + assert call_args.kwargs["json"]["oidc_token"] == "vendor-jwt" + assert call_args.kwargs["json"]["service_slug"] == "test-service" + + @patch("cloudsmith_cli.core.credentials.oidc.exchange.requests.Session") + def test_4xx_raises_immediately(self, mock_session_cls): + mock_response = MagicMock() + mock_response.status_code = 401 + mock_response.json.return_value = {"detail": "Invalid token"} + + mock_session = MagicMock() + mock_session.post.return_value = mock_response + mock_session_cls.return_value = mock_session + + with pytest.raises(OidcExchangeError, match="401"): + exchange_oidc_token( + api_host="https://api.cloudsmith.io", + org="test-org", + service_slug="test-service", + oidc_token="bad-jwt", + ) + # 4xx should NOT retry + assert mock_session.post.call_count == 1 + + @patch("cloudsmith_cli.core.credentials.oidc.exchange.requests.Session") + def test_empty_token_raises(self, mock_session_cls): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"token": ""} + + mock_session = MagicMock() + mock_session.post.return_value = mock_response + mock_session_cls.return_value = mock_session + + with pytest.raises(OidcExchangeError, match="empty or invalid"): + exchange_oidc_token( + api_host="https://api.cloudsmith.io", + org="test-org", + service_slug="test-service", + oidc_token="vendor-jwt", + ) + + @patch("cloudsmith_cli.core.credentials.oidc.exchange.requests.Session") + def test_host_normalization(self, mock_session_cls): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"token": "jwt-123"} + + mock_session = MagicMock() + mock_session.post.return_value = mock_response + mock_session_cls.return_value = mock_session + + exchange_oidc_token( + api_host="api.cloudsmith.io", + org="myorg", + service_slug="svc", + oidc_token="jwt", + ) + call_url = mock_session.post.call_args.args[0] + assert call_url == "https://api.cloudsmith.io/openid/myorg/" + + +# --- OidcProvider Tests --- + + +class TestOidcProvider: + def test_skips_without_org(self): + provider = OidcProvider() + env = {"CLOUDSMITH_SERVICE_SLUG": "svc"} + with patch.dict(os.environ, env, clear=True): + result = provider.resolve(CredentialContext(debug=True)) + assert result is None + + def test_skips_without_service_slug(self): + provider = OidcProvider() + env = {"CLOUDSMITH_ORG": "myorg"} + with patch.dict(os.environ, env, clear=True): + result = provider.resolve(CredentialContext(debug=True)) + assert result is None + + @patch("cloudsmith_cli.core.credentials.oidc.exchange.exchange_oidc_token") + @patch("cloudsmith_cli.core.credentials.oidc.detectors.detect_environment") + def test_resolves_oidc(self, mock_detect, mock_exchange): + mock_detector = MagicMock() + mock_detector.name = "GitHub Actions" + mock_detector.get_token.return_value = "vendor-jwt" + mock_detect.return_value = mock_detector + mock_exchange.return_value = "cloudsmith-jwt-xyz" + + provider = OidcProvider() + env = { + "CLOUDSMITH_ORG": "myorg", + "CLOUDSMITH_SERVICE_SLUG": "mysvc", + } + with patch.dict(os.environ, env, clear=True), patch( + "cloudsmith_cli.core.credentials.oidc.cache.get_cached_token", + return_value=None, + ), patch( + "cloudsmith_cli.core.credentials.oidc.cache.store_cached_token", + ): + result = provider.resolve( + CredentialContext(api_host="https://api.cloudsmith.io") + ) + assert result is not None + assert result.api_key == "cloudsmith-jwt-xyz" + assert result.source_name == "oidc" + assert "GitHub Actions" in result.source_detail + assert "myorg" in result.source_detail + + @patch("cloudsmith_cli.core.credentials.oidc.detectors.detect_environment") + def test_returns_none_when_no_env_detected(self, mock_detect): + mock_detect.return_value = None + + provider = OidcProvider() + env = { + "CLOUDSMITH_ORG": "myorg", + "CLOUDSMITH_SERVICE_SLUG": "mysvc", + } + with patch.dict(os.environ, env, clear=True): + result = provider.resolve(CredentialContext()) + assert result is None + + @patch("cloudsmith_cli.core.credentials.oidc.exchange.exchange_oidc_token") + @patch("cloudsmith_cli.core.credentials.oidc.detectors.detect_environment") + def test_returns_none_on_exchange_failure(self, mock_detect, mock_exchange): + mock_detector = MagicMock() + mock_detector.name = "CircleCI" + mock_detector.get_token.return_value = "vendor-jwt" + mock_detect.return_value = mock_detector + mock_exchange.side_effect = OidcExchangeError("exchange failed") + + provider = OidcProvider() + env = { + "CLOUDSMITH_ORG": "myorg", + "CLOUDSMITH_SERVICE_SLUG": "mysvc", + } + with patch.dict(os.environ, env, clear=True), patch( + "cloudsmith_cli.core.credentials.oidc.cache.get_cached_token", + return_value=None, + ): + result = provider.resolve(CredentialContext()) + assert result is None + + +# --- Integration: Chain Priority Tests --- + + +class TestChainPriority: + """Verify that earlier providers take priority over OIDC.""" + + def test_env_var_beats_oidc(self): + env_provider = EnvironmentVariableProvider() + oidc_provider = OidcProvider() + + chain = CredentialProviderChain([env_provider, oidc_provider]) + + env = { + "CLOUDSMITH_API_KEY": "my-api-key", + "CLOUDSMITH_ORG": "myorg", + "CLOUDSMITH_SERVICE_SLUG": "mysvc", + "CLOUDSMITH_OIDC_TOKEN": "generic-jwt", + } + with patch.dict(os.environ, env, clear=True): + result = chain.resolve(CredentialContext()) + assert result is not None + assert result.api_key == "my-api-key" + assert result.source_name == "environment_variable" + + @patch("cloudsmith_cli.core.credentials.oidc.exchange.exchange_oidc_token") + @patch("cloudsmith_cli.core.credentials.oidc.detectors.detect_environment") + def test_oidc_used_as_fallback(self, mock_detect, mock_exchange): + mock_detector = MagicMock() + mock_detector.name = "Generic (CLOUDSMITH_OIDC_TOKEN)" + mock_detector.get_token.return_value = "vendor-jwt" + mock_detect.return_value = mock_detector + mock_exchange.return_value = "cloudsmith-jwt" + + env_provider = EnvironmentVariableProvider() + oidc_provider = OidcProvider() + + chain = CredentialProviderChain([env_provider, oidc_provider]) + + env = { + "CLOUDSMITH_ORG": "myorg", + "CLOUDSMITH_SERVICE_SLUG": "mysvc", + "CLOUDSMITH_OIDC_TOKEN": "generic-jwt", + } + # Ensure CLOUDSMITH_API_KEY is NOT set + clean_env = {k: v for k, v in env.items()} + with patch.dict(os.environ, clean_env, clear=True), patch( + "cloudsmith_cli.core.credentials.oidc.cache.get_cached_token", + return_value=None, + ), patch( + "cloudsmith_cli.core.credentials.oidc.cache.store_cached_token", + ): + result = chain.resolve( + CredentialContext(api_host="https://api.cloudsmith.io") + ) + assert result is not None + assert result.api_key == "cloudsmith-jwt" + assert result.source_name == "oidc" + + +# ============================================================================= +# OIDC Token Cache Tests +# ============================================================================= + + +class TestOidcTokenCache: + """Tests for OIDC token filesystem caching.""" + + def _make_jwt(self, exp=None): + """Create a minimal JWT with an optional exp claim.""" + import base64 + + header = ( + base64.urlsafe_b64encode(json.dumps({"alg": "HS256"}).encode()) + .rstrip(b"=") + .decode() + ) + payload_data = {} + if exp is not None: + payload_data["exp"] = exp + payload = ( + base64.urlsafe_b64encode(json.dumps(payload_data).encode()) + .rstrip(b"=") + .decode() + ) + sig = base64.urlsafe_b64encode(b"fakesig").rstrip(b"=").decode() + return f"{header}.{payload}.{sig}" + + def test_cache_roundtrip(self, tmp_path): + """Store and retrieve a cached token.""" + from cloudsmith_cli.core.credentials.oidc.cache import ( + get_cached_token, + store_cached_token, + ) + + future_exp = time.time() + 3600 + token = self._make_jwt(exp=future_exp) + + with patch( + "cloudsmith_cli.core.credentials.oidc.cache._get_cache_dir", + return_value=str(tmp_path), + ): + store_cached_token("https://api.cloudsmith.io", "org", "svc", token) + result = get_cached_token("https://api.cloudsmith.io", "org", "svc") + assert result == token + + def test_expired_token_not_returned(self, tmp_path): + """An expired token should not be returned from cache.""" + from cloudsmith_cli.core.credentials.oidc.cache import ( + get_cached_token, + store_cached_token, + ) + + past_exp = time.time() - 100 + token = self._make_jwt(exp=past_exp) + + with patch( + "cloudsmith_cli.core.credentials.oidc.cache._get_cache_dir", + return_value=str(tmp_path), + ): + store_cached_token("https://api.cloudsmith.io", "org", "svc", token) + result = get_cached_token("https://api.cloudsmith.io", "org", "svc") + assert result is None + + def test_expiring_soon_token_not_returned(self, tmp_path): + """A token expiring within the margin should not be returned.""" + from cloudsmith_cli.core.credentials.oidc.cache import ( + get_cached_token, + store_cached_token, + ) + + # Expires in 30s but margin is 60s + exp = time.time() + 30 + token = self._make_jwt(exp=exp) + + with patch( + "cloudsmith_cli.core.credentials.oidc.cache._get_cache_dir", + return_value=str(tmp_path), + ): + store_cached_token("https://api.cloudsmith.io", "org", "svc", token) + result = get_cached_token("https://api.cloudsmith.io", "org", "svc") + assert result is None + + def test_no_cache_file_returns_none(self, tmp_path): + """Missing cache file returns None.""" + from cloudsmith_cli.core.credentials.oidc.cache import get_cached_token + + with patch( + "cloudsmith_cli.core.credentials.oidc.cache._get_cache_dir", + return_value=str(tmp_path), + ): + result = get_cached_token("https://api.cloudsmith.io", "org", "svc") + assert result is None + + def test_different_keys_are_independent(self, tmp_path): + """Tokens for different org/service combos are cached separately.""" + from cloudsmith_cli.core.credentials.oidc.cache import ( + get_cached_token, + store_cached_token, + ) + + future_exp = time.time() + 3600 + token_a = self._make_jwt(exp=future_exp) + token_b = self._make_jwt(exp=future_exp) + + with patch( + "cloudsmith_cli.core.credentials.oidc.cache._get_cache_dir", + return_value=str(tmp_path), + ): + store_cached_token("https://api.cloudsmith.io", "org-a", "svc", token_a) + store_cached_token("https://api.cloudsmith.io", "org-b", "svc", token_b) + assert ( + get_cached_token("https://api.cloudsmith.io", "org-a", "svc") == token_a + ) + assert ( + get_cached_token("https://api.cloudsmith.io", "org-b", "svc") == token_b + ) + + def test_invalidate_cached_token(self, tmp_path): + """invalidate_cached_token removes the cache file.""" + from cloudsmith_cli.core.credentials.oidc.cache import ( + get_cached_token, + invalidate_cached_token, + store_cached_token, + ) + + future_exp = time.time() + 3600 + token = self._make_jwt(exp=future_exp) + + with patch( + "cloudsmith_cli.core.credentials.oidc.cache._get_cache_dir", + return_value=str(tmp_path), + ): + store_cached_token("https://api.cloudsmith.io", "org", "svc", token) + assert ( + get_cached_token("https://api.cloudsmith.io", "org", "svc") is not None + ) + invalidate_cached_token("https://api.cloudsmith.io", "org", "svc") + assert get_cached_token("https://api.cloudsmith.io", "org", "svc") is None + + def test_corrupt_cache_file_returns_none(self, tmp_path): + """A corrupted cache file should be handled gracefully.""" + from cloudsmith_cli.core.credentials.oidc.cache import ( + _cache_key, + get_cached_token, + ) + + cache_file = tmp_path / _cache_key("https://api.cloudsmith.io", "org", "svc") + cache_file.write_text("not valid json{{{") + + with patch( + "cloudsmith_cli.core.credentials.oidc.cache._get_cache_dir", + return_value=str(tmp_path), + ): + result = get_cached_token("https://api.cloudsmith.io", "org", "svc") + assert result is None + + def test_token_without_exp_is_cached(self, tmp_path): + """A token with no exp claim should still be cached and returned.""" + from cloudsmith_cli.core.credentials.oidc.cache import ( + get_cached_token, + store_cached_token, + ) + + token = self._make_jwt(exp=None) + + with patch( + "cloudsmith_cli.core.credentials.oidc.cache._get_cache_dir", + return_value=str(tmp_path), + ): + store_cached_token("https://api.cloudsmith.io", "org", "svc", token) + result = get_cached_token("https://api.cloudsmith.io", "org", "svc") + assert result == token + + def test_cache_file_permissions(self, tmp_path): + """Cache files should be created with restricted permissions (0600).""" + from cloudsmith_cli.core.credentials.oidc.cache import ( + _cache_key, + store_cached_token, + ) + + future_exp = time.time() + 3600 + token = self._make_jwt(exp=future_exp) + + # Mock keyring to be unavailable so disk cache is used + with patch( + "cloudsmith_cli.core.credentials.oidc.cache._get_cache_dir", + return_value=str(tmp_path), + ), patch("cloudsmith_cli.core.keyring.should_use_keyring", return_value=False): + store_cached_token("https://api.cloudsmith.io", "org", "svc", token) + cache_file = tmp_path / _cache_key( + "https://api.cloudsmith.io", "org", "svc" + ) + mode = cache_file.stat().st_mode + assert stat.S_IMODE(mode) == 0o600 + + @patch("cloudsmith_cli.core.credentials.oidc.exchange.exchange_oidc_token") + @patch("cloudsmith_cli.core.credentials.oidc.detectors.detect_environment") + def test_oidc_provider_uses_cache(self, mock_detect, mock_exchange, tmp_path): + """OidcProvider should use cached token instead of re-exchanging.""" + from cloudsmith_cli.core.credentials.oidc.cache import store_cached_token + + future_exp = time.time() + 3600 + cached_token = self._make_jwt(exp=future_exp) + + mock_detector = MagicMock() + mock_detector.name = "GitHub Actions" + mock_detector.get_token.return_value = "vendor-jwt" + mock_detect.return_value = mock_detector + + env = { + "CLOUDSMITH_ORG": "myorg", + "CLOUDSMITH_SERVICE_SLUG": "mysvc", + } + + with patch( + "cloudsmith_cli.core.credentials.oidc.cache._get_cache_dir", + return_value=str(tmp_path), + ), patch.dict(os.environ, env, clear=True): + store_cached_token( + "https://api.cloudsmith.io", "myorg", "mysvc", cached_token + ) + + provider = OidcProvider() + result = provider.resolve( + CredentialContext(api_host="https://api.cloudsmith.io") + ) + + assert result is not None + assert result.api_key == cached_token + assert "[cached]" in result.source_detail + # exchange should NOT have been called + mock_exchange.assert_not_called() + + @patch("cloudsmith_cli.core.credentials.oidc.exchange.exchange_oidc_token") + @patch("cloudsmith_cli.core.credentials.oidc.detectors.detect_environment") + def test_oidc_provider_exchanges_when_cache_expired( + self, mock_detect, mock_exchange, tmp_path + ): + """OidcProvider should exchange when cached token is expired.""" + from cloudsmith_cli.core.credentials.oidc.cache import store_cached_token + + expired_token = self._make_jwt(exp=time.time() - 100) + fresh_token = "fresh-cloudsmith-jwt" + + mock_detector = MagicMock() + mock_detector.name = "GitHub Actions" + mock_detector.get_token.return_value = "vendor-jwt" + mock_detect.return_value = mock_detector + mock_exchange.return_value = fresh_token + + env = { + "CLOUDSMITH_ORG": "myorg", + "CLOUDSMITH_SERVICE_SLUG": "mysvc", + } + + with patch( + "cloudsmith_cli.core.credentials.oidc.cache._get_cache_dir", + return_value=str(tmp_path), + ), patch.dict(os.environ, env, clear=True): + store_cached_token( + "https://api.cloudsmith.io", "myorg", "mysvc", expired_token + ) + + provider = OidcProvider() + result = provider.resolve( + CredentialContext(api_host="https://api.cloudsmith.io") + ) + + assert result is not None + assert result.api_key == fresh_token + assert "[cached]" not in result.source_detail + mock_exchange.assert_called_once() + + def test_keyring_storage_with_fallback(self, tmp_path): + """OIDC tokens should use keyring when available, skip disk when keyring works.""" + from cloudsmith_cli.core.credentials.oidc.cache import store_cached_token + + future_exp = time.time() + 3600 + token = self._make_jwt(exp=future_exp) + + # Mock keyring to succeed + with patch( + "cloudsmith_cli.core.credentials.oidc.cache._get_cache_dir", + return_value=str(tmp_path), + ), patch( + "cloudsmith_cli.core.keyring.should_use_keyring", return_value=True + ), patch( + "cloudsmith_cli.core.keyring.store_oidc_token", return_value=True + ) as mock_keyring_store: + store_cached_token("https://api.cloudsmith.io", "org", "svc", token) + + # Keyring storage should have been attempted + mock_keyring_store.assert_called_once() + + # Disk storage should NOT happen when keyring succeeds + cache_files = list(tmp_path.glob("oidc_*.json")) + assert len(cache_files) == 0 + + def test_keyring_failure_falls_back_to_disk(self, tmp_path): + """When keyring fails, should fall back to disk storage.""" + from cloudsmith_cli.core.credentials.oidc.cache import store_cached_token + + future_exp = time.time() + 3600 + token = self._make_jwt(exp=future_exp) + + # Mock keyring to fail + with patch( + "cloudsmith_cli.core.credentials.oidc.cache._get_cache_dir", + return_value=str(tmp_path), + ), patch( + "cloudsmith_cli.core.keyring.should_use_keyring", return_value=True + ), patch( + "cloudsmith_cli.core.keyring.store_oidc_token", return_value=False + ) as mock_keyring_store: + store_cached_token("https://api.cloudsmith.io", "org", "svc", token) + + # Keyring storage should have been attempted + mock_keyring_store.assert_called_once() + + # Disk storage SHOULD happen when keyring fails + cache_files = list(tmp_path.glob("oidc_*.json")) + assert len(cache_files) == 1 + + def test_keyring_retrieval_priority(self, tmp_path): + """Keyring should be checked before disk cache.""" + from cloudsmith_cli.core.credentials.oidc.cache import get_cached_token + + future_exp = time.time() + 3600 + keyring_token = self._make_jwt(exp=future_exp) + + token_data = json.dumps( + { + "token": keyring_token, + "expires_at": future_exp, + "api_host": "https://api.cloudsmith.io", + "org": "org", + "service_slug": "svc", + "cached_at": time.time(), + } + ) + + # Mock keyring to return a token + with patch( + "cloudsmith_cli.core.credentials.oidc.cache._get_cache_dir", + return_value=str(tmp_path), + ), patch("cloudsmith_cli.core.keyring.get_oidc_token", return_value=token_data): + result = get_cached_token("https://api.cloudsmith.io", "org", "svc") + + # Should get the keyring token (disk cache not even checked) + assert result == keyring_token + + def test_keyring_disabled_uses_disk(self, tmp_path): + """When keyring is disabled, should use disk cache only.""" + from cloudsmith_cli.core.credentials.oidc.cache import ( + get_cached_token, + store_cached_token, + ) + + future_exp = time.time() + 3600 + token = self._make_jwt(exp=future_exp) + + # Mock CLOUDSMITH_NO_KEYRING=1 + with patch( + "cloudsmith_cli.core.credentials.oidc.cache._get_cache_dir", + return_value=str(tmp_path), + ), patch("cloudsmith_cli.core.keyring.should_use_keyring", return_value=False): + store_cached_token("https://api.cloudsmith.io", "org", "svc", token) + result = get_cached_token("https://api.cloudsmith.io", "org", "svc") + + # Should successfully use disk cache + assert result == token + + # Verify disk file was created + cache_files = list(tmp_path.glob("oidc_*.json")) + assert len(cache_files) == 1 diff --git a/setup.py b/setup.py index dbcb075f..37101eb5 100644 --- a/setup.py +++ b/setup.py @@ -63,6 +63,14 @@ def get_long_description(): "semver>=2.7.9", "urllib3>=2.5", ], + extras_require={ + "aws": [ + "boto3[crt]>=1.26.0", # For AWS OIDC authentication (includes SSO support) + ], + "all": [ + "boto3[crt]>=1.26.0", # Install all optional dependencies + ], + }, entry_points={ "console_scripts": ["cloudsmith=cloudsmith_cli.cli.commands.main:main"] },