Skip to content

Commit

Permalink
Support arrow:schema in Parquet writer to faithfully roundtrip `dur…
Browse files Browse the repository at this point in the history
…ation` types with Arrow (#15875)

Closes #15847

This PR adds the support to construct and write base64-encoded serialized `arrow:schema`-type IPC message to parquet file footer to allow faithfully roundtrip with Arrow via Parquet for `duration` type.

### Answered
- [x] Only construct and write `arrow:schema` if  asked by the user via `store_schema` argument (cudf) or `write_arrow_schema` (libcudf). i.e. Default these variables to `false` otherwise.
- [x] The internal/libcudf variable name for `store_schema` can stay `write_arrow_schema` and it should be fine. This has been done to disambiguate which schema (arrow or parquet) we are talking about.
- [x] Separate PR: `int96_timestamps` cannot be deprecated/removed in cuDF as Spark is actively using it. #15901 
- [x] cuDF Parquet writer supports `decimal32` and `decimal64` [fixed types](https://github.com/rapidsai/cudf/blob/branch-24.08/cpp/src/io/parquet/writer_impl.cu#L561). These are not directly supported by Arrow so we will [convert](https://github.com/rapidsai/cudf/blob/branch-24.08/cpp/src/interop/to_arrow.cu#L155) `decimal32/decimal64` columns to `decimal128`.
- [x] `is_col_nullable()` function moved to `writer_impl_helpers.cpp` along with some other helper functions.
- [x] A common `convert_data_to_decimal128` can be separated out and used in `writer_impl.cu` and `to_arrow.cu`. Tracking in a separate issue. #16194 

CC @vuule @etseidl @nvdbaranec @GregoryKimball @galipremsagar for vis.

Authors:
  - Muhammad Haseeb (https://github.com/mhaseeb123)

Approvers:
  - Thomas Li (https://github.com/lithomas1)
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Bradley Dice (https://github.com/bdice)

URL: #15875
  • Loading branch information
mhaseeb123 committed Jul 9, 2024
1 parent 248b2de commit 67bd366
Show file tree
Hide file tree
Showing 18 changed files with 1,386 additions and 197 deletions.
2 changes: 2 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ add_library(
src/io/orc/stripe_init.cu
src/datetime/timezone.cpp
src/io/orc/writer_impl.cu
src/io/parquet/arrow_schema_writer.cpp
src/io/parquet/compact_protocol_reader.cpp
src/io/parquet/compact_protocol_writer.cpp
src/io/parquet/decode_preprocess.cu
Expand All @@ -425,6 +426,7 @@ add_library(
src/io/parquet/reader_impl_helpers.cpp
src/io/parquet/reader_impl_preprocess.cu
src/io/parquet/writer_impl.cu
src/io/parquet/writer_impl_helpers.cpp
src/io/parquet/decode_fixed.cu
src/io/statistics/orc_column_statistics.cu
src/io/statistics/parquet_column_statistics.cu
Expand Down
25 changes: 25 additions & 0 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,8 @@ class parquet_writer_options_base {
// Parquet writer can write timestamps as UTC
// Defaults to true because libcudf timestamps are implicitly UTC
bool _write_timestamps_as_UTC = true;
// Whether to write ARROW schema
bool _write_arrow_schema = false;
// Maximum size of each row group (unless smaller than a single page)
size_t _row_group_size_bytes = default_row_group_size_bytes;
// Maximum number of rows in row group (unless smaller than a single page)
Expand Down Expand Up @@ -689,6 +691,13 @@ class parquet_writer_options_base {
*/
[[nodiscard]] auto is_enabled_utc_timestamps() const { return _write_timestamps_as_UTC; }

/**
* @brief Returns `true` if arrow schema will be written
*
* @return `true` if arrow schema will be written
*/
[[nodiscard]] auto is_enabled_write_arrow_schema() const { return _write_arrow_schema; }

/**
* @brief Returns maximum row group size, in bytes.
*
Expand Down Expand Up @@ -824,6 +833,13 @@ class parquet_writer_options_base {
*/
void enable_utc_timestamps(bool val);

/**
* @brief Sets preference for writing arrow schema. Write arrow schema if set to `true`.
*
* @param val Boolean value to enable/disable writing of arrow schema.
*/
void enable_write_arrow_schema(bool val);

/**
* @brief Sets the maximum row group size, in bytes.
*
Expand Down Expand Up @@ -1084,6 +1100,15 @@ class parquet_writer_options_builder_base {
* @return this for chaining
*/
BuilderT& utc_timestamps(bool enabled);

/**
* @brief Set to true if arrow schema is to be written
*
* @param enabled Boolean value to enable/disable writing of arrow schema
* @return this for chaining
*/
BuilderT& write_arrow_schema(bool enabled);

/**
* @brief Set to true if V2 page headers are to be written.
*
Expand Down
18 changes: 18 additions & 0 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,9 @@ void parquet_writer_options_base::set_compression(compression_type compression)

void parquet_writer_options_base::enable_int96_timestamps(bool req)
{
CUDF_EXPECTS(not req or not is_enabled_write_arrow_schema(),
"INT96 timestamps and arrow schema cannot be simultaneously "
"enabled as INT96 timestamps are deprecated in Arrow.");
_write_timestamps_as_int96 = req;
}

Expand All @@ -770,6 +773,14 @@ void parquet_writer_options_base::enable_utc_timestamps(bool val)
_write_timestamps_as_UTC = val;
}

void parquet_writer_options_base::enable_write_arrow_schema(bool val)
{
CUDF_EXPECTS(not val or not is_enabled_int96_timestamps(),
"arrow schema and INT96 timestamps cannot be simultaneously "
"enabled as INT96 timestamps are deprecated in Arrow.");
_write_arrow_schema = val;
}

void parquet_writer_options_base::set_row_group_size_bytes(size_t size_bytes)
{
CUDF_EXPECTS(
Expand Down Expand Up @@ -974,6 +985,13 @@ BuilderT& parquet_writer_options_builder_base<BuilderT, OptionsT>::utc_timestamp
return static_cast<BuilderT&>(*this);
}

template <class BuilderT, class OptionsT>
BuilderT& parquet_writer_options_builder_base<BuilderT, OptionsT>::write_arrow_schema(bool enabled)
{
_options.enable_write_arrow_schema(enabled);
return static_cast<BuilderT&>(*this);
}

template <class BuilderT, class OptionsT>
BuilderT& parquet_writer_options_builder_base<BuilderT, OptionsT>::write_v2_headers(bool enabled)
{
Expand Down
Loading

0 comments on commit 67bd366

Please sign in to comment.