Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate PKCS7 backend to Rust #10228

Merged
merged 3 commits into from Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
57 changes: 1 addition & 56 deletions src/cryptography/hazmat/backends/openssl/backend.py
Expand Up @@ -10,7 +10,7 @@
import typing

from cryptography import utils, x509
from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.backends.openssl import aead
from cryptography.hazmat.backends.openssl.ciphers import _CipherContext
from cryptography.hazmat.bindings._rust import openssl as rust_openssl
Expand Down Expand Up @@ -863,61 +863,6 @@ def poly1305_supported(self) -> bool:
def pkcs7_supported(self) -> bool:
return not self._lib.CRYPTOGRAPHY_IS_BORINGSSL

def load_pem_pkcs7_certificates(
self, data: bytes
) -> list[x509.Certificate]:
utils._check_bytes("data", data)
bio = self._bytes_to_bio(data)
p7 = self._lib.PEM_read_bio_PKCS7(
bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
)
if p7 == self._ffi.NULL:
self._consume_errors()
raise ValueError("Unable to parse PKCS7 data")

p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
return self._load_pkcs7_certificates(p7)

def load_der_pkcs7_certificates(
self, data: bytes
) -> list[x509.Certificate]:
utils._check_bytes("data", data)
bio = self._bytes_to_bio(data)
p7 = self._lib.d2i_PKCS7_bio(bio.bio, self._ffi.NULL)
if p7 == self._ffi.NULL:
self._consume_errors()
raise ValueError("Unable to parse PKCS7 data")

p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
return self._load_pkcs7_certificates(p7)

def _load_pkcs7_certificates(self, p7) -> list[x509.Certificate]:
nid = self._lib.OBJ_obj2nid(p7.type)
self.openssl_assert(nid != self._lib.NID_undef)
if nid != self._lib.NID_pkcs7_signed:
raise UnsupportedAlgorithm(
"Only basic signed structures are currently supported. NID"
f" for this data was {nid}",
_Reasons.UNSUPPORTED_SERIALIZATION,
)

if p7.d.sign == self._ffi.NULL:
raise ValueError(
"The provided PKCS7 has no certificate data, but a cert "
"loading method was called."
)

sk_x509 = p7.d.sign.cert
num = self._lib.sk_X509_num(sk_x509)
certs: list[x509.Certificate] = []
for i in range(num):
x509 = self._lib.sk_X509_value(sk_x509, i)
self.openssl_assert(x509 != self._ffi.NULL)
cert = self._ossl2cert(x509)
certs.append(cert)

return certs


class GetCipherByName:
def __init__(self, fmt: str):
Expand Down
6 changes: 6 additions & 0 deletions src/cryptography/hazmat/bindings/_rust/pkcs7.pyi
Expand Up @@ -13,3 +13,9 @@ def sign_and_serialize(
encoding: serialization.Encoding,
options: typing.Iterable[pkcs7.PKCS7Options],
) -> bytes: ...
def load_pem_pkcs7_certificates(
data: bytes,
) -> list[x509.Certificate]: ...
def load_der_pkcs7_certificates(
data: bytes,
) -> list[x509.Certificate]: ...
14 changes: 2 additions & 12 deletions src/cryptography/hazmat/primitives/serialization/pkcs7.py
Expand Up @@ -17,22 +17,12 @@
from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa
from cryptography.utils import _check_byteslike

load_pem_pkcs7_certificates = rust_pkcs7.load_pem_pkcs7_certificates

def load_pem_pkcs7_certificates(data: bytes) -> list[x509.Certificate]:
from cryptography.hazmat.backends.openssl.backend import backend

return backend.load_pem_pkcs7_certificates(data)


def load_der_pkcs7_certificates(data: bytes) -> list[x509.Certificate]:
from cryptography.hazmat.backends.openssl.backend import backend

return backend.load_der_pkcs7_certificates(data)

load_der_pkcs7_certificates = rust_pkcs7.load_der_pkcs7_certificates

serialize_certificates = rust_pkcs7.serialize_certificates


PKCS7HashTypes = typing.Union[
hashes.SHA224,
hashes.SHA256,
Expand Down
2 changes: 1 addition & 1 deletion src/rust/cryptography-openssl/Cargo.toml
Expand Up @@ -9,6 +9,6 @@ rust-version = "1.63.0"

[dependencies]
openssl = "0.10.63"
ffi = { package = "openssl-sys", version = "0.9.91" }
ffi = { package = "openssl-sys", version = "0.9.99" }
foreign-types = "0.3"
foreign-types-shared = "0.1"
96 changes: 94 additions & 2 deletions src/rust/src/pkcs7.rs
Expand Up @@ -9,11 +9,17 @@ use std::ops::Deref;
use cryptography_x509::csr::Attribute;
use cryptography_x509::{common, oid, pkcs7};
use once_cell::sync::Lazy;
#[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))]
use openssl::pkcs7::Pkcs7;
#[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))]
use pyo3::IntoPy;

use crate::asn1::encode_der_data;
use crate::buf::CffiBuf;
use crate::error::CryptographyResult;
use crate::{types, x509};
use crate::error::{CryptographyError, CryptographyResult};
#[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))]
use crate::x509::certificate::load_der_x509_certificate;
use crate::{exceptions, types, x509};

const PKCS7_CONTENT_TYPE_OID: asn1::ObjectIdentifier = asn1::oid!(1, 2, 840, 113549, 1, 9, 3);
const PKCS7_MESSAGE_DIGEST_OID: asn1::ObjectIdentifier = asn1::oid!(1, 2, 840, 113549, 1, 9, 4);
Expand Down Expand Up @@ -290,11 +296,97 @@ fn smime_canonicalize(data: &[u8], text_mode: bool) -> (Cow<'_, [u8]>, Cow<'_, [
}
}

#[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))]
fn load_pkcs7_certificates(
py: pyo3::Python<'_>,
pkcs7: Pkcs7,
) -> CryptographyResult<Vec<x509::certificate::Certificate>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small nit: Instead of returning a Vec, I'd suggest the &pyo3::types::PyList. This avoids a copy+malloc round trip.

See verify.rs l239 for an example of this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

let nid = pkcs7.type_().map(|t| t.nid());
if nid != Some(openssl::nid::Nid::PKCS7_SIGNED) {
let nid_string = nid.map_or("empty".to_string(), |n| n.as_raw().to_string());
return Err(CryptographyError::from(
exceptions::UnsupportedAlgorithm::new_err((
format!("Only basic signed structures are currently supported. NID for this data was {}", nid_string),
exceptions::Reasons::UNSUPPORTED_SERIALIZATION,
)),
));
}

let signed_certificates = pkcs7.signed().and_then(|x| x.certificates());
match signed_certificates {
None => Err(CryptographyError::from(
pyo3::exceptions::PyValueError::new_err(
"The provided PKCS7 has no certificate data, but a cert loading method was called.",
),
)),
Some(c) => c
.iter()
.map(|c| {
load_der_x509_certificate(
py,
pyo3::types::PyBytes::new(py, c.to_der()?.as_slice()).into_py(py),
None,
)
})
.collect(),
}
}

#[pyo3::prelude::pyfunction]
fn load_pem_pkcs7_certificates(
py: pyo3::Python<'_>,
data: &[u8],
) -> CryptographyResult<Vec<x509::certificate::Certificate>> {
cfg_if::cfg_if! {
if #[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))] {
let pkcs7_decoded = openssl::pkcs7::Pkcs7::from_pem(data).map_err(|_| {
CryptographyError::from(pyo3::exceptions::PyValueError::new_err(
"Unable to parse PKCS7 data",
))
})?;
load_pkcs7_certificates(py, pkcs7_decoded)
} else {
return Err(CryptographyError::from(
exceptions::UnsupportedAlgorithm::new_err((
"PKCS#7 is not supported by this backend.",
exceptions::Reasons::BACKEND_MISSING_INTERFACE,
)),
));
}
}
}

#[pyo3::prelude::pyfunction]
fn load_der_pkcs7_certificates(
py: pyo3::Python<'_>,
data: &[u8],
) -> CryptographyResult<Vec<x509::certificate::Certificate>> {
cfg_if::cfg_if! {
if #[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))] {
let pkcs7_decoded = openssl::pkcs7::Pkcs7::from_der(data).map_err(|_| {
CryptographyError::from(pyo3::exceptions::PyValueError::new_err(
"Unable to parse PKCS7 data",
))
})?;
load_pkcs7_certificates(py, pkcs7_decoded)
} else {
return Err(CryptographyError::from(
exceptions::UnsupportedAlgorithm::new_err((
"PKCS#7 is not supported by this backend.",
exceptions::Reasons::BACKEND_MISSING_INTERFACE,
)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use UNSUPPORTED_SERIALIZATION here I think

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

));
}
}
}

pub(crate) fn create_submodule(py: pyo3::Python<'_>) -> pyo3::PyResult<&pyo3::prelude::PyModule> {
let submod = pyo3::prelude::PyModule::new(py, "pkcs7")?;

submod.add_function(pyo3::wrap_pyfunction!(serialize_certificates, submod)?)?;
submod.add_function(pyo3::wrap_pyfunction!(sign_and_serialize, submod)?)?;
submod.add_function(pyo3::wrap_pyfunction!(load_pem_pkcs7_certificates, submod)?)?;
submod.add_function(pyo3::wrap_pyfunction!(load_der_pkcs7_certificates, submod)?)?;

Ok(submod)
}
Expand Down
2 changes: 1 addition & 1 deletion src/rust/src/x509/certificate.rs
Expand Up @@ -378,7 +378,7 @@ fn load_pem_x509_certificates(
}

#[pyo3::prelude::pyfunction]
fn load_der_x509_certificate(
pub(crate) fn load_der_x509_certificate(
py: pyo3::Python<'_>,
data: pyo3::Py<pyo3::types::PyBytes>,
backend: Option<&pyo3::PyAny>,
Expand Down
13 changes: 13 additions & 0 deletions tests/hazmat/primitives/test_pkcs7.py
Expand Up @@ -922,3 +922,16 @@ def test_invalid_types(self):
certs,
"not an encoding", # type: ignore[arg-type]
)


@pytest.mark.supported(
only_if=lambda backend: not backend.pkcs7_supported(),
skip_message="Requires OpenSSL without PKCS7 support (BoringSSL)",
)
class TestPKCS7Unsupported:
def test_pkcs7_functions_unsupported(self):
with raises_unsupported_algorithm(_Reasons.BACKEND_MISSING_INTERFACE):
pkcs7.load_der_pkcs7_certificates(b"nonsense")

with raises_unsupported_algorithm(_Reasons.BACKEND_MISSING_INTERFACE):
pkcs7.load_pem_pkcs7_certificates(b"nonsense")