Skip to content

Commit

Permalink
simplify sink
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Sep 19, 2024
1 parent 28b88c6 commit ff19cec
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 21 deletions.
24 changes: 10 additions & 14 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub struct IcebergConfig {
pub force_append_only: bool,

#[serde(flatten)]
pub common: IcebergCommon,
common: IcebergCommon,

#[serde(
rename = "s3.path.style.access",
Expand Down Expand Up @@ -106,6 +106,13 @@ pub struct IcebergConfig {
}

impl IcebergConfig {
pub async fn load_table(&self) -> Result<Table> {
self.common
.load_table(&self.path_style_access, &self.java_catalog_props)
.await
.map_err(|e| SinkError::Iceberg(anyhow!(e)))
}

pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
let mut config =
serde_json::from_value::<IcebergConfig>(serde_json::to_value(&values).unwrap())
Expand Down Expand Up @@ -198,11 +205,7 @@ impl IcebergSink {

let table = self
.config
.common
.load_table(
&self.config.path_style_access,
&self.config.java_catalog_props,
)
.load_table()
.await
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?;

Expand Down Expand Up @@ -970,14 +973,7 @@ mod test {
async fn test_create_catalog(configs: BTreeMap<String, String>) {
let iceberg_config = IcebergConfig::from_btreemap(configs).unwrap();

let table = iceberg_config
.common
.load_table(
&iceberg_config.path_style_access,
&iceberg_config.java_catalog_props,
)
.await
.unwrap();
let table = iceberg_config.load_table().await.unwrap();

println!("{:?}", table.table_name());
}
Expand Down
8 changes: 1 addition & 7 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,13 +345,7 @@ async fn get_partition_compute_info_for_iceberg(
if iceberg_config.create_table_if_not_exists {
return Ok(None);
}
let table = iceberg_config
.common
.load_table(
&iceberg_config.path_style_access,
&iceberg_config.java_catalog_props,
)
.await?;
let table = iceberg_config.load_table().await?;
let Some(partition_spec) = table.current_table_metadata().current_partition_spec().ok() else {
return Ok(None);
};
Expand Down

0 comments on commit ff19cec

Please sign in to comment.