mirror of
https://github.com/pacnpal/django-anymail.git
synced 2025-12-19 19:31:06 -05:00
Raise an `AnymailUnsupportedFeature` error when trying to use a `template_id` along with other content payload fields that SparkPost silently ignores when template_id is present.
980 lines
38 KiB
Python
980 lines
38 KiB
Python
import json
|
|
from datetime import date, datetime, timezone
|
|
from decimal import Decimal
|
|
from email.mime.base import MIMEBase
|
|
from email.mime.image import MIMEImage
|
|
from email.mime.text import MIMEText
|
|
|
|
from django.core import mail
|
|
from django.test import override_settings, tag
|
|
from django.utils.timezone import (
|
|
get_fixed_timezone,
|
|
override as override_current_timezone,
|
|
)
|
|
|
|
from anymail.exceptions import (
|
|
AnymailAPIError,
|
|
AnymailConfigurationError,
|
|
AnymailRecipientsRefused,
|
|
AnymailSerializationError,
|
|
AnymailUnsupportedFeature,
|
|
)
|
|
from anymail.message import AnymailMessage, attach_inline_image_file
|
|
|
|
from .mock_requests_backend import RequestsBackendMockAPITestCase
|
|
from .utils import (
|
|
SAMPLE_IMAGE_FILENAME,
|
|
decode_att,
|
|
sample_image_content,
|
|
sample_image_path,
|
|
)
|
|
|
|
|
|
@tag("sparkpost")
|
|
@override_settings(
|
|
EMAIL_BACKEND="anymail.backends.sparkpost.EmailBackend",
|
|
ANYMAIL={"SPARKPOST_API_KEY": "test_api_key"},
|
|
)
|
|
class SparkPostBackendMockAPITestCase(RequestsBackendMockAPITestCase):
|
|
"""TestCase that uses SparkPostEmailBackend with a mocked transmissions.send API"""
|
|
|
|
DEFAULT_RAW_RESPONSE = b"""{
|
|
"results": {
|
|
"id": "12345678901234567890",
|
|
"total_accepted_recipients": 1,
|
|
"total_rejected_recipients": 0
|
|
}
|
|
}"""
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
# Simple message useful for many tests
|
|
self.message = mail.EmailMultiAlternatives(
|
|
"Subject", "Text Body", "from@example.com", ["to@example.com"]
|
|
)
|
|
|
|
def set_mock_result(self, accepted=1, rejected=0, id="12345678901234567890"):
|
|
"""Set a mock response that reflects count of accepted/rejected recipients"""
|
|
raw = json.dumps(
|
|
{
|
|
"results": {
|
|
"id": id,
|
|
"total_accepted_recipients": accepted,
|
|
"total_rejected_recipients": rejected,
|
|
}
|
|
}
|
|
).encode("utf-8")
|
|
self.set_mock_response(raw=raw)
|
|
return raw
|
|
|
|
|
|
@tag("sparkpost")
|
|
class SparkPostBackendStandardEmailTests(SparkPostBackendMockAPITestCase):
|
|
"""Test backend support for Django standard email features"""
|
|
|
|
def test_send_mail(self):
|
|
"""Test basic API for simple send"""
|
|
mail.send_mail(
|
|
"Subject here",
|
|
"Here is the message.",
|
|
"from@example.com",
|
|
["to@example.com"],
|
|
fail_silently=False,
|
|
)
|
|
|
|
self.assert_esp_called("/api/v1/transmissions/")
|
|
|
|
headers = self.get_api_call_headers()
|
|
self.assertEqual("test_api_key", headers["Authorization"])
|
|
|
|
data = self.get_api_call_json()
|
|
self.assertEqual(data["content"]["subject"], "Subject here")
|
|
self.assertEqual(data["content"]["text"], "Here is the message.")
|
|
self.assertEqual(data["content"]["from"], "from@example.com")
|
|
self.assertEqual(
|
|
data["recipients"],
|
|
[{"address": {"email": "to@example.com", "header_to": "to@example.com"}}],
|
|
)
|
|
|
|
def test_name_addr(self):
|
|
"""Make sure RFC2822 name-addr format (with display-name) is allowed
|
|
|
|
(Test both sender and recipient addresses)
|
|
"""
|
|
self.set_mock_result(accepted=6)
|
|
msg = mail.EmailMessage(
|
|
"Subject",
|
|
"Message",
|
|
"From Name <from@example.com>",
|
|
["Recipient #1 <to1@example.com>", "to2@example.com"],
|
|
cc=["Carbon Copy <cc1@example.com>", "cc2@example.com"],
|
|
bcc=["Blind Copy <bcc1@example.com>", "bcc2@example.com"],
|
|
)
|
|
msg.send()
|
|
|
|
data = self.get_api_call_json()
|
|
self.assertEqual(data["content"]["from"], "From Name <from@example.com>")
|
|
# This also checks recipient generation for cc and bcc. Because it's *not* a
|
|
# batch send, each recipient should see a To header reflecting all To addresses.
|
|
self.assertCountEqual(
|
|
data["recipients"],
|
|
[
|
|
{
|
|
"address": {
|
|
"email": "to1@example.com",
|
|
"header_to": "Recipient #1 <to1@example.com>, to2@example.com",
|
|
}
|
|
},
|
|
{
|
|
"address": {
|
|
"email": "to2@example.com",
|
|
"header_to": "Recipient #1 <to1@example.com>, to2@example.com",
|
|
}
|
|
},
|
|
# cc and bcc must be explicitly specified as recipients
|
|
{
|
|
"address": {
|
|
"email": "cc1@example.com",
|
|
"header_to": "Recipient #1 <to1@example.com>, to2@example.com",
|
|
}
|
|
},
|
|
{
|
|
"address": {
|
|
"email": "cc2@example.com",
|
|
"header_to": "Recipient #1 <to1@example.com>, to2@example.com",
|
|
}
|
|
},
|
|
{
|
|
"address": {
|
|
"email": "bcc1@example.com",
|
|
"header_to": "Recipient #1 <to1@example.com>, to2@example.com",
|
|
}
|
|
},
|
|
{
|
|
"address": {
|
|
"email": "bcc2@example.com",
|
|
"header_to": "Recipient #1 <to1@example.com>, to2@example.com",
|
|
}
|
|
},
|
|
],
|
|
)
|
|
# Make sure we added a formatted Cc header visible to recipients
|
|
# (and not a Bcc header)
|
|
self.assertEqual(
|
|
data["content"]["headers"],
|
|
{"Cc": "Carbon Copy <cc1@example.com>, cc2@example.com"},
|
|
)
|
|
|
|
def test_custom_headers(self):
|
|
self.set_mock_result(accepted=6)
|
|
email = mail.EmailMessage(
|
|
"Subject",
|
|
"Body goes here",
|
|
"from@example.com",
|
|
["to1@example.com"],
|
|
cc=["cc1@example.com"],
|
|
headers={
|
|
"Reply-To": "another@example.com",
|
|
"X-MyHeader": "my value",
|
|
"Message-ID": "mycustommsgid@example.com",
|
|
},
|
|
)
|
|
email.send()
|
|
|
|
data = self.get_api_call_json()
|
|
self.assertEqual(
|
|
data["content"]["headers"],
|
|
{
|
|
# Reply-To moved to separate param (below)
|
|
"X-MyHeader": "my value",
|
|
"Message-ID": "mycustommsgid@example.com",
|
|
"Cc": "cc1@example.com", # Cc header added
|
|
},
|
|
)
|
|
self.assertEqual(data["content"]["reply_to"], "another@example.com")
|
|
|
|
def test_html_message(self):
|
|
text_content = "This is an important message."
|
|
html_content = "<p>This is an <strong>important</strong> message.</p>"
|
|
email = mail.EmailMultiAlternatives(
|
|
"Subject", text_content, "from@example.com", ["to@example.com"]
|
|
)
|
|
email.attach_alternative(html_content, "text/html")
|
|
email.send()
|
|
|
|
data = self.get_api_call_json()
|
|
self.assertEqual(data["content"]["text"], text_content)
|
|
self.assertEqual(data["content"]["html"], html_content)
|
|
# Don't accidentally send the html part as an attachment:
|
|
self.assertNotIn("attachments", data["content"])
|
|
|
|
def test_html_only_message(self):
|
|
html_content = "<p>This is an <strong>important</strong> message.</p>"
|
|
email = mail.EmailMessage(
|
|
"Subject", html_content, "from@example.com", ["to@example.com"]
|
|
)
|
|
email.content_subtype = "html" # Main content is now text/html
|
|
email.send()
|
|
|
|
data = self.get_api_call_json()
|
|
self.assertNotIn("text", data["content"])
|
|
self.assertEqual(data["content"]["html"], html_content)
|
|
|
|
def test_reply_to(self):
|
|
email = mail.EmailMessage(
|
|
"Subject",
|
|
"Body goes here",
|
|
"from@example.com",
|
|
["to1@example.com"],
|
|
reply_to=["reply@example.com", "Other <reply2@example.com>"],
|
|
headers={"X-Other": "Keep"},
|
|
)
|
|
email.send()
|
|
|
|
data = self.get_api_call_json()
|
|
self.assertEqual(
|
|
data["content"]["reply_to"], "reply@example.com, Other <reply2@example.com>"
|
|
)
|
|
# don't lose other headers:
|
|
self.assertEqual(data["content"]["headers"], {"X-Other": "Keep"})
|
|
|
|
def test_attachments(self):
|
|
text_content = "* Item one\n* Item two\n* Item three"
|
|
self.message.attach(
|
|
filename="test.txt", content=text_content, mimetype="text/plain"
|
|
)
|
|
|
|
# Should guess mimetype if not provided...
|
|
png_content = b"PNG\xb4 pretend this is the contents of a png file"
|
|
self.message.attach(filename="test.png", content=png_content)
|
|
|
|
# Should work with a MIMEBase object (also tests no filename)...
|
|
pdf_content = b"PDF\xb4 pretend this is valid pdf params"
|
|
mimeattachment = MIMEBase("application", "pdf")
|
|
mimeattachment.set_payload(pdf_content)
|
|
self.message.attach(mimeattachment)
|
|
|
|
self.message.send()
|
|
data = self.get_api_call_json()
|
|
attachments = data["content"]["attachments"]
|
|
self.assertEqual(len(attachments), 3)
|
|
self.assertEqual(attachments[0]["type"], "text/plain")
|
|
self.assertEqual(attachments[0]["name"], "test.txt")
|
|
self.assertEqual(
|
|
decode_att(attachments[0]["data"]).decode("ascii"), text_content
|
|
)
|
|
self.assertEqual(attachments[1]["type"], "image/png") # inferred from filename
|
|
self.assertEqual(attachments[1]["name"], "test.png")
|
|
self.assertEqual(decode_att(attachments[1]["data"]), png_content)
|
|
self.assertEqual(attachments[2]["type"], "application/pdf")
|
|
self.assertEqual(attachments[2]["name"], "") # none
|
|
self.assertEqual(decode_att(attachments[2]["data"]), pdf_content)
|
|
# Make sure the image attachment is not treated as embedded:
|
|
self.assertNotIn("inline_images", data["content"])
|
|
|
|
def test_unicode_attachment_correctly_decoded(self):
|
|
# Slight modification from the Django unicode docs:
|
|
# http://django.readthedocs.org/en/latest/ref/unicode.html#email
|
|
self.message.attach(
|
|
"Une pièce jointe.html", "<p>\u2019</p>", mimetype="text/html"
|
|
)
|
|
self.message.send()
|
|
data = self.get_api_call_json()
|
|
attachments = data["content"]["attachments"]
|
|
self.assertEqual(len(attachments), 1)
|
|
|
|
def test_attachment_charset(self):
|
|
# SparkPost allows charset param in attachment type
|
|
self.message.attach(MIMEText("Une pièce jointe", "plain", "iso8859-1"))
|
|
self.message.send()
|
|
data = self.get_api_call_json()
|
|
attachment = data["content"]["attachments"][0]
|
|
self.assertEqual(attachment["type"], 'text/plain; charset="iso8859-1"')
|
|
self.assertEqual(
|
|
decode_att(attachment["data"]), "Une pièce jointe".encode("iso8859-1")
|
|
)
|
|
|
|
def test_embedded_images(self):
|
|
image_filename = SAMPLE_IMAGE_FILENAME
|
|
image_path = sample_image_path(image_filename)
|
|
image_data = sample_image_content(image_filename)
|
|
|
|
cid = attach_inline_image_file(self.message, image_path)
|
|
html_content = (
|
|
'<p>This has an <img src="cid:%s" alt="inline" /> image.</p>' % cid
|
|
)
|
|
self.message.attach_alternative(html_content, "text/html")
|
|
|
|
self.message.send()
|
|
data = self.get_api_call_json()
|
|
self.assertEqual(data["content"]["html"], html_content)
|
|
|
|
self.assertEqual(len(data["content"]["inline_images"]), 1)
|
|
self.assertEqual(data["content"]["inline_images"][0]["type"], "image/png")
|
|
self.assertEqual(data["content"]["inline_images"][0]["name"], cid)
|
|
self.assertEqual(
|
|
decode_att(data["content"]["inline_images"][0]["data"]), image_data
|
|
)
|
|
# Make sure neither the html nor the inline image is treated as an attachment:
|
|
self.assertNotIn("attachments", data["content"])
|
|
|
|
def test_attached_images(self):
|
|
image_filename = SAMPLE_IMAGE_FILENAME
|
|
image_path = sample_image_path(image_filename)
|
|
image_data = sample_image_content(image_filename)
|
|
|
|
# option 1: attach as a file
|
|
self.message.attach_file(image_path)
|
|
|
|
# option 2: construct the MIMEImage and attach it directly
|
|
image = MIMEImage(image_data)
|
|
self.message.attach(image)
|
|
|
|
self.message.send()
|
|
data = self.get_api_call_json()
|
|
attachments = data["content"]["attachments"]
|
|
self.assertEqual(len(attachments), 2)
|
|
self.assertEqual(attachments[0]["type"], "image/png")
|
|
self.assertEqual(attachments[0]["name"], image_filename)
|
|
self.assertEqual(decode_att(attachments[0]["data"]), image_data)
|
|
self.assertEqual(attachments[1]["type"], "image/png")
|
|
self.assertEqual(attachments[1]["name"], "") # unknown -- not attached as file
|
|
self.assertEqual(decode_att(attachments[1]["data"]), image_data)
|
|
# Make sure the image attachments are not treated as embedded:
|
|
self.assertNotIn("inline_images", data["content"])
|
|
|
|
def test_multiple_html_alternatives(self):
|
|
# Multiple text/html alternatives not allowed
|
|
self.message.attach_alternative("<p>First html is OK</p>", "text/html")
|
|
self.message.attach_alternative("<p>But not second html</p>", "text/html")
|
|
with self.assertRaises(AnymailUnsupportedFeature):
|
|
self.message.send()
|
|
|
|
def test_amp_html_alternative(self):
|
|
# SparkPost *does* support text/x-amp-html alongside text/html
|
|
self.message.attach_alternative("<p>HTML</p>", "text/html")
|
|
self.message.attach_alternative("<p>And AMP HTML</p>", "text/x-amp-html")
|
|
self.message.send()
|
|
data = self.get_api_call_json()
|
|
self.assertEqual(data["content"]["html"], "<p>HTML</p>")
|
|
self.assertEqual(data["content"]["amp_html"], "<p>And AMP HTML</p>")
|
|
|
|
def test_html_alternative(self):
|
|
# Only html alternatives allowed
|
|
self.message.attach_alternative("{'not': 'allowed'}", "application/json")
|
|
with self.assertRaises(AnymailUnsupportedFeature):
|
|
self.message.send()
|
|
|
|
def test_alternatives_fail_silently(self):
|
|
# Make sure fail_silently is respected
|
|
self.message.attach_alternative("{'not': 'allowed'}", "application/json")
|
|
sent = self.message.send(fail_silently=True)
|
|
self.assert_esp_not_called("API should not be called when send fails silently")
|
|
self.assertEqual(sent, 0)
|
|
|
|
def test_suppress_empty_address_lists(self):
|
|
"""Empty to, cc, bcc, and reply_to shouldn't generate empty headers"""
|
|
self.message.send()
|
|
data = self.get_api_call_json()
|
|
self.assertNotIn("headers", data["content"]) # No Cc, Bcc or Reply-To header
|
|
self.assertNotIn("reply_to", data["content"])
|
|
|
|
def test_empty_to(self):
|
|
# Test empty `to`--but send requires at least one recipient somewhere (like cc)
|
|
self.message.to = []
|
|
self.message.cc = ["cc@example.com"]
|
|
self.message.send()
|
|
data = self.get_api_call_json()
|
|
self.assertEqual(
|
|
data["recipients"],
|
|
[
|
|
{
|
|
"address": {
|
|
"email": "cc@example.com",
|
|
# This results in a message with an empty To header, as desired:
|
|
"header_to": "",
|
|
},
|
|
}
|
|
],
|
|
)
|
|
|
|
def test_api_failure(self):
|
|
self.set_mock_response(status_code=400)
|
|
with self.assertRaisesMessage(AnymailAPIError, "SparkPost API response 400"):
|
|
self.message.send()
|
|
|
|
def test_api_failure_fail_silently(self):
|
|
# Make sure fail_silently is respected
|
|
self.set_mock_response(status_code=400)
|
|
sent = self.message.send(fail_silently=True)
|
|
self.assertEqual(sent, 0)
|
|
|
|
def test_api_error_includes_details(self):
|
|
"""AnymailAPIError should include ESP's error message"""
|
|
failure_response = b"""{
|
|
"errors": [{
|
|
"message": "Helpful explanation from your ESP"
|
|
}]
|
|
}"""
|
|
self.set_mock_response(status_code=400, raw=failure_response)
|
|
with self.assertRaisesMessage(
|
|
AnymailAPIError, "Helpful explanation from your ESP"
|
|
):
|
|
self.message.send()
|
|
|
|
|
|
@tag("sparkpost")
|
|
class SparkPostBackendAnymailFeatureTests(SparkPostBackendMockAPITestCase):
|
|
"""Test backend support for Anymail added features"""
|
|
|
|
def test_envelope_sender(self):
|
|
self.message.envelope_sender = "bounce-handler@bounces.example.com"
|
|
self.message.send()
|
|
data = self.get_api_call_json()
|
|
self.assertEqual(data["return_path"], "bounce-handler@bounces.example.com")
|
|
|
|
def test_metadata(self):
|
|
self.message.metadata = {"user_id": "12345", "items": "spark, post"}
|
|
self.message.send()
|
|
data = self.get_api_call_json()
|
|
self.assertEqual(data["metadata"], {"user_id": "12345", "items": "spark, post"})
|
|
|
|
def test_send_at(self):
|
|
utc_plus_6 = get_fixed_timezone(6 * 60)
|
|
utc_minus_8 = get_fixed_timezone(-8 * 60)
|
|
|
|
# SparkPost expects ISO-8601 YYYY-MM-DDTHH:MM:SS+-HH:MM
|
|
with override_current_timezone(utc_plus_6):
|
|
# Timezone-aware datetime converted to UTC:
|
|
self.message.send_at = datetime(2016, 3, 4, 5, 6, 7, tzinfo=utc_minus_8)
|
|
self.message.send()
|
|
data = self.get_api_call_json()
|
|
self.assertEqual(data["options"]["start_time"], "2016-03-04T05:06:07-08:00")
|
|
|
|
# Explicit UTC:
|
|
self.message.send_at = datetime(2016, 3, 4, 5, 6, 7, tzinfo=timezone.utc)
|
|
self.message.send()
|
|
data = self.get_api_call_json()
|
|
self.assertEqual(data["options"]["start_time"], "2016-03-04T05:06:07+00:00")
|
|
|
|
# Timezone-naive datetime assumed to be Django current_timezone
|
|
# (also checks stripping microseconds)
|
|
self.message.send_at = datetime(2022, 10, 11, 12, 13, 14, 567)
|
|
self.message.send()
|
|
data = self.get_api_call_json()
|
|
self.assertEqual(data["options"]["start_time"], "2022-10-11T12:13:14+06:00")
|
|
|
|
# Date-only treated as midnight in current timezone
|
|
self.message.send_at = date(2022, 10, 22)
|
|
self.message.send()
|
|
data = self.get_api_call_json()
|
|
self.assertEqual(data["options"]["start_time"], "2022-10-22T00:00:00+06:00")
|
|
|
|
# POSIX timestamp
|
|
self.message.send_at = 1651820889 # 2022-05-06 07:08:09 UTC
|
|
self.message.send()
|
|
data = self.get_api_call_json()
|
|
self.assertEqual(data["options"]["start_time"], "2022-05-06T07:08:09+00:00")
|
|
|
|
# String passed unchanged (this is *not* portable between ESPs)
|
|
self.message.send_at = "2022-10-13T18:02:00-11:30"
|
|
self.message.send()
|
|
data = self.get_api_call_json()
|
|
self.assertEqual(data["options"]["start_time"], "2022-10-13T18:02:00-11:30")
|
|
|
|
def test_tags(self):
|
|
self.message.tags = ["receipt"]
|
|
self.message.send()
|
|
data = self.get_api_call_json()
|
|
self.assertEqual(data["campaign_id"], "receipt")
|
|
|
|
self.message.tags = ["receipt", "repeat-user"]
|
|
with self.assertRaisesMessage(AnymailUnsupportedFeature, "multiple tags"):
|
|
self.message.send()
|
|
|
|
def test_tracking(self):
|
|
# Test one way...
|
|
self.message.track_opens = True
|
|
self.message.track_clicks = False
|
|
self.message.send()
|
|
data = self.get_api_call_json()
|
|
self.assertEqual(data["options"]["open_tracking"], True)
|
|
self.assertEqual(data["options"]["click_tracking"], False)
|
|
|
|
# ...and the opposite way
|
|
self.message.track_opens = False
|
|
self.message.track_clicks = True
|
|
self.message.send()
|
|
data = self.get_api_call_json()
|
|
self.assertEqual(data["options"]["open_tracking"], False)
|
|
self.assertEqual(data["options"]["click_tracking"], True)
|
|
|
|
def test_template_id(self):
|
|
message = mail.EmailMultiAlternatives(to=["to@example.com"])
|
|
message.from_email = None
|
|
message.template_id = "welcome_template"
|
|
message.send()
|
|
data = self.get_api_call_json()
|
|
self.assertEqual(data["content"]["template_id"], "welcome_template")
|
|
# SparkPost disallows all content (even empty strings) with stored template:
|
|
self.assertNotIn("subject", data["content"])
|
|
self.assertNotIn("text", data["content"])
|
|
self.assertNotIn("html", data["content"])
|
|
self.assertNotIn("from", data["content"])
|
|
|
|
def test_template_id_ignores_default_from_email(self):
|
|
# No from_email, or from_email=None in constructor,
|
|
# uses settings.DEFAULT_FROM_EMAIL. We strip that with a template_id:
|
|
message = AnymailMessage(to=["to@example.com"], template_id="welcome_template")
|
|
self.assertIsNotNone(message.from_email) # because it's DEFAULT_FROM_EMAIL
|
|
message.send()
|
|
data = self.get_api_call_json()
|
|
self.assertNotIn("from", data["content"])
|
|
|
|
def test_unsupported_content_with_template_id(self):
|
|
# Make sure we raise an error for options that SparkPost
|
|
# silently ignores when sending with a template_id
|
|
message = AnymailMessage(
|
|
to=["to@example.com"],
|
|
from_email="non-default-from@example.com",
|
|
reply_to=["reply@example.com"],
|
|
headers={"X-Custom": "header"},
|
|
template_id="welcome_template",
|
|
)
|
|
message.attach(filename="test.txt", content="attachment", mimetype="text/plain")
|
|
with self.assertRaisesMessage(
|
|
AnymailUnsupportedFeature,
|
|
"template_id with attachments, extra headers and/or cc recipients,"
|
|
" from_email, reply_to",
|
|
):
|
|
message.send()
|
|
|
|
def test_merge_data(self):
|
|
self.set_mock_result(accepted=4) # two 'to' plus one 'cc' for each 'to'
|
|
self.message.to = ["alice@example.com", "Bob <bob@example.com>"]
|
|
self.message.cc = ["cc@example.com"]
|
|
self.message.body = "Hi {{address.name}}. Welcome to {{group}} at {{site}}."
|
|
self.message.merge_data = {
|
|
"alice@example.com": {"name": "Alice", "group": "Developers"},
|
|
"bob@example.com": {"name": "Bob"}, # and leave group undefined
|
|
"nobody@example.com": {"name": "Not a recipient for this message"},
|
|
}
|
|
self.message.merge_global_data = {"group": "Users", "site": "ExampleCo"}
|
|
|
|
self.message.send()
|
|
data = self.get_api_call_json()
|
|
self.assertEqual(
|
|
{"group": "Users", "site": "ExampleCo"}, data["substitution_data"]
|
|
)
|
|
self.assertEqual(
|
|
[
|
|
{
|
|
"address": {
|
|
"email": "alice@example.com",
|
|
"header_to": "alice@example.com",
|
|
},
|
|
"substitution_data": {"name": "Alice", "group": "Developers"},
|
|
},
|
|
{
|
|
"address": {
|
|
"email": "bob@example.com",
|
|
"header_to": "Bob <bob@example.com>",
|
|
},
|
|
"substitution_data": {"name": "Bob"},
|
|
},
|
|
{ # duplicated for cc recipients...
|
|
"address": {
|
|
"email": "cc@example.com",
|
|
"header_to": "alice@example.com",
|
|
},
|
|
"substitution_data": {"name": "Alice", "group": "Developers"},
|
|
},
|
|
{
|
|
"address": {
|
|
"email": "cc@example.com",
|
|
"header_to": "Bob <bob@example.com>",
|
|
},
|
|
"substitution_data": {"name": "Bob"},
|
|
},
|
|
],
|
|
data["recipients"],
|
|
)
|
|
|
|
def test_merge_metadata(self):
|
|
self.set_mock_result(accepted=2)
|
|
self.message.to = ["alice@example.com", "Bob <bob@example.com>"]
|
|
self.message.merge_metadata = {
|
|
"alice@example.com": {"order_id": 123},
|
|
"bob@example.com": {"order_id": 678, "tier": "premium"},
|
|
}
|
|
self.message.metadata = {"notification_batch": "zx912"}
|
|
|
|
self.message.send()
|
|
data = self.get_api_call_json()
|
|
self.assertEqual(
|
|
[
|
|
{
|
|
"address": {
|
|
"email": "alice@example.com",
|
|
"header_to": "alice@example.com",
|
|
},
|
|
"metadata": {"order_id": 123},
|
|
},
|
|
{
|
|
"address": {
|
|
"email": "bob@example.com",
|
|
"header_to": "Bob <bob@example.com>",
|
|
},
|
|
"metadata": {"order_id": 678, "tier": "premium"},
|
|
},
|
|
],
|
|
data["recipients"],
|
|
)
|
|
self.assertEqual(data["metadata"], {"notification_batch": "zx912"})
|
|
|
|
def test_merge_headers(self):
|
|
self.set_mock_result(accepted=2)
|
|
self.message.to = ["alice@example.com", "Bob <bob@example.com>"]
|
|
self.message.extra_headers = {
|
|
"X-Custom-1": "custom 1",
|
|
"X-Custom-2": "custom 2 (default)",
|
|
}
|
|
self.message.merge_headers = {
|
|
"alice@example.com": {
|
|
"X-Custom-2": "custom 2 alice",
|
|
"X-Custom-3": "custom 3 alice",
|
|
},
|
|
"bob@example.com": {"X-Custom-2": "custom 2 bob"},
|
|
}
|
|
|
|
self.message.send()
|
|
data = self.get_api_call_json()
|
|
recipients = data["recipients"]
|
|
self.assertEqual(len(recipients), 2)
|
|
self.assertEqual(recipients[0]["address"]["email"], "alice@example.com")
|
|
self.assertEqual(
|
|
recipients[0]["substitution_data"],
|
|
{
|
|
"Header__X_Custom_2": "custom 2 alice",
|
|
"Header__X_Custom_3": "custom 3 alice",
|
|
},
|
|
)
|
|
self.assertEqual(recipients[1]["address"]["email"], "bob@example.com")
|
|
self.assertEqual(
|
|
recipients[1]["substitution_data"],
|
|
{
|
|
"Header__X_Custom_2": "custom 2 bob",
|
|
},
|
|
)
|
|
# Indirect merge_headers through template substitutions:
|
|
self.assertEqual(
|
|
data["content"]["headers"],
|
|
{
|
|
"X-Custom-1": "custom 1", # (not a merge_header, value unchanged)
|
|
"X-Custom-2": "{{Header__X_Custom_2}}",
|
|
"X-Custom-3": "{{Header__X_Custom_3}}",
|
|
},
|
|
)
|
|
# Defaults for merge_headers in global substitution_data:
|
|
self.assertEqual(
|
|
data["substitution_data"],
|
|
{
|
|
"Header__X_Custom_2": "custom 2 (default)",
|
|
# No default specified for X-Custom-3; SparkPost will use empty string
|
|
},
|
|
)
|
|
|
|
def test_default_omits_options(self):
|
|
"""Make sure by default we don't send any ESP-specific options.
|
|
|
|
Options not specified by the caller should be omitted entirely from
|
|
the API call (*not* sent as False or empty). This ensures
|
|
that your ESP account settings apply by default.
|
|
"""
|
|
self.message.send()
|
|
data = self.get_api_call_json()
|
|
self.assertNotIn("campaign_id", data)
|
|
self.assertNotIn("metadata", data)
|
|
# covers start_time, click_tracking, open_tracking:
|
|
self.assertNotIn("options", data)
|
|
self.assertNotIn("substitution_data", data)
|
|
self.assertNotIn("template_id", data["content"])
|
|
|
|
def test_esp_extra(self):
|
|
self.message.esp_extra = {
|
|
"description": "The description",
|
|
"options": {
|
|
"transactional": True,
|
|
},
|
|
"content": {
|
|
"use_draft_template": True,
|
|
"ab_test_id": "highlight_support_links",
|
|
},
|
|
}
|
|
self.message.track_clicks = True
|
|
self.message.send()
|
|
data = self.get_api_call_json()
|
|
self.assertEqual(data["description"], "The description")
|
|
self.assertEqual(
|
|
data["options"],
|
|
{
|
|
"transactional": True,
|
|
"click_tracking": True, # deep merge
|
|
},
|
|
)
|
|
self.assertDictMatches(
|
|
{
|
|
"use_draft_template": True,
|
|
"ab_test_id": "highlight_support_links",
|
|
"text": "Text Body", # deep merge
|
|
"subject": "Subject", # deep merge
|
|
},
|
|
data["content"],
|
|
)
|
|
|
|
def test_send_attaches_anymail_status(self):
|
|
"""The anymail_status should be attached to the message when it is sent"""
|
|
response_content = self.set_mock_result(accepted=1, rejected=0, id="9876543210")
|
|
msg = mail.EmailMessage(
|
|
"Subject",
|
|
"Message",
|
|
"from@example.com",
|
|
["to1@example.com"],
|
|
)
|
|
sent = msg.send()
|
|
self.assertEqual(sent, 1)
|
|
self.assertEqual(msg.anymail_status.status, {"queued"})
|
|
self.assertEqual(msg.anymail_status.message_id, "9876543210")
|
|
self.assertEqual(
|
|
msg.anymail_status.recipients["to1@example.com"].status, "queued"
|
|
)
|
|
self.assertEqual(
|
|
msg.anymail_status.recipients["to1@example.com"].message_id, "9876543210"
|
|
)
|
|
self.assertEqual(msg.anymail_status.esp_response.content, response_content)
|
|
|
|
@override_settings(
|
|
ANYMAIL_IGNORE_RECIPIENT_STATUS=True # exception is tested later
|
|
)
|
|
def test_send_all_rejected(self):
|
|
"""The anymail_status should be 'rejected' when all recipients rejected"""
|
|
self.set_mock_result(accepted=0, rejected=2)
|
|
msg = mail.EmailMessage(
|
|
"Subject",
|
|
"Message",
|
|
"from@example.com",
|
|
["to1@example.com", "to2@example.com"],
|
|
)
|
|
msg.send()
|
|
self.assertEqual(msg.anymail_status.status, {"rejected"})
|
|
self.assertEqual(
|
|
msg.anymail_status.recipients["to1@example.com"].status, "rejected"
|
|
)
|
|
self.assertEqual(
|
|
msg.anymail_status.recipients["to2@example.com"].status, "rejected"
|
|
)
|
|
|
|
def test_send_some_rejected(self):
|
|
"""
|
|
The anymail_status should be 'unknown'
|
|
when some recipients accepted and some rejected
|
|
"""
|
|
self.set_mock_result(accepted=1, rejected=1)
|
|
msg = mail.EmailMessage(
|
|
"Subject",
|
|
"Message",
|
|
"from@example.com",
|
|
["to1@example.com", "to2@example.com"],
|
|
)
|
|
msg.send()
|
|
self.assertEqual(msg.anymail_status.status, {"unknown"})
|
|
self.assertEqual(
|
|
msg.anymail_status.recipients["to1@example.com"].status, "unknown"
|
|
)
|
|
self.assertEqual(
|
|
msg.anymail_status.recipients["to2@example.com"].status, "unknown"
|
|
)
|
|
|
|
def test_send_unexpected_count(self):
|
|
"""The anymail_status should be 'unknown' when the total result count
|
|
doesn't match the number of recipients"""
|
|
self.set_mock_result(accepted=3, rejected=0) # but only 2 in the to-list
|
|
msg = mail.EmailMessage(
|
|
"Subject",
|
|
"Message",
|
|
"from@example.com",
|
|
["to1@example.com", "to2@example.com"],
|
|
)
|
|
msg.send()
|
|
self.assertEqual(msg.anymail_status.status, {"unknown"})
|
|
self.assertEqual(
|
|
msg.anymail_status.recipients["to1@example.com"].status, "unknown"
|
|
)
|
|
self.assertEqual(
|
|
msg.anymail_status.recipients["to2@example.com"].status, "unknown"
|
|
)
|
|
|
|
# noinspection PyUnresolvedReferences
|
|
def test_send_failed_anymail_status(self):
|
|
"""If the send fails, anymail_status should contain initial values"""
|
|
self.set_mock_response(status_code=400)
|
|
sent = self.message.send(fail_silently=True)
|
|
self.assertEqual(sent, 0)
|
|
self.assertIsNone(self.message.anymail_status.status)
|
|
self.assertIsNone(self.message.anymail_status.message_id)
|
|
self.assertEqual(self.message.anymail_status.recipients, {})
|
|
self.assertIsNone(self.message.anymail_status.esp_response)
|
|
|
|
# noinspection PyUnresolvedReferences
|
|
def test_send_unparsable_response(self):
|
|
"""
|
|
If the send succeeds, but result is unexpected format,
|
|
should raise an API exception
|
|
"""
|
|
response_content = b"""{"wrong": "format"}"""
|
|
self.set_mock_response(raw=response_content)
|
|
with self.assertRaises(AnymailAPIError):
|
|
self.message.send()
|
|
self.assertIsNone(self.message.anymail_status.status)
|
|
self.assertIsNone(self.message.anymail_status.message_id)
|
|
self.assertEqual(self.message.anymail_status.recipients, {})
|
|
self.assertEqual(
|
|
self.message.anymail_status.esp_response.content, response_content
|
|
)
|
|
|
|
def test_json_serialization_errors(self):
|
|
"""Try to provide more information about non-json-serializable data"""
|
|
self.message.tags = [Decimal("19.99")] # yeah, don't do this
|
|
with self.assertRaises(AnymailSerializationError) as cm:
|
|
self.message.send()
|
|
print(self.get_api_call_json())
|
|
err = cm.exception
|
|
self.assertIsInstance(err, TypeError) # compatibility with json.dumps
|
|
# our added context:
|
|
self.assertIn("Don't know how to send this data to SparkPost", str(err))
|
|
# original message:
|
|
self.assertRegex(str(err), r"Decimal.*is not JSON serializable")
|
|
|
|
|
|
@tag("sparkpost")
|
|
class SparkPostBackendRecipientsRefusedTests(SparkPostBackendMockAPITestCase):
|
|
"""
|
|
Should raise AnymailRecipientsRefused when *all* recipients are rejected or invalid
|
|
"""
|
|
|
|
def test_recipients_refused(self):
|
|
self.set_mock_result(accepted=0, rejected=2)
|
|
msg = mail.EmailMessage(
|
|
"Subject",
|
|
"Body",
|
|
"from@example.com",
|
|
["invalid@localhost", "reject@example.com"],
|
|
)
|
|
with self.assertRaises(AnymailRecipientsRefused):
|
|
msg.send()
|
|
|
|
def test_fail_silently(self):
|
|
self.set_mock_result(accepted=0, rejected=2)
|
|
sent = mail.send_mail(
|
|
"Subject",
|
|
"Body",
|
|
"from@example.com",
|
|
["invalid@localhost", "reject@example.com"],
|
|
fail_silently=True,
|
|
)
|
|
self.assertEqual(sent, 0)
|
|
|
|
def test_mixed_response(self):
|
|
"""If *any* recipients are valid or queued, no exception is raised"""
|
|
self.set_mock_result(accepted=2, rejected=2)
|
|
msg = mail.EmailMessage(
|
|
"Subject",
|
|
"Body",
|
|
"from@example.com",
|
|
[
|
|
"invalid@localhost",
|
|
"valid@example.com",
|
|
"reject@example.com",
|
|
"also.valid@example.com",
|
|
],
|
|
)
|
|
sent = msg.send()
|
|
# one message sent, successfully, to 2 of 4 recipients:
|
|
self.assertEqual(sent, 1)
|
|
status = msg.anymail_status
|
|
# We don't know which recipients were rejected
|
|
self.assertEqual(status.recipients["invalid@localhost"].status, "unknown")
|
|
self.assertEqual(status.recipients["valid@example.com"].status, "unknown")
|
|
self.assertEqual(status.recipients["reject@example.com"].status, "unknown")
|
|
self.assertEqual(status.recipients["also.valid@example.com"].status, "unknown")
|
|
|
|
@override_settings(ANYMAIL_IGNORE_RECIPIENT_STATUS=True)
|
|
def test_settings_override(self):
|
|
"""No exception with ignore setting"""
|
|
self.set_mock_result(accepted=0, rejected=2)
|
|
sent = mail.send_mail(
|
|
"Subject",
|
|
"Body",
|
|
"from@example.com",
|
|
["invalid@localhost", "reject@example.com"],
|
|
)
|
|
self.assertEqual(sent, 1) # refused message is included in sent count
|
|
|
|
|
|
@tag("sparkpost")
|
|
class SparkPostBackendConfigurationTests(SparkPostBackendMockAPITestCase):
|
|
"""Test various SparkPost client options"""
|
|
|
|
@override_settings(
|
|
# clear SPARKPOST_API_KEY from SparkPostBackendMockAPITestCase:
|
|
ANYMAIL={}
|
|
)
|
|
def test_missing_api_key(self):
|
|
with self.assertRaises(AnymailConfigurationError) as cm:
|
|
mail.send_mail("Subject", "Message", "from@example.com", ["to@example.com"])
|
|
errmsg = str(cm.exception)
|
|
# Make sure the error mentions the different places to set the key
|
|
self.assertRegex(errmsg, r"\bSPARKPOST_API_KEY\b")
|
|
self.assertRegex(errmsg, r"\bANYMAIL_SPARKPOST_API_KEY\b")
|
|
|
|
@override_settings(
|
|
ANYMAIL={
|
|
"SPARKPOST_API_URL": "https://api.eu.sparkpost.com/api/v1",
|
|
"SPARKPOST_API_KEY": "test_api_key",
|
|
}
|
|
)
|
|
def test_sparkpost_api_url(self):
|
|
mail.send_mail("Subject", "Message", "from@example.com", ["to@example.com"])
|
|
self.assert_esp_called("https://api.eu.sparkpost.com/api/v1/transmissions/")
|
|
|
|
# can also override on individual connection
|
|
# (and even use non-versioned labs endpoint)
|
|
connection = mail.get_connection(api_url="https://api.sparkpost.com/api/labs")
|
|
mail.send_mail(
|
|
"Subject",
|
|
"Message",
|
|
"from@example.com",
|
|
["to@example.com"],
|
|
connection=connection,
|
|
)
|
|
self.assert_esp_called("https://api.sparkpost.com/api/labs/transmissions/")
|
|
|
|
def test_subaccount(self):
|
|
# A likely use case is supplying subaccount for a particular message.
|
|
# (For all messages, just set SPARKPOST_SUBACCOUNT in ANYMAIL settings.)
|
|
connection = mail.get_connection(subaccount=123)
|
|
mail.send_mail(
|
|
"Subject",
|
|
"Message",
|
|
"from@example.com",
|
|
["to@example.com"],
|
|
connection=connection,
|
|
)
|
|
headers = self.get_api_call_headers()
|
|
self.assertEqual(headers["X-MSYS-SUBACCOUNT"], 123)
|
|
|
|
# Make sure we're not setting the header on non-subaccount sends
|
|
mail.send_mail("Subject", "Message", "from@example.com", ["to@example.com"])
|
|
headers = self.get_api_call_headers()
|
|
self.assertNotIn("X-MSYS-SUBACCOUNT", headers)
|