mirror of
https://github.com/pacnpal/django-anymail.git
synced 2025-12-20 03:41:05 -05:00
128 lines
4.8 KiB
Python
128 lines
4.8 KiB
Python
import base64
|
|
|
|
from django.test import override_settings, SimpleTestCase
|
|
from mock import create_autospec, ANY
|
|
|
|
from anymail.exceptions import AnymailInsecureWebhookWarning
|
|
from anymail.signals import tracking, inbound
|
|
|
|
from .utils import AnymailTestMixin, ClientWithCsrfChecks
|
|
|
|
|
|
def event_handler(sender, event, esp_name, **kwargs):
|
|
"""Prototypical webhook signal handler"""
|
|
pass
|
|
|
|
|
|
@override_settings(ANYMAIL={'WEBHOOK_AUTHORIZATION': 'username:password'})
|
|
class WebhookTestCase(AnymailTestMixin, SimpleTestCase):
|
|
"""Base for testing webhooks
|
|
|
|
- connects webhook signal handlers
|
|
- sets up basic auth by default (since most ESP webhooks warn if it's not enabled)
|
|
"""
|
|
|
|
client_class = ClientWithCsrfChecks
|
|
|
|
def setUp(self):
|
|
super(WebhookTestCase, self).setUp()
|
|
# Use correct basic auth by default (individual tests can override):
|
|
self.set_basic_auth()
|
|
|
|
# Install mocked signal handlers
|
|
self.tracking_handler = create_autospec(event_handler)
|
|
tracking.connect(self.tracking_handler)
|
|
self.addCleanup(tracking.disconnect, self.tracking_handler)
|
|
|
|
self.inbound_handler = create_autospec(event_handler)
|
|
inbound.connect(self.inbound_handler)
|
|
self.addCleanup(inbound.disconnect, self.inbound_handler)
|
|
|
|
def set_basic_auth(self, username='username', password='password'):
|
|
"""Set basic auth for all subsequent test client requests"""
|
|
credentials = base64.b64encode("{}:{}".format(username, password).encode('utf-8')).decode('utf-8')
|
|
self.client.defaults['HTTP_AUTHORIZATION'] = "Basic {}".format(credentials)
|
|
|
|
def clear_basic_auth(self):
|
|
self.client.defaults.pop('HTTP_AUTHORIZATION', None)
|
|
|
|
def assert_handler_called_once_with(self, mockfn, *expected_args, **expected_kwargs):
|
|
"""Verifies mockfn was called with expected_args and at least expected_kwargs.
|
|
|
|
Ignores *additional* actual kwargs (which might be added by Django signal dispatch).
|
|
(This differs from mock.assert_called_once_with.)
|
|
|
|
Returns the actual kwargs.
|
|
"""
|
|
self.assertEqual(mockfn.call_count, 1)
|
|
actual_args, actual_kwargs = mockfn.call_args
|
|
self.assertEqual(actual_args, expected_args)
|
|
for key, expected_value in expected_kwargs.items():
|
|
if expected_value is ANY:
|
|
self.assertIn(key, actual_kwargs)
|
|
else:
|
|
self.assertEqual(actual_kwargs[key], expected_value)
|
|
return actual_kwargs
|
|
|
|
def get_kwargs(self, mockfn):
|
|
"""Return the kwargs passed to the most recent call to mockfn"""
|
|
self.assertIsNotNone(mockfn.call_args) # mockfn hasn't been called yet
|
|
actual_args, actual_kwargs = mockfn.call_args
|
|
return actual_kwargs
|
|
|
|
|
|
# noinspection PyUnresolvedReferences
|
|
class WebhookBasicAuthTestsMixin(object):
|
|
"""Common test cases for webhook basic authentication.
|
|
|
|
Instantiate for each ESP's webhooks by:
|
|
- mixing into WebhookTestCase
|
|
- defining call_webhook to invoke the ESP's webhook
|
|
"""
|
|
|
|
should_warn_if_no_auth = True # subclass set False if other webhook verification used
|
|
|
|
def call_webhook(self):
|
|
# Concrete test cases should call a webhook via self.client.post,
|
|
# and return the response
|
|
raise NotImplementedError()
|
|
|
|
@override_settings(ANYMAIL={}) # Clear the WEBHOOK_AUTH settings from superclass
|
|
def test_warns_if_no_auth(self):
|
|
if self.should_warn_if_no_auth:
|
|
with self.assertWarns(AnymailInsecureWebhookWarning):
|
|
response = self.call_webhook()
|
|
else:
|
|
with self.assertDoesNotWarn(AnymailInsecureWebhookWarning):
|
|
response = self.call_webhook()
|
|
self.assertEqual(response.status_code, 200)
|
|
|
|
def test_verifies_basic_auth(self):
|
|
response = self.call_webhook()
|
|
self.assertEqual(response.status_code, 200)
|
|
|
|
def test_verifies_bad_auth(self):
|
|
self.set_basic_auth('baduser', 'wrongpassword')
|
|
response = self.call_webhook()
|
|
self.assertEqual(response.status_code, 400)
|
|
|
|
def test_verifies_missing_auth(self):
|
|
self.clear_basic_auth()
|
|
response = self.call_webhook()
|
|
self.assertEqual(response.status_code, 400)
|
|
|
|
@override_settings(ANYMAIL={'WEBHOOK_AUTHORIZATION': ['cred1:pass1', 'cred2:pass2']})
|
|
def test_supports_credential_rotation(self):
|
|
"""You can supply a list of basic auth credentials, and any is allowed"""
|
|
self.set_basic_auth('cred1', 'pass1')
|
|
response = self.call_webhook()
|
|
self.assertEqual(response.status_code, 200)
|
|
|
|
self.set_basic_auth('cred2', 'pass2')
|
|
response = self.call_webhook()
|
|
self.assertEqual(response.status_code, 200)
|
|
|
|
self.set_basic_auth('baduser', 'wrongpassword')
|
|
response = self.call_webhook()
|
|
self.assertEqual(response.status_code, 400)
|