diff --git a/Cargo.toml b/Cargo.toml
index 48e555bd5527..d9e69e53db7c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -57,14 +57,14 @@ version = "36.0.0"
 # for the inherited dependency but cannot do the reverse (override from true to false).
 #
 # See for more detaiils: https://github.com/rust-lang/cargo/issues/11329
-arrow = { version = "50.0.0", features = ["prettyprint"] }
-arrow-array = { version = "50.0.0", default-features = false, features = ["chrono-tz"] }
-arrow-buffer = { version = "50.0.0", default-features = false }
-arrow-flight = { version = "50.0.0", features = ["flight-sql-experimental"] }
-arrow-ipc = { version = "50.0.0", default-features = false, features = ["lz4"] }
-arrow-ord = { version = "50.0.0", default-features = false }
-arrow-schema = { version = "50.0.0", default-features = false }
-arrow-string = { version = "50.0.0", default-features = false }
+arrow = { version = "51.0.0", features = ["prettyprint"] }
+arrow-array = { version = "51.0.0", default-features = false, features = ["chrono-tz"] }
+arrow-buffer = { version = "51.0.0", default-features = false }
+arrow-flight = { version = "51.0.0", features = ["flight-sql-experimental"] }
+arrow-ipc = { version = "51.0.0", default-features = false, features = ["lz4"] }
+arrow-ord = { version = "51.0.0", default-features = false }
+arrow-schema = { version = "51.0.0", default-features = false }
+arrow-string = { version = "51.0.0", default-features = false }
 async-trait = "0.1.73"
 bigdecimal = "=0.4.1"
 bytes = "1.4"
@@ -95,7 +95,7 @@ log = "^0.4"
 num_cpus = "1.13.0"
 object_store = { version = "0.9.0", default-features = false }
 parking_lot = "0.12"
-parquet = { version = "50.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
+parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
 rand = "0.8"
 rstest = "0.18.0"
 serde_json = "1"
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 8e2a2c353e2d..51cccf60a1e4 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -130,9 +130,9 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711"
 
 [[package]]
 name = "arrow"
-version = "50.0.0"
+version = "51.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "aa285343fba4d829d49985bdc541e3789cf6000ed0e84be7c039438df4a4e78c"
+checksum = "219d05930b81663fd3b32e3bde8ce5bff3c4d23052a99f11a8fa50a3b47b2658"
 dependencies = [
  "arrow-arith",
  "arrow-array",
@@ -151,9 +151,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-arith"
-version = "50.0.0"
+version = "51.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "753abd0a5290c1bcade7c6623a556f7d1659c5f4148b140b5b63ce7bd1a45705"
+checksum = "0272150200c07a86a390be651abdd320a2d12e84535f0837566ca87ecd8f95e0"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -166,9 +166,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-array"
-version = "50.0.0"
+version = "51.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d390feeb7f21b78ec997a4081a025baef1e2e0d6069e181939b61864c9779609"
+checksum = "8010572cf8c745e242d1b632bd97bd6d4f40fefed5ed1290a8f433abaa686fea"
 dependencies = [
  "ahash",
  "arrow-buffer",
@@ -183,9 +183,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-buffer"
-version = "50.0.0"
+version = "51.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "69615b061701bcdffbc62756bc7e85c827d5290b472b580c972ebbbf690f5aa4"
+checksum = "0d0a2432f0cba5692bf4cb757469c66791394bac9ec7ce63c1afe74744c37b27"
 dependencies = [
  "bytes",
  "half",
@@ -194,28 +194,30 @@ dependencies = [
 
 [[package]]
 name = "arrow-cast"
-version = "50.0.0"
+version = "51.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e448e5dd2f4113bf5b74a1f26531708f5edcacc77335b7066f9398f4bcf4cdef"
+checksum = "9abc10cd7995e83505cc290df9384d6e5412b207b79ce6bdff89a10505ed2cba"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
  "arrow-data",
  "arrow-schema",
  "arrow-select",
- "base64 0.21.7",
+ "atoi",
+ "base64 0.22.0",
  "chrono",
  "comfy-table",
  "half",
  "lexical-core",
  "num",
+ "ryu",
 ]
 
 [[package]]
 name = "arrow-csv"
-version = "50.0.0"
+version = "51.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "46af72211f0712612f5b18325530b9ad1bfbdc87290d5fbfd32a7da128983781"
+checksum = "95cbcba196b862270bf2a5edb75927380a7f3a163622c61d40cbba416a6305f2"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -232,9 +234,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-data"
-version = "50.0.0"
+version = "51.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "67d644b91a162f3ad3135ce1184d0a31c28b816a581e08f29e8e9277a574c64e"
+checksum = "2742ac1f6650696ab08c88f6dd3f0eb68ce10f8c253958a18c943a68cd04aec5"
 dependencies = [
  "arrow-buffer",
  "arrow-schema",
@@ -244,9 +246,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-ipc"
-version = "50.0.0"
+version = "51.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "03dea5e79b48de6c2e04f03f62b0afea7105be7b77d134f6c5414868feefb80d"
+checksum = "a42ea853130f7e78b9b9d178cb4cd01dee0f78e64d96c2949dc0a915d6d9e19d"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -259,9 +261,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-json"
-version = "50.0.0"
+version = "51.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8950719280397a47d37ac01492e3506a8a724b3fb81001900b866637a829ee0f"
+checksum = "eaafb5714d4e59feae964714d724f880511500e3569cc2a94d02456b403a2a49"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -279,9 +281,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-ord"
-version = "50.0.0"
+version = "51.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1ed9630979034077982d8e74a942b7ac228f33dd93a93b615b4d02ad60c260be"
+checksum = "e3e6b61e3dc468f503181dccc2fc705bdcc5f2f146755fa5b56d0a6c5943f412"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -294,9 +296,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-row"
-version = "50.0.0"
+version = "51.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "007035e17ae09c4e8993e4cb8b5b96edf0afb927cd38e2dff27189b274d83dcf"
+checksum = "848ee52bb92eb459b811fb471175ea3afcf620157674c8794f539838920f9228"
 dependencies = [
  "ahash",
  "arrow-array",
@@ -309,15 +311,15 @@ dependencies = [
 
 [[package]]
 name = "arrow-schema"
-version = "50.0.0"
+version = "51.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0ff3e9c01f7cd169379d269f926892d0e622a704960350d09d331be3ec9e0029"
+checksum = "02d9483aaabe910c4781153ae1b6ae0393f72d9ef757d38d09d450070cf2e528"
 
 [[package]]
 name = "arrow-select"
-version = "50.0.0"
+version = "51.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1ce20973c1912de6514348e064829e50947e35977bb9d7fb637dc99ea9ffd78c"
+checksum = "849524fa70e0e3c5ab58394c770cb8f514d0122d20de08475f7b472ed8075830"
 dependencies = [
  "ahash",
  "arrow-array",
@@ -329,15 +331,16 @@ dependencies = [
 
 [[package]]
 name = "arrow-string"
-version = "50.0.0"
+version = "51.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "00f3b37f2aeece31a2636d1b037dabb69ef590e03bdc7eb68519b51ec86932a7"
+checksum = "9373cb5a021aee58863498c37eb484998ef13377f69989c6c5ccfbd258236cdb"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
  "arrow-data",
  "arrow-schema",
  "arrow-select",
+ "memchr",
  "num",
  "regex",
  "regex-syntax",
@@ -387,6 +390,15 @@ dependencies = [
  "syn 2.0.53",
 ]
 
+[[package]]
+name = "atoi"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528"
+dependencies = [
+ "num-traits",
+]
+
 [[package]]
 name = "atty"
 version = "0.2.14"
@@ -739,9 +751,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
 
 [[package]]
 name = "bitflags"
-version = "2.4.2"
+version = "2.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf"
+checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1"
 
 [[package]]
 name = "blake2"
@@ -2128,7 +2140,7 @@ version = "0.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "85c833ca1e66078851dba29046874e38f08b2c883700aa29a03ddd3b23814ee8"
 dependencies = [
- "bitflags 2.4.2",
+ "bitflags 2.5.0",
  "libc",
  "redox_syscall",
 ]
@@ -2442,9 +2454,9 @@ dependencies = [
 
 [[package]]
 name = "parquet"
-version = "50.0.0"
+version = "51.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "547b92ebf0c1177e3892f44c8f79757ee62e678d564a9834189725f2c5b7a750"
+checksum = "096795d4f47f65fd3ee1ec5a98b77ab26d602f2cc785b0e4be5443add17ecc32"
 dependencies = [
  "ahash",
  "arrow-array",
@@ -2454,7 +2466,7 @@ dependencies = [
  "arrow-ipc",
  "arrow-schema",
  "arrow-select",
- "base64 0.21.7",
+ "base64 0.22.0",
  "brotli",
  "bytes",
  "chrono",
@@ -2903,7 +2915,7 @@ version = "0.38.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949"
 dependencies = [
- "bitflags 2.4.2",
+ "bitflags 2.5.0",
  "errno",
  "libc",
  "linux-raw-sys",
@@ -3720,9 +3732,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
 
 [[package]]
 name = "uuid"
-version = "1.7.0"
+version = "1.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f00cc9702ca12d3c81455259621e676d0f7251cec66a21e98fe2e9a37db93b2a"
+checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0"
 dependencies = [
  "getrandom",
  "serde",
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index ad506762f0d0..da744a06f3aa 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -30,7 +30,7 @@ rust-version = "1.72"
 readme = "README.md"
 
 [dependencies]
-arrow = "50.0.0"
+arrow = "51.0.0"
 async-trait = "0.1.41"
 aws-config = "0.55"
 aws-credential-types = "0.55"
@@ -52,7 +52,7 @@ futures = "0.3"
 mimalloc = { version = "0.1", default-features = false }
 object_store = { version = "0.9.0", features = ["aws", "gcp", "http"] }
 parking_lot = { version = "0.12" }
-parquet = { version = "50.0.0", default-features = false }
+parquet = { version = "51.0.0", default-features = false }
 regex = "1.8"
 rustyline = "11.0"
 tokio = { version = "1.24", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot", "signal"] }
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index ad2a49fb352e..2b6e869ec500 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -74,7 +74,6 @@ serde = { version = "1.0.136", features = ["derive"] }
 serde_json = { workspace = true }
 tempfile = { workspace = true }
 tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
-# 0.10 and 0.11 are incompatible. Need to upgrade tonic to 0.11 when upgrading to arrow 51
-tonic = "0.10"
+tonic = "0.11"
 url = { workspace = true }
 uuid = "1.2"
diff --git a/datafusion-examples/examples/deserialize_to_struct.rs b/datafusion-examples/examples/deserialize_to_struct.rs
index e999fc4dac3e..985cab703a5c 100644
--- a/datafusion-examples/examples/deserialize_to_struct.rs
+++ b/datafusion-examples/examples/deserialize_to_struct.rs
@@ -15,14 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow::array::AsArray;
+use arrow::datatypes::{Float64Type, Int32Type};
 use datafusion::error::Result;
 use datafusion::prelude::*;
-use serde::Deserialize;
+use futures::StreamExt;
 
 /// This example shows that it is possible to convert query results into Rust structs .
-/// It will collect the query results into RecordBatch, then convert it to serde_json::Value.
-/// Then, serde_json::Value is turned into Rust's struct.
-/// Any datatype with `Deserialize` implemeneted works.
 #[tokio::main]
 async fn main() -> Result<()> {
     let data_list = Data::new().await?;
@@ -30,10 +29,10 @@ async fn main() -> Result<()> {
     Ok(())
 }
 
-#[derive(Deserialize, Debug)]
+#[derive(Debug)]
 struct Data {
     #[allow(dead_code)]
-    int_col: i64,
+    int_col: i32,
     #[allow(dead_code)]
     double_col: f64,
 }
@@ -41,35 +40,36 @@ struct Data {
 impl Data {
     pub async fn new() -> Result<Vec<Self>> {
         // this group is almost the same as the one you find it in parquet_sql.rs
-        let batches = {
-            let ctx = SessionContext::new();
+        let ctx = SessionContext::new();
 
-            let testdata = datafusion::test_util::parquet_test_data();
+        let testdata = datafusion::test_util::parquet_test_data();
 
-            ctx.register_parquet(
-                "alltypes_plain",
-                &format!("{testdata}/alltypes_plain.parquet"),
-                ParquetReadOptions::default(),
-            )
-            .await?;
+        ctx.register_parquet(
+            "alltypes_plain",
+            &format!("{testdata}/alltypes_plain.parquet"),
+            ParquetReadOptions::default(),
+        )
+        .await?;
 
-            let df = ctx
-                .sql("SELECT int_col, double_col FROM alltypes_plain")
-                .await?;
+        let df = ctx
+            .sql("SELECT int_col, double_col FROM alltypes_plain")
+            .await?;
 
-            df.clone().show().await?;
+        df.clone().show().await?;
 
-            df.collect().await?
-        };
-        let batches: Vec<_> = batches.iter().collect();
+        let mut stream = df.execute_stream().await?;
+        let mut list = vec![];
+        while let Some(b) = stream.next().await.transpose()? {
+            let int_col = b.column(0).as_primitive::<Int32Type>();
+            let float_col = b.column(1).as_primitive::<Float64Type>();
 
-        // converts it to serde_json type and then convert that into Rust type
-        let list = arrow::json::writer::record_batches_to_json_rows(&batches[..])?
-            .into_iter()
-            .map(|val| serde_json::from_value(serde_json::Value::Object(val)))
-            .take_while(|val| val.is_ok())
-            .map(|val| val.unwrap())
-            .collect();
+            for (i, f) in int_col.values().iter().zip(float_col.values()) {
+                list.push(Data {
+                    int_col: *i,
+                    double_col: *f,
+                })
+            }
+        }
 
         Ok(list)
     }
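Note: arrow 51 removes `arrow::json::writer::record_batches_to_json_rows`, which is why the example above stops round-tripping through serde_json and instead reads columns directly with `AsArray`. A minimal, self-contained sketch of that typed-access pattern (a hand-built batch with invented values stands in for the parquet scan):

    use std::sync::Arc;

    use arrow::array::{ArrayRef, AsArray, Float64Array, Int32Array};
    use arrow::datatypes::{Float64Type, Int32Type};
    use arrow::record_batch::RecordBatch;

    fn main() {
        let ints: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let floats: ArrayRef = Arc::new(Float64Array::from(vec![1.5, 2.5, 3.5]));
        let batch =
            RecordBatch::try_from_iter(vec![("int_col", ints), ("double_col", floats)])
                .unwrap();

        // Downcast each column once, then iterate the native values.
        // `values()` ignores the null bitmap, which is fine here because the
        // columns are built without nulls.
        let int_col = batch.column(0).as_primitive::<Int32Type>();
        let double_col = batch.column(1).as_primitive::<Float64Type>();
        for (i, f) in int_col.values().iter().zip(double_col.values()) {
            println!("int_col={i} double_col={f}");
        }
    }
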
diff --git a/datafusion-examples/examples/flight/flight_server.rs b/datafusion-examples/examples/flight/flight_server.rs
index cb7b7c28d909..f9d1b8029f04 100644
--- a/datafusion-examples/examples/flight/flight_server.rs
+++ b/datafusion-examples/examples/flight/flight_server.rs
@@ -18,7 +18,7 @@
 use arrow::ipc::writer::{DictionaryTracker, IpcDataGenerator};
 use std::sync::Arc;
 
-use arrow_flight::SchemaAsIpc;
+use arrow_flight::{PollInfo, SchemaAsIpc};
 use datafusion::arrow::error::ArrowError;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::{ListingOptions, ListingTableUrl};
@@ -177,6 +177,13 @@ impl FlightService for FlightServiceImpl {
     ) -> Result<Response<Self::DoExchangeStream>, Status> {
         Err(Status::unimplemented("Not yet implemented"))
     }
+
+    async fn poll_flight_info(
+        &self,
+        _request: Request<FlightDescriptor>,
+    ) -> Result<Response<PollInfo>, Status> {
+        Err(Status::unimplemented("Not yet implemented"))
+    }
 }
 
 fn to_tonic_err(e: datafusion::error::DataFusionError) -> Status {
diff --git a/datafusion-examples/examples/flight/flight_sql_server.rs b/datafusion-examples/examples/flight/flight_sql_server.rs
index 35d475623062..ed9457643b7d 100644
--- a/datafusion-examples/examples/flight/flight_sql_server.rs
+++ b/datafusion-examples/examples/flight/flight_sql_server.rs
@@ -307,6 +307,8 @@ impl FlightSqlService for FlightSqlServiceImpl {
         let endpoint = FlightEndpoint {
             ticket: Some(ticket),
             location: vec![],
+            expiration_time: None,
+            app_metadata: Default::default(),
         };
         let endpoints = vec![endpoint];
 
@@ -329,6 +331,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
             total_records: -1_i64,
             total_bytes: -1_i64,
             ordered: false,
+            app_metadata: Default::default(),
         };
         let resp = Response::new(info);
         Ok(resp)
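Note: arrow-flight 51 extends the Flight protocol surface: the `FlightService` trait gains `poll_flight_info` (hence the new stub above), and the generated `FlightEndpoint`/`FlightInfo` messages gain `expiration_time` and `app_metadata` fields. The patch fills the new fields in explicitly; a sketch of an alternative (not what this PR does) that keeps such literals compiling across field additions is to lean on the prost-derived `Default`:

    use arrow_flight::{FlightEndpoint, Ticket};

    fn endpoint_for(ticket: Ticket) -> FlightEndpoint {
        FlightEndpoint {
            ticket: Some(ticket),
            location: vec![],
            // Fields added in arrow-flight 51 (and any added later) keep
            // their prost defaults.
            ..Default::default()
        }
    }
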
diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs
index e8a350e8d389..28e73ba48f53 100644
--- a/datafusion/common/src/file_options/parquet_writer.rs
+++ b/datafusion/common/src/file_options/parquet_writer.rs
@@ -156,6 +156,7 @@ pub(crate) fn parse_encoding_string(
         "plain" => Ok(parquet::basic::Encoding::PLAIN),
         "plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY),
         "rle" => Ok(parquet::basic::Encoding::RLE),
+        #[allow(deprecated)]
        "bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED),
         "delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED),
         "delta_length_byte_array" => {
diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs
index 5ace44f24b69..1f9e72ce57ac 100644
--- a/datafusion/common/src/scalar/mod.rs
+++ b/datafusion/common/src/scalar/mod.rs
@@ -1650,7 +1650,11 @@
             | DataType::Duration(_)
             | DataType::Union(_, _)
             | DataType::Map(_, _)
-            | DataType::RunEndEncoded(_, _) => {
+            | DataType::RunEndEncoded(_, _)
+            | DataType::Utf8View
+            | DataType::BinaryView
+            | DataType::ListView(_)
+            | DataType::LargeListView(_) => {
                 return _internal_err!(
                     "Unsupported creation of {:?} array from ScalarValue {:?}",
                     data_type,
@@ -5769,7 +5773,7 @@ mod tests {
         let batch = RecordBatch::try_from_iter(vec![("s", arr as _)]).unwrap();
 
         #[rustfmt::skip]
-        let expected = [
+        let expected = [
             "+---+",
             "| s |",
             "+---+",
@@ -5803,7 +5807,7 @@
             &DataType::List(Arc::new(Field::new(
                 "item",
                 DataType::Timestamp(TimeUnit::Millisecond, Some(s.into())),
-                true
+                true,
             )))
         );
     }
diff --git a/datafusion/core/src/datasource/avro_to_arrow/schema.rs b/datafusion/core/src/datasource/avro_to_arrow/schema.rs
index 761e6b62680f..039a6aacc07e 100644
--- a/datafusion/core/src/datasource/avro_to_arrow/schema.rs
+++ b/datafusion/core/src/datasource/avro_to_arrow/schema.rs
@@ -224,6 +224,12 @@ fn default_field_name(dt: &DataType) -> &str {
         DataType::RunEndEncoded(_, _) => {
             unimplemented!("RunEndEncoded support not implemented")
         }
+        DataType::Utf8View
+        | DataType::BinaryView
+        | DataType::ListView(_)
+        | DataType::LargeListView(_) => {
+            unimplemented!("View support not implemented")
+        }
         DataType::Decimal128(_, _) => "decimal",
         DataType::Decimal256(_, _) => "decimal",
     }
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index c04c536e7ca6..b7626d41f4dd 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -78,9 +78,6 @@ use hashbrown::HashMap;
 use object_store::path::Path;
 use object_store::{ObjectMeta, ObjectStore};
 
-/// Size of the buffer for [`AsyncArrowWriter`].
-const PARQUET_WRITER_BUFFER_SIZE: usize = 10485760;
-
 /// Initial writing buffer size. Note this is just a size hint for efficiency. It
 /// will grow beyond the set value if needed.
 const INITIAL_BUFFER_BYTES: usize = 1048576;
@@ -626,7 +623,6 @@ impl ParquetSink {
         let writer = AsyncArrowWriter::try_new(
             multipart_writer,
             self.get_writer_schema(),
-            PARQUET_WRITER_BUFFER_SIZE,
             Some(parquet_props),
         )?;
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index a2e645cf3e72..282cd624d036 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -701,12 +701,8 @@ pub async fn plan_to_parquet(
             let (_, multipart_writer) = storeref.put_multipart(&file).await?;
             let mut stream = plan.execute(i, task_ctx.clone())?;
             join_set.spawn(async move {
-                let mut writer = AsyncArrowWriter::try_new(
-                    multipart_writer,
-                    plan.schema(),
-                    10485760,
-                    propclone,
-                )?;
+                let mut writer =
+                    AsyncArrowWriter::try_new(multipart_writer, plan.schema(), propclone)?;
                 while let Some(next_batch) = stream.next().await {
                     let batch = next_batch?;
                     writer.write(&batch).await?;
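Note: both writer call sites above drop an argument because parquet 51 changed `AsyncArrowWriter::try_new` to take just the sink, the schema, and optional writer properties; the explicit 10 MB buffer-size hint (`10485760`, the deleted `PARQUET_WRITER_BUFFER_SIZE`) has no replacement since the writer now manages its own buffering. A standalone sketch of the new call shape, assuming the parquet crate's `arrow` and `async` features plus tokio (path and data invented):

    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int32Array};
    use arrow::record_batch::RecordBatch;
    use parquet::arrow::AsyncArrowWriter;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_from_iter(vec![("x", col)])?;

        // Any sink implementing tokio's AsyncWrite (+ Unpin + Send) works.
        let file = tokio::fs::File::create("/tmp/example.parquet").await?;
        let mut writer = AsyncArrowWriter::try_new(file, batch.schema(), None)?;
        writer.write(&batch).await?;
        writer.close().await?;
        Ok(())
    }
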
diff --git a/datafusion/functions/src/datetime/date_part.rs b/datafusion/functions/src/datetime/date_part.rs
index 1f00f5bc3137..5d2719bf0365 100644
--- a/datafusion/functions/src/datetime/date_part.rs
+++ b/datafusion/functions/src/datetime/date_part.rs
@@ -18,16 +18,14 @@
 use std::any::Any;
 use std::sync::Arc;
 
-use arrow::array::types::ArrowTemporalType;
-use arrow::array::{Array, ArrayRef, ArrowNumericType, Float64Array, PrimitiveArray};
-use arrow::compute::cast;
-use arrow::compute::kernels::temporal;
+use arrow::array::{Array, ArrayRef, Float64Array};
+use arrow::compute::{binary, cast, date_part, DatePart};
 use arrow::datatypes::DataType::{Date32, Date64, Float64, Timestamp, Utf8};
 use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
 use arrow::datatypes::{DataType, TimeUnit};
 
 use datafusion_common::cast::{
-    as_date32_array, as_date64_array, as_timestamp_microsecond_array,
+    as_date32_array, as_date64_array, as_int32_array, as_timestamp_microsecond_array,
     as_timestamp_millisecond_array, as_timestamp_nanosecond_array,
     as_timestamp_second_array,
 };
@@ -78,46 +76,6 @@ impl DatePartFunc {
     }
 }
 
-macro_rules! extract_date_part {
-    ($ARRAY: expr, $FN:expr) => {
-        match $ARRAY.data_type() {
-            DataType::Date32 => {
-                let array = as_date32_array($ARRAY)?;
-                Ok($FN(array)
-                    .map(|v| cast(&(Arc::new(v) as ArrayRef), &DataType::Float64))?)
-            }
-            DataType::Date64 => {
-                let array = as_date64_array($ARRAY)?;
-                Ok($FN(array)
-                    .map(|v| cast(&(Arc::new(v) as ArrayRef), &DataType::Float64))?)
-            }
-            DataType::Timestamp(time_unit, _) => match time_unit {
-                TimeUnit::Second => {
-                    let array = as_timestamp_second_array($ARRAY)?;
-                    Ok($FN(array)
-                        .map(|v| cast(&(Arc::new(v) as ArrayRef), &DataType::Float64))?)
-                }
-                TimeUnit::Millisecond => {
-                    let array = as_timestamp_millisecond_array($ARRAY)?;
-                    Ok($FN(array)
-                        .map(|v| cast(&(Arc::new(v) as ArrayRef), &DataType::Float64))?)
-                }
-                TimeUnit::Microsecond => {
-                    let array = as_timestamp_microsecond_array($ARRAY)?;
-                    Ok($FN(array)
-                        .map(|v| cast(&(Arc::new(v) as ArrayRef), &DataType::Float64))?)
-                }
-                TimeUnit::Nanosecond => {
-                    let array = as_timestamp_nanosecond_array($ARRAY)?;
-                    Ok($FN(array)
-                        .map(|v| cast(&(Arc::new(v) as ArrayRef), &DataType::Float64))?)
-                }
-            },
-            datatype => exec_err!("Extract does not support datatype {:?}", datatype),
-        }
-    };
-}
-
 impl ScalarUDFImpl for DatePartFunc {
     fn as_any(&self) -> &dyn Any {
         self
@@ -139,16 +97,15 @@
         if args.len() != 2 {
             return exec_err!("Expected two arguments in DATE_PART");
         }
-        let (date_part, array) = (&args[0], &args[1]);
+        let (part, array) = (&args[0], &args[1]);
 
-        let date_part =
-            if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = date_part {
-                v
-            } else {
-                return exec_err!(
-                    "First argument of `DATE_PART` must be non-null scalar Utf8"
-                );
-            };
+        let part = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = part {
+            v
+        } else {
+            return exec_err!(
+                "First argument of `DATE_PART` must be non-null scalar Utf8"
+            );
+        };
 
         let is_scalar = matches!(array, ColumnarValue::Scalar(_));
 
@@ -157,28 +114,28 @@
             ColumnarValue::Scalar(scalar) => scalar.to_array()?,
         };
 
-        let arr = match date_part.to_lowercase().as_str() {
-            "year" => extract_date_part!(&array, temporal::year),
-            "quarter" => extract_date_part!(&array, temporal::quarter),
-            "month" => extract_date_part!(&array, temporal::month),
-            "week" => extract_date_part!(&array, temporal::week),
-            "day" => extract_date_part!(&array, temporal::day),
-            "doy" => extract_date_part!(&array, temporal::doy),
-            "dow" => extract_date_part!(&array, temporal::num_days_from_sunday),
-            "hour" => extract_date_part!(&array, temporal::hour),
-            "minute" => extract_date_part!(&array, temporal::minute),
-            "second" => extract_date_part!(&array, seconds),
-            "millisecond" => extract_date_part!(&array, millis),
-            "microsecond" => extract_date_part!(&array, micros),
-            "nanosecond" => extract_date_part!(&array, nanos),
-            "epoch" => extract_date_part!(&array, epoch),
-            _ => exec_err!("Date part '{date_part}' not supported"),
-        }?;
+        let arr = match part.to_lowercase().as_str() {
+            "year" => date_part_f64(array.as_ref(), DatePart::Year)?,
+            "quarter" => date_part_f64(array.as_ref(), DatePart::Quarter)?,
+            "month" => date_part_f64(array.as_ref(), DatePart::Month)?,
+            "week" => date_part_f64(array.as_ref(), DatePart::Week)?,
+            "day" => date_part_f64(array.as_ref(), DatePart::Day)?,
+            "doy" => date_part_f64(array.as_ref(), DatePart::DayOfYear)?,
+            "dow" => date_part_f64(array.as_ref(), DatePart::DayOfWeekSunday0)?,
+            "hour" => date_part_f64(array.as_ref(), DatePart::Hour)?,
+            "minute" => date_part_f64(array.as_ref(), DatePart::Minute)?,
+            "second" => seconds(array.as_ref(), Second)?,
+            "millisecond" => seconds(array.as_ref(), Millisecond)?,
+            "microsecond" => seconds(array.as_ref(), Microsecond)?,
+            "nanosecond" => seconds(array.as_ref(), Nanosecond)?,
+            "epoch" => epoch(array.as_ref())?,
+            _ => return exec_err!("Date part '{part}' not supported"),
+        };
 
         Ok(if is_scalar {
-            ColumnarValue::Scalar(ScalarValue::try_from_array(&arr?, 0)?)
+            ColumnarValue::Scalar(ScalarValue::try_from_array(arr.as_ref(), 0)?)
         } else {
-            ColumnarValue::Array(arr?)
+            ColumnarValue::Array(arr)
         })
     }
 
@@ -187,83 +144,52 @@
 }
 
-fn to_ticks<T>(array: &PrimitiveArray<T>, frac: i32) -> Result<Float64Array>
-where
-    T: ArrowTemporalType + ArrowNumericType,
-    i64: From<T::Native>,
-{
-    let zipped = temporal::second(array)?
-        .values()
-        .iter()
-        .zip(temporal::nanosecond(array)?.values().iter())
-        .map(|o| (*o.0 as f64 + (*o.1 as f64) / 1_000_000_000.0) * (frac as f64))
-        .collect::<Vec<f64>>();
-
-    Ok(Float64Array::from(zipped))
+/// Invoke [`date_part`] and cast the result to Float64
+fn date_part_f64(array: &dyn Array, part: DatePart) -> Result<ArrayRef> {
+    Ok(cast(date_part(array, part)?.as_ref(), &Float64)?)
 }
 
-fn seconds<T>(array: &PrimitiveArray<T>) -> Result<Float64Array>
-where
-    T: ArrowTemporalType + ArrowNumericType,
-    i64: From<T::Native>,
-{
-    to_ticks(array, 1)
-}
-
-fn millis<T>(array: &PrimitiveArray<T>) -> Result<Float64Array>
-where
-    T: ArrowTemporalType + ArrowNumericType,
-    i64: From<T::Native>,
-{
-    to_ticks(array, 1_000)
-}
-
-fn micros<T>(array: &PrimitiveArray<T>) -> Result<Float64Array>
-where
-    T: ArrowTemporalType + ArrowNumericType,
-    i64: From<T::Native>,
-{
-    to_ticks(array, 1_000_000)
+/// invoke [`date_part`] on an `array` (e.g. Timestamp) and convert the
+/// result to a total number of seconds, milliseconds, microseconds or
+/// nanoseconds
+///
+/// # Panics
+/// If `array` is not a temporal type such as Timestamp or Date32
+fn seconds(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
+    let sf = match unit {
+        Second => 1_f64,
+        Millisecond => 1_000_f64,
+        Microsecond => 1_000_000_f64,
+        Nanosecond => 1_000_000_000_f64,
+    };
+    let secs = date_part(array, DatePart::Second)?;
+    let secs = as_int32_array(secs.as_ref())?;
+    let subsecs = date_part(array, DatePart::Nanosecond)?;
+    let subsecs = as_int32_array(subsecs.as_ref())?;
+
+    let r: Float64Array = binary(secs, subsecs, |secs, subsecs| {
+        (secs as f64 + (subsecs as f64 / 1_000_000_000_f64)) * sf
+    })?;
+    Ok(Arc::new(r))
 }
 
-fn nanos<T>(array: &PrimitiveArray<T>) -> Result<Float64Array>
-where
-    T: ArrowTemporalType + ArrowNumericType,
-    i64: From<T::Native>,
-{
-    to_ticks(array, 1_000_000_000)
-}
+fn epoch(array: &dyn Array) -> Result<ArrayRef> {
+    const SECONDS_IN_A_DAY: f64 = 86400_f64;
 
-fn epoch<T>(array: &PrimitiveArray<T>) -> Result<Float64Array>
-where
-    T: ArrowTemporalType + ArrowNumericType,
-    i64: From<T::Native>,
-{
-    let b = match array.data_type() {
-        Timestamp(tu, _) => {
-            let scale = match tu {
-                Second => 1,
-                Millisecond => 1_000,
-                Microsecond => 1_000_000,
-                Nanosecond => 1_000_000_000,
-            } as f64;
-            array.unary(|n| {
-                let n: i64 = n.into();
-                n as f64 / scale
-            })
+    let f: Float64Array = match array.data_type() {
+        Timestamp(Second, _) => as_timestamp_second_array(array)?.unary(|x| x as f64),
+        Timestamp(Millisecond, _) => {
+            as_timestamp_millisecond_array(array)?.unary(|x| x as f64 / 1_000_f64)
+        }
+        Timestamp(Microsecond, _) => {
+            as_timestamp_microsecond_array(array)?.unary(|x| x as f64 / 1_000_000_f64)
         }
-        Date32 => {
-            let seconds_in_a_day = 86400_f64;
-            array.unary(|n| {
-                let n: i64 = n.into();
-                n as f64 * seconds_in_a_day
-            })
+        Timestamp(Nanosecond, _) => {
+            as_timestamp_nanosecond_array(array)?.unary(|x| x as f64 / 1_000_000_000_f64)
         }
-        Date64 => array.unary(|n| {
-            let n: i64 = n.into();
-            n as f64 / 1_000_f64
-        }),
-        _ => return exec_err!("Can not convert {:?} to epoch", array.data_type()),
+        Date32 => as_date32_array(array)?.unary(|x| x as f64 * SECONDS_IN_A_DAY),
+        Date64 => as_date64_array(array)?.unary(|x| x as f64 / 1_000_f64),
+        d => return exec_err!("Can not convert {d:?} to epoch"),
     };
-    Ok(b)
+    Ok(Arc::new(f))
 }
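Note: the rewrite above is enabled by arrow 51's unified temporal kernel: `arrow::compute::date_part` accepts any temporal array as `&dyn Array` plus a `DatePart` variant, so the old `extract_date_part!` macro (one monomorphized call per concrete array type) can go away. A standalone sketch of the kernel, with an invented timestamp value:

    use arrow::array::{Array, TimestampMillisecondArray};
    use arrow::compute::{date_part, DatePart};
    use arrow::error::ArrowError;

    fn main() -> Result<(), ArrowError> {
        // 2021-01-01T00:00:01.500Z, in milliseconds since the Unix epoch.
        let ts = TimestampMillisecondArray::from(vec![1_609_459_201_500]);

        // The same call handles Date32, Date64, and every Timestamp unit;
        // here it yields an Int32Array of years, returned as an ArrayRef.
        let years = date_part(&ts, DatePart::Year)?;
        assert_eq!(years.len(), 1);
        println!("{years:?}");
        Ok(())
    }
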
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs
index 478f7c779552..92015594906b 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -230,6 +230,9 @@
                     "Proto serialization error: The RunEndEncoded data type is not yet supported".to_owned()
                 ))
             }
+            DataType::Utf8View | DataType::BinaryView | DataType::ListView(_) | DataType::LargeListView(_) => {
+                return Err(Error::General(format!("Proto serialization error: {val} not yet supported")))
+            }
         };
 
         Ok(res)
diff --git a/datafusion/sql/src/unparser/expr.rs b/datafusion/sql/src/unparser/expr.rs
index 9680177d736f..c26e8481ce43 100644
--- a/datafusion/sql/src/unparser/expr.rs
+++ b/datafusion/sql/src/unparser/expr.rs
@@ -491,11 +491,15 @@
             DataType::Binary => todo!(),
             DataType::FixedSizeBinary(_) => todo!(),
             DataType::LargeBinary => todo!(),
+            DataType::BinaryView => todo!(),
             DataType::Utf8 => Ok(ast::DataType::Varchar(None)),
             DataType::LargeUtf8 => Ok(ast::DataType::Text),
+            DataType::Utf8View => todo!(),
             DataType::List(_) => todo!(),
             DataType::FixedSizeList(_, _) => todo!(),
             DataType::LargeList(_) => todo!(),
+            DataType::ListView(_) => todo!(),
+            DataType::LargeListView(_) => todo!(),
             DataType::Struct(_) => todo!(),
             DataType::Union(_, _) => todo!(),
             DataType::Dictionary(_, _) => todo!(),
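Note: the new match arms in ScalarValue, the Avro schema mapping, proto serialization, and the SQL unparser all exist because the upgrade surfaces `DataType::Utf8View`, `DataType::BinaryView`, `DataType::ListView`, and `DataType::LargeListView` in exhaustive matches. Spelling the variants out rather than adding a `_` wildcard keeps future `DataType` additions showing up as compile errors instead of silent fallthroughs; a small sketch of the pattern (helper name invented):

    use arrow::datatypes::DataType;

    /// True for the view-layout types surfaced by arrow 51, none of which
    /// these DataFusion call sites support yet.
    fn is_unsupported_view_type(dt: &DataType) -> bool {
        matches!(
            dt,
            DataType::Utf8View
                | DataType::BinaryView
                | DataType::ListView(_)
                | DataType::LargeListView(_)
        )
    }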