Add Mailgun backend

This commit is contained in:
medmunds
2016-03-07 18:07:48 -08:00
parent 4ad2af2469
commit d1d41badc8
13 changed files with 1103 additions and 64 deletions

View File

@@ -5,45 +5,10 @@ from django.core.mail.backends.base import BaseEmailBackend
from django.utils.timezone import is_naive, get_current_timezone, make_aware, utc from django.utils.timezone import is_naive, get_current_timezone, make_aware, utc
from ..exceptions import AnymailError, AnymailUnsupportedFeature, AnymailRecipientsRefused from ..exceptions import AnymailError, AnymailUnsupportedFeature, AnymailRecipientsRefused
from ..message import AnymailStatus
from ..utils import Attachment, ParsedEmail, UNSET, combine, last, get_anymail_setting from ..utils import Attachment, ParsedEmail, UNSET, combine, last, get_anymail_setting
# The AnymailStatus* stuff should get moved to an anymail.message module
ANYMAIL_STATUSES = [
'sent', # the ESP has sent the message (though it may or may not get delivered)
'queued', # the ESP will try to send the message later
'invalid', # the recipient email was not valid
'rejected', # the recipient is blacklisted
'unknown', # anything else
]
class AnymailRecipientStatus(object):
"""Information about an EmailMessage's send status for a single recipient"""
def __init__(self, message_id, status):
self.message_id = message_id # ESP message id
self.status = status # one of ANYMAIL_STATUSES, or None for not yet sent to ESP
class AnymailStatus(object):
"""Information about an EmailMessage's send status for all recipients"""
def __init__(self):
self.message_id = None # set of ESP message ids across all recipients, or bare id if only one, or None
self.status = None # set of ANYMAIL_STATUSES across all recipients, or None for not yet sent to ESP
self.recipients = {} # per-recipient: { email: AnymailRecipientStatus, ... }
self.esp_response = None
def set_recipient_status(self, recipients):
self.recipients.update(recipients)
recipient_statuses = self.recipients.values()
self.message_id = set([recipient.message_id for recipient in recipient_statuses])
if len(self.message_id) == 1:
self.message_id = self.message_id.pop() # de-set-ify if single message_id
self.status = set([recipient.status for recipient in recipient_statuses])
class AnymailBaseBackend(BaseEmailBackend): class AnymailBaseBackend(BaseEmailBackend):
""" """
Base Anymail email backend Base Anymail email backend

155
anymail/backends/mailgun.py Normal file
View File

@@ -0,0 +1,155 @@
from datetime import datetime
from ..exceptions import AnymailRequestsAPIError, AnymailError
from ..message import AnymailRecipientStatus
from ..utils import get_anymail_setting, rfc2822date
from .base_requests import AnymailRequestsBackend, RequestsPayload
class MailgunBackend(AnymailRequestsBackend):
"""
Mailgun API Email Backend
"""
def __init__(self, **kwargs):
"""Init options from Django settings"""
self.api_key = get_anymail_setting('MAILGUN_API_KEY', allow_bare=True)
api_url = get_anymail_setting("MAILGUN_API_URL", "https://api.mailgun.net/v3")
if not api_url.endswith("/"):
api_url += "/"
super(MailgunBackend, self).__init__(api_url, **kwargs)
def build_message_payload(self, message, defaults):
return MailgunPayload(message, defaults, self)
def parse_recipient_status(self, response, payload, message):
# The *only* 200 response from Mailgun seems to be:
# {
# "id": "<20160306015544.116301.25145@example.org>",
# "message": "Queued. Thank you."
# }
#
# That single message id applies to all recipients.
# The only way to detect rejected, etc. is via webhooks.
# (*Any* invalid recipient addresses will generate a 400 API error)
parsed_response = self.deserialize_json_response(response, payload, message)
try:
message_id = parsed_response["id"]
mailgun_message = parsed_response["message"]
except (KeyError, TypeError):
raise AnymailRequestsAPIError("Invalid Mailgun API response format",
email_message=message, payload=payload, response=response)
if not mailgun_message.startswith("Queued"):
raise AnymailRequestsAPIError("Unrecognized Mailgun API message '%s'" % mailgun_message,
email_message=message, payload=payload, response=response)
# Simulate a per-recipient status of "queued":
status = AnymailRecipientStatus(message_id=message_id, status="queued")
return {recipient.email: status for recipient in payload.all_recipients}
class MailgunPayload(RequestsPayload):
def __init__(self, message, defaults, backend, *args, **kwargs):
auth = ("api", backend.api_key)
self.sender_domain = None
self.all_recipients = [] # used for backend.parse_recipient_status
super(MailgunPayload, self).__init__(message, defaults, backend, auth=auth, *args, **kwargs)
def get_api_endpoint(self):
if self.sender_domain is None:
raise AnymailError("Cannot call Mailgun unknown sender domain. "
"Either provide valid `from_email`, "
"or set `message.esp_extra={'sender_domain': 'example.com'}`",
backend=self.backend, email_message=self.message, payload=self)
return "%s/messages" % self.sender_domain
#
# Payload construction
#
def init_payload(self):
self.data = {} # {field: [multiple, values]}
self.files = [] # [(field, multiple), (field, values)]
def set_from_email(self, email):
self.data["from"] = str(email)
if self.sender_domain is None:
# try to intuit sender_domain from from_email
try:
_, domain = email.email.split('@')
self.sender_domain = domain
except ValueError:
pass
def set_recipients(self, recipient_type, emails):
assert recipient_type in ["to", "cc", "bcc"]
if emails:
self.data[recipient_type] = [str(email) for email in emails]
self.all_recipients += emails # used for backend.parse_recipient_status
def set_subject(self, subject):
self.data["subject"] = subject
def set_reply_to(self, emails):
if emails:
reply_to = ", ".join([str(email) for email in emails])
self.data["h:Reply-To"] = reply_to
def set_extra_headers(self, headers):
for key, value in headers.items():
self.data["h:%s" % key] = value
def set_text_body(self, body):
self.data["text"] = body
def set_html_body(self, body):
if "html" in self.data:
# second html body could show up through multiple alternatives, or html body + alternative
self.unsupported_feature("multiple html parts")
self.data["html"] = body
def add_attachment(self, attachment):
# http://docs.python-requests.org/en/v2.4.3/user/advanced/#post-multiple-multipart-encoded-files
field = "inline" if attachment.inline else "attachment"
self.files.append(
(field, (attachment.name, attachment.content, attachment.mimetype))
)
def set_metadata(self, metadata):
# The Mailgun docs are a little unclear on whether to send each var as a separate v: field,
# or to send a single 'v:my-custom-data' field with a json blob of all vars.
# (https://documentation.mailgun.com/user_manual.html#attaching-data-to-messages)
# From experimentation, it seems like the first option works:
for key, value in metadata.items():
# Ensure the value is json-serializable (for Mailgun storage)
json = self.serialize_json(value) # will raise AnymailSerializationError
# Special case: a single string value should be sent bare (without quotes),
# because Mailgun will add quotes when querying the value as json.
if json.startswith('"'): # only a single string could be serialized as "...
json = value
self.data["v:%s" % key] = json
def set_send_at(self, send_at):
# Mailgun expects RFC-2822 format dates
# (BasePayload has converted most date-like values to datetime by now;
# if the caller passes a string, they'll need to format it themselves.)
if isinstance(send_at, datetime):
send_at = rfc2822date(send_at)
self.data["o:deliverytime"] = send_at
def set_tags(self, tags):
self.data["o:tag"] = tags
def set_track_clicks(self, track_clicks):
# Mailgun also supports an "htmlonly" option, which Anymail doesn't offer
self.data["o:tracking-clicks"] = "yes" if track_clicks else "no"
def set_track_opens(self, track_opens):
self.data["o:tracking-opens"] = "yes" if track_opens else "no"
def set_esp_extra(self, extra):
self.data.update(extra)
# Allow override of sender_domain via esp_extra
# (but pop it out of params to send to Mailgun)
self.sender_domain = self.data.pop("sender_domain", self.sender_domain)

View File

@@ -1,7 +1,7 @@
from datetime import datetime from datetime import datetime
from anymail.backends.base import AnymailRecipientStatus, ANYMAIL_STATUSES
from ..exceptions import AnymailRequestsAPIError from ..exceptions import AnymailRequestsAPIError
from ..message import AnymailRecipientStatus, ANYMAIL_STATUSES
from ..utils import last, combine, get_anymail_setting from ..utils import last, combine, get_anymail_setting
from .base_requests import AnymailRequestsBackend, RequestsPayload from .base_requests import AnymailRequestsBackend, RequestsPayload

83
anymail/message.py Normal file
View File

@@ -0,0 +1,83 @@
from email.mime.image import MIMEImage
from django.core.mail import EmailMessage, EmailMultiAlternatives, make_msgid
from .utils import UNSET
class AnymailMessageMixin(object):
"""Mixin for EmailMessage that exposes Anymail features.
Use of this mixin is optional. You can always just set Anymail
attributes on any EmailMessage.
(The mixin can be helpful with type checkers and other development
tools that complain about accessing Anymail's added attributes
on a regular EmailMessage.)
"""
def __init__(self, *args, **kwargs):
self.esp_extra = kwargs.pop('esp_extra', UNSET)
self.metadata = kwargs.pop('metadata', UNSET)
self.send_at = kwargs.pop('send_at', UNSET)
self.tags = kwargs.pop('tags', UNSET)
self.track_clicks = kwargs.pop('track_clicks', UNSET)
self.track_opens = kwargs.pop('track_opens', UNSET)
self.anymail_status = None
# noinspection PyArgumentList
super(AnymailMessageMixin, self).__init__(*args, **kwargs)
def attach_inline_image(self, content, subtype=None, idstring="img", domain=None):
"""Add inline image and return its content id"""
assert isinstance(self, EmailMessage)
return attach_inline_image(self, content, subtype, idstring, domain)
class AnymailMessage(AnymailMessageMixin, EmailMultiAlternatives):
pass
def attach_inline_image(message, content, subtype=None, idstring="img", domain=None):
"""Add inline image to an EmailMessage, and return its content id"""
cid = make_msgid(idstring, domain) # Content ID per RFC 2045 section 7 (with <...>)
image = MIMEImage(content, subtype)
image.add_header('Content-ID', cid)
message.attach(image)
return cid[1:-1] # Without <...>, for use as the <img> tag src
ANYMAIL_STATUSES = [
'sent', # the ESP has sent the message (though it may or may not get delivered)
'queued', # the ESP will try to send the message later
'invalid', # the recipient email was not valid
'rejected', # the recipient is blacklisted
'failed', # the attempt to send failed for some other reason
'unknown', # anything else
]
class AnymailRecipientStatus(object):
"""Information about an EmailMessage's send status for a single recipient"""
def __init__(self, message_id, status):
self.message_id = message_id # ESP message id
self.status = status # one of ANYMAIL_STATUSES, or None for not yet sent to ESP
class AnymailStatus(object):
"""Information about an EmailMessage's send status for all recipients"""
def __init__(self):
self.message_id = None # set of ESP message ids across all recipients, or bare id if only one, or None
self.status = None # set of ANYMAIL_STATUSES across all recipients, or None for not yet sent to ESP
self.recipients = {} # per-recipient: { email: AnymailRecipientStatus, ... }
self.esp_response = None
def set_recipient_status(self, recipients):
self.recipients.update(recipients)
recipient_statuses = self.recipients.values()
self.message_id = set([recipient.message_id for recipient in recipient_statuses])
if len(self.message_id) == 1:
self.message_id = self.message_id.pop() # de-set-ify if single message_id
self.status = set([recipient.status for recipient in recipient_statuses])

View File

@@ -1,3 +1,8 @@
# Exposing all TestCases at the 'tests' module level
# is required by the old (<=1.5) DjangoTestSuiteRunner.
from .test_mailgun_backend import *
from .test_mandrill_integration import * from .test_mandrill_integration import *
from .test_mandrill_send import * from .test_mandrill_send import *
from .test_mandrill_send_template import * from .test_mandrill_send_template import *

View File

@@ -47,9 +47,9 @@ class DjrillBackendMockAPITestCase(TestCase):
raise AssertionError("Mandrill API was not called") raise AssertionError("Mandrill API was not called")
(args, kwargs) = self.mock_post.call_args (args, kwargs) = self.mock_post.call_args
try: try:
post_url = kwargs.get('url', None) or args[1] post_url = kwargs.get('url', None) or args[2]
except IndexError: except IndexError:
raise AssertionError("requests.post was called without an url (?!)") raise AssertionError("requests.Session.request was called without an url (?!)")
if not post_url.endswith(endpoint): if not post_url.endswith(endpoint):
raise AssertionError( raise AssertionError(
"requests.post was not called on %s\n(It was called on %s)" "requests.post was not called on %s\n(It was called on %s)"
@@ -64,9 +64,9 @@ class DjrillBackendMockAPITestCase(TestCase):
raise AssertionError("Mandrill API was not called") raise AssertionError("Mandrill API was not called")
(args, kwargs) = self.mock_post.call_args (args, kwargs) = self.mock_post.call_args
try: try:
post_data = kwargs.get('data', None) or args[2] post_data = kwargs.get('data', None) or args[4]
except IndexError: except IndexError:
raise AssertionError("requests.post was called without data") raise AssertionError("requests.Session.request was called without data")
return json.loads(post_data) return json.loads(post_data)

View File

@@ -0,0 +1,103 @@
import json
from django.test import SimpleTestCase
import requests
import six
from mock import patch
from .utils import AnymailTestMixin
UNSET = object()
class RequestsBackendMockAPITestCase(SimpleTestCase, AnymailTestMixin):
"""TestCase that uses Djrill EmailBackend with a mocked Mandrill API"""
DEFAULT_RAW_RESPONSE = b"""{"subclass": "should override"}"""
class MockResponse(requests.Response):
"""requests.request return value mock sufficient for testing"""
def __init__(self, status_code=200, raw=b"RESPONSE", encoding='utf-8'):
super(RequestsBackendMockAPITestCase.MockResponse, self).__init__()
self.status_code = status_code
self.encoding = encoding
self.raw = six.BytesIO(raw)
def setUp(self):
super(RequestsBackendMockAPITestCase, self).setUp()
self.patch = patch('requests.Session.request', autospec=True)
self.mock_request = self.patch.start()
self.addCleanup(self.patch.stop)
self.set_mock_response()
def set_mock_response(self, status_code=200, raw=UNSET, encoding='utf-8'):
if raw is UNSET:
raw = self.DEFAULT_RAW_RESPONSE
mock_response = self.MockResponse(status_code, raw, encoding)
self.mock_request.return_value = mock_response
return mock_response
def assert_esp_called(self, url, method="POST"):
"""Verifies the (mock) ESP API was called on endpoint.
url can be partial, and is just checked against the end of the url requested"
"""
# This assumes the last (or only) call to requests.Session.request is the API call of interest.
if self.mock_request.call_args is None:
raise AssertionError("No ESP API was called")
(args, kwargs) = self.mock_request.call_args
try:
actual_method = kwargs.get('method', None) or args[1]
actual_url = kwargs.get('url', None) or args[2]
except IndexError:
raise AssertionError("API was called without a method or url (?!)")
if actual_method != method:
raise AssertionError("API was not called using %s. (%s was used instead.)"
% (method, actual_method))
if not actual_url.endswith(url):
raise AssertionError("API was not called at %s\n(It was called at %s)"
% (url, actual_url))
def get_api_call_arg(self, kwarg, pos, required=True):
"""Returns an argument passed to the mock ESP API.
Fails test if API wasn't called.
"""
if self.mock_request.call_args is None:
raise AssertionError("API was not called")
(args, kwargs) = self.mock_request.call_args
try:
return kwargs.get(kwarg, None) or args[pos]
except IndexError:
if required:
raise AssertionError("API was called without required %s" % kwarg)
else:
return None
def get_api_call_params(self, required=True):
"""Returns the query params sent to the mock ESP API."""
return self.get_api_call_arg('params', 3, required)
def get_api_call_data(self, required=True):
"""Returns the raw data sent to the mock ESP API."""
return self.get_api_call_arg('data', 4, required)
def get_api_call_json(self, required=True):
"""Returns the data sent to the mock ESP API, json-parsed"""
return json.loads(self.get_api_call_data(required))
def get_api_call_headers(self, required=True):
"""Returns the headers sent to the mock ESP API"""
return self.get_api_call_arg('headers', 5, required)
def get_api_call_files(self, required=True):
"""Returns the files sent to the mock ESP API"""
return self.get_api_call_arg('files', 7, required)
def get_api_call_auth(self, required=True):
"""Returns the auth sent to the mock ESP API"""
return self.get_api_call_arg('auth', 8, required)
def assert_esp_not_called(self, msg=None):
if self.mock_request.called:
raise AssertionError(msg or "ESP API was called and shouldn't have been")

View File

@@ -0,0 +1,508 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from datetime import date, datetime
from decimal import Decimal
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from django.core import mail
from django.core.exceptions import ImproperlyConfigured
from django.test import SimpleTestCase
from django.test.utils import override_settings
from django.utils.timezone import get_fixed_timezone, override as override_current_timezone
from anymail.exceptions import AnymailAPIError, AnymailSerializationError, AnymailUnsupportedFeature
from anymail.message import attach_inline_image
from .mock_requests_backend import RequestsBackendMockAPITestCase
from .utils import sample_image_content, sample_image_path, SAMPLE_IMAGE_FILENAME, AnymailTestMixin
@override_settings(EMAIL_BACKEND='anymail.backends.mailgun.MailgunBackend',
ANYMAIL={'MAILGUN_API_KEY': 'test_api_key'})
class MailgunBackendMockAPITestCase(RequestsBackendMockAPITestCase):
DEFAULT_RAW_RESPONSE = b"""{
"id": "<20160306015544.116301.25145@example.com>",
"message": "Queued. Thank you."
}"""
def setUp(self):
super(MailgunBackendMockAPITestCase, self).setUp()
# Simple message useful for many tests
self.message = mail.EmailMultiAlternatives('Subject', 'Text Body', 'from@example.com', ['to@example.com'])
class MailgunBackendStandardEmailTests(MailgunBackendMockAPITestCase):
"""Test backend support for Django standard email features"""
def test_send_mail(self):
"""Test basic API for simple send"""
mail.send_mail('Subject here', 'Here is the message.',
'from@example.com', ['to@example.com'], fail_silently=False)
self.assert_esp_called('/example.com/messages')
auth = self.get_api_call_auth()
self.assertEqual(auth, ('api', 'test_api_key'))
data = self.get_api_call_data()
self.assertEqual(data['subject'], "Subject here")
self.assertEqual(data['text'], "Here is the message.")
self.assertEqual(data['from'], "from@example.com")
self.assertEqual(data['to'], ["to@example.com"])
def test_name_addr(self):
"""Make sure RFC2822 name-addr format (with display-name) is allowed
(Test both sender and recipient addresses)
"""
msg = mail.EmailMessage(
'Subject', 'Message', 'From Name <from@example.com>',
['Recipient #1 <to1@example.com>', 'to2@example.com'],
cc=['Carbon Copy <cc1@example.com>', 'cc2@example.com'],
bcc=['Blind Copy <bcc1@example.com>', 'bcc2@example.com'])
msg.send()
data = self.get_api_call_data()
self.assertEqual(data['from'], "From Name <from@example.com>")
self.assertEqual(data['to'], ['Recipient #1 <to1@example.com>', 'to2@example.com'])
self.assertEqual(data['cc'], ['Carbon Copy <cc1@example.com>', 'cc2@example.com'])
self.assertEqual(data['bcc'], ['Blind Copy <bcc1@example.com>', 'bcc2@example.com'])
def test_email_message(self):
email = mail.EmailMessage(
'Subject', 'Body goes here', 'from@example.com',
['to1@example.com', 'Also To <to2@example.com>'],
bcc=['bcc1@example.com', 'Also BCC <bcc2@example.com>'],
cc=['cc1@example.com', 'Also CC <cc2@example.com>'],
headers={'Reply-To': 'another@example.com',
'X-MyHeader': 'my value',
'Message-ID': 'mycustommsgid@example.com'})
email.send()
data = self.get_api_call_data()
self.assertEqual(data['subject'], "Subject")
self.assertEqual(data['text'], "Body goes here")
self.assertEqual(data['from'], "from@example.com")
self.assertEqual(data['to'], ['to1@example.com', 'Also To <to2@example.com>'])
self.assertEqual(data['bcc'], ['bcc1@example.com', 'Also BCC <bcc2@example.com>'])
self.assertEqual(data['cc'], ['cc1@example.com', 'Also CC <cc2@example.com>'])
self.assertEqual(data['h:Reply-To'], "another@example.com")
self.assertEqual(data['h:X-MyHeader'], 'my value')
self.assertEqual(data['h:Message-ID'], 'mycustommsgid@example.com')
def test_html_message(self):
text_content = 'This is an important message.'
html_content = '<p>This is an <strong>important</strong> message.</p>'
email = mail.EmailMultiAlternatives('Subject', text_content,
'from@example.com', ['to@example.com'])
email.attach_alternative(html_content, "text/html")
email.send()
data = self.get_api_call_data()
self.assertEqual(data['text'], text_content)
self.assertEqual(data['html'], html_content)
# Don't accidentally send the html part as an attachment:
files = self.get_api_call_files(required=False)
self.assertIsNone(files)
def test_html_only_message(self):
html_content = '<p>This is an <strong>important</strong> message.</p>'
email = mail.EmailMessage('Subject', html_content, 'from@example.com', ['to@example.com'])
email.content_subtype = "html" # Main content is now text/html
email.send()
data = self.get_api_call_data()
self.assertNotIn('text', data)
self.assertEqual(data['html'], html_content)
def test_reply_to(self):
# reply_to is new in Django 1.8 -- before that, you can simply include it in headers
try:
# noinspection PyArgumentList
email = mail.EmailMessage('Subject', 'Body goes here', 'from@example.com', ['to1@example.com'],
reply_to=['reply@example.com', 'Other <reply2@example.com>'],
headers={'X-Other': 'Keep'})
except TypeError:
# Pre-Django 1.8
return self.skipTest("Django version doesn't support EmailMessage(reply_to)")
email.send()
data = self.get_api_call_data()
self.assertEqual(data['h:Reply-To'], 'reply@example.com, Other <reply2@example.com>')
self.assertEqual(data['h:X-Other'], 'Keep') # don't lose other headers
def test_attachments(self):
text_content = "* Item one\n* Item two\n* Item three"
self.message.attach(filename="test.txt", content=text_content, mimetype="text/plain")
# Should guess mimetype if not provided...
png_content = b"PNG\xb4 pretend this is the contents of a png file"
self.message.attach(filename="test.png", content=png_content)
# Should work with a MIMEBase object (also tests no filename)...
pdf_content = b"PDF\xb4 pretend this is valid pdf data"
mimeattachment = MIMEBase('application', 'pdf')
mimeattachment.set_payload(pdf_content)
self.message.attach(mimeattachment)
self.message.send()
files = self.get_api_call_files()
attachments = [value for (field, value) in files if field == 'attachment']
self.assertEqual(len(attachments), 3)
self.assertEqual(attachments[0], ('test.txt', text_content, 'text/plain'))
self.assertEqual(attachments[1], ('test.png', png_content, 'image/png')) # type inferred from filename
self.assertEqual(attachments[2], (None, pdf_content, 'application/pdf')) # no filename
# Make sure the image attachment is not treated as embedded:
inlines = [value for (field, value) in files if field == 'inline']
self.assertEqual(len(inlines), 0)
def test_unicode_attachment_correctly_decoded(self):
# Slight modification from the Django unicode docs:
# http://django.readthedocs.org/en/latest/ref/unicode.html#email
self.message.attach("Une pièce jointe.html", '<p>\u2019</p>', mimetype='text/html')
self.message.send()
files = self.get_api_call_files()
attachments = [value for (field, value) in files if field == 'attachment']
self.assertEqual(len(attachments), 1)
def test_embedded_images(self):
image_data = sample_image_content() # Read from a png file
cid = attach_inline_image(self.message, image_data)
html_content = '<p>This has an <img src="cid:%s" alt="inline" /> image.</p>' % cid
self.message.attach_alternative(html_content, "text/html")
self.message.send()
data = self.get_api_call_data()
self.assertEqual(data['html'], html_content)
files = self.get_api_call_files()
inlines = [value for (field, value) in files if field == 'inline']
self.assertEqual(len(inlines), 1)
self.assertEqual(inlines[0], (cid, image_data, "image/png")) # filename is cid; type is guessed
# Make sure neither the html nor the inline image is treated as an attachment:
attachments = [value for (field, value) in files if field == 'attachment']
self.assertEqual(len(attachments), 0)
def test_attached_images(self):
image_filename = SAMPLE_IMAGE_FILENAME
image_path = sample_image_path(image_filename)
image_data = sample_image_content(image_filename)
self.message.attach_file(image_path) # option 1: attach as a file
image = MIMEImage(image_data) # option 2: construct the MIMEImage and attach it directly
self.message.attach(image)
self.message.send()
files = self.get_api_call_files()
attachments = [value for (field, value) in files if field == 'attachment']
self.assertEqual(len(attachments), 2)
self.assertEqual(attachments[0], (image_filename, image_data, 'image/png'))
self.assertEqual(attachments[1], (None, image_data, 'image/png')) # name unknown -- not attached as file
# Make sure the image attachments are not treated as inline:
inlines = [value for (field, value) in files if field == 'inline']
self.assertEqual(len(inlines), 0)
def test_multiple_html_alternatives(self):
# Multiple alternatives not allowed
self.message.attach_alternative("<p>First html is OK</p>", "text/html")
self.message.attach_alternative("<p>But not second html</p>", "text/html")
with self.assertRaises(AnymailUnsupportedFeature):
self.message.send()
def test_html_alternative(self):
# Only html alternatives allowed
self.message.attach_alternative("{'not': 'allowed'}", "application/json")
with self.assertRaises(AnymailUnsupportedFeature):
self.message.send()
def test_alternatives_fail_silently(self):
# Make sure fail_silently is respected
self.message.attach_alternative("{'not': 'allowed'}", "application/json")
sent = self.message.send(fail_silently=True)
self.assert_esp_not_called("API should not be called when send fails silently")
self.assertEqual(sent, 0)
def test_suppress_empty_address_lists(self):
"""Empty to, cc, bcc, and reply_to shouldn't generate empty headers"""
self.message.send()
data = self.get_api_call_data()
self.assertNotIn('cc', data)
self.assertNotIn('bcc', data)
self.assertNotIn('h:Reply-To', data)
# Test empty `to` -- but send requires at least one recipient somewhere (like cc)
self.message.to = []
self.message.cc = ['cc@example.com']
self.message.send()
data = self.get_api_call_data()
self.assertNotIn('to', data)
def test_api_failure(self):
self.set_mock_response(status_code=400)
with self.assertRaises(AnymailAPIError):
sent = mail.send_mail('Subject', 'Body', 'from@example.com', ['to@example.com'])
self.assertEqual(sent, 0)
# Make sure fail_silently is respected
self.set_mock_response(status_code=400)
sent = mail.send_mail('Subject', 'Body', 'from@example.com', ['to@example.com'], fail_silently=True)
self.assertEqual(sent, 0)
def test_api_error_includes_details(self):
"""AnymailAPIError should include ESP's error message"""
# JSON error response:
error_response = b"""{"message": "Helpful explanation from your ESP"}"""
self.set_mock_response(status_code=400, raw=error_response)
with self.assertRaisesMessage(AnymailAPIError, "Helpful explanation from your ESP"):
self.message.send()
# Non-JSON error response:
self.set_mock_response(status_code=500, raw=b"Invalid API key")
with self.assertRaisesMessage(AnymailAPIError, "Invalid API key"):
self.message.send()
# No content in the error response:
self.set_mock_response(status_code=502, raw=None)
with self.assertRaises(AnymailAPIError):
self.message.send()
class MailgunBackendAnymailFeatureTests(MailgunBackendMockAPITestCase):
"""Test backend support for Anymail added features"""
def test_metadata(self):
self.message.metadata = {'user_id': "12345", 'items': ['mail', 'gun']}
self.message.send()
data = self.get_api_call_data()
# note values get serialized to json:
self.assertEqual(data['v:user_id'], '12345') # simple values are transmitted as-is
self.assertEqual(data['v:items'], '["mail", "gun"]') # complex values get json-serialized
def test_send_at(self):
utc_plus_6 = get_fixed_timezone(6 * 60)
utc_minus_8 = get_fixed_timezone(-8 * 60)
with override_current_timezone(utc_plus_6):
# Timezone-aware datetime converted to UTC:
self.message.send_at = datetime(2016, 3, 4, 5, 6, 7, tzinfo=utc_minus_8)
self.message.send()
data = self.get_api_call_data()
self.assertEqual(data['o:deliverytime'], "Fri, 04 Mar 2016 13:06:07 GMT") # 05:06 UTC-8 == 13:06 UTC
# Timezone-naive datetime assumed to be Django current_timezone
self.message.send_at = datetime(2022, 10, 11, 12, 13, 14, 567)
self.message.send()
data = self.get_api_call_data()
self.assertEqual(data['o:deliverytime'], "Tue, 11 Oct 2022 06:13:14 GMT") # 12:13 UTC+6 == 06:13 UTC
# Date-only treated as midnight in current timezone
self.message.send_at = date(2022, 10, 22)
self.message.send()
data = self.get_api_call_data()
self.assertEqual(data['o:deliverytime'], "Fri, 21 Oct 2022 18:00:00 GMT") # 00:00 UTC+6 == 18:00-1d UTC
# POSIX timestamp
self.message.send_at = 1651820889 # 2022-05-06 07:08:09 UTC
self.message.send()
data = self.get_api_call_data()
self.assertEqual(data['o:deliverytime'], "Fri, 06 May 2022 07:08:09 GMT")
# String passed unchanged (this is *not* portable between ESPs)
self.message.send_at = "Thu, 13 Oct 2022 18:02:00 GMT"
self.message.send()
data = self.get_api_call_data()
self.assertEqual(data['o:deliverytime'], "Thu, 13 Oct 2022 18:02:00 GMT")
def test_tags(self):
self.message.tags = ["receipt", "repeat-user"]
self.message.send()
data = self.get_api_call_data()
self.assertEqual(data['o:tag'], ["receipt", "repeat-user"])
def test_tracking(self):
# Test one way...
self.message.track_opens = True
self.message.track_clicks = False
self.message.send()
data = self.get_api_call_data()
self.assertEqual(data['o:tracking-opens'], 'yes')
self.assertEqual(data['o:tracking-clicks'], 'no')
# ...and the opposite way
self.message.track_opens = False
self.message.track_clicks = True
self.message.send()
data = self.get_api_call_data()
self.assertEqual(data['o:tracking-opens'], 'no')
self.assertEqual(data['o:tracking-clicks'], 'yes')
def test_sender_domain(self):
"""Mailgun send domain can come from from_email or esp_extra"""
# You could also use ANYMAIL_SEND_DEFAULTS={'esp_extra': {'sender_domain': 'your-domain.com'}}
# (The mailgun_integration_tests do that.)
self.message.from_email = "Test From <from@from-email.example.com>"
self.message.send()
self.assert_esp_called('/from-email.example.com/messages') # API url includes the sender-domain
self.message.esp_extra = {'sender_domain': 'esp-extra.example.com'}
self.message.send()
self.assert_esp_called('/esp-extra.example.com/messages') # overrides from_email
def test_default_omits_options(self):
"""Make sure by default we don't send any ESP-specific options.
Options not specified by the caller should be omitted entirely from
the API call (*not* sent as False or empty). This ensures
that your ESP account settings apply by default.
"""
self.message.send()
self.assert_esp_called('/example.com/messages')
data = self.get_api_call_data()
mailgun_fields = {key: value for key, value in data.items()
if key.startswith('o:') or key.startswith('v:')}
self.assertEqual(mailgun_fields, {})
# noinspection PyUnresolvedReferences
def test_send_attaches_anymail_status(self):
""" The anymail_status should be attached to the message when it is sent """
response_content = b"""{
"id": "<12345.67890@example.com>",
"message": "Queued. Thank you."
}"""
self.set_mock_response(raw=response_content)
msg = mail.EmailMessage('Subject', 'Message', 'from@example.com', ['to1@example.com'],)
sent = msg.send()
self.assertEqual(sent, 1)
self.assertEqual(msg.anymail_status.status, {'queued'})
self.assertEqual(msg.anymail_status.message_id, '<12345.67890@example.com>')
self.assertEqual(msg.anymail_status.recipients['to1@example.com'].status, 'queued')
self.assertEqual(msg.anymail_status.recipients['to1@example.com'].message_id, '<12345.67890@example.com>')
self.assertEqual(msg.anymail_status.esp_response.content, response_content)
# noinspection PyUnresolvedReferences
def test_send_failed_anymail_status(self):
""" If the send fails, anymail_status should contain initial values"""
self.set_mock_response(status_code=500)
sent = self.message.send(fail_silently=True)
self.assertEqual(sent, 0)
self.assertIsNone(self.message.anymail_status.status)
self.assertIsNone(self.message.anymail_status.message_id)
self.assertEqual(self.message.anymail_status.recipients, {})
self.assertIsNone(self.message.anymail_status.esp_response)
# noinspection PyUnresolvedReferences
def test_send_unparsable_response(self):
"""If the send succeeds, but a non-JSON API response, should raise an API exception"""
mock_response = self.set_mock_response(status_code=200,
raw=b"yikes, this isn't a real response")
with self.assertRaises(AnymailAPIError):
self.message.send()
self.assertIsNone(self.message.anymail_status.status)
self.assertIsNone(self.message.anymail_status.message_id)
self.assertEqual(self.message.anymail_status.recipients, {})
self.assertEqual(self.message.anymail_status.esp_response, mock_response)
def test_json_serialization_errors(self):
"""Try to provide more information about non-json-serializable data"""
self.message.metadata = {'total': Decimal('19.99')}
with self.assertRaises(AnymailSerializationError) as cm:
self.message.send()
print(self.get_api_call_data())
err = cm.exception
self.assertIsInstance(err, TypeError) # compatibility with json.dumps
self.assertIn("Don't know how to send this data to Mailgun", str(err)) # our added context
self.assertIn("Decimal('19.99') is not JSON serializable", str(err)) # original message
class MailgunBackendRecipientsRefusedTests(MailgunBackendMockAPITestCase):
"""Should raise AnymailRecipientsRefused when *all* recipients are rejected or invalid"""
# Mailgun doesn't check email bounce or complaint lists at time of send --
# it always just queues the message. You'll need to listen for the "rejected"
# and "failed" events to detect refused recipients.
# The one exception is a completely invalid email, which will return a 400 response
# and show up as an AnymailAPIError at send time.
INVALID_TO_RESPONSE = b"""{
"message": "'to' parameter is not a valid address. please check documentation"
}"""
def test_invalid_email(self):
self.set_mock_response(status_code=400, raw=self.INVALID_TO_RESPONSE)
msg = mail.EmailMessage('Subject', 'Body', 'from@example.com', to=['not a valid email'])
with self.assertRaises(AnymailAPIError):
msg.send()
def test_fail_silently(self):
self.set_mock_response(status_code=400, raw=self.INVALID_TO_RESPONSE)
sent = mail.send_mail('Subject', 'Body', 'from@example.com', ['not a valid email'],
fail_silently=True)
self.assertEqual(sent, 0)
@override_settings(ANYMAIL_SEND_DEFAULTS={
'metadata': {'global': 'globalvalue', 'other': 'othervalue'},
'tags': ['globaltag'],
'track_clicks': True,
'track_opens': True,
'esp_extra': {'o:globaloption': 'globalsetting'},
})
class MailgunBackendSendDefaultsTests(MailgunBackendMockAPITestCase):
"""Tests backend support for global SEND_DEFAULTS"""
def test_send_defaults(self):
"""Test that global send defaults are applied"""
self.message.send()
data = self.get_api_call_data()
# All these values came from ANYMAIL_SEND_DEFAULTS:
self.assertEqual(data['v:global'], 'globalvalue')
self.assertEqual(data['v:other'], 'othervalue')
self.assertEqual(data['o:tag'], ['globaltag'])
self.assertEqual(data['o:tracking-clicks'], 'yes')
self.assertEqual(data['o:tracking-opens'], 'yes')
self.assertEqual(data['o:globaloption'], 'globalsetting')
def test_merge_message_with_send_defaults(self):
"""Test that individual message settings are *merged into* the global send defaults"""
self.message.metadata = {'message': 'messagevalue', 'other': 'override'}
self.message.tags = ['messagetag']
self.message.track_clicks = False
self.message.esp_extra = {'o:messageoption': 'messagesetting'}
self.message.send()
data = self.get_api_call_data()
# All these values came from ANYMAIL_SEND_DEFAULTS + message.*:
self.assertEqual(data['v:global'], 'globalvalue')
self.assertEqual(data['v:message'], 'messagevalue') # additional metadata
self.assertEqual(data['v:other'], 'override') # override global value
self.assertEqual(data['o:tag'], ['globaltag', 'messagetag']) # tags concatenated
self.assertEqual(data['o:tracking-clicks'], 'no') # message overrides
self.assertEqual(data['o:tracking-opens'], 'yes')
self.assertEqual(data['o:globaloption'], 'globalsetting')
self.assertEqual(data['o:messageoption'], 'messagesetting') # additional esp_extra
@override_settings(ANYMAIL_MAILGUN_SEND_DEFAULTS={
'tags': ['esptag'],
'metadata': {'esp': 'espvalue'},
'track_opens': False,
})
def test_esp_send_defaults(self):
"""Test that ESP-specific send defaults override individual global defaults"""
self.message.send()
data = self.get_api_call_data()
# All these values came from ANYMAIL_SEND_DEFAULTS plus ANYMAIL_MAILGUN_SEND_DEFAULTS:
self.assertNotIn('v:global', data) # entire metadata overridden
self.assertEqual(data['v:esp'], 'espvalue')
self.assertEqual(data['o:tag'], ['esptag']) # entire tags overridden
self.assertEqual(data['o:tracking-clicks'], 'yes') # we didn't override the global track_clicks
self.assertEqual(data['o:tracking-opens'], 'no')
self.assertEqual(data['o:globaloption'], 'globalsetting') # we didn't override the global esp_extra
@override_settings(EMAIL_BACKEND="anymail.backends.mailgun.MailgunBackend")
class MailgunBackendImproperlyConfiguredTests(SimpleTestCase, AnymailTestMixin):
"""Test ESP backend without required settings in place"""
def test_missing_api_key(self):
with self.assertRaises(ImproperlyConfigured) as cm:
mail.send_mail('Subject', 'Message', 'from@example.com', ['to@example.com'])
errmsg = str(cm.exception)
# Make sure the error mentions MAILGUN_API_KEY and ANYMAIL_MAILGUN_API_KEY
self.assertRegex(errmsg, r'\bMAILGUN_API_KEY\b')
self.assertRegex(errmsg, r'\bANYMAIL_MAILGUN_API_KEY\b')

View File

@@ -0,0 +1,165 @@
from __future__ import unicode_literals
import os
import unittest
from datetime import datetime, timedelta
from time import mktime, sleep
import requests
from django.test import SimpleTestCase
from django.test.utils import override_settings
from anymail.exceptions import AnymailAPIError
from anymail.message import AnymailMessage
from .utils import sample_image_content, AnymailTestMixin
MAILGUN_TEST_API_KEY = os.getenv('MAILGUN_TEST_API_KEY')
MAILGUN_TEST_DOMAIN = os.getenv('MAILGUN_TEST_DOMAIN')
@unittest.skipUnless(MAILGUN_TEST_API_KEY and MAILGUN_TEST_DOMAIN,
"Set MAILGUN_TEST_API_KEY and MAILGUN_TEST_DOMAIN environment variables "
"to run Mailgun integration tests")
@override_settings(ANYMAIL_MAILGUN_API_KEY=MAILGUN_TEST_API_KEY,
ANYMAIL_MAILGUN_SEND_DEFAULTS={
'esp_extra': {'o:testmode': 'yes',
'sender_domain': MAILGUN_TEST_DOMAIN}},
EMAIL_BACKEND="anymail.backends.mailgun.MailgunBackend")
class MailgunBackendIntegrationTests(SimpleTestCase, AnymailTestMixin):
"""Mailgun API integration tests
These tests run against the **live** Mailgun API, using the
environment variable `MAILGUN_TEST_API_KEY` as the API key
and `MAILGUN_TEST_DOMAIN` as the sender domain.
If those variables are not set, these tests won't run.
"""
def setUp(self):
super(MailgunBackendIntegrationTests, self).setUp()
self.message = AnymailMessage('Anymail integration test', 'Text content',
'from@example.com', ['to@example.com'])
self.message.attach_alternative('<p>HTML content</p>', "text/html")
def fetch_mailgun_events(self, message_id, event=None,
initial_delay=2, retry_delay=1, max_retries=5):
"""Return list of Mailgun events related to message_id"""
url = "https://api.mailgun.net/v3/%s/events" % MAILGUN_TEST_DOMAIN
auth = ("api", MAILGUN_TEST_API_KEY)
# Despite the docs, Mailgun's events API actually expects the message-id
# without the <...> brackets (so, not exactly "as returned by the messages API")
# https://documentation.mailgun.com/api-events.html#filter-field
params = {'message-id': message_id[1:-1]} # strip <...>
if event is not None:
params['event'] = event
# It can take a few seconds for the events to show up
# in Mailgun's logs, so retry a few times if necessary:
sleep(initial_delay)
for retry in range(max_retries):
if retry > 0:
sleep(retry_delay)
response = requests.get(url, auth=auth, params=params)
response.raise_for_status()
items = response.json()["items"]
if len(items) > 0:
return items
return None
def test_simple_send(self):
# Example of getting the Mailgun send status and message id from the message
sent_count = self.message.send()
self.assertEqual(sent_count, 1)
anymail_status = self.message.anymail_status
sent_status = anymail_status.recipients['to@example.com'].status
message_id = anymail_status.recipients['to@example.com'].message_id
self.assertEqual(sent_status, 'queued') # Mailgun always queues
self.assertGreater(len(message_id), 0) # don't know what it'll be, but it should exist
self.assertEqual(anymail_status.status, {sent_status}) # set of all recipient statuses
self.assertEqual(anymail_status.message_id, message_id)
def test_all_options(self):
send_at = datetime.now().replace(microsecond=0) + timedelta(minutes=2)
send_at_timestamp = mktime(send_at.timetuple()) # python3: send_at.timestamp()
message = AnymailMessage(
subject="Anymail all-options integration test",
body="This is the text body",
from_email="Test From <from@example.com>",
to=["to1@example.com", "Recipient 2 <to2@example.com>"],
cc=["cc1@example.com", "Copy 2 <cc2@example.com>"],
bcc=["bcc1@example.com", "Blind Copy 2 <bcc2@example.com>"],
reply_to=["reply1@example.com", "Reply 2 <reply2@example.com>"],
headers={"X-Anymail-Test": "value"},
metadata={"meta1": "simple string", "meta2": 2, "meta3": {"complex": "value"}},
send_at=send_at,
tags=["tag 1", "tag 2"],
track_clicks=False,
track_opens=True,
)
message.attach("attachment1.txt", "Here is some\ntext for you", "text/plain")
message.attach("attachment2.csv", "ID,Name\n1,3", "text/csv")
cid = message.attach_inline_image(sample_image_content(), domain=MAILGUN_TEST_DOMAIN)
message.attach_alternative(
"<div>This is the <i>html</i> body <img src='cid:%s'></div>" % cid,
"text/html")
message.send()
self.assertEqual(message.anymail_status.status, {'queued'}) # Mailgun always queues
message_id = message.anymail_status.message_id
events = self.fetch_mailgun_events(message_id, event="accepted")
if events is None:
self.skipTest("No Mailgun 'accepted' event after 30sec -- can't complete this test")
return
event = events.pop()
self.assertCountEqual(event["tags"], ["tag 1", "tag 2"]) # don't care about order
self.assertEqual(event["user-variables"],
{"meta1": "simple string",
"meta2": "2", # numbers become strings
"meta3": '{"complex": "value"}'}) # complex values become json
self.assertEqual(event["message"]["scheduled-for"], send_at_timestamp)
self.assertCountEqual(event["message"]["recipients"],
['to1@example.com', 'to2@example.com', 'cc1@example.com', 'cc2@example.com',
'bcc1@example.com', 'bcc2@example.com']) # don't care about order
headers = event["message"]["headers"]
self.assertEqual(headers["from"], "Test From <from@example.com>")
self.assertEqual(headers["to"], "to1@example.com, Recipient 2 <to2@example.com>")
self.assertEqual(headers["subject"], "Anymail all-options integration test")
attachments = event["message"]["attachments"]
self.assertEqual(len(attachments), 2) # because inline image shouldn't be an attachment
self.assertEqual(attachments[0]["filename"], "attachment1.txt")
self.assertEqual(attachments[0]["content-type"], "text/plain")
self.assertEqual(attachments[1]["filename"], "attachment2.csv")
self.assertEqual(attachments[1]["content-type"], "text/csv")
# No other fields are verifiable from the event data.
# (We could try fetching the message from event["storage"]["url"]
# to verify content and other headers.)
def test_invalid_from(self):
self.message.from_email = 'webmaster'
with self.assertRaises(AnymailAPIError) as cm:
self.message.send()
err = cm.exception
self.assertEqual(err.status_code, 400)
self.assertIn("'from' parameter is not a valid address", str(err))
@override_settings(ANYMAIL_MAILGUN_API_KEY="Hey, that's not an API key!")
def test_invalid_api_key(self):
with self.assertRaises(AnymailAPIError) as cm:
self.message.send()
err = cm.exception
self.assertEqual(err.status_code, 401)
# Mailgun doesn't offer any additional explanation in its response body
# self.assertIn("Forbidden", str(err))

View File

@@ -4,7 +4,6 @@ from __future__ import unicode_literals
import json import json
import os import os
import re
import six import six
import unittest import unittest
from base64 import b64decode from base64 import b64decode
@@ -15,13 +14,13 @@ from email.mime.image import MIMEImage
from django.core import mail from django.core import mail
from django.core.exceptions import ImproperlyConfigured from django.core.exceptions import ImproperlyConfigured
from django.core.mail import make_msgid
from django.test import TestCase from django.test import TestCase
from django.test.utils import override_settings from django.test.utils import override_settings
from django.utils.timezone import get_fixed_timezone, override as override_current_timezone from django.utils.timezone import get_fixed_timezone, override as override_current_timezone
from anymail.exceptions import (AnymailAPIError, AnymailRecipientsRefused, from anymail.exceptions import (AnymailAPIError, AnymailRecipientsRefused,
AnymailSerializationError, AnymailUnsupportedFeature) AnymailSerializationError, AnymailUnsupportedFeature)
from anymail.message import attach_inline_image
from .mock_backend import DjrillBackendMockAPITestCase from .mock_backend import DjrillBackendMockAPITestCase
@@ -224,18 +223,14 @@ class DjrillBackendTests(DjrillBackendMockAPITestCase):
self.assertEqual(len(attachments), 1) self.assertEqual(len(attachments), 1)
def test_embedded_images(self): def test_embedded_images(self):
image_data = self.sample_image_content() # Read from a png file
image_cid = make_msgid("img") # Content ID per RFC 2045 section 7 (with <...>)
image_cid_no_brackets = image_cid[1:-1] # Without <...>, for use as the <img> tag src
text_content = 'This has an inline image.' text_content = 'This has an inline image.'
html_content = '<p>This has an <img src="cid:%s" alt="inline" /> image.</p>' % image_cid_no_brackets
email = mail.EmailMultiAlternatives('Subject', text_content, 'from@example.com', ['to@example.com']) email = mail.EmailMultiAlternatives('Subject', text_content, 'from@example.com', ['to@example.com'])
email.attach_alternative(html_content, "text/html")
image = MIMEImage(image_data) image_data = self.sample_image_content() # Read from a png file
image.add_header('Content-ID', image_cid) cid = attach_inline_image(email, image_data)
email.attach(image)
html_content = '<p>This has an <img src="cid:%s" alt="inline" /> image.</p>' % cid
email.attach_alternative(html_content, "text/html")
email.send() email.send()
data = self.get_api_call_data() data = self.get_api_call_data()
@@ -243,7 +238,7 @@ class DjrillBackendTests(DjrillBackendMockAPITestCase):
self.assertEqual(data['message']['html'], html_content) self.assertEqual(data['message']['html'], html_content)
self.assertEqual(len(data['message']['images']), 1) self.assertEqual(len(data['message']['images']), 1)
self.assertEqual(data['message']['images'][0]["type"], "image/png") self.assertEqual(data['message']['images'][0]["type"], "image/png")
self.assertEqual(data['message']['images'][0]["name"], image_cid) self.assertEqual(data['message']['images'][0]["name"], cid)
self.assertEqual(decode_att(data['message']['images'][0]["content"]), image_data) self.assertEqual(decode_att(data['message']['images'][0]["content"]), image_data)
# Make sure neither the html nor the inline image is treated as an attachment: # Make sure neither the html nor the inline image is treated as an attachment:
self.assertFalse('attachments' in data['message']) self.assertFalse('attachments' in data['message'])
@@ -342,9 +337,6 @@ class DjrillMandrillFeatureTests(DjrillBackendMockAPITestCase):
self.message = mail.EmailMessage('Subject', 'Text Body', self.message = mail.EmailMessage('Subject', 'Text Body',
'from@example.com', ['to@example.com']) 'from@example.com', ['to@example.com'])
def assertStrContains(self, haystack, needle, msg=None):
six.assertRegex(self, haystack, re.escape(needle), msg)
def test_tracking(self): def test_tracking(self):
# First make sure we're not setting the API param if the track_click # First make sure we're not setting the API param if the track_click
# attr isn't there. (The Mandrill account option of True for html, # attr isn't there. (The Mandrill account option of True for html,
@@ -570,8 +562,8 @@ class DjrillMandrillFeatureTests(DjrillBackendMockAPITestCase):
self.message.send() self.message.send()
err = cm.exception err = cm.exception
self.assertTrue(isinstance(err, TypeError)) # Djrill 1.x re-raised TypeError from json.dumps self.assertTrue(isinstance(err, TypeError)) # Djrill 1.x re-raised TypeError from json.dumps
self.assertStrContains(str(err), "Don't know how to send this data to Mandrill") # our added context self.assertIn("Don't know how to send this data to Mandrill", str(err)) # our added context
self.assertStrContains(str(err), "Decimal('19.99') is not JSON serializable") # original message self.assertIn("Decimal('19.99') is not JSON serializable", str(err)) # original message
def test_dates_not_serialized(self): def test_dates_not_serialized(self):
"""Pre-2.0 Djrill accidentally serialized dates to ISO""" """Pre-2.0 Djrill accidentally serialized dates to ISO"""

47
anymail/tests/utils.py Normal file
View File

@@ -0,0 +1,47 @@
# Anymail test utils
import os
import unittest
from base64 import b64decode
import six
def decode_att(att):
"""Returns the original data from base64-encoded attachment content"""
return b64decode(att.encode('ascii'))
SAMPLE_IMAGE_FILENAME = "sample_image.png"
def sample_image_path(filename=SAMPLE_IMAGE_FILENAME):
"""Returns path to an actual image file in the tests directory"""
test_dir = os.path.dirname(os.path.abspath(__file__))
path = os.path.join(test_dir, filename)
return path
def sample_image_content(filename=SAMPLE_IMAGE_FILENAME):
"""Returns contents of an actual image file from the tests directory"""
filename = sample_image_path(filename)
with open(filename, "rb") as f:
return f.read()
class AnymailTestMixin:
"""Helpful additional methods for Anymail tests"""
pass
# Plus these methods added below:
# assertCountEqual
# assertRaisesRegex
# assertRegex
# Add the Python 3 TestCase assertions, if they're not already there.
# (The six implementations cause infinite recursion if installed on
# a py3 TestCase.)
for method in ('assertCountEqual', 'assertRaisesRegex', 'assertRegex'):
try:
getattr(unittest.TestCase, method)
except AttributeError:
setattr(AnymailTestMixin, method, getattr(six, method))

View File

@@ -1,7 +1,8 @@
import mimetypes import mimetypes
from base64 import b64encode from base64 import b64encode
from calendar import timegm
from email.mime.base import MIMEBase from email.mime.base import MIMEBase
from email.utils import parseaddr from email.utils import parseaddr, formatdate
import six import six
from django.conf import settings from django.conf import settings
@@ -41,7 +42,7 @@ def combine(*args):
try: try:
result.update(value) # shallow merge if dict-like result.update(value) # shallow merge if dict-like
except AttributeError: except AttributeError:
result += value # concatenate if sequence-like result = result + value # concatenate if sequence-like
return result return result
@@ -97,10 +98,11 @@ class Attachment(object):
"""A normalized EmailMessage.attachments item with additional functionality """A normalized EmailMessage.attachments item with additional functionality
Normalized to have these properties: Normalized to have these properties:
name: attachment filename; may be empty string; will be Content-ID for inline attachments name: attachment filename; may be empty string; will be Content-ID (without <>) for inline attachments
content content
mimetype: the content type; guessed if not explicit mimetype: the content type; guessed if not explicit
inline: bool, True if attachment has a Content-ID header inline: bool, True if attachment has a Content-ID header
content_id: for inline, the Content-ID (with <>)
""" """
def __init__(self, attachment, encoding): def __init__(self, attachment, encoding):
@@ -109,6 +111,7 @@ class Attachment(object):
self._attachment = attachment self._attachment = attachment
self.encoding = encoding # should we be checking attachment["Content-Encoding"] ??? self.encoding = encoding # should we be checking attachment["Content-Encoding"] ???
self.inline = False self.inline = False
self.content_id = None
if isinstance(attachment, MIMEBase): if isinstance(attachment, MIMEBase):
self.name = attachment.get_filename() self.name = attachment.get_filename()
@@ -117,7 +120,8 @@ class Attachment(object):
# Treat image attachments that have content ids as inline: # Treat image attachments that have content ids as inline:
if attachment.get_content_maintype() == "image" and attachment["Content-ID"] is not None: if attachment.get_content_maintype() == "image" and attachment["Content-ID"] is not None:
self.inline = True self.inline = True
self.name = attachment["Content-ID"] self.content_id = attachment["Content-ID"] # including the <...>
self.name = self.content_id[1:-1] # without the <, >
else: else:
(self.name, self.content, self.mimetype) = attachment (self.name, self.content, self.mimetype) = attachment
@@ -175,3 +179,11 @@ def get_anymail_setting(setting, default=UNSET, allow_bare=False):
raise ImproperlyConfigured(message) raise ImproperlyConfigured(message)
else: else:
return default return default
def rfc2822date(dt):
"""Turn an aware datetime into a date string as specified in RFC 2822."""
# This is the equivalent of Python 3.3's email.utils.format_datetime
assert dt.tzinfo is not None # only aware datetimes allowed
timeval = timegm(dt.utctimetuple())
return formatdate(timeval, usegmt=True)

View File

@@ -31,10 +31,14 @@ setup(
license="BSD License", license="BSD License",
packages=["anymail"], packages=["anymail"],
zip_safe=False, zip_safe=False,
install_requires=["requests>=1.0.0", "django>=1.4"], install_requires=["django>=1.8", "six"],
extras_require={
"mailgun": ["requests>=2.4.3"],
"mandrill": ["requests>=1.0.0"],
},
include_package_data=True, include_package_data=True,
test_suite="runtests.runtests", test_suite="runtests.runtests",
tests_require=["mock", "six"], tests_require=["mock"],
classifiers=[ classifiers=[
"Development Status :: 2 - Pre-Alpha", "Development Status :: 2 - Pre-Alpha",
"Programming Language :: Python", "Programming Language :: Python",