Skip to content

Commit

Permalink
chore: refactor for removing pdb symbols
Browse files Browse the repository at this point in the history
  • Loading branch information
peppelinux committed Nov 29, 2023
1 parent 41fb08a commit e83a0bc
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 113 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ cscope.out
.tox
.coverage
*.egg-info
env
28 changes: 14 additions & 14 deletions jwcrypto/jwa.py
Original file line number Diff line number Diff line change
Expand Up @@ -870,10 +870,10 @@ def verify(self, key, payload, signature):

class _RawJWE:

def encrypt(self, k, a, m):
def encrypt(self, k, aad, m):
raise NotImplementedError

def decrypt(self, k, a, iv, e, t):
def decrypt(self, k, aad, iv, e, t):
raise NotImplementedError


Expand All @@ -887,18 +887,18 @@ def __init__(self, hashfn):
self.blocksize = algorithms.AES.block_size
self.wrap_key_size = self.keysize * 2

def _mac(self, k, a, iv, e):
al = _encode_int(_bitsize(a), 64)
def _mac(self, k, ai, iv, e):
al = _encode_int(_bitsize(ai), 64)
h = hmac.HMAC(k, self.hashfn, backend=self.backend)
h.update(a)
h.update(ai)
h.update(iv)
h.update(e)
h.update(al)
m = h.finalize()
return m[:_inbytes(self.keysize)]

# RFC 7518 - 5.2.2
def encrypt(self, k, a, m):
def encrypt(self, k, ai, m):
""" Encrypt according to the selected encryption and hashing
functions.
Expand All @@ -924,11 +924,11 @@ def encrypt(self, k, a, m):
e = encryptor.update(padded_data) + encryptor.finalize()

# mac
t = self._mac(hkey, a, iv, e)
t = self._mac(hkey, ai, iv, e)

return (iv, e, t)

def decrypt(self, k, a, iv, e, t):
def decrypt(self, k, ai, iv, e, t):
""" Decrypt according to the selected encryption and hashing
functions.
:param k: Encryption key
Expand All @@ -946,7 +946,7 @@ def decrypt(self, k, a, iv, e, t):
dkey = k[_inbytes(self.keysize):]

# verify mac
if not constant_time.bytes_eq(t, self._mac(hkey, a, iv, e)):
if not constant_time.bytes_eq(t, self._mac(hkey, ai, iv, e)):
raise InvalidSignature('Failed to verify MAC')

# decrypt
Expand Down Expand Up @@ -1003,7 +1003,7 @@ def __init__(self):
self.wrap_key_size = self.keysize

# RFC 7518 - 5.3
def encrypt(self, k, a, m):
def encrypt(self, k, ai, m):
""" Encrypt according to the selected encryption and hashing
functions.
Expand All @@ -1017,16 +1017,16 @@ def encrypt(self, k, a, m):
cipher = Cipher(algorithms.AES(k), modes.GCM(iv),
backend=self.backend)
encryptor = cipher.encryptor()
encryptor.authenticate_additional_data(a)
encryptor.authenticate_additional_data(ai)
e = encryptor.update(m) + encryptor.finalize()

return (iv, e, encryptor.tag)

def decrypt(self, k, a, iv, e, t):
def decrypt(self, k, ai, iv, e, t):
""" Decrypt according to the selected encryption and hashing
functions.
:param k: Encryption key
:param a: Additional Authenticated Data
:param ai: Additional Authenticated Data
:param iv: Initialization Vector
:param e: Ciphertext
:param t: Authentication Tag
Expand All @@ -1036,7 +1036,7 @@ def decrypt(self, k, a, iv, e, t):
cipher = Cipher(algorithms.AES(k), modes.GCM(iv, t),
backend=self.backend)
decryptor = cipher.decryptor()
decryptor.authenticate_additional_data(a)
decryptor.authenticate_additional_data(ai)
return decryptor.update(e) + decryptor.finalize()


Expand Down
26 changes: 13 additions & 13 deletions jwcrypto/jwe.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,17 +525,17 @@ def deserialize(self, raw_jwe, key=None):
o['header'] = json_encode(djwe['header'])

except ValueError as e:
c = raw_jwe.split('.')
if len(c) != 5:
data = raw_jwe.split('.')
if len(data) != 5:
raise InvalidJWEData() from e
p = base64url_decode(c[0])
p = base64url_decode(data[0])
o['protected'] = p.decode('utf-8')
ekey = base64url_decode(c[1])
ekey = base64url_decode(data[1])
if ekey != b'':
o['encrypted_key'] = base64url_decode(c[1])
o['iv'] = base64url_decode(c[2])
o['ciphertext'] = base64url_decode(c[3])
o['tag'] = base64url_decode(c[4])
o['encrypted_key'] = base64url_decode(data[1])
o['iv'] = base64url_decode(data[2])
o['ciphertext'] = base64url_decode(data[3])
o['tag'] = base64url_decode(data[4])

self.objects = o

Expand Down Expand Up @@ -581,11 +581,11 @@ def __eq__(self, other):
try:
return self.serialize() == other.serialize()
except Exception: # pylint: disable=broad-except
a = {'plaintext': self.plaintext}
a.update(self.objects)
b = {'plaintext': other.plaintext}
b.update(other.objects)
return a == b
data1 = {'plaintext': self.plaintext}
data1.update(self.objects)
data2 = {'plaintext': other.plaintext}
data2.update(other.objects)
return data1 == data2

def __str__(self):
try:
Expand Down
17 changes: 9 additions & 8 deletions jwcrypto/jwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,11 +606,11 @@ def import_key(self, **kwargs):
# check key_ops
if 'key_ops' in newkey:
for ko in newkey['key_ops']:
c = 0
cnt = 0
for cko in newkey['key_ops']:
if ko == cko:
c += 1
if c != 1:
cnt += 1
if cnt != 1:
raise InvalidJWKValue('Duplicate values in "key_ops"')

# check use/key_ops consistency
Expand Down Expand Up @@ -1028,26 +1028,26 @@ def export_to_pem(self, private_key=False, password=False):
:return: A serialized bytes buffer containing a PEM formatted key.
:rtype: `bytes`
"""
e = serialization.Encoding.PEM
enc = serialization.Encoding.PEM
if private_key:
if not self.has_private:
raise InvalidJWKType("No private key available")
f = serialization.PrivateFormat.PKCS8
if password is None:
a = serialization.NoEncryption()
enc_alg = serialization.NoEncryption()
elif isinstance(password, bytes):
a = serialization.BestAvailableEncryption(password)
enc_alg = serialization.BestAvailableEncryption(password)
elif password is False:
raise ValueError("The password must be None or a bytes string")
else:
raise TypeError("The password string must be bytes")
return self._get_private_key().private_bytes(
encoding=e, format=f, encryption_algorithm=a)
encoding=enc, format=f, encryption_algorithm=enc_alg)
else:
if not self.has_public:
raise InvalidJWKType("No public key available")
f = serialization.PublicFormat.SubjectPublicKeyInfo
return self._get_public_key().public_bytes(encoding=e, format=f)
return self._get_public_key().public_bytes(encoding=enc, format=f)

@classmethod
def from_pyca(cls, key):
Expand Down Expand Up @@ -1280,6 +1280,7 @@ class JWKSet(dict):
Creates a special key 'keys' that is of a type derived from 'set'
The 'keys' attribute accepts only :class:`jwcrypto.jwk.JWK` elements.
"""

def __init__(self, *args, **kwargs):
super(JWKSet, self).__init__()
super(JWKSet, self).__setitem__('keys', _JWKkeys())
Expand Down
33 changes: 19 additions & 14 deletions jwcrypto/jws.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,18 +279,20 @@ def _verify(self, alg, key, payload, signature, protected, header=None):
raise InvalidJWSSignature('No "alg" in headers')
if alg:
if 'alg' in p and alg != p['alg']:
raise InvalidJWSSignature('"alg" mismatch, requested '
'"%s", found "%s"' % (alg,
p['alg']))
a = alg
raise InvalidJWSSignature(
'"alg" mismatch, requested'
f''' "{alg}", found "{p['alg']}"'''
)
resulting_alg = alg
else:
a = p['alg']
resulting_alg = p['alg']

# the following will verify the "alg" is supported and the signature
# verifies
if isinstance(key, JWK):
c = JWSCore(a, key, protected, payload, self._allowed_algs)
c.verify(signature)
signer = JWSCore(resulting_alg, key, protected,
payload, self._allowed_algs)
signer.verify(signature)
self.verifylog.append("Success")
elif isinstance(key, JWKSet):
keys = key
Expand All @@ -303,8 +305,11 @@ def _verify(self, alg, key, payload, signature, protected, header=None):

for k in keys:
try:
c = JWSCore(a, k, protected, payload, self._allowed_algs)
c.verify(signature)
signer2 = JWSCore(
resulting_alg, k, protected,
payload, self._allowed_algs
)
signer2.verify(signature)
self.verifylog.append("Success")
break
except Exception as e: # pylint: disable=broad-except
Expand Down Expand Up @@ -455,16 +460,16 @@ def deserialize(self, raw_jws, key=None, alg=None):
o['payload'] = djws['payload']

except ValueError:
c = raw_jws.split('.')
if len(c) != 3:
data = raw_jws.split('.')
if len(data) != 3:
raise InvalidJWSObject('Unrecognized'
' representation') from None
p = base64url_decode(str(c[0]))
p = base64url_decode(str(data[0]))
if len(p) > 0:
o['protected'] = p.decode('utf-8')
self._deserialize_b64(o, o['protected'])
o['payload'] = base64url_decode(str(c[1]))
o['signature'] = base64url_decode(str(c[2]))
o['payload'] = base64url_decode(str(data[1]))
o['signature'] = base64url_decode(str(data[2]))

self.objects = o

Expand Down
20 changes: 10 additions & 10 deletions jwcrypto/jwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,20 +256,20 @@ def claims(self):
return self._claims

@claims.setter
def claims(self, c):
if not isinstance(c, dict):
def claims(self, data):
if not isinstance(data, dict):
if not self._reg_claims:
# no default_claims, can return immediately
self._claims = c
self._claims = data
return
c = json_decode(c)
data = json_decode(data)
else:
# _add_default_claims modifies its argument
# so we must always copy it.
c = copy.deepcopy(c)
data = copy.deepcopy(data)

self._add_default_claims(c)
self._claims = json_encode(c)
self._add_default_claims(data)
self._claims = json_encode(data)

@property
def token(self):
Expand Down Expand Up @@ -661,10 +661,10 @@ def deserialize(self, jwt, key=None):
decryption key, or a (:class:`jwcrypto.jwk.JWKSet`) that
contains a key indexed by the 'kid' header.
"""
c = jwt.count('.')
if c == 2:
data = jwt.count('.')
if data == 2:
self.token = JWS()
elif c == 4:
elif data == 4:
self.token = JWE()
else:
raise ValueError("Token format unrecognized")
Expand Down

0 comments on commit e83a0bc

Please sign in to comment.