mirror of
https://github.com/pacnpal/django-anymail.git
synced 2025-12-20 11:51:05 -05:00
344 lines
14 KiB
Python
344 lines
14 KiB
Python
from __future__ import annotations
|
|
|
|
import re
|
|
import typing
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from email.charset import QP, Charset
|
|
from email.headerregistry import Address
|
|
|
|
from django.core.mail import EmailMessage
|
|
from requests import Response
|
|
from requests.structures import CaseInsensitiveDict
|
|
|
|
from anymail.backends.base_requests import AnymailRequestsBackend, RequestsPayload
|
|
from anymail.message import AnymailRecipientStatus
|
|
from anymail.utils import Attachment, EmailAddress, get_anymail_setting, update_deep
|
|
|
|
# Used to force RFC-2047 encoded word
|
|
# in address formatting workaround
|
|
QP_CHARSET = Charset("utf-8")
|
|
QP_CHARSET.header_encoding = QP
|
|
|
|
|
|
class EmailBackend(AnymailRequestsBackend):
|
|
"""Unisender Go v1 Web API Email Backend"""
|
|
|
|
esp_name = "Unisender Go"
|
|
|
|
def __init__(self, **kwargs: typing.Any):
|
|
"""Init options from Django settings"""
|
|
esp_name = self.esp_name
|
|
|
|
self.api_key = get_anymail_setting(
|
|
"api_key", esp_name=esp_name, kwargs=kwargs, allow_bare=True
|
|
)
|
|
|
|
self.generate_message_id = get_anymail_setting(
|
|
"generate_message_id", esp_name=esp_name, kwargs=kwargs, default=True
|
|
)
|
|
|
|
# No default for api_url setting -- it depends on account's data center. E.g.:
|
|
# - https://go1.unisender.ru/ru/transactional/api/v1
|
|
# - https://go2.unisender.ru/ru/transactional/api/v1
|
|
api_url = get_anymail_setting("api_url", esp_name=esp_name, kwargs=kwargs)
|
|
if not api_url.endswith("/"):
|
|
api_url += "/"
|
|
|
|
# Undocumented setting to control workarounds for Unisender Go display-name issues
|
|
# (see below). If/when Unisender Go fixes the problems, you can disable Anymail's
|
|
# workarounds by adding `"UNISENDER_GO_WORKAROUND_DISPLAY_NAME_BUGS": False`
|
|
# to your `ANYMAIL` settings.
|
|
self.workaround_display_name_bugs = get_anymail_setting(
|
|
"workaround_display_name_bugs",
|
|
esp_name=esp_name,
|
|
kwargs=kwargs,
|
|
default=True,
|
|
)
|
|
|
|
super().__init__(api_url, **kwargs)
|
|
|
|
def build_message_payload(
|
|
self, message: EmailMessage, defaults: dict
|
|
) -> UnisenderGoPayload:
|
|
return UnisenderGoPayload(message=message, defaults=defaults, backend=self)
|
|
|
|
# Map Unisender Go "failed_email" code -> AnymailRecipientStatus.status
|
|
_unisender_failure_status = {
|
|
# "duplicate": ignored (see parse_recipient_status)
|
|
"invalid": "invalid",
|
|
"permanent_unavailable": "rejected",
|
|
"temporary_unavailable": "failed",
|
|
"unsubscribed": "rejected",
|
|
}
|
|
|
|
def parse_recipient_status(
|
|
self, response: Response, payload: UnisenderGoPayload, message: EmailMessage
|
|
) -> dict:
|
|
"""
|
|
Response example:
|
|
{
|
|
"status": "success",
|
|
"job_id": "1ZymBc-00041N-9X",
|
|
"emails": [
|
|
"user@example.com",
|
|
"email@example.com",
|
|
],
|
|
"failed_emails": {
|
|
"email1@gmail.com": "temporary_unavailable",
|
|
"bad@address": "invalid",
|
|
"email@example.com": "duplicate",
|
|
"root@example.org": "permanent_unavailable",
|
|
"olduser@example.net": "unsubscribed"
|
|
}
|
|
}
|
|
"""
|
|
parsed_response = self.deserialize_json_response(response, payload, message)
|
|
# job_id serves as message_id when not self.generate_message_id
|
|
job_id = parsed_response.get("job_id")
|
|
succeed_emails = {
|
|
recipient: AnymailRecipientStatus(
|
|
message_id=payload.message_ids.get(recipient, job_id), status="queued"
|
|
)
|
|
for recipient in parsed_response["emails"]
|
|
}
|
|
failed_emails = {
|
|
recipient: AnymailRecipientStatus(
|
|
# Message wasn't sent to this recipient, so Unisender Go hasn't stored
|
|
# any metadata (including message_id)
|
|
message_id=None,
|
|
status=self._unisender_failure_status.get(status, "failed"),
|
|
)
|
|
for recipient, status in parsed_response.get("failed_emails", {}).items()
|
|
if status != "duplicate" # duplicates are in both succeed and failed lists
|
|
}
|
|
return {**succeed_emails, **failed_emails}
|
|
|
|
|
|
class UnisenderGoPayload(RequestsPayload):
|
|
# Payload: see https://godocs.unisender.ru/web-api-ref#email-send
|
|
|
|
data: dict
|
|
|
|
def __init__(
|
|
self,
|
|
message: EmailMessage,
|
|
defaults: dict,
|
|
backend: EmailBackend,
|
|
*args: typing.Any,
|
|
**kwargs: typing.Any,
|
|
):
|
|
self.generate_message_id = backend.generate_message_id
|
|
self.message_ids = CaseInsensitiveDict() # recipient -> generated message_id
|
|
|
|
http_headers = kwargs.pop("headers", {})
|
|
http_headers["Content-Type"] = "application/json"
|
|
http_headers["Accept"] = "application/json"
|
|
http_headers["X-API-KEY"] = backend.api_key
|
|
super().__init__(
|
|
message, defaults, backend, headers=http_headers, *args, **kwargs
|
|
)
|
|
|
|
def get_api_endpoint(self) -> str:
|
|
return "email/send.json"
|
|
|
|
def init_payload(self) -> None:
|
|
self.data = { # becomes json
|
|
"headers": CaseInsensitiveDict(),
|
|
"recipients": [],
|
|
}
|
|
|
|
def serialize_data(self) -> str:
|
|
if self.generate_message_id:
|
|
self.set_anymail_id()
|
|
|
|
headers = self.data["headers"]
|
|
if self.is_batch():
|
|
# Remove the all-recipient "to" header for batch sends.
|
|
# Unisender Go will construct a single-recipient "to" for each recipient.
|
|
# Unisender Go doesn't allow a "cc" header without an explicit "to"
|
|
# header, so we cannot support "cc" for batch sends.
|
|
headers.pop("to", None)
|
|
if headers.pop("cc", None):
|
|
self.unsupported_feature(
|
|
"cc with batch send (merge_data or merge_metadata)"
|
|
)
|
|
|
|
if not headers:
|
|
del self.data["headers"] # don't send empty headers
|
|
|
|
return self.serialize_json({"message": self.data})
|
|
|
|
def set_anymail_id(self) -> None:
|
|
"""Ensure each personalization has a known anymail_id for event tracking"""
|
|
for recipient in self.data["recipients"]:
|
|
# This ensures duplicate recipients get same anymail_id
|
|
# (because Unisender Go only sends to first instance of duplicate)
|
|
email_address = recipient["email"]
|
|
anymail_id = self.message_ids.get(email_address) or str(uuid.uuid4())
|
|
recipient.setdefault("metadata", {})["anymail_id"] = anymail_id
|
|
self.message_ids[email_address] = anymail_id
|
|
|
|
#
|
|
# Payload construction
|
|
#
|
|
|
|
def set_from_email(self, email: EmailAddress) -> None:
|
|
self.data["from_email"] = email.addr_spec
|
|
if email.display_name:
|
|
self.data["from_name"] = email.display_name
|
|
|
|
def _format_email_address(self, address):
|
|
"""
|
|
Return EmailAddress address formatted for use with Unisender Go to/cc headers.
|
|
|
|
Works around a bug in Unisender Go's API that rejects To or Cc headers
|
|
containing commas, angle brackets, or @ in any display-name, despite those
|
|
names being properly enclosed in "quotes" per RFC 5322. Workaround substitutes
|
|
an RFC 2047 encoded word, which avoids the problem characters.
|
|
|
|
Note that parens, quote chars, and other special characters appearing
|
|
in "quoted strings" don't cause problems. (Unisender Go tech support
|
|
has confirmed the problem is limited to , < > @.)
|
|
|
|
This workaround is only necessary in the To and Cc headers. Unisender Go
|
|
properly formats commas and other characters in `to_name` and `from_name`.
|
|
(But see set_reply_to for a related issue.)
|
|
"""
|
|
formatted = address.address
|
|
if self.backend.workaround_display_name_bugs:
|
|
# Workaround: force RFC-2047 QP encoded word for display_name if it has
|
|
# prohibited chars (and isn't already encoded in the formatted address)
|
|
display_name = address.display_name
|
|
if re.search(r"[,<>@]", display_name) and display_name in formatted:
|
|
formatted = str(
|
|
Address(
|
|
display_name=QP_CHARSET.header_encode(address.display_name),
|
|
addr_spec=address.addr_spec,
|
|
)
|
|
)
|
|
return formatted
|
|
|
|
def set_recipients(self, recipient_type: str, emails: list[EmailAddress]):
|
|
for email in emails:
|
|
recipient = {"email": email.addr_spec}
|
|
if email.display_name:
|
|
recipient["substitutions"] = {"to_name": email.display_name}
|
|
self.data["recipients"].append(recipient)
|
|
|
|
if emails and recipient_type in {"to", "cc"}:
|
|
# Add "to" or "cc" header listing all recipients of type.
|
|
# See https://godocs.unisender.ru/cc-and-bcc.
|
|
# (For batch sends, these will be adjusted later in self.serialize_data.)
|
|
self.data["headers"][recipient_type] = ", ".join(
|
|
self._format_email_address(email) for email in emails
|
|
)
|
|
|
|
def set_subject(self, subject: str) -> None:
|
|
if subject:
|
|
self.data["subject"] = subject
|
|
|
|
def set_reply_to(self, emails: list[EmailAddress]) -> None:
|
|
# Unisender Go only supports a single address in the reply_to API param.
|
|
if len(emails) > 1:
|
|
self.unsupported_feature("multiple reply_to addresses")
|
|
if len(emails) > 0:
|
|
reply_to = emails[0]
|
|
self.data["reply_to"] = reply_to.addr_spec
|
|
display_name = reply_to.display_name
|
|
if display_name:
|
|
if self.backend.workaround_display_name_bugs:
|
|
# Unisender Go doesn't properly "quote" (RFC 5322) a `reply_to_name`
|
|
# containing special characters (comma, parens, etc.), resulting
|
|
# in an invalid Reply-To header that can cause problems when the
|
|
# recipient tries to reply. (They *do* properly handle special chars
|
|
# in `to_name` and `from_name`; this only affects `reply_to_name`.)
|
|
if reply_to.address.startswith('"'): # requires quoted syntax
|
|
# Workaround: force RFC-2047 encoded word
|
|
display_name = QP_CHARSET.header_encode(display_name)
|
|
self.data["reply_to_name"] = display_name
|
|
|
|
def set_extra_headers(self, headers: dict[str, str]) -> None:
|
|
self.data["headers"].update(headers)
|
|
|
|
def set_text_body(self, body: str) -> None:
|
|
if body:
|
|
self.data.setdefault("body", {})["plaintext"] = body
|
|
|
|
def set_html_body(self, body: str) -> None:
|
|
if body:
|
|
self.data.setdefault("body", {})["html"] = body
|
|
|
|
def add_alternative(self, content: str, mimetype: str):
|
|
if mimetype.lower() == "text/x-amp-html":
|
|
if "amp" in self.data.get("body", {}):
|
|
self.unsupported_feature("multiple amp-html parts")
|
|
self.data.setdefault("body", {})["amp"] = content
|
|
else:
|
|
super().add_alternative(content, mimetype)
|
|
|
|
def add_attachment(self, attachment: Attachment) -> None:
|
|
name = attachment.cid if attachment.inline else attachment.name
|
|
att = {
|
|
"content": attachment.b64content,
|
|
"type": attachment.mimetype,
|
|
"name": name or "", # required - submit empty string if unknown
|
|
}
|
|
if attachment.inline:
|
|
self.data.setdefault("inline_attachments", []).append(att)
|
|
else:
|
|
self.data.setdefault("attachments", []).append(att)
|
|
|
|
def set_metadata(self, metadata: dict[str, str]) -> None:
|
|
self.data["global_metadata"] = metadata
|
|
|
|
def set_send_at(self, send_at: datetime | str) -> None:
|
|
try:
|
|
# "Date and time in the format “YYYY-MM-DD hh:mm:ss” in the UTC time zone."
|
|
# If send_at is a datetime, it's guaranteed to be aware, but maybe not UTC.
|
|
# Convert to UTC, then strip tzinfo to avoid isoformat "+00:00" at end.
|
|
send_at_utc = send_at.astimezone(timezone.utc).replace(tzinfo=None)
|
|
send_at_formatted = send_at_utc.isoformat(sep=" ", timespec="seconds")
|
|
assert len(send_at_formatted) == 19
|
|
except (AttributeError, TypeError):
|
|
# Not a datetime - caller is responsible for formatting
|
|
send_at_formatted = send_at
|
|
self.data.setdefault("options", {})["send_at"] = send_at_formatted
|
|
|
|
def set_tags(self, tags: list[str]) -> None:
|
|
self.data["tags"] = tags
|
|
|
|
def set_track_clicks(self, track_clicks: typing.Any):
|
|
self.data["track_links"] = 1 if track_clicks else 0
|
|
|
|
def set_track_opens(self, track_opens: typing.Any):
|
|
self.data["track_read"] = 1 if track_opens else 0
|
|
|
|
def set_template_id(self, template_id: str) -> None:
|
|
self.data["template_id"] = template_id
|
|
|
|
def set_merge_data(self, merge_data: dict[str, dict[str, str]]) -> None:
|
|
if not merge_data:
|
|
return
|
|
assert self.data["recipients"] # must be called after set_to
|
|
for recipient in self.data["recipients"]:
|
|
recipient_email = recipient["email"]
|
|
if recipient_email in merge_data:
|
|
# (substitutions may already be present with "to_email")
|
|
recipient.setdefault("substitutions", {}).update(
|
|
merge_data[recipient_email]
|
|
)
|
|
|
|
def set_merge_global_data(self, merge_global_data: dict[str, str]) -> None:
|
|
self.data["global_substitutions"] = merge_global_data
|
|
|
|
def set_merge_metadata(self, merge_metadata: dict[str, str]) -> None:
|
|
assert self.data["recipients"] # must be called after set_to
|
|
for recipient in self.data["recipients"]:
|
|
recipient_email = recipient["email"]
|
|
if recipient_email in merge_metadata:
|
|
recipient["metadata"] = merge_metadata[recipient_email]
|
|
|
|
def set_esp_extra(self, extra: dict) -> None:
|
|
update_deep(self.data, extra)
|