mirror of
https://github.com/pacnpal/django-anymail.git
synced 2025-12-20 03:41:05 -05:00
Add Postmark support
This commit is contained in:
@@ -71,9 +71,18 @@ class AnymailRequestsBackend(AnymailBaseBackend):
|
||||
"""
|
||||
params = payload.get_request_params(self.api_url)
|
||||
response = self.session.request(**params)
|
||||
self.raise_for_status(response, payload, message)
|
||||
return response
|
||||
|
||||
def raise_for_status(self, response, payload, message):
|
||||
"""Raise AnymailRequestsAPIError if response is an HTTP error
|
||||
|
||||
Subclasses can override for custom error checking
|
||||
(though should defer parsing/deserialization of the body to
|
||||
parse_recipient_status)
|
||||
"""
|
||||
if response.status_code != 200:
|
||||
raise AnymailRequestsAPIError(email_message=message, payload=payload, response=response)
|
||||
return response
|
||||
|
||||
def deserialize_json_response(self, response, payload, message):
|
||||
"""Deserialize an ESP API response that's in json.
|
||||
|
||||
189
anymail/backends/postmark.py
Normal file
189
anymail/backends/postmark.py
Normal file
@@ -0,0 +1,189 @@
|
||||
import re
|
||||
|
||||
from ..exceptions import AnymailRequestsAPIError
|
||||
from ..message import AnymailRecipientStatus
|
||||
from ..utils import get_anymail_setting
|
||||
|
||||
from .base_requests import AnymailRequestsBackend, RequestsPayload
|
||||
|
||||
|
||||
class PostmarkBackend(AnymailRequestsBackend):
|
||||
"""
|
||||
Postmark API Email Backend
|
||||
"""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
"""Init options from Django settings"""
|
||||
self.server_token = get_anymail_setting('POSTMARK_SERVER_TOKEN', allow_bare=True)
|
||||
api_url = get_anymail_setting("POSTMARK_API_URL", "https://api.postmarkapp.com/")
|
||||
if not api_url.endswith("/"):
|
||||
api_url += "/"
|
||||
super(PostmarkBackend, self).__init__(api_url, **kwargs)
|
||||
|
||||
def build_message_payload(self, message, defaults):
|
||||
return PostmarkPayload(message, defaults, self)
|
||||
|
||||
def raise_for_status(self, response, payload, message):
|
||||
# We need to handle 422 responses in parse_recipient_status
|
||||
if response.status_code != 422:
|
||||
super(PostmarkBackend, self).raise_for_status(response, payload, message)
|
||||
|
||||
def parse_recipient_status(self, response, payload, message):
|
||||
parsed_response = self.deserialize_json_response(response, payload, message)
|
||||
try:
|
||||
error_code = parsed_response["ErrorCode"]
|
||||
msg = parsed_response["Message"]
|
||||
except (KeyError, TypeError):
|
||||
raise AnymailRequestsAPIError("Invalid Postmark API response format",
|
||||
email_message=message, payload=payload, response=response)
|
||||
|
||||
message_id = parsed_response.get("MessageID", None)
|
||||
rejected_emails = []
|
||||
|
||||
if error_code == 300: # Invalid email request
|
||||
# Either the From address or at least one recipient was invalid. Email not sent.
|
||||
if "'From' address" in msg:
|
||||
# Normal error
|
||||
raise AnymailRequestsAPIError(email_message=message, payload=payload, response=response)
|
||||
else:
|
||||
# Use AnymailRecipientsRefused logic
|
||||
default_status = 'invalid'
|
||||
elif error_code == 406: # Inactive recipient
|
||||
# All recipients were rejected as hard-bounce or spam-complaint. Email not sent.
|
||||
default_status = 'rejected'
|
||||
elif error_code == 0:
|
||||
# At least partial success, and email was sent.
|
||||
# Sadly, have to parse human-readable message to figure out if everyone got it.
|
||||
default_status = 'sent'
|
||||
rejected_emails = self.parse_inactive_recipients(msg)
|
||||
else:
|
||||
raise AnymailRequestsAPIError(email_message=message, payload=payload, response=response)
|
||||
|
||||
return {
|
||||
recipient.email: AnymailRecipientStatus(
|
||||
message_id=message_id,
|
||||
status=('rejected' if recipient.email.lower() in rejected_emails
|
||||
else default_status)
|
||||
)
|
||||
for recipient in payload.all_recipients
|
||||
}
|
||||
|
||||
def parse_inactive_recipients(self, msg):
|
||||
"""Return a list of 'inactive' email addresses from a Postmark "OK" response
|
||||
|
||||
:param str msg: the "Message" from the Postmark API response
|
||||
"""
|
||||
# Example msg with inactive recipients:
|
||||
# "Message OK, but will not deliver to these inactive addresses: one@xample.com, two@example.com."
|
||||
# " Inactive recipients are ones that have generated a hard bounce or a spam complaint."
|
||||
# Example msg with everything OK: "OK"
|
||||
match = re.search(r'inactive addresses:\s*(.*)\.\s*Inactive recipients', msg)
|
||||
if match:
|
||||
emails = match.group(1) # "one@xample.com, two@example.com"
|
||||
return [email.strip().lower() for email in emails.split(',')]
|
||||
else:
|
||||
return []
|
||||
|
||||
|
||||
class PostmarkPayload(RequestsPayload):
|
||||
|
||||
def __init__(self, message, defaults, backend, *args, **kwargs):
|
||||
headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'Accept': 'application/json',
|
||||
# 'X-Postmark-Server-Token': see get_request_params (and set_esp_extra)
|
||||
}
|
||||
self.server_token = backend.server_token # added to headers later, so esp_extra can override
|
||||
self.all_recipients = [] # used for backend.parse_recipient_status
|
||||
super(PostmarkPayload, self).__init__(message, defaults, backend, headers=headers, *args, **kwargs)
|
||||
|
||||
def get_api_endpoint(self):
|
||||
return "email"
|
||||
|
||||
def get_request_params(self, api_url):
|
||||
params = super(PostmarkPayload, self).get_request_params(api_url)
|
||||
params['headers']['X-Postmark-Server-Token'] = self.server_token
|
||||
return params
|
||||
|
||||
def serialize_data(self):
|
||||
return self.serialize_json(self.data)
|
||||
|
||||
#
|
||||
# Payload construction
|
||||
#
|
||||
|
||||
def init_payload(self):
|
||||
self.data = {} # becomes json
|
||||
|
||||
def set_from_email(self, email):
|
||||
self.data["From"] = email.address
|
||||
|
||||
def set_recipients(self, recipient_type, emails):
|
||||
assert recipient_type in ["to", "cc", "bcc"]
|
||||
if emails:
|
||||
field = recipient_type.capitalize()
|
||||
self.data[field] = ', '.join([email.address for email in emails])
|
||||
self.all_recipients += emails # used for backend.parse_recipient_status
|
||||
|
||||
def set_subject(self, subject):
|
||||
self.data["Subject"] = subject
|
||||
|
||||
def set_reply_to(self, emails):
|
||||
if emails:
|
||||
reply_to = ", ".join([email.address for email in emails])
|
||||
self.data["ReplyTo"] = reply_to
|
||||
|
||||
def set_extra_headers(self, headers):
|
||||
self.data["Headers"] = [
|
||||
{"Name": key, "Value": value}
|
||||
for key, value in headers.items()
|
||||
]
|
||||
|
||||
def set_text_body(self, body):
|
||||
self.data["TextBody"] = body
|
||||
|
||||
def set_html_body(self, body):
|
||||
if "HtmlBody" in self.data:
|
||||
# second html body could show up through multiple alternatives, or html body + alternative
|
||||
self.unsupported_feature("multiple html parts")
|
||||
self.data["HtmlBody"] = body
|
||||
|
||||
def make_attachment(self, attachment):
|
||||
"""Returns Postmark attachment dict for attachment"""
|
||||
att = {
|
||||
"Name": attachment.name or "",
|
||||
"Content": attachment.b64content,
|
||||
"ContentType": attachment.mimetype,
|
||||
}
|
||||
if attachment.inline:
|
||||
att["ContentID"] = "cid:%s" % attachment.cid
|
||||
return att
|
||||
|
||||
def set_attachments(self, attachments):
|
||||
if attachments:
|
||||
self.data["Attachments"] = [
|
||||
self.make_attachment(attachment) for attachment in attachments
|
||||
]
|
||||
|
||||
# Postmark doesn't support metadata
|
||||
# def set_metadata(self, metadata):
|
||||
|
||||
# Postmark doesn't support delayed sending
|
||||
# def set_send_at(self, send_at):
|
||||
|
||||
def set_tags(self, tags):
|
||||
if len(tags) > 0:
|
||||
self.data["Tag"] = tags[0]
|
||||
if len(tags) > 1:
|
||||
self.unsupported_feature('multiple tags (%r)' % tags)
|
||||
|
||||
# Postmark doesn't support click-tracking
|
||||
# def set_track_clicks(self, track_clicks):
|
||||
|
||||
def set_track_opens(self, track_opens):
|
||||
self.data["TrackOpens"] = track_opens
|
||||
|
||||
def set_esp_extra(self, extra):
|
||||
self.data.update(extra)
|
||||
# Special handling for 'server_token':
|
||||
self.server_token = self.data.pop('server_token', self.server_token)
|
||||
574
anymail/tests/test_postmark_backend.py
Normal file
574
anymail/tests/test_postmark_backend.py
Normal file
@@ -0,0 +1,574 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from base64 import b64encode
|
||||
from decimal import Decimal
|
||||
from email.mime.base import MIMEBase
|
||||
from email.mime.image import MIMEImage
|
||||
|
||||
from django.core import mail
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
from django.test import SimpleTestCase
|
||||
from django.test.utils import override_settings
|
||||
|
||||
from anymail.exceptions import (AnymailAPIError, AnymailSerializationError,
|
||||
AnymailUnsupportedFeature, AnymailRecipientsRefused)
|
||||
from anymail.message import attach_inline_image_file
|
||||
|
||||
from .mock_requests_backend import RequestsBackendMockAPITestCase
|
||||
from .utils import sample_image_content, sample_image_path, SAMPLE_IMAGE_FILENAME, AnymailTestMixin, decode_att
|
||||
|
||||
|
||||
@override_settings(EMAIL_BACKEND='anymail.backends.postmark.PostmarkBackend',
|
||||
ANYMAIL={'POSTMARK_SERVER_TOKEN': 'test_server_token'})
|
||||
class PostmarkBackendMockAPITestCase(RequestsBackendMockAPITestCase):
|
||||
DEFAULT_RAW_RESPONSE = b"""{
|
||||
"To": "to@example.com",
|
||||
"SubmittedAt": "2016-03-12T15:27:50.4468803-05:00",
|
||||
"MessageID": "b4007d94-33f1-4e78-a783-97417d6c80e6",
|
||||
"ErrorCode":0,
|
||||
"Message":"OK"
|
||||
}"""
|
||||
|
||||
def setUp(self):
|
||||
super(PostmarkBackendMockAPITestCase, self).setUp()
|
||||
# Simple message useful for many tests
|
||||
self.message = mail.EmailMultiAlternatives('Subject', 'Text Body', 'from@example.com', ['to@example.com'])
|
||||
|
||||
|
||||
class PostmarkBackendStandardEmailTests(PostmarkBackendMockAPITestCase):
|
||||
"""Test backend support for Django standard email features"""
|
||||
|
||||
def test_send_mail(self):
|
||||
"""Test basic API for simple send"""
|
||||
mail.send_mail('Subject here', 'Here is the message.',
|
||||
'from@sender.example.com', ['to@example.com'], fail_silently=False)
|
||||
self.assert_esp_called('/email')
|
||||
headers = self.get_api_call_headers()
|
||||
self.assertEqual(headers["X-Postmark-Server-Token"], "test_server_token")
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['Subject'], "Subject here")
|
||||
self.assertEqual(data['TextBody'], "Here is the message.")
|
||||
self.assertEqual(data['From'], "from@sender.example.com")
|
||||
self.assertEqual(data['To'], "to@example.com")
|
||||
|
||||
def test_name_addr(self):
|
||||
"""Make sure RFC2822 name-addr format (with display-name) is allowed
|
||||
|
||||
(Test both sender and recipient addresses)
|
||||
"""
|
||||
msg = mail.EmailMessage(
|
||||
'Subject', 'Message', 'From Name <from@example.com>',
|
||||
['Recipient #1 <to1@example.com>', 'to2@example.com'],
|
||||
cc=['Carbon Copy <cc1@example.com>', 'cc2@example.com'],
|
||||
bcc=['Blind Copy <bcc1@example.com>', 'bcc2@example.com'])
|
||||
msg.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['From'], 'From Name <from@example.com>')
|
||||
self.assertEqual(data['To'], 'Recipient #1 <to1@example.com>, to2@example.com')
|
||||
self.assertEqual(data['Cc'], 'Carbon Copy <cc1@example.com>, cc2@example.com')
|
||||
self.assertEqual(data['Bcc'], 'Blind Copy <bcc1@example.com>, bcc2@example.com')
|
||||
|
||||
def test_email_message(self):
|
||||
email = mail.EmailMessage(
|
||||
'Subject', 'Body goes here', 'from@example.com',
|
||||
['to1@example.com', 'Also To <to2@example.com>'],
|
||||
bcc=['bcc1@example.com', 'Also BCC <bcc2@example.com>'],
|
||||
cc=['cc1@example.com', 'Also CC <cc2@example.com>'],
|
||||
headers={'Reply-To': 'another@example.com',
|
||||
'X-MyHeader': 'my value',
|
||||
'Message-ID': 'mycustommsgid@sales.example.com'}) # should override backend msgid
|
||||
email.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['Subject'], "Subject")
|
||||
self.assertEqual(data['TextBody'], "Body goes here")
|
||||
self.assertEqual(data['From'], "from@example.com")
|
||||
self.assertEqual(data['To'], 'to1@example.com, Also To <to2@example.com>')
|
||||
self.assertEqual(data['Bcc'], 'bcc1@example.com, Also BCC <bcc2@example.com>')
|
||||
self.assertEqual(data['Cc'], 'cc1@example.com, Also CC <cc2@example.com>')
|
||||
self.assertCountEqual(data['Headers'], [
|
||||
{'Name': 'Message-ID', 'Value': 'mycustommsgid@sales.example.com'},
|
||||
{'Name': 'Reply-To', 'Value': 'another@example.com'},
|
||||
{'Name': 'X-MyHeader', 'Value': 'my value'},
|
||||
])
|
||||
|
||||
def test_html_message(self):
|
||||
text_content = 'This is an important message.'
|
||||
html_content = '<p>This is an <strong>important</strong> message.</p>'
|
||||
email = mail.EmailMultiAlternatives('Subject', text_content,
|
||||
'from@example.com', ['to@example.com'])
|
||||
email.attach_alternative(html_content, "text/html")
|
||||
email.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['TextBody'], text_content)
|
||||
self.assertEqual(data['HtmlBody'], html_content)
|
||||
# Don't accidentally send the html part as an attachment:
|
||||
self.assertNotIn('Attachments', data)
|
||||
|
||||
def test_html_only_message(self):
|
||||
html_content = '<p>This is an <strong>important</strong> message.</p>'
|
||||
email = mail.EmailMessage('Subject', html_content, 'from@example.com', ['to@example.com'])
|
||||
email.content_subtype = "html" # Main content is now text/html
|
||||
email.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertNotIn('TextBody', data)
|
||||
self.assertEqual(data['HtmlBody'], html_content)
|
||||
|
||||
def test_extra_headers(self):
|
||||
self.message.extra_headers = {'X-Custom': 'string', 'X-Num': 123}
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertCountEqual(data['Headers'], [
|
||||
{'Name': 'X-Custom', 'Value': 'string'},
|
||||
{'Name': 'X-Num', 'Value': 123}
|
||||
])
|
||||
|
||||
def test_extra_headers_serialization_error(self):
|
||||
self.message.extra_headers = {'X-Custom': Decimal(12.5)}
|
||||
with self.assertRaisesMessage(AnymailSerializationError, "Decimal('12.5')"):
|
||||
self.message.send()
|
||||
|
||||
def test_reply_to(self):
|
||||
# reply_to is new in Django 1.8 -- before that, you can simply include it in headers
|
||||
try:
|
||||
# noinspection PyArgumentList
|
||||
email = mail.EmailMessage('Subject', 'Body goes here', 'from@example.com', ['to1@example.com'],
|
||||
reply_to=['reply@example.com', 'Other <reply2@example.com>'],
|
||||
headers={'X-Other': 'Keep'})
|
||||
except TypeError:
|
||||
# Pre-Django 1.8
|
||||
return self.skipTest("Django version doesn't support EmailMessage(reply_to)")
|
||||
email.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['ReplyTo'], 'reply@example.com, Other <reply2@example.com>')
|
||||
self.assertEqual(data['Headers'], [{'Name': 'X-Other', 'Value': 'Keep'}]) # don't lose other headers
|
||||
|
||||
def test_attachments(self):
|
||||
text_content = "* Item one\n* Item two\n* Item three"
|
||||
self.message.attach(filename="test.txt", content=text_content, mimetype="text/plain")
|
||||
|
||||
# Should guess mimetype if not provided...
|
||||
png_content = b"PNG\xb4 pretend this is the contents of a png file"
|
||||
self.message.attach(filename="test.png", content=png_content)
|
||||
|
||||
# Should work with a MIMEBase object (also tests no filename)...
|
||||
pdf_content = b"PDF\xb4 pretend this is valid pdf data"
|
||||
mimeattachment = MIMEBase('application', 'pdf')
|
||||
mimeattachment.set_payload(pdf_content)
|
||||
self.message.attach(mimeattachment)
|
||||
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
attachments = data['Attachments']
|
||||
self.assertEqual(len(attachments), 3)
|
||||
self.assertEqual(attachments[0]["Name"], "test.txt")
|
||||
self.assertEqual(attachments[0]["ContentType"], "text/plain")
|
||||
self.assertEqual(decode_att(attachments[0]["Content"]).decode('ascii'), text_content)
|
||||
self.assertNotIn('ContentID', attachments[0])
|
||||
|
||||
self.assertEqual(attachments[1]["ContentType"], "image/png") # inferred from filename
|
||||
self.assertEqual(attachments[1]["Name"], "test.png")
|
||||
self.assertEqual(decode_att(attachments[1]["Content"]), png_content)
|
||||
self.assertNotIn('ContentID', attachments[1]) # make sure image not treated as inline
|
||||
|
||||
self.assertEqual(attachments[2]["ContentType"], "application/pdf")
|
||||
self.assertEqual(attachments[2]["Name"], "") # none
|
||||
self.assertEqual(decode_att(attachments[2]["Content"]), pdf_content)
|
||||
self.assertNotIn('ContentID', attachments[2])
|
||||
|
||||
def test_unicode_attachment_correctly_decoded(self):
|
||||
# Slight modification from the Django unicode docs:
|
||||
# http://django.readthedocs.org/en/latest/ref/unicode.html#email
|
||||
self.message.attach("Une pièce jointe.html", '<p>\u2019</p>', mimetype='text/html')
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['Attachments'], [{
|
||||
'Name': 'Une pièce jointe.html',
|
||||
'ContentType': 'text/html',
|
||||
'Content': b64encode('<p>\u2019</p>'.encode('utf-8')).decode('ascii')
|
||||
}])
|
||||
|
||||
def test_embedded_images(self):
|
||||
image_filename = SAMPLE_IMAGE_FILENAME
|
||||
image_path = sample_image_path(image_filename)
|
||||
image_data = sample_image_content(image_filename)
|
||||
|
||||
cid = attach_inline_image_file(self.message, image_path) # Read from a png file
|
||||
html_content = '<p>This has an <img src="cid:%s" alt="inline" /> image.</p>' % cid
|
||||
self.message.attach_alternative(html_content, "text/html")
|
||||
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['HtmlBody'], html_content)
|
||||
|
||||
attachments = data['Attachments']
|
||||
self.assertEqual(len(attachments), 1)
|
||||
self.assertEqual(attachments[0]['Name'], image_filename)
|
||||
self.assertEqual(attachments[0]['ContentType'], 'image/png')
|
||||
self.assertEqual(decode_att(attachments[0]["Content"]), image_data)
|
||||
self.assertEqual(attachments[0]["ContentID"], 'cid:%s' % cid)
|
||||
|
||||
def test_attached_images(self):
|
||||
image_filename = SAMPLE_IMAGE_FILENAME
|
||||
image_path = sample_image_path(image_filename)
|
||||
image_data = sample_image_content(image_filename)
|
||||
|
||||
self.message.attach_file(image_path) # option 1: attach as a file
|
||||
|
||||
image = MIMEImage(image_data) # option 2: construct the MIMEImage and attach it directly
|
||||
self.message.attach(image)
|
||||
|
||||
image_data_b64 = b64encode(image_data).decode('ascii')
|
||||
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['Attachments'], [
|
||||
{
|
||||
'Name': image_filename, # the named one
|
||||
'ContentType': 'image/png',
|
||||
'Content': image_data_b64,
|
||||
},
|
||||
{
|
||||
'Name': '', # the unnamed one
|
||||
'ContentType': 'image/png',
|
||||
'Content': image_data_b64,
|
||||
},
|
||||
])
|
||||
|
||||
def test_multiple_html_alternatives(self):
|
||||
# Multiple alternatives not allowed
|
||||
self.message.attach_alternative("<p>First html is OK</p>", "text/html")
|
||||
self.message.attach_alternative("<p>But not second html</p>", "text/html")
|
||||
with self.assertRaises(AnymailUnsupportedFeature):
|
||||
self.message.send()
|
||||
|
||||
def test_html_alternative(self):
|
||||
# Only html alternatives allowed
|
||||
self.message.attach_alternative("{'not': 'allowed'}", "application/json")
|
||||
with self.assertRaises(AnymailUnsupportedFeature):
|
||||
self.message.send()
|
||||
|
||||
def test_alternatives_fail_silently(self):
|
||||
# Make sure fail_silently is respected
|
||||
self.message.attach_alternative("{'not': 'allowed'}", "application/json")
|
||||
sent = self.message.send(fail_silently=True)
|
||||
self.assert_esp_not_called("API should not be called when send fails silently")
|
||||
self.assertEqual(sent, 0)
|
||||
|
||||
def test_suppress_empty_address_lists(self):
|
||||
"""Empty to, cc, bcc, and reply_to shouldn't generate empty fields"""
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertNotIn('Cc', data)
|
||||
self.assertNotIn('Bcc', data)
|
||||
self.assertNotIn('ReplyTo', data)
|
||||
|
||||
# Test empty `to` -- but send requires at least one recipient somewhere (like cc)
|
||||
self.message.to = []
|
||||
self.message.cc = ['cc@example.com']
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertNotIn('To', data)
|
||||
|
||||
def test_api_failure(self):
|
||||
self.set_mock_response(status_code=500)
|
||||
with self.assertRaises(AnymailAPIError):
|
||||
sent = mail.send_mail('Subject', 'Body', 'from@example.com', ['to@example.com'])
|
||||
self.assertEqual(sent, 0)
|
||||
|
||||
# Make sure fail_silently is respected
|
||||
self.set_mock_response(status_code=500)
|
||||
sent = mail.send_mail('Subject', 'Body', 'from@example.com', ['to@example.com'], fail_silently=True)
|
||||
self.assertEqual(sent, 0)
|
||||
|
||||
def test_api_error_includes_details(self):
|
||||
"""AnymailAPIError should include ESP's error message"""
|
||||
# JSON error response:
|
||||
error_response = b"""{
|
||||
"ErrorCode": 451,
|
||||
"Message": "Helpful explanation from Postmark."
|
||||
}"""
|
||||
self.set_mock_response(status_code=200, raw=error_response)
|
||||
with self.assertRaisesMessage(AnymailAPIError, "Helpful explanation from Postmark"):
|
||||
self.message.send()
|
||||
|
||||
# Non-JSON error response:
|
||||
self.set_mock_response(status_code=500, raw=b"Ack! Bad proxy!")
|
||||
with self.assertRaisesMessage(AnymailAPIError, "Ack! Bad proxy!"):
|
||||
self.message.send()
|
||||
|
||||
# No content in the error response:
|
||||
self.set_mock_response(status_code=502, raw=None)
|
||||
with self.assertRaises(AnymailAPIError):
|
||||
self.message.send()
|
||||
|
||||
|
||||
class PostmarkBackendAnymailFeatureTests(PostmarkBackendMockAPITestCase):
|
||||
"""Test backend support for Anymail added features"""
|
||||
|
||||
def test_metadata(self):
|
||||
self.message.metadata = {'user_id': "12345", 'items': 6}
|
||||
with self.assertRaisesMessage(AnymailUnsupportedFeature, 'metadata'):
|
||||
self.message.send()
|
||||
|
||||
def test_send_at(self):
|
||||
self.message.send_at = 1651820889 # 2022-05-06 07:08:09 UTC
|
||||
with self.assertRaisesMessage(AnymailUnsupportedFeature, 'send_at'):
|
||||
self.message.send()
|
||||
|
||||
def test_tags(self):
|
||||
self.message.tags = ["receipt"]
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['Tag'], "receipt")
|
||||
|
||||
self.message.tags = ["receipt", "repeat-user"]
|
||||
with self.assertRaisesMessage(AnymailUnsupportedFeature, 'multiple tags'):
|
||||
self.message.send()
|
||||
|
||||
def test_track_opens(self):
|
||||
self.message.track_opens = True
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['TrackOpens'], True)
|
||||
|
||||
def test_track_clicks(self):
|
||||
self.message.track_clicks = True
|
||||
with self.assertRaisesMessage(AnymailUnsupportedFeature, 'track_clicks'):
|
||||
self.message.send()
|
||||
|
||||
def test_default_omits_options(self):
|
||||
"""Make sure by default we don't send any ESP-specific options.
|
||||
|
||||
Options not specified by the caller should be omitted entirely from
|
||||
the API call (*not* sent as False or empty). This ensures
|
||||
that your ESP account settings apply by default.
|
||||
"""
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertNotIn('Tag', data)
|
||||
self.assertNotIn('TrackOpens', data)
|
||||
|
||||
def test_esp_extra(self):
|
||||
self.message.esp_extra = {
|
||||
'FuturePostmarkOption': 'some-value',
|
||||
}
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['FuturePostmarkOption'], 'some-value')
|
||||
|
||||
def test_message_server_token(self):
|
||||
# Can override server-token on a per-message basis:
|
||||
self.message.esp_extra = {
|
||||
'server_token': 'token_for_this_message_only',
|
||||
}
|
||||
self.message.send()
|
||||
headers = self.get_api_call_headers()
|
||||
self.assertEqual(headers["X-Postmark-Server-Token"], "token_for_this_message_only")
|
||||
data = self.get_api_call_json()
|
||||
self.assertNotIn('server_token', data) # not in the json
|
||||
|
||||
# noinspection PyUnresolvedReferences
|
||||
def test_send_attaches_anymail_status(self):
|
||||
""" The anymail_status should be attached to the message when it is sent """
|
||||
response_content = b"""{
|
||||
"MessageID":"abcdef01-2345-6789-0123-456789abcdef",
|
||||
"ErrorCode":0,
|
||||
"Message":"OK"
|
||||
}"""
|
||||
self.set_mock_response(raw=response_content)
|
||||
msg = mail.EmailMessage('Subject', 'Message', 'from@example.com', ['to1@example.com'],)
|
||||
sent = msg.send()
|
||||
self.assertEqual(sent, 1)
|
||||
self.assertEqual(msg.anymail_status.status, {'sent'})
|
||||
self.assertEqual(msg.anymail_status.message_id, 'abcdef01-2345-6789-0123-456789abcdef')
|
||||
self.assertEqual(msg.anymail_status.recipients['to1@example.com'].status, 'sent')
|
||||
self.assertEqual(msg.anymail_status.recipients['to1@example.com'].message_id,
|
||||
'abcdef01-2345-6789-0123-456789abcdef')
|
||||
self.assertEqual(msg.anymail_status.esp_response.content, response_content)
|
||||
|
||||
# noinspection PyUnresolvedReferences
|
||||
def test_send_failed_anymail_status(self):
|
||||
""" If the send fails, anymail_status should contain initial values"""
|
||||
self.set_mock_response(status_code=500)
|
||||
sent = self.message.send(fail_silently=True)
|
||||
self.assertEqual(sent, 0)
|
||||
self.assertIsNone(self.message.anymail_status.status)
|
||||
self.assertIsNone(self.message.anymail_status.message_id)
|
||||
self.assertEqual(self.message.anymail_status.recipients, {})
|
||||
self.assertIsNone(self.message.anymail_status.esp_response)
|
||||
|
||||
# noinspection PyUnresolvedReferences
|
||||
def test_send_unparsable_response(self):
|
||||
"""If the send succeeds, but a non-JSON API response, should raise an API exception"""
|
||||
mock_response = self.set_mock_response(status_code=200,
|
||||
raw=b"yikes, this isn't a real response")
|
||||
with self.assertRaises(AnymailAPIError):
|
||||
self.message.send()
|
||||
self.assertIsNone(self.message.anymail_status.status)
|
||||
self.assertIsNone(self.message.anymail_status.message_id)
|
||||
self.assertEqual(self.message.anymail_status.recipients, {})
|
||||
self.assertEqual(self.message.anymail_status.esp_response, mock_response)
|
||||
|
||||
def test_json_serialization_errors(self):
|
||||
"""Try to provide more information about non-json-serializable data"""
|
||||
self.message.tags = [Decimal('19.99')] # yeah, don't do this
|
||||
with self.assertRaises(AnymailSerializationError) as cm:
|
||||
self.message.send()
|
||||
print(self.get_api_call_json())
|
||||
err = cm.exception
|
||||
self.assertIsInstance(err, TypeError) # compatibility with json.dumps
|
||||
self.assertIn("Don't know how to send this data to Postmark", str(err)) # our added context
|
||||
self.assertIn("Decimal('19.99') is not JSON serializable", str(err)) # original message
|
||||
|
||||
|
||||
class PostmarkBackendRecipientsRefusedTests(PostmarkBackendMockAPITestCase):
|
||||
"""Should raise AnymailRecipientsRefused when *all* recipients are rejected or invalid"""
|
||||
|
||||
def test_recipients_inactive(self):
|
||||
self.set_mock_response(
|
||||
status_code=422,
|
||||
raw=b'{"ErrorCode":406,'
|
||||
b'"Message":"You tried to send to a recipient that has been marked as inactive.\\n'
|
||||
b'Found inactive addresses: hardbounce@example.com, spam@example.com.\\n'
|
||||
b'Inactive recipients are ones that have generated a hard bounce or a spam complaint."}'
|
||||
)
|
||||
msg = mail.EmailMessage('Subject', 'Body', 'from@example.com',
|
||||
['hardbounce@example.com', 'Hates Spam <spam@example.com>'])
|
||||
with self.assertRaises(AnymailRecipientsRefused):
|
||||
msg.send()
|
||||
status = msg.anymail_status
|
||||
self.assertEqual(status.recipients['hardbounce@example.com'].status, 'rejected')
|
||||
self.assertEqual(status.recipients['spam@example.com'].status, 'rejected')
|
||||
|
||||
def test_recipients_invalid(self):
|
||||
self.set_mock_response(
|
||||
status_code=422,
|
||||
raw=b"""{"ErrorCode":300,"Message":"Invalid 'To' address: 'invalid@localhost'."}"""
|
||||
)
|
||||
msg = mail.EmailMessage('Subject', 'Body', 'from@example.com', ['invalid@localhost'])
|
||||
with self.assertRaises(AnymailRecipientsRefused):
|
||||
msg.send()
|
||||
status = msg.anymail_status
|
||||
self.assertEqual(status.recipients['invalid@localhost'].status, 'invalid')
|
||||
|
||||
def test_from_email_invalid(self):
|
||||
# Invalid 'From' address generates same Postmark ErrorCode 300 as invalid 'To',
|
||||
# but should raise a different Anymail error
|
||||
self.set_mock_response(
|
||||
status_code=422,
|
||||
raw=b"""{"ErrorCode":300,"Message":"Invalid 'From' address: 'invalid@localhost'."}"""
|
||||
)
|
||||
msg = mail.EmailMessage('Subject', 'Body', 'invalid@localhost', ['to@example.com'])
|
||||
with self.assertRaises(AnymailAPIError):
|
||||
msg.send()
|
||||
|
||||
def test_fail_silently(self):
|
||||
self.set_mock_response(
|
||||
status_code=422,
|
||||
raw=b'{"ErrorCode":406,'
|
||||
b'"Message":"You tried to send to a recipient that has been marked as inactive.\\n'
|
||||
b'Found inactive addresses: hardbounce@example.com, spam@example.com.\\n'
|
||||
b'Inactive recipients are ones that have generated a hard bounce or a spam complaint."}'
|
||||
)
|
||||
msg = mail.EmailMessage('Subject', 'Body', 'from@example.com',
|
||||
['hardbounce@example.com', 'Hates Spam <spam@example.com>'])
|
||||
msg.send(fail_silently=True)
|
||||
status = msg.anymail_status
|
||||
self.assertEqual(status.recipients['hardbounce@example.com'].status, 'rejected')
|
||||
self.assertEqual(status.recipients['spam@example.com'].status, 'rejected')
|
||||
|
||||
@override_settings(ANYMAIL_IGNORE_RECIPIENT_STATUS=True)
|
||||
def test_ignore_recipient_status(self):
|
||||
self.set_mock_response(
|
||||
status_code=422,
|
||||
raw=b'{"ErrorCode":406,'
|
||||
b'"Message":"You tried to send to a recipient that has been marked as inactive.\\n'
|
||||
b'Found inactive addresses: hardbounce@example.com, spam@example.com.\\n'
|
||||
b'Inactive recipients are ones that have generated a hard bounce or a spam complaint. "}'
|
||||
)
|
||||
msg = mail.EmailMessage('Subject', 'Body', 'from@example.com',
|
||||
['hardbounce@example.com', 'Hates Spam <spam@example.com>'])
|
||||
msg.send()
|
||||
status = msg.anymail_status
|
||||
self.assertEqual(status.recipients['hardbounce@example.com'].status, 'rejected')
|
||||
self.assertEqual(status.recipients['spam@example.com'].status, 'rejected')
|
||||
|
||||
def test_mixed_response(self):
|
||||
"""If *any* recipients are valid or queued, no exception is raised"""
|
||||
self.set_mock_response(
|
||||
status_code=200,
|
||||
raw=b'{"To":"hardbounce@example.com, valid@example.com, Hates Spam <spam@example.com>",'
|
||||
b'"SubmittedAt":"2016-03-12T22:59:06.2505871-05:00",'
|
||||
b'"MessageID":"089dce03-feee-408e-9f0c-ee69bf1c5f35",'
|
||||
b'"ErrorCode":0,'
|
||||
b'"Message":"Message OK, but will not deliver to these inactive addresses:'
|
||||
b' hardbounce@example.com, spam@example.com.'
|
||||
b' Inactive recipients are ones that have generated a hard bounce or a spam complaint."}'
|
||||
)
|
||||
msg = mail.EmailMessage('Subject', 'Body', 'from@example.com',
|
||||
['hardbounce@example.com', 'valid@example.com', 'Hates Spam <spam@example.com>'])
|
||||
sent = msg.send()
|
||||
self.assertEqual(sent, 1) # one message sent, successfully, to 1 of 3 recipients
|
||||
status = msg.anymail_status
|
||||
self.assertEqual(status.recipients['hardbounce@example.com'].status, 'rejected')
|
||||
self.assertEqual(status.recipients['valid@example.com'].status, 'sent')
|
||||
self.assertEqual(status.recipients['spam@example.com'].status, 'rejected')
|
||||
|
||||
|
||||
@override_settings(ANYMAIL_SEND_DEFAULTS={
|
||||
'tags': ['globaltag'],
|
||||
'track_opens': True,
|
||||
'esp_extra': {'globaloption': 'globalsetting'},
|
||||
})
|
||||
class PostmarkBackendSendDefaultsTests(PostmarkBackendMockAPITestCase):
|
||||
"""Tests backend support for global SEND_DEFAULTS"""
|
||||
|
||||
def test_send_defaults(self):
|
||||
"""Test that global send defaults are applied"""
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
# All these values came from ANYMAIL_SEND_DEFAULTS:
|
||||
self.assertEqual(data['Tag'], 'globaltag')
|
||||
self.assertEqual(data['TrackOpens'], True)
|
||||
self.assertEqual(data['globaloption'], 'globalsetting')
|
||||
|
||||
def test_merge_message_with_send_defaults(self):
|
||||
"""Test that individual message settings are *merged into* the global send defaults"""
|
||||
self.message.tags = None # can't really append (since only one tag), but can suppress it
|
||||
self.message.track_opens = False
|
||||
self.message.esp_extra = {'messageoption': 'messagesetting'}
|
||||
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
# All these values came from ANYMAIL_SEND_DEFAULTS + message.*:
|
||||
self.assertNotIn('Tag', data)
|
||||
self.assertEqual(data['TrackOpens'], False)
|
||||
self.assertEqual(data['globaloption'], 'globalsetting')
|
||||
self.assertEqual(data['messageoption'], 'messagesetting') # additional esp_extra
|
||||
|
||||
@override_settings(ANYMAIL_POSTMARK_SEND_DEFAULTS={
|
||||
'tags': ['esptag'],
|
||||
'track_opens': False,
|
||||
})
|
||||
def test_esp_send_defaults(self):
|
||||
"""Test that ESP-specific send defaults override individual global defaults"""
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
# All these values came from ANYMAIL_SEND_DEFAULTS plus ANYMAIL_SENDGRID_SEND_DEFAULTS:
|
||||
self.assertEqual(data['Tag'], 'esptag') # entire tags overridden
|
||||
self.assertEqual(data['TrackOpens'], False) # esp override
|
||||
self.assertEqual(data['globaloption'], 'globalsetting') # we didn't override the global esp_extra
|
||||
|
||||
|
||||
@override_settings(EMAIL_BACKEND="anymail.backends.postmark.PostmarkBackend")
|
||||
class PostmarkBackendImproperlyConfiguredTests(SimpleTestCase, AnymailTestMixin):
|
||||
"""Test ESP backend without required settings in place"""
|
||||
|
||||
def test_missing_api_key(self):
|
||||
with self.assertRaises(ImproperlyConfigured) as cm:
|
||||
mail.send_mail('Subject', 'Message', 'from@example.com', ['to@example.com'])
|
||||
errmsg = str(cm.exception)
|
||||
self.assertRegex(errmsg, r'\bPOSTMARK_SERVER_TOKEN\b')
|
||||
self.assertRegex(errmsg, r'\bANYMAIL_POSTMARK_SERVER_TOKEN\b')
|
||||
82
anymail/tests/test_postmark_integration.py
Normal file
82
anymail/tests/test_postmark_integration.py
Normal file
@@ -0,0 +1,82 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from django.test import SimpleTestCase
|
||||
from django.test.utils import override_settings
|
||||
|
||||
from anymail.exceptions import AnymailAPIError
|
||||
from anymail.message import AnymailMessage
|
||||
|
||||
from .utils import AnymailTestMixin, sample_image_path
|
||||
|
||||
|
||||
@override_settings(ANYMAIL_POSTMARK_SERVER_TOKEN="POSTMARK_API_TEST",
|
||||
EMAIL_BACKEND="anymail.backends.postmark.PostmarkBackend")
|
||||
class PostmarkBackendIntegrationTests(SimpleTestCase, AnymailTestMixin):
|
||||
"""Postmark API integration tests
|
||||
|
||||
These tests run against the **live** Postmark API, but using a
|
||||
test key that's not capable of sending actual email.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
super(PostmarkBackendIntegrationTests, self).setUp()
|
||||
self.message = AnymailMessage('Anymail Postmark integration test', 'Text content',
|
||||
'from@example.com', ['to@example.com'])
|
||||
self.message.attach_alternative('<p>HTML content</p>', "text/html")
|
||||
|
||||
def test_simple_send(self):
|
||||
# Example of getting the SendGrid send status and message id from the message
|
||||
sent_count = self.message.send()
|
||||
self.assertEqual(sent_count, 1)
|
||||
|
||||
anymail_status = self.message.anymail_status
|
||||
sent_status = anymail_status.recipients['to@example.com'].status
|
||||
message_id = anymail_status.recipients['to@example.com'].message_id
|
||||
|
||||
self.assertEqual(sent_status, 'sent')
|
||||
self.assertGreater(len(message_id), 0) # non-empty string
|
||||
self.assertEqual(anymail_status.status, {sent_status}) # set of all recipient statuses
|
||||
self.assertEqual(anymail_status.message_id, message_id)
|
||||
|
||||
def test_all_options(self):
|
||||
message = AnymailMessage(
|
||||
subject="Anymail all-options integration test",
|
||||
body="This is the text body",
|
||||
from_email="Test From <from@example.com>",
|
||||
to=["to1@example.com", "Recipient 2 <to2@example.com>"],
|
||||
cc=["cc1@example.com", "Copy 2 <cc2@example.com>"],
|
||||
bcc=["bcc1@example.com", "Blind Copy 2 <bcc2@example.com>"],
|
||||
reply_to=["reply1@example.com", "Reply 2 <reply2@example.com>"],
|
||||
headers={"X-Anymail-Test": "value"},
|
||||
|
||||
# no metadata, send_at, track_clicks support
|
||||
tags=["tag 1"], # max one tag
|
||||
track_opens=True,
|
||||
)
|
||||
message.attach("attachment1.txt", "Here is some\ntext for you", "text/plain")
|
||||
message.attach("attachment2.csv", "ID,Name\n1,Amy Lina", "text/csv")
|
||||
cid = message.attach_inline_image_file(sample_image_path())
|
||||
message.attach_alternative(
|
||||
"<p><b>HTML:</b> with <a href='http://example.com'>link</a>"
|
||||
"and image: <img src='cid:%s'></div>" % cid,
|
||||
"text/html")
|
||||
|
||||
message.send()
|
||||
self.assertEqual(message.anymail_status.status, {'sent'})
|
||||
|
||||
def test_invalid_from(self):
|
||||
self.message.from_email = 'webmaster@localhost' # Django's default From
|
||||
with self.assertRaises(AnymailAPIError) as cm:
|
||||
self.message.send()
|
||||
err = cm.exception
|
||||
self.assertEqual(err.status_code, 422)
|
||||
self.assertIn("Invalid 'From' address", str(err))
|
||||
|
||||
@override_settings(ANYMAIL_POSTMARK_SERVER_TOKEN="Hey, that's not a server token!")
|
||||
def test_invalid_server_token(self):
|
||||
with self.assertRaises(AnymailAPIError) as cm:
|
||||
self.message.send()
|
||||
err = cm.exception
|
||||
self.assertEqual(err.status_code, 401)
|
||||
# Make sure the exception message includes Postmark's response:
|
||||
self.assertIn("Bad or missing Server API token", str(err))
|
||||
Reference in New Issue
Block a user