Revamp ArrowFile

pitrou committed Nov 20, 2023
1 parent 4812d7c commit 69c2715

Showing 3 changed files with 38 additions and 55 deletions.
10 changes: 5 additions & 5 deletions arrow-integration-testing/src/bin/arrow-json-integration-test.rs
@@ -19,7 +19,7 @@ use arrow::error::{ArrowError, Result};
 use arrow::ipc::reader::FileReader;
 use arrow::ipc::writer::FileWriter;
 use arrow_integration_test::*;
-use arrow_integration_testing::{canonicalize_schema, read_json_file};
+use arrow_integration_testing::{canonicalize_schema, open_json_file};
 use clap::Parser;
 use std::fs::File;

@@ -63,12 +63,12 @@ fn json_to_arrow(json_name: &str, arrow_name: &str, verbose: bool) -> Result<()>
         eprintln!("Converting {json_name} to {arrow_name}");
     }
 
-    let json_file = read_json_file(json_name)?;
+    let json_file = open_json_file(json_name)?;
 
     let arrow_file = File::create(arrow_name)?;
     let mut writer = FileWriter::try_new(arrow_file, &json_file.schema)?;
 
-    for b in json_file.batches {
+    for b in json_file.read_batches()? {
         writer.write(&b)?;
     }
@@ -116,7 +116,7 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> {
     }
 
     // open JSON file
-    let json_file = read_json_file(json_name)?;
+    let json_file = open_json_file(json_name)?;
 
     // open Arrow file
     let arrow_file = File::open(arrow_name)?;
@@ -131,7 +131,7 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> {
         )));
     }
 
-    let json_batches = &json_file.batches;
+    let json_batches = json_file.read_batches()?;
 
     // compare number of batches
     assert!(
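The net effect for this binary: batches are no longer materialized when the JSON file is opened, only when read_batches() is called. A minimal sketch of the new calling pattern, assuming a hypothetical convert() helper (verbose flag and validation elided):

    use std::fs::File;
    use arrow::error::Result;
    use arrow::ipc::writer::FileWriter;
    use arrow_integration_testing::open_json_file;

    // Hypothetical helper mirroring json_to_arrow above.
    fn convert(json_name: &str, arrow_name: &str) -> Result<()> {
        // Parses only the schema and dictionaries up front.
        let json_file = open_json_file(json_name)?;
        let mut writer = FileWriter::try_new(File::create(arrow_name)?, &json_file.schema)?;
        // Batches are decoded from the retained JSON value here.
        for batch in json_file.read_batches()? {
            writer.write(&batch)?;
        }
        writer.finish()
    }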
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::{read_json_file, ArrowFile};
+use crate::open_json_file;
 use std::collections::HashMap;
 
 use arrow::{
@@ -45,23 +45,16 @@ pub async fn run_scenario(host: &str, port: u16, path: &str) -> Result {
 
     let client = FlightServiceClient::connect(url).await?;
 
-    let ArrowFile {
-        schema, batches, ..
-    } = read_json_file(path)?;
+    let json_file = open_json_file(path)?;
 
-    let schema = Arc::new(schema);
+    let batches = json_file.read_batches()?;
+    let schema = Arc::new(json_file.schema);
 
     let mut descriptor = FlightDescriptor::default();
     descriptor.set_type(DescriptorType::Path);
     descriptor.path = vec![path.to_string()];
 
-    upload_data(
-        client.clone(),
-        schema.clone(),
-        descriptor.clone(),
-        batches.clone(),
-    )
-    .await?;
+    upload_data(client.clone(), schema, descriptor.clone(), batches.clone()).await?;
     verify_data(client, descriptor, &batches).await?;
 
     Ok(())
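Because read_batches() borrows the ArrowFile rather than consuming it, the scenario can decode the batches first and then move the schema out of the struct, which is what lets the earlier schema.clone() disappear. A condensed sketch of that ordering, with the Flight plumbing elided (the function name is hypothetical):

    use std::sync::Arc;
    use arrow::error::Result;
    use arrow_integration_testing::open_json_file;

    // Hypothetical condensation of run_scenario's setup.
    fn prepare(path: &str) -> Result<()> {
        let json_file = open_json_file(path)?;
        let batches = json_file.read_batches()?; // borrows &json_file
        let schema = Arc::new(json_file.schema); // partial move is fine after the borrow ends
        // ... build the FlightDescriptor, then upload_data / verify_data ...
        Ok(())
    }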
66 changes: 28 additions & 38 deletions arrow-integration-testing/src/lib.rs
@@ -46,8 +46,28 @@ pub struct ArrowFile {
     pub schema: Schema,
     // we can evolve this into a concrete Arrow type
     // this is temporarily not being read from
-    pub _dictionaries: HashMap<i64, ArrowJsonDictionaryBatch>,
-    pub batches: Vec<RecordBatch>,
+    dictionaries: HashMap<i64, ArrowJsonDictionaryBatch>,
+    arrow_json: Value,
+}
+
+impl ArrowFile {
+    pub fn read_batch(&self, batch_num: usize) -> Result<RecordBatch> {
+        let b = self.arrow_json["batches"].get(batch_num).unwrap();
+        let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap();
+        record_batch_from_json(&self.schema, json_batch, Some(&self.dictionaries))
+    }
+
+    pub fn read_batches(&self) -> Result<Vec<RecordBatch>> {
+        self.arrow_json["batches"]
+            .as_array()
+            .unwrap()
+            .iter()
+            .map(|b| {
+                let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap();
+                record_batch_from_json(&self.schema, json_batch, Some(&self.dictionaries))
+            })
+            .collect()
+    }
 }
 
 // Canonicalize the names of map fields in a schema
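The revamped struct keeps the parsed serde_json::Value plus the dictionary map, so callers can decode one batch at a time or all at once without re-reading the file. A hedged usage sketch (the file name and batch count are hypothetical placeholders):

    use arrow::error::Result;
    use arrow_integration_testing::open_json_file;

    fn demo() -> Result<()> {
        let f = open_json_file("generated_primitive.json")?; // hypothetical test file
        let second = f.read_batch(1)?; // decode only the batch at index 1
        let all = f.read_batches()?;   // or decode every batch
        assert_eq!(all[1].num_rows(), second.num_rows());
        Ok(())
    }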
@@ -87,13 +107,7 @@ pub fn canonicalize_schema(schema: &Schema) -> Schema {
     Schema::new(fields).with_metadata(schema.metadata().clone())
 }
 
-struct LazyArrowFile {
-    schema: Schema,
-    dictionaries: HashMap<i64, ArrowJsonDictionaryBatch>,
-    arrow_json: Value,
-}
-
-fn read_json_file_metadata(json_name: &str) -> Result<LazyArrowFile> {
+pub fn open_json_file(json_name: &str) -> Result<ArrowFile> {
     let json_file = File::open(json_name)?;
     let reader = BufReader::new(json_file);
     let arrow_json: Value = serde_json::from_reader(reader).unwrap();
@@ -111,37 +125,13 @@ fn read_json_file_metadata(json_name: &str) -> Result<LazyArrowFile> {
             dictionaries.insert(json_dict.id, json_dict);
         }
     }
-    Ok(LazyArrowFile {
+    Ok(ArrowFile {
         schema,
         dictionaries,
         arrow_json,
     })
 }
 
-pub fn read_json_file(json_name: &str) -> Result<ArrowFile> {
-    let f = read_json_file_metadata(json_name)?;
-
-    let mut batches = vec![];
-    for b in f.arrow_json["batches"].as_array().unwrap() {
-        let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap();
-        let batch = record_batch_from_json(&f.schema, json_batch, Some(&f.dictionaries))?;
-        batches.push(batch);
-    }
-    Ok(ArrowFile {
-        schema: f.schema,
-        _dictionaries: f.dictionaries,
-        batches,
-    })
-}
-
-pub fn read_single_batch_from_json_file(json_name: &str, batch_num: usize) -> Result<RecordBatch> {
-    let f = read_json_file_metadata(json_name)?;
-    let b = f.arrow_json["batches"].get(batch_num).unwrap();
-    let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap();
-    let batch = record_batch_from_json(&f.schema, json_batch, Some(&f.dictionaries))?;
-    Ok(batch)
-}
-
 /// Read gzipped JSON test file
 ///
 /// For example given the input:
@@ -176,7 +166,7 @@ fn cdata_integration_export_schema_from_json(
     out: *mut FFI_ArrowSchema,
 ) -> Result<()> {
     let json_name = unsafe { CStr::from_ptr(c_json_name) };
-    let f = read_json_file_metadata(json_name.to_str()?)?;
+    let f = open_json_file(json_name.to_str()?)?;
     let c_schema = FFI_ArrowSchema::try_from(&f.schema)?;
     // Move exported schema into output struct
     unsafe { ptr::write(out, c_schema) };
@@ -189,7 +179,7 @@ fn cdata_integration_export_batch_from_json(
     out: *mut FFI_ArrowArray,
 ) -> Result<()> {
     let json_name = unsafe { CStr::from_ptr(c_json_name) };
-    let b = read_single_batch_from_json_file(json_name.to_str()?, batch_num.try_into().unwrap())?;
+    let b = open_json_file(json_name.to_str()?)?.read_batch(batch_num.try_into().unwrap())?;
     let a = StructArray::from(b).into_data();
     let c_array = FFI_ArrowArray::new(&a);
     // Move exported array into output struct
@@ -202,7 +192,7 @@ fn cdata_integration_import_schema_and_compare_to_json(
     c_schema: *mut FFI_ArrowSchema,
 ) -> Result<()> {
     let json_name = unsafe { CStr::from_ptr(c_json_name) };
-    let json_schema = read_json_file_metadata(json_name.to_str()?)?.schema;
+    let json_schema = open_json_file(json_name.to_str()?)?.schema;
 
     // The source ArrowSchema will be released when this is dropped
     let imported_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema) };
@@ -241,7 +231,7 @@ fn cdata_integration_import_batch_and_compare_to_json(
 ) -> Result<()> {
     let json_name = unsafe { CStr::from_ptr(c_json_name) };
     let json_batch =
-        read_single_batch_from_json_file(json_name.to_str()?, batch_num.try_into().unwrap())?;
+        open_json_file(json_name.to_str()?)?.read_batch(batch_num.try_into().unwrap())?;
     let schema = json_batch.schema();
 
     let data_type_for_import = DataType::Struct(schema.fields.clone());
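All of the C Data Interface helpers now funnel through open_json_file. A simplified sketch of the single-batch export path they share, without the raw-pointer plumbing (hypothetical wrapper name and inputs):

    use arrow::array::{Array, StructArray};
    use arrow::error::Result;
    use arrow::ffi::FFI_ArrowArray;
    use arrow_integration_testing::open_json_file;

    fn export_batch(json_name: &str, batch_num: usize) -> Result<FFI_ArrowArray> {
        // Decode exactly one batch, as cdata_integration_export_batch_from_json does.
        let batch = open_json_file(json_name)?.read_batch(batch_num)?;
        let data = StructArray::from(batch).into_data();
        Ok(FFI_ArrowArray::new(&data)) // caller moves this across the C boundary
    }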
