mirror of
https://github.com/pacnpal/django-anymail.git
synced 2025-12-20 03:41:05 -05:00
Mailjet backend support (#66)
Add Mailjet backend. (Docs and webhooks to follow.) Thanks to @Lekensteyn and @calvin.
This commit is contained in:
@@ -2,6 +2,8 @@ Anymail
|
||||
=======
|
||||
|
||||
Mike Edmunds
|
||||
Calvin Jeong
|
||||
Peter Wu
|
||||
|
||||
|
||||
Anymail was forked from Djrill, which included contributions from:
|
||||
|
||||
258
anymail/backends/mailjet.py
Normal file
258
anymail/backends/mailjet.py
Normal file
@@ -0,0 +1,258 @@
|
||||
from ..exceptions import AnymailRequestsAPIError
|
||||
from ..message import AnymailRecipientStatus, ANYMAIL_STATUSES
|
||||
from ..utils import get_anymail_setting, ParsedEmail, parse_address_list
|
||||
|
||||
from .base_requests import AnymailRequestsBackend, RequestsPayload
|
||||
|
||||
|
||||
class EmailBackend(AnymailRequestsBackend):
|
||||
"""
|
||||
Mailjet API Email Backend
|
||||
"""
|
||||
|
||||
esp_name = "Mailjet"
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
"""Init options from Django settings"""
|
||||
esp_name = self.esp_name
|
||||
self.api_key = get_anymail_setting('api_key', esp_name=esp_name, kwargs=kwargs, allow_bare=True)
|
||||
self.secret_key = get_anymail_setting('secret_key', esp_name=esp_name, kwargs=kwargs, allow_bare=True)
|
||||
api_url = get_anymail_setting('api_url', esp_name=esp_name, kwargs=kwargs,
|
||||
default="https://api.mailjet.com/v3")
|
||||
if not api_url.endswith("/"):
|
||||
api_url += "/"
|
||||
super(EmailBackend, self).__init__(api_url, **kwargs)
|
||||
|
||||
def build_message_payload(self, message, defaults):
|
||||
return MailjetPayload(message, defaults, self)
|
||||
|
||||
def raise_for_status(self, response, payload, message):
|
||||
# Improve Mailjet's (lack of) error message for bad API key
|
||||
if response.status_code == 401 and not response.content:
|
||||
raise AnymailRequestsAPIError(
|
||||
"Invalid Mailjet API key or secret",
|
||||
email_message=message, payload=payload, response=response, backend=self)
|
||||
super(EmailBackend, self).raise_for_status(response, payload, message)
|
||||
|
||||
def parse_recipient_status(self, response, payload, message):
|
||||
# Mailjet's (v3.0) transactional send API is not covered in their reference docs.
|
||||
# The response appears to be either:
|
||||
# {"Sent": [{"Email": ..., "MessageID": ...}, ...]}
|
||||
# where only successful recipients are included
|
||||
# or if the entire call has failed:
|
||||
# {"ErrorCode": nnn, "Message": ...}
|
||||
parsed_response = self.deserialize_json_response(response, payload, message)
|
||||
if "ErrorCode" in parsed_response:
|
||||
raise AnymailRequestsAPIError(email_message=message, payload=payload, response=response,
|
||||
backend=self)
|
||||
|
||||
recipient_status = {}
|
||||
try:
|
||||
for key in parsed_response:
|
||||
status = key.lower()
|
||||
if status not in ANYMAIL_STATUSES:
|
||||
status = 'unknown'
|
||||
|
||||
for item in parsed_response[key]:
|
||||
message_id = str(item['MessageID'])
|
||||
email = item['Email']
|
||||
recipient_status[email] = AnymailRecipientStatus(message_id=message_id, status=status)
|
||||
except (KeyError, TypeError):
|
||||
raise AnymailRequestsAPIError("Invalid Mailjet API response format",
|
||||
email_message=message, payload=payload, response=response,
|
||||
backend=self)
|
||||
# Make sure we ended up with a status for every original recipient
|
||||
# (Mailjet only communicates "Sent")
|
||||
for recipients in payload.recipients.values():
|
||||
for email in recipients:
|
||||
if email.email not in recipient_status:
|
||||
recipient_status[email.email] = AnymailRecipientStatus(message_id=None, status='unknown')
|
||||
|
||||
return recipient_status
|
||||
|
||||
|
||||
class MailjetPayload(RequestsPayload):
|
||||
|
||||
def __init__(self, message, defaults, backend, *args, **kwargs):
|
||||
self.esp_extra = {} # late-bound in serialize_data
|
||||
auth = (backend.api_key, backend.secret_key)
|
||||
http_headers = {
|
||||
'Content-Type': 'application/json',
|
||||
}
|
||||
# Late binding of recipients and their variables
|
||||
self.recipients = {}
|
||||
self.merge_data = None
|
||||
super(MailjetPayload, self).__init__(message, defaults, backend,
|
||||
auth=auth, headers=http_headers, *args, **kwargs)
|
||||
|
||||
def get_api_endpoint(self):
|
||||
return "send"
|
||||
|
||||
def serialize_data(self):
|
||||
self._finish_recipients()
|
||||
self._populate_sender_from_template()
|
||||
return self.serialize_json(self.data)
|
||||
|
||||
#
|
||||
# Payload construction
|
||||
#
|
||||
|
||||
def _finish_recipients(self):
|
||||
# NOTE do not set both To and Recipients, it behaves specially: each
|
||||
# recipient receives a separate mail but the To address receives one
|
||||
# listing all recipients.
|
||||
if "cc" in self.recipients or "bcc" in self.recipients:
|
||||
self._finish_recipients_single()
|
||||
else:
|
||||
self._finish_recipients_with_vars()
|
||||
|
||||
def _populate_sender_from_template(self):
|
||||
# If no From address was given, use the address from the template.
|
||||
# Unfortunately, API 3.0 requires the From address to be given, so let's
|
||||
# query it when needed. This will supposedly be fixed in 3.1 with a
|
||||
# public beta in May 2017.
|
||||
template_id = self.data.get("Mj-TemplateID")
|
||||
if template_id and not self.data.get("FromEmail"):
|
||||
response = self.backend.session.get(
|
||||
"%sREST/template/%s/detailcontent" % (self.backend.api_url, template_id),
|
||||
auth=self.auth
|
||||
)
|
||||
self.backend.raise_for_status(response, None, self.message)
|
||||
json_response = self.backend.deserialize_json_response(response, None, self.message)
|
||||
# Populate email address header from template.
|
||||
try:
|
||||
headers = json_response["Data"][0]["Headers"]
|
||||
if "From" in headers:
|
||||
# Workaround Mailjet returning malformed From header
|
||||
# if there's a comma in the template's From display-name:
|
||||
from_email = headers["From"].replace(",", "||COMMA||")
|
||||
parsed = parse_address_list([from_email])[0]
|
||||
if parsed.name:
|
||||
parsed.name = parsed.name.replace("||COMMA||", ",")
|
||||
else:
|
||||
name_addr = (headers["SenderName"], headers["SenderEmail"])
|
||||
parsed = ParsedEmail(name_addr)
|
||||
except KeyError:
|
||||
raise AnymailRequestsAPIError("Invalid Mailjet template API response",
|
||||
email_message=self.message, response=response, backend=self.backend)
|
||||
self.set_from_email(parsed)
|
||||
|
||||
def _finish_recipients_with_vars(self):
|
||||
"""Send bulk mail with different variables for each mail."""
|
||||
assert "Cc" not in self.data and "Bcc" not in self.data
|
||||
recipients = []
|
||||
merge_data = self.merge_data or {}
|
||||
for email in self.recipients["to"]:
|
||||
recipient = {
|
||||
"Email": email.email,
|
||||
"Name": email.name,
|
||||
"Vars": merge_data.get(email.email)
|
||||
}
|
||||
# Strip out empty Name and Vars
|
||||
recipient = {k: v for k, v in recipient.items() if v}
|
||||
recipients.append(recipient)
|
||||
self.data["Recipients"] = recipients
|
||||
|
||||
def _finish_recipients_single(self):
|
||||
"""Send a single mail with some To, Cc and Bcc headers."""
|
||||
assert "Recipients" not in self.data
|
||||
if self.merge_data:
|
||||
# When Cc and Bcc headers are given, then merge data cannot be set.
|
||||
raise NotImplementedError("Cannot set merge data with bcc/cc")
|
||||
for recipient_type, emails in self.recipients.items():
|
||||
# Workaround Mailjet 3.0 bug parsing display-name with commas
|
||||
# (see test_comma_in_display_name in test_mailjet_backend for details)
|
||||
formatted_emails = [
|
||||
email.address if "," not in email.name
|
||||
# else name has a comma, so force it into MIME encoded-word utf-8 syntax:
|
||||
else ParsedEmail((email.name.encode('utf-8'), email.email)).formataddr('utf-8')
|
||||
for email in emails
|
||||
]
|
||||
self.data[recipient_type.capitalize()] = ", ".join(formatted_emails)
|
||||
|
||||
def init_payload(self):
|
||||
self.data = {
|
||||
}
|
||||
|
||||
def set_from_email(self, email):
|
||||
self.data["FromEmail"] = email.email
|
||||
if email.name:
|
||||
self.data["FromName"] = email.name
|
||||
|
||||
def set_recipients(self, recipient_type, emails):
|
||||
assert recipient_type in ["to", "cc", "bcc"]
|
||||
# Will be handled later in serialize_data
|
||||
if emails:
|
||||
self.recipients[recipient_type] = emails
|
||||
|
||||
def set_subject(self, subject):
|
||||
self.data["Subject"] = subject
|
||||
|
||||
def set_reply_to(self, emails):
|
||||
headers = self.data.setdefault("Headers", {})
|
||||
if emails:
|
||||
headers["Reply-To"] = ", ".join([str(email) for email in emails])
|
||||
elif "Reply-To" in headers:
|
||||
del headers["Reply-To"]
|
||||
|
||||
def set_extra_headers(self, headers):
|
||||
self.data.setdefault("Headers", {}).update(headers)
|
||||
|
||||
def set_text_body(self, body):
|
||||
self.data["Text-part"] = body
|
||||
|
||||
def set_html_body(self, body):
|
||||
if "Html-part" in self.data:
|
||||
# second html body could show up through multiple alternatives, or html body + alternative
|
||||
self.unsupported_feature("multiple html parts")
|
||||
|
||||
self.data["Html-part"] = body
|
||||
|
||||
def add_attachment(self, attachment):
|
||||
if attachment.inline:
|
||||
field = "Inline_attachments"
|
||||
name = attachment.cid
|
||||
else:
|
||||
field = "Attachments"
|
||||
name = attachment.name or ""
|
||||
self.data.setdefault(field, []).append({
|
||||
"Content-type": attachment.mimetype,
|
||||
"Filename": name,
|
||||
"content": attachment.b64content
|
||||
})
|
||||
|
||||
def set_metadata(self, metadata):
|
||||
# Mailjet expects a single string payload
|
||||
self.data["Mj-EventPayLoad"] = self.serialize_json(metadata)
|
||||
|
||||
def set_tags(self, tags):
|
||||
# The choices here are CustomID or Campaign, and Campaign seems closer
|
||||
# to how "tags" are handled by other ESPs -- e.g., you can view dashboard
|
||||
# statistics across all messages with the same Campaign.
|
||||
if len(tags) > 0:
|
||||
self.data["Tag"] = tags[0]
|
||||
self.data["Mj-campaign"] = tags[0]
|
||||
if len(tags) > 1:
|
||||
self.unsupported_feature('multiple tags (%r)' % tags)
|
||||
|
||||
def set_track_clicks(self, track_clicks):
|
||||
# 1 disables tracking, 2 enables tracking
|
||||
self.data["Mj-trackclick"] = 2 if track_clicks else 1
|
||||
|
||||
def set_track_opens(self, track_opens):
|
||||
# 1 disables tracking, 2 enables tracking
|
||||
self.data["Mj-trackopen"] = 2 if track_opens else 1
|
||||
|
||||
def set_template_id(self, template_id):
|
||||
self.data["Mj-TemplateID"] = template_id
|
||||
self.data["Mj-TemplateLanguage"] = True
|
||||
|
||||
def set_merge_data(self, merge_data):
|
||||
# Will be handled later in serialize_data
|
||||
self.merge_data = merge_data
|
||||
|
||||
def set_merge_global_data(self, merge_global_data):
|
||||
self.data["Vars"] = merge_global_data
|
||||
|
||||
def set_esp_extra(self, extra):
|
||||
self.data.update(extra)
|
||||
@@ -1,6 +1,7 @@
|
||||
import json
|
||||
from traceback import format_exception_only
|
||||
|
||||
import six
|
||||
from django.core.exceptions import ImproperlyConfigured, SuspiciousOperation
|
||||
from requests import HTTPError
|
||||
|
||||
@@ -65,7 +66,16 @@ class AnymailError(Exception):
|
||||
"""Return a formatted string of self.status_code and response, or None"""
|
||||
if self.status_code is None:
|
||||
return None
|
||||
description = "%s API response %d:" % (self.esp_name or "ESP", self.status_code)
|
||||
|
||||
# Decode response.reason to text -- borrowed from requests.Response.raise_for_status:
|
||||
reason = self.response.reason
|
||||
if isinstance(reason, six.binary_type):
|
||||
try:
|
||||
reason = reason.decode('utf-8')
|
||||
except UnicodeDecodeError:
|
||||
reason = reason.decode('iso-8859-1')
|
||||
|
||||
description = "%s API response %d: %s" % (self.esp_name or "ESP", self.status_code, reason)
|
||||
try:
|
||||
json_response = self.response.json()
|
||||
description += "\n" + json.dumps(json_response, indent=2)
|
||||
|
||||
@@ -21,11 +21,13 @@ class RequestsBackendMockAPITestCase(SimpleTestCase, AnymailTestMixin):
|
||||
|
||||
class MockResponse(requests.Response):
|
||||
"""requests.request return value mock sufficient for testing"""
|
||||
def __init__(self, status_code=200, raw=b"RESPONSE", encoding='utf-8'):
|
||||
def __init__(self, status_code=200, raw=b"RESPONSE", encoding='utf-8', reason=None):
|
||||
super(RequestsBackendMockAPITestCase.MockResponse, self).__init__()
|
||||
self.status_code = status_code
|
||||
self.encoding = encoding
|
||||
self.raw = six.BytesIO(raw)
|
||||
self.reason = reason or ("OK" if 200 <= status_code < 300 else "ERROR")
|
||||
# six.BytesIO(None) returns b'None' in PY2 (rather than b'')
|
||||
self.raw = six.BytesIO(raw) if raw is not None else six.BytesIO()
|
||||
|
||||
def setUp(self):
|
||||
super(RequestsBackendMockAPITestCase, self).setUp()
|
||||
@@ -34,10 +36,10 @@ class RequestsBackendMockAPITestCase(SimpleTestCase, AnymailTestMixin):
|
||||
self.addCleanup(self.patch_request.stop)
|
||||
self.set_mock_response()
|
||||
|
||||
def set_mock_response(self, status_code=DEFAULT_STATUS_CODE, raw=UNSET, encoding='utf-8'):
|
||||
def set_mock_response(self, status_code=DEFAULT_STATUS_CODE, raw=UNSET, encoding='utf-8', reason=None):
|
||||
if raw is UNSET:
|
||||
raw = self.DEFAULT_RAW_RESPONSE
|
||||
mock_response = self.MockResponse(status_code, raw, encoding)
|
||||
mock_response = self.MockResponse(status_code, raw=raw, encoding=encoding, reason=reason)
|
||||
self.mock_request.return_value = mock_response
|
||||
return mock_response
|
||||
|
||||
|
||||
640
tests/test_mailjet_backend.py
Normal file
640
tests/test_mailjet_backend.py
Normal file
@@ -0,0 +1,640 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from base64 import b64encode
|
||||
from decimal import Decimal
|
||||
from email.mime.base import MIMEBase
|
||||
from email.mime.image import MIMEImage
|
||||
|
||||
from django.core import mail
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
from django.test import SimpleTestCase
|
||||
from django.test.utils import override_settings
|
||||
|
||||
from anymail.exceptions import (AnymailAPIError, AnymailSerializationError,
|
||||
AnymailUnsupportedFeature,
|
||||
AnymailRequestsAPIError)
|
||||
from anymail.message import attach_inline_image_file
|
||||
|
||||
from .mock_requests_backend import RequestsBackendMockAPITestCase, SessionSharingTestCasesMixin
|
||||
from .utils import sample_image_content, sample_image_path, SAMPLE_IMAGE_FILENAME, AnymailTestMixin, decode_att
|
||||
|
||||
|
||||
@override_settings(EMAIL_BACKEND='anymail.backends.mailjet.EmailBackend',
|
||||
ANYMAIL={
|
||||
'MAILJET_API_KEY': '',
|
||||
'MAILJET_SECRET_KEY': ''
|
||||
})
|
||||
class MailjetBackendMockAPITestCase(RequestsBackendMockAPITestCase):
|
||||
DEFAULT_RAW_RESPONSE = b"""{
|
||||
"Sent": [{
|
||||
"Email": "to@example.com",
|
||||
"MessageID": 12345678901234567
|
||||
}]
|
||||
}"""
|
||||
|
||||
DEFAULT_TEMPLATE_RESPONSE = b"""{
|
||||
"Count": 1,
|
||||
"Data": [{
|
||||
"Text-part": "text body",
|
||||
"Html-part": "html body",
|
||||
"MJMLContent": "",
|
||||
"Headers": {
|
||||
"Subject": "Hello World!",
|
||||
"SenderName": "Friendly Tester",
|
||||
"SenderEmail": "some@example.com",
|
||||
"ReplyEmail": ""
|
||||
}
|
||||
}],
|
||||
"Total": 1
|
||||
}"""
|
||||
|
||||
def setUp(self):
|
||||
super(MailjetBackendMockAPITestCase, self).setUp()
|
||||
# Simple message useful for many tests
|
||||
self.message = mail.EmailMultiAlternatives('Subject', 'Text Body', 'from@example.com', ['to@example.com'])
|
||||
|
||||
def set_template_response(self, status_code=200, raw=None):
|
||||
"""Sets an expectation for a template and populate its response."""
|
||||
if raw is None:
|
||||
raw = self.DEFAULT_TEMPLATE_RESPONSE
|
||||
template_response = RequestsBackendMockAPITestCase.MockResponse(status_code, raw)
|
||||
self.mock_request.side_effect = iter([
|
||||
template_response,
|
||||
self.mock_request.return_value
|
||||
])
|
||||
|
||||
|
||||
class MailjetBackendStandardEmailTests(MailjetBackendMockAPITestCase):
|
||||
"""Test backend support for Django standard email features"""
|
||||
|
||||
def test_send_mail(self):
|
||||
"""Test basic API for simple send"""
|
||||
mail.send_mail('Subject here', 'Here is the message.',
|
||||
'from@sender.example.com', ['to@example.com'], fail_silently=False)
|
||||
self.assert_esp_called('/v3/send')
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['Subject'], "Subject here")
|
||||
self.assertEqual(data['Text-part'], "Here is the message.")
|
||||
self.assertEqual(data['FromEmail'], "from@sender.example.com")
|
||||
self.assertEqual(data['Recipients'], [{"Email": "to@example.com"}])
|
||||
|
||||
def test_name_addr(self):
|
||||
"""Make sure RFC2822 name-addr format (with display-name) is allowed
|
||||
|
||||
(Test both sender and recipient addresses)
|
||||
"""
|
||||
msg = mail.EmailMessage(
|
||||
'Subject', 'Message', 'From Name <from@example.com>',
|
||||
['Recipient #1 <to1@example.com>', 'to2@example.com'],
|
||||
cc=['Carbon Copy <cc1@example.com>', 'cc2@example.com'],
|
||||
bcc=['Blind Copy <bcc1@example.com>', 'bcc2@example.com'])
|
||||
msg.send()
|
||||
data = self.get_api_call_json()
|
||||
# See https://dev.mailjet.com/guides/#sending-a-basic-email
|
||||
self.assertEqual(data['FromName'], 'From Name')
|
||||
self.assertEqual(data['FromEmail'], 'from@example.com')
|
||||
self.assertEqual(data['To'], 'Recipient #1 <to1@example.com>, to2@example.com')
|
||||
self.assertEqual(data['Cc'], 'Carbon Copy <cc1@example.com>, cc2@example.com')
|
||||
self.assertEqual(data['Bcc'], 'Blind Copy <bcc1@example.com>, bcc2@example.com')
|
||||
|
||||
def test_comma_in_display_name(self):
|
||||
# note there are two paths: with cc/bcc, and without
|
||||
msg = mail.EmailMessage(
|
||||
'Subject', 'Message', '"Example, Inc." <from@example.com>',
|
||||
['"Recipient, Ltd." <to@example.com>'])
|
||||
msg.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['FromName'], 'Example, Inc.')
|
||||
self.assertEqual(data['FromEmail'], 'from@example.com')
|
||||
self.assertEqual(data['Recipients'][0]["Email"], "to@example.com")
|
||||
self.assertEqual(data['Recipients'][0]["Name"], "Recipient, Ltd.") # separate Name field works fine
|
||||
|
||||
# Mailjet 3.0 API doesn't properly parse RFC-2822 quoted display-names from To/Cc/Bcc:
|
||||
# `To: "Recipient, Ltd." <to@example.com>` tries to send messages to `"Recipient`
|
||||
# and to `Ltd.` (neither of which are actual email addresses).
|
||||
# As a workaround, force MIME "encoded-word" utf-8 encoding, which gets past Mailjet's broken parsing.
|
||||
# (This shouldn't be necessary in Mailjet 3.1, where Name becomes a separate json field for Cc/Bcc.)
|
||||
msg.cc = ['cc@example.com']
|
||||
msg.send()
|
||||
data = self.get_api_call_json()
|
||||
# self.assertEqual(data['To'], '"Recipient, Ltd." <to@example.com>') # this doesn't work
|
||||
self.assertEqual(data['To'], '=?utf-8?q?Recipient=2C_Ltd=2E?= <to@example.com>') # workaround
|
||||
|
||||
def test_email_message(self):
|
||||
email = mail.EmailMessage(
|
||||
'Subject', 'Body goes here', 'from@example.com',
|
||||
['to1@example.com', 'Also To <to2@example.com>'],
|
||||
bcc=['bcc1@example.com', 'Also BCC <bcc2@example.com>'],
|
||||
cc=['cc1@example.com', 'Also CC <cc2@example.com>'],
|
||||
headers={'Reply-To': 'another@example.com',
|
||||
'X-MyHeader': 'my value'})
|
||||
email.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['Subject'], "Subject")
|
||||
self.assertEqual(data['Text-part'], "Body goes here")
|
||||
self.assertEqual(data['FromEmail'], "from@example.com")
|
||||
self.assertEqual(data['To'], 'to1@example.com, Also To <to2@example.com>')
|
||||
self.assertEqual(data['Bcc'], 'bcc1@example.com, Also BCC <bcc2@example.com>')
|
||||
self.assertEqual(data['Cc'], 'cc1@example.com, Also CC <cc2@example.com>')
|
||||
self.assertCountEqual(data['Headers'], {
|
||||
'Reply-To': 'another@example.com',
|
||||
'X-MyHeader': 'my value',
|
||||
})
|
||||
|
||||
def test_html_message(self):
|
||||
text_content = 'This is an important message.'
|
||||
html_content = '<p>This is an <strong>important</strong> message.</p>'
|
||||
email = mail.EmailMultiAlternatives('Subject', text_content,
|
||||
'from@example.com', ['to@example.com'])
|
||||
email.attach_alternative(html_content, "text/html")
|
||||
email.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['Text-part'], text_content)
|
||||
self.assertEqual(data['Html-part'], html_content)
|
||||
# Don't accidentally send the html part as an attachment:
|
||||
self.assertNotIn('Attachments', data)
|
||||
|
||||
def test_html_only_message(self):
|
||||
html_content = '<p>This is an <strong>important</strong> message.</p>'
|
||||
email = mail.EmailMessage('Subject', html_content, 'from@example.com', ['to@example.com'])
|
||||
email.content_subtype = "html" # Main content is now text/html
|
||||
email.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertNotIn('Text-part', data)
|
||||
self.assertEqual(data['Html-part'], html_content)
|
||||
|
||||
def test_extra_headers(self):
|
||||
self.message.extra_headers = {'X-Custom': 'string', 'X-Num': 123}
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertCountEqual(data['Headers'], {
|
||||
'X-Custom': 'string',
|
||||
'X-Num': 123,
|
||||
})
|
||||
|
||||
def test_extra_headers_serialization_error(self):
|
||||
self.message.extra_headers = {'X-Custom': Decimal(12.5)}
|
||||
with self.assertRaisesMessage(AnymailSerializationError, "Decimal"):
|
||||
self.message.send()
|
||||
|
||||
def test_reply_to(self):
|
||||
email = mail.EmailMessage('Subject', 'Body goes here', 'from@example.com', ['to1@example.com'],
|
||||
reply_to=['reply@example.com', 'Other <reply2@example.com>'],
|
||||
headers={'X-Other': 'Keep'})
|
||||
email.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['Headers'], {
|
||||
'Reply-To': 'reply@example.com, Other <reply2@example.com>',
|
||||
'X-Other': 'Keep'
|
||||
}) # don't lose other headers
|
||||
|
||||
def test_attachments(self):
|
||||
text_content = "* Item one\n* Item two\n* Item three"
|
||||
self.message.attach(filename="test.txt", content=text_content, mimetype="text/plain")
|
||||
|
||||
# Should guess mimetype if not provided...
|
||||
png_content = b"PNG\xb4 pretend this is the contents of a png file"
|
||||
self.message.attach(filename="test.png", content=png_content)
|
||||
|
||||
# Should work with a MIMEBase object (also tests no filename)...
|
||||
pdf_content = b"PDF\xb4 pretend this is valid pdf data"
|
||||
mimeattachment = MIMEBase('application', 'pdf')
|
||||
mimeattachment.set_payload(pdf_content)
|
||||
self.message.attach(mimeattachment)
|
||||
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
attachments = data['Attachments']
|
||||
self.assertEqual(len(attachments), 3)
|
||||
self.assertEqual(attachments[0]["Filename"], "test.txt")
|
||||
self.assertEqual(attachments[0]["Content-type"], "text/plain")
|
||||
self.assertEqual(decode_att(attachments[0]["content"]).decode('ascii'), text_content)
|
||||
self.assertNotIn('ContentID', attachments[0])
|
||||
|
||||
self.assertEqual(attachments[1]["Content-type"], "image/png") # inferred from filename
|
||||
self.assertEqual(attachments[1]["Filename"], "test.png")
|
||||
self.assertEqual(decode_att(attachments[1]["content"]), png_content)
|
||||
self.assertNotIn('ContentID', attachments[1]) # make sure image not treated as inline
|
||||
|
||||
self.assertEqual(attachments[2]["Content-type"], "application/pdf")
|
||||
self.assertEqual(attachments[2]["Filename"], "") # none
|
||||
self.assertEqual(decode_att(attachments[2]["content"]), pdf_content)
|
||||
self.assertNotIn('ContentID', attachments[2])
|
||||
|
||||
def test_unicode_attachment_correctly_decoded(self):
|
||||
self.message.attach(u"Une pièce jointe.html", u'<p>\u2019</p>', mimetype='text/html')
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['Attachments'], [{
|
||||
'Filename': u'Une pièce jointe.html',
|
||||
'Content-type': 'text/html',
|
||||
'content': b64encode(u'<p>\u2019</p>'.encode('utf-8')).decode('ascii')
|
||||
}])
|
||||
|
||||
def test_embedded_images(self):
|
||||
image_filename = SAMPLE_IMAGE_FILENAME
|
||||
image_path = sample_image_path(image_filename)
|
||||
image_data = sample_image_content(image_filename)
|
||||
|
||||
cid = attach_inline_image_file(self.message, image_path) # Read from a png file
|
||||
html_content = '<p>This has an <img src="cid:%s" alt="inline" /> image.</p>' % cid
|
||||
self.message.attach_alternative(html_content, "text/html")
|
||||
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['Html-part'], html_content)
|
||||
|
||||
attachments = data['Inline_attachments']
|
||||
self.assertEqual(len(attachments), 1)
|
||||
self.assertEqual(attachments[0]['Filename'], cid)
|
||||
self.assertEqual(attachments[0]['Content-type'], 'image/png')
|
||||
self.assertEqual(decode_att(attachments[0]["content"]), image_data)
|
||||
|
||||
def test_attached_images(self):
|
||||
image_filename = SAMPLE_IMAGE_FILENAME
|
||||
image_path = sample_image_path(image_filename)
|
||||
image_data = sample_image_content(image_filename)
|
||||
|
||||
self.message.attach_file(image_path) # option 1: attach as a file
|
||||
|
||||
image = MIMEImage(image_data) # option 2: construct the MIMEImage and attach it directly
|
||||
self.message.attach(image)
|
||||
|
||||
image_data_b64 = b64encode(image_data).decode('ascii')
|
||||
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['Attachments'], [
|
||||
{
|
||||
'Filename': image_filename, # the named one
|
||||
'Content-type': 'image/png',
|
||||
'content': image_data_b64,
|
||||
},
|
||||
{
|
||||
'Filename': '', # the unnamed one
|
||||
'Content-type': 'image/png',
|
||||
'content': image_data_b64,
|
||||
},
|
||||
])
|
||||
|
||||
def test_multiple_html_alternatives(self):
|
||||
# Multiple alternatives not allowed
|
||||
self.message.attach_alternative("<p>First html is OK</p>", "text/html")
|
||||
self.message.attach_alternative("<p>But not second html</p>", "text/html")
|
||||
with self.assertRaises(AnymailUnsupportedFeature):
|
||||
self.message.send()
|
||||
|
||||
def test_html_alternative(self):
|
||||
# Only html alternatives allowed
|
||||
self.message.attach_alternative("{'not': 'allowed'}", "application/json")
|
||||
with self.assertRaises(AnymailUnsupportedFeature):
|
||||
self.message.send()
|
||||
|
||||
def test_alternatives_fail_silently(self):
|
||||
# Make sure fail_silently is respected
|
||||
self.message.attach_alternative("{'not': 'allowed'}", "application/json")
|
||||
sent = self.message.send(fail_silently=True)
|
||||
self.assert_esp_not_called("API should not be called when send fails silently")
|
||||
self.assertEqual(sent, 0)
|
||||
|
||||
def test_suppress_empty_address_lists(self):
|
||||
"""Empty to, cc, bcc, and reply_to shouldn't generate empty fields"""
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertNotIn('Cc', data)
|
||||
self.assertNotIn('Bcc', data)
|
||||
self.assertNotIn('ReplyTo', data)
|
||||
|
||||
# Test empty `to` -- but send requires at least one recipient somewhere (like cc)
|
||||
self.message.to = []
|
||||
self.message.cc = ['cc@example.com']
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertNotIn('To', data)
|
||||
self.assertEqual(data['Cc'], 'cc@example.com')
|
||||
|
||||
def test_api_failure(self):
|
||||
self.set_mock_response(status_code=500)
|
||||
with self.assertRaisesMessage(AnymailAPIError, "Mailjet API response 500"):
|
||||
mail.send_mail('Subject', 'Body', 'from@example.com', ['to@example.com'])
|
||||
|
||||
# Make sure fail_silently is respected
|
||||
self.set_mock_response(status_code=500)
|
||||
sent = mail.send_mail('Subject', 'Body', 'from@example.com', ['to@example.com'], fail_silently=True)
|
||||
self.assertEqual(sent, 0)
|
||||
|
||||
def test_api_error_includes_details(self):
|
||||
"""AnymailAPIError should include ESP's error message"""
|
||||
# JSON error response:
|
||||
error_response = b"""{
|
||||
"ErrorCode": 451,
|
||||
"Message": "Helpful explanation from Mailjet."
|
||||
}"""
|
||||
self.set_mock_response(status_code=200, raw=error_response)
|
||||
with self.assertRaisesMessage(AnymailAPIError, "Helpful explanation from Mailjet"):
|
||||
self.message.send()
|
||||
|
||||
# Non-JSON error response:
|
||||
self.set_mock_response(status_code=500, raw=b"Ack! Bad proxy!")
|
||||
with self.assertRaisesMessage(AnymailAPIError, "Ack! Bad proxy!"):
|
||||
self.message.send()
|
||||
|
||||
# No content in the error response:
|
||||
self.set_mock_response(status_code=502, raw=None)
|
||||
with self.assertRaises(AnymailAPIError):
|
||||
self.message.send()
|
||||
|
||||
def test_invalid_api_key(self):
|
||||
"""Anymail should add a helpful message for an invalid API key"""
|
||||
# Mailjet just returns a 401 error -- without additional explanation --
|
||||
# for invalid keys. We want to provide users something more helpful
|
||||
# than just "Mailjet API response 401:
|
||||
self.set_mock_response(status_code=401, reason="Unauthorized", raw=None)
|
||||
with self.assertRaisesMessage(AnymailAPIError, "Invalid Mailjet API key or secret"):
|
||||
self.message.send()
|
||||
|
||||
|
||||
class MailjetBackendAnymailFeatureTests(MailjetBackendMockAPITestCase):
|
||||
"""Test backend support for Anymail added features"""
|
||||
|
||||
def test_metadata(self):
|
||||
# Mailjet expects the payload to be a single string
|
||||
# https://dev.mailjet.com/guides/#tagging-email-messages
|
||||
self.message.metadata = {'user_id': "12345", 'items': 6}
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertJSONEqual(data['Mj-EventPayLoad'], {"user_id": "12345", "items": 6})
|
||||
|
||||
def test_send_at(self):
|
||||
self.message.send_at = 1651820889 # 2022-05-06 07:08:09 UTC
|
||||
with self.assertRaisesMessage(AnymailUnsupportedFeature, 'send_at'):
|
||||
self.message.send()
|
||||
|
||||
def test_tags(self):
|
||||
self.message.tags = ["receipt"]
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['Mj-campaign'], "receipt")
|
||||
|
||||
self.message.tags = ["receipt", "repeat-user"]
|
||||
with self.assertRaisesMessage(AnymailUnsupportedFeature, 'multiple tags'):
|
||||
self.message.send()
|
||||
|
||||
def test_track_opens(self):
|
||||
self.message.track_opens = True
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['Mj-trackopen'], 2)
|
||||
|
||||
def test_track_clicks(self):
|
||||
self.message.track_clicks = True
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['Mj-trackclick'], 2)
|
||||
|
||||
# Also explicit "None" for False (to override server default)
|
||||
self.message.track_clicks = False
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['Mj-trackclick'], 1)
|
||||
|
||||
def test_template(self):
|
||||
# template_id can be str or int (but must be numeric ID -- not the template's name)
|
||||
self.message.template_id = '1234567'
|
||||
self.message.merge_global_data = {'name': "Alice", 'group': "Developers"}
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['Mj-TemplateID'], '1234567')
|
||||
self.assertEqual(data['Vars'], {'name': "Alice", 'group': "Developers"})
|
||||
|
||||
def test_template_populate_from_sender(self):
|
||||
self.set_template_response()
|
||||
self.message.template_id = '1234567'
|
||||
self.message.from_email = None
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['Mj-TemplateID'], '1234567')
|
||||
self.assertEqual(data['FromName'], 'Friendly Tester')
|
||||
self.assertEqual(data['FromEmail'], 'some@example.com')
|
||||
|
||||
def test_template_populate_from(self):
|
||||
# Note: Mailjet fails to properly quote the From field's display-name
|
||||
# if the template sender name contains commas (as shown here):
|
||||
template_response_content = b'''{
|
||||
"Count": 1,
|
||||
"Data": [{
|
||||
"Text-part": "text body",
|
||||
"Html-part": "html body",
|
||||
"MJMLContent": "",
|
||||
"Headers": {
|
||||
"Subject": "Hello World!!",
|
||||
"From": "Widgets, Inc. <noreply@example.com>",
|
||||
"Reply-To": ""
|
||||
}
|
||||
}],
|
||||
"Total": 1
|
||||
}'''
|
||||
self.set_template_response(raw=template_response_content)
|
||||
self.message.template_id = '1234568'
|
||||
self.message.from_email = None
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['Mj-TemplateID'], '1234568')
|
||||
self.assertEqual(data['FromName'], 'Widgets, Inc.')
|
||||
self.assertEqual(data['FromEmail'], 'noreply@example.com')
|
||||
|
||||
def test_template_not_found(self):
|
||||
template_response_content = b'''{
|
||||
"ErrorInfo": "",
|
||||
"ErrorMessage": "Object not found",
|
||||
"StatusCode": 404
|
||||
}'''
|
||||
self.set_template_response(status_code=404, raw=template_response_content)
|
||||
self.message.template_id = '1234560'
|
||||
self.message.from_email = None
|
||||
with self.assertRaises(AnymailRequestsAPIError):
|
||||
self.message.send()
|
||||
|
||||
def test_template_unexpected_response(self):
|
||||
# Missing headers (not sure if possible though).
|
||||
template_response_content = b'''{
|
||||
"Count": 1,
|
||||
"Data": [{
|
||||
"Text-part": "text body",
|
||||
"Html-part": "html body",
|
||||
"MJMLContent": "",
|
||||
"Headers": {
|
||||
}
|
||||
}],
|
||||
"Total": 1
|
||||
}'''
|
||||
self.set_template_response(raw=template_response_content)
|
||||
self.message.template_id = '1234561'
|
||||
self.message.from_email = None
|
||||
with self.assertRaisesMessage(AnymailRequestsAPIError, "template API"):
|
||||
self.message.send()
|
||||
|
||||
def test_template_invalid_response(self):
|
||||
"""Test scenario when MJ service returns no JSON for some reason."""
|
||||
template_response_content = b'''total garbage'''
|
||||
self.set_template_response(raw=template_response_content)
|
||||
self.message.template_id = '1234562'
|
||||
self.message.from_email = None
|
||||
with self.assertRaisesMessage(AnymailRequestsAPIError, "Invalid JSON"):
|
||||
self.message.send()
|
||||
|
||||
def test_merge_data(self):
|
||||
self.message.to = ['alice@example.com']
|
||||
self.message.template_id = '1234567'
|
||||
self.message.merge_data = {
|
||||
'alice@example.com': {'name': "Alice", 'group': "Developers"},
|
||||
}
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['Mj-TemplateID'], '1234567')
|
||||
self.assertNotIn('Vars', data)
|
||||
self.assertEqual(data['Recipients'], [{
|
||||
'Email': 'alice@example.com',
|
||||
'Vars': {'name': "Alice", 'group': "Developers"}
|
||||
}])
|
||||
|
||||
def test_default_omits_options(self):
|
||||
"""Make sure by default we don't send any ESP-specific options.
|
||||
|
||||
Options not specified by the caller should be omitted entirely from
|
||||
the API call (*not* sent as False or empty). This ensures
|
||||
that your ESP account settings apply by default.
|
||||
"""
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertNotIn('Mj-campaign', data)
|
||||
self.assertNotIn('Mj-EventPayLoad', data)
|
||||
self.assertNotIn('Mj-TemplateID', data)
|
||||
self.assertNotIn('Vars', data)
|
||||
self.assertNotIn('Mj-trackopen', data)
|
||||
self.assertNotIn('Mj-trackclick', data)
|
||||
|
||||
def test_esp_extra(self):
|
||||
self.message.esp_extra = {
|
||||
'MJ-TemplateErrorDeliver': True,
|
||||
'MJ-TemplateErrorReporting': 'bugs@example.com'
|
||||
}
|
||||
self.message.send()
|
||||
data = self.get_api_call_json()
|
||||
self.assertEqual(data['MJ-TemplateErrorDeliver'], True)
|
||||
self.assertEqual(data['MJ-TemplateErrorReporting'], 'bugs@example.com')
|
||||
|
||||
# noinspection PyUnresolvedReferences
|
||||
def test_send_attaches_anymail_status(self):
|
||||
""" The anymail_status should be attached to the message when it is sent """
|
||||
response_content = b"""{
|
||||
"Sent": [{
|
||||
"Email": "to1@example.com",
|
||||
"MessageID": 12345678901234500
|
||||
}]
|
||||
}"""
|
||||
self.set_mock_response(raw=response_content)
|
||||
msg = mail.EmailMessage('Subject', 'Message', 'from@example.com', ['to1@example.com'],)
|
||||
sent = msg.send()
|
||||
self.assertEqual(sent, 1)
|
||||
self.assertEqual(msg.anymail_status.status, {'sent'})
|
||||
self.assertEqual(msg.anymail_status.message_id, "12345678901234500")
|
||||
self.assertEqual(msg.anymail_status.recipients['to1@example.com'].status, 'sent')
|
||||
self.assertEqual(msg.anymail_status.recipients['to1@example.com'].message_id, "12345678901234500")
|
||||
self.assertEqual(msg.anymail_status.esp_response.content, response_content)
|
||||
|
||||
# noinspection PyUnresolvedReferences
|
||||
def test_status_includes_all_recipients(self):
|
||||
"""The status should include an entry for each recipient"""
|
||||
# Note that Mailjet's response only communicates "Sent" status; not failed addresses.
|
||||
# (This is an example response from before the workaround for commas in display-names...)
|
||||
response_content = b"""{
|
||||
"Sent": [{
|
||||
"Email": "to1@example.com",
|
||||
"MessageID": 12345678901234500
|
||||
}, {
|
||||
"Email": "\\"Recipient",
|
||||
"MessageID": 12345678901234501
|
||||
}, {
|
||||
"Email": "Also",
|
||||
"MessageID": 12345678901234502
|
||||
}]
|
||||
}"""
|
||||
self.set_mock_response(raw=response_content)
|
||||
msg = mail.EmailMessage('Subject', 'Message', 'from@example.com',
|
||||
['to1@example.com', '"Recipient, Also" <to2@example.com>'],)
|
||||
sent = msg.send()
|
||||
self.assertEqual(sent, 1)
|
||||
self.assertEqual(msg.anymail_status.status, {'sent', 'unknown'})
|
||||
self.assertEqual(msg.anymail_status.recipients['to1@example.com'].status, 'sent')
|
||||
self.assertEqual(msg.anymail_status.recipients['to1@example.com'].message_id, "12345678901234500")
|
||||
self.assertEqual(msg.anymail_status.recipients['to2@example.com'].status, 'unknown') # because, whoops
|
||||
self.assertEqual(msg.anymail_status.recipients['to2@example.com'].message_id, None)
|
||||
self.assertEqual(msg.anymail_status.message_id,
|
||||
{"12345678901234500", "12345678901234501", "12345678901234502", None})
|
||||
self.assertEqual(msg.anymail_status.esp_response.content, response_content)
|
||||
|
||||
# noinspection PyUnresolvedReferences
|
||||
def test_send_failed_anymail_status(self):
|
||||
""" If the send fails, anymail_status should contain initial values"""
|
||||
self.set_mock_response(status_code=500)
|
||||
sent = self.message.send(fail_silently=True)
|
||||
self.assertEqual(sent, 0)
|
||||
self.assertIsNone(self.message.anymail_status.status)
|
||||
self.assertIsNone(self.message.anymail_status.message_id)
|
||||
self.assertEqual(self.message.anymail_status.recipients, {})
|
||||
self.assertIsNone(self.message.anymail_status.esp_response)
|
||||
|
||||
# noinspection PyUnresolvedReferences
|
||||
def test_send_unparsable_response(self):
|
||||
"""If the send succeeds, but a non-JSON API response, should raise an API exception"""
|
||||
mock_response = self.set_mock_response(status_code=200,
|
||||
raw=b"yikes, this isn't a real response")
|
||||
with self.assertRaises(AnymailAPIError):
|
||||
self.message.send()
|
||||
self.assertIsNone(self.message.anymail_status.status)
|
||||
self.assertIsNone(self.message.anymail_status.message_id)
|
||||
self.assertEqual(self.message.anymail_status.recipients, {})
|
||||
self.assertEqual(self.message.anymail_status.esp_response, mock_response)
|
||||
|
||||
def test_json_serialization_errors(self):
|
||||
"""Try to provide more information about non-json-serializable data"""
|
||||
self.message.tags = [Decimal('19.99')] # yeah, don't do this
|
||||
with self.assertRaises(AnymailSerializationError) as cm:
|
||||
self.message.send()
|
||||
print(self.get_api_call_json())
|
||||
err = cm.exception
|
||||
self.assertIsInstance(err, TypeError) # compatibility with json.dumps
|
||||
self.assertIn("Don't know how to send this data to Mailjet", str(err)) # our added context
|
||||
self.assertRegex(str(err), r"Decimal.*is not JSON serializable") # original message
|
||||
|
||||
def test_merge_data_null_values(self):
|
||||
# Mailjet doesn't accept None (null) as a merge value;
|
||||
# returns "HTTP/1.1 500 Cannot convert data from Null value"
|
||||
self.message.merge_global_data = {'Some': None}
|
||||
self.set_mock_response(status_code=500, reason="Cannot convert data from Null value", raw=None)
|
||||
with self.assertRaisesMessage(AnymailAPIError, "Cannot convert data from Null value"):
|
||||
self.message.send()
|
||||
|
||||
|
||||
class MailjetBackendSessionSharingTestCase(SessionSharingTestCasesMixin, MailjetBackendMockAPITestCase):
|
||||
"""Requests session sharing tests"""
|
||||
pass # tests are defined in the mixin
|
||||
|
||||
|
||||
@override_settings(EMAIL_BACKEND="anymail.backends.mailjet.EmailBackend")
|
||||
class MailjetBackendImproperlyConfiguredTests(SimpleTestCase, AnymailTestMixin):
|
||||
"""Test ESP backend without required settings in place"""
|
||||
|
||||
def test_missing_api_key(self):
|
||||
with self.assertRaises(ImproperlyConfigured) as cm:
|
||||
mail.send_mail('Subject', 'Message', 'from@example.com', ['to@example.com'])
|
||||
errmsg = str(cm.exception)
|
||||
self.assertRegex(errmsg, r'\bMAILJET_API_KEY\b')
|
||||
|
||||
@override_settings(ANYMAIL={'MAILJET_API_KEY': 'dummy'})
|
||||
def test_missing_secret_key(self):
|
||||
with self.assertRaises(ImproperlyConfigured) as cm:
|
||||
mail.send_mail('Subject', 'Message', 'from@example.com', ['to@example.com'])
|
||||
errmsg = str(cm.exception)
|
||||
self.assertRegex(errmsg, r'\bMAILJET_SECRET_KEY\b')
|
||||
130
tests/test_mailjet_integration.py
Normal file
130
tests/test_mailjet_integration.py
Normal file
@@ -0,0 +1,130 @@
|
||||
import os
|
||||
import unittest
|
||||
|
||||
from django.test import SimpleTestCase
|
||||
from django.test.utils import override_settings
|
||||
|
||||
from anymail.exceptions import AnymailAPIError
|
||||
from anymail.message import AnymailMessage
|
||||
|
||||
from .utils import AnymailTestMixin, sample_image_path, RUN_LIVE_TESTS
|
||||
|
||||
MAILJET_TEST_API_KEY = os.getenv('MAILJET_TEST_API_KEY')
|
||||
MAILJET_TEST_SECRET_KEY = os.getenv('MAILJET_TEST_SECRET_KEY')
|
||||
|
||||
|
||||
@unittest.skipUnless(RUN_LIVE_TESTS, "RUN_LIVE_TESTS disabled in this environment")
|
||||
@unittest.skipUnless(MAILJET_TEST_API_KEY and MAILJET_TEST_SECRET_KEY,
|
||||
"Set MAILJET_TEST_API_KEY and MAILJET_TEST_SECRET_KEY "
|
||||
"environment variables to run Mailjet integration tests")
|
||||
@override_settings(ANYMAIL_MAILJET_API_KEY=MAILJET_TEST_API_KEY,
|
||||
ANYMAIL_MAILJET_SECRET_KEY=MAILJET_TEST_SECRET_KEY,
|
||||
EMAIL_BACKEND="anymail.backends.mailjet.EmailBackend")
|
||||
class MailjetBackendIntegrationTests(SimpleTestCase, AnymailTestMixin):
|
||||
"""Mailjet API integration tests
|
||||
|
||||
These tests run against the **live** Mailjet API, using the
|
||||
environment variables `MAILJET_TEST_API_KEY` and `MAILJET_TEST_SECRET_KEY`
|
||||
as the API key and API secret key, respectively.
|
||||
If those variables are not set, these tests won't run.
|
||||
|
||||
Mailjet doesn't (in v3.0) offer a test/sandbox mode -- it tries to send everything
|
||||
you ask.
|
||||
|
||||
Mailjet also doesn't support unverified senders (so no from@example.com).
|
||||
We've set up @test-mj.anymail.info as a validated sending domain for these tests.
|
||||
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
super(MailjetBackendIntegrationTests, self).setUp()
|
||||
self.message = AnymailMessage('Anymail Mailjet integration test', 'Text content',
|
||||
'test@test-mj.anymail.info', ['to@example.com'])
|
||||
self.message.attach_alternative('<p>HTML content</p>', "text/html")
|
||||
|
||||
def test_simple_send(self):
|
||||
# Example of getting the Mailjet send status and message id from the message
|
||||
sent_count = self.message.send()
|
||||
self.assertEqual(sent_count, 1)
|
||||
|
||||
anymail_status = self.message.anymail_status
|
||||
sent_status = anymail_status.recipients['to@example.com'].status
|
||||
message_id = anymail_status.recipients['to@example.com'].message_id
|
||||
|
||||
self.assertEqual(sent_status, 'sent')
|
||||
self.assertRegex(message_id, r'.+')
|
||||
self.assertEqual(anymail_status.status, {sent_status}) # set of all recipient statuses
|
||||
self.assertEqual(anymail_status.message_id, message_id)
|
||||
|
||||
def test_all_options(self):
|
||||
message = AnymailMessage(
|
||||
subject="Anymail all-options integration test",
|
||||
body="This is the text body",
|
||||
from_email='"Test Sender, Inc." <test@test-mj.anymail.info>',
|
||||
to=['to1@example.com', '"Recipient, 2nd" <to2@example.com>'],
|
||||
cc=['cc1@example.com', 'Copy 2 <cc2@example.com>'],
|
||||
bcc=['bcc1@example.com', 'Blind Copy 2 <bcc2@example.com>'],
|
||||
reply_to=['reply1@example.com', '"Reply, 2nd" <reply2@example.com>'],
|
||||
headers={"X-Anymail-Test": "value"},
|
||||
|
||||
metadata={"meta1": "simple string", "meta2": 2},
|
||||
tags=["tag 1"], # Mailjet only allows a single tag
|
||||
track_clicks=True,
|
||||
track_opens=True,
|
||||
)
|
||||
message.attach("attachment1.txt", "Here is some\ntext for you", "text/plain")
|
||||
message.attach("attachment2.csv", "ID,Name\n1,Amy Lina", "text/csv")
|
||||
cid = message.attach_inline_image_file(sample_image_path())
|
||||
message.attach_alternative(
|
||||
"<p><b>HTML:</b> with <a href='http://example.com'>link</a>"
|
||||
"and image: <img src='cid:%s'></div>" % cid,
|
||||
"text/html")
|
||||
|
||||
message.send()
|
||||
self.assertEqual(message.anymail_status.status, {'sent'})
|
||||
|
||||
def test_merge_data(self):
|
||||
message = AnymailMessage(
|
||||
subject="Anymail merge_data test", # Mailjet doesn't support merge fields in the subject
|
||||
body="This body includes merge data: [[var:value]]\n"
|
||||
"And global merge data: [[var:global]]",
|
||||
from_email="Test From <test@test-mj.anymail.info>",
|
||||
to=["to1@example.com", "Recipient 2 <to2@example.com>"],
|
||||
merge_data={
|
||||
'to1@example.com': {'value': 'one'},
|
||||
'to2@example.com': {'value': 'two'},
|
||||
},
|
||||
merge_global_data={
|
||||
'global': 'global_value'
|
||||
},
|
||||
)
|
||||
message.send()
|
||||
recipient_status = message.anymail_status.recipients
|
||||
self.assertEqual(recipient_status['to1@example.com'].status, 'sent')
|
||||
self.assertEqual(recipient_status['to2@example.com'].status, 'sent')
|
||||
|
||||
def test_stored_template(self):
|
||||
message = AnymailMessage(
|
||||
template_id='176375', # ID of the real template named 'test-template' in our Mailjet test account
|
||||
to=["to1@example.com"],
|
||||
merge_data={
|
||||
'to1@example.com': {
|
||||
'name': "Test Recipient",
|
||||
}
|
||||
},
|
||||
merge_global_data={
|
||||
'order': '12345',
|
||||
},
|
||||
)
|
||||
message.from_email = None # use the template's sender email/name
|
||||
message.send()
|
||||
recipient_status = message.anymail_status.recipients
|
||||
self.assertEqual(recipient_status['to1@example.com'].status, 'sent')
|
||||
|
||||
@override_settings(ANYMAIL_MAILJET_API_KEY="Hey, that's not an API key!")
|
||||
def test_invalid_api_key(self):
|
||||
with self.assertRaises(AnymailAPIError) as cm:
|
||||
self.message.send()
|
||||
err = cm.exception
|
||||
self.assertEqual(err.status_code, 401)
|
||||
self.assertIn("Invalid Mailjet API key or secret", str(err))
|
||||
Reference in New Issue
Block a user