-
Notifications
You must be signed in to change notification settings - Fork 7
MT-22022: Add webhook signature verification helper #67
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+225
−0
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| import os | ||
| from wsgiref.simple_server import make_server | ||
|
|
||
| import mailtrap as mt | ||
|
|
||
| SIGNING_SECRET = os.environ["MAILTRAP_WEBHOOK_SIGNING_SECRET"] | ||
|
|
||
|
|
||
| def app(environ, start_response): | ||
| # Use the raw request body — parsing and re-serializing the JSON may | ||
| # reorder keys or alter whitespace and invalidate the signature. | ||
| length = int(environ.get("CONTENT_LENGTH") or 0) | ||
| payload = environ["wsgi.input"].read(length).decode("utf-8") | ||
| signature = environ.get("HTTP_MAILTRAP_SIGNATURE", "") | ||
|
|
||
| if not mt.verify_signature(payload, signature, SIGNING_SECRET): | ||
| start_response("401 Unauthorized", [("Content-Type", "text/plain")]) | ||
| return [b"Invalid signature"] | ||
|
|
||
| start_response("200 OK", [("Content-Type", "text/plain")]) | ||
| return [b""] | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| with make_server("", 9292, app) as server: | ||
| server.serve_forever() | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| """Helpers for working with inbound Mailtrap webhooks. | ||
|
|
||
| See https://docs.mailtrap.io/email-api-smtp/advanced/webhooks#verifying-the-signature | ||
| for the algorithm reference. | ||
| """ | ||
|
|
||
| import hashlib | ||
| import hmac | ||
| from typing import Union | ||
|
|
||
| # Hex-encoded HMAC-SHA256 signature length (SHA-256 produces 32 bytes / 64 hex chars). | ||
| SIGNATURE_HEX_LENGTH = 64 | ||
|
|
||
|
|
||
| def verify_signature( | ||
| payload: Union[str, bytes], | ||
| signature: str, | ||
| signing_secret: str, | ||
| ) -> bool: | ||
| """Verify the HMAC-SHA256 signature of a Mailtrap webhook payload. | ||
|
|
||
| Mailtrap signs every outbound webhook by computing | ||
| ``HMAC-SHA256(signing_secret, raw_request_body)`` and sending the | ||
| lowercase hex digest in the ``Mailtrap-Signature`` HTTP header. Compute | ||
| the same digest on your side and compare it in constant time. | ||
|
|
||
| The comparison is performed with :func:`hmac.compare_digest` to avoid | ||
| timing side-channels. | ||
|
|
||
| The function never raises on inputs that could plausibly arrive over the | ||
| wire (empty strings, wrong-length signatures, non-hex characters, missing | ||
| secret) -- it simply returns ``False``. This makes it safe to call | ||
| directly from a request handler without wrapping in ``try``/``except``. | ||
|
|
||
| :param payload: The raw request body, exactly as received. Accepts | ||
| ``str`` (encoded as UTF-8 internally) or ``bytes``. **Do not** parse | ||
| and re-serialize the JSON -- re-encoding may reorder keys or alter | ||
| whitespace and invalidate the signature. | ||
| :param signature: The value of the ``Mailtrap-Signature`` HTTP header | ||
| (lowercase hex string). | ||
| :param signing_secret: The webhook's ``signing_secret``, returned by | ||
| :meth:`mailtrap.api.resources.webhooks.WebhooksApi.create` on | ||
| webhook creation. | ||
| :returns: ``True`` if the signature is valid for the given payload and | ||
| secret, ``False`` otherwise. | ||
| """ | ||
| if not isinstance(signature, str) or not signature: | ||
| return False | ||
| if not isinstance(signing_secret, str) or not signing_secret: | ||
| return False | ||
| if not isinstance(payload, (str, bytes)): | ||
| return False | ||
| if len(payload) == 0: | ||
| return False | ||
| if len(signature) != SIGNATURE_HEX_LENGTH: | ||
| return False | ||
|
|
||
| if isinstance(payload, str): | ||
| payload_bytes = payload.encode("utf-8") | ||
| else: | ||
| payload_bytes = payload | ||
|
|
||
| try: | ||
| expected = hmac.new( | ||
| signing_secret.encode("utf-8"), | ||
| payload_bytes, | ||
| hashlib.sha256, | ||
| ).hexdigest() | ||
| except (TypeError, ValueError): | ||
| return False | ||
|
|
||
| return hmac.compare_digest(expected, signature) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,125 @@ | ||
| import hashlib | ||
| import hmac | ||
|
|
||
| from mailtrap.webhooks import SIGNATURE_HEX_LENGTH | ||
| from mailtrap.webhooks import verify_signature | ||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Cross-SDK fixture | ||
| # | ||
| # The (payload, signing_secret, expected_signature) triple below is the | ||
| # canonical fixture shared verbatim by every official Mailtrap SDK | ||
| # (mailtrap-ruby, mailtrap-python, mailtrap-php, mailtrap-nodejs, | ||
| # mailtrap-java, mailtrap-dotnet). Any change here MUST be mirrored in the | ||
| # equivalent test files in the other SDKs so the helpers stay byte-for-byte | ||
| # compatible across languages. | ||
| # --------------------------------------------------------------------------- | ||
| FIXTURE_PAYLOAD = ( | ||
| '{"event":"delivery","sending_stream":"transactional","category":"welcome",' | ||
| '"message_id":"a8b1d8f6-1f8d-4a3c-9b2e-1a2b3c4d5e6f",' | ||
| '"email":"recipient@example.com",' | ||
| '"event_id":"f1e2d3c4-b5a6-7890-1234-567890abcdef",' | ||
| '"timestamp":1716070000}' | ||
| ) | ||
| FIXTURE_SIGNING_SECRET = "8d9a3c0e7f5b2d4a6c1e9f8b3a7d5c2e" | ||
| FIXTURE_EXPECTED_SIGNATURE = ( | ||
| "6d262e2611cd09be1f948382b5c611d63b0e585c4c9c5e40139d6ac3876d5433" | ||
| ) | ||
|
|
||
|
|
||
| class TestVerifySignature: | ||
| # --- 1. Valid signature for given payload + secret ---------------------- | ||
| def test_returns_true_for_valid_signature_payload_and_secret(self) -> None: | ||
| assert ( | ||
| verify_signature( | ||
| FIXTURE_PAYLOAD, | ||
| FIXTURE_EXPECTED_SIGNATURE, | ||
| FIXTURE_SIGNING_SECRET, | ||
| ) | ||
| is True | ||
| ) | ||
|
|
||
| # --- 2. Wrong secret ---------------------------------------------------- | ||
| def test_returns_false_with_wrong_signing_secret(self) -> None: | ||
| assert ( | ||
| verify_signature( | ||
| FIXTURE_PAYLOAD, | ||
| FIXTURE_EXPECTED_SIGNATURE, | ||
| "ffffffffffffffffffffffffffffffff", | ||
| ) | ||
| is False | ||
| ) | ||
|
|
||
| # --- 3. Payload tampered (one byte changed) ----------------------------- | ||
| def test_returns_false_when_payload_is_tampered(self) -> None: | ||
| tampered = FIXTURE_PAYLOAD.replace("delivery", "Delivery") | ||
|
|
||
| assert ( | ||
| verify_signature( | ||
| tampered, | ||
| FIXTURE_EXPECTED_SIGNATURE, | ||
| FIXTURE_SIGNING_SECRET, | ||
| ) | ||
| is False | ||
| ) | ||
|
|
||
| # --- 4. Signature with wrong length ------------------------------------- | ||
| def test_returns_false_without_raising_when_signature_too_short(self) -> None: | ||
| too_short = FIXTURE_EXPECTED_SIGNATURE[:31] | ||
|
|
||
| assert ( | ||
| verify_signature(FIXTURE_PAYLOAD, too_short, FIXTURE_SIGNING_SECRET) is False | ||
| ) | ||
|
|
||
| # --- 5. Signature with non-hex characters ------------------------------- | ||
| def test_returns_false_without_raising_for_non_hex_signature(self) -> None: | ||
| not_hex = "z" * SIGNATURE_HEX_LENGTH | ||
|
|
||
| assert verify_signature(FIXTURE_PAYLOAD, not_hex, FIXTURE_SIGNING_SECRET) is False | ||
|
|
||
| # --- 6. Empty signature string ------------------------------------------ | ||
| def test_returns_false_for_empty_signature(self) -> None: | ||
| assert verify_signature(FIXTURE_PAYLOAD, "", FIXTURE_SIGNING_SECRET) is False | ||
|
|
||
| # --- 7. Empty signing_secret -------------------------------------------- | ||
| def test_returns_false_for_empty_signing_secret(self) -> None: | ||
| assert verify_signature(FIXTURE_PAYLOAD, FIXTURE_EXPECTED_SIGNATURE, "") is False | ||
|
|
||
| # --- 8. Empty payload + non-empty signature ----------------------------- | ||
| def test_returns_false_for_empty_payload(self) -> None: | ||
| assert ( | ||
| verify_signature("", FIXTURE_EXPECTED_SIGNATURE, FIXTURE_SIGNING_SECRET) | ||
| is False | ||
| ) | ||
|
|
||
| # --- 9. Known-good cross-SDK fixture ------------------------------------ | ||
| def test_matches_hardcoded_hmac_sha256_digest_for_shared_fixture(self) -> None: | ||
| # Recompute the digest in-place so a regression in the stdlib or the | ||
| # fixture itself fails loudly: this is the byte-for-byte contract | ||
| # every other Mailtrap SDK must satisfy. | ||
| computed = hmac.new( | ||
| FIXTURE_SIGNING_SECRET.encode("utf-8"), | ||
| FIXTURE_PAYLOAD.encode("utf-8"), | ||
| hashlib.sha256, | ||
| ).hexdigest() | ||
|
|
||
| assert computed == FIXTURE_EXPECTED_SIGNATURE | ||
| assert ( | ||
| verify_signature( | ||
| FIXTURE_PAYLOAD, | ||
| FIXTURE_EXPECTED_SIGNATURE, | ||
| FIXTURE_SIGNING_SECRET, | ||
| ) | ||
| is True | ||
| ) | ||
|
|
||
| # --- Bonus: accepts bytes payload --------------------------------------- | ||
| def test_accepts_bytes_payload(self) -> None: | ||
| assert ( | ||
| verify_signature( | ||
| FIXTURE_PAYLOAD.encode("utf-8"), | ||
| FIXTURE_EXPECTED_SIGNATURE, | ||
| FIXTURE_SIGNING_SECRET, | ||
| ) | ||
| is True | ||
| ) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🌐 Web query:
In Python 3, does bytes.decode("utf-8") raise UnicodeDecodeError on invalid UTF-8 by default?💡 Result:
Yes, in Python 3, bytes.decode("utf-8") raises a UnicodeDecodeError by default when it encounters invalid UTF-8 sequences [1][2]. This occurs because the default value for the errors argument in the bytes.decode method is 'strict' [1][2]. The 'strict' error handler is defined to raise a UnicodeError (specifically a UnicodeDecodeError during decoding) whenever an encoding or decoding error is encountered [2][3][4]. If you wish to change this behavior, you can provide a different error handler for the errors argument, such as: - 'ignore': Discards the malformed bytes [3]. - 'replace': Inserts the official Unicode replacement character (U+FFFD) for malformed bytes [3][5]. Example: # This raises UnicodeDecodeError b'\xff'.decode('utf-8') # This returns '\ufffd' b'\xff'.decode('utf-8', errors='replace')
Citations:
🏁 Script executed:
Repository: mailtrap/mailtrap-python
Length of output: 111
🏁 Script executed:
Repository: mailtrap/mailtrap-python
Length of output: 2293
🏁 Script executed:
Repository: mailtrap/mailtrap-python
Length of output: 8903
Use raw request body bytes for signature verification (remove UTF-8 decoding).
examples/webhooks/verify_signature.pydecodesenviron["wsgi.input"]with.decode("utf-8")before callingmt.verify_signature. With Python’s defaulterrors="strict", this can raiseUnicodeDecodeErroron non-UTF-8 payloads, whereasverify_signatureis designed to handle inputs safely; alsoverify_signatureexpects the raw request body “exactly as received” and acceptsbytesdirectly.Suggested patch
📝 Committable suggestion
🤖 Prompt for AI Agents