|
| 1 | +# Telnyx ED25519 Webhook Verification |
| 2 | +# |
| 3 | +# This module provides ED25519 signature verification for Telnyx webhooks, |
| 4 | +# matching the implementation pattern used in the Go, Java, and Ruby SDKs. |
| 5 | +# |
| 6 | +# This file is in the `lib/` directory which is preserved across Stainless |
| 7 | +# code generation runs. Do not move it to the generated code directories. |
| 8 | +# |
| 9 | +# Usage: |
| 10 | +# |
| 11 | +# from telnyx.lib.webhook_verification import verify_webhook_signature, WebhookVerificationError |
| 12 | +# |
| 13 | +# try: |
| 14 | +# verify_webhook_signature(payload, headers, public_key) |
| 15 | +# except WebhookVerificationError as e: |
| 16 | +# print(f"Verification failed: {e}") |
| 17 | +# |
| 18 | +# Or use the higher-level unwrap with ED25519: |
| 19 | +# |
| 20 | +# from telnyx.lib.webhooks_ed25519 import unwrap_with_ed25519 |
| 21 | +# |
| 22 | +# event = unwrap_with_ed25519(client, payload, headers) |
| 23 | + |
| 24 | +from __future__ import annotations |
| 25 | + |
| 26 | +import time |
| 27 | +import base64 |
| 28 | +from typing import Mapping |
| 29 | + |
| 30 | +__all__ = [ |
| 31 | + "WebhookVerificationError", |
| 32 | + "verify_webhook_signature", |
| 33 | + "SIGNATURE_HEADER", |
| 34 | + "TIMESTAMP_HEADER", |
| 35 | + "TIMESTAMP_TOLERANCE_SECONDS", |
| 36 | +] |
| 37 | + |
| 38 | +# Telnyx webhook signature headers (case-insensitive per HTTP spec) |
| 39 | +SIGNATURE_HEADER = "telnyx-signature-ed25519" |
| 40 | +TIMESTAMP_HEADER = "telnyx-timestamp" |
| 41 | + |
| 42 | +# Tolerance for timestamp validation (5 minutes) |
| 43 | +TIMESTAMP_TOLERANCE_SECONDS = 300 |
| 44 | + |
| 45 | + |
| 46 | +class WebhookVerificationError(Exception): |
| 47 | + """Error raised when webhook signature verification fails. |
| 48 | +
|
| 49 | + This error is raised by the webhook verification module when: |
| 50 | + - No public key is configured |
| 51 | + - Required headers are missing (telnyx-signature-ed25519, telnyx-timestamp) |
| 52 | + - Timestamp is too old or too new (outside 5-minute tolerance) |
| 53 | + - Signature verification fails |
| 54 | + - Public key or signature format is invalid |
| 55 | +
|
| 56 | + Example: |
| 57 | + try: |
| 58 | + verify_webhook_signature(payload, headers, public_key) |
| 59 | + except WebhookVerificationError as e: |
| 60 | + print(f"Webhook verification failed: {e}") |
| 61 | + """ |
| 62 | + |
| 63 | + def __init__(self, message: str) -> None: |
| 64 | + self.message = message |
| 65 | + super().__init__(message) |
| 66 | + |
| 67 | + |
| 68 | +def _get_header(headers: Mapping[str, str], key: str) -> str | None: |
| 69 | + """Get header value case-insensitively. |
| 70 | +
|
| 71 | + Args: |
| 72 | + headers: The headers mapping |
| 73 | + key: The header key to find |
| 74 | +
|
| 75 | + Returns: |
| 76 | + The header value or None if not found |
| 77 | + """ |
| 78 | + key_lower = key.lower() |
| 79 | + for header_key, header_value in headers.items(): |
| 80 | + if header_key.lower() == key_lower: |
| 81 | + return header_value |
| 82 | + return None |
| 83 | + |
| 84 | + |
| 85 | +def verify_webhook_signature( |
| 86 | + payload: str | bytes, |
| 87 | + headers: Mapping[str, str], |
| 88 | + public_key: str, |
| 89 | +) -> None: |
| 90 | + """Verify the ED25519 signature of a Telnyx webhook. |
| 91 | +
|
| 92 | + Telnyx webhooks are signed using ED25519 with the following format: |
| 93 | + - Header "Telnyx-Signature-Ed25519": Base64-encoded ED25519 signature (64 bytes) |
| 94 | + - Header "Telnyx-Timestamp": Unix timestamp in seconds |
| 95 | + - Signed payload: "{timestamp}|{payload}" |
| 96 | +
|
| 97 | + Args: |
| 98 | + payload: The raw webhook body as string or bytes |
| 99 | + headers: The webhook HTTP headers |
| 100 | + public_key: Base64-encoded ED25519 public key from Telnyx Mission Control |
| 101 | +
|
| 102 | + Raises: |
| 103 | + WebhookVerificationError: If verification fails for any reason |
| 104 | +
|
| 105 | + Example: |
| 106 | + verify_webhook_signature(request.body, request.headers, os.environ["TELNYX_PUBLIC_KEY"]) |
| 107 | + """ |
| 108 | + # Lazy import to avoid making nacl a hard dependency |
| 109 | + try: |
| 110 | + from nacl.signing import VerifyKey # type: ignore[import-not-found] |
| 111 | + from nacl.exceptions import BadSignatureError # type: ignore[import-not-found] |
| 112 | + except ImportError as exc: |
| 113 | + raise WebhookVerificationError( |
| 114 | + "PyNaCl is required for ED25519 webhook verification. Install it with: pip install pynacl" |
| 115 | + ) from exc |
| 116 | + |
| 117 | + # Extract required headers (case-insensitive) |
| 118 | + signature_header = _get_header(headers, SIGNATURE_HEADER) |
| 119 | + timestamp_header = _get_header(headers, TIMESTAMP_HEADER) |
| 120 | + |
| 121 | + if not signature_header: |
| 122 | + raise WebhookVerificationError(f"Missing required header: {SIGNATURE_HEADER}") |
| 123 | + |
| 124 | + if not timestamp_header: |
| 125 | + raise WebhookVerificationError(f"Missing required header: {TIMESTAMP_HEADER}") |
| 126 | + |
| 127 | + # Validate timestamp to prevent replay attacks (5 minute tolerance) |
| 128 | + try: |
| 129 | + webhook_time = int(timestamp_header) |
| 130 | + current_time = int(time.time()) |
| 131 | + time_diff = abs(current_time - webhook_time) |
| 132 | + |
| 133 | + if time_diff > TIMESTAMP_TOLERANCE_SECONDS: |
| 134 | + raise WebhookVerificationError(f"Webhook timestamp is too old or too new ({time_diff}s difference)") |
| 135 | + except ValueError as e: |
| 136 | + raise WebhookVerificationError(f"Invalid timestamp format: {timestamp_header}") from e |
| 137 | + |
| 138 | + # Decode public key from base64 |
| 139 | + try: |
| 140 | + public_key_bytes = base64.b64decode(public_key) |
| 141 | + if len(public_key_bytes) != 32: |
| 142 | + raise WebhookVerificationError(f"Invalid public key: expected 32 bytes, got {len(public_key_bytes)} bytes") |
| 143 | + except Exception as e: |
| 144 | + if isinstance(e, WebhookVerificationError): |
| 145 | + raise |
| 146 | + raise WebhookVerificationError(f"Invalid public key format: {e}") from e |
| 147 | + |
| 148 | + # Decode signature from base64 |
| 149 | + try: |
| 150 | + signature_bytes = base64.b64decode(signature_header) |
| 151 | + if len(signature_bytes) != 64: |
| 152 | + raise WebhookVerificationError( |
| 153 | + f"Invalid signature length: expected 64 bytes, got {len(signature_bytes)} bytes" |
| 154 | + ) |
| 155 | + except Exception as e: |
| 156 | + if isinstance(e, WebhookVerificationError): |
| 157 | + raise |
| 158 | + raise WebhookVerificationError(f"Invalid signature format: {e}") from e |
| 159 | + |
| 160 | + # Convert payload to string if bytes |
| 161 | + if isinstance(payload, bytes): |
| 162 | + payload_str = payload.decode("utf-8") |
| 163 | + else: |
| 164 | + payload_str = payload |
| 165 | + |
| 166 | + # Build the signed payload: "{timestamp}|{payload}" |
| 167 | + signed_payload = f"{timestamp_header}|{payload_str}" |
| 168 | + |
| 169 | + # Verify the ED25519 signature using PyNaCl |
| 170 | + try: |
| 171 | + verify_key = VerifyKey(public_key_bytes) # type: ignore[no-untyped-call] |
| 172 | + # PyNaCl's verify expects message + signature combined, or use verify_key.verify(signature, message) |
| 173 | + # Actually, we need to use the raw verify since we have separate signature and message |
| 174 | + verify_key.verify(signed_payload.encode("utf-8"), signature_bytes) # type: ignore[no-untyped-call] |
| 175 | + except BadSignatureError as e: # type: ignore[no-untyped-call] |
| 176 | + raise WebhookVerificationError("Signature verification failed: signature does not match payload") from e |
| 177 | + except Exception as e: |
| 178 | + raise WebhookVerificationError(f"Signature verification failed: {e}") from e |
0 commit comments