
Commit

[FLINK-32743][Connectors/Kafka] Parse data from kafka connect and convert it into regular JSON data
sunxiaojian authored and xiaoyi committed Jan 26, 2024
1 parent 4f30099 commit 4df2bd8
Showing 13 changed files with 276 additions and 16 deletions.
16 changes: 15 additions & 1 deletion docs/content.zh/docs/connectors/table/kafka.md
@@ -339,9 +339,23 @@ CREATE TABLE KafkaTable (
<td><h5>scan.topic-partition-discovery.interval</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">5 minutes</td>
<td>Duration</td>
<td>Interval at which the consumer periodically discovers dynamically created Kafka topics and partitions. To disable this feature, 'scan.topic-partition-discovery.interval' must be explicitly set to 0.</td>
</tr>
<tr>
<td><h5>record.key.include.kafka.connect.json.schema</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether the ConsumerRecord key is a Kafka Connect SourceRecord that embeds schema information. If true, only the payload is extracted during parsing and converted into plain object data.</td>
</tr>
<tr>
<td><h5>record.value.include.kafka.connect.json.schema</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether the ConsumerRecord value is a Kafka Connect SourceRecord that embeds schema information. If true, only the payload is extracted during parsing and converted into plain object data.</td>
</tr>
<tr>
<td><h5>sink.partitioner</h5></td>
<td>optional</td>
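For reference, these options only matter when the producing side writes Kafka Connect JSON with schemas enabled. A minimal sketch of that layout, with field names following the Kafka Connect JSON converter and made-up values:

// Illustrative only: a value written by Kafka Connect's JSON converter with
// schemas.enable=true is an envelope holding both "schema" and "payload".
String withSchema =
        "{\"schema\":{\"type\":\"struct\",\"fields\":["
                + "{\"field\":\"id\",\"type\":\"int64\"},"
                + "{\"field\":\"name\",\"type\":\"string\"}]},"
                + "\"payload\":{\"id\":1,\"name\":\"flink\"}}";

// With record.value.include.kafka.connect.json.schema = true the connector hands
// only the payload to the configured format:
String payloadOnly = "{\"id\":1,\"name\":\"flink\"}";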
14 changes: 14 additions & 0 deletions docs/content/docs/connectors/table/kafka.md
@@ -368,6 +368,20 @@ Connector Options
<td>Duration</td>
<td>Interval for consumer to discover dynamically created Kafka topics and partitions periodically. To disable this feature, you need to explicitly set the 'scan.topic-partition-discovery.interval' value to 0.</td>
</tr>
<tr>
<td><h5>record.key.include.kafka.connect.json.schema</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether the record key read from Kafka is a Kafka Connect JSON record that embeds its schema. With the default false the key bytes are passed to the format unchanged; when set to true, only the 'payload' field is extracted and handed to the format.</td>
</tr>
<tr>
<td><h5>record.value.include.kafka.connect.json.schema</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether the record value read from Kafka is a Kafka Connect JSON record that embeds its schema. With the default false the value bytes are passed to the format unchanged; when set to true, only the 'payload' field is extracted and handed to the format.</td>
</tr>
<tr>
<td><h5>sink.partitioner</h5></td>
<td>optional</td>
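A hedged usage sketch for the new table options; the topic, broker address, and column schema below are illustrative assumptions, only the two record.*.include.kafka.connect.json.schema keys come from this change:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaConnectJsonTableExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tEnv.executeSql(
                "CREATE TABLE KafkaTable (\n"
                        + "  `id` BIGINT,\n"
                        + "  `name` STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'user_behavior',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'format' = 'json',\n"
                        + "  'scan.startup.mode' = 'earliest-offset',\n"
                        // The two options added by this commit: unwrap the Kafka Connect
                        // envelope before the json format parses the key/value bytes.
                        + "  'record.key.include.kafka.connect.json.schema' = 'false',\n"
                        + "  'record.value.include.kafka.connect.json.schema' = 'true'\n"
                        + ")");
    }
}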
8 changes: 7 additions & 1 deletion flink-connector-kafka/pom.xml
@@ -85,9 +85,15 @@ under the License.
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<!-- Tests -->

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
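The connect-json test dependency lets tests produce realistic schema-bearing records. A possible test helper, sketched under the assumption that the standard Kafka Connect JSON converter is used; the class and field names are illustrative:

import java.util.Collections;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;

public class ConnectJsonTestData {

    /** Serializes a sample Struct the way Kafka Connect's JSON converter does. */
    public static byte[] sampleValueWithSchema() {
        JsonConverter converter = new JsonConverter();
        // isKey = false configures a value converter; schemas.enable keeps the envelope.
        converter.configure(Collections.singletonMap("schemas.enable", "true"), false);

        Schema schema =
                SchemaBuilder.struct()
                        .field("id", Schema.INT64_SCHEMA)
                        .field("name", Schema.STRING_SCHEMA)
                        .build();
        Struct value = new Struct(schema).put("id", 1L).put("name", "flink");

        return converter.fromConnectData("test-topic", schema, value);
    }
}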
KafkaSourceBuilder.java
@@ -350,6 +350,15 @@ public KafkaSourceBuilder<OUT> setValueOnlyDeserializer(
return this;
}

public KafkaSourceBuilder<OUT> setValueOnlyDeserializer(
DeserializationSchema<OUT> deserializationSchema,
boolean valueIncludeKafkaConnectJsonSchema) {
this.deserializationSchema =
KafkaRecordDeserializationSchema.valueOnly(
deserializationSchema, valueIncludeKafkaConnectJsonSchema);
return this;
}

/**
* Sets the client id prefix of this KafkaSource.
*
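A hedged sketch of the new overload in use, in the style of the existing KafkaSource examples; the broker address, topic, group id, and SimpleStringSchema are illustrative, only the boolean flag is new:

KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("connect-topic")
                .setGroupId("demo-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                // true: strip the Kafka Connect schema envelope before the
                // value-only deserializer sees the bytes.
                .setValueOnlyDeserializer(new SimpleStringSchema(), true)
                .build();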
KafkaRecordDeserializationSchema.java
@@ -96,6 +96,13 @@ static <V> KafkaRecordDeserializationSchema<V> valueOnly(
return new KafkaValueOnlyDeserializationSchemaWrapper<>(valueDeserializationSchema);
}

static <V> KafkaRecordDeserializationSchema<V> valueOnly(
DeserializationSchema<V> valueDeserializationSchema,
boolean valueIncludeKafkaConnectJsonSchema) {
return new KafkaValueOnlyDeserializationSchemaWrapper<>(
valueDeserializationSchema, valueIncludeKafkaConnectJsonSchema);
}

/**
* Wraps a Kafka {@link Deserializer} to a {@link KafkaRecordDeserializationSchema}.
*
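Equivalently, the new valueOnly factory can be called directly when the record deserializer is wired by hand; a brief sketch under the same illustrative assumptions as above:

KafkaRecordDeserializationSchema<String> schema =
        KafkaRecordDeserializationSchema.valueOnly(new SimpleStringSchema(), true);

KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("connect-topic")
                .setDeserializer(schema)
                .build();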
KafkaValueOnlyDeserializationSchemaWrapper.java
@@ -20,6 +20,7 @@

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.util.ExtractPayloadSourceRecordUtil;
import org.apache.flink.util.Collector;

import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -35,9 +36,17 @@
class KafkaValueOnlyDeserializationSchemaWrapper<T> implements KafkaRecordDeserializationSchema<T> {
private static final long serialVersionUID = 1L;
private final DeserializationSchema<T> deserializationSchema;
private final boolean valueIncludeKafkaConnectJsonSchema;

KafkaValueOnlyDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
this(deserializationSchema, false);
}

KafkaValueOnlyDeserializationSchemaWrapper(
DeserializationSchema<T> deserializationSchema,
boolean valueIncludeKafkaConnectJsonSchema) {
this.deserializationSchema = deserializationSchema;
this.valueIncludeKafkaConnectJsonSchema = valueIncludeKafkaConnectJsonSchema;
}

@Override
@@ -48,7 +57,10 @@ public void open(DeserializationSchema.InitializationContext context) throws Exc
@Override
public void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out)
throws IOException {
deserializationSchema.deserialize(message.value(), out);
byte[] extractValue =
ExtractPayloadSourceRecordUtil.extractPayloadIfIncludeConnectSchema(
message.value(), valueIncludeKafkaConnectJsonSchema);
deserializationSchema.deserialize(extractValue, out);
}

@Override
ExtractPayloadSourceRecordUtil.java (new file)
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.kafka.source.util;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

/** Extracts the payload from a Kafka Connect SourceRecord, filtering out the schema envelope. */
public class ExtractPayloadSourceRecordUtil {

private static final String RECORD_PAYLOAD_FIELD = "payload";
private static final ObjectMapper objectMapper = new ObjectMapper();

public static byte[] extractPayloadIfIncludeConnectSchema(byte[] message, boolean includeSchema)
throws IOException {
if (includeSchema) {
JsonNode jsonNode = deserializeToJsonNode(message);
return objectMapper.writeValueAsBytes(jsonNode.get(RECORD_PAYLOAD_FIELD));
}
return message;
}

private static JsonNode deserializeToJsonNode(byte[] message) throws IOException {
return objectMapper.readTree(message);
}
}
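A hedged sketch of the utility's behavior; the envelope bytes are fabricated for illustration, and the calls are assumed to run in a context that propagates IOException:

byte[] raw =
        ("{\"schema\":{\"type\":\"struct\",\"fields\":["
                        + "{\"field\":\"id\",\"type\":\"int64\"}]},"
                        + "\"payload\":{\"id\":1}}")
                .getBytes(java.nio.charset.StandardCharsets.UTF_8);

// includeSchema = true: only the "payload" subtree is re-serialized and returned,
// e.g. new String(payload) -> {"id":1}
byte[] payload =
        ExtractPayloadSourceRecordUtil.extractPayloadIfIncludeConnectSchema(raw, true);

// includeSchema = false: the original bytes are returned untouched.
byte[] unchanged =
        ExtractPayloadSourceRecordUtil.extractPayloadIfIncludeConnectSchema(raw, false);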
DynamicKafkaDeserializationSchema.java
@@ -20,6 +20,7 @@

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.util.ExtractPayloadSourceRecordUtil;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.table.data.GenericRowData;
@@ -56,6 +57,9 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro

private final boolean upsertMode;

private final boolean keyIncludeKafkaConnectJsonSchema;
private final boolean valueIncludeKafkaConnectJsonSchema;

DynamicKafkaDeserializationSchema(
int physicalArity,
@Nullable DeserializationSchema<RowData> keyDeserialization,
@@ -65,7 +69,9 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
boolean hasMetadata,
MetadataConverter[] metadataConverters,
TypeInformation<RowData> producedTypeInfo,
boolean upsertMode) {
boolean upsertMode,
boolean keyIncludeKafkaConnectJsonSchema,
boolean valueIncludeKafkaConnectJsonSchema) {
if (upsertMode) {
Preconditions.checkArgument(
keyDeserialization != null && keyProjection.length > 0,
@@ -84,6 +90,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
upsertMode);
this.producedTypeInfo = producedTypeInfo;
this.upsertMode = upsertMode;
this.keyIncludeKafkaConnectJsonSchema = keyIncludeKafkaConnectJsonSchema;
this.valueIncludeKafkaConnectJsonSchema = valueIncludeKafkaConnectJsonSchema;
}

@Override
@@ -110,13 +118,19 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData
// shortcut in case no output projection is required,
// also not for a cartesian product with the keys
if (keyDeserialization == null && !hasMetadata) {
valueDeserialization.deserialize(record.value(), collector);
byte[] extractRecordValue =
ExtractPayloadSourceRecordUtil.extractPayloadIfIncludeConnectSchema(
record.value(), valueIncludeKafkaConnectJsonSchema);
valueDeserialization.deserialize(extractRecordValue, collector);
return;
}

// buffer key(s)
if (keyDeserialization != null) {
keyDeserialization.deserialize(record.key(), keyCollector);
byte[] extractRecordKey =
ExtractPayloadSourceRecordUtil.extractPayloadIfIncludeConnectSchema(
record.key(), keyIncludeKafkaConnectJsonSchema);
keyDeserialization.deserialize(extractRecordKey, keyCollector);
}

// project output while emitting values
@@ -127,7 +141,10 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData
// collect tombstone messages in upsert mode by hand
outputCollector.collect(null);
} else {
valueDeserialization.deserialize(record.value(), outputCollector);
byte[] extractRecordValue =
ExtractPayloadSourceRecordUtil.extractPayloadIfIncludeConnectSchema(
record.value(), valueIncludeKafkaConnectJsonSchema);
valueDeserialization.deserialize(extractRecordValue, outputCollector);
}
keyCollector.buffer.clear();
}
KafkaConnectorOptions.java
@@ -194,6 +194,20 @@ public class KafkaConnectorOptions {
+ "The value 0 disables the partition discovery."
+ "The default value is 5 minutes, which is equal to the default value of metadata.max.age.ms in Kafka.");

public static final ConfigOption<Boolean> RECORD_KEY_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA =
ConfigOptions.key("record.key.include.kafka.connect.json.schema")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether the record key read from Kafka is a Kafka Connect JSON record that embeds its schema. With the default false the key bytes are passed to the format unchanged; when set to true, only the 'payload' field is extracted and handed to the format.");

public static final ConfigOption<Boolean> RECORD_VALUE_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA =
ConfigOptions.key("record.value.include.kafka.connect.json.schema")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether the record value read from Kafka is a Kafka Connect JSON record that embeds its schema. With the default false the value bytes are passed to the format unchanged; when set to true, only the 'payload' field is extracted and handed to the format.");

// --------------------------------------------------------------------------------------------
// Sink specific options
// --------------------------------------------------------------------------------------------
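Since both options are plain boolean ConfigOptions, they can also be read and set programmatically; a small sketch using Flink's Configuration class, with illustrative variable names:

Configuration conf = new Configuration();
conf.set(KafkaConnectorOptions.RECORD_VALUE_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA, true);

// Both options default to false, matching the documented table defaults.
boolean stripValueEnvelope =
        conf.get(KafkaConnectorOptions.RECORD_VALUE_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA);
boolean stripKeyEnvelope =
        conf.get(KafkaConnectorOptions.RECORD_KEY_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA);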
KafkaDynamicSource.java
@@ -171,6 +171,9 @@ public class KafkaDynamicSource

protected final String tableIdentifier;

protected final boolean keyIncludeKafkaConnectJsonSchema;
protected final boolean valueIncludeKafkaConnectJsonSchema;

public KafkaDynamicSource(
DataType physicalDataType,
@Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
@@ -189,6 +192,48 @@ public KafkaDynamicSource(
long boundedTimestampMillis,
boolean upsertMode,
String tableIdentifier) {
this(
physicalDataType,
keyDecodingFormat,
valueDecodingFormat,
keyProjection,
valueProjection,
keyPrefix,
topics,
topicPattern,
properties,
startupMode,
specificStartupOffsets,
startupTimestampMillis,
boundedMode,
specificBoundedOffsets,
boundedTimestampMillis,
upsertMode,
tableIdentifier,
false,
false);
}

public KafkaDynamicSource(
DataType physicalDataType,
@Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
int[] keyProjection,
int[] valueProjection,
@Nullable String keyPrefix,
@Nullable List<String> topics,
@Nullable Pattern topicPattern,
Properties properties,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis,
BoundedMode boundedMode,
Map<KafkaTopicPartition, Long> specificBoundedOffsets,
long boundedTimestampMillis,
boolean upsertMode,
String tableIdentifier,
boolean keyIncludeKafkaConnectJsonSchema,
boolean valueIncludeKafkaConnectJsonSchema) {
// Format attributes
this.physicalDataType =
Preconditions.checkNotNull(
@@ -228,6 +273,8 @@ public KafkaDynamicSource(
this.boundedTimestampMillis = boundedTimestampMillis;
this.upsertMode = upsertMode;
this.tableIdentifier = tableIdentifier;
this.keyIncludeKafkaConnectJsonSchema = keyIncludeKafkaConnectJsonSchema;
this.valueIncludeKafkaConnectJsonSchema = valueIncludeKafkaConnectJsonSchema;
}

@Override
@@ -344,7 +391,9 @@ public DynamicTableSource copy() {
specificBoundedOffsets,
boundedTimestampMillis,
upsertMode,
tableIdentifier);
tableIdentifier,
keyIncludeKafkaConnectJsonSchema,
valueIncludeKafkaConnectJsonSchema);
copy.producedDataType = producedDataType;
copy.metadataKeys = metadataKeys;
copy.watermarkStrategy = watermarkStrategy;
@@ -384,7 +433,9 @@ public boolean equals(Object o) {
&& boundedTimestampMillis == that.boundedTimestampMillis
&& Objects.equals(upsertMode, that.upsertMode)
&& Objects.equals(tableIdentifier, that.tableIdentifier)
&& Objects.equals(watermarkStrategy, that.watermarkStrategy);
&& Objects.equals(watermarkStrategy, that.watermarkStrategy)
&& keyIncludeKafkaConnectJsonSchema == that.keyIncludeKafkaConnectJsonSchema
&& valueIncludeKafkaConnectJsonSchema == that.valueIncludeKafkaConnectJsonSchema;
}

@Override
@@ -550,7 +601,9 @@ private KafkaDeserializationSchema<RowData> createKafkaDeserializationSchema(
hasMetadata,
metadataConverters,
producedTypeInfo,
upsertMode);
upsertMode,
keyIncludeKafkaConnectJsonSchema,
valueIncludeKafkaConnectJsonSchema);
}

private @Nullable DeserializationSchema<RowData> createDeserialization(