Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport smime fix #8390

Merged
merged 3 commits into from
Feb 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
Changelog
=========

.. _v39-0-2:

39.0.2 - 2023-03-02
~~~~~~~~~~~~~~~~~~~

* Fixed a bug where the content type header was not properly encoded for
PKCS7 signatures when using the ``Text`` option and ``SMIME`` encoding.

.. _v39-0-1:

39.0.1 - 2023-02-07
Expand Down
23 changes: 19 additions & 4 deletions src/cryptography/hazmat/primitives/serialization/pkcs7.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import email.base64mime
import email.generator
import email.message
import email.policy
import io
import typing

Expand Down Expand Up @@ -177,7 +178,9 @@ def sign(
return rust_pkcs7.sign_and_serialize(self, encoding, options)


def _smime_encode(data: bytes, signature: bytes, micalg: str) -> bytes:
def _smime_encode(
data: bytes, signature: bytes, micalg: str, text_mode: bool
) -> bytes:
# This function works pretty hard to replicate what OpenSSL does
# precisely. For good and for ill.

Expand All @@ -192,9 +195,10 @@ def _smime_encode(data: bytes, signature: bytes, micalg: str) -> bytes:

m.preamble = "This is an S/MIME signed message\n"

msg_part = email.message.MIMEPart()
msg_part = OpenSSLMimePart()
msg_part.set_payload(data)
msg_part.add_header("Content-Type", "text/plain")
if text_mode:
msg_part.add_header("Content-Type", "text/plain")
m.attach(msg_part)

sig_part = email.message.MIMEPart()
Expand All @@ -213,7 +217,18 @@ def _smime_encode(data: bytes, signature: bytes, micalg: str) -> bytes:

fp = io.BytesIO()
g = email.generator.BytesGenerator(
fp, maxheaderlen=0, mangle_from_=False, policy=m.policy
fp,
maxheaderlen=0,
mangle_from_=False,
policy=m.policy.clone(linesep="\r\n"),
)
g.flatten(m)
return fp.getvalue()


class OpenSSLMimePart(email.message.MIMEPart):
# A MIMEPart subclass that replicates OpenSSL's behavior of not including
# a newline if there are no headers.
def _write_headers(self, generator) -> None:
if list(self.raw_items()):
generator._write_headers(self)
123 changes: 81 additions & 42 deletions src/rust/src/pkcs7.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,13 @@ fn sign_and_serialize<'p>(
.getattr(crate::intern!(py, "PKCS7Options"))?;

let raw_data = builder.getattr(crate::intern!(py, "_data"))?.extract()?;
let data = if options.contains(pkcs7_options.getattr(crate::intern!(py, "Binary"))?)? {
Cow::Borrowed(raw_data)
} else {
smime_canonicalize(
raw_data,
options.contains(pkcs7_options.getattr(crate::intern!(py, "Text"))?)?,
)
};
let text_mode = options.contains(pkcs7_options.getattr(crate::intern!(py, "Text"))?)?;
let (data_with_header, data_without_header) =
if options.contains(pkcs7_options.getattr(crate::intern!(py, "Binary"))?)? {
(Cow::Borrowed(raw_data), Cow::Borrowed(raw_data))
} else {
smime_canonicalize(raw_data, text_mode)
};

let content_type_bytes = asn1::write_single(&PKCS7_DATA_OID)?;
let signing_time_bytes = asn1::write_single(&x509::certificate::time_from_chrono(
Expand Down Expand Up @@ -180,7 +179,7 @@ fn sign_and_serialize<'p>(
{
(
None,
x509::sign::sign_data(py, py_private_key, py_hash_alg, &data)?,
x509::sign::sign_data(py, py_private_key, py_hash_alg, &data_with_header)?,
)
} else {
let mut authenticated_attrs = vec![];
Expand All @@ -198,7 +197,8 @@ fn sign_and_serialize<'p>(
])),
});

let digest = asn1::write_single(&x509::ocsp::hash_data(py, py_hash_alg, &data)?)?;
let digest =
asn1::write_single(&x509::ocsp::hash_data(py, py_hash_alg, &data_with_header)?)?;
// Gross hack: copy to PyBytes to extend the lifetime to 'p
let digest_bytes = pyo3::types::PyBytes::new(py, &digest);
authenticated_attrs.push(x509::csr::Attribute {
Expand Down Expand Up @@ -264,7 +264,7 @@ fn sign_and_serialize<'p>(
if options.contains(pkcs7_options.getattr(crate::intern!(py, "DetachedSignature"))?)? {
None
} else {
data_tlv_bytes = asn1::write_single(&data.deref())?;
data_tlv_bytes = asn1::write_single(&data_with_header.deref())?;
Some(asn1::parse_single(&data_tlv_bytes).unwrap())
};

Expand All @@ -290,7 +290,7 @@ fn sign_and_serialize<'p>(
content_type: PKCS7_SIGNED_DATA_OID,
content: Some(asn1::parse_single(&signed_data_bytes).unwrap()),
};
let content_info_bytes = asn1::write_single(&content_info)?;
let ci_bytes = asn1::write_single(&content_info)?;

let encoding_class = py
.import("cryptography.hazmat.primitives.serialization")?
Expand All @@ -302,43 +302,49 @@ fn sign_and_serialize<'p>(
.map(|d| OIDS_TO_MIC_NAME[&d.oid])
.collect::<Vec<_>>()
.join(",");
Ok(py
let smime_encode = py
.import("cryptography.hazmat.primitives.serialization.pkcs7")?
.getattr(crate::intern!(py, "_smime_encode"))?
.call1((
pyo3::types::PyBytes::new(py, &data),
pyo3::types::PyBytes::new(py, &content_info_bytes),
mic_algs,
))?
.getattr(crate::intern!(py, "_smime_encode"))?;
Ok(smime_encode
.call1((&*data_without_header, &*ci_bytes, mic_algs, text_mode))?
.extract()?)
} else {
// Handles the DER, PEM, and error cases
encode_der_data(py, "PKCS7".to_string(), content_info_bytes, encoding)
encode_der_data(py, "PKCS7".to_string(), ci_bytes, encoding)
}
}

fn smime_canonicalize(data: &[u8], text_mode: bool) -> Cow<'_, [u8]> {
let mut new_data = vec![];
fn smime_canonicalize(data: &[u8], text_mode: bool) -> (Cow<'_, [u8]>, Cow<'_, [u8]>) {
let mut new_data_with_header = vec![];
let mut new_data_without_header = vec![];
if text_mode {
new_data.extend_from_slice(b"Content-Type: text/plain\r\n\r\n");
new_data_with_header.extend_from_slice(b"Content-Type: text/plain\r\n\r\n");
}

let mut last_idx = 0;
for (i, c) in data.iter().copied().enumerate() {
if c == b'\n' && (i == 0 || data[i - 1] != b'\r') {
new_data.extend_from_slice(&data[last_idx..i]);
new_data.push(b'\r');
new_data.push(b'\n');
new_data_with_header.extend_from_slice(&data[last_idx..i]);
new_data_with_header.push(b'\r');
new_data_with_header.push(b'\n');

new_data_without_header.extend_from_slice(&data[last_idx..i]);
new_data_without_header.push(b'\r');
new_data_without_header.push(b'\n');
last_idx = i + 1;
}
}
// If there's stuff in new_data, that means we need to copy the rest of
// data over.
if !new_data.is_empty() {
new_data.extend_from_slice(&data[last_idx..]);
Cow::Owned(new_data)
if !new_data_with_header.is_empty() {
new_data_with_header.extend_from_slice(&data[last_idx..]);
new_data_without_header.extend_from_slice(&data[last_idx..]);
(
Cow::Owned(new_data_with_header),
Cow::Owned(new_data_without_header),
)
} else {
Cow::Borrowed(data)
(Cow::Borrowed(data), Cow::Borrowed(data))
}
}

Expand All @@ -359,27 +365,60 @@ mod tests {

#[test]
fn test_smime_canonicalize() {
for (input, text_mode, expected, expected_is_borrowed) in [
for (
input,
text_mode,
expected_with_header,
expected_without_header,
expected_is_borrowed,
) in [
// Values with text_mode=false
(b"" as &[u8], false, b"" as &[u8], true),
(b"\n", false, b"\r\n", false),
(b"abc", false, b"abc", true),
(b"abc\r\ndef\n", false, b"abc\r\ndef\r\n", false),
(b"abc\r\n", false, b"abc\r\n", true),
(b"abc\ndef\n", false, b"abc\r\ndef\r\n", false),
(b"" as &[u8], false, b"" as &[u8], b"" as &[u8], true),
(b"\n", false, b"\r\n", b"\r\n", false),
(b"abc", false, b"abc", b"abc", true),
(
b"abc\r\ndef\n",
false,
b"abc\r\ndef\r\n",
b"abc\r\ndef\r\n",
false,
),
(b"abc\r\n", false, b"abc\r\n", b"abc\r\n", true),
(
b"abc\ndef\n",
false,
b"abc\r\ndef\r\n",
b"abc\r\ndef\r\n",
false,
),
// Values with text_mode=true
(b"", true, b"Content-Type: text/plain\r\n\r\n", false),
(b"abc", true, b"Content-Type: text/plain\r\n\r\nabc", false),
(b"", true, b"Content-Type: text/plain\r\n\r\n", b"", false),
(
b"abc",
true,
b"Content-Type: text/plain\r\n\r\nabc",
b"abc",
false,
),
(
b"abc\n",
true,
b"Content-Type: text/plain\r\n\r\nabc\r\n",
b"abc\r\n",
false,
),
] {
let result = smime_canonicalize(input, text_mode);
assert_eq!(result.deref(), expected);
assert_eq!(matches!(result, Cow::Borrowed(_)), expected_is_borrowed);
let (result_with_header, result_without_header) = smime_canonicalize(input, text_mode);
assert_eq!(result_with_header.deref(), expected_with_header);
assert_eq!(result_without_header.deref(), expected_without_header);
assert_eq!(
matches!(result_with_header, Cow::Borrowed(_)),
expected_is_borrowed
);
assert_eq!(
matches!(result_without_header, Cow::Borrowed(_)),
expected_is_borrowed
);
}
}
}
25 changes: 20 additions & 5 deletions tests/hazmat/primitives/test_pkcs7.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# for complete details.


import email.parser
import os
import typing

Expand Down Expand Up @@ -289,6 +290,7 @@ def test_smime_sign_detached(self, backend):

sig = builder.sign(serialization.Encoding.SMIME, options)
sig_binary = builder.sign(serialization.Encoding.DER, options)
assert b"text/plain" not in sig
# We don't have a generic ASN.1 parser available to us so we instead
# will assert on specific byte sequences being present based on the
# parameters chosen above.
Expand All @@ -298,8 +300,17 @@ def test_smime_sign_detached(self, backend):
# as a separate section before the PKCS7 data. So we should expect to
# have data in sig but not in sig_binary
assert data in sig
# Parse the message to get the signed data, which is the
# first payload in the message
message = email.parser.BytesParser().parsebytes(sig)
signed_data = message.get_payload()[0].get_payload().encode()
_pkcs7_verify(
serialization.Encoding.SMIME, sig, data, [cert], options, backend
serialization.Encoding.SMIME,
sig,
signed_data,
[cert],
options,
backend,
)
assert data not in sig_binary
_pkcs7_verify(
Expand Down Expand Up @@ -492,10 +503,14 @@ def test_sign_text(self, backend):
# The text option adds text/plain headers to the S/MIME message
# These headers are only relevant in SMIME mode, not binary, which is
# just the PKCS7 structure itself.
assert b"text/plain" in sig_pem
# When passing the Text option the header is prepended so the actual
# signed data is this.
signed_data = b"Content-Type: text/plain\r\n\r\nhello world"
assert sig_pem.count(b"text/plain") == 1
assert b"Content-Type: text/plain\r\n\r\nhello world\r\n" in sig_pem
# Parse the message to get the signed data, which is the
# first payload in the message
message = email.parser.BytesParser().parsebytes(sig_pem)
signed_data = message.get_payload()[0].as_bytes(
policy=message.policy.clone(linesep="\r\n")
)
_pkcs7_verify(
serialization.Encoding.SMIME,
sig_pem,
Expand Down