diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 485ab0a902c2..bd878932d80f 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -746,15 +746,7 @@ impl TableProvider for ListingTable { .options .table_partition_cols .iter() - .map(|col| { - Ok(( - col.0.to_owned(), - self.table_schema - .field_with_name(&col.0)? - .data_type() - .clone(), - )) - }) + .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone())) .collect::<Result<Vec<_>>>()?; let filters = if let Some(expr) = conjunction(filters.to_vec()) { diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index f08bc9b8df20..237772eb8360 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -420,7 +420,7 @@ mod tests { statistics: Statistics::new_unknown(&file_schema), file_schema, limit: None, - table_partition_cols: vec![("date".to_owned(), DataType::Utf8)], + table_partition_cols: vec![Field::new("date", DataType::Utf8, false)], output_ordering: vec![], infinite_source: false, }); diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index f3b2fa9de7a9..e60a249b0b4a 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -871,7 +871,7 @@ mod tests { let mut config = partitioned_csv_config(file_schema, file_groups)?; // Add partition columns - config.table_partition_cols = vec![("date".to_owned(), DataType::Utf8)]; + config.table_partition_cols = vec![Field::new("date", DataType::Utf8, false)]; config.file_groups[0][0].partition_values = vec![ScalarValue::Utf8(Some("2021-10-26".to_owned()))]; diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs 
b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index c1a19b745b8d..d8a9697b2bf7 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -101,7 +101,7 @@ pub struct FileScanConfig { /// all records after filtering are returned. pub limit: Option<usize>, /// The partitioning columns - pub table_partition_cols: Vec<(String, DataType)>, + pub table_partition_cols: Vec<Field>, /// All equivalent lexicographical orderings that describe the schema. pub output_ordering: Vec<LexOrdering>, /// Indicates whether this plan may produce an infinite stream of records. @@ -135,8 +135,7 @@ impl FileScanConfig { table_cols_stats.push(self.statistics.column_statistics[idx].clone()) } else { let partition_idx = idx - self.file_schema.fields().len(); - let (name, dtype) = &self.table_partition_cols[partition_idx]; - table_fields.push(Field::new(name, dtype.to_owned(), false)); + table_fields.push(self.table_partition_cols[partition_idx].to_owned()); // TODO provide accurate stat for partition column (#1186) table_cols_stats.push(ColumnStatistics::new_unknown()) } @@ -501,10 +500,10 @@ mod tests { Arc::clone(&file_schema), None, Statistics::new_unknown(&file_schema), - vec![( + to_partition_cols(vec![( "date".to_owned(), wrap_partition_type_in_dict(DataType::Utf8), - )], + )]), ); let (proj_schema, proj_statistics, _) = conf.project(); @@ -527,6 +526,35 @@ mod tests { assert_eq!(col_indices, None); } + #[test] + fn physical_plan_config_no_projection_tab_cols_as_field() { + let file_schema = aggr_test_schema(); + + // make a table_partition_col as a field + let table_partition_col = + Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), true) + .with_metadata(HashMap::from_iter(vec![( + "key_whatever".to_owned(), + "value_whatever".to_owned(), + )])); + + let conf = config_for_projection( + Arc::clone(&file_schema), + None, + Statistics::new_unknown(&file_schema), + 
vec![table_partition_col.clone()], ); // verify that proj_schema includes the last column, and that it is exactly the same as the field it was defined from let (proj_schema, _proj_statistics, _) = conf.project(); assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1); assert_eq!( *proj_schema.field(file_schema.fields().len()), table_partition_col, "partition columns are the last columns and must have all values defined in created field" ); } #[test] fn physical_plan_config_with_projection() { let file_schema = aggr_test_schema(); @@ -545,10 +573,10 @@ mod tests { .collect(), total_byte_size: Precision::Absent, }, - vec![( + to_partition_cols(vec![( "date".to_owned(), wrap_partition_type_in_dict(DataType::Utf8), - )], + )]), ); let (proj_schema, proj_statistics, _) = conf.project(); @@ -602,7 +630,7 @@ mod tests { file_batch.schema().fields().len() + 2, ]), Statistics::new_unknown(&file_batch.schema()), - partition_cols.clone(), + to_partition_cols(partition_cols.clone()), ); let (proj_schema, ..) 
= conf.project(); // created a projector for that projected schema @@ -747,7 +775,7 @@ mod tests { file_schema: SchemaRef, projection: Option<Vec<usize>>, statistics: Statistics, - table_partition_cols: Vec<(String, DataType)>, + table_partition_cols: Vec<Field>, ) -> FileScanConfig { FileScanConfig { file_schema, @@ -762,6 +790,14 @@ } } + /// Convert partition columns from Vec<(String, DataType)> to Vec<Field> + fn to_partition_cols(table_partition_cols: Vec<(String, DataType)>) -> Vec<Field> { + table_partition_cols + .iter() + .map(|(name, dtype)| Field::new(name, dtype.clone(), false)) + .collect::<Vec<_>>() + } + /// returns record batch with 3 columns of i32 in memory pub fn build_table_i32( a: (&str, &Vec<i32>), diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs index 22ff3f42ebda..a715f6e8e3cd 100644 --- a/datafusion/core/src/datasource/physical_plan/file_stream.rs +++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs @@ -259,7 +259,7 @@ impl FileStream { &config .table_partition_cols .iter() - .map(|x| x.0.clone()) + .map(|x| x.name().clone()) .collect::<Vec<_>>(), ); diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index e59686453f0e..6cab27b0846c 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -1624,14 +1624,15 @@ mod tests { projection: Some(vec![0, 1, 2, 12, 13]), limit: None, table_partition_cols: vec![ - ("year".to_owned(), DataType::Utf8), - ("month".to_owned(), DataType::UInt8), - ( - "day".to_owned(), + Field::new("year", DataType::Utf8, false), + Field::new("month", DataType::UInt8, false), + Field::new( + "day", DataType::Dictionary( Box::new(DataType::UInt16), Box::new(DataType::Utf8), ), + false, ), ], output_ordering: vec![], diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 
ff02b8052136..cdc772f71d80 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -22,7 +22,6 @@ use std::ops::Deref; use std::sync::Arc; use arrow::compute::SortOptions; -use arrow::datatypes::DataType; use datafusion::arrow::datatypes::Schema; use datafusion::datasource::listing::{FileRange, PartitionedFile}; use datafusion::datasource::object_store::ObjectStoreUrl; @@ -489,13 +488,8 @@ pub fn parse_protobuf_file_scan_config( let table_partition_cols = proto .table_partition_cols .iter() - .map(|col| { - Ok(( - col.to_owned(), - schema.field_with_name(col)?.data_type().clone(), - )) - }) - .collect::<Result<Vec<_>>>()?; + .map(|col| Ok(schema.field_with_name(col)?.clone())) + .collect::<Result<Vec<_>>>()?; let mut output_ordering = vec![]; for node_collection in &proto.output_ordering { diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index db97c393255a..466b99b68472 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -729,7 +729,7 @@ impl TryFrom<&FileScanConfig> for protobuf::FileScanExecConf { table_partition_cols: conf .table_partition_cols .iter() - .map(|x| x.0.clone()) + .map(|x| x.name().clone()) .collect::<Vec<_>>(), object_store_url: conf.object_store_url.to_string(), output_ordering: output_orderings