Skip to content

Commit

Permalink
update cdc
Browse files Browse the repository at this point in the history
  • Loading branch information
reswqa committed Sep 24, 2024
1 parent 1fb8087 commit 745a42e
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 44 deletions.
2 changes: 1 addition & 1 deletion paimon-flink/paimon-flink-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ under the License.
<name>Paimon : Flink : CDC</name>

<properties>
<flink.version>1.18.1</flink.version>
<flink.version>1.20.0</flink.version>
<flink.cdc.version>3.1.1</flink.cdc.version>
<flink.mongodb.cdc.version>3.1.1</flink.mongodb.cdc.version>
<avro.version>1.11.1</avro.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,40 +349,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
// TableDescriptor)
Map<String, String> options = new HashMap<>(table.getOptions());
if (table instanceof CatalogMaterializedTable) {
CatalogMaterializedTable mt = (CatalogMaterializedTable) table;
Options mtOptions = new Options();
mtOptions.set(CATALOG_TABLE_TYPE, CatalogTableType.MATERIALIZED_TABLE);
mt.getSnapshot().ifPresent(x -> mtOptions.set(CATALOG_MATERIALIZED_TABLE_SNAPSHOT, x));
mtOptions.set(CATALOG_MATERIALIZED_TABLE_DEFINITION_QUERY, mt.getDefinitionQuery());
mtOptions.set(
CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS,
mt.getDefinitionFreshness().getInterval());
mtOptions.set(
CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT,
MaterializedTableIntervalFreshnessTimeUnit.valueOf(
mt.getDefinitionFreshness().getTimeUnit().name()));
mtOptions.set(
CATALOG_MATERIALIZED_TABLE_LOGICAL_REFRESH_MODE,
MaterializedTableRefreshMode.valueOf(mt.getLogicalRefreshMode().name()));
mtOptions.set(
CATALOG_MATERIALIZED_TABLE_REFRESH_MODE,
MaterializedTableRefreshMode.valueOf(mt.getRefreshMode().name()));
mtOptions.set(
CATALOG_MATERIALIZED_TABLE_REFRESH_STATUS,
MaterializedTableRefreshStatus.valueOf(mt.getRefreshStatus().name()));
mt.getRefreshHandlerDescription()
.ifPresent(
desc ->
mtOptions.set(
CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_DESCRIPTION,
desc));
byte[] serializedRefreshHandler = mt.getSerializedRefreshHandler();
if (serializedRefreshHandler != null) {
mtOptions.set(
CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_BYTES,
encodeBytesToBase64(serializedRefreshHandler));
}
options.putAll(mtOptions.toMap());
fillOptionsForMaterializedTable((CatalogMaterializedTable) table, options);
}
Schema paimonSchema = buildPaimonSchema(identifier, table, options);

Expand All @@ -402,6 +369,43 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
}
}

private static void fillOptionsForMaterializedTable(
CatalogMaterializedTable mt, Map<String, String> options) {
Options mtOptions = new Options();
mtOptions.set(CATALOG_TABLE_TYPE, CatalogTableType.MATERIALIZED_TABLE);
mt.getSnapshot().ifPresent(x -> mtOptions.set(CATALOG_MATERIALIZED_TABLE_SNAPSHOT, x));
mtOptions.set(CATALOG_MATERIALIZED_TABLE_DEFINITION_QUERY, mt.getDefinitionQuery());
mtOptions.set(
CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS,
mt.getDefinitionFreshness().getInterval());
mtOptions.set(
CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT,
MaterializedTableIntervalFreshnessTimeUnit.valueOf(
mt.getDefinitionFreshness().getTimeUnit().name()));
mtOptions.set(
CATALOG_MATERIALIZED_TABLE_LOGICAL_REFRESH_MODE,
MaterializedTableRefreshMode.valueOf(mt.getLogicalRefreshMode().name()));
mtOptions.set(
CATALOG_MATERIALIZED_TABLE_REFRESH_MODE,
MaterializedTableRefreshMode.valueOf(mt.getRefreshMode().name()));
mtOptions.set(
CATALOG_MATERIALIZED_TABLE_REFRESH_STATUS,
MaterializedTableRefreshStatus.valueOf(mt.getRefreshStatus().name()));
mt.getRefreshHandlerDescription()
.ifPresent(
desc ->
mtOptions.set(
CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_DESCRIPTION,
desc));
byte[] serializedRefreshHandler = mt.getSerializedRefreshHandler();
if (serializedRefreshHandler != null) {
mtOptions.set(
CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_BYTES,
encodeBytesToBase64(serializedRefreshHandler));
}
options.putAll(mtOptions.toMap());
}

protected Schema buildPaimonSchema(
Identifier identifier, CatalogBaseTable catalogTable, Map<String, String> options) {
String connector = options.get(CONNECTOR.key());
Expand Down Expand Up @@ -893,6 +897,11 @@ private CatalogBaseTable toCatalogTable(Table table) {
table.comment().orElse(""),
nonPhysicalColumnComments);
}
return buildMaterializedTable(table, newOptions, schema, options);
}

private CatalogMaterializedTable buildMaterializedTable(
Table table, Map<String, String> newOptions, TableSchema schema, Options options) {
Long snapshot = options.get(CATALOG_MATERIALIZED_TABLE_SNAPSHOT);
String definitionQuery = options.get(CATALOG_MATERIALIZED_TABLE_DEFINITION_QUERY);
IntervalFreshness freshness =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,22 +422,22 @@ public class FlinkConnectorOptions {

@ExcludeFromDocumentation("Only used internally to support materialized table")
public static final ConfigOption<Long> CATALOG_MATERIALIZED_TABLE_SNAPSHOT =
key("catalog-materialized-table.snapshot")
key("materialized-table.snapshot")
.longType()
.noDefaultValue()
.withDescription("The snapshot specified for the materialized table");

@ExcludeFromDocumentation("Only used internally to support materialized table")
public static final ConfigOption<String> CATALOG_MATERIALIZED_TABLE_DEFINITION_QUERY =
key("catalog-materialized-table.definition-query")
key("materialized-table.definition-query")
.stringType()
.noDefaultValue()
.withDescription(
"The definition query text of materialized table, text is expanded in contrast to the original SQL.");

@ExcludeFromDocumentation("Only used internally to support materialized table")
public static final ConfigOption<String> CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS =
key("catalog-materialized-table.interval-freshness")
key("materialized-table.interval-freshness")
.stringType()
.noDefaultValue()
.withDescription(
Expand All @@ -446,47 +446,47 @@ public class FlinkConnectorOptions {
@ExcludeFromDocumentation("Only used internally to support materialized table")
public static final ConfigOption<MaterializedTableIntervalFreshnessTimeUnit>
CATALOG_MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT =
key("catalog-materialized-table.interval-freshness.time-unit")
key("materialized-table.interval-freshness.time-unit")
.enumType(MaterializedTableIntervalFreshnessTimeUnit.class)
.noDefaultValue()
.withDescription("The time unit of freshness interval.");

@ExcludeFromDocumentation("Only used internally to support materialized table")
public static final ConfigOption<MaterializedTableRefreshMode>
CATALOG_MATERIALIZED_TABLE_LOGICAL_REFRESH_MODE =
key("catalog-materialized-table.logical-refresh-mode")
key("materialized-table.logical-refresh-mode")
.enumType(MaterializedTableRefreshMode.class)
.noDefaultValue()
.withDescription("the logical refresh mode of materialized table.");

@ExcludeFromDocumentation("Only used internally to support materialized table")
public static final ConfigOption<MaterializedTableRefreshMode>
CATALOG_MATERIALIZED_TABLE_REFRESH_MODE =
key("catalog-materialized-table.refresh-mode")
key("materialized-table.refresh-mode")
.enumType(MaterializedTableRefreshMode.class)
.noDefaultValue()
.withDescription("the physical refresh mode of materialized table.");

@ExcludeFromDocumentation("Only used internally to support materialized table")
public static final ConfigOption<MaterializedTableRefreshStatus>
CATALOG_MATERIALIZED_TABLE_REFRESH_STATUS =
key("catalog-materialized-table.refresh-status")
key("materialized-table.refresh-status")
.enumType(MaterializedTableRefreshStatus.class)
.noDefaultValue()
.withDescription("the refresh status of materialized table.");

@ExcludeFromDocumentation("Only used internally to support materialized table")
public static final ConfigOption<String>
CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_DESCRIPTION =
key("catalog-materialized-table.refresh-handler-description")
key("materialized-table.refresh-handler-description")
.stringType()
.noDefaultValue()
.withDescription(
"The summary description of materialized table's refresh handler");

@ExcludeFromDocumentation("Only used internally to support materialized table")
public static final ConfigOption<String> CATALOG_MATERIALIZED_TABLE_REFRESH_HANDLER_BYTES =
key("catalog-materialized-table.refresh-handler-bytes")
key("materialized-table.refresh-handler-bytes")
.stringType()
.noDefaultValue()
.withDescription("The serialized refresh handler of materialized table.");
Expand Down

0 comments on commit 745a42e

Please sign in to comment.