
Optimize Accumulator size function performance (fix regression on clickbench) #5325

Closed
comphead opened this issue Feb 17, 2023 · 16 comments · Fixed by #5377
Labels: enhancement (New feature or request), good first issue (Good for newcomers)

Comments

@comphead
Contributor

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
During regression benchmarks it was found that DISTINCT queries have a performance drop. The analysis showed that the size function implementation for DistinctCountAccumulator is inefficient.

Describe the solution you'd like
We need to improve the size function, or the way the number of bytes is collected.

Describe alternatives you've considered
None

Additional context
Analysis details #5313
Original benchmarks ticket #5276

@comphead comphead added the enhancement New feature or request label Feb 17, 2023
@comphead
Contributor Author

@alamb @Dandandan @crepererum

@alamb
Contributor

alamb commented Feb 18, 2023

Thank you @comphead

I think this would be relatively straightforward to implement for the common case (the one in this code) of fixed size values.

Basically, instead of looping over all scalar values to count their sizes (vec.iter()), we could special-case fixed size types:

https://github.com/apache/arrow-datafusion/blob/c5108aef4d2660cce950976af14d33444f27075e/datafusion/common/src/scalar.rs#L2367-L2374

Instead, add some code that checks: if the type is ScalarValue::Int8, UInt8, etc., then size = size[0] * vec.len().

This would be a good first issue, I think -- a good result and a straightforward implementation.
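
A minimal sketch of that fast path, assuming the per-element ScalarValue::size() accounting used by the linked scalar.rs code; the function names approx_size_of_vec and is_fixed_width are illustrative only, not the actual DataFusion implementation:

```rust
use datafusion_common::ScalarValue;

// Sketch only: for fixed-width scalar types every element occupies the same
// amount of memory, so the total can be derived from the first element
// instead of walking the whole Vec.
fn approx_size_of_vec(vec: &[ScalarValue]) -> usize {
    match vec.first() {
        Some(first) if is_fixed_width(first) => first.size() * vec.len(),
        // Variable-width values (Utf8, Binary, ...) still need the per-element walk.
        _ => vec.iter().map(|v| v.size()).sum(),
    }
}

fn is_fixed_width(v: &ScalarValue) -> bool {
    matches!(
        v,
        ScalarValue::Int8(_)
            | ScalarValue::UInt8(_)
            | ScalarValue::Int16(_)
            | ScalarValue::UInt16(_)
            | ScalarValue::Int32(_)
            | ScalarValue::UInt32(_)
            | ScalarValue::Int64(_)
            | ScalarValue::UInt64(_)
            | ScalarValue::Float32(_)
            | ScalarValue::Float64(_)
    )
}
```

For variable-width types such as strings, the per-element walk (or an incrementally maintained total, as discussed later in this thread) is still needed.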

@alamb alamb added the good first issue Good for newcomers label Feb 18, 2023
@alamb alamb changed the title Optimize Accumulator size function performance Optimize Accumulator size function performance (fix regression on clickbench) Feb 18, 2023
@comphead
Contributor Author

@alamb I'll take this ticket if there are no other volunteers. I also want to experiment with whether we need a super accurate size; we could probably use an approximate size, which would report the structure size with minor inaccuracy but be faster.

@alamb
Contributor

alamb commented Feb 20, 2023

Thanks @comphead

@Dandandan
Contributor

@alamb I'll take this ticket if there are no other volunteers. I also want to experiment with whether we need a super accurate size; we could probably use an approximate size, which would report the structure size with minor inaccuracy but be faster.

Maybe the size can be accumulated as well during updates.

@alamb
Contributor

alamb commented Feb 20, 2023

Maybe the size can be accumulated as well during updates.

This is an excellent idea that might work very well for variable length structures (like strings)

Though in general the distinct accumulators are going to be fairly poor for any high cardinality use case (as they store all values as ScalarValue). It would be great to make them better, but probably as a separate project.

@comphead
Contributor Author

comphead commented Feb 22, 2023

That was not as trivial as I expected, so I ran some experiments.

  • Updating the size incrementally for the incoming batch only -> doesn't seem to be a solution, as we do not know in advance which hashes have already been counted and which have not; the cost of the calculation outweighs the benefit.
  • Increasing the batch size from 8k to 65k improves query speed 6x (size() depends on batch size and gets called less often).
  • Removing state constituents from size() improves performance by 20%.
  • An approximate size (first scalar value size * len()) improves performance up to 10x, but the size is not accurate for variable length types, like strings.

@alamb @Dandandan let me know your thoughts

@crepererum
Contributor

Updating the size incrementally for the incoming batch only -> doesn't seem to be a solution, as we do not know in advance which hashes have already been counted and which have not; the cost of the calculation outweighs the benefit.

Is the problem the self.values.insert(...) bit? HashSet sadly doesn't have an entry API (IIRC), but you can use a HashMap<K, ()> instead so you don't need to double-hash.
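
A rough sketch combining the two suggestions above (accumulating the size during updates, and using HashMap<K, ()> with the entry API so each key is hashed only once); the DistinctSizeTracker type and its fields are hypothetical, not the actual DistinctCountAccumulator:

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::hash::Hash;

// Sketch only: maintain the total size of distinct values incrementally,
// hashing each key a single time via the entry API.
struct DistinctSizeTracker<K: Hash + Eq> {
    values: HashMap<K, ()>,
    data_size: usize,
}

impl<K: Hash + Eq> DistinctSizeTracker<K> {
    fn insert(&mut self, key: K, key_size: usize) {
        // `entry` hashes once; a Vacant entry means the key has not been seen
        // before, so it contributes to the running size total.
        if let Entry::Vacant(e) = self.values.entry(key) {
            self.data_size += key_size;
            e.insert(());
        }
    }

    // size() can then return the running total instead of re-walking every value.
    fn size(&self) -> usize {
        self.data_size
    }
}
```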

@alamb
Contributor

alamb commented Feb 23, 2023

An approximate size (first scalar value size * len()) improves performance up to 10x, but the size is not accurate for variable length types, like strings.

Thank you for looking into this @comphead

I think we should use this approach for fixed length (non variable length) data -- it will solve the performance regression we saw for clickbench

In terms of handling variable length data more efficiently, I am not sure it is worth spending a lot of time optimizing the size() implementation, because my guess is that the size() calculation will be a smaller portion of the overall runtime for high cardinality string columns (each one will be an allocated string, for example).

I think a separate project to handle COUNT DISTINCT(high_cardinality_string_column) is probably needed

Thus I propose:

  1. Implement the simplest thing that fixes the clickbench performance regression
  2. File a follow-on ticket to track further improving the performance of COUNT DISTINCT queries (I can do this)

@yjshen
Member

yjshen commented Feb 25, 2023

For the further improvement @alamb mentioned, we could rewrite an aggregate query with distinct aggregations into an expanded double aggregation. This would also eliminate the need for Vec<ScalarValue> to be stored in an accumulator. Spark does this in RewriteDistinctAggregates.

Before rewrite:

Aggregate(
   key = ['key]
   functions = [COUNT(DISTINCT 'cat1),
                COUNT(DISTINCT 'cat2),
                sum('value)]
   output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
  LocalTableScan [...]

After rewrite:

Aggregate(
   key = ['key]
   functions = [count('cat1) FILTER (WHERE 'gid = 1),
                count('cat2) FILTER (WHERE 'gid = 2),
                first('total) ignore nulls FILTER (WHERE 'gid = 0)]
   output = ['key, 'cat1_cnt, 'cat2_cnt, 'total])
  Aggregate(
     key = ['key, 'cat1, 'cat2, 'gid]
     functions = [sum('value)]
     output = ['key, 'cat1, 'cat2, 'gid, 'total])
    Expand(
       projections = [('key, null, null, 0, cast('value as bigint)),
                      ('key, 'cat1, null, 1, null),
                      ('key, null, 'cat2, 2, null)]
       output = ['key, 'cat1, 'cat2, 'gid, 'value])
      LocalTableScan [...]

@alamb
Contributor

alamb commented Feb 25, 2023

This would also eliminate the need for Vec<ScalarValue> to be stored in an accumulator. Spark does this in RewriteDistinctAggregates.

Thanks -- this is a neat idea @yjshen

One challenge I have seen with this approach in the past is that it results in a "diamond shaped plan", where the same input stream is split into two output streams (to the different aggregates) and then brought back together. In general, this approach may require unbounded buffering if using sort based aggregation.

But I think it would definitely be worth considering.

@Dandandan
Contributor

Another approach might be speeding up the accumulator.

It might be worth looking at the dictionary and parquet interner implementations from @tustvold in arrow-rs and using a similar approach here, which should have a non-trivial performance impact.
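
A rough sketch of the interner idea (not the arrow-rs implementation; the StringInterner type and its fields are purely illustrative): every distinct string is appended to one contiguous buffer, and the map keeps only (offset, len) ranges keyed by hash, so inserting an already-seen value allocates nothing:

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

#[derive(Default)]
struct StringInterner {
    // All distinct strings, concatenated.
    buffer: String,
    // hash -> (offset, len) ranges in `buffer` with that hash (collision bucket).
    slots: HashMap<u64, Vec<(usize, usize)>>,
    distinct: usize,
}

impl StringInterner {
    fn intern(&mut self, value: &str) {
        let mut hasher = DefaultHasher::new();
        value.hash(&mut hasher);
        let key = hasher.finish();

        let bucket = self.slots.entry(key).or_default();
        let buffer = &self.buffer;
        // Compare against existing entries with the same hash to rule out collisions.
        if bucket
            .iter()
            .any(|&(start, len)| &buffer[start..start + len] == value)
        {
            return; // already interned, no new allocation
        }
        let start = self.buffer.len();
        self.buffer.push_str(value);
        bucket.push((start, value.len()));
        self.distinct += 1;
    }

    fn distinct_count(&self) -> usize {
        self.distinct
    }
}
```

A COUNT DISTINCT accumulator built on something like this would avoid one heap allocation per distinct string and could report its memory use incrementally (roughly buffer.len() plus the map overhead).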

@comphead
Contributor Author

comphead commented Mar 3, 2023

Another approach might be speeding up the accumulator.

It might be worth looking at the dictionary and parquet interner implementations from @tustvold in arrow-rs and using a similar approach here, which should have a non-trivial performance impact.

Thanks @Dandandan, if you could provide a link that would be great!

@alamb
Contributor

alamb commented Mar 3, 2023

If people are serious about wanting to improve the performance of the aggregators in general, I think we should consider combining our efforts as there are several people who seem interested and I think the work will collide if not done carefully.

Here is one ticket that tracks ideas: #4973

@yjshen
Member

yjshen commented Mar 4, 2023

One challenge I have seen with this approach in the past is that it results in a "diamond shaped plan", where the same input stream is split into two output streams (to the different aggregates) and then brought back together. In general, this approach may require unbounded buffering if using sort based aggregation.

Could you please elaborate more on 1. why two output streams are generated and 2. why it requires unbounded buffering? Thanks!

@alamb
Contributor

alamb commented Mar 5, 2023

Hi @yjshen

Here was my understanding of what you were proposing, which shows the diamond I am referring to.

I may be misunderstanding your proposal:

SELECT date, 
  COUNT(DISTINCT x), 
  COUNT(DISTINCT y)
FROM
  t;
                      ┌─────────────────┐                         
                      │ Combine somehow │   Maybe Join? Could     
                      │    (on date)    │   also be some more     
                      │                 │   optimized version     
                      └─────────────────┘                         
                               ▲                                  
                               │                                  
                               │                                  
            ┌──────────────────┴─────────────────────┐            
            │                                        │            
            │                                        │            
            │                                        │            
┌───────────────────────┐                ┌───────────────────────┐
│     HashAggregate     │                │     HashAggregate     │
│       gby: date       │                │       gby: date       │
│     agg: COUNT(x)     │                │     agg: COUNT(y)     │
└───────────────────────┘                └───────────────────────┘
            ▲                                        ▲            
            │                                        │            
            │                                        │            
┌───────────────────────┐                ┌───────────────────────┐
│     HashAggregate     │                │     HashAggregate     │
│     gby: date, x      │                │     gby: date, y      │
│      agg: <NONE>      │                │      agg: <NONE>      │
└───────────────────────┘                └───────────────────────┘
            ▲                                        ▲            
            │                                        │            
            │                                        │            
            └───────────────────┬────────────────────┘            
                                │                                 
                                │                                 
                                │                                 
                    ┌───────────────────────┐                     
                    │         Scan          │                     
                    └───────────────────────┘                     
