Skip to content

Commit

Permalink
AVRO-3814: Fix schema resolution for records in union types (#2441)
Browse files Browse the repository at this point in the history
* AVRO-3814: Add a minimal test-case to reproduce

Signed-off-by: Martin Tzvetanov Grigorov <mgrigorov@apache.org>

* AVRO-3814: Fix schema resolution for records in union types

The logic for validation records in Value::validate_internal() would be
too strict when resolving union types containing a record. This could
lead to a situation where schema resolution would fail because the
correct schema to use for a union type could not be identified.

This commit fixes this by passing a boolean `schema_resolution` to
`Value::validate_internal()` which governs whether schema_resolution
rules should be applied.

* AVRO-3814: Ensure to validate the deserialized value against the schema

* AVRO-3814: Extend test case for validate_record

* AVRO-3814: Revert whitespace changes

* AVRO-3814: Remove confusing comments

* AVRO-3786: Add test-cases and fix for AVRO-3786

* AVRO-3786: Revert change to UnionSchema::find_schema_with_known_schemata

* AVRO-3814: [Rust] Use types::Value::resolve_internal() instead of validate_internal()

... when looking for the matching schema in an union

Signed-off-by: Martin Tzvetanov Grigorov <mgrigorov@apache.org>

* AVRO-3814: Revert changes to validate_internal()

Signed-off-by: Martin Tzvetanov Grigorov <mgrigorov@apache.org>

* AVRO-3814: Remove obsolete rustdoc for arguments

Signed-off-by: Martin Tzvetanov Grigorov <mgrigorov@apache.org>

---------

Signed-off-by: Martin Tzvetanov Grigorov <mgrigorov@apache.org>
Co-authored-by: Rik Heijdens <r.heijdens@lithic.com>
(cherry picked from commit 598911d)
  • Loading branch information
martin-g committed Aug 15, 2023
1 parent 56f08b8 commit 6e73e52
Show file tree
Hide file tree
Showing 3 changed files with 858 additions and 3 deletions.
130 changes: 128 additions & 2 deletions lang/rust/avro/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -837,9 +837,11 @@ impl UnionSchema {
// extend known schemas with just resolved names
collected_names.extend(resolved_names);
let namespace = &schema.namespace().or_else(|| enclosing_namespace.clone());

value
.validate_internal(schema, &collected_names, namespace)
.is_none()
.clone()
.resolve_internal(schema, &collected_names, namespace, &None)
.is_ok()
})
}
}
Expand Down Expand Up @@ -5171,4 +5173,128 @@ mod tests {

Ok(())
}

#[test]
fn test_avro_3814_schema_resolution_failure() -> TestResult {
// Define a reader schema: a nested record with an optional field.
let reader_schema = json!(
{
"type": "record",
"name": "MyOuterRecord",
"fields": [
{
"name": "inner_record",
"type": [
"null",
{
"type": "record",
"name": "MyRecord",
"fields": [
{"name": "a", "type": "string"}
]
}
],
"default": null
}
]
}
);

// Define a writer schema: a nested record with an optional field, which
// may optionally contain an enum.
let writer_schema = json!(
{
"type": "record",
"name": "MyOuterRecord",
"fields": [
{
"name": "inner_record",
"type": [
"null",
{
"type": "record",
"name": "MyRecord",
"fields": [
{"name": "a", "type": "string"},
{
"name": "b",
"type": [
"null",
{
"type": "enum",
"name": "MyEnum",
"symbols": ["A", "B", "C"],
"default": "C"
}
],
"default": null
},
]
}
]
}
],
"default": null
}
);

// Use different structs to represent the "Reader" and the "Writer"
// to mimic two different versions of a producer & consumer application.
#[derive(Serialize, Deserialize, Debug)]
struct MyInnerRecordReader {
a: String,
}

#[derive(Serialize, Deserialize, Debug)]
struct MyRecordReader {
inner_record: Option<MyInnerRecordReader>,
}

#[derive(Serialize, Deserialize, Debug)]
enum MyEnum {
A,
B,
C,
}

#[derive(Serialize, Deserialize, Debug)]
struct MyInnerRecordWriter {
a: String,
b: Option<MyEnum>,
}

#[derive(Serialize, Deserialize, Debug)]
struct MyRecordWriter {
inner_record: Option<MyInnerRecordWriter>,
}

let s = MyRecordWriter {
inner_record: Some(MyInnerRecordWriter {
a: "foo".to_string(),
b: None,
}),
};

// Serialize using the writer schema.
let writer_schema = Schema::parse(&writer_schema)?;
let avro_value = crate::to_value(s)?;
assert!(
avro_value.validate(&writer_schema),
"value is valid for schema",
);
let datum = crate::to_avro_datum(&writer_schema, avro_value)?;

// Now, attempt to deserialize using the reader schema.
let reader_schema = Schema::parse(&reader_schema)?;
let mut x = &datum[..];

// Deserialization should succeed and we should be able to resolve the schema.
let deser_value = crate::from_avro_datum(&writer_schema, &mut x, Some(&reader_schema))?;
assert!(deser_value.validate(&reader_schema));

// Verify that we can read a field from the record.
let d: MyRecordReader = crate::from_value(&deser_value)?;
assert_eq!(d.inner_record.unwrap().a, "foo".to_string());
Ok(())
}
}
4 changes: 3 additions & 1 deletion lang/rust/avro/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ impl Value {
}
}

/// Validates the value against the provided schema.
pub(crate) fn validate_internal<S: std::borrow::Borrow<Schema> + Debug>(
&self,
schema: &Schema,
Expand Down Expand Up @@ -516,6 +517,7 @@ impl Value {
let non_nullable_fields_count =
fields.iter().filter(|&rf| !rf.is_nullable()).count();

// If the record contains fewer fields as required fields by the schema, it is invalid.
if record_fields.len() < non_nullable_fields_count {
return Some(format!(
"The value's records length ({}) doesn't match the schema ({} non-nullable fields)",
Expand Down Expand Up @@ -603,7 +605,7 @@ impl Value {
self.resolve_internal(schema, rs.get_names(), &enclosing_namespace, &None)
}

fn resolve_internal(
pub(crate) fn resolve_internal(
mut self,
schema: &Schema,
names: &NamesRef,
Expand Down

0 comments on commit 6e73e52

Please sign in to comment.