Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ The same situation applies to both `client.batch_send()` and `client.sending_api

### Webhooks API:
- Webhooks management – [`webhooks/webhooks.py`](examples/webhooks/webhooks.py)
- Verifying webhook signatures – [`webhooks/verify_signature.py`](examples/webhooks/verify_signature.py)

### Suppressions API:
- Suppressions (find & delete) – [`suppressions/suppressions.py`](examples/suppressions/suppressions.py)
Expand Down
26 changes: 26 additions & 0 deletions examples/webhooks/verify_signature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import os
from wsgiref.simple_server import make_server

import mailtrap as mt

SIGNING_SECRET = os.environ["MAILTRAP_WEBHOOK_SIGNING_SECRET"]


def app(environ, start_response):
# Use the raw request body — parsing and re-serializing the JSON may
# reorder keys or alter whitespace and invalidate the signature.
length = int(environ.get("CONTENT_LENGTH") or 0)
payload = environ["wsgi.input"].read(length).decode("utf-8")
signature = environ.get("HTTP_MAILTRAP_SIGNATURE", "")

if not mt.verify_signature(payload, signature, SIGNING_SECRET):
Comment on lines +13 to +16
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

In Python 3, does bytes.decode("utf-8") raise UnicodeDecodeError on invalid UTF-8 by default?

💡 Result:

Yes, in Python 3, bytes.decode("utf-8") raises a UnicodeDecodeError by default when it encounters invalid UTF-8 sequences [1][2]. This occurs because the default value for the errors argument in the bytes.decode method is 'strict' [1][2]. The 'strict' error handler is defined to raise a UnicodeError (specifically a UnicodeDecodeError during decoding) whenever an encoding or decoding error is encountered [2][3][4]. If you wish to change this behavior, you can provide a different error handler for the errors argument, such as: - 'ignore': Discards the malformed bytes [3]. - 'replace': Inserts the official Unicode replacement character (U+FFFD) for malformed bytes [3][5]. Example: # This raises UnicodeDecodeError b'\xff'.decode('utf-8') # This returns '\ufffd' b'\xff'.decode('utf-8', errors='replace')

Citations:


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Show the relevant snippet in the example file with surrounding context
sed -n '1,120p' examples/webhooks/verify_signature.py | nl -ba

# Find where verify_signature is defined/used in the repo
rg -n "def verify_signature|verify_signature\(" -S . | head -n 50

Repository: mailtrap/mailtrap-python

Length of output: 111


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Print the example file with line numbers (no nl)
cat -n examples/webhooks/verify_signature.py | sed -n '1,160p'

# Find verify_signature definitions/usages
rg -n "def verify_signature|verify_signature\(" -S . | head -n 80

Repository: mailtrap/mailtrap-python

Length of output: 2293


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect verify_signature implementation
cat -n mailtrap/webhooks.py | sed -n '1,200p'

# Inspect unit tests around webhook signature verification
cat -n tests/unit/test_webhook_signature.py | sed -n '1,220p'

Repository: mailtrap/mailtrap-python

Length of output: 8903


Use raw request body bytes for signature verification (remove UTF-8 decoding).

examples/webhooks/verify_signature.py decodes environ["wsgi.input"] with .decode("utf-8") before calling mt.verify_signature. With Python’s default errors="strict", this can raise UnicodeDecodeError on non-UTF-8 payloads, whereas verify_signature is designed to handle inputs safely; also verify_signature expects the raw request body “exactly as received” and accepts bytes directly.

Suggested patch
-    payload = environ["wsgi.input"].read(length).decode("utf-8")
+    payload = environ["wsgi.input"].read(length)
     signature = environ.get("HTTP_MAILTRAP_SIGNATURE", "")
 
     if not mt.verify_signature(payload, signature, SIGNING_SECRET):
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
payload = environ["wsgi.input"].read(length).decode("utf-8")
signature = environ.get("HTTP_MAILTRAP_SIGNATURE", "")
if not mt.verify_signature(payload, signature, SIGNING_SECRET):
payload = environ["wsgi.input"].read(length)
signature = environ.get("HTTP_MAILTRAP_SIGNATURE", "")
if not mt.verify_signature(payload, signature, SIGNING_SECRET):
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@examples/webhooks/verify_signature.py` around lines 13 - 16, The code
currently reads and decodes the request body into a UTF-8 string before
verification which can raise UnicodeDecodeError and violates verify_signature's
expectation of raw bytes; change the logic that reads environ["wsgi.input"] into
the variable payload so it remains bytes (do not call .decode("utf-8")) and pass
those bytes directly to mt.verify_signature(payload, signature, SIGNING_SECRET)
so the signature function receives the exact raw request body.

start_response("401 Unauthorized", [("Content-Type", "text/plain")])
return [b"Invalid signature"]

start_response("200 OK", [("Content-Type", "text/plain")])
return [b""]


if __name__ == "__main__":
with make_server("", 9292, app) as server:
server.serve_forever()
1 change: 1 addition & 0 deletions mailtrap/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,4 @@
from .models.templates import UpdateEmailTemplateParams
from .models.webhooks import CreateWebhookParams
from .models.webhooks import UpdateWebhookParams
from .webhooks import verify_signature
72 changes: 72 additions & 0 deletions mailtrap/webhooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""Helpers for working with inbound Mailtrap webhooks.

See https://docs.mailtrap.io/email-api-smtp/advanced/webhooks#verifying-the-signature
for the algorithm reference.
"""

import hashlib
import hmac
from typing import Union

# Hex-encoded HMAC-SHA256 signature length (SHA-256 produces 32 bytes / 64 hex chars).
SIGNATURE_HEX_LENGTH = 64


def verify_signature(
payload: Union[str, bytes],
signature: str,
signing_secret: str,
) -> bool:
"""Verify the HMAC-SHA256 signature of a Mailtrap webhook payload.

Mailtrap signs every outbound webhook by computing
``HMAC-SHA256(signing_secret, raw_request_body)`` and sending the
lowercase hex digest in the ``Mailtrap-Signature`` HTTP header. Compute
the same digest on your side and compare it in constant time.

The comparison is performed with :func:`hmac.compare_digest` to avoid
timing side-channels.

The function never raises on inputs that could plausibly arrive over the
wire (empty strings, wrong-length signatures, non-hex characters, missing
secret) -- it simply returns ``False``. This makes it safe to call
directly from a request handler without wrapping in ``try``/``except``.

:param payload: The raw request body, exactly as received. Accepts
``str`` (encoded as UTF-8 internally) or ``bytes``. **Do not** parse
and re-serialize the JSON -- re-encoding may reorder keys or alter
whitespace and invalidate the signature.
:param signature: The value of the ``Mailtrap-Signature`` HTTP header
(lowercase hex string).
:param signing_secret: The webhook's ``signing_secret``, returned by
:meth:`mailtrap.api.resources.webhooks.WebhooksApi.create` on
webhook creation.
:returns: ``True`` if the signature is valid for the given payload and
secret, ``False`` otherwise.
"""
if not isinstance(signature, str) or not signature:
return False
if not isinstance(signing_secret, str) or not signing_secret:
return False
if not isinstance(payload, (str, bytes)):
return False
if len(payload) == 0:
return False
if len(signature) != SIGNATURE_HEX_LENGTH:
return False

if isinstance(payload, str):
payload_bytes = payload.encode("utf-8")
else:
payload_bytes = payload

try:
expected = hmac.new(
signing_secret.encode("utf-8"),
payload_bytes,
hashlib.sha256,
).hexdigest()
except (TypeError, ValueError):
return False

return hmac.compare_digest(expected, signature)
125 changes: 125 additions & 0 deletions tests/unit/test_webhook_signature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import hashlib
import hmac

from mailtrap.webhooks import SIGNATURE_HEX_LENGTH
from mailtrap.webhooks import verify_signature

# ---------------------------------------------------------------------------
# Cross-SDK fixture
#
# The (payload, signing_secret, expected_signature) triple below is the
# canonical fixture shared verbatim by every official Mailtrap SDK
# (mailtrap-ruby, mailtrap-python, mailtrap-php, mailtrap-nodejs,
# mailtrap-java, mailtrap-dotnet). Any change here MUST be mirrored in the
# equivalent test files in the other SDKs so the helpers stay byte-for-byte
# compatible across languages.
# ---------------------------------------------------------------------------
FIXTURE_PAYLOAD = (
'{"event":"delivery","sending_stream":"transactional","category":"welcome",'
'"message_id":"a8b1d8f6-1f8d-4a3c-9b2e-1a2b3c4d5e6f",'
'"email":"recipient@example.com",'
'"event_id":"f1e2d3c4-b5a6-7890-1234-567890abcdef",'
'"timestamp":1716070000}'
)
FIXTURE_SIGNING_SECRET = "8d9a3c0e7f5b2d4a6c1e9f8b3a7d5c2e"
FIXTURE_EXPECTED_SIGNATURE = (
"6d262e2611cd09be1f948382b5c611d63b0e585c4c9c5e40139d6ac3876d5433"
)


class TestVerifySignature:
# --- 1. Valid signature for given payload + secret ----------------------
def test_returns_true_for_valid_signature_payload_and_secret(self) -> None:
assert (
verify_signature(
FIXTURE_PAYLOAD,
FIXTURE_EXPECTED_SIGNATURE,
FIXTURE_SIGNING_SECRET,
)
is True
)

# --- 2. Wrong secret ----------------------------------------------------
def test_returns_false_with_wrong_signing_secret(self) -> None:
assert (
verify_signature(
FIXTURE_PAYLOAD,
FIXTURE_EXPECTED_SIGNATURE,
"ffffffffffffffffffffffffffffffff",
)
is False
)

# --- 3. Payload tampered (one byte changed) -----------------------------
def test_returns_false_when_payload_is_tampered(self) -> None:
tampered = FIXTURE_PAYLOAD.replace("delivery", "Delivery")

assert (
verify_signature(
tampered,
FIXTURE_EXPECTED_SIGNATURE,
FIXTURE_SIGNING_SECRET,
)
is False
)

# --- 4. Signature with wrong length -------------------------------------
def test_returns_false_without_raising_when_signature_too_short(self) -> None:
too_short = FIXTURE_EXPECTED_SIGNATURE[:31]

assert (
verify_signature(FIXTURE_PAYLOAD, too_short, FIXTURE_SIGNING_SECRET) is False
)

# --- 5. Signature with non-hex characters -------------------------------
def test_returns_false_without_raising_for_non_hex_signature(self) -> None:
not_hex = "z" * SIGNATURE_HEX_LENGTH

assert verify_signature(FIXTURE_PAYLOAD, not_hex, FIXTURE_SIGNING_SECRET) is False

# --- 6. Empty signature string ------------------------------------------
def test_returns_false_for_empty_signature(self) -> None:
assert verify_signature(FIXTURE_PAYLOAD, "", FIXTURE_SIGNING_SECRET) is False

# --- 7. Empty signing_secret --------------------------------------------
def test_returns_false_for_empty_signing_secret(self) -> None:
assert verify_signature(FIXTURE_PAYLOAD, FIXTURE_EXPECTED_SIGNATURE, "") is False

# --- 8. Empty payload + non-empty signature -----------------------------
def test_returns_false_for_empty_payload(self) -> None:
assert (
verify_signature("", FIXTURE_EXPECTED_SIGNATURE, FIXTURE_SIGNING_SECRET)
is False
)

# --- 9. Known-good cross-SDK fixture ------------------------------------
def test_matches_hardcoded_hmac_sha256_digest_for_shared_fixture(self) -> None:
# Recompute the digest in-place so a regression in the stdlib or the
# fixture itself fails loudly: this is the byte-for-byte contract
# every other Mailtrap SDK must satisfy.
computed = hmac.new(
FIXTURE_SIGNING_SECRET.encode("utf-8"),
FIXTURE_PAYLOAD.encode("utf-8"),
hashlib.sha256,
).hexdigest()

assert computed == FIXTURE_EXPECTED_SIGNATURE
assert (
verify_signature(
FIXTURE_PAYLOAD,
FIXTURE_EXPECTED_SIGNATURE,
FIXTURE_SIGNING_SECRET,
)
is True
)

# --- Bonus: accepts bytes payload ---------------------------------------
def test_accepts_bytes_payload(self) -> None:
assert (
verify_signature(
FIXTURE_PAYLOAD.encode("utf-8"),
FIXTURE_EXPECTED_SIGNATURE,
FIXTURE_SIGNING_SECRET,
)
is True
)
Loading