Skip to content

Commit

Permalink
Change FileScanConfig.table_partition_cols from `(String, DataType)…
Browse files Browse the repository at this point in the history
…` to `Field`s (apache#7890)

* feat: make data type of FileScanConfig.table_partition_cols a vector of Fields

* fix: avro test

* chore: Apply suggestions from code review

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* chore: address review comments

* chore: remove unnecessary to_owned

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
NGA-TRAN and alamb authored Oct 22, 2023
1 parent dae1efb commit ca5dc8c
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 34 deletions.
10 changes: 1 addition & 9 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -746,15 +746,7 @@ impl TableProvider for ListingTable {
.options
.table_partition_cols
.iter()
.map(|col| {
Ok((
col.0.to_owned(),
self.table_schema
.field_with_name(&col.0)?
.data_type()
.clone(),
))
})
.map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
.collect::<Result<Vec<_>>>()?;

let filters = if let Some(expr) = conjunction(filters.to_vec()) {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ mod tests {
statistics: Statistics::new_unknown(&file_schema),
file_schema,
limit: None,
table_partition_cols: vec![("date".to_owned(), DataType::Utf8)],
table_partition_cols: vec![Field::new("date", DataType::Utf8, false)],
output_ordering: vec![],
infinite_source: false,
});
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ mod tests {
let mut config = partitioned_csv_config(file_schema, file_groups)?;

// Add partition columns
config.table_partition_cols = vec![("date".to_owned(), DataType::Utf8)];
config.table_partition_cols = vec![Field::new("date", DataType::Utf8, false)];
config.file_groups[0][0].partition_values =
vec![ScalarValue::Utf8(Some("2021-10-26".to_owned()))];

Expand Down
54 changes: 45 additions & 9 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub struct FileScanConfig {
/// all records after filtering are returned.
pub limit: Option<usize>,
/// The partitioning columns
pub table_partition_cols: Vec<(String, DataType)>,
pub table_partition_cols: Vec<Field>,
/// All equivalent lexicographical orderings that describe the schema.
pub output_ordering: Vec<LexOrdering>,
/// Indicates whether this plan may produce an infinite stream of records.
Expand Down Expand Up @@ -135,8 +135,7 @@ impl FileScanConfig {
table_cols_stats.push(self.statistics.column_statistics[idx].clone())
} else {
let partition_idx = idx - self.file_schema.fields().len();
let (name, dtype) = &self.table_partition_cols[partition_idx];
table_fields.push(Field::new(name, dtype.to_owned(), false));
table_fields.push(self.table_partition_cols[partition_idx].to_owned());
// TODO provide accurate stat for partition column (#1186)
table_cols_stats.push(ColumnStatistics::new_unknown())
}
Expand Down Expand Up @@ -501,10 +500,10 @@ mod tests {
Arc::clone(&file_schema),
None,
Statistics::new_unknown(&file_schema),
vec![(
to_partition_cols(vec![(
"date".to_owned(),
wrap_partition_type_in_dict(DataType::Utf8),
)],
)]),
);

let (proj_schema, proj_statistics, _) = conf.project();
Expand All @@ -527,6 +526,35 @@ mod tests {
assert_eq!(col_indices, None);
}

#[test]
fn physical_plan_config_no_projection_tab_cols_as_field() {
    let file_schema = aggr_test_schema();

    // Build a partition column as a full `Field` (nullable, with metadata) so
    // we can verify that every field property survives projection unchanged.
    let table_partition_col =
        Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), true)
            .with_metadata(HashMap::from_iter(vec![(
                "key_whatever".to_owned(),
                "value_whatever".to_owned(),
            )]));

    let conf = config_for_projection(
        Arc::clone(&file_schema),
        None,
        Statistics::new_unknown(&file_schema),
        vec![table_partition_col.clone()],
    );

    // Verify the projected schema includes the partition column as the last
    // column, exactly as the field was defined (type, nullability, metadata).
    let (proj_schema, _proj_statistics, _) = conf.project();
    assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
    assert_eq!(
        *proj_schema.field(file_schema.fields().len()),
        table_partition_col,
        "partition columns are the last columns and must have all values defined in the created field"
    );
}

#[test]
fn physical_plan_config_with_projection() {
let file_schema = aggr_test_schema();
Expand All @@ -545,10 +573,10 @@ mod tests {
.collect(),
total_byte_size: Precision::Absent,
},
vec![(
to_partition_cols(vec![(
"date".to_owned(),
wrap_partition_type_in_dict(DataType::Utf8),
)],
)]),
);

let (proj_schema, proj_statistics, _) = conf.project();
Expand Down Expand Up @@ -602,7 +630,7 @@ mod tests {
file_batch.schema().fields().len() + 2,
]),
Statistics::new_unknown(&file_batch.schema()),
partition_cols.clone(),
to_partition_cols(partition_cols.clone()),
);
let (proj_schema, ..) = conf.project();
// created a projector for that projected schema
Expand Down Expand Up @@ -747,7 +775,7 @@ mod tests {
file_schema: SchemaRef,
projection: Option<Vec<usize>>,
statistics: Statistics,
table_partition_cols: Vec<(String, DataType)>,
table_partition_cols: Vec<Field>,
) -> FileScanConfig {
FileScanConfig {
file_schema,
Expand All @@ -762,6 +790,14 @@ mod tests {
}
}

/// Convert partition columns from `Vec<(String, DataType)>` to `Vec<Field>`.
///
/// Each resulting `Field` is created as non-nullable, matching how partition
/// columns were constructed before the `Field`-based representation.
fn to_partition_cols(table_partition_cols: Vec<(String, DataType)>) -> Vec<Field> {
    table_partition_cols
        // consume the Vec: avoids cloning each name/dtype that `.iter()` forced
        .into_iter()
        .map(|(name, dtype)| Field::new(name, dtype, false))
        .collect()
}

/// returns record batch with 3 columns of i32 in memory
pub fn build_table_i32(
a: (&str, &Vec<i32>),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ impl<F: FileOpener> FileStream<F> {
&config
.table_partition_cols
.iter()
.map(|x| x.0.clone())
.map(|x| x.name().clone())
.collect::<Vec<_>>(),
);

Expand Down
9 changes: 5 additions & 4 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1624,14 +1624,15 @@ mod tests {
projection: Some(vec![0, 1, 2, 12, 13]),
limit: None,
table_partition_cols: vec![
("year".to_owned(), DataType::Utf8),
("month".to_owned(), DataType::UInt8),
(
"day".to_owned(),
Field::new("year", DataType::Utf8, false),
Field::new("month", DataType::UInt8, false),
Field::new(
"day",
DataType::Dictionary(
Box::new(DataType::UInt16),
Box::new(DataType::Utf8),
),
false,
),
],
output_ordering: vec![],
Expand Down
10 changes: 2 additions & 8 deletions datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use std::ops::Deref;
use std::sync::Arc;

use arrow::compute::SortOptions;
use arrow::datatypes::DataType;
use datafusion::arrow::datatypes::Schema;
use datafusion::datasource::listing::{FileRange, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
Expand Down Expand Up @@ -489,13 +488,8 @@ pub fn parse_protobuf_file_scan_config(
let table_partition_cols = proto
.table_partition_cols
.iter()
.map(|col| {
Ok((
col.to_owned(),
schema.field_with_name(col)?.data_type().clone(),
))
})
.collect::<Result<Vec<(String, DataType)>>>()?;
.map(|col| Ok(schema.field_with_name(col)?.clone()))
.collect::<Result<Vec<_>>>()?;

let mut output_ordering = vec![];
for node_collection in &proto.output_ordering {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/proto/src/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ impl TryFrom<&FileScanConfig> for protobuf::FileScanExecConf {
table_partition_cols: conf
.table_partition_cols
.iter()
.map(|x| x.0.clone())
.map(|x| x.name().clone())
.collect::<Vec<_>>(),
object_store_url: conf.object_store_url.to_string(),
output_ordering: output_orderings
Expand Down

0 comments on commit ca5dc8c

Please sign in to comment.