fix: Ensure parquet schema arg is propagated to IR #19084

Merged · 7 commits · Oct 3, 2024
4 changes: 2 additions & 2 deletions crates/polars-io/src/parquet/read/options.rs
@@ -1,11 +1,11 @@
-use arrow::datatypes::ArrowSchemaRef;
+use polars_core::schema::SchemaRef;
 #[cfg(feature = "serde")]
 use serde::{Deserialize, Serialize};

 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub struct ParquetOptions {
-    pub schema: Option<ArrowSchemaRef>,
+    pub schema: Option<SchemaRef>,
     pub parallel: ParallelStrategy,
     pub low_memory: bool,
     pub use_statistics: bool,
Review comment from the PR author on this change:
Change ParquetOptions::schema to hold our native schema type. This reduces excess copies/conversions, since that is the type we receive at input and also the type we give to the IR. The conversion to an arrow schema is now done below in to_alp_impl and stored directly in the FileInfo::reader_schema field.

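To make the comment concrete, here is a rough sketch of the new branch this PR adds in to_alp_impl (see the dsl_to_ir.rs diff further down). It reuses the names from that diff and is not a standalone compiling example: when the user supplied a schema, the IR's FileInfo is built directly from it, and the arrow conversion happens exactly once at that point.

    // Sketch: the user-supplied native schema (now Option<SchemaRef> on
    // ParquetOptions) is turned into the IR's FileInfo without reading any file.
    if let Some(schema) = &options.schema {
        FileInfo {
            schema: schema.clone(),
            // The arrow form is produced here, once, and stored on the IR
            // instead of being carried on ParquetOptions.
            reader_schema: Some(either::Either::Left(Arc::new(
                schema.to_arrow(CompatLevel::newest()),
            ))),
            // No parquet metadata was read, so no row-count estimate is available.
            row_estimation: (None, 0),
        }
    } else {
        // Fall back to scans::parquet_file_info, which reads the first file's
        // metadata, as before.
        // ...
    }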
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/scan/parquet.rs
@@ -75,7 +75,7 @@ impl LazyFileListReader for LazyParquetReader {
             self.args.low_memory,
             self.args.cloud_options,
             self.args.use_statistics,
-            self.args.schema.as_deref(),
+            self.args.schema,
             self.args.hive_options,
             self.args.glob,
             self.args.include_file_paths,
12 changes: 2 additions & 10 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
@@ -62,11 +62,7 @@ impl ParquetExec {
         // Modified if we have a negative slice
         let mut first_source = 0;

-        let first_schema = self
-            .options
-            .schema
-            .clone()
-            .unwrap_or_else(|| self.file_info.reader_schema.clone().unwrap().unwrap_left());
+        let first_schema = self.file_info.reader_schema.clone().unwrap().unwrap_left();

         let projected_arrow_schema = {
             if let Some(with_columns) = self.file_options.with_columns.as_deref() {

@@ -262,11 +258,7 @@ impl ParquetExec {
             eprintln!("POLARS PREFETCH_SIZE: {}", batch_size)
         }

-        let first_schema = self
-            .options
-            .schema
-            .clone()
-            .unwrap_or_else(|| self.file_info.reader_schema.clone().unwrap().unwrap_left());
+        let first_schema = self.file_info.reader_schema.clone().unwrap().unwrap_left();

         let projected_arrow_schema = {
             if let Some(with_columns) = self.file_options.with_columns.as_deref() {
5 changes: 1 addition & 4 deletions crates/polars-pipe/src/executors/sources/parquet.rs
@@ -261,10 +261,7 @@ impl ParquetSource {
         }
         let run_async = paths.first().map(is_cloud_url).unwrap_or(false) || config::force_async();

-        let first_schema = options
-            .schema
-            .clone()
-            .unwrap_or_else(|| file_info.reader_schema.clone().unwrap().unwrap_left());
+        let first_schema = file_info.reader_schema.clone().unwrap().unwrap_left();

         let projected_arrow_schema = {
             if let Some(with_columns) = file_options.with_columns.as_deref() {
4 changes: 2 additions & 2 deletions crates/polars-plan/src/plans/builder_dsl.rs
@@ -85,7 +85,7 @@ impl DslBuilder {
         low_memory: bool,
         cloud_options: Option<CloudOptions>,
         use_statistics: bool,
-        schema: Option<&Schema>,
+        schema: Option<SchemaRef>,
         hive_options: HiveOptions,
         glob: bool,
         include_file_paths: Option<PlSmallStr>,

@@ -109,7 +109,7 @@
             file_options: options,
             scan_type: FileScan::Parquet {
                 options: ParquetOptions {
-                    schema: schema.map(|x| Arc::new(x.to_arrow(CompatLevel::newest()))),
+                    schema,
                     parallel,
                     low_memory,
                     use_statistics,
31 changes: 22 additions & 9 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
@@ -145,18 +145,31 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
         let mut file_info = match &mut scan_type {
             #[cfg(feature = "parquet")]
             FileScan::Parquet {
+                options,
                 cloud_options,
                 metadata,
                 ..
             } => {
-                let (file_info, md) = scans::parquet_file_info(
-                    &sources,
-                    &file_options,
-                    cloud_options.as_ref(),
-                )
-                .map_err(|e| e.context(failed_here!(parquet scan)))?;
-                *metadata = md;
-                file_info
+                if let Some(schema) = &options.schema {
+                    // We were passed a schema, we don't have to call `parquet_file_info`,
+                    // but this does mean we don't have `row_estimation` and `first_metadata`.
+                    FileInfo {
+                        schema: schema.clone(),
+                        reader_schema: Some(either::Either::Left(Arc::new(
+                            schema.to_arrow(CompatLevel::newest()),
+                        ))),
+                        row_estimation: (None, 0),
+                    }
+                } else {
+                    let (file_info, md) = scans::parquet_file_info(
+                        &sources,
+                        &file_options,
+                        cloud_options.as_ref(),
+                    )
+                    .map_err(|e| e.context(failed_here!(parquet scan)))?;
+
+                    *metadata = md;
+                    file_info
+                }
             },
             #[cfg(feature = "ipc")]
             FileScan::Ipc {
13 changes: 1 addition & 12 deletions crates/polars-stream/src/nodes/parquet_source/mod.rs
@@ -156,18 +156,7 @@ impl ComputeNode for ParquetSourceNode {
             eprintln!("[ParquetSource]: {:?}", &self.config);
         }

-        self.schema = Some(
-            self.options
-                .schema
-                .take()
-                .unwrap_or_else(|| self.file_info.reader_schema.take().unwrap().unwrap_left()),
-        );
-
-        {
-            // Ensure these are not used anymore
-            self.options.schema.take();
-            self.file_info.reader_schema.take();
-        }
+        self.schema = Some(self.file_info.reader_schema.take().unwrap().unwrap_left());

         self.init_projected_arrow_schema();
         self.physical_predicate = self.predicate.clone().map(phys_expr_to_io_expr);
12 changes: 12 additions & 0 deletions py-polars/tests/unit/io/test_lazy_parquet.py
@@ -694,6 +694,18 @@ def test_parquet_schema_arg(
         pl.DataFrame({"1": None, "a": [1, 2], "b": [1, 2]}, schema=schema),
     )

+    # Issue #19081: If a schema arg is passed, ensure its fields are propagated
+    # to the IR, otherwise even if `allow_missing_columns=True`, downstream
+    # `select()`s etc. will fail with ColumnNotFound if the column is not in
+    # the first file.
+    lf = pl.scan_parquet(
+        paths, parallel=parallel, schema=schema, allow_missing_columns=True
+    ).select("1")
+
+    s = lf.collect(streaming=streaming).to_series()
+    assert s.len() == 2
+    assert s.null_count() == 2
+
     # Test files containing extra columns not in `schema`

     schema: dict[str, type[pl.DataType]] = {"a": pl.Int64}  # type: ignore[no-redef]