Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: bson timestamp type casting #2593

Merged
merged 2 commits into from
Feb 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 67 additions & 5 deletions crates/datasources/src/bson/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use datafusion::arrow::array::{
StructArray,
TimestampMicrosecondBuilder,
TimestampMillisecondBuilder,
TimestampNanosecondBuilder,
TimestampSecondBuilder,
};
use datafusion::arrow::datatypes::{DataType, Field, Fields, TimeUnit};
Expand Down Expand Up @@ -331,7 +332,10 @@ fn append_value(val: RawBsonRef, typ: &DataType, col: &mut dyn ArrayBuilder) ->
)
}

// Datetime (actual timestamps that you'd actually use. in an application )
// Datetime (actual timestamps that you'd actually use in an application)
(RawBsonRef::DateTime(v), DataType::Timestamp(TimeUnit::Second, _)) => {
append_scalar!(TimestampSecondBuilder, col, v.timestamp_millis() / 1000)
}
(RawBsonRef::DateTime(v), DataType::Timestamp(TimeUnit::Millisecond, _)) => {
append_scalar!(TimestampMillisecondBuilder, col, v.timestamp_millis())
}
Expand All @@ -342,9 +346,19 @@ fn append_value(val: RawBsonRef, typ: &DataType, col: &mut dyn ArrayBuilder) ->
v.timestamp_millis() * 1000
)
}
(RawBsonRef::DateTime(v), DataType::Timestamp(TimeUnit::Nanosecond, _)) => {
append_scalar!(
TimestampNanosecondBuilder,
col,
v.timestamp_millis() * 1000 * 1000
)
}
(RawBsonRef::DateTime(v), DataType::Date64) => {
append_scalar!(Date64Builder, col, v.timestamp_millis())
}
(RawBsonRef::DateTime(v), DataType::Date32) => {
append_scalar!(Date32Builder, col, (v.timestamp_millis() / 1000) as i32)
}

// Document
(RawBsonRef::Document(nested), DataType::Struct(_)) => {
Expand Down Expand Up @@ -411,21 +425,56 @@ fn append_null(typ: &DataType, col: &mut dyn ArrayBuilder) -> Result<()> {
.downcast_mut::<Float64Builder>()
.unwrap()
.append_null(),
&DataType::Timestamp(_, _) => col
&DataType::Timestamp(TimeUnit::Nanosecond, _) => col
.as_any_mut()
.downcast_mut::<TimestampNanosecondBuilder>()
.unwrap()
.append_null(),
&DataType::Timestamp(TimeUnit::Microsecond, _) => col
.as_any_mut()
.downcast_mut::<TimestampMillisecondBuilder>()
.unwrap()
.append_null(),
&DataType::Timestamp(TimeUnit::Millisecond, _) => col
.as_any_mut()
.downcast_mut::<TimestampMillisecondBuilder>()
.unwrap()
.append_null(),
&DataType::Timestamp(TimeUnit::Second, _) => col
.as_any_mut()
.downcast_mut::<TimestampMillisecondBuilder>() // TODO: Possibly change to nanosecond.
.downcast_mut::<TimestampSecondBuilder>()
.unwrap()
.append_null(),
&DataType::Date64 => col
.as_any_mut()
.downcast_mut::<Date64Builder>()
.unwrap()
.append_null(),
&DataType::Date32 => col
.as_any_mut()
.downcast_mut::<Date32Builder>()
.unwrap()
.append_null(),
&DataType::Utf8 => col
.as_any_mut()
.downcast_mut::<StringBuilder>()
.unwrap()
.append_null(),
&DataType::LargeUtf8 => col
.as_any_mut()
.downcast_mut::<LargeStringBuilder>()
.unwrap()
.append_null(),
&DataType::Binary => col
.as_any_mut()
.downcast_mut::<BinaryBuilder>()
.unwrap()
.append_null(),
&DataType::LargeBinary => col
.as_any_mut()
.downcast_mut::<LargeBinaryBuilder>()
.unwrap()
.append_null(),
&DataType::Struct(_) => col
.as_any_mut()
.downcast_mut::<RecordStructBuilder>()
Expand Down Expand Up @@ -453,11 +502,24 @@ fn column_builders_for_fields(
DataType::Int32 => Box::new(Int32Builder::with_capacity(capacity)),
DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)),
DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)),
DataType::Timestamp(_, _) => {
Box::new(TimestampMicrosecondBuilder::with_capacity(capacity)) // TODO: Possibly change to nanosecond.
DataType::Timestamp(TimeUnit::Second, _) => {
Box::new(TimestampSecondBuilder::with_capacity(capacity))
}
DataType::Timestamp(TimeUnit::Microsecond, _) => {
Box::new(TimestampMicrosecondBuilder::with_capacity(capacity))
}
DataType::Timestamp(TimeUnit::Millisecond, _) => {
Box::new(TimestampMillisecondBuilder::with_capacity(capacity))
}
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
Box::new(TimestampNanosecondBuilder::with_capacity(capacity))
}
DataType::Date64 => Box::new(Date64Builder::with_capacity(capacity)),
DataType::Date32 => Box::new(Date32Builder::with_capacity(capacity)),
DataType::Utf8 => Box::new(StringBuilder::with_capacity(capacity, 10)), // TODO: Can collect avg when inferring schema.
DataType::LargeUtf8 => Box::new(LargeStringBuilder::with_capacity(capacity, 10)), // TODO: Can collect avg when inferring schema.
DataType::Binary => Box::new(BinaryBuilder::with_capacity(capacity, 10)), // TODO: Can collect avg when inferring schema.
DataType::LargeBinary => Box::new(LargeBinaryBuilder::with_capacity(capacity, 10)), // TODO: Can collect avg when inferring schema.
DataType::Decimal128(_, _) => Box::new(Decimal128Builder::with_capacity(capacity)), // TODO: Can collect avg when inferring schema.
DataType::Struct(fields) => {
let nested = column_builders_for_fields(fields.clone(), capacity)?;
Expand Down
44 changes: 25 additions & 19 deletions crates/datasources/src/bson/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod schema;
pub mod stream;
pub mod table;

use bson::DateTime;
use datafusion::arrow::array::cast::as_string_array;
use datafusion::arrow::array::types::{
Date32Type,
Expand Down Expand Up @@ -36,7 +37,13 @@ use datafusion::arrow::array::types::{
UInt8Type,
};
use datafusion::arrow::array::{Array, AsArray, StructArray};
use datafusion::arrow::datatypes::{DataType, Fields, IntervalUnit, TimeUnit};
use datafusion::arrow::datatypes::{
DataType,
Fields,
IntervalUnit,
TimeUnit,
TimestampNanosecondType,
};
use datafusion::arrow::error::ArrowError;

pub struct BsonBatchConverter {
Expand Down Expand Up @@ -193,14 +200,16 @@ pub fn array_to_bson(array: &dyn Array) -> Result<Vec<bson::Bson>, ArrowError> {
}))
})
}
DataType::Date32 => array
.as_primitive::<Date32Type>()
.iter()
.for_each(|val| out.push(bson::Bson::Int32(val.unwrap_or_default()))),
DataType::Date64 => array
.as_primitive::<Date64Type>()
.iter()
.for_each(|val| out.push(bson::Bson::Int64(val.unwrap_or_default()))),
DataType::Date64 => array.as_primitive::<Date64Type>().iter().for_each(|val| {
out.push(bson::Bson::DateTime(DateTime::from_millis(
val.unwrap_or_default(),
)))
}),
DataType::Date32 => array.as_primitive::<Date32Type>().iter().for_each(|val| {
out.push(bson::Bson::DateTime(DateTime::from_millis(
(val.unwrap_or_default() / 1000) as i64,
)))
}),
DataType::Interval(IntervalUnit::DayTime) => array
.as_primitive::<IntervalDayTimeType>()
.iter()
Expand Down Expand Up @@ -235,11 +244,11 @@ pub fn array_to_bson(array: &dyn Array) -> Result<Vec<bson::Bson>, ArrowError> {
.iter()
.for_each(|val| {
out.push(bson::Bson::DateTime(bson::datetime::DateTime::from_millis(
val.unwrap_or_default() / 100,
val.unwrap_or_default() / 1000,
)))
}),
DataType::Timestamp(TimeUnit::Nanosecond, _) => array
.as_primitive::<TimestampMicrosecondType>()
.as_primitive::<TimestampNanosecondType>()
.iter()
.for_each(|val| {
out.push(bson::Bson::DateTime(bson::datetime::DateTime::from_millis(
Expand All @@ -254,14 +263,6 @@ pub fn array_to_bson(array: &dyn Array) -> Result<Vec<bson::Bson>, ArrowError> {
.as_primitive::<Time32MillisecondType>()
.iter()
.for_each(|val| out.push(bson::Bson::Int32(val.unwrap_or_default()))),
DataType::Time32(TimeUnit::Nanosecond)
| DataType::Time32(TimeUnit::Microsecond)
| DataType::Time64(TimeUnit::Second)
| DataType::Time64(TimeUnit::Millisecond) => {
return Err(ArrowError::CastError(
"unreasonable time value conversion BSON".to_string(),
))
}
DataType::Time64(TimeUnit::Microsecond) => array
.as_primitive::<Time64MicrosecondType>()
.iter()
Expand All @@ -270,6 +271,11 @@ pub fn array_to_bson(array: &dyn Array) -> Result<Vec<bson::Bson>, ArrowError> {
.as_primitive::<Time64NanosecondType>()
.iter()
.for_each(|val| out.push(bson::Bson::Int64(val.unwrap_or_default()))),
DataType::Time32(_) | DataType::Time64(_) => {
return Err(ArrowError::CastError(
"unreasonable time value conversion BSON".to_string(),
))
}
DataType::Duration(TimeUnit::Second) => array
.as_primitive::<DurationSecondType>()
.iter()
Expand Down
33 changes: 33 additions & 0 deletions testdata/sqllogictests_object_store/local/bson.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
statement ok
COPY (select * from '${PWD}/testdata/parquet/userdata1.parquet') to '${TMP}/userdata1.bson';

query I
SELECT count(*) FROM '${TMP}/userdata1.bson';
----
1000

statement ok
create table timestamps (t timestamp);

statement ok
insert into timestamps values (arrow_cast(946684860000000, 'Timestamp(Microsecond, None)'));

query
select * from timestamps;
----
2000-01-01 00:01:00

statement ok
copy timestamps to '${TMP}/timestamp_test_out.bson';

# bson's date format is an int64 of milliseconds since the Unix epoch,
Copy link
Contributor

@universalmind303 universalmind303 Feb 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't there the '\x09'

"UTC datetime - The int64 is UTC milliseconds since the Unix epoch."

which isn't part of the internal mongodb spec

https://bsonspec.org/spec.html

Copy link
Collaborator Author

@tychoish tychoish Feb 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, exactly, that's what we're using. We just convert all arrow Date and Timestamp values into these int64s (millis since epoch), but when we read bson data back into GlareDB all of those values become Date64, not timestamps.

> create table timestamps (t timestamp);
Table created
> insert into timestamps values (arrow_cast(946684860000000, 'Timestamp(Microsecond, None)'));
Inserted 1 row
>
>
> select * from timestamps;
┌─────────────────────┐
│ t                   │
│ ──                  │
│ Timestamp<µs, UTC>  │
╞═════════════════════╡
│ 2000-01-01T00:01:00 │
└─────────────────────┘
> copy timestamps to 'timestamps.bson';
Copy success
> select * from timestamps;
┌─────────────────────┐
│ t                   │
│ ──                  │
│ Timestamp<µs, UTC>  │
╞═════════════════════╡
│ 2000-01-01T00:01:00 │
└─────────────────────┘
> select * from 'timestamps.bson';
┌─────────────────────┐
│ t                   │
│ ──                  │
│ Date64              │
╞═════════════════════╡
│ 2000-01-01T00:01:00 │
└─────────────────────┘

It works as intended but sqllogictest renders Date64 as int64s.

When we add support for declaring a schema rather than inferring it, we'll be able to cast these bson dates to whatever we need (with only the expected loss of fidelity and resolution), and the code mostly supports this today.


which isn't part of the internal mongodb spec

The internal one is \x11, which they call Timestamp (it's an int32 of seconds-since-epoch plus an int32 increment; it's used as a logical/monotonic clock — a somewhat odd one — to keep operations in the replication log (oplog) in order). BSON libraries implement it, but no one should use it.

# which most closely maps to arrow's Date64, and since that's the only
# date format, (the bson "timestamp" type is really a mongodb
# implementation detail, generally regarded to be a mistake.) all
# arrow timestamp and date types are upcast or truncated into this
# type. (Time64 and Time32 just go to integers)

query
select * from '${TMP}/timestamp_test_out.bson';
----
946684860000
Loading