[tidb] Add metadata column ITCase and improve the code style
This closes #898.
leonardBang committed Mar 16, 2022
1 parent 22a4688 commit 39a9974
Showing 13 changed files with 96 additions and 87 deletions.
10 changes: 0 additions & 10 deletions flink-cdc-e2e-tests/pom.xml
@@ -253,16 +253,6 @@ under the License.
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
-
-<artifactItem>
-    <groupId>com.ververica</groupId>
-    <artifactId>flink-sql-connector-tidb-cdc</artifactId>
-    <version>${project.version}</version>
-    <destFileName>tidb-cdc-connector.jar</destFileName>
-    <type>jar</type>
-    <outputDirectory>${project.build.directory}/dependencies
-    </outputDirectory>
-</artifactItem>
</artifactItems>
</configuration>
</plugin>
@@ -221,7 +221,6 @@ protected void flushRows(final long timestamp) throws Exception {

@Override
public void cancel() {
-// TODO: abort pending transactions
try {
if (cdcClient != null) {
cdcClient.close();
@@ -84,17 +84,6 @@ public void deserialize(Row row, Collector<RowData> out) throws Exception {
row.getValue().toByteArray(), handle, tableInfo));
emit(new TiKVMetadataConverter.TiKVRowValue(row), rowDataInsert, out);
} else {
-// TODO TiKV cdc client doesn't return old value in PUT event
-// if (!row.getOldValue().isEmpty()) {
-//     out.collect(
-//         GenericRowData.ofKind(
-//             RowKind.UPDATE_BEFORE,
-//             getRowDataFields(
-//                 row.getOldValue().toByteArray(),
-//                 handle,
-//                 tableInfo)));
-// }
RowData rowDataUpdate =
GenericRowData.ofKind(
RowKind.UPDATE_AFTER,
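The commented-out block deleted above documents a TiKV limitation: the CDC client does not ship the old row image with a PUT event, so no UPDATE_BEFORE record can be produced. Updates therefore surface as a single UPDATE_AFTER row. A minimal sketch of the surviving branch, reusing the helpers visible in this hunk (getRowDataFields, emit):

    // A PUT event carries only the new row image, so the change is emitted as a
    // bare UPDATE_AFTER record; downstream operators must treat the stream as upserts.
    RowData rowDataUpdate =
            GenericRowData.ofKind(
                    RowKind.UPDATE_AFTER,
                    getRowDataFields(row.getValue().toByteArray(), handle, tableInfo));
    emit(new TiKVMetadataConverter.TiKVRowValue(row), rowDataUpdate, out);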
@@ -19,7 +19,7 @@
package com.ververica.cdc.connectors.tidb.table;

/**
-* Startup modes for the Oracle CDC Consumer.
+* Startup modes for the TiDB CDC Consumer.
*
* @see StartupOptions
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

/** TiDB CDC Source startup options. */
public final class StartupOptions {
+
public final StartupMode startupMode;

/**
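StartupOptions is a thin value holder around the StartupMode enum whose Javadoc was corrected above. A hypothetical usage sketch (the LATEST_OFFSET constant name is an assumption, inferred from the StartupOptions.latest() factory exercised elsewhere in this commit):

    StartupOptions options = StartupOptions.latest();
    // The chosen mode is exposed as a public final field rather than a getter.
    if (options.startupMode == StartupMode.LATEST_OFFSET) {
        // skip the snapshot phase and read only new TiKV CDC events
    }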
@@ -98,8 +98,6 @@ public TiDBTableSource(
public ChangelogMode getChangelogMode() {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
-// TODO TiKV cdc client doesn't return old value in PUT event
-// .addContainedKind(RowKind.UPDATE_BEFORE)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.DELETE)
.build();
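With UPDATE_BEFORE removed, the declared kinds (INSERT, UPDATE_AFTER, DELETE) form exactly an upsert changelog. The same declaration could likely be written with Flink's shorthand (a sketch, assuming no further kinds ever need to be declared):

    @Override
    public ChangelogMode getChangelogMode() {
        // INSERT + UPDATE_AFTER + DELETE: an upsert stream keyed by the primary key.
        return ChangelogMode.upsert();
    }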
@@ -27,6 +27,7 @@

/** Emits a row with physical fields and metadata fields. */
public class TiKVAppendMetadataCollector implements Collector<RowData>, Serializable {
+
private static final long serialVersionUID = 1L;

private final TiKVMetadataConverter[] metadataConverters;
@@ -29,21 +29,22 @@
@FunctionalInterface
@Internal
public interface TiKVMetadataConverter extends Serializable {
+
Object read(TiKVRowValue row);

/** TiKV Row Value. */
class TiKVRowValue {
-public boolean isKv;
+public boolean isSnapshotRecord;
public Kvrpcpb.KvPair kvPair;
public Cdcpb.Event.Row row;

public TiKVRowValue(Kvrpcpb.KvPair kvPair) {
-this.isKv = true;
+this.isSnapshotRecord = true;
this.kvPair = kvPair;
}

public TiKVRowValue(Cdcpb.Event.Row row) {
-this.isKv = false;
+this.isSnapshotRecord = false;
this.row = row;
}
}
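The rename from isKv to isSnapshotRecord states what the flag means rather than how the record is encoded: snapshot rows arrive as raw key-value pairs from a table scan, change rows as TiKV CDC events. A small illustration (kvPair and eventRow are assumed local variables of the types named in the constructors):

    // Snapshot phase: a scanned Kvrpcpb.KvPair.
    TiKVMetadataConverter.TiKVRowValue snapshotRow =
            new TiKVMetadataConverter.TiKVRowValue(kvPair);   // isSnapshotRecord == true
    // Incremental phase: a Cdcpb.Event.Row from the CDC client.
    TiKVMetadataConverter.TiKVRowValue changeRow =
            new TiKVMetadataConverter.TiKVRowValue(eventRow); // isSnapshotRecord == false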
@@ -92,9 +92,9 @@ public static TiKVReadableMetadata createOpTsMetadata() {

@Override
public Object read(TiKVRowValue row) {
-if (row.isKv) {
-    // We cannot get ts from KvPair, use default value.
-    return TimestampData.fromEpochMillis(0);
+if (row.isSnapshotRecord) {
+    // Uses 0L as the operation time of snapshot records.
+    return TimestampData.fromEpochMillis(0L);
} else {
    return TimestampData.fromEpochMillis(row.row.getStartTs());
}
@@ -256,7 +256,6 @@ public static Object toRowDataType(Object object, DataType dataType) {
break;
case "BigDecimal":
BigDecimal bigDecimal = (BigDecimal) object;
-// TODO improve the conversion code
int precision = ((DecimalType) dataType.getLogicalType()).getPrecision();
int scale = ((DecimalType) dataType.getLogicalType()).getScale();
result = DecimalData.fromBigDecimal(bigDecimal, precision, scale);
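DecimalData.fromBigDecimal rescales the incoming BigDecimal to the declared precision and scale of the target DECIMAL type. A self-contained sketch of the conversion above (DECIMAL(20, 10) mirrors the weight column used in the ITCase below):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.data.DecimalData;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.logical.DecimalType;

    import java.math.BigDecimal;

    public class DecimalConversionSketch {
        public static void main(String[] args) {
            DataType dataType = DataTypes.DECIMAL(20, 10);
            DecimalType logicalType = (DecimalType) dataType.getLogicalType();
            // fromBigDecimal returns null if the value does not fit the precision.
            DecimalData result =
                    DecimalData.fromBigDecimal(
                            new BigDecimal("3.14"),
                            logicalType.getPrecision(),
                            logicalType.getScale());
            System.out.println(result); // prints 3.1400000000
        }
    }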
@@ -345,40 +345,83 @@ public void testAddColumn() throws Exception {
result.getJobClient().get().cancel().get();
}

-/* @Test
-public void testMetadataColumns() {
-    Map<String, String> properties = getAllOptions();
-    // validation for source
-    DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties);
-    TiDBTableSource tidbTableSource = (TiDBTableSource) actualSource;
-    tidbTableSource.applyReadableMetadata(
-            Arrays.asList("op_ts", "database_name", "table_name"),
-            SCHEMA_WITH_METADATA.toSourceRowDataType());
-    actualSource = tidbTableSource.copy();
-    TiDBTableSource expectedSource =
-            new TiDBTableSource(
-                    SCHEMA_WITH_METADATA,
-                    MY_HOSTNAME,
-                    MY_DATABASE,
-                    MY_TABLE,
-                    MY_USERNAME,
-                    MY_PASSWORD,
-                    PD_ADDRESS,
-                    StartupOptions.latest(),
-                    OPTIONS);
-    expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
-    expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name", "table_name");
-    assertEquals(expectedSource, actualSource);
-    ScanTableSource.ScanRuntimeProvider provider =
-            tidbTableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
-    TiKVRichParallelSourceFunction<RowData> sourceFunction =
-            (TiKVRichParallelSourceFunction<RowData>)
-                    ((SourceFunctionProvider) provider).createSourceFunction();
-    assertProducedTypeOfSourceFunction(sourceFunction, expectedSource.producedDataType);
-}*/
+@Test
+public void testMetadataColumns() throws Exception {
+    initializeTidbTable("inventory");
+
+    String sourceDDL =
+            String.format(
+                    "CREATE TABLE tidb_source ("
+                            + " db_name STRING METADATA FROM 'database_name' VIRTUAL,"
+                            + " table_name STRING METADATA VIRTUAL,"
+                            + " `id` INT NOT NULL,"
+                            + " name STRING,"
+                            + " description STRING,"
+                            + " weight DECIMAL(20, 10),"
+                            + " PRIMARY KEY (`id`) NOT ENFORCED"
+                            + ") WITH ("
+                            + " 'connector' = 'tidb-cdc',"
+                            + " 'hostname' = '%s',"
+                            + " 'tikv.grpc.timeout_in_ms' = '20000',"
+                            + " 'pd-addresses' = '%s',"
+                            + " 'username' = '%s',"
+                            + " 'password' = '%s',"
+                            + " 'database-name' = '%s',"
+                            + " 'table-name' = '%s'"
+                            + ")",
+                    TIDB.getContainerIpAddress(),
+                    PD.getContainerIpAddress() + ":" + PD.getMappedPort(PD_PORT_ORIGIN),
+                    TIDB_USER,
+                    TIDB_PASSWORD,
+                    "inventory",
+                    "products");
+
+    String sinkDDL =
+            "CREATE TABLE sink ("
+                    + " database_name STRING,"
+                    + " table_name STRING,"
+                    + " `id` DECIMAL(20, 0) NOT NULL,"
+                    + " name STRING,"
+                    + " description STRING,"
+                    + " weight DECIMAL(20, 10),"
+                    + " primary key (database_name, table_name, id) not enforced"
+                    + ") WITH ("
+                    + " 'connector' = 'values',"
+                    + " 'sink-insert-only' = 'false'"
+                    + ")";
+    tEnv.executeSql(sourceDDL);
+    tEnv.executeSql(sinkDDL);
+
+    // async submit job
+    TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM tidb_source");
+
+    // wait for snapshot finished and begin binlog
+    waitForSinkSize("sink", 9);
+
+    try (Connection connection = getJdbcConnection("inventory");
+            Statement statement = connection.createStatement()) {
+        statement.execute(
+                "UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
+    }
+
+    waitForSinkSize("sink", 10);
+
+    List<String> expected =
+            Arrays.asList(
+                    "+I(inventory,products,101,scooter,Small 2-wheel scooter,3.1400000000)",
+                    "+I(inventory,products,102,car battery,12V car battery,8.1000000000)",
+                    "+I(inventory,products,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8000000000)",
+                    "+I(inventory,products,104,hammer,12oz carpenter's hammer,0.7500000000)",
+                    "+I(inventory,products,105,hammer,14oz carpenter's hammer,0.8750000000)",
+                    "+I(inventory,products,106,hammer,16oz carpenter's hammer,1.0000000000)",
+                    "+I(inventory,products,107,rocks,box of assorted rocks,5.3000000000)",
+                    "+I(inventory,products,108,jacket,water resistent black wind breaker,0.1000000000)",
+                    "+I(inventory,products,109,spare tire,24 inch spare tire,22.2000000000)",
+                    "+U(inventory,products,106,hammer,18oz carpenter hammer,1.0000000000)");
+    List<String> actual = TestValuesTableFactory.getRawResults("sink");
+    assertEqualsInAnyOrder(expected, actual);
+    result.getJobClient().get().cancel().get();
+}
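Note the two metadata declarations in the source DDL: db_name STRING METADATA FROM 'database_name' VIRTUAL binds an explicitly named metadata key, while table_name STRING METADATA VIRTUAL omits the FROM clause because the column name matches the metadata key, making it shorthand for table_name STRING METADATA FROM 'table_name' VIRTUAL.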

private static void waitForSinkSize(String sinkName, int expectedSize)
throws InterruptedException {
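The body of waitForSinkSize is truncated here; a minimal sketch, assuming the helper simply polls the in-memory values sink via TestValuesTableFactory until the expected number of rows has arrived:

    private static void waitForSinkSize(String sinkName, int expectedSize)
            throws InterruptedException {
        // Poll the collected raw results until the sink holds enough rows.
        while (TestValuesTableFactory.getRawResults(sinkName).size() < expectedSize) {
            Thread.sleep(100L);
        }
    }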
@@ -18,7 +18,6 @@

package com.ververica.cdc.connectors.tidb.table;

-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
@@ -29,12 +28,8 @@
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.types.DataType;

-import com.ververica.cdc.connectors.tidb.TiKVRichParallelSourceFunction;
import org.junit.Test;

import java.util.ArrayList;
@@ -43,12 +38,10 @@
import java.util.HashMap;
import java.util.Map;

-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;

-/** Integration tests for TiDB table source factory. */
-public class TiDBTableSourceFactoryITCase {
+/** Unit tests for TiDB table source factory. */
+public class TiDBTableSourceFactoryTest {

private static final ResolvedSchema SCHEMA =
new ResolvedSchema(
@@ -162,21 +155,11 @@ private static DynamicTableSource createTableSource(
options),
schema),
new Configuration(),
-TiDBTableSourceFactoryITCase.class.getClassLoader(),
+TiDBTableSourceFactoryTest.class.getClassLoader(),
false);
}

private static DynamicTableSource createTableSource(Map<String, String> options) {
return createTableSource(SCHEMA, options);
}

-
-private static void assertProducedTypeOfSourceFunction(
-        TiKVRichParallelSourceFunction<RowData> sourceFunction, DataType expectedProducedType) {
-    TypeInformation<RowData> producedType = sourceFunction.getProducedType();
-    assertThat(producedType, instanceOf(InternalTypeInfo.class));
-    InternalTypeInfo<RowData> rowDataInternalTypeInfo =
-            (InternalTypeInfo<RowData>) producedType;
-    DataType producedDataType = rowDataInternalTypeInfo.getDataType();
-    assertEquals(expectedProducedType.toString(), producedDataType.toString());
-}
}
7 changes: 6 additions & 1 deletion flink-sql-connector-tidb-cdc/pom.xml
@@ -56,7 +56,6 @@ under the License.
<include>com.ververica:flink-connector-tidb-cdc</include>
<include>org.tikv:tikv-client-java</include>
<include>com.google.protobuf:*</include>
-<!-- TODO shade grpc dependency with resources -->
<include>io.grpc:*</include>
</includes>
</artifactSet>
@@ -67,6 +66,12 @@ under the License.
com.ververica.cdc.connectors.shaded.com.google
</shadedPattern>
</relocation>
+<relocation>
+    <pattern>io.grpc</pattern>
+    <shadedPattern>
+        com.ververica.cdc.connectors.shaded.io.grpc
+    </shadedPattern>
+</relocation>
</relocations>
</configuration>
</execution>
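The added io.grpc relocation rewrites both the bundled gRPC classes and every bytecode reference to them, so the fat jar no longer clashes with a user-supplied gRPC version on the same classpath. A hypothetical smoke test against the shaded artifact (the relocated class name is assumed from the pattern above plus grpc-api's ManagedChannelBuilder):

    // Should resolve when only the shaded flink-sql-connector-tidb-cdc jar is on the classpath.
    Class.forName("com.ververica.cdc.connectors.shaded.io.grpc.ManagedChannelBuilder");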