mirror of
https://github.com/pacnpal/django-anymail.git
synced 2025-12-20 03:41:05 -05:00
SendGrid: fix inbound webhook Unicode error when not utf-8
Fix a crash or text-mangling issue when an inbound message uses a charset other than utf-8 for its text or html body, and SendGrid's "post raw" inbound parse option is *not* enabled. Update docs to recommend "post raw" option. Fixes #187
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import json
|
||||
from textwrap import dedent
|
||||
|
||||
@@ -9,7 +11,7 @@ from anymail.inbound import AnymailInboundMessage
|
||||
from anymail.signals import AnymailInboundEvent
|
||||
from anymail.webhooks.sendgrid import SendGridInboundWebhookView
|
||||
|
||||
from .utils import sample_image_content, sample_email_content
|
||||
from .utils import dedent_bytes, sample_image_content, sample_email_content
|
||||
from .webhook_cases import WebhookTestCase
|
||||
|
||||
|
||||
@@ -183,3 +185,59 @@ class SendgridInboundTestCase(WebhookTestCase):
|
||||
self.assertEqual(message.subject, 'Raw MIME test')
|
||||
self.assertEqual(message.text, u"It's a body\N{HORIZONTAL ELLIPSIS}\n")
|
||||
self.assertEqual(message.html, u"""<div dir="ltr">It's a body\N{HORIZONTAL ELLIPSIS}</div>\n""")
|
||||
|
||||
def test_inbound_charsets(self):
|
||||
# Captured (sanitized) from actual SendGrid inbound webhook payload 7/2020,
|
||||
# using a test message constructed with a variety of charsets:
|
||||
raw_post = dedent_bytes(b"""\
|
||||
--xYzZY
|
||||
Content-Disposition: form-data; name="headers"
|
||||
|
||||
Date: Fri, 24 Jul 2020 16:43:46 UTC
|
||||
To: =?utf-8?q?R=C3=A9cipiendaire_pr=C3=A9cieux?= <inbound@sg.example.com>
|
||||
From: =?utf-8?q?Op=C3=A9rateur?= de test <sender@example.com>
|
||||
Subject: =?cp850?q?Como_usted_pidi=A2?=
|
||||
|
||||
--xYzZY
|
||||
Content-Disposition: form-data; name="subject"
|
||||
|
||||
Como usted pidi\xa2
|
||||
--xYzZY
|
||||
Content-Disposition: form-data; name="to"
|
||||
|
||||
R\xc3\xa9cipiendaire pr\xc3\xa9cieux <inbound@sg.example.com>
|
||||
--xYzZY
|
||||
Content-Disposition: form-data; name="html"
|
||||
|
||||
<p>\xbfEsto se ve como esperabas?</p>
|
||||
--xYzZY
|
||||
Content-Disposition: form-data; name="from"
|
||||
|
||||
Op\xc3\xa9rateur de test <sender@example.com>
|
||||
--xYzZY
|
||||
Content-Disposition: form-data; name="text"
|
||||
|
||||
Test the ESP\x92s inbound charset handling\x85
|
||||
--xYzZY
|
||||
Content-Disposition: form-data; name="charsets"
|
||||
|
||||
{"to":"UTF-8","cc":"UTF-8","html":"iso-8859-1","subject":"cp850","from":"UTF-8","text":"windows-1252"}
|
||||
--xYzZY--
|
||||
""").replace(b"\n", b"\r\n")
|
||||
|
||||
response = self.client.post('/anymail/sendgrid/inbound/', data=raw_post,
|
||||
content_type="multipart/form-data; boundary=xYzZY")
|
||||
self.assertEqual(response.status_code, 200)
|
||||
kwargs = self.assert_handler_called_once_with(self.inbound_handler, sender=SendGridInboundWebhookView,
|
||||
event=ANY, esp_name='SendGrid')
|
||||
event = kwargs['event']
|
||||
message = event.message
|
||||
|
||||
self.assertEqual(message.from_email.display_name, u"Opérateur de test")
|
||||
self.assertEqual(message.from_email.addr_spec, "sender@example.com")
|
||||
self.assertEqual(len(message.to), 1)
|
||||
self.assertEqual(message.to[0].display_name, u"Récipiendaire précieux")
|
||||
self.assertEqual(message.to[0].addr_spec, "inbound@sg.example.com")
|
||||
self.assertEqual(message.subject, u"Como usted pidió")
|
||||
self.assertEqual(message.text, u"Test the ESP’s inbound charset handling…")
|
||||
self.assertEqual(message.html, u"<p>¿Esto se ve como esperabas?</p>")
|
||||
|
||||
@@ -324,3 +324,45 @@ class ClientWithCsrfChecks(Client):
|
||||
def __init__(self, **defaults):
|
||||
super(ClientWithCsrfChecks, self).__init__(
|
||||
enforce_csrf_checks=True, **defaults)
|
||||
|
||||
|
||||
# dedent for bytestrs
|
||||
# https://stackoverflow.com/a/39841195/647002
|
||||
_whitespace_only_re = re.compile(b'^[ \t]+$', re.MULTILINE)
|
||||
_leading_whitespace_re = re.compile(b'(^[ \t]*)(?:[^ \t\n])', re.MULTILINE)
|
||||
|
||||
|
||||
def dedent_bytes(text):
|
||||
"""textwrap.dedent, but for bytes"""
|
||||
# Look for the longest leading string of spaces and tabs common to
|
||||
# all lines.
|
||||
margin = None
|
||||
text = _whitespace_only_re.sub(b'', text)
|
||||
indents = _leading_whitespace_re.findall(text)
|
||||
for indent in indents:
|
||||
if margin is None:
|
||||
margin = indent
|
||||
|
||||
# Current line more deeply indented than previous winner:
|
||||
# no change (previous winner is still on top).
|
||||
elif indent.startswith(margin):
|
||||
pass
|
||||
|
||||
# Current line consistent with and no deeper than previous winner:
|
||||
# it's the new winner.
|
||||
elif margin.startswith(indent):
|
||||
margin = indent
|
||||
|
||||
# Find the largest common whitespace between current line
|
||||
# and previous winner.
|
||||
else:
|
||||
for i, (x, y) in enumerate(zip(margin, indent)):
|
||||
if x != y:
|
||||
margin = margin[:i]
|
||||
break
|
||||
else:
|
||||
margin = margin[:len(indent)]
|
||||
|
||||
if margin:
|
||||
text = re.sub(b'(?m)^' + margin, b'', text)
|
||||
return text
|
||||
|
||||
Reference in New Issue
Block a user