Upgrade to deltalake 0.16.5
This change includes many minor adjustments needed to compile against the newer deltalake libraries.

Closes delta-io#156
rtyler committed Dec 9, 2023
1 parent 6b50a26 commit b64938b
Showing 8 changed files with 814 additions and 1,084 deletions.
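The churn is mostly mechanical: the recurring rename is deltalake::action → deltalake::protocol, checkpoints::CheckpointError folds into DeltaTableError, chrono's timestamp_nanos() gives way to timestamp_nanos_opt(), and time_utils::timestamp_to_delta_stats_string is vendored into src/writer.rs. As a quick orientation, a minimal sketch of the relocated StreamingUpdate construction used throughout this diff (field names taken verbatim from the hunks below):

use deltalake::protocol::{DeltaOperation, OutputMode};

// Same construction as before, just from the relocated `protocol` module.
fn streaming_append(app_id: String, epoch_id: i64) -> DeltaOperation {
    DeltaOperation::StreamingUpdate {
        output_mode: OutputMode::Append,
        query_id: app_id,
        epoch_id,
    }
}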
1,801 changes: 758 additions & 1,043 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
@@ -32,10 +32,10 @@ tokio-util = "0.6.3"
uuid = { version = "0.8", features = ["serde", "v4"] }
url = "2.3"

-deltalake = { version = "0.12.0", features = ["json"], optional = true }
+deltalake = { version = "0.16.5", features = ["arrow", "json", "parquet"], optional = true }

# s3 feature enabled
-dynamodb_lock = { version = "^0.4.3", optional = true }
+dynamodb_lock = { version = "=0.4.3", optional = true }
rusoto_core = { version = "0.46", optional = true }
rusoto_credential = { version = "0.46", optional = true }
rusoto_s3 = { version = "0.46", optional = true }
11 changes: 8 additions & 3 deletions src/coercions.rs
@@ -136,9 +136,14 @@ fn string_to_timestamp(string: &str) -> Option<Value> {
e
)
}
-    parsed
-        .ok()
-        .map(|dt: DateTime<Utc>| Value::Number((dt.timestamp_nanos() / 1000).into()))
+    parsed.ok().map(|dt: DateTime<Utc>| {
+        Value::Number(
+            (dt.timestamp_nanos_opt()
+                .expect("Failed to turn timestamp nanoseconds")
+                / 1000)
+                .into(),
+        )
+    })
}

#[cfg(test)]
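Context for this hunk: chrono deprecated timestamp_nanos (as of 0.4.31) in favor of timestamp_nanos_opt, which returns None for instants that overflow i64 nanoseconds (roughly years 1677–2262). A minimal sketch of the new call:

use chrono::{DateTime, Utc};

// Returns microseconds, or None if the datetime cannot be represented
// as i64 nanoseconds (outside the ~1677..2262 range).
fn to_micros(dt: DateTime<Utc>) -> Option<i64> {
    dt.timestamp_nanos_opt().map(|ns| ns / 1000)
}

The commit uses .expect(...) rather than propagating the None, which keeps the old panic-on-overflow behavior but makes it explicit.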
26 changes: 13 additions & 13 deletions src/dead_letters.rs
@@ -15,7 +15,6 @@ use serde_json::Value;
use std::collections::HashMap;

use crate::{transforms::TransformError, writer::*};
-use deltalake::checkpoints::CheckpointError;

#[cfg(feature = "s3")]
mod env_vars {
@@ -47,7 +46,10 @@ impl DeadLetter {
base64_bytes: Some(base64::encode(bytes)),
json_string: None,
error: Some(err),
-            timestamp: timestamp.timestamp_nanos() / 1000,
+            timestamp: timestamp
+                .timestamp_nanos_opt()
+                .expect("Failed to convert timestamp to nanoseconds")
+                / 1000,
}
}

@@ -59,7 +61,10 @@ impl DeadLetter {
base64_bytes: None,
json_string: Some(value.to_string()),
error: Some(err.to_string()),
-            timestamp: timestamp.timestamp_nanos() / 1000,
+            timestamp: timestamp
+                .timestamp_nanos_opt()
+                .expect("Failed to convert timestamp to nanoseconds")
+                / 1000,
}
}

@@ -72,7 +77,10 @@ impl DeadLetter {
base64_bytes: None,
json_string: Some(value.to_string()),
error: Some(err.to_string()),
-            timestamp: timestamp.timestamp_nanos() / 1000,
+            timestamp: timestamp
+                .timestamp_nanos_opt()
+                .expect("Failed to convert timestamp to nanoseconds")
+                / 1000,
}
}

@@ -111,14 +119,6 @@ pub enum DeadLetterQueueError {
source: TransformError,
},

-    /// Error occurred when writing a delta log checkpoint.
-    #[error("CheckpointErrorError error: {source}")]
-    CheckpointErrorError {
-        /// The wrapped [`CheckpointError`]
-        #[from]
-        source: CheckpointError,
-    },

/// DeltaTable returned an error.
#[error("DeltaTable interaction failed: {source}")]
DeltaTable {
@@ -151,7 +151,7 @@ pub(crate) struct DeadLetterQueueOptions {
/// The [LoggingDeadLetterQueue] is intended for local development only
/// and is not provided by the [dlq_from_opts] factory method.
#[async_trait]
-pub(crate) trait DeadLetterQueue: Send + Sync {
+pub(crate) trait DeadLetterQueue: Send {
/// Writes one [DeadLetter] to the [DeadLetterQueue].
async fn write_dead_letter(
&mut self,
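The Sync removal is not explained in the commit; one plausible reading is that a boxed queue is only ever owned and mutated by a single task, so it must move between threads (Send) but is never shared by reference (Sync). A sketch with a stand-in trait (Dlq is hypothetical, not the crate's API):

use async_trait::async_trait;

#[async_trait]
trait Dlq: Send {
    async fn write_dead_letter(&mut self, payload: String);
}

fn spawn_consumer(mut dlq: Box<dyn Dlq>) {
    // The box moves into the task; &mut access is exclusive, so nothing
    // is shared across threads and a Sync bound would be stricter than needed.
    tokio::spawn(async move {
        dlq.write_dead_letter("unparseable record".to_string()).await;
    });
}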
5 changes: 2 additions & 3 deletions src/delta_helpers.rs
@@ -1,6 +1,5 @@
use crate::{DataTypeOffset, DataTypePartition};
-use deltalake::action::{Action, Add, Txn};
-use deltalake::checkpoints::CheckpointError;
+use deltalake::protocol::{Action, Add, Txn};
use deltalake::{DeltaTable, DeltaTableError};
use std::collections::HashMap;

@@ -43,7 +42,7 @@ pub(crate) fn create_txn_action(txn_app_id: String, offset: DataTypeOffset) -> A
pub(crate) async fn try_create_checkpoint(
table: &mut DeltaTable,
version: i64,
-) -> Result<(), CheckpointError> {
+) -> Result<(), DeltaTableError> {
if version % 10 == 0 {
let table_version = table.version();
// if there's new version right after current commit, then we need to reset
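With CheckpointError gone from delta-rs's public API, checkpoint failures now surface as the general DeltaTableError. A sketch of the new shape, assuming (as the hunk suggests) that the error from deltalake::checkpoints::create_checkpoint converts into DeltaTableError via ?:

use deltalake::{DeltaTable, DeltaTableError};

// Checkpoint every 10th version, mirroring try_create_checkpoint above.
async fn maybe_checkpoint(table: &mut DeltaTable, version: i64) -> Result<(), DeltaTableError> {
    if version % 10 == 0 {
        deltalake::checkpoints::create_checkpoint(table).await?;
    }
    Ok(())
}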
15 changes: 4 additions & 11 deletions src/lib.rs
@@ -17,6 +17,8 @@ extern crate strum_macros;
extern crate serde_json;

use coercions::CoercionTree;
+use deltalake::protocol::DeltaOperation;
+use deltalake::protocol::OutputMode;
use deltalake::{DeltaTable, DeltaTableError};
use futures::stream::StreamExt;
use log::{debug, error, info, warn};
@@ -58,7 +60,6 @@ use crate::{
writer::{DataWriter, DataWriterError},
};
use delta_helpers::*;
-use deltalake::checkpoints::CheckpointError;
use rdkafka::message::BorrowedMessage;
use std::ops::Add;

@@ -100,14 +101,6 @@ pub enum IngestError {
source: Box<DataWriterError>,
},

-    /// Error occurred when writing a delta log checkpoint.
-    #[error("CheckpointErrorError error: {source}")]
-    CheckpointErrorError {
-        /// The wrapped [`CheckpointError`]
-        #[from]
-        source: CheckpointError,
-    },

/// Error from [`WriteOffsetsError`]
#[error("WriteOffsets error: {source}")]
WriteOffsets {
@@ -985,8 +978,8 @@ impl IngestProcessor {
match deltalake::operations::transaction::commit(
(self.table.object_store().storage_backend()).as_ref(),
&actions,
-            deltalake::action::DeltaOperation::StreamingUpdate {
-                output_mode: deltalake::action::OutputMode::Append,
+            DeltaOperation::StreamingUpdate {
+                output_mode: OutputMode::Append,
query_id: self.opts.app_id.clone(),
epoch_id,
},
8 changes: 5 additions & 3 deletions src/offsets.rs
@@ -1,6 +1,8 @@
use crate::delta_helpers::*;
use crate::{DataTypeOffset, DataTypePartition};
-use deltalake::action::Action;
+use deltalake::protocol::Action;
+use deltalake::protocol::DeltaOperation;
+use deltalake::protocol::OutputMode;
use deltalake::{DeltaTable, DeltaTableError};
use log::{error, info};

@@ -116,8 +118,8 @@ async fn commit_partition_offsets(
match deltalake::operations::transaction::commit(
(table.object_store().storage_backend()).as_ref(),
&actions,
-        deltalake::action::DeltaOperation::StreamingUpdate {
-            output_mode: deltalake::action::OutputMode::Complete,
+        DeltaOperation::StreamingUpdate {
+            output_mode: OutputMode::Complete,
query_id: app_id,
epoch_id,
},
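lib.rs and offsets.rs now build commits identically. For orientation, a sketch of the whole call: the first three arguments match the hunks verbatim, while the trailing two (table.get_state() and None for app metadata) are assumed from the deltalake 0.16 transaction::commit signature and sit outside the visible diff:

use deltalake::protocol::{Action, DeltaOperation, OutputMode};
use deltalake::{DeltaTable, DeltaTableError};

async fn commit_offsets(
    table: &mut DeltaTable,
    actions: Vec<Action>,
    app_id: String,
    epoch_id: i64,
) -> Result<i64, DeltaTableError> {
    // Returns the newly committed table version on success.
    let version = deltalake::operations::transaction::commit(
        (table.object_store().storage_backend()).as_ref(),
        &actions,
        DeltaOperation::StreamingUpdate {
            output_mode: OutputMode::Complete,
            query_id: app_id,
            epoch_id,
        },
        table.get_state(),
        None,
    )
    .await?;
    Ok(version)
}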
28 changes: 22 additions & 6 deletions src/writer.rs
@@ -12,20 +12,22 @@ use deltalake::arrow::{
json::reader::ReaderBuilder,
record_batch::*,
};

use deltalake::parquet::format::FileMetaData;
use deltalake::parquet::{
arrow::ArrowWriter,
basic::{Compression, LogicalType},
errors::ParquetError,
file::{metadata::RowGroupMetaData, properties::WriterProperties, statistics::Statistics},
format::TimeUnit,
schema::types::{ColumnDescriptor, SchemaDescriptor},
};
+use deltalake::protocol::DeltaOperation;
+use deltalake::protocol::SaveMode;
use deltalake::{
-    action::{Action, Add, ColumnCountStat, ColumnValueStat, Stats},
+    protocol::{Action, Add, ColumnCountStat, ColumnValueStat, Stats},
storage::DeltaObjectStore,
-    time_utils::timestamp_to_delta_stats_string,
-    DeltaResult, DeltaTable, DeltaTableError, DeltaTableMetaData, ObjectStoreError, Schema,
+    table::DeltaTableMetaData,
+    DeltaResult, DeltaTable, DeltaTableError, ObjectStoreError, Schema,
};
use log::{error, info, warn};
use serde_json::{Number, Value};
@@ -576,8 +578,8 @@ impl DataWriter {
let version = deltalake::operations::transaction::commit(
(table.object_store().storage_backend()).as_ref(),
&actions,
-        deltalake::action::DeltaOperation::Write {
-            mode: deltalake::action::SaveMode::Append,
+        DeltaOperation::Write {
+            mode: SaveMode::Append,
partition_by: Some(self.partition_columns.clone()),
predicate: None,
},
@@ -1086,6 +1088,7 @@ fn create_add(
stats: Some(stats_string),
stats_parsed: None,
tags: None,
+        ..Default::default()
})
}
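The new ..Default::default() in create_add is the usual defense against upstream struct growth: deltalake 0.16's Add carries more fields than this writer sets (deletion-vector metadata, for instance), and struct-update syntax lets Default fill them. A minimal sketch of the pattern:

use deltalake::protocol::Add;

// Set only the fields this writer computes; Default covers the rest,
// so new upstream fields don't break compilation.
fn minimal_add(path: String, size: i64) -> Add {
    Add {
        path,
        size,
        data_change: true,
        ..Default::default()
    }
}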

@@ -1147,6 +1150,19 @@ fn stringified_partition_value(
Ok(Some(s))
}

+/// Vendored from delta-rs since it's no longer a public API
+fn timestamp_to_delta_stats_string(n: i64, time_unit: &TimeUnit) -> Option<String> {
+    use deltalake::arrow::temporal_conversions;
+
+    let dt = match time_unit {
+        TimeUnit::MILLIS(_) => temporal_conversions::timestamp_ms_to_datetime(n),
+        TimeUnit::MICROS(_) => temporal_conversions::timestamp_us_to_datetime(n),
+        TimeUnit::NANOS(_) => temporal_conversions::timestamp_ns_to_datetime(n),
+    }?;
+
+    Some(format!("{}", dt.format("%Y-%m-%dT%H:%M:%S%.3fZ")))
+}

#[cfg(test)]
mod tests {
use super::*;
