feat(datafusion): Support insert_into in IcebergTableProvider #1511
Conversation
@@ -440,10 +440,12 @@ impl PartnerAccessor<ArrayRef> for ArrowArrayAccessor {
        Ok(schema_partner)
    }

    // todo generate field_pos in datafusion instead of passing to here
I found it tricky to handle this case: the input from DataFusion won't have field IDs, so we will need to assign them manually. Maybe there is a way to do name mapping here?
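For illustration, a minimal sketch of the manual assignment being described, assuming the conventional `PARQUET:field_id` metadata key is what the Arrow-to-Iceberg conversion reads (the function name and the 1-based numbering are hypothetical, not code from this PR):

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema as ArrowSchema};

/// Stamp sequential field IDs onto an Arrow schema that arrived from
/// DataFusion without any, using the `PARQUET:field_id` metadata key.
/// A real implementation would also recurse into nested types and keep
/// any IDs that are already present.
fn with_sequential_field_ids(schema: &ArrowSchema) -> ArrowSchema {
    let fields: Vec<_> = schema
        .fields()
        .iter()
        .enumerate()
        .map(|(pos, field)| {
            let mut metadata = field.metadata().clone();
            metadata.insert("PARQUET:field_id".to_string(), (pos as i32 + 1).to_string());
            Arc::new(field.as_ref().clone().with_metadata(metadata))
        })
        .collect();
    ArrowSchema::new(fields)
}

fn main() {
    let input = ArrowSchema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("data", DataType::Utf8, true),
    ]);
    println!("{:?}", with_sequential_field_ids(&input).field(0).metadata());
}
```

Whether stamping IDs like this or doing name mapping is the better fit is exactly the open question here.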
Could you help me to understand why we need to change this?
Thanks @CTTY for this PR, I just finished a round of review. My suggestion is to start with an unpartitioned table first.
// Define a schema.
Arc::new(ArrowSchema::new(vec![
    Field::new("data_files", DataType::Utf8, false),
    Field::new("count", DataType::UInt64, false),
What's the meaning of `count`?
DataFusion expects `insert_into` to return the number of rows (`count`) it has written (see https://datafusion.apache.org/user-guide/sql/dml.html#insert). Here I'm sending `count` to the commit node and having the commit node return the row count eventually. Technically we don't need to follow DataFusion's convention on `insert_into` and could return nothing; do you think that would be better?
I think we should still follow DataFusion's convention. But do we really need this? `DataFile` has a field called `record_count`, and I think that's enough for the insert-only case?
Yeah, using `record_count` makes more sense, I'll fix this.
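For reference, a rough sketch of that direction, assuming the counts come from each written `DataFile`'s `record_count` (the helper name and the plain Arrow error type are placeholders, not the PR's code):

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, RecordBatch, UInt64Array};
use arrow_schema::{ArrowError, DataType, Field, Schema as ArrowSchema};

/// Build the single-row `count` batch DataFusion expects an INSERT to return,
/// summing the per-file record counts instead of threading a separate `count`
/// column through the plan.
fn make_count_batch(record_counts: &[u64]) -> Result<RecordBatch, ArrowError> {
    let total: u64 = record_counts.iter().sum();
    let schema = Arc::new(ArrowSchema::new(vec![Field::new(
        "count",
        DataType::UInt64,
        false,
    )]));
    let count_array: ArrayRef = Arc::new(UInt64Array::from(vec![total]));
    RecordBatch::try_new(schema, vec![count_array])
}

fn main() {
    let batch = make_count_batch(&[100, 42]).unwrap();
    println!("{:?}", batch.column(0)); // single row: 142
}
```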
@@ -432,3 +433,69 @@ async fn test_metadata_table() -> Result<()> {

    Ok(())
}

#[tokio::test]
async fn test_insert_into() -> Result<()> {
I'm not a big fan of adding this kind of integration test. How about adding sqllogictests?
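For context, a sqllogictest for this feature would look roughly like the following (table name and values are made up; the exact harness wiring depends on how the crate sets up sqllogictest):

```
statement ok
CREATE TABLE t1 (id BIGINT, data VARCHAR);

query I
INSERT INTO t1 VALUES (1, 'a'), (2, 'b'), (3, 'c');
----
3

query IT rowsort
SELECT id, data FROM t1;
----
1 a
2 b
3 c
```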
PlanProperties::new(
    EquivalenceProperties::new(schema),
    input.output_partitioning().clone(),
    input.pipeline_behavior(),
This should be `Final`?
I was thinking maybe `IcebergWriteExec` can be used for the streaming case, so the pipeline behavior and boundedness should be the same as the input's. For a normal `INSERT INTO` query it shouldn't matter either way.
I'm not quite familiar with DataFusion's streaming mode, but my suggestion is that we should not assume streaming execution for now. We could always change this when we actually add streaming support.
    EquivalenceProperties::new(schema),
    input.output_partitioning().clone(),
    input.pipeline_behavior(),
    input.boundedness(),
It should be `Bounded`.
Ditto.
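Putting both points together, a sketch of pinning these properties down instead of inheriting them from the input (the `EmissionType`/`Boundedness` paths are from recent DataFusion releases and may differ in the version this PR builds against):

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::Schema;
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, PlanProperties};

/// Derive the write node's properties from its input partitioning, but fix the
/// emission type and boundedness: the commit result is emitted once, after all
/// input has been consumed, so the node is Final and Bounded regardless of the
/// input's streaming behavior.
fn write_plan_properties(input: &dyn ExecutionPlan, schema: Arc<Schema>) -> PlanProperties {
    PlanProperties::new(
        EquivalenceProperties::new(schema),
        input.output_partitioning().clone(),
        EmissionType::Final,
        Boundedness::Bounded,
    )
}
```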
) -> DFResult<Arc<dyn ExecutionPlan>> {
    if children.len() != 1 {
        return Err(DataFusionError::Internal(
            "IcebergWriteExec expects exactly one child".to_string(),
"IcebergWriteExec expects exactly one child".to_string(), | |
"IcebergWriteExec expects exactly one child, but provided {} ".to_string(), |
|
||
// Create data file writer builder
let data_file_writer_builder = DataFileWriterBuilder::new(
    ParquetWriterBuilder::new(
This should be `RollingFileWriter`.
fn make_result_batch(data_files: Vec<String>) -> DFResult<RecordBatch> {
    let files_array = Arc::new(StringArray::from(data_files)) as ArrayRef;

    RecordBatch::try_from_iter_with_nullable(vec![("data_files", files_array, false)]).map_err(
nit: Why not just `try_new`, so that we could reuse the result of `make_result_schema`?
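i.e., roughly the following, with `make_result_schema` standing in for the existing helper (the sketch returns a plain `ArrowError` rather than going through the surrounding error mapping):

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, RecordBatch, StringArray};
use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef};

fn make_result_schema() -> SchemaRef {
    Arc::new(Schema::new(vec![Field::new(
        "data_files",
        DataType::Utf8,
        false,
    )]))
}

/// Reuse the shared schema via `RecordBatch::try_new` instead of rebuilding it
/// with `try_from_iter_with_nullable`.
fn make_result_batch(data_files: Vec<String>) -> Result<RecordBatch, ArrowError> {
    let files_array: ArrayRef = Arc::new(StringArray::from(data_files));
    RecordBatch::try_new(make_result_schema(), vec![files_array])
}
```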
let batch = batch_result?;

let files_array = batch
    .column_by_name("data_files")
We should define these column names as constants.
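For example (illustrative names):

```rust
// Define the result column names once and reuse them at the schema-building and
// column-lookup sites, e.g. `batch.column_by_name(DATA_FILES_COL)`.
pub const DATA_FILES_COL: &str = "data_files";
pub const COUNT_COL: &str = "count";
```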
// // Apply the action and commit the transaction
// let updated_table = action
//     .apply(tx)
//     .map_err(to_datafusion_error)?
//     .commit(catalog.as_ref())
//     .await
//     .map_err(to_datafusion_error)?;
Why is this commented out?
pub fn serialize_data_file_to_json(
    data_file: DataFile,
    partition_type: &super::StructType,
    is_version_1: bool,
We should use `TableFormatVersion` here.
println!("----StructArray from record stream: {:?}", struct_arr); | ||
println!("----Schema.as_struct from table: {:?}", schema.as_struct()); |
We should use log here.
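Assuming the crate routes diagnostics through the `log` facade, the two lines above would become something like this (the variables come from the surrounding function):

```rust
use log::debug;

// Debug-level records are opt-in via the logger configuration instead of being
// printed unconditionally.
debug!("StructArray from record stream: {:?}", struct_arr);
debug!("Schema.as_struct from table: {:?}", schema.as_struct());
```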
Which issue does this PR close?
What changes are included in this PR?
Are these changes tested?