Skip to content

Commit

Permalink
testcases for randomly ordered build side input
Browse files Browse the repository at this point in the history
  • Loading branch information
korowa committed Dec 29, 2023
1 parent c424eb3 commit 7311bd9
Showing 1 changed file with 91 additions and 2 deletions.
93 changes: 91 additions & 2 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1628,9 +1628,9 @@ mod tests {
async fn join_inner_one_no_shared_column_names() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let left = build_table(
("a1", &vec![1, 3, 2]),
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 5]), // this has a repetition
("c1", &vec![7, 9, 8]),
("c1", &vec![7, 8, 9]),
);
let right = build_table(
("a2", &vec![10, 20, 30]),
Expand All @@ -1652,8 +1652,48 @@ mod tests {
"| a1 | b1 | c1 | a2 | b2 | c2 |",
"+----+----+----+----+----+----+",
"| 1 | 4 | 7 | 10 | 4 | 70 |",
"| 2 | 5 | 8 | 20 | 5 | 80 |",
"| 3 | 5 | 9 | 20 | 5 | 80 |",
"+----+----+----+----+----+----+",
];

// Inner join output is expected to preserve both inputs order
assert_batches_eq!(expected, &batches);

Ok(())
}

#[tokio::test]
async fn join_inner_one_randomly_ordered() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let left = build_table(
("a1", &vec![0, 3, 2, 1]),
("b1", &vec![4, 5, 5, 4]),
("c1", &vec![6, 9, 8, 7]),
);
let right = build_table(
("a2", &vec![20, 30, 10]),
("b2", &vec![5, 6, 4]),
("c2", &vec![80, 90, 70]),
);
let on = vec![(
Column::new_with_schema("b1", &left.schema())?,
Column::new_with_schema("b2", &right.schema())?,
)];

let (columns, batches) =
join_collect(left, right, on, &JoinType::Inner, false, task_ctx).await?;

assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);

let expected = [
"+----+----+----+----+----+----+",
"| a1 | b1 | c1 | a2 | b2 | c2 |",
"+----+----+----+----+----+----+",
"| 3 | 5 | 9 | 20 | 5 | 80 |",
"| 2 | 5 | 8 | 20 | 5 | 80 |",
"| 0 | 4 | 6 | 10 | 4 | 70 |",
"| 1 | 4 | 7 | 10 | 4 | 70 |",
"+----+----+----+----+----+----+",
];

Expand Down Expand Up @@ -1765,6 +1805,55 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn join_inner_one_two_parts_left_randomly_ordered() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let batch1 = build_table_i32(
("a1", &vec![0, 3]),
("b1", &vec![4, 5]),
("c1", &vec![6, 9]),
);
let batch2 = build_table_i32(
("a1", &vec![2, 1]),
("b1", &vec![5, 4]),
("c1", &vec![8, 7]),
);
let schema = batch1.schema();
let left = Arc::new(
MemoryExec::try_new(&[vec![batch1], vec![batch2]], schema, None).unwrap(),
);
let right = build_table(
("a2", &vec![20, 30, 10]),
("b2", &vec![5, 6, 4]),
("c2", &vec![80, 90, 70]),
);
let on = vec![(
Column::new_with_schema("b1", &left.schema())?,
Column::new_with_schema("b2", &right.schema())?,
)];

let (columns, batches) =
join_collect(left, right, on, &JoinType::Inner, false, task_ctx).await?;

assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);

let expected = [
"+----+----+----+----+----+----+",
"| a1 | b1 | c1 | a2 | b2 | c2 |",
"+----+----+----+----+----+----+",
"| 3 | 5 | 9 | 20 | 5 | 80 |",
"| 2 | 5 | 8 | 20 | 5 | 80 |",
"| 0 | 4 | 6 | 10 | 4 | 70 |",
"| 1 | 4 | 7 | 10 | 4 | 70 |",
"+----+----+----+----+----+----+",
];

// Inner join output is expected to preserve both inputs order
assert_batches_eq!(expected, &batches);

Ok(())
}

/// Test where the left has 1 part, the right has 2 parts => 2 parts
#[tokio::test]
async fn join_inner_one_two_parts_right() -> Result<()> {
Expand Down

0 comments on commit 7311bd9

Please sign in to comment.