mirror of
https://github.com/pacnpal/django-anymail.git
synced 2025-12-21 12:21:06 -05:00
SendGrid: update to new v3 send API (#50)
SendGrid: update to v3 send API **SendGrid:** **[possibly-breaking]** Update SendGrid backend to newer Web API v3. This should be a transparent change for most projects. Exceptions: if you use SendGrid username/password auth, esp_extra with "x-smtpapi", or multiple Reply-To addresses, please review the [porting notes](http://anymail.readthedocs.io/en/latest/esps/sendgrid/#sendgrid-v3-upgrade). Closes #28
This commit is contained in:
@@ -17,6 +17,7 @@ class RequestsBackendMockAPITestCase(SimpleTestCase, AnymailTestMixin):
|
||||
"""TestCase that mocks API calls through requests"""
|
||||
|
||||
DEFAULT_RAW_RESPONSE = b"""{"subclass": "should override"}"""
|
||||
DEFAULT_STATUS_CODE = 200 # most APIs use '200 OK' for success
|
||||
|
||||
class MockResponse(requests.Response):
|
||||
"""requests.request return value mock sufficient for testing"""
|
||||
@@ -33,7 +34,7 @@ class RequestsBackendMockAPITestCase(SimpleTestCase, AnymailTestMixin):
|
||||
self.addCleanup(self.patch_request.stop)
|
||||
self.set_mock_response()
|
||||
|
||||
def set_mock_response(self, status_code=200, raw=UNSET, encoding='utf-8'):
|
||||
def set_mock_response(self, status_code=DEFAULT_STATUS_CODE, raw=UNSET, encoding='utf-8'):
|
||||
if raw is UNSET:
|
||||
raw = self.DEFAULT_RAW_RESPONSE
|
||||
mock_response = self.MockResponse(status_code, raw, encoding)
|
||||
|
||||
@@ -81,12 +81,12 @@ class BackendSettingsTests(SimpleTestCase, AnymailTestMixin): # (so not
|
||||
def test_username_password_kwargs_overrides(self):
|
||||
"""Overrides for 'username' and 'password' should work like other overrides"""
|
||||
# These are special-cased because of default args in Django core mail functions.
|
||||
# (Use the SendGrid backend, which has settings named 'username' and 'password'.)
|
||||
backend = get_connection('anymail.backends.sendgrid.SendGridBackend')
|
||||
# (Use the SendGrid v2 backend, which has settings named 'username' and 'password'.)
|
||||
backend = get_connection('anymail.backends.sendgrid_v2.EmailBackend')
|
||||
self.assertEqual(backend.username, 'username_from_settings')
|
||||
self.assertEqual(backend.password, 'password_from_settings')
|
||||
|
||||
backend = get_connection('anymail.backends.sendgrid.SendGridBackend',
|
||||
backend = get_connection('anymail.backends.sendgrid_v2.EmailBackend',
|
||||
username='username_from_kwargs', password='password_from_kwargs')
|
||||
self.assertEqual(backend.username, 'username_from_kwargs')
|
||||
self.assertEqual(backend.password, 'password_from_kwargs')
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import json
|
||||
from base64 import b64encode, b64decode
|
||||
from calendar import timegm
|
||||
from datetime import date, datetime
|
||||
from decimal import Decimal
|
||||
@@ -8,12 +8,12 @@ from email.mime.base import MIMEBase
|
||||
from email.mime.image import MIMEImage
|
||||
|
||||
from django.core import mail
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
from django.test import SimpleTestCase
|
||||
from django.test.utils import override_settings
|
||||
from django.utils.timezone import get_fixed_timezone, override as override_current_timezone
|
||||
|
||||
from anymail.exceptions import AnymailAPIError, AnymailSerializationError, AnymailUnsupportedFeature, AnymailWarning
|
||||
from anymail.exceptions import (AnymailAPIError, AnymailConfigurationError, AnymailSerializationError,
|
||||
AnymailUnsupportedFeature, AnymailWarning)
|
||||
from anymail.message import attach_inline_image_file
|
||||
|
||||
from .mock_requests_backend import RequestsBackendMockAPITestCase, SessionSharingTestCasesMixin
|
||||
@@ -23,20 +23,14 @@ from .utils import sample_image_content, sample_image_path, SAMPLE_IMAGE_FILENAM
|
||||
@override_settings(EMAIL_BACKEND='anymail.backends.sendgrid.SendGridBackend',
|
||||
ANYMAIL={'SENDGRID_API_KEY': 'test_api_key'})
|
||||
class SendGridBackendMockAPITestCase(RequestsBackendMockAPITestCase):
|
||||
DEFAULT_RAW_RESPONSE = b"""{
|
||||
"message": "success"
|
||||
}"""
|
||||
DEFAULT_RAW_RESPONSE = b"" # SendGrid v3 success responses are empty
|
||||
DEFAULT_STATUS_CODE = 202 # SendGrid v3 uses '202 Accepted' for success (in most cases)
|
||||
|
||||
def setUp(self):
|
||||
super(SendGridBackendMockAPITestCase, self).setUp()
|
||||
# Simple message useful for many tests
|
||||
self.message = mail.EmailMultiAlternatives('Subject', 'Text Body', 'from@example.com', ['to@example.com'])
|
||||
|
||||
def get_smtpapi(self):
|
||||
"""Returns the x-smtpapi data passed to the mock requests call"""
|
||||
data = self.get_api_call_data()
|
||||
return json.loads(data["x-smtpapi"])
|
||||
|
||||
|
||||
class SendGridBackendStandardEmailTests(SendGridBackendMockAPITestCase):
|
||||
"""Test backend support for Django standard email features"""
|
||||
@@ -45,39 +39,22 @@ class SendGridBackendStandardEmailTests(SendGridBackendMockAPITestCase):
|
||||
"""Test basic API for simple send"""
|
||||
mail.send_mail('Subject here', 'Here is the message.',
|
||||
'from@sender.example.com', ['to@example.com'], fail_silently=False)
|
||||
self.assert_esp_called('/api/mail.send.json')
|
||||
self.assert_esp_called('https://api.sendgrid.com/v3/mail/send')
|
||||
http_headers = self.get_api_call_headers()
|
||||
self.assertEqual(http_headers["Authorization"], "Bearer test_api_key")
|
||||
self.assertEqual(http_headers["Content-Type"], "application/json")
|
||||
|
||||
query = self.get_api_call_params(required=False)
|
||||
if query:
|
||||
self.assertNotIn('api_user', query)
|
||||
self.assertNotIn('api_key', query)
|
||||
|
||||
data = self.get_api_call_data()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['subject'], "Subject here")
|
||||
self.assertEqual(data['text'], "Here is the message.")
|
||||
self.assertEqual(data['from'], "from@sender.example.com")
|
||||
self.assertEqual(data['to'], ["to@example.com"])
|
||||
self.assertEqual(data['content'], [{'type': "text/plain", 'value': "Here is the message."}])
|
||||
self.assertEqual(data['from'], {'email': "from@sender.example.com"})
|
||||
self.assertEqual(data['personalizations'], [{
|
||||
'to': [{'email': "to@example.com"}],
|
||||
}])
|
||||
# make sure backend assigned a Message-ID for event tracking
|
||||
email_headers = json.loads(data['headers'])
|
||||
self.assertRegex(email_headers['Message-ID'], r'\<.+@sender\.example\.com\>') # id uses from_email's domain
|
||||
# make sure we added the Message-ID to unique_args for event notification
|
||||
smtpapi = self.get_smtpapi()
|
||||
self.assertEqual(email_headers['Message-ID'], smtpapi['unique_args']['smtp-id'])
|
||||
|
||||
@override_settings(ANYMAIL={'SENDGRID_USERNAME': 'sg_username', 'SENDGRID_PASSWORD': 'sg_password'})
|
||||
def test_user_pass_auth(self):
|
||||
"""Make sure alternative USERNAME/PASSWORD auth works"""
|
||||
mail.send_mail('Subject here', 'Here is the message.',
|
||||
'from@sender.example.com', ['to@example.com'], fail_silently=False)
|
||||
self.assert_esp_called('/api/mail.send.json')
|
||||
query = self.get_api_call_params()
|
||||
self.assertEqual(query['api_user'], 'sg_username')
|
||||
self.assertEqual(query['api_key'], 'sg_password')
|
||||
http_headers = self.get_api_call_headers(required=False)
|
||||
if http_headers:
|
||||
self.assertNotIn('Authorization', http_headers)
|
||||
self.assertRegex(data['headers']['Message-ID'], r'\<.+@sender\.example\.com\>') # id uses from_email's domain
|
||||
# make sure we added the Message-ID to custom_args for event notification
|
||||
self.assertEqual(data['headers']['Message-ID'], data['custom_args']['smtp-id'])
|
||||
|
||||
def test_name_addr(self):
|
||||
"""Make sure RFC2822 name-addr format (with display-name) is allowed
|
||||
@@ -90,15 +67,24 @@ class SendGridBackendStandardEmailTests(SendGridBackendMockAPITestCase):
|
||||
cc=['Carbon Copy <cc1@example.com>', 'cc2@example.com'],
|
||||
bcc=['Blind Copy <bcc1@example.com>', 'bcc2@example.com'])
|
||||
msg.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['from'], "from@example.com")
|
||||
self.assertEqual(data['fromname'], "From Name")
|
||||
self.assertEqual(data['to'], ['to1@example.com', 'to2@example.com'])
|
||||
self.assertEqual(data['toname'], ['Recipient #1', ' ']) # note space -- SendGrid balks on ''
|
||||
self.assertEqual(data['cc'], ['cc1@example.com', 'cc2@example.com'])
|
||||
self.assertEqual(data['ccname'], ['Carbon Copy', ' '])
|
||||
self.assertEqual(data['bcc'], ['bcc1@example.com', 'bcc2@example.com'])
|
||||
self.assertEqual(data['bccname'], ['Blind Copy', ' '])
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['from'], {'email': "from@example.com", 'name': "From Name"})
|
||||
|
||||
# single message (single "personalization") sent to all those recipients
|
||||
# (note workaround for SendGrid v3 API bug quoting display-name in personalizations)
|
||||
self.assertEqual(len(data['personalizations']), 1)
|
||||
self.assertEqual(data['personalizations'][0]['to'], [
|
||||
{'name': '"Recipient #1"', 'email': 'to1@example.com'},
|
||||
{'email': 'to2@example.com'}
|
||||
])
|
||||
self.assertEqual(data['personalizations'][0]['cc'], [
|
||||
{'name': '"Carbon Copy"', 'email': 'cc1@example.com'},
|
||||
{'email': 'cc2@example.com'}
|
||||
])
|
||||
self.assertEqual(data['personalizations'][0]['bcc'], [
|
||||
{'name': '"Blind Copy"', 'email': 'bcc1@example.com'},
|
||||
{'email': 'bcc2@example.com'}
|
||||
])
|
||||
|
||||
def test_email_message(self):
|
||||
email = mail.EmailMessage(
|
||||
@@ -110,24 +96,27 @@ class SendGridBackendStandardEmailTests(SendGridBackendMockAPITestCase):
|
||||
'X-MyHeader': 'my value',
|
||||
'Message-ID': '<mycustommsgid@sales.example.com>'}) # should override backend msgid
|
||||
email.send()
|
||||
data = self.get_api_call_data()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['personalizations'], [{
|
||||
'to': [{'email': "to1@example.com"},
|
||||
{'email': "to2@example.com", 'name': '"Also To"'}],
|
||||
'cc': [{'email': "cc1@example.com"},
|
||||
{'email': "cc2@example.com", 'name': '"Also CC"'}],
|
||||
'bcc': [{'email': "bcc1@example.com"},
|
||||
{'email': "bcc2@example.com", 'name': '"Also BCC"'}],
|
||||
}])
|
||||
|
||||
self.assertEqual(data['from'], {'email': "from@example.com"})
|
||||
self.assertEqual(data['subject'], "Subject")
|
||||
self.assertEqual(data['text'], "Body goes here")
|
||||
self.assertEqual(data['from'], "from@example.com")
|
||||
self.assertEqual(data['to'], ['to1@example.com', 'to2@example.com'])
|
||||
self.assertEqual(data['toname'], [' ', 'Also To'])
|
||||
self.assertEqual(data['bcc'], ['bcc1@example.com', 'bcc2@example.com'])
|
||||
self.assertEqual(data['bccname'], [' ', 'Also BCC'])
|
||||
self.assertEqual(data['cc'], ['cc1@example.com', 'cc2@example.com'])
|
||||
self.assertEqual(data['ccname'], [' ', 'Also CC'])
|
||||
self.assertJSONEqual(data['headers'], {
|
||||
'Message-ID': '<mycustommsgid@sales.example.com>',
|
||||
'Reply-To': 'another@example.com',
|
||||
'X-MyHeader': 'my value',
|
||||
self.assertEqual(data['content'], [{'type': "text/plain", 'value': "Body goes here"}])
|
||||
self.assertEqual(data['reply_to'], {'email': "another@example.com"})
|
||||
self.assertEqual(data['headers'], {
|
||||
'X-MyHeader': "my value",
|
||||
'Message-ID': "<mycustommsgid@sales.example.com>",
|
||||
})
|
||||
# make sure custom Message-ID also added to unique_args
|
||||
self.assertJSONEqual(data['x-smtpapi'], {
|
||||
'unique_args': {'smtp-id': '<mycustommsgid@sales.example.com>'}
|
||||
# make sure custom Message-ID also added to custom_args
|
||||
self.assertEqual(data['custom_args'], {
|
||||
'smtp-id': "<mycustommsgid@sales.example.com>",
|
||||
})
|
||||
|
||||
def test_html_message(self):
|
||||
@@ -137,29 +126,33 @@ class SendGridBackendStandardEmailTests(SendGridBackendMockAPITestCase):
|
||||
'from@example.com', ['to@example.com'])
|
||||
email.attach_alternative(html_content, "text/html")
|
||||
email.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['text'], text_content)
|
||||
self.assertEqual(data['html'], html_content)
|
||||
data = self.get_api_call_json()
|
||||
# SendGrid requires content in text, html order:
|
||||
self.assertEqual(len(data['content']), 2)
|
||||
self.assertEqual(data['content'][0], {'type': "text/plain", 'value': text_content})
|
||||
self.assertEqual(data['content'][1], {'type': "text/html", 'value': html_content})
|
||||
# Don't accidentally send the html part as an attachment:
|
||||
files = self.get_api_call_files(required=False)
|
||||
self.assertIsNone(files)
|
||||
self.assertNotIn('attachments', data)
|
||||
|
||||
def test_html_only_message(self):
|
||||
html_content = '<p>This is an <strong>important</strong> message.</p>'
|
||||
email = mail.EmailMessage('Subject', html_content, 'from@example.com', ['to@example.com'])
|
||||
email.content_subtype = "html" # Main content is now text/html
|
||||
email.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertNotIn('text', data)
|
||||
self.assertEqual(data['html'], html_content)
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(len(data['content']), 1)
|
||||
self.assertEqual(data['content'][0], {'type': "text/html", 'value': html_content})
|
||||
|
||||
def test_extra_headers(self):
|
||||
self.message.extra_headers = {'X-Custom': 'string', 'X-Num': 123}
|
||||
self.message.extra_headers = {'X-Custom': 'string', 'X-Num': 123,
|
||||
'Reply-To': '"Do Not Reply" <noreply@example.com>'}
|
||||
self.message.send()
|
||||
data = self.get_api_call_data()
|
||||
headers = json.loads(data['headers'])
|
||||
self.assertEqual(headers['X-Custom'], 'string')
|
||||
self.assertEqual(headers['X-Num'], '123') # number converted to string (per SendGrid requirement)
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['headers']['X-Custom'], 'string')
|
||||
self.assertEqual(data['headers']['X-Num'], '123') # converted to string (undoc'd SendGrid requirement)
|
||||
# Reply-To must be moved to separate param
|
||||
self.assertNotIn('Reply-To', data['headers'])
|
||||
self.assertEqual(data['reply_to'], {'name': "Do Not Reply", 'email': "noreply@example.com"})
|
||||
|
||||
def test_extra_headers_serialization_error(self):
|
||||
self.message.extra_headers = {'X-Custom': Decimal(12.5)}
|
||||
@@ -167,15 +160,24 @@ class SendGridBackendStandardEmailTests(SendGridBackendMockAPITestCase):
|
||||
self.message.send()
|
||||
|
||||
def test_reply_to(self):
|
||||
email = mail.EmailMessage('Subject', 'Body goes here', 'from@example.com', ['to1@example.com'],
|
||||
reply_to=['reply@example.com', 'Other <reply2@example.com>'],
|
||||
headers={'X-Other': 'Keep'})
|
||||
email.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertNotIn('replyto', data) # don't use SendGrid's replyto (it's broken); just use headers
|
||||
headers = json.loads(data['headers'])
|
||||
self.assertEqual(headers['Reply-To'], 'reply@example.com, Other <reply2@example.com>')
|
||||
self.assertEqual(headers['X-Other'], 'Keep') # don't lose other headers
|
||||
self.message.reply_to = ['"Reply recipient" <reply@example.com']
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['reply_to'], {'name': "Reply recipient", 'email': "reply@example.com"})
|
||||
|
||||
def test_multiple_reply_to(self):
|
||||
# SendGrid v3 prohibits Reply-To in custom headers, and only allows a single reply address
|
||||
self.message.reply_to = ['"Reply recipient" <reply@example.com', 'reply2@example.com']
|
||||
with self.assertRaises(AnymailUnsupportedFeature):
|
||||
self.message.send()
|
||||
|
||||
@override_settings(ANYMAIL_IGNORE_UNSUPPORTED_FEATURES=True)
|
||||
def test_multiple_reply_to_ignore_unsupported(self):
|
||||
# Should use first Reply-To if ignoring unsupported features
|
||||
self.message.reply_to = ['"Reply recipient" <reply@example.com', 'reply2@example.com']
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['reply_to'], {'name': "Reply recipient", 'email': "reply@example.com"})
|
||||
|
||||
def test_attachments(self):
|
||||
text_content = "* Item one\n* Item two\n* Item three"
|
||||
@@ -192,37 +194,29 @@ class SendGridBackendStandardEmailTests(SendGridBackendMockAPITestCase):
|
||||
self.message.attach(mimeattachment)
|
||||
|
||||
self.message.send()
|
||||
files = self.get_api_call_files()
|
||||
self.assertEqual(files, {
|
||||
'files[test.txt]': ('test.txt', text_content, 'text/plain'),
|
||||
'files[test.png]': ('test.png', png_content, 'image/png'), # type inferred from filename
|
||||
'files[]': ('', pdf_content, 'application/pdf'), # no filename
|
||||
})
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(len(data['attachments']), 3)
|
||||
|
||||
def test_attachment_name_conflicts(self):
|
||||
# It's not clear how to (or whether) supply multiple attachments with
|
||||
# the same name to SendGrid's API. Anymail treats this case as unsupported.
|
||||
self.message.attach('foo.txt', 'content', 'text/plain')
|
||||
self.message.attach('bar.txt', 'content', 'text/plain')
|
||||
self.message.attach('foo.txt', 'different content', 'text/plain')
|
||||
with self.assertRaisesMessage(AnymailUnsupportedFeature,
|
||||
"multiple attachments with the same filename") as cm:
|
||||
self.message.send()
|
||||
self.assertIn('foo.txt', str(cm.exception)) # say which filename
|
||||
|
||||
def test_unnamed_attachment_conflicts(self):
|
||||
# Same as previous test, but with None/empty filenames
|
||||
self.message.attach(None, 'content', 'text/plain')
|
||||
self.message.attach('', 'different content', 'text/plain')
|
||||
with self.assertRaisesMessage(AnymailUnsupportedFeature, "multiple unnamed attachments"):
|
||||
self.message.send()
|
||||
attachments = data['attachments']
|
||||
self.assertEqual(attachments[0], {
|
||||
'filename': "test.txt",
|
||||
'content': b64encode(text_content.encode('utf-8')).decode('ascii'),
|
||||
'type': "text/plain"})
|
||||
self.assertEqual(attachments[1], {
|
||||
'filename': "test.png",
|
||||
'content': b64encode(png_content).decode('ascii'),
|
||||
'type': "image/png"}) # type inferred from filename
|
||||
self.assertEqual(attachments[2], {
|
||||
'filename': "", # no filename -- but param is required
|
||||
'content': b64encode(pdf_content).decode('ascii'),
|
||||
'type': "application/pdf"})
|
||||
|
||||
def test_unicode_attachment_correctly_decoded(self):
|
||||
self.message.attach(u"Une pièce jointe.html", u'<p>\u2019</p>', mimetype='text/html')
|
||||
self.message.send()
|
||||
files = self.get_api_call_files()
|
||||
self.assertEqual(files[u'files[Une pièce jointe.html]'],
|
||||
(u'Une pièce jointe.html', u'<p>\u2019</p>', 'text/html'))
|
||||
attachment = self.get_api_call_json()['attachments'][0]
|
||||
self.assertEqual(attachment['filename'], u'Une pièce jointe.html')
|
||||
self.assertEqual(b64decode(attachment['content']).decode('utf-8'), u'<p>\u2019</p>')
|
||||
|
||||
def test_embedded_images(self):
|
||||
image_filename = SAMPLE_IMAGE_FILENAME
|
||||
@@ -234,14 +228,15 @@ class SendGridBackendStandardEmailTests(SendGridBackendMockAPITestCase):
|
||||
self.message.attach_alternative(html_content, "text/html")
|
||||
|
||||
self.message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['html'], html_content)
|
||||
data = self.get_api_call_json()
|
||||
|
||||
files = self.get_api_call_files()
|
||||
self.assertEqual(files, {
|
||||
'files[%s]' % image_filename: (image_filename, image_data, "image/png"),
|
||||
self.assertEqual(data['attachments'][0], {
|
||||
'filename': image_filename,
|
||||
'content': b64encode(image_data).decode('ascii'),
|
||||
'type': "image/png", # type inferred from filename
|
||||
'disposition': "inline",
|
||||
'content_id': cid,
|
||||
})
|
||||
self.assertEqual(data['content[%s]' % image_filename], cid)
|
||||
|
||||
def test_attached_images(self):
|
||||
image_filename = SAMPLE_IMAGE_FILENAME
|
||||
@@ -254,50 +249,48 @@ class SendGridBackendStandardEmailTests(SendGridBackendMockAPITestCase):
|
||||
self.message.attach(image)
|
||||
|
||||
self.message.send()
|
||||
files = self.get_api_call_files()
|
||||
self.assertEqual(files, {
|
||||
'files[%s]' % image_filename: (image_filename, image_data, "image/png"), # the named one
|
||||
'files[]': ('', image_data, "image/png"), # the unnamed one
|
||||
|
||||
image_data_b64 = b64encode(image_data).decode('ascii')
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['attachments'][0], {
|
||||
'filename': image_filename, # the named one
|
||||
'content': image_data_b64,
|
||||
'type': "image/png",
|
||||
})
|
||||
self.assertEqual(data['attachments'][1], {
|
||||
'filename': '', # the unnamed one
|
||||
'content': image_data_b64,
|
||||
'type': "image/png",
|
||||
})
|
||||
|
||||
def test_multiple_html_alternatives(self):
|
||||
# Multiple alternatives not allowed
|
||||
# SendGrid's v3 API allows all kinds of content alternatives.
|
||||
# It's unclear whether this would permit multiple text/html parts
|
||||
# (the API docs warn that "If included, text/plain and text/html must be
|
||||
# the first indices of the [content] array in this order"), but Anymail
|
||||
# generally passes whatever the API structure supports -- deferring any
|
||||
# limitations to the ESP.
|
||||
self.message.body = "Text body"
|
||||
self.message.attach_alternative("<p>First html is OK</p>", "text/html")
|
||||
self.message.attach_alternative("<p>But not second html</p>", "text/html")
|
||||
with self.assertRaises(AnymailUnsupportedFeature):
|
||||
self.message.send()
|
||||
self.message.attach_alternative("<p>And maybe second html, too</p>", "text/html")
|
||||
|
||||
def test_html_alternative(self):
|
||||
# Only html alternatives allowed
|
||||
self.message.attach_alternative("{'not': 'allowed'}", "application/json")
|
||||
with self.assertRaises(AnymailUnsupportedFeature):
|
||||
self.message.send()
|
||||
|
||||
def test_alternatives_fail_silently(self):
|
||||
# Make sure fail_silently is respected
|
||||
self.message.attach_alternative("{'not': 'allowed'}", "application/json")
|
||||
sent = self.message.send(fail_silently=True)
|
||||
self.assert_esp_not_called("API should not be called when send fails silently")
|
||||
self.assertEqual(sent, 0)
|
||||
|
||||
def test_suppress_empty_address_lists(self):
|
||||
"""Empty to, cc, bcc, and reply_to shouldn't generate empty headers"""
|
||||
self.message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertNotIn('cc', data)
|
||||
self.assertNotIn('ccname', data)
|
||||
self.assertNotIn('bcc', data)
|
||||
self.assertNotIn('bccname', data)
|
||||
headers = json.loads(data['headers'])
|
||||
self.assertNotIn('Reply-To', headers)
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['content'], [
|
||||
{'type': "text/plain", 'value': "Text body"},
|
||||
{'type': "text/html", 'value': "<p>First html is OK</p>"},
|
||||
{'type': "text/html", 'value': "<p>And maybe second html, too</p>"},
|
||||
])
|
||||
|
||||
# Test empty `to` -- but send requires at least one recipient somewhere (like cc)
|
||||
self.message.to = []
|
||||
self.message.cc = ['cc@example.com']
|
||||
def test_non_html_alternative(self):
|
||||
self.message.body = "Text body"
|
||||
self.message.attach_alternative("{'maybe': 'allowed'}", "application/json")
|
||||
self.message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertNotIn('to', data)
|
||||
self.assertNotIn('toname', data)
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['content'], [
|
||||
{'type': "text/plain", 'value': "Text body"},
|
||||
{'type': "application/json", 'value': "{'maybe': 'allowed'}"},
|
||||
])
|
||||
|
||||
def test_api_failure(self):
|
||||
self.set_mock_response(status_code=400)
|
||||
@@ -313,17 +306,16 @@ class SendGridBackendStandardEmailTests(SendGridBackendMockAPITestCase):
|
||||
def test_api_error_includes_details(self):
|
||||
"""AnymailAPIError should include ESP's error message"""
|
||||
# JSON error response:
|
||||
error_response = b"""{
|
||||
"message": "error",
|
||||
"errors": [
|
||||
"Helpful explanation from SendGrid",
|
||||
"and more"
|
||||
]
|
||||
}"""
|
||||
self.set_mock_response(status_code=200, raw=error_response)
|
||||
with self.assertRaisesRegex(AnymailAPIError,
|
||||
r"\bHelpful explanation from SendGrid\b.*and more\b"):
|
||||
error_response = b"""{"errors":[
|
||||
{"message":"Helpful explanation from SendGrid","field":"subject","help":null},
|
||||
{"message":"Another error","field":null,"help":null}
|
||||
]}"""
|
||||
self.set_mock_response(status_code=400, raw=error_response)
|
||||
with self.assertRaises(AnymailAPIError) as cm:
|
||||
self.message.send()
|
||||
err = cm.exception
|
||||
self.assertIn("Helpful explanation from SendGrid", str(err))
|
||||
self.assertIn("Another error", str(err))
|
||||
|
||||
# Non-JSON error response:
|
||||
self.set_mock_response(status_code=500, raw=b"Ack! Bad proxy!")
|
||||
@@ -340,12 +332,12 @@ class SendGridBackendAnymailFeatureTests(SendGridBackendMockAPITestCase):
|
||||
"""Test backend support for Anymail added features"""
|
||||
|
||||
def test_metadata(self):
|
||||
# Note: SendGrid doesn't handle complex types in metadata
|
||||
self.message.metadata = {'user_id': "12345", 'items': 6}
|
||||
self.message.send()
|
||||
smtpapi = self.get_smtpapi()
|
||||
smtpapi['unique_args'].pop('smtp-id', None) # remove Message-ID we added as tracking workaround
|
||||
self.assertEqual(smtpapi['unique_args'], {'user_id': "12345", 'items': 6})
|
||||
data = self.get_api_call_json()
|
||||
data['custom_args'].pop('smtp-id', None) # remove Message-ID we added as tracking workaround
|
||||
self.assertEqual(data['custom_args'], {'user_id': "12345",
|
||||
'items': "6"}) # number converted to string
|
||||
|
||||
def test_send_at(self):
|
||||
utc_plus_6 = get_fixed_timezone(6 * 60)
|
||||
@@ -355,76 +347,75 @@ class SendGridBackendAnymailFeatureTests(SendGridBackendMockAPITestCase):
|
||||
# Timezone-aware datetime converted to UTC:
|
||||
self.message.send_at = datetime(2016, 3, 4, 5, 6, 7, tzinfo=utc_minus_8)
|
||||
self.message.send()
|
||||
smtpapi = self.get_smtpapi()
|
||||
self.assertEqual(smtpapi['send_at'], timegm((2016, 3, 4, 13, 6, 7))) # 05:06 UTC-8 == 13:06 UTC
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['send_at'], timegm((2016, 3, 4, 13, 6, 7))) # 05:06 UTC-8 == 13:06 UTC
|
||||
|
||||
# Timezone-naive datetime assumed to be Django current_timezone
|
||||
self.message.send_at = datetime(2022, 10, 11, 12, 13, 14, 567) # microseconds should get stripped
|
||||
self.message.send()
|
||||
smtpapi = self.get_smtpapi()
|
||||
self.assertEqual(smtpapi['send_at'], timegm((2022, 10, 11, 6, 13, 14))) # 12:13 UTC+6 == 06:13 UTC
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['send_at'], timegm((2022, 10, 11, 6, 13, 14))) # 12:13 UTC+6 == 06:13 UTC
|
||||
|
||||
# Date-only treated as midnight in current timezone
|
||||
self.message.send_at = date(2022, 10, 22)
|
||||
self.message.send()
|
||||
smtpapi = self.get_smtpapi()
|
||||
self.assertEqual(smtpapi['send_at'], timegm((2022, 10, 21, 18, 0, 0))) # 00:00 UTC+6 == 18:00-1d UTC
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['send_at'], timegm((2022, 10, 21, 18, 0, 0))) # 00:00 UTC+6 == 18:00-1d UTC
|
||||
|
||||
# POSIX timestamp
|
||||
self.message.send_at = 1651820889 # 2022-05-06 07:08:09 UTC
|
||||
self.message.send()
|
||||
smtpapi = self.get_smtpapi()
|
||||
self.assertEqual(smtpapi['send_at'], 1651820889)
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['send_at'], 1651820889)
|
||||
|
||||
def test_tags(self):
|
||||
self.message.tags = ["receipt", "repeat-user"]
|
||||
self.message.send()
|
||||
smtpapi = self.get_smtpapi()
|
||||
self.assertCountEqual(smtpapi['category'], ["receipt", "repeat-user"])
|
||||
data = self.get_api_call_json()
|
||||
self.assertCountEqual(data['categories'], ["receipt", "repeat-user"])
|
||||
|
||||
def test_tracking(self):
|
||||
# Test one way...
|
||||
self.message.track_clicks = False
|
||||
self.message.track_opens = True
|
||||
self.message.send()
|
||||
smtpapi = self.get_smtpapi()
|
||||
self.assertEqual(smtpapi['filters']['clicktrack'], {'settings': {'enable': 0}})
|
||||
self.assertEqual(smtpapi['filters']['opentrack'], {'settings': {'enable': 1}})
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['tracking_settings']['click_tracking'], {'enable': False})
|
||||
self.assertEqual(data['tracking_settings']['open_tracking'], {'enable': True})
|
||||
|
||||
# ...and the opposite way
|
||||
self.message.track_clicks = True
|
||||
self.message.track_opens = False
|
||||
self.message.send()
|
||||
smtpapi = self.get_smtpapi()
|
||||
self.assertEqual(smtpapi['filters']['clicktrack'], {'settings': {'enable': 1}})
|
||||
self.assertEqual(smtpapi['filters']['opentrack'], {'settings': {'enable': 0}})
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['tracking_settings']['click_tracking'], {'enable': True})
|
||||
self.assertEqual(data['tracking_settings']['open_tracking'], {'enable': False})
|
||||
|
||||
def test_template_id(self):
|
||||
self.message.attach_alternative("HTML Body", "text/html")
|
||||
self.message.template_id = "5997fcf6-2b9f-484d-acd5-7e9a99f0dc1f"
|
||||
self.message.send()
|
||||
smtpapi = self.get_smtpapi()
|
||||
self.assertEqual(smtpapi['filters']['templates'], {
|
||||
'settings': {'enable': 1,
|
||||
'template_id': "5997fcf6-2b9f-484d-acd5-7e9a99f0dc1f"}
|
||||
})
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['text'], "Text Body")
|
||||
self.assertEqual(data['html'], "HTML Body")
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['template_id'], "5997fcf6-2b9f-484d-acd5-7e9a99f0dc1f")
|
||||
|
||||
def test_template_id_with_empty_body(self):
|
||||
# Text and html must be present (and non-empty-string), or the corresponding
|
||||
# part will not render from the template. Make sure we fill in strings:
|
||||
# v2 API required *some* text and html in message to render those template bodies,
|
||||
# so the v2 backend set those to " " when necessary.
|
||||
# But per v3 docs:
|
||||
# "If you use a template that contains content and a subject (either text or html),
|
||||
# you do not need to specify those in the respective personalizations or message
|
||||
# level parameters."
|
||||
# So make sure we aren't adding body content where not needed:
|
||||
message = mail.EmailMessage(from_email='from@example.com', to=['to@example.com'])
|
||||
message.template_id = "5997fcf6-2b9f-484d-acd5-7e9a99f0dc1f"
|
||||
message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['text'], " ") # single space is sufficient
|
||||
self.assertEqual(data['html'], " ")
|
||||
data = self.get_api_call_json()
|
||||
self.assertNotIn('content', data) # neither text nor html body
|
||||
self.assertNotIn('subject', data)
|
||||
|
||||
def test_merge_data(self):
|
||||
self.message.from_email = 'from@example.com'
|
||||
self.message.to = ['alice@example.com', 'Bob <bob@example.com>']
|
||||
self.message.to = ['alice@example.com', 'Bob <bob@example.com>', 'celia@example.com']
|
||||
self.message.cc = ['cc@example.com'] # gets applied to *each* recipient in a merge
|
||||
# SendGrid template_id is not required to use merge.
|
||||
# You can just supply template content as the message (e.g.):
|
||||
self.message.body = "Hi :name. Welcome to :group at :site."
|
||||
@@ -433,6 +424,7 @@ class SendGridBackendAnymailFeatureTests(SendGridBackendMockAPITestCase):
|
||||
# as shown here, or use one of the merge_field_format options shown in the test cases below
|
||||
'alice@example.com': {':name': "Alice", ':group': "Developers"},
|
||||
'bob@example.com': {':name': "Bob"}, # and leave :group undefined
|
||||
# and no data for celia@example.com
|
||||
}
|
||||
self.message.merge_global_data = {
|
||||
':group': "Users",
|
||||
@@ -440,20 +432,21 @@ class SendGridBackendAnymailFeatureTests(SendGridBackendMockAPITestCase):
|
||||
}
|
||||
self.message.send()
|
||||
|
||||
data = self.get_api_call_data()
|
||||
smtpapi = self.get_smtpapi()
|
||||
# For batch send, smtpapi['to'] gets real recipient list;
|
||||
# normal 'to' is not used (but must be valid, so we substitute the from_email):
|
||||
self.assertEqual(data['to'], ['from@example.com'])
|
||||
self.assertEqual(data['toname'], [' ']) # empty string if no name in from_email
|
||||
self.assertEqual(smtpapi['to'], ['alice@example.com', 'Bob <bob@example.com>'])
|
||||
# smtpapi['sub'] values should be in to-list order:
|
||||
self.assertEqual(smtpapi['sub'], {
|
||||
':name': ["Alice", "Bob"],
|
||||
':group': ["Developers", ":group"], # missing value gets replaced with var name...
|
||||
})
|
||||
self.assertEqual(smtpapi['section'], {
|
||||
':group': "Users", # ... which SG should then try to resolve from here
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['personalizations'], [
|
||||
{'to': [{'email': 'alice@example.com'}],
|
||||
'cc': [{'email': 'cc@example.com'}], # all recipients get the cc
|
||||
'substitutions': {':name': "Alice", ':group': "Developers",
|
||||
':site': ":site"}}, # tell SG to look for global field in 'sections'
|
||||
{'to': [{'email': 'bob@example.com', 'name': '"Bob"'}],
|
||||
'cc': [{'email': 'cc@example.com'}],
|
||||
'substitutions': {':name': "Bob", ':group': ":group", ':site': ":site"}},
|
||||
{'to': [{'email': 'celia@example.com'}],
|
||||
'cc': [{'email': 'cc@example.com'}],
|
||||
'substitutions': {':group': ":group", ':site': ":site"}}, # look for global fields in 'sections'
|
||||
])
|
||||
self.assertEqual(data['sections'], {
|
||||
':group': "Users",
|
||||
':site': "ExampleCo",
|
||||
})
|
||||
|
||||
@@ -467,12 +460,14 @@ class SendGridBackendAnymailFeatureTests(SendGridBackendMockAPITestCase):
|
||||
}
|
||||
self.message.merge_global_data = {'site': "ExampleCo"}
|
||||
self.message.send()
|
||||
smtpapi = self.get_smtpapi()
|
||||
self.assertEqual(smtpapi['sub'], {
|
||||
':name': ["Alice", "Bob"],
|
||||
':group': ["Developers", ":group"] # substitutes formatted field name if missing for recipient
|
||||
})
|
||||
self.assertEqual(smtpapi['section'], {':site': "ExampleCo"})
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['personalizations'], [
|
||||
{'to': [{'email': 'alice@example.com'}],
|
||||
'substitutions': {':name': "Alice", ':group': "Developers", ':site': ":site"}}, # keys changed to :field
|
||||
{'to': [{'email': 'bob@example.com', 'name': '"Bob"'}],
|
||||
'substitutions': {':name': "Bob", ':site': ":site"}}
|
||||
])
|
||||
self.assertEqual(data['sections'], {':site': "ExampleCo"})
|
||||
|
||||
def test_merge_field_format_esp_extra(self):
|
||||
# Provide merge field delimiters for an individual message
|
||||
@@ -484,14 +479,15 @@ class SendGridBackendAnymailFeatureTests(SendGridBackendMockAPITestCase):
|
||||
self.message.merge_global_data = {'site': "ExampleCo"}
|
||||
self.message.esp_extra = {'merge_field_format': '*|{}|*'} # match Mandrill/MailChimp delimiters
|
||||
self.message.send()
|
||||
smtpapi = self.get_smtpapi()
|
||||
self.assertEqual(smtpapi['sub'], {
|
||||
'*|name|*': ["Alice", "Bob"],
|
||||
'*|group|*': ["Developers", '*|group|*'] # substitutes formatted field name if missing for recipient
|
||||
})
|
||||
self.assertEqual(smtpapi['section'], {'*|site|*': "ExampleCo"})
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['personalizations'], [
|
||||
{'to': [{'email': 'alice@example.com'}],
|
||||
'substitutions': {'*|name|*': "Alice", '*|group|*': "Developers", '*|site|*': "*|site|*"}},
|
||||
{'to': [{'email': 'bob@example.com', 'name': '"Bob"'}],
|
||||
'substitutions': {'*|name|*': "Bob", '*|site|*': "*|site|*"}}
|
||||
])
|
||||
self.assertEqual(data['sections'], {'*|site|*': "ExampleCo"})
|
||||
# Make sure our esp_extra merge_field_format doesn't get sent to SendGrid API:
|
||||
data = self.get_api_call_data()
|
||||
self.assertNotIn('merge_field_format', data)
|
||||
|
||||
def test_warn_if_no_merge_field_delimiters(self):
|
||||
@@ -502,7 +498,12 @@ class SendGridBackendAnymailFeatureTests(SendGridBackendMockAPITestCase):
|
||||
with self.assertWarnsRegex(AnymailWarning, r'SENDGRID_MERGE_FIELD_FORMAT'):
|
||||
self.message.send()
|
||||
|
||||
@override_settings(ANYMAIL_SENDGRID_GENERATE_MESSAGE_ID=False) # else we force unique_args
|
||||
def test_warn_if_no_global_merge_field_delimiters(self):
|
||||
self.message.merge_global_data = {'site': "ExampleCo"}
|
||||
with self.assertWarnsRegex(AnymailWarning, r'SENDGRID_MERGE_FIELD_FORMAT'):
|
||||
self.message.send()
|
||||
|
||||
@override_settings(ANYMAIL_SENDGRID_GENERATE_MESSAGE_ID=False) # else we force custom_args
|
||||
def test_default_omits_options(self):
|
||||
"""Make sure by default we don't send any ESP-specific options.
|
||||
|
||||
@@ -511,40 +512,51 @@ class SendGridBackendAnymailFeatureTests(SendGridBackendMockAPITestCase):
|
||||
that your ESP account settings apply by default.
|
||||
"""
|
||||
self.message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertNotIn('x-smtpapi', data)
|
||||
data = self.get_api_call_json()
|
||||
self.assertNotIn('asm', data)
|
||||
self.assertNotIn('attachments', data)
|
||||
self.assertNotIn('batch_id', data)
|
||||
self.assertNotIn('categories', data)
|
||||
self.assertNotIn('custom_args', data)
|
||||
self.assertNotIn('headers', data)
|
||||
self.assertNotIn('ip_pool_name', data)
|
||||
self.assertNotIn('mail_settings', data)
|
||||
self.assertNotIn('sections', data)
|
||||
self.assertNotIn('send_at', data)
|
||||
self.assertNotIn('template_id', data)
|
||||
self.assertNotIn('tracking_settings', data)
|
||||
|
||||
for personalization in data['personalizations']:
|
||||
self.assertNotIn('custom_args', personalization)
|
||||
self.assertNotIn('headers', personalization)
|
||||
self.assertNotIn('send_at', personalization)
|
||||
self.assertNotIn('substitutions', personalization)
|
||||
|
||||
def test_esp_extra(self):
|
||||
self.message.tags = ["tag"]
|
||||
self.message.track_clicks = True
|
||||
self.message.esp_extra = {
|
||||
'x-smtpapi': {
|
||||
# Most SendMail options go in the 'x-smtpapi' block...
|
||||
'asm_group_id': 1,
|
||||
'filters': {
|
||||
# If you add a filter, you must supply all required settings for it.
|
||||
'subscriptiontrack': {
|
||||
'settings': {
|
||||
'enable': 1,
|
||||
'replace': '[unsubscribe_url]',
|
||||
},
|
||||
},
|
||||
'ip_pool_name': "transactional",
|
||||
'asm': { # subscription management
|
||||
'group_id': 1,
|
||||
},
|
||||
'tracking_settings': {
|
||||
'subscription_tracking': {
|
||||
'enable': True,
|
||||
'substitution_tag': '[unsubscribe_url]',
|
||||
},
|
||||
},
|
||||
'newthing': "some param not supported by Anymail",
|
||||
}
|
||||
self.message.send()
|
||||
# Additional send params:
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['newthing'], "some param not supported by Anymail")
|
||||
# Should merge x-smtpapi, and merge filters within x-smtpapi
|
||||
smtpapi = self.get_smtpapi()
|
||||
self.assertEqual(smtpapi['category'], ["tag"])
|
||||
self.assertEqual(smtpapi['asm_group_id'], 1)
|
||||
self.assertEqual(smtpapi['filters']['subscriptiontrack'],
|
||||
{'settings': {'enable': 1, 'replace': '[unsubscribe_url]'}}) # esp_extra merged
|
||||
self.assertEqual(smtpapi['filters']['clicktrack'],
|
||||
{'settings': {'enable': 1}}) # Anymail message option preserved
|
||||
data = self.get_api_call_json()
|
||||
# merged from esp_extra:
|
||||
self.assertEqual(data['ip_pool_name'], "transactional")
|
||||
self.assertEqual(data['asm'], {'group_id': 1})
|
||||
self.assertEqual(data['tracking_settings']['subscription_tracking'],
|
||||
{'enable': True, 'substitution_tag': "[unsubscribe_url]"})
|
||||
# make sure we didn't overwrite Anymail message options:
|
||||
self.assertEqual(data['categories'], ["tag"])
|
||||
self.assertEqual(data['tracking_settings']['click_tracking'], {'enable': True})
|
||||
|
||||
# noinspection PyUnresolvedReferences
|
||||
def test_send_attaches_anymail_status(self):
|
||||
@@ -572,29 +584,26 @@ class SendGridBackendAnymailFeatureTests(SendGridBackendMockAPITestCase):
|
||||
self.assertEqual(self.message.anymail_status.recipients, {})
|
||||
self.assertIsNone(self.message.anymail_status.esp_response)
|
||||
|
||||
# noinspection PyUnresolvedReferences
|
||||
def test_send_unparsable_response(self):
|
||||
"""If the send succeeds, but a non-JSON API response, should raise an API exception"""
|
||||
mock_response = self.set_mock_response(status_code=200,
|
||||
raw=b"yikes, this isn't a real response")
|
||||
with self.assertRaises(AnymailAPIError):
|
||||
self.message.send()
|
||||
self.assertIsNone(self.message.anymail_status.status)
|
||||
self.assertIsNone(self.message.anymail_status.message_id)
|
||||
self.assertEqual(self.message.anymail_status.recipients, {})
|
||||
self.assertEqual(self.message.anymail_status.esp_response, mock_response)
|
||||
|
||||
def test_json_serialization_errors(self):
|
||||
"""Try to provide more information about non-json-serializable data"""
|
||||
self.message.metadata = {'total': Decimal('19.99')}
|
||||
with self.assertRaises(AnymailSerializationError) as cm:
|
||||
self.message.send()
|
||||
print(self.get_api_call_data())
|
||||
err = cm.exception
|
||||
self.assertIsInstance(err, TypeError) # compatibility with json.dumps
|
||||
self.assertIn("Don't know how to send this data to SendGrid", str(err)) # our added context
|
||||
self.assertIn("Decimal('19.99') is not JSON serializable", str(err)) # original message
|
||||
|
||||
@override_settings(ANYMAIL_SENDGRID_WORKAROUND_NAME_QUOTE_BUG=False)
|
||||
def test_undocumented_workaround_name_quote_bug_setting(self):
|
||||
mail.send_mail("Subject", "Body", '"Sender, Inc." <from@example.com',
|
||||
['"Recipient, Ltd." <to@example.com>'])
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data["personalizations"][0]["to"][0],
|
||||
{"email": "to@example.com", "name": "Recipient, Ltd."}) # no extra quotes on name
|
||||
self.assertEqual(data["from"],
|
||||
{"email": "from@example.com", "name": "Sender, Inc."})
|
||||
|
||||
|
||||
class SendGridBackendRecipientsRefusedTests(SendGridBackendMockAPITestCase):
|
||||
"""Should raise AnymailRecipientsRefused when *all* recipients are rejected or invalid"""
|
||||
@@ -602,8 +611,7 @@ class SendGridBackendRecipientsRefusedTests(SendGridBackendMockAPITestCase):
|
||||
# SendGrid doesn't check email bounce or complaint lists at time of send --
|
||||
# it always just queues the message. You'll need to listen for the "rejected"
|
||||
# and "failed" events to detect refused recipients.
|
||||
|
||||
pass
|
||||
pass # not applicable to this backend
|
||||
|
||||
|
||||
class SendGridBackendSessionSharingTestCase(SessionSharingTestCasesMixin, SendGridBackendMockAPITestCase):
|
||||
@@ -616,10 +624,24 @@ class SendGridBackendImproperlyConfiguredTests(SimpleTestCase, AnymailTestMixin)
|
||||
"""Test ESP backend without required settings in place"""
|
||||
|
||||
def test_missing_auth(self):
|
||||
with self.assertRaises(ImproperlyConfigured) as cm:
|
||||
with self.assertRaisesRegex(AnymailConfigurationError, r'\bSENDGRID_API_KEY\b'):
|
||||
mail.send_mail('Subject', 'Message', 'from@example.com', ['to@example.com'])
|
||||
errmsg = str(cm.exception)
|
||||
# Make sure the exception mentions all the auth keys:
|
||||
self.assertRegex(errmsg, r'\bSENDGRID_API_KEY\b')
|
||||
self.assertRegex(errmsg, r'\bSENDGRID_USERNAME\b')
|
||||
self.assertRegex(errmsg, r'\bSENDGRID_PASSWORD\b')
|
||||
|
||||
|
||||
@override_settings(EMAIL_BACKEND="anymail.backends.sendgrid.SendGridBackend")
|
||||
class SendGridBackendDisallowsV2Tests(SimpleTestCase, AnymailTestMixin):
|
||||
"""Using v2-API-only features should cause errors with v3 backend"""
|
||||
|
||||
@override_settings(ANYMAIL={'SENDGRID_USERNAME': 'sg_username', 'SENDGRID_PASSWORD': 'sg_password'})
|
||||
def test_user_pass_auth(self):
|
||||
"""Make sure v2-only USERNAME/PASSWORD auth raises error"""
|
||||
with self.assertRaisesRegex(AnymailConfigurationError, r'\bsendgrid_v2\.EmailBackend\b'):
|
||||
mail.send_mail('Subject', 'Message', 'from@example.com', ['to@example.com'])
|
||||
|
||||
@override_settings(ANYMAIL={'SENDGRID_API_KEY': 'test_api_key'})
|
||||
def test_esp_extra_smtpapi(self):
|
||||
"""x-smtpapi in the esp_extra indicates a desire to use the v2 api"""
|
||||
message = mail.EmailMessage('Subject', 'Body', 'from@example.com', ['to@example.com'])
|
||||
message.esp_extra = {'x-smtpapi': {'asm_group_id': 1}}
|
||||
with self.assertRaisesRegex(AnymailConfigurationError, r'\bsendgrid_v2\.EmailBackend\b'):
|
||||
message.send()
|
||||
|
||||
@@ -2,7 +2,6 @@ import os
|
||||
import unittest
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from django.core.mail import send_mail
|
||||
from django.test import SimpleTestCase
|
||||
from django.test.utils import override_settings
|
||||
|
||||
@@ -11,12 +10,8 @@ from anymail.message import AnymailMessage
|
||||
|
||||
from .utils import AnymailTestMixin, sample_image_path, RUN_LIVE_TESTS
|
||||
|
||||
# For API_KEY auth tests:
|
||||
SENDGRID_TEST_API_KEY = os.getenv('SENDGRID_TEST_API_KEY')
|
||||
|
||||
# For USERNAME/PASSWORD auth tests:
|
||||
SENDGRID_TEST_USERNAME = os.getenv('SENDGRID_TEST_USERNAME')
|
||||
SENDGRID_TEST_PASSWORD = os.getenv('SENDGRID_TEST_PASSWORD')
|
||||
SENDGRID_TEST_TEMPLATE_ID = os.getenv('SENDGRID_TEST_TEMPLATE_ID')
|
||||
|
||||
|
||||
@unittest.skipUnless(RUN_LIVE_TESTS, "RUN_LIVE_TESTS disabled in this environment")
|
||||
@@ -24,17 +19,21 @@ SENDGRID_TEST_PASSWORD = os.getenv('SENDGRID_TEST_PASSWORD')
|
||||
"Set SENDGRID_TEST_API_KEY environment variable "
|
||||
"to run SendGrid integration tests")
|
||||
@override_settings(ANYMAIL_SENDGRID_API_KEY=SENDGRID_TEST_API_KEY,
|
||||
ANYMAIL_SENDGRID_SEND_DEFAULTS={"esp_extra": {
|
||||
"mail_settings": {"sandbox_mode": {"enable": True}},
|
||||
}},
|
||||
EMAIL_BACKEND="anymail.backends.sendgrid.SendGridBackend")
|
||||
class SendGridBackendIntegrationTests(SimpleTestCase, AnymailTestMixin):
|
||||
"""SendGrid API integration tests
|
||||
"""SendGrid v3 API integration tests
|
||||
|
||||
These tests run against the **live** SendGrid API, using the
|
||||
environment variable `SENDGRID_TEST_API_KEY` as the API key
|
||||
If those variables are not set, these tests won't run.
|
||||
|
||||
SendGrid doesn't offer a test mode -- it tries to send everything
|
||||
you ask. To avoid stacking up a pile of undeliverable @example.com
|
||||
emails, the tests use SendGrid's "sink domain" @sink.sendgrid.net.
|
||||
The SEND_DEFAULTS above force SendGrid's v3 sandbox mode, which avoids sending mail.
|
||||
(Sandbox sends also don't show in the activity feed, so disable that for live debugging.)
|
||||
|
||||
The tests also use SendGrid's "sink domain" @sink.sendgrid.net for recipient addresses.
|
||||
https://support.sendgrid.com/hc/en-us/articles/201995663-Safely-Test-Your-Sending-Speed
|
||||
|
||||
"""
|
||||
@@ -62,20 +61,21 @@ class SendGridBackendIntegrationTests(SimpleTestCase, AnymailTestMixin):
|
||||
def test_all_options(self):
|
||||
send_at = datetime.now().replace(microsecond=0) + timedelta(minutes=2)
|
||||
message = AnymailMessage(
|
||||
subject="Anymail all-options integration test FILES",
|
||||
subject="Anymail all-options integration test",
|
||||
body="This is the text body",
|
||||
from_email="Test From <from@example.com>",
|
||||
to=["to1@sink.sendgrid.net", "Recipient 2 <to2@sink.sendgrid.net>"],
|
||||
from_email='"Test From, with comma" <from@example.com>',
|
||||
to=["to1@sink.sendgrid.net", '"Recipient 2, OK?" <to2@sink.sendgrid.net>'],
|
||||
cc=["cc1@sink.sendgrid.net", "Copy 2 <cc2@sink.sendgrid.net>"],
|
||||
bcc=["bcc1@sink.sendgrid.net", "Blind Copy 2 <bcc2@sink.sendgrid.net>"],
|
||||
reply_to=["reply1@example.com", "Reply 2 <reply2@example.com>"],
|
||||
headers={"X-Anymail-Test": "value"},
|
||||
reply_to=['"Reply, with comma" <reply@example.com>'], # v3 only supports single reply-to
|
||||
headers={"X-Anymail-Test": "value", "X-Anymail-Count": 3},
|
||||
|
||||
metadata={"meta1": "simple string", "meta2": 2},
|
||||
send_at=send_at,
|
||||
tags=["tag 1", "tag 2"],
|
||||
track_clicks=True,
|
||||
track_opens=True,
|
||||
# esp_extra={'asm': {'group_id': 1}}, # this breaks activity feed if you don't have an asm group
|
||||
)
|
||||
message.attach("attachment1.txt", "Here is some\ntext for you", "text/plain")
|
||||
message.attach("attachment2.csv", "ID,Name\n1,Amy Lina", "text/csv")
|
||||
@@ -90,13 +90,14 @@ class SendGridBackendIntegrationTests(SimpleTestCase, AnymailTestMixin):
|
||||
|
||||
def test_merge_data(self):
|
||||
message = AnymailMessage(
|
||||
subject="Anymail merge_data test: %value%",
|
||||
body="This body includes merge data: %value%",
|
||||
subject="Anymail merge_data test: %field%",
|
||||
body="This body includes merge data: %field%",
|
||||
from_email="Test From <from@example.com>",
|
||||
to=["to1@sink.sendgrid.net", "Recipient 2 <to2@sink.sendgrid.net>"],
|
||||
reply_to=['"Merge data in reply name: %field%" <reply@example.com>'],
|
||||
merge_data={
|
||||
'to1@sink.sendgrid.net': {'value': 'one'},
|
||||
'to2@sink.sendgrid.net': {'value': 'two'},
|
||||
'to1@sink.sendgrid.net': {'field': 'one'},
|
||||
'to2@sink.sendgrid.net': {'field': 'two'},
|
||||
},
|
||||
esp_extra={
|
||||
'merge_field_format': '%{}%',
|
||||
@@ -107,40 +108,30 @@ class SendGridBackendIntegrationTests(SimpleTestCase, AnymailTestMixin):
|
||||
self.assertEqual(recipient_status['to1@sink.sendgrid.net'].status, 'queued')
|
||||
self.assertEqual(recipient_status['to2@sink.sendgrid.net'].status, 'queued')
|
||||
|
||||
@unittest.skipUnless(SENDGRID_TEST_TEMPLATE_ID,
|
||||
"Set the SENDGRID_TEST_TEMPLATE_ID environment variable "
|
||||
"to a template in your SendGrid account to test stored templates")
|
||||
def test_stored_template(self):
|
||||
message = AnymailMessage(
|
||||
from_email="Test From <from@example.com>",
|
||||
to=["to@sink.sendgrid.net"],
|
||||
template_id=SENDGRID_TEST_TEMPLATE_ID,
|
||||
# The test template in the Anymail Test account has a substitution "-field-":
|
||||
merge_global_data={
|
||||
'field': 'value from merge_global_data',
|
||||
},
|
||||
esp_extra={
|
||||
'merge_field_format': '-{}-',
|
||||
},
|
||||
)
|
||||
message.send()
|
||||
self.assertEqual(message.anymail_status.status, {'queued'})
|
||||
|
||||
@override_settings(ANYMAIL_SENDGRID_API_KEY="Hey, that's not an API key!")
|
||||
def test_invalid_api_key(self):
|
||||
with self.assertRaises(AnymailAPIError) as cm:
|
||||
self.message.send()
|
||||
err = cm.exception
|
||||
self.assertEqual(err.status_code, 400)
|
||||
self.assertEqual(err.status_code, 401)
|
||||
# Make sure the exception message includes SendGrid's response:
|
||||
self.assertIn("authorization grant is invalid", str(err))
|
||||
|
||||
|
||||
@unittest.skipUnless(RUN_LIVE_TESTS, "RUN_LIVE_TESTS disabled in this environment")
|
||||
@unittest.skipUnless(SENDGRID_TEST_USERNAME and SENDGRID_TEST_PASSWORD,
|
||||
"Set SENDGRID_TEST_USERNAME and SENDGRID_TEST_PASSWORD"
|
||||
"environment variables to run SendGrid integration tests")
|
||||
@override_settings(ANYMAIL_SENDGRID_USERNAME=SENDGRID_TEST_USERNAME,
|
||||
ANYMAIL_SENDGRID_PASSWORD=SENDGRID_TEST_PASSWORD,
|
||||
EMAIL_BACKEND="anymail.backends.sendgrid.SendGridBackend")
|
||||
class SendGridBackendUserPassIntegrationTests(SimpleTestCase, AnymailTestMixin):
|
||||
"""SendGrid username/password API integration tests
|
||||
|
||||
(See notes above for the API-key tests)
|
||||
"""
|
||||
|
||||
def test_valid_auth(self):
|
||||
sent_count = send_mail('Anymail SendGrid username/password integration test',
|
||||
'Text content', 'from@example.com', ['to@sink.sendgrid.net'])
|
||||
self.assertEqual(sent_count, 1)
|
||||
|
||||
@override_settings(ANYMAIL_SENDGRID_PASSWORD="Hey, this isn't the password!")
|
||||
def test_invalid_auth(self):
|
||||
with self.assertRaises(AnymailAPIError) as cm:
|
||||
send_mail('Anymail SendGrid username/password integration test',
|
||||
'Text content', 'from@example.com', ['to@sink.sendgrid.net'])
|
||||
err = cm.exception
|
||||
self.assertEqual(err.status_code, 400)
|
||||
# Make sure the exception message includes SendGrid's response:
|
||||
self.assertIn("Bad username / password", str(err))
|
||||
|
||||
625
tests/test_sendgrid_v2_backend.py
Normal file
625
tests/test_sendgrid_v2_backend.py
Normal file
@@ -0,0 +1,625 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import json
|
||||
from calendar import timegm
|
||||
from datetime import date, datetime
|
||||
from decimal import Decimal
|
||||
from email.mime.base import MIMEBase
|
||||
from email.mime.image import MIMEImage
|
||||
|
||||
from django.core import mail
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
from django.test import SimpleTestCase
|
||||
from django.test.utils import override_settings
|
||||
from django.utils.timezone import get_fixed_timezone, override as override_current_timezone
|
||||
|
||||
from anymail.exceptions import AnymailAPIError, AnymailSerializationError, AnymailUnsupportedFeature, AnymailWarning
|
||||
from anymail.message import attach_inline_image_file
|
||||
|
||||
from .mock_requests_backend import RequestsBackendMockAPITestCase, SessionSharingTestCasesMixin
|
||||
from .utils import sample_image_content, sample_image_path, SAMPLE_IMAGE_FILENAME, AnymailTestMixin
|
||||
|
||||
|
||||
@override_settings(EMAIL_BACKEND='anymail.backends.sendgrid_v2.EmailBackend',
|
||||
ANYMAIL={'SENDGRID_API_KEY': 'test_api_key'})
|
||||
class SendGridBackendMockAPITestCase(RequestsBackendMockAPITestCase):
|
||||
DEFAULT_RAW_RESPONSE = b"""{
|
||||
"message": "success"
|
||||
}"""
|
||||
|
||||
def setUp(self):
|
||||
super(SendGridBackendMockAPITestCase, self).setUp()
|
||||
# Simple message useful for many tests
|
||||
self.message = mail.EmailMultiAlternatives('Subject', 'Text Body', 'from@example.com', ['to@example.com'])
|
||||
|
||||
def get_smtpapi(self):
|
||||
"""Returns the x-smtpapi data passed to the mock requests call"""
|
||||
data = self.get_api_call_data()
|
||||
return json.loads(data["x-smtpapi"])
|
||||
|
||||
|
||||
class SendGridBackendStandardEmailTests(SendGridBackendMockAPITestCase):
|
||||
"""Test backend support for Django standard email features"""
|
||||
|
||||
def test_send_mail(self):
|
||||
"""Test basic API for simple send"""
|
||||
mail.send_mail('Subject here', 'Here is the message.',
|
||||
'from@sender.example.com', ['to@example.com'], fail_silently=False)
|
||||
self.assert_esp_called('/api/mail.send.json')
|
||||
http_headers = self.get_api_call_headers()
|
||||
self.assertEqual(http_headers["Authorization"], "Bearer test_api_key")
|
||||
|
||||
query = self.get_api_call_params(required=False)
|
||||
if query:
|
||||
self.assertNotIn('api_user', query)
|
||||
self.assertNotIn('api_key', query)
|
||||
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['subject'], "Subject here")
|
||||
self.assertEqual(data['text'], "Here is the message.")
|
||||
self.assertEqual(data['from'], "from@sender.example.com")
|
||||
self.assertEqual(data['to'], ["to@example.com"])
|
||||
# make sure backend assigned a Message-ID for event tracking
|
||||
email_headers = json.loads(data['headers'])
|
||||
self.assertRegex(email_headers['Message-ID'], r'\<.+@sender\.example\.com\>') # id uses from_email's domain
|
||||
# make sure we added the Message-ID to unique_args for event notification
|
||||
smtpapi = self.get_smtpapi()
|
||||
self.assertEqual(email_headers['Message-ID'], smtpapi['unique_args']['smtp-id'])
|
||||
|
||||
@override_settings(ANYMAIL={'SENDGRID_USERNAME': 'sg_username', 'SENDGRID_PASSWORD': 'sg_password'})
|
||||
def test_user_pass_auth(self):
|
||||
"""Make sure alternative USERNAME/PASSWORD auth works"""
|
||||
mail.send_mail('Subject here', 'Here is the message.',
|
||||
'from@sender.example.com', ['to@example.com'], fail_silently=False)
|
||||
self.assert_esp_called('/api/mail.send.json')
|
||||
query = self.get_api_call_params()
|
||||
self.assertEqual(query['api_user'], 'sg_username')
|
||||
self.assertEqual(query['api_key'], 'sg_password')
|
||||
http_headers = self.get_api_call_headers(required=False)
|
||||
if http_headers:
|
||||
self.assertNotIn('Authorization', http_headers)
|
||||
|
||||
def test_name_addr(self):
|
||||
"""Make sure RFC2822 name-addr format (with display-name) is allowed
|
||||
|
||||
(Test both sender and recipient addresses)
|
||||
"""
|
||||
msg = mail.EmailMessage(
|
||||
'Subject', 'Message', 'From Name <from@example.com>',
|
||||
['Recipient #1 <to1@example.com>', 'to2@example.com'],
|
||||
cc=['Carbon Copy <cc1@example.com>', 'cc2@example.com'],
|
||||
bcc=['Blind Copy <bcc1@example.com>', 'bcc2@example.com'])
|
||||
msg.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['from'], "from@example.com")
|
||||
self.assertEqual(data['fromname'], "From Name")
|
||||
self.assertEqual(data['to'], ['to1@example.com', 'to2@example.com'])
|
||||
self.assertEqual(data['toname'], ['Recipient #1', ' ']) # note space -- SendGrid balks on ''
|
||||
self.assertEqual(data['cc'], ['cc1@example.com', 'cc2@example.com'])
|
||||
self.assertEqual(data['ccname'], ['Carbon Copy', ' '])
|
||||
self.assertEqual(data['bcc'], ['bcc1@example.com', 'bcc2@example.com'])
|
||||
self.assertEqual(data['bccname'], ['Blind Copy', ' '])
|
||||
|
||||
def test_email_message(self):
|
||||
email = mail.EmailMessage(
|
||||
'Subject', 'Body goes here', 'from@example.com',
|
||||
['to1@example.com', 'Also To <to2@example.com>'],
|
||||
bcc=['bcc1@example.com', 'Also BCC <bcc2@example.com>'],
|
||||
cc=['cc1@example.com', 'Also CC <cc2@example.com>'],
|
||||
headers={'Reply-To': 'another@example.com',
|
||||
'X-MyHeader': 'my value',
|
||||
'Message-ID': '<mycustommsgid@sales.example.com>'}) # should override backend msgid
|
||||
email.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['subject'], "Subject")
|
||||
self.assertEqual(data['text'], "Body goes here")
|
||||
self.assertEqual(data['from'], "from@example.com")
|
||||
self.assertEqual(data['to'], ['to1@example.com', 'to2@example.com'])
|
||||
self.assertEqual(data['toname'], [' ', 'Also To'])
|
||||
self.assertEqual(data['bcc'], ['bcc1@example.com', 'bcc2@example.com'])
|
||||
self.assertEqual(data['bccname'], [' ', 'Also BCC'])
|
||||
self.assertEqual(data['cc'], ['cc1@example.com', 'cc2@example.com'])
|
||||
self.assertEqual(data['ccname'], [' ', 'Also CC'])
|
||||
self.assertJSONEqual(data['headers'], {
|
||||
'Message-ID': '<mycustommsgid@sales.example.com>',
|
||||
'Reply-To': 'another@example.com',
|
||||
'X-MyHeader': 'my value',
|
||||
})
|
||||
# make sure custom Message-ID also added to unique_args
|
||||
self.assertJSONEqual(data['x-smtpapi'], {
|
||||
'unique_args': {'smtp-id': '<mycustommsgid@sales.example.com>'}
|
||||
})
|
||||
|
||||
def test_html_message(self):
|
||||
text_content = 'This is an important message.'
|
||||
html_content = '<p>This is an <strong>important</strong> message.</p>'
|
||||
email = mail.EmailMultiAlternatives('Subject', text_content,
|
||||
'from@example.com', ['to@example.com'])
|
||||
email.attach_alternative(html_content, "text/html")
|
||||
email.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['text'], text_content)
|
||||
self.assertEqual(data['html'], html_content)
|
||||
# Don't accidentally send the html part as an attachment:
|
||||
files = self.get_api_call_files(required=False)
|
||||
self.assertIsNone(files)
|
||||
|
||||
def test_html_only_message(self):
|
||||
html_content = '<p>This is an <strong>important</strong> message.</p>'
|
||||
email = mail.EmailMessage('Subject', html_content, 'from@example.com', ['to@example.com'])
|
||||
email.content_subtype = "html" # Main content is now text/html
|
||||
email.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertNotIn('text', data)
|
||||
self.assertEqual(data['html'], html_content)
|
||||
|
||||
def test_extra_headers(self):
|
||||
self.message.extra_headers = {'X-Custom': 'string', 'X-Num': 123}
|
||||
self.message.send()
|
||||
data = self.get_api_call_data()
|
||||
headers = json.loads(data['headers'])
|
||||
self.assertEqual(headers['X-Custom'], 'string')
|
||||
self.assertEqual(headers['X-Num'], '123') # number converted to string (per SendGrid requirement)
|
||||
|
||||
def test_extra_headers_serialization_error(self):
|
||||
self.message.extra_headers = {'X-Custom': Decimal(12.5)}
|
||||
with self.assertRaisesMessage(AnymailSerializationError, "Decimal('12.5')"):
|
||||
self.message.send()
|
||||
|
||||
def test_reply_to(self):
|
||||
email = mail.EmailMessage('Subject', 'Body goes here', 'from@example.com', ['to1@example.com'],
|
||||
reply_to=['reply@example.com', 'Other <reply2@example.com>'],
|
||||
headers={'X-Other': 'Keep'})
|
||||
email.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertNotIn('replyto', data) # don't use SendGrid's replyto (it's broken); just use headers
|
||||
headers = json.loads(data['headers'])
|
||||
self.assertEqual(headers['Reply-To'], 'reply@example.com, Other <reply2@example.com>')
|
||||
self.assertEqual(headers['X-Other'], 'Keep') # don't lose other headers
|
||||
|
||||
def test_attachments(self):
|
||||
text_content = "* Item one\n* Item two\n* Item three"
|
||||
self.message.attach(filename="test.txt", content=text_content, mimetype="text/plain")
|
||||
|
||||
# Should guess mimetype if not provided...
|
||||
png_content = b"PNG\xb4 pretend this is the contents of a png file"
|
||||
self.message.attach(filename="test.png", content=png_content)
|
||||
|
||||
# Should work with a MIMEBase object (also tests no filename)...
|
||||
pdf_content = b"PDF\xb4 pretend this is valid pdf data"
|
||||
mimeattachment = MIMEBase('application', 'pdf')
|
||||
mimeattachment.set_payload(pdf_content)
|
||||
self.message.attach(mimeattachment)
|
||||
|
||||
self.message.send()
|
||||
files = self.get_api_call_files()
|
||||
self.assertEqual(files, {
|
||||
'files[test.txt]': ('test.txt', text_content, 'text/plain'),
|
||||
'files[test.png]': ('test.png', png_content, 'image/png'), # type inferred from filename
|
||||
'files[]': ('', pdf_content, 'application/pdf'), # no filename
|
||||
})
|
||||
|
||||
def test_attachment_name_conflicts(self):
|
||||
# It's not clear how to (or whether) supply multiple attachments with
|
||||
# the same name to SendGrid's API. Anymail treats this case as unsupported.
|
||||
self.message.attach('foo.txt', 'content', 'text/plain')
|
||||
self.message.attach('bar.txt', 'content', 'text/plain')
|
||||
self.message.attach('foo.txt', 'different content', 'text/plain')
|
||||
with self.assertRaisesMessage(AnymailUnsupportedFeature,
|
||||
"multiple attachments with the same filename") as cm:
|
||||
self.message.send()
|
||||
self.assertIn('foo.txt', str(cm.exception)) # say which filename
|
||||
|
||||
def test_unnamed_attachment_conflicts(self):
|
||||
# Same as previous test, but with None/empty filenames
|
||||
self.message.attach(None, 'content', 'text/plain')
|
||||
self.message.attach('', 'different content', 'text/plain')
|
||||
with self.assertRaisesMessage(AnymailUnsupportedFeature, "multiple unnamed attachments"):
|
||||
self.message.send()
|
||||
|
||||
def test_unicode_attachment_correctly_decoded(self):
|
||||
self.message.attach(u"Une pièce jointe.html", u'<p>\u2019</p>', mimetype='text/html')
|
||||
self.message.send()
|
||||
files = self.get_api_call_files()
|
||||
self.assertEqual(files[u'files[Une pièce jointe.html]'],
|
||||
(u'Une pièce jointe.html', u'<p>\u2019</p>', 'text/html'))
|
||||
|
||||
def test_embedded_images(self):
|
||||
image_filename = SAMPLE_IMAGE_FILENAME
|
||||
image_path = sample_image_path(image_filename)
|
||||
image_data = sample_image_content(image_filename)
|
||||
|
||||
cid = attach_inline_image_file(self.message, image_path) # Read from a png file
|
||||
html_content = '<p>This has an <img src="cid:%s" alt="inline" /> image.</p>' % cid
|
||||
self.message.attach_alternative(html_content, "text/html")
|
||||
|
||||
self.message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['html'], html_content)
|
||||
|
||||
files = self.get_api_call_files()
|
||||
self.assertEqual(files, {
|
||||
'files[%s]' % image_filename: (image_filename, image_data, "image/png"),
|
||||
})
|
||||
self.assertEqual(data['content[%s]' % image_filename], cid)
|
||||
|
||||
def test_attached_images(self):
|
||||
image_filename = SAMPLE_IMAGE_FILENAME
|
||||
image_path = sample_image_path(image_filename)
|
||||
image_data = sample_image_content(image_filename)
|
||||
|
||||
self.message.attach_file(image_path) # option 1: attach as a file
|
||||
|
||||
image = MIMEImage(image_data) # option 2: construct the MIMEImage and attach it directly
|
||||
self.message.attach(image)
|
||||
|
||||
self.message.send()
|
||||
files = self.get_api_call_files()
|
||||
self.assertEqual(files, {
|
||||
'files[%s]' % image_filename: (image_filename, image_data, "image/png"), # the named one
|
||||
'files[]': ('', image_data, "image/png"), # the unnamed one
|
||||
})
|
||||
|
||||
def test_multiple_html_alternatives(self):
|
||||
# Multiple alternatives not allowed
|
||||
self.message.attach_alternative("<p>First html is OK</p>", "text/html")
|
||||
self.message.attach_alternative("<p>But not second html</p>", "text/html")
|
||||
with self.assertRaises(AnymailUnsupportedFeature):
|
||||
self.message.send()
|
||||
|
||||
def test_html_alternative(self):
|
||||
# Only html alternatives allowed
|
||||
self.message.attach_alternative("{'not': 'allowed'}", "application/json")
|
||||
with self.assertRaises(AnymailUnsupportedFeature):
|
||||
self.message.send()
|
||||
|
||||
def test_alternatives_fail_silently(self):
|
||||
# Make sure fail_silently is respected
|
||||
self.message.attach_alternative("{'not': 'allowed'}", "application/json")
|
||||
sent = self.message.send(fail_silently=True)
|
||||
self.assert_esp_not_called("API should not be called when send fails silently")
|
||||
self.assertEqual(sent, 0)
|
||||
|
||||
def test_suppress_empty_address_lists(self):
|
||||
"""Empty to, cc, bcc, and reply_to shouldn't generate empty headers"""
|
||||
self.message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertNotIn('cc', data)
|
||||
self.assertNotIn('ccname', data)
|
||||
self.assertNotIn('bcc', data)
|
||||
self.assertNotIn('bccname', data)
|
||||
headers = json.loads(data['headers'])
|
||||
self.assertNotIn('Reply-To', headers)
|
||||
|
||||
# Test empty `to` -- but send requires at least one recipient somewhere (like cc)
|
||||
self.message.to = []
|
||||
self.message.cc = ['cc@example.com']
|
||||
self.message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertNotIn('to', data)
|
||||
self.assertNotIn('toname', data)
|
||||
|
||||
def test_api_failure(self):
|
||||
self.set_mock_response(status_code=400)
|
||||
with self.assertRaises(AnymailAPIError):
|
||||
sent = mail.send_mail('Subject', 'Body', 'from@example.com', ['to@example.com'])
|
||||
self.assertEqual(sent, 0)
|
||||
|
||||
# Make sure fail_silently is respected
|
||||
self.set_mock_response(status_code=400)
|
||||
sent = mail.send_mail('Subject', 'Body', 'from@example.com', ['to@example.com'], fail_silently=True)
|
||||
self.assertEqual(sent, 0)
|
||||
|
||||
def test_api_error_includes_details(self):
|
||||
"""AnymailAPIError should include ESP's error message"""
|
||||
# JSON error response:
|
||||
error_response = b"""{
|
||||
"message": "error",
|
||||
"errors": [
|
||||
"Helpful explanation from SendGrid",
|
||||
"and more"
|
||||
]
|
||||
}"""
|
||||
self.set_mock_response(status_code=200, raw=error_response)
|
||||
with self.assertRaisesRegex(AnymailAPIError,
|
||||
r"\bHelpful explanation from SendGrid\b.*and more\b"):
|
||||
self.message.send()
|
||||
|
||||
# Non-JSON error response:
|
||||
self.set_mock_response(status_code=500, raw=b"Ack! Bad proxy!")
|
||||
with self.assertRaisesMessage(AnymailAPIError, "Ack! Bad proxy!"):
|
||||
self.message.send()
|
||||
|
||||
# No content in the error response:
|
||||
self.set_mock_response(status_code=502, raw=None)
|
||||
with self.assertRaises(AnymailAPIError):
|
||||
self.message.send()
|
||||
|
||||
|
||||
class SendGridBackendAnymailFeatureTests(SendGridBackendMockAPITestCase):
|
||||
"""Test backend support for Anymail added features"""
|
||||
|
||||
def test_metadata(self):
|
||||
# Note: SendGrid doesn't handle complex types in metadata
|
||||
self.message.metadata = {'user_id': "12345", 'items': 6}
|
||||
self.message.send()
|
||||
smtpapi = self.get_smtpapi()
|
||||
smtpapi['unique_args'].pop('smtp-id', None) # remove Message-ID we added as tracking workaround
|
||||
self.assertEqual(smtpapi['unique_args'], {'user_id': "12345", 'items': 6})
|
||||
|
||||
def test_send_at(self):
|
||||
utc_plus_6 = get_fixed_timezone(6 * 60)
|
||||
utc_minus_8 = get_fixed_timezone(-8 * 60)
|
||||
|
||||
with override_current_timezone(utc_plus_6):
|
||||
# Timezone-aware datetime converted to UTC:
|
||||
self.message.send_at = datetime(2016, 3, 4, 5, 6, 7, tzinfo=utc_minus_8)
|
||||
self.message.send()
|
||||
smtpapi = self.get_smtpapi()
|
||||
self.assertEqual(smtpapi['send_at'], timegm((2016, 3, 4, 13, 6, 7))) # 05:06 UTC-8 == 13:06 UTC
|
||||
|
||||
# Timezone-naive datetime assumed to be Django current_timezone
|
||||
self.message.send_at = datetime(2022, 10, 11, 12, 13, 14, 567) # microseconds should get stripped
|
||||
self.message.send()
|
||||
smtpapi = self.get_smtpapi()
|
||||
self.assertEqual(smtpapi['send_at'], timegm((2022, 10, 11, 6, 13, 14))) # 12:13 UTC+6 == 06:13 UTC
|
||||
|
||||
# Date-only treated as midnight in current timezone
|
||||
self.message.send_at = date(2022, 10, 22)
|
||||
self.message.send()
|
||||
smtpapi = self.get_smtpapi()
|
||||
self.assertEqual(smtpapi['send_at'], timegm((2022, 10, 21, 18, 0, 0))) # 00:00 UTC+6 == 18:00-1d UTC
|
||||
|
||||
# POSIX timestamp
|
||||
self.message.send_at = 1651820889 # 2022-05-06 07:08:09 UTC
|
||||
self.message.send()
|
||||
smtpapi = self.get_smtpapi()
|
||||
self.assertEqual(smtpapi['send_at'], 1651820889)
|
||||
|
||||
def test_tags(self):
|
||||
self.message.tags = ["receipt", "repeat-user"]
|
||||
self.message.send()
|
||||
smtpapi = self.get_smtpapi()
|
||||
self.assertCountEqual(smtpapi['category'], ["receipt", "repeat-user"])
|
||||
|
||||
def test_tracking(self):
|
||||
# Test one way...
|
||||
self.message.track_clicks = False
|
||||
self.message.track_opens = True
|
||||
self.message.send()
|
||||
smtpapi = self.get_smtpapi()
|
||||
self.assertEqual(smtpapi['filters']['clicktrack'], {'settings': {'enable': 0}})
|
||||
self.assertEqual(smtpapi['filters']['opentrack'], {'settings': {'enable': 1}})
|
||||
|
||||
# ...and the opposite way
|
||||
self.message.track_clicks = True
|
||||
self.message.track_opens = False
|
||||
self.message.send()
|
||||
smtpapi = self.get_smtpapi()
|
||||
self.assertEqual(smtpapi['filters']['clicktrack'], {'settings': {'enable': 1}})
|
||||
self.assertEqual(smtpapi['filters']['opentrack'], {'settings': {'enable': 0}})
|
||||
|
||||
def test_template_id(self):
|
||||
self.message.attach_alternative("HTML Body", "text/html")
|
||||
self.message.template_id = "5997fcf6-2b9f-484d-acd5-7e9a99f0dc1f"
|
||||
self.message.send()
|
||||
smtpapi = self.get_smtpapi()
|
||||
self.assertEqual(smtpapi['filters']['templates'], {
|
||||
'settings': {'enable': 1,
|
||||
'template_id': "5997fcf6-2b9f-484d-acd5-7e9a99f0dc1f"}
|
||||
})
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['text'], "Text Body")
|
||||
self.assertEqual(data['html'], "HTML Body")
|
||||
|
||||
def test_template_id_with_empty_body(self):
|
||||
# Text and html must be present (and non-empty-string), or the corresponding
|
||||
# part will not render from the template. Make sure we fill in strings:
|
||||
message = mail.EmailMessage(from_email='from@example.com', to=['to@example.com'])
|
||||
message.template_id = "5997fcf6-2b9f-484d-acd5-7e9a99f0dc1f"
|
||||
message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['text'], " ") # single space is sufficient
|
||||
self.assertEqual(data['html'], " ")
|
||||
|
||||
def test_merge_data(self):
|
||||
self.message.from_email = 'from@example.com'
|
||||
self.message.to = ['alice@example.com', 'Bob <bob@example.com>']
|
||||
# SendGrid template_id is not required to use merge.
|
||||
# You can just supply template content as the message (e.g.):
|
||||
self.message.body = "Hi :name. Welcome to :group at :site."
|
||||
self.message.merge_data = {
|
||||
# You must either include merge field delimiters in the keys (':name' rather than just 'name')
|
||||
# as shown here, or use one of the merge_field_format options shown in the test cases below
|
||||
'alice@example.com': {':name': "Alice", ':group': "Developers"},
|
||||
'bob@example.com': {':name': "Bob"}, # and leave :group undefined
|
||||
}
|
||||
self.message.merge_global_data = {
|
||||
':group': "Users",
|
||||
':site': "ExampleCo",
|
||||
}
|
||||
self.message.send()
|
||||
|
||||
data = self.get_api_call_data()
|
||||
smtpapi = self.get_smtpapi()
|
||||
# For batch send, smtpapi['to'] gets real recipient list;
|
||||
# normal 'to' is not used (but must be valid, so we substitute the from_email):
|
||||
self.assertEqual(data['to'], ['from@example.com'])
|
||||
self.assertEqual(data['toname'], [' ']) # empty string if no name in from_email
|
||||
self.assertEqual(smtpapi['to'], ['alice@example.com', 'Bob <bob@example.com>'])
|
||||
# smtpapi['sub'] values should be in to-list order:
|
||||
self.assertEqual(smtpapi['sub'], {
|
||||
':name': ["Alice", "Bob"],
|
||||
':group': ["Developers", ":group"], # missing value gets replaced with var name...
|
||||
})
|
||||
self.assertEqual(smtpapi['section'], {
|
||||
':group': "Users", # ... which SG should then try to resolve from here
|
||||
':site': "ExampleCo",
|
||||
})
|
||||
|
||||
@override_settings(ANYMAIL_SENDGRID_MERGE_FIELD_FORMAT=":{}") # :field as shown in SG examples
|
||||
def test_merge_field_format_setting(self):
|
||||
# Provide merge field delimiters in settings.py
|
||||
self.message.to = ['alice@example.com', 'Bob <bob@example.com>']
|
||||
self.message.merge_data = {
|
||||
'alice@example.com': {'name': "Alice", 'group': "Developers"},
|
||||
'bob@example.com': {'name': "Bob"}, # and leave group undefined
|
||||
}
|
||||
self.message.merge_global_data = {'site': "ExampleCo"}
|
||||
self.message.send()
|
||||
smtpapi = self.get_smtpapi()
|
||||
self.assertEqual(smtpapi['sub'], {
|
||||
':name': ["Alice", "Bob"],
|
||||
':group': ["Developers", ":group"] # substitutes formatted field name if missing for recipient
|
||||
})
|
||||
self.assertEqual(smtpapi['section'], {':site': "ExampleCo"})
|
||||
|
||||
def test_merge_field_format_esp_extra(self):
|
||||
# Provide merge field delimiters for an individual message
|
||||
self.message.to = ['alice@example.com', 'Bob <bob@example.com>']
|
||||
self.message.merge_data = {
|
||||
'alice@example.com': {'name': "Alice", 'group': "Developers"},
|
||||
'bob@example.com': {'name': "Bob"}, # and leave group undefined
|
||||
}
|
||||
self.message.merge_global_data = {'site': "ExampleCo"}
|
||||
self.message.esp_extra = {'merge_field_format': '*|{}|*'} # match Mandrill/MailChimp delimiters
|
||||
self.message.send()
|
||||
smtpapi = self.get_smtpapi()
|
||||
self.assertEqual(smtpapi['sub'], {
|
||||
'*|name|*': ["Alice", "Bob"],
|
||||
'*|group|*': ["Developers", '*|group|*'] # substitutes formatted field name if missing for recipient
|
||||
})
|
||||
self.assertEqual(smtpapi['section'], {'*|site|*': "ExampleCo"})
|
||||
# Make sure our esp_extra merge_field_format doesn't get sent to SendGrid API:
|
||||
data = self.get_api_call_data()
|
||||
self.assertNotIn('merge_field_format', data)
|
||||
|
||||
def test_warn_if_no_merge_field_delimiters(self):
|
||||
self.message.to = ['alice@example.com']
|
||||
self.message.merge_data = {
|
||||
'alice@example.com': {'name': "Alice", 'group': "Developers"},
|
||||
}
|
||||
with self.assertWarnsRegex(AnymailWarning, r'SENDGRID_MERGE_FIELD_FORMAT'):
|
||||
self.message.send()
|
||||
|
||||
@override_settings(ANYMAIL_SENDGRID_GENERATE_MESSAGE_ID=False) # else we force unique_args
|
||||
def test_default_omits_options(self):
|
||||
"""Make sure by default we don't send any ESP-specific options.
|
||||
|
||||
Options not specified by the caller should be omitted entirely from
|
||||
the API call (*not* sent as False or empty). This ensures
|
||||
that your ESP account settings apply by default.
|
||||
"""
|
||||
self.message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertNotIn('x-smtpapi', data)
|
||||
|
||||
def test_esp_extra(self):
|
||||
self.message.tags = ["tag"]
|
||||
self.message.track_clicks = True
|
||||
self.message.esp_extra = {
|
||||
'x-smtpapi': {
|
||||
# Most SendMail options go in the 'x-smtpapi' block...
|
||||
'asm_group_id': 1,
|
||||
'filters': {
|
||||
# If you add a filter, you must supply all required settings for it.
|
||||
'subscriptiontrack': {
|
||||
'settings': {
|
||||
'enable': 1,
|
||||
'replace': '[unsubscribe_url]',
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
'newthing': "some param not supported by Anymail",
|
||||
}
|
||||
self.message.send()
|
||||
# Additional send params:
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['newthing'], "some param not supported by Anymail")
|
||||
# Should merge x-smtpapi, and merge filters within x-smtpapi
|
||||
smtpapi = self.get_smtpapi()
|
||||
self.assertEqual(smtpapi['category'], ["tag"])
|
||||
self.assertEqual(smtpapi['asm_group_id'], 1)
|
||||
self.assertEqual(smtpapi['filters']['subscriptiontrack'],
|
||||
{'settings': {'enable': 1, 'replace': '[unsubscribe_url]'}}) # esp_extra merged
|
||||
self.assertEqual(smtpapi['filters']['clicktrack'],
|
||||
{'settings': {'enable': 1}}) # Anymail message option preserved
|
||||
|
||||
# noinspection PyUnresolvedReferences
|
||||
def test_send_attaches_anymail_status(self):
|
||||
""" The anymail_status should be attached to the message when it is sent """
|
||||
# the DEFAULT_RAW_RESPONSE above is the *only* success response SendGrid returns,
|
||||
# so no need to override it here
|
||||
msg = mail.EmailMessage('Subject', 'Message', 'from@example.com', ['to1@example.com'],)
|
||||
sent = msg.send()
|
||||
self.assertEqual(sent, 1)
|
||||
self.assertEqual(msg.anymail_status.status, {'queued'})
|
||||
self.assertRegex(msg.anymail_status.message_id, r'\<.+@example\.com\>') # don't know exactly what it'll be
|
||||
self.assertEqual(msg.anymail_status.recipients['to1@example.com'].status, 'queued')
|
||||
self.assertEqual(msg.anymail_status.recipients['to1@example.com'].message_id,
|
||||
msg.anymail_status.message_id)
|
||||
self.assertEqual(msg.anymail_status.esp_response.content, self.DEFAULT_RAW_RESPONSE)
|
||||
|
||||
# noinspection PyUnresolvedReferences
|
||||
def test_send_failed_anymail_status(self):
|
||||
""" If the send fails, anymail_status should contain initial values"""
|
||||
self.set_mock_response(status_code=500)
|
||||
sent = self.message.send(fail_silently=True)
|
||||
self.assertEqual(sent, 0)
|
||||
self.assertIsNone(self.message.anymail_status.status)
|
||||
self.assertIsNone(self.message.anymail_status.message_id)
|
||||
self.assertEqual(self.message.anymail_status.recipients, {})
|
||||
self.assertIsNone(self.message.anymail_status.esp_response)
|
||||
|
||||
# noinspection PyUnresolvedReferences
|
||||
def test_send_unparsable_response(self):
|
||||
"""If the send succeeds, but a non-JSON API response, should raise an API exception"""
|
||||
mock_response = self.set_mock_response(status_code=200,
|
||||
raw=b"yikes, this isn't a real response")
|
||||
with self.assertRaises(AnymailAPIError):
|
||||
self.message.send()
|
||||
self.assertIsNone(self.message.anymail_status.status)
|
||||
self.assertIsNone(self.message.anymail_status.message_id)
|
||||
self.assertEqual(self.message.anymail_status.recipients, {})
|
||||
self.assertEqual(self.message.anymail_status.esp_response, mock_response)
|
||||
|
||||
def test_json_serialization_errors(self):
|
||||
"""Try to provide more information about non-json-serializable data"""
|
||||
self.message.metadata = {'total': Decimal('19.99')}
|
||||
with self.assertRaises(AnymailSerializationError) as cm:
|
||||
self.message.send()
|
||||
print(self.get_api_call_data())
|
||||
err = cm.exception
|
||||
self.assertIsInstance(err, TypeError) # compatibility with json.dumps
|
||||
self.assertIn("Don't know how to send this data to SendGrid", str(err)) # our added context
|
||||
self.assertIn("Decimal('19.99') is not JSON serializable", str(err)) # original message
|
||||
|
||||
|
||||
class SendGridBackendRecipientsRefusedTests(SendGridBackendMockAPITestCase):
|
||||
"""Should raise AnymailRecipientsRefused when *all* recipients are rejected or invalid"""
|
||||
|
||||
# SendGrid doesn't check email bounce or complaint lists at time of send --
|
||||
# it always just queues the message. You'll need to listen for the "rejected"
|
||||
# and "failed" events to detect refused recipients.
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class SendGridBackendSessionSharingTestCase(SessionSharingTestCasesMixin, SendGridBackendMockAPITestCase):
|
||||
"""Requests session sharing tests"""
|
||||
pass # tests are defined in the mixin
|
||||
|
||||
|
||||
@override_settings(EMAIL_BACKEND="anymail.backends.sendgrid_v2.EmailBackend")
|
||||
class SendGridBackendImproperlyConfiguredTests(SimpleTestCase, AnymailTestMixin):
|
||||
"""Test ESP backend without required settings in place"""
|
||||
|
||||
def test_missing_auth(self):
|
||||
with self.assertRaises(ImproperlyConfigured) as cm:
|
||||
mail.send_mail('Subject', 'Message', 'from@example.com', ['to@example.com'])
|
||||
errmsg = str(cm.exception)
|
||||
# Make sure the exception mentions all the auth keys:
|
||||
self.assertRegex(errmsg, r'\bSENDGRID_API_KEY\b')
|
||||
self.assertRegex(errmsg, r'\bSENDGRID_USERNAME\b')
|
||||
self.assertRegex(errmsg, r'\bSENDGRID_PASSWORD\b')
|
||||
146
tests/test_sendgrid_v2_integration.py
Normal file
146
tests/test_sendgrid_v2_integration.py
Normal file
@@ -0,0 +1,146 @@
|
||||
import os
|
||||
import unittest
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from django.core.mail import send_mail
|
||||
from django.test import SimpleTestCase
|
||||
from django.test.utils import override_settings
|
||||
|
||||
from anymail.exceptions import AnymailAPIError
|
||||
from anymail.message import AnymailMessage
|
||||
|
||||
from .utils import AnymailTestMixin, sample_image_path, RUN_LIVE_TESTS
|
||||
|
||||
# For API_KEY auth tests:
|
||||
SENDGRID_TEST_API_KEY = os.getenv('SENDGRID_TEST_API_KEY')
|
||||
|
||||
# For USERNAME/PASSWORD auth tests:
|
||||
SENDGRID_TEST_USERNAME = os.getenv('SENDGRID_TEST_USERNAME')
|
||||
SENDGRID_TEST_PASSWORD = os.getenv('SENDGRID_TEST_PASSWORD')
|
||||
|
||||
|
||||
@unittest.skipUnless(RUN_LIVE_TESTS, "RUN_LIVE_TESTS disabled in this environment")
|
||||
@unittest.skipUnless(SENDGRID_TEST_API_KEY,
|
||||
"Set SENDGRID_TEST_API_KEY environment variable "
|
||||
"to run SendGrid integration tests")
|
||||
@override_settings(ANYMAIL_SENDGRID_API_KEY=SENDGRID_TEST_API_KEY,
|
||||
EMAIL_BACKEND="anymail.backends.sendgrid_v2.EmailBackend")
|
||||
class SendGridBackendIntegrationTests(SimpleTestCase, AnymailTestMixin):
|
||||
"""SendGrid v2 API integration tests
|
||||
|
||||
These tests run against the **live** SendGrid API, using the
|
||||
environment variable `SENDGRID_TEST_API_KEY` as the API key
|
||||
If those variables are not set, these tests won't run.
|
||||
|
||||
SendGrid v2 doesn't offer a test mode -- it tries to send everything
|
||||
you ask. To avoid stacking up a pile of undeliverable @example.com
|
||||
emails, the tests use SendGrid's "sink domain" @sink.sendgrid.net.
|
||||
https://support.sendgrid.com/hc/en-us/articles/201995663-Safely-Test-Your-Sending-Speed
|
||||
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
super(SendGridBackendIntegrationTests, self).setUp()
|
||||
self.message = AnymailMessage('Anymail SendGrid integration test', 'Text content',
|
||||
'from@example.com', ['to@sink.sendgrid.net'])
|
||||
self.message.attach_alternative('<p>HTML content</p>', "text/html")
|
||||
|
||||
def test_simple_send(self):
|
||||
# Example of getting the SendGrid send status and message id from the message
|
||||
sent_count = self.message.send()
|
||||
self.assertEqual(sent_count, 1)
|
||||
|
||||
anymail_status = self.message.anymail_status
|
||||
sent_status = anymail_status.recipients['to@sink.sendgrid.net'].status
|
||||
message_id = anymail_status.recipients['to@sink.sendgrid.net'].message_id
|
||||
|
||||
self.assertEqual(sent_status, 'queued') # SendGrid always queues
|
||||
self.assertRegex(message_id, r'\<.+@example\.com\>') # should use from_email's domain
|
||||
self.assertEqual(anymail_status.status, {sent_status}) # set of all recipient statuses
|
||||
self.assertEqual(anymail_status.message_id, message_id)
|
||||
|
||||
def test_all_options(self):
|
||||
send_at = datetime.now().replace(microsecond=0) + timedelta(minutes=2)
|
||||
message = AnymailMessage(
|
||||
subject="Anymail all-options integration test FILES",
|
||||
body="This is the text body",
|
||||
from_email="Test From <from@example.com>",
|
||||
to=["to1@sink.sendgrid.net", "Recipient 2 <to2@sink.sendgrid.net>"],
|
||||
cc=["cc1@sink.sendgrid.net", "Copy 2 <cc2@sink.sendgrid.net>"],
|
||||
bcc=["bcc1@sink.sendgrid.net", "Blind Copy 2 <bcc2@sink.sendgrid.net>"],
|
||||
reply_to=["reply1@example.com", "Reply 2 <reply2@example.com>"],
|
||||
headers={"X-Anymail-Test": "value"},
|
||||
|
||||
metadata={"meta1": "simple string", "meta2": 2},
|
||||
send_at=send_at,
|
||||
tags=["tag 1", "tag 2"],
|
||||
track_clicks=True,
|
||||
track_opens=True,
|
||||
)
|
||||
message.attach("attachment1.txt", "Here is some\ntext for you", "text/plain")
|
||||
message.attach("attachment2.csv", "ID,Name\n1,Amy Lina", "text/csv")
|
||||
cid = message.attach_inline_image_file(sample_image_path())
|
||||
message.attach_alternative(
|
||||
"<p><b>HTML:</b> with <a href='http://example.com'>link</a>"
|
||||
"and image: <img src='cid:%s'></div>" % cid,
|
||||
"text/html")
|
||||
|
||||
message.send()
|
||||
self.assertEqual(message.anymail_status.status, {'queued'}) # SendGrid always queues
|
||||
|
||||
def test_merge_data(self):
|
||||
message = AnymailMessage(
|
||||
subject="Anymail merge_data test: %value%",
|
||||
body="This body includes merge data: %value%",
|
||||
from_email="Test From <from@example.com>",
|
||||
to=["to1@sink.sendgrid.net", "Recipient 2 <to2@sink.sendgrid.net>"],
|
||||
merge_data={
|
||||
'to1@sink.sendgrid.net': {'value': 'one'},
|
||||
'to2@sink.sendgrid.net': {'value': 'two'},
|
||||
},
|
||||
esp_extra={
|
||||
'merge_field_format': '%{}%',
|
||||
},
|
||||
)
|
||||
message.send()
|
||||
recipient_status = message.anymail_status.recipients
|
||||
self.assertEqual(recipient_status['to1@sink.sendgrid.net'].status, 'queued')
|
||||
self.assertEqual(recipient_status['to2@sink.sendgrid.net'].status, 'queued')
|
||||
|
||||
@override_settings(ANYMAIL_SENDGRID_API_KEY="Hey, that's not an API key!")
|
||||
def test_invalid_api_key(self):
|
||||
with self.assertRaises(AnymailAPIError) as cm:
|
||||
self.message.send()
|
||||
err = cm.exception
|
||||
self.assertEqual(err.status_code, 400)
|
||||
# Make sure the exception message includes SendGrid's response:
|
||||
self.assertIn("authorization grant is invalid", str(err))
|
||||
|
||||
|
||||
@unittest.skipUnless(RUN_LIVE_TESTS, "RUN_LIVE_TESTS disabled in this environment")
|
||||
@unittest.skipUnless(SENDGRID_TEST_USERNAME and SENDGRID_TEST_PASSWORD,
|
||||
"Set SENDGRID_TEST_USERNAME and SENDGRID_TEST_PASSWORD"
|
||||
"environment variables to run SendGrid integration tests")
|
||||
@override_settings(ANYMAIL_SENDGRID_USERNAME=SENDGRID_TEST_USERNAME,
|
||||
ANYMAIL_SENDGRID_PASSWORD=SENDGRID_TEST_PASSWORD,
|
||||
EMAIL_BACKEND="anymail.backends.sendgrid_v2.EmailBackend")
|
||||
class SendGridBackendUserPassIntegrationTests(SimpleTestCase, AnymailTestMixin):
|
||||
"""SendGrid username/password API integration tests
|
||||
|
||||
(See notes above for the API-key tests)
|
||||
"""
|
||||
|
||||
def test_valid_auth(self):
|
||||
sent_count = send_mail('Anymail SendGrid username/password integration test',
|
||||
'Text content', 'from@example.com', ['to@sink.sendgrid.net'])
|
||||
self.assertEqual(sent_count, 1)
|
||||
|
||||
@override_settings(ANYMAIL_SENDGRID_PASSWORD="Hey, this isn't the password!")
|
||||
def test_invalid_auth(self):
|
||||
with self.assertRaises(AnymailAPIError) as cm:
|
||||
send_mail('Anymail SendGrid username/password integration test',
|
||||
'Text content', 'from@example.com', ['to@sink.sendgrid.net'])
|
||||
err = cm.exception
|
||||
self.assertEqual(err.status_code, 400)
|
||||
# Make sure the exception message includes SendGrid's response:
|
||||
self.assertIn("Bad username / password", str(err))
|
||||
@@ -5,7 +5,7 @@ from django.test import SimpleTestCase
|
||||
from django.utils.translation import ugettext_lazy, string_concat
|
||||
|
||||
from anymail.exceptions import AnymailInvalidAddress
|
||||
from anymail.utils import ParsedEmail, is_lazy, force_non_lazy, force_non_lazy_dict, force_non_lazy_list
|
||||
from anymail.utils import ParsedEmail, is_lazy, force_non_lazy, force_non_lazy_dict, force_non_lazy_list, update_deep
|
||||
|
||||
|
||||
class ParsedEmailTests(SimpleTestCase):
|
||||
@@ -116,3 +116,29 @@ class LazyCoercionTests(SimpleTestCase):
|
||||
result = force_non_lazy_list([0, ugettext_lazy(u"b"), u"c"])
|
||||
self.assertEqual(result, [0, u"b", u"c"]) # coerced to list
|
||||
self.assertIsInstance(result[1], six.text_type)
|
||||
|
||||
|
||||
class UpdateDeepTests(SimpleTestCase):
|
||||
"""Test utils.update_deep"""
|
||||
|
||||
def test_updates_recursively(self):
|
||||
first = {'a': {'a1': 1, 'aa': {}}, 'b': "B"}
|
||||
second = {'a': {'a2': 2, 'aa': {'aa1': 11}}}
|
||||
result = update_deep(first, second)
|
||||
self.assertEqual(first, {'a': {'a1': 1, 'a2': 2, 'aa': {'aa1': 11}}, 'b': "B"})
|
||||
self.assertIsNone(result) # modifies first in place; doesn't return it (same as dict.update())
|
||||
|
||||
def test_overwrites_sequences(self):
|
||||
"""Only mappings are handled recursively; sequences are considered atomic"""
|
||||
first = {'a': [1, 2]}
|
||||
second = {'a': [3]}
|
||||
update_deep(first, second)
|
||||
self.assertEqual(first, {'a': [3]})
|
||||
|
||||
def test_handles_non_dict_mappings(self):
|
||||
"""Mapping types in general are supported"""
|
||||
from collections import OrderedDict, defaultdict
|
||||
first = OrderedDict(a=OrderedDict(a1=1), c={'c1': 1})
|
||||
second = defaultdict(None, a=dict(a2=2))
|
||||
update_deep(first, second)
|
||||
self.assertEqual(first, {'a': {'a1': 1, 'a2': 2}, 'c': {'c1': 1}})
|
||||
|
||||
Reference in New Issue
Block a user