import json
from datetime import date, datetime, timezone
from decimal import Decimal
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email.mime.text import MIMEText

from django.core import mail
from django.test import override_settings, tag
from django.utils.timezone import (
    get_fixed_timezone,
    override as override_current_timezone,
)

from anymail.exceptions import (
    AnymailAPIError,
    AnymailConfigurationError,
    AnymailRecipientsRefused,
    AnymailSerializationError,
    AnymailUnsupportedFeature,
)
from anymail.message import attach_inline_image_file

from .mock_requests_backend import RequestsBackendMockAPITestCase
from .utils import (
    SAMPLE_IMAGE_FILENAME,
    decode_att,
    sample_image_content,
    sample_image_path,
)


@tag("sparkpost")
@override_settings(
    EMAIL_BACKEND="anymail.backends.sparkpost.EmailBackend",
    ANYMAIL={"SPARKPOST_API_KEY": "test_api_key"},
)
class SparkPostBackendMockAPITestCase(RequestsBackendMockAPITestCase):
    """TestCase that uses SparkPostEmailBackend with a mocked transmissions.send API"""

    DEFAULT_RAW_RESPONSE = b"""{
        "results": {
            "id": "12345678901234567890",
            "total_accepted_recipients": 1,
            "total_rejected_recipients": 0
        }
    }"""

    def setUp(self):
        """Create ``self.message``, a minimal one-recipient message."""
        super().setUp()
        # Simple message useful for many tests
        self.message = mail.EmailMultiAlternatives(
            "Subject", "Text Body", "from@example.com", ["to@example.com"]
        )

    def set_mock_result(self, accepted=1, rejected=0, id="12345678901234567890"):
        """Set a mock response that reflects count of accepted/rejected recipients

        Returns the raw (utf-8 encoded JSON) response body that was installed,
        so callers can compare it with what the backend reports.
        """
        raw = json.dumps(
            {
                "results": {
                    "id": id,
                    "total_accepted_recipients": accepted,
                    "total_rejected_recipients": rejected,
                }
            }
        ).encode("utf-8")
        self.set_mock_response(raw=raw)
        return raw


@tag("sparkpost")
class SparkPostBackendStandardEmailTests(SparkPostBackendMockAPITestCase):
    """Test backend support for Django standard email features"""

    def test_send_mail(self):
        """Test basic API for simple send"""
        mail.send_mail(
            "Subject here",
            "Here is the message.",
            "from@example.com",
            ["to@example.com"],
            fail_silently=False,
        )
        self.assert_esp_called("/api/v1/transmissions/")

        headers = self.get_api_call_headers()
        self.assertEqual("test_api_key", headers["Authorization"])

        data = self.get_api_call_json()
        self.assertEqual(data["content"]["subject"], "Subject here")
        self.assertEqual(data["content"]["text"], "Here is the message.")
        self.assertEqual(data["content"]["from"], "from@example.com")
        self.assertEqual(
            data["recipients"],
            [{"address": {"email": "to@example.com", "header_to": "to@example.com"}}],
        )

    def test_name_addr(self):
        """Make sure RFC2822 name-addr format (with display-name) is allowed

        (Test both sender and recipient addresses)
        """
        self.set_mock_result(accepted=6)
        msg = mail.EmailMessage(
            "Subject",
            "Message",
            "From Name <from@example.com>",
            ["Recipient #1 <to1@example.com>", "to2@example.com"],
            cc=["Carbon Copy <cc1@example.com>", "cc2@example.com"],
            bcc=["Blind Copy <bcc1@example.com>", "bcc2@example.com"],
        )
        msg.send()

        data = self.get_api_call_json()
        self.assertEqual(data["content"]["from"], "From Name <from@example.com>")

        # This also checks recipient generation for cc and bcc. Because it's *not* a
        # batch send, each recipient should see a To header reflecting all To addresses.
        expected_header_to = "Recipient #1 <to1@example.com>, to2@example.com"
        self.assertCountEqual(
            data["recipients"],
            [
                {
                    "address": {
                        "email": "to1@example.com",
                        "header_to": expected_header_to,
                    }
                },
                {
                    "address": {
                        "email": "to2@example.com",
                        "header_to": expected_header_to,
                    }
                },
                # cc and bcc must be explicitly specified as recipients
                {
                    "address": {
                        "email": "cc1@example.com",
                        "header_to": expected_header_to,
                    }
                },
                {
                    "address": {
                        "email": "cc2@example.com",
                        "header_to": expected_header_to,
                    }
                },
                {
                    "address": {
                        "email": "bcc1@example.com",
                        "header_to": expected_header_to,
                    }
                },
                {
                    "address": {
                        "email": "bcc2@example.com",
                        "header_to": expected_header_to,
                    }
                },
            ],
        )

        # Make sure we added a formatted Cc header visible to recipients
        # (and not a Bcc header)
        self.assertEqual(
            data["content"]["headers"],
            {"Cc": "Carbon Copy <cc1@example.com>, cc2@example.com"},
        )

    def test_custom_headers(self):
        """Extra headers are passed through; Reply-To becomes its own param"""
        self.set_mock_result(accepted=6)
        email = mail.EmailMessage(
            "Subject",
            "Body goes here",
            "from@example.com",
            ["to1@example.com"],
            cc=["cc1@example.com"],
            headers={
                "Reply-To": "another@example.com",
                "X-MyHeader": "my value",
                "Message-ID": "mycustommsgid@example.com",
            },
        )
        email.send()

        data = self.get_api_call_json()
        self.assertEqual(
            data["content"]["headers"],
            {
                # Reply-To moved to separate param (below)
                "X-MyHeader": "my value",
                "Message-ID": "mycustommsgid@example.com",
                "Cc": "cc1@example.com",  # Cc header added
            },
        )
        self.assertEqual(data["content"]["reply_to"], "another@example.com")

    def test_html_message(self):
        """A text body plus an html alternative map to content text and html"""
        text_content = "This is an important message."
        html_content = "<p>This is an <strong>important</strong> message.</p>"
        email = mail.EmailMultiAlternatives(
            "Subject", text_content, "from@example.com", ["to@example.com"]
        )
        email.attach_alternative(html_content, "text/html")
        email.send()

        data = self.get_api_call_json()
        self.assertEqual(data["content"]["text"], text_content)
        self.assertEqual(data["content"]["html"], html_content)
        # Don't accidentally send the html part as an attachment:
        self.assertNotIn("attachments", data["content"])

    def test_html_only_message(self):
        """A message whose main content is html sends no text part"""
        html_content = "<p>This is an <strong>important</strong> message.</p>"
        email = mail.EmailMessage(
            "Subject", html_content, "from@example.com", ["to@example.com"]
        )
        email.content_subtype = "html"  # Main content is now text/html
        email.send()

        data = self.get_api_call_json()
        self.assertNotIn("text", data["content"])
        self.assertEqual(data["content"]["html"], html_content)

    def test_reply_to(self):
        """Multiple reply_to addresses are joined into content reply_to"""
        email = mail.EmailMessage(
            "Subject",
            "Body goes here",
            "from@example.com",
            ["to1@example.com"],
            reply_to=["reply@example.com", "Other <reply2@example.com>"],
            headers={"X-Other": "Keep"},
        )
        email.send()

        data = self.get_api_call_json()
        self.assertEqual(
            data["content"]["reply_to"],
            "reply@example.com, Other <reply2@example.com>",
        )
        # don't lose other headers:
        self.assertEqual(data["content"]["headers"], {"X-Other": "Keep"})

    def test_attachments(self):
        """Text, binary and MIMEBase attachments are all sent as attachments"""
        text_content = "* Item one\n* Item two\n* Item three"
        self.message.attach(
            filename="test.txt", content=text_content, mimetype="text/plain"
        )

        # Should guess mimetype if not provided...
        png_content = b"PNG\xb4 pretend this is the contents of a png file"
        self.message.attach(filename="test.png", content=png_content)

        # Should work with a MIMEBase object (also tests no filename)...
        pdf_content = b"PDF\xb4 pretend this is valid pdf params"
        mimeattachment = MIMEBase("application", "pdf")
        mimeattachment.set_payload(pdf_content)
        self.message.attach(mimeattachment)

        self.message.send()

        data = self.get_api_call_json()
        attachments = data["content"]["attachments"]
        self.assertEqual(len(attachments), 3)

        self.assertEqual(attachments[0]["type"], "text/plain")
        self.assertEqual(attachments[0]["name"], "test.txt")
        self.assertEqual(
            decode_att(attachments[0]["data"]).decode("ascii"), text_content
        )

        self.assertEqual(attachments[1]["type"], "image/png")  # inferred from filename
        self.assertEqual(attachments[1]["name"], "test.png")
        self.assertEqual(decode_att(attachments[1]["data"]), png_content)

        self.assertEqual(attachments[2]["type"], "application/pdf")
        self.assertEqual(attachments[2]["name"], "")  # none
        self.assertEqual(decode_att(attachments[2]["data"]), pdf_content)

        # Make sure the image attachment is not treated as embedded:
        self.assertNotIn("inline_images", data["content"])

    def test_unicode_attachment_correctly_decoded(self):
        """A non-ASCII text attachment is sent as a single attachment"""
        # Slight modification from the Django unicode docs:
        # http://django.readthedocs.org/en/latest/ref/unicode.html#email
        self.message.attach(
            "Une pièce jointe.html", "<p>\u2019</p>", mimetype="text/html"
        )
        self.message.send()

        data = self.get_api_call_json()
        attachments = data["content"]["attachments"]
        self.assertEqual(len(attachments), 1)

    def test_attachment_charset(self):
        """The attachment's charset is carried in its type param"""
        # SparkPost allows charset param in attachment type
        self.message.attach(MIMEText("Une pièce jointe", "plain", "iso8859-1"))
        self.message.send()

        data = self.get_api_call_json()
        attachment = data["content"]["attachments"][0]
        self.assertEqual(attachment["type"], 'text/plain; charset="iso8859-1"')
        self.assertEqual(
            decode_att(attachment["data"]), "Une pièce jointe".encode("iso8859-1")
        )

    def test_embedded_images(self):
        """An inline image goes to inline_images, named by its content id"""
        image_filename = SAMPLE_IMAGE_FILENAME
        image_path = sample_image_path(image_filename)
        image_data = sample_image_content(image_filename)

        cid = attach_inline_image_file(self.message, image_path)
        html_content = (
            '<p>This has an <img src="cid:%s" alt="inline" /> image.</p>' % cid
        )
        self.message.attach_alternative(html_content, "text/html")

        self.message.send()

        data = self.get_api_call_json()
        self.assertEqual(data["content"]["html"], html_content)

        inline_images = data["content"]["inline_images"]
        self.assertEqual(len(inline_images), 1)
        self.assertEqual(inline_images[0]["type"], "image/png")
        self.assertEqual(inline_images[0]["name"], cid)
        self.assertEqual(decode_att(inline_images[0]["data"]), image_data)

        # Make sure neither the html nor the inline image is treated as an attachment:
        self.assertNotIn("attachments", data["content"])

    def test_attached_images(self):
        """Images attached normally (not inline) are sent as attachments"""
        image_filename = SAMPLE_IMAGE_FILENAME
        image_path = sample_image_path(image_filename)
        image_data = sample_image_content(image_filename)

        # option 1: attach as a file
        self.message.attach_file(image_path)

        # option 2: construct the MIMEImage and attach it directly
        image = MIMEImage(image_data)
        self.message.attach(image)

        self.message.send()

        data = self.get_api_call_json()
        attachments = data["content"]["attachments"]
        self.assertEqual(len(attachments), 2)

        self.assertEqual(attachments[0]["type"], "image/png")
        self.assertEqual(attachments[0]["name"], image_filename)
        self.assertEqual(decode_att(attachments[0]["data"]), image_data)

        self.assertEqual(attachments[1]["type"], "image/png")
        self.assertEqual(attachments[1]["name"], "")  # unknown -- not attached as file
        self.assertEqual(decode_att(attachments[1]["data"]), image_data)

        # Make sure the image attachments are not treated as embedded:
        self.assertNotIn("inline_images", data["content"])

    def test_multiple_html_alternatives(self):
        """A second text/html alternative raises AnymailUnsupportedFeature"""
        # Multiple text/html alternatives not allowed
        self.message.attach_alternative("<p>First html is OK</p>", "text/html")
        self.message.attach_alternative("<p>But not second html</p>", "text/html")
        with self.assertRaises(AnymailUnsupportedFeature):
            self.message.send()

    def test_amp_html_alternative(self):
        """A text/x-amp-html alternative is sent as content amp_html"""
        # SparkPost *does* support text/x-amp-html alongside text/html
        self.message.attach_alternative("<p>HTML</p>", "text/html")
        self.message.attach_alternative("<p>And AMP HTML</p>", "text/x-amp-html")
        self.message.send()

        data = self.get_api_call_json()
        self.assertEqual(data["content"]["html"], "<p>HTML</p>")
        self.assertEqual(data["content"]["amp_html"], "<p>And AMP HTML</p>")

    def test_html_alternative(self):
        """A non-html alternative raises AnymailUnsupportedFeature"""
        # Only html alternatives allowed
        self.message.attach_alternative("{'not': 'allowed'}", "application/json")
        with self.assertRaises(AnymailUnsupportedFeature):
            self.message.send()

    def test_alternatives_fail_silently(self):
        """An unsupported alternative with fail_silently sends nothing"""
        # Make sure fail_silently is respected
        self.message.attach_alternative("{'not': 'allowed'}", "application/json")
        sent = self.message.send(fail_silently=True)
        self.assert_esp_not_called("API should not be called when send fails silently")
        self.assertEqual(sent, 0)

    def test_suppress_empty_address_lists(self):
        """Empty to, cc, bcc, and reply_to shouldn't generate empty headers"""
        self.message.send()

        data = self.get_api_call_json()
        self.assertNotIn("headers", data["content"])  # No Cc, Bcc or Reply-To header
        self.assertNotIn("reply_to", data["content"])

    def test_empty_to(self):
        """A cc-only message gets an empty header_to for its recipient"""
        # Test empty `to`--but send requires at least one recipient somewhere (like cc)
        self.message.to = []
        self.message.cc = ["cc@example.com"]
        self.message.send()

        data = self.get_api_call_json()
        self.assertEqual(
            data["recipients"],
            [
                {
                    "address": {
                        "email": "cc@example.com",
                        # This results in a message with an empty To header, as desired:
                        "header_to": "",
                    },
                }
            ],
        )

    def test_api_failure(self):
        """A 400 response raises AnymailAPIError mentioning the status"""
        self.set_mock_response(status_code=400)
        with self.assertRaisesMessage(AnymailAPIError, "SparkPost API response 400"):
            self.message.send()

    def test_api_failure_fail_silently(self):
        """A 400 response with fail_silently reports zero messages sent"""
        # Make sure fail_silently is respected
        self.set_mock_response(status_code=400)
        sent = self.message.send(fail_silently=True)
        self.assertEqual(sent, 0)

    def test_api_error_includes_details(self):
        """AnymailAPIError should include ESP's error message"""
        failure_response = b"""{
            "errors": [{
                "message": "Helpful explanation from your ESP"
            }]
        }"""
        self.set_mock_response(status_code=400, raw=failure_response)
        with self.assertRaisesMessage(
            AnymailAPIError, "Helpful explanation from your ESP"
        ):
            self.message.send()


@tag("sparkpost")
class SparkPostBackendAnymailFeatureTests(SparkPostBackendMockAPITestCase):
    """Test backend support for Anymail added features"""

    def test_envelope_sender(self):
        """envelope_sender is sent as the top-level return_path"""
        self.message.envelope_sender = "bounce-handler@bounces.example.com"
        self.message.send()

        data = self.get_api_call_json()
        self.assertEqual(data["return_path"], "bounce-handler@bounces.example.com")

    def test_metadata(self):
        """metadata is sent unchanged as the top-level metadata"""
        self.message.metadata = {"user_id": "12345", "items": "spark, post"}
        self.message.send()

        data = self.get_api_call_json()
        self.assertEqual(data["metadata"], {"user_id": "12345", "items": "spark, post"})

    def test_send_at(self):
        """send_at values of several types become options start_time"""
        utc_plus_6 = get_fixed_timezone(6 * 60)
        utc_minus_8 = get_fixed_timezone(-8 * 60)

        # SparkPost expects ISO-8601 YYYY-MM-DDTHH:MM:SS+-HH:MM
        with override_current_timezone(utc_plus_6):
            # Timezone-aware datetime is sent with its own UTC offset:
            self.message.send_at = datetime(2016, 3, 4, 5, 6, 7, tzinfo=utc_minus_8)
            self.message.send()
            data = self.get_api_call_json()
            self.assertEqual(data["options"]["start_time"], "2016-03-04T05:06:07-08:00")

            # Explicit UTC:
            self.message.send_at = datetime(2016, 3, 4, 5, 6, 7, tzinfo=timezone.utc)
            self.message.send()
            data = self.get_api_call_json()
            self.assertEqual(data["options"]["start_time"], "2016-03-04T05:06:07+00:00")

            # Timezone-naive datetime assumed to be Django current_timezone
            # (also checks stripping microseconds)
            self.message.send_at = datetime(2022, 10, 11, 12, 13, 14, 567)
            self.message.send()
            data = self.get_api_call_json()
            self.assertEqual(data["options"]["start_time"], "2022-10-11T12:13:14+06:00")

            # Date-only treated as midnight in current timezone
            self.message.send_at = date(2022, 10, 22)
            self.message.send()
            data = self.get_api_call_json()
            self.assertEqual(data["options"]["start_time"], "2022-10-22T00:00:00+06:00")

            # POSIX timestamp
            self.message.send_at = 1651820889  # 2022-05-06 07:08:09 UTC
            self.message.send()
            data = self.get_api_call_json()
            self.assertEqual(data["options"]["start_time"], "2022-05-06T07:08:09+00:00")

            # String passed unchanged (this is *not* portable between ESPs)
            self.message.send_at = "2022-10-13T18:02:00-11:30"
            self.message.send()
            data = self.get_api_call_json()
            self.assertEqual(data["options"]["start_time"], "2022-10-13T18:02:00-11:30")

    def test_tags(self):
        """One tag becomes campaign_id; more than one tag is unsupported"""
        self.message.tags = ["receipt"]
        self.message.send()
        data = self.get_api_call_json()
        self.assertEqual(data["campaign_id"], "receipt")

        self.message.tags = ["receipt", "repeat-user"]
        with self.assertRaisesMessage(AnymailUnsupportedFeature, "multiple tags"):
            self.message.send()

    def test_tracking(self):
        """track_opens and track_clicks map to open/click tracking options"""
        # Test one way...
        self.message.track_opens = True
        self.message.track_clicks = False
        self.message.send()
        data = self.get_api_call_json()
        self.assertEqual(data["options"]["open_tracking"], True)
        self.assertEqual(data["options"]["click_tracking"], False)

        # ...and the opposite way
        self.message.track_opens = False
        self.message.track_clicks = True
        self.message.send()
        data = self.get_api_call_json()
        self.assertEqual(data["options"]["open_tracking"], False)
        self.assertEqual(data["options"]["click_tracking"], True)

    def test_template_id(self):
        """A stored template is referenced by id, with no inline content"""
        message = mail.EmailMultiAlternatives(
            from_email="from@example.com", to=["to@example.com"]
        )
        message.template_id = "welcome_template"
        message.send()

        data = self.get_api_call_json()
        self.assertEqual(data["content"]["template_id"], "welcome_template")
        # SparkPost disallows all content (even empty strings) with stored template:
        self.assertNotIn("subject", data["content"])
        self.assertNotIn("text", data["content"])
        self.assertNotIn("html", data["content"])

    def test_merge_data(self):
        """merge_data becomes per-recipient substitution_data (batch send)"""
        self.set_mock_result(accepted=4)  # two 'to' plus one 'cc' for each 'to'
        self.message.to = ["alice@example.com", "Bob <bob@example.com>"]
        self.message.cc = ["cc@example.com"]
        self.message.body = "Hi {{address.name}}. Welcome to {{group}} at {{site}}."
        self.message.merge_data = {
            "alice@example.com": {"name": "Alice", "group": "Developers"},
            "bob@example.com": {"name": "Bob"},  # and leave group undefined
            "nobody@example.com": {"name": "Not a recipient for this message"},
        }
        self.message.merge_global_data = {"group": "Users", "site": "ExampleCo"}

        self.message.send()

        data = self.get_api_call_json()
        self.assertEqual(
            {"group": "Users", "site": "ExampleCo"}, data["substitution_data"]
        )
        self.assertEqual(
            [
                {
                    "address": {
                        "email": "alice@example.com",
                        "header_to": "alice@example.com",
                    },
                    "substitution_data": {"name": "Alice", "group": "Developers"},
                },
                {
                    "address": {
                        "email": "bob@example.com",
                        "header_to": "Bob <bob@example.com>",
                    },
                    "substitution_data": {"name": "Bob"},
                },
                {
                    # duplicated for cc recipients...
                    "address": {
                        "email": "cc@example.com",
                        "header_to": "alice@example.com",
                    },
                    "substitution_data": {"name": "Alice", "group": "Developers"},
                },
                {
                    "address": {
                        "email": "cc@example.com",
                        "header_to": "Bob <bob@example.com>",
                    },
                    "substitution_data": {"name": "Bob"},
                },
            ],
            data["recipients"],
        )

    def test_merge_metadata(self):
        """merge_metadata becomes per-recipient metadata"""
        self.set_mock_result(accepted=2)
        self.message.to = ["alice@example.com", "Bob <bob@example.com>"]
        self.message.merge_metadata = {
            "alice@example.com": {"order_id": 123},
            "bob@example.com": {"order_id": 678, "tier": "premium"},
        }
        self.message.metadata = {"notification_batch": "zx912"}

        self.message.send()

        data = self.get_api_call_json()
        self.assertEqual(
            [
                {
                    "address": {
                        "email": "alice@example.com",
                        "header_to": "alice@example.com",
                    },
                    "metadata": {"order_id": 123},
                },
                {
                    "address": {
                        "email": "bob@example.com",
                        "header_to": "Bob <bob@example.com>",
                    },
                    "metadata": {"order_id": 678, "tier": "premium"},
                },
            ],
            data["recipients"],
        )
        self.assertEqual(data["metadata"], {"notification_batch": "zx912"})

    def test_merge_headers(self):
        """merge_headers are sent as header substitution placeholders"""
        self.set_mock_result(accepted=2)
        self.message.to = ["alice@example.com", "Bob <bob@example.com>"]
        self.message.extra_headers = {
            "X-Custom-1": "custom 1",
            "X-Custom-2": "custom 2 (default)",
        }
        self.message.merge_headers = {
            "alice@example.com": {
                "X-Custom-2": "custom 2 alice",
                "X-Custom-3": "custom 3 alice",
            },
            "bob@example.com": {"X-Custom-2": "custom 2 bob"},
        }

        self.message.send()

        data = self.get_api_call_json()
        recipients = data["recipients"]
        self.assertEqual(len(recipients), 2)
        self.assertEqual(recipients[0]["address"]["email"], "alice@example.com")
        self.assertEqual(
            recipients[0]["substitution_data"],
            {
                "Header__X_Custom_2": "custom 2 alice",
                "Header__X_Custom_3": "custom 3 alice",
            },
        )
        self.assertEqual(recipients[1]["address"]["email"], "bob@example.com")
        self.assertEqual(
            recipients[1]["substitution_data"],
            {
                "Header__X_Custom_2": "custom 2 bob",
            },
        )
        # Indirect merge_headers through template substitutions:
        self.assertEqual(
            data["content"]["headers"],
            {
                "X-Custom-1": "custom 1",  # (not a merge_header, value unchanged)
                "X-Custom-2": "{{Header__X_Custom_2}}",
                "X-Custom-3": "{{Header__X_Custom_3}}",
            },
        )
        # Defaults for merge_headers in global substitution_data:
        self.assertEqual(
            data["substitution_data"],
            {
                "Header__X_Custom_2": "custom 2 (default)",
                # No default specified for X-Custom-3; SparkPost will use empty string
            },
        )

    def test_default_omits_options(self):
        """Make sure by default we don't send any ESP-specific options.

        Options not specified by the caller should be omitted entirely from
        the API call (*not* sent as False or empty). This ensures
        that your ESP account settings apply by default.
        """
        self.message.send()
        data = self.get_api_call_json()
        self.assertNotIn("campaign_id", data)
        self.assertNotIn("metadata", data)
        # covers start_time, click_tracking, open_tracking:
        self.assertNotIn("options", data)
        self.assertNotIn("substitution_data", data)
        self.assertNotIn("template_id", data["content"])

    def test_esp_extra(self):
        """esp_extra is deep-merged into the generated API payload"""
        self.message.esp_extra = {
            "description": "The description",
            "options": {
                "transactional": True,
            },
            "content": {
                "use_draft_template": True,
                "ab_test_id": "highlight_support_links",
            },
        }
        self.message.track_clicks = True
        self.message.send()

        data = self.get_api_call_json()
        self.assertEqual(data["description"], "The description")
        self.assertEqual(
            data["options"],
            {
                "transactional": True,
                "click_tracking": True,  # deep merge
            },
        )
        self.assertDictMatches(
            {
                "use_draft_template": True,
                "ab_test_id": "highlight_support_links",
                "text": "Text Body",  # deep merge
                "subject": "Subject",  # deep merge
            },
            data["content"],
        )

    def test_send_attaches_anymail_status(self):
        """The anymail_status should be attached to the message when it is sent"""
        response_content = self.set_mock_result(accepted=1, rejected=0, id="9876543210")
        msg = mail.EmailMessage(
            "Subject",
            "Message",
            "from@example.com",
            ["to1@example.com"],
        )
        sent = msg.send()

        self.assertEqual(sent, 1)
        self.assertEqual(msg.anymail_status.status, {"queued"})
        self.assertEqual(msg.anymail_status.message_id, "9876543210")
        self.assertEqual(
            msg.anymail_status.recipients["to1@example.com"].status, "queued"
        )
        self.assertEqual(
            msg.anymail_status.recipients["to1@example.com"].message_id, "9876543210"
        )
        self.assertEqual(msg.anymail_status.esp_response.content, response_content)

    @override_settings(
        ANYMAIL_IGNORE_RECIPIENT_STATUS=True  # exception is tested later
    )
    def test_send_all_rejected(self):
        """The anymail_status should be 'rejected' when all recipients rejected"""
        self.set_mock_result(accepted=0, rejected=2)
        msg = mail.EmailMessage(
            "Subject",
            "Message",
            "from@example.com",
            ["to1@example.com", "to2@example.com"],
        )
        msg.send()

        self.assertEqual(msg.anymail_status.status, {"rejected"})
        self.assertEqual(
            msg.anymail_status.recipients["to1@example.com"].status, "rejected"
        )
        self.assertEqual(
            msg.anymail_status.recipients["to2@example.com"].status, "rejected"
        )

    def test_send_some_rejected(self):
        """
        The anymail_status should be 'unknown'
        when some recipients accepted and some rejected
        """
        self.set_mock_result(accepted=1, rejected=1)
        msg = mail.EmailMessage(
            "Subject",
            "Message",
            "from@example.com",
            ["to1@example.com", "to2@example.com"],
        )
        msg.send()

        self.assertEqual(msg.anymail_status.status, {"unknown"})
        self.assertEqual(
            msg.anymail_status.recipients["to1@example.com"].status, "unknown"
        )
        self.assertEqual(
            msg.anymail_status.recipients["to2@example.com"].status, "unknown"
        )

    def test_send_unexpected_count(self):
        """The anymail_status should be 'unknown' when the total result count
        doesn't match the number of recipients"""
        self.set_mock_result(accepted=3, rejected=0)  # but only 2 in the to-list
        msg = mail.EmailMessage(
            "Subject",
            "Message",
            "from@example.com",
            ["to1@example.com", "to2@example.com"],
        )
        msg.send()

        self.assertEqual(msg.anymail_status.status, {"unknown"})
        self.assertEqual(
            msg.anymail_status.recipients["to1@example.com"].status, "unknown"
        )
        self.assertEqual(
            msg.anymail_status.recipients["to2@example.com"].status, "unknown"
        )

    # noinspection PyUnresolvedReferences
    def test_send_failed_anymail_status(self):
        """If the send fails, anymail_status should contain initial values"""
        self.set_mock_response(status_code=400)
        sent = self.message.send(fail_silently=True)

        self.assertEqual(sent, 0)
        self.assertIsNone(self.message.anymail_status.status)
        self.assertIsNone(self.message.anymail_status.message_id)
        self.assertEqual(self.message.anymail_status.recipients, {})
        self.assertIsNone(self.message.anymail_status.esp_response)

    # noinspection PyUnresolvedReferences
    def test_send_unparsable_response(self):
        """
        If the send succeeds, but result is unexpected format,
        should raise an API exception
        """
        response_content = b"""{"wrong": "format"}"""
        self.set_mock_response(raw=response_content)
        with self.assertRaises(AnymailAPIError):
            self.message.send()

        self.assertIsNone(self.message.anymail_status.status)
        self.assertIsNone(self.message.anymail_status.message_id)
        self.assertEqual(self.message.anymail_status.recipients, {})
        self.assertEqual(
            self.message.anymail_status.esp_response.content, response_content
        )

    def test_json_serialization_errors(self):
        """Try to provide more information about non-json-serializable data"""
        self.message.tags = [Decimal("19.99")]  # yeah, don't do this
        with self.assertRaises(AnymailSerializationError) as cm:
            self.message.send()
            # Only reached if send() did not raise (i.e. the test is about to
            # fail): show the payload that was sent, to help debugging.
            print(self.get_api_call_json())

        err = cm.exception
        self.assertIsInstance(err, TypeError)  # compatibility with json.dumps
        # our added context:
        self.assertIn("Don't know how to send this data to SparkPost", str(err))
        # original message:
        self.assertRegex(str(err), r"Decimal.*is not JSON serializable")


@tag("sparkpost")
class SparkPostBackendRecipientsRefusedTests(SparkPostBackendMockAPITestCase):
    """
    Should raise AnymailRecipientsRefused when *all* recipients
    are rejected or invalid
    """

    def test_recipients_refused(self):
        """All recipients rejected raises AnymailRecipientsRefused"""
        self.set_mock_result(accepted=0, rejected=2)
        msg = mail.EmailMessage(
            "Subject",
            "Body",
            "from@example.com",
            ["invalid@localhost", "reject@example.com"],
        )
        with self.assertRaises(AnymailRecipientsRefused):
            msg.send()

    def test_fail_silently(self):
        """All recipients rejected with fail_silently reports zero sent"""
        self.set_mock_result(accepted=0, rejected=2)
        sent = mail.send_mail(
            "Subject",
            "Body",
            "from@example.com",
            ["invalid@localhost", "reject@example.com"],
            fail_silently=True,
        )
        self.assertEqual(sent, 0)

    def test_mixed_response(self):
        """If *any* recipients are valid or queued, no exception is raised"""
        self.set_mock_result(accepted=2, rejected=2)
        msg = mail.EmailMessage(
            "Subject",
            "Body",
            "from@example.com",
            [
                "invalid@localhost",
                "valid@example.com",
                "reject@example.com",
                "also.valid@example.com",
            ],
        )
        sent = msg.send()
        # one message sent, successfully, to 2 of 4 recipients:
        self.assertEqual(sent, 1)

        status = msg.anymail_status
        # We don't know which recipients were rejected
        self.assertEqual(status.recipients["invalid@localhost"].status, "unknown")
        self.assertEqual(status.recipients["valid@example.com"].status, "unknown")
        self.assertEqual(status.recipients["reject@example.com"].status, "unknown")
        self.assertEqual(status.recipients["also.valid@example.com"].status, "unknown")

    @override_settings(ANYMAIL_IGNORE_RECIPIENT_STATUS=True)
    def test_settings_override(self):
        """No exception with ignore setting"""
        self.set_mock_result(accepted=0, rejected=2)
        sent = mail.send_mail(
            "Subject",
            "Body",
            "from@example.com",
            ["invalid@localhost", "reject@example.com"],
        )
        self.assertEqual(sent, 1)  # refused message is included in sent count


@tag("sparkpost")
class SparkPostBackendConfigurationTests(SparkPostBackendMockAPITestCase):
    """Test various SparkPost client options"""

    @override_settings(
        # clear SPARKPOST_API_KEY from SparkPostBackendMockAPITestCase:
        ANYMAIL={}
    )
    def test_missing_api_key(self):
        """Sending without an API key raises AnymailConfigurationError"""
        with self.assertRaises(AnymailConfigurationError) as cm:
            mail.send_mail("Subject", "Message", "from@example.com", ["to@example.com"])
        errmsg = str(cm.exception)
        # Make sure the error mentions the different places to set the key
        self.assertRegex(errmsg, r"\bSPARKPOST_API_KEY\b")
        self.assertRegex(errmsg, r"\bANYMAIL_SPARKPOST_API_KEY\b")

    @override_settings(
        ANYMAIL={
            "SPARKPOST_API_URL": "https://api.eu.sparkpost.com/api/v1",
            "SPARKPOST_API_KEY": "test_api_key",
        }
    )
    def test_sparkpost_api_url(self):
        """The API url can be set in settings or per connection"""
        mail.send_mail("Subject", "Message", "from@example.com", ["to@example.com"])
        self.assert_esp_called("https://api.eu.sparkpost.com/api/v1/transmissions/")

        # can also override on individual connection
        # (and even use non-versioned labs endpoint)
        connection = mail.get_connection(api_url="https://api.sparkpost.com/api/labs")
        mail.send_mail(
            "Subject",
            "Message",
            "from@example.com",
            ["to@example.com"],
            connection=connection,
        )
        self.assert_esp_called("https://api.sparkpost.com/api/labs/transmissions/")

    def test_subaccount(self):
        """A connection's subaccount is sent as the X-MSYS-SUBACCOUNT header"""
        # A likely use case is supplying subaccount for a particular message.
        # (For all messages, just set SPARKPOST_SUBACCOUNT in ANYMAIL settings.)
        connection = mail.get_connection(subaccount=123)
        mail.send_mail(
            "Subject",
            "Message",
            "from@example.com",
            ["to@example.com"],
            connection=connection,
        )
        headers = self.get_api_call_headers()
        self.assertEqual(headers["X-MSYS-SUBACCOUNT"], 123)

        # Make sure we're not setting the header on non-subaccount sends
        mail.send_mail("Subject", "Message", "from@example.com", ["to@example.com"])
        headers = self.get_api_call_headers()
        self.assertNotIn("X-MSYS-SUBACCOUNT", headers)