Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added throttling to token generation (Closes #48) #122

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
80 changes: 79 additions & 1 deletion src/django_otp/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,19 @@ def is_interactive(self):
"""
return not hasattr(self.generate_challenge, 'stub')

def generate_is_allowed(self):
"""
Checks whether it is permissible to call :meth:`generate_challenge`. If
it is allowed, returns ``(True, None)``. Otherwise returns ``(False,
data_dict)``, where ``data_dict`` contains extra information, defined
by the implementation.

This method can be used to implement throttling of token generation for
interactive devices. Client code should check this method before calling
:meth:`generate_challenge` and report problems to the user.
"""
return (True, None)

def generate_challenge(self):
"""
Generates a challenge value that the user will need to produce a token.
Expand Down Expand Up @@ -293,6 +306,68 @@ class VerifyNotAllowed:
N_FAILED_ATTEMPTS = 'N_FAILED_ATTEMPTS'


class GenerationCooldownMixin(models.Model):
"""
Mixin class for models requiring a cooldown duration between generating a challenge.
"""

last_generated_timestamp = models.DateTimeField(
null=True,
blank=True,
help_text="The last time a token was generated for this device.",
)

def generate_is_allowed(self):
"""
Checks if the time since the last token generation is greater than
configured (in seconds).
"""
dt_now = timezone.now()
if (
not self.last_generated_timestamp
or (dt_now - self.last_generated_timestamp).total_seconds()
> self.get_cooldown_duration()
):
return super().generate_is_allowed()
else:
return False, {
'reason': "COOLDOWN_DURATION_PENDING",
'next_generation_at': dt_now
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should dt_now be self.last_generated_timestamp here?

Copy link
Contributor Author

@demestav demestav Oct 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed! I have corrected it (dbc3fb5) and added a test for the message as well.

+ timedelta(seconds=self.get_cooldown_duration()),
}

def cooldown_reset(self, commit=True):
"""
Call this method to reset cooldown (normally when a successful
verification).

Pass 'commit=False' to avoid calling self.save().
"""
self.last_generated_timestamp = None

if commit:
self.save()

def verify_token(self, token):
"""
Reset the throttle if the token is valid.
"""
verified = super().verify_token(token)
if verified:
self.cooldown_reset()
return verified

@cached_property
def cooldown_enabled(self):
return self.get_cooldown_duration() > 0

def get_cooldown_duration(self):
raise NotImplementedError()

class Meta:
abstract = True


class ThrottlingMixin(models.Model):
"""
Mixin class for models that want throttling behaviour.
Expand All @@ -311,7 +386,10 @@ class ThrottlingMixin(models.Model):
null=True,
blank=True,
default=None,
help_text="A timestamp of the last failed verification attempt. Null if last attempt succeeded.",
help_text=(
"A timestamp of the last failed verification attempt. Null if last attempt"
" succeeded."
),
)

throttling_failure_count = models.PositiveIntegerField(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 3.2.16 on 2023-05-12 11:37

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('otp_email', '0004_throttling'),
]

operations = [
migrations.AddField(
model_name='emaildevice',
name='last_generated_timestamp',
field=models.DateTimeField(blank=True, help_text='The last time a token was generated for this device.', null=True),
),
]
81 changes: 53 additions & 28 deletions src/django_otp/plugins/otp_email/models.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
from django.contrib.humanize.templatetags.humanize import naturaltime
from django.core.mail import send_mail
from django.db import models
from django.template import Context, Template
from django.template.loader import get_template
from django.utils import timezone

from django_otp.models import SideChannelDevice, ThrottlingMixin
from django_otp.models import (
GenerationCooldownMixin,
SideChannelDevice,
ThrottlingMixin,
)
from django_otp.util import hex_validator, random_hex

from .conf import settings
Expand All @@ -19,7 +25,7 @@ def key_validator(value): # pragma: no cover
return hex_validator()(value)


class EmailDevice(ThrottlingMixin, SideChannelDevice):
class EmailDevice(GenerationCooldownMixin, ThrottlingMixin, SideChannelDevice):
"""
A :class:`~django_otp.models.SideChannelDevice` that delivers a token to
the email address saved in this object or alternatively to the user's
Expand Down Expand Up @@ -57,34 +63,50 @@ def generate_challenge(self, extra_context=None):
:type extra_context: dict

"""
self.generate_token(valid_secs=settings.OTP_EMAIL_TOKEN_VALIDITY)

context = {'token': self.token, **(extra_context or {})}
if settings.OTP_EMAIL_BODY_TEMPLATE:
body = Template(settings.OTP_EMAIL_BODY_TEMPLATE).render(Context(context))
else:
body = get_template(settings.OTP_EMAIL_BODY_TEMPLATE_PATH).render(context)

if settings.OTP_EMAIL_BODY_HTML_TEMPLATE:
body_html = Template(settings.OTP_EMAIL_BODY_HTML_TEMPLATE).render(
Context(context)
)
elif settings.OTP_EMAIL_BODY_HTML_TEMPLATE_PATH:
body_html = get_template(settings.OTP_EMAIL_BODY_HTML_TEMPLATE_PATH).render(
context
generate_allowed, data_dict = self.generate_is_allowed()
if generate_allowed:
self.generate_token(valid_secs=settings.OTP_EMAIL_TOKEN_VALIDITY)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is now doing two big things. Perhaps it's time to move the body of if generate_allowed: into a private method for clarity. See https://github.com/django-otp/django-otp-twilio/blob/9e1aab519c926a3d281292d549ce2040c821fe1a/src/otp_twilio/models.py#L55, e.g.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created a private method for that part (dbe61d7).

self.last_generated_timestamp = timezone.now()

context = {'token': self.token, **(extra_context or {})}
if settings.OTP_EMAIL_BODY_TEMPLATE:
body = Template(settings.OTP_EMAIL_BODY_TEMPLATE).render(
Context(context)
)
else:
body = get_template(settings.OTP_EMAIL_BODY_TEMPLATE_PATH).render(context)

if settings.OTP_EMAIL_BODY_HTML_TEMPLATE:
body_html = Template(settings.OTP_EMAIL_BODY_HTML_TEMPLATE).render(
Context(context)
)
elif settings.OTP_EMAIL_BODY_HTML_TEMPLATE_PATH:
body_html = get_template(settings.OTP_EMAIL_BODY_HTML_TEMPLATE_PATH).render(
context
)
else:
body_html = None

send_mail(
str(settings.OTP_EMAIL_SUBJECT),
body,
settings.OTP_EMAIL_SENDER,
[self.email or self.user.email],
html_message=body_html,
)
else:
body_html = None

send_mail(
str(settings.OTP_EMAIL_SUBJECT),
body,
settings.OTP_EMAIL_SENDER,
[self.email or self.user.email],
html_message=body_html,
)

message = "sent by email"
message = "sent by email"
else:
if data_dict['reason'] == 'COOLDOWN_DURATION_PENDING':
next_generation_naturaltime = naturaltime(
data_dict['next_generation_at']
)
message = (
"Token generation cooldown period has not expired yet. Next"
f" generation allowed {next_generation_naturaltime}"
)
else:
message = "Token generation is not allowed at this time"

return message

Expand All @@ -101,3 +123,6 @@ def verify_token(self, token):
verified = False

return verified

def get_cooldown_duration(self):
return settings.OTP_EMAIL_GENERATION_INTERVAL
15 changes: 14 additions & 1 deletion src/django_otp/plugins/otp_email/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from django.test.utils import override_settings

from django_otp.forms import OTPAuthenticationForm
from django_otp.tests import TestCase, ThrottlingTestMixin
from django_otp.tests import (
GenerationThrottlingTestMixin,
TestCase,
ThrottlingTestMixin,
)

from .models import EmailDevice

Expand Down Expand Up @@ -235,3 +239,12 @@ def valid_token(self):

def invalid_token(self):
return -1


@override_settings(
OTP_EMAIL_GENERATION_INTERVAL=1,
)
class GenerationThrottlingTestCase(
EmailDeviceMixin, GenerationThrottlingTestMixin, TestCase
):
pass
40 changes: 40 additions & 0 deletions src/django_otp/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from django.contrib.auth import get_user_model
from django.contrib.auth.models import AnonymousUser
from django.core import mail
from django.core.management import call_command
from django.core.management.base import CommandError
from django.db import IntegrityError, connection
Expand Down Expand Up @@ -164,6 +165,45 @@ def test_verify_is_allowed(self):
self.assertEqual(data3, None)


class GenerationThrottlingTestMixin:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this is based on ThrottlingTestMixin, which makes sense. However, it's asserting on len(mail.outbox), which is email-specific. This will presumably need at least one abstract method to actually be generic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. I have added generic tests and moved email specific tests to the email test file (29ebe3e).

def setUp(self):
self.device = None

def test_cooldown_imposed_after_successful_generation(self):
message = self.device.generate_challenge()
self.assertEqual(message, 'sent by email')
message = self.device.generate_challenge()
self.assertTrue(
message.startswith('Token generation cooldown period has not expired yet.')
)

def test_allow_generation_after_cooldown(self):
self.assertEqual(len(mail.outbox), 0)
# First generation is allowed
self.device.generate_challenge()
# Assert that an email is sent
self.assertEqual(len(mail.outbox), 1)

with freeze_time():
# Second generation, within cooldown period
message = self.device.generate_challenge()
self.assertTrue(
message.startswith(
'Token generation cooldown period has not expired yet.'
)
)
# Assert that no email is sent
self.assertEqual(len(mail.outbox), 1)

with freeze_time() as frozen_time:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test should work in practice as-is, but would it make sense to use this context for the entire method body? The point is to control time for the whole sequence of operations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it makes perfect sense. I have corrected this in all tests (29ebe3e).

# Third generation after cooldown period
frozen_time.tick(delta=timedelta(seconds=1.1))
message = self.device.generate_challenge()
self.assertEqual(message, 'sent by email')
# Assert that a second email is sent
self.assertEqual(len(mail.outbox), 2)


@override_settings(OTP_STATIC_THROTTLE_FACTOR=0)
class APITestCase(TestCase):
def setUp(self):
Expand Down