Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(udf): POC faster min max accumulator #12677

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from

Conversation

devanbenz
Copy link
Contributor

Which issue does this PR close?

Closes #6906

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added physical-expr Physical Expressions functions labels Sep 30, 2024
@devanbenz devanbenz marked this pull request as draft September 30, 2024 01:28

let input_array = &values[0];

for (i, &group_index) in group_indices.iter().enumerate() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think much of the filter logic is handled by accumulate_indices

accumulate_indices(
group_indices,
values.logical_nulls().as_ref(),
opt_filter,
|group_index| {
self.counts[group_index] += 1;
},
);

You could likely avoid much of this repetition (and likely it would be faster)

It woudl also be nice to avoid the duplication between min /max by using generics. Here is how the primitive one does it (passes in a comparison function)

https://github.com/apache/datafusion/blob/main/datafusion/functions-aggregate/src/min_max.rs#L119

Copy link
Contributor Author

@devanbenz devanbenz Oct 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay so I would probably want to do something like this?

accumulate_indices(group_indices, input_array.logical_nulls().as_ref(), opt_filter, |group_index| {
        let value = input_array.as_binary_view().value(i);
    
        let value_str = std::str::from_utf8(value).map_err(|e| {
            DataFusionError::Execution(format!(
                "could not build utf8 from binary view {}",
                e
            ))
        }).unwrap();
    
        if self.states[group_index].is_empty() {
            self.states[group_index] = value_str.to_string();
        } else {
            let curr_value_bytes = self.states[group_index].as_bytes();
            if value < curr_value_bytes {
                self.states[group_index] = value_str.parse().unwrap();
            }
        } 
});

And then make this generic. I.E. I can pass a generic function in instead of:

        if self.states[group_index].is_empty() {
            self.states[group_index] = value_str.to_string();
        } else {
            let curr_value_bytes = self.states[group_index].as_bytes();
            if value < curr_value_bytes {
                self.states[group_index] = value_str.parse().unwrap();
            }
        } 

Afterwards I can likely use a const generic for deciding how to down-cast here with string array or string view?

let value = input_array.as_binary_view().value(i);

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @devanbenz -- I think this is on the right track

What I suggest is:

  1. We start with StringArray (rather than StringViewArray)
  2. Run some benchmarks (clickbench) -- I can do this if it would help

All in all, thanks again

@devanbenz
Copy link
Contributor Author

@alamb thanks for taking a peek. Will go ahead and implement this for StringArray as well and modify the core functionality to lean on already existing methods + make it more generic so that MAX and MIN can share code. 🫡

@alamb
Copy link
Contributor

alamb commented Oct 1, 2024

BTW here is an example of using a const generic: #12703

let value = input_array.as_binary_view().value(i);

let value_str = std::str::from_utf8(value).map_err(|e| {
DataFusionError::Execution(format!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could replace with macro exec_err

return;
}

let value: &[u8] = if VIEW {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Optimize downcasts (remove them)

input_array.as_binary::<i32>().value(i)
};

let value_str = std::str::from_utf8(value)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Optimize from_utf8 and remove this validation

let mut builder = BinaryViewBuilder::new();

for i in 0..values.len() {
let value = input_array.as_binary_view().value(i);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Remove downcasts

@alamb
Copy link
Contributor

alamb commented Oct 3, 2024

Some other notes I had from our talkL

  1. Recommend you start with just support for StringArray (not StringView, LargeString, etc)
  2. We will then successively build up to the fully general solution
  3. The structure should likely be implemented in terms of StringArray (not ArrayRef)
  4. When creating the final output we'll need to do some unsafe to avoid re-checking all UTF8 values

Should be a hoot

Thanks again

@alamb
Copy link
Contributor

alamb commented Oct 6, 2024

@devanbenz -- I am interested in pushing this one along. If I found some time, would you mind if I spent some time tweaking it?

@devanbenz
Copy link
Contributor Author

devanbenz commented Oct 6, 2024

@devanbenz -- I am interested in pushing this one along. If I found some time, would you mind if I spent some time tweaking it?

Please feel free to, I've been spending some time this morning trying to get it working with just StringArray but am having issues. Probably just a fundamental misunderstanding of how this should be achieved (skill issue :P). I'm attempting to set the inner state to Vec<StringArray> instead of Vec<String> but I often find that I need to do some hacky things to convert to Binary and then back to StringArray especially in the evaluate method. I am using the following signature and when I do get it working its actually less performant then my original methodology :(

pub struct StringGroupsAccumulator<F> {
    states: Vec<StringArray>,
    fun: F,
}

impl<F> StringGroupsAccumulator<F>
where
    F: Fn(&[u8], &[u8]) -> bool + Send + Sync,
{
    pub fn new(s_fn: F) -> Self {
        Self {
            states: Vec::new(),
            fun: s_fn,
        }
    }
}

This is certainly a task that has made me realize how much more I need to learn about Arrow semantics 🫡 😆 and just memory operations in general. I definitely take on more of a 'hacker' way of coding like just throwing things at the editor until they work in instances like this where I don't fully understand the underlying mechanisms. I have so much more to learn 😮

@jayzhan211
Copy link
Contributor

jayzhan211 commented Oct 7, 2024

I'm attempting to set the inner state to Vec instead of Vec

Why replace Vec with Vec? 🤔 Doesn't Vec<String> represent the best current value for each group? Since we only need to store one value per group, why make the change?

@alamb
Copy link
Contributor

alamb commented Oct 7, 2024

I started hacking on this in #12792 -- not quite done yet but we are getting close

@alamb
Copy link
Contributor

alamb commented Oct 9, 2024

BTW I think #12792 has all the right code -- I need to write a few more tests, but it has the basic idea completed

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
functions physical-expr Physical Expressions
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement fast min/max accumulator for binary / strings (now it uses the slower path)
3 participants