mirror of
https://github.com/pacnpal/django-anymail.git
synced 2025-12-20 11:51:05 -05:00
Mandrill: include auth in webhook signature calc
Mandrill's webhook signature calculation uses the *exact url* Mandrill is posting to. If HTTP basic auth is also used, that auth is included in the url. Anymail was using Django's request.build_absolute_uri, which doesn't include HTTP basic auth. Anymail now includes the auth in the calculation, if it was present in the request. This should eliminate the need to use the ANYMAIL_MANDRILL_WEBHOOK_URL override, if Django's SECURE_PROXY_SSL_HEADER and USE_X_FORWARDED_HOST (and/or USE_X_FORWARDED_PROTO) settings are correct for your server. (The calculated url is now also included in the validation failure error message, to aid debugging.) Fixes #48
This commit is contained in:
@@ -1,3 +1,4 @@
|
|||||||
|
import base64
|
||||||
import mimetypes
|
import mimetypes
|
||||||
from base64 import b64encode
|
from base64 import b64encode
|
||||||
from collections import Mapping, MutableMapping
|
from collections import Mapping, MutableMapping
|
||||||
@@ -12,6 +13,8 @@ from django.core.mail.message import sanitize_address, DEFAULT_ATTACHMENT_MIME_T
|
|||||||
from django.utils.encoding import force_text
|
from django.utils.encoding import force_text
|
||||||
from django.utils.functional import Promise
|
from django.utils.functional import Promise
|
||||||
from django.utils.timezone import utc
|
from django.utils.timezone import utc
|
||||||
|
# noinspection PyUnresolvedReferences
|
||||||
|
from six.moves.urllib.parse import urlsplit, urlunsplit
|
||||||
|
|
||||||
from .exceptions import AnymailConfigurationError, AnymailInvalidAddress
|
from .exceptions import AnymailConfigurationError, AnymailInvalidAddress
|
||||||
|
|
||||||
@@ -342,3 +345,33 @@ def force_non_lazy_dict(obj):
|
|||||||
return {key: force_non_lazy_dict(value) for key, value in obj.items()}
|
return {key: force_non_lazy_dict(value) for key, value in obj.items()}
|
||||||
except (AttributeError, TypeError):
|
except (AttributeError, TypeError):
|
||||||
return force_non_lazy(obj)
|
return force_non_lazy(obj)
|
||||||
|
|
||||||
|
|
||||||
|
def get_request_basic_auth(request):
|
||||||
|
"""Returns HTTP basic auth string sent with request, or None.
|
||||||
|
|
||||||
|
If request includes basic auth, result is string 'username:password'.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
authtype, authdata = request.META['HTTP_AUTHORIZATION'].split()
|
||||||
|
if authtype.lower() == "basic":
|
||||||
|
return base64.b64decode(authdata).decode('utf-8')
|
||||||
|
except (IndexError, KeyError, TypeError, ValueError):
|
||||||
|
pass
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_request_uri(request):
|
||||||
|
"""Returns the "exact" url used to call request.
|
||||||
|
|
||||||
|
Like :func:`django.http.request.HTTPRequest.build_absolute_uri`,
|
||||||
|
but also inlines HTTP basic auth, if present.
|
||||||
|
"""
|
||||||
|
url = request.build_absolute_uri()
|
||||||
|
basic_auth = get_request_basic_auth(request)
|
||||||
|
if basic_auth is not None:
|
||||||
|
# must reassemble url with auth
|
||||||
|
parts = urlsplit(url)
|
||||||
|
url = urlunsplit((parts.scheme, basic_auth + '@' + parts.netloc,
|
||||||
|
parts.path, parts.query, parts.fragment))
|
||||||
|
return url
|
||||||
|
|||||||
@@ -1,15 +1,14 @@
|
|||||||
import base64
|
|
||||||
import re
|
import re
|
||||||
import six
|
|
||||||
import warnings
|
import warnings
|
||||||
|
|
||||||
|
import six
|
||||||
from django.http import HttpResponse
|
from django.http import HttpResponse
|
||||||
from django.utils.decorators import method_decorator
|
from django.utils.decorators import method_decorator
|
||||||
from django.views.decorators.csrf import csrf_exempt
|
from django.views.decorators.csrf import csrf_exempt
|
||||||
from django.views.generic import View
|
from django.views.generic import View
|
||||||
|
|
||||||
from ..exceptions import AnymailInsecureWebhookWarning, AnymailWebhookValidationFailure
|
from ..exceptions import AnymailInsecureWebhookWarning, AnymailWebhookValidationFailure
|
||||||
from ..utils import get_anymail_setting, collect_all_methods
|
from ..utils import get_anymail_setting, collect_all_methods, get_request_basic_auth
|
||||||
|
|
||||||
|
|
||||||
class AnymailBasicAuthMixin(object):
|
class AnymailBasicAuthMixin(object):
|
||||||
@@ -42,16 +41,8 @@ class AnymailBasicAuthMixin(object):
|
|||||||
def validate_request(self, request):
|
def validate_request(self, request):
|
||||||
"""If configured for webhook basic auth, validate request has correct auth."""
|
"""If configured for webhook basic auth, validate request has correct auth."""
|
||||||
if self.basic_auth:
|
if self.basic_auth:
|
||||||
valid = False
|
basic_auth = get_request_basic_auth(request)
|
||||||
try:
|
if basic_auth is None or basic_auth not in self.basic_auth:
|
||||||
authtype, authdata = request.META['HTTP_AUTHORIZATION'].split()
|
|
||||||
if authtype.lower() == "basic":
|
|
||||||
auth = base64.b64decode(authdata).decode('utf-8')
|
|
||||||
if auth in self.basic_auth:
|
|
||||||
valid = True
|
|
||||||
except (IndexError, KeyError, TypeError, ValueError):
|
|
||||||
valid = False
|
|
||||||
if not valid:
|
|
||||||
# noinspection PyUnresolvedReferences
|
# noinspection PyUnresolvedReferences
|
||||||
raise AnymailWebhookValidationFailure(
|
raise AnymailWebhookValidationFailure(
|
||||||
"Missing or invalid basic auth in Anymail %s webhook" % self.esp_name)
|
"Missing or invalid basic auth in Anymail %s webhook" % self.esp_name)
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ from django.utils.timezone import utc
|
|||||||
from .base import AnymailBaseWebhookView
|
from .base import AnymailBaseWebhookView
|
||||||
from ..exceptions import AnymailWebhookValidationFailure, AnymailConfigurationError
|
from ..exceptions import AnymailWebhookValidationFailure, AnymailConfigurationError
|
||||||
from ..signals import tracking, AnymailTrackingEvent, EventType
|
from ..signals import tracking, AnymailTrackingEvent, EventType
|
||||||
from ..utils import get_anymail_setting, getfirst
|
from ..utils import get_anymail_setting, getfirst, get_request_uri
|
||||||
|
|
||||||
|
|
||||||
class MandrillSignatureMixin(object):
|
class MandrillSignatureMixin(object):
|
||||||
@@ -45,16 +45,18 @@ class MandrillSignatureMixin(object):
|
|||||||
except KeyError:
|
except KeyError:
|
||||||
raise AnymailWebhookValidationFailure("X-Mandrill-Signature header missing from webhook POST")
|
raise AnymailWebhookValidationFailure("X-Mandrill-Signature header missing from webhook POST")
|
||||||
|
|
||||||
# Mandrill signs the exact URL plus the sorted POST params:
|
# Mandrill signs the exact URL (including basic auth, if used) plus the sorted POST params:
|
||||||
signed_data = self.webhook_url or request.build_absolute_uri()
|
url = self.webhook_url or get_request_uri(request)
|
||||||
params = request.POST.dict()
|
params = request.POST.dict()
|
||||||
|
signed_data = url
|
||||||
for key in sorted(params.keys()):
|
for key in sorted(params.keys()):
|
||||||
signed_data += key + params[key]
|
signed_data += key + params[key]
|
||||||
|
|
||||||
expected_signature = b64encode(hmac.new(key=self.webhook_key, msg=signed_data.encode('utf-8'),
|
expected_signature = b64encode(hmac.new(key=self.webhook_key, msg=signed_data.encode('utf-8'),
|
||||||
digestmod=hashlib.sha1).digest())
|
digestmod=hashlib.sha1).digest())
|
||||||
if not constant_time_compare(signature, expected_signature):
|
if not constant_time_compare(signature, expected_signature):
|
||||||
raise AnymailWebhookValidationFailure("Mandrill webhook called with incorrect signature")
|
raise AnymailWebhookValidationFailure(
|
||||||
|
"Mandrill webhook called with incorrect signature (for url %r)" % url)
|
||||||
|
|
||||||
|
|
||||||
class MandrillBaseWebhookView(MandrillSignatureMixin, AnymailBaseWebhookView):
|
class MandrillBaseWebhookView(MandrillSignatureMixin, AnymailBaseWebhookView):
|
||||||
|
|||||||
@@ -19,21 +19,31 @@ from .webhook_cases import WebhookTestCase, WebhookBasicAuthTestsMixin
|
|||||||
TEST_WEBHOOK_KEY = 'TEST_WEBHOOK_KEY'
|
TEST_WEBHOOK_KEY = 'TEST_WEBHOOK_KEY'
|
||||||
|
|
||||||
|
|
||||||
def mandrill_args(events=None, url='/anymail/mandrill/tracking/', key=TEST_WEBHOOK_KEY):
|
def mandrill_args(events=None,
|
||||||
|
host="http://testserver/", # Django test-client default
|
||||||
|
path='/anymail/mandrill/tracking/', # Anymail urlconf default
|
||||||
|
auth="username:password", # WebhookTestCase default
|
||||||
|
key=TEST_WEBHOOK_KEY):
|
||||||
"""Returns TestClient.post kwargs for Mandrill webhook call with events
|
"""Returns TestClient.post kwargs for Mandrill webhook call with events
|
||||||
|
|
||||||
Computes correct signature.
|
Computes correct signature.
|
||||||
"""
|
"""
|
||||||
if events is None:
|
if events is None:
|
||||||
events = []
|
events = []
|
||||||
url = urljoin('http://testserver/', url)
|
test_client_path = urljoin(host, path) # https://testserver/anymail/mandrill/tracking/
|
||||||
|
if auth:
|
||||||
|
# we can get away with this simplification in these controlled tests,
|
||||||
|
# but don't ever construct urls like this in production code -- it's not safe!
|
||||||
|
full_url = test_client_path.replace("://", "://" + auth + "@")
|
||||||
|
else:
|
||||||
|
full_url = test_client_path
|
||||||
mandrill_events = json.dumps(events)
|
mandrill_events = json.dumps(events)
|
||||||
signed_data = url + 'mandrill_events' + mandrill_events
|
signed_data = full_url + 'mandrill_events' + mandrill_events
|
||||||
signature = b64encode(hmac.new(key=key.encode('ascii'),
|
signature = b64encode(hmac.new(key=key.encode('ascii'),
|
||||||
msg=signed_data.encode('utf-8'),
|
msg=signed_data.encode('utf-8'),
|
||||||
digestmod=hashlib.sha1).digest())
|
digestmod=hashlib.sha1).digest())
|
||||||
return {
|
return {
|
||||||
'path': url,
|
'path': test_client_path,
|
||||||
'data': {'mandrill_events': mandrill_events},
|
'data': {'mandrill_events': mandrill_events},
|
||||||
'HTTP_X_MANDRILL_SIGNATURE': signature,
|
'HTTP_X_MANDRILL_SIGNATURE': signature,
|
||||||
}
|
}
|
||||||
@@ -78,6 +88,43 @@ class MandrillWebhookSecurityTestCase(WebhookTestCase, WebhookBasicAuthTestsMixi
|
|||||||
response = self.client.post(**kwargs)
|
response = self.client.post(**kwargs)
|
||||||
self.assertEqual(response.status_code, 400)
|
self.assertEqual(response.status_code, 400)
|
||||||
|
|
||||||
|
@override_settings(ANYMAIL={}) # clear WEBHOOK_AUTHORIZATION from WebhookTestCase
|
||||||
|
def test_no_basic_auth(self):
|
||||||
|
# Signature validation should work properly if you're not using basic auth
|
||||||
|
self.clear_basic_auth()
|
||||||
|
kwargs = mandrill_args([{'event': 'send'}], auth="")
|
||||||
|
response = self.client.post(**kwargs)
|
||||||
|
self.assertEqual(response.status_code, 200)
|
||||||
|
|
||||||
|
@override_settings(ANYMAIL={
|
||||||
|
"MANDRILL_WEBHOOK_URL": "https://abcde:12345@example.com/anymail/mandrill/tracking/",
|
||||||
|
"WEBHOOK_AUTHORIZATION": "abcde:12345",
|
||||||
|
})
|
||||||
|
def test_webhook_url_setting(self):
|
||||||
|
# If Django can't build_absolute_uri correctly (e.g., because your proxy
|
||||||
|
# frontend isn't setting the proxy headers correctly), you must set
|
||||||
|
# MANDRILL_WEBHOOK_URL to the actual public url where Mandrill calls the webhook.
|
||||||
|
self.set_basic_auth("abcde", "12345")
|
||||||
|
kwargs = mandrill_args([{'event': 'send'}], host="https://example.com/", auth="abcde:12345")
|
||||||
|
response = self.client.post(SERVER_NAME="127.0.0.1", **kwargs)
|
||||||
|
self.assertEqual(response.status_code, 200)
|
||||||
|
|
||||||
|
# override WebhookBasicAuthTestsMixin version of this test
|
||||||
|
@override_settings(ANYMAIL={'WEBHOOK_AUTHORIZATION': ['cred1:pass1', 'cred2:pass2']})
|
||||||
|
def test_supports_credential_rotation(self):
|
||||||
|
"""You can supply a list of basic auth credentials, and any is allowed"""
|
||||||
|
self.set_basic_auth('cred1', 'pass1')
|
||||||
|
response = self.client.post(**mandrill_args(auth="cred1:pass1"))
|
||||||
|
self.assertEqual(response.status_code, 200)
|
||||||
|
|
||||||
|
self.set_basic_auth('cred2', 'pass2')
|
||||||
|
response = self.client.post(**mandrill_args(auth="cred2:pass2"))
|
||||||
|
self.assertEqual(response.status_code, 200)
|
||||||
|
|
||||||
|
self.set_basic_auth('baduser', 'wrongpassword')
|
||||||
|
response = self.client.post(**mandrill_args(auth="baduser:wrongpassword"))
|
||||||
|
self.assertEqual(response.status_code, 400)
|
||||||
|
|
||||||
|
|
||||||
@override_settings(ANYMAIL_MANDRILL_WEBHOOK_KEY=TEST_WEBHOOK_KEY)
|
@override_settings(ANYMAIL_MANDRILL_WEBHOOK_KEY=TEST_WEBHOOK_KEY)
|
||||||
class MandrillTrackingTestCase(WebhookTestCase):
|
class MandrillTrackingTestCase(WebhookTestCase):
|
||||||
|
|||||||
@@ -1,11 +1,16 @@
|
|||||||
# Tests for the anymail/utils.py module
|
# Tests for the anymail/utils.py module
|
||||||
# (not to be confused with utilities for testing found in in tests/utils.py)
|
# (not to be confused with utilities for testing found in in tests/utils.py)
|
||||||
|
import base64
|
||||||
|
|
||||||
import six
|
import six
|
||||||
from django.test import SimpleTestCase
|
from django.test import SimpleTestCase, RequestFactory, override_settings
|
||||||
from django.utils.translation import ugettext_lazy, string_concat
|
from django.utils.translation import ugettext_lazy, string_concat
|
||||||
|
|
||||||
from anymail.exceptions import AnymailInvalidAddress
|
from anymail.exceptions import AnymailInvalidAddress
|
||||||
from anymail.utils import ParsedEmail, is_lazy, force_non_lazy, force_non_lazy_dict, force_non_lazy_list, update_deep
|
from anymail.utils import (ParsedEmail,
|
||||||
|
is_lazy, force_non_lazy, force_non_lazy_dict, force_non_lazy_list,
|
||||||
|
update_deep,
|
||||||
|
get_request_uri, get_request_basic_auth)
|
||||||
|
|
||||||
|
|
||||||
class ParsedEmailTests(SimpleTestCase):
|
class ParsedEmailTests(SimpleTestCase):
|
||||||
@@ -142,3 +147,61 @@ class UpdateDeepTests(SimpleTestCase):
|
|||||||
second = defaultdict(None, a=dict(a2=2))
|
second = defaultdict(None, a=dict(a2=2))
|
||||||
update_deep(first, second)
|
update_deep(first, second)
|
||||||
self.assertEqual(first, {'a': {'a1': 1, 'a2': 2}, 'c': {'c1': 1}})
|
self.assertEqual(first, {'a': {'a1': 1, 'a2': 2}, 'c': {'c1': 1}})
|
||||||
|
|
||||||
|
|
||||||
|
class RequestUtilsTests(SimpleTestCase):
|
||||||
|
"""Test utils.get_request_* helpers"""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.request_factory = RequestFactory()
|
||||||
|
super(RequestUtilsTests, self).setUp()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def basic_auth(username, password):
|
||||||
|
"""Return HTTP_AUTHORIZATION header value for basic auth with username, password"""
|
||||||
|
credentials = base64.b64encode("{}:{}".format(username, password).encode('utf-8')).decode('utf-8')
|
||||||
|
return "Basic {}".format(credentials)
|
||||||
|
|
||||||
|
def test_get_request_basic_auth(self):
|
||||||
|
# without auth:
|
||||||
|
request = self.request_factory.post('/path/to/?query',
|
||||||
|
HTTP_HOST='www.example.com',
|
||||||
|
HTTP_SCHEME='https')
|
||||||
|
self.assertIsNone(get_request_basic_auth(request))
|
||||||
|
|
||||||
|
# with basic auth:
|
||||||
|
request = self.request_factory.post('/path/to/?query',
|
||||||
|
HTTP_HOST='www.example.com',
|
||||||
|
HTTP_AUTHORIZATION=self.basic_auth('user', 'pass'))
|
||||||
|
self.assertEqual(get_request_basic_auth(request), "user:pass")
|
||||||
|
|
||||||
|
# with some other auth
|
||||||
|
request = self.request_factory.post('/path/to/?query',
|
||||||
|
HTTP_HOST='www.example.com',
|
||||||
|
HTTP_AUTHORIZATION="Bearer abcde12345")
|
||||||
|
self.assertIsNone(get_request_basic_auth(request))
|
||||||
|
|
||||||
|
def test_get_request_uri(self):
|
||||||
|
# without auth:
|
||||||
|
request = self.request_factory.post('/path/to/?query', secure=True,
|
||||||
|
HTTP_HOST='www.example.com')
|
||||||
|
self.assertEqual(get_request_uri(request),
|
||||||
|
"https://www.example.com/path/to/?query")
|
||||||
|
|
||||||
|
# with basic auth:
|
||||||
|
request = self.request_factory.post('/path/to/?query', secure=True,
|
||||||
|
HTTP_HOST='www.example.com',
|
||||||
|
HTTP_AUTHORIZATION=self.basic_auth('user', 'pass'))
|
||||||
|
self.assertEqual(get_request_uri(request),
|
||||||
|
"https://user:pass@www.example.com/path/to/?query")
|
||||||
|
|
||||||
|
@override_settings(SECURE_PROXY_SSL_HEADER=('HTTP_X_FORWARDED_PROTO', 'https'),
|
||||||
|
USE_X_FORWARDED_HOST=True)
|
||||||
|
def test_get_request_uri_with_proxy(self):
|
||||||
|
request = self.request_factory.post('/path/to/?query', secure=False,
|
||||||
|
HTTP_HOST='web1.internal',
|
||||||
|
HTTP_X_FORWARDED_PROTO='https',
|
||||||
|
HTTP_X_FORWARDED_HOST='secret.example.com:8989',
|
||||||
|
HTTP_AUTHORIZATION=self.basic_auth('user', 'pass'))
|
||||||
|
self.assertEqual(get_request_uri(request),
|
||||||
|
"https://user:pass@secret.example.com:8989/path/to/?query")
|
||||||
|
|||||||
@@ -43,6 +43,9 @@ class WebhookTestCase(AnymailTestMixin, SimpleTestCase):
|
|||||||
credentials = base64.b64encode("{}:{}".format(username, password).encode('utf-8')).decode('utf-8')
|
credentials = base64.b64encode("{}:{}".format(username, password).encode('utf-8')).decode('utf-8')
|
||||||
self.client.defaults['HTTP_AUTHORIZATION'] = "Basic {}".format(credentials)
|
self.client.defaults['HTTP_AUTHORIZATION'] = "Basic {}".format(credentials)
|
||||||
|
|
||||||
|
def clear_basic_auth(self):
|
||||||
|
self.client.defaults.pop('HTTP_AUTHORIZATION', None)
|
||||||
|
|
||||||
def assert_handler_called_once_with(self, mockfn, *expected_args, **expected_kwargs):
|
def assert_handler_called_once_with(self, mockfn, *expected_args, **expected_kwargs):
|
||||||
"""Verifies mockfn was called with expected_args and at least expected_kwargs.
|
"""Verifies mockfn was called with expected_args and at least expected_kwargs.
|
||||||
|
|
||||||
@@ -98,7 +101,7 @@ class WebhookBasicAuthTestsMixin(object):
|
|||||||
self.assertEqual(response.status_code, 400)
|
self.assertEqual(response.status_code, 400)
|
||||||
|
|
||||||
def test_verifies_missing_auth(self):
|
def test_verifies_missing_auth(self):
|
||||||
del self.client.defaults['HTTP_AUTHORIZATION']
|
self.clear_basic_auth()
|
||||||
response = self.call_webhook()
|
response = self.call_webhook()
|
||||||
self.assertEqual(response.status_code, 400)
|
self.assertEqual(response.status_code, 400)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user