Skip to content

Commit 14635da

Browse files
authored
perf: Reuse row converter during sort (#15302)
* reuse row converter during sort * review * update submodule pin
1 parent e2b7919 commit 14635da

File tree

1 file changed

+88
-12
lines changed
  • datafusion/physical-plan/src/sorts

1 file changed

+88
-12
lines changed

datafusion/physical-plan/src/sorts/sort.rs

+88-12
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,10 @@ use arrow::array::{
4949
};
5050
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn};
5151
use arrow::datatypes::{DataType, SchemaRef};
52-
use arrow::row::{RowConverter, SortField};
53-
use datafusion_common::{internal_datafusion_err, internal_err, Result};
52+
use arrow::row::{RowConverter, Rows, SortField};
53+
use datafusion_common::{
54+
exec_datafusion_err, internal_datafusion_err, internal_err, Result,
55+
};
5456
use datafusion_execution::disk_manager::RefCountedTempFile;
5557
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
5658
use datafusion_execution::runtime_env::RuntimeEnv;
@@ -203,6 +205,8 @@ struct ExternalSorter {
203205
schema: SchemaRef,
204206
/// Sort expressions
205207
expr: Arc<[PhysicalSortExpr]>,
208+
/// RowConverter corresponding to the sort expressions
209+
sort_keys_row_converter: Arc<RowConverter>,
206210
/// If Some, the maximum number of output rows that will be produced
207211
fetch: Option<usize>,
208212
/// The target number of rows for output batches
@@ -265,7 +269,7 @@ impl ExternalSorter {
265269
sort_in_place_threshold_bytes: usize,
266270
metrics: &ExecutionPlanMetricsSet,
267271
runtime: Arc<RuntimeEnv>,
268-
) -> Self {
272+
) -> Result<Self> {
269273
let metrics = ExternalSorterMetrics::new(metrics, partition_id);
270274
let reservation = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]"))
271275
.with_can_spill(true)
@@ -275,19 +279,36 @@ impl ExternalSorter {
275279
MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
276280
.register(&runtime.memory_pool);
277281

282+
// Construct RowConverter for sort keys
283+
let sort_fields = expr
284+
.iter()
285+
.map(|e| {
286+
let data_type = e
287+
.expr
288+
.data_type(&schema)
289+
.map_err(|e| e.context("Resolving sort expression data type"))?;
290+
Ok(SortField::new_with_options(data_type, e.options))
291+
})
292+
.collect::<Result<Vec<_>>>()?;
293+
294+
let converter = RowConverter::new(sort_fields).map_err(|e| {
295+
exec_datafusion_err!("Failed to create RowConverter: {:?}", e)
296+
})?;
297+
278298
let spill_manager = SpillManager::new(
279299
Arc::clone(&runtime),
280300
metrics.spill_metrics.clone(),
281301
Arc::clone(&schema),
282302
);
283303

284-
Self {
304+
Ok(Self {
285305
schema,
286306
in_mem_batches: vec![],
287307
in_mem_batches_sorted: false,
288308
in_progress_spill_file: None,
289309
finished_spill_files: vec![],
290310
expr: expr.into(),
311+
sort_keys_row_converter: Arc::new(converter),
291312
metrics,
292313
fetch,
293314
reservation,
@@ -297,7 +318,7 @@ impl ExternalSorter {
297318
batch_size,
298319
sort_spill_reservation_bytes,
299320
sort_in_place_threshold_bytes,
300-
}
321+
})
301322
}
302323

303324
/// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
@@ -723,15 +744,29 @@ impl ExternalSorter {
723744

724745
let fetch = self.fetch;
725746
let expressions: LexOrdering = self.expr.iter().cloned().collect();
726-
let stream = futures::stream::once(futures::future::lazy(move |_| {
727-
let timer = metrics.elapsed_compute().timer();
728-
let sorted = sort_batch(&batch, &expressions, fetch)?;
729-
timer.done();
747+
let row_converter = Arc::clone(&self.sort_keys_row_converter);
748+
let stream = futures::stream::once(async move {
749+
let _timer = metrics.elapsed_compute().timer();
750+
751+
let sort_columns = expressions
752+
.iter()
753+
.map(|expr| expr.evaluate_to_sort_column(&batch))
754+
.collect::<Result<Vec<_>>>()?;
755+
756+
let sorted = if is_multi_column_with_lists(&sort_columns) {
757+
// lex_sort_to_indices doesn't support List with more than one column
758+
// https://github.com/apache/arrow-rs/issues/5454
759+
sort_batch_row_based(&batch, &expressions, row_converter, fetch)?
760+
} else {
761+
sort_batch(&batch, &expressions, fetch)?
762+
};
763+
730764
metrics.record_output(sorted.num_rows());
731765
drop(batch);
732766
drop(reservation);
733767
Ok(sorted)
734-
}));
768+
});
769+
735770
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
736771
}
737772

@@ -776,6 +811,45 @@ impl Debug for ExternalSorter {
776811
}
777812
}
778813

814+
/// Converts rows into a sorted array of indices based on their order.
815+
/// This function returns the indices that represent the sorted order of the rows.
816+
fn rows_to_indices(rows: Rows, limit: Option<usize>) -> Result<UInt32Array> {
817+
let mut sort: Vec<_> = rows.iter().enumerate().collect();
818+
sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
819+
820+
let mut len = rows.num_rows();
821+
if let Some(limit) = limit {
822+
len = limit.min(len);
823+
}
824+
let indices =
825+
UInt32Array::from_iter_values(sort.iter().take(len).map(|(i, _)| *i as u32));
826+
Ok(indices)
827+
}
828+
829+
/// Sorts a `RecordBatch` by converting its sort columns into Arrow Row Format for faster comparison.
830+
fn sort_batch_row_based(
831+
batch: &RecordBatch,
832+
expressions: &LexOrdering,
833+
row_converter: Arc<RowConverter>,
834+
fetch: Option<usize>,
835+
) -> Result<RecordBatch> {
836+
let sort_columns = expressions
837+
.iter()
838+
.map(|expr| expr.evaluate_to_sort_column(batch).map(|col| col.values))
839+
.collect::<Result<Vec<_>>>()?;
840+
let rows = row_converter.convert_columns(&sort_columns)?;
841+
let indices = rows_to_indices(rows, fetch)?;
842+
let columns = take_arrays(batch.columns(), &indices, None)?;
843+
844+
let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
845+
846+
Ok(RecordBatch::try_new_with_options(
847+
batch.schema(),
848+
columns,
849+
&options,
850+
)?)
851+
}
852+
779853
pub fn sort_batch(
780854
batch: &RecordBatch,
781855
expressions: &LexOrdering,
@@ -838,7 +912,9 @@ pub(crate) fn lexsort_to_indices_multi_columns(
838912
},
839913
);
840914

841-
// TODO reuse converter and rows, refer to TopK.
915+
// Note: row converter is reused through `sort_batch_row_based()`, this function
916+
// is not used during normal sort execution, but it's kept temporarily because
917+
// it's inside a public interface `sort_batch()`.
842918
let converter = RowConverter::new(fields)?;
843919
let rows = converter.convert_columns(&columns)?;
844920
let mut sort: Vec<_> = rows.iter().enumerate().collect();
@@ -1154,7 +1230,7 @@ impl ExecutionPlan for SortExec {
11541230
execution_options.sort_in_place_threshold_bytes,
11551231
&self.metrics_set,
11561232
context.runtime_env(),
1157-
);
1233+
)?;
11581234
Ok(Box::pin(RecordBatchStreamAdapter::new(
11591235
self.schema(),
11601236
futures::stream::once(async move {

0 commit comments

Comments
 (0)