
Fix handling of nested leaf columns in parallel parquet writer #8923

Merged
merged 3 commits into apache:main on Jan 22, 2024

Conversation

devinjdangelo
Contributor

Which issue does this PR close?

Closes #8851
Closes #8853

Rationale for this change

See the issues above. The parallel parquet writer causes various errors/panics when used with nested columns.

What changes are included in this PR?

I identified the issue in this function, which is supposed to send the appropriate Arrow arrays to the correct column serialization workers:

https://github.com/apache/arrow-datafusion/blob/95e739cb605307d3337c54ef3f0ab8c72cca5717/datafusion/core/src/datasource/file_format/parquet.rs#L883-L902

The outer loop iterates over the "col_array_channels". This works when there are no nested columns (i.e., when the inner loop only ever iterates once per top-level field), but it is incorrect when there are nested columns, since a nested field expands to multiple leaf columns.

The varying errors reported are explained by this bug since a few different things can go wrong here:

  • An array of the wrong type is sent to a column serializer
  • The same column serializer is sent too many rows
  • A given column serializer is sent zero rows

This PR fixes this function so that it properly sends nested columns to the correct column serializer.
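
Conceptually, the fix advances the destination channel once per leaf column rather than once per top-level field. Below is a minimal sketch of the corrected dispatch loop (illustrative and simplified; it only loosely mirrors the merged code):

// Sketch only: route each leaf column of every top-level field to its own
// serializer channel. compute_leaves (parquet::arrow::arrow_writer)
// recursively flattens a possibly nested field into its leaf columns.
let mut next_channel = 0;
for (array, field) in rb.columns().iter().zip(schema.fields()) {
    for leaf_column in compute_leaves(field, array)? {
        col_array_channels[next_channel]
            .send(leaf_column)
            .await
            .map_err(|_| {
                DataFusionError::Internal("Unable to send array to writer!".into())
            })?;
        // Advance per leaf column, not per top-level field: a nested field
        // (e.g. a struct with two primitive children) feeds multiple channels.
        next_channel += 1;
    }
}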

Are these changes tested?

Yes, copy.slt now includes tests covering various column types at several levels of nesting with structs and arrays.

Are there any user-facing changes?

No

@github-actions github-actions bot added the core (Core DataFusion crate) and sqllogictest (SQL Logic Tests (.slt)) labels on Jan 20, 2024
@devinjdangelo
Contributor Author

cc @alamb @tustvold

Are there any additional tests which we should add to improve coverage for the parallel writer?

@devinjdangelo devinjdangelo mentioned this pull request Jan 20, 2024
@@ -408,7 +408,7 @@ config_namespace! {
/// parquet files by serializing them in parallel. Each column
/// in each row group in each output file are serialized in parallel
/// leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
-pub allow_single_file_parallelism: bool, default = false
+pub allow_single_file_parallelism: bool, default = true
Contributor Author

We could choose to leave this feature disabled by default if we wish to merge this before the next release. That would give us more time to ensure there are no other issues and improve test coverage further.
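
For reference, whichever default we pick, the setting can be overridden per session at runtime. A minimal sketch, assuming DataFusion's standard SessionConfig API (illustrative, not part of this PR):

use datafusion::prelude::*;

// Sketch: opt out of single-file parallel parquet writing for one session.
// The field path is assumed from the config_namespace! block touched above.
fn make_context() -> SessionContext {
    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.allow_single_file_parallelism = false;
    // Equivalent SQL:
    // SET datafusion.execution.parquet.allow_single_file_parallelism = false;
    SessionContext::new_with_config(config)
}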

Contributor
@alamb alamb Jan 21, 2024

We are about to release 35.0.0 which would not include this change. See voting thread here: https://lists.apache.org/thread/onbs8l0w5s7693fchpyvwwgh61gf1jf8

Thus I think it would actually be best to turn this default to true so the code gets maximum "bake time" from people working off main before a release

==> This PR looks good to me

@alamb
Contributor

alamb commented Jan 21, 2024

Are there any additional tests which we should add to improve coverage for the parallel writer?

I think including tests that write lists of structs and structs of lists would also be good.

Perhaps something like:

select [struct('foo', 1), struct('bar', 2)];
+-----------------------------------------------------------------------+
| make_array(struct(Utf8("foo"),Int64(1)),struct(Utf8("bar"),Int64(2))) |
+-----------------------------------------------------------------------+
| [{c0: foo, c1: 1}, {c0: bar, c1: 2}]                                  |
+-----------------------------------------------------------------------+
1 row in set. Query took 0.002 seconds.
select struct('foo', [1,2,3], struct('bar', [2,3,4]));
+-----------------------------------------------------------------------------------------------------------------------+
| struct(Utf8("foo"),make_array(Int64(1),Int64(2),Int64(3)),struct(Utf8("bar"),make_array(Int64(2),Int64(3),Int64(4)))) |
+-----------------------------------------------------------------------------------------------------------------------+
| {c0: foo, c1: [1, 2, 3], c2: {c0: bar, c1: [2, 3, 4]}}                                                                |
+-----------------------------------------------------------------------------------------------------------------------+
1 row in set. Query took 0.003 seconds.

Contributor

@alamb alamb left a comment


Thank you @devinjdangelo -- this looks great to me.

I have a suggestion for some additional tests, which we can add to this PR or as a follow-on PR.


.map_err(|_| {
DataFusionError::Internal("Unable to send array to writer!".into())
})?;
next_channel += 1;
Contributor

Is the idea that more than one array can be sent per field?

Contributor Author

Per top-level field, yes. The "Field" referenced in the outer loop is always a top-level, non-nested field. The compute_leaves function does the recursive iteration over all nested fields for each top-level field.

There is actually an independent channel / parallel serializer for every leaf (not only top-level) field. So we must advance the channel we are sending to for every leaf column yielded by compute_leaves, not just for every new top-level field.
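
To make that concrete, here is a minimal sketch showing that one top-level struct field with two primitive children expands to two leaf columns (and therefore needs two channels); it assumes the parquet crate's public compute_leaves API and arrow's StructArray:

use std::sync::Arc;
use arrow_array::{ArrayRef, Int64Array, StringArray, StructArray};
use arrow_schema::{DataType, Field, Fields};
use parquet::arrow::arrow_writer::compute_leaves;

fn main() -> parquet::errors::Result<()> {
    // One top-level field: a struct with two primitive children.
    let children = Fields::from(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Utf8, false),
    ]);
    let arrays: Vec<ArrayRef> = vec![
        Arc::new(Int64Array::from(vec![1, 2])),
        Arc::new(StringArray::from(vec!["x", "y"])),
    ];
    let array: ArrayRef = Arc::new(StructArray::new(children.clone(), arrays, None));
    let field = Field::new("s", DataType::Struct(children), false);

    // The single struct field flattens into two leaf columns, so the
    // dispatch loop must advance the channel index twice for it.
    let leaves = compute_leaves(&field, &array)?;
    assert_eq!(leaves.len(), 2);
    Ok(())
}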

@alamb
Contributor

alamb commented Jan 22, 2024

Thanks again @devinjdangelo

@alamb alamb merged commit 38d5f75 into apache:main Jan 22, 2024
23 checks passed