Skip to content

Commit

Permalink
Removed generate token, adapted EmailDevice, added tests
Browse files Browse the repository at this point in the history
  • Loading branch information
demestav committed Oct 18, 2023
1 parent b966e1d commit bb10746
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 33 deletions.
51 changes: 27 additions & 24 deletions src/django_otp/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,10 +306,9 @@ class VerifyNotAllowed:
N_FAILED_ATTEMPTS = 'N_FAILED_ATTEMPTS'


class GenerationThrottlingMixin(models.Model):
class GenerationCooldownMixin(models.Model):
"""
Mixin class for models that need throttling behaviour for generating
tokens.
Mixin class for models requiring a cooldown duration between generating a challenge.
"""

last_generated_timestamp = models.DateTimeField(
Expand All @@ -318,37 +317,29 @@ class GenerationThrottlingMixin(models.Model):
help_text="The last time a token was generated for this device.",
)

def generate_token(self, *args, **kwargs):
"""
Update the timestamp of the last token generation to the current time.
"""
self.last_generated_timestamp = timezone.now()
super().generate_token(*args, **kwargs)

def generate_is_allowed(self):
"""
Checks if the time since the last token generation is greater than
configured (in seconds) by OTP_GENERATION_INTERVAL.
configured (in seconds).
"""
dt_now = timezone.now()
if (
not self.last_generated_timestamp
or (dt_now - self.last_generated_timestamp).total_seconds()
> self.get_generation_interval()
> self.get_cooldown_duration()
):
return True, None
return super().generate_is_allowed()
else:
return False, {
'reason': "TOKEN_GENERATION_THROTTLED",
'next_generation_at': dt_now + timedelta(
seconds=self.get_generation_interval()
),
'reason': "COOLDOWN_DURATION_PENDING",
'next_generation_at': dt_now
+ timedelta(seconds=self.get_cooldown_duration()),
}

def generate_throttle_reset(self, commit=True):
def cooldown_reset(self, commit=True):
"""
Call this method to reset throttling (normally when a token generation
succeeded).
Call this method to reset cooldown (normally when a successful
verification).
Pass 'commit=False' to avoid calling self.save().
"""
Expand All @@ -357,11 +348,20 @@ def generate_throttle_reset(self, commit=True):
if commit:
self.save()

def verify_token(self, token):
"""
Reset the throttle if the token is valid.
"""
verified = super().verify_token(token)
if verified:
self.cooldown_reset()
return verified

@cached_property
def generation_throttling_enabled(self):
return self.get_generation_interval() > 0
def cooldown_enabled(self):
return self.get_cooldown_duration() > 0

def get_generation_interval(self):
def get_cooldown_duration(self):
raise NotImplementedError()

class Meta:
Expand All @@ -386,7 +386,10 @@ class ThrottlingMixin(models.Model):
null=True,
blank=True,
default=None,
help_text="A timestamp of the last failed verification attempt. Null if last attempt succeeded.",
help_text=(
"A timestamp of the last failed verification attempt. Null if last attempt"
" succeeded."
),
)

throttling_failure_count = models.PositiveIntegerField(
Expand Down
27 changes: 19 additions & 8 deletions src/django_otp/plugins/otp_email/models.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from django.contrib.humanize.templatetags.humanize import naturaltime
from django.core.mail import send_mail
from django.db import models
from django.template import Context, Template
from django.template.loader import get_template
from django.utils import timezone

from django_otp.models import (
GenerationThrottlingMixin,
GenerationCooldownMixin,
SideChannelDevice,
ThrottlingMixin,
)
Expand All @@ -23,7 +25,7 @@ def key_validator(value): # pragma: no cover
return hex_validator()(value)


class EmailDevice(GenerationThrottlingMixin, ThrottlingMixin, SideChannelDevice):
class EmailDevice(GenerationCooldownMixin, ThrottlingMixin, SideChannelDevice):
"""
A :class:`~django_otp.models.SideChannelDevice` that delivers a token to
the email address saved in this object or alternatively to the user's
Expand All @@ -49,9 +51,6 @@ class EmailDevice(GenerationThrottlingMixin, ThrottlingMixin, SideChannelDevice)
help_text='Optional alternative email address to send tokens to',
)

def get_generation_interval(self):
return settings.OTP_EMAIL_GENERATION_INTERVAL

def get_throttle_factor(self):
return settings.OTP_EMAIL_THROTTLE_FACTOR

Expand All @@ -64,9 +63,10 @@ def generate_challenge(self, extra_context=None):
:type extra_context: dict
"""
generate_allowed, _ = self.generate_is_allowed()
generate_allowed, data_dict = self.generate_is_allowed()
if generate_allowed:
self.generate_token(valid_secs=settings.OTP_EMAIL_TOKEN_VALIDITY)
self.last_generated_timestamp = timezone.now()

context = {'token': self.token, **(extra_context or {})}
if settings.OTP_EMAIL_BODY_TEMPLATE:
Expand Down Expand Up @@ -97,7 +97,16 @@ def generate_challenge(self, extra_context=None):

message = "sent by email"
else:
message = "email not sent. Not allowed to generate token at this time."
if data_dict['reason'] == 'COOLDOWN_DURATION_PENDING':
next_generation_naturaltime = naturaltime(
data_dict['next_generation_at']
)
message = (
"Token generation cooldown period has not expired yet. Next"
f" generation allowed {next_generation_naturaltime}"
)
else:
message = "Token generation is not allowed at this time"

return message

Expand All @@ -108,10 +117,12 @@ def verify_token(self, token):

if verified:
self.throttle_reset()
self.generate_throttle_reset()
else:
self.throttle_increment()
else:
verified = False

return verified

def get_cooldown_duration(self):
return settings.OTP_EMAIL_GENERATION_INTERVAL
15 changes: 14 additions & 1 deletion src/django_otp/plugins/otp_email/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from django.test.utils import override_settings

from django_otp.forms import OTPAuthenticationForm
from django_otp.tests import TestCase, ThrottlingTestMixin
from django_otp.tests import (
GenerationThrottlingTestMixin,
TestCase,
ThrottlingTestMixin,
)

from .models import EmailDevice

Expand Down Expand Up @@ -235,3 +239,12 @@ def valid_token(self):

def invalid_token(self):
return -1


@override_settings(
OTP_EMAIL_GENERATION_INTERVAL=1,
)
class GenerationThrottlingTestCase(
EmailDeviceMixin, GenerationThrottlingTestMixin, TestCase
):
pass
40 changes: 40 additions & 0 deletions src/django_otp/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from django.contrib.auth import get_user_model
from django.contrib.auth.models import AnonymousUser
from django.core import mail
from django.core.management import call_command
from django.core.management.base import CommandError
from django.db import IntegrityError, connection
Expand Down Expand Up @@ -164,6 +165,45 @@ def test_verify_is_allowed(self):
self.assertEqual(data3, None)


class GenerationThrottlingTestMixin:
def setUp(self):
self.device = None

def test_cooldown_imposed_after_successful_generation(self):
message = self.device.generate_challenge()
self.assertEqual(message, 'sent by email')
message = self.device.generate_challenge()
self.assertTrue(
message.startswith('Token generation cooldown period has not expired yet.')
)

def test_allow_generation_after_cooldown(self):
self.assertEqual(len(mail.outbox), 0)
# First generation is allowed
self.device.generate_challenge()
# Assert that an email is sent
self.assertEqual(len(mail.outbox), 1)

with freeze_time():
# Second generation, within cooldown period
message = self.device.generate_challenge()
self.assertTrue(
message.startswith(
'Token generation cooldown period has not expired yet.'
)
)
# Assert that no email is sent
self.assertEqual(len(mail.outbox), 1)

with freeze_time() as frozen_time:
# Third generation after cooldown period
frozen_time.tick(delta=timedelta(seconds=1.1))
message = self.device.generate_challenge()
self.assertEqual(message, 'sent by email')
# Assert that a second email is sent
self.assertEqual(len(mail.outbox), 2)


@override_settings(OTP_STATIC_THROTTLE_FACTOR=0)
class APITestCase(TestCase):
def setUp(self):
Expand Down

0 comments on commit bb10746

Please sign in to comment.