Restore Spark compatibility
sergiimk committed Jan 16, 2024
1 parent 11d6c6a commit bfce4eb
Showing 5 changed files with 43 additions and 25 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [0.7.1] - 2024-01-15
+### Changed
+- Reverted to use `int32` and `int64` for `op` and `offset` respectively to preserve compatibility with Spark engine until Spark's Parquet version is upgraded.
+
 ## [0.7.0] - 2024-01-10
 ### Changed
 - Migration to ODF changelog schema
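For context on the revert: when Arrow's unsigned `UInt8`/`UInt64` types are written to Parquet they become physical `INT32`/`INT64` columns carrying unsigned logical-type annotations (`INTEGER(8,false)`, `INTEGER(64,false)`), which the Parquet version bundled with Spark does not yet handle; plain signed `Int32`/`Int64` produce unannotated columns. A minimal sketch of writing the reverted schema with DataFusion's re-exported `arrow`/`parquet` crates (the column names come from this commit; the `main` scaffolding and output path are illustrative):

```rust
use std::{fs::File, sync::Arc};

use datafusion::arrow::array::{Int32Array, Int64Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::parquet::arrow::ArrowWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Signed types map to plain Parquet INT64/INT32; the unsigned UInt64/UInt8
    // equivalents would add INTEGER(64,false)/INTEGER(8,false) annotations.
    let schema = Arc::new(Schema::new(vec![
        Field::new("offset", DataType::Int64, false),
        Field::new("op", DataType::Int32, false),
    ]));

    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int64Array::from(vec![0i64, 1, 2])),
            Arc::new(Int32Array::from(vec![0i32, 0, 1])),
        ],
    )?;

    let file = File::create("/tmp/sample.parquet")?; // illustrative path
    let mut writer = ArrowWriter::try_new(file, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}
```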
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "kamu-engine-datafusion"
-version = "0.7.0"
+version = "0.7.1"
 authors = ["Kamu Data Inc. <[email protected]>"]
 license-file = "LICENSE.txt"
 edition = "2021"
34 changes: 23 additions & 11 deletions src/engine.rs
@@ -127,8 +127,10 @@ impl Engine {
 
         // Get result's execution plan
         let df = ctx.table(Self::OUTPUT_VIEW_NAME).await.int_err()?;
+        tracing::info!(schema = ?df.schema(), "Raw result schema");
 
         let df = Self::normalize_raw_result(df, &request.vocab)?;
+        tracing::info!(schema = ?df.schema(), "Normalized result schema");
 
         Self::validate_raw_result(&df, &request.vocab)?;
 
@@ -321,11 +323,17 @@ impl Engine {
                     )
                     .alias(field.name())
                 }
-                // For compatibility with engines that cannot write correct Parquet logical types we
-                // allow plain INT32, but cast it to UINT8 here
-                DataType::Int32 if *field.name() == vocab.operation_type_column => {
+                // TODO: Normalize towards UInt8 after Spark is updated
+                // See: https://github.com/kamu-data/kamu-cli/issues/445
+                DataType::Int8
+                | DataType::UInt8
+                | DataType::Int16
+                | DataType::UInt16
+                | DataType::UInt32
+                    if *field.name() == vocab.operation_type_column =>
+                {
                     noop = false;
-                    cast(col(field.unqualified_column()), DataType::UInt8).alias(field.name())
+                    cast(col(field.unqualified_column()), DataType::Int32).alias(field.name())
                 }
                 _ => col(field.unqualified_column()),
             };
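The widened match arm above accepts any smaller integer type an upstream engine may have produced for the op column and rewrites the projection to plain `Int32`. A standalone sketch of the same cast at the DataFusion DataFrame level (the table name `t`, the `register_batch` setup, and the tokio scaffolding are illustrative, not from the engine):

```rust
use std::sync::Arc;

use datafusion::arrow::array::UInt8Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // A query result whose `op` column came out as UInt8.
    let schema = Arc::new(Schema::new(vec![Field::new("op", DataType::UInt8, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(UInt8Array::from(vec![0u8, 1, 2]))],
    )?;

    let ctx = SessionContext::new();
    ctx.register_batch("t", batch)?;

    // Rewrite the projection so `op` is a plain Int32, mirroring the
    // normalization pass in the hunk above.
    let df = ctx
        .table("t")
        .await?
        .select(vec![cast(col("op"), DataType::Int32).alias("op")])?;
    df.show().await?;
    Ok(())
}
```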
@@ -343,8 +351,6 @@
         df: &DataFrame,
         vocab: &DatasetVocabulary,
     ) -> Result<(), ExecuteTransformError> {
-        tracing::info!(schema = ?df.schema(), "Computed raw result schema");
-
         let system_columns = [&vocab.offset_column, &vocab.system_time_column];
         for system_column in system_columns {
             if df.schema().has_column_with_unqualified_name(system_column) {
@@ -366,11 +372,13 @@
             .first()
         {
             match op_col.data_type() {
-                DataType::UInt8 => {}
+                // TODO: Require UInt8 after Spark is updated
+                // See: https://github.com/kamu-data/kamu-cli/issues/445
+                DataType::Int32 => {}
                 typ => {
                     return Err(TransformResponseInvalidQuery {
                         message: format!(
-                            "Operation type column '{}' should be UInt8, but found: {}",
+                            "Operation type column '{}' should be Int32, but found: {}",
                             vocab.operation_type_column, typ
                         ),
                     }
@@ -475,11 +483,13 @@ impl Engine {
             }),
         )?;
 
+        // TODO: Cast to UInt64 after Spark is updated
+        // See: https://github.com/kamu-data/kamu-cli/issues/445
         let df = df.with_column(
             &vocab.offset_column,
             cast(
                 col(&vocab.offset_column as &str) + lit(start_offset as i64 - 1),
-                DataType::UInt64,
+                DataType::Int64,
             ),
         )?;
 
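Assuming the raw result numbers its rows starting from 1 (e.g. via a `ROW_NUMBER()` window), adding `start_offset - 1` shifts them onto the dataset's global offset sequence: with `start_offset = 10`, row numbers 1, 2, 3 become offsets 10, 11, 12. Only the target type of the cast changes in this hunk, from `UInt64` to `Int64`.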
@@ -490,7 +500,9 @@
         {
             df.with_column(
                 &vocab.operation_type_column,
-                lit(OperationType::Append as u8),
+                // TODO: Cast to u8 after Spark is updated
+                // See: https://github.com/kamu-data/kamu-cli/issues/445
+                lit(OperationType::Append as i32),
             )?
         } else {
             df
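When a query yields no op column at all, every row is an append, so the engine fills the column with a constant. In the ODF changelog schema `Append` encodes as `0`; the literal is now materialized as an `i32` rather than a `u8` to match the relaxed column type.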
@@ -572,6 +584,6 @@ impl Engine {
             tracing::info!("Produced empty result",);
             let _ = std::fs::remove_file(path);
         }
-        Ok(num_records)
+        Ok(num_records as u64)
     }
 }
26 changes: 14 additions & 12 deletions tests/tests/test_transform.rs
@@ -53,15 +53,17 @@ fn write_sample_data(path: impl AsRef<Path>, data: &[Record]) {
     use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
     use datafusion::arrow::record_batch::RecordBatch;
 
+    // TODO: Replace with UInt64 and UInt8 after Spark is updated
+    // See: https://github.com/kamu-data/kamu-cli/issues/445
     let schema = Arc::new(Schema::new(vec![
         Field::new(
             DatasetVocabulary::DEFAULT_OFFSET_COLUMN_NAME,
-            DataType::UInt64,
+            DataType::Int64,
             false,
         ),
         Field::new(
             DatasetVocabulary::DEFAULT_OPERATION_TYPE_COLUMN_NAME,
-            DataType::UInt8,
+            DataType::Int32,
             false,
         ),
         Field::new(
@@ -81,11 +83,11 @@ fn write_sample_data(path: impl AsRef<Path>, data: &[Record]) {
     let record_batch = RecordBatch::try_new(
         schema,
         vec![
-            Arc::new(array::UInt64Array::from(
-                data.iter().map(|r| r.offset).collect::<Vec<_>>(),
+            Arc::new(array::Int64Array::from(
+                data.iter().map(|r| r.offset as i64).collect::<Vec<_>>(),
             )),
-            Arc::new(array::UInt8Array::from(
-                data.iter().map(|r| r.op as u8).collect::<Vec<_>>(),
+            Arc::new(array::Int32Array::from(
+                data.iter().map(|r| r.op as i32).collect::<Vec<_>>(),
             )),
             Arc::new(
                 array::TimestampMillisecondArray::from(
@@ -315,8 +317,8 @@ async fn test_result_schema() {
         expected_schema: Some(indoc!(
             r#"
             message arrow_schema {
-              OPTIONAL INT64 offset (INTEGER(64,false));
-              REQUIRED INT32 op (INTEGER(8,false));
+              OPTIONAL INT64 offset;
+              REQUIRED INT32 op;
               REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true));
               REQUIRED INT64 event_time (TIMESTAMP(MILLIS,true));
               REQUIRED BYTE_ARRAY city (STRING);
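The dropped `(INTEGER(64,false))` and `(INTEGER(8,false))` annotations in these test expectations are the observable effect of the revert. To check what a produced file actually contains, a sketch along these lines prints the same `message arrow_schema { ... }` form (the input path is illustrative):

```rust
use std::fs::File;

use datafusion::parquet::file::reader::{FileReader, SerializedFileReader};
use datafusion::parquet::schema::printer::print_schema;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Illustrative path; point this at any data file the engine wrote.
    let file = File::open("/tmp/sample.parquet")?;
    let reader = SerializedFileReader::new(file)?;

    // Prints the Parquet message schema, including any logical-type
    // annotations such as INTEGER(8,false).
    print_schema(
        &mut std::io::stdout(),
        reader.metadata().file_metadata().schema(),
    );
    Ok(())
}
```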
@@ -337,8 +339,8 @@ async fn test_result_optimal_parquet_encoding() {
         expected_schema: Some(indoc!(
             r#"
             message arrow_schema {
-              OPTIONAL INT64 offset (INTEGER(64,false));
-              REQUIRED INT32 op (INTEGER(8,false));
+              OPTIONAL INT64 offset;
+              REQUIRED INT32 op;
               REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true));
               REQUIRED INT64 event_time (TIMESTAMP(MILLIS,true));
               REQUIRED BYTE_ARRAY city (STRING);
@@ -667,8 +669,8 @@ async fn test_event_time_coerced_to_millis() {
         expected_schema: Some(indoc!(
             r#"
             message arrow_schema {
-              OPTIONAL INT64 offset (INTEGER(64,false));
-              REQUIRED INT32 op (INTEGER(8,false));
+              OPTIONAL INT64 offset;
+              REQUIRED INT32 op;
               REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true));
               REQUIRED INT64 event_time (TIMESTAMP(MILLIS,true));
               REQUIRED BYTE_ARRAY city (STRING);
