Skip to content

Commit

Permalink
Deprecate ScalarValue::and, ScalarValue::or (#6842) (#6844)
Browse files Browse the repository at this point in the history
* Deprecate ScalarValue::and, ScalarValue::or (#6842)

* Review feedback
  • Loading branch information
tustvold authored Jul 6, 2023
1 parent eb03b4f commit a1281aa
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 39 deletions.
2 changes: 2 additions & 0 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2109,11 +2109,13 @@ impl ScalarValue {
impl_checked_op!(self, rhs, checked_sub, -)
}

#[deprecated(note = "Use arrow kernels or specialization (#6842)")]
pub fn and<T: Borrow<ScalarValue>>(&self, other: T) -> Result<ScalarValue> {
let rhs = other.borrow();
impl_op!(self, rhs, &&)
}

#[deprecated(note = "Use arrow kernels or specialization (#6842)")]
pub fn or<T: Borrow<ScalarValue>>(&self, other: T) -> Result<ScalarValue> {
let rhs = other.borrow();
impl_op!(self, rhs, ||)
Expand Down
65 changes: 26 additions & 39 deletions datafusion/physical-expr/src/aggregate/bool_and_or.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
//! Defines physical expressions that can evaluated at runtime during query execution

use std::any::Any;
use std::convert::TryFrom;
use std::sync::Arc;

use crate::{AggregateExpr, PhysicalExpr};
Expand Down Expand Up @@ -161,7 +160,7 @@ impl AggregateExpr for BoolAnd {
}

fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
Ok(Box::new(BoolAndAccumulator::try_new(&self.data_type)?))
Ok(Box::<BoolAndAccumulator>::default())
}

fn state_fields(&self) -> Result<Vec<Field>> {
Expand Down Expand Up @@ -199,7 +198,7 @@ impl AggregateExpr for BoolAnd {
}

fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
Ok(Box::new(BoolAndAccumulator::try_new(&self.data_type)?))
Ok(Box::<BoolAndAccumulator>::default())
}
}

Expand All @@ -217,25 +216,20 @@ impl PartialEq<dyn Any> for BoolAnd {
}
}

#[derive(Debug)]
#[derive(Debug, Default)]
struct BoolAndAccumulator {
bool_and: ScalarValue,
}

impl BoolAndAccumulator {
/// new bool_and accumulator
pub fn try_new(data_type: &DataType) -> Result<Self> {
Ok(Self {
bool_and: ScalarValue::try_from(data_type)?,
})
}
acc: Option<bool>,
}

impl Accumulator for BoolAndAccumulator {
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let values = &values[0];
let delta = &bool_and_batch(values)?;
self.bool_and = self.bool_and.and(delta)?;
self.acc = match (self.acc, bool_and_batch(values)?) {
(None, ScalarValue::Boolean(v)) => v,
(Some(v), ScalarValue::Boolean(None)) => Some(v),
(Some(a), ScalarValue::Boolean(Some(b))) => Some(a && b),
_ => unreachable!(),
};
Ok(())
}

Expand All @@ -244,16 +238,15 @@ impl Accumulator for BoolAndAccumulator {
}

fn state(&self) -> Result<Vec<ScalarValue>> {
Ok(vec![self.bool_and.clone()])
Ok(vec![ScalarValue::Boolean(self.acc)])
}

fn evaluate(&self) -> Result<ScalarValue> {
Ok(self.bool_and.clone())
Ok(ScalarValue::Boolean(self.acc))
}

fn size(&self) -> usize {
std::mem::size_of_val(self) - std::mem::size_of_val(&self.bool_and)
+ self.bool_and.size()
std::mem::size_of_val(self)
}
}

Expand Down Expand Up @@ -355,7 +348,7 @@ impl AggregateExpr for BoolOr {
}

fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
Ok(Box::new(BoolOrAccumulator::try_new(&self.data_type)?))
Ok(Box::<BoolOrAccumulator>::default())
}

fn state_fields(&self) -> Result<Vec<Field>> {
Expand Down Expand Up @@ -393,7 +386,7 @@ impl AggregateExpr for BoolOr {
}

fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
Ok(Box::new(BoolOrAccumulator::try_new(&self.data_type)?))
Ok(Box::<BoolOrAccumulator>::default())
}
}

Expand All @@ -411,29 +404,24 @@ impl PartialEq<dyn Any> for BoolOr {
}
}

#[derive(Debug)]
#[derive(Debug, Default)]
struct BoolOrAccumulator {
bool_or: ScalarValue,
}

impl BoolOrAccumulator {
/// new bool_or accumulator
pub fn try_new(data_type: &DataType) -> Result<Self> {
Ok(Self {
bool_or: ScalarValue::try_from(data_type)?,
})
}
acc: Option<bool>,
}

impl Accumulator for BoolOrAccumulator {
fn state(&self) -> Result<Vec<ScalarValue>> {
Ok(vec![self.bool_or.clone()])
Ok(vec![ScalarValue::Boolean(self.acc)])
}

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let values = &values[0];
let delta = bool_or_batch(values)?;
self.bool_or = self.bool_or.or(&delta)?;
self.acc = match (self.acc, bool_or_batch(values)?) {
(None, ScalarValue::Boolean(v)) => v,
(Some(v), ScalarValue::Boolean(None)) => Some(v),
(Some(a), ScalarValue::Boolean(Some(b))) => Some(a || b),
_ => unreachable!(),
};
Ok(())
}

Expand All @@ -442,12 +430,11 @@ impl Accumulator for BoolOrAccumulator {
}

fn evaluate(&self) -> Result<ScalarValue> {
Ok(self.bool_or.clone())
Ok(ScalarValue::Boolean(self.acc))
}

fn size(&self) -> usize {
std::mem::size_of_val(self) - std::mem::size_of_val(&self.bool_or)
+ self.bool_or.size()
std::mem::size_of_val(self)
}
}

Expand Down

0 comments on commit a1281aa

Please sign in to comment.