Skip to content

Commit

Permalink
Avoid concat for array_replace (#8337)
Browse files Browse the repository at this point in the history
* add benchmark

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* fmt

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* address clippy

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* cleanup

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* fix comment

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

---------

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
jayzhan211 and alamb authored Dec 2, 2023
1 parent 3b29837 commit 340ecfd
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 67 deletions.
4 changes: 4 additions & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,7 @@ name = "sort"
[[bench]]
harness = false
name = "topk_aggregate"

[[bench]]
harness = false
name = "array_expression"
73 changes: 73 additions & 0 deletions datafusion/core/benches/array_expression.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#[macro_use]
extern crate criterion;
extern crate arrow;
extern crate datafusion;

mod data_utils;
use crate::criterion::Criterion;
use arrow_array::cast::AsArray;
use arrow_array::types::Int64Type;
use arrow_array::{ArrayRef, Int64Array, ListArray};
use datafusion_physical_expr::array_expressions;
use std::sync::Arc;

/// Benchmarks `array_expressions::array_replace_all` on a three-row list array
/// of `Int64` values, replacing every `2` with `-2`.
///
/// The output is validated once before timing starts. Only the call to
/// `array_replace_all` (plus dropping its result) is inside the timed closure,
/// so the reported numbers are not inflated by an element-wise comparison of
/// the whole result on every iteration.
///
/// # Panics
///
/// Panics if `array_replace_all` returns an error or if its output differs
/// from the expected all-`-2` list array.
fn criterion_benchmark(c: &mut Criterion) {
    // Construct large arrays for benchmarking

    let array_len = 100000000;

    // Three identical rows, each holding `array_len` copies of 2.
    let array = (0..array_len).map(|_| Some(2_i64)).collect::<Vec<_>>();
    let list_array = ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
        Some(array.clone()),
        Some(array.clone()),
        Some(array),
    ]);
    // One `from` / `to` element per list row.
    let from_array = Int64Array::from_value(2, 3);
    let to_array = Int64Array::from_value(-2, 3);

    let args = vec![
        Arc::new(list_array) as ArrayRef,
        Arc::new(from_array) as ArrayRef,
        Arc::new(to_array) as ArrayRef,
    ];

    // Check correctness once, outside the timed closure. The scope ends before
    // the benchmark runs so the expected array and the validation result are
    // freed and do not add to peak memory while timing.
    {
        let array = (0..array_len).map(|_| Some(-2_i64)).collect::<Vec<_>>();
        let expected_array = ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
            Some(array.clone()),
            Some(array.clone()),
            Some(array),
        ]);

        let actual = array_expressions::array_replace_all(args.as_slice()).unwrap();
        assert_eq!(actual.as_list::<i32>(), &expected_array);
    }

    // Benchmark array functions

    c.bench_function("array_replace", |b| {
        b.iter(|| {
            // `black_box` on both the input and the output keeps the optimizer
            // from specializing on the arguments or discarding the result.
            criterion::black_box(
                array_expressions::array_replace_all(criterion::black_box(
                    args.as_slice(),
                ))
                .unwrap(),
            )
        })
    });
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
125 changes: 58 additions & 67 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ use datafusion_common::cast::{
};
use datafusion_common::utils::{array_into_list_array, list_ndims};
use datafusion_common::{
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err,
DataFusionError, Result,
exec_err, internal_err, not_impl_err, plan_err, DataFusionError, Result,
};

use itertools::Itertools;
Expand Down Expand Up @@ -1320,84 +1319,76 @@ fn general_replace(
) -> Result<ArrayRef> {
// Build up the offsets for the final output array
let mut offsets: Vec<i32> = vec![0];
let data_type = list_array.value_type();
let mut new_values = vec![];
let values = list_array.values();
let original_data = values.to_data();
let to_data = to_array.to_data();
let capacity = Capacities::Array(original_data.len());

// n is the number of elements to replace in this row
for (row_index, (list_array_row, n)) in
list_array.iter().zip(arr_n.iter()).enumerate()
{
let last_offset: i32 = offsets
.last()
.copied()
.ok_or_else(|| internal_datafusion_err!("offsets should not be empty"))?;
// First array is the original array, second array is the element to replace with.
let mut mutable = MutableArrayData::with_capacities(
vec![&original_data, &to_data],
false,
capacity,
);

match list_array_row {
Some(list_array_row) => {
// Compute all positions in list_row_array (that is itself an
// array) that are equal to `from_array_row`
let eq_array = compare_element_to_list(
&list_array_row,
&from_array,
row_index,
true,
)?;
let mut valid = BooleanBufferBuilder::new(list_array.len());

// Use MutableArrayData to build the replaced array
let original_data = list_array_row.to_data();
let to_data = to_array.to_data();
let capacity = Capacities::Array(original_data.len() + to_data.len());
for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() {
if list_array.is_null(row_index) {
offsets.push(offsets[row_index]);
valid.append(false);
continue;
}

// First array is the original array, second array is the element to replace with.
let mut mutable = MutableArrayData::with_capacities(
vec![&original_data, &to_data],
false,
capacity,
);
let original_idx = 0;
let replace_idx = 1;

let mut counter = 0;
for (i, to_replace) in eq_array.iter().enumerate() {
if let Some(true) = to_replace {
mutable.extend(replace_idx, row_index, row_index + 1);
counter += 1;
if counter == *n {
// copy original data for any matches past n
mutable.extend(original_idx, i + 1, eq_array.len());
break;
}
} else {
// copy original data for false / null matches
mutable.extend(original_idx, i, i + 1);
}
}
let start = offset_window[0] as usize;
let end = offset_window[1] as usize;

let data = mutable.freeze();
let replaced_array = arrow_array::make_array(data);
let list_array_row = list_array.value(row_index);

offsets.push(last_offset + replaced_array.len() as i32);
new_values.push(replaced_array);
}
None => {
// Null element results in a null row (no new offsets)
offsets.push(last_offset);
// Compute all positions in list_row_array (that is itself an
// array) that are equal to `from_array_row`
let eq_array =
compare_element_to_list(&list_array_row, &from_array, row_index, true)?;

let original_idx = 0;
let replace_idx = 1;
let n = arr_n[row_index];
let mut counter = 0;

// All elements are false, no need to replace, just copy original data
if eq_array.false_count() == eq_array.len() {
mutable.extend(original_idx, start, end);
offsets.push(offsets[row_index] + (end - start) as i32);
valid.append(true);
continue;
}

for (i, to_replace) in eq_array.iter().enumerate() {
if let Some(true) = to_replace {
mutable.extend(replace_idx, row_index, row_index + 1);
counter += 1;
if counter == n {
// copy original data for any matches past n
mutable.extend(original_idx, start + i + 1, end);
break;
}
} else {
// copy original data for false / null matches
mutable.extend(original_idx, start + i, start + i + 1);
}
}

offsets.push(offsets[row_index] + (end - start) as i32);
valid.append(true);
}

let values = if new_values.is_empty() {
new_empty_array(&data_type)
} else {
let new_values: Vec<_> = new_values.iter().map(|a| a.as_ref()).collect();
arrow::compute::concat(&new_values)?
};
let data = mutable.freeze();

Ok(Arc::new(ListArray::try_new(
Arc::new(Field::new("item", data_type, true)),
Arc::new(Field::new("item", list_array.value_type(), true)),
OffsetBuffer::new(offsets.into()),
values,
list_array.nulls().cloned(),
arrow_array::make_array(data),
Some(NullBuffer::new(valid.finish())),
)?))
}

Expand Down

0 comments on commit 340ecfd

Please sign in to comment.