diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java
index 248014c8a..f96cd3ea0 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java
@@ -22,6 +22,7 @@
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 
+import java.time.Duration;
 import java.util.Properties;
 import java.util.function.Function;
 
@@ -38,10 +39,11 @@ public class KafkaSourceOptions {
     public static final ConfigOption<Long> PARTITION_DISCOVERY_INTERVAL_MS =
             ConfigOptions.key("partition.discovery.interval.ms")
                     .longType()
-                    .noDefaultValue()
+                    .defaultValue(Duration.ofMinutes(5).toMillis())
                     .withDescription(
                             "The interval in milliseconds for the Kafka source to discover "
-                                    + "the new partitions. A non-positive value disables the partition discovery.");
+                                    + "the new partitions. A non-positive value disables the partition discovery. "
+                                    + "The default value is 5 minutes, which is equal to the default value of metadata.max.age.ms in Kafka.");
 
     public static final ConfigOption<Boolean> REGISTER_KAFKA_CONSUMER_METRICS =
             ConfigOptions.key("register.consumer.metrics")
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
index a6cdbcedc..81ff13c3c 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
@@ -188,9 +188,11 @@ public class KafkaConnectorOptions {
     public static final ConfigOption<Duration> SCAN_TOPIC_PARTITION_DISCOVERY =
             ConfigOptions.key("scan.topic-partition-discovery.interval")
                     .durationType()
-                    .noDefaultValue()
+                    .defaultValue(Duration.ofMinutes(5))
                     .withDescription(
-                            "Optional interval for consumer to discover dynamically created Kafka partitions periodically.");
+                            "Optional interval for consumer to discover dynamically created Kafka partitions periodically. "
+                                    + "The value 0 disables the partition discovery. "
+                                    + "The default value is 5 minutes, which is equal to the default value of metadata.max.age.ms in Kafka.");
 
     // --------------------------------------------------------------------------------------------
     // Sink specific options
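
With this default in place, a DataStream job picks up newly created partitions every five minutes without any extra configuration. For illustration, a minimal sketch of opting back out of periodic discovery; the bootstrap servers and topic below are placeholder values:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.KafkaSourceOptions;

    KafkaSource<String> source =
            KafkaSource.<String>builder()
                    .setBootstrapServers("localhost:9092") // placeholder
                    .setTopics("input-topic") // placeholder
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    // Any non-positive value disables periodic partition discovery,
                    // restoring the behavior from before this change.
                    .setProperty(
                            KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1")
                    .build();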
+ + "The default value is 5 minutes, which is equal to the default value of metadata.max.age.ms in Kafka."); // -------------------------------------------------------------------------------------------- // Sink specific options diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java index 48c00918a..89dda61a1 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java @@ -201,11 +201,11 @@ public DynamicTableSource createDynamicTableSource(Context context) { final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions()); // add topic-partition discovery - final Optional partitionDiscoveryInterval = - tableOptions.getOptional(SCAN_TOPIC_PARTITION_DISCOVERY).map(Duration::toMillis); + final Duration partitionDiscoveryInterval = + tableOptions.get(SCAN_TOPIC_PARTITION_DISCOVERY); properties.setProperty( KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), - partitionDiscoveryInterval.orElse(-1L).toString()); + Long.toString(partitionDiscoveryInterval.toMillis())); final DataType physicalDataType = context.getPhysicalRowDataType(); diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java index 3ced33f98..fe5e5388f 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java @@ -229,8 +229,8 @@ public void testRunWithPeriodicPartitionDiscoveryOnceToCheckNoMoreSplit() throws public void testRunWithDiscoverPartitionsOnceWithZeroMsToCheckNoMoreSplit() throws Throwable { try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS); - // set partitionDiscoveryIntervalMs = 0 - KafkaSourceEnumerator enumerator = createEnumerator(context, 0L)) { + // Disable periodic partition discovery + KafkaSourceEnumerator enumerator = createEnumerator(context, false)) { // Start the enumerator, and it should schedule a one time task to discover and assign // partitions. @@ -358,7 +358,6 @@ public void testWorkWithPreexistingAssignments() throws Throwable { KafkaSourceEnumerator enumerator = createEnumerator( context2, - ENABLE_PERIODIC_PARTITION_DISCOVERY ? 1 : -1, OffsetsInitializer.earliest(), PRE_EXISTING_TOPICS, preexistingAssignments, @@ -390,7 +389,6 @@ public void testKafkaClientProperties() throws Exception { KafkaSourceEnumerator enumerator = createEnumerator( context, - ENABLE_PERIODIC_PARTITION_DISCOVERY ? 
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
index 3ced33f98..fe5e5388f 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
@@ -229,8 +229,8 @@ public void testRunWithPeriodicPartitionDiscoveryOnceToCheckNoMoreSplit() throws
     public void testRunWithDiscoverPartitionsOnceWithZeroMsToCheckNoMoreSplit() throws Throwable {
         try (MockSplitEnumeratorContext<KafkaSourceSplit> context =
                         new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
-                // set partitionDiscoveryIntervalMs = 0
-                KafkaSourceEnumerator enumerator = createEnumerator(context, 0L)) {
+                // Disable periodic partition discovery
+                KafkaSourceEnumerator enumerator = createEnumerator(context, false)) {
 
             // Start the enumerator, and it should schedule a one time task to discover and assign
             // partitions.
@@ -358,7 +358,6 @@ public void testWorkWithPreexistingAssignments() throws Throwable {
                 KafkaSourceEnumerator enumerator =
                         createEnumerator(
                                 context2,
-                                ENABLE_PERIODIC_PARTITION_DISCOVERY ? 1 : -1,
                                 OffsetsInitializer.earliest(),
                                 PRE_EXISTING_TOPICS,
                                 preexistingAssignments,
@@ -390,7 +389,6 @@ public void testKafkaClientProperties() throws Exception {
                 KafkaSourceEnumerator enumerator =
                         createEnumerator(
                                 context,
-                                ENABLE_PERIODIC_PARTITION_DISCOVERY ? 1 : -1,
                                 OffsetsInitializer.earliest(),
                                 PRE_EXISTING_TOPICS,
                                 Collections.emptySet(),
@@ -501,6 +499,33 @@ public void testPartitionChangeChecking() throws Throwable {
         }
     }
 
+    @Test
+    public void testEnablePartitionDiscoveryByDefault() throws Throwable {
+        try (MockSplitEnumeratorContext<KafkaSourceSplit> context =
+                        new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+                KafkaSourceEnumerator enumerator = createEnumerator(context, new Properties())) {
+            enumerator.start();
+            long partitionDiscoveryIntervalMs =
+                    (long) Whitebox.getInternalState(enumerator, "partitionDiscoveryIntervalMs");
+            assertThat(partitionDiscoveryIntervalMs)
+                    .isEqualTo(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.defaultValue());
+            assertThat(context.getPeriodicCallables()).isNotEmpty();
+        }
+    }
+
+    @Test
+    public void testDisablePartitionDiscovery() throws Throwable {
+        Properties props = new Properties();
+        props.setProperty(
+                KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), String.valueOf(0));
+        try (MockSplitEnumeratorContext<KafkaSourceSplit> context =
+                        new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+                KafkaSourceEnumerator enumerator = createEnumerator(context, props)) {
+            enumerator.start();
+            assertThat(context.getPeriodicCallables()).isEmpty();
+        }
+    }
+
     // -------------- some common startup sequence ---------------
 
     private void startEnumeratorAndRegisterReaders(
@@ -539,13 +564,9 @@ private KafkaSourceEnumerator createEnumerator(
     }
 
     private KafkaSourceEnumerator createEnumerator(
-            MockSplitEnumeratorContext<KafkaSourceSplit> enumContext,
-            long partitionDiscoveryIntervalMs) {
+            MockSplitEnumeratorContext<KafkaSourceSplit> enumContext, Properties properties) {
         return createEnumerator(
-                enumContext,
-                partitionDiscoveryIntervalMs,
-                EXCLUDE_DYNAMIC_TOPIC,
-                OffsetsInitializer.earliest());
+                enumContext, properties, EXCLUDE_DYNAMIC_TOPIC, OffsetsInitializer.earliest());
     }
 
     private KafkaSourceEnumerator createEnumerator(
@@ -557,20 +578,23 @@ private KafkaSourceEnumerator createEnumerator(
         if (includeDynamicTopic) {
             topics.add(DYNAMIC_TOPIC_NAME);
         }
+        Properties props = new Properties();
+        props.setProperty(
+                KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
+                enablePeriodicPartitionDiscovery ? "1" : "-1");
         return createEnumerator(
                 enumContext,
-                enablePeriodicPartitionDiscovery ? 1 : -1,
                 startingOffsetsInitializer,
                 topics,
                 Collections.emptySet(),
                 Collections.emptySet(),
                 false,
-                new Properties());
+                props);
     }
 
     private KafkaSourceEnumerator createEnumerator(
             MockSplitEnumeratorContext<KafkaSourceSplit> enumContext,
-            long partitionDiscoveryIntervalMs,
+            Properties props,
             boolean includeDynamicTopic,
             OffsetsInitializer startingOffsetsInitializer) {
         List<String> topics = new ArrayList<>(PRE_EXISTING_TOPICS);
@@ -579,13 +603,12 @@ private KafkaSourceEnumerator createEnumerator(
         }
         return createEnumerator(
                 enumContext,
-                partitionDiscoveryIntervalMs,
                 startingOffsetsInitializer,
                 topics,
                 Collections.emptySet(),
                 Collections.emptySet(),
                 false,
-                new Properties());
+                props);
     }
 
     /**
@@ -594,7 +617,6 @@ private KafkaSourceEnumerator createEnumerator(
      */
     private KafkaSourceEnumerator createEnumerator(
             MockSplitEnumeratorContext<KafkaSourceSplit> enumContext,
-            long partitionDiscoveryIntervalMs,
             OffsetsInitializer startingOffsetsInitializer,
             Collection<String> topicsToSubscribe,
             Set<TopicPartition> assignedPartitions,
@@ -613,9 +635,6 @@ private KafkaSourceEnumerator createEnumerator(
         Properties props =
                 new Properties(KafkaSourceTestEnv.getConsumerProperties(StringDeserializer.class));
         KafkaSourceEnumerator.deepCopyProperties(overrideProperties, props);
-        props.setProperty(
-                KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
-                String.valueOf(partitionDiscoveryIntervalMs));
 
         return new KafkaSourceEnumerator(
                 subscriber,
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
index 7ab050359..bdcbec71f 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
@@ -1095,6 +1095,82 @@ public void testPrimaryKeyValidation() {
                                 + " guarantee the semantic of primary key.");
     }
 
+    @Test
+    public void testDiscoverPartitionByDefault() {
+        Map<String, String> tableSourceOptions =
+                getModifiedOptions(
+                        getBasicSourceOptions(),
+                        options -> options.remove("scan.topic-partition-discovery.interval"));
+        final KafkaDynamicSource actualSource =
+                (KafkaDynamicSource) createTableSource(SCHEMA, tableSourceOptions);
+        Properties props = new Properties();
+        props.putAll(KAFKA_SOURCE_PROPERTIES);
+        // The default partition discovery interval is 5 minutes
+        props.setProperty("partition.discovery.interval.ms", "300000");
+        final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+        specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
+        specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);
+        final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
+                new DecodingFormatMock(",", true);
+        // Test scan source equals
+        final KafkaDynamicSource expectedKafkaSource =
+                createExpectedScanSource(
+                        SCHEMA_DATA_TYPE,
+                        null,
+                        valueDecodingFormat,
+                        new int[0],
+                        new int[] {0, 1, 2},
+                        null,
+                        Collections.singletonList(TOPIC),
+                        null,
+                        props,
+                        StartupMode.SPECIFIC_OFFSETS,
+                        specificOffsets,
+                        0);
+        assertThat(actualSource).isEqualTo(expectedKafkaSource);
+
+        ScanTableSource.ScanRuntimeProvider provider =
+                actualSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+        assertKafkaSource(provider);
+    }
+
+    @Test
+    public void testDisableDiscoverPartition() {
+        Map<String, String> tableSourceOptions =
+                getModifiedOptions(
+                        getBasicSourceOptions(),
+                        options ->
+                                options.put("scan.topic-partition-discovery.interval", "0"));
+        final KafkaDynamicSource actualSource =
+                (KafkaDynamicSource) createTableSource(SCHEMA, tableSourceOptions);
+        Properties props = new Properties();
+        props.putAll(KAFKA_SOURCE_PROPERTIES);
+        // Discovery is disabled when the partition discovery interval is 0
+        props.setProperty("partition.discovery.interval.ms", "0");
+        final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+        specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
+        specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);
+        final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
+                new DecodingFormatMock(",", true);
+        // Test scan source equals
+        final KafkaDynamicSource expectedKafkaSource =
+                createExpectedScanSource(
+                        SCHEMA_DATA_TYPE,
+                        null,
+                        valueDecodingFormat,
+                        new int[0],
+                        new int[] {0, 1, 2},
+                        null,
+                        Collections.singletonList(TOPIC),
+                        null,
+                        props,
+                        StartupMode.SPECIFIC_OFFSETS,
+                        specificOffsets,
+                        0);
+        assertThat(actualSource).isEqualTo(expectedKafkaSource);
+
+        ScanTableSource.ScanRuntimeProvider provider =
+                actualSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+        assertKafkaSource(provider);
+    }
+
     // --------------------------------------------------------------------------------------------
     // Utilities
     // --------------------------------------------------------------------------------------------