mirror of
https://github.com/pacnpal/django-anymail.git
synced 2025-12-20 03:41:05 -05:00
Resend: new ESP (#341)
Add support for Resend.com backend and webhooks. Closes #341
This commit is contained in:
572
tests/test_resend_backend.py
Normal file
572
tests/test_resend_backend.py
Normal file
@@ -0,0 +1,572 @@
|
||||
import json
|
||||
from base64 import b64encode
|
||||
from decimal import Decimal
|
||||
from email.mime.base import MIMEBase
|
||||
from email.mime.image import MIMEImage
|
||||
from email.utils import formataddr
|
||||
|
||||
from django.core import mail
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
from django.test import SimpleTestCase, override_settings, tag
|
||||
|
||||
from anymail.exceptions import (
|
||||
AnymailAPIError,
|
||||
AnymailSerializationError,
|
||||
AnymailUnsupportedFeature,
|
||||
)
|
||||
from anymail.message import attach_inline_image_file
|
||||
|
||||
from .mock_requests_backend import (
|
||||
RequestsBackendMockAPITestCase,
|
||||
SessionSharingTestCases,
|
||||
)
|
||||
from .utils import (
|
||||
SAMPLE_IMAGE_FILENAME,
|
||||
AnymailTestMixin,
|
||||
decode_att,
|
||||
sample_image_content,
|
||||
sample_image_path,
|
||||
)
|
||||
|
||||
|
||||
@tag("resend")
|
||||
@override_settings(
|
||||
EMAIL_BACKEND="anymail.backends.resend.EmailBackend",
|
||||
ANYMAIL={
|
||||
"RESEND_API_KEY": "test_api_key",
|
||||
},
|
||||
)
|
||||
class ResendBackendMockAPITestCase(RequestsBackendMockAPITestCase):
|
||||
DEFAULT_RAW_RESPONSE = b'{"id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"}'
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
# Simple message useful for many tests
|
||||
self.message = mail.EmailMultiAlternatives(
|
||||
"Subject", "Text Body", "from@example.com", ["to@example.com"]
|
||||
)
|
||||
|
||||
|
||||
@tag("resend")
|
||||
class ResendBackendStandardEmailTests(ResendBackendMockAPITestCase):
|
||||
"""Test backend support for Django standard email features"""
|
||||
|
||||
def test_send_mail(self):
|
||||
"""Test basic API for simple send"""
|
||||
mail.send_mail(
|
||||
"Subject here",
|
||||
"Here is the message.",
|
||||
"from@sender.example.com",
|
||||
["to@example.com"],
|
||||
fail_silently=False,
|
||||
)
|
||||
self.assert_esp_called("/emails")
|
||||
headers = self.get_api_call_headers()
|
||||
self.assertEqual(headers["Authorization"], "Bearer test_api_key")
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data["subject"], "Subject here")
|
||||
self.assertEqual(data["text"], "Here is the message.")
|
||||
self.assertEqual(data["from"], "from@sender.example.com")
|
||||
self.assertEqual(data["to"], ["to@example.com"])
|
||||
|
||||
def test_name_addr(self):
|
||||
"""Make sure RFC2822 name-addr format (with display-name) is allowed
|
||||
|
||||
(Test both sender and recipient addresses)
|
||||
"""
|
||||
msg = mail.EmailMessage(
|
||||
"Subject",
|
||||
"Message",
|
||||
"From Name <from@example.com>",
|
||||
["Recipient #1 <to1@example.com>", "to2@example.com"],
|
||||
cc=["Carbon Copy <cc1@example.com>", "cc2@example.com"],
|
||||
bcc=["Blind Copy <bcc1@example.com>", "bcc2@example.com"],
|
||||
)
|
||||
msg.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data["from"], "From Name <from@example.com>")
|
||||
self.assertEqual(
|
||||
data["to"], ["Recipient #1 <to1@example.com>", "to2@example.com"]
|
||||
)
|
||||
self.assertEqual(
|
||||
data["cc"], ["Carbon Copy <cc1@example.com>", "cc2@example.com"]
|
||||
)
|
||||
self.assertEqual(
|
||||
data["bcc"], ["Blind Copy <bcc1@example.com>", "bcc2@example.com"]
|
||||
)
|
||||
|
||||
def test_display_name_workarounds(self):
|
||||
# Resend's API has a bug that rejects a display-name in double quotes
|
||||
# (per RFC 5322 section 3.4). Attempting to omit the quotes works, unless
|
||||
# the display-name also contains a comma. Try to avoid the whole problem
|
||||
# by using RFC 2047 encoded words for addresses Resend will parse incorrectly.
|
||||
msg = mail.EmailMessage(
|
||||
"Subject",
|
||||
"Message",
|
||||
formataddr(("Félix Företag, Inc.", "from@example.com")),
|
||||
[
|
||||
'"To, comma" <to1@example.com>',
|
||||
"non–ascii <to2@example.com>",
|
||||
"=?utf-8?q?pre_encoded?= <to3@example.com>",
|
||||
],
|
||||
reply_to=['"Reply, comma" <reply1@example.com>'],
|
||||
)
|
||||
msg.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(
|
||||
data["from"],
|
||||
# for `from` field only, avoid RFC 2047 and retain non-ASCII characters:
|
||||
'"Félix Företag, Inc." <from@example.com>',
|
||||
)
|
||||
self.assertEqual(
|
||||
data["to"],
|
||||
[
|
||||
"=?utf-8?q?To=2C_comma?= <to1@example.com>",
|
||||
"=?utf-8?b?bm9u4oCTYXNjaWk=?= <to2@example.com>",
|
||||
"=?utf-8?q?pre_encoded?= <to3@example.com>",
|
||||
],
|
||||
)
|
||||
self.assertEqual(
|
||||
data["reply_to"], ["=?utf-8?q?Reply=2C_comma?= <reply1@example.com>"]
|
||||
)
|
||||
|
||||
@override_settings(ANYMAIL_RESEND_WORKAROUND_DISPLAY_NAME_BUGS=False)
|
||||
def test_undocumented_workaround_setting(self):
|
||||
# Same test as above, but workarounds disabled
|
||||
msg = mail.EmailMessage(
|
||||
"Subject",
|
||||
"Message",
|
||||
'"Félix Företag" <from@example.com>',
|
||||
[
|
||||
'"To, comma" <to1@example.com>',
|
||||
"non–ascii <to2@example.com>",
|
||||
"=?utf-8?q?pre_encoded?= <to3@example.com>",
|
||||
],
|
||||
reply_to=['"Reply, comma" <reply1@example.com>'],
|
||||
)
|
||||
msg.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(
|
||||
data["from"],
|
||||
# (Django uses base64 encoded word unless QP is shorter)
|
||||
"=?utf-8?b?RsOpbGl4IEbDtnJldGFn?= <from@example.com>",
|
||||
)
|
||||
self.assertEqual(
|
||||
data["to"],
|
||||
[
|
||||
'"To, comma" <to1@example.com>',
|
||||
"=?utf-8?b?bm9u4oCTYXNjaWk=?= <to2@example.com>",
|
||||
"=?utf-8?q?pre_encoded?= <to3@example.com>",
|
||||
],
|
||||
)
|
||||
self.assertEqual(data["reply_to"], ['"Reply, comma" <reply1@example.com>'])
|
||||
|
||||
def test_email_message(self):
|
||||
email = mail.EmailMessage(
|
||||
"Subject",
|
||||
"Body goes here",
|
||||
"from@example.com",
|
||||
["to1@example.com", "Also To <to2@example.com>"],
|
||||
bcc=["bcc1@example.com", "Also BCC <bcc2@example.com>"],
|
||||
cc=["cc1@example.com", "Also CC <cc2@example.com>"],
|
||||
reply_to=["another@example.com"],
|
||||
headers={
|
||||
"X-MyHeader": "my value",
|
||||
},
|
||||
)
|
||||
email.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data["subject"], "Subject")
|
||||
self.assertEqual(data["text"], "Body goes here")
|
||||
self.assertEqual(data["from"], "from@example.com")
|
||||
self.assertEqual(data["to"], ["to1@example.com", "Also To <to2@example.com>"])
|
||||
self.assertEqual(
|
||||
data["bcc"], ["bcc1@example.com", "Also BCC <bcc2@example.com>"]
|
||||
)
|
||||
self.assertEqual(data["cc"], ["cc1@example.com", "Also CC <cc2@example.com>"])
|
||||
self.assertEqual(data["reply_to"], ["another@example.com"])
|
||||
self.assertCountEqual(
|
||||
data["headers"],
|
||||
{"X-MyHeader": "my value"},
|
||||
)
|
||||
|
||||
def test_html_message(self):
|
||||
text_content = "This is an important message."
|
||||
html_content = "<p>This is an <strong>important</strong> message.</p>"
|
||||
email = mail.EmailMultiAlternatives(
|
||||
"Subject", text_content, "from@example.com", ["to@example.com"]
|
||||
)
|
||||
email.attach_alternative(html_content, "text/html")
|
||||
email.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data["text"], text_content)
|
||||
self.assertEqual(data["html"], html_content)
|
||||
# Don't accidentally send the html part as an attachment:
|
||||
self.assertNotIn("attachments", data)
|
||||
|
||||
def test_html_only_message(self):
|
||||
html_content = "<p>This is an <strong>important</strong> message.</p>"
|
||||
email = mail.EmailMessage(
|
||||
"Subject", html_content, "from@example.com", ["to@example.com"]
|
||||
)
|
||||
email.content_subtype = "html" # Main content is now text/html
|
||||
email.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertNotIn("text", data)
|
||||
self.assertEqual(data["html"], html_content)
|
||||
|
||||
def test_extra_headers(self):
|
||||
self.message.extra_headers = {"X-Custom": "string", "X-Num": 123}
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
# header values must be strings (or they'll cause an "invalid literal" API error)
|
||||
self.assertEqual(data["headers"], {"X-Custom": "string", "X-Num": "123"})
|
||||
|
||||
def test_extra_headers_serialization_error(self):
|
||||
self.message.extra_headers = {"X-Custom": Decimal(12.5)}
|
||||
with self.assertRaisesMessage(AnymailSerializationError, "Decimal"):
|
||||
self.message.send()
|
||||
|
||||
def test_reply_to(self):
|
||||
email = mail.EmailMessage(
|
||||
"Subject",
|
||||
"Body goes here",
|
||||
"from@example.com",
|
||||
["to1@example.com"],
|
||||
reply_to=["reply@example.com", "Other <reply2@example.com>"],
|
||||
)
|
||||
email.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(
|
||||
data["reply_to"], ["reply@example.com", "Other <reply2@example.com>"]
|
||||
)
|
||||
|
||||
def test_attachments(self):
|
||||
text_content = "* Item one\n* Item two\n* Item three"
|
||||
self.message.attach(
|
||||
filename="test.txt", content=text_content, mimetype="text/plain"
|
||||
)
|
||||
|
||||
# Should guess mimetype if not provided...
|
||||
png_content = b"PNG\xb4 pretend this is the contents of a png file"
|
||||
self.message.attach(filename="test.png", content=png_content)
|
||||
|
||||
# Should work with a MIMEBase object (also tests no filename)...
|
||||
pdf_content = b"PDF\xb4 pretend this is valid pdf data"
|
||||
mimeattachment = MIMEBase("application", "pdf")
|
||||
mimeattachment.set_payload(pdf_content)
|
||||
self.message.attach(mimeattachment)
|
||||
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
attachments = data["attachments"]
|
||||
self.assertEqual(len(attachments), 3)
|
||||
self.assertEqual(attachments[0]["filename"], "test.txt")
|
||||
self.assertEqual(
|
||||
decode_att(attachments[0]["content"]).decode("ascii"), text_content
|
||||
)
|
||||
|
||||
self.assertEqual(attachments[1]["filename"], "test.png")
|
||||
self.assertEqual(decode_att(attachments[1]["content"]), png_content)
|
||||
|
||||
# unnamed attachment given default name with correct extension for content type
|
||||
self.assertEqual(attachments[2]["filename"], "attachment.pdf")
|
||||
self.assertEqual(decode_att(attachments[2]["content"]), pdf_content)
|
||||
|
||||
def test_unicode_attachment_correctly_decoded(self):
|
||||
self.message.attach(
|
||||
"Une pièce jointe.html", "<p>\u2019</p>", mimetype="text/html"
|
||||
)
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(
|
||||
data["attachments"],
|
||||
[
|
||||
{
|
||||
"filename": "Une pièce jointe.html",
|
||||
"content": b64encode("<p>\u2019</p>".encode("utf-8")).decode(
|
||||
"ascii"
|
||||
),
|
||||
}
|
||||
],
|
||||
)
|
||||
|
||||
def test_embedded_images(self):
|
||||
# Resend's API doesn't have a way to specify content-id
|
||||
image_filename = SAMPLE_IMAGE_FILENAME
|
||||
image_path = sample_image_path(image_filename)
|
||||
|
||||
cid = attach_inline_image_file(self.message, image_path) # Read from a png file
|
||||
html_content = (
|
||||
'<p>This has an <img src="cid:%s" alt="inline" /> image.</p>' % cid
|
||||
)
|
||||
self.message.attach_alternative(html_content, "text/html")
|
||||
|
||||
with self.assertRaisesMessage(AnymailUnsupportedFeature, "inline content-id"):
|
||||
self.message.send()
|
||||
|
||||
def test_attached_images(self):
|
||||
image_filename = SAMPLE_IMAGE_FILENAME
|
||||
image_path = sample_image_path(image_filename)
|
||||
image_data = sample_image_content(image_filename)
|
||||
|
||||
# option 1: attach as a file
|
||||
self.message.attach_file(image_path)
|
||||
|
||||
# option 2: construct the MIMEImage and attach it directly
|
||||
image = MIMEImage(image_data)
|
||||
self.message.attach(image)
|
||||
|
||||
image_data_b64 = b64encode(image_data).decode("ascii")
|
||||
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(
|
||||
data["attachments"],
|
||||
[
|
||||
{
|
||||
"filename": image_filename, # the named one
|
||||
"content": image_data_b64,
|
||||
},
|
||||
{
|
||||
# For unnamed attachments, Anymail constructs a default name
|
||||
# based on the content_type:
|
||||
"filename": "attachment.png",
|
||||
"content": image_data_b64,
|
||||
},
|
||||
],
|
||||
)
|
||||
|
||||
def test_multiple_html_alternatives(self):
|
||||
# Multiple alternatives not allowed
|
||||
self.message.attach_alternative("<p>First html is OK</p>", "text/html")
|
||||
self.message.attach_alternative("<p>But not second html</p>", "text/html")
|
||||
with self.assertRaisesMessage(AnymailUnsupportedFeature, "multiple html parts"):
|
||||
self.message.send()
|
||||
|
||||
def test_html_alternative(self):
|
||||
# Only html alternatives allowed
|
||||
self.message.attach_alternative("{'not': 'allowed'}", "application/json")
|
||||
with self.assertRaises(AnymailUnsupportedFeature):
|
||||
self.message.send()
|
||||
|
||||
def test_alternatives_fail_silently(self):
|
||||
# Make sure fail_silently is respected
|
||||
self.message.attach_alternative("{'not': 'allowed'}", "application/json")
|
||||
sent = self.message.send(fail_silently=True)
|
||||
self.assert_esp_not_called("API should not be called when send fails silently")
|
||||
self.assertEqual(sent, 0)
|
||||
|
||||
def test_suppress_empty_address_lists(self):
|
||||
"""Empty to, cc, bcc, and reply_to shouldn't generate empty fields"""
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertNotIn("cc", data)
|
||||
self.assertNotIn("bcc", data)
|
||||
self.assertNotIn("reply_to", data)
|
||||
|
||||
# Test empty `to`--but send requires at least one recipient somewhere (like cc)
|
||||
self.message.to = []
|
||||
self.message.cc = ["cc@example.com"]
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertNotIn("to", data)
|
||||
|
||||
def test_api_failure(self):
|
||||
failure_response = {
|
||||
"statusCode": 400,
|
||||
"message": "API key is invalid",
|
||||
"name": "validation_error",
|
||||
}
|
||||
self.set_mock_response(status_code=400, json_data=failure_response)
|
||||
with self.assertRaisesMessage(
|
||||
AnymailAPIError, r"Resend API response 400"
|
||||
) as cm:
|
||||
mail.send_mail("Subject", "Body", "from@example.com", ["to@example.com"])
|
||||
self.assertIn("API key is invalid", str(cm.exception))
|
||||
|
||||
# Make sure fail_silently is respected
|
||||
self.set_mock_response(status_code=422, json_data=failure_response)
|
||||
sent = mail.send_mail(
|
||||
"Subject",
|
||||
"Body",
|
||||
"from@example.com",
|
||||
["to@example.com"],
|
||||
fail_silently=True,
|
||||
)
|
||||
self.assertEqual(sent, 0)
|
||||
|
||||
|
||||
@tag("resend")
|
||||
class ResendBackendAnymailFeatureTests(ResendBackendMockAPITestCase):
|
||||
"""Test backend support for Anymail added features"""
|
||||
|
||||
def test_envelope_sender(self):
|
||||
self.message.envelope_sender = "anything@bounces.example.com"
|
||||
with self.assertRaisesMessage(AnymailUnsupportedFeature, "envelope_sender"):
|
||||
self.message.send()
|
||||
|
||||
def test_metadata(self):
|
||||
self.message.metadata = {"user_id": "12345", "items": 6}
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(
|
||||
json.loads(data["headers"]["X-Metadata"]),
|
||||
{"user_id": "12345", "items": 6},
|
||||
)
|
||||
|
||||
def test_send_at(self):
|
||||
self.message.send_at = 1651820889 # 2022-05-06 07:08:09 UTC
|
||||
with self.assertRaisesMessage(AnymailUnsupportedFeature, "send_at"):
|
||||
self.message.send()
|
||||
|
||||
def test_tags(self):
|
||||
self.message.tags = ["receipt", "reorder test 12"]
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(
|
||||
json.loads(data["headers"]["X-Tags"]),
|
||||
["receipt", "reorder test 12"],
|
||||
)
|
||||
|
||||
def test_headers_metadata_tags_interaction(self):
|
||||
# Test three features that use custom headers don't clobber each other
|
||||
self.message.extra_headers = {"X-Custom": "custom value"}
|
||||
self.message.metadata = {"user_id": "12345"}
|
||||
self.message.tags = ["receipt", "reorder test 12"]
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(
|
||||
data["headers"],
|
||||
{
|
||||
"X-Custom": "custom value",
|
||||
"X-Tags": '["receipt", "reorder test 12"]',
|
||||
"X-Metadata": '{"user_id": "12345"}',
|
||||
},
|
||||
)
|
||||
|
||||
def test_track_opens(self):
|
||||
self.message.track_opens = True
|
||||
with self.assertRaisesMessage(AnymailUnsupportedFeature, "track_opens"):
|
||||
self.message.send()
|
||||
|
||||
def test_track_clicks(self):
|
||||
self.message.track_clicks = True
|
||||
with self.assertRaisesMessage(AnymailUnsupportedFeature, "track_clicks"):
|
||||
self.message.send()
|
||||
|
||||
def test_default_omits_options(self):
|
||||
"""Make sure by default we don't send any ESP-specific options.
|
||||
|
||||
Options not specified by the caller should be omitted entirely from
|
||||
the API call (*not* sent as False or empty). This ensures
|
||||
that your ESP account settings apply by default.
|
||||
"""
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertNotIn("headers", data)
|
||||
self.assertNotIn("attachments", data)
|
||||
self.assertNotIn("tags", data)
|
||||
|
||||
def test_esp_extra(self):
|
||||
self.message.esp_extra = {
|
||||
"tags": [{"name": "my_tag", "value": "my_tag_value"}],
|
||||
}
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data["tags"], [{"name": "my_tag", "value": "my_tag_value"}])
|
||||
|
||||
# noinspection PyUnresolvedReferences
|
||||
def test_send_attaches_anymail_status(self):
|
||||
"""The anymail_status should be attached to the message when it is sent"""
|
||||
msg = mail.EmailMessage(
|
||||
"Subject",
|
||||
"Message",
|
||||
"from@example.com",
|
||||
["Recipient <to1@example.com>"],
|
||||
)
|
||||
sent = msg.send()
|
||||
self.assertEqual(sent, 1)
|
||||
self.assertEqual(msg.anymail_status.status, {"queued"})
|
||||
self.assertEqual(
|
||||
msg.anymail_status.message_id, "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
|
||||
)
|
||||
self.assertEqual(
|
||||
msg.anymail_status.recipients["to1@example.com"].status, "queued"
|
||||
)
|
||||
self.assertEqual(
|
||||
msg.anymail_status.recipients["to1@example.com"].message_id,
|
||||
"aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
|
||||
)
|
||||
self.assertEqual(
|
||||
msg.anymail_status.esp_response.content, self.DEFAULT_RAW_RESPONSE
|
||||
)
|
||||
|
||||
# noinspection PyUnresolvedReferences
|
||||
def test_send_failed_anymail_status(self):
|
||||
"""If the send fails, anymail_status should contain initial values"""
|
||||
self.set_mock_response(status_code=500)
|
||||
sent = self.message.send(fail_silently=True)
|
||||
self.assertEqual(sent, 0)
|
||||
self.assertIsNone(self.message.anymail_status.status)
|
||||
self.assertIsNone(self.message.anymail_status.message_id)
|
||||
self.assertEqual(self.message.anymail_status.recipients, {})
|
||||
self.assertIsNone(self.message.anymail_status.esp_response)
|
||||
|
||||
# noinspection PyUnresolvedReferences
|
||||
def test_send_unparsable_response(self):
|
||||
"""
|
||||
If the send succeeds, but a non-JSON API response, should raise an API exception
|
||||
"""
|
||||
mock_response = self.set_mock_response(
|
||||
status_code=200, raw=b"yikes, this isn't a real response"
|
||||
)
|
||||
with self.assertRaises(AnymailAPIError):
|
||||
self.message.send()
|
||||
self.assertIsNone(self.message.anymail_status.status)
|
||||
self.assertIsNone(self.message.anymail_status.message_id)
|
||||
self.assertEqual(self.message.anymail_status.recipients, {})
|
||||
self.assertEqual(self.message.anymail_status.esp_response, mock_response)
|
||||
|
||||
def test_json_serialization_errors(self):
|
||||
"""Try to provide more information about non-json-serializable data"""
|
||||
self.message.metadata = {"price": Decimal("19.99")} # yeah, don't do this
|
||||
with self.assertRaises(AnymailSerializationError) as cm:
|
||||
self.message.send()
|
||||
print(self.get_api_call_json())
|
||||
err = cm.exception
|
||||
self.assertIsInstance(err, TypeError) # compatibility with json.dumps
|
||||
# our added context:
|
||||
self.assertIn("Don't know how to send this data to Resend", str(err))
|
||||
# original message:
|
||||
self.assertRegex(str(err), r"Decimal.*is not JSON serializable")
|
||||
|
||||
|
||||
@tag("resend")
|
||||
class ResendBackendRecipientsRefusedTests(ResendBackendMockAPITestCase):
|
||||
# Resend doesn't check email bounce or complaint lists at time of send --
|
||||
# it always just queues the message. You'll need to listen for the "rejected"
|
||||
# and "failed" events to detect refused recipients.
|
||||
pass
|
||||
|
||||
|
||||
@tag("resend")
|
||||
class ResendBackendSessionSharingTestCase(
|
||||
SessionSharingTestCases, ResendBackendMockAPITestCase
|
||||
):
|
||||
"""Requests session sharing tests"""
|
||||
|
||||
pass # tests are defined in SessionSharingTestCases
|
||||
|
||||
|
||||
@tag("resend")
|
||||
@override_settings(EMAIL_BACKEND="anymail.backends.resend.EmailBackend")
|
||||
class ResendBackendImproperlyConfiguredTests(AnymailTestMixin, SimpleTestCase):
|
||||
"""Test ESP backend without required settings in place"""
|
||||
|
||||
def test_missing_api_key(self):
|
||||
with self.assertRaises(ImproperlyConfigured) as cm:
|
||||
mail.send_mail("Subject", "Message", "from@example.com", ["to@example.com"])
|
||||
errmsg = str(cm.exception)
|
||||
self.assertRegex(errmsg, r"\bRESEND_API_KEY\b")
|
||||
self.assertRegex(errmsg, r"\bANYMAIL_RESEND_API_KEY\b")
|
||||
92
tests/test_resend_integration.py
Normal file
92
tests/test_resend_integration.py
Normal file
@@ -0,0 +1,92 @@
|
||||
import os
|
||||
import unittest
|
||||
from email.utils import formataddr
|
||||
|
||||
from django.test import SimpleTestCase, override_settings, tag
|
||||
|
||||
from anymail.exceptions import AnymailAPIError
|
||||
from anymail.message import AnymailMessage
|
||||
|
||||
from .utils import AnymailTestMixin
|
||||
|
||||
ANYMAIL_TEST_RESEND_API_KEY = os.getenv("ANYMAIL_TEST_RESEND_API_KEY")
|
||||
ANYMAIL_TEST_RESEND_DOMAIN = os.getenv("ANYMAIL_TEST_RESEND_DOMAIN")
|
||||
|
||||
|
||||
@tag("resend", "live")
|
||||
@unittest.skipUnless(
|
||||
ANYMAIL_TEST_RESEND_API_KEY and ANYMAIL_TEST_RESEND_DOMAIN,
|
||||
"Set ANYMAIL_TEST_RESEND_API_KEY and ANYMAIL_TEST_RESEND_DOMAIN "
|
||||
"environment variables to run Resend integration tests",
|
||||
)
|
||||
@override_settings(
|
||||
ANYMAIL_RESEND_API_KEY=ANYMAIL_TEST_RESEND_API_KEY,
|
||||
EMAIL_BACKEND="anymail.backends.resend.EmailBackend",
|
||||
)
|
||||
class ResendBackendIntegrationTests(AnymailTestMixin, SimpleTestCase):
|
||||
"""Resend.com API integration tests
|
||||
|
||||
Resend doesn't have sandbox so these tests run
|
||||
against the **live** Resend API, using the
|
||||
environment variable `ANYMAIL_TEST_RESEND_API_KEY` as the API key,
|
||||
and `ANYMAIL_TEST_RESEND_DOMAIN` to construct sender addresses.
|
||||
If those variables are not set, these tests won't run.
|
||||
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.from_email = "from@%s" % ANYMAIL_TEST_RESEND_DOMAIN
|
||||
self.message = AnymailMessage(
|
||||
"Anymail Resend integration test",
|
||||
"Text content",
|
||||
self.from_email,
|
||||
["test+to1@anymail.dev"],
|
||||
)
|
||||
self.message.attach_alternative("<p>HTML content</p>", "text/html")
|
||||
|
||||
def test_simple_send(self):
|
||||
# Example of getting the Resend message id from the message
|
||||
sent_count = self.message.send()
|
||||
self.assertEqual(sent_count, 1)
|
||||
|
||||
anymail_status = self.message.anymail_status
|
||||
sent_status = anymail_status.recipients["test+to1@anymail.dev"].status
|
||||
message_id = anymail_status.recipients["test+to1@anymail.dev"].message_id
|
||||
|
||||
self.assertEqual(sent_status, "queued") # Resend always queues
|
||||
self.assertGreater(len(message_id), 0) # non-empty string
|
||||
# set of all recipient statuses:
|
||||
self.assertEqual(anymail_status.status, {sent_status})
|
||||
self.assertEqual(anymail_status.message_id, message_id)
|
||||
|
||||
def test_all_options(self):
|
||||
message = AnymailMessage(
|
||||
subject="Anymail Resend all-options integration test",
|
||||
body="This is the text body",
|
||||
# Verify workarounds for address formatting issues:
|
||||
from_email=formataddr(("Test «Från», med komma", self.from_email)),
|
||||
to=["test+to1@anymail.dev", '"Recipient 2, OK?" <test+to2@anymail.dev>'],
|
||||
cc=["test+cc1@anymail.dev", "Copy 2 <test+cc2@anymail.dev>"],
|
||||
bcc=["test+bcc1@anymail.dev", "Blind Copy 2 <test+bcc2@anymail.dev>"],
|
||||
reply_to=['"Reply, with comma" <reply@example.com>', "reply2@example.com"],
|
||||
headers={"X-Anymail-Test": "value", "X-Anymail-Count": 3},
|
||||
metadata={"meta1": "simple string", "meta2": 2},
|
||||
tags=["tag 1", "tag 2"],
|
||||
)
|
||||
message.attach_alternative("<p>HTML content</p>", "text/html")
|
||||
|
||||
message.attach("attachment1.txt", "Here is some\ntext for you", "text/plain")
|
||||
message.attach("attachment2.csv", "ID,Name\n1,Amy Lina", "text/csv")
|
||||
|
||||
message.send()
|
||||
# Resend always queues:
|
||||
self.assertEqual(message.anymail_status.status, {"queued"})
|
||||
self.assertGreater(
|
||||
len(message.anymail_status.message_id), 0
|
||||
) # non-empty string
|
||||
|
||||
@override_settings(ANYMAIL_RESEND_API_KEY="Hey, that's not an API key!")
|
||||
def test_invalid_api_key(self):
|
||||
with self.assertRaisesMessage(AnymailAPIError, "API key is invalid"):
|
||||
self.message.send()
|
||||
416
tests/test_resend_webhooks.py
Normal file
416
tests/test_resend_webhooks.py
Normal file
@@ -0,0 +1,416 @@
|
||||
import base64
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from unittest import skipIf, skipUnless
|
||||
from unittest.mock import ANY
|
||||
|
||||
from django.test import override_settings, tag
|
||||
|
||||
from anymail.exceptions import AnymailImproperlyInstalled, AnymailInsecureWebhookWarning
|
||||
from anymail.signals import AnymailTrackingEvent
|
||||
from anymail.webhooks.resend import ResendTrackingWebhookView
|
||||
|
||||
from .webhook_cases import WebhookBasicAuthTestCase, WebhookTestCase
|
||||
|
||||
# These tests are run both with and without 'svix' installed.
|
||||
try:
|
||||
from svix import Webhook
|
||||
except ImportError:
|
||||
SVIX_INSTALLED = False
|
||||
Webhook = None
|
||||
else:
|
||||
SVIX_INSTALLED = True
|
||||
|
||||
|
||||
def svix_secret(secret):
|
||||
return f"whsec_{base64.b64encode(secret.encode('ascii')).decode('ascii')}"
|
||||
|
||||
|
||||
TEST_SIGNING_SECRET = svix_secret("TEST_SIGNING_SECRET") if SVIX_INSTALLED else None
|
||||
TEST_WEBHOOK_MESSAGE_ID = "msg_abcdefghijklmnopqrst12345"
|
||||
|
||||
|
||||
class ResendWebhookTestCase(WebhookTestCase):
|
||||
def client_post_signed(self, url, json_data, svix_id=None, secret=None):
|
||||
"""Return self.client.post(url, serialized json_data) signed with secret"""
|
||||
svix_id = svix_id or TEST_WEBHOOK_MESSAGE_ID
|
||||
secret = secret or TEST_SIGNING_SECRET
|
||||
data = json.dumps(json_data)
|
||||
headers = {
|
||||
"svix-id": svix_id,
|
||||
}
|
||||
|
||||
if SVIX_INSTALLED:
|
||||
timestamp = datetime.now(tz=timezone.utc)
|
||||
signature = Webhook(secret).sign(
|
||||
msg_id=svix_id, timestamp=timestamp, data=data
|
||||
)
|
||||
headers.update(
|
||||
{
|
||||
"svix-timestamp": timestamp.timestamp(),
|
||||
"svix-signature": signature,
|
||||
}
|
||||
)
|
||||
|
||||
return self.client.post(
|
||||
url,
|
||||
content_type="application/json",
|
||||
data=data.encode("utf-8"),
|
||||
# Django 4.2+ test Client allows headers=headers;
|
||||
# before that, must convert to HTTP_ args:
|
||||
**{
|
||||
f"HTTP_{header.upper().replace('-', '_')}": value
|
||||
for header, value in headers.items()
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@tag("resend")
|
||||
@override_settings(ANYMAIL={}) # clear WEBHOOK_SECRET from base class
|
||||
class ResendWebhookSettingsTestCase(ResendWebhookTestCase):
|
||||
@skipIf(SVIX_INSTALLED, "test covers behavior when 'svix' package missing")
|
||||
@override_settings(ANYMAIL_RESEND_SIGNING_SECRET=svix_secret("settings secret"))
|
||||
def test_secret_requires_svix_installed(self):
|
||||
"""If webhook secret is specified, error if svix not available to verify"""
|
||||
with self.assertRaisesMessage(AnymailImproperlyInstalled, "svix"):
|
||||
self.client_post_signed("/anymail/resend/tracking/", {"type": "email.sent"})
|
||||
|
||||
# Test with and without SVIX_INSTALLED
|
||||
def test_basic_auth_required_without_secret(self):
|
||||
with self.assertWarns(AnymailInsecureWebhookWarning):
|
||||
self.client_post_signed("/anymail/resend/tracking/", {"type": "email.sent"})
|
||||
|
||||
# Test with and without SVIX_INSTALLED
|
||||
@override_settings(ANYMAIL={"WEBHOOK_SECRET": "username:password"})
|
||||
def test_signing_secret_optional_with_basic_auth(self):
|
||||
"""Secret verification is optional if using basic auth"""
|
||||
response = self.client_post_signed(
|
||||
"/anymail/resend/tracking/", {"type": "email.sent"}
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@skipUnless(SVIX_INSTALLED, "secret verification requires 'svix' package")
|
||||
@override_settings(ANYMAIL_RESEND_SIGNING_SECRET=svix_secret("settings secret"))
|
||||
def test_signing_secret_view_params(self):
|
||||
"""Webhook signing secret can be provided as a view param"""
|
||||
view_secret = svix_secret("view-level secret")
|
||||
view = ResendTrackingWebhookView.as_view(signing_secret=view_secret)
|
||||
view_instance = view.view_class(**view.view_initkwargs)
|
||||
self.assertEqual(view_instance.signing_secret, view_secret)
|
||||
|
||||
|
||||
@tag("resend")
|
||||
@override_settings(ANYMAIL_RESEND_SIGNING_SECRET=TEST_SIGNING_SECRET)
|
||||
class ResendWebhookSecurityTestCase(ResendWebhookTestCase, WebhookBasicAuthTestCase):
|
||||
should_warn_if_no_auth = TEST_SIGNING_SECRET is None
|
||||
|
||||
def call_webhook(self):
|
||||
return self.client_post_signed(
|
||||
"/anymail/resend/tracking/",
|
||||
{"type": "email.sent"},
|
||||
secret=TEST_SIGNING_SECRET,
|
||||
)
|
||||
|
||||
# Additional tests are in WebhookBasicAuthTestCase
|
||||
|
||||
@skipUnless(SVIX_INSTALLED, "signature verification requires 'svix' package")
|
||||
def test_verifies_correct_signature(self):
|
||||
response = self.client_post_signed(
|
||||
"/anymail/resend/tracking/",
|
||||
{"type": "email.sent"},
|
||||
secret=TEST_SIGNING_SECRET,
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@skipUnless(SVIX_INSTALLED, "signature verification requires 'svix' package")
|
||||
def test_verifies_missing_signature(self):
|
||||
response = self.client.post(
|
||||
"/anymail/resend/tracking/",
|
||||
content_type="application/json",
|
||||
data={"type": "email.sent"},
|
||||
)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
@skipUnless(SVIX_INSTALLED, "signature verification requires 'svix' package")
|
||||
def test_verifies_bad_signature(self):
|
||||
# This also verifies that the error log references the correct setting to check.
|
||||
with self.assertLogs() as logs:
|
||||
response = self.client_post_signed(
|
||||
"/anymail/resend/tracking/",
|
||||
{"type": "email.sent"},
|
||||
secret=svix_secret("wrong signing key"),
|
||||
)
|
||||
# SuspiciousOperation causes 400 response (even in test client):
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn("check Anymail RESEND_SIGNING_SECRET", logs.output[0])
|
||||
|
||||
|
||||
@tag("resend")
|
||||
@override_settings(ANYMAIL_RESEND_SIGNING_SECRET=TEST_SIGNING_SECRET)
|
||||
class ResendTestCase(ResendWebhookTestCase):
|
||||
def test_sent_event(self):
|
||||
raw_event = {
|
||||
"created_at": "2023-09-28T17:19:43.736Z",
|
||||
"data": {
|
||||
"created_at": "2023-09-28T17:19:43.982Z",
|
||||
"email_id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
|
||||
"from": "Sender <from@example.com>",
|
||||
"headers": [
|
||||
{"name": "Reply-To", "value": "reply@example.com"},
|
||||
{"name": "X-Tags", "value": '["tag1", "Tag 2"]'},
|
||||
{
|
||||
"name": "X-Metadata",
|
||||
"value": '{"cohort": "2018-08-B", "user_id": 123456}',
|
||||
},
|
||||
{"name": "Cc", "value": "cc1@example.org, Cc 2 <cc2@example.org>"},
|
||||
],
|
||||
"subject": "Sending test",
|
||||
"tags": {"tag1": "Tag_1_value", "tag2": "Tag_2_value"},
|
||||
"to": ["Recipient <to@example.org>", "to2@example.org"],
|
||||
},
|
||||
"type": "email.sent",
|
||||
}
|
||||
response = self.client_post_signed(
|
||||
"/anymail/resend/tracking/",
|
||||
raw_event,
|
||||
svix_id="msg_2W2D3qXLS5fOaPja1GDg7rF2CwB",
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
kwargs = self.assert_handler_called_once_with(
|
||||
self.tracking_handler,
|
||||
sender=ResendTrackingWebhookView,
|
||||
event=ANY,
|
||||
esp_name="Resend",
|
||||
)
|
||||
event = kwargs["event"]
|
||||
self.assertIsInstance(event, AnymailTrackingEvent)
|
||||
self.assertEqual(event.event_type, "sent")
|
||||
# event.timestamp comes from root-level created_at:
|
||||
self.assertEqual(
|
||||
event.timestamp,
|
||||
# "2023-09-28T17:19:43.736Z"
|
||||
datetime(2023, 9, 28, 17, 19, 43, microsecond=736000, tzinfo=timezone.utc),
|
||||
)
|
||||
# event.message_id matches the message.anymail_status.message_id when the
|
||||
# message was sent. It comes from data.email_id:
|
||||
self.assertEqual(event.message_id, "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee")
|
||||
# event.event_id is unique for each event, and comes from svix-id header:
|
||||
self.assertEqual(event.event_id, "msg_2W2D3qXLS5fOaPja1GDg7rF2CwB")
|
||||
# event.recipient is always the first "to" addr:
|
||||
self.assertEqual(event.recipient, "to@example.org")
|
||||
self.assertEqual(event.tags, ["tag1", "Tag 2"])
|
||||
self.assertEqual(event.metadata, {"cohort": "2018-08-B", "user_id": 123456})
|
||||
self.assertEqual(event.esp_event, raw_event)
|
||||
|
||||
# You can retrieve Resend native tags (which are different from Anymail tags)
|
||||
# from esp_event:
|
||||
resend_tags = event.esp_event["data"].get("tags", {})
|
||||
self.assertEqual(resend_tags, {"tag1": "Tag_1_value", "tag2": "Tag_2_value"})
|
||||
|
||||
def test_delivered_event(self):
|
||||
raw_event = {
|
||||
"created_at": "2023-09-28T17:19:44.823Z",
|
||||
"data": {
|
||||
"created_at": "2023-09-28T17:19:43.982Z",
|
||||
"email_id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
|
||||
"from": "Sender <from@example.com>",
|
||||
"subject": "Sending test",
|
||||
"to": ["to@example.org"],
|
||||
},
|
||||
"type": "email.delivered",
|
||||
}
|
||||
response = self.client_post_signed("/anymail/resend/tracking/", raw_event)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
kwargs = self.assert_handler_called_once_with(
|
||||
self.tracking_handler,
|
||||
sender=ResendTrackingWebhookView,
|
||||
event=ANY,
|
||||
esp_name="Resend",
|
||||
)
|
||||
event = kwargs["event"]
|
||||
self.assertIsInstance(event, AnymailTrackingEvent)
|
||||
self.assertEqual(event.event_type, "delivered")
|
||||
self.assertEqual(event.recipient, "to@example.org")
|
||||
self.assertEqual(event.tags, [])
|
||||
self.assertEqual(event.metadata, {})
|
||||
|
||||
def test_hard_bounced_event(self):
|
||||
raw_event = {
|
||||
"created_at": "2023-10-02T18:11:26.101Z",
|
||||
"data": {
|
||||
"bounce": {
|
||||
"message": (
|
||||
"The recipient's email provider sent a hard bounce message, but"
|
||||
" didn't specify the reason for the hard bounce. We recommend"
|
||||
" removing the recipient's email address from your mailing list."
|
||||
" Sending messages to addresses that produce hard bounces can"
|
||||
" have a negative impact on your reputation as a sender."
|
||||
)
|
||||
},
|
||||
"created_at": "2023-10-02T18:11:25.729Z",
|
||||
"email_id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
|
||||
"from": "Sender <from@example.com>",
|
||||
"subject": "Sending test",
|
||||
"to": ["bounced@resend.dev"],
|
||||
},
|
||||
"type": "email.bounced",
|
||||
}
|
||||
response = self.client_post_signed("/anymail/resend/tracking/", raw_event)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
kwargs = self.assert_handler_called_once_with(
|
||||
self.tracking_handler,
|
||||
sender=ResendTrackingWebhookView,
|
||||
event=ANY,
|
||||
esp_name="Resend",
|
||||
)
|
||||
event = kwargs["event"]
|
||||
self.assertEqual(event.event_type, "bounced")
|
||||
self.assertEqual(event.reject_reason, "bounced")
|
||||
self.assertRegex(
|
||||
event.description,
|
||||
r"^The recipient's email provider sent a hard bounce message.*",
|
||||
)
|
||||
self.assertIsNone(event.mta_response) # raw MTA info not provided
|
||||
|
||||
def test_suppressed_event(self):
|
||||
raw_event = {
|
||||
"created_at": "2023-10-01T20:01:01.598Z",
|
||||
"data": {
|
||||
"bounce": {
|
||||
"message": (
|
||||
"Resend has suppressed sending to this address because it is"
|
||||
" on the account-level suppression list. This does not count"
|
||||
" toward your bounce rate metric"
|
||||
)
|
||||
},
|
||||
"created_at": "2023-10-01T20:01:01.339Z",
|
||||
"email_id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
|
||||
"from": "Sender <from@example.com>",
|
||||
"subject": "Sending test",
|
||||
"to": ["blocked@example.org"],
|
||||
},
|
||||
"type": "email.bounced",
|
||||
}
|
||||
response = self.client_post_signed("/anymail/resend/tracking/", raw_event)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
kwargs = self.assert_handler_called_once_with(
|
||||
self.tracking_handler,
|
||||
sender=ResendTrackingWebhookView,
|
||||
event=ANY,
|
||||
esp_name="Resend",
|
||||
)
|
||||
event = kwargs["event"]
|
||||
self.assertEqual(event.event_type, "bounced")
|
||||
self.assertEqual(event.reject_reason, "blocked")
|
||||
self.assertRegex(
|
||||
event.description, r"^Resend has suppressed sending to this address.*"
|
||||
)
|
||||
self.assertIsNone(event.mta_response) # raw MTA info not provided
|
||||
|
||||
def test_delivery_delayed_event(self):
|
||||
# Haven't been able to trigger a real-world version of this event
|
||||
# (even with SMTP reply 450, status 4.0.0 "temporary failure").
|
||||
# This is the sample payload from Resend's docs, but correcting the type
|
||||
# from "email.delivered_delayed" to "email.delivery_delayed" to match
|
||||
# docs and configuration UI.
|
||||
raw_event = {
|
||||
"type": "email.delivery_delayed", # "email.delivered_delayed",
|
||||
"created_at": "2023-02-22T23:41:12.126Z",
|
||||
"data": {
|
||||
"created_at": "2023-02-22T23:41:11.894719+00:00",
|
||||
"email_id": "56761188-7520-42d8-8898-ff6fc54ce618",
|
||||
"from": "Acme <onboarding@resend.dev>",
|
||||
"to": ["delivered@resend.dev"],
|
||||
"subject": "Sending this example",
|
||||
},
|
||||
}
|
||||
response = self.client_post_signed("/anymail/resend/tracking/", raw_event)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
kwargs = self.assert_handler_called_once_with(
|
||||
self.tracking_handler,
|
||||
sender=ResendTrackingWebhookView,
|
||||
event=ANY,
|
||||
esp_name="Resend",
|
||||
)
|
||||
event = kwargs["event"]
|
||||
self.assertEqual(event.event_type, "deferred")
|
||||
self.assertIsNone(event.reject_reason)
|
||||
self.assertIsNone(event.description)
|
||||
self.assertIsNone(event.mta_response) # raw MTA info not provided
|
||||
|
||||
def test_complained_event(self):
|
||||
raw_event = {
|
||||
"created_at": "2023-10-02T18:10:03.690Z",
|
||||
"data": {
|
||||
"created_at": "2023-10-02T18:10:03.241Z",
|
||||
"email_id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
|
||||
"from": "Sender <from@example.com>",
|
||||
"subject": "Sending test",
|
||||
"to": ["complained@resend.dev"],
|
||||
},
|
||||
"type": "email.complained",
|
||||
}
|
||||
response = self.client_post_signed("/anymail/resend/tracking/", raw_event)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
kwargs = self.assert_handler_called_once_with(
|
||||
self.tracking_handler,
|
||||
sender=ResendTrackingWebhookView,
|
||||
event=ANY,
|
||||
esp_name="Resend",
|
||||
)
|
||||
event = kwargs["event"]
|
||||
self.assertEqual(event.event_type, "complained")
|
||||
|
||||
def test_opened_event(self):
|
||||
raw_event = {
|
||||
"created_at": "2023-09-28T17:20:38.990Z",
|
||||
"data": {
|
||||
"created_at": "2023-09-28T17:19:43.982Z",
|
||||
"email_id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
|
||||
"from": "Sender <from@example.com>",
|
||||
"subject": "Sending test",
|
||||
"to": ["to@example.org"],
|
||||
},
|
||||
"type": "email.opened",
|
||||
}
|
||||
response = self.client_post_signed("/anymail/resend/tracking/", raw_event)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
kwargs = self.assert_handler_called_once_with(
|
||||
self.tracking_handler,
|
||||
sender=ResendTrackingWebhookView,
|
||||
event=ANY,
|
||||
esp_name="Resend",
|
||||
)
|
||||
event = kwargs["event"]
|
||||
self.assertEqual(event.event_type, "opened")
|
||||
|
||||
def test_clicked_event(self):
|
||||
raw_event = {
|
||||
"created_at": "2023-09-28T17:21:35.257Z",
|
||||
"data": {
|
||||
"click": {
|
||||
"ipAddress": "192.168.1.101",
|
||||
"link": "https://example.com/test",
|
||||
"timestamp": "2023-09-28T17:21:35.257Z",
|
||||
"userAgent": "Mozilla/5.0 ...",
|
||||
},
|
||||
"created_at": "2023-09-28T17:19:43.982Z",
|
||||
"email_id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
|
||||
"from": "Sender <from@example.com>",
|
||||
"subject": "Sending test",
|
||||
"to": ["to@example.org"],
|
||||
},
|
||||
"type": "email.clicked",
|
||||
}
|
||||
response = self.client_post_signed("/anymail/resend/tracking/", raw_event)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
kwargs = self.assert_handler_called_once_with(
|
||||
self.tracking_handler,
|
||||
sender=ResendTrackingWebhookView,
|
||||
event=ANY,
|
||||
esp_name="Resend",
|
||||
)
|
||||
event = kwargs["event"]
|
||||
self.assertEqual(event.event_type, "clicked")
|
||||
self.assertEqual(event.click_url, "https://example.com/test")
|
||||
self.assertEqual(event.user_agent, "Mozilla/5.0 ...")
|
||||
Reference in New Issue
Block a user