
Improve C Data Interface and Add Integration Testing Entrypoints #5080

Merged · 10 commits · Nov 20, 2023
8 changes: 7 additions & 1 deletion arrow-data/src/ffi.rs
@@ -168,6 +168,12 @@ impl FFI_ArrowArray {
.collect::<Box<_>>();
let n_children = children.len() as i64;

// As in the IPC format, emit null_count = length for Null type
let null_count = match data.data_type() {
DataType::Null => data.len(),
_ => data.null_count(),
};

// create the private data owning everything.
// any other data must be added here, e.g. via a struct, to track lifetime.
let mut private_data = Box::new(ArrayPrivateData {
@@ -179,7 +185,7 @@

Self {
length: data.len() as i64,
null_count: data.null_count() as i64,
null_count: null_count as i64,
offset: data.offset() as i64,
n_buffers,
n_children,
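For context, a minimal sketch (not part of this diff) of the behaviour the hunk above introduces: a Null-typed array carries no validity buffer, so `ArrayData::null_count()` reports 0, but the exported C struct now advertises `null_count == length`, matching the IPC format. The `null_count()` accessor on `FFI_ArrowArray` is assumed to be the existing getter.

```rust
use arrow::array::{Array, NullArray};
use arrow::ffi::FFI_ArrowArray;

fn main() {
    let array = NullArray::new(3);

    // A Null array has no validity buffer, so ArrayData itself reports 0 nulls.
    assert_eq!(array.to_data().null_count(), 0);

    // After this change the exported C struct advertises null_count == length,
    // matching what the IPC format does for Null columns.
    let exported = FFI_ArrowArray::new(&array.into_data());
    assert_eq!(exported.null_count(), 3);
}
```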
5 changes: 4 additions & 1 deletion arrow-integration-testing/Cargo.toml
@@ -27,11 +27,14 @@ edition = { workspace = true }
publish = false
rust-version = { workspace = true }

[lib]
crate-type = ["lib", "cdylib"]

[features]
logging = ["tracing-subscriber"]

[dependencies]
arrow = { path = "../arrow", default-features = false, features = ["test_utils", "ipc", "ipc_compression", "json"] }
arrow = { path = "../arrow", default-features = false, features = ["test_utils", "ipc", "ipc_compression", "json", "ffi"] }
arrow-flight = { path = "../arrow-flight", default-features = false }
arrow-buffer = { path = "../arrow-buffer", default-features = false }
arrow-integration-test = { path = "../arrow-integration-test", default-features = false }
2 changes: 1 addition & 1 deletion arrow-integration-testing/README.md
@@ -48,7 +48,7 @@ ln -s <path_to_arrow_rs> arrow/rust

```shell
cd arrow
pip install -e dev/archery[docker]
pip install -e dev/archery[integration]
```

### Build the C++ binaries:
41 changes: 1 addition & 40 deletions arrow-integration-testing/src/bin/arrow-json-integration-test.rs
@@ -15,16 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use arrow::datatypes::{DataType, Field};
use arrow::datatypes::{Fields, Schema};
use arrow::error::{ArrowError, Result};
use arrow::ipc::reader::FileReader;
use arrow::ipc::writer::FileWriter;
use arrow_integration_test::*;
use arrow_integration_testing::read_json_file;
use arrow_integration_testing::{canonicalize_schema, read_json_file};
use clap::Parser;
use std::fs::File;
use std::sync::Arc;

#[derive(clap::ValueEnum, Debug, Clone)]
#[clap(rename_all = "SCREAMING_SNAKE_CASE")]
@@ -113,42 +110,6 @@ fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()>
Ok(())
}

fn canonicalize_schema(schema: &Schema) -> Schema {
let fields = schema
.fields()
.iter()
.map(|field| match field.data_type() {
DataType::Map(child_field, sorted) => match child_field.data_type() {
DataType::Struct(fields) if fields.len() == 2 => {
let first_field = fields.get(0).unwrap();
let key_field =
Arc::new(Field::new("key", first_field.data_type().clone(), false));
let second_field = fields.get(1).unwrap();
let value_field = Arc::new(Field::new(
"value",
second_field.data_type().clone(),
second_field.is_nullable(),
));

let fields = Fields::from([key_field, value_field]);
let struct_type = DataType::Struct(fields);
let child_field = Field::new("entries", struct_type, false);

Arc::new(Field::new(
field.name().as_str(),
DataType::Map(Arc::new(child_field), *sorted),
field.is_nullable(),
))
}
_ => panic!("The child field of Map type should be Struct type with 2 fields."),
},
_ => field.clone(),
})
.collect::<Fields>();

Schema::new(fields).with_metadata(schema.metadata().clone())
}

fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> {
if verbose {
eprintln!("Validating {arrow_name} and {json_name}");
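For context, `canonicalize_schema` is not deleted but moved into the library crate (see the `arrow-integration-testing/src/lib.rs` diff below) so the new C data entrypoints can reuse it. A minimal sketch (not part of this diff, with made-up field names for illustration) of what it does: it rewrites every Map field so its entries struct uses the canonical `entries`/`key`/`value` names before schemas are compared.

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Fields, Schema};
use arrow_integration_testing::canonicalize_schema;

fn main() {
    // Hypothetical Map field whose entries struct uses non-canonical child names,
    // as another Arrow implementation might emit them.
    let entries = DataType::Struct(Fields::from(vec![
        Field::new("keys", DataType::Utf8, false),
        Field::new("vals", DataType::Int32, true),
    ]));
    let map = Field::new(
        "m",
        DataType::Map(Arc::new(Field::new("pairs", entries, false)), false),
        true,
    );
    let schema = Schema::new(vec![map]);

    // Canonicalization renames the struct to "entries" and its children to
    // "key"/"value", keeping types, nullability and metadata unchanged.
    let canonical = canonicalize_schema(&schema);
    assert_ne!(schema, canonical);
}
```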
224 changes: 217 additions & 7 deletions arrow-integration-testing/src/lib.rs
@@ -19,14 +19,20 @@

use serde_json::Value;

use arrow::datatypes::Schema;
use arrow::error::Result;
use arrow::array::{Array, StructArray};
use arrow::datatypes::{DataType, Field, Fields, Schema};
use arrow::error::{ArrowError, Result};
use arrow::ffi::{from_ffi_and_data_type, FFI_ArrowArray, FFI_ArrowSchema};
use arrow::record_batch::RecordBatch;
use arrow::util::test_util::arrow_test_data;
use arrow_integration_test::*;
use std::collections::HashMap;
use std::ffi::{c_int, CStr, CString};
use std::fs::File;
use std::io::BufReader;
use std::iter::zip;
use std::ptr;
use std::sync::Arc;

/// The expected username for the basic auth integration test.
pub const AUTH_USERNAME: &str = "arrow";
@@ -44,7 +50,50 @@ pub struct ArrowFile {
pub batches: Vec<RecordBatch>,
}

pub fn read_json_file(json_name: &str) -> Result<ArrowFile> {
// Canonicalize the names of map fields in a schema
pub fn canonicalize_schema(schema: &Schema) -> Schema {
let fields = schema
.fields()
.iter()
.map(|field| match field.data_type() {
DataType::Map(child_field, sorted) => match child_field.data_type() {
DataType::Struct(fields) if fields.len() == 2 => {
let first_field = fields.get(0).unwrap();
let key_field =
Arc::new(Field::new("key", first_field.data_type().clone(), false));
let second_field = fields.get(1).unwrap();
let value_field = Arc::new(Field::new(
"value",
second_field.data_type().clone(),
second_field.is_nullable(),
));

let fields = Fields::from([key_field, value_field]);
let struct_type = DataType::Struct(fields);
let child_field = Field::new("entries", struct_type, false);

Arc::new(Field::new(
field.name().as_str(),
DataType::Map(Arc::new(child_field), *sorted),
field.is_nullable(),
))
}
_ => panic!("The child field of Map type should be Struct type with 2 fields."),
},
_ => field.clone(),
})
.collect::<Fields>();

Schema::new(fields).with_metadata(schema.metadata().clone())
}

struct LazyArrowFile {
schema: Schema,
dictionaries: HashMap<i64, ArrowJsonDictionaryBatch>,
arrow_json: Value,
}

fn read_json_file_metadata(json_name: &str) -> Result<LazyArrowFile> {
let json_file = File::open(json_name)?;
let reader = BufReader::new(json_file);
let arrow_json: Value = serde_json::from_reader(reader).unwrap();
@@ -62,20 +111,37 @@ pub fn read_json_file(json_name: &str) -> Result<ArrowFile> {
dictionaries.insert(json_dict.id, json_dict);
}
}
Ok(LazyArrowFile {
schema,
dictionaries,
arrow_json,
})
}

pub fn read_json_file(json_name: &str) -> Result<ArrowFile> {
let f = read_json_file_metadata(json_name)?;

let mut batches = vec![];
for b in arrow_json["batches"].as_array().unwrap() {
for b in f.arrow_json["batches"].as_array().unwrap() {
let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap();
let batch = record_batch_from_json(&schema, json_batch, Some(&dictionaries))?;
let batch = record_batch_from_json(&f.schema, json_batch, Some(&f.dictionaries))?;
batches.push(batch);
}
Ok(ArrowFile {
schema,
_dictionaries: dictionaries,
schema: f.schema,
_dictionaries: f.dictionaries,
batches,
})
}

pub fn read_single_batch_from_json_file(json_name: &str, batch_num: usize) -> Result<RecordBatch> {
let f = read_json_file_metadata(json_name)?;
let b = f.arrow_json["batches"].get(batch_num).unwrap();
let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap();
let batch = record_batch_from_json(&f.schema, json_batch, Some(&f.dictionaries))?;
Ok(batch)
}

/// Read gzipped JSON test file
///
/// For example given the input:
@@ -100,3 +166,147 @@ pub fn read_gzip_json(version: &str, path: &str) -> ArrowJson {
let arrow_json: ArrowJson = serde_json::from_str(&s).unwrap();
arrow_json
}

//
// C Data Integration entrypoints
//

fn cdata_integration_export_schema_from_json(
c_json_name: *const i8,
out: *mut FFI_ArrowSchema,
) -> Result<()> {
let json_name = unsafe { CStr::from_ptr(c_json_name) };
let f = read_json_file_metadata(json_name.to_str()?)?;
let c_schema = FFI_ArrowSchema::try_from(&f.schema)?;
// Move exported schema into output struct
unsafe { ptr::write(out, c_schema) };
Ok(())
}

fn cdata_integration_export_batch_from_json(
c_json_name: *const i8,
batch_num: c_int,
out: *mut FFI_ArrowArray,
) -> Result<()> {
let json_name = unsafe { CStr::from_ptr(c_json_name) };
let b = read_single_batch_from_json_file(json_name.to_str()?, batch_num.try_into().unwrap())?;
let a = StructArray::from(b).into_data();
let c_array = FFI_ArrowArray::new(&a);
// Move exported array into output struct
unsafe { ptr::write(out, c_array) };
Ok(())
}

fn cdata_integration_import_schema_and_compare_to_json(
c_json_name: *const i8,
c_schema: *mut FFI_ArrowSchema,
) -> Result<()> {
let json_name = unsafe { CStr::from_ptr(c_json_name) };
let json_schema = read_json_file_metadata(json_name.to_str()?)?.schema;

// The source ArrowSchema will be released when this is dropped
let imported_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema) };
let imported_schema = Schema::try_from(&imported_schema)?;

// compare schemas
if canonicalize_schema(&json_schema) != canonicalize_schema(&imported_schema) {
return Err(ArrowError::ComputeError(format!(
"Schemas do not match.\n- JSON: {:?}\n- Imported: {:?}",
json_schema, imported_schema
)));
}
Ok(())
}

fn compare_batches(a: &RecordBatch, b: &RecordBatch) -> Result<()> {
if a.num_columns() != b.num_columns() {
return Err(ArrowError::InvalidArgumentError(
"batches do not have the same number of columns".to_string(),
));
}
for (a_column, b_column) in zip(a.columns(), b.columns()) {
if a_column != b_column {
return Err(ArrowError::InvalidArgumentError(
"batch columns are not the same".to_string(),
));
}
}
Ok(())
}

fn cdata_integration_import_batch_and_compare_to_json(
c_json_name: *const i8,
batch_num: c_int,
c_array: *mut FFI_ArrowArray,
) -> Result<()> {
let json_name = unsafe { CStr::from_ptr(c_json_name) };
let json_batch =
read_single_batch_from_json_file(json_name.to_str()?, batch_num.try_into().unwrap())?;
let schema = json_batch.schema();

let data_type_for_import = DataType::Struct(schema.fields.clone());
let imported_array = unsafe { FFI_ArrowArray::from_raw(c_array) };
let imported_array = unsafe { from_ffi_and_data_type(imported_array, data_type_for_import) }?;
imported_array.validate_full()?;
let imported_batch = RecordBatch::from(StructArray::from(imported_array));

compare_batches(&json_batch, &imported_batch)
}

// If the Result is an error, export its string representation as a heap-allocated const char*; otherwise return NULL
fn result_to_c_error<T, E: std::fmt::Display>(result: &std::result::Result<T, E>) -> *mut i8 {
match result {
Ok(_) => ptr::null_mut(),
Err(e) => CString::new(format!("{}", e)).unwrap().into_raw(),
}
}

/// Release a const char* exported by result_to_c_error()
///
/// # Safety
///
/// The pointer is assumed to have been obtained using CString::into_raw.
#[no_mangle]
pub unsafe extern "C" fn arrow_rs_free_error(c_error: *mut i8) {
if !c_error.is_null() {
drop(unsafe { CString::from_raw(c_error) });
}
}

#[no_mangle]
pub extern "C" fn arrow_rs_cdata_integration_export_schema_from_json(
c_json_name: *const i8,
out: *mut FFI_ArrowSchema,
) -> *mut i8 {
let r = cdata_integration_export_schema_from_json(c_json_name, out);
result_to_c_error(&r)
}

#[no_mangle]
pub extern "C" fn arrow_rs_cdata_integration_import_schema_and_compare_to_json(
c_json_name: *const i8,
c_schema: *mut FFI_ArrowSchema,
) -> *mut i8 {
let r = cdata_integration_import_schema_and_compare_to_json(c_json_name, c_schema);
result_to_c_error(&r)
}

#[no_mangle]
pub extern "C" fn arrow_rs_cdata_integration_export_batch_from_json(
c_json_name: *const i8,
batch_num: c_int,
out: *mut FFI_ArrowArray,
) -> *mut i8 {
let r = cdata_integration_export_batch_from_json(c_json_name, batch_num, out);
result_to_c_error(&r)
}

#[no_mangle]
pub extern "C" fn arrow_rs_cdata_integration_import_batch_and_compare_to_json(
c_json_name: *const i8,
batch_num: c_int,
c_array: *mut FFI_ArrowArray,
) -> *mut i8 {
let r = cdata_integration_import_batch_and_compare_to_json(c_json_name, batch_num, c_array);
result_to_c_error(&r)
}
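For context, a caller-side sketch (not part of this diff) of the error protocol shared by these `#[no_mangle]` entrypoints: a NULL return means success, and a non-NULL return is a heap-allocated error string that the caller must hand back to `arrow_rs_free_error`. The extern declarations mirror the signatures added above; the `export_schema` wrapper is a made-up harness helper.

```rust
use std::ffi::CStr;

use arrow::ffi::FFI_ArrowSchema;

extern "C" {
    fn arrow_rs_cdata_integration_export_schema_from_json(
        c_json_name: *const i8,
        out: *mut FFI_ArrowSchema,
    ) -> *mut i8;
    fn arrow_rs_free_error(c_error: *mut i8);
}

/// Hypothetical harness helper: call one entrypoint and turn its C error,
/// if any, into a Rust `Result`, releasing the error string afterwards.
unsafe fn export_schema(c_path: *const i8, out: *mut FFI_ArrowSchema) -> Result<(), String> {
    let err = arrow_rs_cdata_integration_export_schema_from_json(c_path, out);
    if err.is_null() {
        return Ok(());
    }
    let msg = CStr::from_ptr(err).to_string_lossy().into_owned();
    arrow_rs_free_error(err); // the string was allocated with CString::into_raw
    Err(msg)
}
```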
6 changes: 6 additions & 0 deletions arrow-schema/src/error.rs
@@ -58,6 +58,12 @@ impl From<std::io::Error> for ArrowError {
}
}

impl From<std::str::Utf8Error> for ArrowError {
fn from(error: std::str::Utf8Error) -> Self {
ArrowError::ParseError(error.to_string())
}
}

impl From<std::string::FromUtf8Error> for ArrowError {
fn from(error: std::string::FromUtf8Error) -> Self {
ArrowError::ParseError(error.to_string())
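For context, a small sketch (not part of this diff) of why the `Utf8Error` conversion is added: the C entrypoints above turn a C path into a `&str` with `CStr::to_str()?`, and this `From` impl lets that `Utf8Error` propagate as an `ArrowError::ParseError` without an explicit `map_err`.

```rust
use std::ffi::CStr;

use arrow_schema::ArrowError;

// With From<std::str::Utf8Error> for ArrowError in place, `?` converts an
// invalid-UTF-8 path straight into ArrowError::ParseError.
fn path_to_str(c_path: &CStr) -> Result<&str, ArrowError> {
    Ok(c_path.to_str()?)
}
```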