Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: refactor for removing pdb symbols #330

Merged
merged 2 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ cscope.out
.tox
.coverage
*.egg-info
env
34 changes: 17 additions & 17 deletions jwcrypto/jwa.py
Original file line number Diff line number Diff line change
Expand Up @@ -870,10 +870,10 @@ def verify(self, key, payload, signature):

class _RawJWE:

def encrypt(self, k, a, m):
def encrypt(self, k, aad, m):
raise NotImplementedError

def decrypt(self, k, a, iv, e, t):
def decrypt(self, k, aad, iv, e, t):
raise NotImplementedError


Expand All @@ -887,23 +887,23 @@ def __init__(self, hashfn):
self.blocksize = algorithms.AES.block_size
self.wrap_key_size = self.keysize * 2

def _mac(self, k, a, iv, e):
al = _encode_int(_bitsize(a), 64)
def _mac(self, k, aad, iv, e):
al = _encode_int(_bitsize(aad), 64)
h = hmac.HMAC(k, self.hashfn, backend=self.backend)
h.update(a)
h.update(aad)
h.update(iv)
h.update(e)
h.update(al)
m = h.finalize()
return m[:_inbytes(self.keysize)]

# RFC 7518 - 5.2.2
def encrypt(self, k, a, m):
def encrypt(self, k, aad, m):
""" Encrypt according to the selected encryption and hashing
functions.

:param k: Encryption key
:param a: Additional Authentication Data
:param aad: Additional Authentication Data
:param m: Plaintext

Returns a dictionary with the computed data.
Expand All @@ -924,15 +924,15 @@ def encrypt(self, k, a, m):
e = encryptor.update(padded_data) + encryptor.finalize()

# mac
t = self._mac(hkey, a, iv, e)
t = self._mac(hkey, aad, iv, e)

return (iv, e, t)

def decrypt(self, k, a, iv, e, t):
def decrypt(self, k, aad, iv, e, t):
""" Decrypt according to the selected encryption and hashing
functions.
:param k: Encryption key
:param a: Additional Authenticated Data
:param aad: Additional Authenticated Data
:param iv: Initialization Vector
:param e: Ciphertext
:param t: Authentication Tag
Expand All @@ -946,7 +946,7 @@ def decrypt(self, k, a, iv, e, t):
dkey = k[_inbytes(self.keysize):]

# verify mac
if not constant_time.bytes_eq(t, self._mac(hkey, a, iv, e)):
if not constant_time.bytes_eq(t, self._mac(hkey, aad, iv, e)):
raise InvalidSignature('Failed to verify MAC')

# decrypt
Expand Down Expand Up @@ -1003,12 +1003,12 @@ def __init__(self):
self.wrap_key_size = self.keysize

# RFC 7518 - 5.3
def encrypt(self, k, a, m):
def encrypt(self, k, aad, m):
""" Encrypt according to the selected encryption and hashing
functions.

:param k: Encryption key
:param a: Additional Authentication Data
:param aad: Additional Authentication Data
:param m: Plaintext

Returns a dictionary with the computed data.
Expand All @@ -1017,16 +1017,16 @@ def encrypt(self, k, a, m):
cipher = Cipher(algorithms.AES(k), modes.GCM(iv),
backend=self.backend)
encryptor = cipher.encryptor()
encryptor.authenticate_additional_data(a)
encryptor.authenticate_additional_data(aad)
e = encryptor.update(m) + encryptor.finalize()

return (iv, e, encryptor.tag)

def decrypt(self, k, a, iv, e, t):
def decrypt(self, k, aad, iv, e, t):
""" Decrypt according to the selected encryption and hashing
functions.
:param k: Encryption key
:param a: Additional Authenticated Data
:param aad: Additional Authenticated Data
:param iv: Initialization Vector
:param e: Ciphertext
:param t: Authentication Tag
Expand All @@ -1036,7 +1036,7 @@ def decrypt(self, k, a, iv, e, t):
cipher = Cipher(algorithms.AES(k), modes.GCM(iv, t),
backend=self.backend)
decryptor = cipher.decryptor()
decryptor.authenticate_additional_data(a)
decryptor.authenticate_additional_data(aad)
return decryptor.update(e) + decryptor.finalize()


Expand Down
26 changes: 13 additions & 13 deletions jwcrypto/jwe.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,17 +525,17 @@ def deserialize(self, raw_jwe, key=None):
o['header'] = json_encode(djwe['header'])

except ValueError as e:
c = raw_jwe.split('.')
if len(c) != 5:
data = raw_jwe.split('.')
if len(data) != 5:
raise InvalidJWEData() from e
p = base64url_decode(c[0])
p = base64url_decode(data[0])
o['protected'] = p.decode('utf-8')
ekey = base64url_decode(c[1])
ekey = base64url_decode(data[1])
if ekey != b'':
o['encrypted_key'] = base64url_decode(c[1])
o['iv'] = base64url_decode(c[2])
o['ciphertext'] = base64url_decode(c[3])
o['tag'] = base64url_decode(c[4])
o['encrypted_key'] = base64url_decode(data[1])
o['iv'] = base64url_decode(data[2])
o['ciphertext'] = base64url_decode(data[3])
o['tag'] = base64url_decode(data[4])

self.objects = o

Expand Down Expand Up @@ -581,11 +581,11 @@ def __eq__(self, other):
try:
return self.serialize() == other.serialize()
except Exception: # pylint: disable=broad-except
a = {'plaintext': self.plaintext}
a.update(self.objects)
b = {'plaintext': other.plaintext}
b.update(other.objects)
return a == b
data1 = {'plaintext': self.plaintext}
data1.update(self.objects)
data2 = {'plaintext': other.plaintext}
data2.update(other.objects)
return data1 == data2

def __str__(self):
try:
Expand Down
17 changes: 9 additions & 8 deletions jwcrypto/jwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,11 +606,11 @@ def import_key(self, **kwargs):
# check key_ops
if 'key_ops' in newkey:
for ko in newkey['key_ops']:
c = 0
cnt = 0
for cko in newkey['key_ops']:
if ko == cko:
c += 1
if c != 1:
cnt += 1
if cnt != 1:
raise InvalidJWKValue('Duplicate values in "key_ops"')

# check use/key_ops consistency
Expand Down Expand Up @@ -1028,26 +1028,26 @@ def export_to_pem(self, private_key=False, password=False):
:return: A serialized bytes buffer containing a PEM formatted key.
:rtype: `bytes`
"""
e = serialization.Encoding.PEM
enc = serialization.Encoding.PEM
if private_key:
if not self.has_private:
raise InvalidJWKType("No private key available")
f = serialization.PrivateFormat.PKCS8
if password is None:
a = serialization.NoEncryption()
peppelinux marked this conversation as resolved.
Show resolved Hide resolved
enc_alg = serialization.NoEncryption()
elif isinstance(password, bytes):
a = serialization.BestAvailableEncryption(password)
enc_alg = serialization.BestAvailableEncryption(password)
elif password is False:
raise ValueError("The password must be None or a bytes string")
else:
raise TypeError("The password string must be bytes")
return self._get_private_key().private_bytes(
encoding=e, format=f, encryption_algorithm=a)
encoding=enc, format=f, encryption_algorithm=enc_alg)
else:
if not self.has_public:
raise InvalidJWKType("No public key available")
f = serialization.PublicFormat.SubjectPublicKeyInfo
return self._get_public_key().public_bytes(encoding=e, format=f)
return self._get_public_key().public_bytes(encoding=enc, format=f)

@classmethod
def from_pyca(cls, key):
Expand Down Expand Up @@ -1280,6 +1280,7 @@ class JWKSet(dict):
Creates a special key 'keys' that is of a type derived from 'set'
The 'keys' attribute accepts only :class:`jwcrypto.jwk.JWK` elements.
"""

def __init__(self, *args, **kwargs):
super(JWKSet, self).__init__()
super(JWKSet, self).__setitem__('keys', _JWKkeys())
Expand Down
33 changes: 19 additions & 14 deletions jwcrypto/jws.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,18 +279,20 @@ def _verify(self, alg, key, payload, signature, protected, header=None):
raise InvalidJWSSignature('No "alg" in headers')
if alg:
if 'alg' in p and alg != p['alg']:
raise InvalidJWSSignature('"alg" mismatch, requested '
'"%s", found "%s"' % (alg,
p['alg']))
a = alg
raise InvalidJWSSignature(
'"alg" mismatch, requested'
f''' "{alg}", found "{p['alg']}"'''
)
resulting_alg = alg
else:
a = p['alg']
resulting_alg = p['alg']

# the following will verify the "alg" is supported and the signature
# verifies
if isinstance(key, JWK):
c = JWSCore(a, key, protected, payload, self._allowed_algs)
c.verify(signature)
signer = JWSCore(resulting_alg, key, protected,
payload, self._allowed_algs)
signer.verify(signature)
self.verifylog.append("Success")
elif isinstance(key, JWKSet):
keys = key
Expand All @@ -303,8 +305,11 @@ def _verify(self, alg, key, payload, signature, protected, header=None):

for k in keys:
try:
c = JWSCore(a, k, protected, payload, self._allowed_algs)
c.verify(signature)
signer2 = JWSCore(
resulting_alg, k, protected,
payload, self._allowed_algs
)
signer2.verify(signature)
self.verifylog.append("Success")
break
except Exception as e: # pylint: disable=broad-except
Expand Down Expand Up @@ -455,16 +460,16 @@ def deserialize(self, raw_jws, key=None, alg=None):
o['payload'] = djws['payload']

except ValueError:
c = raw_jws.split('.')
if len(c) != 3:
data = raw_jws.split('.')
if len(data) != 3:
raise InvalidJWSObject('Unrecognized'
' representation') from None
p = base64url_decode(str(c[0]))
p = base64url_decode(str(data[0]))
if len(p) > 0:
o['protected'] = p.decode('utf-8')
self._deserialize_b64(o, o['protected'])
o['payload'] = base64url_decode(str(c[1]))
o['signature'] = base64url_decode(str(c[2]))
o['payload'] = base64url_decode(str(data[1]))
o['signature'] = base64url_decode(str(data[2]))

self.objects = o

Expand Down
20 changes: 10 additions & 10 deletions jwcrypto/jwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,20 +256,20 @@ def claims(self):
return self._claims

@claims.setter
def claims(self, c):
if not isinstance(c, dict):
def claims(self, data):
if not isinstance(data, dict):
if not self._reg_claims:
# no default_claims, can return immediately
self._claims = c
self._claims = data
return
c = json_decode(c)
data = json_decode(data)
else:
# _add_default_claims modifies its argument
# so we must always copy it.
c = copy.deepcopy(c)
data = copy.deepcopy(data)

self._add_default_claims(c)
self._claims = json_encode(c)
self._add_default_claims(data)
self._claims = json_encode(data)

@property
def token(self):
Expand Down Expand Up @@ -661,10 +661,10 @@ def deserialize(self, jwt, key=None):
decryption key, or a (:class:`jwcrypto.jwk.JWKSet`) that
contains a key indexed by the 'kid' header.
"""
c = jwt.count('.')
if c == 2:
data = jwt.count('.')
if data == 2:
self.token = JWS()
elif c == 4:
elif data == 4:
self.token = JWE()
else:
raise ValueError("Token format unrecognized")
Expand Down