Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .release-please-manifest.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
".": "4.89.0"
".": "4.90.0"
}
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## 4.90.0 (2026-03-31)

Full Changelog: [v4.89.0...v4.90.0](https://github.com/team-telnyx/telnyx-python/compare/v4.89.0...v4.90.0)

### Features

* **lib:** add ED25519 webhook signature verification ([18be54b](https://github.com/team-telnyx/telnyx-python/commit/18be54bff12109ba79b6ba2a80a881fdfaa523f0))

## 4.89.0 (2026-03-30)

Full Changelog: [v4.88.1...v4.89.0](https://github.com/team-telnyx/telnyx-python/compare/v4.88.1...v4.89.0)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "telnyx"
version = "4.89.0"
version = "4.90.0"
description = "The official Python library for the telnyx API"
dynamic = ["readme"]
license = "MIT"
Expand Down
2 changes: 1 addition & 1 deletion src/telnyx/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details.

__title__ = "telnyx"
__version__ = "4.89.0" # x-release-please-version
__version__ = "4.90.0" # x-release-please-version
23 changes: 22 additions & 1 deletion src/telnyx/lib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@
Custom utilities for the Telnyx SDK.

This package provides additional functionality that extends the core SDK,
including WebSocket support for real-time streaming APIs.
including WebSocket support for real-time streaming APIs and ED25519
webhook signature verification.
"""

from telnyx.lib.webhooks_ed25519 import (
verify_ed25519,
unwrap_with_ed25519,
)
from telnyx.lib.speech_to_text_ws import (
SttWord,
SttEvent,
Expand All @@ -13,12 +18,28 @@
SpeechToTextWSError,
SpeechToTextStreamParams,
)
from telnyx.lib.webhook_verification import (
SIGNATURE_HEADER,
TIMESTAMP_HEADER,
TIMESTAMP_TOLERANCE_SECONDS,
WebhookVerificationError,
verify_webhook_signature,
)

__all__ = [
# Speech-to-Text WebSocket
"SpeechToTextStreamParams",
"SttEvent",
"SttWord",
"SpeechToTextWS",
"AsyncSpeechToTextWS",
"SpeechToTextWSError",
# Webhook verification (ED25519)
"WebhookVerificationError",
"verify_webhook_signature",
"verify_ed25519",
"unwrap_with_ed25519",
"SIGNATURE_HEADER",
"TIMESTAMP_HEADER",
"TIMESTAMP_TOLERANCE_SECONDS",
]
178 changes: 178 additions & 0 deletions src/telnyx/lib/webhook_verification.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
# Telnyx ED25519 Webhook Verification
#
# This module provides ED25519 signature verification for Telnyx webhooks,
# matching the implementation pattern used in the Go, Java, and Ruby SDKs.
#
# This file is in the `lib/` directory which is preserved across Stainless
# code generation runs. Do not move it to the generated code directories.
#
# Usage:
#
# from telnyx.lib.webhook_verification import verify_webhook_signature, WebhookVerificationError
#
# try:
# verify_webhook_signature(payload, headers, public_key)
# except WebhookVerificationError as e:
# print(f"Verification failed: {e}")
#
# Or use the higher-level unwrap with ED25519:
#
# from telnyx.lib.webhooks_ed25519 import unwrap_with_ed25519
#
# event = unwrap_with_ed25519(client, payload, headers)

from __future__ import annotations

import time
import base64
from typing import Mapping

__all__ = [
"WebhookVerificationError",
"verify_webhook_signature",
"SIGNATURE_HEADER",
"TIMESTAMP_HEADER",
"TIMESTAMP_TOLERANCE_SECONDS",
]

# Telnyx webhook signature headers (case-insensitive per HTTP spec)
SIGNATURE_HEADER = "telnyx-signature-ed25519"
TIMESTAMP_HEADER = "telnyx-timestamp"

# Tolerance for timestamp validation (5 minutes)
TIMESTAMP_TOLERANCE_SECONDS = 300


class WebhookVerificationError(Exception):
"""Error raised when webhook signature verification fails.

This error is raised by the webhook verification module when:
- No public key is configured
- Required headers are missing (telnyx-signature-ed25519, telnyx-timestamp)
- Timestamp is too old or too new (outside 5-minute tolerance)
- Signature verification fails
- Public key or signature format is invalid

Example:
try:
verify_webhook_signature(payload, headers, public_key)
except WebhookVerificationError as e:
print(f"Webhook verification failed: {e}")
"""

def __init__(self, message: str) -> None:
self.message = message
super().__init__(message)


def _get_header(headers: Mapping[str, str], key: str) -> str | None:
"""Get header value case-insensitively.

Args:
headers: The headers mapping
key: The header key to find

Returns:
The header value or None if not found
"""
key_lower = key.lower()
for header_key, header_value in headers.items():
if header_key.lower() == key_lower:
return header_value
return None


def verify_webhook_signature(
payload: str | bytes,
headers: Mapping[str, str],
public_key: str,
) -> None:
"""Verify the ED25519 signature of a Telnyx webhook.

Telnyx webhooks are signed using ED25519 with the following format:
- Header "Telnyx-Signature-Ed25519": Base64-encoded ED25519 signature (64 bytes)
- Header "Telnyx-Timestamp": Unix timestamp in seconds
- Signed payload: "{timestamp}|{payload}"

Args:
payload: The raw webhook body as string or bytes
headers: The webhook HTTP headers
public_key: Base64-encoded ED25519 public key from Telnyx Mission Control

Raises:
WebhookVerificationError: If verification fails for any reason

Example:
verify_webhook_signature(request.body, request.headers, os.environ["TELNYX_PUBLIC_KEY"])
"""
# Lazy import to avoid making nacl a hard dependency
try:
from nacl.signing import VerifyKey # type: ignore[import-not-found]
from nacl.exceptions import BadSignatureError # type: ignore[import-not-found]
except ImportError as exc:
raise WebhookVerificationError(
"PyNaCl is required for ED25519 webhook verification. Install it with: pip install pynacl"
) from exc

# Extract required headers (case-insensitive)
signature_header = _get_header(headers, SIGNATURE_HEADER)
timestamp_header = _get_header(headers, TIMESTAMP_HEADER)

if not signature_header:
raise WebhookVerificationError(f"Missing required header: {SIGNATURE_HEADER}")

if not timestamp_header:
raise WebhookVerificationError(f"Missing required header: {TIMESTAMP_HEADER}")

# Validate timestamp to prevent replay attacks (5 minute tolerance)
try:
webhook_time = int(timestamp_header)
current_time = int(time.time())
time_diff = abs(current_time - webhook_time)

if time_diff > TIMESTAMP_TOLERANCE_SECONDS:
raise WebhookVerificationError(f"Webhook timestamp is too old or too new ({time_diff}s difference)")
except ValueError as e:
raise WebhookVerificationError(f"Invalid timestamp format: {timestamp_header}") from e

# Decode public key from base64
try:
public_key_bytes = base64.b64decode(public_key)
if len(public_key_bytes) != 32:
raise WebhookVerificationError(f"Invalid public key: expected 32 bytes, got {len(public_key_bytes)} bytes")
except Exception as e:
if isinstance(e, WebhookVerificationError):
raise
raise WebhookVerificationError(f"Invalid public key format: {e}") from e

# Decode signature from base64
try:
signature_bytes = base64.b64decode(signature_header)
if len(signature_bytes) != 64:
raise WebhookVerificationError(
f"Invalid signature length: expected 64 bytes, got {len(signature_bytes)} bytes"
)
except Exception as e:
if isinstance(e, WebhookVerificationError):
raise
raise WebhookVerificationError(f"Invalid signature format: {e}") from e

# Convert payload to string if bytes
if isinstance(payload, bytes):
payload_str = payload.decode("utf-8")
else:
payload_str = payload

# Build the signed payload: "{timestamp}|{payload}"
signed_payload = f"{timestamp_header}|{payload_str}"

# Verify the ED25519 signature using PyNaCl
try:
verify_key = VerifyKey(public_key_bytes) # type: ignore[no-untyped-call]
# PyNaCl's verify expects message + signature combined, or use verify_key.verify(signature, message)
# Actually, we need to use the raw verify since we have separate signature and message
verify_key.verify(signed_payload.encode("utf-8"), signature_bytes) # type: ignore[no-untyped-call]
except BadSignatureError as e: # type: ignore[no-untyped-call]
raise WebhookVerificationError("Signature verification failed: signature does not match payload") from e
except Exception as e:
raise WebhookVerificationError(f"Signature verification failed: {e}") from e
140 changes: 140 additions & 0 deletions src/telnyx/lib/webhooks_ed25519.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# Telnyx ED25519 Webhook Helpers
#
# This module provides higher-level webhook helpers that use ED25519
# signature verification instead of the standard HMAC-SHA256 used by
# the generated `webhooks.py` module.
#
# This file is in the `lib/` directory which is preserved across Stainless
# code generation runs. Do not move it to the generated code directories.
#
# Usage:
#
# from telnyx.lib.webhooks_ed25519 import unwrap_with_ed25519, verify_ed25519
#
# # Verify signature only (raises WebhookVerificationError on failure)
# verify_ed25519(client, payload, headers)
#
# # Verify and parse (ED25519 verification, then parse)
# event = unwrap_with_ed25519(client, payload, headers)

from __future__ import annotations

import json
from typing import TYPE_CHECKING, Mapping

from .webhook_verification import (
WebhookVerificationError,
verify_webhook_signature,
)

if TYPE_CHECKING:
from telnyx import Telnyx
from telnyx.types.unwrap_webhook_event import UnwrapWebhookEvent

__all__ = [
"WebhookVerificationError",
"verify_ed25519",
"unwrap_with_ed25519",
]


def verify_ed25519(
client: Telnyx,
payload: str | bytes,
headers: Mapping[str, str],
*,
key: str | None = None,
) -> None:
"""Verify webhook signature using ED25519 without parsing the payload.

This function verifies the ED25519 signature of a Telnyx webhook but
does not parse the payload. Use this when you only need to verify
authenticity and will handle parsing yourself.

Args:
client: The Telnyx client instance (used to get public_key if not provided)
payload: The raw webhook payload as string or bytes
headers: The webhook HTTP headers
key: Optional public key override (base64-encoded ED25519).
If not provided, uses client.public_key.

Raises:
WebhookVerificationError: If verification fails
ValueError: If no public key is configured

Example:
from telnyx import Telnyx
from telnyx.lib.webhooks_ed25519 import verify_ed25519

client = Telnyx(
api_key=os.environ["TELNYX_API_KEY"],
public_key=os.environ["TELNYX_PUBLIC_KEY"]
)

verify_ed25519(client, request.body, request.headers)
"""
public_key = key or client.public_key

if not public_key:
raise ValueError("No public key configured. Provide key parameter or configure client with public_key.")

verify_webhook_signature(payload, headers, public_key)


def unwrap_with_ed25519(
client: Telnyx,
payload: str | bytes,
headers: Mapping[str, str],
*,
key: str | None = None,
) -> UnwrapWebhookEvent:
"""Verify webhook signature using ED25519 and parse the payload.

This function verifies the ED25519 signature of a Telnyx webhook and
then parses the payload into an UnwrapWebhookEvent object. Use this
as a drop-in replacement for client.webhooks.unwrap() when you need
ED25519 verification instead of HMAC-SHA256.

Args:
client: The Telnyx client instance
payload: The raw webhook payload as string or bytes
headers: The webhook HTTP headers
key: Optional public key override (base64-encoded ED25519).
If not provided, uses client.public_key.

Returns:
UnwrapWebhookEvent: The parsed webhook event

Raises:
WebhookVerificationError: If signature verification fails
ValueError: If no public key is configured

Example:
from telnyx import Telnyx
from telnyx.lib.webhooks_ed25519 import unwrap_with_ed25519

client = Telnyx(
api_key=os.environ["TELNYX_API_KEY"],
public_key=os.environ["TELNYX_PUBLIC_KEY"]
)

event = unwrap_with_ed25519(client, request.body, request.headers)
print(f"Received event: {event.event_type}")
"""
# Verify the signature first
verify_ed25519(client, payload, headers, key=key)

# Import here to avoid circular imports and keep this module lightweight
from telnyx._models import construct_type
from telnyx.types.unwrap_webhook_event import UnwrapWebhookEvent

# Parse the payload
if isinstance(payload, bytes):
payload_str = payload.decode("utf-8")
else:
payload_str = payload

parsed = json.loads(payload_str)

# Construct the typed event using Telnyx's model infrastructure
return construct_type(type_=UnwrapWebhookEvent, value=parsed) # type: ignore[return-value]
Loading