From 81993445fe29cceda2a7bfe3c0a20025f00d3d7d Mon Sep 17 00:00:00 2001 From: Weijie Guo Date: Thu, 22 Aug 2024 17:44:59 +0800 Subject: [PATCH 1/6] [flink] Support Flink Materialized Table --- .../catalog/CatalogMaterializedTable.java | 34 + .../catalog/CatalogMaterializedTable.java | 34 + .../catalog/CatalogMaterializedTable.java | 34 + .../catalog/CatalogMaterializedTable.java | 34 + .../catalog/CatalogMaterializedTable.java | 34 + .../catalog/CatalogMaterializedTable.java | 34 + paimon-flink/paimon-flink-common/pom.xml | 22 + .../org/apache/paimon/flink/FlinkCatalog.java | 270 ++++++-- .../paimon/flink/FlinkConnectorOptions.java | 114 ++++ .../apache/paimon/flink/FlinkCatalogTest.java | 173 ++++- .../paimon/flink/MaterializedTableITCase.java | 640 ++++++++++++++++++ 11 files changed, 1371 insertions(+), 52 deletions(-) create mode 100644 paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java create mode 100644 paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java create mode 100644 paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java create mode 100644 paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java create mode 100644 paimon-flink/paimon-flink-1.19/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java create mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/MaterializedTableITCase.java diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java new file mode 100644 index 000000000000..6eabd1db7f38 --- /dev/null +++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +/** + * Dummy placeholder to resolve compatibility issue of CatalogMaterializedTable(introduced in flink + * 1.20). + */ +public interface CatalogMaterializedTable extends CatalogBaseTable { + /** Dummy LogicalRefreshMode placeholder. */ + enum LogicalRefreshMode {} + + /** Dummy RefreshMode placeholder. */ + enum RefreshMode {} + + /** Dummy RefreshStatus placeholder. 
*/ + enum RefreshStatus {} +} diff --git a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java new file mode 100644 index 000000000000..6eabd1db7f38 --- /dev/null +++ b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +/** + * Dummy placeholder to resolve compatibility issue of CatalogMaterializedTable(introduced in flink + * 1.20). + */ +public interface CatalogMaterializedTable extends CatalogBaseTable { + /** Dummy LogicalRefreshMode placeholder. */ + enum LogicalRefreshMode {} + + /** Dummy RefreshMode placeholder. */ + enum RefreshMode {} + + /** Dummy RefreshStatus placeholder. */ + enum RefreshStatus {} +} diff --git a/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java new file mode 100644 index 000000000000..6eabd1db7f38 --- /dev/null +++ b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +/** + * Dummy placeholder to resolve compatibility issue of CatalogMaterializedTable(introduced in flink + * 1.20). + */ +public interface CatalogMaterializedTable extends CatalogBaseTable { + /** Dummy LogicalRefreshMode placeholder. */ + enum LogicalRefreshMode {} + + /** Dummy RefreshMode placeholder. */ + enum RefreshMode {} + + /** Dummy RefreshStatus placeholder. 
*/ + enum RefreshStatus {} +} diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java new file mode 100644 index 000000000000..6eabd1db7f38 --- /dev/null +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +/** + * Dummy placeholder to resolve compatibility issue of CatalogMaterializedTable(introduced in flink + * 1.20). + */ +public interface CatalogMaterializedTable extends CatalogBaseTable { + /** Dummy LogicalRefreshMode placeholder. */ + enum LogicalRefreshMode {} + + /** Dummy RefreshMode placeholder. */ + enum RefreshMode {} + + /** Dummy RefreshStatus placeholder. */ + enum RefreshStatus {} +} diff --git a/paimon-flink/paimon-flink-1.19/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java b/paimon-flink/paimon-flink-1.19/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java new file mode 100644 index 000000000000..6eabd1db7f38 --- /dev/null +++ b/paimon-flink/paimon-flink-1.19/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +/** + * Dummy placeholder to resolve compatibility issue of CatalogMaterializedTable(introduced in flink + * 1.20). + */ +public interface CatalogMaterializedTable extends CatalogBaseTable { + /** Dummy LogicalRefreshMode placeholder. */ + enum LogicalRefreshMode {} + + /** Dummy RefreshMode placeholder. */ + enum RefreshMode {} + + /** Dummy RefreshStatus placeholder. 
*/ + enum RefreshStatus {} +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java new file mode 100644 index 000000000000..6eabd1db7f38 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +/** + * Dummy placeholder to resolve compatibility issue of CatalogMaterializedTable(introduced in flink + * 1.20). + */ +public interface CatalogMaterializedTable extends CatalogBaseTable { + /** Dummy LogicalRefreshMode placeholder. */ + enum LogicalRefreshMode {} + + /** Dummy RefreshMode placeholder. */ + enum RefreshMode {} + + /** Dummy RefreshStatus placeholder. */ + enum RefreshStatus {} +} diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml index e086d74786ba..ef0f7fe1776a 100644 --- a/paimon-flink/paimon-flink-common/pom.xml +++ b/paimon-flink/paimon-flink-common/pom.xml @@ -59,6 +59,28 @@ under the License. 
provided + + org.apache.flink + flink-clients + ${flink.version} + test + + + + org.apache.flink + flink-sql-gateway + ${flink.version} + test + + + + org.apache.flink + flink-sql-gateway + ${flink.version} + test-jar + test + + diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index 4ccf11f4ecf6..b5c81aabc341 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -21,6 +21,10 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.FlinkConnectorOptions.CatalogTableType; +import org.apache.paimon.flink.FlinkConnectorOptions.MaterializedTableIntervalFreshnessTimeUnit; +import org.apache.paimon.flink.FlinkConnectorOptions.MaterializedTableRefreshMode; +import org.apache.paimon.flink.FlinkConnectorOptions.MaterializedTableRefreshStatus; import org.apache.paimon.flink.procedure.ProcedureUtil; import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil; import org.apache.paimon.fs.Path; @@ -44,15 +48,15 @@ import org.apache.flink.table.catalog.CatalogDatabase; import org.apache.flink.table.catalog.CatalogDatabaseImpl; import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.CatalogPartition; import org.apache.flink.table.catalog.CatalogPartitionImpl; import org.apache.flink.table.catalog.CatalogPartitionSpec; import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.IntervalFreshness; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; -import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.TableChange; import org.apache.flink.table.catalog.TableChange.AddColumn; @@ -62,10 +66,13 @@ import org.apache.flink.table.catalog.TableChange.DropColumn; import org.apache.flink.table.catalog.TableChange.DropWatermark; import org.apache.flink.table.catalog.TableChange.First; +import org.apache.flink.table.catalog.TableChange.MaterializedTableChange; import org.apache.flink.table.catalog.TableChange.ModifyColumnComment; import org.apache.flink.table.catalog.TableChange.ModifyColumnName; import org.apache.flink.table.catalog.TableChange.ModifyColumnPosition; import org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType; +import org.apache.flink.table.catalog.TableChange.ModifyRefreshHandler; +import org.apache.flink.table.catalog.TableChange.ModifyRefreshStatus; import org.apache.flink.table.catalog.TableChange.ModifyWatermark; import org.apache.flink.table.catalog.TableChange.ResetOption; import org.apache.flink.table.catalog.TableChange.SetOption; @@ -95,6 +102,7 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; @@ -113,10 +121,22 @@ import static org.apache.flink.table.descriptors.Schema.SCHEMA; import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; import static 
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType; +import static org.apache.flink.table.utils.EncodingUtils.decodeBase64ToBytes; +import static org.apache.flink.table.utils.EncodingUtils.encodeBytesToBase64; import static org.apache.paimon.CoreOptions.PATH; import static org.apache.paimon.flink.FlinkCatalogOptions.DISABLE_CREATE_TABLE_IN_DEFAULT_DB; import static org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER; import static org.apache.paimon.flink.FlinkCatalogOptions.REGISTER_TIMEOUT; +import static org.apache.paimon.flink.FlinkConnectorOptions.CATALOG_MATERIALIZED_TABLE_DEFINITION_QUERY; +import static org.apache.paimon.flink.FlinkConnectorOptions.CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS; +import static org.apache.paimon.flink.FlinkConnectorOptions.CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT; +import static org.apache.paimon.flink.FlinkConnectorOptions.CATALOG_MATERIALIZED_TABLE_LOGICAL_REFRESH_MODE; +import static org.apache.paimon.flink.FlinkConnectorOptions.CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_BYTES; +import static org.apache.paimon.flink.FlinkConnectorOptions.CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_DESCRIPTION; +import static org.apache.paimon.flink.FlinkConnectorOptions.CATALOG_MATERIALIZED_TABLE_REFRESH_MODE; +import static org.apache.paimon.flink.FlinkConnectorOptions.CATALOG_MATERIALIZED_TABLE_REFRESH_STATUS; +import static org.apache.paimon.flink.FlinkConnectorOptions.CATALOG_MATERIALIZED_TABLE_SNAPSHOT; +import static org.apache.paimon.flink.FlinkConnectorOptions.CATALOG_TABLE_TYPE; import static org.apache.paimon.flink.LogicalTypeConversion.toDataType; import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; import static org.apache.paimon.flink.log.LogStoreRegister.registerLogSystem; @@ -242,7 +262,7 @@ public List listTables(String databaseName) } @Override - public CatalogTable getTable(ObjectPath tablePath) + public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { return getTable(tablePath, null); } @@ -250,12 +270,12 @@ public CatalogTable getTable(ObjectPath tablePath) /** * Do not annotate with @override here to maintain compatibility with Flink 1.17-. 
*/ - public CatalogTable getTable(ObjectPath tablePath, long timestamp) + public CatalogBaseTable getTable(ObjectPath tablePath, long timestamp) throws TableNotExistException, CatalogException { return getTable(tablePath, Long.valueOf(timestamp)); } - private CatalogTable getTable(ObjectPath tablePath, @Nullable Long timestamp) + private CatalogBaseTable getTable(ObjectPath tablePath, @Nullable Long timestamp) throws TableNotExistException { Table table; try { @@ -312,9 +332,10 @@ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) @Override public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { - if (!(table instanceof CatalogTable)) { + if (!(table instanceof CatalogTable || table instanceof CatalogMaterializedTable)) { throw new UnsupportedOperationException( - "Only support CatalogTable, but is: " + table.getClass()); + "Only support CatalogTable and CatalogMaterializedTable, but is: " + + table.getClass()); } if (Objects.equals(getDefaultDatabase(), tablePath.getDatabaseName()) @@ -327,7 +348,43 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig // the returned value of "table.getOptions" may be unmodifiable (for example from // TableDescriptor) Map options = new HashMap<>(table.getOptions()); - Schema paimonSchema = buildPaimonSchema(identifier, (CatalogTable) table, options); + if (table instanceof CatalogMaterializedTable) { + CatalogMaterializedTable mt = (CatalogMaterializedTable) table; + Options mtOptions = new Options(); + mtOptions.set(CATALOG_TABLE_TYPE, CatalogTableType.MATERIALIZED_TABLE); + mt.getSnapshot().ifPresent(x -> mtOptions.set(CATALOG_MATERIALIZED_TABLE_SNAPSHOT, x)); + mtOptions.set(CATALOG_MATERIALIZED_TABLE_DEFINITION_QUERY, mt.getDefinitionQuery()); + mtOptions.set( + CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS, + mt.getDefinitionFreshness().getInterval()); + mtOptions.set( + CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT, + MaterializedTableIntervalFreshnessTimeUnit.valueOf( + mt.getDefinitionFreshness().getTimeUnit().name())); + mtOptions.set( + CATALOG_MATERIALIZED_TABLE_LOGICAL_REFRESH_MODE, + MaterializedTableRefreshMode.valueOf(mt.getLogicalRefreshMode().name())); + mtOptions.set( + CATALOG_MATERIALIZED_TABLE_REFRESH_MODE, + MaterializedTableRefreshMode.valueOf(mt.getRefreshMode().name())); + mtOptions.set( + CATALOG_MATERIALIZED_TABLE_REFRESH_STATUS, + MaterializedTableRefreshStatus.valueOf(mt.getRefreshStatus().name())); + mt.getRefreshHandlerDescription() + .ifPresent( + desc -> + mtOptions.set( + CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_DESCRIPTION, + desc)); + byte[] serializedRefreshHandler = mt.getSerializedRefreshHandler(); + if (serializedRefreshHandler != null) { + mtOptions.set( + CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_BYTES, + encodeBytesToBase64(serializedRefreshHandler)); + } + options.putAll(mtOptions.toMap()); + } + Schema paimonSchema = buildPaimonSchema(identifier, table, options); boolean unRegisterLogSystem = false; try { @@ -346,7 +403,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig } protected Schema buildPaimonSchema( - Identifier identifier, CatalogTable catalogTable, Map options) { + Identifier identifier, CatalogBaseTable catalogTable, Map options) { String connector = options.get(CONNECTOR.key()); options.remove(CONNECTOR.key()); if (!StringUtils.isNullOrWhitespaceOnly(connector) @@ -382,7 +439,10 @@ 
protected Schema buildPaimonSchema(
             }
         }
 
-        return fromCatalogTable(catalogTable.copy(options));
+        if (catalogTable instanceof CatalogTable) {
+            return fromCatalogTable(((CatalogTable) catalogTable).copy(options));
+        }
+        return fromCatalogTable(((CatalogMaterializedTable) catalogTable).copy(options));
     }
 
     private List<SchemaChange> toSchemaChange(
@@ -505,10 +565,47 @@ private List<SchemaChange> toSchemaChange(
                 throw new UnsupportedOperationException(
                         "Change is not supported: " + change.getClass());
             }
+        } else if (change instanceof MaterializedTableChange
+                && handleMaterializedTableChange(change, schemaChanges)) {
+            return schemaChanges;
         }
         throw new UnsupportedOperationException("Change is not supported: " + change.getClass());
     }
+
+    /**
+     * Try to handle a change related to a materialized table.
+     *
+     * @return true if the change can be identified as a {@link MaterializedTableChange} and is
+     *     handled properly; false otherwise.
+     */
+    protected boolean handleMaterializedTableChange(
+            TableChange change, List<SchemaChange> schemaChanges) {
+        if (change instanceof ModifyRefreshStatus) {
+            ModifyRefreshStatus modifyRefreshStatus = (ModifyRefreshStatus) change;
+            CatalogMaterializedTable.RefreshStatus newRefreshStatus =
+                    modifyRefreshStatus.getRefreshStatus();
+            schemaChanges.add(
+                    SchemaChange.setOption(
+                            CATALOG_MATERIALIZED_TABLE_REFRESH_STATUS.key(),
+                            newRefreshStatus.name()));
+            return true;
+        } else if (change instanceof ModifyRefreshHandler) {
+            ModifyRefreshHandler modifyRefreshHandler = (ModifyRefreshHandler) change;
+            String newHandlerDesc = modifyRefreshHandler.getRefreshHandlerDesc();
+            byte[] newHandlerBytes = modifyRefreshHandler.getRefreshHandlerBytes();
+            schemaChanges.add(
+                    SchemaChange.setOption(
+                            CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_DESCRIPTION.key(),
+                            newHandlerDesc));
+            schemaChanges.add(
+                    SchemaChange.setOption(
+                            CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_BYTES.key(),
+                            encodeBytesToBase64(newHandlerBytes)));
+            return true;
+        }
+        return false;
+    }
+
     @Override
     public void alterTable(
             ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
@@ -517,10 +614,10 @@ public void alterTable(
             return;
         }
 
-        CatalogTable table = getTable(tablePath);
+        CatalogBaseTable table = getTable(tablePath);
 
         // Currently, Flink SQL only support altering table properties.
-        validateAlterTable(table, (CatalogTable) newTable);
+        validateAlterTable(table, newTable);
 
         List<SchemaChange> changes = new ArrayList<>();
         Map<String, String> oldProperties = table.getOptions();
@@ -576,7 +673,7 @@ public void alterTable(
         }
 
         Preconditions.checkArgument(table instanceof FileStoreTable, "Can't alter system table.");
-        validateAlterTable(toCatalogTable(table), (CatalogTable) newTable);
+        validateAlterTable(toCatalogTable(table), newTable);
         Map<String, Integer> oldTableNonPhysicalColumnIndex =
                 FlinkCatalogPropertiesUtil.nonPhysicalColumns(
                         table.options(), table.rowType().getFieldNames());
@@ -669,36 +766,59 @@ private void setWatermarkOptions(
                         .asSerializableString()));
     }
 
-    private static void validateAlterTable(CatalogTable ct1, CatalogTable ct2) {
+    private static void validateAlterTable(CatalogBaseTable ct1, CatalogBaseTable ct2) {
         if (ct1 instanceof SystemCatalogTable) {
             throw new UnsupportedOperationException("Can't alter system table.");
         }
-        org.apache.flink.table.api.TableSchema ts1 = ct1.getSchema();
-        org.apache.flink.table.api.TableSchema ts2 = ct2.getSchema();
-        boolean pkEquality = false;
-
-        if (ts1.getPrimaryKey().isPresent() && ts2.getPrimaryKey().isPresent()) {
-            pkEquality =
-                    Objects.equals(
-                                    ts1.getPrimaryKey().get().getType(),
-                                    ts2.getPrimaryKey().get().getType())
-                            && Objects.equals(
-                                    ts1.getPrimaryKey().get().getColumns(),
-                                    ts2.getPrimaryKey().get().getColumns());
-        } else if (!ts1.getPrimaryKey().isPresent() && !ts2.getPrimaryKey().isPresent()) {
-            pkEquality = true;
+        boolean table1IsMaterialized = ct1 instanceof CatalogMaterializedTable;
+        boolean table2IsMaterialized = ct2 instanceof CatalogMaterializedTable;
+        if ((table1IsMaterialized || table2IsMaterialized)
+                && !(table1IsMaterialized && table2IsMaterialized)) {
+            throw new UnsupportedOperationException(
+                    "Converting a non-materialized table to a materialized table, or vice versa, is not allowed.");
         }
+        // The materialized table is not resolved at this point.
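+        // Its primary key is therefore not visible here, so the primary-key equality
+        // check below is only applied to regular catalog tables; partition keys are
+        // still compared for both table kinds.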
+ if (!table1IsMaterialized) { + org.apache.flink.table.api.TableSchema ts1 = ct1.getSchema(); + org.apache.flink.table.api.TableSchema ts2 = ct2.getSchema(); + boolean pkEquality = false; + + if (ts1.getPrimaryKey().isPresent() && ts2.getPrimaryKey().isPresent()) { + pkEquality = + Objects.equals( + ts1.getPrimaryKey().get().getType(), + ts2.getPrimaryKey().get().getType()) + && Objects.equals( + ts1.getPrimaryKey().get().getColumns(), + ts2.getPrimaryKey().get().getColumns()); + } else if (!ts1.getPrimaryKey().isPresent() && !ts2.getPrimaryKey().isPresent()) { + pkEquality = true; + } - if (!pkEquality) { - throw new UnsupportedOperationException("Altering primary key is not supported yet."); + if (!pkEquality) { + throw new UnsupportedOperationException( + "Altering primary key is not supported yet."); + } } - if (!ct1.getPartitionKeys().equals(ct2.getPartitionKeys())) { + if (!getPartitionKeys(ct1).equals(getPartitionKeys(ct2))) { throw new UnsupportedOperationException( "Altering partition keys is not supported yet."); } } + private static List getPartitionKeys(CatalogBaseTable table) { + if (table instanceof CatalogTable) { + return ((CatalogTable) table).getPartitionKeys(); + } else if (table instanceof CatalogMaterializedTable) { + return ((CatalogMaterializedTable) table).getPartitionKeys(); + } else { + throw new UnsupportedOperationException( + "Only support CatalogTable and CatalogMaterializedTable, but is: " + + table.getClass()); + } + } + @Override public final void open() throws CatalogException {} @@ -711,7 +831,7 @@ public final void close() throws CatalogException { } } - private CatalogTableImpl toCatalogTable(Table table) { + private CatalogBaseTable toCatalogTable(Table table) { Map newOptions = new HashMap<>(table.options()); TableSchema.Builder builder = TableSchema.builder(); @@ -761,18 +881,67 @@ private CatalogTableImpl toCatalogTable(Table table) { removeProperties.putTableSchema(SCHEMA, schema); removeProperties.asMap().keySet().forEach(newOptions::remove); - return new DataCatalogTable( - table, - schema, - table.partitionKeys(), - newOptions, - table.comment().orElse(""), - nonPhysicalColumnComments); + Options options = Options.fromMap(newOptions); + if (CatalogBaseTable.TableKind.TABLE + .name() + .equals(options.get(CATALOG_TABLE_TYPE).name())) { + return new DataCatalogTable( + table, + schema, + table.partitionKeys(), + newOptions, + table.comment().orElse(""), + nonPhysicalColumnComments); + } + Long snapshot = options.get(CATALOG_MATERIALIZED_TABLE_SNAPSHOT); + String definitionQuery = options.get(CATALOG_MATERIALIZED_TABLE_DEFINITION_QUERY); + IntervalFreshness freshness = + IntervalFreshness.of( + options.get(CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS), + IntervalFreshness.TimeUnit.valueOf( + options.get(CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT) + .name())); + CatalogMaterializedTable.LogicalRefreshMode logicalRefreshMode = + CatalogMaterializedTable.LogicalRefreshMode.valueOf( + options.get(CATALOG_MATERIALIZED_TABLE_LOGICAL_REFRESH_MODE).name()); + CatalogMaterializedTable.RefreshMode refreshMode = + CatalogMaterializedTable.RefreshMode.valueOf( + options.get(CATALOG_MATERIALIZED_TABLE_REFRESH_MODE).name()); + CatalogMaterializedTable.RefreshStatus refreshStatus = + CatalogMaterializedTable.RefreshStatus.valueOf( + options.get(CATALOG_MATERIALIZED_TABLE_REFRESH_STATUS).name()); + String refreshHandlerDescription = + options.get(CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_DESCRIPTION); + byte[] serializedRefreshHandler = + 
decodeRefreshHandlerBytes( + options.get(CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_BYTES)); + // remove materialized table related options + allMaterializedTableAttributes().forEach(newOptions::remove); + return CatalogMaterializedTable.newBuilder() + .schema(schema.toSchema()) + .comment(table.comment().orElse("")) + .partitionKeys(table.partitionKeys()) + .options(newOptions) + .snapshot(snapshot) + .definitionQuery(definitionQuery) + .freshness(freshness) + .logicalRefreshMode(logicalRefreshMode) + .refreshMode(refreshMode) + .refreshStatus(refreshStatus) + .refreshHandlerDescription(refreshHandlerDescription) + .serializedRefreshHandler(serializedRefreshHandler) + .build(); + } + + private byte[] decodeRefreshHandlerBytes(String refreshHandlerBytes) { + return org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly(refreshHandlerBytes) + ? null + : decodeBase64ToBytes(refreshHandlerBytes); } - public static Schema fromCatalogTable(CatalogTable table) { - ResolvedCatalogTable catalogTable = (ResolvedCatalogTable) table; - ResolvedSchema schema = catalogTable.getResolvedSchema(); + public static Schema fromCatalogTable(CatalogBaseTable catalogTable) { + ResolvedSchema schema = + ((ResolvedCatalogBaseTable) catalogTable).getResolvedSchema(); RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType(); Map options = new HashMap<>(catalogTable.getOptions()); @@ -788,7 +957,7 @@ public static Schema fromCatalogTable(CatalogTable table) { schema.getPrimaryKey() .map(pk -> pk.getColumns()) .orElse(Collections.emptyList())) - .partitionKeys(catalogTable.getPartitionKeys()); + .partitionKeys(getPartitionKeys(catalogTable)); Map columnComments = getColumnComments(catalogTable); rowType.getFields() .forEach( @@ -801,7 +970,7 @@ public static Schema fromCatalogTable(CatalogTable table) { return schemaBuilder.build(); } - private static Map getColumnComments(CatalogTable catalogTable) { + private static Map getColumnComments(CatalogBaseTable catalogTable) { return catalogTable.getUnresolvedSchema().getColumns().stream() .filter(c -> c.getComment().isPresent()) .collect( @@ -1134,4 +1303,17 @@ private boolean isCalledFromFlinkRecomputeStatisticsProgram() { } return false; } + + private List allMaterializedTableAttributes() { + return Arrays.asList( + CATALOG_MATERIALIZED_TABLE_SNAPSHOT.key(), + CATALOG_MATERIALIZED_TABLE_DEFINITION_QUERY.key(), + CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS.key(), + CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT.key(), + CATALOG_MATERIALIZED_TABLE_REFRESH_MODE.key(), + CATALOG_MATERIALIZED_TABLE_LOGICAL_REFRESH_MODE.key(), + CATALOG_MATERIALIZED_TABLE_REFRESH_STATUS.key(), + CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_DESCRIPTION.key(), + CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_BYTES.key()); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java index d181d7b5a0c6..ba5428fb6245 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java @@ -404,6 +404,84 @@ public class FlinkConnectorOptions { .withDescription( "Optional endInput watermark used in case of batch mode or bounded stream."); + @ExcludeFromDocumentation("Only used internally to support materialized table") + public static final ConfigOption CATALOG_TABLE_TYPE = + 
key("catalog-table.type") + .enumType(CatalogTableType.class) + .defaultValue(CatalogTableType.TABLE) + .withDescription("The type of flink catalog table."); + + @ExcludeFromDocumentation("Only used internally to support materialized table") + public static final ConfigOption CATALOG_MATERIALIZED_TABLE_SNAPSHOT = + key("catalog-materialized-table.snapshot") + .longType() + .noDefaultValue() + .withDescription("The snapshot specified for the materialized table"); + + @ExcludeFromDocumentation("Only used internally to support materialized table") + public static final ConfigOption CATALOG_MATERIALIZED_TABLE_DEFINITION_QUERY = + key("catalog-materialized-table.definition-query") + .stringType() + .noDefaultValue() + .withDescription( + "The definition query text of materialized table, text is expanded in contrast to the original SQL."); + + @ExcludeFromDocumentation("Only used internally to support materialized table") + public static final ConfigOption CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS = + key("catalog-materialized-table.interval-freshness") + .stringType() + .noDefaultValue() + .withDescription( + "the freshness interval of materialized table which is used to determine the physical refresh mode."); + + @ExcludeFromDocumentation("Only used internally to support materialized table") + public static final ConfigOption + CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT = + key("catalog-materialized-table.interval-freshness.time-unit") + .enumType(MaterializedTableIntervalFreshnessTimeUnit.class) + .noDefaultValue() + .withDescription("The time unit of freshness interval."); + + @ExcludeFromDocumentation("Only used internally to support materialized table") + public static final ConfigOption + CATALOG_MATERIALIZED_TABLE_LOGICAL_REFRESH_MODE = + key("catalog-materialized-table.logical-refresh-mode") + .enumType(MaterializedTableRefreshMode.class) + .noDefaultValue() + .withDescription("the logical refresh mode of materialized table."); + + @ExcludeFromDocumentation("Only used internally to support materialized table") + public static final ConfigOption + CATALOG_MATERIALIZED_TABLE_REFRESH_MODE = + key("catalog-materialized-table.refresh-mode") + .enumType(MaterializedTableRefreshMode.class) + .noDefaultValue() + .withDescription("the physical refresh mode of materialized table."); + + @ExcludeFromDocumentation("Only used internally to support materialized table") + public static final ConfigOption + CATALOG_MATERIALIZED_TABLE_REFRESH_STATUS = + key("catalog-materialized-table.refresh-status") + .enumType(MaterializedTableRefreshStatus.class) + .noDefaultValue() + .withDescription("the refresh status of materialized table."); + + @ExcludeFromDocumentation("Only used internally to support materialized table") + public static final ConfigOption + CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_DESCRIPTION = + key("catalog-materialized-table.refresh-handler-description") + .stringType() + .noDefaultValue() + .withDescription( + "The summary description of materialized table's refresh handler"); + + @ExcludeFromDocumentation("Only used internally to support materialized table") + public static final ConfigOption CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_BYTES = + key("catalog-materialized-table.refresh-handler-bytes") + .stringType() + .noDefaultValue() + .withDescription("The serialized refresh handler of materialized table."); + public static List> getOptions() { final Field[] fields = FlinkConnectorOptions.class.getFields(); final List> list = new ArrayList<>(fields.length); @@ 
-419,6 +497,42 @@ public static List> getOptions() { return list; } + /** The type of flink catalog table. */ + public enum CatalogTableType { + TABLE, + MATERIALIZED_TABLE + } + + /** The time unit of materialized table freshness. */ + public enum MaterializedTableIntervalFreshnessTimeUnit { + SECOND, + MINUTE, + HOUR, + DAY + } + + /** The refresh mode of materialized table. */ + public enum MaterializedTableRefreshMode { + /** The refresh pipeline will be executed in continuous mode. */ + CONTINUOUS, + + /** The refresh pipeline will be executed in full mode. */ + FULL, + + /** + * The refresh pipeline mode is determined by freshness of materialized table, either {@link + * #FULL} or {@link #CONTINUOUS}. + */ + AUTOMATIC + } + + /** The refresh status of materialized table. */ + public enum MaterializedTableRefreshStatus { + INITIALIZING, + ACTIVATED, + SUSPENDED + } + /** The mode of lookup cache. */ public enum LookupCacheMode { /** Auto mode, try to use partial mode. */ diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java index f7edfa9d644f..ec2bd704073c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java @@ -37,11 +37,16 @@ import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDatabase; import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.IntervalFreshness; import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.TestSchemaResolver; import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; @@ -54,6 +59,7 @@ import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.expressions.utils.ResolvedExpressionMock; import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.refresh.RefreshHandler; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -80,11 +86,13 @@ import static org.apache.paimon.CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS; import static org.apache.paimon.flink.FlinkCatalogOptions.DISABLE_CREATE_TABLE_IN_DEFAULT_DB; import static org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER; +import static org.apache.paimon.flink.FlinkConnectorOptions.CATALOG_TABLE_TYPE; import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatCollection; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertThrows; /** Test for {@link FlinkCatalog}. 
*/ public class FlinkCatalogTest { @@ -99,6 +107,10 @@ public class FlinkCatalogTest { private final ObjectPath nonExistDbPath = ObjectPath.fromString("non.exist"); private final ObjectPath nonExistObjectPath = ObjectPath.fromString("db1.nonexist"); + private static final String DEFINITION_QUERY = "SELECT id, region, county FROM T"; + + private static final IntervalFreshness FRESHNESS = IntervalFreshness.ofMinute("3"); + private String warehouse; private Catalog catalog; @@ -186,6 +198,96 @@ private CatalogTable createPartitionedTable(Map options) { return new ResolvedCatalogTable(origin, resolvedSchema); } + private CatalogMaterializedTable createMaterializedTable(Map options) { + ResolvedSchema resolvedSchema = this.createSchema(); + return new ResolvedCatalogMaterializedTable( + CatalogMaterializedTable.newBuilder() + .schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build()) + .comment("test materialized table comment") + .partitionKeys(Collections.emptyList()) + .options(options) + .definitionQuery(DEFINITION_QUERY) + .freshness(FRESHNESS) + .logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC) + .refreshMode(CatalogMaterializedTable.RefreshMode.CONTINUOUS) + .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING) + .build(), + resolvedSchema); + } + + @ParameterizedTest + @MethodSource("batchOptionProvider") + public void testCreateAndGetCatalogMaterializedTable(Map options) + throws Exception { + ObjectPath tablePath = path1; + CatalogMaterializedTable materializedTable = createMaterializedTable(options); + catalog.createDatabase(tablePath.getDatabaseName(), null, false); + // test create materialized table + catalog.createTable(tablePath, materializedTable, true); + + // test materialized table exist + assertThat(catalog.tableExists(tablePath)).isTrue(); + + // test get materialized table + CatalogBaseTable actualTable = catalog.getTable(tablePath); + // validate table type + assertThat(actualTable.getTableKind()) + .isEqualTo(CatalogBaseTable.TableKind.MATERIALIZED_TABLE); + + CatalogMaterializedTable actualMaterializedTable = (CatalogMaterializedTable) actualTable; + checkCreateTable(tablePath, materializedTable, actualMaterializedTable); + // test create exist materialized table + assertThrows( + TableAlreadyExistException.class, + () -> catalog.createTable(tablePath, materializedTable, false)); + } + + @ParameterizedTest + @MethodSource("batchOptionProvider") + public void testDropMaterializedTable(Map options) throws Exception { + ObjectPath tablePath = path1; + catalog.createDatabase(tablePath.getDatabaseName(), null, false); + catalog.createTable(tablePath, this.createTable(options), false); + assertThat(catalog.tableExists(tablePath)).isTrue(); + catalog.dropTable(tablePath, false); + assertThat(catalog.tableExists(tablePath)).isFalse(); + } + + @ParameterizedTest + @MethodSource("batchOptionProvider") + public void testAlterMaterializedTable(Map options) throws Exception { + ObjectPath tablePath = path1; + CatalogMaterializedTable materializedTable = createMaterializedTable(options); + catalog.createDatabase(tablePath.getDatabaseName(), null, false); + catalog.createTable(tablePath, materializedTable, true); + TestRefreshHandler refreshHandler = new TestRefreshHandler("jobID: xxx, clusterId: yyy"); + + // alter materialized table refresh handler + CatalogMaterializedTable expectedMaterializedTable = + materializedTable.copy( + CatalogMaterializedTable.RefreshStatus.ACTIVATED, + refreshHandler.asSummaryString(), + 
refreshHandler.toBytes()); + List tableChanges = new ArrayList<>(); + tableChanges.add( + new TableChange.ModifyRefreshStatus( + CatalogMaterializedTable.RefreshStatus.ACTIVATED)); + tableChanges.add( + new TableChange.ModifyRefreshHandler( + refreshHandler.asSummaryString(), refreshHandler.toBytes())); + catalog.alterTable(tablePath, expectedMaterializedTable, tableChanges, false); + + CatalogBaseTable updatedTable = catalog.getTable(tablePath); + checkEquals( + tablePath, + expectedMaterializedTable, + updatedTable, + Collections.singletonMap( + FlinkCatalogOptions.REGISTER_TIMEOUT.key(), + FlinkCatalogOptions.REGISTER_TIMEOUT.defaultValue().toString()), + Collections.emptySet()); + } + @ParameterizedTest @MethodSource("batchOptionProvider") public void testAlterTable(Map options) throws Exception { @@ -644,7 +746,8 @@ void testCreateTableFromTableDescriptor() throws Exception { checkCreateTable(path1, catalogTable, (CatalogTable) catalog.getTable(path1)); } - private void checkCreateTable(ObjectPath path, CatalogTable expected, CatalogTable actual) { + private void checkCreateTable( + ObjectPath path, CatalogBaseTable expected, CatalogBaseTable actual) { checkEquals( path, expected, @@ -661,8 +764,8 @@ private void checkAlterTable(ObjectPath path, CatalogTable expected, CatalogTabl private void checkEquals( ObjectPath path, - CatalogTable t1, - CatalogTable t2, + CatalogBaseTable t1, + CatalogBaseTable t2, Map optionsToAdd, Set optionsToRemove) { Path tablePath; @@ -681,17 +784,53 @@ private void checkEquals( options.put("path", tablePath.toString()); options.putAll(optionsToAdd); optionsToRemove.forEach(options::remove); - t1 = ((ResolvedCatalogTable) t1).copy(options); + if (t1.getTableKind() == CatalogBaseTable.TableKind.TABLE) { + t1 = ((ResolvedCatalogTable) t1).copy(options); + } else { + options.put(CATALOG_TABLE_TYPE.key(), "MATERIALIZED_TABLE"); + t1 = ((ResolvedCatalogMaterializedTable) t1).copy(options); + } checkEquals(t1, t2); } - private static void checkEquals(CatalogTable t1, CatalogTable t2) { + private static void checkEquals(CatalogBaseTable t1, CatalogBaseTable t2) { assertThat(t2.getTableKind()).isEqualTo(t1.getTableKind()); - assertThat(t2.getSchema()).isEqualTo(t1.getSchema()); assertThat(t2.getComment()).isEqualTo(t1.getComment()); - assertThat(t2.getPartitionKeys()).isEqualTo(t1.getPartitionKeys()); - assertThat(t2.isPartitioned()).isEqualTo(t1.isPartitioned()); assertThat(t2.getOptions()).isEqualTo(t1.getOptions()); + if (t1.getTableKind() == CatalogBaseTable.TableKind.TABLE) { + assertThat(t2.getSchema()).isEqualTo(t1.getSchema()); + assertThat(((CatalogTable) (t2)).getPartitionKeys()) + .isEqualTo(((CatalogTable) (t1)).getPartitionKeys()); + assertThat(((CatalogTable) (t2)).isPartitioned()) + .isEqualTo(((CatalogTable) (t1)).isPartitioned()); + } else { + CatalogMaterializedTable mt1 = (CatalogMaterializedTable) t1; + CatalogMaterializedTable mt2 = (CatalogMaterializedTable) t2; + assertThat( + Schema.newBuilder() + .fromResolvedSchema( + t2.getUnresolvedSchema() + .resolve(new TestSchemaResolver())) + .build()) + .isEqualTo(t1.getSchema().toSchema()); + assertThat(mt2.getPartitionKeys()).isEqualTo(mt1.getPartitionKeys()); + assertThat(mt2.isPartitioned()).isEqualTo(mt1.isPartitioned()); + // validate definition query + assertThat(mt2.getDefinitionQuery()).isEqualTo(mt1.getDefinitionQuery()); + // validate freshness + assertThat(mt2.getDefinitionFreshness()).isEqualTo(mt1.getDefinitionFreshness()); + // validate logical refresh mode + 
assertThat(mt2.getLogicalRefreshMode()).isEqualTo(mt1.getLogicalRefreshMode()); + // validate refresh mode + assertThat(mt2.getRefreshMode()).isEqualTo(mt1.getRefreshMode()); + // validate refresh status + assertThat(mt2.getRefreshStatus()).isEqualTo(mt1.getRefreshStatus()); + // validate refresh handler + assertThat(mt2.getRefreshHandlerDescription()) + .isEqualTo(mt1.getRefreshHandlerDescription()); + assertThat(mt2.getSerializedRefreshHandler()) + .isEqualTo(mt1.getSerializedRefreshHandler()); + } } static Stream> streamingOptionProvider() { @@ -774,4 +913,22 @@ public void unRegisterTopic() { throw new UnsupportedOperationException("Check unregister log store topic here."); } } + + private static class TestRefreshHandler implements RefreshHandler { + + private final String handlerString; + + public TestRefreshHandler(String handlerString) { + this.handlerString = handlerString; + } + + @Override + public String asSummaryString() { + return "test refresh handler"; + } + + public byte[] toBytes() { + return handlerString.getBytes(); + } + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/MaterializedTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/MaterializedTableITCase.java new file mode 100644 index 000000000000..34f354e54de9 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/MaterializedTableITCase.java @@ -0,0 +1,640 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.runtime.jobgraph.JobType; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo; +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.gateway.api.endpoint.EndpointVersion; +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.results.TableInfo; +import org.apache.flink.table.gateway.api.session.SessionEnvironment; +import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.gateway.api.utils.SqlGatewayException; +import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension; +import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl; +import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler; +import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension; +import org.apache.flink.table.gateway.workflow.scheduler.EmbeddedQuartzScheduler; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.table.refresh.ContinuousRefreshHandler; +import org.apache.flink.table.refresh.ContinuousRefreshHandlerSerializer; +import org.apache.flink.table.shaded.org.quartz.JobKey; +import org.apache.flink.test.junit5.InjectClusterClient; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.testutils.executor.TestExecutorExtension; +import org.apache.flink.types.Row; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import 
java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +import static org.apache.flink.table.catalog.CommonCatalogOptions.TABLE_CATALOG_STORE_KIND; +import static org.apache.flink.table.factories.FactoryUtil.WORKFLOW_SCHEDULER_TYPE; +import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.awaitOperationTermination; +import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.fetchAllResults; +import static org.apache.flink.test.util.TestUtils.waitUntilAllTasksAreRunning; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test the support of Materialized Table. */ +public class MaterializedTableITCase { + private static final String FILE_CATALOG_STORE = "file_store"; + private static final String TEST_CATALOG_PREFIX = "test_catalog"; + protected static final String TEST_DEFAULT_DATABASE = "default"; + + private static final AtomicLong COUNTER = new AtomicLong(0); + + @RegisterExtension + @Order(1) + static final MiniClusterExtension MINI_CLUSTER = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(2) + .build()); + + @RegisterExtension + @Order(2) + protected static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION = + new SqlGatewayServiceExtension(MINI_CLUSTER::getClientConfiguration); + + @RegisterExtension + @Order(3) + protected static final TestExecutorExtension EXECUTOR_EXTENSION = + new TestExecutorExtension<>( + () -> + Executors.newCachedThreadPool( + new ExecutorThreadFactory( + "SqlGatewayService Test Pool", + IgnoreExceptionHandler.INSTANCE))); + + @RegisterExtension + @Order(4) + protected static final SqlGatewayRestEndpointExtension SQL_GATEWAY_REST_ENDPOINT_EXTENSION = + new SqlGatewayRestEndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService); + + protected static SqlGatewayServiceImpl service; + private static SessionEnvironment defaultSessionEnvironment; + private static Path baseCatalogPath; + + private String paimonWarehousePath; + protected String paimonCatalogName; + + protected SessionHandle sessionHandle; + + protected RestClusterClient restClusterClient; + + @BeforeAll + static void setUp(@TempDir Path temporaryFolder) throws Exception { + service = (SqlGatewayServiceImpl) SQL_GATEWAY_SERVICE_EXTENSION.getService(); + + // initialize file catalog store path + Path fileCatalogStore = temporaryFolder.resolve(FILE_CATALOG_STORE); + Files.createDirectory(fileCatalogStore); + Map catalogStoreOptions = new HashMap<>(); + catalogStoreOptions.put(TABLE_CATALOG_STORE_KIND.key(), "file"); + catalogStoreOptions.put("table.catalog-store.file.path", fileCatalogStore.toString()); + + // initialize catalog base path + baseCatalogPath = temporaryFolder.resolve(TEST_CATALOG_PREFIX); + Files.createDirectory(baseCatalogPath); + + // workflow scheduler config + Map workflowSchedulerConfig = new HashMap<>(); + workflowSchedulerConfig.put(WORKFLOW_SCHEDULER_TYPE.key(), "embedded"); + workflowSchedulerConfig.put( + "sql-gateway.endpoint.rest.address", + SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress()); + workflowSchedulerConfig.put( + "sql-gateway.endpoint.rest.port", + String.valueOf(SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort())); + + // Session conf 
for testing purpose + Map testConf = new HashMap<>(); + testConf.put("k1", "v1"); + testConf.put("k2", "v2"); + + defaultSessionEnvironment = + SessionEnvironment.newBuilder() + .addSessionConfig(catalogStoreOptions) + .addSessionConfig(workflowSchedulerConfig) + .addSessionConfig(testConf) + .setSessionEndpointVersion(new EndpointVersion() {}) + .build(); + } + + @BeforeEach + void before(@InjectClusterClient RestClusterClient injectClusterClient) throws Exception { + String randomStr = String.valueOf(COUNTER.incrementAndGet()); + // initialize warehouse path with random uuid + Path fileCatalogPath = baseCatalogPath.resolve(randomStr); + Files.createDirectory(fileCatalogPath); + + paimonWarehousePath = fileCatalogPath.toString(); + paimonCatalogName = TEST_CATALOG_PREFIX + randomStr; + // initialize session handle, create paimon catalog and register it to catalog + // store + sessionHandle = initializeSession(); + + // init rest cluster client + restClusterClient = injectClusterClient; + } + + @AfterEach + void after() throws Exception { + Set tableInfos = + service.listTables( + sessionHandle, + paimonCatalogName, + TEST_DEFAULT_DATABASE, + Collections.singleton(CatalogBaseTable.TableKind.TABLE)); + + // drop all materialized tables + for (TableInfo tableInfo : tableInfos) { + ResolvedCatalogBaseTable resolvedTable = + service.getTable(sessionHandle, tableInfo.getIdentifier()); + if (CatalogBaseTable.TableKind.MATERIALIZED_TABLE == resolvedTable.getTableKind()) { + String dropTableDDL = + String.format( + "DROP MATERIALIZED TABLE %s", + tableInfo.getIdentifier().asSerializableString()); + OperationHandle dropTableHandle; + dropTableHandle = + service.executeStatement( + sessionHandle, dropTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, dropTableHandle); + } + } + } + + @Test + void testCreateMaterializedTableInContinuousMode() throws Exception { + String materializedTableDDL = + "CREATE MATERIALIZED TABLE users_shops" + + " PARTITIONED BY (ds)\n" + + " WITH(\n" + + " 'format' = 'debezium-json'\n" + + " )\n" + + " FRESHNESS = INTERVAL '30' SECOND\n" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " SUM (payment_amount_cents) AS payed_buy_fee_sum,\n" + + " SUM (1) AS pv\n" + + " FROM (\n" + + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + OperationHandle materializedTableHandle = + service.executeStatement( + sessionHandle, materializedTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, materializedTableHandle); + + // validate materialized table: schema, refresh mode, refresh status, refresh handler, + // doesn't check the data because it generates randomly. 
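+        // In continuous mode the declared freshness also drives the checkpoint interval
+        // of the background refresh job; the assertions below check both the catalog
+        // metadata and the running job.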
+ ResolvedCatalogMaterializedTable actualMaterializedTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + paimonCatalogName, TEST_DEFAULT_DATABASE, "users_shops")); + + // Expected schema + ResolvedSchema expectedSchema = + ResolvedSchema.of( + Arrays.asList( + Column.physical("user_id", DataTypes.BIGINT()), + Column.physical("shop_id", DataTypes.BIGINT()), + Column.physical("ds", DataTypes.STRING()), + Column.physical("payed_buy_fee_sum", DataTypes.BIGINT()), + Column.physical("pv", DataTypes.INT().notNull()))); + + assertThat(actualMaterializedTable.getResolvedSchema()).isEqualTo(expectedSchema); + assertThat(actualMaterializedTable.getFreshness()).isEqualTo(Duration.ofSeconds(30)); + assertThat(actualMaterializedTable.getLogicalRefreshMode()) + .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC); + assertThat(actualMaterializedTable.getRefreshMode()) + .isEqualTo(CatalogMaterializedTable.RefreshMode.CONTINUOUS); + assertThat(actualMaterializedTable.getRefreshStatus()) + .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED); + assertThat(actualMaterializedTable.getRefreshHandlerDescription()).isNotEmpty(); + assertThat(actualMaterializedTable.getSerializedRefreshHandler()).isNotEmpty(); + + ContinuousRefreshHandler activeRefreshHandler = + ContinuousRefreshHandlerSerializer.INSTANCE.deserialize( + actualMaterializedTable.getSerializedRefreshHandler(), + getClass().getClassLoader()); + + waitUntilAllTasksAreRunning( + restClusterClient, JobID.fromHexString(activeRefreshHandler.getJobId())); + + // verify the background job is running + String describeJobDDL = String.format("DESCRIBE JOB '%s'", activeRefreshHandler.getJobId()); + OperationHandle describeJobHandle = + service.executeStatement(sessionHandle, describeJobDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, describeJobHandle); + List jobResults = fetchAllResults(service, sessionHandle, describeJobHandle); + assertThat(jobResults.get(0).getString(2).toString()).isEqualTo("RUNNING"); + + // get checkpoint interval + long checkpointInterval = + getCheckpointIntervalConfig(restClusterClient, activeRefreshHandler.getJobId()); + assertThat(checkpointInterval).isEqualTo(30 * 1000); + } + + @Test + void testAlterMaterializedTableRefresh() throws Exception { + long timeout = Duration.ofSeconds(20).toMillis(); + long pause = Duration.ofSeconds(2).toMillis(); + + List data = new ArrayList<>(); + data.add(Row.of(1L, 1L, 1L, "2024-01-01")); + data.add(Row.of(2L, 2L, 2L, "2024-01-02")); + data.add(Row.of(3L, 3L, 3L, "2024-01-02")); + createAndVerifyCreateMaterializedTableWithData( + "my_materialized_table", + data, + Collections.singletonMap("ds", "yyyy-MM-dd"), + CatalogMaterializedTable.RefreshMode.CONTINUOUS); + + // remove the last element + data.remove(2); + + long currentTime = System.currentTimeMillis(); + String alterStatement = + "ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds = '2024-01-02')"; + OperationHandle alterHandle = + service.executeStatement(sessionHandle, alterStatement, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, alterHandle); + List result = fetchAllResults(service, sessionHandle, alterHandle); + assertThat(result.size()).isEqualTo(1); + String jobId = result.get(0).getString(0).toString(); + + // 1. verify a new job is created + verifyRefreshJobCreated(restClusterClient, jobId, currentTime); + // 2. 
verify the new job overwrite the data + try (ExecutionInBatchModeRunner ignored = new ExecutionInBatchModeRunner()) { + org.apache.paimon.utils.CommonTestUtils.waitUtil( + () -> + fetchTableData(sessionHandle, "SELECT * FROM my_materialized_table") + .size() + == data.size(), + Duration.ofMillis(timeout), + Duration.ofMillis(pause), + "Failed to verify the data in materialized table."); + assertThat( + fetchTableData( + sessionHandle, + "SELECT * FROM my_materialized_table where ds = '2024-01-02'") + .size()) + .isEqualTo(1); + } + } + + @Test + void testDropMaterializedTable() throws Exception { + createAndVerifyCreateMaterializedTableWithData( + "users_shops", + Collections.emptyList(), + Collections.emptyMap(), + CatalogMaterializedTable.RefreshMode.FULL); + + JobKey jobKey = + JobKey.jobKey( + "quartz_job_" + + ObjectIdentifier.of( + paimonCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops") + .asSerializableString(), + "default_group"); + EmbeddedQuartzScheduler embeddedWorkflowScheduler = + SQL_GATEWAY_REST_ENDPOINT_EXTENSION + .getSqlGatewayRestEndpoint() + .getQuartzScheduler(); + + // verify refresh workflow is created + assertThat(embeddedWorkflowScheduler.getQuartzScheduler().checkExists(jobKey)).isTrue(); + + // Drop materialized table using drop table statement + String dropTableUsingMaterializedTableDDL = "DROP TABLE users_shops"; + OperationHandle dropTableUsingMaterializedTableHandle = + service.executeStatement( + sessionHandle, dropTableUsingMaterializedTableDDL, -1, new Configuration()); + + assertThatThrownBy( + () -> + awaitOperationTermination( + service, + sessionHandle, + dropTableUsingMaterializedTableHandle)) + .rootCause() + .isInstanceOf(ValidationException.class) + .hasMessage( + String.format( + "Table with identifier '%s' does not exist.", + ObjectIdentifier.of( + paimonCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops") + .asSummaryString())); + + // drop materialized table + String dropMaterializedTableDDL = "DROP MATERIALIZED TABLE IF EXISTS users_shops"; + OperationHandle dropMaterializedTableHandle = + service.executeStatement( + sessionHandle, dropMaterializedTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, dropMaterializedTableHandle); + + // verify materialized table metadata is removed + assertThatThrownBy( + () -> + service.getTable( + sessionHandle, + ObjectIdentifier.of( + paimonCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops"))) + .isInstanceOf(SqlGatewayException.class) + .hasMessageContaining("Failed to getTable."); + + // verify refresh workflow is removed + assertThat(embeddedWorkflowScheduler.getQuartzScheduler().checkExists(jobKey)).isFalse(); + } + + private long getCheckpointIntervalConfig(RestClusterClient restClusterClient, String jobId) + throws Exception { + CheckpointConfigInfo checkpointConfigInfo = + sendJobRequest( + restClusterClient, + CheckpointConfigHeaders.getInstance(), + EmptyRequestBody.getInstance(), + jobId); + return RestMapperUtils.getStrictObjectMapper() + .readTree( + RestMapperUtils.getStrictObjectMapper() + .writeValueAsString(checkpointConfigInfo)) + .get("interval") + .asLong(); + } + + private static + P sendJobRequest( + RestClusterClient restClusterClient, + MessageHeaders headers, + R requestBody, + String jobId) + throws Exception { + M jobMessageParameters = headers.getUnresolvedMessageParameters(); + jobMessageParameters.jobPathParameter.resolve(JobID.fromHexString(jobId)); + + return restClusterClient + .sendRequest(headers, jobMessageParameters, 
requestBody) + .get(5, TimeUnit.SECONDS); + } + + private SessionHandle initializeSession() { + SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment); + String catalogDDL = + String.format( + "CREATE CATALOG %s\n" + + "WITH (\n" + + " 'type' = 'paimon',\n" + + " 'warehouse' = '%s'" + + " )", + paimonCatalogName, paimonWarehousePath); + service.configureSession(sessionHandle, catalogDDL, -1); + service.configureSession( + sessionHandle, String.format("USE CATALOG %s", paimonCatalogName), -1); + + // create source table + String dataGenSource = + "CREATE TEMPORARY TABLE datagenSource (\n" + + " order_id BIGINT,\n" + + " order_number VARCHAR(20),\n" + + " user_id BIGINT,\n" + + " shop_id BIGINT,\n" + + " product_id BIGINT,\n" + + " status BIGINT,\n" + + " order_type BIGINT,\n" + + " order_created_at TIMESTAMP,\n" + + " payment_amount_cents BIGINT\n" + + ")\n" + + "WITH (\n" + + " 'connector' = 'datagen',\n" + + " 'rows-per-second' = '10'\n" + + ")"; + service.configureSession(sessionHandle, dataGenSource, -1); + return sessionHandle; + } + + public void createAndVerifyCreateMaterializedTableWithData( + String materializedTableName, + List data, + Map partitionFormatter, + CatalogMaterializedTable.RefreshMode refreshMode) + throws Exception { + long timeout = Duration.ofSeconds(20).toMillis(); + long pause = Duration.ofSeconds(2).toMillis(); + + String dataId = TestValuesTableFactory.registerData(data); + String sourceDdl = + String.format( + "CREATE TEMPORARY TABLE IF NOT EXISTS my_source (\n" + + " order_id BIGINT,\n" + + " user_id BIGINT,\n" + + " shop_id BIGINT,\n" + + " order_created_at STRING\n" + + ")\n" + + "WITH (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'true',\n" + + " 'data-id' = '%s'\n" + + ")", + dataId); + OperationHandle sourceHandle = + service.executeStatement(sessionHandle, sourceDdl, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, sourceHandle); + + String partitionFields = + partitionFormatter != null && !partitionFormatter.isEmpty() + ? partitionFormatter.entrySet().stream() + .map( + e -> + String.format( + "'partition.fields.%s.date-formatter' = '%s'", + e.getKey(), e.getValue())) + .collect(Collectors.joining(",\n", "", ",\n")) + : "\n"; + String materializedTableDDL = + String.format( + "CREATE MATERIALIZED TABLE %s" + + " PARTITIONED BY (ds)\n" + + " WITH(\n" + + " %s" + + " 'format' = 'debezium-json'\n" + + " )\n" + + " FRESHNESS = INTERVAL '30' SECOND\n" + + " REFRESH_MODE = %s\n" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " COUNT(order_id) AS order_cnt\n" + + " FROM (\n" + + " SELECT user_id, shop_id, order_created_at AS ds, order_id FROM my_source" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)", + materializedTableName, partitionFields, refreshMode.toString()); + + OperationHandle materializedTableHandle = + service.executeStatement( + sessionHandle, materializedTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, materializedTableHandle); + try (ExecutionInBatchModeRunner ignore = new ExecutionInBatchModeRunner()) { + // verify data exists in materialized table + CommonTestUtils.waitUtil( + () -> + fetchTableData( + sessionHandle, + String.format( + "SELECT * FROM %s", + materializedTableName)) + .size() + == data.size(), + Duration.ofMillis(timeout), + Duration.ofMillis(pause), + "Failed to verify the data in materialized table."); + } + } + + /** + * A helper runner to wrap code with try-with-resource clause. 
All session execution will be + * executed in flink batch runtime mode. + */ + protected class ExecutionInBatchModeRunner implements AutoCloseable { + private final String oldMode; + + ExecutionInBatchModeRunner() { + this.oldMode = service.getSessionConfig(sessionHandle).get("execution.runtime-mode"); + service.configureSession(sessionHandle, "SET 'execution.runtime-mode' = 'batch'", -1); + } + + @Override + public void close() throws Exception { + if (oldMode != null) { + service.configureSession( + sessionHandle, + String.format("SET 'execution.runtime-mode' = '%s'", oldMode), + -1); + } + } + } + + public List fetchTableData(SessionHandle sessionHandle, String query) { + Configuration configuration = new Configuration(); + OperationHandle queryHandle = + service.executeStatement(sessionHandle, query, -1, configuration); + + return fetchAllResults(service, sessionHandle, queryHandle); + } + + public void verifyRefreshJobCreated( + RestClusterClient restClusterClient, String jobId, long startTime) throws Exception { + long timeout = Duration.ofSeconds(20).toMillis(); + long pause = Duration.ofSeconds(2).toMillis(); + + // 1. verify a new job is created + Optional job = + restClusterClient.listJobs().get(timeout, TimeUnit.MILLISECONDS).stream() + .filter(j -> j.getJobId().toString().equals(jobId)) + .findFirst(); + assertThat(job).isPresent(); + assertThat(job.get().getStartTime()).isGreaterThan(startTime); + + // 2. verify the new job is a batch job + JobDetailsInfo jobDetailsInfo = + restClusterClient + .getJobDetails(JobID.fromHexString(jobId)) + .get(timeout, TimeUnit.MILLISECONDS); + assertThat(jobDetailsInfo.getJobType()).isEqualTo(JobType.BATCH); + + // 3. verify the new job is finished + CommonTestUtils.waitUtil( + () -> { + try { + return JobStatus.FINISHED.equals( + restClusterClient + .getJobStatus(JobID.fromHexString(jobId)) + .get(5, TimeUnit.SECONDS)); + } catch (Exception ignored) { + } + return false; + }, + Duration.ofMillis(timeout), + Duration.ofMillis(pause), + "Failed to verify whether the job is finished."); + } +} From c9d8640f90a9607c682d9bd269ea748cf84a2672 Mon Sep 17 00:00:00 2001 From: Weijie Guo Date: Tue, 24 Sep 2024 09:52:07 +0800 Subject: [PATCH 2/6] address comments --- .../java/org/apache/paimon/TableType.java | 4 +- .../org/apache/paimon/flink/FlinkCatalog.java | 98 ++++++++++--------- .../paimon/flink/FlinkConnectorOptions.java | 31 ++---- .../apache/paimon/flink/FlinkCatalogTest.java | 4 +- 4 files changed, 65 insertions(+), 72 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/TableType.java b/paimon-common/src/main/java/org/apache/paimon/TableType.java index c5ed8d4dce12..e57b213348aa 100644 --- a/paimon-common/src/main/java/org/apache/paimon/TableType.java +++ b/paimon-common/src/main/java/org/apache/paimon/TableType.java @@ -28,8 +28,8 @@ public enum TableType implements DescribedEnum { TABLE("table", "Normal Paimon table."), FORMAT_TABLE( "format-table", - "A file format table refers to a directory that contains multiple files of the same format."); - + "A file format table refers to a directory that contains multiple files of the same format."), + FLINK_MATERIALIZED_TABLE("flink-materialized-table", "A Flink materialized table."); private final String value; private final String description; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index b5c81aabc341..88189d693769 100644 --- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -19,9 +19,9 @@ package org.apache.paimon.flink; import org.apache.paimon.CoreOptions; +import org.apache.paimon.TableType; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.flink.FlinkConnectorOptions.CatalogTableType; import org.apache.paimon.flink.FlinkConnectorOptions.MaterializedTableIntervalFreshnessTimeUnit; import org.apache.paimon.flink.FlinkConnectorOptions.MaterializedTableRefreshMode; import org.apache.paimon.flink.FlinkConnectorOptions.MaterializedTableRefreshStatus; @@ -136,7 +136,6 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.CATALOG_MATERIALIZED_TABLE_REFRESH_MODE; import static org.apache.paimon.flink.FlinkConnectorOptions.CATALOG_MATERIALIZED_TABLE_REFRESH_STATUS; import static org.apache.paimon.flink.FlinkConnectorOptions.CATALOG_MATERIALIZED_TABLE_SNAPSHOT; -import static org.apache.paimon.flink.FlinkConnectorOptions.CATALOG_TABLE_TYPE; import static org.apache.paimon.flink.LogicalTypeConversion.toDataType; import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; import static org.apache.paimon.flink.log.LogStoreRegister.registerLogSystem; @@ -349,40 +348,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig // TableDescriptor) Map options = new HashMap<>(table.getOptions()); if (table instanceof CatalogMaterializedTable) { - CatalogMaterializedTable mt = (CatalogMaterializedTable) table; - Options mtOptions = new Options(); - mtOptions.set(CATALOG_TABLE_TYPE, CatalogTableType.MATERIALIZED_TABLE); - mt.getSnapshot().ifPresent(x -> mtOptions.set(CATALOG_MATERIALIZED_TABLE_SNAPSHOT, x)); - mtOptions.set(CATALOG_MATERIALIZED_TABLE_DEFINITION_QUERY, mt.getDefinitionQuery()); - mtOptions.set( - CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS, - mt.getDefinitionFreshness().getInterval()); - mtOptions.set( - CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT, - MaterializedTableIntervalFreshnessTimeUnit.valueOf( - mt.getDefinitionFreshness().getTimeUnit().name())); - mtOptions.set( - CATALOG_MATERIALIZED_TABLE_LOGICAL_REFRESH_MODE, - MaterializedTableRefreshMode.valueOf(mt.getLogicalRefreshMode().name())); - mtOptions.set( - CATALOG_MATERIALIZED_TABLE_REFRESH_MODE, - MaterializedTableRefreshMode.valueOf(mt.getRefreshMode().name())); - mtOptions.set( - CATALOG_MATERIALIZED_TABLE_REFRESH_STATUS, - MaterializedTableRefreshStatus.valueOf(mt.getRefreshStatus().name())); - mt.getRefreshHandlerDescription() - .ifPresent( - desc -> - mtOptions.set( - CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_DESCRIPTION, - desc)); - byte[] serializedRefreshHandler = mt.getSerializedRefreshHandler(); - if (serializedRefreshHandler != null) { - mtOptions.set( - CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_BYTES, - encodeBytesToBase64(serializedRefreshHandler)); - } - options.putAll(mtOptions.toMap()); + fillOptionsForMaterializedTable((CatalogMaterializedTable) table, options); } Schema paimonSchema = buildPaimonSchema(identifier, table, options); @@ -402,6 +368,43 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig } } + private static void fillOptionsForMaterializedTable( + CatalogMaterializedTable mt, Map options) { + Options mtOptions = new Options(); + mtOptions.set(CoreOptions.TYPE, TableType.FLINK_MATERIALIZED_TABLE); + mt.getSnapshot().ifPresent(x 
-> mtOptions.set(CATALOG_MATERIALIZED_TABLE_SNAPSHOT, x)); + mtOptions.set(CATALOG_MATERIALIZED_TABLE_DEFINITION_QUERY, mt.getDefinitionQuery()); + mtOptions.set( + CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS, + mt.getDefinitionFreshness().getInterval()); + mtOptions.set( + CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT, + MaterializedTableIntervalFreshnessTimeUnit.valueOf( + mt.getDefinitionFreshness().getTimeUnit().name())); + mtOptions.set( + CATALOG_MATERIALIZED_TABLE_LOGICAL_REFRESH_MODE, + MaterializedTableRefreshMode.valueOf(mt.getLogicalRefreshMode().name())); + mtOptions.set( + CATALOG_MATERIALIZED_TABLE_REFRESH_MODE, + MaterializedTableRefreshMode.valueOf(mt.getRefreshMode().name())); + mtOptions.set( + CATALOG_MATERIALIZED_TABLE_REFRESH_STATUS, + MaterializedTableRefreshStatus.valueOf(mt.getRefreshStatus().name())); + mt.getRefreshHandlerDescription() + .ifPresent( + desc -> + mtOptions.set( + CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_DESCRIPTION, + desc)); + byte[] serializedRefreshHandler = mt.getSerializedRefreshHandler(); + if (serializedRefreshHandler != null) { + mtOptions.set( + CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_BYTES, + encodeBytesToBase64(serializedRefreshHandler)); + } + options.putAll(mtOptions.toMap()); + } + protected Schema buildPaimonSchema( Identifier identifier, CatalogBaseTable catalogTable, Map options) { String connector = options.get(CONNECTOR.key()); @@ -882,17 +885,20 @@ private CatalogBaseTable toCatalogTable(Table table) { removeProperties.asMap().keySet().forEach(newOptions::remove); Options options = Options.fromMap(newOptions); - if (CatalogBaseTable.TableKind.TABLE - .name() - .equals(options.get(CATALOG_TABLE_TYPE).name())) { - return new DataCatalogTable( - table, - schema, - table.partitionKeys(), - newOptions, - table.comment().orElse(""), - nonPhysicalColumnComments); + if (TableType.FLINK_MATERIALIZED_TABLE == options.get(CoreOptions.TYPE)) { + return buildMaterializedTable(table, newOptions, schema, options); } + return new DataCatalogTable( + table, + schema, + table.partitionKeys(), + newOptions, + table.comment().orElse(""), + nonPhysicalColumnComments); + } + + private CatalogMaterializedTable buildMaterializedTable( + Table table, Map newOptions, TableSchema schema, Options options) { Long snapshot = options.get(CATALOG_MATERIALIZED_TABLE_SNAPSHOT); String definitionQuery = options.get(CATALOG_MATERIALIZED_TABLE_DEFINITION_QUERY); IntervalFreshness freshness = diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java index ba5428fb6245..c74ec2834969 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java @@ -404,23 +404,16 @@ public class FlinkConnectorOptions { .withDescription( "Optional endInput watermark used in case of batch mode or bounded stream."); - @ExcludeFromDocumentation("Only used internally to support materialized table") - public static final ConfigOption CATALOG_TABLE_TYPE = - key("catalog-table.type") - .enumType(CatalogTableType.class) - .defaultValue(CatalogTableType.TABLE) - .withDescription("The type of flink catalog table."); - @ExcludeFromDocumentation("Only used internally to support materialized table") public static final ConfigOption CATALOG_MATERIALIZED_TABLE_SNAPSHOT = - 
key("catalog-materialized-table.snapshot") + key("materialized-table.snapshot") .longType() .noDefaultValue() .withDescription("The snapshot specified for the materialized table"); @ExcludeFromDocumentation("Only used internally to support materialized table") public static final ConfigOption CATALOG_MATERIALIZED_TABLE_DEFINITION_QUERY = - key("catalog-materialized-table.definition-query") + key("materialized-table.definition-query") .stringType() .noDefaultValue() .withDescription( @@ -428,7 +421,7 @@ public class FlinkConnectorOptions { @ExcludeFromDocumentation("Only used internally to support materialized table") public static final ConfigOption CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS = - key("catalog-materialized-table.interval-freshness") + key("materialized-table.interval-freshness") .stringType() .noDefaultValue() .withDescription( @@ -437,7 +430,7 @@ public class FlinkConnectorOptions { @ExcludeFromDocumentation("Only used internally to support materialized table") public static final ConfigOption CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT = - key("catalog-materialized-table.interval-freshness.time-unit") + key("materialized-table.interval-freshness.time-unit") .enumType(MaterializedTableIntervalFreshnessTimeUnit.class) .noDefaultValue() .withDescription("The time unit of freshness interval."); @@ -445,7 +438,7 @@ public class FlinkConnectorOptions { @ExcludeFromDocumentation("Only used internally to support materialized table") public static final ConfigOption CATALOG_MATERIALIZED_TABLE_LOGICAL_REFRESH_MODE = - key("catalog-materialized-table.logical-refresh-mode") + key("materialized-table.logical-refresh-mode") .enumType(MaterializedTableRefreshMode.class) .noDefaultValue() .withDescription("the logical refresh mode of materialized table."); @@ -453,7 +446,7 @@ public class FlinkConnectorOptions { @ExcludeFromDocumentation("Only used internally to support materialized table") public static final ConfigOption CATALOG_MATERIALIZED_TABLE_REFRESH_MODE = - key("catalog-materialized-table.refresh-mode") + key("materialized-table.refresh-mode") .enumType(MaterializedTableRefreshMode.class) .noDefaultValue() .withDescription("the physical refresh mode of materialized table."); @@ -461,7 +454,7 @@ public class FlinkConnectorOptions { @ExcludeFromDocumentation("Only used internally to support materialized table") public static final ConfigOption CATALOG_MATERIALIZED_TABLE_REFRESH_STATUS = - key("catalog-materialized-table.refresh-status") + key("materialized-table.refresh-status") .enumType(MaterializedTableRefreshStatus.class) .noDefaultValue() .withDescription("the refresh status of materialized table."); @@ -469,7 +462,7 @@ public class FlinkConnectorOptions { @ExcludeFromDocumentation("Only used internally to support materialized table") public static final ConfigOption CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_DESCRIPTION = - key("catalog-materialized-table.refresh-handler-description") + key("materialized-table.refresh-handler-description") .stringType() .noDefaultValue() .withDescription( @@ -477,7 +470,7 @@ public class FlinkConnectorOptions { @ExcludeFromDocumentation("Only used internally to support materialized table") public static final ConfigOption CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_BYTES = - key("catalog-materialized-table.refresh-handler-bytes") + key("materialized-table.refresh-handler-bytes") .stringType() .noDefaultValue() .withDescription("The serialized refresh handler of materialized table."); @@ -497,12 +490,6 @@ public static List> 
getOptions() { return list; } - /** The type of flink catalog table. */ - public enum CatalogTableType { - TABLE, - MATERIALIZED_TABLE - } - /** The time unit of materialized table freshness. */ public enum MaterializedTableIntervalFreshnessTimeUnit { SECOND, diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java index ec2bd704073c..7ef884da9385 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink; import org.apache.paimon.CoreOptions; +import org.apache.paimon.TableType; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.log.LogSinkProvider; @@ -86,7 +87,6 @@ import static org.apache.paimon.CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS; import static org.apache.paimon.flink.FlinkCatalogOptions.DISABLE_CREATE_TABLE_IN_DEFAULT_DB; import static org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER; -import static org.apache.paimon.flink.FlinkConnectorOptions.CATALOG_TABLE_TYPE; import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; @@ -787,7 +787,7 @@ private void checkEquals( if (t1.getTableKind() == CatalogBaseTable.TableKind.TABLE) { t1 = ((ResolvedCatalogTable) t1).copy(options); } else { - options.put(CATALOG_TABLE_TYPE.key(), "MATERIALIZED_TABLE"); + options.put(CoreOptions.TYPE.key(), TableType.FLINK_MATERIALIZED_TABLE.toString()); t1 = ((ResolvedCatalogMaterializedTable) t1).copy(options); } checkEquals(t1, t2); From ef6f809430fbc7cccd191eb91b1ebb180f00f578 Mon Sep 17 00:00:00 2001 From: Weijie Guo Date: Wed, 25 Sep 2024 11:13:06 +0800 Subject: [PATCH 3/6] re-generate doc --- docs/layouts/shortcodes/generated/core_configuration.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 9870353bde69..f7a592be9e4c 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -886,7 +886,7 @@
             <td><h5>type</h5></td>
             <td style="word-wrap: break-word;">table</td>
             <td><p>Enum</p></td>
-            <td>Type of the table.<br /><br />Possible values:<ul><li>"table": Normal Paimon table.</li><li>"format-table": A file format table refers to a directory that contains multiple files of the same format.</li></ul></td>
+            <td>Type of the table.<br /><br />Possible values:<ul><li>"table": Normal Paimon table.</li><li>"format-table": A file format table refers to a directory that contains multiple files of the same format.</li><li>"flink-materialized-table": A Flink materialized table.</li></ul></td>
         </tr>
         <tr>
             <td><h5>write-buffer-for-append</h5></td>
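The mechanism running through patches 2-4 boils down to one idea: the extra metadata a Flink CatalogMaterializedTable carries is flattened into ordinary Paimon table options on createTable, and rebuilt from those options on read. Below is a minimal, self-contained sketch of that round trip; the names (MtMetadata, toOptions, fromOptions) and the reduced field set are illustrative stand-ins, not the actual FlinkCatalog API.

import java.util.HashMap;
import java.util.Map;

public class MaterializedTableOptionRoundTrip {

    /** Simplified stand-in for the metadata of a CatalogMaterializedTable. */
    static final class MtMetadata {
        String definitionQuery;
        String intervalFreshness;
        String refreshMode; // e.g. "CONTINUOUS" or "FULL"
    }

    /** Flatten the metadata into plain string options, as createTable does. */
    static Map<String, String> toOptions(MtMetadata mt) {
        Map<String, String> options = new HashMap<>();
        options.put("type", "flink-materialized-table"); // TableType marker
        options.put("materialized-table.definition-query", mt.definitionQuery);
        options.put("materialized-table.interval-freshness", mt.intervalFreshness);
        options.put("materialized-table.refresh-mode", mt.refreshMode);
        return options;
    }

    /** Rebuild the metadata from options, as toCatalogTable does on read. */
    static MtMetadata fromOptions(Map<String, String> options) {
        if (!"flink-materialized-table".equals(options.get("type"))) {
            throw new IllegalArgumentException("not a materialized table");
        }
        MtMetadata mt = new MtMetadata();
        mt.definitionQuery = options.get("materialized-table.definition-query");
        mt.intervalFreshness = options.get("materialized-table.interval-freshness");
        mt.refreshMode = options.get("materialized-table.refresh-mode");
        return mt;
    }

    public static void main(String[] args) {
        MtMetadata mt = new MtMetadata();
        mt.definitionQuery = "SELECT user_id, shop_id, ds FROM datagenSource";
        mt.intervalFreshness = "30";
        mt.refreshMode = "CONTINUOUS";
        MtMetadata restored = fromOptions(toOptions(mt));
        // The definition query survives the round trip through string options.
        System.out.println(mt.definitionQuery.equals(restored.definitionQuery)); // true
    }
}

The real implementation additionally persists the snapshot id, the freshness time unit, the logical refresh mode, the refresh status and the Base64-encoded refresh handler bytes, and strips all of these keys back out of the visible options (allMaterializedTableAttributes) before handing the table to Flink.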
From 9476338978338fd31c15a3c08b945b8c015cc36d Mon Sep 17 00:00:00 2001 From: Weijie Guo Date: Sun, 29 Sep 2024 15:52:13 +0800 Subject: [PATCH 4/6] fixup! re-generate doc --- .../java/org/apache/paimon/CoreOptions.java | 99 +++++++++++++++++ .../org/apache/paimon/flink/FlinkCatalog.java | 95 ++++++++-------- .../paimon/flink/FlinkConnectorOptions.java | 101 ------------------ 3 files changed, 143 insertions(+), 152 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 0743d98bafb0..b76b9caaaad8 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1373,6 +1373,75 @@ public class CoreOptions implements Serializable { .withDescription( "Whether to enable asynchronous IO writing when writing files."); + @ExcludeFromDocumentation("Only used internally to support materialized table") + public static final ConfigOption MATERIALIZED_TABLE_SNAPSHOT = + key("materialized-table.snapshot") + .longType() + .noDefaultValue() + .withDescription("The snapshot specified for the materialized table"); + + @ExcludeFromDocumentation("Only used internally to support materialized table") + public static final ConfigOption MATERIALIZED_TABLE_DEFINITION_QUERY = + key("materialized-table.definition-query") + .stringType() + .noDefaultValue() + .withDescription( + "The definition query text of materialized table, text is expanded in contrast to the original SQL."); + + @ExcludeFromDocumentation("Only used internally to support materialized table") + public static final ConfigOption MATERIALIZED_TABLE_INTERVAL_FRESHNESS = + key("materialized-table.interval-freshness") + .stringType() + .noDefaultValue() + .withDescription( + "the freshness interval of materialized table which is used to determine the physical refresh mode."); + + @ExcludeFromDocumentation("Only used internally to support materialized table") + public static final ConfigOption + MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT = + key("materialized-table.interval-freshness.time-unit") + .enumType(MaterializedTableIntervalFreshnessTimeUnit.class) + .noDefaultValue() + .withDescription("The time unit of freshness interval."); + + @ExcludeFromDocumentation("Only used internally to support materialized table") + public static final ConfigOption + MATERIALIZED_TABLE_LOGICAL_REFRESH_MODE = + key("materialized-table.logical-refresh-mode") + .enumType(MaterializedTableRefreshMode.class) + .noDefaultValue() + .withDescription("the logical refresh mode of materialized table."); + + @ExcludeFromDocumentation("Only used internally to support materialized table") + public static final ConfigOption MATERIALIZED_TABLE_REFRESH_MODE = + key("materialized-table.refresh-mode") + .enumType(MaterializedTableRefreshMode.class) + .noDefaultValue() + .withDescription("the physical refresh mode of materialized table."); + + @ExcludeFromDocumentation("Only used internally to support materialized table") + public static final ConfigOption + MATERIALIZED_TABLE_REFRESH_STATUS = + key("materialized-table.refresh-status") + .enumType(MaterializedTableRefreshStatus.class) + .noDefaultValue() + .withDescription("the refresh status of materialized table."); + + @ExcludeFromDocumentation("Only used internally to support materialized table") + public static final ConfigOption MATERIALIZED_TABLE_REFRESH_HANDLER_DESCRIPTION = + key("materialized-table.refresh-handler-description") + 
.stringType() + .noDefaultValue() + .withDescription( + "The summary description of materialized table's refresh handler"); + + @ExcludeFromDocumentation("Only used internally to support materialized table") + public static final ConfigOption MATERIALIZED_TABLE_REFRESH_HANDLER_BYTES = + key("materialized-table.refresh-handler-bytes") + .stringType() + .noDefaultValue() + .withDescription("The serialized refresh handler of materialized table."); + private final Options options; public CoreOptions(Map options) { @@ -2801,4 +2870,34 @@ public InlineElement getDescription() { return text(description); } } + + /** The time unit of materialized table freshness. */ + public enum MaterializedTableIntervalFreshnessTimeUnit { + SECOND, + MINUTE, + HOUR, + DAY + } + + /** The refresh mode of materialized table. */ + public enum MaterializedTableRefreshMode { + /** The refresh pipeline will be executed in continuous mode. */ + CONTINUOUS, + + /** The refresh pipeline will be executed in full mode. */ + FULL, + + /** + * The refresh pipeline mode is determined by freshness of materialized table, either {@link + * #FULL} or {@link #CONTINUOUS}. + */ + AUTOMATIC + } + + /** The refresh status of materialized table. */ + public enum MaterializedTableRefreshStatus { + INITIALIZING, + ACTIVATED, + SUSPENDED + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index 88189d693769..3928289a8346 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -22,9 +22,6 @@ import org.apache.paimon.TableType; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.flink.FlinkConnectorOptions.MaterializedTableIntervalFreshnessTimeUnit; -import org.apache.paimon.flink.FlinkConnectorOptions.MaterializedTableRefreshMode; -import org.apache.paimon.flink.FlinkConnectorOptions.MaterializedTableRefreshStatus; import org.apache.paimon.flink.procedure.ProcedureUtil; import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil; import org.apache.paimon.fs.Path; @@ -123,19 +120,19 @@ import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType; import static org.apache.flink.table.utils.EncodingUtils.decodeBase64ToBytes; import static org.apache.flink.table.utils.EncodingUtils.encodeBytesToBase64; +import static org.apache.paimon.CoreOptions.MATERIALIZED_TABLE_DEFINITION_QUERY; +import static org.apache.paimon.CoreOptions.MATERIALIZED_TABLE_INTERVAL_FRESHNESS; +import static org.apache.paimon.CoreOptions.MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT; +import static org.apache.paimon.CoreOptions.MATERIALIZED_TABLE_LOGICAL_REFRESH_MODE; +import static org.apache.paimon.CoreOptions.MATERIALIZED_TABLE_REFRESH_HANDLER_BYTES; +import static org.apache.paimon.CoreOptions.MATERIALIZED_TABLE_REFRESH_HANDLER_DESCRIPTION; +import static org.apache.paimon.CoreOptions.MATERIALIZED_TABLE_REFRESH_MODE; +import static org.apache.paimon.CoreOptions.MATERIALIZED_TABLE_REFRESH_STATUS; +import static org.apache.paimon.CoreOptions.MATERIALIZED_TABLE_SNAPSHOT; import static org.apache.paimon.CoreOptions.PATH; import static org.apache.paimon.flink.FlinkCatalogOptions.DISABLE_CREATE_TABLE_IN_DEFAULT_DB; import static 
org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER; import static org.apache.paimon.flink.FlinkCatalogOptions.REGISTER_TIMEOUT; -import static org.apache.paimon.flink.FlinkConnectorOptions.CATALOG_MATERIALIZED_TABLE_DEFINITION_QUERY; -import static org.apache.paimon.flink.FlinkConnectorOptions.CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS; -import static org.apache.paimon.flink.FlinkConnectorOptions.CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT; -import static org.apache.paimon.flink.FlinkConnectorOptions.CATALOG_MATERIALIZED_TABLE_LOGICAL_REFRESH_MODE; -import static org.apache.paimon.flink.FlinkConnectorOptions.CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_BYTES; -import static org.apache.paimon.flink.FlinkConnectorOptions.CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_DESCRIPTION; -import static org.apache.paimon.flink.FlinkConnectorOptions.CATALOG_MATERIALIZED_TABLE_REFRESH_MODE; -import static org.apache.paimon.flink.FlinkConnectorOptions.CATALOG_MATERIALIZED_TABLE_REFRESH_STATUS; -import static org.apache.paimon.flink.FlinkConnectorOptions.CATALOG_MATERIALIZED_TABLE_SNAPSHOT; import static org.apache.paimon.flink.LogicalTypeConversion.toDataType; import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; import static org.apache.paimon.flink.log.LogStoreRegister.registerLogSystem; @@ -372,34 +369,33 @@ private static void fillOptionsForMaterializedTable( CatalogMaterializedTable mt, Map options) { Options mtOptions = new Options(); mtOptions.set(CoreOptions.TYPE, TableType.FLINK_MATERIALIZED_TABLE); - mt.getSnapshot().ifPresent(x -> mtOptions.set(CATALOG_MATERIALIZED_TABLE_SNAPSHOT, x)); - mtOptions.set(CATALOG_MATERIALIZED_TABLE_DEFINITION_QUERY, mt.getDefinitionQuery()); + mt.getSnapshot().ifPresent(x -> mtOptions.set(MATERIALIZED_TABLE_SNAPSHOT, x)); + mtOptions.set(MATERIALIZED_TABLE_DEFINITION_QUERY, mt.getDefinitionQuery()); mtOptions.set( - CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS, - mt.getDefinitionFreshness().getInterval()); + MATERIALIZED_TABLE_INTERVAL_FRESHNESS, mt.getDefinitionFreshness().getInterval()); mtOptions.set( - CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT, - MaterializedTableIntervalFreshnessTimeUnit.valueOf( + MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT, + CoreOptions.MaterializedTableIntervalFreshnessTimeUnit.valueOf( mt.getDefinitionFreshness().getTimeUnit().name())); mtOptions.set( - CATALOG_MATERIALIZED_TABLE_LOGICAL_REFRESH_MODE, - MaterializedTableRefreshMode.valueOf(mt.getLogicalRefreshMode().name())); + MATERIALIZED_TABLE_LOGICAL_REFRESH_MODE, + CoreOptions.MaterializedTableRefreshMode.valueOf( + mt.getLogicalRefreshMode().name())); mtOptions.set( - CATALOG_MATERIALIZED_TABLE_REFRESH_MODE, - MaterializedTableRefreshMode.valueOf(mt.getRefreshMode().name())); + MATERIALIZED_TABLE_REFRESH_MODE, + CoreOptions.MaterializedTableRefreshMode.valueOf(mt.getRefreshMode().name())); mtOptions.set( - CATALOG_MATERIALIZED_TABLE_REFRESH_STATUS, - MaterializedTableRefreshStatus.valueOf(mt.getRefreshStatus().name())); + MATERIALIZED_TABLE_REFRESH_STATUS, + CoreOptions.MaterializedTableRefreshStatus.valueOf(mt.getRefreshStatus().name())); mt.getRefreshHandlerDescription() .ifPresent( desc -> mtOptions.set( - CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_DESCRIPTION, - desc)); + MATERIALIZED_TABLE_REFRESH_HANDLER_DESCRIPTION, desc)); byte[] serializedRefreshHandler = mt.getSerializedRefreshHandler(); if (serializedRefreshHandler != null) { mtOptions.set( - CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_BYTES, + 
MATERIALIZED_TABLE_REFRESH_HANDLER_BYTES, encodeBytesToBase64(serializedRefreshHandler)); } options.putAll(mtOptions.toMap()); @@ -589,8 +585,7 @@ protected boolean handleMaterializedTableChange( modifyRefreshStatus.getRefreshStatus(); schemaChanges.add( SchemaChange.setOption( - CATALOG_MATERIALIZED_TABLE_REFRESH_STATUS.key(), - newRefreshStatus.name())); + MATERIALIZED_TABLE_REFRESH_STATUS.key(), newRefreshStatus.name())); return true; } else if (change instanceof ModifyRefreshHandler) { ModifyRefreshHandler modifyRefreshHandler = (ModifyRefreshHandler) change; @@ -598,11 +593,10 @@ protected boolean handleMaterializedTableChange( byte[] newHandlerBytes = modifyRefreshHandler.getRefreshHandlerBytes(); schemaChanges.add( SchemaChange.setOption( - CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_DESCRIPTION.key(), - newHandlerDesc)); + MATERIALIZED_TABLE_REFRESH_HANDLER_DESCRIPTION.key(), newHandlerDesc)); schemaChanges.add( SchemaChange.setOption( - CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_BYTES.key(), + MATERIALIZED_TABLE_REFRESH_HANDLER_BYTES.key(), encodeBytesToBase64(newHandlerBytes))); return true; } @@ -899,28 +893,27 @@ private CatalogBaseTable toCatalogTable(Table table) { private CatalogMaterializedTable buildMaterializedTable( Table table, Map newOptions, TableSchema schema, Options options) { - Long snapshot = options.get(CATALOG_MATERIALIZED_TABLE_SNAPSHOT); - String definitionQuery = options.get(CATALOG_MATERIALIZED_TABLE_DEFINITION_QUERY); + Long snapshot = options.get(MATERIALIZED_TABLE_SNAPSHOT); + String definitionQuery = options.get(MATERIALIZED_TABLE_DEFINITION_QUERY); IntervalFreshness freshness = IntervalFreshness.of( - options.get(CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS), + options.get(MATERIALIZED_TABLE_INTERVAL_FRESHNESS), IntervalFreshness.TimeUnit.valueOf( - options.get(CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT) + options.get(MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT) .name())); CatalogMaterializedTable.LogicalRefreshMode logicalRefreshMode = CatalogMaterializedTable.LogicalRefreshMode.valueOf( - options.get(CATALOG_MATERIALIZED_TABLE_LOGICAL_REFRESH_MODE).name()); + options.get(MATERIALIZED_TABLE_LOGICAL_REFRESH_MODE).name()); CatalogMaterializedTable.RefreshMode refreshMode = CatalogMaterializedTable.RefreshMode.valueOf( - options.get(CATALOG_MATERIALIZED_TABLE_REFRESH_MODE).name()); + options.get(MATERIALIZED_TABLE_REFRESH_MODE).name()); CatalogMaterializedTable.RefreshStatus refreshStatus = CatalogMaterializedTable.RefreshStatus.valueOf( - options.get(CATALOG_MATERIALIZED_TABLE_REFRESH_STATUS).name()); + options.get(MATERIALIZED_TABLE_REFRESH_STATUS).name()); String refreshHandlerDescription = - options.get(CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_DESCRIPTION); + options.get(MATERIALIZED_TABLE_REFRESH_HANDLER_DESCRIPTION); byte[] serializedRefreshHandler = - decodeRefreshHandlerBytes( - options.get(CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_BYTES)); + decodeRefreshHandlerBytes(options.get(MATERIALIZED_TABLE_REFRESH_HANDLER_BYTES)); // remove materialized table related options allMaterializedTableAttributes().forEach(newOptions::remove); return CatalogMaterializedTable.newBuilder() @@ -1312,14 +1305,14 @@ private boolean isCalledFromFlinkRecomputeStatisticsProgram() { private List allMaterializedTableAttributes() { return Arrays.asList( - CATALOG_MATERIALIZED_TABLE_SNAPSHOT.key(), - CATALOG_MATERIALIZED_TABLE_DEFINITION_QUERY.key(), - CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS.key(), - 
CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT.key(), - CATALOG_MATERIALIZED_TABLE_REFRESH_MODE.key(), - CATALOG_MATERIALIZED_TABLE_LOGICAL_REFRESH_MODE.key(), - CATALOG_MATERIALIZED_TABLE_REFRESH_STATUS.key(), - CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_DESCRIPTION.key(), - CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_BYTES.key()); + MATERIALIZED_TABLE_SNAPSHOT.key(), + MATERIALIZED_TABLE_DEFINITION_QUERY.key(), + MATERIALIZED_TABLE_INTERVAL_FRESHNESS.key(), + MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT.key(), + MATERIALIZED_TABLE_REFRESH_MODE.key(), + MATERIALIZED_TABLE_LOGICAL_REFRESH_MODE.key(), + MATERIALIZED_TABLE_REFRESH_STATUS.key(), + MATERIALIZED_TABLE_REFRESH_HANDLER_DESCRIPTION.key(), + MATERIALIZED_TABLE_REFRESH_HANDLER_BYTES.key()); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java index c74ec2834969..d181d7b5a0c6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java @@ -404,77 +404,6 @@ public class FlinkConnectorOptions { .withDescription( "Optional endInput watermark used in case of batch mode or bounded stream."); - @ExcludeFromDocumentation("Only used internally to support materialized table") - public static final ConfigOption CATALOG_MATERIALIZED_TABLE_SNAPSHOT = - key("materialized-table.snapshot") - .longType() - .noDefaultValue() - .withDescription("The snapshot specified for the materialized table"); - - @ExcludeFromDocumentation("Only used internally to support materialized table") - public static final ConfigOption CATALOG_MATERIALIZED_TABLE_DEFINITION_QUERY = - key("materialized-table.definition-query") - .stringType() - .noDefaultValue() - .withDescription( - "The definition query text of materialized table, text is expanded in contrast to the original SQL."); - - @ExcludeFromDocumentation("Only used internally to support materialized table") - public static final ConfigOption CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS = - key("materialized-table.interval-freshness") - .stringType() - .noDefaultValue() - .withDescription( - "the freshness interval of materialized table which is used to determine the physical refresh mode."); - - @ExcludeFromDocumentation("Only used internally to support materialized table") - public static final ConfigOption - CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT = - key("materialized-table.interval-freshness.time-unit") - .enumType(MaterializedTableIntervalFreshnessTimeUnit.class) - .noDefaultValue() - .withDescription("The time unit of freshness interval."); - - @ExcludeFromDocumentation("Only used internally to support materialized table") - public static final ConfigOption - CATALOG_MATERIALIZED_TABLE_LOGICAL_REFRESH_MODE = - key("materialized-table.logical-refresh-mode") - .enumType(MaterializedTableRefreshMode.class) - .noDefaultValue() - .withDescription("the logical refresh mode of materialized table."); - - @ExcludeFromDocumentation("Only used internally to support materialized table") - public static final ConfigOption - CATALOG_MATERIALIZED_TABLE_REFRESH_MODE = - key("materialized-table.refresh-mode") - .enumType(MaterializedTableRefreshMode.class) - .noDefaultValue() - .withDescription("the physical refresh mode of materialized table."); - - @ExcludeFromDocumentation("Only used 
internally to support materialized table") - public static final ConfigOption - CATALOG_MATERIALIZED_TABLE_REFRESH_STATUS = - key("materialized-table.refresh-status") - .enumType(MaterializedTableRefreshStatus.class) - .noDefaultValue() - .withDescription("the refresh status of materialized table."); - - @ExcludeFromDocumentation("Only used internally to support materialized table") - public static final ConfigOption - CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_DESCRIPTION = - key("materialized-table.refresh-handler-description") - .stringType() - .noDefaultValue() - .withDescription( - "The summary description of materialized table's refresh handler"); - - @ExcludeFromDocumentation("Only used internally to support materialized table") - public static final ConfigOption CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_BYTES = - key("materialized-table.refresh-handler-bytes") - .stringType() - .noDefaultValue() - .withDescription("The serialized refresh handler of materialized table."); - public static List> getOptions() { final Field[] fields = FlinkConnectorOptions.class.getFields(); final List> list = new ArrayList<>(fields.length); @@ -490,36 +419,6 @@ public static List> getOptions() { return list; } - /** The time unit of materialized table freshness. */ - public enum MaterializedTableIntervalFreshnessTimeUnit { - SECOND, - MINUTE, - HOUR, - DAY - } - - /** The refresh mode of materialized table. */ - public enum MaterializedTableRefreshMode { - /** The refresh pipeline will be executed in continuous mode. */ - CONTINUOUS, - - /** The refresh pipeline will be executed in full mode. */ - FULL, - - /** - * The refresh pipeline mode is determined by freshness of materialized table, either {@link - * #FULL} or {@link #CONTINUOUS}. - */ - AUTOMATIC - } - - /** The refresh status of materialized table. */ - public enum MaterializedTableRefreshStatus { - INITIALIZING, - ACTIVATED, - SUSPENDED - } - /** The mode of lookup cache. */ public enum LookupCacheMode { /** Auto mode, try to use partial mode. 
*/ From 38dcb20a7555e049be353290a4ea9d3fd25b69cd Mon Sep 17 00:00:00 2001 From: Weijie Guo Date: Sun, 29 Sep 2024 15:56:17 +0800 Subject: [PATCH 5/6] re-generate doc --- paimon-common/src/main/java/org/apache/paimon/TableType.java | 2 +- .../src/main/java/org/apache/paimon/flink/FlinkCatalog.java | 4 ++-- .../test/java/org/apache/paimon/flink/FlinkCatalogTest.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/TableType.java b/paimon-common/src/main/java/org/apache/paimon/TableType.java index e57b213348aa..d690d5db3700 100644 --- a/paimon-common/src/main/java/org/apache/paimon/TableType.java +++ b/paimon-common/src/main/java/org/apache/paimon/TableType.java @@ -29,7 +29,7 @@ public enum TableType implements DescribedEnum { FORMAT_TABLE( "format-table", "A file format table refers to a directory that contains multiple files of the same format."), - FLINK_MATERIALIZED_TABLE("flink-materialized-table", "A Flink materialized table."); + MATERIALIZED_TABLE("materialized-table", "A materialized table."); private final String value; private final String description; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index 3928289a8346..0f46f2965c86 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -368,7 +368,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig private static void fillOptionsForMaterializedTable( CatalogMaterializedTable mt, Map options) { Options mtOptions = new Options(); - mtOptions.set(CoreOptions.TYPE, TableType.FLINK_MATERIALIZED_TABLE); + mtOptions.set(CoreOptions.TYPE, TableType.MATERIALIZED_TABLE); mt.getSnapshot().ifPresent(x -> mtOptions.set(MATERIALIZED_TABLE_SNAPSHOT, x)); mtOptions.set(MATERIALIZED_TABLE_DEFINITION_QUERY, mt.getDefinitionQuery()); mtOptions.set( @@ -879,7 +879,7 @@ private CatalogBaseTable toCatalogTable(Table table) { removeProperties.asMap().keySet().forEach(newOptions::remove); Options options = Options.fromMap(newOptions); - if (TableType.FLINK_MATERIALIZED_TABLE == options.get(CoreOptions.TYPE)) { + if (TableType.MATERIALIZED_TABLE == options.get(CoreOptions.TYPE)) { return buildMaterializedTable(table, newOptions, schema, options); } return new DataCatalogTable( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java index 7ef884da9385..4f2db7aabeed 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java @@ -787,7 +787,7 @@ private void checkEquals( if (t1.getTableKind() == CatalogBaseTable.TableKind.TABLE) { t1 = ((ResolvedCatalogTable) t1).copy(options); } else { - options.put(CoreOptions.TYPE.key(), TableType.FLINK_MATERIALIZED_TABLE.toString()); + options.put(CoreOptions.TYPE.key(), TableType.MATERIALIZED_TABLE.toString()); t1 = ((ResolvedCatalogMaterializedTable) t1).copy(options); } checkEquals(t1, t2); From 0aab0e938741bee7877a001409f3967bf2e7751a Mon Sep 17 00:00:00 2001 From: Weijie Guo Date: Mon, 30 Sep 2024 17:13:53 +0800 Subject: [PATCH 6/6] fix doc --- 
docs/layouts/shortcodes/generated/core_configuration.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index f7a592be9e4c..4375347785bc 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -886,7 +886,7 @@
             <td><h5>type</h5></td>
             <td style="word-wrap: break-word;">table</td>
             <td><p>Enum</p></td>
-            <td>Type of the table.<br /><br />Possible values:<ul><li>"table": Normal Paimon table.</li><li>"format-table": A file format table refers to a directory that contains multiple files of the same format.</li><li>"flink-materialized-table": A Flink materialized table.</li></ul></td>
+            <td>Type of the table.<br /><br />Possible values:<ul><li>"table": Normal Paimon table.</li><li>"format-table": A file format table refers to a directory that contains multiple files of the same format.</li><li>"materialized-table": A materialized table.</li></ul></td>
         </tr>
         <tr>
             <td><h5>write-buffer-for-append</h5></td>
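A detail worth calling out in patches 4 and 5: paimon-common must not depend on Flink, so CoreOptions mirrors Flink's CatalogMaterializedTable enums by constant name and FlinkCatalog converts between the two families with Enum.valueOf. The sketch below shows that bridging trick in isolation; FlinkRefreshMode and PaimonRefreshMode are stand-ins for the real Flink and Paimon types.

public class EnumBridgeSketch {

    /** Stand-in for Flink's CatalogMaterializedTable.RefreshMode. */
    enum FlinkRefreshMode { CONTINUOUS, FULL, AUTOMATIC }

    /** Stand-in for Paimon's CoreOptions.MaterializedTableRefreshMode. */
    enum PaimonRefreshMode { CONTINUOUS, FULL, AUTOMATIC }

    /** Flink -> Paimon, used when persisting the table options. */
    static PaimonRefreshMode toPaimon(FlinkRefreshMode mode) {
        return PaimonRefreshMode.valueOf(mode.name());
    }

    /** Paimon -> Flink, used when rebuilding the materialized table. */
    static FlinkRefreshMode toFlink(PaimonRefreshMode mode) {
        return FlinkRefreshMode.valueOf(mode.name());
    }

    public static void main(String[] args) {
        FlinkRefreshMode flink = FlinkRefreshMode.CONTINUOUS;
        // The round trip preserves the constant because both enums share names.
        System.out.println(toFlink(toPaimon(flink)) == flink); // true
    }
}

The trade-off is that the mapping is only checked at runtime: renaming a constant on either side surfaces as an IllegalArgumentException from valueOf, so the mirrored enums have to be kept in lockstep.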