Skip to content

Commit

Permalink
Support to_timestamp with chrono formatting apache#5398
Browse files Browse the repository at this point in the history
  • Loading branch information
Omega359 committed Jan 16, 2024
1 parent a87e19e commit c5e230a
Show file tree
Hide file tree
Showing 6 changed files with 825 additions and 136 deletions.
90 changes: 90 additions & 0 deletions datafusion-examples/examples/dataframe_to_timestamp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use datafusion::arrow::array::StringArray;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::*;

/// This example demonstrates how to use the DataFrame API against in-memory data.
#[tokio::main]
async fn main() -> Result<()> {
// define a schema.
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Utf8, false),
]));

// define data.
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(vec!["2020-09-08T13:42:29Z", "2020-09-08T13:42:29.190855-05:00", "2020-08-09 12:13:29", "2020-01-02"])),
Arc::new(StringArray::from(vec!["2020-09-08T13:42:29Z", "2020-09-08T13:42:29.190855-05:00", "08-09-2020 13/42/29", "09-27-2020 13:42:29-05:30"])),
],
)?;

// declare a new context. In spark API, this corresponds to a new spark SQLsession
let ctx = SessionContext::new();

// declare a table in memory. In spark API, this corresponds to createDataFrame(...).
ctx.register_batch("t", batch)?;
let df = ctx.table("t").await?;

// use to_timestamp function to convert col 'a' to timestamp type using the default parsing
let df = df.with_column("a", to_timestamp(vec![col("a")]))?;
// use to_timestamp_seconds function to convert col 'b' to timestamp(Seconds) type using a list of chrono formats to try
let df = df.with_column("b", to_timestamp_seconds(vec![col("b"), lit("%+"), lit("%d-%m-%Y %H/%M/%S"), lit("%m-%d-%Y %H:%M:%S%#z")]))?;

let df = df.select_columns(&["a", "b"])?;

// print the results
df.show().await?;

// use sql to convert col 'a' to timestamp using the default parsing
let df = ctx.sql("select to_timestamp(a) from t").await?;

// print the results
df.show().await?;

// use sql to convert col 'b' to timestamp using a list of chrono formats to try
let df = ctx.sql("select to_timestamp(b, '%+', '%d-%m-%Y %H/%M/%S', '%m-%d-%Y %H:%M:%S%#z') from t").await?;

// print the results
df.show().await?;

// use sql to convert a static string to a timestamp using a list of chrono formats to try
let df = ctx.sql("select to_timestamp('01-14-2023 01:01:30+05:30', '%+', '%d-%m-%Y %H/%M/%S', '%m-%d-%Y %H:%M:%S%#z')").await?;

// print the results
df.show().await?;

// use sql to convert a static string to a timestamp using a non-matching chrono format to try
let result = ctx.sql("select to_timestamp('01-14-2023 01/01/30', '%d-%m-%Y %H:%M:%S')").await?.collect().await;

if result.is_err() {
println!("Received the expected error: {:?}", result.err().unwrap());
}
else {
panic!("timestamp parsing with no matching formats should fail")
}

Ok(())
}
68 changes: 7 additions & 61 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1053,67 +1053,13 @@ impl BuiltinScalarFunction {
vec![Exact(vec![Utf8, Int64]), Exact(vec![LargeUtf8, Int64])],
self.volatility(),
),
BuiltinScalarFunction::ToTimestamp => Signature::uniform(
1,
vec![
Int64,
Float64,
Timestamp(Nanosecond, None),
Timestamp(Microsecond, None),
Timestamp(Millisecond, None),
Timestamp(Second, None),
Utf8,
],
self.volatility(),
),
BuiltinScalarFunction::ToTimestampMillis => Signature::uniform(
1,
vec![
Int64,
Timestamp(Nanosecond, None),
Timestamp(Microsecond, None),
Timestamp(Millisecond, None),
Timestamp(Second, None),
Utf8,
],
self.volatility(),
),
BuiltinScalarFunction::ToTimestampMicros => Signature::uniform(
1,
vec![
Int64,
Timestamp(Nanosecond, None),
Timestamp(Microsecond, None),
Timestamp(Millisecond, None),
Timestamp(Second, None),
Utf8,
],
self.volatility(),
),
BuiltinScalarFunction::ToTimestampNanos => Signature::uniform(
1,
vec![
Int64,
Timestamp(Nanosecond, None),
Timestamp(Microsecond, None),
Timestamp(Millisecond, None),
Timestamp(Second, None),
Utf8,
],
self.volatility(),
),
BuiltinScalarFunction::ToTimestampSeconds => Signature::uniform(
1,
vec![
Int64,
Timestamp(Nanosecond, None),
Timestamp(Microsecond, None),
Timestamp(Millisecond, None),
Timestamp(Second, None),
Utf8,
],
self.volatility(),
),
BuiltinScalarFunction::ToTimestamp
| BuiltinScalarFunction::ToTimestampSeconds
| BuiltinScalarFunction::ToTimestampMillis
| BuiltinScalarFunction::ToTimestampMicros
| BuiltinScalarFunction::ToTimestampNanos => {
Signature::variadic_any(self.volatility())
},
BuiltinScalarFunction::FromUnixtime => {
Signature::uniform(1, vec![Int64], self.volatility())
}
Expand Down
17 changes: 9 additions & 8 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,28 +885,29 @@ nary_scalar_expr!(
scalar_expr!(DatePart, date_part, part date, "extracts a subfield from the date");
scalar_expr!(DateTrunc, date_trunc, part date, "truncates the date to a specified level of precision");
scalar_expr!(DateBin, date_bin, stride source origin, "coerces an arbitrary timestamp to the start of the nearest specified interval");
scalar_expr!(
nary_scalar_expr!(
ToTimestamp,
to_timestamp,
"converts a string to a `Timestamp(Nanoseconds, None)`"
);
nary_scalar_expr!(
ToTimestampMillis,
to_timestamp_millis,
date,
"converts a string to a `Timestamp(Milliseconds, None)`"
);
scalar_expr!(
nary_scalar_expr!(
ToTimestampMicros,
to_timestamp_micros,
date,
"converts a string to a `Timestamp(Microseconds, None)`"
);
scalar_expr!(
nary_scalar_expr!(
ToTimestampNanos,
to_timestamp_nanos,
date,
"converts a string to a `Timestamp(Nanoseconds, None)`"
);
scalar_expr!(
nary_scalar_expr!(
ToTimestampSeconds,
to_timestamp_seconds,
date,
"converts a string to a `Timestamp(Seconds, None)`"
);
scalar_expr!(
Expand Down
Loading

0 comments on commit c5e230a

Please sign in to comment.