From bfce4eb7e4a2e31b19ba4e0f63946f44425858b1 Mon Sep 17 00:00:00 2001
From: Sergii Mikhtoniuk
Date: Mon, 15 Jan 2024 17:49:41 -0800
Subject: [PATCH] Restore Spark compatibility

---
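Note: the core of this change, repeated across the hunks below, is a single
normalization rule: cast the operation-type (`op`) column to a plain Int32
and the `offset` column to a plain Int64 instead of UInt8/UInt64, whose
unsigned Parquet logical annotations the current Spark engine cannot read
yet (per the changelog entry below). Purely as an illustration of that rule,
a minimal standalone sketch of the cast in DataFusion terms; the function
and column names here are hypothetical and not part of this patch:

    use datafusion::arrow::datatypes::DataType;
    use datafusion::dataframe::DataFrame;
    use datafusion::error::Result;
    use datafusion::logical_expr::{cast, col};

    /// Rewrites one column of `df` to a plain signed integer type so the
    /// resulting Parquet file carries no unsigned-integer logical annotation.
    fn normalize_to_signed(df: DataFrame, column: &str, ty: DataType) -> Result<DataFrame> {
        df.with_column(column, cast(col(column), ty))
    }

    // E.g.: normalize_to_signed(df, "op", DataType::Int32)?
    //       normalize_to_signed(df, "offset", DataType::Int64)?
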
 CHANGELOG.md                  |  4 ++++
 Cargo.lock                    |  2 +-
 Cargo.toml                    |  2 +-
 src/engine.rs                 | 34 +++++++++++++++++++++++-----------
 tests/tests/test_transform.rs | 26 ++++++++++++++------------
 5 files changed, 43 insertions(+), 25 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index dc1c388..ed70a54 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [0.7.1] - 2024-01-15
+### Changed
+- Reverted to using `int32` and `int64` for the `op` and `offset` columns respectively, to preserve compatibility with the Spark engine until Spark's Parquet version is upgraded.
+
 ## [0.7.0] - 2024-01-10
 ### Changed
 - Migration to ODF changelog schema
diff --git a/Cargo.lock b/Cargo.lock
index 205d7df..733c199 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1550,7 +1550,7 @@ dependencies = [
 
 [[package]]
 name = "kamu-engine-datafusion"
-version = "0.7.0"
+version = "0.7.1"
 dependencies = [
  "async-trait",
  "chrono",
diff --git a/Cargo.toml b/Cargo.toml
index dcecefd..e8df50b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "kamu-engine-datafusion"
-version = "0.7.0"
+version = "0.7.1"
 authors = ["Kamu Data Inc. "]
 license-file = "LICENSE.txt"
 edition = "2021"
diff --git a/src/engine.rs b/src/engine.rs
index 3c06b77..dba0353 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -127,8 +127,10 @@ impl Engine {
 
         // Get result's execution plan
         let df = ctx.table(Self::OUTPUT_VIEW_NAME).await.int_err()?;
+        tracing::info!(schema = ?df.schema(), "Raw result schema");
 
         let df = Self::normalize_raw_result(df, &request.vocab)?;
+        tracing::info!(schema = ?df.schema(), "Normalized result schema");
 
         Self::validate_raw_result(&df, &request.vocab)?;
 
@@ -321,11 +323,17 @@ impl Engine {
                     )
                     .alias(field.name())
                 }
-                // For compatibility with engines that cannot write correct Parquet logical types we
-                // allow plain INT32, but cast it to UINT8 here
-                DataType::Int32 if *field.name() == vocab.operation_type_column => {
+                // TODO: Normalize towards UInt8 after Spark is updated
+                // See: https://github.com/kamu-data/kamu-cli/issues/445
+                DataType::Int8
+                | DataType::UInt8
+                | DataType::Int16
+                | DataType::UInt16
+                | DataType::UInt32
+                    if *field.name() == vocab.operation_type_column =>
+                {
                     noop = false;
-                    cast(col(field.unqualified_column()), DataType::UInt8).alias(field.name())
+                    cast(col(field.unqualified_column()), DataType::Int32).alias(field.name())
                 }
                 _ => col(field.unqualified_column()),
             };
@@ -343,8 +351,6 @@ impl Engine {
         df: &DataFrame,
         vocab: &DatasetVocabulary,
     ) -> Result<(), ExecuteTransformError> {
-        tracing::info!(schema = ?df.schema(), "Computed raw result schema");
-
         let system_columns = [&vocab.offset_column, &vocab.system_time_column];
         for system_column in system_columns {
             if df.schema().has_column_with_unqualified_name(system_column) {
@@ -366,11 +372,13 @@ impl Engine {
             .first()
         {
             match op_col.data_type() {
-                DataType::UInt8 => {}
+                // TODO: Require UInt8 after Spark is updated
+                // See: https://github.com/kamu-data/kamu-cli/issues/445
+                DataType::Int32 => {}
                 typ => {
                     return Err(TransformResponseInvalidQuery {
                         message: format!(
-                            "Operation type column '{}' should be UInt8, but found: {}",
+                            "Operation type column '{}' should be Int32, but found: {}",
                             vocab.operation_type_column, typ
                         ),
                     }
@@ -475,11 +483,13 @@ impl Engine {
             }),
         )?;
 
+        // TODO: Cast to UInt64 after Spark is updated
+        // See: https://github.com/kamu-data/kamu-cli/issues/445
         let df = df.with_column(
             &vocab.offset_column,
             cast(
                 col(&vocab.offset_column as &str) + lit(start_offset as i64 - 1),
-                DataType::UInt64,
+                DataType::Int64,
             ),
         )?;
 
@@ -490,7 +500,9 @@ impl Engine {
         {
             df.with_column(
                 &vocab.operation_type_column,
-                lit(OperationType::Append as u8),
+                // TODO: Cast to u8 after Spark is updated
+                // See: https://github.com/kamu-data/kamu-cli/issues/445
+                lit(OperationType::Append as i32),
             )?
         } else {
             df
@@ -572,6 +584,6 @@ impl Engine {
             tracing::info!("Produced empty result",);
             let _ = std::fs::remove_file(path);
         }
-        Ok(num_records)
+        Ok(num_records as u64)
     }
 }
diff --git a/tests/tests/test_transform.rs b/tests/tests/test_transform.rs
index 6fc839e..dd3a7bb 100644
--- a/tests/tests/test_transform.rs
+++ b/tests/tests/test_transform.rs
@@ -53,15 +53,17 @@ fn write_sample_data(path: impl AsRef<Path>, data: &[Record]) {
     use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
     use datafusion::arrow::record_batch::RecordBatch;
 
+    // TODO: Replace with UInt64 and UInt8 after Spark is updated
+    // See: https://github.com/kamu-data/kamu-cli/issues/445
     let schema = Arc::new(Schema::new(vec![
         Field::new(
             DatasetVocabulary::DEFAULT_OFFSET_COLUMN_NAME,
-            DataType::UInt64,
+            DataType::Int64,
             false,
         ),
         Field::new(
             DatasetVocabulary::DEFAULT_OPERATION_TYPE_COLUMN_NAME,
-            DataType::UInt8,
+            DataType::Int32,
             false,
         ),
         Field::new(
@@ -81,11 +83,11 @@ fn write_sample_data(path: impl AsRef<Path>, data: &[Record]) {
     let record_batch = RecordBatch::try_new(
         schema,
         vec![
-            Arc::new(array::UInt64Array::from(
-                data.iter().map(|r| r.offset).collect::<Vec<_>>(),
+            Arc::new(array::Int64Array::from(
+                data.iter().map(|r| r.offset as i64).collect::<Vec<_>>(),
             )),
-            Arc::new(array::UInt8Array::from(
-                data.iter().map(|r| r.op as u8).collect::<Vec<_>>(),
+            Arc::new(array::Int32Array::from(
+                data.iter().map(|r| r.op as i32).collect::<Vec<_>>(),
             )),
             Arc::new(
                 array::TimestampMillisecondArray::from(
@@ -315,8 +317,8 @@ async fn test_result_schema() {
         expected_schema: Some(indoc!(
             r#"
             message arrow_schema {
-              OPTIONAL INT64 offset (INTEGER(64,false));
-              REQUIRED INT32 op (INTEGER(8,false));
+              OPTIONAL INT64 offset;
+              REQUIRED INT32 op;
               REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true));
               REQUIRED INT64 event_time (TIMESTAMP(MILLIS,true));
               REQUIRED BYTE_ARRAY city (STRING);
@@ -337,8 +339,8 @@ async fn test_result_optimal_parquet_encoding() {
         expected_schema: Some(indoc!(
             r#"
             message arrow_schema {
-              OPTIONAL INT64 offset (INTEGER(64,false));
-              REQUIRED INT32 op (INTEGER(8,false));
+              OPTIONAL INT64 offset;
+              REQUIRED INT32 op;
               REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true));
               REQUIRED INT64 event_time (TIMESTAMP(MILLIS,true));
               REQUIRED BYTE_ARRAY city (STRING);
@@ -667,8 +669,8 @@ async fn test_event_time_coerced_to_millis() {
         expected_schema: Some(indoc!(
             r#"
             message arrow_schema {
-              OPTIONAL INT64 offset (INTEGER(64,false));
-              REQUIRED INT32 op (INTEGER(8,false));
+              OPTIONAL INT64 offset;
+              REQUIRED INT32 op;
               REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true));
               REQUIRED INT64 event_time (TIMESTAMP(MILLIS,true));
               REQUIRED BYTE_ARRAY city (STRING);
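
Note on the test expectations: the schema snapshots above drop the
"(INTEGER(64,false))" and "(INTEGER(8,false))" annotations because Arrow's
unsigned types have no direct Parquet physical equivalent: UInt8 is written
as physical INT32 with an INTEGER(8,false) logical annotation, and UInt64 as
INT64 with INTEGER(64,false). It is that annotation the pre-upgrade Spark
reader apparently cannot handle, hence the revert to plain signed types. A
throwaway snippet to observe the mapping, not part of the patch (main(), the
file path, and the column name are illustrative only):

    use std::fs::File;
    use std::sync::Arc;

    use datafusion::arrow::array::UInt8Array;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::parquet::arrow::arrow_writer::ArrowWriter;
    use datafusion::parquet::file::reader::{FileReader, SerializedFileReader};
    use datafusion::parquet::schema::printer::print_schema;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Write a single Arrow UInt8 column to a Parquet file
        let schema = Arc::new(Schema::new(vec![Field::new("op", DataType::UInt8, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(UInt8Array::from(vec![0u8, 1, 2]))],
        )?;
        let mut writer = ArrowWriter::try_new(File::create("/tmp/op.parquet")?, schema, None)?;
        writer.write(&batch)?;
        writer.close()?;

        // Print the resulting Parquet schema: the column comes back as
        // "REQUIRED INT32 op (INTEGER(8,false))" rather than a plain INT32
        let reader = SerializedFileReader::new(File::open("/tmp/op.parquet")?)?;
        let root = reader.metadata().file_metadata().schema_descr().root_schema();
        print_schema(&mut std::io::stdout(), root);
        Ok(())
    }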