Remove redundant unwrap in ScalarValue::new_primitive, return a Result #7830

Merged: 1 commit, Oct 17, 2023
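
In short, `ScalarValue::new_primitive` no longer unwraps its internal conversions and instead returns a `Result`, leaving the caller to propagate or handle the failure. A before/after sketch of the signature, reconstructed from the diff below:

    // Before: conversion failures panicked inside the constructor
    pub fn new_primitive<T: ArrowPrimitiveType>(a: Option<T::Native>, d: &DataType) -> Self

    // After: the failure is surfaced to the caller
    pub fn new_primitive<T: ArrowPrimitiveType>(a: Option<T::Native>, d: &DataType) -> Result<Self>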
datafusion/common/src/scalar.rs (4 additions, 4 deletions)
@@ -744,21 +744,21 @@ macro_rules! eq_array_primitive {
}

impl ScalarValue {
- /// Create a [`ScalarValue`] with the provided value and datatype
+ /// Create a [`Result<ScalarValue>`] with the provided value and datatype
///
/// # Panics
///
/// Panics if d is not compatible with T
pub fn new_primitive<T: ArrowPrimitiveType>(
a: Option<T::Native>,
d: &DataType,
- ) -> Self {
+ ) -> Result<Self> {
Contributor:
I think the comments in this function also need to be updated (the Panics section is no longer correct):

    /// Create a [`ScalarValue`] with the provided value and datatype
    ///
    /// # Panics
    ///
    /// Panics if d is not compatible with T

@tustvold (Contributor), Oct 16, 2023:
`with_data_type` can still panic. I personally don't see anything wrong with this, FWIW; panics are a perfectly valid way to handle unexpected failure, imo.

@alamb (Contributor), Oct 16, 2023:
Wrong with what? As in, it is fine if this function panics and there is no need to propagate the error?

Contributor:
It's an exceptional case that would imply some sort of schema inconsistency within the running plan, so imo panicking is perfectly valid, but then so is returning an error. The broader point is that the doc is still correct.

Contributor:
I understand the point about the panics -- thank you. I don't have a strong opinion either way with regards to propagating errors vs panicking.

Contributor (Author):
@alamb, I rewrote the comment and left a mention about panic. As @tustvold mentioned, `with_data_type` can panic.

match a {
- None => d.try_into().unwrap(),
+ None => d.try_into(),
Some(v) => {
let array = PrimitiveArray::<T>::new(vec![v].into(), None)
.with_data_type(d.clone());
- Self::try_from_array(&array, 0).unwrap()
+ Self::try_from_array(&array, 0)
}
}
}
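
Assembled from the hunks above, the function after this change reads roughly as follows (a sketch reconstructed from the diff; the doc-comment wording is paraphrased from the review thread and may differ from the merged commit):

    impl ScalarValue {
        /// Create a [`ScalarValue`] with the provided value and datatype
        ///
        /// # Panics
        ///
        /// Panics if `with_data_type` rejects `d` as incompatible with `T`
        pub fn new_primitive<T: ArrowPrimitiveType>(
            a: Option<T::Native>,
            d: &DataType,
        ) -> Result<Self> {
            match a {
                // A typed NULL: the `try_into` error is now returned instead of unwrapped
                None => d.try_into(),
                Some(v) => {
                    // `with_data_type` can still panic on an incompatible datatype
                    let array = PrimitiveArray::<T>::new(vec![v].into(), None)
                        .with_data_type(d.clone());
                    Self::try_from_array(&array, 0)
                }
            }
        }
    }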
datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs (6 additions, 6 deletions)
@@ -195,7 +195,7 @@ where
}

fn evaluate(&self) -> Result<ScalarValue> {
- Ok(ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE))
+ ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
}

fn size(&self) -> usize {
@@ -356,7 +356,7 @@ where
}

fn evaluate(&self) -> Result<ScalarValue> {
- Ok(ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE))
+ ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
}

fn size(&self) -> usize {
@@ -517,7 +517,7 @@ where
}

fn evaluate(&self) -> Result<ScalarValue> {
- Ok(ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE))
+ ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
}

fn size(&self) -> usize {
@@ -638,11 +638,11 @@ where
// 1. Stores aggregate state in `ScalarValue::List`
// 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set
let state_out = {
- let values: Vec<ScalarValue> = self
+ let values = self
.values
.iter()
.map(|x| ScalarValue::new_primitive::<T>(Some(*x), &T::DATA_TYPE))
- .collect();
+ .collect::<Result<Vec<_>>>()?;

let arr = ScalarValue::new_list(&values, &T::DATA_TYPE);
vec![ScalarValue::List(arr)]
@@ -685,7 +685,7 @@ where
acc = acc ^ *distinct_value;
}
let v = (!self.values.is_empty()).then_some(acc);
- Ok(ScalarValue::new_primitive::<T>(v, &T::DATA_TYPE))
+ ScalarValue::new_primitive::<T>(v, &T::DATA_TYPE)
}

fn size(&self) -> usize {
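
The accumulator changes above and in the files that follow use two recurring patterns: `evaluate` returns the `Result` from `new_primitive` directly instead of wrapping it in `Ok(...)`, and iterator chains collect into `Result<Vec<_>>` so a single `?` propagates the first failure. Below is a minimal illustration of the second pattern; the `distinct_state` helper, the `Int32Type` instantiation, and the crate paths (`arrow`, `datafusion_common`) are illustrative assumptions, not part of this PR:

    use arrow::datatypes::{DataType, Int32Type};
    use datafusion_common::{Result, ScalarValue};

    // Build a `ScalarValue::List` state from distinct values, as the distinct
    // accumulators do: `Result<ScalarValue>` items collect into
    // `Result<Vec<ScalarValue>>`, and `?` surfaces the first error.
    fn distinct_state(values: &[i32]) -> Result<Vec<ScalarValue>> {
        let scalars = values
            .iter()
            .map(|v| ScalarValue::new_primitive::<Int32Type>(Some(*v), &DataType::Int32))
            .collect::<Result<Vec<_>>>()?;

        let arr = ScalarValue::new_list(&scalars, &DataType::Int32);
        Ok(vec![ScalarValue::List(arr)])
    }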
datafusion/physical-expr/src/aggregate/median.rs (3 additions, 3 deletions)
@@ -146,11 +146,11 @@ impl<T: ArrowNumericType> std::fmt::Debug for MedianAccumulator<T> {

impl<T: ArrowNumericType> Accumulator for MedianAccumulator<T> {
fn state(&self) -> Result<Vec<ScalarValue>> {
- let all_values: Vec<ScalarValue> = self
+ let all_values = self
.all_values
.iter()
.map(|x| ScalarValue::new_primitive::<T>(Some(*x), &self.data_type))
- .collect();
+ .collect::<Result<Vec<_>>>()?;

let arr = ScalarValue::new_list(&all_values, &self.data_type);
Ok(vec![ScalarValue::List(arr)])
@@ -188,7 +188,7 @@ impl<T: ArrowNumericType> Accumulator for MedianAccumulator<T> {
let (_, median, _) = d.select_nth_unstable_by(len / 2, cmp);
Some(*median)
};
- Ok(ScalarValue::new_primitive::<T>(median, &self.data_type))
+ ScalarValue::new_primitive::<T>(median, &self.data_type)
}

fn size(&self) -> usize {
datafusion/physical-expr/src/aggregate/sum.rs (2 additions, 2 deletions)
@@ -205,7 +205,7 @@ impl<T: ArrowNumericType> Accumulator for SumAccumulator<T> {
}

fn evaluate(&self) -> Result<ScalarValue> {
- Ok(ScalarValue::new_primitive::<T>(self.sum, &self.data_type))
+ ScalarValue::new_primitive::<T>(self.sum, &self.data_type)
}

fn size(&self) -> usize {
@@ -265,7 +265,7 @@ impl<T: ArrowNumericType> Accumulator for SlidingSumAccumulator<T> {

fn evaluate(&self) -> Result<ScalarValue> {
let v = (self.count != 0).then_some(self.sum);
- Ok(ScalarValue::new_primitive::<T>(v, &self.data_type))
+ ScalarValue::new_primitive::<T>(v, &self.data_type)
}

fn size(&self) -> usize {
datafusion/physical-expr/src/aggregate/sum_distinct.rs (9 additions, 8 deletions)
@@ -159,13 +159,14 @@ impl<T: ArrowPrimitiveType> Accumulator for DistinctSumAccumulator<T> {
// 1. Stores aggregate state in `ScalarValue::List`
// 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set
let state_out = {
- let mut distinct_values = Vec::new();
- self.values.iter().for_each(|distinct_value| {
-     distinct_values.push(ScalarValue::new_primitive::<T>(
-         Some(distinct_value.0),
-         &self.data_type,
-     ))
- });
+ let distinct_values = self
+     .values
+     .iter()
+     .map(|value| {
+         ScalarValue::new_primitive::<T>(Some(value.0), &self.data_type)
+     })
+     .collect::<Result<Vec<_>>>()?;
+
vec![ScalarValue::List(ScalarValue::new_list(
&distinct_values,
&self.data_type,
@@ -206,7 +207,7 @@ impl<T: ArrowPrimitiveType> Accumulator for DistinctSumAccumulator<T> {
acc = acc.add_wrapping(distinct_value.0)
}
let v = (!self.values.is_empty()).then_some(acc);
- Ok(ScalarValue::new_primitive::<T>(v, &self.data_type))
+ ScalarValue::new_primitive::<T>(v, &self.data_type)
}

fn size(&self) -> usize {