From 190497cd4e89a37cd8ef10681d4278c4ed188ae4 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Sat, 13 Jan 2024 12:23:06 -0500 Subject: [PATCH 01/24] Start setting up SchemaRef --- datafusion/common/src/dfschema.rs | 148 +++++++++++++++++++----------- 1 file changed, 92 insertions(+), 56 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 85b97aac037d..d6d2b0bc0ae7 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -106,10 +106,11 @@ pub type DFSchemaRef = Arc; /// ``` #[derive(Debug, Clone, PartialEq, Eq)] pub struct DFSchema { - /// Fields - fields: Vec, - /// Additional metadata in form of key value pairs - metadata: HashMap, + /// Inner Arrow schema reference. + inner: SchemaRef, + /// Optional qualifiers for each column in this schema. In the same order as + /// the `self.inner.fields()` + field_qualifiers: Vec>, /// Stores functional dependencies in the schema. functional_dependencies: FunctionalDependencies, } @@ -118,61 +119,96 @@ impl DFSchema { /// Creates an empty `DFSchema` pub fn empty() -> Self { Self { - fields: vec![], - metadata: HashMap::new(), + inner: Arc::new(Schema::new([])), + field_qualifiers: vec![], functional_dependencies: FunctionalDependencies::empty(), } } - #[deprecated(since = "7.0.0", note = "please use `new_with_metadata` instead")] - /// Create a new `DFSchema` - pub fn new(fields: Vec) -> Result { - Self::new_with_metadata(fields, HashMap::new()) - } - - /// Create a new `DFSchema` - pub fn new_with_metadata( - fields: Vec, - metadata: HashMap, + /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier + pub fn from_qualified_schema<'a>( + qualifier: impl Into>, + schema: &SchemaRef, ) -> Result { - let mut qualified_names = HashSet::new(); - let mut unqualified_names = HashSet::new(); - - for field in &fields { - if let Some(qualifier) = field.qualifier() { - qualified_names.insert((qualifier, field.name())); - } else if !unqualified_names.insert(field.name()) { - return _schema_err!(SchemaError::DuplicateUnqualifiedField { - name: field.name().to_string(), - }); - } - } + let qualifier = qualifier.into(); + let new_self = Self { + inner: schema.clone(), + field_qualifiers: vec![Some(qualifier.clone()); schema.fields().len()], + functional_dependencies: FunctionalDependencies::empty(), + }; + // new_self.check_names()?; + Ok(new_self) + } - // check for mix of qualified and unqualified field with same unqualified name - // note that we need to sort the contents of the HashSet first so that errors are - // deterministic - let mut qualified_names = qualified_names - .iter() - .map(|(l, r)| (l.to_owned(), r.to_owned())) - .collect::>(); - qualified_names.sort(); - for (qualifier, name) in &qualified_names { - if unqualified_names.contains(name) { - return _schema_err!(SchemaError::AmbiguousReference { - field: Column { - relation: Some((*qualifier).clone()), - name: name.to_string(), - } - }); - } - } - Ok(Self { - fields, - metadata, + /// Create a `DFSchema` from an Arrow where all fields have no qualifier. 
+ pub fn from_unqualified_schema<'a>(schema: &SchemaRef) -> Result { + let new_self = Self { + inner: schema.clone(), + field_qualifiers: vec![None; schema.fields.len()], functional_dependencies: FunctionalDependencies::empty(), - }) + }; + // new_self.check_names()?; + Ok(new_self) } + // fn check_names(&self) -> Result<()> { + // let mut qualified_names = HashSet::new(); + // let mut unqualified_names = HashSet::new(); + // + // for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) { + // + // } + // } + + // #[deprecated(since = "7.0.0", note = "please use `new_with_metadata` instead")] + /// Create a new `DFSchema` + // pub fn new(fields: Vec) -> Result { + // Self::new_with_metadata(fields, HashMap::new()) + // } + + /// Create a new `DFSchema` + // pub fn new_with_metadata( + // fields: Vec, + // metadata: HashMap, + // ) -> Result { + // let mut qualified_names = HashSet::new(); + // let mut unqualified_names = HashSet::new(); + // + // for field in &fields { + // if let Some(qualifier) = field.qualifier() { + // qualified_names.insert((qualifier, field.name())); + // } else if !unqualified_names.insert(field.name()) { + // return _schema_err!(SchemaError::DuplicateUnqualifiedField { + // name: field.name().to_string(), + // }); + // } + // } + // + // // check for mix of qualified and unqualified field with same unqualified name + // // note that we need to sort the contents of the HashSet first so that errors are + // // deterministic + // let mut qualified_names = qualified_names + // .iter() + // .map(|(l, r)| (l.to_owned(), r.to_owned())) + // .collect::>(); + // qualified_names.sort(); + // for (qualifier, name) in &qualified_names { + // if unqualified_names.contains(name) { + // return _schema_err!(SchemaError::AmbiguousReference { + // field: Column { + // relation: Some((*qualifier).clone()), + // name: name.to_string(), + // } + // }); + // } + // } + // Ok(Self { + // fields, + // metadata, + // functional_dependencies: FunctionalDependencies::empty(), + // }) + // } + /// Create a `DFSchema` from an Arrow schema and a given qualifier /// /// To create a schema from an Arrow schema without a qualifier, use @@ -197,7 +233,7 @@ impl DFSchema { mut self, functional_dependencies: FunctionalDependencies, ) -> Result { - if functional_dependencies.is_valid(self.fields.len()) { + if functional_dependencies.is_valid(self.inner.fields.len()) { self.functional_dependencies = functional_dependencies; Ok(self) } else { @@ -211,11 +247,11 @@ impl DFSchema { /// Create a new schema that contains the fields from this schema followed by the fields /// from the supplied schema. An error will be returned if there are duplicate field names. 
pub fn join(&self, schema: &DFSchema) -> Result { - let mut fields = self.fields.clone(); - let mut metadata = self.metadata.clone(); - fields.extend_from_slice(schema.fields().as_slice()); - metadata.extend(schema.metadata.clone()); - Self::new_with_metadata(fields, metadata) + // let mut fields = self.fields.clone(); + // let mut metadata = self.metadata.clone(); + // fields.extend_from_slice(schema.fields().as_slice()); + // metadata.extend(schema.metadata.clone()); + // Self::new_with_metadata(fields, metadata) } /// Modify this schema by appending the fields from the supplied schema, ignoring any From b9fd992fa9bbb730295a2a7a06a833d4d4107cfd Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Sun, 14 Jan 2024 11:56:46 -0500 Subject: [PATCH 02/24] Start updating DFSchema --- datafusion/common/src/dfschema.rs | 206 ++++++++---------------------- datafusion/common/src/lib.rs | 2 +- 2 files changed, 51 insertions(+), 157 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index d6d2b0bc0ae7..d2b536c6f2e8 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -247,11 +247,20 @@ impl DFSchema { /// Create a new schema that contains the fields from this schema followed by the fields /// from the supplied schema. An error will be returned if there are duplicate field names. pub fn join(&self, schema: &DFSchema) -> Result { - // let mut fields = self.fields.clone(); - // let mut metadata = self.metadata.clone(); - // fields.extend_from_slice(schema.fields().as_slice()); - // metadata.extend(schema.metadata.clone()); - // Self::new_with_metadata(fields, metadata) + let (new_field_qualifiers, new_fields) = self + .iter() + .chain(schema.iter()) + .map(|(qualifier, field)| (qualifier.as_ref().clone(), field.clone())) + .unzip(); + + let mut new_metadata = self.inner.metadata.clone(); + new_metadata.extend(schema.inner.metadata.clone()); + + let new_self = Self { + inner: Arc::new(Schema::new_with_metadata(new_fields, new_metadata)), + field_qualifiers: new_field_qualifiers, + }; + Ok(new_self) } /// Modify this schema by appending the fields from the supplied schema, ignoring any @@ -275,14 +284,14 @@ impl DFSchema { } /// Get a list of fields - pub fn fields(&self) -> &Vec { - &self.fields + pub fn fields(&self) -> &Fields { + &self.inner.fields } /// Returns an immutable reference of a specific `Field` instance selected using an /// offset within the internal `fields` vector - pub fn field(&self, i: usize) -> &DFField { - &self.fields[i] + pub fn field(&self, i: usize) -> &Field { + &self.inner.fields[i] } #[deprecated(since = "8.0.0", note = "please use `index_of_column_by_name` instead")] @@ -385,15 +394,16 @@ impl DFSchema { &self, qualifier: &TableReference, ) -> Vec { - self.fields - .iter() - .enumerate() - .filter_map(|(idx, field)| { - field - .qualifier() - .and_then(|q| q.eq(qualifier).then_some(idx)) - }) - .collect() + // self.inner + // .fields + // .iter() + // .enumerate() + // .filter_map(|(idx, field)| { + // field + // .qualifier() + // .and_then(|q| q.eq(qualifier).then_some(idx)) + // }) + // .collect() } /// Find all fields match the given name @@ -450,7 +460,7 @@ impl DFSchema { } /// Find the field with the given qualified column - pub fn field_from_column(&self, column: &Column) -> Result<&DFField> { + pub fn field_from_column(&self, column: &Column) -> Result<&Field> { match &column.relation { Some(r) => self.field_with_qualified_name(r, &column.name), None => 
self.field_with_unqualified_name(&column.name), @@ -484,7 +494,8 @@ impl DFSchema { /// Check to see if unqualified field names matches field names in Arrow schema pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool { - self.fields + self.inner + .fields .iter() .zip(arrow_schema.fields().iter()) .all(|(dffield, arrowfield)| dffield.name() == arrowfield.name()) @@ -677,21 +688,29 @@ impl DFSchema { /// Get list of fully-qualified field names in this schema pub fn field_names(&self) -> Vec { - self.fields - .iter() - .map(|f| f.qualified_name()) + self.iter() + .map(|qualifier, field| qualified_name(qualifier, field)) .collect::>() } /// Get metadata of this schema pub fn metadata(&self) -> &HashMap { - &self.metadata + &self.inner.metadata } /// Get functional dependencies pub fn functional_dependencies(&self) -> &FunctionalDependencies { &self.functional_dependencies } + + pub fn iter<'a>( + &'a self, + ) -> impl Iterator, &'a FieldRef)> { + self.field_qualifiers + .iter() + .zip(self.inner.fields().iter()) + .map(|(qualifier, field)| (qualifier.as_ref(), field)) + } } impl From for Schema { @@ -836,138 +855,6 @@ impl ExprSchema for DFSchema { } } -/// DFField wraps an Arrow field and adds an optional qualifier -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct DFField { - /// Optional qualifier (usually a table or relation name) - qualifier: Option, - /// Arrow field definition - field: FieldRef, -} - -impl DFField { - /// Creates a new `DFField` - pub fn new>( - qualifier: Option, - name: &str, - data_type: DataType, - nullable: bool, - ) -> Self { - DFField { - qualifier: qualifier.map(|s| s.into()), - field: Arc::new(Field::new(name, data_type, nullable)), - } - } - - /// Convenience method for creating new `DFField` without a qualifier - pub fn new_unqualified(name: &str, data_type: DataType, nullable: bool) -> Self { - DFField { - qualifier: None, - field: Arc::new(Field::new(name, data_type, nullable)), - } - } - - /// Create a qualified field from an existing Arrow field - pub fn from_qualified<'a>( - qualifier: impl Into>, - field: impl Into, - ) -> Self { - Self { - qualifier: Some(qualifier.into().to_owned_reference()), - field: field.into(), - } - } - - /// Returns an immutable reference to the `DFField`'s unqualified name - pub fn name(&self) -> &String { - self.field.name() - } - - /// Returns an immutable reference to the `DFField`'s data-type - pub fn data_type(&self) -> &DataType { - self.field.data_type() - } - - /// Indicates whether this `DFField` supports null values - pub fn is_nullable(&self) -> bool { - self.field.is_nullable() - } - - pub fn metadata(&self) -> &HashMap { - self.field.metadata() - } - - /// Returns a string to the `DFField`'s qualified name - pub fn qualified_name(&self) -> String { - if let Some(qualifier) = &self.qualifier { - format!("{}.{}", qualifier, self.field.name()) - } else { - self.field.name().to_owned() - } - } - - /// Builds a qualified column based on self - pub fn qualified_column(&self) -> Column { - Column { - relation: self.qualifier.clone(), - name: self.field.name().to_string(), - } - } - - /// Builds an unqualified column based on self - pub fn unqualified_column(&self) -> Column { - Column { - relation: None, - name: self.field.name().to_string(), - } - } - - /// Get the optional qualifier - pub fn qualifier(&self) -> Option<&OwnedTableReference> { - self.qualifier.as_ref() - } - - /// Get the arrow field - pub fn field(&self) -> &FieldRef { - &self.field - } - - /// Return field with qualifier stripped - 
pub fn strip_qualifier(mut self) -> Self { - self.qualifier = None; - self - } - - /// Return field with nullable specified - pub fn with_nullable(mut self, nullable: bool) -> Self { - let f = self.field().as_ref().clone().with_nullable(nullable); - self.field = f.into(); - self - } - - /// Return field with new metadata - pub fn with_metadata(mut self, metadata: HashMap) -> Self { - let f = self.field().as_ref().clone().with_metadata(metadata); - self.field = f.into(); - self - } -} - -impl From for DFField { - fn from(value: FieldRef) -> Self { - Self { - qualifier: None, - field: value, - } - } -} - -impl From for DFField { - fn from(value: Field) -> Self { - Self::from(Arc::new(value)) - } -} - /// DataFusion-specific extensions to [`Schema`]. pub trait SchemaExt { /// This is a specialized version of Eq that ignores differences @@ -1020,6 +907,13 @@ impl SchemaExt for Schema { } } +fn qualified_name(qualifier: &Option, name: &str) -> String { + match qualifier { + Some(q) => format!("{}.{}", q, name), + None => name.to_string(), + } +} + #[cfg(test)] mod tests { use crate::assert_contains; diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index ed547782e4a5..e898e946842e 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -45,7 +45,7 @@ pub mod utils; /// Reexport arrow crate pub use arrow; pub use column::Column; -pub use dfschema::{DFField, DFSchema, DFSchemaRef, ExprSchema, SchemaExt, ToDFSchema}; +pub use dfschema::{DFSchema, DFSchemaRef, ExprSchema, SchemaExt, ToDFSchema}; pub use error::{ field_not_found, unqualified_field_not_found, DataFusionError, Result, SchemaError, SharedResult, From 93c4a1ca7ea7e59e3ac77c99014cf2c3ec3be9d0 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Mon, 15 Jan 2024 13:54:55 -0500 Subject: [PATCH 03/24] More updates to df schema --- datafusion/common/src/dfschema.rs | 144 +++++++++++------------------- 1 file changed, 54 insertions(+), 90 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index d2b536c6f2e8..54b05128e9bb 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -255,10 +255,13 @@ impl DFSchema { let mut new_metadata = self.inner.metadata.clone(); new_metadata.extend(schema.inner.metadata.clone()); + self.functional_dependencies + .extend(schema.functional_dependencies); let new_self = Self { inner: Arc::new(Schema::new_with_metadata(new_fields, new_metadata)), field_qualifiers: new_field_qualifiers, + functional_dependencies: self.functional_dependencies.clone(), }; Ok(new_self) } @@ -269,10 +272,10 @@ impl DFSchema { if other_schema.fields.is_empty() { return; } - for field in other_schema.fields() { + for (qualifier, field) in other_schema.iter() { // skip duplicate columns - let duplicated_field = match field.qualifier() { - Some(q) => self.has_column_with_qualified_name(q, field.name()), + let duplicated_field = match qualifier { + Some(q) => self.has_column_with_qualified_name(qualifier, field.name()), // for unqualified columns, check as unqualified name None => self.has_column_with_unqualified_name(field.name()), }; @@ -373,7 +376,7 @@ impl DFSchema { &self, qualifier: Option<&TableReference>, name: &str, - ) -> Result<&DFField> { + ) -> Result<&Field> { if let Some(qualifier) = qualifier { self.field_with_qualified_name(qualifier, name) } else { @@ -382,11 +385,8 @@ impl DFSchema { } /// Find all fields having the given qualifier - pub fn fields_with_qualified(&self, qualifier: &TableReference) -> 
Vec<&DFField> { - self.fields - .iter() - .filter(|field| field.qualifier().map(|q| q.eq(qualifier)).unwrap_or(false)) - .collect() + pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&Field> { + self.iter().filter(|(q, f)| q == qualifier).collect() } /// Find all fields indices having the given qualifier @@ -394,56 +394,24 @@ impl DFSchema { &self, qualifier: &TableReference, ) -> Vec { - // self.inner - // .fields - // .iter() - // .enumerate() - // .filter_map(|(idx, field)| { - // field - // .qualifier() - // .and_then(|q| q.eq(qualifier).then_some(idx)) - // }) - // .collect() + self.iter() + .enumerate() + .filter(|(idx, (q, field))| qualifier == q) + .collect() } /// Find all fields match the given name - pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&DFField> { - self.fields - .iter() - .filter(|field| field.name() == name) - .collect() + pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> { + self.iter().filter(|field| field.name() == name).collect() } /// Find the field with the given name - pub fn field_with_unqualified_name(&self, name: &str) -> Result<&DFField> { - let matches = self.fields_with_unqualified_name(name); - match matches.len() { - 0 => Err(unqualified_field_not_found(name, self)), - 1 => Ok(matches[0]), - _ => { - // When `matches` size > 1, it doesn't necessarily mean an `ambiguous name` problem. - // Because name may generate from Alias/... . It means that it don't own qualifier. - // For example: - // Join on id = b.id - // Project a.id as id TableScan b id - // In this case, there isn't `ambiguous name` problem. When `matches` just contains - // one field without qualifier, we should return it. - let fields_without_qualifier = matches - .iter() - .filter(|f| f.qualifier.is_none()) - .collect::>(); - if fields_without_qualifier.len() == 1 { - Ok(fields_without_qualifier[0]) - } else { - _schema_err!(SchemaError::AmbiguousReference { - field: Column { - relation: None, - name: name.to_string(), - }, - }) - } - } - } + pub fn field_with_unqualified_name(&self, name: &str) -> Result<&Field> { + let field = self + .iter() + .filter(|(q, f)| name == f.name()) + .map(|(q, f)| f); + Ok(field) } /// Find the field with the given qualified name @@ -451,12 +419,12 @@ impl DFSchema { &self, qualifier: &TableReference, name: &str, - ) -> Result<&DFField> { - let idx = self - .index_of_column_by_name(Some(qualifier), name)? 
- .ok_or_else(|| field_not_found(Some(qualifier.to_string()), name, self))?; - - Ok(self.field(idx)) + ) -> Result<&Field> { + let field = self + .iter() + .find(|(q, f)| q == qualifier && name == f.name()) + .map(|(q, f)| f); + Ok(field) } /// Find the field with the given qualified column @@ -663,33 +631,21 @@ impl DFSchema { /// Strip all field qualifier in schema pub fn strip_qualifiers(self) -> Self { - DFSchema { - fields: self - .fields - .into_iter() - .map(|f| f.strip_qualifier()) - .collect(), - ..self - } + self.field_qualifiers = vec![None; self.inner.fields.len()]; + self } /// Replace all field qualifier with new value in schema pub fn replace_qualifier(self, qualifier: impl Into) -> Self { let qualifier = qualifier.into(); - DFSchema { - fields: self - .fields - .into_iter() - .map(|f| DFField::from_qualified(qualifier.clone(), f.field)) - .collect(), - ..self - } + self.field_qualifiers = vec![qualifier; self.inner.fields().len()]; + self } /// Get list of fully-qualified field names in this schema pub fn field_names(&self) -> Vec { self.iter() - .map(|qualifier, field| qualified_name(qualifier, field)) + .map(|(qualifier, field)| qualified_name(&qualifier, field)) .collect::>() } @@ -733,14 +689,12 @@ impl From<&DFSchema> for Schema { impl TryFrom for DFSchema { type Error = DataFusionError; fn try_from(schema: Schema) -> Result { - Self::new_with_metadata( - schema - .fields() - .iter() - .map(|f| DFField::from(f.clone())) - .collect(), - schema.metadata().clone(), - ) + let dfschema = Self { + inner: schema, + field_qualifiers: vec![None, schema.fields.len()], + functional_dependencies: FunctionalDependencies::empty(), + }; + Ok(dfschema) } } @@ -753,8 +707,8 @@ impl From for SchemaRef { // Hashing refers to a subset of fields considered in PartialEq. impl Hash for DFSchema { fn hash(&self, state: &mut H) { - self.fields.hash(state); - self.metadata.len().hash(state); // HashMap is not hashable + self.inner.fields.hash(state); + self.inner.metadata.len().hash(state); // HashMap is not hashable } } @@ -789,9 +743,18 @@ impl ToDFSchema for SchemaRef { } } -impl ToDFSchema for Vec { +impl ToDFSchema for Vec { fn to_dfschema(self) -> Result { - DFSchema::new_with_metadata(self, HashMap::new()) + let schema = Schema { + fields: self.into(), + metadata: HashMap::new(), + }; + let dfschema = DFSchema { + inner: schema.into(), + field_qualifiers: vec![None; self.len()], + functional_dependencies: FunctionalDependencies::empty(), + }; + Ok(dfschema) } } @@ -800,12 +763,13 @@ impl Display for DFSchema { write!( f, "fields:[{}], metadata:{:?}", - self.fields + self.inner + .fields .iter() .map(|field| field.qualified_name()) .collect::>() .join(", "), - self.metadata + self.inner.metadata ) } } From 92a2a457ee62c32c38514374152b1b1ff1382964 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Wed, 17 Jan 2024 10:40:49 -0500 Subject: [PATCH 04/24] More updates --- datafusion/common/src/dfschema.rs | 139 ++++++++++++++++-------------- 1 file changed, 75 insertions(+), 64 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 54b05128e9bb..89ccb02a95ae 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -34,6 +34,7 @@ use crate::{ use arrow::compute::can_cast_types; use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema, SchemaRef}; +use arrow_schema::SchemaBuilder; /// A reference-counted reference to a [DFSchema]. 
pub type DFSchemaRef = Arc; @@ -218,14 +219,12 @@ impl DFSchema { schema: &Schema, ) -> Result { let qualifier = qualifier.into(); - Self::new_with_metadata( - schema - .fields() - .iter() - .map(|f| DFField::from_qualified(qualifier.clone(), f.clone())) - .collect(), - schema.metadata().clone(), - ) + let schema = DFSchema { + inner: schema.clone().into(), + field_qualifiers: vec![Some(qualifier); schema.fields.len()], + functional_dependencies: FunctionalDependencies::empty(), + }; + Ok(schema) } /// Assigns functional dependencies. @@ -269,21 +268,27 @@ impl DFSchema { /// Modify this schema by appending the fields from the supplied schema, ignoring any /// duplicate fields. pub fn merge(&mut self, other_schema: &DFSchema) { - if other_schema.fields.is_empty() { + if other_schema.inner.fields.is_empty() { return; } + let mut schema_builder = SchemaBuilder::from(self.inner.fields); for (qualifier, field) in other_schema.iter() { // skip duplicate columns let duplicated_field = match qualifier { - Some(q) => self.has_column_with_qualified_name(qualifier, field.name()), + Some(q) => self.has_column_with_qualified_name(q, field.name()), // for unqualified columns, check as unqualified name None => self.has_column_with_unqualified_name(field.name()), }; if !duplicated_field { - self.fields.push(field.clone()); + // self.inner.fields.push(field.clone()); + schema_builder.push(field.clone()) } } - self.metadata.extend(other_schema.metadata.clone()) + let finished = schema_builder.finish(); + self.inner = finished.into(); + self.inner + .metadata + .extend(other_schema.inner.metadata.clone()); } /// Get a list of fields @@ -297,31 +302,31 @@ impl DFSchema { &self.inner.fields[i] } - #[deprecated(since = "8.0.0", note = "please use `index_of_column_by_name` instead")] + // #[deprecated(since = "8.0.0", note = "please use `index_of_column_by_name` instead")] /// Find the index of the column with the given unqualified name - pub fn index_of(&self, name: &str) -> Result { - for i in 0..self.fields.len() { - if self.fields[i].name() == name { - return Ok(i); - } else { - // Now that `index_of` is deprecated an error is thrown if - // a fully qualified field name is provided. - match &self.fields[i].qualifier { - Some(qualifier) => { - if (qualifier.to_string() + "." + self.fields[i].name()) == name { - return _plan_err!( - "Fully qualified field name '{name}' was supplied to `index_of` \ - which is deprecated. Please use `index_of_column_by_name` instead" - ); - } - } - None => (), - } - } - } - - Err(unqualified_field_not_found(name, self)) - } + // pub fn index_of(&self, name: &str) -> Result { + // for i in 0..self.fields.len() { + // if self.fields[i].name() == name { + // return Ok(i); + // } else { + // // Now that `index_of` is deprecated an error is thrown if + // // a fully qualified field name is provided. + // match &self.fields[i].qualifier { + // Some(qualifier) => { + // if (qualifier.to_string() + "." + self.fields[i].name()) == name { + // return _plan_err!( + // "Fully qualified field name '{name}' was supplied to `index_of` \ + // which is deprecated. 
Please use `index_of_column_by_name` instead" + // ); + // } + // } + // None => (), + // } + // } + // } + // + // Err(unqualified_field_not_found(name, self)) + // } pub fn index_of_column_by_name( &self, @@ -329,21 +334,18 @@ impl DFSchema { name: &str, ) -> Result> { let mut matches = self - .fields .iter() .enumerate() - .filter(|(_, field)| match (qualifier, &field.qualifier) { + .filter(|(i, (q, f))| match (qualifier, q) { // field to lookup is qualified. // current field is qualified and not shared between relations, compare both // qualifier and name. - (Some(q), Some(field_q)) => { - q.resolved_eq(field_q) && field.name() == name - } + (Some(q), Some(field_q)) => q.resolved_eq(field_q) && f.name() == name, // field to lookup is qualified but current field is unqualified. (Some(qq), None) => { // the original field may now be aliased with a name that matches the // original qualified name - let column = Column::from_qualified_name(field.name()); + let column = Column::from_qualified_name(f.name()); match column { Column { relation: Some(r), @@ -353,7 +355,7 @@ impl DFSchema { } } // field to lookup is unqualified, no need to compare qualifier - (None, Some(_)) | (None, None) => field.name() == name, + (None, Some(_)) | (None, None) => f.name() == name, }) .map(|(idx, _)| idx); Ok(matches.next()) @@ -386,7 +388,12 @@ impl DFSchema { /// Find all fields having the given qualifier pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&Field> { - self.iter().filter(|(q, f)| q == qualifier).collect() + let fields = self + .iter() + .filter(|(q, f)| q.map(|q| q == qualifier).unwrap_or(false)) + .map(|(_, f)| f.into()) + .collect(); + fields } /// Find all fields indices having the given qualifier @@ -446,10 +453,8 @@ impl DFSchema { qualifier: &TableReference, name: &str, ) -> bool { - self.fields().iter().any(|field| { - field.qualifier().map(|q| q.eq(qualifier)).unwrap_or(false) - && field.name() == name - }) + self.iter() + .any(|(q, f)| q.map(|q| q == qualifier).unwrap_or(false) && f.name() == name) } /// Find if the field exists with the given qualified column @@ -631,21 +636,29 @@ impl DFSchema { /// Strip all field qualifier in schema pub fn strip_qualifiers(self) -> Self { - self.field_qualifiers = vec![None; self.inner.fields.len()]; - self + let stripped_schema = DFSchema { + inner: self.inner.clone(), + field_qualifiers: vec![None; self.inner.fields.len()], + functional_dependencies: self.functional_dependencies.clone(), + }; + stripped_schema } /// Replace all field qualifier with new value in schema pub fn replace_qualifier(self, qualifier: impl Into) -> Self { let qualifier = qualifier.into(); - self.field_qualifiers = vec![qualifier; self.inner.fields().len()]; - self + let replaced_schema = DFSchema { + inner: self.inner.clone(), + field_qualifiers: vec![Some(qualifier); self.inner.fields.len()], + functional_dependencies: self.functional_dependencies.clone(), + }; + replaced_schema } /// Get list of fully-qualified field names in this schema pub fn field_names(&self) -> Vec { self.iter() - .map(|(qualifier, field)| qualified_name(&qualifier, field)) + .map(|(qualifier, field)| qualified_name(qualifier, field.name())) .collect::>() } @@ -672,16 +685,16 @@ impl DFSchema { impl From for Schema { /// Convert DFSchema into a Schema fn from(df_schema: DFSchema) -> Self { - let fields: Fields = df_schema.fields.into_iter().map(|f| f.field).collect(); - Schema::new_with_metadata(fields, df_schema.metadata) + let fields: Fields = df_schema.inner.fields.clone(); 
+ Schema::new_with_metadata(fields, df_schema.inner.metadata) } } impl From<&DFSchema> for Schema { /// Convert DFSchema reference into a Schema fn from(df_schema: &DFSchema) -> Self { - let fields: Fields = df_schema.fields.iter().map(|f| f.field.clone()).collect(); - Schema::new_with_metadata(fields, df_schema.metadata.clone()) + let fields: Fields = df_schema.inner.fields.clone(); + Schema::new_with_metadata(fields, df_schema.inner.metadata.clone()) } } @@ -690,8 +703,8 @@ impl TryFrom for DFSchema { type Error = DataFusionError; fn try_from(schema: Schema) -> Result { let dfschema = Self { - inner: schema, - field_qualifiers: vec![None, schema.fields.len()], + inner: schema.into(), + field_qualifiers: vec![None; schema.fields.len()], functional_dependencies: FunctionalDependencies::empty(), }; Ok(dfschema) @@ -763,10 +776,8 @@ impl Display for DFSchema { write!( f, "fields:[{}], metadata:{:?}", - self.inner - .fields - .iter() - .map(|field| field.qualified_name()) + self.iter() + .map(|(q, f)| qualified_name(q, f.name())) .collect::>() .join(", "), self.inner.metadata @@ -871,7 +882,7 @@ impl SchemaExt for Schema { } } -fn qualified_name(qualifier: &Option, name: &str) -> String { +fn qualified_name(qualifier: Option<&TableReference>, name: &str) -> String { match qualifier { Some(q) => format!("{}.{}", q, name), None => name.to_string(), From 23c66347bd86be0dfe30ed64d1ef0b0f5e0117ec Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Thu, 18 Jan 2024 10:00:23 -0500 Subject: [PATCH 05/24] More updates --- datafusion/common/src/dfschema.rs | 62 ++++++++++++++++++++----------- 1 file changed, 40 insertions(+), 22 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 89ccb02a95ae..3dd002c51262 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -388,10 +388,10 @@ impl DFSchema { /// Find all fields having the given qualifier pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&Field> { - let fields = self + let fields: Vec<&Field> = self .iter() .filter(|(q, f)| q.map(|q| q == qualifier).unwrap_or(false)) - .map(|(_, f)| f.into()) + .map(|(_, f)| f.as_ref()) .collect(); fields } @@ -403,22 +403,37 @@ impl DFSchema { ) -> Vec { self.iter() .enumerate() - .filter(|(idx, (q, field))| qualifier == q) + .filter_map(|(idx, (q, _))| { + let qualifier_matches = match q { + Some(q) => *q == *qualifier, + None => false, + }; + match qualifier_matches { + true => Some(idx), + false => None, + } + }) .collect() } /// Find all fields match the given name pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> { - self.iter().filter(|field| field.name() == name).collect() + self.iter() + .filter(|(_, field)| field.name() == name) + .map(|(_, f)| f.as_ref()) + .collect() } /// Find the field with the given name pub fn field_with_unqualified_name(&self, name: &str) -> Result<&Field> { - let field = self - .iter() - .filter(|(q, f)| name == f.name()) - .map(|(q, f)| f); - Ok(field) + let field = self.iter().find(|(q, f)| match q { + Some(q) => false, + None => name == f.name(), + }); + match field { + Some((_, f)) => Ok(f), + None => Err(DataFusionError::Internal("Field not found".to_string())), + } } /// Find the field with the given qualified name @@ -427,11 +442,14 @@ impl DFSchema { qualifier: &TableReference, name: &str, ) -> Result<&Field> { - let field = self - .iter() - .find(|(q, f)| q == qualifier && name == f.name()) - .map(|(q, f)| f); - Ok(field) + let qualifier_and_field = 
self.iter().find(|(q, f)| match q { + Some(q) => *q == qualifier && name == f.name(), + None => false, + }); + match qualifier_and_field { + Some((q, f)) => Ok(f), + None => Err(DataFusionError::Internal("Field not found".to_string())), + } } /// Find the field with the given qualified column @@ -506,10 +524,10 @@ impl DFSchema { if self.fields().len() != other.fields().len() { return false; } - let self_fields = self.fields().iter(); - let other_fields = other.fields().iter(); - self_fields.zip(other_fields).all(|(f1, f2)| { - f1.qualifier() == f2.qualifier() + let self_fields = self.iter(); + let other_fields = other.iter(); + self_fields.zip(other_fields).all(|((q1, f1), (q2, f2))| { + q1 == q2 && f1.name() == f2.name() && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type()) }) @@ -528,10 +546,10 @@ impl DFSchema { if self.fields().len() != other.fields().len() { return false; } - let self_fields = self.fields().iter(); - let other_fields = other.fields().iter(); - self_fields.zip(other_fields).all(|(f1, f2)| { - f1.qualifier() == f2.qualifier() + let self_fields = self.iter(); + let other_fields = other.iter(); + self_fields.zip(other_fields).all(|((q1, f1), (q2, f2))| { + q1 == q2 && f1.name() == f2.name() && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type()) }) From dd8823342466648b38efdc56124a64b31970abf7 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Fri, 19 Jan 2024 10:17:55 -0500 Subject: [PATCH 06/24] Start working on columns --- datafusion/common/src/column.rs | 10 ++++------ datafusion/common/src/dfschema.rs | 10 ++++++++++ 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/datafusion/common/src/column.rs b/datafusion/common/src/column.rs index f0edc7175948..1ba81ff45041 100644 --- a/datafusion/common/src/column.rs +++ b/datafusion/common/src/column.rs @@ -178,11 +178,11 @@ impl Column { } for schema in schemas { - let fields = schema.fields_with_unqualified_name(&self.name); + let fields = schema.qualified_fields_with_unqualified_name(&self.name); match fields.len() { 0 => continue, 1 => { - return Ok(fields[0].qualified_column()); + return Ok(fields[0].into()); } _ => { // More than 1 fields in this schema have their names set to self.name. @@ -199,13 +199,11 @@ impl Column { // Compare matched fields with one USING JOIN clause at a time for using_col in using_columns { - let all_matched = fields - .iter() - .all(|f| using_col.contains(&f.qualified_column())); + let all_matched = fields.iter().all(|f| using_col.contains(f)); // All matched fields belong to the same using column set, in orther words // the same join clause. We simply pick the qualifer from the first match. if all_matched { - return Ok(fields[0].qualified_column()); + return Ok(fields[0].into()); } } } diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 3dd002c51262..8d174c6e4f28 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -424,6 +424,16 @@ impl DFSchema { .collect() } + /// Find all fields with the given name and return their fully qualified name. + /// This was added after making DFSchema wrap SchemaRef to facilitate the transition + /// for `Column`. TODO: Or maybe just make a columns_with_unqualified_name method? 
+ pub fn qualified_fields_with_unqualified_name(&self, name: &str) -> Vec { + self.iter() + .filter(|(_, field)| field.name() == name) + .map(|(q, f)| qualified_name(q, f.name())) + .collect() + } + /// Find the field with the given name pub fn field_with_unqualified_name(&self, name: &str) -> Result<&Field> { let field = self.iter().find(|(q, f)| match q { From ee2bd43ebe3770d04517b82ad2d691859af46b80 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Sat, 20 Jan 2024 12:20:01 -0500 Subject: [PATCH 07/24] Start cleaning up columns --- datafusion/common/src/column.rs | 32 ++++++++++++++----------------- datafusion/common/src/dfschema.rs | 19 +++++++++++++++--- 2 files changed, 30 insertions(+), 21 deletions(-) diff --git a/datafusion/common/src/column.rs b/datafusion/common/src/column.rs index 1ba81ff45041..4d10ea71087a 100644 --- a/datafusion/common/src/column.rs +++ b/datafusion/common/src/column.rs @@ -178,11 +178,11 @@ impl Column { } for schema in schemas { - let fields = schema.qualified_fields_with_unqualified_name(&self.name); - match fields.len() { + let columns = schema.columns_with_unqualified_name(&self.name); + match columns.len() { 0 => continue, 1 => { - return Ok(fields[0].into()); + return Ok(columns[0].into()); } _ => { // More than 1 fields in this schema have their names set to self.name. @@ -199,11 +199,11 @@ impl Column { // Compare matched fields with one USING JOIN clause at a time for using_col in using_columns { - let all_matched = fields.iter().all(|f| using_col.contains(f)); + let all_matched = columns.iter().all(|f| using_col.contains(f)); // All matched fields belong to the same using column set, in orther words // the same join clause. We simply pick the qualifer from the first match. if all_matched { - return Ok(fields[0].into()); + return Ok(columns[0].into()); } } } @@ -212,10 +212,7 @@ impl Column { _schema_err!(SchemaError::FieldNotFound { field: Box::new(Column::new(self.relation.clone(), self.name)), - valid_fields: schemas - .iter() - .flat_map(|s| s.fields().iter().map(|f| f.qualified_column())) - .collect(), + valid_fields: schemas.iter().flat_map(|s| s.columns()).collect(), }) } @@ -265,13 +262,14 @@ impl Column { } for schema_level in schemas { - let fields = schema_level + let columns = schema_level .iter() - .flat_map(|s| s.fields_with_unqualified_name(&self.name)) + .flat_map(|s| s.columns_with_unqualified_name(&self.name)) .collect::>(); - match fields.len() { + match columns.len() { 0 => continue, - 1 => return Ok(fields[0].qualified_column()), + 1 => return Ok(columns[0]), + _ => { // More than 1 fields in this schema have their names set to self.name. // @@ -287,13 +285,11 @@ impl Column { // Compare matched fields with one USING JOIN clause at a time for using_col in using_columns { - let all_matched = fields - .iter() - .all(|f| using_col.contains(&f.qualified_column())); + let all_matched = columns.iter().all(|c| using_col.contains(c)); // All matched fields belong to the same using column set, in orther words // the same join clause. We simply pick the qualifer from the first match. 
if all_matched { - return Ok(fields[0].qualified_column()); + return Ok(columns[0]); } } @@ -310,7 +306,7 @@ impl Column { valid_fields: schemas .iter() .flat_map(|s| s.iter()) - .flat_map(|s| s.fields().iter().map(|f| f.qualified_column())) + .flat_map(|s| s.columns()) .collect(), }) } diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 8d174c6e4f28..0c92b4b1f304 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -18,7 +18,7 @@ //! DFSchema is an extended schema struct that DataFusion uses to provide support for //! fields with optional relation names. -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::convert::TryFrom; use std::fmt::{Display, Formatter}; use std::hash::Hash; @@ -416,7 +416,7 @@ impl DFSchema { .collect() } - /// Find all fields match the given name + /// Find all fields that match the given name pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> { self.iter() .filter(|(_, field)| field.name() == name) @@ -424,10 +424,23 @@ impl DFSchema { .collect() } + /// Find all fields that match the given name and convert to column + pub fn columns_with_unqualified_name(&self, name: &str) -> Vec { + self.iter() + .filter(|(_, field)| field.name() == name) + .map(|(_, f)| Column::from_name(f.name())) + .collect() + } + + /// Return all `Column`s for the schema + pub fn columns(&self) -> Vec { + self.iter().map(|(q, f)| Column::new(q, f.name())).collect() + } + /// Find all fields with the given name and return their fully qualified name. /// This was added after making DFSchema wrap SchemaRef to facilitate the transition /// for `Column`. TODO: Or maybe just make a columns_with_unqualified_name method? - pub fn qualified_fields_with_unqualified_name(&self, name: &str) -> Vec { + pub fn columns_with_unqualified_name(&self, name: &str) -> Vec { self.iter() .filter(|(_, field)| field.name() == name) .map(|(q, f)| qualified_name(q, f.name())) From a10a85419b55354b4bd81b740e82180a99119d2c Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Thu, 25 Jan 2024 16:36:56 -0500 Subject: [PATCH 08/24] Remove DFField from dfschema tests --- datafusion/common/src/dfschema.rs | 315 ++---------------- datafusion/common/src/error.rs | 12 +- .../common/src/functional_dependencies.rs | 26 +- 3 files changed, 41 insertions(+), 312 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 0c92b4b1f304..292ee3d9c70f 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -251,6 +251,11 @@ impl DFSchema { .chain(schema.iter()) .map(|(qualifier, field)| (qualifier.as_ref().clone(), field.clone())) .unzip(); + // let (new_field_qualifiers, new_fields) = self + // .iter() + // .chain(schema.iter()) + // .map(|(qualifier, field)| (qualifier.as_ref().clone(), field.clone())) + // .unzip(); let mut new_metadata = self.inner.metadata.clone(); new_metadata.extend(schema.inner.metadata.clone()); @@ -440,12 +445,12 @@ impl DFSchema { /// Find all fields with the given name and return their fully qualified name. /// This was added after making DFSchema wrap SchemaRef to facilitate the transition /// for `Column`. TODO: Or maybe just make a columns_with_unqualified_name method? 
- pub fn columns_with_unqualified_name(&self, name: &str) -> Vec { - self.iter() - .filter(|(_, field)| field.name() == name) - .map(|(q, f)| qualified_name(q, f.name())) - .collect() - } + // pub fn columns_with_unqualified_name(&self, name: &str) -> Vec { + // self.iter() + // .filter(|(_, field)| field.name() == name) + // .map(|(q, f)| qualified_name(q, f.name())) + // .collect() + // } /// Find the field with the given name pub fn field_with_unqualified_name(&self, name: &str) -> Result<&Field> { @@ -970,22 +975,6 @@ mod tests { Ok(()) } - #[test] - fn from_unqualified_field() { - let field = Field::new("c0", DataType::Boolean, true); - let field = DFField::from(field); - assert_eq!("c0", field.name()); - assert_eq!("c0", field.qualified_name()); - } - - #[test] - fn from_qualified_field() { - let field = Field::new("c0", DataType::Boolean, true); - let field = DFField::from_qualified("t1", field); - assert_eq!("c0", field.name()); - assert_eq!("t1.c0", field.qualified_name()); - } - #[test] fn from_unqualified_schema() -> Result<()> { let schema = DFSchema::try_from(test_schema_1())?; @@ -1111,9 +1100,14 @@ mod tests { .to_string(), expected_help ); - assert_contains!(schema.index_of("y").unwrap_err().to_string(), expected_help); + let y_col = Column::new_unqualified("y"); + assert_contains!( + schema.index_of_column(&y_col).unwrap_err().to_string(), + expected_help + ); + let c0_column = Column::new(Some("t1"), "c0"); assert_contains!( - schema.index_of("t1.c0").unwrap_err().to_string(), + schema.index_of_column(&c0_column).unwrap_err().to_string(), expected_err_msg ); Ok(()) @@ -1133,252 +1127,6 @@ mod tests { assert_eq!(err.strip_backtrace(), "Schema error: No field named c0."); } - #[test] - fn equivalent_names_and_types() { - let arrow_field1 = Field::new("f1", DataType::Int16, true); - let arrow_field1_meta = arrow_field1.clone().with_metadata(test_metadata_n(2)); - - let field1_i16_t = DFField::from(arrow_field1); - let field1_i16_t_meta = DFField::from(arrow_field1_meta); - let field1_i16_t_qualified = - DFField::from_qualified("foo", field1_i16_t.field().clone()); - let field1_i16_f = DFField::from(Field::new("f1", DataType::Int16, false)); - let field1_i32_t = DFField::from(Field::new("f1", DataType::Int32, true)); - let field2_i16_t = DFField::from(Field::new("f2", DataType::Int16, true)); - let field3_i16_t = DFField::from(Field::new("f3", DataType::Int16, true)); - - let dict = - DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); - let field_dict_t = DFField::from(Field::new("f_dict", dict.clone(), true)); - let field_dict_f = DFField::from(Field::new("f_dict", dict, false)); - - let list_t = DFField::from(Field::new_list( - "f_list", - field1_i16_t.field().clone(), - true, - )); - let list_f = DFField::from(Field::new_list( - "f_list", - field1_i16_f.field().clone(), - false, - )); - - let list_f_name = DFField::from(Field::new_list( - "f_list", - field2_i16_t.field().clone(), - false, - )); - - let struct_t = DFField::from(Field::new_struct( - "f_struct", - vec![field1_i16_t.field().clone()], - true, - )); - let struct_f = DFField::from(Field::new_struct( - "f_struct", - vec![field1_i16_f.field().clone()], - false, - )); - - let struct_f_meta = DFField::from(Field::new_struct( - "f_struct", - vec![field1_i16_t_meta.field().clone()], - false, - )); - - let struct_f_type = DFField::from(Field::new_struct( - "f_struct", - vec![field1_i32_t.field().clone()], - false, - )); - - // same - TestCase { - fields1: vec![&field1_i16_t], - fields2: 
vec![&field1_i16_t], - expected_dfschema: true, - expected_arrow: true, - } - .run(); - - // same but metadata is different, should still be true - TestCase { - fields1: vec![&field1_i16_t_meta], - fields2: vec![&field1_i16_t], - expected_dfschema: true, - expected_arrow: true, - } - .run(); - - // different name - TestCase { - fields1: vec![&field1_i16_t], - fields2: vec![&field2_i16_t], - expected_dfschema: false, - expected_arrow: false, - } - .run(); - - // different type - TestCase { - fields1: vec![&field1_i16_t], - fields2: vec![&field1_i32_t], - expected_dfschema: false, - expected_arrow: false, - } - .run(); - - // different nullability - TestCase { - fields1: vec![&field1_i16_t], - fields2: vec![&field1_i16_f], - expected_dfschema: true, - expected_arrow: true, - } - .run(); - - // different qualifier - TestCase { - fields1: vec![&field1_i16_t], - fields2: vec![&field1_i16_t_qualified], - expected_dfschema: false, - expected_arrow: true, - } - .run(); - - // different name after first - TestCase { - fields1: vec![&field2_i16_t, &field1_i16_t], - fields2: vec![&field2_i16_t, &field3_i16_t], - expected_dfschema: false, - expected_arrow: false, - } - .run(); - - // different number - TestCase { - fields1: vec![&field1_i16_t, &field2_i16_t], - fields2: vec![&field1_i16_t], - expected_dfschema: false, - expected_arrow: false, - } - .run(); - - // dictionary - TestCase { - fields1: vec![&field_dict_t], - fields2: vec![&field_dict_t], - expected_dfschema: true, - expected_arrow: true, - } - .run(); - - // dictionary (different nullable) - TestCase { - fields1: vec![&field_dict_t], - fields2: vec![&field_dict_f], - expected_dfschema: true, - expected_arrow: true, - } - .run(); - - // dictionary (wrong type) - TestCase { - fields1: vec![&field_dict_t], - fields2: vec![&field1_i16_t], - expected_dfschema: false, - expected_arrow: false, - } - .run(); - - // list (different embedded nullability) - TestCase { - fields1: vec![&list_t], - fields2: vec![&list_f], - expected_dfschema: true, - expected_arrow: true, - } - .run(); - - // list (different sub field names) - TestCase { - fields1: vec![&list_t], - fields2: vec![&list_f_name], - expected_dfschema: false, - expected_arrow: false, - } - .run(); - - // struct - TestCase { - fields1: vec![&struct_t], - fields2: vec![&struct_f], - expected_dfschema: true, - expected_arrow: true, - } - .run(); - - // struct (different embedded meta) - TestCase { - fields1: vec![&struct_t], - fields2: vec![&struct_f_meta], - expected_dfschema: true, - expected_arrow: true, - } - .run(); - - // struct (different field type) - TestCase { - fields1: vec![&struct_t], - fields2: vec![&struct_f_type], - expected_dfschema: false, - expected_arrow: false, - } - .run(); - - #[derive(Debug)] - struct TestCase<'a> { - fields1: Vec<&'a DFField>, - fields2: Vec<&'a DFField>, - expected_dfschema: bool, - expected_arrow: bool, - } - - impl<'a> TestCase<'a> { - fn run(self) { - println!("Running {self:#?}"); - let schema1 = to_df_schema(self.fields1); - let schema2 = to_df_schema(self.fields2); - assert_eq!( - schema1.equivalent_names_and_types(&schema2), - self.expected_dfschema, - "Comparison did not match expected: {}\n\n\ - schema1:\n\n{:#?}\n\nschema2:\n\n{:#?}", - self.expected_dfschema, - schema1, - schema2 - ); - - let arrow_schema1 = Schema::from(schema1); - let arrow_schema2 = Schema::from(schema2); - assert_eq!( - arrow_schema1.equivalent_names_and_types(&arrow_schema2), - self.expected_arrow, - "Comparison did not match expected: {}\n\n\ - arrow 
schema1:\n\n{:#?}\n\n arrow schema2:\n\n{:#?}", - self.expected_arrow, - arrow_schema1, - arrow_schema2 - ); - } - } - - fn to_df_schema(fields: Vec<&DFField>) -> DFSchema { - let fields = fields.into_iter().cloned().collect(); - DFSchema::new_with_metadata(fields, HashMap::new()).unwrap() - } - } - #[test] fn into() { // Demonstrate how to convert back and forth between Schema, SchemaRef, DFSchema, and DFSchemaRef @@ -1389,11 +1137,11 @@ mod tests { ); let arrow_schema_ref = Arc::new(arrow_schema.clone()); - let df_schema = DFSchema::new_with_metadata( - vec![DFField::new_unqualified("c0", DataType::Int64, true)], - metadata, - ) - .unwrap(); + let df_schema = DFSchema { + inner: arrow_schema_ref, + field_qualifiers: vec![None; arrow_schema_ref.fields.len()], + functional_dependencies: FunctionalDependencies::empty(), + }; let df_schema_ref = Arc::new(df_schema.clone()); { @@ -1433,16 +1181,15 @@ mod tests { b_metadata.insert("key".to_string(), "value".to_string()); let b_field = Field::new("b", DataType::Int64, false).with_metadata(b_metadata); - let a: DFField = DFField::from_qualified("table1", a_field); - let b: DFField = DFField::from_qualified("table1", b_field); + let schema = Arc::new(Schema::new(vec![a_field, b_field])); - let df_schema = Arc::new( - DFSchema::new_with_metadata([a, b].to_vec(), HashMap::new()).unwrap(), - ); - let schema: Schema = df_schema.as_ref().clone().into(); - let a_df = df_schema.fields.first().unwrap().field(); - let a_arrow = schema.fields.first().unwrap(); - assert_eq!(a_df.metadata(), a_arrow.metadata()) + let df_schema = DFSchema { + inner: schema, + field_qualifiers: vec![None; schema.fields.len()], + functional_dependencies: FunctionalDependencies::empty(), + }; + + assert_eq!(df_schema.inner.metadata(), schema.metadata()) } #[test] diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index 331f5910d7e5..1dad3da7bcaa 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -596,11 +596,7 @@ pub fn field_not_found>( ) -> DataFusionError { schema_datafusion_err!(SchemaError::FieldNotFound { field: Box::new(Column::new(qualifier, name)), - valid_fields: schema - .fields() - .iter() - .map(|f| f.qualified_column()) - .collect(), + valid_fields: schema.columns().iter().map(|c| c.clone()).collect(), }) } @@ -608,11 +604,7 @@ pub fn field_not_found>( pub fn unqualified_field_not_found(name: &str, schema: &DFSchema) -> DataFusionError { schema_datafusion_err!(SchemaError::FieldNotFound { field: Box::new(Column::new_unqualified(name)), - valid_fields: schema - .fields() - .iter() - .map(|f| f.qualified_column()) - .collect(), + valid_fields: schema.columns().iter().map(|c| c.clone()).collect(), }) } diff --git a/datafusion/common/src/functional_dependencies.rs b/datafusion/common/src/functional_dependencies.rs index 1cb1751d713e..5ff5e9b87c6a 100644 --- a/datafusion/common/src/functional_dependencies.rs +++ b/datafusion/common/src/functional_dependencies.rs @@ -78,11 +78,9 @@ impl Constraints { .iter() .map(|pk| { let idx = df_schema - .fields() + .field_names() .iter() - .position(|item| { - item.qualified_name() == pk.value.clone() - }) + .position(|item| item == &pk.value.clone()) .ok_or_else(|| { DataFusionError::Execution( "Primary key doesn't exist".to_string(), @@ -452,7 +450,7 @@ pub fn aggregate_functional_dependencies( aggr_schema: &DFSchema, ) -> FunctionalDependencies { let mut aggregate_func_dependencies = vec![]; - let aggr_input_fields = aggr_input_schema.fields(); + let aggr_input_fields = 
aggr_input_schema.field_names(); let aggr_fields = aggr_schema.fields(); // Association covers the whole table: let target_indices = (0..aggr_schema.fields().len()).collect::>(); @@ -470,7 +468,7 @@ pub fn aggregate_functional_dependencies( let mut new_source_field_names = vec![]; let source_field_names = source_indices .iter() - .map(|&idx| aggr_input_fields[idx].qualified_name()) + .map(|&idx| aggr_input_fields[idx]) .collect::>(); for (idx, group_by_expr_name) in group_by_expr_names.iter().enumerate() { @@ -538,11 +536,7 @@ pub fn get_target_functional_dependencies( ) -> Option> { let mut combined_target_indices = HashSet::new(); let dependencies = schema.functional_dependencies(); - let field_names = schema - .fields() - .iter() - .map(|item| item.qualified_name()) - .collect::>(); + let field_names = schema.field_names().iter().collect::>(); for FunctionalDependence { source_indices, target_indices, @@ -577,17 +571,13 @@ pub fn get_required_group_by_exprs_indices( group_by_expr_names: &[String], ) -> Option> { let dependencies = schema.functional_dependencies(); - let field_names = schema - .fields() - .iter() - .map(|item| item.qualified_name()) - .collect::>(); + let field_names = schema.field_names().iter().collect::>(); let mut groupby_expr_indices = group_by_expr_names .iter() .map(|group_by_expr_name| { field_names .iter() - .position(|field_name| field_name == group_by_expr_name) + .position(|field_name| *field_name == group_by_expr_name) }) .collect::>>()?; @@ -615,7 +605,7 @@ pub fn get_required_group_by_exprs_indices( .map(|idx| { group_by_expr_names .iter() - .position(|name| &field_names[*idx] == name) + .position(|name| field_names[*idx] == name) }) .collect() } From 25b4e4295b7d857e240914cc2c24844dbd777ebd Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Mon, 29 Jan 2024 11:53:09 -0500 Subject: [PATCH 09/24] More cleanup --- datafusion/common/src/dfschema.rs | 55 +++++++++++-------- .../common/src/functional_dependencies.rs | 10 ++-- 2 files changed, 37 insertions(+), 28 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 292ee3d9c70f..d477bc4d16e7 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -246,25 +246,40 @@ impl DFSchema { /// Create a new schema that contains the fields from this schema followed by the fields /// from the supplied schema. An error will be returned if there are duplicate field names. 
pub fn join(&self, schema: &DFSchema) -> Result { - let (new_field_qualifiers, new_fields) = self - .iter() - .chain(schema.iter()) - .map(|(qualifier, field)| (qualifier.as_ref().clone(), field.clone())) - .unzip(); // let (new_field_qualifiers, new_fields) = self // .iter() // .chain(schema.iter()) // .map(|(qualifier, field)| (qualifier.as_ref().clone(), field.clone())) // .unzip(); + // let (new_field_qualifiers, new_fields) = self + // .iter() + // .chain(schema.iter()) + // .map(|(qualifier, field)| (qualifier.as_ref().clone(), field.clone())) + // .unzip(); + + let fields = self.inner.fields().clone(); + let mut schema_builder = SchemaBuilder::new(); + schema_builder.extend(fields.iter().map(|f| f.clone())); + + let other_fields = schema.inner.fields.clone(); + schema_builder.extend(other_fields.iter().map(|f| f.clone())); + let new_schema = schema_builder.finish(); let mut new_metadata = self.inner.metadata.clone(); new_metadata.extend(schema.inner.metadata.clone()); + + let new_schema_with_metadata = new_schema.with_metadata(new_metadata); + + let mut new_qualifiers = self.field_qualifiers.clone(); + new_qualifiers.extend_from_slice(schema.field_qualifiers.as_slice()); + + let combined_schema = schema_builder.finish(); self.functional_dependencies .extend(schema.functional_dependencies); let new_self = Self { - inner: Arc::new(Schema::new_with_metadata(new_fields, new_metadata)), - field_qualifiers: new_field_qualifiers, + inner: Arc::new(new_schema_with_metadata), + field_qualifiers: new_qualifiers, functional_dependencies: self.functional_dependencies.clone(), }; Ok(new_self) @@ -439,23 +454,15 @@ impl DFSchema { /// Return all `Column`s for the schema pub fn columns(&self) -> Vec { - self.iter().map(|(q, f)| Column::new(q, f.name())).collect() + self.iter() + .map(|(q, f)| Column::new(q.map(|q| q.clone()), f.name().clone())) + .collect() } - /// Find all fields with the given name and return their fully qualified name. - /// This was added after making DFSchema wrap SchemaRef to facilitate the transition - /// for `Column`. TODO: Or maybe just make a columns_with_unqualified_name method? 
- // pub fn columns_with_unqualified_name(&self, name: &str) -> Vec { - // self.iter() - // .filter(|(_, field)| field.name() == name) - // .map(|(q, f)| qualified_name(q, f.name())) - // .collect() - // } - /// Find the field with the given name pub fn field_with_unqualified_name(&self, name: &str) -> Result<&Field> { let field = self.iter().find(|(q, f)| match q { - Some(q) => false, + Some(_) => false, None => name == f.name(), }); match field { @@ -475,7 +482,7 @@ impl DFSchema { None => false, }); match qualifier_and_field { - Some((q, f)) => Ok(f), + Some((_, f)) => Ok(f), None => Err(DataFusionError::Internal("Field not found".to_string())), } } @@ -732,7 +739,7 @@ impl From for Schema { /// Convert DFSchema into a Schema fn from(df_schema: DFSchema) -> Self { let fields: Fields = df_schema.inner.fields.clone(); - Schema::new_with_metadata(fields, df_schema.inner.metadata) + Schema::new_with_metadata(fields, df_schema.inner.metadata.clone()) } } @@ -748,9 +755,10 @@ impl From<&DFSchema> for Schema { impl TryFrom for DFSchema { type Error = DataFusionError; fn try_from(schema: Schema) -> Result { + let field_count = schema.fields.len(); let dfschema = Self { inner: schema.into(), - field_qualifiers: vec![None; schema.fields.len()], + field_qualifiers: vec![None; field_count], functional_dependencies: FunctionalDependencies::empty(), }; Ok(dfschema) @@ -804,13 +812,14 @@ impl ToDFSchema for SchemaRef { impl ToDFSchema for Vec { fn to_dfschema(self) -> Result { + let field_count = self.len(); let schema = Schema { fields: self.into(), metadata: HashMap::new(), }; let dfschema = DFSchema { inner: schema.into(), - field_qualifiers: vec![None; self.len()], + field_qualifiers: vec![None; field_count], functional_dependencies: FunctionalDependencies::empty(), }; Ok(dfschema) diff --git a/datafusion/common/src/functional_dependencies.rs b/datafusion/common/src/functional_dependencies.rs index 5ff5e9b87c6a..27b8a80c5793 100644 --- a/datafusion/common/src/functional_dependencies.rs +++ b/datafusion/common/src/functional_dependencies.rs @@ -468,7 +468,7 @@ pub fn aggregate_functional_dependencies( let mut new_source_field_names = vec![]; let source_field_names = source_indices .iter() - .map(|&idx| aggr_input_fields[idx]) + .map(|&idx| aggr_input_fields[idx].clone()) .collect::>(); for (idx, group_by_expr_name) in group_by_expr_names.iter().enumerate() { @@ -536,7 +536,7 @@ pub fn get_target_functional_dependencies( ) -> Option> { let mut combined_target_indices = HashSet::new(); let dependencies = schema.functional_dependencies(); - let field_names = schema.field_names().iter().collect::>(); + let field_names = schema.field_names(); for FunctionalDependence { source_indices, target_indices, @@ -571,13 +571,13 @@ pub fn get_required_group_by_exprs_indices( group_by_expr_names: &[String], ) -> Option> { let dependencies = schema.functional_dependencies(); - let field_names = schema.field_names().iter().collect::>(); + let field_names = schema.field_names(); let mut groupby_expr_indices = group_by_expr_names .iter() .map(|group_by_expr_name| { field_names .iter() - .position(|field_name| *field_name == group_by_expr_name) + .position(|field_name| field_name == group_by_expr_name) }) .collect::>>()?; @@ -605,7 +605,7 @@ pub fn get_required_group_by_exprs_indices( .map(|idx| { group_by_expr_names .iter() - .position(|name| field_names[*idx] == name) + .position(|name| &field_names[*idx] == name) }) .collect() } From 80f1181fcc9313e5cf4f81831c02f3a16c33f155 Mon Sep 17 00:00:00 2001 From: Matthew 
Turner Date: Tue, 30 Jan 2024 09:27:40 -0500 Subject: [PATCH 10/24] datafusion common is building --- datafusion/common/src/column.rs | 44 ++++++++++++------------------- datafusion/common/src/dfschema.rs | 35 ++++++++++++------------ 2 files changed, 34 insertions(+), 45 deletions(-) diff --git a/datafusion/common/src/column.rs b/datafusion/common/src/column.rs index 4d10ea71087a..964f61d97f32 100644 --- a/datafusion/common/src/column.rs +++ b/datafusion/common/src/column.rs @@ -182,7 +182,7 @@ impl Column { match columns.len() { 0 => continue, 1 => { - return Ok(columns[0].into()); + return Ok(columns[0].clone().into()); } _ => { // More than 1 fields in this schema have their names set to self.name. @@ -203,7 +203,7 @@ impl Column { // All matched fields belong to the same using column set, in orther words // the same join clause. We simply pick the qualifer from the first match. if all_matched { - return Ok(columns[0].into()); + return Ok(columns[0].clone().into()); } } } @@ -268,7 +268,7 @@ impl Column { .collect::>(); match columns.len() { 0 => continue, - 1 => return Ok(columns[0]), + 1 => return Ok(columns[0].clone()), _ => { // More than 1 fields in this schema have their names set to self.name. @@ -289,7 +289,7 @@ impl Column { // All matched fields belong to the same using column set, in orther words // the same join clause. We simply pick the qualifer from the first match. if all_matched { - return Ok(columns[0]); + return Ok(columns[0].clone()); } } @@ -349,36 +349,26 @@ impl fmt::Display for Column { #[cfg(test)] mod tests { use super::*; - use crate::DFField; use arrow::datatypes::DataType; + use arrow_schema::{Field, SchemaBuilder}; use std::collections::HashMap; - fn create_schema(names: &[(Option<&str>, &str)]) -> Result { - let fields = names - .iter() - .map(|(qualifier, name)| { - DFField::new( - qualifier.to_owned().map(|s| s.to_string()), - name, - DataType::Boolean, - true, - ) - }) - .collect::>(); - DFSchema::new_with_metadata(fields, HashMap::new()) + fn create_qualified_schema(qualifier: &str, names: &[&str]) -> Result { + let mut schema_builder = SchemaBuilder::new(); + schema_builder.extend( + names + .iter() + .map(|f| Field::new(f.clone(), DataType::Boolean, true)), + ); + let schema = Arc::new(schema_builder.finish()); + DFSchema::try_from_qualified_schema(qualifier, &schema) } #[test] fn test_normalize_with_schemas_and_ambiguity_check() -> Result<()> { - let schema1 = create_schema(&[(Some("t1"), "a"), (Some("t1"), "b")])?; - let schema2 = create_schema(&[(Some("t2"), "c"), (Some("t2"), "d")])?; - let schema3 = create_schema(&[ - (Some("t3"), "a"), - (Some("t3"), "b"), - (Some("t3"), "c"), - (Some("t3"), "d"), - (Some("t3"), "e"), - ])?; + let schema1 = create_qualified_schema("t1", &["a", "b"])?; + let schema2 = create_qualified_schema("t2", &["c", "d"])?; + let schema3 = create_qualified_schema("t3", &["a", "b", "c", "d", "e"])?; // already normalized let col = Column::new(Some("t1"), "a"); diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index d477bc4d16e7..694eaa8743ec 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -24,10 +24,7 @@ use std::fmt::{Display, Formatter}; use std::hash::Hash; use std::sync::Arc; -use crate::error::{ - unqualified_field_not_found, DataFusionError, Result, SchemaError, _plan_err, - _schema_err, -}; +use crate::error::{DataFusionError, Result, _plan_err}; use crate::{ field_not_found, Column, FunctionalDependencies, OwnedTableReference, TableReference, 
}; @@ -132,9 +129,10 @@ impl DFSchema { schema: &SchemaRef, ) -> Result { let qualifier = qualifier.into(); + let owned_qualifier = qualifier.to_owned_reference(); let new_self = Self { inner: schema.clone(), - field_qualifiers: vec![Some(qualifier.clone()); schema.fields().len()], + field_qualifiers: vec![Some(owned_qualifier); schema.fields().len()], functional_dependencies: FunctionalDependencies::empty(), }; // new_self.check_names()?; @@ -142,7 +140,7 @@ impl DFSchema { } /// Create a `DFSchema` from an Arrow where all fields have no qualifier. - pub fn from_unqualified_schema<'a>(schema: &SchemaRef) -> Result { + pub fn from_unqualified_schema(schema: &SchemaRef) -> Result { let new_self = Self { inner: schema.clone(), field_qualifiers: vec![None; schema.fields.len()], @@ -219,9 +217,10 @@ impl DFSchema { schema: &Schema, ) -> Result { let qualifier = qualifier.into(); + let owned_qualifier = qualifier.to_owned_reference(); let schema = DFSchema { inner: schema.clone().into(), - field_qualifiers: vec![Some(qualifier); schema.fields.len()], + field_qualifiers: vec![Some(owned_qualifier); schema.fields.len()], functional_dependencies: FunctionalDependencies::empty(), }; Ok(schema) @@ -273,14 +272,13 @@ impl DFSchema { let mut new_qualifiers = self.field_qualifiers.clone(); new_qualifiers.extend_from_slice(schema.field_qualifiers.as_slice()); - let combined_schema = schema_builder.finish(); - self.functional_dependencies - .extend(schema.functional_dependencies); + let mut functional_dependencies = self.functional_dependencies.clone(); + functional_dependencies.extend(schema.functional_dependencies.clone()); let new_self = Self { inner: Arc::new(new_schema_with_metadata), field_qualifiers: new_qualifiers, - functional_dependencies: self.functional_dependencies.clone(), + functional_dependencies, }; Ok(new_self) } @@ -291,7 +289,7 @@ impl DFSchema { if other_schema.inner.fields.is_empty() { return; } - let mut schema_builder = SchemaBuilder::from(self.inner.fields); + let mut schema_builder = SchemaBuilder::from(self.inner.fields.clone()); for (qualifier, field) in other_schema.iter() { // skip duplicate columns let duplicated_field = match qualifier { @@ -304,11 +302,12 @@ impl DFSchema { schema_builder.push(field.clone()) } } + let mut metadata = self.inner.metadata.clone(); + metadata.extend(other_schema.inner.metadata.clone()); + let finished = schema_builder.finish(); - self.inner = finished.into(); - self.inner - .metadata - .extend(other_schema.inner.metadata.clone()); + let finished_with_metadata = finished.with_metadata(metadata); + self.inner = finished_with_metadata.into(); } /// Get a list of fields @@ -1147,7 +1146,7 @@ mod tests { let arrow_schema_ref = Arc::new(arrow_schema.clone()); let df_schema = DFSchema { - inner: arrow_schema_ref, + inner: arrow_schema_ref.clone(), field_qualifiers: vec![None; arrow_schema_ref.fields.len()], functional_dependencies: FunctionalDependencies::empty(), }; @@ -1193,7 +1192,7 @@ mod tests { let schema = Arc::new(Schema::new(vec![a_field, b_field])); let df_schema = DFSchema { - inner: schema, + inner: schema.clone(), field_qualifiers: vec![None; schema.fields.len()], functional_dependencies: FunctionalDependencies::empty(), }; From 20694ea9e6068da74eb40256b9738ffcdaeda793 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Wed, 31 Jan 2024 09:30:54 -0500 Subject: [PATCH 11/24] More cleanup --- datafusion/common/src/column.rs | 1 - datafusion/common/src/dfschema.rs | 66 +++++++++++++++++++++---------- 2 files changed, 45 insertions(+), 22 
deletions(-) diff --git a/datafusion/common/src/column.rs b/datafusion/common/src/column.rs index 964f61d97f32..b097205d557c 100644 --- a/datafusion/common/src/column.rs +++ b/datafusion/common/src/column.rs @@ -351,7 +351,6 @@ mod tests { use super::*; use arrow::datatypes::DataType; use arrow_schema::{Field, SchemaBuilder}; - use std::collections::HashMap; fn create_qualified_schema(qualifier: &str, names: &[&str]) -> Result { let mut schema_builder = SchemaBuilder::new(); diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 694eaa8743ec..47704de3b653 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -26,7 +26,8 @@ use std::sync::Arc; use crate::error::{DataFusionError, Result, _plan_err}; use crate::{ - field_not_found, Column, FunctionalDependencies, OwnedTableReference, TableReference, + field_not_found, unqualified_field_not_found, Column, FunctionalDependencies, + OwnedTableReference, TableReference, }; use arrow::compute::can_cast_types; @@ -245,17 +246,6 @@ impl DFSchema { /// Create a new schema that contains the fields from this schema followed by the fields /// from the supplied schema. An error will be returned if there are duplicate field names. pub fn join(&self, schema: &DFSchema) -> Result { - // let (new_field_qualifiers, new_fields) = self - // .iter() - // .chain(schema.iter()) - // .map(|(qualifier, field)| (qualifier.as_ref().clone(), field.clone())) - // .unzip(); - // let (new_field_qualifiers, new_fields) = self - // .iter() - // .chain(schema.iter()) - // .map(|(qualifier, field)| (qualifier.as_ref().clone(), field.clone())) - // .unzip(); - let fields = self.inner.fields().clone(); let mut schema_builder = SchemaBuilder::new(); schema_builder.extend(fields.iter().map(|f| f.clone())); @@ -355,7 +345,7 @@ impl DFSchema { let mut matches = self .iter() .enumerate() - .filter(|(i, (q, f))| match (qualifier, q) { + .filter(|(_, (q, f))| match (qualifier, q) { // field to lookup is qualified. // current field is qualified and not shared between relations, compare both // qualifier and name. 
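The lookup above now filters `(qualifier, field)` pairs from the schema iterator rather than comparing `DFField`s. A hypothetical, simplified helper in the same spirit; it assumes `iter()` yields one `(Option<&TableReference>, field)` pair per column in order, and it skips the shared-between-relations special case handled in the real code:

    use datafusion_common::{DFSchema, TableReference};

    // Position of the first field whose qualifier and name both match exactly.
    fn index_of(
        schema: &DFSchema,
        qualifier: Option<&TableReference>,
        name: &str,
    ) -> Option<usize> {
        schema.iter().position(|(q, f)| {
            let qualifier_matches = match (qualifier, q) {
                (Some(want), Some(have)) => want == have,
                (None, None) => true,
                _ => false,
            };
            qualifier_matches && f.name() == name
        })
    }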
@@ -409,7 +399,7 @@ impl DFSchema { pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&Field> { let fields: Vec<&Field> = self .iter() - .filter(|(q, f)| q.map(|q| q == qualifier).unwrap_or(false)) + .filter(|(q, _)| q.map(|q| q == qualifier).unwrap_or(false)) .map(|(_, f)| f.as_ref()) .collect(); fields @@ -443,6 +433,17 @@ impl DFSchema { .collect() } + /// Find all fields that match the given name and return them with their qualifier + pub fn fields_and_qualifiers_with_unqualified_name( + &self, + name: &str, + ) -> Vec<(Option<&TableReference>, &Field)> { + self.iter() + .filter(|(_, field)| field.name() == name) + .map(|(q, f)| (q, f.as_ref())) + .collect() + } + /// Find all fields that match the given name and convert to column pub fn columns_with_unqualified_name(&self, name: &str) -> Vec { self.iter() @@ -460,14 +461,37 @@ impl DFSchema { /// Find the field with the given name pub fn field_with_unqualified_name(&self, name: &str) -> Result<&Field> { - let field = self.iter().find(|(q, f)| match q { - Some(_) => false, - None => name == f.name(), - }); - match field { - Some((_, f)) => Ok(f), - None => Err(DataFusionError::Internal("Field not found".to_string())), + let matches = self.fields_and_qualifiers_with_unqualified_name(name); + match matches.len() { + 0 => Err(unqualified_field_not_found(name, self)), + 1 => Ok(matches[0].1), + _ => { + let fields_without_qualifier = matches + .iter() + .filter(|(q, _)| q.is_none()) + .collect::>(); + if fields_without_qualifier.len() == 1 { + Ok(fields_without_qualifier[0].1) + } else { + Err(DataFusionError::Internal("Field not found".to_string())) + // _schema_err!(SchemaError::AmbiguousReference { + // field: Column { + // relation: None, + // name: name.to_string(), + // }, + // }) + } + } } + // let field = self.iter().find(|(_, f)| name == f.name()); + // let field = self.iter().find(|(q, f)| match q { + // Some(_) => false, + // None => name == f.name(), + // }); + // match field { + // Some((_, f)) => Ok(f), + // None => Err(DataFusionError::Internal("Field not found".to_string())), + // } } /// Find the field with the given qualified name From cba88e766e641aeb3a789af097acc93692a35bd9 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Thu, 1 Feb 2024 10:08:53 -0500 Subject: [PATCH 12/24] Start updating expr --- datafusion/common/src/dfschema.rs | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 47704de3b653..52c9e15a4767 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -140,6 +140,31 @@ impl DFSchema { Ok(new_self) } + // TODO ADD TESTS FOR THIS NEW FUNCTION + /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier + pub fn from_field_specific_qualified_schema<'a>( + qualifiers: Vec>>>, + schema: &SchemaRef, + ) -> Result { + let owned_qualifiers = qualifiers + .into_iter() + .map(|maybe_q| { + maybe_q.map(|q| { + let qualifier = q.into(); + let owned_qualifier = qualifier.to_owned_reference(); + owned_qualifier + }) + }) + .collect(); + let new_self = Self { + inner: schema.clone(), + field_qualifiers: owned_qualifiers, + functional_dependencies: FunctionalDependencies::empty(), + }; + // new_self.check_names()?; + Ok(new_self) + } + /// Create a `DFSchema` from an Arrow where all fields have no qualifier. 
pub fn from_unqualified_schema(schema: &SchemaRef) -> Result { let new_self = Self { From 32106752f7750fd9915a65048b98b7eb34c49e5a Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Sun, 4 Feb 2024 10:28:15 -0500 Subject: [PATCH 13/24] More cleanup --- datafusion/expr/src/expr_schema.rs | 100 ++++++++++++-------- datafusion/expr/src/logical_plan/builder.rs | 14 +-- datafusion/expr/src/logical_plan/plan.rs | 14 +-- datafusion/expr/src/utils.rs | 11 ++- 4 files changed, 76 insertions(+), 63 deletions(-) diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index ba21d09f0619..e05b9a5bc085 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -28,8 +28,8 @@ use crate::{utils, LogicalPlan, Projection, Subquery}; use arrow::compute::can_cast_types; use arrow::datatypes::{DataType, Field}; use datafusion_common::{ - internal_err, plan_datafusion_err, plan_err, Column, DFField, DFSchema, - DataFusionError, ExprSchema, Result, + internal_err, plan_datafusion_err, plan_err, Column, DFSchema, DataFusionError, + ExprSchema, Result, TableReference, }; use std::collections::HashMap; use std::sync::Arc; @@ -46,7 +46,10 @@ pub trait ExprSchemable { fn metadata(&self, schema: &S) -> Result>; /// convert to a field with respect to a schema - fn to_field(&self, input_schema: &DFSchema) -> Result; + fn to_field( + &self, + input_schema: &DFSchema, + ) -> Result<(Option<&TableReference>, &Arc)>; /// cast to a type with respect to a schema fn cast_to(self, cast_to_type: &DataType, schema: &S) -> Result; @@ -306,28 +309,24 @@ impl ExprSchemable for Expr { /// /// So for example, a projected expression `col(c1) + col(c2)` is /// placed in an output field **named** col("c1 + c2") - fn to_field(&self, input_schema: &DFSchema) -> Result { + fn to_field( + &self, + input_schema: &DFSchema, + ) -> Result<(Option<&TableReference>, &Arc)> { match self { - Expr::Column(c) => Ok(DFField::new( - c.relation.clone(), - &c.name, - self.get_type(input_schema)?, - self.nullable(input_schema)?, - ) - .with_metadata(self.metadata(input_schema)?)), - Expr::Alias(Alias { relation, name, .. }) => Ok(DFField::new( - relation.clone(), - name, - self.get_type(input_schema)?, - self.nullable(input_schema)?, - ) - .with_metadata(self.metadata(input_schema)?)), - _ => Ok(DFField::new_unqualified( - &self.display_name()?, - self.get_type(input_schema)?, - self.nullable(input_schema)?, - ) - .with_metadata(self.metadata(input_schema)?)), + Expr::Column(c) => { + let field = input_schema.field_from_column(c)?; + Ok((c.relation, field)) + } + Expr::Alias(Alias { relation, name, .. 
}) => { + let field = input_schema.field_with_qualified_name(relation, name)?; + Ok((relation, field)) + } + _ => { + let field = + input_schema.field_with_unqualified_name(&self.display_name()?)?; + Ok((None, field)) + } } } @@ -418,7 +417,7 @@ pub fn cast_subquery(subquery: Subquery, cast_to_type: &DataType) -> Result>(); for (i, j) in nulls { diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 93a38fb40df5..23a228f2a206 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -50,7 +50,7 @@ use datafusion_common::tree_node::{ }; use datafusion_common::{ aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints, - DFField, DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence, + DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence, FunctionalDependencies, OwnedTableReference, ParamValues, Result, UnnestOptions, }; @@ -2096,17 +2096,7 @@ impl TableScan { .map(|p| { let projected_func_dependencies = func_dependencies.project_functional_dependencies(p, p.len()); - let df_schema = DFSchema::new_with_metadata( - p.iter() - .map(|i| { - DFField::from_qualified( - table_name.clone(), - schema.field(*i).clone(), - ) - }) - .collect(), - schema.metadata().clone(), - )?; + let df_schema = DFSchema::from_qualified_schema(table_name, table_source); df_schema.with_functional_dependencies(projected_func_dependencies) }) .unwrap_or_else(|| { diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 914b354d2950..efa80ad0abb1 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -30,12 +30,12 @@ use crate::{ Operator, TryCast, }; -use arrow::datatypes::{DataType, TimeUnit}; +use arrow::datatypes::{DataType, Field, TimeUnit}; use datafusion_common::tree_node::{TreeNode, VisitRecursion}; use datafusion_common::utils::get_at_indices; use datafusion_common::{ - internal_err, plan_datafusion_err, plan_err, Column, DFField, DFSchema, DFSchemaRef, - DataFusionError, Result, ScalarValue, TableReference, + internal_err, plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, + DataFusionError, OwnedTableReference, Result, ScalarValue, TableReference, }; use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem, WildcardAdditionalOptions}; @@ -734,7 +734,10 @@ fn agg_cols(agg: &Aggregate) -> Vec { .collect() } -fn exprlist_to_fields_aggregate(exprs: &[Expr], agg: &Aggregate) -> Result> { +fn exprlist_to_fields_aggregate( + exprs: &[Expr], + agg: &Aggregate, +) -> Result, Field)>> { let agg_cols = agg_cols(agg); let mut fields = vec![]; for expr in exprs { From 6b465ce25fab5aec444d26d9774b765be6dad9a6 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Mon, 5 Feb 2024 08:48:55 -0500 Subject: [PATCH 14/24] Update build_join_schema --- datafusion/expr/src/logical_plan/builder.rs | 30 ++++++++++++--------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 58e1e5081d93..bfd93904bc51 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1106,29 +1106,31 @@ pub fn build_join_schema( right: &DFSchema, join_type: &JoinType, ) -> Result { - fn nullify_fields(fields: &[DFField]) -> Vec { + fn nullify_fields( + fields: &[(Option, Arc)], + ) -> Vec<(Option, Arc)> { fields .iter() - .map(|f| f.clone().with_nullable(true)) + .map(|(q, f)| (q, 
f.clone().with_nullable(true))) .collect() } - let right_fields = right.fields(); - let left_fields = left.fields(); + let right_fields = right.iter(); + let left_fields = left.iter(); - let fields: Vec = match join_type { + let fields: Vec<(Option, &Arc)> = match join_type { JoinType::Inner => { // left then right left_fields - .iter() - .chain(right_fields.iter()) + // .iter() + .chain(right_fields) .cloned() .collect() } JoinType::Left => { // left then right, right set to nullable in case of not matched scenario left_fields - .iter() + // .iter() .chain(&nullify_fields(right_fields)) .cloned() .collect() @@ -1136,7 +1138,7 @@ pub fn build_join_schema( JoinType::Right => { // left then right, left set to nullable in case of not matched scenario nullify_fields(left_fields) - .iter() + // .iter() .chain(right_fields.iter()) .cloned() .collect() @@ -1144,7 +1146,7 @@ pub fn build_join_schema( JoinType::Full => { // left then right, all set to nullable in case of not matched scenario nullify_fields(left_fields) - .iter() + // .iter() .chain(&nullify_fields(right_fields)) .cloned() .collect() @@ -1165,8 +1167,12 @@ pub fn build_join_schema( ); let mut metadata = left.metadata().clone(); metadata.extend(right.metadata().clone()); - let schema = DFSchema::new_with_metadata(fields, metadata)?; - schema.with_functional_dependencies(func_dependencies) + let (qualifiers, fields): (Vec>, Arc) = + fields.iter().unzip(); + let schema = Schema::new_with_metadata(fields, metadata); + let dfschema = + DFSchema::from_field_specific_qualified_schema(qualifiers, schema.into())?; + dfschema.with_functional_dependencies(func_dependencies) } /// Add additional "synthetic" group by expressions based on functional From 5a595e699f09be0b2c2891d87a5153e7679e62cc Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Tue, 6 Feb 2024 09:32:27 -0500 Subject: [PATCH 15/24] Cleanup expr to_field --- datafusion/expr/src/expr_schema.rs | 19 +++++--- datafusion/expr/src/logical_plan/builder.rs | 53 +++++++++++++-------- datafusion/expr/src/utils.rs | 4 +- 3 files changed, 46 insertions(+), 30 deletions(-) diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index e05b9a5bc085..c2aee8bae2af 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -29,7 +29,7 @@ use arrow::compute::can_cast_types; use arrow::datatypes::{DataType, Field}; use datafusion_common::{ internal_err, plan_datafusion_err, plan_err, Column, DFSchema, DataFusionError, - ExprSchema, Result, TableReference, + ExprSchema, OwnedTableReference, Result, TableReference, }; use std::collections::HashMap; use std::sync::Arc; @@ -49,7 +49,7 @@ pub trait ExprSchemable { fn to_field( &self, input_schema: &DFSchema, - ) -> Result<(Option<&TableReference>, &Arc)>; + ) -> Result<(Option, Arc)>; /// cast to a type with respect to a schema fn cast_to(self, cast_to_type: &DataType, schema: &S) -> Result; @@ -312,20 +312,25 @@ impl ExprSchemable for Expr { fn to_field( &self, input_schema: &DFSchema, - ) -> Result<(Option<&TableReference>, &Arc)> { + ) -> Result<(Option, Arc)> { match self { Expr::Column(c) => { let field = input_schema.field_from_column(c)?; - Ok((c.relation, field)) + Ok((c.relation, Arc::new(field.clone()))) } Expr::Alias(Alias { relation, name, .. 
}) => { - let field = input_schema.field_with_qualified_name(relation, name)?; - Ok((relation, field)) + if let Some(rel) = relation { + let field = input_schema.field_with_qualified_name(rel, name)?; + Ok((Some(rel.into()), Arc::new(field.clone()))) + } else { + let field = input_schema.field_with_unqualified_name(name)?; + Ok((None, Arc::new(field.clone()))) + } } _ => { let field = input_schema.field_with_unqualified_name(&self.display_name()?)?; - Ok((None, field)) + Ok((None, Arc::new(field.clone()))) } } } diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index bfd93904bc51..4ccfdafd5c2b 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1270,34 +1270,45 @@ pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result>, + Vec>, + ) = zip(left_plan.schema().iter(), right_plan.schema().iter()) + .map(|(left_field, right_field)| { + let nullable = left_field.is_nullable() || right_field.is_nullable(); + let data_type = + comparison_coercion(left_field.1.data_type(), right_field.1.data_type()) + .ok_or_else(|| { + plan_datafusion_err!( "UNION Column {} (type: {}) is not compatible with column {} (type: {})", right_field.name(), right_field.data_type(), left_field.name(), left_field.data_type() ) - })?; - - Ok(DFField::new( - left_field.qualifier().cloned(), - left_field.name(), - data_type, - nullable, - )) - }) - .collect::>>()? - .to_dfschema()?; + })?; + + Ok(( + left_field.0, + Arc::new(Field::new(left_field.name(), data_type, nullable)), + )) + + // Ok(DFField::new( + // left_field.qualifier().cloned(), + // left_field.name(), + // data_type, + // nullable, + // )) + }) + .unzip(); + // .collect::>>()?; + + // .to_dfschema()?; + let union_schema = + DFSchema::from_field_specific_qualified_schema(union_table_refs, union_fields); let inputs = vec![left_plan, right_plan] .into_iter() diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index efa80ad0abb1..9c318c354827 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -737,7 +737,7 @@ fn agg_cols(agg: &Aggregate) -> Vec { fn exprlist_to_fields_aggregate( exprs: &[Expr], agg: &Aggregate, -) -> Result, Field)>> { +) -> Result, Arc)>> { let agg_cols = agg_cols(agg); let mut fields = vec![]; for expr in exprs { @@ -756,7 +756,7 @@ fn exprlist_to_fields_aggregate( pub fn exprlist_to_fields<'a>( expr: impl IntoIterator, plan: &LogicalPlan, -) -> Result> { +) -> Result, Arc)>> { let exprs: Vec = expr.into_iter().cloned().collect(); // when dealing with aggregate plans we cannot simply look in the aggregate output schema // because it will contain columns representing complex expressions (such a column named From f49644fe47ae2fbce2252b55d8ee46acc578c291 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Thu, 8 Feb 2024 09:26:53 -0500 Subject: [PATCH 16/24] Builder updates --- datafusion/expr/src/logical_plan/builder.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 4ccfdafd5c2b..9246221ce01c 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -185,8 +185,9 @@ impl LogicalPlanBuilder { for (i, j) in nulls { values[i][j] = Expr::Literal(ScalarValue::try_from(fields[j].data_type())?); } - let schema = - DFSchemaRef::new(DFSchema::new_with_metadata(fields, HashMap::new())?); + let inner = 
Arc::new(Schema::new_with_metadata(fields, HashMap::new())); + let dfschema = DFSchema::from_unqualified_schema(&inner); + let schema = DFSchemaRef::new(dfschema?); Ok(Self::from(LogicalPlan::Values(Values { schema, values }))) } @@ -329,10 +330,10 @@ impl LogicalPlanBuilder { /// Select the given column indices pub fn select(self, indices: impl IntoIterator) -> Result { - let fields = self.plan.schema().fields(); + let fields = self.plan.schema().columns(); let exprs: Vec<_> = indices .into_iter() - .map(|x| Expr::Column(fields[x].qualified_column())) + .map(|x| Expr::Column(fields[x])) .collect(); self.project(exprs) } @@ -519,9 +520,9 @@ impl LogicalPlanBuilder { // remove pushed down sort columns let new_expr = schema - .fields() + .columns() .iter() - .map(|f| Expr::Column(f.qualified_column())) + .map(|f| Expr::Column(f.clone())) .collect(); let is_distinct = false; From 51f9c07915d07dab51d14510cf0658177c0d59b3 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Fri, 9 Feb 2024 09:14:44 -0500 Subject: [PATCH 17/24] Update expr utils --- datafusion/expr/src/utils.rs | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 9c318c354827..cdb741676754 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -337,14 +337,18 @@ fn get_excluded_columns( } let mut result = vec![]; + let columns = schema.columns(); for ident in unique_idents.into_iter() { let col_name = ident.value.as_str(); - let field = if let Some(qualifier) = qualifier { - schema.field_with_qualified_name(qualifier, col_name)? + let field_idx = if let Some(qualifier) = qualifier { + schema.index_of_column_by_name(Some(qualifier), col_name)? } else { - schema.field_with_unqualified_name(col_name)? + schema.index_of_column_by_name(None, col_name)? 
}; - result.push(field.qualified_column()) + if let Some(field_idx) = field_idx { + let field = columns[field_idx]; + result.push(field) + } } Ok(result) } @@ -356,18 +360,17 @@ fn get_exprs_except_skipped( ) -> Vec { if columns_to_skip.is_empty() { schema - .fields() + .field_names() .iter() - .map(|f| Expr::Column(f.qualified_column())) + .map(|f| Expr::Column(f.into())) .collect::>() } else { schema - .fields() + .columns() .iter() - .filter_map(|f| { - let col = f.qualified_column(); - if !columns_to_skip.contains(&col) { - Some(Expr::Column(col)) + .filter_map(|c| { + if !columns_to_skip.contains(c) { + Some(Expr::Column(c.name.into())) } else { None } From 03b133a0699f39a3d3c883237aa420b57d6b0163 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Mon, 12 Feb 2024 09:12:23 -0500 Subject: [PATCH 18/24] Work on logical plan --- datafusion/expr/src/logical_plan/plan.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 23a228f2a206..2b41ffea39bf 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2373,12 +2373,14 @@ impl Aggregate { if is_grouping_set { fields = fields .into_iter() - .map(|field| field.with_nullable(true)) + .map(|(q, f)| (q, f.with_nullable(true))) .collect::>(); } fields.extend(exprlist_to_fields(aggr_expr.iter(), &input)?); + // let schema = DFSchema::from_field_specific_qualified_schema(fields); + let schema = DFSchema::new_with_metadata(fields, input.schema().metadata().clone())?; @@ -2475,7 +2477,7 @@ fn calc_func_dependencies_for_project( exprs: &[Expr], input: &LogicalPlan, ) -> Result { - let input_fields = input.schema().fields(); + let input_fields = input.schema().field_names(); // Calculate expression indices (if present) in the input schema. 
let proj_indices = exprs .iter() @@ -2486,9 +2488,7 @@ fn calc_func_dependencies_for_project( } _ => format!("{}", expr), }; - input_fields - .iter() - .position(|item| item.qualified_name() == expr_name) + input_fields.into_iter().position(|item| item == expr_name) }) .collect::>(); Ok(input From a519a7f7e2387706ae44267785e4633cb1c51ba1 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Wed, 14 Feb 2024 10:45:08 -0500 Subject: [PATCH 19/24] Update expr rewriter --- datafusion/expr/src/expr_rewriter/mod.rs | 54 +++++++++++++++--------- 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index 1f04c80833f0..ec36d816d22f 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -262,9 +262,9 @@ mod test { use super::*; use crate::expr::Sort; use crate::{col, lit, Cast}; - use arrow::datatypes::DataType; + use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::tree_node::{RewriteRecursion, TreeNode, TreeNodeRewriter}; - use datafusion_common::{DFField, DFSchema, ScalarValue}; + use datafusion_common::{DFSchema, OwnedTableReference, ScalarValue}; use std::ops::Add; #[derive(Default)] @@ -318,20 +318,21 @@ mod test { let expr = col("a") + col("b") + col("c"); // Schemas with some matching and some non matching cols - let schema_a = make_schema_with_empty_metadata(vec![ - make_field("tableA", "a"), - make_field("tableA", "aa"), - ]); - let schema_c = make_schema_with_empty_metadata(vec![ - make_field("tableC", "cc"), - make_field("tableC", "c"), - ]); - let schema_b = make_schema_with_empty_metadata(vec![make_field("tableB", "b")]); + let schema_a = make_schema_with_empty_metadata( + vec![Some("tableA".into()), Some("tableA".into())], + vec!["a", "aa"], + ); + let schema_c = make_schema_with_empty_metadata( + vec![Some("tableC".into()), Some("tableC".into())], + vec!["cc", "c"], + ); + let schema_b = + make_schema_with_empty_metadata(vec![Some("tableB".into())], vec!["b"]); // non matching - let schema_f = make_schema_with_empty_metadata(vec![ - make_field("tableC", "f"), - make_field("tableC", "ff"), - ]); + let schema_f = make_schema_with_empty_metadata( + vec![Some("tableC".into()), Some("tableC".into())], + vec!["f", "ff"], + ); let schemas = vec![schema_c, schema_f, schema_b, schema_a]; let schemas = schemas.iter().collect::>(); @@ -349,9 +350,12 @@ mod test { fn normalize_cols_priority() { let expr = col("a") + col("b"); // Schemas with multiple matches for column a, first takes priority - let schema_a = make_schema_with_empty_metadata(vec![make_field("tableA", "a")]); - let schema_b = make_schema_with_empty_metadata(vec![make_field("tableB", "b")]); - let schema_a2 = make_schema_with_empty_metadata(vec![make_field("tableA2", "a")]); + let schema_a = + make_schema_with_empty_metadata(vec![Some("tableA".into())], vec!["a"]); + let schema_b = + make_schema_with_empty_metadata(vec![Some("tableB".into())], vec!["b"]); + let schema_a2 = + make_schema_with_empty_metadata(vec![Some("tableA2".into())], vec!["a"]); let schemas = vec![schema_a2, schema_b, schema_a] .into_iter() .map(Arc::new) @@ -367,7 +371,7 @@ mod test { // test normalizing columns when the name doesn't exist let expr = col("a") + col("b"); let schema_a = - make_schema_with_empty_metadata(vec![make_field("\"tableA\"", "a")]); + make_schema_with_empty_metadata(vec![Some("\"tableA\"".into())], vec!["a"]); let schemas = vec![schema_a]; let schemas = schemas.iter().collect::>(); 
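The schemas in these tests come from a helper (added in the next hunk) that wraps `DFSchema::from_field_specific_qualified_schema`. A hedged sketch of calling that constructor directly; the field names, types, and the `t1` qualifier are illustrative, and the qualifier list is assumed to be positional, one entry per field:

    use std::sync::Arc;
    use arrow_schema::{DataType, Field, Schema};
    use datafusion_common::{DFSchema, OwnedTableReference, Result};

    // Two fields, only the first of which carries a table qualifier.
    fn example_schema() -> Result<DFSchema> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Utf8, true),
        ]));
        let qualifiers: Vec<Option<OwnedTableReference>> = vec![Some("t1".into()), None];
        DFSchema::from_field_specific_qualified_schema(qualifiers, &schema)
    }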
@@ -388,8 +392,16 @@ mod test { assert_eq!(unnormalized_expr, col("a") + col("b")); } - fn make_schema_with_empty_metadata(fields: Vec) -> DFSchema { - DFSchema::new_with_metadata(fields, HashMap::new()).unwrap() + fn make_schema_with_empty_metadata( + qualifiers: Vec>, + fields: Vec<&'static str>, + ) -> DFSchema { + let fields = fields + .iter() + .map(|f| Arc::new(Field::new(f.to_string(), DataType::Int8, false))) + .collect::>(); + let schema = Arc::new(Schema::new(fields)); + DFSchema::from_field_specific_qualified_schema(qualifiers, &schema).unwrap() } fn make_field(relation: &str, column: &str) -> DFField { From b748981b32fea34df8b211230008cd22da041cfa Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Thu, 15 Feb 2024 09:04:35 -0500 Subject: [PATCH 20/24] Cleanup up logical plan --- datafusion/expr/src/logical_plan/plan.rs | 32 ++++++++++++------------ 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 2b41ffea39bf..2558fdf6353d 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -510,11 +510,11 @@ impl LogicalPlan { cross.left.head_output_expr() } } - LogicalPlan::Union(union) => Ok(Some(Expr::Column( - union.schema.fields()[0].qualified_column(), - ))), + LogicalPlan::Union(union) => { + Ok(Some(Expr::Column(union.schema.field_names()[0].into()))) + } LogicalPlan::TableScan(table) => Ok(Some(Expr::Column( - table.projected_schema.fields()[0].qualified_column(), + table.projected_schema.field_names()[0].into(), ))), LogicalPlan::SubqueryAlias(subquery_alias) => { let expr_opt = subquery_alias.input.head_output_expr()?; @@ -2373,18 +2373,19 @@ impl Aggregate { if is_grouping_set { fields = fields .into_iter() - .map(|(q, f)| (q, f.with_nullable(true))) + .map(|(q, f)| (q, f.with_nullable(true).into())) .collect::>(); } fields.extend(exprlist_to_fields(aggr_expr.iter(), &input)?); + let (q, f): (Vec>, Vec>) = + fields.into_iter().unzip(); - // let schema = DFSchema::from_field_specific_qualified_schema(fields); - - let schema = - DFSchema::new_with_metadata(fields, input.schema().metadata().clone())?; + let schema = Arc::new(Schema::new(f)); + let dfschema = + DFSchema::from_field_specific_qualified_schema(q, &schema).unwrap(); - Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema)) + Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(dfschema)) } /// Create a new aggregate operator using the provided schema to avoid the overhead of @@ -2624,7 +2625,6 @@ pub struct Unnest { #[cfg(test)] mod tests { - use std::collections::HashMap; use std::sync::Arc; use super::*; @@ -2974,7 +2974,7 @@ digraph { #[test] fn projection_expr_schema_mismatch() -> Result<()> { - let empty_schema = Arc::new(DFSchema::new_with_metadata(vec![], HashMap::new())?); + let empty_schema = Arc::new(DFSchema::empty()); let p = Projection::try_new_with_schema( vec![col("a")], Arc::new(LogicalPlan::EmptyRelation(EmptyRelation { @@ -3168,10 +3168,10 @@ digraph { filters: vec![], fetch: None, })); - let col = schema.field(0).qualified_column(); + let col = schema.field_names()[0]; let filter = Filter::try_new( - Expr::Column(col).eq(Expr::Literal(ScalarValue::Int32(Some(1)))), + Expr::Column(col.into()).eq(Expr::Literal(ScalarValue::Int32(Some(1)))), scan, ) .unwrap(); @@ -3198,10 +3198,10 @@ digraph { filters: vec![], fetch: None, })); - let col = schema.field(0).qualified_column(); + let col = schema.field_names()[0]; let filter = 
Filter::try_new( - Expr::Column(col).eq(Expr::Literal(ScalarValue::Int32(Some(1)))), + Expr::Column(col.into()).eq(Expr::Literal(ScalarValue::Int32(Some(1)))), scan, ) .unwrap(); From 9b5a8708d0af32659ef5ae7a01d3bf517101c11e Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Fri, 16 Feb 2024 09:11:03 -0500 Subject: [PATCH 21/24] More cleanup --- datafusion/common/src/dfschema.rs | 14 ++++++++++++++ datafusion/expr/src/logical_plan/plan.rs | 19 ++++++++++++++----- datafusion/expr/src/utils.rs | 18 +++++++++++------- 3 files changed, 39 insertions(+), 12 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 52c9e15a4767..ca12857e87f4 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -124,6 +124,20 @@ impl DFSchema { } } + pub fn new_with_metadata( + fields: Vec, + metadata: HashMap, + ) -> Self { + let field_count = fields.len(); + let schema = Arc::new(Schema::new_with_metadata(fields, metadata)); + Self { + inner: schema, + field_qualifiers: vec![None; field_count], + functional_dependencies: FunctionalDependencies::empty(), + } + } + + // TODO Check this vs `try_from_qualified_schema` /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier pub fn from_qualified_schema<'a>( qualifier: impl Into>, diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 2558fdf6353d..f35d6c2f5748 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2096,7 +2096,10 @@ impl TableScan { .map(|p| { let projected_func_dependencies = func_dependencies.project_functional_dependencies(p, p.len()); - let df_schema = DFSchema::from_qualified_schema(table_name, table_source); + let df_schema = DFSchema::try_from_qualified_schema( + table_name, + &table_source.schema(), + )?; df_schema.with_functional_dependencies(projected_func_dependencies) }) .unwrap_or_else(|| { @@ -2286,18 +2289,24 @@ impl DistinctOn { } let on_expr = normalize_cols(on_expr, input.as_ref())?; + let (qualifiers, fields): (Vec>, Vec>) = + exprlist_to_fields(&select_expr, &input)? 
+ .into_iter() + .unzip(); - let schema = DFSchema::new_with_metadata( - exprlist_to_fields(&select_expr, &input)?, + let schema = Arc::new(Schema::new_with_metadata( + fields, input.schema().metadata().clone(), - )?; + )); + let dfschema = + DFSchema::from_field_specific_qualified_schema(qualifiers, &schema)?; let mut distinct_on = DistinctOn { on_expr, select_expr, sort_expr: None, input, - schema: Arc::new(schema), + schema: Arc::new(dfschema), }; if let Some(sort_expr) = sort_expr { diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index cdb741676754..1017e228566a 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -30,7 +30,7 @@ use crate::{ Operator, TryCast, }; -use arrow::datatypes::{DataType, Field, TimeUnit}; +use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use datafusion_common::tree_node::{TreeNode, VisitRecursion}; use datafusion_common::utils::get_at_indices; use datafusion_common::{ @@ -433,13 +433,14 @@ pub fn expand_qualified_wildcard( let projected_func_dependencies = schema .functional_dependencies() .project_functional_dependencies(&qualified_indices, qualified_indices.len()); - let qualified_fields = get_at_indices(schema.fields(), &qualified_indices)?; - if qualified_fields.is_empty() { + let fields_with_qualified = get_at_indices(schema.fields(), &qualified_indices)?; + if fields_with_qualified.is_empty() { return plan_err!("Invalid qualifier {qualifier}"); } - let qualified_schema = - DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone())? - // We can use the functional dependencies as is, since it only stores indices: + + let qualified_schema = Arc::new(Schema::new(fields_with_qualified)); + let qualified_dfschema = + DFSchema::try_from_qualified_schema(qualifier, &qualified_schema)? 
.with_functional_dependencies(projected_func_dependencies)?; let excluded_columns = if let Some(WildcardAdditionalOptions { opt_exclude, @@ -459,7 +460,10 @@ pub fn expand_qualified_wildcard( // Add each excluded `Column` to columns_to_skip let mut columns_to_skip = HashSet::new(); columns_to_skip.extend(excluded_columns); - Ok(get_exprs_except_skipped(&qualified_schema, columns_to_skip)) + Ok(get_exprs_except_skipped( + &qualified_dfschema, + columns_to_skip, + )) } /// (expr, "is the SortExpr for window (either comes from PARTITION BY or ORDER BY columns)") From fe60a4553b23d63d916b90c1419b61bbd130f552 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Fri, 23 Feb 2024 10:06:31 -0500 Subject: [PATCH 22/24] More cleanup --- datafusion/common/src/dfschema.rs | 26 ++++++++++++++++++++++-- datafusion/expr/src/logical_plan/plan.rs | 19 ++++++++++------- 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index ca12857e87f4..138bc3458fb8 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -26,8 +26,8 @@ use std::sync::Arc; use crate::error::{DataFusionError, Result, _plan_err}; use crate::{ - field_not_found, unqualified_field_not_found, Column, FunctionalDependencies, - OwnedTableReference, TableReference, + field_not_found, functional_dependencies, unqualified_field_not_found, Column, + FunctionalDependencies, OwnedTableReference, TableReference, }; use arrow::compute::can_cast_types; @@ -190,6 +190,28 @@ impl DFSchema { Ok(new_self) } + // TODO Add tests + pub fn from_qualified_fields( + qualified_fields: Vec<(Option, Arc)>, + metadata: Option>, + ) -> Result { + let (qualifiers, fields): (Vec>, Vec>) = + qualified_fields.into_iter().unzip(); + + let schema = if let Some(metadata) = metadata { + Arc::new(Schema::new_with_metadata(fields, metadata)) + } else { + Arc::new(Schema::new(fields)) + }; + + let dfschema = Self { + inner: schema, + field_qualifiers: qualifiers, + functional_dependencies: FunctionalDependencies::empty(), + }; + Ok(dfschema) + } + // fn check_names(&self) -> Result<()> { // let mut qualified_names = HashSet::new(); // let mut unqualified_names = HashSet::new(); diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index f35d6c2f5748..77d1d1ce8962 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -896,10 +896,10 @@ impl LogicalPlan { .fields() .iter() .map(|f| { - if f == nested_field { + if f.as_ref() == nested_field { unnested_field.clone() } else { - f.clone() + f.as_ref().clone() } }) .collect::>(); @@ -908,7 +908,7 @@ impl LogicalPlan { DFSchema::new_with_metadata( fields, input.schema().metadata().clone(), - )? + ) // We can use the existing functional dependencies as is: .with_functional_dependencies( input.schema().functional_dependencies().clone(), @@ -1796,9 +1796,10 @@ impl Projection { /// produced by the projection operation. If the schema computation is successful, /// the `Result` will contain the schema; otherwise, it will contain an error. 
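In the `projection_schema` rewrite just below, the `(qualifier, field)` pairs produced by `exprlist_to_fields` feed straight into `DFSchema::from_qualified_fields`. A short sketch of that constructor call; the field names and qualifier are made up for illustration, and metadata is simply omitted here:

    use std::sync::Arc;
    use arrow_schema::{DataType, Field};
    use datafusion_common::{DFSchema, OwnedTableReference, Result};

    fn projection_like_schema() -> Result<DFSchema> {
        // Qualified output fields in projection order, e.g. as returned by `exprlist_to_fields`.
        let fields: Vec<(Option<OwnedTableReference>, Arc<Field>)> = vec![
            (Some("t1".into()), Arc::new(Field::new("a", DataType::Int32, false))),
            (None, Arc::new(Field::new("a_plus_1", DataType::Int32, false))),
        ];
        DFSchema::from_qualified_fields(fields, None)
    }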
pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result> { - let mut schema = DFSchema::new_with_metadata( + // let mut schema = DFSchema::from_qualified_fields + let mut schema = DFSchema::from_qualified_fields( exprlist_to_fields(exprs, input)?, - input.schema().metadata().clone(), + Some(input.schema().metadata().clone()), )?; schema = schema.with_functional_dependencies(calc_func_dependencies_for_project( exprs, input, @@ -1970,7 +1971,11 @@ pub struct Window { impl Window { /// Create a new window operator. pub fn try_new(window_expr: Vec, input: Arc) -> Result { - let fields = input.schema().fields(); + let fields: Vec<(Option, Arc)> = input + .schema() + .iter() + .map(|(q, f)| (q.map(|q| q.clone()), f.clone())) + .collect(); let input_len = fields.len(); let mut window_fields = fields.clone(); window_fields.extend_from_slice(&exprlist_to_fields(window_expr.iter(), &input)?); @@ -2025,7 +2030,7 @@ impl Window { input, window_expr, schema: Arc::new( - DFSchema::new_with_metadata(window_fields, metadata)? + DFSchema::from_qualified_fields(window_fields, Some(metadata))? .with_functional_dependencies(window_func_dependencies)?, ), }) From 9600fdc6f3b2ce2da9a44e8349df6634a4cfce9d Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Wed, 28 Feb 2024 10:12:41 -0500 Subject: [PATCH 23/24] Cleanup --- datafusion/common/src/dfschema.rs | 12 ++++++++++++ datafusion/expr/src/logical_plan/builder.rs | 2 +- datafusion/expr/src/logical_plan/plan.rs | 7 +------ datafusion/expr/src/utils.rs | 16 ++++++++++++++-- 4 files changed, 28 insertions(+), 9 deletions(-) diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 138bc3458fb8..600ea03a3c9f 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -579,6 +579,18 @@ impl DFSchema { } } + /// Find the field with the given qualified column + pub fn qualifier_and_field_from_column( + &self, + column: &Column, + ) -> Option<(Option, Arc)> { + self.iter() + .find(|&(q, f)| { + column.relation == q.cloned() && column.name == f.name().clone() + }) + .map(|(q, f)| (q.cloned(), f.clone())) + } + /// Find if the field exists with the given name pub fn has_column_with_unqualified_name(&self, name: &str) -> bool { self.fields().iter().any(|field| field.name() == name) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 9246221ce01c..3c2f0c14cc03 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1508,7 +1508,7 @@ pub fn unnest_with_options( column: Column, options: UnnestOptions, ) -> Result { - let unnest_field = input.schema().field_from_column(&column)?; + let unnest_field = input.schema().qualifier_and_field_from_column(&column); // Extract the type of the nested field in the list. 
let unnested_field = match unnest_field.data_type() { diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 77d1d1ce8962..0b2990a0a0af 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1768,12 +1768,7 @@ impl Projection { /// Create a new Projection using the specified output schema pub fn new_from_schema(input: Arc, schema: DFSchemaRef) -> Self { - let expr: Vec = schema - .fields() - .iter() - .map(|field| field.qualified_column()) - .map(Expr::Column) - .collect(); + let expr: Vec = schema.columns().into_iter().map(Expr::Column).collect(); Self { expr, input, diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 1017e228566a..3a51d491811d 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -853,8 +853,20 @@ pub(crate) fn find_columns_referenced_by_expr(e: &Expr) -> Vec { pub fn expr_as_column_expr(expr: &Expr, plan: &LogicalPlan) -> Result { match expr { Expr::Column(col) => { - let field = plan.schema().field_from_column(col)?; - Ok(Expr::Column(field.qualified_column())) + let maybe_field = plan + .schema() + .iter() + .find(|&(qu, fi)| { + col.relation == qu.cloned() && col.name == fi.name().clone() + }) + .map(|(q, f)| (q.clone(), f.clone())); + if let Some(field) = maybe_field { + Ok(Expr::Column(Column::new(field.0, field.1.name()))) + } else { + Err(DataFusionError::Internal( + "A column for the expression could not be found".to_string(), + )) + } } _ => Ok(Expr::Column(Column::from_name(expr.display_name()?))), } From 4524fae749b11990d85e59926b2894f6554f6d7a Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Thu, 7 Mar 2024 10:09:29 -0500 Subject: [PATCH 24/24] Fix unnest --- datafusion/expr/src/logical_plan/builder.rs | 32 ++++++++++++--------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 3c2f0c14cc03..ae4fcbfc9ca7 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1508,18 +1508,21 @@ pub fn unnest_with_options( column: Column, options: UnnestOptions, ) -> Result { - let unnest_field = input.schema().qualifier_and_field_from_column(&column); + let maybe_unnest_field = input.schema().qualifier_and_field_from_column(&column); + if maybe_unnest_field.is_none() { + return Ok(input); + } + let unnest_field = maybe_unnest_field.unwrap(); // Extract the type of the nested field in the list. - let unnested_field = match unnest_field.data_type() { + let unnested_field = match unnest_field.1.data_type() { DataType::List(field) | DataType::FixedSizeList(field, _) - | DataType::LargeList(field) => DFField::new( - unnest_field.qualifier().cloned(), - unnest_field.name(), + | DataType::LargeList(field) => Arc::new(Field::new( + unnest_field.1.name(), field.data_type().clone(), - unnest_field.is_nullable(), - ), + unnest_field.1.is_nullable(), + )), _ => { // If the unnest field is not a list type return the input plan. return Ok(input); @@ -1529,26 +1532,27 @@ pub fn unnest_with_options( // Update the schema with the unnest column type changed to contain the nested type. 
let input_schema = input.schema(); let fields = input_schema - .fields() .iter() - .map(|f| { - if f == unnest_field { - unnested_field.clone() + .map(|(q, f)| { + if f == &unnest_field.1 { + (q.cloned(), unnested_field.clone()) } else { - f.clone() + (q.cloned(), f.clone()) } }) .collect::>(); let metadata = input_schema.metadata().clone(); - let df_schema = DFSchema::new_with_metadata(fields, metadata)?; + let df_schema = DFSchema::from_qualified_fields(fields, Some(metadata))?; + // let df_schema = DFSchema::new_with_metadata(fields, metadata); // We can use the existing functional dependencies: let deps = input_schema.functional_dependencies().clone(); let schema = Arc::new(df_schema.with_functional_dependencies(deps)?); + let column = Column::new(unnest_field.0, unnested_field.name()); Ok(LogicalPlan::Unnest(Unnest { input: Arc::new(input), - column: unnested_field.qualified_column(), + column, schema, options, }))