-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Multifernet.rotate method #3979
Changes from all commits
c14309d
7bb56bb
bf6ec57
e9e3b80
e5983a2
6a40caf
ea34b0a
fc01194
ef0ab94
ecf768d
331bd36
dd22738
6ba329b
6aae7d9
b4629ee
6bc8daf
414fdb5
5509ef9
054d64d
5ca5794
a6d2dff
cb51edd
0ed4ea8
ef70387
1b08fcd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ cryptographic | |
cryptographically | ||
Debian | ||
decrypt | ||
decrypts | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd like to register my (continued) displeasure that enchant can't figure out plurals. |
||
Decrypts | ||
decrypted | ||
decrypting | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -71,11 +71,14 @@ def _encrypt_from_parts(self, data, current_time, iv): | |
return base64.urlsafe_b64encode(basic_parts + hmac) | ||
|
||
def decrypt(self, token, ttl=None): | ||
timestamp, data = Fernet._get_unverified_token_data(token) | ||
return self._decrypt_data(data, timestamp, ttl) | ||
|
||
@staticmethod | ||
def _get_unverified_token_data(token): | ||
if not isinstance(token, bytes): | ||
raise TypeError("token must be bytes.") | ||
|
||
current_time = int(time.time()) | ||
|
||
try: | ||
data = base64.urlsafe_b64decode(token) | ||
except (TypeError, binascii.Error): | ||
|
@@ -88,6 +91,10 @@ def decrypt(self, token, ttl=None): | |
timestamp, = struct.unpack(">Q", data[1:9]) | ||
except struct.error: | ||
raise InvalidToken | ||
return timestamp, data | ||
|
||
def _decrypt_data(self, data, timestamp, ttl): | ||
current_time = int(time.time()) | ||
if ttl is not None: | ||
if timestamp + ttl < current_time: | ||
raise InvalidToken | ||
|
@@ -134,6 +141,20 @@ def __init__(self, fernets): | |
def encrypt(self, msg): | ||
return self._fernets[0].encrypt(msg) | ||
|
||
def rotate(self, msg): | ||
timestamp, data = Fernet._get_unverified_token_data(msg) | ||
for f in self._fernets: | ||
try: | ||
p = f._decrypt_data(data, timestamp, None) | ||
break | ||
except InvalidToken: | ||
pass | ||
else: | ||
raise InvalidToken | ||
|
||
iv = os.urandom(16) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why does There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the tests. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah. Well that's a good reason. I still don't like os.urandom being called in two places, but maybe I need to just be okay with that. |
||
return self._fernets[0]._encrypt_from_parts(p, timestamp, iv) | ||
|
||
def decrypt(self, msg, ttl=None): | ||
for f in self._fernets: | ||
try: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ | |
|
||
import base64 | ||
import calendar | ||
import datetime | ||
import json | ||
import os | ||
import time | ||
|
@@ -156,3 +157,55 @@ def test_no_fernets(self, backend): | |
def test_non_iterable_argument(self, backend): | ||
with pytest.raises(TypeError): | ||
MultiFernet(None) | ||
|
||
def test_rotate(self, backend): | ||
f1 = Fernet(base64.urlsafe_b64encode(b"\x00" * 32), backend=backend) | ||
f2 = Fernet(base64.urlsafe_b64encode(b"\x01" * 32), backend=backend) | ||
|
||
mf1 = MultiFernet([f1]) | ||
mf2 = MultiFernet([f2, f1]) | ||
|
||
plaintext = b"abc" | ||
mf1_ciphertext = mf1.encrypt(plaintext) | ||
|
||
assert mf2.decrypt(mf1_ciphertext) == plaintext | ||
|
||
rotated = mf2.rotate(mf1_ciphertext) | ||
|
||
assert rotated != mf1_ciphertext | ||
assert mf2.decrypt(rotated) == plaintext | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add an asseriotn that |
||
|
||
with pytest.raises(InvalidToken): | ||
mf1.decrypt(rotated) | ||
|
||
def test_rotate_preserves_timestamp(self, backend, monkeypatch): | ||
f1 = Fernet(base64.urlsafe_b64encode(b"\x00" * 32), backend=backend) | ||
f2 = Fernet(base64.urlsafe_b64encode(b"\x01" * 32), backend=backend) | ||
|
||
mf1 = MultiFernet([f1]) | ||
mf2 = MultiFernet([f2, f1]) | ||
|
||
plaintext = b"abc" | ||
mf1_ciphertext = mf1.encrypt(plaintext) | ||
|
||
later = datetime.datetime.now() + datetime.timedelta(minutes=5) | ||
later_time = time.mktime(later.timetuple()) | ||
monkeypatch.setattr(time, "time", lambda: later_time) | ||
|
||
original_time, _ = Fernet._get_unverified_token_data(mf1_ciphertext) | ||
rotated_time, _ = Fernet._get_unverified_token_data( | ||
mf2.rotate(mf1_ciphertext) | ||
) | ||
|
||
assert later_time != rotated_time | ||
assert original_time == rotated_time | ||
|
||
def test_rotate_decrypt_no_shared_keys(self, backend): | ||
f1 = Fernet(base64.urlsafe_b64encode(b"\x00" * 32), backend=backend) | ||
f2 = Fernet(base64.urlsafe_b64encode(b"\x01" * 32), backend=backend) | ||
|
||
mf1 = MultiFernet([f1]) | ||
mf2 = MultiFernet([f2]) | ||
|
||
with pytest.raises(InvalidToken): | ||
mf2.rotate(mf1.encrypt(b"abc")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This language still bugs me but I don't have constructive improvement to offer.