diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index b0769df1e9db..2011247346e0 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -2109,11 +2109,13 @@ impl ScalarValue { impl_checked_op!(self, rhs, checked_sub, -) } + #[deprecated(note = "Use arrow kernels or specialization (#6842)")] pub fn and>(&self, other: T) -> Result { let rhs = other.borrow(); impl_op!(self, rhs, &&) } + #[deprecated(note = "Use arrow kernels or specialization (#6842)")] pub fn or>(&self, other: T) -> Result { let rhs = other.borrow(); impl_op!(self, rhs, ||) diff --git a/datafusion/physical-expr/src/aggregate/bool_and_or.rs b/datafusion/physical-expr/src/aggregate/bool_and_or.rs index bbab4dfce660..e444dc61ee1b 100644 --- a/datafusion/physical-expr/src/aggregate/bool_and_or.rs +++ b/datafusion/physical-expr/src/aggregate/bool_and_or.rs @@ -18,7 +18,6 @@ //! Defines physical expressions that can evaluated at runtime during query execution use std::any::Any; -use std::convert::TryFrom; use std::sync::Arc; use crate::{AggregateExpr, PhysicalExpr}; @@ -161,7 +160,7 @@ impl AggregateExpr for BoolAnd { } fn create_accumulator(&self) -> Result> { - Ok(Box::new(BoolAndAccumulator::try_new(&self.data_type)?)) + Ok(Box::::default()) } fn state_fields(&self) -> Result> { @@ -199,7 +198,7 @@ impl AggregateExpr for BoolAnd { } fn create_sliding_accumulator(&self) -> Result> { - Ok(Box::new(BoolAndAccumulator::try_new(&self.data_type)?)) + Ok(Box::::default()) } } @@ -217,25 +216,20 @@ impl PartialEq for BoolAnd { } } -#[derive(Debug)] +#[derive(Debug, Default)] struct BoolAndAccumulator { - bool_and: ScalarValue, -} - -impl BoolAndAccumulator { - /// new bool_and accumulator - pub fn try_new(data_type: &DataType) -> Result { - Ok(Self { - bool_and: ScalarValue::try_from(data_type)?, - }) - } + acc: Option, } impl Accumulator for BoolAndAccumulator { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { let values = &values[0]; - let delta = &bool_and_batch(values)?; - self.bool_and = self.bool_and.and(delta)?; + self.acc = match (self.acc, bool_and_batch(values)?) { + (None, ScalarValue::Boolean(v)) => v, + (Some(v), ScalarValue::Boolean(None)) => Some(v), + (Some(a), ScalarValue::Boolean(Some(b))) => Some(a && b), + _ => unreachable!(), + }; Ok(()) } @@ -244,16 +238,15 @@ impl Accumulator for BoolAndAccumulator { } fn state(&self) -> Result> { - Ok(vec![self.bool_and.clone()]) + Ok(vec![ScalarValue::Boolean(self.acc)]) } fn evaluate(&self) -> Result { - Ok(self.bool_and.clone()) + Ok(ScalarValue::Boolean(self.acc)) } fn size(&self) -> usize { - std::mem::size_of_val(self) - std::mem::size_of_val(&self.bool_and) - + self.bool_and.size() + std::mem::size_of_val(self) } } @@ -355,7 +348,7 @@ impl AggregateExpr for BoolOr { } fn create_accumulator(&self) -> Result> { - Ok(Box::new(BoolOrAccumulator::try_new(&self.data_type)?)) + Ok(Box::::default()) } fn state_fields(&self) -> Result> { @@ -393,7 +386,7 @@ impl AggregateExpr for BoolOr { } fn create_sliding_accumulator(&self) -> Result> { - Ok(Box::new(BoolOrAccumulator::try_new(&self.data_type)?)) + Ok(Box::::default()) } } @@ -411,29 +404,24 @@ impl PartialEq for BoolOr { } } -#[derive(Debug)] +#[derive(Debug, Default)] struct BoolOrAccumulator { - bool_or: ScalarValue, -} - -impl BoolOrAccumulator { - /// new bool_or accumulator - pub fn try_new(data_type: &DataType) -> Result { - Ok(Self { - bool_or: ScalarValue::try_from(data_type)?, - }) - } + acc: Option, } impl Accumulator for BoolOrAccumulator { fn state(&self) -> Result> { - Ok(vec![self.bool_or.clone()]) + Ok(vec![ScalarValue::Boolean(self.acc)]) } fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { let values = &values[0]; - let delta = bool_or_batch(values)?; - self.bool_or = self.bool_or.or(&delta)?; + self.acc = match (self.acc, bool_or_batch(values)?) { + (None, ScalarValue::Boolean(v)) => v, + (Some(v), ScalarValue::Boolean(None)) => Some(v), + (Some(a), ScalarValue::Boolean(Some(b))) => Some(a || b), + _ => unreachable!(), + }; Ok(()) } @@ -442,12 +430,11 @@ impl Accumulator for BoolOrAccumulator { } fn evaluate(&self) -> Result { - Ok(self.bool_or.clone()) + Ok(ScalarValue::Boolean(self.acc)) } fn size(&self) -> usize { - std::mem::size_of_val(self) - std::mem::size_of_val(&self.bool_or) - + self.bool_or.size() + std::mem::size_of_val(self) } }