ScalarValue::List not working as expected in UDAF state #8472

Closed
rspears74 opened this issue Dec 8, 2023 · 8 comments · Fixed by #8562
Labels
bug Something isn't working

Comments

@rspears74
Contributor

rspears74 commented Dec 8, 2023

Describe the bug

I am trying to use ScalarValue::List to store f64 values for the state in a UDAF. It was working well in datafusion version 32, but I upgraded to datafusion 33, only to find that the API had changed for ScalarValue::List to accept an ArrayRef rather than an Option<Vec<ScalarValue>>. I thought I had converted my code correctly, but I'm getting the following error:

Error: External(Internal("Inconsistent types in ScalarValue::iter_to_array. Expected Float64, got List([PrimitiveArray<Float64>\n[\n  43.0,\n  1.0,\n  43.0,\n  1.0,\n  43.0,\n  1.0,\n  43.0,\n  1.0,\n  43.0,\n  3.0,\n  ...180 elements...,\n  62.0,\n  3.0,\n  62.0,\n  2.0,\n  63.0,\n  1.0,\n  63.0,\n  1.0,\n  63.0,\n  1.0,\n]])"))

First of all, I can't tell where this error is coming from. RUST_BACKTRACE=1 has no effect, and the log statements I added at the beginning and end of each Accumulator function show that the error does not occur in the middle of any of the Accumulator functions I've implemented.

I am serializing my list via ScalarValue::List(Arc::new(Float64Array::from(floats))) where floats: Vec<f64>
And deserializing my list via something like this:

if let ScalarValue::List(float_vals) = &v[0] {
  let floats: Vec<f64> = float_vals
    .as_any()
    .downcast_ref::<Float64Array>()
    .expect("Failed to downcast")
    .values()
    .to_vec();
}

Finally, my state_type in the create_udaf function is:

Arc::new(vec![
    DataType::List(Field::new(
        "item",
        DataType::Float64,
        true
    ).into())
])

Via logging, I see successful calls to state and update_batch (update_batch doesn't touch the serialized state, so it probably isn't part of the issue). If the issue were in the state deserialization, I'd expect to see the log statement from the beginning of merge_batch but not the one from the end. Instead, I'm not seeing any of my merge_batch log statements.

I'm not sure if I'm making a mistake somewhere, or if this is a bug. But it seems like somewhere in the guts of the Accumulator, something is not working correctly. Happy to help fix the bug if one can be identified.

To Reproduce

Define a UDAF Accumulator that uses ScalarValue::List to serialize its state, and use the aggregate function.

Expected behavior

Successful aggregation of the values into a new dataframe column.

Additional context

No response

@rspears74 rspears74 added the bug Something isn't working label Dec 8, 2023
@alamb
Contributor

alamb commented Dec 11, 2023

Maybe @jayzhan211 or @Weijun-H have some thoughts

@jayzhan211
Contributor

jayzhan211 commented Dec 12, 2023

@rspears74

ScalarValue::List(Arc::new(Float64Array::from(floats)))

Note that ScalarValue::List only accepts a ListArray. In your example, it should be something like ListArray(Float64Array::from(floats)), where the ListArray has length 1.

By the way, ScalarValue::Float64 is something like Float64Array::from(floats) where floats has length one.
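To make the "ListArray of length 1" point concrete, here is a minimal std-only sketch of the list layout. `ToyListArray`, `single_row`, `len`, and `row` are hypothetical stand-ins invented for illustration, not the arrow or datafusion API. The only thing it borrows from Arrow is the layout idea: a list array is a flat child array plus offsets, so a single row holding all n floats has offsets `[0, n]`.

```rust
/// Std-only stand-in (hypothetical, not the arrow crate) for a list array
/// of f64: a flat child buffer plus offsets, where row `i` spans
/// `values[offsets[i]..offsets[i + 1]]`.
#[derive(Debug, Clone, PartialEq)]
struct ToyListArray {
    offsets: Vec<usize>,
    values: Vec<f64>,
}

#[allow(dead_code)]
impl ToyListArray {
    /// Put every float into ONE row. This is the shape a list scalar needs:
    /// the array has length 1, and that single row holds all the floats.
    fn single_row(values: Vec<f64>) -> Self {
        Self {
            offsets: vec![0, values.len()],
            values,
        }
    }

    /// Number of rows, which is not the same as the number of floats.
    fn len(&self) -> usize {
        self.offsets.len() - 1
    }

    /// Borrow the floats stored in row `i`.
    fn row(&self, i: usize) -> &[f64] {
        &self.values[self.offsets[i]..self.offsets[i + 1]]
    }
}
```

With this model, `ToyListArray::single_row(vec![43.0, 1.0, 62.0])` has `len() == 1` while `row(0)` still yields all three floats. A bare Float64Array of the same floats would instead have length 3, which is why it reads as three Float64 scalars rather than one list scalar.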

@rspears74
Contributor Author

The docs say that it accepts an Arc<dyn Array>, and it compiles as I have it written

@rspears74
Contributor Author

Alright, so after constructing a ListArray with my Float64Array as its values, it works! But it's certainly not clear from the docs that you have to pass in a ListArray of length 1. I would think there should be some way to enforce that outside of failing in a random spot at runtime.

@jayzhan211
Contributor

Agreed. I had thought about it but did not come up with a nice solution. The problem is that ScalarValue is an enum, not a type constructed through a new() function, so we can only let it panic somewhere...

@rspears74
Contributor Author

rspears74 commented Dec 12, 2023

Understood. Can the signature of ScalarValue::List not just be changed from ArrayRef to Arc<ListArray> (similar for FixedSizeList and LargeList)? That at least solves half the problem. I'm not trying to tell anyone how to maintain their project, but if this was my project, I would not be okay leaving ScalarValue::List like this. More people are going to run into issues around this in the future.
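As a rough illustration of why narrowing the payload type "solves half the problem", the sketch below uses std-only stand-in types. `Array`, `Float64Array`, `ListArray`, `LooseScalar`, `StrictScalar`, and `try_new_list` are all hypothetical names for this example and do not reproduce the real arrow or datafusion definitions. With a trait-object payload any array type-checks; with a concrete payload a flat float array is rejected by the compiler, and only the row-count check is left for runtime.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a dynamically typed array.
trait Array {
    fn type_name(&self) -> &'static str;
}

#[allow(dead_code)]
struct Float64Array(Vec<f64>);

#[allow(dead_code)]
struct ListArray {
    offsets: Vec<usize>, // row i spans values[offsets[i]..offsets[i + 1]]
    values: Float64Array,
}

impl Array for Float64Array {
    fn type_name(&self) -> &'static str {
        "Float64Array"
    }
}

impl Array for ListArray {
    fn type_name(&self) -> &'static str {
        "ListArray"
    }
}

/// Loose payload: any array compiles, so a wrong one only fails at runtime.
#[allow(dead_code)]
enum LooseScalar {
    List(Arc<dyn Array>),
}

/// Strict payload: passing a Float64Array here is a compile error.
#[allow(dead_code)]
enum StrictScalar {
    List(Arc<ListArray>),
}

#[allow(dead_code)]
impl StrictScalar {
    /// The remaining half: the "exactly one row" rule is still a runtime check.
    fn try_new_list(list: ListArray) -> Result<Self, String> {
        let rows = list.offsets.len().saturating_sub(1);
        if rows == 1 {
            Ok(StrictScalar::List(Arc::new(list)))
        } else {
            Err(format!("expected a list array with 1 row, got {rows}"))
        }
    }
}
```

In this model, `LooseScalar::List(Arc::new(Float64Array(vec![1.0])))` compiles without complaint, whereas the same argument to `StrictScalar::List` does not type-check. A validating constructor like `try_new_list` is one possible way to cover the length requirement, though an enum variant can still be built directly without going through it.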

@alamb
Contributor

alamb commented Dec 12, 2023

> Understood. Can the signature of ScalarValue::List not just be changed from ArrayRef to Arc<ListArray> (similar for FixedSizeList and LargeList)? That at least solves half the problem. I'm not trying to tell anyone how to maintain their project, but if this was my project, I would not be okay leaving ScalarValue::List like this. More people are going to run into issues around this in the future.

I think this would be a very interesting idea to pursue. Thank you for the suggestion @rspears74

Are you (or @jayzhan211 ) interested in doing so? If not, that is fine, and I will file another ticket with a more approachable description. However, if one of you who already has this context is going to do so I won't bother writing it up more carefully :)

@rspears74
Contributor Author

I'll have to familiarize myself with the contribution guide, but I'd certainly like to take a whack at this @alamb

3 participants