Aggregate Final Mode requires invalid columns when working in multi partitions #7128

Closed
mustafasrepo opened this issue Jul 28, 2023 · 0 comments · Fixed by #7129
Labels
bug Something isn't working

Describe the bug

Assume we have the following tables.

statement ok
CREATE TABLE exchange_rates (
    sn INTEGER,
    ts TIMESTAMP,
    currency_from VARCHAR(3),
    currency_to VARCHAR(3),
    rate DECIMAL(10,2)
) as VALUES
    (0, '2022-01-01 06:00:00'::timestamp, 'EUR', 'USD', 1.10),
    (1, '2022-01-01 08:00:00'::timestamp, 'TRY', 'USD', 0.10),
    (2, '2022-01-01 11:30:00'::timestamp, 'EUR', 'USD', 1.12),
    (3, '2022-01-02 12:00:00'::timestamp, 'TRY', 'USD', 0.11),
    (4, '2022-01-03 10:00:00'::timestamp, 'EUR', 'USD', 1.12)

and

statement ok
CREATE TABLE sales_global (zip_code INT,
          country VARCHAR(3),
          sn INT,
          ts TIMESTAMP,
          currency VARCHAR(3),
          amount FLOAT
        ) as VALUES
          (0, 'GRC', 0, '2022-01-01 06:00:00'::timestamp, 'EUR', 30.0),
          (1, 'FRA', 1, '2022-01-01 08:00:00'::timestamp, 'EUR', 50.0),
          (1, 'TUR', 2, '2022-01-01 11:30:00'::timestamp, 'TRY', 75.0),
          (1, 'FRA', 3, '2022-01-02 12:00:00'::timestamp, 'EUR', 200.0),
          (1, 'TUR', 4, '2022-01-03 10:00:00'::timestamp, 'TRY', 100.0),
          (0, 'GRC', 4, '2022-01-03 10:00:00'::timestamp, 'EUR', 80.0)

The following query

SELECT ARRAY_AGG(e.rate ORDER BY e.sn)
FROM sales_global AS s
JOIN exchange_rates AS e
ON s.currency = e.currency_from AND
   e.currency_to = 'USD' AND
   s.ts >= e.ts
GROUP BY s.sn
ORDER BY s.sn;

fails with the following error when running with multiple partitions:
External error: query failed: DataFusion error: Arrow error: Compute error: Sort not supported for list type Decimal128(10, 2)

I have examined the bug. The query above produces the following physical plan:

+   ProjectionExec: expr=[ARRAY_AGG(e.rate) ORDER BY [e.sn ASC NULLS LAST]@0 as ARRAY_AGG(e.rate)]
+   --SortPreservingMergeExec: [sn@1 ASC NULLS LAST]
+   ----SortExec: expr=[sn@1 ASC NULLS LAST]
+   ------ProjectionExec: expr=[ARRAY_AGG(e.rate) ORDER BY [e.sn ASC NULLS LAST]@1 as ARRAY_AGG(e.rate), sn@0 as sn]
+   --------AggregateExec: mode=FinalPartitioned, gby=[sn@0 as sn], aggr=[ARRAY_AGG(e.rate)]
+   ----------SortExec: expr=[sn@1 ASC NULLS LAST]
+   ------------CoalesceBatchesExec: target_batch_size=8192
+   --------------RepartitionExec: partitioning=Hash([sn@0], 8), input_partitions=8
+   ----------------AggregateExec: mode=Partial, gby=[sn@0 as sn], aggr=[ARRAY_AGG(e.rate)], ordering_mode=None
+   ------------------SortExec: expr=[sn@1 ASC NULLS LAST]
+   --------------------ProjectionExec: expr=[sn@0 as sn, sn@3 as sn, rate@6 as rate]
+   ----------------------CoalesceBatchesExec: target_batch_size=8192
+   ------------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(currency@2, currency_from@2)], filter=ts@0 >= ts@1
+   --------------------------CoalesceBatchesExec: target_batch_size=8192
+   ----------------------------RepartitionExec: partitioning=Hash([currency@2], 8), input_partitions=8
+   ------------------------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+   --------------------------------MemoryExec: partitions=1, partition_sizes=[1]
+   --------------------------CoalesceBatchesExec: target_batch_size=8192
+   ----------------------------RepartitionExec: partitioning=Hash([currency_from@2], 8), input_partitions=8
+   ------------------------------ProjectionExec: expr=[sn@0 as sn, ts@1 as ts, currency_from@2 as currency_from, rate@4 as rate]
+   --------------------------------CoalesceBatchesExec: target_batch_size=8192
+   ----------------------------------FilterExec: currency_to@3 = USD
+   ------------------------------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0]

The error occurs because AggregateExec: mode=FinalPartitioned requires its input to be sorted by e.sn, the same requirement that AggregateExec: mode=Partial has. Column sn@1 is valid for the input of AggregateExec: mode=Partial. However, this column is no longer valid for the input of AggregateExec: mode=FinalPartitioned: at index 1 of that schema we have the array_agg result, so the SortExec placed below the final aggregate ends up sorting a list column.
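
The index mismatch can be illustrated with a small toy model. This is only a sketch and not DataFusion code: the `Field`, `Column`, and `resolve` names are hypothetical, the data types are written as plain strings, and the output schema of the partial aggregate is assumed to be the group key followed by a single list-typed state column (the real state may carry more columns).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Field:
    name: str
    data_type: str


@dataclass(frozen=True)
class Column:
    """Hypothetical stand-in for a physical column reference such as sn@1."""
    name: str
    index: int


def resolve(col, schema):
    """Look up a column by position and check that the name there matches."""
    if col.index >= len(schema):
        raise ValueError(f"{col.name}@{col.index} is out of bounds")
    field = schema[col.index]
    if field.name != col.name:
        raise ValueError(
            f"{col.name}@{col.index} points at '{field.name}' ({field.data_type})"
        )
    return field


# Input of the Partial aggregate, following the plan's
# ProjectionExec: expr=[sn@0 as sn, sn@3 as sn, rate@6 as rate]
PARTIAL_INPUT = [
    Field("sn", "Int32"),               # s.sn (group key)
    Field("sn", "Int32"),               # e.sn (ordering column)
    Field("rate", "Decimal128(10, 2)"),
]

# Assumed output of the Partial aggregate: group key, then aggregate state.
PARTIAL_OUTPUT = [
    Field("sn", "Int32"),
    Field("ARRAY_AGG(e.rate)", "List(Decimal128(10, 2))"),
]

# The ordering requirement that appears in both SortExec nodes of the plan.
requirement = Column("sn", 1)

# Valid below the Partial aggregate: index 1 is e.sn.
print(resolve(requirement, PARTIAL_INPUT))

# Invalid below the Final aggregate: index 1 is now the list-typed state.
try:
    resolve(requirement, PARTIAL_OUTPUT)
except ValueError as err:
    print("invalid requirement:", err)
```

In this model the requirement resolves to an integer column in the first schema and to a list column in the second, which matches the "Sort not supported for list type" error reported above.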

To Reproduce

No response

Expected behavior

No response

Additional context

No response
