mirror of
https://github.com/pacnpal/django-anymail.git
synced 2025-12-20 11:51:05 -05:00
Inbound: add parse_raw_mime_bytes and parse_raw_mime_file
Useful for cases where an ESP could send a raw 8bit message (and its charset is something other than utf-8). Also reworks the earlier Python 2.7 workaround for email.parser.Parser header unfolding bugs to handle any text-like, file-like IO stream, without trying to manipulate the entire message as a single string.
This commit is contained in:
@@ -14,25 +14,27 @@ try:
|
|||||||
# avoids earlier bugs. (Note that Parser defaults to policy=compat32,
|
# avoids earlier bugs. (Note that Parser defaults to policy=compat32,
|
||||||
# which *preserves* earlier bugs.)
|
# which *preserves* earlier bugs.)
|
||||||
from email.policy import default
|
from email.policy import default
|
||||||
|
from email.parser import BytesParser
|
||||||
|
|
||||||
class EmailParser(Parser):
|
class EmailParser(Parser):
|
||||||
def __init__(self, _class=None, policy=default): # don't default to compat32 policy
|
def __init__(self, _class=None, policy=default): # don't default to compat32 policy
|
||||||
super(EmailParser, self).__init__(_class, policy=policy)
|
super(EmailParser, self).__init__(_class, policy=policy)
|
||||||
|
|
||||||
|
class EmailBytesParser(BytesParser):
|
||||||
|
def __init__(self, _class=None, policy=default): # don't default to compat32 policy
|
||||||
|
super(EmailBytesParser, self).__init__(_class, policy=policy)
|
||||||
|
|
||||||
except ImportError:
|
except ImportError:
|
||||||
# Pre-Python 3.3 email package: try to work around some bugs
|
# Pre-Python 3.3 email package: try to work around some bugs
|
||||||
import re
|
|
||||||
from email.header import decode_header
|
from email.header import decode_header
|
||||||
|
from collections import deque
|
||||||
|
|
||||||
class EmailParser(Parser):
|
class EmailParser(Parser):
|
||||||
def parsestr(self, text, headersonly=False):
|
def parse(self, fp, headersonly=False):
|
||||||
# Older Parser doesn't correctly unfold headers (RFC5322 section 2.2.3).
|
# Older Parser doesn't correctly unfold headers (RFC5322 section 2.2.3).
|
||||||
# Help it out by pre-unfolding the headers for it.
|
# Help it out by pre-unfolding the headers for it.
|
||||||
# This only works for root headers, not ones within a MIME subpart.
|
fp = HeaderUnfoldingWrapper(fp)
|
||||||
# (Finding subpart headers requires actually parsing the message.)
|
message = Parser.parse(self, fp, headersonly=headersonly)
|
||||||
headers, body = _split_headers_and_body(text)
|
|
||||||
unfolded = "".join([_unfold_headers(headers), body])
|
|
||||||
message = Parser.parsestr(self, unfolded, headersonly=headersonly)
|
|
||||||
|
|
||||||
# Older Parser doesn't decode RFC2047 headers, so fix them up here.
|
# Older Parser doesn't decode RFC2047 headers, so fix them up here.
|
||||||
# (Since message is fully parsed, can decode headers in all MIME subparts.)
|
# (Since message is fully parsed, can decode headers in all MIME subparts.)
|
||||||
@@ -42,29 +44,74 @@ except ImportError:
|
|||||||
for name, value in part._headers]
|
for name, value in part._headers]
|
||||||
return message
|
return message
|
||||||
|
|
||||||
# Note: email.feedparser.headerRE is a more-complicated RE for recognizing headers.
|
class EmailBytesParser(EmailParser):
|
||||||
# It tries to support defective messages missing a blank line between headers and body
|
def parsebytes(self, text, headersonly=False):
|
||||||
# (but introduces other problems, e.g., https://bugs.python.org/issue26686).
|
# In Python 2, bytes is str, and Parser.parsestr uses bytes-friendly cStringIO.StringIO.
|
||||||
# Since those messages are already out of spec, this code doesn't worry about them.
|
return self.parsestr(text, headersonly)
|
||||||
_body_sep_re = re.compile(r'(\r\n|\r|\n)(\1)') # "an empty line" allowing CRLF, CR, or LF endings (but not mixed)
|
|
||||||
_header_fold_re = re.compile(r'(\r\n|\r|\n)(?=[ \t])') # "any CRLF that is immediately followed by WSP"
|
|
||||||
|
|
||||||
def _split_headers_and_body(text):
|
class HeaderUnfoldingWrapper:
|
||||||
# RFC5322 section 2.1:
|
"""
|
||||||
# "The body ... is separated from the header section by an empty line (i.e., a line with nothing
|
A wrapper for file-like objects passed to email.parser.Parser.parse which works
|
||||||
# preceding the CRLF)." (And per email.parser semantics, this allows CRLF, CR, or LF endings)
|
around older Parser bugs with folded email headers by pre-unfolding them.
|
||||||
parts = _body_sep_re.split(text, maxsplit=1) # [headers, sep, sep, body] or just [headers]
|
|
||||||
|
This only works for headers at the message root, not ones within a MIME subpart.
|
||||||
|
(Accurately recognizing subpart headers would require parsing mixed-content boundaries.)
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, fp):
|
||||||
|
self.fp = fp
|
||||||
|
self._in_headers = True
|
||||||
|
self._pushback = deque()
|
||||||
|
|
||||||
|
def _readline(self, limit=-1):
|
||||||
try:
|
try:
|
||||||
return "".join(parts[0:2]), "".join(parts[2:])
|
line = self._pushback.popleft()
|
||||||
except IndexError:
|
except IndexError:
|
||||||
assert len(parts) == 1
|
line = self.fp.readline(limit)
|
||||||
return parts[0], ""
|
# cStringIO.readline doesn't recognize universal newlines; splitlines does
|
||||||
|
lines = line.splitlines(True)
|
||||||
|
if len(lines) > 1:
|
||||||
|
line = lines[0]
|
||||||
|
self._pushback.extend(lines[1:])
|
||||||
|
return line
|
||||||
|
|
||||||
def _unfold_headers(text):
|
def _peekline(self, limit=-1):
|
||||||
# RFC5322 section 2.2.3:
|
try:
|
||||||
# "Unfolding is accomplished by simply removing any CRLF that is immediately followed by WSP"
|
line = self._pushback[0]
|
||||||
# (WSP is space or tab, and per email.parser semantics, this allows CRLF, CR, or LF endings)
|
except IndexError:
|
||||||
return _header_fold_re.sub("", text)
|
line = self._readline(limit)
|
||||||
|
self._pushback.appendleft(line)
|
||||||
|
return line
|
||||||
|
|
||||||
|
def readline(self, limit=-1):
|
||||||
|
line = self._readline(limit)
|
||||||
|
if self._in_headers:
|
||||||
|
line_without_end = line.rstrip("\r\n") # CRLF, CR, or LF -- "universal newlines"
|
||||||
|
if len(line_without_end) == 0:
|
||||||
|
# RFC5322 section 2.1: "The body ... is separated from the header section
|
||||||
|
# by an empty line (i.e., a line with nothing preceding the CRLF)."
|
||||||
|
self._in_headers = False
|
||||||
|
else:
|
||||||
|
# Is this header line folded? Need to check next line...
|
||||||
|
# RFC5322 section 2.2.3: "Unfolding is accomplished by simply removing any CRLF
|
||||||
|
# that is immediately followed by WSP." (WSP is space or tab)
|
||||||
|
next_line = self._peekline(limit)
|
||||||
|
if next_line.startswith((' ', '\t')):
|
||||||
|
line = line_without_end
|
||||||
|
return line
|
||||||
|
|
||||||
|
def read(self, size):
|
||||||
|
if self._in_headers:
|
||||||
|
# For simplicity, just read a line at a time while in the header section.
|
||||||
|
# (This works because we know email.parser.Parser doesn't really care if it reads
|
||||||
|
# more or less data than it asked for -- it just pushes it into FeedParser either way.)
|
||||||
|
return self.readline(size)
|
||||||
|
elif len(self._pushback):
|
||||||
|
buf = ''.join(self._pushback)
|
||||||
|
self._pushback.clear()
|
||||||
|
return buf
|
||||||
|
else:
|
||||||
|
return self.fp.read(size)
|
||||||
|
|
||||||
def _decode_rfc2047(value):
|
def _decode_rfc2047(value):
|
||||||
result = value
|
result = value
|
||||||
@@ -278,6 +325,19 @@ class AnymailInboundMessage(Message, object): # `object` ensures new-style clas
|
|||||||
"""Returns a new AnymailInboundMessage parsed from str s"""
|
"""Returns a new AnymailInboundMessage parsed from str s"""
|
||||||
return EmailParser(cls).parsestr(s)
|
return EmailParser(cls).parsestr(s)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def parse_raw_mime_bytes(cls, b):
|
||||||
|
"""Returns a new AnymailInboundMessage parsed from bytes b"""
|
||||||
|
return EmailBytesParser(cls).parsebytes(b)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def parse_raw_mime_file(cls, fp):
|
||||||
|
"""Returns a new AnymailInboundMessage parsed from file-like object fp"""
|
||||||
|
if isinstance(fp.read(0), six.binary_type):
|
||||||
|
return EmailBytesParser(cls).parse(fp)
|
||||||
|
else:
|
||||||
|
return EmailParser(cls).parse(fp)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def construct(cls, raw_headers=None, from_email=None, to=None, cc=None, subject=None, headers=None,
|
def construct(cls, raw_headers=None, from_email=None, to=None, cc=None, subject=None, headers=None,
|
||||||
text=None, text_charset='utf-8', html=None, html_charset='utf-8',
|
text=None, text_charset='utf-8', html=None, html_charset='utf-8',
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ from django.test import SimpleTestCase
|
|||||||
|
|
||||||
from anymail.inbound import AnymailInboundMessage
|
from anymail.inbound import AnymailInboundMessage
|
||||||
|
|
||||||
from .utils import SAMPLE_IMAGE_FILENAME, python_has_broken_mime_param_handling, sample_image_content
|
from .utils import SAMPLE_IMAGE_FILENAME, python_has_broken_mime_param_handling, sample_email_path, sample_image_content
|
||||||
|
|
||||||
SAMPLE_IMAGE_CONTENT = sample_image_content()
|
SAMPLE_IMAGE_CONTENT = sample_image_content()
|
||||||
|
|
||||||
@@ -153,6 +153,35 @@ class AnymailInboundMessageConstructionTests(SimpleTestCase):
|
|||||||
|
|
||||||
# (see test_attachment_as_uploaded_file below for parsing basic attachment from raw mime)
|
# (see test_attachment_as_uploaded_file below for parsing basic attachment from raw mime)
|
||||||
|
|
||||||
|
def test_parse_raw_mime_bytes(self):
|
||||||
|
raw = (
|
||||||
|
b'Content-Type: text/plain; charset=ISO-8859-3\r\n'
|
||||||
|
b'Content-Transfer-Encoding: 8bit\r\n'
|
||||||
|
b'Subject: Test bytes\r\n'
|
||||||
|
b'\r\n'
|
||||||
|
b'\xD8i estas retpo\xFEto.\r\n')
|
||||||
|
msg = AnymailInboundMessage.parse_raw_mime_bytes(raw)
|
||||||
|
self.assertEqual(msg['Subject'], "Test bytes")
|
||||||
|
self.assertEqual(msg.get_content_text(), "Ĝi estas retpoŝto.\r\n")
|
||||||
|
self.assertEqual(msg.get_content_bytes(), b'\xD8i estas retpo\xFEto.\r\n')
|
||||||
|
self.assertEqual(msg.defects, [])
|
||||||
|
|
||||||
|
def test_parse_raw_mime_file_text(self):
|
||||||
|
with open(sample_email_path(), mode="r") as fp:
|
||||||
|
msg = AnymailInboundMessage.parse_raw_mime_file(fp)
|
||||||
|
self.assertEqual(msg["Subject"], "Test email")
|
||||||
|
self.assertEqual(msg.text, "Hi Bob, This is a message. Thanks!\n")
|
||||||
|
self.assertEqual(msg.get_all("Received"), [ # this is the first line in the sample email file
|
||||||
|
"by luna.mailgun.net with SMTP mgrt 8734663311733; Fri, 03 May 2013 18:26:27 +0000"])
|
||||||
|
|
||||||
|
def test_parse_raw_mime_file_bytes(self):
|
||||||
|
with open(sample_email_path(), mode="rb") as fp:
|
||||||
|
msg = AnymailInboundMessage.parse_raw_mime_file(fp)
|
||||||
|
self.assertEqual(msg["Subject"], "Test email")
|
||||||
|
self.assertEqual(msg.text, "Hi Bob, This is a message. Thanks!\n")
|
||||||
|
self.assertEqual(msg.get_all("Received"), [ # this is the first line in the sample email file
|
||||||
|
"by luna.mailgun.net with SMTP mgrt 8734663311733; Fri, 03 May 2013 18:26:27 +0000"])
|
||||||
|
|
||||||
|
|
||||||
class AnymailInboundMessageConveniencePropTests(SimpleTestCase):
|
class AnymailInboundMessageConveniencePropTests(SimpleTestCase):
|
||||||
# AnymailInboundMessage defines several properties to simplify reading
|
# AnymailInboundMessage defines several properties to simplify reading
|
||||||
@@ -470,12 +499,13 @@ class EmailParserWorkaroundTests(SimpleTestCase):
|
|||||||
Not-A-Header: This is the body.
|
Not-A-Header: This is the body.
|
||||||
It is not folded.
|
It is not folded.
|
||||||
""")
|
""")
|
||||||
msg = AnymailInboundMessage.parse_raw_mime(raw)
|
for end in ('\n', '\r', '\r\n'): # check NL, CR, and CRNL line-endings
|
||||||
|
msg = AnymailInboundMessage.parse_raw_mime(raw.replace('\n', end))
|
||||||
self.assertEqual(msg['Subject'], "This subject uses header folding")
|
self.assertEqual(msg['Subject'], "This subject uses header folding")
|
||||||
self.assertEqual(msg["X-Json"],
|
self.assertEqual(msg["X-Json"],
|
||||||
'{"problematic": ["encoded newline\\n", "comma,semi;no space"]}')
|
'{"problematic": ["encoded newline\\n", "comma,semi;no space"]}')
|
||||||
self.assertEqual(msg.get_content_text(),
|
self.assertEqual(msg.get_content_text(),
|
||||||
"Not-A-Header: This is the body.\n It is not folded.\n")
|
"Not-A-Header: This is the body.{end} It is not folded.{end}".format(end=end))
|
||||||
self.assertEqual(msg.defects, [])
|
self.assertEqual(msg.defects, [])
|
||||||
|
|
||||||
def test_parse_encoded_headers(self):
|
def test_parse_encoded_headers(self):
|
||||||
|
|||||||
Reference in New Issue
Block a user