mirror of
https://github.com/pacnpal/django-anymail.git
synced 2025-12-22 12:51:06 -05:00
Fork from Djrill and rename to "anymail"
This commit is contained in:
3
anymail/__init__.py
Normal file
3
anymail/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from ._version import __version__, VERSION
|
||||
from .exceptions import (MandrillAPIError, MandrillRecipientsRefused,
|
||||
NotSerializableForMandrillError, NotSupportedByMandrillError)
|
||||
3
anymail/_version.py
Normal file
3
anymail/_version.py
Normal file
@@ -0,0 +1,3 @@
|
||||
VERSION = (0, 1, 'dev0') # Remove the 'dev' component in release branches
|
||||
__version__ = '.'.join([str(x) for x in VERSION]) # major.minor.patch or major.minor.devN
|
||||
__minor_version__ = '.'.join([str(x) for x in VERSION[:2]]) # Sphinx's X.Y "version"
|
||||
0
anymail/backends/__init__.py
Normal file
0
anymail/backends/__init__.py
Normal file
485
anymail/backends/mandrill.py
Normal file
485
anymail/backends/mandrill.py
Normal file
@@ -0,0 +1,485 @@
|
||||
import json
|
||||
import mimetypes
|
||||
import requests
|
||||
from base64 import b64encode
|
||||
from datetime import date, datetime
|
||||
from email.mime.base import MIMEBase
|
||||
from email.utils import parseaddr
|
||||
try:
|
||||
from urlparse import urljoin # python 2
|
||||
except ImportError:
|
||||
from urllib.parse import urljoin # python 3
|
||||
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
from django.core.mail.backends.base import BaseEmailBackend
|
||||
from django.core.mail.message import sanitize_address, DEFAULT_ATTACHMENT_MIME_TYPE
|
||||
|
||||
from .._version import __version__
|
||||
from ..exceptions import (DjrillError, MandrillAPIError, MandrillRecipientsRefused,
|
||||
NotSerializableForMandrillError, NotSupportedByMandrillError)
|
||||
|
||||
|
||||
class MandrillBackend(BaseEmailBackend):
|
||||
"""
|
||||
Mandrill API Email Backend
|
||||
"""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
"""Init options from Django settings"""
|
||||
super(MandrillBackend, self).__init__(**kwargs)
|
||||
|
||||
try:
|
||||
self.api_key = settings.MANDRILL_API_KEY
|
||||
except AttributeError:
|
||||
raise ImproperlyConfigured("Set MANDRILL_API_KEY in settings.py to use Djrill")
|
||||
|
||||
self.api_url = getattr(settings, "MANDRILL_API_URL", "https://mandrillapp.com/api/1.0")
|
||||
if not self.api_url.endswith("/"):
|
||||
self.api_url += "/"
|
||||
|
||||
self.global_settings = {}
|
||||
try:
|
||||
self.global_settings.update(settings.MANDRILL_SETTINGS)
|
||||
except AttributeError:
|
||||
pass # no MANDRILL_SETTINGS setting
|
||||
except (TypeError, ValueError): # e.g., not enumerable
|
||||
raise ImproperlyConfigured("MANDRILL_SETTINGS must be a dict or mapping")
|
||||
|
||||
try:
|
||||
self.global_settings["subaccount"] = settings.MANDRILL_SUBACCOUNT
|
||||
except AttributeError:
|
||||
pass # no MANDRILL_SUBACCOUNT setting
|
||||
|
||||
self.ignore_recipient_status = getattr(settings, "MANDRILL_IGNORE_RECIPIENT_STATUS", False)
|
||||
self.session = None
|
||||
|
||||
def open(self):
|
||||
"""
|
||||
Ensure we have a requests Session to connect to the Mandrill API.
|
||||
Returns True if a new session was created (and the caller must close it).
|
||||
"""
|
||||
if self.session:
|
||||
return False # already exists
|
||||
|
||||
try:
|
||||
self.session = requests.Session()
|
||||
except requests.RequestException:
|
||||
if not self.fail_silently:
|
||||
raise
|
||||
else:
|
||||
self.session.headers["User-Agent"] = "Djrill/%s %s" % (
|
||||
__version__, self.session.headers.get("User-Agent", ""))
|
||||
return True
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
Close the Mandrill API Session unconditionally.
|
||||
|
||||
(You should call this only if you called open and it returned True;
|
||||
else someone else created the session and will clean it up themselves.)
|
||||
"""
|
||||
if self.session is None:
|
||||
return
|
||||
try:
|
||||
self.session.close()
|
||||
except requests.RequestException:
|
||||
if not self.fail_silently:
|
||||
raise
|
||||
finally:
|
||||
self.session = None
|
||||
|
||||
def send_messages(self, email_messages):
|
||||
"""
|
||||
Sends one or more EmailMessage objects and returns the number of email
|
||||
messages sent.
|
||||
"""
|
||||
if not email_messages:
|
||||
return 0
|
||||
|
||||
created_session = self.open()
|
||||
if not self.session:
|
||||
return 0 # exception in self.open with fail_silently
|
||||
|
||||
num_sent = 0
|
||||
try:
|
||||
for message in email_messages:
|
||||
sent = self._send(message)
|
||||
if sent:
|
||||
num_sent += 1
|
||||
finally:
|
||||
if created_session:
|
||||
self.close()
|
||||
|
||||
return num_sent
|
||||
|
||||
def _send(self, message):
|
||||
message.mandrill_response = None # until we have a response
|
||||
if not message.recipients():
|
||||
return False
|
||||
|
||||
try:
|
||||
payload = self.get_base_payload()
|
||||
self.build_send_payload(payload, message)
|
||||
response = self.post_to_mandrill(payload, message)
|
||||
|
||||
# add the response from mandrill to the EmailMessage so callers can inspect it
|
||||
message.mandrill_response = self.parse_response(response, payload, message)
|
||||
self.validate_response(message.mandrill_response, response, payload, message)
|
||||
|
||||
except DjrillError:
|
||||
# every *expected* error is derived from DjrillError;
|
||||
# we deliberately don't silence unexpected errors
|
||||
if not self.fail_silently:
|
||||
raise
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def get_base_payload(self):
|
||||
"""Return non-message-dependent payload for Mandrill send call
|
||||
|
||||
(The return value will be modified for the send, so must be a copy
|
||||
of any shared state.)
|
||||
"""
|
||||
payload = {
|
||||
"key": self.api_key,
|
||||
}
|
||||
return payload
|
||||
|
||||
def build_send_payload(self, payload, message):
|
||||
"""Modify payload to add all message-specific options for Mandrill send call.
|
||||
|
||||
payload is a dict that will become the Mandrill send data
|
||||
message is an EmailMessage, possibly with additional Mandrill-specific attrs
|
||||
|
||||
Can raise NotSupportedByMandrillError for unsupported options in message.
|
||||
"""
|
||||
msg_dict = self._build_standard_message_dict(message)
|
||||
self._add_mandrill_options(message, msg_dict)
|
||||
if getattr(message, 'alternatives', None):
|
||||
self._add_alternatives(message, msg_dict)
|
||||
self._add_attachments(message, msg_dict)
|
||||
payload.setdefault('message', {}).update(msg_dict)
|
||||
if hasattr(message, 'template_name'):
|
||||
payload['template_name'] = message.template_name
|
||||
payload['template_content'] = \
|
||||
self._expand_merge_vars(getattr(message, 'template_content', {}))
|
||||
self._add_mandrill_toplevel_options(message, payload)
|
||||
|
||||
def get_api_url(self, payload, message):
|
||||
"""Return the correct Mandrill API url for sending payload
|
||||
|
||||
Override this to substitute your own logic for determining API endpoint.
|
||||
"""
|
||||
if 'template_name' in payload:
|
||||
api_method = "messages/send-template.json"
|
||||
else:
|
||||
api_method = "messages/send.json"
|
||||
return urljoin(self.api_url, api_method)
|
||||
|
||||
def serialize_payload(self, payload, message):
|
||||
"""Return payload serialized to a json str.
|
||||
|
||||
Override this to substitute your own JSON serializer (e.g., to handle dates)
|
||||
"""
|
||||
return json.dumps(payload)
|
||||
|
||||
def post_to_mandrill(self, payload, message):
|
||||
"""Post payload to correct Mandrill send API endpoint, and return the response.
|
||||
|
||||
payload is a dict to use as Mandrill send data
|
||||
message is the original EmailMessage
|
||||
return should be a requests.Response
|
||||
|
||||
Can raise NotSerializableForMandrillError if payload is not serializable
|
||||
Can raise MandrillAPIError for HTTP errors in the post
|
||||
"""
|
||||
api_url = self.get_api_url(payload, message)
|
||||
try:
|
||||
json_payload = self.serialize_payload(payload, message)
|
||||
except TypeError as err:
|
||||
# Add some context to the "not JSON serializable" message
|
||||
raise NotSerializableForMandrillError(
|
||||
orig_err=err, email_message=message, payload=payload)
|
||||
|
||||
response = self.session.post(api_url, data=json_payload)
|
||||
if response.status_code != 200:
|
||||
raise MandrillAPIError(email_message=message, payload=payload, response=response)
|
||||
return response
|
||||
|
||||
def parse_response(self, response, payload, message):
|
||||
"""Return parsed json from Mandrill API response
|
||||
|
||||
Can raise MandrillAPIError if response is not valid JSON
|
||||
"""
|
||||
try:
|
||||
return response.json()
|
||||
except ValueError:
|
||||
raise MandrillAPIError("Invalid JSON in Mandrill API response",
|
||||
email_message=message, payload=payload, response=response)
|
||||
|
||||
def validate_response(self, parsed_response, response, payload, message):
|
||||
"""Validate parsed_response, raising exceptions for any problems.
|
||||
|
||||
Extend this to provide your own validation checks.
|
||||
Validation exceptions should inherit from anymail.exceptions.DjrillException
|
||||
for proper fail_silently behavior.
|
||||
|
||||
The base version here checks for invalid or refused recipients.
|
||||
"""
|
||||
if self.ignore_recipient_status:
|
||||
return
|
||||
try:
|
||||
recipient_status = [item["status"] for item in parsed_response]
|
||||
except (KeyError, TypeError):
|
||||
raise MandrillAPIError("Invalid Mandrill API response format",
|
||||
email_message=message, payload=payload, response=response)
|
||||
# Error if *all* recipients are invalid or refused
|
||||
# (This behavior parallels smtplib.SMTPRecipientsRefused from Django's SMTP EmailBackend)
|
||||
if all([status in ('invalid', 'rejected') for status in recipient_status]):
|
||||
raise MandrillRecipientsRefused(email_message=message, payload=payload, response=response)
|
||||
|
||||
#
|
||||
# Payload construction
|
||||
#
|
||||
|
||||
def _build_standard_message_dict(self, message):
|
||||
"""Create a Mandrill send message struct from a Django EmailMessage.
|
||||
|
||||
Builds the standard dict that Django's send_mail and send_mass_mail
|
||||
use by default. Standard text email messages sent through Django will
|
||||
still work through Mandrill.
|
||||
|
||||
Raises NotSupportedByMandrillError for any standard EmailMessage
|
||||
features that cannot be accurately communicated to Mandrill.
|
||||
"""
|
||||
sender = sanitize_address(message.from_email, message.encoding)
|
||||
from_name, from_email = parseaddr(sender)
|
||||
|
||||
to_list = self._make_mandrill_to_list(message, message.to, "to")
|
||||
to_list += self._make_mandrill_to_list(message, message.cc, "cc")
|
||||
to_list += self._make_mandrill_to_list(message, message.bcc, "bcc")
|
||||
|
||||
content = "html" if message.content_subtype == "html" else "text"
|
||||
msg_dict = {
|
||||
content: message.body,
|
||||
"to": to_list
|
||||
}
|
||||
|
||||
if not getattr(message, 'use_template_from', False):
|
||||
msg_dict["from_email"] = from_email
|
||||
if from_name:
|
||||
msg_dict["from_name"] = from_name
|
||||
|
||||
if not getattr(message, 'use_template_subject', False):
|
||||
msg_dict["subject"] = message.subject
|
||||
|
||||
if hasattr(message, 'reply_to'):
|
||||
reply_to = [sanitize_address(addr, message.encoding) for addr in message.reply_to]
|
||||
msg_dict["headers"] = {'Reply-To': ', '.join(reply_to)}
|
||||
# Note: An explicit Reply-To header will override the reply_to attr below
|
||||
# (matching Django's own behavior)
|
||||
|
||||
if message.extra_headers:
|
||||
msg_dict["headers"] = msg_dict.get("headers", {})
|
||||
msg_dict["headers"].update(message.extra_headers)
|
||||
|
||||
return msg_dict
|
||||
|
||||
def _add_mandrill_toplevel_options(self, message, api_params):
|
||||
"""Extend api_params to include Mandrill global-send options set on message"""
|
||||
# Mandrill attributes that can be copied directly:
|
||||
mandrill_attrs = [
|
||||
'async', 'ip_pool'
|
||||
]
|
||||
for attr in mandrill_attrs:
|
||||
if attr in self.global_settings:
|
||||
api_params[attr] = self.global_settings[attr]
|
||||
if hasattr(message, attr):
|
||||
api_params[attr] = getattr(message, attr)
|
||||
|
||||
# Mandrill attributes that require conversion:
|
||||
if hasattr(message, 'send_at'):
|
||||
api_params['send_at'] = self.encode_date_for_mandrill(message.send_at)
|
||||
# setting send_at in global_settings wouldn't make much sense
|
||||
|
||||
def _make_mandrill_to_list(self, message, recipients, recipient_type="to"):
|
||||
"""Create a Mandrill 'to' field from a list of emails.
|
||||
|
||||
Parses "Real Name <address@example.com>" format emails.
|
||||
Sanitizes all email addresses.
|
||||
"""
|
||||
parsed_rcpts = [parseaddr(sanitize_address(addr, message.encoding))
|
||||
for addr in recipients]
|
||||
return [{"email": to_email, "name": to_name, "type": recipient_type}
|
||||
for (to_name, to_email) in parsed_rcpts]
|
||||
|
||||
def _add_mandrill_options(self, message, msg_dict):
|
||||
"""Extend msg_dict to include Mandrill per-message options set on message"""
|
||||
# Mandrill attributes that can be copied directly:
|
||||
mandrill_attrs = [
|
||||
'from_name', # overrides display name parsed from from_email above
|
||||
'important',
|
||||
'track_opens', 'track_clicks', 'auto_text', 'auto_html',
|
||||
'inline_css', 'url_strip_qs',
|
||||
'tracking_domain', 'signing_domain', 'return_path_domain',
|
||||
'merge_language',
|
||||
'tags', 'preserve_recipients', 'view_content_link', 'subaccount',
|
||||
'google_analytics_domains', 'google_analytics_campaign',
|
||||
'metadata']
|
||||
|
||||
for attr in mandrill_attrs:
|
||||
if attr in self.global_settings:
|
||||
msg_dict[attr] = self.global_settings[attr]
|
||||
if hasattr(message, attr):
|
||||
msg_dict[attr] = getattr(message, attr)
|
||||
|
||||
# Allow simple python dicts in place of Mandrill
|
||||
# [{name:name, value:value},...] arrays...
|
||||
|
||||
# Merge global and per message global_merge_vars
|
||||
# (in conflicts, per-message vars win)
|
||||
global_merge_vars = {}
|
||||
if 'global_merge_vars' in self.global_settings:
|
||||
global_merge_vars.update(self.global_settings['global_merge_vars'])
|
||||
if hasattr(message, 'global_merge_vars'):
|
||||
global_merge_vars.update(message.global_merge_vars)
|
||||
if global_merge_vars:
|
||||
msg_dict['global_merge_vars'] = \
|
||||
self._expand_merge_vars(global_merge_vars)
|
||||
|
||||
if hasattr(message, 'merge_vars'):
|
||||
# For testing reproducibility, we sort the recipients
|
||||
msg_dict['merge_vars'] = [
|
||||
{ 'rcpt': rcpt,
|
||||
'vars': self._expand_merge_vars(message.merge_vars[rcpt]) }
|
||||
for rcpt in sorted(message.merge_vars.keys())
|
||||
]
|
||||
if hasattr(message, 'recipient_metadata'):
|
||||
# For testing reproducibility, we sort the recipients
|
||||
msg_dict['recipient_metadata'] = [
|
||||
{ 'rcpt': rcpt, 'values': message.recipient_metadata[rcpt] }
|
||||
for rcpt in sorted(message.recipient_metadata.keys())
|
||||
]
|
||||
|
||||
def _expand_merge_vars(self, vardict):
|
||||
"""Convert a Python dict to an array of name-content used by Mandrill.
|
||||
|
||||
{ name: value, ... } --> [ {'name': name, 'content': value }, ... ]
|
||||
"""
|
||||
# For testing reproducibility, we sort the keys
|
||||
return [{'name': name, 'content': vardict[name]}
|
||||
for name in sorted(vardict.keys())]
|
||||
|
||||
def _add_alternatives(self, message, msg_dict):
|
||||
"""
|
||||
There can be only one! ... alternative attachment, and it must be text/html.
|
||||
|
||||
Since mandrill does not accept image attachments or anything other
|
||||
than HTML, the assumption is the only thing you are attaching is
|
||||
the HTML output for your email.
|
||||
"""
|
||||
if len(message.alternatives) > 1:
|
||||
raise NotSupportedByMandrillError(
|
||||
"Too many alternatives attached to the message. "
|
||||
"Mandrill only accepts plain text and html emails.",
|
||||
email_message=message)
|
||||
|
||||
(content, mimetype) = message.alternatives[0]
|
||||
if mimetype != 'text/html':
|
||||
raise NotSupportedByMandrillError(
|
||||
"Invalid alternative mimetype '%s'. "
|
||||
"Mandrill only accepts plain text and html emails."
|
||||
% mimetype,
|
||||
email_message=message)
|
||||
|
||||
msg_dict['html'] = content
|
||||
|
||||
def _add_attachments(self, message, msg_dict):
|
||||
"""Extend msg_dict to include any attachments in message"""
|
||||
if message.attachments:
|
||||
str_encoding = message.encoding or settings.DEFAULT_CHARSET
|
||||
mandrill_attachments = []
|
||||
mandrill_embedded_images = []
|
||||
for attachment in message.attachments:
|
||||
att_dict, is_embedded = self._make_mandrill_attachment(attachment, str_encoding)
|
||||
if is_embedded:
|
||||
mandrill_embedded_images.append(att_dict)
|
||||
else:
|
||||
mandrill_attachments.append(att_dict)
|
||||
if len(mandrill_attachments) > 0:
|
||||
msg_dict['attachments'] = mandrill_attachments
|
||||
if len(mandrill_embedded_images) > 0:
|
||||
msg_dict['images'] = mandrill_embedded_images
|
||||
|
||||
def _make_mandrill_attachment(self, attachment, str_encoding=None):
|
||||
"""Returns EmailMessage.attachments item formatted for sending with Mandrill.
|
||||
|
||||
Returns mandrill_dict, is_embedded_image:
|
||||
mandrill_dict: {"type":..., "name":..., "content":...}
|
||||
is_embedded_image: True if the attachment should instead be handled as an inline image.
|
||||
|
||||
"""
|
||||
# Note that an attachment can be either a tuple of (filename, content,
|
||||
# mimetype) or a MIMEBase object. (Also, both filename and mimetype may
|
||||
# be missing.)
|
||||
is_embedded_image = False
|
||||
if isinstance(attachment, MIMEBase):
|
||||
name = attachment.get_filename()
|
||||
content = attachment.get_payload(decode=True)
|
||||
mimetype = attachment.get_content_type()
|
||||
# Treat image attachments that have content ids as embedded:
|
||||
if attachment.get_content_maintype() == "image" and attachment["Content-ID"] is not None:
|
||||
is_embedded_image = True
|
||||
name = attachment["Content-ID"]
|
||||
else:
|
||||
(name, content, mimetype) = attachment
|
||||
|
||||
# Guess missing mimetype from filename, borrowed from
|
||||
# django.core.mail.EmailMessage._create_attachment()
|
||||
if mimetype is None and name is not None:
|
||||
mimetype, _ = mimetypes.guess_type(name)
|
||||
if mimetype is None:
|
||||
mimetype = DEFAULT_ATTACHMENT_MIME_TYPE
|
||||
|
||||
# b64encode requires bytes, so let's convert our content.
|
||||
try:
|
||||
# noinspection PyUnresolvedReferences
|
||||
if isinstance(content, unicode):
|
||||
# Python 2.X unicode string
|
||||
content = content.encode(str_encoding)
|
||||
except NameError:
|
||||
# Python 3 doesn't differentiate between strings and unicode
|
||||
# Convert python3 unicode str to bytes attachment:
|
||||
if isinstance(content, str):
|
||||
content = content.encode(str_encoding)
|
||||
|
||||
content_b64 = b64encode(content)
|
||||
|
||||
mandrill_attachment = {
|
||||
'type': mimetype,
|
||||
'name': name or "",
|
||||
'content': content_b64.decode('ascii'),
|
||||
}
|
||||
return mandrill_attachment, is_embedded_image
|
||||
|
||||
@classmethod
|
||||
def encode_date_for_mandrill(cls, dt):
|
||||
"""Format a date or datetime for use as a Mandrill API date field
|
||||
|
||||
datetime becomes "YYYY-MM-DD HH:MM:SS"
|
||||
converted to UTC, if timezone-aware
|
||||
microseconds removed
|
||||
date becomes "YYYY-MM-DD 00:00:00"
|
||||
anything else gets returned intact
|
||||
"""
|
||||
if isinstance(dt, datetime):
|
||||
dt = dt.replace(microsecond=0)
|
||||
if dt.utcoffset() is not None:
|
||||
dt = (dt - dt.utcoffset()).replace(tzinfo=None)
|
||||
return dt.isoformat(' ')
|
||||
elif isinstance(dt, date):
|
||||
return dt.isoformat() + ' 00:00:00'
|
||||
else:
|
||||
return dt
|
||||
11
anymail/compat.py
Normal file
11
anymail/compat.py
Normal file
@@ -0,0 +1,11 @@
|
||||
# For python 3 compatibility, see http://python3porting.com/problems.html#nicer-solutions
|
||||
import sys
|
||||
|
||||
if sys.version < '3':
|
||||
def b(x):
|
||||
return x
|
||||
else:
|
||||
import codecs
|
||||
|
||||
def b(x):
|
||||
return codecs.latin_1_encode(x)[0]
|
||||
117
anymail/exceptions.py
Normal file
117
anymail/exceptions.py
Normal file
@@ -0,0 +1,117 @@
|
||||
import json
|
||||
from requests import HTTPError
|
||||
|
||||
|
||||
class DjrillError(Exception):
|
||||
"""Base class for exceptions raised by Djrill
|
||||
|
||||
Overrides __str__ to provide additional information about
|
||||
Mandrill API call and response.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
"""
|
||||
Optional kwargs:
|
||||
email_message: the original EmailMessage being sent
|
||||
payload: data arg (*not* json-stringified) for the Mandrill send call
|
||||
response: requests.Response from the send call
|
||||
"""
|
||||
self.email_message = kwargs.pop('email_message', None)
|
||||
self.payload = kwargs.pop('payload', None)
|
||||
if isinstance(self, HTTPError):
|
||||
# must leave response in kwargs for HTTPError
|
||||
self.response = kwargs.get('response', None)
|
||||
else:
|
||||
self.response = kwargs.pop('response', None)
|
||||
super(DjrillError, self).__init__(*args, **kwargs)
|
||||
|
||||
def __str__(self):
|
||||
parts = [
|
||||
" ".join([str(arg) for arg in self.args]),
|
||||
self.describe_send(),
|
||||
self.describe_response(),
|
||||
]
|
||||
return "\n".join(filter(None, parts))
|
||||
|
||||
def describe_send(self):
|
||||
"""Return a string describing the Mandrill send in self.payload, or None"""
|
||||
if self.payload is None:
|
||||
return None
|
||||
description = "Sending a message"
|
||||
try:
|
||||
to_emails = [to['email'] for to in self.payload['message']['to']]
|
||||
description += " to %s" % ','.join(to_emails)
|
||||
except KeyError:
|
||||
pass
|
||||
try:
|
||||
description += " from %s" % self.payload['message']['from_email']
|
||||
except KeyError:
|
||||
pass
|
||||
return description
|
||||
|
||||
def describe_response(self):
|
||||
"""Return a formatted string of self.response, or None"""
|
||||
if self.response is None:
|
||||
return None
|
||||
description = "Mandrill API response %d:" % self.response.status_code
|
||||
try:
|
||||
json_response = self.response.json()
|
||||
description += "\n" + json.dumps(json_response, indent=2)
|
||||
except (AttributeError, KeyError, ValueError): # not JSON = ValueError
|
||||
try:
|
||||
description += " " + self.response.text
|
||||
except AttributeError:
|
||||
pass
|
||||
return description
|
||||
|
||||
|
||||
class MandrillAPIError(DjrillError, HTTPError):
|
||||
"""Exception for unsuccessful response from Mandrill API."""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(MandrillAPIError, self).__init__(*args, **kwargs)
|
||||
if self.response is not None:
|
||||
self.status_code = self.response.status_code
|
||||
|
||||
|
||||
class MandrillRecipientsRefused(DjrillError):
|
||||
"""Exception for send where all recipients are invalid or rejected."""
|
||||
|
||||
def __init__(self, message=None, *args, **kwargs):
|
||||
if message is None:
|
||||
message = "All message recipients were rejected or invalid"
|
||||
super(MandrillRecipientsRefused, self).__init__(message, *args, **kwargs)
|
||||
|
||||
|
||||
class NotSupportedByMandrillError(DjrillError, ValueError):
|
||||
"""Exception for email features that Mandrill doesn't support.
|
||||
|
||||
This is typically raised when attempting to send a Django EmailMessage that
|
||||
uses options or values you might expect to work, but that are silently
|
||||
ignored by or can't be communicated to Mandrill's API. (E.g., non-HTML
|
||||
alternative parts.)
|
||||
|
||||
It's generally *not* raised for Mandrill-specific features, like limitations
|
||||
on Mandrill tag names or restrictions on from emails. (Djrill expects
|
||||
Mandrill to return an API error for these where appropriate, and tries to
|
||||
avoid duplicating Mandrill's validation logic locally.)
|
||||
|
||||
"""
|
||||
|
||||
|
||||
class NotSerializableForMandrillError(DjrillError, TypeError):
|
||||
"""Exception for data that Djrill doesn't know how to convert to JSON.
|
||||
|
||||
This typically results from including something like a date or Decimal
|
||||
in your merge_vars (or other Mandrill-specific EmailMessage option).
|
||||
|
||||
"""
|
||||
# inherits from TypeError for backwards compatibility with Djrill 1.x
|
||||
|
||||
def __init__(self, message=None, orig_err=None, *args, **kwargs):
|
||||
if message is None:
|
||||
message = "Don't know how to send this data to Mandrill. " \
|
||||
"Try converting it to a string or number first."
|
||||
if orig_err is not None:
|
||||
message += "\n%s" % str(orig_err)
|
||||
super(NotSerializableForMandrillError, self).__init__(message, *args, **kwargs)
|
||||
3
anymail/signals.py
Normal file
3
anymail/signals.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from django.dispatch import Signal
|
||||
|
||||
webhook_event = Signal(providing_args=['event_type', 'data'])
|
||||
6
anymail/tests/__init__.py
Normal file
6
anymail/tests/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
||||
from .test_mandrill_integration import *
|
||||
from .test_mandrill_send import *
|
||||
from .test_mandrill_send_template import *
|
||||
from .test_mandrill_session_sharing import *
|
||||
from .test_mandrill_subaccounts import *
|
||||
from .test_mandrill_webhook import *
|
||||
72
anymail/tests/mock_backend.py
Normal file
72
anymail/tests/mock_backend.py
Normal file
@@ -0,0 +1,72 @@
|
||||
import json
|
||||
import requests
|
||||
import six
|
||||
from mock import patch
|
||||
|
||||
from django.test import TestCase
|
||||
from django.test.utils import override_settings
|
||||
|
||||
|
||||
MANDRILL_SUCCESS_RESPONSE = b"""[{
|
||||
"email": "to@example.com",
|
||||
"status": "sent",
|
||||
"_id": "abc123",
|
||||
"reject_reason": null
|
||||
}]"""
|
||||
|
||||
|
||||
@override_settings(MANDRILL_API_KEY="FAKE_API_KEY_FOR_TESTING",
|
||||
EMAIL_BACKEND="anymail.backends.mandrill.MandrillBackend")
|
||||
class DjrillBackendMockAPITestCase(TestCase):
|
||||
"""TestCase that uses Djrill EmailBackend with a mocked Mandrill API"""
|
||||
|
||||
class MockResponse(requests.Response):
|
||||
"""requests.post return value mock sufficient for MandrillBackend"""
|
||||
def __init__(self, status_code=200, raw=MANDRILL_SUCCESS_RESPONSE, encoding='utf-8'):
|
||||
super(DjrillBackendMockAPITestCase.MockResponse, self).__init__()
|
||||
self.status_code = status_code
|
||||
self.encoding = encoding
|
||||
self.raw = six.BytesIO(raw)
|
||||
|
||||
def setUp(self):
|
||||
self.patch = patch('requests.Session.post', autospec=True)
|
||||
self.mock_post = self.patch.start()
|
||||
self.mock_post.return_value = self.MockResponse()
|
||||
|
||||
def tearDown(self):
|
||||
self.patch.stop()
|
||||
|
||||
def assert_mandrill_called(self, endpoint):
|
||||
"""Verifies the (mock) Mandrill API was called on endpoint.
|
||||
|
||||
endpoint is a Mandrill API, e.g., "/messages/send.json"
|
||||
"""
|
||||
# This assumes the last (or only) call to requests.post is the
|
||||
# Mandrill API call of interest.
|
||||
if self.mock_post.call_args is None:
|
||||
raise AssertionError("Mandrill API was not called")
|
||||
(args, kwargs) = self.mock_post.call_args
|
||||
try:
|
||||
post_url = kwargs.get('url', None) or args[1]
|
||||
except IndexError:
|
||||
raise AssertionError("requests.post was called without an url (?!)")
|
||||
if not post_url.endswith(endpoint):
|
||||
raise AssertionError(
|
||||
"requests.post was not called on %s\n(It was called on %s)"
|
||||
% (endpoint, post_url))
|
||||
|
||||
def get_api_call_data(self):
|
||||
"""Returns the data posted to the Mandrill API.
|
||||
|
||||
Fails test if API wasn't called.
|
||||
"""
|
||||
if self.mock_post.call_args is None:
|
||||
raise AssertionError("Mandrill API was not called")
|
||||
(args, kwargs) = self.mock_post.call_args
|
||||
try:
|
||||
post_data = kwargs.get('data', None) or args[2]
|
||||
except IndexError:
|
||||
raise AssertionError("requests.post was called without data")
|
||||
return json.loads(post_data)
|
||||
|
||||
|
||||
BIN
anymail/tests/sample_image.png
Normal file
BIN
anymail/tests/sample_image.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 579 B |
105
anymail/tests/test_mandrill_integration.py
Normal file
105
anymail/tests/test_mandrill_integration.py
Normal file
@@ -0,0 +1,105 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import os
|
||||
import unittest
|
||||
|
||||
from django.core import mail
|
||||
from django.test import TestCase
|
||||
from django.test.utils import override_settings
|
||||
|
||||
from anymail import MandrillAPIError, MandrillRecipientsRefused
|
||||
|
||||
|
||||
MANDRILL_TEST_API_KEY = os.getenv('MANDRILL_TEST_API_KEY')
|
||||
|
||||
|
||||
@unittest.skipUnless(MANDRILL_TEST_API_KEY,
|
||||
"Set MANDRILL_TEST_API_KEY environment variable to run integration tests")
|
||||
@override_settings(MANDRILL_API_KEY=MANDRILL_TEST_API_KEY,
|
||||
EMAIL_BACKEND="anymail.backends.mandrill.MandrillBackend")
|
||||
class DjrillIntegrationTests(TestCase):
|
||||
"""Mandrill API integration tests
|
||||
|
||||
These tests run against the **live** Mandrill API, using the
|
||||
environment variable `MANDRILL_TEST_API_KEY` as the API key.
|
||||
If that variable is not set, these tests won't run.
|
||||
|
||||
See https://mandrill.zendesk.com/hc/en-us/articles/205582447
|
||||
for info on Mandrill test keys.
|
||||
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
self.message = mail.EmailMultiAlternatives(
|
||||
'Subject', 'Text content', 'from@example.com', ['to@example.com'])
|
||||
self.message.attach_alternative('<p>HTML content</p>', "text/html")
|
||||
|
||||
def test_send_mail(self):
|
||||
# Example of getting the Mandrill send status and _id from the message
|
||||
sent_count = self.message.send()
|
||||
self.assertEqual(sent_count, 1)
|
||||
# noinspection PyUnresolvedReferences
|
||||
response = self.message.mandrill_response
|
||||
self.assertIn(response[0]['status'], ['sent', 'queued']) # successful send (could still bounce later)
|
||||
self.assertEqual(response[0]['email'], 'to@example.com')
|
||||
self.assertGreater(len(response[0]['_id']), 0)
|
||||
|
||||
def test_invalid_from(self):
|
||||
# Example of trying to send from an invalid address
|
||||
# Mandrill returns a 500 response (which raises a MandrillAPIError)
|
||||
self.message.from_email = 'webmaster@localhost' # Django default DEFAULT_FROM_EMAIL
|
||||
try:
|
||||
self.message.send()
|
||||
self.fail("This line will not be reached, because send() raised an exception")
|
||||
except MandrillAPIError as err:
|
||||
self.assertEqual(err.status_code, 500)
|
||||
self.assertIn("email address is invalid", str(err))
|
||||
|
||||
def test_invalid_to(self):
|
||||
# Example of detecting when a recipient is not a valid email address
|
||||
self.message.to = ['invalid@localhost']
|
||||
try:
|
||||
self.message.send()
|
||||
except MandrillRecipientsRefused:
|
||||
# Mandrill refused to deliver the mail -- message.mandrill_response will tell you why:
|
||||
# noinspection PyUnresolvedReferences
|
||||
response = self.message.mandrill_response
|
||||
self.assertEqual(response[0]['status'], 'invalid')
|
||||
else:
|
||||
# Sometimes Mandrill queues these test sends
|
||||
# noinspection PyUnresolvedReferences
|
||||
response = self.message.mandrill_response
|
||||
if response[0]['status'] == 'queued':
|
||||
self.skipTest("Mandrill queued the send -- can't complete this test")
|
||||
else:
|
||||
self.fail("Djrill did not raise MandrillRecipientsRefused for invalid recipient")
|
||||
|
||||
def test_rejected_to(self):
|
||||
# Example of detecting when a recipient is on Mandrill's rejection blacklist
|
||||
self.message.to = ['reject@test.mandrillapp.com']
|
||||
try:
|
||||
self.message.send()
|
||||
except MandrillRecipientsRefused:
|
||||
# Mandrill refused to deliver the mail -- message.mandrill_response will tell you why:
|
||||
# noinspection PyUnresolvedReferences
|
||||
response = self.message.mandrill_response
|
||||
self.assertEqual(response[0]['status'], 'rejected')
|
||||
self.assertEqual(response[0]['reject_reason'], 'test')
|
||||
else:
|
||||
# Sometimes Mandrill queues these test sends
|
||||
# noinspection PyUnresolvedReferences
|
||||
response = self.message.mandrill_response
|
||||
if response[0]['status'] == 'queued':
|
||||
self.skipTest("Mandrill queued the send -- can't complete this test")
|
||||
else:
|
||||
self.fail("Djrill did not raise MandrillRecipientsRefused for blacklist recipient")
|
||||
|
||||
@override_settings(MANDRILL_API_KEY="Hey, that's not an API key!")
|
||||
def test_invalid_api_key(self):
|
||||
# Example of trying to send with an invalid MANDRILL_API_KEY
|
||||
try:
|
||||
self.message.send()
|
||||
self.fail("This line will not be reached, because send() raised an exception")
|
||||
except MandrillAPIError as err:
|
||||
self.assertEqual(err.status_code, 500)
|
||||
self.assertIn("Invalid API key", str(err))
|
||||
752
anymail/tests/test_mandrill_send.py
Normal file
752
anymail/tests/test_mandrill_send.py
Normal file
@@ -0,0 +1,752 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import six
|
||||
import unittest
|
||||
from base64 import b64decode
|
||||
from datetime import date, datetime, timedelta, tzinfo
|
||||
from decimal import Decimal
|
||||
from email.mime.base import MIMEBase
|
||||
from email.mime.image import MIMEImage
|
||||
|
||||
from django.core import mail
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
from django.core.mail import make_msgid
|
||||
from django.test import TestCase
|
||||
from django.test.utils import override_settings
|
||||
|
||||
from anymail import (MandrillAPIError, MandrillRecipientsRefused,
|
||||
NotSerializableForMandrillError, NotSupportedByMandrillError)
|
||||
|
||||
from .mock_backend import DjrillBackendMockAPITestCase
|
||||
|
||||
|
||||
def decode_att(att):
|
||||
"""Returns the original data from base64-encoded attachment content"""
|
||||
return b64decode(att.encode('ascii'))
|
||||
|
||||
|
||||
class DjrillBackendTests(DjrillBackendMockAPITestCase):
|
||||
"""Test Djrill backend support for Django mail wrappers"""
|
||||
|
||||
sample_image_filename = "sample_image.png"
|
||||
|
||||
def sample_image_pathname(self):
|
||||
"""Returns path to an actual image file in the tests directory"""
|
||||
test_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
path = os.path.join(test_dir, self.sample_image_filename)
|
||||
return path
|
||||
|
||||
def sample_image_content(self):
|
||||
"""Returns contents of an actual image file from the tests directory"""
|
||||
filename = self.sample_image_pathname()
|
||||
with open(filename, "rb") as f:
|
||||
return f.read()
|
||||
|
||||
def test_send_mail(self):
|
||||
mail.send_mail('Subject here', 'Here is the message.',
|
||||
'from@example.com', ['to@example.com'], fail_silently=False)
|
||||
self.assert_mandrill_called("/messages/send.json")
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['message']['subject'], "Subject here")
|
||||
self.assertEqual(data['message']['text'], "Here is the message.")
|
||||
self.assertFalse('from_name' in data['message'])
|
||||
self.assertEqual(data['message']['from_email'], "from@example.com")
|
||||
self.assertEqual(len(data['message']['to']), 1)
|
||||
self.assertEqual(data['message']['to'][0]['email'], "to@example.com")
|
||||
|
||||
def test_name_addr(self):
|
||||
"""Make sure RFC2822 name-addr format (with display-name) is allowed
|
||||
|
||||
(Test both sender and recipient addresses)
|
||||
"""
|
||||
msg = mail.EmailMessage('Subject', 'Message',
|
||||
'From Name <from@example.com>',
|
||||
['Recipient #1 <to1@example.com>', 'to2@example.com'],
|
||||
cc=['Carbon Copy <cc1@example.com>', 'cc2@example.com'],
|
||||
bcc=['Blind Copy <bcc1@example.com>', 'bcc2@example.com'])
|
||||
msg.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['message']['from_name'], "From Name")
|
||||
self.assertEqual(data['message']['from_email'], "from@example.com")
|
||||
self.assertEqual(len(data['message']['to']), 6)
|
||||
self.assertEqual(data['message']['to'][0]['name'], "Recipient #1")
|
||||
self.assertEqual(data['message']['to'][0]['email'], "to1@example.com")
|
||||
self.assertEqual(data['message']['to'][1]['name'], "")
|
||||
self.assertEqual(data['message']['to'][1]['email'], "to2@example.com")
|
||||
self.assertEqual(data['message']['to'][2]['name'], "Carbon Copy")
|
||||
self.assertEqual(data['message']['to'][2]['email'], "cc1@example.com")
|
||||
self.assertEqual(data['message']['to'][3]['name'], "")
|
||||
self.assertEqual(data['message']['to'][3]['email'], "cc2@example.com")
|
||||
self.assertEqual(data['message']['to'][4]['name'], "Blind Copy")
|
||||
self.assertEqual(data['message']['to'][4]['email'], "bcc1@example.com")
|
||||
self.assertEqual(data['message']['to'][5]['name'], "")
|
||||
self.assertEqual(data['message']['to'][5]['email'], "bcc2@example.com")
|
||||
|
||||
def test_email_message(self):
|
||||
email = mail.EmailMessage('Subject', 'Body goes here',
|
||||
'from@example.com',
|
||||
['to1@example.com', 'Also To <to2@example.com>'],
|
||||
bcc=['bcc1@example.com', 'Also BCC <bcc2@example.com>'],
|
||||
cc=['cc1@example.com', 'Also CC <cc2@example.com>'],
|
||||
headers={'Reply-To': 'another@example.com',
|
||||
'X-MyHeader': 'my value',
|
||||
'Message-ID': 'mycustommsgid@example.com'})
|
||||
email.send()
|
||||
self.assert_mandrill_called("/messages/send.json")
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['message']['subject'], "Subject")
|
||||
self.assertEqual(data['message']['text'], "Body goes here")
|
||||
self.assertEqual(data['message']['from_email'], "from@example.com")
|
||||
self.assertEqual(data['message']['headers'],
|
||||
{'Reply-To': 'another@example.com',
|
||||
'X-MyHeader': 'my value',
|
||||
'Message-ID': 'mycustommsgid@example.com'})
|
||||
# Verify recipients correctly identified as "to", "cc", or "bcc"
|
||||
self.assertEqual(len(data['message']['to']), 6)
|
||||
self.assertEqual(data['message']['to'][0]['email'], "to1@example.com")
|
||||
self.assertEqual(data['message']['to'][0]['type'], "to")
|
||||
self.assertEqual(data['message']['to'][1]['email'], "to2@example.com")
|
||||
self.assertEqual(data['message']['to'][1]['type'], "to")
|
||||
self.assertEqual(data['message']['to'][2]['email'], "cc1@example.com")
|
||||
self.assertEqual(data['message']['to'][2]['type'], "cc")
|
||||
self.assertEqual(data['message']['to'][3]['email'], "cc2@example.com")
|
||||
self.assertEqual(data['message']['to'][3]['type'], "cc")
|
||||
self.assertEqual(data['message']['to'][4]['email'], "bcc1@example.com")
|
||||
self.assertEqual(data['message']['to'][4]['type'], "bcc")
|
||||
self.assertEqual(data['message']['to'][5]['email'], "bcc2@example.com")
|
||||
self.assertEqual(data['message']['to'][5]['type'], "bcc")
|
||||
# Don't use Mandrill's bcc_address "logging" feature for bcc's:
|
||||
self.assertNotIn('bcc_address', data['message'])
|
||||
|
||||
def test_html_message(self):
|
||||
text_content = 'This is an important message.'
|
||||
html_content = '<p>This is an <strong>important</strong> message.</p>'
|
||||
email = mail.EmailMultiAlternatives('Subject', text_content,
|
||||
'from@example.com', ['to@example.com'])
|
||||
email.attach_alternative(html_content, "text/html")
|
||||
email.send()
|
||||
self.assert_mandrill_called("/messages/send.json")
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['message']['text'], text_content)
|
||||
self.assertEqual(data['message']['html'], html_content)
|
||||
# Don't accidentally send the html part as an attachment:
|
||||
self.assertFalse('attachments' in data['message'])
|
||||
|
||||
def test_html_only_message(self):
|
||||
html_content = '<p>This is an <strong>important</strong> message.</p>'
|
||||
email = mail.EmailMessage('Subject', html_content,
|
||||
'from@example.com', ['to@example.com'])
|
||||
email.content_subtype = "html" # Main content is now text/html
|
||||
email.send()
|
||||
self.assert_mandrill_called("/messages/send.json")
|
||||
data = self.get_api_call_data()
|
||||
self.assertNotIn('text', data['message'])
|
||||
self.assertEqual(data['message']['html'], html_content)
|
||||
|
||||
def test_reply_to(self):
|
||||
# reply_to is new in Django 1.8 -- before that, you can simply include it in headers
|
||||
try:
|
||||
# noinspection PyArgumentList
|
||||
email = mail.EmailMessage('Subject', 'Body goes here', 'from@example.com', ['to1@example.com'],
|
||||
reply_to=['reply@example.com', 'Other <reply2@example.com>'],
|
||||
headers={'X-Other': 'Keep'})
|
||||
except TypeError:
|
||||
# Pre-Django 1.8
|
||||
raise unittest.SkipTest("Django version doesn't support EmailMessage(reply_to)")
|
||||
email.send()
|
||||
self.assert_mandrill_called("/messages/send.json")
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['message']['headers']['Reply-To'],
|
||||
'reply@example.com, Other <reply2@example.com>')
|
||||
self.assertEqual(data['message']['headers']['X-Other'], 'Keep') # don't lose other headers
|
||||
|
||||
def test_attachments(self):
|
||||
email = mail.EmailMessage('Subject', 'Body goes here', 'from@example.com', ['to1@example.com'])
|
||||
|
||||
text_content = "* Item one\n* Item two\n* Item three"
|
||||
email.attach(filename="test.txt", content=text_content, mimetype="text/plain")
|
||||
|
||||
# Should guess mimetype if not provided...
|
||||
png_content = b"PNG\xb4 pretend this is the contents of a png file"
|
||||
email.attach(filename="test.png", content=png_content)
|
||||
|
||||
# Should work with a MIMEBase object (also tests no filename)...
|
||||
pdf_content = b"PDF\xb4 pretend this is valid pdf data"
|
||||
mimeattachment = MIMEBase('application', 'pdf')
|
||||
mimeattachment.set_payload(pdf_content)
|
||||
email.attach(mimeattachment)
|
||||
|
||||
# Attachment type that wasn't supported in early Mandrill releases:
|
||||
ppt_content = b"PPT\xb4 pretend this is a valid ppt file"
|
||||
email.attach(filename="presentation.ppt", content=ppt_content,
|
||||
mimetype="application/vnd.ms-powerpoint")
|
||||
|
||||
email.send()
|
||||
data = self.get_api_call_data()
|
||||
attachments = data['message']['attachments']
|
||||
self.assertEqual(len(attachments), 4)
|
||||
self.assertEqual(attachments[0]["type"], "text/plain")
|
||||
self.assertEqual(attachments[0]["name"], "test.txt")
|
||||
self.assertEqual(decode_att(attachments[0]["content"]).decode('ascii'), text_content)
|
||||
self.assertEqual(attachments[1]["type"], "image/png") # inferred from filename
|
||||
self.assertEqual(attachments[1]["name"], "test.png")
|
||||
self.assertEqual(decode_att(attachments[1]["content"]), png_content)
|
||||
self.assertEqual(attachments[2]["type"], "application/pdf")
|
||||
self.assertEqual(attachments[2]["name"], "") # none
|
||||
self.assertEqual(decode_att(attachments[2]["content"]), pdf_content)
|
||||
self.assertEqual(attachments[3]["type"], "application/vnd.ms-powerpoint")
|
||||
self.assertEqual(attachments[3]["name"], "presentation.ppt")
|
||||
self.assertEqual(decode_att(attachments[3]["content"]), ppt_content)
|
||||
# Make sure the image attachment is not treated as embedded:
|
||||
self.assertFalse('images' in data['message'])
|
||||
|
||||
def test_unicode_attachment_correctly_decoded(self):
|
||||
msg = mail.EmailMessage(
|
||||
subject='Subject',
|
||||
body='Body goes here',
|
||||
from_email='from@example.com',
|
||||
to=['to1@example.com'],
|
||||
)
|
||||
# Slight modification from the Django unicode docs:
|
||||
# http://django.readthedocs.org/en/latest/ref/unicode.html#email
|
||||
msg.attach("Une pièce jointe.html", '<p>\u2019</p>', mimetype='text/html')
|
||||
|
||||
msg.send()
|
||||
data = self.get_api_call_data()
|
||||
|
||||
attachments = data['message']['attachments']
|
||||
self.assertEqual(len(attachments), 1)
|
||||
|
||||
def test_embedded_images(self):
|
||||
image_data = self.sample_image_content() # Read from a png file
|
||||
image_cid = make_msgid("img") # Content ID per RFC 2045 section 7 (with <...>)
|
||||
image_cid_no_brackets = image_cid[1:-1] # Without <...>, for use as the <img> tag src
|
||||
|
||||
text_content = 'This has an inline image.'
|
||||
html_content = '<p>This has an <img src="cid:%s" alt="inline" /> image.</p>' % image_cid_no_brackets
|
||||
email = mail.EmailMultiAlternatives('Subject', text_content, 'from@example.com', ['to@example.com'])
|
||||
email.attach_alternative(html_content, "text/html")
|
||||
|
||||
image = MIMEImage(image_data)
|
||||
image.add_header('Content-ID', image_cid)
|
||||
email.attach(image)
|
||||
|
||||
email.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['message']['text'], text_content)
|
||||
self.assertEqual(data['message']['html'], html_content)
|
||||
self.assertEqual(len(data['message']['images']), 1)
|
||||
self.assertEqual(data['message']['images'][0]["type"], "image/png")
|
||||
self.assertEqual(data['message']['images'][0]["name"], image_cid)
|
||||
self.assertEqual(decode_att(data['message']['images'][0]["content"]), image_data)
|
||||
# Make sure neither the html nor the inline image is treated as an attachment:
|
||||
self.assertFalse('attachments' in data['message'])
|
||||
|
||||
def test_attached_images(self):
|
||||
image_data = self.sample_image_content()
|
||||
|
||||
email = mail.EmailMultiAlternatives('Subject', 'Message', 'from@example.com', ['to@example.com'])
|
||||
email.attach_file(self.sample_image_pathname()) # option 1: attach as a file
|
||||
|
||||
image = MIMEImage(image_data) # option 2: construct the MIMEImage and attach it directly
|
||||
email.attach(image)
|
||||
|
||||
email.send()
|
||||
data = self.get_api_call_data()
|
||||
attachments = data['message']['attachments']
|
||||
self.assertEqual(len(attachments), 2)
|
||||
self.assertEqual(attachments[0]["type"], "image/png")
|
||||
self.assertEqual(attachments[0]["name"], self.sample_image_filename)
|
||||
self.assertEqual(decode_att(attachments[0]["content"]), image_data)
|
||||
self.assertEqual(attachments[1]["type"], "image/png")
|
||||
self.assertEqual(attachments[1]["name"], "") # unknown -- not attached as file
|
||||
self.assertEqual(decode_att(attachments[1]["content"]), image_data)
|
||||
# Make sure the image attachments are not treated as embedded:
|
||||
self.assertFalse('images' in data['message'])
|
||||
|
||||
def test_alternative_errors(self):
|
||||
# Multiple alternatives not allowed
|
||||
email = mail.EmailMultiAlternatives('Subject', 'Body',
|
||||
'from@example.com', ['to@example.com'])
|
||||
email.attach_alternative("<p>First html is OK</p>", "text/html")
|
||||
email.attach_alternative("<p>But not second html</p>", "text/html")
|
||||
with self.assertRaises(NotSupportedByMandrillError):
|
||||
email.send()
|
||||
|
||||
# Only html alternatives allowed
|
||||
email = mail.EmailMultiAlternatives('Subject', 'Body',
|
||||
'from@example.com', ['to@example.com'])
|
||||
email.attach_alternative("{'not': 'allowed'}", "application/json")
|
||||
with self.assertRaises(NotSupportedByMandrillError):
|
||||
email.send()
|
||||
|
||||
# Make sure fail_silently is respected
|
||||
email = mail.EmailMultiAlternatives('Subject', 'Body',
|
||||
'from@example.com', ['to@example.com'])
|
||||
email.attach_alternative("{'not': 'allowed'}", "application/json")
|
||||
sent = email.send(fail_silently=True)
|
||||
self.assertFalse(self.mock_post.called,
|
||||
msg="Mandrill API should not be called when send fails silently")
|
||||
self.assertEqual(sent, 0)
|
||||
|
||||
def test_mandrill_api_failure(self):
|
||||
self.mock_post.return_value = self.MockResponse(status_code=400)
|
||||
with self.assertRaises(MandrillAPIError):
|
||||
sent = mail.send_mail('Subject', 'Body', 'from@example.com',
|
||||
['to@example.com'])
|
||||
self.assertEqual(sent, 0)
|
||||
|
||||
# Make sure fail_silently is respected
|
||||
self.mock_post.return_value = self.MockResponse(status_code=400)
|
||||
sent = mail.send_mail('Subject', 'Body', 'from@example.com',
|
||||
['to@example.com'], fail_silently=True)
|
||||
self.assertEqual(sent, 0)
|
||||
|
||||
def test_api_error_includes_details(self):
|
||||
"""MandrillAPIError should include Mandrill's error message"""
|
||||
msg = mail.EmailMessage('Subject', 'Body', 'from@example.com', ['to@example.com'])
|
||||
|
||||
# JSON error response:
|
||||
error_response = b"""{
|
||||
"status": "error",
|
||||
"code": 12,
|
||||
"name": "Error_Name",
|
||||
"message": "Helpful explanation from Mandrill"
|
||||
}"""
|
||||
self.mock_post.return_value = self.MockResponse(status_code=400, raw=error_response)
|
||||
with self.assertRaisesMessage(MandrillAPIError, "Helpful explanation from Mandrill"):
|
||||
msg.send()
|
||||
|
||||
# Non-JSON error response:
|
||||
self.mock_post.return_value = self.MockResponse(status_code=500, raw=b"Invalid API key")
|
||||
with self.assertRaisesMessage(MandrillAPIError, "Invalid API key"):
|
||||
msg.send()
|
||||
|
||||
# No content in the error response:
|
||||
self.mock_post.return_value = self.MockResponse(status_code=502, raw=None)
|
||||
with self.assertRaises(MandrillAPIError):
|
||||
msg.send()
|
||||
|
||||
|
||||
class DjrillMandrillFeatureTests(DjrillBackendMockAPITestCase):
|
||||
"""Test Djrill backend support for Mandrill-specific features"""
|
||||
|
||||
def setUp(self):
|
||||
super(DjrillMandrillFeatureTests, self).setUp()
|
||||
self.message = mail.EmailMessage('Subject', 'Text Body',
|
||||
'from@example.com', ['to@example.com'])
|
||||
|
||||
def assertStrContains(self, haystack, needle, msg=None):
|
||||
six.assertRegex(self, haystack, re.escape(needle), msg)
|
||||
|
||||
def test_tracking(self):
|
||||
# First make sure we're not setting the API param if the track_click
|
||||
# attr isn't there. (The Mandrill account option of True for html,
|
||||
# False for plaintext can't be communicated through the API, other than
|
||||
# by omitting the track_clicks API param to use your account default.)
|
||||
self.message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertFalse('track_clicks' in data['message'])
|
||||
# Now re-send with the params set
|
||||
self.message.track_opens = True
|
||||
self.message.track_clicks = True
|
||||
self.message.url_strip_qs = True
|
||||
self.message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['message']['track_opens'], True)
|
||||
self.assertEqual(data['message']['track_clicks'], True)
|
||||
self.assertEqual(data['message']['url_strip_qs'], True)
|
||||
|
||||
def test_message_options(self):
|
||||
self.message.important = True
|
||||
self.message.auto_text = True
|
||||
self.message.auto_html = True
|
||||
self.message.inline_css = True
|
||||
self.message.preserve_recipients = True
|
||||
self.message.view_content_link = False
|
||||
self.message.tracking_domain = "click.example.com"
|
||||
self.message.signing_domain = "example.com"
|
||||
self.message.return_path_domain = "support.example.com"
|
||||
self.message.subaccount = "marketing-dept"
|
||||
self.message.async = True
|
||||
self.message.ip_pool = "Bulk Pool"
|
||||
self.message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['message']['important'], True)
|
||||
self.assertEqual(data['message']['auto_text'], True)
|
||||
self.assertEqual(data['message']['auto_html'], True)
|
||||
self.assertEqual(data['message']['inline_css'], True)
|
||||
self.assertEqual(data['message']['preserve_recipients'], True)
|
||||
self.assertEqual(data['message']['view_content_link'], False)
|
||||
self.assertEqual(data['message']['tracking_domain'], "click.example.com")
|
||||
self.assertEqual(data['message']['signing_domain'], "example.com")
|
||||
self.assertEqual(data['message']['return_path_domain'], "support.example.com")
|
||||
self.assertEqual(data['message']['subaccount'], "marketing-dept")
|
||||
self.assertEqual(data['async'], True)
|
||||
self.assertEqual(data['ip_pool'], "Bulk Pool")
|
||||
|
||||
def test_merge(self):
|
||||
# Djrill expands simple python dicts into the more-verbose name/content
|
||||
# structures the Mandrill API uses
|
||||
self.message.merge_language = "mailchimp"
|
||||
self.message.global_merge_vars = { 'GREETING': "Hello",
|
||||
'ACCOUNT_TYPE': "Basic" }
|
||||
self.message.merge_vars = {
|
||||
"customer@example.com": { 'GREETING': "Dear Customer",
|
||||
'ACCOUNT_TYPE': "Premium" },
|
||||
"guest@example.com": { 'GREETING': "Dear Guest" },
|
||||
}
|
||||
self.message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['message']['merge_language'], "mailchimp")
|
||||
self.assertEqual(data['message']['global_merge_vars'],
|
||||
[ {'name': 'ACCOUNT_TYPE', 'content': "Basic"},
|
||||
{'name': "GREETING", 'content': "Hello"} ])
|
||||
self.assertEqual(data['message']['merge_vars'],
|
||||
[ { 'rcpt': "customer@example.com",
|
||||
'vars': [{ 'name': 'ACCOUNT_TYPE', 'content': "Premium" },
|
||||
{ 'name': "GREETING", 'content': "Dear Customer"}] },
|
||||
{ 'rcpt': "guest@example.com",
|
||||
'vars': [{ 'name': "GREETING", 'content': "Dear Guest"}] }
|
||||
])
|
||||
|
||||
def test_tags(self):
|
||||
self.message.tags = ["receipt", "repeat-user"]
|
||||
self.message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['message']['tags'], ["receipt", "repeat-user"])
|
||||
|
||||
def test_google_analytics(self):
|
||||
self.message.google_analytics_domains = ["example.com"]
|
||||
self.message.google_analytics_campaign = "Email Receipts"
|
||||
self.message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['message']['google_analytics_domains'],
|
||||
["example.com"])
|
||||
self.assertEqual(data['message']['google_analytics_campaign'],
|
||||
"Email Receipts")
|
||||
|
||||
def test_metadata(self):
|
||||
self.message.metadata = { 'batch_num': "12345", 'type': "Receipts" }
|
||||
self.message.recipient_metadata = {
|
||||
# Djrill expands simple python dicts into the more-verbose
|
||||
# rcpt/values structures the Mandrill API uses
|
||||
"customer@example.com": { 'cust_id': "67890", 'order_id': "54321" },
|
||||
"guest@example.com": { 'cust_id': "94107", 'order_id': "43215" }
|
||||
}
|
||||
self.message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['message']['metadata'], { 'batch_num': "12345",
|
||||
'type': "Receipts" })
|
||||
self.assertEqual(data['message']['recipient_metadata'],
|
||||
[ { 'rcpt': "customer@example.com",
|
||||
'values': { 'cust_id': "67890", 'order_id': "54321" } },
|
||||
{ 'rcpt': "guest@example.com",
|
||||
'values': { 'cust_id': "94107", 'order_id': "43215" } }
|
||||
])
|
||||
|
||||
def test_send_at(self):
|
||||
# String passed unchanged
|
||||
self.message.send_at = "2013-11-12 01:02:03"
|
||||
self.message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['send_at'], "2013-11-12 01:02:03")
|
||||
|
||||
# Timezone-naive datetime assumed to be UTC
|
||||
self.message.send_at = datetime(2022, 10, 11, 12, 13, 14, 567)
|
||||
self.message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['send_at'], "2022-10-11 12:13:14")
|
||||
|
||||
# Timezone-aware datetime converted to UTC:
|
||||
class GMTminus8(tzinfo):
|
||||
def utcoffset(self, dt): return timedelta(hours=-8)
|
||||
def dst(self, dt): return timedelta(0)
|
||||
|
||||
self.message.send_at = datetime(2016, 3, 4, 5, 6, 7, tzinfo=GMTminus8())
|
||||
self.message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['send_at'], "2016-03-04 13:06:07")
|
||||
|
||||
# Date-only treated as midnight UTC
|
||||
self.message.send_at = date(2022, 10, 22)
|
||||
self.message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['send_at'], "2022-10-22 00:00:00")
|
||||
|
||||
def test_default_omits_options(self):
|
||||
"""Make sure by default we don't send any Mandrill-specific options.
|
||||
|
||||
Options not specified by the caller should be omitted entirely from
|
||||
the Mandrill API call (*not* sent as False or empty). This ensures
|
||||
that your Mandrill account settings apply by default.
|
||||
"""
|
||||
self.message.send()
|
||||
self.assert_mandrill_called("/messages/send.json")
|
||||
data = self.get_api_call_data()
|
||||
self.assertFalse('from_name' in data['message'])
|
||||
self.assertFalse('bcc_address' in data['message'])
|
||||
self.assertFalse('important' in data['message'])
|
||||
self.assertFalse('track_opens' in data['message'])
|
||||
self.assertFalse('track_clicks' in data['message'])
|
||||
self.assertFalse('auto_text' in data['message'])
|
||||
self.assertFalse('auto_html' in data['message'])
|
||||
self.assertFalse('inline_css' in data['message'])
|
||||
self.assertFalse('url_strip_qs' in data['message'])
|
||||
self.assertFalse('tags' in data['message'])
|
||||
self.assertFalse('preserve_recipients' in data['message'])
|
||||
self.assertFalse('view_content_link' in data['message'])
|
||||
self.assertFalse('tracking_domain' in data['message'])
|
||||
self.assertFalse('signing_domain' in data['message'])
|
||||
self.assertFalse('return_path_domain' in data['message'])
|
||||
self.assertFalse('subaccount' in data['message'])
|
||||
self.assertFalse('google_analytics_domains' in data['message'])
|
||||
self.assertFalse('google_analytics_campaign' in data['message'])
|
||||
self.assertFalse('metadata' in data['message'])
|
||||
self.assertFalse('merge_language' in data['message'])
|
||||
self.assertFalse('global_merge_vars' in data['message'])
|
||||
self.assertFalse('merge_vars' in data['message'])
|
||||
self.assertFalse('recipient_metadata' in data['message'])
|
||||
self.assertFalse('images' in data['message'])
|
||||
# Options at top level of api params (not in message dict):
|
||||
self.assertFalse('send_at' in data)
|
||||
self.assertFalse('async' in data)
|
||||
self.assertFalse('ip_pool' in data)
|
||||
|
||||
def test_send_attaches_mandrill_response(self):
|
||||
""" The mandrill_response should be attached to the message when it is sent """
|
||||
response = [{'email': 'to1@example.com', 'status': 'sent'}]
|
||||
self.mock_post.return_value = self.MockResponse(raw=six.b(json.dumps(response)))
|
||||
msg = mail.EmailMessage('Subject', 'Message', 'from@example.com', ['to1@example.com'],)
|
||||
sent = msg.send()
|
||||
self.assertEqual(sent, 1)
|
||||
self.assertEqual(msg.mandrill_response, response)
|
||||
|
||||
def test_send_failed_mandrill_response(self):
|
||||
""" If the send fails, mandrill_response should be set to None """
|
||||
self.mock_post.return_value = self.MockResponse(status_code=500)
|
||||
msg = mail.EmailMessage('Subject', 'Message', 'from@example.com', ['to1@example.com'],)
|
||||
sent = msg.send(fail_silently=True)
|
||||
self.assertEqual(sent, 0)
|
||||
self.assertIsNone(msg.mandrill_response)
|
||||
|
||||
def test_send_unparsable_mandrill_response(self):
|
||||
"""If the send succeeds, but a non-JSON API response, should raise an API exception"""
|
||||
self.mock_post.return_value = self.MockResponse(status_code=500, raw=b"this isn't json")
|
||||
msg = mail.EmailMessage('Subject', 'Message', 'from@example.com', ['to1@example.com'],)
|
||||
with self.assertRaises(MandrillAPIError):
|
||||
msg.send()
|
||||
self.assertIsNone(msg.mandrill_response)
|
||||
|
||||
def test_json_serialization_errors(self):
|
||||
"""Try to provide more information about non-json-serializable data"""
|
||||
self.message.global_merge_vars = {'PRICE': Decimal('19.99')}
|
||||
with self.assertRaises(NotSerializableForMandrillError) as cm:
|
||||
self.message.send()
|
||||
err = cm.exception
|
||||
self.assertTrue(isinstance(err, TypeError)) # Djrill 1.x re-raised TypeError from json.dumps
|
||||
self.assertStrContains(str(err), "Don't know how to send this data to Mandrill") # our added context
|
||||
self.assertStrContains(str(err), "Decimal('19.99') is not JSON serializable") # original message
|
||||
|
||||
def test_dates_not_serialized(self):
|
||||
"""Pre-2.0 Djrill accidentally serialized dates to ISO"""
|
||||
self.message.global_merge_vars = {'SHIP_DATE': date(2015, 12, 2)}
|
||||
with self.assertRaises(NotSerializableForMandrillError):
|
||||
self.message.send()
|
||||
|
||||
|
||||
class DjrillRecipientsRefusedTests(DjrillBackendMockAPITestCase):
|
||||
"""Djrill raises MandrillRecipientsRefused when *all* recipients are rejected or invalid"""
|
||||
|
||||
def test_recipients_refused(self):
|
||||
msg = mail.EmailMessage('Subject', 'Body', 'from@example.com',
|
||||
['invalid@localhost', 'reject@test.mandrillapp.com'])
|
||||
self.mock_post.return_value = self.MockResponse(status_code=200, raw=b"""
|
||||
[{ "email": "invalid@localhost", "status": "invalid" },
|
||||
{ "email": "reject@test.mandrillapp.com", "status": "rejected" }]""")
|
||||
with self.assertRaises(MandrillRecipientsRefused):
|
||||
msg.send()
|
||||
|
||||
def test_fail_silently(self):
|
||||
self.mock_post.return_value = self.MockResponse(status_code=200, raw=b"""
|
||||
[{ "email": "invalid@localhost", "status": "invalid" },
|
||||
{ "email": "reject@test.mandrillapp.com", "status": "rejected" }]""")
|
||||
sent = mail.send_mail('Subject', 'Body', 'from@example.com',
|
||||
['invalid@localhost', 'reject@test.mandrillapp.com'],
|
||||
fail_silently=True)
|
||||
self.assertEqual(sent, 0)
|
||||
|
||||
def test_mixed_response(self):
|
||||
"""If *any* recipients are valid or queued, no exception is raised"""
|
||||
msg = mail.EmailMessage('Subject', 'Body', 'from@example.com',
|
||||
['invalid@localhost', 'valid@example.com',
|
||||
'reject@test.mandrillapp.com', 'also.valid@example.com'])
|
||||
self.mock_post.return_value = self.MockResponse(status_code=200, raw=b"""
|
||||
[{ "email": "invalid@localhost", "status": "invalid" },
|
||||
{ "email": "valid@example.com", "status": "sent" },
|
||||
{ "email": "reject@test.mandrillapp.com", "status": "rejected" },
|
||||
{ "email": "also.valid@example.com", "status": "queued" }]""")
|
||||
sent = msg.send()
|
||||
self.assertEqual(sent, 1) # one message sent, successfully, to 2 of 4 recipients
|
||||
|
||||
@override_settings(MANDRILL_IGNORE_RECIPIENT_STATUS=True)
|
||||
def test_settings_override(self):
|
||||
"""Setting restores Djrill 1.x behavior"""
|
||||
self.mock_post.return_value = self.MockResponse(status_code=200, raw=b"""
|
||||
[{ "email": "invalid@localhost", "status": "invalid" },
|
||||
{ "email": "reject@test.mandrillapp.com", "status": "rejected" }]""")
|
||||
sent = mail.send_mail('Subject', 'Body', 'from@example.com',
|
||||
['invalid@localhost', 'reject@test.mandrillapp.com'])
|
||||
self.assertEqual(sent, 1) # refused message is included in sent count
|
||||
|
||||
|
||||
@override_settings(MANDRILL_SETTINGS={
|
||||
'from_name': 'Djrill Test',
|
||||
'important': True,
|
||||
'track_opens': True,
|
||||
'track_clicks': True,
|
||||
'auto_text': True,
|
||||
'auto_html': True,
|
||||
'inline_css': True,
|
||||
'url_strip_qs': True,
|
||||
'tags': ['djrill'],
|
||||
'preserve_recipients': True,
|
||||
'view_content_link': True,
|
||||
'subaccount': 'example-subaccount',
|
||||
'tracking_domain': 'example.com',
|
||||
'signing_domain': 'example.com',
|
||||
'return_path_domain': 'example.com',
|
||||
'google_analytics_domains': ['example.com/test'],
|
||||
'google_analytics_campaign': ['UA-00000000-1'],
|
||||
'metadata': ['djrill'],
|
||||
'merge_language': 'mailchimp',
|
||||
'global_merge_vars': {'TEST': 'djrill'},
|
||||
'async': True,
|
||||
'ip_pool': 'Pool1',
|
||||
'invalid': 'invalid',
|
||||
})
|
||||
class DjrillMandrillGlobalFeatureTests(DjrillBackendMockAPITestCase):
|
||||
"""Test Djrill backend support for global ovveride Mandrill-specific features"""
|
||||
|
||||
def setUp(self):
|
||||
super(DjrillMandrillGlobalFeatureTests, self).setUp()
|
||||
self.message = mail.EmailMessage('Subject', 'Text Body',
|
||||
'from@example.com', ['to@example.com'])
|
||||
|
||||
def test_global_options(self):
|
||||
"""Test that any global settings get passed through
|
||||
"""
|
||||
self.message.send()
|
||||
self.assert_mandrill_called("/messages/send.json")
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['message']['from_name'], 'Djrill Test')
|
||||
self.assertTrue(data['message']['important'])
|
||||
self.assertTrue(data['message']['track_opens'])
|
||||
self.assertTrue(data['message']['track_clicks'])
|
||||
self.assertTrue(data['message']['auto_text'])
|
||||
self.assertTrue(data['message']['auto_html'])
|
||||
self.assertTrue(data['message']['inline_css'])
|
||||
self.assertTrue(data['message']['url_strip_qs'])
|
||||
self.assertEqual(data['message']['tags'], ['djrill'])
|
||||
self.assertTrue(data['message']['preserve_recipients'])
|
||||
self.assertTrue(data['message']['view_content_link'])
|
||||
self.assertEqual(data['message']['subaccount'], 'example-subaccount')
|
||||
self.assertEqual(data['message']['tracking_domain'], 'example.com')
|
||||
self.assertEqual(data['message']['signing_domain'], 'example.com')
|
||||
self.assertEqual(data['message']['return_path_domain'], 'example.com')
|
||||
self.assertEqual(data['message']['google_analytics_domains'], ['example.com/test'])
|
||||
self.assertEqual(data['message']['google_analytics_campaign'], ['UA-00000000-1'])
|
||||
self.assertEqual(data['message']['metadata'], ['djrill'])
|
||||
self.assertEqual(data['message']['merge_language'], 'mailchimp')
|
||||
self.assertEqual(data['message']['global_merge_vars'],
|
||||
[{'name': 'TEST', 'content': 'djrill'}])
|
||||
self.assertFalse('merge_vars' in data['message'])
|
||||
self.assertFalse('recipient_metadata' in data['message'])
|
||||
# Options at top level of api params (not in message dict):
|
||||
self.assertTrue(data['async'])
|
||||
self.assertEqual(data['ip_pool'], 'Pool1')
|
||||
# Option that shouldn't be added
|
||||
self.assertFalse('invalid' in data['message'])
|
||||
|
||||
def test_global_options_override(self):
|
||||
"""Test that manually settings options overrides global settings
|
||||
"""
|
||||
self.message.from_name = "override"
|
||||
self.message.important = False
|
||||
self.message.track_opens = False
|
||||
self.message.track_clicks = False
|
||||
self.message.auto_text = False
|
||||
self.message.auto_html = False
|
||||
self.message.inline_css = False
|
||||
self.message.url_strip_qs = False
|
||||
self.message.tags = ['override']
|
||||
self.message.preserve_recipients = False
|
||||
self.message.view_content_link = False
|
||||
self.message.subaccount = "override"
|
||||
self.message.tracking_domain = "override.example.com"
|
||||
self.message.signing_domain = "override.example.com"
|
||||
self.message.return_path_domain = "override.example.com"
|
||||
self.message.google_analytics_domains = ['override.example.com']
|
||||
self.message.google_analytics_campaign = ['UA-99999999-1']
|
||||
self.message.metadata = ['override']
|
||||
self.message.merge_language = 'handlebars'
|
||||
self.message.async = False
|
||||
self.message.ip_pool = "Bulk Pool"
|
||||
self.message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['message']['from_name'], 'override')
|
||||
self.assertFalse(data['message']['important'])
|
||||
self.assertFalse(data['message']['track_opens'])
|
||||
self.assertFalse(data['message']['track_clicks'])
|
||||
self.assertFalse(data['message']['auto_text'])
|
||||
self.assertFalse(data['message']['auto_html'])
|
||||
self.assertFalse(data['message']['inline_css'])
|
||||
self.assertFalse(data['message']['url_strip_qs'])
|
||||
self.assertEqual(data['message']['tags'], ['override'])
|
||||
self.assertFalse(data['message']['preserve_recipients'])
|
||||
self.assertFalse(data['message']['view_content_link'])
|
||||
self.assertEqual(data['message']['subaccount'], 'override')
|
||||
self.assertEqual(data['message']['tracking_domain'], 'override.example.com')
|
||||
self.assertEqual(data['message']['signing_domain'], 'override.example.com')
|
||||
self.assertEqual(data['message']['return_path_domain'], 'override.example.com')
|
||||
self.assertEqual(data['message']['google_analytics_domains'], ['override.example.com'])
|
||||
self.assertEqual(data['message']['google_analytics_campaign'], ['UA-99999999-1'])
|
||||
self.assertEqual(data['message']['metadata'], ['override'])
|
||||
self.assertEqual(data['message']['merge_language'], 'handlebars')
|
||||
self.assertEqual(data['message']['global_merge_vars'],
|
||||
[{'name': 'TEST', 'content': 'djrill'}])
|
||||
# Options at top level of api params (not in message dict):
|
||||
self.assertFalse(data['async'])
|
||||
self.assertEqual(data['ip_pool'], 'Bulk Pool')
|
||||
|
||||
def test_global_merge(self):
|
||||
# Test that global settings merge in
|
||||
self.message.global_merge_vars = {'GREETING': "Hello"}
|
||||
self.message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['message']['global_merge_vars'],
|
||||
[{'name': "GREETING", 'content': "Hello"},
|
||||
{'name': 'TEST', 'content': 'djrill'}])
|
||||
|
||||
def test_global_merge_overwrite(self):
|
||||
# Test that global merge settings are overwritten
|
||||
self.message.global_merge_vars = {'TEST': "Hello"}
|
||||
self.message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['message']['global_merge_vars'],
|
||||
[{'name': 'TEST', 'content': 'Hello'}])
|
||||
|
||||
|
||||
@override_settings(EMAIL_BACKEND="anymail.backends.mandrill.MandrillBackend")
|
||||
class DjrillImproperlyConfiguredTests(TestCase):
|
||||
"""Test Djrill backend without Djrill-specific settings in place"""
|
||||
|
||||
def test_missing_api_key(self):
|
||||
with self.assertRaises(ImproperlyConfigured):
|
||||
mail.send_mail('Subject', 'Message', 'from@example.com',
|
||||
['to@example.com'])
|
||||
84
anymail/tests/test_mandrill_send_template.py
Normal file
84
anymail/tests/test_mandrill_send_template.py
Normal file
@@ -0,0 +1,84 @@
|
||||
from django.core import mail
|
||||
|
||||
from anymail import MandrillAPIError
|
||||
|
||||
from .mock_backend import DjrillBackendMockAPITestCase
|
||||
|
||||
|
||||
class DjrillMandrillSendTemplateTests(DjrillBackendMockAPITestCase):
|
||||
"""Test Djrill backend support for Mandrill send-template features"""
|
||||
|
||||
def test_send_template(self):
|
||||
msg = mail.EmailMessage('Subject', 'Text Body',
|
||||
'from@example.com', ['to@example.com'])
|
||||
msg.template_name = "PERSONALIZED_SPECIALS"
|
||||
msg.template_content = {
|
||||
'HEADLINE': "<h1>Specials Just For *|FNAME|*</h1>",
|
||||
'OFFER_BLOCK': "<p><em>Half off</em> all fruit</p>"
|
||||
}
|
||||
msg.send()
|
||||
self.assert_mandrill_called("/messages/send-template.json")
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['template_name'], "PERSONALIZED_SPECIALS")
|
||||
# Djrill expands simple python dicts into the more-verbose name/content
|
||||
# structures the Mandrill API uses
|
||||
self.assertEqual(data['template_content'],
|
||||
[ {'name': "HEADLINE",
|
||||
'content': "<h1>Specials Just For *|FNAME|*</h1>"},
|
||||
{'name': "OFFER_BLOCK",
|
||||
'content': "<p><em>Half off</em> all fruit</p>"} ]
|
||||
)
|
||||
|
||||
def test_send_template_without_from_field(self):
|
||||
msg = mail.EmailMessage('Subject', 'Text Body',
|
||||
'from@example.com', ['to@example.com'])
|
||||
msg.template_name = "PERSONALIZED_SPECIALS"
|
||||
msg.use_template_from = True
|
||||
msg.send()
|
||||
self.assert_mandrill_called("/messages/send-template.json")
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['template_name'], "PERSONALIZED_SPECIALS")
|
||||
self.assertFalse('from_email' in data['message'])
|
||||
self.assertFalse('from_name' in data['message'])
|
||||
|
||||
def test_send_template_without_from_field_api_failure(self):
|
||||
self.mock_post.return_value = self.MockResponse(status_code=400)
|
||||
msg = mail.EmailMessage('Subject', 'Text Body',
|
||||
'from@example.com', ['to@example.com'])
|
||||
msg.template_name = "PERSONALIZED_SPECIALS"
|
||||
msg.use_template_from = True
|
||||
with self.assertRaises(MandrillAPIError):
|
||||
msg.send()
|
||||
|
||||
def test_send_template_without_subject_field(self):
|
||||
msg = mail.EmailMessage('Subject', 'Text Body',
|
||||
'from@example.com', ['to@example.com'])
|
||||
msg.template_name = "PERSONALIZED_SPECIALS"
|
||||
msg.use_template_subject = True
|
||||
msg.send()
|
||||
self.assert_mandrill_called("/messages/send-template.json")
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['template_name'], "PERSONALIZED_SPECIALS")
|
||||
self.assertFalse('subject' in data['message'])
|
||||
|
||||
def test_no_template_content(self):
|
||||
# Just a template, without any template_content to be merged
|
||||
msg = mail.EmailMessage('Subject', 'Text Body',
|
||||
'from@example.com', ['to@example.com'])
|
||||
msg.template_name = "WELCOME_MESSAGE"
|
||||
msg.send()
|
||||
self.assert_mandrill_called("/messages/send-template.json")
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['template_name'], "WELCOME_MESSAGE")
|
||||
self.assertEqual(data['template_content'], []) # Mandrill requires this field
|
||||
|
||||
def test_non_template_send(self):
|
||||
# Make sure the non-template case still uses /messages/send.json
|
||||
msg = mail.EmailMessage('Subject', 'Text Body',
|
||||
'from@example.com', ['to@example.com'])
|
||||
msg.send()
|
||||
self.assert_mandrill_called("/messages/send.json")
|
||||
data = self.get_api_call_data()
|
||||
self.assertFalse('template_name' in data)
|
||||
self.assertFalse('template_content' in data)
|
||||
self.assertFalse('async' in data)
|
||||
72
anymail/tests/test_mandrill_session_sharing.py
Normal file
72
anymail/tests/test_mandrill_session_sharing.py
Normal file
@@ -0,0 +1,72 @@
|
||||
from decimal import Decimal
|
||||
from mock import patch
|
||||
|
||||
from django.core import mail
|
||||
|
||||
from .mock_backend import DjrillBackendMockAPITestCase
|
||||
|
||||
|
||||
class DjrillSessionSharingTests(DjrillBackendMockAPITestCase):
|
||||
"""Test Djrill backend sharing of single Mandrill API connection"""
|
||||
|
||||
@patch('requests.Session.close', autospec=True)
|
||||
def test_connection_sharing(self, mock_close):
|
||||
"""Djrill reuses one requests session when sending multiple messages"""
|
||||
datatuple = (
|
||||
('Subject 1', 'Body 1', 'from@example.com', ['to@example.com']),
|
||||
('Subject 2', 'Body 2', 'from@example.com', ['to@example.com']),
|
||||
)
|
||||
mail.send_mass_mail(datatuple)
|
||||
self.assertEqual(self.mock_post.call_count, 2)
|
||||
session1 = self.mock_post.call_args_list[0][0] # arg[0] (self) is session
|
||||
session2 = self.mock_post.call_args_list[1][0]
|
||||
self.assertEqual(session1, session2)
|
||||
self.assertEqual(mock_close.call_count, 1)
|
||||
|
||||
@patch('requests.Session.close', autospec=True)
|
||||
def test_caller_managed_connections(self, mock_close):
|
||||
"""Calling code can created long-lived connection that it opens and closes"""
|
||||
connection = mail.get_connection()
|
||||
connection.open()
|
||||
mail.send_mail('Subject 1', 'body', 'from@example.com', ['to@example.com'], connection=connection)
|
||||
session1 = self.mock_post.call_args[0]
|
||||
self.assertEqual(mock_close.call_count, 0) # shouldn't be closed yet
|
||||
|
||||
mail.send_mail('Subject 2', 'body', 'from@example.com', ['to@example.com'], connection=connection)
|
||||
self.assertEqual(mock_close.call_count, 0) # still shouldn't be closed
|
||||
session2 = self.mock_post.call_args[0]
|
||||
self.assertEqual(session1, session2) # should have reused same session
|
||||
|
||||
connection.close()
|
||||
self.assertEqual(mock_close.call_count, 1)
|
||||
|
||||
def test_session_closed_after_exception(self):
|
||||
# fail loud case:
|
||||
msg = mail.EmailMessage('Subject', 'Message', 'from@example.com', ['to1@example.com'],)
|
||||
msg.global_merge_vars = {'PRICE': Decimal('19.99')} # will cause JSON serialization error
|
||||
with patch('requests.Session.close', autospec=True) as mock_close:
|
||||
with self.assertRaises(TypeError):
|
||||
msg.send()
|
||||
self.assertEqual(mock_close.call_count, 1)
|
||||
|
||||
# fail silently case (EmailMessage caches backend on send, so must create new one):
|
||||
msg = mail.EmailMessage('Subject', 'Message', 'from@example.com', ['to1@example.com'],)
|
||||
msg.global_merge_vars = {'PRICE': Decimal('19.99')}
|
||||
with patch('requests.Session.close', autospec=True) as mock_close:
|
||||
sent = msg.send(fail_silently=True)
|
||||
self.assertEqual(sent, 0)
|
||||
self.assertEqual(mock_close.call_count, 1)
|
||||
|
||||
# caller-supplied connection case:
|
||||
with patch('requests.Session.close', autospec=True) as mock_close:
|
||||
connection = mail.get_connection()
|
||||
connection.open()
|
||||
msg = mail.EmailMessage('Subject', 'Message', 'from@example.com', ['to1@example.com'],
|
||||
connection=connection)
|
||||
msg.global_merge_vars = {'PRICE': Decimal('19.99')}
|
||||
with self.assertRaises(TypeError):
|
||||
msg.send()
|
||||
self.assertEqual(mock_close.call_count, 0) # wait for us to close it
|
||||
|
||||
connection.close()
|
||||
self.assertEqual(mock_close.call_count, 1)
|
||||
42
anymail/tests/test_mandrill_subaccounts.py
Normal file
42
anymail/tests/test_mandrill_subaccounts.py
Normal file
@@ -0,0 +1,42 @@
|
||||
from django.core import mail
|
||||
from django.test.utils import override_settings
|
||||
|
||||
from .mock_backend import DjrillBackendMockAPITestCase
|
||||
|
||||
|
||||
class DjrillMandrillSubaccountTests(DjrillBackendMockAPITestCase):
|
||||
"""Test Djrill backend support for Mandrill subaccounts"""
|
||||
|
||||
def test_no_subaccount_by_default(self):
|
||||
mail.send_mail('Subject', 'Body', 'from@example.com', ['to@example.com'])
|
||||
data = self.get_api_call_data()
|
||||
self.assertFalse('subaccount' in data['message'])
|
||||
|
||||
@override_settings(MANDRILL_SETTINGS={'subaccount': 'test_subaccount'})
|
||||
def test_subaccount_setting(self):
|
||||
mail.send_mail('Subject', 'Body', 'from@example.com', ['to@example.com'])
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['message']['subaccount'], "test_subaccount")
|
||||
|
||||
@override_settings(MANDRILL_SETTINGS={'subaccount': 'global_setting_subaccount'})
|
||||
def test_subaccount_message_overrides_setting(self):
|
||||
message = mail.EmailMessage('Subject', 'Body', 'from@example.com', ['to@example.com'])
|
||||
message.subaccount = "individual_message_subaccount" # should override global setting
|
||||
message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['message']['subaccount'], "individual_message_subaccount")
|
||||
|
||||
# Djrill 1.x offered dedicated MANDRILL_SUBACCOUNT setting.
|
||||
# In Djrill 2.x, you should use the MANDRILL_SETTINGS dict as in the earlier tests.
|
||||
# But we still support the old setting for compatibility:
|
||||
@override_settings(MANDRILL_SUBACCOUNT="legacy_setting_subaccount")
|
||||
def test_subaccount_legacy_setting(self):
|
||||
mail.send_mail('Subject', 'Body', 'from@example.com', ['to@example.com'])
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['message']['subaccount'], "legacy_setting_subaccount")
|
||||
|
||||
message = mail.EmailMessage('Subject', 'Body', 'from@example.com', ['to@example.com'])
|
||||
message.subaccount = "individual_message_subaccount" # should override legacy setting
|
||||
message.send()
|
||||
data = self.get_api_call_data()
|
||||
self.assertEqual(data['message']['subaccount'], "individual_message_subaccount")
|
||||
128
anymail/tests/test_mandrill_webhook.py
Normal file
128
anymail/tests/test_mandrill_webhook.py
Normal file
@@ -0,0 +1,128 @@
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
from base64 import b64encode
|
||||
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
from django.test import TestCase
|
||||
from django.test.utils import override_settings
|
||||
|
||||
from anymail.compat import b
|
||||
from anymail.signals import webhook_event
|
||||
|
||||
|
||||
class DjrillWebhookSecretMixinTests(TestCase):
|
||||
"""
|
||||
Test mixin used in optional Mandrill webhook support
|
||||
"""
|
||||
|
||||
def test_missing_secret(self):
|
||||
with self.assertRaises(ImproperlyConfigured):
|
||||
self.client.get('/webhook/')
|
||||
|
||||
@override_settings(DJRILL_WEBHOOK_SECRET='abc123')
|
||||
def test_incorrect_secret(self):
|
||||
response = self.client.head('/webhook/?secret=wrong')
|
||||
self.assertEqual(response.status_code, 403)
|
||||
|
||||
@override_settings(DJRILL_WEBHOOK_SECRET='abc123')
|
||||
def test_default_secret_name(self):
|
||||
response = self.client.head('/webhook/?secret=abc123')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@override_settings(DJRILL_WEBHOOK_SECRET='abc123', DJRILL_WEBHOOK_SECRET_NAME='verysecret')
|
||||
def test_custom_secret_name(self):
|
||||
response = self.client.head('/webhook/?verysecret=abc123')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
|
||||
@override_settings(DJRILL_WEBHOOK_SECRET='abc123',
|
||||
DJRILL_WEBHOOK_SIGNATURE_KEY="signature")
|
||||
class DjrillWebhookSignatureMixinTests(TestCase):
|
||||
"""
|
||||
Test mixin used in optional Mandrill webhook signature support
|
||||
"""
|
||||
|
||||
def test_incorrect_settings(self):
|
||||
with self.assertRaises(ImproperlyConfigured):
|
||||
self.client.post('/webhook/?secret=abc123')
|
||||
|
||||
@override_settings(DJRILL_WEBHOOK_URL="/webhook/?secret=abc123",
|
||||
DJRILL_WEBHOOK_SIGNATURE_KEY = "anothersignature")
|
||||
def test_unauthorized(self):
|
||||
response = self.client.post(settings.DJRILL_WEBHOOK_URL)
|
||||
self.assertEqual(response.status_code, 403)
|
||||
|
||||
@override_settings(DJRILL_WEBHOOK_URL="/webhook/?secret=abc123")
|
||||
def test_signature(self):
|
||||
signature = hmac.new(key=b(settings.DJRILL_WEBHOOK_SIGNATURE_KEY),
|
||||
msg=b(settings.DJRILL_WEBHOOK_URL+"mandrill_events[]"),
|
||||
digestmod=hashlib.sha1)
|
||||
hash_string = b64encode(signature.digest())
|
||||
response = self.client.post('/webhook/?secret=abc123', data={"mandrill_events":"[]"},
|
||||
**{"HTTP_X_MANDRILL_SIGNATURE": hash_string})
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
|
||||
@override_settings(DJRILL_WEBHOOK_SECRET='abc123')
|
||||
class DjrillWebhookViewTests(TestCase):
|
||||
"""
|
||||
Test optional Mandrill webhook view
|
||||
"""
|
||||
|
||||
def test_head_request(self):
|
||||
response = self.client.head('/webhook/?secret=abc123')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_post_request_invalid_json(self):
|
||||
response = self.client.post('/webhook/?secret=abc123')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
def test_post_request_valid_json(self):
|
||||
response = self.client.post('/webhook/?secret=abc123', {
|
||||
'mandrill_events': json.dumps([{"event": "send", "msg": {}}])
|
||||
})
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_webhook_send_signal(self):
|
||||
self.signal_received_count = 0
|
||||
test_event = {"event": "send", "msg": {}}
|
||||
|
||||
def my_callback(sender, event_type, data, **kwargs):
|
||||
self.signal_received_count += 1
|
||||
self.assertEqual(event_type, 'send')
|
||||
self.assertEqual(data, test_event)
|
||||
|
||||
try:
|
||||
webhook_event.connect(my_callback, weak=False) # local callback func, so don't use weak ref
|
||||
response = self.client.post('/webhook/?secret=abc123', {
|
||||
'mandrill_events': json.dumps([test_event])
|
||||
})
|
||||
finally:
|
||||
webhook_event.disconnect(my_callback)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(self.signal_received_count, 1)
|
||||
|
||||
def test_webhook_sync_event(self):
|
||||
# Mandrill sync events use a different format from other events
|
||||
# https://mandrill.zendesk.com/hc/en-us/articles/205583297-Sync-Event-Webhook-format
|
||||
self.signal_received_count = 0
|
||||
test_event = {"type": "whitelist", "action": "add"}
|
||||
|
||||
def my_callback(sender, event_type, data, **kwargs):
|
||||
self.signal_received_count += 1
|
||||
self.assertEqual(event_type, 'whitelist_add') # synthesized event_type
|
||||
self.assertEqual(data, test_event)
|
||||
|
||||
try:
|
||||
webhook_event.connect(my_callback, weak=False) # local callback func, so don't use weak ref
|
||||
response = self.client.post('/webhook/?secret=abc123', {
|
||||
'mandrill_events': json.dumps([test_event])
|
||||
})
|
||||
finally:
|
||||
webhook_event.disconnect(my_callback)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(self.signal_received_count, 1)
|
||||
11
anymail/urls.py
Normal file
11
anymail/urls.py
Normal file
@@ -0,0 +1,11 @@
|
||||
try:
|
||||
from django.conf.urls import url
|
||||
except ImportError:
|
||||
from django.conf.urls.defaults import url
|
||||
|
||||
from .views import DjrillWebhookView
|
||||
|
||||
|
||||
urlpatterns = [
|
||||
url(r'^webhook/$', DjrillWebhookView.as_view(), name='djrill_webhook'),
|
||||
]
|
||||
99
anymail/views.py
Normal file
99
anymail/views.py
Normal file
@@ -0,0 +1,99 @@
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
from base64 import b64encode
|
||||
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
from django.http import HttpResponse
|
||||
from django.utils.decorators import method_decorator
|
||||
from django.views.decorators.csrf import csrf_exempt
|
||||
from django.views.generic import View
|
||||
|
||||
from .compat import b
|
||||
from .signals import webhook_event
|
||||
|
||||
|
||||
class DjrillWebhookSecretMixin(object):
|
||||
|
||||
@method_decorator(csrf_exempt)
|
||||
def dispatch(self, request, *args, **kwargs):
|
||||
secret = getattr(settings, 'DJRILL_WEBHOOK_SECRET', None)
|
||||
secret_name = getattr(settings, 'DJRILL_WEBHOOK_SECRET_NAME', 'secret')
|
||||
|
||||
if secret is None:
|
||||
raise ImproperlyConfigured(
|
||||
"You have not set DJRILL_WEBHOOK_SECRET in the settings file.")
|
||||
|
||||
if request.GET.get(secret_name) != secret:
|
||||
return HttpResponse(status=403)
|
||||
|
||||
return super(DjrillWebhookSecretMixin, self).dispatch(
|
||||
request, *args, **kwargs)
|
||||
|
||||
|
||||
class DjrillWebhookSignatureMixin(object):
|
||||
|
||||
@method_decorator(csrf_exempt)
|
||||
def dispatch(self, request, *args, **kwargs):
|
||||
|
||||
signature_key = getattr(settings, 'DJRILL_WEBHOOK_SIGNATURE_KEY', None)
|
||||
|
||||
if signature_key and request.method == "POST":
|
||||
|
||||
# Make webhook url an explicit setting to make sure that this is the exact same string
|
||||
# that the user entered in Mandrill
|
||||
post_string = getattr(settings, "DJRILL_WEBHOOK_URL", None)
|
||||
if post_string is None:
|
||||
raise ImproperlyConfigured(
|
||||
"You have set DJRILL_WEBHOOK_SIGNATURE_KEY, but haven't set DJRILL_WEBHOOK_URL in the settings file.")
|
||||
|
||||
signature = request.META.get("HTTP_X_MANDRILL_SIGNATURE", None)
|
||||
if not signature:
|
||||
return HttpResponse(status=403, content="X-Mandrill-Signature not set")
|
||||
|
||||
# The querydict is a bit special, see https://docs.djangoproject.com/en/dev/ref/request-response/#querydict-objects
|
||||
# Mandrill needs it to be sorted and added to the hash
|
||||
post_lists = sorted(request.POST.lists())
|
||||
for value_list in post_lists:
|
||||
for item in value_list[1]:
|
||||
post_string += "%s%s" % (value_list[0], item)
|
||||
|
||||
hash_string = b64encode(hmac.new(key=b(signature_key), msg=b(post_string), digestmod=hashlib.sha1).digest())
|
||||
if signature != hash_string:
|
||||
return HttpResponse(status=403, content="Signature doesn't match")
|
||||
|
||||
return super(DjrillWebhookSignatureMixin, self).dispatch(
|
||||
request, *args, **kwargs)
|
||||
|
||||
|
||||
class DjrillWebhookView(DjrillWebhookSecretMixin, DjrillWebhookSignatureMixin, View):
|
||||
def head(self, request, *args, **kwargs):
|
||||
return HttpResponse()
|
||||
|
||||
def post(self, request, *args, **kwargs):
|
||||
try:
|
||||
data = json.loads(request.POST.get('mandrill_events'))
|
||||
except TypeError:
|
||||
return HttpResponse(status=400)
|
||||
|
||||
for event in data:
|
||||
webhook_event.send(
|
||||
sender=None, event_type=self.get_event_type(event), data=event)
|
||||
|
||||
return HttpResponse()
|
||||
|
||||
def get_event_type(self, event):
|
||||
try:
|
||||
# Message event: https://mandrill.zendesk.com/hc/en-us/articles/205583307
|
||||
# Inbound event: https://mandrill.zendesk.com/hc/en-us/articles/205583207
|
||||
event_type = event['event']
|
||||
except KeyError:
|
||||
try:
|
||||
# Sync event: https://mandrill.zendesk.com/hc/en-us/articles/205583297
|
||||
# Synthesize an event_type like "whitelist_add" or "blacklist_change"
|
||||
event_type = "%s_%s" % (event['type'], event['action'])
|
||||
except KeyError:
|
||||
# Unknown future event format
|
||||
event_type = None
|
||||
return event_type
|
||||
Reference in New Issue
Block a user